\$\begingroup\$
\$\endgroup\$
2
The following code attempts to efficiently stream queue messages to processing units using an IEnumerable yield return.
- create a blocking collection to handle the stream
- create a linked token source to abort by external call or internal one
- read from the queue via the EventingBasicConsumer and add the messages to the blocking collection.
- start a while loop to yield return the messages to the caller.
- on abort, drain the blocking collection, wait for acks, and close the connection.
/// <summary>
/// Creates a new channel on <c>Connection</c>, stores it in <c>_channel</c>, applies the prefetch
/// limit via <c>BasicQos</c> (non-global), declares the queue as durable, and wraps the channel
/// in an event-based consumer.
/// </summary>
/// <param name="queueName">The queue to declare on the new channel.</param>
/// <param name="prefetchCount">The prefetch count passed to <c>BasicQos</c>.</param>
/// <returns>A new <see cref="EventingBasicConsumer"/> bound to the newly created channel.</returns>
private EventingBasicConsumer GetConsumer(string queueName, ushort prefetchCount = 10)
{
    _channel = Connection.CreateModel();

    _channel.BasicQos(
        prefetchSize: 0,
        prefetchCount: prefetchCount,
        global: false
    );

    _channel.QueueDeclare(
        queue: queueName,
        durable: true,
        exclusive: false,
        autoDelete: false,
        arguments: null
    );

    return new EventingBasicConsumer(_channel);
}

/// <summary>
/// Consumer running continuously to listen for messages.
/// Messages are pushed by the RabbitMQ consumer into an internal blocking collection and
/// lazily yielded to the caller until cancellation is requested or the consumer fails.
/// </summary>
/// <typeparam name="T">MQContractObject</typeparam>
/// <param name="queueName">The queue name to listen to.</param>
/// <param name="cancellationToken">The cancellation token to abort the operation.</param>
/// <param name="autoAck">True to automatically acknowledge dequeued messages; otherwise false, leaving the caller to acknowledge each message when done.</param>
/// <param name="prefetchCount">The max number of messages the queue will serve without acknowledgment of previous ones.</param>
/// <returns>A lazily evaluated sequence of messages of type T.</returns>
/// <exception cref="Exception">
/// Thrown after the buffered messages have been drained when the consumer was cancelled
/// or a received message could not be processed.
/// </exception>
public IEnumerable<T> DequeueYield<T>(string queueName, CancellationToken cancellationToken = default(CancellationToken), bool autoAck = true, ushort prefetchCount = 10) where T : MQContractObject
{
    using (var resultsQueue = new BlockingCollection<T>())
    using (var tokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken))
    {
        EventingBasicConsumer consumer = null;
        var isConsumerError = false;
        // Remembers the exception raised while handling a delivery so it can be surfaced to the caller.
        Exception receiveFailure = null;

        // Cancels the internal token; tolerates the source having been disposed already
        // (handlers may still fire while the iterator is being torn down).
        void CancelRun()
        {
            try
            {
                tokenSource.Cancel();
            }
            catch (ObjectDisposedException)
            {
            }
        }

        // Deserializes a delivery, tags it with its delivery tag and buffers it for the enumerating thread.
        void ReceivedHandler(object sender, BasicDeliverEventArgs ea)
        {
            try
            {
                var body = _utensils.ByteArrayToObject<T>(ea.Body);
                body.DeliveryTag = ea.DeliveryTag;
                resultsQueue.Add(body);
            }
            catch (Exception ex)
            {
                logger.Error($"Failed to receive message from queue '{queueName}'.", ex);
                receiveFailure = ex;
                isConsumerError = true;
                UnregisterHandlers();
                CancelRun();
            }
        }

        // The consumer was cancelled: treat it as an error and stop the stream.
        void ConsumerCancelledHandler(object sender, ConsumerEventArgs ea)
        {
            lock (_locker)
            {
                isConsumerError = true;
                UnregisterHandlers();
                CancelRun();
            }
        }

        // The channel shut down while the stream is still wanted: wait briefly and try to consume again.
        void ShutdownHandler(object sender, ShutdownEventArgs ea)
        {
            lock (_locker)
            {
                if (tokenSource.IsCancellationRequested)
                {
                    return;
                }

                logger.Warn($"RabbitSimpleReceiver:: DequeueYield has initiated a shutdown, cause { ea.Cause?.ToString() }. will try to read from queue again in 1/2 sec.");
                Thread.Sleep(500);

                // NOTE(review): the existing consumer (and therefore its channel) is reused here;
                // confirm that consuming on it again is valid after a shutdown.
                ReadFromQueue();
            }
        }

        void UnregisterHandlers()
        {
            if (consumer == null)
            {
                return;
            }

            consumer.Received -= ReceivedHandler;
            consumer.Shutdown -= ShutdownHandler;
            consumer.ConsumerCancelled -= ConsumerCancelledHandler;
        }

        void RegisterHandlers()
        {
            // Unsubscribe first so a handler is never attached twice.
            // https://stackoverflow.com/a/7065771/395890
            UnregisterHandlers();

            if (consumer == null)
            {
                return;
            }

            consumer.Received += ReceivedHandler;
            consumer.ConsumerCancelled += ConsumerCancelledHandler;
            consumer.Shutdown += ShutdownHandler;
        }

        // Lazily creates the consumer and starts consuming unless it is already running.
        void ReadFromQueue()
        {
            lock (_locker)
            {
                if (consumer == null)
                {
                    consumer = GetConsumer(queueName, prefetchCount);
                }

                if (consumer.IsRunning)
                {
                    return;
                }

                RegisterHandlers();

                _channel.BasicConsume(
                    queue: queueName,
                    autoAck: autoAck,
                    consumer: consumer
                );
            }
        }

        try
        {
            ReadFromQueue();

            while (true)
            {
                T result;

                try
                {
                    result = resultsQueue.Take(tokenSource.Token);
                }
                catch (Exception ex) when (ex is OperationCanceledException || ex is ObjectDisposedException)
                {
                    // Cancellation (external or internal) or disposal ends the live stream.
                    UnregisterHandlers();
                    break;
                }

                yield return result;
            }

            // Empty the blocking collection: hand over whatever was buffered before the stop.
            while (resultsQueue.TryTake(out var pending))
            {
                yield return pending;
            }

            if (!autoAck)
            {
                // Keep the connection open for a while so the caller can still acknowledge.
                Thread.Sleep(MaxTimeToLeaveTheConnectionOpenForAckAfterCancelactionRequest);
            }

            if (isConsumerError)
            {
                // ShutdownReason is not necessarily set when the failure came from the receive
                // handler or from a consumer cancellation, so never dereference it blindly.
                var reason = consumer?.ShutdownReason?.ToString()
                    ?? receiveFailure?.Message
                    ?? $"Consumer for queue '{queueName}' was cancelled.";

                throw new Exception(reason, receiveFailure);
            }
        }
        finally
        {
            // Also runs when the caller stops enumerating early (iterator Dispose). Stop the
            // handlers first so events raised during teardown cannot restart consumption or
            // write into the collection that is about to be disposed.
            CancelRun();
            UnregisterHandlers();
            CloseConnection(consumer);
        }
    }
}
lang-cs
MaxTimeToLeaveTheConnectionOpenForAckAfterCancelactionRequest
seems a bit excessive, maybe maxOpenTimeAfterCancel
or something. \$\endgroup\$