Is it correct to use a Parallel.ForEach while processing a request? I ask because async was designed to serve as many requests as possible, not to finish a single request as fast as possible, which is what Parallel.ForEach will do.
Simple example:
public async Task<OperationResult> ProcessApiRequest(List<string> ids)
{
    Parallel.ForEach(ids, async (id) =>
    {
        await this.doStuff(id);
        await this.doAnotherStuff(id);
    });

    return OperationResult.Success();
}
Imagine I can receive 1 id or 1 million ids, and I want to serve as many requests as possible. Since my threads will be busy processing the 1 million ids, the server will struggle to accept new requests. Am I right?
Thank you!
2 Answers
No, it's not correct. Parallel.ForEach is meant for data parallelism. It will create as many worker tasks as there are cores on the machine, partition the input data and use one worker per partition. It doesn't know anything about async operations, which means your code is essentially:
Parallel.ForEach(ids, async (string id) => // bound to Action<string>, i.e. async void
{
    await this.doStuff(id);
    await this.doAnotherStuff(id);
});
On a quad-core machine, this will fire off the 1M requests, 4 at a time, without waiting for any of them. The method could easily return before any of the requests had a chance to complete.
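To see this behaviour in isolation, here is a minimal console sketch (not part of the original answer; DoStuffAsync is a stand-in for the question's doStuff/doAnotherStuff):

    using System;
    using System.Linq;
    using System.Threading.Tasks;

    class ParallelForEachDemo
    {
        static async Task DoStuffAsync(int id)
        {
            await Task.Delay(1000);                  // simulate an async I/O call
            Console.WriteLine($"finished {id}");
        }

        static void Main()
        {
            var ids = Enumerable.Range(1, 100).ToList();

            // The async lambda is converted to Action<int>, i.e. async void,
            // so ForEach only waits for the part before the first await.
            Parallel.ForEach(ids, async id => await DoStuffAsync(id));

            // Prints immediately; the process can exit before any "finished" line appears.
            Console.WriteLine("ForEach returned");
        }
    }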
If you want to execute multiple requests in a controlled manner, you could use e.g. an ActionBlock with a specific degree of parallelism:
var options = new ExecutionDataflowBlockOptions
{
    MaxDegreeOfParallelism = 10,
    BoundedCapacity = 100
};

var block = new ActionBlock<string>(async id => { .... }, options);

foreach (var id in ids)
{
    await block.SendAsync(id);
}

block.Complete();
await block.Completion;
The block will process up to 10 concurrent requests. If the actions are really asynchronous, or the async waits are long, we can easily use a higher DOP than the number of available cores.
Input messages are buffered, which means we could end up with 1M requests waiting in the input buffer of a slow block. To avoid this, the BoundedCapacity setting makes SendAsync wait asynchronously if the block can't accept any more inputs.
Finally, the call to Complete() tells the block we're done and that it should process any remaining messages in its input buffer. We wait for them to finish with await block.Completion.
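Wired into the question's method, a sketch of this approach might look like the following (this is not from the original answer; it assumes a reference to the System.Threading.Tasks.Dataflow NuGet package and the question's doStuff/doAnotherStuff helpers):

    public async Task<OperationResult> ProcessApiRequest(List<string> ids)
    {
        var options = new ExecutionDataflowBlockOptions
        {
            MaxDegreeOfParallelism = 10, // at most 10 ids processed concurrently
            BoundedCapacity = 100        // at most 100 ids buffered at a time
        };

        var block = new ActionBlock<string>(async id =>
        {
            await this.doStuff(id);
            await this.doAnotherStuff(id);
        }, options);

        foreach (var id in ids)
        {
            await block.SendAsync(id); // waits asynchronously when the buffer is full
        }

        block.Complete();
        await block.Completion; // returns only after every id has been processed

        return OperationResult.Success();
    }

With SendAsync and BoundedCapacity together, the producer never races far ahead of the consumers.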
- In my case, the caller does not need to await this.doStuff(id) or any other operation, and what I return is an Accepted status code. So here I don't need the Parallel.ForEach at all, because I only need to process the "stuff". Thanks for your good explanation. – Kiril1512, Nov 29, 2019 at 16:01
You are right to be concerned. Parallel.ForEach by default will use as many threads from the thread pool as it can, and the thread pool will gradually scale up to its maximum thread count if it needs to. Task.Run is generally a bad idea for a web server; Parallel.ForEach is often many times worse.
Especially given that ids is unbounded, you could quickly get into a situation where new requests get queued because all of the threads are busy satisfying only a handful of requests.
So you are right to be concerned: this sort of code optimizes the latency of individual requests at very low scale, but at scale it sacrifices a fair and well-performing web server, ultimately undoing the initial latency win and creating a wider service problem.
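For reference, the number of workers Parallel.ForEach uses can be capped with ParallelOptions; a sketch is below (DoSynchronousStuff is a hypothetical helper). As the update that follows explains, though, this still doesn't make it a good fit for async delegates.

    // Sketch only: caps Parallel.ForEach at 4 workers via ParallelOptions.
    // This limits thread usage, but does not help with async delegates,
    // which Parallel.ForEach cannot await.
    var parallelOptions = new ParallelOptions { MaxDegreeOfParallelism = 4 };

    Parallel.ForEach(ids, parallelOptions, id =>
    {
        DoSynchronousStuff(id); // hypothetical synchronous helper; only sync work belongs here
    });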
Update - as Panagiotis Kanavos points out in the comments, Parallel.ForEach doesn't have Task overloads, so it will just run the initial synchronous part of each delegate, leaving the bulk of the async work queued up. Your API just became fire and forget, possibly unknowingly.
For an alternative version using ChannelReader & ChannelWriter for a completely asynchronous producer-consumer pattern, along with some new C# 8.0 syntax, you could try this:
public async Task<OperationResult> ProcessApiRequest(List<string> ids)
{
    var channel = Channel.CreateBounded<string>(new BoundedChannelOptions(100) { SingleWriter = true });

    // Start the consumers before writing, otherwise WriteAsync would wait forever
    // once the bounded buffer of 100 ids is full.
    for (var i = 0; i < 8; i++) // 8 concurrent readers
    {
        _ = Task.Run(async () =>
        {
            await foreach (var id in channel.Reader.ReadAllAsync())
            {
                await this.doStuff(id);
                await this.doAnotherStuff(id);
            }
        });
    }

    foreach (var id in ids)
    {
        await channel.Writer.WriteAsync(id); // if the back pressure exceeds 100 ids, we asynchronously wait here
    }

    channel.Writer.Complete();

    return OperationResult.Success();
}
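If the caller did need to wait until every id has been processed (instead of returning immediately), one possible variation, sketched below with a hypothetical method name and assuming using directives for System.Linq and System.Threading.Channels, keeps the reader tasks and awaits them:

    // Variation sketch: keep the reader tasks and await them, so the method
    // only returns once the channel has been fully drained.
    public async Task<OperationResult> ProcessApiRequestAndWait(List<string> ids)
    {
        var channel = Channel.CreateBounded<string>(new BoundedChannelOptions(100) { SingleWriter = true });

        var readers = Enumerable.Range(0, 8).Select(_ => Task.Run(async () =>
        {
            await foreach (var id in channel.Reader.ReadAllAsync())
            {
                await this.doStuff(id);
                await this.doAnotherStuff(id);
            }
        })).ToArray();

        foreach (var id in ids)
        {
            await channel.Writer.WriteAsync(id); // waits when more than 100 ids are buffered
        }

        channel.Writer.Complete();
        await Task.WhenAll(readers); // wait for the consumers to finish

        return OperationResult.Success();
    }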
- That's not correct. Parallel.ForEach will create as many worker tasks as there are cores on the machine. It's meant for data parallelism, not concurrent operations, so it makes no sense to use more. The HUGE problem is that Parallel.ForEach knows nothing about async, so all of those calls are essentially async void calls. They'll all be fired as fast as possible, 4 or 8 at a time, without waiting for any of them to complete. – Panagiotis Kanavos, Nov 29, 2019 at 15:33
- I hadn't spotted the lack of async delegate overloads; I'll update my answer. Can you point me to the docs that back up the claim that it uses the number of cores? Other APIs do this, but this one uses the default TaskScheduler, which the docs say would be unbounded. – Stuart, Nov 29, 2019 at 15:37
- It has nothing to do with APIs or the TaskScheduler. There's no ForEach overload that accepts a Task<>. – Panagiotis Kanavos, Nov 29, 2019 at 15:38
- What it says is: "Behind the scenes, the Task Scheduler partitions the task based on system resources and workload. When possible, the scheduler redistributes work among multiple threads and processors if the workload becomes unbalanced." It says nothing at all about using 2 billion threads. It says you can customize both partitioning and scheduling, but if you don't, the sensible default is to use all cores. – Panagiotis Kanavos, Nov 29, 2019 at 15:54
- No, it will use all cores in the machine, not all 1000s of possible threads in the thread pool. Unless you explicitly increased the DOP, or used blocking code that forced the scheduler to increase the number of workers. All of which are described in the docs. – Panagiotis Kanavos, Nov 29, 2019 at 15:59
... async void calls that nobody will await. It will return almost immediately, perhaps before any of the requests had a chance to even start. The compiler will also issue a warning that ProcessApiRequest has no await and will run synchronously.