"""Trace sinks for JSONL and console output."""
from __future__ import annotations
import json
import sys
from collections.abc import Mapping
from pathlib import Path
from typing import Protocol, TextIO
from ._utils import _preview
[docs]
class TraceSink(Protocol):
"""Protocol for trace sinks that consume normalized event payloads."""
[docs]
def emit(self, event: dict[str, object]) -> None:
"""Handle one trace event payload.
Args:
event: Normalized trace event payload mapping.
"""
[docs]
def close(self) -> None:
"""Finalize and release sink resources."""
[docs]
class JSONLTraceSink:
"""Append-only JSONL sink for trace events."""
def __init__(self, path: Path) -> None:
"""Initialize a JSONL sink that appends to the given path.
Args:
path: Output path for the JSONL trace file.
"""
self._path = path
self._path.parent.mkdir(parents=True, exist_ok=True)
self._file = self._path.open("a", encoding="utf-8")
[docs]
def emit(self, event: dict[str, object]) -> None:
"""Write one event to the JSONL file.
Args:
event: Normalized trace event payload mapping.
"""
self._file.write(json.dumps(event, ensure_ascii=True))
self._file.write("\n")
self._file.flush()
[docs]
def close(self) -> None:
"""Close the underlying file handle."""
self._file.close()
[docs]
class ConsoleTraceSink:
"""Pretty streaming console sink for trace events."""
def __init__(self, stream: TextIO | None = None) -> None:
"""Initialize a console sink that writes to the given stream.
Args:
stream: Optional stream to write console output to.
"""
self._stream = stream if stream is not None else sys.stderr
self._streaming_line_open = False
[docs]
def emit(self, event: dict[str, object]) -> None:
"""Render one event to the console stream.
Args:
event: Normalized trace event payload mapping.
"""
event_type = str(event.get("event_type", ""))
attrs = self._normalize_attributes(event.get("attributes", {}))
if self._emit_token_delta(event_type=event_type, attrs=attrs):
return
# Close streaming token line before printing non-token events for readable console output.
self._maybe_close_streaming_line(event_type)
rendered = self._render_event_line(event_type=event_type, attrs=attrs, event=event)
if rendered is not None:
self._write_line(rendered)
def _normalize_attributes(self, attrs: object) -> Mapping[str, object]:
"""Normalize arbitrary event attributes into a mapping.
Args:
attrs: Raw attribute payload from the trace event.
Returns:
Mapping view of the attributes, or an empty mapping.
"""
if isinstance(attrs, Mapping):
return attrs
return {}
def _emit_token_delta(self, *, event_type: str, attrs: Mapping[str, object]) -> bool:
"""Render inline token deltas for streaming model output.
Args:
event_type: Trace event type currently being processed.
attrs: Normalized event attributes.
Returns:
``True`` when the event was handled inline and needs no extra line output.
"""
if event_type != "ModelCallToken":
return False
delta_text = attrs.get("delta_text")
if isinstance(delta_text, str) and delta_text:
self._stream.write(delta_text)
self._stream.flush()
self._streaming_line_open = True
# Token events are rendered inline and should not emit an additional event line.
return True
def _maybe_close_streaming_line(self, event_type: str) -> None:
"""Close an inline token stream before printing the next model event.
Args:
event_type: Trace event type currently being processed.
"""
if self._streaming_line_open and event_type.startswith("ModelCall"):
self._stream.write("\n")
self._streaming_line_open = False
def _render_event_line(
self,
*,
event_type: str,
attrs: Mapping[str, object],
event: Mapping[str, object],
) -> str | None:
"""Render one human-readable console line for a known event type.
Args:
event_type: Trace event type currently being processed.
attrs: Normalized event attributes.
event: Full normalized event payload.
Returns:
Rendered console line, or ``None`` when the event is intentionally hidden.
"""
# Keep renderer table explicit so event-label changes are easy to diff/review.
event_lines: dict[str, str] = {
"RunStarted": (
f"[run] start run_id={event.get('run_id')} "
f"agent={_preview(attrs.get('agent'))} "
f"trace_path={_preview(attrs.get('trace_path'))}"
),
"RunFinished": (
f"[run] finished run_id={event.get('run_id')} "
f"success={_preview(attrs.get('success'))} "
f"duration_ms={_preview(event.get('duration_ms'))}"
),
"AgentRunStarted": (
f"[agent] start agent={_preview(attrs.get('agent'))} request_id={_preview(attrs.get('request_id'))}"
),
"AgentRunFinished": (
f"[agent] finished agent={_preview(attrs.get('agent'))} "
f"success={_preview(attrs.get('success'))} "
f"duration_ms={_preview(event.get('duration_ms'))}"
),
"ModelCallStarted": (
f"[model] start model={_preview(attrs.get('model'))} messages={_preview(attrs.get('message_count'))}"
),
"ModelCallFinished": (
f"[model] finished model={_preview(attrs.get('model'))} "
f"duration_ms={_preview(event.get('duration_ms'))}"
),
"ModelCallFailed": (
f"[model] failed model={_preview(attrs.get('model'))} error={_preview(attrs.get('error'))}"
),
"ToolCallStarted": (
f"[tool] start tool={_preview(attrs.get('tool_name'))} input={_preview(attrs.get('tool_input'))}"
),
"ToolCallFinished": (
f"[tool] finished tool={_preview(attrs.get('tool_name'))} "
f"duration_ms={_preview(event.get('duration_ms'))}"
),
"ToolCallFailed": (
f"[tool] failed tool={_preview(attrs.get('tool_name'))} error={_preview(attrs.get('error'))}"
),
"RouterDecision": (
f"[router] decision tool={_preview(attrs.get('selected_tool_name'))} "
f"source={_preview(attrs.get('source'))}"
),
"ModelSelectionDecision": (
f"[model-select] decision model={_preview(attrs.get('model_id'))} "
f"provider={_preview(attrs.get('provider'))}"
),
"ToolSelectionDecision": (
f"[tool-select] decision tool={_preview(attrs.get('tool_name'))} source={_preview(attrs.get('source'))}"
),
"ContinuationDecision": (
f"[continuation] step={_preview(attrs.get('step'))} "
f"decision={_preview(attrs.get('continue'))} "
f"source={_preview(attrs.get('source'))}"
),
"GuardrailDecision": (
f"[guardrail] guardrail={_preview(attrs.get('guardrail'))} "
f"decision={_preview(attrs.get('decision'))} "
f"reason={_preview(attrs.get('reason'))}"
),
}
return event_lines.get(event_type)
[docs]
def close(self) -> None:
"""Flush and close any open streaming output."""
if self._streaming_line_open:
self._stream.write("\n")
self._stream.flush()
self._streaming_line_open = False
def _write_line(self, text: str) -> None:
"""Write one newline-terminated line to the configured stream.
Args:
text: Rendered line text to write.
"""
self._stream.write(f"{text}\n")
self._stream.flush()