I am trying to improve my code that involves a thread (threadA
) handling some UDP communication and another thread (threadB
) that works with the output of that communication.
My basic idea is to .push()
new data into a std::queue
within threadA
.
threadB
is looking for new data periodically - if it gets new data it processes it. Then .pop
is called and the data is deleted.
Implementation:
threadA
will calladdNewElement()
any time new UDP data arrives.threadB
will callgetNewestPtr()
to access the data.when done with the data,
threadB
will calldoneWithNewest()
.
I am managing the access to the data (which comes in the form of std::array
) through the use of std::shared_ptr
. I know that the shared_ptr
will manage the access to the pointer in a thread safe manner, but the underlying object is not going to be managed. Here lies my problem - in my implementation of the class, I am not sure where to put in memory locks, to ensure thread safety. My implementation already works, I have been testing it for a few days without problems, but I have the feeling I am doing something wrong, as I have not used any std::mutex
. Anyway, here is my code:
commbufferfifo.h
#ifndef COMMBUFFERFIFO_H
#define COMMBUFFERFIFO_H
#include <queue>
#include <array>
#include <memory>
const size_t COMMLENGTH = 56;
/**
* @brief The commbufferfifo class
* @details used to safely share data between the comm thread and the visu thread
*/
class commbufferfifo
{
public:
commbufferfifo();
/**
* @brief getNewestPtr only accessed in visu thread
* @return returns nullptr if fifo is empty!
*/
std::shared_ptr<std::array<int16_t, COMMLENGTH>> getNewestPtr();///< @brief
void doneWithNewest();///< @brief call when newest object can be deleted
void addNewElement(const std::array<int16_t, COMMLENGTH> &commBuffer); ///< @brief only accessed in comm thread
private:
std::queue<std::shared_ptr<std::array<int16_t, COMMLENGTH>>> m_fifo;
};
#endif // COMMBUFFERFIFO_H
commbufferfifo.cpp
#include "commbufferfifo.h"
commbufferfifo::commbufferfifo()
{
}
std::shared_ptr<std::array<int16_t, COMMLENGTH> > commbufferfifo::getNewestPtr()
{
if (m_fifo.empty())
return nullptr;
else
return m_fifo.front();
}
void commbufferfifo::doneWithNewest()
{
if (m_fifo.empty())
return;
else
m_fifo.pop();
}
void commbufferfifo::addNewElement(const std::array<int16_t, COMMLENGTH> &commBuffer)
{
m_fifo.push(std::make_shared<std::array<int16_t, COMMLENGTH>>(commBuffer));
}
some pseudo usage: main.cpp
#include "includes/commbufferfifo.h"
#include <unistd.h>
void threadA(std::shared_ptr<commbufferfifo> myPointer){
std::array<int16_t, COMMLENGTH> arr;
for (int iteration = 0; iteration < 1000; iteration++){
arr.at(0) = iteration;
myPointer->addNewElement(arr);
usleep(100);
}
}
int main()
{
auto sharedFifo = std::make_shared<commbufferfifo>();
std::thread tA(&threadA, sharedFifo);
usleep(10000);
for(;;){
if (sharedFifo->getNewestPtr() == nullptr)
break;
std::cout << "sharedFifo->getNewestPtr()->at(0) = " << sharedFifo->getNewestPtr()->at(0) << "\n"; //use data
sharedFifo->doneWithNewest(); //data will be deleted
usleep(1000);
}
tA.join();
std::cout << "------------------------------------\n";
std::cout << "done" << std::endl;
return 0;
}
```
1 Answer 1
This is more of a stackoverflow question but whatever.
There is no guarantee that
std::queue
is thread safe and methodsempty()/front()/push()
will properly interact in a multi-threaded environment. They most certainly cause data races. For this reason you need to usestd::mutex
to ensure thatstd::queue
isn't accessed simultaneously from multiple threads ensuring that the data is read/written correctly.What are you going to do when the queue is empty? What if you want to wait for next element? Will you put
sleep(10ms)
periodically? What if it isn't responsive enough and has too big of a lag? The proper method is to utilizestd::condition_variable
which has properwait/notify
methods.It isn't worth to create a shared pointer to view some 100 bytes. You'd better transfer 100 bytes of data by copying it.
This isn't a C++ approach to transfer data by simply copying chunks of bytes. What you should do instead is to template your thread-safe queue over some class
T
so it movesT
's instances from sender to receiver in a safe way.