How to ignore errors in Sink Kafka Connector

I created a sink connector from Kafka to Couchbase.
It wrote all events to Couchbase successfully until an event with an empty key arrived.

At that point the task failed and the connector got stuck.

Error in the connector task:

org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception

Caused by: com.couchbase.client.core.error.InvalidArgumentException: Id cannot be null or empty
    at com.couchbase.client.core.error.InvalidArgumentException.fromMessage(InvalidArgumentException.java:28)
    at com.couchbase.client.core.util.Validators.notNullOrEmpty(Validators.java:70)
    at com.couchbase.client.core.util.Validators.notNullOrEmpty(Validators.java:78)
    ... 35 more

I want to ignore that error (and its event) and continue with the other events.

I tried these configs, but they didn't work.

"errors.tolerance": "all",
"errors.log.enable": true,
"errors.log.include.messages": true,

How can I achieve this?

Hi @sinkconnector,

One solution is to apply a Single Message Transform (SMT) to the sink connector. An SMT can transform (or filter out) a message before it reaches the sink connector.

The Couchbase Kafka connector GitHub repo has an example project that shows how to build a custom SMT (among other components). With that project as a starting point, you could add something like this:

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.transforms.Transformation;

import java.util.Map;

public class DropIfKeyIsEmptyString<R extends ConnectRecord<R>> implements Transformation<R> {

  public static final String OVERVIEW_DOC =
      "Propagate a record only if its key is not an empty string.";

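  @Override
  public R apply(R record) {
    // Non-String keys (including null) pass through unchanged.
    if (!(record.key() instanceof String)) {
      return record;
    }

    // Returning null from a transformation drops the record.
    String key = (String) record.key();
    return key.isEmpty() ? null : record;
  }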

  @Override
  public ConfigDef config() {
    return new ConfigDef();
  }

  @Override
  public void close() {
  }

  @Override
  public void configure(Map<String, ?> configs) {
  }
}
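
To enable a transform like this on the sink connector, the connector config would need entries along the following lines. This is only a sketch: the alias dropEmptyKey and the package com.example are placeholders I made up, not names from the example project, and the compiled class has to be in a JAR on the Connect worker's plugin path.

```properties
# Hypothetical alias for the transform; any name works.
transforms=dropEmptyKey
# Assumed fully qualified class name; replace com.example with your own package.
transforms.dropEmptyKey.type=com.example.DropIfKeyIsEmptyString
```

If the connector is configured with JSON, as in the question, the same keys and values apply as quoted string pairs.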

If you’d rather use off-the-shelf components, the comments on this issue filed against the Kafka Elasticsearch connector describe an alternate workaround:

The trick is to use the built-in org.apache.kafka.connect.transforms.Filter SMT with a predicate from the third-party Kafka Connect JMESPath plugin. The config in the comments shows how to filter out null keys. I imagine it could be adapted to look for empty string keys instead of null keys.

Thanks,
David

Thanks a lot @david.nault
