I've had great experience in the past getting feedback from uDev and python, but I'm dreading this attempt at C++. I have a little background in C - but absolutely none in C++ - so this code is constructed directly from reading many books, SO posts and the code from a lot of GitHub projects.
I know there's probably an overlap of C and C++, but I tried to catch as much as I understood. I just thought I better stop and get some feedback before I go any further.
Everything is working now - it reads the stream, checks for appropriate tags and the code is there to play an alert - but I haven't added the actual code that sends the message.
I'm just looking for obvious C++ blunders and sources of potential memory leaks and crashes. I'd also welcome feedback on improvements and suggestions for the future as I continue.
The code is only at 300 lines now - in my program - I should probably split out some stuff to a program header file.
Start of File
/***********************************************
* ProcessMessages.cpp
*
* Process Messages via streaming socket
*
* Compile and Link with: g++ -std=c++11 -pthread
*************************************************/
#include <iostream>
#include <string>
#include <algorithm> /* used for replace */
#include <queue>
#include <chrono>
#include <thread>
#include <stdlib.h>
#include <time.h>
#include <pthread.h>
#include <err.h>
#include <stdio.h>
#include <alloca.h>
#include "popen-noshell/popen_noshell.h" // https://github.com/famzah/popen-noshell
#include "network/SocketClient.h" // TCP-IP Sockets in C
#include "pugixml/pugixml.hpp" // https://github.com/zeux/pugixml
using namespace std;
using namespace nsCAP;
// Kind of payload an alert carries (text vs. wave file, going by the names).
enum message_t {
    msg_type_text = 0,
    msg_type_wav_file = 1
};

// One entry of the alert queue: the payload string plus its kind.
struct alert_t {
    std::string alert_msg;
    message_t msg_type;
};

// Scratch alert that main() fills in before pushing a copy onto qAlerts.
alert_t sendAlert;

// Alerts handed from main() (which pushes) to playQueue() (which pops).
std::queue<alert_t> qAlerts;

// Mutex that main() and playQueue() lock around their accesses to qAlerts.
pthread_mutex_t m_alert_queue = PTHREAD_MUTEX_INITIALIZER;
Adjust volume function (uses popen for alsa)
/*********************
* adjustVolume
* Ramps the "Line,0" mixer control from the last volume this function set
* towards finalVolume, one STEP_VOLUME step at a time, by running
* "amixer set Line,0 <n>%" through popen_noshell() and pausing
* SHORT_SLEEP_MS after every step.
*
* finalVolume: target volume in percent; values outside 0..100 are clamped.
*
* The current level is kept in a function-local static, so this function is
* not safe to call from several threads at once.
* A failure to start or reap amixer terminates the process via err().
*********************/
void adjustVolume(const int finalVolume) {
    const int MIN_PERCENT = 0;
    const int MAX_PERCENT = 100;
    const int MAX_VOLUME = 50;
    const int STEP_VOLUME = 5;
    const int SHORT_SLEEP_MS = 100; // 100 milliseconds
    static int current_volume = MAX_VOLUME; // Base starting volume

    const int target = std::max(MIN_PERCENT, std::min(finalVolume, MAX_PERCENT));

    while (current_volume != target) {
        // Move one step towards the target without overshooting it, so the
        // remembered level always matches what was last sent to the mixer.
        if (current_volume > target) {
            current_volume = std::max(current_volume - STEP_VOLUME, target);
        } else {
            current_volume = std::min(current_volume + STEP_VOLUME, target);
        }

        // Build "<n>%" as a real string. Adding an int to the literal "%"
        // is pointer arithmetic, not concatenation.
        const std::string volume_arg = std::to_string(current_volume) + "%";

        /* NOTE! The first argv[] must be the executed file itself and the
           last element must be NULL. */
        const char *const argv[] = {"amixer", "set", "Line,0", volume_arg.c_str(), NULL};

        struct popen_noshell_pass_to_pclose pclose_arg;
        FILE *fp = popen_noshell(argv[0], argv, "r", &pclose_arg, 0);
        if (!fp) {
            err(EXIT_FAILURE, "popen_noshell()");
        }

        // Drain whatever amixer prints so it is not left writing into a
        // pipe that has already been closed on our side.
        char discard[256];
        while (fgets(discard, sizeof discard, fp) != NULL) {
        }

        const int status = pclose_noshell(&pclose_arg);
        if (status == -1) {
            err(EXIT_FAILURE, "pclose_noshell()");
        }

        // pause and then continue adjusting volume in stages
        std::this_thread::sleep_for(std::chrono::milliseconds(SHORT_SLEEP_MS));
    }
}
Play queue (threaded function)
/*********************
* playQueue
* Thread entry point (pthread_create signature) that polls the shared
* alert queue forever. For each queued alert it lowers the line volume,
* (To-Do) plays the alert, restores the volume and logs the message.
* When the queue is empty it sleeps LONG_SLEEP_MS before polling again.
*
* unused: ignored; present only to match the pthread start-routine type.
* Never returns in practice because the polling loop has no exit.
*********************/
void *playQueue(void* unused) {
    (void) unused;
    const int LONG_SLEEP_MS = 5000; // 5 seconds

    while (true) {
        bool haveAlert = false;
        alert_t newAlert;

        // Every access to qAlerts, including empty(), happens under the
        // mutex because main() pushes to the queue from another thread.
        int rc = pthread_mutex_lock(&m_alert_queue);
        if (rc) { /* an error has occurred */
            std::cout << "Error in : pthread_mutex_lock" << "]\n";
        } else {
            if (!qAlerts.empty()) {
                newAlert = qAlerts.front();
                qAlerts.pop();
                haveAlert = true;
            }
            // Release the lock before the slow volume ramp so the producer
            // is not blocked while an alert is being played.
            rc = pthread_mutex_unlock(&m_alert_queue);
            if (rc) {
                std::cout << "Error in : pthread_mutex_unlock" << "]\n";
            }
        }

        if (haveAlert) {
            // Lower volume to background level
            adjustVolume(0);
            /*********** To DO *************
            // Start playing the alert signal = then play the actual alert
            // use espeak and pipe out to aplay
            // aplay -Dplug:LineOut test.wav
            ******************************/
            // Raise volume to foreground level
            adjustVolume(50);
            std::cout << "Playing Alert [" << newAlert.alert_msg << "] \n";
        } else {
            // pause before checking queue again (also taken after a failed
            // lock, so a persistent error cannot turn into a busy loop)
            std::this_thread::sleep_for(std::chrono::milliseconds(LONG_SLEEP_MS));
            std::cout << "Checking Message QUEUE Again " << " \n";
        }
    }
    pthread_exit(NULL);
}
Main
int main(int argc, char *argv[]) {
const int BUFFER_SIZE = 4096;
const std::string ALERT_END = "</alert>";
const std::string XML_START = "<?xml";
// Define Streaming Server constants
const std::string STREAM_1 = "stream1.server.com";
const std::string STREAM_2 = "stream2.server.com";
const int SERVER_PORT = 8998;
const std::string DATA_FOLDER = "/audio-data/";
const std::string ALERT_TONE = "warning-tone.wav";
pthread_t t_alerts;
int rc;
long t = 0;
time_t now;
time_t lastBeat;
double seconds;
// Setup XML end tag search constants
std::string alertTag = ALERT_END;
std::size_t szAlertEnd = alertTag.length();
std::size_t posTag = 0;
std::string streamData;
pugi::xml_document doc;
pugi::xml_parse_result result;
pugi::xml_node node;
pugi::xml_attribute attr;
cout << "Starting Processor" << "\n";
nsCAP::SocketClient sc(STREAM_1, SERVER_PORT);
sc.connectToServer();
if (sc.isClientConnected() ) {
cout << "Connected to [" << STREAM_1 << "] \n";
} else {
nsCAP::SocketClient sc(STREAM_2, SERVER_PORT);
if (sc.isClientConnected() ) {
cout << "Connected to [" << STREAM_2 << "] \n";
} else {
cout << "Failed to Connect" << "\n";
return EXIT_FAILURE;
}
}
// Create Alert Queue Thread
/* create a new thread that will execute 'playQueue()' */
pthread_create(&t_alerts, NULL, playQueue, NULL);
// track time since last heartbeat received
time(&now); /* get current time; same as: now = time(NULL) */
// seconds = difftime(now,lastBeat);
std::string alertBuffer = sc.receive(BUFFER_SIZE);
while (alertBuffer.length() > 0) {
// Check for start of XML
posTag = alertBuffer.find(XML_START);
if (posTag != std::string::npos) {
alertBuffer = alertBuffer.substr(posTag);
}
// Check for end of alert
posTag = alertBuffer.find(ALERT_END);
if (posTag != std::string::npos) {
// Extract Alert XML including Alert tag
std::string alertXML = alertBuffer.substr(0, posTag + szAlertEnd);
cout << "ALERT XML [" << alertXML << "] \n";
// Process Alert XML
// if <status>System</status> - then heartbeat
// const char * cAlert = alertXML.c_str();
if (doc.load_string(alertXML.c_str())) {
std::string alertStatus = doc.child("alert").child_value("status");
std::string alertSender = doc.child("alert").child_value("sender");
std::cout << "Alert Status: [" << alertStatus << "]\n";
std::cout << "Alert Sender: [" << alertSender << "]\n";
if ( (alertStatus == "System") && (alertSender == "Heartbeat") ) {
std::cout << "Alert Heartbeat" << "\n";
time(&lastBeat); /* get heartbeat time as: lastBeat = time(NULL) */
} else {
std::string alertMessage = doc.child("alert").child_value("description");
std::cout << "Alert Description Found: [" << alertMessage << "]\n";
if (alertMessage == "") {
// Use Headline instead
std::string alertMessage = doc.child("alert").child_value("headline");
} else {
// Remove all <CR>
replace( alertMessage.begin(), alertMessage.end(), '\r', ' ');
// Replace <LF> with space (don't use std:replace)
replace( alertMessage.begin(), alertMessage.end(), '\n', ' ');
}
sendAlert.alert_msg = alertMessage;
sendAlert.msg_type = msg_type_text;
// Lock Queue
int rc = pthread_mutex_lock(&m_alert_queue);
if (rc) { /* an error has occurred */
std::cout << "Error in : pthread_mutex_lock" << "]\n";
}
qAlerts.push(sendAlert);
// Unlock Queue
rc = pthread_mutex_unlock(&m_alert_queue);
if (rc) {
std::cout << "Error in : pthread_mutex_unlock" << "]\n";
}
std::cout << "Alert Message queued For Play:" << alertMessage << "\n";
}
}
else {
std::cout << "XML [" << alertXML << "] parsed with errors, attr value: [" << doc.child("node").attribute("attr").value() << "]\n";
}
// replace current buffer with end of string
alertBuffer = alertBuffer.substr(posTag + szAlertEnd - 1);
}
// Collect more data
streamData = sc.receive(BUFFER_SIZE);
if (streamData.empty())
break;
// Append data to buffer
alertBuffer += streamData;
}
sc.disconnectFromServer();
cout << "End of Processor" << "\n";
/* Last thing that main() should do */
pthread_exit(NULL);
}
1 Answer 1
I've only glanced at your code; however, it looks like your connection retry (in `main`) has a scoping issue. In your `else` block:
} else {
nsCAP::SocketClient sc(STREAM_2, SERVER_PORT);
if (sc.isClientConnected() ) {
cout << "Connected to [" << STREAM_2 << "] \n";
} else {
cout << "Failed to Connect" << "\n";
return EXIT_FAILURE;
}
}
you declare a variable `sc`, which you use to try to connect to `STREAM_2` if `STREAM_1` fails. The variable you're declaring has the same name as an existing variable declared at a higher scope; however, the one you're creating is scoped to the `else` block. This means that when you access the variable later in `main` to read from the socket:
std::string alertBuffer = sc.receive(BUFFER_SIZE);
You're actually calling out to the one that failed to open `STREAM_1`.
Declaring nested variables with the same name is something that compilers will often warn about, so particularly when getting familiar with a new language it's a good idea to turn on 'warnings as errors' and crank the warning level up. That way the compiler can give you as much information as possible about what it thinks you might be doing (that you're not expecting).
-
\$\begingroup\$ That makes sense - thanks @forsvarir. It looks like you've also found a problem in my second attempt - I don't actually try to connect to the second server. Are you saying that even if I do remove the declaration, and connect to the second server with the same variable `sc` - when I come out of the else scope - the connection won't be valid for `sc.receive` - or is that issue solely because of the second declaration? \$\endgroup\$ dbmitch – dbmitch 2016年11月15日 18:55:11 +00:00 Commented Nov 15, 2016 at 18:55 -
1 \$\begingroup\$ @dbmitch If I understand your question correctly, then calling something like `sc.connectToServer();` within the `else`, without declaring a new variable `nsCAP::SocketClient sc(STREAM_2, SERVER_PORT);`, would call the method on the object at the outer scope, so you would be able to use the connection after exiting the `else` (to call `sc.receive`). The issue is that when the object goes out of scope it is destroyed. A simple illustration with `int`: ideone.com/TSBFwx \$\endgroup\$ forsvarir – forsvarir 2016年11月15日 19:15:22 +00:00 Commented Nov 15, 2016 at 19:15