Batch sub-document mutations 3.1 migration

I am migrating a bulk upsert procedure from SDK 2.7 to 3.1. It applies the same set of upserts to multiple documents. The code is below; DataPoint is a path/value pair.

public static async Task<int> BulkUpdate(IEnumerable<string> keys, IEnumerable<DataPoint> dataPoints)
{
    var bucket = await _cluster.BucketAsync(_bucket);
    var collection = bucket.DefaultCollection();
    var taskList = new List<Task<IMutateInResult>>();
    foreach (string key in keys)
    {
        // Start one sub-document mutation per key; every data point becomes an upsert spec.
        taskList.Add(collection.MutateInAsync(key, specs =>
        {
            foreach (var dataPoint in dataPoints)
            {
                specs.Upsert(dataPoint.JsonPath, dataPoint.Value, true);
            }
        }));
    }
    // Throws if any of the mutations failed.
    var result = await Task.WhenAll(taskList);
    return result.Length;
}

My problem is that the batch now throws on the first failed mutation, whereas in 2.7 each call returned an IDocumentFragment with Success=false and the batch kept going.
Please help with proper error handling so the batch runs to the end. There are ways to work around this with WhenAll, but an SDK-level solution would greatly simplify the migration.

@skaryshev -

This is per the RFC, but it certainly has some ramifications for async/await and .NET. I do think it's possible to do something like this at the extension method level.

-Jeff

Ended up with a terrible hack so far. :(

        var resultList = new List<IMutateInResult>();
        var errorList = new List<Exception>();
        // Attach a continuation to every task so that one failure does not hide the others.
        await Task.WhenAll(taskList.Select(task => task.ContinueWith(done =>
        {
            if (done.IsFaulted)
                errorList.Add(done.Exception);
            else
                resultList.Add(done.Result);
        })));
        //TODO: Log errorList
        return resultList.Count();

@skaryshev

I’ve been considering adding a standard set of extension methods to help with batched operations, possibly in a separate package. It would need to address things like controlling the degree of parallelization to avoid overloading the Task queue and handling mixed success/failure results. Can you provide more details about use cases you see in your code that might help inform the design?
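
To make the two concerns above concrete, here is a minimal sketch of what such a helper could look like, using only the BCL. The names ThrottledBatch, RunAsync and maxParallel are hypothetical and are not part of the SDK or of any published package. A SemaphoreSlim caps how many operations are in flight, and each key gets its own outcome instead of the whole batch throwing. The operations are simulated so the sketch runs without a cluster.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class ThrottledBatch
{
    // Hypothetical helper: runs operation(key) for every key with at most
    // maxParallel operations in flight, recording a per-key error instead of throwing.
    public static async Task<IReadOnlyList<(string Key, Exception Error)>> RunAsync(
        IEnumerable<string> keys, Func<string, Task> operation, int maxParallel)
    {
        using var gate = new SemaphoreSlim(maxParallel);

        var tasks = keys.Select(async key =>
        {
            await gate.WaitAsync();
            try
            {
                await operation(key);
                return (Key: key, Error: (Exception)null);
            }
            catch (Exception ex)
            {
                // Capture the failure so the remaining keys still run.
                return (Key: key, Error: ex);
            }
            finally
            {
                gate.Release();
            }
        }).ToList();

        // None of the wrapped tasks can fault, so this await does not throw.
        return await Task.WhenAll(tasks);
    }

    public static async Task Main()
    {
        // Simulated operations: "doc2" fails, the others succeed.
        var outcomes = await RunAsync(
            new[] { "doc1", "doc2", "doc3" },
            key => key == "doc2"
                ? Task.FromException(new InvalidOperationException("simulated failure"))
                : Task.CompletedTask,
            maxParallel: 2);

        foreach (var (key, error) in outcomes)
            Console.WriteLine($"{key}: {(error == null ? "ok" : error.Message)}");
    }
}
```

In real use the operation delegate would wrap a call such as MutateInAsync; whether a per-key result list is the right shape for the API is exactly the kind of design question raised here.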

Also, in the meantime for your “terrible hack”, you should use something like ConcurrentBag<T> instead of List<T> to avoid concurrency issues.
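
As a rough illustration of that suggestion, the continuation-based workaround could be written with thread-safe collections along these lines. CollectAsync is a hypothetical helper name, and the tasks in Main are simulated stand-ins for the MutateInAsync calls.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class BatchDemo
{
    // Hypothetical helper: waits for every task and sorts the outcomes
    // into two thread-safe bags instead of throwing on the first failure.
    public static async Task<(ConcurrentBag<T> Results, ConcurrentBag<Exception> Errors)>
        CollectAsync<T>(IEnumerable<Task<T>> tasks)
    {
        var results = new ConcurrentBag<T>();
        var errors = new ConcurrentBag<Exception>();

        // Continuations may run concurrently on thread-pool threads,
        // which is why List<T> is not safe here.
        await Task.WhenAll(tasks.Select(task => task.ContinueWith(done =>
        {
            if (done.IsFaulted)
                errors.Add(done.Exception.GetBaseException());
            else if (!done.IsCanceled)
                results.Add(done.Result);
        })));

        return (results, errors);
    }

    public static async Task Main()
    {
        // Simulated outcomes: two successes and one failure.
        var tasks = new[]
        {
            Task.FromResult(1),
            Task.FromException<int>(new InvalidOperationException("simulated failure")),
            Task.FromResult(3),
        };

        var (results, errors) = await CollectAsync(tasks);
        Console.WriteLine($"{results.Count} succeeded, {errors.Count} failed");
        // prints "2 succeeded, 1 failed"
    }
}
```

Note that ConcurrentBag<T> does not preserve order, so if you need to know which key failed, carry the key along with the result or the exception.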

Thank you for pointing out the concurrency issue.
My exact use case is fairly simple but may grow into something bigger. I have a collection of complex documents

{ "Status":"Active", "EditedBy":"UserName", "MoreStuff": {Complex object}, "EvenMoreStuff": {Complex object} }

A user selects multiple documents, which gives me a collection of keys. I want to update all selected documents, setting Status to “Archived” and EditedBy to the current user name, without downloading the documents one by one.

@skaryshev

In your case, then, you’re looking to run a set of X sub-doc mutation operations, each of which mutates 2 attributes on a document. And you want every mutation to run to completion, whether it succeeds or fails, and then report the overall status back rather than stopping at the first failure.

  • What is your anticipated scale? Normally 5 docs? 500?
  • Do you foresee any need to make slightly different mutations on each doc, or all of them always the same?
  • Are you doing any CAS concurrency checks?

I limit the batch size to 200 documents and chunk larger tasks; users expect slower performance on larger selections.
So far the requirement is to mutate all documents in a batch in exactly the same way, but who knows what's next.
I do not check for concurrent edits: the last hand wins…

Thanks for the info. I also did some checking into your exception problem to help with your stopgap. Based on my reading, the key problem is around the await. When Task.WhenAll receives exceptions, it wraps all of the individual exceptions in an AggregateException. However, await unwraps this and throws only the first exception, dropping the others on the floor. There are a lot of complicated explanations for this behavior I won’t go into here.

However, it appears you can handle things more gracefully if you need to:

var allTasks = Task.WhenAll(taskList);
try
{
    await allTasks;
}
catch
{
    // If you reach this spot, at least one of the tasks failed or was canceled.
    // Task.Exception is already an AggregateException, so no cast is needed;
    // it is null if the tasks were canceled rather than faulted.
    var errorList = allTasks.Exception?.InnerExceptions;
    // Do something here if you need to with the errors
}

return taskList.Count(t => t.IsCompletedSuccessfully);
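
If you want to see the unwrapping behavior in isolation, here is a small self-contained check that uses simulated tasks in place of the Couchbase calls (WhenAllDemo is just an illustrative name). It shows that the catch block receives only the first exception, while the combined task still holds all of them.

```csharp
using System;
using System.Threading.Tasks;

public static class WhenAllDemo
{
    public static async Task Main()
    {
        // Simulated outcomes: two failures and one success.
        var tasks = new[]
        {
            Task.FromException<int>(new InvalidOperationException("first")),
            Task.FromResult(42),
            Task.FromException<int>(new ArgumentException("second")),
        };

        var allTasks = Task.WhenAll(tasks);
        try
        {
            await allTasks;
        }
        catch (Exception ex)
        {
            // await rethrows only the first inner exception.
            Console.WriteLine($"caught: {ex.Message}");
            // The combined task still carries every failure.
            Console.WriteLine($"total failures: {allTasks.Exception.InnerExceptions.Count}");
        }
        // prints "caught: first" and then "total failures: 2"
    }
}
```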

@skaryshev

In case you are interested, I’ve begun work on drafting a library that adds multi-get and multi-mutation support. I’d be interested in any feedback before the API surface is locked down.
