I have the following method with an Observable chain:
protected void readMultiple(List<String> keys, Predicate<ENTITY> filter, int limit, Consumer<List<ENTITY>> responseHandler) {
    if (keys.isEmpty()) {
        responseHandler.accept(Collections.emptyList());
        return;
    }
    Func1<JsonDocument, ENTITY> func = this::createEntity;
    Func1<ENTITY, Boolean> entityFilter = filter::test;
    Observable<ENTITY> obs =
        Observable
            .from(keys)
            .observeOn(scheduler)
            .flatMap(bucket.async()::get)
            .filter(Objects::nonNull)
            .map(func)
            .filter(entityFilter);
    if (limit > 0) {
        obs.take(limit);
    }
    obs.timeout(TIMEOUT_SECONDS, TimeUnit.SECONDS)
        .toList()
        .subscribe(new Subscriber<List<ENTITY>>() {
            //onError and onComplete have been left out for clarity
            @Override
            public void onNext(List<ENTITY> entities) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug(" Processing " + entities.size() + " couchbase documents");
                }
                responseHandler.accept(entities);
            }
        });
}
If limit is > 0, the obs.take(limit) section is executed, but it is ignored when the Observable actually runs. If I set the limit to 1, for example, I still get all results, i.e. the entities list passed to onNext contains more than one element.
I am using Couchbase Java client 2.2.2 with RxJava 1.0.15. I have also tried RxJava 1.0.17 and 1.1.0, but neither made any difference.
take really needs to work here, because we pass in anywhere from a few thousand to a million keys and only need roughly the first 10 that make it through the chain. We cannot load them all, build a huge list, and then trim that list manually, for example with Java streams.
Is this a bug in the Observables? In Couchbase? Am I doing something wrong?
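One suspicion I have not yet verified against my real code: RxJava operators such as take do not modify the Observable they are called on. They return a new Observable, so a result that is never assigned is simply dropped. The sketch below illustrates only that general pattern. Source is a hypothetical stand-in class I made up for the example (it is not RxJava and not Couchbase), and the two read methods are likewise invented names that mirror the shape of readMultiple above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class TakeDemo {

    // Hypothetical stand-in for an immutable, chainable source such as rx.Observable.
    static final class Source<T> {
        private final List<T> items;

        Source(List<T> items) {
            this.items = new ArrayList<>(items);
        }

        // Like an Rx operator: leaves this instance untouched and returns a new one.
        Source<T> take(int n) {
            return new Source<>(items.subList(0, Math.min(n, items.size())));
        }

        List<T> toList() {
            return new ArrayList<>(items);
        }
    }

    // Mirrors the method in the question: the result of take() is discarded.
    static List<String> readDiscardingTake(List<String> keys, int limit) {
        Source<String> src = new Source<>(keys);
        if (limit > 0) {
            src.take(limit); // return value dropped, so src is unchanged
        }
        return src.toList();
    }

    // Same flow, but the returned instance is assigned back to the variable.
    static List<String> readReassigningTake(List<String> keys, int limit) {
        Source<String> src = new Source<>(keys);
        if (limit > 0) {
            src = src.take(limit);
        }
        return src.toList();
    }

    public static void main(String[] args) {
        List<String> keys = Arrays.asList("a", "b", "c");
        System.out.println(readDiscardingTake(keys, 1).size());  // prints 3
        System.out.println(readReassigningTake(keys, 1).size()); // prints 1
    }
}
```

If the same reasoning applies to my method, the equivalent change would be obs = obs.take(limit); inside the if block, but I would appreciate confirmation that nothing else in the chain is involved.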
Thank you!