Start from state

Hi, everyone,

I am using the Couchbase Kafka connector (https://github.com/couchbase/couchbase-kafka-connector) and I am trying to start the connector from a defined point and keep it running after that.

ConnectorState start = connector.startState();
ConnectorState end = connector.endState();
connector.run(start, end);

This starts from the very beginning and works fine. But when the code changes to:

ConnectorState start = connector.startState(766);
ConnectorState end = connector.endState();
connector.run(start, end);

It throws an exception:

java.lang.IllegalArgumentException: partitions in FROM state do not match partitions in TO state

Any ideas how to solve it?

Thanks in advance
Juan Manuel

P.S.: The state of the connector is saved in a file; we implemented our own StateSerializer.

Hi Juan,

The code throwing that exception expects the start and end states to be identical. I suspect that means this feature was not fully implemented in the legacy kafka connector. Perhaps @avsej can confirm?

By the way, the couchbase-kafka-connector project has been superseded by the new Kafka connector which integrates with the Kafka Connect framework: https://github.com/couchbase/kafka-connect-couchbase . I would recommend switching over if you can, since that’s where all new development effort is focused.

There’s an enhancement request for adding start/stop controls to the new connector. Could I ask you to take a look at the Jira issue and comment on whether any of the proposed solutions meet your requirements?

Thanks,
David

Hi David,

Thanks for the reply! We are already using the old version and it is working great. The only problem we can't solve at this point is that if something fails for any reason, we want to restart from the last consistent point. That is why we are saving the state.

The new implementation is a little complicated to follow, from what we have read so far.

If you or @avsej could guide us to complete this feature, we’d really appreciate it.

Thanks in advance
Juan Manuel

One nice feature of the new version: on restart it automatically resumes at the point where it left off, so it’s not necessary to manually track the session state. Is this the behavior you’re trying to implement? If you’re unable to upgrade, perhaps you can borrow the code that does this. (I’m not too familiar with this aspect of the connectors, so I can’t say how easy or difficult it would be.)

I appreciate your feedback about the new connector. Are there specific things you found difficult to implement or hard to understand about the new connector? Hopefully we can make those things simpler.

David,

The old version of the connector is very simple to implement: it is only necessary to implement a filter, an encoder, and a state serializer.

The code with the problem is this: code

I am going to give the new connector version another try. Is it possible to control where the state is saved? We want to save the state to a file and/or to ZooKeeper.

Thanks!

With the new connector, the state is managed automatically by the Kafka Connect framework. If you search the Kafka Connect documentation for “offset.storage” you’ll see how to configure the framework to store the offsets (state) in a file or in a Kafka topic.
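
For instance, in standalone mode the offsets go to a local file, and in distributed mode they go to a Kafka topic. Here's a rough sketch of the relevant worker properties (the file path and topic name are just placeholders, not values from your setup):

# Standalone worker config: keep offsets in a local file
offset.storage.file.filename=/tmp/connect-couchbase.offsets

# Distributed worker config: keep offsets in a Kafka topic instead
offset.storage.topic=connect-offsets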

David, thanks for pointing out that feature.

The problem we are trying to solve is reading four Couchbase buckets and sending the data to a single Kafka topic. Right now we have four instances of the old Couchbase Kafka connector running and sending data to Kafka. That data is read by a consumer that normalizes it and saves it in Elasticsearch.

That is why we have to save each connector's state somewhere other than the Kafka topic.

Thanks in advance
Juan Manuel

The new connector should handle that situation just fine.

If you want to use filesystem storage, it’s possible to configure each connector instance to use a different offset storage file.
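
For example, if you run one standalone worker per bucket, each worker's properties file can simply point at its own offset file; a sketch (the paths are placeholders):

# worker for the first bucket
offset.storage.file.filename=/tmp/connect-bucket-a.offsets

# worker for the second bucket
offset.storage.file.filename=/tmp/connect-bucket-b.offsets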

If you want to use the Kafka storage, the bucket name is part of the namespace for the offset storage. As long as the connectors all read from different buckets, their offset storage will be separate.

If you wanted multiple connectors to read from the same bucket, there's an additional config property compat.connector_name_in_offsets that must be set to true. But it sounds like that should not be necessary in your case, since each connector instance reads from a different bucket.
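
To make that concrete, here is a rough sketch of two connector configurations reading from different buckets and publishing to the same topic. I'm quoting the property names from memory of the 3.x quickstart config, and the bucket, topic, and address values are placeholders, so please double-check everything against the example config that ships with the connector:

name=couchbase-source-bucket-a
connector.class=com.couchbase.connect.kafka.CouchbaseSourceConnector
connection.cluster_address=127.0.0.1
connection.bucket=bucket-a
topic.name=shared-topic

name=couchbase-source-bucket-b
connector.class=com.couchbase.connect.kafka.CouchbaseSourceConnector
connection.cluster_address=127.0.0.1
connection.bucket=bucket-b
topic.name=shared-topic

# Only needed if two connectors read from the SAME bucket:
#compat.connector_name_in_offsets=true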

Again, I’m not completely familiar with the features and limitations of the old connector, so it’s possible I’ve misunderstood your situation. If that’s the case, let me know :slight_smile:

David,

You have understood our situation perfectly. Are you going to release a new version (with a tag) any time soon? We would like to use a tagged version of the connector instead of the master branch.

One more question: where can we configure the Kafka and ZooKeeper locations? Could you include an example of how to run the connector? I am not very good with Java (something like the example in the old connector).

Thanks
Juan Manuel

Hi Juan,

Version 3.2.0 was tagged today. The official release will be available on the Couchbase download page as soon as the documentation is online; we’re currently aiming for this Friday.

In the meantime I would encourage you to follow along with the quickstart guide on GitHub, which shows how to run the connector.

The Kafka and Zookeeper locations are specified when you start the connector, in the properties file that is the first argument to the connect-standalone (or connect-distributed) command. This is standard Kafka Connect stuff, which you can read about in the Kafka Connect documentation.
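
For example, with the standalone worker (and the connector jar on the worker's classpath), the invocation looks roughly like this, where the second file is your connector configuration and both file names are placeholders:

bin/connect-standalone.sh config/connect-standalone.properties config/quickstart-couchbase.properties

The Kafka broker address goes in the worker properties file (the first argument) as the usual bootstrap servers list, for example:

bootstrap.servers=localhost:9092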

Cheers,
David

The official release of Kafka Connect Couchbase 3.2.0 is now available.