Best Practice to Handle Couchbase Exceptions in Observables/Async Bulk Loads


I have inherited a Couchbase client built with the Java SDK. It parses a JSON file line by line and loads the documents into my Couchbase bucket asynchronously via an Observable. However, I also have a requirement to version documents: if an insert fails because the document already exists, I need to version the document and insert it with a new document ID. I am having trouble understanding how to capture the documents that hit a DocumentAlreadyExistsException when using an Observable. As I understand it, if an exception is thrown, the Observable terminates. My question is: what is the best practice for loading documents in bulk while accounting for exceptions and running the necessary logic for them? This is my Observable currently:

    List<JsonDocument> documents = makeJsonDocuments();
    Observable.from(documents)
        .flatMap(new Func1<JsonDocument, Observable<JsonDocument>>() {
            public Observable<JsonDocument> call(final JsonDocument docToInsert) {
                // this is what I need to 'try', and then perform an action on any
                // 'catch' for a DocumentAlreadyExistsException
                return bucket.async().insert(docToInsert);
            }
        })
        // assumed terminal step: block until the last insert completes
        .last().toBlocking().single();

I have created a synchronous load that handles DocumentAlreadyExistsException and performs the proper versioning, but it is MUCH slower, so I would prefer to stick with async loads.

    documents.forEach((e) -> {
        try {
            bucket.insert(e);
        } catch (DocumentAlreadyExistsException d) {
            // this works and I call additional logic to handle it
        }
    });
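
One way to keep a single failure from tearing down the whole batch is to handle the error on each individual insert rather than on the outer stream. With RxJava 1 that would presumably mean attaching `onErrorResumeNext` to the Observable returned by `bucket.async().insert(...)` inside the `flatMap`. Below is a minimal sketch of the same pattern using only `CompletableFuture` from the JDK, so it runs without the SDK. `Doc`, `DocExistsException`, `store`, `insertAsync`, and the `::vN` ID suffix are hypothetical stand-ins for illustration, not Couchbase APIs.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

public class VersionedBulkLoad {

    /** Hypothetical stand-in for a JSON document (ID plus raw content). */
    static class Doc {
        final String id;
        final String content;
        Doc(String id, String content) { this.id = id; this.content = content; }
    }

    /** Hypothetical stand-in for the SDK's DocumentAlreadyExistsException. */
    static class DocExistsException extends RuntimeException {
        DocExistsException(String id) { super("Document already exists: " + id); }
    }

    /** Hypothetical in-memory stand-in for the bucket. */
    static final Map<String, String> store = new ConcurrentHashMap<>();

    /** Asynchronous insert that fails if the ID is already taken. */
    static CompletableFuture<String> insertAsync(String id, String content) {
        return CompletableFuture.supplyAsync(() -> {
            if (store.putIfAbsent(id, content) != null) {
                throw new DocExistsException(id);
            }
            return id;
        });
    }

    /** Insert; on an ID clash, retry under a versioned ID instead of failing the batch. */
    static CompletableFuture<String> insertVersioned(String id, String content, int version) {
        String candidate = version == 0 ? id : id + "::v" + version;
        return insertAsync(candidate, content)
            .<CompletableFuture<String>>handle((okId, err) -> {
                if (err == null) {
                    return CompletableFuture.completedFuture(okId);
                }
                // Failures from supplyAsync arrive wrapped in CompletionException.
                Throwable cause = err instanceof CompletionException ? err.getCause() : err;
                if (cause instanceof DocExistsException) {
                    return insertVersioned(id, content, version + 1);
                }
                // Any other error is passed through untouched.
                CompletableFuture<String> failed = new CompletableFuture<>();
                failed.completeExceptionally(cause);
                return failed;
            })
            .thenCompose(f -> f);
    }

    /** Start every insert at once, then wait for all of them; returns the IDs actually stored. */
    static List<String> loadAll(List<Doc> docs) {
        List<CompletableFuture<String>> pending = docs.stream()
            .map(d -> insertVersioned(d.id, d.content, 0))
            .collect(Collectors.toList());
        return pending.stream().map(CompletableFuture::join).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> ids = loadAll(Arrays.asList(
            new Doc("user::1", "{\"n\":1}"),
            new Doc("user::1", "{\"n\":2}"),
            new Doc("user::2", "{\"n\":3}")));
        // Which of the two "user::1" documents gets the versioned ID depends on timing.
        System.out.println(ids);
    }
}
```

The point of the design is that the recovery logic lives on the inner, per-document future, so the outer batch only ever sees successes or errors that are not ID clashes. All inserts are still started before any of them is awaited, which keeps the load asynchronous.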