Bulk operations async error handling

Hello!
I guess, I’ve got a brain explosion from Observables+Couchbase async api :slight_smile: Could anybody help me, please?
Have been fighting with batch operations for a few days already and still can’t understand how to do batch operations with proper error handling in java.

Let’s say, I want to update some documents in Couchbase in bulk.
If I used sync API, it would look like:

List<JsonDocument> items = getItems(1, 2, 3, 4, 5); // some method which calls bucket.get() for specified keys
for (JsonDocument item : items) {
   try {
      try {
         item.content().putInt("value", 42);
         bucket.replace(item);
      } catch (CasMismatchException e) {
        // retry
        bucket.get(item.id()).content().putInt("value", 42);
        bucket.replace(item);
      }
   } catch (Exception e) {
      // handle error which doesn't stop execution for other items
      errorHandler.handleError(item.id(), e);
   }
}

But this is not parallel, and documentations says async API is more efficient.
What I can’t understand is how to create such flow via Observables, I tried:

Observable.from(items)
.flatMap(item -> {
   item.content().putInt("value", 42);
   return bucket.async().replace(item);
})
.onErrorResumeNext(error -> {
   // what to do? return another observable which does retry logic above?
   // how do I know what item has failed?
   // I don't have ID of that item, nor I can extract it from passed Exception
   // why onErrorResumeNext is getting called only once (if one item fails)
   // and is not called for other items?
})
.subscribe(); // also need Subscriber with onError (otherwise there are warnings in log)

Any help will be much appreciated!
Thanks

Hi @chotenvan,

You can chain the retry to the async operation observable.
Observable.from(items) .flatMap(item -> { item.content().putInt("value", 42); return bucket.async().replace(item) .timeout(timeoutMicroSeconds, TimeUnit.MICROSECONDS) // set timeout for the individual request .retryWhen(RetryBuilder.anyOf(TemporaryFailureException.class) //retry on Temporary failure, similarly you chain retries for timeouts, backpressure .. .max(2) //you could tune the maximum number of attempts here .delay(Delay.exponential(TimeUnit.MICROSECONDS, timeoutMicroSeconds, 2)) //delay between attempts grows exponentially .build()) }) .subscribe();