Hi.
Sometimes we’re facing TimeoutException when accessing blocking SDK calls.
Looking inside it I found out, that whenever initially async Observable request is being created it is set to “observeOn(computation)” scheduler.
As a result what happens after. If I’m using original Observable result and somewhere within the .map function calling blocking operation (like jdbc call or something else) I’m blocking one of the threads used by CB SDK computation thread pool. So if simultaneously (it’s a web app) there are a lot of another calls to blocking CB SDK sooner or later they’ll have TimeoutException, because internal Observable will try to be executed on thread, which is blocked by some jdbc operation (or another blocking one).
To illustrate this situation, please, take a look at following test:
@Test
public void testName() throws Exception {
CountDownLatch latch = new CountDownLatch(7);
newObs()
.map(i -> {
System.out.println("starting to trigger another obs " + Thread.currentThread().getName());
newObs()
.subscribe(n -> System.out.println(n + " result 1 " + Thread.currentThread().getName()),
throwable -> latch.countDown(),
() -> latch.countDown());
newObs()
.subscribe(n -> System.out.println(n + " result 2 " + Thread.currentThread().getName()),
throwable -> latch.countDown(),
() -> latch.countDown());
newObs()
.subscribe(n -> System.out.println(n + " result 3 " + Thread.currentThread().getName()),
throwable -> latch.countDown(),
() -> latch.countDown());
newObs()
.subscribe(n -> System.out.println(n + " result 4 " + Thread.currentThread().getName()),
throwable -> latch.countDown(),
() -> latch.countDown());
newObs()
.subscribe(n -> System.out.println(n + " result 5 " + Thread.currentThread().getName()),
throwable -> latch.countDown(),
() -> latch.countDown());
newObs()
.subscribe(n -> System.out.println(n + " result 6 " + Thread.currentThread().getName()),
throwable -> latch.countDown(),
() -> latch.countDown());
newObs()
.subscribe(n -> System.out.println(n + " result 7 " + Thread.currentThread().getName()),
throwable -> latch.countDown(),
() -> latch.countDown());
try {
System.out.println("starting to wait " + Thread.currentThread().getName());
if (!latch.await(20, TimeUnit.SECONDS)) {
throw new RuntimeException(“timeout”);
}
System.out.println("finalized waiting " + Thread.currentThread().getName());
} catch (InterruptedException e) {
throw new RuntimeException(“second”, e);
}
return i;
})
.subscribeOn(Schedulers.computation())
.toBlocking().first();
}
private Observable newObs() {
return Observable.just(1).flatMap(i -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
throw new RuntimeException(e);
}
return Observable.just(i);
}).observeOn(Schedulers.computation());
}
It timeouts 100% of times, since one of the computation() threads is blocked.
I know I should not use Scheduler.computation() with blocking operations, but it illustrates an issue I’m having.
Questions:
- did I missed somewhere in documentation that I shouldn’t do blocking operations withing Observable given me from async SDK?
- if there is none, there should be general recommendation to change scheduler for async SDK when you want to use the in blocking way?
- maybe it makes sense to switch from observeOn(computation()) all the time to observeOn(immediate()) for blocking SDK?
PS is it clear at all on the problem?