123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433 |
- from itertools import groupby
- from warnings import warn
- import numpy as np
- from scipy.sparse import find, coo_matrix
- EPS = np.finfo(float).eps
- def validate_first_step(first_step, t0, t_bound):
- """Assert that first_step is valid and return it."""
- if first_step <= 0:
- raise ValueError("`first_step` must be positive.")
- if first_step > np.abs(t_bound - t0):
- raise ValueError("`first_step` exceeds bounds.")
- return first_step
- def validate_max_step(max_step):
- """Assert that max_Step is valid and return it."""
- if max_step <= 0:
- raise ValueError("`max_step` must be positive.")
- return max_step
- def warn_extraneous(extraneous):
- """Display a warning for extraneous keyword arguments.
- The initializer of each solver class is expected to collect keyword
- arguments that it doesn't understand and warn about them. This function
- prints a warning for each key in the supplied dictionary.
- Parameters
- ----------
- extraneous : dict
- Extraneous keyword arguments
- """
- if extraneous:
- warn("The following arguments have no effect for a chosen solver: {}."
- .format(", ".join("`{}`".format(x) for x in extraneous)))
- def validate_tol(rtol, atol, n):
- """Validate tolerance values."""
- if np.any(rtol < 100 * EPS):
- warn("At least one element of `rtol` is too small. "
- f"Setting `rtol = np.maximum(rtol, {100 * EPS})`.")
- rtol = np.maximum(rtol, 100 * EPS)
- atol = np.asarray(atol)
- if atol.ndim > 0 and atol.shape != (n,):
- raise ValueError("`atol` has wrong shape.")
- if np.any(atol < 0):
- raise ValueError("`atol` must be positive.")
- return rtol, atol
- def norm(x):
- """Compute RMS norm."""
- return np.linalg.norm(x) / x.size ** 0.5
- def select_initial_step(fun, t0, y0, f0, direction, order, rtol, atol):
- """Empirically select a good initial step.
- The algorithm is described in [1]_.
- Parameters
- ----------
- fun : callable
- Right-hand side of the system.
- t0 : float
- Initial value of the independent variable.
- y0 : ndarray, shape (n,)
- Initial value of the dependent variable.
- f0 : ndarray, shape (n,)
- Initial value of the derivative, i.e., ``fun(t0, y0)``.
- direction : float
- Integration direction.
- order : float
- Error estimator order. It means that the error controlled by the
- algorithm is proportional to ``step_size ** (order + 1)`.
- rtol : float
- Desired relative tolerance.
- atol : float
- Desired absolute tolerance.
- Returns
- -------
- h_abs : float
- Absolute value of the suggested initial step.
- References
- ----------
- .. [1] E. Hairer, S. P. Norsett G. Wanner, "Solving Ordinary Differential
- Equations I: Nonstiff Problems", Sec. II.4.
- """
- if y0.size == 0:
- return np.inf
- scale = atol + np.abs(y0) * rtol
- d0 = norm(y0 / scale)
- d1 = norm(f0 / scale)
- if d0 < 1e-5 or d1 < 1e-5:
- h0 = 1e-6
- else:
- h0 = 0.01 * d0 / d1
- y1 = y0 + h0 * direction * f0
- f1 = fun(t0 + h0 * direction, y1)
- d2 = norm((f1 - f0) / scale) / h0
- if d1 <= 1e-15 and d2 <= 1e-15:
- h1 = max(1e-6, h0 * 1e-3)
- else:
- h1 = (0.01 / max(d1, d2)) ** (1 / (order + 1))
- return min(100 * h0, h1)
- class OdeSolution:
- """Continuous ODE solution.
- It is organized as a collection of `DenseOutput` objects which represent
- local interpolants. It provides an algorithm to select a right interpolant
- for each given point.
- The interpolants cover the range between `t_min` and `t_max` (see
- Attributes below). Evaluation outside this interval is not forbidden, but
- the accuracy is not guaranteed.
- When evaluating at a breakpoint (one of the values in `ts`) a segment with
- the lower index is selected.
- Parameters
- ----------
- ts : array_like, shape (n_segments + 1,)
- Time instants between which local interpolants are defined. Must
- be strictly increasing or decreasing (zero segment with two points is
- also allowed).
- interpolants : list of DenseOutput with n_segments elements
- Local interpolants. An i-th interpolant is assumed to be defined
- between ``ts[i]`` and ``ts[i + 1]``.
- Attributes
- ----------
- t_min, t_max : float
- Time range of the interpolation.
- """
- def __init__(self, ts, interpolants):
- ts = np.asarray(ts)
- d = np.diff(ts)
- # The first case covers integration on zero segment.
- if not ((ts.size == 2 and ts[0] == ts[-1])
- or np.all(d > 0) or np.all(d < 0)):
- raise ValueError("`ts` must be strictly increasing or decreasing.")
- self.n_segments = len(interpolants)
- if ts.shape != (self.n_segments + 1,):
- raise ValueError("Numbers of time stamps and interpolants "
- "don't match.")
- self.ts = ts
- self.interpolants = interpolants
- if ts[-1] >= ts[0]:
- self.t_min = ts[0]
- self.t_max = ts[-1]
- self.ascending = True
- self.ts_sorted = ts
- else:
- self.t_min = ts[-1]
- self.t_max = ts[0]
- self.ascending = False
- self.ts_sorted = ts[::-1]
- def _call_single(self, t):
- # Here we preserve a certain symmetry that when t is in self.ts,
- # then we prioritize a segment with a lower index.
- if self.ascending:
- ind = np.searchsorted(self.ts_sorted, t, side='left')
- else:
- ind = np.searchsorted(self.ts_sorted, t, side='right')
- segment = min(max(ind - 1, 0), self.n_segments - 1)
- if not self.ascending:
- segment = self.n_segments - 1 - segment
- return self.interpolants[segment](t)
- def __call__(self, t):
- """Evaluate the solution.
- Parameters
- ----------
- t : float or array_like with shape (n_points,)
- Points to evaluate at.
- Returns
- -------
- y : ndarray, shape (n_states,) or (n_states, n_points)
- Computed values. Shape depends on whether `t` is a scalar or a
- 1-D array.
- """
- t = np.asarray(t)
- if t.ndim == 0:
- return self._call_single(t)
- order = np.argsort(t)
- reverse = np.empty_like(order)
- reverse[order] = np.arange(order.shape[0])
- t_sorted = t[order]
- # See comment in self._call_single.
- if self.ascending:
- segments = np.searchsorted(self.ts_sorted, t_sorted, side='left')
- else:
- segments = np.searchsorted(self.ts_sorted, t_sorted, side='right')
- segments -= 1
- segments[segments < 0] = 0
- segments[segments > self.n_segments - 1] = self.n_segments - 1
- if not self.ascending:
- segments = self.n_segments - 1 - segments
- ys = []
- group_start = 0
- for segment, group in groupby(segments):
- group_end = group_start + len(list(group))
- y = self.interpolants[segment](t_sorted[group_start:group_end])
- ys.append(y)
- group_start = group_end
- ys = np.hstack(ys)
- ys = ys[:, reverse]
- return ys
- NUM_JAC_DIFF_REJECT = EPS ** 0.875
- NUM_JAC_DIFF_SMALL = EPS ** 0.75
- NUM_JAC_DIFF_BIG = EPS ** 0.25
- NUM_JAC_MIN_FACTOR = 1e3 * EPS
- NUM_JAC_FACTOR_INCREASE = 10
- NUM_JAC_FACTOR_DECREASE = 0.1
- def num_jac(fun, t, y, f, threshold, factor, sparsity=None):
- """Finite differences Jacobian approximation tailored for ODE solvers.
- This function computes finite difference approximation to the Jacobian
- matrix of `fun` with respect to `y` using forward differences.
- The Jacobian matrix has shape (n, n) and its element (i, j) is equal to
- ``d f_i / d y_j``.
- A special feature of this function is the ability to correct the step
- size from iteration to iteration. The main idea is to keep the finite
- difference significantly separated from its round-off error which
- approximately equals ``EPS * np.abs(f)``. It reduces a possibility of a
- huge error and assures that the estimated derivative are reasonably close
- to the true values (i.e., the finite difference approximation is at least
- qualitatively reflects the structure of the true Jacobian).
- Parameters
- ----------
- fun : callable
- Right-hand side of the system implemented in a vectorized fashion.
- t : float
- Current time.
- y : ndarray, shape (n,)
- Current state.
- f : ndarray, shape (n,)
- Value of the right hand side at (t, y).
- threshold : float
- Threshold for `y` value used for computing the step size as
- ``factor * np.maximum(np.abs(y), threshold)``. Typically, the value of
- absolute tolerance (atol) for a solver should be passed as `threshold`.
- factor : ndarray with shape (n,) or None
- Factor to use for computing the step size. Pass None for the very
- evaluation, then use the value returned from this function.
- sparsity : tuple (structure, groups) or None
- Sparsity structure of the Jacobian, `structure` must be csc_matrix.
- Returns
- -------
- J : ndarray or csc_matrix, shape (n, n)
- Jacobian matrix.
- factor : ndarray, shape (n,)
- Suggested `factor` for the next evaluation.
- """
- y = np.asarray(y)
- n = y.shape[0]
- if n == 0:
- return np.empty((0, 0)), factor
- if factor is None:
- factor = np.full(n, EPS ** 0.5)
- else:
- factor = factor.copy()
- # Direct the step as ODE dictates, hoping that such a step won't lead to
- # a problematic region. For complex ODEs it makes sense to use the real
- # part of f as we use steps along real axis.
- f_sign = 2 * (np.real(f) >= 0).astype(float) - 1
- y_scale = f_sign * np.maximum(threshold, np.abs(y))
- h = (y + factor * y_scale) - y
- # Make sure that the step is not 0 to start with. Not likely it will be
- # executed often.
- for i in np.nonzero(h == 0)[0]:
- while h[i] == 0:
- factor[i] *= 10
- h[i] = (y[i] + factor[i] * y_scale[i]) - y[i]
- if sparsity is None:
- return _dense_num_jac(fun, t, y, f, h, factor, y_scale)
- else:
- structure, groups = sparsity
- return _sparse_num_jac(fun, t, y, f, h, factor, y_scale,
- structure, groups)
- def _dense_num_jac(fun, t, y, f, h, factor, y_scale):
- n = y.shape[0]
- h_vecs = np.diag(h)
- f_new = fun(t, y[:, None] + h_vecs)
- diff = f_new - f[:, None]
- max_ind = np.argmax(np.abs(diff), axis=0)
- r = np.arange(n)
- max_diff = np.abs(diff[max_ind, r])
- scale = np.maximum(np.abs(f[max_ind]), np.abs(f_new[max_ind, r]))
- diff_too_small = max_diff < NUM_JAC_DIFF_REJECT * scale
- if np.any(diff_too_small):
- ind, = np.nonzero(diff_too_small)
- new_factor = NUM_JAC_FACTOR_INCREASE * factor[ind]
- h_new = (y[ind] + new_factor * y_scale[ind]) - y[ind]
- h_vecs[ind, ind] = h_new
- f_new = fun(t, y[:, None] + h_vecs[:, ind])
- diff_new = f_new - f[:, None]
- max_ind = np.argmax(np.abs(diff_new), axis=0)
- r = np.arange(ind.shape[0])
- max_diff_new = np.abs(diff_new[max_ind, r])
- scale_new = np.maximum(np.abs(f[max_ind]), np.abs(f_new[max_ind, r]))
- update = max_diff[ind] * scale_new < max_diff_new * scale[ind]
- if np.any(update):
- update, = np.nonzero(update)
- update_ind = ind[update]
- factor[update_ind] = new_factor[update]
- h[update_ind] = h_new[update]
- diff[:, update_ind] = diff_new[:, update]
- scale[update_ind] = scale_new[update]
- max_diff[update_ind] = max_diff_new[update]
- diff /= h
- factor[max_diff < NUM_JAC_DIFF_SMALL * scale] *= NUM_JAC_FACTOR_INCREASE
- factor[max_diff > NUM_JAC_DIFF_BIG * scale] *= NUM_JAC_FACTOR_DECREASE
- factor = np.maximum(factor, NUM_JAC_MIN_FACTOR)
- return diff, factor
- def _sparse_num_jac(fun, t, y, f, h, factor, y_scale, structure, groups):
- n = y.shape[0]
- n_groups = np.max(groups) + 1
- h_vecs = np.empty((n_groups, n))
- for group in range(n_groups):
- e = np.equal(group, groups)
- h_vecs[group] = h * e
- h_vecs = h_vecs.T
- f_new = fun(t, y[:, None] + h_vecs)
- df = f_new - f[:, None]
- i, j, _ = find(structure)
- diff = coo_matrix((df[i, groups[j]], (i, j)), shape=(n, n)).tocsc()
- max_ind = np.array(abs(diff).argmax(axis=0)).ravel()
- r = np.arange(n)
- max_diff = np.asarray(np.abs(diff[max_ind, r])).ravel()
- scale = np.maximum(np.abs(f[max_ind]),
- np.abs(f_new[max_ind, groups[r]]))
- diff_too_small = max_diff < NUM_JAC_DIFF_REJECT * scale
- if np.any(diff_too_small):
- ind, = np.nonzero(diff_too_small)
- new_factor = NUM_JAC_FACTOR_INCREASE * factor[ind]
- h_new = (y[ind] + new_factor * y_scale[ind]) - y[ind]
- h_new_all = np.zeros(n)
- h_new_all[ind] = h_new
- groups_unique = np.unique(groups[ind])
- groups_map = np.empty(n_groups, dtype=int)
- h_vecs = np.empty((groups_unique.shape[0], n))
- for k, group in enumerate(groups_unique):
- e = np.equal(group, groups)
- h_vecs[k] = h_new_all * e
- groups_map[group] = k
- h_vecs = h_vecs.T
- f_new = fun(t, y[:, None] + h_vecs)
- df = f_new - f[:, None]
- i, j, _ = find(structure[:, ind])
- diff_new = coo_matrix((df[i, groups_map[groups[ind[j]]]],
- (i, j)), shape=(n, ind.shape[0])).tocsc()
- max_ind_new = np.array(abs(diff_new).argmax(axis=0)).ravel()
- r = np.arange(ind.shape[0])
- max_diff_new = np.asarray(np.abs(diff_new[max_ind_new, r])).ravel()
- scale_new = np.maximum(
- np.abs(f[max_ind_new]),
- np.abs(f_new[max_ind_new, groups_map[groups[ind]]]))
- update = max_diff[ind] * scale_new < max_diff_new * scale[ind]
- if np.any(update):
- update, = np.nonzero(update)
- update_ind = ind[update]
- factor[update_ind] = new_factor[update]
- h[update_ind] = h_new[update]
- diff[:, update_ind] = diff_new[:, update]
- scale[update_ind] = scale_new[update]
- max_diff[update_ind] = max_diff_new[update]
- diff.data /= np.repeat(h, np.diff(diff.indptr))
- factor[max_diff < NUM_JAC_DIFF_SMALL * scale] *= NUM_JAC_FACTOR_INCREASE
- factor[max_diff > NUM_JAC_DIFF_BIG * scale] *= NUM_JAC_FACTOR_DECREASE
- factor = np.maximum(factor, NUM_JAC_MIN_FACTOR)
- return diff, factor
|