OutOfMemory Error - Async Bulk Get and Insert

Hi,
What would be the best way to implement bulk GET and bulk INSERT operations with Couchbase SDK 2.7.9?

I have implemented the code in a multithreaded way and am sporadically getting an OutOfMemory error:
2020-06-05 23:43:38,745 RxComputationScheduler-6 WARN com.couchbase.client.core.endpoint.AbstractGenericHandler [/10.7.54.107:11210][KeyValueEndpoint]: Got error while consuming KeepAliveResponse.
java.util.concurrent.TimeoutException
Caused by: java.lang.OutOfMemoryError: Java heap space
Exception in thread "cb-computations-24" java.lang.IllegalStateException: Fatal Exception thrown on Scheduler.Worker thread.
at rx.internal.schedulers.ScheduledAction.run(ScheduledAction.java:59)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.OutOfMemoryError: Java heap space

My code is as follows:

//MULTITHREADED DRIVER
public class CouchBaseProcessor {
    public void process() {
        int threadSize = 10;
        List<String> keys; //assume this list has millions of keys in it
        for (int i = 0; i < threadSize; i++) {
            List<String> subKeys; //splitting the millions of keys into subKeys based on the thread size
            //in the real code each iteration runs on its own thread
            new CouchBaseWrapper().asyncBulkGetAndInsert(subKeys);
        }
    }
}

public class CouchBaseWrapper {
    public void asyncBulkGetAndInsert(List<String> subKeys) {
        //Further partitioning keys into chunks, fearing Couchbase will fail to return
        List<List<String>> listOfLists = Lists.partition(subKeys, 10000);
        for (List<String> subList : listOfLists) {
            List<LegacyDocument> purgeableDocs = asyncBulkGet(subList, fromBucket);
            asyncBulkInsert(purgeableDocs, toBucket);
        }
    }

    public List<LegacyDocument> asyncBulkGet(Iterable<String> keys, Bucket fromBucket) {
        int MAX_CONCURRENT = 10; //what is the max value I can assign here?
        AsyncBucket asyncBucket = fromBucket.async();
        List<LegacyDocument> items = Observable
            .from(keys)
            .flatMap(new Func1<String, Observable<LegacyDocument>>() {
                @Override
                public Observable<LegacyDocument> call(String key) {
                    return asyncBucket.get(key, LegacyDocument.class);
                }
            }, MAX_CONCURRENT)
            .subscribeOn(Schedulers.io())
            .toList()
            .toBlocking()
            .single();
        return items;
    }

    public void asyncBulkInsert(Iterable<LegacyDocument> items, Bucket toBucket) {
        int MAX_CONCURRENT = 10; //what is the max value I can assign here?
        AsyncBucket asyncBucket = toBucket.async();
        Observable
            .from(items)
            .flatMap(new Func1<LegacyDocument, Observable<LegacyDocument>>() {
                @Override
                public Observable<LegacyDocument> call(final LegacyDocument doc) {
                    return asyncBucket.insert(doc)
                        .onErrorResumeNext(Observable.<LegacyDocument>empty());
                }
            }, MAX_CONCURRENT)
            .subscribeOn(Schedulers.io())
            .lastOrDefault(null)
            .toBlocking()
            .single();
    }
}
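For reference, the `Lists.partition` call above is Guava's helper for splitting a list into consecutive fixed-size sublists. If Guava is not available, a minimal plain-`java.util` equivalent (a sketch, not Guava's actual implementation — it copies each chunk rather than returning views) looks like this:

```java
import java.util.ArrayList;
import java.util.List;

public class PartitionHelper {
    // Split a list into consecutive chunks of at most `size` elements,
    // mirroring what Lists.partition does in the code above.
    // The last chunk may be smaller than `size`.
    public static <T> List<List<T>> partition(List<T> list, int size) {
        List<List<T>> chunks = new ArrayList<>();
        for (int i = 0; i < list.size(); i += size) {
            int end = Math.min(i + size, list.size());
            chunks.add(new ArrayList<>(list.subList(i, end)));
        }
        return chunks;
    }
}
```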

Hello @ismail.iqbal.ap, there are quite a few examples listed here. Not sure if this will help, but it's worth looking into.

Thanks for your suggestion. I took a look at the document, which talks about error handling. But I suspect there is a flaw in my own code: am I doing this the right way?