# Source code for design_research_agents._implementations._patterns._debate_pattern

"""Reusable debate-pattern orchestration chunk."""

from __future__ import annotations

import json
from collections.abc import Mapping
from contextlib import suppress

from design_research_agents._contracts._delegate import Delegate, ExecutionResult
from design_research_agents._contracts._llm import LLMClient, LLMMessage, LLMRequest, LLMResponse
from design_research_agents._contracts._tools import ToolRuntime
from design_research_agents._contracts._workflow import (
    DelegateBatchCall,
    DelegateBatchStep,
    DelegateStep,
    DelegateTarget,
    LogicStep,
    LoopStep,
    ModelStep,
)
from design_research_agents._implementations._agents._direct_llm_call import DirectLLMCall
from design_research_agents._implementations._shared._agent_internal._input_parsing import (
    parse_json_mapping as _parse_json_mapping,
)
from design_research_agents._implementations._shared._agent_internal._model_resolution import (
    resolve_agent_model,
)
from design_research_agents._runtime._patterns import (
    MODE_DEBATE,
    build_compiled_pattern_execution,
    build_pattern_execution_result,
    normalize_request_id_prefix,
    render_prompt_template,
    resolve_pattern_run_context,
    resolve_prompt_override,
)
from design_research_agents._runtime._patterns import (
    extract_call_error as _runtime_extract_call_error,
)
from design_research_agents._runtime._patterns import (
    extract_call_model_response as _runtime_extract_call_model_response,
)
from design_research_agents._runtime._patterns import (
    extract_call_output as _runtime_extract_call_output,
)
from design_research_agents._runtime._patterns import (
    extract_delegate_batch_call_result_from_context as _runtime_extract_delegate_batch_call_result_from_context,
)
from design_research_agents._runtime._patterns import (
    is_call_success as _runtime_is_call_success,
)
from design_research_agents._schemas import (
    SchemaValidationError,
    validate_payload_against_schema,
)
from design_research_agents._tracing import Tracer
from design_research_agents.workflow import CompiledExecution, Workflow

# JSON schema that a judge verdict must satisfy. It is sent to the model as the
# response schema for the direct judge call and is also used to validate the
# verdict produced by either judge path (direct model or delegate).
_VERDICT_SCHEMA: dict[str, object] = {
    "type": "object",
    # Reject verdicts that carry keys other than the three declared below.
    "additionalProperties": False,
    "required": ["winner", "rationale", "synthesis"],
    "properties": {
        # The winning side; "tie" is an accepted outcome.
        "winner": {"type": "string", "enum": ["affirmative", "negative", "tie"]},
        "rationale": {"type": "string"},
        "synthesis": {"type": "string"},
    },
}

_DEFAULT_AFFIRMATIVE_SYSTEM_PROMPT = (
    "You are the affirmative side in a structured debate. Argue for the strongest case in favor of the task."
)
_DEFAULT_AFFIRMATIVE_USER_PROMPT_TEMPLATE = "\n".join(
    [
        "Task: $task_prompt",
        "Round: $round",
        "Opponent argument from prior round:",
        "$opponent_argument",
        "Respond with a concise affirmative argument only.",
    ]
)
_DEFAULT_NEGATIVE_SYSTEM_PROMPT = (
    "You are the negative side in a structured debate. "
    "Argue the strongest case against the task's affirmative position."
)
_DEFAULT_NEGATIVE_USER_PROMPT_TEMPLATE = "\n".join(
    [
        "Task: $task_prompt",
        "Round: $round",
        "Opponent argument this round:",
        "$opponent_argument",
        "Respond with a concise negative argument only.",
    ]
)
_DEFAULT_JUDGE_SYSTEM_PROMPT = "You are a strict debate judge. Return JSON only with winner, rationale, and synthesis."
_DEFAULT_JUDGE_USER_PROMPT_TEMPLATE = "\n".join(
    [
        "Task:",
        "$task_prompt",
        "",
        "Debate rounds (JSON):",
        "$debate_rounds_json",
        "",
        "Pick a winner and provide a concise synthesis.",
    ]
)


class _DebateWorkflowCallbacks:
    """Workflow callback bundle used by debate round and judge steps."""

    def __init__(
        self,
        *,
        pattern: DebatePattern,
        prompt: str,
        request_id: str,
        dependencies: Mapping[str, object],
        affirmative_delegate: DelegateTarget,
        negative_delegate: DelegateTarget,
        judge_delegate: DelegateTarget | None,
        resolved_model: str,
        runtime_state: dict[str, object],
    ) -> None:
        """Store per-run callback dependencies."""
        self._pattern = pattern
        self._prompt = prompt
        self._request_id = request_id
        self._dependencies = dependencies
        self._affirmative_delegate = affirmative_delegate
        self._negative_delegate = negative_delegate
        self._judge_delegate = judge_delegate
        self._resolved_model = resolved_model
        self._runtime_state = runtime_state

    def continue_predicate(self, iteration: int, state: Mapping[str, object]) -> bool:
        """Return whether the debate loop should continue."""
        del iteration
        if not bool(state.get("should_continue", True)):
            return False
        failure_reason = state.get("failure_reason")
        return not (isinstance(failure_reason, str) and failure_reason)

    def build_affirmative_calls(
        self,
        context: Mapping[str, object],
    ) -> list[DelegateBatchCall]:
        """Create the single affirmative call for the current round.

        The prompt is rendered from the pattern's affirmative user template with the
        task prompt, the round number, and the negative argument carried in loop state.
        """
        current_round, _, _, negative_from_prior_round = _resolve_round_context(context)
        template_variables = {
            "task_prompt": self._prompt,
            "round": current_round,
            "opponent_argument": negative_from_prior_round,
        }
        rendered_prompt = render_prompt_template(
            template_text=self._pattern._affirmative_user_prompt_template,
            variables=template_variables,
            field_name="affirmative_user_prompt_template",
        )
        affirmative_call = DelegateBatchCall(
            call_id="affirmative",
            delegate=self._affirmative_delegate,
            prompt=rendered_prompt,
        )
        return [affirmative_call]

    def build_negative_calls(
        self,
        context: Mapping[str, object],
    ) -> list[DelegateBatchCall]:
        """Create the single negative call for the current round.

        The negative side answers the affirmative argument produced earlier in the same
        round, read from the ``debate_affirmative_batch`` dependency result.
        """
        current_round, _, _, _ = _resolve_round_context(context)
        affirmative_call_result = _extract_delegate_batch_call_result(
            context=context,
            dependency_step_id="debate_affirmative_batch",
            call_id="affirmative",
        )
        affirmative_text = _extract_model_text_from_output(_extract_call_output(affirmative_call_result))
        # An empty affirmative argument is shown to the opponent as a placeholder.
        opponent_argument = affirmative_text if affirmative_text else "(none)"
        rendered_prompt = render_prompt_template(
            template_text=self._pattern._negative_user_prompt_template,
            variables={
                "task_prompt": self._prompt,
                "round": current_round,
                "opponent_argument": opponent_argument,
            },
            field_name="negative_user_prompt_template",
        )
        negative_call = DelegateBatchCall(
            call_id="negative",
            delegate=self._negative_delegate,
            prompt=rendered_prompt,
        )
        return [negative_call]

    def build_round_state(self, context: Mapping[str, object]) -> Mapping[str, object]:
        """Turn the two batch results of one round into the next loop state.

        On success the round is appended to ``rounds`` and both arguments become the
        "prior" arguments for the next round. If either side failed, the returned state
        sets ``should_continue`` to ``False`` and records ``failure_reason`` and
        ``failure_error``; the failed round is not appended.
        """
        current_round, history, _, negative_from_prior_round = _resolve_round_context(context)

        def failed_state(*, reason: str, affirmative_text: str, error: str) -> dict[str, object]:
            # Shared shape of the state returned when either side of the round fails.
            return {
                "rounds": history,
                "prior_negative_argument": negative_from_prior_round,
                "prior_affirmative_argument": affirmative_text,
                "should_continue": False,
                "failure_reason": reason,
                "failure_error": error,
            }

        # Affirmative side.
        affirmative_call = _extract_delegate_batch_call_result(
            context=context,
            dependency_step_id="debate_affirmative_batch",
            call_id="affirmative",
        )
        latest_response = _extract_call_model_response(affirmative_call)
        if latest_response is not None:
            self._runtime_state["last_model_response"] = latest_response
        if not _is_call_success(affirmative_call):
            return failed_state(
                reason="affirmative_failed",
                affirmative_text="(none)",
                error=_extract_call_error(
                    affirmative_call,
                    fallback_message="Affirmative delegate failed.",
                ),
            )
        affirmative_text = _extract_model_text_from_output(_extract_call_output(affirmative_call))

        # Negative side.
        negative_call = _extract_delegate_batch_call_result(
            context=context,
            dependency_step_id="debate_negative_batch",
            call_id="negative",
        )
        latest_response = _extract_call_model_response(negative_call)
        if latest_response is not None:
            self._runtime_state["last_model_response"] = latest_response
        if not _is_call_success(negative_call):
            return failed_state(
                reason="negative_failed",
                affirmative_text=affirmative_text or "(none)",
                error=_extract_call_error(
                    negative_call,
                    fallback_message="Negative delegate failed.",
                ),
            )
        negative_text = _extract_model_text_from_output(_extract_call_output(negative_call))

        # Both sides answered: record the round and carry the arguments forward.
        completed_round = {
            "round": current_round,
            "affirmative_argument": affirmative_text,
            "negative_argument": negative_text,
        }
        history.append(completed_round)
        return {
            "rounds": history,
            "prior_affirmative_argument": affirmative_text or "(none)",
            "prior_negative_argument": negative_text or "(none)",
            "should_continue": True,
            "failure_reason": None,
            "failure_error": None,
        }

    @staticmethod
    def state_reducer(
        state: Mapping[str, object],
        iteration_result: ExecutionResult,
        iteration: int,
    ) -> Mapping[str, object]:
        """Fold one debate round iteration into accumulated loop state."""
        del iteration
        iteration_step = iteration_result.step_results.get("debate_round")
        if iteration_step is None or not getattr(iteration_step, "success", False):
            return dict(state)
        output = getattr(iteration_step, "output", {})
        return dict(output) if isinstance(output, Mapping) else dict(state)

    def build_judge_prompt_from_context(self, context: Mapping[str, object]) -> str:
        """Render the judge's user prompt from the rounds recorded by the debate loop."""
        recorded_rounds = _extract_rounds_from_context(context)
        judge_template = self._pattern._judge_user_prompt_template
        return _render_judge_prompt(
            task_prompt=self._prompt,
            rounds=recorded_rounds,
            prompt_template=judge_template,
        )

    def build_judge_request(self, context: Mapping[str, object]) -> LLMRequest:
        """Assemble the LLM request used when the judge is a direct model call.

        The request carries the judge system prompt, the rendered judge user prompt,
        and a copy of the verdict schema as the response schema.
        """
        user_prompt = self.build_judge_prompt_from_context(context)
        # The same identifying tags go into both metadata and provider options;
        # only the metadata additionally carries the request id.
        shared_tags: dict[str, object] = {
            "agent": "DebatePattern",
            "mode": MODE_DEBATE,
            "phase": "judge",
        }
        return LLMRequest(
            messages=[
                LLMMessage(role="system", content=self._pattern._judge_system_prompt),
                LLMMessage(role="user", content=user_prompt),
            ],
            model=self._resolved_model,
            response_schema=dict(_VERDICT_SCHEMA),
            metadata={**shared_tags, "request_id": self._request_id},
            provider_options=dict(shared_tags),
        )

    def parse_judge_response(
        self,
        response: LLMResponse,
        context: Mapping[str, object],
    ) -> Mapping[str, object]:
        """Wrap the parsed judge response text under a ``"verdict"`` key."""
        del context  # accepted as an argument but not needed for parsing
        verdict_payload = _parse_json_mapping(response.text)
        return {"verdict": verdict_payload}

    def build_judge_output_from_model(self, context: Mapping[str, object]) -> Mapping[str, object]:
        """Normalize the direct-model judge step output into a status/verdict mapping.

        Returns a mapping with ``status``, ``error``, ``rounds`` and ``verdict``.
        ``status`` is ``"completed"`` for a schema-valid verdict, ``"judge_invalid_json"``
        when no verdict mapping was parsed, and ``"judge_invalid_schema"`` when the
        verdict fails validation against the verdict schema.
        """
        debate_rounds = _extract_rounds_from_context(context)
        step_output = _extract_dependency_output(context, dependency_id="debate_judge_model")

        response = _extract_model_response_from_model_step_output(step_output)
        if response is not None:
            self._runtime_state["last_model_response"] = response

        parsed = step_output.get("parsed")
        verdict_candidate = dict(parsed).get("verdict") if isinstance(parsed, Mapping) else None
        if not isinstance(verdict_candidate, Mapping):
            return {
                "status": "judge_invalid_json",
                "error": "Debate judge did not return valid JSON output.",
                "rounds": debate_rounds,
                "verdict": None,
            }

        verdict = dict(verdict_candidate)
        status: str = "completed"
        error: str | None = None
        try:
            validate_payload_against_schema(
                payload=verdict,
                schema=_VERDICT_SCHEMA,
                location="debate_pattern.judge",
            )
        except SchemaValidationError as exc:
            # The rejected verdict is still returned alongside the validation error.
            status = "judge_invalid_schema"
            error = f"Debate judge output failed schema validation: {exc}"
        return {
            "status": status,
            "error": error,
            "rounds": debate_rounds,
            "verdict": verdict,
        }

    def build_judge_output_from_delegate(
        self,
        context: Mapping[str, object],
    ) -> Mapping[str, object]:
        """Build normalized judge output from delegate judge step payload.

        Args:
            context: Workflow step context; the ``debate_rounds`` and
                ``debate_judge_delegate`` dependency results are read from it.

        Returns:
            Mapping with ``status``, ``error``, ``rounds`` and ``verdict`` keys. ``status``
            is ``"completed"`` for a schema-valid verdict, ``"judge_invalid_json"`` when
            the delegate did not succeed or no verdict could be extracted, and
            ``"judge_invalid_schema"`` when the verdict fails schema validation.
        """
        rounds = _extract_rounds_from_context(context)
        delegate_payload = _extract_dependency_output(
            context,
            dependency_id="debate_judge_delegate",
        )
        maybe_model_response = delegate_payload.get("model_response")
        if isinstance(maybe_model_response, Mapping):
            # Best effort: a mapping whose keys do not fit LLMResponse is ignored.
            with suppress(TypeError):
                self._runtime_state["last_model_response"] = LLMResponse(**dict(maybe_model_response))

        delegate_success = delegate_payload.get("success")
        delegate_output_payload = delegate_payload.get("output")
        delegate_output = dict(delegate_output_payload) if isinstance(delegate_output_payload, Mapping) else {}
        if delegate_success is not True:
            # ``dict.get`` only uses its default when the key is missing, so an explicit
            # ``"error": None`` (or blank) entry has to be handled here; otherwise the
            # reported error would be the literal text "None" or an empty string.
            raw_error = delegate_output.get("error")
            error_text = "" if raw_error is None else str(raw_error)
            if not error_text.strip():
                error_text = "Debate judge delegate failed."
            return {
                "status": "judge_invalid_json",
                "error": error_text,
                "rounds": rounds,
                "verdict": None,
            }

        parsed_verdict = _extract_delegate_verdict(delegate_output)
        if parsed_verdict is None:
            return {
                "status": "judge_invalid_json",
                "error": "Debate judge did not return valid JSON output.",
                "rounds": rounds,
                "verdict": None,
            }

        try:
            validate_payload_against_schema(
                payload=parsed_verdict,
                schema=_VERDICT_SCHEMA,
                location="debate_pattern.judge",
            )
        except SchemaValidationError as exc:
            # The rejected verdict is still returned alongside the validation error.
            return {
                "status": "judge_invalid_schema",
                "error": f"Debate judge output failed schema validation: {exc}",
                "rounds": rounds,
                "verdict": parsed_verdict,
            }
        return {
            "status": "completed",
            "error": None,
            "rounds": rounds,
            "verdict": parsed_verdict,
        }


class DebatePattern(Delegate):
    """Configured reusable debate pattern with affirmative, negative, and judge phases."""

    def __init__(
        self,
        *,
        llm_client: LLMClient,
        tool_runtime: ToolRuntime,
        affirmative_delegate: DelegateTarget | None = None,
        negative_delegate: DelegateTarget | None = None,
        judge_delegate: DelegateTarget | None = None,
        max_rounds: int = 3,
        affirmative_system_prompt: str | None = None,
        affirmative_user_prompt_template: str | None = None,
        negative_system_prompt: str | None = None,
        negative_user_prompt_template: str | None = None,
        judge_system_prompt: str | None = None,
        judge_user_prompt_template: str | None = None,
        default_request_id_prefix: str | None = "debate",
        default_dependencies: Mapping[str, object] | None = None,
        tracer: Tracer | None = None,
    ) -> None:
        """Store dependencies and initialize prompt defaults.

        Args:
            llm_client: Client used for the default debaters and the direct judge call.
            tool_runtime: Tool runtime handed to the workflow.
            affirmative_delegate: Optional affirmative debater; a ``DirectLLMCall`` is
                created per workflow build when omitted.
            negative_delegate: Optional negative debater; a ``DirectLLMCall`` is created
                per workflow build when omitted.
            judge_delegate: Optional judge; when omitted the judge is a direct model call.
            max_rounds: Maximum number of debate rounds; must be at least 1.
            affirmative_system_prompt: Override for the affirmative system prompt.
            affirmative_user_prompt_template: Override for the affirmative user template.
            negative_system_prompt: Override for the negative system prompt.
            negative_user_prompt_template: Override for the negative user template.
            judge_system_prompt: Override for the judge system prompt.
            judge_user_prompt_template: Override for the judge user template.
            default_request_id_prefix: Prefix passed to ``normalize_request_id_prefix``.
            default_dependencies: Dependencies used as defaults for each run (copied).
            tracer: Optional tracer passed to the workflow and default delegates.

        Raises:
            ValueError: If ``max_rounds`` is less than 1.
        """
        if max_rounds < 1:
            raise ValueError("max_rounds must be >= 1.")
        self._llm_client = llm_client
        self._tool_runtime = tool_runtime
        self._max_rounds = max_rounds
        self._default_request_id_prefix = normalize_request_id_prefix(default_request_id_prefix)
        self._default_dependencies = dict(default_dependencies or {})
        self._tracer = tracer
        # Most recently built workflow; assigned by ``_build_workflow``.
        self.workflow: Workflow | None = None
        self._affirmative_delegate = affirmative_delegate
        self._negative_delegate = negative_delegate
        self._judge_delegate = judge_delegate
        self._affirmative_system_prompt = resolve_prompt_override(
            override=affirmative_system_prompt,
            default_value=_DEFAULT_AFFIRMATIVE_SYSTEM_PROMPT,
            field_name="affirmative_system_prompt",
        )
        self._affirmative_user_prompt_template = resolve_prompt_override(
            override=affirmative_user_prompt_template,
            default_value=_DEFAULT_AFFIRMATIVE_USER_PROMPT_TEMPLATE,
            field_name="affirmative_user_prompt_template",
        )
        self._negative_system_prompt = resolve_prompt_override(
            override=negative_system_prompt,
            default_value=_DEFAULT_NEGATIVE_SYSTEM_PROMPT,
            field_name="negative_system_prompt",
        )
        self._negative_user_prompt_template = resolve_prompt_override(
            override=negative_user_prompt_template,
            default_value=_DEFAULT_NEGATIVE_USER_PROMPT_TEMPLATE,
            field_name="negative_user_prompt_template",
        )
        self._judge_system_prompt = resolve_prompt_override(
            override=judge_system_prompt,
            default_value=_DEFAULT_JUDGE_SYSTEM_PROMPT,
            field_name="judge_system_prompt",
        )
        self._judge_user_prompt_template = resolve_prompt_override(
            override=judge_user_prompt_template,
            default_value=_DEFAULT_JUDGE_USER_PROMPT_TEMPLATE,
            field_name="judge_user_prompt_template",
        )
        # Runtime state of the most recently built workflow; assigned by ``_build_workflow``.
        self._debate_runtime_state: dict[str, object] | None = None

    def run(
        self,
        prompt: str,
        *,
        request_id: str | None = None,
        dependencies: Mapping[str, object] | None = None,
    ) -> ExecutionResult:
        """Run the debate pattern and return one final judged result.

        Equivalent to compiling the debate with the same arguments and running the
        compiled execution immediately.
        """
        return self.compile(
            prompt=prompt,
            request_id=request_id,
            dependencies=dependencies,
        ).run()

    def compile(
        self,
        prompt: str,
        *,
        request_id: str | None = None,
        dependencies: Mapping[str, object] | None = None,
    ) -> CompiledExecution:
        """Compile one debate workflow.

        Args:
            prompt: Task prompt to debate.
            request_id: Optional request id; resolved together with the default prefix.
            dependencies: Optional dependencies; resolved together with the defaults.

        Returns:
            Compiled execution whose finalizer turns the workflow result into the final
            debate result.
        """
        run_context = resolve_pattern_run_context(
            default_request_id_prefix=self._default_request_id_prefix,
            default_dependencies=self._default_dependencies,
            request_id=request_id,
            dependencies=dependencies,
        )
        input_payload = {"prompt": prompt, "max_rounds": self._max_rounds, "mode": MODE_DEBATE}
        workflow = self._build_workflow(
            prompt,
            request_id=run_context.request_id,
            dependencies=run_context.dependencies,
        )
        # ``_build_workflow`` has just stored this run's state dict on the instance. Bind
        # it to a local so the finalizer below keeps using this run's dict even if the
        # instance attribute is replaced by a later build.
        runtime_state = self._debate_runtime_state or {"last_model_response": None}
        return build_compiled_pattern_execution(
            workflow=workflow,
            pattern_name="DebatePattern",
            request_id=run_context.request_id,
            dependencies=run_context.dependencies,
            tracer=self._tracer,
            input_payload=input_payload,
            workflow_request_id=f"{run_context.request_id}:debate_workflow",
            finalize=lambda workflow_result: _build_debate_result(
                workflow_result=workflow_result,
                runtime_state=runtime_state,
                request_id=run_context.request_id,
                dependencies=run_context.dependencies,
            ),
        )

    def _build_workflow(
        self,
        prompt: str,
        *,
        request_id: str,
        dependencies: Mapping[str, object],
    ) -> Workflow:
        """Build the debate workflow for one resolved run context.

        The workflow is a loop of up to ``max_rounds`` rounds (affirmative call, negative
        call, round-state step) followed by a judge stage. The judge stage is a direct
        model step when no judge delegate is configured and a delegate step otherwise.
        The built workflow and its runtime-state dict are also stored on the instance.
        """
        resolved_model = resolve_agent_model(llm_client=self._llm_client)
        affirmative_delegate = self._affirmative_delegate
        if affirmative_delegate is None:
            affirmative_delegate = DirectLLMCall(
                llm_client=self._llm_client,
                system_prompt=self._affirmative_system_prompt,
                tracer=self._tracer,
            )
        negative_delegate = self._negative_delegate
        if negative_delegate is None:
            negative_delegate = DirectLLMCall(
                llm_client=self._llm_client,
                system_prompt=self._negative_system_prompt,
                tracer=self._tracer,
            )
        # Shared with the callbacks, which record the latest model response in it.
        runtime_state: dict[str, object] = {"last_model_response": None}
        callbacks = _DebateWorkflowCallbacks(
            pattern=self,
            prompt=prompt,
            request_id=request_id,
            dependencies=dependencies,
            affirmative_delegate=affirmative_delegate,
            negative_delegate=negative_delegate,
            judge_delegate=self._judge_delegate,
            resolved_model=resolved_model,
            runtime_state=runtime_state,
        )
        steps: list[LoopStep | DelegateStep | ModelStep | LogicStep] = [
            LoopStep(
                step_id="debate_rounds",
                steps=(
                    DelegateBatchStep(
                        step_id="debate_affirmative_batch",
                        calls_builder=callbacks.build_affirmative_calls,
                        fail_fast=True,
                    ),
                    DelegateBatchStep(
                        step_id="debate_negative_batch",
                        dependencies=("debate_affirmative_batch",),
                        calls_builder=callbacks.build_negative_calls,
                        fail_fast=True,
                    ),
                    LogicStep(
                        step_id="debate_round",
                        dependencies=("debate_affirmative_batch", "debate_negative_batch"),
                        handler=callbacks.build_round_state,
                    ),
                ),
                max_iterations=self._max_rounds,
                initial_state={
                    "rounds": [],
                    "prior_affirmative_argument": "(none)",
                    "prior_negative_argument": "(none)",
                    "should_continue": True,
                    "failure_reason": None,
                    "failure_error": None,
                },
                continue_predicate=callbacks.continue_predicate,
                state_reducer=callbacks.state_reducer,
                execution_mode="sequential",
                failure_policy="propagate_failed_state",
            )
        ]
        if self._judge_delegate is None:
            steps.extend(
                [
                    ModelStep(
                        step_id="debate_judge_model",
                        dependencies=("debate_rounds",),
                        llm_client=self._llm_client,
                        request_builder=callbacks.build_judge_request,
                        response_parser=callbacks.parse_judge_response,
                    ),
                    LogicStep(
                        step_id="debate_judge",
                        dependencies=("debate_rounds", "debate_judge_model"),
                        handler=callbacks.build_judge_output_from_model,
                    ),
                ]
            )
        else:
            steps.extend(
                [
                    DelegateStep(
                        step_id="debate_judge_delegate",
                        dependencies=("debate_rounds",),
                        delegate=self._judge_delegate,
                        prompt_builder=callbacks.build_judge_prompt_from_context,
                    ),
                    LogicStep(
                        step_id="debate_judge",
                        dependencies=("debate_rounds", "debate_judge_delegate"),
                        handler=callbacks.build_judge_output_from_delegate,
                    ),
                ]
            )
        workflow = Workflow(
            tool_runtime=self._tool_runtime,
            tracer=self._tracer,
            input_schema={"type": "object"},
            steps=steps,
        )
        self.workflow = workflow
        self._debate_runtime_state = runtime_state
        return workflow

    def _run_debate(
        self,
        *,
        prompt: str,
        request_id: str,
        dependencies: Mapping[str, object],
    ) -> ExecutionResult:
        """Execute debate rounds then judge verdict.

        Builds a fresh workflow, runs it sequentially with the ``skip_dependents``
        failure policy, and converts the workflow result into the final debate result.
        """
        workflow = self._build_workflow(
            prompt,
            request_id=request_id,
            dependencies=dependencies,
        )
        workflow_result = workflow.run(
            input={},
            execution_mode="sequential",
            failure_policy="skip_dependents",
            request_id=f"{request_id}:debate_workflow",
            dependencies=dependencies,
        )
        return _build_debate_result(
            workflow_result=workflow_result,
            runtime_state=self._debate_runtime_state or {"last_model_response": None},
            request_id=request_id,
            dependencies=dependencies,
        )
def _build_debate_result(
    *,
    workflow_result: ExecutionResult,
    runtime_state: Mapping[str, object],
    request_id: str,
    dependencies: Mapping[str, object],
) -> ExecutionResult:
    """Build final debate output from workflow result payloads.

    Three outcomes are possible, checked in this order:

    * a debate round failed: unsuccessful result terminated with the round's failure reason;
    * the judge stage did not yield a completed verdict: unsuccessful result terminated
      with ``judge_invalid_json`` or ``judge_invalid_schema``;
    * otherwise: successful result whose final output is the verdict.

    Args:
        workflow_result: Result of the debate workflow run.
        runtime_state: Per-run state; ``"last_model_response"`` is attached to the result
            when it is an ``LLMResponse``.
        request_id: Request id recorded on the result.
        dependencies: Dependencies recorded on the result.
    """
    round_state = _extract_debate_round_state(workflow_result)
    round_failure_reason = _normalize_optional_text(round_state.get("failure_reason"))
    round_failure_error = _normalize_optional_text(round_state.get("failure_error"))
    raw_rounds = round_state.get("rounds")
    normalized_rounds = (
        [dict(round_item) for round_item in raw_rounds if isinstance(round_item, Mapping)]
        if isinstance(raw_rounds, list)
        else []
    )
    judge_step = workflow_result.step_results.get("debate_judge")
    judge_output = judge_step.output if judge_step is not None else {}
    # A missing status is treated the same as an unparseable judge answer.
    judge_status = str(judge_output.get("status", "judge_invalid_json"))
    parsed_verdict = judge_output.get("verdict")
    normalized_verdict = dict(parsed_verdict) if isinstance(parsed_verdict, Mapping) else None
    workflow_payload = workflow_result.to_dict()
    workflow_artifacts = workflow_result.output.get("artifacts", [])
    last_model_response = runtime_state.get("last_model_response")
    model_response = last_model_response if isinstance(last_model_response, LLMResponse) else None

    if round_failure_reason is not None:
        return build_pattern_execution_result(
            success=False,
            final_output={},
            terminated_reason=round_failure_reason,
            details={
                "debate_rounds": normalized_rounds,
                "verdict": None,
            },
            workflow_payload=workflow_payload,
            artifacts=workflow_artifacts,
            request_id=request_id,
            dependencies=dependencies,
            mode=MODE_DEBATE,
            metadata={"stage": "round", "rounds": len(normalized_rounds)},
            tool_results=[],
            model_response=model_response,
            error=round_failure_error or "Debate round failed.",
            requested_mode=MODE_DEBATE,
            resolved_mode=MODE_DEBATE,
        )

    if judge_status != "completed" or normalized_verdict is None:
        # Any status other than the two known failure codes is reported as invalid JSON.
        terminated_reason = (
            judge_status if judge_status in {"judge_invalid_json", "judge_invalid_schema"} else "judge_invalid_json"
        )
        raw_error = judge_output.get("error")
        error_text = (
            str(raw_error)
            if isinstance(raw_error, str) and raw_error.strip()
            else "Debate judge did not return valid JSON output."
        )
        judge_delegate_step = workflow_result.step_results.get("debate_judge_delegate")
        # When only the generic message is available, prefer the delegate step's own error.
        if (
            error_text == "Debate judge did not return valid JSON output."
            and judge_delegate_step is not None
            and isinstance(judge_delegate_step.error, str)
            and judge_delegate_step.error.strip()
        ):
            error_text = judge_delegate_step.error
        return build_pattern_execution_result(
            success=False,
            final_output={},
            terminated_reason=terminated_reason,
            details={
                "debate_rounds": normalized_rounds,
                "verdict": normalized_verdict,
            },
            workflow_payload=workflow_payload,
            artifacts=workflow_artifacts,
            request_id=request_id,
            dependencies=dependencies,
            mode=MODE_DEBATE,
            metadata={"stage": "judge", "rounds": len(normalized_rounds)},
            tool_results=[],
            model_response=model_response,
            error=error_text,
            requested_mode=MODE_DEBATE,
            resolved_mode=MODE_DEBATE,
        )

    return build_pattern_execution_result(
        success=True,
        final_output=normalized_verdict,
        terminated_reason="completed",
        details={
            "debate_rounds": normalized_rounds,
            "verdict": normalized_verdict,
        },
        workflow_payload=workflow_payload,
        artifacts=workflow_artifacts,
        request_id=request_id,
        dependencies=dependencies,
        mode=MODE_DEBATE,
        metadata={"rounds": len(normalized_rounds)},
        tool_results=[],
        model_response=model_response,
        requested_mode=MODE_DEBATE,
        resolved_mode=MODE_DEBATE,
    )


def _extract_debate_round_state(workflow_result: ExecutionResult) -> dict[str, object]:
    """Extract final loop state mapping for debate rounds.

    Returns an empty dict when the ``debate_rounds`` step result is missing or its
    ``final_state`` is not a mapping.
    """
    loop_step_result = workflow_result.step_results.get("debate_rounds")
    if loop_step_result is None:
        return {}
    loop_output = loop_step_result.output
    final_state = loop_output.get("final_state")
    return dict(final_state) if isinstance(final_state, Mapping) else {}


def _resolve_round_context(
    context: Mapping[str, object],
) -> tuple[int, list[dict[str, object]], str, str]:
    """Resolve round metadata and loop state payload.

    Returns:
        ``(round_number, rounds, prior_affirmative, prior_negative)``. The round number is
        at least 1, ``rounds`` is a fresh list of copied round mappings, and both prior
        arguments default to ``"(none)"``.
    """
    loop_meta = context.get("_loop")
    round_number = 1
    if isinstance(loop_meta, Mapping):
        round_number = max(1, _safe_int(loop_meta.get("iteration", 1)))
    loop_state = context.get("loop_state")
    state_mapping = loop_state if isinstance(loop_state, Mapping) else {}
    raw_rounds = state_mapping.get("rounds")
    rounds = (
        [dict(round_item) for round_item in raw_rounds if isinstance(round_item, Mapping)]
        if isinstance(raw_rounds, list)
        else []
    )
    prior_affirmative = str(state_mapping.get("prior_affirmative_argument", "(none)"))
    prior_negative = str(state_mapping.get("prior_negative_argument", "(none)"))
    return round_number, rounds, prior_affirmative, prior_negative


def _extract_rounds_from_context(context: Mapping[str, object]) -> list[dict[str, object]]:
    """Extract normalized round entries from loop dependency results.

    Walks ``dependency_results -> debate_rounds -> output -> final_state -> rounds`` and
    returns an empty list as soon as any level is missing or has an unexpected type.
    """
    dependency_results = context.get("dependency_results")
    if not isinstance(dependency_results, Mapping):
        return []
    rounds_step = dependency_results.get("debate_rounds")
    if not isinstance(rounds_step, Mapping):
        return []
    rounds_output = rounds_step.get("output")
    if not isinstance(rounds_output, Mapping):
        return []
    final_state = rounds_output.get("final_state")
    if not isinstance(final_state, Mapping):
        return []
    raw_rounds = final_state.get("rounds")
    if not isinstance(raw_rounds, list):
        return []
    return [dict(round_item) for round_item in raw_rounds if isinstance(round_item, Mapping)]


def _extract_dependency_output(
    context: Mapping[str, object],
    *,
    dependency_id: str,
) -> dict[str, object]:
    """Extract one dependency output mapping from workflow step context.

    Returns a copy of the dependency's ``output`` mapping, or an empty dict when the
    dependency or its output is missing or not a mapping.
    """
    dependency_results = context.get("dependency_results")
    if not isinstance(dependency_results, Mapping):
        return {}
    dependency_payload = dependency_results.get(dependency_id)
    if not isinstance(dependency_payload, Mapping):
        return {}
    dependency_output = dependency_payload.get("output")
    if isinstance(dependency_output, Mapping):
        return dict(dependency_output)
    return {}


def _extract_model_response_from_model_step_output(
    output: Mapping[str, object],
) -> LLMResponse | None:
    """Deserialize model response from ``ModelStep`` output mapping.

    Accepts either an ``LLMResponse`` instance or a mapping of its constructor
    arguments; returns ``None`` for anything else or when construction fails.
    """
    model_response = output.get("model_response")
    if isinstance(model_response, LLMResponse):
        return model_response
    if isinstance(model_response, Mapping):
        try:
            return LLMResponse(**dict(model_response))
        except TypeError:
            return None
    return None


def _extract_delegate_batch_call_result(
    *,
    context: Mapping[str, object],
    dependency_step_id: str,
    call_id: str,
) -> Mapping[str, object] | None:
    """Extract one call-result mapping from a batch-step dependency output."""
    return _runtime_extract_delegate_batch_call_result_from_context(
        context=context,
        dependency_step_id=dependency_step_id,
        call_id=call_id,
    )


def _extract_call_output(call_result: Mapping[str, object] | None) -> dict[str, object]:
    """Extract normalized output payload from one batch call result."""
    return _runtime_extract_call_output(call_result)


def _extract_model_text_from_output(output: Mapping[str, object]) -> str:
    """Extract normalized model text from delegate output payload.

    Looks, in order, at ``model_text``, a string ``final_output``, and the ``message``
    entry of a mapping ``final_output``. The first string found is returned stripped;
    an empty string is returned when none is found.
    """
    model_text = output.get("model_text")
    if isinstance(model_text, str):
        return model_text.strip()
    final_output = output.get("final_output")
    if isinstance(final_output, str):
        return final_output.strip()
    if isinstance(final_output, Mapping):
        text_value = final_output.get("message")
        if isinstance(text_value, str):
            return text_value.strip()
    return ""


def _extract_call_model_response(call_result: Mapping[str, object] | None) -> LLMResponse | None:
    """Deserialize model response from one batch call result."""
    return _runtime_extract_call_model_response(call_result)


def _is_call_success(call_result: Mapping[str, object] | None) -> bool:
    """Return whether one batch call result succeeded."""
    return _runtime_is_call_success(call_result)


def _extract_call_error(
    call_result: Mapping[str, object] | None,
    *,
    fallback_message: str,
) -> str:
    """Extract one human-readable error from a batch call result."""
    return _runtime_extract_call_error(call_result, fallback_message=fallback_message)


def _render_judge_prompt(
    *,
    prompt_template: str,
    task_prompt: str,
    rounds: list[dict[str, object]],
) -> str:
    """Render judge prompt from task prompt and normalized rounds.

    The rounds are embedded as ASCII-only JSON with sorted keys.
    """
    return render_prompt_template(
        template_text=prompt_template,
        variables={
            "task_prompt": task_prompt,
            "debate_rounds_json": json.dumps(rounds, ensure_ascii=True, sort_keys=True),
        },
        field_name="judge_user_prompt_template",
    )


def _extract_delegate_verdict(output: Mapping[str, object]) -> dict[str, object] | None:
    """Extract verdict payload from delegate output mappings or model text.

    Candidates are tried in this order: the output mapping itself, a mapping
    ``final_output``, a JSON string ``final_output``, and finally ``model_text``.
    Returns ``None`` when none of them yields a verdict.
    """
    verdict_keys = {"winner", "rationale", "synthesis"}
    if verdict_keys.issubset(output.keys()):
        return dict(output)
    final_output = output.get("final_output")
    if isinstance(final_output, Mapping) and verdict_keys.issubset(final_output.keys()):
        return dict(final_output)
    if isinstance(final_output, str):
        parsed_final = _parse_json_mapping(final_output)
        if parsed_final is not None:
            return parsed_final
    model_text = output.get("model_text")
    if isinstance(model_text, str):
        return _parse_json_mapping(model_text)
    return None


def _normalize_optional_text(value: object) -> str | None:
    """Normalize optional text value.

    Returns the stripped string, or ``None`` for non-strings and blank strings.
    """
    if not isinstance(value, str):
        return None
    normalized = value.strip()
    return normalized or None


def _safe_int(value: object) -> int:
    """Convert values to int with deterministic fallback to one.

    Booleans, ints, finite floats (truncated) and integer-like strings are converted.
    Everything else, including NaN and infinite floats, falls back to 1.
    """
    if isinstance(value, bool):
        return int(value)
    if isinstance(value, int):
        return value
    if isinstance(value, float):
        try:
            return int(value)
        except (ValueError, OverflowError):
            # int() raises ValueError for NaN and OverflowError for +/-infinity.
            return 1
    if isinstance(value, str):
        try:
            return int(value.strip())
        except ValueError:
            return 1
    return 1


__all__ = [
    "DebatePattern",
]