from typing import List, Tuple
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from minisom import MiniSom
from sklearn.preprocessing import StandardScaler
from ..core.decorators import validate_data_som
from ..core.plotting.base_plotting import DefaultStaticPlotting
from ._base_datamining import BaseClustering
[docs]
class SOMError(Exception):
    """
    Exception raised for errors specific to the SOM class.

    Parameters
    ----------
    message : str, optional
        Description of the error. It is passed to ``Exception`` and also
        kept on the instance as ``message``.
        Default is "SOM error occurred.".
    """

    def __init__(self, message: str = "SOM error occurred."):
        super().__init__(message)
        self.message = message
[docs]
class SOM(BaseClustering):
    """
    Self-Organizing Map (SOM) class.

    This class performs the Self-Organizing Map algorithm on a given dataframe,
    using a MiniSom instance that is created in the constructor.

    Attributes
    ----------
    som_shape : Tuple[int, int]
        The shape of the SOM.
    num_dimensions : int
        The number of dimensions of the input data, as seen by MiniSom.
        Each directional variable is replaced by its ``_u`` and ``_v``
        components before fitting, so it counts as two dimensions.
    data : pd.DataFrame
        The input data.
    standarized_data : pd.DataFrame
        The standardized input data.
    data_to_fit : pd.DataFrame
        The data to fit the SOM algorithm.
    data_variables : List[str]
        A list with all data variables.
    directional_variables : List[str]
        A list with directional variables.
    fitting_variables : List[str]
        A list with fitting variables.
    scaler : StandardScaler
        The StandardScaler object.
    centroids : pd.DataFrame
        The selected centroids.
    is_fitted : bool
        A flag to check if the SOM model is fitted.

    Methods
    -------
    activation_response(data)
        Returns the activation response of the given data.
    get_centroids_probs_for_labels(data, labels)
        Returns the labels map of the given data.
    plot_centroids_probs_for_labels(probs_data)
        Plots the labels map of the given data.
    fit(data, directional_variables, num_iteration)
        Fits the SOM model to the provided data.
    predict(data)
        Predicts the nearest centroid for the provided data.
    fit_predict(data, directional_variables, num_iteration)
        Fits the SOM algorithm to the provided data and predicts the nearest
        centroid for each data point.

    Notes
    -----
    - Check MiniSom documentation for more information:
      https://github.com/JustGlowing/minisom

    Examples
    --------
    >>> import numpy as np
    >>> import pandas as pd
    >>> from bluemath_tk.datamining.som import SOM
    >>> data = pd.DataFrame(
    ...     {
    ...         'Hs': np.random.rand(1000) * 7,
    ...         'Tp': np.random.rand(1000) * 20,
    ...         'Dir': np.random.rand(1000) * 360
    ...     }
    ... )
    >>> som = SOM(som_shape=(3, 3), num_dimensions=4)
    >>> nearest_centroids_idxs, nearest_centroids_df = som.fit_predict(
    ...     data=data,
    ...     directional_variables=['Dir'],
    ... )

    TODO
    -----
    - Add option to normalize data?
    """
def __init__(
    self,
    som_shape: Tuple[int, int],
    num_dimensions: int,
    sigma: float = 1,
    learning_rate: float = 0.5,
    decay_function: str = "asymptotic_decay",
    neighborhood_function: str = "gaussian",
    topology: str = "rectangular",
    activation_distance: str = "euclidean",
    random_seed: int = None,
    sigma_decay_function: str = "asymptotic_decay",
) -> None:
    """
    Initializes a Self-Organizing Map.

    A rule of thumb to set the size of the grid for a dimensionality
    reduction task is that it should contain 5*sqrt(N) neurons
    where N is the number of samples in the dataset to analyze.
    E.g. if your dataset has 150 samples, 5*sqrt(150) = 61.23
    hence a map 8-by-8 should perform well.

    Parameters
    ----------
    som_shape : tuple
        Shape of the SOM. This must be a tuple with two entries, used as
        the ``x`` and ``y`` sizes of the MiniSom grid.
    num_dimensions : int
        Number of the elements of the vectors in input
        (passed to MiniSom as ``input_len``).
    sigma, learning_rate, decay_function, neighborhood_function, topology,
    activation_distance, random_seed, sigma_decay_function
        Stored on the instance and forwarded unchanged to MiniSom.
        For their meaning, check the MiniSom documentation:
        https://github.com/JustGlowing/minisom/blob/master/minisom.py

    Raises
    ------
    ValueError
        If the SOM shape is not a tuple of length two.
        Or if the number of dimensions is not an integer.
    """

    super().__init__()
    self.set_logger_name(name=self.__class__.__name__)

    # Both conditions must be checked independently: a tuple of the wrong
    # length and a non-tuple are each invalid. Testing the type first also
    # keeps len() from being called on unsized values.
    if not isinstance(som_shape, tuple) or len(som_shape) != 2:
        raise ValueError("Invalid SOM shape.")
    self.som_shape = som_shape
    if not isinstance(num_dimensions, int):
        raise ValueError("Invalid number of dimensions.")
    self.num_dimensions = num_dimensions
    self.x = self.som_shape[0]
    self.y = self.som_shape[1]

    # Keep every MiniSom hyper-parameter on the instance
    self.sigma = sigma
    self.learning_rate = learning_rate
    self.decay_function = decay_function
    self.neighborhood_function = neighborhood_function
    self.topology = topology
    self.activation_distance = activation_distance
    self.random_seed = random_seed
    self.sigma_decay_function = sigma_decay_function
    self._som = MiniSom(
        x=self.x,
        y=self.y,
        input_len=self.num_dimensions,
        sigma=self.sigma,
        learning_rate=self.learning_rate,
        decay_function=self.decay_function,
        neighborhood_function=self.neighborhood_function,
        topology=self.topology,
        activation_distance=self.activation_distance,
        random_seed=self.random_seed,
        sigma_decay_function=self.sigma_decay_function,
    )

    # State that is filled in by fit()
    self._data: pd.DataFrame = pd.DataFrame()
    self._standarized_data: pd.DataFrame = pd.DataFrame()
    self._data_to_fit: pd.DataFrame = pd.DataFrame()
    self.data_variables: List[str] = []
    self.directional_variables: List[str] = []
    self.fitting_variables: List[str] = []
    self.scaler: StandardScaler = StandardScaler()
    self.centroids: pd.DataFrame = pd.DataFrame()
    self.is_fitted: bool = False
@property
def som(self) -> MiniSom:
    """
    Returns the underlying MiniSom instance held in ``_som``.
    """

    return self._som
@property
def data(self) -> pd.DataFrame:
    """
    Returns the dataframe held in ``_data`` (read-only accessor).
    """

    return self._data
@property
def standarized_data(self) -> pd.DataFrame:
    """
    Returns the dataframe held in ``_standarized_data`` (read-only accessor).
    """

    return self._standarized_data
@property
def data_to_fit(self) -> pd.DataFrame:
    """
    Returns the dataframe held in ``_data_to_fit`` (read-only accessor).
    """

    return self._data_to_fit
@property
def distance_map(self) -> np.ndarray:
    """
    Returns the distance map of the SOM.

    The value is the result of MiniSom's ``distance_map()``, transposed.
    """

    raw_distance_map = self.som.distance_map()
    return raw_distance_map.T
def _get_winner_neurons(self, standarized_data: np.ndarray) -> np.ndarray:
    """
    Returns the winner neurons of the given standarized data.

    Parameters
    ----------
    standarized_data : np.ndarray
        Samples to look up; it is iterated one sample at a time and each
        sample is handed to the MiniSom ``winner`` method.

    Returns
    -------
    np.ndarray
        One flat neuron index per sample, obtained by ravelling the grid
        coordinates returned by ``winner`` against ``som_shape``.
    """

    # One row of grid coordinates per sample
    grid_coordinates = np.array(list(map(self.som.winner, standarized_data)))
    # ravel_multi_index expects one sequence per grid axis, hence the transpose
    return np.ravel_multi_index(grid_coordinates.T, self.som_shape)
[docs]
def activation_response(self, data: pd.DataFrame = None) -> np.ndarray:
    """
    Returns the activation response of the given data.

    Parameters
    ----------
    data : pd.DataFrame, optional
        Data to evaluate. If None, the standarized data stored by the last
        fit is used. Otherwise the data is standarized with the stored
        scaler before being passed to MiniSom. Any directional variable
        that is present as a column is first replaced by its ``_u`` and
        ``_v`` components, as done in ``predict``, so data in the same
        layout as the one given to ``fit`` can be used directly.
        Data that already holds the ``_u`` / ``_v`` columns is used as is.
        Default is None.

    Returns
    -------
    np.ndarray
        The value returned by MiniSom's ``activation_response``.
    """

    if data is None:
        standarized = self.standarized_data.copy()
    else:
        prepared = data
        present_directionals = [
            variable
            for variable in self.directional_variables
            if variable in data.columns
        ]
        if present_directionals:
            # Copy only when columns are rewritten, so the caller's data is untouched
            prepared = data.copy()
            for variable in present_directionals:
                u_comp, v_comp = self.get_uv_components(
                    x_deg=prepared[variable].values
                )
                prepared[f"{variable}_u"] = u_comp
                prepared[f"{variable}_v"] = v_comp
                prepared = prepared.drop(columns=[variable])
        standarized, _ = self.standarize(data=prepared, scaler=self.scaler)

    return self.som.activation_response(data=standarized.values)
[docs]
def get_centroids_probs_for_labels(
    self, data: pd.DataFrame, labels: List[str]
) -> pd.DataFrame:
    """
    Returns the labels map of the given data.

    Parameters
    ----------
    data : pd.DataFrame
        Data to map onto the SOM. Directional variables are replaced by
        their ``_u`` and ``_v`` components and the result is standarized
        with the stored scaler. The given dataframe is not modified.
    labels : List[str]
        Labels handed to MiniSom's ``labels_map`` together with the data.

    Returns
    -------
    pd.DataFrame
        The mapping returned by ``labels_map`` as a dataframe, transposed
        and sorted by index.
        NOTE(review): MiniSom documents ``labels_map`` as returning counts
        per label, not normalised probabilities - confirm the intended use.
    """

    # TODO: JAVI: Could this method be implemented in more datamining classes?
    prepared = data.copy()  # Avoid modifying the original data to predict
    for dir_var in self.directional_variables:
        u_values, v_values = self.get_uv_components(x_deg=prepared[dir_var].values)
        prepared[f"{dir_var}_u"] = u_values
        prepared[f"{dir_var}_v"] = v_values
        prepared = prepared.drop(columns=[dir_var])

    standarized, _ = self.standarize(data=prepared, scaler=self.scaler)
    labels_by_neuron = self.som.labels_map(standarized.values, labels)
    labels_table = pd.DataFrame(labels_by_neuron)
    return labels_table.T.sort_index()
[docs]
def plot_centroids_probs_for_labels(
    self, probs_data: pd.DataFrame
) -> Tuple[plt.Figure, np.ndarray]:
    """
    Plots the labels map of the given data.

    One pie chart is drawn per row of ``probs_data``, on the subplot whose
    grid position is given by that row's index.

    Parameters
    ----------
    probs_data : pd.DataFrame
        Table with one row per neuron and one column per label. Each index
        entry must be usable as a (row, column) position in the subplot
        grid, e.g. the output of ``get_centroids_probs_for_labels``.

    Returns
    -------
    Tuple[plt.Figure, np.ndarray]
        The figure and the axes returned by ``get_subplots`` for a grid of
        ``som_shape[0]`` rows and ``som_shape[1]`` columns.
        NOTE(review): presumably a 2D array of matplotlib axes - confirm
        against DefaultStaticPlotting.get_subplots.
    """

    default_static_plot = DefaultStaticPlotting()
    fig, axes = default_static_plot.get_subplots(
        nrows=self.som_shape[0],
        ncols=self.som_shape[1],
    )
    for index in probs_data.index:
        # tuple(index) selects the same subplot as axes[*index], but the
        # starred subscript only parses on Python >= 3.11
        default_static_plot.plot_pie(
            ax=axes[tuple(index)],
            x=probs_data.loc[index],
            labels=probs_data.columns,
        )

    return fig, axes
[docs]
@validate_data_som
def fit(
    self,
    data: pd.DataFrame,
    directional_variables: List[str] = [],
    num_iteration: int = 1000,
) -> None:
    """
    Fits the SOM model to the provided data.

    The data is copied, every directional variable gets ``_u`` and ``_v``
    component columns, the directional columns themselves are left out of
    the training set, and the remaining columns are standarized and used
    to train the MiniSom model. Centroids are then computed as the mean
    of the samples assigned to each winner neuron.

    Parameters
    ----------
    data : pd.DataFrame
        The input data to be used for the fitting.
    directional_variables : List[str], optional
        A list with the directional variables (will be transformed to u and v).
        Default is [].
    num_iteration : int, optional
        The number of iterations for the SOM fitting.
        Default is 1000.

    Notes
    -----
    - The function assumes that the data is validated by the `validate_data_som`
      decorator before execution.
    """

    # Work on copies so neither the caller's dataframe nor its list are touched
    self._data = data.copy()
    self.directional_variables = directional_variables.copy()

    # Add the u and v components of every directional variable
    for dir_var in self.directional_variables:
        u_values, v_values = self.get_uv_components(
            x_deg=self.data[dir_var].values
        )
        self.data[f"{dir_var}_u"] = u_values
        self.data[f"{dir_var}_v"] = v_values
    self.data_variables = self.data.columns.tolist()

    # Training set: everything except the raw directional columns
    self._data_to_fit = self.data.copy()
    for dir_var in self.directional_variables:
        self._data_to_fit = self._data_to_fit.drop(columns=[dir_var])
    self.fitting_variables = self.data_to_fit.columns.tolist()

    # Standarize data using the StandardScaler custom method
    standarized, fitted_scaler = self.standarize(data=self.data_to_fit)
    self._standarized_data = standarized
    self.scaler = fitted_scaler

    # Train the SOM model
    self.som.train(data=self.standarized_data.values, num_iteration=num_iteration)

    # Assign each sample to its winner neuron and average per neuron
    samples_with_winner = self.data.copy()
    samples_with_winner["winner_neurons"] = self._get_winner_neurons(
        standarized_data=self.standarized_data.values
    )
    self.centroids = samples_with_winner.groupby("winner_neurons").mean()

    # Recompute the directional columns of the centroids from their
    # averaged u and v components
    for dir_var in self.directional_variables:
        centroid_u = self.centroids[f"{dir_var}_u"].values
        centroid_v = self.centroids[f"{dir_var}_v"].values
        self.centroids[dir_var] = self.get_degrees_from_uv(
            xu=centroid_u,
            xv=centroid_v,
        )

    # Set the fitted flag to True
    self.is_fitted = True
[docs]
def predict(self, data: pd.DataFrame) -> Tuple[np.ndarray, pd.DataFrame]:
    """
    Predicts the nearest centroid for the provided data.

    Parameters
    ----------
    data : pd.DataFrame
        The input data to be used for the prediction. It is not modified;
        directional variables are replaced by their ``_u`` and ``_v``
        components on a copy, which is then standarized with the stored
        scaler.

    Returns
    -------
    Tuple[np.ndarray, pd.DataFrame]
        A tuple with the winner neurons and the centroids of the given data.
        The centroids dataframe has one row per input sample, labelled by
        its winner neuron. A neuron that has no entry in ``centroids``
        yields a row of NaN values.

    Raises
    ------
    SOMError
        If the SOM model is not fitted.
    """

    if not self.is_fitted:
        raise SOMError("SOM model is not fitted.")

    data = data.copy()  # Avoid modifying the original data to predict
    for directional_variable in self.directional_variables:
        u_comp, v_comp = self.get_uv_components(
            x_deg=data[directional_variable].values
        )
        data[f"{directional_variable}_u"] = u_comp
        data[f"{directional_variable}_v"] = v_comp
        data.drop(columns=[directional_variable], inplace=True)

    standarized_data, _ = self.standarize(data=data, scaler=self.scaler)
    winner_neurons = self._get_winner_neurons(
        standarized_data=standarized_data.values
    )

    # centroids is indexed by neuron id, so the lookup has to be made by
    # label. A positional lookup returns the wrong row (or fails) as soon
    # as some neuron id is missing from the centroids index.
    nearest_centroids = self.centroids.reindex(winner_neurons)

    return winner_neurons, nearest_centroids
[docs]
def fit_predict(
    self,
    data: pd.DataFrame,
    directional_variables: List[str] = [],
    num_iteration: int = 1000,
) -> Tuple[np.ndarray, pd.DataFrame]:
    """
    Fits the SOM to the provided data and predicts on that same data.

    This is a convenience wrapper that calls ``fit`` and then ``predict``
    with the same dataframe.

    Parameters
    ----------
    data : pd.DataFrame
        The input data to be used for the SOM algorithm.
    directional_variables : List[str], optional
        A list of directional variables (will be transformed to u and v).
        Default is [].
    num_iteration : int, optional
        The number of iterations for the SOM fitting.
        Default is 1000.

    Returns
    -------
    Tuple[np.ndarray, pd.DataFrame]
        A tuple containing the winner neurons for each data point and the
        nearest centroids, exactly as returned by ``predict``.
    """

    fit_options = {
        "directional_variables": directional_variables,
        "num_iteration": num_iteration,
    }
    self.fit(data=data, **fit_options)

    prediction = self.predict(data=data)
    return prediction