Source code for bluemath_tk.interpolation.rbf

import time
from typing import Callable, List, Tuple

import numpy as np
import dask.array as da
import pandas as pd
from scipy.optimize import fmin, fminbound

from ..core.decorators import validate_data_rbf
from ._base_interpolation import BaseInterpolation


[docs] def gaussian_kernel(r: float, const: float) -> float: """ This function calculates the Gaussian kernel for the given distance and constant. Parameters ---------- r : float The distance. const : float The constant (default name is usually sigma for gaussian kernel). Returns ------- float The Gaussian kernel value. Notes ----- - The Gaussian kernel is defined as: K(r) = exp(r^2 / 2 * const^2) (https://en.wikipedia.org/wiki/Gaussian_function) - Here, we are assuming the mean is 0. """ return np.exp(-0.5 * r * r / (const * const))
[docs] def multiquadratic_kernel(r, const): return np.sqrt(1 + (r / const) ** 2)
[docs] def inverse_kernel(r, const): return 1 / np.sqrt(1 + (r / const) ** 2)
[docs] def cubic_kernel(r, const): return r**3
[docs] def thin_plate_kernel(r, const): return r**2 * np.log(r / const)
[docs] class RBFError(Exception): """ Custom exception for RBF class. """ def __init__(self, message: str = "RBF error occurred."): self.message = message super().__init__(self.message)
[docs] class RBF(BaseInterpolation): """ Radial Basis Function (RBF) interpolation model. Attributes ---------- sigma_min : float The minimum value for the sigma parameter. This value might change in the optimization process. sigma_max : float The maximum value for the sigma parameter. This value might change in the optimization process. sigma_diff : float The difference between the optimal bounded sigma and the minimum and maximum sigma values. If the difference is less than this value, the optimization process continues. kernel : str The kernel to use for the RBF model. The available kernels are: - gaussian : ``exp(-1/2 * (r / const)**2)`` - multiquadratic : ``sqrt(1 + (r / const)**2)`` - inverse : ``1 / sqrt(1 + (r / const)**2)`` - cubic : ``r**3`` - thin_plate : ``r**2 * log(r / const)`` kernel_func : function The kernel function to use for the RBF model. smooth : float The smoothness parameter. subset_data : pd.DataFrame The subset data used to fit the model. normalized_subset_data : pd.DataFrame The normalized subset data used to fit the model. target_data : pd.DataFrame The target data used to fit the model. normalized_target_data : pd.DataFrame The normalized target data used to fit the model. This attribute is only set if normalize_target_data is True in the fit method. subset_directional_variables : List[str] The subset directional variables. target_directional_variables : List[str] The target directional variables. subset_processed_variables : List[str] The subset processed variables. target_processed_variables : List[str] The target processed variables. subset_custom_scale_factor : dict The custom scale factor for the subset data. target_custom_scale_factor : dict The custom scale factor for the target data. subset_scale_factor : dict The scale factor for the subset data. target_scale_factor : dict The scale factor for the target data. rbf_coeffs : pd.DataFrame The RBF coefficients for the target variables. opt_sigmas : dict The optimal sigmas for the target variables. Methods ------- fit -> None Fits the model to the data. predict -> pd.DataFrame Predicts the data for the provided dataset. fit_predict -> pd.DataFrame Fits the model to the subset and predicts the interpolated dataset. Notes ----- TODO: For the moment, this class only supports optimization for one parameter kernels. For this reason, we only have sigma as the parameter to optimize. This sigma refers to the sigma parameter in the Gaussian kernel (but is used for all kernels). Main reference for sigma optimization: https://link.springer.com/article/10.1023/A:1018975909870 Examples -------- .. jupyter-execute:: import numpy as np import pandas as pd from bluemath_tk.interpolation.rbf import RBF dataset = pd.DataFrame( { "Hs": np.random.rand(1000) * 7, "Tp": np.random.rand(1000) * 20, "Dir": np.random.rand(1000) * 360, } ) subset = dataset.sample(frac=0.25) target = pd.DataFrame( { "HsPred": subset["Hs"] * 2 + subset["Tp"] * 3, "DirPred": - subset["Dir"], } ) rbf = RBF() predictions = rbf.fit_predict( subset_data=subset, subset_directional_variables=["Dir"], target_data=target, target_directional_variables=["DirPred"], normalize_target_data=True, dataset=dataset, num_workers=4, iteratively_update_sigma=True, ) print(predictions.head()) References ---------- [1] https://en.wikipedia.org/wiki/Radial_basis_function [2] https://en.wikipedia.org/wiki/Gaussian_function [3] https://link.springer.com/article/10.1023/A:1018975909870 """ rbf_kernels = { "gaussian": gaussian_kernel, "multiquadratic": multiquadratic_kernel, "inverse": inverse_kernel, "cubic": cubic_kernel, "thin_plate": thin_plate_kernel, } def __init__( self, sigma_min: float = 0.001, sigma_max: float = 0.1, sigma_diff: float = 0.0001, sigma_opt: float = None, kernel: str = "gaussian", smooth: float = 1e-5, ): """ Raises ------ ValueError If the sigma_min is not a positive float. If the sigma_max is not a positive float greater than sigma_min. If the sigma_diff is not a positive float. If the kernel is not a string and one of the available kernels. If the smooth is not a positive float. """ initial_msg = f""" --------------------------------------------------------------------------------- | Initializing RBF interpolation model with the following parameters: | - sigma_min: {sigma_min} | - sigma_max: {sigma_max} | - sigma_diff: {sigma_diff} | - sigma_opt: {sigma_opt} | - kernel: {kernel} | - smooth: {smooth} | For more information, please refer to the documentation. | Recommended lecture: https://link.springer.com/article/10.1023/A:1018975909870 --------------------------------------------------------------------------------- """ print(initial_msg) super().__init__() self.set_logger_name(name=self.__class__.__name__) if not isinstance(sigma_min, float) or sigma_min < 0: raise ValueError("sigma_min must be a positive float.") self._sigma_min = sigma_min if not isinstance(sigma_max, float) or sigma_max < sigma_min: raise ValueError( "sigma_max must be a positive float greater than sigma_min." ) self._sigma_max = sigma_max if not isinstance(sigma_diff, float) or sigma_diff < 0: raise ValueError("sigma_diff must be a positive float.") self._sigma_diff = sigma_diff if not isinstance(kernel, str) or kernel not in self.rbf_kernels.keys(): raise ValueError( f"kernel must be a string and one of {list(self.rbf_kernels.keys())}." ) if sigma_opt is not None: if not isinstance(sigma_opt, float) or sigma_opt < 0: raise ValueError("sigma_opt must be a positive float.") self._sigma_opt = sigma_opt if not isinstance(kernel, str) or kernel not in self.rbf_kernels.keys(): raise ValueError( f"kernel must be a string and one of {list(self.rbf_kernels.keys())}." ) self._kernel = kernel self._kernel_func = self.rbf_kernels[self.kernel] if not isinstance(smooth, float) or smooth < 0: raise ValueError("smooth must be a positive float.") self._smooth = smooth # Below, we initialize the attributes that will be set in the fit method self.is_fitted: bool = False self.is_target_normalized: bool = False self._subset_data: pd.DataFrame = pd.DataFrame() self._normalized_subset_data: pd.DataFrame = pd.DataFrame() self._target_data: pd.DataFrame = pd.DataFrame() self._normalized_target_data: pd.DataFrame = pd.DataFrame() self._subset_directional_variables: List[str] = [] self._target_directional_variables: List[str] = [] self._subset_processed_variables: List[str] = [] self._target_processed_variables: List[str] = [] self._subset_custom_scale_factor: dict = {} self._target_custom_scale_factor: dict = {} self._subset_scale_factor: dict = {} self._target_scale_factor: dict = {} self._rbf_coeffs: pd.DataFrame = pd.DataFrame() self._opt_sigmas: dict = {} # Exclude attributes to .save_model() method self._exclude_attributes = [] # Row chunks for parallel computation self.row_chunks: int = None @property def sigma_min(self) -> float: return self._sigma_min @property def sigma_max(self) -> float: return self._sigma_max @property def sigma_diff(self) -> float: return self._sigma_diff @property def sigma_opt(self) -> float: return self._sigma_opt @property def kernel(self) -> str: return self._kernel @property def kernel_func(self) -> Callable: return self._kernel_func @property def smooth(self) -> float: return self._smooth @property def subset_data(self) -> pd.DataFrame: return self._subset_data @property def normalized_subset_data(self) -> pd.DataFrame: return self._normalized_subset_data @property def target_data(self) -> pd.DataFrame: return self._target_data @property def normalized_target_data(self) -> pd.DataFrame: if self._normalized_target_data.empty: raise ValueError("Target data is not normalized.") return self._normalized_target_data @property def subset_directional_variables(self) -> List[str]: return self._subset_directional_variables @property def target_directional_variables(self) -> List[str]: return self._target_directional_variables @property def subset_processed_variables(self) -> List[str]: return self._subset_processed_variables @property def target_processed_variables(self) -> List[str]: return self._target_processed_variables @property def subset_custom_scale_factor(self) -> dict: return self._subset_custom_scale_factor @property def target_custom_scale_factor(self) -> dict: return self._target_custom_scale_factor @property def subset_scale_factor(self) -> dict: return self._subset_scale_factor @property def target_scale_factor(self) -> dict: return self._target_scale_factor @property def rbf_coeffs(self) -> pd.DataFrame: return self._rbf_coeffs @property def opt_sigmas(self) -> dict: return self._opt_sigmas def _preprocess_subset_data( self, subset_data: pd.DataFrame, is_fit: bool = True ) -> pd.DataFrame: """ This function preprocesses the subset data. Parameters ---------- subset_data : pd.DataFrame The subset data to preprocess (could be a dataset to predict). is_fit : bool, optional Whether the data is being fit or not. Default is True. Returns ------- pd.DataFrame The preprocessed subset data. Raises ------ ValueError If the subset contains NaNs. Notes ----- - This function preprocesses the subset data by: - Checking for NaNs. - Preprocessing directional variables. - Normalizing the data. """ # Make copies to avoid modifying the original data subset_data = subset_data.copy() self.logger.info("Checking for NaNs in subset data") subset_data = self.check_nans(data=subset_data, raise_error=True) self.logger.info("Preprocessing subset data") for directional_variable in self.subset_directional_variables: var_u_component, var_y_component = self.get_uv_components( x_deg=subset_data[directional_variable].values ) subset_data[f"{directional_variable}_u"] = var_u_component subset_data[f"{directional_variable}_v"] = var_y_component # Drop the original directional variable in subset_data subset_data.drop(columns=[directional_variable], inplace=True) self._subset_processed_variables = list(subset_data.columns) self.logger.info("Normalizing subset data") normalized_subset_data, subset_scale_factor = self.normalize( data=subset_data, custom_scale_factor=self.subset_custom_scale_factor if is_fit else self.subset_scale_factor, ) self.logger.info("Subset data preprocessed successfully") if is_fit: self._subset_data = subset_data self._normalized_subset_data = normalized_subset_data self._subset_scale_factor = subset_scale_factor return normalized_subset_data.copy() def _preprocess_target_data( self, target_data: pd.DataFrame, normalize_target_data: bool = True, ) -> pd.DataFrame: """ This function preprocesses the target data. Parameters ---------- target_data : pd.DataFrame The target data to preprocess. normalize_target_data : bool, optional Whether to normalize the target data. Default is True. Returns ------- pd.DataFrame The preprocessed target data. Raises ------ ValueError If the target contains NaNs. Notes ----- - This function preprocesses the target data by: - Checking for NaNs. - Preprocessing directional variables. - Normalizing the data. """ # Make copies to avoid modifying the original data target_data = target_data.copy() self.logger.info("Checking for NaNs in target data") target_data = self.check_nans(data=target_data, raise_error=True) self.logger.info("Preprocessing target data") for directional_variable in self.target_directional_variables: var_u_component, var_y_component = self.get_uv_components( x_deg=target_data[directional_variable].values ) target_data[f"{directional_variable}_u"] = var_u_component target_data[f"{directional_variable}_v"] = var_y_component # Drop the original directional variable in target_data target_data.drop(columns=[directional_variable], inplace=True) self._target_processed_variables = list(target_data.columns) if normalize_target_data: self.logger.info("Normalizing target data") normalized_target_data, target_scale_factor = self.normalize( data=target_data, custom_scale_factor=self.target_custom_scale_factor, ) self.is_target_normalized = True self._target_data = target_data.copy() self._normalized_target_data = normalized_target_data.copy() self._target_scale_factor = target_scale_factor.copy() self.logger.info("Target data preprocessed successfully") return normalized_target_data.copy() else: self.is_target_normalized = False self._target_data = target_data.copy() self._normalized_target_data = pd.DataFrame() self._target_scale_factor = {} self.logger.info("Target data preprocessed successfully") return target_data.copy() def _rbf_assemble(self, x, sigma): """ Assemble the RBF matrix. Parameters ---------- x : np.ndarray The data. sigma : float The sigma parameter for the kernel. Returns ------- np.ndarray The data with all the calculated kernel values. """ # Get the number of rows and columns in x dim, n = x.shape # Compute the pairwise distances dists = np.linalg.norm(x[:, :, np.newaxis] - x[:, np.newaxis, :], axis=0) # Apply the kernel function to the distances A = self.kernel_func(dists, sigma) # Subtract the smoothing parameter from the diagonal elements np.fill_diagonal(A, A.diagonal() - self.smooth) # Add the identity matrix to the matrix (polynomial term) P = np.hstack((np.ones((n, 1)), x.T)) A = np.vstack( (np.hstack((A, P)), np.hstack((P.T, np.zeros((dim + 1, dim + 1))))) ) return A def _calc_rbf_coeff( self, sigma: float, x: np.ndarray, y: np.ndarray ) -> Tuple[np.ndarray, np.ndarray]: """ This function calculates the RBF coefficients for the given data. Parameters ---------- sigma : float The sigma parameter for the kernel. x : np.ndarray The subset data used to interpolate. y : np.ndarray The target data to interpolate. Returns ------- Tuple[np.ndarray, np.ndarray] The RBF coefficients and the A matrix. """ # Get the number of rows and columns in x m, n = x.shape # Assemble the A matrix A = self._rbf_assemble(x=x, sigma=sigma) # Concatenate y with zeros and reshape b = np.concatenate((y, np.zeros((m + 1,)))).reshape(-1, 1) # Calculate the RBF coefficients rbfcoeff, _, _, _ = np.linalg.lstsq(A, b, rcond=None) # inverse return rbfcoeff, A def _cost_sigma(self, sigma: float, x: np.ndarray, y: np.ndarray) -> float: """ This function is called by fminbound to minimize the cost function. Parameters ---------- sigma : float The sigma parameter for the kernel. x : np.ndarray The subset data used to interpolate. y : np.ndarray The target data to interpolate. Returns ------- float The cost value. """ # Calculate RBF coefficients and A matrix rbf_coeff, A = self._calc_rbf_coeff(sigma=sigma, x=x, y=y) # Extract the top-left n x n submatrix from A m, n = x.shape A = A[:n, :n] # Compute the pseudo-inverse of the submatrix A invA = np.linalg.pinv(A) # Initialize residuals by subtracting the last m elements of rbf_coeff from y m1, n1 = rbf_coeff.shape kk = y - rbf_coeff[m1 - m - 1] # Adjust residuals by subtracting the product of rbf_coeff and x # DEPRECATED: The loop is replaced by the vectorized operation below # for i in range(m): # kk = kk - rbf_coeff[m1 - m + i] * x[i, :] kk -= np.dot(rbf_coeff[m1 - m :].T, x).reshape(-1) # Calculate the cost by multiplying invA with kk and normalizing by the diagonal elements of invA ceps = np.dot(invA, kk) / np.diagonal(invA) # Return the norm of ceps, representing the cost yy = np.linalg.norm(ceps) return yy def _calc_opt_sigma( self, target_variable: np.ndarray, subset_variables: np.ndarray, iteratively_update_sigma: bool = False, ) -> float: """ This function calculates the optimal sigma for the given target variable. Parameters ---------- target_variable : np.ndarray The target variable to interpolate. subset_variables : np.ndarray The subset variables used to interpolate. iteratively_update_sigma : bool, optional Whether to iteratively update the sigma parameter. Default is False. Returns ------- float The optimal sigma. """ t0 = time.time() # Initialize sigma_min, sigma_max, and d_sigma sigma_min, sigma_max, d_sigma = self.sigma_min, self.sigma_max, 0 if self.sigma_opt is not None: # Optimize sigma using the specified sigma_opt opt_sigma = fmin( func=self._cost_sigma, x0=self.sigma_opt, args=(subset_variables, target_variable), disp=0, )[0] if iteratively_update_sigma: self._sigma_opt = opt_sigma else: # Loop until sigma_diff is less than the specified sigma_diff while d_sigma < self.sigma_diff: opt_sigma = fminbound( func=self._cost_sigma, x1=sigma_min, x2=sigma_max, args=(subset_variables, target_variable), disp=0, ) lm_min = np.abs(opt_sigma - sigma_min) lm_max = np.abs(opt_sigma - sigma_max) if lm_min < self.sigma_diff: sigma_min = sigma_min - sigma_min / 2 elif lm_max < self.sigma_min: sigma_max = sigma_max + sigma_max / 2 d_sigma = np.nanmin([lm_min, lm_max]) if iteratively_update_sigma: self._sigma_opt = opt_sigma # Calculate the time taken to optimize sigma t1 = time.time() self.logger.info(f"Optimal sigma: {opt_sigma} - Time: {t1 - t0:.2f} seconds") # Calculate the RBF coefficients for the optimal sigma rbf_coeff, _ = self._calc_rbf_coeff( sigma=opt_sigma, x=subset_variables, y=target_variable ) return rbf_coeff, opt_sigma def _rbf_variable_interpolation( self, opt_sigma: float, rbf_coeff: np.ndarray, normalized_dataset: pd.DataFrame, num_points_subset: int, num_vars_subset: int, ) -> np.ndarray: """ Interpolates the surface for a variable. opt_sigma : float The optimal sigma calculated for variable. rbf_coeff : np.ndarray The fitted coefficients for variable. normalized_dataset : pd.DataFrame The normalized dataset. num_points_subset : int The number of points used in the fitting. num_vars_subset : int The number of variables used in the fitting. np.ndarray The interpolated variable. """ # Calculate optimal chunk size based on memory norm_dataset = normalized_dataset.values norm_subset = self.normalized_subset_data.values if self.row_chunks is not None: chunks = (min(self.row_chunks, norm_dataset.shape[0]), -1) self.logger.info(f"Using row chunks of size {chunks[0]}") # elif self.num_workers > 1: # chunks = (norm_dataset.shape[0] // self.num_workers, -1) else: chunks = (norm_dataset.shape[0], -1) # Convert to dask arrays for large operations d_dataset = da.from_array(norm_dataset, chunks=chunks) d_subset = da.from_array(norm_subset) # Split computation into chunks result = [] for i in range(0, len(d_dataset), chunks[0]): chunk = d_dataset[i : i + chunks[0]] # Calculate r for this chunk r_chunk = da.linalg.norm(chunk[:, None, :] - d_subset[None, :, :], axis=2) # Apply kernel and dot product kernel_values = self.kernel_func(r_chunk, opt_sigma) # Compute this chunk's result chunk_result = ( rbf_coeff[num_points_subset] + da.dot(kernel_values, rbf_coeff[:num_points_subset]) + da.dot( chunk, rbf_coeff[ num_points_subset + 1 : num_points_subset + 1 + num_vars_subset ].T, ) ) # Compute and append result.append(chunk_result.compute()) # Combine results return np.concatenate(result) def _rbf_interpolate( self, dataset: pd.DataFrame, num_workers: int = None ) -> pd.DataFrame: """ This function interpolates the dataset. Parameters ---------- dataset : pd.DataFrame The dataset to interpolate (must have same variables as subset). num_workers : int, optional The number of workers to use for the interpolation. Default is None. Returns ------- pd.DataFrame The interpolated dataset (with all target variables). """ normalized_dataset = self._preprocess_subset_data( subset_data=dataset, is_fit=False ) # Get the number of rows and columns in subset and dataset num_vars_subset, num_points_subset = self.normalized_subset_data.T.shape _, num_points_dataset = normalized_dataset.T.shape # Initialize the interpolated dataset interpolated_array = np.zeros( (num_points_dataset, len(self.target_processed_variables)) ) # Loop through the target variables if num_workers > 1: self.logger.info( f"Interpolating target variables using parallel execution and num_workers={num_workers}" ) rbf_interpolated_vars = self.parallel_execute( func=self._rbf_variable_interpolation, items=zip( [ self._opt_sigmas[target_var] for target_var in self.target_processed_variables ], [ self._rbf_coeffs[target_var].values for target_var in self.target_processed_variables ], ), num_workers=num_workers, normalized_dataset=normalized_dataset, num_points_subset=num_points_subset, num_vars_subset=num_vars_subset, ) for i_var, interpolated_var in rbf_interpolated_vars.items(): interpolated_array[:, i_var] = interpolated_var else: for i_var, target_var in enumerate(self.target_processed_variables): self.logger.info(f"Interpolating target variable {target_var}") interpolated_var = self._rbf_variable_interpolation( normalized_dataset=normalized_dataset, opt_sigma=self._opt_sigmas[target_var], rbf_coeff=self._rbf_coeffs[target_var].values, num_points_subset=num_points_subset, num_vars_subset=num_vars_subset, ) interpolated_array[:, i_var] = interpolated_var return pd.DataFrame(interpolated_array, columns=self.target_processed_variables)
[docs] @validate_data_rbf def fit( self, subset_data: pd.DataFrame, target_data: pd.DataFrame, subset_directional_variables: List[str] = [], target_directional_variables: List[str] = [], subset_custom_scale_factor: dict = {}, normalize_target_data: bool = True, target_custom_scale_factor: dict = {}, num_workers: int = None, iteratively_update_sigma: bool = False, ) -> None: """ Fits the model to the data. Parameters ---------- subset_data : pd.DataFrame The subset data used to fit the model. target_data : pd.DataFrame The target data used to fit the model. subset_directional_variables : List[str], optional The subset directional variables. Default is []. target_directional_variables : List[str], optional The target directional variables. Default is []. subset_custom_scale_factor : dict, optional The custom scale factor for the subset data. Default is {}. normalize_target_data : bool, optional Whether to normalize the target data. Default is True. target_custom_scale_factor : dict, optional The custom scale factor for the target data. Default is {}. num_workers : int, optional The number of workers to use for the optimization. Default is None. iteratively_update_sigma : bool, optional Whether to iteratively update the sigma parameter. Default is False. Notes ----- - This function fits the RBF model to the data by: 1. Preprocessing the subset and target data. 2. Calculating the optimal sigma for the target variables. 3. Storing the RBF coefficients and optimal sigmas. - The number of threads to use for the optimization can be specified. """ self._subset_directional_variables = subset_directional_variables self._target_directional_variables = target_directional_variables self._subset_custom_scale_factor = subset_custom_scale_factor self._target_custom_scale_factor = target_custom_scale_factor subset_data = self._preprocess_subset_data(subset_data=subset_data) target_data = self._preprocess_target_data( target_data=target_data, normalize_target_data=normalize_target_data, ) if num_workers is None: num_workers = self.num_workers self.logger.info("Fitting RBF model to the data") # RBF fitting for all variables rbf_coeffs, opt_sigmas = {}, {} if num_workers > 1: self.logger.info( f"Fitting RBF model using parallel execution and num_workers={num_workers}" ) rbf_coeffs_and_sigmas = self.parallel_execute( func=self._calc_opt_sigma, items=[ target_data[target_var].values for target_var in target_data.columns ], num_workers=num_workers, subset_variables=subset_data.values.T, iteratively_update_sigma=iteratively_update_sigma, ) for i_target_var, (rbf_coeff, opt_sigma) in rbf_coeffs_and_sigmas.items(): target_var = target_data.columns[i_target_var] rbf_coeffs[target_var] = rbf_coeff.flatten() opt_sigmas[target_var] = opt_sigma else: for target_var in target_data.columns: self.logger.info(f"Fitting RBF for variable {target_var}") target_var_values = target_data[target_var].values rbf_coeff, opt_sigma = self._calc_opt_sigma( target_variable=target_var_values, subset_variables=subset_data.values.T, iteratively_update_sigma=iteratively_update_sigma, ) rbf_coeffs[target_var] = rbf_coeff.flatten() opt_sigmas[target_var] = opt_sigma # Store the RBF coefficients and optimal sigmas self._rbf_coeffs = pd.DataFrame(rbf_coeffs) self._opt_sigmas = opt_sigmas # Set the is_fitted attribute to True self.is_fitted = True
[docs] def predict(self, dataset: pd.DataFrame, num_workers: int = None) -> pd.DataFrame: """ Predicts the data for the provided dataset. Parameters ---------- dataset : pd.DataFrame The dataset to predict (must have same variables than subset). num_workers : int, optional The number of workers to use for the interpolation. Default is None. Returns ------- pd.DataFrame The interpolated dataset. Raises ------ ValueError If the model is not fitted. Notes ----- - This function predicts the data by: 1. Reconstructing the data using the fitted coefficients. 2. Denormalizing the target data if normalize_target_data is True. 3. Calculating the degrees for the target directional variables. """ if self.is_fitted is False: raise RBFError("RBF model must be fitted before predicting.") if num_workers is None: num_workers = self.num_workers self.logger.info("Reconstructing data using fitted coefficients.") interpolated_target = self._rbf_interpolate( dataset=dataset, num_workers=num_workers ) if self.is_target_normalized: self.logger.info("Denormalizing target data") interpolated_target = self.denormalize( normalized_data=interpolated_target, scale_factor=self.target_scale_factor, ) for directional_variable in self.target_directional_variables: self.logger.info(f"Calculating target degrees for {directional_variable}") interpolated_target[directional_variable] = self.get_degrees_from_uv( xu=interpolated_target[f"{directional_variable}_u"].values, xv=interpolated_target[f"{directional_variable}_v"].values, ) return interpolated_target
[docs] def fit_predict( self, subset_data: pd.DataFrame, target_data: pd.DataFrame, dataset: pd.DataFrame, subset_directional_variables: List[str] = [], target_directional_variables: List[str] = [], subset_custom_scale_factor: dict = {}, normalize_target_data: bool = True, target_custom_scale_factor: dict = {}, num_workers: int = None, iteratively_update_sigma: bool = False, ) -> pd.DataFrame: """ Fits the model to the subset and predicts the interpolated dataset. Parameters ---------- subset_data : pd.DataFrame The subset data used to fit the model. target_data : pd.DataFrame The target data used to fit the model. dataset : pd.DataFrame The dataset to predict (must have same variables than subset). subset_directional_variables : List[str], optional The subset directional variables. Default is []. target_directional_variables : List[str], optional The target directional variables. Default is []. subset_custom_scale_factor : dict, optional The custom scale factor for the subset data. Default is {}. normalize_target_data : bool, optional Whether to normalize the target data. Default is True. target_custom_scale_factor : dict, optional The custom scale factor for the target data. Default is {}. num_workers : int, optional The number of workers to use for the optimization. Default is None. iteratively_update_sigma : bool, optional Whether to iteratively update the sigma parameter. Default is False. Returns ------- pd.DataFrame The interpolated dataset. Notes ----- - This function fits the model to the subset and predicts the interpolated dataset. """ if num_workers is None: num_workers = self.num_workers self.fit( subset_data=subset_data, target_data=target_data, subset_directional_variables=subset_directional_variables, target_directional_variables=target_directional_variables, subset_custom_scale_factor=subset_custom_scale_factor, normalize_target_data=normalize_target_data, target_custom_scale_factor=target_custom_scale_factor, num_workers=num_workers, iteratively_update_sigma=iteratively_update_sigma, ) return self.predict(dataset=dataset, num_workers=num_workers)