I have a single application that listens to multiple RabbitMQ queues and processes the messages. It works, but I am not sure whether this implementation is the right one or whether I am missing something.
The implementation is inspired by this answer: https://stackoverflow.com/a/21847234/37571
//Message subscriber implementation
public class AuditSubscriber : IMessageSubscriber
{
    public IList<string> SubscribedRouteKeys
    {
        get { return new List<string>()
            {
                "*.inceitive.attested.*"
            };
        }
    }
    public async Task<bool> Process(Core.MessageInfo MessageItem)
    {
        //Start new task to process the message
        bool _ProcessedResult = await Task<bool>.Factory.StartNew(() => MessageProcesser(MessageItem), CancellationToken.None, TaskCreationOptions.LongRunning, TaskScheduler.Default);
        return _ProcessedResult;
    }
    protected bool MessageProcesser(MessageInfo MessageItem)
    {
        Thread.Sleep(1000); //Actual work
        return true;
    }
}
public class RabbitMQMessageConsumer : AbstractRabbitMQClient, IMessageConsumer
{
    //Message consumer method, which starts one task per available subscriber type.
    public void Consume(CancellationToken token)
    {
        //Start RabbitMQ connection
        StartConnection(_ConnectionFactory.Get());
        List<Task> tasks = new List<Task>();
        foreach (SubscriberType subscriberType in (SubscriberType[])Enum.GetValues(typeof(SubscriberType)))
        {
            //Start listening to all queues, one per subscriber type available in the system
            Task task = Task.Factory.StartNew(() => ConsumeMessage(subscriberType, token), CancellationToken.None, TaskCreationOptions.LongRunning, TaskScheduler.Default);
            tasks.Add(task);
        }
        Task.WhenAll(tasks);
    }
    //Listen to queue
    async Task ConsumeMessage(SubscriberType subscriberType, CancellationToken token)
    {
        try
        {
            //Get message subscriber which will process the message
            IMessageSubscriber _MessageSubscriber = _MessageSubscriberFactory.GetMessageSubscriber(subscriberType);
            using (IModel _ConsumerChannel = _Connection.CreateModel())
            {
                _ConsumerChannel.ExchangeDeclare(_ExchangeProperties.Name, _ExchangeProperties.Type, _ExchangeProperties.Durable);
                string _QueueName = Enum.GetName(typeof(SubscriberType), subscriberType);
                _ConsumerChannel.QueueDeclare(_QueueName, _QueueProperties.Durable, _QueueProperties.Exclusive, _QueueProperties.AutoDelete, _QueueProperties.Arguments);
                foreach (string routeKey in _MessageSubscriber.SubscribedRouteKeys)
                {
                    _ConsumerChannel.QueueBind(_QueueName, _ExchangeProperties.Name, routeKey);
                }
                var consumer = new QueueingBasicConsumer(_ConsumerChannel);
                _ConsumerChannel.BasicConsume(_QueueName, false, consumer);
                //Infinite loop to listen to the queue
                while (true)
                {
                    if (token.IsCancellationRequested)
                    {
                        break;
                    }
                    try
                    {
                        BasicDeliverEventArgs eventArgs;
                        //Get message or time out
                        if (consumer.Queue.Dequeue(1000, out eventArgs))
                        {
                            if (eventArgs != null)
                            {
                                MessageInfo _MessageItem = ByteArrayToMessageInfo(eventArgs.Body);
                                //Message processed by async method
                                var messageProcesser = _MessageSubscriber.Process(_MessageItem);
                                //Wait for result
                                bool _MessageProcessed = await messageProcesser;
                                if (_MessageProcessed)
                                {
                                    _ConsumerChannel.BasicAck(eventArgs.DeliveryTag, false);
                                }
                                else
                                {
                                    _ConsumerChannel.BasicNack(eventArgs.DeliveryTag, false, true);
                                }
                            }
                            else
                            {
                                //connection is dead
                            }
                        }
                    }
                    catch (EndOfStreamException ex)
                    {
                        Console.WriteLine(ex.Message);
                        throw;
                    }
                }
            }
        }
        catch (Exception ex)
        {
            Console.WriteLine(ex.Message);
            //TODO: Restart the task again
            throw;
        }
    }
}
Does it work? Why do you think it might not be right? (svick, Feb 26, 2014 at 13:20)
@svick Yes, it works. (Mahesh, Feb 26, 2014 at 14:21)
2 Answers
public async Task<bool> Process(Core.MessageInfo MessageItem)
{
    //Start new task to process the message
    bool _ProcessedResult = await Task<bool>.Factory.StartNew(() => MessageProcesser(MessageItem), CancellationToken.None, TaskCreationOptions.LongRunning, TaskScheduler.Default);
    return _ProcessedResult;
}
_ProcessedResult is a weird name for a local variable. The common naming is camelCase (e.g. processedResult). Underscores are sometimes used for fields, but certainly not for locals.
The variable is actually unnecessary; you can just directly return the expression.
If all awaits in a method are return awaits, then you don't need await at all: just return the Task directly, after removing async from the signature.
Are you sure TaskCreationOptions.LongRunning is appropriate here? Its practical (and undocumented) effect is that it creates a new Thread to execute the Task. If you don't need that, just use Task.Run().
With all those changes the method would look like this:
public Task<bool> Process(Core.MessageInfo MessageItem)
{
    return Task.Run(() => MessageProcesser(MessageItem));
}
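If you want to check the LongRunning point on your own runtime, a minimal sketch along these lines may help. ThreadKindDemo and IsOnPoolThread are hypothetical names used only for illustration, and because the dedicated-thread behavior is an implementation detail, the printed values are something to observe rather than rely on.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class ThreadKindDemo
{
    // Hypothetical helper: reports whether the calling code runs on a thread-pool thread.
    public static bool IsOnPoolThread()
    {
        return Thread.CurrentThread.IsThreadPoolThread;
    }

    public static void Main()
    {
        // Same call shape as the original Process method.
        Task<bool> longRunning = Task<bool>.Factory.StartNew(
            () => IsOnPoolThread(),
            CancellationToken.None,
            TaskCreationOptions.LongRunning,
            TaskScheduler.Default);

        // The simplified form suggested above.
        Task<bool> pooled = Task.Run(() => IsOnPoolThread());

        Console.WriteLine("LongRunning ran on a pool thread: " + longRunning.Result);
        Console.WriteLine("Task.Run ran on a pool thread:    " + pooled.Result);
    }
}
```

If the first line reports that the work did not run on a pool thread, each processed message is paying for its own thread, which is probably more than a one-second job needs.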
There are many empty lines in your code that I think are unnecessary. Empty lines are useful, but I think the way you're using them (after { or between two }) is just wasting space. And multiple empty lines are usually not useful either.
Task.WhenAll(tasks);
WhenAll() returns a Task that represents waiting for all the passed-in Tasks, so ignoring its return value like this doesn't make any sense. Ideally, you should await the returned Task, but for that you need async. And async void methods shouldn't be used. So, if you need to wait for all the Tasks here and you can't use await, you will have to block the thread by using Task.WaitAll(tasks).
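As a rough sketch of what that could look like, the version below returns the WhenAll() Task to the caller, who can then await it or block on it. WhenAllDemo, ListenAsync, ConsumeAsync and the queue names are hypothetical stand-ins, not the code from the question. One more assumption worth checking in your own code: Task.Factory.StartNew() with a delegate that returns a Task gives you a Task<Task> whose outer task finishes at the first await, whereas Task.Run() unwraps it, which is why the sketch uses Task.Run().

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public static class WhenAllDemo
{
    // Hypothetical stand-in for ConsumeMessage: pretends to listen until cancelled.
    public static async Task ListenAsync(string queueName, CancellationToken token)
    {
        while (!token.IsCancellationRequested)
        {
            await Task.Delay(50); // placeholder for "dequeue and process one message"
        }
        Console.WriteLine(queueName + " listener stopped");
    }

    // Returns a Task so callers can await it or, if they must, block on it.
    public static Task ConsumeAsync(IEnumerable<string> queueNames, CancellationToken token)
    {
        var tasks = new List<Task>();
        foreach (string queueName in queueNames)
        {
            string name = queueName; // explicit copy for the lambda (required before C# 5)
            // Task.Run unwraps the async delegate, so each task ends only when its listener ends.
            tasks.Add(Task.Run(() => ListenAsync(name, token)));
        }
        return Task.WhenAll(tasks);
    }

    public static void Main()
    {
        using (var cts = new CancellationTokenSource())
        {
            Task all = ConsumeAsync(new[] { "Audit", "Billing" }, cts.Token);
            cts.CancelAfter(200); // simulate a shutdown request
            all.Wait();           // blocking wait, for callers that cannot be async
            Console.WriteLine("All listeners finished");
        }
    }
}
```

An async caller would write await ConsumeAsync(...) instead of all.Wait(); either way, exceptions thrown by a listener surface through the returned Task instead of being lost.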
consumer.Queue.Dequeue(1000, out eventArgs)
This looks like a blocking method. Isn't there an asynchronous version available? If there is, you should probably use that instead.
Unfortunately, "consumer.Queue.Dequeue(1000, out eventArgs)" does not have an async version. (Mahesh, Mar 11, 2014 at 12:06)
I'm very new to all the async/await stuff, so I could be completely off the mark here, but I don't really follow why you are running MessageProcesser in another new task.
//Wait for result
bool _MessageProcessed = await messageProcesser;
You are already consuming messages in an asynchronous task, so it seems to me that awaiting messageProcesser just makes that existing task wait for the result. You might as well do the work in the current task rather than submitting a new one, since no caller of ConsumeMessage benefits from the additional await.
Like I said - very new to async await, so please 'school' me if I'm missing the point!
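To make the comparison concrete, here is a small sketch under my own assumptions: InlineVersusTaskDemo, Work and the 100 ms delay are hypothetical stand-ins for the real processing. Strictly speaking, await does not block a thread, but the loop still cannot take the next message until the current one finishes, so one queue's messages are handled one at a time in both variants.

```csharp
using System;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;

public static class InlineVersusTaskDemo
{
    // Hypothetical stand-in for MessageProcesser.
    public static bool Work(int message)
    {
        Thread.Sleep(100);
        return true;
    }

    // Hands each message to another task, then awaits it before taking the next one.
    public static async Task<int> ProcessViaTasksAsync(int count)
    {
        int acked = 0;
        for (int i = 0; i < count; i++)
        {
            int message = i; // copy so the lambda does not capture the loop variable
            if (await Task.Run(() => Work(message))) acked++;
        }
        return acked;
    }

    // Does the same work directly on the listening task.
    public static int ProcessInline(int count)
    {
        int acked = 0;
        for (int i = 0; i < count; i++)
        {
            if (Work(i)) acked++;
        }
        return acked;
    }

    public static void Main()
    {
        var watch = Stopwatch.StartNew();
        int viaTasks = ProcessViaTasksAsync(5).Result;
        Console.WriteLine("Via tasks: " + viaTasks + " acked in " + watch.ElapsedMilliseconds + " ms");

        watch.Restart();
        int inline = ProcessInline(5);
        Console.WriteLine("Inline:    " + inline + " acked in " + watch.ElapsedMilliseconds + " ms");
    }
}
```

If the two timings come out about the same, the extra task is not buying any parallelism within a single queue.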