Source code for design_research_agents._implementations._patterns._ralph_loop_pattern

"""RALPH-style dynamic role loop pattern built on workflow primitives."""

from __future__ import annotations

import json
from collections.abc import Mapping, Sequence
from dataclasses import dataclass
from typing import Literal

from design_research_agents._contracts._delegate import Delegate, ExecutionResult
from design_research_agents._contracts._workflow import (
    DelegateTarget,
    LogicStep,
    LoopStep,
)
from design_research_agents._runtime._common._delegate_invocation import invoke_delegate
from design_research_agents._runtime._patterns import (
    MODE_RALPH_LOOP,
    build_compiled_pattern_execution,
    build_loop_callbacks,
    build_pattern_execution_result,
    resolve_pattern_run_context,
    wrap_iteration_handler,
)
from design_research_agents._tracing import Tracer
from design_research_agents.workflow import CompiledExecution, Workflow


[docs] class RalphLoopPattern(Delegate): """Dynamic-role loop with evaluator-threshold termination."""
[docs] @dataclass(slots=True, frozen=True, kw_only=True) class RoleSpec: """One ordered role participating in each iteration.""" role_id: str delegate: DelegateTarget prompt_template: str | None = None
[docs] @dataclass(slots=True, frozen=True, kw_only=True) class LoopConfig: """Typed loop configuration for Ralph loop execution.""" max_iterations: int = 3 consensus_threshold: float = 0.8 selection_strategy: Literal["best_score", "latest"] = "best_score"
def __init__( self, *, roles: Sequence[RoleSpec], evaluator_role_id: str, loop_config: LoopConfig | None = None, tracer: Tracer | None = None, ) -> None: """Store role lineup and loop settings.""" normalized_roles = self._validate_roles(roles) if evaluator_role_id not in {role.role_id for role in normalized_roles}: raise ValueError("evaluator_role_id must match one configured role_id.") resolved_config = loop_config or self.LoopConfig() if resolved_config.max_iterations < 1: raise ValueError("loop_config.max_iterations must be >= 1.") if not (0.0 <= resolved_config.consensus_threshold <= 1.0): raise ValueError("loop_config.consensus_threshold must be in [0, 1].") if resolved_config.selection_strategy not in {"best_score", "latest"}: raise ValueError("loop_config.selection_strategy must be one of: best_score, latest.") self._roles = normalized_roles self._evaluator_role_id = evaluator_role_id self._loop_config = resolved_config self._tracer = tracer self.workflow: Workflow | None = None @staticmethod def _validate_roles(roles: Sequence[RoleSpec]) -> tuple[RoleSpec, ...]: """Normalize and validate role list.""" normalized: list[RalphLoopPattern.RoleSpec] = [] seen: set[str] = set() for role in roles: role_id = role.role_id.strip() if not role_id: raise ValueError("roles must include non-empty role_id values.") if role_id in seen: raise ValueError("roles must have unique role_id values.") seen.add(role_id) normalized.append( RalphLoopPattern.RoleSpec( role_id=role_id, delegate=role.delegate, prompt_template=role.prompt_template, ) ) if not normalized: raise ValueError("roles must include at least one role.") return tuple(normalized)
[docs] def run( self, prompt: str | object, *, request_id: str | None = None, dependencies: Mapping[str, object] | None = None, ) -> ExecutionResult: """Execute the Ralph loop pattern.""" return self.compile( prompt=prompt, request_id=request_id, dependencies=dependencies, ).run()
[docs] def compile( self, prompt: str | object, *, request_id: str | None = None, dependencies: Mapping[str, object] | None = None, ) -> CompiledExecution: """Compile one Ralph loop workflow.""" run_context = resolve_pattern_run_context( prompt=prompt, default_request_id_prefix=None, default_dependencies={}, request_id=request_id, dependencies=dependencies, ) workflow = self._build_workflow( run_context.prompt, request_id=run_context.request_id, dependencies=run_context.dependencies, ) return build_compiled_pattern_execution( workflow=workflow, pattern_name="RalphLoopPattern", request_id=run_context.request_id, dependencies=run_context.dependencies, tracer=self._tracer, input_payload={ **run_context.normalized_input, "mode": MODE_RALPH_LOOP, "evaluator_role_id": self._evaluator_role_id, "max_iterations": self._loop_config.max_iterations, "consensus_threshold": self._loop_config.consensus_threshold, "selection_strategy": self._loop_config.selection_strategy, "roles": [role.role_id for role in self._roles], }, workflow_request_id=f"{run_context.request_id}:ralph_loop_workflow", finalize=lambda workflow_result: self._finalize_result( workflow_result=workflow_result, request_id=run_context.request_id, dependencies=run_context.dependencies, ), )
def _build_workflow( self, prompt: str, *, request_id: str, dependencies: Mapping[str, object], ) -> Workflow: """Build workflow graph for one Ralph loop run.""" def _run_iteration(context: Mapping[str, object]) -> Mapping[str, object]: def _build_role_failure_state( *, failure_role: str, failure_error: str, current_role_results: Mapping[str, Mapping[str, object]], ) -> Mapping[str, object]: loop_state = context.get("loop_state") current_state = dict(loop_state) if isinstance(loop_state, Mapping) else {} history = ( list(current_state.get("iteration_history", [])) if isinstance( current_state.get("iteration_history"), list, ) else [] ) history.append( { "iteration": self._resolve_iteration(context), "roles": _json_ready(current_role_results), "consensus_score": 0.0, "failed_role": failure_role, "error": failure_error, } ) return { **current_state, "iteration_history": history, "role_outputs": _json_ready(current_role_results), "failure_role": failure_role, "failure_error": failure_error, "should_continue": False, "terminated_reason": "role_failure", } current_state = _as_dict(context.get("loop_state")) previous_selected_output = _compact_role_output(_as_dict(current_state.get("selected_output"))) previous_role_outputs = _compact_role_results(_as_dict(current_state.get("role_outputs"))) role_results: dict[str, dict[str, object]] = {} selected_output_for_prompt = dict(previous_selected_output) iteration = self._resolve_iteration(context) for role in self._roles: prompt_role_outputs = { **previous_role_outputs, **role_results, } try: role_prompt = self._render_role_prompt( prompt=prompt, role=role, iteration=iteration, selected_output=selected_output_for_prompt, prior_role_outputs=prompt_role_outputs, ) delegate_invocation = invoke_delegate( delegate=role.delegate, prompt=role_prompt, step_context=context, request_id=f"{request_id}:ralph_loop:loop:{iteration}:role:{role.role_id}", execution_mode="sequential", failure_policy="skip_dependents", dependencies=dependencies, ) delegate_result = delegate_invocation.result compact_output = _compact_role_output(delegate_result.output) role_error = None if not delegate_result.success: role_error = _extract_execution_error( result=delegate_result, fallback="Role call failed.", ) role_results[role.role_id] = { "success": delegate_result.success, "error": role_error, "output": compact_output, } if delegate_result.success and role.role_id != self._evaluator_role_id and compact_output: selected_output_for_prompt = dict(compact_output) except Exception as exc: role_results[role.role_id] = { "success": False, "error": str(exc), "output": {}, } failure_role, failure_error = self._find_role_failure(role_results) if failure_role is not None: return _build_role_failure_state( failure_role=failure_role, failure_error=failure_error or "Role call failed.", current_role_results=role_results, ) consensus_score, evaluator_error = self._extract_evaluator_score(role_results) if evaluator_error is not None: normalized_role_results = { role_id: dict(result) if isinstance(result, Mapping) else {} for role_id, result in role_results.items() } evaluator_result = dict(normalized_role_results.get(self._evaluator_role_id, {})) evaluator_result["success"] = False evaluator_result["error"] = evaluator_error evaluator_result["output"] = _as_dict(evaluator_result.get("output")) normalized_role_results[self._evaluator_role_id] = evaluator_result return _build_role_failure_state( failure_role=self._evaluator_role_id, failure_error=evaluator_error, current_role_results=normalized_role_results, ) synthesized_output = _resolve_synthesized_output( role_results, ordered_roles=self._roles, evaluator_role_id=self._evaluator_role_id, ) loop_state = context.get("loop_state") current_state = _as_dict(loop_state) history = _as_list(current_state.get("iteration_history")) best_score = _safe_float(current_state.get("best_score")) best_output = _as_dict(current_state.get("best_output")) best_iteration = _safe_int(current_state.get("best_iteration")) current_iteration = self._resolve_iteration(context) if consensus_score >= best_score: best_score = consensus_score best_output = dict(synthesized_output) best_iteration = current_iteration selected_output = ( dict(synthesized_output) if self._loop_config.selection_strategy == "latest" else dict(best_output) ) should_continue = consensus_score < self._loop_config.consensus_threshold terminated_reason = "looping" if should_continue else "consensus_reached" history.append( { "iteration": current_iteration, "consensus_score": consensus_score, "selected_output": _json_ready(selected_output), "roles": _json_ready(role_results), } ) return { "role_outputs": _json_ready(role_results), "consensus_score": consensus_score, "best_score": best_score, "best_output": _json_ready(best_output), "best_iteration": best_iteration, "selected_output": _json_ready(selected_output), "iteration_history": history, "failure_role": None, "failure_error": None, "should_continue": should_continue, "terminated_reason": terminated_reason, } wrapped_handler = wrap_iteration_handler( _run_iteration, error_prefix="RalphLoopPattern iteration", ) loop_callbacks = build_loop_callbacks( iteration_step_id="ralph_iteration", iteration_handler=wrapped_handler, ) workflow = Workflow( tool_runtime=None, tracer=self._tracer, input_schema={"type": "object"}, steps=[ LoopStep( step_id="ralph_loop", steps=( LogicStep( step_id="ralph_iteration", handler=loop_callbacks.iteration_handler, ), ), max_iterations=self._loop_config.max_iterations, initial_state={ "role_outputs": {}, "consensus_score": 0.0, "best_score": 0.0, "best_output": {}, "best_iteration": 0, "selected_output": {}, "iteration_history": [], "failure_role": None, "failure_error": None, "should_continue": True, "terminated_reason": "max_iterations_reached", }, continue_predicate=loop_callbacks.continue_predicate, state_reducer=loop_callbacks.state_reducer, execution_mode="sequential", failure_policy="propagate_failed_state", ) ], ) self.workflow = workflow return workflow def _render_role_prompt( self, *, prompt: str, role: RoleSpec, iteration: int, selected_output: Mapping[str, object], prior_role_outputs: Mapping[str, object], ) -> str: """Render one role prompt from compact loop state.""" prompt_payload = { "task": prompt, "iteration": iteration, "role_id": role.role_id, "selected_output": _json_ready(selected_output), "prior_role_outputs": _json_ready(prior_role_outputs), } default_prompt = json.dumps(prompt_payload, ensure_ascii=True, sort_keys=True) if not isinstance(role.prompt_template, str) or not role.prompt_template.strip(): return default_prompt try: return role.prompt_template.format( task=prompt, iteration=iteration, role_id=role.role_id, selected_output_json=json.dumps( _json_ready(selected_output), ensure_ascii=True, sort_keys=True, ), prior_role_outputs_json=json.dumps( _json_ready(prior_role_outputs), ensure_ascii=True, sort_keys=True, ), ) except KeyError as exc: missing = str(exc.args[0]) if exc.args else "unknown" raise ValueError( f"Role prompt template for '{role.role_id}' references unknown variable '{missing}'." ) from exc def _resolve_iteration(self, context: Mapping[str, object]) -> int: """Resolve one-based loop iteration from context metadata.""" loop_meta = context.get("_loop") if isinstance(loop_meta, Mapping): return max(1, _safe_int(loop_meta.get("iteration", 1))) return 1 def _find_role_failure( self, role_results: Mapping[str, Mapping[str, object]], ) -> tuple[str | None, str | None]: """Return first failing role id and message, if any.""" for role in self._roles: result = role_results.get(role.role_id) if not isinstance(result, Mapping): return role.role_id, "Role result missing." if bool(result.get("success", False)): continue error = result.get("error") return role.role_id, str(error) if error is not None else "Role call failed." return None, None def _extract_evaluator_score(self, role_results: Mapping[str, Mapping[str, object]]) -> tuple[float, str | None]: """Extract normalized score from configured evaluator role output.""" evaluator_result = role_results.get(self._evaluator_role_id) if not isinstance(evaluator_result, Mapping): return 0.0, "Evaluator role result is missing." output = evaluator_result.get("output") if not isinstance(output, Mapping): return 0.0, "Evaluator role output is missing." score = _extract_score(output) if score is None: return 0.0, "Evaluator output must include numeric score in `score` or JSON `model_text.score`." return score, None def _finalize_result( self, *, workflow_result: ExecutionResult, request_id: str, dependencies: Mapping[str, object], ) -> ExecutionResult: """Build final execution result for one Ralph loop run.""" loop_step_result = workflow_result.step_results.get("ralph_loop") if loop_step_result is None: raise RuntimeError("Ralph loop step result is missing.") loop_output = loop_step_result.output final_state_raw = loop_output.get("final_state") final_state = dict(final_state_raw) if isinstance(final_state_raw, Mapping) else {} failure_role = final_state.get("failure_role") failure_error = final_state.get("failure_error") terminated_reason = str( final_state.get( "terminated_reason", loop_output.get("terminated_reason", "max_iterations_reached"), ) ) if terminated_reason == "looping": terminated_reason = "max_iterations_reached" selected_output = _as_dict(final_state.get("selected_output")) best_output = _as_dict(final_state.get("best_output")) best_score = _safe_float(final_state.get("best_score")) consensus_score = _safe_float(final_state.get("consensus_score")) history = _as_list(final_state.get("iteration_history")) success = loop_step_result.success and terminated_reason != "role_failure" error = None if terminated_reason == "role_failure": success = False role_text = str(failure_role) if isinstance(failure_role, str) and failure_role else "unknown" error_text = str(failure_error) if isinstance(failure_error, str) and failure_error else "Role failure." error = f"{role_text}: {error_text}" return build_pattern_execution_result( success=success, final_output=dict(selected_output), terminated_reason=terminated_reason, details={ "role_outputs": _json_ready(final_state.get("role_outputs", {})), "iteration_history": _json_ready(history), "consensus_score": consensus_score, "best_score": best_score, "best_output": _json_ready(best_output), "best_iteration": _safe_int(final_state.get("best_iteration")), "selection_strategy": self._loop_config.selection_strategy, "consensus_threshold": self._loop_config.consensus_threshold, "evaluator_role_id": self._evaluator_role_id, "roles": [role.role_id for role in self._roles], }, workflow_payload=workflow_result.to_dict(), artifacts=workflow_result.output.get("artifacts", []), request_id=request_id, dependencies=dependencies, mode=MODE_RALPH_LOOP, metadata={ "max_iterations": self._loop_config.max_iterations, "consensus_threshold": self._loop_config.consensus_threshold, "selection_strategy": self._loop_config.selection_strategy, "evaluator_role_id": self._evaluator_role_id, }, error=error, )
def _resolve_synthesized_output( role_results: Mapping[str, Mapping[str, object]], *, ordered_roles: Sequence[RalphLoopPattern.RoleSpec], evaluator_role_id: str, ) -> dict[str, object]: """Resolve synthesized output from ordered role outputs.""" for role in reversed(ordered_roles): if role.role_id == evaluator_role_id: continue result = role_results.get(role.role_id) if not isinstance(result, Mapping): continue output = result.get("output") if isinstance(output, Mapping) and output: return dict(output) for role in reversed(ordered_roles): result = role_results.get(role.role_id) if not isinstance(result, Mapping): continue output = result.get("output") if isinstance(output, Mapping) and output: return dict(output) return {} def _compact_role_results(results: Mapping[str, object]) -> dict[str, dict[str, object]]: """Compact stored role results into prompt-safe shapes.""" compact: dict[str, dict[str, object]] = {} for role_id, raw_result in results.items(): if not isinstance(raw_result, Mapping): continue compact[str(role_id)] = { "success": bool(raw_result.get("success", False)), "error": raw_result.get("error"), "output": _compact_role_output(_as_dict(raw_result.get("output"))), } return compact def _compact_role_output(output: Mapping[str, object]) -> dict[str, object]: """Return a prompt-safe role output without workflow envelope bloat.""" if not output: return {} final_output = output.get("final_output") compact_from_final = _mapping_from_compact_value(final_output) if compact_from_final: return compact_from_final model_text = output.get("model_text") compact_from_text = _mapping_from_compact_value(model_text) if compact_from_text: return compact_from_text compact: dict[str, object] = {} for key, value in output.items(): if key in {"workflow", "artifacts"}: continue if isinstance(value, Mapping) and value: compact[str(key)] = _json_ready(value) continue if isinstance(value, (list, tuple)) and value: compact[str(key)] = _json_ready(value) continue if isinstance(value, str) and value.strip(): compact[str(key)] = value.strip() continue if isinstance(value, (int, float, bool)): compact[str(key)] = value return compact def _mapping_from_compact_value(value: object) -> dict[str, object]: """Normalize compact role values from mappings, JSON text, or scalars.""" if isinstance(value, Mapping): return {str(key): _json_ready(item) for key, item in value.items()} if isinstance(value, str): stripped = value.strip() if not stripped: return {} try: parsed = json.loads(stripped) except json.JSONDecodeError: return {"text": stripped} if isinstance(parsed, Mapping): return {str(key): _json_ready(item) for key, item in parsed.items()} return {"text": stripped} if isinstance(value, (int, float, bool)): return {"value": value} return {} def _extract_execution_error(*, result: ExecutionResult, fallback: str) -> str: """Extract a deterministic error message from one execution result.""" error = result.error if isinstance(error, str) and error.strip(): return error.strip() if isinstance(result.output, Mapping): output_error = result.output.get("error") if isinstance(output_error, str) and output_error.strip(): return output_error.strip() return fallback def _extract_score(output: Mapping[str, object]) -> float | None: """Extract normalized score from role output mapping.""" score = output.get("score") if isinstance(score, (int, float)): return max(0.0, min(1.0, float(score))) model_text = output.get("model_text") if isinstance(model_text, str): try: parsed = json.loads(model_text) except json.JSONDecodeError: return None if isinstance(parsed, Mapping): parsed_score = parsed.get("score") if isinstance(parsed_score, (int, float)): return max(0.0, min(1.0, float(parsed_score))) return None def _safe_float(value: object) -> float: """Return float value with deterministic fallback.""" if isinstance(value, bool): return float(int(value)) if isinstance(value, (int, float)): return float(value) if isinstance(value, str): try: return float(value.strip()) except ValueError: return 0.0 return 0.0 def _safe_int(value: object) -> int: """Return int value with deterministic fallback.""" if isinstance(value, bool): return int(value) if isinstance(value, int): return value if isinstance(value, float): return int(value) if isinstance(value, str): try: return int(value.strip()) except ValueError: return 0 return 0 def _as_dict(value: object) -> dict[str, object]: """Return mapping-like values as mutable dictionaries.""" if isinstance(value, Mapping): return dict(value) return {} def _as_list(value: object) -> list[object]: """Return list-like values as mutable lists.""" if isinstance(value, list): return list(value) return [] def _json_ready(value: object) -> object: """Recursively normalize values into JSON-safe shapes.""" if isinstance(value, Mapping): return {str(key): _json_ready(item) for key, item in value.items()} if isinstance(value, list): return [_json_ready(item) for item in value] if isinstance(value, tuple): return [_json_ready(item) for item in value] if isinstance(value, (str, int, float, bool)) or value is None: return value return str(value) __all__ = ["RalphLoopPattern"]