While upgrading our code base to take advantage of new features in .Net 4.5, I'm trying to refactor our take of the classic Producer/Consumer algorithm, but I'm concerned my refactoring is going to increase the CPU usage.
Each instance of my WorkItemConsumer
class is basically a System.Threading.Thread
that is able to process at most MaxConcurrentHandlers
items, each of which is handled by a TPL System.Threading.Task
.
In my previous implementation, I made use of explicit locking to read from the Queue<T>
. Also, the class wrapping the queue set an AutoResetEvent
to signal when a new item has been produced. Here was the code:
protected override void Run(string[] args)
{
while (true)
{
var index = WaitHandle.WaitAny(new[] {StopRequested, queue_.ItemsAvailable});
if (index == 0)
break;
if (tasks_.Count == MaxConcurrentHandlers)
WaitAndDiscardAnyCompletedTask();
var item = queue_.PopItem();
// "null" denotes a signal to stop processing subsequent work items
if (item == null)
break;
tasks_.Add(CreateWorkItemHandler(item));
}
while(tasks_.Count > 0)
WaitAndDiscardAnyCompletedTask();
}
I would like to take advantage of the BlockingCollection<T>
class, which gives me most of the features I want. It allows a consumer to know when no more items will ever be published and it also allows my to remove any explicit locking. However, I don't have a way to being signaled when new work items are available.
Here is the new code:
protected override void Run(string[] args)
{
while (!queue_.IsCompleted)
{
if (StopRequested.WaitOne(0))
break;
if (tasks_.Count == MaxConcurrentHandlers)
WaitAndDiscardAnyCompletedTask();
foreach (var item in queue_.GetConsumingEnumerable().Take(MaxConcurrentHandlers - tasks_.Count))
tasks_.Add(CreateWorkItemHandler(item));
}
while(tasks_.Count > 0)
WaitAndDiscardAnyCompletedTask();
}
How would you refactor this code?
-
1\$\begingroup\$ I think the only way to tell if it increases CPU usage is to put a profiler on it, and run it each way. \$\endgroup\$Jeff Vanzella– Jeff Vanzella2013年03月27日 19:57:13 +00:00Commented Mar 27, 2013 at 19:57
1 Answer 1
Well, after more resarch and testing, it appears that the GetConsumingEnumerable()
method of the BlockingCollection<T>
is actually blocking when there are no items to consume. And it handles the case where the collection is completed as well.
The final touch, it that, instead of using a ManualResetEvent
to signal the consumer to stop, like the original code, I'm using a CancellationToken
instead.
This allows me to simplify the code drastically.
Here is the final snippet:
protected override void Run(string[] args)
{
try
{
foreach (var item in queue_.GetConsumingEnumerable(cancellation_.Token))
{
tasks_.Add(CreateWorkItemHandler(item));
if (tasks_.Count == MaxConcurrentHandlers)
WaitAndDiscardAnyCompletedTask();
}
}
catch (OperationCanceledException /* e */)
{
}
while (tasks_.Count > 0)
WaitAndDiscardAnyCompletedTask();
}
PS: I should have done more testing before posting. But, I'm leaving the question and its answer here for reference.