Event loop shutdown warning after upserting documents

I've run into a problem using the Couchbase connector for Spark. When I run a simple upsert (using couchbaseUpsert) following the provided examples, the data is upserted, but I get a warning:

Force-closing a channel whose registration task was not accepted by an event loop

followed by an error: Failed to submit a listener notification task. Event loop shut down?

I’ve also isolated the problem in a GitHub repository (maxrem/couchbase-connector-event-loop-error), but here is the most important part:

spark
  .sparkContext
  .parallelize(Seq(doc1, doc2))
  .map { x =>
    Upsert(x.str("id"), x)
  }
  .couchbaseUpsert(Keyspace(bucket = Some("item")))
  .collect()

Can someone point out what I’m doing wrong? Thanks!

@maxr can you share the full logfile of the run?

Hi @daschl thanks for your answer.

Here’s the log file https://raw.githubusercontent.com/maxrem/couchbase-connector-event-loop-error/main/run.log

@maxr I think this is a race condition during or after shutdown. It’s something we need to address, but I think it should not affect your operations, right? If your code is working fine, then please ignore it for now. If not, we need to look into how it is affecting your code.

Basically, what is happening is that we are shutting down, but since shutdown is async and eventually consistent, the SDK still tries to reconnect that socket. We should just ignore this error if we are already shut down.

Hi @daschl! Again, thanks for your answer and for looking into this.

Your explanation is clear, and in my dev environment the documents get upserted. But I’m a bit worried about using this code in production, because the number of documents is much higher and I have to be sure everything gets inserted. I’ll discuss with the dev team whether we move forward. Thanks for now.

@maxr I think it should still be fine, since the way we shut down the SDK in Spark (since there is no better way) is through a JVM shutdown hook. So when this happens, all of the actual Spark workload is already completed.
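
To illustrate the ordering described above, here is a minimal sketch of how a JVM shutdown hook behaves in general. It is not the connector's actual code: the object name ShutdownHookOrder and the runWorkload function are hypothetical stand-ins, and the only real API used is scala.sys.addShutdownHook from the Scala standard library. The point is that a hook registered early only runs once the JVM begins shutting down, which in a plain standalone program is after main has returned.

```scala
object ShutdownHookOrder {

  // Hypothetical stand-in for the real job (e.g. the upsert pipeline).
  // It returns only after its work is complete.
  def runWorkload(): String = {
    val upserted = Seq("doc1", "doc2").map(id => s"upserted $id")
    upserted.mkString(", ")
  }

  def main(args: Array[String]): Unit = {
    // Registered before the workload starts, but the JVM does not invoke it
    // until shutdown begins, so cleanup here cannot cut the workload short
    // as long as the workload has finished before main returns.
    sys.addShutdownHook {
      println("shutdown hook: closing client resources")
    }

    println(runWorkload())
    println("main: workload finished")
  }
}
```

Under these assumptions, warnings emitted while the hook is tearing down connections come after the workload's results have already been returned, which matches the explanation that the upserts themselves are not affected.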