Workflow Delegate And Memory Steps
Source: examples/workflow/workflow_delegate_and_memory_steps.py
Introduction
Generative Agents and MemGPT both emphasize durable memory as a first-class runtime primitive, while AutoGen demonstrates delegation across specialized roles. This example composes delegate and memory steps in a single workflow so context propagation and role handoff remain explicit.
Technical Implementation
Configure
Tracerwith JSONL + console output so each run emits machine-readable traces and lifecycle logs.Build the runtime surface (public APIs only) and execute
Workflow.run(...)with a fixedrequest_id.Capture structured outputs from runtime execution and preserve termination metadata for analysis.
Persist and query context via
SQLiteMemoryStoreto demonstrate memory-backed workflow behavior.Print a compact JSON payload including
trace_infofor deterministic tests and docs examples.
flowchart LR
A["Input prompt or scenario"] --> B["main(): runtime wiring"]
B --> C["Workflow.run(...)"]
C --> D["WorkflowRuntime schedules step graph (DelegateBatchStep, LogicStep, MemoryReadStep, MemoryWriteStep)"]
C --> E["Tracer JSONL + console events"]
D --> F["ExecutionResult/payload"]
E --> F
F --> G["Printed JSON output"]
1from __future__ import annotations
2
3import json
4from pathlib import Path
5
6from design_research_agents import (
7 DelegateBatchStep,
8 DirectLLMCall,
9 LlamaCppServerLLMClient,
10 LogicStep,
11 MemoryReadStep,
12 MemoryWriteStep,
13 Tracer,
14 Workflow,
15)
16from design_research_agents.memory import SQLiteMemoryStore
17
18
19def main() -> None:
20 """Execute memory and delegate-batch primitives in one traced workflow."""
21 # Stable request ids keep workflow trace artifacts deterministic for docs snapshots.
22 request_id = "example-workflow-delegate-memory-design-001"
23 tracer = Tracer(
24 enabled=True,
25 trace_dir=Path("artifacts/examples/traces"),
26 enable_jsonl=True,
27 enable_console=True,
28 )
29 db_path = Path("artifacts/examples/workflow_delegate_and_memory.sqlite3")
30 db_path.parent.mkdir(parents=True, exist_ok=True)
31 # Reset persisted state so each example run starts from the same memory baseline.
32 if db_path.exists():
33 db_path.unlink()
34
35 # Run the workflow using public runtime surfaces. Using this with statement will automatically close the
36 # memory store and managed client when the example is done.
37 with (
38 SQLiteMemoryStore(db_path=db_path) as store,
39 LlamaCppServerLLMClient() as llm_client,
40 ):
41 # Two delegates share the same backend client to model role-specific prompts over one transport.
42 manufacturing_peer = DirectLLMCall(llm_client=llm_client, tracer=tracer)
43 reliability_peer = DirectLLMCall(llm_client=llm_client, tracer=tracer)
44
45 workflow = Workflow(
46 tool_runtime=None,
47 memory_store=store,
48 tracer=tracer,
49 input_schema={"type": "object"},
50 steps=[
51 MemoryWriteStep(
52 step_id="seed_constraints",
53 namespace="design_constraints",
54 # Seed memory first so downstream reads/delegates operate on explicit constraints.
55 records_builder=lambda _context: [
56 {
57 "content": "Constraint: reduce service time by at least 20 percent.",
58 "metadata": {"kind": "constraint"},
59 },
60 {
61 "content": "Constraint: preserve ingress protection sealing.",
62 "metadata": {"kind": "constraint"},
63 },
64 ],
65 ),
66 MemoryReadStep(
67 step_id="read_constraints",
68 namespace="design_constraints",
69 dependencies=("seed_constraints",),
70 top_k=5,
71 query_builder=lambda _context: {
72 "text": "service time constraint",
73 "metadata_filters": {"kind": "constraint"},
74 },
75 ),
76 DelegateBatchStep(
77 step_id="peer_batch",
78 dependencies=("read_constraints",),
79 fail_fast=False,
80 calls_builder=lambda context: [
81 {
82 "call_id": "manufacturing_peer",
83 "delegate": manufacturing_peer,
84 "prompt": (
85 "Propose manufacturing-friendly maintenance improvements using "
86 "retrieved constraints count="
87 f"{context['dependency_results']['read_constraints']['output']['count']}."
88 ),
89 },
90 {
91 "call_id": "reliability_peer",
92 "delegate": reliability_peer,
93 "prompt": "Propose reliability-focused maintenance improvements.",
94 },
95 ],
96 ),
97 LogicStep(
98 step_id="finalize",
99 dependencies=("read_constraints", "peer_batch"),
100 handler=lambda context: {
101 "constraints_found": (context["dependency_results"]["read_constraints"]["output"]["count"]),
102 "delegate_calls": len(context["dependency_results"]["peer_batch"]["output"].get("results", [])),
103 "final_delegate_output": (
104 context["dependency_results"]["peer_batch"]["output"].get("final_output")
105 ),
106 },
107 ),
108 ],
109 )
110 result = workflow.run({}, request_id=request_id)
111
112 # Print the results
113 summary = result.summary()
114 print(json.dumps(summary, ensure_ascii=True, indent=2, sort_keys=True))
115
116
117if __name__ == "__main__":
118 main()
Expected Results
Run Command
PYTHONPATH=src python3 examples/workflow/workflow_delegate_and_memory_steps.py
Example output shape (values vary by run):
{
"success": true,
"final_output": "<example-specific payload>",
"terminated_reason": "<string-or-null>",
"error": null,
"trace": {
"request_id": "<request-id>",
"trace_dir": "artifacts/examples/traces",
"trace_path": "artifacts/examples/traces/run_<timestamp>_<request_id>.jsonl"
}
}