- Run body()
body()
to completion - Inside the body()
body()
, do a CancellationTokenSource.Cancel()CancellationTokenSource.Cancel()
- Inside the body()
body()
, throw a OperationCancelledException()OperationCancelledException()
- Inside the body()
body()
, throw a NotSupportedException().NotSupportedException()
- Run body() to completion
- Inside the body(), do a CancellationTokenSource.Cancel()
- Inside the body() throw a OperationCancelledException()
- Inside the body() throw a NotSupportedException().
- Run
body()
to completion - Inside the
body()
, do aCancellationTokenSource.Cancel()
- Inside the
body()
, throw aOperationCancelledException()
- Inside the
body()
, throw aNotSupportedException()
The tests I have been running are:
- Run body() to completion
- Inside the body(), do a CancellationTokenSource.Cancel()
- Inside the body() throw a OperationCancelledException()
- Inside the body() throw a NotSupportedException().
First, this points to a Stephen Toub post here. I came up with combination of Toub's methods to have maxDegreeOfConcurrency
and exception options (following bits from here).
The tests I have been running are:.
Run body() to completion
Inside the body(), do a CancellationTokenSource.Cancel()
Inside the body() throw a OperationCancelledException()
Inside the body() throw a NotSupportedException().
public static async Task ForEachAsync( this IEnumerable source, Func<T, Task> body, AsyncParallelOptions parallelOptions ) { ConcurrentBag exceptions = null;
var maxDegreeOfConcurrency = parallelOptions.MaxDegreeOfParallelism; // If they pass in a CancellationToken from caller of ForEachAsync need to create linked token source in case caller cancels, I want // ForEachAsync to cancel as well. If they want to failImmediately, make a new CancellationTokenSource so I can stop processing partitions var cts = parallelOptions.CancellationToken != null ? CancellationTokenSource.CreateLinkedTokenSource( parallelOptions.CancellationToken ) : parallelOptions.FailImmediately ? new CancellationTokenSource() : null; var allDone = Task.WhenAll( from partition in Partitioner.Create( source ).GetPartitions( maxDegreeOfConcurrency ) select Task.Run( async delegate { using ( partition ) { while ( ( cts == null || !cts.IsCancellationRequested /* either from caller or failImmediately */ ) && partition.MoveNext() ) { await body( partition.Current ) .ContinueWith( t => { // If body() threw an error, cancel if a CancellationTokenSource is present. if ( t.IsFaulted ) { if ( parallelOptions.FailImmediately ) { cts.Cancel(); } // Always gather the exception to throw at the end if ( exceptions == null ) exceptions = new ConcurrentBag<Exception>(); foreach ( var ex in t.Exception.InnerExceptions ) { exceptions.Add( ex ); } } } ); } } }, cts.Token ) ); // Wait until all finished (or errored out) and then return exceptions await allDone; // Question: allDone is never going to have IsCanceled or IsFaulted correct? because await body() will swallow all exceptions? Console.WriteLine($"ForEachAsync Extension: Check Exceptions, {exceptions?.Count ?? 0} total, allDone.IsCanceled: {allDone.IsCanceled}, allDone.IsFaulted: {allDone.IsFaulted}, cts.IsCancellationRequested: {cts.IsCancellationRequested}" ); if ( exceptions != null ) { throw new AggregateException( exceptions ); } else if ( cts.IsCancellationRequested ) { throw new OperationCanceledException(); }
}
public static async Task ForEachAsync<T>( this IEnumerable<T> source, Func<T, Task> body, AsyncParallelOptions parallelOptions )
{
ConcurrentBag<Exception> exceptions = null;
var maxDegreeOfConcurrency = parallelOptions.MaxDegreeOfParallelism;
// If they pass in a CancellationToken from caller of ForEachAsync need to create linked token source in case caller cancels, I want
// ForEachAsync to cancel as well. If they want to failImmediately, make a new CancellationTokenSource so I can stop processing partitions
var cts = parallelOptions.CancellationToken != null ? CancellationTokenSource.CreateLinkedTokenSource( parallelOptions.CancellationToken ) :
parallelOptions.FailImmediately ? new CancellationTokenSource() : null;
var allDone = Task.WhenAll(
from partition in Partitioner.Create( source ).GetPartitions( maxDegreeOfConcurrency )
select Task.Run( async delegate
{
using ( partition )
{
while ( ( cts == null || !cts.IsCancellationRequested /* either from caller or failImmediately */ ) && partition.MoveNext() )
{
await body( partition.Current )
.ContinueWith( t =>
{
// If body() threw an error, cancel if a CancellationTokenSource is present.
if ( t.IsFaulted )
{
if ( parallelOptions.FailImmediately )
{
cts.Cancel();
}
// Always gather the exception to throw at the end
if ( exceptions == null ) exceptions = new ConcurrentBag<Exception>();
foreach ( var ex in t.Exception.InnerExceptions )
{
exceptions.Add( ex );
}
}
}
);
}
}
}, cts.Token ) );
// Wait until all finished (or errored out) and then return exceptions
await allDone;
// Question: allDone is never going to have IsCanceled or IsFaulted correct? because await body() will swallow all exceptions?
Console.WriteLine($"ForEachAsync Extension: Check Exceptions, {exceptions?.Count ?? 0} total, allDone.IsCanceled: {allDone.IsCanceled}, allDone.IsFaulted: {allDone.IsFaulted}, cts.IsCancellationRequested: {cts.IsCancellationRequested}" );
if ( exceptions != null )
{
throw new AggregateException( exceptions );
}
else if ( cts.IsCancellationRequested )
{
throw new OperationCanceledException();
}
}
First, this points to a Stephen Toub post here. I came up with combination of Toub's methods to have maxDegreeOfConcurrency
and exception options (following bits from here).
The tests I have been running are:
Run body() to completion
Inside the body(), do a CancellationTokenSource.Cancel()
Inside the body() throw a OperationCancelledException()
Inside the body() throw a NotSupportedException().
public static async Task ForEachAsync( this IEnumerable source, Func<T, Task> body, AsyncParallelOptions parallelOptions ) { ConcurrentBag exceptions = null;
var maxDegreeOfConcurrency = parallelOptions.MaxDegreeOfParallelism; // If they pass in a CancellationToken from caller of ForEachAsync need to create linked token source in case caller cancels, I want // ForEachAsync to cancel as well. If they want to failImmediately, make a new CancellationTokenSource so I can stop processing partitions var cts = parallelOptions.CancellationToken != null ? CancellationTokenSource.CreateLinkedTokenSource( parallelOptions.CancellationToken ) : parallelOptions.FailImmediately ? new CancellationTokenSource() : null; var allDone = Task.WhenAll( from partition in Partitioner.Create( source ).GetPartitions( maxDegreeOfConcurrency ) select Task.Run( async delegate { using ( partition ) { while ( ( cts == null || !cts.IsCancellationRequested /* either from caller or failImmediately */ ) && partition.MoveNext() ) { await body( partition.Current ) .ContinueWith( t => { // If body() threw an error, cancel if a CancellationTokenSource is present. if ( t.IsFaulted ) { if ( parallelOptions.FailImmediately ) { cts.Cancel(); } // Always gather the exception to throw at the end if ( exceptions == null ) exceptions = new ConcurrentBag<Exception>(); foreach ( var ex in t.Exception.InnerExceptions ) { exceptions.Add( ex ); } } } ); } } }, cts.Token ) ); // Wait until all finished (or errored out) and then return exceptions await allDone; // Question: allDone is never going to have IsCanceled or IsFaulted correct? because await body() will swallow all exceptions? Console.WriteLine($"ForEachAsync Extension: Check Exceptions, {exceptions?.Count ?? 0} total, allDone.IsCanceled: {allDone.IsCanceled}, allDone.IsFaulted: {allDone.IsFaulted}, cts.IsCancellationRequested: {cts.IsCancellationRequested}" ); if ( exceptions != null ) { throw new AggregateException( exceptions ); } else if ( cts.IsCancellationRequested ) { throw new OperationCanceledException(); }
}
The tests I have been running are:
- Run body() to completion
- Inside the body(), do a CancellationTokenSource.Cancel()
- Inside the body() throw a OperationCancelledException()
- Inside the body() throw a NotSupportedException().
First, this points to a Stephen Toub post here. I came up with combination of Toub's methods to have maxDegreeOfConcurrency
and exception options (following bits from here)..
public static async Task ForEachAsync<T>( this IEnumerable<T> source, Func<T, Task> body, AsyncParallelOptions parallelOptions )
{
ConcurrentBag<Exception> exceptions = null;
var maxDegreeOfConcurrency = parallelOptions.MaxDegreeOfParallelism;
// If they pass in a CancellationToken from caller of ForEachAsync need to create linked token source in case caller cancels, I want
// ForEachAsync to cancel as well. If they want to failImmediately, make a new CancellationTokenSource so I can stop processing partitions
var cts = parallelOptions.CancellationToken != null ? CancellationTokenSource.CreateLinkedTokenSource( parallelOptions.CancellationToken ) :
parallelOptions.FailImmediately ? new CancellationTokenSource() : null;
var allDone = Task.WhenAll(
from partition in Partitioner.Create( source ).GetPartitions( maxDegreeOfConcurrency )
select Task.Run( async delegate
{
using ( partition )
{
while ( ( cts == null || !cts.IsCancellationRequested /* either from caller or failImmediately */ ) && partition.MoveNext() )
{
await body( partition.Current )
.ContinueWith( t =>
{
// If body() threw an error, cancel if a CancellationTokenSource is present.
if ( t.IsFaulted )
{
if ( parallelOptions.FailImmediately )
{
cts.Cancel();
}
// Always gather the exception to throw at the end
if ( exceptions == null ) exceptions = new ConcurrentBag<Exception>();
foreach ( var ex in t.Exception.InnerExceptions )
{
exceptions.Add( ex );
}
}
}
);
}
}
}, cts.Token ) );
// Wait until all finished (or errored out) and then return exceptions
await allDone;
// Question: allDone is never going to have IsCanceled or IsFaulted correct? because await body() will swallow all exceptions?
Console.WriteLine($"ForEachAsync Extension: Check Exceptions, {exceptions?.Count ?? 0} total, allDone.IsCanceled: {allDone.IsCanceled}, allDone.IsFaulted: {allDone.IsFaulted}, cts.IsCancellationRequested: {cts.IsCancellationRequested}" );
if ( exceptions != null )
{
throw new AggregateException( exceptions );
}
else if ( cts.IsCancellationRequested )
{
throw new OperationCanceledException();
}
}
- 229
- 3
- 10