The problem of bulk get

while using RXjava to do the bulk-get parrallel operation, one of the operations will abort all the get operation with seeing an exception.

@xiger can you please show your code and what exception you are seeing?

final CountDownLatch latTest2 = new CountDownLatch(1);
          Observable.just(1,2,3,4)
           .flatMap(new Func1<Integer, Observable<Integer>>(){
              @Override
              public Observable<Integer> call(Integer t1) {
                // TODO Auto-generated method stub
                  
                //return Observable.just(t1);
                  if(t1==2)
                      return Observable.error(new IllegalArgumentException("非法参数"));
                  else 
                      return Observable.just(t1);
             }
          })
          .onExceptionResumeNext(Observable.just(22))
          .subscribe(new Subscriber<Integer>(){
              @Override
                public void onNext(Integer t) {
                    System.out.println("提交的值:-->"+t);
                }
                  @Override
                public void onError(Throwable e) {
                      latTest2.countDown();
                      System.out.println("error:"+e);
                      
                }
                  @Override
                public void onCompleted() {
                     latTest2.countDown();
                }
                  
              });
            
            try{
                 latTest2.await();
            }catch(Exception e){
                 Thread.currentThread().interrupt();
            }

The result is
提交的值:–>1
提交的值:–>22

why not
提交的值:–>1
提交的值:–>22
提交的值:–>3
提交的值:–>4

Because the contract of RxJava is that when an error happens, the Observable is ended. What you actually want (I see this is just an example) is to put the error handling directly on the couchbase operation itself. So if you do in a flatMap a bucket.get(), attached it directly on there so that the other ops are not affected.

I recently did a quick code sample for a bulk update, you can check it out here… that should give you an idea of how to make your sample work! https://gist.github.com/daschl/4ce7a375a3aefd170b50

ok , Thank you very much!

Can you help check some code ? thx ahead.

/**
     * 插入操作[]
     * 
     * @param doc
     * @return
     * @throws Exception 
     */
    public <T extends Serializable>  boolean  add(Document<T> doc) throws Exception {
        boolean flag = true;
        
          try{
              
                int retryNum=0;
                HandleStatus  handleStatus;
                for(;;){
                      
                     final AtomicReference<Throwable> returnException = new AtomicReference<Throwable>();
                      final AtomicReference<Document<T>>  returnValue = new AtomicReference<Document<T>>();
                       
                     System.out.println("test===>start");
                       
                     handleStatus = internalProcess(client.async().insert(doc, PersistTo.ONE)
                              .single(), 
                              returnException, returnValue);
                       
                      
                        if(handleStatus.equals(HandleStatus.CONTINUE)) {
                            continue;
                        }else if(handleStatus.equals(HandleStatus.EXCEPTION)) {
                            
                            if(returnException.get()!=null) {
                                if(DocumentAlreadyExistsException.class.isAssignableFrom(returnException.get().getClass())){
                                     //return false;
                                    flag=false;
                                } 
                                
                                /*else if(RequestTooBigException.class.isAssignableFrom(returnException.get().getClass())){
                                     throw  new RequestTooBigException(returnException.get().getMessage());
                                } else if(CouchbaseOutOfMemoryException.class.isAssignableFrom(returnException.get().getClass())){
                                      throw new CouchbaseOutOfMemoryException(returnException.get().getMessage());
                               }else if(CouchbaseException.class.isAssignableFrom(returnException.get().getClass())){
                               }*/
                                else {
                                       throw  new Exception (returnException.get());
                                  }
                          }
                       }else{
                           
                       }
                      break;
                  }
                
                System.out.println("test===>end");
            } catch (Exception e) {
                e.printStackTrace();
                throw e;
            }
       
         return  flag;
    } 

private <T extends Serializable> HandleStatus  internalProcess(Observable<Document<T>> observable, 
            final AtomicReference<Throwable> returnException, final AtomicReference<Document<T>>  returnValue) 
            throws Exception {
           
           // final AtomicReference<Throwable> returnException = new AtomicReference<Throwable>();
              // final AtomicReference<Document<?>>  returnValue = new AtomicReference<Document<?>>();
           final CountDownLatch cdl = new CountDownLatch(1);
           
           observable
           
           /*.retryWhen(new Func1<Observable<? extends Throwable>, ? extends Observable<?>>(){
            
            public  Observable<?> call(Observable<? extends Throwable> t1){
            
             return t1.zipWith(Observable.range(1, 2), new Func2<Throwable, Integer, Throwable>(){
                @Override
                public Throwable call(Throwable t1, Throwable t2) {
                    //
                    //if(RuntimeException.class.isAssignableFrom(t1.getClass())){
                        return t1;
                    //}
                    
                }
            })
             .flatMap(new Func1<Throwable, Observable<?>>(){
                @Override
                public Observable<?> call(Throwable t1) {
                    
                    if(RuntimeException.class.isAssignableFrom(t1.getClass())){
                        return Observable.error(t1);
                    }
                     //
                     
                    //return client.async().insert(doc, ReplicateTo.ONE);
                    return Observable.timer(30/1, TimeUnit.SECONDS);
                }
             });
         }
        })
        .toBlocking()
         //.single()
        .forEach(new Action1<JsonDocument>(){
           
           @Override
           public void call(JsonDocument t) {
           }
          });
         */
          
         .subscribe(new Subscriber<Document<T>>() {
                @Override
                public void onCompleted() {
                     cdl.countDown();
                }
                
                @Override
                public void onError(Throwable e) {
                      returnException.set(e);
                      cdl.countDown();
                      System.out.println("onerror===" + e.getMessage());
                      e.printStackTrace();
                }
                
                @Override
                public void onNext(Document<T> t) {
                    if(t.cas()>0){
                         System.out.println("插入成功.");
                    }
                    returnValue.set(t);
                }
               });
               
               
               try{
                     cdl.await();
               }catch (InterruptedException e) {
                   Thread.currentThread().interrupt();
                   throw new RuntimeException("Interrupted while waiting for subscription to complete.", e);
               }
               
               
              if(returnException.get()!=null) {
                  
                 if(RequestCancelledException.class.isAssignableFrom(returnException.get().getClass()) 
                         //|| RequestCancelledException.class.isAssignableFrom(returnException.get().getCause().getClass())
                 )
                 {
                       //retryNum++;
                       try{
                            Thread.sleep(31000);
                       } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                        }
                      System.out.println("test===>retry");
                      return HandleStatus.CONTINUE;
                } else if(TemporaryFailureException.class.isAssignableFrom(returnException.get().getClass())
                         || BackpressureException.class.isAssignableFrom(returnException.get().getClass())){
                       //retryNum++;
                      try{
                           Thread.sleep(100);
                      } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                      }
                     return HandleStatus.CONTINUE;
                }else {
                     //throw  new Exception (returnException.get());
                     return HandleStatus.EXCEPTION;    
                }
            }
             return HandleStatus.RETURN;
     }
     
     
     public enum  HandleStatus{
        
        CONTINUE(1)
        ,
        BREAK(2)
        ,
        EXCEPTION(3)
        ,
        RETURN(4)
        ,
        NOP(0)
        ;
        
        private int value=0;
        HandleStatus(int value){
            this.value=value;
        }
        
        public static int translateValue(HandleStatus status){
             int value=0;
            
            switch(status){
                case CONTINUE: value=1; break;
                case BREAK: value=2;  break;
                case RETURN: value=3; break;
                default:
                    break;
            }
            
            
            return value;
        }
        
        public int getValue(){
            return this.value;
        }
        
        
    }

What do you think of it?

So,
You want to do a get, detect some exceptions and retry with different delays depending on the exception (RequestCancelled = 31s, TemporaryFailure or Backpressure = 100ms, others = no retry)?

I see you have at some point tried to use the RxJava retry primitives. They can be a bit hard to put in place, but the good news is that since SDK 2.1.2 there’s a RetryBuilder utility that can help you do that.

This, along with a call to toBlocking, could vastly simplify your code: no more having to store return codes, exceptions and values in AtomicReferences, no more CountDownLatches, etc…

So instead I would do something more along the line of:

public <T extends Serializable>  boolean  add(Document<T> doc) throws Exception {
  boolean flag = false;
  try {
    System.out.println("test===>start");

    Document<T> inserted = 
      Observable.from(doc)
      .flatMap(doc -> client.async().insert(doc, PersistTo.ONE))
      .retryWhen(RetryBuilder
        .anyOf(RequestCancelledException.class)
        .delay(Delay.fixed(31, TimeUnit.SECONDS)
        .max(100)
        .build())
      .retryWhen(RetryBuilder
        .anyOf(TemporaryFailureException.class, BackpressureException.class)
        .delay(Delay.fixed(100, TimeUnit.MILLISECONDS)
        .max(Integer.MAX_VALUE) //let it retry as long as it can
        .build())
      //some kind of global timeout here is probably a good idea
      //.timeout(10, TimeUnit.MINUTES)
      .toBlocking() //this replaces the CountDownLatch
      .single(); //this after a toBlocking expects one and only one value emitted and returns said value

    System.out.println("Successfully inserted " + inserted);
    flag = true;
    System.out.println("test===>end");
  //this catch block happens when there was retries but the errors kept coming  
  } catch (CannotRetryException e) {
    System.err.println(e.getMessage + ", due to " + e.getCause());
    e.printStackTrace();
    flag = false;
  } catch (Exception e) {
   //here I'm not sure, you may want to throw for unexpected exceptions and return false for the expected ones?
    e.printStackTrace();
    flag = false;
  }
  return flag;
}

Thanks!

others : I think it is fatal, so I throw it ,for example: objectbig, outofmemory,and couchbaseException

What do you think of this?

Your call, but yeah it makes sense I think. So you’d want to change the last block’s flag = false; to throw e;.

The retryWhen wont let the expected exceptions get out (temporaryfailure, backpressure and requestcancelled) but if they keep happening beyond their respective max retries, a CannotRetryException with the original exception as a cause will get thrown => this returns false.

If the operation ends up succeeding (possibly after retries) this returns true.

If an unexpected exception occurs, the modification above will cause the exception to be thrown by add method.

But surprisingly, according to the code above, i have met with the data loss. 1000 loaded, but 999 stored.
But the log show that all items have been inserted.

Hi,

Does the requestcancelledexception only occur at the node down ? If so , the 30s can be enough for the failover.
And you think ?

the RequestCancelledException is most probably symptomatic of a node down and not yet failed over, yes.

I see that you have posted several messages, so let’s keep the discussion on these 1000 items in Data loss and writing data slowly and frequent timeout after auto failover, I’ll reference the code sample there.

please keep this discussion on the other topic