Unable to reconnect to Couchbase using a custom Rx Scheduler with Couchbase Java API 2.2.3

Hi,

We have upgraded to the latest Couchbase driver to fix an earlier thread leak.

To use the Rx async features we need to supply our own Scheduler (thread pool):

DefaultCouchbaseEnvironment.builder().scheduler(Schedulers.from(executorService)).build();

This is because we use our own custom dynamic classloader, and this classloader must be used when the flow continues in the Rx chain so that dynamically loaded classes are recognised.

The upgrade to RxJava 1.0.15 enables us to shut down all associated threads with Schedulers.shutdown();

This solved the thread leak, but because we use a custom scheduler we can no longer reconnect within the same execution. The attempt fails with this exception:

rx.exceptions.OnErrorNotImplementedException: Task
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask@65466a6a
rejected from
java.util.concurrent.ScheduledThreadPoolExecutor@4ddced80[Terminated,
pool size = 0, active threads = 0, queued tasks = 0, completed tasks =
0]

at rx.Observable$27.onError(Observable.java:7996)

================================

16/01/12 09:09:47 WARN env.CoreEnvironment: More than 1 Couchbase
Environments found (2), this can have severe impact on performance and
stability. Reuse environments!

Exception in thread "main"
rx.exceptions.OnErrorNotImplementedException: Task
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask@65466a6a
rejected from
java.util.concurrent.ScheduledThreadPoolExecutor@4ddced80[Terminated,
pool size = 0, active threads = 0, queued tasks = 0, completed tasks =
0]

at rx.Observable$27.onError(Observable.java:7996)

at rx.observers.SafeSubscriber._onError(SafeSubscriber.java:158)

at rx.observers.SafeSubscriber.onError(SafeSubscriber.java:119)

at rx.Observable.subscribe(Observable.java:8273)

at rx.Observable.subscribe(Observable.java:8233)

at rx.Observable.subscribe(Observable.java:7987)

at com.couchbase.client.core.metrics.AbstractMetricsCollector.<init>(AbstractMetricsCollector.java:59)

at com.couchbase.client.core.metrics.RuntimeMetricsCollector.<init>(RuntimeMetricsCollector.java:42)

at com.couchbase.client.core.env.DefaultCoreEnvironment.<init>(DefaultCoreEnvironment.java:293)

at com.couchbase.client.java.env.DefaultCouchbaseEnvironment.<init>(DefaultCouchbaseEnvironment.java:124)

at com.couchbase.client.java.env.DefaultCouchbaseEnvironment.<init>(DefaultCouchbaseEnvironment.java:55)

at com.couchbase.client.java.env.DefaultCouchbaseEnvironment$Builder.build(DefaultCouchbaseEnvironment.java:472)

at com.digitalroute.couchbase.aggregation.rx.CbThreadLeak.connect_and_disconnect(CbThreadLeak.java:28)

at com.digitalroute.couchbase.aggregation.rx.CbThreadLeak.main(CbThreadLeak.java:16)

at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)

at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

at java.lang.reflect.Method.invoke(Method.java:483)

at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)

Caused by: java.util.concurrent.RejectedExecutionException: Task
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask@65466a6a
rejected from
java.util.concurrent.ScheduledThreadPoolExecutor@4ddced80[Terminated,
pool size = 0, active threads = 0, queued tasks = 0, completed tasks =
0]

at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047)

at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823)

at java.util.concurrent.ScheduledThreadPoolExecutor.delayedExecute(ScheduledThreadPoolExecutor.java:326)

at java.util.concurrent.ScheduledThreadPoolExecutor.schedule(ScheduledThreadPoolExecutor.java:533)

at rx.schedulers.ExecutorScheduler$ExecutorSchedulerWorker.schedule(ExecutorScheduler.java:153)

at rx.Scheduler$Worker.schedulePeriodically(Scheduler.java:129)

at rx.internal.operators.OnSubscribeTimerPeriodically.call(OnSubscribeTimerPeriodically.java:47)

at rx.internal.operators.OnSubscribeTimerPeriodically.call(OnSubscribeTimerPeriodically.java:30)

at rx.Observable.subscribe(Observable.java:8266)

... 15 more

================================

We have seen that the builder has a scheduler(Scheduler scheduler, ShutdownHook shutdownHook) method.

Is this a solution to this problem? If so, can you provide a code example?

We attach a small application that reproduces the problem.

Regards

Martin

  <a class="attachment" href="//cdck-file-uploads-global.s3.dualstack.us-west-2.amazonaws.com/couchbase/original/2X/5/5f0f1f6f071222faf6be75d2b541567598ecf879.zip">RestartProblem.java.zip</a> (1.1 KB)

Hi Martin,

The problem with Schedulers.shutdown() is that it completely shuts down RxJava, making it unusable for the remainder of the program’s execution…

Since you want to shut down your own executor, you can indeed do so by implementing the ShutdownHook to simply call executor.shutdownNow(). However, don’t stop RxJava if your application will continue to run.
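
To illustrate the idea, here is a minimal JDK-only sketch of an executor that is owned by one connect/disconnect cycle. The class `PerConnectionExecutor` and its methods are hypothetical names, not part of the Couchbase SDK or RxJava. Its `shutdown()` and `isShutdown()` methods are only the kind of logic a ShutdownHook implementation could delegate to; the wiring into the environment builder is not shown. It also shows that a terminated executor rejects every new task, which is the kind of RejectedExecutionException seen in the trace above, while a freshly created executor works normally.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical helper (not a Couchbase or RxJava class):
// owns exactly one executor per connect/disconnect cycle.
final class PerConnectionExecutor {
    private final ScheduledExecutorService executor =
            Executors.newScheduledThreadPool(2);

    ScheduledExecutorService executor() {
        return executor;
    }

    // Stops only this executor; nothing global is shut down.
    // Returns true if the pool terminated within the timeout.
    boolean shutdown() {
        executor.shutdownNow();
        try {
            return executor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    boolean isShutdown() {
        return executor.isShutdown();
    }

    // Returns true if the executor accepted a no-op task,
    // false if it rejected it because it has been shut down.
    static boolean accepts(ExecutorService service) {
        try {
            service.execute(() -> { });
            return true;
        } catch (RejectedExecutionException rejected) {
            return false;
        }
    }

    public static void main(String[] args) {
        PerConnectionExecutor first = new PerConnectionExecutor();
        System.out.println("first, before shutdown: " + accepts(first.executor()));
        first.shutdown();
        System.out.println("first, after shutdown: " + accepts(first.executor()));

        // A new cycle gets a new executor instead of reusing the dead one.
        PerConnectionExecutor second = new PerConnectionExecutor();
        System.out.println("second, fresh executor: " + accepts(second.executor()));
        second.shutdown();
    }
}
```

Running `main` prints true, false, true: once an executor is terminated it cannot be revived, so each reconnect needs a fresh one, and anything shared across reconnects (such as RxJava's own schedulers) should be left running.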

Side Note
While testing, I also noticed that if you don’t call Schedulers.shutdown(), after a few seconds the following exception is dumped in the console:

Exception in thread "RxScheduledExecutorPool-3" java.lang.IllegalStateException: Fatal Exception thrown on Scheduler.Worker thread.
	at rx.internal.schedulers.ScheduledAction.run(ScheduledAction.java:62)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
Caused by: java.util.concurrent.RejectedExecutionException: Task rx.schedulers.ExecutorScheduler$ExecutorSchedulerWorker@2402d0a6 rejected from java.util.concurrent.ThreadPoolExecutor@203d50fe[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 31]
	at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047)
	at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823)
	at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1369)
	at rx.schedulers.ExecutorScheduler$ExecutorSchedulerWorker.schedule(ExecutorScheduler.java:78)
	at rx.schedulers.ExecutorScheduler$ExecutorSchedulerWorker$2.call(ExecutorScheduler.java:136)
	at rx.internal.schedulers.ScheduledAction.run(ScheduledAction.java:55)
	... 7 more

It seems that wrapping an ExecutorService that isn’t a ScheduledExecutorService into a Scheduler will cause this… If your executor isn’t a ScheduledExecutorService, I would use one instead (e.g. Executors.newScheduledThreadPool(10, Executors.privilegedThreadFactory()) for your example).
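
As a rough sketch of such a pool, the JDK-only example below builds a ScheduledExecutorService whose threads carry a given class loader as their context class loader. It assumes your dynamic classloader requirement can be met through the thread context class loader, which may not match your setup. `ClassLoaderAwarePool` and its methods are hypothetical names, and the example uses a plain thread factory rather than Executors.privilegedThreadFactory() so that the loader can be set explicitly.

```java
import java.net.URL;
import java.net.URLClassLoader;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper (not a Couchbase or RxJava class).
final class ClassLoaderAwarePool {

    // Builds a scheduled pool whose worker threads use the given
    // class loader as their context class loader.
    static ScheduledExecutorService create(int size, ClassLoader loader) {
        ThreadFactory base = Executors.defaultThreadFactory();
        ThreadFactory factory = runnable -> {
            Thread thread = base.newThread(runnable);
            thread.setContextClassLoader(loader);
            thread.setDaemon(true);
            return thread;
        };
        return Executors.newScheduledThreadPool(size, factory);
    }

    // Schedules a delayed task and returns the context class loader
    // that the worker thread saw, or null if the task did not complete.
    static ClassLoader loaderSeenByDelayedTask(ScheduledExecutorService pool) {
        Callable<ClassLoader> task =
                () -> Thread.currentThread().getContextClassLoader();
        try {
            return pool.schedule(task, 10, TimeUnit.MILLISECONDS)
                       .get(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        } catch (ExecutionException | TimeoutException e) {
            return null;
        }
    }

    public static void main(String[] args) {
        // Stand-in for the application's own dynamic class loader.
        ClassLoader custom = new URLClassLoader(new URL[0]);
        ScheduledExecutorService pool = create(10, custom);
        System.out.println("delayed task saw custom loader: "
                + (loaderSeenByDelayedTask(pool) == custom));
        pool.shutdownNow();
    }
}
```

A pool built this way could then be passed to Schedulers.from(...) as in the original post. Because it is a ScheduledExecutorService, delayed and periodic work can run on the pool itself.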