I am trying to write a Python implementation of the Euler-Maruyama and Milstein schemes for numerically solving stochastic differential equations. The pseudo-code for the algorithms is in the Wikipedia links.
Would you review my code to see if it can be improved and brought closer to production quality? I've tried to use the snake_case convention for all variable names.
from dataclasses import dataclass
from abc import ABC, abstractmethod
from typing import Callable, Optional
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
sns.set_style("whitegrid")
@dataclass
class SDE:
"""
An abstraction for a stochastic differential equation
"""
initial_condition: float
drift: Callable[[float, float], float]
vol: Callable[[float, float], float]
dvol_dx: Optional[Callable[[float, float], float]]
@dataclass
class Solver(ABC):
"""
An abstract base class for all numerical schemes
"""
t_start: float = 0.0
t_end: float = 1.0
step_size: float = 0.001
num_paths: int = 10
def __post_init__(self):
self.iter = 0
self.x_curr = np.zeros((self.num_paths,))
self.num_steps = int((self.t_end - self.t_start) / self.step_size)
self.times = np.linspace(self.t_start, self.t_end, self.num_steps + 1)
self.brownian_increments = np.sqrt(self.step_size) * np.random.standard_normal(
size=(self.num_paths, self.num_steps)
)
self.brownian = np.cumsum(self.brownian_increments, axis=1)
self.brownian = np.concatenate(
[np.zeros(shape=(self.num_paths, 1)), self.brownian], axis=1
)
self.solution = []
@abstractmethod
def iterate(self):
"""
Compute the next iterate X(n+1)
"""
def solve(self, sde: SDE):
"""
Solve the SDE
"""
self.x_curr = np.full(shape=(self.num_paths,), fill_value=sde.initial_condition)
self.solution = [self.x_curr.copy()]
while self.iter < self.num_steps:
self.solution.append(self.iterate(sde))
self.iter += 1
return np.array(self.solution).transpose()
def reset(self):
self.__post_init__()
@dataclass
class Milstein(Solver):
"""
Numerical solver for a stochastic differential equation(SDE) using
the Milstein method.
Consider an SDE of the form :
dX_t = mu(t,X_t) dt + sigma(t,X_t) dB_t
with initial condition X_0 = x_0
The solution to the SDE can be computed using the increments
X_{n+1} - X_n = mu(n,X_n)(t_{n+1}-t_n) + sigma(n,X_n)(B(n+1)-B(n))
+ 0.5 * sigma(n,X_n) * sigma'(n,X_n) * ((B(n+1) - B(n))**2 - (t_{n+1} - t_n))
"""
def iterate(self, sde: SDE):
"""
Generate the next iterate X(n+1)
"""
mu_n = np.array([sde.drift(self.times[self.iter], x) for x in self.x_curr])
sigma_n = np.array([sde.vol(self.times[self.iter], x) for x in self.x_curr])
dvol_dx_n = np.array(
[sde.dvol_dx(self.times[self.iter], x) for x in self.x_curr]
)
d_brownian = self.brownian[:, self.iter + 1] - self.brownian[:, self.iter]
self.x_curr += (
mu_n * self.step_size
+ sigma_n * d_brownian
+ 0.5 * sigma_n * dvol_dx_n * (d_brownian**2 - self.step_size)
)
return self.x_curr.copy()
@dataclass
class EulerMaruyama(Solver):
"""
Numerical solver for a stochastic differential equation(SDE) using
the Euler-Maruyama method.
Consider an SDE of the form :
dX_t = mu(t,X_t) dt + sigma(t,X_t) dB_t
with initial condition X_0 = x_0
The solution to the SDE can be computed using the increments
X_{n+1} - X_n = mu(n,X_n)(t_{n+1}-t_n) + sigma(n,X_n)(B(n+1)-B(n))
"""
def iterate(self, sde: SDE):
"""
Generate the next iterate X(n+1)
"""
mu_n = np.array([sde.drift(self.times[self.iter], x) for x in self.x_curr])
sigma_n = np.array([sde.vol(self.times[self.iter], x) for x in self.x_curr])
d_brownian = self.brownian[:, self.iter + 1] - self.brownian[:, self.iter]
self.x_curr += mu_n * self.step_size + sigma_n * d_brownian
return self.x_curr.copy()
if __name__ == "__main__":
# Let's solve the following SDE
# dS_t = S_t dB_t + (mu + 1/2) S_t dt
# where mu = 0. This has the known solution S_t = exp(B_t)
gbm_sde = SDE(
initial_condition=1.0,
drift = lambda t, s_t : 0.50 * s_t,
vol = lambda t, s_t : s_t,
dvol_dx = lambda t, s_t : 1
)
t = np.linspace(start=0.0, stop=1.0, num=1001)
euler = EulerMaruyama()
solution = euler.solve(gbm_sde)
plt.xlabel(r'Time $t$')
plt.ylabel(r'$S(t)$')
plt.title(r'10 sample paths of Geometric Brownian Motion, $\mu=0.0,ドル Euler method')
plt.grid(True)
for i in range(10):
plt.plot(t, solution[i])
plt.show()
milstein = Milstein()
solution = milstein.solve(gbm_sde)
plt.xlabel(r'Time $t$')
plt.ylabel(r'$S(t)$')
plt.title(r'10 sample paths of Geometric Brownian Motion, $\mu=0.0,ドル Milstein method')
plt.grid(True)
for i in range(10):
plt.plot(t, solution[i])
plt.show()
3 Answers
1. Use NumPy vectorized operations instead of list comprehensions whenever possible. This can significantly improve performance, especially when dealing with large data. For instance, in the iterate functions of both EulerMaruyama and Milstein, you can replace the list comprehensions with NumPy vectorized operations to compute mu_n and sigma_n.
2. Use type hints for function parameters and return types for better code readability and compatibility with statically-typed languages or IDEs. For example, in your current implementation, the drift, vol, and dvol_dx functions are expected to take two arguments (time and state), but you haven't specified this in the type hints.
3. Use a logging library instead of printing debug information directly to the console. This will make it easier to manage and filter log messages, especially when dealing with large datasets or complex simulations. For instance, you can use Python's built-in logging module to configure different log levels and output formats.
4. Use appropriate error handling mechanisms for function inputs or simulation scenarios that may raise exceptions. This will help prevent your code from crashing unexpectedly when encountering invalid input values or other errors. For example, you can use try-except blocks to handle potential errors and provide meaningful error messages.
5. Consider implementing additional methods or functions to analyze the simulation results, such as computing statistics (e.g., mean, standard deviation), visualizing the sample paths, or plotting summary figures (e.g., histograms, density plots). This can help gain a deeper understanding of the simulated SDE and its behavior under different conditions; a small sketch of this appears after the code example below.
Here's an example of how you might modify your EulerMaruyama class to use NumPy vectorized operations:
import numpy as np

class EulerMaruyama(Solver):
    def iterate(self, sde):
        # evaluate drift and volatility for all paths at once
        t_n = np.repeat(self.times[self.iter], len(self.x_curr))
        mu_n = sde.drift(t_n, self.x_curr)
        sigma_n = sde.vol(t_n, self.x_curr)
        d_brownian = self.brownian[:, self.iter + 1] - self.brownian[:, self.iter]
        self.x_curr += mu_n * self.step_size + sigma_n * d_brownian
        return self.x_curr.copy()
In this example, the EulerMaruyama class now computes mu_n and sigma_n for all paths in parallel with NumPy vectorized operations (np.repeat broadcasts the current time across the paths) instead of using a list comprehension.
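For point 5, a minimal sketch of a post-processing helper (my own illustration, not part of the original answer; summarize_paths is a hypothetical name, and it assumes solve returns an array of shape (num_paths, num_steps + 1) as in the question's code):

import numpy as np

def summarize_paths(solution: np.ndarray) -> dict:
    """Summary statistics of simulated paths, one row per path."""
    terminal = solution[:, -1]  # value of each path at the final time
    return {
        "terminal_mean": float(terminal.mean()),
        "terminal_std": float(terminal.std(ddof=1)),
        "path_min": float(solution.min()),
        "path_max": float(solution.max()),
    }

# usage: print(summarize_paths(EulerMaruyama().solve(gbm_sde)))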
- Re #2: drift, vol, and dvol_dx are type-hinted in OP's implementation. Please clarify what you mean. – tdy, Jul 10, 2024 at 16:20
- Re #3: I don't see any debugging info being printed in OP's implementation. Please clarify. – tdy, Jul 10, 2024 at 16:21
- Re #5: Please clarify what you mean that OP should consider visualizing the sample paths. OP already plotted the sample paths as solution[i] vs t. – tdy, Jul 10, 2024 at 17:03
- @tdy this smells of an AI-generated answer based on the writing style and new user. – qwr, Jul 12, 2024 at 3:57
This looks pretty good to me.
The only thing I would comment is that:
mu_n = np.array([sde.drift(self.times[self.iter], x) for x in self.x_curr])
sigma_n = np.array([sde.vol(self.times[self.iter], x) for x in self.x_curr])
dvol_dx_n = np.array([sde.dvol_dx(self.times[self.iter], x) for x in self.x_curr])
are not vectorised, and are therefore potentially a little or a lot slower than they could be.
Consider the following example:
def slow():
a = np.ones(1000)
f = lambda x: 0.5 * x
b = np.array([f(x) for x in a])
return b
%timeit slow()
# 185 μs ± 11.8 μs per loop (mean ± std. dev. of 7 runs, 1,000 loops each)
def fast():
a = np.ones(1000)
f = lambda x: 0.5 * x
b = f(a)
return b
%timeit fast()
# 3.82 μs ± 261 ns per loop (mean ± std. dev. of 7 runs, 100,000 loops each)
Your class's iterate methods enforce that the underlying function is not vectorised, and therefore enforce slowness.
Try expecting a vectorised function and letting the initialiser handle the cases where vectorisation is not possible; you keep flexibility this way for the scenarios where vectorisation genuinely isn't an option (see the sketch after the examples below).
mu_n = sde.drift(self.times[self.iter], self.x_curr)
sigma_n = sde.vol(self.times[self.iter], self.x_curr)
dvol_dx_n = sde.dvol_dx(self.times[self.iter], self.x_curr)
where
# FAST
gbm_sde = SDE(
initial_condition=1.0,
drift = lambda t, s_t : 0.50 * s_t,
vol = lambda t, s_t : s_t,
dvol_dx = lambda t, s_t : np.ones(len(s_t)),
)
# SLOW
gbm_sde = SDE(
initial_condition=1.0,
drift = lambda t, s_t : np.array([0.50 * x for x in s_t]),
vol = lambda t, s_t : np.array([x for x in s_t]),
dvol_dx = lambda t, s_t : np.array([1.0 for x in s_t]),
)
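One way to let the initialiser "handle cases where vectorisation is not possible" is to wrap scalar-only callables automatically. A minimal sketch (my own addition, not part of the original answer; ensure_vectorised is a hypothetical helper name):

import numpy as np

def ensure_vectorised(func):
    """Return func unchanged if it broadcasts over arrays, else wrap it."""
    try:
        out = func(0.0, np.zeros(2))      # probe with a tiny array
        if np.shape(out) == (2,):
            return func                   # already vectorised, use directly
    except Exception:
        pass
    return np.vectorize(func)             # fallback: still a Python loop, but transparent to the solver

# the scalar-only derivative gets wrapped, the vectorised drift passes through
dvol_dx = ensure_vectorised(lambda t, s_t: 1.0)
drift = ensure_vectorised(lambda t, s_t: 0.5 * s_t)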
self.num_steps = int((self.t_end - self.t_start) / self.step_size)
self.times = np.linspace(self.t_start, self.t_end, self.num_steps + 1)
Because of the int() truncation, num_steps * step_size may fall short of t_end - t_start, so you may not actually reach t_end. If the step size is fixed, why not use num_steps as the parameter?
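A quick illustration of the truncation (my own example, not from the original answer):

t_start, t_end, step_size = 0.0, 1.0, 0.3

num_steps = int((t_end - t_start) / step_size)  # int(3.333...) == 3
print(num_steps * step_size)                    # 0.8999..., short of t_end = 1.0

# np.linspace(t_start, t_end, num_steps + 1) still ends at 1.0, but its spacing
# is 1/3 rather than the requested 0.3 used to scale the Brownian increments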
self.brownian_increments = np.sqrt(self.step_size) * np.random.standard_normal(
size=(self.num_paths, self.num_steps)
)
self.brownian = np.cumsum(self.brownian_increments, axis=1)
self.brownian = np.concatenate(
[np.zeros(shape=(self.num_paths, 1)), self.brownian], axis=1
)
You don't really use either of these, only through
d_brownian = self.brownian[:, self.iter + 1] - self.brownian[:, self.iter]
at each time-step. I'd calculate it once at the start, and store that. In fact, the differences are the Gaussians you have already computed:
self.d_brownian = np.sqrt(self.step_size) * np.random.standard_normal(
size=(self.num_paths, self.num_steps)
)
If the number of steps is known a priori, then there is no reason for self.solution to be a list instead of a numpy array that you successively fill (with one extra column for the initial condition):
self.solution = np.zeros((self.num_paths, self.num_steps + 1))
It would be faster if your functions like sde.drift
were assumed to be vectorized, returning numpy
arrays, and then
mu_n = np.array([sde.drift(self.times[self.iter], x) for x in self.x_curr])
would become
mu_n = sde.drift(self.times[self.iter], self.x_curr)
This is minor, but
for i in range(10):
plt.plot(t, solution[i])
could be
plt.plot(t, solution.T)
I also see you doing
self.x_curr += ...
right before a
return self.x_curr.copy()
which brings me to suggest that you restructure your code.
I like the SDE
class. I'd either remove the initial_condition
from it, or also include the t_start, t_end
values. All these three are properties of the initial value problem, not really the properties of the stochastic diff.eq. itself.
We are perhaps in the territory of what looks subjectively cleaner, so I'm just going to offer a rewrite.
from dataclasses import dataclass
from abc import ABC, abstractmethod
from typing import Callable, Optional
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
sns.set_style("whitegrid")
# for more descriptive type hints
T = np.ndarray
X = np.ndarray
@dataclass
class SIVP:
"""
An abstraction for a stochastic initial value problem
"""
t_start: float
t_end: float
initial_condition: float
drift: Callable[[float, float], float]
vol: Callable[[float, float], float]
dvol_dx: Optional[Callable[[float, float], float]]
@dataclass
class Solver(ABC):
"""
An abstract base class for all numerical schemes
"""
num_steps: int = 1000
num_paths: int = 10
@staticmethod
@abstractmethod
def time_step(sivp: SIVP, dt: float, t: T, x: X, dw: X) -> X:
"""
Compute the next iterate X(n+1)
"""
def solve(self, sivp: SIVP) -> tuple[T, X]:
"""
Solve the SDE
"""
# stepsize
dt = (sivp.t_end - sivp.t_start) / self.num_steps
# Gaussian increments
dws = np.random.normal(
scale=np.sqrt(dt),
size=(self.num_steps, self.num_paths)
)
# times
ts = np.linspace(sivp.t_start, sivp.t_end, self.num_steps + 1)
# trajectories
xs = np.zeros((self.num_steps + 1, self.num_paths))
xs[0] = sivp.initial_condition
#time-stepping
for step_ix in range(self.num_steps):
xs[step_ix + 1] = self.time_step(sivp, dt, ts[step_ix], xs[step_ix], dws[step_ix])
return ts, xs
@dataclass
class Milstein(Solver):
"""
Numerical solver for a stochastic differential equation(SDE) using
the Milstein method.
Consider an SDE of the form :
dX_t = mu(t,X_t) dt + sigma(t,X_t) dB_t
with initial condition X_0 = x_0
The solution to the SDE can be computed using the increments
X_{n+1} - X_n = mu(n,X_n)(t_{n+1}-t_n) + sigma(n,X_n)(B(n+1)-B(n))
+ 0.5 * sigma(n,X_n) * sigma'(n,X_n) * ((B(n+1) - B(n))**2 - (t_{n+1} - t_n))
"""
@staticmethod
def time_step(sivp: SIVP, dt: float, t: T, x: X, dw: X) -> X:
"""
Generate the next iterate X(n+1)
"""
mu_n = sivp.drift(t, x)
sigma_n = sivp.vol(t, x)
dvol_dx_n = sivp.dvol_dx(t, x)
dx = (
dt * mu_n
+ sigma_n * dw
+ 0.5 * sigma_n * dvol_dx_n * (dw**2 - dt)
)
return x + dx
@dataclass
class EulerMaruyama(Solver):
"""
Numerical solver for a stochastic differential equation(SDE) using
the Euler-Maruyama method.
Consider an SDE of the form :
dX_t = mu(t,X_t) dt + sigma(t,X_t) dB_t
with initial condition X_0 = x_0
The solution to the SDE can be computed using the increments
X_{n+1} - X_n = mu(n,X_n)(t_{n+1}-t_n) + sigma(n,X_n)(B(n+1)-B(n))
"""
@staticmethod
def time_step(sivp: SIVP, dt: float, t: T, x: X, dw: X) -> X:
"""
Generate the next iterate X(n+1)
"""
mu_n = sivp.drift(t, x)
sigma_n = sivp.vol(t, x)
dx = dt * mu_n + sigma_n * dw
return x + dx
# Let's solve the following SDE
# dS_t = S_t dB_t + (mu + 1/2) S_t dt
# where mu = 0. This has the known solution S_t = exp(B_t)
gbm_sde = SIVP(
t_start = 0.0,
t_end = 1.0,
initial_condition = 1.0,
drift = lambda t, s_t : 0.50 * s_t,
vol = lambda t, s_t : s_t,
dvol_dx = lambda t, s_t: 1 + 0*s_t
)
ts, xs = EulerMaruyama().solve(gbm_sde)
plt.xlabel(r'Time $t$')
plt.ylabel(r'$S(t)$')
plt.title(r'10 sample paths of Geometric Brownian Motion, $\mu=0.0,ドル Euler method')
plt.grid(True)
plt.plot(ts, xs)
plt.show()
ts, xs = Milstein().solve(gbm_sde)
plt.xlabel(r'Time $t$')
plt.ylabel(r'$S(t)$')
plt.title(r'10 sample paths of Geometric Brownian Motion, $\mu=0.0,ドル Milstein method')
plt.grid(True)
plt.plot(ts, xs)
plt.show()
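As a quick sanity check on the rewrite (my own addition, reusing the classes defined above): for dS = 0.5 S dt + S dB with S_0 = 1 we have E[S_1] = exp(0.5), so the sample mean of the terminal values should land near 1.65, up to Monte Carlo noise:

ts, xs = EulerMaruyama(num_steps=1_000, num_paths=10_000).solve(gbm_sde)
print(xs[-1].mean())  # Monte Carlo estimate of E[S_1]
print(np.exp(0.5))    # exact value, ~1.6487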
… dataclass as well. dataclass is mainly used to represent data, and a long __post_init__() usually means code smell. Some of my classes are workers.