Parallel read and write operation not working

java
#1

Hi, I have two application server instances running: one reads from the bucket and the other writes data into it. Each operation runs without any trouble on its own. But the moment I run both, only the write operation works; the read halts and resumes only when the write has finished (I write data into the bucket for 2 minutes). I tried running the same code twice on my local system, and there the two functions ran in parallel. Note: I am running the whole program in parallel; I am not using threads to start the two functions.

Is there any specific reason why the programs don’t run in parallel on the virtual machine instances? Also, is there a reason why the write operation takes precedence over the read when the programs run in parallel? Please let me know if I need to provide more details.

Here is the bucket analysis when read operation is done alone.

And here is the analysis when the write operation is done separately.

#2

Hi @nagaraj.irock,
could you share a bit of code, especially your “write” workload code?

#4

@simonbasle yes, I will share the code.

 private static void sequentialInsertUsingThreads(
            ExecutorService executorService) throws Exception {
        Integer nThreads = config.getThreadCount();
        for (int i = 0; i < nThreads; i++)
            try {
                executorService.execute(new InsertSequentialThreadRunner());
            } catch (RejectedExecutionException e) {
                e.printStackTrace();
            }
    }


public class InsertSequentialThreadRunner implements Runnable {

    Kryo kryo = Utilities.getSerializer();
    private AtomicInteger counter = new AtomicInteger(0) ;
    private AtomicInteger exceptionCount = new AtomicInteger(0);
    private final int TIME_IN_MINUTES = 10*60*1000;
    public void run() {
        long st = System.currentTimeMillis();
        while (true) {

            try {
                LearningCouchbaseView.Create(); //There is a function create which will create a document  and insert it into the database.
                counter.incrementAndGet();
                //Thread.sleep(10);
                if(System.currentTimeMillis()-st > TIME_IN_MINUTES){
                    break;
                }
            } catch (Exception e) {
                exceptionCount.incrementAndGet();
                e.printStackTrace();
            }
        }
        
        System.out.println("Final Insert count: " + counter);
        System.out.println("Final Exception count: " + exceptionCount);

    }
}

#5

OK, can you also share the code that gets data from Couchbase and the code that inserts into it?

My guess is that you are using views to get the data, and you’ve configured your view queries with Stale.FALSE. This instructs the view to wait for any insertion to be indexed before it can return a result, so then writes take priority and reads only occur once all writes have stopped (at least long enough for the view to catch up).

#6

@simonbasle Yes, your guess is correct. But performance has dropped drastically, from 25k-30k ops/sec to around 7k ops/sec, since I removed the Stale.FALSE parameter. Anyway, here is the read function. Please have a look at it and suggest ways to improve it.

private static void getKeysAndRead(ExecutorService executorService)
                    throws Exception {
           BinaryBO theBo = new BinaryBO(config.getBucketName());
           Set<Future<List<BinaryDocument>>> totalDocumentsFutures = new   HashSet<Future<List<BinaryDocument>>>();
                
                while (counter <= docsInBucket) { //docsInBucket is the number of documents in the bucket

                    if (docsInBucket - counter < nDocs) {
                        nDocs = docsInBucket - counter;
                        keys = theBo.getKeys(lastKey, nDocs);
                        List<Customer> documents = theBo.bulkReadUsingKeys(keys);
                        counter += documents.size();
                    }
                    else
                    {
                        keys = theBo.getKeys(lastKey, nDocs);
                        int keysUsed = 0;
                        int batchNumber = 0;
                        while (keysUsed < keys.size()-1) {

                            List<String> threadKeys = getThreadKeys(batchNumber++,
                                    config.getBatchSize(), keys);
                            @SuppressWarnings("unchecked")
                            Future<List<BinaryDocument>> documents = executorService
                                    .submit(new BulkReadBinaryDocumentsRunner(threadKeys));

                            totalDocumentsFutures.add(documents);
                            keysUsed += threadKeys.size();
                        }

                    }

                    for (Future<List<BinaryDocument>> doc : totalDocumentsFutures) {
                        counter += doc.get().size();
                        for(BinaryDocument d : doc.get()) {
                            d.content().release();

                        }
                        doc.get().clear();
                    }

                    totalDocumentsFutures.clear();
                    lastKey = keys.get(keys.size() - 1);
                    System.out.println("Total Size: " + counter);
                    if(counter >= docsInBucket){
                        break;
                    }
                }    
            }
            keys.clear();
        }

This is the function which determines the keys for the threads.

    private static List<String> getThreadKeys(int number, Integer batchSize,
                        List<String> keys) {

                    int startIndex = number * batchSize;
                    int endIndex = (number + 1) * batchSize;
                    if(startIndex > keys.size()-1){
                        startIndex = keys.size()-1;
                    }
                    if (endIndex > keys.size()) {
                        endIndex = keys.size() - 1;
                    }
                    return keys.subList(startIndex, endIndex);
                }
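
One thing to watch in the helper above: `subList` treats its end index as exclusive, so clamping `endIndex` to `keys.size() - 1` leaves the last key out of the final batch whenever that clamp triggers. A minimal sketch of a batching helper that clamps to `keys.size()` instead is shown here. The class name `KeyBatches` and the method `partition` are hypothetical and not part of the original code; it is only meant to illustrate the index arithmetic.

```java
import java.util.ArrayList;
import java.util.List;

final class KeyBatches {

    /** Splits keys into consecutive batches of at most batchSize elements. */
    static <T> List<List<T>> partition(List<T> keys, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        List<List<T>> batches = new ArrayList<>();
        for (int start = 0; start < keys.size(); start += batchSize) {
            // subList's end index is exclusive, so clamp to size(), not size() - 1.
            int end = Math.min(start + batchSize, keys.size());
            // Copy the view so each batch is independent of the source list.
            batches.add(new ArrayList<>(keys.subList(start, end)));
        }
        return batches;
    }
}
```

Called as `KeyBatches.partition(keys, config.getBatchSize())`, this would hand every key to exactly one batch, including the last one when the key count is not a multiple of the batch size.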

This function is to deserialize once the documents have been fetched from the database.

 public ArrayList<Customer> bulkReadUsingKeys(List<String> keys) throws Exception {
                    
                    List<BinaryDocument> documents = theDao.readBinaryDocumentsUsingKeys(keys);
                    ArrayList<Customer> customers = new ArrayList<Customer>();

                    for (BinaryDocument document : documents) {
                        try {
                            ByteBuffer buffer = document.content().nioBuffer();
                            Customer cust = (Customer) Utilities.byteBufferToJavaObject(Utilities.getSerializer(), buffer, "com.ht.TestingWithMaven.Customer");
                            customers.add(cust);
                        }
                        catch(DecoderException e){
                            System.exit(0);
                        }
                        catch(Exception e) {
                            e.printStackTrace();
                        }
                        finally {
                            // Release each buffer only after its own document has been deserialized.
                            // Releasing and clearing the whole list inside the loop would stop after the first document.
                            document.content().release();
                        }
                    }
                    documents.clear();
                    return customers;
                    
                }

This function actually fetches the data from the database and returns the data to the function which is given above.

 public List<BinaryDocument> readBinaryDocumentsUsingKeys(Collection<String> keys) throws DecoderException{
                try{
                    return Observable
                            .from(keys)
                            .flatMap(new Func1<String, Observable<BinaryDocument>>(){
                                public Observable<BinaryDocument> call(String id){
                                    return theBucket.async().get(id, BinaryDocument.class)
                                            .retryWhen(RetryBuilder
                                            .anyOf(BackpressureException.class)
                                            .delay(Delay.exponential(TimeUnit.MILLISECONDS, 100))
                                            .max(10)
                                            .build()
                                    );

                                }
                            })
                            .toList()
                            .toBlocking()
                            .single();
                }
                catch(DecoderException e){
                    return null;
                }
            }

Here is the getKeys() function

public Iterable<ViewRow> getKeys(String lastKey, Integer nDocs) {
        
        
        // TODO Auto-generated method stub
        if(lastKey.isEmpty())
        {
            ViewQuery query = ViewQuery.from("Couchbase", "numDocs")
                .stale(Stale.TRUE)
                .reduce(false)
                .limit(nDocs);
                
            ViewResult result = theBucket.query(query);
        
            return result;
        }
        else{
            ViewQuery query = ViewQuery.from("Couchbase", "numDocs")
                               .stale(Stale.TRUE)
                               .reduce(false)
                               .limit(nDocs)
                               .startKey(lastKey)
                               .skip(1);
            ViewResult result = theBucket.query(query);
            return result;
        }
    }

Basically, I fetch a batch of keys (say 10000 at once) and split them among the threads (with 5 threads, each thread gets 2000 keys); each thread then reads the documents for its own keys. This way I read the entire bucket. I don’t know if this is efficient, but performance-wise it works better than the other approaches I tried for reading the entire bucket.
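
The fan-out step described above can be sketched with plain `java.util.concurrent`, independent of the Couchbase SDK. This is only an illustration under assumptions: `ParallelBatchReader`, `readAll`, and the `readBatch` function are hypothetical names, and `readBatch` stands in for whatever performs the real bulk read of one batch of keys.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.function.Function;

final class ParallelBatchReader {

    /**
     * Submits one task per batch of keys and waits for all of them.
     * readBatch is a hypothetical stand-in for the real bulk read.
     */
    static <D> List<D> readAll(List<List<String>> batches,
                               Function<List<String>, List<D>> readBatch,
                               ExecutorService pool) throws Exception {
        List<Callable<List<D>>> tasks = new ArrayList<>();
        for (List<String> batch : batches) {
            tasks.add(() -> readBatch.apply(batch));
        }
        List<D> documents = new ArrayList<>();
        // invokeAll blocks until every task has completed and returns
        // the futures in the same order as the submitted tasks.
        for (Future<List<D>> result : pool.invokeAll(tasks)) {
            documents.addAll(result.get());
        }
        return documents;
    }
}
```

Compared with collecting futures in a `HashSet`, `invokeAll` keeps the results in batch order, which makes it easier to pick the last key of the page for the next `startKey`.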

#7

At first glance, I don’t see anything blatantly wrong with that code. The fact that you get worse performance when using Stale.TRUE instead of Stale.FALSE is weird though, as the former should put less pressure on your workload (basically letting the view catch up when it can)…

#8

@simonbasle But there is a decrease in performance. Anyway, after the suggested change I am now able to run the programs in parallel and perform both operations simultaneously. Maybe I should spawn another thread that fetches the keys in parallel with the threads that read the documents. That might improve performance, but it wouldn’t explain why there is a dip in performance after changing from Stale.FALSE to Stale.TRUE.
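
The idea of fetching keys on a separate thread could look roughly like the sketch below, which requests the next page of keys while the current page is still being read. Everything here is an assumption for illustration: `PrefetchingPager`, `fetchPage`, and `processPage` are hypothetical names, `fetchPage` stands in for the view query (given the last key seen, or `null` for the first page), and whether this helps at all depends on how much of the time is actually spent in the key query.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.function.Consumer;
import java.util.function.Function;

final class PrefetchingPager {

    /**
     * Walks all pages, fetching page N+1 in the background while page N
     * is processed on the calling thread. fetchPage must return an empty
     * list once there are no more keys. Returns the number of keys seen.
     */
    static int readAllPages(Function<String, List<String>> fetchPage,
                            Consumer<List<String>> processPage,
                            ExecutorService keyFetcher) {
        int total = 0;
        CompletableFuture<List<String>> next =
                CompletableFuture.supplyAsync(() -> fetchPage.apply(null), keyFetcher);
        while (true) {
            List<String> page = next.join();
            if (page.isEmpty()) {
                return total;
            }
            String lastKey = page.get(page.size() - 1);
            // Start fetching the following page before processing this one.
            next = CompletableFuture.supplyAsync(() -> fetchPage.apply(lastKey), keyFetcher);
            processPage.accept(page);
            total += page.size();
        }
    }
}
```

A single-threaded executor is enough for `keyFetcher`, since only one page is ever in flight at a time.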

#9

@simonbasle Is it natural that when both operations (read and write) run in parallel, the number of reads per second decreases? Writes are not affected at all; only read performance has dropped. Or is it possible to get the same performance when running in parallel?