Update & Delete are not working with Kafka as desired

Hello,

Requesting your assistance with the below:
Source: Oracle database
Target: Couchbase
Replication of data using Kafka…

  1. When I do an insert, it works fine.
  2. When I update a record on the source, a new entry is created on the target (Couchbase) instead of the existing document being updated.
  3. When I delete a record from the table on the source (Oracle), the record is not deleted on the target (Couchbase).

Issue: Insert works fine, but update and delete are not working. What should I do to fix this? What am I missing here?

My Source Connector:

name=testusersUTC
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=3
connection.password=********
connection.url=jdbc:oracle:thin:@hostname:1521/ORCL
connection.user=kfkuser
table.whitelist=USERS1
mode=timestamp+incrementing
incrementing.column.name=ID
timestamp.column.name=MODIFIED
topic.prefix=ANUTC
plugin.path=/u01/kafka/confluent-5.5.1/share
table.poll.interval.ms=60
poll.interval.ms=50
connection.backoff.ms=10
output.data.format=JSON

My Sink Connector:

name=test-couchbase-sink
connector.class=com.couchbase.connect.kafka.CouchbaseSinkConnector
tasks.max=2
topics=ANUTCUSERS1
connection.cluster_address=127.0.0.1
couchbase.bootstrap.timeout=10s
connection.bucket=BucketPRD
connection.username=Administrator
connection.password=************
couchbase.persist.to=NONE
couchbase.replicate.to=NONE
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
value.converter.schemas.enable=false
key.converter.schemas.enable=false

Hi Asif, welcome to the forum!

If a Kafka message doesn’t have a key, the Couchbase sink connector uses a synthetic key consisting of the Kafka topic, partition, and offset; every Kafka message will generate a unique document in Couchbase. That’s probably why updates aren’t working – you need to assign a key to the Kafka records so Couchbase knows which document to update/delete.

By default, the JDBC source connector does not assign a key to the published Kafka records. You can assign the key using Single Message Transforms (SMTs). Or you can tell the Couchbase sink to use a field of the message as the document ID.
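
For example, here's a minimal sketch of the two stock Kafka Connect transforms you'd add to the source connector properties (the field name below is a placeholder; use your table's key column):

# Copy the key column from the record value into the record key,
# then unwrap it so the key is a primitive value.
transforms=createKey,extractKey
transforms.createKey.type=org.apache.kafka.connect.transforms.ValueToKey
transforms.createKey.fields=yourKeyColumn
transforms.extractKey.type=org.apache.kafka.connect.transforms.ExtractField$Key
transforms.extractKey.field=yourKeyColumn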

From what little I’ve read, it seems like the JDBC source connector has no way of knowing when you delete a row. If you need to propagate deletions to Kafka, you can use a “soft delete” instead of actually deleting the row. (Alternatively, you could research whether the Debezium Connector for Oracle meets your requirements.)

The Couchbase sink will delete a document when the Kafka record has a null value. To get that to work, you can use an SMT to transform the output of the JDBC connector so the Kafka record's key is the ID of the document to delete, and the Kafka record's value is null. (For example, a record with key 42 and a null value tells the sink to remove the document whose ID is 42.)

Thanks,
David

Thanks so much for your response.

I tried adding the content below to the file “/u01/kafka/confluent-5.5.1/etc/schema-registry/connect-avro-standalonekey.properties” and starting the source connector. I also tried adding it to my JDBC source connector properties file, but no luck.

Note: the column named “ID” is the primary key column in my Oracle database (source).

#Added the below for key testing
transforms=createKey,extractInt
transforms.createKey.type=org.apache.kafka.connect.transforms.ValueToKey
transforms.createKey.fields=id
transforms.extractInt.type=org.apache.kafka.connect.transforms.ExtractField$Key
transforms.extractInt.field=id

Queries:

  1. Can you please tell me what I am missing here? Is there anything else to be done apart from this?
  2. I still don't see update and delete working on the target (Couchbase). Instead of the existing document being updated, a new document is created and the old document remains there unchanged. It would help if at least update worked on the target.
  3. Do I need to add these values in the sink connector as well?

Requesting your response. Thanks again for your help and time.

Regards.

The column named “ID” is the primary key column in my Oracle database (source).

Maybe it’s a case sensitivity issue. Should it be ID instead of id in your transform definition?
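
In other words, something like this in the source connector properties (assuming the Avro field name is ID, matching your table's column):

transforms=createKey,extractInt
transforms.createKey.type=org.apache.kafka.connect.transforms.ValueToKey
transforms.createKey.fields=ID
transforms.extractInt.type=org.apache.kafka.connect.transforms.ExtractField$Key
transforms.extractInt.field=ID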

If you have trouble assigning the message key using the transforms, you can have the sink assign the document ID instead using the couchbase.document.id sink config property. For more details, take another look at the “use a field of the message as the document ID” link in my previous post.
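
As a sketch, that would look something like this in the sink connector properties (the ${/ID} placeholder references a field of the message by path; adjust it to match your field name):

couchbase.document.id=${/ID}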

  1. Do I need to add these values in the sink connector as well?

No, defining the transform in the source config should be sufficient. The only reason to modify the sink config would be if you want to define the couchbase.document.id property.

Thanks,
David

Thanks for your prompt response.

I tried changing the case from “id” to “ID” as suggested, but still no luck; the behaviour is the same.

There is no difference in the output; the command below still gives the same response:
kafka-console-consumer --bootstrap-server localhost:9092 --topic ANUTCUSERS1 --from-beginning

{"ID":"T","USERNAME":{"string":"okfnine"},"PASSWORD":{"string":"2530"},"MODIFIED":1597399208000}
{"ID":"U","USERNAME":{"string":"wholw"},"PASSWORD":{"string":"2830"},"MODIFIED":1597399741000}
{"ID":"V","USERNAME":{"string":"try"},"PASSWORD":{"string":"2930"},"MODIFIED":1597399855000}
{"ID":"U","USERNAME":{"string":"wholw"},"PASSWORD":{"string":"2830"},"MODIFIED":1597399741000}
{"ID":"V","USERNAME":{"string":"try"},"PASSWORD":{"string":"2930"},"MODIFIED":1597399855000}
{"ID":"W","USERNAME":{"string":"try456"},"PASSWORD":{"string":"2940"},"MODIFIED":1597450749000}
{"ID":"W","USERNAME":{"string":"try456"},"PASSWORD":{"string":"345"},"MODIFIED":1597450805000}

Have you tried adding a couchbase.document.id property to the sink config?

couchbase.document.id=myDocumentIdPrefix::${/ID}
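
Also, it may help to verify whether the transform is assigning keys at all; the console consumer can print the record keys alongside the values (print.key is a standard consumer formatter property):

kafka-console-consumer --bootstrap-server localhost:9092 --topic ANUTCUSERS1 --from-beginning --property print.key=true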

Thanks David for your response.

The given parameter helps achieve the update functionality in Couchbase instead of inserting a new document as before.

I am getting the document name as shown below (myDocumentIdPrefix::WQ==) and the ID as “WQ==”, but the ID is actually auto-generated in the source (Oracle database) and is a number.
Anything to be tweaked here?


Hi Asif,

WQ== looks like a Base64-encoded value, which makes me suspect the ID field is being transported as a byte array (or at least, the Avro schema thinks it's a byte array).

I guess I’d recommend investigating how your Avro schema is defined, or looking into whether the source might be generating a byte array (not a number value) for the ID field.
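
One thing that might be worth checking (an assumption on my part, not something I've verified against your setup): Oracle NUMBER columns are often mapped to Avro's decimal logical type, which is transported as bytes and shows up Base64-encoded in JSON. The Confluent JDBC source connector has a numeric.mapping option that can map such columns to plain numeric types instead; something like this in the source properties might be worth a try:

numeric.mapping=best_fit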

Thanks,
David