spark.sparkContext.couchbaseQuery number of partitions

As I’m scaling my deployment to 40M documents (and expected to double in 2 weeks) on 4 nodes, all the solutions presented above transition from impractical to infeasible.

I have managed to PoC out the following approaches:

  1. Fetch all IDs using a Couchbase query to a single Spark partition as presented above by @xuan. Here I observed query timeouts, OOM errors, backfill limits. On the other hand, it was exceptionally slow. Several hours passed and I got to the point where I had IDs redistributed on the cluster, so that the .gets could start. The deadline SLA I specified in this setting for my experimental batch processing workload was not going to be met by this approach.
  2. Tried to distribute Couchbase queries among Spark partitions in a way that each partitions fires up a query, but with LIMIT and OFFSET specified, so that all partitions acquire a subset of IDs (or documents). Could not get pass the backfill errors, and it was slow as well. (Why is the backfill limit burned into CB 4.5?)
  3. I knew I either get my hands dirty or replace Couchbase with another system (for example Cassandra). I designed and implemented an API to Spark, that would use DCP clients to stream data from a bucket (from the beginning to now).

The third solution seems to be fast, although there were some problems:

  • When you stream using DCP from the beginning to now, the DCP client will acquire failover logs for each vBucket, that was exceptionally slow, since it is done 1 by 1. I tried to run 10 DCP clients per machine, so it slowed it down even more. I couldn’t track down why failover logs are so slow to acquire.
  • To tackle the problem above, I assign vBucket IDs to Spark partitions (probably more then one vBucket is assigned to a Spark partition), and 1 or 2 preinitialized DCP client is reused on each Spark executor.
  • DCP client sessions has to be managed carefully as well, since within a Spark job, the same bucket might be acquired more than once.
  • I use commons-pool2 to manage a DCP client pool.

My initial tests shows that this approach is very practical. I hope a similar solution will hit the Spark Connector as well. I can not imagine that anyone is using the stock Spark Connector on its own for any Big Data use-case.

Cheers & Christmas,
Zoltán