Source code for bluemath_tk.teslakit.numerical_models.swan.io

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import os
import os.path as op
import shutil as su
from datetime import datetime

import numpy as np
import pandas as pd
import xarray as xr
from scipy.io import loadmat

from .geo import gc_distance


# AUX. FUNCTIONs
[docs] def geo_distance_azimuth(lat_matrix, lon_matrix, lat_point, lon_point): ''' Returns geodesic distance and azimuth between lat,lon matrix and lat,lon point in degrees ''' arcl = np.zeros(lat_matrix.shape) * np.nan azi = np.zeros(lat_matrix.shape) * np.nan sh1, sh2 = lat_matrix.shape for i in range(sh1): for j in range(sh2): arcl[i,j], azi[i,j] = gc_distance( lat_point, lon_point, lat_matrix[i][j], lon_matrix[i][j] ) return arcl, azi
# SWAN INPUT/OUTPUT STAT LIBRARY
[docs] class SwanIO(object): 'SWAN numerical model input/output' def __init__(self, swan_proj): # needs SwanProject self.proj = swan_proj
[docs] def make_project(self): 'makes swan project folder and subfolders' if not op.isdir(self.proj.p_main): os.makedirs(self.proj.p_main) if not op.isdir(self.proj.p_cases): os.makedirs(self.proj.p_cases)
[docs] class SwanIO_STAT(SwanIO): 'SWAN numerical model input/output - STATIONARY cases'
[docs] def make_input(self, p_file, id_run, ws, bnd): ''' Writes input.swn file from waves sea state for stationary execution p_file - input.swn file path ws - wave sea state (hs, per, dr, spr) bnd - wave sea state active boundaries more info: http://swanmodel.sourceforge.net/online_doc/swanuse/node23.html ''' # TODO: check readinp idla # .swn file parameters sea_level = self.proj.params['sea_level'] jonswap_gamma = self.proj.params['jonswap_gamma'] coords_spherical = self.proj.params['coords_spherical'] waves_period = self.proj.params['waves_period'] # main mesh mm = self.proj.mesh_main # .swn text file t = "PROJ '{0}' '{1}'\n$\n".format(self.proj.name, id_run) t += 'MODE STAT\n' # spherical coordinates (mercator) switch if coords_spherical != None: t += 'COORDINATES SPHER {0}\n'.format(coords_spherical) # sea level t += 'SET level={0} NAUTICAL\n$\n'.format(sea_level) # computational grid t += 'CGRID REGULAR {0} {1} {2} {3} {4} {5} {6} CIRCLE 72 0.0345 1.00 34\n$\n'.format( mm.cg['xpc'], mm.cg['ypc'], mm.cg['alpc'], mm.cg['xlenc'], mm.cg['ylenc'], mm.cg['mxc']-1, mm.cg['myc']-1) # bathymetry t += 'INPGRID BOTTOM REGULAR {0} {1} {2} {3} {4} {5} {6}\n'.format( mm.dg['xpc'], mm.dg['ypc'], mm.dg['alpc'], mm.dg['mxc'], mm.dg['myc'], mm.dg['dxinp'], mm.dg['dyinp']) t += "READINP BOTTOM 1 '{0}' {1} 0 FREE\n$\n".format( mm.depth_fn, mm.dg_idla) # waves boundary conditions t += 'BOUND SHAPespec JONswap {0} {1} DSPR DEGR\n'.format( jonswap_gamma, waves_period) for ic in bnd: t += "BOUN SIDE {0} CONstant PAR {1:.3f} {2:.3f} {3:.3f} {4:.3f}\n".format( ic, ws.hs, ws.per, ws.dir, ws.spr) t += "$\n" # numerics t += 'OFF QUAD\n' # t += 'PROP BSBT\n' # t += 'WCAP\n' t += 'BREA\n' t += 'FRICTION JONSWAP\n$\n' # optional nested mesh r_ns = [self.proj.run_nest1, self.proj.run_nest2, self.proj.run_nest3] m_ns = [self.proj.mesh_nest1, self.proj.mesh_nest2, self.proj.mesh_nest3] nout_0 = ['nest1', 'nest2', 'nest3'] nout_1 = ['bounds_nest1.dat', 'bounds_nest2.dat', 'bounds_nest3.dat'] for r_n, m_n, n0, n1 in zip(r_ns, m_ns, nout_0, nout_1): if r_n: t += "NGRID '{0}' {1} {2} {3} {4} {5} {6} {7}\n".format( n0, m_n.cg['xpc'], m_n.cg['ypc'], m_n.cg['alpc'], m_n.cg['xlenc'], m_n.cg['ylenc'], np.int32(m_n.cg['xlenc']/mm.cg['dxinp']), np.int32(m_n.cg['ylenc']/mm.cg['dyinp']) ) t += "NESTOUT '{0}' '{1}'\n".format(n0, n1) # output t += "BLOCK 'COMPGRID' NOHEAD '{0}' LAY 3 HSIGN TM02 DIR TPS DSPR\n$\n".format( mm.output_fn, ) # compute t += 'TEST 1,0\n' t += 'COMPUTE \n' t += 'STOP\n$\n' # write file: with open(p_file, 'w') as f: f.write(t) # log fmt2 = ' 7.2f' print( 'SWAN CASE: {1} ---> hs {2:{0}}, per {3:{0}}, dir {4:{0}}, spr {5:{0}}'.format( fmt2, id_run, ws.hs, ws.per, ws.dir, ws.spr ) )
[docs] def make_input_nested(self, p_file, id_run): ''' Writes input_nested.swn file from waves sea state for stationary execution p_file - input_nestedN.swn file path ''' # TODO check myc-1, mxc -1 # .swn file parameters sea_level = self.proj.params['sea_level'] coords_spherical = self.proj.params['coords_spherical'] nested_bounds = self.proj.params['nested_bounds'] # SWAN nested Computacional grid mn1 = self.proj.mesh_nest1 # .swn text file t = "PROJ '{0}' '{1}'\n$\n".format(self.proj.name, id_run) t += 'MODE STAT\n' # spherical coordinates (mercator) switch if coords_spherical != None: t += 'COORDINATES SPHER {0}\n'.format(coords_spherical) t += 'SET level={0} NAUTICAL\n$\n'.format(sea_level) # computational grid t += 'CGRID REGULAR {0} {1} {2} {3} {4} {5} {6} CIRCLE 72 0.03558410 1.00 35\n$\n'.format( mn1.cg['xpc'], mn1.cg['ypc'], mn1.cg['alpc'], mn1.cg['xlenc'], mn1.cg['ylenc'], mn1.cg['mxc']-1, mn1.cg['myc']-1) # bathymetry t += 'INPGRID BOTTOM REGULAR {0} {1} {2} {3} {4} {5} {6}\n'.format( mn1.dg['xpc'], mn1.dg['ypc'], mn1.dg['alpc'], mn1.dg['mxc']-1, mn1.dg['myc']-1, mn1.dg['dxinp'], mn1.dg['dyinp']) t += "READINP BOTTOM 1 '{0}' {1} 0 FREE\n$\n".format( mn1.depth_fn, mn1.dg_idla) # Boundary Conditions t += "BOUN NEST '{0}' {1}\n".format('bounds_nest1.dat', nested_bounds) # wind file t += "$\n" # numerics t += 'OFF QUAD\n' # t += 'GEN1\n' # t += 'PROP BSBT\n' # t += 'WCAP\n' t += 'BREA\n' t += 'FRICTION JONSWAP\n$\n' # output t += "BLOCK 'COMPGRID' NOHEAD '{0}' LAY 3 HSIGN TM02 DIR TPS DSPR\n$\n".format( mn1.output_fn, ) # compute t += 'TEST 1,0\n' t += 'COMPUTE \n' t += 'STOP\n$\n' # write file: with open(p_file, 'w') as f: f.write(t)
[docs] def build_case(self, case_id, waves_ss, bnd=['N', 'E', 'W', 'S']): ''' Build SWAN STAT case input files for given wave sea state (hs, per, dir, spr) ix_case - SWAN case index (int) waves_ss - wave sea state (hs, per, dir, spr) bnd - wave sea state active boundaries ''' # SWAN case path p_case = op.join(self.proj.p_cases, case_id) # make execution dir if not op.isdir(p_case): os.makedirs(p_case) # make depth file for main mesh self.proj.mesh_main.export_dat(p_case) # make input.swn file self.make_input(op.join(p_case, 'input.swn'), case_id, waves_ss, bnd) # optional nested mesh depth and input files r_ns = [self.proj.run_nest1, self.proj.run_nest2, self.proj.run_nest3] m_ns = [self.proj.mesh_nest1, self.proj.mesh_nest2, self.proj.mesh_nest3] i_ns = ['input_nest1.swn', 'input_nest2.swn', 'input_nest3.swn'] for r_n, m_n, i_n in zip(r_ns, m_ns, i_ns): if r_n: m_n.export_dat(p_case) self.make_input_nested(op.join(p_case, i_n), case_id)
[docs] def outmat2xr(self, p_mat): # matlab dictionary dmat = loadmat(p_mat) # return dataset xds_out = xr.Dataset( { 'Hsig': (('X','Y',), dmat['Hsig'].T, {'units':'m'}), 'Tm02': (('X','Y',), dmat['Tm02'].T, {'units':'s'}), 'Dir': (('X','Y',), dmat['Dir'].T, {'units':'º'}), 'Dspr': (('X','Y',), dmat['Dspr'].T, {'units':'º'}), 'TPsmoo': (('X','Y',), dmat['TPsmoo'].T, {'units':'s'}), } ) return xds_out
[docs] def output_case(self, p_case, mesh): 'read .mat output file from stationary and returns xarray.Dataset' # extract output from selected mesh p_mat = op.join(p_case, mesh.output_fn) xds_out = self.outmat2xr(p_mat) # set X and Y values X, Y = mesh.get_XY() xds_out = xds_out.assign_coords(X=X) xds_out = xds_out.assign_coords(Y=Y) # rename to longitude latitude in spherical coords cases coords_spherical = self.proj.params['coords_spherical'] if coords_spherical != None: xds_out = xds_out.rename({'X':'lon', 'Y':'lat'}) return xds_out
[docs] class SwanIO_NONSTAT(SwanIO): 'SWAN numerical model input/output - NON STATIONARY cases'
[docs] def make_out_points(self, p_file): 'Generates desired output-points coordinates file' # define and save output points x_out = self.proj.x_out y_out = self.proj.y_out if not x_out or not y_out: return else: points = np.vstack((x_out,y_out)).T np.savetxt(p_file, points, fmt='%.2f')
[docs] def make_wave_files(self, p_case, waves_event, time, bnd): 'Generate event wave files (swan compatible)' # wave variables hs = waves_event.hs.values[:] per = waves_event.per.values[:] direc = waves_event.dir.values[:] spr = waves_event.spr.values[:] # csv file num_data = len(time) data = np.zeros((num_data, 5)) data[:, 0] = time data[:, 1] = hs data[:, 2] = per data[:, 3] = direc data[:, 4] = spr # Copy file for all boundaries save = op.join(p_case, 'series_waves.dat') np.savetxt(save, data, header='TPAR', comments='', fmt='%8.4f %2.3f %2.3f %3.2f %3.1f') for i in bnd: su.copyfile(save, op.join(p_case, 'series_waves_{0}.dat'.format(i)))
[docs] def make_wind_files(self, p_case, waves_event): ''' Generate event wind mesh files (swan compatible) uses wave_event U10 and V10 values at the entire SWAN comp. grid ''' # wind variables u10 = waves_event.U10.values[:] v10 = waves_event.V10.values[:] # main mesh mm = self.proj.mesh_main # each time needs 2D (mesh) wind files (U,V) mxc = mm.cg['mxc'] # number mesh x myc = mm.cg['myc'] # number mesh y txt = '' for c, (u, v) in enumerate(zip(u10,v10)): # single point wind -> entire SWAN comp.grid wind aux = np.ones((mxc, myc)) # TODO: wind has to be rotated if alpc != 0 # csv file u_2d = aux * u v_2d = aux * v u_v_stack = np.vstack((u_2d, v_2d)) save = op.join(p_case, 'wind_{0:06}.dat'.format(c)) np.savetxt(save, u_v_stack, fmt='%.2f') # wind list file txt += 'wind_{0:06}.dat\n'.format(c) # winds file path save = op.join(p_case, 'series_wind.dat') with open(save, 'w') as f: f.write(txt)
[docs] def make_vortex_files(self, p_case, storm_track): ''' Generate event wind mesh files (swan compatible) uses wave_event storm path data over SWAN computational grid needs SPHERICAL COORDINATES ''' # parameters RE = 6378.135 # Earth radius # wind variables storm_move = storm_track.move.values[:] storm_vf = storm_track.vf.values[:] storm_lon = storm_track.lon.values[:] storm_lat = storm_track.lat.values[:] storm_pn = storm_track.pn.values[:] storm_p0 = storm_track.p0.values[:] times = storm_track.index[:] # main mesh mm = self.proj.mesh_main # comp. grid for generating vortex wind files mxc = mm.cg['mxc'] # number mesh x myc = mm.cg['myc'] # number mesh y # comp. grid lat, lon limits lon0 = mm.cg['xpc'] lat0 = mm.cg['ypc'] lon1 = mm.cg['xpc'] + mm.cg['xlenc'] lat1 = mm.cg['ypc'] + mm.cg['ylenc'] cg_lon = np.linspace(lon0, lon1, mxc) cg_lat = np.linspace(lat0, lat1, myc) mg_lon, mg_lat = np.meshgrid(cg_lon, cg_lat) # wind output holder hld_W = np.zeros((len(cg_lat), len(cg_lon), len(storm_move))) hld_D = np.zeros((len(cg_lat), len(cg_lon), len(storm_move))) # each time needs 2D (mesh) wind files (U,V) txt = '' for c, (lo, la, p0, pn, move, vf) in enumerate(zip( storm_lon, storm_lat, storm_p0, storm_pn, storm_move, storm_vf)): # get distance and angle between points arcl, beta = geo_distance_azimuth(mg_lat, mg_lon, la, lo) r = arcl * np.pi / 180.0 * RE if p0 < 900: p0 = 900 # fix p0 # Silva et al. 2010 RC = 0.4785 * p0 - 413.01 # TODO usar otro radio ciclostrofico? # Hydromet Rankin-Vortex model (eq. 76) pr = p0 + (pn - p0) * np.exp(-2*RC/r) py, px = np.gradient(pr) ang = np.arctan2(py, px) + np.sign(la) * np.pi/2.0 # Wind model w = 0.2618 # velocidad angular Earth (rad/h) f = 2 * w * np.sin(la*np.pi/180) # coriolis ur = 21.8 * np.sqrt(pn-p0) - 0.5 * f * RC # wind max grad (km/h) fv = np.zeros(mg_lon.shape) s1 = r/RC < 1 # eq. (9) Rodo (2009) fv[s1] = 1 - 0.971 * np.exp(-6.826 * np.power(r[s1]/RC, 4.798)) s2 = r/RC >=1 # eq. (10) Rodo (2009) nc = (f*RC)/ur A = -0.99 * (1.066-np.exp(-1.936*nc)) B = -0.357 * (1.4456-np.exp(-5.2388*nc)) fv[s2] = np.exp(A*np.power(np.log(r[s2]/RC),3) * \ np.exp(B*np.log(r[s2]/RC))) abnaut = move + beta ab = np.remainder(-abnaut+270, 360) *np.pi/180 # nautical to cartesian W = 0.986 * (fv*ur + 0.5*vf * np.cos(ab-np.pi/2)) W[W<0] = 0 # TODO: wind has to be rotated if alpc != 0 # csv file u_2d = W * np.cos(ang) / 3.6 # km/h --> m/s v_2d = W * np.sin(ang) / 3.6 # km/h --> m/s u_v_stack = np.vstack((u_2d, v_2d)) save = op.join(p_case, 'wind_{0:06}.dat'.format(c)) np.savetxt(save, u_v_stack, fmt='%.2f') # wind list file txt += 'wind_{0:06}.dat\n'.format(c) # hold wind data (m/s) hld_W[:,:,c] = W / 3.6 # km/h --> m/s hld_D[:,:,c] = 270 - np.rad2deg(ang) # direction (º clock. rel. north) # winds file path save = op.join(p_case, 'series_wind.dat') with open(save, 'w') as f: f.write(txt) # aux. save vortex wind fields p_vortex = op.join(p_case, 'vortex_wind.nc') xds_vortex = xr.Dataset( { 'W': (('lat','lon','time'), hld_W, {'units':'m/s'}), 'Dir': (('lat','lon','time'), hld_D, {'units':'º'}) }, coords={ 'Y' : cg_lat, 'X' : cg_lon, 'time' : times, } ) xds_vortex.attrs['xlabel'] = 'Longitude (º)' xds_vortex.attrs['ylabel'] = 'Latitude (º)' xds_vortex.to_netcdf(p_vortex)
[docs] def make_level_files(self, p_case, wave_event): 'Generate event level mesh files (swan compatible)' # parse pandas time index to swan iso format swan_iso_fmt = '%Y%m%d.%H%M' time = pd.to_datetime(wave_event.index).strftime(swan_iso_fmt).values[:] # level variables zeta = wave_event.level.values[:] tide = wave_event.tide.values[:] # main mesh mm = self.proj.mesh_main # each time needs 2D (mesh) level mxc = mm.cg['mxc'] # number mesh x myc = mm.cg['myc'] # number mesh y txt = '' for c, (z, t) in enumerate(zip(zeta, tide)): # single point level -> entire SWAN comp.grid level aux = np.ones((mxc, myc)).T # csv file l = z + t # total level l_2d = aux * l save = op.join(p_case, 'level_{0:06}.dat'.format(c)) np.savetxt(save, l_2d, fmt='%.2f') # level list file txt += 'level_{0:06}.dat\n'.format(c) # waves file path save = op.join(p_case, 'series_level.dat') with open(save, 'w') as f: f.write(txt)
[docs] def make_input(self, p_file, id_run, time, make_waves=True, make_winds=True, wvs_bnd=['N', 'E', 'W', 'S']): ''' Writes input.swn file from waves event for non-stationary execution p_file - input.swn file path time - event time at swan iso format make_waves - activates waves input files generation (at waves_bnd) make_winds - activates wind input files generation more info: http://swanmodel.sourceforge.net/online_doc/swanuse/node23.html ''' # event time (swan iso format) t0_iso = time[0] t1_iso = time[-1] # .swn file parameters sea_level = self.proj.params['sea_level'] jonswap_gamma = self.proj.params['jonswap_gamma'] cdcap = self.proj.params['cdcap'] maxerr = self.proj.params['maxerr'] coords_spherical = self.proj.params['coords_spherical'] waves_period = self.proj.params['waves_period'] # main mesh mm = self.proj.mesh_main # output points x_out = self.proj.x_out y_out = self.proj.y_out # computational data dt_comp = 5 # time step (minutes) # .swn text file t = "PROJ '{0}' '{1}'\n$\n".format(self.proj.name, id_run) t += 'MODE NONSTAT\n' # spherical coordinates (mercator) swich if coords_spherical: t += 'COORDINATES SPHER CCM\n' # cdcap cdcap_str = '' if cdcap: cdcap_str = 'cdcap={0}'.format(cdcap) # max error (caution) maxerr_str = '' if maxerr: maxerr_str = 'maxerr={0}'.format(maxerr) # set level and cdcap (if available) t += 'SET level={0} {1} {2} NAUTICAL\n$\n'.format( sea_level, cdcap_str, maxerr_str ) # computational grid t += 'CGRID REGULAR {0} {1} {2} {3} {4} {5} {6} CIRCLE 72 0.0345 1.00 34\n$\n'.format( mm.cg['xpc'], mm.cg['ypc'], mm.cg['alpc'], mm.cg['xlenc'], mm.cg['ylenc'], mm.cg['mxc']-1, mm.cg['myc']-1) # bathymetry t += 'INPGRID BOTTOM REGULAR {0} {1} {2} {3} {4} {5} {6}\n'.format( mm.dg['xpc'], mm.dg['ypc'], mm.dg['alpc'], mm.dg['mxc'], mm.dg['myc'], mm.dg['dxinp'], mm.dg['dyinp']) t += "READINP BOTTOM 1 '{0}' {1} 0 FREE\n$\n".format( mm.depth_fn, mm.dg_idla) # wind t += 'INPGRID WIND REGULAR {0} {1} {2} {3} {4} {5} {6} NONSTAT {7} 1 HR {8}\n'.format( mm.cg['xpc'], mm.cg['ypc'], mm.cg['alpc'], mm.cg['mxc']-1, mm.cg['myc']-1, mm.cg['dxinp'], mm.cg['dyinp'], t0_iso, t1_iso) t += "READINP WIND 1. SERIES '{0}' 3 0 FREE\n$\n".format('series_wind.dat') # level t += 'INPGRID WLEV REGULAR {0} {1} {2} {3} {4} {5} {6} NONSTAT {7} 1 HR {8}\n'.format( mm.cg['xpc'], mm.cg['ypc'], mm.cg['alpc'], mm.cg['mxc']-1, mm.cg['myc']-1, mm.cg['dxinp'], mm.cg['dyinp'], t0_iso, t1_iso) t += "READINP WLEV 1. SERIES '{0}' 3 0 FREE\n$\n".format('series_level.dat') # waves boundary conditions if make_waves: t += 'BOUND SHAPespec JONswap {0} {1} DSPR DEGR\n'.format( jonswap_gamma, waves_period) for ic in wvs_bnd: t += "BOUN SIDE {0} CONstant FILE 'series_waves_{0}.dat'\n".format(ic) # numerics & physics t += 'WIND DRAG WU\n' t += 'GEN3 ST6 5.7E-7 8.0E-6 4.0 4.0 UP HWANG VECTAU TRUE10\n' t += 'SSWELL\n' t += 'QUAD iquad=8\n' t += 'WCAP\n' t += 'PROP BSBT\n' if not coords_spherical: t += 'SETUP\n' # not compatible with spherical t += 'BREA\n' t += 'FRICTION JONSWAP\n$\n' t += 'TRIADS\n' t += 'DIFFRAC\n' # output t += "BLOCK 'COMPGRID' NOHEAD '{0}' LAY 3 HSIGN TM02 DIR TPS DSPR OUT {1} 1.0 HR\n$\n".format( mm.output_fn, t0_iso) # output points if not x_out or not y_out: pass else: t += "POINTS 'outpts' FILE 'points_out.dat'\n" t += "TABLE 'outpts' NOHEAD 'table_outpts.dat' DEP HS HSWELL DIR RTP TM02 DSPR WIND WATLEV OUT {0} {1} MIN\n$\n".format(t0_iso, dt_comp) # compute t += 'TEST 1,0\n' t += 'COMPUTE NONSTAT {0} {1} MIN {2}\n'.format(t0_iso, dt_comp, t1_iso) t += 'STOP\n$\n' # write file: with open(p_file, 'w') as f: f.write(t)
[docs] def build_case(self, case_id, waves_event, storm_track=None, make_waves=True, make_winds=True, waves_bnd=['N', 'E', 'W', 'S']): ''' Build SWAN NONSTAT case input files for given wave dataset case_id - SWAN case index (int) waves_event - waves event time series (pandas.Dataframe) also contains level, tide and wind (not storm track) variables [n x 8] (hs, per, dir, spr, U10, V10, level, tide) storm_track - None / storm track time series (pandas.Dataframe) storm_track generated winds have priority over waves_event winds [n x 6] (move, vf, lon, lat, pn, p0) ''' # SWAN case path p_case = op.join(self.proj.p_cases, case_id) # make execution dir if not op.isdir(p_case): os.makedirs(p_case) # make depth file for main mesh self.proj.mesh_main.export_dat(p_case) # make output points file self.make_out_points(op.join(p_case, 'points_out.dat')) # parse pandas time index to swan iso format swan_iso_fmt = '%Y%m%d.%H%M' time_swan = pd.to_datetime(waves_event.index).strftime(swan_iso_fmt).values[:] # make wave files if make_waves: self.make_wave_files(p_case, waves_event, time_swan, waves_bnd) # make wind files # TODO: vortex model, if active, will override wind files if make_winds: self.make_wind_files(p_case, waves_event) # vortex model for storm tracks if isinstance(storm_track, pd.DataFrame): self.make_vortex_files(p_case, storm_track) # make water level files self.make_level_files(p_case, waves_event) # make input.swn file self.make_input( op.join(p_case, 'input.swn'), case_id, time_swan, make_waves = make_waves, make_winds = make_winds, )
# TODO: add optional nested mesh depth and input files
[docs] def outmat2xr(self, p_mat): # matlab dictionary dmat = loadmat(p_mat) # get dates from one key hsfs = sorted([x for x in dmat.keys() if 'Hsig' in x]) dates_str = ['_'.join(x.split('_')[1:]) for x in hsfs] dates = [datetime.strptime(s,'%Y%m%d_%H%M%S') for s in dates_str] # read times l_times = [] for ds in dates_str: xds_t = xr.Dataset( { 'Hsig': (('X','Y',), dmat['Hsig_{0}'.format(ds)].T, {'units':'m'}), 'Tm02': (('X','Y',), dmat['Tm02_{0}'.format(ds)].T, {'units':'s'}), 'Dir': (('X','Y',), dmat['Dir_{0}'.format(ds)].T, {'units':'º'}), 'Dspr': (('X','Y',), dmat['Dspr_{0}'.format(ds)].T, {'units':'º'}), 'TPsmoo': (('X','Y',), dmat['TPsmoo_{0}'.format(ds)].T, {'units':'s'}), } ) l_times.append(xds_t) # join at times dim xds_out = xr.concat(l_times, dim='time') xds_out = xds_out.assign_coords(time=dates) return xds_out
[docs] def output_case(self, p_case, mesh): 'read .mat output file from non-stationary and returns xarray.Dataset' # extract output from selected mesh p_mat = op.join(p_case, mesh.output_fn) xds_out = self.outmat2xr(p_mat) # set X and Y values X, Y = mesh.get_XY() xds_out = xds_out.assign_coords(X=X) xds_out = xds_out.assign_coords(Y=Y) # rename to longitude latitude in spherical coords cases coords_spherical = self.proj.params['coords_spherical'] if coords_spherical != None: xds_out = xds_out.rename({'X':'lon', 'Y':'lat'}) return xds_out
[docs] def get_t0_dt(self, p_input): 'gets output points time_ini and delta_time (min) from SWAN input.swn file' # read input.swn and file data with open(p_input, 'r') as fR: ls = fR.readlines() lx = [x for x in ls if x.startswith('TABLE')][0].split(' ') t0_str = lx[-3] # start date dt_min = lx[-2] # dt (minutes) swan_iso_fmt = '%Y%m%d.%H%M' t0 = datetime.strptime(t0_str, swan_iso_fmt) return t0, dt_min
[docs] def output_points(self, p_case): 'read table_outpts.dat output file and returns xarray.Dataset' p_dat = op.join(p_case, 'table_outpts.dat') # variable names names = ['DEP', 'HS', 'HSWELL', 'DIR', 'RTP', 'TM02', 'DSPR', 'WIND', 'WATLEV', 'OUT' ] x_out = self.proj.x_out y_out = self.proj.y_out # points are mixed at output file np_pts = np.genfromtxt(p_dat) n_rows = np_pts.shape[0] # number of points n_pts = len(x_out) l_xds_pts = [] for i in range(n_pts): ix_p = np.arange(i, n_rows, n_pts) np_pti = np_pts[ix_p, :] xds_pti = xr.Dataset({}) #, coords='time') for c, n in enumerate(names): xds_pti[n] = (('time'), np_pti[:,c]) l_xds_pts.append(xds_pti) xds_out = xr.concat(l_xds_pts, dim='point') # add point x and y xds_out['x_point'] = (('point'), x_out) xds_out['y_point'] = (('point'), y_out) # add times dim values t0, dt_min = self.get_t0_dt(op.join(p_case, 'input.swn')) time_out = pd.date_range(t0, periods=len(xds_out.time), freq='{0}min'.format(dt_min)) xds_out = xds_out.assign_coords(time=time_out) return xds_out