Kafka Couchbase Connector Issue

Hi Team, I am trying to implement couchbase kafka connector version - 4.1.5. Implemented as well but facing issue. When I am doing any updates, deletes, any event in couchbase, connector is not pushing that data in kafka.
I am attaching my logs - There are the logs which I am getting after executing connector. So Basically I am not getting issue in logs but Connector is not doing its work. Can anyone assist me in that. May be I am doing something wrong.

Hi Ankita,

Can you share the the full logs as text instead of an image, please? It might also be useful to see your connector configuration.

Note that if the connector has had a chance to save its checkpoints, it won’t start over from the beginning unless you rename the connector or set the couchbase.stream.from property to BEGINNING (see Source Configuration Options | Couchbase Docs

Can you try running kafka-console-consumer to view the same topic the connector is using?

https://docs.couchbase.com/kafka-connector/current/quickstart.html#observe-messages-published-by-couchbase-source-connector

Thanks,
David

Hi David,

Thanks for quick response. I am attaching the logs. As there is a limit of a number of characters.

connect.zip (7.3 KB)

Thanks,
Ankita Singla

Hi David,

I’ve tried setting the couchbase.stream.from property to BEGINNING as well, but I’m not getting the expected results. I’m running consumer as well.

Thanks,
Ankita Singla

Hi David,

The kafka couchbase connector works wonderfully when I connect it to local couchbase (i.e. couchbase running in a docker) in my system. However, when I connect it to a couchbase installed on dev environment(secure one) , it is not working as expected. When I am doing any updates, deletes, any event in couchbase, connector is not pushing that data in kafka.

Thanks,
Ankita Singla

Hi Ankita, thank you for uploading the logs. I noticed you’re bootstrapping against port 11207 which is the KV port for TLS (secure connections).

couchbase.seed.nodes = [10.148.34.68:11207]

In order to use secure connections, you’ll need to configure a couple of connector config properties.

  1. Download the Couchbase Server CA certificate and put it on the filesystem of the Kafka Connect worker nodes.

  2. Set the couchbase.enable.tls connector config property to true.

  3. Set the couchbase.trust.certificate.path connector config property to the absolute filesystem path to the CA certificate you downloaded in step 1.

Since 11207 is the default port for secure connections, you can omit it from the couchbase.seed.nodes property and just write couchbase.seed.nodes=10.148.34.68.

If you don’t need to connect over TLS, ignore all of this and just remove the port from couchbase.seed.nodes.

Thanks,
David

Hi David,

Thanks , Sure I will follow these steps then I will get back to you.

Thanks,
Ankita Singla

Hi David,

I am facing one more issue. When I am trying to deploy kafka couchbase connector in docker. I am getting issue.

[2021-11-30 06:23:07,036] ERROR Failed to create job for quickstart-couchbase-source.properties (org.apache.kafka.connect.cli.ConnectStandalone:107)
[2021-11-30 06:23:07,037] ERROR Stopping after connector error (org.apache.kafka.connect.cli.ConnectStandalone:117)
java.util.concurrent.ExecutionException: java.lang.NoClassDefFoundError: com/couchbase/client/core/logging/RedactionLevel
at org.apache.kafka.connect.util.ConvertingFutureCallback.result(ConvertingFutureCallback.java:115)
at org.apache.kafka.connect.util.ConvertingFutureCallback.get(ConvertingFutureCallback.java:99)
at org.apache.kafka.connect.cli.ConnectStandalone.main(ConnectStandalone.java:114)
Caused by: java.lang.NoClassDefFoundError: com/couchbase/client/core/logging/RedactionLevel
at java.base/java.lang.Class.getDeclaredMethods0(Native Method)
at java.base/java.lang.Class.privateGetDeclaredMethods(Class.java:3166)
at java.base/java.lang.Class.privateGetPublicMethods(Class.java:3191)
at java.base/java.lang.Class.privateGetPublicMethods(Class.java:3203)
at java.base/java.lang.Class.privateGetPublicMethods(Class.java:3203)
at java.base/java.lang.Class.getMethods(Class.java:1904)
at com.couchbase.connect.kafka.util.config.KafkaConfigProxyFactory.define(KafkaConfigProxyFactory.java:166)
at com.couchbase.connect.kafka.util.config.KafkaConfigProxyFactory.define(KafkaConfigProxyFactory.java:158)
at com.couchbase.connect.kafka.util.config.ConfigHelper.define(ConfigHelper.java:34)
at com.couchbase.connect.kafka.CouchbaseSourceConnector.config(CouchbaseSourceConnector.java:133)
at org.apache.kafka.connect.runtime.AbstractHerder.validateConnectorConfig(AbstractHerder.java:450)
at org.apache.kafka.connect.runtime.AbstractHerder.lambda$validateConnectorConfig$2(AbstractHerder.java:362)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.ClassNotFoundException: com.couchbase.client.core.logging.RedactionLevel
at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:581)
at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:178)
at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:522)
… 17 more

Thanks,
Ankita Singla

Hi Ankita,

The Couchbase connector might not be installed correctly – or could there be interference from a previous version of the connector?

How & where are you installing the connector?

Thanks,
David

Hi David,

The Kafka-connect-couchbase I am downloading it from GitHub - couchbase/kafka-connect-couchbase: Kafka Connect connector for Couchbase Server here.
That is running fine on my system but when I am trying to run it as a container then it is failing.
May be my docker file is not correct.
These are my docker file contents -

FROM centos:centos7

RUN mkdir -p /apps/price

RUN chmod 776 /apps/price

FROM confluentinc/cp-kafka-connect

COPY target/kafka-connect-couchbase-4.1.5-SNAPSHOT.jar /apps/price/kafka-connect-couchbase-4.1.5-SNAPSHOT.jar

COPY target/components /apps/price/components

COPY config/quickstart-couchbase-source.properties /apps/price/quickstart-couchbase-source.properties

COPY config/connect-standalone.properties /apps/price/connect-standalone.properties

WORKDIR /apps/price

CMD [“sh”, “-c”, “env CLASSPATH=./* connect-standalone connect-standalone.properties quickstart-couchbase-source.properties”]

Thanks,
Ankita Singla

@david.nault

I wanted to jump in on

Note that if the connector has had a chance to save its checkpoints, it won’t start over from the beginning unless you rename the connector or set the couchbase.stream.from property to BEGINNING

I am also setting up a pipeline and have this question in this forum.

But this potentially solves it all.

If my connect cluster dies and then comes up again and is set to stream from lastSaved (or beginning) will it receive only the changes since the last checkpoint? If so, how much downtime can it have before the relevant cdc/wal/dcp logs get rolled over and can we configure this?

(I tested this behavior using a standalone connector deployment and simply got from the beginning behavior but the guys at confluence told me that the standalone deployment does not use the same checkpoint offset system the distributed deployment does which gives me hope)

Also, just to clarify fromTheBeginning really only means the current state of the data is ins’t a literal replay of the history (at least that what it looks like from my tests), is that correct?

Thank you so much for your help.

Hi Ankita,

Instead of building the connector from source, I would recommend using a pre-built version downloaded from Couchbase or Confluent Hub. If you must build from source, please check out a tag instead of building a snapshot from the main branch.

Since you’re using a Confluent docker image, the simplest thing would be to install from Confluent Hub:

FROM confluentinc/cp-kafka-connect

RUN confluent-hub install --no-prompt couchbase/kafka-connect-couchbase:4.1.4

This way you do not need to set the CLASSPATH environment variable, since the confluent-hub install command adds the Couchbase connector to the plugin path. (You’ll still need to make your config file available to the connector somehow; COPY-ing it into the image as you are doing will work; a more flexible way might be to use a volume mount.)

Please see the Confluent Docker Developer Guide for the correct way to launch the connector. Instead of having a CMD instruction that directly launches connect-standalone, it sounds like they want you to create a /etc/confluent/docker/launch script:

The /etc/confluent/docker/launch script runs the actual process. The script should ensure that:

  • The process is run with process id 1. Your script should use exec so the program takes over the shell process rather than running as a child process. This is so that your program will receive signals like SIGTERM directly rather than its parent shell process receiving them.
  • Log to stdout.

Yes, the connector will resume from the last saved offset, unless the connector has been offline long enough for the offset to become stale (typically days) in which case the connector will restart from the beginning and stream everything again.

It’s not a literal replay of history since it might not send every historical version of a document, but it does send every document again.

1 Like

Hi David,

I tried that also. But I am getting error while running kafka connector on dev environnment.
I am attaching the logs as well.
Logs.zip (45.5 KB)

08:48:20.824 [nioEventLoopGroup-3-1] WARN c.c.client.dcp.conductor.Conductor - Cannot connect configuration provider.
08:48:20.824 [nioEventLoopGroup-3-1] DEBUG c.c.client.dcp.conductor.Conductor - Instructed to shutdown.
08:48:20.828 [nioEventLoopGroup-3-1] DEBUG c.c.c.d.c.HttpStreamingConfigProvider - Initiating streaming config provider shutdown on channel.
08:48:20.828 [nioEventLoopGroup-3-1] INFO c.c.client.dcp.conductor.Conductor - Shutdown complete.
08:48:20.851 [task-thread-couchbase-kafka-connector-0] INFO o.a.k.c.runtime.WorkerSourceTask - WorkerSourceTask{id=couchbase-kafka-connector-0} Committing offsets
08:48:20.851 [task-thread-couchbase-kafka-connector-0] INFO o.a.k.c.runtime.WorkerSourceTask - WorkerSourceTask{id=couchbase-kafka-connector-0} flushing 0 outstanding messages for offset commit
08:48:20.851 [task-thread-couchbase-kafka-connector-0] DEBUG o.a.k.c.runtime.WorkerSourceTask - WorkerSourceTask{id=couchbase-kafka-connector-0} Finished offset commitOffsets successfully in 0 ms
08:48:20.852 [task-thread-couchbase-kafka-connector-0] ERROR o.a.kafka.connect.runtime.WorkerTask - WorkerSourceTask{id=couchbase-kafka-connector-0} Task threw an uncaught and unrecoverable exception
org.apache.kafka.connect.errors.ConnectException: com.couchbase.client.dcp.error.BootstrapException: Could not connect to Cluster/Bucket
at com.couchbase.connect.kafka.CouchbaseSourceTask.poll(CouchbaseSourceTask.java:194) ~[kafka-connect-couchbase-3.4.6-SNAPSHOT.jar:na]
at org.apache.kafka.connect.runtime.WorkerSourceTask.poll(WorkerSourceTask.java:289) ~[connect-runtime-6.0.1-ccs.jar:na]
at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:256) ~[connect-runtime-6.0.1-ccs.jar:na]
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185) ~[connect-runtime-6.0.1-ccs.jar:na]
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:235) ~[connect-runtime-6.0.1-ccs.jar:na]
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[na:na]
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[na:na]
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) ~[na:na]
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) ~[na:na]
at java.base/java.lang.Thread.run(Thread.java:834) ~[na:na]

Thanks,
Ankita Singla

Instantiated connector couchbase-kafka-connector with version 3.4.6-SNAPSHOT of type class com.couchbase.connect.kafka.CouchbaseSourceConnector

It will be easier for us to help if you:

  1. Use an official version of the connector downloaded from Couchbase or Confluent Hub (I have no idea what could be in that SNAPSHOT version).
  2. Upgrade to the latest version of the connector (4.1.4 at time of writing).
connection.bucket = productlocator
...
clusterAt=[HostAndPort{host='10.148.33.224', port=8091}
...
Caused by: com.couchbase.client.core.CouchbaseException: Unknown error code during connect: 400 Bad Request
	at com.couchbase.client.dcp.transport.netty.StartStreamHandler.channelRead0(StartStreamHandler.java:81) ~[kafka-connect-couchbase-3.4.6-SNAPSHOT.jar:na]
	... 24 common frames omitted

The connector is sending a GET request to http://10.148.33.224:8091/pools/default/b/productlocator to get the bucket configuration. This request is failing with 400 Bad Request. I am suspicious, because I can’t think of a case where Couchbase would return that error code. I wonder if something else is receiving that request?

Here’s a curl command you can run (from the same envionrment where the connector is deployed) to see who’s servicing the request. Maybe the response body will give you a clue?

curl -v -u <username>:<password> http://10.148.33.224:8091/pools/default/b/productlocator

Thanks,
David