\$\begingroup\$

I think I made a mistake by starting from the simplest version and working forward. This time I will present the final version, collect all the fixes from here and incorporate them into the code, and then reverse engineer the steps to get there.

The Parts of the server are:

This post deals only with JobQueue.

I know I should use std::jthread. Unfortunately, Apple's C++ compiler does not support std::jthread yet.
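As a stop-gap, a small wrapper that joins in its destructor covers the part of std::jthread I would rely on here. This is only a sketch: `JoiningThread` and `sumInBackground` are hypothetical names, not part of Nisse, and unlike std::jthread the wrapper has no stop_token support, so the queue would still need its own `finished` flag and `notify_all()` before the join.

```cpp
#include <thread>
#include <utility>

// Hypothetical stand-in for std::jthread: joins on destruction.
// It does NOT provide a stop_token or request_stop().
class JoiningThread
{
    std::thread worker;
    public:
        template<typename F, typename... Args>
        explicit JoiningThread(F&& f, Args&&... args)
            : worker(std::forward<F>(f), std::forward<Args>(args)...)
        {}
        JoiningThread(JoiningThread&&) noexcept             = default;
        JoiningThread& operator=(JoiningThread&&)           = delete;
        JoiningThread(JoiningThread const&)                 = delete;
        JoiningThread& operator=(JoiningThread const&)      = delete;
        ~JoiningThread()
        {
            // A moved-from std::thread is not joinable, so this is safe after a move.
            if (worker.joinable()) {
                worker.join();
            }
        }
};

// Small demonstration: the result is only read after the wrapper has joined.
int sumInBackground(int a, int b)
{
    int result = 0;
    {
        JoiningThread t([&result, a, b]() { result = a + b; });
    }   // destructor joins here
    return result;
}
```

With this, `std::vector<JoiningThread>` could replace `std::vector<std::thread>`, and the explicit join loop in `stop()` would reduce to clearing the vector after the workers have been notified.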

JobQueue.h

#ifndef THORSANVIL_NISSE_JOB_QUEUE_H
#define THORSANVIL_NISSE_JOB_QUEUE_H

/*
 * The class that holds all the background threads and the work that the threads will do.
 *
 * The constructor creates all the child threads.
 * New jobs are added via `addJob()` and are then executed ASAP by one of the threads.
 */

#include "NisseConfig.h"
#include <queue>
#include <vector>
#include <functional>
#include <optional>
#include <thread>
#include <mutex>
#include <condition_variable>

namespace ThorsAnvil::Nisse
{

using Work = std::function<void()>;

class JobQueue
{
    std::vector<std::thread>    workers;
    std::mutex                  workMutex;
    std::condition_variable     workCV;
    std::queue<Work>            workQueue;
    bool                        finished;

    public:
        JobQueue(int workerCount);
        ~JobQueue();

        void addJob(Work&& action);
        void stop();

    private:
        std::optional<Work> getNextJob();
        void                processWork();
        void                markFinished();
};

}

#endif

JobQueue.cpp

#include "JobQueue.h"
#include <ThorsLogging/ThorsLogging.h>
#include <utility>  // std::move

using namespace ThorsAnvil::Nisse;

JobQueue::JobQueue(int workerCount)
    : finished(false)
{
    try
    {
        for (int loop = 0; loop < workerCount; ++loop) {
            workers.emplace_back(&JobQueue::processWork, this);
        }
    }
    catch (...)
    {
        stop();
        throw;
    }
}

JobQueue::~JobQueue()
{
    stop();
}
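
void JobQueue::addJob(Work&& action)
{
    std::unique_lock lock(workMutex);
    // `action` is a named rvalue reference, so it must be moved explicitly;
    // without std::move the std::function would be copied into the queue.
    workQueue.emplace(std::move(action));
    workCV.notify_one();
}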

void JobQueue::markFinished()
{
    std::unique_lock lock(workMutex);
    finished = true;
}

void JobQueue::stop()
{
    markFinished();
    workCV.notify_all();
    for (auto& w: workers) {
        w.join();
    }
    workers.clear();
}

std::optional<Work> JobQueue::getNextJob()
{
    std::unique_lock lock(workMutex);
    workCV.wait(lock, [&](){return !workQueue.empty() || finished;});

    if (workQueue.empty() || finished) {
        return {};
    }
    Work work = std::move(workQueue.front());
    workQueue.pop();
    return work;
}

void JobQueue::processWork()
{
    // getNextJob() returns an empty optional only once `finished` has been set,
    // so looping on its result avoids reading `finished` without holding workMutex.
    while (std::optional<Work> work = getNextJob())
    {
        try
        {
            (*work)();
        }
        catch (std::exception const& e)
        {
            ThorsLogWarning("ThorsAnvil::Nisse::JobQueue::", "processWork", "Work Exception: ", e.what());
        }
        catch (...)
        {
            ThorsLogWarning("ThorsAnvil::Nisse::JobQueue::", "processWork", "Work Exception: Unknown");
        }
    }
}
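
For reviewers who want to try this without the rest of the server, here is a rough usage sketch. `MiniJobQueue` is a hypothetical condensed copy of the class above with the ThorsLogging calls removed so that it compiles on its own, and `runJobs` is an illustrative helper, not part of Nisse. Note that it waits for the jobs to complete before the queue is destroyed: as written, `getNextJob()` returns nothing once `finished` is set, so `stop()` abandons any work still in the queue.

```cpp
#include <condition_variable>
#include <functional>
#include <mutex>
#include <optional>
#include <queue>
#include <thread>
#include <utility>
#include <vector>

using Work = std::function<void()>;

// Hypothetical condensed copy of JobQueue (no logging) so the sketch stands alone.
class MiniJobQueue
{
    std::vector<std::thread>    workers;
    std::mutex                  workMutex;
    std::condition_variable     workCV;
    std::queue<Work>            workQueue;
    bool                        finished = false;

    std::optional<Work> getNextJob()
    {
        std::unique_lock lock(workMutex);
        workCV.wait(lock, [&]() { return !workQueue.empty() || finished; });
        if (finished) {
            return {};
        }
        Work work = std::move(workQueue.front());
        workQueue.pop();
        return work;
    }
    void processWork()
    {
        while (std::optional<Work> work = getNextJob()) {
            try { (*work)(); } catch (...) { /* the real class logs a warning here */ }
        }
    }
    public:
        explicit MiniJobQueue(int workerCount)
        {
            try {
                for (int loop = 0; loop < workerCount; ++loop) {
                    workers.emplace_back(&MiniJobQueue::processWork, this);
                }
            }
            catch (...) {
                stop();
                throw;
            }
        }
        ~MiniJobQueue() { stop(); }
        void addJob(Work&& action)
        {
            std::unique_lock lock(workMutex);
            workQueue.emplace(std::move(action));
            workCV.notify_one();
        }
        void stop()
        {
            {
                std::unique_lock lock(workMutex);
                finished = true;
            }
            workCV.notify_all();
            for (auto& w : workers) { w.join(); }
            workers.clear();
        }
};

// Runs `jobCount` trivial jobs on `workerCount` threads and returns how many ran.
int runJobs(int workerCount, int jobCount)
{
    std::mutex              doneMutex;
    std::condition_variable doneCV;
    int                     done = 0;

    MiniJobQueue queue(workerCount);
    for (int loop = 0; loop < jobCount; ++loop) {
        queue.addJob([&]() {
            std::unique_lock lock(doneMutex);
            ++done;
            doneCV.notify_one();
        });
    }
    // Wait for completion first: stop() abandons anything still queued.
    std::unique_lock lock(doneMutex);
    doneCV.wait(lock, [&]() { return done == jobCount; });
    return done;
}
```

Calling `runJobs(4, 100)` should report 100 completed jobs. Whether `stop()` ought to drain the queue instead of dropping pending work is a design choice I have not settled on.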
asked Oct 15, 2024 at 5:33
\$\endgroup\$
