Because I'm quite naive regarding C#'s Task Asynchronous Programming and concurrency in general, and because it's so difficult to test, I'm concerned about the safety of this code. It's likely that those topics, or something even more esoteric to me (e.g. processor-specific issues like volatility rules), caused me to overlook something obvious to you. Please help me understand what I missed here!
Intent
Given a Func<T> that does not support concurrency, we can safely initiate the function concurrently and receive a Task<T>. Concurrent initiations are "joined" to one of two iterations of the function: the active iteration, or the single pending iteration. The caller may disallow joining an ongoing iteration, to avoid inconsistent outcomes.
Contract
- This ExclusiveFunc class requires a Func<T>, which is set at construction and cannot be changed. ExclusiveFunc has one public instance method, Task<T> Run(bool joinLate = false), used to initiate, or "trigger", the Func<T>. ExclusiveFunc is safe for multi-threaded use: no matter how many concurrent calls are made to Run, two invocations of the Func<T> will never run concurrently. When proxied through ExclusiveFunc.Run, the Func<T> is protected from self-concurrency issues that might be caused by scheduled tasks or other concurrent use.
- There will also never be more than one pending Func<T>. That is, at most one Func<T> will be running and at most one waiting to run. The Func<T> must be implemented such that the side effects and return value of any single future iteration are sufficient for all pending callers.
- Callers receive a Task<T> for the iteration they have joined, whether they joined the running or the pending operation. T must be safe for concurrent use if concurrent callers will use the result.
- Because "joining a run late" may result in unacceptably dirty or stale information, or in missed processing, the default behavior when Func<T> is already running is instead to schedule one iteration of Func<T> to run when the active operation is complete. In this default mode, the caller is guaranteed that Func<T> will begin some time after the request was made.
- Optionally, the caller may pass joinLate if knowledge of an ongoing iteration (including its return value and side effects) is sufficient and initiating an additional iteration is not required.
- ExclusiveFunc.Run need not be awaited if "fire and forget" usage is required. However, this implementation is not ideal for heavy use in that way, since every call to Run will internally await the result on its own thread.
- All callers joined on any iteration of Func<T> will receive the exceptions thrown by the specific iteration they await.
- Awaiting callers will not wait unnecessarily (for example, for subsequent runs). Specifically, a queued request will not be forced onto the thread that was running Func<T> at the moment it was queued; instead, it will run on the thread that queued it.
- The ExclusiveFunc class is not vulnerable to internal transition race conditions. It will not halt if Func<T> throws or the queue runs dry.
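The contract depends on several callers awaiting one shared Task<T> and each receiving that iteration's result or exception. Sharing a single task among many awaiters is ordinary Task behaviour; the sketch below (SharedTaskDemo and CountFaultedAwaiters are hypothetical names, not part of ExclusiveFunc) awaits one task from several independent async lambdas and counts how many of them observed the exception.

```csharp
using System;
using System.Linq;
using System.Threading.Tasks;

// Hypothetical demo, not part of the original ExclusiveFunc code.
public static class SharedTaskDemo
{
    // Awaits the same task from `awaiters` independent async lambdas and
    // returns how many of them caught an InvalidOperationException.
    public static async Task<int> CountFaultedAwaiters(Task<int> shared, int awaiters)
    {
        var observers = Enumerable.Range(0, awaiters).Select(async _ =>
        {
            try
            {
                await shared;   // every awaiter sees the same outcome
                return 0;
            }
            catch (InvalidOperationException)
            {
                return 1;       // the shared exception is rethrown to each awaiter
            }
        });
        int[] results = await Task.WhenAll(observers);
        return results.Sum();
    }
}
```

For example, passing Task.FromException<int>(new InvalidOperationException()) with three awaiters counts all three, while Task.FromResult(7) counts none.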
Specific Concerns
- Are there cross-platform issues caused by the lock-less implementation? For example, should queue be marked volatile?
- Are there exception-handling problems? For example, the caller does not receive exceptions, or exceptions crash the queue.
- Are there defects in where the work happens that cause unexpected results? For example, a Task never completes because its thread also runs pending requests.
- Is the lack of options passed to the Task methods a problem? For example, does this cause an incorrect SynchronizationContext?
- Is this likely to cause a thread pool problem? For example, using twice as many threads as the caller expects?
- Is there a problem with using one Task to synchronize multiple threads in this way?
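As a point of comparison for the lock-less question, here is a minimal lock-based sketch of the same contract, assuming a short lock on every call to Run is acceptable. LockedExclusiveFunc, Body, current and pending are hypothetical names; this is not the author's implementation, only a way to reason about the intended state transitions with every shared field read and written under one lock.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical lock-based baseline; not the original author's code.
public sealed class LockedExclusiveFunc<T>
{
    private readonly object gate = new object();
    private readonly Func<T> func;
    private Task<T> current; // iteration running (or about to run); null when idle
    private Task<T> pending; // the single queued iteration; null when none

    public LockedExclusiveFunc(Func<T> func)
    {
        this.func = func ?? throw new ArgumentNullException(nameof(func));
    }

    public Task<T> Run(bool joinLate = false)
    {
        lock (gate)
        {
            if (current == null)
            {
                // Idle: start a new iteration on the thread pool.
                current = Task.Run(() => Body());
                return current;
            }
            if (joinLate)
            {
                return current; // share the ongoing iteration
            }
            if (pending == null)
            {
                // Chain exactly one iteration after the current one.
                // TaskContinuationOptions.None runs it whether `current`
                // succeeded or faulted, so a throwing func cannot strand it.
                pending = current.ContinueWith(
                    _ => Body(),
                    CancellationToken.None,
                    TaskContinuationOptions.None,
                    TaskScheduler.Default);
            }
            return pending;
        }
    }

    private T Body()
    {
        try
        {
            return func();
        }
        finally
        {
            lock (gate)
            {
                // This iteration is finished: promote the queued one (if any).
                current = pending;
                pending = null;
            }
        }
    }
}
```

The queued iteration is a continuation of the running one, so the two can never overlap, and because the continuation ignores the antecedent's outcome, each caller still sees only its own iteration's result or exception.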
Code
using System;
using System.Threading;
using System.Threading.Tasks;

public class ExclusiveFunc<T>
{
    public ExclusiveFunc(Func<T> func)
    {
        queue = new Queue(() => {
            try {
                return func();
            }
            finally {
                queue = queue.Next();
            }
        });
    }

    public async Task<T> Run(bool joinLate = false)
    {
        var a = queue;
        if (a.Current.Acquire())
        {
            a.Current.Acquired.Start();
            return await a.Current.Acquired;
        }
        if (joinLate) {
            return await a.Current.Acquired;
        }
        if (a.Pending.Acquire()) {
            await a.Current.Acquired;
            a.Pending.Acquired.Start();
        }
        return await a.Pending.Acquired;
    }

    private Queue queue;

    private class Queue
    {
        public readonly State Current;
        public readonly State Pending;

        public Queue(Func<T> func) : this(func, new State(func)) { }

        public Queue Next()
        {
            return new Queue(func, Pending);
        }

        private readonly Func<T> func;

        private Queue(Func<T> func, State pending)
        {
            this.func = func;
            Current = pending;
            Pending = new State(func);
        }
    }

    private class State
    {
        public Task<T> Acquired;

        public State(Func<T> func)
        {
            this.func = func;
        }

        public bool Acquire()
        {
            return Interlocked.CompareExchange(ref Acquired, new Task<T>(func), null) == null;
        }

        private readonly Func<T> func;
    }
}
Tests
using System.Threading;
using System.Threading.Tasks;
using Xunit;

public class ExclusiveIncrementer {
    private int locked = 0;
    private int count = 0;
    public int Slow() {
        Assert.Equal(0, Interlocked.Exchange(ref locked, 1));
        Thread.Sleep(100);
        Assert.Equal(1, Interlocked.Exchange(ref locked, 0));
        return Interlocked.Increment(ref count);
    }
}

public class ExclusiveFuncTest_WithoutThreads {
    protected delegate Task<int> RunEf(bool joinLate = false);
    protected virtual RunEf GetRun() {
        return new ExclusiveFunc<int>(new ExclusiveIncrementer().Slow).Run;
    }

    [Fact]
    public async Task ConcurrentRequestCanJoinOngoing() {
        var run = GetRun();
        var master = run();
        var slave = run(true);
        Assert.Equal(1, await master);
        Assert.Equal(1, await slave);
    }

    [Fact]
    public async Task ConcurrentRequestCanQueueIfOngoing() {
        var run = GetRun();
        var immediate = run();
        var queued = run();
        Assert.Equal(1, await immediate);
        Assert.Equal(2, await queued);
    }

    [Fact]
    public async Task ProceedsAfterQueueEmpty() {
        var run = GetRun();
        var first = run();
        Assert.Equal(1, await first);
        var second = run();
        Assert.Equal(2, await second);
    }

    [Fact]
    public async Task FireAndForgetCompletes() {
        var run = GetRun();
        var first = run();
        var second = run();
        Assert.Equal(2, await second);
    }

    [Fact]
    public async Task OrderDeterminedByCallNotAwait() {
        var run = GetRun();
        var first = run();
        var second = run();
        Assert.Equal(2, await second);
        Assert.Equal(1, await first);
    }

    [Fact]
    public async Task MultiplePendingShareOperation() {
        var run = GetRun();
        var blocking = run();
        var firstPending = run();
        var secondPending = run();
        Assert.Equal(2, await firstPending);
        Assert.Equal(2, await secondPending);
    }

    [Fact]
    public async Task JoinWillStartIfRequired() {
        var run = GetRun();
        var only = run(true);
        Assert.Equal(1, await only);
    }
}

public class ExclusiveFuncTest_WithThreads : ExclusiveFuncTest_WithoutThreads {
    protected override RunEf GetRun() {
        var run = base.GetRun();
        return runThread;
        Task<int> runThread(bool joinLate = false) {
            // We enforce order with Sleep, to allow human-readable test outcomes
            Thread.Sleep(30);
            return Task.Run(() => run(joinLate));
        }
    }
}
Background (not the main question, but of course comments are welcome):
I would like to separate the locking logic from several schedulable tasks in my system (e.g. scheduled conference call setups, due e-mails). While unlikely, very closely-scheduled tasks may run concurrently. Rather than artificially restricting the scheduling resolution to an arbitrary "almost certainly safe" value, I want to ensure there is at most one running. The caller may determine whether joining an ongoing run is sufficient or not.
I understand domain-specific idempotency/synchronization is typically preferable.
Related: (for Node.js) Concurrent fire and forget async task queue
Foreword
I appreciate you taking the effort to edit your question time and again to clarify your goal. At first, I thought you were making a synchronous task scheduler using whatever thread is available at any given time, which would have been really cool as well! In fact, your smartly designed API could be augmented to become one if you allow:
- chaining pending executions
- a specific Func<T> instance for each Run
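A rough sketch of that augmentation, assuming an unbounded chain is acceptable and that each call supplies its own Func<T>. SerialRunner is a hypothetical name, and the sketch swaps in a lock around ContinueWith rather than the question's lock-less queue; it is meant to illustrate the idea, not to replace ExclusiveFunc.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical sketch: runs each submitted Func<T> strictly after the previous one.
public sealed class SerialRunner
{
    private readonly object gate = new object();
    private Task tail = Task.CompletedTask; // last link in the chain

    public Task<T> Run<T>(Func<T> func)
    {
        if (func == null) throw new ArgumentNullException(nameof(func));
        lock (gate)
        {
            // TaskContinuationOptions.None: the continuation runs whether the
            // previous link succeeded or faulted, so one failure cannot stall the chain.
            Task<T> next = tail.ContinueWith(
                _ => func(),
                CancellationToken.None,
                TaskContinuationOptions.None,
                TaskScheduler.Default);
            tail = next;
            return next;
        }
    }
}
```

Each call gets its own task, so results and exceptions stay with the caller that supplied the Func<T>, while execution order follows call order.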
Review
I haven't found any scheduling-related or other major issues. I only have a couple of small remarks.
Designing private classes gives you more leeway in defining access modifiers and in validating the arguments of internal state and methods. However, I would include some sort of debug-only argument validation to detect bad design early in unit tests.
private class State
{
    public Task<T> Acquired; // <- OK, since your API does not allow public access to it

    public State(Func<T> func)
    {
        Debug.Assert(func != null); // or use Contracts API in Debug Mode
        this.func = func;
    }
}
A public API should use argument checks.
public ExclusiveFunc(Func<T> func)
{
    func = func ?? throw new ArgumentNullException(nameof(func)); // <- a MUST in public API
    queue = new Queue(() => {
        try {
            return func();
        }
        finally {
            queue = queue.Next();
        }
    });
}
- My pleasure, and thanks for requesting clarification; it has yielded documentation that will help consumers if we keep this functionality in our library. Regarding func != null, you are right, that was lazy of me. C# 8.0, where are you? ;) (shannon, Jun 17, 2019 at 19:39)
- I'm thinking further about your comments on chaining and on a variable Func<T>. I'd given it some consideration. One concern I ran into early was defeating garbage collection on a deep graph. Another was the complexity of limiting depth on the obvious ConcurrentQueue. I definitely see the value (e.g. read and write operations might be components of a longer process that can be interleaved, but not run concurrently). (shannon, Jun 17, 2019 at 20:09)
- FYI, I found a place where exceptions will crash the scheduler; let me know if you want to search for it yourself as a puzzle before I edit. (shannon, Jun 18, 2019 at 13:23)
- It can throw on the line before a.Pending.Acquired.Start();, causing the queue to block. I'll add a catch block in a moment, after you've had a chance to look. (shannon, Jun 18, 2019 at 14:37)
- @shannon, you await a Task that is not started because current is already pending? (dfhwze, Jun 18, 2019 at 14:45)
A few fixes / possible improvements:
Scheduler Crash on Exception
This is the most significant issue I've found. The thread responsible for starting the Pending Task first awaits completion of the Current Task, but if the Current Task faults, the await rethrows its exception, so a.Pending.Acquired.Start() is never reached and the queue is left in a hung state. The exception should be caught.
if (a.Pending.Acquire()) {
    try {
        await a.Current.Acquired;
    }
    catch {}
    a.Pending.Acquired.Start();
}
(After discovering this defect, I first incorrectly edited the code, which a moderator pointed out to me. I've reverted the fix and moved it to this answer.)
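An alternative to the empty catch block, sketched under the assumption that only the completion (not the success) of the current iteration matters at this point: awaiting Task.WhenAny on the single task never rethrows that task's exception, so the Start call is always reached. StartAfterHelper and StartAfter are hypothetical names.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical helper, not part of the original code.
public static class StartAfterHelper
{
    // Waits until `previous` has finished (successfully, faulted or cancelled),
    // then starts the cold task `next`. Task.WhenAny completes when `previous`
    // completes but does not propagate its exception, so Start is always called.
    public static async Task StartAfter(Task previous, Task next)
    {
        await Task.WhenAny(previous);
        next.Start();
    }
}
```

Inside Run, the body of if (a.Pending.Acquire()) could then become await StartAfterHelper.StartAfter(a.Current.Acquired, a.Pending.Acquired);. The faulted iteration's exception still reaches every caller that awaits a.Current.Acquired directly.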
Contract-stated thread assignment mismatch
Disagreement between Pending and Current SynchronizationContext
ContinueWith and then used its task to chain the next one and so on...? Alternatively, wouldn't the ConcurrentQueue<T> be easier to use here?