6
\$\begingroup\$

I am trying to write a python implementation of Euler-Maruyama and Milstein schemes for numerically solving stochastic differential equations. The pseudo-code for the algorithms is in the Wikipedia links.

Would you review my code to see if it can be improved and is closer to production-quality? I've tried to use snake_style convention for all variable names.

from dataclasses import dataclass
from abc import ABC, abstractmethod
from typing import Callable, Optional
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
sns.set_style("whitegrid")
@dataclass
class SDE:
 """
 An abstraction for a stochastic differential equation
 """
 initial_condition: float
 drift: Callable[[float, float], float]
 vol: Callable[[float, float], float]
 dvol_dx: Optional[Callable[[float, float], float]]
@dataclass
class Solver(ABC):
 """
 An abstract base class for all numerical schemes
 """
 t_start: float = 0.0
 t_end: float = 1.0
 step_size: float = 0.001
 num_paths: int = 10
 def __post_init__(self):
 self.iter = 0
 self.x_curr = np.zeros((self.num_paths,))
 self.num_steps = int((self.t_end - self.t_start) / self.step_size)
 self.times = np.linspace(self.t_start, self.t_end, self.num_steps + 1)
 self.brownian_increments = np.sqrt(self.step_size) * np.random.standard_normal(
 size=(self.num_paths, self.num_steps)
 )
 self.brownian = np.cumsum(self.brownian_increments, axis=1)
 self.brownian = np.concatenate(
 [np.zeros(shape=(self.num_paths, 1)), self.brownian], axis=1
 )
 self.solution = []
 @abstractmethod
 def iterate(self):
 """
 Compute the next iterate X(n+1)
 """
 def solve(self, sde: SDE):
 """
 Solve the SDE
 """
 self.x_curr = np.full(shape=(self.num_paths,), fill_value=sde.initial_condition)
 self.solution = [self.x_curr.copy()]
 while self.iter < self.num_steps:
 self.solution.append(self.iterate(sde))
 self.iter += 1
 return np.array(self.solution).transpose()
 def reset(self):
 self.__post_init__()
@dataclass
class Milstein(Solver):
 """
 Numerical solver for a stochastic differential equation(SDE) using
 the Euler-Maruyama method.
 Consider an SDE of the form :
 dX_t = mu(t,X_t) dt + sigma(t,X_t) dB_t
 with initial condition X_0 = x_0
 The solution to the SDE can be computed using the increments
 X_{n+1} - X_n = mu(n,X_n)(t_{n+1}-t_n) + sigma(n,X_n)(B(n+1)-B(n))
 + 0.5 * sigma(n,X_n) * sigma'(n,X_n) * ((B(n+1) - B(n))**2 - (t_{n+1} - t_n))
 """
 def iterate(self, sde: SDE):
 """
 Generate the next iterate X(n+1)
 """
 mu_n = np.array([sde.drift(self.times[self.iter], x) for x in self.x_curr])
 sigma_n = np.array([sde.vol(self.times[self.iter], x) for x in self.x_curr])
 dvol_dx_n = np.array(
 [sde.dvol_dx(self.times[self.iter], x) for x in self.x_curr]
 )
 d_brownian = self.brownian[:, self.iter + 1] - self.brownian[:, self.iter]
 self.x_curr += (
 mu_n * self.step_size
 + sigma_n * d_brownian
 + 0.5 * sigma_n * dvol_dx_n * (d_brownian**2 - self.step_size)
 )
 return self.x_curr.copy()
@dataclass
class EulerMaruyama(Solver):
 """
 Numerical solver for a stochastic differential equation(SDE) using
 the Euler-Maruyama method.
 Consider an SDE of the form :
 dX_t = mu(t,X_t) dt + sigma(t,X_t) dB_t
 with initial condition X_0 = x_0
 The solution to the SDE can be computed using the increments
 X_{n+1} - X_n = mu(n,X_n)(t_{n+1}-t_n) + sigma(n,X_n)(B(n+1)-B(n))
 """
 def iterate(self, sde: SDE):
 """
 Generate the next iterate X(n+1)
 """
 mu_n = np.array([sde.drift(self.times[self.iter], x) for x in self.x_curr])
 sigma_n = np.array([sde.vol(self.times[self.iter], x) for x in self.x_curr])
 d_brownian = self.brownian[:, self.iter + 1] - self.brownian[:, self.iter]
 self.x_curr += mu_n * self.step_size + sigma_n * d_brownian
 return self.x_curr.copy()
if __name__ == "__main__":
 # Let's solve the following SDE
 # dS_t = S_t dB_t + (mu + 1/2) S_t dt
 # where mu = 0. This has the known solution S_t = exp(B_t - t/2)
 gbm_sde = SDE(
 initial_condition=1.0,
 drift = lambda t, s_t : 0.50 * s_t,
 vol = lambda t, s_t : s_t,
 dvol_dx = lambda t, s_t : 1
 )
 t = np.linspace(start=0.0, stop=1.0, num=1001)
 euler = EulerMaruyama()
 solution = euler.solve(gbm_sde)
 plt.xlabel(r'Time $t$')
 plt.ylabel(r'$S(t)$')
 plt.title(r'10 sample paths of Geometric Brownian Motion, $\mu=0.0,ドル Euler method')
 plt.grid(True)
 for i in range(10):
 plt.plot(t, solution[i])
 plt.show()
 milstein = Milstein()
 solution = milstein.solve(gbm_sde)
 plt.xlabel(r'Time $t$')
 plt.ylabel(r'$S(t)$')
 plt.title(r'10 sample paths of Geometric Brownian Motion, $\mu=0.0,ドル Milstein method')
 plt.grid(True)
 for i in range(10):
 plt.plot(t, solution[i])
 plt.show()
toolic
14.6k5 gold badges29 silver badges204 bronze badges
asked Jul 10, 2024 at 7:59
\$\endgroup\$
2
  • 1
    \$\begingroup\$ The immediate red flag is using so many list comprehensions with numpy code, since they are almost never actually needed. \$\endgroup\$ Commented Jul 12, 2024 at 4:00
  • \$\begingroup\$ I think, I abused dataclass as well. dataclass is mainly used to represent data, and a long __post_init__() usually means code smell. Some of my classes are workers. \$\endgroup\$ Commented Jul 12, 2024 at 16:22

3 Answers 3

6
\$\begingroup\$
  1. Use NumPy vectorized operations instead of list comprehensions whenever possible. This can significantly improve performance, especially when dealing with large data. For instance, in the iterate functions of both EulerMaruyama and Milstein, you can replace the list comprehensions with NumPy vectorized operations to compute mu_n and sigma_n.

  2. Use type hints for function parameters and return types for better code readability and compatibility with statically-typed languages or IDEs. For example, in your current implementation, the drift, vol, and dvol_dx functions are expected to take two arguments (time and state), but you haven't specified this in the type hints.

  3. Use a logging library instead of printing debug information directly to the console. This will make it easier to manage and filter log messages, especially when dealing with large datasets or complex simulations. For instance, you can use Python's built-in logging module to configure different log levels and output formats.

  4. Use appropriate error handling mechanisms for function inputs or simulation scenarios that may raise exceptions. This will help prevent your code from crashing unexpectedly when encountering invalid input values or other errors. For example, you can use try-except blocks to handle potential errors and provide meaningful error messages.

  5. Consider implementing additional methods or functions to analyze the simulation results, such as computing statistics (e.g., mean, standard deviation), visualizing the sample paths, or plotting summary figures (e.g., histograms, density plots). This can help gain a deeper understanding of the simulated SDE and its behavior under different conditions.

Here's an example of how you might modify your EulerMaruyama class to use NumPy vectorized operations:

import numpy as np
class EulerMaruyama(Solver):
 def iterate(self, sde, state):
 mu_n = self.sde.drift(np.repeat(self.times[self.iter], len(state)), state)
 sigma_n = self.sde.vol(np.repeat(self.times[self.iter], len(state)), state)
 d_brownian = self.brownian[:, self.iter + 1] - self.brownian[:, self.iter]
 delta_t = np.ones((len(state), 1)) * self.step_size
 self.x_curr += np.hstack((mu_n, sigma_n * d_brownian + delta_t))
 return self.x_curr.copy()

In this example, the EulerMaruyama class now uses NumPy's vectorized operations (repeat, hstack) to compute mu_n and sigma_n for all states in parallel instead of using a list comprehension.

toolic
14.6k5 gold badges29 silver badges204 bronze badges
answered Jul 10, 2024 at 9:51
\$\endgroup\$
4
  • 1
    \$\begingroup\$ Re #2: drift, vol, and dvol_dx are type-hinted in OP's implementation. Please clarify what you mean. \$\endgroup\$ Commented Jul 10, 2024 at 16:20
  • \$\begingroup\$ Re #3: I don't see any debugging info being printed in OP's implementation. Please clarify. \$\endgroup\$ Commented Jul 10, 2024 at 16:21
  • \$\begingroup\$ Re #5: Please clarify what you mean that OP should consider visualizing the sample paths. OP already plotted the sample paths as solution[i] vs t. \$\endgroup\$ Commented Jul 10, 2024 at 17:03
  • 3
    \$\begingroup\$ @tdy this smells of an AI generated answer based on the writing style and new user. \$\endgroup\$ Commented Jul 12, 2024 at 3:57
9
\$\begingroup\$

This looks pretty good to me.

The only thing I would comment is that:

mu_n = np.array([sde.drift(self.times[self.iter], x) for x in self.x_curr])
sigma_n = np.array([sde.vol(self.times[self.iter], x) for x in self.x_curr])
dvol_dx_n = np.array([sde.dvol_dx(self.times[self.iter], x) for x in self.x_curr])

are not vectorised, and are therefore potentially a little or a lot slower than they could be.

Consider the following example:

def slow():
 a = np.ones(1000)
 f = lambda x: 0.5 * x
 b = np.array([f(x) for x in a])
 return b
%timeit slow()
# 185 μs ± 11.8 μs per loop (mean ± std. dev. of 7 runs, 1,000 loops each)
def fast():
 a = np.ones(1000)
 f = lambda x: 0.5 * x
 b = f(a)
 return b
%timeit fast()
# 3.82 μs ± 261 ns per loop (mean ± std. dev. of 7 runs, 100,000 loops each)

Your class iterate methods enforce that the underlying function is not vectorised and therefore enforces slowness.

Try expecting a vectorised solution and allowing the initialiser to handle cases where vectorisation is not possible, since you gain flexibility this way in the scenario where it is not possible to vectorise.

mu_n = sde.drift(self.times[self.iter], self.x_curr)
sigma_n = sde.vol(self.times[self.iter], self.x_curr)
dvol_dx_n = sde.dvol_dx(self.times[self.iter], self.x_curr)

where

# FAST
gbm_sde = SDE(
 initial_condition=1.0,
 drift = lambda t, s_t : 0.50 * s_t,
 vol = lambda t, s_t : s_t,
 dvol_dx = lambda t, s_t : np.ones(len(s_t)),
)
# SLOW
gbm_sde = SDE(
 initial_condition=1.0,
 drift = lambda t, s_t : np.array([0.50 * x for x in s_t]),
 vol = lambda t, s_t : np.array([x for x in s_t]),
 dvol_dx = lambda t, s_t : np.array([1.0 for x in s_t]),
)
answered Jul 10, 2024 at 9:34
\$\endgroup\$
8
\$\begingroup\$
self.num_steps = int((self.t_end - self.t_start) / self.step_size)
self.times = np.linspace(self.t_start, self.t_end, self.num_steps + 1)

You may not reach t_end. If the step-size is fixed, why not use num_steps as the parameter?

self.brownian_increments = np.sqrt(self.step_size) * np.random.standard_normal(
 size=(self.num_paths, self.num_steps)
)
self.brownian = np.cumsum(self.brownian_increments, axis=1)
self.brownian = np.concatenate(
 [np.zeros(shape=(self.num_paths, 1)), self.brownian], axis=1
)

You don't really use either of these, only through

d_brownian = self.brownian[:, self.iter + 1] - self.brownian[:, self.iter]

at each time-step. I'd calculate it once at the start, and store that. In fact, the differences are the Gaussians you have already computed:

self.d_brownian = np.sqrt(self.step_size) * np.random.standard_normal(
 size=(self.num_paths, self.num_steps)
)

If the number of steps is known apriori, then there is no reason for self.solution to be a list, instead of a numpy array that you successively fill.

self.solution = np.zeros((self.num_paths, self.num_steps))

It would be faster if your functions like sde.drift were assumed to be vectorized, returning numpy arrays, and then

mu_n = np.array([sde.drift(self.times[self.iter], x) for x in self.x_curr])

would become

mu_n = sde.drift(self.times[self.iter], x)

This is minor, but

for i in range(10):
 plt.plot(t, solution[i])

could be

plt.plot(t, solution.T)

I also see you doing

self.x_curr += ...

right before a

return self.x_curr.copy()

which brings me to suggest that you restructure your code.

I like the SDE class. I'd either remove the initial_condition from it, or also include the t_start, t_end values. All these three are properties of the initial value problem, not really the properties of the stochastic diff.eq. itself.

We are perhaps in the territory of what looks subjectively cleaner, so I'm just going to offer a rewrite.

from dataclasses import dataclass
from abc import ABC, abstractmethod
from typing import Callable, Optional
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
sns.set_style("whitegrid")
# for more descriptive type hints
T = np.array
X = np.ndarray
@dataclass
class SIVP:
 """
 An abstraction for a stochastic initial value problem
 """
 t_start: float
 t_end: float
 initial_condition: float
 
 drift: Callable[[float, float], float]
 vol: Callable[[float, float], float]
 dvol_dx: Optional[Callable[[float, float], float]]
@dataclass
class Solver(ABC):
 """
 An abstract base class for all numerical schemes
 """
 num_steps: int = 1000
 num_paths: int = 10
 @staticmethod
 @abstractmethod
 def time_step(dt: float, t: T, x: X, dw: X) -> X:
 """
 Compute the next iterate X(n+1)
 """
 def solve(self, sivp: SIVP) -> (T, X):
 """
 Solve the SDE
 """
 # stepsize
 dt = (sivp.t_end - sivp.t_start) / self.num_steps
 # Gaussian increments
 dws = np.random.normal(
 scale=np.sqrt(dt),
 size=(self.num_steps, self.num_paths)
 )
 
 # times
 ts = np.linspace(sivp.t_start, sivp.t_end, self.num_steps + 1)
 # trajectories
 xs = np.zeros((self.num_steps + 1, self.num_paths))
 xs[0] = sivp.initial_condition
 #time-stepping
 for step_ix in range(self.num_steps):
 xs[step_ix + 1] = self.time_step(sivp, dt, ts[step_ix], xs[step_ix], dws[step_ix])
 return ts, xs
@dataclass
class Milstein(Solver):
 """
 Numerical solver for a stochastic differential equation(SDE) using
 the Euler-Maruyama method.
 Consider an SDE of the form :
 dX_t = mu(t,X_t) dt + sigma(t,X_t) dB_t
 with initial condition X_0 = x_0
 The solution to the SDE can be computed using the increments
 X_{n+1} - X_n = mu(n,X_n)(t_{n+1}-t_n) + sigma(n,X_n)(B(n+1)-B(n))
 + 0.5 * sigma(n,X_n) * sigma'(n,X_n) * ((B(n+1) - B(n))**2 - (t_{n+1} - t_n))
 """
 @staticmethod
 def time_step(sivp: SIVP, dt: float, t: T, x: X, dw: X) -> X:
 """
 Generate the next iterate X(n+1)
 """
 mu_n = sivp.drift(t, x)
 sigma_n = sivp.vol(t, x)
 dvol_dx_n = sivp.dvol_dx(t, x)
 dx = (
 dt * mu_n
 + sigma_n * dw
 + 0.5 * sigma_n * dvol_dx_n * (dw**2 - dt)
 )
 return x + dx
@dataclass
class EulerMaruyama(Solver):
 """
 Numerical solver for a stochastic differential equation(SDE) using
 the Euler-Maruyama method.
 Consider an SDE of the form :
 dX_t = mu(t,X_t) dt + sigma(t,X_t) dB_t
 with initial condition X_0 = x_0
 The solution to the SDE can be computed using the increments
 X_{n+1} - X_n = mu(n,X_n)(t_{n+1}-t_n) + sigma(n,X_n)(B(n+1)-B(n))
 """
 
 @staticmethod
 def time_step(sivp: SIVP, dt: float, t: T, x: X, dw: X) -> X:
 """
 Generate the next iterate X(n+1)
 """
 mu_n = sivp.drift(t, x)
 sigma_n = sivp.vol(t, x)
 dvol_dx_n = sivp.dvol_dx(t, x)
 dx = dt * mu_n + sigma_n * dw
 return x + dx
# Let's solve the following SDE
# dS_t = S_t dB_t + (mu + 1/2) S_t dt
# where mu = 0. This has the known solution S_t = exp(B_t - t/2)
gbm_sde = SIVP(
 t_start = 0.0,
 t_end = 1.0,
 initial_condition = 1.0,
 drift = lambda t, s_t : 0.50 * s_t,
 vol = lambda t, s_t : s_t,
 dvol_dx = lambda t, s_t: 1 + 0*s_t
)
ts, xs = EulerMaruyama().solve(gbm_sde)
plt.xlabel(r'Time $t$')
plt.ylabel(r'$S(t)$')
plt.title(r'10 sample paths of Geometric Brownian Motion, $\mu=0.0,ドル Euler method')
plt.grid(True)
plt.plot(ts, xs)
plt.show()
ts, xs = Milstein().solve(gbm_sde)
plt.xlabel(r'Time $t$')
plt.ylabel(r'$S(t)$')
plt.title(r'10 sample paths of Geometric Brownian Motion, $\mu=0.0,ドル Milstein method')
plt.grid(True)
plt.plot(ts, xs)
plt.show()
answered Jul 10, 2024 at 12:21
\$\endgroup\$

Your Answer

Draft saved
Draft discarded

Sign up or log in

Sign up using Google
Sign up using Email and Password

Post as a guest

Required, but never shown

Post as a guest

Required, but never shown

By clicking "Post Your Answer", you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.