I require hundreds of small, equally sized buffers in my current project. The bulk of the computation in the program operates on data stored directly in those containers, so high performance is imperative. As the number of buffers stays constant at runtime, my current approach is to use a std::vector<std::deque<MyObj>> as the buffer, but I do not like the low cache locality (since the actual storage of each deque is spread across the heap) and the possibility of memory fragmentation that this approach entails.
Thus I would like to replace my current container with something like a std::vector<somecontainer<MyObj>>, where somecontainer stores at least part of the contained objects locally, within the container object itself, and thus in contiguous storage in the vector.
My attempt to implement such a class is presented below. The container Ring is implemented as a circular buffer, permitting fast insertion and deletion at either end, and it supports indexed access to the contents. The capacity is fixed at compile time and restricted to powers of two in order to make effective use of modular integer arithmetic via bit masking. The class is still incomplete; it mainly lacks iterators and an assignment operator capable of assigning a Ring object to one with a different capacity. I do not plan to add exception support to the class or bounds checking to the access functions.
Are there any serious pitfalls in the design, or any significant improvements to be made? Will the memory alignment of the elements be correct, and will move constructors be used when inserting elements, where possible? Could the container even be adapted to work as a thread-safe single-producer/single-consumer queue by substituting the current indexes of type uintN_t with indexes of type std::atomic_uintN_t, along with other small modifications?
Usage example:
Ring<int, 8> buf(4, 0);
buf.popFront();
buf.pushBack(1);
buf.pushFront(-1);
while(!buf.isFull())
    buf.pushBack(2);
auto buf2(buf);
buf.clear();
for(size_t n = 0; n < buf2.getSize(); n++)
    std::cout << buf2[n] << ", ";
//-1, 0, 0, 0, 1, 2, 2, 2,
Implementation:
#include <cstdint>
template<int N> struct UintByBits{ };
template<> struct UintByBits<8> { using type = uint8_t; };
template<> struct UintByBits<16> { using type = uint16_t; };
template<> struct UintByBits<32> { using type = uint32_t; };
template<> struct UintByBits<64> { using type = uint64_t; };
constexpr getReqdBitCount(uint64_t val)
{
return (val <= 0xFF) ? 8 : (
(val <= 0xFFFF) ? 16 : (
(val <= 0xFFFFFFFF) ? 32 : 64));
}
//MinUint is an alias for the smallest unsigned integer type where MaxVal fits
template<uint64_t MaxVal>
using MinUint = typename UintByBits<getReqdBitCount(MaxVal)>::type;
constexpr bool isPowerOfTwo(uint64_t val)
{
return val != 0 && (val & (val - 1)) == 0;
}
//A constant-sized dual-ended queue implemented as a ring buffer. The size of
//..the container may only be a power of two (this is a requirement of the
//..modular arithmetic performed internally)
template<typename T, size_t BufSize>
class Ring
{
static_assert(isPowerOfTwo(BufSize),
"'Ring' buffer size (template param 'BufSize') is not a power of two");
//Aligned raw memory array for constructing objects of type T into
using ElemT = typename std::aligned_storage<sizeof(T), alignof(T)>::type;
ElemT ring_[BufSize];
//Indexes into the raw memory array pointing to the head and tail ends of the ring.
//..The indexes are each guaranteed to have one extra "parity bit" of storage,
//..which is used for distinguishing between an empty ring and a full ring.
MinUint<BufSize> ringFront_;
MinUint<BufSize> ringBack_;
//Bit masks used in bitwise operations: INDEX_MASK is used to remove all bits
//..from an index that do not express the actual element position, PARITY_BIT
//..is used for masking and comparing to the parity bit.
static constexpr MinUint<BufSize> INDEX_MASK = BufSize - 1;
static constexpr MinUint<BufSize> PARITY_BIT = BufSize;
public:
//When the Ring object is first constructed, ringFront_ points to the first
//..element and ringBack_ to the last element in the container. Pushing one
//..element to either end causes the index of the affected end to roll around,
//..resulting in both indexes pointing to the same element, at which point the
//..element can also be popped from either end.
Ring() : ringFront_(0), ringBack_(BufSize - 1)
{ }
//Fill constructor: copy construct n elements
Ring(size_t n, const T &val) : ringFront_(0), ringBack_(BufSize - 1)
{
while(n-- > 0)
pushBack(val);
}
//Fill constructor: construct n elements using default constructor
explicit Ring(size_t n) : ringFront_(0), ringBack_(BufSize - 1)
{
while(n-- > 0)
pushBack();
}
//The assignment operator destroys all elements and copy constructs new ones.
//..Perhaps a templated version with a bounds check could accept a different
//..sized Ring of the same value type?
Ring& operator=(const Ring &other)
{
if(this != &other)
{
clear();
size_t n = other.getSize();
while(n-- > 0)
pushFront(other[n]);
}
return *this;
}
//Copy constructor
Ring(const Ring &other) : ringFront_(0), ringBack_(BufSize - 1)
{
size_t n = other.getSize();
while(n-- > 0)
pushFront(other[n]);
}
~Ring()
{
clear();
}
//Get the element at the position 'position' relative to the front of the
//..ring (that is, the element returned by getFront()). Pushing/popping at
//..the front will thus change which element a given index points to, while
//..pushing/popping at the back will not. Accessing at indexes equal to or
//..greater than getSize() is undefined. NOT BOUNDS CHECKED
T &operator[] (size_t position)
{
return *static_cast<T*>(static_cast<void*>(
ring_ + ((ringFront_ + position) & INDEX_MASK))
);
}
const T &operator[] (size_t position) const
{
return *static_cast<const T*>(static_cast<const void*>(
ring_ + ((ringFront_ + position) & INDEX_MASK))
);
}
//Construct an element to the front. NOT BOUNDS CHECKED
template<typename ... Args>
void pushFront(Args&& ... args)
{
--ringFront_;
new(ring_ + (ringFront_ & INDEX_MASK)) T(std::forward<Args>(args)...);
}
//Destroy the element at the front. NOT BOUNDS CHECKED
void popFront()
{
getFront().~T();
++ringFront_;
}
//Get a reference to the front element. If size == 1 returns the same element
//..as getBack(). Undefined for an empty container.
T &getFront()
{
return *static_cast<T*>(static_cast<void*>(
ring_ + (ringFront_ & INDEX_MASK))
);
}
const T &getFront() const
{
return *static_cast<const T*>(static_cast<const void*>(
ring_ + (ringFront_ & INDEX_MASK))
);
}
//Construct an element to the back. NOT BOUNDS CHECKED
template<typename ... Args>
void pushBack(Args&& ... args)
{
++ringBack_;
new(ring_ + (ringBack_ & INDEX_MASK)) T(std::forward<Args>(args)...);
}
//Destroy the element at the back. NOT BOUNDS CHECKED
void popBack()
{
getBack().~T();
--ringBack_;
}
//Get a reference to the back element. If size == 1 returns the same element
//..as getFront(). Undefined for an empty container.
T &getBack()
{
return *static_cast<T*>(static_cast<void*>(
ring_ + (ringBack_ & INDEX_MASK))
);
}
const T &getBack() const
{
return *static_cast<const T*>(static_cast<const void*>(
ring_ + (ringBack_ & INDEX_MASK))
);
}
//Destroy all elements in the container.
void clear()
{
while(!isEmpty())
popFront();
}
bool isEmpty() const
{
//if the indexes are otherwise equal but the parity bits differ, the ring is empty
return (((ringBack_ + 1) ^ ringFront_) & (PARITY_BIT | INDEX_MASK)) == PARITY_BIT;
}
bool isFull() const
{
//if the indexes are equal including the parity bits, the ring is full
return (((ringBack_ + 1) ^ ringFront_) & (PARITY_BIT | INDEX_MASK)) == 0;
}
size_t getSize() const
{
//distinguish between a full and an empty ring, otherwise it would never return 0
return isEmpty() ? 0 : ((ringBack_ - ringFront_) & INDEX_MASK) + 1;
}
constexpr size_t getCapacity() const
{
return BufSize;
}
};
- Snowbody (Apr 13, 2015): What are getFront, popFront supposed to do when the ring is empty? The current behavior doesn't seem correct. Likewise pushFront when the ring is full.
- jms (Apr 13, 2015): No functions are bounds checked (for performance reasons); the user may call isFull() or isEmpty() before pushing or popping elements, respectively.
- Loki Astari (Apr 13, 2015): @Snowbody: This is the behavior of the standard containers. You are supposed to either know, or explicitly check yourself before use; otherwise these methods degenerate into undefined behavior.
1 Answer
Here are some observations that may help you improve your program.
Add type to getReqdBitCount
The C++ standard does not allow the definition of a function without a return type, so this program is technically ill-formed. I suspect that getReqdBitCount should return unsigned.
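A minimal sketch of that fix, keeping the original body and only adding the suggested return type (unsigned here is simply one reasonable choice):
constexpr unsigned getReqdBitCount(uint64_t val)
{
    return (val <= 0xFF) ? 8 : (
           (val <= 0xFFFF) ? 16 : (
           (val <= 0xFFFFFFFF) ? 32 : 64));
}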
Use std::size_t
Since this is C++11, you should use the standard std::size_t and #include <cstdlib> rather than the plain C version, which may or may not be defined in <cstdint>.
Use appropriate #includes
In addition to <cstdlib> mentioned above, the code uses std::aligned_storage but does not #include <type_traits>, where that is defined, and uses std::forward but does not #include <utility>.
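Gathered in one place, the headers that the posted code actually relies on might look like this (a sketch based on the names used above, with a comment noting what each header provides):
#include <cstdint>      // uint8_t ... uint64_t used by UintByBits
#include <cstdlib>      // std::size_t
#include <new>          // placement new used in pushFront/pushBack
#include <type_traits>  // std::aligned_storage
#include <utility>      // std::forward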
Consider simplifying some of the templates
There is a series of templates and constexpr functions near the top that are ultimately all about finding the smallest size for MinUint<BufSize>, but there are two changes I would like to suggest. The first is simplification. You can do all of that with just two templates:
#include <cstdint>
#include <limits>
#include <type_traits>

template <bool C, typename T, typename F>
using Conditional = typename std::conditional<C, T, F>::type;

template<uint64_t MaxVal>
using MinUint =
    Conditional<(std::numeric_limits<uint8_t>::max() >= MaxVal), uint_fast8_t,
        Conditional<(std::numeric_limits<uint16_t>::max() >= MaxVal), uint_fast16_t,
            Conditional<(std::numeric_limits<uint32_t>::max() >= MaxVal), uint_fast32_t,
                uint_fast64_t
            >
        >
    >;
The second change is to use types such as uint_fast8_t instead of uint8_t for the resulting type. There may be no difference, but the intent is to designate the fastest uint type with at least 8 bits. Also note that the use of std::numeric_limits means that we don't have to manually type a bunch of potentially error-prone constant values. The difference is that if you type one too few Fs in a long constant, the compiler will happily generate the wrong code anyway, but if you make an error typing std::numeric_limits..., the compiler will halt with an error.
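As a quick, hypothetical sanity check of that alias (these static_asserts are illustration only, not part of the class; they verify the boundary cases at compile time):
#include <cstdint>
#include <type_traits>

static_assert(std::is_same<MinUint<0xFF>,       uint_fast8_t >::value, "8-bit boundary");
static_assert(std::is_same<MinUint<0x100>,      uint_fast16_t>::value, "16-bit boundary");
static_assert(std::is_same<MinUint<0xFFFFFFFF>, uint_fast32_t>::value, "32-bit boundary");
static_assert(std::is_same<MinUint<UINT64_MAX>, uint_fast64_t>::value, "64-bit boundary");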
Think of the data cache
Each of your data structures has three data items -- ringFront_, ringBack_ and the actual buffer. Because the cache line size in a typical machine these days is 64 bytes, you'll want to put the "hottest" (that is, the most frequently accessed) data toward the front of any object. For that reason, you should probably make the ring_ array the last of the three data members instead of the first to improve caching performance. Naturally, you should actually test the performance change (if any) rather than simply making assumptions.
A somewhat more radical possibility is to put all of the index bookkeeping in one object and all of the buffer storage in another.
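A layout sketch along those lines (RingLayout and IndexT are hypothetical names used only for illustration; the member functions are omitted and assumed unchanged):
#include <cstddef>
#include <type_traits>

template<typename T, std::size_t BufSize, typename IndexT = std::size_t>
struct RingLayout
{
    IndexT ringFront_;  // hot: touched by every push, pop and element access
    IndexT ringBack_;   // hot
    typename std::aligned_storage<sizeof(T), alignof(T)>::type
        ring_[BufSize]; // bulky element storage moved to the end
};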
Consider keeping explicit count
At the moment, isEmpty, isFull and getSize are relatively complex because of the storage mechanism used. Consider instead whether a separate count_ member could be used. Then, those functions would be trivial:
bool isEmpty() const { return count_==0; }
bool isFull() const { return count_==BufSize; }
size_t getSize() const { return count_; }
Increment and decrement would be done in each push and pop operation instead, but it may be worthwhile to check for a performance difference. This would also allow the front and back to be pointers (T*) rather than indices, which would also likely speed up access.
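A hedged sketch of what the push/pop pair might then look like (front_ and back_ of type ElemT* plus a size_t count_ are hypothetical members replacing ringFront_ and ringBack_; the rest of the class is assumed unchanged):
template<typename ... Args>
void pushBack(Args&& ... args)
{
    ++back_;
    if(back_ == ring_ + BufSize)  //wrap-around branch replaces the bit mask
        back_ = ring_;
    new(back_) T(std::forward<Args>(args)...);
    ++count_;
}
void popFront()
{
    reinterpret_cast<T*>(front_)->~T();
    ++front_;
    if(front_ == ring_ + BufSize)
        front_ = ring_;
    --count_;
}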
- jms (Apr 14, 2015): Thanks for the review! I wonder about the performance of an explicit counter with pointers as opposed to indices, as it would remove the size restriction and likely speed up every operation except for pushing and popping, where a sequence such as ++counter; if(ringFront_ >= ring_ + BufSize) ringFront_ = ring_; would be required. What would the cost of the branch be, as opposed to the currently branchless code? While the memory overhead would be significant for very small containers, it wouldn't likely matter in most cases.
- Edward (Apr 14, 2015): @jms: It's a good question and worth trying. Timing tests should, of course, be weighted with your intended use in mind. That is, optimize the things you use most often.