Source code for design_research_agents._contracts._workflow

"""Workflow runtime contracts and typed step payloads."""

from __future__ import annotations

from collections.abc import Callable, Mapping, Sequence
from dataclasses import asdict, dataclass, field
from typing import Any, Literal, Protocol, runtime_checkable

from ._delegate import Delegate
from ._execution import ExecutionResult
from ._llm import LLMClient, LLMRequest, LLMResponse
from ._memory import MemoryWriteRecord

type WorkflowExecutionMode = Literal["sequential", "dag"]
"""Runtime execution mode options for workflow runs and nested loop runs."""

type WorkflowFailurePolicy = Literal["skip_dependents", "propagate_failed_state"]
"""Failure handling policies for workflow runs and nested loop runs."""

type WorkflowStepStatus = Literal["completed", "failed", "skipped"]
"""Normalized status strings for workflow step execution outcomes."""


@runtime_checkable
class DelegateRunner(Protocol):
    """Structural type for configured orchestration chunks whose step topology is fixed."""

    def run(
        self,
        *,
        context: Mapping[str, object] | None = None,
        execution_mode: WorkflowExecutionMode = "dag",
        failure_policy: WorkflowFailurePolicy = "skip_dependents",
        request_id: str | None = None,
        dependencies: Mapping[str, object] | None = None,
    ) -> ExecutionResult:
        """Run the configured orchestration and hand back its aggregated outcome.

        Args:
            context: Shared context mapping made available to step builders, if any.
            execution_mode: Runtime scheduling mode (``dag``, for instance).
            failure_policy: How the run behaves once a step fails.
            request_id: Request id used for tracing and downstream calls, if any.
            dependencies: Dependency payload mapping exposed to steps, if any.

        Returns:
            The aggregated workflow execution result.
        """
        ...
@runtime_checkable
class WorkflowDelegate(Protocol):
    """Structural type for raw ``Workflow`` objects that are used as delegates."""

    def run(
        self,
        input: str | Mapping[str, object] | None = None,
        *,
        execution_mode: WorkflowExecutionMode = "sequential",
        failure_policy: WorkflowFailurePolicy = "skip_dependents",
        request_id: str | None = None,
        dependencies: Mapping[str, object] | None = None,
    ) -> ExecutionResult:
        """Run the workflow object and hand back a single aggregate result.

        Args:
            input: Workflow input payload, if any.
            execution_mode: Runtime scheduling mode (``dag``, for instance).
            failure_policy: How the run behaves once a step fails.
            request_id: Request id used for tracing and downstream calls, if any.
            dependencies: Dependency payload mapping exposed to steps, if any.

        Returns:
            The aggregated workflow execution result.
        """
        ...
# Delegate inputs are intentionally broad so callers can compose agents, workflow # wrappers, and direct Workflow objects without adapter boilerplate. type DelegateTarget = Delegate | DelegateRunner | WorkflowDelegate """Union type covering all supported delegate types for delegate steps and batch delegate calls."""
[docs] @dataclass(slots=True, frozen=True, kw_only=True) class WorkflowArtifactSource: """Provenance entry describing one artifact source edge.""" step_id: str """Step id that contributed to this artifact.""" field: str | None = None """Optional output field or source label within the step payload.""" note: str | None = None """Optional human-readable provenance note."""
[docs] @dataclass(slots=True, frozen=True, kw_only=True) class WorkflowArtifact: """User-facing workflow artifact manifest entry.""" path: str """Filesystem path to the artifact.""" mime: str """MIME type for artifact consumers.""" title: str | None = None """Optional short artifact title for UIs.""" summary: str | None = None """Optional artifact summary for user-facing rendering.""" audience: str | None = None """Optional target audience label (for example ``user``).""" producer_step_id: str | None = None """Step id that produced the artifact when known.""" sources: tuple[WorkflowArtifactSource, ...] = () """Provenance entries describing source steps/fields.""" metadata: dict[str, object] = field(default_factory=dict) """Supplemental artifact metadata."""
[docs] def to_dict(self) -> dict[str, Any]: """Return a JSON-serializable dictionary representation. Returns: Dictionary representation of this artifact entry. """ return asdict(self)
type WorkflowArtifactsBuilder = Callable[ [Mapping[str, object]], Sequence[WorkflowArtifact | Mapping[str, object]], ] """Optional callback type for building user-facing artifact manifests from runtime step context.""" type ToolStepInputBuilder = Callable[[Mapping[str, object]], Mapping[str, object]] """Callback type for building tool input payloads from runtime step context.""" type DelegateStepPromptBuilder = Callable[[Mapping[str, object]], str] """Callback type for building delegate prompt strings from runtime step context.""" type ModelStepRequestBuilder = Callable[[Mapping[str, object]], LLMRequest] """Callback type for building model request payloads from runtime step context.""" type ModelStepResponseParser = Callable[ [LLMResponse, Mapping[str, object]], Mapping[str, object], ] """Optional callback type for parsing model responses into structured step output.""" type LogicStepHandler = Callable[[Mapping[str, object]], Mapping[str, object]] """Callback type for executing deterministic logic within a logic step from runtime step context.""" type MemoryReadQueryBuilder = Callable[[Mapping[str, object]], str | Mapping[str, object]] """Callback type for building memory read query text or payload from runtime step context.""" type MemoryWriteRecordsBuilder = Callable[ [Mapping[str, object]], Sequence[str | Mapping[str, object] | MemoryWriteRecord], ] """Callback type for building memory write record payloads from runtime step context.""" type LoopStepContinuePredicate = Callable[[int, Mapping[str, object]], bool] """Callback type for deciding whether to continue iterating the loop body based on iteration count and loop state.""" type LoopStepStateReducer = Callable[ [Mapping[str, object], ExecutionResult, int], Mapping[str, object], ] """Callback type for computing next loop state from prior state, iteration result, and iteration count.""" type LoopStepTerminationReason = Literal[ "condition_stopped", "max_iterations_reached", "iteration_failed", ] """Normalized 
termination reason strings for loop steps.""" # Keep this closed set stable so downstream analytics can aggregate loop outcomes # without handling free-form reason strings.
[docs] @dataclass(slots=True, frozen=True, kw_only=True) class ToolStep: """Workflow step that invokes one runtime tool.""" step_id: str """Unique step identifier used for dependency wiring and result lookup.""" tool_name: str """Registered tool name to invoke through the tool runtime.""" dependencies: tuple[str, ...] = () """Step ids that must complete before this step can run.""" input_data: Mapping[str, object] | None = None """Static input payload used when ``input_builder`` is not provided.""" input_builder: ToolStepInputBuilder | None = None """Optional callback that derives input payload from runtime step context.""" artifacts_builder: WorkflowArtifactsBuilder | None = None """Optional callback that extracts user-facing artifact manifests from step context."""
[docs] @dataclass(slots=True, frozen=True, kw_only=True) class DelegateStep: """Workflow step that invokes one direct delegate.""" step_id: str """Unique step identifier used for dependency wiring and result lookup.""" delegate: DelegateTarget """Direct delegate object (agent, pattern, or workflow-like runner).""" dependencies: tuple[str, ...] = () """Step ids that must complete before this step can run.""" prompt: str | None = None """Static prompt passed to the delegate when ``prompt_builder`` is absent.""" prompt_builder: DelegateStepPromptBuilder | None = None """Optional callback that derives a prompt string from runtime step context.""" artifacts_builder: WorkflowArtifactsBuilder | None = None """Optional callback that extracts user-facing artifact manifests from step context."""
[docs] @dataclass(slots=True, frozen=True, kw_only=True) class ModelStep: """Workflow step that executes one model request through an LLM client.""" step_id: str """Unique step identifier used for dependency wiring and result lookup.""" llm_client: LLMClient """LLM client used to execute the request built for this step.""" request_builder: ModelStepRequestBuilder """Callback that builds the ``LLMRequest`` payload from runtime context.""" dependencies: tuple[str, ...] = () """Step ids that must complete before this step can run.""" response_parser: ModelStepResponseParser | None = None """Optional callback that parses model response into structured output.""" artifacts_builder: WorkflowArtifactsBuilder | None = None """Optional callback that extracts user-facing artifact manifests from step context."""
[docs] @dataclass(slots=True, frozen=True, kw_only=True) class DelegateBatchCall: """One delegate call specification executed by ``DelegateBatchStep``.""" call_id: str """Unique call identifier within the batch.""" delegate: DelegateTarget """Delegate object invoked for this call.""" prompt: str """Prompt passed to the delegate for this call.""" execution_mode: WorkflowExecutionMode = "sequential" """Execution mode propagated when the delegate is workflow-like.""" failure_policy: WorkflowFailurePolicy = "skip_dependents" """Failure policy propagated when the delegate is workflow-like."""
type DelegateBatchCallsBuilder = Callable[ [Mapping[str, object]], Sequence[DelegateBatchCall | Mapping[str, object]], ]
[docs] @dataclass(slots=True, frozen=True, kw_only=True) class DelegateBatchStep: """Workflow step that executes multiple delegate invocations in sequence.""" step_id: str """Unique step identifier used for dependency wiring and result lookup.""" calls_builder: DelegateBatchCallsBuilder """Callback that builds batch delegate call specs from runtime context.""" dependencies: tuple[str, ...] = () """Step ids that must complete before this step can run.""" fail_fast: bool = True """Whether to stop executing additional calls after first failure.""" artifacts_builder: WorkflowArtifactsBuilder | None = None """Optional callback that extracts user-facing artifact manifests from step context."""
[docs] @dataclass(slots=True, frozen=True, kw_only=True) class LogicStep: """Workflow step that executes deterministic local logic.""" step_id: str """Unique step identifier used for dependency wiring and result lookup.""" handler: LogicStepHandler """Deterministic local function that computes this step output.""" dependencies: tuple[str, ...] = () """Step ids that must complete before this step can run.""" route_map: Mapping[str, tuple[str, ...]] | None = None """Optional route key to downstream-target mapping for conditional activation.""" artifacts_builder: WorkflowArtifactsBuilder | None = None """Optional callback that extracts user-facing artifact manifests from step context."""
[docs] @dataclass(slots=True, frozen=True, kw_only=True) class LoopStep: """Workflow step that executes an iterative nested workflow body.""" step_id: str """Unique step identifier used for dependency wiring and result lookup.""" steps: tuple[WorkflowStep, ...] """Static loop body steps executed for each iteration.""" dependencies: tuple[str, ...] = () """Step ids that must complete before loop iteration begins.""" max_iterations: int = 1 """Hard cap on the number of loop iterations.""" initial_state: Mapping[str, object] | None = None """Initial loop state mapping provided to iteration context.""" continue_predicate: LoopStepContinuePredicate | None = None """Predicate deciding whether to execute the next iteration.""" state_reducer: LoopStepStateReducer | None = None """Reducer that computes next loop state from prior state and iteration result.""" execution_mode: WorkflowExecutionMode = "sequential" """Execution mode used for nested loop-body workflow runs.""" failure_policy: WorkflowFailurePolicy = "skip_dependents" """Failure handling policy applied within each loop iteration run.""" artifacts_builder: WorkflowArtifactsBuilder | None = None """Optional callback that extracts user-facing artifact manifests from step context."""
[docs] @dataclass(slots=True, frozen=True, kw_only=True) class MemoryReadStep: """Workflow step that reads relevant records from the memory store.""" step_id: str """Unique step identifier used for dependency wiring and result lookup.""" query_builder: MemoryReadQueryBuilder """Callback that builds query text or query payload from step context.""" dependencies: tuple[str, ...] = () """Step ids that must complete before this step can run.""" namespace: str = "default" """Namespace partition to read from.""" top_k: int = 5 """Maximum number of records to return.""" min_score: float | None = None """Optional minimum score threshold for returned records.""" artifacts_builder: WorkflowArtifactsBuilder | None = None """Optional callback that extracts user-facing artifact manifests from step context."""
[docs] @dataclass(slots=True, frozen=True, kw_only=True) class MemoryWriteStep: """Workflow step that writes records into the memory store.""" step_id: str """Unique step identifier used for dependency wiring and result lookup.""" records_builder: MemoryWriteRecordsBuilder """Callback that builds record payloads from step context.""" dependencies: tuple[str, ...] = () """Step ids that must complete before this step can run.""" namespace: str = "default" """Namespace partition to write into.""" artifacts_builder: WorkflowArtifactsBuilder | None = None """Optional callback that extracts user-facing artifact manifests from step context."""
type WorkflowStep = ( ToolStep | DelegateStep | ModelStep | DelegateBatchStep | LogicStep | LoopStep | MemoryReadStep | MemoryWriteStep ) """Union type covering all supported workflow step variants for use in step sequences and delegate definitions."""
[docs] @dataclass(slots=True, frozen=True, kw_only=True) class WorkflowStepResult: """Result payload for one workflow step execution.""" step_id: str """Step id this result belongs to.""" status: WorkflowStepStatus """Execution status for the step.""" success: bool """True when step completed successfully.""" output: dict[str, object] = field(default_factory=dict) """Step output payload produced by the runtime.""" error: str | None = None """Human-readable error message when step fails.""" metadata: dict[str, object] = field(default_factory=dict) """Supplemental runtime metadata for diagnostics or tracing.""" artifacts: tuple[WorkflowArtifact, ...] = () """User-facing artifact manifests produced by this step.""" @property def final_output(self) -> object | None: """Return ``final_output`` value from ``output`` when present. Returns: Final output payload value, or ``None``. """ return self.output.get("final_output") @property def terminated_reason(self) -> str | None: """Return normalized step termination reason when present. Returns: Termination reason string, or ``None``. """ value = self.output.get("terminated_reason") return value if isinstance(value, str) else None
[docs] def output_value(self, key: str, default: object | None = None) -> object | None: """Return one output value by key with optional default. Args: key: Output key to read. default: Value returned when ``key`` is absent. Returns: Output value for ``key`` when present, else ``default``. """ return self.output.get(key, default)
[docs] def output_dict(self, key: str) -> dict[str, object]: """Return one output value normalized to a dictionary. Args: key: Output key to read. Returns: Dictionary value when the output value is mapping-like, else ``{}``. """ value = self.output.get(key) if isinstance(value, Mapping): return dict(value) return {}
[docs] def output_list(self, key: str) -> list[object]: """Return one output value normalized to a list. Args: key: Output key to read. Returns: List value when the output value is a list/tuple, else ``[]``. """ value = self.output.get(key) if isinstance(value, list): return list(value) if isinstance(value, tuple): return list(value) return []
[docs] def to_dict(self) -> dict[str, Any]: """Return a JSON-serializable dictionary representation. Returns: Dictionary representation of this step result. """ return asdict(self)
class WorkflowRunner(Protocol):
    """Structural type that workflow runtime implementations satisfy."""

    def run(
        self,
        steps: Sequence[WorkflowStep],
        *,
        context: Mapping[str, object] | None = None,
        execution_mode: WorkflowExecutionMode = "dag",
        failure_policy: WorkflowFailurePolicy = "skip_dependents",
        request_id: str | None = None,
        dependencies: Mapping[str, object] | None = None,
    ) -> ExecutionResult:
        """Run a workflow definition and hand back its aggregated outcome.

        Args:
            steps: Sequence of workflow steps to execute.
            context: Shared context mapping made available to step builders, if any.
            execution_mode: Global runtime scheduling mode (``dag``, for instance).
            failure_policy: Global behavior once a step fails.
            request_id: Request id used for tracing and downstream calls, if any.
            dependencies: Dependency payload mapping exposed to steps, if any.

        Returns:
            The aggregated workflow execution result.
        """
        ...