
I didn't know whether it was best to start with the general task I'm trying to solve, or the meat of the programming problem. They're both below but the former is rather long and solving the latter may not require it, so feel free to skip at will.

The task to be solved

I'm running a relatively simple dynamical simulation, following the gravitational evolution of a series of spherical shells, between each pair of which there is a fixed mass.

To give an example: between shells 3 and 4 there is a mass of 2. When the shells are far apart the density there is low, but this mass is always contained between them. Other shells may overlap this pair, crossing in and out as the simulation continues and changing the density in at least part of the region between shells 3 and 4, but no mass is transferred.

I am trying to write an efficient function which, for each shell, finds the mass internal to it (i.e. at smaller radii).

To find the contribution to the mass internal to shell n (call it M[n]) from the mass between shells j and j+1 (call it m[j]), we need to know whether:

  1. Shells j and j+1 are both contained within shell n, thus M[n]+=m[j]
  2. Shells j and j+1 are both outside of shell n, thus M[n]+=0
  3. One of shells j and j+1 is outside of n, and the other within, thus M[n]+= some fraction of m[j]

(Let's assume mass is distributed uniformly between shells, meaning the fraction in case 3 is just the ratio of the volume between the inner shell and shell n to the volume between shells j and j+1.)
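That volume ratio can be written down directly. A minimal sketch (the helper name is illustrative, not from the code below); since the density is uniform, the common 4π/3 factor cancels and cubes of the radii suffice:

```python
def interior_fraction(r_j, r_j1, r_n):
    """Fraction of m[j] interior to shell n when rads[j] < rads[n] < rads[j+1].

    Ratio of the volume between shell j and shell n to the volume between
    shells j and j+1; the 4*pi/3 factor cancels, so r**3 differences suffice.
    """
    return (r_n**3 - r_j**3) / (r_j1**3 - r_j**3)

# Sanity checks at the boundaries:
assert interior_fraction(1.0, 2.0, 2.0) == 1.0  # shell n at the outer bound: all of m[j]
assert interior_fraction(1.0, 2.0, 1.0) == 0.0  # shell n at the inner bound: none of it
```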

The difficulty in solving it

I've written two Python (+ NumPy) solutions to the problem, the first a quick and ugly long for loop, designed just to build the rest of the simulation and to be improved on later. It's neither quick nor beautiful, but it does the job.

I then returned to the problem with the intention of streamlining and speeding it up. The general philosophy I've been operating under is: for loops are slow, array operations are fast.

I carefully worked out how to do the whole thing with no for loops, and mostly operations on integer arrays. And yet, to my dismay, it's slower, and seems to scale worse, than the blockheaded for loop I started with.

My question, then, is where my understanding of NumPy's efficiency has broken down. Where, in the second solution, are my uses of arrays inefficient, and when does a simple for loop win out over a complex array treatment?

As you may guess, I'm not a hugely experienced Python programmer, with more experience in C++ and IDL, so my mistakes may well be clear as day and easily avoidable.

The solutions

Both solutions are self-contained functions that take two inputs: an array of radii at which each shell sits, and an array of the fixed masses between each pair of shells. I've been running this with 500 shells (and hence 499 masses); I'd like to increase this by an order of magnitude if possible, but that may be ambitious.

Solution 1: The for loop

For each shell, this identifies the masses internal (or overlapping) and sums the relevant contributions:

def findMass(rads, shellM):
    nR = rads.size
    massInRad = 5 + np.zeros(nR)
    for i in range(0, nR):
        thisRad = rads[i]
        innerMass = 0
        inside, = np.where((rads[:-1] <= thisRad) & (rads[1:] <= thisRad))  # both shell bounds within r
        overLR, = np.where((rads[:-1] < thisRad) & (rads[1:] > thisRad))  # lower shell bound within r (returns index below r)
        overRL, = np.where((rads[:-1] > thisRad) & (rads[1:] < thisRad))  # upper shell bound within r (returns index above r)
        if inside.size > 0:
            innerMass += np.sum(shellM[inside])
        if overLR.size > 0:
            for index in overLR:
                innerMass += shellM[index]*(thisRad**3 - rads[index]**3)/(rads[index+1]**3 - rads[index]**3)
        if overRL.size > 0:
            for index in overRL:
                innerMass += shellM[index]*(thisRad**3 - rads[index+1]**3)/(rads[index]**3 - rads[index+1]**3)
        massInRad[i] += innerMass
    return massInRad

Solution 2: The array method

Using the order of the shells, this identifies which case (internal, external, or overlapping) holds for every combination of shell and mass:

def findMass(rads, mass):
    def addCase(M, case, whichCase):
        thisCase = np.where(case == whichCase)
        ns = thisCase[0]
        js = thisCase[1]
        if whichCase == 3:  # case c, mass j contained within shell n
            mAdd = mass[js]
        if whichCase == 1:  # case b, mass j split over shell n, with shell j+1 at or further than shell n
            mAdd = mass[js]*(rads[ns]**3 - rads[js+1]**3)/(rads[js]**3 - rads[js+1]**3)
        if whichCase == -1:  # case d, mass j split over shell n, with shell j at or further than shell n
            mAdd = mass[js]*(rads[ns]**3 - rads[js]**3)/(rads[js+1]**3 - rads[js]**3)
        # case a, mass j not contained within shell n, requires no action
        np.add.at(M, ns, mAdd)
        return M

    nRads = rads.size
    M = 5 + np.zeros(nRads)
    s = np.argsort(np.argsort(rads))
    jDiff = np.tile(s, (nRads-1, 1)).transpose() - np.tile(s[:-1], (nRads, 1))
    jSign = np.sign(jDiff + 0.1).astype(int)
    jPlusDiff = np.tile(s, (nRads-1, 1)).transpose() - np.tile(s[1:], (nRads, 1))
    jPlusSign = np.sign(jPlusDiff + 0.1).astype(int)
    case = 2*jPlusSign + jSign
    M = addCase(M, case, 3)   # case C
    M = addCase(M, case, 1)   # case B
    M = addCase(M, case, -1)  # case D
    return M
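A side note on the double argsort used above: np.argsort(np.argsort(x)) converts each value into its rank (its position in sorted order), so comparing ranks is equivalent to comparing the radii themselves. A small sketch:

```python
import numpy as np

rads = np.array([3.0, 1.0, 4.0, 2.0])
# The first argsort gives the indices that would sort rads; the second
# argsort inverts that permutation, yielding each element's rank.
s = np.argsort(np.argsort(rads))
assert np.array_equal(s, np.array([2, 0, 3, 1]))  # 3.0 is the 3rd smallest, etc.
# For distinct radii, rank comparisons agree with value comparisons:
assert np.array_equal(s[:, None] > s[None, :], rads[:, None] > rads[None, :])
```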

Test case

For a very simple simulation to run this on:

def runSim(nRads, nSteps):
    rads = 1 + (np.arange(nRads)**2)/nRads
    vel = np.zeros(nRads)
    mass = 1 + np.arange(nRads)
    lSq = findMass(rads, mass)*rads + 1
    for step in range(nSteps):
        rads = rads + vel
        massEnclosed = findMass(rads, mass)
        acc = 0.001*(lSq - massEnclosed*rads)/np.power(rads, 3)
        vel = vel + acc

Using

%timeit runSim(500,500)

the first method takes 3.9 seconds whilst the second takes 14.5 seconds.

I was rather expecting the second to be an order of magnitude faster, not 4 times slower, but I can't really see where I'm going wrong.

asked Dec 6, 2016 at 14:26
  • Could you maybe post some example data, so we can play around with it? Just enough so the time difference between the two algorithms becomes visible. – Commented Dec 7, 2016 at 10:41

1 Answer

the first [solution is] a quick and ugly long for loop, designed just to build the rest of the simulation and to be improved on later. It's neither quick nor beautiful, but it does the job.

That's a good start! Better to write something that you know is correct first, before worrying about performance.

for loops are slow, array operations are fast.

That is often true - but not always, as you've seen.

Where, in the second solution, are my uses of arrays inefficient

You're using tile in place of proper broadcasting. You also treat the data as dense, when in fact they're (very, very) sparse.
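To illustrate the tile point: the same (n, n−1) difference table can be built with broadcasting, which avoids materialising either tiled intermediate. A small sketch using a toy radius array:

```python
import numpy as np

rads = np.array([1.0, 2.0, 3.0, 4.0, 5.0])
n = rads.size
s = np.argsort(np.argsort(rads))

# tile-based construction from the question: builds two full (n, n-1) copies.
jDiff_tile = np.tile(s, (n - 1, 1)).transpose() - np.tile(s[:-1], (n, 1))

# Broadcasting: a column vector minus a row vector yields the same (n, n-1)
# table of s[i] - s[j] without allocating the tiled arrays first.
jDiff_bcast = s[:, None] - s[None, :-1]

assert jDiff_bcast.shape == (n, n - 1)
assert np.array_equal(jDiff_tile, jDiff_bcast)
```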

when does the tradeoff of a simple for loops supersede a complex array treatment?

Depends on context; there is not one answer.

This is a challenging system to refactor, because your simulation is very, very sensitive to perturbation at the level of machine precision, and amplifies that error rapidly as the number of simulation iterations increases (especially for indices that have high acceleration). I demonstrate a compromise regression test as follows:

import typing

import numpy as np


def findMass_loop(rads: np.ndarray, shellM: np.ndarray) -> np.ndarray:
    nR = rads.size
    massInRad = 5 + np.zeros(nR)
    for i in range(0, nR):
        thisRad = rads[i]
        innerMass = 0
        inside, = np.where((rads[:-1] <= thisRad) & (rads[1:] <= thisRad))  # both shell bounds within r
        overLR, = np.where((rads[:-1] < thisRad) & (rads[1:] > thisRad))  # lower shell bound within r (returns index below r)
        overRL, = np.where((rads[:-1] > thisRad) & (rads[1:] < thisRad))  # upper shell bound within r (returns index above r)
        if inside.size > 0:
            innerMass += np.sum(shellM[inside])
        if overLR.size > 0:
            for index in overLR:
                innerMass += shellM[index]*(thisRad**3 - rads[index]**3)/(rads[index+1]**3 - rads[index]**3)
        if overRL.size > 0:
            for index in overRL:
                innerMass += shellM[index]*(thisRad**3 - rads[index+1]**3)/(rads[index]**3 - rads[index+1]**3)
        massInRad[i] += innerMass
    return massInRad


def findMass_array(rads: np.ndarray, mass: np.ndarray) -> np.ndarray:
    def addCase(M: np.ndarray, case: np.ndarray, whichCase: int) -> np.ndarray:
        thisCase = np.where(case == whichCase)
        ns = thisCase[0]
        js = thisCase[1]
        if whichCase == 3:  # case c, mass j contained within shell n
            mAdd = mass[js]
        if whichCase == 1:  # case b, mass j split over shell n, with shell j+1 at or further than shell n
            mAdd = mass[js]*(rads[ns]**3 - rads[js+1]**3)/(rads[js]**3 - rads[js+1]**3)
        if whichCase == -1:  # case d, mass j split over shell n, with shell j at or further than shell n
            mAdd = mass[js]*(rads[ns]**3 - rads[js]**3)/(rads[js+1]**3 - rads[js]**3)
        # case a, mass j not contained within shell n, requires no action
        np.add.at(M, ns, mAdd)
        return M

    nRads = rads.size
    M = 5 + np.zeros(nRads)
    s = np.argsort(np.argsort(rads))
    jDiff = np.tile(s, (nRads-1, 1)).transpose() - np.tile(s[:-1], (nRads, 1))
    jSign = np.sign(jDiff + 0.1).astype(int)
    jPlusDiff = np.tile(s, (nRads-1, 1)).transpose() - np.tile(s[1:], (nRads, 1))
    jPlusSign = np.sign(jPlusDiff + 0.1).astype(int)
    case = 2*jPlusSign + jSign
    M = addCase(M, case, 3)   # case C
    M = addCase(M, case, 1)   # case B
    M = addCase(M, case, -1)  # case D
    return M


def findMass_refactor(rads: np.ndarray, shell_m: np.ndarray) -> np.ndarray:
    n_r = rads.size
    mass_in_rad = np.full(shape=n_r, fill_value=5.)
    rads3 = rads**3
    lower = rads[:-1]
    upper = rads[1:]
    lower3 = rads3[:-1]
    upper3 = rads3[1:]
    shell_m0 = shell_m[:-1]
    factors = np.broadcast_to(
        array=shell_m0/(upper3 - lower3),
        shape=(2, 2, n_r - 1),
    ).copy()
    factors[1, 0] *= -1
    factors[0, 1] *= -lower3
    factors[1, 1] *= upper3
    for i, this_rad in enumerate(rads):
        inner_mass = 0
        rad_row = np.array((this_rad**3, 1))
        inside = (lower <= this_rad) & (upper <= this_rad)  # both shell bounds within r
        inner_mass += shell_m0.dot(inside)
        # lower shell bound within r (returns index below r)
        overLR = (lower < this_rad) & (upper > this_rad)
        # upper shell bound within r (returns index above r)
        overRL = (lower > this_rad) & (upper < this_rad)
        lrrl = np.stack((overLR, overRL), axis=0)
        #                  i,n    i,n,k    k,
        #                  2,n    2,n,2    2,
        #                 LR,RL  factors  rad_row
        addend = np.einsum('ik,ijk,j->', lrrl, factors, rad_row)
        mass_in_rad[i] += inner_mass + addend
    return mass_in_rad


def run_sim(
    method: typing.Callable[[np.ndarray, np.ndarray], np.ndarray],
    n_rads: int = 500,
    n_steps: int = 500,
) -> tuple[
    np.ndarray,  # rads
    np.ndarray,  # vel
    np.ndarray,  # acc
]:
    rads = 1 + np.arange(n_rads)**2/n_rads
    vel = np.zeros(n_rads)
    mass = np.arange(1, 1 + n_rads)
    lSq = method(rads, mass)*rads + 1
    for step in range(n_steps):
        rads += vel
        mass_enclosed = method(rads, mass)
        acc = 1e-3*(lSq - mass_enclosed*rads)/rads**3
        vel += acc
    return rads, vel, acc


def test() -> None:
    """
    The different implementations, due to high sensitivity to error at the level of machine
    precision, do not converge for the first few indices if the number of steps is high. This is
    because of high instability when the acceleration is high. There are two workarounds: either
    compare on full steps and partial (20+) rad indices, or reduce the number of steps.
    """
    steps = 20
    print(findMass_refactor)
    rads_c, vel_c, acc_c = run_sim(method=findMass_refactor, n_steps=steps)
    for method in (findMass_loop, findMass_array):
        print(method)
        rads, vel, acc = run_sim(method=method, n_steps=steps)
        assert np.allclose(rads, rads_c, atol=0, rtol=1e-13)
        assert np.allclose(vel, vel_c, atol=0, rtol=1e-13)
        assert np.allclose(acc, acc_c, atol=0, rtol=1e-12)


if __name__ == '__main__':
    test()

My findMass_refactor() above is further evidence that vectorisation on its own is insufficient for this system. Further improvements would use scipy.sparse.
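As a minimal sketch of the sparse direction (the toy data here is illustrative, not from the simulation): a scipy.sparse COO array sums duplicate coordinates on construction, so the np.add.at-style accumulation over (shell, mass) pairs can be expressed as a row sum of a sparse array that only stores the few non-zero overlap contributions.

```python
import numpy as np
from scipy.sparse import coo_array

# Hypothetical per-(shell, mass) contributions: shell index ns, mass index js,
# contribution mAdd -- the same triples fed to np.add.at in findMass_array.
ns = np.array([0, 0, 2, 2])
js = np.array([1, 2, 0, 1])
mAdd = np.array([1.0, 0.5, 2.0, 0.25])

# Dense accumulation, as in the question:
M_dense = np.zeros(3)
np.add.at(M_dense, ns, mAdd)

# Sparse equivalent: only the non-zero entries are stored, and a row sum
# collapses them to the per-shell totals.
M_sparse = coo_array((mAdd, (ns, js)), shape=(3, 3)).sum(axis=1)
assert np.allclose(M_dense, M_sparse)
```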

answered Dec 30, 2024 at 22:01
