4
\$\begingroup\$

I am interested in creating a class that manages a thread and executes SQL queries sequentially within it. The queries would be submitted from various parts of the program, and the results would be received via a promise. Could you please review my code for any potential issues?

#include <future>
#include <iostream>
#include <mutex>
#include <queue>
#include <string>
#include <thread>
#include <vector>
class Threader {
 public:
 Threader() noexcept
 : worker{Threader::workerFn, std::cref(this->isWorking),
 std::ref(this->cond), std::ref(this->todoMutex),
 std::ref(this->todo)} {}
 Threader(const Threader&) = delete;
 Threader(Threader&&) = delete;
 Threader& operator=(const Threader&) = delete;
 Threader& operator=(Threader&&) = delete;
 ~Threader() {
 this->isWorking = false;
 this->cond.notify_one();
 this->worker.join();
 }
 std::future<std::string> addWork(const std::string& arg) {
 if (!this->isWorking) {
 throw std::runtime_error{"Can't add work after finish"};
 }
 std::promise<std::string> promise;
 std::future<std::string> future{promise.get_future()};
 std::unique_lock l{this->todoMutex};
 this->todo.emplace(arg, std::move(promise));
 this->cond.notify_one();
 return future;
 }
 private:
 static std::string doSql(const std::string& query) noexcept {
 try {
 std::string output{};
 // Send query to DB and get result (for simplicity - string result
 // here); DB execution can throw
 output = "Your result is: " + std::to_string(query.size());
 return output;
 } catch (...) {
 return "Some error value";
 }
 }
 static void workerFn(
 const std::atomic<bool>& isWorking, std::condition_variable& cond,
 std::mutex& todoMutex,
 std::queue<std::pair<std::string, std::promise<std::string>>>&
 todo) noexcept {
 while (true) {
 bool hasElements{};
 std::string input;
 {
 std::unique_lock l{todoMutex};
 hasElements = !todo.empty();
 if (!hasElements) {
 cond.wait_for(l, std::chrono::seconds(1));
 hasElements = !todo.empty();
 }
 if (hasElements) {
 input = std::move(todo.front().first);
 }
 }
 if (hasElements) {
 const auto result{Threader::doSql(input)};
 {
 std::scoped_lock l{todoMutex};
 todo.front().second.set_value(result);
 todo.pop();
 }
 } else if (!isWorking) {
 break;
 }
 }
 }
 std::thread worker;
 std::atomic<bool> isWorking{true};
 std::mutex todoMutex;
 std::condition_variable cond;
 std::queue<std::pair<std::string, std::promise<std::string>>> todo;
};
int main() {
 Threader th;
 std::vector<std::future<std::string>> futures;
 futures.push_back(th.addWork("SELECT * FROM something;"));
 std::thread t([&th]() {
 auto f{th.addWork("SELECT ...")};
 std::cout << f.get() << std::endl;
 });
 t.join();
 futures.push_back(th.addWork("INSERT INTO ...;"));
 futures.push_back(th.addWork("UPDATE ..."));
 for (auto& f : futures) {
 std::cout << f.get() << std::endl;
 }
 return 0;
}

Expected output:

Your result is: 10
Your result is: 24
Your result is: 16
Your result is: 10

https://godbolt.org/z/jPhs5Mvj7

Toby Speight
87.8k14 gold badges104 silver badges325 bronze badges
asked Mar 14 at 14:33
\$\endgroup\$

1 Answer 1

4
\$\begingroup\$

Separate concerns

Your Threader class does too much at the same time: it manages a thread, a queue, and interacts with a databse. These things should be split up into separate classes and/or functions, in such a way that they can be combined together.

I recommend creating a thread-safe queue. This can be used to hold the arguments and promises. Then create a class that takes a function as an argument in its constructor, and will run a thread worker that calls that function on any work submitted to it. Your code could then look something like:

template<typename Return, typename... Args>
class Threader {
public:
 Threader(std::function<Return(Args...)> function):
 function(function), thread(Threader::worker, this) {}
 std::future<Return> addWork(Args... args) {
 std::promise<Return> promise;
 auto future = promise.get_future();
 queue.emplace(std::make_tuple(std::move(promise),
 std::forward<Args>(args...)));
 return future;
 }
private:
 void worker() {
 while (auto item = queue.pop()) {
 auto& [promise, args...] = *item;
 promise.set_value(std::invoke(function, args...));
 }
 }
 std::function<Return(Args...)> function;
 std::jthread thread;
 ThreadSafeQueue<std::tuple<std::promise<Return>, Args...>> queue;
};

Then your example code would look like:

static std::string doSql(const std::string& query) noexcept {
 ...
}
int main() {
 Threader th(std::function(doSql));
 std::vector<std::future<std::string>> futures;
 futures.push_back(th.addWork("SELECT * FROM something;");
 ...
}

But now you can easily create another thread that calls a function other than doSql(). Threader is much simpler since it relies on a thread safe queue, which in turn is reuable for other things as well.

Don't mix atomics and mutexes

While isWorking is atomic, it doesn't share the same atomic scope as anything protected by todoMutex. This can lead to subtle problems. Just make isWorking a non-atomic variable, and make sure you only access it with todoMutex held.

The proper wait to wait for either a new element being pushed to the queue or the threader being destroyed is to write:

static void workerFn(...) {
 while (true) {
 std::pair<std::string, std::promise<std::string>> item;
 {
 std::lock_guard lock{todoMutex};
 cond.wait(todoMutex, [&]{
 return !todo.empty() || !isWorking;
 });
 if (todo.empty()) {
 break;
 }
 item = std::move(todo.front());
 todo.pop();
 }
 item.second.set_value(Threader::doSql(item.first));
 }
}

Unnecessary check for isWorking in addWork()

You are checking if isWorking is false in addWork(), and throw an exception if so. But consider isWorking is only set to false in the destructor. At that point, no other threads should be using that instance of Threader at all, because that would be undefined behavior. Think for example if they called it after the destructor had already fully completed.

answered Mar 14 at 23:06
\$\endgroup\$

Your Answer

Draft saved
Draft discarded

Sign up or log in

Sign up using Google
Sign up using Email and Password

Post as a guest

Required, but never shown

Post as a guest

Required, but never shown

By clicking "Post Your Answer", you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.