from collections import OrderedDict
from datetime import datetime
from typing import List, Optional, Tuple
import numpy as np
import pandas as pd
import statsmodels.discrete.discrete_model as sm
import xarray as xr
class AutoLogisticRegression:
    """
    Auto-Regressive Logistic (ALR) model for categorical time series modeling.

    This implementation focuses on statsmodels with AIC-based covariate selection.
    The model simulates transitions between discrete states (weather types, climate
    patterns, etc.) accounting for:

    - Markov dependencies (previous states)
    - Seasonality
    - Long-term trends
    - External covariates

    Attributes
    ----------
    cluster_size : int
        Number of discrete states or clusters in the model
    model : statsmodels.MNLogit
        Fitted multinomial logistic regression model (``None`` until ``fit``
        has been called)
    terms_fit : dict
        Dictionary of terms used for model fitting
    terms_fit_names : list
        Names of the terms used for model fitting
    """

    def __init__(self, cluster_size: int):
        """
        Initialize the ALR model.

        Parameters
        ----------
        cluster_size : int
            Number of discrete states/clusters in the model
        """
        self.cluster_size = cluster_size
        self.model = None  # statsmodels results object, set by fit()
        self.terms_fit = {}  # design-matrix terms from the most recent fit
        self.terms_fit_names = []  # flat column names matching terms_fit
        self.mk_order = 0  # Markov order used in the most recent fit
        self.cov_names = []  # covariate names last seen by generate_terms
def get_year_fraction(self, time_array):
"""
Convert datetime array to fractional year representation.
Parameters
----------
time_array : array-like
Array of datetime objects
Returns
-------
np.ndarray
Array of year fractions (0-1 for position within the year)
"""
time_yfrac = np.zeros(len(time_array))
for i, dt in enumerate(time_array):
doy = dt.timetuple().tm_yday
year_length = 366 if dt.year % 4 == 0 else 365
time_yfrac[i] = dt.year + doy / year_length
return time_yfrac
    def generate_terms(
        self,
        bmus: np.ndarray,
        time: np.ndarray,
        mk_order: int = 0,
        use_constant: bool = True,
        use_long_term: bool = False,
        seasonality: Tuple[bool, List[int]] = (False, []),
        covariates: Optional[xr.Dataset] = None,
        cov_seasonality: bool = False,
    ) -> Tuple[OrderedDict, List[str]]:
        """
        Generate design matrix terms for the ALR model.

        Parameters
        ----------
        bmus : np.ndarray
            Array of categorical states (1-indexed)
        time : np.ndarray
            Array of datetime objects
        mk_order : int, optional
            Markov chain order (number of previous states to include)
        use_constant : bool, optional
            Include intercept term
        use_long_term : bool, optional
            Include long-term trend
        seasonality : tuple(bool, list), optional
            Tuple of (use_seasonality, [phases]). Example: (True, [1, 2]) for annual
            and semi-annual cycles
        covariates : xr.Dataset, optional
            Dataset with covariates. Should have dimensions (time, cov_names) and
            variable cov_values
        cov_seasonality : bool, optional
            Include seasonal modulation of covariates

        Returns
        -------
        OrderedDict
            Dictionary of design matrix terms; each value is a 2-D array with
            one row per element of ``bmus``
        list
            Flat column names, in the order the term blocks are concatenated
            by ``fit``
        """
        # Fractional-year time axis (integer year + within-year fraction)
        time_yfrac = self.get_year_fraction(time)

        # Insertion order matters: fit() concatenates values in this order,
        # and terms_names is built in lockstep.
        terms = OrderedDict()
        terms_names = []

        # Constant term (intercept): single all-ones column
        if use_constant:
            terms["constant"] = np.ones((bmus.size, 1))
            terms_names.append("intercept")

        # Long-term trend: the fractional year as a linear regressor
        if use_long_term:
            terms["long_term"] = np.ones((bmus.size, 1))
            terms["long_term"][:, 0] = time_yfrac
            terms_names.append("long_term")

        # Seasonality terms (harmonic functions of within-year position)
        if seasonality[0]:
            phases = seasonality[1]
            temp_seas = np.zeros((len(time_yfrac), 2 * len(phases)))
            for i, harmonic in enumerate(phases):
                # Add cosine and sine components for each harmonic
                temp_seas[:, 2 * i] = np.cos(harmonic * 2 * np.pi * (time_yfrac % 1))
                temp_seas[:, 2 * i + 1] = np.sin(
                    harmonic * 2 * np.pi * (time_yfrac % 1)
                )
                terms_names.append(f"cos_{harmonic}")
                terms_names.append(f"sin_{harmonic}")
            terms["seasonality"] = temp_seas

        # External covariates, z-scored against THIS call's data.
        # NOTE(review): mean/std are recomputed from whatever covariates are
        # passed in, so fit-time and simulate-time scalings differ; for a
        # single-timestep slice std() is 0 and the division produces NaN/inf.
        # Confirm against simulate(), which passes one timestep at a time.
        if covariates is not None:
            cov_values = covariates.cov_values.values
            cov_names = covariates.cov_names.values
            self.cov_names = cov_names  # side effect: remember last-seen names
            # Normalize covariates
            cov_norm = (cov_values - cov_values.mean(axis=0)) / cov_values.std(axis=0)
            # Add each covariate as a separate term
            for i, name in enumerate(cov_names):
                terms[name] = np.reshape(cov_norm[:, i], (cov_norm.shape[0], 1))
                terms_names.append(name)
                # Seasonal modulation: covariate times annual cos/sin carrier
                if cov_seasonality:
                    cos_term = terms[name] * np.cos(
                        2 * np.pi * (time_yfrac % 1)
                    ).reshape(-1, 1)
                    sin_term = terms[name] * np.sin(
                        2 * np.pi * (time_yfrac % 1)
                    ).reshape(-1, 1)
                    terms[f"{name}_cos"] = cos_term
                    terms[f"{name}_sin"] = sin_term
                    terms_names.append(f"{name}_cos")
                    terms_names.append(f"{name}_sin")

        # Markov terms: Helmert-coded previous states, one block per lag
        if mk_order > 0:

            def helmert_coding(k):
                """Generate Helmert contrast matrix (k x k-1) for k categories"""
                H = np.zeros((k, k - 1))
                for i in range(k - 1):
                    H[i, i] = (k - i - 1) / (k - i)
                    H[(i + 1) :, i] = -1.0 / (k - i)
                return H

            # Generate contrast matrix once; reused for every lag
            contrast_matrix = helmert_coding(self.cluster_size)

            # Add terms for each lag in the Markov order
            for i in range(mk_order):
                # One (n, cluster_size-1) block for lag i+1
                Z = np.zeros((bmus.size, self.cluster_size - 1))
                # Row j+i+1 encodes the state observed at step j (i.e. i+1
                # steps earlier); the first i+1 rows stay all-zero because no
                # history exists for them.
                for j in range(bmus.size - i - 1):
                    bmu_idx = int(bmus[j]) - 1  # Convert 1-indexed to 0-indexed
                    Z[j + i + 1, :] = contrast_matrix[bmu_idx, :]
                terms[f"markov_{i + 1}"] = Z
                # One named column per Helmert contrast of this lag
                for c in range(self.cluster_size - 1):
                    terms_names.append(f"mk{i + 1}_{c + 1}")

        return terms, terms_names
def fit(
self,
bmus: np.ndarray,
time: np.ndarray,
mk_order: int = 0,
use_constant: bool = True,
use_long_term: bool = False,
seasonality: Tuple[bool, List[int]] = (False, []),
covariates: Optional[xr.Dataset] = None,
cov_seasonality: bool = False,
max_iter: int = 1000,
) -> None:
"""
Fit the ALR model to data.
Parameters
----------
bmus : np.ndarray
Array of categorical states (1-indexed)
time : np.ndarray
Array of datetime objects
mk_order : int, optional
Markov chain order (number of previous states to include)
use_constant : bool, optional
Include intercept term
use_long_term : bool, optional
Include long-term trend
seasonality : tuple(bool, list), optional
Tuple of (use_seasonality, [phases])
covariates : xr.Dataset, optional
Dataset with covariates
cov_seasonality : bool, optional
Include seasonal modulation of covariates
max_iter : int, optional
Maximum iterations for model fitting
"""
self.mk_order = mk_order
# Generate design matrix terms
self.terms_fit, self.terms_fit_names = self.generate_terms(
bmus=bmus,
time=time,
mk_order=mk_order,
use_constant=use_constant,
use_long_term=use_long_term,
seasonality=seasonality,
covariates=covariates,
cov_seasonality=cov_seasonality,
)
# Combine terms into design matrix
X = np.concatenate(list(self.terms_fit.values()), axis=1)
y = bmus
# Convert to pandas for statsmodels
X_df = pd.DataFrame(X, columns=self.terms_fit_names)
y_df = pd.DataFrame(y, columns=["bmus"])
print(
f"Fitting multinomial logistic regression with {len(self.terms_fit_names)} terms..."
)
# Fit the model
self.model = sm.MNLogit(y_df, X_df).fit(
method="lbfgs", maxiter=max_iter, disp=True
)
print("Model fitting complete.")
def select_covariates_by_aic(
self,
bmus: np.ndarray,
time: np.ndarray,
covariates: xr.Dataset,
base_config: dict = None,
) -> Tuple[List[str], float]:
"""
Select optimal covariates using AIC criterion.
Parameters
----------
bmus : np.ndarray
Array of categorical states (1-indexed)
time : np.ndarray
Array of datetime objects
covariates : xr.Dataset
Dataset with all potential covariates
base_config : dict, optional
Base configuration for other terms (mk_order, seasonality, etc.)
Returns
-------
list
List of selected covariate names
float
AIC of the best model
"""
# Default base configuration
if base_config is None:
base_config = {
"mk_order": 1,
"use_constant": True,
"use_long_term": False,
"seasonality": (True, [1]),
"cov_seasonality": False,
}
cov_names = covariates.cov_names.values
n_covs = len(cov_names)
print(f"Starting covariate selection with {n_covs} potential covariates...")
# Initialize
best_aic = float("inf")
best_covs = []
all_results = []
# Test each covariate individually first
for i, cov in enumerate(cov_names):
# Create subset of covariates with just this one
cov_subset = xr.Dataset(
data_vars={
"cov_values": (
("time", "cov_names"),
covariates.cov_values.values[:, i : i + 1],
),
},
coords={"time": covariates.time, "cov_names": [cov]},
)
# Fit model
try:
self.fit(
bmus=bmus,
time=time,
mk_order=base_config["mk_order"],
use_constant=base_config["use_constant"],
use_long_term=base_config["use_long_term"],
seasonality=base_config["seasonality"],
covariates=cov_subset,
cov_seasonality=base_config["cov_seasonality"],
)
# Get AIC
aic = self.model.aic
all_results.append((aic, [cov]))
print(f"Covariate {cov}: AIC = {aic:.2f}")
# Update best model if improved
if aic < best_aic:
best_aic = aic
best_covs = [cov]
except Exception as e:
print(f"Error fitting model with covariate {cov}: {str(e)}")
# Forward selection - iteratively add variables
current_covs = best_covs.copy()
current_aic = best_aic
improved = True
while improved and len(current_covs) < n_covs:
improved = False
best_new_aic = float("inf")
best_new_cov = None
# Try adding each remaining covariate
for cov in cov_names:
if cov not in current_covs:
test_covs = current_covs + [cov]
# Create subset of covariates
cov_indices = [list(cov_names).index(c) for c in test_covs]
cov_subset = xr.Dataset(
data_vars={
"cov_values": (
("time", "cov_names"),
covariates.cov_values.values[:, cov_indices],
),
},
coords={"time": covariates.time, "cov_names": test_covs},
)
# Fit model
try:
self.fit(
bmus=bmus,
time=time,
mk_order=base_config["mk_order"],
use_constant=base_config["use_constant"],
use_long_term=base_config["use_long_term"],
seasonality=base_config["seasonality"],
covariates=cov_subset,
cov_seasonality=base_config["cov_seasonality"],
)
# Get AIC
aic = self.model.aic
all_results.append((aic, test_covs))
print(f"Covariates {test_covs}: AIC = {aic:.2f}")
# Update best candidate
if aic < best_new_aic:
best_new_aic = aic
best_new_cov = cov
except Exception as e:
print(
f"Error fitting model with covariates {test_covs}: {str(e)}"
)
# Check if adding the best new covariate improves AIC
if best_new_aic < current_aic:
improved = True
current_aic = best_new_aic
current_covs.append(best_new_cov)
# Update global best if improved
if current_aic < best_aic:
best_aic = current_aic
best_covs = current_covs.copy()
print(f"Added covariate {best_new_cov}, new AIC = {current_aic:.2f}")
# Final model with best covariates
if best_covs:
print(f"\nBest model has AIC = {best_aic:.2f} with covariates: {best_covs}")
# Create final covariate subset
cov_indices = [list(cov_names).index(c) for c in best_covs]
best_cov_subset = xr.Dataset(
data_vars={
"cov_values": (
("time", "cov_names"),
covariates.cov_values.values[:, cov_indices],
),
},
coords={"time": covariates.time, "cov_names": best_covs},
)
# Fit final model
self.fit(
bmus=bmus,
time=time,
mk_order=base_config["mk_order"],
use_constant=base_config["use_constant"],
use_long_term=base_config["use_long_term"],
seasonality=base_config["seasonality"],
covariates=best_cov_subset,
cov_seasonality=base_config["cov_seasonality"],
)
return best_covs, best_aic
    def simulate(
        self,
        time_sim: np.ndarray,
        num_sims: int = 1,
        covariates_sim: Optional[xr.Dataset] = None,
    ) -> np.ndarray:
        """
        Generate simulations from the fitted ALR model.

        Parameters
        ----------
        time_sim : np.ndarray
            Array of datetime objects for simulation
        num_sims : int, optional
            Number of simulations to generate
        covariates_sim : xr.Dataset, optional
            Covariates for simulation period

        Returns
        -------
        np.ndarray
            Array of simulated states (1-indexed, stored as floats) with shape
            (time, num_sims)

        Raises
        ------
        ValueError
            If called before ``fit``.
        """
        if self.model is None:
            raise ValueError("Model must be fitted before simulation")

        # Markov order used at fit time: how many past states feed each step
        mk_order = self.mk_order

        sim_bmus = np.zeros((len(time_sim), num_sims))

        # Seed the first mk_order steps of each series with uniform random states
        for n in range(num_sims):
            init_bmus = np.random.randint(1, self.cluster_size + 1, mk_order)
            sim_bmus[:mk_order, n] = init_bmus

        # Main simulation loop: one step at a time, conditioned on history
        print(f"Simulating {num_sims} time series...")
        for n in range(num_sims):
            for i in range(mk_order, len(time_sim)):
                # The mk_order states immediately preceding step i
                hist_bmus = sim_bmus[i - mk_order : i, n]

                # Build a one-step design matrix. A dummy 0 is appended so the
                # state array has mk_order+1 entries matching the time slice.
                # NOTE(review): seasonality phases are hard-coded to [1]; if
                # fit() used more harmonics (e.g. [1, 2]) the design columns
                # will not match the fitted model — confirm.
                # NOTE(review): generate_terms re-normalizes covariates per
                # call, and the single-timestep covariate slice has std 0
                # (NaN) and fewer rows than the other terms — verify.
                terms_i, _ = self.generate_terms(
                    bmus=np.append(hist_bmus, 0),
                    time=time_sim[i - mk_order : i + 1],
                    mk_order=mk_order,
                    use_constant=True,  # Assuming constant is always used
                    use_long_term="long_term" in self.terms_fit,
                    seasonality=(
                        "seasonality" in self.terms_fit,
                        [1] if "seasonality" in self.terms_fit else [],
                    ),
                    covariates=covariates_sim.sel(time=time_sim[i : i + 1])
                    if covariates_sim is not None
                    else None,
                    cov_seasonality=any(
                        k.endswith("_cos") for k in self.terms_fit.keys()
                    ),
                )

                # Assemble prediction rows; the LAST row corresponds to step i
                X = np.concatenate(list(terms_i.values()), axis=1)

                # Transition probabilities for the current step
                probs = self.model.predict(X)
                cum_probs = np.cumsum(probs[-1, :])

                # Inverse-CDF sampling; +1 converts back to 1-indexed states
                rnd_val = np.random.rand()
                new_bmu = np.where(cum_probs > rnd_val)[0][0] + 1

                # Store result
                sim_bmus[i, n] = new_bmu

        return sim_bmus
def get_summary(self) -> pd.DataFrame:
"""
Get model summary statistics.
Returns
-------
pd.DataFrame
DataFrame with parameter estimates, p-values, etc.
"""
if self.model is None:
raise ValueError("Model must be fitted first")
# Extract key information
params = self.model.params.transpose()
pvalues = self.model.pvalues.transpose()
conf_int = self.model.conf_int()
# Organize into a DataFrame
summary = pd.DataFrame()
# for cluster in range(2, self.cluster_size + 1):
# cluster_params = params.iloc[cluster - 1].reset_index()
# cluster_params.columns = ["term", "coefficient"]
# cluster_pvals = pvalues.iloc[cluster - 1].reset_index()
# cluster_pvals.columns = ["term", "p_value"]
# # Get confidence intervals
# ci_lower = conf_int.xs(cluster - 1, level=1)[0].reset_index()
# ci_upper = conf_int.xs(cluster - 1, level=1)[1].reset_index()
# ci_lower.columns = ["term", "ci_lower"]
# ci_upper.columns = ["term", "ci_upper"]
# # Combine all info
# cluster_info = cluster_params.merge(cluster_pvals, on="term")
# cluster_info = cluster_info.merge(ci_lower, on="term")
# cluster_info = cluster_info.merge(ci_upper, on="term")
# # Add cluster identifier
# cluster_info["cluster"] = cluster
# # Add significance stars
# cluster_info["significance"] = ""
# cluster_info.loc[cluster_info["p_value"] < 0.05, "significance"] = "*"
# cluster_info.loc[cluster_info["p_value"] < 0.01, "significance"] = "**"
# cluster_info.loc[cluster_info["p_value"] < 0.001, "significance"] = "***"
# # Append to summary
# summary = pd.concat([summary, cluster_info])
# Add model metrics
print("Model Information:")
print(f"AIC: {self.model.aic:.2f}")
print(f"BIC: {self.model.bic:.2f}")
print(f"Log-Likelihood: {self.model.llf:.2f}")
print(f"Pseudo R-squared: {self.model.prsquared:.4f}")
# return summary.sort_values(["cluster", "term"]).reset_index(drop=True)
if __name__ == "__main__":
    # Demo: fit the ALR model on synthetic data, select covariates by AIC,
    # then simulate future states.
    from datetime import datetime, timedelta

    import numpy as np
    import pandas as pd
    import xarray as xr

    # Reproducible sample data
    np.random.seed(42)

    # Daily time axis starting 2000-01-01
    start_date = datetime(2000, 1, 1)
    days = 1000
    time_array = np.array([start_date + timedelta(days=d) for d in range(days)])

    # Random weather states in 1..6
    bmus = np.random.randint(1, 7, size=days)

    # Overwrite winter/fall days with season-biased state distributions
    for idx, stamp in enumerate(time_array):
        doy = stamp.timetuple().tm_yday
        if doy < 100:  # Winter
            bmus[idx] = np.random.choice([1, 2, 3], p=[0.6, 0.3, 0.1])
        elif doy > 250:  # Fall
            bmus[idx] = np.random.choice([4, 5, 6], p=[0.5, 0.3, 0.2])

    # Synthetic covariates: oscillation, linear trend, pure noise
    cov1 = np.sin(np.linspace(0, 10 * np.pi, days)) + np.random.normal(
        0, 0.2, days
    )
    cov2 = np.linspace(-1, 1, days) + np.random.normal(0, 0.1, days)
    cov3 = np.random.normal(0, 1, days)

    covariates = xr.Dataset(
        data_vars={
            "cov_values": (("time", "cov_names"), np.column_stack([cov1, cov2, cov3])),
        },
        coords={"time": time_array, "cov_names": ["oscillation", "trend", "noise"]},
    )

    alr_model = AutoLogisticRegression(cluster_size=6)

    # AIC-driven covariate selection (leaves the best model fitted)
    best_covs, best_aic = alr_model.select_covariates_by_aic(
        bmus=bmus,
        time=time_array,
        covariates=covariates,
        base_config={
            "mk_order": 1,
            "use_constant": True,
            "use_long_term": True,
            "seasonality": (True, [1, 2]),
            "cov_seasonality": True,
        },
    )

    summary = alr_model.get_summary()
    print("\nModel Parameter Summary:")
    print(summary)

    # Future period for simulation
    future_days = 100
    future_time = np.array(
        [start_date + timedelta(days=days + d) for d in range(future_days)]
    )

    # Future covariates continuing the historical patterns
    future_cov1 = np.sin(
        np.linspace(10 * np.pi, 11 * np.pi, future_days)
    ) + np.random.normal(0, 0.2, future_days)
    future_cov2 = np.linspace(1, 1.2, future_days) + np.random.normal(
        0, 0.1, future_days
    )
    future_cov3 = np.random.normal(0, 1, future_days)

    # Keep only the columns chosen by the selection step, in selection order
    all_future = np.column_stack([future_cov1, future_cov2, future_cov3])
    cov_indices = [list(covariates.cov_names.values).index(c) for c in best_covs]
    future_covariates = xr.Dataset(
        data_vars={
            "cov_values": (("time", "cov_names"), all_future[:, cov_indices]),
        },
        coords={"time": future_time, "cov_names": best_covs},
    )

    # Run five stochastic simulations over the future period
    simulations = alr_model.simulate(
        time_sim=future_time, num_sims=5, covariates_sim=future_covariates
    )
    print("\nSimulation Results (first 10 time steps):")
    print(simulations[:10, :])