Workflow Runtime Loop Step

Source: examples/workflow/workflow_runtime_loop_step.py

Introduction

Tree of Thoughts and ReAct each motivate iterative reasoning with explicit state updates, and AutoGen provides a practical framing for orchestrating repeated loop actions. This example demonstrates loop-step execution in the workflow runtime, including bounded iteration behavior and trace emission.

Technical Implementation

  1. Configure Tracer with JSONL + console output so each run emits machine-readable traces and lifecycle logs.

  2. Build the runtime surface (public APIs only) and execute Workflow.run(...) with a fixed request_id.

  3. Capture structured outputs from runtime execution and preserve termination metadata for analysis.

  4. Print a compact JSON payload including trace_info for deterministic tests and docs examples.

        flowchart LR
    A["Input prompt or scenario"] --> B["main(): runtime wiring"]
    B --> C["Workflow.run(...)"]
    C --> D["WorkflowRuntime schedules step graph (LogicStep, LoopStep)"]
    C --> E["Tracer JSONL + console events"]
    D --> F["ExecutionResult/payload"]
    E --> F
    F --> G["Printed JSON output"]
    
 1from __future__ import annotations
 2
 3import json
 4from collections.abc import Mapping
 5from pathlib import Path
 6
 7from design_research_agents import LogicStep, LoopStep, Tracer, Workflow
 8
 9
10def _increment_handler(context: Mapping[str, object]) -> Mapping[str, object]:
11    loop_state = context.get("loop_state")
12    state_mapping = loop_state if isinstance(loop_state, Mapping) else {}
13    return {"counter": int(state_mapping.get("counter", 0)) + 1}
14
15
16def _snapshot_handler(context: Mapping[str, object]) -> Mapping[str, object]:
17    dependency_results = context.get("dependency_results")
18    if not isinstance(dependency_results, Mapping):
19        return {"counter": 0, "status": "looping"}
20    increment_result = dependency_results.get("increment")
21    if not isinstance(increment_result, Mapping):
22        return {"counter": 0, "status": "looping"}
23    increment_output = increment_result.get("output")
24    if not isinstance(increment_output, Mapping):
25        return {"counter": 0, "status": "looping"}
26    counter = int(increment_output.get("counter", 0))
27    return {
28        "counter": counter,
29        "status": "threshold_met" if counter >= 3 else "looping",
30    }
31
32
33def _state_reducer(
34    state: Mapping[str, object],
35    iteration_result: object,
36    iteration: int,
37) -> Mapping[str, object]:
38    del state, iteration
39    step_results = getattr(iteration_result, "step_results", {})
40    if not isinstance(step_results, Mapping):
41        return {"counter": 0}
42    increment = step_results.get("increment")
43    increment_output = getattr(increment, "output", {})
44    if not isinstance(increment_output, Mapping):
45        return {"counter": 0}
46    return {
47        "counter": int(increment_output.get("counter", 0)),
48    }
49
50
51def main() -> None:
52    """Run a small loop and print compact JSON summary."""
53    # Fixed request id keeps traces and docs output deterministic across runs.
54    request_id = "example-workflow-loop-design-001"
55    tracer = Tracer(
56        enabled=True,
57        trace_dir=Path("artifacts/examples/traces"),
58        enable_jsonl=True,
59        enable_console=True,
60    )
61    # Build and run the loop workflow using public runtime APIs.
62    workflow = Workflow(
63        tool_runtime=None,
64        input_schema={"type": "object"},
65        tracer=tracer,
66        steps=[
67            LoopStep(
68                step_id="design_counter_loop",
69                steps=(
70                    LogicStep(step_id="increment", handler=_increment_handler),
71                    LogicStep(
72                        step_id="snapshot",
73                        dependencies=("increment",),
74                        handler=_snapshot_handler,
75                    ),
76                ),
77                max_iterations=10,
78                initial_state={"counter": 0},
79                continue_predicate=lambda iteration, state: int(state.get("counter", 0)) < 3,
80                state_reducer=_state_reducer,
81                execution_mode="sequential",
82                failure_policy="skip_dependents",
83            )
84        ],
85    )
86
87    result = workflow.run({}, execution_mode="sequential", request_id=request_id)
88    # Print the results
89    summary = result.summary()
90    print(json.dumps(summary, ensure_ascii=True, indent=2, sort_keys=True))
91
92
93if __name__ == "__main__":
94    main()

Expected Results

Run Command

PYTHONPATH=src python3 examples/workflow/workflow_runtime_loop_step.py

Example output shape (values vary by run):

{
  "success": true,
  "final_output": "<example-specific payload>",
  "terminated_reason": "<string-or-null>",
  "error": null,
  "trace": {
    "request_id": "<request-id>",
    "trace_dir": "artifacts/examples/traces",
    "trace_path": "artifacts/examples/traces/run_<timestamp>_<request_id>.jsonl"
  }
}

References