Workflow Runtime Loop Step
Source: examples/workflow/workflow_runtime_loop_step.py
Introduction
Tree of Thoughts and ReAct each motivate iterative reasoning with explicit state updates, and AutoGen provides a practical framing for orchestrating repeated loop actions. This example demonstrates loop-step execution in the workflow runtime, including bounded iteration behavior and trace emission.
Technical Implementation
Configure
Tracerwith JSONL + console output so each run emits machine-readable traces and lifecycle logs.Build the runtime surface (public APIs only) and execute
Workflow.run(...)with a fixedrequest_id.Capture structured outputs from runtime execution and preserve termination metadata for analysis.
Print a compact JSON payload including
trace_infofor deterministic tests and docs examples.
flowchart LR
A["Input prompt or scenario"] --> B["main(): runtime wiring"]
B --> C["Workflow.run(...)"]
C --> D["WorkflowRuntime schedules step graph (LogicStep, LoopStep)"]
C --> E["Tracer JSONL + console events"]
D --> F["ExecutionResult/payload"]
E --> F
F --> G["Printed JSON output"]
1from __future__ import annotations
2
3import json
4from collections.abc import Mapping
5from pathlib import Path
6
7from design_research_agents import LogicStep, LoopStep, Tracer, Workflow
8
9
10def _increment_handler(context: Mapping[str, object]) -> Mapping[str, object]:
11 loop_state = context.get("loop_state")
12 state_mapping = loop_state if isinstance(loop_state, Mapping) else {}
13 return {"counter": int(state_mapping.get("counter", 0)) + 1}
14
15
16def _snapshot_handler(context: Mapping[str, object]) -> Mapping[str, object]:
17 dependency_results = context.get("dependency_results")
18 if not isinstance(dependency_results, Mapping):
19 return {"counter": 0, "status": "looping"}
20 increment_result = dependency_results.get("increment")
21 if not isinstance(increment_result, Mapping):
22 return {"counter": 0, "status": "looping"}
23 increment_output = increment_result.get("output")
24 if not isinstance(increment_output, Mapping):
25 return {"counter": 0, "status": "looping"}
26 counter = int(increment_output.get("counter", 0))
27 return {
28 "counter": counter,
29 "status": "threshold_met" if counter >= 3 else "looping",
30 }
31
32
33def _state_reducer(
34 state: Mapping[str, object],
35 iteration_result: object,
36 iteration: int,
37) -> Mapping[str, object]:
38 del state, iteration
39 step_results = getattr(iteration_result, "step_results", {})
40 if not isinstance(step_results, Mapping):
41 return {"counter": 0}
42 increment = step_results.get("increment")
43 increment_output = getattr(increment, "output", {})
44 if not isinstance(increment_output, Mapping):
45 return {"counter": 0}
46 return {
47 "counter": int(increment_output.get("counter", 0)),
48 }
49
50
51def main() -> None:
52 """Run a small loop and print compact JSON summary."""
53 # Fixed request id keeps traces and docs output deterministic across runs.
54 request_id = "example-workflow-loop-design-001"
55 tracer = Tracer(
56 enabled=True,
57 trace_dir=Path("artifacts/examples/traces"),
58 enable_jsonl=True,
59 enable_console=True,
60 )
61 # Build and run the loop workflow using public runtime APIs.
62 workflow = Workflow(
63 tool_runtime=None,
64 input_schema={"type": "object"},
65 tracer=tracer,
66 steps=[
67 LoopStep(
68 step_id="design_counter_loop",
69 steps=(
70 LogicStep(step_id="increment", handler=_increment_handler),
71 LogicStep(
72 step_id="snapshot",
73 dependencies=("increment",),
74 handler=_snapshot_handler,
75 ),
76 ),
77 max_iterations=10,
78 initial_state={"counter": 0},
79 continue_predicate=lambda iteration, state: int(state.get("counter", 0)) < 3,
80 state_reducer=_state_reducer,
81 execution_mode="sequential",
82 failure_policy="skip_dependents",
83 )
84 ],
85 )
86
87 result = workflow.run({}, execution_mode="sequential", request_id=request_id)
88 # Print the results
89 summary = result.summary()
90 print(json.dumps(summary, ensure_ascii=True, indent=2, sort_keys=True))
91
92
93if __name__ == "__main__":
94 main()
Expected Results
Run Command
PYTHONPATH=src python3 examples/workflow/workflow_runtime_loop_step.py
Example output shape (values vary by run):
{
"success": true,
"final_output": "<example-specific payload>",
"terminated_reason": "<string-or-null>",
"error": null,
"trace": {
"request_id": "<request-id>",
"trace_dir": "artifacts/examples/traces",
"trace_path": "artifacts/examples/traces/run_<timestamp>_<request_id>.jsonl"
}
}