Subscriber not consuming all the data from Observable

Hello,

I am using Couchbase 4.5 with the latest Java driver. I have a simple program that fetches some data using N1QL and then increments an atomic counter. The problem is that the stream completes, but only 450,000 records have been processed. Can someone explain this to me?

val sourceCounter = new AtomicLong(0) // this is only equal to 450000 after completion
val query = "select * from foo limit 1000000"

val docs = for {
  b <- bucket
  docs <- b.query(N1qlQuery.simple(query)).toScala.flatMap(r => r.rows().toScala.map(_.toString))
} yield docs


docs.foreach(
  onNext = { n: String => sourceCounter.incrementAndGet() },
  onError = { e: Throwable => logger.error("error in subscriber", e) },
  () => logger.info("completed")
)

My full task is to process 6 million records, but I need to get past this first. No error is being thrown… I just see the completed log message above.

Thanks,

-K

UPDATE
I also tried using flatMap but still encountered the same result:

docs
  .flatMap(10, { Observable.just(_) })
  .toBlocking
  .subscribe(
    onNext = { n: String => sourceCounter.incrementAndGet() },
    onError = { e: Throwable => logger.error("error in subscriber", e) },
    () => logger.info("completed")
  )

SOLVED
I just needed to increase the serverSideTimeout:
b.query(N1qlQuery.simple(query(count = true), N1qlParams.build().serverSideTimeout(10, TimeUnit.MINUTES)))

Getting 1 million documents in a single REST call takes time and also memory.

You should fetch the documents in smaller chunks.

id = ""
Loop:
    SELECT *, META().id AS mid FROM foo WHERE META().id > $id LIMIT 10000;
    id = last mid
    If the number of documents returned is less than the LIMIT value, break
End loop
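
In SDK terms, that loop looks roughly like this. It is only a sketch (blocking calls, untested); bucket and handlePage are placeholder names, and I added an ORDER BY on META().id so the paging is deterministic:

// Rough sketch of the keyset-pagination loop above. Blocking calls, untested;
// `bucket` and `handlePage` are placeholders.
import com.couchbase.client.java.Bucket
import com.couchbase.client.java.document.json.JsonObject
import com.couchbase.client.java.query.N1qlQuery
import scala.collection.JavaConverters._

def pageByDocumentId(bucket: Bucket)(handlePage: Seq[JsonObject] => Unit): Unit = {
  val pageSize = 10000
  // ORDER BY META().id keeps the keyset paging deterministic
  val stmt = "SELECT m.*, META(m).id AS mid FROM foo m " +
    s"WHERE META(m).id > $$id ORDER BY META(m).id LIMIT $pageSize"
  var lastId = ""
  var done = false
  while (!done) {
    val rows = bucket
      .query(N1qlQuery.parameterized(stmt, JsonObject.create().put("id", lastId)))
      .allRows().asScala.toSeq
    if (rows.nonEmpty) {
      handlePage(rows.map(_.value()))
      lastId = rows.last.value().getString("mid") // continue after the last id seen
    }
    done = rows.size < pageSize // fewer rows than the LIMIT means this was the last page
  }
}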

You can also project only the document ids ("SELECT RAW META().id FROM foo") and then use the SDK to fetch the documents directly from KV.

@vsr1

Thanks for your response. I am trying to take advantage of “reactive pull backpressure” mentioned in the Bulk Pattern, BackpressureException and Reactive Pull Backpressure documentation. I thought it would protect me against situations like this. Will I not potentially run into the same issue using your second approach, especially with 6 million records?

I am actually seeing this error when I run my code. It's very similar to the above but also calls a REST endpoint for each record.

com.couchbase.client.deps.io.netty.util.internal.OutOfDirectMemoryError: failed to allocate 679477248 byte(s) of direct memory (used: 1379926023, max: 2058354688)
	at com.couchbase.client.deps.io.netty.util.internal.PlatformDependent.incrementMemoryCounter(PlatformDependent.java:522)
	at com.couchbase.client.deps.io.netty.util.internal.PlatformDependent.allocateDirectNoCleaner(PlatformDependent.java:476)
	at com.couchbase.client.deps.io.netty.buffer.PoolArena$DirectArena.allocateDirect(PoolArena.java:766)
	at com.couchbase.client.deps.io.netty.buffer.PoolArena$DirectArena.newUnpooledChunk(PoolArena.java:756)
	at com.couchbase.client.deps.io.netty.buffer.PoolArena.allocateHuge(PoolArena.java:260)
	at com.couchbase.client.deps.io.netty.buffer.PoolArena.allocate(PoolArena.java:231)
	at com.couchbase.client.deps.io.netty.buffer.PoolArena.reallocate(PoolArena.java:397)
	at com.couchbase.client.deps.io.netty.buffer.PooledByteBuf.capacity(PooledByteBuf.java:116)
	at com.couchbase.client.deps.io.netty.buffer.AbstractByteBuf.ensureWritable0(AbstractByteBuf.java:271)
	at com.couchbase.client.deps.io.netty.buffer.AbstractByteBuf.ensureWritable(AbstractByteBuf.java:251)
	at com.couchbase.client.deps.io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:862)
	at com.couchbase.client.deps.io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:855)
	at com.couchbase.client.deps.io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:845)
	at com.couchbase.client.core.endpoint.query.QueryHandlerV2.decodeResponse(QueryHandlerV2.java:153)
	at com.couchbase.client.core.endpoint.query.QueryHandlerV2.decodeResponse(QueryHandlerV2.java:59)
	at com.couchbase.client.core.endpoint.AbstractGenericHandler.decode(AbstractGenericHandler.java:281)
	at com.couchbase.client.deps.io.netty.handler.codec.MessageToMessageCodec$2.decode(MessageToMessageCodec.java:81)
	at com.couchbase.client.deps.io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:88)
	at com.couchbase.client.deps.io.netty.handler.codec.MessageToMessageCodec.channelRead(MessageToMessageCodec.java:111)
	at com.couchbase.client.deps.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:356)
	at com.couchbase.client.deps.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:342)
	at com.couchbase.client.deps.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:335)
	at com.couchbase.client.deps.io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:438)
	at com.couchbase.client.deps.io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:312)
	at com.couchbase.client.deps.io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:299)
	at com.couchbase.client.deps.io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:415)
	at com.couchbase.client.deps.io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:267)
	at com.couchbase.client.deps.io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:253)
	at com.couchbase.client.deps.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:356)
	at com.couchbase.client.deps.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:342)
	at com.couchbase.client.deps.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:335)
	at com.couchbase.client.deps.io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286)
	at com.couchbase.client.deps.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:356)
	at com.couchbase.client.deps.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:342)
	at com.couchbase.client.deps.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:335)
	at com.couchbase.client.deps.io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1302)
	at com.couchbase.client.deps.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:356)
	at com.couchbase.client.deps.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:342)
	at com.couchbase.client.deps.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
	at com.couchbase.client.deps.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
	at com.couchbase.client.deps.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:646)
	at com.couchbase.client.deps.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:581)
	at com.couchbase.client.deps.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:498)
	at com.couchbase.client.deps.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:460)
	at com.couchbase.client.deps.io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:131)
	at com.couchbase.client.deps.io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138)
	at java.lang.Thread.run(Thread.java:748)

The second approach uses a covering index and avoids the data fetch by N1QL (i.e. it avoids two hops of data: DataNode ===> QueryService ===> Client).

You can also set pretty=false and use "select RAW foo from foo limit 1000000".
This will reduce the result size and avoid unmarshalling, because the query does not access any fields of the document.
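
In SDK terms that is roughly the following (just a sketch against the 2.x Java client; adapt to your wrapper):

// Sketch only: pretty=false plus a RAW projection to shrink the result stream.
import com.couchbase.client.java.query.{N1qlParams, N1qlQuery}

val params = N1qlParams.build().pretty(false)
val q = N1qlQuery.simple("select RAW foo from foo limit 1000000", params)
// then run bucket.query(q) as in your existing code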

Speaking to the Java error, you should be able to continuously receive records as long as you're not, of course, trying to keep them all in memory. It looks like you've run out of direct memory there, which is allocated by the streaming parser and freed after working with the observable.

Since the error indicates it's trying to allocate 648MB, it seems like the streaming parser isn't handling the RAW results from the query. Is this with the RAW query?

@ingenthr @vsr1

Thank you both for replying. Let me give a bit more context into what I am doing:

val sourceCounter = new AtomicLong(0) //this is only equal to 450000 after completion

val query = "Select * from foo ..."

val docs = for {
  b <- bucket
  docs <- b.query(N1qlQuery.simple(query, N1qlParams.build().serverSideTimeout(10, TimeUnit.MINUTES))).toScala.flatMap(r => r.rows().toScala.map(_.toString))
} yield docs

//send each record to an http endpoint
def postToRestEndpoint(json:String) = ???

docs.foreach(
  onNext = { json: String =>
    sourceCounter.incrementAndGet()
    postToRestEndpoint(json)
  },
  onError = { e: Throwable => logger.error("error in subscriber", e) },
  () => logger.info("completed")
)

So I need to post each record to an http endpoint. Therefore I do need each document. With that information, here are my followup questions:

  1. With the second approach @vsr1 suggested, I will need to do an additional query to get the document using the meta().id. Is that still preferred over taking the batch select approach?
  2. Will I not still encounter the OOM issue if I project the ids instead of doing a select *?
  3. I thought the whole point of streaming with reactive pull backpressure is to avoid pulling the entire result set into memory and instead stream it on demand. Based on what @ingenthr said ("should be able to continuously receive records so long as you're not, of course, trying to keep them all in memory"), it sounds like this may be possible. Am I just doing something wrong that's causing the OOM? I'd rather not do the batch select process.
  4. I assume RAW won't work in my situation since I need the json, right?

Thanks for the pretty=false suggestion.

Thanks,

-K

I am not an expert in the SDKs, so I am not sure about the streaming. My answer is based on the query service and general principles.

  1. You are using Scala, so I am not sure whether this approach works there. The approach I was talking about is https://github.com/brianfrankcooper/YCSB/blob/master/couchbase2/src/main/java/com/yahoo/ycsb/db/couchbase2/Couchbase2Client.java#L600 The query is at line 156.
  2. select * projects the whole documents; projecting ids returns just the document ids, which are very small in size. Item 1 covers this as well.
  3. SELECT RAW <bucketname> FROM <bucketname> WHERE … projects the whole document but reduces one level of nesting.

@k_reid we do the streaming in the SDK, but it only really works if you do it fully asynchronously - and you are using the sync code.

One other approach would be to fetch only the doc IDs and then use bulk fetching with Rx to load the docs in a more controlled fashion (select meta().id as id from …), then feed the IDs into an async bulk asyncBucket.get(), if that makes sense.
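
Very roughly, something like this (only a sketch, untested; the explicit RxScala conversions and names are placeholders for whatever you use):

// Sketch of "stream ids from N1QL, then async bulk get" with the 2.x Java SDK and RxScala.
import com.couchbase.client.java.AsyncBucket
import com.couchbase.client.java.document.RawJsonDocument
import com.couchbase.client.java.query.N1qlQuery
import rx.lang.scala.Observable
import rx.lang.scala.JavaConversions.toScalaObservable

def streamDocs(asyncBucket: AsyncBucket): Observable[String] =
  toScalaObservable(asyncBucket.query(N1qlQuery.simple("select meta().id as id from foo")))
    .flatMap(result => toScalaObservable(result.rows())) // rows stream in as they arrive
    .map(_.value().getString("id"))                      // pull out the document id
    .flatMap(id => toScalaObservable(asyncBucket.get(id, classOf[RawJsonDocument])))
    .map(_.content())                                    // raw JSON per document

If the gets overwhelm the client you still need to retry or throttle, but this gives you one stream you can control.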

@daschl,

I am using the AsyncBucket, so isn't that using the async code? Can you point me to an example that selects a large dataset and streams it?

@k_reid, the link I provided in my previous post has code that gets the meta().id's and then does the fetch.

Hi @vsr1 @daschl,

I tried what's in the link posted above and got a very ugly exception. Here is a snippet of the code:

val queryParams = N1qlParams.build().serverSideTimeout(10, TimeUnit.MINUTES).pretty(false).adhoc(true)

  def query() =
    s"""
       |select RAW meta(m).id
       |FROM ${settings.bucket} m
       |WHERE m.appId='foo'
       |AND array_count(m.subscriptions) > 0
       |${limit.map(l => s"limit $l").getOrElse("")}
       """.stripMargin

val docs = for {
  b <- bucket
  docs <- b
    .query(N1qlQuery.simple(query(), queryParams))
    .toScala
    .flatMap { r => // also tried the concurrent variation of flatMap (maxConcurrent = 20), but it made no difference
      r.errors().toScala.foreach(e => logger.error(s"Error while parsing N1QL result: ${e.toString}"))
      r.rows().toScala.map { v =>
        new String(v.byteValue()).trim.drop(1).dropRight(1) // id as a string
      }
    }
    .flatMap(b.get(_, classOf[RawJsonDocument]))
    .map(_.content())
} yield docs

docs.map { postToRestEndpoint(_) }

And here is the exception:

rx.exceptions.CompositeException: 1 exceptions occurred.
	at rx.internal.operators.OperatorMerge$MergeSubscriber.reportError(OperatorMerge.java:268)
	at rx.internal.operators.OperatorMerge$MergeSubscriber.checkTerminate(OperatorMerge.java:818)
	at rx.internal.operators.OperatorMerge$MergeSubscriber.emitLoop(OperatorMerge.java:579)
	at rx.internal.operators.OperatorMerge$MergeSubscriber.emit(OperatorMerge.java:568)
	at rx.internal.operators.OperatorMerge$InnerSubscriber.onError(OperatorMerge.java:852)
	at rx.internal.operators.OnSubscribeMap$MapSubscriber.onError(OnSubscribeMap.java:88)
	at rx.internal.operators.OnSubscribeFilter$FilterSubscriber.onError(OnSubscribeFilter.java:90)
	at rx.observers.Subscribers$5.onError(Subscribers.java:230)
	at rx.internal.operators.OnSubscribeDoOnEach$DoOnEachSubscriber.onError(OnSubscribeDoOnEach.java:87)
	at rx.subjects.SubjectSubscriptionManager$SubjectObserver.onError(SubjectSubscriptionManager.java:227)
	at rx.subjects.AsyncSubject.onError(AsyncSubject.java:116)
	at com.couchbase.client.core.CouchbaseCore.send(CouchbaseCore.java:213)
	at com.couchbase.client.core.ResponseHandler$3.call(ResponseHandler.java:205)
	at rx.internal.schedulers.ScheduledAction.run(ScheduledAction.java:55)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: rx.exceptions.CompositeException$CompositeExceptionCausalChain: Chain of Causes for CompositeException In Order Received =>
	at rx.exceptions.CompositeException.getCause(CompositeException.java:129)
	at ch.qos.logback.classic.spi.ThrowableProxy.<init>(ThrowableProxy.java:57)
	at ch.qos.logback.classic.spi.LoggingEvent.<init>(LoggingEvent.java:119)
	at ch.qos.logback.classic.Logger.buildLoggingEventAndAppend(Logger.java:419)
	at ch.qos.logback.classic.Logger.filterAndLog_0_Or3Plus(Logger.java:383)
	at ch.qos.logback.classic.Logger.info(Logger.java:595)
	at com.comcast.cable.elements.mobile.migration.MigrationExecutor$$anonfun$7$$anonfun$apply$9.apply(MigrationExecutor.scala:226)
	at com.comcast.cable.elements.mobile.migration.MigrationExecutor$$anonfun$7$$anonfun$apply$9.apply(MigrationExecutor.scala:226)
	at scala.util.Success.foreach(Try.scala:236)
	at scala.concurrent.Future$$anonfun$foreach$1.apply(Future.scala:206)
	at scala.concurrent.Future$$anonfun$foreach$1.apply(Future.scala:206)
	at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
	at scala.concurrent.impl.ExecutionContextImpl$AdaptedForkJoinTask.exec(ExecutionContextImpl.scala:121)
	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: com.couchbase.client.core.BackpressureException: null
timestamp=2018-01-11 16:48:24,435 log_level=ERROR host=PATAPL-D1JRG8WM thread=cb-io-1-8 logger=c.c.c.d.i.n.u.ResourceLeakDetector LEAK: ByteBuf.release() was not called before it's garbage-collected. Enable advanced leak reporting to find out where the leak occurred. To enable advanced leak reporting, specify the JVM option '-Dcom.couchbase.client.deps.io.netty.leakDetection.level=advanced' or call ResourceLeakDetector.setLevel() See http://netty.io/wiki/reference-counted-objects.html for more information.

Hello,

I’ve made some changes to the code above and I am seeing the following:

  1. com.couchbase.client.core.BackpressureException: null

  2. {"msg":"Index scan timed out - cause: Index scan timed out","code":12015}

  3. thread=cb-io-1-4 logger=c.c.c.d.i.n.u.ResourceLeakDetector LEAK: ByteBuf.release() was not called before it's garbage-collected. See http://netty.io/wiki/reference-counted-objects.html for more information. Recent access records:

I desperately need help with 1 and 3.

I think number 2 is caused by the query timing out. Do I need to create a different index?

Here is the index: CREATE INDEX `appId_subscriptions_idx` ON `mobile`(`appId`) WHERE (0 < array_count(`subscriptions`))

Here is the explain plan:

[
  {
    "plan": {
      "#operator": "Sequence",
      "~children": [
        {
          "#operator": "Sequence",
          "~children": [
            {
              "#operator": "IndexScan",
              "index": "appId_subscriptions_idx",
              "index_id": "7f7802bb4f33a5b2",
              "keyspace": "mobile",
              "namespace": "default",
              "spans": [
                {
                  "Range": {
                    "High": [
                      "\"ucid\""
                    ],
                    "Inclusion": 3,
                    "Low": [
                      "\"ucid\""
                    ]
                  }
                }
              ],
              "using": "gsi"
            },
            {
              "#operator": "Fetch",
              "as": "m",
              "keyspace": "mobile",
              "namespace": "default"
            },
            {
              "#operator": "Parallel",
              "~child": {
                "#operator": "Sequence",
                "~children": [
                  {
                    "#operator": "Filter",
                    "condition": "(((`m`.`appId`) = \"ucid\") and (0 < array_count((`m`.`subscriptions`))))"
                  },
                  {
                    "#operator": "InitialProject",
                    "result_terms": [
                      {
                        "expr": "`m`",
                        "star": true
                      }
                    ]
                  }
                ]
              }
            }
          ]
        },
        {
          "#operator": "Order",
          "limit": "100",
          "sort_terms": [
            {
              "expr": "(meta(`m`).`id`)"
            }
          ]
        },
        {
          "#operator": "Limit",
          "expr": "100"
        },
        {
          "#operator": "FinalProject"
        }
      ]
    },
    "text": "select m.*\nFROM mobile m\nWHERE m.appId='ucid'\nAND array_count(m.subscriptions) > 0\nORDER BY meta(m).id\nlimit 100"
  }
]

Thanks.

For 2, try the following index and query. The query follows the index key order, which avoids the sort and pushes the limit down to the indexer.

CREATE INDEX `appId_subscriptions_idx` ON `mobile`(`appId`, meta().id) WHERE (0 < array_count(`subscriptions`));
select m.* FROM mobile m WHERE m.appId='ucid' AND array_count(m.subscriptions) > 0 ORDER BY m.appId, meta(m).id limit 100;

For 1), it looks like you are simply pushing too hard on your box. The backpressure exception comes up because you directly feed each row back into the ring buffer on the client side with bucket.get, and its default size is 16k. Since streaming the rows is quicker than doing all the full fetches for each ID (many times quicker, of course), you end up getting backpressured. There are two ways to mitigate this:

a) you put retry logic around the async get call (look up the RetryHelper)
b) you throttle the incoming stream via rx operators

(or c) a combination of both)

Again, the backpressure exception is just the symptom; the root cause is a temporary overload on the app/SDK side.

  3. is a little harder to figure out, but it could be that if you address the above, the potential leaks (maybe they happen because of the unruly unsubscribes of the get calls) may go away too.
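
For a), a minimal sketch using the SDK's RetryBuilder around the async get (the delay and max values here are just placeholders to tune):

// Retry the KV get when the client-side ring buffer pushes back. Sketch only; tune the values.
import java.util.concurrent.TimeUnit
import com.couchbase.client.core.BackpressureException
import com.couchbase.client.core.time.Delay
import com.couchbase.client.java.AsyncBucket
import com.couchbase.client.java.document.RawJsonDocument
import com.couchbase.client.java.util.retry.RetryBuilder

def getWithRetry(asyncBucket: AsyncBucket, id: String): rx.Observable[RawJsonDocument] =
  asyncBucket
    .get(id, classOf[RawJsonDocument])
    .retryWhen(RetryBuilder
      .anyOf(classOf[BackpressureException])
      .delay(Delay.exponential(TimeUnit.MILLISECONDS, 1000, 10)) // back off from 10ms up to 1s
      .max(10)                                                   // give up after 10 attempts
      .build())

For b), the flatMap overload that takes a maxConcurrent argument is one way to bound how many gets are in flight at once.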

Thanks @daschl,

The retryWhen operator has helped with the BackpressureException. However, the documentation says:

Retry always passes onNext notifications through to its observers, even from sequences that terminate with an error, so this can cause duplicate emissions (as shown in the diagram above)

So there can be duplicates. Does that hold in this scenario, i.e. can the result of a get be emitted more than once downstream? If it does, then my logic gets even more complicated!

Thanks,

K

Hello @vsr1 @daschl @ingenthr,

I need to change the query to do a GROUP BY, but now it is much slower than the query you helped me with above. Can you provide some optimization tips?

select m.token, ARRAY_AGG(meta(m).id) as ids
FROM mobile m
WHERE m.appId='foo'
AND array_count(m.subscriptions) > 0
AND m.token > "0"
GROUP BY m.token
ORDER BY m.token
LIMIT 10000

I have this index:
CREATE INDEX `appId_token_subscriptions_idx` ON `mobile`(`appId`,`token`) WHERE (0 < array_count(`subscriptions`))

Here is the explain plan:

[
  {
    "plan": {
      "#operator": "Sequence",
      "~children": [
        {
          "#operator": "Sequence",
          "~children": [
            {
              "#operator": "IndexScan",
              "covers": [
                "cover ((`m`.`appId`))",
                "cover ((`m`.`token`))",
                "cover ((meta(`m`).`id`))"
              ],
              "filter_covers": {
                "cover ((0 < array_count((`m`.`subscriptions`))))": true
              },
              "index": "appId_token_subscriptions_idx",
              "index_id": "21c334ac0d2804dc",
              "keyspace": "mobile",
              "namespace": "default",
              "spans": [
                {
                  "Range": {
                    "High": [
                      "successor(\"foo\")"
                    ],
                    "Inclusion": 0,
                    "Low": [
                      "\"foo\"",
                      "\"0\""
                    ]
                  }
                }
              ],
              "using": "gsi"
            },
            {
              "#operator": "Parallel",
              "~child": {
                "#operator": "Sequence",
                "~children": [
                  {
                    "#operator": "Filter",
                    "condition": "(((cover ((`m`.`appId`)) = \"foo\") and cover ((0 < array_count((`m`.`subscriptions`))))) and (\"0\" < cover ((`m`.`token`))))"
                  },
                  {
                    "#operator": "InitialGroup",
                    "aggregates": [
                      "array_agg(cover ((meta(`m`).`id`)))"
                    ],
                    "group_keys": [
                      "cover ((`m`.`token`))"
                    ]
                  }
                ]
              }
            },
            {
              "#operator": "IntermediateGroup",
              "aggregates": [
                "array_agg(cover ((meta(`m`).`id`)))"
              ],
              "group_keys": [
                "cover ((`m`.`token`))"
              ]
            },
            {
              "#operator": "FinalGroup",
              "aggregates": [
                "array_agg(cover ((meta(`m`).`id`)))"
              ],
              "group_keys": [
                "cover ((`m`.`token`))"
              ]
            },
            {
              "#operator": "Parallel",
              "~child": {
                "#operator": "Sequence",
                "~children": [
                  {
                    "#operator": "InitialProject",
                    "result_terms": [
                      {
                        "expr": "cover ((`m`.`token`))"
                      },
                      {
                        "as": "ids",
                        "expr": "array_agg(cover ((meta(`m`).`id`)))"
                      }
                    ]
                  }
                ]
              }
            }
          ]
        },
        {
          "#operator": "Order",
          "limit": "10000",
          "sort_terms": [
            {
              "expr": "cover ((`m`.`token`))"
            }
          ]
        },
        {
          "#operator": "Limit",
          "expr": "10000"
        },
        {
          "#operator": "FinalProject"
        }
      ]
    },
    "text": "select m.token, ARRAY_AGG(meta(m).id) as ids\nFROM mobile m\nWHERE m.appId='foo'\nAND array_count(m.subscriptions) > 0\nAND m.token > \"0\"\nGROUP BY m.token\nORDER BY m.token\nLIMIT 10000"
  }
]

The plan looks right. Since the query has GROUP BY and ORDER BY, it needs to produce all qualifying items, which is why it takes time.

Hmm, ok. Can you suggest a different approach? This query timed out after 5 minutes.

You can increase the indexer timeout, if the timeout is coming from the indexer.

If that also does not work or still takes too long, you can try the following approach.

If your appIds are few and fixed, you can move the appId predicate into the index WHERE clause, like below:
CREATE INDEX ix1 ON `mobile`(`token`) WHERE (0 < array_count(`subscriptions`) AND appId = "foo");

Follow the approach described below and see if that performs better.
https://dzone.com/articles/count-amp-group-faster-using-n1ql
Example:
    SELECT MIN(m.token) AS minval FROM mobile m WHERE m.appId='foo' AND array_count(m.subscriptions) > 0 AND m.token > "0";
    In loop (up to the 10000 groups you need):
        SELECT RAW META(m).id FROM mobile m WHERE m.appId='foo' AND array_count(m.subscriptions) > 0 AND m.token = $minval;
        SELECT MIN(m.token) AS minval FROM mobile m WHERE m.appId='foo' AND array_count(m.subscriptions) > 0 AND m.token > $minval;
        minval becomes the new minval
    End loop
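
A rough Scala sketch of that loop (blocking calls, untested; bucket, params and handleIds are placeholder names):

// Sketch of the MIN()-based pagination above: walk the distinct token values in order,
// fetching the document ids for one token value per iteration.
import com.couchbase.client.java.Bucket
import com.couchbase.client.java.document.json.JsonObject
import com.couchbase.client.java.query.{N1qlParams, N1qlQuery}
import scala.collection.JavaConverters._

def pageByToken(bucket: Bucket, params: N1qlParams)(handleIds: Seq[String] => Unit): Unit = {
  val minQ = "SELECT MIN(m.token) AS minval FROM mobile m " +
    "WHERE m.appId='foo' AND array_count(m.subscriptions) > 0 AND m.token > $tok"
  val idsQ = "SELECT RAW META(m).id FROM mobile m " +
    "WHERE m.appId='foo' AND array_count(m.subscriptions) > 0 AND m.token = $tok"

  var minval = "0"
  var done = false
  while (!done) {
    // next token value strictly greater than the previous one
    val next = bucket
      .query(N1qlQuery.parameterized(minQ, JsonObject.create().put("tok", minval), params))
      .allRows().asScala.headOption
      .flatMap(r => Option(r.value().getString("minval")))
    next match {
      case Some(token) =>
        // all document ids that share this token value
        val ids = bucket
          .query(N1qlQuery.parameterized(idsQ, JsonObject.create().put("tok", token), params))
          .allRows().asScala.toSeq
          .map(r => new String(r.byteValue()).trim.stripPrefix("\"").stripSuffix("\""))
        handleIds(ids)
        minval = token
      case None =>
        done = true // MIN() returned null: no more tokens left
    }
  }
}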

Thanks @vsr1 but your suggestion is not much better. I may be able to get away with running the expensive query once. Is there a way to limit the results by the size of the ids array?

Neither of these work:

select m.token, ARRAY_AGG(meta(m).id) as ids
FROM temp m
WHERE m.appId='foo'
AND array_count(m.subscriptions) > 0
AND m.token > "0"
AND ARRAY_LENGTH(ids) >1
GROUP BY m.token
limit 10

select m.token, ARRAY_AGG(meta(m).id) as ids
FROM temp m
WHERE m.appId='foo'
AND array_count(m.subscriptions) > 0
AND m.token > "0"
GROUP BY m.token
having ARRAY_LENGTH(ids) >1
limit 10