I've posted a question on Stack Overflow: "How can I consequently read messages from a queue in parallel?" I would like my own answer to be reviewed.
Situation
We have one message queue. We would like to process messages in parallel and limit the number of simultaneously processed messages.
Our solution
Based on the answers given and some research of our own, we've come to a solution. We're using a SemaphoreSlim to limit the number of parallel Tasks.
Are there any pitfalls to this solution? I'm also interested in any better solutions.
static string queue = @".\Private$\concurrenttest";

private static async Task Process(CancellationToken token)
{
    MessageQueue msMq = new MessageQueue(queue);
    msMq.Formatter = new XmlMessageFormatter(new Type[] { typeof(Command1) });
    SemaphoreSlim s = new SemaphoreSlim(15, 15);
    while (true)
    {
        await s.WaitAsync();
        await PeekAsync(msMq);
        Command1 message = await ReceiveAsync(msMq);
        Task.Run(async () =>
        {
            try
            {
                await HandleMessage(message);
            }
            catch (Exception)
            {
                // Exception handling
            }
            s.Release();
        });
    }
}

private static Task HandleMessage(Command1 message)
{
    Console.WriteLine("id: " + message.id + ", name: " + message.name);
    Thread.Sleep(1000);
    return Task.FromResult(1);
}

private static Task<Message> PeekAsync(MessageQueue msMq)
{
    return Task.Factory.FromAsync<Message>(msMq.BeginPeek(), msMq.EndPeek);
}

public class Command1
{
    public int id { get; set; }
    public string name { get; set; }
}

private static async Task<Command1> ReceiveAsync(MessageQueue msMq)
{
    // BeginReceive must be completed with EndReceive, not EndPeek.
    var receiveAsync = await Task.Factory.FromAsync<Message>(msMq.BeginReceive(), msMq.EndReceive);
    return (Command1)receiveAsync.Body;
}
1 Answer
Readability
Use the var keyword any time the initialization of the variable clearly tells what the variable represents. Avoid abbreviations in variable names.
var messageQueue = new MessageQueue(queue);
var semaphore = new SemaphoreSlim(15, 15);
Configuration
Avoid hard-coded settings:
var semaphore = new SemaphoreSlim(15, 15);
Make the number of concurrent processing jobs configurable, for example through an instance property or a configuration file.
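One possible way to do this is sketched below. The ProcessorOptions class and its members are hypothetical names chosen for illustration; the value could just as well come from a configuration file and be passed into the constructor.

```csharp
using System;
using System.Threading;

// Hypothetical options type: carries the concurrency limit instead of a hard-coded 15.
public sealed class ProcessorOptions
{
    public int MaxConcurrency { get; }

    public ProcessorOptions(int maxConcurrency)
    {
        // SemaphoreSlim needs a positive maximum count, so reject bad input early.
        if (maxConcurrency < 1)
        {
            throw new ArgumentOutOfRangeException(nameof(maxConcurrency));
        }

        MaxConcurrency = maxConcurrency;
    }

    // Creates a semaphore with every slot initially available.
    public SemaphoreSlim CreateSemaphore()
    {
        return new SemaphoreSlim(MaxConcurrency, MaxConcurrency);
    }
}
```

The processing method would then call options.CreateSemaphore() instead of constructing the semaphore with literal values.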
Main Loop
The main loop while (true) is considered a code smell by some because it's not clear what the exit conditions are. But what's definitely an issue is that the loop never exits. Always make sure the loop has a graceful exit. Since a cancellation token is provided, why not use it to drive the loop?
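sand-box mode (the loop ends quietly once cancellation is requested):
    while (!token.IsCancellationRequested)
    {
        // .. loop body
    }
explicit mode (cancellation surfaces as an OperationCanceledException):
    while (true)
    {
        token.ThrowIfCancellationRequested();
        // .. loop body
    }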
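To make the difference between the two modes concrete, here is a small sketch. LoopExitDemo and its method names are hypothetical, and the Thread.Sleep call merely stands in for the real loop body.

```csharp
using System;
using System.Threading;

public static class LoopExitDemo
{
    // Sand-box mode: returns normally once cancellation has been requested.
    public static int RunSandbox(CancellationToken token)
    {
        var iterations = 0;
        while (!token.IsCancellationRequested)
        {
            iterations++;       // stand-in for the loop body
            Thread.Sleep(10);
        }

        return iterations;
    }

    // Explicit mode: leaves the loop by throwing OperationCanceledException.
    public static void RunExplicit(CancellationToken token)
    {
        while (true)
        {
            token.ThrowIfCancellationRequested();
            Thread.Sleep(10);   // stand-in for the loop body
        }
    }
}
```

With a token that is already cancelled, RunSandbox returns without running the body, while RunExplicit throws, so the caller has to decide whether to catch the exception or let it propagate.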
Threading Integrity
Once you have acquired the semaphore, immediately enter a try-finally block so that it is always released.
    await semaphore.WaitAsync();
    // What if the code below throws an exception? The semaphore is not released!
    await PeekAsync(messageQueue);
    var message = await ReceiveAsync(messageQueue);

    await semaphore.WaitAsync();
    try
    {
        // .. code while holding lock
    }
    finally
    {
        semaphore.Release();
    }
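Because the original code hands the slot over to a background task, the release has to be covered on both paths: when receiving fails before a worker is started, and when the worker finishes. The following is a minimal sketch of one way to arrange that, not a definitive implementation. BoundedPump, RunAsync and the receive and handle delegates are hypothetical stand-ins for the loop, ReceiveAsync and HandleMessage, so the sketch runs without a message queue.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class BoundedPump
{
    // receive and handle are hypothetical stand-ins for ReceiveAsync and HandleMessage.
    public static async Task RunAsync<T>(
        Func<CancellationToken, Task<T>> receive,
        Func<T, Task> handle,
        int maxConcurrency,
        CancellationToken token)
    {
        var semaphore = new SemaphoreSlim(maxConcurrency, maxConcurrency);
        try
        {
            while (!token.IsCancellationRequested)
            {
                // Throws OperationCanceledException if cancelled while waiting for a slot.
                await semaphore.WaitAsync(token);

                T message;
                try
                {
                    message = await receive(token);
                }
                catch
                {
                    // No worker was started, so the slot must be returned here.
                    semaphore.Release();
                    throw;
                }

                // The worker now owns the slot and releases it in finally.
                _ = Task.Run(async () =>
                {
                    try
                    {
                        await handle(message);
                    }
                    catch (Exception)
                    {
                        // Exception handling
                    }
                    finally
                    {
                        semaphore.Release();
                    }
                });
            }
        }
        catch (OperationCanceledException) when (token.IsCancellationRequested)
        {
            // Cancellation is treated as a normal, quiet exit.
        }
        finally
        {
            // Re-acquire every slot so the method returns only after all workers are done.
            for (var i = 0; i < maxConcurrency; i++)
            {
                await semaphore.WaitAsync();
            }
        }
    }
}
```

Draining the semaphore at the end is a design choice of this sketch: it gives the caller a single task to await for a graceful shutdown.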
MessageQueue
There is no reason to peek before receiving, because both operations are blocking on the same trigger - an incoming message.
    await PeekAsync(messageQueue);
    var message = await ReceiveAsync(messageQueue);
You can replace the above with:
var message = await ReceiveAsync(messageQueue);
Command1 is just a test body for the messages that are sent through the MessageQueue. SemaphoreSlim is a class from the .NET Framework. It should limit the number of threads that access the code block, in this case to 15.
ReceiveAsync() dequeues a message from the queue. PeekAsync() checks whether there are any messages.