Source code for design_research_analysis.stats

"""Statistical analysis utilities for unified design-research workflows."""

from __future__ import annotations

import math
from collections import defaultdict
from collections.abc import Mapping, Sequence
from dataclasses import dataclass, field
from itertools import combinations
from typing import TYPE_CHECKING, Any, cast

import numpy as np

from ._comparison import ComparableResultMixin
from .table import UnifiedTableConfig, coerce_unified_table

if TYPE_CHECKING:
    import pandas as pd

_STATS_IMPORT_ERROR = (
    "This statistical method requires optional dependencies. "
    "Install with `pip install design-research-analysis[stats]`."
)
_POWER_IMPORT_ERROR = (
    "Power analysis requires optional dependencies. "
    "Install with `pip install design-research-analysis[stats]`."
)
_SUPPORTED_TESTS = {"one_sample_t", "paired_t", "two_sample_t"}
_SUPPORTED_ALTERNATIVES = {"two-sided", "larger", "smaller"}
_SUPPORTED_PERMUTATION_ALTERNATIVES = {"two-sided", "greater", "less"}
_ANALYSIS_TABLE_CONFIG = UnifiedTableConfig(
    required_columns=(),
    recommended_columns=(),
    optional_columns=(),
    parse_timestamps=False,
    sort_by_timestamp=False,
)
_DEFAULT_EXACT_PERMUTATION_THRESHOLD = 250_000
_DEFAULT_SAMPLED_PERMUTATIONS = 20_000


@dataclass(slots=True)
class GroupComparisonResult(ComparableResultMixin):
    """Result container for group-comparison analyses."""

    method: str
    statistic: float
    p_value: float
    effect_size: float
    group_means: dict[str, float]
    group_sizes: dict[str, int]
    config: dict[str, Any] = field(default_factory=dict)

    def to_dict(self) -> dict[str, Any]:
        """Convert to a JSON-serializable dictionary."""
        return {
            "method": self.method,
            "statistic": float(self.statistic),
            "p_value": float(self.p_value),
            "effect_size": float(self.effect_size),
            "group_means": {k: float(v) for k, v in self.group_means.items()},
            "group_sizes": {k: int(v) for k, v in self.group_sizes.items()},
            "config": dict(self.config),
        }

    def _comparison_metric(self) -> str:
        return "group_summary"

    def _comparison_vectors(
        self,
        other: GroupComparisonResult,
    ) -> tuple[np.ndarray, np.ndarray, dict[str, Any]]:
        labels = sorted(set(self.group_means) | set(other.group_means))
        left_means = np.asarray(
            [self.group_means.get(label, 0.0) for label in labels],
            dtype=float,
        )
        right_means = np.asarray(
            [other.group_means.get(label, 0.0) for label in labels],
            dtype=float,
        )
        left_sizes = np.asarray(
            [self.group_sizes.get(label, 0) for label in labels],
            dtype=float,
        )
        right_sizes = np.asarray(
            [other.group_sizes.get(label, 0) for label in labels],
            dtype=float,
        )
        left_vector = np.concatenate(
            [left_means, left_sizes, np.asarray([self.statistic, self.effect_size], dtype=float)]
        )
        right_vector = np.concatenate(
            [
                right_means,
                right_sizes,
                np.asarray([other.statistic, other.effect_size], dtype=float),
            ]
        )
        return (
            left_vector,
            right_vector,
            {
                "group_labels": labels,
                "methods": [self.method, other.method],
            },
        )


@dataclass(slots=True)
class RegressionResult(ComparableResultMixin):
    """Result container for ordinary least squares regression."""

    coefficients: dict[str, float]
    intercept: float
    r2: float
    mse: float
    n_samples: int
    n_features: int
    config: dict[str, Any] = field(default_factory=dict)

    def to_dict(self) -> dict[str, Any]:
        """Convert to a JSON-serializable dictionary."""
        return {
            "coefficients": {k: float(v) for k, v in self.coefficients.items()},
            "intercept": float(self.intercept),
            "r2": float(self.r2),
            "mse": float(self.mse),
            "n_samples": int(self.n_samples),
            "n_features": int(self.n_features),
            "config": dict(self.config),
        }

    def _comparison_metric(self) -> str:
        return "regression_profile"

    def _comparison_vectors(
        self,
        other: RegressionResult,
    ) -> tuple[np.ndarray, np.ndarray, dict[str, Any]]:
        names = sorted(set(self.coefficients) | set(other.coefficients))
        left_coeffs = np.asarray(
            [self.coefficients.get(name, 0.0) for name in names],
            dtype=float,
        )
        right_coeffs = np.asarray(
            [other.coefficients.get(name, 0.0) for name in names],
            dtype=float,
        )
        left_vector = np.concatenate(
            [
                np.asarray([self.intercept], dtype=float),
                left_coeffs,
                np.asarray([self.r2, self.mse]),
            ]
        )
        right_vector = np.concatenate(
            [
                np.asarray([other.intercept], dtype=float),
                right_coeffs,
                np.asarray([other.r2, other.mse]),
            ]
        )
        return left_vector, right_vector, {"feature_names": names}


@dataclass(slots=True)
class MixedEffectsResult(ComparableResultMixin):
    """Result container for mixed-effects model fitting."""

    success: bool
    backend: str
    formula: str
    group_column: str
    params: dict[str, float]
    aic: float | None
    bic: float | None
    log_likelihood: float | None
    message: str = ""
    config: dict[str, Any] = field(default_factory=dict)

    def to_dict(self) -> dict[str, Any]:
        """Convert to a JSON-serializable dictionary."""
        return {
            "success": bool(self.success),
            "backend": self.backend,
            "formula": self.formula,
            "group_column": self.group_column,
            "params": {k: float(v) for k, v in self.params.items()},
            "aic": None if self.aic is None else float(self.aic),
            "bic": None if self.bic is None else float(self.bic),
            "log_likelihood": None if self.log_likelihood is None else float(self.log_likelihood),
            "message": self.message,
            "config": dict(self.config),
        }

    def _comparison_metric(self) -> str:
        return "mixed_effects_profile"

    def _comparison_vectors(
        self,
        other: MixedEffectsResult,
    ) -> tuple[np.ndarray, np.ndarray, dict[str, Any]]:
        names = sorted(set(self.params) | set(other.params))
        left_params = np.asarray([self.params.get(name, 0.0) for name in names], dtype=float)
        right_params = np.asarray([other.params.get(name, 0.0) for name in names], dtype=float)
        left_tail = np.asarray(
            [
                float(self.success),
                0.0 if self.aic is None else self.aic,
                0.0 if self.bic is None else self.bic,
                0.0 if self.log_likelihood is None else self.log_likelihood,
            ],
            dtype=float,
        )
        right_tail = np.asarray(
            [
                float(other.success),
                0.0 if other.aic is None else other.aic,
                0.0 if other.bic is None else other.bic,
                0.0 if other.log_likelihood is None else other.log_likelihood,
            ],
            dtype=float,
        )
        return (
            np.concatenate([left_params, left_tail]),
            np.concatenate([right_params, right_tail]),
            {
                "formulae": [self.formula, other.formula],
                "group_columns": [self.group_column, other.group_column],
                "parameter_names": names,
            },
        )


@dataclass(frozen=True, slots=True)
class ConditionPairComparison:
    """Structured result for one pairwise condition comparison."""

    metric: str
    left_condition: str
    right_condition: str
    mean_left: float
    mean_right: float
    n_left: int
    n_right: int
    mean_difference: float
    effect_size: float
    p_value: float
    alternative: str
    test_method: str
    permutations_evaluated: int
    total_permutations: int
    higher_condition: str | None
    significant: bool

    @property
    def pair_label(self) -> str:
        """Return a stable display label for this pair."""
        return f"{self.left_condition} vs {self.right_condition}"

    @property
    def test_name(self) -> str:
        """Return the test label expected by reporting helpers."""
        if self.test_method == "exact":
            return "exact_permutation_test"
        return "sampled_permutation_test"

    def to_dict(self) -> dict[str, Any]:
        """Convert to a JSON-serializable dictionary."""
        return {
            "metric": self.metric,
            "left_condition": self.left_condition,
            "right_condition": self.right_condition,
            "pair_label": self.pair_label,
            "mean_left": float(self.mean_left),
            "mean_right": float(self.mean_right),
            "n_left": int(self.n_left),
            "n_right": int(self.n_right),
            "mean_difference": float(self.mean_difference),
            "effect_size": float(self.effect_size),
            "p_value": float(self.p_value),
            "alternative": self.alternative,
            "test_method": self.test_method,
            "test_name": self.test_name,
            "permutations_evaluated": int(self.permutations_evaluated),
            "total_permutations": int(self.total_permutations),
            "higher_condition": self.higher_condition,
            "significant": bool(self.significant),
        }


@dataclass(frozen=True, slots=True)
class ConditionComparisonReport:
    """Structured report for a set of pairwise condition comparisons."""

    metric: str
    condition_column: str
    metric_column: str
    alternative: str
    alpha: float
    comparisons: tuple[ConditionPairComparison, ...]
    config: dict[str, Any] = field(default_factory=dict)

    def to_dict(self) -> dict[str, Any]:
        """Convert to a JSON-serializable dictionary."""
        return {
            "metric": self.metric,
            "condition_column": self.condition_column,
            "metric_column": self.metric_column,
            "alternative": self.alternative,
            "alpha": float(self.alpha),
            "comparisons": [comparison.to_dict() for comparison in self.comparisons],
            "config": dict(self.config),
        }

    def to_significance_rows(self) -> list[dict[str, Any]]:
        """Render comparison rows compatible with experiments reporting helpers."""
        rows: list[dict[str, Any]] = []
        for comparison in self.comparisons:
            rows.append(
                {
                    "test": comparison.test_name,
                    "outcome": f"{self.metric} ({comparison.pair_label})",
                    "p_value": float(comparison.p_value),
                    "effect_size": float(comparison.effect_size),
                    "mean_difference": float(comparison.mean_difference),
                    "higher_condition": comparison.higher_condition,
                    "alternative": comparison.alternative,
                    "test_method": comparison.test_method,
                    "permutations_evaluated": int(comparison.permutations_evaluated),
                    "total_permutations": int(comparison.total_permutations),
                    "significant": bool(comparison.significant),
                }
            )
        return rows

    def render_brief(self) -> str:
        """Render a concise markdown brief for the comparison set."""
        lines = ["## Condition Comparison Brief"]
        if not self.comparisons:
            lines.append("- No condition comparisons were produced.")
            return "\n".join(lines)

        for comparison in self.comparisons:
            significant = "yes" if comparison.significant else "no"
            lines.append(
                "- "
                f"`{comparison.pair_label}` on `{self.metric}`: "
                f"{comparison.left_condition} mean={comparison.mean_left:.4g}, "
                f"{comparison.right_condition} mean={comparison.mean_right:.4g}, "
                f"diff={comparison.mean_difference:+.4g}, "
                f"d={comparison.effect_size:+.4g}, "
                f"p={comparison.p_value:.4g} "
                f"({comparison.test_method}, {comparison.alternative}, significant={significant})."
            )
        return "\n".join(lines)


def _is_blank(value: Any) -> bool:
    return value is None or (isinstance(value, str) and value.strip() == "")


def _cohen_d(sample_a: np.ndarray, sample_b: np.ndarray) -> float:
    n_a = sample_a.size
    n_b = sample_b.size
    if n_a < 2 or n_b < 2:
        return 0.0

    var_a = float(np.var(sample_a, ddof=1))
    var_b = float(np.var(sample_b, ddof=1))
    pooled = (((n_a - 1) * var_a) + ((n_b - 1) * var_b)) / float(n_a + n_b - 2)
    if pooled <= 0.0:
        return 0.0
    return float((np.mean(sample_a) - np.mean(sample_b)) / np.sqrt(pooled))


def _eta_squared(groups: list[np.ndarray]) -> float:
    all_values = np.concatenate(groups)
    grand_mean = float(np.mean(all_values))
    ss_between = 0.0
    ss_total = 0.0
    for group in groups:
        group_mean = float(np.mean(group))
        ss_between += float(group.size) * (group_mean - grand_mean) ** 2
        ss_total += float(np.sum((group - grand_mean) ** 2))
    if ss_total == 0.0:
        return 0.0
    return float(ss_between / ss_total)


def _as_array(values: Any, name: str) -> np.ndarray:
    arr = np.asarray(values, dtype=float).reshape(-1)
    if arr.size == 0:
        raise ValueError(f"{name} must contain at least one value.")
    return arr


def _calc_stat(x: np.ndarray, y: np.ndarray | None, stat: str) -> float:
    if stat == "mean":
        return float(np.mean(x))
    if stat == "median":
        return float(np.median(x))
    if y is None:
        raise ValueError(f"stat '{stat}' requires y values.")
    if stat == "diff_means":
        return float(np.mean(x) - np.mean(y))
    if stat == "diff_medians":
        return float(np.median(x) - np.median(y))
    raise ValueError("stat must be one of: mean, median, diff_means, diff_medians")


def _render_ci_text(estimate: float, ci_low: float, ci_high: float, ci: float, stat: str) -> str:
    return (
        f"The bootstrap {stat} estimate is {estimate:.4g}. "
        f"A {ci * 100:.1f}% interval spans [{ci_low:.4g}, {ci_high:.4g}], "
        "which describes uncertainty under the resampling assumptions."
    )


def _render_permutation_text(p_value: float, stat_name: str, alternative: str) -> str:
    return (
        f"The permutation test for {stat_name} produced p={p_value:.4g} ({alternative}). "
        "Interpret this as evidence against the null random-label model, not as proof of causality."
    )


def _render_np_test_text(
    test_name: str,
    p_value: float,
    alpha: float,
    effect_size: float | None,
) -> str:
    decision = "below" if p_value < alpha else "above"
    effect_txt = ""
    if effect_size is not None:
        effect_txt = f" Effect size estimate is {effect_size:.4g}."
    return (
        f"{test_name} returned p={p_value:.4g}, which is {decision} alpha={alpha:.3g}."
        f"{effect_txt} Use this with distribution checks and study context."
    )


def _coerce_analysis_rows(data: Any, *, table_name: str) -> list[dict[str, Any]]:
    try:
        return coerce_unified_table(data, config=_ANALYSIS_TABLE_CONFIG)
    except ValueError as exc:
        raise ValueError(f"Failed to coerce {table_name}: {exc}") from exc


def _unique_row_map(
    rows: Sequence[Mapping[str, Any]],
    *,
    key_column: str,
    table_name: str,
) -> dict[str, dict[str, Any]]:
    resolved: dict[str, dict[str, Any]] = {}
    for index, row in enumerate(rows):
        raw_key = row.get(key_column)
        if _is_blank(raw_key):
            raise ValueError(f"{table_name} row {index} is missing '{key_column}'.")
        key = str(raw_key)
        if key in resolved:
            raise ValueError(f"{table_name} contains duplicate '{key_column}' value {key!r}.")
        resolved[key] = dict(row)
    return resolved


def _collect_rows_by_run_id(
    rows: Sequence[Mapping[str, Any]],
    *,
    run_id_column: str,
    table_name: str,
) -> dict[str, list[dict[str, Any]]]:
    grouped: dict[str, list[dict[str, Any]]] = defaultdict(list)
    for index, row in enumerate(rows):
        raw_run_id = row.get(run_id_column)
        if _is_blank(raw_run_id):
            raise ValueError(f"{table_name} row {index} is missing '{run_id_column}'.")
        grouped[str(raw_run_id)].append(dict(row))
    return dict(grouped)


def _resolve_metric_label(
    rows: Sequence[Mapping[str, Any]],
    *,
    metric_name: str | None,
    metric_label_column: str,
    metric_column: str,
) -> str:
    if metric_name is not None and metric_name.strip():
        return metric_name

    labels = {
        str(value)
        for row in rows
        if metric_label_column in row and not _is_blank(value := row.get(metric_label_column))
    }
    if len(labels) == 1:
        return next(iter(labels))
    if len(labels) > 1:
        raise ValueError(
            "Joined table contains multiple metric labels. "
            "Provide metric_name explicitly or pre-filter the table."
        )
    return metric_column


def _resolve_condition_pairs(
    grouped_values: Mapping[str, Sequence[float]],
    *,
    condition_pairs: Sequence[tuple[str, str]] | None,
) -> list[tuple[str, str]]:
    known_conditions = sorted(grouped_values)
    if condition_pairs is None:
        return list(combinations(known_conditions, 2))

    resolved_pairs: list[tuple[str, str]] = []
    seen_pairs: set[tuple[str, str]] = set()
    for index, pair in enumerate(condition_pairs):
        if len(pair) != 2:
            raise ValueError(f"condition_pairs[{index}] must contain exactly two condition labels.")
        left, right = str(pair[0]), str(pair[1])
        if left == right:
            raise ValueError(f"condition_pairs[{index}] compares {left!r} against itself.")
        if left not in grouped_values or right not in grouped_values:
            raise ValueError(
                f"condition_pairs[{index}] references unknown conditions: {(left, right)!r}."
            )
        normalized_pair = (left, right)
        if normalized_pair in seen_pairs:
            raise ValueError(f"condition_pairs[{index}] duplicates pair {normalized_pair!r}.")
        seen_pairs.add(normalized_pair)
        resolved_pairs.append(normalized_pair)
    return resolved_pairs


def _exact_permutation_summary(
    x_arr: np.ndarray,
    y_arr: np.ndarray,
    *,
    alternative: str,
) -> dict[str, Any]:
    pooled = np.concatenate([x_arr, y_arr])
    observed = float(np.mean(x_arr) - np.mean(y_arr))
    n_x = int(x_arr.size)
    n_total = int(pooled.size)
    total_permutations = math.comb(n_total, n_x)
    pooled_sum = float(np.sum(pooled))
    exceedances = 0

    for left_indices in combinations(range(n_total), n_x):
        left_sum = float(sum(float(pooled[index]) for index in left_indices))
        right_sum = pooled_sum - left_sum
        stat = (left_sum / n_x) - (right_sum / (n_total - n_x))
        if alternative == "two-sided":
            is_extreme = abs(stat) >= abs(observed) - 1e-12
        elif alternative == "greater":
            is_extreme = stat >= observed - 1e-12
        else:
            is_extreme = stat <= observed + 1e-12
        if is_extreme:
            exceedances += 1

    return {
        "p_value": float(exceedances / total_permutations),
        "test_method": "exact",
        "permutations_evaluated": int(total_permutations),
        "total_permutations": int(total_permutations),
    }


def _pairwise_permutation_summary(
    x_arr: np.ndarray,
    y_arr: np.ndarray,
    *,
    alternative: str,
    exact_threshold: int,
    n_permutations: int,
    seed: int,
) -> dict[str, Any]:
    total_permutations = math.comb(int(x_arr.size + y_arr.size), int(x_arr.size))
    if total_permutations <= exact_threshold:
        return _exact_permutation_summary(x_arr, y_arr, alternative=alternative)

    sampled = permutation_test(
        x_arr,
        y_arr,
        n_permutations=n_permutations,
        alternative=alternative,
        seed=seed,
    )
    return {
        "p_value": float(sampled["p_value"]),
        "test_method": "sampled",
        "permutations_evaluated": int(n_permutations),
        "total_permutations": int(total_permutations),
    }


[docs] def compare_groups( values: Sequence[float] | None = None, groups: Sequence[Any] | None = None, *, data: Sequence[Mapping[str, Any]] | None = None, value_column: str = "value", group_column: str = "group", method: str = "auto", ) -> GroupComparisonResult: """Compare outcomes across groups using t-test, ANOVA, or Kruskal-Wallis.""" if data is not None: rows = coerce_unified_table(data) resolved_values: list[float] = [] resolved_groups: list[str] = [] for index, row in enumerate(rows): raw_value = row.get(value_column) raw_group = row.get(group_column) if _is_blank(raw_value) or _is_blank(raw_group): raise ValueError(f"Row {index} is missing '{value_column}' or '{group_column}'.") resolved_values.append(float(cast(float | int | str, raw_value))) resolved_groups.append(str(raw_group)) values = resolved_values groups = resolved_groups if values is None or groups is None: raise ValueError("Provide either values/groups or table data with value/group columns.") if len(values) != len(groups): raise ValueError("values and groups must have the same length.") if len(values) == 0: raise ValueError("values must not be empty.") grouped_values: dict[str, list[float]] = defaultdict(list) for value, group in zip(values, groups, strict=True): grouped_values[str(group)].append(float(value)) if len(grouped_values) < 2: raise ValueError("At least two groups are required for comparison.") arrays = {group: np.asarray(vals, dtype=float) for group, vals in grouped_values.items()} group_means = {group: float(np.mean(arr)) for group, arr in arrays.items()} group_sizes = {group: int(arr.size) for group, arr in arrays.items()} try: from scipy import stats except ImportError as exc: raise ImportError(_STATS_IMPORT_ERROR) from exc ordered_groups = sorted(arrays) samples = [arrays[group] for group in ordered_groups] resolved_method = method.lower() if resolved_method == "auto": resolved_method = "ttest" if len(samples) == 2 else "anova" if resolved_method == "ttest": if len(samples) != 2: raise ValueError("ttest requires exactly two groups.") statistic, p_value = stats.ttest_ind(samples[0], samples[1], equal_var=False) effect_size = _cohen_d(samples[0], samples[1]) elif resolved_method == "anova": statistic, p_value = stats.f_oneway(*samples) effect_size = _eta_squared(samples) elif resolved_method == "kruskal": statistic, p_value = stats.kruskal(*samples) effect_size = _eta_squared(samples) else: raise ValueError("Unsupported method. Valid options: auto, ttest, anova, kruskal.") return GroupComparisonResult( method=resolved_method, statistic=float(statistic), p_value=float(p_value), effect_size=float(effect_size), group_means=group_means, group_sizes=group_sizes, config={ "n_groups": len(samples), "n_samples": len(values), "value_column": value_column, "group_column": group_column, }, )
[docs] def fit_regression( X: Sequence[Sequence[float]] | np.ndarray, y: Sequence[float] | np.ndarray, *, feature_names: Sequence[str] | None = None, add_intercept: bool = True, ) -> RegressionResult: """Fit an ordinary least squares regression model with ``numpy``.""" X_arr = np.asarray(X, dtype=float) y_arr = np.asarray(y, dtype=float) if X_arr.ndim != 2: raise ValueError("X must be a 2D matrix.") if y_arr.ndim != 1: raise ValueError("y must be a 1D vector.") if X_arr.shape[0] != y_arr.shape[0]: raise ValueError("X and y must have the same number of rows.") if X_arr.shape[0] == 0: raise ValueError("X and y must not be empty.") design = X_arr intercept = 0.0 if add_intercept: ones = np.ones((X_arr.shape[0], 1), dtype=float) design = np.hstack([ones, X_arr]) coefficients, *_ = np.linalg.lstsq(design, y_arr, rcond=None) predictions = design @ coefficients residuals = y_arr - predictions ss_res = float(np.sum(residuals**2)) ss_tot = float(np.sum((y_arr - np.mean(y_arr)) ** 2)) r2 = 0.0 if ss_tot == 0.0 else 1.0 - (ss_res / ss_tot) mse = float(np.mean(residuals**2)) if add_intercept: intercept = float(coefficients[0]) weight_vector = coefficients[1:] else: weight_vector = coefficients if feature_names is None: feature_names = [f"x{i}" for i in range(X_arr.shape[1])] if len(feature_names) != X_arr.shape[1]: raise ValueError("feature_names length must match number of features in X.") weights = { str(name): float(value) for name, value in zip(feature_names, weight_vector, strict=True) } return RegressionResult( coefficients=weights, intercept=intercept, r2=float(r2), mse=float(mse), n_samples=int(X_arr.shape[0]), n_features=int(X_arr.shape[1]), config={"add_intercept": bool(add_intercept)}, )
[docs] def fit_mixed_effects( data: Sequence[Mapping[str, Any]], *, formula: str, group_column: str, backend: str = "statsmodels", reml: bool = True, max_iter: int = 200, ) -> MixedEffectsResult: """Fit a mixed-effects model using ``statsmodels``.""" if backend != "statsmodels": raise ValueError("Unsupported backend. Valid options: statsmodels.") rows = coerce_unified_table(data) if not rows: raise ValueError("Mixed-effects fitting requires at least one row.") try: import pandas as pd import statsmodels.formula.api as smf except ImportError as exc: raise ImportError(_STATS_IMPORT_ERROR) from exc frame = pd.DataFrame(rows) if group_column not in frame.columns: raise ValueError(f"group_column '{group_column}' was not found in the table.") model = smf.mixedlm(formula=formula, data=frame, groups=frame[group_column]) try: fitted = model.fit(reml=reml, maxiter=max_iter, method="lbfgs") except Exception as exc: # pragma: no cover - backend-dependent failure modes return MixedEffectsResult( success=False, backend="statsmodels", formula=formula, group_column=group_column, params={}, aic=None, bic=None, log_likelihood=None, message=str(exc), config={"reml": bool(reml), "max_iter": int(max_iter)}, ) params = {str(name): float(value) for name, value in fitted.params.items()} return MixedEffectsResult( success=True, backend="statsmodels", formula=formula, group_column=group_column, params=params, aic=float(fitted.aic) if fitted.aic is not None else None, bic=float(fitted.bic) if fitted.bic is not None else None, log_likelihood=float(fitted.llf) if fitted.llf is not None else None, message="", config={"reml": bool(reml), "max_iter": int(max_iter)}, )
[docs] def bootstrap_ci( x: Any, *, stat: str = "mean", y: Any | None = None, n_resamples: int = 10000, ci: float = 0.95, method: str = "percentile", seed: int = 0, ) -> dict[str, Any]: """Estimate bootstrap confidence intervals for one- and two-sample statistics.""" if n_resamples <= 0: raise ValueError("n_resamples must be positive.") if not (0.0 < ci < 1.0): raise ValueError("ci must be in (0, 1).") x_arr = _as_array(x, "x") y_arr = _as_array(y, "y") if y is not None else None if method == "bca": try: from scipy import stats as sp_stats except ImportError as exc: raise ImportError(_STATS_IMPORT_ERROR) from exc alpha = 1.0 - ci if y_arr is None: stat_fun: Any if stat == "mean": stat_fun = np.mean elif stat == "median": stat_fun = np.median else: raise ValueError(f"stat '{stat}' requires y values.") bres = sp_stats.bootstrap( (x_arr,), stat_fun, confidence_level=ci, n_resamples=n_resamples, method="BCa", random_state=seed, ) ci_low = float(bres.confidence_interval.low) ci_high = float(bres.confidence_interval.high) estimate = _calc_stat(x_arr, y_arr, stat) rng = np.random.default_rng(seed) samples = np.empty(n_resamples, dtype=float) n_x = x_arr.size for idx in range(n_resamples): boot_x = x_arr[rng.integers(0, n_x, size=n_x)] samples[idx] = _calc_stat(boot_x, None, stat) else: if stat not in {"diff_means", "diff_medians"}: raise ValueError(f"stat '{stat}' is not valid for two-sample BCa.") rng = np.random.default_rng(seed) samples = np.empty(n_resamples, dtype=float) n_x = x_arr.size n_y = y_arr.size for idx in range(n_resamples): boot_x = x_arr[rng.integers(0, n_x, size=n_x)] boot_y = y_arr[rng.integers(0, n_y, size=n_y)] samples[idx] = _calc_stat(boot_x, boot_y, stat) ci_low = float(np.quantile(samples, alpha / 2.0)) ci_high = float(np.quantile(samples, 1.0 - alpha / 2.0)) estimate = _calc_stat(x_arr, y_arr, stat) distribution_summary = { "n": int(samples.size), "std": float(np.std(samples, ddof=1)), "skew": float( ((samples - samples.mean()) ** 3).mean() / (np.std(samples) ** 3 + 1e-12) ), } return { "estimate": estimate, "ci_low": ci_low, "ci_high": ci_high, "distribution_summary": distribution_summary, "method_used": method, "interpretation": _render_ci_text( estimate=estimate, ci_low=ci_low, ci_high=ci_high, ci=ci, stat=stat, ), } if method != "percentile": raise ValueError("method must be one of: percentile, bca") rng = np.random.default_rng(seed) samples = np.empty(n_resamples, dtype=float) n_x = x_arr.size n_y = y_arr.size if y_arr is not None else 0 for idx in range(n_resamples): boot_x = x_arr[rng.integers(0, n_x, size=n_x)] boot_y_opt = y_arr[rng.integers(0, n_y, size=n_y)] if y_arr is not None else None samples[idx] = _calc_stat(boot_x, boot_y_opt, stat) alpha = 1.0 - ci ci_low = float(np.quantile(samples, alpha / 2.0)) ci_high = float(np.quantile(samples, 1.0 - alpha / 2.0)) estimate = _calc_stat(x_arr, y_arr, stat) distribution_summary = { "n": int(samples.size), "std": float(np.std(samples, ddof=1)), "skew": float(((samples - samples.mean()) ** 3).mean() / (np.std(samples) ** 3 + 1e-12)), } return { "estimate": estimate, "ci_low": ci_low, "ci_high": ci_high, "distribution_summary": distribution_summary, "method_used": method, "interpretation": _render_ci_text( estimate=estimate, ci_low=ci_low, ci_high=ci_high, ci=ci, stat=stat, ), }
[docs] def permutation_test( x: Any, y: Any, *, stat: str = "diff_means", n_permutations: int = 20000, alternative: str = "two-sided", seed: int = 0, ) -> dict[str, Any]: """Run a two-sample permutation test.""" if n_permutations <= 0: raise ValueError("n_permutations must be positive.") if alternative not in _SUPPORTED_PERMUTATION_ALTERNATIVES: raise ValueError("alternative must be one of: two-sided, greater, less") x_arr = _as_array(x, "x") y_arr = _as_array(y, "y") observed = _calc_stat(x_arr, y_arr, stat) rng = np.random.default_rng(seed) pooled = np.concatenate([x_arr, y_arr]) n_x = x_arr.size perm_stats = np.empty(n_permutations, dtype=float) for idx in range(n_permutations): perm = rng.permutation(pooled) perm_stats[idx] = _calc_stat(perm[:n_x], perm[n_x:], stat) if alternative == "two-sided": p_value = float((np.abs(perm_stats) >= abs(observed)).mean()) elif alternative == "greater": p_value = float((perm_stats >= observed).mean()) else: p_value = float((perm_stats <= observed).mean()) return { "p_value": p_value, "observed_stat": float(observed), "stat": stat, "alternative": alternative, "interpretation": _render_permutation_text( p_value=p_value, stat_name=stat, alternative=alternative, ), }
[docs] def build_condition_metric_table( runs: Any, *, metric: str, condition_column: str = "condition", evaluations: Any | None = None, conditions: Any | None = None, run_id_column: str = "run_id", condition_id_column: str = "condition_id", evaluation_metric_column: str = "metric_name", evaluation_value_column: str = "metric_value", ) -> list[dict[str, Any]]: """Build a normalized run-level condition/metric table from experiment exports.""" run_rows = _coerce_analysis_rows(runs, table_name="runs table") if not run_rows: raise ValueError("runs table must contain at least one row.") condition_rows = ( _coerce_analysis_rows(conditions, table_name="conditions table") if conditions is not None else [] ) evaluation_rows = ( _coerce_analysis_rows(evaluations, table_name="evaluations table") if evaluations is not None else [] ) condition_lookup = ( _unique_row_map( condition_rows, key_column=condition_id_column, table_name="conditions table", ) if condition_rows else {} ) evaluation_lookup = ( _collect_rows_by_run_id( evaluation_rows, run_id_column=run_id_column, table_name="evaluations table", ) if evaluation_rows else {} ) metric_in_runs = any(metric in row for row in run_rows) condition_in_runs = any(condition_column in row for row in run_rows) normalized: list[dict[str, Any]] = [] for index, row in enumerate(run_rows): raw_run_id = row.get(run_id_column) if _is_blank(raw_run_id): raise ValueError(f"runs table row {index} is missing '{run_id_column}'.") run_id = str(raw_run_id) raw_condition_id = row.get(condition_id_column) condition_id = "" if _is_blank(raw_condition_id) else str(raw_condition_id) if condition_in_runs: raw_condition = row.get(condition_column) if _is_blank(raw_condition): raise ValueError( "runs table row " f"{index} is missing direct condition column {condition_column!r}." ) condition_value = str(raw_condition) condition_source = "runs" else: if not condition_lookup: raise ValueError( f"Condition column {condition_column!r} was not found in runs rows and no " "conditions table was provided." ) if not condition_id: raise ValueError( f"runs table row {index} is missing '{condition_id_column}' " "for conditions join." ) condition_row = condition_lookup.get(condition_id) if condition_row is None: raise ValueError( f"runs table row {index} references unknown condition_id {condition_id!r}." ) raw_condition = condition_row.get(condition_column) if _is_blank(raw_condition): raise ValueError( f"conditions row for condition_id {condition_id!r} is missing " f"{condition_column!r}." ) condition_value = str(raw_condition) condition_source = "conditions" if metric_in_runs: raw_value = row.get(metric) if _is_blank(raw_value): raise ValueError(f"runs table row {index} is missing metric column {metric!r}.") metric_value = float(cast(float | int | str, raw_value)) metric_source = "runs" else: if not evaluation_lookup: raise ValueError( f"Metric {metric!r} was not found in runs rows and no evaluations table " "was provided." ) matching_rows = [] for evaluation_row in evaluation_lookup.get(run_id, []): aggregation_level = evaluation_row.get("aggregation_level") if not _is_blank(aggregation_level) and str(aggregation_level) != "run": continue if str(evaluation_row.get(evaluation_metric_column, "")) == metric: matching_rows.append(evaluation_row) if not matching_rows: raise ValueError( f"No evaluation metric {metric!r} was found for run_id {run_id!r}." ) if len(matching_rows) > 1: raise ValueError( f"Multiple evaluation rows matched metric {metric!r} for run_id {run_id!r}." ) raw_value = matching_rows[0].get(evaluation_value_column) if _is_blank(raw_value): raise ValueError( f"Evaluation metric {metric!r} for run_id {run_id!r} is missing " f"{evaluation_value_column!r}." ) metric_value = float(cast(float | int | str, raw_value)) metric_source = "evaluations" normalized.append( { "run_id": run_id, "condition_id": condition_id, "condition": condition_value, "metric": metric, "value": metric_value, "condition_source": condition_source, "metric_source": metric_source, } ) return normalized
[docs] def compare_condition_pairs( data: Any, *, condition_column: str = "condition", metric_column: str = "value", metric_name: str | None = None, condition_pairs: Sequence[tuple[str, str]] | None = None, alternative: str = "two-sided", alpha: float = 0.05, exact_threshold: int = _DEFAULT_EXACT_PERMUTATION_THRESHOLD, n_permutations: int = _DEFAULT_SAMPLED_PERMUTATIONS, seed: int = 0, ) -> ConditionComparisonReport: """Compare all or selected condition pairs on one numeric metric.""" if alternative not in _SUPPORTED_PERMUTATION_ALTERNATIVES: raise ValueError("alternative must be one of: two-sided, greater, less") if not (0.0 < alpha < 1.0): raise ValueError("alpha must be in (0, 1).") if exact_threshold <= 0: raise ValueError("exact_threshold must be positive.") if n_permutations <= 0: raise ValueError("n_permutations must be positive.") rows = _coerce_analysis_rows(data, table_name="comparison table") if not rows: raise ValueError("comparison table must contain at least one row.") grouped_values: dict[str, list[float]] = defaultdict(list) for index, row in enumerate(rows): raw_condition = row.get(condition_column) raw_value = row.get(metric_column) if _is_blank(raw_condition): raise ValueError(f"comparison table row {index} is missing '{condition_column}'.") if _is_blank(raw_value): raise ValueError(f"comparison table row {index} is missing '{metric_column}'.") grouped_values[str(raw_condition)].append(float(cast(float | int | str, raw_value))) if len(grouped_values) < 2: raise ValueError("At least two conditions are required for pairwise comparison.") resolved_metric = _resolve_metric_label( rows, metric_name=metric_name, metric_label_column="metric", metric_column=metric_column, ) resolved_pairs = _resolve_condition_pairs( grouped_values, condition_pairs=condition_pairs, ) if not resolved_pairs: raise ValueError("No condition pairs were requested.") comparisons: list[ConditionPairComparison] = [] for pair_index, (left_condition, right_condition) in enumerate(resolved_pairs): left_arr = np.asarray(grouped_values[left_condition], dtype=float) right_arr = np.asarray(grouped_values[right_condition], dtype=float) mean_left = float(np.mean(left_arr)) mean_right = float(np.mean(right_arr)) mean_difference = mean_left - mean_right effect_size = _cohen_d(left_arr, right_arr) permutation_summary = _pairwise_permutation_summary( left_arr, right_arr, alternative=alternative, exact_threshold=exact_threshold, n_permutations=n_permutations, seed=seed + pair_index, ) if math.isclose(mean_difference, 0.0, abs_tol=1e-12): higher_condition = None elif mean_difference > 0: higher_condition = left_condition else: higher_condition = right_condition comparisons.append( ConditionPairComparison( metric=resolved_metric, left_condition=left_condition, right_condition=right_condition, mean_left=mean_left, mean_right=mean_right, n_left=int(left_arr.size), n_right=int(right_arr.size), mean_difference=mean_difference, effect_size=effect_size, p_value=float(permutation_summary["p_value"]), alternative=alternative, test_method=str(permutation_summary["test_method"]), permutations_evaluated=int(permutation_summary["permutations_evaluated"]), total_permutations=int(permutation_summary["total_permutations"]), higher_condition=higher_condition, significant=float(permutation_summary["p_value"]) < alpha, ) ) return ConditionComparisonReport( metric=resolved_metric, condition_column=condition_column, metric_column=metric_column, alternative=alternative, alpha=float(alpha), comparisons=tuple(comparisons), config={ "n_conditions": len(grouped_values), "exact_threshold": int(exact_threshold), "n_permutations": int(n_permutations), "seed": int(seed), }, )
[docs] def rank_tests_one_stop( x: Any, y: Any | None = None, groups: Sequence[Any] | None = None, *, paired: bool | None = None, kind: str | None = None, alternative: str = "two-sided", alpha: float = 0.05, ) -> dict[str, Any]: """Dispatch to a nonparametric rank test with consistent structured output.""" try: from scipy import stats as sp_stats except ImportError as exc: raise ImportError(_STATS_IMPORT_ERROR) from exc x_arr = _as_array(x, "x") if kind is None: if groups is not None: kind = "friedman" if paired else "kruskal" elif y is not None: kind = "wilcoxon" if paired else "mannwhitney" else: raise ValueError("Provide y or groups to select a rank test.") effect_size: float | None = None result: dict[str, Any] guidance = [ "Report exact sample sizes and handling of ties.", "Pair p-values with effect size and uncertainty context.", "Avoid causal claims from rank tests alone.", ] if kind == "mannwhitney": if y is None: raise ValueError("mannwhitney requires y.") y_arr = _as_array(y, "y") stat_val, p_value = sp_stats.mannwhitneyu(x_arr, y_arr, alternative=alternative) n1, n2 = x_arr.size, y_arr.size effect_size = 1.0 - 2.0 * float(stat_val) / float(n1 * n2) result = {"test": "mannwhitney", "statistic": float(stat_val), "p_value": float(p_value)} elif kind == "wilcoxon": if y is None: raise ValueError("wilcoxon requires y.") y_arr = _as_array(y, "y") stat_val, p_value = sp_stats.wilcoxon(x_arr, y_arr, alternative=alternative) n = x_arr.size max_w = n * (n + 1) / 2.0 effect_size = 1.0 - 2.0 * float(stat_val) / max_w result = {"test": "wilcoxon", "statistic": float(stat_val), "p_value": float(p_value)} elif kind == "kruskal": if groups is None: raise ValueError("kruskal requires groups as list-like samples.") arrays = [_as_array(g, f"group_{idx}") for idx, g in enumerate(groups)] stat_val, p_value = sp_stats.kruskal(*arrays) n_total = int(sum(len(g) for g in arrays)) k = len(arrays) effect_size = float((stat_val - k + 1) / (n_total - k)) if n_total > k else 0.0 result = {"test": "kruskal", "statistic": float(stat_val), "p_value": float(p_value)} elif kind == "friedman": if groups is None: raise ValueError("friedman requires groups as repeated-measures samples.") arrays = [_as_array(g, f"group_{idx}") for idx, g in enumerate(groups)] stat_val, p_value = sp_stats.friedmanchisquare(*arrays) n = len(arrays[0]) k = len(arrays) effect_size = float(stat_val / (n * (k - 1))) if n > 0 and k > 1 else 0.0 result = {"test": "friedman", "statistic": float(stat_val), "p_value": float(p_value)} else: raise ValueError("kind must be one of: mannwhitney, wilcoxon, kruskal, friedman") result["effect_size"] = effect_size result["alpha"] = alpha result["interpretation"] = _render_np_test_text( test_name=result["test"], p_value=result["p_value"], alpha=alpha, effect_size=effect_size, ) result["reporting_guidance"] = guidance return result
def _load_power_engines() -> tuple[Any, Any]: try: from statsmodels.stats.power import TTestIndPower, TTestPower except ImportError as exc: raise ImportError(_POWER_IMPORT_ERROR) from exc return TTestIndPower(), TTestPower() def _validate_common_inputs( *, test: str, alpha: float, power: float | None = None, ratio: float = 1.0, alternative: str, ) -> None: if test not in _SUPPORTED_TESTS: valid = ", ".join(sorted(_SUPPORTED_TESTS)) raise ValueError(f"test must be one of: {valid}") if not (0.0 < alpha < 1.0): raise ValueError("alpha must be in (0, 1).") if power is not None and not (0.0 < power < 1.0): raise ValueError("power must be in (0, 1).") if ratio <= 0: raise ValueError("ratio must be positive.") if alternative not in _SUPPORTED_ALTERNATIVES: valid = ", ".join(sorted(_SUPPORTED_ALTERNATIVES)) raise ValueError(f"alternative must be one of: {valid}") def _two_sample_allocation_from_n(total_n: int, ratio: float) -> tuple[int, int]: n1 = max(1, round(total_n / (1.0 + ratio))) if n1 >= total_n: n1 = total_n - 1 n2 = total_n - n1 if n2 <= 0: n2 = 1 n1 = total_n - n2 return n1, n2 def _compute_power( effect_size: float, *, n: int, test: str, alpha: float, ratio: float, alternative: str, ) -> float: ind_power, one_power = _load_power_engines() effect = abs(effect_size) if test == "two_sample_t": n1, n2 = _two_sample_allocation_from_n(n, ratio) return float( ind_power.power( effect_size=effect, nobs1=n1, alpha=alpha, ratio=(n2 / n1), alternative=alternative, ) ) return float(one_power.power(effect_size=effect, nobs=n, alpha=alpha, alternative=alternative))
[docs] def estimate_sample_size( effect_size: float, *, test: str, alpha: float = 0.05, power: float = 0.8, ratio: float = 1.0, alternative: str = "two-sided", ) -> dict[str, Any]: """Estimate total sample size for supported t-test families.""" _validate_common_inputs( test=test, alpha=alpha, power=power, ratio=ratio, alternative=alternative, ) effect = abs(effect_size) if effect == 0: raise ValueError("effect_size must be non-zero.") ind_power, one_power = _load_power_engines() assumptions = ["Effect size is interpreted as Cohen's d."] if test == "two_sample_t": nobs1 = float( ind_power.solve_power( effect_size=effect, nobs1=None, alpha=alpha, power=power, ratio=ratio, alternative=alternative, ) ) n1 = max(1, math.ceil(nobs1)) n2 = max(1, math.ceil(n1 * ratio)) recommended_n = n1 + n2 group_allocation: list[int] | None = [n1, n2] assumptions.append("Group allocation is rounded up while preserving the requested ratio.") else: nobs = float( one_power.solve_power( effect_size=effect, nobs=None, alpha=alpha, power=power, alternative=alternative, ) ) recommended_n = max(2, math.ceil(nobs)) group_allocation = None return { "test": test, "effect_size": effect, "alpha": alpha, "target_power": power, "alternative": alternative, "recommended_n": int(recommended_n), "group_allocation": group_allocation, "assumptions": assumptions, }
[docs] def power_curve( effect_sizes: Sequence[float], *, n: int, test: str, alpha: float = 0.05, ratio: float = 1.0, alternative: str = "two-sided", ) -> pd.DataFrame: """Compute achieved power over a sequence of effect sizes.""" _validate_common_inputs( test=test, alpha=alpha, ratio=ratio, alternative=alternative, ) if not effect_sizes: raise ValueError("effect_sizes must not be empty.") if n <= 1: raise ValueError("n must be greater than 1.") try: import pandas as pd except ImportError as exc: raise ImportError(_POWER_IMPORT_ERROR) from exc rows = [ { "effect_size": abs(float(effect_size)), "power": _compute_power( abs(float(effect_size)), n=n, test=test, alpha=alpha, ratio=ratio, alternative=alternative, ), } for effect_size in effect_sizes ] return pd.DataFrame(rows, columns=["effect_size", "power"])
[docs] def minimum_detectable_effect( n: int, *, test: str, alpha: float = 0.05, power: float = 0.8, ratio: float = 1.0, alternative: str = "two-sided", ) -> dict[str, Any]: """Solve for the smallest detectable standardized effect size.""" _validate_common_inputs( test=test, alpha=alpha, power=power, ratio=ratio, alternative=alternative, ) if n <= 1: raise ValueError("n must be greater than 1.") low = 1e-6 high = 5.0 if ( _compute_power(high, n=n, test=test, alpha=alpha, ratio=ratio, alternative=alternative) < power ): raise ValueError("Target power is unreachable for effect sizes up to 5.0.") for _ in range(100): mid = (low + high) / 2.0 achieved = _compute_power( mid, n=n, test=test, alpha=alpha, ratio=ratio, alternative=alternative, ) if achieved >= power: high = mid else: low = mid if abs(high - low) < 1e-4: break return { "test": test, "n": int(n), "alpha": alpha, "target_power": power, "alternative": alternative, "minimum_detectable_effect": float(high), "assumptions": ["Effect size is interpreted as Cohen's d."], }
__all__ = [ "ConditionComparisonReport", "ConditionPairComparison", "GroupComparisonResult", "MixedEffectsResult", "RegressionResult", "bootstrap_ci", "build_condition_metric_table", "compare_condition_pairs", "compare_groups", "estimate_sample_size", "fit_mixed_effects", "fit_regression", "minimum_detectable_effect", "permutation_test", "power_curve", "rank_tests_one_stop", ]