"""Workflow runtime contracts and typed step payloads."""
from __future__ import annotations
from collections.abc import Callable, Mapping, Sequence
from dataclasses import asdict, dataclass, field
from typing import Any, Literal, Protocol, runtime_checkable
from ._delegate import Delegate
from ._execution import ExecutionResult
from ._llm import LLMClient, LLMRequest, LLMResponse
from ._memory import MemoryWriteRecord
type WorkflowExecutionMode = Literal["sequential", "dag"]
"""Runtime execution mode options for workflow runs and nested loop runs."""
type WorkflowFailurePolicy = Literal["skip_dependents", "propagate_failed_state"]
"""Failure handling policies for workflow runs and nested loop runs."""
type WorkflowStepStatus = Literal["completed", "failed", "skipped"]
"""Normalized status strings for workflow step execution outcomes."""
@runtime_checkable
class DelegateRunner(Protocol):
    """Protocol for configured orchestration chunks with fixed step topology.

    The protocol is runtime-checkable, so ``isinstance(obj, DelegateRunner)``
    works. Such a check only verifies that a ``run`` attribute exists; it
    does not validate the method signature.
    """

    def run(
        self,
        *,
        context: Mapping[str, object] | None = None,
        execution_mode: WorkflowExecutionMode = "dag",
        failure_policy: WorkflowFailurePolicy = "skip_dependents",
        request_id: str | None = None,
        dependencies: Mapping[str, object] | None = None,
    ) -> ExecutionResult:
        """Execute the configured orchestration and return aggregated results.

        All parameters are keyword-only.

        Args:
            context: Optional shared context mapping available to step builders.
            execution_mode: Runtime scheduling mode (for example ``dag``).
            failure_policy: Failure behavior when a step fails.
            request_id: Optional request id used for tracing and downstream calls.
            dependencies: Optional dependency payload mapping exposed to steps.

        Returns:
            Aggregated workflow execution result.
        """
        ...
@runtime_checkable
class WorkflowDelegate(Protocol):
    """Protocol for raw ``Workflow`` objects used as delegates.

    The protocol is runtime-checkable, so ``isinstance(obj, WorkflowDelegate)``
    works. Such a check only verifies that a ``run`` attribute exists; it
    does not validate the method signature.
    """

    def run(
        self,
        input: str | Mapping[str, object] | None = None,
        *,
        execution_mode: WorkflowExecutionMode = "sequential",
        failure_policy: WorkflowFailurePolicy = "skip_dependents",
        request_id: str | None = None,
        dependencies: Mapping[str, object] | None = None,
    ) -> ExecutionResult:
        """Execute a workflow object and return one aggregate result.

        ``input`` may be passed positionally; every other parameter is
        keyword-only.

        Args:
            input: Optional workflow input payload.
            execution_mode: Runtime scheduling mode (for example ``dag``).
            failure_policy: Failure behavior when a step fails.
            request_id: Optional request id used for tracing and downstream calls.
            dependencies: Optional dependency payload mapping exposed to steps.

        Returns:
            Aggregated workflow execution result.
        """
        ...
# Delegate inputs are intentionally broad so callers can compose agents, workflow
# wrappers, and direct Workflow objects without adapter boilerplate.
type DelegateTarget = Delegate | DelegateRunner | WorkflowDelegate
"""Union type covering all supported delegate types for delegate steps and batch delegate calls."""
[docs]
@dataclass(slots=True, frozen=True, kw_only=True)
class WorkflowArtifactSource:
"""Provenance entry describing one artifact source edge."""
step_id: str
"""Step id that contributed to this artifact."""
field: str | None = None
"""Optional output field or source label within the step payload."""
note: str | None = None
"""Optional human-readable provenance note."""
[docs]
@dataclass(slots=True, frozen=True, kw_only=True)
class WorkflowArtifact:
"""User-facing workflow artifact manifest entry."""
path: str
"""Filesystem path to the artifact."""
mime: str
"""MIME type for artifact consumers."""
title: str | None = None
"""Optional short artifact title for UIs."""
summary: str | None = None
"""Optional artifact summary for user-facing rendering."""
audience: str | None = None
"""Optional target audience label (for example ``user``)."""
producer_step_id: str | None = None
"""Step id that produced the artifact when known."""
sources: tuple[WorkflowArtifactSource, ...] = ()
"""Provenance entries describing source steps/fields."""
metadata: dict[str, object] = field(default_factory=dict)
"""Supplemental artifact metadata."""
[docs]
def to_dict(self) -> dict[str, Any]:
"""Return a JSON-serializable dictionary representation.
Returns:
Dictionary representation of this artifact entry.
"""
return asdict(self)
type WorkflowArtifactsBuilder = Callable[
[Mapping[str, object]],
Sequence[WorkflowArtifact | Mapping[str, object]],
]
"""Optional callback type for building user-facing artifact manifests from runtime step context."""
type ToolStepInputBuilder = Callable[[Mapping[str, object]], Mapping[str, object]]
"""Callback type for building tool input payloads from runtime step context."""
type DelegateStepPromptBuilder = Callable[[Mapping[str, object]], str]
"""Callback type for building delegate prompt strings from runtime step context."""
type ModelStepRequestBuilder = Callable[[Mapping[str, object]], LLMRequest]
"""Callback type for building model request payloads from runtime step context."""
type ModelStepResponseParser = Callable[
[LLMResponse, Mapping[str, object]],
Mapping[str, object],
]
"""Optional callback type for parsing model responses into structured step output."""
type LogicStepHandler = Callable[[Mapping[str, object]], Mapping[str, object]]
"""Callback type for executing deterministic logic within a logic step from runtime step context."""
type MemoryReadQueryBuilder = Callable[[Mapping[str, object]], str | Mapping[str, object]]
"""Callback type for building memory read query text or payload from runtime step context."""
type MemoryWriteRecordsBuilder = Callable[
[Mapping[str, object]],
Sequence[str | Mapping[str, object] | MemoryWriteRecord],
]
"""Callback type for building memory write record payloads from runtime step context."""
type LoopStepContinuePredicate = Callable[[int, Mapping[str, object]], bool]
"""Callback type for deciding whether to continue iterating the loop body based on iteration count and loop state."""
type LoopStepStateReducer = Callable[
[Mapping[str, object], ExecutionResult, int],
Mapping[str, object],
]
"""Callback type for computing next loop state from prior state, iteration result, and iteration count."""
type LoopStepTerminationReason = Literal[
"condition_stopped",
"max_iterations_reached",
"iteration_failed",
]
"""Normalized termination reason strings for loop steps."""
# Keep this closed set stable so downstream analytics can aggregate loop outcomes
# without handling free-form reason strings.
[docs]
@dataclass(slots=True, frozen=True, kw_only=True)
class DelegateStep:
"""Workflow step that invokes one direct delegate."""
step_id: str
"""Unique step identifier used for dependency wiring and result lookup."""
delegate: DelegateTarget
"""Direct delegate object (agent, pattern, or workflow-like runner)."""
dependencies: tuple[str, ...] = ()
"""Step ids that must complete before this step can run."""
prompt: str | None = None
"""Static prompt passed to the delegate when ``prompt_builder`` is absent."""
prompt_builder: DelegateStepPromptBuilder | None = None
"""Optional callback that derives a prompt string from runtime step context."""
artifacts_builder: WorkflowArtifactsBuilder | None = None
"""Optional callback that extracts user-facing artifact manifests from step context."""
[docs]
@dataclass(slots=True, frozen=True, kw_only=True)
class ModelStep:
"""Workflow step that executes one model request through an LLM client."""
step_id: str
"""Unique step identifier used for dependency wiring and result lookup."""
llm_client: LLMClient
"""LLM client used to execute the request built for this step."""
request_builder: ModelStepRequestBuilder
"""Callback that builds the ``LLMRequest`` payload from runtime context."""
dependencies: tuple[str, ...] = ()
"""Step ids that must complete before this step can run."""
response_parser: ModelStepResponseParser | None = None
"""Optional callback that parses model response into structured output."""
artifacts_builder: WorkflowArtifactsBuilder | None = None
"""Optional callback that extracts user-facing artifact manifests from step context."""
[docs]
@dataclass(slots=True, frozen=True, kw_only=True)
class DelegateBatchCall:
"""One delegate call specification executed by ``DelegateBatchStep``."""
call_id: str
"""Unique call identifier within the batch."""
delegate: DelegateTarget
"""Delegate object invoked for this call."""
prompt: str
"""Prompt passed to the delegate for this call."""
execution_mode: WorkflowExecutionMode = "sequential"
"""Execution mode propagated when the delegate is workflow-like."""
failure_policy: WorkflowFailurePolicy = "skip_dependents"
"""Failure policy propagated when the delegate is workflow-like."""
type DelegateBatchCallsBuilder = Callable[
[Mapping[str, object]],
Sequence[DelegateBatchCall | Mapping[str, object]],
]
[docs]
@dataclass(slots=True, frozen=True, kw_only=True)
class DelegateBatchStep:
"""Workflow step that executes multiple delegate invocations in sequence."""
step_id: str
"""Unique step identifier used for dependency wiring and result lookup."""
calls_builder: DelegateBatchCallsBuilder
"""Callback that builds batch delegate call specs from runtime context."""
dependencies: tuple[str, ...] = ()
"""Step ids that must complete before this step can run."""
fail_fast: bool = True
"""Whether to stop executing additional calls after first failure."""
artifacts_builder: WorkflowArtifactsBuilder | None = None
"""Optional callback that extracts user-facing artifact manifests from step context."""
[docs]
@dataclass(slots=True, frozen=True, kw_only=True)
class LogicStep:
"""Workflow step that executes deterministic local logic."""
step_id: str
"""Unique step identifier used for dependency wiring and result lookup."""
handler: LogicStepHandler
"""Deterministic local function that computes this step output."""
dependencies: tuple[str, ...] = ()
"""Step ids that must complete before this step can run."""
route_map: Mapping[str, tuple[str, ...]] | None = None
"""Optional route key to downstream-target mapping for conditional activation."""
artifacts_builder: WorkflowArtifactsBuilder | None = None
"""Optional callback that extracts user-facing artifact manifests from step context."""
[docs]
@dataclass(slots=True, frozen=True, kw_only=True)
class LoopStep:
"""Workflow step that executes an iterative nested workflow body."""
step_id: str
"""Unique step identifier used for dependency wiring and result lookup."""
steps: tuple[WorkflowStep, ...]
"""Static loop body steps executed for each iteration."""
dependencies: tuple[str, ...] = ()
"""Step ids that must complete before loop iteration begins."""
max_iterations: int = 1
"""Hard cap on the number of loop iterations."""
initial_state: Mapping[str, object] | None = None
"""Initial loop state mapping provided to iteration context."""
continue_predicate: LoopStepContinuePredicate | None = None
"""Predicate deciding whether to execute the next iteration."""
state_reducer: LoopStepStateReducer | None = None
"""Reducer that computes next loop state from prior state and iteration result."""
execution_mode: WorkflowExecutionMode = "sequential"
"""Execution mode used for nested loop-body workflow runs."""
failure_policy: WorkflowFailurePolicy = "skip_dependents"
"""Failure handling policy applied within each loop iteration run."""
artifacts_builder: WorkflowArtifactsBuilder | None = None
"""Optional callback that extracts user-facing artifact manifests from step context."""
[docs]
@dataclass(slots=True, frozen=True, kw_only=True)
class MemoryReadStep:
"""Workflow step that reads relevant records from the memory store."""
step_id: str
"""Unique step identifier used for dependency wiring and result lookup."""
query_builder: MemoryReadQueryBuilder
"""Callback that builds query text or query payload from step context."""
dependencies: tuple[str, ...] = ()
"""Step ids that must complete before this step can run."""
namespace: str = "default"
"""Namespace partition to read from."""
top_k: int = 5
"""Maximum number of records to return."""
min_score: float | None = None
"""Optional minimum score threshold for returned records."""
artifacts_builder: WorkflowArtifactsBuilder | None = None
"""Optional callback that extracts user-facing artifact manifests from step context."""
[docs]
@dataclass(slots=True, frozen=True, kw_only=True)
class MemoryWriteStep:
"""Workflow step that writes records into the memory store."""
step_id: str
"""Unique step identifier used for dependency wiring and result lookup."""
records_builder: MemoryWriteRecordsBuilder
"""Callback that builds record payloads from step context."""
dependencies: tuple[str, ...] = ()
"""Step ids that must complete before this step can run."""
namespace: str = "default"
"""Namespace partition to write into."""
artifacts_builder: WorkflowArtifactsBuilder | None = None
"""Optional callback that extracts user-facing artifact manifests from step context."""
type WorkflowStep = (
ToolStep | DelegateStep | ModelStep | DelegateBatchStep | LogicStep | LoopStep | MemoryReadStep | MemoryWriteStep
)
"""Union type covering all supported workflow step variants for use in step sequences and delegate definitions."""
[docs]
@dataclass(slots=True, frozen=True, kw_only=True)
class WorkflowStepResult:
"""Result payload for one workflow step execution."""
step_id: str
"""Step id this result belongs to."""
status: WorkflowStepStatus
"""Execution status for the step."""
success: bool
"""True when step completed successfully."""
output: dict[str, object] = field(default_factory=dict)
"""Step output payload produced by the runtime."""
error: str | None = None
"""Human-readable error message when step fails."""
metadata: dict[str, object] = field(default_factory=dict)
"""Supplemental runtime metadata for diagnostics or tracing."""
artifacts: tuple[WorkflowArtifact, ...] = ()
"""User-facing artifact manifests produced by this step."""
@property
def final_output(self) -> object | None:
"""Return ``final_output`` value from ``output`` when present.
Returns:
Final output payload value, or ``None``.
"""
return self.output.get("final_output")
@property
def terminated_reason(self) -> str | None:
"""Return normalized step termination reason when present.
Returns:
Termination reason string, or ``None``.
"""
value = self.output.get("terminated_reason")
return value if isinstance(value, str) else None
[docs]
def output_value(self, key: str, default: object | None = None) -> object | None:
"""Return one output value by key with optional default.
Args:
key: Output key to read.
default: Value returned when ``key`` is absent.
Returns:
Output value for ``key`` when present, else ``default``.
"""
return self.output.get(key, default)
[docs]
def output_dict(self, key: str) -> dict[str, object]:
"""Return one output value normalized to a dictionary.
Args:
key: Output key to read.
Returns:
Dictionary value when the output value is mapping-like, else ``{}``.
"""
value = self.output.get(key)
if isinstance(value, Mapping):
return dict(value)
return {}
[docs]
def output_list(self, key: str) -> list[object]:
"""Return one output value normalized to a list.
Args:
key: Output key to read.
Returns:
List value when the output value is a list/tuple, else ``[]``.
"""
value = self.output.get(key)
if isinstance(value, list):
return list(value)
if isinstance(value, tuple):
return list(value)
return []
[docs]
def to_dict(self) -> dict[str, Any]:
"""Return a JSON-serializable dictionary representation.
Returns:
Dictionary representation of this step result.
"""
return asdict(self)
class WorkflowRunner(Protocol):
    """Protocol implemented by workflow runtime implementations."""

    def run(
        self,
        steps: Sequence[WorkflowStep],
        *,
        context: Mapping[str, object] | None = None,
        execution_mode: WorkflowExecutionMode = "dag",
        failure_policy: WorkflowFailurePolicy = "skip_dependents",
        request_id: str | None = None,
        dependencies: Mapping[str, object] | None = None,
    ) -> ExecutionResult:
        """Execute a workflow definition and return aggregated results.

        ``steps`` may be passed positionally; every other parameter is
        keyword-only.

        Args:
            steps: Workflow step sequence to execute.
            context: Optional shared context mapping available to step builders.
            execution_mode: Global runtime scheduling mode (for example ``dag``).
            failure_policy: Global failure behavior when a step fails.
            request_id: Optional request id used for tracing and downstream calls.
            dependencies: Optional dependency payload mapping exposed to steps.

        Returns:
            Aggregated workflow execution result.
        """
        ...