Scenario:
I wanted a task scheduler/doer that enables my application to schedule some tasks to be executed in a specified time but in the same order they were scheduled in or depending on their priorities (DataFlow
like). I also wanted it to enable the usage of C# 5 Async/Await functionality so I looked up over the Internet and didn't found something the truly fit my needs so I decided to implement one... a simple yet advanced (in my opinion) task scheduler that exactly achieved what I want. However, I want to improve it by hearing your opinions about it.
Implementation:
1) Scheduler
Class
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
namespace ATScheduler
{
public abstract class Scheduler
{
#region Fields
private readonly TimeSpan _baseInterval;
private readonly TimeSpan _executionTimeOut;
private readonly List<IJob> _jobs;
private readonly object _sync;
private readonly Timer _timer;
private bool _paused;
#endregion
#region Constructors
protected Scheduler()
{
_jobs = new List<IJob>();
_sync = new object();
_baseInterval = TimeSpan.FromMilliseconds(12000);
_executionTimeOut = TimeSpan.FromMilliseconds(30000);
_timer = new Timer(ProcessJobs, null, Timeout.Infinite, Timeout.Infinite);
}
#endregion
#region Methods
public abstract Task JobTriggered(IJob job);
public abstract Task JobException(IJob job, Exception exception);
private void ProcessJobs(object state)
{
lock (_sync) {
IJob[] jobsToExecute = (from jobs in _jobs
where jobs.StartTime <= DateTimeOffset.Now && jobs.Enabled
orderby jobs.Priority
select jobs).ToArray();
if (jobsToExecute.Any()) {
foreach (IJob job in jobsToExecute) {
try {
JobTriggered(job).Wait(_executionTimeOut);
}
catch (AggregateException exception) {
JobException(job, exception).Wait(_executionTimeOut);
}
if (job.Repeat) {
job.StartTime = DateTimeOffset.Now.Add(job.Interval);
}
else {
_jobs.Remove(job);
}
if (_jobs.Count <= 0 || _paused)
{
_timer.Change(Timeout.Infinite, Timeout.Infinite);
}
else
{
_jobs.Sort((first, second) => first.StartTime.CompareTo(second.StartTime));
TimeSpan delay = _jobs[0].StartTime.Subtract(DateTimeOffset.Now);
long dueTime = Math.Min(Math.Max(100, (long)delay.TotalMilliseconds), (int)_baseInterval.TotalMilliseconds);
_timer.Change((int)dueTime, Timeout.Infinite);
}
}
}
}
}
public void PauseScheduler()
{
if (!_paused) {
_paused = !_paused;
}
}
public void ResumeScheduler()
{
if (_paused) {
_paused = !_paused;
}
}
public void AddJob(IJob job)
{
_jobs.Add(job);
_jobs.Sort((first, second) => first.StartTime.CompareTo(second.StartTime));
TimeSpan delay = _jobs[0].StartTime.Subtract(DateTimeOffset.Now);
long dueTime = Math.Min(Math.Max(100, (long)delay.TotalMilliseconds), (int)_baseInterval.TotalMilliseconds);
_timer.Change((int)dueTime, Timeout.Infinite);
}
public Task<T> AddJob<T>(IJob<T> job)
{
if (job.Repeat) {
throw new Exception("Repeatable jobs can't be awaited!");
}
_jobs.Add(job);
_jobs.Sort((first, second) => first.StartTime.CompareTo(second.StartTime));
TimeSpan delay = _jobs[0].StartTime.Subtract(DateTimeOffset.Now);
long dueTime = Math.Min(Math.Max(100, (long)delay.TotalMilliseconds), (int)_baseInterval.TotalMilliseconds);
_timer.Change((int)dueTime, Timeout.Infinite);
return job.TaskCompletionSource.Task;
}
public void RemoveJob(IJob job)
{
_jobs.RemoveAll(x => x.Equals(job));
}
#endregion
}
}
2) IJob
Interface
using System;
using System.Threading.Tasks;
namespace ATScheduler
{
public interface IJob
{
TimeSpan Interval { get; }
DateTimeOffset StartTime { get; set; }
int Priority { get; }
bool Repeat { get; }
bool Enabled { get; set; }
}
public interface IJob<T> : IJob
{
TaskCompletionSource<T> TaskCompletionSource { get; set; }
void Return(T result);
}
}
3) How to use:
using System;
using System.Threading.Tasks;
using ATScheduler;
namespace AdvancedScheduler
{
internal class Program : Scheduler
{
private static void Main()
{
Console.WriteLine("- Advanced yet simple Flow Task Scheduler!");
Console.WriteLine();
new Program().DoApplication();
Console.Read();
}
private async void DoApplication()
{
AddJob(new SimpleJob());
AddJob(new SimpleUndefinedJob());
Console.WriteLine("Awaitable job returned: {0}", await AddJob(new SimpleAwaitableJob()));
}
public override async Task JobTriggered(IJob job)
{
if (job is SimpleJob) {
await Task.Delay(1); //or time consumable task.
Console.WriteLine("Simple Job executed for {0} times", ((SimpleJob)job).ExecutionCounter);
((SimpleJob)job).ExecutionCounter++;
}
else if (job is SimpleAwaitableJob) {
((SimpleAwaitableJob) job).Return(5);
}
else {
throw new Exception("Undefined Job!");
}
}
public override async Task JobException(IJob job, Exception exception)
{
Console.WriteLine("{0} threw an exception of {1}", job.GetType().Name, exception.InnerException.Message);
}
private class SimpleAwaitableJob : IJob<int>
{
public SimpleAwaitableJob()
{
Priority = 1;
Repeat = false;
Enabled = true;
TaskCompletionSource = new TaskCompletionSource<int>();
Interval = TimeSpan.FromSeconds(5);
StartTime = DateTime.Now;
}
public TimeSpan Interval { get; private set; }
public DateTimeOffset StartTime { get; set; }
public int Priority { get; private set; }
public bool Repeat { get; private set; }
public bool Enabled { get; set; }
public void Return(int result)
{
TaskCompletionSource.SetResult(result);
}
public TaskCompletionSource<int> TaskCompletionSource { get; set; }
}
private class SimpleJob : IJob
{
public SimpleJob()
{
Priority = 0;
Repeat = true;
Enabled = true;
Interval = TimeSpan.FromSeconds(5);
StartTime = DateTime.Now.AddSeconds(10);
}
public TimeSpan Interval { get; private set; }
public DateTimeOffset StartTime { get; set; }
public int Priority { get; private set; }
public bool Repeat { get; private set; }
public bool Enabled { get; set; }
//We can extend it!
public int ExecutionCounter { get; set; }
}
private class SimpleUndefinedJob : IJob
{
public SimpleUndefinedJob()
{
Priority = 2;
Repeat = false;
Enabled = true;
Interval = TimeSpan.FromSeconds(5);
StartTime = DateTime.Now;
}
public TimeSpan Interval { get; private set; }
public DateTimeOffset StartTime { get; set; }
public int Priority { get; private set; }
public bool Repeat { get; private set; }
public bool Enabled { get; set; }
}
}
}
2 Answers 2
Just a couple of notes.
public void PauseScheduler() { if (!_paused) { _paused = !_paused; } } public void ResumeScheduler() { if (_paused) { _paused = !_paused; } }
Can be
public void PauseScheduler()
{
_paused = true;
}
public void ResumeScheduler()
{
_paused = false;
}
The test here is unnecessary
if (jobsToExecute.Any()) { foreach (IJob job in jobsToExecute) {
It would also be good to remove the magic numbers here:
_baseInterval = TimeSpan.FromMilliseconds(12000); _executionTimeOut = TimeSpan.FromMilliseconds(30000);
(On that note, why not TimeSpan.FromSeconds(12)
and TimeSpan.FromSeconds(30)
?)
It seems that _jobs
is always sorted by StartTime
. If so, you could insert into the list at the appropriate place here instead of re-sorting the entire list
_jobs.Add(job); _jobs.Sort((first, second) => first.StartTime.CompareTo(second.StartTime));
job
would be a better name than jobs
here:
from jobs in _jobs where ...
It's poor style to throw System.Exception
. Create a custom exception class instead.
if (job.Repeat) { throw new Exception("Repeatable jobs can't be awaited!"); }
-
\$\begingroup\$ Instead of resorting or inserting manually at the right place, you could use a collection that's automatically sorted: either
SortedDictionary
orSortedList
. \$\endgroup\$svick– svick2014年08月10日 10:15:09 +00:00Commented Aug 10, 2014 at 10:15 -
\$\begingroup\$ On second thought, those types probably aren't such a good fit, since they don't allow multiple values with the same key (i.e. multiple jobs with the same
StartTime
). \$\endgroup\$svick– svick2014年08月10日 10:49:14 +00:00Commented Aug 10, 2014 at 10:49
First, threading bugs:
The way you're accessing _jobs
from multiple threads is not thread-safe! You need to ensure that whenever you're reading or writing to the list, it's under a lock (probably under a different lock than _sync
, since you don't want to wait for all current jobs to execute before a call to AddJob()
returns.
private bool _paused;
Since this field is also accessed from multiple threads, you should also use a lock when accessing it.
Other notes:
I wanted a task scheduler/doer that enables my application to schedule some tasks to be executed in a specified time but in the same order they were scheduled in [...]
Except your code doesn't do that. List<T>.Sort()
is documented as being unstable, which means the order of jobs with the same StartTime
isn't maintained after sorting.
OrderBy
from LINQ is stable, though using that would mean creating lots of garbage lists.
[...] or depending on their priorities (
DataFlow
like).
I don't understand what this means, there are no priorities in TPL Dataflow.
private void ProcessJobs(object state)
At first I was confused by the parameter, until I realized it has to be there to fulfill the signature required by Timer
. But since you don't actually use it, maybe it would be better to remove it from here and instead use a lambda as the timer parameter?
new Timer(_ => ProcessJobs(), null, ...);
IJob[] jobsToExecute = (from jobs in _jobs
where jobs.StartTime <= DateTimeOffset.Now && jobs.Enabled
orderby jobs.Priority
select jobs).ToArray();
Since _jobs
is always kept sorted by StartTime
, you don't need to iterate the whole list to get the jobs that should have been already started. So, using TakeWhile()
instead of Where()
is likely going to be more efficient (though it requires method syntax, so I switched to that for the whole query):
IJob[] jobsToExecute = _jobs
.TakeWhile(job => job.StartTime <= DateTimeOffset.Now)
.Where(job => job.Enabled)
.OrderBy(job => job.Priority)
.ToArray();
JobTriggered(job).Wait(_executionTimeOut);
I don't quite understand why are the job-handling methods async
, when you're waiting on them synchronously: doing this doesn't save you any threads. Though you probably won't care if this uses few more threads than it needs to, since this class wouldn't make much sense in ASP.NET (you can't rely on it there, because the AppDomain can be recycled at any time when there is no request being processed).
If you wanted to make this fully asynchronous, you could do something like this to implement asynchronous waiting with timeout:
async Task<bool> WithTimeout(this Task task, TimeSpan timeout)
{
var completedTask = await Task.WhenAny(task, Task.Delay(timeout));
return completedTask == task;
}
...
await JobTriggered(job).WithTimeout(_executionTimeOut);
Though doing this would also mean you can't use lock
and so you would have to use async-compatible lock (either SemaphoreSlim(1)
or AsyncLock
from Nito AsyncEx).
You would also need to use a workaround for the fact that you can't use await
in a catch
block (at least not until C# 6.0 comes out).
I'm not sure why are you using lock
, when I think the way you're using the Timer
means the method will never be called twice at the same time. If it's there just to make absolutely sure that doesn't happen, then I guess it makes sense.
long dueTime = Math.Min(Math.Max(100, (long)delay.TotalMilliseconds), (int)_baseInterval.TotalMilliseconds);
_timer.Change((int)dueTime, Timeout.Infinite);
The way you're using long
here is weird. I always prefer to use TimeSpan
s as much as possible, not milliseconds in numeric types. There is no Math.Min()
and Math.Max()
for TimeSpan
, but you can easily write them yourself:
public static class DateMath
{
public static TimeSpan Min(TimeSpan val1, TimeSpan val2)
{
return new TimeSpan(Math.Min(val1.Ticks, val2.Ticks));
}
public static TimeSpan Max(TimeSpan val1, TimeSpan val2)
{
return new TimeSpan(Math.Max(val1.Ticks, val2.Ticks));
}
}
This would change your code to the following, which I believe is more readable and less error prone:
var dueTime = DateMath.Min(DateMath.Max(TimeSpan.FromMillisedonds(100), delay), _baseInterval);
_timer.Change(dueTime, Timeout.InfiniteTimeSpan);
It looks like you're sorting the whole list just to get the first job. If the list of jobs becomes so large that this becomes a problem for you, consider switch to a heap, which is more efficient.
Though this means the TakeWhile()
optimization mentioned above wouldn't be that simple anymore.
The two overloads of AddJob()
have lots of duplicated code. I think you could get rid of that by calling the non-generic AddJob()
from the generic one.
int Priority { get; }
You really need to document here whether a higher priority is indicated by a lower or higher numerical value.
TaskCompletionSource<T> TaskCompletionSource { get; set; }
void Return(T result);
Why does TaskCompletionSource
have a setter?
And why expose both TaskCompletionSource
and Return()
? Wouldn't one of them be enough?
private async void DoApplication()
async void
methods should be avoided, and especially so in console applications. Instead, you should use async Task
method and Wait()
for it. Doing that can cause a problem in other contexts, but it's the right solution here.
Or you could use AsyncContext
from Nito AsyncEx.
public override async Task JobTriggered(IJob job)
All the casting in this method smells of a bad design. Maybe JobTriggered()
should be a method on IJob
and not on scheduler?
-
\$\begingroup\$ Well some explanations, 1- by saying DataFlow like i was pointing out to the whole thing not the statement behind it only. 2- As i run in trouble using async i just used Wait() as it does the same as i want (which is not executing a job before the other (the dataflow thingy).
3- I don't understand the first part about thread safety so can you be more accurate with some code examples ?
\$\endgroup\$Danial Eugen– Danial Eugen2014年08月11日 18:33:05 +00:00Commented Aug 11, 2014 at 18:33 -
\$\begingroup\$ @DanialEugen You're accessing
_jobs
from two threads (the one that executesProcessJobs()
and the one that executesAddJob()
), and the accesses can happen at the same time. And sinceList<T>
is not thread-safe, your code is not thread-safe. The simplest way to fix that is to access_jobs
always under a lock, so that the accesses from different threads can't happen at the same time. In general, you might want to learn more thread-safety if you want to build multi-threaded applications. \$\endgroup\$svick– svick2014年08月11日 18:51:26 +00:00Commented Aug 11, 2014 at 18:51 -
\$\begingroup\$ ok that makes sense... Also the reason i made the JobTriggered method a part of the scheduler and not the job is that i want to subscribe to it only one time and not subscribing every time i add a new job ? what about that \$\endgroup\$Danial Eugen– Danial Eugen2014年08月12日 03:42:32 +00:00Commented Aug 12, 2014 at 3:42
-
\$\begingroup\$ also does _jobs.Count count for thread-safety ? or i can use it without lock \$\endgroup\$Danial Eugen– Danial Eugen2014年08月12日 04:08:52 +00:00Commented Aug 12, 2014 at 4:08
-
\$\begingroup\$ ah sorry and can you point me to a thread safe solution for the list prob because from what i see List and Thread Safe don't mix :P \$\endgroup\$Danial Eugen– Danial Eugen2014年08月12日 05:17:51 +00:00Commented Aug 12, 2014 at 5:17
abstract
methods. \$\endgroup\$