Workflow Schema Mode
Source: examples/workflow/workflow_schema_mode.py
Introduction
JSON Schema and function-calling conventions are central for reliable machine-to-machine workflow steps, while the Responses API anchors current structured request/response patterns. This example illustrates schema-mode workflow execution where each step contract is explicit and testable.
Technical Implementation
1. Configure `Tracer` with JSONL + console output so each run emits machine-readable traces and lifecycle logs.
2. Build the runtime surface (public APIs only) and execute `Workflow.run(...)` with a fixed `request_id`.
3. Configure and invoke `Toolbox` integrations (core/script/MCP/callable) before assembling the final payload.
4. Print a compact JSON payload including `trace_info` for deterministic tests and docs examples.
The diagram below is generated from the example’s configured Workflow.
flowchart LR
workflow_entry["Workflow Entrypoint"]
step_1["describe_dataset<br/>ToolStep<br/>tool=data.describe"]
step_2["load_sample<br/>ToolStep<br/>tool=data.load_csv"]
step_3["quality_gate<br/>LogicStep"]
step_4["persist_report<br/>ToolStep<br/>tool=fs.write_text"]
step_5["finalize<br/>LogicStep"]
workflow_entry --> step_1
step_1 --> step_2
step_1 --> step_3
step_2 --> step_3
step_3 --> step_4
step_4 --> step_5
1from __future__ import annotations
2
3import json
4from pathlib import Path
5
6import design_research_agents as drag
7
# Layout direction for the workflow diagram; presumably consumed by the docs
# diagram generator (the rendered flowchart uses "LR") -- confirm against the docs tooling.
WORKFLOW_DIAGRAM_DIRECTION = "LR"

# JSON Schema fragments for each accepted workflow input, keyed by input name.
_INPUT_PROPERTIES: dict[str, object] = {
    "dataset_csv_path": {"type": "string"},
    "quality_report_path": {"type": "string"},
    "required_columns": {"type": "array", "items": {"type": "string"}},
    "sample_nrows": {"type": "integer"},
    "max_missing_ratio_per_column": {"type": "number"},
}

# JSON Schema for the workflow inputs: every declared property is required and
# no additional properties are allowed.
INPUT_SCHEMA: dict[str, object] = {
    "type": "object",
    "required": list(_INPUT_PROPERTIES),
    "properties": _INPUT_PROPERTIES,
    "additionalProperties": False,
}
28
29
def _summarize(result: drag.ExecutionResult) -> dict[str, object]:
    """Return the mapping produced by ``result.summary()`` for inclusion in the printed payload."""
    summary = result.summary()
    return summary
32
33
def build_example_workflow(
    *,
    tracer: drag.Tracer | None = None,
    tool_runtime: object | None = None,
) -> drag.Workflow:
    """Assemble the five-step schema-mode workflow used by the example and its docs diagram.

    The steps are ``describe_dataset`` -> ``load_sample`` -> ``quality_gate`` ->
    ``persist_report`` -> ``finalize``; ``quality_gate`` depends on both of the
    preceding tool steps. ``INPUT_SCHEMA`` is passed as the workflow's ``input_schema``.

    Args:
        tracer: Optional tracer forwarded to ``drag.Workflow``.
        tool_runtime: Optional tool runtime forwarded to ``drag.Workflow``.

    Returns:
        The configured ``drag.Workflow`` instance.
    """

    def _describe_inputs(context):
        # Describe the CSV file named in the workflow inputs.
        return {
            "path": context["inputs"]["dataset_csv_path"],
            "kind": "csv",
        }

    def _load_inputs(context):
        # Load only the requested number of rows from the same CSV file.
        return {
            "path": context["inputs"]["dataset_csv_path"],
            "nrows": context["inputs"]["sample_nrows"],
        }

    def _quality_gate(context):
        # Combine the outputs of both upstream tool steps with the caller's thresholds.
        upstream = context["dependency_results"]
        row_count = upstream["describe_dataset"]["output"]["result"]["rows"]
        sample_count = upstream["load_sample"]["output"]["result"]["count"]
        return {
            "row_count": row_count,
            "sample_count": sample_count,
            "required_columns": context["inputs"]["required_columns"],
            "threshold": context["inputs"]["max_missing_ratio_per_column"],
        }

    def _report_inputs(context):
        # The report body is the stringified quality-gate output plus a trailing newline.
        target = context["inputs"]["quality_report_path"]
        gate_output = context["dependency_results"]["quality_gate"]["output"]
        return {
            "path": target,
            "content": str(gate_output) + "\n",
            "overwrite": True,
        }

    def _finalize(context):
        # Surface the path reported back by the write step.
        written = context["dependency_results"]["persist_report"]["output"]["result"]
        return {"report_path": written["path"]}

    describe_step = drag.ToolStep(
        step_id="describe_dataset",
        tool_name="data.describe",
        input_builder=_describe_inputs,
    )
    load_step = drag.ToolStep(
        step_id="load_sample",
        tool_name="data.load_csv",
        dependencies=("describe_dataset",),
        input_builder=_load_inputs,
    )
    gate_step = drag.LogicStep(
        step_id="quality_gate",
        dependencies=("describe_dataset", "load_sample"),
        handler=_quality_gate,
    )
    persist_step = drag.ToolStep(
        step_id="persist_report",
        tool_name="fs.write_text",
        dependencies=("quality_gate",),
        input_builder=_report_inputs,
    )
    finalize_step = drag.LogicStep(
        step_id="finalize",
        dependencies=("persist_report",),
        handler=_finalize,
    )

    return drag.Workflow(
        tool_runtime=tool_runtime,
        tracer=tracer,
        steps=[describe_step, load_step, gate_step, persist_step, finalize_step],
        input_schema=INPUT_SCHEMA,
    )
91
92
def main() -> None:
    """Run the schema-mode workflow twice and print both run summaries as JSON.

    A small CSV fixture is written under ``artifacts/examples`` first. The workflow
    is then run once with strict settings (3 sample rows, 0.2 missing-ratio
    threshold, ``sequential`` mode) and once with relaxed settings (5 sample rows,
    0.45 threshold, ``dag`` mode), each under its own request id.
    """
    tracer = drag.Tracer(
        enabled=True,
        trace_dir=Path("artifacts/examples/traces"),
        enable_jsonl=True,
        enable_console=True,
    )

    # Fixture rows (header first); each row is written followed by a newline.
    csv_rows = (
        "component_id,variant,serviceability_score,notes",
        "C001,A,4.2,Quick access screws",
        "C002,A,3.8,",
        "C003,B,,Needs gasket redesign",
        "C004,B,4.9,Tool-less latch",
        "C005,A,2.7,Cable route is cramped",
    )
    dataset_path = Path("artifacts/examples/design_schema_dataset.csv")
    dataset_path.parent.mkdir(parents=True, exist_ok=True)
    dataset_path.write_text("".join(f"{row}\n" for row in csv_rows), encoding="utf-8")

    def _inputs_for(*, sample_nrows: int, report_path: str, max_missing_ratio: float) -> dict[str, object]:
        # Both runs share the dataset and required columns; only the policy knobs differ.
        return {
            "dataset_csv_path": str(dataset_path),
            "required_columns": ["component_id", "variant", "serviceability_score", "notes"],
            "sample_nrows": sample_nrows,
            "quality_report_path": report_path,
            "max_missing_ratio_per_column": max_missing_ratio,
        }

    # The Toolbox is used as a context manager so the tool runtime is closed on exit.
    with drag.Toolbox() as tool_runtime:
        workflow = build_example_workflow(tracer=tracer, tool_runtime=tool_runtime)

        # Strict policy run; a dedicated request id keeps its trace separate.
        strict_result = workflow.run(
            _inputs_for(
                sample_nrows=3,
                report_path="artifacts/examples/design_schema_quality_strict.txt",
                max_missing_ratio=0.2,
            ),
            execution_mode="sequential",
            request_id="example-workflow-schema-design-strict-001",
        )

        # Relaxed policy run, traced under its own request id.
        relaxed_result = workflow.run(
            _inputs_for(
                sample_nrows=5,
                report_path="artifacts/examples/design_schema_quality_relaxed.txt",
                max_missing_ratio=0.45,
            ),
            execution_mode="dag",
            request_id="example-workflow-schema-design-relaxed-001",
        )

        # Emit both summaries as one sorted, indented, ASCII-only JSON document.
        payload = {
            "strict_run": _summarize(strict_result),
            "relaxed_run": _summarize(relaxed_result),
        }
        print(json.dumps(payload, ensure_ascii=True, indent=2, sort_keys=True))
163
164
# Run the example only when this file is executed as a script, not when it is imported.
if __name__ == "__main__":
    main()
Expected Results
Run Command
PYTHONPATH=src python3 examples/workflow/workflow_schema_mode.py
Example output shape (values vary by run):
{
"strict_run": {
"success": true,
"final_output": "<example-specific payload>",
"terminated_reason": "<string-or-null>",
"error": null,
"trace": {
"request_id": "<request-id>",
"trace_dir": "artifacts/examples/traces",
"trace_path": "artifacts/examples/traces/run_<timestamp>_<request_id>.jsonl"
}
},
"relaxed_run": {
"success": true,
"final_output": "<example-specific payload>",
"terminated_reason": "<string-or-null>",
"error": null,
"trace": {
"request_id": "<request-id>",
"trace_dir": "artifacts/examples/traces",
"trace_path": "artifacts/examples/traces/run_<timestamp>_<request_id>.jsonl"
}
}
}