Workflow Runtime Loop Step

Source: examples/workflow/workflow_runtime_loop_step.py

Introduction

Tree of Thoughts and ReAct each motivate iterative reasoning with explicit state updates, and AutoGen provides a practical framing for orchestrating repeated loop actions. This example demonstrates loop-step execution in the workflow runtime, including bounded iteration behavior and trace emission.

Technical Implementation

  1. Configure Tracer with JSONL + console output so each run emits machine-readable traces and lifecycle logs.

  2. Build the runtime surface (public APIs only) and execute Workflow.run(...) with a fixed request_id.

  3. Capture structured outputs from runtime execution and preserve termination metadata for analysis.

  4. Print a compact JSON payload including trace_info for deterministic tests and docs examples.

The diagram below is generated from the example’s configured Workflow.

flowchart LR
    workflow_entry["Workflow Entrypoint"]
    step_1["design_counter_loop<br/>LoopStep<br/>max_iterations=10"]
    subgraph loop_body_1["Loop Body: design_counter_loop"]
        direction TD
        loop_entry_1["design_counter_loop iteration entry"]
        step_2["design_counter_loop::increment<br/>LogicStep"]
        step_3["design_counter_loop::snapshot<br/>LogicStep"]
        loop_entry_1 --> step_2
        step_2 --> step_3
        step_3 -. "next iteration" .-> loop_entry_1
    end
    workflow_entry --> step_1
    step_1 -. "iterate" .-> loop_entry_1
    
  1from __future__ import annotations
  2
  3import json
  4from collections.abc import Mapping
  5from pathlib import Path
  6
  7import design_research_agents as drag
  8
  9WORKFLOW_DIAGRAM_DIRECTION = "LR"
 10
 11
 12def _increment_handler(context: Mapping[str, object]) -> Mapping[str, object]:
 13    loop_state = context.get("loop_state")
 14    state_mapping = loop_state if isinstance(loop_state, Mapping) else {}
 15    return {"counter": int(state_mapping.get("counter", 0)) + 1}
 16
 17
 18def _snapshot_handler(context: Mapping[str, object]) -> Mapping[str, object]:
 19    dependency_results = context.get("dependency_results")
 20    if not isinstance(dependency_results, Mapping):
 21        return {"counter": 0, "status": "looping"}
 22    increment_result = dependency_results.get("increment")
 23    if not isinstance(increment_result, Mapping):
 24        return {"counter": 0, "status": "looping"}
 25    increment_output = increment_result.get("output")
 26    if not isinstance(increment_output, Mapping):
 27        return {"counter": 0, "status": "looping"}
 28    counter = int(increment_output.get("counter", 0))
 29    return {
 30        "counter": counter,
 31        "status": "threshold_met" if counter >= 3 else "looping",
 32    }
 33
 34
 35def _state_reducer(
 36    state: Mapping[str, object],
 37    iteration_result: object,
 38    iteration: int,
 39) -> Mapping[str, object]:
 40    del state, iteration
 41    step_results = getattr(iteration_result, "step_results", {})
 42    if not isinstance(step_results, Mapping):
 43        return {"counter": 0}
 44    increment = step_results.get("increment")
 45    increment_output = getattr(increment, "output", {})
 46    if not isinstance(increment_output, Mapping):
 47        return {"counter": 0}
 48    return {
 49        "counter": int(increment_output.get("counter", 0)),
 50    }
 51
 52
 53def build_example_workflow(*, tracer: drag.Tracer | None = None) -> drag.Workflow:
 54    """Build the bounded loop workflow used for runtime illustration and docs diagrams."""
 55    return drag.Workflow(
 56        tool_runtime=None,
 57        input_schema={"type": "object"},
 58        tracer=tracer,
 59        steps=[
 60            drag.LoopStep(
 61                step_id="design_counter_loop",
 62                steps=(
 63                    drag.LogicStep(step_id="increment", handler=_increment_handler),
 64                    drag.LogicStep(
 65                        step_id="snapshot",
 66                        dependencies=("increment",),
 67                        handler=_snapshot_handler,
 68                    ),
 69                ),
 70                max_iterations=10,
 71                initial_state={"counter": 0},
 72                continue_predicate=lambda iteration, state: int(state.get("counter", 0)) < 3,
 73                state_reducer=_state_reducer,
 74                execution_mode="sequential",
 75                failure_policy="skip_dependents",
 76            )
 77        ],
 78    )
 79
 80
 81def main() -> None:
 82    """Run a small loop and print compact JSON summary."""
 83    # Fixed request id keeps traces and docs output deterministic across runs.
 84    request_id = "example-workflow-loop-design-001"
 85    tracer = drag.Tracer(
 86        enabled=True,
 87        trace_dir=Path("artifacts/examples/traces"),
 88        enable_jsonl=True,
 89        enable_console=True,
 90    )
 91    # Build and run the loop workflow using public runtime APIs.
 92    workflow = build_example_workflow(tracer=tracer)
 93
 94    result = workflow.run({}, execution_mode="sequential", request_id=request_id)
 95    # Print the results
 96    summary = result.summary()
 97    print(json.dumps(summary, ensure_ascii=True, indent=2, sort_keys=True))
 98
 99
100if __name__ == "__main__":
101    main()

Expected Results

Run Command

PYTHONPATH=src python3 examples/workflow/workflow_runtime_loop_step.py

Example output shape (values vary by run):

{
  "success": true,
  "final_output": "<example-specific payload>",
  "terminated_reason": "<string-or-null>",
  "error": null,
  "trace": {
    "request_id": "<request-id>",
    "trace_dir": "artifacts/examples/traces",
    "trace_path": "artifacts/examples/traces/run_<timestamp>_<request_id>.jsonl"
  }
}

References