Feature request: Elasticsearch Connector should be able to filter documents

We are increasingly faced with the requirement to maintain separate Elasticsearch indexes for the Couchbase documents of a certain bucket, which contains roughly 240 million documents.

The current Elasticsearch Connector (4.2.2-SNAPSHOT) [1] does not allow Couchbase documents to be filtered on their content before they are written to Elasticsearch; the closest it offers is index routing by document ID, as in the following example: [2]

[[elasticsearch.type]]
  # Index can be inferred from document ID by including a capturing group
  # named "index". This example matches IDs that start with one or more
  # characters followed by "::". It directs "user::alice" to index "user",
  # and "foo::bar::123" to index "foo".
  regex = '(?<index>.+?)::.*'

We mostly have randomized document IDs, for example 32-character alphabetical strings, so ID-based routing does not help us. The attributes that classify a document are stored in the document itself, for example type, class, interestGroup and such.

It would be nice to have filters like type="News" AND interestGroup="Group57". They would not need to be expressed exactly like this in the configuration file at [2], but simple AND/OR combinations with exact matching would be great!
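
To make this concrete, here is a rough sketch of the kind of rule we have in mind, written as plain Java with Jackson rather than as connector configuration. It is purely illustrative: the FilterSketch class, its field() helper, and the example values are made up for this post and do not exist in the connector.

import java.util.function.Predicate;

import com.fasterxml.jackson.databind.JsonNode;

// Illustrative only: the kind of exact-match rule we would like to be able to
// express in the connector configuration, written here as plain Java.
final class FilterSketch {

  // Exact match on a single top-level field of the parsed document.
  static Predicate<JsonNode> field(String name, String expected) {
    return doc -> expected.equals(doc.path(name).asText());
  }

  // type = "News" AND interestGroup = "Group57"; Predicate.or() would cover OR cases.
  static final Predicate<JsonNode> EXAMPLE_RULE =
      field("type", "News").and(field("interestGroup", "Group57"));
}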

  • Has anything like this ever been considered?
  • There is a slight chance that we missed something and a feature like this already exists; does it?
  • If anywhere, I would probably place the filter right here [3], rather than letting the event go on to the eventSink. What do you think? @david.nault

[1] https://github.com/couchbase/couchbase-elasticsearch-connector
[2] https://github.com/couchbase/couchbase-elasticsearch-connector/blob/765cf72351719ebbfa52bf437a5c470ca8bb2eb8/src/dist/config/example-connector.toml#L162
[3] https://github.com/couchbase/couchbase-elasticsearch-connector/blob/765cf72351719ebbfa52bf437a5c470ca8bb2eb8/src/main/java/com/couchbase/connector/dcp/DcpHelper.java#L144

Hi Zoltan,

This is a reasonable thing to want to do. The Database Change Protocol (DCP) used by the connector places some limitations on us, though. Deletion notifications do not include the content of the deleted document, which means the connector would not be able to route deletions to the correct index. In the past we’ve accommodated this by adding an “ignoreDeletes” flag to the definition and requiring it to be set to true for all rules that need to inspect the document content.

Does your use case require that deletions propagate to Elasticsearch, or would you be happy with setting ignoreDeletes=true for types whose ES index is determined by content fields?

Thanks,
David

Hi David,

There are no deletes in our use cases. Where they do occur, deletes are managed yearly by dropping the last year’s data and persisting it to something like HDFS; reloading the whole index yearly is acceptable. Ignoring deletes is fine with us.

I assume that when a DeleteMutation hits an index that has no matching document to delete (because the corresponding UpdateMutation was previously filtered out), it fails silently. So even with a small number of DeleteMutations failing silently, this would still be practical. Does that sound reasonable?

Would this be a big change/feature? We could help with developing and testing it right away.

P.S.: Will DCP support Couchbase collections eventually or will it stay as a v-bucket-level protocol? (I have no idea how collections are being implemented.)

Thanks,
Zoltán

Hi Zoltan,

Just wanted to check in. I can’t make a commitment about the timeline, but I do think it would make sense to add this feature to the connector. It seems like a good use for some “json stream matching” code we’ve got sitting on a shelf.

Turns out we’re already tracking this feature request as CBES-146. I’ve reprioritized it to target the June 16th release. That might be optimistic, but we’ll see. I’ll ping you as soon as it’s ready to play with.

Will DCP support Couchbase collections eventually or will it stay as a v-bucket-level protocol?

Yes, support for collections is the next big feature we’re adding to the connectors.

Thanks,
David

@david.nault we started a quick implementation that is a kind-of-hackish workaround to enable filtering for a single use case. The solution is not configurable, but it is easy to deploy: we follow the principle of “store configuration & deployment in Git”, so a code change is built and rolled out to Kubernetes within a minute.

What we have is the following:

if (filteringEnabled && DcpMutationMessage.is(event)) {
  [...]
  try {
    byte[] dataBuffer = new byte[DcpMutationMessage.content(event).readableBytes()];
    DcpMutationMessage.content(event).readBytes(dataBuffer);
    data = new String(dataBuffer, StandardCharsets.UTF_8);
    JsonParser jParser = jFactory.createParser(data);
    while (jParser.nextToken() != JsonToken.END_OBJECT) {
      [...] // here we set a boolean that decides whether the event should go to the sink or not
    }
  }
  [...]
}

At the end when the sink is not used, we make sure to call:

ackAndRelease(flowController, event);

But after a while, the DCP client stops calling the dataEventHandler. I suspect that the buffer acknowledgment watermark is reached, so the server stops sending messages to the client.

What are we missing here? We use the exact same technique with Spark, where we use DCP to read all Couchbase documents into an RDD, and it works fine. In the Spark use case we read the event payload into a String, parse it with a JSON4S pull parser, and filter events before storing them in Spark. Events are released using the same technique.

When filtering is disabled in our patch, the Connector works fine and keeps receiving events.

Thanks!

Hi Zoltan,

Where in the connector was that filtering code added?

Thanks,
David

We added it to DcpHelper as follows:

public static void initDataEventHandler(Client dcpClient, Consumer<Event> eventSink, SnapshotMarker[] snapshots) {
    dcpClient.dataEventHandler((flowController, event) -> {
      boolean letItPass = true;
      boolean eventReleased = false;
      if (filteringEnabled && DcpMutationMessage.is(event)) {
        String data = "";

        try {
          byte[] dataBuffer = new byte[DcpMutationMessage.content(event).readableBytes()];
          DcpMutationMessage.content(event).readBytes(dataBuffer);
          data = new String(dataBuffer, StandardCharsets.UTF_8);
          JsonParser jParser = jFactory.createParser(data);
          while (jParser.nextToken() != JsonToken.END_OBJECT) {
            String fieldName = jParser.getCurrentName();

            // update letItPass

            if (!letItPass) {
              break;
            }
          }
        } catch (Exception e) {
          e.printStackTrace();
          LOGGER.warn("Could not read event bytes of type '{}'", event.readableBytes() > 0 ? event.getByte(1) : "<zero length>");
          ackAndRelease(flowController, event);
          eventReleased = true;
          letItPass = false;
        }
      }

      if (letItPass) {
        if (DcpMutationMessage.is(event) || DcpDeletionMessage.is(event) || DcpExpirationMessage.is(event)) {
          final short vbucket = MessageUtil.getVbucket(event);
          final long vbuuid = dcpClient.sessionState().get(vbucket).getLastUuid();

          final Event e = new Event(event, flowController, vbuuid, snapshots[vbucket]);
          //LOGGER.trace("GOT DATA EVENT: {}", e);
          LOGGER.info("accepting to sink");
          eventSink.accept(e);

        } else {
          LOGGER.warn("Unexpected data event type '{}'", event.readableBytes() > 0 ? event.getByte(1) : "<zero length>");
          ackAndRelease(flowController, event);
        }
      } else if (!eventReleased) {
        LOGGER.info("do not pass but also not released. releasing");
        ackAndRelease(flowController, event);
      } else {
        throw new RuntimeException("Not ACKd!");
      }
    });
}

I have to remark that, during the operation of this filtering DCP client, the code has so far never reached the eventSink. All events end up in the LOGGER.info("do not pass but also not released. releasing"); branch, but the DCP client stops after a certain number of events and prints:

19:00:03.803 [nioEventLoopGroup-2-1] INFO  c.c.c.d.CheckpointService - Remaining backfill by vbucket: [1379416, 1357596, 1349119, 1413314, 1360039, 1387226, 1451 [...]

…where the numbers did not change in 24 hours, so the DCP client really has stopped.

Thanks!

Hi Zoltan,

You can verify whether flow control (buffer ack) is the issue by disabling it when building the DCP client. In DcpHelper.newClient() try commenting out these lines:

.mitigateRollbacks(
    config.dcp().persistencePollingInterval().duration(),
    config.dcp().persistencePollingInterval().timeUnit())
.flowControl(toIntOrDie(config.dcp().flowControlBuffer().getBytes()))
.bufferAckWatermark(60);

That will cause the server to keep streaming data regardless of whether the DCP client has ack’d previous messages. The result of that experiment might tell us where to focus next.

Some other things to consider:

That log message from the CheckpointService is far downstream of the DCP client. The backfill statistics only get updated after a message reaches the connector sink. If all of your messages are being filtered out, the backfill numbers won’t change.

I would recommend changing catch (Exception e) to catch (Throwable e), just to be sure nothing is slipping through.

Until the problem is uncovered, I’d recommend parsing the JSON using an ObjectMapper instead of the streaming API. That could make it easier to verify that the filtering logic is working as intended.
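
As a rough sketch of that suggestion (not connector code; the helper name, field names, and expected values below are placeholders), the token loop in your handler could be replaced with something like:

import java.io.IOException;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

// A single shared ObjectMapper is fine; it is thread-safe once configured.
private static final ObjectMapper MAPPER = new ObjectMapper();

// Parse the mutation content once and apply an exact-match filter to it.
private static boolean passesFilter(byte[] documentJson) throws IOException {
  JsonNode doc = MAPPER.readTree(documentJson);
  return "News".equals(doc.path("type").asText())              // placeholder field/value
      && "Group57".equals(doc.path("interestGroup").asText()); // placeholder field/value
}

In the handler you would then set letItPass = passesFilter(dataBuffer), using the bytes you already read from DcpMutationMessage.content(event).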

Do you see any log messages indicating at least some documents are passing the filter?

Thanks,
David

Hi David,

I have applied your suggestions to the filtering deployment and am now seeing the following messages:

19:22:44.367 [nioEventLoopGroup-5-2] INFO  c.c.c.d.DcpHelper - do not pass but also not released. releasing
19:22:44.367 [nioEventLoopGroup-5-2] INFO  c.c.c.d.DcpHelper - do not pass but also not released. releasing
19:22:44.367 [nioEventLoopGroup-5-1] INFO  c.c.c.d.DcpHelper - do not pass but also not released. releasing
19:22:44.367 [nioEventLoopGroup-5-2] INFO  c.c.c.d.DcpHelper - do not pass but also not released. releasing
19:22:44.367 [nioEventLoopGroup-5-1] INFO  c.c.c.d.DcpHelper - do not pass but also not released. releasing

I know that, based on the documents and the filters I have now applied, the first 50% of the Couchbase history should be filtered out. Let’s see how far it gets. After that first 50%, documents should start reaching the sink.

In the meantime, I noticed that you described the CheckpointService as dependent on the sink receiving messages. Could this lead to a refactor of CheckpointService so that events which never reach the sink are still accounted for?

Thanks,
Zoltán

Hi David,

We could not get it to work so far; we still have the same problem, where the DCP stream stops after a while.

As a quick solution, we are going to create a separate bucket for those documents and then run a full DCP stream from it to Elasticsearch.

Thanks for your kind help. We hope to help with and test this feature whenever you have time to make progress on it.

Zoltán

Hi Zoltan,

Shucks, that’s disappointing. I’m glad you were able to find a temporary workaround though. I’ll keep you posted as we progress with CBES-146.

Thanks,
David
