I needed an asynchronous parallel ForEach
mechanism so I could enumerate an enumerable by N degrees of concurrency and process an action for each item. Searching around, my code was modeled/merged from a few posts I found.
The tests I have been running are:
- Run
body()
to completion - Inside the
body()
, do aCancellationTokenSource.Cancel()
- Inside the
body()
, throw aOperationCancelledException()
- Inside the
body()
, throw aNotSupportedException()
First, this points to a Stephen Toub post here. I came up with combination of Toub's methods to have maxDegreeOfConcurrency
and exception options (following bits from here)..
public static async Task ForEachAsync<T>( this IEnumerable<T> source, Func<T, Task> body, AsyncParallelOptions parallelOptions )
{
ConcurrentBag<Exception> exceptions = null;
var maxDegreeOfConcurrency = parallelOptions.MaxDegreeOfParallelism;
// If they pass in a CancellationToken from caller of ForEachAsync need to create linked token source in case caller cancels, I want
// ForEachAsync to cancel as well. If they want to failImmediately, make a new CancellationTokenSource so I can stop processing partitions
var cts = parallelOptions.CancellationToken != null ? CancellationTokenSource.CreateLinkedTokenSource( parallelOptions.CancellationToken ) :
parallelOptions.FailImmediately ? new CancellationTokenSource() : null;
var allDone = Task.WhenAll(
from partition in Partitioner.Create( source ).GetPartitions( maxDegreeOfConcurrency )
select Task.Run( async delegate
{
using ( partition )
{
while ( ( cts == null || !cts.IsCancellationRequested /* either from caller or failImmediately */ ) && partition.MoveNext() )
{
await body( partition.Current )
.ContinueWith( t =>
{
// If body() threw an error, cancel if a CancellationTokenSource is present.
if ( t.IsFaulted )
{
if ( parallelOptions.FailImmediately )
{
cts.Cancel();
}
// Always gather the exception to throw at the end
if ( exceptions == null ) exceptions = new ConcurrentBag<Exception>();
foreach ( var ex in t.Exception.InnerExceptions )
{
exceptions.Add( ex );
}
}
}
);
}
}
}, cts.Token ) );
// Wait until all finished (or errored out) and then return exceptions
await allDone;
// Question: allDone is never going to have IsCanceled or IsFaulted correct? because await body() will swallow all exceptions?
Console.WriteLine($"ForEachAsync Extension: Check Exceptions, {exceptions?.Count ?? 0} total, allDone.IsCanceled: {allDone.IsCanceled}, allDone.IsFaulted: {allDone.IsFaulted}, cts.IsCancellationRequested: {cts.IsCancellationRequested}" );
if ( exceptions != null )
{
throw new AggregateException( exceptions );
}
else if ( cts.IsCancellationRequested )
{
throw new OperationCanceledException();
}
}
-
\$\begingroup\$ I have rolled back Rev 4 → 3. Please see What to do when someone answers . \$\endgroup\$200_success– 200_success2016年10月24日 18:07:23 +00:00Commented Oct 24, 2016 at 18:07
-
\$\begingroup\$ Reading and conforming to. Sorry. \$\endgroup\$Terry– Terry2016年10月24日 18:08:37 +00:00Commented Oct 24, 2016 at 18:08
-
\$\begingroup\$ Revised code and new questions at codereview.stackexchange.com/questions/145326/…. \$\endgroup\$Terry– Terry2016年10月26日 15:47:05 +00:00Commented Oct 26, 2016 at 15:47
1 Answer 1
CancaellationToken
is a struct, not a class, so it can never be null. When people don't want to cancel stuff they just passCancellationToken.None
.- However note that If you change your
cts
assignment logic to something likeparallelOptions.CancellationToken != CancellationToken.None
you could hit aNullReferenceException
when the cancellation token is None andFailImmediately
is false. One simple way to make things work would be to just always assigncts = CancellationTokenSource.CreateLinkedTokenSource(parallelOptions.CancellationToken)
. Then you won't need thects
nullity check in the loop as well. - Instead of checking for
IsCancellationRequested
directly, it's a better practice to callcts.Token.ThrowIfCancellationRequested()
- that way you will actually mark the task as canceled. - Instead of checking
t.IsFaulted
, it's a better practice to check ift.Exception != null
. - Instead of iterating over
t.Exception.InnerExceptions
, you should iterate over t.Exception.Flatten().InnerExceptions`. - Creating the
ConcurrentBag
inside the task like you're doing now is not thread safe. I suggest just creating it upon declaration (instead of assigning null like you do now). allDone
issues:allDone
could be faulted if one of thebody
methods throw synchronously.allDone
would have IsCanceled set if cancellation was initiated either by the caller or by theFailImmediately
logic- In either case, you won't get to the following line as
await allDone
would throw.
- You don't need
else
after youthrow new AggregateException
. - If all you do is re-throw the exceptions at the end of the method, you could just
await
theTask.WhenAll
and get the same behavior for free (without theConsole.WriteLine
though). To make this work, you'll have to re-throw the exception instead of adding its inner exceptions to the concurrent bag.
-
\$\begingroup\$ Revised code and new questions at codereview.stackexchange.com/questions/145326/…. \$\endgroup\$Terry– Terry2016年10月26日 15:47:19 +00:00Commented Oct 26, 2016 at 15:47
Explore related questions
See similar questions with these tags.