Source code for bluemath_tk.datamining.som

from typing import List, Tuple

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from minisom import MiniSom

from ..core.decorators import validate_data_som
from ..core.plotting.base_plotting import DefaultStaticPlotting
from ._base_datamining import BaseClustering


class SOMError(Exception):
    """
    Exception raised for errors specific to the SOM class.

    Parameters
    ----------
    message : str, optional
        Human readable description of the error. It is stored in the
        ``message`` attribute and forwarded to ``Exception``.
        Default is "SOM error occurred.".
    """

    def __init__(self, message: str = "SOM error occurred."):
        super().__init__(message)
        self.message = message
class SOM(BaseClustering):
    """
    Self-Organizing Maps (SOM) class.

    This class performs the Self-Organizing Map algorithm on a given dataframe.

    Attributes
    ----------
    som_shape : Tuple[int, int]
        The shape of the SOM.
    num_dimensions : int
        The number of dimensions of the input data.
    data_variables : List[str]
        A list with all data variables.
    directional_variables : List[str]
        A list with directional variables.
    fitting_variables : List[str]
        A list with fitting variables.
    custom_scale_factor : dict
        A dictionary of custom scale factors.
    scale_factor : dict
        A dictionary of scale factors (after normalizing the data).
    centroids : pd.DataFrame
        The selected centroids.
    normalized_centroids : pd.DataFrame
        The selected normalized centroids.
    is_fitted : bool
        A flag to check if the SOM model is fitted.

    Notes
    -----
    - Check MiniSom documentation for more information:
      https://github.com/JustGlowing/minisom

    Examples
    --------
    .. jupyter-execute::

        import numpy as np
        import pandas as pd
        from bluemath_tk.datamining.som import SOM

        data = pd.DataFrame(
            {
                "Hs": np.random.rand(1000) * 7,
                "Tp": np.random.rand(1000) * 20,
                "Dir": np.random.rand(1000) * 360
            }
        )
        som = SOM(som_shape=(3, 3), num_dimensions=4)
        nearest_centroids_idxs, nearest_centroids_df = som.fit_predict(
            data=data,
            directional_variables=["Dir"],
        )

        som.plot_selected_centroids(plot_text=True)
    """

    def __init__(
        self,
        som_shape: Tuple[int, int],
        num_dimensions: int,
        sigma: float = 1,
        learning_rate: float = 0.5,
        decay_function: str = "asymptotic_decay",
        neighborhood_function: str = "gaussian",
        topology: str = "rectangular",
        activation_distance: str = "euclidean",
        random_seed: int = None,
        sigma_decay_function: str = "asymptotic_decay",
    ) -> None:
        """
        Initializes a Self Organizing Maps.

        A rule of thumb to set the size of the grid for a dimensionality
        reduction task is that it should contain 5*sqrt(N) neurons
        where N is the number of samples in the dataset to analyze.

        E.g. if your dataset has 150 samples, 5*sqrt(150) = 61.23
        hence a map 8-by-8 should perform well.

        Parameters
        ----------
        som_shape : tuple
            Shape of the SOM. This should be a tuple with two integers.
        num_dimensions : int
            Number of the elements of the vectors in input.

        For the other parameters, check the MiniSom documentation:
        https://github.com/JustGlowing/minisom/blob/master/minisom.py

        Raises
        ------
        ValueError
            If the SOM shape is not a tuple with two integers.
            Or if the number of dimensions is not an integer.
        """

        super().__init__()
        self.set_logger_name(name=self.__class__.__name__)

        # The three conditions are checked together so that a tuple of the
        # wrong length (or with non-integer entries) is rejected as well,
        # instead of only rejecting non-tuples of the wrong length.
        if (
            not isinstance(som_shape, tuple)
            or len(som_shape) != 2
            or not all(isinstance(size, (int, np.integer)) for size in som_shape)
        ):
            raise ValueError("Invalid SOM shape.")
        self.som_shape = som_shape
        if not isinstance(num_dimensions, int):
            raise ValueError("Invalid number of dimensions.")
        self.num_dimensions = num_dimensions

        # Keep every MiniSom hyper-parameter on the instance
        self.x = self.som_shape[0]
        self.y = self.som_shape[1]
        self.sigma = sigma
        self.learning_rate = learning_rate
        self.decay_function = decay_function
        self.neighborhood_function = neighborhood_function
        self.topology = topology
        self.activation_distance = activation_distance
        self.random_seed = random_seed
        self.sigma_decay_function = sigma_decay_function
        self._som = MiniSom(
            x=self.x,
            y=self.y,
            input_len=self.num_dimensions,
            sigma=self.sigma,
            learning_rate=self.learning_rate,
            decay_function=self.decay_function,
            neighborhood_function=self.neighborhood_function,
            topology=self.topology,
            activation_distance=self.activation_distance,
            random_seed=self.random_seed,
            sigma_decay_function=self.sigma_decay_function,
        )

        # Placeholders that are filled in when the model is fitted
        self._data: pd.DataFrame = pd.DataFrame()
        self._normalized_data: pd.DataFrame = pd.DataFrame()
        self._data_to_fit: pd.DataFrame = pd.DataFrame()
        self.data_variables: List[str] = []
        self.directional_variables: List[str] = []
        self.fitting_variables: List[str] = []
        self.custom_scale_factor: dict = {}
        self.scale_factor: dict = {}
        self.centroids: pd.DataFrame = pd.DataFrame()
        self.normalized_centroids: pd.DataFrame = pd.DataFrame()
        self.is_fitted: bool = False

    @property
    def som(self) -> MiniSom:
        """
        Returns the underlying MiniSom object.
        """

        return self._som

    @som.setter
    def som(self, som_params_dict: dict) -> None:
        """
        Setter for the SOM object.

        Parameters
        ----------
        som_params_dict : dict
            A dictionary with the parameters to set the SOM object.
            The keys should be the same as the parameters of the MiniSom class.
            Example: {"sigma": 1, "learning_rate": 0.5}
        """

        # NOTE: only the MiniSom object is replaced; attributes such as
        # som_shape or num_dimensions are not updated by this setter.
        self._som = MiniSom(**som_params_dict)

    @property
    def data(self) -> pd.DataFrame:
        """
        Returns the original data used for clustering.
        """

        return self._data

    @property
    def normalized_data(self) -> pd.DataFrame:
        """
        Returns the normalized data used for clustering.
        """

        return self._normalized_data

    @property
    def data_to_fit(self) -> pd.DataFrame:
        """
        Returns the data used for fitting the SOM algorithm.
        """

        return self._data_to_fit

    @property
    def distance_map(self) -> np.ndarray:
        """
        Returns the distance map of the SOM (transposed MiniSom distance map).
        """

        return self.som.distance_map().T

    def _get_winner_neurons(self, normalized_data: np.ndarray) -> np.ndarray:
        """
        Returns the winner neurons of the given normalized data.

        Parameters
        ----------
        normalized_data : np.ndarray
            Array whose rows are the samples passed one by one to
            ``MiniSom.winner``.

        Returns
        -------
        np.ndarray
            Flat index of the winner neuron of each sample, obtained by
            ravelling the winner coordinates over ``som_shape``.
        """

        winner_neurons = np.array([self.som.winner(x) for x in normalized_data]).T

        return np.ravel_multi_index(winner_neurons, self.som_shape)

    def activation_response(self, data: pd.DataFrame = None) -> np.ndarray:
        """
        Returns the activation response of the given data.

        Parameters
        ----------
        data : pd.DataFrame, optional
            Data to compute the activation response for. If None, the
            normalized data stored in the model is used. Default is None.

        Returns
        -------
        np.ndarray
            The result of ``MiniSom.activation_response`` for the data.
        """

        if data is None:
            data = self.normalized_data.copy()
        else:
            # NOTE(review): ``self.scaler`` and the ``scaler`` keyword are not
            # defined in this class (other methods call ``normalize`` with
            # ``custom_scale_factor=self.scale_factor``); presumably they come
            # from the base class - TODO confirm.
            data, _ = self.normalize(data=data, scaler=self.scaler)

        return self.som.activation_response(data=data.values)

    def get_centroids_probs_for_labels(
        self, data: pd.DataFrame, labels: List[str]
    ) -> pd.DataFrame:
        """
        Returns the labels map of the given data.

        Parameters
        ----------
        data : pd.DataFrame
            The data to map onto the SOM. Directional variables are replaced
            by their u and v components before normalizing.
        labels : List[str]
            The labels passed to ``MiniSom.labels_map`` together with the data.

        Returns
        -------
        pd.DataFrame
            The labels map returned by MiniSom, transposed into a dataframe
            and sorted by index.
        """

        # TODO: JAVI: Could this method be implemented in more datamining classes?
        data = data.copy()  # Avoid modifying the original data to predict
        for directional_variable in self.directional_variables:
            u_comp, v_comp = self.get_uv_components(
                x_deg=data[directional_variable].values
            )
            data[f"{directional_variable}_u"] = u_comp
            data[f"{directional_variable}_v"] = v_comp
            data.drop(columns=[directional_variable], inplace=True)
        normalized_data, _ = self.normalize(
            data=data, custom_scale_factor=self.scale_factor
        )
        dict_with_probs = self.som.labels_map(normalized_data.values, labels)

        return pd.DataFrame(dict_with_probs).T.sort_index()

    def plot_centroids_probs_for_labels(
        self, probs_data: pd.DataFrame
    ) -> Tuple[plt.Figure, plt.Axes]:
        """
        Plots the labels map of the given data.

        Parameters
        ----------
        probs_data : pd.DataFrame
            Dataframe whose index entries are used to select the subplot of
            each pie chart and whose columns are used as the pie labels.

        Returns
        -------
        Tuple[plt.Figure, plt.Axes]
            The figure and the axes grid created by ``get_subplots``.
        """

        default_static_plot = DefaultStaticPlotting()
        fig, axes = default_static_plot.get_subplots(
            nrows=self.som_shape[0],
            ncols=self.som_shape[1],
        )
        for index in probs_data.index:
            # ``axes[tuple(index)]`` is equivalent to ``axes[*index]`` but also
            # parses on Python versions older than 3.11.
            default_static_plot.plot_pie(
                ax=axes[tuple(index)],
                x=probs_data.loc[index],
                labels=probs_data.columns,
            )

        return fig, axes

    @validate_data_som
    def fit(
        self,
        data: pd.DataFrame,
        directional_variables: List[str] = [],
        custom_scale_factor: dict = {},
        num_iteration: int = 1000,
        normalize_data: bool = False,
    ) -> None:
        """
        Fits the SOM model to the provided data.

        Parameters
        ----------
        data : pd.DataFrame
            The input data to be used for the SOM algorithm.
        directional_variables : List[str], optional
            A list of directional variables that will be transformed to u and v components.
            Then, to use custom_scale_factor, you must specify the variables names with the u and v suffixes.
            Example: directional_variables=["Dir"], custom_scale_factor={"Dir_u": [0, 1], "Dir_v": [0, 1]}.
            Default is [].
        custom_scale_factor : dict, optional
            A dictionary specifying custom scale factors for normalization.
            If normalize_data is True, this will be used to normalize the data.
            Example: {"Hs": [0, 10], "Tp": [0, 10]}.
            Default is {}.
        num_iteration : int, optional
            The number of iterations for the SOM fitting.
            Default is 1000.
        normalize_data : bool, optional
            A flag to normalize the data.
            If True, the data will be normalized using the custom_scale_factor.
            Default is False.
        """

        super().fit(
            data=data,
            directional_variables=directional_variables,
            custom_scale_factor=custom_scale_factor,
            normalize_data=normalize_data,
        )

        # Train the SOM model
        self.som.train(data=self.normalized_data.values, num_iteration=num_iteration)

        # Save winner neurons and calculate centroids values
        # NOTE(review): the means are taken over ``self.data`` while the
        # result is stored as ``normalized_centroids`` and then denormalized;
        # confirm this is intended when normalize_data is True.
        data_and_winners = self.data.copy()
        data_and_winners["winner_neurons"] = self._get_winner_neurons(
            normalized_data=self.normalized_data.values
        )
        # The resulting index holds the ids of the neurons that won at least
        # one sample.
        self.normalized_centroids = (
            data_and_winners.groupby("winner_neurons")
            .mean()
            .drop(columns=self.directional_variables)
        )
        self.centroids = self.denormalize(
            normalized_data=self.normalized_centroids, scale_factor=self.scale_factor
        )
        # Rebuild every directional variable from its u and v components
        for directional_variable in self.directional_variables:
            self.centroids[directional_variable] = self.get_degrees_from_uv(
                xu=self.centroids[f"{directional_variable}_u"].values,
                xv=self.centroids[f"{directional_variable}_v"].values,
            )

        # Set the fitted flag to True
        self.is_fitted = True

    def predict(self, data: pd.DataFrame) -> Tuple[np.ndarray, pd.DataFrame]:
        """
        Predicts the nearest centroid for the provided data.

        Parameters
        ----------
        data : pd.DataFrame
            The input data to be used for the prediction.

        Returns
        -------
        Tuple[np.ndarray, pd.DataFrame]
            A tuple with the winner neurons and the centroids of the given data.
            Rows are NaN for winner neurons that have no centroid.

        Raises
        ------
        SOMError
            If the SOM model is not fitted.
        """

        if not self.is_fitted:
            raise SOMError("SOM model is not fitted.")
        normalized_data = super().predict(data=data)
        winner_neurons = self._get_winner_neurons(
            normalized_data=normalized_data.values
        )

        # Centroids are looked up by label, not by position: fit() builds them
        # with groupby("winner_neurons"), so neurons without samples are
        # missing from the index and positions no longer match neuron ids.
        # NOTE(review): assumes denormalize() keeps that index - TODO confirm.
        return winner_neurons, self.centroids.reindex(winner_neurons)

    def fit_predict(
        self,
        data: pd.DataFrame,
        directional_variables: List[str] = [],
        custom_scale_factor: dict = {},
        num_iteration: int = 1000,
        normalize_data: bool = False,
    ) -> Tuple[np.ndarray, pd.DataFrame]:
        """
        Fit the SOM algorithm to the provided data and predict the nearest centroid for each data point.

        Parameters
        ----------
        data : pd.DataFrame
            The input data to be used for the SOM algorithm.
        directional_variables : List[str], optional
            A list of directional variables that will be transformed to u and v components.
            Then, to use custom_scale_factor, you must specify the variables names with the u and v suffixes.
            Example: directional_variables=["Dir"], custom_scale_factor={"Dir_u": [0, 1], "Dir_v": [0, 1]}.
            Default is [].
        custom_scale_factor : dict, optional
            A dictionary specifying custom scale factors for normalization.
            If normalize_data is True, this will be used to normalize the data.
            Example: {"Hs": [0, 10], "Tp": [0, 10]}.
            Default is {}.
        num_iteration : int, optional
            The number of iterations for the SOM fitting.
            Default is 1000.
        normalize_data : bool, optional
            A flag to normalize the data.
            If True, the data will be normalized using the custom_scale_factor.
            Default is False.

        Returns
        -------
        Tuple[np.ndarray, pd.DataFrame]
            A tuple containing the winner neurons for each data point and the nearest centroids.
        """

        self.fit(
            data=data,
            directional_variables=directional_variables,
            custom_scale_factor=custom_scale_factor,
            num_iteration=num_iteration,
            normalize_data=normalize_data,
        )

        return self.predict(data=data)