Workflow Runtime Loop Step#
Source: examples/workflow/workflow_runtime_loop_step.py
Introduction#
Tree of Thoughts and ReAct each motivate iterative reasoning with explicit state updates, and AutoGen provides a practical framing for orchestrating repeated loop actions. This example demonstrates loop-step execution in the workflow runtime, including bounded iteration behavior and trace emission.
Technical Implementation#
Configure
Tracerwith JSONL + console output so each run emits machine-readable traces and lifecycle logs.Build the runtime surface (public APIs only) and execute
Workflow.run(...)with a fixedrequest_id.Capture structured outputs from runtime execution and preserve termination metadata for analysis.
Print a compact JSON payload including
trace_infofor deterministic tests and docs examples.
The diagram below is generated from the example’s configured Workflow.
flowchart LR
workflow_entry["Workflow Entrypoint"]
step_1["design_counter_loop<br/>LoopStep<br/>max_iterations=10"]
subgraph loop_body_1["Loop Body: design_counter_loop"]
direction TD
loop_entry_1["design_counter_loop iteration entry"]
step_2["design_counter_loop::increment<br/>LogicStep"]
step_3["design_counter_loop::snapshot<br/>LogicStep"]
loop_entry_1 --> step_2
step_2 --> step_3
step_3 -. "next iteration" .-> loop_entry_1
end
workflow_entry --> step_1
step_1 -. "iterate" .-> loop_entry_1
1from __future__ import annotations
2
3import json
4from collections.abc import Mapping
5from pathlib import Path
6
7import design_research_agents as drag
8
9WORKFLOW_DIAGRAM_DIRECTION = "LR"
10
11
12def _increment_handler(context: Mapping[str, object]) -> Mapping[str, object]:
13 loop_state = context.get("loop_state")
14 state_mapping = loop_state if isinstance(loop_state, Mapping) else {}
15 return {"counter": int(state_mapping.get("counter", 0)) + 1}
16
17
18def _snapshot_handler(context: Mapping[str, object]) -> Mapping[str, object]:
19 dependency_results = context.get("dependency_results")
20 if not isinstance(dependency_results, Mapping):
21 return {"counter": 0, "status": "looping"}
22 increment_result = dependency_results.get("increment")
23 if not isinstance(increment_result, Mapping):
24 return {"counter": 0, "status": "looping"}
25 increment_output = increment_result.get("output")
26 if not isinstance(increment_output, Mapping):
27 return {"counter": 0, "status": "looping"}
28 counter = int(increment_output.get("counter", 0))
29 return {
30 "counter": counter,
31 "status": "threshold_met" if counter >= 3 else "looping",
32 }
33
34
35def _state_reducer(
36 state: Mapping[str, object],
37 iteration_result: object,
38 iteration: int,
39) -> Mapping[str, object]:
40 del state, iteration
41 step_results = getattr(iteration_result, "step_results", {})
42 if not isinstance(step_results, Mapping):
43 return {"counter": 0}
44 increment = step_results.get("increment")
45 increment_output = getattr(increment, "output", {})
46 if not isinstance(increment_output, Mapping):
47 return {"counter": 0}
48 return {
49 "counter": int(increment_output.get("counter", 0)),
50 }
51
52
53def build_example_workflow(*, tracer: drag.Tracer | None = None) -> drag.Workflow:
54 """Build the bounded loop workflow used for runtime illustration and docs diagrams."""
55 return drag.Workflow(
56 tool_runtime=None,
57 input_schema={"type": "object"},
58 tracer=tracer,
59 steps=[
60 drag.LoopStep(
61 step_id="design_counter_loop",
62 steps=(
63 drag.LogicStep(step_id="increment", handler=_increment_handler),
64 drag.LogicStep(
65 step_id="snapshot",
66 dependencies=("increment",),
67 handler=_snapshot_handler,
68 ),
69 ),
70 max_iterations=10,
71 initial_state={"counter": 0},
72 continue_predicate=lambda iteration, state: int(state.get("counter", 0)) < 3,
73 state_reducer=_state_reducer,
74 execution_mode="sequential",
75 failure_policy="skip_dependents",
76 )
77 ],
78 )
79
80
81def main() -> None:
82 """Run a small loop and print compact JSON summary."""
83 # Fixed request id keeps traces and docs output deterministic across runs.
84 request_id = "example-workflow-loop-design-001"
85 tracer = drag.Tracer(
86 enabled=True,
87 trace_dir=Path("artifacts/examples/traces"),
88 enable_jsonl=True,
89 enable_console=True,
90 )
91 # Build and run the loop workflow using public runtime APIs.
92 workflow = build_example_workflow(tracer=tracer)
93
94 result = workflow.run({}, execution_mode="sequential", request_id=request_id)
95 # Print the results
96 summary = result.summary()
97 print(json.dumps(summary, ensure_ascii=True, indent=2, sort_keys=True))
98
99
100if __name__ == "__main__":
101 main()
Expected Results#
Run Command
PYTHONPATH=src python3 examples/workflow/workflow_runtime_loop_step.py
Example output shape (values vary by run):
{
"success": true,
"final_output": "<example-specific payload>",
"terminated_reason": "<string-or-null>",
"error": null,
"trace": {
"request_id": "<request-id>",
"trace_dir": "artifacts/examples/traces",
"trace_path": "artifacts/examples/traces/run_<timestamp>_<request_id>.jsonl"
}
}