I'd like to use a buffered queue for logging via a WebAPI that handles multiple applications. This helper should reduce the blocking that occurs due to logging. I've been experimenting with a producer/consumer pattern, and the BufferedProcessor<T> below is what I've come up with. Internally it uses a BlockingCollection<T> and allows the user (me) to specify the handler action - here it's going to be HttpClient.SendAsync (with additional processing) used as fire-and-forget. Although its main purpose is currently logging, I may use it in other scenarios too.
public class BufferedProcessor<T> : Task
{
    private const int Unlimited = -1;

    private static readonly TimeSpan Indefinite = TimeSpan.FromMilliseconds(-1);

    private readonly State _processor;

    public BufferedProcessor(Action<T> process)
        : this(process, Unlimited, CancellationToken.None)
    { }

    public BufferedProcessor(Action<T> process, int bufferSize)
        : this(process, bufferSize, CancellationToken.None)
    { }

    public BufferedProcessor(Action<T> process, CancellationToken cancellationToken)
        : this(process, Unlimited, cancellationToken)
    { }

    public BufferedProcessor(Action<T> process, int bufferSize, CancellationToken cancellationToken)
        : base(Start, new State(process, bufferSize, cancellationToken), cancellationToken, TaskCreationOptions.LongRunning)
    {
        _processor = (State)AsyncState;
    }

    public bool Enqueue(T obj)
    {
        return Enqueue(obj, Indefinite);
    }

    public bool Enqueue(T obj, TimeSpan timeout)
    {
        return _processor.Enqueue(obj, timeout);
    }

    private static void Start(object state)
    {
        ((State)state).ProcessObjects();
    }

    private class State
    {
        private readonly Action<T> _action;
        private readonly BlockingCollection<T> _queue;
        private readonly CancellationToken _cancellationToken;

        public State(Action<T> action, int bufferSize, CancellationToken cancellationToken)
        {
            _action = action;
            _queue = bufferSize > 0 ? new BlockingCollection<T>(bufferSize) : new BlockingCollection<T>();
            _cancellationToken = cancellationToken;
        }

        public bool Enqueue(T obj, TimeSpan timeout)
        {
            return _queue.TryAdd(obj, timeout);
        }

        public void ProcessObjects()
        {
            foreach (var obj in _queue.GetConsumingEnumerable(_cancellationToken))
            {
                _action(obj);
            }
        }
    }
}
And this is how I've tested it. I created three producers running at different speeds and limited the queue to 3 items. Before leaving the application I wait for items that may still be in the queue.
void Main()
{
    var cancellationTokenSource = new CancellationTokenSource();
    var cancellationToken = cancellationTokenSource.Token;

    Console.WriteLine($"Main ThreadId = {Thread.CurrentThread.ManagedThreadId}");

    var processor = new BufferedProcessor<int>(
        i => Console.WriteLine($"{i.ToString().PadLeft(3, '_')}\tThreadId = {Thread.CurrentThread.ManagedThreadId}"),
        3,
        cancellationToken);
    processor.Start();

    var producer1 = Task.Run(async () =>
    {
        foreach (var i in Enumerable.Range(0, 10))
        {
            processor.Enqueue(i);
            await Task.Delay(200);
        }
    });

    var producer2 = Task.Run(async () =>
    {
        foreach (var i in Enumerable.Range(10, 10))
        {
            processor.Enqueue(i);
            await Task.Delay(500);
        }
    });

    var producer3 = Task.Run(() =>
    {
        foreach (var i in Enumerable.Range(100, 10))
        {
            processor.Enqueue(i);
        }
    });

    Task.WaitAny(producer1, producer2, producer3);

    // Wait for logs in queue before exiting the application.
    Task.WaitAll(new Task[] { processor }, TimeSpan.FromSeconds(3));
}
Does this design have any obvious weaknesses? I've added the beginner tag because I have very little experience with all the async stuff.
1 Answer
I don't see any obvious problems. I'm not a fan of extending Task, though - it has enough responsibilities as-is, IMHO. The original implementation, which used aggregation instead of inheritance, was more straightforward and easier to understand. It took me a while to figure out how BufferedProcessor and State interact with each other; it's pretty confusing.
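For what it's worth, the aggregation variant could be sketched roughly like this. This is my own reconstruction, not the original code from the question's first revision; the `Start` signature and the `WaitForDrain` helper are illustrative names I've invented for the sketch:

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

// Sketch: the processor *owns* a long-running Task instead of *being* one.
public class BufferedProcessor<T>
{
    private readonly BlockingCollection<T> _queue;
    private readonly Action<T> _process;
    private Task _consumer;

    public BufferedProcessor(Action<T> process, int bufferSize)
    {
        _process = process;
        _queue = bufferSize > 0
            ? new BlockingCollection<T>(bufferSize)
            : new BlockingCollection<T>();
    }

    public void Start(CancellationToken cancellationToken)
    {
        // LongRunning hints at a dedicated thread, matching the original design.
        _consumer = Task.Factory.StartNew(
            () =>
            {
                foreach (var item in _queue.GetConsumingEnumerable(cancellationToken))
                {
                    _process(item);
                }
            },
            cancellationToken,
            TaskCreationOptions.LongRunning,
            TaskScheduler.Default);
    }

    public bool Enqueue(T item, TimeSpan timeout) => _queue.TryAdd(item, timeout);

    // Stop accepting new items and wait for the queue to drain.
    public bool WaitForDrain(TimeSpan timeout)
    {
        _queue.CompleteAdding();
        return _consumer?.Wait(timeout) ?? true;
    }
}
```

One nice side effect of composition: `CompleteAdding()` gives you a natural, explicit shutdown path, so the consumer ends on its own once the queue is empty instead of relying on a timeout against a never-ending task.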
P.S. This type of consumer already exists in Dataflow:
var processor = new ActionBlock<int>(
    n => Console.WriteLine(n),
    new ExecutionDataflowBlockOptions
    {
        BoundedCapacity = 3,
        CancellationToken = cancellationToken
    });

processor.Post(1);
processor.Post(2);
You can grab it from NuGet, unless you're intentionally reinventing this stuff.
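As a rough sketch of how the shutdown concern from your Main (waiting for queued logs before exiting) maps onto Dataflow - assuming the System.Threading.Tasks.Dataflow package is referenced - `Complete()` stops accepting input and `Completion` finishes once the buffer drains:

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks.Dataflow;

class DrainDemo
{
    static void Main()
    {
        var processed = new ConcurrentQueue<int>();

        // BoundedCapacity = 3 mirrors the queue limit from the question.
        var processor = new ActionBlock<int>(
            n => processed.Enqueue(n),
            new ExecutionDataflowBlockOptions { BoundedCapacity = 3 });

        for (var i = 0; i < 10; i++)
        {
            // SendAsync waits for free buffer space, unlike Post,
            // which returns false when the bounded buffer is full.
            processor.SendAsync(i).Wait();
        }

        // Signal no more input, then wait until buffered items are handled.
        processor.Complete();
        processor.Completion.Wait();

        Console.WriteLine($"processed {processed.Count} items"); // prints "processed 10 items"
    }
}
```

The `Post` vs `SendAsync` distinction matters with a bounded block: `Post` drops the item (returns `false`) when the buffer is full, which is probably not what you want for logs.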
- oh, I didn't want to reinvent it :) next move: reading about the Dataflow components. (t3chb0t, Aug 10, 2017 at 11:46)
- @t3chb0t, yeah, for some reason this library is not well known, even though it easily solves all typical data-processing tasks. Probably because it is not included in the TPL and ships as a separate package. (Nikita B, Aug 10, 2017 at 11:52)
- I noticed an important difference between my implementation and using the ActionBlock: the latter calls the delegate on new threads, whereas mine was using the same thread for each call - but as I've said, I wasn't quite sure what I was doing ;-) I think it's not widely known because the naming convention in the Dataflow package is very unusual. Everything is called a block, etc., so it rarely pops up in search results. It could have been better, I guess. (t3chb0t, Aug 13, 2017 at 19:05)
- I'm taking it back about the unusual naming... I'm reading through the docs and this is pretty cool; I now get why they are called blocks ;-) (t3chb0t, Aug 13, 2017 at 20:51)
- @t3chb0t ExecutionDataflowBlockOptions allows you to set the degree of parallelism. If you set it to 1, actions will still execute on the thread pool (probably), but they should no longer run in parallel. (Nikita B, Aug 13, 2017 at 21:45)
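The point about the degree of parallelism can be checked directly. A small sketch (again assuming the System.Threading.Tasks.Dataflow package; the counter helper is my own illustration): with `MaxDegreeOfParallelism = 1`, which is the default, handler invocations never overlap even though they run on pool threads:

```csharp
using System;
using System.Threading;
using System.Threading.Tasks.Dataflow;

class SequentialDemo
{
    static int running;
    static int maxObserved;

    static void Main()
    {
        // MaxDegreeOfParallelism = 1 (the default) serializes the handler,
        // even though each call may land on a different pool thread.
        var block = new ActionBlock<int>(
            n =>
            {
                var now = Interlocked.Increment(ref running);
                InterlockedMax(ref maxObserved, now);
                Thread.Sleep(10); // simulate work
                Interlocked.Decrement(ref running);
            },
            new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1 });

        for (var i = 0; i < 20; i++)
            block.Post(i);

        block.Complete();
        block.Completion.Wait();

        Console.WriteLine($"max concurrent handlers = {maxObserved}"); // prints 1
    }

    // Lock-free "store value if larger than current" helper.
    static void InterlockedMax(ref int location, int value)
    {
        int current;
        while (value > (current = Volatile.Read(ref location)) &&
               Interlocked.CompareExchange(ref location, value, current) != current)
        {
        }
    }
}
```

Raising `MaxDegreeOfParallelism` above 1 would let `maxObserved` climb accordingly, which is the behavioral difference from the single-consumer-thread design in the question.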