How do I handle BackpressureException with couchbase-spark-connector?

Hello, CB users.

The new version of my application uses couchbase-spark-connector and sends a very large number of requests to Couchbase Server. As a result, I get a BackpressureException. My code is below:

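import com.couchbase.client.java.document.JsonDocument
import com.couchbase.client.java.document.json.JsonArray
import com.couchbase.client.java.view.ViewQuery
import com.couchbase.spark._

ids.foreach(id => {

  // Key range for this id: from [id, start_date] up to (but excluding) [id, end_date]
  val sKey = JsonArray.from(id, start_date)
  val eKey = JsonArray.from(id, end_date)

  // One view query per id; cached because it is read twice below
  val couchbaseView = spark
    .couchbaseView(ViewQuery.from("my_view", "all").startKey(sKey).endKey(eKey).inclusiveEnd(false).reduce(false))
    .cache()

  // Number of distinct values emitted by the view for this key range
  val myView = couchbaseView
      .map(_.value)
      .distinct()
      .count()

  if (myView > 0) {

    // Document ids of the matching rows
    val rows = couchbaseView
      .map(_.id)
      .cache()

    val rowCount = rows.count()

    if (rowCount > 0) {
      // Fetch every matching document; this is where most of the requests are issued
      val result = rows
        .couchbaseGet[JsonDocument]()
        .map(doc => {
          // Do something with doc
          (result1, result2, result3)
        })
        .reduce((a, b) => (a._1 + b._1, a._2 + b._2, a._3 + b._3))
    }
    rows.unpersist()
  }
  couchbaseView.unpersist()
})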

It seems the Couchbase Java SDK has a delay-and-retry option (here) for this kind of issue, but I can't use the same solution with couchbase-spark-connector.
Has anyone encountered the same problem, or does anyone have a good idea for solving it?

Best,

@Dynamicscope error handling with Spark semantics is a little hard, because there are not many facilities available. I'm currently thinking of adding a generic "retry amount" setting to the config, but it would apply to every operation rather than to specific errors.

Do you have any suggestions on what would make your life easier when using Spark, and how I could provide it?
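
Until something like that exists in the connector, one possible workaround is to retry the failing Spark action from the driver with a growing delay. The following is only a minimal sketch in plain Scala: `RetrySketch`, `retryWithBackoff`, and `isBackpressure` are hypothetical names, not part of couchbase-spark-connector or the Java SDK, and matching on the text "BackpressureException" is an assumption about how the error surfaces once Spark has wrapped it.

```scala
import scala.annotation.tailrec
import scala.util.{Failure, Success, Try}

// Hypothetical helper, not part of any Couchbase or Spark API.
object RetrySketch {

  // Assumption: the failure can be recognised by class name or message
  // somewhere in the cause chain.
  def isBackpressure(e: Throwable): Boolean =
    Iterator.iterate(e)(_.getCause).takeWhile(_ != null).exists { t =>
      t.getClass.getSimpleName == "BackpressureException" ||
        Option(t.getMessage).exists(_.contains("BackpressureException"))
    }

  // Runs `op`; on a retryable failure, sleeps `delayMs`, doubles the delay,
  // and tries again, up to `maxRetries` additional attempts.
  @tailrec
  def retryWithBackoff[T](maxRetries: Int, delayMs: Long, shouldRetry: Throwable => Boolean)(op: => T): T =
    Try(op) match {
      case Success(value) => value
      case Failure(e) if maxRetries > 0 && shouldRetry(e) =>
        Thread.sleep(delayMs)
        retryWithBackoff(maxRetries - 1, delayMs * 2, shouldRetry)(op)
      case Failure(e) => throw e
    }
}
```

In the code from the question, the `couchbaseGet` / `map` / `reduce` chain could be passed as the `op` block, for example `RetrySketch.retryWithBackoff(5, 100L, RetrySketch.isBackpressure) { ... }`. This re-runs the whole action rather than a single request, so it is coarse, and it does not reduce the request rate on its own.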


I’ve created a ticket to track it - feel free to add your comments there! https://issues.couchbase.com/browse/SPARKC-36
