I am interested in creating a class that manages a thread and executes SQL queries sequentially within it. The queries would be submitted from various parts of the program, and the results would be received via a promise. Could you please review my code for any potential issues?
#include <future>
#include <iostream>
#include <mutex>
#include <queue>
#include <string>
#include <thread>
#include <vector>
class Threader {
public:
Threader() noexcept
: worker{Threader::workerFn, std::cref(this->isWorking),
std::ref(this->cond), std::ref(this->todoMutex),
std::ref(this->todo)} {}
Threader(const Threader&) = delete;
Threader(Threader&&) = delete;
Threader& operator=(const Threader&) = delete;
Threader& operator=(Threader&&) = delete;
~Threader() {
this->isWorking = false;
this->cond.notify_one();
this->worker.join();
}
std::future<std::string> addWork(const std::string& arg) {
if (!this->isWorking) {
throw std::runtime_error{"Can't add work after finish"};
}
std::promise<std::string> promise;
std::future<std::string> future{promise.get_future()};
std::unique_lock l{this->todoMutex};
this->todo.emplace(arg, std::move(promise));
this->cond.notify_one();
return future;
}
private:
static std::string doSql(const std::string& query) noexcept {
try {
std::string output{};
// Send query to DB and get result (for simplicity - string result
// here); DB execution can throw
output = "Your result is: " + std::to_string(query.size());
return output;
} catch (...) {
return "Some error value";
}
}
static void workerFn(
const std::atomic<bool>& isWorking, std::condition_variable& cond,
std::mutex& todoMutex,
std::queue<std::pair<std::string, std::promise<std::string>>>&
todo) noexcept {
while (true) {
bool hasElements{};
std::string input;
{
std::unique_lock l{todoMutex};
hasElements = !todo.empty();
if (!hasElements) {
cond.wait_for(l, std::chrono::seconds(1));
hasElements = !todo.empty();
}
if (hasElements) {
input = std::move(todo.front().first);
}
}
if (hasElements) {
const auto result{Threader::doSql(input)};
{
std::scoped_lock l{todoMutex};
todo.front().second.set_value(result);
todo.pop();
}
} else if (!isWorking) {
break;
}
}
}
std::thread worker;
std::atomic<bool> isWorking{true};
std::mutex todoMutex;
std::condition_variable cond;
std::queue<std::pair<std::string, std::promise<std::string>>> todo;
};
int main() {
Threader th;
std::vector<std::future<std::string>> futures;
futures.push_back(th.addWork("SELECT * FROM something;"));
std::thread t([&th]() {
auto f{th.addWork("SELECT ...")};
std::cout << f.get() << std::endl;
});
t.join();
futures.push_back(th.addWork("INSERT INTO ...;"));
futures.push_back(th.addWork("UPDATE ..."));
for (auto& f : futures) {
std::cout << f.get() << std::endl;
}
return 0;
}
Expected output:
Your result is: 10
Your result is: 24
Your result is: 16
Your result is: 10
1 Answer 1
Separate concerns
Your Threader
class does too much at the same time: it manages a thread, a queue, and interacts with a databse. These things should be split up into separate classes and/or functions, in such a way that they can be combined together.
I recommend creating a thread-safe queue. This can be used to hold the arguments and promises. Then create a class that takes a function as an argument in its constructor, and will run a thread worker that calls that function on any work submitted to it. Your code could then look something like:
template<typename Return, typename... Args>
class Threader {
public:
Threader(std::function<Return(Args...)> function):
function(function), thread(Threader::worker, this) {}
std::future<Return> addWork(Args... args) {
std::promise<Return> promise;
auto future = promise.get_future();
queue.emplace(std::make_tuple(std::move(promise),
std::forward<Args>(args...)));
return future;
}
private:
void worker() {
while (auto item = queue.pop()) {
auto& [promise, args...] = *item;
promise.set_value(std::invoke(function, args...));
}
}
std::function<Return(Args...)> function;
std::jthread thread;
ThreadSafeQueue<std::tuple<std::promise<Return>, Args...>> queue;
};
Then your example code would look like:
static std::string doSql(const std::string& query) noexcept {
...
}
int main() {
Threader th(std::function(doSql));
std::vector<std::future<std::string>> futures;
futures.push_back(th.addWork("SELECT * FROM something;");
...
}
But now you can easily create another thread that calls a function other than doSql()
. Threader
is much simpler since it relies on a thread safe queue, which in turn is reuable for other things as well.
Don't mix atomics and mutexes
While isWorking
is atomic, it doesn't share the same atomic scope as anything protected by todoMutex
. This can lead to subtle problems. Just make isWorking
a non-atomic variable, and make sure you only access it with todoMutex
held.
The proper wait to wait for either a new element being pushed to the queue or the threader being destroyed is to write:
static void workerFn(...) {
while (true) {
std::pair<std::string, std::promise<std::string>> item;
{
std::lock_guard lock{todoMutex};
cond.wait(todoMutex, [&]{
return !todo.empty() || !isWorking;
});
if (todo.empty()) {
break;
}
item = std::move(todo.front());
todo.pop();
}
item.second.set_value(Threader::doSql(item.first));
}
}
Unnecessary check for isWorking
in addWork()
You are checking if isWorking
is false
in addWork()
, and throw an exception if so. But consider isWorking
is only set to false
in the destructor. At that point, no other threads should be using that instance of Threader
at all, because that would be undefined behavior. Think for example if they called it after the destructor had already fully completed.