Source code for design_research_agents._tracing._session

"""Trace event models plus run-scoped span bookkeeping."""

from __future__ import annotations

import dataclasses
import time
from dataclasses import dataclass, field
from datetime import datetime, timezone
from uuid import uuid4

from ._sinks import TraceSink
from ._utils import _normalize_value

# Timezone object used to stamp emitted events in UTC. ``datetime`` here is the class
# imported above, so the lookup falls back to ``timezone.utc`` whenever that class
# exposes no ``UTC`` attribute.
# NOTE(review): the 3.11+ ``UTC`` alias is documented on the ``datetime`` *module*, not
# on the class -- presumably the ``timezone.utc`` fallback is what is always used; confirm.
UTC = getattr(datetime, "UTC", timezone.utc)  # noqa: UP017


[docs] @dataclass(slots=True, frozen=True, kw_only=True) class TraceEvent: """Normalized trace event payload.""" event_type: str """Normalized event type name.""" run_id: str """Run identifier that owns the event.""" span_id: str """Span identifier associated with the event.""" parent_span_id: str | None """Parent span identifier, when the event is nested.""" timestamp: str """ISO 8601 UTC timestamp for event emission.""" timestamp_ms: int """Unix timestamp in milliseconds.""" duration_ms: int | None = None """Elapsed duration for completed spans, when known.""" attributes: dict[str, object] = field(default_factory=dict) """Normalized event-attribute payload.""" event_index: int | None = None """Monotonic sequence number within the trace session."""
[docs] def to_dict(self) -> dict[str, object]: """Return JSON-serializable dictionary representation. Returns: Normalized trace event mapping. """ payload = dataclasses.asdict(self) payload["attributes"] = _normalize_value(self.attributes) return payload
@dataclass(slots=True, frozen=True, kw_only=True) class _SpanInfo: """Internal bookkeeping record for one open span.""" start_time: float """Performance-counter timestamp captured when the span opened.""" parent_span_id: str | None """Parent span id recorded when the span opened."""
class TraceSession:
    """Run-scoped trace session tracking open spans and sinks.

    The session stamps every event with its ``run_id`` and a per-session
    sequence number, remembers the start time and parent of each span it
    opened, and forwards the dictionary form of every event to each sink.
    """

    def __init__(self, *, run_id: str, sinks: list[TraceSink]) -> None:
        """Initialize a trace session with a run id and sinks.

        Args:
            run_id: Run identifier for this trace session.
            sinks: Trace sinks that will receive emitted events. The list is
                stored as given, not copied.
        """
        self.run_id = run_id
        # Generated up front; it is not registered in ``_open_spans``.
        self.root_span_id = uuid4().hex
        self._sinks = sinks
        self._open_spans: dict[str, _SpanInfo] = {}
        self._event_index = 0

    def start_span(
        self,
        event_type: str,
        *,
        parent_span_id: str | None,
        attributes: dict[str, object],
    ) -> str:
        """Open a new span and emit its start event.

        Args:
            event_type: Event type label for the span start.
            parent_span_id: Optional parent span id.
            attributes: Event attributes payload.

        Returns:
            Generated span id.
        """
        span_id = uuid4().hex
        # Track open-span start time for duration emission on finish_span.
        self._open_spans[span_id] = _SpanInfo(
            start_time=time.perf_counter(),
            parent_span_id=parent_span_id,
        )
        self.emit_event(
            event_type,
            span_id=span_id,
            parent_span_id=parent_span_id,
            attributes=attributes,
        )
        return span_id

    def finish_span(
        self,
        event_type: str,
        *,
        span_id: str,
        attributes: dict[str, object],
    ) -> None:
        """Finish a span and emit a completion event with duration.

        The span is removed from the open-span table. When ``span_id`` is not
        an open span of this session, the event is still emitted, with both
        ``duration_ms`` and ``parent_span_id`` set to ``None``.

        Args:
            event_type: Event type label for the span completion.
            span_id: Span identifier to close.
            attributes: Event attributes payload.
        """
        info = self._open_spans.pop(span_id, None)
        duration_ms = None
        parent_span_id = None
        if info is not None:
            # Duration is only known for spans started in this session.
            duration_ms = int((time.perf_counter() - info.start_time) * 1000)
            parent_span_id = info.parent_span_id
        self.emit_event(
            event_type,
            span_id=span_id,
            parent_span_id=parent_span_id,
            attributes=attributes,
            duration_ms=duration_ms,
        )

    def emit_span_event(
        self,
        event_type: str,
        *,
        span_id: str,
        attributes: dict[str, object],
    ) -> None:
        """Emit an event tied to an existing span.

        The parent span id is taken from the open-span table; it is ``None``
        when ``span_id`` is not currently open in this session.

        Args:
            event_type: Event type label for the span event.
            span_id: Span identifier for the event.
            attributes: Event attributes payload.
        """
        info = self._open_spans.get(span_id)
        parent_span_id = info.parent_span_id if info is not None else None
        self.emit_event(
            event_type,
            span_id=span_id,
            parent_span_id=parent_span_id,
            attributes=attributes,
        )

    def emit_event(
        self,
        event_type: str,
        *,
        span_id: str,
        parent_span_id: str | None,
        attributes: dict[str, object],
        duration_ms: int | None = None,
    ) -> None:
        """Emit a standalone event payload to all sinks.

        Args:
            event_type: Event type label.
            span_id: Span identifier for the event.
            parent_span_id: Optional parent span id.
            attributes: Event attributes payload.
            duration_ms: Optional event duration in milliseconds.
        """
        timestamp = datetime.now(UTC).isoformat()
        event = TraceEvent(
            event_type=event_type,
            run_id=self.run_id,
            span_id=span_id,
            parent_span_id=parent_span_id,
            timestamp=timestamp,
            timestamp_ms=int(time.time() * 1000),
            duration_ms=duration_ms,
            # Shallow copy so the event holds its own top-level mapping.
            attributes=dict(attributes),
            event_index=self._event_index,
        )
        payload = event.to_dict()
        # Monotonic event index preserves emission order across sinks/readers.
        self._event_index += 1
        for sink in self._sinks:
            sink.emit(payload)

    def close(self) -> None:
        """Close all sinks associated with this session.

        Every sink is asked to close even when an earlier one fails, so a
        single faulty sink cannot leave the others open.

        Raises:
            Exception: The first error raised by a sink's ``close()``,
                re-raised after all sinks have been attempted.
        """
        first_error: Exception | None = None
        for sink in self._sinks:
            try:
                sink.close()
            except Exception as exc:  # noqa: BLE001 - re-raised once all sinks are closed
                if first_error is None:
                    first_error = exc
        if first_error is not None:
            raise first_error