Unable to Connect Spark Streaming with Couchbase

We are hosting a replicated Couchbase deployment on a K8s cluster with Sync Gateway.

Now I am trying to connect Couchbase to Spark Streaming in Databricks to get change documents. I am able to connect to and query Couchbase using the following code.

I defined the config as sketched below, and it works perfectly fine.
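The config is along these lines; a sketch with a placeholder node address and credentials, assuming Couchbase Spark Connector 2.x (on Databricks these keys would normally go into the cluster's Spark config rather than a notebook-built SparkSession). The imports are the ones the snippets below rely on.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.sources.In
import com.couchbase.spark.sql._  // adds read.couchbase to DataFrameReader

// Placeholder node address and credentials; the bucket name matches the logs below.
val spark = SparkSession.builder()
  .appName("couchbase-stream")
  .config("spark.couchbase.nodes", "couchbase.example.com")
  .config("spark.couchbase.bucket.Something", "")
  .config("spark.couchbase.username", "user")
  .config("spark.couchbase.password", "password")
  .getOrCreate()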

spark.read.couchbase(In("id", Array("1234", "2341", "5431"))).show()

Now I have the following code for Spark Streaming

// Batch read first, only to infer the schema for the stream.
val records = spark.read.couchbase(In("id", Array("1234", "2341", "5431")))
val schema = records.schema

// Structured stream over the DCP feed, starting from "now".
val df = spark.readStream.format("com.couchbase.spark.sql")
  .option("streamFrom", "now").schema(schema).load()
df.createOrReplaceTempView("df")
val stream_df = spark.sql("select * from df where status = 'D'")
stream_df.writeStream.outputMode("append").format("console").start().awaitTermination()
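As an aside: with no checkpoint location set, Spark picks a temporary directory (you can see it in the logs below). For anything beyond a smoke test you would pin it explicitly; a sketch with a placeholder path:

// Same sink, but with an explicit checkpoint directory (placeholder path)
// so the query can resume from its last committed offsets after a restart.
stream_df.writeStream
  .outputMode("append")
  .format("console")
  .option("checkpointLocation", "/mnt/checkpoints/couchbase-status-stream")
  .start()
  .awaitTermination()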

Logs for this Job

19/04/24 15:52:55 INFO N1QLRelation: Inferred schema is **Schema(Removed it)**
19/04/24 15:52:55 INFO SparkSqlParser: Parsing command: df
19/04/24 15:52:55 INFO SparkSqlParser: Parsing command: select *  from df  where status='D' 
19/04/24 15:52:56 INFO StreamExecution: Starting [id = a8e7ab8b-8b1b-4f70-9df8-c4fdbf8fd8e2, runId = 25bf2396-9b8b-4384-897c-9b905b16ba84]. Use /tmp/temporary-0494cb51-ddc7-4341-bf5c-fcfd42c042a6 to store the query checkpoint.
19/04/24 15:52:56 INFO DatabricksStreamingQueryListener: Streaming query started: [id=a8e7ab8b-8b1b-4f70-9df8-c4fdbf8fd8e2, runId=25bf2396-9b8b-4384-897c-9b905b16ba84]
19/04/24 15:52:57 INFO Client: Environment Configuration Used: ClientEnvironment{clusterAt=[XXXX/XXXX:8091], connectionNameGenerator=DefaultConnectionNameGenerator, bucket='Something', passwordSet=true, dcpControl=DcpControl{{connection_buffer_size=51200000}}, eventLoopGroup=NioEventLoopGroup, eventLoopGroupIsPrivate=true, poolBuffers=true, bufferAckWatermark=80, connectTimeout=1000, bootstrapTimeout=5000, sslEnabled=false, sslKeystoreFile='null', sslKeystorePassword=false, sslKeystore=null}
19/04/24 15:52:57 INFO Client: Connecting to seed nodes and bootstrapping bucket Something.
19/04/24 15:52:57 INFO CouchbaseSource: Starting Couchbase Structured Stream from NOW to INFINITY

After that, it keeps running for some time and then gives me this error:

INFO DcpChannel: Connect attempt to Domain_name/XXXX:11210 failed.
java.io.IOException: Connection timed out

I double-checked with the networking team: this port is not blocked, and with telnet I am able to connect on every port required for this operation.
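For anyone debugging something similar: telnet from a workstation does not take the same network path as the Spark driver and executors, so it is worth probing the DCP port from inside the Databricks notebook itself. A minimal sketch, with a placeholder hostname:

import java.net.{InetSocketAddress, Socket}

// Probe the Couchbase DCP port (11210) from the cluster the stream runs on.
val probe = new Socket()
try {
  probe.connect(new InetSocketAddress("couchbase.example.com", 11210), 5000) // 5 s timeout
  println("port 11210 reachable from the driver")
} finally {
  probe.close()
}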

Edit: It turns out there was a firewall, so our IPs needed to be whitelisted. Now I am able to connect and it is getting offsets, but I get the following new error:

Logical Plan:
Project [sku#90, size#89, status#91]
+- Filter status#91 IN (D,N)
   +- SubqueryAlias records
      +- StreamingExecutionRelation com.couchbase.spark.sql.streaming.CouchbaseSource@c9c0bba, [META_ID#82, column1#83, column2#84, column3#85L, column4#86, column5#87, column6#88L, column7#89, column8#90, column9#91, column10#92]

at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches(StreamExecution.scala:365)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:226)
Caused by: java.lang.AssertionError: assertion failed: DataFrame returned by getBatch from com.couchbase.spark.sql.streaming.CouchbaseSource@c9c0bba did not have isStreaming=true

Has anyone faced this issue before and can help me?

Thank you!

Is it intermittent, or does it happen all the time? I'm guessing, but @daschl or @graham.pople would know; maybe there's a portion of the SparkSQL interface we don't implement for streaming.

Also, some basics: can you post the versions you're using?

Update
From Spark 2.3 onward, Spark checks whether the DataFrame a streaming source returns for each micro-batch is actually marked as streaming (isStreaming=true), and the current connector version does not handle this.

Same kind of issue: https://github.com/Azure/azure-cosmosdb-spark/issues/174

So if you are using Spark >= 2.3, this error is expected.
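Concretely, the assertion fires when a source builds its micro-batch as a plain batch DataFrame. A sketch of the failing pattern under that assumption, not the connector's actual code (the format name is illustrative):

import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.sql.execution.streaming.{Offset, Source}

// From Spark 2.3 on, StreamExecution asserts that the DataFrame a source
// returns from getBatch has isStreaming == true. Building the micro-batch
// with a plain batch read leaves it false and trips exactly this assertion.
trait FailingSourceSketch extends Source {
  def sqlContext: SQLContext

  override def getBatch(start: Option[Offset], end: Offset): DataFrame =
    sqlContext.read.format("some.batch.format").load() // isStreaming == false here
}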

I am running this code on Databricks Spark 2.2, but Databricks backports changes from newer Spark versions into its older runtimes.

So even on Databricks you can get this same error while running on Spark 2.2.

Hi @infinity. We don’t currently support Spark 2.3, but I’m working on it now and hit this same issue. I will resolve it under https://issues.couchbase.com/browse/SPARKC-95. Thanks for raising it.