I am migrating a bulk upsert procedure from SDK 2.7 to 3.1. It applies the same set of sub-document upserts to multiple documents. The code (DataPoint is a path/value pair):
public async static Task<int> BulkUpdate(IEnumerable<string> keys, IEnumerable<DataPoint> dataPoints)
{
var bucket = await _cluster.BucketAsync(_bucket);
var collection = bucket.DefaultCollection();
var taskList = new List<Task<IMutateInResult>>();
foreach (string key in keys)
{
taskList.Add(collection.MutateInAsync(key, specs =>
{
foreach (var dataPoint in dataPoints)
{
specs.Upsert(dataPoint.JsonPath, dataPoint.Value, createPath: true);
}
}));
}
var result = await Task.WhenAll(taskList);
return result.Count();
}
My problem is that the batch now throws and stops on the first failed mutation, whereas in 2.7 it returned an IDocumentFragment with Success=false and did not stop the batch.
Please help with proper error handling in my case to keep the batch running to the end. There are ways to work around WhenAll, but an SDK-level solution would greatly simplify migration.
This is per RFC, but certainly has some ramifications for async/await and .NET. I do think it's possible to do something like this at the extension method level.
var resultList = new List<IMutateInResult>();
var errorList = new List<Exception>();
await Task.WhenAll(taskList.Select(task => task.ContinueWith(t =>
{
if (t.IsFaulted)
errorList.Add(t.Exception);
else
resultList.Add(t.Result);
})));
//TODO: Log errorList
return resultList.Count();
I've been considering adding a standard set of extension methods to help with batched operations, possibly in a separate package. It would need to address things like controlling the degree of parallelization to avoid overloading the Task queue and handling mixed success/failure results. Can you provide more details about use cases you see in your code that might help inform the design?
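To illustrate the parallelization-control point: one common way to cap the number of in-flight operations is a SemaphoreSlim gate. This is only a sketch of the idea, not the planned package API; the helper name and shape here are mine.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class ThrottledBatch
{
    // Runs at most maxParallel operations concurrently so a large batch
    // does not flood the task queue; results come back in input order.
    public static async Task<TResult[]> RunAsync<TItem, TResult>(
        IEnumerable<TItem> items,
        Func<TItem, Task<TResult>> operation,
        int maxParallel)
    {
        using var gate = new SemaphoreSlim(maxParallel);
        var tasks = items.Select(async item =>
        {
            await gate.WaitAsync();
            try { return await operation(item); }
            finally { gate.Release(); }
        }).ToList();
        return await Task.WhenAll(tasks);
    }
}
```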
Also, in the meantime for your "terrible hack", you should use something like ConcurrentBag<T> instead of List<T> to avoid concurrency issues.
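For reference, here is one way that stopgap could look with ConcurrentBag<T>, wrapped in a hypothetical extension method (the name WhenAllSettled is made up for illustration, not something in the SDK):

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class TaskBatchExtensions
{
    // Awaits every task, collecting results and exceptions separately
    // instead of faulting the whole batch on the first failure.
    public static async Task<(IReadOnlyList<T> Results, IReadOnlyList<Exception> Errors)>
        WhenAllSettled<T>(this IEnumerable<Task<T>> tasks)
    {
        var results = new ConcurrentBag<T>();
        var errors = new ConcurrentBag<Exception>();

        await Task.WhenAll(tasks.Select(async task =>
        {
            try
            {
                results.Add(await task);
            }
            catch (Exception ex)
            {
                errors.Add(ex);
            }
        }));

        return (results.ToList(), errors.ToList());
    }
}
```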
Thank you for pointing out the concurrency issue.
My exact use case is fairly simple but may grow into something bigger. I have a collection of complex documents.
A user selects multiple documents, producing a collection of keys. I want to update all selected documents, setting Status to "Archived" and EditedBy to the current user name, without downloading all documents one by one.
In your case, then, you're looking to run a set of X sub-doc mutation operations, each of which mutates 2 attributes on a document. And you want all the mutations to run to completion, success or failure, and then report overall status back rather than stopping at the first failure.
What is your anticipated scale? Normally 5 docs? 500?
Do you foresee any need to make slightly different mutations on each doc, or all of them always the same?
I limit the batch size to 200 documents, chunking larger tasks; users expect slower performance on larger selections.
So far the requirement is to mutate all documents in a batch exactly the same way, but who knows what's next.
I do not check for concurrent editing; the last hand wins…
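For context, here is roughly what my archive flow looks like against the SDK 3 API. This is only a sketch: the field names "status" and "editedBy" are placeholders, and the error handling just counts successes rather than stopping the batch.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Couchbase.KeyValue;

public static class ArchiveBatch
{
    // Archives each selected document with a two-spec sub-doc mutation,
    // then reports how many mutations succeeded.
    public static async Task<int> ArchiveAsync(
        ICouchbaseCollection collection, IEnumerable<string> keys, string userName)
    {
        var tasks = keys.Select(key => collection.MutateInAsync(key, specs =>
        {
            specs.Upsert("status", "Archived");
            specs.Upsert("editedBy", userName);
        })).ToList();

        try
        {
            await Task.WhenAll(tasks);
        }
        catch
        {
            // Swallow here; per-task state is inspected below so the
            // batch reports partial success instead of stopping early.
        }

        return tasks.Count(t => t.IsCompletedSuccessfully);
    }
}
```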
Thanks for the info. I also did some checking into your exception problem to help with your stopgap. Based on my reading, the key problem is around the await. When Task.WhenAll receives exceptions, it wraps all of the individual exceptions in an AggregateException. However, await unwraps this and throws only the first exception, dropping the others on the floor. There are a lot of complicated explanations for this behavior I won't go into here.
However, it appears you can handle things more gracefully if you need to:
var allTasks = Task.WhenAll(taskList);
try
{
await allTasks;
}
catch
{
// If you reach this spot, at least one of the tasks failed.
var aggregateException = (AggregateException) allTasks.Exception;
var errorList = aggregateException.InnerExceptions;
// Do something here if you need to with the errors
}
return taskList.Count(t => t.IsCompletedSuccessfully);
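If you also need to know which keys failed, one option (assuming the tasks were started in the same order as the keys, as in your BulkUpdate) is to zip the keys with the completed tasks after the awaited WhenAll. A generic sketch, with the helper name FailedKeys made up for illustration:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class BatchReport
{
    // Pairs each key with its task's exception (if any). Call this only
    // after the tasks have completed, e.g. after awaiting Task.WhenAll.
    public static IReadOnlyList<(string Key, Exception Error)> FailedKeys<T>(
        IReadOnlyList<string> keys, IReadOnlyList<Task<T>> tasks)
    {
        return keys.Zip(tasks, (key, task) =>
                (Key: key, Error: task.Exception?.GetBaseException()))
            .Where(pair => pair.Error != null)
            .ToList();
    }
}
```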
In case you are interested, I've begun work on drafting a library that adds multi-get and multi-mutation support. I'd be interested in any feedback before the API surface is locked down.