I'm writing an MD-like simulation and I'm having some difficulties in making this code run faster. I profiled the code using callgrind and kcachegrind, and it appears I'm using about 30% of my time on calculating distances, 43% on setting up the linked-cell list and calculating my forces, 20% on malloc and free (I assume these are called when I create new vectors). In my simulation, I'm using two classes: 'particle', which holds two STL vectors (position and velocity); and 'system' which holds a vector of particles and the functions I use for my simulation.
The linked-cell method I'm using consists of dividing the system into blocks of equal size (in my case just '1'), and then only takes into account interactions between particles in neighbouring blocks. Periodic boundary conditions are applied.
The two functions which use pretty much all my CPU time are the following:
bool checkDistance(Particle& p1, Particle& p2){ //checks whether the distance between p1 and p2 is less than 1
double dist = 0, relDist;
for ( int i = 0; i < dim; i++){ //loop over all dimensions
relDist = p1.pos[i] - p2.pos[i];
if (fabs(relDist) > halfSize){ //max separation in one dimension is half of system size with PBC
if(relDist < 0){ //shift the relative distance to the shortest possible one (minimum image convention)
relDist += systemSize;
}
else {
relDist -= systemSize;
}
}
dist += relDist * relDist;
}
return (dist < 1);
}
and
vector< vector< double> > System::calcForces(vector<vector<double> >& noise){ //calculate the force working on all particles in my system, adds a noise to this force (which is made in other function)
vector<vector<double> > forces (numberOfParticles, vector<double>(dim,0.0)); //contains the force working on every particle
/* this looks up neighbors with linked cell method*/
int numberOfCells = systemSize/interactionRange; //number of cells in one dimension. This way, all particles within interactionRange from a certain particle are in the same cell and in the neighboring cells.
vector<vector<int> > header(numberOfCells, vector<int> (numberOfCells, -1)); //contains highest particle index for each cell, -1 if no particle
vector<int> link(numberOfParticles, -1); //gives index of particle in same cell; -1 if no particle
vector<int> centralCell, neighboringCell; //these contain the indices of particle in central cell and four of its neighbouring cells (in 2D). Don't need to loop over all neighbours for all cells.
centralCell.reserve(numberOfParticles/(systemSize*systemSize) *2); //this is the amount of ints every cell will on average contain, probably unnecessary to reserve
neighboringCell.reserve(numberOfParticles/(systemSize*systemSize) *2* 4);
vector<vector<double> > unitVectors (numberOfParticles, vector<double>(dim)); //create unit vector of velocities for all particles, needed for later force calculation
double norm = 0.0;
double dotProduct = 0.0; //needed for force calculation
for(int i = 0; i < numberOfParticles; i++){
for(int j = 0; j < dim; j++){
norm += particleList[i].vel[j] * particleList[i].vel[j];
}
norm = sqrt(norm);
for (int j = 0; j < dim; j++){
unitVectors[i][j] = particleList[i].vel[j] / norm;
}
norm = 0.0;
}
vector<vector<double> > averages(numberOfParticles, vector<double>(dim,0.0)); //contains average velocities of neighboring particles
vector<int> neighborCount(numberOfParticles,0); //number of neighbours for every particle
for (int i = 0; i < numberOfParticles; i++){ //fill header and link
int xIndex = numberOfCells * particleList[i].pos[0] / systemSize;
int yIndex = numberOfCells * particleList[i].pos[1] / systemSize;
link[i] = header[xIndex][yIndex];
header[xIndex][yIndex] = i;
}
for (int i = 0; i < numberOfCells; i++){ //only need to check with 4 cells. Uses PBC
for (int j = 0; j < numberOfCells; j++){
int tempIndex = header[i][j];
while (tempIndex > -1){ //fill vector with particles in central cell
centralCell.emplace_back(tempIndex);
tempIndex = link[tempIndex];
}
//fill vector with particles in 4 of the neighboring cells. The '%'s are used for periodic boundary conditions.
tempIndex = header[(i+1) % numberOfCells][j];
while (tempIndex > -1){
neighboringCell.emplace_back(tempIndex);
tempIndex = link[tempIndex];
}
tempIndex = header[(i+1) % numberOfCells][(j+1) % numberOfCells];
while (tempIndex > -1){
neighboringCell.emplace_back(tempIndex);
tempIndex = link[tempIndex];
}
tempIndex = header[i][(j+1) % numberOfCells];
while (tempIndex > -1){
neighboringCell.emplace_back(tempIndex);
tempIndex = link[tempIndex];
}
tempIndex = header[(i+1) % numberOfCells][((j-1) % numberOfCells + numberOfCells) % numberOfCells];
while (tempIndex > -1){
neighboringCell.emplace_back(tempIndex);
tempIndex = link[tempIndex];
}
//calculate distance between particles in central cell + particles in neighboring cell. if smaller than interaction range, add to neighbors
for (unsigned int k = 0; k < centralCell.size(); k++){
neighborCount[centralCell[k]] +=1;
for (int d = 0; d < dim; d++){
averages[centralCell[k]][d] += unitVectors[centralCell[k]][d];
}
for(unsigned int l = 0; l < k; l++){
if(checkDistance(particleList[centralCell[k]], particleList[centralCell[l]])){
neighborCount[centralCell[k]] +=1;
neighborCount[centralCell[l]] +=1;
for (int d = 0; d < dim; d++){
averages[centralCell[k]][d] += unitVectors[centralCell[l]][d];
averages[centralCell[l]][d] += unitVectors[centralCell[k]][d];
}
}
}
}
for(unsigned int k = 0; k < centralCell.size(); k++){
for(unsigned int l = 0; l < neighboringCell.size(); l++){
if(checkDistance(particleList[centralCell[k]], particleList[neighboringCell[l]])){
neighborCount[centralCell[k]] +=1;
neighborCount[neighboringCell[l]] +=1;
for (int d = 0; d < dim; d++){
averages[centralCell[k]][d] += unitVectors[neighboringCell[l]][d];
averages[neighboringCell[l]][d] += unitVectors[centralCell[k]][d];
}
}
}
}
centralCell.clear();
neighboringCell.clear();
}
}
int numberOfNeighbors;
for(int i = 0; i < numberOfParticles; i++){ //calculate social force + friction + noise
for (int k = 0; k < dim; k++){
forces[i][k] = noise[i][k] - particleList[i].vel[k]; //stochastic + friction
}
if(neighborCount[i] != 0){
numberOfNeighbors = neighborCount[i];
for(int k = 0; k < dim; k++){
dotProduct += averages[i][k]/numberOfNeighbors * unitVectors[i][k];
}
for (int k = 0; k < dim; k++){
forces[i][k] += couplingFactor * (averages[i][k]/numberOfNeighbors - unitVectors[i][k] * dotProduct);
}
dotProduct = 0.0;
}
}
return forces;
}
As for the first function, to me this looks as efficient as I can get it. Maybe there's a better function than the 'fabs' I'm using?
For the second function, I think it's weird that I'm using so much time on malloc and free. Is there an easy way to reduce the number of calls to these? Any other ways to speed this up?
Finally, I think I'll need to parallelize these methods to get the results I want. Would it be more efficient to use something like openMP or CUDA here?
3 Answers 3
Given the additional information mentioned in the comments, I suggest the following changes:
Use Classes with Proper Members and Useful Methods
It appears that your Particle
class has an array that represents a coordinate. This is an inefficient way of representing a coordinate, particularly if you know you will never need more than 3 dimensions. Iterating over an array instead of just directly handling values invokes a lot of overhead.
I would make 2 classes: Vector2D
and Vector3D
each with their coordinates explicitly defined. Something like:
class Vector2D {
// ...methods, etc.
float x;
float y;
};
and
class Vector3D {
// ...methods, etc.
float x;
float y;
float z;
};
If you have proper methods on the class, such as a length()
and distanceFrom(otherVector)
method, then you don't need to iterate over an array. You can just call the method, which is easier to read, too.
It might make sense to either have an abstract VectorBase
base class that has common methods from both Vector2D
and Vector3D
, or to have your Particle
class be a template that takes a Vector2D
or Vector3D
as the template type.
As mentioned above, there will be common methods between the 2 vector types and this will simplify some of the code in calcForces()
. For example, instead of normalizing all those vectors by having 2 for
loops inside another for
loop, you'd just have 1 loop:
for (int i = 0; i < numberOfParticles; i++) {
unitVectors[i] = particleList[i].normalize();
}
You'll probably also want to implement things like operator+()
and operator-()
to simplify other parts of the code, such as at the end when you "calculate social force + friction + noise".
Make Loops More Efficient
You will inevitably need to have some complex loops. But you're repeatedly calculating various positions in those arrays multiple times in each loop. For example, you have this loop:
//calculate distance between particles in central cell + particles in neighboring cell. if smaller than interaction range, add to neighbors
for (unsigned int k = 0; k < centralCell.size(); k++){
neighborCount[centralCell[k]] +=1;
for (int d = 0; d < dim; d++){
averages[centralCell[k]][d] += unitVectors[centralCell[k]][d];
}
for(unsigned int l = 0; l < k; l++){
if(checkDistance(particleList[centralCell[k]], particleList[centralCell[l]])){
neighborCount[centralCell[k]] +=1;
neighborCount[centralCell[l]] +=1;
for (int d = 0; d < dim; d++){
averages[centralCell[k]][d] += unitVectors[centralCell[l]][d];
averages[centralCell[l]][d] += unitVectors[centralCell[k]][d];
}
}
}
}
In it, you calculate centralCell[k]
7 time and centralCell[l]
4 times. And then in the next loop you do something similar. It would be more efficient to do something like this:
for (unsigned int k = 0; k < centralCell.size(); k++)
{
int index = centralCell[k];
neighborCount[index] += 1;
//... etc., replacing centralCell[k] with index everywhere
}
And of course, if you were using a proper Vector*
class, you'd have fewer inner loops.
Don't Repeat Yourself
You have 5 nearly identical loops that make up a good portion of calcForces()
. The first one is slightly different from the next 4 because you're putting them into the centralCell
vector rather than the neighboringCell
vector. The last 4 loops, at least, could be combined into 1 by putting all of the indexes into an array and iterating over that array. Something like this:
int neighborIndexes[4] = {
header[i][j],
header[(i+1) % numberOfCells][j],
header[i][(j+1) % numberOfCells],
header[(i+1) % numberOfCells][((j-1) % numberOfCells + numberOfCells) % numberOfCells]
};
for (int k = 0; k < 4; k++)
{
int tempIndex = neighborIndexes[k];
while(tempIndex > -1) {
neighboringCell.emplace_back(tempIndex);
tempIndex = link[tempIndex];
}
}
It's shorter and easier to read. There is slightly more overhead because we've added on outer loop, though. You'd need to profile to see whether it makes any difference.
Conclusions
Overall this code is very messy. You have one very long function that should be broken up into smaller, properly named pieces that are easier to understand at a glance.
You are allocating a very large number of vector
s at the start of the calcForces
function, and your profiling seems to indicate that's a problem. I think that having a Vector*
class will reduce that somewhat. For example, instead of a 2D vector
at the very beginning of calcForces
, you'd only need a 1D vector
, like this:
vector<VectorBase> forces(numberOfParticles);
or if it was templatized:
vector<T> forces(numberOfParticles);
-
\$\begingroup\$ This is amazing, thank you so much for this. I'll get started with adding this vector2D class. It will indeed increase readability by a lot, and hopefully will make it easier to parallelize everything later. \$\endgroup\$K. Kim– K. Kim2017年03月26日 18:10:46 +00:00Commented Mar 26, 2017 at 18:10
-
\$\begingroup\$ I've edited in your changes, thanks again. Will update after extensive profiling! \$\endgroup\$K. Kim– K. Kim2017年03月26日 20:15:03 +00:00Commented Mar 26, 2017 at 20:15
Without all the relevant classes, it's hard to say for sure, but those two nested loop blocks to calculate distance between particles, definitely smell for me.
It looks to me that they are both doing mostly the same things, you should be able to find a way to combine those into 1 nested loop block, perhaps having more than one counter in a loop and running centralcell and neighborcell statements in one loop would work?
In regards to the vectors, you are creating 7 of them and of those 4 are 2D. I would definitely look at trying to combine some of that data into a supplementary class to reduce the amount of vectors you're creating.
With the help of user111832, I've changed my code to the following:
I've added a vector class (just 2D for now, will add 3D later)
class Vector2D{
public:
double _x;
double _y;
Vector2D(){ _x = 0; _y = 0; }
Vector2D(double x, double y){ _x = x; _y = y; }
double norm() const
{
return sqrt(_x*_x + _y*_y);
}
Vector2D& operator+=(Vector2D v){
_x += v._x;
_y += v._y;
return *this;
}
Vector2D& operator-=(Vector2D v){
_x -= v._x;
_y -= v._y;
return *this;
}
Vector2D& operator*=(double s) {
_x *= s;
_y *= s;
return *this;
}
Vector2D& operator/=(double s) {
_x /= s;
_y /= s;
return *this;
}
};
inline Vector2D operator+(Vector2D a, Vector2D b) { return a += b; }
inline Vector2D operator-(Vector2D a, Vector2D b) { return a -= b; }
inline Vector2D operator*(Vector2D a, double s) { return a *= s; }
inline Vector2D operator*(Vector2D a, Vector2D b){ return a._x * b._x + a._y * b._y; } //inner product of two vectors
inline Vector2D operator*(double s, Vector2D b) { return b *= s; }
inline Vector2D operator/(Vector2D a, double s) { return a /= s; }
inline Vector2D normalize(Vector2D a){ return a / a.norm(); }
Using this, I changed my Particle class to a container class with
class Particle {
public:
Vector2D pos;
Vector2D vel;
};
My distance check has then been changed to the following:
bool checkDistance(Particle& p1, Particle& p2){
double dist = 0, relDist;
relDist = p1.pos._x - p2.pos._x;
if (fabs(relDist) > systemSize / 2){ //max separation in one dimension is half of system size
if (relDist < 0){
relDist += systemSize;
}
else {
relDist -= systemSize;
}
}
dist += relDist * relDist;
relDist = p1.pos._y - p2.pos._y;
if (fabs(relDist) > systemSize / 2){ //max separation in one dimension is half of system size
if (relDist < 0){
relDist += systemSize;
}
else {
relDist -= systemSize;
}
}
dist += relDist * relDist;
return (dist < 1);
}
And the calculation of forces to
vector<Vector2D> System::calcForces(vector<Vector2D>& noise){
vector<Vector2D> forces (numberOfParticles); //contains the force working on every particle
vector<Vector2D> averages(numberOfParticles); //contains average velocities of neighboring particles
/* creates neighborlist with linked cell method*/
int numberOfCells = systemSize; //number of cells in one dimension. This way, all particles within interactionRange from a certain particle are in the same cell and in the neighboring cells.
vector<vector<int> > header(numberOfCells, vector<int> (numberOfCells, -1)); //contains highest particle index for each cell, -1 if no particle -> need better option for this
vector<int> link(numberOfParticles, -1); //gives index of particle in same cell; -1 if no particle
vector<int> centralCell, neighboringCell;
centralCell.reserve(numberOfParticles/(systemSize*systemSize) *2);
neighboringCell.reserve(numberOfParticles/(systemSize*systemSize) *2* 4);
vector<Vector2D> unitVectors (numberOfParticles); //create unit vector of velocities for all particles
for(int i = 0; i < numberOfParticles; i++){
unitVectors[i] = normalize(particleList[i].vel);
}
vector<int> neighborCount(numberOfParticles,0);
for (int i = 0; i < numberOfParticles; i++){ //fill header and link
int xIndex = particleList[i].pos._x;
int yIndex = particleList[i].pos._y;
link[i] = header[xIndex][yIndex];
header[xIndex][yIndex] = i;
}
for (int i = 0; i < numberOfCells; i++){ //only need to check with 4 cells. Uses PBC
for (int j = 0; j < numberOfCells; j++){
int tempIndex = header[i][j];
while (tempIndex > -1){ //fill vector with particles in central cell
centralCell.emplace_back(tempIndex);
tempIndex = link[tempIndex];
}
int neighborIndexes[4] = {
header[(i + 1) % numberOfCells][j],
header[i][(j + 1) % numberOfCells],
header[(i + 1) % numberOfCells][(j + 1) % numberOfCells],
header[(i + 1) % numberOfCells][((j - 1) % numberOfCells + numberOfCells) % numberOfCells]
};
for (int k = 0; k < 4; k++)
{
int tempIndex = neighborIndexes[k];
while (tempIndex > -1) {
neighboringCell.emplace_back(tempIndex);
tempIndex = link[tempIndex];
}
}
int index1, index2;
//calculate distance between particles in central cell + particles in neighboring cell. if smaller than interaction range, add to neighbors
for (unsigned int k = 0; k < centralCell.size(); k++){
index1 = centralCell[k];
neighborCount[index1] +=1;
averages[index1] += unitVectors[index1];
for(unsigned int l = 0; l < k; l++){
index2 = centralCell[l];
if(checkDistance(particleList[index1], particleList[index2])){
neighborCount[index1] +=1;
neighborCount[index2] +=1;
averages[index1] += unitVectors[index2];
averages[index2] += unitVectors[index1];
}
}
}
for(unsigned int k = 0; k < centralCell.size(); k++){
index1 = centralCell[k];
for(unsigned int l = 0; l < neighboringCell.size(); l++){
index2 = neighboringCell[l];
if(checkDistance(particleList[index1], particleList[index2])){
neighborCount[index1] +=1;
neighborCount[index2] +=1;
averages[index1] += unitVectors[index2];
averages[index1] += unitVectors[index2];
}
}
}
centralCell.clear();
neighboringCell.clear();
}
}
double dotProduct = 0.0;
for(int i = 0; i < numberOfParticles; i++){ //calculate social force + friction + noise
forces[i] = noise[i]- particleList[i].vel; //stochastic + friction
averages[i] /= neighborCount[i] ;
dotProduct = averages[i] * unitVectors[i];
forces[i] += couplingFactor * (averages[i]- unitVectors[i] * dotProduct);
}
}
return forces;
}
This is a lot easier on the eyes (although, I'm sure, there's still a lot that can be done here), and will allow for easier implementation of the parallelization. I want to come up with a way to eliminate the 2D array 'header' I'm using right now, but I assume this is not the forum to ask such questions.
Some testruns indicate that my calculations now go twice as fast. Thanks a lot everyone! If anything is still unclear or should be changed, let me know
dim
? Is only ever 2 or 3? Is it global? It seems odd that it's used but not passed in. \$\endgroup\$