Workflow Delegate And Memory Steps#
Source: examples/workflow/workflow_delegate_and_memory_steps.py
Introduction#
Generative Agents and MemGPT both emphasize durable memory as a first-class runtime primitive, while AutoGen demonstrates delegation across specialized roles. This example composes delegate and memory steps in a single workflow so context propagation and role handoff remain explicit.
Technical Implementation#
- Configure `Tracer` with JSONL + console output so each run emits machine-readable traces and lifecycle logs.
- Build the runtime surface (public APIs only) and execute `Workflow.run(...)` with a fixed `request_id`.
- Capture structured outputs from runtime execution and preserve termination metadata for analysis.
- Persist and query context via `SQLiteMemoryStore` to demonstrate memory-backed workflow behavior.
- Print a compact JSON payload including `trace_info` for deterministic tests and docs examples.
The diagram below is generated from the example’s configured Workflow.
flowchart LR
workflow_entry["Workflow Entrypoint"]
step_1["seed_constraints<br/>MemoryWriteStep<br/>namespace=design_constraints"]
step_2["read_constraints<br/>MemoryReadStep<br/>namespace=design_constraints"]
step_3["peer_batch<br/>DelegateBatchStep<br/>batch delegate calls"]
step_4["finalize<br/>LogicStep"]
workflow_entry --> step_1
step_1 --> step_2
step_2 --> step_3
step_2 --> step_4
step_3 --> step_4
from __future__ import annotations

import json
from pathlib import Path

import design_research_agents as drag
from design_research_agents.memory import SQLiteMemoryStore

# Direction hint ("LR" = left-to-right) consumed by the docs tooling when
# rendering this example's Mermaid workflow diagram.
WORKFLOW_DIAGRAM_DIRECTION = "LR"
10
11
12class _DocDelegate:
13 """Minimal delegate stub used only for docs-diagram workflow construction."""
14
15 def run(self, prompt: str, *, request_id: str | None = None, dependencies: object | None = None) -> object:
16 del prompt, request_id, dependencies
17 raise RuntimeError("Docs-only delegate stub should not be executed.")
18
19
def build_example_workflow(
    *,
    tracer: drag.Tracer | None = None,
    memory_store: object | None = None,
    manufacturing_peer: object | None = None,
    reliability_peer: object | None = None,
) -> drag.Workflow:
    """Assemble the delegate-plus-memory workflow shared by docs diagrams and runtime runs.

    Args:
        tracer: Optional tracer; ``None`` leaves tracing unconfigured.
        memory_store: Backing store used by the memory read/write steps.
        manufacturing_peer: Delegate for the manufacturing prompt; a docs-only
            stub is substituted when omitted.
        reliability_peer: Delegate for the reliability prompt; a docs-only
            stub is substituted when omitted.

    Returns:
        A ``drag.Workflow`` wiring seed -> read -> batch delegate -> finalize.
    """
    # Fall back to inert stubs so the workflow graph can be built (e.g. for
    # diagram generation) without any live delegates.
    mfg_delegate = manufacturing_peer or _DocDelegate()
    rel_delegate = reliability_peer or _DocDelegate()

    # Step 1: persist two fixed design constraints into the memory namespace.
    seed_step = drag.MemoryWriteStep(
        step_id="seed_constraints",
        namespace="design_constraints",
        records_builder=lambda _context: [
            {
                "content": "Constraint: reduce service time by at least 20 percent.",
                "metadata": {"kind": "constraint"},
            },
            {
                "content": "Constraint: preserve ingress protection sealing.",
                "metadata": {"kind": "constraint"},
            },
        ],
    )

    # Step 2: query the same namespace back, filtered to constraint records.
    read_step = drag.MemoryReadStep(
        step_id="read_constraints",
        namespace="design_constraints",
        dependencies=("seed_constraints",),
        top_k=5,
        query_builder=lambda _context: {
            "text": "service time constraint",
            "metadata_filters": {"kind": "constraint"},
        },
    )

    # Step 3: fan out to both role-specific delegates; fail_fast=False lets
    # the second call proceed even if the first errors.
    batch_step = drag.DelegateBatchStep(
        step_id="peer_batch",
        dependencies=("read_constraints",),
        fail_fast=False,
        calls_builder=lambda context: [
            {
                "call_id": "manufacturing_peer",
                "delegate": mfg_delegate,
                "prompt": (
                    "Propose manufacturing-friendly maintenance improvements using "
                    "retrieved constraints count="
                    f"{context['dependency_results']['read_constraints']['output']['count']}."
                ),
            },
            {
                "call_id": "reliability_peer",
                "delegate": rel_delegate,
                "prompt": "Propose reliability-focused maintenance improvements.",
            },
        ],
    )

    # Step 4: condense upstream outputs into a compact summary payload.
    finalize_step = drag.LogicStep(
        step_id="finalize",
        dependencies=("read_constraints", "peer_batch"),
        handler=lambda context: {
            "constraints_found": context["dependency_results"]["read_constraints"]["output"]["count"],
            "delegate_calls": len(context["dependency_results"]["peer_batch"]["output"].get("results", [])),
            "final_delegate_output": context["dependency_results"]["peer_batch"]["output"].get("final_output"),
        },
    )

    return drag.Workflow(
        tool_runtime=None,
        memory_store=memory_store,
        tracer=tracer,
        input_schema={"type": "object"},
        steps=[seed_step, read_step, batch_step, finalize_step],
    )
94
95
def main() -> None:
    """Run the memory + delegate-batch workflow once and print a JSON summary."""
    # A fixed request id keeps the emitted trace artifacts deterministic,
    # which the docs snapshots rely on.
    request_id = "example-workflow-delegate-memory-design-001"
    tracer = drag.Tracer(
        enabled=True,
        trace_dir=Path("artifacts/examples/traces"),
        enable_jsonl=True,
        enable_console=True,
    )

    db_path = Path("artifacts/examples/workflow_delegate_and_memory.sqlite3")
    db_path.parent.mkdir(parents=True, exist_ok=True)
    # Drop any persisted state so every run starts from the same memory baseline.
    if db_path.exists():
        db_path.unlink()

    # Public runtime surfaces only. The context managers close both the
    # memory store and the managed LLM client when the example finishes.
    with (
        SQLiteMemoryStore(db_path=db_path) as store,
        drag.LlamaCppServerLLMClient() as llm_client,
    ):
        # Both role-specific delegates share one backend client, modelling
        # role-specific prompts over a single transport.
        manufacturing_peer = drag.DirectLLMCall(llm_client=llm_client, tracer=tracer)
        reliability_peer = drag.DirectLLMCall(llm_client=llm_client, tracer=tracer)

        workflow = build_example_workflow(
            tracer=tracer,
            memory_store=store,
            manufacturing_peer=manufacturing_peer,
            reliability_peer=reliability_peer,
        )
        outcome = workflow.run({}, request_id=request_id)

        # Emit the run summary as sorted, ASCII-safe JSON for stable docs output.
        print(json.dumps(outcome.summary(), ensure_ascii=True, indent=2, sort_keys=True))
133
134
# Script entry point: run the example only when executed directly.
if __name__ == "__main__":
    main()
Expected Results#
Run Command
PYTHONPATH=src python3 examples/workflow/workflow_delegate_and_memory_steps.py
Example output shape (values vary by run):
{
"success": true,
"final_output": "<example-specific payload>",
"terminated_reason": "<string-or-null>",
"error": null,
"trace": {
"request_id": "<request-id>",
"trace_dir": "artifacts/examples/traces",
"trace_path": "artifacts/examples/traces/run_<timestamp>_<request_id>.jsonl"
}
}