Couchbase SDK 2 : bulk read operations , how to failover to replicas

We are in the process of refactoring a benchmark tool migrating from Couchbase Client 2 to new CouchBase SDK 2.

Previous version has following “bulk get” logic to retrive keys in bulk and if it fails reading from the master, there is a failover to read from the “replicas”

Legacy code :

List<Map.Entry<String, OperationFuture<CASValue<JsonNode>>>> futures = new java.util.ArrayList<>(keys.size());
                for (String key : keys) {
                    futures.add(new AbstractMap.SimpleImmutableEntry<>(key, client.asyncGets(key, transcoder)));
                 }
                Map<String, Long> casValues = new java.util.HashMap<>(keys.size(), 1f);
                for (Map.Entry<String, OperationFuture<CASValue<JsonNode>>> e : futures) {
                    String key = e.getKey();
                    OperationFuture<CASValue<JsonNode>> future = e.getValue();
                    try {
                        CASValue<JsonNode> casVal = future.get();
                        if (checkStatus(future.getStatus(), errIfNotFound) == OK) {
                            result.put(key, JsonByteIterator.asMap(casVal.getValue()));
                            casValues.put(key, casVal.getCas());
                        } else {
                            return ERROR;
                        }
                    } catch (RuntimeException te) {
                        if (te.getCause() instanceof CheckedOperationTimeoutException) { ///READ FROM REPLICA
                            log.warn("Reading from Replica as reading from master has timed out.");
                            // This is a timeout operation on a read, let's try to read from slave
                            ReplicaGetFuture<JsonNode> futureReplica = client.asyncGetFromReplica(key, transcoder);
                            result.put(key, JsonByteIterator.asMap(futureReplica.get()));
                            
                        } else {
                            throw te;
                        }
                    }


                }

Using new Couchbase SDK2

According to the new Couchbase 2 SDK docs ,
Couchbase SDKs

I have following logic to retrieve in bulk.But I am not quite sure where to add the failover mechanism to read from “replicas” using

bucket.async().getFromReplica(key, ReplicaMode.ALL);

List<RawJsonDocument> rawDocs = idObs.flatMap((keys)->{
    		Observable<RawJsonDocument> rawJsonObs = bucket.async().get(key, RawJsonDocument.class);
    		return rawJsonObs;
    	}).toList()
    	  .toBlocking()
    	  .single();

How can I implement this “read from replica” failover mechanism with the new RxJava based CouchBase SDK ?

I think I found the answer !

Observable rawDocs = idObs.flatMap((key)->{
System.out.println("key "+key);
Observable rawJsonObs = bucket.async().get(key, RawJsonDocument.class);

		return rawJsonObs.onErrorResumeNext(new Func1<Throwable, Observable<RawJsonDocument>>() {
  		@Override
  		public Observable<RawJsonDocument> call(Throwable t1) {
  			if (t1.getCause() instanceof TimeoutException) { //we have a timeout
  				return bucket.async().getFromReplica(key, ReplicaMode.FIRST, RawJsonDocument.class).first();
  			}
  			throw OnErrorThrowable.from(t1);
  		}
  	});
	});

hi @ashikaumanga

I commented on you second post on stackoverflow :smile:
Great job, looks like you figured it out yourself!

As said on SO, the throw would probably look “more Rx” if replaced with a return Observable.error(t1);.
Now that I look at it a second time, I think I see one potential improvement:

You are using first(). This will error if no value is emitted, which is a possibility if the key doesn’t exist.
You may want another behavior in this case? Other options are take(1) that will just result in an empty observable or firstOrDefault(null) that will emit null instead of throwing (or whatever default value you pass to the operator).

I will add this as well on SO.
Happy coding!

1 Like