# Source code for bluemath_tk.wrappers.xbeach.xbeach_wrapper

import math
import os
from typing import List, Union

import numpy as np
import pandas as pd
import xarray as xr

from .._base_wrappers import BaseModelWrapper


class XBeachModelWrapper(BaseModelWrapper):
    """
    Wrapper for the XBeach model.
    https://xbeach.readthedocs.io/en/latest/

    Attributes
    ----------
    default_parameters : dict
        The default parameters type for the wrapper.
    available_launchers : dict
        The available launchers for the wrapper.
    """

    default_parameters = {
        "comptime": {
            "type": int,
            "value": 3600,
            "description": "The computational time.",
        },
        "wbctype": {
            "type": str,
            "value": "off",
            "description": "The wave boundary condition type.",
        },
    }

    available_launchers = {
        "geoocean-cluster": "launchXbeach.sh",
    }

    def __init__(
        self,
        templates_dir: str,
        metamodel_parameters: dict,
        fixed_parameters: dict,
        output_dir: str,
        templates_name: Union[List[str], str] = "all",
        debug: bool = True,
    ) -> None:
        """
        Initialize the XBeach model wrapper.

        Parameters
        ----------
        templates_dir : str
            Forwarded unchanged to the base wrapper.
        metamodel_parameters : dict
            Forwarded unchanged to the base wrapper.
        fixed_parameters : dict
            Forwarded unchanged to the base wrapper.
        output_dir : str
            Forwarded unchanged to the base wrapper.
        templates_name : list or str, optional
            Forwarded unchanged to the base wrapper. Default is "all".
            NOTE(review): the accepted types are defined by the base
            wrapper, which is not visible here - confirm.
        debug : bool, optional
            If True the logger level is "DEBUG", otherwise "INFO".
            Default is True.
        """

        super().__init__(
            templates_dir=templates_dir,
            metamodel_parameters=metamodel_parameters,
            fixed_parameters=fixed_parameters,
            output_dir=output_dir,
            templates_name=templates_name,
            default_parameters=self.default_parameters,
        )
        self.set_logger_name(
            name=self.__class__.__name__, level="DEBUG" if debug else "INFO"
        )

    def build_case(
        self,
        case_context: dict,
        case_dir: str,
    ) -> None:
        """
        Build the input files for a case.

        When ``case_context["wbctype"]`` is "jonstable", a ``jonswap.txt``
        file is written in the case directory with one row per started
        3600-unit block of ``case_context["comptime"]``. Nothing is written
        for any other ``wbctype``.

        Parameters
        ----------
        case_context : dict
            The case context. Must contain "wbctype"; for "jonstable" it
            must also contain "comptime", "Hs", "Tp" and "Dir".
        case_dir : str
            The case directory.
        """

        if case_context["wbctype"] != "jonstable":
            return

        n_rows = math.ceil(case_context["comptime"] / 3600)
        # Every row is identical: the sea state is held constant for the
        # whole computation.
        row = (
            f"{case_context['Hs']} {case_context['Tp']} {case_context['Dir']} "
            "3.300000 30.000000 3600.000000 1.000000 \n"
        )
        with open(os.path.join(case_dir, "jonswap.txt"), "w") as f:
            f.write(row * n_rows)

    @staticmethod
    def _reduce_over_meantime(
        case_nc: xr.Dataset, var: str, reducer
    ) -> Union[np.ndarray, None]:
        """
        Reduce a variable along its first axis, skipping the first
        ``meantime`` step.

        Parameters
        ----------
        case_nc : xr.Dataset
            Simulation .nc file.
        var : str
            Variable of interest.
        reducer : callable
            Function called as ``reducer(values, axis=0)``
            (e.g. ``np.mean`` or ``np.max``).

        Returns
        -------
        np.ndarray or None
            The reduced values, or None when ``var`` is not in ``case_nc``.
        """

        if var not in case_nc:
            return None
        # Open-ended slice: the stop used to be the last *value* of the
        # meantime coordinate, which is not a positional index and could
        # silently truncate the series.
        values = case_nc[var].isel(meantime=slice(1, None)).values
        return reducer(values, axis=0)

    def _get_average_var(
        self, case_nc: xr.Dataset, var: str
    ) -> Union[np.ndarray, None]:
        """
        Get the average value of a variable, skipping the first ``meantime``
        step (described upstream as the first hour of the simulation).

        Parameters
        ----------
        case_nc : xr.Dataset
            Simulation .nc file.
        var : str
            Variable of interest.

        Returns
        -------
        np.ndarray or None
            The average value of the variable, or None when the variable is
            not present in ``case_nc``.
        """

        return self._reduce_over_meantime(case_nc, var, np.mean)

    def _get_max_var(
        self, case_nc: xr.Dataset, var: str
    ) -> Union[np.ndarray, None]:
        """
        Get the max value of a variable, skipping the first ``meantime``
        step (described upstream as the first hour of the simulation).

        Parameters
        ----------
        case_nc : xr.Dataset
            Simulation .nc file.
        var : str
            Variable of interest.

        Returns
        -------
        np.ndarray or None
            The max value of the variable, or None when the variable is not
            present in ``case_nc``.
        """

        return self._reduce_over_meantime(case_nc, var, np.max)

    @staticmethod
    def _get_case_status(case_dir: str) -> str:
        """
        Classify a single case directory from its XBeach log files.

        Parameters
        ----------
        case_dir : str
            The case directory.

        Returns
        -------
        str
            "No run" when XBlog.txt does not exist, "XBerror.txt" when a
            non-empty XBerror.txt exists, "End of run" when one of the last
            two log lines contains "end of program xbeach" (case
            insensitive), and "Running" otherwise.
        """

        log_path = os.path.join(case_dir, "XBlog.txt")
        error_path = os.path.join(case_dir, "XBerror.txt")

        if not os.path.exists(log_path):
            return "No run"

        # `and` (not `&`): short-circuits when the file is missing and
        # compares the full size instead of only its lowest bit.
        if os.path.exists(error_path) and os.path.getsize(error_path) != 0:
            return "XBerror.txt"

        with open(log_path, "r") as f:
            last_lines = f.readlines()[-2:]
        # The needle must be lower case because the line is lower-cased.
        if any("end of program xbeach" in line.lower() for line in last_lines):
            return "End of run"
        return "Running"

    def monitor_cases(
        self, value_counts: Union[str, None] = None
    ) -> Union[pd.DataFrame, dict]:
        """
        Monitor the cases based on different model log files.

        Parameters
        ----------
        value_counts : str, optional
            Forwarded unchanged to the base wrapper. Default is None.

        Returns
        -------
        pd.DataFrame or dict
            Whatever the base wrapper's ``monitor_cases`` returns for the
            collected ``{case directory name: status}`` mapping.
        """

        cases_status = {
            os.path.basename(case_dir): self._get_case_status(case_dir)
            for case_dir in self.cases_dirs
        }

        return super().monitor_cases(
            cases_status=cases_status, value_counts=value_counts
        )

    def postprocess_case(
        self,
        case_num: int,
        case_dir: str,
        output_vars: List[str] = None,
        overwrite_output: bool = True,
    ) -> xr.Dataset:
        """
        Reduce the raw ``xboutput.nc`` of a case and store the result as
        ``xboutput_postprocessed.nc`` in the same directory.

        "zs_max" is reduced with the maximum, every other variable with the
        mean; in both cases the first ``meantime`` step is skipped. Cells
        whose initial bed level ``zb`` is above 0 are set to NaN.

        Parameters
        ----------
        case_num : int
            The case number.
        case_dir : str
            The case directory.
        output_vars : list, optional
            The output variables to postprocess. Default is None, in which
            case only the grid coordinates are written. Variables that are
            not present in the raw output are skipped with a warning.
        overwrite_output : bool, optional
            Overwrite the output.nc file. Default is True.

        Returns
        -------
        xr.Dataset
            The postprocessed Dataset.
        """

        import warnings

        # NOTE(review): this silences warnings for the whole process, not
        # just this call; kept as-is because callers may rely on it.
        warnings.filterwarnings("ignore")

        self.logger.info(
            f"[{case_num}]: Postprocessing case {case_num} in {case_dir}."
        )

        output_nc_path = os.path.join(case_dir, "xboutput_postprocessed.nc")

        if os.path.exists(output_nc_path) and not overwrite_output:
            self.logger.info(
                f"[{case_num}]: Reading existing xboutput_postprocessed.nc file."
            )
            return xr.open_dataset(output_nc_path)

        if output_vars is None:
            output_vars = []

        # Everything needed is pulled into memory inside the `with`, so the
        # raw file handle can be released as soon as the block ends.
        with xr.open_dataset(os.path.join(case_dir, "xboutput.nc")) as output_raw:
            globalx = output_raw.globalx.values
            globaly = output_raw.globaly.values
            zb = output_raw.zb.values[0]

            ds = xr.Dataset(
                {
                    "globalx": (("y", "x"), globalx),
                    "globaly": (("y", "x"), globaly),
                    "zb": (("y", "x"), zb),
                },
                coords={
                    "y": np.arange(globalx.shape[0]),
                    "x": np.arange(globalx.shape[1]),
                },
            )

            for var in output_vars:
                if var == "zs_max":
                    reduced = self._get_max_var(case_nc=output_raw, var=var)
                else:
                    reduced = self._get_average_var(case_nc=output_raw, var=var)
                if reduced is None:
                    self.logger.warning(
                        f"[{case_num}]: Variable {var} not found in xboutput.nc, skipping."
                    )
                    continue
                masked = xr.where(ds["zb"] > 0, np.nan, reduced)
                ds[var] = (("y", "x"), masked.data)

        # zb is only needed for masking; it is not part of the output.
        ds = ds.drop_vars("zb")
        ds.to_netcdf(output_nc_path)

        return ds

    def join_postprocessed_files(
        self, postprocessed_files: List[xr.Dataset]
    ) -> xr.Dataset:
        """
        Join postprocessed files in a single Dataset.

        Parameters
        ----------
        postprocessed_files : list
            The postprocessed files.

        Returns
        -------
        xr.Dataset
            The joined xarray.Dataset, concatenated along "case_num".
        """

        return xr.concat(postprocessed_files, dim="case_num")