"""Internal helpers for algebraic comparison across typed result objects."""
from __future__ import annotations
import itertools
import math
from dataclasses import dataclass, field
from typing import Any
import numpy as np
# NOTE: a "[docs]" link marker left over from the HTML source view was removed here.
@dataclass(slots=True)
class ComparisonResult:
"""Structured output for algebraic result-object comparisons."""
operation: str
left_type: str
right_type: str
metric: str
estimate: float
statistic: float | None = None
p_value: float | None = None
effect_size: float | None = None
details: dict[str, Any] = field(default_factory=dict)
interpretation: str = ""
[docs]
def to_dict(self) -> dict[str, Any]:
"""Convert the comparison output to a JSON-serializable dictionary."""
return {
"operation": self.operation,
"left_type": self.left_type,
"right_type": self.right_type,
"metric": self.metric,
"estimate": float(self.estimate),
"statistic": None if self.statistic is None else float(self.statistic),
"p_value": None if self.p_value is None else float(self.p_value),
"effect_size": None if self.effect_size is None else float(self.effect_size),
"details": dict(self.details),
"interpretation": self.interpretation,
}
def flatten_numeric_vector(values: Any, *, name: str) -> np.ndarray:
    """Coerce ``values`` to a flat float array and reject empty input.

    Args:
        values: Anything ``np.asarray(..., dtype=float)`` accepts.
        name: Label used in the error message for empty input.

    Returns:
        A 1D float array holding every element of ``values``.

    Raises:
        ValueError: If the flattened array has no elements.
    """
    flat = np.asarray(values, dtype=float).reshape(-1)
    if flat.size > 0:
        return flat
    raise ValueError(f"{name} must contain at least one numeric value.")
def cohen_d(left: Any, right: Any) -> float:
    """Compute a pooled-variance standardized mean difference (left minus right).

    Both inputs are flattened with ``flatten_numeric_vector``. The result is
    ``0.0`` when either sample has fewer than two values or when the pooled
    variance is not positive.
    """
    sample_a = flatten_numeric_vector(left, name="left")
    sample_b = flatten_numeric_vector(right, name="right")
    n_a, n_b = sample_a.size, sample_b.size
    if min(n_a, n_b) < 2:
        return 0.0

    var_a = float(np.var(sample_a, ddof=1))
    var_b = float(np.var(sample_b, ddof=1))
    # Weight each sample variance by its degrees of freedom.
    weighted_sum = ((n_a - 1) * var_a) + ((n_b - 1) * var_b)
    pooled_variance = weighted_sum / float(n_a + n_b - 2)
    if pooled_variance <= 0.0:
        return 0.0

    mean_gap = np.mean(sample_a) - np.mean(sample_b)
    return float(mean_gap / math.sqrt(pooled_variance))
def rms_delta(left: Any, right: Any) -> float:
    """Compute the root-mean-square of element-wise differences.

    Both inputs are flattened with ``flatten_numeric_vector`` first.

    Raises:
        ValueError: If the flattened inputs differ in shape.
    """
    lhs = flatten_numeric_vector(left, name="left")
    rhs = flatten_numeric_vector(right, name="right")
    if lhs.shape == rhs.shape:
        squared_gap = (lhs - rhs) ** 2
        return float(math.sqrt(float(np.mean(squared_gap))))
    raise ValueError(
        f"Numeric comparison requires matching vector sizes. Got {lhs.shape} and "
        f"{rhs.shape}."
    )
def permutation_rms_test(
    left: Any,
    right: Any,
    *,
    n_permutations: int = 2000,
    seed: int = 0,
) -> tuple[float, float]:
    """Run a seeded permutation test on the RMS difference of two vectors.

    The two flattened vectors are pooled and shuffled ``n_permutations``
    times. Each shuffle is split back into two halves of the original left
    size and scored with ``rms_delta``.

    Args:
        left: First numeric input.
        right: Second numeric input; must flatten to the same shape as ``left``.
        n_permutations: Number of shuffles to draw; must be positive.
        seed: Seed passed to ``np.random.default_rng``.

    Returns:
        ``(observed, p_value)``, where ``observed`` is the RMS difference of
        the inputs and ``p_value`` is
        ``(exceedances + 1) / (n_permutations + 1)``.

    Raises:
        ValueError: If ``n_permutations`` is not positive or the flattened
            inputs differ in shape.
    """
    if n_permutations <= 0:
        raise ValueError("n_permutations must be positive.")

    lhs = flatten_numeric_vector(left, name="left")
    rhs = flatten_numeric_vector(right, name="right")
    if lhs.shape != rhs.shape:
        raise ValueError(
            f"Numeric comparison requires matching vector sizes. Got {lhs.shape} and "
            f"{rhs.shape}."
        )

    observed = rms_delta(lhs, rhs)
    combined = np.concatenate([lhs, rhs])
    split = lhs.size
    rng = np.random.default_rng(seed)

    def _shuffled_statistic() -> float:
        # One draw from the generator per call keeps the random stream sequential.
        shuffled = rng.permutation(combined)
        return rms_delta(shuffled[:split], shuffled[split:])

    exceedances = sum(_shuffled_statistic() >= observed for _ in range(n_permutations))
    p_value = float((exceedances + 1) / (n_permutations + 1))
    return observed, p_value
def build_numeric_difference_result(
    *,
    left: Any,
    right: Any,
    left_type: str,
    right_type: str,
    metric: str,
    details: dict[str, Any] | None = None,
    seed: int = 0,
) -> ComparisonResult:
    """Assemble the standard "difference" result for two numeric vectors.

    The RMS difference and its permutation p-value come from
    ``permutation_rms_test`` (seeded with ``seed``); the effect size comes
    from ``cohen_d``. ``details`` is copied, and ``n_parameters`` and
    ``mean_absolute_difference`` are added only when the caller has not
    already supplied them.

    Returns:
        A ``ComparisonResult`` with ``operation="difference"`` whose
        ``estimate`` and ``statistic`` both hold the RMS difference.
    """
    lhs = flatten_numeric_vector(left, name="left")
    rhs = flatten_numeric_vector(right, name="right")
    rms, p_value = permutation_rms_test(lhs, rhs, seed=seed)
    effect = cohen_d(lhs, rhs)

    # Caller-provided keys take precedence over the defaults below.
    extras = dict(details or {})
    extras.setdefault("n_parameters", int(lhs.size))
    extras.setdefault("mean_absolute_difference", float(np.mean(np.abs(lhs - rhs))))

    summary = " ".join(
        [
            f"RMS {metric} difference is {rms:.4g}.",
            f"Permutation p={p_value:.4g}.",
            f"Standardized effect size d={effect:.4g}.",
        ]
    )
    return ComparisonResult(
        operation="difference",
        left_type=left_type,
        right_type=right_type,
        metric=metric,
        estimate=float(rms),
        statistic=float(rms),
        p_value=float(p_value),
        effect_size=float(effect),
        details=extras,
        interpretation=summary,
    )
def build_numeric_effect_size_result(
    *,
    left: Any,
    right: Any,
    left_type: str,
    right_type: str,
    metric: str,
    details: dict[str, Any] | None = None,
) -> ComparisonResult:
    """Assemble the standard "effect_size" result for two numeric vectors.

    The effect size comes from ``cohen_d``. ``details`` is copied, and
    ``n_parameters`` (size of the left vector), ``mean_left`` and
    ``mean_right`` are added only when the caller has not already supplied
    them.

    Returns:
        A ``ComparisonResult`` with ``operation="effect_size"`` whose
        ``estimate`` and ``effect_size`` both hold the effect size, and
        whose ``statistic`` and ``p_value`` are ``None``.
    """
    lhs = flatten_numeric_vector(left, name="left")
    rhs = flatten_numeric_vector(right, name="right")
    effect = cohen_d(lhs, rhs)

    # Caller-provided keys take precedence over the defaults below.
    extras = dict(details or {})
    defaults = (
        ("n_parameters", int(lhs.size)),
        ("mean_left", float(np.mean(lhs))),
        ("mean_right", float(np.mean(rhs))),
    )
    for key, value in defaults:
        extras.setdefault(key, value)

    summary = (
        f"Standardized {metric} effect size is d={effect:.4g}. "
        + "Positive values indicate larger average parameters on the left-hand result."
    )
    return ComparisonResult(
        operation="effect_size",
        left_type=left_type,
        right_type=right_type,
        metric=metric,
        estimate=float(effect),
        statistic=None,
        p_value=None,
        effect_size=float(effect),
        details=extras,
        interpretation=summary,
    )
def align_vector_by_labels(
    values: Any,
    source_labels: list[str],
    target_labels: list[str],
) -> np.ndarray:
    """Re-index a labelled 1D vector onto ``target_labels``.

    Target labels that do not occur in ``source_labels`` get ``0.0``. When a
    source label is repeated, the value at its last occurrence is used.

    Raises:
        ValueError: If the flattened ``values`` and ``source_labels`` differ
            in length.
    """
    flat = np.asarray(values, dtype=float).reshape(-1)
    if flat.size != len(source_labels):
        raise ValueError("Label alignment requires one value per source label.")

    position_of = {label: position for position, label in enumerate(source_labels)}
    result = np.zeros(len(target_labels), dtype=float)

    # Collect (destination, source) index pairs for labels present in both spaces.
    matches = [
        (dest, position_of[label])
        for dest, label in enumerate(target_labels)
        if label in position_of
    ]
    if matches:
        dest_idx, src_idx = (list(axis) for axis in zip(*matches))
        result[dest_idx] = flat[src_idx]
    return result
def align_square_matrix_by_labels(
    matrix: Any,
    source_labels: list[str],
    target_labels: list[str],
) -> np.ndarray:
    """Re-index a labelled square matrix onto ``target_labels`` on both axes.

    Any cell whose row or column label does not occur in ``source_labels``
    is left at ``0.0``. When a source label is repeated, its last occurrence
    is used.

    Raises:
        ValueError: If ``matrix`` is not a square 2D array, or its dimension
            differs from ``len(source_labels)``.
    """
    grid = np.asarray(matrix, dtype=float)
    if grid.ndim != 2 or grid.shape[0] != grid.shape[1]:
        raise ValueError("Square-matrix alignment requires a square 2D array.")
    if grid.shape[0] != len(source_labels):
        raise ValueError("Label alignment requires one row/column per source label.")

    position_of = {label: position for position, label in enumerate(source_labels)}
    side = len(target_labels)
    result = np.zeros((side, side), dtype=float)

    # Collect (destination, source) index pairs for labels present in both spaces.
    matches = [
        (dest, position_of[label])
        for dest, label in enumerate(target_labels)
        if label in position_of
    ]
    if matches:
        dest_idx, src_idx = (list(axis) for axis in zip(*matches))
        # Copy the whole shared sub-block in one assignment.
        result[np.ix_(dest_idx, dest_idx)] = grid[np.ix_(src_idx, src_idx)]
    return result
def permute_vector(values: Any, permutation: tuple[int, ...]) -> np.ndarray:
    """Return the flattened ``values`` with entries picked in ``permutation`` order.

    Raises:
        ValueError: If ``permutation`` and the flattened vector differ in length.
    """
    flat = np.asarray(values, dtype=float).reshape(-1)
    if flat.size == len(permutation):
        order = list(permutation)
        return flat[order]
    raise ValueError("Permutation size must match vector length.")
def permute_rows(matrix: Any, permutation: tuple[int, ...]) -> np.ndarray:
    """Return ``matrix`` with its rows picked in ``permutation`` order.

    Raises:
        ValueError: If ``matrix`` is not 2D or its row count differs from
            ``len(permutation)``.
    """
    grid = np.asarray(matrix, dtype=float)
    if grid.ndim == 2 and grid.shape[0] == len(permutation):
        row_order = list(permutation)
        return grid[row_order, :]
    raise ValueError("Permutation size must match the number of matrix rows.")
def permute_square_matrix(matrix: Any, permutation: tuple[int, ...]) -> np.ndarray:
    """Return a square matrix with rows and columns picked in ``permutation`` order.

    Raises:
        ValueError: If ``matrix`` is not a square 2D array, or its dimension
            differs from ``len(permutation)``.
    """
    grid = np.asarray(matrix, dtype=float)
    is_square = grid.ndim == 2 and grid.shape[0] == grid.shape[1]
    if not is_square:
        raise ValueError("Square-matrix permutation requires a square 2D array.")
    if grid.shape[0] != len(permutation):
        raise ValueError("Permutation size must match the square matrix dimension.")
    order = list(permutation)
    # Select rows first, then columns, using the same ordering on both axes.
    return grid[order, :][:, order]
def best_assignment(cost_matrix: Any) -> tuple[int, ...]:
    """Choose one distinct column per row of a square cost matrix.

    For up to 8 rows every permutation is scored and the first one with the
    minimum total cost is returned. For larger matrices each row, taken in
    order, greedily claims its cheapest still-unused column; that result is
    not guaranteed to be the global minimum.

    Returns:
        A tuple whose ``i``-th entry is the column assigned to row ``i``;
        empty for a 0x0 matrix.

    Raises:
        ValueError: If ``cost_matrix`` is not a square 2D array.
    """
    cost = np.asarray(cost_matrix, dtype=float)
    if cost.ndim != 2 or cost.shape[0] != cost.shape[1]:
        raise ValueError("Assignment requires a square cost matrix.")

    size = cost.shape[0]
    if size == 0:
        return ()

    if size <= 8:

        def total_cost(candidate: tuple[int, ...]) -> float:
            return float(sum(cost[row, col] for row, col in enumerate(candidate)))

        winner = min(itertools.permutations(range(size)), key=total_cost)
        return tuple(int(col) for col in winner)

    # Exhaustive search is skipped above 8 rows; fall back to a row-by-row greedy pick.
    unused = set(range(size))
    chosen: list[int] = []
    for row in cost:
        pick = min(unused, key=lambda col: float(row[col]))
        chosen.append(int(pick))
        unused.remove(pick)
    return tuple(chosen)
class ComparableResultMixin:
    """Mixin adding ``difference``/``effect`` comparisons and their operators.

    Subclasses must implement ``_comparison_vectors``. They may override
    ``_comparison_family`` and ``_comparison_metric`` to change which
    objects are comparable and how the metric is labelled.
    """

    def __sub__(self, other: Any) -> ComparisonResult:
        """Operator form of ``difference``: ``self - other``."""
        return self.difference(other)

    def __truediv__(self, other: Any) -> ComparisonResult:
        """Operator form of ``effect``: ``self / other``."""
        return self.effect(other)

    def difference(self, other: Any) -> ComparisonResult:
        """Compare against ``other`` using the "difference" operation."""
        return self._comparison_result(other, operation="difference")

    def effect(self, other: Any) -> ComparisonResult:
        """Compare against ``other`` using the "effect_size" operation."""
        return self._comparison_result(other, operation="effect_size")

    def _comparison_result(self, other: Any, *, operation: str) -> ComparisonResult:
        """Check that ``other`` shares this object's family, then build the result.

        Raises:
            TypeError: If ``other`` has no ``_comparison_family`` attribute or
                reports a different family.
        """
        resolve_family = getattr(other, "_comparison_family", None)
        incompatible = (
            resolve_family is None or resolve_family() != self._comparison_family()
        )
        if not incompatible:
            return self._build_comparison(other, operation=operation)
        raise TypeError(
            f"{type(self).__name__} can only be compared against results in the "
            f"'{self._comparison_family()}' family."
        )

    def _comparison_family(self) -> str:
        """Return the family identifier; by default the concrete class name."""
        return type(self).__name__

    def _comparison_metric(self) -> str:
        """Return the metric label recorded on results built by this object."""
        return "parameter_profile"

    def _build_comparison(self, other: Any, *, operation: str) -> ComparisonResult:
        """Extract comparison vectors and pass them to the matching builder.

        Raises:
            ValueError: If ``operation`` is neither "difference" nor
                "effect_size". The vectors are extracted before this check.
        """
        left_vector, right_vector, details = self._comparison_vectors(other)
        if operation not in ("difference", "effect_size"):
            raise ValueError(f"Unsupported comparison operation: {operation}")
        builder = (
            build_numeric_difference_result
            if operation == "difference"
            else build_numeric_effect_size_result
        )
        return builder(
            left=left_vector,
            right=right_vector,
            left_type=type(self).__name__,
            right_type=type(other).__name__,
            metric=self._comparison_metric(),
            details=details,
        )

    def _comparison_vectors(self, other: Any) -> tuple[np.ndarray, np.ndarray, dict[str, Any]]:
        """Return ``(left_vector, right_vector, details)``; subclasses must override."""
        raise NotImplementedError