Source code for bluemath_tk.interpolation.rbf_scipy

from typing import List
import copy
import time
import numpy as np
import pandas as pd
from scipy.optimize import fmin, fminbound
from scipy.interpolate import RBFInterpolator
from sklearn.model_selection import KFold
from sklearn.metrics import mean_squared_error
from ._base_interpolation import BaseInterpolation
from ..core.decorators import validate_data_rbf


[docs] class RBFError(Exception): """ Custom exception for RBF class. """ def __init__(self, message: str = "RBF error occurred."): self.message = message super().__init__(self.message)
[docs] class RBF(BaseInterpolation): """ Radial Basis Function (RBF) interpolation model. Here, scipy's RBFInterpolator is used to interpolate the data. https://docs.scipy.org/doc/scipy/reference/generated/scipy.interpolate.RBFInterpolator.html Warnings -------- - This class is a Beta, results may not be accurate. See Also -------- bluemath_tk.interpolation.RBF : The stable version for this model. Attributes ---------- sigma_min : float The minimum value for the sigma parameter. This value might change in the optimization process. sigma_max : float The maximum value for the sigma parameter. This value might change in the optimization process. sigma_opt : float The optimal value for the sigma parameter. kernel : str Type of RBF. This should be one of - 'linear' : ``-r`` - 'thin_plate_spline' : ``r**2 * log(r)`` - 'cubic' : ``r**3`` - 'quintic' : ``-r**5`` - 'multiquadric' : ``-sqrt(1 + r**2)`` - 'inverse_multiquadric' : ``1/sqrt(1 + r**2)`` - 'inverse_quadratic' : ``1/(1 + r**2)`` - 'gaussian' : ``exp(-r**2)`` smoothing : float or (npoints, ) array_like Smoothing parameter. The interpolant perfectly fits the data when this is set to 0. For large values, the interpolant approaches a least squares fit of a polynomial with the specified degree. degree : int Degree of the added polynomial. For some RBFs the interpolant may not be well-posed if the polynomial degree is too small. Those RBFs and their corresponding minimum degrees are - 'multiquadric' : 0 - 'linear' : 0 - 'thin_plate_spline' : 1 - 'cubic' : 1 - 'quintic' : 2 The default value is the minimum degree for `kernel` or 0 if there is no minimum degree. Set this to -1 for no added polynomial. neighbors : int If specified, the value of the interpolant at each evaluation point will be computed using only this many nearest data points. All the data points are used by default. rbfs : dict Dict with RBFInterpolator instances. subset_data : pd.DataFrame The subset data used to fit the model. normalized_subset_data : pd.DataFrame The normalized subset data used to fit the model. target_data : pd.DataFrame The target data used to fit the model. normalized_target_data : pd.DataFrame The normalized target data used to fit the model. This attribute is only set if normalize_target_data is True in the fit method. subset_directional_variables : List[str] The subset directional variables. target_directional_variables : List[str] The target directional variables. subset_processed_variables : List[str] The subset processed variables. target_processed_variables : List[str] The target processed variables. subset_custom_scale_factor : dict The custom scale factor for the subset data. target_custom_scale_factor : dict The custom scale factor for the target data. subset_scale_factor : dict The scale factor for the subset data. target_scale_factor : dict The scale factor for the target data. rbf_coeffs : pd.DataFrame The RBF coefficients for the target variables. opt_sigmas : dict The optimal sigmas for the target variables. Methods ------- fit(...) : Fits the model to the data. predict(...) : Predicts the data for the provided dataset. fit_predict(...) : Fits the model to the subset and predicts the interpolated dataset. References ---------- .. [1] Fasshauer, G., 2007. Meshfree Approximation Methods with Matlab. World Scientific Publishing Co. .. [2] http://amadeus.math.iit.edu/~fass/603_ch3.pdf .. [3] Wahba, G., 1990. Spline Models for Observational Data. SIAM. .. [4] http://pages.stat.wisc.edu/~wahba/stat860public/lect/lect8/lect8.pdf Notes ----- .. versionadded:: 1.0.3 TODO: For the moment, this class only supports optimization for one parameter kernels. For this reason, we only have sigma as the parameter to optimize. This sigma refers to the sigma parameter in the Gaussian kernel (but is used for all kernels). """ def __init__( self, sigma_min: float = 0.001, sigma_max: float = 1.0, sigma_opt: float = None, kernel: str = "thin_plate_spline", smoothing: float = 0.0, degree: int = None, neighbors: int = None, ): """ Initializes the RBF model. Parameters ---------- sigma_min : float, optional The minimum value for the sigma parameter. Default is 0.001. sigma_max : float, optional The maximum value for the sigma parameter. Default is 1.0. sigma_opt : float, optional The optimal value for the sigma parameter. Default is None. kernel : str, optional Type of RBF. Default is 'thin_plate_spline'. smoothing : float, optional Smoothing parameter. Default is 0.0. degree : int, optional Degree of the added polynomial. Default is None. neighbors : int, optional If specified, the value of the interpolant at each evaluation point will be computed using only this many nearest data points. Default is None. """ super().__init__() self.set_logger_name(name=self.__class__.__name__) if not isinstance(sigma_min, float) or sigma_min < 0: raise ValueError("sigma_min must be a positive float.") self._sigma_min = sigma_min if not isinstance(sigma_max, float) or sigma_max < sigma_min: raise ValueError( "sigma_max must be a positive float greater than sigma_min." ) self._sigma_max = sigma_max if sigma_opt is not None: if not isinstance(sigma_opt, float) or sigma_opt < 0: raise ValueError("sigma_opt must be a positive float.") self._sigma_opt = sigma_opt if not isinstance(kernel, str): raise ValueError("kernel must be a string.") self._kernel = kernel if not isinstance(smoothing, float): raise ValueError("smoothing must be a float.") self._smoothing = smoothing if not isinstance(degree, int) and degree is not None: raise ValueError("degree must be an integer.") self._degree = degree if not isinstance(neighbors, int) and neighbors is not None: raise ValueError("neighbors must be an integer.") self._neighbors = neighbors self._rbfs: dict = {} # Dict with RBFInterpolator instances # Below, we initialize the attributes that will be set in the fit method self.is_fitted: bool = False self.is_target_normalized: bool = False self._subset_data: pd.DataFrame = pd.DataFrame() self._normalized_subset_data: pd.DataFrame = pd.DataFrame() self._target_data: pd.DataFrame = pd.DataFrame() self._normalized_target_data: pd.DataFrame = pd.DataFrame() self._subset_directional_variables: List[str] = [] self._target_directional_variables: List[str] = [] self._subset_processed_variables: List[str] = [] self._target_processed_variables: List[str] = [] self._subset_custom_scale_factor: dict = {} self._target_custom_scale_factor: dict = {} self._subset_scale_factor: dict = {} self._target_scale_factor: dict = {} self._rbf_coeffs: pd.DataFrame = pd.DataFrame() self._opt_sigmas: dict = {} @property def sigma_min(self) -> float: return self._sigma_min @property def sigma_max(self) -> float: return self._sigma_max @property def sigma_opt(self) -> float: return self._sigma_opt @property def kernel(self) -> str: return self._kernel @property def smoothing(self) -> float: return self._smoothing @property def degree(self) -> int: return self._degree @property def neighbors(self) -> int: return self._neighbors @property def rbfs(self) -> dict: return self._rbfs @property def subset_data(self) -> pd.DataFrame: return self._subset_data @property def normalized_subset_data(self) -> pd.DataFrame: return self._normalized_subset_data @property def target_data(self) -> pd.DataFrame: return self._target_data @property def normalized_target_data(self) -> pd.DataFrame: if self._normalized_target_data.empty: raise ValueError("Target data is not normalized.") return self._normalized_target_data @property def subset_directional_variables(self) -> List[str]: return self._subset_directional_variables @property def target_directional_variables(self) -> List[str]: return self._target_directional_variables @property def subset_processed_variables(self) -> List[str]: return self._subset_processed_variables @property def target_processed_variables(self) -> List[str]: return self._target_processed_variables @property def subset_custom_scale_factor(self) -> dict: return self._subset_custom_scale_factor @property def target_custom_scale_factor(self) -> dict: return self._target_custom_scale_factor @property def subset_scale_factor(self) -> dict: return self._subset_scale_factor @property def target_scale_factor(self) -> dict: return self._target_scale_factor @property def rbf_coeffs(self) -> pd.DataFrame: return self._rbf_coeffs @property def opt_sigmas(self) -> dict: if not self._opt_sigmas: raise ValueError("Specified kernel does not require optimization.") return self._opt_sigmas def _preprocess_subset_data( self, subset_data: pd.DataFrame, is_fit: bool = True ) -> pd.DataFrame: """ This function preprocesses the subset data. Parameters ---------- subset_data : pd.DataFrame The subset data to preprocess (could be a dataset to predict). is_fit : bool, optional Whether the data is to fit or not. Default is True. Returns ------- pd.DataFrame The preprocessed subset data. Raises ------ ValueError If the subset contains NaNs. Notes ----- - This function preprocesses the subset data by: - Checking for NaNs. - Preprocessing directional variables. - Normalizing the data. """ # Make copies to avoid modifying the original data subset_data = subset_data.copy() self.logger.info("Checking for NaNs in subset data") subset_data = self.check_nans(data=subset_data, raise_error=True) self.logger.info("Preprocessing subset data") for directional_variable in self.subset_directional_variables: var_u_component, var_y_component = self.get_uv_components( x_deg=subset_data[directional_variable].values ) subset_data[f"{directional_variable}_u"] = var_u_component subset_data[f"{directional_variable}_v"] = var_y_component # Drop the original directional variable in subset_data subset_data.drop(columns=[directional_variable], inplace=True) self._subset_processed_variables = list(subset_data.columns) self.logger.info("Normalizing subset data") normalized_subset_data, subset_scale_factor = self.normalize( data=subset_data, custom_scale_factor=self.subset_custom_scale_factor if is_fit else self.subset_scale_factor, ) self.logger.info("Subset data preprocessed successfully") if is_fit: self._subset_data = subset_data self._normalized_subset_data = normalized_subset_data self._subset_scale_factor = subset_scale_factor return normalized_subset_data.copy() def _preprocess_target_data( self, target_data: pd.DataFrame, normalize_target_data: bool = True, ) -> pd.DataFrame: """ This function preprocesses the target data. Parameters ---------- target_data : pd.DataFrame The target data to preprocess. normalize_target_data : bool, optional Whether to normalize the target data. Default is True. Returns ------- pd.DataFrame The preprocessed target data. Raises ------ ValueError If the target contains NaNs. Notes ----- - This function preprocesses the target data by: - Checking for NaNs. - Preprocessing directional variables. - Normalizing the data. """ # Make copies to avoid modifying the original data target_data = target_data.copy() self.logger.info("Checking for NaNs in target data") target_data = self.check_nans(data=target_data, raise_error=True) self.logger.info("Preprocessing target data") for directional_variable in self.target_directional_variables: var_u_component, var_y_component = self.get_uv_components( x_deg=target_data[directional_variable].values ) target_data[f"{directional_variable}_u"] = var_u_component target_data[f"{directional_variable}_v"] = var_y_component # Drop the original directional variable in target_data target_data.drop(columns=[directional_variable], inplace=True) self._target_processed_variables = list(target_data.columns) if normalize_target_data: self.logger.info("Normalizing target data") normalized_target_data, target_scale_factor = self.normalize( data=target_data, custom_scale_factor=self.target_custom_scale_factor, ) self.is_target_normalized = True self._target_data = target_data.copy() self._normalized_target_data = normalized_target_data.copy() self._target_scale_factor = target_scale_factor.copy() self.logger.info("Target data preprocessed successfully") return normalized_target_data.copy() else: self.is_target_normalized = False self._target_data = target_data.copy() self._normalized_target_data = pd.DataFrame() self._target_scale_factor = {} self.logger.info("Target data preprocessed successfully") return target_data.copy() def _cost_sigma( self, sigma: float, x: np.ndarray, y: np.ndarray, k: int = 5 ) -> float: """ Calculate the cost for a given sigma using K-Fold cross-validation. Parameters ---------- sigma : float The sigma parameter for the kernel. x : np.ndarray The input data. y : np.ndarray The target data. k : int, optional The number of folds for cross-validation. Default is 5. Returns ------- float The total cost for the RBF interpolation. """ kf = KFold(n_splits=k) total_cost = 0.0 for train_index, val_index in kf.split(x): x_train, x_val = x[train_index], x[val_index] y_train, y_val = y[train_index], y[val_index] # Instantiate the RBFInterpolator rbf = RBFInterpolator( y=x_train, d=y_train, neighbors=self.neighbors, smoothing=self.smoothing, kernel=self.kernel, epsilon=sigma, degree=self.degree, ) # Predict on the validation set predicted_y = rbf(x_val) # Calculate the cost (mean squared error) cost = mean_squared_error(y_val, predicted_y) total_cost += cost return total_cost / k def _calc_opt_sigma( self, target_variable: np.ndarray, subset_variables: np.ndarray, iteratively_update_sigma: bool = False, ) -> RBFInterpolator: """ This function calculates the optimal sigma for the given target variable. Parameters ---------- target_variable : np.ndarray The target variable to interpolate. subset_variables : np.ndarray The subset variables used to interpolate. iteratively_update_sigma : bool, optional Whether to iteratively update the sigma parameter. Default is False. Returns ------- float The optimal sigma. """ t0 = time.time() # Optimize sigma using fminbound or fmin if self.sigma_opt is not None: opt_sigma = fmin( func=self._cost_sigma, x0=self.sigma_opt, args=(subset_variables, target_variable), disp=0, )[-1] if iteratively_update_sigma: self._sigma_opt = opt_sigma else: opt_sigma = fminbound( func=self._cost_sigma, x1=self.sigma_min, x2=self.sigma_max, args=(subset_variables, target_variable), disp=0, ) # Save the fitted RBF for the optimal sigma rbf = RBFInterpolator( y=subset_variables, d=target_variable, neighbors=self.neighbors, smoothing=self.smoothing, kernel=self.kernel, epsilon=opt_sigma, degree=self.degree, ) # Calculate the time taken to optimize sigma t1 = time.time() self.logger.info(f"Optimal sigma: {opt_sigma} - Time: {t1 - t0:.2f} seconds") return rbf, opt_sigma
[docs] @validate_data_rbf def fit( self, subset_data: pd.DataFrame, target_data: pd.DataFrame, subset_directional_variables: List[str] = [], target_directional_variables: List[str] = [], subset_custom_scale_factor: dict = {}, normalize_target_data: bool = True, target_custom_scale_factor: dict = {}, num_threads: int = None, iteratively_update_sigma: bool = False, ) -> None: """ Fits the model to the data. Parameters ---------- subset_data : pd.DataFrame The subset data used to fit the model. target_data : pd.DataFrame The target data used to fit the model. subset_directional_variables : List[str], optional The subset directional variables. Default is []. target_directional_variables : List[str], optional The target directional variables. Default is []. subset_custom_scale_factor : dict, optional The custom scale factor for the subset data. Default is {}. normalize_target_data : bool, optional Whether to normalize the target data. Default is True. target_custom_scale_factor : dict, optional The custom scale factor for the target data. Default is {}. num_threads : int, optional The number of threads to use for the optimization. Default is None. iteratively_update_sigma : bool, optional Whether to iteratively update the sigma parameter. Default is False. Notes ----- - This function fits the RBF model to the data by: 1. Preprocessing the subset and target data. 2. Calculating the optimal sigma for the target variables. 3. Storing the RBF coefficients and optimal sigmas. - The number of threads to use for the optimization can be specified. """ if num_threads is not None: self.set_num_processors_to_use(num_processors=num_threads) self.logger.info(f"Using {num_threads} threads for optimization.") self._subset_directional_variables = subset_directional_variables self._target_directional_variables = target_directional_variables self._subset_custom_scale_factor = subset_custom_scale_factor self._target_custom_scale_factor = target_custom_scale_factor subset_data = self._preprocess_subset_data(subset_data=subset_data) target_data = self._preprocess_target_data( target_data=target_data, normalize_target_data=normalize_target_data, ) self.logger.info("Fitting RBF model to the data") # RBF fitting for all variables rbf_coeffs, opt_sigmas = {}, {} # Optimize sigma for each target variable for target_var in target_data.columns: self.logger.info(f"Fitting RBF for variable {target_var}") target_var_values = target_data[target_var].values if ( self.kernel == "linear" or self.kernel == "cubic" or self.kernel == "quintic" or self.kernel == "thin_plate_spline" ): rbf = RBFInterpolator( y=subset_data.values, d=target_var_values, neighbors=self.neighbors, smoothing=self.smoothing, kernel=self.kernel, degree=self.degree, ) opt_sigma = None else: rbf, opt_sigma = self._calc_opt_sigma( target_variable=target_var_values, subset_variables=subset_data.values, iteratively_update_sigma=iteratively_update_sigma, ) self.rbfs[target_var] = copy.deepcopy(rbf) rbf_coeffs[target_var] = rbf._coeffs.flatten() opt_sigmas[target_var] = opt_sigma # Store the RBF coefficients and optimal sigmas self._rbf_coeffs = pd.DataFrame(rbf_coeffs) self._opt_sigmas = opt_sigmas # Set the is_fitted attribute to True self.is_fitted = True
[docs] def predict(self, dataset: pd.DataFrame) -> pd.DataFrame: """ Predicts the data for the provided dataset. Parameters ---------- dataset : pd.DataFrame The dataset to predict (must have same variables than subset). Returns ------- pd.DataFrame The interpolated dataset. Raises ------ ValueError If the model is not fitted. Notes ----- - This function predicts the data by: 1. Reconstructing the data using the fitted coefficients. 2. Denormalizing the target data if normalize_target_data is True. 3. Calculating the degrees for the target directional variables. """ if self.is_fitted is False: raise RBFError("RBF model must be fitted before predicting.") self.logger.info("Reconstructing data using fitted coefficients.") normalized_dataset = self._preprocess_subset_data( subset_data=dataset, is_fit=False ) # Create an empty array to store the interpolated target data interpolated_target_array = np.zeros( (normalized_dataset.shape[0], len(self.target_processed_variables)) ) for target_var in self.target_processed_variables: self.logger.info(f"Predicting target variable {target_var}") rbf = self.rbfs[target_var] interpolated_target_array[ :, self.target_processed_variables.index(target_var) ] = rbf(normalized_dataset.values) interpolated_target = pd.DataFrame( data=interpolated_target_array, columns=self.target_processed_variables ) # Denormalize the target data if normalize_target_data is True if self.is_target_normalized: self.logger.info("Denormalizing target data") interpolated_target = self.denormalize( normalized_data=interpolated_target, scale_factor=self.target_scale_factor, ) # Calculate the degrees for the target directional variables for directional_variable in self.target_directional_variables: self.logger.info(f"Calculating target degrees for {directional_variable}") interpolated_target[directional_variable] = self.get_degrees_from_uv( xu=interpolated_target[f"{directional_variable}_u"].values, xv=interpolated_target[f"{directional_variable}_v"].values, ) return interpolated_target
[docs] def fit_predict( self, subset_data: pd.DataFrame, target_data: pd.DataFrame, dataset: pd.DataFrame, subset_directional_variables: List[str] = [], target_directional_variables: List[str] = [], subset_custom_scale_factor: dict = {}, normalize_target_data: bool = True, target_custom_scale_factor: dict = {}, num_threads: int = None, iteratively_update_sigma: bool = False, ) -> pd.DataFrame: """ Fits the model to the subset and predicts the interpolated dataset. Parameters ---------- subset_data : pd.DataFrame The subset data used to fit the model. target_data : pd.DataFrame The target data used to fit the model. dataset : pd.DataFrame The dataset to predict (must have same variables than subset). subset_directional_variables : List[str], optional The subset directional variables. Default is []. target_directional_variables : List[str], optional The target directional variables. Default is []. subset_custom_scale_factor : dict, optional The custom scale factor for the subset data. Default is {}. normalize_target_data : bool, optional Whether to normalize the target data. Default is True. target_custom_scale_factor : dict, optional The custom scale factor for the target data. Default is {}. num_threads : int, optional The number of threads to use for the optimization. Default is None. iteratively_update_sigma : bool, optional Whether to iteratively update the sigma parameter. Default is False. Returns ------- pd.DataFrame The interpolated dataset. Notes ----- - This function fits the model to the subset and predicts the interpolated dataset. """ self.fit( subset_data=subset_data, target_data=target_data, subset_directional_variables=subset_directional_variables, target_directional_variables=target_directional_variables, subset_custom_scale_factor=subset_custom_scale_factor, normalize_target_data=normalize_target_data, target_custom_scale_factor=target_custom_scale_factor, num_threads=num_threads, iteratively_update_sigma=iteratively_update_sigma, ) return self.predict(dataset=dataset)