from typing import Dict, List, Tuple
import numpy as np
import pandas as pd
from sklearn.cluster import KMeans
from sklearn.linear_model import LinearRegression
from ..core.decorators import validate_data_kma
from ._base_datamining import BaseClustering
[docs]
class KMAError(Exception):
"""
Custom exception for KMA class.
"""
def __init__(self, message: str = "KMA error occurred."):
self.message = message
super().__init__(self.message)
[docs]
class KMA(BaseClustering):
"""
K-Means Algorithm (KMA) class.
This class performs the K-Means algorithm on a given dataframe.
Attributes
----------
num_clusters : int
The number of clusters to use in the K-Means algorithm.
seed : int
The random seed to use as initial datapoint.
data_variables : List[str]
A list with all data variables.
directional_variables : List[str]
A list with directional variables.
fitting_variables : List[str]
A list with fitting variables.
custom_scale_factor : dict
A dictionary of custom scale factors.
scale_factor : dict
A dictionary of scale factors (after normalizing the data).
centroids : pd.DataFrame
The selected centroids.
normalized_centroids : pd.DataFrame
The selected normalized centroids.
centroid_real_indices : np.array
The real indices of the selected centroids.
is_fitted : bool
A flag indicating whether the model is fitted or not.
Examples
--------
.. jupyter-execute::
import numpy as np
import pandas as pd
from bluemath_tk.datamining.kma import KMA
data = pd.DataFrame(
{
"Hs": np.random.rand(1000) * 7,
"Tp": np.random.rand(1000) * 20,
"Dir": np.random.rand(1000) * 360
}
)
kma = KMA(num_clusters=5)
nearest_centroids_idxs, nearest_centroids_df = kma.fit_predict(
data=data,
directional_variables=["Dir"],
)
kma.plot_selected_centroids(plot_text=True)
"""
def __init__(
self,
num_clusters: int,
seed: int = None,
) -> None:
"""
Initializes the KMA class.
Parameters
----------
num_clusters : int
The number of clusters to use in the K-Means algorithm.
Must be greater than 0.
seed : int, optional
The random seed to use as initial datapoint.
Must be greater or equal to 0 and less than number of datapoints.
Default is 0.
Raises
------
ValueError
If num_clusters is not greater than 0.
Or if seed is not greater or equal to 0.
"""
super().__init__()
self.set_logger_name(name=self.__class__.__name__)
if num_clusters > 0:
self.num_clusters = int(num_clusters)
else:
raise ValueError("Variable num_clusters must be > 0")
if seed is None:
self.seed = None
elif seed >= 0:
self.seed = int(seed)
else:
raise ValueError("Variable seed must be >= 0")
self._kma = KMeans(
n_clusters=self.num_clusters,
random_state=self.seed,
)
self.logger.info(
f"KMA object created with {self.num_clusters} clusters and seed {self.seed}."
"To customize kma, do self.kma = dict(n_clusters=..., random_state=..., etc)"
)
self._data: pd.DataFrame = pd.DataFrame()
self._normalized_data: pd.DataFrame = pd.DataFrame()
self._data_to_fit: pd.DataFrame = pd.DataFrame()
self.data_variables: List[str] = []
self.directional_variables: List[str] = []
self.fitting_variables: List[str] = []
self.custom_scale_factor: dict = {}
self.scale_factor: dict = {}
self.centroids: pd.DataFrame = pd.DataFrame()
self.normalized_centroids: pd.DataFrame = pd.DataFrame()
self.centroid_real_indices: np.array = np.array([])
self.is_fitted: bool = False
self.regression_guided: dict = {}
@property
def kma(self) -> KMeans:
return self._kma
@kma.setter
def kma(self, kma_params_dict) -> None:
"""
Setter for the KMeans object.
Parameters
----------
kma_params_dict : dict
A dictionary with KMeans parameters.
The keys should be the same as the KMeans parameters.
Example: {"n_clusters": 5, "random_state": 42}
"""
self._kma = KMeans(**kma_params_dict)
@property
def data(self) -> pd.DataFrame:
"""
Returns the original data used for clustering.
"""
return self._data
@property
def normalized_data(self) -> pd.DataFrame:
"""
Returns the normalized data used for clustering.
"""
return self._normalized_data
@property
def data_to_fit(self) -> pd.DataFrame:
"""
Returns the data used for fitting the K-Means algorithm.
"""
return self._data_to_fit
[docs]
@staticmethod
def add_regression_guided(
data: pd.DataFrame, vars: List[str], alpha: List[float]
) -> pd.DataFrame:
"""
Calculate regression-guided variables.
Parameters
----------
data : pd.DataFrame
The data to fit the K-Means algorithm.
vars : List[str]
The variables to use for regression-guided clustering.
alpha : List[float]
The alpha values to use for regression-guided clustering.
Returns
-------
pd.DataFrame
The data with the regression-guided variables.
"""
# Stack guiding variables into (time, n_vars) array
X = data.drop(columns=vars)
Y = np.stack([data[var].values for var in vars], axis=1)
# Normalize input features
X_std = X.std().replace(0, 1)
X_norm = X / X_std
# Add intercept column to input
X_design = np.column_stack((np.ones(len(X)), X_norm.values))
# Normalize guiding targets
Y_std = np.nanstd(Y, axis=0)
Y_std[Y_std == 0] = 1.0
# Fit regression model to predict guiding vars from input
model = LinearRegression(fit_intercept=False).fit(X_design, Y / Y_std)
Y_pred = model.predict(X_design) * Y_std # De-normalize predictions
# Weight columns by input alpha
X_weight = 1.0 - np.sum(alpha)
X_scaled = X_weight * X.values
Y_scaled = Y_pred * alpha
df = pd.DataFrame(np.hstack([X_scaled, Y_scaled]), index=data.index)
df.columns = list(X.columns) + vars
return df
[docs]
@validate_data_kma
def fit(
self,
data: pd.DataFrame,
directional_variables: List[str] = [],
custom_scale_factor: dict = {},
min_number_of_points: int = None,
max_number_of_iterations: int = 10,
normalize_data: bool = False,
regression_guided: Dict[str, List] = {},
) -> None:
"""
Fit the K-Means algorithm to the provided data.
TODO: Add option to force KMA initialization with MDA centroids.
Parameters
----------
data : pd.DataFrame
The input data to be used for the KMA algorithm.
directional_variables : List[str], optional
A list of directional variables that will be transformed to u and v components.
Then, to use custom_scale_factor, you must specify the variables names with the u and v suffixes.
Example: directional_variables=["Dir"], custom_scale_factor={"Dir_u": [0, 1], "Dir_v": [0, 1]}.
Default is [].
custom_scale_factor : dict, optional
A dictionary specifying custom scale factors for normalization.
If normalize_data is True, this will be used to normalize the data.
Example: {"Hs": [0, 10], "Tp": [0, 10]}.
Default is {}.
min_number_of_points : int, optional
The minimum number of points to consider a cluster.
Default is None.
max_number_of_iterations : int, optional
The maximum number of iterations for the K-Means algorithm.
This is used when min_number_of_points is not None.
Default is 10.
normalize_data : bool, optional
A flag to normalize the data.
If True, the data will be normalized using the custom_scale_factor.
Default is False.
regression_guided: dict, optional
A dictionary specifying regression-guided clustering variables and relative weights.
Example: {"vars": ["Fe"], "alpha": [0.6]}.
Default is {}.
"""
if regression_guided:
data = self.add_regression_guided(
data=data,
vars=regression_guided.get("vars", None),
alpha=regression_guided.get("alpha", None),
)
super().fit(
data=data,
directional_variables=directional_variables,
custom_scale_factor=custom_scale_factor,
normalize_data=normalize_data,
)
# Fit K-Means algorithm
if min_number_of_points is not None:
stable_kma_child = False
number_of_tries = 0
while not stable_kma_child:
kma_child = KMeans(n_clusters=self.num_clusters)
predicted_labels = kma_child.fit_predict(self.normalized_data)
_unique_labels, counts = np.unique(predicted_labels, return_counts=True)
if np.all(counts >= min_number_of_points):
stable_kma_child = True
number_of_tries += 1
if number_of_tries > max_number_of_iterations:
raise ValueError(
f"Failed to find a stable K-Means configuration after {max_number_of_iterations} attempts."
"Change max_number_of_iterations or min_number_of_points."
)
self.logger.info(
f"Found a stable K-Means configuration after {number_of_tries} attempts."
)
self._kma = kma_child
else:
self._kma = self.kma.fit(self.normalized_data)
# Calculate the centroids
self.centroid_real_indices = self.kma.labels_.copy()
self.normalized_centroids = pd.DataFrame(
self.kma.cluster_centers_, columns=self.fitting_variables
)
self.centroids = self.denormalize(
normalized_data=self.normalized_centroids, scale_factor=self.scale_factor
)
for directional_variable in self.directional_variables:
self.centroids[directional_variable] = self.get_degrees_from_uv(
xu=self.centroids[f"{directional_variable}_u"].values,
xv=self.centroids[f"{directional_variable}_v"].values,
)
# Set the fitted flag to True
self.is_fitted = True
[docs]
def predict(self, data: pd.DataFrame) -> Tuple[pd.DataFrame, pd.DataFrame]:
"""
Predict the nearest centroid for the provided data.
Parameters
----------
data : pd.DataFrame
The input data to be used for the prediction.
Returns
-------
Tuple[pd.DataFrame, pd.DataFrame]
A tuple containing the nearest centroid index for each data point,
and the nearest centroids.
"""
if self.is_fitted is False:
raise KMAError("KMA model is not fitted.")
normalized_data = super().predict(data=data)
y = self.kma.predict(X=normalized_data)
return pd.DataFrame(
y, columns=["kma_bmus"], index=data.index
), self.centroids.iloc[y]
[docs]
def fit_predict(
self,
data: pd.DataFrame,
directional_variables: List[str] = [],
custom_scale_factor: dict = {},
min_number_of_points: int = None,
max_number_of_iterations: int = 10,
normalize_data: bool = False,
regression_guided: Dict[str, List] = {},
) -> Tuple[pd.DataFrame, pd.DataFrame]:
"""
Fit the K-Means algorithm to the provided data and predict the nearest centroid
for each data point.
Parameters
----------
data : pd.DataFrame
The input data to be used for the KMA algorithm.
directional_variables : List[str], optional
A list of directional variables that will be transformed to u and v components.
Then, to use custom_scale_factor, you must specify the variables names with the u and v suffixes.
Example: directional_variables=["Dir"], custom_scale_factor={"Dir_u": [0, 1], "Dir_v": [0, 1]}.
Default is [].
custom_scale_factor : dict, optional
A dictionary specifying custom scale factors for normalization.
If normalize_data is True, this will be used to normalize the data.
Example: {"Hs": [0, 10], "Tp": [0, 10]}.
Default is {}.
min_number_of_points : int, optional
The minimum number of points to consider a cluster.
Default is None.
max_number_of_iterations : int, optional
The maximum number of iterations for the K-Means algorithm.
This is used when min_number_of_points is not None.
Default is 10.
normalize_data : bool, optional
A flag to normalize the data.
If True, the data will be normalized using the custom_scale_factor.
Default is False.
regression_guided: dict, optional
A dictionary specifying regression-guided clustering variables and relative weights.
Example: {"vars": ["Fe"], "alpha": [0.6]}.
Default is {}.
Returns
-------
Tuple[pd.DataFrame, pd.DataFrame]
A tuple containing the nearest centroid index for each data point,
and the nearest centroids.
"""
self.fit(
data=data,
directional_variables=directional_variables,
custom_scale_factor=custom_scale_factor,
min_number_of_points=min_number_of_points,
max_number_of_iterations=max_number_of_iterations,
normalize_data=normalize_data,
regression_guided=regression_guided,
)
return self.predict(data=data)