Observable.take() ignored on rather complex Observable chain


#1

I have the following method with an Observable chain:

protected void readMultiple(List<String> keys, Predicate<ENTITY> filter, int limit, Consumer<List<ENTITY>> responseHandler) {
    
    if (keys.isEmpty()) {
        responseHandler.accept(Collections.emptyList());
        return;
    }
    
    Func1<JsonDocument, ENTITY> func = this::createEntity;
    Func1<ENTITY, Boolean> entityFilter = filter::test;
    
    Observable<ENTITY> obs = 
        Observable
            .from(keys)
            .observeOn(scheduler)
            .flatMap(bucket.async()::get)
            .filter(Objects::nonNull)
            .map(func)
            .filter(entityFilter);
    
    if (limit > 0) {
        obs.take(limit);
    }
    
    obs.timeout(TIMEOUT_SECONDS, TimeUnit.SECONDS)
       .toList()
       .subscribe(new Subscriber<List<ENTITY>>() { 
            // onError and onCompleted have been left out for clarity
       
            @Override
            public void onNext(List<ENTITY> entities) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug(" Processing " + entities.size() + " couchbase documents");
                }
                
                responseHandler.accept(entities);
            }
        }
    );
}

If limit is > 0, the obs.take(limit) line is executed, but it has no effect when the Observable actually runs. For example, if I set the limit to 1, I still get all results, i.e. the entities list passed to onNext contains more than one element.

I am using Couchbase Java client 2.2.2 with rx 1.0.15. I have also tried rx versions 1.0.17 and 1.1.0, but neither made a difference.
take really needs to work here, because we are passing anywhere from a few thousand to a million keys and only need roughly the first 10 that pass the filter. We cannot load them all, build a huge list, and then limit that list manually, for example with Java streams.

Is this a bug in the Observables? In Couchbase? Am I doing something wrong?
Thank you!


#2

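Calling an operator does not modify the Observable it is called on. It decorates the original Observable and returns a new, wrapped Observable, leaving the original unchanged. You are merely forgetting to assign the result of obs.take(limit) back to the obs variable.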
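
As a minimal sketch of the fix, assuming RxJava 1.x is on the classpath: the class name TakeExample and the helper collect are hypothetical and exist only for illustration, and a plain list of integers stands in for the Couchbase documents. The only relevant part is the reassignment inside the if block.

```java
import java.util.Arrays;
import java.util.List;

import rx.Observable;

public class TakeExample {

    // Hypothetical helper: applies an optional limit, then collects the items into a list.
    static List<Integer> collect(List<Integer> source, int limit) {
        Observable<Integer> obs = Observable.from(source);

        if (limit > 0) {
            // take() returns a new Observable, so the result must be kept.
            // A bare "obs.take(limit);" would throw that new Observable away.
            obs = obs.take(limit);
        }

        // Blocking is used here only to keep the example short.
        return obs.toList().toBlocking().single();
    }

    public static void main(String[] args) {
        List<Integer> values = Arrays.asList(1, 2, 3, 4, 5);
        System.out.println(collect(values, 2)); // prints [1, 2]
        System.out.println(collect(values, 0)); // prints [1, 2, 3, 4, 5]
    }
}
```

Applied to the method in the question, this would mean changing obs.take(limit); to obs = obs.take(limit); and leaving the rest of the chain as it is.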