Is the BackBuffer controllable in any way?


#1

Hello,

I've been struggling with BackpressureExceptions for the last few days. Since the underlying implementation of the AsyncBucket does not seem to allow any BackBuffer limits or settings, how would you control the input stream?

When loading several thousand documents, I cannot find any solution other than drastically increasing .requestBufferSize().

(I use this code at the startup of a game server to load 500.000k profiles and settings. Accessing CB from inside the game would be too slow, so they need to be in memory. Only changes are written to the DB asynchronously at runtime.)

Example code that results in BackpressureExceptions:

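    import com.couchbase.client.java.view.Stale;
    import com.couchbase.client.java.view.ViewQuery;

    asyncBucket.query(ViewQuery.from("getAll", "accounts").stale(Stale.FALSE))
        .flatMap(result -> result.rows())   // emit every row of the view
        .flatMap(row -> row.document())     // one document fetch per row, all issued concurrently
        .toBlocking()
        .forEach(doc -> {
            Account acc = Account.fromJson(doc.content());
            accounts.put(acc.getName(), acc);
        });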

Is there anything I can do here while keeping throughput high?

Thanks,
Kademlia


#2

The BackpressureException occurs because the internal queue (or rather a RingBuffer) fills up too quickly with requests that the IO layer and/or the server cannot serve in time. The size of the RingBuffer can be customized further (that is what requestBufferSize() controls). You could also try using RxJava operators to batch the requests and delay between batches:
edit: see the better solution in the next post

    import java.util.concurrent.TimeUnit;
    import rx.Observable;

    asyncBucket.query(ViewQuery.from("getAll", "accounts").stale(Stale.FALSE))
        .flatMap(result -> result.rows())
        // batch rows in lists of 2048 items (to be tuned)
        .buffer(2048)
        // re-emit the content of those lists; concatMap, unlike flatMap, preserves order
        // and only starts the next batch once the previous one has completed
        .concatMap(batch -> Observable.from(batch)
            // BUT delay the batch's emissions by 100 ms
            .delay(100, TimeUnit.MILLISECONDS)
            // AND do the document retrieval inside the same inner Observable
            .flatMap(row -> row.document())
        )
        .toBlocking()
        .forEach(doc -> {
            Account acc = Account.fromJson(doc.content());
            accounts.put(acc.getName(), acc);
        });

@daschl what do you think?
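To make both the cause and the batching workaround concrete, here is a toy model using only the JDK. It is not the SDK's implementation: BoundedRequestBuffer, SimulatedBackpressureException and the fixed drain rate per "tick" are hypothetical stand-ins for the request RingBuffer, BackpressureException and the IO layer. A full buffer rejects new requests, and the producer either sends everything at once or sends batches with a fixed pause in between.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Hypothetical stand-in for the SDK's BackpressureException. */
class SimulatedBackpressureException extends RuntimeException {
    SimulatedBackpressureException(String message) { super(message); }
}

/** Toy model: a fixed-size request buffer drained by a slower IO layer. */
final class BoundedRequestBuffer {
    private final int capacity;
    private final Deque<Integer> queue = new ArrayDeque<>();

    BoundedRequestBuffer(int capacity) { this.capacity = capacity; }

    /** Rejects the request when the buffer is full, like a full RingBuffer would. */
    void submit(int request) {
        if (queue.size() >= capacity) {
            throw new SimulatedBackpressureException("buffer full, rejected request " + request);
        }
        queue.addLast(request);
    }

    /** The IO layer serves at most n queued requests per tick. */
    void drain(int n) {
        for (int i = 0; i < n && !queue.isEmpty(); i++) {
            queue.pollFirst();
        }
    }
}

final class BatchingDemo {
    /**
     * Sends 'total' requests in batches of 'batchSize'. After each batch the producer
     * pauses for 'pauseTicks' ticks while the IO layer drains 'drainPerTick' requests
     * per tick. Returns how many requests were rejected.
     */
    static int countRejections(int total, int capacity, int batchSize, int pauseTicks, int drainPerTick) {
        BoundedRequestBuffer buffer = new BoundedRequestBuffer(capacity);
        int rejections = 0;
        for (int start = 0; start < total; start += batchSize) {
            int end = Math.min(start + batchSize, total);
            for (int request = start; request < end; request++) {
                try {
                    buffer.submit(request);
                } catch (SimulatedBackpressureException e) {
                    rejections++; // a real loader would have to retry this request
                }
            }
            for (int tick = 0; tick < pauseTicks; tick++) {
                buffer.drain(drainPerTick);
            }
        }
        return rejections;
    }
}
```

In this model, sending 10 000 requests at once into a 1024-slot buffer gets 8976 of them rejected, while a 16 384-slot buffer hides the problem, much like raising requestBufferSize() does. Batches of 512 with one drain tick in between pass cleanly, but only because the pause happens to match the drain rate: halve the drain rate and rejections come back, which is the weakness of a fixed delay.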


#3

So, after thinking a bit more about it, this is not entirely satisfactory, because it doesn't actually react to BackpressureException: it delays between batches regardless…

Here is another solution you could try. It relies on the RetryBuilder introduced in 2.1.2. Note that, in the current state of the SDK, the calls to document() must be deferred in order to be retriable:

    import java.util.concurrent.TimeUnit;
    import com.couchbase.client.core.BackpressureException;
    import com.couchbase.client.core.time.Delay;
    import com.couchbase.client.java.util.retry.RetryBuilder;
    import rx.Observable;

    asyncBucket.query(ViewQuery.from("getAll", "accounts").stale(Stale.FALSE))
        .flatMap(result -> result.rows())
        .concatMap(row ->
            Observable.defer(() -> row.document()) // defer makes it retriable
                .retryWhen(RetryBuilder.anyOf(BackpressureException.class) // only retry on backpressure
                    .max(2) // tune the maximum number of retry attempts here
                    .delay(Delay.exponential(TimeUnit.MILLISECONDS, 500, 10)) // exponential delay, bounded between 10 ms and 500 ms
                    .build()
                )
        ) // end of retrying concatMap
        .toBlocking()
        .forEach(doc -> {
            Account acc = Account.fromJson(doc.content());
            accounts.put(acc.getName(), acc);
        });

A bit of context: concatMap is like flatMap, but it keeps the order by waiting for each inner Observable to complete before subscribing to the next one. In effect, in case of a BackpressureException, it waits for the retry (delays included) to complete before trying to get the next document.

:exclamation: Using flatMap instead would mean that the first item to cause backpressure would retry with a delay, but flatMap would immediately move on to the next key, try to fetch its document and probably get a BackpressureException again.

So concatMap gives us the best result: if and only if there's a BackpressureException, retry just this key and make subsequent keys wait, then, once the system is in good shape again, continue fetching the remaining documents. :smiley:
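Outside of RxJava, the defer + retryWhen combination comes down to keeping a recipe for the call rather than its result, and re-running that recipe only for the retriable failure type, with a growing pause in between. The JDK-only sketch below illustrates that; RetrySketch is a hypothetical name, the Supplier plays the role of Observable.defer, and the delay formula (2^(attempt-1) ms clamped to [lower, upper]) is an assumption chosen to resemble Delay.exponential(TimeUnit.MILLISECONDS, 500, 10), not a copy of the SDK's code.

```java
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

final class RetrySketch {
    /** Assumed backoff: 2^(attempt-1) units, clamped to [lower, upper]. */
    static long exponentialDelay(int attempt, long lower, long upper) {
        long raw = attempt >= 64 ? Long.MAX_VALUE : 1L << (attempt - 1);
        return Math.max(lower, Math.min(upper, raw));
    }

    /**
     * Calls request.get(); when it throws an instance of retryOn, waits and calls it
     * again, up to maxRetries extra times. Other exceptions, and the last retriable
     * failure, propagate to the caller unchanged.
     */
    static <T> T retry(Supplier<T> request, Class<? extends RuntimeException> retryOn,
                       int maxRetries, long lowerMs, long upperMs) {
        for (int attempt = 1; ; attempt++) {
            try {
                return request.get(); // a fresh call on every attempt: the role of Observable.defer
            } catch (RuntimeException e) {
                if (!retryOn.isInstance(e) || attempt > maxRetries) {
                    throw e;
                }
                try {
                    TimeUnit.MILLISECONDS.sleep(exponentialDelay(attempt, lowerMs, upperMs));
                } catch (InterruptedException interrupted) {
                    Thread.currentThread().interrupt(); // keep the interrupt flag and give up
                    throw e;
                }
            }
        }
    }
}
```

Passing a Supplier instead of an already computed value is the whole point: retrying a result that was produced once would only replay the same failure, which is roughly why document() has to be wrapped in Observable.defer. Calling retry once per key in a plain loop also behaves like the concatMap version: the next key is not requested until the current one has succeeded or given up.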


#4

Hey, wow, thanks! I need to try this; I'm currently caught up with some other work on the servers.

My own idea was to add some kind of “buffer-borrow-service”. All requests would need to borrow a given amount of the buffer and would only be allowed to use exactly that much space while querying. But as far as I can tell, this would only work well if it were part of the SDK and could control the workflow in the underlying AsyncBucket logic.

On the other hand, this could be done inside the SDK without users even noticing. All queries to asyncBucket could internally use the buffer-borrow-service, making BackpressureExceptions impossible. Then something like this would be much nicer to implement:

    Subscription accs = ...getAllAccounts();
    Subscription users = ...getAllUsers();
    Subscription regions = ...getAllRegions();

    // <do other boot-up work on the server>

    // <wait for accs.isUnsubscribed() && users.isUnsubscribed() && regions.isUnsubscribed()>

    // <allow ticking / client connections>

Thank you for the detailed and fast responses,
Kademlia
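The “buffer-borrow-service” idea maps closely onto a counting semaphore: a caller takes a slot before sending a request and returns it when the response arrives, so the number of in-flight requests can never exceed the buffer size and nothing has to be rejected. The JDK-only sketch below illustrates that idea; BufferBorrowService and BufferBorrowDemo are hypothetical names, not part of the Couchbase SDK, and the "requests" are just counter updates.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

/** Hypothetical "buffer-borrow-service": at most 'capacity' requests are in flight at once. */
final class BufferBorrowService {
    private final Semaphore slots;

    BufferBorrowService(int capacity) {
        this.slots = new Semaphore(capacity, true); // fair: waiting callers get slots in arrival order
    }

    /** Blocks until a slot is free, runs the request, and always returns the slot afterwards. */
    <T> T withSlot(Supplier<T> request) throws InterruptedException {
        slots.acquire();
        try {
            return request.get();
        } finally {
            slots.release();
        }
    }
}

final class BufferBorrowDemo {
    /** Fires 'requests' fake requests from 'workers' threads and returns the peak concurrency seen. */
    static int peakInFlight(int capacity, int workers, int requests) {
        BufferBorrowService service = new BufferBorrowService(capacity);
        AtomicInteger active = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        ExecutorService pool = Executors.newFixedThreadPool(workers);
        try {
            List<Future<Integer>> results = new ArrayList<>();
            for (int i = 0; i < requests; i++) {
                Callable<Integer> task = () -> service.withSlot(() -> {
                    peak.accumulateAndGet(active.incrementAndGet(), Math::max);
                    Thread.yield(); // let other workers try to overlap with this request
                    return active.decrementAndGet();
                });
                results.add(pool.submit(task));
            }
            for (Future<Integer> result : results) {
                result.get();
            }
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
        return peak.get();
    }
}
```

With 16 worker threads sharing 4 slots, the observed peak never exceeds 4. The cost is that callers block while all slots are taken, which is the same slower-but-safer trade-off concatMap makes, only with a configurable number of requests in flight instead of one.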


#5

I did some tests with this setup and noticed a pretty huge slowdown caused by concatMap.
If someone googles this and does not require the order, use flatMap.

Test:

  • Localhost, 4.0.0-2213 DEV Edition (build-2213), no memory limits
  • .requestBufferSize(131072) (high enough to not result in any BackpressureExceptions)
  • At the end I wait for multiple view queries to finish:
    while (!accountSub.isUnsubscribed() || !userSub.isUnsubscribed() || !regionSub.isUnsubscribed()) {
        try { Thread.sleep(10); } // poll every 10 ms
        catch (InterruptedException e) { Thread.currentThread().interrupt(); break; } // restore the flag, stop waiting
    }

Old code, or new code with .flatMap: about 30k-50k OPS + 20k OPS for the last data view.
Basically, this is bound by my CPU speed: the Java app + CB take 99%.
(The slowdown at the end is caused by a view query that depends on another query finishing first.)

With backpressure checks and .concatMap: about 25k OPS + 20k OPS for the last data view.
This is no longer bound by my CPU speed: the Java app + CB take 65%.
(The slowdown at the end is caused by a view query that depends on another query finishing first.)

Code Example

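    import java.util.concurrent.TimeUnit;
    import com.couchbase.client.core.BackpressureException;
    import com.couchbase.client.core.time.Delay;
    import com.couchbase.client.java.util.retry.RetryBuilder;
    import rx.Observable;

    asyncBucket.query(ViewQuery.from("getAll", "regions").stale(Stale.FALSE))
        .flatMap(result -> result.rows())
        .concatMap(row -> Observable.defer(() -> row.document())
            .retryWhen(RetryBuilder.anyOf(BackpressureException.class)
                .max(5)                                                  // up to 5 retries per document
                .delay(Delay.exponential(TimeUnit.MILLISECONDS, 500, 1)) // bounded between 1 ms and 500 ms
                .build()))
        .map(doc -> Region.fromJson(doc.content()))
        .toBlocking()
        .forEach(region -> regions.put(region.getName(), region));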
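The polling loop in the test setup can be replaced by blocking on a CountDownLatch that each loader counts down when it finishes. This JDK-only sketch runs each loader on its own thread as a stand-in; StartupLoader is a hypothetical name, and with the SDK you would instead call countDown() from a completion callback on each query's Observable (for example RxJava's doOnTerminate), which is an assumption about the wiring rather than something shown in this thread.

```java
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

final class StartupLoader {
    /** Starts every loader on its own thread; the returned latch reaches zero when all are done. */
    static CountDownLatch startAll(List<Runnable> loaders) {
        CountDownLatch done = new CountDownLatch(loaders.size());
        for (Runnable loader : loaders) {
            new Thread(() -> {
                try {
                    loader.run();
                } finally {
                    done.countDown(); // count down even if the loader throws
                }
            }).start();
        }
        return done;
    }

    /** Blocks (without polling) until all loaders finished; false on timeout or interrupt. */
    static boolean awaitAll(CountDownLatch done, long timeout, TimeUnit unit) {
        try {
            return done.await(timeout, unit);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

At startup this mirrors the boot sequence sketched earlier in the thread: startAll(...) with the account, user and region loaders, do the other boot-up work, then awaitAll(...) before allowing ticking and client connections.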

#6

Mmh, that's good to know; I didn't think concatMap would cause such a huge slowdown :frowning:
The issue with flatMap is that you retry more than the absolute minimum necessary, so this is a trade-off, I guess?
@daschl made a good analogy: it's like pressing the brakes on a car…

  • flatMap would be braking just a little when you see the car in front of you brake, then re-accelerating immediately, maybe braking a little again if the car in front is still slowing down :blue_car::red_car:
  • the concatMap version would be like pressing the brake firmly and building a nice gap to the car in front, keeping a slower but safer speed and a better safety distance :blue_car:    :red_car:
  • no retryWhen would just be a traffic accident :blue_car::boom: :oncoming_automobile::boom::blue_car:

:wink:
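The trade-off can be put into numbers with a toy, tick-based model; it is an illustration under stated assumptions, not a measurement of the SDK. Assume a buffer with a fixed number of slots, an IO layer that serves a fixed number of requests per tick, and keys that are rejected when the buffer is full and simply resubmitted on the next tick. The flatMap-like strategy submits every pending key each tick; the concatMap-like strategy has exactly one request in flight. ConcatVsFlatMapSim and LoadStats are hypothetical names.

```java
/** Outcome of one simulated load. */
final class LoadStats {
    final int ticks;
    final int rejections;

    LoadStats(int ticks, int rejections) {
        this.ticks = ticks;
        this.rejections = rejections;
    }
}

/** Toy model comparing an eager (flatMap-like) and a one-at-a-time (concatMap-like) loader. */
final class ConcatVsFlatMapSim {
    /** flatMap-like: every pending key is submitted each tick; keys that do not fit are rejected. */
    static LoadStats eager(int keys, int capacity, int servePerTick) {
        requirePositive(capacity, servePerTick);
        int pending = keys, queued = 0, served = 0, ticks = 0, rejections = 0;
        while (served < keys) {
            ticks++;
            int accepted = Math.min(pending, capacity - queued);
            rejections += pending - accepted; // these keys are resubmitted next tick
            pending -= accepted;
            queued += accepted;
            int done = Math.min(queued, servePerTick);
            queued -= done;
            served += done;
        }
        return new LoadStats(ticks, rejections);
    }

    /**
     * concatMap-like: the next key is requested only after the previous one was served.
     * A single in-flight request always fits, so nothing is rejected, but at most one
     * key completes per tick no matter how fast the IO layer is.
     */
    static LoadStats sequential(int keys, int capacity, int servePerTick) {
        requirePositive(capacity, servePerTick);
        return new LoadStats(keys, 0);
    }

    private static void requirePositive(int capacity, int servePerTick) {
        if (capacity < 1 || servePerTick < 1) {
            throw new IllegalArgumentException("capacity and servePerTick must be positive");
        }
    }
}
```

With 10 000 keys, a 1024-slot buffer and 512 requests served per tick, the eager strategy finishes in 20 ticks but collects tens of thousands of rejections along the way, while the sequential one is never rejected and needs 10 000 ticks. Real flatMap retries back off instead of resubmitting every tick, so actual numbers will differ; only the shape of the trade-off (faster but noisier versus slower but safer) carries over.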


#7

@simonbasle love the explanation with the emoticons :wink: