State serialization in Couchbase Kafka Connector v2

I am using the Couchbase Kafka Connector to stream bucket changes to a Kafka topic, and I was testing the case where Kafka is down and the changes cannot be written to it.

First, I was surprised not to see many error logs when this happens, so I was not sure whether the writes were actually failing. I confirmed it after restarting Kafka and looking at the messages in the topic: the latest DCP messages were missing.

Then, it looks like the vBucket state is updated in Zookeeper even when the write to Kafka fails, so restarting the connector from its last known state will not pick up the missed changes?

Am I missing something?
What is the intended usage in this case? Are we expected to replay the DCP stream from the beginning of time? In that case, I guess that to avoid duplicates we will have to either purge the current topic or stream the changes to another topic.

Any help and guidance will be appreciated.
Thanks.

@avsej - this looks like a question for you.

@lbertrand what versions are you using? (Couchbase Server, Kafka, and Couchbase Kafka Connector)

@WillGardella Here is the information requested:

  • Couchbase Server 4.5.0
  • Kafka Server 0.10.0.1
  • Couchbase Kafka Connector 2.0.1

Thanks.

At the moment, the connector does not support duplicate avoidance or message buffering, either on failure or for DCP snapshots. It just relays each DCP message to Kafka and then drops it. State is serialized in parallel, which is why you see it updated in Zookeeper.
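To illustrate why the saved state can run ahead of what actually reached Kafka, here is a minimal, self-contained simulation. It is only a sketch, not the connector's code: `FlakySink`, `StateStore`, `relay_parallel`, and `relay_ack_gated` are hypothetical names, the sink stands in for a Kafka producer, and the store stands in for the state kept in Zookeeper.

```python
from dataclasses import dataclass, field


@dataclass
class FlakySink:
    """Hypothetical stand-in for a Kafka producer; `up` simulates an outage."""
    up: bool = True
    written: list = field(default_factory=list)

    def send(self, vbucket, seqno):
        # Returns True only when the write is acknowledged.
        if not self.up:
            return False
        self.written.append((vbucket, seqno))
        return True


@dataclass
class StateStore:
    """Hypothetical stand-in for the per-vBucket state kept in Zookeeper."""
    seqnos: dict = field(default_factory=dict)

    def save(self, vbucket, seqno):
        self.seqnos[vbucket] = seqno


def relay_parallel(events, sink, state):
    """State advances whether or not the write succeeded (the behaviour described above)."""
    for vbucket, seqno in events:
        sink.send(vbucket, seqno)      # result is ignored
        state.save(vbucket, seqno)


def relay_ack_gated(events, sink, state):
    """Alternative: stop at the first unacknowledged write so state never passes it."""
    for vbucket, seqno in events:
        if not sink.send(vbucket, seqno):
            return False
        state.save(vbucket, seqno)
    return True
```

With the sink down, `relay_parallel` leaves `sink.written` empty while `state.seqnos` already points past the lost events, which matches what was observed in Zookeeper. `relay_ack_gated` leaves the state untouched, so a restart would resume from the first event that was not delivered.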

In the upcoming Kafka connector update, state is tied to offsets in Kafka, so it will be easier to resume. But as of the DP2 release of version 3 of the connector, the snapshot buffering mode is not implemented yet, so it still might cause duplicates on retry.
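If duplicates on retry are a concern, one possible mitigation is to filter them on the consumer side. The sketch below relies on DCP sequence numbers increasing within each vBucket. It assumes every message exposes `vbucket` and `seqno` fields, which depends on how messages are encoded into Kafka, and it ignores failover and rollback, where sequence numbers can move backwards. `dedupe_by_seqno` is a hypothetical helper, not part of the connector.

```python
def dedupe_by_seqno(messages, high_seqnos=None):
    """Yield only messages whose seqno exceeds the highest seen for their vBucket.

    `messages` is any iterable of dicts with hypothetical "vbucket" and "seqno" keys.
    `high_seqnos` can be passed in to carry the high-water marks across calls.
    """
    high = {} if high_seqnos is None else high_seqnos
    for msg in messages:
        vbucket, seqno = msg["vbucket"], msg["seqno"]
        if seqno <= high.get(vbucket, 0):
            continue                   # already seen: a replayed duplicate
        high[vbucket] = seqno
        yield msg
```

Passing the same `high_seqnos` dict on every call (and persisting it alongside the consumer's own offsets) would let the filter survive consumer restarts.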

@avsej Thanks for the update…
So I guess that until version 3 of the Couchbase Kafka Connector handles buffering and duplicate detection, with version 2 we will have to mitigate this issue when it happens by, for example, replaying the full DCP stream into a new topic to make sure we capture the missed changes.

Or can you think of another option to handle the use case?