Multiple Couchbase buckets for Spark

Hi,

I’m trying to read from and write to multiple buckets (two, to be exact) in a single Spark batch job.

http://developer.couchbase.com/documentation/server/current/connectors/spark-1.2/getting-started.html

The Spark Connector getting-started guide sets up the Couchbase connection as follows:

// Configure Spark
val cfg = new SparkConf()
  .setAppName("couchbaseQuickstart") // give your app a name
  .setMaster("local[*]") // set the master to local for easy experimenting
  .set("com.couchbase.bucket.travel-sample", "") // open the travel-sample bucket instead of "default"
// .set("com.couchbase.nodes", "127.0.0.1") // ; separated list of bootstrap nodes, 127.0.0.1 is the default

// Generate The Context
val sc = new SparkContext(cfg)

Let’s assume I’d like to read documents from both ‘default’ and ‘travel-sample’.
How should I write my code to do that?

Best

You should be able to add another bucket and then refer to it by name after that. @daschl is going to update the Spark Samples to include an example of this. In the meantime, here’s a starter hint based on the KV sample:

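import com.couchbase.client.java.document.JsonDocument
import com.couchbase.spark._
import org.apache.spark.{SparkConf, SparkContext}

object KeyValueExample {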

  def main(args: Array[String]): Unit = {

    // Configure Spark
    val cfg = new SparkConf()
      .setAppName("keyValueExample")
      .setMaster("local[*]")
      .set("com.couchbase.bucket.travel-sample", "")
      .set("com.couchbase.bucket.beer-sample", "foo") //Adds a second bucket name with bucket password foo

    // Generate The Context
    val sc = new SparkContext(cfg)

    sc
      .parallelize(Seq("airline_10123", "airline_10748")) // Define Document IDs
      .couchbaseGet[JsonDocument](bucketName = "travel-sample") // Set the bucket name explicitly (you can leave blank if only one bucket)
      .map(_.content()) // extract the content
      .collect() // collect all data
      .foreach(println) // print it out
  }

}

One quick note: if you open more than one bucket and forget to set the bucket name explicitly, you’ll get an exception pretty quickly, telling you that the bucket name is ambiguous and the connector is not able to choose one.

That is great! It worked.
I then tried the following code in spark-shell:

import org.apache.spark._
import com.couchbase.spark._
import com.couchbase.spark.sql._
import com.couchbase.client.java.document.JsonDocument

sc.stop

val conf = new SparkConf()
  .setMaster("local[*]")
  .setAppName("myapp")
  .set("com.couchbase.nodes", "localhost")
  .set("com.couchbase.bucket.travel-sample", "")
  .set("com.couchbase.bucket.beer-sample", "")

val sc = new SparkContext(conf)
val sqlContext = new org.apache.spark.sql.SQLContext(sc)

val airlines = sqlContext.read.couchbase(schemaFilter = org.apache.spark.sql.sources.EqualTo("type", "airline"))
val breweries = sqlContext.read.couchbase(schemaFilter = org.apache.spark.sql.sources.EqualTo("type", "brewery"))

How do I specify the bucket name, for example travel-sample, in these reads?

Cheers

@webber the couchbase() method that takes the schemaFilter also lets you set the bucket name explicitly!
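
A minimal sketch of what that could look like, assuming the connector's reader accepts a filter followed by an options map and that the bucket is selected with the "bucket" option key mentioned further down in this thread. The object name, app name and the show calls are only for illustration, and the code needs the Spark connector on the classpath plus a cluster with both sample buckets loaded:

```scala
import com.couchbase.spark.sql._
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.sources.EqualTo
import org.apache.spark.{SparkConf, SparkContext}

object MultiBucketDataFrames {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("local[*]")
      .setAppName("multiBucketDataFrames") // illustrative app name
      .set("com.couchbase.nodes", "localhost")
      .set("com.couchbase.bucket.travel-sample", "") // every bucket you query must be opened here
      .set("com.couchbase.bucket.beer-sample", "")

    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)

    // Assumption: the second argument is the options map, and "bucket"
    // picks which of the opened buckets the DataFrame reads from.
    val airlines = sqlContext.read.couchbase(
      EqualTo("type", "airline"), Map("bucket" -> "travel-sample"))
    val breweries = sqlContext.read.couchbase(
      EqualTo("type", "brewery"), Map("bucket" -> "beer-sample"))

    airlines.show(5)
    breweries.show(5)

    sc.stop()
  }
}
```

The filter still only describes document content (here the type field); the bucket is chosen separately through the options.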

I tried doing this in Java, and it still opened just one bucket and ran the query against that one.

Can you please provide your code?

The following is the code for opening both buckets:

SparkSession sparkSession = SparkSession.builder()
    .config(conf)
    .config("spark.couchbase.bucket.incrementaldata", "")
    .config("spark.couchbase.bucket.basedata", "")
    .getOrCreate();

Below is the code for accessing the needed bucket:

EqualTo bucket = new EqualTo("bucketName", "basedata");
EqualTo type = new EqualTo("type", "custom");
And filters = new And(bucket, type);
return couchbaseReader(sparkSession.sqlContext().read()).couchbase(filters);

However, when the code is executed, only the incrementaldata bucket is opened, even though the bucket is specified as basedata.

@neeleshkumar_mannur I think you’re mixing up some concepts here. The type correctly belongs in the filter, but the bucketName is not part of the filter (unless it’s part of the JSON document structure too, which I don’t think is the case).

You need to provide the bucket name as an “option” with the key “bucket”.

@daschl I get the following exception when I specify the bucket name in the options map:

java.lang.IllegalStateException: Not able to find bucket password for bucket incrementaldata

@neeleshkumar_mannur can you show me the new code, and do you have a bucket password set?

@daschl My bad. I forgot to open both buckets. I am able to query the respective buckets now. Sorry for the inconvenience. Thanks for helping out. And if possible, please update Java code examples for such cases also.

Finally, how can I increase the timeout when opening a bucket in Spark?

@neeleshkumar_mannur changing the bucket timeout with Spark only really works by setting the system property "com.couchbase.connectTimeout", in milliseconds. I think you can do that through "spark.driver.extraJavaOptions" or "--driver-java-options", according to https://spark.apache.org/docs/latest/configuration.html
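
As a rough sketch of the idea for a local test run: the property name comes from the reply above, while the 30000 ms value, the object name and the choice to set the property in code are assumptions for illustration. Setting it programmatically only helps if it happens before the connector opens its first bucket. When launching through spark-submit, the same flag would instead go into --driver-java-options.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object ConnectTimeoutExample {
  def main(args: Array[String]): Unit = {
    // Illustrative value: 30 seconds, expressed in milliseconds.
    val timeoutMillis = "30000"

    // The driver JVM is already running here, so set the property directly.
    // Assumption: the connector reads it when it first connects to the cluster.
    System.setProperty("com.couchbase.connectTimeout", timeoutMillis)

    val cfg = new SparkConf()
      .setAppName("connectTimeoutExample") // illustrative app name
      .setMaster("local[*]")
      .set("com.couchbase.bucket.travel-sample", "")
      // Executors start in their own JVMs on a real cluster, so pass the
      // same system property to them as a JVM option.
      .set("spark.executor.extraJavaOptions", s"-Dcom.couchbase.connectTimeout=$timeoutMillis")

    val sc = new SparkContext(cfg)
    // ... open buckets and run jobs as usual ...
    sc.stop()
  }
}
```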


Hi Will,

Reading the data worked seamlessly with the code below, with one bucket in the config.

.config("com.couchbase.bucket.B1", "")

val ref_qry1 = """select * from employee limit 10"""
val df = spark.read.json(sc.couchbaseQuery(N1qlQuery.simple(ref_qry1)).map(_.value.toString()))
df.show(false)

I am facing an issue when persisting the DataFrame to Couchbase. To write, I have to list every bucket I am persisting to in the config; otherwise the method below does not recognize the bucket. But if I add more than one bucket to the configuration, the read fails, and if I don’t, the write fails.

.config("com.couchbase.bucket.B1", "")
.config("com.couchbase.bucket.EMPLOYEE", "")

df.write
  .mode("overwrite")
  .couchbase(Map("idField" -> "uuid", "bucket" -> "employee", "timeout" -> "4900"))

Please point me to a solution for this error.
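
One thing that may be worth trying, going by the earlier note in this thread that calls become ambiguous once more than one bucket is open: name the bucket on the read as well as on the write. The following is only a sketch under several assumptions: that couchbaseQuery accepts the bucket name as its second argument, that the target bucket really is spelled employee (the config key and the "bucket" option have to use the same spelling, since bucket names are case-sensitive), and that uuid is a top-level field of the documents. The object name is hypothetical.

```scala
import com.couchbase.client.java.query.N1qlQuery
import com.couchbase.spark._
import com.couchbase.spark.sql._
import org.apache.spark.sql.SparkSession

object MultiBucketReadWrite {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("multiBucketReadWrite") // illustrative app name
      .master("local[*]")
      .config("com.couchbase.bucket.B1", "")
      .config("com.couchbase.bucket.employee", "") // same spelling as the "bucket" option below
      .getOrCreate()

    // e.* keeps the document fields (including the assumed uuid) at the top
    // level instead of nesting them under the keyspace name.
    val query = """select e.* from employee e limit 10"""

    // With two buckets open, say which connection should run the query.
    val rows = spark.sparkContext
      .couchbaseQuery(N1qlQuery.simple(query), "B1")
      .map(_.value.toString())
    val df = spark.read.json(rows)
    df.show(false)

    // Name the target bucket explicitly on the write as well.
    df.write
      .mode("overwrite")
      .couchbase(Map("idField" -> "uuid", "bucket" -> "employee"))

    spark.stop()
  }
}
```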