I wrote a simple implementation of aysnc parallel.ForEach( ... )
All it really does is create a list of tasks and wait for them all to complete and aggregate the exceptions if any.
But to use CancellationToken
I added Task.Run
but I am worried that this is not the best way of stopping a task from running.
I could add some shortcuts in case the token cannot be cancelled, empty collections and so on.
But those are minor tweaks.
Anything else I could do to make tasks.AddRange(source.Select(s => Task.Run(() => body(s), token)));
more efficient while allowing me to cancel running tasks.
public static async Task ForEach<T>(ICollection<T> source, Func<T, Task> body, CancellationToken token )
{
// create the list of tasks we will be running
var tasks = new List<Task>(source.Count);
try
{
// and add them all at once.
tasks.AddRange(source.Select(s => Task.Run(() => body(s), token)));
// execute it all.
await Task.WhenAll(tasks.ToArray()).ConfigureAwait(false);
// throw if we are done here.
token.ThrowIfCancellationRequested();
}
catch
{
// find the error(s) that might have happened.
var errors = tasks.Where(tt => tt.IsFaulted).Select(tu => tu.Exception).ToList();
// we are back in our own thread
if (errors.Count > 0)
{
throw new AggregateException(errors);
}
}
}
Any suggestions where I could improve performance and the creation of tasks?
1 Answer 1
Task.WhenAll
also accepts Enumerable<Task>
as argument. So your lines with tasks
can be simplified:
var tasks = source.Select(x => Task.Run(() => body(s), token));
try
{
await Task.WhenAll(tasks).ConfigureAwait(false);
token.ThrowIfCancellationRequested();
}
catch
{
// find the error(s) that might have happened.
var errors = tasks.Where(tt => tt.IsFaulted).Select(tu => tu.Exception).ToList();
// we are back in our own thread
if (errors.Count > 0)
{
throw new AggregateException(errors);
}
}
If you end up throwing because of the line token.ThrowIfCancellationRequested()
, after all the tasks have completed (so none of the tasks themselves faulted), I think you end up swallowing the exception.
You provide the CancellationToken
to Task.Run
. The only thing this does is check whether the token has been cancelled before the tasks starts running. If you want to affect the task itself, you need to pass the token into the method body
as well. Additionally, the implementation of body
then needs to actually do something with the token (like checking whether it is cancelled), before cancelling will have any use.
So consider the following possible APIs:
Task ForEach<T>(ICollection<T> seq, Func<T, CancellationToken, Task> body, CancellationToken token)
or
Task ForEach<T>(ICollection<T> seq, Func<T, Task> body)
Not much in between will have much use, assuming it's the function body
that you want to cancel.
For some additional reading, see here.
-
\$\begingroup\$ Thanks for the info on
Task.Run
I have to admit, I never knew that and thanks for theThrow
that might never get caught. While I want thebody
to control the token I also want myForEach
to manage it, I might had a delay task and aWhenAny
\$\endgroup\$FFMG– FFMG2019年09月14日 04:00:40 +00:00Commented Sep 14, 2019 at 4:00 -
\$\begingroup\$ Also, why did you move the
source.Select( ... )
out of the try/catch? \$\endgroup\$FFMG– FFMG2019年09月14日 04:02:40 +00:00Commented Sep 14, 2019 at 4:02 -
\$\begingroup\$ @FFMG multiple reasons: 1) It makes it more convenient, since we both need the collection of tasks in the
try
, as thecatch
block. Moving the declaration out, makes that possible. 2) There is no clear reason to put it in thetry
block. From the usage here,try
is supposed to catch any exceptions the tasks might throw during their execution. If those tasks throw exceptions at some point during their execution, they will be caught by the createdTask
object (the objects intasks
), and stored until the result is accessed. In this example, the exceptions will be thrown by (cont.) \$\endgroup\$JAD– JAD2019年09月15日 09:51:28 +00:00Commented Sep 15, 2019 at 9:51 -
\$\begingroup\$ the statement that awaits the tasks:
await Task.WhenAll(tasks).ConfigureAwait(false);
, which is within thetry
block. 3) If thesource.Select
statement throws for some other reason than task failure or cancellation, like eithersource
orbody
beingnull
, you would also catch them in yourcatch
block, which is not something you want, since thecatch
block currently would swallow those exceptions. \$\endgroup\$JAD– JAD2019年09月15日 09:53:21 +00:00Commented Sep 15, 2019 at 9:53
Explore related questions
See similar questions with these tags.