org.apache.kafka.connect.errors.ConnectException: com.couchbase.client.dcp.error.RollbackException

Hello,

We are facing an issue with the Couchbase connector for Kafka. The connector stopped responding, and changes to Couchbase documents are no longer being propagated to Kafka because of the error below. The Couchbase server and the Kafka broker were both running successfully at the time. Restarting the connector fixed it, but this is not a desirable situation because it disrupts the chain of events.

org.apache.kafka.connect.errors.ConnectException: com.couchbase.client.dcp.error.RollbackException

Could someone assist please?

org.apache.kafka.connect.errors.ConnectException: com.couchbase.client.dcp.error.RollbackException
    at com.couchbase.connect.kafka.CouchbaseSourceTask.checkErrorQueue(CouchbaseSourceTask.java:160)
    at com.couchbase.connect.kafka.CouchbaseSourceTask.poll(CouchbaseSourceTask.java:125)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.poll(WorkerSourceTask.java:270)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:237)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:184)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: com.couchbase.client.dcp.error.RollbackException
    at com.couchbase.client.dcp.conductor.DcpChannel$6$1.operationComplete(DcpChannel.java:559)
    at com.couchbase.client.dcp.deps.io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:577)
    at com.couchbase.client.dcp.deps.io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:570)
    at com.couchbase.client.dcp.deps.io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:549)
    at com.couchbase.client.dcp.deps.io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:490)
    at com.couchbase.client.dcp.deps.io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:615)
    at com.couchbase.client.dcp.deps.io.netty.util.concurrent.DefaultPromise.setSuccess0(DefaultPromise.java:604)
    at com.couchbase.client.dcp.deps.io.netty.util.concurrent.DefaultPromise.setSuccess(DefaultPromise.java:96)
    at com.couchbase.client.dcp.transport.netty.DcpMessageHandler.channelRead(DcpMessageHandler.java:338)
    at com.couchbase.client.dcp.deps.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    at com.couchbase.client.dcp.deps.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    at com.couchbase.client.dcp.deps.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
    at com.couchbase.client.dcp.deps.io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286)
    at com.couchbase.client.dcp.deps.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    at com.couchbase.client.dcp.deps.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    at com.couchbase.client.dcp.deps.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
    at com.couchbase.client.dcp.deps.io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286)
    at com.couchbase.client.dcp.deps.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    at com.couchbase.client.dcp.deps.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    at com.couchbase.client.dcp.deps.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
    at com.couchbase.client.dcp.transport.netty.BucketConfigHandler.channelRead0(BucketConfigHandler.java:103)
    at com.couchbase.client.dcp.transport.netty.BucketConfigHandler.channelRead0(BucketConfigHandler.java:39)
    at com.couchbase.client.dcp.deps.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
    at com.couchbase.client.dcp.deps.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    at com.couchbase.client.dcp.deps.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    at com.couchbase.client.dcp.deps.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
    at com.couchbase.client.dcp.deps.io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:321)
    at com.couchbase.client.dcp.deps.io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:308)
    at com.couchbase.client.dcp.deps.io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:422)
    at com.couchbase.client.dcp.deps.io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:276)
    at com.couchbase.client.dcp.deps.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    at com.couchbase.client.dcp.deps.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    at com.couchbase.client.dcp.deps.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
    at com.couchbase.client.dcp.deps.io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
    at com.couchbase.client.dcp.deps.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    at com.couchbase.client.dcp.deps.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    at com.couchbase.client.dcp.deps.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
    at com.couchbase.client.dcp.deps.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163)
    at com.couchbase.client.dcp.deps.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:714)
    at com.couchbase.client.dcp.deps.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650)
    at com.couchbase.client.dcp.deps.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576)
    at com.couchbase.client.dcp.deps.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
    at com.couchbase.client.dcp.deps.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
    at com.couchbase.client.dcp.deps.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    at com.couchbase.client.dcp.deps.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)

What version of Kafka Connector?

Hi David,
We are using version 4, built from the master branch of the Git repo.

Hi Maurius,

Looks like a bug. I don’t know how the RollbackException ended up in the error queue, but it probably shouldn’t be there. Any error in that queue causes the connector to terminate, but rollbacks are recoverable.
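To illustrate the distinction, here is a minimal sketch of an error queue that treats rollbacks as recoverable instead of fatal. This is not the connector's code: the class `ErrorQueueSketch`, the exception `RecoverableRollbackException`, and the method `onError` are hypothetical names made up for this example. Only `checkErrorQueue` echoes a method name from the stack trace above, and its body here is an assumption.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class ErrorQueueSketch {
    /** Hypothetical stand-in for a recoverable rollback signal. */
    static class RecoverableRollbackException extends RuntimeException {
        RecoverableRollbackException() {
            super("rollback requested");
        }
    }

    // Only errors that should terminate the task end up in this queue.
    private final BlockingQueue<Throwable> fatalErrors = new LinkedBlockingQueue<>();
    private int rollbacksHandled = 0;

    /** Called by the (hypothetical) streaming thread when something goes wrong. */
    synchronized void onError(Throwable t) {
        if (t instanceof RecoverableRollbackException) {
            // Recoverable: a real client would restart the stream here.
            rollbacksHandled++;
            return;
        }
        fatalErrors.add(t);
    }

    /** Called from the polling thread; throws if a fatal error was queued. */
    void checkErrorQueue() {
        Throwable t = fatalErrors.poll();
        if (t != null) {
            throw new IllegalStateException("fatal streaming error", t);
        }
    }

    synchronized int rollbacksHandled() {
        return rollbacksHandled;
    }

    public static void main(String[] args) {
        ErrorQueueSketch sketch = new ErrorQueueSketch();

        // A rollback is counted but never reaches the fatal queue.
        sketch.onError(new RecoverableRollbackException());
        sketch.checkErrorQueue();

        // Any other error is queued and surfaces on the next check.
        sketch.onError(new RuntimeException("connection lost"));
        try {
            sketch.checkErrorQueue();
        } catch (IllegalStateException e) {
            System.out.println("task would terminate: " + e.getCause().getMessage());
        }
    }
}
```

The point of the sketch is only the branch in `onError`: if a rollback is put on the same queue as fatal errors, the next poll terminates the task even though the stream could have been restarted.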

I’ll take a look this afternoon. Tracking as KAFKAC-221.

Thanks,
David

Can you help me by letting me know if this error occurred immediately after you started the connector, or did it happen later after the connector was running for at least a little bit?

Thanks,
David

Hi David, no, the error didn’t occur immediately. The connector had been running for some time before this error occurred. We restarted the connector and it worked fine. My understanding is that there has been a rollback operation due to diverging histories, is this correct?
https://github.com/couchbase/kv_engine/blob/master/docs/dcp/documentation/rollback.md

We only have one server as this is our dev environment.

I would like to ask you, is the master branch in your github repository production ready?

https://github.com/couchbase/kafka-connect-couchbase

My understanding is that there has been a rollback operation due to diverging histories, is this correct?

Right, usually due to a failover of one of the Couchbase Server nodes. A rollback can also occur on startup if the connector had been stopped for so long that Couchbase Server can no longer resume from the saved offset, but it sounds like that’s not what happened in this case.
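As a rough illustration of what a rollback means for a consumer, here is a small sketch. It assumes the server answers a stream request with a sequence number to roll back to, as described in the rollback document linked earlier in this thread. The class and method names (`RollbackSketch`, `resumeSeqno`) are hypothetical and are not part of the DCP client or the connector.

```java
public class RollbackSketch {
    /**
     * Picks the sequence number to resume streaming from after a rollback.
     *
     * @param savedSeqno    the offset the consumer had saved
     * @param rollbackSeqno the point the server asked the consumer to roll back to
     */
    static long resumeSeqno(long savedSeqno, long rollbackSeqno) {
        if (savedSeqno < 0 || rollbackSeqno < 0) {
            throw new IllegalArgumentException("sequence numbers must be non-negative");
        }
        // Never resume past the saved offset; a rollback can only move the position backwards.
        return Math.min(savedSeqno, rollbackSeqno);
    }

    public static void main(String[] args) {
        long saved = 1_500L;      // illustrative values only
        long rollbackTo = 1_200L;
        System.out.println("resume from seqno " + resumeSeqno(saved, rollbackTo));
    }
}
```

Anything the consumer saw between the rollback point and its saved offset belongs to the abandoned history, so downstream systems may have received changes that no longer exist on the server.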

I would like to ask you, is the master branch in your github repository production ready?

No, the master branch is not production-ready. Our branch management scheme uses master for development work, so it’s equivalent to a snapshot. Production-ready releases can be downloaded from https://docs.couchbase.com/kafka-connector/current/release-notes.html (or if you want to build from source, you can check out a Git tag).

FYI, the master branch is currently (as of July 2020) a snapshot of 4.0.0-dp.2, which is a developer preview. The first GA release of 4.0.0 is currently scheduled for July 21st.

Thanks,
David

Hey David, thanks for your feedback. We have a single-node cluster for our dev environment, and we don’t think there was a failover.

Let me know if you require more information.