Sdílet prostřednictvím


Task-based Asynchronous Pattern - WhenAllOrError

A very common scenario when you scatter and gather, i.e. start a number or parallell tasks and then wait for them all, is that you really just want to wait for all if they all succeed. in the case there is an error you typically want to wait no more, handle the error, cancel any tasks not already completed and then move on. Here are a few extension methods I would use in this case:

  1: public static Task WhenAllOrError(params Task[] tasks)
 2: {
 3:     return WhenAllOrError(new Progress<int>(), tasks);
 4: }
 5:  
 6: public static Task<T[]> WhenAllOrError<T>(params Task<T>[] tasks)
 7: {
 8:     return WhenAllOrError(new Progress<T>(), tasks);
 9: }
 10:  
 11: public static Task WhenAllOrError(
 12:     IProgress<int> progress, 
 13:     params Task[] tasks)
 14: {
 15:     var errorResult = new TaskCompletionSource<bool>();
 16:     int completed = 0;
 17:     foreach (var task in tasks)
 18:     {
 19:         task.ContinueWith(
 20:             t =>
 21:                 {
 22:                     progress.Report(Interlocked.Increment(ref completed));
 23:                     if (t.IsCanceled)
 24:                     {
 25:                         errorResult.TrySetCanceled();
 26:                     }
 27:                     else if (t.IsFaulted)
 28:                     {
 29:                         errorResult.TrySetException(
 30:                             t.Exception.InnerException);
 31:                     }
 32:                 });
 33:     }
 34:  
 35:     return Task.WhenAny(errorResult.Task, Task.WhenAll(tasks)).Unwrap();
 36: }
 37:  
 38: public async static Task<T[]> WhenAllOrError<T>(
 39:     IProgress<T> progress, 
 40:     params Task<T>[] tasks)
 41: {
 42:     var errorResult = new TaskCompletionSource<T[]>();
 43:     foreach (var task in tasks)
 44:     {
 45:         task.ContinueWith(
 46:             t =>
 47:                 {
 48:                     if (t.IsCanceled)
 49:                     {
 50:                         errorResult.TrySetCanceled();
 51:                     }
 52:                     else if (t.IsFaulted)
 53:                     {
 54:                         errorResult.TrySetException(
 55:                             t.Exception.InnerException);
 56:                     }
 57:                     else
 58:                     {
 59:                         progress.Report(t.Result);
 60:                         return t.Result;
 61:                     }
 62:                             
 63:                     return default(T);
 64:                 });
 65:     }
 66:  
 67:     return await await Task.WhenAny(errorResult.Task, Task.WhenAll(tasks));
 68: }

Naturally there is a little code duplication between the Task and the Task<T> versions but starting new tasks just to make a Task into a Task<T> does not feel motivated here I think.