I have made a thread pool which will dynamically create threads based on how many you need.
#pragma once
#include <thread>
#include <atomic>
#include <mutex>
#include <vector>
#include <functional>
#include <condition_variable>

class thread_pool {
public:
    thread_pool() : stop(false), busy_workers(0) { }

    ~thread_pool() {
        stop = true;
        task_available.notify_all();
        for (auto& worker : workers) {
            if (worker.joinable()) {
                worker.join();
            }
        }
    }

    void run_task(std::function<void()> task) {
        {
            std::lock_guard<std::mutex> task_lock{ task_mu };
            current_task = std::move(task);
        }
        {
            std::lock_guard<std::mutex> workers_lock{ workers_mu };
            if (workers.size() == busy_workers++) {
                workers.emplace_back(work);
                return;
            }
        }
        task_available.notify_one();
    }

private:
    std::atomic_bool stop;
    std::atomic_size_t busy_workers;
    std::vector<std::thread> workers;
    std::function<void()> current_task;
    std::condition_variable task_available;
    std::mutex task_mu;
    std::mutex workers_mu;
    std::function<void()> work = [&]() {
        while (true) {
            std::unique_lock<std::mutex> task_lock{ task_mu };
            task_available.wait(task_lock, [&]() { return current_task || stop; });
            if (!current_task && stop) return;
            auto task = std::move(current_task);
            task_lock.unlock();
            task();
            busy_workers--;
        }
    };
};
The only problem I have with it is that the threads are destroyed before all the tasks have completed, even though I join all the threads in the destructor.
1 Answer
Bugs
You have several race conditions:
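There is no guarantee that a worker stays joinable between these two calls. The destructor reads workers without holding workers_mu, so a concurrent run_task could modify the vector between the check and the join.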
if (worker.joinable()) {
    worker.join();
}
This is a multi-threaded environment: you cannot assume run_task is being called from only one thread. You must assume that it can be called from any number of threads.
{
    std::lock_guard<std::mutex> task_lock{ task_mu };
    // There is no guarantee that `current_task` stays the same
    // between here and when you create a new worker thread.
    // As soon as the scope is exited another thread can enter
    // and overwrite `current_task` before the first thread
    // pushes `work` onto the thread queue or an existing worker
    // picks it up
    current_task = std::move(task);
}
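To make the overwrite concrete, here is a minimal single-threaded sketch of what can happen when two submissions arrive before any worker wakes up. The helper names run_with_single_slot and run_with_queue are hypothetical and not part of the original code; the sketch only imitates the ordering of events, it does not use real threads.

```cpp
#include <functional>
#include <queue>
#include <utility>

// Hypothetical helper: imitates two back-to-back run_task calls that both
// store into one shared slot before a worker picks anything up.
// Returns how many of the two submitted tasks actually ran.
inline int run_with_single_slot() {
    int ran = 0;
    std::function<void()> current_task;   // one slot, as in the question
    current_task = [&ran] { ++ran; };     // first submission
    current_task = [&ran] { ++ran; };     // second submission overwrites the first

    // A worker finally wakes up and takes whatever is in the slot.
    if (current_task) {
        auto task = std::move(current_task);
        current_task = nullptr;           // leave the slot in a known empty state
        task();
    }
    return ran;
}

// Hypothetical helper: the same two submissions, but stored in a queue,
// so neither one can replace the other.
inline int run_with_queue() {
    int ran = 0;
    std::queue<std::function<void()>> tasks;
    tasks.push([&ran] { ++ran; });
    tasks.push([&ran] { ++ran; });

    // A worker drains the queue one task at a time.
    while (!tasks.empty()) {
        auto task = std::move(tasks.front());
        tasks.pop();
        task();
    }
    return ran;
}
```

With the single slot only one of the two tasks runs, while the queue runs both. A lost task of this kind would also look, from the outside, like the pool shutting down before all the work was done.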
Design
Usually thread pools are created with a fixed number of threads. Jobs are added to a queue. When a thread becomes available it pulls a job from the queue or waits on a condition variable until a job is available.
I dislike the design of adding threads on demand, as a sudden burst of work could spawn lots of threads that are never released. Threads are expensive resources to create. Also, creating many threads does not mean more parallelism (it just means more context switching). A machine usually has an upper bound on available parallelism, and allocating more threads than this is usually counterproductive.
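One way to respect that upper bound is to cap the worker count using std::thread::hardware_concurrency(). The helper name choose_worker_count below is hypothetical, and treating the hardware hint as a hard cap is an assumption that suits CPU-bound work; I/O-heavy workloads may reasonably want more threads.

```cpp
#include <algorithm>
#include <cstddef>
#include <thread>

// Hypothetical helper: clamp a requested worker count to the range
// [1, hardware parallelism].
inline std::size_t choose_worker_count(std::size_t requested) {
    // hardware_concurrency() is only a hint and may return 0 when the
    // value cannot be determined, so fall back to a single worker.
    std::size_t hw = std::thread::hardware_concurrency();
    if (hw == 0) {
        hw = 1;
    }
    return std::max<std::size_t>(1, std::min(requested, hw));
}
```

A pool constructor could then size its worker vector once from this value instead of growing it for every burst of work.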