Data loss and writing data slowly and frequent timeout after auto failover

Used SDK:2.1.2,2.1.3

When I write 1000 data items into the couchbase server,
I deliberately remove a node with “service couchbase-server stop” .
After successful autofailover, I meet a problem with data loss.
And meaningfully I find the writing speed slows down, and even time-ou Exception occurs frequently.

Can you share the code you are using? You need to properly react to the return values from the client.

System.out.println("===>start");

                   client.async().insert(doc, ReplicateTo.ONE)
                  
                   .single()

                   /*.retryWhen(new Func1<Observable<? extends Throwable>, ? extends Observable<?>>(){

                    public  Observable<?> call(Observable<? extends Throwable> t1){
                        
                        t1.zipWith(Observable.range(1, 3), new Func2<Throwable, Integer, Throwable>(){
                            @Override
                            public Throwable call(Throwable t1, Integer t2) {
                                
                                return null;
                            }
                        })
                         .flatMap(new Func1<Throwable, Observable<JsonDocument>>(){
                            @Override
                            public Observable<JsonDocument> call(Throwable t1) {
                                   
                                //if(t1.getClass())
                                return client.async().insert(doc, ReplicateTo.ONE);
                            }
                         });
                         
                         return Observable.timer(3, TimeUnit.SECONDS);
                    
                    }
                   })*/
                   
                   .subscribe(new Subscriber<JsonDocument>() {
                       
                    @Override
                    public void onCompleted() {
                          cdl.countDown();
                    }
                    
                    @Override
                    public void onError(Throwable e) {
                          //returnException.set(e);
                          errNum.set(1);
                          cdl.countDown();
                          
                          System.out.println("onerror===出现异常!" + e.getMessage());
                    }
                    
                    @Override
                    public void onNext(JsonDocument t) {
                           if(t.cas()>0){
                               System.out.println("成功插入");
                           }
                           returnValue.set(t);
                           //completeVal.set(1);
                           //cdl.countDown();
                    }
                   });
                   
                 
                   
                 try{
                        
                        //cdl.await();
                       if(!cdl.await(5000, TimeUnit.MILLISECONDS)){
                              System.out.println("超时; "+errNum.get()+";"+returnValue.get());
                       }else{
                              System.out.println("正常"+errNum.get()+";"+returnValue.get());
                        
                       }
                    }catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        //throw new RuntimeException("Interrupted while waiting for subscription to complete.", e);
                    }
                    System.out.println("===>end");

hello, thank you for your reply.

Hi @xiger,

I would recommend you the following in the first place:

  • From 2.1.2 going forward, use our retry builder - you can use something like:

    .retryWhen(anyOf(Exception.class).max(4).delay(Delay.exponential(TimeUnit.SECONDS))));

  • Do not use latches, use .toBlocking to convert your observable into a Blocking one and then call .single(), .forEach() and so forth on it

  • Make sure that when you retry your observable is cold, either by using Observable.from(doc) and then flatmap the insert or use Observable.defer() and wrap the insert call with it.

Hi,daschl

Why did the writing speed slow down after auto failover, with 500 items in nearly an hour?

I use sdk 2.1.3
And you recommand that not to use .toBlocking in retry builder in product.

I don’t know your actual numbers, but keep in mind that when you failover you have one node less in the cluster to handle your requests, but it should be able to withstand it more or less closely.

Can you share some actual numbers that you are seeing?

toBlocking() should not be used with the retry builder since it’s async at this point, but downstream the observable you can block, finally if you want to.

now i need bring millions of data into cb ,and imagine that I meet with node failure, the speed is so intolerable.
How can i do?

So you are performing a bulk import? Or is it part of a OLTP appplication - this changes the retry semantics quite a bit because you mostly can accept longer delays on latency and go for max throughput (in the bulk case).

no,

I need the data to complete the verifying process of data within the files uploaded in the web app.

And the data in couchbase server is updated frequentely.

@xiger are you able to share the full code to reproduce with some logs and more? So we can have an accurrate reproduction case here and help you moving forward.

The situation is that :smile:

we havenot use the couchbase server in the web app.

we used the memcahed server to store the data to support the speed of the process of parsing and verfying of the file uploaded, and now as of memcahed’s bad avaliablity, we use couchbase server to replace the memcached.
because data changes in time, we also do the data syn.
And out storage environment: sqlserver and memcached.–subsequently---->sqlserver and couchbase server

I find .toBlock same as latch.await() above.

@xiger the toBlock is more efficient in a way that you can do much more with it. the latch works fine for a single result, but it gets more tricky if you are iterating over the results in a blocking fashion. Also, it’s idiomatic RxJava code. You don’t need to stick to it but it’s recommended.

ob,I see ,Thank you .

I want to ask a question:

when I deal with the node failure excpetion in coding, and while using .toBlock() to retrive the data without timeout,i cannot utilize the retry builder, but how can i catch the excpetion to retry the CRUD operation.

hi,

.retryWhen(anyOf(Exception.class).max(4).delay(Delay.exponential(TimeUnit.SECONDS))));

this i cannot find the api, where is it .

You need to use at least 2.1.2 to have it available.