0
\$\begingroup\$

This is a class that allows only limited number of requests to proceed per period of time.

This is designed for use with external APIs that require such rate limiting, e.g. 600 requests per 10 minutes. I have a multithreaded application where these requests can be queued by users with varying frequency, and the goal is to allow these requests to progress as fast as possible without exceeding the API's limits.

As the requests are user-submitted, there may also be long periods of time where no requests come in. Therefore, I've decided to avoid using a permanent timer/loop that would continuously reset the semaphore count; instead, the count is reset on-demand as requests come in or while there are pending requests that have not yet been allowed to proceed.

class AsyncRateLimitedSemaphore
{
 private readonly int maxCount;
 private readonly TimeSpan resetTimeSpan;
 private readonly SemaphoreSlim semaphore;
 private DateTimeOffset nextResetTime;
 private readonly object resetTimeLock = new();
 public AsyncRateLimitedSemaphore(int maxCount, TimeSpan resetTimeSpan)
 {
 this.maxCount = maxCount;
 this.resetTimeSpan = resetTimeSpan;
 this.semaphore = new SemaphoreSlim(maxCount, maxCount);
 this.nextResetTime = DateTimeOffset.UtcNow + this.resetTimeSpan;
 }
 private void TryResetSemaphore()
 {
 // quick exit if before the reset time, no need to lock
 if (!(DateTimeOffset.UtcNow > this.nextResetTime))
 {
 return;
 }
 // take a lock so only one reset can happen per period
 lock (this.resetTimeLock)
 {
 var currentTime = DateTimeOffset.UtcNow;
 // need to check again in case a reset has already happened in this period
 if (currentTime > this.nextResetTime)
 {
 this.semaphore.Release(this.maxCount - this.semaphore.CurrentCount);
 this.nextResetTime = currentTime + this.resetTimeSpan;
 }
 }
 }
 public async Task WaitAsync()
 {
 // attempt a reset in case it's been some time since the last wait
 TryResetSemaphore();
 var semaphoreTask = this.semaphore.WaitAsync();
 // if there are no slots, need to keep trying to reset until one opens up
 while (!semaphoreTask.IsCompleted)
 {
 var delayTime = this.nextResetTime - DateTimeOffset.UtcNow;
 // delay until the next reset period
 // can't delay a negative time so if it's already passed just continue with a completed task
 var delayTask = delayTime >= TimeSpan.Zero ? Task.Delay(delayTime) : Task.CompletedTask;
 await Task.WhenAny(semaphoreTask, delayTask);
 TryResetSemaphore();
 }
 }
}

Some thoughts:

  • nextResetTime is not volatile, so the pre-lock read in TryResetSemaphore and the non-locked read in the delay loop could read stale data. This should be fine since it'd just progress sooner into the locked check, at which point it'd exit without doing anything anyway.
  • Ordering should be guaranteed by the order in which SemaphoreSlim.WaitAsync() is called. So earlier requests should be processed first and won't be starved. I don't have a strict ordering requirement, just that later incoming requests don't cause one of the early ones to wait forever.
  • The semaphore release could potentially be interleaved with other acquires. This should be fine since there is no need for the semaphore to actually hit max; those interleaved acquires would just subtract some counts from the post-release available pool.
asked Jul 6, 2021 at 8:13
\$\endgroup\$
2
  • \$\begingroup\$ What minimum .NET version must be supported? \$\endgroup\$ Commented Jul 6, 2021 at 12:01
  • \$\begingroup\$ @aepot Safe to assume latest, so currently .NET 5. If there's anything coming in .NET 6 that would help, I'm happy to consider them. I have full control over the runtime environment. \$\endgroup\$ Commented Jul 6, 2021 at 13:17

1 Answer 1

2
\$\begingroup\$

The usage of nextResetTime is incorrect. The non-locked accesses were under the assumption that those reads would be atomic; unfortunately, DateTimeOffset (and DateTime) are structs, value types. C# only provides atomicity guarantees for reference types and a subset of value types (which structs do not fall under).

The two possible solutions here are to either guard the reads of nextResetTime with locks, or store the raw tick count as longs with the appropriate Interlocked.Read/Interlocked.Exchange (since long is not guaranteed atomic either).

The implementation below is with DateTimeOffset nextResetTime stored as long nextResetTimeTicks instead.

class AsyncRateLimitedSemaphore
{
 private readonly int maxCount;
 private readonly TimeSpan resetTimeSpan;
 private readonly SemaphoreSlim semaphore;
 private long nextResetTimeTicks;
 private readonly object resetTimeLock = new();
 public AsyncRateLimitedSemaphore(int maxCount, TimeSpan resetTimeSpan)
 {
 this.maxCount = maxCount;
 this.resetTimeSpan = resetTimeSpan;
 this.semaphore = new SemaphoreSlim(maxCount, maxCount);
 this.nextResetTimeTicks = (DateTimeOffset.UtcNow + this.resetTimeSpan).UtcTicks;
 }
 private void TryResetSemaphore()
 {
 // quick exit if before the reset time, no need to lock
 if (!(DateTimeOffset.UtcNow.UtcTicks > Interlocked.Read(ref this.nextResetTimeTicks)))
 {
 return;
 }
 // take a lock so only one reset can happen per period
 lock (this.resetTimeLock)
 {
 var currentTime = DateTimeOffset.UtcNow;
 // need to check again in case a reset has already happened in this period
 if (currentTime.UtcTicks > Interlocked.Read(ref this.nextResetTimeTicks))
 {
 this.semaphore.Release(this.maxCount - this.semaphore.CurrentCount);
 var newResetTimeTicks = (currentTime + this.resetTimeSpan).UtcTicks;
 Interlocked.Exchange(ref this.nextResetTimeTicks, newResetTimeTicks);
 }
 }
 }
 public async Task WaitAsync()
 {
 // attempt a reset in case it's been some time since the last wait
 TryResetSemaphore();
 var semaphoreTask = this.semaphore.WaitAsync();
 // if there are no slots, need to keep trying to reset until one opens up
 while (!semaphoreTask.IsCompleted)
 {
 var ticks = Interlocked.Read(ref this.nextResetTimeTicks);
 var nextResetTime = new DateTimeOffset(new DateTime(ticks, DateTimeKind.Utc));
 var delayTime = nextResetTime - DateTimeOffset.UtcNow;
 // delay until the next reset period
 // can't delay a negative time so if it's already passed just continue with a completed task
 var delayTask = delayTime >= TimeSpan.Zero ? Task.Delay(delayTime) : Task.CompletedTask;
 await Task.WhenAny(semaphoreTask, delayTask);
 TryResetSemaphore();
 }
 }
}
```
answered Jul 6, 2021 at 18:39
\$\endgroup\$
3
  • \$\begingroup\$ A design suggestion: method with name like TryDoSomething probably might return bool not void. \$\endgroup\$ Commented Jul 6, 2021 at 19:20
  • 1
    \$\begingroup\$ @aepot I did waffle over the naming: it was originally ResetSemaphoreIfNeeded. The current one is perhaps wrongly suggestive of a TryParse-like pattern. That said, I'd rather find a more appropriate name if possible, since there is nothing meaningful to be done with a return value in this scenario. \$\endgroup\$ Commented Jul 6, 2021 at 19:41
  • \$\begingroup\$ Could this also be implemented using TaskScheduler, so that it could be used with Parallel.ForEachAsync? learn.microsoft.com/en-us/dotnet/api/… \$\endgroup\$ Commented May 19, 2023 at 19:18

Your Answer

Draft saved
Draft discarded

Sign up or log in

Sign up using Google
Sign up using Email and Password

Post as a guest

Required, but never shown

Post as a guest

Required, but never shown

By clicking "Post Your Answer", you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.