I'm in need of a solution that runs constantly incoming requests sequentially per resource, but in parallel overall.
The use case:
Many clients connect to a server and start issuing work. The work of a single client needs to run in sequential order, so the downstream code doesn't need to cope with concurrency, but in general all work should run on multiple threads. I'm trusting the .NET framework a lot here, which I hope is a good thing.
I've also read into TPL Dataflow and parallel Rx but could not find a general solution there. Hints in that direction are welcome, though!
class TaskGroup
{
    public int CurrentlyQueuedTasks { get { return _currentlyQueued; } }

    private readonly object _previousTaskMonitor;
    private Task _previousTask;
    private int _currentlyQueued;

    public TaskGroup()
    {
        _previousTaskMonitor = new object();
        _previousTask = Task.CompletedTask;
    }

    public void Append(Action action)
    {
        lock (_previousTaskMonitor)
        {
            Interlocked.Increment(ref _currentlyQueued);
            _previousTask = _previousTask.ContinueWith(task =>
            {
                try
                {
                    action();
                }
                catch (Exception)
                {
                    //TODO
                }
                finally
                {
                    Interlocked.Decrement(ref _currentlyQueued);
                }
            });
        }
    }
}
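Regarding the Dataflow direction mentioned above: as far as I understand, a TPL Dataflow ActionBlock with default options processes posted items one at a time, in posting order, while separate blocks run concurrently on the thread pool, so one block per client would give these semantics. A minimal sketch (assuming the System.Threading.Tasks.Dataflow package is referenced; the per-client wiring is illustrative, not part of my code):

```csharp
using System;
using System.Threading.Tasks.Dataflow; // NuGet: System.Threading.Tasks.Dataflow

class Program
{
    static void Main()
    {
        // One ActionBlock per client: with default options (MaxDegreeOfParallelism = 1)
        // it processes posted items one at a time, in posting order. Different
        // clients' blocks still run concurrently on the thread pool.
        var clientBlock = new ActionBlock<int>(i => Console.WriteLine("work item {0}", i));

        for (int i = 0; i < 3; i++)
            clientBlock.Post(i);

        clientBlock.Complete();          // no more items will be posted
        clientBlock.Completion.Wait();   // wait until all queued items ran
    }
}
```

A server could keep a dictionary from client id to its ActionBlock and post each incoming request to the owning client's block.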
- "It's not clear what you're asking here. Does your code work as intended?" — RubberDuck, Feb 6, 2016
- "Yes, it seems to work, but I haven't tested it heavily against concurrency issues. I'm also asking for improvements, or maybe for parts of the library that I missed." — Vengarioth, Feb 6, 2016
2 Answers
This is an interesting approach. I would have used a queue by default, since it expresses the semantics a bit more clearly (the queuing is slightly more obvious). Also, ContinueWith creates a new Task wrapping the original task, and I'm not sure whether that has any performance downsides (it probably shouldn't). I hacked together a quick benchmark, with the alternative implemented using a BlockingCollection:
using System;
using System.Diagnostics;
using System.Threading.Tasks;
using System.Threading;
using System.Collections.Concurrent;

public interface IAppendable
{
    void Append(Action action);
}

public class TaskGroup : IAppendable
{
    public int CurrentlyQueuedTasks { get { return _currentlyQueued; } }

    private readonly object _previousTaskMonitor;
    private Task _previousTask;
    private int _currentlyQueued;

    public TaskGroup()
    {
        _previousTaskMonitor = new object();
        _previousTask = Task.FromResult(false);
    }

    public void Append(Action action)
    {
        lock (_previousTaskMonitor)
        {
            Interlocked.Increment(ref _currentlyQueued);
            _previousTask = _previousTask.ContinueWith(task =>
            {
                try
                {
                    action();
                }
                catch (Exception)
                {
                    //TODO
                }
                finally
                {
                    Interlocked.Decrement(ref _currentlyQueued);
                }
            });
        }
    }
}

public class QueueAppendable : IAppendable, IDisposable
{
    public int CurrentlyQueuedTasks { get { return _Queue.Count; } }

    BlockingCollection<Action> _Queue = new BlockingCollection<Action>();

    public QueueAppendable()
    {
        Task.Factory.StartNew(() =>
        {
            while (true)
            {
                try
                {
                    var action = _Queue.Take();
                    action();
                }
                catch (InvalidOperationException)
                {
                    break;
                }
                catch
                {
                    // TODO log me
                }
            }
        });
    }

    public void Append(Action action)
    {
        _Queue.Add(action);
    }

    public void Dispose()
    {
        _Queue.CompleteAdding();
    }
}

public class Test
{
    public static void TimeIt(string name, IAppendable appendable)
    {
        var finishEvent = new ManualResetEvent(false);
        var sw = new Stopwatch();
        sw.Start();
        for (int i = 0; i < 2000; ++i)
        {
            appendable.Append(() => { Thread.Sleep(1); });
        }
        appendable.Append(() => { finishEvent.Set(); });
        finishEvent.WaitOne();
        sw.Stop();
        Console.WriteLine("{0} elapsed time: {1}ms", name, sw.ElapsedMilliseconds);
        (appendable as IDisposable)?.Dispose();
    }

    public static void Main()
    {
        TimeIt("TaskGroup", new TaskGroup());
        TimeIt("Queue", new QueueAppendable());
    }
}
Output:
TaskGroup elapsed time: 2135ms
Queue elapsed time: 2121ms
So there is pretty much no performance difference between the two. However, I think the BlockingCollection approach has a few advantages:
- Easier to debug. You can simply set a breakpoint and peek at the queue. This is quite difficult to do with the wrapped-task approach.
- No use of lower-level synchronization primitives. The first time I read your code I instinctively thought "Hang on, he's got a lock, why the Interlocked calls?" until I realized that the decrement happens in the async task, outside of the lock. With the BlockingCollection you program against a slightly higher level of abstraction, which is often a good thing.
- Fewer class members, which reduces the state complexity of the object (the queue is the only member).
Apart from that, I think your approach should be fine. You may want to consider adding support for cancellation via a CancellationToken.
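A minimal sketch of what such cancellation support could look like; the overload and its skip-when-cancelled semantics are my assumption, not part of the reviewed code (actions still queued when the token is cancelled are simply skipped):

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical extension of the reviewed TaskGroup: an Append overload
// taking a CancellationToken. Cancelled-but-still-queued actions are skipped.
class CancellableTaskGroup
{
    private readonly object _previousTaskMonitor = new object();
    private Task _previousTask = Task.FromResult(false);

    public void Append(Action action, CancellationToken token)
    {
        lock (_previousTaskMonitor)
        {
            _previousTask = _previousTask.ContinueWith(_ =>
            {
                if (token.IsCancellationRequested)
                    return; // queued but cancelled: skip the work
                try { action(); }
                catch (Exception) { /* TODO: log */ }
            });
        }
    }
}

class Program
{
    static void Main()
    {
        var group = new CancellableTaskGroup();
        var cts = new CancellationTokenSource();
        var done = new ManualResetEventSlim();

        group.Append(() => Console.WriteLine("runs"), CancellationToken.None);
        cts.Cancel();
        group.Append(() => Console.WriteLine("never runs"), cts.Token);
        group.Append(() => done.Set(), CancellationToken.None);
        done.Wait(); // only "runs" is printed
    }
}
```

Passing the token into ContinueWith itself would also work, but then a cancelled continuation breaks the chain, so checking the token inside the body keeps subsequent appends running.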
- "Wait, what prevents the queue implementation's tasks from running in parallel instead of sequentially?" — Vengarioth, Feb 13, 2016
- "@Vengarioth: The queue only has one task: the one which is pulling items off the queue and executing them." — ChrisWue, Feb 14, 2016
- "Is it blocking the thread when there are no tasks in the queue?" — Vengarioth, Feb 14, 2016
- "@Vengarioth: Yes, see BlockingCollection.Take." — ChrisWue, Feb 14, 2016
- "In my case that is not an option, unfortunately; I expanded my proposed solution to a linked list of tasks to perform basic list operations. But thank you a lot for your review!" — Vengarioth, Feb 17, 2016
Task Parallel Library (TPL)
I believe you are reinventing the wheel here. The TPL provides numerous ways of synchronizing tasks.
For instance, it allows you to:
- create a task scheduler from the current synchronization context, or
- start a task on a specified task scheduler.
All you need to do is create ~~a custom synchronization context (.NET Core no longer uses these)~~ or a custom task scheduler that schedules tasks to a single thread. Perhaps your TaskGroup
could be used internally in such a scheduler.
Each of your clients should use that specific scheduler to run its tasks. This allows you to guarantee sequential behavior per client, even in an asynchronous environment. For instance, if each client has a session, and multiple calls (multiple threads) can make requests concurrently in that session, then all these calls should schedule tasks to the session-specific scheduler.
Any code that must or could run asynchronously can still be scheduled on the default task scheduler to allow for concurrency when required.
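The framework even ships a scheduler with exactly these semantics: ConcurrentExclusiveSchedulerPair. A minimal sketch of the per-client idea (the per-client wiring is illustrative; tasks queued to one pair's ExclusiveScheduler never run concurrently with each other, while different clients' pairs still run in parallel on the thread pool):

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

class Program
{
    static void Main()
    {
        // One pair per client/session. The ExclusiveScheduler guarantees that
        // at most one of its tasks runs at a time.
        var client = new ConcurrentExclusiveSchedulerPair();
        var factory = new TaskFactory(client.ExclusiveScheduler);

        int inside = 0;       // how many actions are running right now
        bool overlap = false; // set if two actions ever overlap

        var tasks = new Task[20];
        for (int i = 0; i < tasks.Length; i++)
        {
            tasks[i] = factory.StartNew(() =>
            {
                if (Interlocked.Increment(ref inside) > 1) overlap = true;
                Thread.Sleep(1); // simulate per-client work
                Interlocked.Decrement(ref inside);
            });
        }
        Task.WaitAll(tasks);
        Console.WriteLine(overlap ? "overlap detected" : "strictly sequential");
        // prints "strictly sequential"
    }
}
```

A server would keep one ConcurrentExclusiveSchedulerPair per session and create the TaskFactory from its ExclusiveScheduler, so no custom scheduler needs to be written at all.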
- "This is a really old question and I think I ended up solving it this way, too. Thank you for taking the time to clarify :)" — Vengarioth, Aug 21, 2019