BackBuffer controllable in any way?

So after thinking a bit more about that, this is not entirely satisfactory because it doesn’t actually react to BackpressureException; it delays between batches regardless…

Here is another solution that you could try. It relies on the RetryBuilder introduced in 2.1.2. Note that the retry semantics, in the current state of the SDK, require you to defer the calls to document() in order for them to be retriable:

asyncBucket.query(ViewQuery.from("getAll", "accounts").stale(Stale.FALSE))
    .flatMap(x -> x.rows())
    .concatMap(row -> 
        Observable.defer(() -> row.document()) //defer makes it retriable
        .retryWhen(RetryBuilder.anyOf(BackpressureException.class) //only retry on Backpressure
            .max(2) //you could tune the maximum number of attempts here
            .delay(Delay.exponential(TimeUnit.MILLISECONDS, 500, 10)) //delay between attempts grows exponentially between 10ms and 500ms
            .build()
        )
    )//end of retrying concatMap
    .toBlocking()
    .forEach(doc -> {
        Account acc = Account.fromJson(doc.content());
        accounts.put(acc.getName(), acc);
    });

A bit of context: concatMap is like flatMap, but it preserves order by waiting for each inner Observable to complete before subscribing to the next one. In effect, this means that when a BackpressureException occurs, it waits for the retry (with its delays) to complete before trying to get the next document.

:exclamation: Using a flatMap instead would mean that the first item to cause backpressure would retry with a delay, but flatMap would immediately move on to the next key, try to fetch that document, and most likely hit a BackpressureException again.

So concatMap gives us the best result: if and only if there’s a BackpressureException, retry just that key and make subsequent keys wait; then, once the system is in good shape again, continue fetching the remaining documents. :smiley:
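
If it helps to see that difference in isolation, here is a minimal, standalone sketch (plain RxJava 1.x, no Couchbase calls); simulatedFetch is just a hypothetical stand-in for row.document():

import java.util.concurrent.TimeUnit;
import rx.Observable;

public class ConcatVsFlat {
    public static void main(String[] args) {
        Observable<Integer> keys = Observable.just(1, 2, 3);

        // concatMap: the "fetch" for key 2 only starts once key 1 has completed,
        // which is what lets a retry-with-delay hold back the following keys
        keys.concatMap(k -> simulatedFetch(k))
            .toBlocking()
            .forEach(k -> System.out.println("concatMap got " + k));

        // flatMap: all three "fetches" are subscribed to immediately, so a delayed
        // retry on key 1 would not stop keys 2 and 3 from being attempted right away
        keys.flatMap(k -> simulatedFetch(k))
            .toBlocking()
            .forEach(k -> System.out.println("flatMap got " + k));
    }

    // hypothetical stand-in for row.document(): emits the key after a short delay
    static Observable<Integer> simulatedFetch(int key) {
        return Observable.just(key).delay(100, TimeUnit.MILLISECONDS);
    }
}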
