Source code for bluemath_tk.waves.superpoint

from typing import Dict, Tuple

import xarray as xr


[docs] def superpoint_calculation( stations_data: xr.DataArray, stations_dimension_name: str, sectors_for_each_station: Dict[str, Tuple[float, float]], overlap_angle: float = 0.0, ) -> xr.DataArray: """ Join multiple station spectral data for each directional sector. Parameters ---------- stations_data : xr.DataArray DataArray containing spectral data for multiple stations. stations_dimension_name : str Name of the dimension representing different stations in the DataArray. sectors_for_each_station : Dict[str, Tuple[float, float]] Dictionary mapping each station ID to a tuple of (min_direction, max_direction) representing the directional sector for that station. overlap_angle : float Angle in degrees that defines the overlap between two sectors. Returns ------- xr.DataArray A new DataArray where each point is the sum of spectral data from all stations for the specified directional sector. Notes ----- If your stations_data is saved in different files, you can load them all and then concatenate them using xr.open_mfdataset function. Example below: ```python files = [ "path/to/station1.nc", "path/to/station2.nc", "path/to/station3.nc" ] def load_station_data(ds: xr.Dataset) -> xr.DataArray: return ds.efth.expand_dims("station") stations_data = xr.open_mfdataset( files, concat_dim="station", preprocess=load_station_data, ) ``` """ superpoint_dataarray = xr.zeros_like( stations_data.isel({stations_dimension_name: 0}) ) if overlap_angle == 0: for station_id, (dir_min, dir_max) in sectors_for_each_station.items(): if dir_min < dir_max: mask = (stations_data["dir"] >= dir_min) & ( stations_data["dir"] < dir_max ) else: # Handle wrap-around (e.g., 350° to 10°) mask = (stations_data["dir"] >= dir_min) | ( stations_data["dir"] < dir_max ) superpoint_dataarray += stations_data.sel( {stations_dimension_name: station_id} ).where(mask, 0.0) else: # With overlap - expand sectors and average overlaps directions = stations_data["dir"] count_array = xr.zeros_like(superpoint_dataarray) # Counter for overlaps for station_id, (dir_min, dir_max) in sectors_for_each_station.items(): station_data = stations_data.sel({stations_dimension_name: station_id}) # Expand sector boundaries by overlap_angle if (dir_max - dir_min) < 0: mask = (directions >= dir_min - overlap_angle) | ( directions <= dir_max + overlap_angle ) else: mask = (directions >= dir_min - overlap_angle) & ( directions <= dir_max + overlap_angle ) # Add contribution where mask is true superpoint_dataarray += station_data.where(mask, 0.0) count_array += xr.where(mask, 1, 0) # Average where there are overlaps (count > 1) overlap_mask = count_array > 1 superpoint_dataarray = xr.where( overlap_mask, superpoint_dataarray / count_array, superpoint_dataarray ) return superpoint_dataarray