Source code for bluemath_tk.datamining.mda

from typing import List, Tuple

import numpy as np
import pandas as pd

from ..core.decorators import validate_data_mda
from ._base_datamining import BaseClustering


class MDAError(Exception):
    """
    Exception type raised by the MDA class.

    Parameters
    ----------
    message : str, optional
        Human-readable description of the failure. It is stored on the
        instance as ``message`` and forwarded to ``Exception``.
        Default is "MDA error occurred.".
    """

    def __init__(self, message: str = "MDA error occurred."):
        # Initialise the base exception first so ``str(err)`` yields the
        # message, then keep it as an attribute for direct access.
        super().__init__(message)
        self.message = message
class MDA(BaseClustering):
    """
    Maximum Dissimilarity Algorithm (MDA) class.

    This class performs the MDA algorithm on a given dataframe: starting from
    a seed row, it repeatedly selects the row whose (squared Euclidean)
    distance to the already selected centroids is the largest.

    Attributes
    ----------
    num_centers : int
        The number of centers to use in the MDA algorithm.
    data : pd.DataFrame
        The input data (with the u/v components of the directional
        variables appended).
    normalized_data : pd.DataFrame
        The normalized input data.
    data_to_fit : pd.DataFrame
        The data to fit the MDA algorithm (directional variables replaced by
        their u/v components).
    data_variables : List[str]
        A list with all data variables.
    directional_variables : List[str]
        A list with directional variables.
    fitting_variables : List[str]
        A list with fitting variables.
    custom_scale_factor : dict
        A dictionary of custom scale factors.
    scale_factor : dict
        A dictionary of scale factors (after normalizing the data).
    centroids : pd.DataFrame
        The selected centroids.
    normalized_centroids : pd.DataFrame
        The selected normalized centroids.
    centroid_iterative_indices : List[int]
        The indices picked at each iteration. Each index refers to the
        remaining (not yet selected) rows at that iteration, not to the
        original data.
    centroid_real_indices : np.ndarray
        The positional indices, in the fitted data, of the rows nearest to
        the selected centroids.
    is_fitted : bool
        Whether ``fit`` has completed successfully.

    Methods
    -------
    fit(data, directional_variables, custom_scale_factor, first_centroid_seed)
        Fit the MDA algorithm to the provided data.
    predict(data)
        Predict the nearest centroid for the provided data.
    fit_predict(data, directional_variables, custom_scale_factor, first_centroid_seed)
        Fits the MDA model to the data and predicts the nearest centroids.

    Examples
    --------
    >>> import numpy as np
    >>> import pandas as pd
    >>> from bluemath_tk.datamining.mda import MDA
    >>> data = pd.DataFrame(
    ...     {
    ...         'Hs': np.random.rand(1000) * 7,
    ...         'Tp': np.random.rand(1000) * 20,
    ...         'Dir': np.random.rand(1000) * 360
    ...     }
    ... )
    >>> mda = MDA(num_centers=10)
    >>> nearest_centroids_idxs, nearest_centroids_df = mda.fit_predict(
    ...     data=data,
    ...     directional_variables=['Dir'],
    ... )
    """

    def __init__(self, num_centers: int) -> None:
        """
        Initializes the MDA class.

        Parameters
        ----------
        num_centers : int
            The number of centers to use in the MDA algorithm.
            Must be greater than 0 (values are truncated with ``int``).

        Raises
        ------
        ValueError
            If num_centers is not greater than 0 once truncated to an integer.
        """

        super().__init__()
        self.set_logger_name(name=self.__class__.__name__)
        # Compare against 1 (not 0) so that fractional values such as 0.5,
        # which ``int`` would truncate to 0 centers, are rejected as well.
        if not num_centers >= 1:
            raise ValueError("Variable num_centers must be > 0")
        self.num_centers = int(num_centers)
        self._data: pd.DataFrame = pd.DataFrame()
        self._normalized_data: pd.DataFrame = pd.DataFrame()
        self._data_to_fit: pd.DataFrame = pd.DataFrame()
        self.data_variables: List[str] = []
        self.directional_variables: List[str] = []
        self.fitting_variables: List[str] = []
        self.custom_scale_factor: dict = {}
        self.scale_factor: dict = {}
        self.centroids: pd.DataFrame = pd.DataFrame()
        self.normalized_centroids: pd.DataFrame = pd.DataFrame()
        self.centroid_iterative_indices: List[int] = []
        self.centroid_real_indices: np.ndarray = np.array([])
        self.is_fitted: bool = False

    @property
    def data(self) -> pd.DataFrame:
        """The data stored by the last call to ``fit``."""
        return self._data

    @property
    def normalized_data(self) -> pd.DataFrame:
        """The normalized data computed by the last call to ``fit``."""
        return self._normalized_data

    @property
    def data_to_fit(self) -> pd.DataFrame:
        """The data actually used for fitting in the last call to ``fit``."""
        return self._data_to_fit

    def _normalized_distance(
        self, array_to_compare: np.ndarray, all_rest_data: np.ndarray
    ) -> np.ndarray:
        """
        Calculate the normalized distance between the array_to_compare and
        all_rest_data.

        Parameters
        ----------
        array_to_compare : np.ndarray
            2D array to compare against. It must either have as many rows as
            all_rest_data or a single row (which is then broadcast).
        all_rest_data : np.ndarray
            2D array with the rest of the data.

        Returns
        -------
        np.ndarray
            An array of squared Euclidean distances between the two arrays
            for each row.

        Raises
        ------
        MDAError
            If the function is called before fitting_variables has been set
            (i.e. before fitting).

        Notes
        -----
        - IMPORTANT: Data is assumed to be normalized before calling this
          function.
        - Only the first ``len(self.fitting_variables)`` columns of both
          arrays are compared.
        - DEPRECATED: a specific circular distance for directional variables,
          ``np.minimum(distance, 1 - distance)``, is no longer used; ``fit``
          and ``predict`` replace directional variables by their u/v
          components instead.
        """

        if not self.fitting_variables:
            raise MDAError(
                "_normalized_distance must be called after or during fitting, not before."
            )
        n_vars = len(self.fitting_variables)
        # Vectorised column differences (float, as in the original zero-filled
        # buffer); a single-row array_to_compare is broadcast over the rows.
        diff = np.asarray(
            array_to_compare[:, :n_vars] - all_rest_data[:, :n_vars], dtype=float
        )
        # Squared sum of differences for each row
        return np.sum(diff**2, axis=1)

    def _nearest_indices_to_centroids(
        self, normalized_data: pd.DataFrame
    ) -> Tuple[np.ndarray, pd.DataFrame]:
        """
        Compute nearest data points to calculated centroids.

        Parameters
        ----------
        normalized_data : pd.DataFrame
            The input data to be used to compute nearest data point to
            centroids.

        Returns
        -------
        np.ndarray
            An array containing, for each centroid, the positional index of
            the nearest data point.
        pd.DataFrame
            A DataFrame containing the nearest data points to centroids.

        Raises
        ------
        MDAError
            If the data is empty, or (through ``_normalized_distance``) if
            the model has no fitting variables yet.
        """

        if normalized_data.empty:
            raise MDAError("Data cannot be empty.")
        # Extract the arrays once instead of on every iteration
        data_values = normalized_data.values
        centroid_values = self.normalized_centroids.values
        nearest_indices_array = np.zeros(centroid_values.shape[0], dtype=int)
        for i, centroid in enumerate(centroid_values):
            ndist = self._normalized_distance(
                array_to_compare=centroid[np.newaxis, :], all_rest_data=data_values
            )
            # NaN distances are ignored when picking the nearest row
            nearest_indices_array[i] = np.nanargmin(ndist)

        return nearest_indices_array, normalized_data.iloc[nearest_indices_array]

    def _nearest_indices(
        self, normalized_data: pd.DataFrame
    ) -> Tuple[np.ndarray, pd.DataFrame]:
        """
        Compute nearest centroids to the provided data.

        Parameters
        ----------
        normalized_data : pd.DataFrame
            The input data to be used to compute nearest centroids.

        Returns
        -------
        np.ndarray
            An array containing, for each data row, the index of the nearest
            centroid.
        pd.DataFrame
            A DataFrame containing the nearest (de-normalized) centroids to
            the data.

        Raises
        ------
        MDAError
            If the data is empty, or (through ``_normalized_distance``) if
            the model has no fitting variables yet.
        """

        if normalized_data.empty:
            raise MDAError("Data cannot be empty.")
        # Extract the arrays once instead of on every iteration
        data_values = normalized_data.values
        centroid_values = self.normalized_centroids.values
        nearest_indices_array = np.zeros(data_values.shape[0], dtype=int)
        for i, row in enumerate(data_values):
            ndist = self._normalized_distance(
                array_to_compare=row[np.newaxis, :], all_rest_data=centroid_values
            )
            # NaN distances are ignored when picking the nearest centroid
            nearest_indices_array[i] = np.nanargmin(ndist)

        return nearest_indices_array, self.centroids.iloc[nearest_indices_array]

    @validate_data_mda
    def fit(
        self,
        data: pd.DataFrame,
        directional_variables: List[str] = [],
        custom_scale_factor: dict = {},
        first_centroid_seed: int = None,
    ) -> None:
        """
        Fit the Maximum Dissimilarity Algorithm (MDA) to the provided data.

        This method replaces the directional variables by their u/v
        components, normalizes the data, iteratively selects centroids based
        on maximum dissimilarity, and de-normalizes the selected centroids,
        which are stored in the ``centroids`` attribute.

        Parameters
        ----------
        data : pd.DataFrame
            The input data to be used for the MDA algorithm.
        directional_variables : List[str], optional
            A list of names of the directional variables within the data.
            Default is [].
        custom_scale_factor : dict, optional
            A dictionary specifying custom scale factors for normalization.
            Default is {}.
        first_centroid_seed : int, optional
            The positional index of the row to use as first centroid.
            Default is None.

        Raises
        ------
        ValueError
            If num_centers is greater than the number of rows in data.

        Notes
        -----
        - The function assumes that the data is validated by the
          `validate_data_mda` decorator before execution.
        - When first_centroid_seed is not provided, the row with the maximum
          summed normalized value is used.
        - The mutable defaults are never modified: both arguments are copied
          before use.
        - Calling fit again discards the results of any previous fit.
        """

        # Reset the state of any previous fit, so that results do not
        # accumulate and a failed refit does not leave the model "fitted".
        self.is_fitted = False
        self.centroid_iterative_indices = []

        self._data = data.copy()
        self.directional_variables = directional_variables.copy()
        for directional_variable in self.directional_variables:
            u_comp, v_comp = self.get_uv_components(
                x_deg=self.data[directional_variable].values
            )
            self.data[f"{directional_variable}_u"] = u_comp
            self.data[f"{directional_variable}_v"] = v_comp
        self.data_variables = list(self.data.columns)
        self.custom_scale_factor = custom_scale_factor.copy()

        # Get just the data to be used in the fitting
        self._data_to_fit = self.data.drop(columns=self.directional_variables)
        self.fitting_variables = list(self.data_to_fit.columns)

        # Normalize provided data with instantiated custom_scale_factor
        self._normalized_data, self.scale_factor = self.normalize(
            data=self.data_to_fit, custom_scale_factor=self.custom_scale_factor
        )
        normalized_values = self.normalized_data.values

        # Select the point with the maximum summed value
        if first_centroid_seed is not None:
            seed = first_centroid_seed
            self.logger.info(f"Using specified seed={seed} as first centroid.")
        else:
            seed = np.argmax(self.normalized_data.sum(axis=1).values)
            self.logger.info(
                f"Using max calculated value seed={seed} as first centroid."
            )

        # Initialize centroids subset with the seed row; the remaining rows
        # are the candidates for the following centroids
        subset = np.array([normalized_values[seed]])
        train = np.delete(normalized_values, seed, axis=0)

        # Each centroid is a distinct data row, so more centers than rows
        # cannot be selected (NumPy would otherwise fail on an empty array).
        n_rows = normalized_values.shape[0]
        if self.num_centers > n_rows:
            raise ValueError(
                f"num_centers ({self.num_centers}) cannot exceed the number "
                f"of data rows ({n_rows})."
            )

        # Repeat until we have the desired num_centers
        fmt = "0{0}d".format(len(str(self.num_centers)))
        d_last = None  # min squared distance of each candidate to the subset
        while subset.shape[0] < self.num_centers:
            # Only the distance to the last added centroid is new
            d_new = self._normalized_distance(
                array_to_compare=subset[-1:, :], all_rest_data=train
            )
            d_last = d_new if d_last is None else np.minimum(d_new, d_last)
            # Most dissimilar candidate, ignoring NaN distances
            # (np.nanargmax raises ValueError if every distance is NaN)
            bmu = int(np.nanargmax(d_last))
            self.centroid_iterative_indices.append(bmu)
            subset = np.append(subset, train[bmu : bmu + 1, :], axis=0)
            train = np.delete(train, bmu, axis=0)
            d_last = np.delete(d_last, bmu, axis=0)
            # Log
            self.logger.info(
                " MDA centroids: {1:{0}}/{2:{0}}".format(
                    fmt, subset.shape[0], self.num_centers
                )
            )

        # De-normalize scalar and directional data
        self.normalized_centroids = pd.DataFrame(subset, columns=self.fitting_variables)
        self.centroids = self.denormalize(
            normalized_data=self.normalized_centroids, scale_factor=self.scale_factor
        )
        for directional_variable in self.directional_variables:
            self.centroids[directional_variable] = self.get_degrees_from_uv(
                xu=self.centroids[f"{directional_variable}_u"].values,
                xv=self.centroids[f"{directional_variable}_v"].values,
            )

        # Calculate the real indices of the centroids
        self.centroid_real_indices, _ = self._nearest_indices_to_centroids(
            normalized_data=self.normalized_data
        )

        # Set the fitted flag to True
        self.is_fitted = True

    def predict(self, data: pd.DataFrame) -> Tuple[np.ndarray, pd.DataFrame]:
        """
        Predict the nearest centroid for the provided data.

        Parameters
        ----------
        data : pd.DataFrame
            The input data to be used for the prediction. It must contain the
            variables used in ``fit`` (directional variables included).

        Returns
        -------
        Tuple[np.ndarray, pd.DataFrame]
            A tuple containing the nearest centroid index for each data point
            and the nearest centroids.

        Raises
        ------
        MDAError
            If the model is not fitted, if data lacks any of the fitted
            variables, or if data is empty.
        """

        if not self.is_fitted:
            raise MDAError("MDA model is not fitted.")
        data = data.copy()  # Avoid modifying the original data to predict
        for directional_variable in self.directional_variables:
            u_comp, v_comp = self.get_uv_components(
                x_deg=data[directional_variable].values
            )
            data[f"{directional_variable}_u"] = u_comp
            data[f"{directional_variable}_v"] = v_comp
            data.drop(columns=[directional_variable], inplace=True)

        # Distances are computed positionally, so align the columns with the
        # ones (and the order) used in fit.
        missing_variables = [
            variable
            for variable in self.fitting_variables
            if variable not in data.columns
        ]
        if missing_variables:
            raise MDAError(
                f"Data to predict is missing fitted variables: {missing_variables}"
            )
        data = data[self.fitting_variables]

        normalized_data, _ = self.normalize(
            data=data, custom_scale_factor=self.scale_factor
        )

        return self._nearest_indices(normalized_data=normalized_data)

    def fit_predict(
        self,
        data: pd.DataFrame,
        directional_variables: List[str] = [],
        custom_scale_factor: dict = {},
        first_centroid_seed: int = None,
    ) -> Tuple[np.ndarray, pd.DataFrame]:
        """
        Fits the MDA model to the data and predicts the nearest centroids.

        Parameters
        ----------
        data : pd.DataFrame
            The input data to be used for the MDA algorithm.
        directional_variables : List[str], optional
            A list of names of the directional variables within the data.
            Default is [].
        custom_scale_factor : dict, optional
            A dictionary specifying custom scale factors for normalization.
            Default is {}.
        first_centroid_seed : int, optional
            The positional index of the row to use as first centroid.
            Default is None.

        Returns
        -------
        Tuple[np.ndarray, pd.DataFrame]
            A tuple containing the nearest centroid index for each data point
            and the nearest centroids.
        """

        self.fit(
            data=data,
            directional_variables=directional_variables,
            custom_scale_factor=custom_scale_factor,
            first_centroid_seed=first_centroid_seed,
        )

        return self.predict(data=data)