RxJava & Couchbase Sdk clarification

java

#1

I am trying to do a bulk insert using the following Rx code in my application logic:
LOGGER.info("Main thread : " + Thread.currentThread() + “----” + Thread.currentThread().getId());

Observable
.from(items)
.subscribeOn(Schedulers.io())
.map(item->{return convertJavaToJsonObject(item,CbKeyHelper.createCbKey(item.getid));})
.flatMap(docToInsert->asyncBucket.insert(docToInsert)) .retryWhen(RetryBuilder.anyOf(TemporaryFailureException.class).delay(Delay.exponential(TimeUnit.MILLISECONDS, 5)).max(3).build())
.toBlocking()
.forEach(insertedDoc ->{
final Item item = convertToJava(JsonObject.fromJson(insertedDoc.content()));
storedIds.add(item.getId);
LOGGER.info("Current thread : " + Thread.currentThread() + “----” + Thread.currentThread().getId());
});

Thread output :
Main thread : Thread[main,5,main]----1
Current thread : Thread[cb-computations-5,5,main]----48
Current thread : Thread[cb-computations-5,5,main]----48
Current thread : Thread[cb-computations-5,5,main]----48
Current thread : Thread[cb-computations-5,5,main]----48

My understanding is :
The streams of items is sent from observable is one by one converted to RawJsonDocument using map. Flatmap takes all this documents and inserts them in separate threads in parallel. This is taken care by couchbase and its request reponse buffer. If some insert fails retryWhen comes into picture. To blocking blocks the main thread till all the documents are inserted. forEach receives all the documents emitted by the observable until some error occurs.
The subscribeOn method tells which thread pool to use. In this case IO.

Why I doubt that something is wrong with my understanding is because when I tried to print the thread ids. First I get it as a computation thread. I was expecting a io thread. And second, all thread ids are same?

Also what happens if I put a onNext after retryWhen instead of forEach? What will be the result in that case.

Please help.


#2

Hi @shreyas.sangai,
The subscribeOn on the chain would cause the Observable.from(items) to execute and emit items on the Rx IO Scheduler. Later the observable is passed into the couchbase client core which uses the couchbase computation and IO thread pool for observing notifications on it.
Yes, your understanding about toBlocking is right, it will block the main thread, then the foreach call still executes on the emission thread that is the couchbase computation pool thread whereas subscribe will change to current thread.
The computation pool size is calculated based on the number of processors available to the JVM. In this particular example it seems like all the items were received on the same thread, is it possible to try and check the behavior for larger set of items.

Thanks