[ANN] Kafka Connector 3.1.2


#1

Hi Everyone

I’m glad to announce a new release of the Couchbase Kafka Connector. This is a maintenance release; it fixes a possible OOM when loading a huge bucket whose internal queue cannot be drained fast enough (KAFKAC-66).

Package: http://packages.couchbase.com/clients/kafka/3.1.2/kafka-connect-couchbase-3.1.2.zip
Source code: https://github.com/couchbase/kafka-connect-couchbase#readme
Issue Tracker: https://issues.couchbase.com/projects/KAFKAC/

It is also available in Maven, in case you want to extend or integrate with the connector.
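For reference, the Maven dependency looks roughly like this (the coordinates below are my best guess; check Maven Central for the authoritative groupId and artifactId):

```xml
<dependency>
  <groupId>com.couchbase.client</groupId>
  <artifactId>kafka-connect-couchbase</artifactId>
  <version>3.1.2</version>
</dependency>
```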


#2

Hi @avsej.
I have installed Kafka Connector 3.0 to connect my Couchbase with Kafka.
I have followed the steps mentioned in https://developer.couchbase.com/documentation/server/4.6/connectors/kafka-3.0/quickstart.html
I can see my data from the console consumer in Kafka, but the payload.content field appears to be in encrypted form.
I need help to decrypt my data, since I have to show the data in Kibana using Logstash.
Here is the JSON I am getting:

{  
   "schema":{  
      "type":"struct",
      "fields":[  
         {  
            "type":"string",
            "optional":false,
            "field":"event"
         },
         {  
            "type":"int16",
            "optional":false,
            "field":"partition"
         },
         {  
            "type":"string",
            "optional":false,
            "field":"key"
         },
         {  
            "type":"int64",
            "optional":false,
            "field":"cas"
         },
         {  
            "type":"int64",
            "optional":false,
            "field":"bySeqno"
         },
         {  
            "type":"int64",
            "optional":false,
            "field":"revSeqno"
         },
         {  
            "type":"int32",
            "optional":true,
            "field":"expiration"
         },
         {  
            "type":"int32",
            "optional":true,
            "field":"flags"
         },
         {  
            "type":"int32",
            "optional":true,
            "field":"lockTime"
         },
         {  
            "type":"bytes",
            "optional":true,
            "field":"content"
         }
      ],
      "optional":false,
      "name":"com.couchbase.DcpMessage"
   },
   "payload":{  
      "event":"mutation",
      "partition":46,
      "key":"Logs::d42f91ce-e0ec-444f-bb20-09398f15efe4",
      "cas":1501735721952542720,
      "bySeqno":12,
      "revSeqno":1,
      "expiration":0,
      "flags":0,
      "lockTime":0,
      "content":"eyJfc3luYyI6eyJyZXYiOiIxLWEwMzQ4NjlhNzYyYzAyZWYxMzJiZjhhMDFhM2IxOTUwIiwic2VxdWVuY2UiOjEyNDkxLCJyZWNlbnRfc2VxdWVuY2VzIjpbMTI0OTFdLCJoaXN0b3J5Ijp7InJldnMiOlsiMS1hMDM0ODY5YTc2MmMwMmVmMTMyYmY4YTAxYTNiMTk1MCJdLCJwYXJlbnRzIjpbLTFdLCJjaGFubmVscyI6W251bGxdfSwidGltZV9zYXZlZCI6IjIwMTctMDgtMDNUMDQ6NDg6MjYuNjM4NzAxOTU5WiJ9LCJjb21wb25lbnROYW1lIjoiVmlyZ2luIFZveWFnZXMiLCJjb21wb25lbnRWZXJzaW9uIjoiMS4yLjUiLCJjb3JlbGF0aW9uSWQiOiIxNTAxNzM1NTMyODk1IiwiZGV2aWNlaW5mbyI6eyJpZCI6IjVmNWJhNThkNjVmMzRiNDUiLCJuYW1lIjoibW90b3JvbGEgWFQxMDY4Iiwib3BlcmF0aW5nU3lzdGVtIjoiQW5kcm9pZCIsIm9zVmVyc2lvbiI6IjUuMC4yIiwidHlwZSI6IlBob25lIn0sImhvc3RuYW1lIjoibW90b3JvbGEgWFQxMDY4IiwibWVzc2FnZSI6bnVsbCwibWVzc2FnZUNvZGUiOiJJTkpFQ1QiLCJtZXNzYWdlRGV0YWlsIjoie1wiaW5mb3JtYXRpb25UeXBlXCI6XCJiZWFjb25EZXRhaWxzXCIsXCJhY3Rpb25EYXRhXCI6W3tcImJlYWNvbl9pZFwiOlwiYjk0MDdmMzAtZjVmOC00NjZlLWFmZjktMjU1NTZiNTdmZTZkOjE0NDQ2OjYyNjc3XCIsXCJkaXN0YW5jZVwiOjMuMDYzOTI4NjcyNTY0MTU1N30se1wiYmVhY29uX2lkXCI6XCJiOTQwN2YzMC1mNWY4LTQ2NmUtYWZmOS0yNTU1NmI1N2ZlNmQ6MTk3NTU6NTg5OTdcIixcImRpc3RhbmNlXCI6Mi4zNDE5MzU3MDM2MzE3MjE3fSx7XCJiZWFjb25faWRcIjpcImI5NDA3ZjMwLWY1ZjgtNDY2ZS1hZmY5LTI1NTU2YjU3ZmU2ZDoyMDA2Mzo0NTgxNFwiLFwiZGlzdGFuY2VcIjoyNi41MDQwMDkyMDE4OTgwOTZ9LHtcImJlYWNvbl9pZFwiOlwiYjk0MDdmMzAtZjVmOC00NjZlLWFmZjktMjU1NTZiNTdmZTZkOjI4MTM6MTM5OTVcIixcImRpc3RhbmNlXCI6Ny43MzY3Mzg5ODExMTY0Mzh9LHtcImJlYWNvbl9pZFwiOlwiYjk0MDdmMzAtZjVmOC00NjZlLWFmZjktMjU1NTZiNTdmZTZkOjMzNjMxOjU2NjUyXCIsXCJkaXN0YW5jZVwiOjMuNjUxMjc5OTkyNzg3Nzk1fSx7XCJiZWFjb25faWRcIjpcImI5NDA3ZjMwLWY1ZjgtNDY2ZS1hZmY5LTI1NTU2YjU3ZmU2ZDo0MjQyMzozNzE0NVwiLFwiZGlzdGFuY2VcIjoxLjk1MDgzMDY5OTMyOTkwNTJ9LHtcImJlYWNvbl9pZFwiOlwiYjk0MDdmMzAtZjVmOC00NjZlLWFmZjktMjU1NTZiNTdmZTZkOjUxNzcxOjY0ODk4XCIsXCJkaXN0YW5jZVwiOjIuMzQxOTM1NzAzNjMxNzIxN30se1wiYmVhY29uX2lkXCI6XCJiOTQwN2YzMC1mNWY4LTQ2NmUtYWZmOS0yNTU1NmI1N2ZlNmQ6NTc0NzM6MTE1NzFcIixcImRpc3RhbmNlXCI6MTcuMzQ3ODg0NzY2MDM4Mzl9LHtcImJlYWNvbl9pZFwiOlwiYjk0MDdmMzAtZjVmOC00NjZlLWFmZjktMjU1NTZiNTdmZTZkOjYwMzgzOjQ4NDgxXCIsXCJkaXN0YW5jZVwiOjcuMTM5MzExMjg
1MDYxMjY0Nn0se1wiYmVhY29uX2lkXCI6XCJiOTQwN2YzMC1mNWY4LTQ2NmUtYWZmOS0yNTU1NmI1N2ZlNmQ6Njk0OToxNDQ4MFwiLFwiZGlzdGFuY2VcIjozLjM0NjAyMDI4ODQyNjIzMn1dfSIsIm9mZnNldCI6IiswNTozMCIsInNldmVyaXR5IjoiRGVidWciLCJzdGFja3RyYWNlIjpudWxsLCJ0aW1lc3RhbXAiOiIyMDE3LTA4LTAzVDEwOjE1OjMyLjg5NSIsInRyYWNlaW5mbyI6eyJkdXJhdGlvbiI6IjAiLCJsb2dnaW5nUG9pbnQiOm51bGwsIm1ldGhvZFRpbWUiOiIyMDE3LTA4LTAzVDEwOjE1OjMyLjg5NSJ9LCJ0eXBlIjoibG9nIiwidXNlcmluZm8iOnsiYXBwSWQiOm51bGwsImRldmljZUlkIjpudWxsLCJpZCI6bnVsbCwidG9rZW4iOm51bGwsInR5cGUiOm51bGx9fQ=="
   }
}

I need help decrypting the content field in it.


#3

The connector does not encrypt your data. The default converter passes the document body to Kafka as a byte sequence, and the JSON converter is configured to represent bytes using Base64, because raw bytes might otherwise break the JSON syntax.

You have several options here.

  1. Implement a custom converter for the Couchbase connector and, instead of writing the body as bytes, write it in the structured form your application needs.
  2. Keep the default Couchbase connector converter, but replace the JSON converter you are using with Kafka; for example, the Avro converter can handle byte sequences directly.
  3. On the application side, use a Base64 decoder to extract the content.
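As a minimal sketch of option 3: assuming the consumer receives the JSON envelope shown above, the content field can be decoded with any standard Base64 decoder. This Python example uses a shortened, hypothetical payload in place of the real one:

```python
import base64
import json

# A trimmed-down version of the envelope produced by the connector;
# the real message carries the full schema block and a much longer content field.
message = json.dumps({
    "payload": {
        "event": "mutation",
        "key": "Logs::d42f91ce-e0ec-444f-bb20-09398f15efe4",
        # Base64 of the original document body, as written by the JSON converter.
        "content": base64.b64encode(b'{"type":"log","severity":"Debug"}').decode("ascii"),
    }
})

envelope = json.loads(message)
# Decode the Base64-encoded bytes back into the original JSON document.
document = json.loads(base64.b64decode(envelope["payload"]["content"]))
print(document["severity"])  # the original document fields are accessible again
```

The same two steps (Base64-decode, then JSON-parse) apply in any language your consumer or Logstash pipeline is written in.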

#4

@avsej I think I can use the second option. But how can I replace the JSON converter with the Avro converter?


#5

@avsej Here is my configuration for connect-standalone.properties.

bootstrap.servers=localhost:9092
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000

#6

Apache Kafka does not ship an Avro converter, but one is available in the Confluent distribution, and it requires their Schema Registry:

key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081

#7

You can also try to override how the default JsonConverter serializes the BYTES type, but that means maintaining your own converter subclass.


#8

@avsej I have already installed Confluent and the Schema Registry. So can I use those properties directly?


#9

Yes, but you should check that the Schema Registry address is correct, unless it is actually listening on http://localhost:8081.


#10

Hi @avsej
I have installed my kafka-connector without the Confluent Platform and Schema Registry.
Is there any way to decode the content field when the kafka-connect-couchbase connector is installed with Apache Kafka only?
I want the content field to be displayed as plain text.


#11

Thanks @avsej,
I have changed the code in the connector. Apart from what you suggested, one also needs to change the schema. If you like, I can send you the code for review, in case it is helpful for the community.

Regards
Pankaj Sharma