"""Base class for optimization problems."""
from __future__ import annotations
from abc import ABC, abstractmethod
from collections.abc import Callable
from dataclasses import dataclass
from typing import TYPE_CHECKING, Literal
import numpy
from numpy.typing import NDArray
from design_research_problems.problems._assets import PackageResourceBundle
from design_research_problems.problems._computable import ComputableProblem
from design_research_problems.problems._mcp import (
create_fastmcp_server,
normalized_optional_text,
register_design_brief_resource,
to_json_value,
)
from design_research_problems.problems._metadata import ProblemMetadata
if TYPE_CHECKING:
from mcp.server.fastmcp import FastMCP
@dataclass(frozen=True)
class Bounds:
    """Immutable, library-owned box bounds for the design variables."""

    lb: NDArray[numpy.float64]
    """Per-variable lower bounds."""

    ub: NDArray[numpy.float64]
    """Per-variable upper bounds."""
@dataclass(frozen=True)
class ConstraintDefinition:
    """Minimal constraint description that is independent of any solver."""

    kind: Literal["eq", "ineq"]
    """Whether baseline routines treat this as an equality or inequality."""

    evaluate: Callable[[NDArray[numpy.float64]], float]
    """Maps a design vector to the scalar constraint value."""

    target: float = 0.0
    """Value the constraint function is compared against."""
@dataclass(frozen=True)
class OptimizationResult:
    """SciPy-flavored result record produced by the built-in baseline solvers."""

    x: NDArray[numpy.float64]
    """Best candidate vector found by the baseline routine."""

    fun: float
    """Objective value at ``x``."""

    success: bool
    """True when the routine reached a representative valid answer."""

    message: str
    """Short human-readable status text."""

    nit: int = 1
    """Count of baseline iterations or candidates considered."""

    nfev: int = 0
    """Count of objective evaluations used by the solver."""
@dataclass(frozen=True)
class OptimizationEvaluation:
    """Standardized score sheet for a single optimization candidate."""

    x: NDArray[numpy.float64]
    """Evaluated candidate vector."""

    objective_value: float
    """Objective value at ``x``."""

    total_constraint_violation: float
    """Sum of all bound and constraint violations."""

    max_constraint_violation: float
    """Largest single bound or constraint violation."""

    is_feasible: bool
    """Whether ``x`` is feasible under the default tolerance."""

    higher_is_better: bool = False
    """Whether larger objective values rank better."""
@dataclass(frozen=True)
class LocalSearchResult:
    """Internal result record for the bounded pattern search."""

    x: NDArray[numpy.float64]
    """Best point found by the local search."""

    fun: float
    """Objective value at ``x``."""

    nit: int
    """Number of coordinate sweeps performed."""

    nfev: int
    """Number of objective evaluations performed."""


def bounded_pattern_search(
    objective: Callable[[NDArray[numpy.float64]], float],
    lower_bounds: NDArray[numpy.float64],
    upper_bounds: NDArray[numpy.float64],
    initial_solution: NDArray[numpy.float64],
    maxiter: int = 200,
    initial_step_fraction: float = 0.15,
    minimum_step_fraction: float = 1e-4,
) -> LocalSearchResult:
    """Run a deterministic bounded coordinate-pattern local search.

    Each sweep probes every coordinate in both directions and greedily
    accepts strict improvements; steps grow after an improving sweep and
    halve otherwise until all of them fall below the minimum fraction.

    Args:
        objective: Scalar objective to minimize.
        lower_bounds: Inclusive lower bounds.
        upper_bounds: Inclusive upper bounds.
        initial_solution: Starting point for the search.
        maxiter: Maximum number of coordinate-sweep iterations.
        initial_step_fraction: Initial step size as a fraction of the bound span.
        minimum_step_fraction: Search stops when all steps fall below this
            fraction of the span.

    Returns:
        Local search result with the best point found.
    """
    span = upper_bounds - lower_bounds
    # Zero-width dimensions get a unit span so step sizes stay positive.
    safe_span = numpy.where(span > 0.0, span, 1.0)
    best = numpy.clip(numpy.array(initial_solution, dtype=float, copy=True), lower_bounds, upper_bounds)
    steps = numpy.maximum(initial_step_fraction * safe_span, minimum_step_fraction * safe_span)
    best_value = float(objective(best))
    evaluations = 1
    sweeps = 0
    margin = 1e-12  # required strict-improvement margin to accept a move
    for _ in range(maxiter):
        sweeps += 1
        sweep_improved = False
        for axis in range(best.shape[0]):
            for sign in (-1.0, 1.0):
                trial = best.copy()
                trial[axis] = float(
                    numpy.clip(trial[axis] + sign * steps[axis], lower_bounds[axis], upper_bounds[axis])
                )
                trial_value = float(objective(trial))
                evaluations += 1
                if trial_value + margin < best_value:
                    best = trial
                    best_value = trial_value
                    sweep_improved = True
        if sweep_improved:
            # Expand cautiously after a successful sweep, capped at 30% of span.
            steps = numpy.minimum(steps * 1.1, 0.3 * safe_span)
            continue
        steps *= 0.5
        if bool(numpy.all(steps <= minimum_step_fraction * safe_span)):
            break
    return LocalSearchResult(x=best, fun=best_value, nit=sweeps, nfev=evaluations)
class OptimizationProblem(ComputableProblem[NDArray[numpy.float64], OptimizationEvaluation], ABC):
"""Abstract base for optimization problems."""
def __init__(
self,
metadata: ProblemMetadata,
statement_markdown: str = "",
resource_bundle: PackageResourceBundle | None = None,
) -> None:
"""Store shared metadata and initialize empty bounds and constraints.
Args:
metadata: Shared packaged metadata for the problem.
statement_markdown: Human-readable problem statement.
resource_bundle: Optional package-resource loader.
"""
super().__init__(
metadata=metadata,
statement_markdown=statement_markdown,
resource_bundle=resource_bundle,
)
self.bounds = Bounds(
lb=numpy.zeros(0, dtype=float),
ub=numpy.zeros(0, dtype=float),
)
self.constraints: list[ConstraintDefinition] = []
def evaluate(self, variables: NDArray[numpy.float64]) -> OptimizationEvaluation:
"""Evaluate one candidate vector without invoking the solver.
Args:
variables: Candidate design vector to score.
Returns:
Standardized optimization evaluation for ``variables``.
"""
candidate = numpy.array(variables, dtype=float, copy=True)
total_violation = self.constraint_violation(candidate)
max_violation = self.max_constraint_violation(candidate)
return OptimizationEvaluation(
x=candidate,
objective_value=self.objective(candidate),
total_constraint_violation=total_violation,
max_constraint_violation=max_violation,
is_feasible=max_violation <= 1e-9,
higher_is_better=False,
)
    def to_mcp_server(
        self,
        *,
        server_name: str | None = None,
        include_citation: bool = True,
        citation_mode: Literal["summary", "summary+raw", "raw"] = "summary",
    ) -> FastMCP:
        """Expose this optimization problem through FastMCP.

        The exported server exposes:

        - ``problem://design-brief`` resource
        - ``evaluate(x)`` tool for stateless candidate scoring
        - ``submit_final(final_x, justification?)`` tool

        Args:
            server_name: Optional explicit server name.
            include_citation: Whether the design brief includes citations.
            citation_mode: Citation rendering mode for the design brief.

        Returns:
            Configured FastMCP server.
        """
        server = create_fastmcp_server(self, server_name=server_name)
        # The rendered design brief is published as an MCP resource.
        register_design_brief_resource(
            server,
            brief_text=self.render_brief(include_citation=include_citation, citation_mode=citation_mode),
        )

        def evaluate_tool(x: list[float]) -> dict[str, object]:
            """Evaluate one candidate vector and return a complete report.

            Args:
                x: Candidate design vector.

            Returns:
                MCP-ready evaluation report for the candidate vector.
            """
            return self._mcp_evaluation_report(x)

        def submit_final(final_x: list[float], justification: str | None = None) -> dict[str, object]:
            """Submit one final optimization vector with an optional justification.

            Args:
                final_x: Final candidate design vector.
                justification: Optional justification text.

            Returns:
                MCP-ready submission payload for the final candidate.
            """
            report = self._mcp_evaluation_report(final_x)
            return {
                "problem_id": self.metadata.problem_id,
                "problem_kind": self.metadata.kind.value,
                "final_x": report["candidate_x"],
                "justification": normalized_optional_text(justification),
                "report": report,
            }

        # Tools are registered under explicit public names so the closures'
        # Python identifiers stay private implementation details.
        server.add_tool(
            evaluate_tool,
            name="evaluate",
            title="Evaluate Design",
            description="Evaluate a candidate vector and return feasibility/objective metrics.",
        )
        server.add_tool(
            submit_final,
            name="submit_final",
            title="Submit Final Answer",
            description="Submit the final candidate vector with optional justification.",
        )
        return server
def _mcp_evaluation_report(self, x: list[float]) -> dict[str, object]:
"""Return one MCP-facing optimization report.
Args:
x: Candidate design vector.
Returns:
JSON-safe report containing standardized evaluation fields and
available problem-specific extras.
"""
vector = self._coerce_mcp_vector(x)
evaluation = self.evaluate(vector)
report: dict[str, object] = {
"problem_id": self.metadata.problem_id,
"candidate_x": to_json_value(vector),
"evaluation": to_json_value(evaluation),
"objective_value": evaluation.objective_value,
"higher_is_better": evaluation.higher_is_better,
"is_feasible": evaluation.is_feasible,
}
objective_components = getattr(self, "objective_components", None)
if callable(objective_components):
report["objective_components"] = to_json_value(objective_components(vector))
decode_candidate = getattr(self, "decode_candidate", None)
if callable(decode_candidate):
report["decoded_candidate"] = to_json_value(decode_candidate(vector))
return report
def _coerce_mcp_vector(self, x: list[float]) -> NDArray[numpy.float64]:
"""Validate and normalize one MCP-provided vector.
Args:
x: Candidate vector from the MCP tool input.
Returns:
Float numpy vector with the expected shape.
Raises:
ValueError: If ``x`` is not a one-dimensional vector matching the
problem dimensionality.
"""
candidate = numpy.array(x, dtype=float, copy=True)
if candidate.ndim != 1:
raise ValueError("x must be a one-dimensional numeric vector.")
expected_shape = self.bounds.lb.shape
if candidate.shape != expected_shape:
raise ValueError(
f"Expected a {expected_shape[0]}-variable design vector, received shape {candidate.shape!r}."
)
return candidate
def bound_violation(self, variables: NDArray[numpy.float64]) -> float:
"""Return the total amount by which bounds are violated.
Args:
variables: Candidate design vector.
Returns:
Sum of lower- and upper-bound violations.
"""
lower = numpy.maximum(self.bounds.lb - variables, 0.0)
upper = numpy.maximum(variables - self.bounds.ub, 0.0)
return float(lower.sum() + upper.sum())
def constraint_violation(self, variables: NDArray[numpy.float64]) -> float:
"""Return the total equality, inequality, and bound violation.
Args:
variables: Candidate design vector.
Returns:
Total scalar violation measure.
"""
violation = self.bound_violation(variables)
for constraint in self.constraints:
value = constraint.evaluate(variables)
if constraint.kind == "eq":
violation += abs(value - constraint.target)
else:
violation += max(constraint.target - value, 0.0)
return float(violation)
def max_constraint_violation(self, variables: NDArray[numpy.float64]) -> float:
"""Return the largest single equality, inequality, or bound violation.
Args:
variables: Candidate design vector.
Returns:
Maximum scalar violation.
"""
values: list[float] = [self.bound_violation(variables)]
for constraint in self.constraints:
value = constraint.evaluate(variables)
if constraint.kind == "eq":
values.append(abs(value - constraint.target))
else:
values.append(max(constraint.target - value, 0.0))
return float(max(values, default=0.0))
    @abstractmethod
    def generate_initial_solution(self, seed: int | None = None) -> NDArray[numpy.float64]:
        """Generate a deterministic or seeded initial solution.

        Args:
            seed: Optional random seed.

        Returns:
            Initial solution vector.
        """
    @abstractmethod
    def objective(self, variables: NDArray[numpy.float64]) -> float:
        """Evaluate the objective function.

        Baseline routines minimize this value; see ``OptimizationEvaluation``
        where ``higher_is_better`` defaults to ``False``.

        Args:
            variables: Candidate design vector.

        Returns:
            Scalar objective value.
        """
    @abstractmethod
    def solve(
        self,
        initial_solution: NDArray[numpy.float64] | None = None,
        seed: int | None = None,
        maxiter: int = 200,
    ) -> OptimizationResult:
        """Run the problem's representative baseline optimization routine.

        Args:
            initial_solution: Optional candidate vector supplied by the caller.
            seed: Optional random seed for any stochastic baseline logic.
            maxiter: Problem-specific iteration or candidate budget.

        Returns:
            Baseline optimization result.
        """