Source code for bluemath_tk.datamining.kma

from typing import List, Tuple

import numpy as np
import pandas as pd
from sklearn.cluster import KMeans

from ..core.decorators import validate_data_kma
from ._base_datamining import BaseClustering


class KMAError(Exception):
    """
    Exception raised for errors specific to the KMA class.

    Attributes
    ----------
    message : str
        Human-readable description of the error. It is also forwarded to
        ``Exception`` so that ``str(error)`` returns the same text.
    """

    def __init__(self, message: str = "KMA error occurred."):
        super().__init__(message)
        self.message = message
class KMA(BaseClustering):
    """
    K-Means (KMA) class.

    This class performs the K-Means algorithm on a given dataframe.

    Attributes
    ----------
    num_clusters : int
        The number of clusters to use in the K-Means algorithm.
    seed : int
        The random seed passed to KMeans as ``random_state``
        (None when no seed was given).
    data_variables : List[str]
        A list with all data variables (including the ``_u`` / ``_v``
        components generated for the directional variables).
    directional_variables : List[str]
        A list with directional variables.
    fitting_variables : List[str]
        A list with fitting variables (data variables without the original
        directional variables).
    custom_scale_factor : dict
        A dictionary of custom scale factors.
    scale_factor : dict
        A dictionary of scale factors (after normalizing the data).
    centroids : pd.DataFrame
        The selected centroids.
    normalized_centroids : pd.DataFrame
        The selected normalized centroids.
    centroid_real_indices : np.array
        A copy of the ``labels_`` attribute of the fitted KMeans estimator.
        NOTE(review): despite the name, these are the cluster labels of the
        fitted data points, not indices of centroids -- confirm intent.
    is_fitted : bool
        Whether ``fit`` has completed successfully.

    Notes
    -----
    - The K-Means algorithm is used to cluster data points into k clusters.
    - The K-Means algorithm is sensitive to the initial centroids.
    - The K-Means algorithm is not suitable for large datasets.

    Examples
    --------
    .. jupyter-execute::

        import numpy as np
        import pandas as pd
        from bluemath_tk.datamining.kma import KMA

        data = pd.DataFrame(
            {
                'Hs': np.random.rand(1000) * 7,
                'Tp': np.random.rand(1000) * 20,
                'Dir': np.random.rand(1000) * 360
            }
        )
        kma = KMA(num_clusters=5)
        nearest_centroids_idxs, nearest_centroids_df = kma.fit_predict(
            data=data,
            directional_variables=['Dir'],
        )

        kma.plot_selected_centroids(plot_text=True)
    """

    def __init__(
        self,
        num_clusters: int,
        seed: int = None,
    ) -> None:
        """
        Initializes the KMA class.

        Parameters
        ----------
        num_clusters : int
            The number of clusters to use in the K-Means algorithm.
            Must be greater than 0.
        seed : int, optional
            The random seed passed to KMeans as ``random_state``.
            Must be greater or equal to 0. Default is None (no fixed seed).

        Raises
        ------
        ValueError
            If num_clusters is not greater than 0.
            Or if seed is given and is not greater or equal to 0.
        """

        super().__init__()
        self.set_logger_name(name=self.__class__.__name__, console=False)

        if not num_clusters > 0:
            raise ValueError("Variable num_clusters must be > 0")
        self.num_clusters = int(num_clusters)

        if seed is not None and not seed >= 0:
            raise ValueError("Variable seed must be >= 0")
        self.seed = None if seed is None else int(seed)

        self._kma = KMeans(
            n_clusters=self.num_clusters,
            random_state=self.seed,
        )
        self.logger.info(
            f"KMA object created with {self.num_clusters} clusters and seed {self.seed}. "
            "To customize kma, do self.kma = dict(n_clusters=..., random_state=..., etc)"
        )

        # Data holders, exposed read-only through the properties below
        self._data: pd.DataFrame = pd.DataFrame()
        self._normalized_data: pd.DataFrame = pd.DataFrame()
        self._data_to_fit: pd.DataFrame = pd.DataFrame()

        # Results and bookkeeping filled in by fit()
        self.data_variables: List[str] = []
        self.directional_variables: List[str] = []
        self.fitting_variables: List[str] = []
        self.custom_scale_factor: dict = {}
        self.scale_factor: dict = {}
        self.centroids: pd.DataFrame = pd.DataFrame()
        self.normalized_centroids: pd.DataFrame = pd.DataFrame()
        self.centroid_real_indices: np.array = np.array([])
        self.is_fitted: bool = False

        # NOTE(review): presumably consumed by the base class (e.g. to skip
        # the raw data when persisting the model) -- confirm in BaseClustering.
        self._exclude_attributes = [
            "_data",
            "_normalized_data",
            "_data_to_fit",
        ]

    @property
    def kma(self) -> KMeans:
        """
        Returns the underlying scikit-learn KMeans estimator.
        """

        return self._kma

    @kma.setter
    def kma(self, kwargs) -> None:
        """
        Replaces the underlying estimator with ``KMeans(**kwargs)``.

        Only the estimator is replaced: ``num_clusters`` and ``seed`` are
        not updated from ``kwargs``.
        """

        self._kma = KMeans(**kwargs)

    @property
    def data(self) -> pd.DataFrame:
        """
        Returns the original data used for clustering.
        """

        return self._data

    @property
    def normalized_data(self) -> pd.DataFrame:
        """
        Returns the normalized data used for clustering.
        """

        return self._normalized_data

    @property
    def data_to_fit(self) -> pd.DataFrame:
        """
        Returns the data used for fitting the K-Means algorithm.
        """

        return self._data_to_fit

    @validate_data_kma
    def fit(
        self,
        data: pd.DataFrame,
        directional_variables: List[str] = [],
        custom_scale_factor: dict = {},
        min_number_of_points: int = None,
        max_number_of_iterations: int = 10,
        normalize_data: bool = True,
    ) -> None:
        """
        Fit the K-Means algorithm to the provided data.

        The directional variables are converted to u and v components, the
        data is normalized, K-Means is fitted and the resulting centroids are
        stored (normalized and denormalized) in the instance. Nothing is
        returned.

        TODO: Implement KMA regression guided with variable.

        Parameters
        ----------
        data : pd.DataFrame
            The input data to be used for the KMA algorithm.
        directional_variables : List[str], optional
            A list of directional variables (will be transformed to u and v).
            Default is [].
        custom_scale_factor : dict, optional
            A dictionary specifying custom scale factors for normalization.
            Ignored when normalize_data is False. Default is {}.
        min_number_of_points : int, optional
            The minimum number of points to consider a cluster.
            Default is None.
        max_number_of_iterations : int, optional
            The maximum number of K-Means fits attempted while looking for a
            configuration where every cluster has at least
            min_number_of_points points.
            This is used when min_number_of_points is not None.
            Default is 10.
        normalize_data : bool, optional
            A flag to normalize the data. Default is True.

        Raises
        ------
        ValueError
            If min_number_of_points is given and no configuration satisfying
            it is found within max_number_of_iterations attempts.
        """

        # The list / dict defaults are never mutated: they are copied before use.
        self._data = data.copy()
        self.directional_variables = directional_variables.copy()
        for directional_variable in self.directional_variables:
            u_comp, v_comp = self.get_uv_components(
                x_deg=self.data[directional_variable].values
            )
            self._data[f"{directional_variable}_u"] = u_comp
            self._data[f"{directional_variable}_v"] = v_comp
        self.data_variables = list(self.data.columns)

        # Get just the data to be used in the fitting: the original
        # directional columns are replaced by their u / v components.
        self._data_to_fit = self.data.drop(columns=self.directional_variables)
        self.fitting_variables = list(self.data_to_fit.columns)

        if normalize_data:
            self.custom_scale_factor = custom_scale_factor.copy()
        else:
            # A (0, 1) factor per variable is used to request no rescaling
            self.custom_scale_factor = {
                fitting_variable: (0, 1) for fitting_variable in self.fitting_variables
            }

        # Normalize data using custom min max scaler
        self._normalized_data, self.scale_factor = self.normalize(
            data=self.data_to_fit, custom_scale_factor=self.custom_scale_factor
        )

        # Fit K-Means algorithm
        if min_number_of_points is not None:
            # Refit until every cluster holds at least min_number_of_points
            # points. No random_state is passed to these estimators, so
            # self.seed (and any estimator set through the kma setter) is not
            # applied to the attempts.
            number_of_tries = 0
            while number_of_tries < max_number_of_iterations:
                number_of_tries += 1
                kma_child = KMeans(n_clusters=self.num_clusters)
                predicted_labels = kma_child.fit_predict(self.normalized_data)
                _unique_labels, counts = np.unique(predicted_labels, return_counts=True)
                if np.all(counts >= min_number_of_points):
                    break
            else:
                # Reached only when the loop ends without finding a stable fit
                raise ValueError(
                    f"Failed to find a stable K-Means configuration after {max_number_of_iterations} attempts. "
                    "Change max_number_of_iterations or min_number_of_points."
                )
            self.logger.info(
                f"Found a stable K-Means configuration after {number_of_tries} attempts."
            )
            self._kma = kma_child
        else:
            self._kma = self.kma.fit(self.normalized_data)

        # Calculate the centroids
        self.centroid_real_indices = self.kma.labels_.copy()
        self.normalized_centroids = pd.DataFrame(
            self.kma.cluster_centers_, columns=self.fitting_variables
        )
        self.centroids = self.denormalize(
            normalized_data=self.normalized_centroids, scale_factor=self.scale_factor
        )
        # Recover the directional variables (in degrees) from the u / v centroids
        for directional_variable in self.directional_variables:
            self.centroids[directional_variable] = self.get_degrees_from_uv(
                xu=self.centroids[f"{directional_variable}_u"].values,
                xv=self.centroids[f"{directional_variable}_v"].values,
            )

        # Set the fitted flag to True
        self.is_fitted = True

    def predict(self, data: pd.DataFrame) -> Tuple[pd.DataFrame, pd.DataFrame]:
        """
        Predict the nearest centroid for the provided data.

        Parameters
        ----------
        data : pd.DataFrame
            The input data to be used for the prediction. It must contain the
            directional variables used when fitting.

        Returns
        -------
        Tuple[pd.DataFrame, pd.DataFrame]
            A tuple containing the nearest centroid index for each data point
            (column ``kma_bmus``, indexed like ``data``), and the nearest
            centroids (one row of ``centroids`` per data point).

        Raises
        ------
        KMAError
            If the model has not been fitted.
        """

        if not self.is_fitted:
            raise KMAError("KMA model is not fitted.")

        data = data.copy()  # Avoid modifying the original data to predict
        for directional_variable in self.directional_variables:
            u_comp, v_comp = self.get_uv_components(
                x_deg=data[directional_variable].values
            )
            data[f"{directional_variable}_u"] = u_comp
            data[f"{directional_variable}_v"] = v_comp
        # Same column layout as in fit: u / v components replace the originals
        data = data.drop(columns=self.directional_variables)

        # Reuse the scale factors obtained when fitting
        normalized_data, _ = self.normalize(
            data=data, custom_scale_factor=self.scale_factor
        )
        y = self.kma.predict(X=normalized_data)

        return (
            pd.DataFrame(y, columns=["kma_bmus"], index=data.index),
            self.centroids.iloc[y],
        )

    def fit_predict(
        self,
        data: pd.DataFrame,
        directional_variables: List[str] = [],
        custom_scale_factor: dict = {},
        min_number_of_points: int = None,
        max_number_of_iterations: int = 10,
        normalize_data: bool = True,
    ) -> Tuple[pd.DataFrame, pd.DataFrame]:
        """
        Fit the K-Means algorithm to the provided data and predict the nearest
        centroid for each data point.

        Parameters
        ----------
        data : pd.DataFrame
            The input data to be used for the KMA algorithm.
        directional_variables : List[str], optional
            A list of directional variables (will be transformed to u and v).
            Default is [].
        custom_scale_factor : dict, optional
            A dictionary specifying custom scale factors for normalization.
            Default is {}.
        min_number_of_points : int, optional
            The minimum number of points to consider a cluster.
            Default is None.
        max_number_of_iterations : int, optional
            The maximum number of K-Means fits attempted.
            This is used when min_number_of_points is not None.
            Default is 10.
        normalize_data : bool, optional
            A flag to normalize the data. Default is True.

        Returns
        -------
        Tuple[pd.DataFrame, pd.DataFrame]
            A tuple containing the nearest centroid index for each data point,
            and the nearest centroids.
        """

        self.fit(
            data=data,
            directional_variables=directional_variables,
            custom_scale_factor=custom_scale_factor,
            min_number_of_points=min_number_of_points,
            max_number_of_iterations=max_number_of_iterations,
            normalize_data=normalize_data,
        )

        return self.predict(data=data)