In a producer-consumer scenario we sometimes have to deal with the producer being much faster than the consumer. Data loss is unavoidable, and we are OK with it, as long as the consumer always has the most recent data.
The answer is triple buffering. Endow the system with three buffers, which assume the roles of being presented, ready, and inprogress, and let them change their roles according to the rules:
- Once the presented buffer is completely presented, swap roles with ready.
- Once the inprogress buffer is completely produced, swap roles with ready.
It is possible (unlikely, but still possible) for the producer to fall behind. The consumer must detect such a situation and not present a stale buffer. In other words, the presented/ready swap shall happen only if ready has been updated since the last swap.
I am particularly not proud of the stale flag being a part of the termination flow.
PS: Fine-grained controls such as start and stop methods are intentionally not designed in.
Meanwhile, triple-buffer.h
#include <atomic>
#include <thread>

template<typename buffer, void produce(buffer *), void consume(buffer *)>
class triple_buffer {
    std::atomic<buffer *> present;
    std::atomic<buffer *> ready;
    std::atomic<buffer *> inprogress;
    std::atomic<bool> stop;
    std::atomic_flag stale;
    std::thread consumer_thread;
    std::thread producer_thread;

    void producer()
    {
        while (!stop) {
            produce(inprogress.load());
            inprogress = ready.exchange(inprogress);
            stale.clear();
        }
    }

    void consumer()
    {
        while (!stop) {
            consume(present.load());
            while (stale.test_and_set())
                ;
            present = ready.exchange(present);
        }
    }

public:
    triple_buffer() = delete;
    triple_buffer(triple_buffer& other) = delete;
    triple_buffer(triple_buffer&& other) = delete;
    triple_buffer& operator=(triple_buffer& other) = delete;

    triple_buffer(buffer bufs[3])
        : present(&bufs[0])
        , ready(&bufs[1])
        , inprogress(&bufs[2])
    {
        stop.store(false);
        stale.test_and_set();
        produce(present);
        consumer_thread = std::move(std::thread([this] { consumer(); }));
        producer_thread = std::move(std::thread([this] { producer(); }));
    }

    ~triple_buffer()
    {
        stop.store(true);
        producer_thread.join();
        stale.clear();
        consumer_thread.join();
    }
};
and a short demo.cpp
#include "triple-buffer.h"
#include <iostream>
#include <chrono>
#include <thread>
struct buffer {
int id;
int value;
};
void produce(buffer * buf)
{
static int value = 0;
buf->value = value++;
}
void consume(buffer * buf)
{
std::cout << buf->id << ": " << buf->value << '\n';
}
int main()
{
buffer bufs[3] { {0}, {1}, {2} };
triple_buffer<buffer, produce, consume> tb(bufs);
std::this_thread::sleep_for(std::chrono::milliseconds(1));
}
2 Answers
The concurrency looks bug-free to me, which is rare!
Some nits on C++ idioms:
It is extremely unusual to take a function pointer as a template parameter. This drastically limits the reusability of your class template. A more traditional, "STL-ish" interface would be
template<class Buffer, class Produce, class Consume>
class triple_buffer {
    explicit triple_buffer(Buffer bufs[3]);
    explicit triple_buffer(Buffer bufs[3], Produce produce, Consume consume);
    // ...
};
Or, since produce and consume are used only inside the constructor (it immediately wraps them up in lambdas and passes them off to std::thread), you don't even need them to be template parameters of the whole class. They can be template parameters of the constructor itself. Something like this:
template<class Buffer>
class triple_buffer {
    template<class P, class C>
    explicit triple_buffer(Buffer bufs[3], P produce, C consume) {
        // ...
        produce(present_.load());
        producer_thread_ = std::thread([&, produce]() {
            while (!stop_.load()) {
                produce(inprogress_.load());
                inprogress_ = ready_.exchange(inprogress_);
                stale_.store(false);
            }
        });
        // ...
    }
};
(In C++14 you can capture produce as [&, produce = std::move(produce)] to avoid making a copy of it.)
Notice the CamelCase template parameter names.
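For what it's worth, usage of the constructor-templated version might look roughly like this. This is only a sketch reusing buffer and the free produce/consume from demo.cpp; tb, tb2 and step are names invented here:
buffer bufs[3] { {0}, {1}, {2} };

// free functions, as in demo.cpp; P and C are deduced by the constructor
triple_buffer<buffer> tb(bufs, produce, consume);

// or a capturing lambda, which the original function-pointer template
// parameters could not accept
int step = 2;
triple_buffer<buffer> tb2(bufs,
    [step](buffer *b) { b->value += step; },   // produce
    [](buffer *b) { (void)b->value; });        // consume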
I also recommend sigilling the names of your member variables: ready_ rather than ready, and so on. (Some coding style guides prefer m_ready or d_ready. No big deal.)
These two naming conventions, together, will help to alleviate some of your confusingly similar naming. Right now you've got a template parameter named produce and a member function named producer (and your test harness has a global function named produce). Of course, I just "fixed" that naming issue by getting rid of the producer member function altogether!
You use both atomic<bool> stop and atomic_flag stale. There's no significant difference between atomic<bool> and atomic_flag; you should pick one and stick to it. I recommend atomic<bool> because it's one fewer thing for your coworkers to look up on cppreference.
inprogress = ready.exchange(inprogress);
This line is correct, but I really strongly encourage you to rewrite it as two lines anyway. "One side-effect per source line" is a good rule of thumb for concurrent code. And there really are two side-effects happening here.
Buffer *p = ready.exchange(inprogress);
inprogress.store(p);
Splitting it up this way makes it clear what's going on — in particular that the swap is not being effected atomically — and incidentally makes the code easier to stress-test (by adding instrumentation between each pair of lines) and debug (because now it's possible to pause the debugger between the two side-effects).
while (stale.test_and_set())
If you switch to consistently using atomic<bool>, this line would become
while (stale.exchange(true))
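Putting those suggestions together (atomic<bool> instead of atomic_flag, sigilled member names, and the exchange split into two lines), the consumer side might end up looking something like the following sketch. The buffer type and the free consume function are assumed from the question; this is illustrative, not the reviewed code:
#include <atomic>

struct buffer;              // as defined in demo.cpp
void consume(buffer *);     // as defined in demo.cpp

struct consumer_side {      // sketch: only the members the consumer loop touches
    std::atomic<buffer *> present_;
    std::atomic<buffer *> ready_;
    std::atomic<bool> stop_{false};
    std::atomic<bool> stale_{true};

    void consumer()
    {
        while (!stop_.load()) {
            consume(present_.load());
            while (stale_.exchange(true))   // spin until the producer stores false
                ;
            buffer *p = ready_.exchange(present_.load());   // first side-effect
            present_.store(p);                               // second side-effect
        }
    }
};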
consumer_thread = std::move(std::thread([this] { consumer(); }));
This line is overengineered. You don't need to std::move a prvalue. And personally, I'd capture [&] unless I had a special reason not to. So:
consumer_thread = std::thread([&]{ consumer(); });
Stylistically, I wonder why triple_buffer takes a pointer to an array of three Buffer objects, as opposed to either
- taking three pointers to Buffer objects (which then needn't be adjacent in memory), or
- giving triple_buffer three member variables of type Buffer.
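As a sketch of that last option (the member names are invented here, and Buffer is assumed to be default-constructible):
#include <atomic>

template<class Buffer>
class triple_buffer {
    Buffer buf_a_{}, buf_b_{}, buf_c_{};            // storage owned by the class itself
    std::atomic<Buffer *> present_{&buf_a_};
    std::atomic<Buffer *> ready_{&buf_b_};
    std::atomic<Buffer *> inprogress_{&buf_c_};
    // ... threads, flags and constructor as in the reviewed code ...
};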
- Thanks for the review. I already lost all hopes. And very good points indeed. – vnp, Jan 17, 2019
while (stale.test_and_set()) ;
This spinlock might be appropriate given that the producer is usually faster than the consumer.
If the consumer usually has to wait for the producer, then this is just an expensive space-heater, and we should consider waiting on a condition variable instead (but only enter that slow path if we didn't find it ready using the normal, fast method).
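For illustration, that fallback could be a small mutex/condition-variable gate along these lines. The names freshness_gate, fresh_, mtx_ and cv_ are invented for this sketch and are not part of the reviewed class:
#include <atomic>
#include <condition_variable>
#include <mutex>

// "Check fast, block only if necessary": the producer calls publish() after
// swapping a finished buffer into ready; the consumer calls acquire() before
// swapping present and ready.
class freshness_gate {
    std::atomic<bool> fresh_{false};
    std::mutex mtx_;
    std::condition_variable cv_;

public:
    void publish()
    {
        {
            std::lock_guard<std::mutex> lock(mtx_);
            fresh_.store(true);                      // ready now holds unseen data
        }
        cv_.notify_one();
    }

    void acquire()
    {
        if (fresh_.exchange(false))                  // fast path: data already there
            return;
        std::unique_lock<std::mutex> lock(mtx_);     // slow path: sleep until notified
        cv_.wait(lock, [this] { return fresh_.load(); });
        fresh_.store(false);                         // consume the freshness token
    }
};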
- Right. The very first sentence of the scenario description reads "the producer being much faster than consumer". – vnp, Apr 24, 2021
- Slightly re-worded. – Toby Speight, Apr 24, 2021
- One could use atomic<bool> stale to do away with the spinlock, e.g. in the consumer: stale.wait(true); stale.store(true), and in the producer: stale.store(false); stale.notify_one().
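That suggestion relies on the wait()/notify_one() members that std::atomic gained in C++20. Spelled out, the two sides might look roughly like this; a sketch assuming stale becomes std::atomic<bool> initialized to true, with producer_step/consumer_step as placeholder names:
#include <atomic>

std::atomic<bool> stale{true};

void producer_step()              // after the finished buffer is swapped into ready
{
    stale.store(false);           // ready now holds fresh data
    stale.notify_one();           // wake a consumer blocked in stale.wait(true)
}

void consumer_step()              // before present and ready are swapped
{
    stale.wait(true);             // C++20: block while stale == true
    stale.store(true);            // re-arm the flag for the next round
}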