Couchbase Java Client: Sync vs. Async Blocking vs. Async Non-Blocking

Hi there.
I was assigned the task of testing Couchbase and the Java client to see if it fits the needs of our company.
During the evaluation with the Java client, I saw that Rx Observables are the recommended way to make async calls and interact with the Couchbase server.
The case is that we are inserting 1M records from a file using a blocking queue and several threads. We tried doing this with both the sync client and the async client (using both blocking and non-blocking operations).
At the same time, we are reading the values from a copy of the file, in sequential order, in a single thread.
I was surprised to see that using the async non-blocking methods takes a lot more time than the async blocking method and the sync methods.

The main question is:
Why is the async non-blocking approach taking much more time than the other two?
Which approach is supposed to be faster?
Are there any suggestions to minimize the time in the async non-blocking approach?

The sync method returns some not-found keys, which I think is the expected result since we are writing and reading at almost the same time.
Our test environment is composed of 2 CentOS servers running Couchbase 3.0.1, with the cluster replicated across both servers and the default Couchbase bucket, using Couchbase Java Client 2.0.1.
Any input will be really appreciated.
Cheers,
Ivan

Hi @isanchez,

Normally, fully asynchronous flows are much quicker than blocking operations, but of course they need to be handled differently. If you have some code to look at, I’m happy to suggest improvements.

The reason async flows are much faster is that they allow for better resource utilization (CPU, I/O, etc.). For one example, see the docs on async batching: http://docs.couchbase.com/developer/java-2.0/documents-bulk.html. Running the same workload once sync and once async shows how much better the async version utilizes the underlying resources. We’ve put a lot of effort into making this happen, and to some degree you also benefit in a synchronous context when you have a thread pool firing at it.

Looking at the overall architecture of the core module, check out this slide: https://speakerdeck.com/daschl/building-a-reactive-database-driver-on-the-jvm?slide=34. There is a RingBuffer sitting right at the beginning of the request flow that decouples the publishers from the consumers, giving us nice batching effects and reducing contention. If you want to learn more about this, read here: http://mechanical-sympathy.blogspot.co.at/2011/10/smart-batching.html
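The smart-batching idea from that last link can be illustrated with a small stdlib-only Java sketch (this shows the concept only, not the SDK's actual RingBuffer internals; the `SmartBatcher` class and its methods are made up for illustration):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Illustration of "smart batching": a single consumer drains whatever
// producers managed to enqueue while it was busy, and handles it as one batch.
public class SmartBatcher {

    // Drain everything queued so far into one batch. A real consumer loop
    // would first block for one element, then drain the rest that piled up.
    static List<String> takeBatch(BlockingQueue<String> queue) {
        List<String> batch = new ArrayList<String>();
        queue.drainTo(batch);
        return batch;
    }

    // Demo: three requests arrive while the consumer is busy; it then
    // picks them all up in a single pass instead of one wakeup per request.
    static int demo() {
        BlockingQueue<String> queue = new ArrayBlockingQueue<String>(16);
        queue.add("req1");
        queue.add("req2");
        queue.add("req3");
        return takeBatch(queue).size();
    }
}
```

The fewer times the consumer wakes up per request, the less contention and context switching, which is the effect the RingBuffer gives the SDK under load.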

Can you share some code that I can run and suggest improvements?


Thanks @daschl for your quick reply:

This is my code for async calls:

For saving objects:

//Newer method, inserts object and generates ID via counter
	public boolean save(DummyObject d)
	{
		boolean saved = false;
		
		//First lets create the Json Object
		JsonObject jo = JsonObject.empty();
			//jo.put("id", d.getId());
			jo.put("answer", d.getAnswer());
			jo.put("foodPreference", d.getFoodPreference());
			jo.put("accountBalance", d.getAccountBalance());
			jo.put("name", d.getName());
			
		try{
			bucket
			.counter("counter", 1)
			.map(new Func1<JsonLongDocument,Long>() {
				@Override
				public Long call(JsonLongDocument arg0) {
					// TODO Auto-generated method stub
					long newCounter = arg0.content();
					return newCounter;
				}
		    })
			.subscribe(new Action1<Long>() {
				@Override
				public void call(Long arg0) {
					// TODO Auto-generated method stub
					System.out.println("Current Index: " + arg0);
					newCounter = arg0;
				}
		        } );

			//Compose the key
			String newKeyString = "doc" + newCounter;

			//Persist
			JsonDocument jd = JsonDocument.create(newKeyString,jo);
			bucket.upsert(jd);
			saved = true;
		}catch(Exception ex)
		{
			System.err.println(ex.toString());
		}
		return saved;
	} 

This is for querying:

	//To retrieve objects from DB non async and non blocking
public DummyObject queryDoc(String key)
{
	try{
	
	bucket
	.get(key)
	.map(new Func1<JsonDocument,DummyObject>() {
        @Override
        public DummyObject call(final JsonDocument loaded) {
        	//JsonDocument jd = bucket.get(key);
			String objInJsonString = loaded.content().toString();
			g = new Gson();
			DummyObject d = g.fromJson(objInJsonString, DummyObject.class);
            return d;
        }
    })
	.subscribe(new Action1<DummyObject>() {
		@Override
		public void call(DummyObject d) {
			// TODO Auto-generated method stub
			System.out.println(d.getName());
			tempDummyObject = d;
		}
        } );
	
	}catch(Exception ex)
	{
		System.err.println(ex.toString());
		ex.printStackTrace();
		tempDummyObject = null;
	}
	return tempDummyObject;		
}

Actually, it looks like you are mixing sync and async calls here (and leaking scope). I think you want something more like this for storing:

try {
    bucket
        .async()
        .counter("counter", 1)
        .flatMap(new Func1<JsonLongDocument, Observable<?>>() {
            @Override
            public Observable<?> call(JsonLongDocument counterRes) {
                String id = "doc" + counterRes.content();
                JsonObject jo = JsonObject.empty(); // your data here
                return bucket.async().upsert(JsonDocument.create(id, jo));
            }
        })
        .timeout(1, TimeUnit.SECONDS) // don't forget timeout
        .toBlocking()
        .single();
} catch(Exception ex) {
    ex.printStackTrace();
}

Also note that on the querying part you are mixing sync and async as well, which will inevitably get you into race conditions and unwanted behavior. I recommend an approach like the following (or you can also use latches; see the docs for that). Also note that if you already use Gson, you can avoid double parsing by using RawJsonDocument:

Cluster cluster = CouchbaseCluster.create();
final Bucket bucket = cluster.openBucket();

String id = "foo";
final Gson gson = new Gson();

DummyObject result = bucket
    .async()
    .get(id, RawJsonDocument.class)
    .map(new Func1<RawJsonDocument, DummyObject>() {
        @Override
        public DummyObject call(RawJsonDocument doc) {
            return gson.fromJson(doc.content(), DummyObject.class);
        }
    })
    .timeout(1, TimeUnit.SECONDS) // don't forget the timeout
    .toBlocking()
    .single();

Thanks @daschl.
I was aware of the leaking scope, which is bad practice of course, but I “planned” to solve it later.
This example is really helpful, I am new to Rx so this is really useful.
Still one question remains:

  • Is using toBlocking preventing me from getting the full benefits of the async API?

Thanks for the RawJsonDocument tip, I will use it for that.
Such great support.
I will let you know if the performance improves. =)

hey @isanchez,
I’ll try to build on Michael’s answer and address your last question.

As was said, the point of going async is to have as little wasted time and as few wasted resources as possible. When you do a synchronous query and response, you are effectively doing nothing between firing the request and receiving the response.
In async mode, this chunk of time can be used to do further processing, for example reacting to the response of a previous asynchronous query.

Using the basic sync operations directly on the Bucket is equivalent to the first situation. Fully using the AsyncBucket obtained via Bucket.async() is equivalent to the second situation.

In fact, the simple sync bucket uses the AsyncBucket under the hood, blocking on each operation.

Just doing a toBlocking().last() (for instance) at the end of an async stream with flatMaps, multiple key processing, etc. should only be slightly slower, because the multiple requests are still processed asynchronously on the I/O thread. There’s still a price to pay, but chances are your application is not fully asynchronous, so it can be a good compromise.

If you are still designing the app, going fully reactive would reap the most benefits. You can have a look at the Reactive Manifesto: www.reactivemanifesto.org. It describes an approach for highly efficient and resilient applications, and RxJava fits well into it.

Main takeaway point though: never mix blocking behavior into an asynchronous processing stream (be it calls to the SDK’s blocking API or any other long-running call). This will lead to race conditions and unexpected errors.
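When you do need to hand a result back to synchronous code, do it once, at the very edge of the flow, for example with a latch as mentioned above. A stdlib-only sketch of that pattern (the spawned thread stands in for the subscribe() callback; class and method names are made up for illustration):

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class AsyncBridge {

    // Instead of reading a shared field right after subscribe() (a race),
    // wait on a latch that the callback counts down once the value is ready.
    static String fetch() {
        final String[] holder = new String[1];
        final CountDownLatch latch = new CountDownLatch(1);
        // stand-in for the async callback running on another thread
        new Thread(new Runnable() {
            public void run() {
                holder[0] = "loaded"; // would be the mapped result object
                latch.countDown();
            }
        }).start();
        try {
            // always bound the wait, just like toBlocking() plus timeout()
            if (!latch.await(1, TimeUnit.SECONDS)) {
                return null;
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
        return holder[0];
    }
}
```

The countDown() happens-before the await() returning, so the caller is guaranteed to see the value the callback wrote, which is exactly what reading a shared field right after subscribe() does not guarantee.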

I hope you enjoy coding with RxJava and the new SDK!
Simon


Thanks @simonbasle & @daschl ,
I tried several configurations; with your advice I managed to overcome some issues and get better performance.
I wanted to ask: what is an acceptable rate of operations per second in Couchbase, specifically gets per second and puts per second, using Couchbase 3.0 with the Java client 2.0.1?

I think I might have a bottleneck in my client, since I am using async requests and am only getting around 50 puts/s and 20 gets/s.

Just asking what is expected, since I found online that Couchbase (given the right conditions) is able to handle up to 350k requests/second (http://blog.bigstep.com/big-data-performance/nosql-performance-benchmarks-series-couchbase/).
My results are far from this (by too much), so there is obviously something wrong with my implementation or hardware.

Some info about my Couchbase cluster:
Couchbase servers:
Amazon instance type: m3.2xlarge (4 CPUs, 2048 MB RAM, 15 GB disk)
OS: CentOS 2.6.32-431.el6.x86_64

Environment:
3 Couchbase nodes (3 dual-core processors).
Disk Structure: Undetermined.
8 GB Total Ram.
Bucket Name: Serialized
Bucket Type: Couchbase
DocumentType: SerializableDocument
1024 MB per bucket per node
2 replicas in Serialized bucket

Client:
Java 7
Couchbase Java Client 2.0.1
Object type: Serialized Pojos.

Number of items to write: 1M
Number of items to read: 1M (same items in order, 1 single thread)

Read Threads:
1 Reading thread non blocking Async

Write Threads:
5 Writing threads non blocking Async (using a blocking queue)

Failover time: 120 seconds (automatic)
Rebalance Time: Off

Average Statistics:
Ops per second: 53
Writes per second: 17
Read per second: 20

Thanks!

@isanchez yes, your numbers seem “odd”. Thousands of ops/s are always possible. Can you please share your code so we can take a look and provide input?

Sorry @daschl,
I was on holiday. Thanks for all the help. Unfortunately, because of company policies I cannot upload any code made within the company, even test code (not a very good policy, I think).

Anyway, I finally found the bottleneck: it was I/O on the machine running the client.

My log configuration in Log4J was writing all events, and that was slowing down the whole process.
Now the results are in the order of thousands.
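For reference, the shape of that fix is to raise the log threshold so per-operation events never reach the disk-bound appenders. The poster used Log4J; this is an analogous, hedged sketch using the JDK's built-in java.util.logging instead (the class name is made up for illustration):

```java
import java.util.logging.Handler;
import java.util.logging.Level;
import java.util.logging.Logger;

public class QuietLogs {

    // Raise the root logger threshold so fine-grained per-operation
    // events are dropped before they ever hit a (disk-bound) handler.
    static String quiet() {
        Logger root = Logger.getLogger("");
        root.setLevel(Level.WARNING);
        for (Handler h : root.getHandlers()) {
            h.setLevel(Level.WARNING);
        }
        return root.getLevel().getName();
    }
}
```

In a Log4J properties file the equivalent would be setting the root logger to WARN instead of DEBUG/TRACE; the point is the same: don't pay disk I/O for every single operation you benchmark.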

Read Threads:
1 Reading thread non blocking Async

Write Threads:
100 Writing threads non blocking Async (using a blocking queue)

Failover time: 120 seconds (automatic)
Rebalance Time: Off

Average Statistics:
Ops per second: around 6K
Writes per second: 4520
Read per second: 1147

Thanks for all your help.

@isanchez glad you found it! Let us know if you need more help.

Hi Daschl,

I am running out of memory in an application , I am using operations as below :

Single get operation :

LegacyDocument document = bucket.get(key, LegacyDocument.class,1000,TimeUnit.MILLISECONDS);
if(document != null){
    value = (String) document.content();
}

Single Put Operation:

protected void storeToCouchbase(String key, int ttl,String value){
    LegacyDocument document = LegacyDocument.create(key, ttl, value);
    client.upsert(document);
}

Multi Get Operation :

protected Map<String, Object> asyncBulkGet(final Collection<String> ids, final Bucket bucket) {
    Map<String, Object> responseMap = null;
    try {
        responseMap = Observable
            .from(ids)
            .flatMap(new Func1<String, Observable<LegacyDocument>>() {
                @Override
                public Observable<LegacyDocument> call(String id) {
                    return bucket.async().get(id, LegacyDocument.class);
                }
            })
            .toMap(new Func1<LegacyDocument, String>() {
                @Override
                public String call(LegacyDocument jsonDoc) {
                    if (jsonDoc != null) {
                        return jsonDoc.id();
                    }
                    return null;
                }
            }, new Func1<LegacyDocument, Object>() {
                @Override
                public Object call(LegacyDocument jsonDoc) {
                    if (jsonDoc != null) {
                        return jsonDoc.content();
                    }
                    return null;
                }
            })
            .toBlocking()
            .single();
    } catch (Exception e) {
    }
    return responseMap;
}

I am using java-client 2.1.0 and Couchbase Server 3.0.2.

Can you please help me here and let me know what exactly I am doing wrong?

Thanks,
Bharat

@bkachhal I see nothing obviously wrong with what you’re doing…

Are you loading a very large number of documents in your bulk method? How large are the documents?

OutOfMemory errors and the like would need further analysis on your side, with tools like JVisualVM or Java Mission Control (shipped with the JDK; the latter only for Oracle JDKs) to see what kind of objects fill up the memory.

(Also, unless you’re seeing the exact same problem as the original poster, as diagnosed throughout the thread, please consider creating a new topic.)

Thanks Simon.

I realized it and created a new topic for this: Running Out of Memory in an application. I will surely take care of this in future posts.

Regarding the out-of-memory issue, we switched to Java SDK 2.1.2 with the buffer pool disabled and things started working fine.

Thanks,
Bharat

@bkachhal hmmm, that is actually not the best thing. I mean, if it works it’s okay, but it’s not how it should be. Can you raise an issue here: http://www.couchbase.com/issues/browse/JCBC? Maybe it’s a bug in LegacyDocument. Would it be possible for you to simulate with a JsonDocument and see if you have the same issues?

Hello, could you please explain why you use the transformation to BlockingObservable in each example, if according to the Javadoc it is not recommended at all?

@dmartyanov examples and snippets should usually be easily runnable. When you run such a snippet, e.g. in a main method, if you don’t block somehow then the asynchronous call happens in a background thread and method execution continues immediately… in effect terminating the program before anything could happen.

So that is not very interesting! From there you have two choices: either you sleep() or you block on your async call (toBlocking()). Sleeping is even less elegant, as it will potentially force the program to run for longer than it really needs (e.g. the async code executes in 1 ms but you sleep for 1000 ms).

Hence the use of toBlocking() in various examples: if you copy-paste that into a main() method and run it, something happens. :)

But yeah, to reap the most benefits from the asynchronous API in a “normal” long-running application, you should avoid toBlocking() and try to make as much of the application code reactive as possible.