CouchbaseAsyncBucket.get() drops any RxJavaSchedulersHook

I’m using Couchbase in a service that reads from Couchbase and makes remote calls based on the results. We use spring-cloud-sleuth for distributed tracing across services.

Sleuth supports calls across RxJava threads using an RxJavaSchedulersHook (https://github.com/spring-cloud/spring-cloud-sleuth/blob/master/spring-cloud-sleuth-core/src/main/java/org/springframework/cloud/sleuth/instrument/rxjava/SleuthRxJavaSchedulersHook.java).

Distributed tracing works across RxJava threads except when we read from Couchbase with get(id, target) and then call other services for additional information.

I think the issue occurs in CouchbaseAsyncBucket.get(id, target). The method goes through OnSubscribeDefer, which ends up calling o.unsafeSubscribe(Subscribers.wrap(s)); (see the javadoc below).

The call to unsafeSubscribe loses the information in the RxJavaSchedulersHook. Is there a way to use asyncBucket.get(id) from the Couchbase Java client without losing the RxJavaSchedulersHook?

Javadoc for unsafeSubscribe:
Subscribes to an Observable and invokes OnSubscribe function without any contract protection, error handling, unsubscribe, or execution hooks.

@michael_sarver do you know if this is an RxJava bug that has been fixed in a later version? I haven’t looked into it yet, but keep in mind that a call against the database also ends up in the IO pool (netty), so it’s not exclusively RxJava thread interaction going on here.
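To illustrate the thread handoff point, here is a minimal stdlib-only sketch. It does not use RxJava, Sleuth, or the Couchbase SDK; the class name, TRACE_ID, withTrace, and readOnOtherThread are all hypothetical names made up for this example. It only shows the general mechanism: a thread-local context is visible on another thread only if the task was wrapped before being handed over, which is roughly what a schedulers hook does for scheduled actions. Work that is handed to a pool without going through such a wrapper will not see the context.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ContextHandoffSketch {
    // Hypothetical stand-in for a tracing context such as the current span.
    static final ThreadLocal<String> TRACE_ID = new ThreadLocal<>();

    // Captures the caller's trace id at wrap time and restores it around the task.
    static <T> Callable<T> withTrace(Callable<T> task) {
        final String captured = TRACE_ID.get();
        return () -> {
            String previous = TRACE_ID.get();
            TRACE_ID.set(captured);
            try {
                return task.call();
            } finally {
                // Put the worker thread back the way we found it.
                if (previous == null) {
                    TRACE_ID.remove();
                } else {
                    TRACE_ID.set(previous);
                }
            }
        };
    }

    // Runs a task on a separate single-thread pool and returns the trace id it saw.
    static String readOnOtherThread(boolean wrap) throws Exception {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Callable<String> task = () -> String.valueOf(TRACE_ID.get());
            return pool.submit(wrap ? withTrace(task) : task).get();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        TRACE_ID.set("trace-123");
        System.out.println("unwrapped: " + readOnOtherThread(false));
        System.out.println("wrapped:   " + readOnOtherThread(true));
    }
}
```

Running main prints "unwrapped: null" and then "wrapped:   trace-123". Whether the Couchbase client path actually bypasses the hook in this way is still an open question here; the sketch only shows what the symptom would look like if it did.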

I’m creating a simple project on GitHub that demonstrates the problem. I will try to narrow down the root cause using some integration tests. Thanks for your help. I will reply once I can show where the issue occurs.
