Async IO problem and OutOfMemory

Hi,

I’m using Couchbase 4.1 and Spark 1.6.2 with the Couchbase Spark Connector 1.2.1.

The following code ends with an OutOfMemoryError for large files (more than 50 GB):

import com.couchbase.client.java.document.JsonDocument
import com.couchbase.spark._

sc.textFile(file_name)                 // ~50 GB input
  .map(line => toJsonObject(line))     // toJsonObject: my helper that parses a line into a JsonObject
  .zipWithIndex()                      // or zipWithUniqueId()
  .map(pair => JsonDocument.create(pair._2.toString, pair._1))
  .saveToCouchbase()                   // OutOfMemory here

The problem comes from the saveToCouchbase implementation. Simplified, it does this:

rdd.foreachPartition(iter => {
  val bucket = CouchbaseConnection().bucket(cbConfig).async()
  Observable
    .from(OnceIterable(iter))
    .flatMap(doc => { bucket.upsert(doc) …/error handling/… })
    .toBlocking
    .last
})

Writing to Couchbase is slower than reading from the file, so the flatMap keeps pulling new documents from the iterator while earlier upserts are still pending, and in the end the entire partition is buffered in RAM. (Sorry for my English if I make mistakes.)

A possible workaround is rdd.repartition(???).foreachPartition(…), but what should I put in place of ‘???’ when the amount of data is unknown?
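For illustration, if the input size could be estimated up front, something like the sketch below might work (the Hadoop FileSystem call and the 64 MB target per partition are just my assumptions), but it still does not answer the unknown-size case:

import org.apache.hadoop.fs.{FileSystem, Path}

// Rough sketch: derive a partition count from the input size on HDFS.
// 64 MB per partition is an arbitrary guess, not a recommendation.
val fs = FileSystem.get(sc.hadoopConfiguration)
val inputBytes = fs.getContentSummary(new Path(file_name)).getLength
val targetBytesPerPartition = 64L * 1024 * 1024
val numPartitions = math.max(1, (inputBytes / targetBytesPerPartition).toInt)

rdd.repartition(numPartitions).foreachPartition(iter => { /* upsert as before */ })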

The following solution works better:

val asyncPartitionSize = 1000 // user defined
rdd.foreachPartition(iter => {
  val bucket = CouchbaseConnection().bucket(cbConfig).async()
  iter.grouped(asyncPartitionSize)   // lazily split the partition into chunks of asyncPartitionSize
    .map(_.iterator)
    .map(partIter => {               // map (not foreach), so the chunks themselves stay lazy
      Observable
        .from(OnceIterable(partIter))
        .flatMap(doc => { bucket.upsert(doc) …/error handling/… })
        .toBlocking
        .toIterable
        .iterator
    })
    .flatten  // lazy flatten of Iterator[Iterator[T]] to Iterator[T]
    .last     // forces consumption of the whole partition, one chunk at a time
})
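If I understand it correctly, this stays bounded because Iterator.grouped is lazy: at most asyncPartitionSize documents per partition are materialized and in flight at any time, instead of the whole partition. A quick plain-Scala check of that assumption (no Spark or Couchbase involved):

// Iterator.grouped pulls elements only when the next chunk is requested,
// so even an infinite iterator is safe here.
val source = Iterator.from(1)
val chunks = source.grouped(3)
println(chunks.next().mkString(", ")) // prints: 1, 2, 3 and nothing more has been consumed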

With this I was able to save a 300 GB file on a cluster with 20 GB of total RAM.

However, the same problem shows up when reading data from Couchbase through ‘CouchbaseView’, and there I could not solve it.
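For reference, by ‘CouchbaseView’ I mean reading through the connector’s view support, roughly as in the sketch below (written from memory, so the exact call may differ; "designDoc" and "viewName" are placeholders):

import com.couchbase.client.java.view.ViewQuery
import com.couchbase.spark._

// Reading rows through a view; with a large result set this also ends with OutOfMemory,
// and here the iteration happens inside the connector, so I could not apply the same
// grouping trick from the outside.
val rows = sc.couchbaseView(ViewQuery.from("designDoc", "viewName"))
println(rows.count())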

I don’t have experience with Apache Spark, so maybe I missed something?