/Self answer with updated code/
/Self answer with updated code/
- I received several improvement suggestions that are relatively minor -- I have incorporated several of them, and reproduce my updated solution below.
- I received a suggestion based on BlockingCollection, I reproduce this solution as well.
- I have also received a suggestion based on Tasks and continuation, I think this is also valuable.
- I received some comments questioning the need for such an abstraction, as using threadpool processing can solve these scenarios ‘directly’. This is a useful conclusion for me, and shows that my original solution is based on traditional thread-based sequential paradigm, and a TPL-based approach can result in quite different (and likely more elegant) solutions. I also show a very simple solution inspired by this, which sovles the orginal problem too.
Here is my improved original solution, keeping the original basic idea (together with simple Main for testing):
/// <summary>
/// Perform asynchronous processing in a background thread.
/// Handle thread creation, exit, thread safety.
/// </summary>
public class BgLoop_ConcQueue : IBgLoop, IDisposable
{
/// <summary>Message handling delegate.</summary>
private ProcessMessage _processMessageDelegate;
/// <summary>Queue for the incoming messages.</summary>
private ConcurrentQueue<Message> _queue = new ConcurrentQueue<Message>();
/// <summary>Task for background processing.</summary>
private Task _bgLoopTask;
/// <summary>Event for awakening the processing loop, used when new entry is added to the queue or exit requested.</summary>
private AutoResetEvent _queueEvent = new AutoResetEvent(false);
/// <summary>Flag to signal stop for the parallel sender thread.</summary>
private bool _stopTaskFlag;
/// <summary>Flag to store if this class is disposing</summary>
private bool _disposed;
/// <summary>
/// Start the background thread.
/// </summary>
public void Start(ProcessMessage processMessageDelegate)
{
if (_bgLoopTask != null) return; // prevent starting more than once
_processMessageDelegate = processMessageDelegate;
_bgLoopTask = Task.Factory.StartNew(BgLoop, TaskCreationOptions.LongRunning);
}
/// <summary>
/// Adds new event to the Queue
/// </summary>
/// <param name="message">New event to add to the queue.</param>
public void AddMessage(Message message)
{
if (_stopTaskFlag)
{
// don't add any more if stop has been requested
return;
}
_queue?.Enqueue(message);
_queueEvent?.Set();
}
/// <summary>
/// Stops the sender parallel task, but send all data before exiting.
/// </summary>
private void StopSenderTask()
{
// set the stop flag, and signal change
_stopTaskFlag = true;
_queueEvent.Set();
// wait till the loop exits
_bgLoopTask.Wait();
Console.WriteLine("Thread exited");
}
/// <summary>
/// Infinite loop for processing the incoming events from the Queue.
/// </summary>
private void BgLoop()
{
while (true)
{
// copy the message queue if it has entry (copy for thread safety)
List<Message> messageList = new List<Message>();
while (_queue.Count > 0)
{
Message msg;
if (_queue.TryDequeue(out msg))
{
messageList.Add(msg);
}
}
// Process the events
if (messageList.Count > 0 && _processMessageDelegate != null)
{
foreach (Message m in messageList)
{
_processMessageDelegate(m);
}
}
// Stop if need to stop and queue has been emptied
if (_stopTaskFlag)
{
if (_queue.Count == 0)
{
break;
}
// stop requested but not empty yet -- loop without waiting
continue;
}
// Block for a while, until an event happens. The exact timeout value does not really matter
_queueEvent.WaitOne(10000);
}
Console.WriteLine("Thread exiting...");
}
public void Dispose()
{
Dispose(true);
GC.SuppressFinalize(this);
}
protected virtual void Dispose(bool disposing)
{
if (_disposed)
{
return;
}
if (disposing)
{
StopSenderTask();
_queueEvent.Dispose();
}
_disposed = true;
}
}
public delegate void ProcessMessage(Message msg);
public interface IBgLoop : IDisposable
{
void Start(ProcessMessage processMessageDelegate);
void AddMessage(Message message);
}
public class Message
{
public string _msg;
public override string ToString() { return _msg; }
}
public class Program
{
public static void ProcessMessage(Message msg)
{
try
{
Console.WriteLine($"{DateTime.Now}: Processing message {msg} ...");
// ... do processing ...
}
catch (Exception)
{
// ...
}
}
public static void Main(string[] args)
{
//using (IBgLoop loop = new BgLoop_Queue())
using (IBgLoop loop = new BgLoop_ConcQueue())
//using (IBgLoop loop = new BgLoop_BlockingCollection())
{
loop.Start(new ProcessMessage(Program.ProcessMessage));
for (int i = 0; i < 5; ++i)
{
Thread.Sleep(500);
loop.AddMessage(new Message{ _msg = $"msg{i}" });
}
Thread.Sleep(2000);
}
}
}
/// <summary>Perform asynchronous processing in a background thread.</summary>
public class BgLoop_BlockingCollection : IBgLoop, IDisposable
{
/// <summary>Message handling delegate.</summary>
private ProcessMessage _processMessageDelegate;
/// <summary>Keep the messages in a BlockingCollection.</summary>
private BlockingCollection<Action> _actionQueue = new BlockingCollection<Action>();
/// <summary>Task for background processing.</summary>
private Task _bgLoopTask;
/// <summary>Flag to store if this class is disposing.</summary>
private bool _disposed;
public void Start(ProcessMessage processMessageDelegate)
{
if (_bgLoopTask != null) return; // prevent starting more than once
_processMessageDelegate = processMessageDelegate;
_bgLoopTask = Task.Factory.StartNew(BgLoop, TaskCreationOptions.LongRunning);
}
public void AddMessage(Message message)
{
_actionQueue.Add(() =>
{
if (_processMessageDelegate != null)
{
_processMessageDelegate(message);
}
});
}
/// <summary>
/// Infinite loop for processing the incoming events from the Queue.
/// </summary>
private void BgLoop()
{
foreach(var item in _actionQueue.GetConsumingEnumerable())
{
item(); // Run the task
} // Exits when the BlockingCollection is marked for no more actions
}
/// <summary>
/// Stops the sender parallel task, but send all data before exiting.
/// </summary>
private void StopSenderTask()
{
_actionQueue.CompleteAdding();
_bgLoopTask.Wait();
Console.WriteLine("Thread exited");
}
public void Dispose()
{
Dispose(true);
GC.SuppressFinalize(this);
}
protected virtual void Dispose(bool disposing)
{
if (_disposed)
{
return;
}
if (disposing)
{
StopSenderTask();
}
_disposed = true;
}
}
Here is the simple solution doing no background loop at all:
public class BgLoop_Simple : IBgLoop, IDisposable
{
/// <summary>Message handling delegate.</summary>
private ProcessMessage _processMessageDelegate;
public void Start(ProcessMessage processMessageDelegate)
{
if (_processMessageDelegate != null) return; // prevent starting more than once
_processMessageDelegate = processMessageDelegate;
// nothing else to do
}
public void AddMessage(Message message)
{
// call directly in a threadpool thread
Task.Run(() => _processMessageDelegate?.Invoke(message));
}
public void Dispose()
{
Dispose(true);
GC.SuppressFinalize(this);
}
protected virtual void Dispose(bool disposing) {}
}
And finally here is a simple Main and surroundings, for testing:
public delegate void ProcessMessage(Message msg);
public interface IBgLoop : IDisposable
{
void Start(ProcessMessage processMessageDelegate);
void AddMessage(Message message);
}
public class Message
{
public string _msg;
public override string ToString() { return _msg; }
}
public class Program
{
public static void ProcessMessage(Message msg)
{
try
{
Console.WriteLine($"{DateTime.Now}: Processing message {msg} ...");
// ... do processing ...
}
catch (Exception)
{
// ...
}
}
public static void Main(string[] args)
{
//using (IBgLoop loop = new BgLoop_Queue())
using (IBgLoop loop = new BgLoop_ConcQueue())
//using (IBgLoop loop = new BgLoop_BlockingCollection())
//using (IBgLoop loop = new BgLoop_Simple())
{
loop.Start(new ProcessMessage(Program.ProcessMessage));
int cnt = 0;
// add some messages slowly
for (int i = 0; i < 5; ++i)
{
Thread.Sleep(500);
loop.AddMessage(new Message{ _msg = $"msg{++cnt}" });
}
// add some messages quickly
for (int i = 0; i < 10; ++i)
{
loop.AddMessage(new Message{ _msg = $"msg{++cnt}" });
}
Thread.Sleep(2000);
}
}
}
- I received several improvement suggestions that are relatively minor -- I have incorporated several of them.
- I received a suggestion based on BlockingCollection, I reproduce this solution as well.
- I have also received a suggestion based on Tasks and continuation, I think this is also valuable.
- I received some comments questioning the need for such an abstraction, as using threadpool processing can solve these scenarios ‘directly’. This is a useful conclusion for me, and shows that my original solution is based on traditional thread-based sequential paradigm, and a TPL-based approach can result in quite different (and likely more elegant) solutions.
Here is my improved solution, keeping the original basic idea (together with simple Main for testing):
/// <summary>
/// Perform asynchronous processing in a background thread.
/// Handle thread creation, exit, thread safety.
/// </summary>
public class BgLoop_ConcQueue : IBgLoop, IDisposable
{
/// <summary>Message handling delegate.</summary>
private ProcessMessage _processMessageDelegate;
/// <summary>Queue for the incoming messages.</summary>
private ConcurrentQueue<Message> _queue = new ConcurrentQueue<Message>();
/// <summary>Task for background processing.</summary>
private Task _bgLoopTask;
/// <summary>Event for awakening the processing loop, used when new entry is added to the queue or exit requested.</summary>
private AutoResetEvent _queueEvent = new AutoResetEvent(false);
/// <summary>Flag to signal stop for the parallel sender thread.</summary>
private bool _stopTaskFlag;
/// <summary>Flag to store if this class is disposing</summary>
private bool _disposed;
/// <summary>
/// Start the background thread.
/// </summary>
public void Start(ProcessMessage processMessageDelegate)
{
if (_bgLoopTask != null) return; // prevent starting more than once
_processMessageDelegate = processMessageDelegate;
_bgLoopTask = Task.Factory.StartNew(BgLoop, TaskCreationOptions.LongRunning);
}
/// <summary>
/// Adds new event to the Queue
/// </summary>
/// <param name="message">New event to add to the queue.</param>
public void AddMessage(Message message)
{
if (_stopTaskFlag)
{
// don't add any more if stop has been requested
return;
}
_queue?.Enqueue(message);
_queueEvent?.Set();
}
/// <summary>
/// Stops the sender parallel task, but send all data before exiting.
/// </summary>
private void StopSenderTask()
{
// set the stop flag, and signal change
_stopTaskFlag = true;
_queueEvent.Set();
// wait till the loop exits
_bgLoopTask.Wait();
Console.WriteLine("Thread exited");
}
/// <summary>
/// Infinite loop for processing the incoming events from the Queue.
/// </summary>
private void BgLoop()
{
while (true)
{
// copy the message queue if it has entry (copy for thread safety)
List<Message> messageList = new List<Message>();
while (_queue.Count > 0)
{
Message msg;
if (_queue.TryDequeue(out msg))
{
messageList.Add(msg);
}
}
// Process the events
if (messageList.Count > 0 && _processMessageDelegate != null)
{
foreach (Message m in messageList)
{
_processMessageDelegate(m);
}
}
// Stop if need to stop and queue has been emptied
if (_stopTaskFlag)
{
if (_queue.Count == 0)
{
break;
}
// stop requested but not empty yet -- loop without waiting
continue;
}
// Block for a while, until an event happens. The exact timeout value does not really matter
_queueEvent.WaitOne(10000);
}
Console.WriteLine("Thread exiting...");
}
public void Dispose()
{
Dispose(true);
GC.SuppressFinalize(this);
}
protected virtual void Dispose(bool disposing)
{
if (_disposed)
{
return;
}
if (disposing)
{
StopSenderTask();
_queueEvent.Dispose();
}
_disposed = true;
}
}
public delegate void ProcessMessage(Message msg);
public interface IBgLoop : IDisposable
{
void Start(ProcessMessage processMessageDelegate);
void AddMessage(Message message);
}
public class Message
{
public string _msg;
public override string ToString() { return _msg; }
}
public class Program
{
public static void ProcessMessage(Message msg)
{
try
{
Console.WriteLine($"{DateTime.Now}: Processing message {msg} ...");
// ... do processing ...
}
catch (Exception)
{
// ...
}
}
public static void Main(string[] args)
{
//using (IBgLoop loop = new BgLoop_Queue())
using (IBgLoop loop = new BgLoop_ConcQueue())
//using (IBgLoop loop = new BgLoop_BlockingCollection())
{
loop.Start(new ProcessMessage(Program.ProcessMessage));
for (int i = 0; i < 5; ++i)
{
Thread.Sleep(500);
loop.AddMessage(new Message{ _msg = $"msg{i}" });
}
Thread.Sleep(2000);
}
}
}
/// <summary>Perform asynchronous processing in a background thread.</summary>
public class BgLoop_BlockingCollection : IBgLoop, IDisposable
{
/// <summary>Message handling delegate.</summary>
private ProcessMessage _processMessageDelegate;
/// <summary>Keep the messages in a BlockingCollection.</summary>
private BlockingCollection<Action> _actionQueue = new BlockingCollection<Action>();
/// <summary>Task for background processing.</summary>
private Task _bgLoopTask;
/// <summary>Flag to store if this class is disposing.</summary>
private bool _disposed;
public void Start(ProcessMessage processMessageDelegate)
{
if (_bgLoopTask != null) return; // prevent starting more than once
_processMessageDelegate = processMessageDelegate;
_bgLoopTask = Task.Factory.StartNew(BgLoop, TaskCreationOptions.LongRunning);
}
public void AddMessage(Message message)
{
_actionQueue.Add(() =>
{
if (_processMessageDelegate != null)
{
_processMessageDelegate(message);
}
});
}
/// <summary>
/// Infinite loop for processing the incoming events from the Queue.
/// </summary>
private void BgLoop()
{
foreach(var item in _actionQueue.GetConsumingEnumerable())
{
item(); // Run the task
} // Exits when the BlockingCollection is marked for no more actions
}
/// <summary>
/// Stops the sender parallel task, but send all data before exiting.
/// </summary>
private void StopSenderTask()
{
_actionQueue.CompleteAdding();
_bgLoopTask.Wait();
Console.WriteLine("Thread exited");
}
public void Dispose()
{
Dispose(true);
GC.SuppressFinalize(this);
}
protected virtual void Dispose(bool disposing)
{
if (_disposed)
{
return;
}
if (disposing)
{
StopSenderTask();
}
_disposed = true;
}
}
- I received several improvement suggestions that are relatively minor -- I have incorporated several of them, and reproduce my updated solution below.
- I received a suggestion based on BlockingCollection, I reproduce this solution as well.
- I have also received a suggestion based on Tasks and continuation, I think this is also valuable.
- I received some comments questioning the need for such an abstraction, as using threadpool processing can solve these scenarios ‘directly’. This is a useful conclusion for me, and shows that my original solution is based on traditional thread-based sequential paradigm, and a TPL-based approach can result in quite different (and likely more elegant) solutions. I also show a very simple solution inspired by this, which sovles the orginal problem too.
Here is my improved original solution, keeping the original basic idea:
/// <summary>
/// Perform asynchronous processing in a background thread.
/// Handle thread creation, exit, thread safety.
/// </summary>
public class BgLoop_ConcQueue : IBgLoop, IDisposable
{
/// <summary>Message handling delegate.</summary>
private ProcessMessage _processMessageDelegate;
/// <summary>Queue for the incoming messages.</summary>
private ConcurrentQueue<Message> _queue = new ConcurrentQueue<Message>();
/// <summary>Task for background processing.</summary>
private Task _bgLoopTask;
/// <summary>Event for awakening the processing loop, used when new entry is added to the queue or exit requested.</summary>
private AutoResetEvent _queueEvent = new AutoResetEvent(false);
/// <summary>Flag to signal stop for the parallel sender thread.</summary>
private bool _stopTaskFlag;
/// <summary>Flag to store if this class is disposing</summary>
private bool _disposed;
/// <summary>
/// Start the background thread.
/// </summary>
public void Start(ProcessMessage processMessageDelegate)
{
if (_bgLoopTask != null) return; // prevent starting more than once
_processMessageDelegate = processMessageDelegate;
_bgLoopTask = Task.Factory.StartNew(BgLoop, TaskCreationOptions.LongRunning);
}
/// <summary>
/// Adds new event to the Queue
/// </summary>
/// <param name="message">New event to add to the queue.</param>
public void AddMessage(Message message)
{
if (_stopTaskFlag)
{
// don't add any more if stop has been requested
return;
}
_queue?.Enqueue(message);
_queueEvent?.Set();
}
/// <summary>
/// Stops the sender parallel task, but send all data before exiting.
/// </summary>
private void StopSenderTask()
{
// set the stop flag, and signal change
_stopTaskFlag = true;
_queueEvent.Set();
// wait till the loop exits
_bgLoopTask.Wait();
Console.WriteLine("Thread exited");
}
/// <summary>
/// Infinite loop for processing the incoming events from the Queue.
/// </summary>
private void BgLoop()
{
while (true)
{
// copy the message queue if it has entry (copy for thread safety)
List<Message> messageList = new List<Message>();
while (_queue.Count > 0)
{
Message msg;
if (_queue.TryDequeue(out msg))
{
messageList.Add(msg);
}
}
// Process the events
if (messageList.Count > 0 && _processMessageDelegate != null)
{
foreach (Message m in messageList)
{
_processMessageDelegate(m);
}
}
// Stop if need to stop and queue has been emptied
if (_stopTaskFlag)
{
if (_queue.Count == 0)
{
break;
}
// stop requested but not empty yet -- loop without waiting
continue;
}
// Block for a while, until an event happens. The exact timeout value does not really matter
_queueEvent.WaitOne(10000);
}
Console.WriteLine("Thread exiting...");
}
public void Dispose()
{
Dispose(true);
GC.SuppressFinalize(this);
}
protected virtual void Dispose(bool disposing)
{
if (_disposed)
{
return;
}
if (disposing)
{
StopSenderTask();
_queueEvent.Dispose();
}
_disposed = true;
}
}
/// <summary>Perform asynchronous processing in a background thread.</summary>
public class BgLoop_BlockingCollection : IBgLoop, IDisposable
{
/// <summary>Message handling delegate.</summary>
private ProcessMessage _processMessageDelegate;
/// <summary>Keep the messages in a BlockingCollection.</summary>
private BlockingCollection<Action> _actionQueue = new BlockingCollection<Action>();
/// <summary>Task for background processing.</summary>
private Task _bgLoopTask;
/// <summary>Flag to store if this class is disposing.</summary>
private bool _disposed;
public void Start(ProcessMessage processMessageDelegate)
{
if (_bgLoopTask != null) return; // prevent starting more than once
_processMessageDelegate = processMessageDelegate;
_bgLoopTask = Task.Factory.StartNew(BgLoop, TaskCreationOptions.LongRunning);
}
public void AddMessage(Message message)
{
_actionQueue.Add(() =>
{
if (_processMessageDelegate != null)
{
_processMessageDelegate(message);
}
});
}
/// <summary>
/// Infinite loop for processing the incoming events from the Queue.
/// </summary>
private void BgLoop()
{
foreach(var item in _actionQueue.GetConsumingEnumerable())
{
item(); // Run the task
} // Exits when the BlockingCollection is marked for no more actions
}
/// <summary>
/// Stops the sender parallel task, but send all data before exiting.
/// </summary>
private void StopSenderTask()
{
_actionQueue.CompleteAdding();
_bgLoopTask.Wait();
Console.WriteLine("Thread exited");
}
public void Dispose()
{
Dispose(true);
GC.SuppressFinalize(this);
}
protected virtual void Dispose(bool disposing)
{
if (_disposed)
{
return;
}
if (disposing)
{
StopSenderTask();
}
_disposed = true;
}
}
Here is the simple solution doing no background loop at all:
public class BgLoop_Simple : IBgLoop, IDisposable
{
/// <summary>Message handling delegate.</summary>
private ProcessMessage _processMessageDelegate;
public void Start(ProcessMessage processMessageDelegate)
{
if (_processMessageDelegate != null) return; // prevent starting more than once
_processMessageDelegate = processMessageDelegate;
// nothing else to do
}
public void AddMessage(Message message)
{
// call directly in a threadpool thread
Task.Run(() => _processMessageDelegate?.Invoke(message));
}
public void Dispose()
{
Dispose(true);
GC.SuppressFinalize(this);
}
protected virtual void Dispose(bool disposing) {}
}
And finally here is a simple Main and surroundings, for testing:
public delegate void ProcessMessage(Message msg);
public interface IBgLoop : IDisposable
{
void Start(ProcessMessage processMessageDelegate);
void AddMessage(Message message);
}
public class Message
{
public string _msg;
public override string ToString() { return _msg; }
}
public class Program
{
public static void ProcessMessage(Message msg)
{
try
{
Console.WriteLine($"{DateTime.Now}: Processing message {msg} ...");
// ... do processing ...
}
catch (Exception)
{
// ...
}
}
public static void Main(string[] args)
{
//using (IBgLoop loop = new BgLoop_Queue())
using (IBgLoop loop = new BgLoop_ConcQueue())
//using (IBgLoop loop = new BgLoop_BlockingCollection())
//using (IBgLoop loop = new BgLoop_Simple())
{
loop.Start(new ProcessMessage(Program.ProcessMessage));
int cnt = 0;
// add some messages slowly
for (int i = 0; i < 5; ++i)
{
Thread.Sleep(500);
loop.AddMessage(new Message{ _msg = $"msg{++cnt}" });
}
// add some messages quickly
for (int i = 0; i < 10; ++i)
{
loop.AddMessage(new Message{ _msg = $"msg{++cnt}" });
}
Thread.Sleep(2000);
}
}
}
- I received several improvement suggestions that are relatively minor -- I have incorporated several of them.
- I received a suggestion based on BlockingCollection, I reproduce this solution as well.
- I have also received a suggestion based on Tasks and continuation, I think this is also valuable.
- I received some comments questioning the need for such an abstraction, as using threadpool processing can solve these scenarios ‘directly’. This is a useful conclusion for me, and shows that my original solution is based on traditional thread-based sequential paradigm, and a TPL-based approach can result in quite different (and likely more elegant) solutions.
Here is my improved solution, keeping the original basic idea (together with simple Main for testing):
/// <summary>
/// Perform asynchronous processing in a background thread.
/// Handle thread creation, exit, thread safety.
/// </summary>
public class BgLoop_ConcQueue : IBgLoop, IDisposable
{
/// <summary>Message handling delegate.</summary>
private ProcessMessage _processMessageDelegate;
/// <summary>Queue for the incoming messages.</summary>
private ConcurrentQueue<Message> _queue = new ConcurrentQueue<Message>();
/// <summary>Task for background processing.</summary>
private Task _bgLoopTask;
/// <summary>Event for awakening the processing loop, used when new entry is added to the queue or exit requested.</summary>
private AutoResetEvent _queueEvent = new AutoResetEvent(false);
/// <summary>Flag to signal stop for the parallel sender thread.</summary>
private bool _stopTaskFlag;
/// <summary>Flag to store if this class is disposing</summary>
private bool _disposed;
/// <summary>
/// Start the background thread.
/// </summary>
public void Start(ProcessMessage processMessageDelegate)
{
if (_bgLoopTask != null) return; // prevent starting more than once
_processMessageDelegate = processMessageDelegate;
_bgLoopTask = Task.Factory.StartNew(BgLoop, TaskCreationOptions.LongRunning);
}
/// <summary>
/// Adds new event to the Queue
/// </summary>
/// <param name="message">New event to add to the queue.</param>
public void AddMessage(Message message)
{
if (_stopTaskFlag)
{
// don't add any more if stop has been requested
return;
}
_queue?.Enqueue(message);
_queueEvent?.Set();
}
/// <summary>
/// Stops the sender parallel task, but send all data before exiting.
/// </summary>
private void StopSenderTask()
{
// set the stop flag, and signal change
_stopTaskFlag = true;
_queueEvent.Set();
// wait till the loop exits
_bgLoopTask.Wait();
Console.WriteLine("Thread exited");
}
/// <summary>
/// Infinite loop for processing the incoming events from the Queue.
/// </summary>
private void BgLoop()
{
while (true)
{
// copy the message queue if it has entry (copy for thread safety)
List<Message> messageList = new List<Message>();
while (_queue.Count > 0)
{
Message msg;
if (_queue.TryDequeue(out msg))
{
messageList.Add(msg);
}
}
// Process the events
if (messageList.Count > 0 && _processMessageDelegate != null)
{
foreach (Message m in messageList)
{
_processMessageDelegate(m);
}
}
// Stop if need to stop and queue has been emptied
if (_stopTaskFlag)
{
if (_queue.Count == 0)
{
break;
}
// stop requested but not empty yet -- loop without waiting
continue;
}
// Block for a while, until an event happens. The exact timeout value does not really matter
_queueEvent.WaitOne(10000);
}
Console.WriteLine("Thread exiting...");
}
public void Dispose()
{
Dispose(true);
GC.SuppressFinalize(this);
}
protected virtual void Dispose(bool disposing)
{
if (_disposed)
{
return;
}
if (disposing)
{
StopSenderTask();
_queueEvent.Dispose();
}
_disposed = true;
}
}
public delegate void ProcessMessage(Message msg);
public interface IBgLoop : IDisposable
{
void Start(ProcessMessage processMessageDelegate);
void AddMessage(Message message);
}
public class Message
{
public string _msg;
public override string ToString() { return _msg; }
}
public class Program
{
public static void ProcessMessage(Message msg)
{
try
{
Console.WriteLine($"{DateTime.Now}: Processing message {msg} ...");
// ... do processing ...
}
catch (Exception)
{
// ...
}
}
public static void Main(string[] args)
{
//using (IBgLoop loop = new BgLoop_Queue())
using (IBgLoop loop = new BgLoop_ConcQueue())
//using (IBgLoop loop = new BgLoop_BlockingCollection())
{
loop.Start(new ProcessMessage(Program.ProcessMessage));
for (int i = 0; i < 5; ++i)
{
Thread.Sleep(500);
loop.AddMessage(new Message{ _msg = $"msg{i}" });
}
Thread.Sleep(2000);
}
}
}
Here is the solution based on BlockingCollection -- I find it more elegant:
/// <summary>Perform asynchronous processing in a background thread.</summary>
public class BgLoop_BlockingCollection : IBgLoop, IDisposable
{
/// <summary>Message handling delegate.</summary>
private ProcessMessage _processMessageDelegate;
/// <summary>Keep the messages in a BlockingCollection.</summary>
private BlockingCollection<Action> _actionQueue = new BlockingCollection<Action>();
/// <summary>Task for background processing.</summary>
private Task _bgLoopTask;
/// <summary>Flag to store if this class is disposing.</summary>
private bool _disposed;
public void Start(ProcessMessage processMessageDelegate)
{
if (_bgLoopTask != null) return; // prevent starting more than once
_processMessageDelegate = processMessageDelegate;
_bgLoopTask = Task.Factory.StartNew(BgLoop, TaskCreationOptions.LongRunning);
}
public void AddMessage(Message message)
{
_actionQueue.Add(() =>
{
if (_processMessageDelegate != null)
{
_processMessageDelegate(message);
}
});
}
/// <summary>
/// Infinite loop for processing the incoming events from the Queue.
/// </summary>
private void BgLoop()
{
foreach(var item in _actionQueue.GetConsumingEnumerable())
{
item(); // Run the task
} // Exits when the BlockingCollection is marked for no more actions
}
/// <summary>
/// Stops the sender parallel task, but send all data before exiting.
/// </summary>
private void StopSenderTask()
{
_actionQueue.CompleteAdding();
_bgLoopTask.Wait();
Console.WriteLine("Thread exited");
}
public void Dispose()
{
Dispose(true);
GC.SuppressFinalize(this);
}
protected virtual void Dispose(bool disposing)
{
if (_disposed)
{
return;
}
if (disposing)
{
StopSenderTask();
}
_disposed = true;
}
}