Batch upsert using reactive programming

All, I am trying to upgrade my code to the Java SDK 3. We are doing batch upserts of 50K records and were using Observable and bucket.async(), but my understanding is that this has been replaced by ReactiveCollection. Does anyone have sample code that I can follow?
Old code

Observable.from(someList)
        .flatMap(t ->
                bucket.async()
                        .exists(key)
                        .flatMap(exists -> {
                            if (exists) {
                                return bucket.async()
                                        .mutateIn(key)
                                        .upsert(datepath, date)       // path placeholder
                                        .upsert(somepath, someobject)
                                        .execute();
                            }
                            return Observable.empty();
                        }));
Also, we were doing some exception handling using retryWhen(), and that has changed too. Is there any sample code for retry handling?

Thanks

I just found the following code by searching Couchbase Labs repositories for “ReactiveCollection”:

I hope this helps.


Hi @aqua123
That’s correct, with SDK3 we took the opportunity to migrate from RxJava2 to Project Reactor, which is now arguably the de facto standard for reactive streams on the JVM. They both implement the Reactive Streams spec, so they are fully interoperable. If you need to keep portions of your application in RxJava2, please see these docs for how to convert between the Mono and Flux returned by SDK3 and the Single and Observable used by RxJava2.
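For reference, here is a minimal sketch of that interop, assuming RxJava2 is on the classpath; because Mono and Flux implement the Reactive Streams Publisher interface, no extra adapter library is strictly needed (collection, id, content and someObservable are placeholders):

    // Reactor -> RxJava2: Mono/Flux are Reactive Streams Publishers, so RxJava2 can wrap them directly
    Single<MutationResult> single = Single.fromPublisher(collection.reactive().upsert(id, content));

    // RxJava2 -> Reactor: convert back through Publisher as well
    Mono<MutationResult> mono = Mono.from(single.toFlowable());
    Flux<String> flux = Flux.from(someObservable.toFlowable(BackpressureStrategy.BUFFER));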

On the other hand, if you’re in the position of fully converting your application to Project Reactor, then it’s generally easy to find the equivalent operation. In your example code, for instance, Observable.from() is replaced by Flux.fromIterable(), and you will want to use collection.reactive() rather than bucket.async() in SDK3.
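As a rough illustration (the bucket name, idOf() and contentOf() are placeholders rather than real SDK calls), the converted pattern could look something like this:

    Cluster cluster = Cluster.connect("localhost", "username", "password");
    ReactiveCollection reactive = cluster.bucket("someBucket").defaultCollection().reactive();

    Flux.fromIterable(someList)
            // flatMap subscribes to the upserts concurrently (128 in-flight by default)
            .flatMap(t -> reactive.upsert(idOf(t), contentOf(t)))
            .collectList()
            .block();   // wait for the whole batch to complete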

Incidentally, there’s no need to do an explicit exists() check on the document before performing a mutation - you can just perform the mutation and catch the DocumentNotFoundException. It’ll be slightly more performant as it saves a round-trip.
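Roughly along these lines (the path, value and fallback document are placeholders; .then() is only used so that both branches have the same Mono<Void> type):

    reactive.mutateIn(docId, Arrays.asList(MutateInSpec.upsert(somepath, someobject)))
            .then()
            .onErrorResume(DocumentNotFoundException.class,
                    // Document wasn't there: create it, rather than doing a prior exists() round-trip
                    e -> reactive.insert(docId, someNewDocument).then());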


Thank you @graham.pople. Performance-wise, is getting the exception and then dealing with it really better than the exists operation? Also, we are trying to mutate a path, so it might be “PathNotFoundException” that we have to check for.

Flux.fromIterable(someList)
        .flatMap(t -> {
            try {
                return insertOrUpsert(t, submissionId);
            } catch (Exception e) {
                return doErrorProcessing(t, errorDocumentProcess, detailInsertErrors, e);
            }
        })
        .onErrorResume(error -> {
            log.error("Error doing batch processing: ", error);
            return Mono.empty();
        })
        .subscribe();
Does this look ok?

Hi @aqua123
Yes it would be more performant, by a few orders of magnitude: it’s the cost of a network round-trip (microseconds to milliseconds) vs the cost of creating an exception (nanoseconds).

Instead of try-catch, in the reactive world you’d hang an onErrorResume off the insertOrUpsert. You can either return Mono.empty() from it if you want the other operations in someList to continue, or propagate the error otherwise.

Personally I use .parallel() and .runOn() instead of Flux::flatMap, but that’s more of a style thing. It just makes it clearer that there is parallelism, and what the degree of parallelism is. (By default, Flux::flatMap will run 128 operations in parallel.) Something like this:

int concurrency = 100; // This many operations will be in-flight at once

Flux.fromIterable(someList)
        .parallel(concurrency)
        .runOn(Schedulers.boundedElastic())
        .concatMap(t -> insertOrUpsert(t, submissionId)
                .onErrorResume(err -> {
                    // Depends what we want to happen - here silently swallowing any errors
                    return Mono.empty();
                }))
        .sequential()
        .subscribe();

Thank you @YoshiyukiKono. This is helpful.

@graham.pople how can I get a reference to “t” in onErrorResume? I can see it only has a reference to the error object and doesn’t know about the outer object “t”. insertOrUpsert is currently returning a Publisher.

You can just put the onErrorResume on the insertOrUpsert, not the flatMap. There you have access to t.
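That is, something along these lines - the onErrorResume sits inside the flatMap lambda, so t is still in scope (assuming here that doErrorProcessing returns a Mono):

    Flux.fromIterable(someList)
            .flatMap(t -> insertOrUpsert(t, submissionId)
                    .onErrorResume(error ->
                            // t is visible here because we're still inside the flatMap lambda
                            doErrorProcessing(t, errorDocumentProcess, detailInsertErrors, error)))
            .subscribe();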

It is showing me an error - Cannot resolve method ‘onErrorResume’ in ‘Publisher’. insertOrUpsert is returning a Publisher. Can I wrap it in Flux.just() so that onErrorResume is recognized?

It’s probably easiest to make insertOrUpsert return a Mono (a Mono is for an operation that returns one-or-nothing, like the RxJava Single; a Flux is the equivalent of Observable). Especially since the underlying SDK3 collection.reactive().insert() and collection.reactive().replace() calls will be returning a Mono.
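A minimal sketch of such a method (the type of t, and how its document id is derived, are assumptions):

    Mono<MutationResult> insertOrUpsert(SomeType t, String submissionId) {
        String docId = t.getIdentifier();   // assumption: t carries its own document id
        return collection.reactive()
                .insert(docId, t)
                // Document already exists, so fall back to replacing it
                .onErrorResume(DocumentExistsException.class,
                        e -> collection.reactive().replace(docId, t));
    }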

I made insertOrUpsert return a Mono, but onErrorResume is never executed. It’s going into the outer onErrorResume.

Flux.fromIterable(somelist)
                .publishOn(Schedulers.boundedElastic())
                .log()
                .flatMap(t -> insertOrUpsert(t, submissionId)
                        .onErrorResume(error -> {
                            log.error("Error doing batch processing: ", error);
                            doErrorProcessing(t, errorDocumentProcess, detailInsertErrors, error);
                        })
                )
                .onErrorResume(error -> {
                    log.error("Error doing batch processing: ", error);
                    return Mono.empty();
                })
                .subscribe();

I also tried using doOnError, but that does not recognize the “error” object.

I don’t know what your doErrorProcessing is doing, but from an onErrorResume you need to return a Mono for it to be executed.
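That is, roughly - if doErrorProcessing is a plain (non-reactive) method, the lambda still has to end by returning a Mono:

    .onErrorResume(error -> {
        log.error("Error doing batch processing: ", error);
        doErrorProcessing(t, errorDocumentProcess, detailInsertErrors, error);   // assuming a plain void method here
        return Mono.empty();   // the lambda must return a Mono for the resume to take effect
    })

(If doErrorProcessing itself returns a Mono, return that instead so it actually gets subscribed.)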

I tried returning Mono.empty() and commented out doErrorProcessing, but I still don’t see it reaching the inner onErrorResume. Also, the inner onErrorResume is not recognizing the error object as a Throwable; it says it’s just an Object.
The error being returned from insertOrUpsert is UnambiguousTimeoutException, as I set the timeouts for insert/upsert to 1 nanosecond to replicate the error scenario.

@graham.pople thanks for your help… I was able to get it to work with something like this:

    Flux.fromIterable(somebatch)
            .publishOn(Schedulers.boundedElastic())
            .flatMap(t -> insertOrUpsert(t, submissionId)
                    .doOnError(error -> doErrorProcessing((SomeBatchException) error, errorDocumentProcess, detailInsertErrors))
                    .onErrorResume(err -> {
                        log.info("inside onErrorResume");
                        return Flux.empty();
                    }))
            .subscribe();

if(errorDocumentProcess != null){
//dosomething
}

insertOrUpsert returns an exception if something goes wrong, and that exception also has the outer object in it.
My issue now is that subscribe() doesn’t wait for the code which checks whether errorDocumentProcess is null or not. I tried using blockLast(), but that is painfully slow when uploading 1K records. Is there any other alternative?

Hi @aqua123
Doing processing inside doOnError looks a bit suspicious - with reactive you generally want to make sure everything’s part of the reactive chain.
For the slowness, I’d suggest adding some debugging statements to check if operations are actually being done in parallel. 1,000 operations should take no time at all. Or use .parallel(1000).runOn(Schedulers.boundedElastic()).concatMap(t -> ... to make absolutely certain it’s running in parallel (though .flatMap should do 128 operations in parallel, by default).
Failing that, you could take a look at the personal generic checklist I use when looking at performance issues: Couchbase performance issue - slowness - #4 by graham.pople
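For illustration, one way to combine those suggestions - block for the whole batch, keep the error handling inside the chain, and log the elapsed time to confirm the operations really are running in parallel (somebatch, insertOrUpsert and errorDocumentProcess are your own names):

    long start = System.currentTimeMillis();

    Flux.fromIterable(somebatch)
            .parallel(128)                                  // 128 operations in flight at once
            .runOn(Schedulers.boundedElastic())
            .concatMap(t -> insertOrUpsert(t, submissionId)
                    .onErrorResume(error -> {
                        errorDocumentProcess.add(error, t); // error handling stays in the chain
                        return Mono.empty();
                    }))
            .sequential()
            .blockLast();                                   // waits here, so the null check afterwards is safe

    log.info("Batch took {} ms", System.currentTimeMillis() - start);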


Hi @graham.pople, for some reason we were not able to get reactive working and went with Spring listener/publisher events, but it looks like the Couchbase client does not handle thousands of threads well, so we are back to reactive.

 Flux.fromIterable(somebatch)
                .publishOn(Schedulers.boundedElastic())
                .flatMap(t -> {
                    return insertOrUpsertInTwoDcuments(t, submissionId)
                            .doOnError(error -> errorDocumentProcess.add(error, t))
                            .onErrorResume(error -> Mono.empty());
                })
                .subscribe();

insertOrUpsertInTwoDcuments needs to insert/upsert into two documents:

insertOrUpsertInTwoDcuments {
    insert1document
    return insert2document   // returns Mono
}

The above code only inserts document #2 and doesn’t insert document #1. How can I make it insert into both documents?

Well, it’s hard to know with just pseudo-code for insertOrUpsertInTwoDcuments, but my guess would be that you probably need to chain the two reactive operations together, e.g. with flatMap/concatMap. That will cause the .subscribe() to be propagated up through both operations, causing both of them to happen.
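As a rough sketch (the document ids and contents here are placeholders), the chaining could look like:

    Mono<MutationResult> insertOrUpsertInTwoDcuments(SomeType t, String submissionId) {
        return collection.reactive()
                .insert(firstDocId(t), firstDocContent(t))              // document #1
                // Runs only after document #1 is written; subscribing downstream now triggers both
                .flatMap(r1 -> collection.reactive()
                        .insert(secondDocId(t), secondDocContent(t)));  // document #2
    }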

I have chained them now. But now I am seeing that when an error happens, it always goes to the first onErrorResume and never to the second onErrorResume. Is there a way I can chain them?


        Flux.fromIterable(somebatch)
                .publishOn(Schedulers.boundedElastic())
                .flatMap(
                        t -> {
                            return insertT1(t, submissionId)
                                    .onErrorResume(
                                            DocumentExistsException.class,
                                            e -> {
                                                log.info("Handling document exists");
                                                this.handleDocumentExistsException(t, submissionId);
                                                return Mono.empty();
                                            })
                                    .doOnError(
                                            Exception.class,
                                            error -> {
                                                log.info("Error adding details", error);
                                                errorDocumentProcess.add(somerrorobj, t);
                                                detailInsertErrors.add(t.getIdentifier());
                                            })
                                    .flatMap(
                                            t1 -> insertT2(t, submissionId, detailInsertErrors))
                                    .onErrorResume(
                                            DocumentExistsException.class,
                                            e -> {
                                                log.info("Handling event document exists");
                                                this.handleEventDocumentExists(
                                                        t, submissionId, detailInsertErrors);
                                                return Mono.empty();
                                            })
                                    .doOnError(
                                            Exception.class,
                                            error -> {
                                                log.info("Error adding events", error);
                                                errorDocumentProcess.add(
                                                        new DetourErrorObject(
                                                                ((Throwable) error).getMessage(),
                                                                t));
                                            });
                        })
                .blockLast();

So without access to all the code or knowing what documents exist, it’s really hard for me to know. Based on the info there I’d guess it’s only insertT2() that is raising an error? It does maybe look like you’re inserting the same doc twice, in which case I’d expect the 2nd to fail with DocumentExistsException, which sounds like what you’re reporting.

Thank you for your help @graham.pople. It looks like “exists” was blocking the calls, so I removed it and instead use onErrorResume checking for DocumentExistsException, adding the record to T1 in one Flux and T2 in the other. Now I am getting com.couchbase.client.core.error.AmbiguousTimeoutException:

{"@timestamp":"2022-04-28T15:40:20.733Z","@version":"1","message":"Error adding event 
document","logger_name":"com.usaa.ent.detour.batch.service.AsyncDetourBatchService","thread_name":"cb-timer-1-1","level":"ERROR","level_value":40000,"stackTrace":"com.couchbase.client.core.error.AmbiguousTimeoutException: InsertRequest, Reason: TIMEOUT 
{\"cancelled\":true,\"completed\":true,\"coreId\":\"0xf488dc2300000001\",\"idempotent\":false,\"lastChannelId\":\"F488DC2300000001/000000007AF1EF75\",\"lastDispatchedFrom\":\"10.38.120.12:51920\",\"lastDispatchedTo\":\"testcouchmh301dlsat.usaa.com:11210\",\"reason\":\"TIMEOUT\",\"requestId\":192207,\"requestType\":\"InsertRequest\",\"retried\":0,\"service\":
{\"bucket\":\"entdetcb\",\"collection\":\"_default\",\"documentId\":\"detour:event:tessomeid6\",
\"opaque\":\"0x2ef05\",\"scope\":\"_default\",\"type\":\"kv\",\"vbucket\":386},\"timeoutMs\":5000,\"timings\":{\"encodingMicros\":29,\"totalMicros\":5004414}}\n\tat com.couchbase.client.core.msg.BaseRequest.cancel(BaseRequest.java:183)\n\tat com.couchbase.client.core.Timer.lambda$register$2(Timer.java:157)

Not sure why this is happening.