Assume I have a type task_info
that stores the task-specific data needed to execute a task. A std::vector
of those is built, single-threaded, before any of the individual tasks are executed. Then the parallel task execution is set up.
To control access to shared mutable resources (e.g. the output streams), I need a mutex.
#include <algorithm> // for std::min
#include <atomic>
#include <cstddef>   // for std::size_t
#include <mutex>
#include <thread>
#include <vector>
class task_info{}; // fill the blanks
void execute_task(task_info info, std::mutex& mutex); // fill the blanks
void process_in_parallel(
    std::vector<task_info> task_infos,
    std::size_t threads_count = std::thread::hardware_concurrency()
)
{
    // Never use more threads than there is work.
    threads_count = std::min(threads_count, task_infos.size());
    std::atomic_size_t task_index{ 0 }; // Note: plain `std::atomic_size_t task_index;` (default-initialization) is uninitialized before C++20!
    std::mutex mutex{};
    // Every thread will do this.
    auto process = [&task_index, &task_infos, &mutex]
    {
        // The loop variable i indicates the next task to do on this thread.
        // Note: `fetch_add` is a post-increment, basically task_index++.
        for (std::size_t i; (i = task_index.fetch_add(1, std::memory_order_relaxed)) < task_infos.size(); )
        {
            execute_task(task_infos[i], mutex);
        }
    };
    std::vector<std::thread> threads(threads_count); // Set up the threads.
    for (auto& thread : threads) thread = std::thread{ process }; // Start the threads.
    for (auto& thread : threads) thread.join(); // Don’t forget to join the threads!
}
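For completeness, a call site might look like this (a sketch only; it assumes task_info and execute_task have been filled in):

int main()
{
    std::vector<task_info> task_infos;
    // ... build up task_infos single-threaded ...
    process_in_parallel(std::move(task_infos)); // defaults to hardware_concurrency() threads
}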
What is std::memory_order_relaxed? It is a memory ordering; memory orderings govern how changes to multiple atomic variables become visible to other threads. As long as you have only one atomic variable, use std::memory_order_relaxed, as it places the fewest constraints on the optimizer and the hardware. For the same reason, do not use task_index++: it performs fetch_add(1, std::memory_order_seq_cst), i.e. it does what you want, but with sequentially consistent ordering that imposes unnecessary restrictions.
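To make that concrete, here is a minimal sketch (mine, not part of the reviewed code) showing the two spellings side by side:

#include <atomic>

std::atomic_size_t counter{ 0 };

void increment_twice()
{
    counter++;                                       // same as counter.fetch_add(1, std::memory_order_seq_cst)
    counter.fetch_add(1, std::memory_order_relaxed); // weakest ordering; enough for a lone counter
}

Both statements increment atomically; they differ only in how strongly the increment is ordered relative to other memory operations.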
How to use the mutex? The mutex is locked before the resource is accessed and unlocked as soon as the thread no longer needs the resource. In simple cases, you use a std::lock_guard object, which locks the mutex in its constructor and unlocks it in its destructor.
#include <iostream> // for std::cout

void execute_task(task_info info, std::mutex& mutex)
{
    // work ...
    {
        std::lock_guard<std::mutex> lock{ mutex };
        // id() is a placeholder member of task_info ("fill the blanks" above).
        std::cout << "Task " << info.id() << " done 50%" << std::endl;
        // lock_guard unlocks mutex at the closing brace
    }
    // work ...
    {
        std::lock_guard<std::mutex> lock{ mutex };
        std::cout << "Task " << info.id() << " completed" << std::endl;
    }
}
If the little scopes are too noisy for a single line of code, one can use a comma expression:
// work ...
std::lock_guard<std::mutex>{ mutex }, std::cout << "Task " << info.id() << " done 50%" << std::endl;
// work ...
std::lock_guard<std::mutex>{ mutex }, std::cout << "Task " << info.id() << " completed" << std::endl;
The unnamed temporary lock_guard’s destructor runs at the end of the full expression, i.e. at the semicolon.
If you don’t actually have a shared mutable resource, you don’t need the mutex.
If task_infos.size() > SIZE_MAX - threads_count, the index i overflows/wraps because task_index is incremented once for each task_info (loop condition true) plus one additional time per thread (loop condition false). Unless threads_count is unreasonably big or you have a gigantic number of task_infos, this won’t be a problem, but if it might be, it must be mitigated by fetch_add-ing manually:
auto process = [&task_index, &task_infos, &mutex]
{
    for (std::size_t i{}; i < task_infos.size(); )
    {
        // Only advance task_index if no other thread got there first;
        // on failure, i is reloaded with the current value of task_index.
        if (task_index.compare_exchange_weak(i, i + 1, std::memory_order_relaxed))
            execute_task(task_infos[i], mutex);
    }
};
compare_exchange_weak is the crucial part: it compares task_index and i; if they are equal, it sets task_index to i + 1 (i.e. increments it) and returns true; otherwise, it sets i to the current value of task_index and returns false. In our case, a return value of true, i.e. a successful increment, means that this thread has won the race for i and will perform the corresponding task. A return value of false sets i to a new value, which falls into one of two categories: an index to "bargain for", or task_infos.size(). If it is the latter, the for loop condition fails and this thread stops doing tasks. Otherwise, compare_exchange_weak is tried again, but with the new i.
About the "weak" part, it means that compare_exchange_weak
is allowed return false
(and not update task_index
) even if i
and task_index
are equal (it may "spuriously fail" in technical terms). Because that is fine in our case, compare_exchange_strong
is not needed.
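As a reading aid, the semantics of a single call can be modeled by the following non-atomic sketch of mine (the real operation executes all of this as one indivisible step, and the weak form may additionally return false even when the values match):

#include <cstddef>

// Non-atomic model of object.compare_exchange_weak(expected, desired).
bool model_compare_exchange(std::size_t& object, std::size_t& expected, std::size_t desired)
{
    if (object == expected)
    {
        object = desired;  // success: this caller "won" and stored the new value
        return true;
    }
    expected = object;     // failure: report the value that was actually seen
    return false;
}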
Is my reasoning in all of this correct?
1 Answer
Turn the pattern into a generic function
Your process_in_parallel() function only works with one specific task_info type and one execute_task() function. If you had different kinds of tasks to perform, you'd have to create multiple functions like process_in_parallel() as well. But in C++ we can avoid that by making process_in_parallel() a template:
template<typename TaskInfo, typename Executor>
void process_in_parallel(
    std::vector<TaskInfo> task_infos,
    Executor execute_task,
    std::size_t threads_count = std::thread::hardware_concurrency()
) {
    ...
}
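A call site could then pass the executor as a lambda; this is an illustrative sketch reusing the names from above:

std::mutex mutex;
process_in_parallel(std::move(task_infos),
    [&mutex](task_info& info) { execute_task(info, mutex); });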
The standard library already provides this functionality
Since C++17, the standard library provides parallel versions of many of the standard algorithms (std::for_each lives in <algorithm>, the execution policies in <execution>). For example, you can write:
class task_info {...};
void execute_task(task_info);
std::vector<task_info> tasks;
std::for_each(std::execution::par, tasks.begin(), tasks.end(), execute_task);
If you need a mutex as well, then you could have it as a global variable, or use lambda expressions to pass a mutex:
void execute_task(task_info, std::mutex&);
...
std::mutex mutex;
std::for_each(std::execution::par, tasks.begin(), tasks.end(),
    [&mutex](auto& info){ execute_task(info, mutex); });
- You’re right about making it a template. I didn’t tag it, but the code base I’m working with is C++14. But are you sure std::execution::par_unseq is right? It puts a lot of restrictions, like you cannot lock a mutex or allocate memory. As far as I understand, std::execution::par might be the right one. – Bolpat, Apr 20, 2023
- You're right, it should just be std::execution::par. – G. Sliepen, Apr 20, 2023
- I just tested this with GCC 12.2.0 and Clang 16.0.1 using -O3, giving it 100 task_info elements: Clang does not support execution policies at all, and GCC does not parallelize (as I understand it, an execution policy is an optimization suggestion, not something it must do). – Bolpat, Apr 27, 2023
- You might need to link with the TBB library for the parallel execution policies to work as expected. – G. Sliepen, Apr 28, 2023