"""Language analysis utilities for convergence, topics, and sentiment."""
from __future__ import annotations
import re
from collections.abc import Callable, Mapping, Sequence
from dataclasses import dataclass, field
from typing import Any, TypeGuard
import numpy as np
from ._comparison import ComparableResultMixin
from .sequence.embeddings import embed_text
from .table import coerce_unified_table, derive_columns
# Message of the ImportError raised by fit_topic_model when its optional
# scikit-learn imports fail.
_LANG_IMPORT_ERROR = (
    "Topic modeling requires optional language dependencies. "
    "Install with `pip install design-research-analysis[lang]`."
)
# Word tokens for the lexicon sentiment scorer: runs of ASCII letters that may
# contain apostrophes *between* letters ("don't", "team's"). Apostrophes at the
# edge of a word are quotation marks rather than part of the word, so "'good'"
# tokenizes as "good" (and can match the lexicon) and a lone "'" is not a token.
_TOKEN_PATTERN = re.compile(r"[A-Za-z]+(?:'[A-Za-z]+)*")
# Lower-case tokens that count as positive hits in the lexicon sentiment scorer.
_POSITIVE_WORDS = {
    # general approval
    "good", "great", "excellent", "better", "best", "positive", "strong",
    # clarity and usefulness
    "clear", "helpful", "useful",
    # progress and outcomes
    "improve", "improved", "improvement", "success", "successful",
    # working style
    "creative", "efficient", "effective", "confident", "collaborative",
}
# Lower-case tokens that count as negative hits in the lexicon sentiment scorer.
_NEGATIVE_WORDS = {
    # general disapproval
    "bad", "worse", "worst", "negative", "weak", "slow",
    # confusion and difficulty
    "unclear", "confusing", "difficult", "hard",
    # frustration and stress
    "frustrated", "frustrating", "stress", "stressed",
    # faults and hazards
    "error", "errors", "problem", "problems", "risky", "risk",
}
@dataclass(slots=True)
class LanguageConvergenceResult(ComparableResultMixin):
    """Per-group outcome of a language convergence/divergence analysis.

    Attributes:
        groups: Group labels included in the result.
        distance_trajectories: Distance sequence recorded for each group.
        slope_by_group: Trend slope recorded for each group.
        direction_by_group: Direction label recorded for each group.
        window_size: Window size the analysis was run with.
        n_observations: Number of input observations.
        config: Settings recorded alongside the result.
    """

    groups: list[str]
    distance_trajectories: dict[str, list[float]]
    slope_by_group: dict[str, float]
    direction_by_group: dict[str, str]
    window_size: int
    n_observations: int
    config: dict[str, Any] = field(default_factory=dict)

    def to_dict(self) -> dict[str, Any]:
        """Return a copy of the result built from plain dicts, lists, and scalars."""
        payload: dict[str, Any] = {}
        payload["groups"] = list(self.groups)
        payload["distance_trajectories"] = {
            name: list(values) for name, values in self.distance_trajectories.items()
        }
        payload["slope_by_group"] = {
            name: float(value) for name, value in self.slope_by_group.items()
        }
        payload["direction_by_group"] = dict(self.direction_by_group)
        payload["window_size"] = int(self.window_size)
        payload["n_observations"] = int(self.n_observations)
        payload["config"] = dict(self.config)
        return payload

    def _comparison_metric(self) -> str:
        # Name of the comparison metric; presumably read by ComparableResultMixin,
        # whose implementation is not visible in this module.
        return "convergence_profile"

    def _comparison_vectors(
        self,
        other: LanguageConvergenceResult,
    ) -> tuple[np.ndarray, np.ndarray, dict[str, Any]]:
        """Build aligned numeric profiles for this result and ``other``.

        Each profile is the per-group slopes followed by the per-group
        trajectory lengths, ordered by the sorted union of both results' group
        labels. A group missing from one result contributes a slope of ``0.0``
        and a length of ``0`` for that result.
        """
        shared_groups = sorted(set(self.slope_by_group).union(other.slope_by_group))

        def profile(result: LanguageConvergenceResult) -> np.ndarray:
            slopes = np.asarray(
                [result.slope_by_group.get(name, 0.0) for name in shared_groups],
                dtype=float,
            )
            lengths = np.asarray(
                [len(result.distance_trajectories.get(name, [])) for name in shared_groups],
                dtype=float,
            )
            return np.concatenate([slopes, lengths])

        metadata = {
            "groups": shared_groups,
            "window_sizes": [self.window_size, other.window_size],
        }
        return profile(self), profile(other), metadata
def _is_blank(value: Any) -> bool:
    """Return True for ``None`` and for strings that are empty after stripping whitespace."""
    if value is None:
        return True
    # Non-string values (numbers, containers, ...) are never considered blank.
    return isinstance(value, str) and value.strip() == ""
def _is_text_sequence(
data: Sequence[Mapping[str, Any]] | Sequence[str],
) -> TypeGuard[Sequence[str]]:
return len(data) > 0 and isinstance(data[0], str)
def _extract_text_rows(
    data: Sequence[Mapping[str, Any]] | Sequence[str],
    *,
    text_column: str,
    group_column: str,
    text_mapper: Callable[[Mapping[str, Any]], Any] | None,
) -> tuple[list[str], list[str], int]:
    """Pull parallel text and group-label lists out of the input.

    A plain text sequence is stringified item by item and placed in the single
    group ``"__all__"``. Any other input is passed through
    ``coerce_unified_table`` and ``derive_columns`` and then read row by row.

    Returns:
        ``(texts, groups, n_rows)`` where ``texts`` and ``groups`` have one
        entry per row and a blank group value is replaced by ``"__all__"``.

    Raises:
        ValueError: If a table row has no (or only blank) text in ``text_column``.
    """
    if _is_text_sequence(data):
        plain_texts = [str(item) for item in data]
        count = len(plain_texts)
        return plain_texts, ["__all__"] * count, count

    table = derive_columns(coerce_unified_table(data), text_mapper=text_mapper)
    texts: list[str] = []
    groups: list[str] = []
    for index, record in enumerate(table):
        raw_text = record.get(text_column)
        if _is_blank(raw_text):
            raise ValueError(
                f"Row {index} is missing text in '{text_column}'. "
                "Provide text values or a text mapper."
            )
        raw_group = record.get(group_column)
        texts.append(str(raw_text))
        groups.append(str(raw_group) if not _is_blank(raw_group) else "__all__")
    return texts, groups, len(table)
def _cosine_distance(a: np.ndarray, b: np.ndarray) -> float:
    """Return ``1 - cosine_similarity(a, b)``, a value in ``[0, 2]``.

    If either vector has zero norm the similarity is undefined and the
    distance is reported as ``0.0``.
    """
    norm_a = float(np.linalg.norm(a))
    norm_b = float(np.linalg.norm(b))
    if norm_a == 0.0 or norm_b == 0.0:
        return 0.0
    cosine = float(np.dot(a, b) / (norm_a * norm_b))
    # Rounding can push the ratio slightly outside [-1, 1]; clamp it back.
    return 1.0 - max(-1.0, min(1.0, cosine))
[docs]
def compute_semantic_distance_trajectory(
    data: Sequence[Mapping[str, Any]] | Sequence[str],
    *,
    text_column: str = "text",
    group_column: str = "session_id",
    window_size: int = 3,
    model_name: str = "all-MiniLM-L6-v2",
    normalize: bool = True,
    batch_size: int = 32,
    device: str = "auto",
    embedder: Callable[[Sequence[str]], np.ndarray] | None = None,
    text_mapper: Callable[[Mapping[str, Any]], Any] | None = None,
) -> dict[str, list[float]]:
    """Compute semantic distance trajectories to a group's final language state.

    Texts are embedded, grouped by ``group_column`` in input order, and
    averaged over every sliding window of ``window_size`` consecutive
    embeddings. Each window centroid is compared (cosine distance) with the
    centroid of the group's last window, so the final entry of a trajectory
    compares the reference with itself.

    Args:
        data: Unified table rows or a simple text list.
        text_column: Text column for table input.
        group_column: Grouping column for trajectory computation.
        window_size: Sliding window size for centroid estimation.
        model_name: Sentence transformer model name when ``embedder`` is omitted.
        normalize: Whether to normalize embeddings when using built-in embedding.
        batch_size: Embedding batch size.
        device: Embedding device.
        embedder: Optional custom embedding function.
        text_mapper: Optional mapper used to derive missing text values.

    Returns:
        Mapping of ``group -> [distance_t0, distance_t1, ...]``. A group with
        fewer than ``window_size`` texts maps to an empty list. Input without
        any text yields an empty mapping.

    Raises:
        ValueError: If ``window_size`` is not positive, a table row has no
            text, or the embeddings are not a 2D matrix with one row per text.
    """
    if window_size <= 0:
        raise ValueError("window_size must be positive.")

    texts, groups, _ = _extract_text_rows(
        data,
        text_column=text_column,
        group_column=group_column,
        text_mapper=text_mapper,
    )
    if not texts:
        # Nothing to embed: skip the embedding call entirely (it may load a
        # model, and an empty result is typically not a 2D matrix).
        return {}

    if embedder is None:
        raw_embeddings = embed_text(
            texts,
            model_name=model_name,
            normalize=normalize,
            batch_size=batch_size,
            device=device,
        )
    else:
        raw_embeddings = embedder(texts)
    # Validate the matrix shape regardless of which embedding path produced it.
    embedded = np.asarray(raw_embeddings, dtype=float)
    if embedded.ndim != 2:
        raise ValueError("embedder must return a 2D embedding matrix.")
    if embedded.shape[0] != len(texts):
        raise ValueError("embedder output rows must match number of texts.")

    # Bucket embeddings per group, preserving the order in which rows arrived.
    by_group: dict[str, list[np.ndarray]] = {}
    for group, vector in zip(groups, embedded, strict=True):
        by_group.setdefault(group, []).append(np.asarray(vector, dtype=float))

    trajectories: dict[str, list[float]] = {}
    for group, vectors in by_group.items():
        if len(vectors) < window_size:
            # Too few observations to form even a single window.
            trajectories[group] = []
            continue
        windows: list[np.ndarray] = []
        for start in range(0, len(vectors) - window_size + 1):
            window_matrix = np.vstack(vectors[start : start + window_size])
            windows.append(np.mean(window_matrix, axis=0))
        # The centroid of the final window is the group's reference state.
        reference = windows[-1]
        trajectories[group] = [_cosine_distance(window, reference) for window in windows]
    return trajectories
[docs]
def compute_language_convergence(
    data: Sequence[Mapping[str, Any]] | Sequence[str],
    *,
    text_column: str = "text",
    group_column: str = "session_id",
    window_size: int = 3,
    slope_tolerance: float = 1e-6,
    model_name: str = "all-MiniLM-L6-v2",
    normalize: bool = True,
    batch_size: int = 32,
    device: str = "auto",
    embedder: Callable[[Sequence[str]], np.ndarray] | None = None,
    text_mapper: Callable[[Mapping[str, Any]], Any] | None = None,
) -> LanguageConvergenceResult:
    """Compute convergence/divergence of language trajectories by group.

    Each group's distance trajectory is summarised by the slope of a
    degree-1 least-squares fit over its window index. Negative slope indicates
    convergence toward the final language centroid. Positive slope indicates
    divergence.

    Args:
        data: Unified table rows or a simple text list.
        text_column: Text column for table input.
        group_column: Grouping column for trajectory computation.
        window_size: Sliding window size used for the trajectories.
        slope_tolerance: Half-width of the band around zero inside which a
            slope is labelled ``"stable"``. Must be non-negative.
        model_name: Embedding model name, forwarded to the trajectory computation.
        normalize: Embedding normalization flag, forwarded likewise.
        batch_size: Embedding batch size, forwarded likewise.
        device: Embedding device, forwarded likewise.
        embedder: Optional custom embedding function, forwarded likewise.
        text_mapper: Optional text mapper, forwarded likewise.

    Returns:
        A ``LanguageConvergenceResult`` holding, per group, the distance
        trajectory, its slope, and a ``"converging"`` / ``"diverging"`` /
        ``"stable"`` label. Groups whose trajectory has fewer than two points
        get slope ``0.0`` and are therefore labelled ``"stable"``.

    Raises:
        ValueError: If ``slope_tolerance`` is negative or NaN, or if the
            trajectory computation rejects its inputs.
    """
    # A negative tolerance would make the "converging" and "diverging" bands
    # overlap (a positive slope could be labelled converging); NaN would label
    # everything stable. Written as `not >=` so NaN is rejected too, and
    # checked first so we fail before any embedding work is done.
    if not slope_tolerance >= 0:
        raise ValueError("slope_tolerance must be non-negative.")

    trajectories = compute_semantic_distance_trajectory(
        data,
        text_column=text_column,
        group_column=group_column,
        window_size=window_size,
        model_name=model_name,
        normalize=normalize,
        batch_size=batch_size,
        device=device,
        embedder=embedder,
        text_mapper=text_mapper,
    )

    slope_by_group: dict[str, float] = {}
    direction_by_group: dict[str, str] = {}
    for group, distances in trajectories.items():
        if len(distances) < 2:
            # A trend line needs at least two points.
            slope = 0.0
        else:
            x = np.arange(len(distances), dtype=float)
            y = np.asarray(distances, dtype=float)
            slope = float(np.polyfit(x, y, deg=1)[0])
        if slope < -slope_tolerance:
            direction = "converging"
        elif slope > slope_tolerance:
            direction = "diverging"
        else:
            direction = "stable"
        slope_by_group[group] = slope
        direction_by_group[group] = direction

    n_observations = len(data) if _is_text_sequence(data) else len(coerce_unified_table(data))
    groups = sorted(trajectories)
    return LanguageConvergenceResult(
        groups=groups,
        distance_trajectories=trajectories,
        slope_by_group=slope_by_group,
        direction_by_group=direction_by_group,
        window_size=window_size,
        n_observations=n_observations,
        config={
            "text_column": text_column,
            "group_column": group_column,
            "window_size": int(window_size),
            "model_name": model_name if embedder is None else "custom",
            "normalized_embeddings": bool(normalize),
        },
    )
def _resolve_text_input(
    data: Sequence[Mapping[str, Any]] | Sequence[str],
    *,
    text_column: str,
) -> list[str]:
    """Return the input's texts as a list of strings.

    A plain text sequence is stringified item by item; any other input is
    passed through ``coerce_unified_table`` and read from ``text_column``.

    Raises:
        ValueError: If a table row has no (or only blank) text in ``text_column``.
    """
    if _is_text_sequence(data):
        return list(map(str, data))

    texts: list[str] = []
    for index, record in enumerate(coerce_unified_table(data)):
        cell = record.get(text_column)
        if _is_blank(cell):
            raise ValueError(f"Row {index} is missing text in '{text_column}'.")
        texts.append(str(cell))
    return texts
[docs]
def fit_topic_model(
    data: Sequence[Mapping[str, Any]] | Sequence[str],
    *,
    n_topics: int = 5,
    max_features: int = 5000,
    random_state: int = 0,
    text_column: str = "text",
    top_k_terms: int = 10,
) -> dict[str, Any]:
    """Fit an LDA topic model on bag-of-words counts and summarise its topics.

    Texts are vectorized with scikit-learn's ``CountVectorizer`` (English stop
    words removed) and modelled with ``LatentDirichletAllocation`` using the
    batch learning method.

    Args:
        data: Unified table rows or a list of texts.
        n_topics: Number of latent topics.
        max_features: Maximum vocabulary size.
        random_state: Random seed.
        text_column: Text column for table input.
        top_k_terms: Number of representative terms per topic.

    Returns:
        JSON-serializable summary with the keys ``n_topics``, ``n_documents``,
        ``vocab_size``, ``doc_topic_distribution``, ``topic_terms`` (per topic:
        the highest-weighted terms and their weights, in descending weight
        order), and ``config``.

    Raises:
        ValueError: If a numeric argument is not positive or no text is given.
        ImportError: If the optional scikit-learn imports fail.
    """
    positivity_checks = (
        (n_topics, "n_topics must be positive."),
        (max_features, "max_features must be positive."),
        (top_k_terms, "top_k_terms must be positive."),
    )
    for value, message in positivity_checks:
        if value <= 0:
            raise ValueError(message)

    # Imported lazily so the rest of the module works without the optional extra.
    try:
        from sklearn.decomposition import LatentDirichletAllocation
        from sklearn.feature_extraction.text import CountVectorizer
    except ImportError as exc:
        raise ImportError(_LANG_IMPORT_ERROR) from exc

    documents = _resolve_text_input(data, text_column=text_column)
    if not documents:
        raise ValueError("Topic modeling requires at least one text.")

    vectorizer = CountVectorizer(max_features=max_features, stop_words="english")
    counts = vectorizer.fit_transform(documents)
    model = LatentDirichletAllocation(
        n_components=n_topics,
        random_state=random_state,
        learning_method="batch",
    )
    doc_topic = model.fit_transform(counts)
    vocabulary = np.asarray(vectorizer.get_feature_names_out())

    def summarize(topic_index: int, weights: np.ndarray) -> dict[str, Any]:
        # Indices of the top_k_terms largest weights, largest first.
        ranked = np.argsort(weights)[::-1][:top_k_terms]
        return {
            "topic": int(topic_index),
            "terms": [str(vocabulary[idx]) for idx in ranked],
            "weights": [float(weights[idx]) for idx in ranked],
        }

    topic_terms: list[dict[str, Any]] = [
        summarize(topic_index, weights)
        for topic_index, weights in enumerate(model.components_)
    ]

    config = {
        "max_features": int(max_features),
        "random_state": int(random_state),
        "text_column": text_column,
        "top_k_terms": int(top_k_terms),
    }
    return {
        "n_topics": int(n_topics),
        "n_documents": len(documents),
        "vocab_size": len(vocabulary),
        "doc_topic_distribution": doc_topic.tolist(),
        "topic_terms": topic_terms,
        "config": config,
    }
[docs]
def score_sentiment(
    data: Sequence[Mapping[str, Any]] | Sequence[str],
    *,
    text_column: str = "text",
) -> dict[str, Any]:
    """Score sentiment with a deterministic lexicon-based approach.

    This lightweight scorer is intentionally simple and offline-friendly. Each
    text is split into lower-cased tokens; its score is the number of positive
    lexicon hits minus negative lexicon hits, divided by the token count
    (``0.0`` for a text without tokens). Scores above ``0.03`` are labelled
    ``"positive"``, below ``-0.03`` ``"negative"``, otherwise ``"neutral"``.

    Args:
        data: Unified table rows or a list of texts.
        text_column: Text column for table input.

    Returns:
        Dictionary with ``n_documents``, per-text ``scores`` and ``labels``,
        ``mean_score`` and ``std_score`` (both ``0.0`` when there are no
        texts), per-label ``counts``, and ``config``.
    """
    documents = _resolve_text_input(data, text_column=text_column)

    scores: list[float] = []
    labels: list[str] = []
    tally = {"positive": 0, "neutral": 0, "negative": 0}
    for document in documents:
        words = [match.lower() for match in _TOKEN_PATTERN.findall(document)]
        score = 0.0
        if words:
            n_positive = sum(word in _POSITIVE_WORDS for word in words)
            n_negative = sum(word in _NEGATIVE_WORDS for word in words)
            score = float(n_positive - n_negative) / float(len(words))

        if score > 0.03:
            label = "positive"
        elif score < -0.03:
            label = "negative"
        else:
            label = "neutral"

        scores.append(score)
        labels.append(label)
        tally[label] += 1

    if scores:
        score_vector = np.asarray(scores, dtype=float)
        mean_score = float(np.mean(score_vector))
        std_score = float(np.std(score_vector))
    else:
        # No documents: report neutral aggregates instead of NaN.
        mean_score = 0.0
        std_score = 0.0

    return {
        "n_documents": len(documents),
        "scores": scores,
        "labels": labels,
        "mean_score": mean_score,
        "std_score": std_score,
        "counts": tally,
        "config": {"text_column": text_column, "method": "lexicon"},
    }
# Public API of this module: the names exported by a star-import.
__all__ = [
    "LanguageConvergenceResult",
    "compute_language_convergence",
    "compute_semantic_distance_trajectory",
    "fit_topic_model",
    "score_sentiment",
]