
I have a service that receives many requests, and I am trying to optimize it by aggregating requests into batches and then processing each batch, which is much quicker than processing requests one by one. That worked better than expected, so I decided to try to make this batch processor generic.

Here is the code for the BatchCollection class. It exposes two methods: AddItem, which is fire-and-forget, and ProcessAsync, which adds an item to the batch and returns a task that can be awaited until the batch has been processed. A short usage sketch follows the class.

using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public delegate void BatchEventHandler<TItem>(BatchCollection<TItem> sender, EventArgs args);

public abstract class BatchCollection<TItem> : IDisposable
{
    public event BatchEventHandler<TItem> BatchEvent;

    private IList<TItem> items;
    private readonly int batchItemCount;
    private readonly object syncObj = new object();
    private bool disposed = false;
    private SemaphoreSlim requestSemaphore;

    public int Count
    {
        get { return items.Count; }
    }

    public BatchCollection(int batchItemCount)
    {
        if (batchItemCount <= 0)
        {
            throw new ArgumentOutOfRangeException("batchItemCount", batchItemCount, "batchItemCount must be greater than 0.");
        }

        this.items = new List<TItem>();
        this.batchItemCount = batchItemCount;
        requestSemaphore = new SemaphoreSlim(0);
    }

    public void AddItem(TItem item)
    {
        int itemsCount;
        lock (syncObj)
        {
            itemsCount = AddItemInternal(item);
        }
        OnBatchEvent(itemsCount);
    }

    public async Task ProcessAsync(TItem item)
    {
        Task processingTask;
        int itemsCount;
        lock (syncObj)
        {
            itemsCount = AddItemInternal(item);
            processingTask = requestSemaphore.WaitAsync();
        }
        OnBatchEvent(itemsCount);
        await processingTask;
    }

    public void ProcessItems()
    {
        SemaphoreSlim localSemaphore = null;
        List<TItem> toProcess = null;
        lock (syncObj)
        {
            if (items.Count != 0)
            {
                toProcess = new List<TItem>(items);
                // allow new requests to be added while the current ones are being processed
                items.Clear();
                localSemaphore = requestSemaphore;
                requestSemaphore = new SemaphoreSlim(0);
            }
        }
        if (toProcess != null) // if (items.Count != 0)
        {
            ProcessBatch(toProcess);
            localSemaphore.Release(toProcess.Count);
            localSemaphore.Dispose();
        }
    }

    public virtual void Dispose()
    {
        // disallow adding of new requests and process all pending requests
        disposed = true;
        ProcessItems();
        requestSemaphore.Dispose();
        BatchEvent = null; // unsubscribe everyone so we don't have memory leaks
    }

    public abstract void ProcessBatch(IList<TItem> items);

    private int AddItemInternal(TItem item)
    {
        // this should always be called from inside lock (syncObj)
        if (disposed)
        {
            throw new ObjectDisposedException(GetType().FullName);
        }
        items.Add(item);
        return items.Count;
    }

    private void OnBatchEvent(int itemCount)
    {
        if (itemCount != batchItemCount)
        {
            return;
        }
        var handlers = BatchEvent; // copy the delegate for thread safety
        if (handlers != null)
        {
            handlers(this, EventArgs.Empty); // TODO: event arguments
        }
    }
}
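
For context, a minimal consumer could look like this (SaveBatchCollection and SaveToDatabase are illustrative placeholders, not part of my actual service):

public class SaveBatchCollection : BatchCollection<string>
{
    public SaveBatchCollection(int batchItemCount) : base(batchItemCount) { }

    public override void ProcessBatch(IList<string> items)
    {
        // a single round-trip for the whole batch instead of one per item
        SaveToDatabase(items);
    }

    private void SaveToDatabase(IList<string> items) { /* ... */ }
}

Callers either call collection.AddItem(item) and move on, or await collection.ProcessAsync(item), which completes once the batch containing the item has been processed.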

And here is BatchProcessor, which calls the ProcessItems method on BatchCollection. A sketch of wiring the two classes together follows.

using System;
using System.Threading;

public class BatchProcessor<TItem> : IDisposable
{
    protected BatchCollection<TItem> collection;
    private int aggregateDelay;
    private int afterProcessingDelay;
    private Thread processingThread;
    private SemaphoreSlim pendingItemsForProcessingSemaphore;
    private volatile bool disposed = false;

    public BatchProcessor(BatchCollection<TItem> collection, int aggregateDelay, int afterProcessingDelay = 0)
    {
        if (aggregateDelay < 0)
        {
            throw new ArgumentOutOfRangeException("aggregateDelay", aggregateDelay, "aggregateDelay must not be negative.");
        }
        if (afterProcessingDelay < 0)
        {
            throw new ArgumentOutOfRangeException("afterProcessingDelay", afterProcessingDelay, "afterProcessingDelay must not be negative.");
        }
        if (collection == null)
        {
            throw new ArgumentNullException("collection");
        }
        this.collection = collection;
        this.aggregateDelay = aggregateDelay;
        this.afterProcessingDelay = afterProcessingDelay;
        pendingItemsForProcessingSemaphore = new SemaphoreSlim(0, 1);
        this.collection.BatchEvent += OnBatchEvent;
        processingThread = new Thread(ProcessingLoop);
        processingThread.Start();
    }

    public virtual void Dispose()
    {
        disposed = true;
        this.collection.BatchEvent -= OnBatchEvent;
        SafelyReleaseSemaphore(pendingItemsForProcessingSemaphore);
        processingThread.Join();
        pendingItemsForProcessingSemaphore.Dispose();
    }

    private void OnBatchEvent(BatchCollection<TItem> sender, EventArgs args)
    {
        SafelyReleaseSemaphore(pendingItemsForProcessingSemaphore);
    }

    private void SafelyReleaseSemaphore(SemaphoreSlim sem)
    {
        lock (sem)
        {
            if (sem.CurrentCount == 0)
            {
                try
                {
                    sem.Release();
                }
                catch (ObjectDisposedException)
                {
                    // could potentially happen if we unsubscribe from the event (in Dispose)
                    // but BatchCollection had already captured a reference to this subscriber
                    // before invoking the event
                }
                catch (SemaphoreFullException)
                {
                    // could potentially happen if Dispose and OnBatchEvent get called at the same time
                }
            }
        }
    }

    private void ProcessingLoop()
    {
        while (!disposed)
        {
            int sleepTime;
            if (!pendingItemsForProcessingSemaphore.Wait(0)) // check if there is work now
            {
                pendingItemsForProcessingSemaphore.Wait();
                sleepTime = aggregateDelay;
            }
            else
            {
                sleepTime = afterProcessingDelay;
            }
            if (disposed)
            {
                break;
            }
            else if (sleepTime > 0) // avoid context switching
            {
                Thread.Sleep(sleepTime);
            }
            collection.ProcessItems();
        }
    }
}
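
To show how the two classes fit together, here is a minimal wiring sketch, reusing the hypothetical SaveBatchCollection from above (delays are in milliseconds, and this is assumed to run inside an async method):

using (var collection = new SaveBatchCollection(batchItemCount: 100))
using (var processor = new BatchProcessor<string>(collection, aggregateDelay: 50))
{
    for (int i = 0; i < 99; i++)
    {
        collection.AddItem("request-" + i); // fire-and-forget
    }
    // the 100th item completes the batch: BatchEvent fires, the processor's loop
    // wakes, waits aggregateDelay ms so more items can accumulate, then calls
    // collection.ProcessItems(), which completes this await
    await collection.ProcessAsync("request-99");
}
// the inner using disposes the processor first (stopping its loop);
// collection.Dispose() then flushes anything still pending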

Any suggestions are welcome! Also, is there any existing library that does something like this?

asked Jun 9, 2017 at 21:17

1 Answer


We use this approach for group commit, i.e. writing a batch of operations to disk instead of flushing them one at a time. Our implementation is a wrapper around BlockingCollection, which handles almost all of the synchronization.

So my suggestion is to consider using BlockingCollection; here is an example of what that could look like:

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

public class Batcher<T> : IDisposable
{
    public const int DefaultMaxBatchSize = 1000;

    private readonly int _maxBatchSize;
    private readonly BlockingCollection<T> _items;
    private readonly Task _batchTask;

    public Batcher(int maxBatchSize = DefaultMaxBatchSize, int? boundedCapacity = null)
    {
        _maxBatchSize = maxBatchSize;
        _items = new BlockingCollection<T>(boundedCapacity ?? int.MaxValue);
        _batchTask = Task.Run((Action)ProcessItems);
    }

    public delegate void BatchHandler(IEnumerable<T> items);

    public event BatchHandler OnBatch;

    public void Add(T item)
    {
        _items.Add(item);
    }

    public void Dispose()
    {
        _items.CompleteAdding();
        _batchTask.Wait();
    }

    private void ProcessItems()
    {
        var buffer = new List<T>(_maxBatchSize);
        while (!_items.IsCompleted)
        {
            // block for up to a second waiting for the first item of a batch
            if (_items.TryTake(out var firstItem, 1000))
            {
                buffer.Add(firstItem);
                // then drain whatever is immediately available, up to the batch size
                while (buffer.Count < _maxBatchSize && _items.TryTake(out var item))
                {
                    buffer.Add(item);
                }
                OnBatch?.Invoke(buffer);
                buffer.Clear();
            }
        }
    }
}
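
Usage then boils down to subscribing a handler and adding items. A sketch (WriteToDisk is a stand-in for whatever work the batch handler performs):

using (var batcher = new Batcher<string>(maxBatchSize: 500))
{
    batcher.OnBatch += batch => WriteToDisk(batch); // invoked on the batcher's task
    for (int i = 0; i < 10000; i++)
    {
        batcher.Add("operation-" + i);
    }
} // Dispose completes adding and waits for the final batch to be handled

One thing to keep in mind: the handler receives the same buffer instance that is cleared afterwards, so it must consume the items synchronously rather than hold on to the IEnumerable.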
answered Nov 4, 2017 at 7:14
  • why the downvote? please provide some feedback so I can improve the answer! (Commented Nov 4, 2017 at 8:44)
