I am trying to work out the best way to approach a task that relies on multiple long-running tasks.
My use case is that I want to have multiple events running for set periods of time, with a websocket connection to each event for that period.
My idea is to keep a concurrent list of all active events: when a new event is added to the list, a new thread is spawned to handle it, and when the event is removed from the list, that thread is closed.
Is this a good way to go about it? I am setting up a proof of concept that, for now, only logs the event ID to the console. It kind of works, but I haven't yet worked out how to remove the thread.
I would really appreciate any advice.

    public class EventProcessingService : IHostedService, IDisposable
    {
        private readonly ILogger<EventProcessingService> _logger;
        private readonly ICacheService _cacheService;
        private const int MaxThreads = 10;
        private static readonly CountdownEvent cde = new CountdownEvent(MaxThreads);
        public static readonly BlockingCollection<int> eventIds = new BlockingCollection<int>();
        ConcurrentBag<int> EventIdsProcessing = new ConcurrentBag<int>();
        private Timer _timer = null!;

        public EventProcessingService(ILogger<EventProcessingService> logger, ICacheService cacheService)
        {
            _logger = logger;
            _cacheService = cacheService;
            for (int i = 0; i < MaxThreads; i++)
            {
                Task.Factory.StartNew(Process, TaskCreationOptions.LongRunning);
            }
        }

        public Task StartAsync(CancellationToken stoppingToken)
        {
            _logger.LogInformation("Timed Hosted Service running.");
            _timer = new Timer(DoWork, null, TimeSpan.Zero,
                TimeSpan.FromSeconds(5));
            return Task.CompletedTask;
        }

        private void DoWork(object? state)
        {
            var ids = _cacheService.GetCachedEventIds();
            foreach (var id in ids)
            {
                if (!EventIdsProcessing.Contains(id))
                {
                    EventIdsProcessing.Add(id);
                    eventIds.Add(id);
                }
            }
            cde.Wait();
        }

        private async Task Process()
        {
            foreach (var id in eventIds.GetConsumingEnumerable())
            {
                cde.Signal();
                while (true)
                {
                    Console.WriteLine(id);
                    await Task.Delay(1000);
                }
            }
        }

        public Task StopAsync(CancellationToken stoppingToken)
        {
            _logger.LogInformation("Timed Hosted Service is stopping.");
            _timer?.Change(Timeout.Infinite, 0);
            return Task.CompletedTask;
        }

        public void Dispose()
        {
            _timer?.Dispose();
        }
    }

- You seem to conflate tasks with threads. When using tasks, you have no direct control over threads or whether tasks run concurrently or not. In principle, it is possible for an async application to run in a single-threaded environment, but then you're obviously limited to one concurrent task at a time. Tasks are intentionally an abstraction above the thread pool management, because managing a thread pool is often much more than you're bargaining for. (Flater, Feb 21, 2022 at 9:48)

1 Answer

`private` members
- I know naming is hard and MSDN has a lot of bad examples, but `cde` is not a really good name
  - Try to capture what it limits, like `ConcurrentProcessThrottler`, `ConcurrentProcessLimiter`, etc.
- The same applies to `_timer`: try to capture the essence of why you introduced it
  - I'm not sure that you really need to use the null-forgiving / damn-it operator
- Please try to follow a consistent naming pattern
  - Inconsistent: `EventIdsProcessing`, `_cacheService`, `cde`, etc.
  - Either use an underscore prefix for all your `private` members or do not prefix them at all
- I know it is a POC, but I would suggest receiving `maxThreads` as a constructor parameter rather than using a hard-coded `const`
  - Tasks are not Threads, so a way better name would be `MaxDegreeOfParallelism`, `ThresholdForMaxConcurrency`, etc.

`public` member
- Please try to use Pascal Casing for public members (`eventIds`)
  - It is unclear why it should be `public`

`EventProcessingService` constructor
- Try to express your intent by using the discard operator
  - If you just want to fire off a new `Task` and you don't care about the `Task` itself, then make this intent explicit

    _ = Task.Factory.StartNew(Process, TaskCreationOptions.LongRunning);

- Here `StartNew` returns a `Task<Task>`, so you need to call `Unwrap` to get a flattened `Task`
  - Please prefer `Task.Run` over `StartNew`, since the latter might be dangerous
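
To illustrate the `Task<Task>` point, here is a minimal sketch (a .NET 6+ console program with top-level statements). `ProcessAsync` is a hypothetical stand-in for the `Process` method, not the code from the question.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical stand-in for Process: an async worker that takes about 500 ms.
static async Task ProcessAsync()
{
    await Task.Delay(500);
}

// StartNew is not async-aware: for an async delegate it returns Task<Task>.
// The outer task completes as soon as ProcessAsync reaches its first await.
Task<Task> outer = Task.Factory.StartNew(ProcessAsync, TaskCreationOptions.LongRunning);
Task inner = await outer;
Console.WriteLine($"Outer task done; inner task completed: {inner.IsCompleted}");

// Unwrap returns a proxy task that completes only when the inner task does.
await Task.Factory.StartNew(ProcessAsync, TaskCreationOptions.LongRunning).Unwrap();
Console.WriteLine("Unwrapped task done");

// Task.Run has async-aware overloads, so no Unwrap is needed.
await Task.Run(() => ProcessAsync());
Console.WriteLine("Task.Run task done");
```

Note that `LongRunning` only affects the synchronous part of the delegate up to its first `await`; the continuation after the `await` is scheduled as usual, which is one reason the option buys little for async workers.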

`Process`
- Using `GetConsumingEnumerable` works fine if the producer side calls `CompleteAdding` to signal that it will not produce new elements
- I assume that your infinite loop simulates some real processing logic
  - Based on your code, I don't see how it will move from the first element to the next, since you have an infinite loop inside the loop body
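
The interaction between `GetConsumingEnumerable` and `CompleteAdding` can be sketched as follows. This is a simplified console program with one producer and one consumer; the event IDs are made up for illustration.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

var eventIds = new BlockingCollection<int>();

// Consumer: the foreach blocks while the collection is empty and ends
// only after CompleteAdding has been called and all items are consumed.
Task consumer = Task.Run(() =>
{
    foreach (int id in eventIds.GetConsumingEnumerable())
    {
        Console.WriteLine($"Processing event {id}");
    }
    Console.WriteLine("Consumer finished");
});

// Producer: add a few hypothetical event IDs, then signal completion.
foreach (int newId in new[] { 1, 2, 3 })
{
    eventIds.Add(newId);
}
eventIds.CompleteAdding();

await consumer;
```

Without the `CompleteAdding` call, the consumer's `foreach` would never exit and `await consumer` would wait forever.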

`StartAsync`
- I do believe you should kick off your concurrent `Process` workers/consumers here, not inside the constructor
  - With that, you would be able to pass the `CancellationToken` to `Task.Run` and to `Process` as well
- I would also recommend adding protection against multiple `StartAsync` calls
  - A `StartAsync` call should have an effect only if it was not called before or if there was a completed `StopAsync` prior to it
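
One possible shape for these two suggestions is sketched below. `EventWorkerHost`, `ProcessAsync`, and the field names are hypothetical; the class deliberately does not implement `IHostedService` so that the sample compiles without extra packages. It links its own `CancellationTokenSource` to the token passed to `StartAsync`, and it is not hardened against concurrent `StopAsync` calls.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

var host = new EventWorkerHost(maxDegreeOfParallelism: 2);
await host.StartAsync(CancellationToken.None);
await host.StartAsync(CancellationToken.None); // ignored: already started
await Task.Delay(2500);
await host.StopAsync(CancellationToken.None);

// Hypothetical, trimmed-down service; the real one would implement IHostedService.
public sealed class EventWorkerHost
{
    private readonly int _maxDegreeOfParallelism;
    private CancellationTokenSource? _stoppingCts;
    private Task _workers = Task.CompletedTask;
    private int _started; // 0 = stopped, 1 = started

    public EventWorkerHost(int maxDegreeOfParallelism) =>
        _maxDegreeOfParallelism = maxDegreeOfParallelism;

    public Task StartAsync(CancellationToken cancellationToken)
    {
        // Only the caller that flips 0 -> 1 starts the workers.
        if (Interlocked.CompareExchange(ref _started, 1, 0) != 0)
            return Task.CompletedTask;

        _stoppingCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
        CancellationToken token = _stoppingCts.Token;
        _workers = Task.WhenAll(Enumerable.Range(0, _maxDegreeOfParallelism)
            .Select(workerId => Task.Run(() => ProcessAsync(workerId, token), token)));
        return Task.CompletedTask;
    }

    public async Task StopAsync(CancellationToken cancellationToken)
    {
        if (Volatile.Read(ref _started) == 0) return; // never started

        _stoppingCts?.Cancel();
        try { await _workers; }
        catch (OperationCanceledException) { /* expected on shutdown */ }
        _stoppingCts?.Dispose();

        // Allow a new StartAsync only after the workers have fully stopped.
        Volatile.Write(ref _started, 0);
    }

    // Placeholder for the real per-event work.
    private static async Task ProcessAsync(int workerId, CancellationToken token)
    {
        while (!token.IsCancellationRequested)
        {
            Console.WriteLine($"Worker {workerId} is alive");
            await Task.Delay(1000, token);
        }
    }
}
```

Resetting `_started` only at the end of `StopAsync` matches the rule above: a second start takes effect only after a stop has completed.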

`DoWork`
- It took me a couple of seconds to realize that `DoWork` has to match the `TimerCallback` delegate, which is why it has an `object? state` parameter
  - Please consider adding a comment there for future maintainers or to enhance legibility
- As I said several times, please try to use better naming
  - Here your `DoWork` acts like a single producer, so please try to capture this information in the method name
- Please bear in mind that `ConcurrentBag` is thread-safe only if you perform atomic operations
  - Performing `Contains` and then `Add` is not atomic, so it is not thread-safe
  - Please consider using a `lock`, or a `ConcurrentDictionary`, which does expose `TryAdd`
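
A minimal sketch of the `TryAdd` variant, assuming a `ConcurrentDictionary<int, byte>` used as a concurrent set (the `byte` value is just a placeholder). The collection names and the simulated overlapping callbacks are made up for illustration.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

// The value is irrelevant here; the dictionary is used as a concurrent set.
var eventIdsInProgress = new ConcurrentDictionary<int, byte>();
var pendingEventIds = new BlockingCollection<int>();

// Simulate overlapping timer callbacks that all see the same cached IDs.
int[] cachedIds = { 1, 2, 3 };
Parallel.For(0, 4, _ =>
{
    foreach (int id in cachedIds)
    {
        // TryAdd checks and inserts atomically: exactly one caller wins per ID.
        if (eventIdsInProgress.TryAdd(id, 0))
        {
            pendingEventIds.Add(id);
        }
    }
});

Console.WriteLine(pendingEventIds.Count); // prints 3
```

With `Contains` followed by `Add` on a `ConcurrentBag`, two callbacks could both pass the check before either adds, and the same ID would be queued twice.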

`Dispose`
- Please try to implement the Dispose pattern as it should be
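
For reference, the conventional Dispose pattern for a non-sealed class that owns only managed resources looks roughly like this. `TimedProducer` and `_pollingTimer` are hypothetical names, and the timer callback is a no-op placeholder.

```csharp
using System;
using System.Threading;

var producer = new TimedProducer();
producer.Dispose();
producer.Dispose(); // second call is a no-op

public class TimedProducer : IDisposable
{
    private readonly Timer _pollingTimer;
    private bool _disposed;

    public TimedProducer()
    {
        // Placeholder callback; the timer is created but never fires.
        _pollingTimer = new Timer(_ => { }, null, Timeout.Infinite, Timeout.Infinite);
    }

    public void Dispose()
    {
        Dispose(disposing: true);
        // No finalizer is declared here; the call follows the standard pattern
        // so that derived classes adding one are handled correctly.
        GC.SuppressFinalize(this);
    }

    protected virtual void Dispose(bool disposing)
    {
        if (_disposed) return;

        if (disposing)
        {
            // Release managed resources.
            _pollingTimer.Dispose();
        }

        _disposed = true;
    }
}
```

If the class is `sealed`, the `protected virtual Dispose(bool)` overload can be dropped and the cleanup kept in `Dispose()` with the same `_disposed` guard.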

- Thanks so much for your detailed response, I will read it all through properly and make the changes. (user3284707, Feb 23, 2022 at 6:38)