Streaming the N1QL results using Java SDK

Hi Couchbase team,

I have a long running N1QL query and it could read a large number of rows. How do I stream the results from Couchbase to the client without waiting for the entire result? Meaning as soon the application receives a chunk of response(i.e. a subset of rows) in the socket (TCP) layer I would like the Java SDK to read it and hands it over to the application.

Is this possible via Java SDK?

NOTE: I tried using Async but I couldn’t really make it work the way I described above. Any pointers or sample code would be helpful.

1 Like

Hello,

Is there a reason that paging doesn’t work, which is VERY popular with most developers? You can use LIMIT and OFFSET to achieve paging in SQL++ (N1QL):

LIMIT:

OFFSET:

Thanks
Aaron

1 Like

Hi Vishnu,

You could indeed use offset-based pagination as Aaron suggests. A caveat is that offset-based pagination is not stable. If the “full” result set changes while you’re issuing the queries for each page, a document that should be present in the results might get skipped, or a document might appear on more than one page, since a document’s offset could change in between queries. This might be fine, depending on your use case.

It might be worth looking into keyset-based pagination. It takes a bit more work than offset-based pagination, but it’s stable, and tends to perform better when the offset gets large.

If you’d like to stream the whole result set without pagination, the Reactive N1QL Query API is probably the best solution. (The Async API is not good for this, since it doesn’t support streaming the results.) With the Reactive API, you get the rows as a Flux, so you can process each row as it arrives from Couchbase.

Thanks,
David

1 Like

Thanks David and Biozal.

I’m trying out Reactive N1QL, but I think the program crashes while it calls the hookOnNext() function. It looks like it is not able to Decode the row value “000548c1-bbcc-4755-aefe-36374e724f8f”. My query is a simple query that fetches an array of Strings/Ids.

[
            "000548c1-bbcc-4755-aefe-36374e724f8f",
            "0010affa-ef30-4e6e-aea6-1fe8386585cb",
            "001acd9a-f674-47bc-be7c-c1e556ad85ef",
]

How do I convert a row into an Object? (So that I can add each rows in the batch into List<Object>).

LOGS:

reactor.core.Exceptions$ErrorCallbackNotImplemented: com.couchbase.client.core.error.DecodingFailureException: Deserialization of content into target class com.couchbase.client.java.json.JsonObject failed; encoded = "000548c1-bbcc-4755-aefe-36374e724f8f"
	Suppressed: com.couchbase.client.core.error.DecodingFailureException: Deserialization of content into target class com.couchbase.client.java.json.JsonObject failed; encoded = "000548c1-bbcc-4755-aefe-36374e724f8f"
Caused by: com.couchbase.client.core.error.DecodingFailureException: Deserialization of content into target class com.couchbase.client.java.json.JsonObject failed; encoded = "000548c1-bbcc-4755-aefe-36374e724f8f"
	at com.couchbase.client.java.codec.JacksonJsonSerializer.deserialize(JacksonJsonSerializer.java:142) ~[java-client-3.2.6.jar!/:na]
	at com.couchbase.client.java.query.ReactiveQueryResult.lambda$rowsAs$0(ReactiveQueryResult.java:67) ~[java-client-3.2.6.jar!/:na]
	at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:106) ~[reactor-core-3.4.16.jar!/:3.4.16]
	at reactor.core.publisher.EmitterProcessor.drain(EmitterProcessor.java:491) ~[reactor-core-3.4.16.jar!/:3.4.16]
	at reactor.core.publisher.EmitterProcessor.subscribe(EmitterProcessor.java:209) ~[reactor-core-3.4.16.jar!/:3.4.16]
	at reactor.core.publisher.Flux.subscribe(Flux.java:8469) ~[reactor-core-3.4.16.jar!/:3.4.16]
:
:
Caused by: java.io.IOException: Expected START_OBJECT but got VALUE_STRING
	at com.couchbase.client.java.json.JsonValueModule.expectCurrentToken(JsonValueModule.java:141) ~[java-client-3.2.6.jar!/:na]
	at com.couchbase.client.java.json.JsonValueModule.access$000(JsonValueModule.java:37) ~[java-client-3.2.6.jar!/:na]
	at com.couchbase.client.java.json.JsonValueModule$AbstractJsonValueDeserializer.decodeObject(JsonValueModule.java:71) ~[java-client-3.2.6.jar!/:na]
	at com.couchbase.client.java.json.JsonValueModule$JsonObjectDeserializer.deserialize(JsonValueModule.java:135) ~[java-client-3.2.6.jar!/:na]
	at com.couchbase.client.java.json.JsonValueModule$JsonObjectDeserializer.deserialize(JsonValueModule.java:132) ~[java-client-3.2.6.jar!/:na]
	at com.fasterxml.jackson.databind.deser.DefaultDeserializationContext.readRootValue(DefaultDeserializationContext.java:322) ~[jackson-databind-2.13.2.2.jar!/:2.13.2.2]
	at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:4674) ~[jackson-databind-2.13.2.2.jar!/:2.13.2.2]
	at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3690) ~[jackson-databind-2.13.2.2.jar!/:2.13.2.2]

CODE:

Mono<ReactiveQueryResult> queryResult = cluster.reactive().query(query, queryOptions); 
queryResult.flatMapMany(ReactiveQueryResult::rowsAsObject).subscribe(new BaseSubscriber<JsonObject>() {
         
                // Number of outstanding requests
                final AtomicInteger oustanding = new AtomicInteger(0);

                @Override
                protected void hookOnSubscribe(Subscription subscription) {
                  LOGGER.info(" Subscription Done ");
                  request(10); // Batch size
                  oustanding.set(10);
                }

                @Override
                protected void hookOnNext(JsonObject value) {
                   
                  //LOGGER.info("hookOnNext Row value={}", value);
                  processResult(result);
                  if (oustanding.decrementAndGet() == 0) {
                    request(10);
                  }
                }

              });

It would be great if you could help me resolve this!

Thanks,
Vishnu

Hi Vishnu,

Since your result rows are Strings (not JSON Object nodes), this is the part to change:

Try replacing it with:

result -> result.rowsAs(String.class)

And then change the signature of the hookOnNext method to:

protected void hookOnNext(String value) {

Thanks,
David

Thanks David. That works!

2 Likes