Source code for bluemath_tk.wrappers.sfincs.sfincs_wrapper

import os
import os.path as op
from typing import List

import numpy as np
import pandas as pd
from hydromt_sfincs import SfincsModel

from .._base_wrappers import BaseModelWrapper


[docs] class SfincsModelWrapper(BaseModelWrapper): """ Wrapper for the SFINCS model. Attributes ---------- default_parameters : dict The default parameters type for the wrapper. available_launchers : dict The available launchers for the wrapper. """ default_parameters = {} available_launchers = { "docker": "docker run --rm -v .:/case_dir -w /case_dir deltares/sfincs-cpu", "cluster": "launchSfincs.sh", } def __init__( self, templates_dir: str, metamodel_parameters: dict, fixed_parameters: dict, output_dir: str, templates_name: dict = "all", debug: bool = True, ) -> None: """ Initialize the SFINCS model wrapper. """ super().__init__( templates_dir=templates_dir, metamodel_parameters=metamodel_parameters, fixed_parameters=fixed_parameters, output_dir=output_dir, templates_name=templates_name, default_parameters=self.default_parameters, ) self.set_logger_name( name=self.__class__.__name__, level="DEBUG" if debug else "INFO" )
[docs] def setup_dem(self, sf: SfincsModel, case_context: dict) -> List[dict]: """ Setup the DEM for the SFINCS model. """ ds = sf.data_catalog.get_rasterdataset( data_like=case_context.get("path_to_dem_tif"), variables=[case_context.get("dem_tif_var_name")], geom=sf.region, meta={"version": "1"}, ) datasets_dep = [{"da": ds}] # "zmin": 0.001 sf.setup_dep(datasets_dep=datasets_dep) return datasets_dep
[docs] def setup_friction(self, sf: SfincsModel, case_context: dict) -> List[dict]: """ Setup the friction for the SFINCS model. """ dataset_rgh = sf.data_catalog.get_rasterdataset( data_like=case_context.get("path_to_rgh_tif"), ) datasets_rgh = [{"manning": dataset_rgh}] sf.setup_manning_roughness( datasets_rgh=datasets_rgh, rgh_lev_land=0, # the minimum elevation of the land ) return datasets_rgh
[docs] def setup_infiltration(self, sf: SfincsModel, case_context: dict) -> List[dict]: """ Setup the infiltration for the SFINCS model. """ p_infiltration = case_context.get("path_to_inf_tif") ant_moisture = "avg" dataset_inf = sf.data_catalog.get_rasterdataset(p_infiltration) dataset_inf.name = "cn_{0}".format(ant_moisture) sf.setup_cn_infiltration( dataset_inf.compute(), antecedent_moisture="{0}".format(ant_moisture) ) return dataset_inf
[docs] def setup_outflow(self, sf: SfincsModel, case_context: dict) -> None: """ Setup the outflow for the SFINCS model. """ gdf = sf.data_catalog.get_geodataframe( data_like=case_context.get("path_to_outflow_shp") ) sf.setup_mask_bounds(btype="outflow", include_mask=gdf, reset_bounds=True)
[docs] def setup_waterlevel_mask(self, sf: SfincsModel, case_context: dict) -> None: """ Setup the waterlevel mask for the SFINCS model. """ gdf = sf.data_catalog.get_geodataframe( data_like=case_context.get("path_to_waterlevel_shp") ) sf.setup_mask_bounds(btype="waterlevel", include_mask=gdf, reset_bounds=True)
[docs] def set_ctimes(self, case_context: dict) -> str: """ Determine the start time (TSTART) end time (TSTOP) for the simulation based on precipitation and water level forcing, add 1 hour, and return it formatted as 'YYYYMMDD HHMMSS'. """ precip_forcing = case_context.get("precipitation_forcing") waterlevel_forcing = case_context.get("waterlevel_forcing") tstart_times, tstop_times = [], [] if precip_forcing is not None: tstart_precip = precip_forcing["time"].values[0] tstop_precip = precip_forcing["time"].values[-1] tstart_times.append(tstart_precip) tstop_times.append(tstop_precip) if waterlevel_forcing is not None: tstart_waterlevel = waterlevel_forcing.index.values[0] tstop_waterlevel = waterlevel_forcing.index.values[-1] tstart_times.append(tstart_waterlevel) tstop_times.append(tstop_waterlevel) if not tstart_times: raise ValueError("No forcing data found to determine TSTART.") if not tstop_times: raise ValueError("No forcing data found to determine TSTOP.") # Get the latest time and add one hour tstop_max = max(tstop_times) tstop_plus_1h = tstop_max + np.timedelta64(1, "h") tstart_min = min(tstart_times) # Convert to pandas.Timestamp (works with numpy.datetime64) then format formatted_tstop = pd.to_datetime(tstop_plus_1h).strftime("%Y%m%d %H%M%S") formatted_tstart = pd.to_datetime(tstart_min).strftime("%Y%m%d %H%M%S") return formatted_tstart, formatted_tstop
[docs] def build_template_case(self) -> None: """ Build a base SFINCS model case used as a template for multiple simulations. This function sets up all the static components of the model that are common to all cases, such as grid definition, DEM, friction, infiltration, subgrid configuration, and boundary masks. """ sf = SfincsModel(root=self.templates_dir, mode="w+") sf.setup_grid( x0=self.fixed_parameters["x0"], y0=self.fixed_parameters["y0"], dx=self.fixed_parameters["dx"], dy=self.fixed_parameters["dy"], nmax=self.fixed_parameters["nmax"], mmax=self.fixed_parameters["mmax"], rotation=self.fixed_parameters["rotation"], epsg=self.fixed_parameters["epsg"], ) datasets_dep = self.setup_dem(sf=sf, case_context=self.fixed_parameters) sf.setup_mask_active( mask=self.fixed_parameters.get("path_to_mask"), ) datasets_rgh = self.setup_friction(sf, case_context=self.fixed_parameters) _dataset_inf = self.setup_infiltration(sf, case_context=self.fixed_parameters) sf.setup_subgrid( datasets_dep=datasets_dep, datasets_rgh=datasets_rgh, nr_subgrid_pixels=self.fixed_parameters.get("nr_subgrid_pixels"), write_dep_tif=True, write_man_tif=False, ) self.setup_outflow(sf=sf, case_context=self.fixed_parameters) self.setup_waterlevel_mask(sf=sf, case_context=self.fixed_parameters) _ = sf.plot_basemap(bmap="sat", zoomlevel=12) sf.write()
[docs] def build_case(self, case_context: dict, case_dir: str) -> None: """ Build the base SFINCS model. This includes setting up the grid, depth, friction, mask, outflow and waterlevel mask. It also applies the precipitation and waterlevel forcing if specified. """ sf = SfincsModel(root=case_dir, mode="w+") sf.setup_grid( x0=case_context["x0"], y0=case_context["y0"], dx=case_context["dx"], dy=case_context["dy"], nmax=case_context["nmax"], mmax=case_context["mmax"], rotation=case_context["rotation"], epsg=case_context["epsg"], ) tstart, tstop = self.set_ctimes(case_context=case_context) sf.config["tstop"] = tstop sf.config["tstart"] = tstart sf.config["dtout"] = 60 sf.config["storemeteo"] = 1 if case_context.get("quickly_waterlevel_forcing") is not None: """ NOTE - There is not a specific Python function yet, but one could call the setup_waterlevel_forcing function twice with saving the files in between and changing their names """ sf.setup_waterlevel_forcing( timeseries=case_context.get("quickly_waterlevel_forcing"), locations=case_context.get("gdf_boundary_points"), ) sf.write_forcing() os.rename(op.join(case_dir, "sfincs.bzs"), op.join(case_dir, "sfincs.bzi")) if case_context.get("slowly_waterlevel_forcing") is not None: sf.setup_waterlevel_forcing( timeseries=case_context.get("slowly_waterlevel_forcing"), locations=case_context.get("gdf_boundary_points"), ) sf.write_forcing() if case_context.get("precipitation_forcing") is not None: sf.setup_precip_forcing_from_grid( precip=case_context.get("precipitation_forcing"), aggregate=False ) if case_context.get("gdf_crs") is not None: sf.setup_observation_lines( locations=case_context.get("gdf_crs"), merge=False ) if case_context.get("gdf_obs") is not None: sf.setup_observation_points(locations=case_context.get("gdf_obs")) # if case_context.get("precipitation_forcing") is not None and case_context.get("waterlevel_forcing") is not None: # self.setup_rivers(sf) # sf.setup_river_inflow( # rivers=case_context.get("precipitation_forcing"), keep_rivers_geom=True # ) # sf.write_forcing() sf.write()