Couchbase Kafka source connector order of messages

My question is about the ordering of messages from Couchbase to a Kafka topic. In my opinion this is not well documented, hence this question.

Use case

The use case is to capture the changes to every document and end up with the latest change for each document. The path to the Kafka topic is real time, but the consumer of the topic can be a batch application, which means I may receive multiple events for the same document in a given batch.
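
For illustration, the consumer side could compact each polled batch down to the latest event per document key. A rough sketch (deserializer setup omitted; class and method names are illustrative, and it assumes all events for a key land in the same topic partition, which is what I'm trying to confirm below):

    import java.util.LinkedHashMap;
    import java.util.Map;

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;

    // Sketch: keep only the newest event per document key within one polled batch.
    // Within a topic partition records arrive in order, and all events for a key
    // are expected in the same partition, so last-write-wins yields the latest change.
    public class LatestPerKey {
      static Map<String, byte[]> compact(ConsumerRecords<String, byte[]> batch) {
        Map<String, byte[]> latest = new LinkedHashMap<>();
        for (ConsumerRecord<String, byte[]> record : batch) {
          latest.put(record.key(), record.value()); // later records overwrite earlier ones
        }
        return latest;
      }
    }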

My understanding

  1. A Couchbase bucket has multiple vBuckets.
  2. When a document is inserted into the bucket, it lands in one of the vBuckets, chosen by a hash of the document key. This means that if a document with the same key is updated, it goes to the same vBucket (see the sketch after this list).
  3. Couchbase streams the change events over DCP (Database Change Protocol) for other applications, such as the Kafka source connector, to consume.
  4. Couchbase guarantees the ordering of events per vBucket; cluster-wide ordering is not guaranteed. This means that if a document with the same key is updated multiple times, the ordering of those events is guaranteed.
  5. When the Kafka source connector reads the DCP events, it reads them in the same order they arrived on the DCP streams.
  6. Ordering within a vBucket is therefore guaranteed. So far so good.
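
For point 2, here is a rough sketch of the key-to-vBucket mapping, mirroring the CRC32-based scheme the Couchbase SDKs use (illustrative only, not the actual SDK code):

    import java.nio.charset.StandardCharsets;
    import java.util.zip.CRC32;

    // Illustrative key -> vBucket mapping: hash the document key with CRC32 and
    // mask down to the vBucket count (1024 on most platforms, a power of two).
    // Deterministic hashing is why updates to the same key hit the same vBucket.
    public class VBucketMapping {
      static int vbucketId(String documentKey, int numVBuckets) {
        CRC32 crc32 = new CRC32();
        crc32.update(documentKey.getBytes(StandardCharsets.UTF_8));
        return (int) (crc32.getValue() >> 16) & (numVBuckets - 1);
      }
    }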

Question

  1. When the Kafka source connector publishes the messages to the Kafka topic, does it maintain the same ordering?
  2. How does the source connector decide the Kafka partitions for the messages? (assuming the topic has more than one partition)

Yes, the Couchbase connector publishes documents to Kafka in the same order they were received from the DCP stream.

By default the partition is assigned based on the Couchbase document ID. If you need to override that behavior, you can implement a custom SourceHandler and specify a Kafka partition when building the CouchbaseSourceRecord.

Alternatively, you can write a Single Message Transform that assigns the partition. There’s an example on Stack Overflow: “Setting Partition Strategy in a Kafka Connector”.
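
For example, a minimal transform along those lines might look like this (the class name and the partition.count property are made up for this sketch; record keys are assumed non-null):

    import java.util.Map;

    import org.apache.kafka.common.config.ConfigDef;
    import org.apache.kafka.connect.connector.ConnectRecord;
    import org.apache.kafka.connect.transforms.Transformation;

    // Sketch of a Single Message Transform that assigns the Kafka partition by
    // hashing the record key, so all events for one document key share a partition.
    public class KeyHashPartition<R extends ConnectRecord<R>> implements Transformation<R> {
      private int partitionCount;

      @Override
      public void configure(Map<String, ?> configs) {
        partitionCount = Integer.parseInt(String.valueOf(configs.get("partition.count")));
      }

      @Override
      public R apply(R record) {
        if (record.key() == null) {
          return record; // leave keyless records to the default partitioner
        }
        int partition = Math.floorMod(record.key().hashCode(), partitionCount);
        return record.newRecord(record.topic(), partition,
            record.keySchema(), record.key(),
            record.valueSchema(), record.value(),
            record.timestamp());
      }

      @Override
      public ConfigDef config() {
        return new ConfigDef().define("partition.count", ConfigDef.Type.INT,
            ConfigDef.Importance.HIGH, "Number of partitions in the target topic");
      }

      @Override
      public void close() {
      }
    }

You’d then wire it into the connector config with transforms=keyHashPartition, transforms.keyHashPartition.type=<your package>.KeyHashPartition, and transforms.keyHashPartition.partition.count=<N> (the alias “keyHashPartition” is just a name you choose).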

Thanks,
David


I’ve found that when I set tasks.max=2, the events for one document key are no longer in sequence; I have to set tasks.max=1. I’m running the connector in standalone mode, I have only one Couchbase node (1 vBucket), and only one Kafka topic partition.
How can I use more than one task and keep the events for one document key in order?

Hi @Henry ,

That’s a surprising observation, since the workload is split between tasks in a way that should prevent that from happening. What are the steps to reproduce the issue?

Thanks,
David N

@Henry I have the same requirement: more parallelism (a higher tasks.max) while maintaining order with more than one topic partition. Can you please provide the steps as @david.nault requested?

@david.nault A few questions:

  1. So even with more tasks and more topic partitions, is the ordering maintained end to end? i.e., do documents with the same key go to the same task and to the same topic partition?

  2. The source handler would be called in every task. Is this correct?

  3. Also, when I run the command ‘curl localhost:8083/connectors/CouchDBSourceConnectorCaseTs/tasks’ I get the attached output. What’s the “partitions” attribute, and why is it 0 to 1023?

[{"id":{"connector":"CouchDBSourceConnectorCaseTs","task":0},"config":{"connector.class":"com.couchbase.connect.kafka.CouchbaseSourceConnector","partitions":"0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29,30,31,32,33,34,35,36,37,38,39,40,41,42,43,44,45,46,47,48,49,50,51,52,53,54,55,56,57,58,59,60,61,62,63,64,65,66,67,68,69,70,71,72,73,74,75,76,77,78,79,80,81,82,83,84,85,86,87,88,89,90,91,92,93,…,1020,1021,1022,1023",…}}]

Hi @manikandan.kannan ,

  1. Yes, ordering is maintained end to end, with two caveats. First, there’s the aforementioned caveat that ordering is only maintained between documents in the same Couchbase virtual bucket. Second, when the connector restarts, the stream may rewind to the last save point, resulting in some messages being republished.

More details on that second caveat: The Kafka Connect framework manages the connector’s “offset” (think of it like a save point in the source stream). For performance reasons this offset is not saved after every successful publication, but is instead saved periodically at an interval defined by the offset.flush.interval.ms worker config property. Quick example scenario:

  • Connector publishes messages A, B, and C.
  • Framework decides it’s time to save the offset.
  • Connector publishes messages D and E, and then shuts down before the new offset is saved.
  • Connector starts again and framework tells it “C” was the last message published.
  • Connector re-sends messages D and E.

So there’s the potential for the message stream to rewind to a previous point in time (the last saved offset), but other than that, messages within a virtual bucket are always published in the same order.

This is a limitation of the Kafka Connect framework.
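
If the rewind window matters downstream, you can narrow it by lowering that flush interval in the worker properties, at the cost of more frequent offset writes. A minimal sketch (60000 ms is the framework default):

    # Worker config: save source offsets every 10 seconds instead of the default 60
    offset.flush.interval.ms=10000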

  2. Yes, the source handler is called in every task.

  3. The “partitions” attribute in that response lists the Couchbase virtual buckets (vbuckets) the task is responsible for replicating. A bucket consists of 1024 independent vbuckets (or 64 on macOS) numbered 0 to 1023. The response you got indicates there’s a single connector task running, and that task is handling all of the vbuckets.

Incidentally, it looks like you’ve named the connector “CouchDBSourceConnectorCaseTs”. Just wanted to point out that “CouchDB” and “Couchbase” are two different databases. Yeah, the names are confusing, and if we had a time machine we’d probably go back and pick a more distinct name :)

Thanks,
David

@david.nault Thanks for your reply. Yes, the connector name could be more appropriate.

What will happen when tasks.max is greater than the number of vBuckets? I know this may not be a desirable setting, but I’m just curious. I assume that either some of the tasks would be idle or would not be started at all.

That’s my assumption as well. Not recommended.


@david.nault What are the message delivery semantics? Is it at least once by default? If so, is it possible to achieve exactly-once semantics?

@manikandan.kannan At least once. There’s no built-in way to achieve exactly-once semantics. Not sure how you’d even do that.

@david.nault Is it possible using the idempotent feature of the Kafka producer? I’m not sure whether the Couchbase Kafka connector has enabled that capability.

@manikandan.kannan I think you’re right; that’s the approach the Kafka folks are considering with Kafka Improvement Proposal 318 and Jira issue KAFKA-7077. Unfortunately, since the Kafka producer is managed by the Kafka Connect framework, it’s out of our hands. We’ll need to wait for this improvement on the Kafka side.


@david.nault Thanks a ton