| | """Time-series diagnostics utilities. |
| | |
| | Provides summary statistics, stationarity tests, trend estimation, |
| | autocorrelation analysis, seasonal decomposition, rolling statistics, |
| | year-over-year change computation, and multi-series summaries. |
| | """ |
| |
|
| | from dataclasses import dataclass |
| | from typing import Optional |
| |
|
| | import numpy as np |
| | import pandas as pd |
| | from numpy.typing import NDArray |
| | from scipy import stats |
| | from statsmodels.tsa.stattools import adfuller, acf, pacf |
| | from statsmodels.tsa.seasonal import seasonal_decompose, DecomposeResult |
| |
|
| |
|
| | |
| | |
| | |
| |
|
@dataclass
class SummaryStats:
    """Container for univariate time-series summary statistics."""

    # --- observation counts ---
    count: int            # number of non-NaN observations
    missing_count: int    # number of NaN observations
    missing_pct: float    # missing_count as a percentage of all rows (0-100)
    # --- distribution of the values ---
    min_val: float
    max_val: float
    mean_val: float
    median_val: float
    std_val: float        # sample standard deviation (pandas default, ddof=1)
    p25: float            # 25th percentile
    p75: float            # 75th percentile
    # --- date coverage ---
    date_start: pd.Timestamp   # earliest date in the series
    date_end: pd.Timestamp     # latest date in the series
    date_span_days: int        # whole days between date_start and date_end
    # --- linear trend (scipy.stats.linregress on an integer index) ---
    trend_slope: float
    trend_pvalue: float
    # --- Augmented Dickey-Fuller stationarity test (NaN when not computable) ---
    adf_statistic: float
    adf_pvalue: float
| |
|
| |
|
| | |
| | |
| | |
| |
|
def compute_adf_test(series: pd.Series) -> tuple[float, float]:
    """Augmented Dickey-Fuller stationarity test with graceful fallbacks.

    Parameters
    ----------
    series : pd.Series
        Time-series values; NaNs are dropped before testing.

    Returns
    -------
    tuple[float, float]
        ``(adf_statistic, p_value)``, or ``(np.nan, np.nan)`` when the test
        cannot be run (too few observations, constant data, etc.).
    """
    values = series.dropna()
    if len(values) < 2:
        return np.nan, np.nan
    try:
        stat, pvalue = adfuller(values, autolag="AIC")[:2]
        return float(stat), float(pvalue)
    except Exception:
        # adfuller raises on degenerate inputs (e.g. a constant series);
        # callers expect NaNs rather than an exception.
        return np.nan, np.nan
| |
|
| |
|
def compute_trend_slope(
    df: pd.DataFrame,
    date_col: str,
    y_col: str,
) -> tuple[float, float]:
    """Estimate a linear trend via OLS on a numeric index.

    Parameters
    ----------
    df : pd.DataFrame
        Must contain *date_col* and *y_col*.
    date_col : str
        Column with datetime-like values; rows are sorted chronologically
        by this column before fitting, so the slope measures change over
        time rather than over the frame's incidental row order.
    y_col : str
        Column with numeric values.

    Returns
    -------
    tuple[float, float]
        ``(slope, p_value)`` from ``scipy.stats.linregress``.
        Returns ``(np.nan, np.nan)`` when the regression cannot be computed.
    """
    subset = df[[date_col, y_col]].dropna()
    if len(subset) < 2:
        return np.nan, np.nan
    try:
        # Sort by time before regressing: the x-axis is the 0..n-1 position,
        # so an unsorted frame would otherwise yield an arbitrary slope.
        subset = subset.sort_values(date_col, key=pd.to_datetime)
        x = np.arange(len(subset), dtype=float)
        y = subset[y_col].astype(float).values
        result = stats.linregress(x, y)
        return float(result.slope), float(result.pvalue)
    except Exception:
        return np.nan, np.nan
| |
|
| |
|
| | |
| | |
| | |
| |
|
def compute_summary_stats(
    df: pd.DataFrame,
    date_col: str,
    y_col: str,
) -> SummaryStats:
    """Compute a comprehensive set of summary statistics for a time series.

    Parameters
    ----------
    df : pd.DataFrame
        Source data.
    date_col : str
        Name of the datetime column.
    y_col : str
        Name of the numeric value column.

    Returns
    -------
    SummaryStats
        Dataclass instance containing descriptive stats, date range info,
        trend slope / p-value, and ADF test results.  Descriptive values
        are NaN when they cannot be computed (e.g. empty input); the date
        span falls back to 0 days when the date range is undefined.
    """
    series = df[y_col]
    dates = pd.to_datetime(df[date_col])

    count = int(series.notna().sum())
    missing_count = int(series.isna().sum())
    total = len(series)
    missing_pct = (missing_count / total * 100.0) if total > 0 else 0.0

    # pandas reductions return NaN on empty/all-NaN input and float()
    # passes NaN through, so these need no special-casing.
    min_val = float(series.min())
    max_val = float(series.max())
    mean_val = float(series.mean())
    median_val = float(series.median())
    std_val = float(series.std())
    p25 = float(series.quantile(0.25))
    p75 = float(series.quantile(0.75))

    date_start = dates.min()
    date_end = dates.max()
    # Guard: an empty frame (or all-NaT dates) yields NaT endpoints, and
    # converting a NaT-based span with int() fails; report 0 days instead.
    if pd.isna(date_start) or pd.isna(date_end):
        date_span_days = 0
    else:
        date_span_days = int((date_end - date_start).days)

    trend_slope, trend_pvalue = compute_trend_slope(df, date_col, y_col)
    adf_statistic, adf_pvalue = compute_adf_test(series)

    return SummaryStats(
        count=count,
        missing_count=missing_count,
        missing_pct=missing_pct,
        min_val=min_val,
        max_val=max_val,
        mean_val=mean_val,
        median_val=median_val,
        std_val=std_val,
        p25=p25,
        p75=p75,
        date_start=date_start,
        date_end=date_end,
        date_span_days=date_span_days,
        trend_slope=trend_slope,
        trend_pvalue=trend_pvalue,
        adf_statistic=adf_statistic,
        adf_pvalue=adf_pvalue,
    )
| |
|
| |
|
| | |
| | |
| | |
| |
|
def compute_acf_pacf(
    series: pd.Series,
    nlags: int = 40,
) -> tuple[NDArray, NDArray, NDArray, NDArray]:
    """Compute ACF and PACF with confidence intervals.

    Parameters
    ----------
    series : pd.Series
        The time-series values (NaNs are dropped automatically).
    nlags : int, optional
        Maximum number of lags (default 40).  Automatically reduced for
        short series: ``pacf`` only supports lags strictly below half the
        sample size, and both outputs are capped to the same lag count so
        their shapes stay aligned.

    Returns
    -------
    tuple[ndarray, ndarray, ndarray, ndarray]
        ``(acf_values, acf_confint, pacf_values, pacf_confint)``

        * ``acf_values`` -- shape ``(nlags + 1,)``
        * ``acf_confint`` -- shape ``(nlags + 1, 2)``
        * ``pacf_values`` -- shape ``(nlags + 1,)``
        * ``pacf_confint`` -- shape ``(nlags + 1, 2)``

    Raises
    ------
    ValueError
        If fewer than 4 non-NaN observations are available (the minimum
        needed for a single PACF lag).
    """
    clean = series.dropna().values.astype(float)

    # statsmodels' pacf rejects nlags at or above 50% of the sample size,
    # while acf accepts up to nobs - 1.  Cap both at the stricter pacf
    # limit so the call cannot raise and the returned arrays always have
    # matching shapes.
    max_lag = len(clean) // 2 - 1
    if max_lag < 1:
        raise ValueError(
            "Series has fewer than 4 non-NaN observations; "
            "cannot compute ACF/PACF."
        )
    nlags = min(nlags, max_lag)

    acf_values, acf_confint = acf(clean, nlags=nlags, alpha=0.05)
    pacf_values, pacf_confint = pacf(clean, nlags=nlags, alpha=0.05)

    return acf_values, acf_confint, pacf_values, pacf_confint
| |
|
| |
|
| | |
| | |
| | |
| |
|
def _infer_period(df: pd.DataFrame, date_col: str) -> int:
    """Best-effort period inference from the date column's frequency.

    Returns a sensible integer period or raises ``ValueError`` when the
    frequency cannot be determined.
    """
    freq = pd.infer_freq(pd.to_datetime(df[date_col]))
    if freq is None:
        raise ValueError(
            "Cannot infer a regular frequency from the date column. "
            "Please supply an explicit 'period' argument or resample the "
            "data to a regular frequency before calling compute_decomposition."
        )

    # Normalise e.g. "2W-SUN" -> "W": drop any leading multiplier digits
    # and any anchor suffix, keeping only the base frequency alias.
    base = freq.upper().lstrip("0123456789").split("-")[0]

    seasonal_periods: dict[str, int] = {
        "D": 365, "B": 252, "W": 52, "SM": 24,
        "BMS": 12, "BM": 12, "MS": 12, "M": 12, "ME": 12,
        "QS": 4, "Q": 4, "QE": 4, "BQ": 4,
        "AS": 1, "A": 1, "YS": 1, "Y": 1, "YE": 1,
        "H": 24, "T": 60, "MIN": 60, "S": 60,
    }

    if base in seasonal_periods:
        return seasonal_periods[base]

    raise ValueError(
        f"Unable to map inferred frequency '{freq}' to a seasonal period. "
        "Please provide an explicit 'period' argument."
    )
| |
|
| |
|
def compute_decomposition(
    df: pd.DataFrame,
    date_col: str,
    y_col: str,
    model: str = "additive",
    period: Optional[int] = None,
) -> DecomposeResult:
    """Split a series into trend, seasonal, and residual components.

    Parameters
    ----------
    df : pd.DataFrame
        Source data.
    date_col : str
        Datetime column name.
    y_col : str
        Numeric value column name.
    model : str, optional
        ``"additive"`` (default) or ``"multiplicative"``.
    period : int or None, optional
        Seasonal period; inferred from the date column's frequency when
        omitted.

    Returns
    -------
    statsmodels.tsa.seasonal.DecomposeResult

    Raises
    ------
    ValueError
        When *period* is omitted and no regular frequency can be inferred.
    """
    frame = df[[date_col, y_col]].copy()
    frame = frame.set_index(date_col).sort_index()
    frame.index = pd.to_datetime(frame.index)

    # seasonal_decompose cannot cope with NaNs: forward-fill, then
    # backward-fill to cover any gap at the start of the series.
    frame[y_col] = frame[y_col].ffill().bfill()

    if period is None:
        period = _infer_period(df, date_col)

    # Attach an explicit frequency to the index when one can be inferred;
    # asfreq may introduce new gaps, so fill those the same way.
    if frame.index.freq is None:
        guessed = pd.infer_freq(frame.index)
        if guessed is not None:
            frame = frame.asfreq(guessed)
            frame[y_col] = frame[y_col].ffill().bfill()

    return seasonal_decompose(frame[y_col], model=model, period=period)
| |
|
| |
|
| | |
| | |
| | |
| |
|
def compute_rolling_stats(
    df: pd.DataFrame,
    y_col: str,
    window: int = 12,
) -> pd.DataFrame:
    """Return a copy of *df* augmented with rolling mean/std columns.

    Parameters
    ----------
    df : pd.DataFrame
        Source data (not mutated).
    y_col : str
        Column over which rolling statistics are calculated.
    window : int, optional
        Rolling window size (default 12).

    Returns
    -------
    pd.DataFrame
        Copy of *df* with two extra columns: ``rolling_mean`` and
        ``rolling_std``.  ``min_periods=1`` means the mean is defined from
        the first row onward (the std of a single point is still NaN).
    """
    result = df.copy()
    roller = result[y_col].rolling(window=window, min_periods=1)
    result["rolling_mean"] = roller.mean()
    result["rolling_std"] = roller.std()
    return result
| |
|
| |
|
| | |
| | |
| | |
| |
|
def _offset_for_frequency(df: pd.DataFrame, date_col: str) -> pd.DateOffset:
    """Return a 1-year ``DateOffset`` appropriate to the series frequency."""
    inferred = pd.infer_freq(pd.to_datetime(df[date_col]))

    if inferred is not None:
        # Reduce e.g. "2W-SUN" to its base alias "W" before classifying.
        base = inferred.upper().lstrip("0123456789").split("-")[0]
        if base in {"D", "B", "H", "T", "MIN", "S"}:
            # Day-based and intraday data: one year == 365 calendar days.
            return pd.DateOffset(days=365)
        if base == "W":
            return pd.DateOffset(weeks=52)

    # Monthly/quarterly/annual (or undeterminable) frequencies.
    return pd.DateOffset(months=12)
| |
|
| |
|
def compute_yoy_change(
    df: pd.DataFrame,
    date_col: str,
    y_col: str,
) -> pd.DataFrame:
    """Compute year-over-year absolute and percentage change.

    The shift length (observations per year) is derived from the inferred
    frequency of the date column, falling back to 12 when no regular
    frequency can be detected.

    Parameters
    ----------
    df : pd.DataFrame
        Source data (not mutated).
    date_col : str
        Datetime column name.
    y_col : str
        Numeric value column name.

    Returns
    -------
    pd.DataFrame
        Copy of *df* sorted by *date_col* with additional columns
        ``yoy_abs_change`` and ``yoy_pct_change``.  The percentage change
        uses the absolute value of the year-ago observation as the base,
        and is NaN where that base is zero or missing.
    """
    result = df.copy().sort_values(date_col).reset_index(drop=True)
    result[date_col] = pd.to_datetime(result[date_col])

    # Observations per year, keyed by base frequency alias.
    periods_per_year: dict[str, int] = {
        "D": 365, "B": 252, "W": 52, "SM": 24,
        "BMS": 12, "BM": 12, "MS": 12, "M": 12, "ME": 12,
        "QS": 4, "Q": 4, "QE": 4, "BQ": 4,
        "AS": 1, "A": 1, "YS": 1, "Y": 1, "YE": 1,
        "H": 8760, "T": 525600, "MIN": 525600, "S": 31536000,
    }

    inferred = pd.infer_freq(result[date_col])
    if inferred is None:
        # No regular frequency detected: assume monthly data.
        lag = 12
    else:
        # Strip any leading multiplier and anchor suffix, e.g. "2W-SUN" -> "W".
        alias = inferred.upper().lstrip("0123456789").split("-")[0]
        lag = periods_per_year.get(alias, 12)

    baseline = result[y_col].shift(lag)
    result["yoy_abs_change"] = result[y_col] - baseline
    # Zero baselines would divide by zero; map them to NaN first.
    result["yoy_pct_change"] = (
        result["yoy_abs_change"] / baseline.abs().replace(0, np.nan) * 100.0
    )

    return result
| |
|
| |
|
| | |
| | |
| | |
| |
|
def compute_multi_series_summary(
    df: pd.DataFrame,
    date_col: str,
    y_cols: list[str],
) -> pd.DataFrame:
    """Summarise several value columns into a one-row-per-variable frame.

    Parameters
    ----------
    df : pd.DataFrame
        Source data.
    date_col : str
        Datetime column name (used for the trend regression).
    y_cols : list[str]
        List of numeric column names to summarise.

    Returns
    -------
    pd.DataFrame
        Columns: ``variable``, ``count``, ``mean``, ``std``, ``min``,
        ``max``, ``trend_slope``, ``adf_pvalue``.
    """

    def _summarise(col: str) -> dict:
        # One summary record per variable; trend and ADF go through the
        # shared helpers so every column is treated identically.
        values = df[col]
        slope, _ = compute_trend_slope(df, date_col, col)
        _, adf_pvalue = compute_adf_test(values)
        return {
            "variable": col,
            "count": int(values.notna().sum()),
            "mean": float(values.mean()),
            "std": float(values.std()),
            "min": float(values.min()),
            "max": float(values.max()),
            "trend_slope": slope,
            "adf_pvalue": adf_pvalue,
        }

    return pd.DataFrame([_summarise(col) for col in y_cols])
| |
|