I didn't know whether it was best to start with the general task I'm trying to solve, or the meat of the programming problem. They're both below but the former is rather long and solving the latter may not require it, so feel free to skip at will.
The task to be solved
I'm running a relatively simple dynamical simulation, following the gravitational evolution of a series of spherical shells, between each pair of which there is a fixed mass.
To explain with an example: between shells 3 and 4 there's a mass of 2; when the shells are far apart the density there is low, but this mass is always contained between them. Other shells may overlap, crossing in and out as the simulation continues, changing the density in at least some of the region between shells 3 and 4, but no mass is transferred.
I am trying to write an efficient function which, for each shell, can find the mass internal to it (i.e. at smaller radii).
To find the contribution to the mass internal to shell n (let's call it M[n]) from the mass between shells j and j+1 (which we'll call m[j]), we need to know whether:
- a) Shells j and j+1 are both contained within shell n, thus M[n] += m[j]
- b) Shells j and j+1 are both outside of shell n, thus M[n] += 0
- c) One of shells j and j+1 is outside of shell n, and the other within, thus M[n] += some fraction of m[j]
(Let's assume mass is distributed uniformly between shells, meaning the fraction in case c) is just the ratio of the volume between the inner shell and shell n to the volume between shells j and j+1.)
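For concreteness, the rule for a single (shell n, gap j) pair can be written as a tiny helper (fraction_inside is just an illustration, not part of the simulation code): since shell volumes go as r³, the volume ratio is a ratio of cube differences, and clipping to [0, 1] covers cases a) and b) as well.

def fraction_inside(r_n, r_j, r_j1):
    """Fraction of m[j] (spread uniformly between shells j and j+1) lying inside radius r_n."""
    lo3, hi3 = sorted((r_j**3, r_j1**3))  # cubed inner and outer bounds; assumes distinct radii
    return min(1.0, max(0.0, (r_n**3 - lo3) / (hi3 - lo3)))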
The difficulty in solving it
I've written two Python (+ NumPy) solutions to the problem, the first a quick and ugly long for loop, designed just to build the rest of the simulation and to be improved on later. It's neither quick nor beautiful, but it does the job.
I then returned to the problem with the intention of streamlining and speeding it up. The general philosophy I've been operating under is: for loops are slow, array operations are fast. I carefully worked out how to do the whole thing with no for loops, mostly using operations on integer arrays. And yet, to my dismay, it's slower, and seems to scale worse, than the block-headed for loop I started with.
My question then is where my understanding of the efficiency of NumPy has broken down. Where, in the second solution, are my uses of arrays inefficient, and when does a simple for loop supersede a complex array treatment?
As you may guess, I'm not a hugely experienced Python programmer, with more experience in C++ and IDL, so my mistakes may well be clear as day and easily avoidable.
The solutions
Both solutions are self-contained functions that take two inputs: an array of the radii at which the shells sit, and an array of the fixed masses between each pair of shells. I've been running this with 500 shells (and hence 499 masses); I'd like to increase this by an order of magnitude if possible, but that may be ambitious.
Solution 1: The for loop
For each shell, this identifies the masses internal to (or overlapping) it and sums the relevant contributions:
def findMass(rads,shellM):
    nR=rads.size
    massInRad=5+np.zeros(nR)
    for i in range(0,nR):
        thisRad=rads[i]
        innerMass=0
        inside,=np.where((rads[:-1]<=thisRad) & (rads[1:]<=thisRad)) #both shell bounds within r
        overLR,=np.where((rads[:-1]<thisRad) & (rads[1:]>thisRad)) #lower shell bound within r (returns index below r)
        overRL,=np.where((rads[:-1]>thisRad) & (rads[1:]<thisRad)) #upper shell bound within r (returns index above r)
        if (inside.size > 0):
            innerMass+=np.sum(shellM[inside])
        if (overLR.size > 0):
            for index in overLR:
                innerMass+=shellM[index]*(thisRad**3-rads[index]**3)/(rads[index+1]**3-rads[index]**3)
        if (overRL.size > 0):
            for index in overRL:
                innerMass+=shellM[index]*(thisRad**3-rads[index+1]**3)/(rads[index]**3-rads[index+1]**3)
        massInRad[i]+=innerMass
    return massInRad
Solution 2: The array method
Using the order of the shells, this identifies which case (internal, external, or overlapping) applies between every combination of shell and mass:
def findMass(rads,mass):
    def addCase(M,case,whichCase):
        thisCase=np.where(case==whichCase)
        ns=thisCase[0]
        js=thisCase[1]
        if (whichCase==3): # case c, mass j contained with shell n
            mAdd=mass[js]
        if (whichCase==1): # case b, mass j split over shell n, with shell j+1 at or further than shell n
            mAdd=mass[js]*(rads[ns]**3-rads[js+1]**3)/(rads[js]**3-rads[js+1]**3)
        if (whichCase==-1): # case d, mass j split over shell n, with shell j at or further than shell n
            mAdd=mass[js]*(rads[ns]**3-rads[js]**3)/(rads[js+1]**3-rads[js]**3)
        # case a, mass j not contained within shell n, requires no action
        np.add.at(M,ns,mAdd)
        return M

    nRads=rads.size
    M=5+np.zeros(nRads)
    s=np.argsort(np.argsort(rads))
    jDiff=np.tile(s,(nRads-1,1)).transpose() - np.tile(s[:-1],(nRads,1))
    jSign=np.sign(jDiff+0.1).astype(int)
    jPlusDiff=np.tile(s,(nRads-1,1)).transpose() - np.tile(s[1:],(nRads,1))
    jPlusSign=np.sign(jPlusDiff+0.1).astype(int)
    case=2*jPlusSign + jSign
    M=addCase(M,case,3) #case C
    M=addCase(M,case,1) #case B
    M=addCase(M,case,-1) #case D
    return M
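To spell out the encoding: jSign[n, j] is +1 when shell n sits at or beyond shell j (so shell j is inside it) and -1 otherwise, and jPlusSign does the same for shell j+1, so case = 2*jPlusSign + jSign separates the four arrangements:

# jSign  jPlusSign  case  meaning
#  +1       +1        3   shells j and j+1 both inside shell n -> add all of mass[j]
#  -1       +1        1   shell j outside, shell j+1 inside    -> add a fraction
#  +1       -1       -1   shell j inside, shell j+1 outside    -> add a fraction
#  -1       -1       -3   both outside shell n                 -> add nothing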
Test case
For a very simple simulation to run this on:
def runSim(nRads,nSteps):
    rads=1+(np.arange(nRads)**2)/nRads
    vel=np.zeros(nRads)
    mass=1+np.arange(nRads)
    lSq=findMass(rads,mass)*rads+1
    for step in range(nSteps):
        rads=rads+vel
        massEnclosed=findMass(rads,mass)
        acc=0.001*(lSq-massEnclosed*rads)/np.power(rads,3)
        vel=vel+acc
Using %timeit runSim(500,500), the first method takes 3.9 seconds whilst the second takes 14.5 seconds.
I was rather expecting the second to be an order of magnitude faster, not 4 times slower, but I can't really see where I'm going wrong.
Comment from Graipher (Dec 7, 2016): Could you maybe post some example data, so we can play around with it? Just enough so the time difference between the two algorithms becomes visible.
1 Answer
the first [solution is] a quick and ugly long for loop, designed just to build the rest of the simulation and to be improved on later. It's neither quick nor beautiful, but it does the job.
That's a good start! Better to write something that you know is correct first, before worrying about performance.
for loops are slow, array operations are fast.
That is often true, but not always, as you've seen.
Where, in the second solution, are my uses of arrays inefficient
You're using tile in place of proper broadcasting. You also treat the data as dense, when in fact they're (very, very) sparse.
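As a minimal illustration of the broadcasting point (a standalone sketch using the names from the question's code, not a full replacement):

import numpy as np

nRads = 6
s = np.argsort(np.argsort(np.random.rand(nRads)))  # ranks of the radii, as in the question

# as written in the question: explicit tiled copies
jDiff_tile = np.tile(s, (nRads-1, 1)).transpose() - np.tile(s[:-1], (nRads, 1))

# the same matrix via broadcasting: s[:, None] has shape (nRads, 1) and broadcasts
# against the (nRads-1,) slice, with no intermediate tiled arrays
jDiff_bcast = s[:, None] - s[None, :-1]

assert np.array_equal(jDiff_tile, jDiff_bcast)

jPlusDiff follows the same pattern with s[None, 1:]. This removes the tiled intermediates, but the result is still a dense (nRads, nRads-1) matrix, which is the second point above.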
when does a simple for loop supersede a complex array treatment?
Depends on context; there is not one answer.
This is a challenging system to refactor, because your simulation is very, very sensitive to perturbation at the level of machine precision, and amplifies that error rapidly as the number of simulation iterations increases (especially for indices that have high acceleration). I demonstrate a compromise regression test as follows:
import typing

import numpy as np


def findMass_loop(rads: np.ndarray, shellM: np.ndarray) -> np.ndarray:
    nR=rads.size
    massInRad=5+np.zeros(nR)
    for i in range(0,nR):
        thisRad=rads[i]
        innerMass=0
        inside,=np.where((rads[:-1]<=thisRad) & (rads[1:]<=thisRad)) #both shell bounds within r
        overLR,=np.where((rads[:-1]<thisRad) & (rads[1:]>thisRad)) #lower shell bound within r (returns index below r)
        overRL,=np.where((rads[:-1]>thisRad) & (rads[1:]<thisRad)) #upper shell bound within r (returns index above r)
        if (inside.size > 0):
            innerMass+=np.sum(shellM[inside])
        if (overLR.size > 0):
            for index in overLR:
                innerMass+=shellM[index]*(thisRad**3-rads[index]**3)/(rads[index+1]**3-rads[index]**3)
        if (overRL.size > 0):
            for index in overRL:
                innerMass+=shellM[index]*(thisRad**3-rads[index+1]**3)/(rads[index]**3-rads[index+1]**3)
        massInRad[i]+=innerMass
    return massInRad


def findMass_array(rads: np.ndarray, mass: np.ndarray) -> np.ndarray:
    def addCase(M: np.ndarray, case: np.ndarray, whichCase: int) -> np.ndarray:
        thisCase=np.where(case==whichCase)
        ns=thisCase[0]
        js=thisCase[1]
        if (whichCase==3): # case c, mass j contained with shell n
            mAdd=mass[js]
        if (whichCase==1): # case b, mass j split over shell n, with shell j+1 at or further than shell n
            mAdd=mass[js]*(rads[ns]**3-rads[js+1]**3)/(rads[js]**3-rads[js+1]**3)
        if (whichCase==-1): # case d, mass j split over shell n, with shell j at or further than shell n
            mAdd=mass[js]*(rads[ns]**3-rads[js]**3)/(rads[js+1]**3-rads[js]**3)
        # case a, mass j not contained within shell n, requires no action
        np.add.at(M,ns,mAdd)
        return M

    nRads=rads.size
    M=5+np.zeros(nRads)
    s=np.argsort(np.argsort(rads))
    jDiff=np.tile(s,(nRads-1,1)).transpose() - np.tile(s[:-1],(nRads,1))
    jSign=np.sign(jDiff+0.1).astype(int)
    jPlusDiff=np.tile(s,(nRads-1,1)).transpose() - np.tile(s[1:],(nRads,1))
    jPlusSign=np.sign(jPlusDiff+0.1).astype(int)
    case=2*jPlusSign + jSign
    M=addCase(M,case,3) #case C
    M=addCase(M,case,1) #case B
    M=addCase(M,case,-1) #case D
    return M
def findMass_refactor(rads: np.ndarray, shell_m: np.ndarray) -> np.ndarray:
    n_r = rads.size
    mass_in_rad = np.full(shape=n_r, fill_value=5.)
    rads3 = rads**3
    lower = rads[:-1]
    upper = rads[1:]
    lower3 = rads3[:-1]
    upper3 = rads3[1:]
    shell_m0 = shell_m[:-1]
    # factors holds, for every gap, the coefficients of a linear function of this_rad**3 that
    # gives the partial mass: axis 0 picks the overlap direction (LR then RL), axis 1 picks the
    # (this_rad**3, constant) terms, axis 2 runs over the gaps.
    factors = np.broadcast_to(
        array=shell_m0/(upper3 - lower3),
        shape=(2, 2, n_r-1),
    ).copy()
    factors[1, 0] *= -1
    factors[0, 1] *= -lower3
    factors[1, 1] *= upper3
    for i, this_rad in enumerate(rads):
        inner_mass = 0
        rad_row = np.array((this_rad**3, 1))
        inside = (lower <= this_rad) & (upper <= this_rad)  # both shell bounds within r
        inner_mass += shell_m0.dot(inside)
        # lower shell bound within r (returns index below r)
        overLR = (lower < this_rad) & (upper > this_rad)
        # upper shell bound within r (returns index above r)
        overRL = (lower > this_rad) & (upper < this_rad)
        lrrl = np.stack((overLR, overRL), axis=0)
        #                   i,n    i,n,k    k,
        #                   2,n    2,n,2    2,
        #                   LR,RL  factors  rad_row
        addend = np.einsum('ik,ijk,j->', lrrl, factors, rad_row)
        mass_in_rad[i] += inner_mass + addend
    return mass_in_rad
def run_sim(
        method: typing.Callable[[np.ndarray, np.ndarray], np.ndarray],
        n_rads: int = 500,
        n_steps: int = 500,
) -> tuple[
    np.ndarray,  # rads
    np.ndarray,  # vel
    np.ndarray,  # acc
]:
    rads = 1 + np.arange(n_rads)**2/n_rads
    vel = np.zeros(n_rads)
    mass = np.arange(1, 1 + n_rads)
    lSq = method(rads, mass)*rads + 1
    for step in range(n_steps):
        rads += vel
        mass_enclosed = method(rads, mass)
        acc = 1e-3*(lSq - mass_enclosed*rads)/rads**3
        vel += acc
    return rads, vel, acc


def test() -> None:
    """
    The different implementations, due to high sensitivity to error at the level of machine
    precision, do not converge for the first few indices if the number of steps is high. This is
    because of high instability when the acceleration is high. There are two workarounds: either
    compare on full steps and partial (20+) rad indices, or reduce the number of steps.
    """
    steps = 20
    print(findMass_refactor)
    rads_c, vel_c, acc_c = run_sim(method=findMass_refactor, n_steps=steps)
    for method in (findMass_loop, findMass_array):
        print(method)
        rads, vel, acc = run_sim(method=method, n_steps=steps)
        assert np.allclose(rads, rads_c, atol=0, rtol=1e-13)
        assert np.allclose(vel, vel_c, atol=0, rtol=1e-13)
        assert np.allclose(acc, acc_c, atol=0, rtol=1e-12)


if __name__ == '__main__':
    test()
My demonstrated findMass_refactor() is further proof that vectorisation on its own is insufficient for this system. Further improvements would use scipy.sparse.
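For what it's worth, here is one sketch of the direction such an improvement could take (my own illustration, using sorting and np.searchsorted rather than scipy.sparse; findMass_sorted is a hypothetical name, numpy is assumed imported as np as above, and exact shell coincidences are not handled carefully): the gaps wholly enclosed by a radius can be accumulated with a single cumulative sum over gaps sorted by their outer bound, leaving only the straddling gaps, which are sparse, to be handled per shell. The constant 5 baseline matches the other implementations.

def findMass_sorted(rads: np.ndarray, shell_m: np.ndarray) -> np.ndarray:
    r3 = rads**3
    lo3 = np.minimum(r3[:-1], r3[1:])  # inner bound of each gap, cubed
    hi3 = np.maximum(r3[:-1], r3[1:])  # outer bound of each gap, cubed
    m = shell_m[:lo3.size].astype(float)

    # Gaps wholly inside radius r (hi3 <= r**3): one sort plus a cumulative sum.
    order = np.argsort(hi3)
    cum_m = np.concatenate(([0.0], np.cumsum(m[order])))
    total = 5 + cum_m[np.searchsorted(hi3[order], r3, side='right')]

    # Gaps straddling radius r (lo3 < r**3 < hi3): usually only a handful per shell.
    for i, ri3 in enumerate(r3):
        j = np.flatnonzero((lo3 < ri3) & (hi3 > ri3))
        if j.size:
            total[i] += np.sum(m[j]*(ri3 - lo3[j])/(hi3[j] - lo3[j]))
    return total

The boolean mask inside the loop still touches every gap, so this is only a half-step; a fuller treatment would also locate the straddling gaps from the sorted order (or keep them in a scipy.sparse structure) instead of recomputing a dense mask each iteration.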