Source code for bluemath_tk.wrappers.swash.swash_wrapper

import os
import re
from typing import List, Tuple, Union

import numpy as np
import pandas as pd
import xarray as xr
from scipy.signal import find_peaks

from ...waves.series import series_regular_monochromatic, series_TMA, waves_dispersion
from ...waves.spectra import spectral_analysis
from ...waves.statistics import upcrossing
from .._base_wrappers import BaseModelWrapper

np.random.seed(42)  # TODO: check global behavior.


def convert_seconds_to_hour_minutes_seconds(seconds):
    """Convert a duration in seconds to the HHMMSS.000 string format."""

    seconds = seconds % (24 * 3600)
    hour = seconds // 3600
    seconds %= 3600
    minutes = seconds // 60
    seconds %= 60

    return f"{str(int(hour)).zfill(2)}{str(int(minutes)).zfill(2)}{str(int(seconds)).zfill(2)}.000"
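
# Illustrative example (not part of the original module): the helper above packs
# hours, minutes and seconds into the HHMMSS.000 string used below for the
# "tendc" parameter, e.g.
#
#     convert_seconds_to_hour_minutes_seconds(7380)  ->  "020300.000"  (2 h, 3 min, 0 s)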
class SwashModelWrapper(BaseModelWrapper):
    """
    Wrapper for the SWASH model.
    https://swash.sourceforge.io/online_doc/swashuse/swashuse.html#input-and-output-files

    Attributes
    ----------
    default_parameters : dict
        The default parameters for the wrapper.
    available_launchers : dict
        The available launchers for the wrapper.
    postprocess_functions : dict
        The postprocess functions for the wrapper.

    Methods
    -------
    build_cases -> None
        Create the cases folders and render the input files.
    list_available_postprocess_vars -> List[str]
        List available postprocess variables.
    _read_tabfile -> pd.DataFrame
        Read a tab file and return a pandas DataFrame.
    _convert_case_output_files_to_nc -> xr.Dataset
        Convert tab files to a netCDF file.
    get_case_percentage_from_file -> str
        Get the case progress percentage from the output log file.
    monitor_cases -> pd.DataFrame
        Monitor the cases and log relevant information.
    postprocess_case -> xr.Dataset
        Convert tab output files to a netCDF file.
    join_postprocessed_files -> xr.Dataset
        Join postprocessed files in a single Dataset.
    find_maximas -> Tuple[np.ndarray, np.ndarray]
        Find the individual maxima of an array.
    get_waterlevel -> xr.Dataset
        Get water level from the output netCDF file.
    calculate_runup2 -> xr.Dataset
        Calculate the 2% runup (Ru2) from the output netCDF file.
    calculate_runup -> xr.Dataset
        Store the runup from the output netCDF file.
    calculate_setup -> xr.Dataset
        Calculate the mean setup (Msetup) from the output netCDF file.
    calculate_statistical_analysis -> xr.Dataset
        Perform a zero-upcrossing analysis to obtain individual wave heights (Hi)
        and wave periods (Ti).
    calculate_spectral_analysis -> xr.Dataset
        Perform a water level spectral analysis (scipy.signal.welch) and separate
        incident waves, infragravity waves and very low frequency waves.
    """

    default_parameters = {
        "Hs": {
            "type": float,
            "value": None,
            "description": "Significant wave height.",
        },
        "Hs_L0": {
            "type": float,
            "value": None,
            "description": "Wave steepness (Hs / L0) in deep water.",
        },
        "WL": {
            "type": float,
            "value": None,
            "description": "Water level.",
        },
        "vegetation_height": {
            "type": float,
            "value": None,
            "description": "The vegetation height.",
        },
        "dxinp": {
            "type": float,
            "value": 1.0,
            "description": "The input spacing.",
        },
        "n_nodes_per_wavelength": {
            "type": int,
            "value": 60,
            "description": "The number of nodes per wavelength.",
        },
        "comptime": {
            "type": int,
            "value": 180,
            "description": "The computational time.",
        },
        "warmup": {
            "type": int,
            "value": 0,
            "description": "The warmup time.",
        },
        "gamma": {
            "type": int,
            "value": 2,
            "description": "The gamma parameter.",
        },
        "deltat": {
            "type": int,
            "value": 1,
            "description": "The time step.",
        },
    }

    available_launchers = {
        "serial": "swash_serial.exe",
        "mpi": "mpirun -np 2 swash_mpi.exe",
        "docker_serial": "docker run --rm -v .:/case_dir -w /case_dir geoocean/rocky8 swash_serial.exe",
        "docker_mpi": "docker run --rm -v .:/case_dir -w /case_dir geoocean/rocky8 mpirun -np 2 swash_mpi.exe",
        "geoocean-cluster": "launchSwash.sh",
    }

    postprocess_functions = {
        "Ru2": "calculate_runup2",
        "Runlev": "calculate_runup",
        "Msetup": "calculate_setup",
        "Hrms": "calculate_statistical_analysis",
        "Hfreqs": "calculate_spectral_analysis",
    }

    def __init__(
        self,
        templates_dir: str,
        metamodel_parameters: dict,
        fixed_parameters: dict,
        output_dir: str,
        depth_array: np.ndarray,
        templates_name: dict = "all",
        debug: bool = True,
    ) -> None:
        """
        Initialize the SWASH model wrapper.
        """

        super().__init__(
            templates_dir=templates_dir,
            metamodel_parameters=metamodel_parameters,
            fixed_parameters=fixed_parameters,
            output_dir=output_dir,
            templates_name=templates_name,
            default_parameters=self.default_parameters,
        )
        self.set_logger_name(
            name=self.__class__.__name__, level="DEBUG" if debug else "INFO"
        )

        self.depth_array = depth_array
        self.mxinp = len(self.depth_array) - 1
        self.xlenc = int(self.mxinp * self.fixed_parameters["dxinp"])
        self.fixed_parameters["mxinp"] = self.mxinp
        self.fixed_parameters["xlenc"] = self.xlenc
        self.fixed_parameters["tendc"] = convert_seconds_to_hour_minutes_seconds(
            self.fixed_parameters["comptime"] + self.fixed_parameters["warmup"]
        )
    def build_case(
        self,
        case_context: dict,
        case_dir: str,
    ) -> None:
        """
        Build the input files for a case.

        Parameters
        ----------
        case_context : dict
            The case context.
        case_dir : str
            The case directory.
        """

        # Build the input waves
        waves_dict = {
            "H": case_context["Hs"],
            "T": np.sqrt(
                (case_context["Hs"] * 2 * np.pi)
                / (self.gravity * case_context["Hs_L0"])
            ),
            "warmup": case_context["warmup"],
            "comptime": case_context["comptime"],
            "gamma": case_context["gamma"],
            "deltat": case_context["deltat"],
        }
        # waves = series_TMA(waves=waves_dict, depth=self.depth_array[0])
        waves = series_regular_monochromatic(waves=waves_dict)

        # Save the waves to a file
        self.write_array_in_file(
            array=waves, filename=os.path.join(case_dir, "waves.bnd")
        )

        # Calculate computational parameters
        # Assuming there is always 1 m of setup due to (IG, VLF)
        L1, _k1, _c1 = waves_dispersion(T=waves_dict["T"], h=1.0)
        _L, _k, c = waves_dispersion(T=waves_dict["T"], h=self.depth_array[0])
        dx = L1 / case_context["n_nodes_per_wavelength"]

        # Computational time step
        deltc = 0.5 * dx / (np.sqrt(self.gravity * self.depth_array[0]) + np.abs(c))

        # Computational grid modifications
        mxc = int(self.mxinp / dx)

        # Update the case context
        case_context["mxc"] = mxc
        case_context["deltc"] = deltc
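
    # Worked example for the wave period above (illustrative values, not from the
    # original source): with Hs = 1.5 m and deep-water steepness Hs_L0 = Hs / L0 = 0.02,
    # the expression follows from L0 = g * T**2 / (2 * pi), so that
    #
    #     T = sqrt(2 * pi * Hs / (g * Hs_L0)) = sqrt(2 * pi * 1.5 / (9.81 * 0.02)) ~ 6.9 s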
    def list_available_postprocess_vars(self) -> List[str]:
        """
        List available postprocess variables.

        Returns
        -------
        List[str]
            The available postprocess variables.
        """

        return list(self.postprocess_functions.keys())
    @staticmethod
    def _read_tabfile(file_path: str) -> pd.DataFrame:
        """
        Read a tab file and return a pandas DataFrame.
        This function is used to read the output files of SWASH.

        Parameters
        ----------
        file_path : str
            The file path.

        Returns
        -------
        pd.DataFrame
            The pandas DataFrame.
        """

        with open(file_path, "r") as f:
            lines = f.readlines()

        # Read the header columns (variable names)
        names = lines[4].split()
        names = names[1:]  # Eliminate '%'

        # Read the data rows
        values = pd.Series(lines[7:]).str.split(expand=True).values.astype(float)
        df = pd.DataFrame(values, columns=names)

        return df

    def _convert_case_output_files_to_nc(
        self, case_num: int, output_path: str, run_path: str
    ) -> xr.Dataset:
        """
        Convert tab files to a netCDF file.

        Parameters
        ----------
        case_num : int
            The case number.
        output_path : str
            The output path.
        run_path : str
            The run path.

        Returns
        -------
        xr.Dataset
            The xarray Dataset.
        """

        df_output = self._read_tabfile(file_path=output_path)
        df_output[["Xp", "Yp", "Tsec"]] = df_output[["Xp", "Yp", "Tsec"]].astype(
            int
        )  # TODO: check if this is correct
        df_output.set_index(
            ["Xp", "Yp", "Tsec"], inplace=True
        )  # set index to Xp, Yp and Tsec
        ds_output = df_output.to_xarray()

        df_run = self._read_tabfile(file_path=run_path)
        df_run[["Tsec"]] = df_run[["Tsec"]].astype(
            int
        )  # TODO: check if this is correct
        df_run.set_index(["Tsec"], inplace=True)
        ds_run = df_run.to_xarray()

        # Merge output files into one xarray.Dataset
        ds = xr.merge([ds_output, ds_run], compat="no_conflicts")

        # Assign the correct case_num coordinate
        ds.coords["case_num"] = case_num

        return ds
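
    # Tab file layout assumed by _read_tabfile (hypothetical sketch inferred from
    # the indices used above, not an official SWASH format description): variable
    # names sit on the fifth line behind a leading '%', and numeric data starts on
    # the eighth line, e.g.
    #
    #     ... four header lines ...
    #     % Tsec  Xp  Yp  Watlev  Runlev
    #     ... two more header lines ...
    #     0.0  0.0  0.0  0.012  0.0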
    def get_case_percentage_from_file(self, output_log_file: str) -> str:
        """
        Get the case percentage from the output log file.

        Parameters
        ----------
        output_log_file : str
            The output log file.

        Returns
        -------
        str
            The case percentage.
        """

        if not os.path.exists(output_log_file):
            return "0 %"

        progress_pattern = r"\[\s*(\d+)%\]"
        with open(output_log_file, "r") as f:
            for line in reversed(f.readlines()):
                match = re.search(progress_pattern, line)
                if match:
                    return f"{match.group(1)} %"

        return "0 %"  # if no progress is found
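
    # Example (illustrative only): a progress marker such as "[ 45%]" anywhere in a
    # log line is reported as "45 %"; the exact SWASH wording around the marker is
    # not shown here.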
    def monitor_cases(self, value_counts: str = None) -> Union[pd.DataFrame, dict]:
        """
        Monitor the cases based on the different model log files.
        """

        cases_status = {}
        for case_dir in self.cases_dirs:
            case_dir_name = os.path.basename(case_dir)
            if os.path.exists(os.path.join(case_dir, "Errfile")):
                cases_status[case_dir_name] = "Errfile"
            elif os.path.exists(os.path.join(case_dir, "norm_end")):
                cases_status[case_dir_name] = "END"
            else:
                run_tab_file = os.path.join(case_dir, "run.tab")
                if os.path.exists(run_tab_file):
                    run_tab = self._read_tabfile(file_path=run_tab_file)
                    if run_tab.isnull().values.any():
                        cases_status[case_dir_name] = "NaN"
                        continue
                else:
                    cases_status[case_dir_name] = "No run.tab"
                    continue
                output_log_file = os.path.join(case_dir, "wrapper_out.log")
                progress = self.get_case_percentage_from_file(
                    output_log_file=output_log_file
                )
                cases_status[case_dir_name] = progress

        return super().monitor_cases(
            cases_status=cases_status, value_counts=value_counts
        )
    def postprocess_case(
        self,
        case_num: int,
        case_dir: str,
        output_vars: List[str] = None,
        overwrite_output: bool = True,
        overwrite_output_postprocessed: bool = True,
        remove_tab: bool = False,
        remove_nc: bool = False,
    ) -> xr.Dataset:
        """
        Convert tab output files to netCDF file.

        Parameters
        ----------
        case_num : int
            The case number.
        case_dir : str
            The case directory.
        output_vars : list, optional
            The output variables to postprocess. Default is None.
        overwrite_output : bool, optional
            Overwrite the output.nc file. Default is True.
        overwrite_output_postprocessed : bool, optional
            Overwrite the output_postprocessed.nc file. Default is True.
        remove_tab : bool, optional
            Remove the tab files. Default is False.
        remove_nc : bool, optional
            Remove the netCDF file. Default is False.

        Returns
        -------
        xr.Dataset
            The postprocessed Dataset.
        """

        import warnings

        warnings.filterwarnings("ignore")

        self.logger.info(f"[{case_num}]: Postprocessing case {case_num} in {case_dir}.")

        if output_vars is None:
            self.logger.debug(f"[{case_num}]: Postprocessing all available variables.")
            output_vars = list(self.postprocess_functions.keys())

        # Tab file paths (defined here so they are also available for the cleanup below)
        output_path = os.path.join(case_dir, "output.tab")
        run_path = os.path.join(case_dir, "run.tab")

        output_nc_path = os.path.join(case_dir, "output.nc")
        if not os.path.exists(output_nc_path) or overwrite_output:
            # Convert tab files to a netCDF file
            output_nc = self._convert_case_output_files_to_nc(
                case_num=case_num, output_path=output_path, run_path=run_path
            )
            output_nc.to_netcdf(output_nc_path)
        else:
            self.logger.info(f"[{case_num}]: Reading existing output.nc file.")
            output_nc = xr.open_dataset(output_nc_path)

        processed_nc_path = os.path.join(case_dir, "output_postprocessed.nc")
        if not os.path.exists(processed_nc_path) or overwrite_output_postprocessed:
            # Postprocess variables from output.nc
            var_ds_list = []
            for var in output_vars:
                if var in self.postprocess_functions:
                    self.logger.debug(f"[{case_num}]: Postprocessing variable {var}.")
                    var_ds = getattr(self, self.postprocess_functions[var])(
                        case_num=case_num, case_dir=case_dir, output_nc=output_nc
                    )
                    var_ds_list.append(var_ds)
                elif var in output_nc:
                    # If the variable is present in output_nc, extract and squeeze it
                    self.logger.debug(f"[{case_num}]: Extracting variable {var}.")
                    var_ds = output_nc[var].squeeze()
                    var_ds_list.append(var_ds)
                else:
                    self.logger.warning(
                        f"[{case_num}]: Variable {var} is not available for postprocessing."
                    )

            # Merge all variables into one Dataset
            ds = xr.merge(var_ds_list, compat="no_conflicts")
            self.logger.info(
                f"[{case_num}]: Creating new output_postprocessed.nc file from output.nc."
            )
            ds.to_netcdf(processed_nc_path)
        else:
            self.logger.info(
                f"[{case_num}]: Reading existing output_postprocessed.nc file."
            )
            ds = xr.open_dataset(processed_nc_path)

        # Remove raw files to save space
        if remove_tab:
            os.remove(output_path)
            os.remove(run_path)
        if remove_nc:
            os.remove(output_nc_path)

        return ds
    def join_postprocessed_files(
        self, postprocessed_files: List[xr.Dataset]
    ) -> xr.Dataset:
        """
        Join postprocessed files in a single Dataset.

        Parameters
        ----------
        postprocessed_files : list
            The postprocessed files.

        Returns
        -------
        xr.Dataset
            The joined Dataset.
        """

        return xr.concat(postprocessed_files, dim="case_num")
    def find_maximas(self, x: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
        """
        Find the individual maxima of an array.

        Parameters
        ----------
        x : np.ndarray
            The array (should be the water level time series).

        Returns
        -------
        Tuple[np.ndarray, np.ndarray]
            The peaks and the values of the peaks.
        """

        peaks, _ = find_peaks(x=x)

        return peaks, x[peaks]
    def calculate_runup2(
        self, case_num: int, case_dir: str, output_nc: xr.Dataset
    ) -> xr.Dataset:
        """
        Calculates runup 2% (Ru2) from the output netCDF file.

        Parameters
        ----------
        case_num : int
            The case number.
        case_dir : str
            The case directory.
        output_nc : xr.Dataset
            The output netCDF file.

        Returns
        -------
        xr.Dataset
            The runup 2% (Ru2).
        """

        # get runup
        runup = output_nc["Runlev"].values

        # find individual wave uprushes
        _, val_peaks = self.find_maximas(runup)

        # calculate ru2
        ru2 = np.percentile(val_peaks, 98)

        # create xarray Dataset with the ru2 value for this case_num
        ds = xr.Dataset({"Ru2": ("case_num", [ru2])}, {"case_num": [case_num]})

        return ds
    def calculate_runup(
        self, case_num: int, case_dir: str, output_nc: xr.Dataset
    ) -> xr.Dataset:
        """
        Stores runup from the output netCDF file.

        Parameters
        ----------
        case_num : int
            The case number.
        case_dir : str
            The case directory.
        output_nc : xr.Dataset
            The output netCDF file.

        Returns
        -------
        xr.Dataset
            The runup.
        """

        # get runup
        ds = output_nc["Runlev"]

        return ds
    def calculate_setup(
        self, case_num: int, case_dir: str, output_nc: xr.Dataset
    ) -> xr.Dataset:
        """
        Calculates mean setup (Msetup) from the output netCDF file.

        Parameters
        ----------
        case_num : int
            The case number.
        case_dir : str
            The case directory.
        output_nc : xr.Dataset
            The output netCDF file.

        Returns
        -------
        xr.Dataset
            The mean setup (Msetup).
        """

        # create xarray Dataset with the mean setup
        ds = output_nc["Watlev"].mean(dim="Tsec")
        ds = ds.to_dataset()

        # eliminate the Yp dimension
        ds = ds.squeeze()

        # rename the variable
        ds = ds.rename({"Watlev": "Msetup"})

        return ds
    def calculate_statistical_analysis(
        self, case_num: int, case_dir: str, output_nc: xr.Dataset
    ) -> xr.Dataset:
        """
        Calculates zero-upcrossing analysis to obtain individual wave heights (Hi)
        and wave periods (Ti).

        Note: the active implementation below estimates Hrms from the standard
        deviation of the water level series; the earlier zero-upcrossing version
        is kept commented out for reference.

        Parameters
        ----------
        case_num : int
            The case number.
        case_dir : str
            The case directory.
        output_nc : xr.Dataset
            The output netCDF file.

        Returns
        -------
        xr.Dataset
            The statistical analysis.
        """

        # for every X coordinate in the domain
        df_Hrms = pd.DataFrame()

        # Previous zero-upcrossing implementation, kept for reference:
        # for x in output_nc["Xp"].values:
        #     dsw = output_nc.sel(Xp=x)
        #     # obtain series of water level
        #     series_water = dsw["Watlev"].values
        #     time_series = dsw["Tsec"].values
        #     # perform statistical analysis
        #     # _, Hi = upcrossing(time_series, series_water)
        #     _, Hi = upcrossing(np.vstack([time_series, series_water]).T)
        #     Hi = np.std(series_water)
        #     # Pablo Zubia's calculation
        #     # standard_deviation = np.std(series_water)
        #     # calculate Hrms
        #     Hrms_x = np.sqrt(np.sum(Hi**2) / len(Hi))
        #     df_Hrms.loc[x, "Hrms"] = Hrms_x
        # # convert pd DataFrame to xr Dataset
        # df_Hrms.index.name = "Xp"
        # ds = df_Hrms.to_xarray()
        # # assign coordinate case_num
        # ds = ds.assign_coords({"case_num": [output_nc["case_num"].values]})
        # return ds

        for x in output_nc["Xp"].values:
            dsw = output_nc.sel(Xp=x)

            # obtain series of water level
            series_water = dsw["Watlev"].values
            standard_deviation = np.std(series_water)
            Hrms_x = 2 * np.sqrt(2 * standard_deviation**2)
            df_Hrms.loc[x, "Hrms"] = Hrms_x

        # convert pd DataFrame to xr Dataset
        df_Hrms.index.name = "Xp"
        ds = df_Hrms.to_xarray()

        # assign coordinate case_num
        ds = ds.assign_coords({"case_num": [output_nc["case_num"].values]})

        return ds
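
    # Background for the Hrms estimate above (standard wave-statistics relations,
    # not stated in the original source): for a narrow-banded Gaussian sea state,
    # Hm0 = 4 * std(eta), and under a Rayleigh height distribution Hrms = Hm0 / sqrt(2),
    # which gives Hrms = 2 * sqrt(2) * std(eta), i.e. 2 * sqrt(2 * std(eta)**2) as coded.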
    def calculate_spectral_analysis(
        self, case_num: int, case_dir: str, output_nc: xr.Dataset
    ) -> xr.Dataset:
        """
        Makes a water level spectral analysis (scipy.signal.welch) and then separates
        incident waves, infragravity waves and very low frequency waves.

        Parameters
        ----------
        case_num : int
            The case number.
        case_dir : str
            The case directory.
        output_nc : xr.Dataset
            The output netCDF file.

        Returns
        -------
        xr.Dataset
            The spectral analysis.
        """

        delttbl = np.diff(output_nc["Tsec"].values)[1]

        df_H_spectral = pd.DataFrame()
        for x in output_nc["Xp"].values:
            dsw = output_nc.sel(Xp=x)
            series_water = dsw["Watlev"].values

            # calculate significant, SS, IG and VLF wave heights
            Hs, Hss, Hig, Hvlf = spectral_analysis(series_water, delttbl)
            df_H_spectral.loc[x, "Hs"] = Hs
            df_H_spectral.loc[x, "Hss"] = Hss
            df_H_spectral.loc[x, "ig"] = Hig
            df_H_spectral.loc[x, "Hvlf"] = Hvlf

        # convert pd DataFrame to xr Dataset
        df_H_spectral.index.name = "Xp"
        ds = df_H_spectral.to_xarray()

        # assign coordinate case_num
        ds = ds.assign_coords({"case_num": [output_nc["case_num"].values]})

        return ds
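
# Minimal usage sketch (hypothetical, not part of the original module): parameter
# names follow default_parameters above, but the values, paths and the chosen
# workflow calls are assumptions for illustration only.
#
#     depth = np.linspace(10.0, 0.0, 501)  # illustrative 1D cross-shore depth profile
#     swash = SwashModelWrapper(
#         templates_dir="templates/",
#         metamodel_parameters={"Hs": [1.0, 2.0], "Hs_L0": [0.02, 0.05], "WL": [0.0]},
#         fixed_parameters={"dxinp": 1.0, "comptime": 1800, "warmup": 300},
#         output_dir="cases/",
#         depth_array=depth,
#     )
#     swash.build_cases()     # render one case folder per parameter combination
#     swash.monitor_cases()   # report progress / errors from the case log files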
class HySwashVeggyModelWrapper(SwashModelWrapper):
    """
    Wrapper for the SWASH model with vegetation.
    """
    def build_case(self, case_context: dict, case_dir: str) -> None:
        super().build_case(case_context=case_context, case_dir=case_dir)

        # Build the input vegetation file
        plants = np.zeros(len(self.depth_array))
        plants[
            int(case_context["Plants_end"] - case_context["Wv"]) : int(
                case_context["Plants_end"]
            )
        ] = case_context["Nv"]
        np.savetxt(os.path.join(case_dir, "plants.txt"), plants, fmt="%.6f")
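
# Worked example for the vegetation mask above (illustrative numbers): with
# len(depth_array) = 10, Plants_end = 8, Wv = 3 and Nv = 2.5, plants.txt holds
# [0, 0, 0, 0, 0, 2.5, 2.5, 2.5, 0, 0], i.e. the density Nv over a band of Wv
# nodes ending at index Plants_end.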
class ChySwashModelWrapper(SwashModelWrapper):
    """
    Wrapper for the SWASH model with friction.
    """

    default_Cf = 0.0002
    def build_case(self, case_context: dict, case_dir: str) -> None:
        super().build_case(case_context=case_context, case_dir=case_dir)

        # Build the input friction file
        friction = np.ones(len(self.depth_array)) * self.default_Cf
        friction[
            int(self.fixed_parameters["Cf_ini"]) : int(self.fixed_parameters["Cf_fin"])
        ] = case_context["Cf"]
        np.savetxt(os.path.join(case_dir, "friction.txt"), friction, fmt="%.6f")
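
# Worked example for the friction field above (illustrative numbers): with
# default_Cf = 0.0002, Cf_ini = 100, Cf_fin = 200 and a case value Cf = 0.01,
# nodes 100-199 are set to 0.01 while the rest of the profile keeps 0.0002.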