Batch reading of BinaryDocuments

#1

Hi, I am performing a batch read operation using multiple threads. I had already completed this operation when the documents were stored as JSON: I was able to deserialize each JSON document back into my user-defined object type.

I am now using Kryo for serialization and storing BinaryDocuments in the database. The overall process I now have to perform is: get the BinaryDocuments from the database, get their content as a ByteBuf, convert the ByteBuf to a ByteBuffer, and finally deserialize the ByteBuffer into my Java object. I feel that this is not efficient, but I have to do it.

Below is the code for reading the documents in batches from the database.

public Iterable<ViewRow> readNUsers(Integer nDocs, int threadNo, int nThreads) {
        // lazily create one counter per thread on the first call
        if (functionEntryCounter == 0) {
            threadCounter = new int[nThreads];
            functionEntryCounter++;
        }

        int blockSize = nDocs * nThreads;
        threadCounter[threadNo]++;
        // each thread skips its own offset plus the blocks it has already read
        int skipValue = (threadNo * nDocs) + (threadCounter[threadNo] * blockSize) + 1;
        ViewQuery query = ViewQuery.from("LCDD", "numDocs")
                          .stale(Stale.FALSE)
                          .limit(nDocs)
                          .reduce(false)
                          .skip(skipValue);
        ViewResult result = theBucket.query(query);
        return result;
 }
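For illustration, the skip arithmetic above can be isolated like this (a standalone sketch with hypothetical values nDocs = 2 and nThreads = 2, not code from my application):

```java
public class SkipOffsets {

    // mirrors the skip computation above; callNo plays the role of
    // threadCounter[threadNo] after its increment (1 on the first call)
    public static int skipValue(int nDocs, int nThreads, int threadNo, int callNo) {
        int blockSize = nDocs * nThreads;
        return (threadNo * nDocs) + (callNo * blockSize) + 1;
    }

    public static void main(String[] args) {
        for (int threadNo = 0; threadNo < 2; threadNo++) {
            for (int call = 1; call <= 3; call++) {
                System.out.println("thread " + threadNo + ", call " + call
                        + " -> skip " + skipValue(2, 2, threadNo, call));
            }
        }
    }
}
```

With these values, thread 0 skips 5, 9, 13 and thread 1 skips 7, 11, 15 on their first three calls; the skip value grows linearly with every call.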

This works fine and I am returning the ViewResult. I now need to iterate over these rows and convert each one to a BinaryDocument object. How do I do that?

For reference, this is how I converted the rows into my objects when the documents were stored as JsonDocuments.

  public List<Customer> readNUserInformation(Integer nDocs) {
        Iterable<ViewRow> result = theDao.readNUsers(nDocs);
        Gson gson = new GsonBuilder().create();
        List<Customer> customers = new ArrayList<Customer>();

        int counter = 0;
        for (ViewRow row : result) {
            // fetch the JsonDocument behind the row and map its JSON content to a Customer
            Customer cust = gson.fromJson(row.document().content().toString(), Customer.class);
            customers.add(cust);
            counter++;
            if (counter % 1000 == 0)
                System.out.println("Counter: " + counter);
        }
        System.out.println("List size: " + customers.size());
        return customers;
  }

So instead of creating an ArrayList<Customer>, I need to create a list of BinaryDocuments, but I don’t know how to do it. If someone can suggest a similar way of converting, it would be a great help to me.

#2

If indeed all the document IDs returned by the view belong to documents stored as BinaryDocument, you can simply use the ViewRow.document(Class<? extends Document> targetClass) overload. I would argue, though, that you want to perform the Kryo deserialization right there and still return Customers (since before you were doing the JSON deserialization in the same place):

public List<Customer> readNUserInformation(Integer nDocs) {
    Iterable<ViewRow> result = theDao.readNUsers(nDocs);
    ArrayList<Customer> customers = new ArrayList<Customer>();

    int counter = 0;
    for (ViewRow row : result) {
        //get the binary document
        BinaryDocument bDoc = row.document(BinaryDocument.class);
        //maybe check for nulls (eg. the view is stale and the doc has been deleted)
        if (bDoc != null) {
            //get the netty ByteBuf
            ByteBuf nettyBuffer = bDoc.content();
            try {
                //deserialize: get a ByteBuffer and use kryo
                Customer cust = deserializeUsingKryo(nettyBuffer, Customer.class);
                //prepare results
                customers.add(cust);
                counter++;
            } finally {
                if (nettyBuffer != null && nettyBuffer.refCnt() > 0) {
                    nettyBuffer.release();
                }
            }
            if (counter % 1000 == 0)
                System.out.println("Counter: " + counter);
        }
    }
    System.out.println("List size: "+ customers.size());
    return customers;
}

For an example of how to get a NIO ByteBuffer out of a Netty ByteBuf, see this Stack Overflow answer.
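Not from the original answer, but for illustration, here is one possible shape for the deserializeUsingKryo helper used above. It is only a sketch under two assumptions: Kryo and Netty are on the classpath, and your Kryo configuration can deserialize Customer without explicit registration (depending on your Kryo version you may need to register the class first). The caller remains responsible for releasing the ByteBuf.

```java
import java.nio.ByteBuffer;

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.io.Input;
import io.netty.buffer.ByteBuf;

public class KryoHelper {

    // Kryo instances are not thread-safe, so keep one per thread
    private static final ThreadLocal<Kryo> KRYO = ThreadLocal.withInitial(Kryo::new);

    public static <T> T deserializeUsingKryo(ByteBuf buf, Class<T> clazz) {
        // view the readable bytes as a NIO ByteBuffer (no copy yet)...
        ByteBuffer nioBuffer = buf.nioBuffer();
        // ...then copy them into a byte[] that Kryo's Input can read
        byte[] bytes = new byte[nioBuffer.remaining()];
        nioBuffer.get(bytes);
        try (Input input = new Input(bytes)) {
            return KRYO.get().readObject(input, clazz);
        }
    }
}
```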

#3

Hi, thanks for the reply. But my entire approach seems to be wrong. In the readNUsers(Integer nDocs, int threadNo, int nThreads) method I calculate the skip value as (threadNo * nDocs) + (threadCounter[threadNo] * blockSize) + 1, and this slows the whole process down: the threads take more and more time because the skip value keeps growing. So I am back where I began, stuck on reading the documents from the database. I still haven’t figured out a good way to read the entire bucket using multiple threads. The partitioning logic I wrote works, but it is of no practical use. I have already implemented a single-threaded reader that paginates with the startKey() and startKeyDocId() methods, but that approach won’t work with multiple threads.

Can you suggest an optimal way to read documents in batches using multiple threads? @daschl suggested looking into buffer and delay in RxJava, but I am new to Rx and haven’t found good material that gives insight into using RxJava with Couchbase Server.

#4

What is the reason you absolutely want to perform these operations in batches, in separate threads? Note that the SDK already uses its own threads for IO.

You could use flatMap with the maxParallel variant, which says “at any single point in time, make at most maxParallel asynchronous calls” (in this case, to the DB).

One very important thing I forgot above: you must release the ByteBuf once you’re done with it (I’ll edit my answer accordingly).

Here is an example using asynchronous processing and RxJava to retrieve BinaryDocuments and deserialize them on the fly, with a cap on parallelism that will prevent things like a BackpressureException:

//pretend class
public static class Customer { }

//pretend generic method to deserialize using Kryo; assumes the ByteBuf is not released within it
public static <T> T deserializeUsingKryo(ByteBuf buf, Class<T> clazz) {
    return clazz.cast(new Customer()); //placeholder body
}

public static Observable<Customer> readNUsers(int nDocs, int maxParallel) {
    ViewQuery query = ViewQuery.from("LCDD", "numDocs")
            .stale(Stale.FALSE)
            .limit(nDocs)
            .reduce(false);
    
    return bucket.async()
            .query(query) //async query
            .flatMap(AsyncViewResult::rows) //get the stream of rows
            .flatMap(row -> row.document(BinaryDocument.class) //get docs as Binary (1)
                    , maxParallel) //maximum N calls to the DB at a single time
            .map(binaryDoc -> {
                //if the view is stale and the doc has been deleted, since we're async
                //the observable (1) above will be empty -> this is not called
                
                //get the netty ByteBuf
                ByteBuf nettyBuffer = binaryDoc.content();
                try {
                    //deserialize: get a ByteBuffer and use kryo
                    Customer cust = deserializeUsingKryo(nettyBuffer, Customer.class);
                    return cust;
                } finally {
                    //always release the ByteBuf (this is your responsibility
                    //when using BinaryDocument)
                    if (nettyBuffer != null && nettyBuffer.refCnt() > 0) {
                        nettyBuffer.release();
                    }
                }
            });
}

public static List<Customer> getList(int nDocs, int maxParallel) {
    return readNUsers(nDocs, maxParallel) //get the users as a stream
            .toList() //gather them all in a List
            .toBlocking() //block until all have been collected
            .first(); //return the list instance (only element in the stream)
}

So you can just call getList(1000, 100) to retrieve the first 1000 user IDs (using the view’s limit) and then fetch the corresponding Customers, at most 100 at a time.

#5

Hi, once again thank you so much for the reply. I should definitely improve my knowledge of RxJava. For now, I am getting an error with this logic that I am not able to resolve.

For the line
.flatMap(row -> row.document(BinaryDocument.class), maxParallel), I am getting the following error:

   Multiple markers at this line
    - Type mismatch: cannot convert from Observable<BinaryDocument> to Observable<? extends U>
    - The method flatMap(Func1<? super AsyncViewRow,? extends Observable<? extends U>>, Func2<? super AsyncViewRow,? super U,? extends R>) in 
     the type Observable<AsyncViewRow> is not applicable for the arguments ((<no type> rows) -> {}, int)
    - The method flatMap(Func1<? super AsyncViewRow,? extends Observable<? extends U>>, Func2<? super AsyncViewRow,? super U,? extends R>) in 
     the type Observable<AsyncViewRow> is not applicable for the arguments ((<no type> row) -> {}, int)

I will only be able to solve such problems on my own once I understand exactly how the reactive functions and Java 8 lambdas work. I have not been able to figure out how to eliminate this error, even though it is a common type-mismatch error. I don’t feel comfortable asking this as a question, but I don’t know what else to do to get this function working. Can you please help me?

#6

Often, when you return an Observable that is transformed like that via lambdas, the problem is as simple as having forgotten to change the method’s return type to Observable<Customer>. Did you do that correctly?

Otherwise, try writing the same thing with good old anonymous classes, then convert each one back to a lambda one by one to see where something went wrong :wink:
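For instance, the two lambdas spelled out as anonymous classes would look like the sketch below (reusing the same hypothetical query, bucket and maxParallel from my example above, with RxJava 1.x’s Func1); written this way, the compiler points at the exact call whose types don’t line up:

```java
Observable<BinaryDocument> docs = bucket.async()
        .query(query)
        .flatMap(new Func1<AsyncViewResult, Observable<AsyncViewRow>>() {
            @Override
            public Observable<AsyncViewRow> call(AsyncViewResult viewResult) {
                return viewResult.rows();
            }
        })
        .flatMap(new Func1<AsyncViewRow, Observable<BinaryDocument>>() {
            @Override
            public Observable<BinaryDocument> call(AsyncViewRow row) {
                return row.document(BinaryDocument.class);
            }
        }, maxParallel);
```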

#7

Yes, the return type is Observable<Customer>. Ah okay, I will try that. I didn’t think that reading a whole bunch of documents would turn out to be this difficult.