We host a Couchbase replica on a Kubernetes (K8s) cluster, together with Sync Gateway.
Now I am trying to connect Couchbase to Spark Streaming in Databricks so that I receive changed documents. I am able to connect to and query Couchbase using the following code.
I defined the configuration as shown here, and it works perfectly fine.
import com.couchbase.spark.sql._
import org.apache.spark.sql.sources.In
spark.read.couchbase(In("id", Array("1234", "2341", "5431"))).show()
For Spark Streaming, I have the following code:
// Reuse the schema of a one-off batch read as the schema of the stream
val records = spark.read.couchbase(In("id", Array("1234", "2341", "5431")))
val schema = records.schema
// Stream document changes starting from the current point in time
val df = spark.readStream.format("com.couchbase.spark.sql").option("streamFrom", "now").schema(schema).load()
df.createOrReplaceTempView("df")
val stream_df = spark.sql("select * from df where status = 'D'")
stream_df.writeStream.outputMode("append").format("console").start().awaitTermination()
Logs for this job:
19/04/24 15:52:55 INFO N1QLRelation: Inferred schema is Schema(removed)
19/04/24 15:52:55 INFO SparkSqlParser: Parsing command: df
19/04/24 15:52:55 INFO SparkSqlParser: Parsing command: select * from df where status='D'
19/04/24 15:52:56 INFO StreamExecution: Starting [id = a8e7ab8b-8b1b-4f70-9df8-c4fdbf8fd8e2, runId = 25bf2396-9b8b-4384-897c-9b905b16ba84]. Use /tmp/temporary-0494cb51-ddc7-4341-bf5c-fcfd42c042a6 to store the query checkpoint.
19/04/24 15:52:56 INFO DatabricksStreamingQueryListener: Streaming query started: [id=a8e7ab8b-8b1b-4f70-9df8-c4fdbf8fd8e2, runId=25bf2396-9b8b-4384-897c-9b905b16ba84]
19/04/24 15:52:57 INFO Client: Environment Configuration Used: ClientEnvironment{clusterAt=[XXXX/XXXX:8091], connectionNameGenerator=DefaultConnectionNameGenerator, bucket='Something', passwordSet=true, dcpControl=DcpControl{{connection_buffer_size=51200000}}, eventLoopGroup=NioEventLoopGroup, eventLoopGroupIsPrivate=true, poolBuffers=true, bufferAckWatermark=80, connectTimeout=1000, bootstrapTimeout=5000, sslEnabled=false, sslKeystoreFile='null', sslKeystorePassword=false, sslKeystore=null}
19/04/24 15:52:57 INFO Client: Connecting to seed nodes and bootstrapping bucket Something.
19/04/24 15:52:57 INFO CouchbaseSource: Starting Couchbase Structured Stream from NOW to INFINITY
After that, it keeps running for a while and then fails with this error:
INFO DcpChannel: Connect attempt to Domain_name/XXXX:11210 failed.
java.io.IOException: Connection timed out
I double-checked with our networking team: this port is not blocked, and I can connect to every port required for this operation using telnet.
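A telnet test from a workstation only shows that the workstation can reach the port; it says nothing about whether the Databricks cluster's own outbound IPs get through a firewall. A minimal sketch of the same check written in Scala, so it can be pasted into a notebook cell and run from the cluster itself, might look like the following. The object name PortCheck and the host couchbase-node.example.com are hypothetical placeholders; 8091 and 11210 are the ports that appear in the logs above (bootstrap and DCP connection respectively).

```scala
import java.io.IOException
import java.net.{InetSocketAddress, Socket}

object PortCheck {
  /** Returns true if a TCP connection to host:port succeeds within timeoutMs milliseconds. */
  def canConnect(host: String, port: Int, timeoutMs: Int): Boolean = {
    val socket = new Socket()
    try {
      socket.connect(new InetSocketAddress(host, port), timeoutMs)
      true
    } catch {
      // Connection refused, unknown host and timeouts are all IOExceptions
      case _: IOException => false
    } finally {
      socket.close()
    }
  }

  def main(args: Array[String]): Unit = {
    // Hypothetical placeholder: replace with the real Couchbase node host name
    val host = "couchbase-node.example.com"
    // Ports seen in the logs: 8091 (bootstrap) and 11210 (DCP connection)
    for (port <- Seq(8091, 11210)) {
      println(s"$host:$port reachable = ${canConnect(host, port, 3000)}")
    }
  }
}
```

Calling PortCheck.canConnect directly in a notebook cell runs it on the driver only; to test the worker nodes as well, the same call could be made inside a task of a small RDD (for example sc.parallelize(1 to 4).map(...).collect()). A result of false with a long delay, as opposed to an immediate refusal, usually points to a firewall dropping packets rather than a closed port.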
Edit: It turns out there was a firewall, so our IPs needed to be whitelisted. Now I am able to connect and the stream is getting offsets, but I get the following new error:
Logical Plan:
Project [sku#90, size#89, status#91]
+- Filter status#91 IN (D,N)
   +- SubqueryAlias records
      +- StreamingExecutionRelation com.couchbase.spark.sql.streaming.CouchbaseSource@c9c0bba, [META_ID#82, column1#83, column2#84, column3#85L, column4#86, column5#87, column6#88L, column7#89, column8#90, column9#91, column10#92]
at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches(StreamExecution.scala:365)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:226)
Caused by: java.lang.AssertionError: assertion failed: DataFrame returned by getBatch from com.couchbase.spark.sql.streaming.CouchbaseSource@c9c0bba did not have isStreaming=true
Has anyone faced this issue before and can help me?
Thank you!