Reading documents from a Couchbase bucket in batches

I have a Couchbase cluster with around 25M documents. I can read them sequentially, and I also have a function that reads a specific number of documents from the database. My use case is slightly different, though: each document is huge, so I cannot hold all 25M documents in memory at once.

I need to process the documents in batches, say 1M per batch: load a batch into memory, run some operation on those documents, and then load the next batch.

The function I have written to read a specific number of documents doesn’t guarantee that it returns a different set of documents when it is called again.

Is there a way to achieve this? I also have a function that creates documents in batches, but I am not sure whether I can write a similar function that reads documents in batches.

The function that creates documents in batches is given below.

public void createMultipleCustomerDocuments(String docId, Customer myCust, long numDocs) {

        // Serialize the customer to JSON and queue it for the next batch.
        Gson gson = new GsonBuilder().create();
        JsonObject content = JsonObject.fromJson(gson.toJson(myCust));
        JsonDocument document = JsonDocument.create(docId, content);
        jsonDocuments.add(document);
        documentCounter++;

        // Once numDocs documents are queued, upsert them all asynchronously
        // and block until the last upsert has completed.
        if (documentCounter == numDocs) {
            Observable.from(jsonDocuments).flatMap(new Func1<JsonDocument, Observable<JsonDocument>>() {
                public Observable<JsonDocument> call(final JsonDocument docToInsert) {
                    return theBucket.async().upsert(docToInsert);
                }
            }).last().toBlocking().single();

            // Reset the batch state so the next batch does not re-send these documents.
            jsonDocuments.clear();
            documentCounter = 0;
            //System.out.println("Batch counter: " + batchCounter++);
        }
}

Can someone please help me with this?

I’m not exactly sure how you want this to be solved. Isn’t it possible in your application layer to keep a counter or something similar that tracks the “range” of documents already loaded in a batch and/or those that are still missing?

Another potential way to do it would be to use the RxJava operators that let you delay and/or handle items in batches (for example, look at buffer and delay).
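To make the buffer idea concrete, here is a minimal sketch of the grouping that a count-based buffer performs, written in plain Java rather than RxJava so it runs without extra dependencies. It is a stand-in for the semantics, not the RxJava operator itself; the class name BufferSketch and the method forEachBatch are made up for illustration.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical helper: mimics a count-based buffer over any iterator.
public class BufferSketch {

    // Drains the source in chunks of at most batchSize items and hands each
    // chunk to the handler. Only one chunk is built up at a time.
    // Returns the number of batches delivered.
    public static <T> int forEachBatch(Iterator<T> source, int batchSize,
                                       Consumer<List<T>> handler) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        int batches = 0;
        List<T> batch = new ArrayList<>();
        while (source.hasNext()) {
            batch.add(source.next());
            if (batch.size() == batchSize) {
                handler.accept(batch);
                batches++;
                batch = new ArrayList<>(); // start a fresh chunk
            }
        }
        // Deliver the final, possibly smaller, chunk.
        if (!batch.isEmpty()) {
            handler.accept(batch);
            batches++;
        }
        return batches;
    }

    public static void main(String[] args) {
        List<Integer> ids = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            ids.add(i);
        }
        // Prints each batch of up to 4 ids on its own line.
        forEachBatch(ids.iterator(), 4, batch -> System.out.println(batch));
    }
}
```

With an Observable of documents, the equivalent would presumably be a buffer call with the batch size followed by a subscriber that processes each emitted list, assuming the source only emits as fast as the batches are consumed.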

Hi, I was able to read the documents in batches using the .limit() and .skip() functions. I will also look into the RxJava operations. Thank you!
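For anyone landing here later, the skip/limit approach can be sketched roughly as the loop below. This is an illustration under assumptions, not the code actually used in this thread: PageReader is a hypothetical interface standing in for the real query (for example a view query with skip and limit set), and the in-memory reader exists only so the sketch runs on its own. It also assumes the query returns rows in a stable order, otherwise consecutive pages could overlap or miss documents.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class SkipLimitPaging {

    // Hypothetical abstraction over the real query: returns up to `limit`
    // document ids, skipping the first `skip` rows of a stably ordered result.
    public interface PageReader {
        List<String> read(long skip, int limit);
    }

    // Reads page after page, advancing the offset by the number of rows
    // actually returned, so no two batches cover the same range.
    // Returns the total number of rows processed.
    public static long processAll(PageReader reader, int batchSize,
                                  Consumer<List<String>> handler) {
        long skip = 0;
        while (true) {
            List<String> page = reader.read(skip, batchSize);
            if (page.isEmpty()) {
                break; // nothing left
            }
            handler.accept(page);
            skip += page.size();
            if (page.size() < batchSize) {
                break; // short page means this was the last one
            }
        }
        return skip;
    }

    // In-memory stand-in for the database, for demonstration only.
    public static PageReader inMemory(List<String> sortedIds) {
        return (skip, limit) -> {
            int from = (int) Math.min(skip, sortedIds.size());
            int to = (int) Math.min(skip + limit, sortedIds.size());
            return new ArrayList<>(sortedIds.subList(from, to));
        };
    }

    public static void main(String[] args) {
        List<String> ids = new ArrayList<>();
        for (int i = 0; i < 7; i++) {
            ids.add("doc-" + i);
        }
        long total = processAll(inMemory(ids), 3,
                page -> System.out.println("batch: " + page));
        System.out.println("processed " + total + " documents");
    }
}
```

One caveat worth checking against the Couchbase documentation: with very large skip values the server may still have to walk the skipped rows, so key-based pagination (starting each page after the last key seen) may scale better at 25M documents.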
