The purpose of this code is to let me loop over 100 items (up to `MAX_CONCURRENT` at a time), performing some action on them, and then return only once all items have been processed:
```csharp
using System;
using System.Collections.Generic;
using System.Threading;

/// <summary>Generic method to perform an action or set of actions
/// in parallel on each item in a collection of items, returning
/// only when all actions have been completed.</summary>
/// <typeparam name="T">The element type</typeparam>
/// <param name="elements">A collection of elements, each of which to
/// perform the action on.</param>
/// <param name="action">The action to perform on each element. The
/// action should of course be thread safe.</param>
/// <remarks>At most MAX_CONCURRENT actions run at the same time.</remarks>
public static void PerformActionsInParallel<T>(IEnumerable<T> elements, Action<T> action)
{
    // MAX_CONCURRENT is defined elsewhere in the class (not shown).
    // Semaphore limiting the number of parallel requests
    Semaphore limit = new Semaphore(MAX_CONCURRENT, MAX_CONCURRENT);
    // Count of the number of remaining threads to be completed
    int remaining = 0;
    // Signal to notify the main thread when a worker is done
    AutoResetEvent onComplete = new AutoResetEvent(false);
    foreach (T element in elements)
    {
        Interlocked.Increment(ref remaining);
        limit.WaitOne();
        new Thread(() =>
        {
            try
            {
                action(element);
            }
            catch (Exception ex)
            {
                Console.WriteLine("Error performing concurrent action: " + ex);
            }
            finally
            {
                Interlocked.Decrement(ref remaining);
                limit.Release();
                onComplete.Set();
            }
        }).Start();
    }
    // Wait for all requests to complete
    while (remaining > 0)
        onComplete.WaitOne(10); // Slightly better than Thread.Sleep(10)
}
```
I include a timeout on the `WaitOne()` before checking `remaining` again to protect against a rare race: the last outstanding thread could decrement `remaining` and signal completion between the main thread checking `remaining` and waiting for the next completion signal. Without the timeout, the main thread would miss that last signal and block forever. This is faster than just using `Thread.Sleep(10)` because the wait can return as soon as the last thread completes, instead of always sleeping the full interval.
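For comparison, here is a minimal sketch of a different, swapped-in technique (not the code above): if the counter is checked and waited on under one lock using `Monitor.Wait` and `Monitor.PulseAll`, no completion can slip in between the check and the wait, so the wait needs neither a timeout nor polling. The names `CompletionCounter` and `CompletionCounterDemo` are hypothetical.

```csharp
using System.Threading;

// Hypothetical helper: counts outstanding work items and lets a thread block,
// without polling, until the count drops back to zero.
public sealed class CompletionCounter
{
    private readonly object _gate = new object();
    private int _remaining;

    // Producer: call once before starting each work item.
    public void Add()
    {
        lock (_gate) { _remaining++; }
    }

    // Worker: call once when its work item has finished (e.g. in a finally block).
    public void Done()
    {
        lock (_gate)
        {
            _remaining--;
            if (_remaining == 0)
                Monitor.PulseAll(_gate); // wake any thread blocked in WaitForAll
        }
    }

    // Blocks until every Add() has been matched by a Done().
    // The count is checked while holding the lock, and Monitor.Wait releases
    // that lock atomically while waiting, so a PulseAll cannot be missed.
    public void WaitForAll()
    {
        lock (_gate)
        {
            while (_remaining > 0)
                Monitor.Wait(_gate);
        }
    }
}

// Hypothetical demo: start `count` threads and report how many had finished
// by the time WaitForAll() returned.
public static class CompletionCounterDemo
{
    public static int Run(int count)
    {
        var counter = new CompletionCounter();
        int finished = 0;
        for (int i = 0; i < count; i++)
        {
            counter.Add();
            new Thread(() =>
            {
                try { Interlocked.Increment(ref finished); }
                finally { counter.Done(); }
            }).Start();
        }
        counter.WaitForAll();
        return Volatile.Read(ref finished);
    }
}
```

Applied to the method above, `Add()` would take the place of `Interlocked.Increment`, `Done()` would go in the `finally` block, and `WaitForAll()` would replace the `while` loop.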
Goals:

- Ensure thread safety: I want to be sure I won't accidentally return too early (before all elements have been acted on), and that I don't become deadlocked or otherwise stuck.
- Add as little overhead as possible: minimize the amount of time that fewer than `MAX_CONCURRENT` threads are executing `action`, and return as soon as possible after the final `action` has been performed.
2 Answers
I've used a `CountdownEvent` signal to avoid the `remaining` integer and the busy waiting involved in polling it with the unreliable `AutoResetEvent onComplete`:
```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;

public static void PerformActionsInParallel<T>(IEnumerable<T> elements, Action<T> action)
{
    // MaxConcurrent (nullable) and DefaultMaxConcurrentRequests are members
    // of the enclosing class (not shown).
    int threads = MaxConcurrent ?? DefaultMaxConcurrentRequests;
    // Ensure elements is only enumerated once.
    T[] items = elements as T[] ?? elements.ToArray();
    // Semaphore limiting the number of parallel requests
    Semaphore limit = new Semaphore(threads, threads);
    // Count of the number of remaining threads to be completed
    CountdownEvent remaining = new CountdownEvent(items.Length);
    foreach (T element in items)
    {
        limit.WaitOne();
        new Thread(() =>
        {
            try
            {
                action(element);
            }
            catch (Exception ex)
            {
                Console.WriteLine("Error performing concurrent action: " + ex);
            }
            finally
            {
                remaining.Signal();
                limit.Release();
            }
        }).Start();
    }
    // Block until every worker has called Signal(); no polling involved
    remaining.Wait();
}
```
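One edge case worth confirming with this version is an empty `elements` collection. The sketch below (the class `CountdownEventDemo` is hypothetical) checks that `new CountdownEvent(0)` is legal and starts out set, so `remaining.Wait()` returns immediately instead of hanging, and that `Wait()` returns once every thread has called `Signal()`.

```csharp
using System.Threading;

// Hypothetical demo of two CountdownEvent behaviours the answer relies on.
public static class CountdownEventDemo
{
    // An initial count of 0 is allowed and the event starts out set,
    // so Wait(0) succeeds immediately.
    public static bool EmptyCountIsAlreadySet()
    {
        using (var countdown = new CountdownEvent(0))
        {
            return countdown.IsSet && countdown.Wait(0);
        }
    }

    // Wait() returns only after Signal() has been called `count` times.
    public static int SignalFromThreads(int count)
    {
        int finished = 0;
        // Deliberately not disposed: Dispose is not thread-safe, and the last
        // worker may still be finishing its Signal() call when Wait() returns.
        var countdown = new CountdownEvent(count);
        for (int i = 0; i < count; i++)
        {
            new Thread(() =>
            {
                Interlocked.Increment(ref finished);
                countdown.Signal();
            }).Start();
        }
        countdown.Wait();
        return Volatile.Read(ref finished);
    }
}
```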
There's one thing that particularly bothers me: the final `while` block. That's an example of busy waiting, which should be avoided IMO. One possible solution is to store the `Thread` objects in a `List<Thread>` and, once all the threads have been created, call `Thread.Join` on each thread in that list.
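A minimal sketch of the `Thread.Join` suggestion, assuming the limit is passed in as a hypothetical `maxConcurrent` parameter instead of the `MAX_CONCURRENT` constant, and swapping in `SemaphoreSlim` for the in-process limit. Because every thread is joined before the method returns, the semaphore can also be disposed safely.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

public static class JoinExample
{
    // Runs `action` on every element, at most `maxConcurrent` at a time,
    // and returns only after every started thread has terminated.
    public static void PerformActionsInParallel<T>(
        IEnumerable<T> elements, Action<T> action, int maxConcurrent)
    {
        var threads = new List<Thread>();
        using (var limit = new SemaphoreSlim(maxConcurrent, maxConcurrent))
        {
            foreach (T element in elements)
            {
                limit.Wait(); // block until a slot is free
                var worker = new Thread(() =>
                {
                    // No catch here: an exception escaping `action` on a raw
                    // Thread terminates the process.
                    try { action(element); }
                    finally { limit.Release(); }
                });
                threads.Add(worker);
                worker.Start();
            }

            // Join blocks until each thread has finished; no polling involved.
            // Every Release() has happened by then, so disposing `limit` is safe.
            foreach (Thread thread in threads)
                thread.Join();
        }
    }
}
```

The question's `try`/`catch` can be put back inside the lambda if per-item error handling is still wanted.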
That being said, I'd suggest taking a look at PLINQ and the TPL (Task Parallel Library).
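With the TPL, the whole method could collapse to a single `Parallel.ForEach` call using `ParallelOptions.MaxDegreeOfParallelism`. A minimal sketch, assuming a hypothetical `maxConcurrent` parameter in place of `MAX_CONCURRENT`:

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class TplExample
{
    // Parallel.ForEach returns only when every element has been processed;
    // MaxDegreeOfParallelism caps how many run at once.
    public static void PerformActionsInParallel<T>(
        IEnumerable<T> elements, Action<T> action, int maxConcurrent)
    {
        var options = new ParallelOptions { MaxDegreeOfParallelism = maxConcurrent };
        // Exceptions thrown by `action` are rethrown together as an AggregateException.
        Parallel.ForEach(elements, options, action);
    }
}
```

`MaxDegreeOfParallelism` is an upper bound, not a target: for blocking, server-bound work the thread pool may add threads only gradually, so the cap may not be reached right away.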
One last thing: I'd also remove the `Console.WriteLine` in the `catch` block. I'd say the `Console.WriteLine` instruction should be used only in a `Main` method. See here and here for ways to handle exceptions when working asynchronously.
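One way to act on that advice, sketched with hypothetical helper names (`ErrorCollector`, `Capturing`, `ThrowIfAny`): wrap the action so failures are queued in a `ConcurrentQueue<Exception>` instead of printed, then rethrow them together as an `AggregateException` once the parallel method has returned. This mirrors how the TPL itself reports multiple failures.

```csharp
using System;
using System.Collections.Concurrent;

public static class ErrorCollector
{
    // Wraps `action` so that any exception it throws is queued instead of printed.
    // ConcurrentQueue is safe to add to from many worker threads at once.
    public static Action<T> Capturing<T>(Action<T> action, ConcurrentQueue<Exception> errors)
    {
        return element =>
        {
            try
            {
                action(element);
            }
            catch (Exception ex)
            {
                errors.Enqueue(ex);
            }
        };
    }

    // Call after all work has finished: rethrows every collected failure at once.
    public static void ThrowIfAny(ConcurrentQueue<Exception> errors)
    {
        if (!errors.IsEmpty)
            throw new AggregateException(errors);
    }
}
```

For example, `PerformActionsInParallel(items, ErrorCollector.Capturing(action, errors))` followed by `ErrorCollector.ThrowIfAny(errors)`; the worker's own `catch` block then never sees an exception.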
Without wholesale switching to an out-of-the-box parallel task runner, which would lose me a lot of the flexibility I'm not showing here, is there any way I can finish this off without busy-waiting? My original intent was for the last line to read `while (remaining > 0) onComplete.WaitOne();`, which would not be busy waiting at all, but as described, this ran into issues when the last two threads finished simultaneously. – Alain, Nov 16, 2015 at 20:10

@Alain In this answer to a Stack Overflow post they describe several methods. I'd suggest taking a look. – Gentian Kasa, Nov 16, 2015 at 20:28

@Alain, I edited the answer with a possible way of solving the busy-waiting problem. There's no code, but it should be pretty straightforward. – Gentian Kasa, Nov 16, 2015 at 20:41

@Alain, I think you should post that as an answer to the question, or as another question if you want a review of that as well. In the help center you can read about what to do when someone answers your question. – Gentian Kasa, Nov 16, 2015 at 20:47

Thanks, will do. I gave PLINQ a shot, but it only ever uses as many threads as I have logical processors, which is suboptimal in my use case: the threads aren't CPU-intensive, they're server-bound, and I get the best speedup by queuing up many requests on the server rather than just as many requests as I have cores. – Alain, Nov 17, 2015 at 19:11
… `.AsParallel()` … before. I gave it a shot, and it works well! … `WithDegreeOfParallelism` …
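For the server-bound case described in the comments, PLINQ's default degree of parallelism is tied to the number of logical processors, and `WithDegreeOfParallelism` overrides it. A minimal sketch, assuming a hypothetical `maxConcurrent` parameter:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class PlinqExample
{
    // By default PLINQ bases its degree of parallelism on the number of logical
    // processors; WithDegreeOfParallelism sets the maximum explicitly.
    public static void PerformActionsInParallel<T>(
        IEnumerable<T> elements, Action<T> action, int maxConcurrent)
    {
        elements.AsParallel()
                .WithDegreeOfParallelism(maxConcurrent)
                .ForAll(action); // blocks until every element has been processed
    }
}
```

`ForAll` rethrows failures as an `AggregateException`. The value passed to `WithDegreeOfParallelism` must be at least 1 and has a documented upper limit, so check it against the documentation for your .NET version.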