I created priority queue which stores messages sent from asp net core controller and then processes them in BackgroundService
The thread which loops through items is blocked when collection is empty.
It might look like reinventing the wheel as there is already BlockingCollectionOfT but I am clueless how to extend that thing to my scenario.
Are there any code smells, concurrency hazards?
Perhaps there might be even better way to do it.
I would like to hear your opinion
Thanks
Implementation
internal class BPQueue : IBPQueue, IDisposable
{
private readonly BaseMessage[] _items;
private readonly object _syncRoot = new();
private readonly ManualResetEventSlim _manualResetEventSlim;
private int _head;
public BPQueue ()
{
_items = new BaseMessage[100];
_head = -1;
_manualResetEventSlim = new ();
}
public bool TryEnqueue(BaseMessage message)
{
lock (_syncRoot)
{
if(_head + 1 == _items.Length)
{
return false;
}
_head++;
_items[_head] = message;
}
if (!_manualResetEventSlim.IsSet)
{
_manualResetEventSlim.Set();
}
return true;
}
public IEnumerable<BaseMessage> GetEnumerable(CancellationToken token = default)
{
while(true)
{
token.ThrowIfCancellationRequested();
if(_head == -1)
{
_manualResetEventSlim.Reset();
_manualResetEventSlim.Wait(token);
}
BaseMessage item;
lock (_syncRoot)
{
item = _items[_head];
_items[_head] = null;
_head--;
}
yield return item;
}
}
public void Dispose()
{
_manualResetEventSlim.Dispose();
}
}
Message base class
public abstract class BaseMessage : IComparable<BaseMessage>
{
public Priority Priority { get; }
public BaseMessage(Priority priority)
{
Priority = priority;
}
public int CompareTo(BaseMessage? other)
{
if (other == null)
{
return -1;
}
if (Priority < other.Priority)
{
return 1;
}
if (Priority > other.Priority)
{
return -1;
}
return 0;
}
How is it used
Example controller
public class QueueController : ControllerBase
{
private readonly IBPQueue _queue;
public QueueController(IBPQueue queue)
{
_queue = queue;
}
[HttpPost]
public IActionResult AddMessage([FromBody] SampleMessage sampleMessage)
{
var enqueued = _queue.TryEnqueue(sampleMessage);
if (!enqueued)
{
return BadRequest();
}
return Ok();
}
}
Background service
internal class MessageProcessorHostedService : BackgroundService
{
private readonly ILogger _logger;
private readonly IServiceScopeFactory _serviceScopeFactory;
private readonly IBPQueue _queue;
public TaskQueueHostedService(
ILogger<MessageProcessorHostedService> logger,
IServiceScopeFactory serviceScopeFactory,
IBPQueue queue)
{
_logger = logger;
_serviceScopeFactory = serviceScopeFactory;
_queue = queue;
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
await Task.Run(async () =>
{
foreach (var message in _blockingPriorityQueue.GetEnumerable(stoppingToken))
{
try
{
stoppingToken.ThrowIfCancellationRequested();
using var serviceScope = _serviceScopeFactory.CreateScope();
var serviceProvider = serviceScope.ServiceProvider;
var messageHandler = serviceProvider.GetMessageHandler(message);
await messageHandler.HandleAsync(message);
stoppingToken.ThrowIfCancellationRequested();
}
catch (OperationCanceledException)
{
_logger.LogInformation("Queue has been stopped");
break;
}
catch (Exception ex)
{
_logger.LogError("An error occurred while processing queue", ex);
}
}
}, stoppingToken);
}
}
Priority
public enum Priority : byte
{
High = 0,
Medium = 1,
Low = 2
}
1 Answer 1
First let me share with you my observations related to your current implementation.
Then let me show you how to rewrite it to take advantage of IAsyncEnumerable
.
BPQueue
Init
- readonly field initialization: You don't need the constructor to initialize them, you can do that in-line
private readonly BaseMessage[] _items = new BaseMessage[100];
private readonly object _syncRoot = new();
private readonly ManualResetEventSlim _manualResetEventSlim = new();
private int _head = -1;
TryEnqueue
- is there any free space: I think you can better express your intent if write the condition like this
if (_head == _items.Length - 1)
- Reasoning:
_head
is an array index and you want to avoid over indexing
GetEnumerable
- infinite loop: At the first glance it is not obvious whether it is an infinite loop or will it terminate in case of cancellation
- If you write your main loop like this then yet again your intent becomes more clear IMHO
while (!token.IsCancellationRequested)
{
...
}
token.ThrowIfCancellationRequested();
- Reasoning: Until it is not cancelled do that ... otherwise bubble up the cancellation
BaseMessage
CompareTo
- If you use switch expression with case guards the whole method become more concise
public int CompareTo(BaseMessage? other) => other switch
{
null => -1,
var msg when msg.Priority > Priority => 1,
var msg when msg.Priority < Priority => -1,
_ => 0
};
Your GetEnumerator
is blocked whenever there is no message to fetch. Since the introduction of Async Enumerables you can rewrite it to make it non-blocking.
Unfortunately there is no built-in async version of the ManualResetEvent
- But Microsoft.VisualStudio.Threading does define an
AsyncManualResetEvent
- As well as the
AutoResetEvent
can be replaced withSemaphoreSlim
.
Here is my version where I have used SemaphoreSlim
:
internal class BPQueue<T>: IDisposable, IAsyncEnumerable<T> where T: class
{
private readonly T[] _items = new T[100];
private readonly object _syncRoot = new();
private readonly SemaphoreSlim isThereAnyData = new(0,1);
private int _head = -1;
public bool TryEnqueue(T message)
{
lock (_syncRoot)
{
if (_head == _items.Length - 1)
return false;
_head++;
_items[_head] = message;
if(_head == 0)
isThereAnyData.Release();
}
return true;
}
public async IAsyncEnumerator<T> GetAsyncEnumerator(CancellationToken token = default)
{
while (!token.IsCancellationRequested)
{
if (_head == -1)
await isThereAnyData.WaitAsync(token);
T item;
lock (_syncRoot)
{
item = _items[_head];
_items[_head] = null;
_head--;
}
yield return item;
}
token.ThrowIfCancellationRequested();
}
public void Dispose() => isThereAnyData.Dispose();
}
- I've initialized the
SemaphoreSlim
with 0 as initial value and 1 as the max - Inside the
TryEnqueue
I call theRelease
only if the_head
was-1
prior this method call - Inside the
GetAsyncEnumerator
I call theWaitAsync
if the_head
is-1
to wait until there is some data to retrieve - The class itself implements
IAsyncEnumerable
so theGetAsyncEnumerator
method returns anIAsyncEnumerator
- Please note there is no need here to define the return type as
Task<IAsyncEnumerator>
to be able to useasync
-await
inside the method
- Please note there is no need here to define the return type as
- I've made your class generic because it was tightly coupled to the
BaseMessage
Here is the code which I've used for testing:
static async Task Main()
{
var _queue = new BPQueue<SampleMessage>();
CancellationTokenSource cts = new();
_ = Task.Run(async () => {
await Task.Delay(2000);
_queue.TryEnqueue(new(Priority.Medium));
await Task.Delay(2000);
_queue.TryEnqueue(new(Priority.Medium));
_queue.TryEnqueue(new(Priority.High));
await Task.Delay(500);
_queue.TryEnqueue(new(Priority.High));
});
_ = Task.Run(async () => {
await Task.Delay(1750);
_queue.TryEnqueue(new(Priority.High));
await Task.Delay(2300);
_queue.TryEnqueue(new(Priority.Low));
_queue.TryEnqueue(new(Priority.High));
await Task.Delay(300);
cts.Cancel();
});
try
{
await foreach (var item in _queue.WithCancellation(cts.Token))
{
Console.WriteLine(item.Priority);
}
}
catch (OperationCanceledException)
{
Console.WriteLine("Operation has been cancelled");
}
}
There is a known limitation. The async foreach should start prior the first item is pushed into the queue, since the SemaphoreSlim
has been initialized with 0.
-
1\$\begingroup\$ Superb! I didn't know about
case guards
. I wonder if that thing compiles down to simple if statements or does something clever. Regarding the limitation, apparently there is SemaphoreSlim.CurrentCount which might be used inTryEnqueue
method. \$\endgroup\$Józef Podlecki– Józef Podlecki2021年12月20日 18:04:56 +00:00Commented Dec 20, 2021 at 18:04 -
\$\begingroup\$ @JózefPodlecki If you copy the related code to sharplab.io then you can see it is compiled to if statements. \$\endgroup\$Peter Csala– Peter Csala2021年12月20日 18:45:27 +00:00Commented Dec 20, 2021 at 18:45
-
\$\begingroup\$ @JózefPodlecki It is also advisable to try to perform Release and Wait calls unconditionally. Since the check and the action are not performed as an atomic operation that's why in case of multithreading it could cause strange behaviours. In this simple case it is not a problem, but you should be aware of it. \$\endgroup\$Peter Csala– Peter Csala2021年12月20日 18:50:15 +00:00Commented Dec 20, 2021 at 18:50
Priority
? \$\endgroup\$IAsyncEnumerable
but I couldn't find good mechanism inSystem.Threading
which would block and return task. \$\endgroup\$