Workflow Delegate And Memory Steps

Source: examples/workflow/workflow_delegate_and_memory_steps.py

Introduction

Generative Agents and MemGPT both emphasize durable memory as a first-class runtime primitive, while AutoGen demonstrates delegation across specialized roles. This example composes delegate and memory steps in a single workflow so context propagation and role handoff remain explicit.

Technical Implementation

  1. Configure Tracer with JSONL + console output so each run emits machine-readable traces and lifecycle logs.

  2. Build the runtime surface (public APIs only) and execute Workflow.run(...) with a fixed request_id.

  3. Capture structured outputs from runtime execution and preserve termination metadata for analysis.

  4. Persist and query context via SQLiteMemoryStore to demonstrate memory-backed workflow behavior.

  5. Print a compact JSON payload including trace_info for deterministic tests and docs examples.

        flowchart LR
    A["Input prompt or scenario"] --> B["main(): runtime wiring"]
    B --> C["Workflow.run(...)"]
    C --> D["WorkflowRuntime schedules step graph (DelegateBatchStep, LogicStep, MemoryReadStep, MemoryWriteStep)"]
    C --> E["Tracer JSONL + console events"]
    D --> F["ExecutionResult/payload"]
    E --> F
    F --> G["Printed JSON output"]
    
  1from __future__ import annotations
  2
  3import json
  4from pathlib import Path
  5
  6from design_research_agents import (
  7    DelegateBatchStep,
  8    DirectLLMCall,
  9    LlamaCppServerLLMClient,
 10    LogicStep,
 11    MemoryReadStep,
 12    MemoryWriteStep,
 13    Tracer,
 14    Workflow,
 15)
 16from design_research_agents.memory import SQLiteMemoryStore
 17
 18
 19def main() -> None:
 20    """Execute memory and delegate-batch primitives in one traced workflow."""
 21    # Stable request ids keep workflow trace artifacts deterministic for docs snapshots.
 22    request_id = "example-workflow-delegate-memory-design-001"
 23    tracer = Tracer(
 24        enabled=True,
 25        trace_dir=Path("artifacts/examples/traces"),
 26        enable_jsonl=True,
 27        enable_console=True,
 28    )
 29    db_path = Path("artifacts/examples/workflow_delegate_and_memory.sqlite3")
 30    db_path.parent.mkdir(parents=True, exist_ok=True)
 31    # Reset persisted state so each example run starts from the same memory baseline.
 32    if db_path.exists():
 33        db_path.unlink()
 34
 35    # Run the workflow using public runtime surfaces. Using this with statement will automatically close the
 36    # memory store and managed client when the example is done.
 37    with (
 38        SQLiteMemoryStore(db_path=db_path) as store,
 39        LlamaCppServerLLMClient() as llm_client,
 40    ):
 41        # Two delegates share the same backend client to model role-specific prompts over one transport.
 42        manufacturing_peer = DirectLLMCall(llm_client=llm_client, tracer=tracer)
 43        reliability_peer = DirectLLMCall(llm_client=llm_client, tracer=tracer)
 44
 45        workflow = Workflow(
 46            tool_runtime=None,
 47            memory_store=store,
 48            tracer=tracer,
 49            input_schema={"type": "object"},
 50            steps=[
 51                MemoryWriteStep(
 52                    step_id="seed_constraints",
 53                    namespace="design_constraints",
 54                    # Seed memory first so downstream reads/delegates operate on explicit constraints.
 55                    records_builder=lambda _context: [
 56                        {
 57                            "content": "Constraint: reduce service time by at least 20 percent.",
 58                            "metadata": {"kind": "constraint"},
 59                        },
 60                        {
 61                            "content": "Constraint: preserve ingress protection sealing.",
 62                            "metadata": {"kind": "constraint"},
 63                        },
 64                    ],
 65                ),
 66                MemoryReadStep(
 67                    step_id="read_constraints",
 68                    namespace="design_constraints",
 69                    dependencies=("seed_constraints",),
 70                    top_k=5,
 71                    query_builder=lambda _context: {
 72                        "text": "service time constraint",
 73                        "metadata_filters": {"kind": "constraint"},
 74                    },
 75                ),
 76                DelegateBatchStep(
 77                    step_id="peer_batch",
 78                    dependencies=("read_constraints",),
 79                    fail_fast=False,
 80                    calls_builder=lambda context: [
 81                        {
 82                            "call_id": "manufacturing_peer",
 83                            "delegate": manufacturing_peer,
 84                            "prompt": (
 85                                "Propose manufacturing-friendly maintenance improvements using "
 86                                "retrieved constraints count="
 87                                f"{context['dependency_results']['read_constraints']['output']['count']}."
 88                            ),
 89                        },
 90                        {
 91                            "call_id": "reliability_peer",
 92                            "delegate": reliability_peer,
 93                            "prompt": "Propose reliability-focused maintenance improvements.",
 94                        },
 95                    ],
 96                ),
 97                LogicStep(
 98                    step_id="finalize",
 99                    dependencies=("read_constraints", "peer_batch"),
100                    handler=lambda context: {
101                        "constraints_found": (context["dependency_results"]["read_constraints"]["output"]["count"]),
102                        "delegate_calls": len(context["dependency_results"]["peer_batch"]["output"].get("results", [])),
103                        "final_delegate_output": (
104                            context["dependency_results"]["peer_batch"]["output"].get("final_output")
105                        ),
106                    },
107                ),
108            ],
109        )
110        result = workflow.run({}, request_id=request_id)
111
112    # Print the results
113    summary = result.summary()
114    print(json.dumps(summary, ensure_ascii=True, indent=2, sort_keys=True))
115
116
117if __name__ == "__main__":
118    main()

Expected Results

Run Command

PYTHONPATH=src python3 examples/workflow/workflow_delegate_and_memory_steps.py

Example output shape (values vary by run):

{
  "success": true,
  "final_output": "<example-specific payload>",
  "terminated_reason": "<string-or-null>",
  "error": null,
  "trace": {
    "request_id": "<request-id>",
    "trace_dir": "artifacts/examples/traces",
    "trace_path": "artifacts/examples/traces/run_<timestamp>_<request_id>.jsonl"
  }
}

References