"""Trace event models plus run-scoped span bookkeeping."""
from __future__ import annotations
import dataclasses
import time
from dataclasses import dataclass, field
from datetime import datetime, timezone
from uuid import uuid4
from ._sinks import TraceSink
from ._utils import _normalize_value
# UTC tzinfo used for every event timestamp. ``datetime`` in this module is
# the class (``from datetime import datetime``), which never exposes the
# module-level ``datetime.UTC`` alias added in Python 3.11, so a getattr probe
# on it can only ever fall back to ``timezone.utc``; reference it directly.
UTC = timezone.utc  # noqa: UP017
[docs]
@dataclass(slots=True, frozen=True, kw_only=True)
class TraceEvent:
"""Normalized trace event payload."""
event_type: str
"""Normalized event type name."""
run_id: str
"""Run identifier that owns the event."""
span_id: str
"""Span identifier associated with the event."""
parent_span_id: str | None
"""Parent span identifier, when the event is nested."""
timestamp: str
"""ISO 8601 UTC timestamp for event emission."""
timestamp_ms: int
"""Unix timestamp in milliseconds."""
duration_ms: int | None = None
"""Elapsed duration for completed spans, when known."""
attributes: dict[str, object] = field(default_factory=dict)
"""Normalized event-attribute payload."""
event_index: int | None = None
"""Monotonic sequence number within the trace session."""
[docs]
def to_dict(self) -> dict[str, object]:
"""Return JSON-serializable dictionary representation.
Returns:
Normalized trace event mapping.
"""
payload = dataclasses.asdict(self)
payload["attributes"] = _normalize_value(self.attributes)
return payload
@dataclass(slots=True, frozen=True, kw_only=True)
class _SpanInfo:
"""Internal bookkeeping record for one open span."""
start_time: float
"""Performance-counter timestamp captured when the span opened."""
parent_span_id: str | None
"""Parent span id recorded when the span opened."""
[docs]
class TraceSession:
    """Run-scoped trace session tracking open spans and sinks.

    The session stamps every event with its ``run_id`` and an increasing
    ``event_index``, remembers the start time and parent of each span opened
    through ``start_span``, and forwards every event payload to all sinks.
    """

    def __init__(self, *, run_id: str, sinks: list[TraceSink]) -> None:
        """Initialize a trace session with a run id and sinks.

        Args:
            run_id: Run identifier for this trace session.
            sinks: Trace sinks that will receive emitted events. The list is
                stored as given, not copied.
        """
        self.run_id = run_id
        # Random id generated once per session; no method of this class reads it.
        self.root_span_id = uuid4().hex
        self._sinks = sinks
        # Spans that have been started but not yet finished, keyed by span id.
        self._open_spans: dict[str, _SpanInfo] = {}
        # Index assigned to the next emitted event.
        self._event_index = 0

    def start_span(
        self,
        event_type: str,
        *,
        parent_span_id: str | None,
        attributes: dict[str, object],
    ) -> str:
        """Open a new span and emit its start event.

        Args:
            event_type: Event type label for the span start.
            parent_span_id: Optional parent span id.
            attributes: Event attributes payload.

        Returns:
            Generated span id.
        """
        span_id = uuid4().hex
        # Track open-span start time for duration emission on finish_span.
        self._open_spans[span_id] = _SpanInfo(
            start_time=time.perf_counter(),
            parent_span_id=parent_span_id,
        )
        self.emit_event(
            event_type,
            span_id=span_id,
            parent_span_id=parent_span_id,
            attributes=attributes,
        )
        return span_id

    def finish_span(
        self,
        event_type: str,
        *,
        span_id: str,
        attributes: dict[str, object],
    ) -> None:
        """Finish a span and emit a completion event with duration.

        A span id that is not currently open still produces an event, but
        with ``duration_ms`` and ``parent_span_id`` both set to ``None``.

        Args:
            event_type: Event type label for the span completion.
            span_id: Span identifier to close.
            attributes: Event attributes payload.
        """
        # Popping removes the span from the open set whether or not emission
        # below succeeds.
        info = self._open_spans.pop(span_id, None)
        duration_ms = None
        parent_span_id = None
        if info is not None:
            # Duration is only known for spans started in this session.
            duration_ms = int((time.perf_counter() - info.start_time) * 1000)
            parent_span_id = info.parent_span_id
        self.emit_event(
            event_type,
            span_id=span_id,
            parent_span_id=parent_span_id,
            attributes=attributes,
            duration_ms=duration_ms,
        )

    def emit_span_event(
        self,
        event_type: str,
        *,
        span_id: str,
        attributes: dict[str, object],
    ) -> None:
        """Emit an event tied to an existing span.

        The parent span id is taken from the open-span record; it is ``None``
        when ``span_id`` is not currently open.

        Args:
            event_type: Event type label for the span event.
            span_id: Span identifier for the event.
            attributes: Event attributes payload.
        """
        parent_span_id = None
        info = self._open_spans.get(span_id)
        if info is not None:
            parent_span_id = info.parent_span_id
        self.emit_event(
            event_type,
            span_id=span_id,
            parent_span_id=parent_span_id,
            attributes=attributes,
        )

    def emit_event(
        self,
        event_type: str,
        *,
        span_id: str,
        parent_span_id: str | None,
        attributes: dict[str, object],
        duration_ms: int | None = None,
    ) -> None:
        """Emit a standalone event payload to all sinks.

        Args:
            event_type: Event type label.
            span_id: Span identifier for the event.
            parent_span_id: Optional parent span id.
            attributes: Event attributes payload.
            duration_ms: Optional event duration in milliseconds.
        """
        # Read the wall clock once so ``timestamp`` and ``timestamp_ms``
        # always describe the same instant.
        now = datetime.now(UTC)
        # Whole seconds convert to an exact float, so integer arithmetic keeps
        # the millisecond value in step with the ISO string.
        epoch_seconds = int(now.replace(microsecond=0).timestamp())
        timestamp_ms = epoch_seconds * 1000 + now.microsecond // 1000
        event = TraceEvent(
            event_type=event_type,
            run_id=self.run_id,
            span_id=span_id,
            parent_span_id=parent_span_id,
            timestamp=now.isoformat(),
            timestamp_ms=timestamp_ms,
            duration_ms=duration_ms,
            # Shallow copy so the event does not alias the caller's mapping.
            attributes=dict(attributes),
            event_index=self._event_index,
        )
        payload = event.to_dict()
        # Monotonic event index preserves emission order across sinks/readers.
        # It advances only once the payload has been built successfully.
        self._event_index += 1
        for sink in self._sinks:
            sink.emit(payload)

    def close(self) -> None:
        """Close all sinks associated with this session.

        Every sink gets a close attempt even when an earlier one fails, so a
        single failing sink cannot leave the others open. The first failure
        is re-raised after all sinks have been tried; later failures are
        discarded.

        Raises:
            Exception: The first exception raised by a sink's ``close()``.
        """
        first_error: Exception | None = None
        for sink in self._sinks:
            try:
                sink.close()
            except Exception as exc:  # noqa: BLE001 - re-raised below
                if first_error is None:
                    first_error = exc
        if first_error is not None:
            raise first_error