Upserting with async bucket on java sdk 2.7.15

Hello, I have the following code that upserts a collection of pojos( to be turned to json documents) into couchbase using Observables:

    import com.couchbase.client.core.BackpressureException;
    import com.couchbase.client.core.time.Delay;
    import com.couchbase.client.java.AsyncBucket;
    import com.couchbase.client.java.Bucket;
    import com.couchbase.client.java.document.JsonDocument;
    import com.couchbase.client.java.document.json.JsonObject;
    import com.couchbase.client.java.query.N1qlQuery;
    import com.couchbase.client.java.query.N1qlQueryResult;
    import com.couchbase.client.java.query.N1qlQueryRow;
    import com.couchbase.client.java.util.retry.RetryBuilder;
    import com.fasterxml.jackson.core.type.TypeReference;
    import com.fasterxml.jackson.databind.DeserializationFeature;
    import com.fasterxml.jackson.databind.ObjectMapper;
    import com.google.common.base.Stopwatch;
    import lombok.extern.slf4j.Slf4j;
    import org.apache.commons.lang3.StringUtils;
    import rx.Observable;

    import java.io.IOException;
    import java.util.Arrays;
    import java.util.List;
    import java.util.Map;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.atomic.AtomicLong;
    import java.util.function.Function;
    import java.util.stream.Collectors;


    public <T> long  batchUpsert(Iterable<T> items, Function<T, JsonDocument> docCreator, Bucket couchbaseBucket) {

    AtomicLong counter = new AtomicLong();

    AsyncBucket asyncBucket = couchbaseBucket.async();
    Observable<JsonDocument> observableFromDocs =
            Observable
                    .from(items)
                    .map(elem -> docCreator.apply(elem))
                    .filter(elem -> elem!=null)//skip creating problematic docs. logging their info for troubleshooting
                    .flatMap(elem -> upsertDocument(elem, asyncBucket))
                    .retryWhen(
                            RetryBuilder.anyOf(BackpressureException.class, Exception.class)
                                    .doOnRetry((Integer integer, Throwable throwable, Long aLong, TimeUnit timeUnit) ->
                                            log.error("Retrying load. Attempt {} For exception {}", integer,throwable.toString())
                                    )
                                    .delay(Delay.exponential(TimeUnit.MILLISECONDS, RETRY_DELAY_CEILING))
                                    .max(MAX_RETRIES)
                                    .build()
                    );

    observableFromDocs.subscribe(
            (elem)-> {},
            elem -> log.error("Document insertion failure", elem),
            () -> {counter.incrementAndGet();log.debug("Completed ASYNC load ");});

    return counter.get();
}

  private Observable<JsonDocument> upsertDocument(JsonDocument document, AsyncBucket asyncBucket){
    Observable<JsonDocument> observableDoc = asyncBucket.upsert(document);
    return observableDoc;
}

This code runs fine, the documents get created and the async call is made to upsert but no document is upserted to couchbase, it fails silently, and no exceptions are logged, almost as if the thread dies internally, can someone kindly point what am I doing wrong? I am about to pull my hairs… :slight_smile:

I now verified it fails for collections of 1 item, and runs when there are more than one item in the iterable, can someone tell me why that’s happening?

EDIT
added requested code and imports

@deniswsrosa / @mreiche can you please assist ?

I assume your application exits right after the call to batchUpsert() and never has a chance to apply the upsert.
If you cluster.disconnect() the upsert will be applied. Or wait a few hundredths of a second.

  import com.couchbase.client.core.BackpressureException;
  import com.couchbase.client.core.time.Delay;
  import com.couchbase.client.java.AsyncBucket;
  import com.couchbase.client.java.Bucket;
  import com.couchbase.client.java.CouchbaseCluster;
  import com.couchbase.client.java.document.JsonDocument;
  
  import com.couchbase.client.java.document.json.JsonObject;
  import com.couchbase.client.java.util.retry.RetryBuilder;
  
  import rx.Observable;
  
  import java.util.ArrayList;
  import java.util.List;
  import java.util.concurrent.TimeUnit;
  import java.util.concurrent.atomic.AtomicLong;
  import java.util.function.Function;
  import java.util.logging.Logger;
  
  public class ObservableExample {
  
    Logger log = Logger.getLogger("mylogger");
    long RETRY_DELAY_CEILING = 1000;
    int MAX_RETRIES = 2;
    String connectionString = "localhost";
    String username = "Administrator";
    String password = "password";
    String bucketName = "travel-sample";
  
    CouchbaseCluster cluster;
    Bucket bucket;
    AtomicLong counter = new AtomicLong();
  
  
    private ObservableExample init() {
      cluster = CouchbaseCluster.create(connectionString);
      cluster.authenticate(username, password);
      bucket = cluster.openBucket(bucketName);
      return this;
    }
  
    public static void main(String[] args){
        ObservableExample obj = new ObservableExample().init();
        List<String> items=new ArrayList<>();
        items.add("one");
        obj.bucket.remove("one");
        obj.batchUpsert(items, obj::creator, obj.bucket);
        obj.cluster.disconnect();
        System.out.println("count: "+obj.counter.get());
        obj.init();
        System.out.println( obj.bucket.get("one"));
        obj.cluster.disconnect();
    }
  
    public JsonDocument creator(String s){
      JsonDocument jdoc = JsonDocument.create(s, JsonObject.create().put("a", "a"));
      return jdoc;
    }
  
    public <T> long  batchUpsert(Iterable<T> items, Function<T, JsonDocument> docCreator, Bucket couchbaseBucket) {
  
      AsyncBucket asyncBucket = couchbaseBucket.async();
      Observable<JsonDocument> observableFromDocs =
          Observable
              .from(items)
              .map(elem -> docCreator.apply(elem))
              .filter(elem -> elem!=null)//skip creating problematic docs. logging their info for troubleshooting
              .flatMap(elem -> upsertDocument(elem, asyncBucket))
              .retryWhen(
                  RetryBuilder.anyOf(BackpressureException.class, Exception.class)
                      .doOnRetry((Integer integer, Throwable throwable, Long aLong, TimeUnit timeUnit) ->
                          log.severe("Retrying load. Attempt {} For exception {} " + integer +" "+throwable.toString())
                      )
                      .delay(Delay.exponential(TimeUnit.MILLISECONDS, RETRY_DELAY_CEILING))
                      .max(MAX_RETRIES)
                      .build()
              );
  
      observableFromDocs.subscribe(
          (elem)-> {System.out.println("inserted");},
          elem -> log.severe("Document insertion failure " +elem),
          () -> {counter.incrementAndGet();log.fine("Completed ASYNC load ");});
      return counter.get();
    }
  
    private Observable<JsonDocument> upsertDocument(JsonDocument document, AsyncBucket asyncBucket){
      Observable<JsonDocument> observableDoc = asyncBucket.upsert(document);
      return observableDoc;
    }
  
  }
2 Likes

You can also use one of the blocking operators to wait for the Observable to finish: https://github.com/ReactiveX/RxJava/wiki/Blocking-Observable-Operators.