I've created an asynchronous parallel `ForEach` mechanism so I could enumerate an enumerable with N degrees of concurrency and process an action for each item. Additionally, I have the option of stopping when the first exception is encountered, or processing the entire enumerable and returning all exceptions at the end. I'm trying to ensure that all my exception handling is done correctly and that I understand the flow correctly. My original code review can be found here (Original Answer). The code below incorporates suggestions from the original answer, but I'm not sure all the assumptions stated in that answer actually hold.
My assumptions and concerns (some contrary to original answer) are posted with each test case. Please feel free to correct any incorrect assumptions.
Test Harness In All Test Cases
    // This really would be a CreateLinkedTokenSource that could be cancelled
    // from the caller of this code or internally within the body of the ForEachAsync body
    var internalCancel = new CancellationTokenSource();
    try
    {
        // Simulate running something for 10 data batches...
        await Enumerable.Range(0, 10)
            .ForEachAsync(
                async jobData =>
                {
                    // body of code that might throw exceptions or set internalCancel.Cancel()...
                    internalCancel.Token.ThrowIfCancellationRequested();
                    Console.WriteLine( $"Task {jobKey}: FooAsync - Finish DataChunk {jobData}" );
                },
                new AsyncParallelOptions { MaxDegreeOfParallelism = 4, CancellationToken = internalCancel.Token }
            );
    }
    catch (Exception ex)
    {
        Console.WriteLine( $"Task {jobKey}: FooAsync - Exception: {ex.GetType().ToString()}, internalCancel.Token.IsCancellationRequested: {internalCancel.Token.IsCancellationRequested}" );
        throw;
    }
Test Case 1 - Run `body()` to completion
This seems to work as expected.
Test Case 2 - Inside `body()` exception is thrown
Inside the `body()`, I throw a `NotSupportedException()` (or any exception other than `OperationCanceledException`) and I get the following output.
ForEachAsync Extension #1: ContinueWith, t.Exception is null: False, t.IsCanceled: False, t.IsFaulted: True, cts.IsCancellationRequested: False
ForEachAsync Extension #2: Finished, 1 total, allDone.IsCanceled: False, allDone.IsFaulted: False, cts.IsCancellationRequested: True
ForEachAsync Extension #3: Throw Exceptions
Task 0: FooAsync - Exception: System.AggregateException, internalCancel.Token.IsCancellationRequested: False
My assumptions/concerns are:
1. Any exceptions thrown in `body()` are swallowed and presented inside `ContinueWith()` via `t.Exception` or `t.IsFaulted`.
2. Since I did not throw `t.Exception` and did not use `cts.Token.ThrowIfCancellationRequested()`, is that the reason why, after `await allDone`, `allDone.IsFaulted` and `allDone.IsCanceled` are always false?
3. Original answer mentioned `allDone.IsFaulted` could be `true` if `body()` threw synchronously. I'm not sure what throwing synchronously means, but as my code stands I couldn't seem to ever have it set to `true`.
4. Original answer mentioned code after `await allDone;` would not run if thrown synchronously, but as my output displays, code is running after the `await allDone`. Is this because I didn't throw any exceptions from inside `ContinueWith()`?
Test Case 3 - Inside `body()` `OperationCanceledException` is thrown
Output:
ForEachAsync Extension #1: ContinueWith, t.Exception is null: True, t.IsCanceled: True, t.IsFaulted: False, cts.IsCancellationRequested: False
// Next three are dumps from the other 3 degrees of parallelism
ForEachAsync Extension #1: ContinueWith, t.Exception is null: True, t.IsCanceled: False, t.IsFaulted: False, cts.IsCancellationRequested: True
ForEachAsync Extension #1: ContinueWith, t.Exception is null: True, t.IsCanceled: False, t.IsFaulted: False, cts.IsCancellationRequested: False
ForEachAsync Extension #1: ContinueWith, t.Exception is null: True, t.IsCanceled: False, t.IsFaulted: False, cts.IsCancellationRequested: True
ForEachAsync Extension #2: Finished, 0 total, allDone.IsCanceled: False, allDone.IsFaulted: False, cts.IsCancellationRequested: True
ForEachAsync Extension #4: Throw OperationCanceledException
Task 0: FooAsync - Exception: System.OperationCanceledException, internalCancel.Token.IsCancellationRequested: False
Assumptions/Concerns:
1. Original answer stated that it was better practice if I used `cts.Token.ThrowIfCancellationRequested()` instead of checking `cts.IsCancellationRequested`. However, I assume I'd place the `ThrowIfCancellationRequested()` as the first line in `while( partition.MoveNext() ) { }`, which means that if an exception or cancellation happened during the body, the next partition would be prepared for processing, which could involve hitting web services and/or database calls. Checking `cts.IsCancellationRequested` seems to be a more efficient approach? Unless I'm overthinking things.
2. Original answer stated that I could just rethrow `t.Exception` instead of adding to my collection, and then I could get my exception handling for free. I assume this means that `await allDone;` would then throw the exception and no code after that would be executed? This raises two questions.
    1. If I wanted the entire enumerable processed and all exceptions thrown at the end, this wouldn't allow that, right? This would stop execution as soon as the first exception occurred?
    2. The suggestion to call `t.Exception.Flatten()` before adding to my exceptions would be lost, then requiring the caller of `ForEachAsync` to do that instead?
Test Case 4 - Inside `body()` `internalCancel.Token.ThrowIfCancellationRequested()` is triggered
Output:
ForEachAsync Extension #1: ContinueWith, t.Exception is null: True, t.IsCanceled: True, t.IsFaulted: False, cts.IsCancellationRequested: True
ForEachAsync Extension #1: ContinueWith, t.Exception is null: True, t.IsCanceled: True, t.IsFaulted: False, cts.IsCancellationRequested: True
ForEachAsync Extension #2: Finished, 0 total, allDone.IsCanceled: False, allDone.IsFaulted: False, cts.IsCancellationRequested: True
ForEachAsync Extension #4: Throw OperationCanceledException
Task 0: FooAsync - Exception: System.OperationCanceledException, internalCancel.Token.IsCancellationRequested: True
Assumptions/Concerns:
1. Similar to the above case, since I don't use `cts.Token.ThrowIfCancellationRequested();`, my `while` loop exits without exception. Then `exceptions.Count == 0`, so I simply throw a new `OperationCanceledException` and the flow seems to behave correctly (or at least how I expect it to).
If you've made it this far, I appreciate any and all comments.
ForEachAsync Extension Method
    public static async Task ForEachAsync<T>( this IEnumerable<T> source, Func<T, Task> body, AsyncParallelOptions parallelOptions )
    {
        ConcurrentBag<Exception> exceptions = new ConcurrentBag<Exception>();
        var maxDegreeOfConcurrency = parallelOptions.MaxDegreeOfParallelism;
        // If they pass in a CancellationToken from caller of ForEachAsync need to create linked token source in case caller cancels, I want
        // ForEachAsync to cancel as well. If they want to failImmediately, make a new CancellationTokenSource so I can stop processing partitions
        var cts = parallelOptions.CancellationToken != CancellationToken.None
            ? CancellationTokenSource.CreateLinkedTokenSource( parallelOptions.CancellationToken )
            : new CancellationTokenSource();
        var allDone = Task.WhenAll(
            from partition in Partitioner.Create( source ).GetPartitions( maxDegreeOfConcurrency )
            select Task.Run( async delegate
            {
                using ( partition )
                {
                    while ( !cts.IsCancellationRequested /* either from caller or failImmediately */ && partition.MoveNext() )
                    {
                        await body( partition.Current ).ContinueWith( t =>
                        {
                            Console.WriteLine( $"ForEachAsync Extension #1: ContinueWith, t.Exception is null: {t.Exception == null}, t.IsCanceled: {t.IsCanceled}, t.IsFaulted: {t.IsFaulted}, cts.IsCancellationRequested: {cts.IsCancellationRequested}" );
                            // If body() threw an error, cancel if a CancellationTokenSource is present.
                            if ( t.Exception != null )
                            {
                                if ( parallelOptions.FailImmediately )
                                {
                                    cts.Cancel();
                                }
                                // Always gather the exception to throw at the end
                                foreach ( var ex in t.Exception.Flatten().InnerExceptions )
                                {
                                    exceptions.Add( ex );
                                }
                            }
                            else if (t.IsCanceled)
                            {
                                // Needed in case OperationCanceledException() is thrown manually without calling
                                // .Cancel() on any linked token sources
                                cts.Cancel();
                            }
                        }
                        );
                    }
                }
            }, cts.Token ) );
        // Wait until all finished (or errored out) and then return exceptions
        await allDone;
        // Question: allDone is never going to have IsCanceled or IsFaulted correct? because await body() will swallow all exceptions?
        Console.WriteLine( $"ForEachAsync Extension #2: Finished, {exceptions?.Count ?? 0} total, allDone.IsCanceled: {allDone.IsCanceled}, allDone.IsFaulted: {allDone.IsFaulted}, cts.IsCancellationRequested: {cts.IsCancellationRequested}" );
        if ( exceptions.Count > 0 )
        {
            Console.WriteLine( $"ForEachAsync Extension #3: Throw Exceptions" );
            throw new AggregateException( exceptions );
        }
        if ( cts.IsCancellationRequested )
        {
            Console.WriteLine( $"ForEachAsync Extension #4: Throw OperationCanceledException" );
            throw new OperationCanceledException();
        }
    }
1 Answer
Test Case 2
1. Correct, but only if those exceptions are thrown asynchronously. See (3) below.
2. Correct. When you do `await task.ContinueWith(...)`, you are no longer awaiting the original task, but rather its continuation. The fact that the original task threw will only be reflected in the task object passed to the continuation (e.g. `t.Exception`). You are allowed to observe the exception, but unless you throw something from the continuation, it will not be considered faulted or canceled.
3. If you throw synchronously (e.g. `body = i => throw new Exception("foo")`), you'll never get back a task and evaluation will stop before `ContinueWith` is ever called, let alone awaited. However, since you're using an async delegate in your test harness (`async jobData => {...}`), that would never happen, as async delegates make sure a `Task` is always returned (and when you throw an exception, it is attached to that task).
4. It's because you don't throw from either `body` (synchronously) or from `ContinueWith`, so in the eyes of the TPL everything is completing successfully.
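The difference between points (2) and (3) can be seen in a minimal sketch. The names `ContinuationDemo`, `AsyncBody`, `SyncBody` and `ObserveViaContinuation` are made up for illustration and are not part of your code; the only assumption is a compiler that supports throw expressions (C# 7 or later).

```csharp
using System;
using System.Threading.Tasks;

public static class ContinuationDemo
{
    // Async lambda: the exception is captured and attached to the returned Task.
    public static readonly Func<int, Task> AsyncBody =
        async i => { await Task.Yield(); throw new NotSupportedException("async"); };

    // Plain lambda: the exception escapes before any Task object exists.
    public static readonly Func<int, Task> SyncBody =
        i => throw new NotSupportedException("sync");

    // Awaits the continuation rather than the body itself, and reports
    // whether the body's fault was visible inside the continuation.
    public static async Task<bool> ObserveViaContinuation(Func<int, Task> body)
    {
        var sawFault = false;
        var continuation = body(0).ContinueWith(t => { sawFault = t.IsFaulted; });
        await continuation; // does not throw: the continuation itself completed successfully
        return sawFault;
    }
}
```

With `AsyncBody`, `ObserveViaContinuation` completes normally and returns `true`, which matches what you see in your output. With `SyncBody`, the call `body(0)` throws before `ContinueWith` is reached, so the surrounding async method faults instead; in your extension that surrounding method is the `Task.Run` delegate, which is how `allDone.IsFaulted` could become `true`.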
Test Case 3
1. If that scenario concerns you, you could always do something like `while (true) {cts.Token.ThrowIfCancellationRequested(); if (!partition.MoveNext()) {break;} }`
2. Correct.

2(1). If you rethrow the exceptions from the continuation (and don't catch them in the outer `await task.ContinueWith`), then it won't be guaranteed either way. This is because execution would stop only for the offending partition's specific task. And since the partitions are dynamic, the enumerable will keep getting processed in the other tasks, and that will continue until all elements are processed, because `Task.WhenAll` waits for all tasks to complete (even if some failed along the way). Of course, if you throw as many exceptions as your degree of parallelism, all tasks will fault and execution will stop. But that's a pretty random condition, so I'd say for this approach to be reasonable you'd have to always trigger cancellation as well (not just when `FailImmediately` is true).

2(2). Correct.
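To make the `Task.WhenAll` behavior concrete, here is a small sketch. `WhenAllDemo`, `FailAfter` and `Slow` are hypothetical names used only for illustration, and the delays are arbitrary. It shows that `WhenAll` keeps waiting for a slow task after two others have already faulted, and that the resulting task carries every failure even though `await` rethrows only one of them.

```csharp
using System;
using System.Threading.Tasks;

public static class WhenAllDemo
{
    // Hypothetical worker that faults asynchronously after a delay.
    private static async Task FailAfter(int ms)
    {
        await Task.Delay(ms);
        throw new InvalidOperationException($"failed after {ms} ms");
    }

    public static async Task<(bool SlowFinished, int FaultCount)> RunAsync()
    {
        var slowFinished = false;
        async Task Slow() { await Task.Delay(100); slowFinished = true; }

        var allDone = Task.WhenAll(FailAfter(10), FailAfter(20), Slow());
        try
        {
            await allDone; // completes only after Slow() has finished too
        }
        catch (InvalidOperationException)
        {
            // await rethrows a single exception; the rest stay on allDone.Exception
        }

        return (slowFinished, allDone.Exception.InnerExceptions.Count);
    }
}
```

So rethrowing alone does not stop the remaining partitions, and a caller that wants every exception has to read `allDone.Exception` (or catch an `AggregateException` you build yourself) rather than rely on what `await` throws.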
Test Case 4
There is a race condition here. If one of the tasks hits the cancellation code quickly enough, one of the `Task.Run` calls will complete as canceled before ever running its delegate (once a task has started running, the token can't magically cancel it, so the token is only relevant before the task actually starts). In that case, cancellation will propagate to `allDone`, and a `TaskCanceledException` will be thrown at `await allDone`. To see this in action (with high probability), try increasing your range to, say, 1000 and your degree of parallelism to, say, 100. The other outcome of the race is that the tasks are launched so quickly that nothing actually gets canceled (since, again, true cancellation is only possible before task launch in your code, and everything was already launched by the time cancellation was triggered), leading to the scenario you've described.
More Comments
- You should still be able to do `cts = CancellationTokenSource.CreateLinkedTokenSource(parallelOptions.CancellationToken)`; I don't see the need for the separation.
- I don't like your manual check of `t.IsCanceled`; the user should signal cancellation via his token. Maybe he wanted to cancel a specific task but not the entire operation.
- It turns out `ConcurrentQueue` (which is generally more lightweight) performs better than `ConcurrentBag` in this use case, both in terms of run-time performance and memory. For proof: http://pastebin.com/ig47x6VV.
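As a quick check of the first point, the sketch below (with a made-up `LinkedTokenDemo` class) links one source to `CancellationToken.None` and another to a caller-owned token. Both linked sources behave as expected, so the conditional in your code should not be necessary.

```csharp
using System;
using System.Threading;

public static class LinkedTokenDemo
{
    public static (bool NoneCase, bool CallerCase) Run()
    {
        // Linking to CancellationToken.None still yields a source you can cancel yourself.
        using (var fromNone = CancellationTokenSource.CreateLinkedTokenSource(CancellationToken.None))
        using (var caller = new CancellationTokenSource())
        using (var fromCaller = CancellationTokenSource.CreateLinkedTokenSource(caller.Token))
        {
            fromNone.Cancel(); // internal cancellation, no caller token involved
            caller.Cancel();   // caller cancellation flows to the linked source

            return (fromNone.IsCancellationRequested, fromCaller.IsCancellationRequested);
        }
    }
}
```

Both values come back `true`. Disposing the linked source (here via `using`) is also worth doing in the extension method, since a linked source holds a registration on the caller's token.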
Comments

Awesome information. I've applied your suggestions for Test case #1/Assumption 1, and your 'More Comments' section. Two questions. Test case #2/Assumption 3 - is there a way to force the caller to use an async lambda? Just curious. Test case #4 - Race condition - I tried 1000 range and 100 degrees of parallelism and my code always seemed to 'exit' correctly. I don't seem to ever get a `TaskCanceledException` thrown. More importantly, any chance you could put a suggestion/code fix in your answer? – Terry, Oct 31, 2016 at 14:59

@Terry Test case #2/Assumption 3 - no, because async methods are just syntactic sugar for "plain" `Task` returning methods (the compiler transforms your async method into a regular `Task` returning method). So there is no way to differentiate between them in your signature. Test case #4 - it doesn't make sense that you can't repro this. Are you sure you have `internalCancel.Cancel()` inside the test body? Add something like `Console.WriteLine(Interlocked.Increment(ref i))` before `using (partition)` and also add a trace right after you cancel the token. Then share the output at pastebin.com. – Ohad Schneider, Nov 1, 2016 at 22:24

As for a fix (assuming that by fix you mean more consistent behavior that [almost] always throws when canceled), like I said, using `ThrowIfCancellationRequested` should do it, as that would trigger a true TPL cancellation from within the task. Of course, if you cancel it at the last minute, while the tasks are still running but are about to finish and are just past the last `ThrowIfCancellationRequested`, no exception will be thrown. Such is the nature of async cancellation. You could add another `ThrowIfCancellationRequested` right after `await allDone` for consistent behavior in that case. – Ohad Schneider, Nov 1, 2016 at 22:28

I've posted the final chapter of review (again with most of your suggestions) at codereview.stackexchange.com/questions/146129/… – Terry, Nov 4, 2016 at 14:31