Source code for design_research_agents._implementations._patterns._router_delegate_pattern

"""Reusable intent/agent-routing orchestration chunk."""

from __future__ import annotations

from collections.abc import Mapping, Sequence
from dataclasses import dataclass

from design_research_agents._contracts._delegate import Delegate, ExecutionResult
from design_research_agents._contracts._llm import LLMClient
from design_research_agents._contracts._termination import (
    TERMINATED_ROUTING_FAILURE,
    TERMINATED_UNKNOWN_ALTERNATIVE,
)
from design_research_agents._contracts._tools import ToolRuntime, ToolSpec
from design_research_agents._contracts._workflow import DelegateTarget, LogicStep
from design_research_agents._runtime._common._delegate_invocation import invoke_delegate
from design_research_agents._runtime._patterns import (
    MODE_ROUTER_DELEGATE,
    WorkflowBudgetTracker,
    attach_runtime_metadata,
    build_compiled_pattern_execution,
    build_pattern_execution_result,
    normalize_request_id_prefix,
    resolve_pattern_run_context,
)
from design_research_agents._tracing import Tracer
from design_research_agents.workflow import CompiledExecution
from design_research_agents.workflow.workflow import Workflow

from .._shared._agent_internal._agent_routing_runtime_adapter import (
    AgentRoutingToolRuntimeAdapter,
)
from .._shared._agent_internal._input_parsing import (
    extract_prompt as _extract_prompt,
)
from .._shared._agent_internal._json_action_step_runner import (
    JsonActionStepRunner,
)
from .._shared._agent_internal._prompt_overrides import (
    validate_prompt_text,
)
from .._shared._agent_internal._run_options import (
    normalize_input_payload,
)


@dataclass(slots=True, kw_only=True)
class _RoutingExecutionState:
    """Mutable state shared between routing workflow callbacks."""

    router_result: ExecutionResult | None = None
    """Result produced by the router selection step, if it ran."""

    delegated_result: ExecutionResult | None = None
    """Result produced by the selected delegate step, if it ran."""


class _RoutingWorkflowCallbacks:
    """Workflow callback bundle for selection and delegate execution steps."""

    def __init__(
        self,
        *,
        pattern: RouterDelegatePattern,
        router_agent: JsonActionStepRunner,
        prompt: str,
        request_id: str,
        dependencies: Mapping[str, object],
        budget_tracker: WorkflowBudgetTracker,
        runtime_tool_specs: Mapping[str, ToolSpec],
        state: _RoutingExecutionState,
    ) -> None:
        """Store dependencies used by workflow callback methods.

        Args:
            pattern: Router pattern instance owning delegate alternatives.
            router_agent: Router agent used in selection step.
            prompt: Routed user prompt.
            request_id: Resolved request id.
            dependencies: Normalized dependency mapping.
            budget_tracker: Budget tracker collecting model/tool usage.
            runtime_tool_specs: Runtime tool specs keyed by tool name.
            state: Mutable callback state sink.
        """
        self._pattern = pattern
        self._router_agent = router_agent
        self._prompt = prompt
        self._request_id = request_id
        self._dependencies = dependencies
        self._budget_tracker = budget_tracker
        self._runtime_tool_specs = runtime_tool_specs
        self._state = state

    def run_selection(self, context: Mapping[str, object]) -> Mapping[str, object]:
        """Execute router-model selection step.

        Args:
            context: Workflow step context payload (unused).

        Returns:
            Step output describing routing status and selected delegate name.
        """
        del context
        self._state.router_result = self._router_agent.run(
            self._prompt,
            request_id=f"{self._request_id}:agent_routing_router",
            dependencies=self._dependencies,
        )
        router_result = self._state.router_result
        self._budget_tracker.add_model_response(router_result.model_response)
        if not router_result.success:
            return {
                "status": TERMINATED_ROUTING_FAILURE,
                "routing": router_result.metadata.get("routing", {}),
            }

        selected_name = _extract_selected_name_from_router_output(router_result.output)
        return {
            "status": "selected",
            "selected_name": selected_name,
            "routing": router_result.metadata.get("routing", {}),
        }

    def run_delegate(self, context: Mapping[str, object]) -> Mapping[str, object]:
        """Execute selected delegate agent based on selection-step output.

        Args:
            context: Workflow step context containing dependency step outputs.

        Returns:
            Step output describing delegation status and selected agent metadata.
        """
        selection_output = _extract_selection_output(context)
        if selection_output is None:
            return {
                "status": TERMINATED_ROUTING_FAILURE,
                "routing": {},
            }

        status = selection_output.get("status")
        if status != "selected":
            return {
                "status": TERMINATED_ROUTING_FAILURE,
                "routing": selection_output.get("routing", {}),
            }

        selected_name = str(selection_output.get("selected_name", "")).strip()
        selected_delegate = self._pattern._alternatives.get(selected_name)
        if selected_delegate is None:
            return {
                "status": TERMINATED_UNKNOWN_ALTERNATIVE,
                "selected_name": selected_name,
                "routing": selection_output.get("routing", {}),
            }

        delegate_invocation = invoke_delegate(
            delegate=selected_delegate,
            prompt=self._prompt,
            step_context=context,
            request_id=f"{self._request_id}:agent_routing:{selected_name}",
            execution_mode="sequential",
            failure_policy="skip_dependents",
            dependencies=self._dependencies,
        )
        self._state.delegated_result = delegate_invocation.result
        delegated_result = self._state.delegated_result
        self._budget_tracker.add_model_response(delegated_result.model_response)
        self._budget_tracker.add_tool_results(
            tool_results=delegated_result.tool_results,
            tool_specs=self._runtime_tool_specs,
        )
        return {
            "status": "delegated",
            "selected_name": selected_name,
            "delegated_success": delegated_result.success,
            "routing": selection_output.get("routing", {}),
        }


def _extract_selection_output(context: Mapping[str, object]) -> Mapping[str, object] | None:
    """Extract routing selection output from step context.

    Args:
        context: Delegate-step execution context mapping.

    Returns:
        Selection output mapping from ``agent_routing_selection`` when present.
    """
    dependency_results = context.get("dependency_results")
    if not isinstance(dependency_results, Mapping):
        return None
    selection_step = dependency_results.get("agent_routing_selection")
    if not isinstance(selection_step, Mapping):
        return None
    selection_output = selection_step.get("output")
    if not isinstance(selection_output, Mapping):
        return None
    return selection_output


def _build_routing_failure_result(
    *,
    error: str,
    request_id: str,
    dependencies: Mapping[str, object],
    router_result: ExecutionResult,
    budget_tracker: WorkflowBudgetTracker,
    stage: str,
    terminated_reason: str,
    available_alternatives: Sequence[str],
    workflow_payload: Mapping[str, object],
    workflow_artifacts: Sequence[object],
) -> ExecutionResult:
    """Build one attached routing failure result with stable metadata.

    Args:
        error: Human-readable failure description.
        request_id: Request id for correlation metadata.
        dependencies: Normalized dependency mapping.
        router_result: Router selection result used for carry-forward metadata.
        budget_tracker: Runtime budget tracker for aggregate metadata.
        stage: Internal routing stage where failure occurred.
        terminated_reason: Canonical termination reason.
        available_alternatives: Declared delegate names available to the router.
        workflow_payload: Serialized workflow payload for this routing run.
        workflow_artifacts: Normalized workflow artifact entries.

    Returns:
        Execution result carrying normalized routing failure metadata.
    """
    failure = build_pattern_execution_result(
        success=False,
        final_output={},
        terminated_reason=terminated_reason,
        details={
            "selected_alternative": None,
            "available_alternatives": list(available_alternatives),
            "delegated_result": {},
            "router": router_result.metadata.get("routing", {}),
        },
        workflow_payload=dict(workflow_payload),
        artifacts=list(workflow_artifacts),
        request_id=request_id,
        dependencies=dependencies,
        mode=MODE_ROUTER_DELEGATE,
        metadata={"stage": stage, "routing": router_result.metadata.get("routing", {})},
        model_response=router_result.model_response,
        error=error,
    )
    return attach_runtime_metadata(
        agent_result=failure,
        requested_mode=MODE_ROUTER_DELEGATE,
        resolved_mode=MODE_ROUTER_DELEGATE,
        budget_metadata=budget_tracker.as_metadata(),
        extra_metadata=None,
    )


[docs] class RouterDelegatePattern(Delegate): """Routing/delegation pattern built on workflow primitives.""" def __init__( self, *, llm_client: LLMClient, tool_runtime: ToolRuntime, alternatives: Mapping[str, DelegateTarget], alternative_descriptions: Mapping[str, str] | None = None, router_system_prompt: str | None = None, router_user_prompt_template: str | None = None, default_request_id_prefix: str | None = None, default_dependencies: Mapping[str, object] | None = None, tracer: Tracer | None = None, ) -> None: """Store dependencies and initialize workflow-native routing settings. Args: llm_client: LLM client used by the router agent. tool_runtime: Tool runtime used to cost/metadata-account delegated calls. alternatives: Mapping of route keys to delegate objects. alternative_descriptions: Optional descriptions used to guide routing. router_system_prompt: Optional override for router system prompt. router_user_prompt_template: Optional override for router user prompt. default_request_id_prefix: Optional prefix used to derive request ids. default_dependencies: Dependency defaults merged into each run. tracer: Optional tracer used for run-level instrumentation. Raises: ValueError: If no valid route alternatives are supplied. """ self._llm_client = llm_client self._tool_runtime = tool_runtime self._tracer = tracer self.workflow: Workflow | None = None self._agent_routing_runtime: dict[str, object] | None = None self._default_request_id_prefix = normalize_request_id_prefix(default_request_id_prefix) self._default_dependencies = dict(default_dependencies or {}) self._alternatives = { name.strip(): agent for name, agent in alternatives.items() if isinstance(name, str) and name.strip() } if not self._alternatives: raise ValueError("alternatives must include at least one non-empty route key.") self._alternative_descriptions = { name.strip(): description.strip() for name, description in (alternative_descriptions or {}).items() if isinstance(name, str) and name.strip() and isinstance(description, str) and description.strip() } self._router_system_prompt = ( validate_prompt_text( value=router_system_prompt, field_name="router_system_prompt", ) if router_system_prompt is not None else None ) self._router_user_prompt_template = ( validate_prompt_text( value=router_user_prompt_template, field_name="router_user_prompt_template", ) if router_user_prompt_template is not None else None )
[docs] def run( self, prompt: str, *, request_id: str | None = None, dependencies: Mapping[str, object] | None = None, ) -> ExecutionResult: """Execute one intent-routing orchestration run.""" return self.compile( prompt=prompt, request_id=request_id, dependencies=dependencies, ).run()
[docs] def compile( self, prompt: str, *, request_id: str | None = None, dependencies: Mapping[str, object] | None = None, ) -> CompiledExecution: """Compile one intent-routing orchestration run.""" run_context = resolve_pattern_run_context( default_request_id_prefix=self._default_request_id_prefix, default_dependencies=self._default_dependencies, request_id=request_id, dependencies=dependencies, ) normalized_input = normalize_input_payload(prompt) resolved_prompt = _extract_prompt(normalized_input) workflow = self._build_workflow( resolved_prompt, request_id=run_context.request_id, dependencies=run_context.dependencies, ) runtime = self._agent_routing_runtime or {} budget_tracker = runtime.get("budget_tracker") execution_state = runtime.get("execution_state") if not isinstance(budget_tracker, WorkflowBudgetTracker) or not isinstance( execution_state, _RoutingExecutionState ): raise RuntimeError("Agent routing runtime state is unavailable before workflow execution.") return build_compiled_pattern_execution( workflow=workflow, pattern_name="RouterDelegatePattern", request_id=run_context.request_id, dependencies=run_context.dependencies, tracer=self._tracer, input_payload={"prompt": resolved_prompt, "mode": MODE_ROUTER_DELEGATE}, workflow_request_id=f"{run_context.request_id}:router_delegate_workflow", finalize=lambda workflow_result: self._finalize_agent_routing_result( workflow_result=workflow_result, budget_tracker=budget_tracker, execution_state=execution_state, request_id=run_context.request_id, dependencies=run_context.dependencies, ), )
def _build_workflow( self, prompt: str, *, request_id: str, dependencies: Mapping[str, object], ) -> Workflow: """Build the routing workflow for one resolved run context.""" budget_tracker = WorkflowBudgetTracker() routing_tool_runtime = AgentRoutingToolRuntimeAdapter( alternatives=self._alternatives, descriptions=self._alternative_descriptions, ) router_agent = JsonActionStepRunner( llm_client=self._llm_client, tool_runtime=routing_tool_runtime, system_prompt=self._router_system_prompt, user_prompt_template=self._router_user_prompt_template, allowed_tools=tuple(sorted(self._alternatives)), tracer=self._tracer, ) runtime_tool_specs: dict[str, ToolSpec] = {spec.name: spec for spec in self._tool_runtime.list_tools()} execution_state = _RoutingExecutionState() callbacks = _RoutingWorkflowCallbacks( pattern=self, router_agent=router_agent, prompt=prompt, request_id=request_id, dependencies=dependencies, budget_tracker=budget_tracker, runtime_tool_specs=runtime_tool_specs, state=execution_state, ) workflow = Workflow( tool_runtime=None, tracer=self._tracer, input_schema={"type": "object"}, base_context={"prompt": prompt}, steps=[ LogicStep(step_id="agent_routing_selection", handler=callbacks.run_selection), LogicStep( step_id="agent_routing_delegate", dependencies=("agent_routing_selection",), handler=callbacks.run_delegate, ), ], ) self.workflow = workflow self._agent_routing_runtime = { "budget_tracker": budget_tracker, "execution_state": execution_state, } return workflow def _run_agent_routing( self, *, prompt: str, request_id: str, dependencies: Mapping[str, object], ) -> ExecutionResult: """Router-selection workflow and delegated agent execution. Args: prompt: User prompt to route. request_id: Resolved request id for this orchestration run. dependencies: Normalized dependency mapping for delegates. Returns: Pattern result containing routing decision and delegate output. Raises: RuntimeError: If internal workflow invariants are violated. """ workflow = self._build_workflow( prompt, request_id=request_id, dependencies=dependencies, ) runtime = self._agent_routing_runtime or {} budget_tracker = runtime.get("budget_tracker") execution_state = runtime.get("execution_state") if not isinstance(budget_tracker, WorkflowBudgetTracker) or not isinstance( execution_state, _RoutingExecutionState ): raise RuntimeError("Agent routing runtime state is unavailable before workflow execution.") workflow_result = workflow.run( input={}, execution_mode="sequential", failure_policy="skip_dependents", request_id=f"{request_id}:router_delegate_workflow", dependencies=dependencies, ) return self._finalize_agent_routing_result( workflow_result=workflow_result, budget_tracker=budget_tracker, execution_state=execution_state, request_id=request_id, dependencies=dependencies, ) def _finalize_agent_routing_result( self, *, workflow_result: ExecutionResult, budget_tracker: WorkflowBudgetTracker, execution_state: _RoutingExecutionState, request_id: str, dependencies: Mapping[str, object], ) -> ExecutionResult: """Build the final routed result from a workflow execution.""" workflow_payload = workflow_result.to_dict() raw_workflow_artifacts = workflow_result.output.get("artifacts") workflow_artifacts: list[object] = ( list(raw_workflow_artifacts) if isinstance(raw_workflow_artifacts, Sequence) and not isinstance(raw_workflow_artifacts, (str, bytes)) else [] ) if not workflow_result.success: raise RuntimeError("Agent routing workflow graph execution failed.") router_result = execution_state.router_result if router_result is None: raise RuntimeError("Agent routing selection step did not produce a router result.") selection_step = workflow_result.step_results.get("agent_routing_selection") selection_output = selection_step.output if selection_step is not None else {} delegate_step = workflow_result.step_results.get("agent_routing_delegate") delegate_output = delegate_step.output if delegate_step is not None else {} if not router_result.success: return _build_routing_failure_result( error="Agent routing selection failed.", request_id=request_id, dependencies=dependencies, router_result=router_result, budget_tracker=budget_tracker, stage="agent_routing_selection", terminated_reason=TERMINATED_ROUTING_FAILURE, available_alternatives=sorted(self._alternatives.keys()), workflow_payload=workflow_payload, workflow_artifacts=workflow_artifacts, ) selected_name = str(selection_output.get("selected_name", "")).strip() if delegate_output.get("status") == TERMINATED_UNKNOWN_ALTERNATIVE or selected_name not in self._alternatives: return _build_routing_failure_result( error=f"Agent routing selected unknown agent alternative '{selected_name}'.", request_id=request_id, dependencies=dependencies, router_result=router_result, budget_tracker=budget_tracker, stage="agent_routing_selection", terminated_reason=TERMINATED_UNKNOWN_ALTERNATIVE, available_alternatives=sorted(self._alternatives.keys()), workflow_payload=workflow_payload, workflow_artifacts=workflow_artifacts, ) delegated_result = execution_state.delegated_result if delegated_result is None: return _build_routing_failure_result( error="Agent routing delegate execution did not run.", request_id=request_id, dependencies=dependencies, router_result=router_result, budget_tracker=budget_tracker, stage="agent_routing_delegate", terminated_reason=TERMINATED_ROUTING_FAILURE, available_alternatives=sorted(self._alternatives.keys()), workflow_payload=workflow_payload, workflow_artifacts=workflow_artifacts, ) router_delegate_metadata = { "routing": router_result.metadata.get("routing", {}), "selected_alternative": selected_name, "available_alternatives": sorted(self._alternatives.keys()), } delegated_output = dict(delegated_result.output) delegated_final_output = delegated_output.get("final_output") if not isinstance(delegated_final_output, Mapping): delegated_final_output = dict(delegated_output) result = build_pattern_execution_result( success=delegated_result.success, final_output=dict(delegated_final_output), terminated_reason=delegated_result.terminated_reason or "completed", details={ "selected_alternative": selected_name, "available_alternatives": sorted(self._alternatives.keys()), "delegated_result": dict(delegated_output), "router": router_result.metadata.get("routing", {}), }, workflow_payload=workflow_payload, artifacts=list(workflow_artifacts), request_id=request_id, dependencies=dependencies, mode=MODE_ROUTER_DELEGATE, metadata={ **dict(delegated_result.metadata), "router_delegate": router_delegate_metadata, }, tool_results=list(delegated_result.tool_results), model_response=delegated_result.model_response, error=delegated_result.error, ) return attach_runtime_metadata( agent_result=result, requested_mode=MODE_ROUTER_DELEGATE, resolved_mode=MODE_ROUTER_DELEGATE, budget_metadata=budget_tracker.as_metadata(), extra_metadata={ "workflow": { "execution_order": list(workflow_result.execution_order), } }, )
def _extract_selected_name_from_router_output(output: Mapping[str, object]) -> str: """Extract selected delegate name from multi-step router output payload. Args: output: Router output mapping containing ``step_outputs``. Returns: Normalized selected delegate key, or empty string when unavailable. """ direct_tool_name = output.get("tool_name") if isinstance(direct_tool_name, str) and direct_tool_name.strip(): return direct_tool_name.strip() raw_step_outputs = output.get("step_outputs") if not isinstance(raw_step_outputs, Sequence) or isinstance(raw_step_outputs, (str, bytes)): return "" for raw_step_output in reversed(raw_step_outputs): if not isinstance(raw_step_output, Mapping): continue tool_name = raw_step_output.get("tool_name") if isinstance(tool_name, str) and tool_name.strip(): return tool_name.strip() return "" __all__ = [ "RouterDelegatePattern", ]