# Source code for bluemath_tk.tcs.shytcwaves

import datetime
import os.path as op
from typing import List, Optional, Tuple

import numpy as np
import pandas as pd
import xarray as xr
from tqdm import tqdm

from ..core.constants import EARTH_RADIUS
from ..core.geo import geodesic_azimuth, geodesic_distance_azimuth, shoot
from ..datamining.mda import find_nearest_indices
from ..waves.superpoint import superpoint_calculation
from .tracks import (
    PATHS,
    get_vmean,
    historic_track_interpolation,
    historic_track_preprocessing,
    track_triming,
)

###############################################################################
# STOPMOTION functions
# functions to preprocess and interpolate coordinates at SWAN computational
# timesteps for any storm track, according to the SHyTCWaves methodology
# units: a 6-hour target segment preceded by a 24-hour warmup segment

# storm2stopmotion --> parameterization of a storm track into segments
# stopmotion_interpolation --> generate stop-motion events
###############################################################################
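
# Illustrative sketch (not part of the module): a minimal end-to-end use of
# the two functions defined below, assuming a hypothetical 6-hourly track
# DataFrame `df_track` indexed by time with columns lon/lat/p0/vmax/rmw:
#
#   df_seg = storm2stopmotion(df_track)           # 24h warmup + 6h target units
#   st_list, we_list = stopmotion_interpolation(  # one SWAN case per segment
#       df_seg, st=df_track
#   )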


def storm2stopmotion(df_storm: pd.DataFrame) -> pd.DataFrame:
    """
    Generate stopmotion segments from a storm track.

    Parameters
    ----------
    df_storm : pd.DataFrame
        Storm track DataFrame containing:
        - lon : Longitude coordinates
        - lat : Latitude coordinates
        - p0 : Minimum central pressure (mbar)
        - vmax : Maximum sustained winds (kt)
        - rmw : Radius of maximum winds (nmile)
        - vmaxfill : Boolean indicating if winds were filled with estimates
        - rmwfill : Boolean indicating if RMW was filled with estimates

    Returns
    -------
    pd.DataFrame
        Stopmotion segments with columns:
        - vseg : Mean translational speed (kt)
        - dvseg : Speed variation (kt)
        - pseg : Segment pressure (mbar)
        - dpseg : Pressure variation (mbar)
        - wseg : Segment maximum winds (kt)
        - dwseg : Wind variation (kt)
        - rseg : Segment RMW (nmile)
        - drseg : RMW variation (nmile)
        - aseg : Azimuth from geographic North (degrees)
        - daseg : Azimuth variation (degrees)
        - lseg : Latitude (degrees)
        - laseg : Absolute latitude (degrees)
        - dlaseg : Latitude variation (degrees)
        - lon_w : Warmup origin longitude
        - lat_w : Warmup origin latitude
        - lon_i : Target origin longitude
        - lat_i : Target origin latitude
        - lon_t : Target endpoint longitude
        - lat_t : Target endpoint latitude

    Notes
    -----
    The function generates stopmotion segments following the SHyTCWaves
    methodology from 6-hour storm track segments:

    A. Warmup segment (24h):
        - 4 segments to define start/end coordinates
        - Defines {Vmean, relative angle}
        - Last 4th segment defines mean {Pmin, Wmax, Rmw}
        - Endpoint defines {lat}

    B. Target segment (6h):
        Defines variations {dP, dV, dW, dR, dAng}

    The absolute value of latitude is stored (start of target segment).
    The relative angle is referenced to geographic north (southern
    hemisphere values are multiplied by -1).
    """

    # remove NaNs
    df_ = df_storm.dropna()
    df_["time"] = df_.index.values

    # constant segments variables
    lon = df_["lon"].values[:]
    lat = df_["lat"].values[:]
    pres = df_["p0"].values[:]  # [mbar]
    wind = df_["vmax"].values[:]  # [kt]
    rmw = df_["rmw"].values[:]  # [nmile]

    # generate stopmotion segments: 24h warmup + 6h target segments
    # timestep [hours]
    dt = np.diff(df_.index) / np.timedelta64(1, "h")

    # warmup 4-segments (24h) variables
    lo0 = np.full(df_.shape[0], np.nan)  # warmup x-coordinate
    la0 = np.full(df_.shape[0], np.nan)  # warmup y-coordinate
    vseg = np.full(df_.shape[0], np.nan)  # mean translational speed
    vxseg = np.full(df_.shape[0], np.nan)  # (dirx)
    vyseg = np.full(df_.shape[0], np.nan)  # (diry)
    aseg = np.full(df_.shape[0], np.nan)  # azimuth, geographic North

    # warmup last-segment (6h) variables
    pseg = np.full(df_.shape[0], np.nan)  # segment pressure
    wseg = np.full(df_.shape[0], np.nan)  # segment maxwinds
    rseg = np.full(df_.shape[0], np.nan)  # segment radii rmw
    lseg = np.full(df_.shape[0], np.nan)  # latitude (north hemisphere)
    laseg = np.full(df_.shape[0], np.nan)  # (absolute)

    # target segment (6h) variables
    lo1 = np.full(df_.shape[0], np.nan)  # target origin x-coordinate
    la1 = np.full(df_.shape[0], np.nan)  # idem y-coordinate
    dv = np.full(df_.shape[0], np.nan)  # translational speed variation
    dvx = np.full(df_.shape[0], np.nan)  # (dirx)
    dvy = np.full(df_.shape[0], np.nan)  # (diry)
    da = np.full(df_.shape[0], np.nan)  # azimuth variation
    dp = np.full(df_.shape[0], np.nan)  # pressure variation
    dw = np.full(df_.shape[0], np.nan)  # maxwinds variation
    dr = np.full(df_.shape[0], np.nan)  # radii rmw variation
    dl = np.full(df_.shape[0], np.nan)  # latitude variation
    dla = np.full(df_.shape[0], np.nan)  # (absolute)
    lo2 = np.full(df_.shape[0], np.nan)  # target endpoint x-coordinate
    la2 = np.full(df_.shape[0], np.nan)  # idem y-coordinate

    # loop
    for i in np.arange(1, dt.size):
        # get stopmotion endpoints coordinates (24h+6h)
        if i < 4:  # < four preceding segments
            # number of "missing" preceding segments to last 24h
            n_missing = 4 - i

            # last available preceding segment
            lon1, lon2 = lon[1], lon[0]
            lat1, lat2 = lat[1], lat[0]

            # distance of last available preceding segment
            arcl_h, gamma_h = geodesic_distance_azimuth(lat2, lon2, lat1, lon1)
            r = arcl_h * np.pi / 180.0 * EARTH_RADIUS  # distance [km]

            # shoot backwards to calculate (lo0, la0) of 24h preceding warmup
            dist = r * n_missing
            glon, glat, baz = shoot(lon2, lat2, gamma_h + 180, dist)

            # endpoint coordinates (-24h, 0h, 6h)
            lon_0, lon_i, lon_i1 = glon, lon[i], lon[i + 1]
            lat_0, lat_i, lat_i1 = glat, lat[i], lat[i + 1]

        if i >= 4:  # >= four preceding segments
            # endpoint coordinates (-24h, 0h, 6h)
            lon_0, lon_i, lon_i1 = lon[i - 4], lon[i], lon[i + 1]
            lat_0, lat_i, lat_i1 = lat[i - 4], lat[i], lat[i + 1]

        # segment endpoints
        lo0[i], lo1[i], lo2[i] = lon_0, lon_i, lon_i1
        la0[i], la1[i], la2[i] = lat_0, lat_i, lat_i1

        # warmup 4-segments (24h) variables
        _, vseg[i], vxseg[i], vyseg[i] = get_vmean(lat_0, lon_0, lat_i, lon_i, 24)
        aseg[i] = geodesic_azimuth(lat_0, lon_0, lat_i, lon_i)
        # aseg[i] = calculate_azimut(lon_0, lat_0, lon_i, lat_i)

        # warmup last-segment (6h) variables
        pseg[i] = pres[i - 1]
        wseg[i] = wind[i - 1]
        rseg[i] = rmw[i - 1]
        lseg[i] = lat_i
        laseg[i] = np.abs(lat_i)

        # target segment (6h) variables
        _, v, vx, vy = get_vmean(lat_i, lon_i, lat_i1, lon_i1, dt[i : i + 1].sum())
        dv[i] = v - vseg[i]  # [km/h]
        dvx[i] = vx - vxseg[i]
        dvy[i] = vy - vyseg[i]
        dp[i] = pres[i] - pres[i - 1]  # [mbar]
        dw[i] = wind[i] - wind[i - 1]  # [kt]
        dr[i] = rmw[i] - rmw[i - 1]  # [nmile]
        dl[i] = lat_i1 - lat_i  # [º]
        dla[i] = np.abs(dl[i])

        # angle variation
        ang1 = aseg[i]
        ang2 = geodesic_azimuth(lat_i, lon_i, lat_i1, lon_i1)
        # ang2 = calculate_azimut(lon_i, lat_i, lon_i1, lat_i1)

        dt_ang = ang2 - ang1  # [º]
        sign = np.sign(lseg[i])  # hemisphere: north (+), south (-)

        if (ang2 > ang1) & (dt_ang < 180):
            da[i] = sign * (dt_ang)
        elif (ang2 > ang1) & (dt_ang > 180):
            da[i] = sign * (dt_ang - 360)
        elif (ang2 < ang1) & (dt_ang > -180):
            da[i] = sign * (dt_ang)
        elif (ang2 < ang1) & (dt_ang < -180):
            da[i] = sign * (dt_ang + 360)

    # add to dataframe
    df_["vseg"] = vseg / 1.852  # [kt]
    # df_['vxseg'] = vxseg / 1.852
    # df_['vyseg'] = vyseg / 1.852
    df_["dvseg"] = dv / 1.852
    # df_['dvxseg'] = dvx / 1.852
    # df_['dvyseg'] = dvy / 1.852
    df_["pseg"] = pseg  # [mbar]
    df_["dpseg"] = dp
    df_["wseg"] = wseg  # [kt, 1-min avg]
    df_["dwseg"] = dw
    df_["rseg"] = rseg  # [nmile]
    df_["drseg"] = dr
    df_["aseg"] = aseg  # [º]
    df_["daseg"] = da
    df_["lseg"] = lseg  # [º]
    df_["laseg"] = laseg
    df_["dlaseg"] = dla
    df_["lon_w"] = lo0  # warmup origin
    df_["lat_w"] = la0
    df_["lon_i"] = lo1  # target origin
    df_["lat_i"] = la1
    df_["lon_t"] = lo2  # target endpoint
    df_["lat_t"] = la2

    return df_
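
# Note: the angle-variation branching above is equivalent (up to the
# untreated edge cases ang2 == ang1 and |dt_ang| == 180, which are left as
# NaN) to wrapping the azimuth difference into the interval (-180, 180):
#
#   da[i] = sign * ((ang2 - ang1 + 180) % 360 - 180)
#
# e.g. ang1 = 350, ang2 = 10 gives dt_ang = -340, which falls in the
# (ang2 < ang1) & (dt_ang < -180) branch, so da = sign * (-340 + 360),
# i.e. a 20º clockwise turn seen from the northern hemisphere.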
def stopmotion_interpolation(
    df_seg: pd.DataFrame,
    st: Optional[pd.DataFrame] = None,
    t_warm: int = 24,
    t_seg: int = 6,
    t_prop: int = 42,
) -> Tuple[List[pd.DataFrame], List[pd.DataFrame]]:
    """
    Generate SWAN cases in cartesian coordinates from stopmotion
    parameterized segments.

    Parameters
    ----------
    df_seg : pd.DataFrame
        Stopmotion segments containing:
        - vseg : Mean translational speed (kt)
        - pseg : Segment pressure (mbar)
        - wseg : Maximum winds (kt)
        - rseg : RMW (nmile)
        - laseg : Absolute latitude (degrees)
        - dvseg : Speed variation (kt)
        - dpseg : Pressure variation (mbar)
        - dwseg : Wind variation (kt)
        - drseg : RMW variation (nmile)
        - daseg : Azimuth variation (degrees)
    st : pd.DataFrame, optional
        Real storm track data. If None, MDA segments are used
        (unrelated to historic tracks).
    t_warm : int, optional
        Warmup period in hours. Default is 24.
    t_seg : int, optional
        Target period in hours. Default is 6.
    t_prop : int, optional
        Propagation period in hours. Default is 42.

    Returns
    -------
    Tuple[List[pd.DataFrame], List[pd.DataFrame]]
        Two lists containing:
        1. List of storm track DataFrames with columns:
            - x, y : Cartesian coordinates (m)
            - lon, lat : Geographic coordinates (degrees)
            - vf : Translation speed (kt)
            - vfx, vfy : Velocity components (kt)
            - pn : Surface pressure (1013 mbar)
            - p0 : Minimum central pressure (mbar)
            - vmax : Maximum winds (kt)
            - rmw : RMW (nmile)
            - latitude : Latitude with hemisphere sign
        2. List of empty wave event DataFrames with columns:
            - hs, t02, dir, spr : Wave parameters
            - U10, V10 : Wind components
            - level, tide : Water level parameters

    Notes
    -----
    The function generates SWAN cases in cartesian coordinates following
    the SHyTCWaves configuration:

    A. Warmup period (24h): Over the negative x-axis ending at (x,y)=(0,0)
    B. Target period (6h): Starting at (x,y)=(0,0)
    C. Propagation period (42h): No track coordinates (no wind forcing)
    """

    # sign hemisphere (+north, -south)
    if isinstance(st, pd.DataFrame):  # historic track
        sign = np.sign(st["lat"].values[0])
        method = st.attrs["method"]
        center = st.attrs["center"]
    else:  # mda cases
        sign = 1
        method = "mda"
        center = "mda"

    # remove NaNs
    df = df_seg.dropna()

    # number of stopmotion events
    N = df.shape[0]

    # list of SWAN cases (parameterized events)
    st_list, we_list = [], []
    for i in range(N):
        seg_i = df.iloc[i]

        # stopmotion unit parameters
        vseg = seg_i["vseg"] * 1.852  # [km/h]
        dvseg = seg_i["dvseg"] * 1.852
        pseg = seg_i["pseg"]  # [mbar]
        dpseg = seg_i["dpseg"]
        wseg = seg_i["wseg"]  # [kt, 1-min avg]
        dwseg = seg_i["dwseg"]
        rseg = seg_i["rseg"]  # [nmile]
        drseg = seg_i["drseg"]
        daseg = seg_i["daseg"]  # [º]
        laseg = seg_i["laseg"]  # [º]

        # vmean criteria for SWAN computational timestep [minutes]
        seg_vmean = vseg + dvseg
        if (vseg > 20) or (seg_vmean > 20):
            dt_comp = 10
        else:
            dt_comp = 20

        # time array for SWAN input
        ts = t_warm + t_seg + t_prop  # [h] simulation duration
        ts = np.asarray(ts) * 60 / dt_comp  # number of computational intervals
        ts_warmup = int(t_warm * 60 / dt_comp)
        ts_segment = int(t_seg * 60 / dt_comp)

        # random initial date
        date_ini = pd.Timestamp(1999, 12, 31, 0)
        time_input = pd.date_range(
            date_ini, periods=int(ts), freq="{0}MIN".format(dt_comp)
        )
        time_input = np.array(time_input)

        # vortex input variables
        x = np.full(int(ts), np.nan)  # [m]
        y = np.full(int(ts), np.nan)  # [m]
        vmean = np.full(int(ts), np.nan)  # [km/h]
        ut = np.full(int(ts), np.nan)
        vt = np.full(int(ts), np.nan)
        p0 = np.full(int(ts), np.nan)  # [mbar]
        vmax = np.full(int(ts), np.nan)  # [kt, 1-min avg]
        rmw = np.full(int(ts), np.nan)  # [nmile]
        lat = np.full(int(ts), np.nan)  # [º]

        # (A) preceding 24h segment: over negative x-axis ending at (x,y)=(0,0)
        for j in np.arange(0, ts_warmup):
            if j == 0:
                x[j] = -vseg * 24 * 10**3
            else:
                x[j] = x[j - 1] + vseg * (dt_comp / 60) * 10**3
            y[j] = 0
            vmean[j] = vseg
            ut[j] = vseg
            vt[j] = 0
            p0[j] = pseg
            vmax[j] = wseg
            rmw[j] = rseg
            lat[j] = laseg

        # (B) target 6h segment: starting at (x,y)=(0,0)
        for j in np.arange(ts_warmup, ts_warmup + ts_segment):
            vel = vseg + dvseg  # [km/h]
            velx = vel * np.sin((daseg * sign + 90) * np.pi / 180)
            vely = vel * np.cos((daseg * sign + 90) * np.pi / 180)
            x[j] = x[j - 1] + velx * (dt_comp / 60) * 10**3
            y[j] = y[j - 1] + vely * (dt_comp / 60) * 10**3
            vmean[j] = vel
            ut[j] = velx
            vt[j] = vely
            p0[j] = pseg + dpseg
            vmax[j] = wseg + dwseg
            rmw[j] = rseg + drseg
            lat[j] = laseg

        # (C) propagation 42h segment: remaining values of data arrays

        # store dataframe
        st_seg = pd.DataFrame(
            index=time_input,
            columns=["x", "y", "vf", "vfx", "vfy", "pn", "p0", "vmax", "rmw", "lat"],
        )
        st_seg["x"] = x  # [m]
        st_seg["y"] = y
        st_seg["lon"] = x  # (idem for plots)
        st_seg["lat"] = y
        st_seg["vf"] = vmean / 1.852  # [kt]
        st_seg["vfx"] = ut / 1.852
        st_seg["vfy"] = vt / 1.852
        st_seg["pn"] = 1013  # [mbar]
        st_seg["p0"] = p0
        st_seg["vmax"] = vmax  # [kt]
        st_seg["rmw"] = rmw  # [nmile]
        st_seg["latitude"] = lat * sign  # [º]

        # add metadata
        st_seg.attrs = {
            "method": method,
            "center": center,
            "override_dtcomp": "{0} MIN".format(dt_comp),
            "x0": 0,
            "y0": 0,
            "p0": "mbar",
            "vf": "kt",
            "vmax": "kt, 1-min avg",
            "rmw": "nmile",
        }

        # append to stopmotion event list
        st_list.append(st_seg)

        # generate wave event (empty)
        we = pd.DataFrame(
            index=time_input, columns=["hs", "t02", "dir", "spr", "U10", "V10"]
        )
        we["level"] = 0
        we["tide"] = 0
        we_list.append(we)

    return st_list, we_list
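
# Geometry check for a single case (a hedged, self-contained sketch; the
# vseg value of 10 kt is hypothetical): the warmup leg spans
# 10 * 1.852 * 24 = 444.48 km along the negative x-axis, so the first track
# node sits at x = -444.48e3 m and the target segment starts at (0, 0):
#
#   import numpy as np
#   vseg_kmh = 10 * 1.852                 # [km/h]
#   x0 = -vseg_kmh * 24 * 10**3           # first warmup node [m]
#   assert np.isclose(x0, -444.48e3)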
###############################################################################
# STOPMOTION ensemble
# functions that collect 6h-segments from library cases to obtain the hybrid
# storm track (analogue or closest to the real track); those segments are
# rotated and assigned time-geographical coordinates. The ensemble and the
# envelope are calculated at each control point
###############################################################################
def find_analogue(
    df_library: pd.DataFrame, df_case: pd.DataFrame, ix_weights: List[float]
) -> np.ndarray:
    """
    Find the minimum distance in a 10-dimensional normalized space.

    Parameters
    ----------
    df_library : pd.DataFrame
        Library parameters DataFrame containing the 10 SHyTCWaves parameters:
        {pseg, vseg, wseg, rseg, dp, dv, dw, dr, dA, lat}
    df_case : pd.DataFrame
        Target case parameters DataFrame with same structure as df_library.
    ix_weights : List[float]
        Weight factors for each parameter dimension.

    Returns
    -------
    np.ndarray
        Indices of nearest points in the library for each case point.

    Notes
    -----
    The function finds the minimum distance in a normalized space
    corresponding to the 10 SHyTCWaves parameters:
    - pseg : Segment pressure
    - vseg : Mean translational speed
    - wseg : Maximum winds
    - rseg : RMW
    - dp : Pressure variation
    - dv : Speed variation
    - dw : Wind variation
    - dr : RMW variation
    - dA : Azimuth variation
    - lat : Latitude
    """

    # remove NaNs from storm segments
    df_case = df_case.dropna()
    data_case = df_case[
        [
            "daseg",
            "dpseg",
            "pseg",
            "dwseg",
            "wseg",
            "dvseg",
            "vseg",
            "drseg",
            "rseg",
            "laseg",
        ]
    ].values

    # library segments parameters
    data_lib = df_library[
        [
            "daseg",
            "dpseg",
            "pseg",
            "dwseg",
            "wseg",
            "dvseg",
            "vseg",
            "drseg",
            "rseg",
            "laseg",
        ]
    ].values

    ix_directional = [0]  # daseg is a circular variable

    # get indices of nearest n-dimensional point
    ix_near = find_nearest_indices(
        query_points=data_case,
        reference_points=data_lib,
        directional_indices=ix_directional,
        weights=ix_weights,
    )

    return ix_near
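
# Minimal sketch of the analogue lookup (illustrative; this mirrors the call
# made in historic2shytcwaves_cluster below, where the MDA parameter library
# shipped with SHyTCWaves is loaded as a DataFrame):
#
#   df_mda = xr.open_dataset(PATHS["SHYTCWAVES_MDA"]).to_dataframe()
#   ix = find_analogue(df_mda, df_seg, ix_weights=[1] * 10)  # equal weights
#   df_analogue = df_mda.iloc[ix]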
def analogue_endpoints(df_seg: pd.DataFrame, df_analogue: pd.DataFrame) -> pd.DataFrame:
    """
    Add segment endpoint coordinates by looking up the real target segment.

    Parameters
    ----------
    df_seg : pd.DataFrame
        Parameterized historical storm track DataFrame containing:
        - lon, lat : Target origin coordinates
        - aseg : Warmup azimuth
        - daseg : Target azimuth variation
    df_analogue : pd.DataFrame
        Analogue segments from library containing:
        - vseg : Mean translational speed (kt)
        - dvseg : Speed variation (kt)
        - daseg : Target azimuth variation

    Returns
    -------
    pd.DataFrame
        Updated analogue segments DataFrame with added columns:
        - aseg : Warmup azimuth
        - lon_w, lat_w : Warmup origin coordinates
        - lon_i, lat_i : Target origin coordinates (from df_seg)
        - lon_t, lat_t : Target endpoint coordinates

    Notes
    -----
    The function:
    1. Calculates warmup origin by shooting backwards 24h from target origin
    2. Uses target origin from historical track
    3. Calculates target endpoint by shooting forwards 6h from target origin
    4. Accounts for hemisphere when calculating angles
    5. Converts longitudes to [0-360º] convention
    """

    # remove NaNs
    df_seg = df_seg[~df_seg.isna().any(axis=1)]
    # df_analogue = df_analogue[~df_analogue.isna().any(axis=1)]

    # get hemisphere
    # relative angles are multiplied by "sign" to account for hemisphere
    sign = np.sign(df_seg.lat.values[0])

    # get historic variables
    lon0 = df_seg.lon.values  # target origin coords
    lat0 = df_seg.lat.values
    aseg = df_seg.aseg.values  # warmup azimuth
    daseg = df_seg.daseg.values * sign  # target azimuth variation

    # get analogue variables
    vseg1_an = df_analogue.vseg.values  # warmup velocity [kt]
    dvseg_an = df_analogue.dvseg.values
    vseg2_an = vseg1_an + dvseg_an  # target velocity
    daseg_an = df_analogue.daseg.values * sign  # target azimuth variation

    # new variables
    az1_an = np.full(lon0.shape, np.nan)
    glon1_an = np.full(lon0.shape, np.nan)
    glon2_an = np.full(lon0.shape, np.nan)
    glat1_an = np.full(lon0.shape, np.nan)
    glat2_an = np.full(lon0.shape, np.nan)

    for i in range(df_seg.shape[0]):
        # azimuth angles for ensemble
        az2 = aseg[i] + daseg[i]  # target azimuth (historic-fixed)
        az1 = az2 - daseg_an[i]  # warmup azimuth (stopmotion analogue)

        # shoot backwards to warmup origin
        dist1 = vseg1_an[i] * 1.852 * 24  # [km]
        glon1, glat1, baz = shoot(lon0[i], lat0[i], az1 + 180, dist1)

        # shoot forwards to target endpoint
        dist2 = vseg2_an[i] * 1.852 * 6  # [km]
        glon2, glat2, baz = shoot(lon0[i], lat0[i], az2 + 180 - 180, dist2)

        # store
        az1_an[i] = az1
        glon1_an[i] = glon1
        glon2_an[i] = glon2
        glat1_an[i] = glat1
        glat2_an[i] = glat2

    # longitude convention
    glon1_an[glon1_an < 0] += 360
    glon2_an[glon2_an < 0] += 360

    # add to dataframe
    df_analogue["aseg"] = az1_an
    df_analogue["lon_w"] = glon1_an  # warmup origin
    df_analogue["lat_w"] = glat1_an
    df_analogue["lon_i"] = df_seg.lon.values
    df_analogue["lat_i"] = df_seg.lat.values
    df_analogue["lon_t"] = glon2_an
    df_analogue["lat_t"] = glat2_an

    return df_analogue
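
# Distance arithmetic used above (a worked example; the speeds are
# illustrative only): with an analogue warmup speed vseg = 12 kt, the
# backwards shoot covers 12 * 1.852 * 24 = 533.4 km, and with a target speed
# of 15 kt the forwards shoot covers 15 * 1.852 * 6 = 166.7 km along az2.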
def stopmotion_st_bmu(
    df_analogue: pd.DataFrame,
    df_seg: pd.DataFrame,
    st: pd.DataFrame,
    cp_lon_ls: List[float],
    cp_lat_ls: List[float],
    max_dist: float = 60,
    list_out: bool = False,
    tqdm_out: bool = False,
    text_out: bool = True,
    mode: str = "",
) -> xr.Dataset:
    """
    Extract bulk wave parameters from library analogue cases.

    Parameters
    ----------
    df_analogue : pd.DataFrame
        Analogue prerun segments from library.
    df_seg : pd.DataFrame
        Storm 6h-segments parameters.
    st : pd.DataFrame
        Storm track interpolated every 6h.
    cp_lon_ls : List[float]
        Control point longitude coordinates.
    cp_lat_ls : List[float]
        Control point latitude coordinates.
    max_dist : float, optional
        Maximum distance (km) to extract closest node. Default is 60.
    list_out : bool, optional
        Whether to return a list of per-segment datasets instead of a merged
        dataset. Default is False.
    tqdm_out : bool, optional
        Whether to show progress bar. Default is False.
    text_out : bool, optional
        Whether to show text output. Default is True.
    mode : str, optional
        High or low resolution library indices. Default is "".

    Returns
    -------
    xr.Dataset
        Bulk wave parameters (or a list of per-segment datasets if list_out
        is True) with dimensions:
        - case : Storm segments
        - point : Control points
        - time : Time steps

        Contains variables:
        - hs : Significant wave height
        - tp : Peak period
        - lon, lat : Control point coordinates
        - ix_near : Nearest point indices
        - pos_nonan : Valid point mask
        - bmu : Best matching unit indices
        - hsbmu : Maximum Hs per point and time
        - tpbmu : Tp at maximum Hs
        - hswath : Maximum Hs per point
        - tswath : Tp at maximum Hs per point

    Notes
    -----
    The function:
    1. Accesses library analogue cases for a given storm track
    2. Calculates distance and angle from target segment origin to control
       points
    3. Extracts wave parameters at closest nodes for each analogue segment
    4. Computes bulk parameter envelope and swath
    5. Handles both high and low resolution libraries
    """

    # remove NaN
    df_seg = df_seg[~df_seg.isna().any(axis=1)]
    df_analogue = df_analogue[~df_analogue.isna().any(axis=1)]

    # assign time
    df_seg["time"] = df_seg.index.values

    # get hemisphere
    sign = np.sign(df_seg.lat.values[0])

    xds_list = []
    if tqdm_out:
        array = tqdm(range(df_seg.shape[0]))
    else:
        array = range(df_seg.shape[0])

    for iseg in array:  # each segment
        # get storm segment 'i'
        df_icase = df_seg.iloc[iseg]
        df_ianalogue = df_analogue.iloc[iseg]
        iseg_analogue = df_ianalogue.name  # analogue id
        aseg = df_ianalogue.aseg  # analogue warmseg azimuth (N)

        # ---------------------------------------------------------------------
        # get storm coordinates at "seg_time"
        seg_time = np.datetime64(df_icase.time)  # timestamp to datetime64
        ind_time_st = np.where(st.index.values == seg_time)[0][0]

        # storm coordinates (real storm eye)
        st_lon = st.iloc[ind_time_st].lon  # target origin longitude
        st_lat = st.iloc[ind_time_st].lat  # target origin latitude
        st_time = st.index.values[ind_time_st]  # time coordinates

        # ---------------------------------------------------------------------
        # get cp coordinates in relative system (radii, angle)
        cp_dist_ls, cp_ang_ls = get_cp_radii_angle(
            st_lat, st_lon, cp_lat_ls, cp_lon_ls, sign, aseg
        )

        # get SWAN output mask indices (rad, ang)
        mask_rad, mask_ang = get_mask_radii_angle(iseg_analogue, mode=mode)

        # find closest index
        ix_near = find_nearest(cp_dist_ls, cp_ang_ls, mask_rad, mask_ang)
        pos_nonan = np.abs(mask_rad[ix_near] - cp_dist_ls) <= max_dist

        # ---------------------------------------------------------------------
        # load library Hs reconstructed
        xds_rec = xr.open_dataset(PATHS["SHYTCWAVES_BULK"])
        xds_rec["time"] = pd.date_range("2000-01-01", periods=48, freq="1H")

        # extract HS,TP at closest points (case,point,time)
        hs_arr = np.full((ix_near.size, xds_rec.time.values.size), np.nan)
        hs_arr[pos_nonan, :] = (
            xds_rec.hs.isel(case=iseg_analogue, point=ix_near[pos_nonan]).load().values
        )
        tp_arr = np.full((ix_near.size, xds_rec.time.values.size), np.nan)
        tp_arr[pos_nonan, :] = (
            xds_rec.tp.isel(case=iseg_analogue, point=ix_near[pos_nonan]).load().values
        )

        # time array
        hour_intervals = xds_rec.time.size
        time = [st_time + np.timedelta64(1, "h") * i for i in range(hour_intervals)]
        time_array = np.array(time)

        # store dataset
        xds = xr.Dataset(
            {
                "hs": (("case", "point", "time"), np.expand_dims(hs_arr, axis=0)),
                "tp": (("case", "point", "time"), np.expand_dims(tp_arr, axis=0)),
                "lon": (("point"), cp_lon_ls),
                "lat": (("point"), cp_lat_ls),
                "ix_near": (("case", "point"), np.expand_dims(ix_near, axis=0)),
                "pos_nonan": (("case", "point"), np.expand_dims(pos_nonan, axis=0)),
            },
            coords={
                "case": [iseg],
                "time": time_array,
            },
        )
        xds_list.append(xds)

    if list_out:
        xds_out = xds_list

    else:
        # merge
        xds_out = xr.merge(xds_list)

        if text_out:
            print("Merging bulk envelope...", datetime.datetime.now())

        # add envelope variables
        xds_bmu = xds_out.copy()
        hsval = xds_bmu.hs.values[:]
        hsval[np.isnan(hsval)] = 0  # remove nan

        bmu = np.argmax(hsval, axis=0).astype(float)  # bmu indices
        hsmax = np.sort(hsval, axis=0)[-1, :, :]  # max over 'case'

        # bmu, hs
        bmu[hsmax == 0] = np.nan  # restitute nans
        hsmax[hsmax == 0] = np.nan
        xds_out["bmu"] = (("point", "time"), bmu)
        xds_out["hsbmu"] = (("point", "time"), hsmax)

        # tp
        tpmax = np.full(xds_out.hsbmu.shape, np.nan)
        nanmask = ~np.isnan(bmu)
        mesht, meshp = np.meshgrid(
            np.arange(0, xds_out.time.size), np.arange(0, xds_out.point.size)
        )
        tpmax[nanmask] = xds_out.tp.values[
            bmu.ravel()[nanmask.ravel()].astype("int64"),
            meshp.ravel()[nanmask.ravel()],
            mesht.ravel()[nanmask.ravel()],
        ]
        xds_out["tpbmu"] = (("point", "time"), tpmax)

        # add swath variables
        hh = hsmax.copy()
        hh[np.isnan(hh)] = 0
        posw = np.argmax(hh, axis=1)
        xds_out["hswath"] = (("point"), hsmax[np.arange(0, xds_out.point.size), posw])
        xds_out["tswath"] = (("point"), tpmax[np.arange(0, xds_out.point.size), posw])

    return xds_out
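
# Illustrative swath extraction (a sketch following the call made in
# historic2shytcwaves_cluster below; `lon`/`lat` are hypothetical 1-d arrays
# defining the swath grid, and the reshape assumes the control points keep
# the np.ravel ordering used to build the input lists):
#
#   mesh_lo, mesh_la = np.meshgrid(lon, lat)
#   xds_bulk = stopmotion_st_bmu(
#       df_analogue, df_seg, st_trim,
#       list(np.ravel(mesh_lo)), list(np.ravel(mesh_la)),
#       max_dist=60,
#   )
#   hs_swath = xds_bulk.hswath.values.reshape(mesh_lo.shape)  # max Hs per node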
def stopmotion_st_spectra(
    df_analogue: pd.DataFrame,
    df_seg: pd.DataFrame,
    st: pd.DataFrame,
    cp_lon_ls: List[float],
    cp_lat_ls: List[float],
    cp_names: List[str] = [],
    max_dist: float = 60,
    list_out: bool = False,
    tqdm_out: bool = False,
    text_out: bool = True,
    mode: str = "",
) -> Tuple[xr.Dataset, xr.Dataset]:
    """
    Access the library analogue cases for a given storm track, calculate the
    distance and angle from the target segment origin to each control point
    (relative coordinate system), and extract the directional wave spectra at
    the closest node for every analogue segment.

    Parameters
    ----------
    df_analogue : pd.DataFrame
        Analogue prerun segments from library.
    df_seg : pd.DataFrame
        Storm 6h-segments parameters.
    st : pd.DataFrame
        Storm track interpolated every 6h.
    cp_lon_ls : List[float]
        Control point longitude coordinates.
    cp_lat_ls : List[float]
        Control point latitude coordinates.
    cp_names : List[str], optional
        Control point names. Default is [].
    max_dist : float, optional
        Maximum distance (km) to extract closest node. Default is 60.
    list_out : bool, optional
        Whether to return list of datasets instead of merged dataset.
        Default is False.
    tqdm_out : bool, optional
        Whether to show progress bar. Default is False.
    text_out : bool, optional
        Whether to show text output. Default is True.
    mode : str, optional
        High or low resolution library indices. Default is "".

    Returns
    -------
    Tuple[xr.Dataset, xr.Dataset]
        - xds_spec : Wave directional spectra (dim 'case')
        - xds_bmu : BMU indices

    Notes
    -----
    The function:
    1. Removes NaN values from df_seg and df_analogue
    2. Assigns time to df_seg
    3. Gets hemisphere sign
    4. Gets bmu (wavespectra reconstructed)
    5. Opens seg_sim dataset
    """

    # remove NaN
    df_seg = df_seg[~df_seg.isna().any(axis=1)]
    df_analogue = df_analogue[~df_analogue.isna().any(axis=1)]

    # assign time
    df_seg["time"] = df_seg.index.values

    # get hemisphere
    sign = np.sign(df_seg.lat.values[0])

    # get bmu (wavespectra reconstructed)
    # it provides 'time,bmu,ix_near,pos_nonan' (point,time)
    xds_bmu = stopmotion_st_bmu(
        df_analogue=df_analogue,
        df_seg=df_seg,
        st=st,
        cp_lon_ls=cp_lon_ls,
        cp_lat_ls=cp_lat_ls,
        max_dist=max_dist,
        list_out=list_out,
        tqdm_out=tqdm_out,
        text_out=text_out,
        mode=mode,
    )

    # spectral energy
    seg_sim = xr.open_dataset(
        op.join(PATHS["SHYTCWAVES_SPECTRA"], "0000/spec_outpts_main.nc")
    )
    efth_arr = np.full(
        (
            seg_sim.frequency.size,
            seg_sim.direction.size,
            xds_bmu.point.size,
            xds_bmu.time.size,
        ),
        np.nan,
    )  # 38,72,cp,t

    if tqdm_out:
        array = tqdm(range(xds_bmu.case.size))
    else:
        array = range(xds_bmu.case.size)

    for iseg in array:
        # get storm segment 'i'
        df_icase = df_seg.iloc[iseg]
        df_ianalogue = df_analogue.iloc[iseg]
        iseg_analogue = df_ianalogue.name  # analogue id

        # ---------------------------------------------------------------------
        # get storm coordinates at "seg_time"
        seg_time = np.datetime64(df_icase.time)  # timestamp to datetime64
        st_time = st.index.values[st.index.values == seg_time][0]

        # ---------------------------------------------------------------------
        # get analogue segment from library
        filename = "spec_outpts_main.nc"
        p_analogue = op.join(
            PATHS["SHYTCWAVES_SPECTRA"], f"{iseg_analogue:04d}", filename
        )

        # load file
        seg_sim = xr.open_dataset(p_analogue)  # freq,dir,2088,48

        # time array
        hour_intervals = seg_sim.time.size
        time = [st_time + np.timedelta64(1, "h") * i for i in range(hour_intervals)]
        time_array = np.array(time)

        # get intersect time iseg vs xds_bmu
        _, ix_time_st, ix_time_shy = np.intersect1d(
            time_array, xds_bmu.time.values, return_indices=True
        )

        # find all closest grid points
        shy_inear = xds_bmu.ix_near.values[iseg, :].astype("int64")  # case,point

        # find bmu indices for iseg
        in_pt, in_t = np.where(xds_bmu.bmu.values == iseg)  # get indices of casei
        in_t_ = in_t - ix_time_shy[0]

        # reorder spectral directions
        base = 5
        if mode == "_lowres":
            base = 10  # depends on the library dirs delta

        efth_case = seg_sim.isel(point=shy_inear)  # .isel(point=in_pt, time=in_t_)
        if sign < 0:
            efth_case["direction"] = 360 - seg_sim.direction.values
            new_dirs = np.round(
                efth_case.direction.values + base * round(df_icase.aseg / base) + 90, 1
            )
        else:
            new_dirs = np.round(
                efth_case.direction.values + base * round(df_icase.aseg / base) - 90, 1
            )
        new_dirs = np.mod(new_dirs, 360)
        new_dirs[new_dirs > 270] -= 360
        efth_case["direction"] = new_dirs
        efth_case = efth_case.sel(direction=seg_sim.direction.values)

        # insert spectral values for bmu=iseg
        efth_arr[:, :, in_pt, in_t] = efth_case.efth.values[:, :, in_pt, in_t_]

    if text_out:
        print("Inserting envelope spectra...", datetime.datetime.now())

    # store dataset
    xds_spec = xr.Dataset(
        {
            "efth": (("freq", "dir", "point", "time"), efth_arr),
            "lon": (("point"), np.array(cp_lon_ls)),
            "lat": (("point"), np.array(cp_lat_ls)),
            "station": (("point"), np.array(cp_names)),
        },
        coords={
            "freq": seg_sim.frequency.values,
            "dir": seg_sim.direction.values,
            "point": xds_bmu.point.values,
            "time": xds_bmu.time.values,
        },
    )

    return xds_spec, xds_bmu
def get_cp_radii_angle(
    st_lat: float,
    st_lon: float,
    cp_lat_ls: List[float],
    cp_lon_ls: List[float],
    sign: int,
    aseg: float,
) -> Tuple[List[float], List[float]]:
    """
    Extract control point distances and angles in the relative coordinate
    system.

    Parameters
    ----------
    st_lat : float
        Storm center latitude.
    st_lon : float
        Storm center longitude.
    cp_lat_ls : List[float]
        Control point latitudes.
    cp_lon_ls : List[float]
        Control point longitudes.
    sign : int
        Hemisphere indicator: 1 for north, -1 for south.
    aseg : float
        Azimuth of the analogue warm segment (from geographic north).

    Returns
    -------
    Tuple[List[float], List[float]]
        Two lists containing:
        - cp_dist_ls : Distances from storm center to control points (km)
        - cp_ang_ls : Angles from storm center to control points (degrees)

    Notes
    -----
    The function:
    1. Calculates great circle distances between storm center and control
       points
    2. Converts distances from degrees to kilometers
    3. Calculates angles relative to geographic north
    4. Transforms angles to the relative coordinate system
    5. Adjusts angles for the southern hemisphere
    """

    cp_dist_ls, cp_ang_ls = [], []
    for i in range(len(cp_lat_ls)):
        cp_lat, cp_lon = cp_lat_ls[i], cp_lon_ls[i]

        # get point polar reference
        # azimuth is referred to geographic north (absolute system)
        arcl_h, ang_abs = geodesic_distance_azimuth(st_lat, st_lon, cp_lat, cp_lon)
        cp_dist_ls.append(arcl_h * np.pi / 180.0 * EARTH_RADIUS)  # [km]

        # change of coordinate system (absolute to relative)
        ang_rel = ang_abs - (aseg - 90)
        if ang_rel < 0:
            ang_rel = np.mod(ang_rel, 360)

        # south hemisphere effect
        if sign == -1:
            if (ang_rel >= 0) and (ang_rel <= 180):
                ang_rel = 180 - ang_rel
            elif (ang_rel >= 180) and (ang_rel <= 360):
                ang_rel = 360 - (ang_rel - 180)

        cp_ang_ls.append(ang_rel)

    return cp_dist_ls, cp_ang_ls  # [km], [º]
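
# Worked example of the coordinate change (numbers are illustrative): with a
# warmup azimuth aseg = 45º, a control point due east of the storm centre
# (ang_abs = 90º) maps to ang_rel = 90 - (45 - 90) = 135º in the relative
# system; in the southern hemisphere (sign = -1) this is mirrored to
# 180 - 135 = 45º.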
def get_mask_radii_angle(icase: int, mode: str = "") -> Tuple[np.ndarray, np.ndarray]:
    """
    Extract radii and angle indices for output points.

    Parameters
    ----------
    icase : int
        Analogue case ID.
    mode : str, optional
        Option to select SHyTCWaves library resolution ('', '_lowres').
        Default is "".

    Returns
    -------
    Tuple[np.ndarray, np.ndarray]
        Two arrays containing:
        - rad : Radial distances from storm center (km)
        - ang : Angles from storm center (degrees)

    Notes
    -----
    The function:
    1. Loads output indices associated with distances/angles to target origin
    2. Determines grid size (small/medium/large) based on case ID
    3. Extracts appropriate radii and angle arrays for the grid size
    4. For low resolution mode, uses half the number of radial points
    """

    # load output indices associated with distances/angles to target origin
    if mode == "_lowres":
        xds_mask_ind = xr.open_dataset(PATHS["SHYTCWAVES_MDA_MASK_INDICES_LOWRES"])
    else:
        xds_mask_ind = xr.open_dataset(PATHS["SHYTCWAVES_MDA_MASK_INDICES"])

    # load MDA indices (grid sizes)
    xds_ind_mda = xr.open_dataset(PATHS["SHYTCWAVES_MDA_INDICES"])

    # get grid code
    pos_small = np.where(icase == xds_ind_mda.indices_small)[0]
    pos_medium = np.where(icase == xds_ind_mda.indices_medium)[0]
    pos_large = np.where(icase == xds_ind_mda.indices_large)[0]

    if len(pos_small) == 1:
        rad = xds_mask_ind.radii_sma.values[:] / 1000
        ang = xds_mask_ind.angle_sma.values[:]
    elif len(pos_medium) == 1:
        rad = xds_mask_ind.radii_med.values[:] / 1000
        ang = xds_mask_ind.angle_med.values[:]
    elif len(pos_large) == 1:
        rad = xds_mask_ind.radii_lar.values[:] / 1000
        ang = xds_mask_ind.angle_lar.values[:]

    return rad, ang  # [km], [º]
def find_nearest(
    cp_rad_ls: List[float],
    cp_ang_ls: List[float],
    mask_rad: np.ndarray,
    mask_ang: np.ndarray,
) -> np.ndarray:
    """
    Find nearest points in the normalized space of radii and angles.

    Parameters
    ----------
    cp_rad_ls : List[float]
        Control point radial distances.
    cp_ang_ls : List[float]
        Control point angles.
    mask_rad : np.ndarray
        SWAN output point radial distances.
    mask_ang : np.ndarray
        SWAN output point angles.

    Returns
    -------
    np.ndarray
        Indices of nearest points in SWAN output grid for each control point.

    Notes
    -----
    The function:
    1. Creates dataframes from control points and SWAN grid points
    2. Treats radial distances as scalar values
    3. Treats angles as directional values (circular)
    4. Uses nearest neighbor search in normalized space
    """

    # create dataframes
    df_cp = pd.DataFrame({"radii": cp_rad_ls, "angle": cp_ang_ls})
    df_mask = pd.DataFrame({"radii": mask_rad, "angle": mask_ang})

    # indices
    ix_directional = [1]  # angle is a circular variable

    # get indices of nearest n-dimensional point
    ix_near = find_nearest_indices(
        query_points=df_cp.values,
        reference_points=df_mask.values,
        directional_indices=ix_directional,
    )

    return ix_near
###############################################################################
# SHyTCWaves APPLICATION
# Functions that calculate the ensemble/reconstruction for a storm track
# from either historical or forecast/predicted tracks
###############################################################################
def get_coef_calibration() -> np.ndarray:
    """
    Get calibration coefficients for SHyTCWaves model pressure bias
    correction.

    Returns
    -------
    np.ndarray
        Linear fit coefficients for pressure bias correction.

    Notes
    -----
    The function:
    1. Uses Saffir-Simpson category center pressures as reference points
    2. Applies calibrated pressure deltas based on validation against
       satellite data
    3. Performs a linear fit to get the correction coefficients
    """

    p = [1015, 990, 972, 954, 932, 880]  # Saffir-Simpson center categories
    dp = [-17, -15, -12.5, -7, +2.5, +10]  # calibrated "dP" for shytcwaves

    coef = np.polyfit(p, dp, 1)  # order 1 fitting

    return coef
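
# The coefficients feed the linear bias correction applied in
# historic2shytcwaves_cluster below: dp(p) = coef[0] * p + coef[1], so the
# corrected central pressure is p + dp(p) = p * (1 + coef[0]) + coef[1].
# A quick check (the 950 mbar sample value is illustrative):
#
#   coef = get_coef_calibration()
#   p = 950.0                              # [mbar] sample pressure
#   p_corr = p * (1 + coef[0]) + coef[1]   # == p + np.polyval(coef, p)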
##########################################
# SHYTCWAVES - historical track
def historic2shytcwaves_cluster(
    path_save: str,
    tc_name: str,
    storm: xr.Dataset,
    center: str,
    lon: np.ndarray,
    lat: np.ndarray,
    dict_site: Optional[dict] = None,
    calibration: bool = True,
    mode: str = "",
    database_on: bool = False,
    st_param: bool = False,
    extract_bulk: bool = True,
    max_segments: int = 300,
) -> None:
    """
    Process historical storm track data using the SHyTCWaves methodology.

    Parameters
    ----------
    path_save : str
        Base path to store results, without the file name.
    tc_name : str
        Storm name.
    storm : xr.Dataset
        Storm track dataset with standard IBTrACS variables:
        Required:
        - longitude, latitude : Storm coordinates
        - pressure : Central pressure (mbar)
        - maxwinds : Maximum sustained winds (kt)
        Optional:
        - rmw : Radius of maximum winds (nmile)
        - dist2land : Distance to land
        - basin : Basin identifier
    center : str
        IBTrACS center code.
    lon : np.ndarray
        Longitude coordinates for swath calculation.
    lat : np.ndarray
        Latitude coordinates for swath calculation.
    dict_site : dict, optional
        Site data for superpoint building. Default is None. Must contain:
        - lonpts : Longitude coordinates
        - latpts : Latitude coordinates
        - namepts : Site names
        - site : Site identifier
        - sectors : Sectors
        - deg_superposition : Superposition degree
    calibration : bool, optional
        Whether to apply SHyTCWaves calibration. Default is True.
    mode : str, optional
        High or low resolution library indices. Default is "".
    database_on : bool, optional
        Whether to keep data only at 0,6,12,18 hours. Default is False.
    st_param : bool, optional
        Whether to keep data as original. Default is False.
    extract_bulk : bool, optional
        Whether to extract bulk wave parameters. Default is True.
    max_segments : int, optional
        Maximum number of segments to process. Default is 300.

    Notes
    -----
    The function processes historical storm tracks in several steps:
    1. Performs stopmotion segmentation at 6h intervals
    2. Optionally applies SHyTCWaves calibration to track parameters
    3. Trims track to target domain
    4. Finds analogue segments from library
    5. Extracts bulk wave parameters and/or spectral data
    6. Saves results to the specified directory
    """

    # stopmotion segmentation, 6h interval
    df = historic_track_preprocessing(
        xds=storm,
        center=center,
        forecast_on=False,
        database_on=database_on,
        st_param=st_param,
    )
    dt_int_minutes = 6 * 60  # [minutes] constant segments

    # optional: shytcwaves calibration of track parameters
    if calibration:
        coef = get_coef_calibration()  # linear fitting
        df["pressure"] = df["pressure"].values * (1 + coef[0]) + coef[1]
        df["maxwinds"] = np.nan

        st, _ = historic_track_interpolation(
            df,
            dt_int_minutes,
            interpolation=False,
            mode="mean",
            fit=True,
            radi_estimate_on=True,
        )
    else:
        st, _ = historic_track_interpolation(
            df, dt_int_minutes, interpolation=False, mode="mean"
        )

    # skip when track coordinates are only NaN or zero
    lons, lats = st.lon.values, st.lat.values
    if np.all(lons[~np.isnan(lons)] == 0) and np.all(lats[~np.isnan(lats)] == 0):
        print("No track coordinates")

    else:
        st_trim = track_triming(st, lat[0], lon[0], lat[-1], lon[-1])

        # store tracks for shytcwaves
        st.to_pickle(op.join(path_save, f"{tc_name}_track.pkl"))
        st_trim.to_pickle(op.join(path_save, f"{tc_name}_track_trim.pkl"))

        # parameterized segments (24h warmup + 6h target)
        df_seg = storm2stopmotion(st_trim)

        if df_seg.shape[0] > 2:
            print(f"st: {st.shape[0]}, df_seg: {df_seg.shape[0]}")
            st_list, we_list = stopmotion_interpolation(df_seg, st=st_trim)

            # analogue segments from library
            df_mda = xr.open_dataset(PATHS["SHYTCWAVES_MDA"]).to_dataframe()
            ix_weights = [1] * 10  # equal weights
            ix = find_analogue(df_mda, df_seg, ix_weights)
            df_analogue = df_mda.iloc[ix]
            df_analogue = analogue_endpoints(df_seg, df_analogue)

            # extract bulk envelope (to plot swaths)
            if extract_bulk:
                mesh_lo, mesh_la = np.meshgrid(lon, lat)
                print(
                    f"Number of segments: {len(st_list)}, "
                    f"number of swath nodes: {mesh_lo.size}"
                )

                if len(st_list) < max_segments:
                    xds_shy_bulk = stopmotion_st_bmu(
                        df_analogue,
                        df_seg,
                        st_trim,
                        list(np.ravel(mesh_lo)),
                        list(np.ravel(mesh_la)),
                        max_dist=60,
                        mode=mode,
                    )

                    # store
                    xds_shy_bulk.to_netcdf(
                        op.join(path_save, f"{tc_name}_xds_shy_bulk.nc")
                    )

            # extract spectra envelope
            if isinstance(dict_site, dict):
                xds_shy_spec, _ = stopmotion_st_spectra(
                    df_analogue,
                    df_seg,
                    st_trim,
                    cp_lon_ls=dict_site["lonpts"],
                    cp_lat_ls=dict_site["latpts"],
                    cp_names=dict_site["namepts"],
                    mode=mode,
                )

                # store
                xds_shy_spec.to_netcdf(
                    op.join(
                        path_save,
                        f"{tc_name}_xds_shy_spec_{dict_site['site']}.nc",
                    )
                )

                # build superpoint
                xds_shy_sp = superpoint_calculation(
                    xds_shy_spec.efth,
                    "point",
                    dict_site["sectors"],
                )

                # store
                xds_shy_sp.to_netcdf(
                    op.join(
                        path_save,
                        f"{tc_name}_xds_shy_sp_{dict_site['site']}.nc",
                    )
                )

        print("Files stored.")
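
# Example driver (a hedged sketch; the storm dataset, IBTrACS center code and
# domain arrays below are hypothetical placeholders, not shipped inputs):
#
#   storm = xr.open_dataset("ibtracs_storm.nc")   # single-storm IBTrACS subset
#   lon = np.arange(160, 185, 0.25)               # swath grid longitudes
#   lat = np.arange(-30, -10, 0.25)               # swath grid latitudes
#   historic2shytcwaves_cluster(
#       path_save=".", tc_name="HYPOTHETICAL",
#       storm=storm, center="WMO", lon=lon, lat=lat,
#   )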