
Is it correct to use a Parallel.ForEach while processing a request? I ask because async tasks were designed to attend to as many requests as possible, not to finish each one as fast as possible, which is what Parallel.ForEach will do.

Simple example:

public async Task<OperationResult> ProcessApiRequest(List<string> ids)
{
    Parallel.ForEach(ids, async (id) =>
    {
        await this.doStuff(id);
        await this.doAnotherStuff(id);
    });
    return OperationResult.Success();
}

Imagine I can receive 1 id or 1 million ids, and I want to attend to as many requests as possible. Since my threads will be busy processing the 1 million ids, the server will struggle to attend to new requests. Am I right?

Thank you!

asked Nov 29, 2019 at 12:06
  • Parallel.ForEach is meant for data parallelism, not concurrent operations. This call will fire off 1M async void calls that nobody will await. It will return almost immediately, perhaps before any of the requests has had a chance to even start. The compiler will also issue a warning that ProcessApiRequest has no await and will run synchronously. Commented Nov 29, 2019 at 15:36
  • Yes, this is what I was worried about. Thank you! Commented Nov 29, 2019 at 15:40

2 Answers


No, it's not correct. Parallel.ForEach is meant for data parallelism. It will create as many worker tasks as there are cores on the machine, partition the input data, and use one worker per partition. It doesn't know anything about async operations, which means your code is essentially:

Parallel.ForEach(ids, async void (string id) =>
{
    await this.doStuff(id);
    await this.doAnotherStuff(id);
});

On a quad-core machine, this will fire off the 1M requests, 4 at a time, without waiting for any of them. It could easily return before any of the requests has had a chance to complete.
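To make this behavior concrete, here is a small self-contained console sketch (the FireAndForgetDemo and DoStuffAsync names and the delays are invented for illustration) showing that Parallel.ForEach returns long before the async work finishes:

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class FireAndForgetDemo
{
    public static int Completed;

    private static async Task DoStuffAsync(int id)
    {
        await Task.Delay(100); // simulate async I/O
        Interlocked.Increment(ref Completed);
    }

    public static void Main()
    {
        var ids = Enumerable.Range(0, 100).ToArray();

        // The async lambda is inferred as async void, so ForEach cannot await it:
        // each worker returns as soon as the lambda hits its first await.
        Parallel.ForEach(ids, async id => await DoStuffAsync(id));

        // ForEach has already returned; almost certainly nothing has completed yet.
        Console.WriteLine($"Completed right after ForEach: {Completed}");

        Thread.Sleep(1000); // give the orphaned async work time to finish
        Console.WriteLine($"Completed after waiting: {Completed}"); // 100
    }
}
```

The first line typically prints 0: every iteration bailed out at the first await, and all 100 delayed continuations ran afterwards on the thread pool with nothing observing them.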

If you want to execute multiple requests in a controlled manner, you could use e.g. an ActionBlock with a specific degree of parallelism:

var options = new ExecutionDataflowBlockOptions
{
    MaxDegreeOfParallelism = 10,
    BoundedCapacity = 100
};
var block = new ActionBlock<string>(async id => { .... }, options);
foreach (var id in ids)
{
    await block.SendAsync(id);
}
block.Complete();
await block.Completion;

The block will process up to 10 concurrent requests. If the actions are truly asynchronous, or the asynchronous waits are long, we can easily use a higher DOP than the number of available cores.

Input messages are buffered, which means we could end up with 1M requests waiting in the input buffer of a slow block. To avoid this, the BoundedCapacity setting makes SendAsync wait asynchronously until the block can accept more input.

Finally, the call to Complete() tells the block we're done and that it should process any remaining messages in its input buffer. We wait for them to finish with await block.Completion.
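Putting the pieces together, the question's method could look like this. This is a sketch under assumptions: doStuff/doAnotherStuff are stand-ins (here stubbed with delays so the example runs), the Processed counter exists only for demonstration, and the snippet needs the System.Threading.Tasks.Dataflow NuGet package:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow; // NuGet: System.Threading.Tasks.Dataflow

public class RequestProcessor
{
    public int Processed; // for demonstration only

    // Stubs standing in for the question's doStuff/doAnotherStuff
    private async Task doStuff(string id) => await Task.Delay(10);
    private async Task doAnotherStuff(string id)
    {
        await Task.Delay(10);
        Interlocked.Increment(ref Processed);
    }

    public async Task ProcessApiRequest(List<string> ids)
    {
        var options = new ExecutionDataflowBlockOptions
        {
            MaxDegreeOfParallelism = 10, // up to 10 ids in flight at once
            BoundedCapacity = 100        // at most 100 ids buffered
        };
        var block = new ActionBlock<string>(async id =>
        {
            await this.doStuff(id);
            await this.doAnotherStuff(id);
        }, options);

        foreach (var id in ids)
            await block.SendAsync(id); // waits asynchronously when the buffer is full

        block.Complete();
        await block.Completion; // completes once every id has been processed
    }
}

public static class Demo
{
    public static async Task Main()
    {
        var processor = new RequestProcessor();
        var ids = Enumerable.Range(0, 500).Select(i => i.ToString()).ToList();
        await processor.ProcessApiRequest(ids);
        Console.WriteLine(processor.Processed); // 500
    }
}
```

Because Completion is awaited, the method only returns after all ids are processed; if fire-and-forget is actually desired, the Complete/Completion pair could be moved to a background task instead.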

answered Nov 29, 2019 at 15:51
  • In my case, the caller does not need to await this.doStuff(id) or any other operation, and what I return is an Accepted status code. So here I don't need the Parallel.ForEach at all, because I only need to process the "stuff". Thanks for your good explanation. Commented Nov 29, 2019 at 16:01

You are right to be concerned. By default, Parallel.ForEach will use as many threads from the thread pool as it can, and the thread pool will scale up gradually to its maximum thread count if it needs to. Task.Run is generally a bad idea on a web server, and Parallel.ForEach is often many times worse.

Especially given that ids is unbounded, you could quickly get to a situation where new requests are queued because all of the threads are busy satisfying only a handful of requests.

So you are right to be concerned: this sort of code optimizes the latency of individual requests at very low scale, but at scale it sacrifices a fair and well-performing web server, ultimately undoing the initial latency win and creating a wider service problem.

Update - as Panagiotis Kanavos points out in the comments, Parallel.ForEach has no Task-returning overloads, so it will just run the initial synchronous part of each delegate, leaving the bulk of the async work queued up. Your API just became fire-and-forget, possibly unknowingly.

For an alternative version using a ChannelReader and ChannelWriter for a completely asynchronous producer-consumer pattern, along with some C# 8.0 syntax, you could try this:

public async Task<OperationResult> ProcessApiRequest(List<string> ids)
{
    var channel = Channel.CreateBounded<string>(new BoundedChannelOptions(100) { SingleWriter = true });

    // Start the consumers before writing: with a bounded capacity of 100,
    // writing first would wait forever once the channel fills up.
    var readers = new List<Task>();
    for (var i = 0; i < 8; i++) // 8 concurrent readers
    {
        readers.Add(Task.Run(async () =>
        {
            await foreach (var id in channel.Reader.ReadAllAsync())
            {
                await this.doStuff(id);
                await this.doAnotherStuff(id);
            }
        }));
    }

    foreach (var id in ids)
    {
        await channel.Writer.WriteAsync(id); // if the back pressure exceeds 100 ids, we asynchronously wait here
    }
    channel.Writer.Complete();
    await Task.WhenAll(readers); // all ids have been processed

    return OperationResult.Success();
}
answered Nov 29, 2019 at 15:30
  • That's not correct. Parallel.ForEach will create as many worker tasks as there are cores on the machine. It's meant for data parallelism, not concurrent operations, so it makes no sense to use more. The HUGE problem is that Parallel.ForEach knows nothing about async, so all of those calls are essentially async void calls. They'll all be fired as fast as possible, 4 or 8 at a time, without waiting for any of them to complete. Commented Nov 29, 2019 at 15:33
  • I hadn't spotted the lack of async delegate overloads; I'll update my answer. Can you point me to the docs that back up the claim that it's the number of cores? Other APIs do this, but this one uses the default TaskScheduler, which the docs say would be unbounded. Commented Nov 29, 2019 at 15:37
  • It has nothing to do with APIs or the TaskScheduler. There's no ForEach overload that accepts a Task<>. Commented Nov 29, 2019 at 15:38
  • 1
    what it says is Behind the scenes, the Task Scheduler partitions the task based on system resources and workload. When possible, the scheduler redistributes work among multiple threads and processors if the workload becomes unbalanced. It says nothing at all about using 2 billion threads. It says you can customize both partitioning and scheduling, but if you don't the sensible default is to use all cores. Commented Nov 29, 2019 at 15:54
  • 1
    no, it will use all cores in the machine, not all 1000s of possible threads in that threadpool. Unless you explicitly increased the DOP, or used blocking code that forced the scheduler to increase the number of workers. All of which are described in the docs Commented Nov 29, 2019 at 15:59
