Dataset join with document ID

Hey,

I raised this issue on the Gitter chat a few weeks ago (https://gitter.im/couchbase/discuss?at=588b5b5d5309d6b35875ae21). I’m using the Spark Couchbase Connector to join and persist records to a Couchbase bucket.

Given a dataset of documents like this:

{
    "key": "123",
    "foo": "bar",
    "fooStuff": [
        {
            "moreStuff": "bar"
        }
    ]
}

I persist to Couchbase like so:

val couchbaseConf = Map("bucket" -> "myBucket", "idField" -> "key")
myDataset.toDF().write.mode(SaveMode.Overwrite).couchbase(couchbaseConf)

When I do this, I notice that the connector extracts my “key” field and uses it as the document ID, but does not persist “key” in the document itself. I’m left with a document in Couchbase like this:

{
    "foo": "bar",
    "fooStuff": [
        {
            "moreStuff": "bar"
        }
    ]
}

I understand the reasoning for doing this, but what am I to do if later in my job I would like to perform a Dataset join? I no longer have the “key” field to join on in the documents in the Couchbase bucket. My workaround is to duplicate the key in my document like this:

{
    "couchbaseId": "123",
    "key": "123",
    "foo": "bar",
    "fooStuff": [
        {
            "moreStuff": "bar"
        }
    ]
}

Then persist to Couchbase like so:

val couchbaseConf = Map("bucket" -> "myBucket", "idField" -> "couchbaseId")
myDataset.toDF().write.mode(SaveMode.Overwrite).couchbase(couchbaseConf)

That way I can perform a join between the documents I’m streaming in and those that already exist in my Couchbase bucket:

val couchbaseDataset = sparkSession.sqlContext.read.couchbase(couchbaseConf)
streamedDataset.joinWith(couchbaseDataset, streamedDataset.col("key") === couchbaseDataset.col("key"), "inner")

This works, but seems clunky. Is there a way to join by the Couchbase document ID using the Dataset API?

Thanks,
Sean

Bonus Question:

If my Couchbase bucket contains 1,000,000 documents, but I am only joining it with a local Dataset that contains 1 document, will the connector attempt to load all 1,000,000 bucket documents into the Spark cluster?

@daschl Is this something you could assist me with?

@sean.glover Unfortunately, I think the way you are doing it is the only real way right now. We could think about adding another option that keeps the ID field in the document on write instead of removing it. Would that improve your situation?

@daschl Thanks for the response. That would make the solution a little bit cleaner.

However, now I’m questioning the choice of using a DataFrame join to begin with, based on what I mentioned in the “Bonus Question”. Can you elaborate on what happens when you perform a join? What would you recommend for the lookup I’m trying to do (a small in-memory dataset against a large Couchbase bucket/dataset)?

Here is the problem: when you perform a join in Spark SQL, it doesn’t push down the join at all. The only things “we” get are the required columns and the filters (basically the field names from the SELECT and the WHERE clause).

So the Spark connector has no idea to begin with that you are joining things together. The only way to make it work is to run a raw N1QL query through an RDD and then create a DataFrame out of it.
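As a rough sketch of that approach (not a tested recipe), a raw N1QL query can be issued through the connector's couchbaseQuery RDD method and the resulting rows turned into a DataFrame. The node address, the empty bucket password, the statement itself and the "id" alias are assumptions made for illustration, and a reachable Couchbase cluster with N1QL enabled is required:

```scala
import com.couchbase.client.java.query.N1qlQuery
import com.couchbase.spark._
import org.apache.spark.sql.SparkSession

object N1qlToDataFrameSketch {
  def main(args: Array[String]): Unit = {
    // Connection settings are placeholders: adjust node, bucket and password.
    val spark = SparkSession.builder()
      .appName("n1ql-to-dataframe-sketch")
      .master("local[*]")
      .config("spark.couchbase.nodes", "127.0.0.1")
      .config("spark.couchbase.bucket.myBucket", "")
      .getOrCreate()

    // Hypothetical statement: any N1QL (including JOINs) runs inside Couchbase,
    // so only the result rows travel to Spark. META(b).id exposes the document ID.
    val statement =
      "SELECT META(b).id AS id, b.* FROM `myBucket` AS b WHERE b.foo = 'bar'"

    // couchbaseQuery returns an RDD of rows, each wrapping a JsonObject.
    val rows = spark.sparkContext.couchbaseQuery(N1qlQuery.simple(statement))

    // Serialize each row to a JSON string and let Spark infer the schema.
    val df = spark.read.json(rows.map(_.value.toString))
    df.printSchema()
    df.show()

    spark.stop()
  }
}
```

Because the query text is opaque to Spark, any filtering or joining you want done on the Couchbase side has to be written into the statement itself.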

@daschl I see. Thanks for that. What if I were to insert my small dataset into a “temporary” bucket and perform a raw N1QL query to join on that? I’m fairly new to Couchbase, so I’m not sure if such a thing is possible, but if I were to do this with a standard RDBMS that would probably be my most efficient option.

@sean.glover No need to move data into a different bucket. N1QL supports cross-document JOINs within a bucket, and even joins within a single document :)

@daschl Cool. But what’s happening in my app is that I’m streaming in new documents from an entirely different source (Kafka). These new documents have IDs that I need to look up efficiently in an existing Couchbase bucket. There could be thousands of IDs to look up within a single Spark Streaming interval. If I were to insert that data temporarily into Couchbase, then I could do a cross-bucket join.

Does that make sense or would you propose a different strategy?

@sean.glover If you know the document IDs, why not use direct KV access by document ID? You can still create a DataFrame out of the results if you need to.

@daschl So I would use the underlying Java SDK to perform a bulk get of many documents? I found this documentation on using RxJava for bulk operations, but it looks like an older resource (http://docs.couchbase.com/developer/java-2.0/documents-bulk.html). Can you point me in the right direction on how to do a bulk retrieval? Is it just one giant N1QL statement with a large filter (i.e. a WHERE clause with 1000 document IDs)?

This section has content on creating RDDs: https://developer.couchbase.com/documentation/server/4.5/connectors/spark-2.0/working-with-rdds.html. You can use couchbaseGet[JsonDocument] with a sequence of document IDs, and you’ll get back an RDD of JsonDocument that you can process however you need. Would that fit your needs?
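A minimal sketch of that lookup, assuming the same hypothetical connection settings as elsewhere in this thread (node at 127.0.0.1, bucket "myBucket" with an empty password) and two made-up document IDs. Copying the document ID back into a "key" field is an illustrative choice so the result can be joined on it, not something the connector does for you:

```scala
import com.couchbase.client.java.document.JsonDocument
import com.couchbase.spark._
import org.apache.spark.sql.SparkSession

object BulkGetSketch {
  def main(args: Array[String]): Unit = {
    // Connection settings are placeholders: adjust node, bucket and password.
    val spark = SparkSession.builder()
      .appName("bulk-get-sketch")
      .master("local[*]")
      .config("spark.couchbase.nodes", "127.0.0.1")
      .config("spark.couchbase.bucket.myBucket", "")
      .getOrCreate()

    // In a streaming job these would be the IDs collected from the current batch.
    val ids = Seq("123", "456")

    // Fetches only the listed documents via KV access; no bucket scan is involved.
    val docs = spark.sparkContext.couchbaseGet[JsonDocument](ids)

    // Re-attach the document ID as a "key" field, then serialize to JSON strings.
    val json = docs.map(doc => doc.content().put("key", doc.id()).toString)

    // Let Spark infer a schema so the result can be joined like any other DataFrame.
    val couchbaseDf = spark.read.json(json)
    couchbaseDf.show()

    spark.stop()
  }
}
```

Note that spark.read.json on an RDD of strings is the Spark 2.0/2.1 form; later Spark versions prefer passing a Dataset[String].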

@daschl Yes, that should work. I’ll try it. Thanks.


@sean.glover I merged a change into master that lets you set “removeIdField” to false (http://review.couchbase.org/#/c/73569/). It will be available in the upcoming release.
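With that change, the write from the original post could presumably look like the fragment below. The option name comes from the change mentioned above; passing it as the string "false" in the same options map is an assumption that should be checked against the released connector. myDataset is the Dataset from the original post:

```scala
import com.couchbase.spark.sql._
import org.apache.spark.sql.SaveMode

// Assumed usage: keep the "key" field in the stored document while still
// using it as the document ID. Option values in this map are strings.
val couchbaseConf = Map(
  "bucket" -> "myBucket",
  "idField" -> "key",
  "removeIdField" -> "false"
)
myDataset.toDF().write.mode(SaveMode.Overwrite).couchbase(couchbaseConf)
```

If this behaves as described, the duplicated “couchbaseId” field from the workaround would no longer be needed.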