\$\begingroup\$
\$\endgroup\$
I think I made a mistake trying to start from the simplest version and working forward. This time I will present the final version, collect all the fixes suggested here and incorporate them into the code; then I can reverse-engineer the steps to get there.
The Parts of the server are:
This post deals only with JobQueue.
I know I should use `std::jthread`. Unfortunately, Apple's C++ compiler does not support `std::jthread` yet.
JobQueue.h
#ifndef THORSANVIL_NISSE_JOB_QUEUE_H
#define THORSANVIL_NISSE_JOB_QUEUE_H
/*
* The class that holds all the background threads and work that the threads will do.
*
* Constructor creates all the child threads.
* New jobs are added via `addJob()` and are then executed ASAP by one of the threads.
*/
#include "NisseConfig.h"
#include <queue>
#include <vector>
#include <functional>
#include <optional>
#include <thread>
#include <mutex>
#include <condition_variable>
namespace ThorsAnvil::Nisse
{
// A unit of work: any callable that takes no arguments and returns nothing.
using Work = std::function<void()>;

/*
 * A pool of worker threads fed from one FIFO queue of jobs.
 *
 * The constructor starts `workerCount` threads, each running processWork().
 * addJob() queues a job and wakes one worker.
 * stop() flags shutdown, wakes every worker and joins them; jobs still waiting
 * in the queue at that point are not started. The destructor calls stop().
 */
class JobQueue
{
    public:
        JobQueue(int workerCount);
        ~JobQueue();

        void addJob(Work&& action);
        void stop();

    private:
        // Blocks until a job is available or shutdown was requested.
        std::optional<Work> getNextJob();
        // Entry point of every worker thread.
        void                processWork();
        // Sets `finished` while holding workMutex.
        void                markFinished();

        std::vector<std::thread>    workers;    // The worker threads.
        std::mutex                  workMutex;  // Held while touching workQueue / setting finished.
        std::condition_variable     workCV;     // Signalled on new work and on shutdown.
        std::queue<Work>            workQueue;  // Pending jobs, oldest first.
        bool                        finished;   // Set by stop(); tells the workers to exit.
};
}
#endif
JobQueue.cpp
#include "JobQueue.h"
#include <ThorsLogging/ThorsLogging.h>
#include <utility>
using namespace ThorsAnvil::Nisse;
JobQueue::JobQueue(int workerCount)
: finished(false)
{
try
{
for (int loop = 0; loop < workerCount; ++loop) {
workers.emplace_back(&JobQueue::processWork, this);
}
}
catch (...)
{
stop();
throw;
}
}
/*
 * Shut the queue down before the members are destroyed:
 * stop() flags shutdown, wakes every worker and joins them.
 */
JobQueue::~JobQueue()
{
    this->stop();
}
/*
 * Queue a job and wake one worker to run it.
 *
 * action:  The callable to execute. It is moved into the queue, so the
 *          caller's object is left in a moved-from state.
 */
void JobQueue::addJob(Work&& action)
{
    std::unique_lock lock(workMutex);
    // `action` is a named rvalue reference and therefore an lvalue here:
    // without std::move the std::function (and everything it captured)
    // would be copied into the queue instead of moved.
    workQueue.emplace(std::move(action));
    workCV.notify_one();
}
/*
 * Record that the queue is shutting down.
 * The flag is written while holding workMutex; the lock is released on return.
 */
void JobQueue::markFinished()
{
    const std::lock_guard<std::mutex> guard(workMutex);
    finished = true;
}
void JobQueue::stop()
{
markFinished();
workCV.notify_all();
for (auto& w: workers) {
w.join();
}
workers.clear();
}
std::optional<Work> JobQueue::getNextJob()
{
std::unique_lock lock(workMutex);
workCV.wait(lock, [&](){return !workQueue.empty() || finished;});
if (workQueue.empty() || finished) {
return {};
}
Work work = std::move(workQueue.front());
workQueue.pop();
return work;
}
/*
 * Body of every worker thread: fetch and run jobs until shutdown.
 *
 * Exceptions escaping a job are logged and swallowed so that one failing job
 * neither kills the worker nor terminates the process.
 */
void JobQueue::processWork()
{
    // Loop on the result of getNextJob() rather than reading `finished` here.
    // `finished` is written under workMutex by markFinished(), so reading it
    // without the lock would be a data race. getNextJob() returns an empty
    // optional only once it has seen `finished` while holding the lock, so an
    // empty result is exactly the shutdown signal.
    while (std::optional<Work> work = getNextJob())
    {
        try
        {
            (*work)();
        }
        catch (std::exception const& e)
        {
            ThorsLogWarning("ThorsAnvil::Nissa::JobQueue::", "processWork", "Work Exception: ", e.what());
        }
        catch (...)
        {
            ThorsLogWarning("ThorsAnvil::Nissa::JobQueue::", "processWork", "Work Exception: Unknown");
        }
    }
}
Loki AstariLoki Astari
asked Oct 15, 2024 at 5:33
lang-cpp