spark.sparkContext.couchbaseQuery number of partitions

Hi all,

I use Spark 2.1 to load data from Couchbase. I have a Couchbase cluster of 5 nodes; the global secondary index is built on one node. Here is my code:

val query = "SELECT * FROM `mybucket` WHERE `myvalue` = 'value'"
val mydata = spark.sparkContext.couchbaseQuery(N1qlQuery.simple(query))
logger.info(s"number of partitions ${mydata.getNumPartitions}")

I print out the number of partitions mydata has, and it’s always 1. I tried the following config in my Spark code:

.config("spark.sql.shuffle.partitions", "300")
.config("spark.default.parallelism", "300")

It doesn’t change anything. Since there is only 1 partition, no matter what I do there is only one task running.

I tried to repartition the RDD, but the job makes no progress. From Spark I can see only one task running (I have 4 cores for the executor) and getting nowhere; CPU usage is extremely low, around 2%.

I’m not sure whether this is related to how I built the index: do I need to distribute the index across the cluster to increase parallelism? Any suggestions on how to set the number of partitions for the RDD when loading data from Couchbase, or in general how to increase parallelism when loading data from Couchbase?

Thanks a lot

Xuan

I managed to partially solve the problem. What I did was first load all the META ids and repartition them into a few hundred partitions, then use couchbaseGet to fetch the actual docs from Couchbase. At least this fully uses the Hadoop cluster’s resources.

val query = "SELECT META(`mybucket`).id AS id FROM `mybucket` WHERE `myvalue` = 'value'"
val mydata = spark.sparkContext.couchbaseQuery(N1qlQuery.simple(query))
        .repartition(300)
        .map(_.value.getString("id"))
        .couchbaseGet[JsonDocument]()
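Note that the initial N1QL query here still runs in a single task; only after the repartition do the couchbaseGet lookups fan out across the 300 partitions, which is why this is only a partial solution.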

Still, it would be nice to know how to set the number of partitions when loading data from Couchbase, instead of this indirect workaround. I guess this might not work if I have billions of docs in Couchbase.

This is a huge issue, and there is not a single mention of it anywhere. The Spark Couchbase connector is practically useless for data that cannot fit on a single node: all operations overload a single partition. Is there any fix coming up from Couchbase?

As I’m scaling my deployment to 40M documents (expected to double in 2 weeks) on 4 nodes, all the solutions presented above go from impractical to infeasible.

I have managed to PoC the following approaches:

  1. Fetch all IDs with a Couchbase query into a single Spark partition, as presented above by @xuan. Here I observed query timeouts, OOM errors, and backfill limits. On top of that, it was exceptionally slow: several hours passed before the IDs were redistributed across the cluster so that the .gets could start. The deadline SLA for my experimental batch-processing workload was not going to be met by this approach.
  2. Tried to distribute Couchbase queries among Spark partitions so that each partition fires its own query with LIMIT and OFFSET specified, letting every partition acquire a subset of the IDs (or documents); see the sketch after this list. I could not get past the backfill errors, and it was slow as well. (Why is the backfill limit burned into CB 4.5?)
  3. I knew I had to either get my hands dirty or replace Couchbase with another system (for example Cassandra). I designed and implemented an API for Spark that uses DCP clients to stream data from a bucket (from the beginning to now).
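For reference, a minimal sketch of approach 2, assuming the Couchbase Java SDK 2.x API inside each task; pageSize, numPages and the hostname are illustrative values, not my production settings, and in practice this still ran into the backfill limit:

import scala.collection.JavaConverters._
import com.couchbase.client.java.CouchbaseCluster
import com.couchbase.client.java.query.N1qlQuery

val pageSize = 100000
val numPages = 300 // assumption: numPages * pageSize covers all matching docs
val ids = spark.sparkContext
  .parallelize(0 until numPages, numPages)
  .mapPartitions { pages =>
    // each task opens its own SDK connection and pages through the index
    val bucket = CouchbaseCluster.create("node1").openBucket("mybucket")
    pages.flatMap { page =>
      val stmt = s"SELECT META(`mybucket`).id AS id FROM `mybucket` " +
        s"WHERE `myvalue` = 'value' ORDER BY META(`mybucket`).id " +
        s"LIMIT $pageSize OFFSET ${page.toLong * pageSize}"
      bucket.query(N1qlQuery.simple(stmt)).allRows().asScala
        .map(_.value.getString("id"))
    }
  }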

The third solution seems to be fast, although there were some problems:

  • When you stream using DCP from the beginning to now, the DCP client acquires the failover logs for each vBucket, which was exceptionally slow, since it is done one by one. I tried running 10 DCP clients per machine, which slowed it down even more. I couldn’t track down why the failover logs are so slow to acquire.
  • To tackle the problem above, I assign vBucket IDs to Spark partitions (typically more than one vBucket per Spark partition), and one or two preinitialized DCP clients are reused on each Spark executor; see the sketch after this list.
  • DCP client sessions also have to be managed carefully, since within a Spark job the same bucket might be acquired more than once.
  • I use commons-pool2 to manage a DCP client pool.
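A rough sketch of these last points, with assumed names and sizes (numSparkPartitions, the hostname and the pool limit are illustrative, not my production values):

import org.apache.commons.pool2.{BasePooledObjectFactory, PooledObject}
import org.apache.commons.pool2.impl.{DefaultPooledObject, GenericObjectPool}
import com.couchbase.client.dcp.Client

// Group the 1024 vBucket IDs onto Spark partitions: partition p
// streams vBuckets p, p + N, p + 2N, ... for N Spark partitions.
val numVBuckets = 1024
val numSparkPartitions = 64 // assumption
def vbucketsFor(p: Int): Seq[Int] = p until numVBuckets by numSparkPartitions

// commons-pool2 factory for preinitialized DCP clients; at most two
// clients live in each executor JVM and are reused across tasks.
class DcpClientFactory extends BasePooledObjectFactory[Client] {
  override def create(): Client =
    Client.configure().hostnames("node1").bucket("mybucket").build()
  override def wrap(client: Client): PooledObject[Client] =
    new DefaultPooledObject(client)
  override def destroyObject(p: PooledObject[Client]): Unit =
    p.getObject.disconnect().await()
}

lazy val clientPool: GenericObjectPool[Client] = {
  val pool = new GenericObjectPool(new DcpClientFactory)
  pool.setMaxTotal(2)
  pool
}

Tasks then borrowObject() a client, stream their assigned vBuckets, and returnObject() it in a finally block.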

My initial tests show that this approach is very practical. I hope a similar solution will land in the Spark Connector as well. I cannot imagine that anyone is using the stock Spark Connector on its own for any Big Data use case.

Cheers & Merry Christmas,
Zoltán

I’m having problems in several scenarios; it seems they occur when I initiate streaming while mutations are also happening on the cluster.

This is the log of my DCP client:

13:44:18.279 [nioEventLoopGroup-2-6] WARN com.couchbase.client.dcp.Client - Received rollback for vbucket 41 to seqno 0
rollback message for partition 41
13:44:18.280 [nioEventLoopGroup-2-6] INFO com.couchbase.client.dcp.Client - Stopping to Stream for 1 partitions
13:44:18.280 [nioEventLoopGroup-2-6] INFO com.couchbase.client.dcp.Client - Starting to Stream for 1 partitions
13:44:18.280 [nioEventLoopGroup-2-6] WARN c.c.client.dcp.conductor.Conductor - Error during Partition Move for partition 43
com.couchbase.client.dcp.error.RollbackException: null
at com.couchbase.client.dcp.conductor.DcpChannelControlHandler.filterOpenStreamResponse(DcpChannelControlHandler.java:85)
at com.couchbase.client.dcp.conductor.DcpChannelControlHandler.onEvent(DcpChannelControlHandler.java:53)
at com.couchbase.client.dcp.transport.netty.DcpMessageHandler.channelRead(DcpMessageHandler.java:108)
at com.couchbase.client.deps.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:356)
at com.couchbase.client.deps.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:342)
at com.couchbase.client.deps.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:335)
at com.couchbase.client.deps.io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:312)
at com.couchbase.client.deps.io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:299)
at com.couchbase.client.deps.io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:415)
at com.couchbase.client.deps.io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:267)
at com.couchbase.client.deps.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:356)
at com.couchbase.client.deps.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:342)
at com.couchbase.client.deps.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:335)
at com.couchbase.client.deps.io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1302)
at com.couchbase.client.deps.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:356)
at com.couchbase.client.deps.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:342)
at com.couchbase.client.deps.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
at com.couchbase.client.deps.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:135)
at com.couchbase.client.deps.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:646)
at com.couchbase.client.deps.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:581)
at com.couchbase.client.deps.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:498)
at com.couchbase.client.deps.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:460)
at com.couchbase.client.deps.io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:131)
at com.couchbase.client.deps.io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138)
at java.lang.Thread.run(Thread.java:748)
13:44:18.300 [nioEventLoopGroup-2-6] WARN com.couchbase.client.dcp.Client - Received rollback for vbucket 43 to seqno 0
rollback message for partition 43
13:44:18.300 [nioEventLoopGroup-2-6] INFO com.couchbase.client.dcp.Client - Stopping to Stream for 1 partitions
13:44:18.300 [nioEventLoopGroup-2-6] INFO com.couchbase.client.dcp.Client - Starting to Stream for 1 partitions
rollback completed for partition 44
rollback completed for partition 42
rollback completed for partition 41
rollback completed for partition 43
detected stream end 44
13:44:49.806 [nioEventLoopGroup-2-6] INFO c.c.client.dcp.conductor.DcpChannel - Node 2.couchbase.prod.omega/172.100.0.6:11210 socket closed, initiating reconnect.
13:44:50.091 [nioEventLoopGroup-2-7] INFO c.c.client.dcp.conductor.DcpChannel - Connected to Node 2.couchbase.prod.omega/172.100.0.6:11210
13:44:50.207 [nioEventLoopGroup-2-7] WARN c.c.client.dcp.conductor.Conductor - Error during Partition Move for partition 41
com.couchbase.client.dcp.error.RollbackException: null
... (stack trace identical to the first occurrence above, elided)
13:44:50.207 [nioEventLoopGroup-2-7] WARN com.couchbase.client.dcp.Client - Received rollback for vbucket 41 to seqno 0
rollback message for partition 41
13:44:50.207 [nioEventLoopGroup-2-7] INFO com.couchbase.client.dcp.Client - Stopping to Stream for 1 partitions
13:44:50.207 [nioEventLoopGroup-2-7] INFO com.couchbase.client.dcp.Client - Starting to Stream for 1 partitions
13:44:50.222 [nioEventLoopGroup-2-7] WARN c.c.client.dcp.conductor.Conductor - Error during Partition Move for partition 43
com.couchbase.client.dcp.error.RollbackException: null
... (stack trace identical to the first occurrence above, elided)
13:44:50.222 [nioEventLoopGroup-2-7] WARN com.couchbase.client.dcp.Client - Received rollback for vbucket 43 to seqno 0
rollback message for partition 43
13:44:50.222 [nioEventLoopGroup-2-7] INFO com.couchbase.client.dcp.Client - Stopping to Stream for 1 partitions
13:44:50.222 [nioEventLoopGroup-2-7] INFO com.couchbase.client.dcp.Client - Starting to Stream for 1 partitions
13:44:50.222 [nioEventLoopGroup-2-7] WARN c.c.client.dcp.conductor.Conductor - Error during Partition Move for partition 42
com.couchbase.client.dcp.error.RollbackException: null
... (stack trace identical to the first occurrence above, elided)
13:44:50.238 [nioEventLoopGroup-2-7] WARN com.couchbase.client.dcp.Client - Received rollback for vbucket 42 to seqno 0
rollback message for partition 42
13:44:50.238 [nioEventLoopGroup-2-7] INFO com.couchbase.client.dcp.Client - Stopping to Stream for 1 partitions
13:44:50.238 [nioEventLoopGroup-2-7] INFO com.couchbase.client.dcp.Client - Starting to Stream for 1 partitions
rollback completed for partition 41
rollback completed for partition 43
rollback completed for partition 42
detected stream end 41
detected stream end 43
13:45:22.890 [nioEventLoopGroup-2-7] INFO c.c.client.dcp.conductor.DcpChannel - Node 2.couchbase.prod.omega/172.100.0.6:11210 socket closed, initiating reconnect.
13:45:23.150 [nioEventLoopGroup-2-8] INFO c.c.client.dcp.conductor.DcpChannel - Connected to Node 2.couchbase.prod.omega/172.100.0.6:11210
13:45:23.270 [nioEventLoopGroup-2-8] WARN c.c.client.dcp.conductor.Conductor - Error during Partition Move for partition 42
com.couchbase.client.dcp.error.RollbackException: null
... (stack trace identical to the first occurrence above, elided)
13:45:23.270 [nioEventLoopGroup-2-8] WARN com.couchbase.client.dcp.Client - Received rollback for vbucket 42 to seqno 0
rollback message for partition 42
13:45:23.270 [nioEventLoopGroup-2-8] INFO com.couchbase.client.dcp.Client - Stopping to Stream for 1 partitions
13:45:23.270 [nioEventLoopGroup-2-8] INFO com.couchbase.client.dcp.Client - Starting to Stream for 1 partitions
rollback completed for partition 42
detected stream end 42
13:45:36.070 [nioEventLoopGroup-2-8] INFO c.c.client.dcp.conductor.DcpChannel - Node 2.couchbase.prod.omega/172.100.0.6:11210 socket closed, initiating reconnect.
Total Docs: 2060894
Total Bytes: 4007184913
Average Size per Doc: 1944b
13:45:36.354 [main] INFO com.couchbase.client.dcp.Client - Starting to Stream for 5 partitions
13:45:36.738 [nioEventLoopGroup-2-1] INFO c.c.client.dcp.conductor.DcpChannel - Connected to Node 2.couchbase.prod.omega/172.100.0.6:11210
detected stream end 48
detected stream end 47
detected stream end 49
detected stream end 46
detected stream end 45
13:46:54.590 [nioEventLoopGroup-2-1] INFO c.c.client.dcp.conductor.DcpChannel - Node 2.couchbase.prod.omega/172.100.0.6:11210 socket closed, initiating reconnect.
Total Docs: 2200213
Total Bytes: 4276676309
Average Size per Doc: 1943b
13:46:54.775 [main] INFO com.couchbase.client.dcp.Client - Starting to Stream for 5 partitions
13:46:54.800 [nioEventLoopGroup-2-2] INFO c.c.client.dcp.conductor.DcpChannel - Connected to Node 2.couchbase.prod.omega/172.100.0.6:11210

Logs of DCP producer:

2017-12-26T12:44:42.600609Z WARNING (general) DCP (Producer) eq_dcpq:dcp-java-26695272-8e25-4bf6-b026-0141db54d7d1 - (vb 42) Stream request failed because a rollback to seqno 0 is required (start seqno 134865, vb_uuid 244630521369222, snapStartSeqno 0, snapEndSeqno 195461)
2017-12-26T12:44:42.619577Z NOTICE (general) DCP (Producer) eq_dcpq:dcp-java-26695272-8e25-4bf6-b026-0141db54d7d1 - (vb 41) Creating stream with start seqno 0 and end seqno 193269
2017-12-26T12:44:42.619630Z NOTICE (general) DCP (Producer) eq_dcpq:dcp-java-26695272-8e25-4bf6-b026-0141db54d7d1 - (vb 41) Scheduling backfill from 1 to 193269, reschedule flag : False
2017-12-26T12:44:42.619792Z NOTICE (general) DCP (Producer) eq_dcpq:dcp-java-26695272-8e25-4bf6-b026-0141db54d7d1 - (vb 41) Sending disk snapshot with start seqno 0 and end seqno 193269
2017-12-26T12:44:42.635528Z NOTICE (general) DCP (Producer) eq_dcpq:dcp-java-26695272-8e25-4bf6-b026-0141db54d7d1 - (vb 43) Creating stream with start seqno 0 and end seqno 184519
2017-12-26T12:44:42.635574Z NOTICE (general) DCP (Producer) eq_dcpq:dcp-java-26695272-8e25-4bf6-b026-0141db54d7d1 - (vb 43) Scheduling backfill from 1 to 184519, reschedule flag : False
2017-12-26T12:44:42.641961Z NOTICE (general) DCP (Producer) eq_dcpq:dcp-java-26695272-8e25-4bf6-b026-0141db54d7d1 - (vb 43) Sending disk snapshot with start seqno 0 and end seqno 184521
2017-12-26T12:44:42.642014Z NOTICE (general) DCP (Producer) eq_dcpq:dcp-java-26695272-8e25-4bf6-b026-0141db54d7d1 - (vb 42) Creating stream with start seqno 0 and end seqno 195429
2017-12-26T12:44:42.642039Z NOTICE (general) DCP (Producer) eq_dcpq:dcp-java-26695272-8e25-4bf6-b026-0141db54d7d1 - (vb 42) Scheduling backfill from 1 to 195429, reschedule flag : False
2017-12-26T12:44:42.663184Z NOTICE (general) DCP (Producer) eq_dcpq:dcp-java-26695272-8e25-4bf6-b026-0141db54d7d1 - (vb 42) Sending disk snapshot with start seqno 0 and end seqno 195463
2017-12-26T12:44:51.735900Z WARNING (session) Illegal value for lock timeout specified 30000. Using default value: 15
2017-12-26T12:45:07.079409Z NOTICE (general) DCP (Producer) eq_dcpq:dcp-java-26695272-8e25-4bf6-b026-0141db54d7d1 - (vb 41) Backfill complete, 24553 items read from disk, 3744 from memory, last seqno read: 193269
2017-12-26T12:45:11.250123Z NOTICE (general) DCP (Producer) eq_dcpq:dcp-java-26695272-8e25-4bf6-b026-0141db54d7d1 - (vb 42) Backfill complete, 24579 items read from disk, 3995 from memory, last seqno read: 195463
2017-12-26T12:45:12.756968Z WARNING (session) Illegal value for lock timeout specified 30000. Using default value: 15
2017-12-26T12:45:12.868444Z NOTICE (general) DCP (Producer) eq_dcpq:dcp-java-26695272-8e25-4bf6-b026-0141db54d7d1 - (vb 43) Backfill complete, 24409 items read from disk, 3922 from memory, last seqno read: 184521
2017-12-26T12:45:13.984844Z WARNING (session) Illegal value for lock timeout specified 30000. Using default value: 15
2017-12-26T12:45:14.890440Z NOTICE (general) DCP (Producer) eq_dcpq:dcp-java-26695272-8e25-4bf6-b026-0141db54d7d1 - (vb 41) Stream closing, sent until seqno 193269 remaining items 0, reason: The stream ended due to all items being streamed
2017-12-26T12:45:14.895479Z NOTICE (general) DCP (Producer) eq_dcpq:dcp-java-26695272-8e25-4bf6-b026-0141db54d7d1 - (vb 41) Stream closed, 28297 items sent from backfill phase, 0 items sent from memory phase, 193269 was last seqno sent
2017-12-26T12:45:15.164722Z NOTICE (general) DCP (Producer) eq_dcpq:dcp-java-26695272-8e25-4bf6-b026-0141db54d7d1 - (vb 43) Stream closing, sent until seqno 184521 remaining items 0, reason: The stream ended due to all items being streamed
2017-12-26T12:45:15.164801Z NOTICE (general) DCP (Producer) eq_dcpq:dcp-java-26695272-8e25-4bf6-b026-0141db54d7d1 - (vb 43) Stream closed, 28331 items sent from backfill phase, 0 items sent from memory phase, 184521 was last seqno sent
2017-12-26T12:45:15.268669Z WARNING (general) DCP (Producer) eq_dcpq:dcp-java-26695272-8e25-4bf6-b026-0141db54d7d1 - (vb 43) Stream request failed because the start seqno (184521) is larger than the end seqno (184519); Incorrect params passed by the DCP client
2017-12-26T12:45:15.269692Z NOTICE (general) DCP (Producer) eq_dcpq:dcp-java-26695272-8e25-4bf6-b026-0141db54d7d1 - (vb 42) Stream closing, sent until seqno 191974 remaining items 0, reason: The stream closed early because the conn was disconnected
2017-12-26T12:45:15.668092Z WARNING (general) DCP (Producer) eq_dcpq:dcp-java-00d4c092-3989-4beb-a166-d0172594fdf2 - (vb 42) Stream request failed because a rollback to seqno 0 is required (start seqno 191974, vb_uuid 244630521369222, snapStartSeqno 0, snapEndSeqno 195463)
2017-12-26T12:45:15.681594Z NOTICE (general) DCP (Producer) eq_dcpq:dcp-java-00d4c092-3989-4beb-a166-d0172594fdf2 - (vb 42) Creating stream with start seqno 0 and end seqno 195429
2017-12-26T12:45:15.681633Z NOTICE (general) DCP (Producer) eq_dcpq:dcp-java-00d4c092-3989-4beb-a166-d0172594fdf2 - (vb 42) Scheduling backfill from 1 to 195429, reschedule flag : False
2017-12-26T12:45:15.681767Z NOTICE (general) DCP (Producer) eq_dcpq:dcp-java-00d4c092-3989-4beb-a166-d0172594fdf2 - (vb 42) Sending disk snapshot with start seqno 0 and end seqno 195465
2017-12-26T12:45:21.642043Z WARNING (session) Illegal value for lock timeout specified 30000. Using default value: 15
2017-12-26T12:45:22.962050Z NOTICE (general) DCP (Producer) eq_dcpq:dcp-java-00d4c092-3989-4beb-a166-d0172594fdf2 - (vb 42) Backfill complete, 24579 items read from disk, 3995 from memory, last seqno read: 195465
2017-12-26T12:45:28.367234Z NOTICE (general) DCP (Producer) eq_dcpq:dcp-java-00d4c092-3989-4beb-a166-d0172594fdf2 - (vb 42) Stream closing, sent until seqno 195465 remaining items 0, reason: The stream ended due to all items being streamed
2017-12-26T12:45:28.367283Z NOTICE (general) DCP (Producer) eq_dcpq:dcp-java-00d4c092-3989-4beb-a166-d0172594fdf2 - (vb 42) Stream closed, 28574 items sent from backfill phase, 0 items sent from memory phase, 195465 was last seqno sent
2017-12-26T12:45:28.465920Z WARNING (general) DCP (Producer) eq_dcpq:dcp-java-00d4c092-3989-4beb-a166-d0172594fdf2 - (vb 42) Stream request failed because the start seqno (195465) is larger than the end seqno (195429); Incorrect params passed by the DCP client
2017-12-26T12:45:28.761714Z WARNING (session) Illegal value for lock timeout specified 30000. Using default value: 15

I start DCP streams 5 at a time; when these 5 streams complete, I go on with the next partitions, again 5 at a time (see the sketch below).
When the streams for all 1024 partitions are started at the same time, there is no problem.
There are 4 nodes in the cluster.
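In pseudo-Scala, the driver loop looks roughly like this (streamToCompletion is a hypothetical placeholder for the code that opens DCP streams for the given vBuckets and blocks until each signals stream end):

// Hypothetical helper: stream the given vBuckets from BEGINNING to NOW
// and block until every one of them reports stream end.
def streamToCompletion(vbuckets: Seq[Int]): Unit = ???

// Process the 1024 vBuckets in batches of 5.
(0 until 1024).grouped(5).foreach(batch => streamToCompletion(batch))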

There is also a bug in the current DCP client implementation, in the rollbackToPosition method of SessionState. The line where flog.remove is called will always throw an exception, since the iterator acquired a few lines earlier does not support remove. I’ve fixed this issue for my tests; a reconstruction of the pitfall is sketched below.
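A hypothetical reconstruction (not the actual DCP client code; it assumes the failover log lives in a CopyOnWriteArrayList, whose snapshot iterators never support remove, and Scala 2.12 for the removeIf lambda):

import java.util.concurrent.CopyOnWriteArrayList

val flog = new CopyOnWriteArrayList[java.lang.Long]()
flog.add(100L); flog.add(200L); flog.add(300L)

val it = flog.iterator()
it.next()
// CopyOnWriteArrayList iterators are snapshots and reject mutation,
// so this remove() always throws UnsupportedOperationException.
try it.remove()
catch { case e: UnsupportedOperationException => println(s"always thrown: $e") }

// A safe alternative is to mutate through the collection itself:
flog.removeIf(seqno => seqno > 150L)
println(flog) // prints [100]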

I suspect that there are other problems in the DCP client implementation. As far as I can see, the client acquires seqnos which are not valid for the partitions. I always want to start a stream from 0; it is not clear to me why the DCP client would start 200000 later.

@daschl do you have any idea?

Due to Christmas I have very limited time to continue with this, but I found another issue with the Java DCP implementation. When the stream is being closed in BEGINNING -> NOW scenarios, this bug forces the DCP client to get stuck in an endless loop of RollbackExceptions.

See https://github.com/zzvara/java-dcp-client/commits/master

@david.nault might be able to help you out on the DCP issues here; he is the one maintaining it at the moment (and @avsej might also jump in).

Thanks Michael. @zoltan.zvara, thank you for the pull request. I have a question about whether it’s necessary to add a new hasPassedEnd method, or whether it makes more sense to modify isAtEnd to also return true for the “past end” state. Let’s continue the discussion on Gerrit: http://review.couchbase.org/c/87509/

Cheers,
David

As of today, it appears that the Spark connector still reads everything into a single partition, with no concurrency. Is there a reason this has not been addressed yet? Parallelism is the primary purpose of using Spark. @david.nault @daschl