note: This is an updated design after a previous review round.
I have written a class that handles listeners registering callbacks to receive messages. Registered callbacks can be replaced or removed. Code is multithreaded in that any thread can broadcast to the listeners, and any thread can add, replace or remove listeners.
For this second round, I have made a list of requirements for the class:
- there may be zero, one or multiple listeners
- listeners can be registered or deregistered from different threads
- each listener should be called exactly once for each message
- all listeners should receive all messages
- messages may originate from multiple sources in different threads
- each message should be forwarded to all listeners before the next message is processed (messages are not sent in parallel)
- each listener should receive the messages in the same order, i.e., the order in which they are provided to the broadcaster (from the view of the broadcaster)
- listeners can be removed or replaced
- listeners are uniquely identified by a cookie that remains valid if the listener is not deregistered
- the cookie remains valid if the listener is replaced
- anyone with the cookie should be able to deregister or replace the listener
- listeners should never be called again when deregistration completes
- the old listener should never be called again when replace completes
- listeners should be able to deregister or replace themselves when invoked, listeners do not need to be able to register more callbacks
- listeners are noexcept, so code shouldn’t deal with exceptions thrown by listeners
- trying to remove or replace the callback associated with an expired cookie is not an error, the operation should simply do nothing, and report to the caller that nothing was done.
- adding, replacing and removing listeners happens much less frequently than broadcasting.
A further consideration that is not enforced in code:
- Users are expected to read documentation that states which functions are safe to call from inside a listener, and which must not. (Ideal would be a way to check if a function is called from a listener, so we can error in that case. I can’t figure out a way to do that. How to distinguish the case where the lock in, e.g., add_listener can’t be acquired because we’re calling from the callback (can’t call lock, would deadlock), vs where it can’t be acquired because callbacks are running concurrently (calling lock is correct in that case)?)
Implementation
#include <vector>
#include <map>
#include <utility>
#include <optional>
#include <mutex>
#include <functional>
#include <type_traits>
// usage notes:
// 1. This class provides no lifetime management of the link, which means
// the user is responsible to ensure that either
// a. the registered callback outlives this broadcaster, or
// b. they revoke their callbacks before destroying any resource it
// points to.
// 2. None of the member functions of this class may be called from
// inside an executing listener, except
// replace_callback_from_inside_callback() and
// remove_listener_from_inside_listener(). Calling them anyway will
// result in deadlock.
// 3. replace_callback_from_inside_callback() and
// remove_listener_from_inside_listener() may _only_ be called from
// inside an executing listener.
template <class... Message>
class Broadcaster
{
public:
using listener = std::function<void(Message...)>;
using cookieType = int32_t;
// should never be called from inside a running listener
template <class F> requires (std::is_nothrow_invocable_r_v<void, F, Message...>)
[[nodiscard]] cookieType add_listener(F&& r_)
{
auto l = lock();
_targets.emplace_back(listener{ std::forward<F>(r_) });
_target_registry[++_lastID] = _targets.size() - 1;
return _lastID;
}
// should never be called from inside a running listener,
// in that case, use replace_listener_from_inside_listener()
template <class F> requires (std::is_nothrow_invocable_r_v<void, F, Message...>)
bool replace_listener(cookieType c_, F&& r_)
{
auto l = lock();
return replace_listener_impl(c_, listener{ std::forward<F>(r_) });
}
// should never be called from inside a running listener,
// in that case, use remove_listener_from_inside_listener()
bool remove_listener(cookieType c_)
{
auto l = lock();
return remove_listener_impl(c_);
}
void notify_all(const Message&... msg_) noexcept
{
auto l = lock();
// invoke all listeners with message
for (const auto& r: _targets)
r(msg_...);
// execute any updates to registered listeners
if (!_mutation_list.empty())
{
for (auto&& [cookie, r] : _mutation_list)
{
if (r.has_value())
replace_listener_impl(cookie, std::move(*r));
else
remove_listener_impl(cookie);
}
_mutation_list.clear();
}
}
// !! should only be called from inside a running listener!
template <class F> requires (std::is_nothrow_invocable_r_v<void, F, Message...>)
void replace_listener_from_inside_listener(cookieType c_, F&& r_)
{
_mutation_list.emplace_back(c_, listener{ std::forward<F>(r_) });
}
// !! should only be called from inside a running listener!
void remove_listener_from_inside_listener(cookieType c_)
{
_mutation_list.emplace_back(c_, std::nullopt);
}
private:
[[nodiscard]] auto lock()
{
return std::unique_lock<std::mutex>(_mut);
}
bool replace_listener_impl(cookieType c_, listener&& r_)
{
auto index = get_target_index(c_);
if (index)
{
// valid slot, update listener stored in it
auto it = _targets.begin() + *index;
*it = std::move(r_);
return true;
}
return false;
}
bool remove_listener_impl(cookieType c_)
{
auto index = get_target_index(c_);
if (index)
{
// valid slot, remove listener stored in it
auto it = _targets.begin() + *index;
_targets.erase(it);
// fixup slot registry, items past deleted one just moved one idx back
for (auto& [cookie, idx]: _target_registry)
{
if (idx > *index)
--idx;
}
// remove entry from slot registry
_target_registry.erase(c_);
return true;
}
return false;
}
std::optional<size_t> get_target_index(cookieType c_) const
{
auto map_it = _target_registry.find(c_);
if (map_it == _target_registry.end())
// cookie not found in registry
return {};
return map_it->second;
}
private:
std::mutex _mut;
std::map<cookieType, size_t> _target_registry;
cookieType _lastID = -1;
std::vector<listener> _targets;
std::vector<std::pair<cookieType, std::optional<listener>>> _mutation_list;
};
Example usage
#include <iostream>
#include <string>
#include <functional>
void freeTestFunction(std::string msg_) noexcept
{
std::cout << "from freeTestFunction: " << msg_ << std::endl;
}
struct test
{
using StringBroadcaster = Broadcaster<std::string>;
void simpleCallback(std::string msg_) noexcept
{
std::cout << "from simpleCallback: " << msg_ << std::endl;
}
void oneShotCallback(std::string msg_) noexcept
{
std::cout << "from oneShotCallback: " << msg_ << std::endl;
_broadcast.remove_listener_from_inside_listener(_cb_oneShot_cookie);
}
void twoStepCallback_step1(std::string msg_) noexcept
{
std::cout << "from twoStepCallback_step1: " << msg_ << std::endl;
// replace callback (so don't request to delete through return argument!)
_broadcast.replace_listener_from_inside_listener(_cb_twostep_cookie, [&](auto fr_) noexcept { twoStepCallback_step2(fr_); });
}
void twoStepCallback_step2(std::string msg_) noexcept
{
std::cout << "from twoStepCallback_step2: " << msg_ << std::endl;
}
void runExample()
{
auto cb_simple_cookie = _broadcast.add_listener([&](auto fr_) noexcept { simpleCallback(fr_); });
_cb_oneShot_cookie = _broadcast.add_listener([&](auto fr_) noexcept { oneShotCallback(fr_); });
_cb_twostep_cookie = _broadcast.add_listener([&](auto fr_) noexcept { twoStepCallback_step1(fr_); });
auto free_func_cookie = _broadcast.add_listener(&freeTestFunction);
_broadcast.notify_all("message 1"); // should be received by simpleCallback, oneShotCallback, twoStepCallback_step1, freeTestFunction
_broadcast.remove_listener(free_func_cookie);
_broadcast.notify_all("message 2"); // should be received by simpleCallback and twoStepCallback_step2
_broadcast.remove_listener(cb_simple_cookie);
_broadcast.notify_all("message 3"); // should be received by twoStepCallback_step2
_broadcast.remove_listener(_cb_twostep_cookie);
_broadcast.notify_all("message 4"); // should be received by none
}
StringBroadcaster _broadcast;
StringBroadcaster::cookieType _cb_oneShot_cookie;
StringBroadcaster::cookieType _cb_twostep_cookie;
};
int main(int argc, char **argv)
{
test t;
t.runExample();
return 0;
}
-
\$\begingroup\$ "one or multiple listeners"? Does that mean that there can't be zero listeners, or did you forget to include that in the requirements? \$\endgroup\$Toby Speight– Toby Speight2021年01月08日 16:52:13 +00:00Commented Jan 8, 2021 at 16:52
-
1\$\begingroup\$ @TobySpeight Ah, right. yes there could also be zero, and that should work fine (it does, was in the test case already). Thanks, i've corrected the text. \$\endgroup\$Diederick C. Niehorster– Diederick C. Niehorster2021年01月08日 21:05:17 +00:00Commented Jan 8, 2021 at 21:05
1 Answer 1
I have just one suggestion but you should evaluate in terms of performance. What I mean is that you should do a performance test with your current code and then with the new changes proposed to see if you gain.
You are using a lock() function inside the notify_all function.
void notify_all(const Message&... msg_) noexcept
{
auto l = lock();
// invoke all listeners with message
for (const auto& r: _targets)
r(msg_...);
// execute any updates to registered listeners
if (!_mutation_list.empty())
{
for (auto&& [cookie, r] : _mutation_list)
{
if (r.has_value())
replace_listener_impl(cookie, std::move(*r));
else
remove_listener_impl(cookie);
}
_mutation_list.clear();
}
}
The suggested change is to move the scope of that lock the close to where you need to lock it, for example close to the replace_listener_impl and remove_listener_impl.
void notify_all(const Message&... msg_) noexcept
{
// invoke all listeners with message
for (const auto& r: _targets)
r(msg_...);
// execute any updates to registered listeners
if (!_mutation_list.empty())
{
auto l = lock(); // First option
for (auto&& [cookie, r] : _mutation_list)
{
if (r.has_value())
replace_listener_impl(cookie, std::move(*r));
else
remove_listener_impl(cookie);
}
_mutation_list.clear();
}
}
Another alternative is to move the lock() calls that you have on the functions replace_listener and remove_listener to the _impl functions.
template <class F> requires (std::is_nothrow_invocable_r_v<void, F, Message...>)
bool replace_listener(cookieType c_, F&& r_)
{
// Remove auto l = lock();
return replace_listener_impl(c_, listener{ std::forward<F>(r_) });
}
Move it to replace_listener_impl
bool replace_listener_impl(cookieType c_, listener&& r_)
{
auto l = lock(); // Added
auto index = get_target_index(c_);
if (index)
{
// valid slot, update listener stored in it
auto it = _targets.begin() + *index;
*it = std::move(r_);
return true;
}
return false;
}
In any of the options that you will choose you should conduct a performance test to see what is more suitable for you, may be is slower or not, so better get the numbers so you have the evidence of what is better for you.
Hope it helps
-
\$\begingroup\$ Thanks for the comment! I can't move the lock, or it wouldn't be safe anymore. That is, if i move the lock down in
notify_all()
, or to the_impl
function, the vector of listeners could be changed while iterating over it, which would break stuff. Do i see that correctly? \$\endgroup\$Diederick C. Niehorster– Diederick C. Niehorster2021年01月08日 21:07:37 +00:00Commented Jan 8, 2021 at 21:07