Skip to main content
Code Review

Return to Answer

Post Made Community Wiki by Simon Forsberg
added 35 characters in body
Source Link
Adam B
  • 207
  • 1
  • 2
  • 6

/Self answer with updated code/

/Self answer with updated code/

added 1490 characters in body
Source Link
Adam B
  • 207
  • 1
  • 2
  • 6
  • I received several improvement suggestions that are relatively minor -- I have incorporated several of them, and reproduce my updated solution below.
  • I received a suggestion based on BlockingCollection, I reproduce this solution as well.
  • I have also received a suggestion based on Tasks and continuation, I think this is also valuable.
  • I received some comments questioning the need for such an abstraction, as using threadpool processing can solve these scenarios ‘directly’. This is a useful conclusion for me, and shows that my original solution is based on traditional thread-based sequential paradigm, and a TPL-based approach can result in quite different (and likely more elegant) solutions. I also show a very simple solution inspired by this, which sovles the orginal problem too.

Here is my improved original solution, keeping the original basic idea (together with simple Main for testing):

/// <summary>
/// Perform asynchronous processing in a background thread.
/// Handle thread creation, exit, thread safety.
/// </summary>
public class BgLoop_ConcQueue : IBgLoop, IDisposable
{
 /// <summary>Message handling delegate.</summary>
 private ProcessMessage _processMessageDelegate;
 /// <summary>Queue for the incoming messages.</summary>
 private ConcurrentQueue<Message> _queue = new ConcurrentQueue<Message>();
 /// <summary>Task for background processing.</summary>
 private Task _bgLoopTask;
 /// <summary>Event for awakening the processing loop, used when new entry is added to the queue or exit requested.</summary>
 private AutoResetEvent _queueEvent = new AutoResetEvent(false);
 /// <summary>Flag to signal stop for the parallel sender thread.</summary>
 private bool _stopTaskFlag;
 /// <summary>Flag to store if this class is disposing</summary>
 private bool _disposed;
 /// <summary>
 /// Start the background thread.
 /// </summary>
 public void Start(ProcessMessage processMessageDelegate)
 {
 if (_bgLoopTask != null) return; // prevent starting more than once
 _processMessageDelegate = processMessageDelegate;
 _bgLoopTask = Task.Factory.StartNew(BgLoop, TaskCreationOptions.LongRunning);
 }
 /// <summary>
 /// Adds new event to the Queue
 /// </summary>
 /// <param name="message">New event to add to the queue.</param>
 public void AddMessage(Message message)
 {
 if (_stopTaskFlag)
 {
 // don't add any more if stop has been requested
 return;
 }
 _queue?.Enqueue(message);
 _queueEvent?.Set();
 }
 /// <summary>
 /// Stops the sender parallel task, but send all data before exiting.
 /// </summary>
 private void StopSenderTask()
 {
 // set the stop flag, and signal change
 _stopTaskFlag = true;
 _queueEvent.Set();
 // wait till the loop exits
 _bgLoopTask.Wait();
 Console.WriteLine("Thread exited");
 }
 /// <summary>
 /// Infinite loop for processing the incoming events from the Queue.
 /// </summary>
 private void BgLoop()
 {
 while (true)
 {
 // copy the message queue if it has entry (copy for thread safety)
 List<Message> messageList = new List<Message>();
 while (_queue.Count > 0)
 {
 Message msg;
 if (_queue.TryDequeue(out msg))
 {
 messageList.Add(msg);
 }
 }
 // Process the events
 if (messageList.Count > 0 && _processMessageDelegate != null)
 {
 foreach (Message m in messageList)
 {
 _processMessageDelegate(m);
 }
 }
 // Stop if need to stop and queue has been emptied
 if (_stopTaskFlag)
 {
 if (_queue.Count == 0)
 {
 break;
 }
 // stop requested but not empty yet -- loop without waiting
 continue;
 }
 // Block for a while, until an event happens. The exact timeout value does not really matter
 _queueEvent.WaitOne(10000);
 }
 Console.WriteLine("Thread exiting...");
 }
 public void Dispose()
 {
 Dispose(true);
 GC.SuppressFinalize(this);
 }
 protected virtual void Dispose(bool disposing)
 {
 if (_disposed)
 {
 return;
 }
 if (disposing)
 {
 StopSenderTask();
 _queueEvent.Dispose();
 }
 _disposed = true;
 }
}

public delegate void ProcessMessage(Message msg);
public interface IBgLoop : IDisposable
{
 void Start(ProcessMessage processMessageDelegate);
 void AddMessage(Message message);
}
public class Message
{
 public string _msg;
 public override string ToString() { return _msg; }
}
public class Program
{
 public static void ProcessMessage(Message msg)
 {
 try
 {
 Console.WriteLine($"{DateTime.Now}: Processing message {msg} ...");
 // ... do processing ...
 }
 catch (Exception)
 {
 // ...
 }
 }
 public static void Main(string[] args)
 {
 //using (IBgLoop loop = new BgLoop_Queue())
 using (IBgLoop loop = new BgLoop_ConcQueue())
 //using (IBgLoop loop = new BgLoop_BlockingCollection())
 {
 loop.Start(new ProcessMessage(Program.ProcessMessage));
 for (int i = 0; i < 5; ++i)
 {
 Thread.Sleep(500);
 loop.AddMessage(new Message{ _msg = $"msg{i}" });
 }
 Thread.Sleep(2000);
 }
 }
}
/// <summary>Perform asynchronous processing in a background thread.</summary>
public class BgLoop_BlockingCollection : IBgLoop, IDisposable
{
 /// <summary>Message handling delegate.</summary>
 private ProcessMessage _processMessageDelegate;
 /// <summary>Keep the messages in a BlockingCollection.</summary>
 private BlockingCollection<Action> _actionQueue = new BlockingCollection<Action>();
 
 /// <summary>Task for background processing.</summary>
 private Task _bgLoopTask;
 /// <summary>Flag to store if this class is disposing.</summary>
 private bool _disposed;
 public void Start(ProcessMessage processMessageDelegate)
 {
 if (_bgLoopTask != null) return; // prevent starting more than once
 _processMessageDelegate = processMessageDelegate;
 _bgLoopTask = Task.Factory.StartNew(BgLoop, TaskCreationOptions.LongRunning);
 }
 public void AddMessage(Message message)
 {
 _actionQueue.Add(() =>
 {
 if (_processMessageDelegate != null)
 {
 _processMessageDelegate(message);
 }
 });
 }
 /// <summary>
 /// Infinite loop for processing the incoming events from the Queue.
 /// </summary>
 private void BgLoop()
 {
 foreach(var item in _actionQueue.GetConsumingEnumerable())
 {
 item(); // Run the task
 } // Exits when the BlockingCollection is marked for no more actions
 }
 /// <summary>
 /// Stops the sender parallel task, but send all data before exiting.
 /// </summary>
 private void StopSenderTask()
 {
 _actionQueue.CompleteAdding();
 _bgLoopTask.Wait();
 Console.WriteLine("Thread exited");
 }
 public void Dispose()
 {
 Dispose(true);
 GC.SuppressFinalize(this);
 }
 
 protected virtual void Dispose(bool disposing)
 {
 if (_disposed)
 {
 return;
 }
 if (disposing)
 {
 StopSenderTask();
 }
 _disposed = true;
 }
}

Here is the simple solution doing no background loop at all:

public class BgLoop_Simple : IBgLoop, IDisposable
{
 /// <summary>Message handling delegate.</summary>
 private ProcessMessage _processMessageDelegate;
 public void Start(ProcessMessage processMessageDelegate)
 {
 if (_processMessageDelegate != null) return; // prevent starting more than once
 _processMessageDelegate = processMessageDelegate;
 // nothing else to do
 }
 public void AddMessage(Message message)
 {
 // call directly in a threadpool thread
 Task.Run(() => _processMessageDelegate?.Invoke(message));
 }
 public void Dispose()
 {
 Dispose(true);
 GC.SuppressFinalize(this);
 }
 protected virtual void Dispose(bool disposing) {}
}

And finally here is a simple Main and surroundings, for testing:

public delegate void ProcessMessage(Message msg);
public interface IBgLoop : IDisposable
{
 void Start(ProcessMessage processMessageDelegate);
 void AddMessage(Message message);
}
public class Message
{
 public string _msg;
 public override string ToString() { return _msg; }
}
public class Program
{
 public static void ProcessMessage(Message msg)
 {
 try
 {
 Console.WriteLine($"{DateTime.Now}: Processing message {msg} ...");
 // ... do processing ...
 }
 catch (Exception)
 {
 // ...
 }
 }
 public static void Main(string[] args)
 {
 //using (IBgLoop loop = new BgLoop_Queue())
 using (IBgLoop loop = new BgLoop_ConcQueue())
 //using (IBgLoop loop = new BgLoop_BlockingCollection())
 //using (IBgLoop loop = new BgLoop_Simple())
 {
 loop.Start(new ProcessMessage(Program.ProcessMessage));
 int cnt = 0;
 // add some messages slowly
 for (int i = 0; i < 5; ++i)
 {
 Thread.Sleep(500);
 loop.AddMessage(new Message{ _msg = $"msg{++cnt}" });
 }
 // add some messages quickly
 for (int i = 0; i < 10; ++i)
 {
 loop.AddMessage(new Message{ _msg = $"msg{++cnt}" });
 }
 Thread.Sleep(2000);
 }
 }
}
  • I received several improvement suggestions that are relatively minor -- I have incorporated several of them.
  • I received a suggestion based on BlockingCollection, I reproduce this solution as well.
  • I have also received a suggestion based on Tasks and continuation, I think this is also valuable.
  • I received some comments questioning the need for such an abstraction, as using threadpool processing can solve these scenarios ‘directly’. This is a useful conclusion for me, and shows that my original solution is based on traditional thread-based sequential paradigm, and a TPL-based approach can result in quite different (and likely more elegant) solutions.

Here is my improved solution, keeping the original basic idea (together with simple Main for testing):

/// <summary>
/// Perform asynchronous processing in a background thread.
/// Handle thread creation, exit, thread safety.
/// </summary>
public class BgLoop_ConcQueue : IBgLoop, IDisposable
{
 /// <summary>Message handling delegate.</summary>
 private ProcessMessage _processMessageDelegate;
 /// <summary>Queue for the incoming messages.</summary>
 private ConcurrentQueue<Message> _queue = new ConcurrentQueue<Message>();
 /// <summary>Task for background processing.</summary>
 private Task _bgLoopTask;
 /// <summary>Event for awakening the processing loop, used when new entry is added to the queue or exit requested.</summary>
 private AutoResetEvent _queueEvent = new AutoResetEvent(false);
 /// <summary>Flag to signal stop for the parallel sender thread.</summary>
 private bool _stopTaskFlag;
 /// <summary>Flag to store if this class is disposing</summary>
 private bool _disposed;
 /// <summary>
 /// Start the background thread.
 /// </summary>
 public void Start(ProcessMessage processMessageDelegate)
 {
 if (_bgLoopTask != null) return; // prevent starting more than once
 _processMessageDelegate = processMessageDelegate;
 _bgLoopTask = Task.Factory.StartNew(BgLoop, TaskCreationOptions.LongRunning);
 }
 /// <summary>
 /// Adds new event to the Queue
 /// </summary>
 /// <param name="message">New event to add to the queue.</param>
 public void AddMessage(Message message)
 {
 if (_stopTaskFlag)
 {
 // don't add any more if stop has been requested
 return;
 }
 _queue?.Enqueue(message);
 _queueEvent?.Set();
 }
 /// <summary>
 /// Stops the sender parallel task, but send all data before exiting.
 /// </summary>
 private void StopSenderTask()
 {
 // set the stop flag, and signal change
 _stopTaskFlag = true;
 _queueEvent.Set();
 // wait till the loop exits
 _bgLoopTask.Wait();
 Console.WriteLine("Thread exited");
 }
 /// <summary>
 /// Infinite loop for processing the incoming events from the Queue.
 /// </summary>
 private void BgLoop()
 {
 while (true)
 {
 // copy the message queue if it has entry (copy for thread safety)
 List<Message> messageList = new List<Message>();
 while (_queue.Count > 0)
 {
 Message msg;
 if (_queue.TryDequeue(out msg))
 {
 messageList.Add(msg);
 }
 }
 // Process the events
 if (messageList.Count > 0 && _processMessageDelegate != null)
 {
 foreach (Message m in messageList)
 {
 _processMessageDelegate(m);
 }
 }
 // Stop if need to stop and queue has been emptied
 if (_stopTaskFlag)
 {
 if (_queue.Count == 0)
 {
 break;
 }
 // stop requested but not empty yet -- loop without waiting
 continue;
 }
 // Block for a while, until an event happens. The exact timeout value does not really matter
 _queueEvent.WaitOne(10000);
 }
 Console.WriteLine("Thread exiting...");
 }
 public void Dispose()
 {
 Dispose(true);
 GC.SuppressFinalize(this);
 }
 protected virtual void Dispose(bool disposing)
 {
 if (_disposed)
 {
 return;
 }
 if (disposing)
 {
 StopSenderTask();
 _queueEvent.Dispose();
 }
 _disposed = true;
 }
}

public delegate void ProcessMessage(Message msg);
public interface IBgLoop : IDisposable
{
 void Start(ProcessMessage processMessageDelegate);
 void AddMessage(Message message);
}
public class Message
{
 public string _msg;
 public override string ToString() { return _msg; }
}
public class Program
{
 public static void ProcessMessage(Message msg)
 {
 try
 {
 Console.WriteLine($"{DateTime.Now}: Processing message {msg} ...");
 // ... do processing ...
 }
 catch (Exception)
 {
 // ...
 }
 }
 public static void Main(string[] args)
 {
 //using (IBgLoop loop = new BgLoop_Queue())
 using (IBgLoop loop = new BgLoop_ConcQueue())
 //using (IBgLoop loop = new BgLoop_BlockingCollection())
 {
 loop.Start(new ProcessMessage(Program.ProcessMessage));
 for (int i = 0; i < 5; ++i)
 {
 Thread.Sleep(500);
 loop.AddMessage(new Message{ _msg = $"msg{i}" });
 }
 Thread.Sleep(2000);
 }
 }
}
/// <summary>Perform asynchronous processing in a background thread.</summary>
public class BgLoop_BlockingCollection : IBgLoop, IDisposable
{
 /// <summary>Message handling delegate.</summary>
 private ProcessMessage _processMessageDelegate;
 /// <summary>Keep the messages in a BlockingCollection.</summary>
 private BlockingCollection<Action> _actionQueue = new BlockingCollection<Action>();
 
 /// <summary>Task for background processing.</summary>
 private Task _bgLoopTask;
 /// <summary>Flag to store if this class is disposing.</summary>
 private bool _disposed;
 public void Start(ProcessMessage processMessageDelegate)
 {
 if (_bgLoopTask != null) return; // prevent starting more than once
 _processMessageDelegate = processMessageDelegate;
 _bgLoopTask = Task.Factory.StartNew(BgLoop, TaskCreationOptions.LongRunning);
 }
 public void AddMessage(Message message)
 {
 _actionQueue.Add(() =>
 {
 if (_processMessageDelegate != null)
 {
 _processMessageDelegate(message);
 }
 });
 }
 /// <summary>
 /// Infinite loop for processing the incoming events from the Queue.
 /// </summary>
 private void BgLoop()
 {
 foreach(var item in _actionQueue.GetConsumingEnumerable())
 {
 item(); // Run the task
 } // Exits when the BlockingCollection is marked for no more actions
 }
 /// <summary>
 /// Stops the sender parallel task, but send all data before exiting.
 /// </summary>
 private void StopSenderTask()
 {
 _actionQueue.CompleteAdding();
 _bgLoopTask.Wait();
 Console.WriteLine("Thread exited");
 }
 public void Dispose()
 {
 Dispose(true);
 GC.SuppressFinalize(this);
 }
 
 protected virtual void Dispose(bool disposing)
 {
 if (_disposed)
 {
 return;
 }
 if (disposing)
 {
 StopSenderTask();
 }
 _disposed = true;
 }
}
  • I received several improvement suggestions that are relatively minor -- I have incorporated several of them, and reproduce my updated solution below.
  • I received a suggestion based on BlockingCollection, I reproduce this solution as well.
  • I have also received a suggestion based on Tasks and continuation, I think this is also valuable.
  • I received some comments questioning the need for such an abstraction, as using threadpool processing can solve these scenarios ‘directly’. This is a useful conclusion for me, and shows that my original solution is based on traditional thread-based sequential paradigm, and a TPL-based approach can result in quite different (and likely more elegant) solutions. I also show a very simple solution inspired by this, which sovles the orginal problem too.

Here is my improved original solution, keeping the original basic idea:

/// <summary>
/// Perform asynchronous processing in a background thread.
/// Handle thread creation, exit, thread safety.
/// </summary>
public class BgLoop_ConcQueue : IBgLoop, IDisposable
{
 /// <summary>Message handling delegate.</summary>
 private ProcessMessage _processMessageDelegate;
 /// <summary>Queue for the incoming messages.</summary>
 private ConcurrentQueue<Message> _queue = new ConcurrentQueue<Message>();
 /// <summary>Task for background processing.</summary>
 private Task _bgLoopTask;
 /// <summary>Event for awakening the processing loop, used when new entry is added to the queue or exit requested.</summary>
 private AutoResetEvent _queueEvent = new AutoResetEvent(false);
 /// <summary>Flag to signal stop for the parallel sender thread.</summary>
 private bool _stopTaskFlag;
 /// <summary>Flag to store if this class is disposing</summary>
 private bool _disposed;
 /// <summary>
 /// Start the background thread.
 /// </summary>
 public void Start(ProcessMessage processMessageDelegate)
 {
 if (_bgLoopTask != null) return; // prevent starting more than once
 _processMessageDelegate = processMessageDelegate;
 _bgLoopTask = Task.Factory.StartNew(BgLoop, TaskCreationOptions.LongRunning);
 }
 /// <summary>
 /// Adds new event to the Queue
 /// </summary>
 /// <param name="message">New event to add to the queue.</param>
 public void AddMessage(Message message)
 {
 if (_stopTaskFlag)
 {
 // don't add any more if stop has been requested
 return;
 }
 _queue?.Enqueue(message);
 _queueEvent?.Set();
 }
 /// <summary>
 /// Stops the sender parallel task, but send all data before exiting.
 /// </summary>
 private void StopSenderTask()
 {
 // set the stop flag, and signal change
 _stopTaskFlag = true;
 _queueEvent.Set();
 // wait till the loop exits
 _bgLoopTask.Wait();
 Console.WriteLine("Thread exited");
 }
 /// <summary>
 /// Infinite loop for processing the incoming events from the Queue.
 /// </summary>
 private void BgLoop()
 {
 while (true)
 {
 // copy the message queue if it has entry (copy for thread safety)
 List<Message> messageList = new List<Message>();
 while (_queue.Count > 0)
 {
 Message msg;
 if (_queue.TryDequeue(out msg))
 {
 messageList.Add(msg);
 }
 }
 // Process the events
 if (messageList.Count > 0 && _processMessageDelegate != null)
 {
 foreach (Message m in messageList)
 {
 _processMessageDelegate(m);
 }
 }
 // Stop if need to stop and queue has been emptied
 if (_stopTaskFlag)
 {
 if (_queue.Count == 0)
 {
 break;
 }
 // stop requested but not empty yet -- loop without waiting
 continue;
 }
 // Block for a while, until an event happens. The exact timeout value does not really matter
 _queueEvent.WaitOne(10000);
 }
 Console.WriteLine("Thread exiting...");
 }
 public void Dispose()
 {
 Dispose(true);
 GC.SuppressFinalize(this);
 }
 protected virtual void Dispose(bool disposing)
 {
 if (_disposed)
 {
 return;
 }
 if (disposing)
 {
 StopSenderTask();
 _queueEvent.Dispose();
 }
 _disposed = true;
 }
}
/// <summary>Perform asynchronous processing in a background thread.</summary>
public class BgLoop_BlockingCollection : IBgLoop, IDisposable
{
 /// <summary>Message handling delegate.</summary>
 private ProcessMessage _processMessageDelegate;
 /// <summary>Keep the messages in a BlockingCollection.</summary>
 private BlockingCollection<Action> _actionQueue = new BlockingCollection<Action>();
 
 /// <summary>Task for background processing.</summary>
 private Task _bgLoopTask;
 /// <summary>Flag to store if this class is disposing.</summary>
 private bool _disposed;
 public void Start(ProcessMessage processMessageDelegate)
 {
 if (_bgLoopTask != null) return; // prevent starting more than once
 _processMessageDelegate = processMessageDelegate;
 _bgLoopTask = Task.Factory.StartNew(BgLoop, TaskCreationOptions.LongRunning);
 }
 public void AddMessage(Message message)
 {
 _actionQueue.Add(() =>
 {
 if (_processMessageDelegate != null)
 {
 _processMessageDelegate(message);
 }
 });
 }
 /// <summary>
 /// Infinite loop for processing the incoming events from the Queue.
 /// </summary>
 private void BgLoop()
 {
 foreach(var item in _actionQueue.GetConsumingEnumerable())
 {
 item(); // Run the task
 } // Exits when the BlockingCollection is marked for no more actions
 }
 /// <summary>
 /// Stops the sender parallel task, but send all data before exiting.
 /// </summary>
 private void StopSenderTask()
 {
 _actionQueue.CompleteAdding();
 _bgLoopTask.Wait();
 Console.WriteLine("Thread exited");
 }
 public void Dispose()
 {
 Dispose(true);
 GC.SuppressFinalize(this);
 }
 
 protected virtual void Dispose(bool disposing)
 {
 if (_disposed)
 {
 return;
 }
 if (disposing)
 {
 StopSenderTask();
 }
 _disposed = true;
 }
}

Here is the simple solution doing no background loop at all:

public class BgLoop_Simple : IBgLoop, IDisposable
{
 /// <summary>Message handling delegate.</summary>
 private ProcessMessage _processMessageDelegate;
 public void Start(ProcessMessage processMessageDelegate)
 {
 if (_processMessageDelegate != null) return; // prevent starting more than once
 _processMessageDelegate = processMessageDelegate;
 // nothing else to do
 }
 public void AddMessage(Message message)
 {
 // call directly in a threadpool thread
 Task.Run(() => _processMessageDelegate?.Invoke(message));
 }
 public void Dispose()
 {
 Dispose(true);
 GC.SuppressFinalize(this);
 }
 protected virtual void Dispose(bool disposing) {}
}

And finally here is a simple Main and surroundings, for testing:

public delegate void ProcessMessage(Message msg);
public interface IBgLoop : IDisposable
{
 void Start(ProcessMessage processMessageDelegate);
 void AddMessage(Message message);
}
public class Message
{
 public string _msg;
 public override string ToString() { return _msg; }
}
public class Program
{
 public static void ProcessMessage(Message msg)
 {
 try
 {
 Console.WriteLine($"{DateTime.Now}: Processing message {msg} ...");
 // ... do processing ...
 }
 catch (Exception)
 {
 // ...
 }
 }
 public static void Main(string[] args)
 {
 //using (IBgLoop loop = new BgLoop_Queue())
 using (IBgLoop loop = new BgLoop_ConcQueue())
 //using (IBgLoop loop = new BgLoop_BlockingCollection())
 //using (IBgLoop loop = new BgLoop_Simple())
 {
 loop.Start(new ProcessMessage(Program.ProcessMessage));
 int cnt = 0;
 // add some messages slowly
 for (int i = 0; i < 5; ++i)
 {
 Thread.Sleep(500);
 loop.AddMessage(new Message{ _msg = $"msg{++cnt}" });
 }
 // add some messages quickly
 for (int i = 0; i < 10; ++i)
 {
 loop.AddMessage(new Message{ _msg = $"msg{++cnt}" });
 }
 Thread.Sleep(2000);
 }
 }
}
Source Link
Adam B
  • 207
  • 1
  • 2
  • 6
  • I received several improvement suggestions that are relatively minor -- I have incorporated several of them.
  • I received a suggestion based on BlockingCollection, I reproduce this solution as well.
  • I have also received a suggestion based on Tasks and continuation, I think this is also valuable.
  • I received some comments questioning the need for such an abstraction, as using threadpool processing can solve these scenarios ‘directly’. This is a useful conclusion for me, and shows that my original solution is based on traditional thread-based sequential paradigm, and a TPL-based approach can result in quite different (and likely more elegant) solutions.

Here is my improved solution, keeping the original basic idea (together with simple Main for testing):

/// <summary>
/// Perform asynchronous processing in a background thread.
/// Handle thread creation, exit, thread safety.
/// </summary>
public class BgLoop_ConcQueue : IBgLoop, IDisposable
{
 /// <summary>Message handling delegate.</summary>
 private ProcessMessage _processMessageDelegate;
 /// <summary>Queue for the incoming messages.</summary>
 private ConcurrentQueue<Message> _queue = new ConcurrentQueue<Message>();
 /// <summary>Task for background processing.</summary>
 private Task _bgLoopTask;
 /// <summary>Event for awakening the processing loop, used when new entry is added to the queue or exit requested.</summary>
 private AutoResetEvent _queueEvent = new AutoResetEvent(false);
 /// <summary>Flag to signal stop for the parallel sender thread.</summary>
 private bool _stopTaskFlag;
 /// <summary>Flag to store if this class is disposing</summary>
 private bool _disposed;
 /// <summary>
 /// Start the background thread.
 /// </summary>
 public void Start(ProcessMessage processMessageDelegate)
 {
 if (_bgLoopTask != null) return; // prevent starting more than once
 _processMessageDelegate = processMessageDelegate;
 _bgLoopTask = Task.Factory.StartNew(BgLoop, TaskCreationOptions.LongRunning);
 }
 /// <summary>
 /// Adds new event to the Queue
 /// </summary>
 /// <param name="message">New event to add to the queue.</param>
 public void AddMessage(Message message)
 {
 if (_stopTaskFlag)
 {
 // don't add any more if stop has been requested
 return;
 }
 _queue?.Enqueue(message);
 _queueEvent?.Set();
 }
 /// <summary>
 /// Stops the sender parallel task, but send all data before exiting.
 /// </summary>
 private void StopSenderTask()
 {
 // set the stop flag, and signal change
 _stopTaskFlag = true;
 _queueEvent.Set();
 // wait till the loop exits
 _bgLoopTask.Wait();
 Console.WriteLine("Thread exited");
 }
 /// <summary>
 /// Infinite loop for processing the incoming events from the Queue.
 /// </summary>
 private void BgLoop()
 {
 while (true)
 {
 // copy the message queue if it has entry (copy for thread safety)
 List<Message> messageList = new List<Message>();
 while (_queue.Count > 0)
 {
 Message msg;
 if (_queue.TryDequeue(out msg))
 {
 messageList.Add(msg);
 }
 }
 // Process the events
 if (messageList.Count > 0 && _processMessageDelegate != null)
 {
 foreach (Message m in messageList)
 {
 _processMessageDelegate(m);
 }
 }
 // Stop if need to stop and queue has been emptied
 if (_stopTaskFlag)
 {
 if (_queue.Count == 0)
 {
 break;
 }
 // stop requested but not empty yet -- loop without waiting
 continue;
 }
 // Block for a while, until an event happens. The exact timeout value does not really matter
 _queueEvent.WaitOne(10000);
 }
 Console.WriteLine("Thread exiting...");
 }
 public void Dispose()
 {
 Dispose(true);
 GC.SuppressFinalize(this);
 }
 protected virtual void Dispose(bool disposing)
 {
 if (_disposed)
 {
 return;
 }
 if (disposing)
 {
 StopSenderTask();
 _queueEvent.Dispose();
 }
 _disposed = true;
 }
}
public delegate void ProcessMessage(Message msg);
public interface IBgLoop : IDisposable
{
 void Start(ProcessMessage processMessageDelegate);
 void AddMessage(Message message);
}
public class Message
{
 public string _msg;
 public override string ToString() { return _msg; }
}
public class Program
{
 public static void ProcessMessage(Message msg)
 {
 try
 {
 Console.WriteLine($"{DateTime.Now}: Processing message {msg} ...");
 // ... do processing ...
 }
 catch (Exception)
 {
 // ...
 }
 }
 public static void Main(string[] args)
 {
 //using (IBgLoop loop = new BgLoop_Queue())
 using (IBgLoop loop = new BgLoop_ConcQueue())
 //using (IBgLoop loop = new BgLoop_BlockingCollection())
 {
 loop.Start(new ProcessMessage(Program.ProcessMessage));
 for (int i = 0; i < 5; ++i)
 {
 Thread.Sleep(500);
 loop.AddMessage(new Message{ _msg = $"msg{i}" });
 }
 Thread.Sleep(2000);
 }
 }
}

Here is the solution based on BlockingCollection -- I find it more elegant:

/// <summary>Perform asynchronous processing in a background thread.</summary>
public class BgLoop_BlockingCollection : IBgLoop, IDisposable
{
 /// <summary>Message handling delegate.</summary>
 private ProcessMessage _processMessageDelegate;
 /// <summary>Keep the messages in a BlockingCollection.</summary>
 private BlockingCollection<Action> _actionQueue = new BlockingCollection<Action>();
 
 /// <summary>Task for background processing.</summary>
 private Task _bgLoopTask;
 /// <summary>Flag to store if this class is disposing.</summary>
 private bool _disposed;
 public void Start(ProcessMessage processMessageDelegate)
 {
 if (_bgLoopTask != null) return; // prevent starting more than once
 _processMessageDelegate = processMessageDelegate;
 _bgLoopTask = Task.Factory.StartNew(BgLoop, TaskCreationOptions.LongRunning);
 }
 public void AddMessage(Message message)
 {
 _actionQueue.Add(() =>
 {
 if (_processMessageDelegate != null)
 {
 _processMessageDelegate(message);
 }
 });
 }
 /// <summary>
 /// Infinite loop for processing the incoming events from the Queue.
 /// </summary>
 private void BgLoop()
 {
 foreach(var item in _actionQueue.GetConsumingEnumerable())
 {
 item(); // Run the task
 } // Exits when the BlockingCollection is marked for no more actions
 }
 /// <summary>
 /// Stops the sender parallel task, but send all data before exiting.
 /// </summary>
 private void StopSenderTask()
 {
 _actionQueue.CompleteAdding();
 _bgLoopTask.Wait();
 Console.WriteLine("Thread exited");
 }
 public void Dispose()
 {
 Dispose(true);
 GC.SuppressFinalize(this);
 }
 
 protected virtual void Dispose(bool disposing)
 {
 if (_disposed)
 {
 return;
 }
 if (disposing)
 {
 StopSenderTask();
 }
 _disposed = true;
 }
}
lang-cs

AltStyle によって変換されたページ (->オリジナル) /