Below is my source code for a ring (circular) buffer for use in audio processing. I have been using this implementation for several months, but I have doubts about the efficiency of the insertion and removal functions, as well as the correctness and necessity of my std::atomic<size_t> index variables. The execution speed of the read and write functions is crucial to the overall speed of the DSP callbacks using these buffers, so I would like to know whether anyone can suggest improvements to my design.
class Buffer {
public:
    Buffer();
    Buffer(size_t size);
    Buffer(Buffer const& other);
    Buffer& operator=(Buffer const& other);
    virtual ~Buffer();
    DSG::DSGSample& operator[](size_t const& index);
    inline size_t const& Size()const;
protected:
    DSG::DSGSample* _buffer;
    size_t _size;
};
inline size_t const& DSG::Buffer::Size()const{
    return _size;
}
//implementation
DSG::Buffer::Buffer():_size(0),_buffer(nullptr){}
DSG::Buffer::Buffer(size_t size):_size(size),_buffer(new DSG::DSGSample[size]){}
DSG::Buffer::Buffer(Buffer const& other) {
    _buffer = new DSG::DSGSample[_size];
    _size = other._size;
    *this = other;
}
DSG::Buffer& DSG::Buffer::operator=(Buffer const& other){
    if (_size!=other._size) {
        if (_buffer!=nullptr) {
            delete [] _buffer;
        }
        _size = other._size;
        _buffer = new DSG::DSGSample[_size];
    }
    for (int i=0; i<_size; ++i) {
        _buffer[i] = other._buffer[i];
    }
    return *this;
}
DSG::Buffer::~Buffer(){
    if (_buffer!=nullptr) {
        delete [] _buffer;
    }
}
DSG::DSGSample& DSG::Buffer::operator[](size_t const& index){
#ifdef DEBUG
    assert(index<_size);
#endif
    return _buffer[index];
}
//ringbuffer
class RingBuffer : public DSG::Buffer {
protected:
    std::atomic<size_t> _write;
    std::atomic<size_t> _read;
    size_t _count;
    size_t MASK;
    size_t write;
    size_t read;
    inline size_t next(size_t current);
    inline size_t make_pow_2(size_t number);
public:
    RingBuffer();
    RingBuffer(const size_t size);
    RingBuffer(RingBuffer& buffer);
    RingBuffer& operator=(RingBuffer& buffer);
    virtual ~RingBuffer();
    inline bool Write(const DSGSample& elem);
    inline bool Read(DSG::DSGSample& elem);
    inline size_t const& Count()const;
    inline bool Full()const;
    inline bool Empty()const;
    inline void Flush();
    friend bool operator>>(DSG::DSGSample const& signal, DSG::RingBuffer& buffer){
        return buffer.Write(signal);
    }
    friend bool operator<<(DSG::DSGSample& signal, DSG::RingBuffer& buffer){
        return buffer.Read(signal);
    }
#ifdef DEBUG
    friend std::ostream& operator<<(std::ostream& os, DSG::RingBuffer const& buffer){
        if (!buffer.Empty()) {
            size_t index = buffer._read;
            size_t count = buffer.Count();
            size_t size = buffer.Size();
            for (int i=0; i<count; ++i) {
                os << index << ": " << buffer._buffer[index] << std::endl;
                index = ((index+1)%size);
            }
        }
        return os;
    }
#endif
};
inline bool DSG::RingBuffer::Full()const{
    return _count==this->_size;
}
inline bool DSG::RingBuffer::Empty()const{
    return _count==0;
}
inline void DSG::RingBuffer::Flush(){
    _write.store(0,std::memory_order_relaxed);
    _read.store(0,std::memory_order_relaxed);
    _count=0;
}
inline bool DSG::RingBuffer::Write(const DSGSample& elem){
    if (!Full()) {
        write = _write.load(std::memory_order_acquire);
        _write.store(next(write),std::memory_order_release);
        this->_buffer[write] = elem;
        ++_count;
        return true;
    } else return false;
}
inline bool DSG::RingBuffer::Read(DSGSample& elem){
    if (!Empty()) {
        read = _read.load(std::memory_order_acquire);
        _read.store(next(read),std::memory_order_release);
        elem = this->_buffer[read];
        --_count;
        return true;
    } else return false;
}
inline size_t const& DSG::RingBuffer::Count()const{
    return _count;
}
//note: the RingBuffer implementation forces a power-of-2 size to allow a bitwise masked increment.
inline size_t DSG::RingBuffer::next(size_t current){ return (current+1) & MASK; }
inline size_t DSG::RingBuffer::make_pow_2(size_t number){
    return pow(2, ceil(log(number)/log(2)));
}
//implementation
DSG::RingBuffer::RingBuffer():Buffer(0),_read(0),_write(0),_count(0),MASK(0){}
DSG::RingBuffer::RingBuffer(const size_t size):Buffer(make_pow_2(size)),_read(0),_write(0),_count(0){
    MASK = this->_size-1;
}
DSG::RingBuffer::RingBuffer(RingBuffer& buffer):Buffer(buffer){
    _write.store(buffer._write.load(std::memory_order_acquire));
    _read.store(buffer._read.load(std::memory_order_acquire));
    _count = buffer._count;
    MASK = buffer._size-1;
}
DSG::RingBuffer& DSG::RingBuffer::operator=(RingBuffer& buffer){
    Buffer::operator=(buffer);
    _write.store(buffer._write.load(std::memory_order_acquire));
    _read.store(buffer._read.load(std::memory_order_acquire));
    _count = buffer._count;
    MASK = buffer._size-1;
    return *this;
}
DSG::RingBuffer::~RingBuffer(){ Flush(); }
NOTE: DSG:: is the namespace that encloses all of the code. I have removed the namespace declaration from the posted code to cut down on its length.
-
JS1 (Jun 6, 2015): What's the use case? Are there multiple threads reading/writing from this ring buffer simultaneously?
-
Alex Zywicki (Jun 7, 2015): @JS1 The buffer is intended to be used in an audio processing system, where it acts as the storage device for audio data as the data is passed between multiple processing routines. I was considering the possibility of splitting the processing across multiple threads.
-
Alex Zywicki (Jun 10, 2015): I have asked a follow-up question that contains an updated version of the code for review: codereview.stackexchange.com/questions/93176/…
2 Answers
Some basic comments
As a small point to begin with, I think your code could do with some more whitespace, especially when starting a new method. As it stands it's very densely packed, which makes it harder to read.
Your operator= has no exception safety, and does not check for self-assignment. The copy-and-swap idiom is the easiest way to fix this (if you implement a swap function). If not, always remember to allocate the new buffer before deleting the old one; if new throws, this then leaves everything in a consistent state. Further, delete is a no-op on a nullptr, so doing the check isn't needed.
DSG::Buffer& DSG::Buffer::operator=(Buffer const& other)
{
    if (this != &other) {
        if (_size == other._size) {
            std::copy(other._buffer, other._buffer + other._size, _buffer);
        }
        else {
            DSG::DSGSample* new_buf = new DSG::DSGSample[other._size];
            std::copy(other._buffer, other._buffer + other._size, new_buf);
            delete[] _buffer;
            _size = other._size;
            _buffer = new_buf;
        }
    }
    return *this;
}
Same again with the destructor: the nullptr check isn't needed.

Depending on what you have available to you, using a std::vector may be a better choice, as it will free you from having to write all of this.
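To illustrate, here is roughly what Buffer shrinks to on top of std::vector (again with float standing in for DSG::DSGSample):

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// std::vector already provides correct copy, assignment, and destruction,
// so the Rule of Three boilerplate disappears entirely.
class Buffer {
public:
    Buffer() = default;
    explicit Buffer(std::size_t size) : _buffer(size) {}

    std::size_t size() const { return _buffer.size(); }
    float& operator[](std::size_t i) { return _buffer[i]; }
    float const& operator[](std::size_t i) const { return _buffer[i]; }

private:
    std::vector<float> _buffer;  // value semantics for free
};
```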
You use the pattern }else return false; a few times. This looks clunky. Just separate it onto a new line and return false:
if (some_condition) {
    ....
    return true;
}
return false;
Multithreading Issues
If you were planning on making this work in a multithreaded environment, you have quite a lot of work to do. For a start:
- Your _count variable will need to be atomic. Full, Empty, Write and Read are all completely non-threadsafe at the moment.
- Using memory_order_relaxed triggers massive warning bells. Unless you 100% know what you are doing, aren't working on x86(-64), have profiled the hell out of your code and KNOW that it is causing a bottleneck, you should avoid using memory_order_relaxed. In this case, if you were in a multithreaded environment, it is almost certainly wrong.
To expand upon why this is wrong in this situation, the most important thing to know about memory_order_relaxed is that it does no inter-thread synchronization. If you have one thread that calls Flush(), and another thread that calls Read(), there is absolutely no guarantee that the Read() thread will see the update from Flush().
In fact, most of your methods will likely need locks. Suppose you get rid of your std::memory_order_relaxed and use the default std::memory_order_seq_cst. If we then have a thread that is in a call to Flush(), it may complete the store operations on _write and _read, and then be pre-empted. Another thread may then come along and call Read() or Write(), in which case you will be reading or writing at the start of the array, which is presumably not what you want. In a multithreaded environment, you want everything in Flush() to complete atomically - so this would require a mutex or a rethink of your design.
The same problems can happen with the Read() and Write() functions themselves.
The RingBuffer operator= suffers from this on both sides: threads could be trampling all over each other inside Buffer::operator=, giving you an inconsistent view of its member variables, and also potentially trampling over each other for any of the member variables in the RingBuffer itself. Even with locks, it is very, very easy to write operator= in an incorrect way. Generally, you will need to use std::lock, and acquire locks on both this AND the argument to operator=:
RingBuffer& RingBuffer::operator=(const RingBuffer& buffer)
{
    // Assuming each object has a mutable std::mutex member called mutex.
    // std::lock acquires both locks without deadlocking, regardless of the
    // order in which different threads name the two objects.
    std::lock(mutex, buffer.mutex);
    // Adopt the already-held locks so they are released on scope exit.
    std::lock_guard<std::mutex> lhs_lock(mutex, std::adopt_lock);
    std::lock_guard<std::mutex> rhs_lock(buffer.mutex, std::adopt_lock);
    // Now we can safely perform the actual operator= operations...
}
The thing to take away from this is that writing lock-free data structures is hard. It is not sufficient to simply use a few atomic variables here and there in a multithreaded environment; thread pre-emption will cause all sorts of havoc if you aren't careful. Further, using std::memory_order_relaxed makes things very, very hard to reason about, and is best avoided.

Unfortunately, you may have to rethink the design of this. For a single-threaded buffer it will work, but using std::atomic is unnecessary. In a multithreaded environment, this has a number of problems.
Besides the remarks of @Yuushi, there's some other stuff to improve:
Your code doesn't respect the SRP: the Buffer class should either manage low-level memory (I'm looking at DSG::DSGSample* Buffer::_buffer;) or be a base for buffer specializations, but probably not both. Since it is a base class, it should also not mix the responsibilities of being a base class and a concrete object (unless you have really good reasons for that). I would simply make it a final class, with no vtable, and encapsulate it in client code.

This interface is incomplete:

Buffer& operator=(Buffer const& other);

If you are working with a derived class, your objects will get sliced. Duplicating an object's data across a class hierarchy is usually done with the clone pattern.
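The clone pattern mentioned above looks roughly like this (hypothetical minimal classes, not the post's actual members):

```cpp
#include <cassert>
#include <cstring>
#include <memory>

// Polymorphic copying: each derived class overrides clone() to copy itself,
// so copying through a base pointer cannot slice.
class Buffer {
public:
    virtual ~Buffer() = default;
    virtual std::unique_ptr<Buffer> clone() const {
        return std::unique_ptr<Buffer>(new Buffer(*this));
    }
    virtual const char* name() const { return "Buffer"; }
};

class RingBuffer : public Buffer {
public:
    std::unique_ptr<Buffer> clone() const override {
        return std::unique_ptr<Buffer>(new RingBuffer(*this));
    }
    const char* name() const override { return "RingBuffer"; }
};
```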
The DEBUG macro use is redundant, because assert already compiles to nothing outside debugging configurations (it is disabled whenever NDEBUG is defined):

DSG::DSGSample& DSG::Buffer::operator[](size_t const& index){
#ifdef DEBUG // this shouldn't be here
    assert(index<_size);
#endif // neither should this
    return _buffer[index];
}
Checking for nullptr before deleting a pointer is redundant, since delete on a null pointer is a no-op:

if (_buffer!=nullptr) { // this check is unnecessary
    delete [] _buffer;
}
The interface is a bit ambiguous:

inline bool Full()const;
inline bool Empty()const;

These functions seem opposite (judging by their names).
friend bool operator>>(DSG::DSGSample const& signal, DSG::RingBuffer& buffer){
    return buffer.Write(signal);
}
friend bool operator<<(DSG::DSGSample& signal, DSG::RingBuffer& buffer){
    return buffer.Read(signal);
}

These functions are redundant (unless you maintain this paradigm across everything that reads or writes a DSG::DSGSample in the code base). They also break the usual signature of the "stream reading" operators, which are supposed to return the stream for call chaining. As a rule, you should have one and only one way to do an operation.
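If you do keep the operators, a chaining-friendly shape returns the buffer instead of bool (a sketch with float standing in for DSG::DSGSample; failure would then have to be reported some other way, e.g. a fail flag, as iostreams do):

```cpp
#include <cassert>
#include <cstddef>

// Minimal ring standing in for DSG::RingBuffer.
class RingBuffer {
public:
    explicit RingBuffer(std::size_t size) : _buffer(new float[size]), _size(size) {}
    ~RingBuffer() { delete[] _buffer; }

    bool Write(float v) {
        if (_count == _size) return false;
        _buffer[_write] = v;
        _write = (_write + 1) % _size;
        ++_count;
        return true;
    }
    bool Read(float& v) {
        if (_count == 0) return false;
        v = _buffer[_read];
        _read = (_read + 1) % _size;
        --_count;
        return true;
    }

    // Returning the buffer (not bool) allows chaining: buf << a << b;
    // and follows the stream convention of buffer-on-the-left.
    friend RingBuffer& operator<<(RingBuffer& buf, float v) {
        buf.Write(v);
        return buf;
    }
    friend RingBuffer& operator>>(RingBuffer& buf, float& v) {
        buf.Read(v);
        return buf;
    }

private:
    float* _buffer;
    std::size_t _size;
    std::size_t _read = 0, _write = 0, _count = 0;
};
```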