As a follow up to this first attempt to create and use my own IObservable<T>
implementation, this is version 2.0. The goal has been to correct the code where there were conceptual misunderstandings and to take the issues pointed to by answers into account.
The overall concept is the same: A scheduler that can fire off jobs when they want to be in intervals as small as one second. Instead of using a timer, the Observer pattern is used by implementing an IObservable<DateTime>
type, that notifies subscribers each second.
The Scheduler
public class HScheduler : IDisposable
{
public const int UnlimitedJobParallelism = -1;
private const int DefaultWaitTimeout = 60000;
HSecondCounter m_counter;
ConcurrentDictionary<IScheduleJob, int> m_parallelCounters = new ConcurrentDictionary<IScheduleJob, int>();
List<AutoResetEvent> m_waitHandles = new List<AutoResetEvent>();
public HScheduler(int waitTimeout = DefaultWaitTimeout)
{
WaitTimeout = waitTimeout;
}
public bool IsDisposed => m_counter == null;
public bool IsRunning => !IsDisposed;
/// <summary>
/// When the scheduler is stopped the amount of milliseconds to wait for all actions to finish.
/// </summary>
public int WaitTimeout { get; set; }
public void Start()
{
if (IsRunning)
return;
m_counter = HSecondCounter.CountAsync();
Log($"Scheduler started");
}
public ScheduledJobHandle Schedule(IScheduleJob job)
{
if (!RegisterJob(job)) return null;
IDisposable handle = m_counter
.Where(t => CanRun(job) && job.ShouldRun(t))
.Subscribe((time) =>
{
AutoResetEvent waitHandle = null;
try
{
waitHandle = RegisterAction(job);
job.Action(time);
}
catch (Exception ex)
{
Log($"Job Action Error (1): {job.Name}: Error Message: {ex.Message}");
}
finally
{
ReleaseAction(job, waitHandle);
}
},
(ex) =>
{
Log($"Job Action Error (2): {job.Name}: Error Message: {ex.Message}");
},
() =>
{
Log($"Job Action Completed: {job.Name}");
});
return new ScheduledJobHandle(job, Disposable.Create(() =>
{
handle.Dispose();
RemoveJob(job);
}));
}
private bool RegisterJob(IScheduleJob job)
{
if (m_parallelCounters.ContainsKey(job))
{
Log($"Job already registered and running.: {job.Name}");
return false;
}
else
{
m_parallelCounters.AddOrUpdate(job, 0, (sj, c) => c);
Log($"Job Registered: {job.Name}");
return true;
}
}
private void RemoveJob(IScheduleJob job)
{
m_parallelCounters.TryRemove(job, out _);
}
private AutoResetEvent RegisterAction(IScheduleJob job)
{
Log($"Job Action Started: {job.Name}");
m_parallelCounters.AddOrUpdate(job, 1, (scr, c) => c + 1);
lock (m_waitHandles)
{
AutoResetEvent waitHandle = new AutoResetEvent(false);
m_waitHandles.Add(waitHandle);
return waitHandle;
}
}
private void ReleaseAction(IScheduleJob job, AutoResetEvent waitHandle)
{
if (waitHandle != null)
{
lock (m_waitHandles)
{
m_waitHandles.Remove(waitHandle);
if (!waitHandle.SafeWaitHandle.IsClosed && !waitHandle.SafeWaitHandle.IsInvalid)
{
waitHandle.Set();
}
waitHandle.Dispose();
}
}
Log($"Job Action Finished: {job.Name}");
if (job.DegreeOfParallelism != UnlimitedJobParallelism)
m_parallelCounters.AddOrUpdate(job, 0, (scr, c) => c - 1);
}
private bool CanRun(IScheduleJob job)
{
if (job.DegreeOfParallelism == UnlimitedJobParallelism) return true;
int value = m_parallelCounters.GetOrAdd(job, 0);
bool result = value < job.DegreeOfParallelism;
return result;
}
internal void Stop()
{
if (IsRunning)
{
if (m_waitHandles.Count > 0)
{
AutoResetEvent[] waitHandles = m_waitHandles.ToArray(); ;
WaitHandle.WaitAll(waitHandles, WaitTimeout);
Array.ForEach(waitHandles, (wh) => wh.Dispose());
}
Log($"Scheduler STOPPED.");
m_counter.Stop();
m_counter = null;
}
}
private void Log(string format, params object[] parameters)
{
string message = string.Format(format, parameters);
Console.WriteLine($"[{DateTime.Now}]: {message}");
}
public void Dispose()
{
Stop();
}
}
The main changes here are:
1) The naming: Instead of Subscribe
, the client now Schedules
a job. The use of the term "Subscribe" was as confusion of the two concepts scheduling and subscription to an observable. The client of the Scheduler, should be agnostic about how the Scheduler works internally - and a scheduler schedules jobs.
2) Waithandles: When the Scheduler stops, it will now wait for running jobs to finish before it finally terminates.
3) Logging: Introduced some basic logging of events.
The "Counter"
This is essentially the same as the previous. It notifies observers for every second and act fundamentally seen as a timer.
public class HSecondCounter : IObservable<DateTime>
{
static public HSecondCounter Count()
{
HSecondCounter counter = new HSecondCounter();
Task.Factory.StartNew(() =>
{
counter.Run((now) =>
{
lock (counter.m_observers)
{
foreach (var observer in counter.m_observers.ToArray())
{
observer.OnNext(now);
}
Console.WriteLine("HSecondCounter: {0}", now);
}
});
}, TaskCreationOptions.LongRunning);
return counter;
}
static public HSecondCounter CountAsync()
{
HSecondCounter counter = new HSecondCounter();
Task.Factory.StartNew(() =>
{
counter.Run((now) =>
{
lock (counter.m_observers)
{
foreach (var observer in counter.m_observers.ToArray())
{
Task.Factory.StartNew(() =>
{
observer.OnNext(now);
});
}
Console.WriteLine("HSecondCounter: {0}", now);
}
});
}, TaskCreationOptions.LongRunning);
return counter;
}
HashSet<IObserver<DateTime>> m_observers = new HashSet<IObserver<DateTime>>();
volatile bool m_doContinue = true;
private HSecondCounter()
{
}
public void Stop()
{
m_doContinue = false;
}
public IDisposable Subscribe(IObserver<DateTime> observer)
{
lock (m_observers)
{
m_observers.Add(observer);
}
return Disposable.Create(() =>
{
lock (m_observers)
{
m_observers.Remove(observer);
}
observer.OnCompleted();
});
}
private void Run(Action<DateTime> notifier)
{
try
{
int lastSecond = 0;
while (m_doContinue)
{
DateTime now = DateTime.Now;
if (now.Second != lastSecond)
{
notifier(now);
}
lastSecond = now.Second;
Thread.Sleep(500);
}
}
catch (Exception ex)
{
lock (m_observers)
{
foreach (var observer in m_observers.ToArray())
{
observer.OnError(ex);
}
}
}
finally
{
lock (m_observers)
{
foreach (var observer in m_observers.ToArray())
{
observer.OnCompleted();
}
}
Console.WriteLine($"HSceondCounter ended at: {DateTime.Now}");
}
}
}
The original Run(...)
method had this definition:
async private void Run(Action<DateTime> notifier)
which is a misunderstanding of the async
concept in that an async
method (unless it is a UI-event handler) should return Task
. In the new implementation async
is removed. This has the positive effect that when Run(..)
is called by Count()
and CountAsync()
the parameter TaskCreationOptions.LongRunning
to Task.Factory.StartNew()
has more meaning. In the first version that call returned immediately after starting Run(...)
which in turn in fact was the long running thread.
It is in fact possible to await a call to HSceondCounter.CountAsync()
like this:
async Task Start()
{
DateTime time = await HSecondCounter.CountAsync();
}
But this will halt the calling thread while HSecondCounter ticking every second infinitely with no way to stop it. So this is not useful.
Other Objects
public interface IScheduleJob
{
string Name { get; }
Action<DateTime> Action { get; }
bool ShouldRun(DateTime time);
int DegreeOfParallelism { get; }
}
public sealed class ScheduledJobHandle : IDisposable
{
IDisposable m_terminateHandle;
public ScheduledJobHandle(IScheduleJob job, IDisposable terminateHandle)
{
Job = job;
m_terminateHandle = terminateHandle;
}
public IScheduleJob Job { get; }
public bool IsDisposed => m_terminateHandle == null;
public void Dispose()
{
if (m_terminateHandle != null)
{
m_terminateHandle.Dispose();
m_terminateHandle = null;
}
}
}
public class ScheduleJob : IScheduleJob
{
public string Name { get; set; }
public Action<DateTime> Action { get; set; }
public Func<DateTime, bool> ShouldRunPredicate { get; set; }
public int DegreeOfParallelism { get; set; }
public bool ShouldRun(DateTime time)
{
return ShouldRunPredicate(time);
}
public override string ToString()
{
return Name;
}
}
Test Case
void TestSchedule()
{
HScheduler scheduler = new HScheduler();
scheduler.Start();
ScheduleJob job1 = new ScheduleJob
{
Name = "AAAA",
DegreeOfParallelism = HScheduler.UnlimitedJobParallelism,
Action = (value) =>
{
Console.WriteLine($" AAAA Running:{value} - {value.Millisecond} - Thread Id: {Thread.CurrentThread.ManagedThreadId}");
Thread.Sleep(5000);
},
ShouldRunPredicate = (time) =>
{
return true; // time.Second % 2 == 0;
}
};
ScheduleJob job2 = new ScheduleJob
{
Name = "BBBB",
DegreeOfParallelism = 1,
Action = (value) =>
{
Console.WriteLine($" BBBB Running:{value} - {value.Millisecond} - Thread Id: {Thread.CurrentThread.ManagedThreadId}");
Thread.Sleep(4000);
},
ShouldRunPredicate = (time) =>
{
return time.Second % 5 == 0;
}
};
ScheduledJobHandle scheduledJob1 = scheduler.Schedule(job1);
ScheduledJobHandle scheduledJob2 = scheduler.Schedule(job2);
ScheduledJobHandle scheduledJob11 = scheduler.Schedule(job1);
Console.ReadLine();
if (scheduledJob1 != null)
scheduledJob1.Dispose();
if (scheduledJob2 != null)
scheduledJob2.Dispose();
if (scheduledJob11 != null)
scheduledJob11.Dispose();
scheduler.Stop();
Console.WriteLine("After Scheduler Stop");
Console.ReadLine();
}
The previous version worked in general as expected. This one IMO a little more. I'm sure you can find a lot of possible drawbacks, so you're very welcome to do so.
The origin to this work is still t3chb0t's inspiring posts.
1 Answer 1
1) int
as time interval is ambiguous. TimeSpan
is not. So I would use the latter.
public int WaitTimeout { get; set; }
2) IsDisposed
is a misleading name. I would not expect new HScheduler(...).IsDisposed
to return true
.
3) This does not look threadsafe:
WaitHandle.WaitAll(waitHandles, WaitTimeout); Array.ForEach(waitHandles, (wh) => wh.Dispose());
Say first line returns due to timeout. Which I assume means that some "jobs" are still running. How can you guarantee that disposed handle will not be accessed on background thread? waitHandle.SafeWaitHandle.IsClosed
check looks like a racing condition. Imagine the following scenario:
- Thread#1: checks that
waitHandle.SafeWaitHandle.IsClosed == false
- Thread#2: executes
(wh) => wh.Dispose()
in dispose method - Thread#1: executes
waitHandle.Set();
- Thread#1: throws
ObjectDisposedException
4) The way you handle exceptions looks weird:
catch (Exception ex) { Log($"Job Action Error (1): {job.Name}: Error Message: {ex.Message}"); } finally { ReleaseAction(job, waitHandle); } }, (ex) => { Log($"Job Action Error (2): {job.Name}: Error Message: {ex.Message}"); },
I think you should either use catch
or OnError
, but not both. Also aren't you already catching all errors in your HSecondCounter
implementation? Another advice: do not discard the callstack, when you log. Otherwise, how are you going to find the source of exception?
5) Don't use Task.Factory.StartNew without specifying TaskScheduler.
6) I don't quite like HSecondCounter
API. I would expect it to behave the same way most other observables do: it should automatically start inner timer when first subscriber arrives and it should automatically stop timer when last subscriber disconnects. Having to manually call Stop
looks inconvenient (although I imagine some Pause/Unpause
functionality might actually be useful).
7) Whether or not HSecondCounter
uses multiple threads should be an implementation detail of Run
method. This logic should not reside in static factory methods, IMHO. Or you might even want to abstract it out into separate component.
8) I would name static methods CreateCounter()
or something. Otherwise it is not very obvious, what they actually do.
9) This does not look very precise as far as timers are concerned:
Thread.Sleep(500);
First, you can use WaitHandle.WaitOne(500)
instead, so you can actually cancel the wait from Stop
maethod. Second, I think you can easily estimate how many milliseconds you have to wait on every iteration, instead of waiting for half a second every time.
var now = DateTime.Now;
//something along those lines? It is not quite clear, what your logic is.
var nextTick = new DateTime(now.Years, now.Months, now.Days, now.Hours, now.Minutes, now.Seconds + 1);
TimeSpan delay = nextTick - now;
waitHandle.WaitOne(delay.TotalMilliseconds);
10) Maybe this is intentional (I have not been following your previous questions), but this does not look very safe:
foreach (var observer in counter.m_observers.ToArray()) { Task.Factory.StartNew(() => { observer.OnNext(now); }); }
You do not wait for inner tasks to complete, so it looks like you gonna be in trouble if they do not complete in 1 second, before the next tick of your "timer". I suggest you use Parallel.ForEach
instead, it will make sure that all tasks are finished before the next iteration.
-
\$\begingroup\$ Thanks for feedback. I'll come back with comments soon... \$\endgroup\$user73941– user739412018年04月18日 17:33:46 +00:00Commented Apr 18, 2018 at 17:33
-
\$\begingroup\$ 1) Where do you mean?; 2) OK, 3) Array.ForEach needs a lock, IsClosed why racing condition(?) (Dispose() == Close()) 4) The try...catch is for exceptions in HScheduler and for unhandled exceptions in job.Action(...). The (ex) => ... is for errors comming from HSecondCounter (The error message is a little misleading) \$\endgroup\$user73941– user739412018年04月19日 05:49:32 +00:00Commented Apr 19, 2018 at 5:49
-
\$\begingroup\$ 5) Stephen is always worth reading. I use
StartNew()
instead of justRun()
, for the parameterTaskCreationOptions.LongRunning
. After rereading the documentation for that setting, I'm not sure, it's the right thing to do. As I understand it, StartNew(() => {}, TaskCreationOptions.LongRunning) just starts on the default scheduler, and that seems OK here.Factory.StartNew(action) == Task.Run(action)
. One could claim that such a potentially long running task (a HScheduler could be running for years) should be running on a normal thread instead? \$\endgroup\$user73941– user739412018年04月19日 05:49:46 +00:00Commented Apr 19, 2018 at 5:49 -
\$\begingroup\$ 6) OK, The counter shouldn't dispatch ticks if no one really cares. Start/Stop can be handled internally. 7) We just disagree on that :-) 8) I'm not sure if HSecondCounter is a very good name either 9) I'm not sure if you have read my explanation/discussion in the first version of this post. If you have then please update your answer with some code that shows a realiable solution. \$\endgroup\$user73941– user739412018年04月19日 05:50:03 +00:00Commented Apr 19, 2018 at 5:50
-
\$\begingroup\$ @HenrikHansen,
As I understand it, StartNew(() => {}, TaskCreationOptions.LongRunning) just starts on the default scheduler
this is exactly the misconception that the article talks about. No it does not use Default scheduler. AllTaskFactory
methods use Current scheduler by default unless specified otherwise. Whether or not it matters when you run a LongRunning task (which, to my knowledge, should always run on newly created thread) is a different question. I just made sure you are aware of this pitfall. \$\endgroup\$Nikita B– Nikita B2018年04月19日 08:13:04 +00:00Commented Apr 19, 2018 at 8:13
AutoResetEvent
... I don't get this one at all so let's wait for someone else who can review it this time ;-) \$\endgroup\$