How to do bulk get in 2.x java sdk

Hi,

What is the best way to do bulk get using the new 2.x java sdk.
We are using 1.x api bulkGet() in production but there is no direct counter part of this in 2.x api.

hi @shahzad_mughal,
cf the 2.x documentation, there’s a whole chapter on bulk pattern here: http://docs.couchbase.com/developer/java-2.1/documents-bulk.html.

PS: there’s now a quick chapter about migrating from 1.x to 2.x here. If you think it’d be worth it to mention the bulk pattern in there, please do tell (or please give any feedback you have on it, since you seem to be a prime audience for that page :slight_smile: )

Hi @simonbasle,

i did take a look on the bulk API chapter. But still having problem.
What i need as a return type of getBulk(String… keys) is Map<String,Object> but using the observable api i am getting Map<String,Observable<Object>> and when i am trying to convert that to my desired map its a performance hit. I am new to Observables kindly help. I am also attaching my sample code so that you can help me out if i am not using the appropriate API’s

     private Map<String,Object> getBulkV2_NewAPI(String ... keys){
            Collection<String> ids = Arrays.asList(keys); 
            long start = System.currentTimeMillis();
            Map<String,Observable<Object>> docsMap = Observable.from(ids)
                    .toMap(
                            new Func1<String, String>() {
    
                                @Override
                                public String call(String arg0) {
                                    String id=arg0;
                                    return id;
                                }
                            }, new Func1<String, Observable<Object>>() {
    
                                @Override
                                public Observable<Object> call(String arg0) {
                                    Observable<Object> data= bucket.async().get(arg0, LegacyDocument.class)
                                            .map(new Func1<LegacyDocument, Object>() {
    
                                                @Override
                                                public Object call(LegacyDocument t1) {
                                                    Object obj = t1.content();
                                                    return obj;
                                                }
                                            })
                                            
                                            .single() //this returns Observable<Object> if a try with .toBlocking() it will give Object only, but that too takes too much time
                                            ;
                                    return data;
                                }
                            }
                            )
                    .toBlocking()
                    .single();
            long end = System.currentTimeMillis();
            System.out.println("Time taken="+(end-start));
            System.out.println("v2 map size="+docsMap.size());
            
            start = System.currentTimeMillis();
            Map<String,Object> result = new HashMap<String, Object>();
            
            for(String k : docsMap.keySet()){
                Observable<Object> obr = docsMap.get(k);
                Object o = obr.toBlocking().single();
                result.put(k,o);
            }
            end = System.currentTimeMillis();
            System.out.println("Time taken="+(end-start));
            System.out.println("v2 map result size="+result.size());
            
            return result;
        
        }

I had to edit your message, was confused by the “Map>” that was shown (turns out not code-quoting Map<String, Observable<Object>> will do that, don’t forget backticks :wink: ).

RxJava can be a bit hard to get into, but that will come.
Here the problem is that you don’t apply the bulk pattern entirely correctly. You tried to go straight to building the map, where you should have applied bulk pattern (Observable.from(ids) immediately followed by a flatMap that calls bucket.async().get(...)).

Also, getting a Observable<T> instead of a T is usually symptomatic that a flatMap may be missing somewhere.

Here is the code I propose:

Observable<Map<String, Object>> asyncResult = Observable.from(ids)
    .flatMap(new Func1<String, Observable<LegacyDocument>>() {
      @Override
      public Observable<LegacyDocument> call(String id) {
        return bucket.async().get(id, LegacyDocument.class);
      }
    })
    .toMap(new Func1<LegacyDocument, String>() {
             @Override
             public String call(LegacyDocument legacyDocument) {
               return legacyDocument.id();
             }
           },
        new Func1<LegacyDocument, Object>() {
          @Override
          public Object call(LegacyDocument legacyDocument) {
            return legacyDocument.content();
          }
        });

Map<String, Object> syncResult = asyncResult.toBlocking().single();
return syncResult;

So this code will asynchronously go into bulk get, retrieve each document (I’m assuming you’re using LegacyDocument because that’s some data that is persisted by the previous version of the SDK?).

After that, it will asynchronously collect each document into a Map, selecting its id as the key and its content as the value.

Finally, we revert to blocking, waiting for the whole map collection to finish. The toMap explicitly says it will only emit one item so we can safely call single() to obtain it in a blocking fashion.

:bulb: Note: single() will throw if there is 0 element or more than 1. Use first() if you expect that the “more than one” case may occur and is legit, but just want to keep the first to arrive.

Thanks for your reply, it really helped.

But i am still having the same problem, the response time of the code you shared is way more than the response time of the 1.x API for get bulk. I am testing it with 5000 keys, sharing both the 1.x and 2.x codes.

Time difference:

Method Name                        Return Type                        Time taken (millis)  
getBulkForumCodeV2                 Map<String, Object>                703
getBulkV1                          Map<String, Object>                419

// 1.x API code to do the get bulk

    public Map< String, Object > getBulkV1(String ...keys )throws Exception{
        long start = System.currentTimeMillis();
        BulkFuture< Map< String, Object > > future = cbClient.asyncGetBulk(keys);
        Map< String, Object > pairs = future.get();
        long end = System.currentTimeMillis();
        
        System.out.printf("%-35s%-35s%d","getBulkV1","Map< String, Object >",(end-start));
        System.out.println();
        return pairs;
    }

// 2.x API code to do the get bulk

    protected Map< String, Object > getBulkForumCodeV2(final Collection< String > ids) {
        long start = System.currentTimeMillis();
        Observable< Map< String, Object > > asyncResult = Observable.from(ids)
                .flatMap(new Func1< String, Observable< LegacyDocument > >() {
                  @Override
                  public Observable< LegacyDocument > call(String id) {
                    return bucket.async().get(id, LegacyDocument.class);
                  }
                })
                .toMap(new Func1< LegacyDocument, String >() {
                         @Override
                         public String call(LegacyDocument legacyDocument) {
                           return legacyDocument.id();
                         }
                       },
                    new Func1< LegacyDocument, Object >() {
                      @Override
                      public Object call(LegacyDocument legacyDocument) {
                        return legacyDocument.content();
                      }
                    })
                    ;
            Map< String, Object > syncResult = asyncResult.toBlocking().single();

            long end = System.currentTimeMillis();
            System.out.printf("%-35s%-35s%d","getBulkForumCodeV2","Map< String, Object >",(end-start));
            System.out.println();
            return syncResult;
    }

Update on this. The new API for getBulk is working fine, i mean the code you shared. Yesterday i tried it with couchbase instance running on my local dev machine. Today i run the code against dedicated couchbase instance and benchmarks are good, even better than 1.x API sometimes. I think we are good with that, thanks for all your support.

awesome :smiley: glad it was resolved for you :wink: