What is the mechanism to pass the query results from couchbase to spark for further processing?

#1

As far as I understand, when we make a view query, one of the data service nodes gets it and coordinates to scatter the query to the other nodes. Once the query finishes, that node coordinates to collect the results and return to the user.

My question is when this is combined with spark (using spark-couchbase-connector), when the results are obtained from all the nodes, what is the mechanism to pass the RDD results to a number of spark executors?

Does the coordinating node in couchbase asks all the nodes working on the query to pass the RDD to spark executor nodes by themselves? or Does the coordinating node forward the RDD from all the couchbase working nodes to the spark executor nodes? (it seems the former one is preferred, but I want to understand how the current spark-couchbase-connector works. )

thanks
edward

#2

Hi Edward,

Good question. Couchbase Server has data locality when you’re using the key value operations but not with views and N1QL queries, for exactly the reason you mentioned. View queries are executed by scatter gather so data locality wouldn’t really help much. By that I mean, even though the Couchbase Spark Connector knows where any individual key resides, if it calls the view interface, the views will execute the query with a broadcast.

There are different deployment architectures you can choose for running the Couchbase Spark Connector, so if you have a case like this, you might choose to run your Spark job on a single executor node (not always sensible, but it might be if you don’t have a lot of data that you need to get back). That just saves you having to distribute the data to multiple Spark nodes if it isn’t really necessary for other reasons.

CC @daschl in case he wants to improve my answer.

#3

Thank you Will !

My use case is to retrieve all the keys from ViewQuery or N1ql query, then fetch all the jsonDocuments associated with the keys. I guess I have to distribute the keys retrieved to multiple spark nodes to achieve better parallelism when it comes to retrieve the jsonDocuments.

Furthermore, I am curious about how couchbase scales on the ViewQuery. If all the View queries are executed by scatter gather, and it runs on a single executor node, what if the query results size goes beyond the total RAM of that node when it comes down to gather stage? Will it spill to the disk of that node? Will it significantly slow down the working node? Is it designed to handle this scenario?

thanks
edward

#4

You have two options here: perform a view query with or without fetching the documents right away with the view query. Depending on the size of the query the first one might be appropriate but if you get into the hundreds of thousands you might be better off doing it separately. If you want the connector to fetch the docs for you right away set the “includeDocs” param on the ViewQuery, then in the RDD you’ll get the document delivered with it.

If you really get into high volume doing what you said is another way to go, just fetching the IDs from the view and then redistributing it. Note that currently the “preferred locations” feature for KV is only available on the SparkContext when the RDD is created, I’m currently investigating how I can provide the preferred locations even when you use the RDD[ViewQuery].couchbaseView API, basically. SO TL;DR is I think stick with includeDocs for now and see if it works, I’m pretty sure it will.

View Queries in Couchbase Server are delivered from disk (they are stored there). The Btree lookup is efficient, but if you need all in memory indexing I recommend to use N1QL on 4.5 instead (will be GA very soon and you can try it out with the beta right now) where you can create memory optimized indexes, which has the additional benefit so that you can use Spark SQL with it if you want.

If you can shed more light on your actual view use case then I’d be happy to provide more feedback :slight_smile:

#5

Thank you so much @dachl !
My use case is as follows:

  1. retrieve jsondocument from bucket based on a certain value of a field. I can use a predefined view or n1ql query for that.
  2. extract features from jsondocument.
  3. build machine learning model based on the features and labels.

I prefer to leverage spark to pipeline these steps such that the entire processing is distributed, thus converting data into RDD is preferred.

I fully understand the two options you advised. I do want to get to the bottom of how the RDD is obtained when doing a ViewQuery. When you say get the RDD right away with the view query using spark-connector, I guess the spark executor which runs the ViewQuery probably reads the results sequentially and converts them into RDDs since there is no preferred location feature for RDD[ViewQuery].couchbaseView API.

Is my understanding correct?

Another way I can think of is to implement map-reduce function for a feature view, then I concern about the flexibility and complexity couchbase can provide to a map/reduce function in a view definition, as well as how I can efficiently pass the View results as input to machine learning library in spark.

best,
edward

#6

Correct, because under the covers the SDK “round robins” between all available view endpoints on the server.

Actually for your specific case, if you can wait for Couchbase Server 4.5 there is another very performant option you can think of.

  1. create a secondary index on field
  2. create an RDD out of SELECT META().id as id FROM bucket WHERE field = “bla”
  3. then you’ll get basically a RDD with all the ids
  4. then you can perform a subdocument lookup to extract only just the fields you need from the json document

Or alternatively, use Spark SQL straight away::

  1. create a secondary index on field
  2. create a dataframe with a filter depending on the type of docs
  3. use a select with a where clause for your field, the predicate will be pushed down to N1QL
  4. pass that DF to machine learning

So while I think you can do that with views, doing it with N1QL might give you better perf if you use MOI (memory optimized indexes) with Server 4.5. Using views might be fine too :slight_smile: