Spark Streaming from bucket, stream is empty


#1

While works completely fine on my default bucket, which is full with tweets, with a local Spark Streaming app to create a couchbaseStream(from = FromBeginning, to = ToInfinity) and to retrieve, print out all the records, unfortunately, the same code does not work on the cluster. No any exception, error or warn thrown neither in the Spark driver, neither on the executors. The Couchbase Client API states that it has been able to create a connection to the target bucket, but no data is received so far.

What could be the problem?

Thanks!


#2

So you are sure you are connected to the bucket with content, and are you using the Spark Connector 1.2.1 and not 1.2.0?

Can you enable TRACE logging and share it with us? I’d like to see whats going on on the network :slight_smile:


#3

Thanks for sharing the logs.

There is something weird going on, its retrying an operation that somehow got returned properly it seems and it never goes up to the level where it needs it.

Just to narrow down the causes, can you check if it works against a non-local 1 node cluster? So if its related to being 2 instead of 1 nodes.


#4

The same code with the same configurations and dependencies work perfectly on a local-mode Spark.

I’m using a laptop to deploy applications to a 3-node cluster, where Couchbase, Kafka, YARN and Hadoop has been installed. The laptop and the 3 nodes are on the same network. When I start the application (Spark Streaming for that matter) with spark.master=yarn-client configuration, no data is going to be received by the executors (Receivers), like if the bucket would be empty - but no WARN or ERROR is thrown regarding accessibility to the cluster or the bucket. Actually these are the logs that I’ve sent to you previously. The funny part is in this deployment, any executor is actually co-located with a Couchbase-node, right?

Now, when I start the application using spark.master=local, that being said the executors and the receivers are local to my computer, the application receives the mutations from Couchbase, the application works! :slight_smile:

I’m thinking on to start the same Spark Streaming application in local mode but on some of the nodes to see if there is a problem that executors are running in default YARN containers.

Do you have any insights? Where would you start?


#5

Thanks for the further information, I think this might be a bug in the dcp implementation in the SDK (core-io), but I need to chat with @avsej about that. I’ll let you know as soon as we have something to share :slight_smile: If you can apply your workaround for now that would be good to not keep you blocked!


#6

Tested with yarn-cluster mode, but does not work either.


#7

Tried a lot of different deploy modes with different configurations, but it seems that when the Receivers are running inside a YARN container on the same cluster where Couchbase has been deployed, it does not work. This is a serious drawback for us, since this means that the Streaming API is not usable.

Do you have any estimate on when is this going to be solved? Or a JIRA issue? Should I file an issue for this one?


#8

Tried several workarounds with no success.

The logs are filled with the following lines:

16/07/21 11:12:08 TRACE ResponseHandler: Retrying GetAllMutationTokensRequest{observable=rx.subjects.AsyncSubject@73841916, bucket=‘default’} with a delay of 100000 MICROSECONDS
16/07/21 11:12:08 TRACE ResponseHandler: Retrying GetAllMutationTokensRequest{observable=rx.subjects.AsyncSubject@73841916, bucket=‘default’} with a delay of 100000 MICROSECONDS
16/07/21 11:12:09 DEBUG RecurringTimer: Callback for BlockGenerator called at time 1469092329000
16/07/21 11:12:09 TRACE ResponseHandler: Retrying GetAllMutationTokensRequest{observable=rx.subjects.AsyncSubject@73841916, bucket=‘default’} with a delay of 100000 MICROSECONDS
16/07/21 11:12:09 TRACE ResponseHandler: Retrying GetAllMutationTokensRequest{observable=rx.subjects.AsyncSubject@73841916, bucket=‘default’} with a delay of 100000 MICROSECONDS
16/07/21 11:12:09 DEBUG RecurringTimer: Callback for BlockGenerator called at time 1469092329200
16/07/21 11:12:09 TRACE ResponseHandler: Retrying GetAllMutationTokensRequest{observable=rx.subjects.AsyncSubject@73841916, bucket=‘default’} with a delay of 100000 MICROSECONDS
16/07/21 11:12:09 TRACE ResponseHandler: Retrying GetAllMutationTokensRequest{observable=rx.subjects.AsyncSubject@73841916, bucket=‘default’} with a delay of 100000 MICROSECONDS
16/07/21 11:12:09 DEBUG RecurringTimer: Callback for BlockGenerator called at time 1469092329400
16/07/21 11:12:09 TRACE ResponseHandler: Retrying GetAllMutationTokensRequest{observable=rx.subjects.AsyncSubject@73841916, bucket=‘default’} with a delay of 100000 MICROSECONDS
16/07/21 11:12:09 TRACE ResponseHandler: Retrying GetAllMutationTokensRequest{observable=rx.subjects.AsyncSubject@73841916, bucket=‘default’} with a delay of 100000 MICROSECONDS
16/07/21 11:12:09 DEBUG RecurringTimer: Callback for BlockGenerator called at time 1469092329600
16/07/21 11:12:09 TRACE ResponseHandler: Retrying GetAllMutationTokensRequest{observable=rx.subjects.AsyncSubject@73841916, bucket=‘default’} with a delay of 100000 MICROSECONDS
16/07/21 11:12:09 TRACE ResponseHandler: Retrying GetAllMutationTokensRequest{observable=rx.subjects.AsyncSubject@73841916, bucket=‘default’} with a delay of 100000 MICROSECONDS
16/07/21 11:12:09 DEBUG RecurringTimer: Callback for BlockGenerator called at time 1469092329800
16/07/21 11:12:09 TRACE ResponseHandler: Retrying GetAllMutationTokensRequest{observable=rx.subjects.AsyncSubject@73841916, bucket=‘default’} with a delay of 100000 MICROSECONDS
16/07/21 11:12:09 TRACE ResponseHandler: Retrying GetAllMutationTokensRequest{observable=rx.subjects.AsyncSubject@73841916, bucket=‘default’} with a delay of 100000 MICROSECONDS
16/07/21 11:12:10 DEBUG RecurringTimer: Callback for BlockGenerator called at time 1469092330000
16/07/21 11:12:10 TRACE ResponseHandler: Retrying GetAllMutationTokensRequest{observable=rx.subjects.AsyncSubject@73841916, bucket=‘default’} with a delay of 100000 MICROSECONDS
16/07/21 11:12:10 TRACE ResponseHandler: Retrying GetAllMutationTokensRequest{observable=rx.subjects.AsyncSubject@73841916, bucket=‘default’} with a delay of 100000 MICROSECONDS
16/07/21 11:12:10 DEBUG RecurringTimer: Callback for BlockGenerator called at time 1469092330200
16/07/21 11:12:10 TRACE ResponseHandler: Retrying GetAllMutationTokensRequest{observable=rx.subjects.AsyncSubject@73841916, bucket=‘default’} with a delay of 100000 MICROSECONDS
16/07/21 11:12:10 TRACE ResponseHandler: Retrying GetAllMutationTokensRequest{observable=rx.subjects.AsyncSubject@73841916, bucket=‘default’} with a delay of 100000 MICROSECONDS
16/07/21 11:12:10 DEBUG RecurringTimer: Callback for BlockGenerator called at time 1469092330400
16/07/21 11:12:10 TRACE ResponseHandler: Retrying GetAllMutationTokensRequest{observable=rx.subjects.AsyncSubject@73841916, bucket=‘default’} with a delay of 100000 MICROSECONDS
16/07/21 11:12:10 TRACE ResponseHandler: Retrying GetAllMutationTokensRequest{observable=rx.subjects.AsyncSubject@73841916, bucket=‘default’} with a delay of 100000 MICROSECONDS
16/07/21 11:12:10 DEBUG RecurringTimer: Callback for BlockGenerator called at time 1469092330600
16/07/21 11:12:10 TRACE ResponseHandler: Retrying GetAllMutationTokensRequest{observable=rx.subjects.AsyncSubject@73841916, bucket=‘default’} with a delay of 100000 MICROSECONDS
16/07/21 11:12:10 TRACE ResponseHandler: Retrying GetAllMutationTokensRequest{observable=rx.subjects.AsyncSubject@73841916, bucket=‘default’} with a delay of 100000 MICROSECONDS

I’ve tracked the CouchbaseCore, but the request goes into LMAX Disruptor, and just did not want to go there for now.

It should be great to know what is happening, and it’s interesting from a user point of view, that why do I have to TRACE to realize that something is not going through.

P.S.:

* Hey I just mapped you, * And this is crazy, * But here's my data * so subscribe me maybe. * * It's hard to block right, * at you baby, * But here's my data, * so subscribe me maybe.


#9

@zoltan.zvara looks like you found my easter egg!

The issue with the retries above looks like a bug in the DCP implementation - we are currently working on a replacement for it and its coming along pretty well… It is under heavy development and not complete, but if you are curious you can star/track the progress on this repo: https://github.com/couchbaselabs/java-dcp-client

Once it is in a stable and supportable condition we’ll integrate it transparently with spark, that will solve the current issues with spark streaming in general :slight_smile:

p.s.: the reason why the dcp stuff doesn’t look polished is: it isn’t :smiley: … it has always been experimental and we now have enough data together to build something substantial that will help us get to the point of a stable and supportable streaming system into spark, kafka and other consumers. Stay tuned!