How to run a distributed query and get different sections of the result to each executor

I’ve got quite a large dataset in Couchbase that I’m processing in parallel. However, the result of the initial query is getting too big to fit on a single machine, and repartitioning the data once the full query result has arrived is very slow. Is there any way to send this query to the executors to be run in parallel, so that each executor gets a different section of the results?

I’m on 2.2.0 of Spark / Couchbase connector.

Unfortunately, Couchbase currently has no way to split up a N1QL query for you. The individual queries would not be coordinated and it is not clear how to shard the query properly, so the results could not be reassembled in exactly the original form.

You’d have to run the query on one executor; I think Spark then provides a way to repartition the response across multiple workers based on some information available in the data payload. Would that option work for you?

Unfortunately not, that’s exactly the issue. The query result won’t fit on a single executor. When you say individual queries aren’t coordinated, do you mean that OFFSET and LIMIT clauses will produce some duplicate results? I’m considering distributing queries that will return a range of the data to the executors instead.
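To make the range idea concrete, here is a minimal sketch of generating one query string per page. It is only an illustration: `pagedQueries`, the `events` bucket, the `WHERE` clause, and the row count are all made-up names and values, and it assumes the total number of rows is known up front (for example from a separate `COUNT(*)` query). Ordering by a unique key such as `META(d).id` keeps the pages from overlapping, but only as long as the data and indexes do not change between queries.

```scala
// Hypothetical helper: builds one N1QL statement per page of the result.
// baseQuery must not already contain ORDER BY, LIMIT, or OFFSET.
def pagedQueries(baseQuery: String, orderBy: String, pageSize: Int, totalRows: Long): Seq[String] = {
  require(pageSize > 0, "pageSize must be positive")
  // Round up so the final, partially filled page is included.
  val numPages = ((totalRows + pageSize - 1) / pageSize).toInt
  (0 until numPages).map { page =>
    val offset = page.toLong * pageSize
    s"$baseQuery ORDER BY $orderBy LIMIT $pageSize OFFSET $offset"
  }
}

// Example values only: 2500 rows split into pages of 1000.
val queryStrings = pagedQueries(
  "SELECT d.* FROM `events` d WHERE d.type = 'click'",
  "META(d).id",
  1000,
  2500L
)
queryStrings.foreach(println)
```

The resulting `queryStrings` could then be handed to `sc.parallelize` so that each executor runs a single page.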

@lukeb it depends on the type of workload you have. Since the indexes are updated in the background, three independent queries could return different results depending on the state of the indexes at the time each query runs. If you know this is not a problem in your environment (i.e. no updates are going on at query time, or those updates do not affect the indexes), or you are okay with a little “fuzziness”, then this might be a viable option. It’s just that we cannot provide those guarantees in the general case.

I’m attempting to run different portions of the query using OFFSET / LIMIT / ORDER BY, but I’m running into this issue now:

java.lang.IllegalStateException: The content of this Observable (querySignature.c05a72da-8aa5-493c-bf15-3c7dbd48e753) is already released. Subscribe earlier or tune the CouchbaseEnvironment#autoreleaseAfter() setting.

It happens while mapping each of the queries into their result rows:

val paginatedQuery = sc.parallelize(queryStrings, numPartitions)
val responses = paginatedQuery.map(page => RunQueryPage(page))
val results = responses.flatMap(r => r)

Each executor runs this method to run the query and get the results:

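  import scala.collection.JavaConverters._

  def RunQueryPage(queryString: String) : List[String] = {
    // Blocking query through the synchronous Bucket API
    val queryResult = SingContext.jobBucket.query(N1qlQuery.simple(queryString))
    // allRows() returns a java.util.List, so convert it before mapping
    val rows = queryResult.allRows().asScala
    rows.map(r => r.value().toString).toList
  }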

I don’t see any Observables anywhere in the N1qlQueryResult object, so I have no idea what is timing out.

It seems that using the regular Bucket object on the executors hits the autoreleaseAfter limit for any query that doesn’t return almost instantly. Moving to the async API fixed the issue for me.
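For anyone landing here later, this is a rough sketch of what an async version of `RunQueryPage` might look like. It is not the exact code behind the fix above. It assumes the Couchbase Java SDK 2.x (which uses RxJava 1) is on the classpath, and `runQueryPageAsync` and its `bucket` parameter are made-up names. It subscribes to the row stream directly and blocks until all rows have arrived. The anonymous `Func1` classes are there because Scala 2.11 does not convert Scala lambdas to Java functional interfaces.

```scala
import com.couchbase.client.java.Bucket
import com.couchbase.client.java.query.{AsyncN1qlQueryResult, AsyncN1qlQueryRow, N1qlQuery}
import rx.Observable
import rx.functions.Func1
import scala.collection.JavaConverters._

// Hypothetical replacement for RunQueryPage that goes through bucket.async().
def runQueryPageAsync(bucket: Bucket, queryString: String): List[String] = {
  bucket.async()
    .query(N1qlQuery.simple(queryString))
    // Switch from the result wrapper to its stream of rows.
    .flatMap[AsyncN1qlQueryRow](new Func1[AsyncN1qlQueryResult, Observable[AsyncN1qlQueryRow]] {
      override def call(result: AsyncN1qlQueryResult): Observable[AsyncN1qlQueryRow] =
        result.rows()
    })
    // Render each row's JSON value as a string.
    .map[String](new Func1[AsyncN1qlQueryRow, String] {
      override def call(row: AsyncN1qlQueryRow): String = row.value().toString
    })
    .toList()      // collect all rows into one java.util.List
    .toBlocking()  // block the calling (executor) thread
    .single()      // wait for the collected list
    .asScala
    .toList
}
```

On the executors this would be called the same way as before, e.g. `paginatedQuery.flatMap(q => runQueryPageAsync(SingContext.jobBucket, q))`. Note that this sketch ignores the `errors()` stream of the result, so a failed query may simply come back as an empty list.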
