This is a class that allows only limited number of requests to proceed per period of time.
This is designed for use with external APIs that require such rate limiting, e.g. 600 requests per 10 minutes. I have a multithreaded application where these requests can be queued by users with varying frequency, and the goal is to allow these requests to progress as fast as possible without exceeding the API's limits.
As the requests are user-submitted, there may also be long periods of time where no requests come in. Therefore, I've decided to avoid using a permanent timer/loop that would continuously reset the semaphore count; instead, the count is reset on-demand as requests come in or while there are pending requests that have not yet been allowed to proceed.
class AsyncRateLimitedSemaphore
{
private readonly int maxCount;
private readonly TimeSpan resetTimeSpan;
private readonly SemaphoreSlim semaphore;
private DateTimeOffset nextResetTime;
private readonly object resetTimeLock = new();
public AsyncRateLimitedSemaphore(int maxCount, TimeSpan resetTimeSpan)
{
this.maxCount = maxCount;
this.resetTimeSpan = resetTimeSpan;
this.semaphore = new SemaphoreSlim(maxCount, maxCount);
this.nextResetTime = DateTimeOffset.UtcNow + this.resetTimeSpan;
}
private void TryResetSemaphore()
{
// quick exit if before the reset time, no need to lock
if (!(DateTimeOffset.UtcNow > this.nextResetTime))
{
return;
}
// take a lock so only one reset can happen per period
lock (this.resetTimeLock)
{
var currentTime = DateTimeOffset.UtcNow;
// need to check again in case a reset has already happened in this period
if (currentTime > this.nextResetTime)
{
this.semaphore.Release(this.maxCount - this.semaphore.CurrentCount);
this.nextResetTime = currentTime + this.resetTimeSpan;
}
}
}
public async Task WaitAsync()
{
// attempt a reset in case it's been some time since the last wait
TryResetSemaphore();
var semaphoreTask = this.semaphore.WaitAsync();
// if there are no slots, need to keep trying to reset until one opens up
while (!semaphoreTask.IsCompleted)
{
var delayTime = this.nextResetTime - DateTimeOffset.UtcNow;
// delay until the next reset period
// can't delay a negative time so if it's already passed just continue with a completed task
var delayTask = delayTime >= TimeSpan.Zero ? Task.Delay(delayTime) : Task.CompletedTask;
await Task.WhenAny(semaphoreTask, delayTask);
TryResetSemaphore();
}
}
}
Some thoughts:
nextResetTime
is notvolatile
, so the pre-lock read inTryResetSemaphore
and the non-locked read in the delay loop could read stale data. This should be fine since it'd just progress sooner into the locked check, at which point it'd exit without doing anything anyway.- Ordering should be guaranteed by the order in which
SemaphoreSlim.WaitAsync()
is called. So earlier requests should be processed first and won't be starved. I don't have a strict ordering requirement, just that later incoming requests don't cause one of the early ones to wait forever. - The semaphore release could potentially be interleaved with other acquires. This should be fine since there is no need for the semaphore to actually hit max; those interleaved acquires would just subtract some counts from the post-release available pool.
-
\$\begingroup\$ What minimum .NET version must be supported? \$\endgroup\$aepot– aepot2021年07月06日 12:01:35 +00:00Commented Jul 6, 2021 at 12:01
-
\$\begingroup\$ @aepot Safe to assume latest, so currently .NET 5. If there's anything coming in .NET 6 that would help, I'm happy to consider them. I have full control over the runtime environment. \$\endgroup\$Bob– Bob2021年07月06日 13:17:28 +00:00Commented Jul 6, 2021 at 13:17
1 Answer 1
The usage of nextResetTime
is incorrect. The non-locked accesses were under the assumption that those reads would be atomic; unfortunately, DateTimeOffset
(and DateTime
) are structs, value types. C# only provides atomicity guarantees for reference types and a subset of value types (which structs do not fall under).
The two possible solutions here are to either guard the reads of nextResetTime
with locks, or store the raw tick count as long
s with the appropriate Interlocked.Read
/Interlocked.Exchange
(since long
is not guaranteed atomic either).
The implementation below is with DateTimeOffset nextResetTime
stored as long nextResetTimeTicks
instead.
class AsyncRateLimitedSemaphore
{
private readonly int maxCount;
private readonly TimeSpan resetTimeSpan;
private readonly SemaphoreSlim semaphore;
private long nextResetTimeTicks;
private readonly object resetTimeLock = new();
public AsyncRateLimitedSemaphore(int maxCount, TimeSpan resetTimeSpan)
{
this.maxCount = maxCount;
this.resetTimeSpan = resetTimeSpan;
this.semaphore = new SemaphoreSlim(maxCount, maxCount);
this.nextResetTimeTicks = (DateTimeOffset.UtcNow + this.resetTimeSpan).UtcTicks;
}
private void TryResetSemaphore()
{
// quick exit if before the reset time, no need to lock
if (!(DateTimeOffset.UtcNow.UtcTicks > Interlocked.Read(ref this.nextResetTimeTicks)))
{
return;
}
// take a lock so only one reset can happen per period
lock (this.resetTimeLock)
{
var currentTime = DateTimeOffset.UtcNow;
// need to check again in case a reset has already happened in this period
if (currentTime.UtcTicks > Interlocked.Read(ref this.nextResetTimeTicks))
{
this.semaphore.Release(this.maxCount - this.semaphore.CurrentCount);
var newResetTimeTicks = (currentTime + this.resetTimeSpan).UtcTicks;
Interlocked.Exchange(ref this.nextResetTimeTicks, newResetTimeTicks);
}
}
}
public async Task WaitAsync()
{
// attempt a reset in case it's been some time since the last wait
TryResetSemaphore();
var semaphoreTask = this.semaphore.WaitAsync();
// if there are no slots, need to keep trying to reset until one opens up
while (!semaphoreTask.IsCompleted)
{
var ticks = Interlocked.Read(ref this.nextResetTimeTicks);
var nextResetTime = new DateTimeOffset(new DateTime(ticks, DateTimeKind.Utc));
var delayTime = nextResetTime - DateTimeOffset.UtcNow;
// delay until the next reset period
// can't delay a negative time so if it's already passed just continue with a completed task
var delayTask = delayTime >= TimeSpan.Zero ? Task.Delay(delayTime) : Task.CompletedTask;
await Task.WhenAny(semaphoreTask, delayTask);
TryResetSemaphore();
}
}
}
```
-
\$\begingroup\$ A design suggestion: method with name like
TryDoSomething
probably might returnbool
notvoid
. \$\endgroup\$aepot– aepot2021年07月06日 19:20:43 +00:00Commented Jul 6, 2021 at 19:20 -
1\$\begingroup\$ @aepot I did waffle over the naming: it was originally
ResetSemaphoreIfNeeded
. The current one is perhaps wrongly suggestive of aTryParse
-like pattern. That said, I'd rather find a more appropriate name if possible, since there is nothing meaningful to be done with a return value in this scenario. \$\endgroup\$Bob– Bob2021年07月06日 19:41:39 +00:00Commented Jul 6, 2021 at 19:41 -
\$\begingroup\$ Could this also be implemented using TaskScheduler, so that it could be used with Parallel.ForEachAsync? learn.microsoft.com/en-us/dotnet/api/… \$\endgroup\$John C– John C2023年05月19日 19:18:23 +00:00Commented May 19, 2023 at 19:18