Filter DCP Messages by document content



I’m currently trying to connect Couchbase to Kafka using the Kafka Connector. But my bucket receives all kind of documents. Here in my company we use a convention where all documents must have a “type” property and I would like to open the MutationMessage and filter the MutationMessage by this field.

What is the correct way of doing that?

Currently I created a MutationFilter that filter MutationMessage’s and then I open the message and create the JsonDocument of that Message. Like this:

class MutationFilter extends Filter {
	override def pass(evt: DCPEvent): Boolean = {
		val trans = new JsonTranscoder()
		(evt.message().isInstanceOf[MutationMessage]) &&
			.content()).get("type") == "MessagePosition")


In order to do this I depend on the couchbase-client library.

Is there a better way of doing this? Is this the right way?