1
\$\begingroup\$

I have made a thread pool which will dynamically create threads based on how many you need.

#pragma once
#include <thread>
#include <atomic>
#include <mutex>
#include <vector>
#include <functional>
#include <condition_variable>

class thread_pool {
public:
    thread_pool() : stop(false), busy_workers(0) { }
    ~thread_pool() {
        stop = true;
        task_available.notify_all();
        for (auto& worker : workers) {
            if (worker.joinable()) {
                worker.join();
            }
        }
    }
    void run_task(std::function<void()> task) {
        {
            std::lock_guard<std::mutex> task_lock{ task_mu };
            current_task = std::move(task);
        }
        {
            std::lock_guard<std::mutex> workers_lock{ workers_mu };
            if (workers.size() == busy_workers++) {
                workers.emplace_back(work);
                return;
            }
        }
        task_available.notify_one();
    }
private:
    std::atomic_bool stop;
    std::atomic_size_t busy_workers;
    std::vector<std::thread> workers;
    std::function<void()> current_task;
    std::condition_variable task_available;
    std::mutex task_mu;
    std::mutex workers_mu;
    std::function<void()> work = [&]() {
        while (true) {
            std::unique_lock<std::mutex> task_lock{ task_mu };
            task_available.wait(task_lock, [&]() { return current_task || stop; });
            if (!current_task && stop) return;
            auto task = std::move(current_task);
            task_lock.unlock();
            task();
            busy_workers--;
        }
    };
};

The only problem I have with it is that the threads are destroyed before all the tasks have completed, even though I join all the threads in the destructor.

asked Jan 7, 2018 at 22:28
\$\endgroup\$

1 Answer

3
\$\begingroup\$

Bugs

You have several race conditions:

There is no guarantee that a worker stays joinable between these two calls.

 if (worker.joinable()) {
 worker.join();

This is a multi-threaded environment: you cannot assume that run_task is called from only one thread. You must assume that it can be called from any number of threads.

    {
        std::lock_guard<std::mutex> task_lock{ task_mu };
        // There is no guarantee that `current_task` stays the same
        // between here and when you create a new worker thread.
        // As soon as the scope is exited another thread can enter
        // and overwrite `current_task` before the first thread
        // pushes `work` onto the thread queue or an existing worker
        // picks it up.
        current_task = std::move(task);
    }
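One way to avoid losing a task when `current_task` is overwritten is to store submissions in a queue rather than a single slot. The following is only a minimal sketch under that assumption; `task_queue`, `push`, `try_pop` and `size` are hypothetical names that do not appear in the original code.

```cpp
#include <cstddef>
#include <functional>
#include <mutex>
#include <queue>
#include <utility>

// Hypothetical helper (not part of the original class): a mutex-protected
// FIFO that keeps every submitted task until a worker removes it.
class task_queue {
public:
    // Appends a task; earlier submissions are never overwritten.
    void push(std::function<void()> task) {
        std::lock_guard<std::mutex> lock{ mu };
        tasks.push(std::move(task));
    }

    // Moves the oldest task into `out`; returns false if the queue is empty.
    bool try_pop(std::function<void()>& out) {
        std::lock_guard<std::mutex> lock{ mu };
        if (tasks.empty()) return false;
        out = std::move(tasks.front());
        tasks.pop();
        return true;
    }

    // Number of tasks currently waiting.
    std::size_t size() const {
        std::lock_guard<std::mutex> lock{ mu };
        return tasks.size();
    }

private:
    mutable std::mutex mu;
    std::queue<std::function<void()>> tasks;
};
```

With this shape, two calls to `push` from different threads leave two entries in the queue, whereas two assignments to a single `std::function` slot leave only the last one.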

Design

Usually thread pools are created with a fixed number of threads. Jobs are added to a queue. When a thread becomes available it pulls a job from the queue or waits on a condition variable until a job is available.

I dislike the design of adding threads on demand, as a sudden spike in workload could spawn a large number of threads that are never released. Threads are expensive resources to create. Also, creating many threads does not mean more parallelism (it just means more switching between threads). A machine usually has an upper bound on available parallelism; allocating more threads than this is usually counterproductive.
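The standard library exposes a hint for that upper bound through `std::thread::hardware_concurrency()`. A small sketch of how a pool size might be derived from it follows; `default_worker_count` is a hypothetical helper name, and falling back to one worker is an assumption, not a rule.

```cpp
#include <cstddef>
#include <thread>

// Hypothetical helper: picks a worker count for a fixed-size pool.
std::size_t default_worker_count() {
    // hardware_concurrency() is only a hint and may return 0 when the
    // value cannot be determined, so fall back to a single worker.
    const unsigned hint = std::thread::hardware_concurrency();
    return hint == 0 ? 1 : static_cast<std::size_t>(hint);
}
```

Workloads that mostly block on I/O may still benefit from more threads than this hint, so it is best treated as a starting point.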

answered Jan 8, 2018 at 20:16
\$\endgroup\$
