Legacy code + multithreading + rx couchbase method = timeout errors

Hi Folks !

I am facing a problem and I cannot understand why it is happening.
I am using Couchbase Server 4.6.2 (local) and Java SDK 2.4.7.

I have an Rx bulkGet method which ends up doing a .toList().toBlocking().single();

This method is being called inside a for loop like this one:

int nThreads = 5;

ExecutorService executorService = Executors.newFixedThreadPool(nThreads);
final CountDownLatch countDownLatch = new CountDownLatch(X);

for (...) {
    executorService.submit(new Runnable() {
        @Override
        public void run() {
            // call the Rx bulkGet method here
            countDownLatch.countDown();
        }
    });
}

countDownLatch.await();

The problem I am facing is that if nThreads >= 5 I get a lot of timeouts.
If nThreads <= 3 there are no timeouts.

The weird thing is that I ran a test using an Rx flow, calling bulkGet as part of that flow with a subscribeOn on the same executor, and no timeout errors occur, not even with nThreads = 10.

It seems that .toBlocking() isn't working very well under heavy load (40K ops/second).

Any ideas?

@german.barros, I don’t think the issue is with toBlocking. In your example, >= 5 threads causes too much parallelization of requests. Couchbase can’t keep up. The lower thread number limits requests sufficiently to avoid timeouts.

I’d have to see the RX Flow example to understand why subscribeOn with the same thread executor has no issues regardless of the thread count. It could be how you’re subscribing.

Jeff

Hi Jeff !!! Sorry for the late response !

Basically my rx flow is as follows:

Observable.from(keys)
    .flatMap(/* asyncGet with fallback logic; a timeout is set here, but it is never thrown */)
    .flatMap(/* convert the retrieved JSON to an object and emit it as a new Observable */)
    .timeout(/* this is the timeout that is being thrown */)
    .toList()
    .toBlocking()
    .singleOrDefault(new ArrayList());

Hi @german.barros,

I can’t say much more on the timeouts you’re seeing with your executor example. My only suggestion is to use flatMap with a maxConcurrent argument instead of using the ExecutorService. That would allow you to create a single timeout for all the work you’re performing.

See this forum post for more on the maxConcurrent approach:

You can also limit in-flight delete requests by limiting the number of subscribers to the flatMap operator performing the deletions. Choose an Observable.flatMap() method that takes a maxConcurrent parameter.
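
To make the effect of a maxConcurrent cap concrete, here is a minimal JDK-only sketch of the same idea. It is not RxJava and not the Couchbase SDK: it swaps flatMap for a plain Semaphore plus CompletableFuture, and the names BoundedBulkGet, bulkGet, fetchAsync and timeoutMillis are made up for illustration. It assumes the underlying get can be exposed as a function returning a CompletableFuture.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;

// Hypothetical helper: caps in-flight async fetches, with one deadline for the whole batch.
class BoundedBulkGet {

    static <K, V> List<V> bulkGet(List<K> keys,
                                  Function<K, CompletableFuture<V>> fetchAsync,
                                  int maxConcurrent,
                                  long timeoutMillis) {
        Semaphore permits = new Semaphore(maxConcurrent);
        List<CompletableFuture<V>> futures = new ArrayList<>();
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        try {
            for (K key : keys) {
                // Wait for one of the maxConcurrent slots, but never past the batch deadline.
                if (!permits.tryAcquire(deadline - System.nanoTime(), TimeUnit.NANOSECONDS)) {
                    throw new TimeoutException("deadline passed while waiting for a free slot");
                }
                CompletableFuture<V> future;
                try {
                    future = fetchAsync.apply(key);
                } catch (RuntimeException e) {
                    permits.release(); // the fetch never started, so give the slot back
                    throw e;
                }
                // Free the slot when this fetch finishes, whether it succeeded or failed.
                future.whenComplete((value, error) -> permits.release());
                futures.add(future);
            }
            // A single timeout covers all outstanding work.
            CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0]))
                    .get(Math.max(0, deadline - System.nanoTime()), TimeUnit.NANOSECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted during bulk get", e);
        } catch (ExecutionException | TimeoutException e) {
            throw new IllegalStateException("bulk get failed or timed out", e);
        }
        List<V> results = new ArrayList<>();
        for (CompletableFuture<V> f : futures) {
            results.add(f.join()); // every future has already completed successfully here
        }
        return results;
    }

    public static void main(String[] args) {
        List<String> keys = Arrays.asList("k1", "k2", "k3", "k4", "k5");
        // Stand-in for an async key-value get; a real client call would go here.
        List<String> docs = bulkGet(
                keys, key -> CompletableFuture.supplyAsync(() -> "doc-for-" + key), 2, 2_000);
        System.out.println(docs);
    }
}
```

One difference from flatMap worth noting: this sketch returns results in key order, whereas flatMap emits items in whatever order the inner Observables complete. The cap itself works the same way: no more than maxConcurrent requests are outstanding at any moment, no matter how many keys are in the batch.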

Jeff

Thanks Jeff ! Will try that !