AsyncBucket get not calling onNext of Observable when Key does not exists. Should Throw an exception of DocumentDoesNotExistsException

Hello,

I am using Java SDK 2.4.1 and what i have noticed that when the key is not present the onNext call does not happen when using AsyncBucket, the expected behaviour should be to have an exception “DocumentDoesNotExistException” or “NoSuchElementException”. I have tried with previous versions of SDKs(2.3.x, 2.2.x,2.1.x) as well, the same behaviour occurs.

The reason i am posting this is because the behaviour of normal Observable and BlockingObservable is not in sync. when i tried same call with toBlocking.single() it gave me an exception DocumentDoesNotExistsException

  • Below is the source code which doesnt throw an exception -
  1. We are using AsyncBucket by using .async() over bucket to have immediate access to Observable and run custom flatmaps over it. But below code calls onCompleted of that Observable.

couchbaseServiceDelegator.getCouchbaseBucket(requestType).get(key, StringDocument.class);

  1. Below code provides an exception when Key does not exists.

couchbaseServiceDelegator.getCouchbaseBucket(requestType).get(key, StringDocument.class).toBlocking().single();

To help you guys out more when i checked the code below are my findings

In class CouchbaseAsyncBucket in method get(final String id, final Class target) there a switch case which when document does not exists returns false

switch(response.status()) {
case NOT_EXISTS:
** return false;**
case TEMPORARY_FAILURE:
case SERVER_BUSY:
throw new TemporaryFailureException();
case OUT_OF_MEMORY:
throw new CouchbaseOutOfMemoryException();
default:
throw new CouchbaseException(response.status().toString());
}

and once we return false this wont call onNext - below is the code(FilterSubscriber class) which doesnt calls the actual onNext of the Observable.

– Below is the code that gets called when we apply filter on an Observable.
@Override
public void onNext(T t) {
boolean result;
try {
result = predicate.call(t); // We return false here from filter when document does not exists.
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
unsubscribe();
onError(OnErrorThrowable.addValueAsLastCause(ex, t));
return;
}
if (result) {
actual.onNext(t); // This never gets called as we are returning false. So we cant control error flow.
} else {
request(1);
}
}

Please let me know if any more inputs required

Thanks,
Dhawal Patel

Hi,

this works as designed. The idea is that if a document does not exist the observable stream is empty, not returning with an error. You can always turn it into an error if you want by using an RxJava operator (defaultIfEmpty, switchIfEmpty).

Yes. That’s what we are doing now. My point is, the behaviour with sync and async are not same. We can argue that its the way Rx works.

Just a suggestion, it will be great if we have an method which takes in one more observable as input which will get subscribed when we have an empty stream.

Well, I’d argue the opposite. Signaling absence in RxJava is done by having an empty observable, while signaling absence in traditional/sync java is signaled by “null” (and in java 8 we finally get Optional).

I guess this request is best directed to the RxJava issue tracker Issues · ReactiveX/RxJava · GitHub

Oh, and I agree that we could have decided (when 2.0 hit stable years ago) to expose it as an error, but that ship has sailed at least until we hit a 3.0 :slight_smile:

Yeah exactly… But some how having that in RxJava makes more sense as it will be shipped to all of us. but also it will be super to have it in sdk as well… as its painful to debug to get to root of it.

Thanks,
Dhawal Patel

Is there any plan to move to RxJava 2.x in couchbase SDK?

Yes, but not right now since it is a breaking change and will mean a SDK 3.0 most likely (or we find an elegant way to run both side by side which should be possible as well - but it is yet to be seen how elegant the API is then ;-))