Do SDK Observables always start at the beginning of the sequence?

We have a Scala application that uses the Java SDK, version 2.1.4.

We wanted to confirm that the hot Observables returned by the SDK will always emit everything, from the beginning of the sequence, to a new subscriber. The ReactiveX documentation explicitly says that a late observer may start observing the sequence somewhere in the middle (http://reactivex.io/documentation/observable.html), but other things I’ve found in my searches seem to imply that the Couchbase SDK starts new observers off at the beginning of the sequence. (I know that the new 2.2 SDK is switching to cold Observables, but for now we’re on 2.1.4.)
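To make the concern concrete, here is a toy model in plain Java of a hot source with no replay. The class names are hypothetical and nothing here is RxJava or Couchbase SDK code: the source pushes each item only to whoever is subscribed at that moment, so a subscriber that attaches late never sees the earlier items, which is the behavior the ReactiveX documentation warns about.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Toy model of a hot source with no replay (similar in spirit to RxJava's
// PublishSubject). Hypothetical classes, not part of RxJava or the Couchbase SDK.
public class HotSourceDemo {

    static final class HotSource<T> {
        private final List<Consumer<T>> subscribers = new ArrayList<>();

        void subscribe(Consumer<T> subscriber) {
            subscribers.add(subscriber);
        }

        // Emits only to subscribers registered at this moment; nothing is buffered.
        void emit(T item) {
            for (Consumer<T> s : subscribers) {
                s.accept(item);
            }
        }
    }

    // Returns what an early subscriber and a late subscriber each observed.
    static List<List<String>> run() {
        HotSource<String> source = new HotSource<>();
        List<String> early = new ArrayList<>();
        List<String> late = new ArrayList<>();

        source.subscribe(early::add);
        source.emit("a");
        source.emit("b");
        source.subscribe(late::add); // joins in the middle of the sequence
        source.emit("c");

        List<List<String>> result = new ArrayList<>();
        result.add(early);
        result.add(late);
        return result;
    }

    public static void main(String[] args) {
        List<List<String>> r = run();
        System.out.println("early saw " + r.get(0)); // early saw [a, b, c]
        System.out.println("late saw  " + r.get(1)); // late saw  [c]
    }
}
```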

Here is the code we’re currently using for get, paraphrased from Scala into Java, so apologies for any Java errors:

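import rx.Observer;
import com.couchbase.client.java.document.RawJsonDocument;

// rx.Observer is an interface in RxJava 1.x, so it is implemented, not extended
class OurObserver implements Observer<RawJsonDocument> {
  @Override
  public void onCompleted() {
    // Pass along null to the rest of the code if no onNext calls have been made
  }
  @Override
  public void onError(Throwable error) {
    // Log the error and propagate the exception to the rest of the code
  }
  @Override
  public void onNext(RawJsonDocument doc) {
    // Pass along the result to the rest of the code
  }
}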

It is used like this:

import java.util.concurrent.TimeUnit;
import rx.Observable;

Observable<RawJsonDocument> observable = bucket.async().get(key, RawJsonDocument.class).timeout(2500, TimeUnit.MILLISECONDS);
OurObserver observer = new OurObserver();
observable.subscribe(observer);
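If the goal is to hand the result to the rest of the system as a single value, one option is to bridge the three callbacks into a CompletableFuture. This is a hypothetical sketch, not SDK code: SimpleObserver is a stand-in interface with the same three methods as rx.Observer so that it compiles without RxJava, and it relies on CompletableFuture.complete only succeeding once, so an onCompleted that follows onNext does not overwrite the value with null.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class FutureObserverDemo {

    // Hypothetical stand-in with the same three callbacks as rx.Observer,
    // so this sketch compiles without RxJava on the classpath.
    interface SimpleObserver<T> {
        void onNext(T value);
        void onError(Throwable error);
        void onCompleted();
    }

    // Bridges the callbacks into a CompletableFuture for the rest of the code.
    // complete(...) only succeeds once, so the first signal wins.
    static final class FutureObserver<T> implements SimpleObserver<T> {
        final CompletableFuture<T> result = new CompletableFuture<>();

        @Override
        public void onNext(T value) {
            result.complete(value);
        }

        @Override
        public void onError(Throwable error) {
            result.completeExceptionally(error);
        }

        // If onNext never fired (nothing was emitted), pass along null.
        @Override
        public void onCompleted() {
            result.complete(null);
        }
    }

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        FutureObserver<String> found = new FutureObserver<>();
        found.onNext("{\"name\":\"doc\"}");
        found.onCompleted();
        System.out.println(found.result.get()); // {"name":"doc"}

        FutureObserver<String> missing = new FutureObserver<>();
        missing.onCompleted();
        System.out.println(missing.result.get()); // null
    }
}
```

With the real SDK, the same class would implement rx.Observer<RawJsonDocument> instead of the stand-in interface.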

One of my coworkers was worried about a race between the subscription and the emission of the single document returned by the get call. We’ve been seeing more TimeoutExceptions than expected in our stress tests with this code: from my fairly beefy dev machine against our two-node development cluster, firing 100 simultaneous requests yields only about 30 successes, and the rest time out.

All hot observables in 2.1.x will start from the beginning once subscribed to, because they use one of the following subjects:

  • AsyncSubject for all responses with only one result (so everything except views and N1QL).
  • N1QL and views use something called a UnicastAutoReleaseSubject, which allows only one subscriber and sends it all the data it has buffered (to avoid leaking buffers).
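A toy model of the AsyncSubject semantics described above may help with the race question. This is a hypothetical simplification, not RxJava's implementation (it delivers only the value, with no completion callback to subscribers): it remembers the last value and, once completed, delivers that value to existing subscribers and replays it to anyone who subscribes later, so subscribing after the single result has arrived still receives it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Toy model of AsyncSubject semantics (hypothetical class, not RxJava's code).
public class AsyncSubjectModel<T> {
    private final List<Consumer<T>> subscribers = new ArrayList<>();
    private T last;
    private boolean hasValue;
    private boolean completed;

    // Only the most recent value before completion is kept.
    public synchronized void onNext(T value) {
        if (!completed) {
            last = value;
            hasValue = true;
        }
    }

    public void onCompleted() {
        List<Consumer<T>> toNotify;
        boolean deliver;
        T value;
        synchronized (this) {
            if (completed) {
                return;
            }
            completed = true;
            toNotify = new ArrayList<>(subscribers);
            subscribers.clear();
            deliver = hasValue;
            value = last;
        }
        // Deliver outside the lock so callbacks never run while it is held.
        if (deliver) {
            for (Consumer<T> s : toNotify) {
                s.accept(value);
            }
        }
    }

    // Before completion: wait for the result. After completion: replay it now.
    public void subscribe(Consumer<T> subscriber) {
        boolean deliverNow;
        T value;
        synchronized (this) {
            if (!completed) {
                subscribers.add(subscriber);
                return;
            }
            deliverNow = hasValue;
            value = last;
        }
        if (deliverNow) {
            subscriber.accept(value);
        }
    }

    public static void main(String[] args) {
        AsyncSubjectModel<String> subject = new AsyncSubjectModel<>();
        List<String> early = new ArrayList<>();
        List<String> late = new ArrayList<>();
        subject.subscribe(early::add);
        subject.onNext("doc");
        subject.onCompleted();
        subject.subscribe(late::add); // subscribes after the result already arrived
        System.out.println(early + " " + late); // [doc] [doc]
    }
}
```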

I don’t think the timeouts are related to a race condition in the subjects. Can you shed more light on the code you actually run and the conditions under which you see the timeouts?

Thanks for getting back to me, daschl.

For a bit more context, our application is built on the Play framework and Akka actors. This code is part of an actor we use as glue between the Couchbase SDK and the rest of our system. A large number of requests seems to be causing a pile-up somewhere in the flow. Possibly we are exhausting a thread pool somewhere (quite likely one of ours rather than one of yours), and I’m trying to eliminate possible suspects. I need to spend some time quantifying whether this is actually a performance degradation compared with how we were operating on the 1.x SDK, and I will try to get you an actual code example once I clear it with my team lead.
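The thread-pool theory can be checked in isolation with a small plain-Java experiment (hypothetical names, no Couchbase, Play, or Akka code): if every thread in a fixed pool is parked waiting on a result, any further work submitted to that pool just queues, and a caller waiting with a timeout gets a TimeoutException even though the work itself is trivial.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class PoolExhaustionDemo {

    // Returns true if a trivial task fails to finish within timeoutMs because
    // blockingCalls tasks are holding threads of a pool of size poolSize.
    static boolean quickTaskTimesOut(int poolSize, int blockingCalls, long timeoutMs)
            throws InterruptedException, ExecutionException {
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        CountDownLatch release = new CountDownLatch(1);
        try {
            // Simulates callers that block a pool thread while waiting on a result.
            for (int i = 0; i < blockingCalls; i++) {
                pool.submit(() -> {
                    release.await();
                    return null;
                });
            }
            Future<String> quick = pool.submit(() -> "done");
            try {
                quick.get(timeoutMs, TimeUnit.MILLISECONDS);
                return false;
            } catch (TimeoutException e) {
                return true; // still queued behind the blocked threads
            }
        } finally {
            release.countDown(); // unblock the waiting tasks
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(quickTaskTimesOut(4, 4, 500)); // true: all 4 threads blocked
        System.out.println(quickTaskTimesOut(4, 3, 500)); // false: one thread is free
    }
}
```

If the actor, or code it calls, blocks on SDK results on a bounded pool, a pile-up like this might surface as timeouts rather than as errors from the SDK itself.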
