I have multiple distinct processes that need to access external resources that are rate limited. The processes are all async in nature and run in different applications. In times past I would just use SemaphoreSlim this design doesn't allow for that.
I've found several samples that seem to be half complete, or cut and paste into their code. This was derived from an existing post, but heavily modified to encompass the additional methods and to honor the cancellation token.
Any feedback would be appreciated.
public sealed class SemaphoreAsync : IDisposable
{
Semaphore _semaphore;
private SemaphoreAsync(Semaphore sem) => _semaphore = sem;
public SemaphoreAsync(int initialCount, int maximumCount) => _semaphore = new Semaphore(initialCount, maximumCount);
public SemaphoreAsync(int initialCount, int maximumCount, string name) => _semaphore = new Semaphore(initialCount, maximumCount, name);
public SemaphoreAsync(int initialCount, int maximumCount, string name, out bool createdNew, SemaphoreSecurity semaphoreSecurity) => _semaphore = new Semaphore(initialCount, maximumCount, name, out createdNew, semaphoreSecurity);
public static SemaphoreAsync OpenExisting(string name)
{
return new SemaphoreAsync(Semaphore.OpenExisting(name));
}
public static SemaphoreAsync OpenExisting(string name, SemaphoreRights rights)
{
return new SemaphoreAsync(Semaphore.OpenExisting(name, rights));
}
public static bool TryOpenExisting(string name, out SemaphoreAsync result)
{
if (Semaphore.TryOpenExisting(name, out Semaphore semaphore))
{
result = new SemaphoreAsync(semaphore);
return true;
}
result = null;
return false;
}
public static bool TryOpenExisting(string name, SemaphoreRights rights, out SemaphoreAsync result)
{
if (Semaphore.TryOpenExisting(name, rights, out Semaphore semaphore))
{
result = new SemaphoreAsync(semaphore);
return true;
}
result = null;
return false;
}
public async Task<bool> WaitOne(TimeSpan timeout, CancellationToken ct)
{
DateTime start = DateTime.UtcNow;
while (!_semaphore.WaitOne(0))
{
ct.ThrowIfCancellationRequested();
if (DateTime.UtcNow < start.Add(timeout))
return false;
await Task.Delay(100, ct);
}
return true;
}
public async Task<bool> WaitOne(int millisecondsTimeout, CancellationToken ct)
{
DateTime start = DateTime.UtcNow;
while (!_semaphore.WaitOne(0))
{
ct.ThrowIfCancellationRequested();
if (millisecondsTimeout > 0)
{
if (DateTime.UtcNow < start.AddMilliseconds(millisecondsTimeout))
return false;
}
await Task.Delay(100, ct);
}
return true;
}
public async Task<bool> WaitOne(CancellationToken ct)
{
while (!_semaphore.WaitOne(0))
{
ct.ThrowIfCancellationRequested();
await Task.Delay(100, ct);
}
return true;
}
public SemaphoreSecurity GetAccessControl()
{
return _semaphore.GetAccessControl();
}
public int Release()
{
return _semaphore.Release();
}
public int Release(int releaseCount)
{
return _semaphore.Release(releaseCount);
}
public void SetAccessControl(SemaphoreSecurity semaphoreSecurity)
{
_semaphore.SetAccessControl(semaphoreSecurity);
}
#region IDisposable Support
private bool disposedValue = false; // To detect redundant calls
void Dispose(bool disposing)
{
if (!disposedValue)
{
if (disposing)
{
// TODO: dispose managed state (managed objects).
if (_semaphore != null)
{
_semaphore.Dispose();
_semaphore = null;
}
}
disposedValue = true;
}
}
// This code added to correctly implement the disposable pattern.
public void Dispose()
{
// Do not change this code. Put cleanup code in Dispose(bool disposing) above.
Dispose(true);
}
#endregion
}
3 Answers 3
You could use ThreadPool.RegisterWaitForSingleObject
to register a callback when a the WaitHandle
(Semaphore
extends WaitHandle
) is signaled.
Together with a TaskCompletionSource
you could completely remove all your wait loops.
Example:
private async Task Run()
{
var semaphore = new Semaphore(0, 1);
await AwaitWaitHandle(semaphore, CancellationToken.None, TimeSpan.FromMilliseconds(-1));
}
private Task AwaitWaitHandle(WaitHandle handle, CancellationToken cancellationToken, TimeSpan timeout)
{
var taskCompletionSource = new TaskCompletionSource<bool>();
var reg = ThreadPool.RegisterWaitForSingleObject(handle,
(state, timedOut) =>
{
// Handle timeout
if (timedOut)
taskCompletionSource.TrySetCanceled();
taskCompletionSource.TrySetResult(true);
}, null, timeout, true);
// Handle cancellation
cancellationToken.Register(() =>
{
reg.Unregister(handle);
taskCompletionSource.TrySetCanceled();
});
return taskCompletionSource.Task;
}
You could use AwaitWaitHandle
in your SemaphoreAsync
implementation to await the Semaphore.
-
\$\begingroup\$ This is an interesting approach. I'm going to try this and the other method presented here. At first I thought there would be issues with multiple calls on the same CancellationToken but I noticed that you're unregistered it on the actual event. \$\endgroup\$Gary Smith– Gary Smith2019年05月30日 18:14:42 +00:00Commented May 30, 2019 at 18:14
-
\$\begingroup\$ The
Unregister
should probably be givennull
parameter (we do not want ot signal anything upon cancellation). TheCancellationTokenRegistration
should be probably disposed on successful wait. \$\endgroup\$Jan– Jan2020年09月17日 19:03:19 +00:00Commented Sep 17, 2020 at 19:03 -
\$\begingroup\$
TimeSpan.FromMilliseconds(-1)
can be replaced withTimeout.Infinite
.CancellationToken.None
can be replaced withdefault
. \$\endgroup\$Vaidotas Beržinskas– Vaidotas Beržinskas2021年11月14日 13:33:25 +00:00Commented Nov 14, 2021 at 13:33
Perhaps you could rewrite the wait operations to await both the semaphore or cancellation token.
public async Task<bool> WaitOne(TimeSpan timeout, CancellationToken ct) { DateTime start = DateTime.UtcNow; while (!_semaphore.WaitOne(0)) { ct.ThrowIfCancellationRequested(); if (DateTime.UtcNow < start.Add(timeout)) return false; await Task.Delay(100, ct); } return true; }
public async Task<bool> WaitOne(TimeSpan timeout, CancellationToken ct)
{
var success = await Task.Run(() =>
{
return WaitHandle.WaitTimeout
!= WaitHandle.WaitAny(new[] { _semaphore, ct.WaitHandle }, timeout);
});
ct.ThrowIfCancellationRequested();
return success;
}
I ended up going in a different direction. The secondary application that we needed to synchronize ended up being in python. The below code works for c# self-locking as well as python in the mix. It was taken from https://stackoverflow.com/questions/229565/what-is-a-good-pattern-for-using-a-global-mutex-in-c for the C# portion.
C#:
namespace SandBox
{
using System.Security.AccessControl;
using System.Security.Principal;
using System.Threading;
class Program
{
static log4net.ILog _log;
static Mutex mutex;
static bool mutexCreated;
static void Main(string[] args)
{
string mutexId = $"test1";
MutexAccessRule allowEveryoneRule = new MutexAccessRule(
new SecurityIdentifier(WellKnownSidType.WorldSid, null),
MutexRights.FullControl,
AccessControlType.Allow);
MutexSecurity securitySettings = new MutexSecurity();
securitySettings.AddAccessRule(allowEveryoneRule);
// initiallyOwned: true == false + mutex.WaitOne()
mutex = new Mutex(initiallyOwned: true, mutexId, out mutexCreated, securitySettings);
if (!mutexCreated)
{
Console.WriteLine("Already started!");
for (int i = 0; i < 10; i++)
{
Console.WriteLine("wait");
Thread.Sleep(2000);
}
}
if (mutexCreated)
{
try
{
mutex.ReleaseMutex();
}
catch (ApplicationException ex)
{
Console.WriteLine("Exception throw on mutexrelease!");
}
}
mutex.Dispose();
}
}
}
Python:
import os
import time
class mutext_test(object):
def __init__(self, id):
if self.isWin():
self.ensure_win32api()
self.mutexname = id
self.lock = win32event.CreateMutex(None, False, self.mutexname)
self.running = (win32api.GetLastError() == winerror.ERROR_ALREADY_EXISTS)
else:
self.ensure_fcntl()
self.lock = open(f"/tmp/instance_{id}.lock", 'wb')
try:
fcntl.lockf(self.lock, fcntl.LOCK_EX | fcntl.LOCK_NB)
self.running = False
except IOError:
self.running = True
def already_running(self):
return self.running
def __del__(self):
if self.lock:
try:
if self.isWin():
win32api.CloseHandle(self.lock)
else:
os.close(self.lock)
except Exception as ex:
pass
# ---------------------------------------
# Utility Functions
# Dynamically load win32api on demand
# Install with: pip install pywin32
win32api=winerror=win32event=None
def ensure_win32api(self):
global win32api,winerror,win32event
if self.win32api is None:
import win32api
import winerror
import win32event
# Dynamically load fcntl on demand
# Install with: pip install fcntl
fcntl=None
def ensure_fcntl(self):
global fcntl
if self.fcntl is None:
import fcntl
def isWin(self):
return (os.name == 'nt')
# ---------------------------------------
sic = mutext_test("test1")
if sic.already_running():
print("An instance of {} is already running.".format("test1"))
sys.exit(1)
while (True):
print("wait")
time.sleep(10)
-
\$\begingroup\$ If you want your alternate approach reviewed, you should post a follow up question with a link back to this question, That might be better than posting your own answer. \$\endgroup\$2022年11月10日 14:33:04 +00:00Commented Nov 10, 2022 at 14:33
-
1\$\begingroup\$ It was more or less for the person who was asking what my solution was. But I see your point. \$\endgroup\$Gary Smith– Gary Smith2022年11月10日 18:46:56 +00:00Commented Nov 10, 2022 at 18:46
ConcurrentDictionary
to storeSemaphoreSlim
s.SemaphoreSlim
already supports async operations. \$\endgroup\$