I am developing a system in C for a device running Yocto Linux (POSIX-compatible, with libc, etc.) on a microprocessor. The Yocto build is "quite generalized", as it is supposed to run several different services. The system is still resource-constrained, so optimization and fault tolerance are crucial. The purpose of this system is to take in a high volume of sensor data over a TCP socket, process it, and save it to a PostgreSQL database.
Current Design:
Data Reception: I use one thread to receive incoming data frames over a TCP Socket. This data is parsed and then inserted into a circular buffer.
Data Processing: Another thread pulls the data out of the circular buffer and writes it to the PostgreSQL database.
Here are some code examples from my design, covering the receiving and insertion part.
typedef struct
{
    uint8_t *Data;
    size_t DataSize;
    size_t Head;
    size_t Tail;
    size_t Count;
    bool Full;
    pthread_mutex_t WriteLock;
    pthread_mutex_t ReadLock;
    pthread_cond_t NotEmpty;
    pthread_cond_t NotFull;
} circular_buffer;
// Function to receive a complete TCP frame
ssize_t RecvDataFrame(int socket_fd, uint8_t *buffer, size_t buffer_size) {
    ssize_t total_bytes_read = 0;
    // Receive header first
ReceiveHeader:
    while (total_bytes_read < HEADER_SIZE) {
        ssize_t bytes_read = recv(socket_fd, buffer + total_bytes_read, HEADER_SIZE - total_bytes_read, 0);
        if (bytes_read == -1) {
            if (errno == EINTR) {
                goto ReceiveHeader;
            } else {
                return -1;
            }
        } else if (bytes_read == 0) {
            return 0;
        }
        total_bytes_read += bytes_read;
    }
    // Calculate the full frame size based on header info
    uint16_t frame_length = (buffer[4] << 8) | buffer[5];
    ssize_t total_frame_size = HEADER_SIZE + frame_length;
    // Ensure the frame fits in the buffer
    if (total_frame_size > (ssize_t)buffer_size) return -1;
    // Receive the rest of the frame
ReceiveFrame:
    while (total_bytes_read < total_frame_size) {
        ssize_t bytes_read = recv(socket_fd, buffer + total_bytes_read, total_frame_size - total_bytes_read, 0);
        if (bytes_read == -1) {
            if (errno == EINTR) {
                goto ReceiveFrame;
            } else {
                return -1;
            }
        } else if (bytes_read == 0) {
            return 0;
        }
        total_bytes_read += bytes_read;
    }
    return total_bytes_read;
}
// Main function to handle TCP frames
int Run(comm_protocol_api *Protocol1) {
    Protocol1_ctx *Ctx = Protocol1->Ctx;
    uint8_t Buf[MAX_TCP_FRAME];
    while (true) {
        ssize_t num_bytes = RecvDataFrame(Ctx->SockFd, Buf, sizeof(Buf));
        if (num_bytes > 0) {
            CbInsert(Ctx->Cb, Buf, num_bytes); // Insert data into circular buffer
        } else if (num_bytes == 0) {
            LogDebug("Connection closed by server");
            return SDBE_CONN_CLOSED_SUCS;
        } else {
            LogError("Error during read operation. Closing connection");
            return -SDBE_CONN_CLOSED_ERR;
        }
    }
    return 0;
}
// Function to insert data into a circular buffer
ssize_t CbInsert(circular_buffer *cb, const void *data, size_t size) {
    const uint8_t *src = data;
    pthread_mutex_lock(&cb->WriteLock);
    // Wait while there is not enough free space for the whole frame.
    // This might be redundant: some of the data can be overwritten, while
    // other data can't, so I am thinking of including a bool in the struct.
    while (cb->DataSize - cb->Count < size) {
        pthread_cond_wait(&cb->NotFull, &cb->WriteLock);
    }
    // Insert data into buffer with wrap-around handling
    size_t first_chunk = size < cb->DataSize - cb->Head ? size : cb->DataSize - cb->Head;
    memcpy(cb->Data + cb->Head, src, first_chunk);
    size_t second_chunk = size - first_chunk;
    if (second_chunk > 0) {
        memcpy(cb->Data, src + first_chunk, second_chunk);
    }
    // Update buffer head and count
    cb->Head = (cb->Head + size) % cb->DataSize;
    cb->Count += size;
    // Signal buffer is not empty and release lock
    pthread_cond_signal(&cb->NotEmpty);
    pthread_mutex_unlock(&cb->WriteLock);
    return size;
}
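To give a fuller picture, the consumer side is shaped roughly like this. This is a simplified sketch, not the exact code: writer_ctx, CbRemove, and the table/query are illustrative stand-ins.

#include <libpq-fe.h>

// Consumer thread: drain complete frames from the circular buffer and
// write them to PostgreSQL. Reconnect and batching logic omitted.
void *DbWriterThread(void *Arg)
{
    writer_ctx *Ctx = Arg;            // hypothetical context struct
    uint8_t Frame[MAX_TCP_FRAME];

    while (true) {
        // Blocks on the NotEmpty condition until a frame is available
        ssize_t Size = CbRemove(Ctx->Cb, Frame, sizeof(Frame));
        if (Size <= 0) {
            continue;
        }
        // Illustrative insert via libpq; parameter 1 is passed in binary form
        const char *Values[1] = { (const char *)Frame };
        int Lengths[1] = { (int)Size };
        int Formats[1] = { 1 };
        PGresult *Res = PQexecParams(Ctx->DbConn,
                "INSERT INTO sensor_data (frame) VALUES ($1)",
                1, NULL, Values, Lengths, Formats, 0);
        if (PQresultStatus(Res) != PGRES_COMMAND_OK) {
            LogError("Database insert failed");
        }
        PQclear(Res);
    }
    return NULL;
}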
Questions:
1. Design Viability: Is this a solid system architecture for handling high-throughput sensor data on a resource-limited device, or should I consider something else?
2. Potential Bottlenecks: I am concerned that buffering the data from the TCP socket in an intermediate buffer before inserting it into the circular buffer might become a bottleneck. I might also just be dumb and it might not be a big deal. But would it be more efficient to stream the data directly into the circular buffer? Are there any techniques to manage high-throughput data in a fault-tolerant way without overwhelming the circular buffer? What are some other potential bottlenecks?
3. Optimizations: What other optimization strategies would be recommended, given the constraints? Is there anything we should specifically watch out for with this system design?
Any advice on these aspects or potential pitfalls to consider would be greatly appreciated.
I can post full code if more context is needed.
I have tried researching parts of the system individually, but it's hard to find resources regarding a whole system design like this.
1 Answer
Regarding performance:

From lower-level embedded systems we may pick up the good practice "never use memcpy() unless you actually have to". If the socket API is already copying internally, that's bad enough without the application adding further copying on top of it. Most often we can instead keep 2 or more buffers allocated (similar to uint8_t Buf[MAX_TCP_FRAME];) and just swap the pointers between them. Often there is no need to "zero out" old data either, just overwrite it.

As for threading, it is good to minimize the time spent inside a mutex lock, since if we have lots of code there we are stalling other parts of the program. I would wildly guess that this part is the main bottleneck.
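To illustrate, a minimal sketch of the pointer-swap idea (all names here are made up; MAX_TCP_FRAME is borrowed from the question):

#include <stdint.h>
#include <pthread.h>

// Two pre-allocated frame buffers; only pointers change hands, no memcpy
static uint8_t BufA[MAX_TCP_FRAME];
static uint8_t BufB[MAX_TCP_FRAME];

static uint8_t *RecvBuf = BufA;   // socket writes here
static uint8_t *WorkBuf = BufB;   // consumer reads here
static pthread_mutex_t SwapLock = PTHREAD_MUTEX_INITIALIZER;

// Called once a complete frame sits in RecvBuf: hand it to the consumer
static void SwapBuffers(void)
{
    pthread_mutex_lock(&SwapLock);
    uint8_t *Tmp = RecvBuf;
    RecvBuf = WorkBuf;
    WorkBuf = Tmp;
    pthread_mutex_unlock(&SwapLock);
}

Note how the critical section is only three pointer assignments, so the lock is held for nanoseconds rather than for the duration of a copy.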
Though if you are to forward this data on to someone else outside this module, there might seem to be no way around hard copies. A different approach then could be to skip the local variable and instead pre-allocate a chunk using malloc() so that the data will persist. Let the socket write directly into the malloc'd buffer and then have the circular buffer work on pointers to allocated data instead of doing hard copies. malloc() does come with overhead, but if you malloc() the next chunk just after receiving data, you may be able to time it so that the allocation happens while the socket is still receiving the next incoming data packet.
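As a rough sketch of that idea (frame_ref and CbInsertRef are hypothetical here; only a small pointer+size record gets queued, never the payload):

#include <stdint.h>
#include <stdlib.h>

typedef struct {
    uint8_t *Data;   // heap-allocated frame, owned by the queue entry
    size_t Size;
} frame_ref;

// Receive directly into a freshly allocated chunk, then enqueue the pointer.
// CbInsertRef is assumed to copy only the small frame_ref, not the payload.
static int ReceiveOneFrame(int SockFd, circular_buffer *Cb)
{
    uint8_t *Frame = malloc(MAX_TCP_FRAME);
    if (Frame == NULL)
        return -1;

    ssize_t Size = RecvDataFrame(SockFd, Frame, MAX_TCP_FRAME);
    if (Size <= 0) {
        free(Frame);
        return (int)Size;
    }

    frame_ref Ref = { .Data = Frame, .Size = (size_t)Size };
    CbInsertRef(Cb, &Ref);   // consumer calls free(Ref.Data) when done
    return 1;
}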
The standard nit-pick code review remark for structs is that circular_buffer is poorly optimized with regards to alignment. You shouldn't have a small type like bool in the middle of a struct; put small types at the end to minimize the need for padding.
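For example, simply moving the bool to the end removes the padding hole in the middle:

typedef struct
{
    uint8_t *Data;
    size_t DataSize;
    size_t Head;
    size_t Tail;
    size_t Count;
    pthread_mutex_t WriteLock;
    pthread_mutex_t ReadLock;
    pthread_cond_t NotEmpty;
    pthread_cond_t NotFull;
    bool Full;              // small type moved to the end
} circular_buffer;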
Regarding coding style:

The rule of thumb is to never use goto, because if you do, you have to suffer the old "goto considered harmful" debate yet again. Or, as various industry coding standards usually put it: it's OK to use goto for error handling, but only if you jump downwards.
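That is, the tolerated pattern is a single downward jump to shared cleanup, something like (generic example with placeholder names, unrelated to the code under review):

#include <stdio.h>
#include <stdlib.h>

int DoWork(size_t some_size)
{
    int result = -1;
    unsigned char *buf = malloc(some_size);
    if (buf == NULL)
        return -1;

    FILE *f = fopen("data.bin", "rb");   // placeholder file name
    if (f == NULL)
        goto cleanup;                    // jump downwards to shared cleanup

    /* ... actual work on buf and f ... */
    result = 0;

    fclose(f);
cleanup:
    free(buf);
    return result;
}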
In this specific case, goto just makes readability worse. As does the while loop, even. The first loop could be replaced with an idiomatic, readable for loop (and maybe less verbose names to get shorter lines, but that's a minor thing):
ssize_t total_read;
ssize_t bytes_read = 0;
for(total_read = 0; total_read < HEADER_SIZE; total_read += bytes_read)
{
    bytes_read = recv(socket_fd, buffer + total_read, HEADER_SIZE - total_read, 0);
    // error handling code is never fun to read, so best write it as clearly as possible
    if(bytes_read == -1)
    {
        if(errno == EINTR)
        {
            bytes_read = 0; /* ignore and continue */
        }
        else
        {
            return -1;
        }
    }
    else if(bytes_read == 0)
    {
        return 0;
    }
}
...
// next loop may have to skip the total_read initialization
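Applied to the second loop, that might look like this (sketch only; the init clause is left empty so total_read carries over from the header loop):

// After computing total_frame_size from the header:
for(; total_read < total_frame_size; total_read += bytes_read)
{
    bytes_read = recv(socket_fd, buffer + total_read, total_frame_size - total_read, 0);
    // identical -1/EINTR/0 handling as in the first loop
    ...
}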
Came here to ask OP if they knew about the keyword continue, but this is better. continue is only barely better than goto. I would also question the naming convention of circular_buffer; to me that looks like a variable rather than a type. I know that the _t ending is sometimes frowned upon for user-made typedefs; I would probably not hide the struct behind a typedef. – Gauthier, Nov 9, 2024 at 18:19
Regarding circular_buffer: aside from circular_buffer(), what other functions use it?