Source code for bluemath_tk.core.operations

import logging
from typing import Tuple, Union

import numpy as np
import pandas as pd
import pyproj
import xarray as xr
from sklearn.preprocessing import StandardScaler

available_projections = {
    "SPAIN": pyproj.Proj(proj="utm", zone=30, ellps="WGS84"),
}


[docs] def normalize( data: Union[pd.DataFrame, xr.Dataset], custom_scale_factor: dict = {}, logger: logging.Logger = None, ) -> Tuple[Union[pd.DataFrame, xr.Dataset], dict]: """ Normalize data to 0-1 using min max scaler approach. Parameters ---------- data : pd.DataFrame or xr.Dataset Input data to be normalized. custom_scale_factor : dict, optional Dictionary with variables as keys and a list with two values as values. The first value is the minimum and the second value is the maximum used to normalize the variable. If not provided, the minimum and maximum values of the variable are used. logger : logging.Logger, optional Logger object to log warnings if the custom min or max is bigger or lower than the datapoints. Returns ------- normalized_data : pd.DataFrame or xr.Dataset Normalized data. scale_factor : dict Dictionary with variables as keys and a list with two values as values. The first value is the minimum and the second value is the maximum used to normalize the variable. Notes ----- - This method does not modify the input data, it creates a copy of the dataframe / dataset and normalizes it. - The normalization is done variable by variable, i.e. the minimum and maximum values are calculated for each variable. - If custom min or max is bigger or lower than the datapoints, it will be changed to the minimum or maximum of the datapoints and a warning will be logged. Examples -------- >>> import numpy as np >>> import pandas as pd >>> from bluemath_tk.core.operations import normalize >>> df = pd.DataFrame( ... { ... "Hs": np.random.rand(1000) * 7, ... "Tp": np.random.rand(1000) * 20, ... "Dir": np.random.rand(1000) * 360, ... } ... ) >>> normalized_data, scale_factor = normalize(data=df) >>> import numpy as np >>> import xarray as xr >>> from bluemath_tk.core.data import normalize >>> ds = xr.Dataset( ... { ... "Hs": (("time",), np.random.rand(1000) * 7), ... "Tp": (("time",), np.random.rand(1000) * 20), ... "Dir": (("time",), np.random.rand(1000) * 360), ... }, ... coords={"time": pd.date_range("2000-01-01", periods=1000)}, ... ) >>> normalized_data, scale_factor = normalize(data=ds) """ if isinstance(data, pd.DataFrame): vars_to_normalize = list(data.columns) elif isinstance(data, xr.Dataset): vars_to_normalize = list(data.data_vars) else: raise TypeError("Data must be a pandas DataFrame or an xarray Dataset") normalized_data = data.copy() # Copy data to avoid bad memory replacements scale_factor = ( custom_scale_factor.copy() ) # Copy dict to avoid bad memory replacements for data_var in vars_to_normalize: data_var_min = normalized_data[data_var].min() data_var_max = normalized_data[data_var].max() if custom_scale_factor.get(data_var): if custom_scale_factor.get(data_var)[0] > data_var_min: if logger is not None: logger.info( f"Proposed min custom scaler for {data_var} is bigger than datapoint" # , using smallest datapoint ) else: print( f"Proposed min custom scaler for {data_var} is bigger than datapoint" # , using smallest datapoint ) # scale_factor[data_var][0] = data_var_min # else: data_var_min = custom_scale_factor.get(data_var)[0] if custom_scale_factor.get(data_var)[1] < data_var_max: if logger is not None: logger.info( f"Proposed max custom scaler for {data_var} is lower than datapoint" # , using biggest datapoint ) else: print( f"Proposed max custom scaler for {data_var} is lower than datapoint" # , using biggest datapoint ) # scale_factor[data_var][1] = data_var_max # else: data_var_max = custom_scale_factor.get(data_var)[1] else: scale_factor[data_var] = [data_var_min, data_var_max] normalized_data[data_var] = (normalized_data[data_var] - data_var_min) / ( data_var_max - data_var_min ) return normalized_data, scale_factor
[docs] def denormalize( normalized_data: Union[pd.DataFrame, xr.Dataset], scale_factor: dict, ) -> Union[pd.DataFrame, xr.Dataset]: """ Denormalize data using provided scale_factor. Parameters ---------- normalized_data : pd.DataFrame or xr.Dataset Input data that has been normalized and needs to be denormalized. scale_factor : dict Dictionary with variables as keys and a list with two values as values. The first value is the minimum and the second value is the maximum used to denormalize the variable. Returns ------- data : pd.DataFrame or xr.Dataset Denormalized data. Notes ----- - This method does not modify the input data, it creates a copy of the dataframe / dataset and denormalizes it. - The denormalization is done variable by variable, i.e. the minimum and maximum values are used to scale the data back to its original range. - Assumes that the scale_factor dictionary contains appropriate min and max values for each variable in the normalized_data. Examples -------- >>> import numpy as np >>> import pandas as pd >>> from bluemath_tk.core.operation import denormalize >>> df = pd.DataFrame( ... { ... "Hs": np.random.rand(1000), ... "Tp": np.random.rand(1000), ... "Dir": np.random.rand(1000), ... } ... ) >>> scale_factor = { ... "Hs": [0, 7], ... "Tp": [0, 20], ... "Dir": [0, 360], ... } >>> denormalized_data = denormalize(normalized_data=df, scale_factor=scale_factor) >>> import numpy as np >>> import xarray as xr >>> from bluemath_tk.core.operations import denormalize >>> ds = xr.Dataset( ... { ... "Hs": (("time",), np.random.rand(1000)), ... "Tp": (("time",), np.random.rand(1000)), ... "Dir": (("time",), np.random.rand(1000)), ... }, ... coords={"time": pd.date_range("2000-01-01", periods=1000)}, ... ) >>> scale_factor = { ... "Hs": [0, 7], ... "Tp": [0, 20], ... "Dir": [0, 360], ... } >>> denormalized_data = denormalize(normalized_data=ds, scale_factor=scale_factor) """ if isinstance(normalized_data, pd.DataFrame): vars_to_denormalize = list(normalized_data.columns) elif isinstance(normalized_data, xr.Dataset): vars_to_denormalize = list(normalized_data.data_vars) else: raise TypeError("Data must be a pandas DataFrame or an xarray Dataset") data = normalized_data.copy() # Copy data to avoid bad memory replacements for data_var in vars_to_denormalize: data[data_var] = ( data[data_var] * (scale_factor[data_var][1] - scale_factor[data_var][0]) + scale_factor[data_var][0] ) return data
[docs] def standarize( data: Union[np.ndarray, pd.DataFrame, xr.Dataset], scaler: StandardScaler = None, transform: bool = False, ) -> Tuple[Union[np.ndarray, pd.DataFrame, xr.Dataset], StandardScaler]: """ Standarize data to have mean 0 and std 1. Parameters ---------- data : np.ndarray, pd.DataFrame or xr.Dataset Input data to be standarized. scaler : StandardScaler, optional Scaler object to use for standarization. Default is None. transform : bool Whether to just transform the data. Default to False. Returns ------- standarized_data : np.ndarray, pd.DataFrame or xr.Dataset Standarized data. scaler : StandardScaler Scaler object used for standarization. Examples -------- >>> import numpy as np >>> from bluemath_tk.core.operations import standarize >>> data = np.random.rand(1000, 3) * 10.0 >>> standarized_data, scaler = standarize(data=data) """ scaler = scaler or StandardScaler() if isinstance(data, np.ndarray): if transform: standarized_data = scaler.transform(X=data) else: standarized_data = scaler.fit_transform(X=data) elif isinstance(data, pd.DataFrame): if transform: standarized_data = scaler.transform(X=data.values) else: standarized_data = scaler.fit_transform(X=data.values) standarized_data = pd.DataFrame(standarized_data, columns=data.columns) elif isinstance(data, xr.Dataset): if transform: standarized_data = scaler.transform(X=data.to_array().values) else: standarized_data = scaler.fit_transform(X=data.to_array().values) standarized_data = xr.Dataset( { var_name: (tuple(data.coords), standarized_data[i_var]) for i_var, var_name in enumerate(data.data_vars) }, coords=data.coords, ) return standarized_data, scaler
[docs] def destandarize( standarized_data: Union[np.ndarray, pd.DataFrame, xr.Dataset], scaler: StandardScaler, ) -> Union[np.ndarray, pd.DataFrame, xr.Dataset]: """ Destandarize data using provided scaler. Parameters ---------- standarized_data : np.ndarray, pd.DataFrame or xr.Dataset Standarized data to be destandarized. scaler : StandardScaler Scaler object used for standarization. Returns ------- np.ndarray, pd.DataFrame or xr.Dataset Destandarized data. Examples -------- >>> import numpy as np >>> from bluemath_tk.core.data import standarize, destandarize >>> data = np.random.rand(1000, 3) * 10.0 >>> standarized_data, scaler = standarize(data=data) >>> data = destandarize(standarized_data=standarized_data, scaler=scaler) """ if isinstance(standarized_data, np.ndarray): data = scaler.inverse_transform(X=standarized_data) elif isinstance(standarized_data, pd.DataFrame): data = scaler.inverse_transform(X=standarized_data.values) data = pd.DataFrame(data, columns=standarized_data.columns) elif isinstance(standarized_data, xr.Dataset): data = scaler.inverse_transform(X=standarized_data.to_array().values) data = xr.Dataset( { var_name: (tuple(standarized_data.coords), data[i_var]) for i_var, var_name in enumerate(standarized_data.data_vars) }, coords=standarized_data.coords, ) return data
[docs] def get_uv_components(x_deg: np.ndarray) -> Tuple[np.ndarray, np.ndarray]: """ This method calculates the u and v components for the given directional data. Here, we assume that the directional data is in degrees, beign 0° the North direction, and increasing clockwise. 0° N | | 270° W <---------> 90° E | | 90° S Parameters ---------- x_deg : np.ndarray The directional data in degrees. Returns ------- Tuple[np.ndarray, np.ndarray] The u and v components. """ # Convert degrees to radians and adjust by subtracting from π/2 x_rad = x_deg * np.pi / 180 # Calculate x and y components using cosine and sine xu = np.sin(x_rad) xv = np.cos(x_rad) # Return the u and v components return xu, xv
[docs] def get_degrees_from_uv(xu: np.ndarray, xv: np.ndarray) -> np.ndarray: """ This method calculates the degrees from the u and v components. Here, we assume u and v represent angles between 0 and 360 degrees, where 0° is the North direction, and increasing clockwise. (u=0, v=1) | | (u=-1, v=0) <---------> (u=1, v=0) | | (u=0, v=-1) Parameters ---------- xu : np.ndarray The u component. xv : np.ndarray The v component. Returns ------- np.ndarray The degrees. """ # Calculate the degrees using the arctangent function x_deg = np.arctan2(xu, xv) * 180 / np.pi % 360 # Return the degrees return x_deg
[docs] def convert_utm_to_lonlat( utm_x: np.ndarray, utm_y: np.ndarray, projection: Union[int, str, dict, pyproj.CRS], ) -> Tuple[np.ndarray, np.ndarray]: """ This method converts UTM coordinates to Longitude and Latitude. Parameters ---------- utm_x : np.ndarray The x values in UTM. utm_y : np.ndarray The y values in UTM. projection : int, str, dict, pyproj.CRS The projection to use for the transformation. Returns ------- Tuple[np.ndarray, np.ndarray] The longitude and latitude values. """ if isinstance(projection, str): projection = available_projections.get(projection, projection) # Transform the UTM to LonLat coordinates reshape = False if utm_x.size != utm_y.size: reshape_size = (utm_y.size, utm_x.size) utm_x, utm_y = ( np.meshgrid(utm_x, utm_y)[0].reshape(-1), np.meshgrid(utm_x, utm_y)[1].reshape(-1), ) reshape = True lon, lat = projection(utm_x, utm_y, inverse=True) if reshape: lon, lat = ( lon.reshape(*reshape_size)[0, :], lat.reshape(*reshape_size)[:, 0], ) # Return the LonLat coordinates return lon, lat
[docs] def convert_lonlat_to_utm( lon: np.ndarray, lat: np.ndarray, projection: Union[int, str, dict, pyproj.CRS], ) -> Tuple[np.ndarray, np.ndarray]: """ This method converts Longitude and Latitude to UTM coordinates. Parameters ---------- lon : np.ndarray The longitude values. lat : np.ndarray The latitude values. projection : int, str, dict, pyproj.CRS The projection to use for the transformation. Returns ------- Tuple[np.ndarray, np.ndarray] The x and y coordinates in UTM. """ if isinstance(projection, str): projection = available_projections.get(projection, projection) # Transform the LonLat to UTM coordinates reshape = False if lon.size != lat.size: reshape_size = (lat.size, lon.size) lon, lat = ( np.meshgrid(lon, lat)[0].reshape(-1), np.meshgrid(lon, lat)[1].reshape(-1), ) reshape = True utm_x, utm_y = projection(lon, lat) if reshape: utm_x, utm_y = ( utm_x.reshape(*reshape_size)[0, :], utm_y.reshape(*reshape_size)[:, 0], ) # Return the UTM coordinates return utm_x, utm_y
[docs] def spatial_gradient(data: xr.DataArray) -> xr.DataArray: """ Calculate spatial gradient of a DataArray with dimensions (time, latitude, longitude). Parameters ---------- data : xr.DataArray Input data with dimensions (time, latitude, longitude). Returns ------- xr.DataArray Gradient magnitude with same dimensions as input. Notes ----- The gradient is calculated using central differences, accounting for latitude-dependent grid spacing in spherical coordinates. """ # Initialize gradient array var_grad = xr.zeros_like(data) # Get latitude values in radians for spherical coordinate correction lat_rad = np.pi * np.abs(data.latitude.values) / 180.0 # Calculate gradients using vectorized operations for t in range(len(data.time)): var_val = data.isel(time=t).values # calculate gradient (matrix) m_c = var_val[1:-1, 1:-1] m_l = np.roll(var_val, -1, axis=1)[1:-1, 1:-1] m_r = np.roll(var_val, +1, axis=1)[1:-1, 1:-1] m_u = np.roll(var_val, -1, axis=0)[1:-1, 1:-1] m_d = np.roll(var_val, +1, axis=0)[1:-1, 1:-1] m_phi = lat_rad[1:-1] dpx1 = (m_c - m_l) / np.cos(m_phi[:, None]) dpx2 = (m_r - m_c) / np.cos(m_phi[:, None]) dpy1 = m_c - m_d dpy2 = m_u - m_c vg = (dpx1**2 + dpx2**2) / 2 + (dpy1**2 + dpy2**2) / 2 var_grad[t, 1:-1, 1:-1] = vg # Set attributes var_grad.attrs["units"] = "m^2/s^2" var_grad.attrs["name"] = "Gradient" return var_grad
[docs] def nautical_to_mathematical(nautical_degrees: np.ndarray) -> np.ndarray: """ Convert nautical degrees (0° at North, clockwise) to mathematical degrees (0° at East, counterclockwise) Parameters ---------- nautical_degrees : np.ndarray Directional angle in nautical convention Returns ------- np.ndarray Directional angle in mathematical convention """ # Convert nautical degrees to mathematical degrees return (90 - nautical_degrees) % 360
[docs] def mathematical_to_nautical(math_degrees: np.ndarray) -> np.ndarray: """ Convert mathematical degrees (0° at East, counterclockwise) to nautical degrees (0° at North, clockwise) Parameters ---------- math_degrees : float or array-like Directional angle in mathematical convention Returns ------- np.ndarray Directional angle in nautical convention """ # Rotate the angle by 360 degrees if math_degrees == 0: reversed_angle = 0 else: reversed_angle = 360 - math_degrees # Convert mathematical degrees to nautical degrees return (reversed_angle + 90) % 360