Observable.take() ignored on rather complex Observable chain


I have the following method with an Observable chain:

protected void readMultiple(List<String> keys, Predicate<ENTITY> filter, int limit, Consumer<List<ENTITY>> responseHandler) {
    if (keys.isEmpty()) {
    Func1<JsonDocument, ENTITY> func = this::createEntity;
    Func1<ENTITY, Boolean> entityFilter = filter::test;
    Observable<ENTITY> obs = 
    if (limit > 0) {
    obs.timeout(TIMEOUT_SECONDS, TimeUnit.SECONDS)
       .subscribe(new Subscriber<List<ENTITY>>() { 
            //onError and onComplete have been left out for clarity
            public void onNext(List<ENTITY> entities) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug(" Processing " + entities.size() + " couchbase documents");

If limit is > 0, the obs.take(limit) branch runs, but it has no effect once the Observable is actually subscribed to. For example, if I set the limit to 1, I still get all results, i.e. the entities list passed to onNext contains more than one element.

I am using Couchbase Java client 2.2.2 with RxJava 1.0.15. I have also tried RxJava 1.0.17 and 1.1.0, but neither made a difference.
take really needs to work here, because we pass anywhere from a few thousand to a million keys and only need roughly the first 10 results. We cannot load them all, build a huge list, and then trim that list manually, for example with Java streams.

Is this a bug in the Observables? In Couchbase? Am I doing something wrong?
Thank you!


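Calling an operator decorates the original Observable and returns a new, wrapped Observable; it does not modify the Observable it is called on. You are merely forgetting to assign the result of obs.take(limit) back to the obs variable, i.e. obs = obs.take(limit).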
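To make the difference concrete, here is a minimal sketch that runs without RxJava on the classpath. The `Pipeline` class is a hypothetical stand-in written for this illustration, not the RxJava API; it only mimics the relevant behavior, namely that `take` returns a new wrapper and leaves the receiver untouched. The names `TakeDemo`, `withoutAssignment`, and `withAssignment` are likewise assumptions.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Supplier;

public class TakeDemo {

    // Hypothetical stand-in for a decorator-style pipeline (NOT the RxJava API).
    static final class Pipeline<T> {
        private final Supplier<List<T>> source;

        private Pipeline(Supplier<List<T>> source) {
            this.source = source;
        }

        static <T> Pipeline<T> from(List<T> items) {
            return new Pipeline<>(() -> new ArrayList<>(items));
        }

        // Returns a NEW pipeline that wraps this one; "this" is not modified.
        Pipeline<T> take(int n) {
            return new Pipeline<>(() -> {
                List<T> all = source.get();
                return new ArrayList<>(all.subList(0, Math.min(n, all.size())));
            });
        }

        List<T> toList() {
            return source.get();
        }
    }

    // Mirrors the pattern from the question: the result of take() is discarded.
    static List<Integer> withoutAssignment(int limit) {
        Pipeline<Integer> obs = Pipeline.from(Arrays.asList(1, 2, 3));
        if (limit > 0) {
            obs.take(limit); // no effect: the returned pipeline is thrown away
        }
        return obs.toList();
    }

    // The fix: assign the wrapped pipeline back to the variable.
    static List<Integer> withAssignment(int limit) {
        Pipeline<Integer> obs = Pipeline.from(Arrays.asList(1, 2, 3));
        if (limit > 0) {
            obs = obs.take(limit);
        }
        return obs.toList();
    }

    public static void main(String[] args) {
        System.out.println(withoutAssignment(1)); // prints [1, 2, 3]
        System.out.println(withAssignment(1));    // prints [1]
    }
}
```

In the method from the question, the equivalent change is the single assignment inside the if (limit > 0) block; the rest of the chain can stay as it is.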