Hey,
I raised this issue on the Gitter chat a few weeks ago (https://gitter.im/couchbase/discuss?at=588b5b5d5309d6b35875ae21). I'm using the Spark Couchbase Connector to join and persist records to a Couchbase bucket.
Given a dataset of documents like this:
{
  "key": "123",
  "foo": "bar",
  "fooStuff": [
    {
      "moreStuff": "bar"
    }
  ]
}
I persist to Couchbase like so:
val couchbaseConf = Map("bucket" -> "myBucket", "idField" -> "key")
myDataset.toDF().write.mode(SaveMode.Overwrite).couchbase(couchbaseConf)
When I do this, I notice that the connector extracts my "key" field and uses it as the document ID, but then does not persist "key" in the document body itself. I'm left with a document in Couchbase like this:
{
  "foo": "bar",
  "fooStuff": [
    {
      "moreStuff": "bar"
    }
  ]
}
I understand the reasoning for doing this, but what should I do if, later in my job, I want to perform a Dataset join? I no longer have a "key" field to join on in the documents in the Couchbase bucket. My workaround is to duplicate the key in my document like this:
{
  "couchbaseId": "123",
  "key": "123",
  "foo": "bar",
  "fooStuff": [
    {
      "moreStuff": "bar"
    }
  ]
}
Then persist to Couchbase like so:
val couchbaseConf = Map("bucket" -> "myBucket", "idField" -> "couchbaseId")
myDataset.toDF().write.mode(SaveMode.Overwrite).couchbase(couchbaseConf)
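For reference, I produce the duplicated ID column with something like this (a minimal sketch using the standard Spark SQL `withColumn` API; the column names are just the ones from my example above):

```scala
import org.apache.spark.sql.functions.col

// Copy the join key into a separate column that the connector
// consumes as the document ID, leaving "key" in the document body.
val withId = myDataset.toDF().withColumn("couchbaseId", col("key"))
```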
That way I can join the documents I'm streaming in with those that already exist in my Couchbase bucket:
val couchbaseDataset = sparkSession.sqlContext.read.couchbase(couchbaseConf)
streamedDataset.joinWith(couchbaseDataset, streamedDataset.col("key") === couchbaseDataset.col("key"), "inner")
This works, but seems clunky. Is there a way to join by the Couchbase document ID using the Dataset API?
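Something along these lines is what I'm hoping for (a sketch only; "META_ID" here is whatever column the connector might expose the document ID as on DataFrame reads, I haven't found this documented for the Dataset API):

```scala
// Hypothetical: join the streamed Dataset's "key" directly against
// the document ID column of the Couchbase-backed DataFrame.
val couchbaseDf = sparkSession.sqlContext.read.couchbase(couchbaseConf)
streamedDataset.toDF().join(
  couchbaseDf,
  streamedDataset.col("key") === couchbaseDf.col("META_ID"),
  "inner")
```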
Thanks,
Sean
Bonus Question:
If my Couchbase bucket contains 1,000,000 documents, but I am only joining it with a local Dataset that contains a single document, will the connector attempt to load all 1,000,000 bucket documents into the Spark cluster?