"""Compiled workflow execution wrapper for delegate compile APIs."""
from __future__ import annotations
from collections.abc import Callable, Mapping
from dataclasses import dataclass, field
from design_research_agents._contracts._execution import ExecutionResult
from design_research_agents._contracts._workflow import (
WorkflowExecutionMode,
WorkflowFailurePolicy,
)
from design_research_agents._tracing import Tracer, finish_trace_run, start_trace_run
from design_research_agents._tracing._result_metadata import (
enrich_execution_result_trace_metadata,
)
from .workflow import Workflow
def _identity_result(result: ExecutionResult) -> ExecutionResult:
"""Return the provided workflow result unchanged."""
return result
[docs]
@dataclass(slots=True, frozen=True, kw_only=True)
class CompiledExecution:
"""Bound compiled delegate execution that can be run repeatedly."""
workflow: Workflow
"""Workflow graph compiled for this execution."""
input: str | Mapping[str, object] | None
"""Bound workflow input payload."""
request_id: str
"""Top-level request identifier for delegate tracing."""
dependencies: Mapping[str, object]
"""Bound dependency payload mapping."""
delegate_name: str
"""Delegate name used for top-level trace metadata."""
finalize: Callable[[ExecutionResult], ExecutionResult] = _identity_result
"""Finalizer that maps the raw workflow result into the delegate result."""
execution_mode: WorkflowExecutionMode = "sequential"
"""Workflow execution mode used by ``run()``."""
failure_policy: WorkflowFailurePolicy = "skip_dependents"
"""Workflow failure policy used by ``run()``."""
tracer: Tracer | None = None
"""Optional tracer used for top-level compile-run traces."""
trace_input: Mapping[str, object] = field(default_factory=dict)
"""Input payload attached to the top-level trace scope."""
workflow_request_id: str | None = None
"""Optional nested workflow request id override."""
[docs]
def run(self) -> ExecutionResult:
"""Execute the compiled workflow and finalize the result."""
trace_scope = start_trace_run(
agent_name=self.delegate_name,
request_id=self.request_id,
input_payload=dict(self.trace_input),
dependencies=dict(self.dependencies),
tracer=self.tracer,
)
try:
workflow_result = self.workflow.run(
input=self.input,
execution_mode=self.execution_mode,
failure_policy=self.failure_policy,
request_id=self.workflow_request_id or self.request_id,
dependencies=self.dependencies,
)
result = self.finalize(workflow_result)
except Exception as exc:
finish_trace_run(trace_scope, error=str(exc))
raise
result = enrich_execution_result_trace_metadata(result=result, tracer=self.tracer)
finish_trace_run(trace_scope, result=result)
return result
__all__ = ["CompiledExecution"]