Bulk sub document api with observable not faster than synchronous operation

Hi,
I am trying to do bulk upserts using observables and was expecting a big performance improvement. But for 10000 records, it takes about 55 seconds with async and observable and 60 seconds with synchronous using a loop to insert. Am I missing something? Below is the code for reference

Observable
            .from(someBatch)
            .flatMap(t -> {
                final String key = generateId(t);
                Optional<SomeObject> optionalSomeObject = repo.findById(key);
                if (optionalSomeObject.isPresent()) {
                    return asyncBucket.mutateIn(key)
                            .upsert("somepath",t)
                            .execute()
                            .retryWhen(RetryBuilder
                            .anyOf(BackpressureException.class)
                            .delay(Delay.exponential(TimeUnit.SECONDS, sleepInSeconds))
                            .max(retryCount)
                            .build())
                            .doOnError(e -> {
                                log.error("Error", e);
                            })
                            .onErrorResumeNext(Observable.empty());
                            
                } else {
                    return asyncBucket.insert(converter.toDocument(t, key))
                            .retryWhen(RetryBuilder
                            .anyOf(BackpressureException.class)
                            .delay(Delay.exponential(TimeUnit.SECONDS, sleepInSeconds))
                            .max(retryCount)
                            .build())
                            .doOnError(e -> {
                                log.error("Error", e);
                            })
                            .onErrorResumeNext(Observable.empty());
                }
            })
            .toList()
            .toBlocking()
            .single();
}

Hey @aqua123

It’s worth checking if this code is actually running in parallel. You can add an AtomicInteger and some tracking and logging in doOnSubscribe (done before the mutateIn) and doOnNext (done after) to check this.

Thank you @graham.pople

I added the logging and now in the logs I see

Counter in onSubscribe: 0
Counter in onNext: 1 … 10000

Observable
.from(detourBatchObjects)
.doOnSubscribe(() -> {
** log.info(String.format(“Counter in onSubscribe: %d”, at.get()));**
** })**
.flatMap(t -> {
final String key = generateId(t.getPartyId());
Optional optionalDetour = repo.findById(key);
if (optionalDetour.isPresent()) {
return asyncBucket.mutateIn(key).upsert(String.format(“detourDetails.%s”, t.getDetourId()), new DetourDetail(t.getAccountId(), t.getSubmissionId(), DetourStatusEnum.OPEN))
.execute()
.doOnNext(x -> {
** log.info(String.format(“Counter in onNext: %d”, at.incrementAndGet()));**
** })**
.retryWhen(RetryBuilder
.anyOf(BackpressureException.class)
.delay(Delay.exponential(TimeUnit.MILLISECONDS, sleepInMilliSeconds))
.max(retryCount)
.build())
.doOnError(e -> {
log.error(“Error”, e);
})
.onErrorResumeNext(Observable.empty());
} else {
return asyncBucket.insert(converter.toDocument(t, key))
.doOnNext(x -> {
log.info(String.format(“Counter in onNext: %d”, at.incrementAndGet()));
})
.retryWhen(RetryBuilder
.anyOf(BackpressureException.class)
.delay(Delay.exponential(TimeUnit.MILLISECONDS, sleepInMilliSeconds))
.max(retryCount)
.build())
.doOnError(e -> {
log.error(“Error”, e);
}).onErrorResumeNext(Observable.empty());
}
})
.toList()
.toBlocking()
.single();

@graham.pople I think we figured out the issue. repo.get(id) was blocking call so that was making the process slow. I converted into exists and added it to observable and that improved performance a lot. Now for 10000 records, it just takes 1.2 seconds

Yes, that makes sense. You could also do the insert, and if that fails with DocumentAlreadyExistsException, do the mutateIn. So in cases where the document does exist, you’re down to just 1 round trip.

But, exists is rather faster than insert, as it doesn’t need to send the JSON. So whether this approach is faster overall will depend on how often you need to insert vs mutateIn, and to an extent on how big your data is.