Export of large numbers of documents by ad-hoc query

We have use cases where we need to extract large amounts of data from a bucket (e.g. 100,000s of documents from a bucket holding 1,000,000s) by ad-hoc query. Some questions/observations:

  • Ideally we would want to stream to a message broker (e.g. Kafka) for onward processing
  • DCP is not suitable for this, since we don’t necessarily want only changed documents
  • How to manage the large quantities of documents efficiently

Any ideas? Could CBExport be enhanced to allow a N1QL query to specify which documents to extract?

You can use a covered N1QL query to get the document keys, then use asynchronous SDK calls to fetch the documents.

Thanks, this looks like a good plan. The question is - even just returning the document keys from the index could be memory-consuming, if the query matches 100,000s (or even 1,000,000s) of documents. Any ideas for handling that?

Once we have the list of keys, presumably we could batch this list up into chunks and have multiple threads/processes running to retrieve the actual documents for processing. You mention using asynchronous SDK calls; would it be appropriate to use the Reactive calls instead?
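
The batching step described above could look roughly like this. It is only a sketch: chunkKeys and the sample keys are made-up names, and handing each batch to a worker (thread, process, or reactive pipeline) is left out.

```javascript
// Split a list of document keys into fixed-size batches so that each batch
// can be handed to a separate worker for retrieval.
function chunkKeys(keys, batchSize) {
  if (!Number.isInteger(batchSize) || batchSize <= 0) {
    throw new RangeError("batchSize must be a positive integer");
  }
  const batches = [];
  for (let i = 0; i < keys.length; i += batchSize) {
    batches.push(keys.slice(i, i + batchSize));
  }
  return batches;
}

// Hypothetical keys, for illustration only.
const batches = chunkKeys(
  ["order::1", "order::2", "order::3", "order::4", "order::5"],
  2
);
console.log(batches.length); // 3 (two full batches plus one with the remainder)
```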

Yes, Reactive (I am not that familiar with SDK terminology).


CREATE INDEX ix1 ON default(APPID, META().id);

set $id = ""      -- sorts before every document key
for done := false; !done; {
    SELECT META().id FROM default
    WHERE APPID = "appid" AND META().id > $id
    ORDER BY META().id LIMIT 100000;
    if no results: done = true
    for each id in the results:
        call a function that gets the document using bucket.get() and does the required ops
        (these calls can be parallelized across multiple threads)
    set $id = the last id returned by the SELECT
}
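
To make the loop concrete, here is a minimal runnable sketch of the same keyset pagination idea. It does not call Couchbase: fetchPage and handleKey are hypothetical stand-ins for the SELECT and for the KV get plus processing, and the in-memory key list exists only to exercise the loop.

```javascript
// Keyset pagination: each page asks for keys strictly greater than the last
// key already seen, so at most pageSize keys are held in memory at a time.
// fetchPage(afterKey, limit) stands in for running the SELECT;
// handleKey(key) stands in for fetching and processing one document.
function exportByKeyset(fetchPage, handleKey, pageSize) {
  let lastKey = ""; // every non-empty key sorts after the empty string
  let total = 0;
  for (;;) {
    const keys = fetchPage(lastKey, pageSize);
    if (keys.length === 0) break; // no more matches
    for (const key of keys) handleKey(key);
    total += keys.length;
    lastKey = keys[keys.length - 1]; // resume after the last key returned
  }
  return total;
}

// In-memory stand-in for the index (assumption, illustration only).
const allKeys = ["k01", "k02", "k03", "k04", "k05", "k06", "k07"];
function fakeFetchPage(afterKey, limit) {
  return allKeys.filter((k) => k > afterKey).sort().slice(0, limit);
}

const seen = [];
const count = exportByKeyset(fakeFetchPage, (k) => seen.push(k), 3);
console.log(count, seen.length); // 7 7
```

With a page size of 3 the stand-in is queried four times: three pages with data, then one empty page that ends the loop.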

Thanks for that, very informative.
I’m a bit nervous about paginating the query, because the result set might change between invocations due to new data or data changes by regular consumers that affect the index. For example, suppose I have a bunch of very simple Order documents that include a status, such as:
{
"id": "1",
"product": "Mobile",
"requestedDeliveryDate": "2020/07/31",
"status": "submitted",
"customerID": "123"
}
{
"id": "2",
"product": "Broadband",
"requestedDeliveryDate": "2020/07/31",
"status": "delivered",
"customerID": "123"
}
{
"id": "3",
"product": "Broadband",
"requestedDeliveryDate": "2020/08/3",
"status": "stuck",
"customerID": "789"
}

… (1,000,000s)
And I want to retrieve all the orders in status “stuck”, so I defined an index over the status field.
After I retrieve the first page (say) but before I retrieve the second page, another order enters status "stuck", so this may change the result set.

Unless the database supports multi-version concurrency control (timestamps), there is no way to avoid this. After you read, more data can come in.
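
A small simulation may help show what this means when paging by key. It is a sketch under assumptions: the key names and the page helper are invented, and the Set only mimics an index on status. Paging on META().id > $id never returns the same key twice, but it is not a snapshot either.

```javascript
// Simulated index of keys whose status is "stuck" (hypothetical data).
const stuckKeys = new Set(["order::10", "order::20", "order::30", "order::40"]);

// Stand-in for a query that returns keys greater than afterKey, in key order.
function page(afterKey, limit) {
  return [...stuckKeys].filter((k) => k > afterKey).sort().slice(0, limit);
}

const first = page("", 2); // order::10 and order::20

// Other writers change the data between the two page reads.
stuckKeys.add("order::15");    // sorts before the cursor: missed by this export
stuckKeys.add("order::35");    // sorts after the cursor: picked up on page two
stuckKeys.delete("order::40"); // no longer "stuck": drops out of the export

const second = page(first[first.length - 1], 10); // order::30 and order::35
console.log(first, second);
```

So the export reflects a mix of states: it is free of duplicates, but whether a concurrent change is visible depends on where its key falls relative to the cursor.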

Maybe you can consider Eventing. cc @jon.strabala

Consider the equivalent in a relational (SQL) database. You do a query and get a cursor on the (large) result set and start to traverse the cursor. I’m not an expert here, is the result set likely to change as you iterate through the cursor (due to ongoing updates)? Does it depend on the isolation level?
Anyway, I’m not sure what you mean by Eventing. The example I gave is just that, an example; the query could be any ad-hoc query.

Hi Johnathon,

Maybe just use Eventing to pick out all the documents of interest and move them to another bucket, then use any method you want to export the resulting set.

function OnUpdate(doc, meta) {
    // ignore every document that is not in status "stuck"
    if (doc.status !== "stuck") return;
    // copy the problem document to another bucket via the alias 'stage_bkt' (or perhaps just the key)
    stage_bkt[meta.id] = doc;
}

The key thing is that you deploy “from Everything”. Hopefully you have lots of cores on your Eventing node and can up the workers from 3 to, say, 18 to get more processing power. This will read the DCP stream from the source bucket and, subject to deduplication, you will see the entire data set.

For more information refer to https://blog.couchbase.com/couchbase-eventing-small-scripts-that-solve-big-problems-at-scale/

Thanks very much for this hint.
If the query was fixed, then indeed this might be a viable option. But in our use cases we are talking about an ad-hoc query, that might be applied long after any creates/updates to documents.
And ideally we don’t want the people doing the query (support engineers, analysts, whatever) to be dependent on the developers.

Johnathon,

Just double checking: if you use Eventing, all mutations (create/update), subject to de-duplication, will stream into the OnUpdate(doc,meta) {…} Eventing handler. If your desired queries across 1M+ documents are primarily against the passed “doc”, you can dynamically apply a set of query expressions read in from another KV doc (think of a rule set).

I have done this where my query expressions read from KV are in “JavaScript” and also in “N1QL”, where I use a function to transform the N1QL fragments into JavaScript. The performance can be impressive. This works well if you are only operating on the “doc” passed into the OnUpdate handler.
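
As a rough illustration of the rule-set idea (not necessarily how it was implemented in the case described above), the rules could be plain data that a handler evaluates against each document. The rule format, OPERATORS and matchesRules are assumptions made for this sketch; in a real handler the rules would be read from a KV document through a bucket alias instead of being inlined.

```javascript
// Hypothetical rule format: each rule names a field, an operator and a value.
const OPERATORS = {
  eq: (a, b) => a === b,
  ne: (a, b) => a !== b,
  gt: (a, b) => a > b,
  lt: (a, b) => a < b,
};

// True when the document satisfies every rule in the set.
function matchesRules(doc, rules) {
  return rules.every((rule) => {
    const op = OPERATORS[rule.op];
    if (!op) throw new Error("unknown operator: " + rule.op);
    return op(doc[rule.field], rule.value);
  });
}

// Rules inlined here so the sketch runs on its own.
const rules = [
  { field: "status", op: "eq", value: "stuck" },
  { field: "customerID", op: "eq", value: "789" },
];
const doc = { id: "3", product: "Broadband", status: "stuck", customerID: "789" };
console.log(matchesRules(doc, rules)); // true
```

Keeping the rules declarative means a new filter only requires updating the rules document, though it still applies to mutations seen by the handler rather than to an arbitrary point-in-time query.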

Does this idea work for your use case?

Thanks for getting back to me. I’m still not clear on what you are recommending.
Again, my use case is that some business analyst, or ongoing support person, or developer, needs to make an ad-hoc query to extract data, and this may be unplanned, i.e. we don’t know in advance what the query is.
So any solution that is based on change events and ongoing filtering will probably not work, since we don’t know what the filter will be.