4
\$\begingroup\$

The following code attempts to efficiently stream queue messages to processing units using an IEnumerable yield return.

  1. create a blocking collection to handle the stream
  2. create a linked token source to abort by external call or internal one
  3. read from q by the EventingBasicConsumer and add messages to the blocking collection.
  4. start a while loop to yield return the messages to the caller.
  5. in abort case clean the blocking collection wait for acks and close the connection.

    private EventingBasicConsumer GetConsumer(string queueName, ushort prefetchCount = 10)
    {
     _channel = Connection.CreateModel();
     _channel.BasicQos(
     prefetchSize: 0,
     prefetchCount: prefetchCount,
     global: false
     );
     _channel.QueueDeclare(
     queue: queueName,
     durable: true,
     exclusive: false,
     autoDelete: false,
     arguments: null
     );
     return new EventingBasicConsumer(_channel);
    }
    /// <summary>
    /// Consumer running continuously to listen for messages
    /// </summary>
    /// <typeparam name="T">MQContractObject</typeparam>
    /// <param name="queueName">The queue name to listen to.</param>
    /// <param name="cancellationToken">The cancellation token to about the operation.</param>
    /// <param name="autoAck">True to automatically acknowledge dequeue messages. other wise false and let to process acknowledge the message when done.</param>
    /// <param name="prefetchCount">the max number of message the queue will serve without acknowledgment on previous ones</param>
    /// <returns>yield enumerator of type T</returns>
    public IEnumerable<T> DequeueYield<T>(string queueName, CancellationToken cancellationToken = default(CancellationToken), bool autoAck = true, ushort prefetchCount = 10) where T : MQContractObject
    {
     using (var resultsQueue = new BlockingCollection<T>())
     {
     using (var tokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken))
     {
     EventingBasicConsumer consumer = null;
     var isConsumerError = false;
     void cancelRun() {
     try
     {
     tokenSource.Cancel();
     }
     catch (ObjectDisposedException)
     {
     }
     }
     void recivedHandler(object sender, BasicDeliverEventArgs ea)
     {
     try
     {
     var body = _utensils.ByteArrayToObject<T>(ea.Body);
     body.DeliveryTag = ea.DeliveryTag;
     resultsQueue.Add(body);
     }
     catch (Exception ex)
     {
     logger.Error($"Failed to receive message from queue '{queueName}'.", ex);
     isConsumerError = true;
     unRegisterHandlers();
     cancelRun();
     }
     }
     void consumerHandler(object sender, ConsumerEventArgs ea)
     {
     lock (_locker)
     {
     isConsumerError = true;
     unRegisterHandlers();
     cancelRun();
     }
     }
     void shutdownHandler(object sender, ShutdownEventArgs ea)
     {
     lock (_locker)
     {
     if (!tokenSource.IsCancellationRequested)
     {
     logger.Warn($"RabbitSimpleReceiver:: DequeueYield has initiated a shutdown, cause { ea.Cause?.ToString() }. will try to read from queue again in 1/2 sec.");
     Task.Delay(500).Wait();
     // try reading from q again.
     readFromQueue();
     }
     }
     }
     void unRegisterHandlers()
     {
     if(consumer!= null)
     {
     consumer.Received -= recivedHandler;
     consumer.Shutdown -= shutdownHandler;
     consumer.ConsumerCancelled -= consumerHandler;
     }
     }
     void registerHandlers()
     {
     //https://stackoverflow.com/a/7065771/395890
     unRegisterHandlers();
     if (consumer != null)
     {
     consumer.Received += recivedHandler;
     consumer.ConsumerCancelled += consumerHandler;
     consumer.Shutdown += shutdownHandler;
     }
     }
     void readFromQueue()
     {
     lock (_locker)
     {
     if (consumer == null)
     {
     consumer = GetConsumer(queueName, prefetchCount);
     }
     if (consumer.IsRunning)
     {
     return;
     }
     registerHandlers();
     _channel.BasicConsume(
     queue: queueName,
     autoAck: autoAck,
     consumer: consumer
     );
     }
     }
     try
     {
     readFromQueue();
     T result = null;
     while (true)
     {
     result = null;
     try
     {
     result = resultsQueue.Take(tokenSource.Token);
     }
     catch (OperationCanceledException)
     {
     unRegisterHandlers();
     break;
     }
     catch (ObjectDisposedException)
     {
     unRegisterHandlers();
     break;
     }
     yield return result;
     }
     // empty the blocking collection.
     while (resultsQueue.TryTake(out result))
     {
     yield return result;
     }
     if (!autoAck)
     {
     Task.Delay(MaxTimeToLeaveTheConnectionOpenForAckAfterCancelactionRequest).Wait();
     }
     if (isConsumerError)
     {
     throw new Exception(consumer.ShutdownReason.ToString());
     }
     else
     {
     yield break;
     }
     }
     finally
     {
     CloseConnection(consumer);
     }
     }
     }
    }
    
asked Jul 27, 2018 at 20:23
\$\endgroup\$
2
  • 3
    \$\begingroup\$ Local functions are great helpers but you really overuse them here. Defining those somewhere in the middle of some statement makes it very confusing. They could all be static methods taking one or two parameters. \$\endgroup\$ Commented Jul 27, 2018 at 21:34
  • 2
    \$\begingroup\$ I'm all for mnemonic names, but MaxTimeToLeaveTheConnectionOpenForAckAfterCancelactionRequest seems a bit excessive, maybe maxOpenTimeAfterCancel or something. \$\endgroup\$ Commented Jul 28, 2018 at 1:58

0

Know someone who can answer? Share a link to this question via email, Twitter, or Facebook.

Your Answer

Draft saved
Draft discarded

Sign up or log in

Sign up using Google
Sign up using Email and Password

Post as a guest

Required, but never shown

Post as a guest

Required, but never shown

By clicking "Post Your Answer", you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.