When sending a GetAllMutationTokensRequest, I expect to get a MutationToken for each of the 1024 partitions in return, but in some situations I only get 512 tokens. The code that fetches the MutationTokens looks as follows:
The main difference between the servers, besides the version numbers, is the number of nodes. The 3.1.3 server, which has two nodes, only returns 512 tokens in its response, while the 4.0.0 server, which has one node, returns 1024 tokens. Does the number of nodes play a role here, or is that just an unlucky coincidence?
In both cases, #nodes * #returned tokens = 1024.
What's the proper way to get all MutationTokens? I'm using Java SDK version 2.2.7, and I'm aware that these APIs are experimental.
@alexm your observation is correct: the message only gets directed to one node, so you get the response from only that node. Since each of the two nodes owns half of the partitions, you only get 512 back in the two-node case. On four nodes it would be 256.
So right now this only seems to work with a single node. Maybe @avsej can shed more light on alternatives.
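A minimal sketch of the arithmetic and of one possible workaround, assuming you could send the token request to every node and merge the answers by vBucket ID. Nothing in this thread confirms that the SDK lets you target a specific node with GetAllMutationTokensRequest, so treat that part as an assumption; `Token`, `tokensPerNode` and `merge` are hypothetical names, and the vBucket-to-node layout in `main` is simulated, not how Couchbase actually distributes partitions.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class TokenMerge {

    // Hypothetical stand-in for the SDK's MutationToken; not the real class.
    static final class Token {
        final int vbucketId;
        final long vbucketUuid;
        final long sequenceNumber;

        Token(int vbucketId, long vbucketUuid, long sequenceNumber) {
            this.vbucketId = vbucketId;
            this.vbucketUuid = vbucketUuid;
            this.sequenceNumber = sequenceNumber;
        }
    }

    // With an even spread, each node owns numPartitions / numNodes vBuckets,
    // which is why a single node answers with 512 tokens on a 2-node cluster.
    static int tokensPerNode(int numPartitions, int numNodes) {
        return numPartitions / numNodes;
    }

    // Merges the per-node answers into one token per vBucket ID.
    // If two nodes report the same vBucket, the later response wins.
    static Map<Integer, Token> merge(List<List<Token>> perNodeResponses) {
        Map<Integer, Token> merged = new TreeMap<>();
        for (List<Token> response : perNodeResponses) {
            for (Token token : response) {
                merged.put(token.vbucketId, token);
            }
        }
        return merged;
    }

    public static void main(String[] args) {
        int partitions = 1024;
        int nodes = 2;
        // Simulated responses: node n owns vBuckets n, n + nodes, n + 2 * nodes, ...
        List<List<Token>> responses = new ArrayList<>();
        for (int n = 0; n < nodes; n++) {
            List<Token> fromNode = new ArrayList<>();
            for (int vb = n; vb < partitions; vb += nodes) {
                fromNode.add(new Token(vb, 0L, 0L));
            }
            responses.add(fromNode);
        }
        System.out.println("tokens from one node: " + responses.get(0).size());
        System.out.println("tokens after merging: " + merge(responses).size());
    }
}
```

Running `main` prints 512 for a single node and 1024 after merging. If a real cluster reported replica vBuckets as well, "later response wins" would not be good enough and you would need a rule that keeps the active copy.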
For what reason do you need the tokens? If you want to use DCP, there are other APIs that I think handle this properly…
I want to:
1. Obtain a snapshot of the sequence numbers from each vBucket.
2. Use view queries to fetch a data set that represents the history or initial state of my data.
3. Start a DCP stream, using the previously obtained snapshot as a starting point, in order to capture upserts made to the data set.
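The three steps above can be sketched in plain Java, without any Couchbase classes, to show what "using the snapshot as a starting point" means per vBucket. This is a sketch under stated assumptions, not the SDK's behaviour: `SnapshotEntry`, `StreamParams` and `plan` are hypothetical names, and the choice of setting both snapshot bounds equal to the start sequence number, with an all-ones end sequence number for an open-ended stream, is an assumption about the DCP convention for resuming, not something taken from this thread.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class ResumePlan {

    // Hypothetical snapshot entry for one vBucket: its UUID and sequence number.
    static final class SnapshotEntry {
        final long vbucketUuid;
        final long seqno;

        SnapshotEntry(long vbucketUuid, long seqno) {
            this.vbucketUuid = vbucketUuid;
            this.seqno = seqno;
        }
    }

    // Hypothetical holder for the six values a DCP stream request needs.
    static final class StreamParams {
        final short vbucketId;
        final long vbucketUuid;
        final long startSeqno;
        final long endSeqno;
        final long snapshotStart;
        final long snapshotEnd;

        StreamParams(short vbucketId, long vbucketUuid, long startSeqno,
                     long endSeqno, long snapshotStart, long snapshotEnd) {
            this.vbucketId = vbucketId;
            this.vbucketUuid = vbucketUuid;
            this.startSeqno = startSeqno;
            this.endSeqno = endSeqno;
            this.snapshotStart = snapshotStart;
            this.snapshotEnd = snapshotEnd;
        }
    }

    // All 64 bits set: the largest unsigned sequence number, i.e. "no end".
    static final long OPEN_ENDED = 0xFFFFFFFFFFFFFFFFL;

    // Step 3: one open-ended stream per vBucket, resuming at the snapshot seqno.
    static List<StreamParams> plan(Map<Short, SnapshotEntry> snapshot) {
        List<StreamParams> requests = new ArrayList<>();
        for (Map.Entry<Short, SnapshotEntry> e : snapshot.entrySet()) {
            long seqno = e.getValue().seqno;
            // Assumption: snapshot start = snapshot end = start seqno when resuming.
            requests.add(new StreamParams(e.getKey(), e.getValue().vbucketUuid,
                    seqno, OPEN_ENDED, seqno, seqno));
        }
        return requests;
    }

    public static void main(String[] args) {
        // Step 1 (simulated): a snapshot of two vBuckets.
        Map<Short, SnapshotEntry> snapshot = new TreeMap<>();
        snapshot.put((short) 0, new SnapshotEntry(0xABCDL, 120L));
        snapshot.put((short) 1, new SnapshotEntry(0x1234L, 75L));
        // Step 2 (view queries) would run here, between snapshot and streaming.
        for (StreamParams p : plan(snapshot)) {
            System.out.println("vb " + p.vbucketId + " from seqno " + p.startSeqno);
        }
    }
}
```

Keeping the snapshot as a map keyed by vBucket ID makes it easy to check that every partition is covered before any stream is opened.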
Essentially I'm using Couchbase as a streaming service after obtaining an initial state. I'm monitoring a large set of documents that are updated frequently, and the "push behavior" offered by DCP suits my needs better than traditional view queries.
I used to rely on BucketStreamAggregator.getCurrentState() and BucketStreamAggregator.feed(), but they were removed in 2.2.6. Now there's DCPConnection.getCurrentState(), but it seems to have a different idea of what the current state is compared to BucketStreamAggregator.getCurrentState().
The implementation is different, but the idea is the same. It combines GetFailoverLogRequest with GetLastCheckpointRequest, because the latter does not return the vBucket UUID. The code below is basically an extract from DCPConnection.getCurrentState():
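The combination described above can be modelled as a join per vBucket: the UUID comes from the failover log, the sequence number from the last checkpoint. This plain-Java sketch does not call the SDK; `FailoverEntry`, `Token` and `currentState` are hypothetical names, and it assumes the failover log lists its newest entry first, so that entry's UUID is the vBucket's current UUID.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class CurrentStateSketch {

    // Hypothetical failover-log entry: a vBucket UUID and the seqno it started at.
    static final class FailoverEntry {
        final long vbucketUuid;
        final long seqno;

        FailoverEntry(long vbucketUuid, long seqno) {
            this.vbucketUuid = vbucketUuid;
            this.seqno = seqno;
        }
    }

    // Hypothetical token: the triple a stream needs to resume.
    static final class Token {
        final short vbucketId;
        final long vbucketUuid;
        final long seqno;

        Token(short vbucketId, long vbucketUuid, long seqno) {
            this.vbucketId = vbucketId;
            this.vbucketUuid = vbucketUuid;
            this.seqno = seqno;
        }
    }

    // Joins the two answers per vBucket: UUID from the failover log
    // (assumed newest entry first), seqno from the last checkpoint.
    static List<Token> currentState(Map<Short, List<FailoverEntry>> failoverLogs,
                                    Map<Short, Long> lastCheckpoints) {
        List<Token> tokens = new ArrayList<>();
        for (Map.Entry<Short, Long> cp : lastCheckpoints.entrySet()) {
            List<FailoverEntry> log = failoverLogs.get(cp.getKey());
            if (log == null || log.isEmpty()) {
                throw new IllegalStateException("no failover log for vBucket " + cp.getKey());
            }
            tokens.add(new Token(cp.getKey(), log.get(0).vbucketUuid, cp.getValue()));
        }
        return tokens;
    }

    public static void main(String[] args) {
        Map<Short, List<FailoverEntry>> logs = new TreeMap<>();
        logs.put((short) 5, Arrays.asList(new FailoverEntry(999L, 200L), new FailoverEntry(111L, 0L)));
        Map<Short, Long> checkpoints = new TreeMap<>();
        checkpoints.put((short) 5, 250L);
        for (Token t : currentState(logs, checkpoints)) {
            System.out.println("vb " + t.vbucketId + " uuid " + t.vbucketUuid + " seqno " + t.seqno);
        }
    }
}
```

Failing loudly when a vBucket has no failover log avoids silently producing a token with a missing UUID.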
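import java.util.List;
import java.util.concurrent.CountDownLatch;
import com.couchbase.client.core.message.kv.MutationToken;
import rx.Observable;

List<MutationToken> stateList = ... // as described in the previous post
// `end` (the end sequence number) and `dcpConnection` are defined elsewhere.
// One countdown per stream, so we can wait until every stream is open.
CountDownLatch latch = new CountDownLatch(stateList.size());
Observable.from(stateList)
    .flatMap(mutationToken -> dcpConnection.addStream(
        (short) mutationToken.vbucketID(),
        mutationToken.vbucketUUID(),
        mutationToken.sequenceNumber(), end,
        mutationToken.sequenceNumber(), end))
    .subscribe(response -> latch.countDown());
latch.await(); // blocks until all addStream calls have answered; throws InterruptedException
dcpConnection.subject()
    .subscribe(req -> handle(req));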
With this setup, shouldn't I only get changes that occurred between the calls to dcpConnection.addStream() and dcpConnection.subject().subscribe()? Instead, I see a huge number of MutationMessages, even though nothing has happened to the documents in the bucket between those calls. Is this the expected behavior? If so, is it possible to get notifications only for those documents that have changed between a certain point in time and now?
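One way to get "only what changed since a point in time" regardless of what the server replays is a client-side guard: keep the per-vBucket sequence numbers from the snapshot and drop any mutation whose sequence number is not newer. This is a sketch, not a fix. It does not explain why so many mutations arrive and does not reduce what is sent over the wire. `ChangeFilter` is a hypothetical name, and it assumes you can read the vBucket ID, sequence number and key from each mutation message before calling `accept`.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

public class ChangeFilter {

    // Per-vBucket seqno captured in the snapshot (hypothetical input).
    private final Map<Short, Long> snapshotSeqnos;
    // Keys of documents that changed after the snapshot, in arrival order.
    private final Set<String> changedKeys = new LinkedHashSet<>();

    public ChangeFilter(Map<Short, Long> snapshotSeqnos) {
        this.snapshotSeqnos = new HashMap<>(snapshotSeqnos);
    }

    // Returns true (and records the key) only if the mutation's seqno is newer
    // than the snapshot for its vBucket. Seqnos are compared as unsigned 64-bit.
    // Assumption: a vBucket missing from the snapshot counts as changed.
    public boolean accept(short vbucketId, long seqno, String key) {
        Long baseline = snapshotSeqnos.get(vbucketId);
        boolean newer = baseline == null || Long.compareUnsigned(seqno, baseline) > 0;
        if (newer) {
            changedKeys.add(key);
        }
        return newer;
    }

    public Set<String> changedKeys() {
        return Collections.unmodifiableSet(changedKeys);
    }

    public static void main(String[] args) {
        Map<Short, Long> snapshot = new HashMap<>();
        snapshot.put((short) 0, 100L);
        ChangeFilter filter = new ChangeFilter(snapshot);
        filter.accept((short) 0, 40L, "doc-old");  // at or before the snapshot: dropped
        filter.accept((short) 0, 101L, "doc-new"); // after the snapshot: kept
        System.out.println(filter.changedKeys()); // [doc-new]
    }
}
```

Collecting keys in a set also collapses repeated updates to the same document into a single entry.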