Reactive bulk fetching strategy

Lets say I have a bucket with 10M documents and I want to simply produce a Flux<MyEntity> from them, what would be the best way to do this?

The amount of documents is too large to fit into memory at once so I tried implementing “keyset pagination” using the expand operator like this:

final QueryOptions options = QueryOptions.queryOptions().parameters(JsonObject.create().put("class", MyEntity.class.getName()));

final Flux<TestEntity> flux = cluster.reactive().query("SELECT test.* FROM test " +
						"WHERE META().id IS NOT MISSING " +
						"AND _class = $class ORDER BY META().id ASC LIMIT 1000", options)
				.flatMap(result -> result.rowsAs(MyEntity.class).collectList())
				.filter(batch -> !batch.isEmpty())
				.expand(batch -> cluster.reactive().query("SELECT test.* FROM test " +
										"WHERE META().id > '" + Iterables.getLast(batch).getId() + "' " +
										"AND _class = $class ORDER BY META().id ASC LIMIT 1000", options)
								.flatMap(res -> res.rowsAs(TestEntity.class).collectList())
								.filter(b -> !b.isEmpty()))
				.flatMapIterable(l -> l);

It fetches batches of 1000 using N1QL, then using the expand operator fetches the next batch using the id of the last item in the last batch.

Now this seems to work fine, I’m just wondering if anyone has any better ideas?

There are some problems with this approach though:
If someone requests only 10 items from the flux with flux.limitRequest(10).collectList().block(), 1000 items would still be fetched.