Source code for design_research_problems.problems._decision

"""Decision problem container."""

from __future__ import annotations

from collections.abc import Iterator, Mapping, Sequence
from dataclasses import dataclass
from itertools import product
from math import exp, prod
from types import MappingProxyType
from typing import TYPE_CHECKING, Literal, cast

from design_research_problems._exceptions import ProblemEvaluationError
from design_research_problems.problems._assets import PackageResourceBundle
from design_research_problems.problems._computable import ComputableProblem
from design_research_problems.problems._mcp import (
    create_fastmcp_server,
    normalized_optional_text,
    register_design_brief_resource,
    to_json_value,
)
from design_research_problems.problems._metadata import ProblemMetadata

if TYPE_CHECKING:
    from mcp.server.fastmcp import FastMCP

    from design_research_problems._catalog._manifest import ProblemManifest

_DISCRETE_MARKET_SIZE = 1_600_000.0
_CHOICE_METRIC_TOP_CHOICE_SHARE = "top-choice-share"
_CHOICE_METRIC_MEAN_RATING = "mean-rating"
_CHOICE_METRIC_MEDIAN_RATING = "median-rating"
ChoiceMetric = Literal["top-choice-share", "mean-rating", "median-rating"]
_SUPPORTED_CHOICE_METRICS = frozenset(
    {
        _CHOICE_METRIC_TOP_CHOICE_SHARE,
        _CHOICE_METRIC_MEAN_RATING,
        _CHOICE_METRIC_MEDIAN_RATING,
    }
)
type DecisionCandidateKind = Literal["discrete-option", "empirical-choice"]


[docs] @dataclass(frozen=True) class DecisionVariableSpec: """One bounded engineering-side design variable.""" symbol: str """Stable symbol used in equations and references.""" label: str """Human-readable variable label.""" unit: str | None """Display unit when the source provides one.""" lower_bound: float """Inclusive lower bound for the variable.""" upper_bound: float """Inclusive upper bound for the variable.""" def __post_init__(self) -> None: """Normalize the bounds and enforce a valid interval. Raises: ValueError: If required fields are empty or the bounds are invalid. """ symbol = self.symbol.strip() if not symbol: raise ValueError("DecisionVariableSpec requires a non-empty symbol.") label = self.label.strip() if not label: raise ValueError(f"DecisionVariableSpec {symbol!r} requires a non-empty label.") unit = None if self.unit is None else self.unit.strip() or None lower_bound = float(self.lower_bound) upper_bound = float(self.upper_bound) if lower_bound > upper_bound: raise ValueError(f"DecisionVariableSpec {symbol!r} has lower_bound > upper_bound.") object.__setattr__(self, "symbol", symbol) object.__setattr__(self, "label", label) object.__setattr__(self, "unit", unit) object.__setattr__(self, "lower_bound", lower_bound) object.__setattr__(self, "upper_bound", upper_bound)
[docs] @dataclass(frozen=True) class DecisionFactor: """One discrete factor in an explicit option space.""" key: str """Stable factor key used in discrete option mappings.""" label: str """Human-readable factor label.""" unit: str | None """Display unit when the source provides one.""" levels: tuple[float, ...] """Ordered discrete levels in source order.""" part_worths: tuple[float, ...] """Utility coefficients aligned with ``levels``.""" def __post_init__(self) -> None: """Normalize and validate the factor definition. Raises: ValueError: If required fields are empty or the factor payload is malformed. """ key = self.key.strip() if not key: raise ValueError("DecisionFactor requires a non-empty key.") label = self.label.strip() if not label: raise ValueError(f"DecisionFactor {key!r} requires a non-empty label.") unit = None if self.unit is None else self.unit.strip() or None levels = tuple(float(level) for level in self.levels) if not levels: raise ValueError(f"DecisionFactor {key!r} must define at least one level.") if any(left >= right for left, right in pairwise(levels)): raise ValueError(f"DecisionFactor {key!r} levels must be strictly increasing.") part_worths = tuple(float(value) for value in self.part_worths) if part_worths and len(part_worths) != len(levels): raise ValueError(f"DecisionFactor {key!r} part_worths length must match levels length.") object.__setattr__(self, "key", key) object.__setattr__(self, "label", label) object.__setattr__(self, "unit", unit) object.__setattr__(self, "levels", levels) object.__setattr__(self, "part_worths", part_worths)
[docs] @dataclass(frozen=True) class DecisionOption: """One explicit discrete option candidate.""" values: Mapping[str, float] """Exact factor-key to level-value mapping.""" def __post_init__(self) -> None: """Freeze the numeric value mapping.""" object.__setattr__(self, "values", _freeze_numeric_mapping(self.values))
[docs] @dataclass(frozen=True) class DecisionProfile: """One observed alternative profile, such as a competitor.""" name: str """Display name for the observed profile.""" values: Mapping[str, float] """Observed factor-key to value mapping.""" def __post_init__(self) -> None: """Freeze the profile payload. Raises: ValueError: If the profile name is empty or the values mapping is malformed. """ name = self.name.strip() if not name: raise ValueError("DecisionProfile requires a non-empty name.") object.__setattr__(self, "name", name) object.__setattr__(self, "values", _freeze_numeric_mapping(self.values))
[docs] @dataclass(frozen=True) class DecisionObjectiveSpec: """Structured description of a decision objective.""" key: str """Stable identifier for the objective.""" label: str """Human-readable objective label.""" sense: str """Optimization sense such as ``maximize``.""" domain: str """Model domain such as ``discrete-option``.""" expression: str """Symbolic formula text for the objective.""" variables: tuple[str, ...] """Referenced variable or factor names.""" executable: bool """Whether the package can evaluate this objective directly.""" def __post_init__(self) -> None: """Normalize textual fields and variable references. Raises: ValueError: If required fields are empty. """ key = self.key.strip() if not key: raise ValueError("DecisionObjectiveSpec requires a non-empty key.") label = self.label.strip() if not label: raise ValueError(f"DecisionObjectiveSpec {key!r} requires a non-empty label.") sense = self.sense.strip().lower() if not sense: raise ValueError(f"DecisionObjectiveSpec {key!r} requires a non-empty sense.") domain = self.domain.strip().lower() if not domain: raise ValueError(f"DecisionObjectiveSpec {key!r} requires a non-empty domain.") expression = self.expression.strip() if not expression: raise ValueError(f"DecisionObjectiveSpec {key!r} requires a non-empty expression.") variables = _normalize_string_tuple(self.variables, context=f"DecisionObjectiveSpec {key!r} variables") object.__setattr__(self, "key", key) object.__setattr__(self, "label", label) object.__setattr__(self, "sense", sense) object.__setattr__(self, "domain", domain) object.__setattr__(self, "expression", expression) object.__setattr__(self, "variables", variables)
[docs] @dataclass(frozen=True) class DecisionConstraintSpec: """Structured description of a decision constraint.""" key: str """Stable identifier for the constraint.""" label: str """Human-readable constraint label.""" relation: str """Constraint relation such as ``<=``.""" domain: str """Model domain such as ``continuous-design``.""" expression: str """Symbolic formula text for the constraint.""" variables: tuple[str, ...] """Referenced variable names.""" executable: bool """Whether the package can evaluate this constraint directly.""" def __post_init__(self) -> None: """Normalize textual fields and variable references. Raises: ValueError: If required fields are empty. """ key = self.key.strip() if not key: raise ValueError("DecisionConstraintSpec requires a non-empty key.") label = self.label.strip() if not label: raise ValueError(f"DecisionConstraintSpec {key!r} requires a non-empty label.") relation = self.relation.strip() if not relation: raise ValueError(f"DecisionConstraintSpec {key!r} requires a non-empty relation.") domain = self.domain.strip().lower() if not domain: raise ValueError(f"DecisionConstraintSpec {key!r} requires a non-empty domain.") expression = self.expression.strip() if not expression: raise ValueError(f"DecisionConstraintSpec {key!r} requires a non-empty expression.") variables = _normalize_string_tuple(self.variables, context=f"DecisionConstraintSpec {key!r} variables") object.__setattr__(self, "key", key) object.__setattr__(self, "label", label) object.__setattr__(self, "relation", relation) object.__setattr__(self, "domain", domain) object.__setattr__(self, "expression", expression) object.__setattr__(self, "variables", variables)
[docs] @dataclass(frozen=True) class DecisionChoiceBenchmark: """One empirical categorical choice benchmark entry.""" key: str """Stable option key used for evaluation.""" label: str """Human-readable option label.""" top_choice_share: float """Tie-adjusted fraction of experts whose top score includes this choice.""" mean_rating: float """Mean 0-10 source rating for this choice.""" median_rating: float """Median 0-10 source rating for this choice.""" std_rating: float """Sample standard deviation of the source ratings.""" def __post_init__(self) -> None: """Normalize and validate the benchmark entry. Raises: ValueError: If any required field is empty or any numeric field is invalid. """ key = self.key.strip().lower() if not key: raise ValueError("DecisionChoiceBenchmark requires a non-empty key.") label = self.label.strip() if not label: raise ValueError(f"DecisionChoiceBenchmark {key!r} requires a non-empty label.") top_choice_share = float(self.top_choice_share) if not 0.0 <= top_choice_share <= 1.0: raise ValueError(f"DecisionChoiceBenchmark {key!r} top_choice_share must be in [0, 1].") std_rating = float(self.std_rating) if std_rating < 0.0: raise ValueError(f"DecisionChoiceBenchmark {key!r} std_rating must be non-negative.") object.__setattr__(self, "key", key) object.__setattr__(self, "label", label) object.__setattr__(self, "top_choice_share", top_choice_share) object.__setattr__(self, "mean_rating", float(self.mean_rating)) object.__setattr__(self, "median_rating", float(self.median_rating)) object.__setattr__(self, "std_rating", std_rating)
type DecisionCandidate = DecisionOption | Mapping[str, float] | str
[docs] @dataclass(frozen=True) class DecisionEvaluation: """Unified evaluation result for one decision candidate.""" candidate_kind: DecisionCandidateKind """Whether the result came from a discrete or empirical decision problem.""" candidate: DecisionOption | str """Canonical candidate representation used for evaluation.""" candidate_label: str """Human-readable candidate label.""" objective_value: float """Objective scalar used for ranking.""" objective_metric: str """Metric or objective identifier used to populate ``objective_value``.""" higher_is_better: bool = True """Whether larger objective values are better for ranking.""" option: DecisionOption | None = None """Normalized option that was evaluated when in discrete mode.""" utility: float | None = None """Discrete part-worth utility of the option.""" predicted_share: float | None = None """Predicted logit market share against the competitor set.""" expected_demand_units: float | None = None """Expected demand under the fixed market-size assumption.""" choice_key: str | None = None """Canonical empirical choice key when in empirical mode.""" choice_label: str | None = None """Human-readable empirical choice label when in empirical mode.""" top_choice_share: float | None = None """Tie-adjusted fraction of expert top matches.""" mean_rating: float | None = None """Mean 0-10 source rating for the choice.""" median_rating: float | None = None """Median 0-10 source rating for the choice.""" std_rating: float | None = None """Sample standard deviation of the source ratings.""" response_count: int | None = None """Number of valid respondents included in the aggregate.""" def __post_init__(self) -> None: """Normalize textual and numeric outputs. Raises: Exception: Raised when the callable encounters an invalid state. """ candidate_label = self.candidate_label.strip() if not candidate_label: raise ValueError("DecisionEvaluation requires a non-empty candidate_label.") objective_metric = self.objective_metric.strip().lower() if not objective_metric: raise ValueError("DecisionEvaluation requires a non-empty objective_metric.") if self.candidate_kind not in {"discrete-option", "empirical-choice"}: raise ValueError(f"Unsupported decision candidate kind: {self.candidate_kind!r}") object.__setattr__(self, "candidate_label", candidate_label) object.__setattr__(self, "objective_value", float(self.objective_value)) object.__setattr__(self, "objective_metric", objective_metric) object.__setattr__(self, "higher_is_better", bool(self.higher_is_better)) if self.utility is not None: object.__setattr__(self, "utility", float(self.utility)) if self.predicted_share is not None: object.__setattr__(self, "predicted_share", float(self.predicted_share)) if self.expected_demand_units is not None: object.__setattr__(self, "expected_demand_units", float(self.expected_demand_units)) if self.choice_key is not None: choice_key = self.choice_key.strip().lower() if not choice_key: raise ValueError("DecisionEvaluation choice_key must be non-empty when provided.") object.__setattr__(self, "choice_key", choice_key) if self.choice_label is not None: choice_label = self.choice_label.strip() if not choice_label: raise ValueError("DecisionEvaluation choice_label must be non-empty when provided.") object.__setattr__(self, "choice_label", choice_label) if self.top_choice_share is not None: object.__setattr__(self, "top_choice_share", float(self.top_choice_share)) if self.mean_rating is not None: object.__setattr__(self, "mean_rating", float(self.mean_rating)) if self.median_rating is not None: object.__setattr__(self, "median_rating", float(self.median_rating)) if self.std_rating is not None: object.__setattr__(self, "std_rating", float(self.std_rating)) if self.response_count is not None: response_count = int(self.response_count) if response_count < 0: raise ValueError("DecisionEvaluation response_count must be non-negative.") object.__setattr__(self, "response_count", response_count)
@dataclass(frozen=True) class _ParsedDecisionPayload: """Internal parsed representation of structured decision fields.""" decision_variable_specs: tuple[DecisionVariableSpec, ...] """Parsed engineering variable specs.""" option_factors: tuple[DecisionFactor, ...] """Parsed discrete conjoint factors.""" choice_benchmarks: tuple[DecisionChoiceBenchmark, ...] """Parsed empirical categorical choice benchmarks.""" competitor_profiles: tuple[DecisionProfile, ...] """Parsed observed competitor profiles.""" objective_specs: tuple[DecisionObjectiveSpec, ...] """Parsed objective descriptors.""" constraint_specs: tuple[DecisionConstraintSpec, ...] """Parsed constraint descriptors.""" default_choice_metric: ChoiceMetric """Default metric for empirical choice ranking.""" response_count: int """Number of valid respondents represented by the aggregates.""" @dataclass(frozen=True) class _NaturalCubicSpline: """Natural cubic spline with linear endpoint extrapolation.""" x_values: tuple[float, ...] """Ordered spline knot positions.""" y_values: tuple[float, ...] """Spline knot values aligned with ``x_values``.""" second_derivatives: tuple[float, ...] """Precomputed second derivatives at each knot.""" @classmethod def from_points(cls, x_values: Sequence[float], y_values: Sequence[float]) -> _NaturalCubicSpline: """Build one spline from ordered knot values. Args: x_values: Ordered knot positions. y_values: Knot values aligned with ``x_values``. Returns: Spline instance ready for evaluation. Raises: ValueError: If the point sequences are empty, mismatched, or not strictly increasing. """ x = tuple(float(value) for value in x_values) y = tuple(float(value) for value in y_values) if len(x) != len(y): raise ValueError("Natural cubic spline requires x and y sequences of the same length.") if not x: raise ValueError("Natural cubic spline requires at least one point.") if any(left >= right for left, right in pairwise(x)): raise ValueError("Natural cubic spline x values must be strictly increasing.") if len(x) <= 2: second_derivatives = tuple(0.0 for _ in x) return cls(x_values=x, y_values=y, second_derivatives=second_derivatives) h = [right - left for left, right in pairwise(x)] alpha = [ 6.0 * ((y[index + 1] - y[index]) / h[index] - (y[index] - y[index - 1]) / h[index - 1]) for index in range(1, len(x) - 1) ] lower = [0.0] diagonal = [1.0] upper = [0.0] rhs = [0.0] for index in range(1, len(x) - 1): lower.append(h[index - 1]) diagonal.append(2.0 * (h[index - 1] + h[index])) upper.append(h[index] if index < len(x) - 2 else 0.0) rhs.append(alpha[index - 1]) lower.append(0.0) diagonal.append(1.0) upper.append(0.0) rhs.append(0.0) second_derivatives = _solve_tridiagonal(lower, diagonal, upper, rhs) return cls(x_values=x, y_values=y, second_derivatives=second_derivatives) def evaluate(self, x_value: float) -> float: """Evaluate the spline or its boundary extension at one point. Args: x_value: Input position to evaluate. Returns: Interpolated or linearly extrapolated value. Raises: RuntimeError: If interval lookup unexpectedly fails for an in-range value. """ if len(self.x_values) == 1: return self.y_values[0] x_value = float(x_value) if x_value <= self.x_values[0]: slope = self._segment_slope(segment_index=0, at_right=False) return self.y_values[0] + slope * (x_value - self.x_values[0]) if x_value >= self.x_values[-1]: slope = self._segment_slope(segment_index=len(self.x_values) - 2, at_right=True) return self.y_values[-1] + slope * (x_value - self.x_values[-1]) for segment_index, (left, right) in enumerate(pairwise(self.x_values)): if left <= x_value <= right: h = right - left alpha = (right - x_value) / h beta = (x_value - left) / h return ( alpha * self.y_values[segment_index] + beta * self.y_values[segment_index + 1] + ( ((alpha**3) - alpha) * self.second_derivatives[segment_index] + ((beta**3) - beta) * self.second_derivatives[segment_index + 1] ) * (h**2) / 6.0 ) raise RuntimeError("Spline interval lookup failed for an in-range value.") def _segment_slope(self, segment_index: int, at_right: bool) -> float: """Return the derivative at one end of a segment. Args: segment_index: Zero-based segment index. at_right: Whether to evaluate the derivative at the right endpoint. Returns: Segment-end derivative. """ left = self.x_values[segment_index] right = self.x_values[segment_index + 1] h = right - left delta = (self.y_values[segment_index + 1] - self.y_values[segment_index]) / h left_m = self.second_derivatives[segment_index] right_m = self.second_derivatives[segment_index + 1] if at_right: return delta + h * (2.0 * right_m + left_m) / 6.0 return delta - h * (2.0 * left_m + right_m) / 6.0 def parse_structured_decision_payload(parameters: Mapping[str, object]) -> _ParsedDecisionPayload: """Parse one manifest parameter mapping into typed decision metadata. Args: parameters: Raw manifest parameter mapping. Returns: Parsed typed payload. Raises: ValueError: If any structured payload is malformed or internally inconsistent. """ decision_variable_specs = _parse_decision_variable_specs(parameters) option_factors = _parse_option_factors(parameters) choice_benchmarks = _parse_choice_benchmarks(parameters) competitor_profiles = _parse_competitor_profiles(parameters) objective_specs = _parse_objective_specs(parameters) constraint_specs = _parse_constraint_specs(parameters) default_choice_metric = _parse_default_choice_metric(parameters, has_choice_benchmarks=bool(choice_benchmarks)) response_count = _parse_response_count(parameters, has_choice_benchmarks=bool(choice_benchmarks)) variable_symbols = {spec.symbol for spec in decision_variable_specs} factor_keys = {factor.key for factor in option_factors} choice_keys = {benchmark.key for benchmark in choice_benchmarks} symbolic_names = variable_symbols | factor_keys if choice_benchmarks: symbolic_names.add("material") objective_keys = set[str]() for objective in objective_specs: if objective.key in objective_keys: raise ValueError(f"Duplicate decision objective key: {objective.key!r}") objective_keys.add(objective.key) if objective.executable: if objective.sense != "maximize": raise ValueError(f"Executable decision objective {objective.key!r} must use sense 'maximize'.") if objective.domain == "discrete-option": if not option_factors: raise ValueError(f"Executable decision objective {objective.key!r} requires option_factors.") if not competitor_profiles: raise ValueError(f"Executable decision objective {objective.key!r} requires competitor_profiles.") if any(not factor.part_worths for factor in option_factors): raise ValueError(f"Executable decision objective {objective.key!r} requires factor part_worths.") unknown = [name for name in objective.variables if name not in factor_keys] if unknown: raise ValueError( f"Executable decision objective {objective.key!r} references unknown factor keys: {unknown!r}" ) continue if objective.domain == "empirical-choice": if not choice_benchmarks: raise ValueError(f"Executable decision objective {objective.key!r} requires choice_options.") unknown = [name for name in objective.variables if name != "material"] if unknown: raise ValueError( f"Executable decision objective {objective.key!r} references unknown variables: {unknown!r}" ) continue raise ValueError( f"Executable decision objective {objective.key!r} must use domain 'discrete-option' or " f"'empirical-choice'." ) else: unknown = [name for name in objective.variables if name not in symbolic_names] if unknown: raise ValueError(f"Decision objective {objective.key!r} references unknown variables: {unknown!r}") constraint_keys = set[str]() for constraint in constraint_specs: if constraint.key in constraint_keys: raise ValueError(f"Duplicate decision constraint key: {constraint.key!r}") constraint_keys.add(constraint.key) unknown = [name for name in constraint.variables if name not in variable_symbols] if unknown: raise ValueError(f"Decision constraint {constraint.key!r} references unknown variables: {unknown!r}") if competitor_profiles and not option_factors: raise ValueError("competitor_profiles require option_factors.") ordered_factor_keys = tuple(factor.key for factor in option_factors) factor_key_set = set(ordered_factor_keys) for profile in competitor_profiles: if set(profile.values) != factor_key_set or len(profile.values) != len(ordered_factor_keys): raise ValueError(f"Decision profile {profile.name!r} must include exactly the option factor keys.") if choice_benchmarks and response_count <= 0: raise ValueError("choice_options require a positive response_count.") if len(choice_keys) != len(choice_benchmarks): raise ValueError("choice_options contain duplicate keys.") return _ParsedDecisionPayload( decision_variable_specs=decision_variable_specs, option_factors=option_factors, choice_benchmarks=choice_benchmarks, competitor_profiles=competitor_profiles, objective_specs=objective_specs, constraint_specs=constraint_specs, default_choice_metric=default_choice_metric, response_count=response_count, )
[docs] class DecisionProblem(ComputableProblem[DecisionCandidate, DecisionEvaluation]): """Concrete decision problem with a unified candidate/evaluation workflow."""
[docs] @classmethod def from_manifest(cls, manifest: ProblemManifest) -> DecisionProblem: """Construct the problem directly from a packaged manifest. Args: manifest: Value for ``manifest``. Returns: Computed result for this callable. """ return cls( metadata=manifest.metadata, statement_markdown=manifest.statement_markdown, parameters=manifest.parameters, resource_bundle=cls.resource_bundle_from_manifest(manifest), )
def __init__( self, *, metadata: ProblemMetadata, statement_markdown: str = "", parameters: Mapping[str, object], resource_bundle: PackageResourceBundle | None = None, ) -> None: """Parse the structured decision payload and cache shared lookups. Args: metadata: Value for ``metadata``. statement_markdown: Value for ``statement_markdown``. parameters: Value for ``parameters``. resource_bundle: Value for ``resource_bundle``. Raises: Exception: Raised when the callable encounters an invalid state. """ super().__init__( metadata=metadata, statement_markdown=statement_markdown, resource_bundle=resource_bundle, ) self.parameters = MappingProxyType(dict(parameters)) payload = parse_structured_decision_payload(self.parameters) self._decision_variable_specs = payload.decision_variable_specs self._option_factors = payload.option_factors self._choice_benchmarks = payload.choice_benchmarks self._competitor_profiles = payload.competitor_profiles self._objective_specs = payload.objective_specs self._constraint_specs = payload.constraint_specs self._default_choice_metric = payload.default_choice_metric self._response_count = payload.response_count self._factor_splines = MappingProxyType( { factor.key: _NaturalCubicSpline.from_points(factor.levels, factor.part_worths) for factor in payload.option_factors if factor.part_worths } ) choice_lookup: dict[str, DecisionChoiceBenchmark] = {} for benchmark in payload.choice_benchmarks: choice_lookup[benchmark.key.lower()] = benchmark choice_lookup[benchmark.label.lower()] = benchmark self._choice_lookup = MappingProxyType(choice_lookup) has_discrete = bool(self._option_factors) has_empirical = bool(self._choice_benchmarks) if has_discrete == has_empirical: raise ValueError( "DecisionProblem requires exactly one executable candidate source: " "either option_factors or choice_benchmarks." ) self._candidate_kind: DecisionCandidateKind = "discrete-option" if has_discrete else "empirical-choice" @property def candidate_kind(self) -> DecisionCandidateKind: """Return the active decision-candidate mode. Returns: Computed result for this callable. """ return self._candidate_kind @property def decision_variables(self) -> tuple[str, ...]: """Return the curated decision-variable descriptions. Returns: Decision-variable descriptions in source order. """ return self._string_list("decision_variables") @property def decision_variable_specs(self) -> tuple[DecisionVariableSpec, ...]: """Return typed engineering variable specifications. Returns: Parsed engineering variable specs in source order. """ return self._decision_variable_specs @property def objectives(self) -> tuple[str, ...]: """Return the stated objective descriptions. Returns: Objective descriptions in source order. """ return self._string_list("objectives") @property def objective_specs(self) -> tuple[DecisionObjectiveSpec, ...]: """Return typed objective descriptors. Returns: Parsed objective specs in source order. """ return self._objective_specs @property def constraints(self) -> tuple[str, ...]: """Return the curated constraint descriptions. Returns: Constraint descriptions in source order. """ return self._string_list("constraints") @property def constraint_specs(self) -> tuple[DecisionConstraintSpec, ...]: """Return typed constraint descriptors. Returns: Parsed constraint specs in source order. """ return self._constraint_specs @property def assumptions(self) -> tuple[str, ...]: """Return the modeling assumptions or caveats. Returns: Assumption or caveat strings in source order. """ return self._string_list("assumptions") @property def option_factors(self) -> tuple[DecisionFactor, ...]: """Return the typed discrete conjoint factors. Returns: Parsed discrete factors in source order. """ return self._option_factors @property def choice_benchmarks(self) -> tuple[DecisionChoiceBenchmark, ...]: """Return the empirical categorical choice benchmarks. Returns: Parsed empirical choice benchmarks in source order. """ return self._choice_benchmarks @property def default_choice_metric(self) -> ChoiceMetric: """Return the default metric used for empirical choice ranking. Returns: Supported metric name. """ return self._default_choice_metric @property def competitor_profiles(self) -> tuple[DecisionProfile, ...]: """Return the observed competitor profiles. Returns: Parsed competitor profiles in source order. """ return self._competitor_profiles @property def option_count(self) -> int: """Return the total size of the explicit discrete option space. Returns: Product of all discrete factor cardinalities. """ if not self.option_factors: return 0 return int(prod(len(factor.levels) for factor in self.option_factors)) @property def candidate_count(self) -> int: """Return the number of candidates exposed by the active decision mode. Returns: Computed result for this callable. """ if self.candidate_kind == "discrete-option": return self.option_count return len(self.choice_benchmarks)
[docs] def iter_candidates(self) -> Iterator[DecisionOption | str]: """Yield candidates in deterministic source order. Yields: Generated values from iter candidates. """ if self.candidate_kind == "discrete-option": factor_keys = tuple(factor.key for factor in self.option_factors) level_domains = tuple(factor.levels for factor in self.option_factors) for combination in product(*level_domains): yield DecisionOption(values=dict(zip(factor_keys, combination, strict=True))) return for benchmark in self.choice_benchmarks: yield benchmark.key
[docs] def evaluate(self, candidate: DecisionCandidate) -> DecisionEvaluation: """Evaluate one candidate using the active decision mode. Args: candidate: Value for ``candidate``. Returns: Computed result for this callable. Raises: Exception: Raised when the callable encounters an invalid state. """ if self.candidate_kind == "discrete-option": if isinstance(candidate, str) or not isinstance(candidate, (DecisionOption, Mapping)): raise TypeError( "Discrete-option decision problems require a DecisionOption or factor-to-level mapping." ) return self._evaluate_option(candidate) if not isinstance(candidate, str): raise TypeError("Empirical-choice decision problems require a string choice key.") return self._evaluate_choice(candidate)
[docs] def iter_evaluations(self, metric: ChoiceMetric | None = None) -> Iterator[DecisionEvaluation]: """Yield evaluations for every candidate in deterministic order. Args: metric: Value for ``metric``. Yields: Generated values from iter evaluations. Raises: Exception: Raised when the callable encounters an invalid state. """ if self.candidate_kind == "discrete-option": if metric is not None: raise ProblemEvaluationError("Discrete-option decision problems do not support metric overrides.") for candidate in self.iter_candidates(): yield self._evaluate_option(cast(DecisionOption, candidate)) return selected_metric = self._normalize_choice_metric(metric) for benchmark in self.choice_benchmarks: yield self._build_choice_evaluation(benchmark, selected_metric)
[docs] def rank_evaluations(self, metric: ChoiceMetric | None = None) -> tuple[DecisionEvaluation, ...]: """Return all candidate evaluations ranked by the active objective. Args: metric: Value for ``metric``. Returns: Computed result for this callable. Raises: Exception: Raised when the callable encounters an invalid state. """ if self.candidate_kind == "discrete-option": if metric is not None: raise ProblemEvaluationError("Discrete-option decision problems do not support metric overrides.") evaluations = tuple(self.iter_evaluations()) return tuple(sorted(evaluations, key=lambda item: item.objective_value, reverse=True)) ranked = list(enumerate(self.iter_evaluations(metric=metric))) ranked.sort( key=lambda item: ( -item[1].objective_value, -cast(float, item[1].mean_rating), item[0], ) ) return tuple(evaluation for _, evaluation in ranked)
[docs] def best_evaluation(self, metric: ChoiceMetric | None = None) -> DecisionEvaluation: """Return the highest-ranked evaluation in the active mode. Args: metric: Value for ``metric``. Returns: Computed result for this callable. Raises: Exception: Raised when the callable encounters an invalid state. """ ranked = self.rank_evaluations(metric=metric) if not ranked: raise ProblemEvaluationError("Decision problem does not define any evaluable candidates.") return ranked[0]
def _evaluate_option(self, option: DecisionOption | Mapping[str, float]) -> DecisionEvaluation: """Evaluate one explicit discrete option against the competitor set. Args: option: Value for ``option``. Returns: Computed result for this callable. Raises: Exception: Raised when the callable encounters an invalid state. """ objective = self._executable_objective() if objective.domain != "discrete-option": raise ProblemEvaluationError("Decision problem does not define an executable discrete-option objective.") candidate = self._coerce_option(option) utility = sum(self._factor_part_worth(factor, candidate.values[factor.key]) for factor in self.option_factors) denominator = exp(utility) for profile in self.competitor_profiles: denominator += exp(self._profile_utility(profile)) predicted_share = exp(utility) / denominator expected_demand_units = _DISCRETE_MARKET_SIZE * predicted_share return DecisionEvaluation( candidate_kind="discrete-option", candidate=candidate, candidate_label=self._format_option_label(candidate), objective_metric=objective.key, option=candidate, utility=utility, predicted_share=predicted_share, expected_demand_units=expected_demand_units, objective_value=predicted_share, ) def _evaluate_choice( self, choice: str, metric: ChoiceMetric | None = None, ) -> DecisionEvaluation: """Evaluate one empirical categorical choice option. Args: choice: Value for ``choice``. metric: Value for ``metric``. Returns: Computed result for this callable. """ benchmark = self._coerce_choice(choice) selected_metric = self._normalize_choice_metric(metric) return self._build_choice_evaluation(benchmark, selected_metric) def _build_choice_evaluation( self, benchmark: DecisionChoiceBenchmark, metric: ChoiceMetric, ) -> DecisionEvaluation: """Build one empirical evaluation payload for a normalized benchmark. Args: benchmark: Value for ``benchmark``. metric: Value for ``metric``. Returns: Computed result for this callable. """ return DecisionEvaluation( candidate_kind="empirical-choice", candidate=benchmark.key, candidate_label=benchmark.label, objective_metric=metric, choice_key=benchmark.key, choice_label=benchmark.label, top_choice_share=benchmark.top_choice_share, mean_rating=benchmark.mean_rating, median_rating=benchmark.median_rating, std_rating=benchmark.std_rating, response_count=self._response_count, objective_value=self._choice_metric_value(benchmark, metric), ) def _rank_choice_evaluations( self, metric: ChoiceMetric | None = None, ) -> tuple[DecisionEvaluation, ...]: """Return all empirical choices ranked by one metric. Args: metric: Value for ``metric``. Returns: Computed result for this callable. """ return self.rank_evaluations(metric=metric)
[docs] def render_brief( self, include_citation: bool = True, citation_mode: Literal["summary", "summary+raw", "raw"] = "summary", ) -> str: """Render the decision statement plus its extracted structure. Args: include_citation: Whether to append bundled source citations. citation_mode: Citation rendering mode for the ``Sources`` section. Returns: Markdown brief suitable for review or reuse. """ sections = [super().render_brief(include_citation=False)] context_lines: list[str] = [] for key, label in ( ("decision_maker", "Decision maker"), ("market_segment", "Market segment"), ("decision_scope", "Decision scope"), ): value = self._string_value(key) if value is not None: context_lines.append(f"- {label}: {value}") if context_lines: sections.append("## Context") sections.append("\n".join(context_lines)) for heading, values in ( ("Decision Variables", self.decision_variables), ("Objectives", self.objectives), ("Constraints", self.constraints), ("Assumptions", self.assumptions), ): if values: sections.append(f"## {heading}") sections.append(self._render_bullets(values)) if self.objective_specs: sections.append("## Objective Model") sections.append(self._render_objective_specs()) if self.constraint_specs: sections.append("## Constraint Equations") sections.append(self._render_constraint_specs()) if self.option_factors: sections.append("## Option Space") sections.append(self._render_option_space()) if self.choice_benchmarks: sections.append("## Choices") sections.append(self._render_choice_options()) sections.append("## Empirical Benchmark") sections.append(self._render_empirical_benchmark()) if include_citation and self.metadata.citations: if citation_mode in {"summary", "summary+raw"}: sections.append("## Sources") sections.append(self._render_citation_summaries()) if citation_mode in {"raw", "summary+raw"}: sections.append("## BibTeX") sections.append(self._render_citation_raw_blocks()) return "\n\n".join(sections)
[docs] def to_mcp_server( self, *, server_name: str | None = None, include_citation: bool = True, citation_mode: Literal["summary", "summary+raw", "raw"] = "summary", ) -> FastMCP: """Expose this decision problem through FastMCP. The exported server exposes: - ``problem://design-brief`` resource (structured decision brief) - ``problem://decision-candidates`` resource (deterministic index mapping) - ``list_candidates()`` tool for deterministic candidate indexing - ``evaluate(choice_index)`` tool - ``submit_final(choice_index, justification?)`` tool Args: server_name: Optional explicit server name. include_citation: Whether the brief includes citation sections. citation_mode: Citation rendering mode for the brief resource. Returns: Configured FastMCP server. """ server = create_fastmcp_server(self, server_name=server_name) register_design_brief_resource( server, brief_text=self.render_brief(include_citation=include_citation, citation_mode=citation_mode), ) indexed_candidates = tuple(enumerate(self.iter_candidates())) def _candidate_entries() -> list[dict[str, object]]: """Return deterministic indexed candidate metadata rows. Returns: Indexed candidate rows for tooling and resources. """ entries: list[dict[str, object]] = [] for index, candidate in indexed_candidates: if isinstance(candidate, str): label = self._coerce_choice(candidate).label else: label = self._format_option_label(candidate) entries.append( { "choice_index": index, "candidate": to_json_value(candidate), "candidate_label": label, } ) return entries def _require_choice_index(choice_index: int) -> int: """Validate one zero-based candidate index. Args: choice_index: Candidate index to validate. Returns: Normalized index. Raises: ValueError: If ``choice_index`` is outside the valid range. """ if choice_index < 0 or choice_index >= len(indexed_candidates): raise ValueError( f"choice_index must be in [0, {len(indexed_candidates) - 1}] for this decision problem." ) return choice_index def _decision_candidates_payload() -> dict[str, object]: """Build the deterministic decision-candidate index mapping payload. Returns: MCP-ready payload describing the indexed candidates. """ return { "problem_id": self.metadata.problem_id, "candidate_kind": self.candidate_kind, "candidate_count": len(indexed_candidates), "candidates": _candidate_entries(), } @server.resource( "problem://decision-candidates", name="decision-candidates", title="Decision Candidates", description="Deterministic zero-based candidate index mapping.", mime_type="application/json", ) def decision_candidates() -> dict[str, object]: """Return the deterministic decision-candidate index mapping.""" return _decision_candidates_payload() def list_candidates() -> dict[str, object]: """Return the deterministic decision-candidate index mapping. Returns: MCP-ready payload describing the indexed candidates. """ return _decision_candidates_payload() def evaluate_tool(choice_index: int) -> dict[str, object]: """Evaluate one indexed decision candidate. Args: choice_index: Zero-based index of the selected candidate. Returns: MCP-ready evaluation payload for the selected candidate. """ choice_index = _require_choice_index(choice_index) candidate = indexed_candidates[choice_index][1] evaluation = self.evaluate(candidate) return { "problem_id": self.metadata.problem_id, "problem_kind": self.metadata.kind.value, "candidate_kind": self.candidate_kind, "choice_index": choice_index, "candidate": to_json_value(candidate), "candidate_label": evaluation.candidate_label, "evaluation": to_json_value(evaluation), "objective_value": evaluation.objective_value, "higher_is_better": evaluation.higher_is_better, "is_feasible": True, } server.add_tool( list_candidates, name="list_candidates", title="List Candidates", description="List deterministic zero-based decision candidate indices and labels.", ) server.add_tool( evaluate_tool, name="evaluate", title="Evaluate Candidate", description="Evaluate a candidate by zero-based index.", ) def submit_final(choice_index: int, justification: str | None = None) -> dict[str, object]: """Submit one indexed final decision answer. Args: choice_index: Zero-based index of the selected candidate. justification: Optional justification text. Returns: MCP-ready submission payload for the selected candidate. """ payload = evaluate_tool(choice_index) payload["justification"] = normalized_optional_text(justification) return payload server.add_tool( submit_final, name="submit_final", title="Submit Final Answer", description="Submit the final answer by zero-based candidate index.", ) return server
def _coerce_option(self, option: DecisionOption | Mapping[str, float]) -> DecisionOption: """Normalize one caller-supplied option to the declared factor space. Args: option: Candidate option object or factor-to-level mapping. Returns: Normalized option validated against the factor space. Raises: ValueError: If the option keys or values do not match the declared factor space. """ raw_values = option.values if isinstance(option, DecisionOption) else option if not isinstance(raw_values, Mapping): raise ValueError("Option must be a mapping of factor keys to numeric levels.") factor_keys = tuple(factor.key for factor in self.option_factors) if set(raw_values) != set(factor_keys) or len(raw_values) != len(factor_keys): raise ValueError("Option must include exactly the declared factor keys.") normalized: dict[str, float] = {} for factor in self.option_factors: raw_value = raw_values[factor.key] value = float(raw_value) if value not in factor.levels: raise ValueError(f"Option value {value!r} is not a declared level for factor {factor.key!r}.") normalized[factor.key] = value return DecisionOption(values=normalized) def _executable_objective(self) -> DecisionObjectiveSpec: """Return the first executable objective descriptor. Returns: The first executable objective spec. Raises: ProblemEvaluationError: If no executable objective is defined. """ for objective in self.objective_specs: if objective.executable: return objective raise ProblemEvaluationError("Decision problem does not define an executable objective.") def _coerce_choice(self, choice: str) -> DecisionChoiceBenchmark: """Normalize one caller-supplied empirical choice identifier. Args: choice: Canonical key or display label to resolve. Returns: Matching empirical choice benchmark. Raises: ValueError: If the requested choice name is unknown. ProblemEvaluationError: If the problem has no empirical choice benchmark data. """ if not self.choice_benchmarks: raise ProblemEvaluationError("Decision problem does not define empirical choice benchmarks.") key = choice.strip().lower() if not key: raise ValueError("Choice must be a non-empty string.") benchmark = self._choice_lookup.get(key) if benchmark is None: raise ValueError(f"Unknown choice: {choice!r}") return benchmark def _normalize_choice_metric( self, metric: ChoiceMetric | None, ) -> ChoiceMetric: """Return one supported empirical choice metric name. Args: metric: Optional user-provided metric override. Returns: Supported metric name. Raises: ValueError: If the provided metric is unsupported. """ normalized = self.default_choice_metric if metric is None else metric.strip().lower() if normalized not in _SUPPORTED_CHOICE_METRICS: raise ValueError(f"Unsupported choice metric: {metric!r}") return cast(ChoiceMetric, normalized) def _choice_metric_value(self, benchmark: DecisionChoiceBenchmark, metric: str) -> float: """Return one benchmark value keyed by metric name. Args: benchmark: Choice benchmark to read. metric: Supported metric name. Returns: Numeric benchmark value. Raises: ValueError: If the provided metric is unsupported. """ if metric == _CHOICE_METRIC_TOP_CHOICE_SHARE: return benchmark.top_choice_share if metric == _CHOICE_METRIC_MEAN_RATING: return benchmark.mean_rating if metric == _CHOICE_METRIC_MEDIAN_RATING: return benchmark.median_rating raise ValueError(f"Unsupported choice metric: {metric!r}") def _format_option_label(self, option: DecisionOption) -> str: """Render one option in factor order as a stable label. Args: option: Value for ``option``. Returns: Computed result for this callable. """ parts = [f"{factor.key}={_format_number(option.values[factor.key])}" for factor in self.option_factors] return ", ".join(parts) def _factor_part_worth(self, factor: DecisionFactor, value: float) -> float: """Return the exact discrete part-worth coefficient for one level. Args: factor: Discrete factor containing the level. value: Exact declared factor level. Returns: Part-worth coefficient for the requested level. Raises: ProblemEvaluationError: If the factor does not carry part-worth coefficients. """ if not factor.part_worths: raise ProblemEvaluationError(f"Factor {factor.key!r} does not define part_worths.") level_index = factor.levels.index(value) return factor.part_worths[level_index] def _profile_utility(self, profile: DecisionProfile) -> float: """Return the spline-interpolated utility for one observed profile. Args: profile: Observed profile to score. Returns: Spline-interpolated utility. Raises: ProblemEvaluationError: If a required factor cannot be evaluated. """ utility = 0.0 for factor in self.option_factors: spline = self._factor_splines.get(factor.key) if spline is None: raise ProblemEvaluationError(f"Factor {factor.key!r} cannot be evaluated without part_worths.") utility += spline.evaluate(profile.values[factor.key]) return utility def _render_constraint_specs(self) -> str: """Render typed constraint descriptors as bullets. Returns: Markdown bullet list for the typed constraints. """ lines = [] for constraint in self.constraint_specs: executable = "yes" if constraint.executable else "no" lines.append( f"- {constraint.label} (`{constraint.key}`, {constraint.relation}, {constraint.domain}, " f"executable={executable}): {constraint.expression}" ) return "\n".join(lines) def _render_objective_specs(self) -> str: """Render typed objective descriptors as bullets. Returns: Markdown bullet list for the typed objectives. """ lines = [] for objective in self.objective_specs: executable = "yes" if objective.executable else "no" lines.append( f"- {objective.label} (`{objective.key}`, {objective.sense}, {objective.domain}, " f"executable={executable}): {objective.expression}" ) return "\n".join(lines) def _render_option_space(self) -> str: """Render the factor space summary without expanding all combinations. Returns: Markdown bullet list summarizing the discrete option space. """ lines: list[str] = [] for factor in self.option_factors: unit_suffix = f" {factor.unit}" if factor.unit else "" levels_text = ", ".join(_format_number(value) for value in factor.levels) lines.append(f"- {factor.label} (`{factor.key}`{unit_suffix}): [{levels_text}]") lines.append(f"- Total discrete options: {self.option_count:,}") return "\n".join(lines) def _render_choice_options(self) -> str: """Render the empirical choice labels in source order. Returns: Markdown bullet list of the categorical options. """ return "\n".join(f"- {benchmark.label} (`{benchmark.key}`)" for benchmark in self.choice_benchmarks) def _render_empirical_benchmark(self) -> str: """Render the empirical benchmark as a Markdown table. Returns: Markdown table sorted by the default choice metric. """ evaluations = self._rank_choice_evaluations(metric=self.default_choice_metric) lines = [ "| Choice | Key | Top Choice Share | Mean Rating | Median Rating | Std Rating |", "| --- | --- | ---: | ---: | ---: | ---: |", ] for evaluation in evaluations: choice_label = cast(str, evaluation.choice_label) choice_key = cast(str, evaluation.choice_key) top_choice_share = cast(float, evaluation.top_choice_share) mean_rating = cast(float, evaluation.mean_rating) median_rating = cast(float, evaluation.median_rating) std_rating = cast(float, evaluation.std_rating) lines.append( "| " f"{choice_label} | " f"`{choice_key}` | " f"{top_choice_share:.6f} | " f"{mean_rating:.6f} | " f"{median_rating:.6f} | " f"{std_rating:.6f} |" ) return "\n".join(lines) def _string_value(self, key: str) -> str | None: """Return one non-empty string parameter when present. Args: key: Parameter name to read from the structured metadata mapping. Returns: Stripped string content, or ``None`` when the parameter is missing or empty. """ raw_value = self.parameters.get(key) if not isinstance(raw_value, str): return None value = raw_value.strip() return value or None def _string_list(self, key: str) -> tuple[str, ...]: """Return one normalized string-list parameter when present. Args: key: Parameter name to read from the structured metadata mapping. Returns: Non-empty string values coerced from the stored sequence, preserving order. """ raw_value = self.parameters.get(key) if not isinstance(raw_value, Sequence) or isinstance(raw_value, (str, bytes)): return () values: list[str] = [] for entry in raw_value: item = str(entry).strip() if item: values.append(item) return tuple(values) def _render_bullets(self, items: tuple[str, ...]) -> str: """Render a tuple of strings as a Markdown bullet list. Args: items: Bullet body text in display order. Returns: Markdown bullet list text. """ return "\n".join(f"- {item}" for item in items)
def load_decision_problem(manifest: ProblemManifest) -> DecisionProblem: """Construct one concrete decision problem from a packaged manifest. Args: manifest: Value for ``manifest``. Returns: Computed result for this callable. """ return DecisionProblem.from_manifest(manifest) def _freeze_numeric_mapping(values: Mapping[str, float]) -> Mapping[str, float]: """Return an immutable string-to-float mapping. Args: values: Raw mapping to normalize and freeze. Returns: Immutable mapping with stripped keys and float values. Raises: ValueError: If any key is empty after normalization. """ normalized: dict[str, float] = {} for key, raw_value in values.items(): normalized_key = str(key).strip() if not normalized_key: raise ValueError("Decision mappings require non-empty keys.") normalized[normalized_key] = float(raw_value) return MappingProxyType(normalized) def _format_number(value: float) -> str: """Render one numeric value without unnecessary trailing zeros. Args: value: Numeric value to render. Returns: Compact decimal text. """ if value.is_integer(): return str(int(value)) return f"{value:g}" def _normalize_string_tuple(raw_values: Sequence[object], context: str) -> tuple[str, ...]: """Return one normalized tuple of non-empty strings. Args: raw_values: Raw values to normalize. context: Context label for validation errors. Returns: Stripped non-empty strings in the original order. Raises: ValueError: If any entry becomes empty after normalization. """ values: list[str] = [] for raw_value in raw_values: value = str(raw_value).strip() if not value: raise ValueError(f"{context} entries must be non-empty strings.") values.append(value) return tuple(values) def _parse_decision_variable_specs(parameters: Mapping[str, object]) -> tuple[DecisionVariableSpec, ...]: """Parse structured engineering variable specs. Args: parameters: Raw manifest parameter mapping. Returns: Parsed engineering variable specs. Raises: ValueError: If the structured payload is malformed. """ raw_specs = _list_of_mappings(parameters.get("decision_variable_specs"), field_name="decision_variable_specs") specs: list[DecisionVariableSpec] = [] seen_symbols: set[str] = set() for entry in raw_specs: spec = DecisionVariableSpec( symbol=str(entry.get("symbol", "")), label=str(entry.get("label", "")), unit=_optional_string(entry.get("unit")), lower_bound=_required_float(entry, "lower_bound"), upper_bound=_required_float(entry, "upper_bound"), ) if spec.symbol in seen_symbols: raise ValueError(f"Duplicate decision variable symbol: {spec.symbol!r}") seen_symbols.add(spec.symbol) specs.append(spec) return tuple(specs) def _parse_option_factors(parameters: Mapping[str, object]) -> tuple[DecisionFactor, ...]: """Parse structured discrete factor specs. Args: parameters: Raw manifest parameter mapping. Returns: Parsed discrete factor specs. Raises: ValueError: If the structured payload is malformed. """ raw_factors = _list_of_mappings(parameters.get("option_factors"), field_name="option_factors") factors: list[DecisionFactor] = [] seen_keys: set[str] = set() for entry in raw_factors: factor = DecisionFactor( key=str(entry.get("key", "")), label=str(entry.get("label", "")), unit=_optional_string(entry.get("unit")), levels=_float_tuple(entry.get("levels"), field_name="option_factors.levels"), part_worths=_float_tuple(entry.get("part_worths"), field_name="option_factors.part_worths"), ) if factor.key in seen_keys: raise ValueError(f"Duplicate option factor key: {factor.key!r}") seen_keys.add(factor.key) factors.append(factor) return tuple(factors) def _parse_choice_benchmarks(parameters: Mapping[str, object]) -> tuple[DecisionChoiceBenchmark, ...]: """Parse structured empirical categorical choice benchmarks. Args: parameters: Raw manifest parameter mapping. Returns: Parsed empirical choice benchmarks. Raises: ValueError: If the structured payload is malformed. """ raw_options = _list_of_mappings(parameters.get("choice_options"), field_name="choice_options") benchmarks: list[DecisionChoiceBenchmark] = [] seen_keys: set[str] = set() seen_labels: set[str] = set() for entry in raw_options: benchmark = DecisionChoiceBenchmark( key=str(entry.get("key", "")), label=str(entry.get("label", "")), top_choice_share=_required_float(entry, "top_choice_share"), mean_rating=_required_float(entry, "mean_rating"), median_rating=_required_float(entry, "median_rating"), std_rating=_required_float(entry, "std_rating"), ) if benchmark.key in seen_keys: raise ValueError(f"Duplicate choice option key: {benchmark.key!r}") label_key = benchmark.label.lower() if label_key in seen_labels: raise ValueError(f"Duplicate choice option label: {benchmark.label!r}") seen_keys.add(benchmark.key) seen_labels.add(label_key) benchmarks.append(benchmark) return tuple(benchmarks) def _parse_default_choice_metric( parameters: Mapping[str, object], has_choice_benchmarks: bool, ) -> ChoiceMetric: """Parse the default empirical choice metric. Args: parameters: Raw manifest parameter mapping. has_choice_benchmarks: Whether empirical choices are present. Returns: Supported metric name. Raises: ValueError: If the provided metric is unsupported. """ raw_value = parameters.get("default_choice_metric") if raw_value is None: del has_choice_benchmarks return cast(ChoiceMetric, _CHOICE_METRIC_TOP_CHOICE_SHARE) value = str(raw_value).strip().lower() if value not in _SUPPORTED_CHOICE_METRICS: raise ValueError(f"Unsupported default_choice_metric: {value!r}") return cast(ChoiceMetric, value) def _parse_response_count(parameters: Mapping[str, object], has_choice_benchmarks: bool) -> int: """Parse the empirical response count. Args: parameters: Raw manifest parameter mapping. has_choice_benchmarks: Whether empirical choices are present. Returns: Parsed integer response count. Raises: ValueError: If the count is negative, non-integral, or missing when required. """ raw_value = parameters.get("response_count") if raw_value is None: return 0 response_count = _coerce_int(raw_value, field_name="response_count") if has_choice_benchmarks and response_count <= 0: raise ValueError("response_count must be positive when choice_options are present.") if response_count < 0: raise ValueError("response_count must be non-negative.") return response_count def _parse_competitor_profiles(parameters: Mapping[str, object]) -> tuple[DecisionProfile, ...]: """Parse structured competitor profiles. Args: parameters: Raw manifest parameter mapping. Returns: Parsed competitor profiles. Raises: ValueError: If the structured payload is malformed. """ raw_profiles = _list_of_mappings(parameters.get("competitor_profiles"), field_name="competitor_profiles") profiles: list[DecisionProfile] = [] seen_names: set[str] = set() for entry in raw_profiles: raw_values = entry.get("values") if not isinstance(raw_values, Mapping): raise ValueError("competitor_profiles values must be mappings.") profile = DecisionProfile(name=str(entry.get("name", "")), values=raw_values) if profile.name in seen_names: raise ValueError(f"Duplicate competitor profile name: {profile.name!r}") seen_names.add(profile.name) profiles.append(profile) return tuple(profiles) def _parse_objective_specs(parameters: Mapping[str, object]) -> tuple[DecisionObjectiveSpec, ...]: """Parse structured objective specs. Args: parameters: Raw manifest parameter mapping. Returns: Parsed objective specs. """ raw_specs = _list_of_mappings(parameters.get("objective_specs"), field_name="objective_specs") specs: list[DecisionObjectiveSpec] = [] for entry in raw_specs: raw_variables = _string_tuple(entry.get("variables"), field_name="objective_specs.variables") specs.append( DecisionObjectiveSpec( key=str(entry.get("key", "")), label=str(entry.get("label", "")), sense=str(entry.get("sense", "")), domain=str(entry.get("domain", "")), expression=str(entry.get("expression", "")), variables=raw_variables, executable=bool(entry.get("executable", False)), ) ) return tuple(specs) def _parse_constraint_specs(parameters: Mapping[str, object]) -> tuple[DecisionConstraintSpec, ...]: """Parse structured constraint specs. Args: parameters: Raw manifest parameter mapping. Returns: Parsed constraint specs. """ raw_specs = _list_of_mappings(parameters.get("constraint_specs"), field_name="constraint_specs") specs: list[DecisionConstraintSpec] = [] for entry in raw_specs: raw_variables = _string_tuple(entry.get("variables"), field_name="constraint_specs.variables") specs.append( DecisionConstraintSpec( key=str(entry.get("key", "")), label=str(entry.get("label", "")), relation=str(entry.get("relation", "")), domain=str(entry.get("domain", "")), expression=str(entry.get("expression", "")), variables=raw_variables, executable=bool(entry.get("executable", False)), ) ) return tuple(specs) def _list_of_mappings(raw_value: object, field_name: str) -> tuple[Mapping[str, object], ...]: """Return one tuple of mapping entries from a raw manifest field. Args: raw_value: Raw manifest field to inspect. field_name: Field label for validation errors. Returns: Tuple of mapping entries. Raises: ValueError: If the raw field is not a sequence of mappings. """ if raw_value is None: return () if not isinstance(raw_value, Sequence) or isinstance(raw_value, (str, bytes)): raise ValueError(f"{field_name} must be a sequence of mappings.") mappings: list[Mapping[str, object]] = [] for entry in raw_value: if not isinstance(entry, Mapping): raise ValueError(f"{field_name} entries must be mappings.") mappings.append(entry) return tuple(mappings) def _sequence(raw_value: object, field_name: str) -> tuple[object, ...]: """Return one sequence field as a tuple. Args: raw_value: Raw manifest field to inspect. field_name: Field label for validation errors. Returns: Tuple of raw sequence entries. Raises: ValueError: If the raw field is not a supported sequence. """ if not isinstance(raw_value, Sequence) or isinstance(raw_value, (str, bytes)): raise ValueError(f"{field_name} must be a sequence.") return tuple(raw_value) def _float_tuple(raw_value: object, field_name: str) -> tuple[float, ...]: """Return one raw numeric sequence as a tuple of floats. Args: raw_value: Raw manifest field to inspect. field_name: Field label for validation errors. Returns: Tuple of normalized float values. """ return tuple(_coerce_float(value, field_name=field_name) for value in _sequence(raw_value, field_name=field_name)) def _string_tuple(raw_value: object, field_name: str) -> tuple[str, ...]: """Return one raw string sequence as a tuple of normalized strings. Args: raw_value: Raw manifest field to inspect. field_name: Field label for validation errors. Returns: Tuple of stripped strings. """ return tuple(str(value).strip() for value in _sequence(raw_value, field_name=field_name)) def _optional_string(raw_value: object) -> str | None: """Normalize one optional string field. Args: raw_value: Raw optional scalar value. Returns: Stripped string content, or ``None`` when empty or missing. """ if raw_value is None: return None value = str(raw_value).strip() return value or None def _required_float(entry: Mapping[str, object], key: str) -> float: """Read one required numeric field from a raw mapping. Args: entry: Raw mapping to read from. key: Required field name. Returns: Parsed float value. Raises: ValueError: If the field is missing or cannot be parsed as numeric. """ if key not in entry: raise ValueError(f"Missing required numeric field: {key}") return _coerce_float(entry[key], field_name=key) def _coerce_int(raw_value: object, field_name: str) -> int: """Coerce one raw scalar to int with a clear validation error. Args: raw_value: Raw scalar to convert. field_name: Field label for validation errors. Returns: Parsed integer value. Raises: ValueError: If the raw value is empty, non-numeric, or not integral. """ numeric_value = _coerce_float(raw_value, field_name=field_name) if not numeric_value.is_integer(): raise ValueError(f"{field_name} must be an integer.") return int(numeric_value) def _coerce_float(raw_value: object, field_name: str) -> float: """Coerce one raw scalar to float with a clear validation error. Args: raw_value: Raw scalar to convert. field_name: Field label for validation errors. Returns: Parsed float value. Raises: ValueError: If the raw value is empty or non-numeric. """ if isinstance(raw_value, bool): return float(raw_value) if isinstance(raw_value, (int, float)): return float(raw_value) if isinstance(raw_value, str): value = raw_value.strip() if not value: raise ValueError(f"{field_name} must not be an empty string.") return float(value) raise ValueError(f"{field_name} must be numeric.") def _solve_tridiagonal( lower: Sequence[float], diagonal: Sequence[float], upper: Sequence[float], rhs: Sequence[float], ) -> tuple[float, ...]: """Solve one tridiagonal linear system with the Thomas algorithm. Args: lower: Lower-diagonal coefficients. diagonal: Main-diagonal coefficients. upper: Upper-diagonal coefficients. rhs: Right-hand-side vector. Returns: Solution vector as a tuple of floats. Raises: ValueError: If the system dimensions are inconsistent or singular. """ size = len(diagonal) if not (len(lower) == len(upper) == len(rhs) == size): raise ValueError("Tridiagonal system inputs must all have the same length.") if size == 0: return () c_prime = [0.0] * size d_prime = [0.0] * size if diagonal[0] == 0.0: raise ValueError("Tridiagonal system has a zero leading diagonal entry.") c_prime[0] = upper[0] / diagonal[0] d_prime[0] = rhs[0] / diagonal[0] for index in range(1, size): denominator = diagonal[index] - lower[index] * c_prime[index - 1] if denominator == 0.0: raise ValueError("Tridiagonal system is singular.") c_prime[index] = upper[index] / denominator if index < size - 1 else 0.0 d_prime[index] = (rhs[index] - lower[index] * d_prime[index - 1]) / denominator solution = [0.0] * size solution[-1] = d_prime[-1] for index in range(size - 2, -1, -1): solution[index] = d_prime[index] - c_prime[index] * solution[index + 1] return tuple(solution) def pairwise(values: Sequence[float]) -> Iterator[tuple[float, float]]: """Yield adjacent pairs from one sequence. Args: values: Ordered values to pair. Yields: Adjacent two-tuples in left-to-right order. """ for index in range(len(values) - 1): yield values[index], values[index + 1]