Source code for bluemath_tk.teslakit.climate_emulator

#!/usr/bin/env python
# -*- coding: utf-8 -*-

# common
import os
import os.path as op
import time
import pickle
from itertools import permutations
import glob
import shutil

# pip
import numpy as np
import pandas as pd
import xarray as xr
from scipy.special import ndtri  # norm inv
from scipy.stats import genextreme, gumbel_l, spearmanr, norm, weibull_min
from statsmodels.distributions.empirical_distribution import ECDF
from numpy.random import choice, multivariate_normal, randint, rand

# fix tqdm for notebook
from tqdm import tqdm as tqdm_base


def tqdm(*args, **kwargs):
    """Wrap ``tqdm`` so stale progress bars are cleared first.

    In notebooks, abandoned tqdm bars stay registered and corrupt the
    display; de-register every live instance before creating a new bar.
    All arguments are forwarded unchanged to ``tqdm``.
    """
    # tqdm tracks live bars in the private "_instances" registry
    if hasattr(tqdm_base, "_instances"):
        for bar in list(tqdm_base._instances):
            tqdm_base._decr_instances(bar)
    return tqdm_base(*args, **kwargs)
# tk from .statistical import Empirical_ICDF from .extremes import FitGEV_KMA_Frechet, Smooth_GEV_Shape, ACOV from .io.aux_nc import StoreBugXdset from .database import clean_files from .plotting.extremes import ( Plot_GEVParams, Plot_ChromosomesProbs, Plot_SigmaCorrelation, )
class Climate_Emulator(object):
    """
    KMA - DWTs Climate Emulator

    Fit Waves extremes model (GEV, Gumbel, Weibull distributions) and DWTs series
    Simulate (probabilistic) a new dataset of Waves from a custom DWTs series

    This custom emulator uses Waves data divided by families (sea, swell_1,
    swell_2, ...), each family is defined by 3 variables: Hs, Tp, Dir

    Some configuration parameters:

    A chromosomes methodology for statistical fit is available, it will handle
    on/off probabilities for each different waves family combination.

    Additional variables, outside of waves families, can be given to the emulator.

    A variable has to be selected as the PROXY to calculate storms max. (default: AWL)

    User can manually set a variable for an empirical / weibull / gev distribution.
    """

    def __init__(self, p_base):
        # max. Total Water level for each storm data
        self.KMA_MS = None
        self.WVS_MS = None

        # Waves TCs WTs
        self.WVS_TCs = None

        # extremes model params
        self.GEV_Par = None  # GEV fitting parameters
        self.GEV_Par_S = None  # GEV simulation sampled parameters
        self.sigma = None  # Pearson sigma correlation

        # chromosomes
        self.do_chrom = True  # True for chromosomes combinations methodology
        self.chrom = None  # chromosomes and chromosomes probs

        # waves families and variables related parameters
        self.fams = []  # waves families to use in emulator
        self.vars_GEV = []  # variables handled with GEV fit
        self.vars_EMP = []  # variables handled with Empirical fit
        self.vars_WBL = []  # variables handled with Weibull fit
        self.extra_variables = []  # extra variables to use in emulator (no waves families)

        # to override a particular variable simulation distribution with a Empirical
        # full variable name: "family_var_WT", example: ['swell_2_Hs_23', 'sea_Hs_15', ...]
        self.sim_icdf_empirical_override = []

        # simulated waves filter (min, max) ranges per magnitude
        self.sim_waves_filter = {
            "hs": (0, 8),
            "tp": (2, 25),
            "ws": (0, 0.06),
        }

        # paths (fitted-model storage files)
        self.p_base = p_base
        self.p_config = op.join(p_base, "config.pk")
        self.p_WVS_MS = op.join(p_base, "WVS_MaxStorm.nc")
        self.p_KMA_MS = op.join(p_base, "KMA_MaxStorm.nc")
        self.p_WVS_TCs = op.join(p_base, "WVS_TCs.nc")
        self.p_chrom = op.join(p_base, "chromosomes.nc")
        self.p_GEV_Par = op.join(p_base, "GEV_Parameters.nc")
        self.p_GEV_Sigma = op.join(p_base, "GEV_SigmaCorrelation.nc")
        self.p_report_fit = op.join(p_base, "report_fit")
        self.p_report_sim = op.join(p_base, "report_sim")

        # output simulation default storage paths (folders)
        self.p_sims = op.join(self.p_base, "Simulations")  # folder
        self.p_sim_wvs_notcs = op.join(self.p_sims, "WAVES_noTCs")
        self.p_sim_wvs_tcs = op.join(self.p_sims, "WAVES_TCs")
        self.p_sim_tcs = op.join(self.p_sims, "TCs")
def ConfigVariables(self, config):
    """
    Set waves families names, optional extra variables names, per-variable
    distributions (GEV / Empirical / Weibull) and the chromosomes switch
    (if False, only the all-on combination is used).

    config = {
        'waves_families': ['sea', 'swell_1', ...],
        'extra_variables': ['wind', 'slp', ...],
        'distribution': [
            ('sea_Tp', 'Empirical'),
            ('wind', 'Weibull'),
            ...
        ],
        'do_chromosomes': True,
    }
    """
    # waves families to use
    fams = config["waves_families"]

    # variables lists for each distribution
    l_GEV_vars = []  # GEV
    l_EMP_vars = []  # Empirical
    l_WBL_vars = []  # Weibull

    # default Hs, Tp, Dir distributions
    GEV_vn = ["Hs", "Tp"]
    EMP_vn = ["Dir"]

    # mount family_variable lists
    for f in fams:
        for v in GEV_vn:
            l_GEV_vars.append("{0}_{1}".format(f, v))
        for v in EMP_vn:
            l_EMP_vars.append("{0}_{1}".format(f, v))

    # extra (non waves-family) variables default to a GEV distribution
    if "extra_variables" in config.keys():
        l_GEV_vars.extend(config["extra_variables"])

        # update extra variables parameter
        self.extra_variables = config["extra_variables"]

    # custom distribution choices override the defaults
    if "distribution" in config.keys():
        for vn, vd in config["distribution"]:
            # clean variable default distribution
            if vn in l_GEV_vars:
                l_GEV_vars.remove(vn)
            if vn in l_EMP_vars:
                l_EMP_vars.remove(vn)

            # now assign variable new distribution
            if vd == "GEV":
                l_GEV_vars.append(vn)
            elif vd == "Empirical":
                l_EMP_vars.append(vn)
            elif vd == "Weibull":
                l_WBL_vars.append(vn)

    # chromosomes combination methodology option
    if "do_chromosomes" in config.keys():
        self.do_chrom = config["do_chromosomes"]

    # store configuration: families, variables distribution lists
    self.fams = fams
    self.vars_GEV = l_GEV_vars
    self.vars_EMP = l_EMP_vars
    self.vars_WBL = l_WBL_vars

    # log
    print("Waves Families: {0}".format(self.fams))
    print("Extra Variables: {0}".format(self.extra_variables))
    print("GEV distribution: {0}".format(self.vars_GEV))
    print("Empirical distribution: {0}".format(self.vars_EMP))
    print("Weibull distribution: {0}".format(self.vars_WBL))
    print("Do chromosomes combinations: {0}".format(self.do_chrom))
def FitExtremes(self, KMA, WVS, config, proxy="AWL"):
    """
    GEV extremes fitting.
    Input data (waves vars series and bmus) shares time dimension

    KMA    - xarray.Dataset, vars: bmus (time,), cenEOFs(n_clusters, n_features)
    WVS    - xarray.Dataset: (time,), Hs, Tp, Dir, TC_category
             (time,), fam_V, {fam: sea, swell_1, ... V: Hs, Tp, Dir}
             (time,), extra_var1, extra_var2, ...
    config - configuration dictionary: view self.ConfigVariables()
    proxy  - variable used to get DWTs max. storms (default AWL).
             proxy variable has to be inside WVS xarray.Dataset

    Fits GEV parameters, chromosomes (or the all-on combination) and sigma
    correlation, stores everything on the instance and saves it to disk.
    """
    # configure waves fams variables parameters from config dict
    self.ConfigVariables(config)
    print("Max. Storms PROXY: {0}".format(proxy))

    # store TCs WTs waves
    WVS_TCs = WVS.copy()
    # WVS.where(~np.isnan(WVS.TC_category), drop=True)

    # TODO select waves without TCs ?
    # WVS = WVS.where(np.isnan(WVS.TC_category), drop=True)

    # get start and end dates for each storm
    lt_storm_dates = self.Calc_StormsDates(KMA)

    # calculate max. PROXY variable for each storm
    ms_PROXY = self.Calc_StormsMaxProxy(WVS[proxy], lt_storm_dates)

    # select waves at storms max.
    ms_WVS = WVS.sel(time=ms_PROXY.time)
    ms_WVS[proxy] = ms_PROXY

    # reindex KMA (daily bmus padded onto the waves time axis),
    # then select KMA data at storms max.
    KMA_rs = KMA.reindex(time=WVS.time, method="pad")
    ms_KMA = KMA_rs.sel(time=ms_PROXY.time)

    # GEV: Fit each wave family to a GEV distribution (KMA bmus)
    GEV_Par = self.Calc_GEVParams(ms_KMA, ms_WVS)

    # chromosomes combinations methodology
    if self.do_chrom:
        # calculate chromosomes and probabilities
        chromosomes = self.Calc_Chromosomes(ms_KMA, ms_WVS)

        # Calculate sigma spearman for each KMA - fams chromosome
        d_sigma = self.Calc_SigmaCorrelation(ms_KMA, ms_WVS, GEV_Par)

    else:
        # only one chromosome combination: all - on
        chromosomes = self.AllOn_Chromosomes(ms_KMA)

        # Calculate sigma spearman for each KMA (all chromosomes on)
        d_sigma = self.Calc_SigmaCorrelation_AllOn_Chromosomes(
            ms_KMA, ms_WVS, GEV_Par
        )

    # store data
    self.WVS_MS = ms_WVS
    self.WVS_TCs = WVS_TCs
    self.KMA_MS = ms_KMA
    self.GEV_Par = GEV_Par
    self.chrom = chromosomes
    self.sigma = d_sigma

    self.Save()
def Save(self):
    """Store fitted climate emulator data under "p_base".

    Writes the netCDF outputs (max. storm waves / KMA, TC waves,
    chromosomes, GEV parameters) and pickles both the sigma correlation
    dict and the variables configuration tuple.
    """
    if not op.isdir(self.p_base):
        os.makedirs(self.p_base)

    # remove previous output so stale files never survive a re-fit
    clean_files(
        [
            self.p_WVS_MS,
            self.p_WVS_TCs,
            self.p_KMA_MS,
            self.p_chrom,
            self.p_GEV_Par,
            self.p_GEV_Sigma,
            self.p_config,
        ]
    )

    # store .nc files
    self.WVS_MS.to_netcdf(self.p_WVS_MS)
    self.KMA_MS.to_netcdf(self.p_KMA_MS)
    self.WVS_TCs.to_netcdf(self.p_WVS_TCs)
    self.chrom.to_netcdf(self.p_chrom)
    self.GEV_Par.to_netcdf(self.p_GEV_Par)

    # store sigma correlation (pickle); previously the file handle was
    # opened inline and never closed -- use context managers instead
    with open(self.p_GEV_Sigma, "wb") as f:
        pickle.dump(self.sigma, f)

    # store config
    with open(self.p_config, "wb") as f:
        pickle.dump(
            (
                self.fams,
                self.vars_GEV,
                self.vars_EMP,
                self.vars_WBL,
                self.extra_variables,
            ),
            f,
        )
def SaveSim(self, WVS_sim, TCs_sim, WVS_upd, n_sim):
    """Store one waves + TCs simulation as numbered netCDF files.

    Each dataset goes to its own output folder, named by the zero-padded
    simulation code ``n_sim``.
    """
    fname = "{0:08d}.nc".format(n_sim)  # sim code

    targets = (
        (WVS_sim, self.p_sim_wvs_notcs),
        (TCs_sim, self.p_sim_tcs),
        (WVS_upd, self.p_sim_wvs_tcs),
    )
    for dataset, folder in targets:
        if not op.isdir(folder):
            os.makedirs(folder)
        StoreBugXdset(dataset, op.join(folder, fname))
def Load(self):
    """Read fitted climate emulator data back from "p_base".

    Restores the netCDF datasets, the pickled sigma correlation dict and
    the pickled variables configuration tuple onto the instance.
    """
    # netCDF datasets
    self.WVS_MS = xr.open_dataset(self.p_WVS_MS)
    self.KMA_MS = xr.open_dataset(self.p_KMA_MS)
    self.WVS_TCs = xr.open_dataset(self.p_WVS_TCs)
    self.chrom = xr.open_dataset(self.p_chrom)
    self.GEV_Par = xr.open_dataset(self.p_GEV_Par)

    # sigma correlation (pickle); previously the file handles were opened
    # inline and never closed -- use context managers instead
    with open(self.p_GEV_Sigma, "rb") as f:
        self.sigma = pickle.load(f)

    # variables configuration (pickle)
    with open(self.p_config, "rb") as f:
        (
            self.fams,
            self.vars_GEV,
            self.vars_EMP,
            self.vars_WBL,
            self.extra_variables,
        ) = pickle.load(f)
def LoadSim(self, n_sim=0):
    """Load one numbered waves / TCs simulation.

    n_sim - simulation code (file name is the zero-padded code)

    Returns (WVS_sim, TCs_sim, WVS_upd); any dataset whose file is missing
    is returned as None.
    """
    fname = "{0:08d}.nc".format(n_sim)  # sim code

    loaded = []
    for folder in (self.p_sim_wvs_notcs, self.p_sim_tcs, self.p_sim_wvs_tcs):
        p_nc = op.join(folder, fname)
        loaded.append(xr.open_dataset(p_nc) if op.isfile(p_nc) else None)

    WVS_sim, TCs_sim, WVS_upd = loaded
    return WVS_sim, TCs_sim, WVS_upd
def LoadSim_All(self, n_sim_ce=0, TCs=True):
    """
    Load all waves and TCs (1 DWT -> 1 output) simulations.

    Each max. storm simulation has a different time dimension, so the
    output is a single pandas.DataFrame with added columns 'time' and
    'n_sim'. A unique n_sim_ce (inner climate emulator simulation) has to
    be chosen.

    TCs - True: WVS (TCs updated) merged with TCs simulations.
          False: WVS (without TCs) simulations.
    """
    # number of available waves simulations on disk
    n_sims_DWTs = len(glob.glob(op.join(self.p_sim_wvs_tcs, "*.nc")))

    frames = []
    for n in range(n_sims_DWTs):
        WVS_sim, TCs_sim, WVS_upd = self.LoadSim(n_sim=n)

        if TCs:
            # simulated waves (TCs updated) merged with simulated TCs
            merged = xr.merge(
                [WVS_upd.isel(n_sim=n_sim_ce), TCs_sim.isel(n_sim=n_sim_ce)]
            )
            df = merged.to_dataframe()
        else:
            # simulated waves (without TCs)
            df = WVS_sim.isel(n_sim=n_sim_ce).to_dataframe()

        df["n_sim"] = n  # store simulation index with data
        df["time"] = df.index  # store dates as a variable
        frames.append(df)

    # join all waves simulations
    return pd.concat(frames, ignore_index=True)
def Set_Simulation_Folder(self, p_sim, copy_WAVES_noTCs=False):
    """
    Point the emulator output storage at a new simulation folder.

    p_sim            - new simulation path
    copy_WAVES_noTCs - if True, copy the simulated waves (no TCs) from the
                       previous folder into the new one
    """
    # remember the previous base so files can be copied from it
    p_previous = self.p_sims

    # update simulation folder and the derived sub-folder paths
    self.p_sims = p_sim
    self.p_sim_wvs_notcs = op.join(self.p_sims, "WAVES_noTCs")
    self.p_sim_wvs_tcs = op.join(self.p_sims, "WAVES_TCs")
    self.p_sim_tcs = op.join(self.p_sims, "TCs")

    # optional copy of already simulated no-TCs waves
    if copy_WAVES_noTCs:
        if op.isdir(self.p_sim_wvs_notcs):
            shutil.rmtree(self.p_sim_wvs_notcs)
        shutil.copytree(op.join(p_previous, "WAVES_noTCs"), self.p_sim_wvs_notcs)
def Calc_StormsDates(self, xds_KMA):
    """
    Return a list of (start, end) tuples, one per storm.

    A storm is a maximal run of consecutive days sharing the same KMA
    weather type (xds_KMA.bmus); each window ends the day before the next
    one starts, and the last window runs to the end of the series.
    """
    one_day = np.timedelta64(1, "D")
    bmus = xds_KMA.bmus.values
    times = xds_KMA.time.values

    # indexes where the weather type changes; index 0 opens the first storm
    change_ix = np.insert(np.where(np.diff(bmus) != 0)[0] + 1, 0, 0)
    change_dates = times[change_ix]

    # each storm spans from its change date to the day before the next one
    storms = [
        (d0, d1 - one_day)
        for d0, d1 in zip(change_dates[:-1], change_dates[1:])
    ]

    # last storm: day after the previous end, up to the series end
    storms.append((storms[-1][1] + one_day, times[-1]))

    return storms
def Calc_StormsMaxProxy(self, wvs_PROXY, lt_storm_dates):
    """
    Return the PROXY series selected at each storm maximum date.

    wvs_PROXY      - xarray.DataArray, PROXY variable time series
    lt_storm_dates - list of (start, end) date tuples (see Calc_StormsDates)
    """
    # find max PROXY inside each storm
    ms_PROXY = []
    ms_times = []
    for d1, d2 in lt_storm_dates:
        # get TWL inside WT window (end extended 23h to cover hourly data)
        wt_PROXY = wvs_PROXY.sel(time=slice(d1, d2 + np.timedelta64(23, "h")))[:]

        # get window maximum TWL date
        # NOTE(review): if the max value repeats inside a window, squeeze()
        # keeps more than one element -- confirm upstream data has no ties
        wt_PROXY_max = wt_PROXY.where(
            wt_PROXY == wt_PROXY.max(), drop=True
        ).squeeze()

        # append data (only the times are used for the final selection)
        ms_PROXY.append(wt_PROXY_max.values)
        ms_times.append(wt_PROXY_max.time.values)

    return wvs_PROXY.sel(time=ms_times)
def Calc_GEVParams(self, xds_KMA_MS, xds_WVS_MS):
    """
    Fit each GEV-handled variable, per weather type (KMA.bmus), to a GEV
    distribution. Requires KMA and waves families at storms max. TWL.

    Returns xarray.Dataset with GEV shape, location and scale parameters
    (dims: n_cluster, parameter) for every variable in self.vars_GEV.
    """
    vars_gev = self.vars_GEV
    bmus = xds_KMA_MS.bmus.values[:]
    # note: cenEOFs was read here before but never used -- removed
    n_clusters = len(xds_KMA_MS.n_clusters)

    xds_GEV_Par = xr.Dataset(
        coords={
            "n_cluster": np.arange(n_clusters) + 1,
            "parameter": ["shape", "location", "scale"],
        }
    )

    # Fit each wave family var to GEV distribution (using KMA bmus)
    for vn in vars_gev:
        gp_pars = FitGEV_KMA_Frechet(bmus, n_clusters, xds_WVS_MS[vn].values[:])
        xds_GEV_Par[vn] = (
            (
                "n_cluster",
                "parameter",
            ),
            gp_pars,
        )

    return xds_GEV_Par
def Calc_Chromosomes(self, xds_KMA_MS, xds_WVS_MS):
    """
    Calculate chromosomes and probabilities from KMA.bmus data.

    A chromosome is an on/off (1/0) pattern over the waves families; a
    family is "on" at a storm when its Hs value is not NaN. Probabilities
    are counted per weather type.

    Returns xarray.Dataset vars: chrom, probs. dims: WT, wave_family, n
    """
    bmus = xds_KMA_MS.bmus.values[:]
    n_clusters = len(xds_KMA_MS.n_clusters)

    fams_chrom = self.fams
    l_vc = ["{0}_Hs".format(x) for x in fams_chrom]

    # get chromosomes matrix (every on/off combination, ChromMatrix helper)
    np_vc = np.column_stack([xds_WVS_MS[vn].values for vn in l_vc])
    chrom = ChromMatrix(np_vc)

    # calculate chromosomes probabilities
    probs = np.zeros((n_clusters, chrom.shape[0]))
    for i in range(n_clusters):
        c = i + 1
        pos = np.where((bmus == c))[0]

        # get variables chromosomes at cluster (NaN -> 0, data -> 1)
        var_c = np_vc[pos, :]
        var_c[~np.isnan(var_c)] = 1
        var_c[np.isnan(var_c)] = 0

        # count chromosomes
        ucs, ccs = np.unique(var_c, return_counts=True, axis=0)
        tcs = var_c.shape[0]

        # get probs of each chromosome
        for uc, cc in zip(ucs, ccs):
            # skip all empty chromosomes
            if ~uc.any():
                continue

            pc = np.where(np.all(uc == chrom, axis=1))[0][0]
            probs[i, pc] = cc / tcs

    # chromosomes dataset
    return xr.Dataset(
        {
            "chrom": (
                (
                    "n",
                    "wave_family",
                ),
                chrom,
            ),
            "probs": (
                (
                    "WT",
                    "n",
                ),
                probs,
            ),
        },
        coords={
            "WT": np.arange(n_clusters) + 1,
            "wave_family": fams_chrom,
        },
    )
def AllOn_Chromosomes(self, xds_KMA_MS):
    """
    Generate fake chromosomes and probabilities from KMA.bmus data in
    order to use only the all-on chromosome combination.

    Returns xarray.Dataset vars: chrom, probs. dims: WT, wave_family, n
    """
    # note: bmus was read here before but never used -- removed
    n_clusters = len(xds_KMA_MS.n_clusters)
    fams_chrom = self.fams

    # single chromosome: every family active, probability 1 at every WT
    chrom = np.ones((1, len(fams_chrom)))
    probs = np.ones((n_clusters, 1))

    # chromosomes dataset
    return xr.Dataset(
        {
            "chrom": (
                (
                    "n",
                    "wave_family",
                ),
                chrom,
            ),
            "probs": (
                (
                    "WT",
                    "n",
                ),
                probs,
            ),
        },
        coords={
            "WT": np.arange(n_clusters) + 1,
            "wave_family": fams_chrom,
        },
    )
def Calc_SigmaCorrelation(self, xds_KMA_MS, xds_WVS_MS, xds_GEV_Par):
    """
    Calculate the sigma correlation for each WT - chromosome combination.

    Despite the historical "Pearson" naming, the statistic computed here is
    Spearman's rank correlation (scipy.stats.spearmanr) of the variables
    transformed through their fitted CDF and the normal inverse CDF.

    Returns nested dict: d_sigma[WT][chrom_ix] = {
        'corr': correlation matrix,
        'data': number of data points used,
        'wt_crom': 1 if enough exact chromosome matches were found,
                   0 if data was borrowed from compatible chromosomes }
    """
    bmus = xds_KMA_MS.bmus.values[:]
    cenEOFs = xds_KMA_MS.cenEOFs.values[:]
    n_clusters = len(xds_KMA_MS.n_clusters)
    wvs_fams = self.fams
    vars_extra = self.extra_variables
    vars_GEV = self.vars_GEV

    # smooth GEV shape parameter
    d_shape = {}
    for vn in vars_GEV:
        sh_GEV = xds_GEV_Par.sel(parameter="shape")[vn].values[:]
        d_shape[vn] = Smooth_GEV_Shape(cenEOFs, sh_GEV)

    # Get sigma correlation for each KMA cluster
    d_sigma = {}  # nested dict [WT][crom]
    for iwt in range(n_clusters):
        c = iwt + 1
        pos = np.where((bmus == c))[0]
        d_sigma[c] = {}

        # current cluster waves
        xds_K_wvs = xds_WVS_MS.isel(time=pos)

        # get chromosomes from waves (0/1): family "on" when Hs is not NaN
        var_c = np.column_stack(
            [xds_K_wvs["{0}_Hs".format(x)].values[:] for x in wvs_fams]
        )
        var_c[~np.isnan(var_c)] = 1
        var_c[np.isnan(var_c)] = 0
        chrom = ChromMatrix(var_c)

        # get sigma for each chromosome
        for ucix, uc in enumerate(chrom):
            wt_crom = 1  # data / no data

            # find data position for this chromosome
            p_c = np.where((var_c == uc).all(axis=1))[0]

            # if not enought data, get all chromosomes with shared 1s
            if len(p_c) < 20:
                p1s = np.where(uc == 1)[0]
                p_c = np.where((var_c[:, p1s] == uc[p1s]).all(axis=1))[0]
                wt_crom = 0  # data / no data

            # select waves chrom data
            xds_chr_wvs = xds_K_wvs.isel(time=p_c)

            # solve normal inverse GEV/EMP/WBL CDF for each active chromosome
            to_corr = np.empty((0, len(p_c)))  # append for spearman correlation
            for i_c in np.where(uc == 1)[0]:
                # get wave family chromosome variables
                fam_n = wvs_fams[i_c]
                vn_Hs = "{0}_Hs".format(fam_n)
                vn_Tp = "{0}_Tp".format(fam_n)
                vn_Dir = "{0}_Dir".format(fam_n)

                vv_Hs = xds_chr_wvs[vn_Hs].values[:]
                vv_Tp = xds_chr_wvs[vn_Tp].values[:]
                vv_Dir = xds_chr_wvs[vn_Dir].values[:]

                # Hs
                norm_Hs = self.CDF_Distribution(
                    vn_Hs, vv_Hs, xds_GEV_Par, d_shape, iwt
                )

                # Tp
                norm_Tp = self.CDF_Distribution(
                    vn_Tp, vv_Tp, xds_GEV_Par, d_shape, iwt
                )

                # Dir
                norm_Dir = self.CDF_Distribution(
                    vn_Dir, vv_Dir, xds_GEV_Par, d_shape, iwt
                )

                # normal inverse CDF (clip CDF values so ndtri stays finite)
                u_cdf = np.column_stack([norm_Hs, norm_Tp, norm_Dir])
                u_cdf[u_cdf >= 1.0] = 0.999999
                inv_n = ndtri(u_cdf)

                # concatenate data for correlation
                to_corr = np.concatenate((to_corr, inv_n.T), axis=0)

            # concatenate extra variables for correlation
            for vn in vars_extra:
                vv = xds_chr_wvs[vn].values[:]
                norm_vn = self.CDF_Distribution(vn, vv, xds_GEV_Par, d_shape, iwt)
                norm_vn[norm_vn >= 1.0] = 0.999999
                inv_n = ndtri(norm_vn)
                to_corr = np.concatenate((to_corr, inv_n[:, None].T), axis=0)

            # sigma: spearman correlation
            corr, pval = spearmanr(to_corr, axis=1)

            # store data at dict
            d_sigma[c][ucix] = {"corr": corr, "data": len(p_c), "wt_crom": wt_crom}

    return d_sigma
def Calc_SigmaCorrelation_AllOn_Chromosomes(
    self, xds_KMA_MS, xds_WVS_MS, xds_GEV_Par
):
    """
    Calculate the sigma correlation for each WT using only the all-on
    chromosome combination.

    As in Calc_SigmaCorrelation, the statistic computed is Spearman's rank
    correlation (scipy.stats.spearmanr) despite the "Pearson" naming.

    Returns nested dict d_sigma[WT][0] = {'corr', 'data', 'wt_crom'},
    keeping the same structure as the chromosomes methodology.
    """
    bmus = xds_KMA_MS.bmus.values[:]
    cenEOFs = xds_KMA_MS.cenEOFs.values[:]
    n_clusters = len(xds_KMA_MS.n_clusters)
    wvs_fams = self.fams
    vars_extra = self.extra_variables
    vars_GEV = self.vars_GEV

    # smooth GEV shape parameter
    d_shape = {}
    for vn in vars_GEV:
        sh_GEV = xds_GEV_Par.sel(parameter="shape")[vn].values[:]
        d_shape[vn] = Smooth_GEV_Shape(cenEOFs, sh_GEV)

    # Get sigma correlation for each KMA cluster
    d_sigma = {}  # nested dict [WT][crom]
    for iwt in range(n_clusters):
        c = iwt + 1
        pos = np.where((bmus == c))[0]
        d_sigma[c] = {}

        # current cluster waves
        xds_K_wvs = xds_WVS_MS.isel(time=pos)

        # append data for spearman correlation
        to_corr = np.empty((0, len(xds_K_wvs.time)))

        # solve normal inverse GEV/EMP/WBL CDF for each waves family
        for fam_n in wvs_fams:
            # get wave family variables
            vn_Hs = "{0}_Hs".format(fam_n)
            vn_Tp = "{0}_Tp".format(fam_n)
            vn_Dir = "{0}_Dir".format(fam_n)

            vv_Hs = xds_K_wvs[vn_Hs].values[:]
            vv_Tp = xds_K_wvs[vn_Tp].values[:]
            vv_Dir = xds_K_wvs[vn_Dir].values[:]

            # fix fams nan: Hs 0, Tp mean, dir mean
            p_nans = np.where(np.isnan(vv_Hs))[0]
            vv_Hs[p_nans] = 0
            vv_Tp[p_nans] = np.nanmean(vv_Tp)
            vv_Dir[p_nans] = np.nanmean(vv_Dir)

            # Hs
            norm_Hs = self.CDF_Distribution(vn_Hs, vv_Hs, xds_GEV_Par, d_shape, iwt)

            # Tp
            norm_Tp = self.CDF_Distribution(vn_Tp, vv_Tp, xds_GEV_Par, d_shape, iwt)

            # Dir
            norm_Dir = self.CDF_Distribution(
                vn_Dir, vv_Dir, xds_GEV_Par, d_shape, iwt
            )

            # normal inverse CDF (clip CDF values so ndtri stays finite)
            u_cdf = np.column_stack([norm_Hs, norm_Tp, norm_Dir])
            u_cdf[u_cdf >= 1.0] = 0.999999
            inv_n = ndtri(u_cdf)

            # concatenate data for correlation
            to_corr = np.concatenate((to_corr, inv_n.T), axis=0)

        # concatenate extra variables for correlation
        for vn in vars_extra:
            vv = xds_K_wvs[vn].values[:]
            norm_vn = self.CDF_Distribution(vn, vv, xds_GEV_Par, d_shape, iwt)
            norm_vn[norm_vn >= 1.0] = 0.999999
            inv_n = ndtri(norm_vn)
            to_corr = np.concatenate((to_corr, inv_n[:, None].T), axis=0)

        # sigma: spearman correlation
        corr, pval = spearmanr(to_corr, axis=1)

        # store data at dict (keep cromosomes structure)
        d_sigma[c][0] = {"corr": corr, "data": len(xds_K_wvs.time), "wt_crom": 1}

    return d_sigma
def GEV_Parameters_Sampling(self, n_sims):
    """
    Sample new GEV/GUMBELL parameters using GEV/GUMBELL asymptotic variances.

    n_sims - number of parameter sets to sample

    Returns xarray.Dataset with dims (n_cluster, simulation, parameter)
    holding sampled shape / location / scale values for every GEV-handled
    variable.
    """
    xds_GEV_Par = self.GEV_Par
    vars_gev = self.vars_GEV
    xds_KMA_MS = self.KMA_MS
    xds_WVS_MS = self.WVS_MS

    # get KMA data
    bmus = xds_KMA_MS.bmus.values[:]
    n_clusters = len(xds_KMA_MS.n_clusters)
    cenEOFs = xds_KMA_MS.cenEOFs.values[:]

    # dataset for storing parameters
    xds_par_samp = xr.Dataset(
        {},
        coords={
            "parameter": ["shape", "location", "scale"],
            "n_cluster": np.arange(n_clusters) + 1,
            "simulation": range(n_sims),
        },
    )

    # simulate variables
    for vn in vars_gev:
        # GEV/GUMBELL parameters
        pars_GEV = xds_GEV_Par[vn]
        sha = pars_GEV.sel(parameter="shape").values[:]
        sca = pars_GEV.sel(parameter="scale").values[:]
        loc = pars_GEV.sel(parameter="location").values[:]

        # location parameter Extremal Index (Gev)
        index = np.ones(sha.shape)
        mu_b = loc - (sca / sha) * (1 - np.power(index, sha))
        psi_b = sca * np.power(index, sha)

        # location parameter Extremal Index (Gumbell)
        # NOTE(review): Gumbel WTs are detected by exact float equality with
        # this sentinel shape value; it must match the value used at fit
        # time (FitGEV_KMA_Frechet) -- confirm if the fit code changes
        sha_gbl = 0.0000000001
        pos_gbl = np.where(sha == sha_gbl)[0]
        cls_gbl = pos_gbl + 1  # Gumbell Weather Types

        # update mu_b
        mu_b[pos_gbl] = loc[pos_gbl] + sca[pos_gbl] * np.log(index[pos_gbl])

        # output holder
        out_ps = np.ndarray((n_clusters, n_sims, 3)) * np.nan

        # sample Gumbel or GEV parameters for each WT
        for i in range(n_clusters):
            c = i + 1  # WT ID

            # get var values at cluster and remove nans
            p_bmus = np.where((bmus == c))[0]
            var_wvs = xds_WVS_MS[vn].isel(time=p_bmus).values[:]
            var_wvs = var_wvs[~np.isnan(var_wvs)]

            # Gumbel WTs: parameters sampling
            if c in cls_gbl:
                # GUMBELL Loglikelihood function acov
                theta = (loc[i], sca[i])
                acov = ACOV(gumbel_l.nnlf, theta, var_wvs)

                # GUMBELL params used for multivar. normal random generation
                theta_gen = np.array([mu_b[i], sca[i]])
                theta_gbl = multivariate_normal(theta_gen, acov, n_sims)

                # mount "GEV" params for simulation (sentinel shape value)
                theta_sim = np.ones((n_sims, 3)) * sha_gbl
                theta_sim[:, 1:] = theta_gbl

            # GEV WTs: parameters sampling
            else:
                # GEV Loglikelihood function acov
                theta = (sha[i], loc[i], sca[i])
                acov = ACOV(genextreme.nnlf, theta, var_wvs)

                # GEV params used for multivar. normal random generation
                theta_gen = np.array([sha[i], mu_b[i], psi_b[i]])
                theta_sim = multivariate_normal(theta_gen, acov, n_sims)

            # store sampled GEV/GUMBELL params
            out_ps[i, :, :] = theta_sim[:, :]

        # smooth shape parameter
        for j in range(n_sims):
            shape_wts = out_ps[:, j, 0]
            out_ps[:, j, 0] = Smooth_GEV_Shape(cenEOFs, shape_wts)

        # append output to dataset
        xds_par_samp[vn] = (("n_cluster", "simulation", "parameter"), out_ps)

    return xds_par_samp
def Simulate_Waves(self, xds_DWT, n_sims=1, filters=None):
    """
    Climate Emulator DWTs waves simulation.

    xds_DWT - xarray.Dataset, vars: evbmus_sims (time,)
    n_sims  - number of simulations to compute
    filters - dict with keys 'hs', 'tp', 'ws'; truthy values activate
              filtering of simulated waves by Hs, Tp and/or wave steepness

    Returns xarray.Dataset with all simulations concatenated on "n_sim".
    """
    # avoid a shared mutable default argument
    if filters is None:
        filters = {"hs": False, "tp": False, "ws": False}

    # TODO can optimize?

    # max. storm waves and KMA
    xds_KMA_MS = self.KMA_MS
    xds_WVS_MS = self.WVS_MS
    xds_WVS_TCs = self.WVS_TCs
    xds_chrom = self.chrom
    sigma = self.sigma

    # vars needed
    dwt_bmus_sim = xds_DWT.evbmus_sims.values[:]
    dwt_time_sim = xds_DWT.time.values[:]
    bmus = xds_KMA_MS.bmus.values[:]
    n_clusters = len(xds_KMA_MS.n_clusters)
    chrom = xds_chrom.chrom.values[:]
    chrom_probs = xds_chrom.probs.values[:]

    # iterate n_sims
    ls_wvs_sim = []
    for i_sim in range(n_sims):
        # upper bound of values a single WT may need in this simulation
        _, wt_counts = np.unique(dwt_bmus_sim, return_counts=True)
        num_gev_sims = np.max(wt_counts)

        # Sample GEV/GUMBELL parameters
        xds_GEV_Par_Sampled = self.GEV_Parameters_Sampling(num_gev_sims)

        # generate waves
        wvs_sim = self.GenerateWaves(
            bmus,
            n_clusters,
            chrom,
            chrom_probs,
            sigma,
            xds_WVS_MS,
            xds_WVS_TCs,
            xds_GEV_Par_Sampled,
            dwt_bmus_sim,
            dwt_time_sim,
            filters=filters,
        )
        ls_wvs_sim.append(wvs_sim)

    # concatenate simulations
    WVS_sims = xr.concat(ls_wvs_sim, "n_sim")

    return WVS_sims
def Simulate_TCs(
    self,
    xds_DWT,
    WVS_sims,
    xds_TCs_params,
    xds_TCs_simulation,
    prob_change_TCs,
    MU_WT,
    TAU_WT,
    extra_vars_update=None,
):
    """
    Climate Emulator DWTs TCs simulation.

    xds_DWT            - xarray.Dataset, vars: evbmus_sims (time,)
    WVS_sims           - xarray.Dataset, output from Simulate_Waves()
    xds_TCs_params     - xr.Dataset. vars(storm): pressure_min
    xds_TCs_simulation - xr.Dataset. vars(storm): mu, hs, ss, tp, dir
    prob_change_TCs    - cumulative probabilities of TC category change
    MU_WT, TAU_WT      - intradaily hidrographs for each WT
    extra_vars_update  - list(string), optional extra variables to update
                         with value from "xds_TCs_simulation"

    Returns (TCs_sim, WVS_upd) concatenated on "n_sim".
    """
    # avoid a shared mutable default argument
    if extra_vars_update is None:
        extra_vars_update = []

    # max. storm waves and KMA
    xds_KMA_MS = self.KMA_MS

    # vars needed
    dwt_bmus_sim = xds_DWT.evbmus_sims.values[:]
    dwt_time_sim = xds_DWT.time.values[:]
    n_clusters = len(xds_KMA_MS.n_clusters)

    # iterate waves simulations
    ls_tcs_sim = []
    ls_wvs_upd = []
    for i_sim in WVS_sims.n_sim:
        wvs_s = WVS_sims.sel(n_sim=i_sim)

        # generate TCs
        tcs_sim, wvs_upd_sim = self.GenerateTCs(
            n_clusters,
            dwt_bmus_sim,
            dwt_time_sim,
            xds_TCs_params,
            xds_TCs_simulation,
            prob_change_TCs,
            MU_WT,
            TAU_WT,
            wvs_s,
            extra_vars_update=extra_vars_update,
        )
        ls_tcs_sim.append(tcs_sim)
        ls_wvs_upd.append(wvs_upd_sim)

    # concatenate simulations
    TCs_sim = xr.concat(ls_tcs_sim, "n_sim")
    WVS_upd = xr.concat(ls_wvs_upd, "n_sim")

    return TCs_sim, WVS_upd
def CDF_Distribution(self, vn, vv, xds_GEV_Par, d_shape, i_wt):
    """
    Switch function: GEV / Empirical / Weibull.
    Looks up the distribution assigned to variable "vn" and returns its
    CDF evaluated at the values "vv".

    vn   - var name
    vv   - var values
    i_wt - Weather Type index
    xds_GEV_Par, d_shape - GEV data used in sigma correlation

    Raises ValueError if "vn" has no assigned distribution.
    """
    # get GEV / EMPIRICAL / WEIBULL variables list
    vars_GEV = self.vars_GEV
    vars_EMP = self.vars_EMP
    vars_WBL = self.vars_WBL

    # switch variable name
    if vn in vars_GEV:
        # gev CDF (scipy genextreme uses the opposite shape sign convention)
        sha_g = d_shape[vn][i_wt]
        loc_g = xds_GEV_Par.sel(parameter="location")[vn].values[i_wt]
        sca_g = xds_GEV_Par.sel(parameter="scale")[vn].values[i_wt]
        norm_VV = genextreme.cdf(vv, -1 * sha_g, loc_g, sca_g)

    elif vn in vars_EMP:
        # empirical CDF
        ecdf = ECDF(vv)
        norm_VV = ecdf(vv)

    elif vn in vars_WBL:
        # Weibull CDF
        norm_VV = weibull_min.cdf(vv, *weibull_min.fit(vv))

    else:
        # previously an unknown variable fell through to a confusing
        # NameError on the return statement
        raise ValueError("Variable {0} has no assigned distribution".format(vn))

    return norm_VV
def ICDF_Distribution(self, vn, vv, pb, xds_GEV_Par, i_wt):
    """
    Switch function: GEV / Empirical / Weibull.
    Looks up the distribution assigned to variable "vn" and returns its
    inverse CDF (ppf) evaluated at the probabilities "pb".

    vn   - var name
    vv   - var values (data used to fit the Empirical / Weibull ICDF)
    pb   - var simulation probs
    i_wt - Weather Type index
    xds_GEV_Par - GEV parameters

    Raises ValueError if "vn" has no assigned distribution.
    """
    # optional empirical var_wt override
    fv = "{0}_{1}".format(vn, i_wt + 1)
    if fv in self.sim_icdf_empirical_override:
        ppf_VV = Empirical_ICDF(vv, pb)
        return ppf_VV

    # get GEV / EMPIRICAL / WEIBULL variables list
    vars_GEV = self.vars_GEV
    vars_EMP = self.vars_EMP
    vars_WBL = self.vars_WBL

    # switch variable name
    if vn in vars_GEV:
        # gev ICDF (scipy genextreme uses the opposite shape sign convention)
        sha_g = xds_GEV_Par.sel(parameter="shape")[vn].values[i_wt]
        loc_g = xds_GEV_Par.sel(parameter="location")[vn].values[i_wt]
        sca_g = xds_GEV_Par.sel(parameter="scale")[vn].values[i_wt]
        ppf_VV = genextreme.ppf(pb, -1 * sha_g, loc_g, sca_g)

    elif vn in vars_EMP:
        # empirical ICDF
        ppf_VV = Empirical_ICDF(vv, pb)

    elif vn in vars_WBL:
        # Weibull ICDF
        ppf_VV = weibull_min.ppf(pb, *weibull_min.fit(vv))

    else:
        # previously an unknown variable fell through to a confusing
        # NameError on the return statement
        raise ValueError("Variable {0} has no assigned distribution".format(vn))

    return ppf_VV
def GenerateWaves(
    self,
    bmus,
    n_clusters,
    chrom,
    chrom_probs,
    sigma,
    xds_WVS_MS,
    xds_WVS_TCs,
    xds_GEV_Par_Sampled,
    DWT,
    DWT_time,
    filters=None,
):
    """
    Climate Emulator DWTs waves simulation.

    bmus                - KMA max. storms bmus series
    n_clusters          - KMA number of clusters
    chrom, chrom_probs  - chromosomes and probabilities
    sigma               - correlation data for each WT
    xds_GEV_Par_Sampled - GEV/GUMBELL parameters sampled for simulation
    DWT                 - np.array with DWT bmus sim series (dims: time,)
    filters             - filter simulated waves by hs, tp and/or wave steepness

    Returns xarray.Dataset with simulated storm data
        vars: *fam*_*vn* (fam: sea, swell_1, swell_2, vn: Hs, Tp, Dir), DWT
        dims: time (one simulated value per storm)
    """
    # avoid a shared mutable default argument
    if filters is None:
        filters = {"hs": False, "tp": False, "ws": False}

    # waves parameters filters
    hs_min, hs_max = self.sim_waves_filter["hs"]
    tp_min, tp_max = self.sim_waves_filter["tp"]
    ws_min, ws_max = self.sim_waves_filter["ws"]

    # waves families - variables (sorted for simulation output)
    wvs_fams = self.fams
    wvs_fams_vars = [
        ("{0}_{1}".format(f, vn)) for f in wvs_fams for vn in ["Hs", "Tp", "Dir"]
    ]

    # extra variables (optional)
    vars_extra = self.extra_variables

    # simulate one value for each storm
    dwt_df = np.diff(DWT)
    dwt_df[-1] = 1  # ensure last day storm
    ix_ch = np.where((dwt_df != 0))[0] + 1
    ix_ch = np.insert(ix_ch, 0, 0)  # get first day storm
    DWT_sim = DWT[ix_ch]
    DWT_time_sim = DWT_time[ix_ch]

    # new progress bar
    pbar = tqdm(total=len(DWT_sim), desc="C.E: Sim. Waves")

    # Simulate
    srl = len(wvs_fams) * 3 + len(vars_extra)  # simulation row length
    sims_out = np.zeros((len(DWT_sim), srl))
    c = 0
    while c < len(DWT_sim):
        WT = int(DWT_sim[c])
        iwt = WT - 1

        # KMA Weather Types waves generation
        if WT <= n_clusters:
            # get random chromosome (weigthed choice)
            pr = chrom_probs[iwt] / np.sum(chrom_probs[iwt])
            ci = choice(range(chrom.shape[0]), 1, p=pr)
            crm = chrom[ci].astype(int).squeeze()

            # get sigma correlation for this WT - crm combination
            corr = sigma[WT][int(ci)]["corr"]
            mvn_m = np.zeros(corr.shape[0])
            sims = multivariate_normal(mvn_m, corr)
            prob_sim = norm.cdf(sims, 0, 1)

            # solve normal inverse CDF for each active chromosome
            ipbs = 0  # prob_sim aux. index
            sim_row = np.zeros(srl)
            for i_c in np.where(np.atleast_1d(crm) == 1)[0]:
                # random sampled GEV
                rd = np.random.randint(0, len(xds_GEV_Par_Sampled.simulation))
                xds_GEV_Par = xds_GEV_Par_Sampled.isel(simulation=rd)

                # get wave family chromosome variables
                fam_n = wvs_fams[i_c]
                vn_Hs = "{0}_Hs".format(fam_n)
                vn_Tp = "{0}_Tp".format(fam_n)
                vn_Dir = "{0}_Dir".format(fam_n)

                # use WAVES that are not from TCs
                vv_Hs = xds_WVS_MS[vn_Hs].values[:]
                vv_Tp = xds_WVS_MS[vn_Tp].values[:]
                vv_Dir = xds_WVS_MS[vn_Dir].values[:]

                pb_Hs = prob_sim[ipbs + 0]
                pb_Tp = prob_sim[ipbs + 1]
                pb_Dir = prob_sim[ipbs + 2]
                ipbs += 3

                # Hs
                ppf_Hs = self.ICDF_Distribution(
                    vn_Hs, vv_Hs, pb_Hs, xds_GEV_Par, iwt
                )

                # Tp
                ppf_Tp = self.ICDF_Distribution(
                    vn_Tp, vv_Tp, pb_Tp, xds_GEV_Par, iwt
                )

                # Dir
                ppf_Dir = self.ICDF_Distribution(
                    vn_Dir, vv_Dir, pb_Dir, xds_GEV_Par, iwt
                )

                # store simulation data
                is0, is1 = (
                    wvs_fams.index(fam_n) * 3,
                    (wvs_fams.index(fam_n) + 1) * 3,
                )
                sim_row[is0:is1] = [ppf_Hs, ppf_Tp, ppf_Dir]

            # solve normal inverse CDF for each extra variable
            ipbs = len(wvs_fams) * 3
            for vn in vars_extra:
                # random sampled GEV
                rd = np.random.randint(0, len(xds_GEV_Par_Sampled.simulation))
                xds_GEV_Par = xds_GEV_Par_Sampled.isel(simulation=rd)

                vv = xds_WVS_MS[vn].values[:]
                pb_vv = prob_sim[ipbs]
                ppf_vv = self.ICDF_Distribution(vn, vv, pb_vv, xds_GEV_Par, iwt)

                # store simulation data
                sim_row[ipbs] = ppf_vv
                ipbs += 1

        # TCs Weather Types waves generation
        else:
            # every family is active for TC weather types. Previously "crm"
            # was left over from an earlier KMA iteration here (a NameError
            # when the first simulated storm is a TC WT, and a stale mask
            # otherwise), which broke the filters below.
            crm = np.ones(len(wvs_fams), dtype=int)

            # Get TC-WT waves fams data
            ixtc = np.where(xds_WVS_TCs.TC_category == WT - n_clusters - 1)[0]
            tws = xds_WVS_TCs.isel(time=ixtc)

            # select random state
            ri = randint(len(tws.time))

            # generate sim_row with sorted waves families variables
            sim_row = np.stack(
                [tws[vn].values[ri] for vn in wvs_fams_vars + vars_extra]
            )

        # Filters

        # nan / negative values: re-simulate this storm
        if np.isnan(sim_row).any() or len(np.where(sim_row < 0)[0]) != 0:
            continue

        # custom "bad data" filter
        dir_s = sim_row[2 : len(wvs_fams) * 3 : 3][crm == 1]
        if any(v > 360.0 for v in dir_s):
            continue

        # wave hs
        if filters["hs"]:
            hs_s = sim_row[0 : len(wvs_fams) * 3 : 3][crm == 1]
            if any(v <= hs_min for v in hs_s) or any(v >= hs_max for v in hs_s):
                continue

        # wave tp
        if filters["tp"]:
            tp_s = sim_row[1 : len(wvs_fams) * 3 : 3][crm == 1]
            if any(v <= tp_min for v in tp_s) or any(v >= tp_max for v in tp_s):
                continue

        # wave steepness
        if filters["ws"]:
            hs_s = sim_row[0 : len(wvs_fams) * 3 : 3][crm == 1]
            tp_s = sim_row[1 : len(wvs_fams) * 3 : 3][crm == 1]
            ws_s = hs_s / (1.56 * tp_s**2)
            if any(v <= ws_min for v in ws_s) or any(v >= ws_max for v in ws_s):
                continue

        # store simulation
        sim_row[sim_row == 0] = np.nan  # nan data at crom 0
        sims_out[c] = sim_row
        c += 1

        # progress bar
        pbar.update(1)

    pbar.close()

    # dataset for storing output
    xds_wvs_sim = xr.Dataset(
        {
            "DWT": (("time",), DWT_sim),
        },
        coords={"time": DWT_time_sim},
    )
    for c, vn in enumerate(wvs_fams_vars + vars_extra):
        xds_wvs_sim[vn] = (("time",), sims_out[:, c])

    return xds_wvs_sim
[docs] def GenerateTCs( self, n_clusters, DWT, DWT_time, TCs_params, TCs_simulation, prob_TCs, MU_WT, TAU_WT, xds_wvs_sim, extra_vars_update=[], ): """ Climate Emulator DWTs TCs simulation n_clusters - KMA number of clusters DWT - np.array with DWT bmus sim series (dims: time,) TCs_params - xr.Dataset. vars(storm): pressure_min TCs_simulation - xr.Dataset. vars(storm): mu, hs, ss, tp, dir prob_TCs - cumulative probabilities of TC category change MU_WT, TAU_WT - intradaily hidrographs for each WT xds_wvs_sim - xr.Dataset, waves simulated without TCs (for updating) extra_vars_update - list(string), optional extra variables to update with value from "TCs_simulation" returns xarray.Datasets with updated Waves and simulated TCs data vars waves: *fam*_*vn* (fam: sea, swell_1, swell_2 ..., vn: Hs, Tp, Dir), vars TCS: mu, tau, ss dims: storm """ # wave family to modify mod_fam = "sea" # TODO input parameter # waves families - variables (sorted for simulation output) wvs_fams = self.fams wvs_fams_vars = [ ("{0}_{1}".format(f, vn)) for f in wvs_fams for vn in ["Hs", "Tp", "Dir"] ] # simulate one value for each storm dwt_df = np.diff(DWT) dwt_df[-1] = 1 # ensure last day storm ix_ch = np.where((dwt_df != 0))[0] + 1 ix_ch = np.insert(ix_ch, 0, 0) # get first day storm DWT_sim = DWT[ix_ch] DWT_time_sim = DWT_time[ix_ch] # get simulated waves for updating sim_wvs = np.column_stack([xds_wvs_sim[vn].values[:] for vn in wvs_fams_vars]) # get simulated extra variables for updating (optional) if extra_vars_update: sim_extra = np.column_stack( [xds_wvs_sim[vn].values[:] for vn in extra_vars_update] ) # new progress bar pbar = tqdm(total=len(DWT_sim), desc="C.E: Sim. TCs ") # Simulate TCs (mu, ss, tau) sims_out = np.zeros((len(DWT_sim), 3)) c = 0 while c < len(DWT_sim): WT = int(DWT_sim[c]) iwt = WT - 1 do_upd_wvs = False # to record when to update simulated waves # KMA Weather Types tcs generation if WT <= n_clusters: # get random MU,TAU from current WT # TODO: random excluyente? 
ri = randint(len(MU_WT[iwt])) mu_s = MU_WT[iwt][ri] tau_s = TAU_WT[iwt][ri] ss_s = 0 # TCs Weather Types waves generation else: # get probability of category change for this WT prob_t = np.append(prob_TCs[:, iwt - n_clusters], 1) # generate random r2 category ri = rand() si = np.where(prob_t >= ri)[0][0] if si == len(prob_t) - 1: # TC does not enter. random mu_s, 0.5 tau_s, 0 ss_s all_MUs = np.concatenate(MU_WT) ri = randint(len(all_MUs)) # TODO: check mu 0s, set nans (?) mu_s = all_MUs[ri] tau_s = 0.5 ss_s = 0 else: # locate TCs with category "si" s_pmin = TCs_params.pressure_min.values[:] p1, p2 = { 0: (1000, np.nanmax(s_pmin) + 1), 1: (979, 1000), 2: (964, 979), 3: (944, 964), 4: (920, 944), 5: (np.nanmin(s_pmin) - 1, 920), }[si] psi = np.where((s_pmin > p1) & (s_pmin <= p2))[0] if psi.any(): # get a random TC from psi indexes ri = choice(psi) mu_s = TCs_simulation.mu.values[ri] ss_s = TCs_simulation.ss.values[ri] tau_s = 0.5 # Get waves family data from simulated TCs (numerical+rbf) mod_fam_Hs = TCs_simulation.hs.values[ri] mod_fam_Tp = TCs_simulation.tp.values[ri] mod_fam_Dir = TCs_simulation.dir.values[ri] # locate index of wave family to modify ixu = wvs_fams.index(mod_fam) * 3 # replace waves: only sea family upd_wvs = sim_wvs[c, :] * 0 upd_wvs[ixu : ixu + 3] = [mod_fam_Hs, mod_fam_Tp, mod_fam_Dir] do_upd_wvs = True # replace extra variables (optional) if extra_vars_update: upd_extra = sim_extra[c, :] * 0 for ve_c, ve in enumerate(extra_vars_update): upd_extra[ve_c] = TCs_simulation[ve].values[ri] else: # TODO: no deberia caer aqui mu_s = 0 ss_s = 0 tau_s = 0 sim_row = np.array([mu_s, tau_s, ss_s]) # no nans or values < 0 stored if ~np.isnan(sim_row).any() and len(np.where(sim_row < 0)[0]) == 0: # store TCs sim sims_out[c] = sim_row if do_upd_wvs: # update waves: only sea from TCs sim_wvs[c, :] = upd_wvs # update_extra_variables (optional) if extra_vars_update: sim_extra[c, :] = upd_extra c += 1 # progress bar pbar.update(1) pbar.close() # update waves 
simulation xds_WVS_sim_updated = xr.Dataset( { "DWT": (("time",), DWT_sim), }, coords={"time": DWT_time_sim}, ) for c, vn in enumerate(wvs_fams_vars): xds_WVS_sim_updated[vn] = (("time",), sim_wvs[:, c]) # add extra variables to waves simulation update (optional) if extra_vars_update: for c, vn in enumerate(extra_vars_update): xds_WVS_sim_updated[vn] = (("time",), sim_extra[:, c]) # generated TCs xds_TCs_sim = xr.Dataset( { "mu": (("time",), sims_out[:, 0]), "tau": (("time",), sims_out[:, 1]), "ss": (("time",), sims_out[:, 2]), "DWT": (("time",), DWT_sim), }, coords={"time": DWT_time_sim}, ) return xds_TCs_sim, xds_WVS_sim_updated
[docs] def Report_Fit(self, vns_GEV=["Hs"], show=True, plot_chrom=True, plot_sigma=True): """ Report for extremes model fitting - GEV parameters for vns_GEV - chromosomes probabilities - sigma correlation """ f_out = [] # GEV variables reports for vn in vns_GEV: fs = self.Report_Gev(vn=vn, show=show) f_out = f_out + fs # Chromosomes and Sigma reports chrom = self.chrom d_sigma = self.sigma # Plot cromosomes probabilities if plot_chrom: f = Plot_ChromosomesProbs(chrom, show=show) f_out.append(f) # Plot sigma correlation triangle if plot_sigma: f = Plot_SigmaCorrelation(chrom, d_sigma, show=show) f_out.append(f) return f_out
[docs] def Report_Gev(self, vn="Hs", show=True): "Plot vn variable GEV parameters for each WT. variables: Hs, Tp" # GEV parameters GEV_Par = self.GEV_Par.copy(deep="True") # locate fam_vn variables vars_gev_params = [x for x in self.vars_GEV if vn in x] f_out = [] for gvn in vars_gev_params: f = Plot_GEVParams(GEV_Par[gvn], show=show) f_out.append(f) return f_out
[docs] def Report_Sim_QQplot(self, WVS_sim, vn, n_sim=0, show=True): """ QQplot for variable vn """ # TODO # f_out = Plot_QQplot(kma_bmus, d_sigma, show=show) return None
[docs] def ChromMatrix(vs): "Return chromosome matrix for np.array vs (n x nvars)" n_cols = vs.shape[1] chrom = np.empty((0, n_cols), int) b = np.zeros(n_cols) for c in range(n_cols): b[c] = 1 for r in set(permutations(b.tolist())): chrom = np.row_stack([chrom, np.array(r)]) return chrom