Source code for bluemath_tk.teslakit.statistical

#!/usr/bin/env python
# -*- coding: utf-8 -*-


# pip
import numpy as np
import statsmodels.api as sm
from statsmodels.distributions.empirical_distribution import ECDF
from scipy.interpolate import interp1d
from scipy.stats import norm, genpareto, t
from scipy.special import ndtri  # norm inv


[docs] def ksdensity_CDF(x): ''' Kernel smoothing function estimate. Returns cumulative probability function at x. ''' # fit a univariate KDE kde = sm.nonparametric.KDEUnivariate(x) kde.fit() # interpolate KDE CDF at x position (kde.support = x) fint = interp1d(kde.support, kde.cdf) return fint(x)
[docs] def ksdensity_ICDF(x, p): ''' Returns Inverse Kernel smoothing function at p points ''' # fit a univariate KDE kde = sm.nonparametric.KDEUnivariate(x) kde.fit() # interpolate KDE CDF to get support values  fint = interp1d(kde.cdf, kde.support) # ensure p inside kde.cdf p[p<np.min(kde.cdf)] = kde.cdf[0] p[p>np.max(kde.cdf)] = kde.cdf[-1] return fint(p)
[docs] def GeneralizedPareto_CDF(x): ''' Generalized Pareto fit Returns cumulative probability function at x. ''' # fit a generalized pareto and get params shape, _, scale = genpareto.fit(x) # get generalized pareto CDF cdf = genpareto.cdf(x, shape, scale=scale) return cdf
[docs] def GeneralizedPareto_ICDF(x, p): ''' Generalized Pareto fit Returns inverse cumulative probability function at p points ''' # fit a generalized pareto and get params shape, _, scale = genpareto.fit(x) # get percent points (inverse of CDF) icdf = genpareto.ppf(p, shape, scale=scale) return icdf
[docs] def Empirical_CDF(x): ''' Returns empirical cumulative probability function at x. ''' # fit ECDF ecdf = ECDF(x) cdf = ecdf(x) return cdf
[docs] def Empirical_ICDF(x, p): ''' Returns inverse empirical cumulative probability function at p points ''' # TODO: revisar que el fill_value funcione correctamente # fit ECDF ecdf = ECDF(x) cdf = ecdf(x) # interpolate KDE CDF to get support values  fint = interp1d( cdf, x, fill_value=(np.nanmin(x), np.nanmax(x)), #fill_value=(np.min(x), np.max(x)), bounds_error=False ) return fint(p)
[docs] def copulafit(u, family='gaussian'): ''' Fit copula to data. Returns correlation matrix and degrees of freedom for t student ''' rhohat = None # correlation matrix nuhat = None # degrees of freedom (for t student) if family=='gaussian': u[u>=1.0] = 0.999999 inv_n = ndtri(u) rhohat = np.corrcoef(inv_n.T) elif family=='t': raise ValueError("Not implemented") # TODO: no encaja con los datos. no funciona x = np.linspace(np.min(u), np.max(u),100) inv_t = np.ndarray((len(x), u.shape[1])) for j in range(u.shape[1]): param = t.fit(u[:,j]) t_pdf = t.pdf(x,loc=param[0],scale=param[1],df=param[2]) inv_t[:,j] = t_pdf # TODO CORRELACION? NUHAT? rhohat = np.corrcoef(inv_n.T) nuhat = None else: raise ValueError("Wrong family parameter. Use 'gaussian' or 't'") return rhohat, nuhat
[docs] def copularnd(family, rhohat, n): ''' Random vectors from a copula ''' if family=='gaussian': mn = np.zeros(rhohat.shape[0]) np_rmn = np.random.multivariate_normal(mn, rhohat, n) u = norm.cdf(np_rmn) elif family=='t': # TODO raise ValueError("Not implemented") else: raise ValueError("Wrong family parameter. Use 'gaussian' or 't'") return u
[docs] def CopulaSimulation(U_data, kernels, num_sim): ''' Fill statistical space using copula simulation U_data: 2D nump.array, each variable in a column kernels: list of kernels for each column at U_data (KDE | GPareto) num_sim: number of simulations ''' # kernel CDF dictionary d_kf = { 'KDE' : (ksdensity_CDF, ksdensity_ICDF), 'GPareto' : (GeneralizedPareto_CDF, GeneralizedPareto_ICDF), 'ECDF' : (Empirical_CDF, Empirical_ICDF), } # check kernel input if any([k not in d_kf.keys() for k in kernels]): raise ValueError( 'wrong kernel: {0}, use: {1}'.format( kernel, ' | '.join(d_kf.keys()) ) ) # normalize: calculate data CDF using kernels U_cdf = np.zeros(U_data.shape) * np.nan ic = 0 for d, k in zip(U_data.T, kernels): cdf, _ = d_kf[k] # get kernel cdf U_cdf[:, ic] = cdf(d) ic += 1 # fit data CDFs to a gaussian copula rhohat, _ = copulafit(U_cdf, 'gaussian') # simulate data to fill probabilistic space U_cop = copularnd('gaussian', rhohat, num_sim) # de-normalize: calculate data ICDF U_sim = np.zeros(U_cop.shape) * np.nan ic = 0 for d, c, k in zip(U_data.T, U_cop.T, kernels): _, icdf = d_kf[k] # get kernel icdf U_sim[:, ic] = icdf(d, c) ic += 1 return U_sim
[docs] def runmean(X, m, modestr): ''' parsed runmean function from original matlab codes. ''' mm = 2*m+1 if modestr == 'edge': xfirst = np.repeat(X[0], m) xlast = np.repeat(X[-1], m) elif modestr == 'zero': xfirst = np.zeros(m) xlast = np.zeros(m) elif modestr == 'mean': xfirst = np.repeat(np.nanmean(X), m) xlast = xfirst Y = np.concatenate( (np.zeros(1), xfirst, X, xlast) ) Y = np.nancumsum(Y) Y = np.divide(Y[mm:,]-Y[:-mm], mm) return Y