"""Memory and graph-memory contracts for pluggable persistence backends."""
from __future__ import annotations
from collections.abc import Sequence
from dataclasses import asdict, dataclass, field
from types import TracebackType
from typing import Any, Protocol, Self
[docs]
@dataclass(slots=True, frozen=True, kw_only=True)
class MemoryWriteRecord:
"""One record payload to be persisted into a memory store."""
content: str
"""Primary text content to persist."""
metadata: dict[str, object] = field(default_factory=dict)
"""Optional metadata fields used for downstream filtering/ranking."""
item_id: str | None = None
"""Optional caller-supplied id for deterministic upserts."""
[docs]
def to_dict(self) -> dict[str, Any]:
"""Return a JSON-serializable dictionary representation.
Returns:
Serialized dataclass payload.
"""
return asdict(self)
[docs]
@dataclass(slots=True, frozen=True, kw_only=True)
class MemorySearchQuery:
"""Structured memory search query."""
text: str
"""Natural language query text."""
namespace: str = "default"
"""Namespace partition used for isolation."""
top_k: int = 5
"""Maximum number of matches to return."""
min_score: float | None = None
"""Optional minimum score threshold for returned matches."""
metadata_filters: dict[str, object] = field(default_factory=dict)
"""Exact-match metadata filters applied before ranking."""
[docs]
def to_dict(self) -> dict[str, Any]:
"""Return a JSON-serializable dictionary representation.
Returns:
Serialized dataclass payload.
"""
return asdict(self)
[docs]
@dataclass(slots=True, frozen=True, kw_only=True)
class MemoryRecord:
"""Retrieved or persisted memory record."""
item_id: str
"""Stable memory record identifier."""
namespace: str
"""Namespace partition that owns this record."""
content: str
"""Record content text."""
metadata: dict[str, object] = field(default_factory=dict)
"""Record metadata fields."""
created_at: str | None = None
"""ISO timestamp for record creation."""
updated_at: str | None = None
"""ISO timestamp for last record update."""
score: float | None = None
"""Combined ranking score when returned by search."""
lexical_score: float | None = None
"""Lexical relevance score."""
vector_score: float | None = None
"""Vector similarity score when embeddings are available."""
[docs]
def to_dict(self) -> dict[str, Any]:
"""Return a JSON-serializable dictionary representation.
Returns:
Serialized dataclass payload.
"""
return asdict(self)
[docs]
@dataclass(slots=True, frozen=True, kw_only=True)
class GraphNodeRecord:
"""One graph node stored in a graph-memory backend."""
node_id: str
"""Stable node identifier."""
name: str
"""Human-readable node label."""
node_type: str = "entity"
"""Coarse node type such as ``component`` or ``formula``."""
description: str = ""
"""Optional free-text node description."""
metadata: dict[str, object] = field(default_factory=dict)
"""Optional node metadata fields."""
created_at: str | None = None
"""ISO timestamp for node creation."""
updated_at: str | None = None
"""ISO timestamp for last node update."""
score: float | None = None
"""Optional retrieval score when returned by a graph query."""
[docs]
def to_dict(self) -> dict[str, Any]:
"""Return a JSON-serializable dictionary representation.
Returns:
Serialized dataclass payload.
"""
return asdict(self)
[docs]
@dataclass(slots=True, frozen=True, kw_only=True)
class GraphEdgeRecord:
"""One graph edge stored in a graph-memory backend."""
source_id: str
"""Source node identifier."""
target_id: str
"""Target node identifier."""
relationship: str
"""Normalized relationship label."""
edge_id: str | None = None
"""Stable edge identifier; derived by stores when omitted."""
metadata: dict[str, object] = field(default_factory=dict)
"""Optional edge metadata fields."""
created_at: str | None = None
"""ISO timestamp for edge creation."""
updated_at: str | None = None
"""ISO timestamp for last edge update."""
[docs]
def to_dict(self) -> dict[str, Any]:
"""Return a JSON-serializable dictionary representation.
Returns:
Serialized dataclass payload.
"""
return asdict(self)
[docs]
@dataclass(slots=True, frozen=True, kw_only=True)
class GraphSearchQuery:
"""Structured graph-memory search query."""
text: str
"""Natural language query text."""
namespace: str = "default"
"""Namespace partition used for isolation."""
top_k: int = 3
"""Maximum number of seed nodes to retrieve before expansion."""
max_hops: int = 1
"""Maximum graph-traversal distance from matched seed nodes."""
min_score: float | None = None
"""Optional minimum score threshold for matched seed nodes."""
metadata_filters: dict[str, object] = field(default_factory=dict)
"""Exact-match metadata filters applied to candidate nodes."""
node_type_filters: tuple[str, ...] = ()
"""Optional allowed node types."""
relationship_filters: tuple[str, ...] = ()
"""Optional allowed relationship labels for returned edges."""
[docs]
def to_dict(self) -> dict[str, Any]:
"""Return a JSON-serializable dictionary representation.
Returns:
Serialized dataclass payload.
"""
return asdict(self)
[docs]
@dataclass(slots=True, frozen=True, kw_only=True)
class GraphSubgraphResult:
"""One retrieved graph subgraph."""
namespace: str
"""Namespace partition queried."""
query_text: str
"""Original query text."""
matched_node_ids: tuple[str, ...] = ()
"""Seed node ids that matched the text query before graph expansion."""
nodes: tuple[GraphNodeRecord, ...] = ()
"""Retrieved nodes, including traversal context."""
edges: tuple[GraphEdgeRecord, ...] = ()
"""Retrieved edges connecting included nodes."""
[docs]
def to_dict(self) -> dict[str, Any]:
"""Return a JSON-serializable dictionary representation.
Returns:
Serialized dataclass payload.
"""
return {
"namespace": self.namespace,
"query_text": self.query_text,
"matched_node_ids": list(self.matched_node_ids),
"nodes": [node.to_dict() for node in self.nodes],
"edges": [edge.to_dict() for edge in self.edges],
}
[docs]
class MemoryStore(Protocol):
"""Protocol implemented by memory stores used by workflows and agents."""
[docs]
def write(
self,
records: Sequence[MemoryWriteRecord],
*,
namespace: str = "default",
) -> list[MemoryRecord]:
"""Persist one or more records and return normalized stored entries.
Args:
records: Record payloads to persist.
namespace: Namespace partition to store records under.
Returns:
Stored records including ids and timestamps.
"""
[docs]
def search(self, query: MemorySearchQuery) -> list[MemoryRecord]:
"""Search memory records using lexical/vector relevance.
Args:
query: Structured memory search query.
Returns:
Ordered list of matching records.
"""
[docs]
def close(self) -> None:
"""Release any store-owned resources.
Implementations that do not own external resources may implement this
as a no-op so callers can use a uniform lifecycle pattern.
"""
return None
def __enter__(self) -> Self:
"""Return this store for use in a ``with`` statement."""
return self
def __exit__(
self,
exc_type: type[BaseException] | None,
exc: BaseException | None,
tb: TracebackType | None,
) -> bool | None:
"""Close the store when exiting a ``with`` block."""
del exc_type, exc, tb
self.close()
return None
[docs]
class GraphMemoryStore(Protocol):
"""Protocol implemented by graph-memory stores used by agents and patterns."""
[docs]
def upsert_nodes(
self,
nodes: Sequence[GraphNodeRecord],
*,
namespace: str = "default",
) -> list[GraphNodeRecord]:
"""Persist graph nodes and return normalized stored entries.
Args:
nodes: Node payloads to persist.
namespace: Namespace partition to store nodes under.
Returns:
Stored nodes including timestamps.
"""
[docs]
def upsert_edges(
self,
edges: Sequence[GraphEdgeRecord],
*,
namespace: str = "default",
) -> list[GraphEdgeRecord]:
"""Persist graph edges and return normalized stored entries.
Args:
edges: Edge payloads to persist.
namespace: Namespace partition to store edges under.
Returns:
Stored edges including ids and timestamps.
"""
[docs]
def query_subgraph(self, query: GraphSearchQuery) -> GraphSubgraphResult:
"""Retrieve a relevant graph subgraph for one structured query.
Args:
query: Structured graph-memory search query.
Returns:
Retrieved subgraph result.
"""
[docs]
def close(self) -> None:
"""Release any store-owned resources."""
return None
def __enter__(self) -> Self:
"""Return this store for use in a ``with`` statement."""
return self
def __exit__(
self,
exc_type: type[BaseException] | None,
exc: BaseException | None,
tb: TracebackType | None,
) -> bool | None:
"""Close the store when exiting a ``with`` block."""
del exc_type, exc, tb
self.close()
return None
__all__ = [
"GraphEdgeRecord",
"GraphMemoryStore",
"GraphNodeRecord",
"GraphSearchQuery",
"GraphSubgraphResult",
"MemoryRecord",
"MemorySearchQuery",
"MemoryStore",
"MemoryWriteRecord",
]