\$\begingroup\$

I'd like to use a buffered queue for logging via a WebAPI that handles multiple applications. This helper should reduce the blocking that occurs due to logging. I've been experimenting with the producer/consumer pattern, and BufferedProcessor<T> is what I've come up with. Internally it uses a BlockingCollection<T> and lets the user (me) specify the handler action; that is going to be HttpClient.SendAsync (with additional processing), used as fire-and-forget. Although its main purpose for now is logging, I may use it in other scenarios too.

using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

public class BufferedProcessor<T> : Task
{
    private const int Unlimited = -1;
    private static readonly TimeSpan Indefinite = TimeSpan.FromMilliseconds(-1);
    private readonly State _processor;

    public BufferedProcessor(Action<T> process)
        : this(process, Unlimited, CancellationToken.None)
    { }

    public BufferedProcessor(Action<T> process, int bufferSize)
        : this(process, bufferSize, CancellationToken.None)
    { }

    public BufferedProcessor(Action<T> process, CancellationToken cancellationToken)
        : this(process, Unlimited, cancellationToken)
    { }

    public BufferedProcessor(Action<T> process, int bufferSize, CancellationToken cancellationToken)
        : base(Start, new State(process, bufferSize, cancellationToken), cancellationToken, TaskCreationOptions.LongRunning)
    {
        // The State instance passed to the base constructor comes back via AsyncState.
        _processor = (State)AsyncState;
    }

    public bool Enqueue(T obj)
    {
        return Enqueue(obj, Indefinite);
    }

    public bool Enqueue(T obj, TimeSpan timeout)
    {
        return _processor.Enqueue(obj, timeout);
    }

    // Task body: drains the queue until cancellation is requested.
    private static void Start(object state)
    {
        ((State)state).ProcessObjects();
    }

    private class State
    {
        private readonly Action<T> _action;
        // Assigned once in the constructor; a field initializer would only create a throwaway instance.
        private readonly BlockingCollection<T> _queue;
        private readonly CancellationToken _cancellationToken;

        public State(Action<T> action, int bufferSize, CancellationToken cancellationToken)
        {
            _action = action;
            // A non-positive buffer size means an unbounded queue.
            _queue = bufferSize > 0 ? new BlockingCollection<T>(bufferSize) : new BlockingCollection<T>();
            _cancellationToken = cancellationToken;
        }

        public bool Enqueue(T obj, TimeSpan timeout)
        {
            return _queue.TryAdd(obj, timeout);
        }

        public void ProcessObjects()
        {
            foreach (var obj in _queue.GetConsumingEnumerable(_cancellationToken))
            {
                _action(obj);
            }
        }
    }
}

And this is how I've tested it. I created three producers working at different speeds and limited the queue to 3 items. Before leaving the application, I wait for items that may still be in the queue.

void Main()
{
    var cancellationTokenSource = new CancellationTokenSource();
    var cancellationToken = cancellationTokenSource.Token;
    Console.WriteLine($"Main ThreadId = {Thread.CurrentThread.ManagedThreadId}");

    var processor = new BufferedProcessor<int>(
        i => Console.WriteLine($"{i.ToString().PadLeft(3, '_')}\tThreadId = {Thread.CurrentThread.ManagedThreadId}"),
        3,
        cancellationToken);
    processor.Start();

    var producer1 = Task.Run(async () =>
    {
        foreach (var i in Enumerable.Range(0, 10))
        {
            processor.Enqueue(i);
            await Task.Delay(200);
        }
    });

    var producer2 = Task.Run(async () =>
    {
        foreach (var i in Enumerable.Range(10, 10))
        {
            processor.Enqueue(i);
            await Task.Delay(500);
        }
    });

    var producer3 = Task.Run(() =>
    {
        foreach (var i in Enumerable.Range(100, 10))
        {
            processor.Enqueue(i);
        }
    });

    Task.WaitAny(producer1, producer2, producer3);

    // Wait for logs in queue before exiting the application.
    Task.WaitAll(new Task[] { processor }, TimeSpan.FromSeconds(3));
}

Does this design have any obvious weaknesses? I've added the tag because I have very little experience with all the async stuff.

asked Aug 9, 2017 at 19:12
\$\endgroup\$
  • I have changed the implementation so that BufferedProcessor<T> is now derived from Task. Commented Aug 10, 2017 at 8:50

1 Answer

\$\begingroup\$

I don't see any obvious problems. I'm not a fan of extending Task, though; it has enough responsibilities as it is, IMHO. The original implementation, which used aggregation instead of inheritance, was more straightforward and easier to understand. It took me a while to figure out how BufferedProcessor and State interact with each other, which is pretty confusing.
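To make the aggregation point concrete, here is a minimal sketch of a processor that owns a Task instead of deriving from it. This is not the asker's original code, which is not shown here; the names QueueProcessor<T>, Completion, CompleteAdding and QueueProcessorDemo are made up for illustration, and disposal of the queue is left out for brevity.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical composition-based variant: the class holds a worker Task
// as a field rather than inheriting from Task.
public sealed class QueueProcessor<T>
{
    private readonly BlockingCollection<T> _queue;
    private readonly Task _worker;

    public QueueProcessor(Action<T> process, int bufferSize = -1,
        CancellationToken cancellationToken = default(CancellationToken))
    {
        if (process == null) throw new ArgumentNullException(nameof(process));

        // A non-positive buffer size means an unbounded queue.
        _queue = bufferSize > 0 ? new BlockingCollection<T>(bufferSize) : new BlockingCollection<T>();

        // One long-running task drains the queue, so the handler runs sequentially.
        _worker = Task.Factory.StartNew(() =>
        {
            foreach (var item in _queue.GetConsumingEnumerable(cancellationToken))
            {
                process(item);
            }
        }, cancellationToken, TaskCreationOptions.LongRunning, TaskScheduler.Default);
    }

    // Finishes once CompleteAdding was called and the queue has been drained.
    public Task Completion => _worker;

    // Blocks while a bounded queue is full.
    public bool Enqueue(T item) => _queue.TryAdd(item, Timeout.Infinite);

    // Signals that no more items will be enqueued.
    public void CompleteAdding() => _queue.CompleteAdding();
}

public static class QueueProcessorDemo
{
    public static List<int> Run()
    {
        var seen = new List<int>();
        var processor = new QueueProcessor<int>(seen.Add, bufferSize: 3);

        for (var i = 0; i < 10; i++)
        {
            processor.Enqueue(i);
        }

        processor.CompleteAdding();
        processor.Completion.Wait(); // returns after all ten items were handled
        return seen;
    }
}
```

Calling QueueProcessorDemo.Run() should return the numbers 0 to 9 in order, since a single consumer drains a FIFO queue. Exposing a Completion task plus CompleteAdding also gives the caller a way to wait until the queue is empty rather than waiting for a fixed timeout.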

P.S. This type of consumer already exists in Dataflow:

var processor = new ActionBlock<int>(n => Console.WriteLine(n),
    new ExecutionDataflowBlockOptions
    {
        BoundedCapacity = 3,
        CancellationToken = cancellationToken
    });
processor.Post(1);
processor.Post(2);

You can get it from NuGet (the System.Threading.Tasks.Dataflow package), unless you are reinventing this on purpose.
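For the "wait for logs in the queue before exiting" part of the question, a rough sketch with the same block might look like this. ActionBlockDemo, RunAsync and the seen list are names invented for the example; the only library calls used are SendAsync, Complete and Completion.

```csharp
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class ActionBlockDemo
{
    public static async Task<List<int>> RunAsync()
    {
        var seen = new List<int>();
        var processor = new ActionBlock<int>(n => seen.Add(n),
            new ExecutionDataflowBlockOptions { BoundedCapacity = 3 });

        for (var i = 0; i < 10; i++)
        {
            // SendAsync waits for free capacity; Post would return false
            // and drop the item while the bounded block is full.
            await processor.SendAsync(i);
        }

        processor.Complete();        // no further items are accepted
        await processor.Completion;  // completes after buffered items are processed
        return seen;
    }
}
```

With a bounded block, the choice between Post and SendAsync matters: Post never waits, so a fast producer can lose items when the buffer is full, whereas SendAsync applies back-pressure.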

answered Aug 10, 2017 at 11:22
\$\endgroup\$
  • Oh, I didn't want to reinvent it :) Next move: reading about the dataflow components. Commented Aug 10, 2017 at 11:46
  • @t3chb0t, yeah, for some reason this library is not well known, even though it easily solves all typical data-processing tasks. Probably because it is not included in the TPL and ships as a separate package. Commented Aug 10, 2017 at 11:52
  • I noticed an important difference between my implementation and the ActionBlock: the latter calls the delegate on new threads, whereas mine used the same thread for each call. But as I've said, I wasn't quite sure what I was doing ;-) I think it's not widely known because the naming convention in the dataflow package is very unusual. Everything is called a block etc., so it rarely pops up in search results. It could have been better, I guess. Commented Aug 13, 2017 at 19:05
  • I take back what I said about the unusual naming... I'm reading through the docs and this is pretty cool. I now get why they are called blocks ;-) Commented Aug 13, 2017 at 20:51
  • @t3chb0t ExecutionDataflowBlockOptions allows you to set the degree of parallelism. If you set it to 1, actions will still execute on the thread pool (probably), but they should no longer run in parallel. Commented Aug 13, 2017 at 21:45
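The last comment can be checked with a small experiment. The sketch below counts how many delegate invocations overlap for a given MaxDegreeOfParallelism; ParallelismDemo and MaxObservedConcurrencyAsync are hypothetical names, and the 20 ms sleep only simulates work. As far as I know, 1 is also the documented default for this option, so seeing different thread IDs does not by itself mean the calls run in parallel.

```csharp
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class ParallelismDemo
{
    // Returns the highest number of delegate invocations observed running at the same time.
    public static async Task<int> MaxObservedConcurrencyAsync(int degreeOfParallelism)
    {
        var gate = new object();
        var running = 0;
        var maxRunning = 0;

        var block = new ActionBlock<int>(_ =>
        {
            lock (gate)
            {
                running++;
                if (running > maxRunning) maxRunning = running;
            }

            Thread.Sleep(20); // simulate work so overlapping calls would be visible

            lock (gate)
            {
                running--;
            }
        },
        new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = degreeOfParallelism });

        for (var i = 0; i < 8; i++)
        {
            block.Post(i); // the block is unbounded here, so Post accepts every item
        }

        block.Complete();
        await block.Completion;
        return maxRunning;
    }
}
```

With a degree of 1 the method should return 1, because the block never starts the next invocation before the previous one has finished. Larger degrees may report overlap, depending on how the thread pool schedules the work.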
