I find myself having to loop through a lot of async calls all over the place in my code base. The current pattern we are using is to declare a SemaphoreSlim(maxCount), then await sem.WaitAsync(), create a new task, add it to a List, and repeat. To control the release, the new task itself holds a reference to sem and releases it in a finally block. There is also a step in there to look for and remove completed tasks. This pattern was inherited and is used for multiple different types of async calls. As I develop new code, I was hoping to simplify this into a single helper class where I can just queue work until I hit a set limit, and then wait to add the next item once a slot has freed up.
The calls are all async (they come from a 3rd-party lib that we don't control). Also, the library I maintain has to be used in both WinForms and ASP.NET processes, so keeping it async seems ideal. The targets are usually slow web services (5-10 s range) that we don't want to slam, but at the same time we typically have thousands of items in the queue (the 3rd party has no bulk update implementation -- it's a one-element-at-a-time situation).
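For context, the inherited pattern looks roughly like this (a simplified sketch of the idea, not the production code; it reuses the _Process/_ProcessArg types shown further down):

private async Task _RunInheritedPattern(IEnumerable<Guid> ids, CancellationToken ct)
{
    SemaphoreSlim sem = new SemaphoreSlim(10);
    List<Task> tasks = new List<Task>();

    foreach (Guid id in ids)
    {
        await sem.WaitAsync(ct);                  // block once all slots are taken
        tasks.Add(Task.Run(async () =>
        {
            try { await _Process(new _ProcessArg { ID = id }, ct); }
            finally { sem.Release(); }            // the task releases its own slot
        }, ct));

        tasks.RemoveAll(t => t.IsCompleted);      // prune finished tasks as we go
    }

    await Task.WhenAll(tasks);
}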
This is what I have come up with (naming is still a work in progress):
// Assume this is the parameter to the method we're calling over and over again
private class _ProcessArg
{
    public Guid ID { get; set; }
}

// The method that we are calling over and over again
private async Task _Process(_ProcessArg arg, CancellationToken ct)
{
    await Task.Run(() => { System.Diagnostics.Debug.WriteLine(arg.ID.ToString()); });
}
// The main loop where we generate the data for the call.
private async Task _RunMainLoop(CancellationToken ct)
{
    int maxThreads = 10;
    ConcurrentQueue<Guid> queue = new ConcurrentQueue<Guid>();

    // Typically this would be the database load/whatnot
    for (int i = 0; i < 100000; i++)
    {
        queue.Enqueue(Guid.NewGuid());
    }

    AsyncTaskMutex mutex = new AsyncTaskMutex(maxThreads);
    while (true)
    {
        Guid id;
        if (queue.TryDequeue(out id))
        {
            await mutex.QueueTask<_ProcessArg>(_Process, new _ProcessArg
            {
                ID = id,
            }, ct);
        }
        else
        {
            await mutex.DrainQueue(ct);
            break;
        }
    }
}
// Class I'm looking for peer review on
public class AsyncTaskMutex
{
    private SemaphoreSlim _sem;
    private List<Task> _tasks;

    public AsyncTaskMutex()
        : this(10)
    {
    }

    public AsyncTaskMutex(int maxTasks)
    {
        _sem = new SemaphoreSlim(maxTasks, maxTasks);
        _tasks = new List<Task>();
    }

    public async Task DrainQueue(CancellationToken ct)
    {
        await Task.WhenAll(_tasks);
        _tasks.RemoveAll(t => t.IsCompleted);
    }

    public async Task QueueTask<T>(Func<T, CancellationToken, Task> func, T args, CancellationToken ct = default(CancellationToken))
    {
        await _sem.WaitAsync(ct);
        try
        {
            Task task = func(args, ct);
            task.GetAwaiter().OnCompleted(_OnCompleted);
            _tasks.Add(task);
        }
        catch (OperationCanceledException)
        {
            // Intentional ignore
            return;
        }
    }

    private void _OnCompleted()
    {
        _sem.Release(1);
        _tasks.RemoveAll(t => t.IsCompleted);
    }
}
2 Answers
It seems you simply need to process a fixed set of work items in parallel, with a fixed degree of parallelism, in an async-compatible way. Stephen Toub has written a very elegant way to do that in just a couple of lines of code.
public static Task ForEachAsync<T>(this IEnumerable<T> source, int dop, Func<T, Task> body)
{
    return Task.WhenAll(
        from partition in Partitioner.Create(source).GetPartitions(dop)
        select Task.Run(async delegate
        {
            using (partition)
                while (partition.MoveNext())
                    await body(partition.Current);
        }));
}
Your new code would be:
await ForEachAsync(GetWorkItems(), dop: 16, body: async item =>
{
    await ProcessItem(item); //TODO
});
There is no need to explicitly maintain a queue. But if you want one, you can do that by feeding BlockingCollection<T>.GetConsumingEnumerable() into that ForEachAsync helper.
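For example, a rough sketch of that (the BlockingCollection, the producer task and LoadIdsFromDatabase are illustrative, not part of the helper):

// A producer keeps adding work while ForEachAsync drains it with bounded parallelism.
var workQueue = new BlockingCollection<Guid>(boundedCapacity: 10000);

var producer = Task.Run(() =>
{
    foreach (var id in LoadIdsFromDatabase())   // illustrative source of work items
        workQueue.Add(id);
    workQueue.CompleteAdding();                 // lets GetConsumingEnumerable finish
});

await workQueue.GetConsumingEnumerable()
    .ForEachAsync(dop: 16, body: async id => await ProcessItem(id));

await producer;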
- I'm going to have to give this a look. The ability to cancel a thread or the entire process is a must, but I suspect there is a way to factor that into his implementation for reuse as well. Usually we're talking 10-12 threads with a queue of 500k tasks. The example is simplistic, but there is a queue manager that continuously adds to it. – Gary Smith, Jan 17, 2018
- Yes, cancellation is easy and should be done within ProcessItem like you normally would do it. The loop inherently has no ability to cancel a running work item. Of course you can also test the CancellationToken inside the loop if that would work for you. – usr, Jan 23, 2018
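A minimal sketch of that in-loop check, assuming the same ForEachAsync helper as above (the extra ct parameter is an addition for illustration):

public static Task ForEachAsync<T>(this IEnumerable<T> source, int dop,
    Func<T, Task> body, CancellationToken ct)
{
    return Task.WhenAll(
        from partition in Partitioner.Create(source).GetPartitions(dop)
        select Task.Run(async delegate
        {
            using (partition)
                while (partition.MoveNext())
                {
                    ct.ThrowIfCancellationRequested();  // stop before picking up the next item
                    await body(partition.Current);
                }
        }, ct));
}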
Here's the extension method I've created.
/// <summary>
/// Concurrently executes async actions for each item of <see cref="IEnumerable{T}"/>.
/// </summary>
/// <typeparam name="T">Type of IEnumerable</typeparam>
/// <param name="enumerable">instance of <see cref="IEnumerable{T}"/></param>
/// <param name="action">an async action to execute for each item</param>
/// <param name="maxActionsToRunInParallel">Optional, max number of the actions to run in parallel.
/// Must be greater than 0.</param>
/// <returns>A Task representing an async operation</returns>
/// <exception cref="ArgumentOutOfRangeException">If the maxActionsToRunInParallel is less than 1</exception>
public static async Task ForEachAsyncConcurrent<T>(
    this IEnumerable<T> enumerable,
    Func<T, Task> action,
    int? maxActionsToRunInParallel = null)
{
    if (maxActionsToRunInParallel.HasValue)
    {
        using (var semaphoreSlim = new SemaphoreSlim(
            maxActionsToRunInParallel.Value, maxActionsToRunInParallel.Value))
        {
            var tasksWithThrottler = new List<Task>();
            foreach (var item in enumerable)
            {
                // Increment the number of currently running tasks and wait if they are more than the limit.
                await semaphoreSlim.WaitAsync();

                tasksWithThrottler.Add(Task.Run(async () =>
                {
                    await action(item).ContinueWith(res =>
                    {
                        // The action is completed, so decrement the number of currently running tasks.
                        semaphoreSlim.Release();
                    });
                }));
            }

            // Wait for all tasks to complete.
            await Task.WhenAll(tasksWithThrottler.ToArray());
        }
    }
    else
    {
        await Task.WhenAll(enumerable.Select(item => action(item)));
    }
}
Sample Usage:
await enumerable.ForEachAsyncConcurrent(
    async item =>
    {
        await SomeAsyncMethod(item);
    },
    5);
- @Jamal If you read the answer, you'll see that the code is well documented. If you still have any queries, feel free to ask. Also, I don't think an answer is always supposed to be better; I'm posting a different approach, using SemaphoreSlim instead of Partitioner. – Jay Shah, May 10, 2018
- My mistake, I didn't see it at first. It's not very common here to see entire reviews posted as comments. – Jamal, May 10, 2018
- Jay, that's an interesting take on it. I have been using the first approach with a few tweaks. In both cases I added some additional logic for passing in a CancellationToken (so Action<T, CancellationToken>, plus a CancellationToken ct = default(CancellationToken) parameter on the main method, which is then passed on the inside to the Action<T, CancellationToken>). The cancellation token is a little more critical for me, as we might be processing 100k items and we need a mechanism for cleanly stopping them. – Gary Smith, May 11, 2018
- Yes, you can easily add the CancellationToken parameter and pass it to semaphoreSlim.WaitAsync() and as Task.Run's 2nd parameter, which can be helpful to stop the task. – Jay Shah, May 14, 2018
- This rocks, I love it! – Nicholas Petersen, Sep 25, 2019
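For reference, a rough sketch of the change discussed in the comments above, adding a CancellationToken to the ForEachAsyncConcurrent helper (the Func<T, CancellationToken, Task> signature and parameter names are illustrative, not the answer's exact code):

public static async Task ForEachAsyncConcurrent<T>(
    this IEnumerable<T> enumerable,
    Func<T, CancellationToken, Task> action,
    int maxActionsToRunInParallel,
    CancellationToken ct = default(CancellationToken))
{
    using (var semaphoreSlim = new SemaphoreSlim(
        maxActionsToRunInParallel, maxActionsToRunInParallel))
    {
        var tasks = new List<Task>();
        foreach (var item in enumerable)
        {
            ct.ThrowIfCancellationRequested();   // stop queuing new work once cancelled
            await semaphoreSlim.WaitAsync(ct);   // cancellation also unblocks a waiting caller

            tasks.Add(Task.Run(async () =>
            {
                try { await action(item, ct); }  // the action decides how to honour the token
                finally { semaphoreSlim.Release(); }
            }, ct));
        }

        // Wait for the already-started tasks to finish.
        await Task.WhenAll(tasks);
    }
}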