Source code for bluemath_tk.teslakit.estela

#!/usr/bin/env python
# -*- coding: utf-8 -*-

# common 
import os
import os.path as op

# pip
import xarray as xr
import numpy as np
import matplotlib.pyplot as plt
from matplotlib import path

# tk 
from .pca import PCA_EstelaPred
from .kma import KMA_regression_guided
from .kma import SimpleMultivariateRegressionModel as SMRM
from .intradaily import Calculate_Hydrographs

from .plotting.estela import Plot_EOFs_EstelaPred, Plot_DWTs_Mean_Anom, \
Plot_DWTs_Probs
from .plotting.pcs import Plot_PCs_WT, Plot_WT_PCs_3D


[docs] def spatial_gradient(xdset, var_name): ''' Calculate spatial gradient xdset: (longitude, latitude, time), var_name returns xdset with new variable "var_name_gradient" ''' # TODO:check/ ADD ONE ROW/COL EACH SIDE var_grad = np.zeros(xdset[var_name].shape) Mx = len(xdset.longitude) My = len(xdset.latitude) lat = xdset.latitude.values for it in range(len(xdset.time)): var_val = xdset[var_name].isel(time=it).values # calculate gradient (matrix) m_c = var_val[1:-1,1:-1] m_l = np.roll(var_val, -1, axis=1)[1:-1,1:-1] m_r = np.roll(var_val, +1, axis=1)[1:-1,1:-1] m_u = np.roll(var_val, -1, axis=0)[1:-1,1:-1] m_d = np.roll(var_val, +1, axis=0)[1:-1,1:-1] m_phi = np.pi*np.abs(lat)/180.0 m_phi = m_phi[1:-1] dpx1 = (m_c - m_l)/np.cos(m_phi[:,None]) dpx2 = (m_r - m_c)/np.cos(m_phi[:,None]) dpy1 = m_c - m_d dpy2 = m_u - m_c vg = (dpx1**2+dpx2**2)/2 + (dpy1**2+dpy2**2)/2 var_grad[it, 1:-1, 1:-1] = vg # calculate gradient (for). old code #for i in range(1, Mx-1): # for j in range(1, My-1): # phi = np.pi*np.abs(lat[j])/180.0 # dpx1 = (var_val[j,i] - var_val[j,i-1]) / np.cos(phi) # dpx2 = (var_val[j,i+1] - var_val[j,i]) / np.cos(phi) # dpy1 = (var_val[j,i] - var_val[j-1,i]) # dpy2 = (var_val[j+1,i] - var_val[j,i]) # var_grad[it, j, i] = (dpx1**2+dpx2**2)/2 + (dpy1**2+dpy2**2)/2 # store gradient xdset['{0}_gradient'.format(var_name)]= ( ('time', 'latitude', 'longitude'), var_grad) return xdset
[docs] def mask_from_poly(xdset, ls_poly, name_mask='mask'): ''' Generate mask from list of tuples (lon, lat) xdset dimensions: (longitude, latitude, ) returns xdset with new variable "mask" ''' lon = xdset.longitude.values lat = xdset.latitude.values mesh_lat, mesh_lon = np.meshgrid(lat, lon) mask = np.zeros(mesh_lat.shape) mesh_points = np.array( [mesh_lon.flatten(), mesh_lat.flatten()] ).T for pol in ls_poly: p = path.Path(pol) inside = np.array(p.contains_points(mesh_points)) inmesh = np.reshape(inside, mask.shape) mask[inmesh] = 1 xdset[name_mask]=(('latitude','longitude'), mask.T) return xdset
[docs] def dynamic_estela_predictor(xdset, var_name, estela_D): ''' Generate dynamic predictor using estela xdset: (time, latitude, longitude), var_name, mask returns similar xarray.Dataset with variables: (time, latitude, longitude), var_name_comp (time, latitude, longitude), var_name_gradient_comp ''' # first day is estela max first_day = int(np.floor(np.nanmax(estela_D)))+1 # output will start at time=first_day shp = xdset[var_name].shape comp_shape = (shp[0]-first_day, shp[1], shp[2]) var_comp = np.ones(comp_shape) * np.nan var_grd_comp = np.ones(comp_shape) * np.nan # get data using estela for each cell for i_lat in range(len(xdset.latitude)): for i_lon in range(len(xdset.longitude)): ed = estela_D[i_lat, i_lon] if not np.isnan(ed): # mount estela displaced time array i_times = np.arange( first_day, len(xdset.time) ) - np.int(ed) # select data from displaced time array positions xdselec = xdset.isel( time = i_times, latitude = i_lat, longitude = i_lon) # get estela predictor values var_comp[:, i_lat, i_lon] = xdselec[var_name].values var_grd_comp[:, i_lat, i_lon] = xdselec['{0}_gradient'.format(var_name)].values # return generated estela predictor return xr.Dataset( { '{0}_comp'.format(var_name):( ('time','latitude','longitude'), var_comp), '{0}_gradient_comp'.format(var_name):( ('time','latitude','longitude'), var_grd_comp), }, coords = { 'time':xdset.time.values[first_day:], 'latitude':xdset.latitude.values, 'longitude':xdset.longitude.values, } )
[docs] class Predictor(object): ''' tesla-kit custom dataset handler used for 3D dataset (lon,lat,time) and related statistical classification calculations and figures. ''' def __init__(self, p_store): # file paths self.p_store = p_store self.p_data = op.join(p_store, 'data.nc') self.p_pca = op.join(p_store, 'pca.nc') self.p_kma = op.join(p_store, 'kma.nc') self.p_plots = op.join(p_store, 'figs') # data (xarray.Dataset) self.data = None self.PCA = None self.KMA = None
[docs] def Load(self): if op.isfile(self.p_data): self.data = xr.open_dataset(self.p_data) if op.isfile(self.p_pca): self.PCA = xr.open_dataset(self.p_pca) if op.isfile(self.p_kma): self.KMA = xr.open_dataset(self.p_kma)
[docs] def Save(self): try: os.makedirs(self.p_store) except: pass if self.data: if op.isfile(self.p_data): os.remove(self.p_data) self.data.to_netcdf(self.p_data,'w') if self.PCA: if op.isfile(self.p_pca): os.remove(self.p_pca) self.PCA.to_netcdf(self.p_pca,'w') if self.KMA: if op.isfile(self.p_kma): os.remove(self.p_kma) self.KMA.to_netcdf(self.p_kma,'w')
[docs] def Calc_PCA_EstelaPred(self, var_name, xds_estela): 'Principal components analysis using estela predictor' # generate estela predictor # TODO STORE DYNAMIC ESTELA PREDICTOR (too big..) xds_estela_pred = dynamic_estela_predictor( self.data, var_name, xds_estela) # Calculate PCA self.PCA = PCA_EstelaPred( xds_estela_pred, var_name) # save data self.Save()
[docs] def Calc_KMA_regressionguided( self, num_clusters, xds_waves, waves_vars, alpha, min_group_size=None, repres = 0.95): 'KMA regression guided with waves data' # we have to miss some days of data due to ESTELA tcut = self.PCA.pred_time.values[:] # calculate regresion model between predictand and predictor xds_waves = xds_waves.sel(time = slice(tcut[0], tcut[-1])) xds_Yregres = SMRM(self.PCA, xds_waves, waves_vars) # classification: KMA regresion guided self.KMA = KMA_regression_guided( self.PCA, xds_Yregres, num_clusters, repres, alpha, min_group_size ) # store time array with KMA self.KMA['time'] = (('n_components',), self.PCA.pred_time.values[:]) # save data self.Save()
[docs] def Calc_MU_TAU_Hydrographs(self, xds_WAVES): ''' Calculates TWL hydrographs returns list of xarray.Dataset with TWL hydrographs MU,TAU arrays for each WT ''' # TODO: SACAR DE AQUI, replantear hydrographs # get sorted bmus from kma xds_BMUS = xr.Dataset( {'bmus':(('time', self.KMA.sorted_bmus.values[:]))}, coords = {'time': self.KMA.time.values[:]} ) # Calculate hydrographs for each WT _, l_xds_MUTAU = Calculate_Hydrographs(xds_BMUS, xds_WAVES) return l_xds_MUTAU
[docs] def Mod_KMA_AddStorms(self, storm_dates, storm_categories): ''' Modify KMA bmus series adding storm category (6 new groups) ''' n_clusters = len(self.KMA.n_clusters.values[:]) kma_dates = self.KMA.time.values[:] bmus_storms = np.copy(self.KMA.sorted_bmus.values[:]) # copy numpy.array for sd, sc in zip(storm_dates, storm_categories): sdr = np.array(sd, dtype='datetime64[D]') # round to day pos_date = np.where(kma_dates==sdr)[0] if pos_date: bmus_storms[pos_date[0]] = n_clusters + sc # copy kma and add bmus_storms self.KMA['sorted_bmus_storms'] = (('n_components',), bmus_storms) # store changes if op.isfile(self.p_kma): os.remove(self.p_kma) self.KMA.to_netcdf(self.p_kma,'w')
[docs] def Plot_EOFs_EstelaPred(self, n_plot=3, show=True): 'Plot EOFs generated in PCA_EstelaPred' # optional land mask mask_land = None if 'mask_land' in self.data.keys(): mask_land = self.data['mask_land'].values[:] l_fs = Plot_EOFs_EstelaPred(self.PCA, n_plot, mask_land=mask_land, show=show) return l_fs
[docs] def Plot_DWTs(self, var_name, kind='mean', show=True): ''' Plot KMA clusters generated in PCA_EstelaPred (DWTs) kind = mean / anom uses database means/anomalies at cluster location (bmus corrected) ''' # data to plot xds_DWTs = self.KMA var_data = self.data[var_name] # optional land mask mask_land = None if 'mask_land' in self.data.keys(): mask_land = self.data['mask_land'].values[:] # Plot DWTs mean using var_data fig = Plot_DWTs_Mean_Anom( xds_DWTs, var_data, kind = kind, mask_land = mask_land, show = show ) return fig
[docs] def Plot_DWTs_Probs(self, show=True, field='sorted_bmus', n_clusters=None): ''' Plot DWTs bmus probabilities - histogram for ocurrences - probs. all series - probs by month - probs by 3month ''' # Plot DWTs mean using var_data bmus = self.KMA[field].values[:] + 1 # index to DWT id bmus_time = self.KMA['time'].values[:] if not n_clusters: n_clusters = len(self.KMA.n_clusters.values[:]) fig = Plot_DWTs_Probs(bmus, bmus_time, n_clusters, show=show) return fig
[docs] def Plot_DWT_PCs(self, n=3, show=True): ''' Plot Daily Weather Types PCs using 2D axis ''' # get estela data PCs = self.PCA.PCs.values[:] variance = self.PCA.variance.values[:] bmus = self.KMA.sorted_bmus.values[:] # sorted_bmus n_clusters = len(self.KMA.n_clusters.values[:]) # Plot DWTs PCs fig = Plot_PCs_WT(PCs, variance, bmus, n_clusters, n, show=show) return fig
[docs] def Plot_PCs_3D(self, show=True): 'Plots Predictor first 3 PCs' # first 3 PCs bmus = self.KMA['sorted_bmus'].values[:] PCs = self.PCA.PCs.values[:] variance = self.PCA.variance.values[:] n_clusters = len(self.KMA.n_clusters.values[:]) PC1 = np.divide(PCs[:,0], np.sqrt(variance[0])) PC2 = np.divide(PCs[:,1], np.sqrt(variance[1])) PC3 = np.divide(PCs[:,2], np.sqrt(variance[2])) # dictionary of DWT PCs 123 d_PCs = {} for ic in range(n_clusters): ind = np.where(bmus == ic)[:] PC123 = np.column_stack((PC1[ind], PC2[ind], PC3[ind])) d_PCs['{0}'.format(ic+1)] = PC123 # Plot DWTs PCs 3D fig = Plot_WT_PCs_3D(d_PCs, n_clusters, show=show) return fig