import warnings

import matplotlib.pyplot as plt
import numpy as np
from scipy.signal import find_peaks
from scipy.stats import chi2, norm, pearsonr
from ..core.io import BlueMathModel
from .utils.pot_utils import RWLSfit, threshold_search
def block_maxima(
x: np.ndarray,
block_size: int | float = 365.25,
min_sep: int = 2,
) -> tuple[np.ndarray, np.ndarray]:
"""
    Function to obtain the Block Maxima (BM) of a given block size while
    enforcing a minimum-independence hypothesis between consecutive maxima.
Parameters
----------
x : np.ndarray
Data used to compute the Block Maxima
    block_size : int or float, default=365.25
        Size of each block in index units (for daily data, number of days)
    min_sep : int, default=2
        Minimum separation between consecutive maxima, in index units
Returns
-------
idx : np.ndarray
Indices of BM
bmaxs : np.ndarray
BM values
Raises
------
    ValueError
        If min_sep is greater than (block_size + 1) / 2
Example
-------
    >>> # One year of daily values
    >>> x = np.random.lognormal(1, 1.2, size=365)
    >>> # 5-day Block Maxima with 72 h of independence
    >>> idx, bmaxs = block_maxima(
    ...     x,
    ...     block_size=5,
    ...     min_sep=3,
    ... )
"""
block_size = int(np.ceil(block_size))
if min_sep > (block_size + 1) / 2:
        raise ValueError("min_sep must not be greater than (block_size + 1) / 2")
x = np.asarray(x)
n = x.size
# Partition into non-overlapping blocks
n_blocks = int(np.ceil(n / block_size))
segments_idx = []
segments = []
blocks = []
# For each block, keep a *ranked* list of (idx, value) candidates (desc by value)
for b in range(n_blocks):
start = b * block_size
stop = min((b + 1) * block_size, n)
segment = x[start:stop]
candidates_idx = np.argsort(segment)[::-1]
segments_idx.append(np.arange(start, stop))
segments.append(segment)
blocks.append(candidates_idx)
def violates(i_left, i_right):
if i_left is None or i_right is None:
return False
return i_right - i_left < min_sep
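    # Iteratively resolve dependence conflicts: while any two adjacent block
    # maxima are closer than min_sep, demote the smaller of the two to its
    # block's next-best candidate and re-scan from the first block.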
changed = True
while changed:
changed = False
for b in range(n_blocks - 1):
idx_left = segments_idx[b][blocks[b][0]]
idx_right = segments_idx[b + 1][blocks[b + 1][0]]
if not violates(idx_left, idx_right):
continue
else:
idx_block_adjust = (
0
if segments[b][blocks[b][0]] < segments[b + 1][blocks[b + 1][0]]
else 1
)
blocks[b + idx_block_adjust] = np.delete(
blocks[b + idx_block_adjust], 0
)
changed = True
break
bmaxs = np.asarray([segments[b][blocks[b][0]] for b in range(n_blocks)])
idx = np.asarray([segments_idx[b][blocks[b][0]] for b in range(n_blocks)])
return idx, bmaxs
def pot(
data: np.ndarray,
threshold: float = 0.0,
n0: int = 10,
min_peak_distance: int = 2,
sig_level: float = 0.05,
):
"""
    Function to identify Peaks Over Threshold (POT).
This function identifies peaks in a dataset that exceed a specified
threshold and computes statistics such as mean exceedances, variances,
and weights for valid unique peaks. Peaks are considered independent if
they are separated by a minimum distance.
Parameters
----------
data : np.ndarray(n,)
Input time series or data array
threshold : float, default=0
Threshold above which peaks are extracted
n0 : int, default=10
Minimum number of exceedances required for valid computation
    min_peak_distance : int, default=2
Minimum distance between two peaks (in data points)
sig_level : float, default=0.05
Significance level for Chi-squared test
Returns
-------
pks_unicos_valid : np.ndarray
Valid unique peaks after removing NaN values
excedencias_mean_valid : np.ndarray
Mean exceedances for valid peaks
excedencias_weight_valid : np.ndarray
Weights based on exceedance variance for valid peaks
pks : np.ndarray
All detected peaks
locs : np.ndarray
Indices of the detected peaks in the data
autocorrelations : np.ndarray(n, 3)
        Lags, correlations, and p-values to check the independence assumption
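    Example
    -------
    >>> # A minimal sketch: POT above the 90th percentile of synthetic data
    >>> x = np.random.lognormal(1, 1.2, size=3 * 365)
    >>> pks_u, exc_mean, exc_w, pks, locs, autocorr = pot(
    ...     x,
    ...     threshold=np.quantile(x, 0.9),
    ...     min_peak_distance=3,
    ... )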
"""
# Find peaks exceeding the threshold with specified min distance
adjusted_data = np.maximum(data - threshold, 0)
    # scipy's find_peaks interprets `distance` as the minimal number of samples
    # between neighbouring peaks, so `min_peak_distance + 1` reproduces the
    # behaviour of MATLAB's findpeaks 'MinPeakDistance' option
    locs, _ = find_peaks(adjusted_data, distance=min_peak_distance + 1)
pks = data[locs]
# Calculate autocorrelation for lags 1 to 5 (if enough peaks)
num_lags = 5
if len(pks) > num_lags:
autocorrelations = np.zeros((num_lags, 3), dtype=float)
for i in range(num_lags):
lag = i + 1
r, p_value = pearsonr(pks[:-lag], pks[lag:]) # Test corr != 0
autocorrelations[i, 0] = int(lag)
autocorrelations[i, 1] = r
autocorrelations[i, 2] = p_value
if p_value < sig_level:
                warnings.warn(
                    f"Lag {int(lag)} is significant; consider increasing min_peak_distance"
                )
else:
# Not enough peaks for autocorrelation analysis
autocorrelations = np.array([])
# Unique peaks (pks_unicos), ignoring duplicates
pks_unicos = np.unique(pks)
# Allocate arrays to store mean exceedances, variances, and weights
excedencias_mean = np.zeros(len(pks_unicos), dtype=float)
excedencias_var = np.zeros(len(pks_unicos), dtype=float)
excedencias_weight = np.zeros(len(pks_unicos), dtype=float)
# Loop through each unique peak and calculate mean exceedances, variances, and weights
for i in range(len(pks_unicos)):
# Define the current unique peak
pico_actual = pks_unicos[i]
# Calculate the exceedances for peaks greater than the current unique peak
excedencias = pks[pks > pico_actual]
# If there are enough exceedances (greater than or equal to n0)
if len(excedencias) >= n0:
# Compute the mean exceedance (adjusted by the current peak)
excedencias_mean[i] = np.mean(excedencias) - pico_actual
            # Compute the variance of the exceedances (ddof=1 to match MATLAB's sample variance)
excedencias_var[i] = np.var(excedencias, ddof=1)
            # Weight = number of exceedances / variance
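            # (n / var is, approximately, the inverse variance of the sample
            # mean excess, since Var(sample mean of n values) = var / n)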
# (Guard against division by zero)
if excedencias_var[i] != 0:
excedencias_weight[i] = len(excedencias) / excedencias_var[i]
else:
excedencias_weight[i] = np.nan
else:
# If fewer than n0 exceedances, truncate arrays and stop
excedencias_mean = excedencias_mean[:i]
excedencias_var = excedencias_var[:i]
excedencias_weight = excedencias_weight[:i]
break
# Trim the list of unique peaks to match the number of valid exceedances
pks_unicos = pks_unicos[: len(excedencias_weight)]
# Remove any NaN values from the peak and exceedance data to avoid issues in regression
valid_indices = (
~np.isnan(pks_unicos)
& ~np.isnan(excedencias_mean)
& ~np.isnan(excedencias_weight)
)
pks_unicos_valid = pks_unicos[valid_indices]
excedencias_mean_valid = excedencias_mean[valid_indices]
excedencias_weight_valid = excedencias_weight[valid_indices]
return (
pks_unicos_valid,
excedencias_mean_valid,
excedencias_weight_valid,
pks,
locs,
autocorrelations,
)
class OptimalThreshold(BlueMathModel):
"""
Class to compute the optimal threshold using different algorithms.
Methods
-------
    studentized_residuals :
        Compute the optimal threshold using the studentized residuals method
    Notes
    -----
    The methods implemented to select the optimal threshold are:
    - Studentized residuals method, Mínguez (2025) [1].
[1] Mínguez, R. (2025). Automatic Threshold Selection for Generalized
Pareto and Pareto–Poisson Distributions in Rainfall Analysis: A Case
Study Using the NOAA NCDC Daily Rainfall Database. Atmosphere, 16(1),
61. https://doi.org/10.3390/atmos16010061
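    Example
    -------
    >>> # A minimal usage sketch on synthetic daily data
    >>> x = np.random.lognormal(1, 1.2, size=10 * 365)
    >>> ot = OptimalThreshold(x, threshold=np.quantile(x, 0.9), min_peak_distance=3)
    >>> threshold, pks, pks_idx = ot.fit()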
"""
def __init__(
self,
data,
threshold: float = 0.0,
n0: int = 10,
min_peak_distance: int = 2,
sig_level: float = 0.05,
method: str = "studentized",
plot: bool = False,
folder: str = None,
display_flag: bool = False,
):
self.data = data
self.threshold = threshold
self.n0 = n0
self.min_peak_distance = min_peak_distance
(
self.pks_unicos_valid,
self.excedencias_mean_valid,
self.excedencias_weight_valid,
self.pks,
self.locs,
self.autocorrelations,
        ) = pot(self.data, threshold, n0, min_peak_distance, sig_level)
self.method = method
self.sig_level = sig_level
self.plot = plot
self.folder = folder
self.display_flag = display_flag
    def fit(self):
"""
Obtain the optimal threshold and POTs given the selected method
Returns
-------
threshold : float
Optimal threshold
pks : np.ndarray
POT
pks_idx : np.ndarray
Indices of POT
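        Example
        -------
        >>> # Sketch: assumes `ot` is an OptimalThreshold instance (see the class example)
        >>> threshold, pks, pks_idx = ot.fit()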
"""
if self.method == "studentized":
self.threshold, beta, fobj, r = self.studentized_residuals(
self.pks_unicos_valid,
self.excedencias_mean_valid,
self.excedencias_weight_valid,
self.sig_level,
self.plot,
self.folder,
self.display_flag,
)
        # TODO: Add more methods
_, _, _, self.pks, self.pks_idx, _ = pot(
self.data, self.threshold, self.n0, self.min_peak_distance
)
return self.threshold.item(), self.pks, self.pks_idx
def studentized_residuals(
self,
pks_unicos_valid: np.ndarray,
exceedances_mean_valid: np.ndarray,
exceedances_weight_valid: np.ndarray,
sig_level: float = 0.05,
plot: bool = False,
folder: str = None,
display_flag: bool = False,
):
"""
        Function to compute the optimal threshold based on the Chi-squared test
        and studentized residuals. Optionally plots the results if `plot` is True
        and displays messages if `display_flag` is True.
Parameters
----------
pks_unicos_valid : np.ndarray(n,)
Vector of unique peaks (potential thresholds)
exceedances_mean_valid : np.ndarray(n,)
Vector of exceedance means
exceedances_weight_valid : np.ndarray(n,)
Vector of exceedance weights
        sig_level : float, default=0.05
            Significance level for Chi-squared test
        plot : bool, default=False
            True to plot the graphs, False otherwise
        folder : str, default=None
            Folder path where the graphs are saved
        display_flag : bool, default=False
            True to display messages, False otherwise
Returns
-------
        threshold : float
            Optimal threshold found
        beta : np.ndarray
            Optimal regression coefficients
        fobj : float
            Optimal objective function (weighted least squares)
        r : np.ndarray
            Optimal residuals
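        Notes
        -----
        A sketch of the stopping rule implemented below: the search ends when
        the weighted least-squares objective satisfies
        ``fobj <= chi2.ppf(1 - sig_level, df=n - 2)`` and the first studentized
        residual satisfies ``abs(rN[0]) <= norm.ppf(1 - sig_level / 2)``, with
        ``n`` the number of candidate thresholds, or after 10 iterations.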
"""
stop_search = 0
it = 1
threshold = pks_unicos_valid[0] # Initial threshold
while stop_search == 0 and it <= 10:
# Find the current threshold in the pks_unicos_valid array
pos = np.argwhere(pks_unicos_valid == threshold).item()
u_values = pks_unicos_valid[pos:] # Threshold starting from the current one
e_values = exceedances_mean_valid[pos:] # Exceedances
w_values = exceedances_weight_valid[pos:] # Weights
            # Perform the RWLS fitting and compute the studentized residuals
beta, fobj, r, rN = RWLSfit(u_values, e_values, w_values)
            # Plot residuals if required
if plot:
fig = plt.figure(figsize=(10, 6))
ax = fig.add_subplot(111)
ax.plot(
u_values,
rN,
"k",
linewidth=1.5,
label=f"Internally Studentized Residuals\nMin threshold = {threshold.item()}",
)
ax.set_xlabel(r"Threshold $u$")
ax.set_ylabel(r"$r$")
ax.set_title(f"Studentized Residuals Iteration {it}")
# ax.text(threshold + 0.5, min(rN) + 0.1 * (max(rN) - min(rN)), f'Min threshold = {threshold}')
ax.grid()
ax.legend(loc="upper right")
if folder is not None:
plt.savefig(f"{folder}/StudenRes{it}.png", dpi=300)
# plt.show()
plt.close()
if fobj > chi2.ppf(1 - sig_level, df=u_values.size - 2) or np.abs(
rN[0]
) > norm.ppf(1 - sig_level / 2, 0, 1):
if display_flag:
if fobj > chi2.ppf(1 - sig_level, df=u_values.size - 2):
print("Chi-squared test detects anomalies")
if np.abs(rN[0]) > norm.ppf(1 - sig_level / 2, 0, 1):
print(
"The maximum studentized residual of the first record detects anomalies"
)
thresholdsearch = 1
else:
thresholdsearch = 0
stop_search = 1 # If criteria met, stop the loop
if thresholdsearch:
if display_flag:
                    print(
                        f"Maximum sensitivity = {np.max(np.abs(rN))}; the optimal threshold appears to lie to the right of the minimum sample value, searching for its location"
                    )
_, threshold = threshold_search(
u_values,
rN,
w_values,
plot,
folder,
)
if display_flag:
print(f"New threshold found: {threshold}")
it += 1
return threshold, beta, fobj, r
def potplot(
self,
time: np.ndarray = None,
ax: plt.Axes = None,
figsize: tuple = (8, 5),
):
"""
        Auxiliary function that calls the module-level `potplot` to plot the
        POT using the optimal threshold obtained.
Parameters
----------
time : np.ndarray, default=None
Time of data
ax : plt.Axes, default=None
Axes
        figsize : tuple, default=(8, 5)
            Figure size
Returns
-------
fig : plt.Figure
Figure
ax : plt.Axes
Axes
"""
fig, ax = potplot(
self.data,
self.threshold,
time,
self.n0,
self.min_peak_distance,
self.sig_level,
ax,
figsize,
)
return fig, ax
def potplot(
data: np.ndarray,
threshold: float = 0.0,
time: np.ndarray = None,
n0: int = 10,
min_peak_distance: int = 2,
sig_level: float = 0.05,
ax: plt.Axes = None,
figsize: tuple = (8, 5),
):
"""
Plot the POT for data given a threshold.
Parameters
----------
data : np.ndarray
Data
threshold : float, default=0.0
Threshold used to plot the peaks
time : np.ndarray, default=None
Time of data
    n0 : int, default=10
        Minimum number of exceedances required to compute the POT statistics
    min_peak_distance : int, default=2
        Minimum distance between peaks (in index units)
sig_level : float, default=0.05
Significance level for Chi-squared test
ax : plt.Axes, default=None
Axes
figsize : tuple, default=(8,5)
        Figure size
Returns
-------
fig : plt.Figure
Figure
ax : plt.Axes
Axes
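    Example
    -------
    >>> # A minimal sketch with synthetic data
    >>> x = np.random.lognormal(1, 1.2, size=365)
    >>> fig, ax = potplot(x, threshold=np.quantile(x, 0.95), min_peak_distance=3)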
"""
_, _, _, _, pks_idx, _ = pot(data, threshold, n0, min_peak_distance, sig_level)
if time is None:
time = np.arange(data.size)
    if ax is None:
        fig = plt.figure(figsize=figsize)
        ax = fig.add_subplot()
    else:
        fig = ax.get_figure()
# Plot complete series
ax.plot(time, data, ls="-", color="#5199FF", lw=0.25, zorder=10)
# Plot extremes
ax.scatter(
time[pks_idx],
data[pks_idx],
s=20,
lw=0.5,
edgecolor="w",
facecolor="#F85C50",
zorder=20,
)
ax.axhline(threshold, ls="--", lw=1, color="#FF756B", zorder=15)
ax.set_xlabel("Time")
return fig, ax
def mrlp(
data: np.ndarray,
threshold: float = None,
conf_level: float = 0.95,
ax: plt.Axes = None,
figsize: tuple = (8, 5),
) -> plt.Axes:
"""
Plot mean residual life for given threshold value.
The mean residual life plot should be approximately linear above a threshold
for which the Generalized Pareto Distribution model is valid.
The strategy is to select the smallest threshold value immediately above
which the plot is approximately linear.
Parameters
----------
data : np.ndarray
Time series of the signal.
    threshold : float, optional
        Starting threshold above which the mean residual life is computed.
        If None (default), the 90th quantile of the data is used
conf_level : float, default=0.95
Confidence interval width in the range (0, 1), by default it is 0.95.
If None, then confidence interval is not shown.
ax : matplotlib.axes._axes.Axes, optional
If provided, then the plot is drawn on this axes.
If None (default), new figure and axes are created
figsize : tuple, optional
Figure size in inches in format (width, height).
By default it is (8, 5).
Returns
-------
matplotlib.axes._axes.Axes
Axes object.
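    Example
    -------
    >>> # Sketch: mean residual life above the 90th quantile (default threshold)
    >>> x = np.random.lognormal(1, 1.2, size=5 * 365)
    >>> ax = mrlp(x, conf_level=0.95)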
"""
if threshold is None:
threshold = np.nanquantile(data, q=0.9)
(
pks_unicos_valid,
excedencias_mean_valid,
excedencias_weight_valid,
pks,
locs,
autocorrelations,
) = pot(data, threshold)
    if conf_level is not None:
        interlow, interup = norm.interval(
            conf_level,
            loc=excedencias_mean_valid,
            scale=np.sqrt(1 / excedencias_weight_valid),
        )
if ax is None:
fig = plt.figure(figsize=figsize)
ax = fig.add_subplot()
ax.grid(False)
    # Plotting central estimates of the mean residual life
ax.plot(
pks_unicos_valid,
excedencias_mean_valid,
color="#F85C50",
lw=2,
ls="-",
zorder=15,
)
if conf_level is not None:
ax.plot(pks_unicos_valid, interlow, color="#5199FF", lw=1, ls="--", zorder=10)
ax.plot(pks_unicos_valid, interup, color="#5199FF", lw=1, ls="--", zorder=10)
ax.fill_between(
pks_unicos_valid,
interlow,
interup,
facecolor="#5199FF",
edgecolor="None",
alpha=0.25,
zorder=5,
)
ax.set_xlabel("Threshold")
ax.set_ylabel("Mean excess")
return ax