Source code for bluemath_tk.teslakit.alr

#!/usr/bin/env python
# -*- coding: utf-8 -*-

# common
import pickle
import time
import sys
import os
import os.path as op
from collections import OrderedDict
from datetime import datetime, date, timedelta

# pip
import numpy as np
import pandas as pd
from sklearn import linear_model
import statsmodels.discrete.discrete_model as sm
import scipy.stats as stat
import xarray as xr

# fix tqdm for notebook
from tqdm import tqdm as tqdm_base


def tqdm(*args, **kwargs):
    """Create a tqdm progress bar after clearing any stale instances.

    Notebook kernels may keep previously-created bars registered in
    ``tqdm._instances``; decrementing each registered instance before
    building a new bar prevents duplicated/garbled progress output.
    """
    if hasattr(tqdm_base, "_instances"):
        # copy to a list: _decr_instances mutates the registry while we iterate
        for stale_bar in list(tqdm_base._instances):
            tqdm_base._decr_instances(stale_bar)
    return tqdm_base(*args, **kwargs)
# fix library from scipy import stats stats.chisqprob = lambda chisq, df: stats.chi2.sf(chisq, df) # tk from .io.aux_nc import StoreBugXdset from .util.time_operations import npdt64todatetime as npdt2dt from .kma import Persistences from .plotting.alr import Plot_PValues, Plot_Params, Plot_Terms from .plotting.wts import ( Plot_Compare_PerpYear, Plot_Compare_Transitions, Plot_Compare_Persistences, ) from .plotting.alr import Plot_Compare_Covariate from .plotting.alr import Plot_Log_Sim
class ALR_WRP(object):
    "AutoRegressive Logistic Model Wrapper"

    def __init__(self, p_base):
        """Prepare an empty wrapper and precompute all storage paths.

        p_base - base folder where model, terms and bmus files are stored
        """

        def _path(fname):
            # file path inside the model base folder
            return op.join(p_base, fname)

        # data needed for ALR fitting (filled by SetFitData)
        self.xds_bmus_fit = None
        self.cluster_size = None

        # ALR terms
        self.d_terms_settings = {}
        self.terms_fit = {}
        self.terms_fit_names = []

        # temporal data storage
        self.mk_order = 0
        self.cov_names = []

        # ALR model core
        self.model = None

        # config (only tested with statsmodels library)
        self.model_library = "statsmodels"  # sklearn / statsmodels

        # paths
        self.p_base = p_base

        # alr model and terms_fit
        self.p_save_model = _path("model.sav")
        self.p_save_terms_fit = _path("terms.sav")

        # store fit and simulated bmus
        self.p_save_fit_xds = _path("xds_input.nc")
        self.p_save_sim_xds = _path("xds_output.nc")

        # export folders for figures
        self.p_report_fit = _path("report_fit")
        self.p_report_sim = _path("report_sim")

        # log sim
        self.p_log_sim_xds = _path("xds_log_sim.nc")
def SetFitData(self, cluster_size, xds_bmus_fit, d_terms_settings):
    """
    Sets data needed for ALR fitting

    cluster_size - number of clusters in classification
    xds_bmus_fit - xarray.Dataset vars: bmus, dims: time
    d_terms_settings - terms settings. See "SetFittingTerms"
    """

    # keep the fitting inputs on the instance first: SetFittingTerms
    # reads both cluster_size and xds_bmus_fit from self
    self.cluster_size = cluster_size
    self.xds_bmus_fit = xds_bmus_fit

    # derive the ALR regression terms from the settings
    self.SetFittingTerms(d_terms_settings)

    # persist the bmus series used for fitting
    self.SaveBmus_Fit()
def SetFittingTerms(self, d_terms_settings):
    "Set terms settings that will be used for fitting"

    # defaults for every ALR term the model understands; user-provided
    # keys win, missing keys fall back to these values (dict is filled
    # in place, matching the original behavior)
    default_settings = {
        "mk_order": 0,
        "constant": False,
        "long_term": False,
        "seasonality": (False, []),
        "covariates": (False, []),
        "covariates_seasonality": (False, []),
    }
    for key, value in default_settings.items():
        d_terms_settings.setdefault(key, value)

    # build the ALR terms from the fitting bmus series
    bmus_fit = self.xds_bmus_fit.bmus.values
    time_fit = self.xds_bmus_fit.time.values

    self.terms_fit, self.terms_fit_names = self.GenerateALRTerms(
        d_terms_settings,
        bmus_fit,
        time_fit,
        self.cluster_size,
        time2yfrac=True,
    )

    # remember settings and markov order for simulation
    self.mk_order = d_terms_settings["mk_order"]
    self.d_terms_settings = d_terms_settings
def GenerateALRTerms(self, d_terms_settings, bmus, time, cluster_size, time2yfrac=False):
    """Generate ALR regression terms from user terms settings.

    d_terms_settings - dict with keys: constant, long_term, seasonality,
        covariates, covariates_seasonality, mk_order (see SetFittingTerms)
    bmus - cluster labels array (1-based)
    time - time array; yearly-fraction values, or datetimes if time2yfrac
    cluster_size - number of clusters
    time2yfrac - convert `time` to yearly fractions via GetFracYears

    Returns (terms, terms_names): OrderedDict of design-matrix column
    groups and the flat list of column names.
    """

    # terms stored at OrderedDict
    terms = OrderedDict()
    terms_names = []

    # time options (time has to be yearly fraction)
    if time2yfrac:
        time_yfrac = self.GetFracYears(time)
    else:
        time_yfrac = time

    # constant term
    if d_terms_settings["constant"]:
        terms["constant"] = np.ones((bmus.size, 1))
        terms_names.append("intercept")

    # time term (use custom time array with year decimals)
    # assumes len(time_yfrac) == bmus.size — TODO confirm at call sites
    if d_terms_settings["long_term"]:
        terms["long_term"] = np.ones((bmus.size, 1))
        terms["long_term"][:, 0] = time_yfrac
        terms_names.append("long_term")

    # seasonality term: cos/sin pair per requested phase
    if d_terms_settings["seasonality"][0]:
        phases = d_terms_settings["seasonality"][1]
        temp_seas = np.zeros((len(time_yfrac), 2 * len(phases)))
        c = 0
        for a in phases:
            temp_seas[:, c] = np.cos(a * np.pi * time_yfrac)
            temp_seas[:, c + 1] = np.sin(a * np.pi * time_yfrac)
            terms_names.append("ss_cos_{0}".format(a))
            terms_names.append("ss_sin_{0}".format(a))
            c += 2
        terms["seasonality"] = temp_seas

    # Covariates term
    if d_terms_settings["covariates"][0]:
        # covariates dataset (vars: cov_values, dims: time, cov_names)
        xds_cov = d_terms_settings["covariates"][1]
        cov_names = xds_cov.cov_names.values
        self.cov_names = cov_names  # storage

        # normalize covars
        if "cov_norm" not in xds_cov.keys():
            cov_values = xds_cov.cov_values.values
            cov_norm = (cov_values - cov_values.mean(axis=0)) / cov_values.std(
                axis=0
            )
        else:
            # simulation covars are previously normalized
            cov_norm = xds_cov.cov_norm.values

        # generate covar terms
        for i in range(cov_norm.shape[1]):
            cn = cov_names[i]
            # fixed: column vector as plain ndarray; the previous
            # np.transpose(np.asmatrix(...)) relied on the deprecated
            # np.matrix class
            terms[cn] = cov_norm[:, [i]]
            terms_names.append(cn)

            # Covariates seasonality (per-covariate cos/sin modulation)
            if d_terms_settings["covariates_seasonality"][0]:
                cov_season = d_terms_settings["covariates_seasonality"][1]
                if cov_season[i]:
                    terms["{0}_cos".format(cn)] = np.multiply(
                        terms[cn].T, np.cos(2 * np.pi * time_yfrac)
                    ).T
                    terms["{0}_sin".format(cn)] = np.multiply(
                        terms[cn].T, np.sin(2 * np.pi * time_yfrac)
                    ).T
                    terms_names.append("{0}_cos".format(cn))
                    terms_names.append("{0}_sin".format(cn))

    # markov term
    if d_terms_settings["mk_order"] > 0:
        # helmert-normalized dummy coding for the markov chain
        # (original also defined unused helpers dummi / dummi_norm /
        # helmert_ints; removed since they were never called)
        def helmert_norm(csize, reverse=False):
            D = np.zeros((csize, csize - 1))
            for i in range(csize - 1):
                D[i, i] = float((csize - i - 1)) / (csize - i)
                D[i + 1 :, i] = -1.0 / (csize - i)
            if reverse:
                return np.fliplr(np.flipud(D))
            else:
                return D

        dum = helmert_norm(cluster_size, reverse=True)

        # solve markov order N: one lagged dummy block per order
        mk_order = d_terms_settings["mk_order"]
        for i in range(mk_order):
            Z = np.zeros((bmus.size, cluster_size - 1))
            for indz in range(bmus.size - i - 1):
                Z[indz + i + 1, :] = np.squeeze(dum[bmus[indz] - 1, :])
            terms["markov_{0}".format(i + 1)] = Z

            for ics in range(cluster_size - 1):
                terms_names.append("mk{0}_{1}".format(i + 1, ics + 1))

    return terms, terms_names
def GetFracYears(self, time):
    "Returns time in custom year decimal format"

    # promote raw np.datetime64 entries to pandas timestamps so that
    # .year/.month/.day attributes are available
    if "year" in dir(time[0]):
        t_first, t_second, t_last = time[0], time[1], time[-1]
    else:
        t_first = pd.to_datetime(time[0])
        t_second = pd.to_datetime(time[1])
        t_last = pd.to_datetime(time[-1])

    # yearly-resolution input: consecutive samples one year apart
    if t_second.year - t_first.year == 1:
        return range(t_last.year - t_first.year + 1)

    # daily-resolution input: fraction of years elapsed since 01/01 of
    # the first year, then trimmed so it starts at the first sample
    year_start = date(t_first.year, 1, 1)
    first_day = date(t_first.year, t_first.month, t_first.day)
    last_day = date(t_last.year, t_last.month, t_last.day)

    span_days = (last_day - year_start).days
    frac_from_year_start = np.arange(span_days + 1) / 365.25
    return frac_from_year_start[(first_day - year_start).days :]
def FitModel(self, max_iter=1000):
    "Fits ARL model using sklearn"

    # assemble the design matrix from the fitted terms, and the target
    X = np.concatenate(list(self.terms_fit.values()), axis=1)
    y = self.xds_bmus_fit.bmus.values

    print("\nFitting autoregressive logistic model ...")
    t0 = time.time()

    if self.model_library == "statsmodels":
        # mount data with pandas
        X = pd.DataFrame(X, columns=self.terms_fit_names)
        y = pd.DataFrame(y, columns=["bmus"])

        # TODO: CAPTURAR LA EVOLUCION DE L (maximun-likelihood)
        self.model = sm.MNLogit(y, X).fit(
            method="lbfgs",
            maxiter=max_iter,
            retall=True,
            full_output=True,
            disp=True,
            warn_convergence=True,
            missing="raise",
        )

    elif self.model_library == "sklearn":
        # use sklearn logistig regression
        self.model = linear_model.LogisticRegression(
            penalty="l2", C=1e5, fit_intercept=False, solver="lbfgs"
        )
        self.model.fit(X, y)

    else:
        print("wrong config: {0} not in model_library".format(self.model_library))
        sys.exit()

    elapsed_time = time.time() - t0
    print("Optimization done in {0:.2f} seconds\n".format(elapsed_time))

    # save fitted model
    self.SaveModel()
def SaveModel(self):
    """Saves fitted model (and fitting terms) for future use.

    Writes two pickle files under p_base: the fitted model and the
    [d_terms_settings, terms_fit, terms_fit_names] triple.
    """

    if not op.isdir(self.p_base):
        os.makedirs(self.p_base)

    # fixed: the previous `pickle.dump(obj, open(path, "wb"))` form
    # leaked the file handle; use context managers so files are closed
    with open(self.p_save_model, "wb") as f:
        pickle.dump(self.model, f)

    with open(self.p_save_terms_fit, "wb") as f:
        pickle.dump(
            [self.d_terms_settings, self.terms_fit, self.terms_fit_names], f
        )
def LoadModel(self):
    """Load fitted model (and fitting terms) saved by SaveModel.

    Restores self.model, the terms settings/values/names, cov_names
    (when covariates were used) and mk_order.
    """

    # fixed: `pickle.load(open(path, "rb"))` leaked the file handle;
    # use context managers so files are closed deterministically
    with open(self.p_save_model, "rb") as f:
        self.model = pickle.load(f)

    with open(self.p_save_terms_fit, "rb") as f:
        self.d_terms_settings, self.terms_fit, self.terms_fit_names = pickle.load(f)

    # load aux data
    if self.d_terms_settings["covariates"][0]:
        cov_names = self.d_terms_settings["covariates"][1].cov_names.values
        self.cov_names = cov_names

    self.mk_order = self.d_terms_settings["mk_order"]
def SaveBmus_Fit(self):
    "Saves bmus - fit for future use"

    # make sure the base folder exists before writing
    if not op.isdir(self.p_base):
        os.makedirs(self.p_base)

    # tag the dataset with the cluster count so LoadBmus_Fit can
    # restore it, then write to netcdf
    ds = self.xds_bmus_fit
    ds.attrs["cluster_size"] = self.cluster_size
    ds.to_netcdf(self.p_save_fit_xds, "w")
def LoadBmus_Fit(self):
    "Load bmus - fit"

    # read the fitting bmus dataset and restore the cluster count
    # stored by SaveBmus_Fit
    ds = xr.open_dataset(self.p_save_fit_xds)
    self.xds_bmus_fit = ds
    self.cluster_size = ds.attrs["cluster_size"]
    return ds
def SaveBmus_Sim(self):
    "Saves bmus - sim for future use"

    # make sure the base folder exists before writing
    if not op.isdir(self.p_base):
        os.makedirs(self.p_base)

    # write simulated bmus dataset to netcdf
    self.xds_bmus_sim.to_netcdf(self.p_save_sim_xds, "w")
def LoadBmus_Sim(self):
    "Load bmus - sim"

    # read the simulated bmus dataset written by SaveBmus_Sim / Simulate
    ds = xr.open_dataset(self.p_save_sim_xds)
    self.xds_bmus_sim = ds
    return ds
def Report_Fit(self, terms_fit=False, summary=False, show=True):
    """Report containing model fitting info.

    terms_fit - also plot the terms used for fitting
    summary - print the statsmodels fit summary
    show - forwarded to the plotting helpers

    Returns the list of generated figures, or None when the model
    cannot provide p-values (e.g. fit did not converge).
    """

    # load model
    self.LoadModel()

    # get data
    try:
        pval_df = self.model.pvalues.transpose()
        params_df = self.model.params.transpose()
        name_terms = pval_df.columns.tolist()
    except Exception:
        # TODO: no converge?  (narrowed from a bare `except:`)
        print("warning - statsmodels MNLogit could not provide p-values")
        return

    # output figs
    l_figs = []

    # plot p-values
    f = Plot_PValues(pval_df.values, name_terms, show=show)
    l_figs.append(f)

    # plot parameters
    f = Plot_Params(params_df.values, name_terms, show=show)
    l_figs.append(f)

    # plot terms used for fitting
    if terms_fit:
        # fixed: previously called `self.Report_Terms_Fit(p_rep_trms, show=show)`
        # with an undefined name and a positional argument the method
        # does not accept (its signature is Report_Terms_Fit(show=True))
        f = self.Report_Terms_Fit(show=show)
        l_figs.append(f)

    # write summary
    if summary:
        summ = self.model.summary()
        print(summ.as_text())

    # return figures, consistent with Report_Sim
    return l_figs
def Report_Terms_Fit(self, show=True):
    "Plot terms used for model fitting"

    # refresh the fitting bmus dataset from disk
    self.LoadBmus_Fit()

    # assemble plot inputs: term matrix, datetimes and term names
    term_matrix = np.concatenate(list(self.terms_fit.values()), axis=1)
    term_dates = [npdt2dt(t) for t in self.xds_bmus_fit.time.values]

    # Plot terms
    return Plot_Terms(term_matrix, term_dates, self.terms_fit_names, show=show)
def Simulate(
    self,
    num_sims,
    time_sim,
    xds_covars_sim=None,
    log_sim=False,
    overfit_filter=False,
    of_probs=0.98,
    of_pers=5,
):
    """
    Launch ARL model simulations

    num_sims - number of simulations to compute
    time_sim - time array to solve
    xds_covars_sim - xr.Dataset (time,), cov_values
        Covariates used at simulation, compatible with "n_sim" dimension
        ("n_sim" dimension (optional) will be iterated with each simulation)
    log_sim - Store a .nc file with all simulation detailed information.

    filters for exceptional ALR overfit probabilities situation and patch:
    overfit_filter - overfit filter activation
    of_probs - overfit filter probabilities activation
    of_pers - overfit filter persistences activation
    """

    class SimLog(object):
        """
        simulatiom log - records and stores detail info for each
        time step and n_sim
        """

        def __init__(self, time_yfrac, mk_order, num_sims, cluster_size, terms_fit_names):
            # initialize variables to record (np.nan marks unset slots)
            self.terms = np.nan * np.zeros(
                (
                    len(time_yfrac) - mk_order,
                    num_sims,
                    mk_order + 1,
                    len(terms_fit_names),
                )
            )
            self.probs = np.nan * np.zeros(
                (len(time_yfrac) - mk_order, num_sims, mk_order + 1, cluster_size)
            )
            self.ptrns = np.nan * np.zeros(
                (len(time_yfrac) - mk_order, num_sims, cluster_size)
            )
            self.nrnd = np.nan * np.zeros((len(time_yfrac) - mk_order, num_sims))
            self.evbmu_sims = np.nan * np.zeros(
                (len(time_yfrac) - mk_order, num_sims)
            )

            # overfit filter variables (filter states and filtered bmus)
            self.of_state = np.zeros(
                (len(time_yfrac) - mk_order, num_sims), dtype=bool
            )
            self.of_bmus = np.nan * np.zeros((len(time_yfrac) - mk_order, num_sims))

        def Add(self, ix_t, ix_s, terms, prob, probTrans, nrnd, of_state, of_bmus):
            # add iteration to log
            self.terms[ix_t, ix_s, :, :] = terms
            self.probs[ix_t, ix_s, :, :] = prob
            self.ptrns[ix_t, ix_s, :] = probTrans
            self.nrnd[ix_t, ix_s] = nrnd
            # unfiltered bmus choice implied by probTrans/nrnd
            self.evbmu_sims[ix_t, ix_s] = np.where(probTrans > nrnd)[0][0] + 1

            # add overfit filter data to log
            self.of_state[ix_t, ix_s] = of_state
            self.of_bmus[ix_t, ix_s] = of_bmus

        def Save(self, p_save, terms_names):
            # use xarray to store netcdf
            # NOTE: `time_sim` and `mk_order` are read from the enclosing
            # Simulate scope (closure), not from SimLog attributes
            xds_log = xr.Dataset(
                {
                    "alr_terms": (("time", "n_sim", "mk", "terms"), self.terms),
                    "probs": (("time", "n_sim", "mk", "n_clusters"), self.probs),
                    "probTrans": (("time", "n_sim", "n_clusters"), self.ptrns),
                    "nrnd": (("time", "n_sim"), self.nrnd),
                    "evbmus_sims": (("time", "n_sim"), self.evbmu_sims.astype(int)),
                    "overfit_filter_state": (("time", "n_sim"), self.of_state),
                    "evbmus_sims_filtered": (
                        ("time", "n_sim"),
                        self.of_bmus.astype(int),
                    ),
                },
                coords={
                    "time": time_sim[mk_order:],
                    "terms": terms_names,
                },
            )
            StoreBugXdset(xds_log, p_save)
            print("simulation data log stored at {0}\n".format(p_save))

    class OverfitFilter(object):
        """
        overfit filter for alr outlayer outputs.
        """

        def __init__(self, probs_lim, pers_lim):
            self.active = False
            self.probs_lim = probs_lim
            self.pers_lim = pers_lim
            self.log = ""

        def CheckStatus(self, n_sim, prob, bmus):
            "check current iteration filter status"
            # NOTE(review): `i`, `time_sim`, `evbmus` and `new_bmus` below
            # are resolved through the closure over the simulation loop in
            # Simulate; the `bmus` parameter itself is not read here

            # active filter
            if self.active:
                # continuation condition
                self.active = np.nanmax(prob[-1, :]) >= self.probs_lim

                # log when deactivated
                if self.active == False:
                    self.log += (
                        "sim. {0:02d} - {1} - deactivated (max prob {2})\n".format(
                            n_sim, time_sim[i], np.nanmax(prob[-1, :])
                        )
                    )

            # inactive filter
            else:
                # re-activation condition: high max probability and the
                # last `pers_lim` bmus all equal to the new one
                self.active = np.nanmax(prob[-1, :]) >= self.probs_lim and np.all(
                    evbmus[-1 * self.pers_lim :] == new_bmus
                )

                # log when activated
                if self.active:
                    self.log += (
                        "sim. {0:02d} - {1} - activated (max prob {2})\n".format(
                            n_sim, time_sim[i], np.nanmax(prob[-1, :])
                        )
                    )

        def PrintLog(self):
            "Print filter log"
            if self.log != "":
                print("overfit filter log")
                print(self.log)

    # switch library probabilities predictor function
    if self.model_library == "statsmodels":
        pred_prob_fun = self.model.predict
    elif self.model_library == "sklearn":
        pred_prob_fun = self.model.predict_proba
    else:
        print("wrong config: {0} not in model_library".format(self.model_library))
        sys.exit()

    # get needed data
    evbmus_values = self.xds_bmus_fit.bmus.values
    time_fit = self.xds_bmus_fit.time.values
    mk_order = self.mk_order

    # times at datetime
    if isinstance(time_sim[0], np.datetime64):
        time_sim = [npdt2dt(t) for t in time_sim]

    # print some info
    tf0 = str(time_fit[0])[:10]
    tf1 = str(time_fit[-1])[:10]
    ts0 = str(time_sim[0])[:10]
    ts1 = str(time_sim[-1])[:10]
    print("ALR model fit : {0} --- {1}".format(tf0, tf1))
    print("ALR model sim : {0} --- {1}".format(ts0, ts1))

    # generate time yearly fractional array
    time_yfrac = self.GetFracYears(time_sim)

    # use a d_terms_settigs copy
    d_terms_settings_sim = self.d_terms_settings.copy()

    # initialize optional simulation log
    if log_sim:
        SL = SimLog(
            time_yfrac, mk_order, num_sims, self.cluster_size, self.terms_fit_names
        )

    # initialize ALR overfit filter
    ofilt = OverfitFilter(of_probs, of_pers)

    # initialize ALR simulated bmus array, and overfit filter register array
    evbmus_sims = np.zeros((len(time_yfrac), num_sims))
    ofbmus_sims = np.zeros((len(time_yfrac), num_sims), dtype=bool)

    # start simulations
    print("\nLaunching {0} simulations...\n".format(num_sims))
    for n in range(num_sims):
        # preload some data (simulation covariates)
        cvtxt = ""
        if xds_covars_sim != None:
            # check if n_sim dimension in xds_covars_sim
            if "n_sim" in xds_covars_sim.dims:
                sim_covars_T = xds_covars_sim.isel(n_sim=n).cov_values.values
                cvtxt = " (Covs. {0:03d})".format(n + 1)
            else:
                sim_covars_T = xds_covars_sim.cov_values.values

            sim_covars_T_mean = sim_covars_T.mean(axis=0)
            sim_covars_T_std = sim_covars_T.std(axis=0)

        # progress bar
        pbar = tqdm(
            total=len(time_yfrac) - mk_order,
            file=sys.stdout,
            desc="Sim. Num. {0:03d}{1}".format(n + 1, cvtxt),
        )

        # seed the chain with the first mk_order fitted bmus
        evbmus = evbmus_values[1 : mk_order + 1]
        for i in range(len(time_yfrac) - mk_order):
            # handle simulation covars
            if d_terms_settings_sim["covariates"][0]:
                # normalize step covars (with the full-series mean/std)
                sim_covars_evbmus = sim_covars_T[i : i + mk_order + 1]
                sim_cov_norm = (
                    sim_covars_evbmus - sim_covars_T_mean
                ) / sim_covars_T_std

                # mount step xr.dataset for sim covariates
                xds_cov_sim_step = xr.Dataset(
                    {
                        "cov_norm": (("time", "cov_names"), sim_cov_norm),
                    },
                    coords={"cov_names": self.cov_names},
                )
                d_terms_settings_sim["covariates"] = (True, xds_cov_sim_step)

            # generate time step ALR terms
            # (a 0 is appended as placeholder for the bmus to predict)
            terms_i, terms_names = self.GenerateALRTerms(
                d_terms_settings_sim,
                np.append(evbmus[i : i + mk_order], 0),
                time_yfrac[i : i + mk_order + 1],
                self.cluster_size,
                time2yfrac=False,
            )

            # Event sequence simulation (sklearn)
            X = np.concatenate(list(terms_i.values()), axis=1)
            prob = pred_prob_fun(X)  # statsmodels // sklearn functions
            probTrans = np.cumsum(prob[-1, :])

            # generate random cluster with ALR probs
            nrnd = np.random.rand()
            new_bmus = np.where(probTrans > nrnd)[0][0] + 1

            # overfit filter status swich (if active)
            if overfit_filter:
                ofilt.CheckStatus(n, prob, np.append(evbmus, new_bmus))

                # override overfit bmus if filter active
                if ofilt.active:
                    # criteria: random bmus from that date of the year at historical
                    ix_of = np.random.choice(
                        np.where(
                            self.xds_bmus_fit["time.dayofyear"]
                            == time_sim[i].timetuple().tm_yday
                        )[0]
                    )
                    new_bmus = self.xds_bmus_fit.bmus.values[ix_of]

            # append_bmus
            evbmus = np.append(evbmus, new_bmus)

            # store overfit filter status
            ofbmus_sims[i + mk_order, n] = ofilt.active

            # optional detail log
            if log_sim:
                SL.Add(i, n, X, prob, probTrans, nrnd, ofilt.active, new_bmus)

            # update progress bar
            pbar.update(1)

        evbmus_sims[:, n] = evbmus

        # close progress bar
        pbar.close()

    print()  # white line after all progress bars

    # return ALR simulation data in a xr.Dataset
    xds_out = xr.Dataset(
        {
            "evbmus_sims": (("time", "n_sim"), evbmus_sims.astype(int)),
            "ofbmus_sims": (("time", "n_sim"), ofbmus_sims),
        },
        coords={
            "time": time_sim,
        },
    )

    # save output
    StoreBugXdset(xds_out, self.p_save_sim_xds)

    # save log file
    if log_sim:
        SL.Save(self.p_log_sim_xds, terms_names)

    # overfit filter log
    ofilt.PrintLog()

    return xds_out
def Report_Sim(
    self,
    py_month_ini=1,
    persistences_hists=False,
    persistences_table=False,
    show=True,
):
    """
    Report that Compare fitting to simulated bmus

    py_month_ini - start month for PerpetualYear bmus comparison
    persistences_hists - also plot persistences comparison histograms
    persistences_table - also print a persistences-by-WT table
    show - forwarded to the plotting helpers
    """
    # TODO: add arg n_sim = None (for plotting only one sim output)

    # load fit and sim bmus
    xds_ALR_fit = self.LoadBmus_Fit()
    xds_ALR_sim = self.LoadBmus_Sim()

    # report folder and files
    p_save = self.p_report_sim

    # get data
    cluster_size = self.cluster_size
    bmus_values_sim = xds_ALR_sim.evbmus_sims.values[:]
    bmus_dates_sim = xds_ALR_sim.time.values[:]
    bmus_values_hist = np.reshape(xds_ALR_fit.bmus.values, [-1, 1])
    bmus_dates_hist = xds_ALR_fit.time.values[:]
    num_sims = bmus_values_sim.shape[1]

    # calculate bmus persistences (hist, and merged across simulations)
    pers_hist = Persistences(bmus_values_hist.flatten())
    lsp = [Persistences(bs) for bs in bmus_values_sim.T.astype(int)]
    pers_sim = {
        k: np.concatenate([x.get(k, []) for x in lsp]) for k in lsp[0].keys()
    }

    # fix datetime 64 dates
    if isinstance(bmus_dates_sim[0], np.datetime64):
        bmus_dates_sim = [npdt2dt(t) for t in bmus_dates_sim]
    if isinstance(bmus_dates_hist[0], np.datetime64):
        bmus_dates_hist = [npdt2dt(t) for t in bmus_dates_hist]

    # output figs
    l_figs = []

    # Plot Perpetual Year (daily) - bmus wt
    fig_PP = Plot_Compare_PerpYear(
        cluster_size,
        bmus_values_sim,
        bmus_dates_sim,
        bmus_values_hist,
        bmus_dates_hist,
        n_sim=num_sims,
        month_ini=py_month_ini,
        show=show,
    )
    l_figs.append(fig_PP)

    # Plot WTs Transition (probability change / scatter Fit vs. ACCUMULATED Sim)
    sttl = "Cluster Probabilities Transitions: All Simulations"
    fig_CT = Plot_Compare_Transitions(
        cluster_size,
        bmus_values_hist,
        bmus_values_sim,
        sttl=sttl,
        show=show,
    )
    l_figs.append(fig_CT)

    # Plot Persistences comparison Fit vs Sim
    if persistences_hists:
        fig_PS = Plot_Compare_Persistences(
            cluster_size,
            pers_hist,
            pers_sim,
            show=show,
        )
        l_figs.append(fig_PS)

    # persistences set table
    if persistences_table:
        print("Persistences by WT (set)")
        for c in range(cluster_size):
            wt = c + 1
            p_h = pers_hist[wt]
            p_s = pers_sim[wt]
            print("WT: {0}".format(wt))
            print(" hist : {0}".format((sorted(set(p_h)))))
            print(" sim. : {0}".format(sorted(set(p_s))))

    # TODO export handling (if show=False)
    # p_save = self.p_report_sim
    # p_rep_PY = None
    # p_rep_VL = None
    # if export:
    #    if not op.isdir(p_save): os.mkdir(p_save)
    #    p_rep_PY = op.join(p_save, 'PerpetualYear.png')
    #    p_rep_VL = op.join(p_save, 'Transitions.png')

    return l_figs

    # NOTE(review): everything below is unreachable (after `return`);
    # kept by the original author pending the TODOs, and it references
    # `xds_cov_sim`, which is never defined in this scope
    # TODO: Plot Perpetual Year (monthly)

    # TODO: load covariates if needed for ploting
    # TODO: activate this
    if self.d_terms_settings["covariates"][0]:
        # TODO eliminar tiempos duplicados
        time_hist_covars = bmus_dates_hist
        time_sim_covars = bmus_dates_sim

        # covars fit
        xds_cov_fit = self.d_terms_settings["covariates"][1]
        cov_names = xds_cov_fit.cov_names.values
        cov_fit_values = xds_cov_fit.cov_values.values

        # covars sim
        cov_sim_values = xds_cov_sim.cov_values.values

        for ic, cn in enumerate(cov_names):
            # get covariate data
            cf_val = cov_fit_values[:, ic]
            cs_val = cov_sim_values[:, ic]

            # plot covariate - bmus wt
            p_rep_cn = op.join(p_save, "{0}_comp.png".format(cn))
            Plot_Compare_Covariate(
                cluster_size,
                bmus_values_sim,
                bmus_dates_sim,
                bmus_values_hist,
                bmus_dates_hist,
                cs_val,
                time_sim_covars,
                cf_val,
                time_hist_covars,
                cn,
                n_sim=num_sims,
                p_export=p_rep_cn,
            )
def Report_Sim_Log(self, n_sim=0, t_slice=None, show=True):
    """
    Interactive plot for simulation log

    n_sim - simulation log to plot
    t_slice - optional time slice to restrict the plotted log
    show - unused here; kept for signature compatibility with the
        other Report_* methods
    """

    # load detailed simulation log written by Simulate(log_sim=True)
    xds_log = xr.open_dataset(self.p_log_sim_xds, decode_times=True)

    # get simulation
    log_sim = xds_log.isel(n_sim=n_sim)

    # fixed: identity comparison with None (was `t_slice != None`)
    if t_slice is not None:
        log_sim = log_sim.sel(time=t_slice)

    # plot interactive report
    Plot_Log_Sim(log_sim)