This code is slightly platform-dependent but should be easy to port. The goal was to create a circular buffer whose consumer can be restricted to read-only access to the buffer. The test uses threads, but it could be ported to processes fairly easily.
I suspect I made at least one mistake in this code, and I'm not even sure it is technically possible to implement this in a standards-compliant way.
#define _GNU_SOURCE
#include <inttypes.h>
#include <sched.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <pthread.h>
#include <xmmintrin.h>
/*
 * Ring geometry and sequence-number space.
 *
 *   BUFFER_SIZE  number of slots in the ring
 *   PACKET_SIZE  bytes per slot (header + payload)
 *   CUTOFF       sequence numbers run 0 .. CUTOFF-1, then wrap back to 0
 *
 * (seqno)-1 is reserved as the "slot is being written" marker, so every real
 * sequence number must stay below it.
 */
#define BUFFER_SIZE 64U
#define PACKET_SIZE 128U
#define CUTOFF 2147483648U
typedef uint32_t seqno;

_Static_assert(CUTOFF - 1U < (seqno)-1,
               "the (seqno)-1 marker must never be a valid sequence number");

/* Fixed-size header at the start of every slot. */
struct packet_header {
    /* sequence number of the packet in this slot, or (seqno)-1 while it is being rewritten */
    seqno production_count;
};

/* Payload occupies the remainder of the slot. */
struct packet_payload {
    char bytes[PACKET_SIZE - sizeof (struct packet_header)];
};

/* One ring slot, viewable either as header + payload or as raw bytes. */
union packet {
    struct {
        struct packet_header header;
        struct packet_payload payload;
    } format;
    char bytes[PACKET_SIZE];
};

_Static_assert(sizeof (union packet) == PACKET_SIZE,
               "header + payload must exactly fill PACKET_SIZE (no padding)");

/*
 * The shared ring. Every slot, not just slot 0, starts out holding the
 * "being written" marker, so no slot initially appears to contain packet 0.
 * `[first ... last]` is a GNU range designator; the file already depends on
 * GNU extensions (_GNU_SOURCE, __atomic builtins).
 */
static union packet volatile buffer[BUFFER_SIZE] = {
    [0 ... BUFFER_SIZE - 1U] = {.format.header.production_count = (seqno)-1},
};
static void *producer(void *foo)
{
seqno producer_count = 0U;
for (;;) {
uint64_t payload_number = random();
struct packet_payload payload = {0};
memcpy(payload.bytes, &payload_number, sizeof payload_number);
__atomic_store_n(&buffer[producer_count % BUFFER_SIZE].format.header.production_count, -1, __ATOMIC_SEQ_CST);
/* This store needs to be SEQ_CST to prevent the write moving before it (whether via the compiler or the CPU */
buffer[producer_count % BUFFER_SIZE].format.payload = payload;
__atomic_thread_fence(__ATOMIC_RELEASE);
__atomic_store_n(&buffer[producer_count % BUFFER_SIZE].format.header.production_count, producer_count, __ATOMIC_RELEASE);
#if defined LOG
fprintf(stdout, "production count: %u, payload: %lu, produced\n", producer_count, payload_number);
#endif
producer_count = (producer_count + 1U) % CUTOFF;
}
}
static void *consumer(void *foo)
{
seqno consumer_count = 0U;
for (;;) {
seqno count = __atomic_load_n(&buffer[consumer_count % BUFFER_SIZE].format.header.production_count, __ATOMIC_ACQUIRE);
if ((seqno)-1 == count) {
/* the message hasn't been completed yet */
_mm_pause();
continue;
}
__atomic_thread_fence(__ATOMIC_ACQUIRE);
struct packet_payload payload = buffer[consumer_count % BUFFER_SIZE].format.payload;
/* Note that the double check needs to be SEQ_CST to prevent the read moving after it (whether via the compiler or the CPU */
if (count != __atomic_load_n(&buffer[consumer_count % BUFFER_SIZE].format.header.production_count, __ATOMIC_SEQ_CST)) {
/* torn read, skip packet */
consumer_count = (count + 1U) % CUTOFF;
continue;
}
if (consumer_count > count) {
/* we have incremented the consumer count enough to catch up to the producer */
_mm_pause();
continue;
}
uint64_t payload_number;
memcpy(&payload_number, payload.bytes, sizeof payload_number);
#if defined LOG
fprintf(stderr, "production count: %u, payload: %lu, consumed\n", count, payload_number);
#endif
consumer_count = (count + 1U) % CUTOFF;
}
}
/*
 * Start one producer thread and one consumer thread. The main thread then
 * leaves via pthread_exit, so the process keeps running for as long as the
 * worker threads do. Returns EXIT_FAILURE if either thread cannot be created.
 * Otherwise, the consumer would spin forever waiting on a producer that
 * never started.
 */
int main(void)
{
    pthread_t producer_thread;
    pthread_t consumer_thread;

    /* pthread_create reports failure through its return value, not errno. */
    int rc = pthread_create(&producer_thread, NULL, producer, NULL);
    if (rc != 0) {
        fprintf(stderr, "pthread_create(producer): %s\n", strerror(rc));
        return EXIT_FAILURE;
    }
    rc = pthread_create(&consumer_thread, NULL, consumer, NULL);
    if (rc != 0) {
        fprintf(stderr, "pthread_create(consumer): %s\n", strerror(rc));
        return EXIT_FAILURE;
    }
    pthread_exit(NULL);
}
1 Answer
Merge cases
In the consumer, you have two separate checks:
if ((seqno)-1 == count) { /* the message hasn't been completed yet */ _mm_pause(); continue; }
and later:
if (consumer_count > count) { /* we have incremented the consumer count enough to catch up to the producer */ _mm_pause(); continue; }
Since `consumer_count` doesn't change between those two blocks of code, you could move the second check up and merge it with the first one:
if ((seqno)-1 == count || consumer_count > count) {
/* the message hasn't been completed yet, or we have */
/* incremented the consumer count enough to catch up to the producer */
_mm_pause();
continue;
}
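Moving the second check up could actually help you avoid dropping a packet that didn't need to be dropped. With the original order, a slot that still holds a packet from an earlier lap is copied before the `consumer_count > count` test runs. If the producer overwrites that slot during the copy, the torn-read branch sets `consumer_count` to `count + 1`. That moves the consumer backwards instead of simply making it wait.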
`production_count` to look for possibly torn reads. But yes, technically this is undefined behaviour. I wish there were a way around that, but I don't think it's possible without giving up the desirable property of a read-only consumer.