""" Fast multithreaded spline interpolation - Ver. 1.1 This versione: 15/06/2018 @author: Marco Maffezzoli, Università Bocconi """ import numpy as np from scipy.linalg.lapack import get_lapack_funcs from numba import jit, vectorize, guvectorize from functools import partial from interpolation import interpolation_search @jit(nopython=True, cache=True) def horner(x, coefs): """ Horner's Scheme for evaluating a polynomial of coefficients coefs in x, ands its first-order derivative. """ y = 0. yp = 0. for c in coefs: yp = yp * x + y y = y * x + c return y, yp @guvectorize('(i8[:],f8[:],f8[:],f8[:],f8[:],f8[:],f8[:])', '(m),(m),(n),(n),(n)->(m),(m)', cache=True, target='parallel') def _ppoly_eval(index, s, x, y, d, yn, dn): """ Computes the piecewise cubic polynomial at nodes xn. """ c = np.empty(4) for j, (i, z) in enumerate(zip(index, s)): # Step size h = x[i+1] - x[i] # Compute the coefficients of the power form (in reverse order!) c[3] = y0 = y[i] q0 = (y[i+1] - y0) / h c[2] = d0 = d[i] d1 = d[i+1] c[1] = (3*q0 - 2*d0 - d1) / h c[0] = (d0 - 2*q0 + d1) / h**2 # Computes the Hermite polynomial and its derivative # using Horner's Scheme yn[j], dn[j] = horner(z, c) @guvectorize('(f8[:],f8[:],i8[:],f8[:])', '(),(n)->(),()', cache=True, target='parallel') def _locate(xn, x, index, s): """ Locates the elements of xn on the grid x. """ index[0] = i = interpolation_search(x, xn[0]) s[0] = xn[0] - x[i] class Pchip(object): """ See C. Moler, Numerical Computing with Matlab, 2004. """ def __init__(self, xn, x, assume_sorted=False): xn = np.asarray(xn).ravel() self.x = np.asarray(x).ravel() # Sanity check if self.x.size < 2: raise ValueError('at least two nodes are needed') if not isinstance(assume_sorted, bool): raise TypeError('assume_sorted is not a boolean') # Sort if needed if assume_sorted: self._sort_idx = None else: self._sort_idx = np.argsort(x) self.x = self.x.take(self._sort_idx) # Locate xn on the grid and store the result self.idx, self.s = _locate(xn, self.x) # Compute and store the interval lenghts self._h = np.diff(self.x) def __call__(self, y, deriv=False): y = np.atleast_1d(y) # Sanity checks if y.shape[-1] != self.x.size: raise ValueError('the shape of y does not conform') if y.ndim > 2: raise ValueError('the y array should be at most 2-D') if not isinstance(deriv, bool): raise TypeError('derivative not a boolean') # Sort y if needed if self._sort_idx is not None: y = y.take(self._sort_idx, axis=-1) # Compute the slopes, keeping monotonicity h = self._h q = np.diff(y) / h # Compute the slopes d_k for interior points d = np.empty_like(y) d[..., 1:-1] = self._slopes(h[:-1], h[1:], q[..., :-1], q[..., 1:]) # Compute the slopes d_k for endpoints, following Moler d[..., 0] = self._endpoint(h[0], h[1], q[..., 0], q[..., 1]) d[..., -1] = self._endpoint(h[-1], h[-2], q[..., -1], q[..., -2]) # Evaluate the PCHIP at nodes xn results = _ppoly_eval(self.idx, self.s, self.x, y, d) yn, dn = map(np.squeeze, results) return [yn, dn] if deriv else yn @staticmethod @vectorize('f8(f8,f8,f8,f8)', cache=True, target='parallel') def _slopes(h0, h1, q0, q1): """ Computes d_k at interior points. See C. Moler, Numerical Computing with MATLAB, Ch. 3.4. """ if np.sign(q0)*np.sign(q1) > 0: # If q0 and q1 have the same sign, then d_k is taken to be # the harmonic mean of the two discrete slopes: w0 = 2*h1 + h0 w1 = h1 + 2*h0 d = (w0 + w1) / (w0/q0 + w1/q1) else: # If q0 and q1 have opposite signs or if # either of them is zero, then x_k is a discrete # local minimum or maximum, so we set d_k = 0 d = 0. return d @staticmethod @vectorize('f8(f8,f8,f8,f8)', cache=True) def _endpoint(h0, h1, q0, q1): """ Computes d_k at endpoints. See C. Moler, Numerical Computing with MATLAB, Ch. 3.4. """ # Noncentered three-point estimate for derivative d = ((h1 + 2*h0)*q0 - h0*q1) / (h0 + h1) # Preserve shape if np.sign(d) != np.sign(q0): d = 0. elif ((np.sign(q0) != np.sign(q1)) and (np.abs(d) > np.abs(3*q0))): d = 3*q0 return d class CubicSpline(object): """ See C. Moler, Numerical Computing with Matlab, 2004. """ def __init__(self, xn, x, assume_sorted=False, bc_type='not-a-knot'): xn = np.asarray(xn).ravel() self.x = x = np.asarray(x).ravel() # Sanity check if x.size < 2: raise ValueError('at least two nodes are needed') if not isinstance(assume_sorted, bool): raise ValueError('assume_sorted is not a boolean') if bc_type not in ('not-a-knot', 'natural'): raise ValueError('boundary condition not implemented') self._nak = True if bc_type == 'not-a-knot' else False # Sort if needed if assume_sorted: self._sort_idx = None else: self._sort_idx = np.argsort(self.x) self.x = self.x.take(self._sort_idx) # Locate xn on the grid and store the result self.idx, self.s = _locate(xn, self.x) # Build the tridiagonal matrix self._h, ld, dd, ud = self._tridiag_matrix(x, self._nak) # Direct access to LAPACK gtsv function (tridiagonal linear solver) gtsv = get_lapack_funcs('gtsv', dtype=np.float64) # Store the solver as function of rhs self.gtsv = partial(gtsv, ld, dd, ud, overwrite_dl=False, overwrite_d=False, overwrite_du=False, overwrite_b=True) def __call__(self, y, deriv=False): y = np.atleast_1d(y) # Sanity checks if y.shape[-1] != self.x.size: raise ValueError('the shape of y does not conform') if y.ndim > 2: raise ValueError('the y array should be at most 2-D') if not isinstance(deriv, bool): raise TypeError('deriv should be a boolean') # Sort y if needed if self._sort_idx is not None: y = y.take(self._sort_idx, axis=-1) # Compute the right-hand side of the linear system d = self._rhs(self._h, y, self._nak) # Solve the tridiagonal system dl, dd, du, slopes, info = self.gtsv(d.transpose()) if info > 0: raise RuntimeError('the tridiagonal matrix is singular') # Evaluate the polynomial at nodes xn results = _ppoly_eval(self.idx, self.s, self.x, y, slopes.transpose()) yn, dn = map(np.squeeze, results) return [yn, dn] if deriv else yn @staticmethod def _tridiag_matrix(x, nak): # Compute and store the interval lenghts h = np.diff(x) # Build the tridiagonal matrix ld = np.empty_like(h) dd = np.empty_like(x) ud = np.empty_like(h) # Diagonals h0 = h[:-1] h1 = h[1:] ld[:-1] = h1 # The lower diagonal dd[1:-1] = 2 * (h0 + h1) # The main diagonal ud[1:] = h0 # The upper diagonal # Boundary conditions if nak: # Start ud[0] = h[1] + h[0] dd[0] = h[1] # End dd[-1] = h[-2] ld[-1] = h[-2] + h[-1] else: # Start ud[0] = h[0] dd[0] = 2 * h[0] # End dd[-1] = 2 * h[-1] ld[-1] = h[-1] return h, ld, dd, ud @staticmethod def _rhs(h, y, nak): # Compute the first divided differences q = np.diff(y) / h # Build the tridiagonal system d = np.empty_like(y) d[..., 1:-1] = 3 * (h[1:]*q[..., :-1] + h[:-1]*q[..., 1:]) # Boundary condition s0 = (Ellipsis, 0) s1 = (Ellipsis, 1) z1 = (Ellipsis, -1) z2 = (Ellipsis, -2) if nak: # Start c0 = h[1] + h[0] d[s0] = ((h[0] + 2*c0) * h[1] * q[s0] + h[0]**2 * q[s1]) / c0 # End a1 = h[-1] + h[-2] d[z1] = (h[-1]**2 * q[z2] + (2*a1 + h[-1]) * h[-2] * q[z1]) / a1 else: # Start d[s0] = 3 * (y[s1] - y[s0]) # End d[z1] = 3 * (y[z1] - y[z2]) return d class SchumakerSpline(object): """ 1-D piecewise shape-preserving quadratic spline interpolation Follows Schumaker (1983): "On Quadratic Spline Interpolation," SIAM J. of Numerical Analysis 20(4), 854-864 """ def __init__(self, xn, x, assume_sorted=False): self.xn = np.asarray(xn).ravel() self.x = np.asarray(x).ravel() # Sanity check if self.x.size < 2: raise ValueError('at least two nodes are needed') if not isinstance(assume_sorted, bool): raise ValueError('assume_sorted is not a boolean') # Sort if needed if assume_sorted: self._sort_idx = None else: self._sort_idx = np.argsort(self.x) self.x = self.x.take(self._sort_idx) self._h = np.diff(self.x) # Locate xn on the grid and store the result self._index = self._locate(self.xn, self.x) def __call__(self, y, deriv=False): y = np.atleast_1d(y) # Sanity checks if y.shape[-1] != self.x.size: raise ValueError('the shape of y does not conform') if y.ndim > 2: raise ValueError('the y array should be at most 2-D') if not isinstance(deriv, bool): raise TypeError('deriv should be a boolean') # Sort y if needed if self._sort_idx is not None: y = y.take(self._sort_idx, axis=-1) delta = np.diff(y) delta /= self._h d = np.empty_like(y) d[..., 1:-1] = self._slopes(delta[..., :-1], delta[..., 1:]) d[..., 0] = self._endpoint(delta[..., 0], d[..., 1]) d[..., -1] = self._endpoint(delta[..., -1], d[..., -2]) results = self._qspline(self._index, self.xn, self.x, y, d) yn, dn = map(np.squeeze, results) return [yn, dn] if deriv else yn @guvectorize('(f8[:],f8[:],i8[:])', '(),(n)->()', cache=True, target='parallel') def _locate(xn, x, index): """ Locates the elements of xn on the grid x. """ index[0] = interpolation_search(x, xn[0]) @staticmethod @vectorize('f8(f8,f8)', cache=True, target='parallel') def _slopes(delta0, delta1): # The slope is computed as the harmonic mean of two adiacent # finite differences. This follows Lam (1990): "Monotone and Convex # Quadratic Spline Interpolation," Virginia J. of Sc. 41(1) if delta0*delta1 > 0: slope = ((2 * delta0 * delta1) / (delta0 + delta1)) else: slope = 0. return slope @staticmethod @vectorize('f8(f8,f8)', cache=True) def _endpoint(delta, d): # See above. if delta*(2*delta - d) > 0: slope = 2*delta - d else: slope = 0. return slope @staticmethod @guvectorize('(i8[:],f8[:],f8[:],f8[:],f8[:],f8[:],f8[:])', '(m),(m),(n),(n),(k)->(m),(m)', cache=True, target='parallel') def _qspline(index, xn, x, y, d, yn, dn): coefs = np.empty(3) for j, (i, z) in enumerate(zip(index, xn)): # Preliminaries x0, x1 = x[i], x[i+1] y0, y1 = y[i], y[i+1] d0, d1 = d[i], d[i+1] dx = x1 - x0 dy = y1 - y0 delta = dy / dx # Main body if d0 + d1 == 2 * delta: # Knife-edge case: a quadratic polynomial works coefs[2] = y0 coefs[1] = d0 coefs[0] = (d1 - d0) / (2*dx) s = z - x0 else: dd0 = d0 - delta dd1 = d1 - delta # Compute the new node zeta if dd0 * dd1 >= 0: # Inflection point zeta = (x0 + x1) / 2 elif np.abs(dd1) < np.abs(dd0): # Convex z1 = x0 + (2 * dx * dd1) / (d1 - d0) zeta = (x0 + z1) / 2 else: # Concave z0 = x1 + (2 * dx * dd0) / (d1 - d0) zeta = (z0 + x1) / 2 # Compute the coefficients for the quadratic spline # See Schumaker (1983), p. 855 alpha = zeta - x0 beta = x1 - zeta s_hat = (2 * dy - alpha * d0 - beta * d1) / dx a = y0 b = d0 c = (s_hat - d0) / (2*alpha) if z < zeta: coefs[2] = a coefs[1] = b coefs[0] = c s = z - x0 else: coefs[2] = a + alpha * b + alpha**2 * c coefs[1] = s_hat coefs[0] = (d1 - s_hat) / (2*beta) s = z - zeta yn[j], dn[j] = horner(s, coefs)