I am writing a program in an environment that makes use of cgroups to identify and group processes together. I want to parse the CPU utilization of each cgroup by sampling /sys/fs/cgroup/cpuacct/.../cpuacct.stat
over a 1 second interval a few times and then averaging them. It also calculates the overall CPU utilization of the system by sampling /proc/stat
in the same manner.
I want to do this in a multithreaded way because if I sampled the cgroups one after another, the one-second sampling intervals would add up and the program would take a long time to run.
I also use PCRE to distinguish between two different process types.
#include <sys/types.h>
#include <sys/wait.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <semaphore.h>
#include <pthread.h>
#include <sys/uio.h>
#include <pcre.h>
/* Number of sampling rounds whose deltas are averaged per measurement. */
#define ITERATIONS 3

/*
 * CLK_TIME_PER_SECOND is taken from the first source that is available:
 * an existing definition, sysconf(_SC_CLK_TCK), CLK_TCK, CLOCKS_PER_SEC,
 * and finally the constant 100.
 */
#ifndef CLK_TIME_PER_SECOND
# if defined(_SC_CLK_TCK)
#  define CLK_TIME_PER_SECOND ((int) sysconf (_SC_CLK_TCK))
# elif defined(CLK_TCK)
#  define CLK_TIME_PER_SECOND ((int) CLK_TCK)
# elif defined(CLOCKS_PER_SEC)
#  define CLK_TIME_PER_SECOND ((int) CLOCKS_PER_SEC)
# else
#  define CLK_TIME_PER_SECOND 100
# endif
#endif
/*
 * One job parsed from a line of the job-listing command's output.
 * procid is later used as the cgroup directory name under
 * /sys/fs/cgroup/cpuacct/, and procType is matched against the compiled
 * regex to decide which total the job's CPU usage is added to.
 */
struct job_struct {
char procid[50]; //store job information as procid,procType tuple
char procType[50];
};
/*
 * Argument bundle handed to each sampling thread. The pointers refer to the
 * strings stored in the jobs array; the struct itself is heap-allocated by
 * main() and freed by the thread (addSampletoTotal).
 */
typedef struct {
char *procid; //for passing to the threads
char *procType;
} sample_struct;
/*
 * Shared program state.
 * sumFlag: predicate checked under total_lock before a thread adds its sample
 *          (main() sets it to 1 before any sampling thread starts).
 * clk:     value of CLK_TIME_PER_SECOND, used as the divisor when turning
 *          tick deltas into utilization.
 */
int sumFlag, clk; //TODO: localize?
/*
 * sum1: total for jobs whose procType matches the regex; sum2: total for all
 * other jobs. systemUtilization is written by the system-utilization thread;
 * otherCPU is what remains of systemUtilization after sum1 and sum2.
 */
long double sum1, sum2, systemUtilization, otherCPU;
/* PCRE state: pattern text, last error string/offset, last pcre_exec result. */
char *aStrRegex;
const char *pcreErrorStr;
int pcreErrorOffset, pcreExecRet;
/* Output vector passed to pcre_exec(); shared, so only used under total_lock. */
int subStrVec[30];
pcre *reCompiled;
pcre_extra *pcreExtra;
/* total_lock guards sum1/sum2 and the shared PCRE scratch variables. */
pthread_mutex_t total_lock = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t ok_to_add = PTHREAD_COND_INITIALIZER;
/* Samples one cgroup and adds its utilization to sum1 or sum2. */
int sampleCGroup(char procid[], char procType[]);
/* Thread entry points. NOTE(review): declared without a parameter list here;
 * pthread_create() expects void *(*)(void *) - confirm the definitions match. */
void *addSampletoTotal();
void *overallUtilization();
/* Whole-system utilization sampled from /proc/stat. */
long double getSystemUtilization(void);
/* Major OS version parsed from /etc/SuSE-release, or -1. */
int getOSVersion(void);
/* Number of CPUs, at least 1. */
int getCPUCount(void);
int main(void) {
int cpus, i, jobCount, osVersion;
FILE *fp;
char buff[1024];
cpus = getCPUCount();
clk = CLK_TIME_PER_SECOND;
osVersion = getOSVersion();
if (osVersion <= 10) {
systemUtilization = getSystemUtilization();
printf("sys=%Lf\n", systemUtilization); //cgroups are not supported on certain OS versions, so print the overall utilization and exit, otherwise, try to run
pthread_mutex_destroy(&total_lock); //clean up mutex lock
pthread_cond_destroy(&ok_to_add);
return (0);
}
struct job_struct jobs[cpus]; //initially size the array to be the number of CPUs
sumFlag = 1; //condition variable for mutex
pthread_t systr; //thread id for overall system utilization, note this should only execute on certain versions, the conditional above should have returned if not
pthread_create(&systr, NULL, overallUtilization, NULL); //create the thread
aStrRegex = "proc_type1"; //type1
reCompiled = pcre_compile(aStrRegex, 0, &pcreErrorStr, &pcreErrorOffset, NULL); //compile the regex
if (reCompiled == NULL) {
printf("ERROR: Could not compile '%s': %s\n", aStrRegex, pcreErrorStr);
pthread_join(systr, NULL); //join the system utilization thread
if (systemUtilization > (long double) cpus) { //sanity check for HT
systemUtilization = (long double) cpus;
}
printf("sys=%Lf\n", systemUtilization); //print info
printf("cgrType2=%f\n", 0.0);
printf("cgrType1=%f\n", 0.0);
printf("cgrOther=%Lf\n", systemUtilization);
pthread_mutex_destroy(&total_lock); //clean up mutex lock
pthread_cond_destroy(&ok_to_add);
return (-1);
}
// Optimize the regex
pcreExtra = pcre_study(reCompiled, 0, &pcreErrorStr);
/* pcre_study() returns NULL for both errors and when it can not optimize the regex. The last argument is how one checks for
errors (it is NULL if everything works, and points to an error string otherwise. */
if (pcreErrorStr != NULL) {
printf("ERROR: Could not study '%s': %s\n", aStrRegex, pcreErrorStr);
pthread_join(systr, NULL); //join the system utilization thread
if (systemUtilization > (long double) cpus) { //sanity check for HT
systemUtilization = (long double) cpus;
}
printf("sys=%Lf\n", systemUtilization); //print info
printf("cgrType2=%f\n", 0.0);
printf("cgrType1=%f\n", 0.0);
printf("cgrOther=%Lf\n", systemUtilization);
pthread_mutex_destroy(&total_lock); //clean up mutex lock
pthread_cond_destroy(&ok_to_add);
return (-1);
} /* end if */
fp = popen("/var/bin/getProcs", "r"); //open getProcs to list the procs, this returns the cgroup ID and the type
if (fp == NULL) {
perror("/var/bin/getProcs");
pthread_join(systr, NULL); //join the system utilization thread
if (systemUtilization > (long double) cpus) { //sanity check for HT
systemUtilization = (long double) cpus;
}
printf("sys=%Lf\n", systemUtilization); //print info
printf("cgrType2=%f\n", 0.0);
printf("cgrType1=%f\n", 0.0);
printf("cgrOther=%Lf\n", systemUtilization);
pthread_mutex_destroy(&total_lock); //clean up mutex lock
pthread_cond_destroy(&ok_to_add);
return (-1);
}
jobCount = 0;
while (fgets(buff, sizeof(buff), fp) != NULL) {
sscanf(buff, "%[^,],%[^,]", jobs[jobCount].procid, jobs[jobCount].procType);
jobCount++;
}
if (jobCount == 0) {
pthread_join(systr, NULL); //join the system utilization thread
if (systemUtilization > (long double) cpus) {
systemUtilization = (long double) cpus;
}
printf("sys=%Lf\n", systemUtilization); //print info
printf("cgrType2=%f\n", 0.0);
printf("cgrType1=%f\n", 0.0);
printf("cgrOther=%Lf\n", systemUtilization);
pclose(fp); //close the command
pthread_mutex_destroy(&total_lock); //destroy mutex lock
pthread_cond_destroy(&ok_to_add);
return (0);
}
pthread_t tid[jobCount]; // the thread identifiers, one for each job
for (i = 0; i < jobCount; i++) {
sample_struct *args = malloc(sizeof *args); //allocate memory for passing the argument struct to the threads
args->procid = jobs[i].procid; //assign info in job array to the arg struct
args->procType = jobs[i].procType;
pthread_create(&tid[i], NULL, addSampletoTotal, args); //create the thread
}
pthread_join(systr, NULL); //join the system utilization thread
for (i = 0; i < jobCount; i++) {
pthread_join(tid[i], NULL); //finish threads
}
pthread_mutex_destroy(&total_lock); //destroy mutex lock
pthread_cond_destroy(&ok_to_add);
if (systemUtilization > (long double) cpus) { //sanity check for HT
systemUtilization = (long double) cpus;
}
if (sum2 > (long double) cpus) {
sum2 = (long double) cpus;
}
if (sum1 > (long double) cpus) {
sum1 = (long double) cpus;
}
otherCPU = systemUtilization - sum2 - sum1;
if (otherCPU < 0.0) {
otherCPU = 0.0;
}
printf("sys=%Lf\n", systemUtilization); //print info
printf("cgrType2=%Lf\n", sum2);
printf("cgrType1=%Lf\n", sum1);
printf("cgrOther=%Lf\n", otherCPU);
pclose(fp); //close the command
return (0);
}
/*
 * Read the two counters of a cpuacct.stat file and store their sum in *ticks.
 * `which` ("first"/"second") only labels the diagnostic. The file is closed
 * on every path. Returns 0 on success, -1 on failure.
 */
static int readCpuacctTicks(const char *path, const char *which, long double *ticks) {
    FILE *fp = fopen(path, "r");
    if (fp == NULL) {
        perror(path);
        return -1;
    }

    long double user, sys;
    int parsed = fscanf(fp, "%*4s %Lf %*6s %Lf", &user, &sys);
    fclose(fp);

    if (parsed != 2) {
        /* A parse failure does not set errno, so perror() would mislead. */
        fprintf(stderr, "Could not parse %s cgroup CPU sample!\n", which);
        return -1;
    }
    *ticks = user + sys;
    return 0;
}

/*
 * Sample the CPU usage of one cgroup and add it to the matching total.
 *
 * procid   - cgroup directory name under /sys/fs/cgroup/cpuacct/
 * procType - type string; a regex match adds to sum1, anything else to sum2
 *
 * Takes ITERATIONS pairs of readings one second apart, averages the tick
 * deltas, divides by clk and adds the result under total_lock.
 * Returns 0 on success, -1 if the path cannot be built or a sample fails
 * (in which case nothing is added).
 */
int sampleCGroup(char procid[], char procType[]) {
    static const char prefix[] = "/sys/fs/cgroup/cpuacct/";
    static const char suffix[] = "/cpuacct.stat";

    /* Size the buffer from the very strings that are written into it. */
    size_t pathSize = strlen(prefix) + strlen(procid) + strlen(suffix) + 1;
    char *pidPath = malloc(pathSize);
    if (pidPath == NULL) {
        perror("Could not allocate memory for cgroup path");
        return (-1);
    }
    snprintf(pidPath, pathSize, "%s%s%s", prefix, procid, suffix);

    long double loadavg = 0.0;
    for (int i = 0; i < ITERATIONS; i++) {
        long double before, after;

        if (readCpuacctTicks(pidPath, "first", &before) != 0) {
            free(pidPath);
            return (-1);
        }
        sleep(1); /* let 1s go by */
        if (readCpuacctTicks(pidPath, "second", &after) != 0) {
            free(pidPath);
            return (-1);
        }
        loadavg += after - before;
    }
    free(pidPath);

    /* Tick delta divided by (ticks per second * number of rounds). */
    loadavg = (loadavg / (clk * ITERATIONS));

    /* CRITICAL SECTION BEGIN: totals and PCRE scratch globals are shared. */
    pthread_mutex_lock(&total_lock);
    while (sumFlag == 0) {
        pthread_cond_wait(&ok_to_add, &total_lock);
    }
    pcreExecRet = pcre_exec(reCompiled, pcreExtra, procType,
                            strlen(procType), /* length of subject */
                            0,                /* start offset */
                            0,                /* options */
                            subStrVec, 30);   /* output vector and its size */
    if (pcreExecRet > 0) {
        sum1 += loadavg;
    } else {
        sum2 += loadavg; /* no match, or assume type2 if matching failed */
        if (pcreExecRet != PCRE_ERROR_NOMATCH) {
            fprintf(stderr, "Could not determine type (pcre_exec returned %d)\n",
                    pcreExecRet);
        }
    }
    sumFlag = 1;
    pthread_cond_signal(&ok_to_add);
    pthread_mutex_unlock(&total_lock);
    /* CRITICAL SECTION END */

    return 0;
}
/*
 * Thread entry point for one job: unpacks the heap-allocated sample_struct,
 * samples that job's cgroup, releases the struct and ends the thread.
 * The result of sampleCGroup() is not propagated.
 */
void *addSampletoTotal(void *args) {
    sample_struct *job = args;
    char *id = job->procid;
    char *type = job->procType;

    sampleCGroup(id, type);

    free(job); /* allocated by the creator of this thread */
    pthread_exit(0);
}
/*
 * Thread entry point: measures whole-system CPU utilization and stores it in
 * the global systemUtilization, then ends the thread.
 *
 * The parameter exists only so the function has the void *(*)(void *) type
 * that pthread_create() calls it through; it is not used.
 */
void *overallUtilization(void *unused) {
    (void) unused;
    systemUtilization = getSystemUtilization();
    pthread_exit(0);
}
/*
 * Read the first three numeric columns (user, nice, system) of the first
 * line of /proc/stat and store their sum in *ticks. `which` only labels the
 * diagnostic. The file is closed on every path. Returns 0 or -1.
 */
static int readSystemTicks(const char *which, long double *ticks) {
    FILE *fp = fopen("/proc/stat", "r");
    if (fp == NULL) {
        perror("/proc/stat");
        return -1;
    }

    long double user, niced, sys;
    int parsed = fscanf(fp, "%*s %Lf %Lf %Lf", &user, &niced, &sys);
    fclose(fp);

    if (parsed != 3) {
        /* A parse failure does not set errno, so perror() would mislead. */
        fprintf(stderr, "Could not parse %s system CPU sample!\n", which);
        return -1;
    }
    *ticks = user + niced + sys;
    return 0;
}

/*
 * Measure overall CPU utilization from /proc/stat.
 *
 * /proc/stat columns used: 1st = user, 2nd = nice, 3rd = system.
 * Takes ITERATIONS pairs of readings one second apart, sums the deltas and
 * divides by (clk * ITERATIONS).
 *
 * Returns the utilization, or -1 if /proc/stat cannot be read or parsed.
 */
long double getSystemUtilization(void) {
    long double loadavg = 0.0;

    for (int i = 0; i < ITERATIONS; i++) {
        long double before, after;

        if (readSystemTicks("first", &before) != 0) {
            return (-1);
        }
        sleep(1); /* let 1s go by */
        if (readSystemTicks("second", &after) != 0) {
            return (-1);
        }
        loadavg += after - before;
    }

    return loadavg / (clk * ITERATIONS);
}
/*
 * Parse the major OS version from /etc/SuSE-release.
 *
 * The version number in the fifth word of the file must equal the number
 * after "VERSION ="; a PATCHLEVEL line must also be present.
 *
 * Returns the version, or -1 if the file is missing, cannot be parsed, or
 * the two version numbers disagree. The file is closed on every path.
 */
int getOSVersion(void) {
    FILE *os = fopen("/etc/SuSE-release", "r");
    if (os == NULL) {
        perror("/etc/SuSE-release");
        return (-1);
    }

    int version = 0, checkVersion = 0, patchLevel = 0;
    int parsed = fscanf(os, "%*s %*s %*s %*s %d %*s VERSION = %d PATCHLEVEL = %d",
                        &version, &checkVersion, &patchLevel);
    fclose(os);
    (void) patchLevel; /* parsed only to validate the file layout */

    if (parsed != 3) {
        /* A format mismatch does not set errno, so perror() would mislead. */
        fprintf(stderr, "Unable to read SuSE release\n");
        return -1;
    }

    return (version == checkVersion) ? version : -1;
}
/*
 * Return the number of CPUs, never less than 1.
 *
 * Uses sysconf(_SC_NPROCESSORS_ONLN) when that name is defined; otherwise,
 * if HAVE_SCHED_GETAFFINITY_LIKE_GLIBC is set, counts the CPUs in this
 * process's affinity mask. Falls back to 1 when neither yields a positive
 * count.
 */
int getCPUCount(void) {
#if defined _SC_NPROCESSORS_ONLN
    const long int online = sysconf (_SC_NPROCESSORS_ONLN);
    if (online > 0) {
        return online;
    }
#elif HAVE_SCHED_GETAFFINITY_LIKE_GLIBC /* glibc >= 2.3.4 */
    cpu_set_t affinity;
    if (sched_getaffinity (0, sizeof (affinity), &affinity) == 0) {
# ifdef CPU_COUNT
        /* glibc >= 2.6 has the CPU_COUNT macro. */
        unsigned long usable = CPU_COUNT (&affinity);
# else
        unsigned long usable = 0;
        for (size_t bit = 0; bit < CPU_SETSIZE; bit++) {
            if (CPU_ISSET (bit, &affinity)) {
                usable++;
            }
        }
# endif
        if (usable > 0) {
            return usable;
        }
    }
#endif
    return 1;
}
In particular I want to be sure this is safe multithreading.
1 Answer 1
Global Variables
I see this comment just prior to the declarations for the global variables:
//TODO: localize?
My answer would be yes. It is very difficult to read, write, debug and maintain programs that use global variables. Global variables can be modified by any function within the program and therefore require each function to be examined before making changes in the code. In C and C++ global variables impact the namespace and they can cause linking errors if they are defined in multiple files. The answers in this stackoverflow question provide a fuller explanation.
Apply Best Practices Consistently
In the function sampleCGroup()
the result from malloc()
is tested before it is used, however, in main()
the result from malloc()
is not tested before it is used.
Return From main()
Rather than hard-coding the values returned from main(), it would be better to use the standard symbolic constants EXIT_SUCCESS and EXIT_FAILURE. There are two benefits to this: it makes the code more self-documenting, and it makes the code more portable (which may not be necessary in this case). These symbolic constants are always defined in stdlib.h.
DRY Code
There is a programming principle called the Don't Repeat Yourself principle, and code that follows it is sometimes referred to as DRY code. If you find yourself repeating the same code multiple times, it is better to encapsulate it in a function. Where it is possible to loop over the repeated code, that can reduce repetition as well.
The following code is repeated many times in main(); it would be better to encapsulate it in a function:
pthread_join(systr, NULL); //join the system utilization thread
if (systemUtilization > (long double)cpus) { //sanity check for HT
systemUtilization = (long double)cpus;
}
printf("sys=%Lf\n", systemUtilization); //print info
printf("cgrType2=%f\n", 0.0);
printf("cgrType1=%f\n", 0.0);
printf("cgrOther=%Lf\n", systemUtilization);
pthread_mutex_destroy(&total_lock); //clean up mutex lock
pthread_cond_destroy(&ok_to_add);
Complexity
There are two functions in the code, main()
and sampleCGroup()
that are too complex (do too much). The code in these 2 functions should be broken into smaller functions. Functions that are too large or too complex are very difficult to understand, this makes it much harder to write, read, debug or maintain them. A general rule of thumb is that any function that is larger than a single screen in an editor or IDE is too large, a second indication of complexity is the level of indentation in the function.
In both main()
and sampleCGroup()
it is important to note that part of what is making these functions larger is code repetition as discussed in DRY CODE.
In the case of sampleCGroup(), some obvious functions are:
/*
 * Build the path of a cgroup's cpuacct.stat file:
 *     /sys/fs/cgroup/cpuacct/<procid>/cpuacct.stat
 *
 * procid   - cgroup directory name
 * procType - unused; kept so existing callers keep compiling
 *
 * Returns a malloc()ed string the caller must free(), or NULL if the
 * allocation fails.
 */
char* make_pid_path(char procid[], char procType[])
{
    static const char pathHead[] = "/sys/fs/cgroup/cpuacct/";
    static const char pathTail[] = "/cpuacct.stat";

    (void) procType;

    /* The same constants size the buffer and fill it, so they cannot drift. */
    const size_t size = strlen(pathHead) + strlen(procid) + strlen(pathTail) + 1; /* +1 for the terminator */
    char *pidPath = malloc(size);
    if (pidPath == NULL) {
        perror("Could not allocate memory for cgroup path");
        return NULL;
    }

    snprintf(pidPath, size, "%s%s%s", pathHead, procid, pathTail);
    return pidPath;
}
/*
 * Read the two counters from a cpuacct.stat-style file
 * ("<word> <number> <word> <number>").
 *
 * pidPath - file to read
 * pass    - label ("first"/"second") used only in the diagnostic
 * result  - receives the two numbers in file order
 *
 * Returns true on success; false if the file cannot be opened or parsed.
 * The file is closed on every path.
 */
bool parse_cGroup_CPU_sample(char* pidPath, const char* pass, long double result[2])
{
    FILE *fp = fopen(pidPath, "r");
    if (fp == NULL) {
        perror(pidPath);
        return false; /* -1 here would convert to true and hide the failure */
    }

    const int parsed = fscanf(fp, "%*4s %Lf %*6s %Lf", &result[0], &result[1]);
    fclose(fp);

    if (parsed != 2) {
        /* A parse failure does not set errno, so perror() would mislead. */
        fprintf(stderr, "Could not parse %s cgroup CPU sample!\n", pass);
        return false;
    }
    return true;
}
/*
 * Average CPU usage of the cgroup whose cpuacct.stat file is pidPath.
 *
 * Runs ITERATIONS rounds; each round reads the counters, sleeps one second
 * and reads them again, accumulating the delta. The sum is divided by
 * (clk * ITERATIONS).
 *
 * Returns the average, or -1 as soon as any sample cannot be read.
 */
long double getLoadAverage(char* pidPath)
{
    long double before[2], after[2];
    long double tickSum = 0.0;
    int round = 0;

    while (round < ITERATIONS) {
        if (!parse_cGroup_CPU_sample(pidPath, "first", before)) {
            return -1;
        }
        sleep(1);
        if (!parse_cGroup_CPU_sample(pidPath, "second", after)) {
            return -1;
        }
        tickSum += ((after[0] + after[1]) - (before[0] + before[1])); /* delta */
        round++;
    }

    return tickSum / (clk * ITERATIONS);
}
/*
 * Sample one cgroup and add its average CPU usage to the matching total.
 *
 * Builds the stat-file path for procid, measures the load average, then -
 * holding total_lock - runs the compiled regex against procType: a match
 * adds to sum1, anything else to sum2.
 *
 * Returns 0 on success, -1 if the path could not be built or the
 * measurement failed (nothing is added in that case).
 */
int sampleCGroup(char procid[], char procType[]) {
    char *statPath = make_pid_path(procid, procType);
    if (!statPath) {
        return -1;
    }

    const long double usage = getLoadAverage(statPath);
    free(statPath); /* the path is no longer needed once sampling is done */
    if (usage < 0) {
        return -1;
    }

    /* CRITICAL SECTION BEGIN */
    pthread_mutex_lock(&total_lock);
    while (sumFlag == 0) {
        pthread_cond_wait(&ok_to_add, &total_lock);
    }

    pcreExecRet = pcre_exec(reCompiled, pcreExtra, procType,
                            strlen(procType), /* subject length */
                            0,                /* start offset */
                            0,                /* options */
                            subStrVec, 30);   /* output vector and its size */

    if (pcreExecRet > 0) {
        sum1 += usage;
    } else {
        /* No match, or a matching error: both count as type2. */
        sum2 += usage;
        if (pcreExecRet != PCRE_ERROR_NOMATCH) {
            perror("Could not determine type!\n");
        }
    }

    sumFlag = 1;
    pthread_cond_signal(&ok_to_add);
    pthread_mutex_unlock(&total_lock);
    /* CRITICAL SECTION END */

    return 0;
}
As programs grow in size the use of main()
should be limited to calling functions that parse the command line, calling functions that set up for processing, calling functions that execute the desired function of the program, and calling functions to clean up after the main portion of the program.
Example of a simplified main()
/*
 * Wait for the system-utilization thread, print the report used when no
 * per-cgroup figures exist (both cgroup types 0, all usage "other"), and
 * destroy the shared mutex and condition variable.
 *
 * cpus - CPU count; systemUtilization is capped at this value.
 *
 * NOTE(review): `systr` is not declared in this function; it has to be
 * visible at file scope (or be passed in) for this to compile - confirm.
 */
void report_utilization_and_clean_up_mutex(int cpus)
{
    const long double cap = (long double)cpus;

    pthread_join(systr, NULL);

    /* sanity check for HT: never report more than the CPU count */
    systemUtilization = (systemUtilization > cap) ? cap : systemUtilization;

    printf("sys=%Lf\n", systemUtilization);
    printf("cgrType2=%f\n", 0.0);
    printf("cgrType1=%f\n", 0.0);
    printf("cgrOther=%Lf\n", systemUtilization);

    pthread_mutex_destroy(&total_lock);
    pthread_cond_destroy(&ok_to_add);
}
/*
 * If the OS version is 10 or lower (which includes the -1 error value), print
 * only the overall system utilization, destroy the synchronization objects
 * and terminate the process with EXIT_SUCCESS. Returns normally otherwise.
 */
void testOperationNotSupported()
{
    if (getOSVersion() > 10) {
        return; /* cgroup sampling can be attempted */
    }

    /* cgroups are not supported on these OS versions */
    systemUtilization = getSystemUtilization();
    printf("sys=%Lf\n", systemUtilization);

    pthread_mutex_destroy(&total_lock);
    pthread_cond_destroy(&ok_to_add);
    exit(EXIT_SUCCESS);
}
/*
 * Compile and study the regex that identifies type1 jobs, storing the
 * results in the globals reCompiled and pcreExtra.
 *
 * cpus - CPU count, forwarded to the fallback report on failure.
 *
 * On a compile or study error the fallback report is printed and the
 * process exits with EXIT_FAILURE.
 */
void performPcreOperations(int cpus)
{
    aStrRegex = "proc_type1"; /* type1 */

    reCompiled = pcre_compile(aStrRegex, 0, &pcreErrorStr, &pcreErrorOffset, NULL);
    if (!reCompiled) {
        printf("ERROR: Could not compile '%s': %s\n", aStrRegex, pcreErrorStr);
        report_utilization_and_clean_up_mutex(cpus);
        exit(EXIT_FAILURE);
    }

    /* pcre_study() returns NULL both on error and when it has nothing to
     * optimize; only the error string tells the two cases apart. */
    pcreExtra = pcre_study(reCompiled, 0, &pcreErrorStr);
    if (pcreErrorStr == NULL) {
        return;
    }

    printf("ERROR: Could not study '%s': %s\n", aStrRegex, pcreErrorStr);
    report_utilization_and_clean_up_mutex(cpus);
    exit(EXIT_FAILURE);
}
struct job_struct* getJobs(int cpus, int* jobCount)
{
char buff[1024];
struct job_struct* jobs = (job_struct*)calloc(cpus, sizeof(*jobs)); //initially size the array to be the number of CPUs
if (jobs == NULL)
{
fprintf(stderr, "Failed to allocate %d job structures in main()\n", cpus);
exit(EXIT_FAILURE);
}
*jobCount = 0;
FILE* fp = popen("/var/bin/getProcs", "r"); //open getProcs to list the procs, this returns the cgroup ID and the type
if (fp == NULL) {
perror("/var/bin/getProcs");
report_utilization_and_clean_up_mutex(cpus);
return (EXIT_FAILURE);
}
int ljobCount = 0;
while (fgets(buff, sizeof(buff), fp) != NULL) {
sscanf(buff, "%[^,],%[^,]", jobs[ljobCount].procid, jobs[ljobCount].procType);
ljobCount++;
}
*jobCount = ljobCount;
return jobs;
}
/*
 * Program entry point (refactored version).
 *
 * Order matters here:
 *  - clk is set first, because testOperationNotSupported() may already call
 *    getSystemUtilization(), which divides by clk;
 *  - the system-utilization thread is started before the regex and job
 *    setup, because their failure paths join that thread.
 *
 * Returns EXIT_SUCCESS, or EXIT_FAILURE if the system-utilization thread
 * cannot be created.
 *
 * NOTE(review): report_utilization_and_clean_up_mutex() as listed refers to
 * `systr`, which is local to this function - confirm how it gets the handle.
 */
int main(void) {
    int jobCount = 0;

    clk = CLK_TIME_PER_SECOND;
    testOperationNotSupported(); /* exits on OS versions without cgroups */

    int cpus = getCPUCount();
    sumFlag = 1; /* predicate for ok_to_add */

    pthread_t systr; /* samples overall system utilization */
    int rc = pthread_create(&systr, NULL, overallUtilization, NULL);
    if (rc != 0) {
        fprintf(stderr, "pthread_create: %s\n", strerror(rc));
        pthread_mutex_destroy(&total_lock);
        pthread_cond_destroy(&ok_to_add);
        return (EXIT_FAILURE);
    }

    performPcreOperations(cpus);

    struct job_struct *jobs = getJobs(cpus, &jobCount);
    if (jobCount == 0) {
        report_utilization_and_clean_up_mutex(cpus);
        return (EXIT_SUCCESS);
    }

    pthread_t tid[jobCount]; /* one slot per job; only `started` are used */
    int started = 0;
    for (int i = 0; i < jobCount; i++) {
        sample_struct *args = malloc(sizeof *args); /* freed by the thread */
        if (args == NULL) {
            perror("malloc");
            continue;
        }
        args->procid = jobs[i].procid;
        args->procType = jobs[i].procType;
        rc = pthread_create(&tid[started], NULL, addSampletoTotal, args);
        if (rc != 0) {
            fprintf(stderr, "pthread_create: %s\n", strerror(rc));
            free(args);
            continue;
        }
        started++;
    }

    pthread_join(systr, NULL);
    for (int i = 0; i < started; i++) {
        pthread_join(tid[i], NULL); /* only join threads that really exist */
    }
    free(jobs); /* no thread references the job strings any more */

    pthread_mutex_destroy(&total_lock);
    pthread_cond_destroy(&ok_to_add);

    /* Clamp every figure to the CPU count (sanity check for HT). */
    if (systemUtilization > (long double)cpus) {
        systemUtilization = (long double)cpus;
    }
    if (sum2 > (long double)cpus) {
        sum2 = (long double)cpus;
    }
    if (sum1 > (long double)cpus) {
        sum1 = (long double)cpus;
    }
    otherCPU = systemUtilization - sum2 - sum1;
    if (otherCPU < 0.0) {
        otherCPU = 0.0;
    }

    printf("sys=%Lf\n", systemUtilization);
    printf("cgrType2=%Lf\n", sum2);
    printf("cgrType1=%Lf\n", sum1);
    printf("cgrOther=%Lf\n", otherCPU);
    return (EXIT_SUCCESS);
}
There is also a programming principle called the Single Responsibility Principle that applies here. The Single Responsibility Principle states:
that every module, class, or function should have responsibility over a single part of the functionality provided by the software, and that responsibility should be entirely encapsulated by that module, class or function.
Explore related questions
See similar questions with these tags.