Requesting an API to get the current / most recent StreamOffset of all vBuckets

hey folks,

  • How do we get the last or most recent StreamOffset of all vBuckets, for example from the client instance itself? Is there any way to do so?
  • The reason for this question: when starting the application for the first time, we will not have a StreamOffset for any vBucket until a mutation arrives, so we have to use the ZERO StreamOffset. Passing that ZERO StreamOffset to the resumeStreaming() API gives us every mutation that has ever happened on the vBucket, which I don't want (see the sketch after this list). Instead, I want an API that returns the current or most recent StreamOffset, so that on the first start of the application I can pass it to resumeStreaming() and avoid pulling all the historical mutations. It would be very helpful if we were able to get the current StreamOffsets of all vBuckets.
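
For reference, here is a minimal sketch of that first-start situation, assuming the Couchbase Java DCP client used in the answers below (the seed node, bucket, and credentials are placeholders): with no saved offsets, the stream has to be initialized from the very beginning, which replays every historical mutation.

public static void main(String[] args) {
  try (Client client = Client.builder()
      .credentials("Administrator", "password") // placeholder credentials
      .seedNodes("127.0.0.1")                   // placeholder node
      .bucket("travel-sample")                  // placeholder bucket
      .build()) {

    client.connect().block();

    // With no stored StreamOffset there is nothing to resume from, so the
    // session state starts at sequence number zero and every past mutation
    // on each vBucket is delivered again.
    client.initializeState(StreamFrom.BEGINNING, StreamTo.INFINITY).block();
    client.startStreaming().block();
  }
}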

Kindly provide us an API to do this; we are blocked on this and it would be very helpful.

Hi @Karthikeyan

A long time ago I used this sort of syntax on my client; the key seems to be how you initialize your DCP stream.

Just the current mutations, from the start to the end:
client.initializeState(StreamFrom.BEGINNING, StreamTo.NOW).block();

or from the end to infinity (as in never stop):
client.initializeState(StreamFrom.NOW, StreamTo.INFINITY ).block();

Below is some complete code that worked for me.

Note that @david.nault is our DCP guru; he might chime in with some more comments or better code.

package com.jonstrabala;

import com.couchbase.client.java.json.JsonObject;
import com.couchbase.client.dcp.Client;
import com.couchbase.client.dcp.ControlEventHandler;
import com.couchbase.client.dcp.DataEventHandler;
import com.couchbase.client.dcp.StreamFrom;
import com.couchbase.client.dcp.StreamTo;
import com.couchbase.client.dcp.SystemEventHandler;
import com.couchbase.client.dcp.config.DcpControl;
import com.couchbase.client.dcp.core.event.CouchbaseEvent;
import com.couchbase.client.dcp.message.DcpMutationMessage;
import com.couchbase.client.dcp.message.DcpSnapshotMarkerRequest;
import com.couchbase.client.dcp.transport.netty.ChannelFlowController;
import com.couchbase.client.dcp.deps.io.netty.buffer.ByteBuf;
//import com.couchbase.client.dcp.events.StreamEndEvent;

// import java.nio.charset.StandardCharsets;
import java.util.concurrent.atomic.AtomicLong;
import java.util.Date;

public class DocIdsFromDcpFromEnd {

	public static void main(String[] args) throws InterruptedException {
	    AtomicLong dcpDocCount = new AtomicLong(0);
	    AtomicLong dcpMarkerCount = new AtomicLong(0);

	    final Client client = Client.builder()
	            .seedNodes("192.168.3.150")
	            .bucket("test")
	            .scopeName("_default")  // for 7.0+ give the keyspace
	            .collectionsAware(true) // for 7.0+ enable keyspace mode
	            .credentials("myusername", "mypasswd")
	            .controlParam(DcpControl.Names.CONNECTION_BUFFER_SIZE, 16 * 10240) // connection buffer of 16 * 10240 bytes (~160 KB)
	            .bufferAckWatermark(60) // once 60% of the buffer is consumed, acknowledge back to the server
	            .build();
	    
	    client.controlEventHandler(new ControlEventHandler() {
	      @Override
	      public void onEvent(ChannelFlowController flowController, ByteBuf event) {
	        if (DcpSnapshotMarkerRequest.is(event)) {
	          dcpMarkerCount.incrementAndGet();
	          flowController.ack(event);
	        }
	        event.release();
	      }
	    });
		

		client.dataEventHandler(new DataEventHandler() {
			@Override
			public void onEvent(ChannelFlowController flowController, ByteBuf event) {
				long curCnt = dcpDocCount.incrementAndGet();
				if ( DcpMutationMessage.is(event)) {
					// this is a mutation here
					// int partition = DcpMutationMessage.partition(event);
					String key = DcpMutationMessage.keyString(event);
					// String content = DcpMutationMessage.content(event).toString(StandardCharsets.UTF_8);
					// JsonObject data = JsonObject.fromJson(content);
					System.out.printf("%s: %d markers, mutations %9d, key %-18s\n", new Date(), dcpMarkerCount.get(), curCnt, key);

				}
				// acknowledge every event so the server-side flow-control buffer is drained
				flowController.ack(event);
				event.release();
			}
		});
		
		client.systemEventHandler(new SystemEventHandler() {
			@Override
			public void onEvent(CouchbaseEvent event) {
				/*
		        if (event instanceof StreamEndEvent) {
		            StreamEndEvent streamEnd = (StreamEndEvent) event;
		            System.out.printf("Stream for partition %4d has ended (reason: %s)\n",streamEnd.partition(), streamEnd.reason());
		        }
		        */		
			}
		});

		client.connect().block();
	    // client.initializeState(StreamFrom.BEGINNING, StreamTo.NOW).block();
		client.initializeState(StreamFrom.NOW, StreamTo.INFINITY ).block();

	    client.startStreaming().block();
	    while (true) {
	    	try { Thread.sleep(10 * 1000); } catch (InterruptedException e1) {}
	    }
    }
}

Best

Jon Strabala
Principal Product Manager - Server

To read the current StreamOffset of every vBucket directly from the client, initialize the session state to "now", copy each partition's offset out of client.sessionState(), and pass that map to resumeStreaming():

// Snippet shown without its surrounding class; it also needs imports for
// java.util.Map, java.util.TreeMap, and the DCP client's StreamOffset type.
public static void main(String[] args) {
  try (Client client = Client.builder()
      .credentials("Administrator", "password")
      .seedNodes("127.0.0.1")
      .bucket("travel-sample")
      .build()) {

    client.connect().block();
    client.initializeState(StreamFrom.NOW, StreamTo.INFINITY).block();

    // Key is partition (vbucket) number, value is partition's offset
    Map<Integer, StreamOffset> currentOffsets = new TreeMap<>();
    client.sessionState().foreachPartition((partition, state) -> 
        currentOffsets.put(partition, state.getOffset())
    );

    currentOffsets.entrySet().forEach(System.out::println);

    client.resumeStreaming(currentOffsets).block();

    // etc...

  }
}
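
Not part of the original replies, but as a follow-up sketch: if the offsets also need to survive later restarts, the same sessionState()/foreachPartition() calls can be run periodically and the resulting map persisted, then fed back into resumeStreaming() on the next start. The scheduler and the persistOffsets() helper below are assumptions for illustration only, and client is the connected Client from the snippet above (needs the java.util.concurrent imports).

// Hypothetical follow-up: snapshot the current per-vBucket offsets every
// 10 seconds so they can be written somewhere durable (file, metadata
// bucket, etc.) and reused after a restart.
ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
scheduler.scheduleAtFixedRate(() -> {
    Map<Integer, StreamOffset> snapshot = new TreeMap<>();
    client.sessionState().foreachPartition((partition, state) ->
        snapshot.put(partition, state.getOffset()));
    persistOffsets(snapshot); // hypothetical helper: write the map to durable storage
}, 10, 10, TimeUnit.SECONDS);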

hey Jon and David,

It has solved the problem. Thank you so much.
A huge kudos to the entire Couchbase team, and to David for the constant support.