Hi,
We have upgraded to the latest Couchbase driver to fix an issue with an earlier thread leak.
To be able to use the rx async features we need to use our own Scheduler (thread pool):
DefaultCouchbaseEnvironment.builder().scheduler(Schedulers.from(executorService)).build();
This is because we use our own custom dynamic classloader, and this classloader needs to be used
when the flow continues in the rx chain so that dynamically loaded classes are recognised.
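For illustration, here is a minimal sketch of one way an executor can carry a classloader onto its worker threads, using only java.util.concurrent. The class name ClassLoaderAwarePool, the helper newPool and the thread name prefix are hypothetical, and the sketch assumes the dynamic classloader is applied as the thread context classloader. It is not our production code.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class ClassLoaderAwarePool {

    // Hypothetical helper: builds a scheduled pool whose worker threads all
    // get the given loader as their context classloader.
    static ScheduledExecutorService newPool(final ClassLoader loader, int size) {
        final AtomicInteger counter = new AtomicInteger();
        ThreadFactory factory = new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread t = new Thread(r, "custom-rx-" + counter.incrementAndGet());
                t.setDaemon(true);               // do not keep the JVM alive
                t.setContextClassLoader(loader); // assumption: loader is used via the context classloader
                return t;
            }
        };
        return Executors.newScheduledThreadPool(size, factory);
    }

    public static void main(String[] args) throws Exception {
        // Stand-in for the dynamic classloader.
        ClassLoader loader = ClassLoaderAwarePool.class.getClassLoader();
        ScheduledExecutorService pool = newPool(loader, 2);

        // Ask a worker thread which context classloader it sees.
        Callable<ClassLoader> probe = () -> Thread.currentThread().getContextClassLoader();
        ClassLoader seen = pool.submit(probe).get();
        System.out.println(seen == loader);

        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
    }
}
```

An executor built this way is what would be passed to Schedulers.from(executorService) in the builder call above.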
The upgrade to rx 1.0.15 enables us to shut down all associated threads with Schedulers.shutdown();
This solved the thread leak, but because we use a custom scheduler we are no longer able to reconnect
in the same execution. This causes the following exception:
================================
16/01/12 09:09:47 WARN env.CoreEnvironment: More than 1 Couchbase Environments found (2), this can have severe impact on performance and stability. Reuse environments!
Exception in thread "main" rx.exceptions.OnErrorNotImplementedException: Task java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask@65466a6a rejected from java.util.concurrent.ScheduledThreadPoolExecutor@4ddced80[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0]
at rx.Observable$27.onError(Observable.java:7996)
at rx.observers.SafeSubscriber._onError(SafeSubscriber.java:158)
at rx.observers.SafeSubscriber.onError(SafeSubscriber.java:119)
at rx.Observable.subscribe(Observable.java:8273)
at rx.Observable.subscribe(Observable.java:8233)
at rx.Observable.subscribe(Observable.java:7987)
at com.couchbase.client.core.metrics.AbstractMetricsCollector.<init>(AbstractMetricsCollector.java:59)
at com.couchbase.client.core.metrics.RuntimeMetricsCollector.<init>(RuntimeMetricsCollector.java:42)
at com.couchbase.client.core.env.DefaultCoreEnvironment.<init>(DefaultCoreEnvironment.java:293)
at com.couchbase.client.java.env.DefaultCouchbaseEnvironment.<init>(DefaultCouchbaseEnvironment.java:124)
at com.couchbase.client.java.env.DefaultCouchbaseEnvironment.<init>(DefaultCouchbaseEnvironment.java:55)
at com.couchbase.client.java.env.DefaultCouchbaseEnvironment$Builder.build(DefaultCouchbaseEnvironment.java:472)
at com.digitalroute.couchbase.aggregation.rx.CbThreadLeak.connect_and_disconnect(CbThreadLeak.java:28)
at com.digitalroute.couchbase.aggregation.rx.CbThreadLeak.main(CbThreadLeak.java:16)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:483)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)
Caused by: java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask@65466a6a rejected from java.util.concurrent.ScheduledThreadPoolExecutor@4ddced80[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0]
at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047)
at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823)
at java.util.concurrent.ScheduledThreadPoolExecutor.delayedExecute(ScheduledThreadPoolExecutor.java:326)
at java.util.concurrent.ScheduledThreadPoolExecutor.schedule(ScheduledThreadPoolExecutor.java:533)
at rx.schedulers.ExecutorScheduler$ExecutorSchedulerWorker.schedule(ExecutorScheduler.java:153)
at rx.Scheduler$Worker.schedulePeriodically(Scheduler.java:129)
at rx.internal.operators.OnSubscribeTimerPeriodically.call(OnSubscribeTimerPeriodically.java:47)
at rx.internal.operators.OnSubscribeTimerPeriodically.call(OnSubscribeTimerPeriodically.java:30)
at rx.Observable.subscribe(Observable.java:8266)
... 15 more
================================
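The root cause in the trace is a RejectedExecutionException from a ScheduledThreadPoolExecutor in the Terminated state. The trace does not show whether that executor is the one we passed to Schedulers.from(...) or one owned by RxJava. The failure mode itself can be sketched with java.util.concurrent alone, without Couchbase or RxJava. The class RestartSketch and its methods are hypothetical names, and the per-cycle executor is only an assumption about how a restart could be structured, not a confirmed fix.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class RestartSketch {

    // Returns true if the executor still accepts a new delayed task.
    static boolean accepts(ScheduledExecutorService executor) {
        try {
            executor.schedule(() -> { }, 1, TimeUnit.MILLISECONDS);
            return true;
        } catch (RejectedExecutionException e) {
            // Same exception type as the "Caused by" in the trace.
            return false;
        }
    }

    // One simulated connect/disconnect cycle that creates and shuts down
    // its own executor, so no terminated executor is reused later.
    static boolean connectAndDisconnect() throws InterruptedException {
        ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
        try {
            return accepts(executor);
        } finally {
            executor.shutdown();
            executor.awaitTermination(5, TimeUnit.SECONDS);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        // Reusing an executor after it has been shut down fails.
        ScheduledExecutorService shared = Executors.newScheduledThreadPool(1);
        System.out.println("before shutdown: " + accepts(shared));
        shared.shutdownNow();
        System.out.println("after shutdown:  " + accepts(shared));

        // A fresh executor per cycle is accepted every time.
        System.out.println("cycle 1: " + connectAndDisconnect());
        System.out.println("cycle 2: " + connectAndDisconnect());
    }
}
```

The sketch only shows that a terminated executor rejects every later task, which matches the second connect attempt in our run.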
We have seen that the builder has a scheduler(Scheduler scheduler, ShutdownHook shutdownHook) method.
Is this a solution to this problem? If so, can you provide a code example?
We attach a small application that reproduces this problem.
Regards
Martin
<a class="attachment" href="//cdck-file-uploads-global.s3.dualstack.us-west-2.amazonaws.com/couchbase/original/2X/5/5f0f1f6f071222faf6be75d2b541567598ecf879.zip">RestartProblem.java.zip</a> (1.1 KB)