Kafka-couchbase-connector Fetching metadata from broker id:0,host:9.102.20.225,port:9092

Hi there,

I’m very new to Couchbase. My goal is to extend the existing kafka-CB-connector (in Java) for our requirements, so that it connects to a Couchbase bucket and publishes the documents, one by one, to a Kafka topic. For this I’m using https://github.com/couchbase/couchbase-kafka-connector.

I’m running Couchbase 6.0 and Kafka 2.12 on my local machine.

My question: why is SampleEncoder picking up properties from somewhere and overriding mine? Because of this, the connector tries to connect to Kafka at some other IP address on port 9092, and it is not able to publish to the Kafka topic. Please help me understand this problem. How do I publish to the Kafka topic?
My example code:

import com.couchbase.kafka.CouchbaseKafkaConnector;
import com.couchbase.kafka.DefaultCouchbaseKafkaEnvironment;

public class Example {
    public static void main(String[] args) {
        // Configure the Kafka side and the Couchbase side of the connector.
        DefaultCouchbaseKafkaEnvironment.Builder builder =
            (DefaultCouchbaseKafkaEnvironment.Builder) DefaultCouchbaseKafkaEnvironment.builder()
                .kafkaFilterClass("example.SampleFilter")
                .kafkaTopic("test")
                .kafkaZookeeperAddress("127.0.0.1:2181")
                .couchbaseNodes("localhost")
                .couchbaseBucket("Devdb")
                .couchbasePassword("password-working")
                .kafkaValueSerializerClass("example.SampleEncoder")
                .couchbaseStateSerializerClass("example.NullStateSerializer")
                .dcpEnabled(true);

        // Build the connector from the environment and start streaming.
        CouchbaseKafkaConnector connector = CouchbaseKafkaConnector.create(builder.build());
        connector.run();
    }
}

ERROR:

WARN - Fetching topic metadata with correlation id 0 for topics [Set(test)] from broker [id:0,host:9.102.20.225,port:9092] failed
java.nio.channels.ClosedChannelException
at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73)
at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
at kafka.producer.SyncProducer.send(SyncProducer.scala:113)
at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58)
at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
at kafka.producer.async.DefaultEventHandler$$anonfun$handle$1.apply$mcV$sp(DefaultEventHandler.scala:67)
at kafka.utils.Utils$.swallow(Utils.scala:172)
at kafka.utils.Logging$class.swallowError(Logging.scala:106)
at kafka.utils.Utils$.swallowError(Utils.scala:45)
at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:67)
at kafka.producer.Producer.send(Producer.scala:77)
at kafka.javaapi.producer.Producer.send(Producer.scala:33)
at com.couchbase.kafka.KafkaWriter.onEvent(KafkaWriter.java:77)
at com.couchbase.kafka.KafkaWriter.onEvent(KafkaWriter.java:37)
at com.couchbase.client.deps.com.lmax.disruptor.BatchEventProcessor.run(BatchEventProcessor.java:129)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at com.couchbase.client.deps.io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:137)
at java.lang.Thread.run(Thread.java:748)
INFO - Disconnecting from 9.102.20.225:9092
ERROR - fetching topic metadata for topics [Set(test)] from broker [ArrayBuffer(id:0,host:9.102.20.225,port:9092)] failed
kafka.common.KafkaException: fetching topic metadata for topics [Set(test)] from broker [ArrayBuffer(id:0,host:9.102.20.225,port:9092)] failed
at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:72)

I should probably use https://github.com/couchbase/kafka-connect-couchbase instead and build on it for my requirements, but there is no good example of how to start getting data from a bucket with that version of the code.

Hi Hykavitha,

> I should probably use https://github.com/couchbase/kafka-connect-couchbase this and develop for my requirement.

Correct. The other repository you mentioned is old and unsupported.

> but there is no good example on how to start getting data from bucket for this version of github code

The documentation includes a quickstart guide for running the connector. Once you get a feel for how it works, look at the custom source handler example which shows how to customize the message published to Kafka.

I’m happy to answer any questions you have along the way.

Thanks,
David

Hi David,

The issue is resolved now. I was able to use the example you pointed to, connect to CB, and produce to Kafka.

Now I have a question about making customSourceHandler.java run a custom query. Is there an example you can point me to?
For example, for an ID or a list of IDs, I need to produce the results of this query to a Kafka topic:
SELECT META(t).id, *
FROM Devpoc AS t
WHERE META(t).id = '1271'

Also, I want this custom source handler to listen to topic-1.
topic-1 is going to receive IDs.
For each ID, it should select the document from CB and publish it to topic-2.

How would I achieve all this?

I have written Java Kafka code that consumes from topic-1, does some processing, and produces to topic-2.
Here, the processing is to get the data from CB using the query above and the custom source handler.

Hi Hykavitha,

The source connector can’t do a custom query; it always receives change notifications for all documents. However, you can filter the output so only the desired documents are sent to Kafka. A source handler’s handle method can return null for documents you want to skip.

A source handler can transform a document, skip it, or route it to a specific topic. It acts in response to a document change notification from Couchbase. I don’t think it was designed for the use case you have in mind.
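To make the skip-or-route idea concrete, here is a minimal sketch in plain Java. It does not use the connector's real API: ChangeEvent, OutputRecord, and the handle signature below are hypothetical stand-ins invented for this illustration, so treat the custom source handler example in the connector repository as the reference for the actual types.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Illustration only: ChangeEvent, OutputRecord and handle() are hypothetical
// stand-ins, NOT the connector's real classes or method signatures.
final class SourceHandlerSketch {

    /** A document change notification: the document ID and its JSON body. */
    static final class ChangeEvent {
        final String documentId;
        final String json;

        ChangeEvent(String documentId, String json) {
            this.documentId = documentId;
            this.json = json;
        }
    }

    /** What would be sent to Kafka: destination topic, message key, message value. */
    static final class OutputRecord {
        final String topic;
        final String key;
        final String value;

        OutputRecord(String topic, String key, String value) {
            this.topic = topic;
            this.key = key;
            this.value = value;
        }
    }

    /**
     * Decides what to do with one change notification.
     * Returns null to skip the document, otherwise the record to publish.
     */
    static OutputRecord handle(ChangeEvent event, Set<String> wantedIds, String topic) {
        if (!wantedIds.contains(event.documentId)) {
            return null; // skip: nothing is published for this document
        }
        // Route the document, unchanged, to the chosen topic.
        return new OutputRecord(topic, event.documentId, event.json);
    }

    public static void main(String[] args) {
        Set<String> wantedIds = new HashSet<>(Arrays.asList("1271"));
        List<ChangeEvent> events = Arrays.asList(
                new ChangeEvent("1271", "{\"name\":\"a\"}"),
                new ChangeEvent("9999", "{\"name\":\"b\"}"));

        for (ChangeEvent event : events) {
            OutputRecord record = handle(event, wantedIds, "topic-2");
            System.out.println(event.documentId + " -> "
                    + (record == null ? "skipped" : "published to " + record.topic));
        }
    }
}
```

Note that the handler only reacts to changes: a document whose ID is in the wanted set is published when it changes, not when someone asks for it.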

It sounds like you want to listen to a Kafka topic, do some document lookups in Couchbase, and publish the results to another topic. Have you evaluated Kafka Streams? It might be a better match for your project’s requirements, since it would let you sit between the two topics and do anything you want using the Couchbase Java SDK.
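The shape of that "sit between two topics" step can be sketched as follows. This is only an outline under stated assumptions: the lookup and publish steps are passed in as plain functions so the code runs without Kafka or Couchbase, and LookupBridgeSketch and forward are hypothetical names. In a real application the IDs would come from a consumer on topic-1, the lookup would be a get by document ID through the Couchbase Java SDK (a lookup by ID does not need a N1QL query), and the publish step would be a producer send to topic-2.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.BiConsumer;
import java.util.function.Function;

// Illustration only: lookup and publish are injected as functions so the sketch
// runs without Kafka or Couchbase. All names here are hypothetical.
final class LookupBridgeSketch {

    /**
     * For each incoming ID, looks up the document and publishes it if found.
     * Returns the number of documents published.
     */
    static int forward(Iterable<String> ids,
                       Function<String, Optional<String>> lookup,
                       BiConsumer<String, String> publish) {
        int published = 0;
        for (String id : ids) {
            Optional<String> document = lookup.apply(id);
            if (document.isPresent()) {
                publish.accept(id, document.get()); // key = document ID, value = document body
                published++;
            }
            // IDs with no matching document are ignored in this sketch.
        }
        return published;
    }

    public static void main(String[] args) {
        // In-memory stand-ins for the Couchbase bucket and for topic-2.
        Map<String, String> bucket = new HashMap<>();
        bucket.put("1271", "{\"type\":\"order\"}");
        List<String> topic2 = new ArrayList<>();

        int count = forward(
                Arrays.asList("1271", "4242"),
                id -> Optional.ofNullable(bucket.get(id)),
                (id, doc) -> topic2.add(id + "=" + doc));

        System.out.println(count + " published: " + topic2);
    }
}
```

Keeping the lookup and publish steps behind small function interfaces also makes the processing logic easy to test before wiring in the real clients.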

Thanks,
David