7
\$\begingroup\$

I am trying to create golang inspired cancellable context in C++ for use in worker threads and other tasks. I have tried to avoid using raw pointers, but ever since I started writing C++ code, I always feel I'm not sure how my code is behaving. I want feedback on my implementation and potential errors.

Here's an overview of usage. The app has a root shutdown context, and each worker spawned from the main thread uses it. If a worker thread decides to create subworkers, these should be cancellable by the worker or by the root. Therefore the parent doesn't own the child; it's the worker that owns it. Cancellation propagates from top to bottom. A context should be shareable between multiple consumers on the same level.

This is my implementation which is inspired by the golang context package. Many const references were suggestions from the linter:

#pragma once
#include <atomic>
#include <condition_variable>
#include <memory>
#include <mutex>
#include <vector>

class Context : public std::enable_shared_from_this<Context> {
public:
    // Factory method to create a root context
    static std::shared_ptr<Context> createRoot() {
        return std::shared_ptr<Context>(new Context());
    }

    // Factory method to create a child context from a parent
    static std::shared_ptr<Context> fromParent(const std::shared_ptr<Context> &parent) {
        auto context = std::shared_ptr<Context>(new Context());
        if (parent) {
            parent->appendChild(context);
            context->parent = parent;
        }
        return context;
    }

    // Destructor
    ~Context() {
        cancel();
        detachFromParent();
    }

    // Initiates cancellation and propagates to children
    void cancel() {
        if (cancelled.exchange(true)) return; // Ensure cancellation happens only once
        std::vector<std::shared_ptr<Context>> childrenCopy;
        {
            std::lock_guard<std::mutex> lock(mtx);
            childrenCopy = children; // Copy children to avoid iterator invalidation
        }
        for (const auto &child: childrenCopy) {
            child->cancel();
        }
        cv.notify_all(); // Notify all waiting threads
    }

    // Checks if the context has been cancelled
    bool isCancelled() const {
        return cancelled.load();
    }

    // Waits until the context is cancelled
    void waitForCancel() {
        std::unique_lock<std::mutex> lock(mtx);
        cv.wait(lock, [&]() { return isCancelled(); });
    }

private:
    // Private constructor to enforce the use of factory methods
    Context() = default;

    // Adds a child context
    void appendChild(const std::shared_ptr<Context> &child) {
        std::lock_guard<std::mutex> lock(mtx);
        children.push_back(child);
    }

    // Removes a child context by raw pointer comparison
    void removeChild(const Context *child) {
        std::lock_guard<std::mutex> lock(mtx);
        for (auto it = children.begin(); it != children.end(); ++it) {
            if (it->get() == child) {
                children.erase(it);
                break;
            }
        }
    }

    // Detaches this context from its parent
    void detachFromParent() {
        if (auto parentPtr = parent.lock()) {
            parentPtr->removeChild(this);
        }
    }

    std::atomic<bool> cancelled{false}; // Atomic flag for cancellation state
    mutable std::mutex mtx; // Mutex for thread-safe operations
    std::condition_variable cv; // Condition variable for waiting on cancellation
    std::vector<std::shared_ptr<Context>> children; // Container for child contexts
    std::weak_ptr<Context> parent; // Weak pointer to parent to avoid circular references
};
asked Apr 1 at 11:38
\$\endgroup\$
0

3 Answers

3
\$\begingroup\$

Some of the ideas proposed by the other answer make sense, some do not. I'm going to correct a few of them and provide additional suggestions here.

You might want to make std::mutex mutable

The other answer suggests removing mutable from the std::mutex, but that misapplies the usual advice against mutable in one of the few situations where it actually makes sense.

Normally mutable is a code smell, unless the member it is applied to would otherwise force an operation that is read-only and thread-safe from the user's perspective to be non-const. Locking a std::mutex is always thread-safe, whether it happens in a const or a non-const member function: two threads attempting to lock it will not cause issues at the memory level, though it is still up to you to prevent deadlocks. A mutable std::mutex therefore does not break the semantics of const functions. Leaving mutable out can lead to something much worse, namely const_cast, which should basically never be used unless you have to call a function that incorrectly leaves out const (common in C APIs) and does not actually modify the underlying variable.

See more on this idea here: https://stackoverflow.com/questions/14131572/always-declare-stdmutex-as-mutable-in-c11

Your waitForCancel function is ostensibly a read-only operation from the user's perspective, and without a mutable std::mutex it cannot be made const (the std::condition_variable would need to be mutable as well, since wait() is a non-const member function). However, you never actually make waitForCancel const, so the way you currently use the std::mutex does not justify having made it mutable in the first place.
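To make the const-correctness argument concrete, here is a minimal sketch. CancelFlag and waitThroughConstRef are hypothetical names, and the class is a stripped-down stand-in rather than the Context from the question. It shows that both the mutex and the condition variable have to be mutable before waitForCancel can be const.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <thread>

// Hypothetical, simplified stand-in for the reviewed Context class.
class CancelFlag {
public:
    void cancel() {
        {
            std::lock_guard<std::mutex> lock(mtx);
            cancelled = true;
        }
        cv.notify_all();
    }

    // Logically read-only, so it can be const thanks to the mutable mutex.
    bool isCancelled() const {
        std::lock_guard<std::mutex> lock(mtx);
        return cancelled;
    }

    // Waiting does not change the observable state either.
    void waitForCancel() const {
        std::unique_lock<std::mutex> lock(mtx);
        cv.wait(lock, [this] { return cancelled; });
    }

private:
    bool cancelled = false;
    mutable std::mutex mtx;              // locking is not a logical mutation
    mutable std::condition_variable cv;  // wait() is a non-const member too
};

// Usage: a consumer that only holds a const reference can still wait.
inline bool waitThroughConstRef() {
    CancelFlag flag;
    const CancelFlag &view = flag;
    std::thread canceller([&flag] { flag.cancel(); });
    view.waitForCancel();
    canceller.join();
    assert(view.isCancelled());
    return view.isCancelled();
}
```

Without the two mutable keywords, neither const member function would compile.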

Don't inherit from jthread, that's just dead wrong.

In general, you don't inherit from a standard library type unless it is explicitly meant to be inherited from (std::exception, for example). std::jthread is not one of those types; see https://stackoverflow.com/questions/6806173/subclass-inherit-standard-containers and https://stackoverflow.com/questions/78159940/can-i-access-the-value-of-a-derived-class-member-in-a-function-running-in-a-std

jthread does not have a virtual destructor, don't inherit from it. If you wish to utilize it you need to have it as a member.

Consider making your class moveable

std::thread and std::jthread can be moved, however your class does not allow this because:

  • std::atomic cannot be moved
  • std::mutex cannot be moved
  • std::condition_variable cannot be moved.

This will make things very frustrating for users. The simplest and easiest way to make this possible is to wrap all of these objects in std::unique_ptr. To prevent bugs you'll also want to initialize these values.

 std::unique_ptr<std::atomic<bool>> cancelled = std::make_unique<std::atomic<bool>>(false); // Atomic flag for cancellation state
 mutable std::unique_ptr<std::mutex> mtx = std::make_unique<std::mutex>(); // Mutex for thread-safe operations
 std::unique_ptr<std::condition_variable> cv = std::make_unique<std::condition_variable>(); // Condition variable for waiting on cancellation

Technically there is some overhead for this. That is a consequence of how atomics, mutexes, and condition variables are modelled in C++: other threads refer to these objects by address, so it would not be possible to keep referring to the same object without pointers, and having unmovable types is the compromise. There is probably a way around wrapping everything in std::unique_ptr for some things (for example std::atomic_ref, which lets you apply atomic operations to a non-atomic value), but that's outside the scope of this answer.
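One possible variation on the idea above, sketched under the assumption that a single heap allocation is acceptable: the three unmovable members are bundled into one private struct behind a single std::unique_ptr. MovableCancelState, State and stateSurvivesMove are hypothetical names, not part of the reviewed code.

```cpp
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <memory>
#include <mutex>
#include <type_traits>
#include <utility>

class MovableCancelState {
public:
    void cancel() {
        {
            std::lock_guard<std::mutex> lock(state->mtx);
            state->cancelled = true;
        }
        state->cv.notify_all();
    }

    bool isCancelled() const { return state->cancelled.load(); }

private:
    // The unmovable pieces live on the heap; only the pointer is moved.
    struct State {
        std::atomic<bool> cancelled{false};
        std::mutex mtx;
        std::condition_variable cv;
    };
    std::unique_ptr<State> state = std::make_unique<State>();
};

static_assert(std::is_move_constructible_v<MovableCancelState>);
static_assert(!std::is_copy_constructible_v<MovableCancelState>);

// Usage: the cancellation state follows the object it was moved into.
inline bool stateSurvivesMove() {
    MovableCancelState a;
    a.cancel();
    MovableCancelState b = std::move(a);  // a now holds a null pointer
    assert(b.isCancelled());
    return b.isCancelled();
}
```

The price of this design is that a moved-from object holds a null pointer and must not be used again until it is assigned a new value.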

Consider replacing std::condition_variable with just using std::atomic if possible.

Since C++20, std::atomic itself provides wait, notify_one and notify_all. Combined with the flexibility of atomic variables, this is often easier to use than a condition variable. For example, you can reduce waitForCancel to the following, provided cancel() also calls cancelled.notify_all() after setting the flag:

// Waits until the context is cancelled
void waitForCancel() {
    // waits for cancelled to not be false
    cancelled.wait(false);
}

Condition variables are generally a code smell in C++20 and later; in most cases you can replace them with atomics.

answered Apr 2 at 19:03
\$\endgroup\$
5
\$\begingroup\$

Don't make mtx mutable

Making mtx mutable doesn't make sense; anything that takes the mutex also modifies some of the other non-mutable member variables, so this can never be done on a const object. In fact, the only const member function you have doesn't touch mtx at all.

removeChild() can be expensive

If a context has a lot of children, removeChild() can be expensive because it is an \$O(N)\$ operation. This is even more problematic because it locks the mutex for the whole operation. Consider using a std::unordered_set instead of a std::vector.
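A minimal sketch of a hashed child container. Note that it swaps in a std::unordered_map keyed by raw pointer rather than a std::unordered_set of shared pointers, because erasing from a set of std::shared_ptr by raw pointer would need a temporary key or transparent hashing. Node, ChildRegistry and registryExample are hypothetical names.

```cpp
#include <cassert>
#include <cstddef>
#include <memory>
#include <mutex>
#include <unordered_map>

struct Node {};  // hypothetical stand-in for Context

class ChildRegistry {
public:
    void add(const std::shared_ptr<Node> &child) {
        std::lock_guard<std::mutex> lock(mtx);
        children.emplace(child.get(), child);
    }

    // Average O(1) instead of a linear scan while holding the lock.
    void remove(const Node *child) {
        std::lock_guard<std::mutex> lock(mtx);
        children.erase(child);
    }

    std::size_t size() {
        std::lock_guard<std::mutex> lock(mtx);
        return children.size();
    }

private:
    std::mutex mtx;
    std::unordered_map<const Node *, std::shared_ptr<Node>> children;
};

// Usage: removing by raw pointer, as the destructor of a child would do.
inline void registryExample() {
    ChildRegistry registry;
    auto a = std::make_shared<Node>();
    auto b = std::make_shared<Node>();
    registry.add(a);
    registry.add(b);
    registry.remove(a.get());
    assert(registry.size() == 1);
    registry.remove(a.get());  // removing an unknown child is a no-op
    assert(registry.size() == 1);
}
```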

Use std::make_shared()

You can avoid some memory allocations by using std::make_shared() instead of std::shared_ptr(new ...).
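One caveat: std::make_shared() cannot call a private constructor, so it will not work with the factory functions exactly as they are written in the question. A common workaround is a passkey type, sketched below. PasskeyContext, Private and passkeyExample are hypothetical names; the constructor is public, but only code inside the class can create the Private argument it requires.

```cpp
#include <cassert>
#include <memory>

class PasskeyContext : public std::enable_shared_from_this<PasskeyContext> {
    // Passkey: only members of PasskeyContext can name and construct it.
    struct Private {
        explicit Private() = default;
    };

public:
    // Public so that std::make_shared can call it, but unusable from
    // outside because callers cannot create a Private.
    explicit PasskeyContext(Private) {}

    static std::shared_ptr<PasskeyContext> createRoot() {
        // A single allocation holds both the object and the control block.
        return std::make_shared<PasskeyContext>(Private{});
    }
};

// Usage: the factory is still the only way to obtain an instance.
inline void passkeyExample() {
    auto root = PasskeyContext::createRoot();
    assert(root->shared_from_this() == root);
}
```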

Consider making full use of std::jthread

Since C++20 there is std::jthread, which is a nicer version of std::thread. In particular, the thread function can receive a std::stop_token, which it can check to see if a stop has been requested (equivalent to isCancelled()). A stop is requested when the std::jthread is destroyed (much like your Context cancels itself in its destructor). So if you use it everywhere, you already get some form of tree. Consider:

#include <chrono>
#include <iostream>
#include <stop_token>
#include <thread>

using namespace std::chrono_literals;

void bar(std::stop_token stop_token) {
    while (!stop_token.stop_requested()) {
        std::cout << "bar\n";
        std::this_thread::sleep_for(200ms);
    }
}

void foo(std::stop_token stop_token) {
    std::jthread bar_thread(bar);
    while (!stop_token.stop_requested()) {
        std::cout << "foo\n";
        std::this_thread::sleep_for(2000ms);
    }
}

int main() {
    std::jthread foo_thread(foo);
    std::this_thread::sleep_for(1000ms);
}

Here, main() launches a thread that runs foo(), which in turn launches a thread that runs bar(). At the end of each scope, the std::jthread destructor requests a stop on its thread and then joins it. The program will terminate after about 2 seconds.

Note however that in that example, it will print bar ten times, because when main() requests foo_thread to be stopped, it will not immediately propagate that to bar_thread. However, we can do that manually, by creating a std::stop_callback that propagates the stop request as soon as it is received:

void foo(std::stop_token stop_token) {
    std::jthread bar_thread(bar);
    std::stop_callback stop_callback(stop_token, [&]{
        bar_thread.request_stop();
    });
    while (!stop_token.stop_requested()) {
        std::cout << "foo\n";
        std::this_thread::sleep_for(2000ms);
    }
}

Now it only prints bar 5 times. But you have to remember to add this callback everywhere, which is annoying. It would be nice if we automatically propagate stop tokens when creating child threads. So consider creating a class pjthread, which is a propagating std::jthread:

// Needs <functional> and <utility> in addition to the headers above.
class pjthread: public std::jthread {
    std::stop_callback<std::function<void()>> stop_callback;
public:
    template<class... Args>
    explicit pjthread(const std::stop_token& stop_token, Args&&... args)
        : std::jthread(std::forward<Args>(args)...)
        , stop_callback(stop_token, [this]{ this->request_stop(); })
    {}
};

(Note, the above class is just a short example, inheriting from std::jthread like this is legal C++ but has potential issues, as elaborated by Krupip.) Then you can write:

void foo(std::stop_token stop_token) {
    pjthread bar_thread(stop_token, bar);
    while (!stop_token.stop_requested()) {
        std::cout << "foo...\n";
        std::this_thread::sleep_for(2000ms);
    }
}

Note that you can use std::stop_token and std::stop_callback without threads as well, so I think it provides all the functionality of your Context, and just needs some function or class to make it a bit more convenient to use in a propagating tree.

answered Apr 1 at 20:04
\$\endgroup\$
6
  • \$\begingroup\$ Thanks for pointing out the jthreads they are exactly what I was looking for. As for remove child, I thought that removing a child was an infrequent operation. And I optimized for fast full array traversal for fast cancellation. \$\endgroup\$ Commented Apr 2 at 8:36
  • \$\begingroup\$ The frequency of child removal is directly related to the number of threads being stopped. So if your threads are long-lived, it is fine. But then you also don't cancel often, so fast array traversal is perhaps not a very useful optimization? \$\endgroup\$ Commented Apr 2 at 9:09
  • \$\begingroup\$ True. I have edited the post and added the new Worker implementation. This used to depend on Context; after switching to the stop_token I feel it makes more sense now? My goal is to make thread creation and management as transparent as possible. I saw that you suggested I inherit from std::jthread, but I want two-phase initialization: create the object, then run it. Why? Because in the application I have many threads that depend on each other, so I need to be able to allocate resources and do system checks before launching any component in the system. For example, graphics resources. \$\endgroup\$ Commented Apr 2 at 10:05
  • 1
    \$\begingroup\$ I'm confused why you say don't make mutex mutable. Mutex is protecting some read only operations, which semantically would appear to make sense as const, for example waitForCancel(). While maybe not a big deal in this specific example, mutex's lock operations being non const is one of the biggest sources of needing mutable on const-semantic-ed functionality, forcing objects that would otherwise be able to be used as const parameters as non const. this can lead to the even worse usage of const_cast \$\endgroup\$ Commented Apr 2 at 18:19
  • \$\begingroup\$ Also, is it even legal to inherit from jthread? Actually this answer has a lot of bad things about it stackoverflow.com/questions/78159940/… You're not supposed to use jthread like this, I'm making my own. \$\endgroup\$ Commented Apr 2 at 18:24
1
\$\begingroup\$

Thanks to @G. Sliepen for the helpful review and suggestions. I've updated the code accordingly to improve readability, efficiency, and maintainability.

Below is the revised version for future readers who may find this useful.

Feel free to suggest further improvements!

#pragma once
#include <exception>
#include <functional>
#include <iostream>
#include <optional>
#include <stdexcept>
#include <stop_token>
#include <thread>

class Worker {
public:
    virtual ~Worker() = default;
    Worker() = default;

    void start(const std::stop_token &parent_stop_token) {
        if (thread.joinable()) {
            throw std::runtime_error("Thread already running");
        }
        // Start the execution thread. std::jthread passes its own stop token
        // as an rvalue, so the lambda has to take it by value (a non-const
        // lvalue reference parameter would not compile).
        thread = std::jthread([this](std::stop_token stop_token) {
            try {
                run(stop_token);
            } catch (const std::exception &e) {
                std::cerr << "[Worker] Exception: " << e.what() << "\n";
            }
        });
        // Forward a stop request from the parent to this worker's thread
        stop_callback.emplace(parent_stop_token, [this]() {
            thread.request_stop();
        });
    }

    void stop() {
        if (thread.joinable()) {
            thread.request_stop();
        }
    }

    virtual void run(std::stop_token stop_token) = 0;

    Worker(const Worker &) = delete;
    Worker &operator=(const Worker &) = delete;
    Worker(Worker &&) = delete;
    Worker &operator=(Worker &&) = delete;

private:
    std::jthread thread;
    std::optional<std::stop_callback<std::function<void()>>> stop_callback;
};
answered Apr 2 at 11:50
\$\endgroup\$
1
  • 2
    \$\begingroup\$ Normally you make a new question when you implement improvements, and you typically wait a while so that people can trickle in and give other suggestions on the initial post. \$\endgroup\$ Commented Apr 2 at 18:15
