Rx queue blocks in "wrong" call


I’ve been struggling with a problem in the Java SDK for a while, which I assumed was a deadlock in my code, but I couldn’t understand how a simple “replace” could possibly deadlock with itself.

It turns out that in a call much earlier I had deadlocked one of the “worker” threads in the thread pool maintained by CoreScheduler and then this later call had been assigned to the same “worker” thread. The consequence of this was that the earlier deadlock meant that the response to the replace request was “never” processed.
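For anyone who wants to see the effect in isolation, here is a minimal sketch using plain `java.util.concurrent` rather than the SDK or RxJava. The single-thread executor is only a stand-in for one scheduler worker, and the class and task names are made up for illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

class StuckWorkerDemo {
    /** Runs the scenario and reports what happened to the later task. */
    static String run() {
        // One thread with its own queue, standing in for a single scheduler worker (assumption).
        ExecutorService worker = Executors.newSingleThreadExecutor();
        CountDownLatch blockerStarted = new CountDownLatch(1);
        CountDownLatch release = new CountDownLatch(1);
        try {
            // Earlier call: occupies the worker's only thread until released.
            worker.submit(() -> {
                blockerStarted.countDown();
                release.await();
                return null;
            });
            blockerStarted.await();

            // Later, unrelated call that happens to be assigned to the same worker.
            Future<String> replace = worker.submit(() -> "replaced");
            Thread.sleep(100);
            boolean stalled = !replace.isDone();

            // Once the earlier call lets go, the queued task runs normally.
            release.countDown();
            String result = replace.get(2, TimeUnit.SECONDS);
            return (stalled ? "stalled, then " : "ran immediately: ") + result;
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            release.countDown();
            worker.shutdownNow();
        }
    }
}
```

The later task is perfectly healthy; it just sits in the queue behind the stuck one, which is why the stack trace of the "blocked" call gives no hint about the real culprit.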

Now that I’ve managed to unearth the problem, I can go ahead and figure out the original deadlock, and then presumably everything will work. But I’m wondering whether it’s possible to make these problems easier to diagnose. To figure this out, I had to (or at least I did) download the source for the Couchbase and RxJava libraries and add tracing to them until I could understand what was going wrong (in particular, that the queue for one of the thread pool workers was growing). I was then able to identify the thread involved and see what was going on.

Part of the problem is probably my failure to be consistent about adding timeouts everywhere (my next step is to add such a timeout and check that it throws an exception).
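The kind of check I have in mind looks roughly like the sketch below. It uses a plain `Future` rather than an Observable (in RxJava the `timeout` operator would play the same role), and `awaitOrFail` and the never-completed future are hypothetical names for illustration only.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class TimeoutCheck {
    /** Waits for a result, but gives up with an exception rather than hanging forever. */
    static <T> T awaitOrFail(Future<T> pending, long timeoutMillis) {
        try {
            return pending.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            throw new IllegalStateException("no response within " + timeoutMillis + " ms", e);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /** Checks that a response which never arrives surfaces as an exception. */
    static boolean throwsOnMissingResponse() {
        // Hypothetical lost response: a future that nobody ever completes.
        Future<String> neverCompleted = new CompletableFuture<>();
        try {
            awaitOrFail(neverCompleted, 100);
            return false;
        } catch (IllegalStateException expected) {
            return true;
        }
    }
}
```

A timeout would not have fixed the underlying deadlock, but it would have turned a silent hang into an exception pointing at the call that never got its response.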

Would it be possible to add some kind of automatic diagnostic, run on a timer or otherwise, that reviews the status of the thread pools and raises some kind of alert if there seems to be a backlog on one or more of them? Or does such a facility already exist, and I just need to turn it on?
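To make the request concrete, this is roughly what I mean, sketched against ordinary `ThreadPoolExecutor` instances. How (or whether) the SDK could expose its internal workers to such a check is exactly the open question, so `BacklogMonitor`, `watch` and the threshold are all assumptions of mine, not an existing facility.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

class BacklogMonitor {
    /** Returns the indexes of pools whose queue holds more than `threshold` waiting tasks. */
    static List<Integer> backedUp(List<ThreadPoolExecutor> pools, int threshold) {
        List<Integer> stuck = new ArrayList<>();
        for (int i = 0; i < pools.size(); i++) {
            if (pools.get(i).getQueue().size() > threshold) {
                stuck.add(i);
            }
        }
        return stuck;
    }

    /** Polls the pools on a timer and hands any backed-up indexes to `alert`. */
    static ScheduledExecutorService watch(List<ThreadPoolExecutor> pools, int threshold,
                                          long periodMillis, Consumer<List<Integer>> alert) {
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        timer.scheduleAtFixedRate(() -> {
            List<Integer> stuck = backedUp(pools, threshold);
            if (!stuck.isEmpty()) {
                alert.accept(stuck);
            }
        }, periodMillis, periodMillis, TimeUnit.MILLISECONDS);
        return timer;
    }

    private static ThreadPoolExecutor singleThreadPool() {
        return new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
    }

    /** Blocks pool 0, queues three tasks behind the blocker, and reports what the check sees. */
    static List<Integer> demo() {
        ThreadPoolExecutor blocked = singleThreadPool();
        ThreadPoolExecutor idle = singleThreadPool();
        CountDownLatch release = new CountDownLatch(1);
        try {
            // The first task goes straight to the worker thread and holds it.
            blocked.execute(() -> {
                try {
                    release.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            // These three can only wait in the queue.
            for (int i = 0; i < 3; i++) {
                blocked.execute(() -> { });
            }
            return backedUp(Arrays.asList(blocked, idle), 2);
        } finally {
            release.countDown();
            blocked.shutdownNow();
            idle.shutdownNow();
        }
    }
}
```

A queue-length threshold is a crude signal (a busy but healthy pool can trip it), so a real diagnostic would probably also want to look at how long the oldest task has been waiting.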

Finally, I’m unclear why there appears to be a nested pool of pools. It would seem that the CoreScheduler maintains a pool of ThreadPoolExecutors, each of which has just one thread. It seems to me that while my code would still have been wrong, if this were not the case it would not have blocked in this way. Whether the alternative would have been better or worse, I wouldn’t like to say :)
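The difference I am describing can be sketched with plain executors. This is only a model of my reading of the design: a single-thread executor stands in for one pinned worker and a two-thread pool for a hypothetical shared alternative; none of it is the SDK's actual code.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class PoolShapeDemo {
    /** Blocks one thread of `pool`, then reports whether a later quick task still finishes. */
    static boolean laterTaskFinishes(ExecutorService pool) {
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch release = new CountDownLatch(1);
        try {
            // Earlier call that never returns on its own.
            pool.submit(() -> {
                started.countDown();
                release.await();
                return null;
            });
            started.await();

            // Later call submitted to the same pool.
            Future<String> later = pool.submit(() -> "done");
            later.get(200, TimeUnit.MILLISECONDS);
            return true;
        } catch (TimeoutException e) {
            return false;
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            release.countDown();
            pool.shutdownNow();
        }
    }

    /** Both calls pinned to one single-thread worker: the later one stalls. */
    static boolean onSingleThreadWorker() {
        return laterTaskFinishes(Executors.newSingleThreadExecutor());
    }

    /** Both calls on a shared two-thread pool: the later one finds a free thread. */
    static boolean onSharedPool() {
        return laterTaskFinishes(Executors.newFixedThreadPool(2));
    }
}
```

Of course the shared pool only postpones the trouble: each stuck call permanently eats one thread, and the hang reappears once they are all gone.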


It’s true that reactive asynchronous programs are by nature harder to debug. Glad you could hunt down the problem, but I don’t think we can add instrumentation like that in an easy way.

As a rule of thumb, though, you should never have blocking behavior in your Observable chain, e.g. in maps or flatMaps. Deadlocks are of course another beast (sharing resources always adds complexity and introduces problems like that).

Or if you do have blocking calls somewhere, you should be careful to execute them in the Schedulers.io() scheduler using observeOn(...) (or maybe sometimes subscribeOn(...)).
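The general idea, stripped of Rx, is to hand the blocking step to a pool that is allowed to block so the small shared worker pool stays free. Below is a rough `java.util.concurrent` analogue, not RxJava code; the pool names and `blockingLookup` are hypothetical, and in an Rx chain the hand-off would be done by `observeOn(Schedulers.io())` instead of `thenApplyAsync`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class OffloadDemo {
    /** Hypothetical blocking call; reports which thread it ran on. */
    static String blockingLookup(String id) {
        try {
            Thread.sleep(50); // stand-in for blocking I/O
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return Thread.currentThread().getName();
    }

    /** Starts work on the computation pool, then moves the blocking step to the I/O pool. */
    static String blockingRanOn() {
        ExecutorService computation =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "computation-worker"));
        ExecutorService io =
                Executors.newCachedThreadPool(r -> new Thread(r, "io-worker"));
        try {
            return CompletableFuture
                    .supplyAsync(() -> "doc-1", computation)        // non-blocking stage
                    .thenApplyAsync(OffloadDemo::blockingLookup, io) // blocking stage, off the worker
                    .get(2, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            computation.shutdownNow();
            io.shutdownNow();
        }
    }
}
```

Here the blocking call reports that it ran on `io-worker`, so the single computation thread is never tied up by it.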


Thanks. All good advice.

I’ve been doing asynchronous/multithreaded programming for a fair while now, but even so, the ReactiveX library still feels very “odd” to me. In particular, it seems designed much more for data streaming applications (where you configure the flow and then start data flowing) than for the kinds of apps that seem more natural in a Couchbase setting, where there tends to be much more interactivity.

I have tried using observeOn() and subscribeOn() a couple of times, but have always ended up regretting it; for me, at least, that has always been dealing with a symptom rather than actually addressing the cause and that just means the problem pops up somewhere else.