Weird side effect on Java SDK 2.1.x

Hi.
Sometimes we get a TimeoutException when making blocking SDK calls.
Looking into it, I found that whenever the initial async Observable request is created, it is set to the “observeOn(computation)” scheduler.
As a result, if I use the original Observable and call a blocking operation (a JDBC call, for example) somewhere inside the .map function, I block one of the threads in the CB SDK computation thread pool. If there are many simultaneous calls to the blocking CB SDK (it’s a web app), sooner or later some of them fail with a TimeoutException, because the internal Observable tries to run on a thread that is blocked by a JDBC operation (or another blocking one).

To illustrate the situation, please take a look at the following test:
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import org.junit.Test;
import rx.Observable;
import rx.schedulers.Schedulers;

@Test
public void testName() throws Exception {
    // One count per inner Observable subscribed inside the map below.
    CountDownLatch latch = new CountDownLatch(7);
    newObs()
        .map(i -> {
            System.out.println("starting to trigger another obs " + Thread.currentThread().getName());
            newObs()
                .subscribe(n -> System.out.println(n + " result 1 " + Thread.currentThread().getName()),
                    throwable -> latch.countDown(),
                    () -> latch.countDown());
            newObs()
                .subscribe(n -> System.out.println(n + " result 2 " + Thread.currentThread().getName()),
                    throwable -> latch.countDown(),
                    () -> latch.countDown());
            newObs()
                .subscribe(n -> System.out.println(n + " result 3 " + Thread.currentThread().getName()),
                    throwable -> latch.countDown(),
                    () -> latch.countDown());
            newObs()
                .subscribe(n -> System.out.println(n + " result 4 " + Thread.currentThread().getName()),
                    throwable -> latch.countDown(),
                    () -> latch.countDown());
            newObs()
                .subscribe(n -> System.out.println(n + " result 5 " + Thread.currentThread().getName()),
                    throwable -> latch.countDown(),
                    () -> latch.countDown());
            newObs()
                .subscribe(n -> System.out.println(n + " result 6 " + Thread.currentThread().getName()),
                    throwable -> latch.countDown(),
                    () -> latch.countDown());
            newObs()
                .subscribe(n -> System.out.println(n + " result 7 " + Thread.currentThread().getName()),
                    throwable -> latch.countDown(),
                    () -> latch.countDown());
            try {
                // Blocks the current computation thread until all inner Observables finish.
                System.out.println("starting to wait " + Thread.currentThread().getName());
                if (!latch.await(20, TimeUnit.SECONDS)) {
                    throw new RuntimeException("timeout");
                }
                System.out.println("finalized waiting " + Thread.currentThread().getName());
            } catch (InterruptedException e) {
                throw new RuntimeException("second", e);
            }
            return i;
        })
        .subscribeOn(Schedulers.computation())
        .toBlocking().first();

}

// Emits one item after a 1 second sleep, delivered on a computation thread.
private Observable<Integer> newObs() {
    return Observable.just(1).flatMap(i -> {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
            throw new RuntimeException(e);
        }
        return Observable.just(i);
    }).observeOn(Schedulers.computation());
}

It times out 100% of the time, since one of the computation() threads is blocked.

I know I should not use Schedulers.computation() with blocking operations, but it illustrates the issue I’m having.

Questions:

  • Did I miss somewhere in the documentation that I shouldn’t do blocking operations within an Observable given to me by the async SDK?
  • If it isn’t documented, shouldn’t there be a general recommendation to change the scheduler for the async SDK when you want to use it in a blocking way?
  • Maybe it makes sense for the blocking SDK to always use observeOn(immediate()) instead of observeOn(computation())?

PS: Is the problem clear at all?

You shouldn’t add blocking behavior in an Observable returned by the SDK directly, since as you found out it operates on a computation thread.

Instead, prefer chaining in observeOn(Schedulers.io()) and then do a map (for example) where you execute your blocking code. Note that calling SDK methods after that will switch back to the computation thread.
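The same hand-off idea can be sketched with JDK classes only, so that it runs without RxJava or the SDK on the classpath. This is an analogy, not SDK code: the pools named "computation" and "io" and the blockingLookup helper are hypothetical stand-ins, and thenApplyAsync with an explicit executor plays the role that observeOn(Schedulers.io()) plays in an Observable chain.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class BlockingHandoff {

    // Hypothetical stand-in for a blocking call such as a JDBC query.
    static String blockingLookup(int id) {
        try {
            Thread.sleep(200);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return "row-" + id + " on " + Thread.currentThread().getName();
    }

    static String run() throws Exception {
        // Small bounded pool, standing in for the scheduler that produces results.
        ExecutorService computation =
            Executors.newFixedThreadPool(2, r -> new Thread(r, "computation"));
        // Unbounded pool meant for blocking work, standing in for an I/O scheduler.
        ExecutorService io =
            Executors.newCachedThreadPool(r -> new Thread(r, "io"));
        try {
            return CompletableFuture
                .supplyAsync(() -> 42, computation)                   // value produced on "computation"
                .thenApplyAsync(BlockingHandoff::blockingLookup, io)  // blocking step moved to "io"
                .get(5, TimeUnit.SECONDS);
        } finally {
            computation.shutdown();
            io.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(run()); // prints "row-42 on io"
    }
}
```

The blocking step never occupies a thread of the bounded pool, so other work scheduled on that pool is not starved while the lookup sleeps.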

The fact that blocking inside a map/flatMap is considered harmful is stated in http://docs.couchbase.com/developer/java-2.1/observables.html (see “From Asynchronous to Synchronous” section).

Thanks a lot for the response.
One more question:
What was the reason behind the decision to have the blocking SDK run on top of the ‘computation()’ scheduler rather than ‘immediate()’?
The issue is that if several threads (as many as the ‘computation()’ thread pool size) are making long-running blocking SDK calls (like a blocking view query with Stale.FALSE), you can see a TimeoutException in a different thread, with nothing to tie it to the actual cause.

It’s not that the sync API was explicitly made to block on computation()… We wanted to be primarily asynchronous and non-blocking, so we used a bounded thread pool in the background. We still have to manage request-response cycles asynchronously.

Schedulers.immediate() would be even worse, it wouldn’t be asynchronous even on one thread since you’d block the current thread…

As for pool starvation and timeouts, maybe stale view queries are a bit of a corner case. You can increase both the timeouts for queries and the size of the pool using the CouchbaseEnvironment.

Not sure I understood why immediate() would be worse.
If I, as a consumer, am fine with a blocking operation on my current thread, what difference does it make to the SDK?

Yes, I can increase pool size and timeouts, sure.
My point is that there are two types of timeouts with this issue:

  1. there was not enough time to process your request, because of performance or high load (the more obvious one to me)
  2. there were not enough internal resources to process your request at all (which is less likely)
    And it’s not clear which of them you’re currently facing, so anything that helps determine which one it is would be useful.
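One possible way to tell the two cases apart, sketched with JDK classes only: record whether the work ever started before the timeout fired. This is not something the SDK exposes as far as this thread shows; the TimeoutDiagnosis class, the Outcome names, and the single-thread pool are all hypothetical and only illustrate the idea.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;

public class TimeoutDiagnosis {

    enum Outcome { COMPLETED, TIMED_OUT_WHILE_RUNNING, TIMED_OUT_NEVER_STARTED }

    static void sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Runs simulated work on the pool and classifies what happened at the deadline.
    static Outcome runWithDiagnosis(ExecutorService pool, long workMillis, long timeoutMillis)
            throws Exception {
        AtomicBoolean started = new AtomicBoolean(false);
        Runnable task = () -> {
            started.set(true);   // flipped only once a pool thread picks the task up
            sleep(workMillis);
        };
        Future<?> future = pool.submit(task);
        try {
            future.get(timeoutMillis, TimeUnit.MILLISECONDS);
            return Outcome.COMPLETED;
        } catch (TimeoutException e) {
            boolean wasStarted = started.get();
            future.cancel(true);
            // Case 1: the work was running but too slow. Case 2: no thread was free to run it.
            return wasStarted ? Outcome.TIMED_OUT_WHILE_RUNNING : Outcome.TIMED_OUT_NEVER_STARTED;
        }
    }

    static Outcome[] demo() throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(1);
        try {
            Outcome fast = runWithDiagnosis(pool, 10, 1000);
            Outcome slow = runWithDiagnosis(pool, 1000, 100);
            pool.execute(() -> sleep(1000));   // occupy the only worker thread
            Outcome starved = runWithDiagnosis(pool, 10, 100);
            return new Outcome[] { fast, slow, starved };
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) throws Exception {
        for (Outcome outcome : demo()) {
            System.out.println(outcome);
        }
    }
}
```

With a report like this, a timeout that never started points at pool starvation (the second type), while a timeout during execution points at slow processing or load (the first type).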

On another project (which uses only the blocking API) I get periodic timeouts under very low CB load (5-10 ops/s), and they seem somehow correlated with the issue described above.