I've been thinking about the AsyncLazy<T>
class that Stephen Toub and Stephen Cleary present in various resources such as this. I am trying to find a way to integrate cancellation with AsyncLazy<T>
. Microsoft has a version of AsyncLazy<T>
in their Visual Studio 2019 SDK that offers a GetValueAsync(CancellationToken)
method, but if you read the fine print, it says:
Note that this will not cancel the value factory (since other callers may exist).
I would like something that does cancel the value factory. After failing to get anywhere with my research, I decided to attempt my own which is shown in the code below. However, I'm having trouble with thread safety, as I don't want multiple threads to execute the long-running value factory. I want any thread that comes in behind the first one to essentially await
the completion of that first thread, and then the subsequent thread would have the trivial task of just returning the already-created value.
Here is the code:
public sealed class CancelableAsyncLazy<T>
{
private readonly Func<CancellationToken, Task<T>> _valueFactory;
private T _createdValue;
private bool _isValueCreated = false;
private volatile bool _shouldWait = false;
public CancelableAsyncLazy(Func<CancellationToken, Task<T>> valueFactory)
=> _valueFactory = valueFactory;
public async Task<T> GetValueAsync(CancellationToken cancellationToken)
{
await Task.Run(() =>
{
while (_shouldWait) Thread.Sleep(1);
});
_shouldWait = true;
try
{
if (_isValueCreated) return _createdValue;
cancellationToken.ThrowIfCancellationRequested();
_createdValue = await _valueFactory(cancellationToken).ConfigureAwait(false);
_isValueCreated = true;
return _createdValue;
}
finally
{
_shouldWait = false;
}
}
}
I don't like the fact that I'm using Task.Run
and Thread.Sleep
to wait for the value to be created by an earlier thread, but I can't think of a better way to do it. The key is that it shouldn't block. I tried using a Mutex
, but it's possible the same thread could call the method before the value is created, so the subsequent call (from the same thread) would just blow past the Mutex
.
1 Answer 1
Specification Considerations
Note that the first caller that gets access is the one that provides the cancellationToken
for the valueFactory
. Other callers use their own cancellationToken
for timing out on awaiting the result of the async operation. This is how you designed your class. I did not change this part of the behavior.
If this behavior is not wanted, consider storing a cancellation token through the constructor. This one is used solely for the factory. And each call to GetValueAsync
could then use its own cancellation token to time out, without influencing possible cancellation of the factory. This is a design decision you would have to make.
Review
This is not a good solution. You know it, we all know it :-)
await Task.Run(() => { while (_shouldWait) Thread.Sleep(1); });
The way to delay a thread the async way is to use Task.Delay
(Overview TAP Guidelines). This is already better. But as you mentioned, this still feels like a hack (and it is). Using some sort of Mutex
would be a better approach, as you rightfully indicated. We'll get to that.
Lets' walk through your code.
You define 2 similar variables _isValueCreated
and _shouldWait
. If you think about it, they have the same purpose. As long the value is not created, a caller should wait. Let's merge them into a variable named .. let's say _memoized
. This is common terminology in caching.
private bool _isValueCreated = false; private volatile bool _shouldWait = false;
Rather than the whole construct of abusing a thread's Sleep
and using a volatile
bool to simulate a lock, we could use a Semaphore
(Async Mutex using a Semaphore).
As discussed in the previous link, this is not possible..
// example code
lock (lockObject)
{
await Task.Delay(1000);
}
.. but this is..
// example code
await semaphoreSlim.WaitAsync();
try
{
await Task.Delay(1000);
}
finally
{
semaphoreSlim.Release();
}
Let's see how we can implement this in your cancellable async lazy class.
Refactoring the Code
We define our variables. Your two booleans are merged into one. And we use the semaphore described above.
private readonly Func<CancellationToken, Task<T>> _valueFactory;
private volatile bool _memoized;
private readonly SemaphoreSlim _mutex;
private T _value;
The constructor is straight-forward. Make sure to use a Semaphore
than emulates a Mutex
so one thread has access to the resource simultaneously.
public CancelableAsyncLazy(Func<CancellationToken, Task<T>> valueFactory)
{
_valueFactory = valueFactory ?? throw new ArgumentNullException(nameof(valueFactory));
_mutex = new SemaphoreSlim(1, 1);
}
The infamous GetValueAsync
can then be rewritten completely. Each caller thread can provide its own cancellationToken
. Use it to acquire a lock on _mutex
. Once we have the lock, check whether _value
is already memoized. If not (!_memoized
) execute _valueFactory
, and memoize the value. Now, we perform another timeout check for the calling thread ThrowIfCancellationRequested
. Even though we have the value available now, the caller might still have timed out, so let him now. Don't forget to release the mutex.
public async Task<T> GetValueAsync(CancellationToken cancellationToken)
{
await _mutex.WaitAsync(cancellationToken);
try
{
if (!_memoized)
{
_value = await _valueFactory(cancellationToken).ConfigureAwait(false);
_memoized = true;
cancellationToken.ThrowIfCancellationRequested();
}
return _value;
}
finally
{
_mutex.Release();
}
}
We should allow for a convenience overload if no cancellation support is required for a given caller.
public async Task<T> GetValueAsync() => await GetValueAsync(CancellationToken.None);
And since we comply to the concept of Lazy
we should also provide a synchronous property Value
.
public T Value => GetValueAsync().Result;
Refactored Code
public sealed class CancelableAsyncLazy<T>
{
private readonly Func<CancellationToken, Task<T>> _valueFactory;
private volatile bool _memoized;
private readonly SemaphoreSlim _mutex;
private T _value;
public CancelableAsyncLazy(Func<CancellationToken, Task<T>> valueFactory)
{
_valueFactory = valueFactory
?? throw new ArgumentNullException(nameof(valueFactory));
_mutex = new SemaphoreSlim(1, 1);
}
public T Value => GetValueAsync().Result;
public async Task<T> GetValueAsync() => await GetValueAsync(CancellationToken.None);
public async Task<T> GetValueAsync(CancellationToken cancellationToken)
{
await _mutex.WaitAsync(cancellationToken);
try
{
if (!_memoized)
{
_value = await _valueFactory(cancellationToken).ConfigureAwait(false);
_memoized = true;
// at this point, the value is available, however, the caller
// might have indicated to cancel the operation; I favor
// checking for cancellation one last time here, but you
// might decide against it and return the result anyway
cancellationToken.ThrowIfCancellationRequested();
}
return _value;
}
finally
{
_mutex.Release();
}
}
}
-
\$\begingroup\$ What's the
cancellationToken.ThrowIfCancellationRequested();
for in the refactored code? The point of cancellation is to avoid doing expensive operations, but at that point there's nothing left to do. \$\endgroup\$Peter Taylor– Peter Taylor2019年07月17日 07:17:16 +00:00Commented Jul 17, 2019 at 7:17 -
\$\begingroup\$ @PeterTaylor Because the caller might have indicated he no longer wants to await the result of the operation. This is an edge case for a race condition. One could design it to favor returning the result anyway. I have decided against it. \$\endgroup\$dfhwze– dfhwze2019年07月17日 07:30:43 +00:00Commented Jul 17, 2019 at 7:30
-
\$\begingroup\$ Thank you for your very thorough answer; I appreciate it. One thing I can't seem to wrap my mind around is this: with the
WaitAsync
method "there is no thread", correct? So how can a locking mechanism grab the mutex if it doesn't have a thread to associate it with and prevent other threads from grabbing the mutex until it's released? In fact (based on my testing) the first call never does any async at all because if I use.ConfigureAwait(false)
it continues on the main thread, not a thread pool thread. \$\endgroup\$rory.ap– rory.ap2019年07月17日 14:21:25 +00:00Commented Jul 17, 2019 at 14:21 -
\$\begingroup\$ @rory.ap A semaphore allows for thread-safe access to a resource. It does not have an affinity to a thread. In other words, thread A may acquire the semaphore, all other threads will block untill any thread A or B releases the semaphore. As you can see, it differs from a lock where thread affinity is required. \$\endgroup\$dfhwze– dfhwze2019年07月18日 05:46:28 +00:00Commented Jul 18, 2019 at 5:46
Explore related questions
See similar questions with these tags.
Thread
features you're currently using (which means asking for code not yet implemented). \$\endgroup\$