I am writing a Scala 2.10.4 application that uses the Spark connector (v1.2) for Couchbase (v4.1) to insert documents into a specific bucket.
Here is my code so far:
import com.couchbase.spark._
import com.couchbase.spark.sql._
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

case class Person(uid: String, name: String, age: Int)

def main(args: Array[String]): Unit = {
  val conf = new SparkConf()
    .setAppName("Load Docs to CB")
    //.setMaster("local[*]")
    .set("com.couchbase.nodes", "00.00.00.000")
    .set("com.couchbase.bucket.mybucket", "password")
  println("##### Finished Configuration")

  val sc = new SparkContext(conf)
  val sqlContext = new SQLContext(sc)
  import sqlContext.implicits._ // required for .toDF()
  println("##### Finished setting up Spark Context and Sql Context")

  val people = sc.parallelize(Seq(
    Person("user::michael", "Michael", 27),
    Person("user::tom", "Tom", 33)
  )).toDF()
  people.registerTempTable("people")
  people.write.couchbase(Map("idField" -> "uid"))
  println("##### Finished writing to Couchbase")
}
This code successfully writes to Couchbase.
However, I now need to insert new data that may or may not contain keys that already exist in Couchbase. Please note that this bucket is shared with another team, so I cannot simply flush the entire bucket at any point. In addition, I do not want to check whether a key already exists and then upsert it, because over time this may lead to an unnecessary build-up of unmodified/dead data in the Couchbase bucket.
Therefore, instead of upserting documents, I would like to delete all documents whose keys have a certain prefix, and then insert the documents fresh.
In summary:
1. Delete all documents that have "user::" as the prefix in their keys.
2. Insert new data (which I have pretty much figured out).
How can I implement #1 in Scala? This is my first shot at Scala and Couchbase as well. Any help, directions or suggestions would be greatly appreciated.
That's what I was thinking too, Matt.
HK, you can do this with N1QL without needing to switch libraries.
To constrain the deletion to document keys, use meta().id, or filter on other fields just as you would in SQL.
import com.couchbase.client.java.query._
val query = "DELETE FROM `travel-sample` WHERE name LIKE 'Air Austr%' OR meta().id LIKE 'airline_112%'"
sc.couchbaseQuery(N1qlQuery.simple(query))
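Applied to the "user::" prefix from the question, the DELETE statement could be assembled with a small helper (a sketch; `deleteByKeyPrefix` is a hypothetical name, and the single-quote doubling follows standard N1QL string-literal escaping):

```scala
object N1qlDelete {
  // Build a N1QL DELETE statement that removes every document whose
  // key (meta().id) starts with the given prefix.
  def deleteByKeyPrefix(bucket: String, prefix: String): String = {
    // Double single quotes so the prefix is safe inside a N1QL string literal.
    val escaped = prefix.replace("'", "''")
    s"DELETE FROM `$bucket` WHERE meta().id LIKE '$escaped%'"
  }
}
```

Then `sc.couchbaseQuery(N1qlQuery.simple(N1qlDelete.deleteByKeyPrefix("user", "user::")))` targets exactly the "user::" keys. Note that `%` and `_` are LIKE wildcards in N1QL, so a prefix containing either would need additional escaping.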
Using the DELETE query suggested, I was able to write and build the Scala project with SBT. The code runs without any issue and even prints the last println statement, "Successfully deleted the doc".
However, when I look up a document with the id "abc_10113210" in the user bucket in my Couchbase UI, the document still exists.
So, the document is NOT being deleted …
Here is the code that I built and ran:
import com.couchbase.client.java.query._
import com.couchbase.spark._
import com.couchbase.spark.sql._
import org.apache.spark.{SparkConf, SparkContext}
object Remover {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf()
      .setAppName("DeleteDocsFromCB")
      //.setMaster("local")
      .set("com.couchbase.nodes", "xx.xx.xx.xxx")
      .set("com.couchbase.bucket.user", "")
    println("###### Finished setting up SparkConf")

    val sc = new SparkContext(sparkConf)
    println("###### Finished setting up SparkContext")

    val query = "DELETE FROM `user` WHERE meta().id LIKE '%abc_10113210%'"
    println("###### Query - " + query)

    println("Deleting doc")
    sc.couchbaseQuery(N1qlQuery.simple(query))
    println("Successfully deleted the doc")
  }
}
Am I still missing something here? Any help would be greatly appreciated. Thanks!