Bulk update with sub document api and observables

Hi,
I am trying to do a bulk upsert using the sub-document API and observables. For the upsert to work, both the key and the path have to exist; otherwise it throws an exception. I am not sure what the best way to handle the exception case is. Here is the code I am working on. Any help would be appreciated.

private void createOrUpdateObjects(List someBatch) {
    AsyncBucket asyncBucket = couchbaseBucket.async();

    Observable
            .from(someBatch)
            .flatMap(t -> {
                final String key = generateId(t);
                Optional<SomeObject> optionalSomeObject = repo.findById(key);
                if (optionalSomeObject.isPresent()) {
                    return asyncBucket.mutateIn(key)
                            .upsert("somepath",t)
                            .execute()
                            .doOnError(e -> {
                                log.error("Error", e);
                            })
                            .onErrorResumeNext(Observable.empty())
                            .retryWhen(RetryBuilder
                                    .anyOf(BackpressureException.class)
                                    .delay(Delay.exponential(TimeUnit.SECONDS, sleepInSeconds))
                                    .max(retryCount)
                                    .build());
                } else {
                    return asyncBucket.insert(converter.toDocument(t, key))
                            .doOnError(e -> {
                                log.error("Error", e);
                            })
                            .onErrorResumeNext(Observable.empty())
                            .retryWhen(RetryBuilder
                                    .anyOf(BackpressureException.class)
                                    .delay(Delay.exponential(TimeUnit.SECONDS, sleepInSeconds))
                                    .max(retryCount)
                                    .build());
                }
            })
            .toList()
            .toBlocking()
            .single();
}

Hey @aqua123

mutateIn() has an upsertDocument(true) method that will create the document if it does not already exist. You can use this instead of branching on findById, which saves a round trip.

upsert("somepath", t) should not return an error if the path does not already exist; it should create it.

Your retryWhen logic needs to go before the onErrorResumeNext; otherwise it will never trigger, because the onErrorResumeNext captures all errors first.
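To illustrate the ordering point without pulling in RxJava or the Couchbase SDK, here is a minimal plain-Java sketch. The names `swallowErrors`, `retry`, and `flaky` are hypothetical stand-ins (for `onErrorResumeNext(Observable.empty())`, `retryWhen(...)`, and an operation that fails transiently); they are not SDK calls, and the sketch ignores delays and exception filtering.

```java
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

class RetryOrderDemo {
    /** Hypothetical stand-in for onErrorResumeNext(Observable.empty()): turn any failure into "no result". */
    static <T> Supplier<Optional<T>> swallowErrors(Supplier<Optional<T>> source) {
        return () -> {
            try {
                return source.get();
            } catch (RuntimeException e) {
                return Optional.empty();
            }
        };
    }

    /** Hypothetical stand-in for retryWhen(...max(n)): re-run the source up to maxRetries extra times on failure. */
    static <T> Supplier<Optional<T>> retry(Supplier<Optional<T>> source, int maxRetries) {
        return () -> {
            RuntimeException last = new IllegalStateException("never attempted");
            for (int attempt = 0; attempt <= maxRetries; attempt++) {
                try {
                    return source.get();
                } catch (RuntimeException e) {
                    last = e; // remember the failure and try again
                }
            }
            throw last; // retries exhausted
        };
    }

    /** Test operation: fails on the first `failures` calls, then succeeds; `calls` counts invocations. */
    static Supplier<Optional<String>> flaky(int failures, AtomicInteger calls) {
        return () -> {
            if (calls.incrementAndGet() <= failures) {
                throw new IllegalStateException("transient failure");
            }
            return Optional.of("ok");
        };
    }

    public static void main(String[] args) {
        // Fallback applied first, retry wrapped around it: the retry never sees an error.
        AtomicInteger callsA = new AtomicInteger();
        Optional<String> fallbackFirst = retry(swallowErrors(flaky(2, callsA)), 3).get();
        System.out.println("fallback first: " + fallbackFirst + ", calls=" + callsA.get());

        // Retry applied first, fallback wrapped around it: transient failures are retried.
        AtomicInteger callsB = new AtomicInteger();
        Optional<String> retryFirst = swallowErrors(retry(flaky(2, callsB), 3)).get();
        System.out.println("retry first: " + retryFirst + ", calls=" + callsB.get());
    }
}
```

With the fallback innermost, the operation runs once and the retry wrapper has nothing to react to. With the retry innermost, the operation is re-run until it succeeds or the retries run out, and only then does the fallback apply.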

Hope this helps.

Thank you @graham.pople for the quick response. Because I am mutating just one path in the document, I pass only part of the document, not the whole document. If I use upsertDocument(true), it saves only that part and not the whole document:
asyncBucket.mutateIn(key).upsert("somepath", part-of-the-document)

Thank you for pointing out the issue with the retry. I have moved it up now. Should doOnError log all the errors? I don't see that happening. When an exception occurs, the whole thing stops with a stack trace in the console.

Ah I meant combining upsertDocument with your mutateIn call… so it will create the document if it doesn’t exist, and either way it will upsert “somepath”. Hopefully that makes sense, if not let me know and I’ll knock up some code.

If I use upsertDocument(true) with the mutateIn call, it will always update the whole document (if it exists) rather than just mutating the path. We don't want that, because the document has other fields and paths that must not be overwritten.
For reference, our JSON is below. On each call I have to update someDetails.* (it can be ABC, DEF or XYZ) or insert a new field under someDetails. If the key does not exist, I have to insert the new document.

{
    "someId": "123",
    "nsid": "detail",
    "someDetails": {
        "ABC": {
            "submissionId": "batch1",
            "status": "OPEN"
        },
        "DEF": {
            "submissionId": "batch1",
            "status": "OPEN"
        },
        "XYZ": {
            "submissionId": "batch1",
            "status": "OPEN"
        }
    }
}
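
As a rough illustration of the semantics described earlier in the thread for combining upsertDocument(true) with a path upsert (create the document if it is missing, and either way upsert only the given path), here is an in-memory sketch using nested maps. It is a model, not SDK code: `SubdocUpsertModel`, `upsertPath`, `entry`, the keys `doc::123` and `doc::999`, and the values `batch2` and `CLOSED` are all hypothetical, and creating missing intermediate objects is an assumption of the model.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class SubdocUpsertModel {
    /** In-memory stand-in for a bucket: document key -> JSON object as nested maps. */
    final Map<String, Map<String, Object>> docs = new LinkedHashMap<>();

    /**
     * Models a path upsert with document creation enabled: create an empty
     * document if the key is missing, then set only the given dotted path.
     */
    @SuppressWarnings("unchecked")
    void upsertPath(String key, String path, Object value) {
        Map<String, Object> node = docs.computeIfAbsent(key, k -> new LinkedHashMap<>());
        String[] parts = path.split("\\.");
        for (int i = 0; i < parts.length - 1; i++) {
            // Model assumption: missing intermediate objects are created on the way down.
            node = (Map<String, Object>) node.computeIfAbsent(
                    parts[i], k -> new LinkedHashMap<String, Object>());
        }
        node.put(parts[parts.length - 1], value); // only the leaf is replaced
    }

    /** Builds one someDetails entry shaped like the JSON in the post. */
    static Map<String, Object> entry(String submissionId, String status) {
        Map<String, Object> m = new LinkedHashMap<>();
        m.put("submissionId", submissionId);
        m.put("status", status);
        return m;
    }

    public static void main(String[] args) {
        SubdocUpsertModel bucket = new SubdocUpsertModel();

        // An existing document with fields outside the path being mutated.
        Map<String, Object> details = new LinkedHashMap<>();
        details.put("ABC", entry("batch1", "OPEN"));
        details.put("DEF", entry("batch1", "OPEN"));
        Map<String, Object> doc = new LinkedHashMap<>();
        doc.put("someId", "123");
        doc.put("someDetails", details);
        bucket.docs.put("doc::123", doc);

        // Existing key: only the named paths change; someId and DEF are untouched.
        bucket.upsertPath("doc::123", "someDetails.ABC", entry("batch2", "CLOSED"));
        bucket.upsertPath("doc::123", "someDetails.XYZ", entry("batch2", "OPEN"));
        System.out.println(bucket.docs.get("doc::123"));

        // Missing key: the new document contains only the upserted path.
        bucket.upsertPath("doc::999", "someDetails.ABC", entry("batch2", "OPEN"));
        System.out.println(bucket.docs.get("doc::999"));
    }
}
```

In this model an existing document keeps someId and its other someDetails entries, while a document created by the upsert holds nothing but the upserted path. That second case matches the concern raised above: if a brand-new document must carry the full set of fields, a path upsert alone will not supply them.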