I need to implement a queue of requests which can be populated from multiple threads. When this queue becomes larger than 1000 completed requests, these requests should be stored into a database.
    public class RequestQueue
    {
        private static BlockingCollection<VerificationRequest> _queue = new BlockingCollection<VerificationRequest>();
        private static ConcurrentQueue<VerificationRequest> _storageQueue = new ConcurrentQueue<VerificationRequest>();
        private static volatile bool isLoading = false;
        private static object _lock = new object();

        public static void Launch()
        {
            Task.Factory.StartNew(execute);
        }

        public static void Add(VerificationRequest request)
        {
            _queue.Add(request);
        }

        public static void AddRange(List<VerificationRequest> requests)
        {
            Parallel.ForEach(requests, new ParallelOptions() { MaxDegreeOfParallelism = 3 },
                (request) => { _queue.Add(request); });
        }

        private static void execute()
        {
            Parallel.ForEach(_queue.GetConsumingEnumerable(), new ParallelOptions { MaxDegreeOfParallelism = 5 }, EnqueueSaveRequest);
        }

        private static void EnqueueSaveRequest(VerificationRequest request)
        {
            _storageQueue.Enqueue(new RequestExecuter().ExecuteVerificationRequest(request));
            if (_storageQueue.Count > 1000 && !isLoading)
            {
                lock (_lock)
                {
                    if (_storageQueue.Count > 1000 && !isLoading)
                    {
                        isLoading = true;
                        var requestChunck = new List<VerificationRequest>();
                        VerificationRequest req;
                        for (var i = 0; i < 1000; i++)
                        {
                            if (_storageQueue.TryDequeue(out req))
                                requestChunck.Add(req);
                        }
                        new VerificationRequestRepository().InsertRange(requestChunck);
                        isLoading = false;
                    }
                }
            }
        }
    }
Is there any way to implement this without lock and isLoading?
- Why is everything static? A queue is an object, so create an instance of it and remove "static" from everywhere. (404, Oct 1, 2016 at 22:54)
- It is supposed to be like a singleton. (madmanul, Oct 1, 2016 at 23:09)
- That seems like a terrible idea to me. If you want a singleton then create a single instance of the class and inject that instance where required. Otherwise you're limiting yourself to only one queue per application. (404, Oct 2, 2016 at 9:28)
- There should be only one queue per application. (madmanul, Oct 2, 2016 at 10:23)
- That may be the way you wish to use it in your application, but it doesn't make sense to design the queue itself that way. (404, Oct 2, 2016 at 14:20)
1 Answer
Static class
As proposed by eurotrash, I would not implement the class as a static class because that has some disadvantages:
- Only one queue can exist per application.
- The class cannot be mocked (e.g. for unit tests or alternative implementations).
- It cannot be used with DI.
- Stateful static classes are uncommon, so the design may confuse other developers.
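To make the points about mocking and DI concrete, here is a minimal sketch. The names `IRequestQueue<T>`, `RequestProducer` and `RecordingQueue` are hypothetical and do not appear in the original code, and `string` stands in for `VerificationRequest` so that the snippet is self-contained.

```csharp
using System.Collections.Generic;

// Hypothetical abstraction; the original code has no such interface.
public interface IRequestQueue<T>
{
    void Add(T request);
}

// A caller receives the queue through its constructor instead of
// reaching for a static class, so any implementation can be injected.
public sealed class RequestProducer
{
    private readonly IRequestQueue<string> _queue;

    public RequestProducer(IRequestQueue<string> queue)
    {
        _queue = queue;
    }

    public void Submit(string payload) => _queue.Add(payload);
}

// Test double that only records what was added (not thread-safe;
// intended for single-threaded unit tests).
public sealed class RecordingQueue : IRequestQueue<string>
{
    public List<string> Added { get; } = new List<string>();

    public void Add(string request) => Added.Add(request);
}
```

In production a single instance of the real queue could be registered with the container, which still gives "one queue per application" without baking that restriction into the class itself.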
Naming
- isLoading should be renamed to _isLoading.
- execute should be renamed to Execute or, even better, to a more descriptive name (e.g. StartConsumingTask).
Logic
Is there any way to implement this without lock and isLoading?
Currently your solution uses up to 5 tasks that process the items from the _queue and put them into the _storageQueue. If the _storageQueue has more than 1000 items, one of those 5 tasks writes them to the repository.
If the last part (writing the items to the repository) is executed by a separate, single task, the lock and the isLoading flag can be dropped: only one thread ever dequeues from the _storageQueue, so there is nothing left to synchronize.
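The single-consumer idea can be sketched as follows. This is only an illustration under stated assumptions: `BatchingPersister<T>` is a hypothetical helper, the `persist` callback stands in for `VerificationRequestRepository.InsertRange`, and the batch is flushed when it reaches exactly `batchSize` items rather than when the queue exceeds 1000. It also blocks on the collection instead of polling.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical helper: many producers add items, one task persists them in batches.
public sealed class BatchingPersister<T>
{
    private readonly BlockingCollection<T> _pending = new BlockingCollection<T>();
    private readonly Action<IReadOnlyList<T>> _persist;
    private readonly int _batchSize;
    private readonly Task _worker;

    public BatchingPersister(int batchSize, Action<IReadOnlyList<T>> persist)
    {
        _batchSize = batchSize;
        _persist = persist;
        // Exactly one consumer, so draining needs neither a lock nor a flag.
        _worker = Task.Factory.StartNew(Drain, TaskCreationOptions.LongRunning);
    }

    // Safe to call from any number of threads.
    public void Add(T item) => _pending.Add(item);

    // Stops accepting items and waits until the remainder has been persisted.
    public void Complete()
    {
        _pending.CompleteAdding();
        _worker.Wait();
    }

    private void Drain()
    {
        var batch = new List<T>(_batchSize);
        // Blocks while the collection is empty; the loop ends after CompleteAdding.
        foreach (var item in _pending.GetConsumingEnumerable())
        {
            batch.Add(item);
            if (batch.Count == _batchSize)
            {
                _persist(batch.ToArray());
                batch.Clear();
            }
        }
        if (batch.Count > 0)
            _persist(batch.ToArray()); // flush the final partial batch
    }
}
```

With this shape, the 5 parallel workers would call `Add` with each executed request, and the repository is only ever touched from the one draining task.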
Tasks
- If you have long-running tasks (like the one started in the method Launch), it is better to start them with the option TaskCreationOptions.LongRunning. Otherwise the thread for the task is taken from the thread pool and is no longer available for executing short-running work.
- _queue.Add (in method AddRange) is such a fast operation that I don't see a reason to use Parallel.ForEach here; just add the items on the current thread.
- Parallel.ForEach in method execute makes sense if ExecuteVerificationRequest is really time consuming; otherwise it only adds overhead and complexity.
Code with the suggestions above applied:
    using System.Collections.Concurrent;
    using System.Collections.Generic;
    using System.Threading.Tasks;

    public class RequestQueue
    {
        private BlockingCollection<VerificationRequest> _queue = new BlockingCollection<VerificationRequest>();
        private ConcurrentQueue<VerificationRequest> _storageQueue = new ConcurrentQueue<VerificationRequest>();

        private RequestQueue() { }

        public void Launch()
        {
            // The consuming loop blocks for the lifetime of the queue, so hint the scheduler accordingly.
            Task.Factory.StartNew(StartConsumingTask, TaskCreationOptions.LongRunning);
            // StartPersistingTask is async; Task.Run unwraps the Task it returns.
            Task.Run(() => StartPersistingTask());
        }

        public void Add(VerificationRequest request) => _queue.Add(request);

        public void AddRange(List<VerificationRequest> requests) => requests.ForEach(r => Add(r));

        private void StartConsumingTask()
        {
            Parallel.ForEach(_queue.GetConsumingEnumerable(), new ParallelOptions { MaxDegreeOfParallelism = 5 }, EnqueueSaveRequest);
        }

        private void EnqueueSaveRequest(VerificationRequest request)
        {
            _storageQueue.Enqueue(new RequestExecuter().ExecuteVerificationRequest(request));
        }

        // Runs as the only consumer of _storageQueue, so no lock or flag is needed.
        private async Task StartPersistingTask()
        {
            while (true)
            {
                if (_storageQueue.Count > 1000)
                {
                    var requestChunck = new List<VerificationRequest>();
                    VerificationRequest req;
                    for (var i = 0; i < 1000; i++)
                    {
                        if (_storageQueue.TryDequeue(out req))
                            requestChunck.Add(req);
                    }
                    new VerificationRequestRepository().InsertRange(requestChunck);
                }
                else
                {
                    // Nothing to persist yet; poll again shortly.
                    await Task.Delay(100);
                }
            }
        }

        private static RequestQueue _instance = new RequestQueue();
        public static RequestQueue Instance => _instance;
    }
- "isLoading should be renamed to _isLoading": are you sure not to myIsLoading? You're definitely cured ;-D (t3chb0t, Oct 3, 2016 at 10:56)
- Because the other 3 variables also start with '_'. The actual prefix is not important, but it should be consistent IMHO. ;) (JanDotNet, Oct 3, 2016 at 13:49)
- isLoading prevents parallel consuming, but not parallel producing. (madmanul, Oct 3, 2016 at 15:32)
- "The lock and the isLoading flag can be dropped if the last part ...": it cannot. Another thread will start the consuming task too, because there can still be more than 1000 jobs in the queue. (madmanul, Oct 3, 2016 at 17:08)
- The part between isLoading = true and isLoading = false is not executed in parallel, even if _storageQueue has 5000 items, right? So my suggestion is to run that code in a separate single task. (JanDotNet, Oct 3, 2016 at 19:23)