Rejected Events Come Again After Connector Restarts

We are using the Couchbase Kafka source connector (v4.1.1) with a custom filter. After the connector restarts, rejected events (events for which the custom filter returned false) come through again.
I think there is an inconsistency between the sequence number in the Kafka committed offset and the one in Couchbase, based on this blog post: Couchbase DCP Rollback and How QA Tests Them | The Couchbase Blog. Because rejected events are not sent to Kafka, there is no committed offset in Kafka for them. We do not want to process previously rejected events after a restart.

Is this default behavior or are we missing something?

Example log:

Starting to Stream for 1 partitions
Stopping stream for 1 partitions
Received rollback for vbucket 581 to seqno 0
{"milestone":"RECEIVED_FROM_COUCHBASE","tracingToken":6,"documentId":"_default._default.12345678","connectTaskId":"0","revision":6,"type":"mutation","partition":469,"sequenceNumber":6,"sizeInBytes":4297,"usSinceCouchbaseChange(might be inaccurate before Couchbase 7)":256087724954}
{"milestone":"SKIPPED_BECAUSE_FILTER_SAYS_IGNORE","tracingToken":6,"documentId":"_default._default.12345678"}
Poll returns 0 result(s) (filtered out 1)
-- restart
same log lines appear again

Connector config:
{
    "name": "kafka-connector",
    "connector.class": "com.couchbase.connect.kafka.CouchbaseSourceConnector",
    "tasks.max": "1",
    "couchbase.topic": "topic_name",
    "couchbase.seed.nodes": "...",
    "couchbase.bootstrap.timeout": "5s",
    "couchbase.persistence.polling.interval": "100ms",
    "couchbase.compression": "ENABLED",
    "couchbase.stream.from": "SAVED_OFFSET_OR_NOW",
    "couchbase.source.handler": "com.couchbase.connect.kafka.handler.source.RawJsonSourceHandler",
    "couchbase.event.filter": "our custom filter class path",
    "couchbase.log.redaction": "PARTIAL",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",
    "transforms": "dropIfNullValue,deserializeJson",
    "transforms.dropIfNullValue.type": "com.couchbase.connect.kafka.transform.DropIfNullValue",
    "transforms.deserializeJson.type": "com.couchbase.connect.kafka.transform.DeserializeJson",
    "couchbase.connector.name.in.offsets" : "true"
}

Hi @mrblithe,

This is the expected behavior.

The Kafka Connect framework stores the “source offset” for each Couchbase partition. It can only update this offset when the connector gives it a message. Since the filter discards the document changes, the framework is never made aware of the change to the source offset, which is why those events appear again (and are filtered out again) when the connector restarts.
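To make the mechanism concrete, here is a minimal, self-contained Java model. It is not connector or Kafka Connect code: SourceOffsets is a hypothetical stand-in for the framework's per-partition offset storage, and it is updated only for changes the filter accepts. The sketch assumes the stored offsets were persisted before the restart and treats 0 as "nothing saved yet". Feeding it the change from the log above (partition 469, seqno 6, rejected) shows that change being delivered again.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

class FilterReplayDemo {
    // Hypothetical, simplified document change: partition (vbucket), seqno, document ID.
    static final class ChangeEvent {
        final int partition;
        final long seqno;
        final String documentId;

        ChangeEvent(int partition, long seqno, String documentId) {
            this.partition = partition;
            this.seqno = seqno;
            this.documentId = documentId;
        }
    }

    // Hypothetical stand-in for the framework's stored source offsets:
    // the last seqno carried by a record, per partition.
    static final class SourceOffsets {
        private final Map<Integer, Long> lastSeqno = new HashMap<>();

        void update(int partition, long seqno) {
            lastSeqno.put(partition, seqno);
        }

        // Streaming resumes after this seqno; 0 means nothing was saved yet.
        long resumeAfter(int partition) {
            return lastSeqno.getOrDefault(partition, 0L);
        }
    }

    // Runs the stream through the filter once, "restarts", and returns the
    // changes that would be delivered to the filter a second time.
    static List<ChangeEvent> replayedAfterRestart(List<ChangeEvent> stream, Predicate<ChangeEvent> filter) {
        SourceOffsets offsets = new SourceOffsets();
        for (ChangeEvent e : stream) {
            if (filter.test(e)) {
                // Accepted: a record carrying this offset reaches the framework.
                offsets.update(e.partition, e.seqno);
            }
            // Rejected: no record, so the framework never learns this seqno.
        }
        // "Restart": every change after the stored offset is streamed again.
        List<ChangeEvent> replayed = new ArrayList<>();
        for (ChangeEvent e : stream) {
            if (e.seqno > offsets.resumeAfter(e.partition)) {
                replayed.add(e);
            }
        }
        return replayed;
    }

    public static void main(String[] args) {
        List<ChangeEvent> stream = List.of(new ChangeEvent(469, 6, "_default._default.12345678"));
        List<ChangeEvent> replayed = replayedAfterRestart(stream, e -> false);
        System.out.println(replayed.size() + " change(s) replayed"); // prints "1 change(s) replayed"
    }
}
```

If a later change in the same partition were accepted, its seqno would become the stored offset and the earlier rejected change would no longer be replayed.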

There are a couple of ways around this problem.

The most elegant solution would be to upgrade to Couchbase 7 and store the documents you want to ignore in different collections from the documents you want to replicate. The connector has a couchbase.collections config property where you specify which collections the connector is interested in. Only changes from those collections will be sent to the connector.
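A minimal sketch of that config change, written as Java that edits a connector config held in a Map (for example before submitting it to the Kafka Connect REST API). It assumes every document the filter used to reject now lives in a collection that is not listed, so couchbase.event.filter can be dropped. The collection name app.replicated is hypothetical, and the value is assumed to be a comma-separated list of fully qualified scope.collection names.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class CollectionsConfigSketch {
    // Returns a copy of an existing connector config that selects collections
    // with couchbase.collections instead of relying on a custom filter.
    // Assumption: `collections` is a comma-separated list of "scope.collection" names.
    static Map<String, String> useCollectionsInsteadOfFilter(Map<String, String> existing, String collections) {
        Map<String, String> updated = new LinkedHashMap<>(existing); // leave the input untouched
        // Assumed unnecessary once unwanted documents live in unlisted collections.
        updated.remove("couchbase.event.filter");
        updated.put("couchbase.collections", collections);
        return updated;
    }

    public static void main(String[] args) {
        Map<String, String> existing = new LinkedHashMap<>();
        existing.put("connector.class", "com.couchbase.connect.kafka.CouchbaseSourceConnector");
        existing.put("couchbase.topic", "topic_name");
        existing.put("couchbase.event.filter", "our custom filter class path");
        // "app.replicated" is a hypothetical collection holding only the documents to publish.
        Map<String, String> updated = useCollectionsInsteadOfFilter(existing, "app.replicated");
        updated.forEach((key, value) -> System.out.println(key + " = " + value));
    }
}
```

Because the unwanted changes are never sent to the connector, there is nothing for a filter to discard.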

An alternative that doesn’t require Couchbase 7 would be to use a custom SourceHandler instead of a Filter. Rather than ignoring the document change, the handler could publish a dummy record (with a null value) to a “black hole” topic configured with low retention. This way the Kafka Connect framework would see the changes and update the source offset.
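The routing idea can be sketched in plain Java. The types below are simplified, hypothetical stand-ins, not the connector's SourceHandler or SourceRecordBuilder signatures: the point is only that a rejected change still produces a record (null value, black hole topic) carrying its partition and seqno, so the offset moves forward. The topic names are hypothetical; "low retention" would typically mean a short retention.ms on the black hole topic.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Predicate;

class BlackHoleSketch {
    // Hypothetical, simplified outgoing record; NOT the connector's API.
    static final class OutRecord {
        final String topic;
        final String key;
        final String value;   // null for black hole records
        final int partition;  // source partition and...
        final long seqno;     // ...source offset travel with every record

        OutRecord(String topic, String key, String value, int partition, long seqno) {
            this.topic = topic;
            this.key = key;
            this.value = value;
            this.partition = partition;
            this.seqno = seqno;
        }
    }

    // Hypothetical handler: routes rejected changes to a black hole topic instead of dropping them.
    static final class BlackHoleRoutingHandler {
        private final String topic;
        private final String blackHoleTopic;
        private final Predicate<String> accept; // stands in for the old filter logic, keyed by document ID

        BlackHoleRoutingHandler(String topic, String blackHoleTopic, Predicate<String> accept) {
            this.topic = topic;
            this.blackHoleTopic = blackHoleTopic;
            this.accept = accept;
        }

        OutRecord handle(int partition, long seqno, String documentId, String json) {
            if (accept.test(documentId)) {
                return new OutRecord(topic, documentId, json, partition, seqno);
            }
            // Former "filter says ignore" case: still emit a record, with a null value,
            // so the framework learns the new source offset.
            return new OutRecord(blackHoleTopic, documentId, null, partition, seqno);
        }
    }

    public static void main(String[] args) {
        BlackHoleRoutingHandler handler = new BlackHoleRoutingHandler(
                "topic_name", "topic_name.black_hole", id -> !id.endsWith("12345678"));
        Map<Integer, Long> offsets = new HashMap<>(); // stand-in for stored source offsets
        OutRecord r = handler.handle(469, 6, "_default._default.12345678", "{}");
        offsets.put(r.partition, r.seqno); // every returned record advances the offset
        System.out.println(r.topic + " value=" + r.value + " offset=" + offsets.get(469));
        // prints "topic_name.black_hole value=null offset=6"
    }
}
```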

Thanks,
David

Just a quick follow-up to explain why this isn’t an issue for most deployments.

Kafka Connect tracks one “source offset” for each of the 1024 Couchbase partitions. It’s essentially a sequence number that identifies the change within the partition. If one document change is ignored but a later change in the same partition is not, the Kafka Connect framework stores the source offset associated with the later change. In that case, assuming the framework has had a chance to persist the offsets, you won’t see the filtered message again when the connector restarts.

Typically, if there’s a decent amount of traffic in your Couchbase cluster, a non-filtered document change will eventually occur in every Couchbase partition. The problem only occurs for partitions that never receive a change the connector doesn’t ignore.
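The rule above can be checked with a small Java model (hypothetical types, not connector code): per partition, the stored offset is the seqno of the last accepted change, and only rejected changes after it would be delivered again. It assumes the offsets were persisted before the restart; Kafka Connect commits them periodically, governed by the worker's offset.flush.interval.ms setting.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class PartitionReplayCheck {
    // One change in a partition: its seqno and whether the filter accepted it (hypothetical type).
    static final class Change {
        final long seqno;
        final boolean accepted;

        Change(long seqno, boolean accepted) {
            this.seqno = seqno;
            this.accepted = accepted;
        }
    }

    // For each partition, counts the rejected changes after the last accepted change:
    // those are the ones a restart would deliver again.
    static Map<Integer, Integer> replayCounts(Map<Integer, List<Change>> changesByPartition) {
        Map<Integer, Integer> result = new TreeMap<>();
        for (Map.Entry<Integer, List<Change>> entry : changesByPartition.entrySet()) {
            long storedOffset = 0; // nothing is stored until some change is accepted
            for (Change c : entry.getValue()) {
                if (c.accepted) {
                    storedOffset = Math.max(storedOffset, c.seqno);
                }
            }
            int replayed = 0;
            for (Change c : entry.getValue()) {
                if (!c.accepted && c.seqno > storedOffset) {
                    replayed++;
                }
            }
            result.put(entry.getKey(), replayed);
        }
        return result;
    }

    public static void main(String[] args) {
        Map<Integer, List<Change>> changes = new TreeMap<>();
        // Busy partition: the ignored change at seqno 5 is followed by an accepted one.
        changes.put(100, List.of(new Change(5, false), new Change(6, true)));
        // Quiet partition: its only change is ignored, so no offset is ever stored.
        changes.put(469, List.of(new Change(6, false)));
        System.out.println(replayCounts(changes)); // prints "{100=0, 469=1}"
    }
}
```

Rejected changes that arrive after the last accepted change in a busy partition are replayed too, but only until the next accepted change moves the stored offset past them.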

Thank you for the valuable information, @david.nault.