Kafka-couchbase-connect sink couchbase.remove.document.id=true is not working

This is a scenario-based issue; I would appreciate it if anyone could help solve it.

I have Apache Kafka (kafka_2.13), Couchbase Server, and kafka-couchbase-connect-4.0 running on my Windows machine. The source connector reads data from the Couchbase bucket bucket-1 (sourceBucket) and publishes it to a Kafka topic. The sink connector reads the data from the Kafka topic and inserts it into the Couchbase bucket bucket-2 (sinkBucket).

Presently, any update to a document in bucket-1 (sourceBucket) updates the same document in bucket-2 (sinkBucket).

My requirement is: every mutation of a document in bucket-1 (sourceBucket) needs to be stored as a new document in bucket-2 (sinkBucket).

Example: sourceBucket has 3 documents: Document-1, Document-D2, and Document-D3. sinkBucket also has the same 3 documents: Document-1, Document-D2, and Document-D3.

Presently: if we update Document-1 in sourceBucket for the first time, Document-1 in sinkBucket is also updated. If we update Document-1 in sourceBucket a second time, Document-1 in sinkBucket is updated again.

My requirement is: if we update Document-1 in sourceBucket for the first time, a new Document-4 needs to be inserted into sinkBucket. If we update Document-1 in sourceBucket a second time, a new Document-5 needs to be inserted into sinkBucket.

Finally, sourceBucket should have 3 documents and sinkBucket should have 5 documents. But presently sourceBucket has 3 documents and sinkBucket also has 3 documents.

I tried to remove the existing document ID by setting couchbase.remove.document.id = true in the quickstart-couchbase-sink.properties file, expecting that the sink would then always treat a document from sourceBucket as a new document in sinkBucket.
But couchbase.remove.document.id = true is not working.

Thanks…

@david.nault can you please assist?

I have tried adding couchbase.remove.document.id = true to the quickstart-couchbase-sink.properties file.
But couchbase.remove.document.id = true is not working.

I can’t find any other solution.
Please assist.

Hi Raj,

It sounds like you want the sink connector to create a new document with a unique ID every time it receives a message. Is that correct?

The closest you could get would be to write a Single Message Transform to remove the key from the message. When the sink connector receives a message without a key, it uses a default document ID of the form topic/partition/offset. This would generate a unique document in Couchbase for each Kafka message.

Thanks,
David

Hi David,
I have tried a Single Message Transform, but it’s still not working.

I have added the lines below.

transforms= MakeMap,InsertSource
transforms.MakeMap.type=org.apache.kafka.connect.transforms.HoistField$Value
transforms.MakeMap.field=line
transforms.InsertSource.type=org.apache.kafka.connect.transforms.InsertField$Value
transforms.InsertSource.static.field=id
transforms.InsertSource.static.value=05Aug-Jul2020

This creates a new field with the value “05Aug-Jul2020” in the same document in the bucket, but it does not create a new document in the bucket.

The official docs.couchbase.com website mentions, in the "Modify Documents Before Writing to Couchbase" section, that you can write Java code to implement more complex logic.
https://docs.couchbase.com/kafka-connector/current/quickstart.html

My question is:
I will try to implement the logic in a class declared as <R extends ConnectRecord<R>> implements Transformation<R>.
Where should I place the custom Java class file, and how do I reference it in quickstart-couchbase-sink.properties?
Can you help me with this?

Hi David,

I have created a CustomRemoveDocumentID class that implements the Transformation interface.
I placed the jar in $KAFKA-COUCHBASE-CONNECT-HOME
and added the configuration to the properties file. It’s working fine. I am able to create a new document ID in sinkBucket for every mutation (every time a document is updated).
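
For readers following along, the registration step might look roughly like the fragment below. This is only a sketch, not the configuration actually used here: the alias removeDocumentId and the package com.example are placeholders, and only the class name CustomRemoveDocumentID comes from this post. Kafka Connect loads transform classes from the directories listed in the worker's plugin.path setting, so the jar needs to sit in one of those directories (for example, next to the connector's own jars).

```properties
# quickstart-couchbase-sink.properties (sketch)
# "removeDocumentId" is an arbitrary alias chosen for this example.
# "com.example" is a hypothetical package; use the package the class was compiled in.
transforms=removeDocumentId
transforms.removeDocumentId.type=com.example.CustomRemoveDocumentID
```

Going by David's description above, if the transform returns each record with its key removed, the sink should fall back to document IDs of the form topic/partition/offset, which gives one new document per Kafka message.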

Thanks for your response.

Thanks,