Eventing timer function not working after the same doc is re-created

Hi

We are using Eventing's timer feature to schedule tasks.
The function works the first time a document is created: it creates a timer, and when the timer fires its callback deletes the doc.

However, when we later create a new document with the same meta.id, the function does not create a timer.

for example

1: doc created with meta.id = x
2: function runs and timer is created
3: timer fires and deletes the doc

— and some time after —

4: doc created again with meta.id = x, and this time the function does not create a timer for the same meta.id

Couchbase Server version: Enterprise Edition 6.5.1 build 6299

and the function looks like this:

function OnUpdate(doc, meta) {
    var context = {"docId": meta.id};

    var startTriggerDate = new Date(doc.startDate);
    startTriggerDate.setSeconds(startTriggerDate.getSeconds() + 60);
    var startTimerId = meta.id + ":scheduleStartTimer";

    createTimer(scheduleStartTimer, startTriggerDate, startTimerId, context);

    var endTriggerDate = new Date(doc.endDate);
    endTriggerDate.setSeconds(endTriggerDate.getSeconds() + 60);
    var endTimerId = meta.id + ":scheduleEndTimer";

    createTimer(scheduleEndTimer, endTriggerDate, endTimerId, context);
}

function scheduleStartTimer(context) {
    var doc = sourcebucket[context.docId];

    if (doc) {
        var now = new Date();
        var startDate = new Date(doc.startDate);

        var request = {};

        var response = curl('POST', apiUrl, request);
    }
}

function scheduleEndTimer(context) {
    var doc = sourcebucket[context.docId];

    if (doc) {
        var now = new Date();
        var endDate = new Date(doc.endDate);

        var request = {};
        var response = curl('POST', apiUrl, request);
    }
}

This looks similar to this issue:
https://issues.couchbase.com/browse/JCBC-1587

Hi @Rohat_Sahin – I’ll give this a try and respond on what may be the issue. Can you please let me know what version of Couchbase you are using? Thanks.

Also @Rohat_Sahin - if you create multiple timers with the same id, the new one will cancel the old one. It’ll be helpful if you can add log(…) messages in OnUpdate, scheduleStartTimer, scheduleEndTimer and at each call to createTimer to log all relevant parameters (especially meta.id, timerId and the timer date) and post them here, so I can trace the sequence of events. It’s also a good idea to wrap the functions in a try { … } catch (err) { log(err); } block, so if doc.endDate is not valid, the exception thrown will be logged to the Eventing Application Logs.

Thanks.
We are using Couchbase Server Enterprise Edition 6.5.1 build 6299, and I have added logs to the timer functions.
I also check whether the doc exists before the timer logic executes, like this:

var doc = sourcebucket[context.docId];
if (doc) {}

Hi Rohat

It seems that you are writing a scheduling system based on timer “names”; you might want to visit https://blog.couchbase.com/implementing-a-robust-portable-cron-like-scheduler-via-couchbase-eventing-part-1/ to see a complex example of making recurring schedules.

Back to your code

  1. I refactored your code and applied what Siri asked you to do, i.e. adding try/catch blocks
  2. I commented out the timerId’s and just utilize null (let the system deal with it)
  3. I added a “quicktest” flag to short circuit your logic just to get faster test runs through the system.
  4. I added a sanity check if doc.startDate >= doc.endDate
  5. I replaced your curl calls with a simple log message

Because of 2) above you can delete a document and re-create it (or mutate it twice) and the callbacks will still fire - it all depends on your goal: a) always generate the callbacks, or b) ignore a callback if it is not the correct one.

I am curious why you have a document with a startDate and an endDate but add 60 seconds to each; as I am not privy to your thinking, I would have assumed you would just put in the correct dates for the timers you wanted to schedule.

Keep in mind timers are not wall clock accurate (refer to https://docs.couchbase.com/server/6.5/eventing/eventing-faq.html#timer-delays )

Also, the Application logs might be split up between servers: if you have N nodes, you really don’t know where your log messages will be written. To get a combined view across all logs (in reverse time order), use the UI Eventing log hyperlink.

If you failed to make a bucket alias for sourcebucket, you will always get a null document on your reads in your two callbacks (the try/catch block will warn you of this).

Next, you did not supply a control document to trigger the mutation, so I just guessed at one; perhaps it would look something like the following:

KEY:
schedule::1

DOCUMENT:
{
  "type": "schedule",
  "id": 1,
  "startDate": "Mon Jun 08 2020 08:30:08 GMT-0700",
  "endDate": "Mon Jun 08 2020 09:00:08 GMT-0700"
}

The new refactored source code is below

function OnUpdate(doc, meta) {
  if (doc.type !== 'schedule') return;
  log("OnUpdate", doc);
  var quicktest = true;
  var context = {
    "docId": meta.id
  };
  var startTriggerDate = null;
  var endTriggerDate = null;
  try {
    if (quicktest) {
      log("OnUpdate doing quicktest", doc);
      startTriggerDate = new Date();
      endTriggerDate = new Date(startTriggerDate.getTime() + 15 * 1000);
    } else {
      startTriggerDate = new Date(doc.startDate);
      endTriggerDate = new Date(doc.endDate);
      // if you define both dates why are you adding 60 ?
      startTriggerDate.setSeconds(startTriggerDate.getSeconds() + 60);
      endTriggerDate.setSeconds(endTriggerDate.getSeconds() + 60);
    }
    // sanity check add 15 seconds if illegal
    if (startTriggerDate.getTime() >= endTriggerDate.getTime()) {
      log("OnUpdate set endTriggerDate to startTriggerDate + 15 seconds");
      endTriggerDate = new Date(doc.startDate);
      endTriggerDate.setSeconds(endTriggerDate.getSeconds() + 15);
    }
  } catch (e1) {
    log("OnUpdate issue with prep", e1);
  }
  try {
    log("OnUpdate createTimer for callback scheduleStartTimer ", startTriggerDate);
    createTimer(scheduleStartTimer, startTriggerDate, null /* meta.id + ":scheduleStartTimer" */ , context);
  } catch (e2) {
    log("OnUpdate issue with createTimer(scheduleStartTimer,...)", e2);
  }
  try {
    log("OnUpdate createTimer for callback scheduleEndTimer ", endTriggerDate);
    createTimer(scheduleEndTimer, endTriggerDate, null /* meta.id + ":scheduleEndTimer" */ , context);
  } catch (e3) {
    log("OnUpdate issue with createTimer(scheduleEndTimer,...)", e3);
  }
}
function scheduleStartTimer(context) {
  log("scheduleStartTimer context:", context);
  var doc = null
  try {
    doc = sourcebucket[context.docId];
    log("scheduleStartTimer doc:", doc);
  } catch (e1) {
    log("scheduleStartTimer ERROR could not read doc:", e1);
  }
  if (doc) {
    var now = new Date();
    var startDate = new Date(doc.startDate);
    log('scheduleStartTimer now:', now);
    try {
      //var request = {};
      //var response = curl('POST', apiUrl, request);
    } catch (e2) {
      log("scheduleStartTimer ERROR curl:", e2);
    }
  }
}
function scheduleEndTimer(context) {
  log("scheduleEndTimer context:", context);
  var doc = null
  try {
    doc = sourcebucket[context.docId];
    log("scheduleEndTimer doc:", doc);
  } catch (e1) {
    log("scheduleEndTimer ERROR could not read doc:", e1);
  }
  if (doc) {
    var now = new Date();
    var endDate = new Date(doc.endDate);
    log('scheduleEndTimer now:', now);
    try {
      //var request = {};
      //var response = curl('POST', apiUrl, request);
    } catch (e2) {
      log("scheduleEndTimer ERROR curl:", e2);
    }
  }
}

Now, assuming we deploy from now and trigger one mutation, we should see something like the following:

Note all three message blocks could be recorded on different nodes, but they will be combined in the UI Eventing log view pop-up; it is important that each node uses NTP so that wall clocks are accurate and in sync.

2020-06-08T16:50:01.699+00:00 [INFO] “OnUpdate” {“type”:“schedule”,“id”:1,“startDate”:“Mon Jun 08 2020 08:30:08 GMT-0700”,“endDate”:“Mon Jun 08 2020 09:00:09 GMT-0700”}
2020-06-08T16:50:01.699+00:00 [INFO] “OnUpdate doing quicktest” {“type”:“schedule”,“id”:1,“startDate”:“Mon Jun 08 2020 08:30:08 GMT-0700”,“endDate”:“Mon Jun 08 2020 09:00:09 GMT-0700”}
2020-06-08T16:50:01.699+00:00 [INFO] "OnUpdate createTimer for callback scheduleStartTimer " “2020-06-08T16:50:01.699Z”
2020-06-08T16:50:01.703+00:00 [INFO] "OnUpdate createTimer for callback scheduleEndTimer " “2020-06-08T16:50:16.699Z”

2020-06-08T16:50:15.964+00:00 [INFO] “scheduleStartTimer context:” {“docId”:“schedule::1”}
2020-06-08T16:50:15.965+00:00 [INFO] “scheduleStartTimer doc:” {“type”:“schedule”,“id”:1,“startDate”:“Mon Jun 08 2020 08:30:08 GMT-0700”,“endDate”:“Mon Jun 08 2020 09:00:09 GMT-0700”}
2020-06-08T16:50:15.965+00:00 [INFO] “scheduleStartTimer now:” “2020-06-08T16:50:15.965Z”

2020-06-08T16:50:22.910+00:00 [INFO] “scheduleEndTimer context:” {“docId”:“schedule::1”}
2020-06-08T16:50:22.911+00:00 [INFO] “scheduleEndTimer doc:” {“type”:“schedule”,“id”:1,“startDate”:“Mon Jun 08 2020 08:30:08 GMT-0700”,“endDate”:“Mon Jun 08 2020 09:00:09 GMT-0700”}
2020-06-08T16:50:22.911+00:00 [INFO] “scheduleEndTimer now:” “2020-06-08T16:50:22.911Z”

Now use the Bucket Editor and mutate the document twice; you will see that all four callbacks (two for each mutation) are indeed called.

2020-06-08T18:04:21.861+00:00 [INFO] “OnUpdate” {“type”:“schedule”,“id”:1,“startDate”:“Mon Jun 08 2020 08:30:08 GMT-0700”,“endDate”:“Mon Jun 08 2020 09:00:10 GMT-0700”}
2020-06-08T18:04:21.861+00:00 [INFO] “OnUpdate doing quicktest” {“type”:“schedule”,“id”:1,“startDate”:“Mon Jun 08 2020 08:30:08 GMT-0700”,“endDate”:“Mon Jun 08 2020 09:00:10 GMT-0700”}
2020-06-08T18:04:21.861+00:00 [INFO] "OnUpdate createTimer for callback scheduleStartTimer " “2020-06-08T18:04:21.861Z”
2020-06-08T18:04:21.865+00:00 [INFO] "OnUpdate createTimer for callback scheduleEndTimer " “2020-06-08T18:04:36.861Z”
2020-06-08T18:04:28.160+00:00 [INFO] “OnUpdate” {“type”:“schedule”,“id”:1,“startDate”:“Mon Jun 08 2020 08:30:08 GMT-0700”,“endDate”:“Mon Jun 08 2020 09:00:11 GMT-0700”}
2020-06-08T18:04:28.160+00:00 [INFO] “OnUpdate doing quicktest” {“type”:“schedule”,“id”:1,“startDate”:“Mon Jun 08 2020 08:30:08 GMT-0700”,“endDate”:“Mon Jun 08 2020 09:00:11 GMT-0700”}
2020-06-08T18:04:28.160+00:00 [INFO] "OnUpdate createTimer for callback scheduleStartTimer " “2020-06-08T18:04:28.160Z”
2020-06-08T18:04:28.163+00:00 [INFO] "OnUpdate createTimer for callback scheduleEndTimer " “2020-06-08T18:04:43.160Z”
2020-06-08T11:04:29.565-07:00 [INFO] “scheduleStartTimer context:” {“docId”:“schedule::1”}
2020-06-08T11:04:29.567-07:00 [INFO] “scheduleStartTimer doc:” {“type”:“schedule”,“id”:1,“startDate”:“Mon Jun 08 2020 08:30:08 GMT-0700”,“endDate”:“Mon Jun 08 2020 09:00:11 GMT-0700”}
2020-06-08T11:04:29.567-07:00 [INFO] “scheduleStartTimer now:” “2020-06-08T18:04:29.567Z”
2020-06-08T11:04:36.582-07:00 [INFO] “scheduleStartTimer context:” {“docId”:“schedule::1”}
2020-06-08T11:04:36.588-07:00 [INFO] “scheduleStartTimer doc:” {“type”:“schedule”,“id”:1,“startDate”:“Mon Jun 08 2020 08:30:08 GMT-0700”,“endDate”:“Mon Jun 08 2020 09:00:11 GMT-0700”}
2020-06-08T11:04:36.588-07:00 [INFO] “scheduleStartTimer now:” “2020-06-08T18:04:36.588Z”
2020-06-08T18:04:36.725+00:00 [INFO] “scheduleEndTimer context:” {“docId”:“schedule::1”}
2020-06-08T18:04:36.726+00:00 [INFO] “scheduleEndTimer doc:” {“type”:“schedule”,“id”:1,“startDate”:“Mon Jun 08 2020 08:30:08 GMT-0700”,“endDate”:“Mon Jun 08 2020 09:00:11 GMT-0700”}
2020-06-08T18:04:36.726+00:00 [INFO] “scheduleEndTimer now:” “2020-06-08T18:04:36.726Z”
2020-06-08T18:04:43.833+00:00 [INFO] “scheduleEndTimer context:” {“docId”:“schedule::1”}
2020-06-08T18:04:43.834+00:00 [INFO] “scheduleEndTimer doc:” {“type”:“schedule”,“id”:1,“startDate”:“Mon Jun 08 2020 08:30:08 GMT-0700”,“endDate”:“Mon Jun 08 2020 09:00:11 GMT-0700”}
2020-06-08T18:04:43.834+00:00 [INFO] “scheduleEndTimer now:” “2020-06-08T18:04:43.834Z”

Of course, if you mutate your document and then quickly delete it, you will have logging to tell you just what happened.

2020-06-08T18:10:39.461+00:00 [INFO] “OnUpdate” {“type”:“schedule”,“id”:1,“startDate”:“Mon Jun 08 2020 08:30:08 GMT-0700”,“endDate”:“Mon Jun 08 2020 09:00:01 GMT-0700”}
2020-06-08T18:10:39.461+00:00 [INFO] “OnUpdate doing quicktest” {“type”:“schedule”,“id”:1,“startDate”:“Mon Jun 08 2020 08:30:08 GMT-0700”,“endDate”:“Mon Jun 08 2020 09:00:01 GMT-0700”}
2020-06-08T18:10:39.461+00:00 [INFO] "OnUpdate createTimer for callback scheduleStartTimer " “2020-06-08T18:10:39.461Z”
2020-06-08T18:10:39.469+00:00 [INFO] "OnUpdate createTimer for callback scheduleEndTimer " “2020-06-08T18:10:54.461Z”

2020-06-08T18:10:47.819+00:00 [INFO] “scheduleStartTimer context:” {“docId”:“schedule::1”}
2020-06-08T18:10:47.820+00:00 [INFO] “scheduleStartTimer doc:” undefined

2020-06-08T11:10:54.568-07:00 [INFO] “scheduleEndTimer context:” {“docId”:“schedule::1”}
2020-06-08T11:10:54.569-07:00 [INFO] “scheduleEndTimer doc:” undefined

Best

Jon Strabala

@jon.strabala @Siri
I think the case is this: the remaining DCP backlog in Eventing waits too long when this occurs during function execution.


253k is remaining in the backlog

@Rohat_Sahin I am not sure what you’re saying here; timers are not wall clock accurate, refer to https://docs.couchbase.com/server/6.5/eventing/eventing-timers.html#wall-clock-accuracy - I am assuming you might be saying something about timers not firing at the scheduled time (but I may be wrong).

Did you run my refactored code, yes/no? Can you elaborate and also supply your Application log? I’ll dig into things if you give me some more information.

I refactored the code: I removed the timerId (passing null), added try/catch blocks, and added application logging. I know a timer is not a wall-clock event, but this one waits four or five hours; that delay is far too large.
The refactored code is like this; the function’s worker threads setting is 6 and the timeout is 60 seconds.

function OnUpdate(doc, meta) {
    try {
        // fail fast if doc type is not Type
        if (doc.type !== 'Type') {
            return
        }

        const correlationId = create_UUID();
        const context = {"docId": meta.id, "correlationId": correlationId};
        log('context: ', context);

        const startTriggerDate = new Date(doc.startDate);
        createTimer(scheduleStartTimer, startTriggerDate, null, context);
        log("start timer created doc, startDate, triggerDate ", meta.id, doc.startDate, startTriggerDate);

        const endTriggerDate = new Date(doc.endDate);
        createTimer(scheduleEndTimer, endTriggerDate, null, context);
        log("end timer created doc, endDate, triggerDate ", meta.id, doc.endDate, endTriggerDate);
    } catch (e) {
        log("OnUpdate Exception:", e);
    }
}

function scheduleStartTimer(context) {
    log("scheduleStartTimer timer started ", context.docId);
    const doc = source_bucket[context.docId];
    let counter = 0;

    while (counter < 5) {
        try {
            if (doc) {
                let now = new Date();
                let startDate = new Date(doc.startDate);

                if (now > startDate) {
                    log("rule started curl request started now and startDate", now, startDate);

                    let request = {};

                    let response = curl('POST', apiUrl, request);
                    if (response.status === 200) {
                        log("rule request is success id: ", context.docId)
                    }
                }
            }
            break;
        } catch (e) {
            counter++;
            log("scheduleStartTimer Exception:", e);
        }
    }
}

function scheduleEndTimer(context) {
    log("scheduleEndTimer timer started ", context.docId);
    const doc = source_bucket[context.docId];
    let counter = 0;

    while (counter < 5) {
        try {
            if (doc) {
                let now = new Date();
                let endDate = new Date(doc.endDate);

                if (now > endDate) {
                    log("schedule end timer curl request started now and endDate", now, endDate);
                    let request = {};

                    let response = curl('POST', apiUrl, request);
                    if (response.status === 200) {
                        log("schedule end request is success: ", context.docId);
                    }
                }
            }
            break;
        } catch (e) {
            counter++;
            log("scheduleEndTimer Exception:", e);
        }
    }
}

function create_UUID() {
    let dt = new Date().getTime();
    const uuid = 'xxxxxxxx-xxxx-4xxx-yxxx-xxxxxxxxxxxx'.replace(/[xy]/g, function (c) {
        let r = (dt + Math.random() * 16) % 16 | 0;
        dt = Math.floor(dt / 16);
        return (c == 'x' ? r : (r & 0x3 | 0x8)).toString(16);
    });
    return uuid;
}

Please provide me a clean Application log file; see Table 2 in https://docs.couchbase.com/server/6.5/eventing/eventing-debugging-and-diagnosability.html#logging-functions (I need one from each node if you have more than one).

Also, are you testing/validating with one event, or are you processing 100’s/1000’s or more mutations?

Hi @Rohat_Sahin

Try this function (I took your more recent code, added more debugging, and tested it against an actual cURL API and REST endpoint). I also made sure the timer id was unique and random (rather than null) - just being paranoid.

This works just fine for me (the control document was just one minute apart) - so please give it a try.

function OnUpdate(doc, meta) {
    try {
        // fail fast if doc type is not Type
        if (doc.type !== 'Type') {
            return
        }

        const correlationId = create_UUID();

        const startTriggerDate = new Date(doc.startDate);
        let s_id = "rand::" + meta.id + '::' + Math.random();
        let s_context = {
            "docId": meta.id,
            "correlationId": correlationId,
            "s_id": s_id
        };
        createTimer(scheduleStartTimer, startTriggerDate, /*null*/ s_id, s_context);
        log("start timer created doc, startDate, triggerDate ", meta.id, doc.startDate, startTriggerDate, s_context);

        const endTriggerDate = new Date(doc.endDate);
        let e_id = "rand::" + meta.id + '::' + Math.random();
        let e_context = {
            "docId": meta.id,
            "correlationId": correlationId,
            "e_id": e_id
        };
        createTimer(scheduleEndTimer, endTriggerDate, /*null*/ e_id, e_context);
        log("end timer created doc, endDate, triggerDate ", meta.id, doc.endDate, endTriggerDate, e_context);

    } catch (e) {
        log("OnUpdate Exception:", e);
    }
}

function scheduleStartTimer(context) {
    log("scheduleStartTimer timer started ", context);
    const doc = source_bucket[context.docId];
    if (!doc || doc === null || doc == undefined) {
        log("scheduleStartTimer timer fired, but no doc found via context.docId ", context.docId);
        return;
    }

    let counter = 0;

    while (counter < 5) {
        try {

            let now = new Date();
            let startDate = new Date(doc.startDate);

            if (now > startDate) {
                log("scheduleStartTimer curl request started now and startDate", now, startDate);

                var request = {
                    path: '',
                    headers: {
                        'What': 'scheduleStartTimer',
                        'AppKey': 'something',
                        'AnotherItem': 'someting else'
                    },
                    body: {
                        id: context.docId,
                        value: doc.type,
                        loop: counter
                    }
                };
                var response = curl('POST', apiUrl, request);

                //log('scheduleStartTimer request', request);
                var response = curl('POST', apiUrl, request);
                if (response.status != 200 && response.status != 302) {
                    log('scheduleStartTimer cURL try ' + counter + ' failed for context ' + context, response);
                } else {
                    log('scheduleStartTimer cURL try ' + counter + ' success for context ', context, response);
                }
            }
            break;
        } catch (e) {
            counter++;
            log("scheduleStartTimer Exception:", e);
        }
    }
}

function scheduleEndTimer(context) {
    log("scheduleEndTimer timer started ", context);
    const doc = source_bucket[context.docId];
    if (!doc || doc === null || doc == undefined) {
        log("scheduleEndTimer timer fired, but no doc found via context.docId ", context.docId);
        return;
    }
    let counter = 0;

    while (counter < 5) {
        try {
            let now = new Date();
            let endDate = new Date(doc.endDate);

            if (now > endDate) {
                log("scheduleEndTimer curl request started now and endDate", now, endDate);

                var request = {
                    path: '',
                    headers: {
                        'What': 'scheduleEndTimer',
                        'AppKey': 'something',
                        'AnotherItem': 'someting else'
                    },
                    body: {
                        id: context.docId,
                        value: doc.type,
                        loop: counter
                    }
                };
                var response = curl('POST', apiUrl, request);

                //log('scheduleEndTimer request', request);
                var response = curl('POST', apiUrl, request);
                if (response.status != 200 && response.status != 302) {
                    log('scheduleEndTimer cURL try ' + counter + ' failed for context ' + context, response);
                } else {
                    log('scheduleEndTimer cURL try ' + counter + ' success for context ', context, response);
                }
            }
            break;
        } catch (e) {
            counter++;
            log("scheduleEndTimer Exception:", e);
        }
    }
}

function create_UUID() {
    let dt = new Date().getTime();
    const uuid = 'xxxxxxxx-xxxx-4xxx-yxxx-xxxxxxxxxxxx'.replace(/[xy]/g, function(c) {
        let r = (dt + Math.random() * 16) % 16 | 0;
        dt = Math.floor(dt / 16);
        return (c == 'x' ? r : (r & 0x3 | 0x8)).toString(16);
    });
    return uuid;
}

I added the following document after the function was deployed, to create a mutation:

{
  "type": "Type",
  "id": 1,
  "startDate": "2020-06-12T14:10:00",
  "endDate": "2020-06-12T14:11:00"
}

So on the creation of the above document, i.e. the initial mutation, you will see a log like:

2020-06-12T14:09:35.641-07:00 [INFO] "start timer created doc, startDate, triggerDate " "Type::1" "2020-06-12T14:10:00" "2020-06-12T21:10:00.000Z" {"docId":"Type::1","correlationId":"4e06ba81-3fdc-4a3a-9f37-b5961a23de1e","s_id":"rand::Type::1::0.18379722406300592"}
2020-06-12T14:09:35.643-07:00 [INFO] "end timer created doc, endDate, triggerDate " "Type::1" "2020-06-12T14:11:00" "2020-06-12T21:11:00.000Z" {"docId":"Type::1","correlationId":"4e06ba81-3fdc-4a3a-9f37-b5961a23de1e","e_id":"rand::Type::1::0.9041868649235303"}

Then, about one minute later as per the control document, you will see the cURL processing; this is the first (start) timer firing:

2020-06-12T14:10:07.660-07:00 [INFO] "scheduleStartTimer timer started " {"docId":"Type::1","correlationId":"4e06ba81-3fdc-4a3a-9f37-b5961a23de1e","s_id":"rand::Type::1::0.18379722406300592"}
2020-06-12T14:10:07.660-07:00 [INFO] "scheduleStartTimer curl request started now and startDate" "2020-06-12T21:10:07.660Z" "2020-06-12T21:10:00.000Z"
2020-06-12T14:10:07.705-07:00 [INFO] "scheduleStartTimer cURL try 0 success for context " {"docId":"Type::1","correlationId":"4e06ba81-3fdc-4a3a-9f37-b5961a23de1e","s_id":"rand::Type::1::0.18379722406300592"} {"status":200,"headers":{"Date":" Fri, 12 Jun 2020 21:10:07 GMT\r\n","Server":" BaseHTTP/0.3 Python/2.7.16\r\n"},"body":{}}

Wait about another minute and you will see the second (end) timer firing:

2020-06-12T14:11:10.695-07:00 [INFO] "scheduleEndTimer timer started " {"docId":"Type::1","correlationId":"4e06ba81-3fdc-4a3a-9f37-b5961a23de1e","e_id":"rand::Type::1::0.9041868649235303"}
2020-06-12T14:11:10.696-07:00 [INFO] "scheduleEndTimer curl request started now and endDate" "2020-06-12T21:11:10.696Z" "2020-06-12T21:11:00.000Z"
2020-06-12T14:11:10.740-07:00 [INFO] "scheduleEndTimer cURL try 0 success for context " {"docId":"Type::1","correlationId":"4e06ba81-3fdc-4a3a-9f37-b5961a23de1e","e_id":"rand::Type::1::0.9041868649235303"} {"status":200,"headers":{"Date":" Fri, 12 Jun 2020 21:11:10 GMT\r\n","Server":" BaseHTTP/0.3 Python/2.7.16\r\n"},"body":{}}

And here is my REST endpoint, a simple Python 2 script:

from BaseHTTPServer import HTTPServer, BaseHTTPRequestHandler
from SocketServer import ThreadingMixIn
import threading
from time import sleep

class Handler(BaseHTTPRequestHandler):


    def do_GET(self):
        sleep(0.02)
        self.send_response(200)
        self.end_headers()
        message =  threading.currentThread().getName()
        self.wfile.write(message)
        self.wfile.write('\n')
        return


    def do_POST(self):
        sleep(0.02)
        self.send_response(200)
        self.end_headers()
        message =  threading.currentThread().getName()
        self.wfile.write(message)
        self.wfile.write('\n')
        return


class ThreadedHTTPServer(ThreadingMixIn, HTTPServer):
    """Handle requests in a separate thread."""

if __name__ == '__main__':
    server = ThreadedHTTPServer(('localhost', 8080), Handler)
    print 'Starting server, use <Ctrl-C> to stop'
    server.serve_forever()

Hope this helps, the devil is in the details.

@jon.strabala @Siri

I tried everything and refactored, but it still doesn’t work as I expected.

I don’t know what’s wrong. I also don’t know why this metric increases: even though no new mutations arrive at the function, I see the metadata bucket ops/sec increasing, and now I think there is a problem with the Eventing service.

There is no mutation, but the function drives up the ops/sec on the metadata bucket and then stops after a while.

Rohat,

First, I am sorry that things still do not work for you; I gave you a complete, fully tested working example that also logs errors. Please read this post and use the files I supply in the ZIP archive at the end of this post.

In the future I would appreciate it if you could provide as much information as possible, including the Application log (on Linux it is located in the directory /opt/couchbase/var/lib/couchbase/data/@eventing/), and also capture the output of the python script/webserver and provide that too, as I have no visibility into your system other than what you provide.

In my example you can set a timer in the “past” and it will indeed be processed. I inserted the following into my Function’s source bucket ‘register’ with KEY Type::1:

{
  "type": "Type",
  "id": 1,
  "startDate": "2020-06-12T10:32:00",
  "endDate": "2020-06-12T10:33:00"
}

I also don’t know what statistics you are looking at, or the document count, for example in the Eventing metadata bucket (for me this is “meta”).

First, for the bucket “meta” under Enterprise Edition 6.5.1 build 6299, I expect in an idle or quiescent state to see 2048 documents (1024 for the Eventing function plus another 1024 because the function uses timers).

Next, for the bucket “meta”, even when the system is idle a scanner runs every 15 seconds to find items (timers) ready to fire. So you will see ops from this; I will show two images of this.


The other bucket involved in my “test” is called “register”, so let’s look at the ops/sec on that bucket - we expect zero or close to zero, as I only have one test document in that bucket (which I inserted and then mutated again).


Some important points:

Okay, back to my test Function in my earlier post (I was making two cURL calls in scheduleStartTimer and scheduleEndTimer), but this doesn’t really impact anything (nor would it cause the high Ops/Sec you report). I added a bit more logging; here is the new Eventing function:

/*
{
    "type": "Type",
    "id": 1,
    "startDate": "2020-06-12T10:32:00",
    "endDate":  "2020-06-12T10:33:00"
}

*/
function OnUpdate(doc, meta) {
    try {
        // fail fast if doc type is not Type
        if (doc.type !== 'Type') {
            return
        }
        log('OnUpdate meta:', meta);
        log('OnUpdate doc:', doc);

        const correlationId = create_UUID();

        const startTriggerDate = new Date(doc.startDate);
        let s_id = "rand::" + meta.id + '::' + Math.random();
        let s_context = {
            "docId": meta.id,
            "correlationId": correlationId,
            "s_id": s_id
        };
        createTimer(scheduleStartTimer, startTriggerDate, /*null*/ s_id, s_context);
        log("start timer created doc, startDate, triggerDate ", meta.id, doc.startDate, startTriggerDate, s_context);

        const endTriggerDate = new Date(doc.endDate);
        let e_id = "rand::" + meta.id + '::' + Math.random();
        let e_context = {
            "docId": meta.id,
            "correlationId": correlationId,
            "e_id": e_id
        };
        createTimer(scheduleEndTimer, endTriggerDate, /*null*/ e_id, e_context);
        log("end timer created doc, endDate, triggerDate ", meta.id, doc.endDate, endTriggerDate, e_context);

    } catch (e) {
        log("OnUpdate Exception:", e);
    }
}

function scheduleStartTimer(context) {
    log("scheduleStartTimer timer started ", context);
    const doc = source_bucket[context.docId];
    if (!doc || doc === null || doc == undefined) {
        log("scheduleStartTimer timer fired, but no doc found via context.docId ", context.docId);
        return;
    }

    let counter = 0;

    while (counter < 5) {
        try {

            let now = new Date();
            let startDate = new Date(doc.startDate);

            if (now > startDate) {
                log("scheduleStartTimer has fired curl request started now and startDate", now, startDate);

                var request = {
                    path: '',
                    headers: {
                        'What': 'scheduleStartTimer',
                        'context': context,
                        'AppKey': 'something',
                        'AnotherItem': 'someting else'
                    },
                    body: {
                        id: context.docId,
                        value: doc.type,
                        loop: counter
                    }
                };
                //log('scheduleStartTimer request', request);
                var response = curl('POST', apiUrl, request);
                if (response.status != 200 && response.status != 302) {
                    log('scheduleStartTimer cURL try ' + counter + ' failed for context ' + context, response);
                } else {
                    log('scheduleStartTimer cURL try ' + counter + ' success for context ', context, response);
                }
            }
            break;
        } catch (e) {
            counter++;
            log("scheduleStartTimer Exception:", e);
        }
    }
}

function scheduleEndTimer(context) {
    log("scheduleEndTimer timer started ", context);
    const doc = source_bucket[context.docId];
    if (!doc || doc === null || doc == undefined) {
        log("scheduleEndTimer timer fired, but no doc found via context.docId ", context.docId);
        return;
    }
    let counter = 0;

    while (counter < 5) {
        try {
            let now = new Date();
            let endDate = new Date(doc.endDate);

            if (now > endDate) {
                log("scheduleEndTimer has fired curl request started now and endDate", now, endDate);

                var request = {
                    path: '',
                    headers: {
                        'What': 'scheduleEndTimer',
                        'context': context,
                        'AppKey': 'something',
                        'AnotherItem': 'something else'
                    },
                    body: {
                        id: context.docId,
                        value: doc.type,
                        loop: counter
                    }
                };
                //log('scheduleEndTimer request', request);
                var response = curl('POST', apiUrl, request);
                if (response.status != 200 && response.status != 302) {
                    log('scheduleEndTimer cURL try ' + counter + ' failed for context ', context, response);
                } else {
                    log('scheduleEndTimer cURL try ' + counter + ' success for context ', context, response);
                }
            }
            break;
        } catch (e) {
            counter++;
            log("scheduleEndTimer Exception:", e);
        }
    }
}

function create_UUID() {
    let dt = new Date().getTime();
    const uuid = 'xxxxxxxx-xxxx-4xxx-yxxx-xxxxxxxxxxxx'.replace(/[xy]/g, function (c) {
        let r = (dt + Math.random() * 16) % 16 | 0;
        dt = Math.floor(dt / 16);
        return (c == 'x' ? r : (r & 0x3 | 0x8)).toString(16);
    });
    return uuid;
}

I injected a 20 ms (0.02 sec) delay into the Python web server; those lines can be removed or commented out. Below I comment it out, and I also added some more debugging, printing some of the headers passed to the POST from Eventing.

from BaseHTTPServer import HTTPServer, BaseHTTPRequestHandler
from SocketServer import ThreadingMixIn
import threading
from time import sleep

class Handler(BaseHTTPRequestHandler):


    def do_GET(self):
        #sleep(0.02)
        self.send_response(200)
        self.end_headers()
        message =  threading.currentThread().getName()
        self.wfile.write(message)
        self.wfile.write('\n')
        request_headers = self.headers
        hdr_what = request_headers.getheaders('What')
        hdr_context = request_headers.getheaders('context')
        print('done with GET ' + ', '.join(hdr_what) + ' ' + ', '.join(hdr_context))
        return


    def do_POST(self):
        #sleep(0.02)
        self.send_response(200)
        self.end_headers()
        message =  threading.currentThread().getName()
        self.wfile.write(message)
        self.wfile.write('\n')
        request_headers = self.headers
        hdr_what = request_headers.getheaders('What')
        hdr_context = request_headers.getheaders('context')
        print('done with POST ' + ', '.join(hdr_what) + ' ' + ', '.join(hdr_context))
        return


class ThreadedHTTPServer(ThreadingMixIn, HTTPServer):
    """Handle requests in a separate thread."""

if __name__ == '__main__':
    server = ThreadedHTTPServer(('localhost', 8080), Handler)
    print 'Starting server, use <Ctrl-C> to stop'
    server.serve_forever()

Okay Rohat, all I ask is that you run my test case first, verbatim. Make two buckets: "register" for the source data and "meta" for your Eventing metadata. Both should be initially empty.

Then add the code I supplied above (I called the Function Rohat3), making sure you specify a bucket binding alias and a URL alias as shown below:

Deploy the Function Rohat3

Open a shell on your Eventing node (I assume you have a single Eventing node for development; if you have more than one, start the Python web server on each node). I made a file Rohat3.py to create a test web server, so start it as follows:

python Rohat3.py

Now back in the Eventing UI, go to Buckets, select Documents for "register", then hit "ADD DOCUMENT" and use the key Type::1 (yes, that is an uppercase "T").


Hit "Save" and add the data below (yes, the dates are in the past); just do a cut-and-paste:

{
    "type": "Type",
    "id": 1,
    "startDate": "2020-06-12T10:32:00",
    "endDate":  "2020-06-12T10:33:00"
}

Hit "Save". In about 15 seconds the completed event and both timers will have been processed, and the shell you have open will show the POSTs as expected.

linuxbrew@couch01:~/blog_curl$ python Rohat3.py
Starting server, use <Ctrl-C> to stop
127.0.0.1 - - [28/Jun/2020 13:24:09] "POST / HTTP/1.1" 200 -
done with POST scheduleStartTimer {"docId":"Type::1","correlationId":"a47864ff-e423-4818-a587-870d670c6252","s_id":"rand::Type::1::0.36963559736995677"}
127.0.0.1 - - [28/Jun/2020 13:24:09] "POST / HTTP/1.1" 200 -
done with POST scheduleEndTimer {"docId":"Type::1","correlationId":"a47864ff-e423-4818-a587-870d670c6252","e_id":"rand::Type::1::0.4125269767402704"}

Also the Application log will show the processing as expected e.g.

root@couch01:~# cat /opt/couchbase/var/lib/couchbase/data/@eventing/Rohat3.log
2020-06-28T13:23:57.312-07:00 [INFO] "OnUpdate meta:" {"cas":1593375837222666200,"id":"Type::1","expiration":0,"flags":33554438,"vb":634,"seq":13}
2020-06-28T13:23:57.312-07:00 [INFO] "OnUpdate doc:" {"type":"Type","id":1,"startDate":"2020-06-12T10:32:00","endDate":"2020-06-12T10:33:00"}
2020-06-28T13:23:57.314-07:00 [INFO] "start timer created doc, startDate, triggerDate " "Type::1" "2020-06-12T10:32:00" "2020-06-12T17:32:00.000Z" {"docId":"Type::1","correlationId":"a47864ff-e423-4818-a587-870d670c6252","s_id":"rand::Type::1::0.36963559736995677"}
2020-06-28T13:23:57.315-07:00 [INFO] "end timer created doc, endDate, triggerDate " "Type::1" "2020-06-12T10:33:00" "2020-06-12T17:33:00.000Z" {"docId":"Type::1","correlationId":"a47864ff-e423-4818-a587-870d670c6252","e_id":"rand::Type::1::0.4125269767402704"}
2020-06-28T13:24:09.106-07:00 [INFO] "scheduleStartTimer timer started " {"docId":"Type::1","correlationId":"a47864ff-e423-4818-a587-870d670c6252","s_id":"rand::Type::1::0.36963559736995677"}
2020-06-28T13:24:09.107-07:00 [INFO] "scheduleStartTimer has fired curl request started now and startDate" "2020-06-28T20:24:09.107Z" "2020-06-12T17:32:00.000Z"
2020-06-28T13:24:09.110-07:00 [INFO] "scheduleStartTimer cURL try 0 success for context " {"docId":"Type::1","correlationId":"a47864ff-e423-4818-a587-870d670c6252","s_id":"rand::Type::1::0.36963559736995677"} {"status":200,"headers":{"Date":" Sun, 28 Jun 2020 20:24:09 GMT\r\n","Server":" BaseHTTP/0.3 Python/2.7.16\r\n"},"body":{}}
2020-06-28T13:24:09.111-07:00 [INFO] "scheduleEndTimer timer started " {"docId":"Type::1","correlationId":"a47864ff-e423-4818-a587-870d670c6252","e_id":"rand::Type::1::0.4125269767402704"}
2020-06-28T13:24:09.111-07:00 [INFO] "scheduleEndTimer has fired curl request started now and endDate" "2020-06-28T20:24:09.111Z" "2020-06-12T17:33:00.000Z"
2020-06-28T13:24:09.113-07:00 [INFO] "scheduleEndTimer cURL try 0 success for context " {"docId":"Type::1","correlationId":"a47864ff-e423-4818-a587-870d670c6252","e_id":"rand::Type::1::0.4125269767402704"} {"status":200,"headers":{"Date":" Sun, 28 Jun 2020 20:24:09 GMT\r\n","Server":" BaseHTTP/0.3 Python/2.7.16\r\n"},"body":{}}
root@couch01:~#

Now edit your document in the bucket "register" and move the dates about 1 or 2 minutes into the future. It is 28/Jun/2020 13:34:04 for me, so I will apply the following, updating both startDate and endDate to be in the future.

{
  "type": "Type",
  "id": 1,
  "startDate": "2020-06-28T13:35:00",
  "endDate": "2020-06-28T13:36:00"
}

The new output from the Python web server to the console is as follows:

127.0.0.1 - - [28/Jun/2020 13:35:07] "POST / HTTP/1.1" 200 -
done with POST scheduleStartTimer {"docId":"Type::1","correlationId":"2d8b0042-90cb-4c6d-b7f9-500d325cd613","s_id":"rand::Type::1::0.013607044218856279"}
127.0.0.1 - - [28/Jun/2020 13:36:03] "POST / HTTP/1.1" 200 -
done with POST scheduleEndTimer {"docId":"Type::1","correlationId":"2d8b0042-90cb-4c6d-b7f9-500d325cd613","e_id":"rand::Type::1::0.6307247362494868"}

And the new Application log data is as follows:

2020-06-28T13:34:08.311-07:00 [INFO] "OnUpdate meta:" {"cas":1593376448244809700,"id":"Type::1","expiration":0,"flags":33554438,"vb":634,"seq":16}
2020-06-28T13:34:08.311-07:00 [INFO] "OnUpdate doc:" {"type":"Type","id":1,"startDate":"2020-06-28T13:35:00","endDate":"2020-06-28T13:36:00"}
2020-06-28T13:34:08.314-07:00 [INFO] "start timer created doc, startDate, triggerDate " "Type::1" "2020-06-28T13:35:00" "2020-06-28T20:35:00.000Z" {"docId":"Type::1","correlationId":"2d8b0042-90cb-4c6d-b7f9-500d325cd613","s_id":"rand::Type::1::0.013607044218856279"}
2020-06-28T13:34:08.315-07:00 [INFO] "end timer created doc, endDate, triggerDate " "Type::1" "2020-06-28T13:36:00" "2020-06-28T20:36:00.000Z" {"docId":"Type::1","correlationId":"2d8b0042-90cb-4c6d-b7f9-500d325cd613","e_id":"rand::Type::1::0.6307247362494868"}
2020-06-28T13:35:07.113-07:00 [INFO] "scheduleStartTimer timer started " {"docId":"Type::1","correlationId":"2d8b0042-90cb-4c6d-b7f9-500d325cd613","s_id":"rand::Type::1::0.013607044218856279"}
2020-06-28T13:35:07.114-07:00 [INFO] "scheduleStartTimer has fired curl request started now and startDate" "2020-06-28T20:35:07.114Z" "2020-06-28T20:35:00.000Z"
2020-06-28T13:35:07.116-07:00 [INFO] "scheduleStartTimer cURL try 0 success for context " {"docId":"Type::1","correlationId":"2d8b0042-90cb-4c6d-b7f9-500d325cd613","s_id":"rand::Type::1::0.013607044218856279"} {"status":200,"headers":{"Date":" Sun, 28 Jun 2020 20:35:07 GMT\r\n","Server":" BaseHTTP/0.3 Python/2.7.16\r\n"},"body":{}}
2020-06-28T13:36:03.118-07:00 [INFO] "scheduleEndTimer timer started " {"docId":"Type::1","correlationId":"2d8b0042-90cb-4c6d-b7f9-500d325cd613","e_id":"rand::Type::1::0.6307247362494868"}
2020-06-28T13:36:03.119-07:00 [INFO] "scheduleEndTimer has fired curl request started now and endDate" "2020-06-28T20:36:03.118Z" "2020-06-28T20:36:00.000Z"
2020-06-28T13:36:03.120-07:00 [INFO] "scheduleEndTimer cURL try 0 success for context " {"docId":"Type::1","correlationId":"2d8b0042-90cb-4c6d-b7f9-500d325cd613","e_id":"rand::Type::1::0.6307247362494868"} {"status":200,"headers":{"Date":" Sun, 28 Jun 2020 20:36:03 GMT\r\n","Server":" BaseHTTP/0.3 Python/2.7.16\r\n"},"body":{}}

Assuming you have the buckets "register" and "meta", you can unzip the attached file "Rohat3_WIP.zip", import the Eventing function "Rohat3.json", and run the Python web server Rohat3.py on your Eventing node(s). Rohat3_WIP.zip (2.2 KB)

So Rohat, once you get the above to work exactly as I have shown, then and only then feel free to modify one thing at a time and test until you get things working in your environment (switch to the cURL endpoint you want, add your business logic, etc.). At each step test with one mutation, then try multiple mutations (or scheduled items in the bucket "register"). When you're happy, pause the function or comment out the logging statements.

Back to your high ops/sec question: first, let me know if you see the same issue with only one test doc and the one function I showed you. Also verify that nothing else is running in your Couchbase system other than the toy test Eventing function we are talking about. Please note that if your source bucket has 190K items in it, I would indeed expect some delay processing the backlog if you deploy your function from Everything instead of From now. Assume you want to run cURL on all 190K items: if your REST endpoint is slow, say 100 ms (0.1 seconds), the most you can process with the default setting of three workers per function (and two threads per worker) is 60 mutations/sec, because none of the six (6) threads can complete a call in less than a tenth of a second. In fact, since we have two timer callbacks and both invoke cURL, you will only achieve 30 mutations per second. As you can see, cURL can quickly become a performance bottleneck. So if you want to process faster, you can raise the number of workers from the default of 3 to, say, 12 in your Function's settings, or you need a faster REST endpoint in the 1-5 ms range. Note you can emulate a slow REST endpoint by uncommenting the sleep(0.02) statements in Rohat3.py and adjusting them to your measured endpoint speed.
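The arithmetic above can be sketched as a quick back-of-envelope calculation. The 100 ms latency and 3x2 worker/thread figures come from the paragraph above; the formula itself is my own simplification and ignores any per-mutation overhead other than the blocking cURL call:

```python
def max_mutations_per_sec(workers, threads_per_worker, endpoint_latency_ms, curls_per_mutation):
    # Each thread is blocked for latency * cURL-calls per mutation, so throughput
    # is capped at (concurrent threads) / (time spent per mutation).
    concurrent_threads = workers * threads_per_worker
    return 1000.0 * concurrent_threads / (endpoint_latency_ms * curls_per_mutation)

# Default 3 workers x 2 threads, 100 ms endpoint, one cURL call per mutation:
print(max_mutations_per_sec(3, 2, 100, 1))   # 60.0
# Both a start and an end timer each invoking cURL halves that:
print(max_mutations_per_sec(3, 2, 100, 2))   # 30.0
# Raising workers to 12 with a faster ~5 ms endpoint:
print(max_mutations_per_sec(12, 2, 5, 1))    # 4800.0
```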

@jon.strabala @Siri
I think like this issue
https://issues.couchbase.com/browse/MB-39399

Rohat, I am aware of the MB you referenced (in fact it was in the back of my mind yesterday). However, the root cause of the high ops could be a different MB or even an issue in your scripts.

Please note your primary comment was that your script didn’t work:

I tried everything and refactored but it still doesn’t work as I waited.

I am trying my best to help you get something that works; I even supplied you with a complete working test bench, i.e. Eventing code and a Python test REST endpoint in a ZIP file. Remember, I don't even know how many timers you are creating or how far into the future, so it is difficult to intuit your exact issues. If we have a common code base and see the same issue, then helping you gets a lot easier.

If your code doesn’t work and you do not supply logs, it is quite problematic to go chasing down hundreds of bugs.

However once your Function works, if there is a specific performance or operational issue then I can easily work with you and advise on a work around or a solution.

@jon.strabala

It was actually working from the very beginning, but after a while (usually 10-12 hours) it stopped working, and when we redeploy it starts working again. We encountered the same problem with the script you provided.

However, the Eventing service I mentioned earlier runs unstably: high ops/sec, and it stops after a while.

Rohat,

It was actually working from the very beginning, but after a while (usually 10-12 hours) it stopped working, and when we redeploy it starts working again. We encountered the same problem with the script you provided.

Rohat, this is the first time you have described this behavior. Now I want to see both an Application log and a system log (which is in /opt/couchbase/var/lib/couchbase/logs/eventing.log).

So: how many timers are you scheduling, how far out into the future, how do your timers get rearmed (you reference a JCBC ticket, which is for the Java SDK), and at what frequency? On the same version of CB that you are running I have run 120,000 timers every minute for weeks in the blog I already referenced, i.e. https://blog.couchbase.com/implementing-a-robust-portable-cron-like-scheduler-via-couchbase-eventing-part-1/

Hi Rohat, I believe you were correct; this seems to be MB-39301 (the bug you referenced, MB-39399, is a backport of the same fix to 6.6.0).

I can reproduce your issue (with my code) by creating 10,000 timers a year in the future. From the image below you can see the same massive growth in ops (specifically gets) that you saw, and also the starvation, i.e. non-processing of new events.


If you have a support contract I would advise you to upload a cbcollect_info, open a ticket, and request an early delivery of the 6.6.0 Release Candidate.

@jon.strabala

Thanks, our company has a support contract; our database team has opened a ticket and requested 6.6, and hopefully it has been fixed in that version. I also attribute the function stopping to work after a while to this issue: I suspect that after a while the mutations stop flowing and the backlog accumulates, like this for example

@Rohat_Sahin while you are waiting for a EA of 6.6.0 I do have a solution (it is a bit involved).

Essentially, since you are creating timers well beyond an hour or two into the future, the scans to find timers ready to fire keep growing; however, if every vBucket (1024 on Linux/PC, or 64 on MacOS) creates and fires a timer, the scan time will reset to its minimal value.

There are several methods to do this, but I will share the most basic with you (if you need to automate it in a pure Eventing fashion via two cooperating functions, let me know). The most basic is to compile a Java program and run it periodically. Below I show a "reset" of the growing ops on my test system.

Just modify your function (Part 1) and then periodically run the Java program (Part 2); of course update host, user, pass, and source_bucket. This will reset the excessive ops/sec you are seeing back to the default levels.

Part 1 add the following to your Eventing function:

/*
This will let you 'reset' the growing ops/sec against the Eventing metadata bucket when timers are
scheduled far into the future. Add the JavaScript code below into your Eventing function, i.e. the
function "noopTimer" plus the "try..catch" block at the top of the "OnUpdate" entry point, then run
the Java (SDK 3.0) program periodically (every 120 seconds?) to reset the scan times for new timers
back to the default.
*/

// New function; it will only log one line out of all the vBuckets
function noopTimer(context) {
    //log(context);
    if (context.type === "vbs_seed" && context.vb === 0) {
        // A timer fires for the seed doc in every vBucket (1024, or 64 on MacOS);
        // to keep the log readable we only log the one for vb 0. (If automating via
        // two cooperative Eventing functions, this is where you would touch a helper
        // doc to mutate all the vbs_seed docs and rearm the cycle.)
        log("noopTimer timer fired for all 1024 vBuckets showing only vb 0", context);
    }
}


// New code to place at the top of the Eventing entry point; it will only log one line out of all the vBuckets
function OnUpdate(doc, meta) {
    try {
        if (doc.type === "vbs_seed") {
            // Since we are using an external program to trigger a timer on all our vBuckets, fire immediately (it can take up to 15 seconds).
            // If we used cross-bucket recursion to rearm all the timers in a recurring fashion, we would add a delay of at least 40 seconds.
            if (doc.vb === 0) log("noopTimer timer created for all 1024 vBuckets showing only vb 0", doc);
            createTimer(noopTimer, new Date(new Date().getTime() + 0 * 1000), "clr_scan_growth::" + Math.random(), doc);
        }
    } catch (e) {
        log("OnUpdate Exception:", e);
    }
    // ... existing code ....
}

Part 2: add the Java program ScanOpsGrowthReset.java (Couchbase Java SDK 3.0).

You will also need one more Java file (adjust the package as needed): https://github.com/couchbase/java-dcp-client/blob/master/integration-test/src/main/java/com/couchbase/client/dcp/test/util/MarkableCrc32.java

package com.jonstrabala;

import java.time.Duration;
import java.util.Date;
import java.util.Optional;
import java.util.logging.Level;
import java.util.logging.Logger;
import com.couchbase.client.core.cnc.tracing.ThresholdRequestTracer;
import com.couchbase.client.core.env.IoConfig;
import com.couchbase.client.java.Bucket;
import com.couchbase.client.java.Cluster;
import com.couchbase.client.java.ClusterOptions;
import com.couchbase.client.java.Collection;
import com.couchbase.client.java.env.ClusterEnvironment;
import com.couchbase.client.java.json.JsonObject;
import static java.nio.charset.StandardCharsets.UTF_8;

public class ScanOpsGrowthReset {

    private final static Logger LOGGER = Logger.getLogger("com");
    
	static String host = "192.168.3.150";
	static String user = "Administrator";
	static String pass = "password";
	static String source_bucket = "register";
	static boolean isMacOS = false;
	static boolean isVerbose = false;

	public static Optional<String> forceKeyToPartition(String key, int desiredPartition, int numPartitions) {
		final int MAX_ITERATIONS = 10_000_000;
		final MarkableCrc32 crc32 = new MarkableCrc32();
		final byte[] keyBytes = (key + "#").getBytes(UTF_8);
		crc32.update(keyBytes, 0, keyBytes.length);
		crc32.mark();
		for (long salt = 0; salt < MAX_ITERATIONS; salt++) {
			crc32.reset();
			final String saltString = Long.toHexString(salt);
			for (int i = 0, max = saltString.length(); i < max; i++) {
				crc32.update(saltString.charAt(i));
			}
			final long rv = (crc32.getValue() >> 16) & 0x7fff;
			final int actualPartition = (int) rv & numPartitions - 1;

			if (actualPartition == desiredPartition) {
				return Optional.of(new String(keyBytes, UTF_8) + saltString);
			}
		}
		return Optional.empty();
	}


	public static void main(String... args) throws Exception {
		
        LOGGER.setLevel(Level.WARNING);
        System.out.println(new Date() + ": " + "BEG initial connect");

        ClusterEnvironment env = ClusterEnvironment
                .builder()
                .ioConfig(
                		IoConfig
                		.idleHttpConnectionTimeout(Duration.ofSeconds(4)) // must be less than 5 seconds
                )
                .requestTracer(
                		ThresholdRequestTracer.builder(null)
                		.queryThreshold(Duration.ofSeconds(12)).build()
                )
                .build();
                
		ClusterOptions coptions = ClusterOptions.clusterOptions(user,pass).environment(env);	
		Cluster cluster = Cluster.connect(host, coptions);		
		Bucket bucket = cluster.bucket(source_bucket);
		final Collection collection = bucket.defaultCollection();

		int numvbs = 1024; // default is linux/PC
		if (isMacOS) numvbs = 64;
		
		for (int vb=0; vb<numvbs; vb++) {
		    Optional<String> s =  forceKeyToPartition("vbs_seed::"+vb, vb, numvbs);		    
		    JsonObject jo = JsonObject.create().put("type", "vbs_seed").put("id", s.get().substring(10)).put("vb", vb);
		    if (isVerbose || (vb < 3 || vb > numvbs -4)) {
		    	System.out.println("KEY: " + s.get() + " VAL: " + jo);
		    } else {
		    	if (vb == 5) System.out.println("\t*\n\t*\n\t*");
		    }
			collection.upsert(s.get(), jo);
		}
		bucket = null;
		cluster.disconnect(Duration.ofSeconds(3));
		System.out.println(new Date() + ": " + "END "+numvbs+" upserts done macOS==" + isMacOS);
	}
}
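If you would rather not compile Java, the key-to-vBucket salting trick in forceKeyToPartition can also be reproduced in Python. The sketch below is my own and assumes standard CRC32 (zlib.crc32) is the hash used for key-to-vBucket mapping, matching what MarkableCrc32 computes in the Java above:

```python
import zlib

def vbucket_of(key, num_partitions):
    # Key -> vBucket mapping: upper bits of the CRC32 masked to the partition count.
    rv = (zlib.crc32(key.encode()) >> 16) & 0x7fff
    return rv & (num_partitions - 1)

def force_key_to_partition(key, desired_partition, num_partitions, max_iterations=10_000_000):
    # Mirror the Java logic: CRC the fixed prefix 'key#' once, then try hex salts
    # until the salted key lands on the desired vBucket.
    base = zlib.crc32((key + "#").encode())
    for salt in range(max_iterations):
        salt_hex = format(salt, 'x')
        crc = zlib.crc32(salt_hex.encode(), base)  # continue the CRC from the prefix
        if ((crc >> 16) & 0x7fff) & (num_partitions - 1) == desired_partition:
            return key + "#" + salt_hex
    return None
```

A seed doc per vBucket would then be upserted under `force_key_to_partition("vbs_seed::" + str(vb), vb, 1024)`, just as the Java loop does (use 64 instead of 1024 on MacOS).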

Please let me know if this workaround holds you over until you get an EA of 6.6.0.