Workflow Delegate and Memory Steps

Source: examples/workflow/workflow_delegate_and_memory_steps.py

Introduction

Generative Agents and MemGPT both emphasize durable memory as a first-class runtime primitive, while AutoGen demonstrates delegation across specialized roles. This example composes delegate and memory steps in a single workflow so context propagation and role handoff remain explicit.

Technical Implementation

  1. Configure Tracer with JSONL + console output so each run emits machine-readable traces and lifecycle logs.

  2. Build the runtime surface (public APIs only) and execute Workflow.run(...) with a fixed request_id.

  3. Capture structured outputs from runtime execution and preserve termination metadata for analysis.

  4. Persist and query context via SQLiteMemoryStore to demonstrate memory-backed workflow behavior.

  5. Print the run summary as indented, key-sorted JSON, including its trace metadata, for deterministic tests and docs examples.

The diagram below is generated from the example’s configured Workflow.

    flowchart LR
    workflow_entry["Workflow Entrypoint"]
    step_1["seed_constraints<br/>MemoryWriteStep<br/>namespace=design_constraints"]
    step_2["read_constraints<br/>MemoryReadStep<br/>namespace=design_constraints"]
    step_3["peer_batch<br/>DelegateBatchStep<br/>batch delegate calls"]
    step_4["finalize<br/>LogicStep"]
    workflow_entry --> step_1
    step_1 --> step_2
    step_2 --> step_3
    step_2 --> step_4
    step_3 --> step_4
    
  1from __future__ import annotations
  2
  3import json
  4from pathlib import Path
  5
  6import design_research_agents as drag
  7from design_research_agents.memory import SQLiteMemoryStore
  8
  9WORKFLOW_DIAGRAM_DIRECTION = "LR"
 10
 11
 12class _DocDelegate:
 13    """Minimal delegate stub used only for docs-diagram workflow construction."""
 14
 15    def run(self, prompt: str, *, request_id: str | None = None, dependencies: object | None = None) -> object:
 16        del prompt, request_id, dependencies
 17        raise RuntimeError("Docs-only delegate stub should not be executed.")
 18
 19
 20def build_example_workflow(
 21    *,
 22    tracer: drag.Tracer | None = None,
 23    memory_store: object | None = None,
 24    manufacturing_peer: object | None = None,
 25    reliability_peer: object | None = None,
 26) -> drag.Workflow:
 27    """Build the delegate-memory workflow used for docs diagrams and runtime execution."""
 28    resolved_manufacturing_peer = manufacturing_peer or _DocDelegate()
 29    resolved_reliability_peer = reliability_peer or _DocDelegate()
 30    return drag.Workflow(
 31        tool_runtime=None,
 32        memory_store=memory_store,
 33        tracer=tracer,
 34        input_schema={"type": "object"},
 35        steps=[
 36            drag.MemoryWriteStep(
 37                step_id="seed_constraints",
 38                namespace="design_constraints",
 39                records_builder=lambda _context: [
 40                    {
 41                        "content": "Constraint: reduce service time by at least 20 percent.",
 42                        "metadata": {"kind": "constraint"},
 43                    },
 44                    {
 45                        "content": "Constraint: preserve ingress protection sealing.",
 46                        "metadata": {"kind": "constraint"},
 47                    },
 48                ],
 49            ),
 50            drag.MemoryReadStep(
 51                step_id="read_constraints",
 52                namespace="design_constraints",
 53                dependencies=("seed_constraints",),
 54                top_k=5,
 55                query_builder=lambda _context: {
 56                    "text": "service time constraint",
 57                    "metadata_filters": {"kind": "constraint"},
 58                },
 59            ),
 60            drag.DelegateBatchStep(
 61                step_id="peer_batch",
 62                dependencies=("read_constraints",),
 63                fail_fast=False,
 64                calls_builder=lambda context: [
 65                    {
 66                        "call_id": "manufacturing_peer",
 67                        "delegate": resolved_manufacturing_peer,
 68                        "prompt": (
 69                            "Propose manufacturing-friendly maintenance improvements using "
 70                            "retrieved constraints count="
 71                            f"{context['dependency_results']['read_constraints']['output']['count']}."
 72                        ),
 73                    },
 74                    {
 75                        "call_id": "reliability_peer",
 76                        "delegate": resolved_reliability_peer,
 77                        "prompt": "Propose reliability-focused maintenance improvements.",
 78                    },
 79                ],
 80            ),
 81            drag.LogicStep(
 82                step_id="finalize",
 83                dependencies=("read_constraints", "peer_batch"),
 84                handler=lambda context: {
 85                    "constraints_found": (context["dependency_results"]["read_constraints"]["output"]["count"]),
 86                    "delegate_calls": len(context["dependency_results"]["peer_batch"]["output"].get("results", [])),
 87                    "final_delegate_output": (
 88                        context["dependency_results"]["peer_batch"]["output"].get("final_output")
 89                    ),
 90                },
 91            ),
 92        ],
 93    )
 94
 95
 96def main() -> None:
 97    """Execute memory and delegate-batch primitives in one traced workflow."""
 98    # Stable request ids keep workflow trace artifacts deterministic for docs snapshots.
 99    request_id = "example-workflow-delegate-memory-design-001"
100    tracer = drag.Tracer(
101        enabled=True,
102        trace_dir=Path("artifacts/examples/traces"),
103        enable_jsonl=True,
104        enable_console=True,
105    )
106    db_path = Path("artifacts/examples/workflow_delegate_and_memory.sqlite3")
107    db_path.parent.mkdir(parents=True, exist_ok=True)
108    # Reset persisted state so each example run starts from the same memory baseline.
109    if db_path.exists():
110        db_path.unlink()
111
112    # Run the workflow using public runtime surfaces. Using this with statement will automatically close the
113    # memory store and managed client when the example is done.
114    with (
115        SQLiteMemoryStore(db_path=db_path) as store,
116        drag.LlamaCppServerLLMClient() as llm_client,
117    ):
118        # Two delegates share the same backend client to model role-specific prompts over one transport.
119        manufacturing_peer = drag.DirectLLMCall(llm_client=llm_client, tracer=tracer)
120        reliability_peer = drag.DirectLLMCall(llm_client=llm_client, tracer=tracer)
121
122        workflow = build_example_workflow(
123            tracer=tracer,
124            memory_store=store,
125            manufacturing_peer=manufacturing_peer,
126            reliability_peer=reliability_peer,
127        )
128        result = workflow.run({}, request_id=request_id)
129
130    # Print the results
131    summary = result.summary()
132    print(json.dumps(summary, ensure_ascii=True, indent=2, sort_keys=True))
133
134
135if __name__ == "__main__":
136    main()

Expected Results

Run Command

PYTHONPATH=src python3 examples/workflow/workflow_delegate_and_memory_steps.py

Example output shape (values vary by run):

{
  "success": true,
  "final_output": "<example-specific payload>",
  "terminated_reason": "<string-or-null>",
  "error": null,
  "trace": {
    "request_id": "<request-id>",
    "trace_dir": "artifacts/examples/traces",
    "trace_path": "artifacts/examples/traces/run_<timestamp>_<request_id>.jsonl"
  }
}

References