The following is a TaskScheduler that always runs tasks in a thread it maintains.
When it is created, the name of the thread is specified. From the moment you schedule the first task until the scheduler is Disposed, a thread exists and waits for tasks to execute.
The reason for this class is that sometimes there is a need to guarantee that some tasks are always scheduled on one specific thread (not the UI thread, though). For example, some third-party DLLs may leak resources if you keep creating new threads to call their functions.
using Task = System.Threading.Tasks.Task;
using Thread = System.Threading.Thread;
using Barrier = System.Threading.Barrier;
using Monitor = System.Threading.Monitor;
using IDisposable = System.IDisposable;
using TaskEnum = System.Collections.Generic.IEnumerable<System.Threading.Tasks.Task>;
using TaskQueue = System.Collections.Generic.Queue<System.Threading.Tasks.Task>;
using Enumerable = System.Linq.Enumerable;
using ObjectDisposedException = System.ObjectDisposedException;
using _Imported_Extensions_;

namespace _Imported_Extensions_
{
    public static class Extensions
    {
        public static bool Any(this TaskEnum te)
        {
            return Enumerable.Any(te);
        }
        public static TaskEnum ToList(this TaskEnum te)
        {
            return Enumerable.ToList(te);
        }
    }
}

namespace TaskUtils
{
    public class SameThreadTaskScheduler : System.Threading.Tasks.TaskScheduler, IDisposable
    {
        #region publics
        public SameThreadTaskScheduler(string name)
        {
            scheduledTasks = new TaskQueue();
            threadName = name;
        }
        public override int MaximumConcurrencyLevel { get { return 1; } }
        public void Dispose()
        {
            lock (scheduledTasks)
            {
                quit = true;
                Monitor.PulseAll(scheduledTasks);
            }
        }
        #endregion

        #region protected overrides
        protected override TaskEnum GetScheduledTasks()
        {
            lock (scheduledTasks)
            {
                return scheduledTasks.ToList();
            }
        }
        protected override void QueueTask(Task task)
        {
            if (myThread == null)
                myThread = StartThread(threadName);
            if (!myThread.IsAlive)
                throw new ObjectDisposedException("My thread is not alive, so this object has been disposed!");
            lock (scheduledTasks)
            {
                scheduledTasks.Enqueue(task);
                Monitor.PulseAll(scheduledTasks);
            }
        }
        protected override bool TryExecuteTaskInline(Task task, bool task_was_previously_queued)
        {
            return false;
        }
        #endregion

        private readonly TaskQueue scheduledTasks;
        private Thread myThread;
        private readonly string threadName;
        private bool quit;

        private Thread StartThread(string name)
        {
            var t = new Thread(MyThread) { Name = name };
            using (var start = new Barrier(2))
            {
                t.Start(start);
                ReachBarrier(start);
            }
            return t;
        }
        private void MyThread(object o)
        {
            Task tsk;
            lock (scheduledTasks)
            {
                //When reaches the barrier, we know it holds the lock.
                //
                //So there is no Pulse call can trigger until
                //this thread starts to wait for signals.
                //
                //It is important not to call StartThread within a lock.
                //Otherwise, deadlock!
                ReachBarrier(o as Barrier);
                tsk = WaitAndDequeueTask();
            }
            for (; ; )
            {
                if (tsk == null)
                    break;
                TryExecuteTask(tsk);
                lock (scheduledTasks)
                {
                    tsk = WaitAndDequeueTask();
                }
            }
        }
        private Task WaitAndDequeueTask()
        {
            while (!scheduledTasks.Any() && !quit)
                Monitor.Wait(scheduledTasks);
            return quit ? null : scheduledTasks.Dequeue();
        }
        private static void ReachBarrier(Barrier b)
        {
            if (b != null)
                b.SignalAndWait();
        }
    }
}
I used an unusual using block and put all the extension methods in use into a single class. The reason is that I wanted to specify exactly what I import from outside the code.
It is fine to use traditional using directives instead, without changing any class code, but anyway, focus on the class!
What concerns me is its concurrency correctness. Although this seems to work, is it actually correct? Is there a better (simpler) way to achieve this? Coding style advice is also welcome, thanks.
Specific Questions
Is it safe to use Pulse rather than PulseAll in this case?
5 Answers
using Task = System.Threading.Tasks.Task;
using Thread = System.Threading.Thread;
using Barrier = System.Threading.Barrier;
using Monitor = System.Threading.Monitor;
using IDisposable = System.IDisposable;
You don't need to write all those usings one class at a time. In C#, the common approach is to add a using once for each namespace you need. This is considered a bad practice in C++ (maybe that's why you did it this way?), but that's only because in C++, namespaces are not structured properly (almost everything is directly in std) and because the naming conventions there (list, not List) make collisions more likely.
using TaskEnum = System.Collections.Generic.IEnumerable<System.Threading.Tasks.Task>;
using TaskQueue = System.Collections.Generic.Queue<System.Threading.Tasks.Task>;
This is also not necessary. Just add the necessary namespace usings and then write IEnumerable<Task> or Queue<Task>; that's not that long.
namespace _Imported_Extensions_
_Imported_Extensions_ is a weird name for a namespace. Why all the underscores? And the convention is to use PascalCase (e.g. ImportedExtensions) for namespaces too.
And what does the name even mean? Why is it important to stress that those extensions were imported? And from where?
Also, it's not common to have multiple namespaces in the same file. If the class is used only in this file, put it in the same namespace as everything else in that file.
public static bool Any(this TaskEnum te)
public static TaskEnum ToList(this TaskEnum te)
Both of the extension methods are completely unnecessary. If you just added using System.Linq;, both would work by themselves.
if (myThread == null)
myThread = StartThread(threadName);
This is not thread-safe. If two threads call this method at the same time, StartThread() will be called twice and two threads will be created.
Also, why is the thread started here and not in the constructor?
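If lazy start is kept, one way to close that race is to let Lazy<T> arbitrate which caller creates the thread. The following is only a sketch: LazyWorker, Worker and StartCount are hypothetical names that do not appear in the original class, and the background-thread setting is an assumption made so that the example cannot keep a process alive.

```csharp
using System;
using System.Threading;

// Hypothetical helper, not part of the reviewed class: starts a thread at most once,
// even when several callers ask for it at the same time.
public sealed class LazyWorker
{
    private readonly Lazy<Thread> lazyThread;
    private int startCount; // how many times the factory actually ran

    public LazyWorker(string name, ThreadStart body)
    {
        // ExecutionAndPublication guarantees that only one caller runs the factory;
        // the others block until the value is available.
        lazyThread = new Lazy<Thread>(() =>
        {
            Interlocked.Increment(ref startCount);
            var t = new Thread(body) { Name = name, IsBackground = true }; // assumption: background thread
            t.Start();
            return t;
        }, LazyThreadSafetyMode.ExecutionAndPublication);
    }

    // First access starts the thread; later accesses return the same instance.
    public Thread Worker { get { return lazyThread.Value; } }

    public int StartCount { get { return Volatile.Read(ref startCount); } }
}
```

Starting the thread in the constructor, as suggested above, would remove the need for this entirely.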
if (!myThread.IsAlive)
I don't think this is the right check here. Checking quit would be better, because that means enqueuing stops working as soon as the scheduler is disposed.
I don't like that your fields are in the middle of the class. If you put them at (or near) the top, they will be easier to find.
I think the way you're using Barrier is clumsy. If you want a notification that the worker thread is ready, use something like ManualResetEvent.
Also, you seem to be trying to protect against Barrier being null, but that can never happen here. So doing that just makes your code longer and more confusing.
An even better option would be to use a queue that already supports blocking when no items are available: BlockingCollection.
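As a rough illustration of the BlockingCollection suggestion, the scheduler could shrink to something like the sketch below. The class name BlockingQueueTaskScheduler is made up for this example, and the behaviour differs from the original in two ways that are assumptions of the sketch rather than requirements: tasks already queued when Dispose() is called are still executed, and queuing after Dispose() throws InvalidOperationException instead of ObjectDisposedException.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical name; a sketch of the BlockingCollection idea, not the original class.
public sealed class BlockingQueueTaskScheduler : TaskScheduler, IDisposable
{
    private readonly BlockingCollection<Task> tasks = new BlockingCollection<Task>();
    private readonly Thread worker;

    public BlockingQueueTaskScheduler(string name)
    {
        // Starting the thread here avoids the lazy-start race in QueueTask.
        worker = new Thread(Run) { Name = name, IsBackground = true }; // assumption: background thread
        worker.Start();
    }

    public override int MaximumConcurrencyLevel { get { return 1; } }

    // Stops accepting new tasks; the worker exits once the queue is drained.
    public void Dispose()
    {
        tasks.CompleteAdding();
    }

    protected override IEnumerable<Task> GetScheduledTasks()
    {
        return tasks.ToArray(); // snapshot of the tasks still waiting
    }

    protected override void QueueTask(Task task)
    {
        tasks.Add(task); // throws InvalidOperationException after Dispose()
    }

    protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
    {
        return false; // never run on the caller's thread
    }

    private void Run()
    {
        // Blocks while the queue is empty and ends after CompleteAdding().
        foreach (var task in tasks.GetConsumingEnumerable())
            TryExecuteTask(task);
    }
}
```

A task would be scheduled with Task.Factory.StartNew(action, CancellationToken.None, TaskCreationOptions.None, scheduler), where scheduler is an instance of this class.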
Is it safe to use Pulse rather than PulseAll in this case?
Yes, it is, since you're always going to have only one thread waiting.
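To make the reasoning concrete, here is a minimal single-consumer queue that uses Pulse throughout. SingleConsumerQueue, Close and TryDequeue are hypothetical names chosen for this sketch; it mirrors the Wait loop from the question, including the behaviour that closing discards anything still queued. The assumption that makes Pulse sufficient is stated in the comments: at most one thread ever calls TryDequeue.

```csharp
using System.Collections.Generic;
using System.Threading;

// Hypothetical single-consumer queue. Pulse is enough only because at most
// one thread ever waits in TryDequeue.
public sealed class SingleConsumerQueue<T>
{
    private readonly Queue<T> items = new Queue<T>();
    private bool closed;

    public void Enqueue(T item)
    {
        lock (items)
        {
            items.Enqueue(item);
            Monitor.Pulse(items); // wakes the single waiter, if there is one
        }
    }

    public void Close()
    {
        lock (items)
        {
            closed = true;
            Monitor.Pulse(items);
        }
    }

    // Blocks until an item is available; returns false once the queue is closed.
    public bool TryDequeue(out T item)
    {
        lock (items)
        {
            // The condition is re-checked in a loop, so a wake-up that arrives
            // before the consumer starts waiting is never lost.
            while (items.Count == 0 && !closed)
                Monitor.Wait(items);
            if (closed)
            {
                item = default(T);
                return false;
            }
            item = items.Dequeue();
            return true;
        }
    }
}
```

With more than one consumer, a single Pulse could wake a thread that cannot make progress, and PulseAll would be the safer choice.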
Also, if I wanted something like this, I would use either ConcurrentExclusiveSchedulerPair.ExclusiveScheduler, if the tasks didn't have to execute on the same thread and only had to be mutually exclusive, or some scheduler from ParallelExtensionsExtras, if a single thread was a requirement.
- No, my way of using "using" comes from Java and Haskell. In those languages, people prefer precise symbol imports. – Earth Engine, Mar 10, 2014 at 22:43
- +1 Live example. From all the good answers provided by everyone here I am now getting a better understanding of appropriate use of this topic, and IMHO other programmers will definitely benefit from the knowledge sharing here. Some of the implementations that I worked on in the past could have been better implemented using this knowledge. IMHO, the best samples for minimizing the learning curve are real applications with full source code and good patterns. – Kiquenet, Sep 4, 2014 at 6:53
- @svick Could you specify which scheduler from PEE would be suitable if a single thread is a requirement? They have too many responsibilities there. – tsul, Jun 4, 2021 at 9:13
Not exactly a review, but since you are asking for a simpler way... The simpler way is to run your tasks via a dispatcher. Just run it on a background thread:
_thread = new Thread(() =>
{
_dispatcher = Dispatcher.CurrentDispatcher;
Dispatcher.Run();
});
_thread.Start();
Then use _dispatcher.BeginInvoke and _dispatcher.Invoke to run your tasks on that thread.
It is a lot simpler than reinventing the wheel. The obvious downside is the WPF dependency.
- Sorry, I marked another answer, because yours is "not exactly a review". But still, +1ed. – Earth Engine, Mar 14, 2014 at 12:39
- Thanks. This is a nice trick and answers the question. Note I wasn't able to run other tasks on that thread, even using Dispatcher.Yield. I don't know why; that's the reason I built another answer to that question, using Tasks, which BTW solves the dependency on a UI-oriented framework. – Simon Mourier, Dec 4, 2018 at 9:45
I had a similar problem. Here is an easy way to make all your tasks run exclusively on one thread. You can also run them concurrently by using Scheduler.ConcurrentScheduler.
ConcurrentExclusiveSchedulerPair Scheduler = new ConcurrentExclusiveSchedulerPair();
Task.Factory.StartNew(() =>
{
DoSomthing();
}, CancellationToken.None, TaskCreationOptions.DenyChildAttach, Scheduler.ExclusiveScheduler);
- Unfortunately this runs tasks synchronously, but on different threads. See dotnetfiddle.net/6roLaj (.NET Core) and dotnetfiddle.net/TrSA7q (.NET FW). That's not what the OP wanted, I suppose. – tsul, Jun 2, 2021 at 13:18
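The comment's observation can be checked locally with a small probe along these lines. ExclusiveSchedulerProbe and its Run method are hypothetical names for this sketch; it records which managed thread each task ran on and the highest number of tasks seen running at once. The exclusive scheduler guarantees that the second number stays at 1, while the number of distinct thread IDs depends on the thread pool and may well be greater than 1.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical probe, not taken from the answer above.
public static class ExclusiveSchedulerProbe
{
    // Runs `count` tasks on an ExclusiveScheduler and returns the distinct managed
    // thread IDs they executed on, plus the peak number of tasks running at once.
    public static (int[] ThreadIds, int MaxConcurrent) Run(int count)
    {
        var pair = new ConcurrentExclusiveSchedulerPair();
        var ids = new List<int>(); // only touched by exclusive tasks, one at a time
        int running = 0, maxConcurrent = 0;

        var tasks = Enumerable.Range(0, count).Select(i =>
            Task.Factory.StartNew(() =>
            {
                int now = Interlocked.Increment(ref running);
                maxConcurrent = Math.Max(maxConcurrent, now);
                ids.Add(Thread.CurrentThread.ManagedThreadId);
                Thread.Sleep(5); // give overlapping tasks a chance to show up
                Interlocked.Decrement(ref running);
            }, CancellationToken.None, TaskCreationOptions.DenyChildAttach,
               pair.ExclusiveScheduler)).ToArray();

        Task.WaitAll(tasks);
        return (ids.Distinct().ToArray(), maxConcurrent);
    }
}
```

If thread affinity is the real requirement, as in the question, a result with several distinct IDs means this approach is not sufficient on its own.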
Building on user161231's code (thanks!), here is a complete answer that uses modern .NET Framework objects. Sorry, it's not strictly a code review (although for me a code review that removes code and favors the framework's primitives is a good code review), but it answers the same question.
It demonstrates not only how to run tasks on a specific thread, but also how to schedule other tasks on this unique thread and how to stop that thread:
var scheduler = new ConcurrentExclusiveSchedulerPair();

// create a stop request source
var stop = new CancellationTokenSource();

// this will run on a specific thread
var task = Task.Factory.StartNew(MyAction,
    stop.Token, // passed to MyAction as its state argument
    stop.Token, // cancellation token for the task itself
    TaskCreationOptions.DenyChildAttach,
    scheduler.ExclusiveScheduler);

// ... do something

// this is how to schedule a task on the *same* thread.
// (a moral equivalent of BeginInvoke in UI-oriented scenarios like WinForms or WPF, but w/o any dependencies on those frameworks)
Task.Factory.StartNew(() =>
{
    // ... do something that will run on the scheduler's thread
}, CancellationToken.None, TaskCreationOptions.DenyChildAttach, scheduler.ExclusiveScheduler);

// ... do something

// this is how to request the thread to stop
stop.Cancel();

// the one-thread code
static async void MyAction(object state)
{
    var stop = (CancellationToken)state;

    // do something useful. all this could be in a loop, while, etc. ....

    // sometimes, regularly, check for the stop and quit if requested
    if (stop.IsCancellationRequested)
        return; // end of thread is here

    // do something useful ....

    // sometimes, regularly, let other scheduled tasks run.
    // they will run on *this* thread
    await Task.Yield();

    // do something useful, loop, etc. ....
    // end of thread is not here!
}
- You have presented an alternative solution, but haven't reviewed the code. Please edit to show what aspects of the question code prompted you to write this version, and in what ways it's an improvement over the original. It may be worth (re-)reading How to Answer. – Toby Speight, Dec 4, 2018 at 10:36
- @TobySpeight - I have nothing to edit. The question is useful to many, and my answer is at least useful to me (and it seems at least one upvoter). There's no equivalent to this question on stackoverflow, to my knowledge, otherwise I'd have posted there. Also note two other answers for this are not code reviews at all. – Simon Mourier, Dec 4, 2018 at 11:20
- Couldn't get this to run tasks on a dedicated single thread. It seems they are run on different threads. Would you share an example? – tsul, Jun 2, 2021 at 13:21
Not a review, but there is a working solution with a custom TaskScheduler which seems a bit simpler: https://stackoverflow.com/a/30726903/2007631
And here is a working example: https://dotnetfiddle.net/4qCiMH