RxJava: Observable is already released

Hey there,
I’m pretty new to using the async bucket in the Couchbase SDK.

I would like to fetch a large volume of documents (in the range of 50,000 to 100,000) and process them one by one. I thought that RxJava would lend itself to this kind of task.
Below you can see my pipeline:

String queryString = String.format("SELECT CDT.*, meta().id "
                                       + "FROM %s as CDT "
                                       + "WHERE `_class` = '%s'",
                                   getQuotedBucketName(),
                                   clazz.getCanonicalName());

N1qlParams params = N1qlParams.build().consistency(ScanConsistency.NOT_BOUNDED).pretty(false);
N1qlQuery query = N1qlQuery.simple(queryString, params);

asyncBucket
    .query(query)
    .flatMap(AsyncN1qlQueryResult::rows)
    .map(this::readTask)
    .filter(Optional::isPresent)
    .map(Optional::get)
    .forEach(consumer::accept,
             e -> LOGGER.error("Failed to delete chunk.", e));

I use this helper function to map the results:

public Optional<ChunkDeletionTask> readTask(AsyncN1qlQueryRow row) {
    try {
        return Optional.of(objectMapper.readValue(row.value().toString(), clazz));
    } catch (IOException e) {
        LOGGER.warn("Could not map ChunkDeletionTask to object.", e);
        return Optional.empty();
    }
}

The consumer in turn deletes the given entity.

When the query returns a lot of objects, I get the infamous exception: IllegalStateException: The content of this Observable is already released. Subscribe earlier or tune the CouchbaseEnvironment#autoreleaseAfter() setting.

Can someone give me a hint as to what I am doing wrong? I assumed that with backpressure, rows are only fetched when the service is ready to handle them.
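
To make that assumption concrete, here is a minimal sketch of what "only handed over when the consumer is ready" looks like. It uses the JDK's java.util.concurrent.Flow API rather than RxJava 1 or the Couchbase SDK, so it only illustrates the request-one-at-a-time idea and says nothing about how the SDK buffers or releases rows. The names OneAtATimeDemo, OneAtATimeSubscriber and processAll are made up for illustration, and plain strings stand in for query rows.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

public class OneAtATimeDemo {

    /** Hypothetical subscriber: requests one item, handles it, then requests the next. */
    static final class OneAtATimeSubscriber implements Flow.Subscriber<String> {
        final List<String> handled = new ArrayList<>();
        final CountDownLatch done = new CountDownLatch(1);
        private Flow.Subscription subscription;

        @Override
        public void onSubscribe(Flow.Subscription s) {
            subscription = s;
            s.request(1); // signal demand for exactly one item
        }

        @Override
        public void onNext(String row) {
            handled.add(row);        // stand-in for the real per-row work (e.g. a delete)
            subscription.request(1); // only now ask for the next item
        }

        @Override
        public void onError(Throwable t) {
            done.countDown();
        }

        @Override
        public void onComplete() {
            done.countDown();
        }
    }

    /** Pushes the given rows through a publisher and returns them in the order handled. */
    static List<String> processAll(List<String> rows) {
        OneAtATimeSubscriber subscriber = new OneAtATimeSubscriber();
        // close() (via try-with-resources) signals onComplete once buffered items are delivered
        try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            for (String row : rows) {
                publisher.submit(row); // blocks when the subscriber's buffer is full
            }
        }
        try {
            subscriber.done.await(); // wait until the subscriber has seen onComplete/onError
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for completion", e);
        }
        return subscriber.handled;
    }

    public static void main(String[] args) {
        System.out.println(processAll(List.of("row-1", "row-2", "row-3")));
    }
}
```

In this sketch the producer is slowed down by the consumer: submit blocks once the buffer is full, and nothing is delivered until the subscriber calls request. Whether the async bucket's row stream honours demand in the same way is exactly what I am unsure about.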