Kafka Source connector's filter parallelism

I need to add an integer counter field to the source record, so I created a custom filter and set it in the connector property 'event.filter.class'. I increment this counter in the 'pass' callback method (which is called for every DCP event).
I want to know whether the counter variable should be an instance variable or a static variable.

  1. So I want to understand how the filter class is instantiated i.e. for a given connector task, how many instances of the filter class would be instantiated?

  2. Is the filter object or the filter objects called from multiple threads?

  3. And finally, can more than one task for the same connector go to the same connect worker? I guess it can. If so, and I make the counter a static variable, then I have to synchronize it. I would rather keep it as an instance variable, but questions 1 and 2 still need to be answered.

In short, is an instance variable in the filter class thread safe?

  1. So I want to understand how the filter class is instantiated i.e. for a given connector task, how many instances of the filter class would be instantiated?

One per task, here.

  2. Is the filter object or the filter objects called from multiple threads?

Each task’s filter is called here. According to the Confluent documentation and this StackOverflow answer, each task is given a dedicated thread, and the poll() method is always called on this thread. Note that it might be a different thread for each task.

  3. And finally, can more than one task for the same connector go to the same connect worker?

Yes, a single worker process can host any number of tasks.

if i treat the counter variable to be a static variable then i have to synchronize it

If you’re set on this path, I’d recommend using a static AtomicLong. But as I’m sure you’re aware, using an in-memory counter would limit you to running a single connect worker.
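To make the instance-versus-static distinction concrete, here is a minimal sketch in plain Java. `CountingFilterSketch` and its `onEvent` method are hypothetical stand-ins for a custom filter and its 'pass' callback, not the connector's actual API. It assumes, as described above, one filter instance per task and one dedicated thread per task.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for a custom filter; NOT the connector's real Filter API.
class CountingFilterSketch {
    // Shared by every filter instance in the same JVM (connect worker),
    // so it may be touched by several task threads and must be thread-safe.
    private static final AtomicLong sharedCounter = new AtomicLong();

    // Owned by a single instance. Safe without synchronization only under the
    // assumption that one thread (the task's thread) calls onEvent on it.
    private long perInstanceCounter;

    // Hypothetical per-event callback, standing in for 'pass'.
    // Returns {per-instance value, shared value} for illustration.
    long[] onEvent() {
        long local = ++perInstanceCounter;
        long shared = sharedCounter.incrementAndGet();
        return new long[] {local, shared};
    }
}
```

Either counter lives only in memory: both restart from zero when the worker process restarts, and neither is shared between worker processes.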

@david.nault, essentially I need a way to get the latest version of each document in Couchbase, i.e. a document with key 100 may have multiple events (t1, t2, …, t10), and I ingest these events using the Kafka connector. I should be able to derive the latest, which is t10, out of these 10 events (my questions in the other thread are related to this).
Determining the latest version of every document in application logic is quite complicated, hence I am resorting to a technical solution. I can use the bySeqNo but, as you mentioned in the other thread, it's an issue in the failover scenario (of course, I need to check the persistence polling that you mentioned).

Keeping the persistence polling aside for now…
So I have written a connect filter, and in the poll method I add an epoch timestamp to the source record and use this timestamp to determine the latest (events in the same vBucket go to the same task, in order, so adding a timestamp should help me determine the latest). The problem I am facing now is that the same timestamp is added to different events; I guess the Kafka connector processes more than one event per millisecond.
Now I am thinking of adding a counter in addition to the timestamp. The counter is an instance variable in the filter and it is incremented for every event. Since the events in a vBucket go to the same connector task, in order, the counter that I add would also be increasing. I compare the timestamp first and compare the counter only if the timestamps are equal. I have to do it this way with the failure scenario in mind, i.e. if the Kafka connector worker, task, or node fails, the counter is reset, so comparing the counter first would not help. In that case the timestamp would definitely differ, as recovery takes more than a millisecond, and from then on the counter would help even when the timestamp is the same. I guess keeping a thread-safe static counter (AtomicLong) would also help.
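The comparison described above (timestamp first, counter only as a tie-breaker) could be sketched like this. `EventStamp` and its field names are hypothetical, chosen for illustration; they stand for the two values the filter would attach to each record.

```java
import java.util.Comparator;

// Hypothetical value type holding the two fields added by the filter.
final class EventStamp {
    final long epochMillis; // wall-clock time when the event was seen
    final long counter;     // in-memory counter; resets when the task restarts

    EventStamp(long epochMillis, long counter) {
        this.epochMillis = epochMillis;
        this.counter = counter;
    }

    // Timestamp decides first; the counter only breaks ties
    // between events stamped within the same millisecond.
    static final Comparator<EventStamp> ORDER =
            Comparator.comparingLong((EventStamp s) -> s.epochMillis)
                      .thenComparingLong(s -> s.counter);

    // Returns whichever of the two stamps is later under ORDER.
    static EventStamp latest(EventStamp a, EventStamp b) {
        return ORDER.compare(a, b) >= 0 ? a : b;
    }
}
```

Note that this ordering is only as good as the wall clock: it assumes the timestamps never go backwards, for example when a task moves to a node whose clock is behind.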

On the point that you made: "If you’re set on this path, I’d recommend using a static AtomicLong. But as I’m sure you’re aware, using an in-memory counter would limit you to running a single connect worker."
As the events in the same vBucket go to the same connector task, and I want ordering per document (documents with the same key go to the same vBucket), this should not be an issue even if I have multiple workers. I do not want a global counter, just a counter that helps me order the events of each document.

What are your thoughts?

Sorry, I misunderstood your requirements earlier. And I probably still don’t understand very well :) Rather than get myself into trouble by giving bad design advice, I’ll try to stick to just clarifying how the connector behaves.

If you’re using persistence polling, the latest version of a document is the one with the highest dcp seqno. It’s also the most recent message in the Kafka topic, unless the stream was temporarily rewound due to a connector restart.

If you’re looking for a per-document counter that’s incremented whenever the document changes, that revSeqNo field we discussed in the other thread might be exactly what you want. I’m not sure if it’s reset to zero if a document is deleted and recreated; it should be easy enough to find out by experimenting, although we’re getting deep into unsupported territory.

@david.nault, whether the sequence number is per-document or across documents does not matter in my case. Ultimately, it has to be an increasing number for a document and need not be strictly continuous, as I only have to find the latest of the events per document.

So, let's assume that there are 2 documents with keys 100 and 101. There are mutation events for these documents, and let's say the sequence numbers for these events are the following:

100 - 1,7,10,14,15
101 - 2,5,8,9,11

With this, I know that 15 and 11 are the latest events for 100 and 101 respectively. So bySeqNo would help me here. I don’t need the revSeqNo (in fact I still don’t understand the purpose of revSeqNo :) ).
The counter logic that I wanted to add in the filter is primarily for the Couchbase failover scenario, but persistence polling would cover that as well (I understand that now :) ).
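On the consumer side, picking the latest event per document by sequence number might look like the sketch below. `LatestBySeqno` and its methods are hypothetical names. It assumes each consumed record exposes the document key and its bySeqNo, and that both documents live in the same vBucket, as the interleaved numbers in the example suggest.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical consumer-side helper: remembers the highest seqno seen per key.
class LatestBySeqno {
    private final Map<String, Long> latest = new HashMap<>();

    // Record one event; keeps the larger of the stored and the new seqno,
    // so replayed or out-of-order older events do not overwrite newer ones.
    void observe(String key, long seqno) {
        latest.merge(key, seqno, Long::max);
    }

    // Highest seqno seen so far for the key, or null if the key is unknown.
    Long latestFor(String key) {
        return latest.get(key);
    }
}
```

Fed with the example sequences above, this yields 15 for key 100 and 11 for key 101. The numbers do not need to be contiguous, only increasing per document.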

  1. Am I still missing something here?

Now one additional question:
The same vBucket stream is processed by the same connector task. If the task fails for some reason, I assume that the sequence would still be maintained if the task comes back on another node or that vBucket is processed by another task (of course, if the entire connect cluster goes down and the connect offset topics are also gone, then it would start from the beginning, as it loses its state).

  1. Am I missing something here?

@david.nault Could you kindly reply?