Hi Suresh,
Thanks for the further details. Before I get into a possible solution, I recommend you review the following document: Async and Reactive APIs | Couchbase Docs. It specifically states (emphasis added):
We recommend using the reactive API over the CompletableFuture counterpart because it provides all the bells and whistles you need to build scalable asynchronous stacks.
We recommend using this API [CompletableFuture] only if you are either writing integration code for higher level concurrency mechanisms or you really need the last drop of performance. In all other cases, the blocking API (for simplicity) or the reactive API (for richness in operators) is likely the better choice.
I’ve played with the CompletableFuture API before; it works, but it takes effort to get right (note I am no expert here, as my focus is on the Eventing Service, i.e. serverless JavaScript compute).
Note: you might add your own RetryStrategy as per Handling Errors | Couchbase Docs, although it is not absolutely necessary.
I think what you are seeing is that when the KV system first becomes busy, you then overwhelm it by stacking up thousands of async requests. Unlike CompletableFuture in pure Java, there is a limit to the number of concurrent async requests the Couchbase Java SDK can have in flight.
In my quick solution, I just put in a bit of flow control so as not to overwhelm KV by keeping the CompletableFuture “queue” under some sane bound (I used 512).
I am sure that the Couchbase Java SDK experts like @daschl and @ingenthr would be able to come up with a more elegant solution for implementing flow control or callbacks (so take my solution with a grain of salt). I just wanted to play with the Java SDK 3 a bit and learn about Java’s CompletableFuture (which I am still a novice at), so my implementation may not be elegant or optimal.
Please try the standalone code below and adapt the technique for your own use (it is set to 1,000,000 inserts). You can play with the flow-control bound in “while (queuedRequests.get() >= 512)”: try 4, 16, 64, 1024, 2048, or 8192 and see what happens. Note: hopefully you have a separate insertion thread and don’t do your loading directly from main.
import com.couchbase.client.java.AsyncCollection;
import com.couchbase.client.java.Bucket;
import com.couchbase.client.java.Cluster;
import com.couchbase.client.java.ClusterOptions;
import com.couchbase.client.java.env.ClusterEnvironment;
import com.couchbase.client.java.json.JsonObject;
import com.couchbase.client.java.kv.MutationResult;
import com.couchbase.client.core.env.IoConfig;
import com.couchbase.client.core.env.IoEnvironment;
import com.couchbase.client.core.env.TimeoutConfig;
import com.couchbase.client.core.retry.BestEffortRetryStrategy;
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Logger;
public class CompletableUpsertTest {
    private final static Logger LOGGER = Logger.getLogger(/* MyLogger.class.getName() */ "com");

    public static void main(String[] args) throws InterruptedException {
        AtomicLong inputDocCount = new AtomicLong(0);
        AtomicLong bucketWrSucc = new AtomicLong(0);
        AtomicLong queuedRequests = new AtomicLong(0);

        BestEffortRetryStrategy mrs = BestEffortRetryStrategy.INSTANCE
                .withExponentialBackoff(Duration.ofMillis(1200), Duration.ofSeconds(29), 2);

        ClusterEnvironment env = ClusterEnvironment.builder()
                .retryStrategy(mrs)
                .ioEnvironment(IoEnvironment.eventLoopThreadCount(2))
                .ioConfig(IoConfig.numKvConnections(6).maxHttpConnections(32))
                .timeoutConfig(TimeoutConfig.kvTimeout(Duration.ofSeconds(30)))
                .build();

        Cluster cluster = Cluster.connect("192.168.3.150",
                ClusterOptions.clusterOptions("Administrator", "password").environment(env));
        cluster.waitUntilReady(Duration.ofSeconds(10));
        Bucket bucket = cluster.bucket("destination");
        bucket.waitUntilReady(Duration.ofSeconds(10));
        AsyncCollection asyncBucketCollection = bucket.defaultCollection().async();

        long beg = System.currentTimeMillis();
        for (int i = 0; i < 1000000; i++) {
            inputDocCount.getAndIncrement();
            queuedRequests.getAndIncrement();
            String key = "tstkey:" + i;
            // Make a random number between 1 ... 100K
            int random_int = (int) (Math.random() * (100000 - 1 + 1) + 1);
            JsonObject data = JsonObject.create().put("id", i).put("type", "tstkey").put("data", random_int);
            // Only allow 512 outstanding async requests (respond to back pressure)
            while (queuedRequests.get() >= 512) {
                // wait until we are < 512
                try { Thread.sleep(1); } catch (InterruptedException e) {}
            }
            CompletableFuture<MutationResult> result = asyncBucketCollection.upsert(key, data /* ,upsertOptions */)
                    .whenComplete((res, ex) -> {
                        if (ex == null) {
                            // no exception, the upsert succeeded
                            bucketWrSucc.incrementAndGet();
                        } else {
                            // something went wrong
                            System.out.println("Had Exception: " + ex);
                        }
                        // Decrement last to free up the flow control for KV writes via the SDK
                        queuedRequests.getAndDecrement();
                    });
        }
        // drain: wait for every outstanding request to complete before timing and shutdown
        while (queuedRequests.get() > 0) {
            try { Thread.sleep(1); } catch (InterruptedException e) {}
        }
        long end = System.currentTimeMillis();
        // stop and clean up the Java SDK (we built our own environment, so shut it down too)
        cluster.disconnect(Duration.ofSeconds(10));
        env.shutdown();
        // emit some stats; my single node non-MDS setup hits ~50K ops/sec.
        System.out.println("exiting duration in seconds " + (end - beg) / 1000 +
                ", count " + inputDocCount.get() +
                ", succ " + bucketWrSucc.get() +
                ", fail " + (inputDocCount.get() - bucketWrSucc.get()) +
                ", ops/sec. " + inputDocCount.get() / ((end - beg) / 1000.0 + 0.000000001));
    }
}
Upserting 1M docs
exiting duration in seconds 19, count 1000000, succ 1000000, fail 0, ops/sec. 51674.245553344655
Upserting 10M docs
exiting duration in seconds 145, count 10000000, succ 10000000, fail 0, ops/sec. 68717.6597509073
Upserting 50M docs
exiting duration in seconds 729, count 50000000, succ 50000000, fail 0, ops/sec. 68584.84768667618
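As an aside, the Thread.sleep(1) busy-wait above can be replaced with a java.util.concurrent.Semaphore, which blocks the producer without polling and also gives you a clean way to drain at the end. This is just a sketch of the same bounded in-flight pattern using plain CompletableFutures and an executor; the supplyAsync call is a stand-in for the SDK upsert, and the class/method names are my own invention, not anything from the Couchbase SDK:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

public class SemaphoreFlowControl {

    // Issue `total` async tasks, never allowing more than `maxInFlight`
    // outstanding at once; returns the number that completed successfully.
    static long run(int total, int maxInFlight) throws InterruptedException {
        Semaphore permits = new Semaphore(maxInFlight);
        AtomicLong succ = new AtomicLong(0);
        ExecutorService pool = Executors.newFixedThreadPool(4);
        for (int i = 0; i < total; i++) {
            permits.acquire();                        // blocks while maxInFlight are outstanding
            final int id = i;
            CompletableFuture
                    .supplyAsync(() -> id * 2, pool)  // stand-in for collection.async().upsert(...)
                    .whenComplete((res, ex) -> {
                        if (ex == null) succ.incrementAndGet();
                        permits.release();            // release last so the drain sees final counts
                    });
        }
        // drain: once all permits are back, every task has completed
        permits.acquire(maxInFlight);
        pool.shutdown();
        pool.awaitTermination(10, TimeUnit.SECONDS);
        return succ.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("succ " + run(100000, 512));
    }
}
```

Blocking on the semaphore wakes the producer immediately when a slot frees up instead of waking every millisecond to check a counter, and acquiring all the permits at the end is an exact "everything finished" barrier rather than a second polling loop.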