Couchbase Kafka Connector issue

Hi,
I am using the Couchbase Kafka connector to capture CDC events into a Confluent Kafka topic, and I get the configuration error below.
Any help is greatly appreciated
[2020-01-30 17:52:30,729] ERROR WorkerConnector{id=test-couchbase-source} Error while starting connector (org.apache.kafka.connect.runtime.WorkerConnector)
kafka-connect_1 | org.apache.kafka.connect.errors.ConnectException: Cannot fetch configuration for bucket CentralDB
kafka-connect_1 | at com.couchbase.connect.kafka.CouchbaseSourceConnector.start(CouchbaseSourceConnector.java:60)
kafka-connect_1 | at org.apache.kafka.connect.runtime.WorkerConnector.doStart(WorkerConnector.java:111)
kafka-connect_1 | at org.apache.kafka.connect.runtime.WorkerConnector.start(WorkerConnector.java:136)
kafka-connect_1 | at org.apache.kafka.connect.runtime.WorkerConnector.transitionTo(WorkerConnector.java:196)
kafka-connect_1 | at org.apache.kafka.connect.runtime.Worker.startConnector(Worker.java:231)
kafka-connect_1 | at org.apache.kafka.connect.runtime.distributed.DistributedHerder.startConnector(DistributedHerder.java:908)
kafka-connect_1 | at org.apache.kafka.connect.runtime.distributed.DistributedHerder.access$1300(DistributedHerder.java:110)
kafka-connect_1 | at org.apache.kafka.connect.runtime.distributed.DistributedHerder$15.call(DistributedHerder.java:924)
kafka-connect_1 | at org.apache.kafka.connect.runtime.distributed.DistributedHerder$15.call(DistributedHerder.java:920)
kafka-connect_1 | at java.util.concurrent.FutureTask.run(FutureTask.java:266)
kafka-connect_1 | at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
kafka-connect_1 | at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
kafka-connect_1 | at java.lang.Thread.run(Thread.java:748)

Hi Pramod,

The “Cannot fetch configuration for bucket” error occurs when the connector is unable to connect to any of the seed nodes on port 8091 using the credentials from the config file. I think it might also happen if the bucket does not exist. There might be more info about the root cause earlier in the log.

I’d recommend double-checking the following config properties to make sure they’re correct for your environment. Try using admin credentials to see if that’s the issue (then scale back as appropriate). A quick sanity check outside the connector is sketched after the list.

  • connection.cluster_address
  • connection.username
  • connection.password
  • connection.bucket
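
If you want to rule those out quickly, you can query the cluster’s REST interface on port 8091 directly. A sketch with placeholder values; substitute your own address, credentials, and bucket name:

# Placeholder address, credentials, and bucket name -- substitute your own.
curl -u Administrator:password \
  http://127.0.0.1:8091/pools/default/buckets/CentralDB

A JSON response means the address, credentials, and bucket are all good; an HTTP 401 points at the credentials, and a “not found” style response suggests the bucket name is wrong.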

Thanks,
David

The connection.cluster_address is "127.0.0.1"; I am running everything locally. To give more background, I am using a dockerized Confluent Kafka that pulls the Couchbase Kafka jar as a plugin. Confluent provides a REST API for injecting configuration into connectors. I am using the POST /connectors API (http://localhost:18083/connectors) with the config below:
{
  "name": "test-couchbase-source",
  "config": {
    "name": "test-couchbase-source",
    "connector.class": "com.couchbase.connect.kafka.CouchbaseSourceConnector",
    "tasks.max": "2",
    "topic.name": "test-source",
    "connection.cluster_address": "127.0.0.1",
    "connection.timeout.ms": "2000",
    "connection.bucket": "CentralDB",
    "connection.username": "Administrator",
    "connection.password": "password",
    "use_snapshots": "false",
    "dcp.message.converter.class": "com.couchbase.connect.kafka.handler.source.DefaultSchemaSourceHandler",
    "event.filter.class": "com.couchbase.connect.kafka.filter.AllPassFilter",
    "couchbase.stream_from": "SAVED_OFFSET_OR_BEGINNING",
    "couchbase.compression": "ENABLED",
    "couchbase.flow_control_buffer": "128m",
    "couchbase.persistence_polling_interval": "100ms"
  }
}

However, the configuration is not getting injected. Am I missing something?

Can you please give some more details about how you’re making the POST request, what the response is, and any other details you think might be relevant?

There’s also a third-party Kafka Connect CLI tool that you might find useful. It takes a lot of the pain out of managing connectors via the HTTP interface.

I make the POST call to the Confluent Kafka Connect REST API at http://localhost:18083/connectors/. The payload to the REST API is below:
{
  "name": "test-couchbase-source",
  "config": {
    "name": "test-couchbase-source",
    "connector.class": "com.couchbase.connect.kafka.CouchbaseSourceConnector",
    "tasks.max": "2",
    "topic.name": "test-source",
    "connection.cluster_address": "127.0.0.1",
    "connection.timeout.ms": "2000",
    "connection.bucket": "CentralDB",
    "connection.username": "Administrator",
    "connection.password": "password",
    "use_snapshots": "false",
    "dcp.message.converter.class": "com.couchbase.connect.kafka.handler.source.DefaultSchemaSourceHandler",
    "event.filter.class": "com.couchbase.connect.kafka.filter.AllPassFilter",
    "couchbase.stream_from": "SAVED_OFFSET_OR_BEGINNING",
    "couchbase.compression": "ENABLED",
    "couchbase.flow_control_buffer": "128m",
    "couchbase.persistence_polling_interval": "100ms"
  }
}
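
To be concrete, the request I’m making is equivalent to this curl call (the file name is just an example for holding the payload above):

curl -X POST http://localhost:18083/connectors \
  -H "Content-Type: application/json" \
  --data @couchbase-source-config.json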
The Docker Compose file I run is attached below.

version: '2.1'

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:5.2.1
    hostname: zookeeper
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
    ports:
      - 2181:2181

  kafka:
    image: confluentinc/cp-kafka:5.2.1
    hostname: kafka
    environment:
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka:19091,LISTENER_DOCKER_EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9091
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL
      KAFKA_BROKER_ID: 1
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    ports:
      - 9091:9091
    depends_on:
      - zookeeper

  kafka-connect:
    # image: confluentinc/cp-kafka-connect:5.1.1
    build: .
    hostname: kafka-connect
    environment:
      CONNECT_BOOTSTRAP_SERVERS: "kafka:19091"
      CONNECT_REST_PORT: 8083
      CONNECT_GROUP_ID: kafka-connect-local
      CONNECT_CONFIG_STORAGE_TOPIC: kafka-connect-local-config
      CONNECT_OFFSET_STORAGE_TOPIC: kafka-connect-local-offset
      CONNECT_STATUS_STORAGE_TOPIC: kafka-connect-local-status
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
      # CONNECT_OFFSET_STORAGE_PARTITIONS: 1
      CONNECT_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_KEY_CONVERTER_SCHEMAS_ENABLE: "true"
      CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE: "true"
      CONNECT_INTERNAL_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_INTERNAL_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_REST_ADVERTISED_HOST_NAME: 'kafka-connect'
      CONNECT_PLUGIN_PATH: /usr/share/java,/etc/kafka-connect/jars
    ports:
      - 18083:8083
    depends_on:
      - zookeeper
      - kafka
**Couchbase is installed locally on my machine, not as a Docker container.**
The issue occurs when I try to inject the Couchbase config above using the REST API endpoint provided by Confluent. I also suspect the issue may be related to what the dockerized Confluent stack considers localhost versus what Couchbase considers localhost. Any help is greatly appreciated. Is there a way I can call you?

Ah, yes. That’s likely the problem. The connector running inside the Docker network environment cannot access Couchbase Server running outside the Docker environment. This limitation is not specific to the connector; it’s just how Docker networking operates. On certain operating systems there might be a way to configure Docker so the services can all see each other, but that’s not something I know about.
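
One possibility, though I haven’t verified it myself: Docker Desktop on Mac and Windows exposes the host machine to containers under the special DNS name host.docker.internal, so it might be worth trying

"connection.cluster_address": "host.docker.internal"

in the connector config. Be aware that Couchbase may still advertise node addresses the container can’t reach, so treat this as an experiment rather than a guarantee.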

Is there a way I can call you?

For support beyond the “best effort” provided in the Forums, Couchbase Technical Support is probably the best escalation path.

As for solving the problem, the alternatives I’ve used in the past are:

A) Running Couchbase in Docker as well. It doesn’t need to be in the same Docker Compose file – it just needs to use the same Docker network as the Kafka containers. Here’s a demo project that shows how to do that.
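
As a rough sketch (the image tag is just an example), a Couchbase service in the same Compose file could look like this, with the connector then using connection.cluster_address: couchbase:

  couchbase:
    image: couchbase:community-6.0.0
    hostname: couchbase
    ports:
      - 8091-8094:8091-8094   # web console / REST, views, query, search
      - 11210:11210           # data service (used by DCP streaming)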

B) Running Kafka on your local system without Docker. The Confluent CLI makes it really easy to start up the various Kafka services on your dev system.
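
The exact commands have changed across CLI versions, but if I remember right, in the 5.x CLI of that era starting a local Connect worker and everything it depends on was a one-liner:

confluent local start connect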

Thanks,
David

David
I somehow got the connector working, and it’s streaming CDC captures to my Kafka topic on Confluent Kafka. However, when I read the topic with a consumer, the payload content comes through in byte format, not human-readable. Is there a setting I’m missing to view it in JSON format?

{ topic: 'SharedKafka2_150119797_couchbase-connect-poc',
value:
'{"event":"mutation","partition":23,"key":"asset::115217032","cas":1581023894824091648,"bySeqno":1407,"revSeqno":5,"expiration":0,"flags":33554438,"lockTime":0,"content":"{
  "accessTokens": {
    "e1": [
      "004b7276-b5d9-422b-9be3-07b180ec6999"
    ],
    "e2": [
      "f38aded4-3f86-47a3-ac6f-d7cc0584cd87"
    ],
    "e3": [
      "f033c478-2c85-4cf4-98f6-86852d278b27"
    ]
  },
  "actualExitDate": "2008-04-14T08:00:00.000Z",
  "aliasNames": [],
  "applicationEntryMode": "Production",
  "applicationStatus": "Approved",
  "applicationURL": [],
  "assetId": 115217032,
  "banks": [
    "AENBN"
  ],
  "businessTGovernanceIndicator": false,
  "capabilities": [],
  "channels": [],
  "createdDate": "2008-04-26T08:00:00.000Z",
  "customers": [],
  "dataEntities": [],
  "dataInterfaces": [],
  "dataLevelImpact": "Medium",
  "description": "Trigger reporting sends e-mails to ES client managers to alert changes in merchant auth submissions charge volume fraud chargebacks that are out of pattern  The platform runs on ASP NET and MS SQL Server Reporting Services and uses MIDW as the primary data source  Authorization data is manually loaded from CAS sdhshdsjdjsdh 999999 ",
  "docType": "asset",
  "exitReason": "Decommission",
  "financialResponsibility": "",
  "history": [
    {
      "changes": [
        {
          "field": "Created Date",
          "newValue": [
            "2008-04-26T08:00:00.000Z"
          ],
          "oldValue": []
        }
      ],
      "eventType": "New Application",
      "lastUpdateTimeStamp": "2008-04-26T08:00:00.000Z",
      "lastUpdatedBy": "Central"
    }
  ],
  "hostingSite": [],
  "isBranded": false,
  "key": "asset::115217032",
  "lastUpdateTimeStamp": "2016-11-17T02:32:53.691Z",
  "legacyapplicationEntryMode": "Production",
  "legacylifeCycleStatus": "Exit",
  "lifeCycleStatus": "Exit",
  "lineOfBusiness": {},
  "managedBy": [
    "American Express Technology"
  ],
  "name": "ES - TRIGGER REPORTING - NA",
  "notifications": {
    "email": true,
    "workGroupNotificationList": []
  },
  "ownershipInfo": {
    "applicationManager": {
      "band": 40,
      "email": "Sam.Baker@aexp.com",
      "fullName": "Sam Baker",
      "guid": "0dbc6d5ea35dde75871a4332456c568d"
    },
    "applicationOwner": {
      "band": 40,
      "email": "Sam.Baker@aexp.com",
      "fullName": "Sam Baker",
      "guid": "0dbc6d5ea35dde75871a4332456c568d"
    },
    "applicationOwnerLeader1": {
      "email": "sastryvsm.durvasula@aexp.com",
      "guid": "8331d4044f23666ab3b870facfea210b"
    },
    "applicationOwnerLeader2": {
      "email": "Mike.Ruttledge@aexp.com",
      "guid": "3c32021e7eedae5c4bfa6323599c12a1"
    },
    "businessOwner": {
      "email": "lloyd.g.cato@aexp.com",
      "guid": "154daea0dab7bd2d3443984cb94ca9a9"
    },
    "businessOwnerLeader1": {
      "email": "lloyd.g.cato@aexp.com",
      "guid": "154daea0dab7bd2d3443984cb94ca9a9"
    },
    "ownerSVP": {
      "email": "Paul.Dottle@aexp.com",
      "guid": "1135ae72efeb26ddd721784c28bf6006"
    },
    "productionSupportOwner": {
      "band": 40,
      "email": "keith.a.burns@aexp.com",
      "fullName": "Keith A Burns",
      "guid": "263642168d3895fe1f58a4b907e1780f"
    },
    "productionSupportOwnerLeader1": {
      "email": "shane.a.thatcher@aexp.com",
      "guid": "77edf4f5336b5577a952d25d54bd10fd"
    }
  },
  "regionsSupported": [
    {
      "ISO3Code": "USA",
      "name": "US"
    }
  ],
  "regulatoryInformation": [
    {
      "questionCode": "RR02",
      "values": [
        "No"
      ]
    },
    {
      "questionCode": "RR03",
      "values": [
        "N/A"
      ]
    },
    {
      "questionCode": "RR04",
      "values": [
        "N/A"
      ]
    },
    {
      "questionCode": "RR05",
      "values": [
        "N/A"
      ]
    },
    {
      "questionCode": "RR06",
      "values": [
        "Yes"
      ]
    },
    {
      "questionCode": "RR07",
      "values": [
        "Yes"
      ]
    },
    {
      "questionCode": "RR08",
      "values": [
        "Yes"
      ]
    },
    {
      "questionCode": "RR09",
      "values": [
        "No"
      ]
    },
    {
      "questionCode": "RR10",
      "values": [
        "No"
      ]
    },
    {
      "questionCode": "RR11",
      "values": [
        "No"
      ]
    },
    {
      "questionCode": "RR12",
      "values": [
        "No"
      ]
    },
    {
      "questionCode": "RR13",
      "values": [
        "Pre-defined profiles based on job function or roles"
      ]
    },
    {
      "questionCode": "RR14",
      "values": [
        "Yes"
      ]
    },
    {
      "questionCode": "RR17",
      "values": [
        "Yes"
      ]
    },
    {
      "questionCode": "RR18",
      "values": [
        "0 to 3"
      ]
    },
    {
      "questionCode": "RR19",
      "values": [
        "Yes"
      ]
    },
    {
      "questionCode": "RR20",
      "values": [
        "Less than 100"
      ]
    },
    {
      "questionCode": "RR21",
      "values": [
        "Employees"
      ]
    },
    {
      "questionCode": "RR22",
      "values": [
        "Coarse"
      ]
    },
    {
      "questionCode": "RR23",
      "values": [
        "On Termination"
      ]
    },
    {
      "questionCode": "RR24",
      "values": [
        "No"
      ]
    },
    {
      "questionCode": "RR25",
      "values": [
        "No"
      ]
    },
    {
      "questionCode": "RR01PrimaryPII",
      "values": [
        "N/A"
      ]
    },
    {
      "questionCode": "RR01SecondaryPII",
      "values": [
        "N/A"
      ]
    },
    {
      "questionCode": "RR01AffiliationSDE",
      "values": [
        "N/A"
      ]
    },
    {
      "questionCode": "RR01Primary",
      "values": [
        "No"
      ]
    },
    {
      "questionCode": "RR01Secondary",
      "values": [
        "No"
      ]
    },
    {
      "questionCode": "RR01Affiliation",
      "values": [
        "No"
      ]
    },
    {
      "questionCode": "RR01",
      "values": [
        "No"
      ]
    }
  ],
  "risk": {
    "bia": "-1",
    "riskLevel": "Medium",
    "riskScore": 1,
    "rtr": -1
  },
  "securityInfoClassification": "AXP Restricted",
  "sourceCode": true,
  "techPlatformId": null,
  "technologyEnvironment": [
    "Distributed"
  ],
  "validationStatus": "Validated"
}","bucket":"CentralDB","vBucketUuid":243593261013489}',
offset: 4389,
partition: 3,
highWaterOffset: 4390,
key:
'{"schema":{"type":"string","optional":false},"payload":"asset::115217032"}' }


Hi Pramod,

I’m glad to hear you’re one step closer to the goal.

If you want the Kafka message to include the Couchbase document as un-encoded JSON, you can configure the connector to use RawJsonSourceHandler (or its cousin RawJsonWithMetadataSourceHandler) as described in the Publishing Raw Json Messages section of the quickstart guide.
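
In config terms, the switch is roughly this pair of properties (quoting from memory, so treat the quickstart guide as authoritative):

"dcp.message.converter.class": "com.couchbase.connect.kafka.handler.source.RawJsonSourceHandler",
"value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter"

RawJsonSourceHandler emits the document bytes as-is, so the ByteArrayConverter is needed to keep Connect from re-encoding them.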

Thanks,
David

It’s working for me. Two more questions:

  1. Is there an option to compress the data while it gets sent?
  2. Currently, even though for a particular key I am only updating 2 out of 10 fields, the event sends me the entire document with all 10 fields, including the 2 updated ones. Is there any option to only send the 2 changed fields as part of the message event?

Hi Pramod,

  1. Is there an option to compress the data while it gets sent?

The couchbase.compression config property controls whether documents are compressed when sent from Couchbase Server to the connector. It’s enabled by default. The connector always decompresses the document before writing it to the Kafka topic, regardless of this setting.

I haven’t tried this, but apparently Kafka supports compression, which can be enabled either in the Kafka producer client or the broker. I’m not sure if the Kafka Connect framework exposes enough of the client settings to enable compression on the producer (I doubt it, but I might be wrong). Compression can be enabled on the broker when the topic is created; see the compression.type topic configuration option.
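
For example (topic name, partition count, and codec are just illustrative), you could set it when creating the topic with the stock Kafka tooling:

kafka-topics --bootstrap-server localhost:9091 --create \
  --topic test-source --partitions 3 --replication-factor 1 \
  --config compression.type=snappy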

  2. Currently, even though for a particular key I am only updating 2 out of 10 fields, the event sends me the entire document with all 10 fields, including the 2 updated ones. Is there any option to only send the 2 changed fields as part of the message event?

No, I’m afraid not. This is just how the protocol used by the connector works.

Thanks,
David

A few more questions from a deployment perspective:

  1. We are on OpenShift on-prem with 3 nodes in the primary data center and 3 nodes in the backup data center.
    Will I end up attaching 1 connector to each node in the primary data center (3 total)? If yes, when a CDC event is triggered from Couchbase, will all the connectors pointing to the 3 nodes process the same event and send 3 messages to the configured topic, or does only one message get sent? Is there any specific configuration I need for this scenario?

Hi Pramod,

One setting you might want to tweak is tasks.max.

If you’re running a single connector worker in standalone mode, that one worker talks to all 3 nodes in the Couchbase cluster, and you can optimize resource usage by setting the tasks.max config property to 1.

If you’re deploying the connector in distributed mode, each distributed worker will handle a subset of the Couchbase partitions (also known as “virtual buckets” or “vbuckets”); each Couchbase change event will be sent to only one of the workers. In this scenario it’s recommended to increase the tasks.max configuration to take advantage of all of the worker nodes. If you want all worker nodes to participate in running the connector, set tasks.max equal to the number of worker nodes.
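
For your 3-node primary data center, assuming one Connect worker per node, that would mean just this fragment in the connector config:

"tasks.max": "3"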

One connector can only talk to one Couchbase cluster. I’m not sure how to deal with the backup Couchbase cluster; that’s a big architectural question. Our support team might have some ideas, and would be the best people to contact if you want to dive deeper into this subject.

Thanks,
David

Hi Pramod,

I am also getting the same error "org.apache.kafka.connect.errors.ConnectException: Cannot fetch configuration for bucket start_connector_bucket", where start_connector_bucket is my bucket name.

Request body:

{
  "name": "abc-3",
  "config": {
    "connector.class": "com.couchbase.connect.kafka.CouchbaseSourceConnector",
    "tasks.max": 2,
    "connection.cluster_address": "couchbase-service:8091",
    "connection.bucket": "start_connector_bucket",
    "topic.name": "start_connector_topic",
    "connection.username": "Administrator",
    "connection.passowrd": "Administrator",
    "dcp.message.converter.class": "com.couchbase.connect.kafka.handler.source.DefaultSchemaSourceHandler",
    "value.converter": "org.apache.kafka.connect.storage.StringConverter",
    "transforms": "ignoreDeletes",
    "couchbase.stream_from": "BEGINNING",
    "transforms.ignoreDeletes.type": "com.couchbase.connect.kafka.transform.DropIfNullValue"
  }
}

I tried multiple combinations for "connection.cluster_address": "couchbase-service:8091" but had no luck.

Error:
org.apache.kafka.connect.errors.ConnectException: Cannot fetch configuration for bucket start_connector_bucket
at com.couchbase.connect.kafka.CouchbaseSourceConnector.start(CouchbaseSourceConnector.java:60)
at org.apache.kafka.connect.runtime.WorkerConnector.doStart(WorkerConnector.java:110)
at org.apache.kafka.connect.runtime.WorkerConnector.start(WorkerConnector.java:135)
at org.apache.kafka.connect.runtime.WorkerConnector.transitionTo(WorkerConnector.java:195)
at org.apache.kafka.connect.runtime.Worker.startConnector(Worker.java:257)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder.startConnector(DistributedHerder.java:1183)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder.access$1400(DistributedHerder.java:125)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder$15.call(DistributedHerder.java:1199)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder$15.call(DistributedHerder.java:1195)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)

I have deployed all the components (Kafka, Couchbase, and one simple microservice) in OpenShift under the same namespace.

Is there any additional configuration you did on Couchbase or Kafka to fix this issue?
Your input would be helpful.

Thanks in advance.

Looks like a typo in this property, should be: connection.password

What??? How did I not notice it??

Changed it to connection.password and worked like a charm.

Thanks, David 🙂
