Source code for design_research_analysis.table

"""Unified table contracts for design-research analysis workflows."""

from __future__ import annotations

import csv
import json
from collections.abc import Callable, Iterable, Mapping, Sequence
from dataclasses import dataclass
from datetime import UTC, datetime
from os import PathLike
from pathlib import Path
from typing import Any

Row = dict[str, Any]
MapperFn = Callable[[Mapping[str, Any]], Any]

_RECOMMENDED_COLUMNS = ("record_id", "text", "session_id", "actor_id", "event_type")
_OPTIONAL_COLUMNS = ("meta_json",)


@dataclass(frozen=True, slots=True)
class UnifiedTableConfig:
    """Configuration for coercing and validating a unified table.

    Args:
        required_columns: Columns that must be present.
        recommended_columns: Columns that are strongly encouraged.
        optional_columns: Common optional fields documented by the package.
        timestamp_column: Name of the canonical timestamp column.
        parse_timestamps: Whether to parse timestamp values into ``datetime``
            objects.
        sort_by_timestamp: Whether to return rows sorted by timestamp.
        allow_extra_columns: Whether columns outside known sets are allowed.
    """

    required_columns: tuple[str, ...] = ("timestamp",)
    recommended_columns: tuple[str, ...] = _RECOMMENDED_COLUMNS
    optional_columns: tuple[str, ...] = _OPTIONAL_COLUMNS
    timestamp_column: str = "timestamp"
    parse_timestamps: bool = True
    sort_by_timestamp: bool = True
    allow_extra_columns: bool = True

    def known_columns(self) -> set[str]:
        """Return the known column names implied by this configuration."""
        return (
            set(self.required_columns)
            | set(self.recommended_columns)
            | set(self.optional_columns)
        )
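# Usage sketch (illustrative, not part of the module): a stricter configuration
# that requires ``text`` alongside ``timestamp`` and rejects unknown columns.
#
#   strict = UnifiedTableConfig(
#       required_columns=("timestamp", "text"),
#       allow_extra_columns=False,
#   )
#   # Known columns still include the documented optional field.
#   assert "meta_json" in strict.known_columns()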
@dataclass(frozen=True, slots=True)
class UnifiedTableValidationReport:
    """Validation report for a unified table.

    Args:
        is_valid: Whether validation passed.
        n_rows: Number of rows observed.
        columns: Ordered columns found in the table.
        missing_required: Required columns missing from the table.
        missing_recommended: Recommended columns missing from the table.
        errors: Validation errors.
        warnings: Validation warnings.
    """

    is_valid: bool
    n_rows: int
    columns: tuple[str, ...]
    missing_required: tuple[str, ...]
    missing_recommended: tuple[str, ...]
    errors: tuple[str, ...]
    warnings: tuple[str, ...]

    def to_dict(self) -> dict[str, Any]:
        """Return a JSON-serializable representation of the report."""
        return {
            "is_valid": self.is_valid,
            "n_rows": int(self.n_rows),
            "columns": list(self.columns),
            "missing_required": list(self.missing_required),
            "missing_recommended": list(self.missing_recommended),
            "errors": list(self.errors),
            "warnings": list(self.warnings),
        }
def _is_blank(value: Any) -> bool:
    if value is None:
        return True
    return isinstance(value, str) and value.strip() == ""


def _parse_timestamp_value(value: Any, *, column: str) -> datetime:
    """Parse a timestamp value into a timezone-aware ``datetime``.

    Args:
        value: Raw timestamp value.
        column: Name of the timestamp column for error messages.

    Returns:
        Parsed ``datetime``.

    Raises:
        ValueError: If parsing fails.
    """
    if isinstance(value, datetime):
        dt = value
    elif isinstance(value, (int, float)):
        dt = datetime.fromtimestamp(float(value), tz=UTC)
    elif isinstance(value, str):
        text = value.strip()
        if text == "":
            raise ValueError(f"{column} contains an empty timestamp string.")
        if text.endswith("Z"):
            text = f"{text[:-1]}+00:00"
        try:
            dt = datetime.fromisoformat(text)
        except ValueError as exc:
            raise ValueError(
                f"{column} has an invalid ISO timestamp: {value!r}."
            ) from exc
    else:
        raise ValueError(
            f"{column} has an unsupported timestamp type: {type(value)!r}."
        )
    if dt.tzinfo is None:
        dt = dt.replace(tzinfo=UTC)
    return dt


def _columnar_to_rows(data: Mapping[str, Sequence[Any]]) -> list[Row]:
    lengths = {len(values) for values in data.values()}
    if not lengths:
        return []
    if len(lengths) != 1:
        raise ValueError("Columnar input must provide equally sized value arrays.")
    size = lengths.pop()
    rows: list[Row] = []
    for idx in range(size):
        rows.append({column: values[idx] for column, values in data.items()})
    return rows


def _load_rows_from_path(path_input: str | PathLike[str]) -> list[Row]:
    path = Path(path_input)
    suffix = path.suffix.lower()
    if suffix == ".json":
        payload = json.loads(path.read_text(encoding="utf-8"))
        return _rows_from_data(payload)
    if suffix in {".csv", ".tsv"}:
        delimiter = "," if suffix == ".csv" else "\t"
        with path.open("r", encoding="utf-8", newline="") as handle:
            reader = csv.DictReader(handle, delimiter=delimiter)
            return [dict(row) for row in reader]
    raise ValueError("Unsupported table input path. Use .csv, .tsv, or .json.")


def _rows_from_data(data: Any) -> list[Row]:
    if isinstance(data, (str, PathLike)):
        return _load_rows_from_path(data)
    if isinstance(data, Mapping):
        if data and all(
            isinstance(value, Sequence) and not isinstance(value, (str, bytes))
            for value in data.values()
        ):
            return _columnar_to_rows(data)
        raise ValueError(
            "Mapping input must be columnar (column name -> sequence of values) "
            "to coerce a unified table."
        )
    if isinstance(data, Sequence) and not isinstance(data, (str, bytes)):
        rows: list[Row] = []
        for index, row in enumerate(data):
            if not isinstance(row, Mapping):
                raise ValueError(
                    f"Row input must be mapping-like. Found {type(row)!r} at index {index}."
                )
            rows.append(dict(row))
        return rows
    raise ValueError(
        "Unsupported table input. Provide row-oriented data (sequence of mappings) "
        "or columnar data (mapping of column -> sequence)."
    )


def _stable_columns(rows: Sequence[Mapping[str, Any]]) -> tuple[str, ...]:
    seen: set[str] = set()
    ordered: list[str] = []
    for row in rows:
        for column in row:
            if column not in seen:
                seen.add(column)
                ordered.append(column)
    return tuple(ordered)
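# Behavior sketch for the parsing helper (illustrative only; these are private
# functions): all three inputs below normalize to the same timezone-aware UTC
# ``datetime``, since a trailing "Z" is rewritten to "+00:00" and naive values
# are assumed to be UTC.
#
#   a = _parse_timestamp_value("2024-01-02T03:04:05Z", column="timestamp")
#   b = _parse_timestamp_value("2024-01-02 03:04:05+00:00", column="timestamp")
#   c = _parse_timestamp_value(datetime(2024, 1, 2, 3, 4, 5), column="timestamp")
#   assert a == b == c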
def coerce_unified_table(
    data: Any,
    *,
    config: UnifiedTableConfig | None = None,
) -> list[Row]:
    """Coerce input data to normalized row-oriented unified table records.

    Args:
        data: Row-oriented sequence of mappings, column-oriented mapping, or
            a ``.csv``, ``.tsv``, or ``.json`` path.
        config: Optional table configuration.

    Returns:
        Normalized table rows.
    """
    resolved_config = config or UnifiedTableConfig()
    rows = _rows_from_data(data)
    if not rows:
        return []
    normalized: list[Row] = [dict(row) for row in rows]
    if resolved_config.parse_timestamps:
        ts_column = resolved_config.timestamp_column
        for index, row in enumerate(normalized):
            if ts_column in row and not _is_blank(row[ts_column]):
                try:
                    row[ts_column] = _parse_timestamp_value(row[ts_column], column=ts_column)
                except ValueError as exc:
                    raise ValueError(
                        f"Failed to parse timestamp at row {index}: {exc}"
                    ) from exc
    if resolved_config.sort_by_timestamp and resolved_config.timestamp_column in _stable_columns(
        normalized
    ):
        ts_column = resolved_config.timestamp_column
        decorated: list[tuple[int, datetime, Row]] = []
        for index, row in enumerate(normalized):
            raw = row.get(ts_column)
            if _is_blank(raw):
                continue
            if isinstance(raw, datetime):
                dt = raw
            else:
                dt = _parse_timestamp_value(raw, column=ts_column)
            row[ts_column] = dt
            decorated.append((index, dt, row))
        if decorated:
            present_rows = {id(item[2]) for item in decorated}
            sorted_rows = [
                row for _, _, row in sorted(decorated, key=lambda item: (item[1], item[0]))
            ]
            trailing = [row for row in normalized if id(row) not in present_rows]
            normalized = sorted_rows + trailing
    return normalized
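# Usage sketch (illustrative, not part of the module): epoch seconds and
# ISO-8601 strings coexist in the input and come back parsed and sorted.
# The rows below are invented example data.
#
#   rows = coerce_unified_table(
#       [
#           {"timestamp": "2024-01-02T03:04:05Z", "text": "later"},
#           {"timestamp": 1_704_160_000, "text": "earlier"},
#       ]
#   )
#   assert [row["text"] for row in rows] == ["earlier", "later"]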
def validate_unified_table(
    table: Any,
    *,
    config: UnifiedTableConfig | None = None,
) -> UnifiedTableValidationReport:
    """Validate a unified table against the configured contract.

    Args:
        table: Coerced unified table rows or a supported path-like input.
        config: Optional table configuration.

    Returns:
        Validation report with errors and warnings.
    """
    resolved_config = config or UnifiedTableConfig()
    rows = [dict(row) for row in _rows_from_data(table)]
    columns = _stable_columns(rows)
    column_set = set(columns)
    errors: list[str] = []
    warnings: list[str] = []
    if not rows:
        errors.append("Unified table must contain at least one row.")
    missing_required = tuple(
        sorted(column for column in resolved_config.required_columns if column not in column_set)
    )
    if missing_required:
        errors.append(f"Missing required columns: {', '.join(missing_required)}.")
    missing_recommended = tuple(
        sorted(
            column
            for column in resolved_config.recommended_columns
            if column not in column_set
        )
    )
    if missing_recommended:
        warnings.append(f"Missing recommended columns: {', '.join(missing_recommended)}.")
    if not resolved_config.allow_extra_columns:
        extra = sorted(
            column for column in column_set if column not in resolved_config.known_columns()
        )
        if extra:
            errors.append(f"Unexpected columns: {', '.join(extra)}.")
    ts_column = resolved_config.timestamp_column
    if ts_column in column_set:
        for index, row in enumerate(rows):
            value = row.get(ts_column)
            if _is_blank(value):
                errors.append(f"Row {index} has blank timestamp in '{ts_column}'.")
                continue
            if resolved_config.parse_timestamps:
                try:
                    _parse_timestamp_value(value, column=ts_column)
                except ValueError as exc:
                    errors.append(f"Row {index} timestamp is invalid: {exc}")
    is_valid = len(errors) == 0
    return UnifiedTableValidationReport(
        is_valid=is_valid,
        n_rows=len(rows),
        columns=columns,
        missing_required=missing_required,
        missing_recommended=missing_recommended,
        errors=tuple(errors),
        warnings=tuple(warnings),
    )
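# Usage sketch (illustrative): a row lacking the recommended columns still
# passes validation but accumulates warnings; the row data is invented.
#
#   report = validate_unified_table([{"timestamp": "2024-01-02T03:04:05Z"}])
#   assert report.is_valid
#   assert report.missing_recommended == (
#       "actor_id", "event_type", "record_id", "session_id", "text",
#   )
#   print(json.dumps(report.to_dict(), indent=2))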
def derive_columns(
    table: Sequence[Mapping[str, Any]],
    *,
    actor_mapper: MapperFn | None = None,
    event_mapper: MapperFn | None = None,
    session_mapper: MapperFn | None = None,
    text_mapper: MapperFn | None = None,
    record_id_mapper: MapperFn | None = None,
) -> list[Row]:
    """Derive canonical columns from deterministic user-provided mappers.

    Existing non-blank values are preserved. Mappers are only applied to
    blank or missing values. ``record_id`` defaults to the row index if not
    provided.

    Args:
        table: Unified table rows.
        actor_mapper: Optional mapper for ``actor_id``.
        event_mapper: Optional mapper for ``event_type``.
        session_mapper: Optional mapper for ``session_id``.
        text_mapper: Optional mapper for ``text``.
        record_id_mapper: Optional mapper for ``record_id``.

    Returns:
        New rows with derived columns.
    """
    derived: list[Row] = []
    for index, source in enumerate(table):
        row = dict(source)
        if _is_blank(row.get("record_id")):
            if record_id_mapper is not None:
                row["record_id"] = record_id_mapper(source)
            else:
                row["record_id"] = str(index)
        if _is_blank(row.get("session_id")) and session_mapper is not None:
            row["session_id"] = session_mapper(source)
        if _is_blank(row.get("actor_id")) and actor_mapper is not None:
            row["actor_id"] = actor_mapper(source)
        if _is_blank(row.get("event_type")) and event_mapper is not None:
            row["event_type"] = event_mapper(source)
        if _is_blank(row.get("text")) and text_mapper is not None:
            row["text"] = text_mapper(source)
        derived.append(row)
    return derived
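# Usage sketch (illustrative): filling ``session_id`` from an invented
# ``raw_session`` column via a deterministic mapper. Existing non-blank
# values are preserved, and ``record_id`` falls back to the row index.
#
#   rows = [
#       {"timestamp": "2024-01-02T03:04:05Z", "raw_session": "s-1"},
#       {"timestamp": "2024-01-02T03:05:05Z", "raw_session": "s-1",
#        "session_id": "kept"},
#   ]
#   derived = derive_columns(rows, session_mapper=lambda row: row["raw_session"])
#   assert [row["session_id"] for row in derived] == ["s-1", "kept"]
#   assert [row["record_id"] for row in derived] == ["0", "1"]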
def group_rows(
    table: Sequence[Mapping[str, Any]],
    *,
    key_column: str,
) -> list[list[Row]]:
    """Group rows by ``key_column`` while preserving in-group order.

    Args:
        table: Unified table rows.
        key_column: Column used for grouping.

    Returns:
        List of grouped row lists sorted by key string representation.
    """
    grouped: dict[str, list[Row]] = {}
    for row in table:
        key_raw = row.get(key_column)
        key = "__missing__" if _is_blank(key_raw) else str(key_raw)
        grouped.setdefault(key, []).append(dict(row))
    return [grouped[key] for key in sorted(grouped)]


def select_column(table: Sequence[Mapping[str, Any]], column: str) -> list[Any]:
    """Collect a single column from unified table rows."""
    return [row.get(column) for row in table]


def iter_non_blank(values: Iterable[Any]) -> Iterable[Any]:
    """Yield non-blank values from an iterable."""
    for value in values:
        if not _is_blank(value):
            yield value


__all__ = [
    "MapperFn",
    "UnifiedTableConfig",
    "UnifiedTableValidationReport",
    "coerce_unified_table",
    "derive_columns",
    "group_rows",
    "iter_non_blank",
    "select_column",
    "validate_unified_table",
]
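# Usage sketch (illustrative) tying the selection helpers together; the rows
# are invented example data. Groups come back sorted by key, and blank values
# are filtered out of the selected column.
#
#   rows = [
#       {"session_id": "a", "text": "hi"},
#       {"session_id": "b", "text": ""},
#       {"session_id": "a", "text": "bye"},
#   ]
#   groups = group_rows(rows, key_column="session_id")
#   assert [len(group) for group in groups] == [2, 1]
#   assert list(iter_non_blank(select_column(rows, "text"))) == ["hi", "bye"]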