Kafka message doesn't push to Couchbase via Sink connector


We have setup Couchbase Sink connector for Kafka. It is running fine. I am pushing messages to kafka topic, but that doesn’t get pushed to Couchbase further. What could be wrong?
I am able to see Kafka messages received fine as well.

Hi @jongladwin,

It would be good to confirm that the connector is subscribing to the correct topic(s). Check the topics property in the connector config.

Just guessing, but another possible issue is that the messages in the Kafka topic might be in a format different from the format the connector is configured to receive. For example, the default “quickstart” configuration expects messages to be in JSON format. If you’re sending Avro messages, you’ll need to modify the config to use the Avro converter. If this is the problem, I would expect to see errors in the connector log.

Beyond that, I’d need more information to help troubleshoot. Are you able to share the log output and config files (scrub the passwords, of course!) from the sink connector?

What format are your Kafka messages in (JSON, Avro, something else)?

What versions of Couchbase, Kafka, and the connector are you using? Couchbase 5 and later use Role-Based Access Control (RBAC), and require the connector to be configured with credentials for an account that has write access to the bucket.

Did you see this problem when working through the sink connector example from the Quickstart Guide, or are you building your own project?


Hi david,
Yes, it looks like message format issue. Topic is perfect. I got the following error in log:

tasks": [
“state”: “FAILED”,
“trace”: “org.apache.kafka.connect.errors.DataException: mytopic\n\tat io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:95)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:468)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:301)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:205)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:173)\n\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)\n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)\n\tat java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)\n\tat java.util.concurrent.FutureTask.run(Unknown Source)\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)\n\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)\n\tat java.lang.Thread.run(Unknown Source)\nCaused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id -1\nCaused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!\n”,

  "id": 0,
  "worker_id": “xx.xx.xx.xxx:8083"


I am using simple JSON and sending message to Kafka topic, not Avro or anything. I think, that’s not supported by Couch base. Is there way to change Couch base connector to receive simple message?

Kafka Producer code:

public void doPost(HttpServletRequest request, HttpServletResponse response)
throws IOException, ServletException {

 String data = "";   
  StringBuilder builder = new StringBuilder();
  BufferedReader reader = request.getReader();
  String line;
  while ((line = reader.readLine()) != null) {
  data = builder.toString();
  try {
  	JSONObject jsonobject = new JSONObject(data);
  	Properties properties = new Properties();
  	properties.put("bootstrap.servers", “xxxxxxxxx.xxxxx.com:9092");
  	properties.put("client.id", CLIENT_ID);
  	properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
  	properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
  	MyUtilKafkaProducer<String, String> producer = new MyUtilKafkaProducer <>(properties);
  	producer.send(new ProducerRecord<String,String>(topic, jsonobject.toString()));

  } catch (JSONException e) {
  	// TODO Auto-generated catch block


Kafka Consumer code:

try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(1000);

			  for (ConsumerRecord<String, String> record : records) {
				  // To Retrieve Data in this Consumer
				  System.out.println("Received message: " + record.value() );            
	} finally {

Kafka Connector Configuration:

“name”: “xxxxxxx-couchbase-sink-connector",
“config”: {
“connector.class”: “com.couchbase.connect.kafka.CouchbaseSinkConnector”,
“tasks.max”: “20”,
“connection.cluster_address” : “xxxxxxxxxxxxxx:8091",
“connection.bucket” : “xxxxxxx”,
“connection.username”: “xxxxxxxx”,
“connection.password” : “xxxxxxx”,
“connection.timeout.ms” : “2000”,
“topics”: “xxxxxxxxxx”,
“name”: “xxxxxxx-couchbase-sink-connector”,
“key.converter”: “org.apache.kafka.connect.json.JsonConverter”,
“value.converter”: “org.apache.kafka.connect.json.JsonConverter”,
“value.converter.schemas.enable”: “true”,
“use_snapshots” : false

Hi @jongladwin,

Thank you for the details. I think you’re right, this looks like a converter configuration issue.

I do not understand why the exception message says the connector is using the Avro converter, but the Kafka Connector Configuration you posted says to use the JsonConverter. Could it be an issue with the configuration not being applied successfully (or perhaps the error message is from an earlier run that used different configuration?)

org.apache.kafka.connect.json.JsonConverter is the correct value.converter to use for receiving simple JSON messages. So that looks good.

Since the producer is not sending schemas, the value.converter.schemas.enable configuration property should be set to false. (I’d recommend setting key.converter.schemas.enable to false too, but maybe it doesn’t matter since you’re not setting keys when publishing).


Solved after setting schema false. thanks.

1 Like