Kafka Couchbase connector putting 2 events in Kafka for 1 change

Hi Team,

The Kafka Couchbase connector is pushing two events for each document change.

What could be the issue? Could it be that the DCP client is delivering the data twice?

This is very urgent, as it is putting millions of events into Kafka, each of which then has to be processed multiple times.

CC:- @househippo @avsej

Regards
Pankaj Sharma

Hi Pankaj,

Is it safe to assume you’re running the connector in distributed mode? I wonder if the messages are being sent twice because the workers don’t know they’re part of the same cluster. Can you check that the group.id worker config property is the same for all workers?
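
For reference, in distributed mode every worker that should join the same Connect cluster needs an identical group.id in its worker properties; the value itself is arbitrary, something like:

# Must be the same on every worker belonging to this Connect cluster
group.id=connect-cluster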

If that’s not the problem, it might be useful to see your worker and connector configurations (with passwords and other sensitive data removed, of course).

Thanks,
David

No, I am not running it in distributed mode. I have only one pod of the connector running, and that pod runs in standalone mode.

I will send the configs ASAP, or somebody from the team will send them with the passwords removed.

Thanks. Would also be good to know the versions of Couchbase, Kafka Connector, and Kafka.

Couchbase Server 5.1.1, Kafka Connector 3.3.0 (the one where we fixed the restart of the Kafka connector after a restart of Couchbase Server).
Kafka version: 1.1.0

The configuration used is as follows:

connect-standalone.properties:
bootstrap.servers=kafka-0.broker.kafka.svc.cluster.local:9092,kafka-1.broker.kafka.svc.cluster.local:9092,kafka-2.broker.kafka.svc.cluster.local:9092
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
offset.storage.file.filename=/tmp/connect.offsets
rest.port=8085

offset.flush.interval.ms=10000
max.request.size=15048576

quickstart-couchbase.properties:
name=test-couch4
connector.class=com.couchbase.connect.kafka.CouchbaseSourceConnector
tasks.max=2
connection.cluster_address=192.192.1.41
connection.bucket=bucketname
connection.password=********
connection.username=bucketname
connection.timeout.ms=20000
couchbase.stream_from=SAVED_OFFSET_OR_NOW
topic.name=couchbase.app-data
use_snapshots=false
max.request.size=15048576

I hope this will help in finding out if there is a misconfiguration.

@david.nault Did you get a chance to look into this? Otherwise I will have to do a customisation: keep an in-memory hash of all document IDs and check each event against it before posting to Kafka.
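
Roughly what I have in mind (just a sketch; the class name is a placeholder, and it assumes each DCP event carries the document ID and a CAS value I can compare):

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Placeholder dedup filter: remembers the last CAS value seen per document
// ID and reports an event as a duplicate if the CAS has not changed.
public class DuplicateEventFilter {

    private final Map<String, Long> lastSeenCas = new ConcurrentHashMap<>();

    // Returns true if this exact (documentId, cas) pair was already seen,
    // in which case the event would be dropped instead of sent to Kafka.
    public boolean isDuplicate(String documentId, long cas) {
        Long previous = lastSeenCas.put(documentId, cas);
        return previous != null && previous == cas;
    }
}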

Hi @pankaj.sharma,

The configuration looks okay. The behavior you’re seeing is mysterious.

The “obvious” cause would be if a second copy of the connector is somehow getting launched. You could check the number of DCP connections by going to the bucket statistics, expanding “DCP Queues”, and viewing the “Other” DCP Connections graph.

The connector is expected to open tasks.max (2, in this case) connections per Couchbase node. You might want to take a baseline measurement without the Kafka connector running, then measure again after it’s running. If the number jumps by more than 2 per Couchbase node, then it’s likely there’s more than one copy of the connector running.

Another cause would be if the application talking to Couchbase is modifying the documents redundantly. A DCP mutation event is fired even if a document has identical contents before and after an update operation. I would try seeing how many Kafka messages are published when a document is modified via the web console UI.
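
For example, with the Couchbase Java SDK (a hypothetical 2.x snippet; the document ID and content are made up), both of these upserts fire a DCP mutation event even though the second one leaves the content unchanged:

import com.couchbase.client.java.Bucket;
import com.couchbase.client.java.CouchbaseCluster;
import com.couchbase.client.java.document.JsonDocument;
import com.couchbase.client.java.document.json.JsonObject;

public class RedundantUpsertExample {
    public static void main(String[] args) {
        // Hypothetical cluster address, bucket, and credentials.
        CouchbaseCluster cluster = CouchbaseCluster.create("192.192.1.41");
        Bucket bucket = cluster.openBucket("bucketname", "password");

        JsonObject content = JsonObject.create().put("status", "active");

        // Each upsert fires a DCP mutation event, even though the
        // document content is identical both times.
        bucket.upsert(JsonDocument.create("user::123", content));
        bucket.upsert(JsonDocument.create("user::123", content));

        cluster.disconnect();
    }
}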

The connector logs might also have some useful information. Grepping for “INFO Poll returns” should show how many messages are published in each batch. The expected value here would be 1 for each Couchbase mutation or deletion.

If you want to enable debug logging, there should be a connect-log4j.properties file somewhere under your Kafka installation. Set the root log level to DEBUG and restart the connector. Now you’ll see messages containing “About to send” which show how many messages the Kafka Connect framework is publishing.
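
In the connect-log4j.properties that ships with Kafka, that should just be a change to the root logger line, something like:

# was: log4j.rootLogger=INFO, stdout
log4j.rootLogger=DEBUG, stdout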

If all else fails, plugging in a custom SourceHandler like you described would indeed give you complete control over the message flow. But I wouldn’t recommend that as a long-term solution … there’s got to be something else going wrong.

Please check for the possibility of duplicate connector instances, and also see if you can reproduce the problem by editing a document in the Web UI. If that doesn’t get us any closer to a solution then we’ll need to dig deeper.

Thanks,
David

David, the connector is running in Kubernetes, and there is no replica of the pod. I can provide whatever you need to debug this, but it is reproducible for almost all documents in our cluster.

Hi @pankaj.sharma,

What were the results of the suggested actions?

  • Can you reproduce the problem by creating or editing a document in the Couchbase web console?

  • How many DCP connections are shown in the Couchbase web console?

Given the urgency of the situation, if you are an Enterprise Edition customer I would recommend filing a support ticket at this point so we can get some engineers from our support team involved. (I don’t have any Kubernetes experience, and it would take me a while to ramp up to the point where I could reproduce your environment.)

Thanks,
David

We are creating our own Kafka connector using DCP and Spring Kafka Integration, so that any customisation required can be done. We would love to get some tips from you.