Workflow Schema Mode
Source: examples/workflow/workflow_schema_mode.py
Introduction
JSON Schema and function-calling conventions are central for reliable machine-to-machine workflow steps, while the Responses API anchors current structured request/response patterns. This example illustrates schema-mode workflow execution where each step contract is explicit and testable.
Technical Implementation
- Configure `Tracer` with JSONL + console output so each run emits machine-readable traces and lifecycle logs.
- Build the runtime surface (public APIs only) and execute `Workflow.run(...)` with a fixed `request_id`.
- Configure and invoke `Toolbox` integrations (core/script/MCP/callable) before assembling the final payload.
- Print a compact JSON payload including `trace_info` for deterministic tests and docs examples.
flowchart LR
A["Input prompt or scenario"] --> B["main(): runtime wiring"]
B --> C["Workflow.run(...)"]
C --> D["WorkflowRuntime schedules step graph (LogicStep, ToolStep)"]
C --> E["Tracer JSONL + console events"]
D --> F["ExecutionResult/payload"]
E --> F
F --> G["Printed JSON output"]
1from __future__ import annotations
2
3import json
4from pathlib import Path
5
6from design_research_agents import ExecutionResult, LogicStep, Toolbox, ToolStep, Tracer, Workflow
7
# Per-field JSON Schema type declarations for the workflow inputs. Insertion
# order doubles as the "required" ordering below.
_FIELD_TYPES: dict[str, dict[str, object]] = {
    "dataset_csv_path": {"type": "string"},
    "quality_report_path": {"type": "string"},
    "required_columns": {"type": "array", "items": {"type": "string"}},
    "sample_nrows": {"type": "integer"},
    "max_missing_ratio_per_column": {"type": "number"},
}

# Strict input contract: every declared field is mandatory and unknown keys
# are rejected outright (additionalProperties: False).
INPUT_SCHEMA: dict[str, object] = {
    "type": "object",
    "required": list(_FIELD_TYPES),
    "properties": _FIELD_TYPES,
    "additionalProperties": False,
}
26
27
def _summarize(result: ExecutionResult) -> dict[str, object]:
    """Reduce an ExecutionResult to its JSON-serializable summary dict."""
    summary = result.summary()
    return summary
30
31
def main() -> None:
    """Run schema-mode workflow with strict and relaxed quality thresholds."""
    tracer = Tracer(
        enabled=True,
        trace_dir=Path("artifacts/examples/traces"),
        enable_jsonl=True,
        enable_console=True,
    )

    # Materialize a tiny demo CSV (one row deliberately missing its score)
    # so the example is fully self-contained.
    dataset_path = Path("artifacts/examples/design_schema_dataset.csv")
    dataset_path.parent.mkdir(parents=True, exist_ok=True)
    csv_rows = [
        "component_id,variant,serviceability_score,notes",
        "C001,A,4.2,Quick access screws",
        "C002,A,3.8,",
        "C003,B,,Needs gasket redesign",
        "C004,B,4.9,Tool-less latch",
        "C005,A,2.7,Cable route is cramped",
    ]
    dataset_path.write_text("\n".join(csv_rows) + "\n", encoding="utf-8")

    # The Toolbox context manager closes the tool runtime automatically when
    # the example finishes; only public runtime surfaces are used below.
    with Toolbox() as tool_runtime:
        workflow = Workflow(
            tool_runtime=tool_runtime,
            tracer=tracer,
            steps=[
                ToolStep(
                    step_id="describe_dataset",
                    tool_name="data.describe",
                    input_builder=lambda ctx: {
                        "path": ctx["inputs"]["dataset_csv_path"],
                        "kind": "csv",
                    },
                ),
                ToolStep(
                    step_id="load_sample",
                    tool_name="data.load_csv",
                    dependencies=("describe_dataset",),
                    input_builder=lambda ctx: {
                        "path": ctx["inputs"]["dataset_csv_path"],
                        "nrows": ctx["inputs"]["sample_nrows"],
                    },
                ),
                LogicStep(
                    step_id="quality_gate",
                    dependencies=("describe_dataset", "load_sample"),
                    handler=lambda ctx: {
                        "row_count": ctx["dependency_results"]["describe_dataset"]["output"]["result"]["rows"],
                        "sample_count": ctx["dependency_results"]["load_sample"]["output"]["result"]["count"],
                        "required_columns": ctx["inputs"]["required_columns"],
                        "threshold": ctx["inputs"]["max_missing_ratio_per_column"],
                    },
                ),
                ToolStep(
                    step_id="persist_report",
                    tool_name="fs.write_text",
                    dependencies=("quality_gate",),
                    input_builder=lambda ctx: {
                        "path": ctx["inputs"]["quality_report_path"],
                        "content": str(ctx["dependency_results"]["quality_gate"]["output"]) + "\n",
                        "overwrite": True,
                    },
                ),
                LogicStep(
                    step_id="finalize",
                    dependencies=("persist_report",),
                    handler=lambda ctx: {
                        "report_path": ctx["dependency_results"]["persist_report"]["output"]["result"]["path"]
                    },
                ),
            ],
            input_schema=INPUT_SCHEMA,
        )

        # Each policy run carries its own explicit request id so its trace is
        # independently addressable: (payload key, sample_nrows, report path,
        # max missing ratio, execution mode, request id).
        run_specs = [
            (
                "strict_run",
                3,
                "artifacts/examples/design_schema_quality_strict.txt",
                0.2,
                "sequential",
                "example-workflow-schema-design-strict-001",
            ),
            (
                "relaxed_run",
                5,
                "artifacts/examples/design_schema_quality_relaxed.txt",
                0.45,
                "dag",
                "example-workflow-schema-design-relaxed-001",
            ),
        ]
        payload: dict[str, object] = {}
        for label, nrows, report_path, threshold, mode, request_id in run_specs:
            result = workflow.run(
                {
                    "dataset_csv_path": str(dataset_path),
                    "required_columns": ["component_id", "variant", "serviceability_score", "notes"],
                    "sample_nrows": nrows,
                    "quality_report_path": report_path,
                    "max_missing_ratio_per_column": threshold,
                },
                execution_mode=mode,
                request_id=request_id,
            )
            payload[label] = _summarize(result)

    # Emit both run summaries as one deterministic JSON document.
    print(json.dumps(payload, ensure_ascii=True, indent=2, sort_keys=True))
152
153
# Standard script entry point: run the example only when executed directly,
# not when this module is imported.
if __name__ == "__main__":
    main()
Expected Results
Run Command
PYTHONPATH=src python3 examples/workflow/workflow_schema_mode.py
Example output shape (values vary by run):
{
"strict_run": {
"success": true,
"final_output": "<example-specific payload>",
"terminated_reason": "<string-or-null>",
"error": null,
"trace": {
"request_id": "<request-id>",
"trace_dir": "artifacts/examples/traces",
"trace_path": "artifacts/examples/traces/run_<timestamp>_<request_id>.jsonl"
}
},
"relaxed_run": {
"success": true,
"final_output": "<example-specific payload>",
"terminated_reason": "<string-or-null>",
"error": null,
"trace": {
"request_id": "<request-id>",
"trace_dir": "artifacts/examples/traces",
"trace_path": "artifacts/examples/traces/run_<timestamp>_<request_id>.jsonl"
}
}
}