"""Trace session context management."""
from __future__ import annotations
import time
from collections.abc import Mapping
from contextvars import ContextVar, Token
from dataclasses import dataclass
from ._config import Tracer
from ._session import TraceSession, _SpanInfo
from ._utils import _normalize_value, _sanitize_trace_payload
[docs]
@dataclass(slots=True, kw_only=True)
class TraceScope:
"""Context manager payload for active trace scopes."""
session: TraceSession
"""Stored ``session`` value."""
span_id: str
"""Stored ``span_id`` value."""
is_root: bool
"""Stored ``is_root`` value."""
trace_token: Token[TraceSession | None] | None = None
"""Stored ``trace_token`` value."""
span_token: Token[str | None] | None = None
"""Stored ``span_token`` value."""
agent_name: str | None = None
"""Stored ``agent_name`` value."""
# Trace session bound to the current execution context; ``None`` while tracing is inactive.
_CURRENT_TRACE: ContextVar[TraceSession | None] = ContextVar("dra_trace_session", default=None)
# Id of the span currently active in this context; ``None`` when no span is active.
_CURRENT_SPAN: ContextVar[str | None] = ContextVar("dra_trace_span", default=None)
[docs]
def current_trace_session() -> TraceSession | None:
    """Look up the trace session bound to the current context.

    Returns:
        The active trace session, or ``None`` if tracing is not active.
    """
    active_session = _CURRENT_TRACE.get()
    return active_session
[docs]
def current_span_id() -> str | None:
    """Look up the span id bound to the current context.

    Returns:
        The active span id, or ``None`` if no span has been set.
    """
    active_span = _CURRENT_SPAN.get()
    return active_span
[docs]
def start_trace_run(
    *,
    agent_name: str,
    request_id: str,
    input_payload: Mapping[str, object],
    dependencies: Mapping[str, object],
    tracer: Tracer | None = None,
) -> TraceScope | None:
    """Start a trace run scope or nested agent span.

    When no session is active in the current context, a new session is created
    from ``tracer``, bound to the context, and a ``RunStarted`` event is
    emitted. When a session is already active, an ``AgentRunStarted`` span is
    opened under the current span and the existing session is reused.

    Args:
        agent_name: Delegate name for trace metadata.
        request_id: Request identifier for the run.
        input_payload: Normalized input payload mapping.
        dependencies: Dependency payload mapping; only its sorted keys are
            recorded.
        tracer: Explicit tracer dependency. When ``None`` tracing is disabled.

    Returns:
        Trace scope when tracing is enabled, otherwise ``None``.

    Raises:
        BaseException: Whatever emitting the root ``RunStarted`` event raises
            is re-raised after the context bindings made here are undone and
            the new session is closed.
    """
    if tracer is None or not tracer.enabled:
        return None

    active_session = _CURRENT_TRACE.get()
    if active_session is not None:
        # Nested agent run: attach under current span without replacing the active session.
        parent_span_id = _CURRENT_SPAN.get() or active_session.root_span_id
        span_id = active_session.start_span(
            "AgentRunStarted",
            parent_span_id=parent_span_id,
            attributes={
                "agent": agent_name,
                "request_id": request_id,
                "input": _sanitize_trace_payload(dict(input_payload)),
                "dependency_keys": sorted(dependencies),
            },
        )
        span_token = _CURRENT_SPAN.set(span_id)
        return TraceScope(
            session=active_session,
            span_id=span_id,
            is_root=False,
            span_token=span_token,
            agent_name=agent_name,
        )

    # Root run: create a fresh session and bind both trace/span contextvars.
    trace_path = tracer.build_trace_path(run_id=request_id)
    sinks = tracer.build_sinks(trace_path=trace_path)
    session = TraceSession(run_id=request_id, sinks=sinks)
    session._open_spans[session.root_span_id] = _SpanInfo(
        start_time=time.perf_counter(),
        parent_span_id=None,
    )
    trace_token = _CURRENT_TRACE.set(session)
    span_token = _CURRENT_SPAN.set(session.root_span_id)
    try:
        session.emit_event(
            "RunStarted",
            span_id=session.root_span_id,
            parent_span_id=None,
            attributes={
                "agent": agent_name,
                "request_id": request_id,
                "input": _sanitize_trace_payload(dict(input_payload)),
                "dependency_keys": sorted(dependencies),
                "trace_path": str(trace_path) if trace_path is not None else None,
            },
        )
    except BaseException:
        # The caller never receives a scope on failure, so nothing else could
        # undo these bindings; without this, later runs in the same context
        # would nest under a dead session. Restore the context first so it is
        # clean even if closing the session raises as well.
        _CURRENT_SPAN.reset(span_token)
        _CURRENT_TRACE.reset(trace_token)
        session.close()
        raise
    return TraceScope(
        session=session,
        span_id=session.root_span_id,
        is_root=True,
        trace_token=trace_token,
        span_token=span_token,
        agent_name=agent_name,
    )
[docs]
def finish_trace_run(
    scope: TraceScope | None,
    *,
    result: object | None = None,
    error: str | None = None,
) -> None:
    """Finish a trace run scope with success or failure metadata.

    Root scopes emit ``RunFinished``, close the session and restore the
    session context variable. Nested scopes emit ``AgentRunFinished`` only.
    In both cases the span context variable is restored. Cleanup runs even
    when building or emitting the closing event raises; the exception then
    propagates to the caller.

    Args:
        scope: Trace scope returned by ``start_trace_run``. ``None`` (tracing
            disabled) makes this call a no-op.
        result: Optional result payload for success metadata. Its ``success``
            attribute, when present, is recorded as a boolean.
        error: Optional error string for failure metadata.
    """
    if scope is None:
        return

    try:
        succeeded = bool(getattr(result, "success", False)) if result is not None else False
        if scope.is_root:
            # Root scope owns session lifecycle and must always close sinks/context.
            try:
                scope.session.finish_span(
                    "RunFinished",
                    span_id=scope.span_id,
                    attributes={
                        "success": succeeded,
                        "error": error,
                        "result": _sanitize_trace_payload(_normalize_value(result)),
                    },
                )
            finally:
                scope.session.close()
        else:
            # Nested scope only closes its span; parent/root session remains active.
            scope.session.finish_span(
                "AgentRunFinished",
                span_id=scope.span_id,
                attributes={
                    "success": succeeded,
                    "error": error,
                    "agent": scope.agent_name,
                },
            )
    finally:
        # Restore the context even on failure so a broken sink or payload
        # cannot leave a finished session bound for later runs.
        if scope.is_root and scope.trace_token is not None:
            _CURRENT_TRACE.reset(scope.trace_token)
        if scope.span_token is not None:
            _CURRENT_SPAN.reset(scope.span_token)