Hopefully this is the final chapter of my `ForEachAsync` code review. The original question was the starting point, and the second question contained modifications suggested in the original, trying to ensure I was handling cancellation and exceptions correctly. The second question also contained my test scenarios (in pseudo-code fashion). This final post addresses a comment about a race condition (after applying the other suggestions from the second post). It contains a 'complete' script that can be run in LINQPad to illustrate what I think is a working function that never exhibits the aforementioned race condition.
A couple of disclaimers about the script:
- The code is taken from a larger code base that is a Windows service project. You can see my SO post about that here if you so desire. So the 'context' of this script isn't the exact scenario where I'd use it, but it appears to be good enough to illustrate the core functionality in question. Most notably, the `Main()` method of the script exits and completes before the `ForEachAsync` task is complete. That is because in my service project, this 'same' code is run within a `System.Threading.Timer` callback and is meant to 'fire and forget' the job, not wait for its completion.
- This test script is meant to cancel after 4 seconds of processing. My `Task.Delay` calculation may not be exactly right. In reality, `ProcessJobAsync` might be looking for a cancel state in a database and triggering the task cancellation. And just like this script, in my real code base, any number of the `Task`s running due to the `ForEachAsync` parallelism could end up calling `internalCancel.Cancel();`, because it is very likely that more than one of the tasks will get past the `!cts.Token.IsCancellationRequested` check and hit the database at the same time. Is it OK to call `.Cancel()` multiple times?
- Please note the question at the end of `ForEachAsync` regarding '`cts.Token.ThrowIfCancellationRequested()` instead of throwing `OperationCanceledException`'. That might be a better way to handle cancellation; I'm not sure.
- `Task.Run` vs `Task.Factory.StartNew`: I had done some reading (mostly from Stephen Toub) before writing this, and my understanding may be wrong, but I chose `Task.Factory.StartNew` because:
  - I thought I should be passing in `TaskCreationOptions.LongRunning`, but maybe that isn't needed?
  - I thought calling the overload that allows an `object state` parameter to be passed in was the correct way to do this, but maybe I should just be accessing the `myJob` variable directly from within the async delegate? Are there any problems (either way) given that `availableJobs` could have more than one item in it, and the fire-and-forget nature of my task would result in `myJob` being reassigned almost immediately?

Given these caveats, does the previous concern that I have a race condition (resulting in a `TaskCanceledException` instead of an `OperationCanceledException`) still exist? If so, any suggestions on how to prevent it? Other comments/concerns are welcome as well.
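On the 'multiple `Cancel`' concern: `CancellationTokenSource` members (other than `Dispose`) are documented as thread-safe, and requesting cancellation again after it is already requested is a no-op. A minimal standalone sketch (mine, separate from the script below) checking this:

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class CancelTwiceDemo
{
    // Simulates several parallel workers all deciding to cancel the same
    // CancellationTokenSource at once; the extra Cancel() calls are harmless.
    public static bool Run()
    {
        using (var cts = new CancellationTokenSource())
        {
            Parallel.For(0, 10, _ => cts.Cancel());
            return cts.IsCancellationRequested; // true, and no exception was thrown
        }
    }
}
```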
void Main()
{
// This would really be hitting a DB and returning 1-MaxJobsAllowed (10) jobs
var availableJobs = Enumerable.Range(0, 1);
foreach (var j in availableJobs)
{
var myJob = new ScheduledJob
{
CancellationTokenSource = new CancellationTokenSource(),
Key = j
};
// Main is really a simulation for a System.Threading.Timer callback method, so I want fire and forget by *not* awaiting Task.Factory.StartNew
Task.Factory.StartNew(
async jobState =>
{
var scheduledJob = jobState as ScheduledJob;
try
{
var jobProcessor = new FooAsync();
$"Task.Factory.StartNew - Before await ProcessJobAsync".Dump();
await jobProcessor.ProcessJobAsync( scheduledJob.Key, new XElement("InputPackage") /* fake 'instructions' */, scheduledJob.CancellationTokenSource.Token);
$"Task.Factory.StartNew - After await ProcessJobAsync".Dump();
}
catch (OperationCanceledException)
{
$"Task.Factory.StartNew - Operation Cancelled".Dump();
}
catch (Exception ex)
{
$"Task.Factory.StartNew - Exception - {ex.Message}".Dump();
// throw;
}
finally
{
$"Task.Factory.StartNew - Finished Processing, return 'worker thread' to pool".Dump();
}
},
myJob,
myJob.CancellationTokenSource.Token,
TaskCreationOptions.LongRunning,
TaskScheduler.Default
);
}
"Main: Loop Complete".Dump();
}
public class FooAsync
{
public async Task<int> ProcessJobAsync(int jobKey, XElement inputPackage, CancellationToken cancellationToken)
{
var seconds = new Random().Next( 15, 30 ); // Simulation of how long job will take to run
var start = DateTime.Now;
$"Task {jobKey}: FooAsync - Start ProcessJobAsync (total of {seconds} seconds)".Dump();
// cancellationToken - Required if controlling service needs to shut down and stop job processing
// internalCancel - Required for this job to be able to cancel itself due to UI request
var internalCancel = CancellationTokenSource.CreateLinkedTokenSource( cancellationToken );
try
{
Action<CancellationTokenSource, int> cancelProcessIfNeeded = ( cts, jobData ) =>
{
if (!cts.Token.IsCancellationRequested /* Windows Service shutting down */ && (DateTime.Now - start).TotalSeconds > 4 /* Simulate internal cancel check which might be a cancel on Website from User */ )
{
Console.WriteLine($"Task {jobKey}: FooAsync - Cancelling Job, DataChunk {jobData}, Elapsed Time: {(DateTime.Now - start).TotalSeconds} seconds");
cts.Cancel();
}
cts.Token.ThrowIfCancellationRequested();
};
// Simulate running 1,000 data chunks; at MaxDegreeOfParallelism = 100 that is roughly 10 sequential waves...
await Enumerable
.Range(0, 1000)
.ForEachAsync(
async jobData =>
{
Console.WriteLine( $"Task {jobKey}: FooAsync - Start DataChunk {jobData}" );
cancelProcessIfNeeded( internalCancel, jobData );
await Task.Delay(seconds * 100);
Console.WriteLine( $"Task {jobKey}: FooAsync - Finish DataChunk {jobData}" );
},
new AsyncParallelOptions { MaxDegreeOfParallelism = 100, CancellationToken = internalCancel.Token }
);
}
catch (Exception ex)
{
Console.WriteLine( $"Task {jobKey}: FooAsync - Exception: {ex.GetType().ToString()}, internalCancel.Token.IsCancellationRequested: {internalCancel.Token.IsCancellationRequested}" );
throw;
}
Console.WriteLine( $"Task {jobKey}: FooAsync - Finished ProcessJobAsync in {(DateTime.Now - start).TotalSeconds} seconds" );
return 10;
}
}
public static class ExtensionMethods
{
public static async Task ForEachAsync<T>( this IEnumerable<T> source, Func<T, Task> body, AsyncParallelOptions parallelOptions )
{
ConcurrentQueue<Exception> exceptions = new ConcurrentQueue<Exception>();
var maxDegreeOfConcurrency = parallelOptions.MaxDegreeOfParallelism;
// If they pass in a CancellationToken from caller of ForEachAsync need to create linked token source in case caller cancels, I want
// ForEachAsync to cancel as well. If they want to failImmediately, make a new CancellationTokenSource so I can stop processing partitions
var cts = CancellationTokenSource.CreateLinkedTokenSource( parallelOptions.CancellationToken );
var allDone = Task.WhenAll(
from partition in Partitioner.Create( source ).GetPartitions( maxDegreeOfConcurrency )
select Task.Run( async delegate {
using ( partition )
{
while ( true )
{
cts.Token.ThrowIfCancellationRequested(); /* either from caller or failImmediately */
// try to read next partition
if ( !partition.MoveNext() ) break;
await body( partition.Current ).ContinueWith( t => {
Console.WriteLine( $"ForEachAsync Extension #1: ContinueWith, t.Exception is null: {t.Exception == null}, t.IsCanceled: {t.IsCanceled}, t.IsFaulted: {t.IsFaulted}, cts.IsCancellationRequested: {cts.IsCancellationRequested}" );
// If body() threw an error, cancel if the caller wants immediate failure
if ( t.Exception != null )
{
// Always gather the exception to throw at the end
foreach ( var ex in t.Exception.Flatten().InnerExceptions )
{
exceptions.Enqueue( ex );
}
if ( parallelOptions.FailImmediately )
{
cts.Cancel();
}
}
} );
}
}
}, cts.Token ) );
// Wait until all finished (or errored out) and then return exceptions
await allDone;
// Question: allDone is never going to have IsCanceled or IsFaulted set, correct? Because await body() will swallow all exceptions?
Console.WriteLine( $"ForEachAsync Extension #2: Finished, {exceptions?.Count ?? 0} total, allDone.IsCanceled: {allDone.IsCanceled}, allDone.IsFaulted: {allDone.IsFaulted}, cts.IsCancellationRequested: {cts.IsCancellationRequested}" );
if ( exceptions.Count > 0 )
{
Console.WriteLine( $"ForEachAsync Extension #3: Throw Exceptions" );
throw new AggregateException( exceptions );
}
// Question: should I just change this whole if statement to cts.Token.ThrowIfCancellationRequested() instead of throwing OperationCanceledException?
if ( cts.IsCancellationRequested )
{
Console.WriteLine($"ForEachAsync Extension #4: Throw OperationCanceledException");
throw new OperationCanceledException();
}
}
}
public class ScheduledJob
{
public CancellationTokenSource CancellationTokenSource { get; set; }
public int Key { get; set;}
}
public class AsyncParallelOptions : System.Threading.Tasks.ParallelOptions
{
public bool FailImmediately { get; set; } = true;
}
1 Answer
1. `System.Threading.Timer` already executes on a thread-pool thread, and by calling `Task.Factory.StartNew` you are starting yet another thread, which is wasteful. If you called `Wait()` on the task, the TPL could inline it and keep using the same thread. Another possibility is an approach such as this (basically `await Task.Delay` instead of an actual timer), but that depends on whether you want the timer to fire exactly every n seconds (even if the previous iteration is still running, which is what you're doing now), or want each iteration to start n seconds after the last one finished (which is usually what you want, and which you would get with the `await Task.Delay` approach).
2. I don't exactly follow your cancellation code - you cancel if cancellation *wasn't* requested? Anyway, yes, calling `Cancel` multiple times is OK.
3. Generally you should always use `ThrowIfCancellationRequested` when available.
4. (a) Presumably, if you start these tasks every n seconds where n isn't very large, then they are not long-running at all. I don't think that flag makes sense here. Read Toub's answer here. (b) Passing the state parameter is slightly more performant since you avoid the lambda capture, but that is probably premature optimization. Each iteration will get its "own" `myJob`, so don't worry about that either. (c) These two methods are not exactly equivalent, specifically when the task itself returns a task (which is what you're doing). Read the part about `Unwrap` in Toub's article.
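To make the `Unwrap` point concrete, here's a minimal sketch (names are mine, not from the post): with an async delegate, `Task.Factory.StartNew` returns a nested `Task<Task<int>>` whose outer task completes at the first `await`, while `Task.Run` hands back the inner task directly.

```csharp
using System;
using System.Threading.Tasks;

public static class UnwrapDemo
{
    public static async Task<(int fromStartNew, int fromRun)> RunAsync()
    {
        // StartNew with an async delegate returns Task<Task<int>>: the outer
        // task completes when the delegate hits its first await, not when the
        // async work actually finishes.
        Task<Task<int>> outer = Task.Factory.StartNew(async () =>
        {
            await Task.Delay(50);
            return 42;
        });
        int fromStartNew = await outer.Unwrap(); // Unwrap to observe the real result

        // Task.Run recognizes the async delegate and unwraps automatically.
        int fromRun = await Task.Run(async () =>
        {
            await Task.Delay(50);
            return 42;
        });

        return (fromStartNew, fromRun);
    }
}
```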
As for `OperationCanceledException`: you're seeing it, and not `TaskCanceledException`, because `CancellationToken.ThrowIfCancellationRequested`, which is what you're using to cancel your tasks, throws `OperationCanceledException`. Apparently that makes `Task.WhenAll` throw `OperationCanceledException`, and so on. But you shouldn't be worried about `TaskCanceledException` vs `OperationCanceledException`: the former inherits from the latter, so just catch the latter and be done with it.
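A minimal sketch of that inheritance point (mine, not from the answer): `Task.Delay` with a cancelled token faults with `TaskCanceledException`, but a single `catch (OperationCanceledException)` handles it.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class CatchDemo
{
    public static async Task<string> RunAsync()
    {
        var cts = new CancellationTokenSource();
        cts.Cancel(); // cancel up front so the delay faults immediately
        try
        {
            await Task.Delay(1000, cts.Token); // throws TaskCanceledException
            return "not cancelled";
        }
        catch (OperationCanceledException ex)
        {
            // The base-class catch still sees the derived exception type.
            return ex.GetType().Name;
        }
        finally
        {
            cts.Dispose();
        }
    }
}
```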
One more thing: in your `Task.Delay`, I assume you meant to multiply by 1000 (not 100) to convert seconds to milliseconds. It is then not clear what you are attempting to achieve, because the minimum is 15 seconds, which is more than 4, so the cancellation code will always get called.
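The `await Task.Delay` alternative to a timer mentioned above can be sketched roughly like this (a hypothetical `pollOnce` stands in for the real job-polling work); each iteration starts only after the previous one finishes, so iterations never overlap:

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class PollLoop
{
    // Waits `interval` after each iteration *finishes*; cancellation stops the
    // loop at the next check or makes the pending delay throw.
    public static async Task RunAsync(Func<Task> pollOnce, TimeSpan interval, CancellationToken token)
    {
        while (!token.IsCancellationRequested)
        {
            await pollOnce();                  // e.g. check the DB and fire off jobs
            await Task.Delay(interval, token); // throws OperationCanceledException on cancel
        }
    }
}
```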
- Digesting all this information. As for #2: the Windows service passes in the original token so that it can attempt to cancel any running jobs if a service shutdown is requested. So if that was requested, `cts.Token.IsCancellationRequested` would be `true` and it would never enter the `if` statement. But if the service shutdown hasn't been requested yet, the 'running job' could then check the application's database to see if the user(s) of the website/application had requested a cancel (in my code, it is just the time check, not the real DB query). – Terry, Nov 7, 2016 at 20:19
- RE the `Task.Delay` comment: I didn't want to use 1000 because, if I am understanding parallelism correctly, if I set `Enumerable.Range( 0, 10 )` and slept for 1000 for each item with NO parallelism, it would run for 10 seconds. If I set parallelism to 2, it would be down to 5 seconds. So, given that I just wanted to specify the length of the entire job before hitting `ForEachAsync` with the desired length and parallelism, the calculation I used was Range / Parallelism * N = Seconds * 1000. That is how I started, but then I just bumped it to 100 so that it always cancelled (for this test). – Terry, Nov 7, 2016 at 21:15
- RE the `LongRunning` param: I'll switch to `Task.Run` and do some testing. But reading Toub's comment in your link: I don't kick off too many tasks (only when a user on the site requests a 'job' to run)... maybe 20-50 a day? And then each task runs from 30 seconds to 30 minutes (the extreme is 2 hours), so `LongRunning` still seems kind of appropriate? – Terry, Nov 7, 2016 at 21:35
- @Terry RE #2: my bad, I didn't scroll to the right to see the other condition. RE `Task.Delay`: that would have been more readable as `seconds * 1000 / maxDegreeOfParallelism`. RE `LongRunning`, read his advice again: "If you're generating one or two tasks that will persist for quite some time relative to the lifetime of your application, then LongRunning is something to consider. In general, don't use it unless you find that you really need it." In your case, neither condition is satisfied. – Ohad Schneider, Nov 7, 2016 at 22:02
- RE the `Task.Wait()` suggestion: my `Timer` is set to fire once. It executes the delegate, which could launch 1..N tasks via `Task.Factory.StartNew()`; afterwards the timer is again set to execute once (and repeat). So, as you suggested, I don't get overlaps. But are you suggesting that I put `.Wait()` on the `Task.Factory.StartNew()` call? I don't think I want to do that, because I want to fire and forget those jobs. I don't want `PollJobs` to block (unless I'm misunderstanding). Also note that I did change from the `Timer` to the alternative method in the link you provided. – Terry, Nov 9, 2016 at 3:33