Source code for design_research_agents._tracing._analysis

"""Simple trace-analysis harness for JSONL trace artifacts."""

from __future__ import annotations

import argparse
import json
from collections import Counter, defaultdict
from collections.abc import Iterable, Mapping, Sequence
from pathlib import Path
from typing import Any


def analyze_trace_dir(
    *,
    trace_dir: Path,
    glob_pattern: str = "run_*.jsonl",
    top_n: int = 10,
) -> dict[str, object]:
    """Analyze trace JSONL files under one directory."""
    trace_files = sorted(trace_dir.glob(glob_pattern))
    events: list[dict[str, object]] = []
    malformed_lines = 0
    for trace_path in trace_files:
        file_events, file_malformed = _read_trace_file(trace_path)
        events.extend(file_events)
        malformed_lines += file_malformed

    event_counts = Counter(_event_type(event) for event in events)
    # Prefer newer "*Observed" events when available; fall back to legacy names for compatibility.
    has_model_request_observed = event_counts.get("ModelRequestObserved", 0) > 0
    has_model_response_observed = event_counts.get("ModelResponseObserved", 0) > 0
    has_tool_observed = event_counts.get("ToolInvocationObserved", 0) > 0
    has_tool_result_observed = event_counts.get("ToolResultObserved", 0) > 0
    has_step_result_observed = event_counts.get("WorkflowStepResultObserved", 0) > 0

    run_outcomes = _resolve_run_outcomes(events)
    model_metrics = _build_model_metrics(
        events,
        prefer_request_observed=has_model_request_observed,
        prefer_response_observed=has_model_response_observed,
        top_n=top_n,
    )
    tool_metrics = _build_tool_metrics(
        events,
        prefer_observed=has_tool_observed,
        prefer_result_observed=has_tool_result_observed,
        top_n=top_n,
    )
    workflow_metrics = _build_workflow_step_metrics(events, prefer_observed=has_step_result_observed)
    latency_metrics = _build_latency_metrics(events, top_n=top_n)
    error_metrics = _build_error_metrics(events, top_n=top_n)

    return {
        "inputs": {
            "trace_dir": str(trace_dir),
            "glob": glob_pattern,
            "top_n": top_n,
            "files_scanned": len(trace_files),
            "files": [str(path) for path in trace_files],
        },
        "runs": run_outcomes,
        "events": {
            "total": len(events),
            "counts_by_type": dict(sorted(event_counts.items())),
            "malformed_lines": malformed_lines,
        },
        "models": model_metrics,
        "tools": tool_metrics,
        "workflow_steps": workflow_metrics,
        "latency": latency_metrics,
        "errors": error_metrics,
        "warnings": {
            "malformed_lines": malformed_lines,
        },
    }
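

# Illustrative usage (a sketch; the "traces" directory and the printed keys are
# only an example, the key names themselves come from the return value above):
#
#     summary = analyze_trace_dir(trace_dir=Path("traces"), top_n=5)
#     print(summary["events"]["total"], summary["runs"]["success_count"])
#
# The returned mapping carries the top-level keys assembled above: "inputs",
# "runs", "events", "models", "tools", "workflow_steps", "latency", "errors",
# and "warnings".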


def _read_trace_file(path: Path) -> tuple[list[dict[str, object]], int]:
    """Read one JSONL trace file into event dictionaries."""
    parsed_events: list[dict[str, object]] = []
    malformed = 0
    with path.open("r", encoding="utf-8") as handle:
        for line in handle:
            stripped = line.strip()
            if not stripped:
                continue
            try:
                payload = json.loads(stripped)
            except json.JSONDecodeError:
                # Count malformed lines but continue so one bad line doesn't drop the whole file.
                malformed += 1
                continue
            if not isinstance(payload, dict):
                malformed += 1
                continue
            parsed_events.append(payload)
    return parsed_events, malformed


def _resolve_run_outcomes(events: Sequence[Mapping[str, object]]) -> dict[str, object]:
    """Resolve run-level success/failure summaries."""
    run_outcomes: dict[str, bool] = {}
    fallback_agent_outcomes: dict[str, bool] = {}
    for event in events:
        run_id = str(event.get("run_id", "") or "")
        if not run_id:
            continue
        event_type = _event_type(event)
        attributes = _event_attributes(event)
        success = _coerce_bool(attributes.get("success"))
        if success is None:
            continue
        if event_type == "RunFinished":
            run_outcomes[run_id] = success
        elif event_type == "AgentRunFinished":
            fallback_agent_outcomes[run_id] = success
    for run_id, success in fallback_agent_outcomes.items():
        # Preserve explicit RunFinished outcomes when both event families exist.
        run_outcomes.setdefault(run_id, success)
    success_count = sum(1 for value in run_outcomes.values() if value)
    failure_count = sum(1 for value in run_outcomes.values() if not value)
    return {
        "unique_run_count": len({str(event.get("run_id", "") or "") for event in events if event.get("run_id")}),
        "runs_with_terminal_status": len(run_outcomes),
        "success_count": success_count,
        "failure_count": failure_count,
    }


def _build_model_metrics(
    events: Sequence[Mapping[str, object]],
    *,
    prefer_request_observed: bool,
    prefer_response_observed: bool,
    top_n: int,
) -> dict[str, object]:
    """Build model usage metrics from trace events."""
    call_event_types = ("ModelRequestObserved",) if prefer_request_observed else ("ModelCallStarted",)
    response_event_types = ("ModelResponseObserved",) if prefer_response_observed else ("ModelCallFinished",)
    failure_event_types = ("ModelResponseObserved",) if prefer_response_observed else ("ModelCallFailed",)

    model_call_count = 0
    per_model_calls: Counter[str] = Counter()
    for event in events:
        if _event_type(event) not in call_event_types:
            continue
        model_call_count += 1
        model_name = _resolve_model_name(event)
        per_model_calls[model_name] += 1

    tokens = {"prompt_tokens": 0, "completion_tokens": 0, "total_tokens": 0}
    per_model_tokens: dict[str, Counter[str]] = defaultdict(Counter)
    model_latency_values: list[int] = []
    model_failures = 0
    for event in events:
        event_type = _event_type(event)
        if event_type in response_event_types:
            model_name = _resolve_model_name(event)
            usage = _extract_usage(event)
            for token_key in ("prompt_tokens", "completion_tokens", "total_tokens"):
                value = _coerce_int(usage.get(token_key))
                if value is None:
                    continue
                tokens[token_key] += value
                per_model_tokens[model_name][token_key] += value
            response_latency = _extract_response_latency(event)
            if response_latency is not None:
                model_latency_values.append(response_latency)
        if event_type in failure_event_types and _extract_error_text(event):
            model_failures += 1

    per_model_items = [
        {
            "model": model_name,
            "calls": per_model_calls.get(model_name, 0),
            "prompt_tokens": int(per_model_tokens.get(model_name, Counter()).get("prompt_tokens", 0)),
            "completion_tokens": int(per_model_tokens.get(model_name, Counter()).get("completion_tokens", 0)),
            "total_tokens": int(per_model_tokens.get(model_name, Counter()).get("total_tokens", 0)),
        }
        for model_name in sorted(set(per_model_calls) | set(per_model_tokens))
    ]
    per_model_items.sort(key=lambda item: _coerce_int(item.get("calls")) or 0, reverse=True)
    return {
        "model_call_count": model_call_count,
        "model_failure_count": model_failures,
        "tokens": tokens,
        "per_model": per_model_items[: max(1, top_n)],
        "latency_ms": _summarize_numbers(model_latency_values),
    }


def _build_tool_metrics(
    events: Sequence[Mapping[str, object]],
    *,
    prefer_observed: bool,
    prefer_result_observed: bool,
    top_n: int,
) -> dict[str, object]:
    """Build tool usage metrics from trace events."""
    invocation_event_types = ("ToolInvocationObserved",) if prefer_observed else ("ToolCallStarted",)
    result_event_types = ("ToolResultObserved",) if prefer_result_observed else ("ToolCallFinished", "ToolCallFailed")
    # Keep invocation/result event families configurable to support mixed-version trace directories.

    tool_invocation_count = 0
    per_tool_counts: Counter[str] = Counter()
    for event in events:
        if _event_type(event) not in invocation_event_types:
            continue
        tool_name = _resolve_tool_name(event)
        tool_invocation_count += 1
        per_tool_counts[tool_name] += 1

    tool_success = 0
    tool_failures = 0
    per_tool_failures: Counter[str] = Counter()
    for event in events:
        if _event_type(event) not in result_event_types:
            continue
        tool_name = _resolve_tool_name(event)
        ok_flag = _extract_tool_success(event)
        if ok_flag is True:
            tool_success += 1
        elif ok_flag is False:
            tool_failures += 1
            per_tool_failures[tool_name] += 1

    per_tool = [
        {
            "tool_name": tool_name,
            "invocations": per_tool_counts.get(tool_name, 0),
            "failures": per_tool_failures.get(tool_name, 0),
        }
        for tool_name in sorted(set(per_tool_counts) | set(per_tool_failures))
    ]
    per_tool.sort(key=lambda item: _coerce_int(item.get("invocations")) or 0, reverse=True)
    return {
        "tool_invocation_count": tool_invocation_count,
        "tool_success_count": tool_success,
        "tool_failure_count": tool_failures,
        "per_tool": per_tool[: max(1, top_n)],
    }


def _build_workflow_step_metrics(
    events: Sequence[Mapping[str, object]],
    *,
    prefer_observed: bool,
) -> dict[str, object]:
    """Build workflow step metrics from trace events."""
    result_event_types = ("WorkflowStepResultObserved",) if prefer_observed else ("WorkflowStepFinished",)
    step_count = 0
    counts_by_type: Counter[str] = Counter()
    counts_by_status: Counter[str] = Counter()
    for event in events:
        if _event_type(event) not in result_event_types:
            continue
        attributes = _event_attributes(event)
        step_count += 1
        counts_by_type[str(attributes.get("step_type", "unknown"))] += 1
        counts_by_status[str(attributes.get("status", "unknown"))] += 1
    return {
        "step_execution_count": step_count,
        "counts_by_step_type": dict(sorted(counts_by_type.items())),
        "counts_by_status": dict(sorted(counts_by_status.items())),
    }


def _build_latency_metrics(events: Sequence[Mapping[str, object]], *, top_n: int) -> dict[str, object]:
    """Build latency summaries from event duration and response fields."""
    tool_durations: list[int] = []
    duration_by_event_type: dict[str, list[int]] = defaultdict(list)
    for event in events:
        duration_ms = _coerce_int(event.get("duration_ms"))
        if duration_ms is None:
            continue
        event_type = _event_type(event)
        duration_by_event_type[event_type].append(duration_ms)
        if event_type in {"ToolCallFinished", "ToolCallFailed"}:
            tool_durations.append(duration_ms)
    key_event_durations = [
        {
            "event_type": event_type,
            "summary": _summarize_numbers(values),
        }
        for event_type, values in duration_by_event_type.items()
    ]
    key_event_durations.sort(
        key=lambda item: _coerce_int(_as_mapping(item.get("summary")).get("count")) or 0,
        reverse=True,
    )
    return {
        "tool_duration_ms": _summarize_numbers(tool_durations),
        "event_duration_ms": key_event_durations[: max(1, top_n)],
    }


def _build_error_metrics(events: Sequence[Mapping[str, object]], *, top_n: int) -> dict[str, object]:
    """Build error summaries from error-bearing events."""
    error_counter: Counter[str] = Counter()
    for event in events:
        error_text = _extract_error_text(event)
        if error_text:
            error_counter[error_text] += 1
    top_errors = [{"error": error, "count": count} for error, count in error_counter.most_common(max(1, top_n))]
    return {"error_event_count": sum(error_counter.values()), "top_errors": top_errors}


def _event_type(event: Mapping[str, object]) -> str:
    """Return normalized event type."""
    return str(event.get("event_type", "Unknown"))


def _event_attributes(event: Mapping[str, object]) -> Mapping[str, object]:
    """Return event attributes payload as mapping."""
    attributes = event.get("attributes")
    if not isinstance(attributes, Mapping):
        return {}
    return cast_mapping(attributes)


def _resolve_model_name(event: Mapping[str, object]) -> str:
    """Resolve one model name from event attributes."""
    attributes = _event_attributes(event)
    explicit_model = attributes.get("model")
    if isinstance(explicit_model, str) and explicit_model:
        return explicit_model
    response = attributes.get("response")
    if isinstance(response, Mapping):
        response_model = response.get("model")
        if isinstance(response_model, str) and response_model:
            return response_model
    request_payload = attributes.get("request")
    if isinstance(request_payload, Mapping):
        request_model = request_payload.get("model")
        if isinstance(request_model, str) and request_model:
            return request_model
    return "unknown"


def _resolve_tool_name(event: Mapping[str, object]) -> str:
    """Resolve one tool name from event attributes."""
    attributes = _event_attributes(event)
    tool_name = attributes.get("tool_name")
    if isinstance(tool_name, str) and tool_name:
        return tool_name
    return "unknown"


def _extract_usage(event: Mapping[str, object]) -> Mapping[str, object]:
    """Extract usage payload from event attributes."""
    attributes = _event_attributes(event)
    usage_direct = attributes.get("usage")
    if isinstance(usage_direct, Mapping):
        return cast_mapping(usage_direct)
    response_payload = attributes.get("response")
    if isinstance(response_payload, Mapping):
        response_usage = response_payload.get("usage")
        if isinstance(response_usage, Mapping):
            return cast_mapping(response_usage)
    return {}


def _extract_response_latency(event: Mapping[str, object]) -> int | None:
    """Extract latency field from response payload when present."""
    attributes = _event_attributes(event)
    response_payload = attributes.get("response")
    if isinstance(response_payload, Mapping):
        latency_ms = _coerce_int(response_payload.get("latency_ms"))
        if latency_ms is not None:
            return latency_ms
    return _coerce_int(event.get("duration_ms"))


def _extract_tool_success(event: Mapping[str, object]) -> bool | None:
    """Infer success flag from a tool result event."""
    event_type = _event_type(event)
    attributes = _event_attributes(event)
    if event_type == "ToolResultObserved":
        explicit_ok = _coerce_bool(attributes.get("ok"))
        if explicit_ok is not None:
            return explicit_ok
        # When explicit ok is absent, infer failure from any error payload presence.
        return _extract_error_text(event) is None
    if event_type == "ToolCallFinished":
        return True
    if event_type == "ToolCallFailed":
        return False
    return None


def _extract_error_text(event: Mapping[str, object]) -> str | None:
    """Extract one error message from an event payload when available."""
    attributes = _event_attributes(event)
    error_value = attributes.get("error")
    if isinstance(error_value, str) and error_value:
        return error_value
    if isinstance(error_value, Mapping):
        message = error_value.get("message")
        if isinstance(message, str) and message:
            return message
        return json.dumps(dict(error_value), ensure_ascii=True, sort_keys=True)
    if isinstance(error_value, list) and error_value:
        return json.dumps(error_value, ensure_ascii=True, sort_keys=True)
    return None


def _summarize_numbers(values: Sequence[int]) -> dict[str, object]:
    """Return count/min/p50/p95/max summary for integer values."""
    if not values:
        return {"count": 0, "min": None, "p50": None, "p95": None, "max": None}
    ordered = sorted(values)
    # Use nearest-rank percentiles to keep summaries integer-valued and deterministic.
    return {
        "count": len(ordered),
        "min": ordered[0],
        "p50": _percentile(ordered, 0.50),
        "p95": _percentile(ordered, 0.95),
        "max": ordered[-1],
    }


def _percentile(sorted_values: Sequence[int], quantile: float) -> int:
    """Return nearest-rank percentile for pre-sorted integer values."""
    if not sorted_values:
        return 0
    rank = round((len(sorted_values) - 1) * quantile)
    # Clamp for safety in case callers pass out-of-range quantiles.
    rank = max(0, min(rank, len(sorted_values) - 1))
    return int(sorted_values[rank])


def _coerce_int(value: object) -> int | None:
    """Coerce value to int when safely possible."""
    if isinstance(value, bool):
        return None
    if isinstance(value, int):
        return value
    if isinstance(value, float):
        return int(value)
    return None


def _coerce_bool(value: object) -> bool | None:
    """Coerce value to bool when safely possible."""
    if isinstance(value, bool):
        return value
    return None
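

# Illustrative trace lines the helpers above can parse. The key names mirror the
# extraction logic ("event_type", "run_id", "duration_ms", "attributes", "usage",
# "tool_name", "ok"); the concrete values and names such as "example-model" and
# "web_search" are hypothetical:
#
#     {"event_type": "ModelResponseObserved", "run_id": "run-001", "duration_ms": 840,
#      "attributes": {"model": "example-model",
#                     "usage": {"prompt_tokens": 120, "completion_tokens": 45, "total_tokens": 165}}}
#     {"event_type": "ToolResultObserved", "run_id": "run-001", "duration_ms": 35,
#      "attributes": {"tool_name": "web_search", "ok": false}}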


def cast_mapping(value: Mapping[Any, Any]) -> Mapping[str, object]:
    """Cast a generic mapping into string-keyed object mapping."""
    return {str(key): val for key, val in value.items()}


def _as_mapping(value: object) -> Mapping[str, object]:
    """Return value as mapping when possible, else empty mapping."""
    if isinstance(value, Mapping):
        return cast_mapping(value)
    return {}


def _format_human_summary(payload: Mapping[str, object]) -> str:
    """Render human-readable summary for analysis output."""
    runs = cast_mapping(_mapping_value(payload, "runs"))
    events = cast_mapping(_mapping_value(payload, "events"))
    models = cast_mapping(_mapping_value(payload, "models"))
    tools = cast_mapping(_mapping_value(payload, "tools"))
    workflow_steps = cast_mapping(_mapping_value(payload, "workflow_steps"))
    latency = cast_mapping(_mapping_value(payload, "latency"))
    errors = cast_mapping(_mapping_value(payload, "errors"))
    warnings = cast_mapping(_mapping_value(payload, "warnings"))
    inputs = cast_mapping(_mapping_value(payload, "inputs"))
    lines = [
        "Trace Analysis Summary",
        f"- trace_dir: {inputs.get('trace_dir')}",
        f"- files_scanned: {inputs.get('files_scanned')}",
        f"- events_total: {events.get('total')}",
        f"- runs_unique: {runs.get('unique_run_count')}",
        f"- runs_success: {runs.get('success_count')}",
        f"- runs_failure: {runs.get('failure_count')}",
        "",
        "Model Usage",
        f"- model_call_count: {models.get('model_call_count')}",
        f"- model_failure_count: {models.get('model_failure_count')}",
        f"- prompt_tokens: {cast_mapping(_mapping_value(models, 'tokens')).get('prompt_tokens')}",
        f"- completion_tokens: {cast_mapping(_mapping_value(models, 'tokens')).get('completion_tokens')}",
        f"- total_tokens: {cast_mapping(_mapping_value(models, 'tokens')).get('total_tokens')}",
        "",
        "Tool Usage",
        f"- tool_invocation_count: {tools.get('tool_invocation_count')}",
        f"- tool_success_count: {tools.get('tool_success_count')}",
        f"- tool_failure_count: {tools.get('tool_failure_count')}",
        "",
        "Workflow Steps",
        f"- step_execution_count: {workflow_steps.get('step_execution_count')}",
        f"- counts_by_status: {json.dumps(_mapping_value(workflow_steps, 'counts_by_status'), sort_keys=True)}",
        "",
        "Latency",
        f"- model_latency_ms: {json.dumps(_mapping_value(models, 'latency_ms'), sort_keys=True)}",
        f"- tool_duration_ms: {json.dumps(_mapping_value(latency, 'tool_duration_ms'), sort_keys=True)}",
        "",
        "Errors",
        f"- error_event_count: {errors.get('error_event_count')}",
        f"- malformed_lines: {warnings.get('malformed_lines')}",
    ]
    return "\n".join(lines)


def _mapping_value(payload: Mapping[str, object], key: str) -> Mapping[str, object]:
    """Return mapping value from payload by key with empty fallback."""
    value = payload.get(key, {})
    if isinstance(value, Mapping):
        return cast_mapping(value)
    return {}
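

# The rendered summary starts like this (labels come from the lines list above;
# the values shown are hypothetical):
#
#     Trace Analysis Summary
#     - trace_dir: ./traces
#     - files_scanned: 3
#     - events_total: 412
#     ...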


def main(argv: Iterable[str] | None = None) -> int:
    """Trace-analysis CLI."""
    parser = argparse.ArgumentParser(description="Analyze trace JSONL files and compute summary metrics.")
    parser.add_argument("--trace-dir", required=True, help="Directory containing trace JSONL files.")
    parser.add_argument("--glob", default="run_*.jsonl", help="Glob pattern within trace directory.")
    parser.add_argument("--json", action="store_true", help="Emit JSON output instead of human summary.")
    parser.add_argument("--top-n", type=int, default=10, help="Top N rows for per-model/per-tool summaries.")
    args = parser.parse_args(list(argv) if argv is not None else None)

    summary = analyze_trace_dir(
        trace_dir=Path(args.trace_dir),
        glob_pattern=str(args.glob),
        top_n=max(1, int(args.top_n)),
    )
    try:
        if args.json:
            print(json.dumps(summary, indent=2, sort_keys=True))
            return 0
        print(_format_human_summary(summary))
    except BrokenPipeError:
        return 0
    return 0


if __name__ == "__main__":
    raise SystemExit(main())
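

# Example invocation (a sketch; it assumes the package is importable under the
# module path named at the top of this file, and "./traces" is a hypothetical
# directory of trace JSONL files):
#
#     python -m design_research_agents._tracing._analysis --trace-dir ./traces --top-n 5 --json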