Unable to get couchbase source connector to load

I can’t seem to get my Couchbase source connector to load. I get this error when trying to create it:

```
[2022-01-14 18:40:17,510] ERROR Failed to create job for config/connect-couchbase-UBDS-source.properties (org.apache.kafka.connect.cli.ConnectStandalone:107)
[2022-01-14 18:40:17,512] ERROR Stopping after connector error (org.apache.kafka.connect.cli.ConnectStandalone:117)
java.util.concurrent.ExecutionException: java.lang.NoClassDefFoundError: com/couchbase/client/core/logging/RedactionLevel
    at org.apache.kafka.connect.util.ConvertingFutureCallback.result(ConvertingFutureCallback.java:115)
    at org.apache.kafka.connect.util.ConvertingFutureCallback.get(ConvertingFutureCallback.java:99)
    at org.apache.kafka.connect.cli.ConnectStandalone.main(ConnectStandalone.java:114)
Caused by: java.lang.NoClassDefFoundError: com/couchbase/client/core/logging/RedactionLevel
    at java.base/java.lang.Class.getDeclaredMethods0(Native Method)
    at java.base/java.lang.Class.privateGetDeclaredMethods(Class.java:3166)
    at java.base/java.lang.Class.privateGetPublicMethods(Class.java:3191)
    at java.base/java.lang.Class.privateGetPublicMethods(Class.java:3203)
    at java.base/java.lang.Class.privateGetPublicMethods(Class.java:3203)
    at java.base/java.lang.Class.getMethods(Class.java:1904)
    at com.couchbase.connect.kafka.util.config.KafkaConfigProxyFactory.define(KafkaConfigProxyFactory.java:166)
    at com.couchbase.connect.kafka.util.config.KafkaConfigProxyFactory.define(KafkaConfigProxyFactory.java:158)
    at com.couchbase.connect.kafka.util.config.ConfigHelper.define(ConfigHelper.java:34)
    at com.couchbase.connect.kafka.CouchbaseSourceConnector.config(CouchbaseSourceConnector.java:133)
    at org.apache.kafka.connect.runtime.AbstractHerder.validateConnectorConfig(AbstractHerder.java:450)
    at org.apache.kafka.connect.runtime.AbstractHerder.lambda$validateConnectorConfig$2(AbstractHerder.java:362)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.ClassNotFoundException: com.couchbase.client.core.logging.RedactionLevel
    at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:476)
    at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:589)
    at org.apache.kafka.connect.runtime.isolation.PluginClassLoader.loadClass(PluginClassLoader.java:103)
    at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:522)
    … 17 more
[2022-01-14 18:40:17,521] INFO Kafka Connect stopping (org.apache.kafka.connect.runtime.Connect:67)
```

Do I have the wrong version, or did I miss a config or file setting in one of the properties files?
I’m running this in a standalone Kafka configuration on a Linux VM, with Couchbase Community Edition in a single-node cluster.

Here’s what I have for the couchbase connector properties file:

```
# Arbitrary unique name for the connector. Attempting to register
# two connectors with the same name will fail.
name=couchbase-UBDS-source-connector

# The Java class for the connector.
connector.class=com.couchbase.connect.kafka.CouchbaseSourceConnector

# The maximum number of tasks that should be created for this connector.
tasks.max=2

# Configure dead letter topic for error handling.
errors.deadletterqueue.context.headers.enable = true
errors.deadletterqueue.topic.name = deadLetters_couchbase
errors.deadletterqueue.topic.replication.factor = 1
errors.log.enable = true
errors.log.include.messages = false
errors.retry.delay.max.ms = 60000
errors.retry.timeout = 0
errors.tolerance = all

# Publish to this Kafka topic.
couchbase.topic=UBDS.userProfile

# Connect to this Couchbase cluster (comma-separated list of bootstrap nodes).
couchbase.seed.nodes=sandbox-couchbase.bluevolt.io
couchbase.bootstrap.timeout=10s

# Optionally connect to Couchbase Server over a secure channel.
# If the KAFKA_COUCHBASE_TRUST_STORE_PASSWORD environment variable is set,
# it will override the password specified here.
couchbase.enable.tls=true
couchbase.trust.store.path=/path/to/keystore
couchbase.trust.store.password=secret

# Read from this Couchbase bucket using these credentials.
# If the KAFKA_COUCHBASE_PASSWORD environment variable is set,
# it will override the password specified here.
couchbase.bucket=userProfile
couchbase.username=kafka_connect
couchbase.password=**************

# Keys of published messages are just Strings with no schema.
key.converter=org.apache.kafka.connect.storage.StringConverter

# A "source handler" converts the Couchbase document into a Kafka record.
# This quickstart config uses "RawJsonSourceHandler" which creates a Kafka
# record whose content is exactly the same as the Couchbase JSON document.
# When using RawJsonSourceHandler (or its cousin RawJsonWithMetadataSourceHandler)
# the value converter must be ByteArrayConverter... unless you're using
# Single Message Transforms. See the quickstart documentation for more details.
couchbase.source.handler=com.couchbase.connect.kafka.handler.source.RawJsonSourceHandler
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter

# Control which Couchbase document change notifications get published to Kafka
# using this Java class, which must implement com.couchbase.connect.kafka.filter.Filter.
couchbase.event.filter=com.couchbase.connect.kafka.filter.AllPassFilter

# Specifies when in Couchbase history the connector should start streaming from.
# Modes starting with "SAVED_OFFSET" tell the connector to resume from when each
# vBucket's state was most recently saved by the Kafka Connect framework, falling back
# to the secondary mode if no saved state exists for a vBucket.
couchbase.stream.from=SAVED_OFFSET_OR_BEGINNING
#couchbase.stream.from=SAVED_OFFSET_OR_NOW
#couchbase.stream.from=BEGINNING
#couchbase.stream.from=NOW

# To reduce bandwidth usage, Couchbase Server 5.5 and later can send documents
# to the connector in compressed form. (Messages are always published to the
# Kafka topic in uncompressed form, regardless of this setting.) If the
# requested mode is not supported by your version of Couchbase Server,
# compression will be disabled.
#
#   ENABLED - (default) Couchbase Server decides whether to use compression
#     on a per-document basis, depending on whether the compressed form of the
#     document is readily available. Select this mode to prioritize Couchbase
#     Server performance and reduced bandwidth usage (recommended).
#     Requires Couchbase Server 5.5 or later.
#
#   DISABLED - No compression. Select this mode to prioritize reduced CPU load
#     for the Kafka connector.
#
#   FORCED - Compression is used for every document, unless compressed size is
#     greater than uncompressed size. Select this mode to prioritize bandwidth
#     usage reduction above all else.
#     Requires Couchbase Server 5.5 or later.
couchbase.compression=ENABLED
#couchbase.compression=DISABLED
#couchbase.compression=FORCED

# The flow control buffer limits how much data Couchbase will send before
# waiting for the connector to acknowledge the data has been processed. See the
# connector documentation for details on how this affects connector memory usage.
couchbase.flow.control.buffer=16m

# In some failover scenarios, Couchbase Server may roll back (undo) database
# changes that have not yet been persisted across all replicas. By default,
# the Kafka connector will poll Couchbase Server and defer event publication
# until the change has been persisted to all replicas in the cluster,
# at which time the change is unlikely to be rolled back. This feature
# introduces some latency, and increases connector memory usage and network
# traffic, but prevents rolled-back changes from appearing in the Kafka topic.
# The longer the polling interval, the larger the flow control buffer required
# in order to maintain steady throughput.
#
# If instead you wish to publish events immediately, set the polling interval
# to 0. If you do, be aware that when rollbacks occur you may end up with
# events in the Kafka topic from an "alternate timeline" in Couchbase Server's
# history.
#
# If the source is an ephemeral bucket (which never persists documents)
# this value must be set to 0 to disable the persistence check.
couchbase.persistence.polling.interval=100ms

# Set this to true to log document lifecycle milestones at INFO level instead
# of DEBUG. Lets you watch how documents flow through the connector.
couchbase.log.document.lifecycle=true
```

Hi Larry,

```
java.util.concurrent.ExecutionException: java.lang.NoClassDefFoundError: com/couchbase/client/core/logging/RedactionLevel
```

This usually means there’s something wrong with how the connector was installed. Could there be a duplicate version of the Couchbase core-io jar hanging around somewhere inside your Kafka installation?

Can you start with a clean Kafka installation? Or grep for core-io inside your installation directory and see what turns up?
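
For example (assuming your installation lives under `~/kafka`; adjust the path to match your setup), this will list every copy of core-io on disk:

```shell
# Search the whole Kafka installation for core-io jars. More than one
# hit, or a second older version, means two copies are visible to the
# worker and one of them may be shadowing the other.
find ~/kafka -name 'core-io-*.jar'
```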

Incidentally, a nice way to include preformatted text (like a config file) in a forum post is to put three backticks (`) on a line, then your preformatted text, then finally another line with three backticks. Like this:

```
preformatted text
```

Thanks,
David


Thanks for the tip on including code.

Here’s what I have in the plugins folder:
```
kafka@Kafka-server:~/kafka/plugins/couchbase-connector$ ls
HdrHistogram-2.1.12.jar  jsoup-1.14.2.jar                   micrometer-registry-jmx-1.5.5.jar
LatencyUtils-2.0.3.jar   kafka-connect-couchbase-4.1.4.jar  reactive-streams-1.0.3.jar
core-io-2.2.3.jar        metrics-core-4.0.7.jar             reactor-core-3.4.6.jar
dcp-client-0.37.0.jar    metrics-jmx-4.0.7.jar              slf4j-api-1.7.30.jar
java-client-3.2.3.jar    micrometer-core-1.5.5.jar          therapi-runtime-javadoc-0.12.0.jar
```

I also recopied the jar files from my unzipped couchbase 4.1.4 lib folder back into my plugin folder but I get the same results.

Is there someplace else I should be looking?

Maybe somewhere else under ~/kafka?

```
find ~/kafka | grep core-io
```
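
It's also worth confirming the class is actually inside the jar you have, since `NoClassDefFoundError` can also mean the jar on disk is damaged. Zip entry names are stored uncompressed, so a plain grep on the jar file itself is enough (sketch only; the path assumes the layout from your `ls` output):

```shell
# A healthy core-io-2.2.3.jar should contain the entry
# com/couchbase/client/core/logging/RedactionLevel.class; grep -a
# treats the binary jar as text so it can match that entry name.
# A zero count suggests the jar is corrupt or truncated.
grep -ac 'logging/RedactionLevel' \
  ~/kafka/plugins/couchbase-connector/core-io-2.2.3.jar
```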

Or maybe the class path isn’t getting set correctly… what command are you using to run the connector?
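
One more thing worth verifying while you're at it: in standalone mode the worker discovers plugins via `plugin.path` in the worker properties file, and that setting must name the directory that *contains* `couchbase-connector`, not the jar directory itself. A quick sanity check (the paths below are guesses based on your shell prompt):

```shell
# plugin.path should point at the PARENT of the connector directory,
# so the isolated plugin classloader loads every jar in
# ~/kafka/plugins/couchbase-connector together as one plugin.
grep '^plugin.path' ~/kafka/config/connect-standalone.properties
# e.g. plugin.path=/home/kafka/kafka/plugins
```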