Filter DCP Messages by document content

connections
java

I’m trying to connect Couchbase to Kafka using the Kafka Connector, but my bucket receives all kinds of documents. At my company we follow a convention that every document must have a “type” property, and I would like to inspect each MutationMessage and filter on that field.

What is the correct way of doing that?

Currently I have a MutationFilter that accepts only MutationMessages, decodes the content of each message into a JsonObject, and checks its “type” field. Like this:

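import com.couchbase.client.core.message.dcp.MutationMessage
import com.couchbase.client.java.transcoder.JsonTranscoder
import com.couchbase.kafka.DCPEvent
import com.couchbase.kafka.filter.Filter

class MutationFilter extends Filter {
	// Reuse one transcoder instead of allocating a new one for every event.
	private val trans = new JsonTranscoder()

	override def pass(evt: DCPEvent): Boolean = evt.message() match {
		// Only mutations carry a document body; decode it and compare "type".
		case m: MutationMessage =>
			trans.byteBufToJsonObject(m.content()).get("type") == "MessagePosition"
		// Every other DCP message is dropped.
		case _ => false
	}
}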

In order to do this I depend on the couchbase-client library.
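
To make the intent clearer, here is a minimal, dependency-free sketch of the check I am after. The names Message, Mutation, Deletion and TypeFilter are hypothetical stand-ins I made up for illustration, not connector or SDK classes, and a plain Map takes the place of the decoded JsonObject:

```scala
// Stand-in types for illustration only; these are NOT connector classes.
sealed trait Message
final case class Mutation(key: String, fields: Map[String, Any]) extends Message
final case class Deletion(key: String) extends Message

// Passes only mutations whose decoded body has the wanted "type" value.
class TypeFilter(wanted: String) {
  def pass(msg: Message): Boolean = msg match {
    // A missing "type" field yields None, so the event is rejected.
    case Mutation(_, fields) => fields.get("type").contains(wanted)
    // Anything that is not a mutation is rejected.
    case _                   => false
  }
}

object TypeFilterDemo {
  def main(args: Array[String]): Unit = {
    val filter = new TypeFilter("MessagePosition")
    val events = Seq(
      Mutation("a", Map("type" -> "MessagePosition")),
      Mutation("b", Map("type" -> "Other")),
      Mutation("c", Map.empty),
      Deletion("d")
    )
    // Print the pass/reject decision for each event, in order.
    println(events.map(filter.pass))
  }
}
```

In this sketch only the first event passes. Documents without a “type” field and non-mutation messages are rejected rather than causing an error.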

Is there a better way of doing this? Is this the right way?