I need to implement a parallel version of FastICA, so I first implemented the serial version.
This is the architectural diagram of how it is going to be parallelized. I have coded a basic thread model of it, but I need to know whether the inter-thread synchronization is correct.
#include <algorithm> // std::fill
#include <condition_variable>
#include <mutex>
#include <vector>
#include <iostream>
#include <thread>
int numberOfThreads=10;
std::vector<int> checkedEvent(numberOfThreads);
std::vector<double> data(numberOfThreads);
std::mutex lock;
int IsPrepare=0;
std::condition_variable data_prepare;
class MainProcess
{
private:
    int rank;
    double input;
public:
    MainProcess(int rank_, double dataPr){
        rank=rank_;
        input=dataPr;
    }
    MainProcess(){
    }
    void fastICA_processing()
    {
        while(true){
            std::unique_lock<std::mutex> lk(lock);
            data_prepare.wait(lk,[this]{return checkedEvent[rank]!=1;});
            data[rank]=input;
            checkedEvent[rank]=1;
            IsPrepare=IsPrepare+1;
            std::cerr <<IsPrepare<<" thread is processing"<<std::endl;
            lk.unlock();
        }
    }
    void controller()
    {
        while(true)
        {
            while(IsPrepare==numberOfThreads)
            {
                std::lock_guard<std::mutex> lk(lock);
                std::cerr<<"Now controller is checking fastICA is converged or not "<<std::endl;
                std::fill(checkedEvent.begin(), checkedEvent.end(), 0);
                IsPrepare=0;
                data_prepare.notify_all();
            }
        }
    }
};
int main()
{
    std::vector<std::thread> thredArray;
    MainProcess p3(0,0);
    std::thread process_thread(&MainProcess::controller,&p3);
    for(int i=0;i<numberOfThreads;i++){
        MainProcess* p1= new MainProcess(i,i);
        thredArray.push_back(std::thread (&MainProcess::fastICA_processing,p1));
    }
    for(auto &t:thredArray)
    {
        t.join();
    }
    process_thread.join();
    return 0;
}
Note: Here "data" stands for the input data. First, the fastICA_processing threads do some mathematical calculations; then the controller thread collects all of that data and processes it. This cycle repeats again and again. I have not included the thread-detaching part here.
1 Answer
The code is a mess, but I can tell you that it will spin forever right here:
std::thread process_thread(&MainProcess::controller,&p3);
This starts controller running immediately.
void controller()
{
    while(true)
    {
        while(IsPrepare==numberOfThreads)
        {
            std::lock_guard<std::mutex> lk(lock);
            std::cerr<<"Now controller is checking fastICA is converged or not "<<std::endl;
            std::fill(checkedEvent.begin(), checkedEvent.end(), 0);
            IsPrepare=0;
            data_prepare.notify_all();
        }
    }
}
IsPrepare starts at 0 and numberOfThreads is 10. The loop condition reads IsPrepare with no synchronization (no lock held, no atomics), so the compiler is free to assume that no other thread changes it, and we can definitely say that 0 == 10 is invariably false. Thus controller reduces to
void controller()
{
    while(true)
    {
        while(false) {}
    }
}
Okay, so that thread won't be doing anything anytime soon. And with that code out of the way, the rest is pretty much irrelevant; it looks to me as if it will end up blocking forever, since there's nobody left to notify the condition variable.
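One way to avoid the unsynchronized read is to have the controller wait on the condition variable as well, so that its predicate is only ever evaluated with the mutex held. The following is a minimal sketch of that idea, not a drop-in replacement for the posted code. It assumes a fixed number of rounds stands in for the convergence check, and the names run_rounds, prepared, completed and done are hypothetical, introduced only for this illustration.

```cpp
#include <algorithm>
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical helper (not in the original post): runs `workers` worker
// threads for `rounds` rounds and returns how many rounds the controller
// completed.
int run_rounds(int workers, int rounds)
{
    std::mutex m;
    std::condition_variable cv;
    std::vector<int> checked(workers, 0);   // 1 = worker has reported this round
    std::vector<double> data(workers, 0.0);
    int prepared = 0;    // workers that have reported in the current round
    int completed = 0;   // rounds finished by the controller
    bool done = false;   // set by the controller to stop the workers

    auto worker = [&](int rank) {
        std::unique_lock<std::mutex> lk(m);
        while (true) {
            // Sleep until this worker's slot is reset or we are told to stop.
            cv.wait(lk, [&] { return done || checked[rank] == 0; });
            if (done) return;
            data[rank] = rank;   // stand-in for the real computation
            checked[rank] = 1;
            ++prepared;
            cv.notify_all();     // this may be the last report: wake the controller
        }
    };

    auto controller = [&] {
        std::unique_lock<std::mutex> lk(m);
        for (int r = 0; r < rounds; ++r) {
            // The predicate runs with the mutex held, so this read is synchronized.
            cv.wait(lk, [&] { return prepared == workers; });
            assert(std::count(checked.begin(), checked.end(), 1) == workers);
            // A convergence check on `data` would go here.
            std::fill(checked.begin(), checked.end(), 0);
            prepared = 0;
            ++completed;
            if (r + 1 == rounds) done = true;   // last round: release the workers
            cv.notify_all();
        }
    };

    std::thread ctl(controller);
    std::vector<std::thread> pool;
    for (int i = 0; i < workers; ++i) pool.emplace_back(worker, i);
    for (auto& t : pool) t.join();
    ctl.join();
    return completed;   // safe to read: all threads have been joined
}
```

Calling run_rounds(10, 5) runs ten workers through five rounds and returns 5 once every thread has exited. In this sketch the stand-in computation happens while the mutex is held, which serializes the workers; real per-thread work would be done outside the lock, with only the bookkeeping (checked, prepared) updated under it.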
Your problem might be suited to a TBB (Threading Building Blocks) flow graph; consider using TBB instead of hand-rolling your own stuff.
Comment from GPrathap (Jun 20, 2015 at 9:58): You can't say here that IsPrepare is 0 and numberOfThreads is 10, because the fastICA_processing() threads increase IsPrepare by one each time they acquire the lock. Eventually, when it reaches 10, the controller thread will get the lock, do some processing, and notify all fastICA_processing() threads to do the same thing again. Anyway, thank you for mentioning the TBB flow graph.