With great inspiration from t3chb0t's ongoing work on scheduling, cron jobs, etc. (last post here), I decided to make my own version of the scheduler, using the observer pattern, as a learning project.
The scheduler
public class HScheduler
{
    public const int UnlimitedJobParallelism = -1;

    HSecondCounter m_counter;
    ConcurrentDictionary<IScheduleSubscriber, int> m_parallelCounters = new ConcurrentDictionary<IScheduleSubscriber, int>();

    public void Start()
    {
        m_counter = HSecondCounter.CountAsync();
    }

    public IDisposable Subscribe(IScheduleSubscriber subscriber)
    {
        IDisposable handle = m_counter
            .Where(t => subscriber.ShouldRun(t) && CanRun(subscriber))
            .Subscribe((time) =>
            {
                try
                {
                    subscriber.Action(time);
                    DecrementSubscriber(subscriber);
                }
                catch (Exception ex)
                {
                    // TODO: something should be done here (logging, notifying...)
                    Console.WriteLine(ex.Message);
                }
            },
            (ex) =>
            {
                // TODO: something should be done here (logging, notifying...)
                Console.WriteLine(ex.Message);
            },
            () =>
            {
                Console.WriteLine($"{subscriber.Name} Completed");
            });

        return Disposable.Create(() =>
        {
            handle.Dispose();
            RemoveSubscriber(subscriber);
        });
    }

    private void RemoveSubscriber(IScheduleSubscriber subscriber)
    {
        m_parallelCounters.TryRemove(subscriber, out _);
    }

    private void DecrementSubscriber(IScheduleSubscriber subscriber)
    {
        if (subscriber.DegreeOfParallelism != UnlimitedJobParallelism)
            m_parallelCounters.AddOrUpdate(subscriber, 0, (scr, c) => c - 1);
    }

    private bool CanRun(IScheduleSubscriber subscriber)
    {
        if (subscriber.DegreeOfParallelism == UnlimitedJobParallelism) return true;

        int value = m_parallelCounters.GetOrAdd(subscriber, 0);
        bool result = value < subscriber.DegreeOfParallelism;
        if (result)
        {
            m_parallelCounters.AddOrUpdate(subscriber, 1, (scr, c) => c + 1);
        }
        return result;
    }

    internal void Stop()
    {
        m_counter.Stop();
    }
}
This largely follows t3chb0t's pattern, but is not dedicated to cron jobs. Instead of using Observable.Interval, I decided to make my own "time pump", which resulted in the HSecondCounter.
HSecondCounter
HSecondCounter is an implementation of the IObservable<DateTime> interface and it is locked to ticking once per second. It is an attempt to solve the problem that Xiaoy312 describes here. The solution for now is to check the current time every half second but only dispatch a tick if the seconds component of the current time has changed since the last tick. It seems to work, but I'm sure someone can shoot it down?
public class HSecondCounter : IObservable<DateTime>
{
    static public HSecondCounter Count()
    {
        HSecondCounter counter = new HSecondCounter();
        Task.Factory.StartNew(() =>
        {
            counter.Run((now) =>
            {
                lock (counter.m_observers)
                {
                    foreach (var observer in counter.m_observers)
                    {
                        observer.OnNext(now);
                    }
                    Console.WriteLine("HSecondCounter: {0}", now);
                }
            });
        }, TaskCreationOptions.LongRunning);
        return counter;
    }

    static public HSecondCounter CountAsync()
    {
        HSecondCounter counter = new HSecondCounter();
        Task.Factory.StartNew(() =>
        {
            counter.Run((now) =>
            {
                lock (counter.m_observers)
                {
                    foreach (var observer in counter.m_observers)
                    {
                        Task.Factory.StartNew(() =>
                        {
                            observer.OnNext(now);
                        });
                    }
                    Console.WriteLine("HSecondCounter: {0}", now);
                }
            });
        }, TaskCreationOptions.LongRunning);
        return counter;
    }

    List<IObserver<DateTime>> m_observers = new List<IObserver<DateTime>>();
    volatile bool m_doContinue = true;

    private HSecondCounter()
    {
    }

    public void Stop()
    {
        m_doContinue = false;
    }

    public IDisposable Subscribe(IObserver<DateTime> observer)
    {
        lock (m_observers)
        {
            if (!m_observers.Contains(observer))
            {
                m_observers.Add(observer);
            }
        }

        return Disposable.Create(() =>
        {
            lock (m_observers)
            {
                m_observers.Remove(observer);
            }
            observer.OnCompleted();
        });
    }

    async private void Run(Action<DateTime> notifier)
    {
        try
        {
            int lastSecond = 0;
            while (m_doContinue)
            {
                DateTime now = DateTime.Now;
                if (now.Second != lastSecond)
                {
                    notifier(now);
                }
                lastSecond = now.Second;
                await Task.Delay(500);
            }
        }
        catch (Exception ex)
        {
            lock (m_observers)
            {
                foreach (var observer in m_observers.ToArray())
                {
                    observer.OnError(ex);
                }
            }
        }
        finally
        {
            lock (m_observers)
            {
                foreach (var observer in m_observers)
                {
                    observer.OnCompleted();
                }
            }
            Console.WriteLine($"HSecondCounter ended at: {DateTime.Now}");
        }
    }
}
It can be run "normally" by calling HSecondCounter.Count() or async by calling HSecondCounter.CountAsync(). Running normally means that the next notification (tick) is not sent before the previous one returns. This is how Observable.Interval(...) works. Async means that the notification to each observer is sent independently. There are pros and cons in both cases(?). At the outer level they are both awaitable, so maybe the naming is somewhat misleading?
IScheduleSubscriber
The contract for subscribers to the HScheduler.
public interface IScheduleSubscriber
{
    string Name { get; }
    Action<DateTime> Action { get; }
    bool ShouldRun(DateTime time);
    int DegreeOfParallelism { get; }
}
ScheduleSubscriber
A test implementation of IScheduleSubscriber:
public class ScheduleSubscriber : IScheduleSubscriber
{
    public string Name { get; set; }
    public Action<DateTime> Action { get; set; }
    public Func<DateTime, bool> ShouldRunPredicate { get; set; }
    public int DegreeOfParallelism { get; set; }

    public bool ShouldRun(DateTime time)
    {
        return ShouldRunPredicate(time);
    }

    public override string ToString()
    {
        return Name;
    }
}
Test Case
void TestSchedule()
{
    HScheduler scheduler = new HScheduler();
    scheduler.Start();

    ScheduleSubscriber subscriber1 = new ScheduleSubscriber
    {
        Name = "AAAA",
        DegreeOfParallelism = HScheduler.UnlimitedJobParallelism,
        Action = (value) =>
        {
            Console.WriteLine($"AAAA: {value} - {value.Millisecond} - Thread Id: {Thread.CurrentThread.ManagedThreadId}");
            Thread.Sleep(5000);
            Console.WriteLine("AAAA Finished");
        },
        ShouldRunPredicate = (time) =>
        {
            return time.Second % 2 == 0;
        }
    };

    ScheduleSubscriber subscriber2 = new ScheduleSubscriber
    {
        Name = "BBBB",
        DegreeOfParallelism = 1,
        Action = (value) =>
        {
            Console.WriteLine($"BBBB: {value} - {value.Millisecond} - Thread Id: {Thread.CurrentThread.ManagedThreadId}");
            Thread.Sleep(3000);
            Console.WriteLine("BBBB Finished");
        },
        ShouldRunPredicate = (time) =>
        {
            return time.Second % 5 == 0;
        }
    };

    using (IDisposable scheduledJob1 = scheduler.Subscribe(subscriber1))
    using (IDisposable scheduledJob2 = scheduler.Subscribe(subscriber2))
    {
        Console.ReadLine();
    }

    scheduler.Stop();
    Console.ReadLine();
}
1 Answer
It's very interesting to see it done the hard way, and I have to study it a little bit more, but so far I have found a couple of things that I believe should be improved. (They aren't in any particular order.)
There is one issue with stopping the scheduler. You are using the m_doContinue variable to control the while loop of the Run method, but there is no mechanism anywhere to stop the LongRunning task.
I think the scheduler should be IDisposable and use a CancellationTokenSource to cancel the task that Task.Factory.StartNew created when the scheduler was initialized.
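A minimal sketch of that suggestion, assuming a simplified ticker that takes a plain callback instead of observers. The class name CancellableSecondCounter and its Completion property are hypothetical and not part of the reviewed code; the loop keeps the half-second polling from the question.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical sketch, not the author's HSecondCounter: a second ticker whose
// background loop is stopped through a CancellationTokenSource on Dispose.
public sealed class CancellableSecondCounter : IDisposable
{
    private readonly CancellationTokenSource m_cts = new CancellationTokenSource();
    private readonly Task m_loop;
    private bool m_disposed;

    public CancellableSecondCounter(Action<DateTime> onTick)
    {
        // Task.Run unwraps the async lambda, so m_loop completes only when the loop ends.
        m_loop = Task.Run(() => RunAsync(onTick, m_cts.Token));
    }

    // Exposed so callers can await the end of the loop if they want to.
    public Task Completion => m_loop;

    private static async Task RunAsync(Action<DateTime> onTick, CancellationToken token)
    {
        int lastSecond = -1;
        try
        {
            while (!token.IsCancellationRequested)
            {
                DateTime now = DateTime.Now;
                if (now.Second != lastSecond)
                {
                    lastSecond = now.Second;
                    onTick(now);
                }
                // Task.Delay throws an OperationCanceledException once the token is
                // cancelled, so the loop ends without waiting out the remaining delay.
                await Task.Delay(500, token);
            }
        }
        catch (OperationCanceledException)
        {
            // Expected on Dispose; treated as a normal shutdown.
        }
    }

    public void Dispose()
    {
        if (m_disposed) return;
        m_disposed = true;

        m_cts.Cancel();
        try
        {
            m_loop.Wait();
        }
        catch (AggregateException)
        {
            // Exceptions thrown by onTick surface here; ignored in this sketch.
        }
        m_cts.Dispose();
    }
}
```

With this shape, a using block around the counter is enough to guarantee that the background loop has ended when the block exits.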
Another thing I'd change is the List<IObserver<DateTime>> m_observers variable. If you made it a HashSet, you wouldn't need the if in Subscribe, or you could just use a ConcurrentDictionary and remove the two locks.
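As a rough illustration of the ConcurrentDictionary variant, here is a sketch of a lock-free observer set. ObserverSet<T> and CollectingObserver<T> are hypothetical names introduced only for this example; the dictionary is used as a set, with a dummy byte as the value.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;

// Hypothetical helper, not part of the reviewed code: keeps observers in a
// ConcurrentDictionary used as a set, so no explicit lock is needed.
public sealed class ObserverSet<T>
{
    // The byte value is a dummy; only the keys matter.
    private readonly ConcurrentDictionary<IObserver<T>, byte> m_observers =
        new ConcurrentDictionary<IObserver<T>, byte>();

    // TryAdd returns false if the observer is already present,
    // which replaces the explicit Contains check.
    public bool Add(IObserver<T> observer) => m_observers.TryAdd(observer, 0);

    public bool Remove(IObserver<T> observer) => m_observers.TryRemove(observer, out _);

    public int Count => m_observers.Count;

    public void Publish(T value)
    {
        // Keys returns a snapshot copy, so concurrent Add/Remove calls
        // cannot invalidate this enumeration.
        foreach (IObserver<T> observer in m_observers.Keys)
        {
            observer.OnNext(value);
        }
    }
}

// Hypothetical observer used only to exercise the set.
public sealed class CollectingObserver<T> : IObserver<T>
{
    public List<T> Values { get; } = new List<T>();
    public void OnNext(T value) => Values.Add(value);
    public void OnError(Exception error) { }
    public void OnCompleted() { }
}
```

A HashSet<IObserver<T>> gives the same duplicate handling through the return value of Add, but it still needs the locks because it is not thread-safe.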
I also noticed that the HScheduler has a Subscribe method but isn't derived from the IObservable interface.
The HSecondCounter has two very similar methods: Count and CountAsync. I find this duplication unnecessary. Count could call CountAsync synchronously. In fact, they are nearly identical; the only difference is that CountAsync wraps each observer.OnNext call in its own task. Additionally, I expect CountAsync to return a Task so that I can await it (if I wanted to), which is the usual convention for xAsync methods.
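To make the convention concrete, here is a small sketch in which the async method returns a Task and the synchronous one simply blocks on it. SecondTicker, the callback parameter, and the CancellationToken parameter are assumptions made for this example and do not appear in the reviewed code.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical sketch of the xAsync naming convention, not the author's API.
public static class SecondTicker
{
    // Async version: the returned Task completes once the token is cancelled.
    public static async Task CountAsync(Action<DateTime> onTick, CancellationToken token)
    {
        int lastSecond = -1;
        while (!token.IsCancellationRequested)
        {
            DateTime now = DateTime.Now;
            if (now.Second != lastSecond)
            {
                lastSecond = now.Second;
                onTick(now);
            }

            try
            {
                // ConfigureAwait(false) avoids resuming on a captured context,
                // which keeps the blocking Count below from deadlocking.
                await Task.Delay(500, token).ConfigureAwait(false);
            }
            catch (OperationCanceledException)
            {
                break;
            }
        }
    }

    // Synchronous version: reuses the async one and blocks until it finishes.
    public static void Count(Action<DateTime> onTick, CancellationToken token) =>
        CountAsync(onTick, token).GetAwaiter().GetResult();
}
```

The loop body exists only once here, and callers choose between awaiting CountAsync and blocking in Count.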
Thanks for the feedback. You have some good observations. I think I'll write a revision in an answer where I will also comment on the issues you address. – user73941, Apr 17, 2018 at 17:23
I've posted a new version as a new question. I think I've incorporated most of your points. The cancellationToken must wait until version 3.0. codereview.stackexchange.com/questions/192354/… – user73941, Apr 18, 2018 at 6:48
[...] HSecondCounter. I needed quite a while to figure it out, but if I get this correctly, you let the timer tick a little bit faster to prevent the missing second with await Task.Delay(500);, right?
[...] DateTime.Now with the "missing" second when the milliseconds round-trip to 000, else there are two. I have had it running for a while without missing seconds, but who knows :-)