I need to implement a thread-safe queue with a buffer. Following the advice from my previous post, Thread-safe queue mechanism, I gave up on the concurrent collections and used a lock instead. Is it thread-safe now? What can I improve?
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Timers;
class Program
{
public class Document : IItem
{
public Guid Id { get; set; }
public Guid MessageId { get; set; }
}
static void Main()
{
var queueProvider = new Provider();
var docs = new List<IItem>
{
new Document { Id = Guid.NewGuid()},
new Document {Id = Guid.NewGuid()},
new Document {Id = Guid.NewGuid()},
new Document {Id = Guid.NewGuid()},
new Document {Id = Guid.NewGuid()},
};
object locker = new object();
try
{
var tasks = new List<Task>();
var task1 = Task.Factory.StartNew(() =>
{
var timer1 = new Timer(1000) { Interval = 5000 };
timer1.Elapsed += (object sender, ElapsedEventArgs e) =>
{
foreach (var doc in docs)
{
if (queueProvider.TryEnqueue(doc))
{
Console.WriteLine("Enqueue: " + doc.Id + "taskId: 1");
Console.WriteLine("Count: " + queueProvider.QueueCount + " Buffor: " + queueProvider.BufforCount);
}
else
{
Console.WriteLine("Not Enqueue: " + doc.Id + "taskId: 1");
}
}
};
timer1.Enabled = true;
timer1.Start();
});
tasks.Add(task1);
var task2 = Task.Factory.StartNew(() =>
{
var timer1 = new Timer(1000) { Interval = 5000 };
timer1.Elapsed += (object sender, ElapsedEventArgs e) =>
{
foreach (var doc in docs)
{
if (queueProvider.TryEnqueue(doc))
{
Console.WriteLine("Enqueue: " + doc.Id + "taskId: 2");
Console.WriteLine("Count: " + queueProvider.QueueCount + " Buffor: " + queueProvider.BufforCount);
}
else
{
Console.WriteLine("Not Enqueue: " + doc.Id + "taskId: 2");
}
}
};
timer1.Enabled = true;
timer1.Start();
});
tasks.Add(task2);
Task.WaitAll(tasks.ToArray());
}
catch (Exception e)
{
Console.WriteLine(e);
}
Console.ReadKey();
}
}
public interface IItem
{
Guid Id { get; set; }
}
public interface IProvider
{
}
public class Provider: IProvider
{
private readonly Queue<IItem> queue;
private readonly Dictionary<Guid, DateTime> inputBuffor;
private readonly object locker = new object();
private readonly int maxQueueCount = 3;
public Provider()
{
queue = new Queue<IItem>();
inputBuffor = new Dictionary<Guid, DateTime>();
}
public bool TryEnqueue(IItem feedingItem)
{
lock (locker)
{
if (inputBuffor.ContainsKey(feedingItem.Id) || queue.Count >= maxQueueCount) return false;
inputBuffor.Add(feedingItem.Id, DateTime.Now);
queue.Enqueue(feedingItem);
return true;
}
}
public IItem Dequeue()
{
lock (locker)
{
if (queue.Count <= 0) return null;
var item = queue.Dequeue();
return item;
}
}
public int QueueCount
{
get { lock (locker) return queue.Count; }
}
public int BufforCount
{
get { lock (locker) return inputBuffor.Count; }
}
}
1 Answer
Looks pretty good.
I'm surmising that the console application is just a test/demo driver, and the code you want reviewed is just the Provider class at the bottom. I don't see any problems with its thread safety, or anything egregious in the way of style.
I do have some comments, though.
The queue class implements an empty interface. If you plan on having multiple implementations, there will be some value in defining a common set of methods here. If you don't plan on having multiple implementations, the value of having an interface (empty or not) is diminished.
The inputBuffer and queue fields are initialized from within the constructor. I often like to initialize fields like these at their declaration, for two reasons. One, suppose a future developer adds a constructor overload accepting an int capacity. That developer won't need to remember to chain to the empty constructor, and they won't need to set those fields from within the overload. Two, it communicates to future developers that there is "nothing up my sleeve"; it says "I have a private queue field, and it will always be exactly what you expect: a non-null, initially empty queue. You don't have to worry about it".
The Dequeue method will return null when the queue is empty. I would encourage you to instead write a bool TryDequeue(out IItem) method, and/or have Dequeue throw when the queue is empty. This both brings you closer in line with the standard library's Queue behavior, and helps keep null values out of your program.
The "item exists or queue is full" check in TryEnqueue is a pretty long line. You might add a private bool CanAdd(IItem) method to handle those details.
The inputBuffer is a Dictionary, but the values are never used. If you have no plans to use them, you might "Keep It Simple" by changing this to a HashSet.
An item can't be re-queued, even after being dequeued, because the inputBuffer is never cleared. I'm not sure if this is intentional; if re-adding an item is a valid use case you'll want to evict that item's ID from the inputBuffer in Dequeue.
Items lose their type once they're put in the queue: You may enqueue a Document, but when it's dequeued you'll have an IItem you'll need to cast back to Document. The solution to this would be to make the class generic, which would look something like this:
public class Provider<TItem> where TItem : IItem
{
    private readonly Queue<TItem> queue = new Queue<TItem>();
    // Holds item IDs, so the element type is Guid rather than TItem
    private readonly HashSet<Guid> inputBuffer = new HashSet<Guid>();
    // other methods and fields ...
    private bool CanAdd(TItem feedingItem)
    {
        return !inputBuffer.Contains(feedingItem.Id) // IItem.Id is still accessible
            && queue.Count < maxQueueCount;
    }
    public TItem Dequeue()
    {
        // thread safety code ...
        return queue.Dequeue();
    }
}
And would be used something like this:
var queueProvider = new Provider<Document>();
var inputDoc = new Document();
queueProvider.TryEnqueue(inputDoc);
var outputDoc = queueProvider.Dequeue(); // Type of outputDoc is Document
var outputId = outputDoc.MessageId; // MessageId is accessible without cast
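Pulling the earlier points together (field initializers, a HashSet of IDs, a CanAdd helper, TryDequeue, and evicting the ID on dequeue), a fuller version might look roughly like the sketch below. It is only a sketch under a few assumptions of mine: the capacity is passed to the constructor instead of being hard-coded, re-queuing an item after it has been dequeued is treated as valid, and Dequeue throws InvalidOperationException when empty, as the standard Queue does.

```csharp
using System;
using System.Collections.Generic;

public interface IItem
{
    Guid Id { get; set; }
}

// Minimal item type mirroring the one in the question.
public class Document : IItem
{
    public Guid Id { get; set; }
    public Guid MessageId { get; set; }
}

public class Provider<TItem> where TItem : IItem
{
    // Initialized at declaration, so every constructor overload gets them.
    private readonly Queue<TItem> queue = new Queue<TItem>();
    private readonly HashSet<Guid> inputBuffer = new HashSet<Guid>();
    private readonly object locker = new object();
    private readonly int maxQueueCount;

    // Assumption: configurable capacity; the original hard-codes 3.
    public Provider(int maxQueueCount = 3)
    {
        this.maxQueueCount = maxQueueCount;
    }

    public bool TryEnqueue(TItem item)
    {
        lock (locker)
        {
            if (!CanAdd(item)) return false;
            inputBuffer.Add(item.Id);
            queue.Enqueue(item);
            return true;
        }
    }

    public bool TryDequeue(out TItem item)
    {
        lock (locker)
        {
            if (queue.Count == 0)
            {
                item = default(TItem);
                return false;
            }
            item = queue.Dequeue();
            // Assumption: a dequeued item may be enqueued again later.
            inputBuffer.Remove(item.Id);
            return true;
        }
    }

    public TItem Dequeue()
    {
        TItem item;
        if (!TryDequeue(out item))
            throw new InvalidOperationException("The queue is empty.");
        return item;
    }

    // Callers must already hold the lock.
    private bool CanAdd(TItem item)
    {
        return !inputBuffer.Contains(item.Id) && queue.Count < maxQueueCount;
    }
}
```

If re-adding an item should stay forbidden, drop the inputBuffer.Remove call; the rest of the sketch is unaffected.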
Beyond that, my only advice would be to consider the names carefully:
The class is called Provider, but it behaves like a queue. Why not call it a ConcurrentQueue, or even CappedConcurrentQueue to hint at the maximum size limitation?
If the IItem interface is intended only for use with queues, I would suggest you ensure it's in a queue-specific namespace, or rename it to IQueueItem (maybe both). If it's intended for more general use, I would suggest naming it something more indicative of its details... perhaps IIdentifiable, since its only member provides an identifier?
The collection of item IDs currently in the queue is called inputBuffer, which to me indicates that it's a temporary place to hold items before they enter the real queue (like definition 3 here). I would have named it something more descriptive of its actual use, like currentItemIds.
The argument to TryEnqueue is called feedingItem, but the idea that I'm feeding the item into the queue is already suggested by the name of the method I'm calling. The standard library's Queue.Enqueue just calls its argument item, and I would agree that's sufficient.
Thanks for the answer. What do you think about my previous question, codereview.stackexchange.com/questions/203225/…, and about the concurrent collections for this case? Could I perhaps use BlockingCollection with BoundedCapacity? I have more questions, but I will post a new question with the code corrections. – dMilan, Sep 13, 2018 at 7:48