My aim is to avoid using threadpool threads for CPU bound work, thus avoiding a situation where IIS stops responding to new requests.
Can you see any problems with the code below? Is this a safe/clean approach? Can you offer any improvements?
    private static ConcurrentQueue<Job> Jobs = new ConcurrentQueue<Job>();
    static int threadCount = 0;

    private void QueueJob(Job job)
    {
        bool startQueue = false;
        lock (Jobs)
        {
            if (threadCount == 0)
            {
                Interlocked.Increment(ref threadCount);
                startQueue = true;
            }
        }
        Jobs.Enqueue(job);
        if (startQueue)
        {
            var t = new Thread(new ThreadStart(ConsumeQueue));
            t.Start();
        }
    }

    private void ConsumeQueue()
    {
        while (true)
        {
            lock (Jobs)
            {
                if (!Jobs.Any())
                {
                    Interlocked.Decrement(ref threadCount);
                    return;
                }
            }
            Job j;
            var jobToDo = Jobs.TryDequeue(out j);
            if (jobToDo)
            {
                DoCPUBoundWork(j);
            }
        }
    }
6 Answers
I think a much easier solution is to just increase the number of threads available in the thread pool by calling ThreadPool.SetMaxThreads().
Also, if you have this many threads, then it means that:
- either you're not going to get any performance improvements from parallelization, because all your cores are busy
- or most of your threads are blocked, which means a better approach would be to actually solve the problem by using asynchronous waiting (possibly with the help of C# 5 async-await), instead of working around the problem.
- The idea of this is to move CPU-bound work out of the thread pool. The only reason I have potentially lots of threads is from concurrent requests. I am not trying to parallelise the work, rather to serialise it onto a single thread; at least that is the intention of the code above. (Tom, Dec 12, 2012 at 2:48)
- I am not working around an existing problem, I am trying to write this code in such a way as to avoid tying up thread pool threads, which I know to be a common anti-pattern. (Tom, Dec 12, 2012 at 2:50)
- How are you going to start and wait for that work? If you're going to do it synchronously, you're not going to save any ThreadPool threads. (svick, Dec 12, 2012 at 20:03)
From my experience, ASP.NET uses thread pool threads only for requests. Creating a new thread inside a request thread will usually* not cause your application pool to run out of resources.
*I say usually because there are cases where a new thread created from inside a request causes the app pool to use another thread pool thread, e.g. System.Threading.ThreadPool.QueueUserWorkItem.
What could still go wrong in your code is the following:
- you call QueueJob from a request thread (R1)
- a new thread running ConsumeQueue is started (T1)
- R1 waits until all its subthreads are finished (waits for T1 to finish)
- if T1 takes a lot of time, then R1 is blocked for a while.
So you have one request hanging for the entire runtime of T1, maybe throwing a request timeout at some point.
- Thought that 3 wouldn't happen... (Tom, Dec 11, 2012 at 19:36)
I do think there's a race condition which might mean that some jobs never get executed. Given:
    private void QueueJob(Job job)
    {
        bool startQueue = false;
        lock (Jobs)
        {
            if (threadCount == 0)
            {
                Interlocked.Increment(ref threadCount);
                startQueue = true;
            }
        }
        Jobs.Enqueue(job);
        if (startQueue)
        {
            var t = new Thread(new ThreadStart(ConsumeQueue));
            t.Start();
        }
    }
Suppose the following:
- R1, R2: requests to enqueue a job
- T1, T2: current job executor
- Jobs has no jobs enqueued.
Then:
- R1 executes Foo.QueueJob(job1): lock(Jobs){} until the end of the lock block and determines there's a thread executing, but is preempted just after. Because there's a thread running, startQueue is never set to true.
- T1: finishes its job, finds there are no jobs remaining, decrements the thread count and returns, killing T1.
- (other threads execute, we get back to R1)
- R1: enqueues a job
- R1: sees startQueue == false, so it does not start a new thread to perform the job. job1 is stuck in the queue, R1 finishes, and job1 doesn't get done.
- ... until... some time passes. If the process dies or gets recycled at this time, job1 never gets executed.
- Some R2 executes Foo.QueueJob(job2): lock(Jobs){}. It sees that there are no threads, so it sets the flag to start a thread.
- R2 enqueues job2 and starts a new thread (T2) again. There should only be one thread created: if there were an R3 that executed just after R2 executed the lock block, it would just enqueue another job3 and go on.
- T2: starts, finally executes job1 (and any other jobs, including job2)
I think the simplest solution from what you have is to lock Jobs until both the new job is enqueued and the new thread is started, but that's just wrapping more code. Supposing it were written this way, however, I think we avoid the race condition, and it's much simpler:
    private void QueueJob(Job job)
    {
        Jobs.Enqueue(job);
        lock (Jobs)
        {
            if (threadCount == 0)
            {
                Interlocked.Increment(ref threadCount);
                var t = new Thread(new ThreadStart(ConsumeQueue));
                t.Start();
            }
        }
    }
But of course, strictly speaking, we don't need to start the thread while we have a lock on Jobs, we just need to know that the current request needs to start it. However, I don't like flags polluting the local scope. Maybe this is better?
    private bool shouldStartNewQueueConsumer()
    {
        bool startConsumer = false;
        lock (Jobs)
        {
            if (threadCount == 0)
            {
                Interlocked.Increment(ref threadCount);
                startConsumer = true;
            }
        }
        return startConsumer;
    }

    private void QueueJob(Job job)
    {
        Jobs.Enqueue(job);
        if (shouldStartNewQueueConsumer())
        {
            var t = new Thread(new ThreadStart(ConsumeQueue));
            t.Start();
        }
    }
Other issues:
Another issue would be if DoCPUBoundWork throws an exception: the thread would die, but no new threads would get started, and a heap of jobs would build up.
I have doubts about threadCount == 0, etc., but as all operations relating to threadCount occur only within the context of a locked Jobs object, I guess we're OK for now. :-/
But once you want more than one thread to process your job queue, you'll probably want to maintain the set of threads in its own synchronized collection, and your threadCount variable seems less useful in that case anyway.
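One way to handle the exception issue is to catch failures per job, so that a single bad job does not take the consumer down. The following is only a sketch under assumptions: the Job type with a Work delegate, the SafeConsumer class, the Failures counter and the placeholder DoCPUBoundWork are all hypothetical stand-ins, and the loop only drains the queue (it leaves out the threadCount bookkeeping).

```csharp
using System;
using System.Collections.Concurrent;

// Hypothetical job type, used only for this illustration.
public class Job
{
    public Action Work;
}

public static class SafeConsumer
{
    public static readonly ConcurrentQueue<Job> Jobs = new ConcurrentQueue<Job>();

    // Counts failed jobs; a real application would log or retry instead.
    public static int Failures;

    // Placeholder for the real CPU-bound work.
    private static void DoCPUBoundWork(Job j)
    {
        j.Work();
    }

    // Drains the queue. A job that throws is recorded and skipped,
    // so the remaining jobs are still processed.
    public static void DrainQueue()
    {
        Job j;
        while (Jobs.TryDequeue(out j))
        {
            try
            {
                DoCPUBoundWork(j);
            }
            catch (Exception ex)
            {
                Failures++;
                Console.Error.WriteLine("Job failed: " + ex.Message);
            }
        }
    }
}
```

Catching Exception this broadly is a deliberate simplification; it may be preferable to catch only the failures a job is expected to produce.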
- Thank you, yes I agree this looks safe. How do you think this compares with almaz's suggestion, given that we are talking about trying to sensibly manage a particularly heavy request? What I have currently is an AsyncController which calls Task.Factory.StartNew(() => DoCPUBoundWork(job), TaskCreationOptions.LongRunning);. Does this seem like a better alternative? (Tom, Dec 11, 2012 at 23:36)
- It looks like something worth investigating, although I don't know enough about the default factory and default scheduler to know if it fits your situation. There's an interesting example of creating a factory with an instance of a derived TaskScheduler class on msdn.microsoft.com/en-us/library/… which would limit the number of concurrent tasks, and I suspect that might work better for you, but I suppose there's no point in trying that until you know what the default TaskFactory does. (JayC, Dec 12, 2012 at 3:13)
Hmm, any reason you're not using the TPL with the TaskPoolScheduler? I would use a Task instance from the TaskPoolScheduler to perform the work on your job.
In light of my comment: creating a Task with TaskCreationOptions.LongRunning effectively achieves the same thing. As in, it bypasses the thread pool.
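A quick way to check the LongRunning claim on a given runtime is to ask the task which kind of thread it is running on. This is a small sketch, not part of the original answer: the LongRunningDemo class and its method name are made up for illustration. LongRunning is only a hint to the scheduler, so the result reflects what the default scheduler does with it rather than a guarantee.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class LongRunningDemo
{
    // Returns true if the LongRunning task was executed on a thread pool thread.
    // With the default scheduler the hint normally leads to a dedicated thread.
    public static bool RunsOnPoolThread()
    {
        Task<bool> t = Task.Factory.StartNew(
            () => Thread.CurrentThread.IsThreadPoolThread,
            TaskCreationOptions.LongRunning);
        return t.Result;
    }
}
```

Calling LongRunningDemo.RunsOnPoolThread() from a small console program is enough to see how the hint is treated in your environment.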
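To expand on almaz's answer:
    // Dedicated lock object: lock (consumer) would throw while consumer is still null.
    private readonly object consumerLock = new object();
    private Task consumer;
    private ConcurrentQueue<Job> jobs = new ConcurrentQueue<Job>();

    private void QueueJob(Job job)
    {
        // Enqueue first so that a consumer started by EnsureConsumer always sees this job.
        jobs.Enqueue(job);
        EnsureConsumer();
    }

    private void EnsureConsumer()
    {
        // IsCompleted is also true for faulted and cancelled tasks.
        if (consumer == null || consumer.IsCompleted)
        {
            lock (consumerLock)
            {
                if (consumer == null || consumer.IsCompleted)
                {
                    consumer = Task.Factory.StartNew(() => DoCPUBoundWork(), TaskCreationOptions.LongRunning);
                }
            }
        }
    }

    private void DoCPUBoundWork()
    {
        Job j = null;
        // A small window remains: the loop may have seen an empty queue
        // just before the task is marked as completed.
        while (jobs.TryDequeue(out j))
        {
            // do your work
        }
    }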
This effectively ensures that there is one thread consuming your queue. When the task runs, it simply tries to work its way through the queue and stops when there is no work, only to start up again when needed.
If you don't need that, you can also create your task in the constructor of your class, and never let your task stop by simply having it spin. You can then stop your Task using a CancellationToken.
Also note that I did not make the collection or Task instance static. This is because you will want to utilize the Dispose method to stop your long-running task, if any. If you really need your job queue to be a single instance, either use a DI container to force a single instance in your appdomain, or use a singleton.
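The always-running variant could look roughly like the sketch below. It rests on several assumptions: the class name JobQueue is hypothetical, jobs are modelled as Action delegates rather than the Job type, and the consumer blocks on a BlockingCollection instead of spinning, which is a swapped-in technique rather than what the answer describes literally.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical wrapper: one consumer task per instance, stopped via Dispose.
public sealed class JobQueue : IDisposable
{
    private readonly BlockingCollection<Action> _jobs = new BlockingCollection<Action>();
    private readonly CancellationTokenSource _cts = new CancellationTokenSource();
    private readonly Task _consumer;

    public JobQueue()
    {
        // Started once in the constructor and kept alive until Dispose.
        _consumer = Task.Factory.StartNew(Consume, TaskCreationOptions.LongRunning);
    }

    public void QueueJob(Action job)
    {
        _jobs.Add(job);
    }

    private void Consume()
    {
        try
        {
            // Blocks while the queue is empty; throws once the token is cancelled.
            foreach (Action job in _jobs.GetConsumingEnumerable(_cts.Token))
            {
                job();
            }
        }
        catch (OperationCanceledException)
        {
            // Normal shutdown path.
        }
    }

    public void Dispose()
    {
        _cts.Cancel();
        // Rethrows as AggregateException if a job threw and faulted the task.
        _consumer.Wait();
        _cts.Dispose();
        _jobs.Dispose();
    }
}
```

Jobs still queued when Dispose is called are dropped in this sketch; whether to drain them first is a design decision the original answer does not address.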
- Thank you, how does this compare with almaz's suggestion? (which is what I am going with at the moment) (Tom, Dec 11, 2012 at 23:32)
- The same, actually. Although you can expand on his answer by saving the Task in a private variable and checking at each invocation of the QueueJob method whether the task is still running. I'll edit my answer with an example. (Danthar, Dec 13, 2012 at 7:51)
I would do something like this:
    private void QueueJob(Job job)
    {
        Task.Factory.StartNew(() => DoCPUBoundWork(job), TaskCreationOptions.LongRunning);
    }
Update:
I did miss the point of having just a single thread that does the heavy work... You're right about using BlockingCollection, this is the best structure to be used here:
    public class Program
    {
        private readonly BlockingCollection<Job> _jobs = new BlockingCollection<Job>();

        public Program()
        {
            // You can add a CancellationToken in case you want to stop queue processing externally
            Task.Factory.StartNew(ConsumeQueue, TaskCreationOptions.LongRunning);
        }

        private void QueueJob(Job job)
        {
            _jobs.Add(job);
        }

        private void ConsumeQueue()
        {
            while (true)
            {
                DoCPUBoundWork(_jobs.Take());
            }
        }
    }
This code is simple because it doesn't shut down the worker thread when there are no jobs. If you do need this functionality, you'll need to add some more thread management code:
    private readonly BlockingCollection<Job> _jobs = new BlockingCollection<Job>();
    private readonly object _syncLock = new object();
    private volatile Task _task;

    private void QueueJob(Job job)
    {
        _jobs.Add(job);
        if (_task != null)
            return;
        lock (_syncLock)
        {
            if (_task != null)
                return;
            _task = Task.Factory.StartNew(ConsumeQueue);
        }
    }

    private bool TryDequeueOtherwiseShutdown(out Job job)
    {
        if (_jobs.TryTake(out job, TimeSpan.FromSeconds(10)))
            return true;
        lock (_syncLock)
        {
            var task = _task;
            _task = null; // signal that we're close to shutdown, to avoid race conditions
            if (!_jobs.TryTake(out job))
                return false;
            _task = task; // cancelling shutdown, going back to work
            return true;
        }
    }

    private void ConsumeQueue()
    {
        Job job;
        while (TryDequeueOtherwiseShutdown(out job))
        {
            DoCPUBoundWork(job);
        }
    }
- This looks like the best option really. I am calling Task.Factory.StartNew(() => DoCPUBoundWork(job), TaskCreationOptions.LongRunning); from an AsyncController method. (Tom, Dec 11, 2012 at 23:41)
- Note this behaves differently than the original code. Using TaskCreationOptions.LongRunning creates a new thread, so this code will spawn one thread per job. If you have a lot of jobs, this will spawn a lot of threads. The original code would spawn just one thread. (breischl, Dec 12, 2012 at 16:17)
- Yes, I have realised that this isn't actually what I want. I am going with a BlockingCollection<Action> but would be interested to know if there is a one-liner like the above, but that queues jobs. (Tom, Dec 16, 2012 at 17:50)
- @Tom I updated the answer (almaz, Dec 16, 2012 at 22:21)
If you're reasonably sure there will be some work to do, you might as well start up the thread when the app starts and leave it waiting for work. This approach greatly simplifies all the queuing and dequeuing logic, at the slight cost that you have to call Start() just once at startup, and Stop() just once at shutdown. Calling them from your Global.Application_Start() and Global.Application_End() methods should suffice.
    private static BlockingCollection<Job> Jobs = new BlockingCollection<Job>(new ConcurrentQueue<Job>());
    private static Thread workerThread;
    private static CancellationTokenSource cancelSource = new CancellationTokenSource();

    public static void Start()
    {
        workerThread = new Thread(ConsumeQueue);
        workerThread.Start();
    }

    // Static, like Start(), so both can be called the same way from Global
    public static void Stop()
    {
        cancelSource.Cancel();
        workerThread.Join();
        workerThread = null;
    }

    private void QueueJob(Job job)
    {
        Jobs.Add(job);
    }

    // Static so that the static Start() can pass it to the Thread constructor;
    // this assumes DoCPUBoundWork is static as well
    private static void ConsumeQueue()
    {
        while (true)
        {
            try
            {
                Job j = Jobs.Take(cancelSource.Token);
                // if we're shutting down, bail out immediately
                if (cancelSource.Token.IsCancellationRequested)
                {
                    return;
                }
                DoCPUBoundWork(j);
            }
            catch (OperationCanceledException)
            {
                // Take() throws this once Stop() cancels the token
                return;
            }
            catch (Exception ex)
            {
                // Do something sensible here
            }
        }
    }
- (As it is) we can end up spinning in the loop forever after Take throws OperationCanceledException. Do something sensible = break (Tom, Dec 16, 2012 at 15:48)
- This code (as is) ends up spinning! I think that this happens after Stop is called from Application_End after an app pool recycle; Take will then throw and we go into an endless loop. 'Do something sensible' = break; (Tom, Dec 16, 2012 at 17:37)
- @Tom - sorry about that, you're right. I modified the code above to watch for OperationCanceledException and return (breischl, Dec 17, 2012 at 15:20)
- ConcurrentQueue<T> is already thread-safe, I don't think you need to do any locking to access it.
- ... ResizeJobs? I don't think I could say whether or not any of the locks are needed without knowing what ResizeJobs is.
- ResizeJobs should be Jobs!