Hello, CB users.
The new version of my application, which uses couchbase-spark-connector, sends a huge number of requests to Couchbase Server, and as a result I get a BackpressureException. Here is my code:
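import com.couchbase.client.java.document.JsonDocument
import com.couchbase.client.java.document.json.JsonArray
import com.couchbase.client.java.view.ViewQuery
import com.couchbase.spark._

ids.foreach(id => {
  // Key range for this id: [id, start_date] up to (but excluding) [id, end_date]
  val sKey = JsonArray.from(id, start_date)
  val eKey = JsonArray.from(id, end_date)
  val couchbaseView = spark
    .couchbaseView(ViewQuery.from("my_view", "all").startKey(sKey).endKey(eKey).inclusiveEnd(false).reduce(false))
    .cache()
  // Number of distinct values emitted by the view for this key range
  val myView = couchbaseView
    .map(_.value)
    .distinct()
    .count()
  if (myView.toInt > 0) {
    val rows = couchbaseView
      .map(_.id)
      .cache()
    val rowCount = rows.count()
    if (rowCount > 0) {
      // Fetch the full document for every row id, then aggregate
      val result = rows
        .couchbaseGet[JsonDocument]()
        .map(doc => {
          // Do something with doc
          (result1, result2, result3)
        })
        .reduce((a, b) => (a._1 + b._1, a._2 + b._2, a._3 + b._3))
    }
    rows.unpersist()
  }
  couchbaseView.unpersist()
})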
It seems the Couchbase Java SDK has a delay-and-retry option (here) for this kind of issue, but I can't apply the same solution with couchbase-spark-connector.
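To make clear what I mean by "delay and retry", here is a rough sketch in plain Scala. It does not use any Couchbase or connector API; `RetrySketch` and `retryWithBackoff` are hypothetical names I made up for illustration, and it retries on any exception, whereas a real version would presumably retry only on BackpressureException.

```scala
import scala.annotation.tailrec
import scala.util.{Failure, Success, Try}

object RetrySketch {
  // Hypothetical helper: runs `op`, and on failure sleeps `delayMs`
  // milliseconds and tries again, doubling the delay each time.
  // Gives up and rethrows once `maxAttempts` attempts have been used.
  @tailrec
  def retryWithBackoff[T](maxAttempts: Int, delayMs: Long)(op: () => T): T =
    Try(op()) match {
      case Success(value) => value
      case Failure(_) if maxAttempts > 1 =>
        Thread.sleep(delayMs)
        retryWithBackoff(maxAttempts - 1, delayMs * 2)(op)
      case Failure(e) => throw e
    }
}
```

Something like `RetrySketch.retryWithBackoff(5, 100L)(() => someCall())` is the behaviour I am after, where `someCall` is a placeholder. What I don't know is whether, or where, this kind of wrapper can be hooked into the connector's own requests.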
Has anyone encountered the same problem, or does anyone have a good idea for solving it?
Best,