Newbie need help with observable logic


Hi all,
I am new to couchbase and I am trying the Java sdk from Scala. Basically what I want to do is run some async query, iterate through the result set, convert them to some other object and return a list of these converted objects.

I have this code snippet:

bucket.query(query).flatMap(new Func1[AsyncN1qlQueryResult, Observable[AsyncN1qlQueryRow]](){
      override def call(result: AsyncN1qlQueryResult):Observable[AsyncN1qlQueryRow]= {
        println("****asyncSearch flatMap=" + result)
    }).map[Transaction]( new Func1[AsyncN1qlQueryRow, Transaction](){
      override def call(result: AsyncN1qlQueryRow):Transaction= {
        println("****asyncSearch map=" + result)
        result.value().toString : Transaction
    }).scan(List[Transaction](), new Func2[ List[Transaction], Transaction, List[Transaction] ](){
      override def call(accumulated: List[Transaction], current: Transaction): List[Transaction]= {
        println("****asyncSearch scan=" + current)
        accumulated ::: List(current)
    }).subscribe(new Action1[List[Transaction]](){
      override def call(result: List[Transaction]):Unit= {
        println("****asyncSearch subscribe=" + result)

However, I am getting an empty result. I can see only in the logs:
****asyncSearch subscribe=List()

The logs I put in flatMap, map and scan did not display. What am I doing wrong? Please help me. Thanks in advance


Can you by chance provide a complete file we can use to reproduce? Note that subscribe is async, so if you put it like this in a main() function and don’t wait nothing will happen because your main thread quits early.

If you are new to couchbase you can start with the sync API too - note that there is also RxScala which acts as a wrapper around RxJava and provides a much nicer experience from scala!


Hi, thanks for replying.

I tried using the sync API before but I am was receiving intermittent results when I made a write and immediately make a read. Later I understand that Couchbase just behave that way in here:

So I tried learning the async API and it was giving me correct results. However my experience with async is only for the calling methods get(id), remove(id), upsert(doc) etc which is simple to grasp. I do them like this:

val bucket= openTxnDatasource.async()
val promise= Promise[Option[TransactionClaim]]()
bucket.get(id).subscribe(new Action1[CouchbaseJsonDoc]() {
  override def call(document: CouchbaseJsonDoc) {
    Option(document).map { doc=>
      logger.debug("Found: " +
      promise.success(Option(doc.content().toString : TransactionClaim))
    }.getOrElse( promise.success(None) )

However, when using async bucket.query(N1qlQuery) was too difficult for me because I am receiving an Observable[AsyncN1qlQueryResult] and I dont know how to map this to a Future[List[Transaction]] properly.

PS I also have a Thread.sleep(5000) right after calling the function that makes the async couchbase call


Hi again,

I think I understand my problem better now. I noticed that when I call my async couchbase function repeatedly, there are times that I see my logs for “****asyncSearch map=…”, “****asyncSearch scan= …”, “****asyncSearch flatmap= …” and they seem to have the correct values as expected. However, I noticed that the logs of “****asyncSearch subscribe=List()” happens first and then I complete the future. Then later I see another log of “****asyncSearch scan= …” and another log of “****asyncSearch subscribe=…” this time with a value but I receive error that promise future is already completed.

So somehow my error lies on using .scan I need to aggregate to a single result and then subscribe to that result. It seems that subscribe was being called repeatedly. Do you think my understanding was correct?