Batch Reading of JsonDocuments: BackpressureException

query
java

#1

Hello,

I am trying to upgrade existing Java SDK 1.4 code to Java SDK 2.3. I read a particular view with its documents, do some operations on each row, and then proceed to the next row. I get a BackpressureException, and after some research I found out that an asynchronous operation would be a better fit. I am pretty new to RxJava, hence I wanted some guidance on that aspect. My code currently does the following steps:

ViewResult result = client.query(ViewQuery
                				.from(DESIGN_DOC, SORT_VIEW)
                				.stale(Stale.TRUE)
                				.includeDocs(true));
int index = 0;
Iterator<ViewRow> rowIterator = result.rows();
while(rowIterator.hasNext())
{
    ViewRow viewRow = rowIterator.next();
    Abcd a = gson.fromJson(viewRow.document().content().toString(), Abcd.class);
    AbcdListing aListing = new AbcdListing(a);
    aListing.set_id(viewRow.id().substring(5).toUpperCase());
    Float x1 = new Float(aListing.getLon());
    Float y1 = new Float(aListing.getLat());
    Rectangle r = new Rectangle(x1,y1,x1,y1);
    spatialIndexTree.add(r, index);
    mapSpatialIndexToAListing.put(index, aListing);
    index++;
}

The “spatialIndexTree” is a SpatialIndex built on top of an RTree ( http://jsi.sourceforge.net/ ), and mapSpatialIndexToAListing is a Map<Integer, AbcdListing>.

I tried changing it to an async model to steer away from the BackpressureException and arrived at the following implementation, but I am unable to manage the “index” value from the code above. Any help or comments would be great.

final Gson gson = new Gson();
final SpatialIndex spatialIndexTree = new RTree();
spatialIndexTree.init(null);
bucket1
    .async()
    .query(ViewQuery
        .from(DESIGN_DOC, SORT_VIEW)
        .stale(Stale.TRUE)
        .includeDocs(true))
    // unwrap the view result into a stream of rows
    .flatMap(new Func1<AsyncViewResult, Observable<AsyncViewRow>>() {
        @Override
        public Observable<AsyncViewRow> call(AsyncViewResult viewResult) {
            return viewResult.rows();
        }
    })
    // fetch the document behind each row
    .flatMap(new Func1<AsyncViewRow, Observable<JsonDocument>>() {
        @Override
        public Observable<JsonDocument> call(AsyncViewRow viewRow) {
            return viewRow.document();
        }
    })
    .toBlocking()
    .subscribe(new Action1<JsonDocument>() {
        @Override
        public void call(JsonDocument document) {
            Abcd abcd = gson.fromJson(document.content().toString(), Abcd.class);
            AbcdListing aListing = new AbcdListing(abcd);
            // viewRow is out of scope here, so take the id from the document
            aListing.set_id(document.id().substring(5).toUpperCase());
            Float x1 = new Float(aListing.getLon());
            Float y1 = new Float(aListing.getLat());
            Rectangle r = new Rectangle(x1, y1, x1, y1);
            // "index" is the open question: it is never declared or incremented here
            spatialIndexTree.add(r, index);
            mapSpatialIndexToAListing.put(index, aListing);
        }
    });

This code uses the variable “index”, but I do not have a way to increment it. Please comment on whether this rendition of the above code looks right, and on what I can do about the “index” variable.


#2

hey @anuja.khemka

A little bit of background first :wink:

BackpressureException is something that could technically happen when using both the synchronous and asynchronous APIs (as the former is built on top of the latter). But using the async API brings you more tools to deal with it.

It is essentially a signal that many operations were queued on the SDK, and the server side cannot keep up processing and dequeuing as quickly as you enqueue new operations. Sometimes, it is just because the user sends the SDK a huge amount of work in a very short amount of time (like processing a batch of 6K elements at once…).

About toBlocking() now. The aim of this operator is to switch back from an asynchronous world to a synchronous one. To get the most benefit from asynchronous patterns, you must make an effort to stay asynchronous from start to finish.
Unfortunately that is more easily said than done sometimes, like when you are working on existing code that is mostly synchronous.

So toBlocking can be conceptualized like starting a CountDownLatch that will be counted down when the asynchronous Observable upstream finishes (onCompleted) or fails (onError).
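
To make that mental model concrete, here is a rough plain-JDK sketch of the latch idea. It is only an analogy and not how RxJava implements toBlocking(); the class BlockingLatchSketch, the method collectBlocking and the 10 second timeout are made up for illustration.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Plain-JDK analogy for toBlocking(): the caller parks on a latch until the
// asynchronous producer signals that it has completed or failed.
// All names here are hypothetical and not part of RxJava or the Couchbase SDK.
final class BlockingLatchSketch {

    // Runs a simulated async producer and blocks until it has finished.
    static List<Integer> collectBlocking(int count) {
        final List<Integer> received = new CopyOnWriteArrayList<>();
        final CountDownLatch done = new CountDownLatch(1);
        ExecutorService worker = Executors.newSingleThreadExecutor();
        worker.submit(() -> {
            try {
                for (int i = 0; i < count; i++) {
                    received.add(i);      // plays the role of onNext
                }
            } finally {
                done.countDown();         // plays the role of onCompleted / onError
            }
        });
        try {
            // The calling thread waits here, like the thread that called toBlocking().
            if (!done.await(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("producer did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            worker.shutdown();
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(collectBlocking(5).size());
    }
}
```

The point of the sketch is that the work still happens on another thread; only the caller gives up its asynchrony by waiting on the latch.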

The async version
I think your async code makes sense, but I have a question: does the order of the documents matter? In an asynchronous world, retrieving the documents can be done “out of order”. Imagine the server has a small hiccup and takes 2ms more to retrieve the 3rd document… It would come in after the fourth and maybe fifth in the final sequence.
So if that matters, there are additional steps to prevent this re-ordering. If not, even simpler :slight_smile:

Regarding your problem with index, that is one tricky part of reactive programming: it is harder (and generally discouraged if possible) to maintain a “global” state between transformations (short of using Tuples).

From my perspective, you have two choices:

  • use an AtomicLong counter for the index, out of the chain. incrementAndGet() in your subscribe to get the next index value. Straight to the point but less elegant / rx-like.

  • chain a zipWith() just before toBlocking, associated with an Observable.range(). Range will generate a sequence of numbers, while zipWith will let you combine values from upstream with these sequence numbers. You’ll have to write a “zip function” that takes a JsonDocument plus an Integer and outputs a Tuple2 (Tuple.create(theDoc, theInt)). In your subscribe, you now get the Tuple2 which you can use for index and value.
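
As a rough illustration of the first option, here is a plain-JDK sketch of an atomic counter handing out indexes. It uses AtomicInteger rather than AtomicLong because the index in the original code is an int, and getAndIncrement() so the first index is 0 like the original loop; both choices are assumptions on my side. The thread pool only simulates callbacks arriving from several threads, and AtomicIndexSketch, indexAll and the "listing-N" strings are hypothetical stand-ins.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Sketch of the "counter outside the chain" idea using only the JDK.
// Names are hypothetical; no RxJava or Couchbase types are involved.
final class AtomicIndexSketch {

    // Assigns a unique, zero-based index to each item, even when the
    // callback is invoked from several threads.
    static Map<Integer, String> indexAll(int itemCount) {
        final AtomicInteger index = new AtomicInteger(0);
        final Map<Integer, String> byIndex = new ConcurrentHashMap<>();
        ExecutorService pool = Executors.newFixedThreadPool(4);
        for (int i = 0; i < itemCount; i++) {
            final String listing = "listing-" + i;   // stand-in for an AbcdListing
            pool.submit(() -> {
                // Each call returns a distinct value: 0, 1, 2, ...
                int slot = index.getAndIncrement();
                byIndex.put(slot, listing);
            });
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("indexing did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return byIndex;
    }

    public static void main(String[] args) {
        System.out.println(indexAll(1000).size());
    }
}
```

Which listing ends up under which index depends on scheduling, which matches the "out of order" caveat above: the counter guarantees unique indexes, not a particular order.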

If you need ordering guarantees
Good news is, there is a dedicated method for that in the SDK: includeDocsOrdered instead of includeDocs.
Behind the scenes, when executing the view query it goes fetching the associated documents using a concatMapEager instead of flatMap, which will keep the original order of the keys.

Side note
Note that processing the values and adding them to your two collections inside of subscribe is guaranteed by RxJava to be executed from a single thread at a time (serialization contract). So that’s probably the right move to do your indexing there, unless these collections offer thread-safety guarantees :wink:

The transformation of JsonDocument to a Rectangle and AbcdListing could be done in a map step, but you’d need a Tuple3 there (index, rectangle and listing), so not sure it brings more readability.


#3

@simonbasle Thank you so much for the reply. I do not need ordering, and I will use an AtomicInteger for the index value (I got it working by declaring int index as a static variable of the class and incrementing it; that works well for me, but I can change it to AtomicInteger on your suggestion). I am sure the Map collection can be made thread-safe by using ConcurrentHashMap, but I am not sure about SpatialIndex, so I will leave that processing for the more synchronous subscribe step. :slight_smile:

I did some trial and error and wrote the code below. What I am unable to understand is why I need retryWhen even though my code is asynchronous. Secondly, if I pass only an Action1 (call) to subscribe, the code throws an exception saying that onError is not implemented (although implementing it is not mandatory).

Note that this code runs fine WITHOUT the retryWhen and onError/onCompleted if I run it within my IDE with Couchbase deployed locally. However, as soon as I deploy the project in a Docker container (on my local machine) and use the same Couchbase instance, it fails.

index = 0;
logger.info("Prefix length: {}", prefixLength);
client
    .async()
    .query(ViewQuery
        .from(HOTEL_SORT_DESIGN_DOC_DEFAULT_NAME, HOTEL_SORT_VIEW_DEFAULT_NAME)
        .stale(Stale.TRUE)
        .includeDocs(true))
    // retry the query up to 10 times when a BackpressureException is raised
    .retryWhen(RetryBuilder
        .anyOf(BackpressureException.class)
        .delay(Delay.exponential(TimeUnit.SECONDS, 1))
        .max(10)
        .build())
    .flatMap(new Func1<AsyncViewResult, Observable<AsyncViewRow>>() {
        @Override
        public Observable<AsyncViewRow> call(AsyncViewResult viewResult) {
            viewRowsCount = viewResult.totalRows();
            return viewResult.rows();
        }
    })
    .flatMap(new Func1<AsyncViewRow, Observable<JsonDocument>>() {
        @Override
        public Observable<JsonDocument> call(AsyncViewRow viewRow) {
            return viewRow.document();
        }
    })
    .toBlocking()
    .subscribe(new Subscriber<JsonDocument>() {
        @Override
        public void onNext(JsonDocument document) {
            Hotel hotel = gson.fromJson(document.content().toString(), Hotel.class);
            HotelListing hotelListing = new HotelListing(hotel);
            hotelListing.setNorthstar_id(document.id().substring(prefixLength).toUpperCase());
            Float x1 = new Float(hotelListing.getLon());
            Float y1 = new Float(hotelListing.getLat());
            Rectangle r = new Rectangle(x1, y1, x1, y1);
            spatialIndexTree.add(r, index);
            mapSpatialIndexToHotelListing.put(index++, hotelListing);
        }
        @Override
        public void onCompleted() {
            logger.info("Final Index Value: {}", index);
        }
        @Override
        public void onError(Throwable arg0) {
            // second placeholder added so the message is actually logged
            logger.info("Index: {} Whoops: {}", index, arg0.getMessage());
        }
    });

Please see if this looks ok to you as it works for me however I do want to know if I am doing something wrong :slight_smile:


#4

From the beginning I’m assuming the view returns a large number of documents, correct? :books:

About retryWhen: as I said, switching to async does not automatically eliminate the BackpressureException; on the contrary. But the async API gives you this kind of tool to retry…
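
To make the retry idea concrete, here is a rough plain-JDK sketch of retrying with exponentially growing delays. It does not use the SDK's RetryBuilder and is not meant to mirror its exact behaviour; OverloadedException, retryWithBackoff and the millisecond delays are made-up names and values standing in for BackpressureException and a real retry policy.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Plain-JDK sketch of "retry on overload with exponential delay".
// All names are hypothetical; this is not the SDK's RetryBuilder.
final class BackoffRetrySketch {

    // Hypothetical stand-in for the SDK's BackpressureException.
    static final class OverloadedException extends RuntimeException {
        OverloadedException(String message) {
            super(message);
        }
    }

    // Calls the task; on OverloadedException waits baseDelayMillis, then
    // twice that, and so on, for at most maxRetries retries.
    static <T> T retryWithBackoff(Supplier<T> task, int maxRetries, long baseDelayMillis) {
        long delay = baseDelayMillis;
        for (int attempt = 0; ; attempt++) {
            try {
                return task.get();
            } catch (OverloadedException e) {
                if (attempt >= maxRetries) {
                    throw e;              // retries exhausted: surface the failure
                }
                sleep(delay);
                delay *= 2;               // exponential growth of the wait
            }
        }
    }

    private static void sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        AtomicInteger calls = new AtomicInteger();
        // Simulated operation that is rejected twice before succeeding.
        String result = retryWithBackoff(() -> {
            if (calls.incrementAndGet() < 3) {
                throw new OverloadedException("queue full");
            }
            return "ok";
        }, 10, 1);
        System.out.println(result + " after " + calls.get() + " calls");
    }
}
```

Retrying only treats the symptom: the operations are still enqueued faster than they can be processed, they are just re-submitted later.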

Note that includeDocs(true) actually brings real benefits only in the sync API, so here’s another suggestion that you could try to get rid of BackpressureException :bulb::

  • remove the includeDocs(true) and the retryWhen block
  • in the second flatMap, add an int parameter after the Func1, e.g. .flatMap(new Func1(...) { ... }, 10).

This will instruct the flatMap to never trigger more than 10 document fetches in parallel. This should get rid of BackpressureException and give you nice performance. You can fine tune the parallelism hint with trial and error.
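
For intuition about what such a limit does, here is a rough plain-JDK sketch that caps the number of simulated fetches in flight with a Semaphore. This is an analogy only and not how flatMap is implemented; BoundedFetchSketch, runBounded, the pool size of 32 and the 1 ms sleep are all made up for illustration.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Plain-JDK analogy for a concurrency limit on document fetches.
// All names are hypothetical; no RxJava or Couchbase types are involved.
final class BoundedFetchSketch {

    // Runs taskCount simulated fetches with at most maxConcurrent in flight
    // and returns the highest number that was observed in flight at once.
    static int runBounded(int taskCount, int maxConcurrent) {
        final Semaphore permits = new Semaphore(maxConcurrent);
        final AtomicInteger inFlight = new AtomicInteger();
        final AtomicInteger peak = new AtomicInteger();
        ExecutorService pool = Executors.newFixedThreadPool(32);
        try {
            for (int i = 0; i < taskCount; i++) {
                permits.acquire();                     // wait for a free slot before submitting
                pool.submit(() -> {
                    try {
                        int now = inFlight.incrementAndGet();
                        peak.accumulateAndGet(now, Math::max);
                        Thread.sleep(1);               // simulated document fetch
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    } finally {
                        inFlight.decrementAndGet();
                        permits.release();             // free the slot for the next fetch
                    }
                });
            }
            pool.shutdown();
            if (!pool.awaitTermination(30, TimeUnit.SECONDS)) {
                throw new IllegalStateException("fetches did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdownNow();                        // no effect if already terminated
        }
        return peak.get();
    }

    public static void main(String[] args) {
        int peak = runBounded(200, 10);
        System.out.println("limit respected: " + (peak <= 10));
    }
}
```

The producer is slowed down to the pace of the consumer instead of piling up work, which is the property that keeps the request queue from overflowing.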

about subscribe :bell:
subscribe has quite a number of variants (both in the “normal” Observable and in the BlockingObservable flavour that you use).

Some take an Observer/Subscriber, which forces you to implement onNext(), onError() and onCompleted().

The other variants take combinations of Action as parameter(s). They represent handlers that will cover the corresponding methods from Observer above. So if you only provide one Action1, it’ll only cover onNext(). Two and you’re covering onError() too. Three and you’re covering the whole range, onCompleted() included.


#5

Yes. The view returns close to 200,000 documents. How will I get the documents without using includeDocs(true)? I use the documents to create the collections later.


#6

When you call viewRow.document(...) it chains in a KV fetch for that document, unless it has been eagerly loaded and cached (which is what includeDocs(true) does).

But here, eagerly loading all these docs doesn’t work because the view returns too many of them and you can’t apply any form of control (parallelism hint, etc…) to the eager loading.

So simply removing the eager loading by removing includeDocs(true) and then applying some backpressure control to the flatMap that calls viewRow.document() should help with your situation :wink:


#7

Thanks a lot. This works :slight_smile: