Source code for design_research_agents._implementations._patterns._router_delegate_pattern

"""Reusable intent/agent-routing orchestration chunk."""

from __future__ import annotations

from collections.abc import Mapping, Sequence
from dataclasses import dataclass, field

from design_research_agents._contracts._delegate import Delegate, ExecutionResult
from design_research_agents._contracts._llm import LLMClient
from design_research_agents._contracts._termination import (
    TERMINATED_ROUTING_FAILURE,
    TERMINATED_UNKNOWN_ALTERNATIVE,
)
from design_research_agents._contracts._tools import ToolResult, ToolRuntime, ToolSpec
from design_research_agents._contracts._workflow import (
    DelegateStep,
    DelegateTarget,
    LogicStep,
    WorkflowExecutionMode,
    WorkflowFailurePolicy,
)
from design_research_agents._runtime._common._delegate_invocation import invoke_delegate
from design_research_agents._runtime._patterns import (
    MODE_ROUTER_DELEGATE,
    WorkflowBudgetTracker,
    attach_runtime_metadata,
    build_compiled_pattern_execution,
    build_pattern_execution_result,
    normalize_request_id_prefix,
    resolve_pattern_run_context,
)
from design_research_agents._skills import (
    SkillsConfig,
    SkillsContext,
    SkillsToolRuntimeAdapter,
    merge_skills_metadata,
    resolve_skills_context,
)
from design_research_agents._tracing import Tracer
from design_research_agents.workflow import CompiledExecution
from design_research_agents.workflow.workflow import Workflow

from .._shared._agent_internal._agent_routing_runtime_adapter import (
    AgentRoutingToolRuntimeAdapter,
)
from .._shared._agent_internal._json_action_step_runner import (
    JsonActionStepRunner,
)
from .._shared._agent_internal._prompt_overrides import (
    validate_prompt_text,
)

_ROUTING_FAILURE_ROUTE = "__routing_failure__"
_UNKNOWN_ALTERNATIVE_ROUTE = "__unknown_alternative__"


def _build_alternative_step_ids(alternative_names: Sequence[str]) -> dict[str, str]:
    """Return stable workflow step ids for router alternatives."""
    step_ids: dict[str, str] = {}
    used_step_ids: set[str] = set()
    for index, alternative_name in enumerate(alternative_names, start=1):
        sanitized_name = "".join(
            character.lower() if character.isalnum() else "_" for character in alternative_name.strip()
        ).strip("_")
        if not sanitized_name:
            sanitized_name = f"alternative_{index}"
        candidate_step_id = f"agent_routing_delegate_{sanitized_name}"
        suffix = 2
        while candidate_step_id in used_step_ids:
            candidate_step_id = f"agent_routing_delegate_{sanitized_name}_{suffix}"
            suffix += 1
        used_step_ids.add(candidate_step_id)
        step_ids[alternative_name] = candidate_step_id
    return step_ids


def _resolve_delegate_workflow_for_diagram(
    *,
    delegate: DelegateTarget,
    prompt: str,
    request_id: str,
    dependencies: Mapping[str, object],
) -> Workflow | None:
    """Return a compiled workflow for delegate diagram expansion when available."""
    direct_workflow = delegate if isinstance(delegate, Workflow) else None
    if direct_workflow is not None:
        return direct_workflow

    existing_workflow = getattr(delegate, "workflow", None)
    if isinstance(existing_workflow, Workflow):
        return existing_workflow

    compile_callable = getattr(delegate, "compile", None)
    if not callable(compile_callable):
        return None

    try:
        compiled = compile_callable(
            prompt,
            request_id=f"{request_id}:diagram",
            dependencies=dependencies,
        )
    except Exception:
        fallback_workflow = getattr(delegate, "workflow", None)
        return fallback_workflow if isinstance(fallback_workflow, Workflow) else None

    compiled_workflow = getattr(compiled, "workflow", None)
    if isinstance(compiled_workflow, Workflow):
        return compiled_workflow
    fallback_workflow = getattr(delegate, "workflow", None)
    return fallback_workflow if isinstance(fallback_workflow, Workflow) else None


@dataclass(slots=True, kw_only=True)
class _RoutingExecutionState:
    """Mutable state shared between routing workflow callbacks."""

    router_result: ExecutionResult | None = None
    """Result produced by the router selection step, if it ran."""

    delegated_result: ExecutionResult | None = None
    """Result produced by the selected delegate step, if it ran."""

    router_tool_results: list[ToolResult] = field(default_factory=list)
    """Tool results produced by router selection, including optional skill activation."""


class _AlternativeDelegateRunner:
    """Workflow-runner wrapper that preserves router pattern delegate semantics."""

    def __init__(
        self,
        *,
        selected_name: str,
        delegate: DelegateTarget,
        prompt: str,
        request_id: str,
        dependencies: Mapping[str, object],
        state: _RoutingExecutionState,
        budget_tracker: WorkflowBudgetTracker,
        runtime_tool_specs: Mapping[str, ToolSpec],
    ) -> None:
        self._selected_name = selected_name
        self._delegate = delegate
        self._state = state
        self._budget_tracker = budget_tracker
        self._runtime_tool_specs = runtime_tool_specs
        self.workflow = _resolve_delegate_workflow_for_diagram(
            delegate=delegate,
            prompt=prompt,
            request_id=request_id,
            dependencies=dependencies,
        )

    def run(
        self,
        *,
        context: Mapping[str, object] | None = None,
        execution_mode: WorkflowExecutionMode = "dag",
        failure_policy: WorkflowFailurePolicy = "skip_dependents",
        request_id: str | None = None,
        dependencies: Mapping[str, object] | None = None,
    ) -> ExecutionResult:
        """Invoke the wrapped delegate while keeping the outer workflow step green."""
        normalized_context = dict(context or {})
        prompt_value = normalized_context.get("prompt")
        if not isinstance(prompt_value, str) or not prompt_value.strip():
            raise ValueError("Router delegate runner requires a non-empty prompt in context.")
        resolved_request_id = str(request_id) if request_id is not None else f"router_delegate:{self._selected_name}"
        delegate_invocation = invoke_delegate(
            delegate=self._delegate,
            prompt=prompt_value,
            step_context=normalized_context,
            request_id=resolved_request_id,
            execution_mode=execution_mode,
            failure_policy=failure_policy,
            dependencies=dict(dependencies or {}),
        )
        self._state.delegated_result = delegate_invocation.result
        delegated_result = self._state.delegated_result
        self._budget_tracker.add_model_response(delegated_result.model_response)
        self._budget_tracker.add_tool_results(
            tool_results=delegated_result.tool_results,
            tool_specs=self._runtime_tool_specs,
        )
        return ExecutionResult(
            success=True,
            output={
                "status": "delegated",
                "selected_name": self._selected_name,
                "delegated_success": delegated_result.success,
                "delegate_type": delegate_invocation.delegate_type,
            },
            metadata={
                "selected_alternative": self._selected_name,
                "delegate_type": delegate_invocation.delegate_type,
            },
        )


class _RoutingWorkflowCallbacks:
    """Workflow callback bundle for router-selection and fallback steps."""

    def __init__(
        self,
        *,
        router_agent: JsonActionStepRunner,
        prompt: str,
        request_id: str,
        dependencies: Mapping[str, object],
        budget_tracker: WorkflowBudgetTracker,
        alternative_step_ids: Mapping[str, str],
        routing_failure_step_id: str,
        unknown_alternative_step_id: str,
        state: _RoutingExecutionState,
        router_agent_after_activation: JsonActionStepRunner | None = None,
    ) -> None:
        """Store dependencies used by workflow callback methods.

        Args:
            router_agent: Router agent used in selection step.
            prompt: Routed user prompt.
            request_id: Resolved request id.
            dependencies: Normalized dependency mapping.
            budget_tracker: Budget tracker collecting model/tool usage.
            alternative_step_ids: Workflow step ids keyed by alternative name.
            routing_failure_step_id: Fallback step id used when routing selection fails.
            unknown_alternative_step_id: Fallback step id used for unknown alternatives.
            state: Mutable callback state sink.
            router_agent_after_activation: Optional second-pass router used after one
                successful ``skills.activate`` call.
        """
        self._router_agent = router_agent
        self._prompt = prompt
        self._request_id = request_id
        self._dependencies = dependencies
        self._budget_tracker = budget_tracker
        self._alternative_step_ids = dict(alternative_step_ids)
        self._routing_failure_step_id = routing_failure_step_id
        self._unknown_alternative_step_id = unknown_alternative_step_id
        self._state = state
        self._router_agent_after_activation = router_agent_after_activation

    def run_selection(self, context: Mapping[str, object]) -> Mapping[str, object]:
        """Execute router-model selection step.

        Args:
            context: Workflow step context payload (unused).

        Returns:
            Step output describing routing status and selected delegate name.
        """
        del context
        router_result = self._router_agent.run(
            self._prompt,
            request_id=f"{self._request_id}:agent_routing_router",
            dependencies=self._dependencies,
        )
        router_tool_results = list(router_result.tool_results)
        self._budget_tracker.add_model_response(router_result.model_response)
        if (
            self._router_agent_after_activation is not None
            and _selected_tool_name_from_result(router_result) == "skills.activate"
        ):
            activated_prompt = _append_activated_skill_context(
                prompt=self._prompt,
                tool_results=router_tool_results,
            )
            router_result = self._router_agent_after_activation.run(
                activated_prompt,
                request_id=f"{self._request_id}:agent_routing_router_after_skill",
                dependencies=self._dependencies,
            )
            router_tool_results.extend(router_result.tool_results)
            self._budget_tracker.add_model_response(router_result.model_response)
        self._state.router_result = router_result
        self._state.router_tool_results = router_tool_results
        if not router_result.success:
            return {
                "status": TERMINATED_ROUTING_FAILURE,
                "route": _ROUTING_FAILURE_ROUTE,
                "selected_step_id": self._routing_failure_step_id,
                "routing": router_result.metadata.get("routing", {}),
            }

        selected_name = _extract_selected_name_from_router_output(router_result.output)
        selected_step_id = self._alternative_step_ids.get(selected_name)
        if selected_step_id is None:
            return {
                "status": "selected",
                "selected_name": selected_name,
                "route": _UNKNOWN_ALTERNATIVE_ROUTE,
                "selected_step_id": self._unknown_alternative_step_id,
                "routing": router_result.metadata.get("routing", {}),
            }
        return {
            "status": "selected",
            "selected_name": selected_name,
            "route": selected_name,
            "selected_step_id": selected_step_id,
            "routing": router_result.metadata.get("routing", {}),
        }

    def run_routing_failure(self, context: Mapping[str, object]) -> Mapping[str, object]:
        """Emit a stable failure payload for router-model selection failures."""
        selection_output = _extract_selection_output(context) or {}
        return {
            "status": TERMINATED_ROUTING_FAILURE,
            "routing": selection_output.get("routing", {}),
        }

    def run_unknown_alternative(self, context: Mapping[str, object]) -> Mapping[str, object]:
        """Emit a stable failure payload when the router selects an undeclared alternative."""
        selection_output = _extract_selection_output(context)
        if selection_output is None:
            return {
                "status": TERMINATED_ROUTING_FAILURE,
                "routing": {},
            }
        return {
            "status": TERMINATED_UNKNOWN_ALTERNATIVE,
            "selected_name": str(selection_output.get("selected_name", "")).strip(),
            "routing": selection_output.get("routing", {}),
        }


def _extract_selection_output(context: Mapping[str, object]) -> Mapping[str, object] | None:
    """Extract routing selection output from step context.

    Args:
        context: Delegate-step execution context mapping.

    Returns:
        Selection output mapping from ``agent_routing_selection`` when present.
    """
    dependency_results = context.get("dependency_results")
    if not isinstance(dependency_results, Mapping):
        return None
    selection_step = dependency_results.get("agent_routing_selection")
    if not isinstance(selection_step, Mapping):
        return None
    selection_output = selection_step.get("output")
    if not isinstance(selection_output, Mapping):
        return None
    return selection_output


def _build_routing_failure_result(
    *,
    error: str,
    request_id: str,
    dependencies: Mapping[str, object],
    router_result: ExecutionResult,
    budget_tracker: WorkflowBudgetTracker,
    stage: str,
    terminated_reason: str,
    available_alternatives: Sequence[str],
    workflow_payload: Mapping[str, object],
    workflow_artifacts: Sequence[object],
    skills_context: SkillsContext | None,
    tool_results: Sequence[ToolResult],
) -> ExecutionResult:
    """Build one attached routing failure result with stable metadata.

    Args:
        error: Human-readable failure description.
        request_id: Request id for correlation metadata.
        dependencies: Normalized dependency mapping.
        router_result: Router selection result used for carry-forward metadata.
        budget_tracker: Runtime budget tracker for aggregate metadata.
        stage: Internal routing stage where failure occurred.
        terminated_reason: Canonical termination reason.
        available_alternatives: Declared delegate names available to the router.
        workflow_payload: Serialized workflow payload for this routing run.
        workflow_artifacts: Normalized workflow artifact entries.
        skills_context: Optional resolved Agent Skills context.
        tool_results: Tool results already produced during routing selection.

    Returns:
        Execution result carrying normalized routing failure metadata.
    """
    failure = build_pattern_execution_result(
        success=False,
        final_output={},
        terminated_reason=terminated_reason,
        details={
            "selected_alternative": None,
            "available_alternatives": list(available_alternatives),
            "delegated_result": {},
            "router": router_result.metadata.get("routing", {}),
        },
        workflow_payload=dict(workflow_payload),
        artifacts=list(workflow_artifacts),
        request_id=request_id,
        dependencies=dependencies,
        mode=MODE_ROUTER_DELEGATE,
        metadata=merge_skills_metadata(
            metadata={"stage": stage, "routing": router_result.metadata.get("routing", {})},
            skills_context=skills_context,
            tool_results=tool_results,
        ),
        tool_results=list(tool_results),
        model_response=router_result.model_response,
        error=error,
    )
    return attach_runtime_metadata(
        agent_result=failure,
        requested_mode=MODE_ROUTER_DELEGATE,
        resolved_mode=MODE_ROUTER_DELEGATE,
        budget_metadata=budget_tracker.as_metadata(),
        extra_metadata=None,
    )


[docs] class RouterDelegatePattern(Delegate): """Routing/delegation pattern built on workflow primitives.""" def __init__( self, *, llm_client: LLMClient, tool_runtime: ToolRuntime, alternatives: Mapping[str, DelegateTarget], alternative_descriptions: Mapping[str, str] | None = None, router_system_prompt: str | None = None, router_user_prompt_template: str | None = None, default_request_id_prefix: str | None = None, default_dependencies: Mapping[str, object] | None = None, skills: SkillsConfig | None = None, tracer: Tracer | None = None, ) -> None: """Store dependencies and initialize workflow-native routing settings. Args: llm_client: LLM client used by the router agent. tool_runtime: Tool runtime used to cost/metadata-account delegated calls. alternatives: Mapping of route keys to delegate objects. alternative_descriptions: Optional descriptions used to guide routing. router_system_prompt: Optional override for router system prompt. router_user_prompt_template: Optional override for router user prompt. default_request_id_prefix: Optional prefix used to derive request ids. default_dependencies: Dependency defaults merged into each run. skills: Optional Agent Skills configuration. tracer: Optional tracer used for run-level instrumentation. Raises: ValueError: If no valid route alternatives are supplied. """ self._llm_client = llm_client self._tool_runtime = tool_runtime self._tracer = tracer self._skills_context = resolve_skills_context(skills) self.workflow: Workflow | None = None self._agent_routing_runtime: dict[str, object] | None = None self._default_request_id_prefix = normalize_request_id_prefix(default_request_id_prefix) self._default_dependencies = dict(default_dependencies or {}) self._alternatives = { name.strip(): agent for name, agent in alternatives.items() if isinstance(name, str) and name.strip() } if not self._alternatives: raise ValueError("alternatives must include at least one non-empty route key.") self._alternative_descriptions = { name.strip(): description.strip() for name, description in (alternative_descriptions or {}).items() if isinstance(name, str) and name.strip() and isinstance(description, str) and description.strip() } self._router_system_prompt = ( validate_prompt_text( value=router_system_prompt, field_name="router_system_prompt", ) if router_system_prompt is not None else None ) self._router_user_prompt_template = ( validate_prompt_text( value=router_user_prompt_template, field_name="router_user_prompt_template", ) if router_user_prompt_template is not None else None )
[docs] def run( self, prompt: str | object, *, request_id: str | None = None, dependencies: Mapping[str, object] | None = None, ) -> ExecutionResult: """Execute one intent-routing orchestration run.""" return self.compile( prompt=prompt, request_id=request_id, dependencies=dependencies, ).run()
[docs] def compile( self, prompt: str | object, *, request_id: str | None = None, dependencies: Mapping[str, object] | None = None, ) -> CompiledExecution: """Compile one intent-routing orchestration run.""" run_context = resolve_pattern_run_context( prompt=prompt, default_request_id_prefix=self._default_request_id_prefix, default_dependencies=self._default_dependencies, request_id=request_id, dependencies=dependencies, ) workflow = self._build_workflow( run_context.prompt, request_id=run_context.request_id, dependencies=run_context.dependencies, ) runtime = self._agent_routing_runtime or {} budget_tracker = runtime.get("budget_tracker") execution_state = runtime.get("execution_state") if not isinstance(budget_tracker, WorkflowBudgetTracker) or not isinstance( execution_state, _RoutingExecutionState ): raise RuntimeError("Agent routing runtime state is unavailable before workflow execution.") return build_compiled_pattern_execution( workflow=workflow, pattern_name="RouterDelegatePattern", request_id=run_context.request_id, dependencies=run_context.dependencies, tracer=self._tracer, input_payload={**run_context.normalized_input, "mode": MODE_ROUTER_DELEGATE}, workflow_request_id=f"{run_context.request_id}:router_delegate_workflow", finalize=lambda workflow_result: self._finalize_agent_routing_result( workflow_result=workflow_result, budget_tracker=budget_tracker, execution_state=execution_state, request_id=run_context.request_id, dependencies=run_context.dependencies, ), )
def _build_workflow( self, prompt: str, *, request_id: str, dependencies: Mapping[str, object], ) -> Workflow: """Build the routing workflow for one resolved run context.""" budget_tracker = WorkflowBudgetTracker() alternative_step_ids = _build_alternative_step_ids(tuple(self._alternatives)) routing_failure_step_id = "agent_routing_failure" unknown_alternative_step_id = "agent_routing_unknown_alternative" routing_tool_runtime = AgentRoutingToolRuntimeAdapter( alternatives=self._alternatives, descriptions=self._alternative_descriptions, ) automatic_activation_enabled = bool( self._skills_context is not None and self._skills_context.config.allow_automatic_activation and self._skills_context.discovered_skill_names ) router_skills_context = self._skills_context router_agent_after_activation: JsonActionStepRunner | None = None routed_tool_runtime: ToolRuntime = routing_tool_runtime if automatic_activation_enabled and self._skills_context is not None: routed_tool_runtime = SkillsToolRuntimeAdapter( wrapped_runtime=routing_tool_runtime, skills_context=self._skills_context, ) router_agent_after_activation = JsonActionStepRunner( llm_client=self._llm_client, tool_runtime=routing_tool_runtime, system_prompt=self._router_system_prompt, user_prompt_template=self._router_user_prompt_template, allowed_tools=tuple(sorted(self._alternatives)), skills_context=_pinned_only_skills_context(self._skills_context), tracer=self._tracer, ) router_agent = JsonActionStepRunner( llm_client=self._llm_client, tool_runtime=routed_tool_runtime, system_prompt=self._router_system_prompt, user_prompt_template=self._router_user_prompt_template, allowed_tools=tuple(sorted(self._alternatives)), skills_context=router_skills_context, tracer=self._tracer, ) runtime_tool_specs: dict[str, ToolSpec] = {spec.name: spec for spec in self._tool_runtime.list_tools()} execution_state = _RoutingExecutionState() callbacks = _RoutingWorkflowCallbacks( router_agent=router_agent, prompt=prompt, request_id=request_id, dependencies=dependencies, budget_tracker=budget_tracker, alternative_step_ids=alternative_step_ids, routing_failure_step_id=routing_failure_step_id, unknown_alternative_step_id=unknown_alternative_step_id, state=execution_state, router_agent_after_activation=router_agent_after_activation, ) routed_delegate_steps = [ DelegateStep( step_id=step_id, delegate=_AlternativeDelegateRunner( selected_name=alternative_name, delegate=self._alternatives[alternative_name], prompt=prompt, request_id=request_id, dependencies=dependencies, state=execution_state, budget_tracker=budget_tracker, runtime_tool_specs=runtime_tool_specs, ), dependencies=("agent_routing_selection",), prompt=prompt, ) for alternative_name, step_id in alternative_step_ids.items() ] workflow = Workflow( tool_runtime=None, tracer=self._tracer, input_schema={"type": "object"}, base_context={"prompt": prompt}, steps=[ LogicStep( step_id="agent_routing_selection", handler=callbacks.run_selection, route_map={ **{name: (step_id,) for name, step_id in alternative_step_ids.items()}, _ROUTING_FAILURE_ROUTE: (routing_failure_step_id,), _UNKNOWN_ALTERNATIVE_ROUTE: (unknown_alternative_step_id,), }, ), *routed_delegate_steps, LogicStep( step_id=routing_failure_step_id, dependencies=("agent_routing_selection",), handler=callbacks.run_routing_failure, ), LogicStep( step_id=unknown_alternative_step_id, dependencies=("agent_routing_selection",), handler=callbacks.run_unknown_alternative, ), ], ) self.workflow = workflow self._agent_routing_runtime = { "budget_tracker": budget_tracker, "execution_state": execution_state, } return workflow def _run_agent_routing( self, *, prompt: str, request_id: str, dependencies: Mapping[str, object], ) -> ExecutionResult: """Router-selection workflow and delegated agent execution. Args: prompt: User prompt to route. request_id: Resolved request id for this orchestration run. dependencies: Normalized dependency mapping for delegates. Returns: Pattern result containing routing decision and delegate output. Raises: RuntimeError: If internal workflow invariants are violated. """ workflow = self._build_workflow( prompt, request_id=request_id, dependencies=dependencies, ) runtime = self._agent_routing_runtime or {} budget_tracker = runtime.get("budget_tracker") execution_state = runtime.get("execution_state") if not isinstance(budget_tracker, WorkflowBudgetTracker) or not isinstance( execution_state, _RoutingExecutionState ): raise RuntimeError("Agent routing runtime state is unavailable before workflow execution.") workflow_result = workflow.run( input={}, execution_mode="sequential", failure_policy="skip_dependents", request_id=f"{request_id}:router_delegate_workflow", dependencies=dependencies, ) return self._finalize_agent_routing_result( workflow_result=workflow_result, budget_tracker=budget_tracker, execution_state=execution_state, request_id=request_id, dependencies=dependencies, ) def _finalize_agent_routing_result( self, *, workflow_result: ExecutionResult, budget_tracker: WorkflowBudgetTracker, execution_state: _RoutingExecutionState, request_id: str, dependencies: Mapping[str, object], ) -> ExecutionResult: """Build the final routed result from a workflow execution.""" workflow_payload = workflow_result.to_dict() raw_workflow_artifacts = workflow_result.output.get("artifacts") workflow_artifacts: list[object] = ( list(raw_workflow_artifacts) if isinstance(raw_workflow_artifacts, Sequence) and not isinstance(raw_workflow_artifacts, (str, bytes)) else [] ) if not workflow_result.success: raise RuntimeError("Agent routing workflow graph execution failed.") router_result = execution_state.router_result if router_result is None: raise RuntimeError("Agent routing selection step did not produce a router result.") selection_step = workflow_result.step_results.get("agent_routing_selection") selection_output = selection_step.output if selection_step is not None else {} selected_step_id = str(selection_output.get("selected_step_id", "")).strip() delegate_step = workflow_result.step_results.get(selected_step_id) if selected_step_id else None delegate_output = delegate_step.output if delegate_step is not None else {} if not router_result.success: return _build_routing_failure_result( error="Agent routing selection failed.", request_id=request_id, dependencies=dependencies, router_result=router_result, budget_tracker=budget_tracker, stage="agent_routing_selection", terminated_reason=TERMINATED_ROUTING_FAILURE, available_alternatives=sorted(self._alternatives.keys()), workflow_payload=workflow_payload, workflow_artifacts=workflow_artifacts, skills_context=self._skills_context, tool_results=execution_state.router_tool_results, ) selected_name = str(selection_output.get("selected_name", "")).strip() if delegate_output.get("status") == TERMINATED_UNKNOWN_ALTERNATIVE or selected_name not in self._alternatives: return _build_routing_failure_result( error=f"Agent routing selected unknown agent alternative '{selected_name}'.", request_id=request_id, dependencies=dependencies, router_result=router_result, budget_tracker=budget_tracker, stage="agent_routing_selection", terminated_reason=TERMINATED_UNKNOWN_ALTERNATIVE, available_alternatives=sorted(self._alternatives.keys()), workflow_payload=workflow_payload, workflow_artifacts=workflow_artifacts, skills_context=self._skills_context, tool_results=execution_state.router_tool_results, ) delegated_result = execution_state.delegated_result if delegated_result is None: return _build_routing_failure_result( error="Agent routing delegate execution did not run.", request_id=request_id, dependencies=dependencies, router_result=router_result, budget_tracker=budget_tracker, stage=selected_step_id or "agent_routing_delegate", terminated_reason=TERMINATED_ROUTING_FAILURE, available_alternatives=sorted(self._alternatives.keys()), workflow_payload=workflow_payload, workflow_artifacts=workflow_artifacts, skills_context=self._skills_context, tool_results=execution_state.router_tool_results, ) router_delegate_metadata = { "routing": router_result.metadata.get("routing", {}), "selected_alternative": selected_name, "available_alternatives": sorted(self._alternatives.keys()), } combined_tool_results = [ *execution_state.router_tool_results, *list(delegated_result.tool_results), ] delegated_output = dict(delegated_result.output) delegated_final_output = delegated_output.get("final_output") if not isinstance(delegated_final_output, Mapping): delegated_final_output = dict(delegated_output) result = build_pattern_execution_result( success=delegated_result.success, final_output=dict(delegated_final_output), terminated_reason=delegated_result.terminated_reason or "completed", details={ "selected_alternative": selected_name, "available_alternatives": sorted(self._alternatives.keys()), "delegated_result": dict(delegated_output), "router": router_result.metadata.get("routing", {}), }, workflow_payload=workflow_payload, artifacts=list(workflow_artifacts), request_id=request_id, dependencies=dependencies, mode=MODE_ROUTER_DELEGATE, metadata=merge_skills_metadata( metadata={ **dict(delegated_result.metadata), "router_delegate": router_delegate_metadata, }, skills_context=self._skills_context, tool_results=combined_tool_results, ), tool_results=combined_tool_results, model_response=delegated_result.model_response, error=delegated_result.error, ) return attach_runtime_metadata( agent_result=result, requested_mode=MODE_ROUTER_DELEGATE, resolved_mode=MODE_ROUTER_DELEGATE, budget_metadata=budget_tracker.as_metadata(), extra_metadata={ "workflow": { "execution_order": list(workflow_result.execution_order), } }, )
def _extract_selected_name_from_router_output(output: Mapping[str, object]) -> str: """Extract selected delegate name from multi-step router output payload. Args: output: Router output mapping containing ``step_outputs``. Returns: Normalized selected delegate key, or empty string when unavailable. """ direct_tool_name = output.get("tool_name") if isinstance(direct_tool_name, str) and direct_tool_name.strip(): return direct_tool_name.strip() raw_step_outputs = output.get("step_outputs") if not isinstance(raw_step_outputs, Sequence) or isinstance(raw_step_outputs, (str, bytes)): return "" for raw_step_output in reversed(raw_step_outputs): if not isinstance(raw_step_output, Mapping): continue tool_name = raw_step_output.get("tool_name") if isinstance(tool_name, str) and tool_name.strip(): return tool_name.strip() return "" def _selected_tool_name_from_result(result: ExecutionResult) -> str: """Extract the selected tool name from one router-agent result.""" return _extract_selected_name_from_router_output(result.output) def _append_activated_skill_context( *, prompt: str, tool_results: Sequence[ToolResult], ) -> str: """Append one activated skill block to the router prompt when available.""" for tool_result in tool_results: if tool_result.tool_name != "skills.activate" or not tool_result.ok: continue payload = tool_result.result_dict() name = str(payload.get("name", "")).strip() description = str(payload.get("description", "")).strip() instructions = str(payload.get("instructions", "")).strip() skill_root = str(payload.get("skill_root", "")).strip() compatibility_raw = payload.get("compatibility") compatibility = ( ", ".join(str(item) for item in compatibility_raw) if isinstance( compatibility_raw, list, ) else "(none)" ) skill_block = "\n".join( [ "Activated routing skill:", f'<active_skill name="{name}" root="{skill_root}">', f"description: {description}", f"compatibility: {compatibility}", "instructions:", instructions, "</active_skill>", "Use the activated routing skill when choosing the best alternative.", ] ).strip() if skill_block: return f"{prompt.rstrip()}\n\n{skill_block}" return prompt def _pinned_only_skills_context(skills_context: SkillsContext) -> SkillsContext: """Return one derived skills context that suppresses automatic activation.""" return SkillsContext( config=SkillsConfig( project_root=skills_context.config.project_root, extra_paths=skills_context.config.extra_paths, pinned_skills=skills_context.config.pinned_skills, catalog_prompt_target=skills_context.config.catalog_prompt_target, allow_automatic_activation=False, ), catalog=skills_context.catalog, pinned_skills=skills_context.pinned_skills, ) __all__ = [ "RouterDelegatePattern", ]