Spark Connector Returns Different Result Than cbq Query

Hi,

I’m using Couchbase 4.0 and Spark 1.6.1 with spark-connector 1.2 (I also tested Spark 1.5.1 with spark-connector 1.1). I’m seeing some strange behavior from the Spark connector.

The code I ran is a simple query:

val v = sqlContext.read.couchbase(schemaFilter = EqualTo("metadata-type", "type1")).cache()
v.count

First, this line of code was executed as a single task: the resulting DataFrame has only one partition, even though I assigned 10 executors to this Spark app.
Second, when I print the count, the number is wrong: it is much smaller than the total count. Running the same code multiple times prints different (all wrong) results.

If I run this query on the cbq command line:

select count(*) from my-bucket where metadata-type = "type1";

I get the correct count.

Did I miss something obvious here? Has anyone experienced something like this?

Thanks a lot for the help.

More information:

I tried multiple datasets with the same query. Each dataset has about 1 million records. Most of them return the correct count from the Spark code (the same number the cbq query returns), but three of the datasets return different numbers.

Those records in couchbase are all simple JSON.

Hi @hclc,

The single partition is expected, since we stream the result from one source and do not partition it. I recommend repartitioning afterwards if you need parallelism, or manually splitting up the queries (I’m not sure what the best strategy is for you here).
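A minimal sketch of the repartition suggestion, assuming the sqlContext from the original post is already set up for the bucket (for example in spark-shell). The partition count of 10 is only illustrative, chosen to match the 10 executors mentioned above. The read from Couchbase still runs as a single task; only the stages after the repartition are spread out.

```scala
import org.apache.spark.sql.sources.EqualTo
import com.couchbase.spark.sql._

// Assumes `sqlContext` already exists and the bucket is configured on the SparkConf.
val df = sqlContext.read
  .couchbase(schemaFilter = EqualTo("metadata-type", "type1"))

// 10 is an illustrative value, not a recommendation.
val spread = df.repartition(10).cache()

// Number of partitions that later stages will work on.
println(spread.rdd.partitions.length)
```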

The Spark log contains the executed queries at INFO level. Grep for “SELECT” and you’ll see both the schema inference query and your own queries. You can compare them and/or run them in cbq and check the results.

In the end we are just executing a N1QL query like you do manually. One other question: is there any traffic in between that changes the data?

Hi, daschl,

Thanks a lot for your help!

For the first point, it’s good to know that’s the way it works. For the second one, I’m sure there are no other write operations during my test. I have gone back and forth between the Spark reads and the cbq queries for quite a while, and the numbers are always different. The cbq query gives me consistent results; the Spark code gives me slightly different results on each run, and they differ greatly from the cbq query results.

I did find the query from spark log:

SELECT {all the 60 fields} FROM my-bucket WHERE metadata-type = 'type1'

It looks no different from my N1QL query, except that in the N1QL query I select count(*) directly.

@hclc Hm, a few other things to drill down on… I think we need to find out whether the server returns fewer records or whether they get lost somewhere in Spark.

So:

  • Can you execute the SELECT {all the 60 fields} FROM my-bucket WHERE metadata-type = 'type1' query in cbq and check whether the correct number of rows is returned?
  • You could also do a sparkContext.couchbaseQuery(…) with the query above and see if the RDD count is as expected.
  • You could also turn up TRACE logging, look at the raw HTTP response in the logs and see if the count is as expected.
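The second check could be sketched roughly as follows, assuming `sc` is a SparkContext already configured for the bucket. The 60-field list is elided in this thread, so `SELECT *` stands in for it here. The backticks are an addition: N1QL requires them around identifiers that contain a hyphen, and they do not appear in the statements as posted.

```scala
import com.couchbase.client.java.query.N1qlQuery
import com.couchbase.spark._

// Assumes `sc` is a SparkContext configured for the bucket.
// SELECT * is a stand-in for the elided field list.
val statement = "SELECT * FROM `my-bucket` WHERE `metadata-type` = 'type1'"

// Runs the raw N1QL statement through the connector, bypassing Spark SQL.
val rows = sc.couchbaseQuery(N1qlQuery.simple(statement))

// Compare this number with what cbq reports for the same predicate.
println(s"rows returned through the connector: ${rows.count()}")
```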

As soon as we know where the discrepancy comes from we can figure out how to fix it.

@daschl Thanks for the reply…

The dataset with the discrepancy has about 1 million records, so running select * from my-bucket in the cbq console is more or less impossible (I tried, and after several minutes got the error “index scan timed out”). However, I believe the numbers from the cbq count query are always correct.

Smaller datasets (with thousands of records) don’t have any number discrepancy issue in my tests with cbq query and spark-connector.

I tried sparkContext.couchbaseQuery(…) with the same query I used in cbq. couchbaseQuery("select * from …") returns a number that’s much smaller than the correct number.
couchbaseQuery("select count(*) as counts from …") returns nothing: when I print the result RDD’s count(), I get 0. When I try resultRDD.map(row => row.value.getLong("counts")).collect foreach println, I still get nothing. Am I missing something here?

One thing worth mentioning is when using sqlContext.read.couchbase, I can see there’s only about 1k read/sec on the couchbase cluster. But when using couchbaseQuery(select count(*) …), it shows > 10K reads from the couchbase UI, same as directly running the query in cbq.

Maybe for some reason sqlContext.read.couchbase was much slower (because select * uses more network bandwidth, for example), later hit an error like “Index scan timed out”, swallowed that error, and returned only whatever had been scanned by that time?

A separate issue I found when experimenting with the Spark connector: in a Spark app, if I create the SparkConf with Couchbase properties, I can no longer talk to Hive. I am trying to batch load metadata into a Hive table. That now seems impossible, since my SparkContext can’t talk to both Couchbase and Hive at the same time.

Do you have any other suggestions for this use case? I know there’s a spark-hadoop connector, but I would need more fine-grained processing when publishing data to hive.

That could be the case. The problem is that Spark itself does not push down the limit, so we always need to fetch all records matching your predicate. Can you try to make the predicate for the query more restrictive so fewer rows are returned? Also, can you check the Spark log for any errors that might have been logged?

For the Hive issue: which properties are you setting when this happens?

I have tested with different sizes of data. If the query returns only a small dataset (several thousand records), it works fine and the numbers are correct. For larger result sets, the numbers are wrong, yet there are no error messages in the Spark log.

For Hive, I normally provide hive-site.xml and core-site.xml, and my Spark app is then able to access Hive tables. However, to use a Couchbase bucket, I have to add this code, as stated in the Couchbase documentation:

import org.apache.spark.SparkConf

val cfg = new SparkConf()
  .setAppName("couchbaseQuickstart") // give your app a name
  .setMaster("local[*]") // set the master to local for easy experimenting
  .set("com.couchbase.bucket.travel-sample", "") // open the travel-sample bucket

[Edit]
To be precise, this is my code. I’m using a Cloudera cluster in YARN mode, not a local master:

val sparkConf = new SparkConf()
  .setAppName(config.getString("appName"))
  .set("com.couchbase.nodes", config.getString("couchbaseSeedIPs"))
  .set("com.couchbase.bucket." + config.getString("couchbaseBucketName"), "")

hive-site.xml and core-site.xml are provided to the spark shell as usual.
[/Edit]

As soon as this line is added, my Spark app can no longer find Hive tables. The error message is (cannot find table “table name”). I believe it’s trying to find the table in the Couchbase bucket.

So overall, I believe there are three very critical issues here:

  1. The Couchbase connector does not return the correct number of records if the dataset is relatively large.
  2. When a Spark app is set up to access a Couchbase bucket, it can no longer access Hive.
  3. The design of sending all records to one Spark executor through the connector. This, IMHO, is a serious performance issue. My test Couchbase cluster is powerful enough to support > 200K get ops/sec on my test dataset (and this number is limited only by the network bandwidth of the cluster). However, when the Spark connector retrieves the same dataset (which should obviously take advantage of batch gets), the Couchbase server only sees ~1K reads/sec. And imagine millions of records being pulled into a single executor of a Spark cluster, after which the user has to repartition manually. That’s a huge performance hit and could cause problems too.

I hope this information is helpful. Couchbase is a great product, and a good Spark connector will bring so many more big data use cases to it.

Thanks

Okay thanks, I’ve created tickets on https://issues.couchbase.com/browse/SPARKC to track your input.

The Hive one sounds weird, I’ll try to get to the bottom of it.

I’m pretty sure this turns out to be a timeout issue. In my testing, the process consistently stopped at around 76 seconds, which was my first clue, since the default Couchbase query timeout is 75 seconds.

Adding a system property for all the Spark executors “solved” the problem, kinda (e.g. adding this on the spark-shell command line):

--conf spark.executor.extraJavaOptions=-Dcom.couchbase.queryTimeout=360000
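For an application that builds its own SparkConf, the same executor option could presumably be set in code instead of on the command line. This is only a sketch: the app name is hypothetical, and the property name and value are copied from the command above (360000 ms is 6 minutes). It has to be set before the SparkContext is created, because executor JVM options are fixed at launch.

```scala
import org.apache.spark.SparkConf

// The app name is hypothetical; the timeout property and value come from the command above.
val conf = new SparkConf()
  .setAppName("couchbaseTimeoutExample")
  // Passed to every executor JVM as a system property.
  .set("spark.executor.extraJavaOptions", "-Dcom.couchbase.queryTimeout=360000")
```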

I say “kinda” in that increasing the timeout gets me the correct count for this particular task. I have ~4 million records I need to read for this process and I can do that with a 5 minute timeout - but the next task I have is to read ~198 million records. Ideally I’d like to be able to leverage the parallel processing capabilities of Spark to load those records more quickly than ~200 minutes…

Is there any way to spread the query across multiple spark executors? Or is there a way I could just get the IDs, and then do a bunch of batch gets from there? Any other creative solutions?

Really happy with couchbase and we’d love to be able to get our data into spark for further data analysis! Hopeful that someone out there has encountered this problem before and has a solution!

Thanks!

Setting --conf spark.executor.extraJavaOptions=-Dcom.couchbase.queryTimeout=360000 also solved my problem, where the number of documents actually loaded differed from the known count. But this is only a temporary solution, since the number of docs can go up dramatically, and then we have to reconfigure this parameter. Any other suggestions?

Also, it would be nice if the connector threw a timeout error when a timeout happens; otherwise we really don’t know whether we have the right data or not. Any ideas?

To increase the parallelism, I first load all the meta IDs from Couchbase and repartition them. Then I load the full documents from Couchbase by ID. This way, I can make full use of my cluster resources. You can find the code here: spark.sparkContext.couchbaseQuery number of partitions
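The linked code is not reproduced in this thread, so what follows is only a rough sketch of that approach, not the poster’s actual implementation. It assumes `sc` is a SparkContext configured for the bucket, uses the bucket and field names from earlier in the thread, and picks an arbitrary partition count of 40. The ID query still runs as a single N1QL request, so it remains subject to the query timeout discussed above, but it returns far less data per row.

```scala
import com.couchbase.client.java.document.JsonDocument
import com.couchbase.client.java.query.N1qlQuery
import com.couchbase.spark._

// Assumes `sc` is a SparkContext configured for the bucket.
// Step 1: fetch only the document IDs that match the predicate.
val idQuery =
  "SELECT META(b).id AS id FROM `my-bucket` b WHERE b.`metadata-type` = 'type1'"
val ids = sc.couchbaseQuery(N1qlQuery.simple(idQuery))
  .map(row => row.value.getString("id"))

// Step 2: spread the IDs across the cluster; 40 is an illustrative value.
val spreadIds = ids.repartition(40)

// Step 3: fetch the full documents by key, in parallel from every partition.
val docs = spreadIds.couchbaseGet[JsonDocument]()
println(docs.count())
```

Because the documents are fetched by key rather than through one streamed query result, the load is spread over all executors instead of a single task.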