11
\$\begingroup\$

I'm trying to implement a TaskScheduler that runs all tasks in the order they are submitted, on a single dedicated thread. Here's my implementation, using a BlockingCollection:

class SequentialScheduler : TaskScheduler, IDisposable {
 readonly BlockingCollection<Task> m_taskQueue = new BlockingCollection<Task>();
 readonly Thread m_thread;
 bool m_disposed;
 public SequentialScheduler() {
 m_thread = new Thread(Run);
 m_thread.Start();
 }
 public void Dispose() {
 m_disposed = true;
 }
 void Run() {
 while (!m_disposed) {
 var task = m_taskQueue.Take();
 Debug.Assert(TryExecuteTask(task));
 }
 }
 protected override IEnumerable<Task> GetScheduledTasks() {
 return m_taskQueue;
 }
 protected override void QueueTask(Task task) {
 m_taskQueue.Add(task);
 }
 protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued) {
 if (Thread.CurrentThread == m_thread) {
 return TryExecuteTask(task);
 }
 return false;
 }
}

I've played with it a bit and it seems to work well. I have some lingering questions however:

  • As you can tell by the Debug.Assert, I'm not sure how TryExecuteTask could return false and I'm just assuming that given my current implementation, it won't. Can that actually happen and what I should I do if it does?
  • I'm not sure my implementation of TryExecuteTaskInline makes sense as it doesn't get called in my tests. If I understand correctly, this method should run the task synchronously if possible; hence why I'm checking if the current thread is the scheduler's dedicated thread.
  • Also, my implementation of Dispose doesn't wait for the current task to complete, and will cause the rest of the tasks in the queue to just wait there forever, but short of making it blocking and wait for the queue to empty, I don't see how to do that any differently. I just need a way to release the thread.
dfhwze
14.1k3 gold badges40 silver badges101 bronze badges
asked Mar 8, 2014 at 21:11
\$\endgroup\$

2 Answers 2

7
\$\begingroup\$

TryExecuteTaskInline is used in what is called "task inlining": basically, when you call Wait() on a Task that didn't start executing yet, it might be executed on the current thread. A simple way to test that is:

var factory = new TaskFactory(new SequentialScheduler());
factory.StartNew(
 () =>
 {
 factory.StartNew(() => { }).Wait();
 });

For more information, see Stephen Toub's article Task.Wait and "Inlining".

But this all means that a Task might be executed outside of your Run() loop, so the call to TryExecuteTask() there might return false. Because of that, you should simply ignore the return value there (just like the official example scheduler does, in its NotifyThreadPoolOfPendingWork()).

Another option would be to remove inlined Tasks from the queue, but there is no simple way to do that for BlockingCollection.


I think that m_disposed should be volatile, otherwise, the Run() loop can be optimized into an infinite loop that checks the value of m_disposed only once, at the start.


For disposal, you might want to use the completion capability of BlockingQueue. That way, trying to schedule a new Task after the scheduler has been disposed will throw, which I think is the correct behavior.

If you do this, you can also rewrite Run() to use GetConsumingEnumerable(), and remove m_disposed altogether.

answered Mar 18, 2014 at 20:04
\$\endgroup\$
2
  • \$\begingroup\$ Thanks a lot. Does this mean that my implementation of TryExecuteInline is correct? \$\endgroup\$ Commented Mar 22, 2014 at 2:20
  • \$\begingroup\$ I have difficulty seeing under what circumstances a task might be executed outside the Run() loop yet on the same thread, so perhaps my check there is unnecessary. \$\endgroup\$ Commented Mar 22, 2014 at 2:28
6
\$\begingroup\$

Dispose the scheduler gracefully

As you mentioned, you lack proper disposal functionality.

void Run() {
 while (!m_disposed) {
 var task = m_taskQueue.Take();
 Debug.Assert(TryExecuteTask(task));
 }
}

The first issue is that bool m_disposed is not marked as volatile . This means that this variable is subject to optimisations that could render its usage invalid for your scenario. If optimized, the value will be cached once, and never re-read in the same ongoing method call, causing an infinite loop.

The second issue is that you can't abort waiting a new task using Take(). But there is a simple alternative Take(CancellationToken ct); Keep track of a CancellationTokenSource in your task scheduler, pass its Token to Take(token); on disposal of the instance, flag it by calling Cancel().

I would also verify how Debug.Assert(TryExecuteTask(task)) gets optimized in production code. And whether this could introduce unwanted side effects.

Revised

class SequentialScheduler : TaskScheduler, IDisposable {
 readonly BlockingCollection<Task> m_taskQueue = new BlockingCollection<Task>();
 readonly Thread m_thread;
 readonly CancellationTokenSource m_cancellation; // CR comment: field added
 volatile bool m_disposed; // CR comment: volatile added
 public SequentialScheduler() {
 m_cancellation = new CancellationTokenSource();
 m_thread = new Thread(Run);
 m_thread.Start();
 }
 public void Dispose() {
 m_disposed = true;
 m_cancellation.Cancel(); // CR comment: cancellation added
 }
 void Run() {
 while (!m_disposed) {
 // CR comment: dispose gracefully
 try
 {
 var task = m_taskQueue.Take(m_cancellation.Token);
 // Debug.Assert(TryExecuteTask(task));
 TryExecuteTask(task); // CR comment: not sure about the Debug.Assert here
 }
 catch (OperationCanceledException)
 { 
 Debug.Assert(m_disposed);
 }
 }
 }
 protected override IEnumerable<Task> GetScheduledTasks() {
 return m_taskQueue;
 }
 protected override void QueueTask(Task task) {
 m_taskQueue.Add(task);
 }
 protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued) {
 if (Thread.CurrentThread == m_thread) {
 return TryExecuteTask(task);
 }
 return false;
 }
}
answered Jul 25, 2019 at 17:30
\$\endgroup\$

Your Answer

Draft saved
Draft discarded

Sign up or log in

Sign up using Google
Sign up using Email and Password

Post as a guest

Required, but never shown

Post as a guest

Required, but never shown

By clicking "Post Your Answer", you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.