Data from multiple buckets in different Kafka topics

Hi,

Is it possible to have multiple buckets pushing data into the same Kafka cluster, but to different topics?

If yes, what configuration should be used?

Regards
Pankaj Sharma

Hi Pankaj,

The current version of the connector (3.2.0) is limited to reading from a single bucket.

However, it is possible to run multiple instances of the connector, each configured with a different bucket/topic.

Thanks,
David

Can you provide a link or some configuration help for this?

I’d recommend following the Kafka Connect Couchbase Quickstart guide, which shows how to configure and run the connector. Once you’ve got the first connector instance running, make a copy of the quickstart-couchbase-source.properties file and edit the topic.name, connection.bucket and name properties (name is arbitrary, but must uniquely identify the connector instance). Then follow the steps to run the connector again, this time using the modified config. Unless you stopped the first connector instance, you should now have two connector instances running, each reading from a different bucket.
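
As a rough illustration of the copy-and-edit step, here is a minimal shell sketch. The helper name make_connector_config and the example values (couchbase-source-2, bucket-two, topic-two) are made up for illustration; only the three property names come from the post above. It assumes each property sits on its own line in key=value form.

```shell
# make_connector_config SRC DST NAME BUCKET TOPIC
# Hypothetical helper: copies SRC to DST, replacing the values of the
# name, connection.bucket and topic.name properties. All other lines
# are copied unchanged.
# Assumption: the new values contain no '|' or '&' characters, which
# would be interpreted by sed.
make_connector_config() {
    sed -e "s|^name[[:space:]]*=.*|name=$3|" \
        -e "s|^connection\.bucket[[:space:]]*=.*|connection.bucket=$4|" \
        -e "s|^topic\.name[[:space:]]*=.*|topic.name=$5|" \
        "$1" > "$2"
}
```

It could then be called as: make_connector_config quickstart-couchbase-source.properties couchbase-source-2.properties couchbase-source-2 bucket-two topic-two. Editing the copy by hand works just as well; the point is only that these three values must differ between the two files.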

Hi @david.nault
When I tried to run a second instance of the Kafka connector, it gave me this error:
FAILED org.eclipse.jetty.server.Server@565e7091: java.net.BindException: Address already in use (org.eclipse.jetty.util.component.AbstractLifeCycle:212)
java.net.BindException: Address already in use
at sun.nio.ch.Net.bind0(Native Method)
at sun.nio.ch.Net.bind(Net.java:433)
at sun.nio.ch.Net.bind(Net.java:425)
at sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:223)
at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:74)
at org.eclipse.jetty.server.ServerConnector.open(ServerConnector.java:321)
at org.eclipse.jetty.server.AbstractNetworkConnector.doStart(AbstractNetworkConnector.java:80)
at org.eclipse.jetty.server.ServerConnector.doStart(ServerConnector.java:236)
at org.eclipse.jetty.util.component.AbstractLifeCycle.start(AbstractLifeCycle.java:68)
at org.eclipse.jetty.server.Server.doStart(Server.java:366)
at org.eclipse.jetty.util.component.AbstractLifeCycle.start(AbstractLifeCycle.java:68)
at org.apache.kafka.connect.runtime.rest.RestServer.start(RestServer.java:145)
at org.apache.kafka.connect.runtime.Connect.start(Connect.java:53)
at org.apache.kafka.connect.cli.ConnectStandalone.main(ConnectStandalone.java:82)

Hi @manya.goyal, source connector or sink connector?

It’s a source connector

Hi @manya.goyal, @pankaj.sharma,

I overlooked an important detail documented in Running Workers - Standalone Mode:

If you run multiple standalone instances on the same host, there are a couple of settings that must be unique between each instance:

  • offset.storage.file.filename - storage for connector offsets, which are stored on the local filesystem in standalone mode; using the same file will lead to offset data being deleted or overwritten with different values
  • rest.port - the port the REST interface listens on for HTTP requests

The “Address already in use” exception occurs because the two connector workers are configured with the same value for rest.port.
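
For example, a second worker config could be derived from the first along these lines. This is only a sketch: the helper name make_worker_config, the port 8084 and the offsets path are placeholder choices, not values from the documentation. Only the two property names are taken from the quoted text.

```shell
# make_worker_config SRC DST REST_PORT OFFSET_FILE
# Hypothetical helper: copies the worker config SRC to DST, dropping any
# existing rest.port and offset.storage.file.filename lines, then appends
# the new values so each setting appears exactly once.
make_worker_config() {
    {
        grep -v -e '^rest\.port[[:space:]]*=' \
                -e '^offset\.storage\.file\.filename[[:space:]]*=' "$1"
        printf 'rest.port=%s\n' "$3"
        printf 'offset.storage.file.filename=%s\n' "$4"
    } > "$2"
}
```

Usage might look like: make_worker_config connect-standalone.properties worker-2.properties 8084 /tmp/connect-2.offsets, after which the second worker is started with worker-2.properties instead of the original worker config.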

Thanks,
David

Hi @david.nault
Thank you so much for the detailed info! I had already figured out the solution to the “address already in use” problem by changing rest.port,
though I will also include the offset.storage.file.filename property in my config.
Thanks once again.

For posterity, another way to run multiple instances of the Couchbase connector is to specify multiple *.properties files when starting the worker process, like:

connect-standalone.sh worker-config.properties \
                      connector1-config.properties \
                      connector2-config.properties