This code consistently executes commands with a given priority. How can I to improve it?
public class PriorizatableCommandQueue<T> : IDisposable
{
private readonly object _lock;
private readonly Action<T> _action;
private ConcurrentQueue<T> _forLowestPriority;
private ConcurrentQueue<T> _forBelowNormalPriority;
private ConcurrentQueue<T> _forNormalPriority;
private ConcurrentQueue<T> _forAboveNormalPriority;
private ConcurrentQueue<T> _forHighestPriority;
private ThreadPriority _currentPriority;
private CancellationTokenSource _cts;
private Task _task;
private bool _isExecuting;
private bool _isDisposed;
public PriorizatableCommandQueue(Action<T> action)
{
_forLowestPriority = new ConcurrentQueue<T>();
_forBelowNormalPriority = new ConcurrentQueue<T>();
_forNormalPriority = new ConcurrentQueue<T>();
_forAboveNormalPriority = new ConcurrentQueue<T>();
_forHighestPriority = new ConcurrentQueue<T>();
_lock = new object();
_action = action;
_currentPriority = ThreadPriority.Normal;
_cts = new CancellationTokenSource();
_task = new Task(() => { }, _cts.Token);
_task.Start();
_cts.Cancel();
_isExecuting = false;
_isDisposed = false;
}
private void ExecuteCommands(ConcurrentQueue<T> queue, CancellationToken token)
{
T command;
while (true)
{
if (token.IsCancellationRequested)
{
lock (_lock)
_isExecuting = false;
throw new OperationCanceledException(token);
}
if (queue.TryDequeue(out command))
_action(command);
else
break;
}
lock (_lock)
{
queue = null;
if (_forHighestPriority.Count > 0)
{
queue = _forHighestPriority;
_currentPriority = ThreadPriority.Highest;
}
else if (_forAboveNormalPriority.Count > 0)
{
queue = _forAboveNormalPriority;
_currentPriority = ThreadPriority.AboveNormal;
}
else if (_forNormalPriority.Count > 0)
{
queue = _forNormalPriority;
_currentPriority = ThreadPriority.Normal;
}
else if (_forBelowNormalPriority.Count > 0)
{
queue = _forBelowNormalPriority;
_currentPriority = ThreadPriority.BelowNormal;
}
else if (_forLowestPriority.Count > 0)
{
queue = _forLowestPriority;
_currentPriority = ThreadPriority.Lowest;
}
if (queue == null)
_isExecuting = false;
else
_task = _task.ContinueWith((task) => ExecuteCommands(queue, token), token);
}
}
public void AddCommand(T command)
{
AddCommand(command, ThreadPriority.Normal);
}
public void AddCommand(T command, ThreadPriority priority)
{
if (_isDisposed)
throw new ObjectDisposedException(GetType().FullName);
ConcurrentQueue<T> queue;
switch (priority)
{
case ThreadPriority.Lowest:
queue = _forLowestPriority;
break;
case ThreadPriority.BelowNormal:
queue = _forBelowNormalPriority;
break;
case ThreadPriority.Normal:
queue = _forNormalPriority;
break;
case ThreadPriority.AboveNormal:
queue = _forAboveNormalPriority;
break;
case ThreadPriority.Highest:
queue = _forHighestPriority;
break;
default:
queue = _forNormalPriority;
break;
}
queue.Enqueue(command);
lock (_lock)
{
if (_currentPriority < priority)
{
_cts.Cancel();
_currentPriority = priority;
_cts = new CancellationTokenSource();
_task = _task.ContinueWith((task) => ExecuteCommands(queue, _cts.Token), _cts.Token);
_isExecuting = true;
}
else if (!_isExecuting)
{
_currentPriority = priority;
_cts = new CancellationTokenSource();
_task = _task.ContinueWith((task) => ExecuteCommands(queue, _cts.Token), _cts.Token);
_isExecuting = true;
}
}
}
public void ClearQueue()
{
if (_isDisposed)
throw new ObjectDisposedException(GetType().FullName);
_cts.Cancel();
_forLowestPriority = new ConcurrentQueue<T>();
_forBelowNormalPriority = new ConcurrentQueue<T>();
_forNormalPriority = new ConcurrentQueue<T>();
_forAboveNormalPriority = new ConcurrentQueue<T>();
_forHighestPriority = new ConcurrentQueue<T>();
}
public void ClearQueueAndWait()
{
ClearQueue();
try
{
_task.Wait();
}
catch (AggregateException ex)
{
ex.Handle((OperationCanceledException) => true);
}
}
public async Task ClearQueueAndWaitAsync()
{
ClearQueue();
try
{
await _task;
}
catch (OperationCanceledException) { }
}
public void Dispose()
{
if (_isDisposed)
return;
ClearQueueAndWait();
_isDisposed = true;
}
}
1 Answer 1
First of all, have you considered implementing a Priority Queue? This will allow you to have just one queue, which automatically arranges items in it based on priority - this will allow you to just have one simple listener to get the Next Item from the queue, and rely on the priority to ensure the Next Item is always the highest priority available. There are many implementations of it, including one here on CodeReview.SE I found in the "Related" sidebar. Another advantage over your queue-per-priority is that you can have more complex prioritization logic, rather than just base it on one field.
However, if you wish to stick your your (conceptually simpler to grok) multiple queues, you can still make the code cleaner and more generic by replacing the variable-for-each-queue with a SortedList<ThreadPriority,ConcurrentQueue>
- have the queues by keyed by priority, and instead of having a big switch/case block to determine the current queue, you can iterate on the Keys, which are sorted.
-
-
\$\begingroup\$ @Nodon, all you need to do to make it thread safe is to
lock
its public methods. \$\endgroup\$Nikita B– Nikita B2015年12月01日 07:10:14 +00:00Commented Dec 1, 2015 at 7:10 -
\$\begingroup\$ @Nodon Do not roll your own priority queue. Standard libraries already have
SortedList
. Instead of putting queues in it as @Avner suggests just put the elements directly. \$\endgroup\$abuzittin gillifirca– abuzittin gillifirca2015年12月01日 07:14:13 +00:00Commented Dec 1, 2015 at 7:14
Explore related questions
See similar questions with these tags.