I have implemented a solution to the producer-consumer problem, following the resources below:
I have used mutex_t and sem_t. I chose mutex_t instead of a binary semaphore because of my understanding, mentioned here, and hence I have avoided using sem_t as a binary semaphore.
Also, the Oracle doc link I have provided uses two mutexes, sem_t & cmut, whereas I have used only one mutex_t object, as suggested in the CSEE link provided.
Is it OK? Please review my code and provide your valuable comments.
#include <stdio.h>
#include <pthread.h>
#include <semaphore.h>
#define MAX 10
typedef struct{
    char buff[MAX];
    int in;
    int out;
    int count;
    pthread_mutex_t mutex;
    sem_t empty;
    sem_t full;
}BUFFER;
BUFFER queue;
char substring[5];
void initializeBuffer(BUFFER* b)
{
    b->count = 0;
    b->in = 0;
    b->out = 0;
    pthread_mutex_init(&(b->mutex),NULL);
    sem_init(&(b->full),0,0);
    sem_init(&(b->empty),0,MAX);
}
void destroyBuffer(BUFFER* b)
{
    b->count = 0;
    b->in = 0;
    b->out = 0;
    pthread_mutex_destroy(&(b->mutex));
    sem_destroy(&(b->full));
    sem_destroy(&(b->empty));
}
void producer(BUFFER* b,char c)
{
    sem_wait(&(b->empty));
    pthread_mutex_lock(&(b->mutex));
    b->buff[b->in]=c;
    (b->in)++;
    (b->in) %= MAX;
    (b->count)++;
    pthread_mutex_unlock(&(b->mutex));
    sem_post(&(b->full));
}
char consumer(BUFFER* b)
{
    sem_wait(&(b->full));
    pthread_mutex_lock(&(b->mutex));
    char val=(b->buff[b->out]);
    (b->out)++;
    (b->out) %= MAX;
    (b->count)--;
    pthread_mutex_lock(&(b->mutex));
    sem_post(&(b->empty));
    return val;
}
void* THREAD_PRODUCER(void* arg)
{
    char array[]="HelloWorld";
    int i=0;
    printf("Producer Thread Created\n");
    while(array[i] != '\0')
    {
        producer(&queue,array[i]);
        i++;
    }
    pthread_exit((void*) 1);
}
void* THREAD_CONSUMER(void* arg)
{
    int i=0;
    printf("Consumer Thread Created\n");
    for(i=0;i<5;i++)
    {
        substring[i]=consumer(&queue);
    }
    pthread_exit((void*) 1);
}
int main()
{
    pthread_t T1,T2;
    initializeBuffer(&queue);
    pthread_create(&T1,NULL,THREAD_PRODUCER,NULL);
    pthread_create(&T2,NULL,THREAD_CONSUMER,NULL);
    pthread_join(T1,NULL);
    pthread_join(T2,NULL);
    printf("Inside queue: %c\n",queue.buff[queue.out]);
    printf("substring: %s\n",substring);
    destroyBuffer(&queue);
    return 0;
}
The output is:
$ ./ProducerConsumer.exe
Producer Thread Created
Consumer Thread Created
Inside queue: W
substring: Hello
1. Bugs
It doesn't work! On OS X 10.9.4, with Clang/LLVM, when I run it, I get the output:
Producer Thread Created Consumer Thread Created
... and then it hangs. Here's an LLDB session:
(lldb) run
Process 29359 launched: './a.out' (x86_64)
Producer Thread Created
Consumer Thread Created
^C
(lldb) thread backtrace all
* thread #1: tid = 0x5461f, 0x00007fff8752ca3a libsystem_kernel.dylib`__semwait_signal + 10, queue = 'com.apple.main-thread', stop reason = signal SIGSTOP
  * frame #0: 0x00007fff8752ca3a libsystem_kernel.dylib`__semwait_signal + 10
    frame #1: 0x00007fff862c17f3 libsystem_pthread.dylib`pthread_join + 433
    frame #2: 0x0000000100000e17 a.out`main + 135 at cr55792.c:102
  thread #2: tid = 0x54646, 0x00007fff8752c746 libsystem_kernel.dylib`__psynch_mutexwait + 10
    frame #0: 0x00007fff8752c746 libsystem_kernel.dylib`__psynch_mutexwait + 10
    frame #1: 0x00007fff862c0779 libsystem_pthread.dylib`_pthread_mutex_lock + 372
    frame #2: 0x0000000100000c5b a.out`consumer(b=0x0000000100001070) + 155 at cr55792.c:66
    frame #3: 0x0000000100000d57 a.out`THREAD_CONSUMER(arg=0x0000000000000000) + 71 at cr55792.c:90
    frame #4: 0x00007fff862bd899 libsystem_pthread.dylib`_pthread_body + 138
    frame #5: 0x00007fff862bd72a libsystem_pthread.dylib`_pthread_start + 137
Thread #1 is the main thread. It is waiting at line 102 to join the consumer thread:
pthread_join(T2,NULL);
Thread #2 is the consumer. It is waiting to take the lock in the function consumer at line 66:
pthread_mutex_lock(&(b->mutex));
The producer thread has finished.
Obviously the mistake is at line 66 of consumer: it should say pthread_mutex_unlock. The question for you is, how did you miss this? This is not some mysterious race condition: it ought not to be possible for consumer to take the same lock twice in succession.
With this bug fixed, the program runs to completion. Sometimes it produces the correct output:
Producer Thread Created
Consumer Thread Created
Inside queue: W
substring: Hello
but about half the time it produces the output:
Consumer Thread Created
Producer Thread Created
Inside queue: W
substring:
Why is this? It's because this call in consumer is not blocking when it should:
sem_wait(&(b->full));
Let's see why, by changing this to:
if (sem_wait(&(b->full)) != 0) {
    perror("sem_wait");
    exit(EXIT_FAILURE);
}
Sure enough, this check fails with:
sem_wait: Bad file descriptor
Reading the manual page for sem_wait:
[EINVAL] sem is not a valid semaphore descriptor.
Why is that? Let's add an error check to the sem_init call:
if (sem_init(&(b->full),0,0) != 0) {
    perror("sem_init");
    exit(EXIT_FAILURE);
}
This check fails with:
sem_init: Function not implemented
This is because OS X does not support unnamed semaphores (sem_init, sem_destroy). On this operating system, it's necessary to use named semaphores (sem_open, sem_close, sem_unlink) instead (that is, if you need to restrict yourself to POSIX; if you're writing OS X-only code then you could use the Mach semaphores).
The lesson here is that writing portable Unix programs is hard! I wouldn't necessarily suggest that you replace your unnamed semaphores with named semaphores (although that's what I've done in my revised code below), just that you check the result of sem_init and other functions so that portability problems like this are easily discovered and diagnosed.
2. Review
You don't check error codes! Read the manual pages for sem_wait, pthread_mutex_lock and so on and you'll see that they can all fail. Checking error codes would have immediately identified the cause of the second bug above.
Checking for errors is vital in multi-threaded code, where race conditions are often hard to reproduce. I was lucky that this particular error occurred about 50% of the time: had it been rare, I would probably have missed it altogether.
Always check error codes! It's burdensome at first, but it's not technically difficult to develop a simple error-reporting system and apply it to all fallible function calls. And once in a while this will save your ass.
There are no comments. What do these functions do? What is the meaning of each variable?
Some of your names are lower case (queue, producer), some are upper case (BUFFER, THREAD_PRODUCER), and some are mixed (initializeBuffer), but there doesn't seem to be any consistent meaning in this distinction. It's worth paying careful attention to names.
You use names inconsistently. For example, is it a BUFFER or a queue?
The names T1 and T2 are rather obscure. One is the producer thread and the other the consumer thread, but which is which? It is hard to remember.
In the BUFFER structure, the in and out members are non-negative indexes into buff, and the count member is a non-negative count of unconsumed items, so they should be represented by unsigned integers, for example size_t.
The count member is not actually used for anything. Use it or remove it! I would suggest using it by adding an assertion before you decrement it.
You have unnecessary parentheses in expressions like:
&(b->empty)
(b->in)++;
which suggests that you are not confident about operator precedence and associativity. I would suggest having an operator precedence table open in a tab somewhere until you get used to it, rather than adding parentheses when you are unsure.
You'll see from the table that -> has higher precedence than &, so it's safe to write:
&b->empty
You'll also see that -> and (postfix) ++ have the same precedence but associate from left to right, so it's safe to write:
b->in++;
There's no need to call pthread_exit at the end of the thread function. If you read the manual, you'll see that:
An implicit call to pthread_exit() is made when a thread other than the thread in which main() was first invoked returns from the start routine that was used to create it. The function's return value serves as the thread's exit status.
3. Revised code
#include <assert.h>
#include <fcntl.h>      /* O_CREAT, used by sem_open */
#include <pthread.h>
#include <semaphore.h>
#include <stdio.h>
#include <stdlib.h>
/* If expr is false, print error message and exit. */
#define CHECK(expr, msg)                        \
    do {                                        \
        if (!(expr)) {                          \
            perror(msg);                        \
            exit(EXIT_FAILURE);                 \
        }                                       \
    } while (0)
/* Thread-safe queues */
#define MAX 10 /* maximum number of characters in queue */
typedef struct {
    char buffer[MAX];       /* circular buffer */
    size_t in;              /* position of producer in buffer */
    size_t out;             /* position of consumer in buffer */
    size_t count;           /* number of unconsumed items */
    pthread_mutex_t mutex;  /* mutex protecting buffer, in, out, count */
    sem_t *empty;           /* semaphore counting empty slots in buffer */
    sem_t *full;            /* semaphore counting full slots in buffer */
} queue_s, *queue_t;
void queue_init(queue_t q)
{
    q->count = 0;
    q->in = 0;
    q->out = 0;
    CHECK(pthread_mutex_init(&q->mutex, NULL) == 0, "pthread_mutex_init");
    q->full = sem_open("full", O_CREAT, 0600, 0);
    CHECK(q->full != SEM_FAILED, "sem_open");
    CHECK(sem_unlink("full") == 0, "sem_unlink");
    q->empty = sem_open("empty", O_CREAT, 0600, MAX);
    CHECK(q->empty != SEM_FAILED, "sem_open");
    CHECK(sem_unlink("empty") == 0, "sem_unlink");
}
void queue_destroy(queue_t q)
{
    CHECK(pthread_mutex_destroy(&q->mutex) == 0, "pthread_mutex_destroy");
    CHECK(sem_close(q->full) == 0, "sem_close");
    CHECK(sem_close(q->empty) == 0, "sem_close");
}
/* Wait, if necessary, for a slot to become available in the queue,
 * and then append the character c. */
void queue_append(queue_t q, char c)
{
    CHECK(sem_wait(q->empty) == 0, "sem_wait");
    CHECK(pthread_mutex_lock(&q->mutex) == 0, "pthread_mutex_lock");
    q->buffer[q->in] = c;
    q->in++;
    q->in %= MAX;
    assert(q->count < MAX);
    q->count++;
    CHECK(pthread_mutex_unlock(&q->mutex) == 0, "pthread_mutex_unlock");
    CHECK(sem_post(q->full) == 0, "sem_post");
}
/* Wait, if necessary, for a slot to become full in the queue, and
 * then pop and return the first character. */
char queue_pop(queue_t q)
{
    CHECK(sem_wait(q->full) == 0, "sem_wait");
    CHECK(pthread_mutex_lock(&q->mutex) == 0, "pthread_mutex_lock");
    char val = q->buffer[q->out];
    q->out++;
    q->out %= MAX;
    assert(q->count > 0);
    q->count--;
    assert(q->count == (q->in + MAX - q->out) % MAX);
    CHECK(pthread_mutex_unlock(&q->mutex) == 0, "pthread_mutex_unlock");
    CHECK(sem_post(q->empty) == 0, "sem_post");
    return val;
}
/* Test harness */
queue_s queue;
char substring[6];      /* five characters plus a terminating '\0' (zero-initialized global) */
void *producer(void *arg)
{
    char array[] = "HelloWorld";
    size_t i = 0;
    printf("Producer Thread Created\n");
    while (array[i] != '\0') {
        queue_append(&queue, array[i]);
        i++;
    }
    return NULL;
}
void *consumer(void *arg)
{
    size_t i = 0;
    printf("Consumer Thread Created\n");
    for (i = 0; i < 5; i++) {
        substring[i] = queue_pop(&queue);
    }
    return NULL;
}
int main()
{
    pthread_t producer_thread, consumer_thread;
    queue_init(&queue);
    CHECK(pthread_create(&consumer_thread, NULL, consumer, NULL) == 0,
          "pthread_create");
    CHECK(pthread_create(&producer_thread, NULL, producer, NULL) == 0,
          "pthread_create");
    CHECK(pthread_join(consumer_thread, NULL) == 0, "pthread_join");
    CHECK(pthread_join(producer_thread, NULL) == 0, "pthread_join");
    printf("Next character in queue: %c\n", queue.buffer[queue.out]);
    printf("substring: %s\n", substring);
    queue_destroy(&queue);
    return EXIT_SUCCESS;
}
This is a superb review! I am thoroughly impressed by how much work you put into this! – syb0rg, Jul 5, 2014 at 18:11