I know this has been done many times before, but this is my existing code, taken from a sample I found, for asynchronously processing a list of tasks. I've seen Task.Run used in many examples, but I'm assuming that in most cases the workload will be I/O bound, and that the task scheduler is not clever enough to know whether it should grab a thread pool thread or rely on a TaskCompletionSource instead.
I also wanted the extension method below to fail fast: if there are 50 items being processed, I want the whole thing to fail as soon as a single task encounters an exception. I use a Task.Delay as a "marker" task to signal the whole batch to finish. Is this a good implementation, and if so, how can it be improved?
public static async Task ForEachAsyncSemaphore<T>(this IEnumerable<T> items,
    Func<T, Task> func,
    int dop,
    CancellationToken ct)
{
    Exception capturedException = null;
    var childCts = new CancellationTokenSource();
    var linkedTokens = CancellationTokenSource.CreateLinkedTokenSource(ct, childCts.Token);
    using (var semaphore = new SemaphoreSlim(dop))
    {
        var cancellationTask = Task.Delay(Timeout.Infinite, linkedTokens.Token);
        var tasks = items.Select(async item =>
        {
            await semaphore.WaitAsync(linkedTokens.Token);
            try
            {
                await func(item);
            }
            catch (Exception ex)
            {
                capturedException = ex;
                childCts.Cancel();
            }
            finally
            {
                semaphore.Release();
            }
        }).ToArray();
        // Use cancellation task as signal to exit early if unexpected exception
        await Task.WhenAny(Task.WhenAll(tasks), cancellationTask);
        if (capturedException != null)
        {
            throw capturedException;
        }
    }
}
I've now updated to include the linked semaphore token as previously it was cancelling on the token passed in which is incorrect. It now cancels either from the parent caller or from inside when an exception is raised. – user183872, May 18, 2021 at 13:12
1 Answer
First, let me share my revised version; then I will detail the changes that I've made:
public static async Task ThrottledConcurrentForeach<T>(
    this IEnumerable<T> toBeProcessedItems, Func<T, Task> asyncOperation,
    int maxDegreeOfConcurrency, CancellationToken externalCancellationToken)
{
    ExceptionDispatchInfo capturedException = null;
    var internalCancellation = new CancellationTokenSource();
    var internalCancellationSignal = new TaskCompletionSource<object>();
    var combinedCancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(
        externalCancellationToken, internalCancellation.Token);
    combinedCancellationTokenSource.Token.Register(
        () => internalCancellationSignal.TrySetCanceled(), useSynchronizationContext: false);

    using var throttler = new SemaphoreSlim(maxDegreeOfConcurrency);
    var processingOperations = toBeProcessedItems.Select(async item =>
    {
        await throttler.WaitAsync(combinedCancellationTokenSource.Token);
        try { await asyncOperation(item); }
        catch (Exception ex)
        {
            capturedException = ExceptionDispatchInfo.Capture(ex);
            internalCancellation.Cancel();
        }
        finally { throttler.Release(); }
    });

    await Task.WhenAny(Task.WhenAll(processingOperations), internalCancellationSignal.Task);
    capturedException?.Throw();
}
Naming
As you can see, I've done a lot of renaming. I believe that meaningful names improve readability and clarity. Here are a few examples:
ForEachAsyncSemaphore
- Semaphore is an implementation detail
  - Try to focus on the provided functionality instead: throttling
  - You are using SemaphoreSlim, not Semaphore, which is why the name is misleading
- In the case of I/O-bound operations we normally talk about concurrency; in the case of CPU-bound operations we talk about parallelism
  - If you want to emphasize your intent, make it visible to your consumers
dop
- This is a useless abbreviation
  - No one will understand what it means unless they read the implementation (if they can access it)
  - Try to use parameter names that are as expressive as possible if you do not provide documentation comments
childCts
- Yet again, try to focus on the provided functionality
ExceptionDispatchInfo
- If you don't want to change the StackTrace of the already thrown exception, then please prefer ExceptionDispatchInfo
  - It preserves the original StackTrace while keeping the exception re-throwable
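To illustrate the point about the StackTrace, here is a minimal sketch that stores an exception and rethrows it later in two ways. The class and method names (RethrowDemo, FailingOperation and so on) are hypothetical and exist only for this illustration. The expectation is that a plain throw of a stored exception resets the stack trace to the rethrow site, whereas ExceptionDispatchInfo keeps the frames of the original failure.

```csharp
using System;
using System.Runtime.CompilerServices;
using System.Runtime.ExceptionServices;

public static class RethrowDemo
{
    // Hypothetical failing operation; NoInlining keeps its frame visible in the trace.
    [MethodImpl(MethodImplOptions.NoInlining)]
    private static void FailingOperation() => throw new InvalidOperationException("boom");

    // Runs the failing operation and hands back the caught exception for a later rethrow.
    private static Exception CaptureFailure()
    {
        try { FailingOperation(); }
        catch (Exception ex) { return ex; }
        throw new InvalidOperationException("FailingOperation was expected to throw");
    }

    // Rethrows the stored exception with a plain throw and returns the resulting trace.
    public static string StackTraceAfterPlainThrow()
    {
        Exception captured = CaptureFailure();
        try { throw captured; }
        catch (Exception ex) { return ex.StackTrace ?? string.Empty; }
    }

    // Rethrows the stored exception through ExceptionDispatchInfo and returns the trace.
    public static string StackTraceAfterDispatchInfoThrow()
    {
        ExceptionDispatchInfo captured = ExceptionDispatchInfo.Capture(CaptureFailure());
        try { captured.Throw(); }
        catch (Exception ex) { return ex.StackTrace ?? string.Empty; }
        return string.Empty; // not reached: Throw() always throws
    }
}
```

Comparing the two returned strings should show FailingOperation only in the second one, which is the information you usually want when diagnosing a failed item.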
CancellationToken's Register
- Even though Task.Delay(Timeout.Infinite, linkedTokens.Token) works, that's not the most elegant way to provide cancellation ability to Task.WhenAll
- You can use the TaskCompletionSource<object> for signalling. It works fine with CancellationToken by combining Register and TrySetCanceled
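The Register and TrySetCanceled combination can be isolated into a small helper. The sketch below is only an illustration: AsTask and CompletedBeforeCancellation are hypothetical names, not framework APIs, and the registration is deliberately never disposed to keep the example short.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class CancellationSignal
{
    // Hypothetical helper: returns a task that becomes Canceled when the token is
    // cancelled and otherwise never completes.
    public static Task AsTask(this CancellationToken token)
    {
        var signal = new TaskCompletionSource<object>(
            TaskCreationOptions.RunContinuationsAsynchronously);
        // If the token is already cancelled, Register runs the callback immediately.
        // Simplification: the returned registration is not disposed here.
        token.Register(() => signal.TrySetCanceled(token), useSynchronizationContext: false);
        return signal.Task;
    }

    // Returns true when the work finished first, false when cancellation won the race.
    public static async Task<bool> CompletedBeforeCancellation(Task work, CancellationToken token)
    {
        Task first = await Task.WhenAny(work, token.AsTask());
        return first == work;
    }
}
```

Note that Task.WhenAny does not throw when the winning task is canceled or faulted; it just returns that task, which is why the result can be compared by reference.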
using declaration
- To reduce indentation (and streamline your code), please prefer a using declaration if you are using C# 8 or later
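As a rough side-by-side, the two hypothetical methods below (the UsingDeclarationDemo class is made up for this illustration) do the same work. The first uses the classic using statement, the second a using declaration, which disposes the semaphore when the enclosing scope ends and saves one level of nesting.

```csharp
using System.Threading;
using System.Threading.Tasks;

public static class UsingDeclarationDemo
{
    // Classic using statement: the body sits one level deeper inside the using block.
    public static async Task<int> WithUsingStatement()
    {
        using (var throttler = new SemaphoreSlim(1))
        {
            await throttler.WaitAsync();
            try { return throttler.CurrentCount; }
            finally { throttler.Release(); }
        }
    }

    // C# 8 using declaration: throttler is disposed when the method scope ends.
    public static async Task<int> WithUsingDeclaration()
    {
        using var throttler = new SemaphoreSlim(1);
        await throttler.WaitAsync();
        try { return throttler.CurrentCount; }
        finally { throttler.Release(); }
    }
}
```

One design point to keep in mind: with a using declaration the resource lives until the end of the enclosing block, so if it must be released earlier, an explicit using statement (or an extra block) is still the better fit.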