Filter push-down to DCP producer

dcp

#1

DCP is a great way to batch-access data or to ingest updates of reference objects into a streaming application. For batch and stream access we use either Apache Spark or Apache Flink. Let’s say that the total number of documents of any one type (e.g. users) is too large to retrieve with a single query. Let’s also assume that some buckets hold multiple types of documents.

We implement the following use-cases:
UC1. An incoming user-click stream is analyzed in real time using Apache Spark in a streaming program. Click events are correlated with user data that is retrieved from Couchbase using the Spark Connector’s Receiver implementation, which uses DCP. This user update stream is kept as state in Spark, using for example updateStateByKey.
UC2. Incoming user ratings (explicit or implicit click events) are used for online update of a factor model, a recommendation engine that is implemented in Apache Spark. In the program, ratings are correlated with user objects and items as well. These are kept in memory the same way as in the previous use-case.
UC3. Using Apache Spark we batch read items from Couchbase using DCP [see spark.sparkContext.couchbaseQuery number of partitions] into an Apache Zeppelin notebook and run simple analytics, ad-hoc queries and aggregations on Spark’s in-memory data. The data is then written to HDFS.
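
As a rough illustration of the state handling in UC1, here is a minimal plain-Python sketch. It does not use Spark or the Couchbase connector; `apply_user_updates` and `enrich_clicks` are hypothetical names that only mimic the idea of folding a user update stream into keyed state (as updateStateByKey would) and joining click events against it.

```python
from typing import Any, Dict, List, Optional, Tuple

UserState = Dict[str, Dict[str, Any]]
UserUpdate = Tuple[str, Optional[Dict[str, Any]]]  # (user_id, document or None for a deletion)


def apply_user_updates(state: UserState, updates: List[UserUpdate]) -> UserState:
    """Fold one micro-batch of user mutations/deletions into the keyed state."""
    new_state = dict(state)
    for user_id, doc in updates:
        if doc is None:
            new_state.pop(user_id, None)  # deletion event: forget the user
        else:
            new_state[user_id] = doc      # mutation event: keep the latest version
    return new_state


def enrich_clicks(clicks: List[Dict[str, Any]], state: UserState) -> List[Dict[str, Any]]:
    """Attach the latest known user document to each click (None if unknown)."""
    return [{**click, "user": state.get(click["user_id"])} for click in clicks]


# Two micro-batches of user updates, then one batch of clicks.
state = apply_user_updates({}, [("u1", {"name": "Ann"}), ("u2", {"name": "Bob"})])
state = apply_user_updates(state, [("u2", None), ("u1", {"name": "Ann B."})])
enriched = enrich_clicks(
    [{"user_id": "u1", "url": "/a"}, {"user_id": "u2", "url": "/b"}], state
)
```

Note that only user documents ever enter this state; every other document type delivered over DCP is dropped before it gets here.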

I could easily add 10 more critical use cases from our applications.

Due to the “get-everything” nature of DCP, in all of these scenarios the whole bucket is read, transferred and deserialized before a decision is made on whether each document is useful (i.e. filtering for users in UC1). As mutations occur in Couchbase, all DCP clients receive all the mutations, even though most DCP use cases filter for a specific type of document (e.g. users or items). This results in an impractically high overhead: pressure on memory/disk (server side) as well as on the CPU (client side).

A simple filter push-down mechanism would remove most of the overhead in these use cases. With filter push-down, the client would initialize the connection with arbitrary filters during the DCP handshake, thereby declining mutations of some “document.type”. In UC1, the streaming application would essentially receive mutation and deletion events only where the “document.type” is “user”.
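
To make the difference concrete, here is a small Python simulation of the idea. It is only a sketch: `Mutation`, `producer_stream` and the `predicate` argument are hypothetical and do not correspond to any existing DCP or SDK API. It also assumes the producer knows the document type for deletions, which a real implementation would have to solve.

```python
from dataclasses import dataclass
from typing import Callable, Iterable, Iterator, Optional


@dataclass(frozen=True)
class Mutation:
    key: str
    doc_type: str          # stands in for the "document.type" field
    deleted: bool = False  # True for a deletion event


def producer_stream(
    mutations: Iterable[Mutation],
    predicate: Optional[Callable[[Mutation], bool]] = None,
) -> Iterator[Mutation]:
    """Hypothetical producer: sends only events matching the filter agreed
    at handshake. predicate=None models the current get-everything stream."""
    for m in mutations:
        if predicate is None or predicate(m):
            yield m


bucket = [
    Mutation("user::1", "user"),
    Mutation("item::7", "item"),
    Mutation("user::2", "user", deleted=True),
    Mutation("item::9", "item"),
]

# Current behaviour: everything is transferred, the client filters afterwards.
received_all = list(producer_stream(bucket))
users_client_side = [m for m in received_all if m.doc_type == "user"]

# Push-down: the filter runs on the producer, so only user events are sent.
users_pushed_down = list(producer_stream(bucket, lambda m: m.doc_type == "user"))
```

Both approaches end up with the same user events, but in the push-down case only those events cross the wire and get deserialized on the client.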

I would be happy to start a discussion on this improvement and on whether it is feasible on the server side. Also, I’m willing to contribute to the Java client library if the idea ever materializes.

@david.nault

Cheers,
Zoltán


#2

Thanks for the comments and feedback.

We launched the Eventing Service in the Couchbase Server 5.5 release: https://blog.couchbase.com/eventing/

The Eventing Service essentially productises DCP and offers better reliability and performance guarantees. Functions allow you to bring the business logic closer to the database.

As part of the Eventing Service, we intend to launch Feeds, which will allow you to consume events using an SDK. Do have a look at the spec published on our GitHub repo.


#3

Hi Zoltán,

I asked some Couchbase Server experts to take a look at this thread, and the consensus is there’s something exciting in the pipeline for the use case you described.

In addition to Eventing (which as Venkat mentioned is available now!), there’s ongoing work on a Collections feature that will allow grouping documents within a bucket. For example, you’ll be able to organize documents into a users collection and an items collection, and then open a DCP stream that only receives events from a specific collection.

The Collections work is being tracked as MB-16181. It’s been baking for a long time. According to the Jira ticket, it’s currently scheduled for inclusion in Mad Hatter (not the next major release, but the one after).

Thanks,
David


#4

@venkat & @david.nault thanks for the update.

I have looked into the description of the Eventing Service in the Couchbase Server 5.5 release. Please correct me if I’m drawing the wrong conclusions here, but the feature is very close to a traditional relational DB feature called triggers (and their corresponding procedures). From a technical point of view, the use cases I described all involve at least one join and/or scatter-gather (key-grouping) step, as well as stateful operations with reliable fault tolerance, all of which a data processing engine provides.

My idea is that the Eventing Service combined with Kafka queues might do the trick, where the Eventing Service routes mutations to their corresponding “collection”-based topic. One drawback is that a Kafka queue will not truncate mutation history the way DCP does. Still, reading from the start_seqno is required to bootstrap the user state as in UC1.
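
The routing step could look roughly like the Python sketch below. It is purely illustrative: the `cb.<type>` topic naming, the `type` field, and the `topic_for` and `route` helpers are assumptions, and an in-memory dict stands in for the Kafka topics, so no Eventing or Kafka API is used.

```python
from collections import defaultdict
from typing import Any, Dict, Iterable, List, Tuple

Event = Tuple[str, Dict[str, Any]]  # (document key, document body)


def topic_for(doc: Dict[str, Any], prefix: str = "cb") -> str:
    """Derive a per-type topic name; documents without a type go to 'unknown'."""
    return f"{prefix}.{doc.get('type', 'unknown')}"


def route(mutations: Iterable[Event]) -> Dict[str, List[Event]]:
    """Group mutations by topic, preserving arrival order within each topic."""
    topics: Dict[str, List[Event]] = defaultdict(list)
    for key, doc in mutations:
        topics[topic_for(doc)].append((key, doc))
    return dict(topics)


routed = route([
    ("user::1", {"type": "user", "name": "Ann"}),
    ("item::7", {"type": "item", "title": "Book"}),
    ("user::1", {"type": "user", "name": "Ann B."}),
])
```

A consumer for UC1 would then subscribe to the user topic only. Because nothing is truncated in this model, it sees both versions of `user::1` and has to keep the latest one itself.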

MB-16181 sounds great, but it seems there is much work to do.