Workflow Schema Mode#

Source: examples/workflow/workflow_schema_mode.py

Introduction#

JSON Schema and function-calling conventions are central to reliable machine-to-machine workflow steps, while the Responses API anchors current structured request/response patterns. This example illustrates schema-mode workflow execution: the workflow declares a JSON Schema for its inputs (INPUT_SCHEMA) and passes it to Workflow as input_schema, so the contract for each run is explicit and testable.

Technical Implementation#

  1. Configure Tracer with JSONL + console output so each run emits machine-readable traces and lifecycle logs.

  2. Build the runtime surface (public APIs only) and execute Workflow.run(...) twice, a strict run and a relaxed run, each with its own fixed request_id.

  3. Configure and invoke Toolbox integrations (core/script/MCP/callable) before assembling the final payload.

  4. Print an indented JSON payload with sorted keys that contains each run's summary, including its trace metadata, for deterministic tests and docs examples.

The diagram below is generated from the example’s configured Workflow.

        flowchart LR
    workflow_entry["Workflow Entrypoint"]
    step_1["describe_dataset<br/>ToolStep<br/>tool=data.describe"]
    step_2["load_sample<br/>ToolStep<br/>tool=data.load_csv"]
    step_3["quality_gate<br/>LogicStep"]
    step_4["persist_report<br/>ToolStep<br/>tool=fs.write_text"]
    step_5["finalize<br/>LogicStep"]
    workflow_entry --> step_1
    step_1 --> step_2
    step_1 --> step_3
    step_2 --> step_3
    step_3 --> step_4
    step_4 --> step_5
    
  1from __future__ import annotations
  2
  3import json
  4from pathlib import Path
  5
  6import design_research_agents as drag
  7
  8WORKFLOW_DIAGRAM_DIRECTION = "LR"
  9
 10INPUT_SCHEMA: dict[str, object] = {
 11    "type": "object",
 12    "required": [
 13        "dataset_csv_path",
 14        "quality_report_path",
 15        "required_columns",
 16        "sample_nrows",
 17        "max_missing_ratio_per_column",
 18    ],
 19    "properties": {
 20        "dataset_csv_path": {"type": "string"},
 21        "quality_report_path": {"type": "string"},
 22        "required_columns": {"type": "array", "items": {"type": "string"}},
 23        "sample_nrows": {"type": "integer"},
 24        "max_missing_ratio_per_column": {"type": "number"},
 25    },
 26    "additionalProperties": False,
 27}
 28
 29
 30def _summarize(result: drag.ExecutionResult) -> dict[str, object]:
 31    return result.summary()
 32
 33
 34def build_example_workflow(
 35    *,
 36    tracer: drag.Tracer | None = None,
 37    tool_runtime: object | None = None,
 38) -> drag.Workflow:
 39    """Build the schema-mode workflow used for runtime illustration and docs diagrams."""
 40    return drag.Workflow(
 41        tool_runtime=tool_runtime,
 42        tracer=tracer,
 43        steps=[
 44            drag.ToolStep(
 45                step_id="describe_dataset",
 46                tool_name="data.describe",
 47                input_builder=lambda context: {
 48                    "path": context["inputs"]["dataset_csv_path"],
 49                    "kind": "csv",
 50                },
 51            ),
 52            drag.ToolStep(
 53                step_id="load_sample",
 54                tool_name="data.load_csv",
 55                dependencies=("describe_dataset",),
 56                input_builder=lambda context: {
 57                    "path": context["inputs"]["dataset_csv_path"],
 58                    "nrows": context["inputs"]["sample_nrows"],
 59                },
 60            ),
 61            drag.LogicStep(
 62                step_id="quality_gate",
 63                dependencies=("describe_dataset", "load_sample"),
 64                handler=lambda context: {
 65                    "row_count": (context["dependency_results"]["describe_dataset"]["output"]["result"]["rows"]),
 66                    "sample_count": (context["dependency_results"]["load_sample"]["output"]["result"]["count"]),
 67                    "required_columns": context["inputs"]["required_columns"],
 68                    "threshold": context["inputs"]["max_missing_ratio_per_column"],
 69                },
 70            ),
 71            drag.ToolStep(
 72                step_id="persist_report",
 73                tool_name="fs.write_text",
 74                dependencies=("quality_gate",),
 75                input_builder=lambda context: {
 76                    "path": context["inputs"]["quality_report_path"],
 77                    "content": str(context["dependency_results"]["quality_gate"]["output"]) + "\n",
 78                    "overwrite": True,
 79                },
 80            ),
 81            drag.LogicStep(
 82                step_id="finalize",
 83                dependencies=("persist_report",),
 84                handler=lambda context: {
 85                    "report_path": (context["dependency_results"]["persist_report"]["output"]["result"]["path"])
 86                },
 87            ),
 88        ],
 89        input_schema=INPUT_SCHEMA,
 90    )
 91
 92
 93def main() -> None:
 94    """Run schema-mode workflow with strict and relaxed quality thresholds."""
 95    tracer = drag.Tracer(
 96        enabled=True,
 97        trace_dir=Path("artifacts/examples/traces"),
 98        enable_jsonl=True,
 99        enable_console=True,
100    )
101    dataset_path = Path("artifacts/examples/design_schema_dataset.csv")
102    dataset_path.parent.mkdir(parents=True, exist_ok=True)
103    dataset_path.write_text(
104        "\n".join(
105            [
106                "component_id,variant,serviceability_score,notes",
107                "C001,A,4.2,Quick access screws",
108                "C002,A,3.8,",
109                "C003,B,,Needs gasket redesign",
110                "C004,B,4.9,Tool-less latch",
111                "C005,A,2.7,Cable route is cramped",
112            ]
113        )
114        + "\n",
115        encoding="utf-8",
116    )
117
118    # Run the schema-mode workflow using public runtime surfaces. Using this with statement will automatically
119    # close the tool runtime when the example is done.
120    with drag.Toolbox() as tool_runtime:
121        workflow = build_example_workflow(tracer=tracer, tool_runtime=tool_runtime)
122
123        # Use explicit strict and relaxed ids so each policy run is traceable independently.
124        strict_request_id = "example-workflow-schema-design-strict-001"
125        strict_result = workflow.run(
126            {
127                "dataset_csv_path": str(dataset_path),
128                "required_columns": ["component_id", "variant", "serviceability_score", "notes"],
129                "sample_nrows": 3,
130                "quality_report_path": "artifacts/examples/design_schema_quality_strict.txt",
131                "max_missing_ratio_per_column": 0.2,
132            },
133            execution_mode="sequential",
134            request_id=strict_request_id,
135        )
136
137        # Use explicit strict and relaxed ids so each policy run is traceable independently.
138        relaxed_request_id = "example-workflow-schema-design-relaxed-001"
139        relaxed_result = workflow.run(
140            {
141                "dataset_csv_path": str(dataset_path),
142                "required_columns": ["component_id", "variant", "serviceability_score", "notes"],
143                "sample_nrows": 5,
144                "quality_report_path": "artifacts/examples/design_schema_quality_relaxed.txt",
145                "max_missing_ratio_per_column": 0.45,
146            },
147            execution_mode="dag",
148            request_id=relaxed_request_id,
149        )
150
151    # Print the results
152    print(
153        json.dumps(
154            {
155                "strict_run": _summarize(strict_result),
156                "relaxed_run": _summarize(relaxed_result),
157            },
158            ensure_ascii=True,
159            indent=2,
160            sort_keys=True,
161        )
162    )
163
164
165if __name__ == "__main__":
166    main()

Expected Results#

Run Command

PYTHONPATH=src python3 examples/workflow/workflow_schema_mode.py

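Example output shape (values vary by run; the script passes sort_keys=True to json.dumps, so keys in the actual output appear in alphabetical order):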

{
  "strict_run": {
    "success": true,
    "final_output": "<example-specific payload>",
    "terminated_reason": "<string-or-null>",
    "error": null,
    "trace": {
      "request_id": "<request-id>",
      "trace_dir": "artifacts/examples/traces",
      "trace_path": "artifacts/examples/traces/run_<timestamp>_<request_id>.jsonl"
    }
  },
  "relaxed_run": {
    "success": true,
    "final_output": "<example-specific payload>",
    "terminated_reason": "<string-or-null>",
    "error": null,
    "trace": {
      "request_id": "<request-id>",
      "trace_dir": "artifacts/examples/traces",
      "trace_path": "artifacts/examples/traces/run_<timestamp>_<request_id>.jsonl"
    }
  }
}

References#