I'm trying to write a message queue implementation so my threads can exchange data. To keep things simple, the queue is only responsible for sending pointers to memory. Only one thread may send messages, and only one thread may read messages.
The struct mq_t represents a message queue. mq_t.messages is a circular buffer of pointers. mq_t.send is the index of the last sent message. mq_t.recv is the index of the next message to be received.
I'd like to know some things:
- Thread Safety: I didn't use a mutex or lock, but I think the code is quite thread-safe. Is this true?
- In the code I use things like while (isempty(queue));. Is there a better way to handle such things, like suspending the thread until a memory change occurs?
- Cross Platform: does my code rely on some non-cross platform code or undefined behaviour?
- Are there other things wrong with my code (except the highly generic function names)?
#include <stdlib.h>
#include <pthread.h>
#include <stdio.h>

#define null ((void*)0)
#define SIZE 2

typedef struct {
    int send;
    int recv;
    void * messages[SIZE];
} mq_t;

inline int isfull(mq_t * queue) {
    return (queue->send+1)%SIZE == queue->recv;
}

inline int isempty(mq_t * queue) {
    return queue->send == queue->recv;
}

mq_t * create(void) {
    mq_t * queue = (mq_t*)malloc(sizeof(mq_t));
    queue->send = 0;
    queue->recv = 0;
}

void send(mq_t * queue, void * message) {
    while (isfull(queue));
    int next = (queue->send+1) % SIZE;
    queue->messages[next] = message;
    queue->send = next;
}

void * recv(mq_t * queue) {
    while (isempty(queue));
    void * message = queue->messages[queue->send];
    queue->recv = (queue->recv+1) % SIZE;
    return message;
}

void destroy(mq_t * queue) {
    while (!isempty(queue));
    free(queue);
}
Test:
void * sender(void * queue) {
    send(queue, "hello, ");
    send(queue, "world");
    send(queue, "how are you?");
    printf("sent data\n");
}

void * recver(void * queue) {
    sleep(1);
    printf("received '%s'\n", recv(queue));
    printf("received '%s'\n", recv(queue));
    printf("received '%s'\n", recv(queue));
}

int main() {
    mq_t * queue = create();
    pthread_t s, r;
    pthread_create(&s, null, sender, (void*)queue);
    pthread_create(&r, null, recver, (void*)queue);
    pthread_join(s, null);
    pthread_join(r, null);
    return 0;
}
Output:
received 'hello, '
received 'world'
received 'how are you?'
sent data
Comment: I don't think POSIX is portable to Windows these days. I'm not very Windows-savvy so I could easily be mistaken. (weberc2, Feb 27, 2015 at 17:29)
2 Answers
With a size of 2, your 'queue' can hold only one item, so it is not really a queue. If you increase the size to 8 (say) you now really have a queue, but your tests don't work.
You seem to define send and recv as follows:
- send indicates the 'slot' holding the most recently entered data (queue not empty).
- recv indicates the 'slot' before the oldest data in the queue (queue not empty).
- if send == recv the queue is empty.
So with both send and recv at zero, when the first data is written it goes to slot 1 and send is incremented to 1; when the 2nd data is written it goes to slot 2 and send is incremented to 2. If you now read from the queue, in recv you use the send counter to extract the data instead of using recv + 1. Hence the queue fails.
A more logical arrangement might be as follows, with the counters renamed in and out (naming them the same as the access functions is confusing):
- in indicates the next 'slot' to be written by send (queue not empty).
- out indicates the 'slot' containing the next data to be read by recv (queue not empty).
- if in == out the queue is empty.
So with both in and out at zero, when the first data is sent, it goes into slot 0 (to which in and out already point) and in is incremented to 1. The first read now reads from out at 0, which is correct.
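As a rough sketch of the in/out arrangement described above (single-threaded logic only, no synchronisation), something like the following could work. The names ring, ring_put and ring_get are made up for this illustration, SIZE is set to 8 as suggested earlier, and the functions return an error instead of busy-waiting so the index handling is easy to check.

```c
#include <assert.h>
#include <stddef.h>

#define SIZE 8  /* illustrative capacity; one slot always stays unused */

/* Hypothetical type name, not taken from the original code. */
typedef struct {
    int in;                 /* next slot to be written */
    int out;                /* slot holding the next data to be read */
    void *messages[SIZE];
} ring;

static int ring_isempty(const ring *q) { return q->in == q->out; }
static int ring_isfull(const ring *q)  { return (q->in + 1) % SIZE == q->out; }

/* Stores message in slot `in`, then advances `in`.
 * Returns 0 on success, -1 if the queue is full. */
static int ring_put(ring *q, void *message)
{
    assert(q != NULL);
    if (ring_isfull(q))
        return -1;
    q->messages[q->in] = message;
    q->in = (q->in + 1) % SIZE;
    return 0;
}

/* Reads from slot `out`, then advances `out`.
 * Returns the oldest message, or NULL if the queue is empty. */
static void *ring_get(ring *q)
{
    assert(q != NULL);
    if (ring_isempty(q))
        return NULL;
    void *message = q->messages[q->out];
    q->out = (q->out + 1) % SIZE;
    return message;
}
```

Note that the read uses out, not in, to pick the slot, which is the mix-up that breaks the original recv.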
On thread safety, it is very difficult to see or analyse such things. You would be better off assuming that the code isn't thread-safe (it certainly wouldn't be if there were more than 2 threads) and using a mutex. As your wait loops need to be rewritten (they just waste CPU cycles needlessly) and you are using POSIX threads, use a POSIX mutex and a "condition variable" to synchronise.
Here's an example, but there are many more on the web: http://www.ibm.com/developerworks/library/l-posix3/
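For a feel of what the mutex and condition variable approach might look like here, a minimal sketch follows. The names blocking_queue, bq_init, bq_send and bq_recv are invented for this example, and the count field is an added assumption (it lets all SIZE slots be used); error handling for the lock and signal calls is omitted for brevity.

```c
#include <assert.h>
#include <pthread.h>
#include <stddef.h>

#define SIZE 8  /* illustrative capacity */

/* Hypothetical type; the count field is an addition for this sketch. */
typedef struct {
    pthread_mutex_t lock;
    pthread_cond_t not_empty;   /* signalled after a message is stored */
    pthread_cond_t not_full;    /* signalled after a message is removed */
    int in, out, count;
    void *messages[SIZE];
} blocking_queue;

/* Returns 0 on success, -1 if a pthread object could not be initialised. */
static int bq_init(blocking_queue *q)
{
    assert(q != NULL);
    q->in = q->out = q->count = 0;
    if (pthread_mutex_init(&q->lock, NULL) != 0) return -1;
    if (pthread_cond_init(&q->not_empty, NULL) != 0) return -1;
    if (pthread_cond_init(&q->not_full, NULL) != 0) return -1;
    return 0;
}

static void bq_send(blocking_queue *q, void *message)
{
    pthread_mutex_lock(&q->lock);
    /* Sleep (releasing the mutex) until there is room; the loop
     * re-checks the condition because wakeups can be spurious. */
    while (q->count == SIZE)
        pthread_cond_wait(&q->not_full, &q->lock);
    q->messages[q->in] = message;
    q->in = (q->in + 1) % SIZE;
    q->count++;
    pthread_cond_signal(&q->not_empty);
    pthread_mutex_unlock(&q->lock);
}

static void *bq_recv(blocking_queue *q)
{
    pthread_mutex_lock(&q->lock);
    /* Sleep until a message is available instead of spinning. */
    while (q->count == 0)
        pthread_cond_wait(&q->not_empty, &q->lock);
    void *message = q->messages[q->out];
    q->out = (q->out + 1) % SIZE;
    q->count--;
    pthread_cond_signal(&q->not_full);
    pthread_mutex_unlock(&q->lock);
    return message;
}
```

Because pthread_cond_wait puts the caller to sleep, this replaces the while (isfull(queue)); and while (isempty(queue)); spin loops, and it should also stay correct with more than one sender or receiver.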
Some minor coding points:
- Add const to pointer parameters that do not change.
- Return a value from create.
- Don't cast the return from malloc. Casting is necessary in C++, not C (where void* can be assigned to any pointer). In C, adding a cast can cause problems if you don't have the header for malloc included, as the compiler will assume that malloc returns an int. If int and pointer have different sizes this is a problem.
- null is not really needed. You can use either 0 or NULL.
- The _t type suffix is apparently reserved by POSIX.
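Applied to the question's code, these points might look roughly like the sketch below. The type name mq (without the _t suffix) is only one possible rename, and the malloc failure check is an extra precaution not mentioned in the list.

```c
#include <assert.h>
#include <stdlib.h>

#define SIZE 2

/* Renamed from mq_t to avoid the reserved _t suffix (name is illustrative). */
typedef struct {
    int send;
    int recv;
    void *messages[SIZE];
} mq;

/* const: the function only reads through the pointer. */
static int isempty(const mq *queue)
{
    assert(queue != NULL);
    return queue->send == queue->recv;
}

/* Returns the new queue, or NULL if allocation fails. */
static mq *create(void)
{
    mq *queue = malloc(sizeof *queue);  /* no cast needed in C */
    if (queue == NULL)                  /* NULL instead of a custom null macro */
        return NULL;
    queue->send = 0;
    queue->recv = 0;
    return queue;                       /* the original forgot this return */
}
```

Without the return statement, using the result of create is undefined behaviour, so the original test only works by accident.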
This code is surely not MTsafe:
void send(mq_t * queue, void * message) {
while (isfull(queue));
int next = (queue->send+1) % SIZE;
Between while (isfull(queue)); and queue->send = next;, several threads can get past the check and write to the same slot before any of them updates queue->send, which is the update that would have made isfull block them.