I want to implement an Agent
-like object in C#. An Agent wraps some memory location (ideally storing an immutable object) and receives updates to that location. All these updates are performed asynchronously, but sequentially. In other words
At any point in time, at most one [update submitted to an] Agent is being executed.
My current implementation uses the TPL. The Agent
keeps a reference to the last update that has been or must be performed, wrapped in a Task
. When submitted a new update, you wrap the update in a Task
, get a reference to the last Task
and replace it, atomically, then invoke ContinueWith
on that last Task
with this new update Task
(everything submitted through a TaskScheduler
).
public class Agent<T> where T : class
{
private T wrappedValue;
private readonly TaskScheduler scheduler;
private Task lastTask;
public Agent (T value, TaskScheduler scheduler)
{
this.wrappedValue = value;
this.scheduler = scheduler;
}
public void Send (Action<T> operation)
{
Task task = new Task ((action) => ((Action<T>)action) (wrappedValue), operation);
Task localCurrent;
do {
// always append to the end
localCurrent = lastTask;
} while(Interlocked.CompareExchange (ref lastTask, task, localCurrent) != localCurrent);
if (localCurrent == null) {
task.Start (scheduler);
} else {
localCurrent.ContinueWith ((currentTask, scheduler) => {
task.Start ((TaskScheduler)scheduler);
}, scheduler, scheduler);
}
}
[...]
}
The first thread to send an action will see null
in the field and so needs to Start()
the Task
directly.
This (seems to) work. Only one submitted action is ever running at a given moment. Additionally, tasks are potentially run on different threads. I don't want a dedicated thread for updates (and I don't want to lock up a TaskScheduler
thread).
However, I'm worried about a few things that I think could be improved:
- I'd like to be able to
ContinueWith
theTask task
directly, instead of having to start it within a different submittedAction
. I feel like I need to create the newTask
so that I can atomically exchange thelastTask
field's value. - Lambda capturing of local variables, partly alleviated with state objects where possible.
- How can I keep a reference to the currently executing
Task
, so I can short-circuit execution? Or should I use a poison pill to signal termination and not chain any new tasks? Interlocked
for atomicity. Should I use aConcurrentQueue
? How would I manage not having a dedicated thread and only running oneTask
at a time with aConcurrentQueue
?- Is a
Volatile.Read
oflastTask
necessary forlocalCurrent = lastTask;
?
Doing this now
private void InvokeAndComplete (Action<T> act, TaskCompletionSource<byte> promise)
{
try {
act (internalValue);
} finally {
// some value we will ignore
promise.SetResult (1);
}
}
public Task Alter (Action<T> alterAction)
{
TaskCompletionSource<byte> promise = new TaskCompletionSource<byte> ();
Task localLastTask;
do {
// Not sure if Volatile.Read is necessary
localLastTask = Volatile.Read (ref lastTask);
} while (Interlocked.CompareExchange (ref lastTask, promise.Task, localLastTask) != localLastTask);
// only this thread has a reference to the object referenced by localLastTask
return localLastTask.ContinueWith ((currentTask) => {
InvokeAndComplete (alterAction, promise);
}, taskScheduler);
}
2 Answers 2
I would use
lock
instead ofInterlocked
, because its much easier to follow and debug. Unless you are after some kind of micro optimization, I think you should keep it simple:lock (lock) { if (lastTask == null) { task.Start(scheduler); } else { lastTask.ContinueWith(...); } lastTask = task; }
I think you can modify your lambda expression to get a current task:
Task task = null; task = new Task ((action) => { CurrentTask = task; ((Action<T>)action) (wrappedValue); }, operation);
Though I am not too experienced with TPL, so there might be an easier way.
-
\$\begingroup\$ I'd rather not
lock
when the critical section takes less time that the locking itself. Maybe with aSpinLock
. \$\endgroup\$Sotirios Delimanolis– Sotirios Delimanolis2015年07月17日 14:49:28 +00:00Commented Jul 17, 2015 at 14:49 -
\$\begingroup\$ @SotiriosDelimanolis have you read this post from Eric Lippert? The part about locks being expensive seems relevant - but don't get me wrong, this MT stuff is way over my head. \$\endgroup\$Mathieu Guindon– Mathieu Guindon2015年07月22日 03:44:18 +00:00Commented Jul 22, 2015 at 3:44
-
\$\begingroup\$ @Mat'sMug That is quite entertaining and useful, but I think I can hold my own in a multithreaded environment (though more in Java than C#). I'll admit that I have written the code with
lock
and it works, but I want to push the limits withInterlocked
. \$\endgroup\$Sotirios Delimanolis– Sotirios Delimanolis2015年07月22日 05:27:03 +00:00Commented Jul 22, 2015 at 5:27 -
\$\begingroup\$ @SotiriosDelimanolis, well if you really need to push the limits - by all means. But if there is no actual performance issue you are facing, then I would call this kind of micro-optimization an anti-pattern. Similar to how some people use shift operator to multiply, because multiplication is just not fast enough. :) But then again, its up to you. \$\endgroup\$Nikita B– Nikita B2015年07月27日 06:34:14 +00:00Commented Jul 27, 2015 at 6:34
Why not just use an ActionBlock
with a MaxDegreeOfParallelism
of 1?
private readonly ActionBlock<Action<T>> block =
new ActionBlock<Action<T>>(action => action(wrappedValue),
new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = 1
});
public void Send (Action<T> operation)
{
block.Post(operation);
}
Why? The best code is code you don't need to write, and this is almost a single-line solution. You won't need to concern yourself with housekeeping and synchronization and it accomplishes the primary requirement perfectly; it will execute your operations sequentially and asynchronously.
-
2\$\begingroup\$ Please add some explanation to your post. Why should the user do this? Why is this solution better? Etc, etc. Just posting code and a small comment doesn't help the readers. \$\endgroup\$SirPython– SirPython2015年07月22日 12:52:39 +00:00Commented Jul 22, 2015 at 12:52
-
\$\begingroup\$ This simplifies the code. It's good to know, but it slows things down. I want to use lower level primitives if possible. \$\endgroup\$Sotirios Delimanolis– Sotirios Delimanolis2015年07月22日 21:56:59 +00:00Commented Jul 22, 2015 at 21:56
Agent
? \$\endgroup\$