This is a matrix-vector multiplication program using multi-threading. It takes matrixfile.txt and vectorfile.txt names, buffer size, and number of splits as input and splits the matrixfile into splitfiles in main function (matrix is divided into smaller parts). Then mapper threads writes the value into buffer and reducer thread writes result into resultfile.txt. Resultfile algorithm is not efficient but the code works which I tested with various inputs.
I appreciate any correction and comment.
Program:
/* -*- linux-c -*- */
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <math.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <time.h>
#include <pthread.h>
#include <errno.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <sys/mman.h>
#include <semaphore.h>
#include "common.h"
#include "stdint.h"
const int ROWS = 10000;
const int COLS = 3;
int twodimen[10000][3];
int count_lines;
int vector[10000];
int vector_lines;
int NUMBER_OF_ROWS;
int splitnum;
int INPUT_BUF_SIZE;
sem_t *sem_mutex; /* protects the buffer */
sem_t *sem_full; /* counts the number of items */
sem_t *sem_empty; /* counts the number of empty buffer slots */
void * mapperThread(void * xx){
int filecount = (intptr_t)xx;
char filename[20] = "splitfile";
char txt[5] = ".txt";
char num[10];
sprintf(num, "%d", filecount);
strcat(filename, num);
strcat(filename, txt);
printf ("mapper thread started with: %s \n", filename);
struct buffer * bp = find(filecount);
// OPENING SPLIT FILE
FILE *splitfileptr;
char *sline = NULL;
size_t slen = 0;
ssize_t sread;
splitfileptr = fopen(filename, "r");
if (splitfileptr == NULL){
exit(EXIT_FAILURE);
}
while ((sread = getline(&sline, &slen, splitfileptr)) != -1) {
char *line_copy = strdup(sline);
if (SYNCHRONIZED) {
sem_wait(sem_empty);
sem_wait(sem_mutex);
// CRITICAL SECTION BEGIN
bp->buf[bp->in] = line_copy;
bp->count = bp->count + 1;
bp->in = (bp->in + 1) % INPUT_BUF_SIZE; // incrementing buffer count, updating
// CRITICAL SECTION END
sem_post(sem_mutex); // releasing the mutex
sem_post(sem_full); // incrementing full count, sem_post is signal operation
}
}
printf("producer ended; bye...\n");
pthread_exit(0);
}
void * reducerThread(char* resultfilename){
printf("reducer thread started\n");
FILE *resultfileptr;
char *line = NULL;
size_t len = 0;
ssize_t read;
char* item;
int index = 0;
while (index < count_lines) {
for(int i = 0; i < splitnum; i++){
struct buffer * bp = find(i);
if (SYNCHRONIZED && bp->count != 0) {
sem_wait(sem_full); // checks whether buffer has item to retrieve, if full count = 0, this statement will cause consumer to wait
sem_wait(sem_mutex); // makes sure when we are executing this section no other process executes at the buffer
// CRITICAL SECTION BEGIN
item = bp->buf[bp->out]; // just retrieving the buffer. putting into item.
bp->count = bp->count - 1;
bp->out = (bp->out + 1) % INPUT_BUF_SIZE; // updating out index variable, this is a circular bufer
index++;
printf("retrieved item is: %s", item);
twodimen[atoi(&item[0]) - 1][0] = atoi(&item[0]);
twodimen[atoi(&item[0]) - 1][2] = twodimen[atoi(&item[0]) - 1 ][2] + atoi(&item[4]) * vector[atoi(&item[2]) - 1];
// CRITICAL SECTION END
sem_post(sem_mutex); //
sem_post(sem_empty); // number of empty cells in the buffer should be 1 more. incrementing empty size.
}
}
}
// WRITING TO RESULTFILE
resultfileptr = fopen(resultfilename, "w+");
for(int i = 0; i < NUMBER_OF_ROWS; i++){
for(int j = 0; j < COLS; j++){
if(twodimen[i][j] != 0 && twodimen[i][j + 2] != 0){
char str[10];
sprintf(str, "%d %d \n", twodimen[i][j], twodimen[i][j + 2]);
fprintf(resultfileptr, "%s", str);
}
}
}
printf("consumer ended; bye...\n");
fflush (stdout);
pthread_exit(NULL);
}
int main(int argc, char**argv)
{
clock_t start_time = clock();
const char *const matrixfilename = argv[1];
const char *const vectorfilename = argv[2];
const char *const resultfilename = argv[3];
const int K = atoi(argv[4]);
INPUT_BUF_SIZE = atoi(argv[5]);
splitnum = K;
printf ("mv started\n");
printf ("%s\n", matrixfilename);
printf ("%s\n", vectorfilename);
printf ("%s\n", resultfilename);
printf ("K is %d\n", K);
printf ("splitnum is %d\n", splitnum);
printf ("INPUT_BUF_SIZE is %d\n", INPUT_BUF_SIZE);
if(INPUT_BUF_SIZE > BUFSIZE || INPUT_BUF_SIZE < 100){
printf("Buffer input should be between 100 and 10000, BUFSIZE = 10000 will be used as default \n");
INPUT_BUF_SIZE = BUFSIZE;
}
FILE *fileptr;
count_lines = 0;
char filechar[10000], chr;
fileptr = fopen(matrixfilename, "r");
// extract character from file and store it in chr
chr = getc(fileptr);
while(chr != EOF)
{
// count whenever new line is encountered
if(chr == '\n')
{
count_lines = count_lines + 1;
}
// take next character from file
chr = getc(fileptr);
}
printf("countlines is %d \n", count_lines);
fclose(fileptr); // close file
printf("There are %d lines in in a file\n", count_lines);
int s = count_lines / K;
int remainder = count_lines % K;
printf("S is %d \n", s);
FILE *fw, *fr;
char *line = NULL;
size_t len = 0;
ssize_t read;
// CREATING SPLIT FILES AND WRITING TO THEM
for(int i = 0; i < K; i++){
char filename[20] = "splitfile";
char txt[5] = ".txt";
char its[10];
sprintf(its, "%d", i);
strcat(filename, its);
strcat(filename, txt);
fw = fopen(filename, "w+");
fr = fopen(matrixfilename, "r");
if(i == K - 1){
for(int j = 0; j < count_lines; j++){
while(((read = getline(&line, &len, fr)) != -1) && j >= (i * s)){
char *line_copy = strdup(line);
fprintf(fw, "%s", line_copy);
j++;
}
}
}
else{
for(int j = 0; j < count_lines; j++){
while(((read = getline(&line, &len, fr)) != -1) && j >= (i * s) && j <= (i + 1) * s - 1){
char *line_copy = strdup(line);
fprintf(fw, "%s", line_copy);
j++;
}
}
}
fclose(fw);
fclose(fr);
}
FILE *vectorfileptr;
vector_lines = 0;
char vchr;
vectorfileptr = fopen(vectorfilename, "r");
vchr = getc(vectorfileptr);
line = NULL;
len = 0;
// COUNTING THE SIZE OF VECTOR
while(vchr != EOF)
{
// count whenever new line is encountered
if(vchr == '\n')
{
vector_lines = vector_lines + 1;
}
// take next character from file
vchr = getc(vectorfileptr);
}
fclose(vectorfileptr);
printf("There are %d lines in vector file\n", vector_lines);
vector[vector_lines];
vectorfileptr = fopen(vectorfilename, "r");
if (vectorfileptr == NULL)
exit(EXIT_FAILURE);
int linenumber = 0;
while ((read = getline(&line, &len, vectorfileptr)) != -1) {
char *line_copy = strdup(line);
vector[linenumber] = atoi(line_copy);
linenumber++;
}
fclose(vectorfileptr);
for(int i = 0; i < vector_lines; i++){
printf("vector %d: %d\n", i, vector[i]);
}
FILE *countfileptr;
countfileptr = fopen(matrixfilename, "r");
NUMBER_OF_ROWS = 0;
while ((read = getline(&line, &len, countfileptr)) != -1) {
char *line_copy = strdup(line);
if(atoi(&line_copy[0]) > NUMBER_OF_ROWS){
NUMBER_OF_ROWS = atoi(&line_copy[0]);
}
}
fclose(countfileptr);
/* first clean up semaphores with same names */
sem_unlink (SEMNAME_MUTEX);
sem_unlink (SEMNAME_FULL);
sem_unlink (SEMNAME_EMPTY);
/* create and initialize the semaphores */
sem_mutex = sem_open(SEMNAME_MUTEX, O_RDWR | O_CREAT, 0660, 1);
if (sem_mutex < 0) {
perror("can not create semaphore\n");
exit (1);
}
printf("sem %s created\n", SEMNAME_MUTEX);
sem_full = sem_open(SEMNAME_FULL, O_RDWR | O_CREAT, 0660, 0);
if (sem_full < 0) {
perror("can not create semaphore\n");
exit (1);
}
printf("sem %s created\n", SEMNAME_FULL);
sem_empty = sem_open(SEMNAME_EMPTY, O_RDWR | O_CREAT, 0660, BUFSIZE); // initially bufsize items can be put
if (sem_empty < 0) {
perror("can not create semaphore\n");
exit (1);
}
printf("sem %s create\n", SEMNAME_EMPTY);
for(int i = 0; i < splitnum; i++){
insertFirst(0,0,0,i);
}
int err;
pthread_t tid[splitnum];
printf ("starting thread\n");
for(int i = 0; i < splitnum; i++){
err = pthread_create(&tid[i], NULL, (void*) mapperThread, (void*)(intptr_t)i);
if(err != 0){
printf("\n Cant create thread: [%s]", strerror(err));
}
}
pthread_t reducertid;
pthread_create(&reducertid, NULL, (void*) reducerThread, (char*) resultfilename);
for(int i = 0; i < splitnum; i++){
pthread_join(tid[i],NULL);
}
pthread_join(reducertid,NULL);
// join reducer thread
// closing semaphores
sem_close(sem_mutex);
sem_close(sem_full);
sem_close(sem_empty);
/* remove the semaphores */
sem_unlink(SEMNAME_MUTEX);
sem_unlink(SEMNAME_FULL);
sem_unlink(SEMNAME_EMPTY);
fflush( stdout );
exit(0);
}
HEADER file:
/* -*- linux-c -*- */
#ifndef COMMON_H
#define COMMON_H
#define TRACE 1
#define SEMNAME_MUTEX "/name_sem_mutex"
#define SEMNAME_FULL "/name_sem_fullcount"
#define SEMNAME_EMPTY "/name_sem_emptycount"
#define ENDOFDATA -1 // marks the end of data stream from the producer
// #define SHM_NAME "/name_shm_sharedsegment1"
#define BUFSIZE 10000 /* bounded buffer size */
#define MAX_STRING_SIZE
// #define NUM_ITEMS 10000 /* total items to produce */
/* set to 1 to synchronize;
otherwise set to 0 and see race condition */
#define SYNCHRONIZED 1 // You can play with this and see race
struct buffer{
struct buffer *next;
char * buf[BUFSIZE]; // string array
int count; /* current number of items in buffer */
int in; // this field is only accessed by the producer
int out; // this field is only accessed by the consumer
int source; // index of the producer
};
struct buffer *head = NULL;
struct buffer *current = NULL;
void printList(){
struct buffer *ptr = head;
while(ptr != NULL){
printf("items of buffer %d: \n", ptr->source);
printf("buffer count is : %d \n", ptr->count);
printf("buffer in is : %d \n", ptr->in);
printf("buffer out is : %d \n", ptr->out);
for(int i = 0; i < ptr->count; i++){
printf("%s", ptr->buf[i]);
}
ptr = ptr->next;
}
}
void insertFirst(int count, int in, int out, int source){
struct buffer *link = (struct buffer*) malloc(sizeof(struct buffer));
for(int i = 0; i < BUFSIZE; i++){
link->buf[i] = "";
}
link->count = count;
link->in = in;
link->out = out;
link->source = source;
link->next = head;
head = link;
}
struct buffer* find(int source){
struct buffer* current = head;
if(head == NULL){
return NULL;
}
while(current->source != source){
if(current->next == NULL){
return NULL;
}
else{
current = current->next;
}
}
return current;
}
#endif
1 Answer 1
A few semi-random observations.
You define functions in your header file, so if you include that in multiple source files in one project you'll get multiple definition errors from the linker. (Also, those functions are using standard library functions without including the required header files.)
main
makes assumptions about the number of parameters passed to the program. If you don't pass enough, you'll dereference a NULL or out-of-bounds pointer (e.g., an invalid value for argv[5]
). You should verify that you have enough parameters (by checking argc
) before attempting to access any of the parameters.
Rather than the verbose count_lines = count_lines + 1;
, you can just use ++count_lines;
.
Your code for building the split filename is nearly identical in the two places you use it. You can put it in a function to avoid the duplication, and simplify it by using sprintf
to build the entire filename rather than using sprintf
and strcat
.
sprintf(buf, "splitfile%d.txt", n);
where buf
and n
are passed as parameters to the function. buf
should be long enough to hold any value for n
, 9 +たす 4 +たす 1 +たす 11 =わ 25 characters, assuming n
is no larger than 32 bits. (That's 9 bytes for the base filename, 4 for the extension, 1 for the terminating nul, and 11 for a signed 32 bit integer printed as a decimal.)
You don't verify that fw
and fr
(and some of your other file handles) have successfully been opened before making use of them.
Most of your strdup
calls will leak, and are not necessary.
At one point in main
you call atoi(&line_copy[0])
twice - one inside an if
, and once in the following statement. This should be called once, stored in a local variable:
int nr = atoi(line_copy);
if (nr > NUMBER_OF_ROWS)
NUMBER_OF_ROWS = nr;
reducerThread
will be an infinite loop if SYNCHRONIZED
is 0.