Source code for bluemath_tk.waves.binwaves

import logging
import os
from typing import List, Tuple

import numpy as np
import pandas as pd
import xarray as xr
from dask.diagnostics.progress import ProgressBar
from matplotlib import cm
from matplotlib import pyplot as plt
from matplotlib.colors import ListedColormap
from wavespectra.input.swan import read_swan

from ..core.dask import setup_dask_client
from ..core.plotting.base_plotting import DefaultStaticPlotting


def generate_swan_cases(
    frequencies_array: np.ndarray = None,
    directions_array: np.ndarray = None,
    direction_range: tuple = (0, 360),
    direction_divisions: int = 24,
    direction_sector: tuple = None,
    frequency_range: tuple = (0.035, 0.5),
    frequency_divisions: int = 29,
    gamma: float = 50,
    spr: float = 2,
) -> xr.Dataset:
    """
    Generate the SWAN monochromatic wave cases parameters.

    Parameters
    ----------
    frequencies_array : np.ndarray, optional
        The frequencies array. If None, it is generated using frequency_range
        and frequency_divisions.
    directions_array : np.ndarray, optional
        The directions array. If None, it is generated using direction_range
        and direction_divisions.
    direction_range : tuple
        (min, max) range for directions in degrees.
    direction_divisions : int
        Number of directional divisions.
    direction_sector : tuple, optional
        (start, end) sector in degrees used to filter the auto-generated
        directions. Circular sectors (e.g. 270-90) are supported.
    frequency_range : tuple
        (min, max) range for frequencies in Hz.
    frequency_divisions : int
        Number of frequency divisions.
    gamma : float
        The waves gamma (spectral peak enhancement) assigned to every case.
    spr : float
        The waves directional spread assigned to every case.

    Returns
    -------
    xr.Dataset
        The SWAN monochromatic cases Dataset with coordinates freq and dir.
    """

    # Auto-generate directions if not provided
    if directions_array is None:
        step = (direction_range[1] - direction_range[0]) / direction_divisions
        directions_array = np.arange(
            direction_range[0] + step / 2, direction_range[1], step
        )
        if direction_sector is not None:
            start, end = direction_sector
            if start < end:
                directions_array = directions_array[
                    (directions_array >= start) & (directions_array <= end)
                ]
            else:
                # Circular sector, e.g. 270-90
                directions_array = directions_array[
                    (directions_array >= start) | (directions_array <= end)
                ]

    # Auto-generate frequencies if not provided
    if frequencies_array is None:
        frequencies_array = np.geomspace(
            frequency_range[0], frequency_range[1], frequency_divisions
        )

    # Initialize data arrays for each variable (gamma and spr are constant)
    hs = np.zeros((len(directions_array), len(frequencies_array)))
    tp = np.zeros((len(directions_array), len(frequencies_array)))
    gamma_arr = np.full((len(directions_array), len(frequencies_array)), gamma)
    spr_arr = np.full((len(directions_array), len(frequencies_array)), spr)

    # Fill hs and tp arrays: small hs for short-period cases, unit hs otherwise
    for i, freq in enumerate(frequencies_array):
        period = 1 / freq
        hs_val = 1.0 if period > 5 else 0.1
        hs[:, i] = hs_val
        tp[:, i] = np.round(period, 4)

    # Create xarray Dataset
    ds = xr.Dataset(
        {
            "hs": (("dir", "freq"), hs),
            "tp": (("dir", "freq"), tp),
            "spr": (("dir", "freq"), spr_arr),
            "gamma": (("dir", "freq"), gamma_arr),
        },
        coords={
            "dir": directions_array,
            "freq": frequencies_array,
        },
    )
    # To get a DataFrame if needed:
    # df = ds.to_dataframe().reset_index()

    return ds
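
# Usage sketch (illustrative, not part of the module): build the default
# 24-direction x 29-frequency case grid, plus a circular-sector variant.
# Variable names below are hypothetical.
#
#     cases = generate_swan_cases()
#     print(cases.sizes)  # e.g. {'dir': 24, 'freq': 29}
#
#     # Keep only the cases coming from the 270-90 degrees circular sector
#     sector_cases = generate_swan_cases(direction_sector=(270, 90))

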
def process_kp_coefficients(
    list_of_input_spectra: List[str],
    list_of_output_spectra: List[str],
) -> xr.Dataset:
    """
    Process the kp coefficients from the output and input spectra.

    Parameters
    ----------
    list_of_input_spectra : List[str]
        The list of input spectra files.
    list_of_output_spectra : List[str]
        The list of output spectra files.

    Returns
    -------
    xr.Dataset
        The kp coefficients Dataset in frequency and direction.
    """

    output_kp_list = []

    for i, (input_spec_file, output_spec_file) in enumerate(
        zip(list_of_input_spectra, list_of_output_spectra)
    ):
        try:
            input_spec = read_swan(input_spec_file).squeeze().efth
            output_spec = (
                read_swan(output_spec_file)
                .efth.squeeze()
                .drop_vars("time")
                .expand_dims({"case_num": [i]})
            )
            # Propagation coefficient: output spectrum normalized by the
            # total input energy
            kp = output_spec / input_spec.sum(dim=["freq", "dir"])
            output_kp_list.append(kp)
        except Exception as e:
            print(f"Error processing {input_spec_file} and {output_spec_file}")
            print(e)

    # Concatenate the per-case coefficients one by one
    concatenated_kp = output_kp_list[0]
    for kp_file in output_kp_list[1:]:
        concatenated_kp = xr.concat([concatenated_kp, kp_file], dim="case_num")

    return concatenated_kp.fillna(0.0).sortby("freq").sortby("dir")
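
# Usage sketch (illustrative, not part of the module): pair up the SWAN input
# and output spectral files of each executed case. The directory layout and
# file names below are hypothetical; files must be sorted consistently so
# input i matches output i.
#
#     import glob
#
#     input_files = sorted(glob.glob("cases/case_*/input_spectra.spec"))
#     output_files = sorted(glob.glob("cases/case_*/output_spectra.spec"))
#     kp_coeffs = process_kp_coefficients(input_files, output_files)
#     print(kp_coeffs.sizes)  # one entry along case_num per processed case

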
def reconstruc_spectra(
    offshore_spectra: xr.Dataset,
    kp_coeffs: xr.Dataset,
    num_workers: int = None,
    memory_limit: float = 0.5,
    chunk_sizes: dict = {"time": 24},
    verbose: bool = False,
):
    """
    Reconstruct the onshore spectra using offshore spectra and kp coefficients.

    Parameters
    ----------
    offshore_spectra : xr.Dataset
        The offshore spectra dataset.
    kp_coeffs : xr.Dataset
        The kp coefficients dataset.
    num_workers : int, optional
        The number of workers to use. Default is None.
    memory_limit : float, optional
        The memory limit to use. Default is 0.5.
    chunk_sizes : dict, optional
        The chunk sizes to use. Default is {"time": 24}.
    verbose : bool, optional
        Whether to print verbose output. Default is False.
        If False, Dask logs are suppressed. If True, Dask logs are shown.

    Returns
    -------
    xr.Dataset
        The reconstructed onshore spectra dataset.
    """

    if not verbose:
        # Suppress Dask logs
        logging.getLogger("distributed").setLevel(logging.ERROR)
        logging.getLogger("distributed.client").setLevel(logging.ERROR)
        logging.getLogger("distributed.scheduler").setLevel(logging.ERROR)
        logging.getLogger("distributed.worker").setLevel(logging.ERROR)
        logging.getLogger("distributed.nanny").setLevel(logging.ERROR)
        # Also suppress bokeh and tornado logs that Dask uses
        logging.getLogger("bokeh").setLevel(logging.ERROR)
        logging.getLogger("tornado").setLevel(logging.ERROR)

    # Setup Dask client (environment variables are strings, hence the int cast)
    if num_workers is None:
        num_workers = int(os.environ.get("BLUEMATH_NUM_WORKERS", 4))
    client = setup_dask_client(n_workers=num_workers, memory_limit=memory_limit)

    try:
        # Process with controlled chunks
        offshore_spectra_chunked = offshore_spectra.chunk(
            {"time": chunk_sizes.get("time", 24 * 7)}
        )
        kp_coeffs_chunked = kp_coeffs.chunk({"site": 10})
        with ProgressBar():
            # Weight the offshore spectra by each case kp and sum over cases
            onshore_spectra = (
                (offshore_spectra_chunked * kp_coeffs_chunked).sum(dim="case_num")
            ).compute()
        return onshore_spectra
    finally:
        client.close()
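
# Usage sketch (illustrative, not part of the module): the kp coefficients
# carry a case_num dimension, and the product with the offshore spectra is
# summed over it to yield the onshore spectra. Variable names and the exact
# dimensions of the inputs are hypothetical.
#
#     onshore_spectra = reconstruc_spectra(
#         offshore_spectra=offshore_spectra,  # assumed to include a time dim
#         kp_coeffs=kp_coeffs,                # includes case_num and site dims
#         num_workers=4,
#         chunk_sizes={"time": 24},
#     )

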
def plot_selected_subset_parameters(
    selected_subset: pd.DataFrame,
    color: str = "blue",
    **kwargs,
) -> Tuple[plt.Figure, np.ndarray]:
    """
    Plot the selected subset parameters.

    Parameters
    ----------
    selected_subset : pd.DataFrame
        The selected subset parameters.
    color : str, optional
        The color to use in the plot. Default is "blue".
    **kwargs : dict, optional
        Additional keyword arguments to be passed to the scatter plot function.

    Returns
    -------
    plt.Figure
        The figure object containing the plot.
    np.ndarray
        Array of axes objects for the subplots.
    """

    # Create one subplot per pair of parameters (grid side = n_columns - 1)
    default_static_plot = DefaultStaticPlotting()
    num_params = len(selected_subset.columns)
    fig, axes = default_static_plot.get_subplots(
        nrows=num_params - 1,
        ncols=num_params - 1,
        sharex=False,
        sharey=False,
    )

    for c1, v1 in enumerate(list(selected_subset.columns)[1:]):
        for c2, v2 in enumerate(list(selected_subset.columns)[:-1]):
            default_static_plot.plot_scatter(
                ax=axes[c2, c1],
                x=selected_subset[v1],
                y=selected_subset[v2],
                c=color,
                alpha=0.6,
                **kwargs,
            )
            if c1 == c2:
                # Diagonal: label the axes with the parameter names
                axes[c2, c1].set_xlabel(list(selected_subset.columns)[c1 + 1])
                axes[c2, c1].set_ylabel(list(selected_subset.columns)[c2])
            elif c1 > c2:
                # Upper triangle: keep the panel but hide redundant tick labels
                axes[c2, c1].xaxis.set_ticklabels([])
                axes[c2, c1].yaxis.set_ticklabels([])
            else:
                # Lower triangle: remove the duplicated panels
                fig.delaxes(axes[c2, c1])

    return fig, axes
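
# Usage sketch (illustrative, not part of the module): pairwise scatter of the
# selected case parameters. The DataFrame and the extra keyword argument are
# hypothetical; kwargs are forwarded to the underlying scatter call.
#
#     fig, axes = plot_selected_subset_parameters(
#         selected_subset=selected_cases_df,  # e.g. columns: case, hs, tp, dir
#         color="crimson",
#         s=12,
#     )
#     fig.savefig("selected_subset_pairs.png", dpi=300)

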
def plot_selected_cases_grid(
    frequencies: np.ndarray,
    directions: np.ndarray,
    figsize: Tuple[int, int] = (8, 8),
    **kwargs,
):
    """
    Plot the selected cases grid in a polar (dir, freq) plane.

    Parameters
    ----------
    frequencies : np.ndarray
        The frequencies array.
    directions : np.ndarray
        The directions array.
    figsize : tuple, optional
        The figure size. Default is (8, 8).
    **kwargs : dict, optional
        Additional keyword arguments to be passed to the pcolormesh function.
    """

    # Generate figure and axes
    fig = plt.figure(figsize=figsize)
    ax = fig.add_subplot(1, 1, 1, projection="polar")

    # Prepare data: close the directional circle, start frequencies at 0,
    # and assign a unique case ID to every (dir, freq) cell
    x = np.append(np.deg2rad(directions), np.deg2rad(directions)[0])
    y = np.append(0, frequencies)
    z = (
        np.arange(len(frequencies) * len(directions))
        .reshape(len(directions), len(frequencies))
        .T
    )

    # Custom colormap stitched together from several matplotlib colormaps
    cmn = np.vstack(
        (
            cm.get_cmap("plasma", 124)(np.linspace(0, 0.9, 70)),
            cm.get_cmap("magma_r", 124)(np.linspace(0.1, 0.4, 80)),
            cm.get_cmap("rainbow_r", 124)(np.linspace(0.1, 0.8, 80)),
            cm.get_cmap("Blues_r", 124)(np.linspace(0.4, 0.8, 40)),
            cm.get_cmap("cubehelix_r", 124)(np.linspace(0.1, 0.8, 80)),
        )
    )
    cmn = ListedColormap(cmn, name="cmn")

    # Plot case IDs
    p1 = ax.pcolormesh(
        x,
        y,
        z,
        vmin=0,
        vmax=np.nanmax(z),
        edgecolor="grey",
        linewidth=0.005,
        cmap=cmn,
        shading="flat",
        **kwargs,
    )

    # Customize axes: North at the top, directions increasing clockwise
    ax.set_theta_zero_location("N", offset=0)
    ax.set_theta_direction(-1)
    ax.tick_params(
        axis="both",
        colors="black",
        labelsize=14,
        pad=10,
    )

    # Add colorbar
    plt.colorbar(p1, pad=0.1, shrink=0.7).set_label("Case ID", fontsize=16)
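
# Usage sketch (illustrative, not part of the module): show the case IDs of a
# generated grid on the polar (dir, freq) plane.
#
#     cases = generate_swan_cases()
#     plot_selected_cases_grid(
#         frequencies=cases["freq"].values,
#         directions=cases["dir"].values,
#     )
#     plt.show()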