I have a service that receives many requests, and I am trying to optimize it by aggregating requests into batches and then processing each batch, which is much quicker than processing the requests one by one. That worked better than expected, so I decided to try to make this batch processor generic.
Here is the code for the BatchCollection class, which exposes two methods: AddItem, which is fire-and-forget, and ProcessAsync, which adds an item to the batch and can be awaited until that batch has been processed.
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public delegate void BatchEventHandler<TItem>(BatchCollection<TItem> sender, EventArgs args);

public abstract class BatchCollection<TItem> : IDisposable
{
    public event BatchEventHandler<TItem> BatchEvent;

    private IList<TItem> items;
    private readonly int batchItemCount;
    private readonly object syncObj = new object();
    private bool disposed = false;
    private SemaphoreSlim requestSemaphore;

    public int Count
    {
        get { return items.Count; }
    }

    public BatchCollection(int batchItemCount)
    {
        if (batchItemCount <= 0)
        {
            throw new ArgumentOutOfRangeException("batchItemCount", batchItemCount, "batchItemCount must be greater than 0.");
        }

        this.items = new List<TItem>();
        this.batchItemCount = batchItemCount;
        requestSemaphore = new SemaphoreSlim(0);
    }

    public void AddItem(TItem item)
    {
        int itemsCount;
        lock (syncObj)
        {
            itemsCount = AddItemInternal(item);
        }
        OnBatchEvent(itemsCount);
    }

    public async Task ProcessAsync(TItem item)
    {
        Task processingTask;
        int itemsCount;
        lock (syncObj)
        {
            itemsCount = AddItemInternal(item);
            processingTask = requestSemaphore.WaitAsync();
        }
        OnBatchEvent(itemsCount);
        await processingTask;
    }

    public void ProcessItems()
    {
        SemaphoreSlim localSemaphore = null;
        List<TItem> toProcess = null;
        lock (syncObj)
        {
            if (items.Count != 0)
            {
                toProcess = new List<TItem>(items);
                // allow new requests to be added while the current ones are being processed
                items.Clear();
                localSemaphore = requestSemaphore;
                requestSemaphore = new SemaphoreSlim(0);
            }
        }

        if (toProcess != null) // if (items.Count != 0)
        {
            ProcessBatch(toProcess);
            localSemaphore.Release(toProcess.Count);
            localSemaphore.Dispose();
        }
    }

    public virtual void Dispose()
    {
        // disallow adding of new requests and process all pending requests
        disposed = true;
        ProcessItems();
        requestSemaphore.Dispose();
        BatchEvent = null; // unsubscribe everyone so we don't have memory leaks
    }

    public abstract void ProcessBatch(IList<TItem> items);

    private int AddItemInternal(TItem item)
    {
        // this should always be called from inside lock (syncObj)
        if (disposed)
        {
            throw new ObjectDisposedException(GetType().FullName);
        }
        items.Add(item);
        return items.Count;
    }

    private void OnBatchEvent(int itemCount)
    {
        if (itemCount != batchItemCount)
        {
            return;
        }

        var handlers = BatchEvent; // copy the delegate for thread safety
        if (handlers != null)
        {
            handlers(this, EventArgs.Empty); // TODO: dedicated event arguments
        }
    }
}
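To give an idea of how this is meant to be used, here is a minimal sketch of a concrete subclass; the LoggingBatchCollection name and the Console output are illustrative stand-ins, not my actual processing logic:

public class LoggingBatchCollection : BatchCollection<string>
{
    public LoggingBatchCollection(int batchItemCount)
        : base(batchItemCount)
    {
    }

    public override void ProcessBatch(IList<string> items)
    {
        // the whole batch is handled in one operation instead of item by item
        Console.WriteLine($"Processing a batch of {items.Count} items");
    }
}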
And here is the BatchProcessor class, which calls the ProcessItems method on BatchCollection.
using System;
using System.Threading;

public class BatchProcessor<TItem> : IDisposable
{
    protected BatchCollection<TItem> collection;
    private int aggregateDelay;
    private int afterProcessingDelay;
    private Thread processingThread;
    private SemaphoreSlim pendingItemsForProcessingSemaphore;
    private volatile bool disposed = false;

    public BatchProcessor(BatchCollection<TItem> collection, int aggregateDelay, int afterProcessingDelay = 0)
    {
        if (collection == null)
        {
            throw new ArgumentNullException("collection");
        }
        if (aggregateDelay < 0)
        {
            throw new ArgumentOutOfRangeException("aggregateDelay", aggregateDelay, "aggregateDelay must not be negative.");
        }
        if (afterProcessingDelay < 0)
        {
            throw new ArgumentOutOfRangeException("afterProcessingDelay", afterProcessingDelay, "afterProcessingDelay must not be negative.");
        }

        this.collection = collection;
        this.aggregateDelay = aggregateDelay;
        this.afterProcessingDelay = afterProcessingDelay;
        pendingItemsForProcessingSemaphore = new SemaphoreSlim(0, 1);
        this.collection.BatchEvent += OnBatchEvent;
        processingThread = new Thread(ProcessingLoop);
        processingThread.Start();
    }

    public virtual void Dispose()
    {
        disposed = true;
        this.collection.BatchEvent -= OnBatchEvent;
        SafelyReleaseSemaphore(pendingItemsForProcessingSemaphore);
        processingThread.Join();
        pendingItemsForProcessingSemaphore.Dispose();
    }

    private void OnBatchEvent(BatchCollection<TItem> sender, EventArgs args)
    {
        SafelyReleaseSemaphore(pendingItemsForProcessingSemaphore);
    }

    private void SafelyReleaseSemaphore(SemaphoreSlim sem)
    {
        lock (sem)
        {
            if (sem.CurrentCount == 0)
            {
                try
                {
                    sem.Release();
                }
                catch (ObjectDisposedException)
                {
                    // can happen if we unsubscribe from the event (in Dispose) after
                    // BatchCollection has already captured a reference to this subscriber
                    // but before it invokes the event
                }
                catch (SemaphoreFullException)
                {
                    // can happen if Dispose and OnBatchEvent are called at the same time
                }
            }
        }
    }

    private void ProcessingLoop()
    {
        while (!disposed)
        {
            int sleepTime;
            if (!pendingItemsForProcessingSemaphore.Wait(0)) // check if there is work now
            {
                pendingItemsForProcessingSemaphore.Wait();
                sleepTime = aggregateDelay;
            }
            else
            {
                sleepTime = afterProcessingDelay;
            }

            if (disposed)
            {
                break;
            }
            else if (sleepTime > 0) // avoid context switching
            {
                Thread.Sleep(sleepTime);
            }
            collection.ProcessItems();
        }
    }
}
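To show how the two classes fit together, here is a minimal usage sketch (inside an async method, using the illustrative LoggingBatchCollection subclass from above):

using (var collection = new LoggingBatchCollection(batchItemCount: 2))
using (var processor = new BatchProcessor<string>(collection, aggregateDelay: 50))
{
    // fire-and-forget
    collection.AddItem("item 1");

    // the second item completes the batch, so this await finishes
    // once the batch has been processed
    await collection.ProcessAsync("item 2");
}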
Any suggestions are welcome! Also, is there any existing library that does something like this?
1 Answer
We use this approach for group commit, i.e., writing a batch of operations to disk instead of flushing one at a time. Our implementation is a wrapper around BlockingCollection, which handles almost all of the synchronization.
So my suggestion is to consider using BlockingCollection; here is an example of what that could look like:
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

public class Batcher<T> : IDisposable
{
    public const int DefaultMaxBatchSize = 1000;

    private readonly int _maxBatchSize;
    private readonly BlockingCollection<T> _items;
    private readonly Task _batchTask;

    public Batcher(int maxBatchSize = DefaultMaxBatchSize, int? boundedCapacity = null)
    {
        _maxBatchSize = maxBatchSize;
        _items = new BlockingCollection<T>(boundedCapacity ?? int.MaxValue);
        _batchTask = Task.Run((Action)ProcessItems);
    }

    public delegate void BatchHandler(IEnumerable<T> items);

    public event BatchHandler OnBatch;

    public void Add(T item)
    {
        _items.Add(item);
    }

    public void Dispose()
    {
        // no more items can be added; IsCompleted becomes true once the collection drains
        _items.CompleteAdding();
        _batchTask.Wait();
    }

    private void ProcessItems()
    {
        var buffer = new List<T>(_maxBatchSize);
        while (!_items.IsCompleted)
        {
            // block for up to a second waiting for the first item of the next batch
            if (_items.TryTake(out var firstItem, 1000))
            {
                buffer.Add(firstItem);

                // drain whatever is immediately available, up to the batch size
                while (buffer.Count < _maxBatchSize && _items.TryTake(out var item))
                {
                    buffer.Add(item);
                }

                OnBatch?.Invoke(buffer);
                buffer.Clear();
            }
        }
    }
}
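For example, consuming the batches could look like this (the handler body is just an illustration):

var batcher = new Batcher<string>(maxBatchSize: 100);
batcher.OnBatch += items => Console.WriteLine($"Writing {items.Count()} operations"); // Count() requires System.Linq
for (var i = 0; i < 1000; i++)
{
    batcher.Add($"item {i}");
}
batcher.Dispose(); // completes adding and flushes the remaining items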