"""Configuration and typed descriptors for toolbox sources."""
from __future__ import annotations
import json
from collections.abc import Callable, Mapping
from dataclasses import dataclass, field
from pathlib import Path
from typing import Literal
CallableToolHandler = Callable[[Mapping[str, object]], object]
@dataclass(slots=True, frozen=True, kw_only=True)
class CoreToolsConfig:
"""Configuration for built-in core tools."""
enabled: bool = True
"""Whether built-in core tools are registered."""
allow_network: bool = False
"""Whether core tools may use outbound network access."""
allow_writes_outside_artifacts: bool = False
"""Whether core tools may write outside the artifacts directory."""
allowed_commands: tuple[str, ...] = (
"git",
"rg",
"python",
"python3",
"uv",
"ruff",
"pytest",
)
"""Command allowlist enforced by shell-executing core tools."""
artifacts_dir: str = "artifacts"
"""Directory used for tool-generated artifacts."""
workspace_root: str = "."
"""Workspace root path exposed to filesystem-aware tools."""
[docs]
@dataclass(slots=True, frozen=True, kw_only=True)
class MCPServerConfig:
"""External MCP server definition."""
id: str
"""Unique identifier for the server. This is used to reference the server in
tool definitions and logs."""
type: Literal["stdio"] = "stdio"
"""Communication protocol to use with the server. Currently, only 'stdio'
is supported, which means the server will be launched as a subprocess and
communicated with via its standard input and output streams."""
command: tuple[str, ...] = ()
"""Command to launch the server, specified as a tuple of strings. The first
element should be the executable, and the subsequent elements are its arguments."""
timeout_s: int = 20
"""Timeout in seconds for server responses before treating it as unresponsive."""
env_allowlist: tuple[str, ...] = (
"PATH",
"HOME",
"USER",
"LANG",
"LC_ALL",
"PYTHONPATH",
"VIRTUAL_ENV",
)
"""Allowlist of environment variable names that will be passed to the server process.
Only variables in this list will be included in the server's environment, which
helps to limit exposure of sensitive information and reduce the attack surface."""
env: dict[str, str] = field(default_factory=dict)
"""Explicit environment variables to set for the server process. This is a mapping of variable
names to their desired values. These variables will be included in the server's environment in
addition to any variables from the allowlist that are present in the parent process."""
@dataclass(slots=True, frozen=True, kw_only=True)
class McpConfig:
"""Configuration for attached MCP servers."""
enabled: bool = False
"""Whether MCP-backed tools are enabled."""
servers: tuple[MCPServerConfig, ...] = ()
"""Configured external MCP servers to attach."""
@dataclass(slots=True, frozen=True, kw_only=True)
class ScriptToolsConfig:
"""Configuration for explicitly declared script tools."""
enabled: bool = False
"""Whether script-backed tools are enabled."""
tools: tuple[ScriptToolConfig, ...] = ()
"""Explicit script-backed tool definitions."""
@dataclass(slots=True, frozen=True, kw_only=True)
class ToolRuntimeConfig:
"""Top-level configuration for source-enabled toolbox runtime."""
core_tools: CoreToolsConfig = field(default_factory=CoreToolsConfig)
"""Configuration block for built-in core tools."""
mcp: McpConfig = field(default_factory=McpConfig)
"""Configuration block for MCP-backed tools."""
script_tools: ScriptToolsConfig = field(default_factory=ScriptToolsConfig)
"""Configuration block for explicit script tools."""
def load_tool_runtime_config(path: str) -> ToolRuntimeConfig:
"""Load toolbox runtime config from JSON.
Args:
path: Path to JSON configuration file.
Returns:
Parsed runtime configuration.
Raises:
ValueError: If configuration shape or values are invalid.
"""
try:
with open(path, encoding="utf-8") as handle:
payload = json.load(handle)
except json.JSONDecodeError as exc:
raise ValueError(f"Tool runtime config must be valid JSON: {exc}") from exc
if not isinstance(payload, dict):
raise ValueError("Tool runtime config root must be a mapping.")
if "lazy_tools" in payload:
raise ValueError("Use 'script_tools' instead of the removed 'lazy_tools' key.")
core_raw = payload.get("core_tools", {})
mcp_raw = payload.get("mcp", {})
script_raw = payload.get("script_tools", {})
# Parse sections independently so malformed optional blocks fail with targeted messages.
core_cfg = _parse_core_config(core_raw)
mcp_cfg = _parse_mcp_config(mcp_raw)
script_cfg = _parse_script_config(script_raw)
return ToolRuntimeConfig(core_tools=core_cfg, mcp=mcp_cfg, script_tools=script_cfg)
def _parse_core_config(raw: object) -> CoreToolsConfig:
"""Parse the ``core_tools`` configuration section.
Args:
raw: Raw section payload from parsed configuration.
Returns:
Normalized core-tools configuration.
"""
if not isinstance(raw, dict):
return CoreToolsConfig()
defaults = CoreToolsConfig()
# Fall back to constructor defaults for omitted keys to keep config files concise.
return CoreToolsConfig(
enabled=bool(raw.get("enabled", True)),
allow_network=bool(raw.get("allow_network", False)),
allow_writes_outside_artifacts=bool(raw.get("allow_writes_outside_artifacts", False)),
allowed_commands=_parse_str_list(raw.get("allowed_commands")) or defaults.allowed_commands,
artifacts_dir=_parse_str(raw.get("artifacts_dir")) or "artifacts",
workspace_root=_parse_str(raw.get("workspace_root")) or ".",
)
def _parse_mcp_config(raw: object) -> McpConfig:
"""Parse the ``mcp`` configuration section.
Args:
raw: Raw section payload from parsed configuration.
Returns:
Normalized MCP configuration.
Raises:
ValueError: If server definitions are malformed.
"""
if not isinstance(raw, dict):
return McpConfig()
enabled = bool(raw.get("enabled", False))
defaults = MCPServerConfig(id="__defaults__")
servers_raw = raw.get("servers", [])
parsed_servers: list[MCPServerConfig] = []
if isinstance(servers_raw, list):
for index, item in enumerate(servers_raw):
if not isinstance(item, dict):
raise ValueError(f"mcp.servers[{index}] must be a mapping.")
server_id = _parse_str(item.get("id"))
if server_id is None:
raise ValueError(f"mcp.servers[{index}].id is required.")
server_type = _parse_str(item.get("type")) or "stdio"
if server_type != "stdio":
raise ValueError(f"mcp.servers[{index}].type '{server_type}' is not supported.")
command = _parse_str_list(item.get("command"))
if not command:
raise ValueError(f"mcp.servers[{index}].command must be a non-empty string list.")
timeout_s = _parse_int(item.get("timeout_s"), default=20)
env_allowlist = _parse_str_list(item.get("env_allowlist")) or defaults.env_allowlist
env = _parse_env(item.get("env"))
parsed_servers.append(
MCPServerConfig(
id=server_id,
type="stdio",
command=command,
timeout_s=timeout_s,
env_allowlist=env_allowlist,
env=env,
)
)
return McpConfig(enabled=enabled, servers=tuple(parsed_servers))
def _parse_script_config(raw: object) -> ScriptToolsConfig:
"""Parse the ``script_tools`` configuration section.
Args:
raw: Raw section payload from parsed configuration.
Returns:
Normalized script-tools configuration.
Raises:
ValueError: If tool definitions are malformed.
"""
if not isinstance(raw, dict):
return ScriptToolsConfig()
enabled = bool(raw.get("enabled", False))
tools_raw = raw.get("tools", [])
parsed_tools: list[ScriptToolConfig] = []
if isinstance(tools_raw, list):
for index, item in enumerate(tools_raw):
if not isinstance(item, dict):
raise ValueError(f"script_tools.tools[{index}] must be a mapping.")
name = _parse_str(item.get("name"))
path = _parse_str(item.get("path"))
if name is None:
raise ValueError(f"script_tools.tools[{index}].name is required.")
if path is None:
raise ValueError(f"script_tools.tools[{index}].path is required.")
description = _parse_str(item.get("description")) or f"Script tool {name}"
input_schema = _parse_mapping(
item.get("input_schema"),
default={
"type": "object",
"additionalProperties": True,
"properties": {},
"required": [],
},
)
output_schema = _parse_mapping(item.get("output_schema"), default={"type": "object"})
permissions = _parse_str_list(item.get("permissions"))
risky = _parse_optional_bool(item.get("risky"))
commands = _parse_str_list(item.get("commands"))
timeout_s = _parse_int(item.get("timeout_s"), default=30)
parsed_tools.append(
ScriptToolConfig(
name=name,
path=str(Path(path).expanduser()),
description=description,
input_schema=input_schema,
output_schema=output_schema,
filesystem_read=bool(item.get("filesystem_read", False)),
filesystem_write=bool(item.get("filesystem_write", False)),
network=bool(item.get("network", False)),
commands=commands,
timeout_s=timeout_s,
permissions=permissions,
risky=risky,
)
)
return ScriptToolsConfig(enabled=enabled, tools=tuple(parsed_tools))
def _parse_str(value: object) -> str | None:
"""Normalize an optional string value.
Args:
value: Raw value to normalize.
Returns:
Stripped string, or ``None`` when value is empty/non-string.
"""
if not isinstance(value, str):
return None
normalized = value.strip()
return normalized or None
def _parse_str_list(value: object) -> tuple[str, ...]:
"""Parse a list of strings into a normalized tuple.
Args:
value: Raw value expected to be a list of strings.
Returns:
Tuple of non-empty stripped strings.
Raises:
ValueError: If ``value`` is not a list of strings.
"""
if value is None:
return ()
if not isinstance(value, list):
raise ValueError("Expected list of strings.")
normalized: list[str] = []
for item in value:
if not isinstance(item, str):
raise ValueError("Expected list of strings.")
stripped = item.strip()
if stripped:
normalized.append(stripped)
return tuple(normalized)
def _parse_int(value: object, *, default: int) -> int:
"""Parse an optional integer with default fallback.
Args:
value: Raw value to parse.
default: Default integer when ``value`` is ``None``.
Returns:
Parsed integer value.
Raises:
ValueError: If ``value`` is not an integer.
"""
if value is None:
return default
if isinstance(value, bool) or not isinstance(value, int):
raise ValueError("Expected integer value.")
return value
def _parse_optional_bool(value: object) -> bool | None:
"""Parse an optional boolean value.
Args:
value: Raw value to parse.
Returns:
Parsed boolean value, or ``None`` when value is absent.
Raises:
ValueError: If ``value`` is not a boolean.
"""
if value is None:
return None
if not isinstance(value, bool):
raise ValueError("Expected bool value.")
return value
def _parse_mapping(value: object, *, default: dict[str, object]) -> dict[str, object]:
"""Parse an optional mapping with default fallback.
Args:
value: Raw value to parse.
default: Default mapping when ``value`` is ``None``.
Returns:
Parsed mapping copied into a dictionary.
Raises:
ValueError: If ``value`` is not a mapping.
"""
if value is None:
return dict(default)
if not isinstance(value, Mapping):
raise ValueError("Expected mapping value.")
return dict(value)
def _parse_env(value: object) -> dict[str, str]:
"""Parse environment-variable mapping for MCP server execution.
Args:
value: Raw value expected to be a string-keyed mapping.
Returns:
Parsed environment mapping.
Raises:
ValueError: If ``value`` is not a valid mapping.
"""
if value is None:
return {}
if not isinstance(value, dict):
raise ValueError("Expected env mapping.")
parsed: dict[str, str] = {}
for key, item in value.items():
if not isinstance(key, str) or not key.strip():
raise ValueError("Environment variable names must be non-empty strings.")
parsed[key.strip()] = str(item)
return parsed
__all__ = [
"CallableToolConfig",
"CallableToolHandler",
"CoreToolsConfig",
"MCPServerConfig",
"McpConfig",
"ScriptToolConfig",
"ScriptToolsConfig",
"ToolRuntimeConfig",
"load_tool_runtime_config",
]