ClassCastException on async insert with ReplicateTo.ONE and retry


#1

When I insert asynchronously into Couchbase Server (3-node cluster) with ReplicateTo.ONE and with retry, the Java client always throws a java.lang.ClassCastException.

Without retry, the client instead throws a “DurabilityException: Durability requirement failed: null” exception.

I am using couchbase-java-client 2.1.4. Any input is very much appreciated!

Code snippet (which throws ClassCastException):

final CountDownLatch latch = new CountDownLatch(1);

Observable.defer(new Func0<Observable<JsonDocument>>() {

    @Override
    public Observable<JsonDocument> call() {
        return bucket
                .async()
                .insert(JsonDocument.create(documentId, jsonContent), replicateTo)
                .timeout(10, TimeUnit.SECONDS);
    }
})

.retryWhen(RetryBuilder
            .any()
            .max(10)
            .delay(Delay.exponential(TimeUnit.SECONDS, 1))
            .build())

.subscribe(new Subscriber<JsonDocument>() {

    @Override
    public void onCompleted() {
        latch.countDown();
    }
    
    @Override
    public void onError(Throwable e) {
        e.printStackTrace();
        latch.countDown();
    }

    @Override
    public void onNext(JsonDocument t) {
    }
});

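try {
    latch.await();
} catch (InterruptedException ex) {
    // Restore the interrupt flag instead of silently swallowing it.
    Thread.currentThread().interrupt();
}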

The cluster environment:

3-node cluster, all nodes on version 3.1.0-1776 Enterprise Edition (build-1776-rel).

Bucket:
Access Control: Authentication 
Replicas: 1 replica copy 
Compaction: Not active 
Cache Metadata: Value Eviction 
Disk I/O priority: Low

Index replicas: not enabled
Flush: enabled

Exception (ClassCastException):

$ 
Aug 25, 2015 6:02:29 PM com.couchbase.client.core.CouchbaseCore <init>
INFO: CouchbaseEnvironment: {sslEnabled=false, sslKeystoreFile='null', sslKeystorePassword='null', queryEnabled=false, queryPort=8093, bootstrapHttpEnabled=true, bootstrapCarrierEnabled=true, bootstrapHttpDirectPort=8091, bootstrapHttpSslPort=18091, bootstrapCarrierDirectPort=11210, bootstrapCarrierSslPort=11207, ioPoolSize=32, computationPoolSize=32, responseBufferSize=16384, requestBufferSize=16384, kvServiceEndpoints=1, viewServiceEndpoints=1, queryServiceEndpoints=1, ioPool=NioEventLoopGroup, coreScheduler=CoreScheduler, eventBus=DefaultEventBus, packageNameAndVersion=couchbase-java-client/2.1.4 (git: 2.1.4), dcpEnabled=false, retryStrategy=BestEffort, maxRequestLifetime=75000, retryDelay=ExponentialDelay{growBy 1.0 MICROSECONDS; lower=100, upper=100000}, reconnectDelay=ExponentialDelay{growBy 1.0 MILLISECONDS; lower=32, upper=4096}, observeIntervalDelay=ExponentialDelay{growBy 1.0 MICROSECONDS; lower=10, upper=100000}, keepAliveInterval=30000, autoreleaseAfter=2000, bufferPoolingEnabled=true, queryTimeout=75000, viewTimeout=75000, kvTimeout=2500, connectTimeout=5000, disconnectTimeout=25000, dnsSrvEnabled=false}
Aug 25, 2015 6:02:29 PM com.couchbase.client.core.node.CouchbaseNode$1 call
INFO: Connected to Node 172.28.3.1
Aug 25, 2015 6:02:29 PM com.couchbase.client.core.config.DefaultConfigurationProvider$6 call
INFO: Opened bucket Aggr
Aug 25, 2015 6:02:29 PM com.couchbase.client.core.node.CouchbaseNode$1 call
INFO: Connected to Node 172.28.6.1
Aug 25, 2015 6:02:29 PM com.couchbase.client.core.node.CouchbaseNode$1 call
INFO: Connected to Node 172.28.8.1
Inserted: 0 objects
Aug 25, 2015 6:02:30 PM com.couchbase.client.core.endpoint.AbstractGenericHandler$1 call
WARNING: Caught exception while onNext on observable
java.lang.IllegalStateException: This instance has been unsubscribed and the queue is no longer usable.
        at rx.internal.util.RxRingBuffer.onNext(RxRingBuffer.java:335)
        at rx.internal.operators.OperatorMerge$InnerSubscriber.enqueue(OperatorMerge.java:680)
        at rx.internal.operators.OperatorMerge$InnerSubscriber.emit(OperatorMerge.java:657)
        at rx.internal.operators.OperatorMerge$InnerSubscriber.onNext(OperatorMerge.java:545)
        at rx.internal.operators.OperatorOnErrorResumeNextViaObservable$1.onNext(OperatorOnErrorResumeNextViaObservable.java:64)
        at rx.subjects.SubjectSubscriptionManager$SubjectObserver.onNext(SubjectSubscriptionManager.java:224)
        at rx.subjects.AsyncSubject.onCompleted(AsyncSubject.java:101)
        at com.couchbase.client.core.endpoint.AbstractGenericHandler$1.call(AbstractGenericHandler.java:199)
        at rx.internal.schedulers.ScheduledAction.run(ScheduledAction.java:47)
        at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
        at java.util.concurrent.FutureTask.run(Unknown Source)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(Unknown Source)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
        at java.lang.Thread.run(Unknown Source)

Aug 25, 2015 6:02:31 PM com.couchbase.client.core.endpoint.AbstractGenericHandler$1 call
WARNING: Caught exception while onNext on observable
java.lang.IllegalStateException: This instance has been unsubscribed and the queue is no longer usable.
        at rx.internal.util.RxRingBuffer.onNext(RxRingBuffer.java:335)
        at rx.internal.operators.OperatorMerge$InnerSubscriber.enqueue(OperatorMerge.java:680)
        at rx.internal.operators.OperatorMerge$InnerSubscriber.emit(OperatorMerge.java:657)
        at rx.internal.operators.OperatorMerge$InnerSubscriber.onNext(OperatorMerge.java:545)
        at rx.internal.operators.OperatorOnErrorResumeNextViaObservable$1.onNext(OperatorOnErrorResumeNextViaObservable.java:64)
        at rx.subjects.SubjectSubscriptionManager$SubjectObserver.onNext(SubjectSubscriptionManager.java:224)
        at rx.subjects.AsyncSubject.onCompleted(AsyncSubject.java:101)
        at com.couchbase.client.core.endpoint.AbstractGenericHandler$1.call(AbstractGenericHandler.java:199)
        at rx.internal.schedulers.ScheduledAction.run(ScheduledAction.java:47)
        at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
        at java.util.concurrent.FutureTask.run(Unknown Source)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(Unknown Source)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
        at java.lang.Thread.run(Unknown Source)

Aug 25, 2015 6:02:31 PM com.couchbase.client.core.endpoint.AbstractGenericHandler$1 call
WARNING: Caught exception while onNext on observable
java.lang.IllegalStateException: This instance has been unsubscribed and the queue is no longer usable.
        at rx.internal.util.RxRingBuffer.onNext(RxRingBuffer.java:335)
        at rx.internal.operators.OperatorMerge$InnerSubscriber.enqueue(OperatorMerge.java:680)
        at rx.internal.operators.OperatorMerge$InnerSubscriber.emit(OperatorMerge.java:657)
        at rx.internal.operators.OperatorMerge$InnerSubscriber.onNext(OperatorMerge.java:545)
        at rx.internal.operators.OperatorOnErrorResumeNextViaObservable$1.onNext(OperatorOnErrorResumeNextViaObservable.java:64)
        at rx.subjects.SubjectSubscriptionManager$SubjectObserver.onNext(SubjectSubscriptionManager.java:224)
        at rx.subjects.AsyncSubject.onCompleted(AsyncSubject.java:101)
        at com.couchbase.client.core.endpoint.AbstractGenericHandler$1.call(AbstractGenericHandler.java:199)
        at rx.internal.schedulers.ScheduledAction.run(ScheduledAction.java:47)
        at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
        at java.util.concurrent.FutureTask.run(Unknown Source)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(Unknown Source)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
        at java.lang.Thread.run(Unknown Source)

java.lang.ClassCastException: com.couchbase.client.core.message.kv.ObserveResponse cannot be cast to com.couchbase.client.java.document.JsonDocument
        at cbInsert$2.onNext(cbInsert.java:114)
        at rx.observers.SafeSubscriber.onNext(SafeSubscriber.java:130)
        at rx.internal.operators.OnSubscribeRedo$2$1.onNext(OnSubscribeRedo.java:231)
        at rx.observers.SerializedObserver.onNext(SerializedObserver.java:159)
        at rx.observers.SerializedSubscriber.onNext(SerializedSubscriber.java:81)
        at rx.internal.operators.OperatorTimeoutBase$TimeoutSubscriber.onNext(OperatorTimeoutBase.java:126)
        at rx.internal.operators.NotificationLite.accept(NotificationLite.java:150)
        at rx.internal.util.RxRingBuffer.accept(RxRingBuffer.java:432)
        at rx.internal.operators.OperatorMerge$InnerSubscriber.drainAll(OperatorMerge.java:725)
        at rx.internal.operators.OperatorMerge$InnerSubscriber.drainQueue(OperatorMerge.java:741)
        at rx.internal.operators.OperatorMerge$InnerSubscriber.emit(OperatorMerge.java:612)
        at rx.internal.operators.OperatorMerge$InnerSubscriber.onNext(OperatorMerge.java:545)
        at rx.internal.operators.OperatorOnErrorResumeNextViaFunction$1.onNext(OperatorOnErrorResumeNextViaFunction.java:89)
        at rx.internal.operators.OperatorMap$1.onNext(OperatorMap.java:55)
        at rx.internal.operators.OperatorMap$1.onNext(OperatorMap.java:55)
        at rx.internal.operators.OperatorTake$1.onNext(OperatorTake.java:67)
        at rx.internal.operators.OperatorSkipWhile$1.onNext(OperatorSkipWhile.java:45)
        at rx.internal.operators.OnSubscribeRedo$2$1.onNext(OnSubscribeRedo.java:231)
        at rx.internal.operators.OperatorScan$2.onNext(OperatorScan.java:110)
        at rx.internal.operators.OperatorMap$1.onNext(OperatorMap.java:55)
        at rx.internal.operators.OperatorMerge$InnerSubscriber.emit(OperatorMerge.java:635)
        at rx.internal.operators.OperatorMerge$InnerSubscriber.onNext(OperatorMerge.java:545)
        at rx.internal.operators.OperatorMerge$InnerSubscriber.emit(OperatorMerge.java:635)
        at rx.internal.operators.OperatorMerge$InnerSubscriber.onNext(OperatorMerge.java:545)
        at rx.internal.operators.OperatorOnErrorResumeNextViaObservable$1.onNext(OperatorOnErrorResumeNextViaObservable.java:64)
        at rx.subjects.SubjectSubscriptionManager$SubjectObserver.onNext(SubjectSubscriptionManager.java:224)
        at rx.subjects.AsyncSubject.onCompleted(AsyncSubject.java:101)
        at com.couchbase.client.core.endpoint.AbstractGenericHandler$1.call(AbstractGenericHandler.java:199)
        at rx.internal.schedulers.ScheduledAction.run(ScheduledAction.java:47)
        at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
        at java.util.concurrent.FutureTask.run(Unknown Source)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(Unknown Source)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
        at java.lang.Thread.run(Unknown Source)

Error on insertWithRetry, exception: 'com.couchbase.client.core.message.kv.ObserveResponse cannot be cast to com.couchbase.client.java.document.JsonDocument'. Document id: 1440496949571_1880
Exception in thread "main" java.lang.Exception: com.couchbase.client.core.message.kv.ObserveResponse cannot be cast to com.couchbase.client.java.document.JsonDocument
        at cbInsert.insertWithRetry(cbInsert.java:141)
        at cbInsert.execute(cbInsert.java:48)
        at cbInsert.main(cbInsert.java:146)
$ 

Exception (DurabilityException):

$
Aug 25, 2015 6:01:01 PM com.couchbase.client.core.CouchbaseCore <init>
INFO: CouchbaseEnvironment: {sslEnabled=false, sslKeystoreFile='null', sslKeystorePassword='null', queryEnabled=false, queryPort=8093, bootstrapHttpEnabled=true, bootstrapCarrierEnabled=true, bootstrapHttpDirectPort=8091, bootstrapHttpSslPort=18091, bootstrapCarrierDirectPort=11210, bootstrapCarrierSslPort=11207, ioPoolSize=32, computationPoolSize=32, responseBufferSize=16384, requestBufferSize=16384, kvServiceEndpoints=1, viewServiceEndpoints=1, queryServiceEndpoints=1, ioPool=NioEventLoopGroup, coreScheduler=CoreScheduler, eventBus=DefaultEventBus, packageNameAndVersion=couchbase-java-client/2.1.4 (git: 2.1.4), dcpEnabled=false, retryStrategy=BestEffort, maxRequestLifetime=75000, retryDelay=ExponentialDelay{growBy 1.0 MICROSECONDS; lower=100, upper=100000}, reconnectDelay=ExponentialDelay{growBy 1.0 MILLISECONDS; lower=32, upper=4096}, observeIntervalDelay=ExponentialDelay{growBy 1.0 MICROSECONDS; lower=10, upper=100000}, keepAliveInterval=30000, autoreleaseAfter=2000, bufferPoolingEnabled=true, queryTimeout=75000, viewTimeout=75000, kvTimeout=2500, connectTimeout=5000, disconnectTimeout=25000, dnsSrvEnabled=false}
Aug 25, 2015 6:01:02 PM com.couchbase.client.core.node.CouchbaseNode$1 call
INFO: Connected to Node 172.28.3.1
Aug 25, 2015 6:01:02 PM com.couchbase.client.core.config.DefaultConfigurationProvider$6 call
INFO: Opened bucket Aggr
Aug 25, 2015 6:01:02 PM com.couchbase.client.core.node.CouchbaseNode$1 call
INFO: Connected to Node 172.28.6.1
Aug 25, 2015 6:01:02 PM com.couchbase.client.core.node.CouchbaseNode$1 call
INFO: Connected to Node 172.28.8.1
Inserted: 0 objects
Aug 25, 2015 6:01:02 PM com.couchbase.client.core.endpoint.AbstractGenericHandler$1 call
WARNING: Caught exception while onNext on observable
java.lang.IllegalStateException: This instance has been unsubscribed and the queue is no longer usable.
        at rx.internal.util.RxRingBuffer.onNext(RxRingBuffer.java:335)
        at rx.internal.operators.OperatorMerge$InnerSubscriber.enqueue(OperatorMerge.java:680)
        at rx.internal.operators.OperatorMerge$InnerSubscriber.emit(OperatorMerge.java:657)
        at rx.internal.operators.OperatorMerge$InnerSubscriber.onNext(OperatorMerge.java:545)
        at rx.internal.operators.OperatorOnErrorResumeNextViaObservable$1.onNext(OperatorOnErrorResumeNextViaObservable.java:64)
        at rx.subjects.SubjectSubscriptionManager$SubjectObserver.onNext(SubjectSubscriptionManager.java:224)
        at rx.subjects.AsyncSubject.onCompleted(AsyncSubject.java:101)
        at com.couchbase.client.core.endpoint.AbstractGenericHandler$1.call(AbstractGenericHandler.java:199)
        at rx.internal.schedulers.ScheduledAction.run(ScheduledAction.java:47)
        at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
        at java.util.concurrent.FutureTask.run(Unknown Source)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(Unknown Source)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
        at java.lang.Thread.run(Unknown Source)

Aug 25, 2015 6:01:04 PM com.couchbase.client.core.endpoint.AbstractGenericHandler$1 call
WARNING: Caught exception while onNext on observable
java.lang.IllegalStateException: This instance has been unsubscribed and the queue is no longer usable.
        at rx.internal.util.RxRingBuffer.onNext(RxRingBuffer.java:335)
        at rx.internal.operators.OperatorMerge$InnerSubscriber.enqueue(OperatorMerge.java:680)
        at rx.internal.operators.OperatorMerge$InnerSubscriber.emit(OperatorMerge.java:657)
        at rx.internal.operators.OperatorMerge$InnerSubscriber.onNext(OperatorMerge.java:545)
        at rx.internal.operators.OperatorOnErrorResumeNextViaObservable$1.onNext(OperatorOnErrorResumeNextViaObservable.java:64)
        at rx.subjects.SubjectSubscriptionManager$SubjectObserver.onNext(SubjectSubscriptionManager.java:224)
        at rx.subjects.AsyncSubject.onCompleted(AsyncSubject.java:101)
        at com.couchbase.client.core.endpoint.AbstractGenericHandler$1.call(AbstractGenericHandler.java:199)
        at rx.internal.schedulers.ScheduledAction.run(ScheduledAction.java:47)
        at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
        at java.util.concurrent.FutureTask.run(Unknown Source)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(Unknown Source)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
        at java.lang.Thread.run(Unknown Source)

Aug 25, 2015 6:01:05 PM com.couchbase.client.core.endpoint.AbstractGenericHandler$1 call
WARNING: Caught exception while onNext on observable
java.lang.IllegalStateException: This instance has been unsubscribed and the queue is no longer usable.
        at rx.internal.util.RxRingBuffer.onNext(RxRingBuffer.java:335)
        at rx.internal.operators.OperatorMerge$InnerSubscriber.enqueue(OperatorMerge.java:680)
        at rx.internal.operators.OperatorMerge$InnerSubscriber.emit(OperatorMerge.java:657)
        at rx.internal.operators.OperatorMerge$InnerSubscriber.onNext(OperatorMerge.java:545)
        at rx.internal.operators.OperatorOnErrorResumeNextViaObservable$1.onNext(OperatorOnErrorResumeNextViaObservable.java:64)
        at rx.subjects.SubjectSubscriptionManager$SubjectObserver.onNext(SubjectSubscriptionManager.java:224)
        at rx.subjects.AsyncSubject.onCompleted(AsyncSubject.java:101)
        at com.couchbase.client.core.endpoint.AbstractGenericHandler$1.call(AbstractGenericHandler.java:199)
        at rx.internal.schedulers.ScheduledAction.run(ScheduledAction.java:47)
        at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
        at java.util.concurrent.FutureTask.run(Unknown Source)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(Unknown Source)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
        at java.lang.Thread.run(Unknown Source)

com.couchbase.client.java.error.DurabilityException: Durability requirement failed: null
        at com.couchbase.client.java.CouchbaseAsyncBucket$9$1.call(CouchbaseAsyncBucket.java:410)
        at com.couchbase.client.java.CouchbaseAsyncBucket$9$1.call(CouchbaseAsyncBucket.java:406)
        at rx.internal.operators.OperatorOnErrorResumeNextViaFunction$1.onError(OperatorOnErrorResumeNextViaFunction.java:77)
        at rx.internal.operators.OperatorMap$1.onError(OperatorMap.java:49)
        at rx.internal.operators.OperatorMap$1.onError(OperatorMap.java:49)
        at rx.internal.operators.OperatorTake$1.onError(OperatorTake.java:57)
        at rx.internal.operators.OperatorSkipWhile$1.onError(OperatorSkipWhile.java:54)
        at rx.internal.operators.OnSubscribeRedo$4$1.onError(OnSubscribeRedo.java:300)
        at rx.internal.operators.OperatorMerge$MergeSubscriber.innerError(OperatorMerge.java:429)
        at rx.internal.operators.OperatorMerge$MergeSubscriber.onError(OperatorMerge.java:404)
        at rx.internal.operators.OperatorMap$1.onError(OperatorMap.java:49)
        at rx.internal.operators.OperatorZip$Zip$InnerSubscriber.onError(OperatorZip.java:316)
        at rx.internal.operators.OperatorZip$Zip$InnerSubscriber.onNext(OperatorZip.java:324)
        at rx.internal.operators.OnSubscribeRange$RangeProducer.request(OnSubscribeRange.java:93)
        at rx.Subscriber.setProducer(Subscriber.java:139)
        at rx.internal.operators.OnSubscribeRange.call(OnSubscribeRange.java:39)
        at rx.internal.operators.OnSubscribeRange.call(OnSubscribeRange.java:27)
        at rx.Observable.unsafeSubscribe(Observable.java:7304)
        at rx.internal.operators.OperatorZip$Zip.start(OperatorZip.java:210)
        at rx.internal.operators.OperatorZip$ZipSubscriber.onNext(OperatorZip.java:154)
        at rx.internal.operators.OperatorZip$ZipSubscriber.onNext(OperatorZip.java:120)
        at rx.internal.util.ScalarSynchronousObservable$1.call(ScalarSynchronousObservable.java:41)
        at rx.internal.util.ScalarSynchronousObservable$1.call(ScalarSynchronousObservable.java:30)
        at rx.Observable$1.call(Observable.java:145)
        at rx.Observable$1.call(Observable.java:137)
        at rx.Observable$1.call(Observable.java:145)
        at rx.Observable$1.call(Observable.java:137)
        at rx.Observable$1.call(Observable.java:145)
        at rx.Observable$1.call(Observable.java:137)
        at rx.Observable.unsafeSubscribe(Observable.java:7304)
        at rx.internal.operators.OnSubscribeRedo$4.call(OnSubscribeRedo.java:292)
        at rx.schedulers.TrampolineScheduler$InnerCurrentThreadScheduler.enqueue(TrampolineScheduler.java:81)
        at rx.schedulers.TrampolineScheduler$InnerCurrentThreadScheduler.schedule(TrampolineScheduler.java:59)
        at rx.internal.operators.OnSubscribeRedo.call(OnSubscribeRedo.java:289)
        at rx.internal.operators.OnSubscribeRedo.call(OnSubscribeRedo.java:54)
        at rx.Observable$1.call(Observable.java:145)
        at rx.Observable$1.call(Observable.java:137)
        at rx.Observable$1.call(Observable.java:145)
        at rx.Observable$1.call(Observable.java:137)
        at rx.Observable$1.call(Observable.java:145)
        at rx.Observable$1.call(Observable.java:137)
        at rx.Observable$1.call(Observable.java:145)
        at rx.Observable$1.call(Observable.java:137)
        at rx.Observable$1.call(Observable.java:145)
        at rx.Observable$1.call(Observable.java:137)
        at rx.Observable.unsafeSubscribe(Observable.java:7304)
        at rx.internal.operators.OperatorMerge$MergeSubscriber.handleNewSource(OperatorMerge.java:188)
        at rx.internal.operators.OperatorMerge$MergeSubscriber.onNext(OperatorMerge.java:158)
        at rx.internal.operators.OperatorMerge$MergeSubscriber.onNext(OperatorMerge.java:93)
        at rx.internal.operators.OperatorMap$1.onNext(OperatorMap.java:55)
        at rx.internal.operators.OperatorMap$1.onNext(OperatorMap.java:55)
        at rx.subjects.SubjectSubscriptionManager$SubjectObserver.onNext(SubjectSubscriptionManager.java:224)
        at rx.subjects.AsyncSubject.onCompleted(AsyncSubject.java:101)
        at com.couchbase.client.core.endpoint.AbstractGenericHandler$1.call(AbstractGenericHandler.java:199)
        at rx.internal.schedulers.ScheduledAction.run(ScheduledAction.java:47)
        at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
        at java.util.concurrent.FutureTask.run(Unknown Source)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(Unknown Source)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
        at java.lang.Thread.run(Unknown Source)
Caused by: rx.exceptions.MissingBackpressureException
        at rx.internal.util.RxRingBuffer.onNext(RxRingBuffer.java:338)
        at rx.internal.operators.OperatorZip$Zip$InnerSubscriber.onNext(OperatorZip.java:322)
        ... 49 more

Error on insert, exception: 'Durability requirement failed: null'. Document id: 1440496862517_4044
Exception in thread "main" java.lang.Exception: Durability requirement failed: null
        at cbInsert.insert(cbInsert.java:89)
        at cbInsert.execute(cbInsert.java:50)
        at cbInsert.main(cbInsert.java:146)
$

Complete source code:

import com.couchbase.client.core.time.Delay;
import com.couchbase.client.java.Bucket;
import com.couchbase.client.java.Cluster;
import com.couchbase.client.java.CouchbaseCluster;
import com.couchbase.client.java.ReplicateTo;
import com.couchbase.client.java.document.JsonDocument;
import com.couchbase.client.java.document.json.JsonObject;
import com.couchbase.client.java.util.retry.RetryBuilder;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import rx.Observable;
import rx.Subscriber;
import rx.functions.Func0;

public class cbInsert {

    private final Cluster cluster;
    private final Bucket bucket;
    private final ReplicateTo replicateTo;
    private final String documentPrefix;
    public enum Retry {
        YES, NO
    }

    public cbInsert() {
        cluster = CouchbaseCluster.create("172.28.3.1");
        bucket = cluster.openBucket("bucket", "pass123");
        replicateTo = ReplicateTo.ONE;
        documentPrefix = System.currentTimeMillis() + "_";
    }

    public void execute(Retry retry) throws Exception {
        JsonObject json = JsonObject.create();
        json.put("name", "unique name");

        long counter = 0;

        boolean run = true;
        while (run) {
            if (counter % 5_000 == 0) {
                System.out.println("Inserted: " + counter + " objects");
            }

            String id = documentPrefix + (++counter);
            json.put("counter", counter).put("id", id);

            if (retry == Retry.YES) {
                insertWithRetry(id, json);
            } else {
                insert(id, json);
            }
        }
    }

    private void insert(String documentId, JsonObject jsonContent) throws Exception {
        final StringBuilder exception = new StringBuilder();
        final CountDownLatch latch = new CountDownLatch(1);

        bucket.async()
                .insert(JsonDocument.create(documentId, jsonContent), replicateTo)
                .timeout(50, TimeUnit.SECONDS)
                .subscribe(new Subscriber<JsonDocument>() {

                    @Override
                    public void onCompleted() {
                        latch.countDown();
                    }

                    @Override
                    public void onError(Throwable e) {
                        e.printStackTrace();
                        exception.append(e.getMessage());
                        latch.countDown();
                    }

                    @Override
                    public void onNext(JsonDocument t) {
                    }

                });

        try {
            latch.await();
        } catch (InterruptedException ex) {
        }

        if (exception.length() > 0) {
            System.out.println("\nError on insert, exception: '" + exception.toString() + "'. Document id: " + documentId);
            throw new Exception(exception.toString());
        }
    }

    private void insertWithRetry(final String documentId, final JsonObject jsonContent) throws Exception {
        final StringBuilder exception = new StringBuilder();
        final CountDownLatch latch = new CountDownLatch(1);

        Observable.defer(new Func0<Observable<JsonDocument>>() {

            @Override
            public Observable<JsonDocument> call() {
                return bucket
                        .async()
                        .insert(JsonDocument.create(documentId, jsonContent), replicateTo)
                        .timeout(10, TimeUnit.SECONDS);

            }

        })
        .retryWhen(RetryBuilder
                    .any()
                    .max(10)
                    .delay(Delay.exponential(TimeUnit.SECONDS, 1))
                    .build())
        .subscribe(new Subscriber<JsonDocument>() {

            @Override
            public void onCompleted() {
                latch.countDown();
            }

            @Override
            public void onError(Throwable e) {
                e.printStackTrace();
                exception.append(e.getMessage());
                latch.countDown();
            }

            @Override
            public void onNext(JsonDocument t) {
            }

        });

        try {
            latch.await();
        } catch (InterruptedException ex) {
        }

        if (exception.length() > 0) {
            System.out.println("\nError on insertWithRetry, exception: '" + exception.toString() + "'. Document id: " + documentId);
            throw new Exception(exception.toString());
        }
    }

    public static void main(String[] args) throws Exception {
        new cbInsert().execute(Retry.YES);
    }
}

#2

@hsrky, this looks like https://issues.couchbase.com/browse/JVMCBC-234 (or something related).

Can you try manually bumping your rxjava dependency to 1.0.14 and see if the issue comes up again?
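
When overriding a transitive dependency like this, it can help to confirm which rxjava jar actually ends up on the classpath at runtime. A minimal sketch, assuming the jar file name carries the version (usually true, but not guaranteed); the class name ClasspathCheck and the helper locate are made up for illustration and are not part of the Couchbase SDK or RxJava:

```java
import java.security.CodeSource;

public class ClasspathCheck {

    /** Returns where the named class was loaded from, or a short note if that is unknown. */
    static String locate(String className) {
        try {
            // initialize=false: only resolve the class, do not run its static initializers.
            Class<?> type = Class.forName(className, false, ClasspathCheck.class.getClassLoader());
            CodeSource source = type.getProtectionDomain().getCodeSource();
            if (source == null || source.getLocation() == null) {
                // Classes loaded by the bootstrap loader (e.g. java.lang.String) have no code source.
                return "no code source (JDK/bootstrap class)";
            }
            return source.getLocation().toString();
        } catch (ClassNotFoundException e) {
            return "not on classpath";
        }
    }

    public static void main(String[] args) {
        // rx.Observable is the RxJava 1.x class imported by the code in the first post.
        System.out.println("rx.Observable loaded from: " + locate("rx.Observable"));
    }
}
```

Run it with the same classpath as the application. If the printed location still points at an older rxjava jar, the override did not take effect.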


#3

Replacing rxjava-1.0.4.jar with rxjava-1.0.14.jar seems to solve the issue. Thanks.


#4

Thanks for letting me know. The upcoming 2.2.0 release will depend on that version, but until then I think you can use 1.0.14 with Java client 2.1.4 without issues.


#5

Thanks, this fixed the issue for me, too.
I wasn’t using ReplicateTo.ONE, though, just a bulk async insert.