Get replicated high seqnos

server
#1

getCurrentState in DcpConnection in core-io 1.3.0 returns highSeqNo for each partition which might not have been replicated or persisted.
What is the API to get all sequence numbers that have been persisted and replicated?

#2

The current implementation of getCurrentState does not allow filtering sequence numbers by partition state. We will implement this in a future version, but for now you can copy this method and parameterize it yourself. For example, it might look like this:

package sample;

import com.couchbase.client.core.ClusterFacade;
import com.couchbase.client.core.config.CouchbaseBucketConfig;
import com.couchbase.client.core.config.NodeInfo;
import com.couchbase.client.core.logging.CouchbaseLogger;
import com.couchbase.client.core.logging.CouchbaseLoggerFactory;
import com.couchbase.client.core.message.cluster.GetClusterConfigRequest;
import com.couchbase.client.core.message.cluster.GetClusterConfigResponse;
import com.couchbase.client.core.message.dcp.FailoverLogEntry;
import com.couchbase.client.core.message.dcp.GetFailoverLogRequest;
import com.couchbase.client.core.message.dcp.GetFailoverLogResponse;
import com.couchbase.client.core.message.kv.GetAllMutationTokensRequest;
import com.couchbase.client.core.message.kv.GetAllMutationTokensResponse;
import com.couchbase.client.core.message.kv.MutationToken;
import com.couchbase.client.java.Cluster;
import com.couchbase.client.java.CouchbaseCluster;
import com.couchbase.client.java.env.CouchbaseEnvironment;
import com.couchbase.client.java.env.DefaultCouchbaseEnvironment;
import rx.Observable;
import rx.functions.Action2;
import rx.functions.Func0;
import rx.functions.Func1;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Sample showing how to fetch the current sequence numbers of a bucket while choosing
 * which partition state ({@link GetAllMutationTokensRequest.PartitionState}) the nodes
 * should report on.
 */
public class Sample {
    private static final CouchbaseLogger LOGGER = CouchbaseLoggerFactory.getInstance(Sample.class);

    /**
     * Collects one {@link MutationToken} per vbucket of the given bucket.
     *
     * <p>The pipeline works in three steps:
     * <ol>
     *   <li>fetch the cluster configuration and ask every node of the bucket for its
     *       mutation tokens in the requested partition state;</li>
     *   <li>merge the answers into a map keyed by vbucket id, keeping the token with the
     *       higher sequence number when two nodes disagree;</li>
     *   <li>request the failover log of every collected vbucket and emit a new token that
     *       combines the vbucket UUID of the first failover log entry with the collected
     *       sequence number.</li>
     * </ol>
     *
     * @param core   the core facade used to send all requests
     * @param bucket the name of the bucket to inspect
     * @param state  the partition state passed to each {@link GetAllMutationTokensRequest}
     * @return an observable emitting one token per vbucket reported by the nodes
     */
    public static Observable<MutationToken> getCurrentState(final ClusterFacade core, final String bucket,
                                                            final GetAllMutationTokensRequest.PartitionState state) {
        // All three parameters are captured by the anonymous classes below, so they are declared
        // final; this keeps the sample compiling on language levels without "effectively final".
        return core
                .<GetClusterConfigResponse>send(new GetClusterConfigRequest())
                .flatMap(new Func1<GetClusterConfigResponse, Observable<NodeInfo>>() {
                    @Override
                    public Observable<NodeInfo> call(GetClusterConfigResponse response) {
                        CouchbaseBucketConfig cfg = (CouchbaseBucketConfig) response.config().bucketConfig(bucket);
                        return Observable.from(cfg.nodes());
                    }
                })
                .flatMap(new Func1<NodeInfo, Observable<GetAllMutationTokensResponse>>() {
                    @Override
                    public Observable<GetAllMutationTokensResponse> call(NodeInfo node) {
                        return core.send(new GetAllMutationTokensRequest(state, node.hostname(), bucket));
                    }
                })
                .collect(new Func0<Map<Integer, MutationToken>>() {
                    @Override
                    public Map<Integer, MutationToken> call() {
                        return new HashMap<Integer, MutationToken>(1024);
                    }
                }, new Action2<Map<Integer, MutationToken>, GetAllMutationTokensResponse>() {
                    @Override
                    public void call(Map<Integer, MutationToken> collector, GetAllMutationTokensResponse response) {
                        for (MutationToken token : response.mutationTokens()) {
                            int key = (int) token.vbucketID();
                            MutationToken prev = collector.get(key);
                            MutationToken current = token;
                            if (prev != null && prev.sequenceNumber() != token.sequenceNumber()) {
                                // Two nodes reported different numbers for the same vbucket:
                                // keep whichever token carries the higher sequence number.
                                if (current.sequenceNumber() < prev.sequenceNumber()) {
                                    current = prev;
                                }
                                LOGGER.debug("nodes are not agree on sequence number for vbucket {}: old={}, new={}, selected={}",
                                        token.vbucketID(), prev.sequenceNumber(), token.sequenceNumber(), current.sequenceNumber());
                            }
                            collector.put(key, current);
                        }
                    }
                })
                .flatMap(new Func1<Map<Integer, MutationToken>, Observable<MutationToken>>() {
                    @Override
                    public Observable<MutationToken> call(Map<Integer, MutationToken> sequenceNumbers) {
                        return Observable.from(sequenceNumbers.values());
                    }
                })
                .flatMap(new Func1<MutationToken, Observable<MutationToken>>() {
                    @Override
                    public Observable<MutationToken> call(final MutationToken token) {
                        return core.<GetFailoverLogResponse>send(new GetFailoverLogRequest((short) token.vbucketID(), bucket))
                                .map(new Func1<GetFailoverLogResponse, MutationToken>() {
                                    @Override
                                    public MutationToken call(GetFailoverLogResponse failoverLogsResponse) {
                                        // NOTE(review): the first failover log entry is presumably the most
                                        // recent one; an empty log makes get(0) throw - confirm against server.
                                        final FailoverLogEntry entry = failoverLogsResponse.failoverLog().get(0);
                                        return new MutationToken(
                                                failoverLogsResponse.partition(),
                                                entry.vbucketUUID(),
                                                token.sequenceNumber(),
                                                bucket);
                                    }
                                });
                    }
                });
    }

    /**
     * Connects to the cluster, prints the tokens reported for partitions in
     * {@code REPLICA} state, and disconnects.
     *
     * @param args unused
     * @throws InterruptedException kept for signature compatibility with the original sample
     */
    public static void main(String[] args) throws InterruptedException {
        final String connectionName = "mytest";
        final String bucketName = "travel-sample";

        DefaultCouchbaseEnvironment.Builder builder = DefaultCouchbaseEnvironment.builder();
        // at the moment core will pick connection name from the environment
        builder.dcpConnectionName(connectionName);
        builder.dcpEnabled(true);
        CouchbaseEnvironment env = builder.build();
        Cluster cluster = CouchbaseCluster.create(env, "10.112.150.101");
        try {
            ClusterFacade core = cluster.openBucket(bucketName).core();

            List<MutationToken> stateList =
                    getCurrentState(core, bucketName, GetAllMutationTokensRequest.PartitionState.REPLICA)
                            .toList().toBlocking().single();

            for (MutationToken mutationToken : stateList) {
                System.out.println(mutationToken);
            }
        } finally {
            // Disconnect even when opening the bucket or fetching the state fails.
            cluster.disconnect();
        }
    }
}

The code above pulls the sequence numbers from the replicas, so you can be sure that you are starting at replicated sequence numbers.