(See also the follow-up question.)
I was in the mood for pthread.h
and decided to write a concurrent stack data structure. My requirements:
- the stack has maximum capacity,
- pushing an element,
- removing the top element without returning it,
- peeking the top element,
- client is blocked whenever popping from an empty stack,
- client is blocked whenever pushing to a full stack.
My codez looks like this:
concurrent_stack.h
#ifndef CONCURRENT_STACK_H
#define CONCURRENT_STACK_H
#include <stdlib.h>
typedef struct concurrent_stack
{
/*******************************************
The number of elements stored in this stack.
*******************************************/
size_t size;
/**************************************************
The maximum number of elements this stack can hold.
**************************************************/
size_t capacity;
/**********************************************
The actual array holding the data of the stack.
**********************************************/
void** storage;
/************************************************
The mutual exclusion lock for updating the stack.
************************************************/
pthread_mutex_t mutex;
/*****************************
Guards against an empty stack.
*****************************/
pthread_cond_t empty_condition_variable;
/***************************
Guards against a full stack.
***************************/
pthread_cond_t full_condition_variable;
}
concurrent_stack;
/*****************************************
Initializes a new, empty concurrent stack.
*****************************************/
void concurrent_stack_init(concurrent_stack* stack, size_t capacity);
/****************************************
Pushes a datum onto the top of the stack.
****************************************/
void concurrent_stack_push(concurrent_stack* stack, void* datum);
/******************************************
Returns, but does not remove the top datum.
******************************************/
void* concurrent_stack_top(concurrent_stack* stack);
/****************************************
Removes the topmost datum from the stack.
****************************************/
void concurrent_stack_pop(concurrent_stack* stack);
/*******************************************
Returns the number of elements in the stack.
*******************************************/
size_t concurrent_stack_size(concurrent_stack* stack);
/*********************************
Returns the capacity of the stack.
*********************************/
size_t concurrent_stack_capacity(concurrent_stack* stack);
/***********************************
Releases all resources of the stack.
***********************************/
void concurrent_stack_free(concurrent_stack* stack);
#endif /* CONCURRENT_STACK_H */
concurrent_stack.c
#include "concurrent_stack.h"
#include <pthread.h>
#define MAX(A,B) (((A) > (B)) ? (A) : (B))
static const size_t MINIMUM_CAPACITY = 10;
void concurrent_stack_init(concurrent_stack* stack, size_t capacity)
{
stack->size = 0;
stack->capacity = MAX(capacity, MINIMUM_CAPACITY);
stack->storage = malloc(sizeof(void*) * stack->capacity);
pthread_mutex_init(&stack->mutex, NULL);
pthread_cond_init(&stack->empty_condition_variable, NULL);
pthread_cond_init(&stack->full_condition_variable, NULL);
}
void concurrent_stack_push(concurrent_stack* stack, void* datum)
{
pthread_mutex_lock(&stack->mutex);
while (stack->size == stack->capacity)
{
pthread_cond_wait(&stack->full_condition_variable, &stack->mutex);
}
stack->storage[stack->size++] = datum;
pthread_cond_signal(&stack->empty_condition_variable);
pthread_mutex_unlock(&stack->mutex);
}
void concurrent_stack_pop(concurrent_stack* stack)
{
pthread_mutex_lock(&stack->mutex);
while (stack->size == 0)
{
pthread_cond_wait(&stack->empty_condition_variable, &stack->mutex);
}
stack->size--;
pthread_cond_signal(&stack->full_condition_variable);
pthread_mutex_unlock(&stack->mutex);
}
void* concurrent_stack_top(concurrent_stack* stack)
{
void* ret;
pthread_mutex_lock(&stack->mutex);
while (stack->size == 0)
{
pthread_cond_wait(&stack->empty_condition_variable, &stack->mutex);
}
ret = stack->storage[stack->size - 1];
pthread_cond_signal(&stack->full_condition_variable);
pthread_mutex_unlock(&stack->mutex);
return ret;
}
size_t concurrent_stack_size(concurrent_stack* stack)
{
size_t size;
pthread_mutex_lock(&stack->mutex);
size = stack->size;
pthread_mutex_unlock(&stack->mutex);
return size;
}
size_t concurrent_stack_capacity(concurrent_stack* stack)
{
return stack->capacity;
}
void concurrent_stack_free(concurrent_stack* stack)
{
free(stack->storage);
pthread_mutex_destroy(&stack->mutex);
pthread_cond_destroy(&stack->empty_condition_variable);
pthread_cond_destroy(&stack->full_condition_variable);
}
main.c
#include "concurrent_stack.h"
#include <pthread.h>
#include <stdio.h>
typedef struct thread_config
{
size_t element_count;
concurrent_stack* stack;
}
thread_config;
void* producer_code(void* data)
{
thread_config* cfg = (thread_config*) data;
size_t limit = cfg->element_count;
concurrent_stack* stack = cfg->stack;
for (size_t i = 0; i != limit; ++i)
{
concurrent_stack_push(stack, (void*) i);
}
return NULL;
}
void* inspector_code(void* data)
{
thread_config* cfg = (thread_config*) data;
size_t limit = cfg->element_count;
concurrent_stack* stack = cfg->stack;
for (size_t i = 0; i != limit; ++i)
{
concurrent_stack_top(stack);
concurrent_stack_size(stack);
}
return NULL;
}
void* consumer_code(void* data)
{
thread_config* cfg = (thread_config*) data;
size_t limit = cfg->element_count;
concurrent_stack* stack = cfg->stack;
for (size_t i = 0; i != limit; ++i)
{
concurrent_stack_pop(stack);
}
return NULL;
}
static const size_t PRODUCERS = 3;
static const size_t CONSUMERS = 3;
static const size_t INSPECTORS = 1;
static const size_t PRODUCER_ELEMENTS = 91 * 1000;
static const size_t CONSUMER_ELEMENTS = 90 * 1000;
static const size_t INSPECTOR_ELEMENTS = 50 * 1000;
/**
In order to make sure that the program exits, you must guarantee that:
PRODUCER_ELEMENTS * PRODUCERS - CONSUMER_ELEMENTS * CONSUMERS <= STACK_CAPACITY.
Otherwise, after all consumers have done their job, the producers will fill
it again and finally block on it.
*/
static const size_t STACK_CAPACITY = 5000;
int main()
{
concurrent_stack st;
concurrent_stack_init(&st, STACK_CAPACITY);
pthread_t threads[PRODUCERS + CONSUMERS + INSPECTORS];
size_t next_thread_slot = 0;
thread_config producer_thread_config = { PRODUCER_ELEMENTS, &st };
thread_config consumer_thread_config = { CONSUMER_ELEMENTS, &st };
thread_config inspector_thread_config = { INSPECTOR_ELEMENTS, &st };
for (size_t i = 0; i != CONSUMERS; ++i)
{
pthread_create(&threads[next_thread_slot++],
NULL,
consumer_code,
(void*) &consumer_thread_config);
}
for (size_t i = 0; i != INSPECTORS; ++i)
{
pthread_create(&threads[next_thread_slot++],
NULL,
inspector_code,
(void*) &inspector_thread_config);
}
for (size_t i = 0; i != PRODUCERS; ++i)
{
pthread_create(&threads[next_thread_slot++],
NULL,
producer_code,
(void*) &producer_thread_config);
}
/* All threads created. Now join them. */
for (size_t i = 0; i != INSPECTORS + CONSUMERS + PRODUCERS; ++i)
{
pthread_join(threads[i], NULL);
}
size_t expected_stack_size =
PRODUCERS * PRODUCER_ELEMENTS - CONSUMERS * CONSUMER_ELEMENTS;
printf("Expected final stack size: %zu\n", expected_stack_size);
printf("Actual final stack size: %zu\n", concurrent_stack_size(&st));
concurrent_stack_free(&st);
}
Critique request
Please tell me anything that comes to mind, yet especially I am interested in comments relating to:
- code layout,
- correctness of synchronization.
-
1\$\begingroup\$ I'd suggest you remove that part of your requests regarding coding conventions. I't makes your question overly broad and opinion based, since there aren't any general coding conventions for c. \$\endgroup\$πάντα ῥεῖ– πάντα ῥεῖ2017年01月04日 19:12:09 +00:00Commented Jan 4, 2017 at 19:12
-
1\$\begingroup\$ That's absolutely false @πάντα ῥεῖ, OP can specify where exactly he is looking for improvements and what type of improvements he is more interested in. Any specific request are on-topic as well. This makes it easier for reviewers to focus on specific parts of the code instead of pointing out all of the code-style, naming, etc. errors, which doesn't really concern OP all that much. \$\endgroup\$Denis– Denis2017年01月04日 23:41:40 +00:00Commented Jan 4, 2017 at 23:41
-
1\$\begingroup\$ @denis No, that's not false. Coding conventions vary on many points of view, and are contradictorily if you don't follow the whole set of them. I think the OP's edit improved the question a lot! I'll keep to flag/VTC any question about better coding conventions in future. \$\endgroup\$πάντα ῥεῖ– πάντα ῥεῖ2017年01月05日 00:05:24 +00:00Commented Jan 5, 2017 at 0:05
-
\$\begingroup\$ I see, you were talking specifically about the coding conventions part, I do agree on that one, my bad :) \$\endgroup\$Denis– Denis2017年01月05日 00:19:30 +00:00Commented Jan 5, 2017 at 0:19
2 Answers 2
Extraneous line
I quickly scanned your stack code and it looked correct to me. However, I did spot an extraneous line in concurrent_stack_top()
. You have this line that was probably copied from concurrent_stack_pop()
:
pthread_cond_signal(&stack->full_condition_variable);
Since concurrent_stack_top()
doesn't remove anything from the stack, you shouldn't need to signal anything to the producers.
Pop should return top also
I think that pop()
should return the top element, so that the operation is atomic. Otherwise, if you do top()
followed by pop()
, you might get some element from top()
but then pop a different element.
Issues:
Failed to check the return value of all these system calls:
stack->storage = malloc(sizeof(void*) * stack->capacity);
pthread_mutex_init(&stack->mutex, NULL);
pthread_cond_init(&stack->empty_condition_variable, NULL);
pthread_cond_init(&stack->full_condition_variable, NULL);
pthread_mutex_lock(&stack->mutex);
pthread_cond_wait(&stack->full_condition_variable, &stack->mutex);
pthread_cond_signal(&stack->empty_condition_variable);
pthread_mutex_unlock(&stack->mutex);
pthread_mutex_lock(&stack->mutex);
pthread_cond_wait(&stack->empty_condition_variable, &stack->mutex);
pthread_cond_signal(&stack->full_condition_variable);
pthread_mutex_unlock(&stack->mutex);
pthread_mutex_lock(&stack->mutex);
pthread_cond_wait(&stack->empty_condition_variable, &stack->mutex);
pthread_cond_signal(&stack->full_condition_variable);
pthread_mutex_unlock(&stack->mutex);
pthread_mutex_lock(&stack->mutex);
pthread_mutex_unlock(&stack->mutex);
pthread_mutex_destroy(&stack->mutex);
pthread_cond_destroy(&stack->empty_condition_variable);
pthread_cond_destroy(&stack->full_condition_variable);
You may be destroying objects that are currently in use in another thread.
void concurrent_stack_free(concurrent_stack* stack)
{
free(stack->storage);
pthread_mutex_destroy(&stack->mutex);
pthread_cond_destroy(&stack->empty_condition_variable);
pthread_cond_destroy(&stack->full_condition_variable);
}
Before you can start freeing the resources you have to make sure that no other thread is using these resources. This means you have to force a flush and make sure there are no threads in your object.
-
\$\begingroup\$ Thank you for your reply. A couple of questions though: (1) What should I do, if a pthread call fails? (2) How to make sure that
concurrent_stack_free
does the job such that no other threads are using the stack? \$\endgroup\$coderodde– coderodde2017年01月05日 07:35:37 +00:00Commented Jan 5, 2017 at 7:35 -
\$\begingroup\$ What should you do? What can you do? Can your code continue if the pthread call fails? If it can't then an error message and a call to
exit()
seem appropriate. Continuing without doing anything should never be an option. \$\endgroup\$Loki Astari– Loki Astari2017年01月05日 18:29:02 +00:00Commented Jan 5, 2017 at 18:29
Explore related questions
See similar questions with these tags.