Skip to main content
Code Review

Return to Question

replaced http://stackoverflow.com/ with https://stackoverflow.com/
Source Link

First, this this points to a Stephen Toub post here. I came up with combination of Toub's methods to have maxDegreeOfConcurrency and exception options (following bits from here here)..

First, this points to a Stephen Toub post here. I came up with combination of Toub's methods to have maxDegreeOfConcurrency and exception options (following bits from here)..

First, this points to a Stephen Toub post here. I came up with combination of Toub's methods to have maxDegreeOfConcurrency and exception options (following bits from here)..

added 15 characters in body
Source Link
Jamal
  • 35.2k
  • 13
  • 134
  • 238
  1. Run body()body() to completion
  2. Inside the body()body(), do a CancellationTokenSource.Cancel()CancellationTokenSource.Cancel()
  3. Inside the body()body(), throw a OperationCancelledException()OperationCancelledException()
  4. Inside the body()body(), throw a NotSupportedException().NotSupportedException()
  1. Run body() to completion
  2. Inside the body(), do a CancellationTokenSource.Cancel()
  3. Inside the body() throw a OperationCancelledException()
  4. Inside the body() throw a NotSupportedException().
  1. Run body() to completion
  2. Inside the body(), do a CancellationTokenSource.Cancel()
  3. Inside the body(), throw a OperationCancelledException()
  4. Inside the body(), throw a NotSupportedException()
Rolled back to version 3 and added my test scenarios.
Source Link
Terry
  • 229
  • 3
  • 10

The tests I have been running are:

  1. Run body() to completion
  2. Inside the body(), do a CancellationTokenSource.Cancel()
  3. Inside the body() throw a OperationCancelledException()
  4. Inside the body() throw a NotSupportedException().

First, this points to a Stephen Toub post here. I came up with combination of Toub's methods to have maxDegreeOfConcurrency and exception options (following bits from here).

The tests I have been running are:.

  1. Run body() to completion

  2. Inside the body(), do a CancellationTokenSource.Cancel()

  3. Inside the body() throw a OperationCancelledException()

  4. Inside the body() throw a NotSupportedException().

    public static async Task ForEachAsync( this IEnumerable source, Func<T, Task> body, AsyncParallelOptions parallelOptions ) { ConcurrentBag exceptions = null;

     var maxDegreeOfConcurrency = parallelOptions.MaxDegreeOfParallelism;
     // If they pass in a CancellationToken from caller of ForEachAsync need to create linked token source in case caller cancels, I want
     // ForEachAsync to cancel as well. If they want to failImmediately, make a new CancellationTokenSource so I can stop processing partitions
     var cts = parallelOptions.CancellationToken != null ? CancellationTokenSource.CreateLinkedTokenSource( parallelOptions.CancellationToken ) :
     parallelOptions.FailImmediately ? new CancellationTokenSource() : null;
     var allDone = Task.WhenAll(
     from partition in Partitioner.Create( source ).GetPartitions( maxDegreeOfConcurrency )
     select Task.Run( async delegate
     {
     using ( partition )
     {
     while ( ( cts == null || !cts.IsCancellationRequested /* either from caller or failImmediately */ ) && partition.MoveNext() )
     {
     await body( partition.Current )
     .ContinueWith( t =>
     {
     // If body() threw an error, cancel if a CancellationTokenSource is present.
     if ( t.IsFaulted )
     {
     if ( parallelOptions.FailImmediately )
     {
     cts.Cancel();
     }
     // Always gather the exception to throw at the end
     if ( exceptions == null ) exceptions = new ConcurrentBag<Exception>();
     foreach ( var ex in t.Exception.InnerExceptions )
     {
     exceptions.Add( ex );
     }
     }
     }
     );
     }
     }
     }, cts.Token ) );
     // Wait until all finished (or errored out) and then return exceptions
     await allDone;
     // Question: allDone is never going to have IsCanceled or IsFaulted correct? because await body() will swallow all exceptions?
     Console.WriteLine($"ForEachAsync Extension: Check Exceptions, {exceptions?.Count ?? 0} total, allDone.IsCanceled: {allDone.IsCanceled}, allDone.IsFaulted: {allDone.IsFaulted}, cts.IsCancellationRequested: {cts.IsCancellationRequested}" );
     if ( exceptions != null )
     {
     throw new AggregateException( exceptions );
     }
     else if ( cts.IsCancellationRequested )
     {
     throw new OperationCanceledException();
     }
    

    }

public static async Task ForEachAsync<T>( this IEnumerable<T> source, Func<T, Task> body, AsyncParallelOptions parallelOptions )
{
 ConcurrentBag<Exception> exceptions = null;
 var maxDegreeOfConcurrency = parallelOptions.MaxDegreeOfParallelism;
 // If they pass in a CancellationToken from caller of ForEachAsync need to create linked token source in case caller cancels, I want
 // ForEachAsync to cancel as well. If they want to failImmediately, make a new CancellationTokenSource so I can stop processing partitions
 var cts = parallelOptions.CancellationToken != null ? CancellationTokenSource.CreateLinkedTokenSource( parallelOptions.CancellationToken ) :
 parallelOptions.FailImmediately ? new CancellationTokenSource() : null;
 var allDone = Task.WhenAll(
 from partition in Partitioner.Create( source ).GetPartitions( maxDegreeOfConcurrency )
 select Task.Run( async delegate
 {
 using ( partition )
 {
 while ( ( cts == null || !cts.IsCancellationRequested /* either from caller or failImmediately */ ) && partition.MoveNext() )
 {
 await body( partition.Current )
 .ContinueWith( t =>
 {
 // If body() threw an error, cancel if a CancellationTokenSource is present.
 if ( t.IsFaulted )
 {
 if ( parallelOptions.FailImmediately )
 {
 cts.Cancel();
 }
 // Always gather the exception to throw at the end
 if ( exceptions == null ) exceptions = new ConcurrentBag<Exception>();
 foreach ( var ex in t.Exception.InnerExceptions )
 {
 exceptions.Add( ex );
 }
 }
 }
 );
 }
 }
 }, cts.Token ) );
 // Wait until all finished (or errored out) and then return exceptions
 await allDone;
 // Question: allDone is never going to have IsCanceled or IsFaulted correct? because await body() will swallow all exceptions?
 Console.WriteLine($"ForEachAsync Extension: Check Exceptions, {exceptions?.Count ?? 0} total, allDone.IsCanceled: {allDone.IsCanceled}, allDone.IsFaulted: {allDone.IsFaulted}, cts.IsCancellationRequested: {cts.IsCancellationRequested}" );
 if ( exceptions != null )
 {
 throw new AggregateException( exceptions );
 }
 else if ( cts.IsCancellationRequested )
 {
 throw new OperationCanceledException();
 }
}

First, this points to a Stephen Toub post here. I came up with combination of Toub's methods to have maxDegreeOfConcurrency and exception options (following bits from here).

The tests I have been running are:

  1. Run body() to completion

  2. Inside the body(), do a CancellationTokenSource.Cancel()

  3. Inside the body() throw a OperationCancelledException()

  4. Inside the body() throw a NotSupportedException().

    public static async Task ForEachAsync( this IEnumerable source, Func<T, Task> body, AsyncParallelOptions parallelOptions ) { ConcurrentBag exceptions = null;

     var maxDegreeOfConcurrency = parallelOptions.MaxDegreeOfParallelism;
     // If they pass in a CancellationToken from caller of ForEachAsync need to create linked token source in case caller cancels, I want
     // ForEachAsync to cancel as well. If they want to failImmediately, make a new CancellationTokenSource so I can stop processing partitions
     var cts = parallelOptions.CancellationToken != null ? CancellationTokenSource.CreateLinkedTokenSource( parallelOptions.CancellationToken ) :
     parallelOptions.FailImmediately ? new CancellationTokenSource() : null;
     var allDone = Task.WhenAll(
     from partition in Partitioner.Create( source ).GetPartitions( maxDegreeOfConcurrency )
     select Task.Run( async delegate
     {
     using ( partition )
     {
     while ( ( cts == null || !cts.IsCancellationRequested /* either from caller or failImmediately */ ) && partition.MoveNext() )
     {
     await body( partition.Current )
     .ContinueWith( t =>
     {
     // If body() threw an error, cancel if a CancellationTokenSource is present.
     if ( t.IsFaulted )
     {
     if ( parallelOptions.FailImmediately )
     {
     cts.Cancel();
     }
     // Always gather the exception to throw at the end
     if ( exceptions == null ) exceptions = new ConcurrentBag<Exception>();
     foreach ( var ex in t.Exception.InnerExceptions )
     {
     exceptions.Add( ex );
     }
     }
     }
     );
     }
     }
     }, cts.Token ) );
     // Wait until all finished (or errored out) and then return exceptions
     await allDone;
     // Question: allDone is never going to have IsCanceled or IsFaulted correct? because await body() will swallow all exceptions?
     Console.WriteLine($"ForEachAsync Extension: Check Exceptions, {exceptions?.Count ?? 0} total, allDone.IsCanceled: {allDone.IsCanceled}, allDone.IsFaulted: {allDone.IsFaulted}, cts.IsCancellationRequested: {cts.IsCancellationRequested}" );
     if ( exceptions != null )
     {
     throw new AggregateException( exceptions );
     }
     else if ( cts.IsCancellationRequested )
     {
     throw new OperationCanceledException();
     }
    

    }

The tests I have been running are:

  1. Run body() to completion
  2. Inside the body(), do a CancellationTokenSource.Cancel()
  3. Inside the body() throw a OperationCancelledException()
  4. Inside the body() throw a NotSupportedException().

First, this points to a Stephen Toub post here. I came up with combination of Toub's methods to have maxDegreeOfConcurrency and exception options (following bits from here)..

public static async Task ForEachAsync<T>( this IEnumerable<T> source, Func<T, Task> body, AsyncParallelOptions parallelOptions )
{
 ConcurrentBag<Exception> exceptions = null;
 var maxDegreeOfConcurrency = parallelOptions.MaxDegreeOfParallelism;
 // If they pass in a CancellationToken from caller of ForEachAsync need to create linked token source in case caller cancels, I want
 // ForEachAsync to cancel as well. If they want to failImmediately, make a new CancellationTokenSource so I can stop processing partitions
 var cts = parallelOptions.CancellationToken != null ? CancellationTokenSource.CreateLinkedTokenSource( parallelOptions.CancellationToken ) :
 parallelOptions.FailImmediately ? new CancellationTokenSource() : null;
 var allDone = Task.WhenAll(
 from partition in Partitioner.Create( source ).GetPartitions( maxDegreeOfConcurrency )
 select Task.Run( async delegate
 {
 using ( partition )
 {
 while ( ( cts == null || !cts.IsCancellationRequested /* either from caller or failImmediately */ ) && partition.MoveNext() )
 {
 await body( partition.Current )
 .ContinueWith( t =>
 {
 // If body() threw an error, cancel if a CancellationTokenSource is present.
 if ( t.IsFaulted )
 {
 if ( parallelOptions.FailImmediately )
 {
 cts.Cancel();
 }
 // Always gather the exception to throw at the end
 if ( exceptions == null ) exceptions = new ConcurrentBag<Exception>();
 foreach ( var ex in t.Exception.InnerExceptions )
 {
 exceptions.Add( ex );
 }
 }
 }
 );
 }
 }
 }, cts.Token ) );
 // Wait until all finished (or errored out) and then return exceptions
 await allDone;
 // Question: allDone is never going to have IsCanceled or IsFaulted correct? because await body() will swallow all exceptions?
 Console.WriteLine($"ForEachAsync Extension: Check Exceptions, {exceptions?.Count ?? 0} total, allDone.IsCanceled: {allDone.IsCanceled}, allDone.IsFaulted: {allDone.IsFaulted}, cts.IsCancellationRequested: {cts.IsCancellationRequested}" );
 if ( exceptions != null )
 {
 throw new AggregateException( exceptions );
 }
 else if ( cts.IsCancellationRequested )
 {
 throw new OperationCanceledException();
 }
}
rolled back to Rev 3
Source Link
200_success
  • 145.5k
  • 22
  • 190
  • 479
Loading
Revision 2 of my code after comments and wrote test scenarios I'm using.
Source Link
Terry
  • 229
  • 3
  • 10
Loading
added 3 characters in body; edited title
Source Link
Jamal
  • 35.2k
  • 13
  • 134
  • 238
Loading
Language tag.
Source Link
Terry
  • 229
  • 3
  • 10
Loading
Source Link
Terry
  • 229
  • 3
  • 10
Loading
lang-cs

AltStyle によって変換されたページ (->オリジナル) /