I implemented a work-stealing queue thread pool inspired by Sean Parent's talk at code::dive 2016. The full implementation is here. I am looking for feedback on how to make the code more effective and on common best practices.
TaskQueue is a thread-safe task queue. It has a pair of blocking Pop/Push methods and non-blocking ones. The non-blocking methods are used by the work-stealing thread pool. Push returns a std::future object. When the m_done flag is true, tasks can no longer be popped from the queue.
TaskQueue.h
#pragma once

#include <condition_variable>
#include <deque>
#include <functional>
#include <future>
#include <memory>
#include <mutex>
#include <optional>

class TaskQueue
{
public:
    using TaskType = std::function<void()>;

    TaskQueue() = default;
    ~TaskQueue() = default;
    TaskQueue(TaskQueue&&) = default;
    TaskQueue& operator=(TaskQueue&&) = default;

    void SetDone(bool done)
    {
        {
            LockType lock{ m_mutex };
            m_done = done;
        }
        if (done)
            m_ready.notify_all();
    }

    auto IsDone() const
    {
        LockType lock{ m_mutex };
        return m_done;
    }

    auto WaitAndPop(TaskType& task)
    {
        LockType lock{ m_mutex };
        while (m_queue.empty() && !m_done)
            m_ready.wait(lock);
        if (!m_queue.empty() && !m_done)
        {
            task = std::move(m_queue.front());
            m_queue.pop_front();
            return true;
        }
        return false;
    }

    template<typename TTask>
    auto Push(TTask&& task) // -> std::future<decltype(task())>
    {
        using TaskReturnType = decltype(task());
        // std::packaged_task<> is move-only, but std::function requires a
        // copyable target, so we wrap it in a shared_ptr:
        auto packagedTask = std::make_shared<std::packaged_task<TaskReturnType()>>(std::forward<TTask>(task));
        auto future = packagedTask->get_future();
        {
            LockType lock{ m_mutex };
            m_queue.emplace_back([packagedTask] { (*packagedTask)(); });
        }
        m_ready.notify_one();
        return future;
    }

    auto TryPop(TaskType& task)
    {
        LockType lock{ m_mutex, std::try_to_lock };
        if (!lock || m_queue.empty() || m_done)
            return false;
        task = std::move(m_queue.front());
        m_queue.pop_front();
        return true;
    }

    template<typename TTask>
    auto TryPush(TTask&& task) -> std::optional<std::future<decltype(task())>>
    {
        using TaskReturnType = decltype(task());
        // std::packaged_task<> is move-only, but std::function requires a
        // copyable target, so we wrap it in a shared_ptr:
        auto packagedTask = std::make_shared<std::packaged_task<TaskReturnType()>>(std::forward<TTask>(task));
        auto future = packagedTask->get_future();
        {
            LockType lock{ m_mutex, std::try_to_lock };
            if (!lock)
                return {};
            m_queue.emplace_back([packagedTask]() { (*packagedTask)(); });
        }
        m_ready.notify_one();
        return future;
    }

private:
    using LockType = std::unique_lock<std::mutex>;

    std::deque<TaskType> m_queue;
    bool m_done{ false };
    mutable std::mutex m_mutex;
    std::condition_variable m_ready;

    TaskQueue(const TaskQueue&) = delete;
    TaskQueue& operator=(const TaskQueue&) = delete;
};
Work-stealing queue thread pool: each thread has its own task queue. The main idea of work stealing is to Pop/Push tasks on a queue that is not currently locked by another thread.
WorkStealingQueueThreadPool.h
#pragma once

#include "TaskQueue.h"

#include <algorithm>
#include <thread>
#include <vector>

class WorkStealingQueueThreadPool
{
public:
    explicit WorkStealingQueueThreadPool(size_t threadCount = std::max(2u, std::thread::hardware_concurrency()));
    ~WorkStealingQueueThreadPool();

    template<typename TaskT>
    auto ExecuteAsync(TaskT&& task)
    {
        const auto index = m_queueIndex++;
        for (size_t n = 0; n != m_queueCount * m_tryoutCount; ++n)
        {
            auto result = m_queues[(index + n) % m_queueCount].TryPush(std::forward<TaskT>(task));
            if (result.has_value())
                return std::move(*result);
        }
        return m_queues[index % m_queueCount].Push(std::forward<TaskT>(task));
    }

private:
    void Run(size_t queueIndex);

    std::vector<TaskQueue> m_queues;
    size_t m_queueIndex{ 0 };
    const size_t m_queueCount;
    const size_t m_tryoutCount{ 1 };
    std::vector<std::thread> m_threads;
};
WorkStealingQueueThreadPool::WorkStealingQueueThreadPool(size_t threadCount)
    : m_queues{ threadCount }
    , m_queueCount{ threadCount }
{
    for (size_t index = 0; index != threadCount; ++index)
        m_threads.emplace_back([this, index] { Run(index); });
}

WorkStealingQueueThreadPool::~WorkStealingQueueThreadPool()
{
    for (auto& queue : m_queues)
        queue.SetDone(true);
    for (auto& thread : m_threads)
        thread.join();
}

void WorkStealingQueueThreadPool::Run(size_t queueIndex)
{
    while (!m_queues[queueIndex].IsDone())
    {
        TaskQueue::TaskType task;
        for (size_t n = 0; n != m_queueCount * m_tryoutCount; ++n)
        {
            if (m_queues[(queueIndex + n) % m_queueCount].TryPop(task))
                break;
        }
        if (!task && !m_queues[queueIndex].WaitAndPop(task))
            return;
        task();
    }
}
1 Answer
TaskQueue
I'm not particularly fond of the name m_done. I would prefer it semantically inverted and called m_enabled. Also, it looks like m_done is never set from inside the queue, so you could use std::atomic<bool> for it and avoid a bunch of locking when you query the queue. It seems you already handle it changing dynamically and mid-processing.
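A minimal sketch of that change, with the class stripped down to just the flag machinery (DoneFlag is a hypothetical name used for illustration). Note that SetDone still takes the mutex before flipping the flag: a notify that races with a waiter between its predicate check and its block could otherwise be lost. The savings come from the query side, which no longer needs the lock at all:

```cpp
#include <atomic>
#include <condition_variable>
#include <mutex>

// Sketch: m_done as std::atomic<bool>, so queries skip the mutex.
class DoneFlag
{
public:
    void SetDone(bool done)
    {
        {
            // Still hold the mutex while storing, so a waiter cannot check
            // the flag, see false, and then miss the notification.
            std::lock_guard<std::mutex> lock{ m_mutex };
            m_done = done;
        }
        if (done)
            m_ready.notify_all();
    }

    bool IsDone() const
    {
        return m_done; // lock-free query
    }

private:
    std::atomic<bool> m_done{ false };
    mutable std::mutex m_mutex;
    std::condition_variable m_ready;
};
```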
This:

while (m_queue.empty() && !m_done)
    m_ready.wait(lock);

is equivalent to:

m_ready.wait(lock, [this] { return m_done || !m_queue.empty(); });
I see that you're using std::packaged_task, which is nice and all, but be aware that there are some subtle and annoying bugs with this class in Microsoft Visual Studio compilers. For example, see: std::packaged_task not breaking promises on destruction?.
In TryPush you are constructing the shared std::packaged_task before you acquire the lock. If you fail to get the lock, you destroy the task. But since the task argument was std::moved into the packaged_task, the caller (ExecuteAsync) must no longer use task. It is also unnecessary work. I would build the packaged task at the caller and simply take a reference to the shared_ptr in TryPush, to avoid both the unnecessary work and the undefined behaviour you have there. Worst case, you could end up executing a moved-from task.
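One possible sketch of that restructuring, reduced to a minimal stand-in class (SketchQueue is a hypothetical name; only the parts needed to show the signature change are included). Because TryPush receives the already-built task by shared_ptr, a failed try_lock leaves the caller's task intact and retryable:

```cpp
#include <deque>
#include <functional>
#include <future>
#include <memory>
#include <mutex>

// Sketch: the caller builds the std::packaged_task once; TryPush only
// copies a shared_ptr, so nothing is consumed on a failed try_lock.
class SketchQueue
{
public:
    template<typename R>
    bool TryPush(const std::shared_ptr<std::packaged_task<R()>>& packagedTask)
    {
        {
            std::unique_lock<std::mutex> lock{ m_mutex, std::try_to_lock };
            if (!lock)
                return false; // caller still owns a valid task and can retry
            m_queue.emplace_back([packagedTask] { (*packagedTask)(); });
        }
        m_ready.notify_one();
        return true;
    }

    bool TryPop(std::function<void()>& task)
    {
        std::unique_lock<std::mutex> lock{ m_mutex, std::try_to_lock };
        if (!lock || m_queue.empty())
            return false;
        task = std::move(m_queue.front());
        m_queue.pop_front();
        return true;
    }

private:
    std::deque<std::function<void()>> m_queue;
    std::mutex m_mutex;
    std::condition_variable m_ready;
};
```

A caller would do `auto pt = std::make_shared<std::packaged_task<int()>>(...)`, keep `pt->get_future()`, and offer `pt` to as many queues as it likes.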
WorkStealingQueueThreadPool
The name is a bit convoluted and has too many words. I'd look for something shorter.
The variable m_queueCount is useless. Remove it and simply use m_queues.size().
The join loop in the destructor:

for (auto& thread : m_threads)
    thread.join();

is necessary. I originally thought std::thread joined automatically on destruction, but my memory failed me: if a joinable std::thread is destroyed without being joined, std::terminate is called.
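A tiny illustration of that rule, joining explicitly just as the pool's destructor does:

```cpp
#include <thread>

// A joinable std::thread must be joined (or detached) before its destructor
// runs; otherwise the destructor calls std::terminate.
bool RunOnThread()
{
    bool ran = false;
    std::thread t([&ran] { ran = true; });
    t.join(); // removing this join would terminate the program at scope exit
    return ran;
}
```

(Since C++20, std::jthread joins automatically in its destructor, which sidesteps this pitfall.)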
Looking at ExecuteAsync here:

template<typename TaskT>
auto ExecuteAsync(TaskT&& task)
{
    const auto index = m_queueIndex++;
    for (size_t n = 0; n != m_queueCount * m_tryoutCount; ++n)
    {
        auto result = m_queues[(index + n) % m_queueCount].TryPush(std::forward<TaskT>(task));
        if (result.has_value())
            return std::move(*result);
    }
    return m_queues[index % m_queueCount].Push(std::forward<TaskT>(task));
}
There are a few problems, namely that you std::forward the value of task to TryPush inside a loop. If the universal reference task has bound to an r-value reference, then std::forward has the same effect as std::move, and thus task is not required to contain a valid task for n != 0. If it is then added to a queue for some n > 0 and subsequently executed, you have undefined behaviour. You must std::forward a universal reference exactly zero or one times.
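Combining the two suggestions (build the packaged task once at the caller, pass it by shared_ptr), here is a sketch of an ExecuteAsync that forwards task exactly once. MiniQueue and MiniPool are hypothetical stand-ins, not the author's classes, stripped to what the sketch needs:

```cpp
#include <deque>
#include <functional>
#include <future>
#include <memory>
#include <mutex>
#include <vector>

// Hypothetical stand-in queue: accepts an already-built packaged task.
struct MiniQueue
{
    template<typename R>
    bool TryPush(const std::shared_ptr<std::packaged_task<R()>>& pt)
    {
        std::unique_lock<std::mutex> lock{ m_mutex, std::try_to_lock };
        if (!lock)
            return false;
        m_queue.emplace_back([pt] { (*pt)(); });
        return true;
    }

    template<typename R>
    void Push(const std::shared_ptr<std::packaged_task<R()>>& pt)
    {
        std::lock_guard<std::mutex> lock{ m_mutex };
        m_queue.emplace_back([pt] { (*pt)(); });
    }

    bool TryPop(std::function<void()>& task)
    {
        std::lock_guard<std::mutex> lock{ m_mutex };
        if (m_queue.empty())
            return false;
        task = std::move(m_queue.front());
        m_queue.pop_front();
        return true;
    }

    std::deque<std::function<void()>> m_queue;
    std::mutex m_mutex;
};

struct MiniPool
{
    explicit MiniPool(size_t n) : m_queues(n) {}

    // `task` is forwarded exactly once, into the packaged_task; afterwards
    // only the (copyable) shared_ptr is offered to the queues.
    template<typename TaskT>
    auto ExecuteAsync(TaskT&& task)
    {
        using R = decltype(task());
        auto pt = std::make_shared<std::packaged_task<R()>>(std::forward<TaskT>(task));
        auto future = pt->get_future();

        const auto index = m_queueIndex++;
        for (size_t n = 0; n != m_queues.size(); ++n)
            if (m_queues[(index + n) % m_queues.size()].TryPush(pt))
                return future;

        m_queues[index % m_queues.size()].Push(pt); // blocking fallback
        return future;
    }

    std::vector<MiniQueue> m_queues;
    size_t m_queueIndex{ 0 };
};
```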
Closing remarks
In the current implementation I cannot see how this multi-queue could be more efficient than a correctly implemented single queue. But I would love to be proven wrong by benchmarks with source code.
Comments:
- Thanks for the review! I'll fix the bugs soon. Regarding std::thread behavior, please have a look at this. Regarding the benchmark: you can look here. The multi-queue is almost twice as fast for lightweight tasks (90 ms against 180 ms). If you have any idea how comprehensive and fully correct benchmarking can be done, please let me know. – Viktor, Jul 18, 2017
- @Viktor Hmm, my memory has failed me then. I could have sworn std::thread joined on destruction. As for your benchmark, unless you show the source I cannot trust it, as you might just as well have botched the implementation of any of the candidates. – Emily L., Jul 18, 2017
- @EmilyL. Many consider this nearly to be a defect in std::thread's design. See github.com/isocpp/CppCoreGuidelines/issues/925 – Maikel, Jul 18, 2017
- @Viktor I optimised the code for the single-queue thread pool. Results here, code here. Note that the memory allocation tests are pretty much useless, as 1) a smart compiler can remove the calls (clang does this), and 2) it's only a sys call, and the allocation will be really fast since there happens to be a suitably large memory area already available after the last call. Which is why the results are so close to the empty-task test; it essentially does nothing. – Emily L., Jul 20, 2017