Workflow Schema Mode

Source: examples/workflow/workflow_schema_mode.py

Introduction

JSON Schema and function-calling conventions are central to reliable machine-to-machine workflow steps, while the Responses API anchors current structured request/response patterns. This example illustrates schema-mode workflow execution where each step contract is explicit and testable.

Technical Implementation

  1. Configure Tracer with JSONL + console output so each run emits machine-readable traces and lifecycle logs.

  2. Build the runtime surface (public APIs only) and execute Workflow.run(...) with a fixed request_id.

  3. Configure and invoke Toolbox integrations (core/script/MCP/callable) before assembling the final payload.

  4. Print a compact JSON payload including trace_info for deterministic tests and docs examples.

    flowchart LR
    A["Input prompt or scenario"] --> B["main(): runtime wiring"]
    B --> C["Workflow.run(...)"]
    C --> D["WorkflowRuntime schedules step graph (LogicStep, ToolStep)"]
    C --> E["Tracer JSONL + console events"]
    D --> F["ExecutionResult/payload"]
    E --> F
    F --> G["Printed JSON output"]
    
  1from __future__ import annotations
  2
  3import json
  4from pathlib import Path
  5
  6from design_research_agents import ExecutionResult, LogicStep, Toolbox, ToolStep, Tracer, Workflow
  7
# JSON Schema for the workflow's input payload; passed to Workflow(input_schema=...)
# below, so each run's contract is explicit. All five keys are required and
# unknown keys are rejected via "additionalProperties": False.
INPUT_SCHEMA: dict[str, object] = {
    "type": "object",
    "required": [
        "dataset_csv_path",
        "quality_report_path",
        "required_columns",
        "sample_nrows",
        "max_missing_ratio_per_column",
    ],
    "properties": {
        "dataset_csv_path": {"type": "string"},  # CSV read by describe/load steps
        "quality_report_path": {"type": "string"},  # destination for fs.write_text
        "required_columns": {"type": "array", "items": {"type": "string"}},
        "sample_nrows": {"type": "integer"},  # row count for the data.load_csv sample
        "max_missing_ratio_per_column": {"type": "number"},  # quality-gate threshold
    },
    "additionalProperties": False,
}
 26
 27
 28def _summarize(result: ExecutionResult) -> dict[str, object]:
 29    return result.summary()
 30
 31
 32def main() -> None:
 33    """Run schema-mode workflow with strict and relaxed quality thresholds."""
 34    tracer = Tracer(
 35        enabled=True,
 36        trace_dir=Path("artifacts/examples/traces"),
 37        enable_jsonl=True,
 38        enable_console=True,
 39    )
 40    dataset_path = Path("artifacts/examples/design_schema_dataset.csv")
 41    dataset_path.parent.mkdir(parents=True, exist_ok=True)
 42    dataset_path.write_text(
 43        "\n".join(
 44            [
 45                "component_id,variant,serviceability_score,notes",
 46                "C001,A,4.2,Quick access screws",
 47                "C002,A,3.8,",
 48                "C003,B,,Needs gasket redesign",
 49                "C004,B,4.9,Tool-less latch",
 50                "C005,A,2.7,Cable route is cramped",
 51            ]
 52        )
 53        + "\n",
 54        encoding="utf-8",
 55    )
 56
 57    # Run the schema-mode workflow using public runtime surfaces. Using this with statement will automatically
 58    # close the tool runtime when the example is done.
 59    with Toolbox() as tool_runtime:
 60        workflow = Workflow(
 61            tool_runtime=tool_runtime,
 62            tracer=tracer,
 63            steps=[
 64                ToolStep(
 65                    step_id="describe_dataset",
 66                    tool_name="data.describe",
 67                    input_builder=lambda context: {
 68                        "path": context["inputs"]["dataset_csv_path"],
 69                        "kind": "csv",
 70                    },
 71                ),
 72                ToolStep(
 73                    step_id="load_sample",
 74                    tool_name="data.load_csv",
 75                    dependencies=("describe_dataset",),
 76                    input_builder=lambda context: {
 77                        "path": context["inputs"]["dataset_csv_path"],
 78                        "nrows": context["inputs"]["sample_nrows"],
 79                    },
 80                ),
 81                LogicStep(
 82                    step_id="quality_gate",
 83                    dependencies=("describe_dataset", "load_sample"),
 84                    handler=lambda context: {
 85                        "row_count": (context["dependency_results"]["describe_dataset"]["output"]["result"]["rows"]),
 86                        "sample_count": (context["dependency_results"]["load_sample"]["output"]["result"]["count"]),
 87                        "required_columns": context["inputs"]["required_columns"],
 88                        "threshold": context["inputs"]["max_missing_ratio_per_column"],
 89                    },
 90                ),
 91                ToolStep(
 92                    step_id="persist_report",
 93                    tool_name="fs.write_text",
 94                    dependencies=("quality_gate",),
 95                    input_builder=lambda context: {
 96                        "path": context["inputs"]["quality_report_path"],
 97                        "content": str(context["dependency_results"]["quality_gate"]["output"]) + "\n",
 98                        "overwrite": True,
 99                    },
100                ),
101                LogicStep(
102                    step_id="finalize",
103                    dependencies=("persist_report",),
104                    handler=lambda context: {
105                        "report_path": (context["dependency_results"]["persist_report"]["output"]["result"]["path"])
106                    },
107                ),
108            ],
109            input_schema=INPUT_SCHEMA,
110        )
111
112        # Use explicit strict and relaxed ids so each policy run is traceable independently.
113        strict_request_id = "example-workflow-schema-design-strict-001"
114        strict_result = workflow.run(
115            {
116                "dataset_csv_path": str(dataset_path),
117                "required_columns": ["component_id", "variant", "serviceability_score", "notes"],
118                "sample_nrows": 3,
119                "quality_report_path": "artifacts/examples/design_schema_quality_strict.txt",
120                "max_missing_ratio_per_column": 0.2,
121            },
122            execution_mode="sequential",
123            request_id=strict_request_id,
124        )
125
126        # Use explicit strict and relaxed ids so each policy run is traceable independently.
127        relaxed_request_id = "example-workflow-schema-design-relaxed-001"
128        relaxed_result = workflow.run(
129            {
130                "dataset_csv_path": str(dataset_path),
131                "required_columns": ["component_id", "variant", "serviceability_score", "notes"],
132                "sample_nrows": 5,
133                "quality_report_path": "artifacts/examples/design_schema_quality_relaxed.txt",
134                "max_missing_ratio_per_column": 0.45,
135            },
136            execution_mode="dag",
137            request_id=relaxed_request_id,
138        )
139
140    # Print the results
141    print(
142        json.dumps(
143            {
144                "strict_run": _summarize(strict_result),
145                "relaxed_run": _summarize(relaxed_result),
146            },
147            ensure_ascii=True,
148            indent=2,
149            sort_keys=True,
150        )
151    )
152
153
# Allow this example to be executed directly as a script.
if __name__ == "__main__":
    main()

Expected Results

Run Command

PYTHONPATH=src python3 examples/workflow/workflow_schema_mode.py

Example output shape (values vary by run):

{
  "strict_run": {
    "success": true,
    "final_output": "<example-specific payload>",
    "terminated_reason": "<string-or-null>",
    "error": null,
    "trace": {
      "request_id": "<request-id>",
      "trace_dir": "artifacts/examples/traces",
      "trace_path": "artifacts/examples/traces/run_<timestamp>_<request_id>.jsonl"
    }
  },
  "relaxed_run": {
    "success": true,
    "final_output": "<example-specific payload>",
    "terminated_reason": "<string-or-null>",
    "error": null,
    "trace": {
      "request_id": "<request-id>",
      "trace_dir": "artifacts/examples/traces",
      "trace_path": "artifacts/examples/traces/run_<timestamp>_<request_id>.jsonl"
    }
  }
}

References