Setting a timeout for an AsyncBucket query

Is it possible to set a timeout for an entire AsyncBucket query to complete?

Calling timeout(…) on the Observable returns another Observable with a timeout for each emitted item, not for the complete query.

I’m not sure I follow. The recommended pattern is indeed asyncBucket.query().timeout().subscribe(), but if that doesn’t work for your use case, can you show us the code you are using and let us know more about the use case?

From the docs:

Returns an Observable that mirrors the source Observable but applies a timeout policy for each emitted item. If the next item isn’t emitted within the specified timeout duration starting from its predecessor, the resulting Observable terminates and notifies observers of a TimeoutException.

I want to set a time limit for all items, not just for one. If all items are not returned within the specified time, I want to get an exception.
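
To make the difference concrete, here is a minimal plain-Java sketch of the two policies. It makes no RxJava or Couchbase calls; `TimeoutSemantics` and both method names are hypothetical, and the model is simplified: it only looks at the gaps between consecutive rows (the first gap measured from subscription) and ignores the completion signal.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration only; not part of RxJava or the Couchbase SDK.
final class TimeoutSemantics {

    // Per-item policy: fires if any single gap between rows exceeds the limit.
    static boolean perItemTimeoutFires(List<Long> gapsMillis, long limitMillis) {
        for (long gap : gapsMillis) {
            if (gap > limitMillis) {
                return true;
            }
        }
        return false;
    }

    // Whole-query policy: fires if the total time for all rows exceeds the limit.
    static boolean wholeQueryTimeoutFires(List<Long> gapsMillis, long limitMillis) {
        long total = 0;
        for (long gap : gapsMillis) {
            total += gap;
        }
        return total > limitMillis;
    }

    public static void main(String[] args) {
        // Three rows arriving 400 ms apart, checked against a 1000 ms limit.
        List<Long> gaps = Arrays.asList(400L, 400L, 400L);
        System.out.println(perItemTimeoutFires(gaps, 1000));    // false
        System.out.println(wholeQueryTimeoutFires(gaps, 1000)); // true
    }
}
```

With three rows arriving 400 ms apart and a 1000 ms limit, the per-item policy never fires, while a limit on the whole query does. That second behaviour is the one I am after.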

@yevsht in this case you need to change the timeout chaining a bit. Since I assume you are interested in all rows coming back and not just the result, it looks more like this with RxJava:

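    bucket.async()
        .query(N1qlQuery.simple("select ..."))
        // unwrap the result into its stream of rows
        .flatMap(result -> result.rows())
        // last() emits only once the row stream has completed
        .last()
        // so this limit covers the whole query, not each row
        .timeout(1, TimeUnit.MINUTES)
        .subscribe();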

So you are extracting all the rows and then waiting for the last one to come back. Because last() emits nothing until the row stream completes, the timeout applied after it covers the whole query. This is a simple example; normally you’d do your computations between the flatMap and last (or use a different aggregation) and then apply the timeout at the very end of your computation chain.

Probably I am doing something wrong, but last() emits only the last item:

    final Subscriber<AsyncN1qlQueryRow> queryRowSub = new Subscriber<AsyncN1qlQueryRow>() {
        @Override
        public void onNext(AsyncN1qlQueryRow asyncN1qlQueryRow) {
            System.out.println("putting " + asyncN1qlQueryRow.value());
        }

        @Override
        public void onCompleted() {
            System.out.println("-----------DONE PUTTING--------------");
        }

        @Override
        public void onError(Throwable e) {
            System.out.println("Error " + e);
        }
    };

    bucket.async()
        .query(qq)
        .flatMap(new Func1<AsyncN1qlQueryResult, Observable<AsyncN1qlQueryRow>>() {
            @Override
            public Observable<AsyncN1qlQueryRow> call(AsyncN1qlQueryResult result) {
                return result.rows();
            }
        })
        .last()
        .timeout(1, TimeUnit.MINUTES)
        .subscribe(queryRowSub);
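
That matches how last() is defined: it forwards only the final item once the source completes. If every row is needed under a single deadline, the rows have to be gathered into one value before the timeout is applied. In RxJava 1 the toList() operator emits a single list on completion, so it may fit in place of last(), although that is untested against this exact chain. The idea itself can be sketched with plain java.util.concurrent instead of RxJava. `CollectWithDeadline`, `fetchAllRows` and `allRowsWithin` are hypothetical names, and the sleeping loop only stands in for a real query.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical illustration only; not the Couchbase or RxJava API.
final class CollectWithDeadline {

    // Stand-in for a query: produces rowCount rows, one every delayMillis.
    static CompletableFuture<List<String>> fetchAllRows(int rowCount, long delayMillis) {
        return CompletableFuture.supplyAsync(() -> {
            List<String> rows = new ArrayList<>();
            for (int i = 0; i < rowCount; i++) {
                try {
                    Thread.sleep(delayMillis);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException(e);
                }
                rows.add("row-" + i);
            }
            return rows;
        });
    }

    // Returns every row, or throws TimeoutException if the complete
    // list is not available within timeoutMillis.
    static List<String> allRowsWithin(int rowCount, long delayMillis, long timeoutMillis)
            throws TimeoutException {
        try {
            return fetchAllRows(rowCount, delayMillis).get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        }
    }

    public static void main(String[] args) {
        try {
            // Generous deadline: the full list of rows is printed.
            System.out.println(allRowsWithin(3, 10, 2000));
            // Tight deadline: no partial list is returned.
            System.out.println(allRowsWithin(3, 200, 100));
        } catch (TimeoutException e) {
            System.out.println("Timed out before all rows arrived");
        }
    }
}
```

The design point is that the deadline is attached to the collected result rather than to each row, so the caller sees either all rows or an exception, never a partial set.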