Consider this class that consumes some items and handles them with the provided handler function:
#include <queue>
#include <mutex>
#include <atomic>
#include <thread>
#include <utility>
#include <condition_variable>
template < typename Item >
struct Handler
{
template < typename Callable >
Handler(Callable&& callable) : m_thread([&]()
{
for (std::queue<Item> queue; !m_isInterruptionRequested;)
{
{
std::unique_lock<std::mutex> lock(m_mutex);
m_queueIsNotEmpty.wait(lock, [&]{ return m_isInterruptionRequested || !m_queue.empty(); });
m_queue.swap(queue);
}
for (; !queue.empty(); queue.pop())
std::forward<Callable>(callable)(std::move(queue.front()));
}
}) { }
void addItem(Item&& item)
{
{
std::lock_guard<std::mutex> lock(m_mutex);
m_queue.push(std::move(item));
}
m_queueIsNotEmpty.notify_one();
}
~Handler()
{
m_isInterruptionRequested = true;
m_queueIsNotEmpty.notify_all();
m_thread.join();
}
private:
std::mutex m_mutex;
std::queue<Item> m_queue;
std::condition_variable m_queueIsNotEmpty;
std::atomic<bool> m_isInterruptionRequested = ATOMIC_VAR_INIT(false);
std::thread m_thread; // VM: Has to be the last field!
};
Any suggestions on what could be better in this implementation (e.g. a better way to handle the interruptions)?
Are there any hidden threats in this implementation that I don't see?
1 Answer 1
Small nitpicks
- Naming:
Handler
is very non-descriptive. Maybe tryConsumer
instead? - Naming:
addItem
does not describe aConsumer
s action,consume
orpass
would be a better fit IMHO
Design problems
Handler
is a queue fused with a single consumer, so there will never be more than one consumer for the queue!.
If the consumer is not required to run on its own thread, the implementation could be simplified to:
template<typename Item>
class SingleConsumer {
private:
std::function<void(Item&&)> consumer;
std::mutex mut; // can be removed if consumer is thread-safe on its own
public:
template<typename Callable>
SingleConsumer(Callable&& callable) : consumer(std::forward<Callable>(callable)) {}
void consume(Item&& item) {
std::unique_lock<std::mutex> lock(mut); // not necessary if consumer is thread-safe on its own
consumer(std::move(item));
}
};
On the other hand, if the requirement is that the consumer has to run on its own thread, why fuse it with its own queue? If there is only one queue, you could run multiple consumers on it to share the workload if needed and/or producers by themselves don't have to decide which consumer to call!
template<typename Item>
class ConcurrentQueue {
private:
std::mutex mut;
std::queue<Item> queue;
public:
ConcurrentQueue() : queue() {}
bool try_pop(Item& item) {
std::unique_lock<std::mutex> lock(mut);
if(queue.empty()) return false;
item = queue.front();
queue.pop();
}
void push(Item&& item) {
std::unique_lock<std::mutex> lock(mut);
queue.push(std::move(item));
}
};
template<typename Item>
class Consumer {
private:
std::thread consumer_thread;
ConcurrentQueue<Item>& queue;
std::atomic<bool> is_running = ATOMIC_VAR_INIT(true);
public:
template<typename Callable>
Consumer(ConcurrentQueue<Item>& q, Callable&& callable) : consumer_thread([callable]() {
run(std::forward<Callable>(callable));
}), queue(q) {}
~Consumer() {
is_running = false;
consumer_thread.join();
}
template<typename Callable>
void run(Callable&& consumer) {
Item item;
while(is_running) {
while(is_running && !queue.try_pop(item)) /* do nothing */;
if(!is_running) return;
consumer(item);
}
}
};
Of course, this example can be expanded according to your needs!
Explore related questions
See similar questions with these tags.