Kafka-couchbase-connect sink couchbase.remove.document.id=true is not working

This issue is secnario based issue, it’s appropriated if anyone solve this.

I have Apache kafka_2.13, couchbase server and apache kafka-cocuhbase-connect-4.0 running in my windows machine. The source-connector reads the data from couchbase bucket-1(sourceBucket) and publish in the kafka Topic. The sink-connector reads the data from the kafka Topic and insert the data in the couchbase bucket-2(sinkBucket).

Presently, any update in bucket-1(sourceBucket) is updating the bucket-2(sinkBucket).

My requirement is: The mutation of any document in the bucket-1(sourceBucket) need to store as new Document in the bucket-2(sinkBucket).

Ex:: sourceBucket having 3 Documents Document-1, Document-D2, Document-D3. And in the sinkBucket also 3 Documents Document-1, Document-D2, Document-D3 are present.

Presently:: If we update the Document-1 for first time in the sourceBucket , then Document-1 in sinkBucket also updating. If we update the Document-1 for second time in the sourceBucket , then Document-1 in sinkBucket also updating.

My requirement is: If we update the Document-1 for first time in the sourceBucket , then new Document-4 need to insert in sinkBucket. If we update the Document-1 for second time in the sourceBucket , then new Document-5 need to insert in sinkBucket.

Finally, sourceBucket need to have 3 Documents and sinkBucket need to have 5 Documents. But presently sourceBucket need to have 3 Documents and sinkBucket need to have 3 Documents.

========================= I have tried to remove the existing document id by couchbase.remove.document.id = true in quickstart-cocuhbase-sink.properties file. Then in the sinkBucket , it will treat always sourceBucket Document as new Document in the sinkBucket.
But couchbase.remove.document.id = true is not working.

Thanks…

@david.nault can you please assist ?

I have tried to add couchbase.remove.document.id = true in the quickstart-cocuhbase-sink.properties file.
But couchbase.remove.document.id = true is not working.

I can’t find other solution.
Please assist.

Hi Raj,

It sounds like you want the sink connector to create a new document with a unique ID every time it receives a message. Is that correct?

The closest you could get would be to write a Single Message Transform to remove the key from the message. When the sink connector receives a message without a key, it uses a default document ID of the form topic/partition/offset. This would generate a unique document in Couchbase for each Kafka message.

Thanks,
David

Hi David,
I have tried the Single Message Transofrom but still it’s not working.

I have added below lines.

transforms= MakeMap,InsertSource
transforms.MakeMap.type=org.apache.kafka.connect.transforms.HoistField$Value
transforms.MakeMap.field=line
transforms.InsertSource.type=org.apache.kafka.connect.transforms.InsertField$Value
transforms.InsertSource.static.field=id
transforms.InsertSource.static.value=05Aug-Jul2020

Here it is creating new Field with value “05Aug-Jul2020” in the same Document in the Bucket but not creating the new Document in the Bucket.

it’s mentioned in the docs.couchbase.com offical website, we you can write Java code to implement more complex logic. - in the " Modify Documents Before Writing to Couchbase" section.
https://docs.couchbase.com/kafka-connector/current/quickstart.html

My doubt is:
I will try to implement logic by extends ConnectRecord> implements Transformation class and interface.
where to place in the custom-java.class file and how to call this in the quickstart-couchbase-sink.properties…
Can you help me in this.

Hi David,

I have created CustomRemoveDocumentID implements Transformation class and interface.
And places the jar in the $KAFKA-COUCHBASE-CONNECT-HOME.
Added the configuration in the properties file. It’s working fine. I can able to create new Document ID in SinkBucket for every mutation (everytime update in the Document).

Thanks for your response.

Thanks,

Hi Raj,

I’m glad you were able to engineer a solution. Congrats!

Thanks,
David

Hi David,

I am having facing one more issue.
Normally, two separate buckets will be using for kafka-couchbase-source-connector and kafka-couchbase-sink -connector.
Requirement: Use one bucket for both kafka-couchbase- source-connector and kafka-couchbase-sink-connector. I tried to use one bucket for both. This result in creating N-number of same Documents in the Bucket for only single Document update.
As per my assumption, kafka-couchbase-Source-connector and kafka-couchbase-Sink -connector require two separate buckets. This way it’s working fine. But the requirement is use Single Bucket.

Is it possible to use same Bucket for kafka-couchbase- Source-connector and kafka-couchbase-Sink-connector.

Kindly replay for this.
Thanks,

Is it possible to use same Bucket for kafka-couchbase- Source-connector and kafka-couchbase-Sink-connector?

No, I cannot recommend that. You’ll end up in with an out-of-control loop where documents written by the sink connector are fed back into the system by the source connector, without end.

Thanks,
David

Hi David,
For the Filter story, I have written CustomFilter class and it is working for single Document type.
Here , I need to filter 15 different type of Documents. How I can pass the multiple “Filter_type” Fields from quickstart-couchbase-source.properties or .json file.
I tried to calling the quickstart-couchbase-source.properties file using FileReader and Properties class, it is working fine. But I need to change the location of the file in java class for DEV, PROD. This is not a suggestible.
can I avoid this and pass using Key=value in source.properties or .json file. Where I can pass Filter_type array , like connection.cluster_address=.
Please help me in this.

Hi Raj,

I’ve filed an enhancement request KAFKAC-220 that would allow custom components like Filters and SourceHandlers to read properties from the connector config.

I’m working on it this afternoon. If all goes well, it should be ready for the August 18th release of version 4.0.0.

Thanks,
David

Hi David,

I am having facing issue in bulk insert.

i am trying to insert 50 records from java into source bucket. All records are inserted in the source buckets but in the sink bucket 23 records are missing.
I have seen this issue with single task, 2 tasks also.

can you suggest any solution.

Thanks,
Raj

Hi Raj,

  • What version of Kafka?
  • What version of Couchbase?
  • What version of the Couchbase Kafka connector?
  • Are all 50 document IDs unique?
  • Have you looked for clues in the Kafka Connect log?
  • If you restart the connector, do the missing documents appear in the sink?

Thanks,
David

Hi David,

In my project I have created a CustomRemoveDocumentID implements Transformation class and interface.
By using this, I can able to push Source-bucket data to sink-Bucket database. This is working in single insert.
When I am doing bulk insert. The 50% data records is missing.

I had done analysis of the issue., I found the issue. When I doing bulk insert of 10 or 50 or 100 or 500 records. All records are pushed to Kafka Topic. From Kafka topic each record sending to kafka-sink-connector. In the sink connector, I have implemented the CustomRemoveDocumentID class to remove DocumentID and replace with new DocumentID. This class is not able to hold the all the Messages coming from Kafka Topic.
CustomRemoveDocumentID class processing one message after the message is processed which ever message is present, it processing. Here data leakage is happening.
I have implemented this in multi-threaded mode also , but can’t hold all messages.

This is not working in confluent Kafka, but working in Apache Kafka. In my project, confluent Kafka is using.
I have given the software details using as below:

Softwares used in office system:: Bulk Insert failed

  • What version of Kafka? - Confluent Kafka 5.5.1
  • What version of Couchbase? -couchbase 6.5
  • What version of the Couchbase Kafka connector? 3.4.8
  • Are all 50 document IDs unique? Yes, all have unique document IDs
  • Have you looked for clues in the Kafka Connect log? in the logs, i didn’t find anything.
  • If you restart the connector, do the missing documents appear in the sink? I didn’t restarted. the connector.

Softwares used in Personal system::Bulk Insert working fine.

Same bulk update apache kafka, with couchbaseinc-kafka-connect-couchbase-4.0.0-dp.3 .
In all secnarios, it is working.

  • What version of Kafka? -Apache kafka_2.13-2.5.0
  • What version of Couchbase? - couchbase 6.5
  • What version of the Couchbase Kafka connector? couchbaseinc-kafka-connect-couchbase-4.0.0-dp.3
  • Are all 50 document IDs unique? Yes, all have unique document IDs

Can you kindly help me in this.

Hi Raj,

Can you explain in more detail what the CustomRemoveDocumentID class is doing? I don’t understand why a Single Message Transform would want to “hold” more than one message.

Thanks,
David