Spec 056: Failure Pattern Observability
- Status: Implemented (PR #92, 2026-01-03)
- Priority: Medium
- Complexity: Medium
- Related: PIPELINE-BRITTLENESS.md, ANALYSIS-026
SSOT (Implemented)
- Code: src/ai_psychiatrist/infrastructure/observability.py (FailureRegistry, record_failure())
- Wire-up: scripts/reproduce_results.py (registry init + per-participant recording + failures_{run_id}.json)
- Tests: tests/unit/infrastructure/test_observability.py
Problem Statement
We don't systematically track failure patterns across runs. When something goes wrong, we discover it through:
- Manual log inspection
- Unexplained metric changes
- User reports
We need structured observability to:
1. Know failure rates by category
2. Identify patterns (e.g., "participant 373 always fails on evidence extraction")
3. Track improvement over time
4. Debug issues faster
Previous Behavior (Fixed)
Logging was ad hoc:
- Some failures were logged with logger.warning() or logger.error()
- No consistent taxonomy or structure
- No aggregation across runs
- No per-participant failure tracking
Implemented Solution
The implementation adds a Failure Registry that:
1. Captures all failures with a consistent structure
2. Aggregates them by failure type
3. Persists them to JSON for cross-run analysis
4. Integrates with existing logging
Privacy / Licensing Constraint (Non-Negotiable)
DAIC-WOZ transcripts are licensed and must not leak into logs or artifacts. The failure registry MUST:
- Never store raw transcript text.
- Never store raw LLM responses or evidence quote strings.
- Only store counts, lengths, stable hashes, model ids, error codes, and stack-trace-free messages.
Examples of allowed context fields:
- response_hash, response_len
- transcript_hash, transcript_len
- text_hash, text_len (for embeddings)
- exception_type, http_status
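The sketch below shows one way to produce a *_hash / *_len pair from a sensitive string. Only the digest and the length ever reach the registry. Several details are assumptions, not the project's confirmed scheme: the helper name privacy_safe_fields, the use of SHA-256, the 12-hex-character truncation (chosen to resemble the response_hash in the JSON example further down), and counting length in characters.

```python
import hashlib


def privacy_safe_fields(prefix: str, text: str, digest_chars: int = 12) -> dict[str, object]:
    """Return {prefix}_hash and {prefix}_len for `text` without keeping the text.

    Hypothetical helper: SHA-256 and the 12-hex-char truncation are assumptions,
    not the project's confirmed hashing scheme.
    """
    # Hash the UTF-8 bytes so the digest is stable across runs and machines.
    digest = hashlib.sha256(text.encode("utf-8")).hexdigest()[:digest_chars]
    # Length is counted in characters here (an assumption; bytes would also work).
    return {f"{prefix}_hash": digest, f"{prefix}_len": len(text)}


# Usage with a made-up, truncated LLM response (the string itself is never logged):
raw_response = '{"evidence": ["made-up quote"'
ctx = privacy_safe_fields("response", raw_response)
```

The same text always yields the same digest. That lets repeated identical responses be correlated across runs without storing them.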
Implementation
Failure Taxonomy
```python
# New: src/ai_psychiatrist/infrastructure/observability.py
from enum import Enum
from dataclasses import dataclass, field
from datetime import datetime, UTC
from pathlib import Path
from typing import Any
import json


class FailureCategory(str, Enum):
    """Top-level failure categories."""

    # Stage 1: Transcript
    TRANSCRIPT_NOT_FOUND = "transcript_not_found"
    TRANSCRIPT_PARSE_ERROR = "transcript_parse_error"
    TRANSCRIPT_EMPTY = "transcript_empty"

    # Stage 2: Evidence Extraction
    EVIDENCE_JSON_PARSE = "evidence_json_parse"
    EVIDENCE_SCHEMA_INVALID = "evidence_schema_invalid"
    EVIDENCE_HALLUCINATION = "evidence_hallucination"
    EVIDENCE_LLM_TIMEOUT = "evidence_llm_timeout"

    # Stage 3: Embeddings
    EMBEDDING_NAN = "embedding_nan"
    EMBEDDING_DIMENSION_MISMATCH = "embedding_dimension_mismatch"
    EMBEDDING_ZERO_VECTOR = "embedding_zero_vector"
    EMBEDDING_TIMEOUT = "embedding_timeout"

    # Stage 4: Reference Store
    REFERENCE_ARTIFACT_MISSING = "reference_artifact_missing"
    REFERENCE_ARTIFACT_CORRUPT = "reference_artifact_corrupt"
    REFERENCE_TAG_MISMATCH = "reference_tag_mismatch"

    # Stage 5: Scoring
    SCORING_JSON_PARSE = "scoring_json_parse"
    SCORING_SCHEMA_INVALID = "scoring_schema_invalid"
    SCORING_LLM_TIMEOUT = "scoring_llm_timeout"
    SCORING_PYDANTIC_RETRY_EXHAUSTED = "scoring_pydantic_retry_exhausted"

    # Stage 6: Aggregation
    AGGREGATION_MISSING_ITEMS = "aggregation_missing_items"

    # Stage 7: Evaluation
    GROUND_TRUTH_MISSING = "ground_truth_missing"
    GROUND_TRUTH_INVALID = "ground_truth_invalid"

    # Other
    UNKNOWN = "unknown"


class FailureSeverity(str, Enum):
    """Failure severity levels."""

    FATAL = "fatal"  # Participant cannot be processed
    ERROR = "error"  # Significant issue, partial results possible
    WARNING = "warning"  # Minor issue, results may be degraded
    INFO = "info"  # Informational, no impact on results


@dataclass
class Failure:
    """Single failure event."""

    category: FailureCategory
    severity: FailureSeverity
    message: str
    participant_id: int | None = None
    phq8_item: str | None = None  # e.g., "PHQ8_Sleep"
    stage: str | None = None  # e.g., "evidence_extraction"
    timestamp: str = field(default_factory=lambda: datetime.now(UTC).isoformat())
    context: dict[str, Any] = field(default_factory=dict)
    """Additional privacy-safe context (never raw transcript/LLM text)."""

    def to_dict(self) -> dict[str, Any]:
        return {
            "category": self.category.value,
            "severity": self.severity.value,
            "message": self.message,
            "participant_id": self.participant_id,
            "phq8_item": self.phq8_item,
            "stage": self.stage,
            "timestamp": self.timestamp,
            "context": self.context,
        }
```
Failure Registry
```python
# Continue in observability.py
@dataclass
class FailureRegistry:
    """Collects and persists failure events for a run."""

    run_id: str
    failures: list[Failure] = field(default_factory=list)
    _start_time: str = field(default_factory=lambda: datetime.now(UTC).isoformat())

    def record(
        self,
        category: FailureCategory,
        severity: FailureSeverity,
        message: str,
        *,
        participant_id: int | None = None,
        phq8_item: str | None = None,
        stage: str | None = None,
        **context: Any,
    ) -> None:
        """Record a failure event."""
        failure = Failure(
            category=category,
            severity=severity,
            message=message,
            participant_id=participant_id,
            phq8_item=phq8_item,
            stage=stage,
            context=context,
        )
        self.failures.append(failure)

        # Also log for immediate visibility
        from ai_psychiatrist.infrastructure.logging import get_logger

        logger = get_logger("failure_registry")
        log_method = {
            FailureSeverity.FATAL: logger.error,
            FailureSeverity.ERROR: logger.error,
            FailureSeverity.WARNING: logger.warning,
            FailureSeverity.INFO: logger.info,
        }.get(severity, logger.warning)
        log_method(
            f"failure_{category.value}",
            message=message,
            participant_id=participant_id,
            phq8_item=phq8_item,
            stage=stage,
            **context,
        )

    def summary(self) -> dict[str, Any]:
        """Generate summary statistics."""
        by_category: dict[str, int] = {}
        by_severity: dict[str, int] = {}
        by_participant: dict[int, int] = {}
        by_stage: dict[str, int] = {}

        for f in self.failures:
            by_category[f.category.value] = by_category.get(f.category.value, 0) + 1
            by_severity[f.severity.value] = by_severity.get(f.severity.value, 0) + 1
            if f.participant_id is not None:
                by_participant[f.participant_id] = by_participant.get(f.participant_id, 0) + 1
            if f.stage:
                by_stage[f.stage] = by_stage.get(f.stage, 0) + 1

        return {
            "run_id": self.run_id,
            "start_time": self._start_time,
            "end_time": datetime.now(UTC).isoformat(),
            "total_failures": len(self.failures),
            "by_category": dict(sorted(by_category.items(), key=lambda x: -x[1])),
            "by_severity": by_severity,
            # Top 10 participants by failure count
            "by_participant": dict(sorted(by_participant.items(), key=lambda x: -x[1])[:10]),
            "by_stage": by_stage,
            "fatal_count": by_severity.get("fatal", 0),
            "error_count": by_severity.get("error", 0),
        }

    def save(self, output_dir: Path) -> Path:
        """Save failures to a JSON file."""
        output_dir.mkdir(parents=True, exist_ok=True)
        output_path = output_dir / f"failures_{self.run_id}.json"
        data = {
            "summary": self.summary(),
            "failures": [f.to_dict() for f in self.failures],
        }
        # Note: json.dumps converts int dict keys (participant ids) to strings
        output_path.write_text(json.dumps(data, indent=2), encoding="utf-8")
        return output_path

    def print_summary(self) -> None:
        """Print human-readable summary to stdout."""
        summary = self.summary()
        print("\n" + "=" * 60)
        print("FAILURE SUMMARY")
        print("=" * 60)
        print(f"Run ID: {summary['run_id']}")
        print(f"Total failures: {summary['total_failures']}")
        print(f"  Fatal: {summary['fatal_count']}")
        print(f"  Error: {summary['error_count']}")

        if summary["by_category"]:
            print("\nBy Category:")
            for cat, count in summary["by_category"].items():
                print(f"  {cat}: {count}")

        if summary["by_stage"]:
            print("\nBy Stage:")
            for stage, count in summary["by_stage"].items():
                print(f"  {stage}: {count}")

        if summary["by_participant"]:
            print("\nMost Failing Participants:")
            for pid, count in list(summary["by_participant"].items())[:5]:
                print(f"  Participant {pid}: {count} failures")

        print("=" * 60 + "\n")


# Global registry instance (created per run)
_current_registry: FailureRegistry | None = None


def get_failure_registry() -> FailureRegistry:
    """Get the current failure registry."""
    if _current_registry is None:
        raise RuntimeError("Failure registry not initialized. Call init_failure_registry() first.")
    return _current_registry


def init_failure_registry(run_id: str) -> FailureRegistry:
    """Initialize a new failure registry for a run."""
    global _current_registry
    _current_registry = FailureRegistry(run_id=run_id)
    return _current_registry


def record_failure(
    category: FailureCategory,
    severity: FailureSeverity,
    message: str,
    **kwargs: Any,
) -> None:
    """Convenience function to record a failure to the global registry."""
    try:
        registry = get_failure_registry()
    except RuntimeError:
        # Registry not initialized: log directly instead of dropping the event
        from ai_psychiatrist.infrastructure.logging import get_logger

        logger = get_logger("failure_registry")
        logger.warning(
            "failure_registry_not_initialized",
            category=category.value,
            message=message,
            **kwargs,
        )
        return
    # Outside the try, so a RuntimeError raised while recording is not
    # misreported as "registry not initialized".
    registry.record(category, severity, message, **kwargs)
```
Integration Points
1) Per-participant failure capture (required)
The most robust, lowest-coupling integration point is the existing per-participant exception handler in scripts/reproduce_results.py:evaluate_participant(). Recording failures there captures every participant failure, even one that originates deep in the call stack.
```python
# scripts/reproduce_results.py
from ai_psychiatrist.infrastructure.observability import (
    record_failure,
    FailureCategory,
    FailureSeverity,
)


def classify_failure(exc: Exception) -> tuple[FailureCategory, FailureSeverity, dict[str, object]]:
    """Minimal, privacy-safe classification by exception type name."""
    name = type(exc).__name__
    if name == "UnexpectedModelBehavior":
        return (FailureCategory.SCORING_PYDANTIC_RETRY_EXHAUSTED, FailureSeverity.FATAL, {})
    if name in {"EmbeddingDimensionMismatchError", "EmbeddingArtifactMismatchError"}:
        return (FailureCategory.EMBEDDING_DIMENSION_MISMATCH, FailureSeverity.FATAL, {})
    if name == "EmbeddingValidationError":
        return (FailureCategory.EMBEDDING_NAN, FailureSeverity.FATAL, {})
    return (FailureCategory.UNKNOWN, FailureSeverity.ERROR, {"exception_type": name})


# Fragment of evaluate_participant(): participant_id, mode, and duration
# come from the enclosing function.
try:
    ...  # existing per-participant evaluation
except Exception as e:
    category, severity, ctx = classify_failure(e)
    record_failure(
        category,
        severity,
        str(e),
        participant_id=participant_id,
        stage="evaluate_participant",
        **ctx,
    )
    return EvaluationResult(
        participant_id=participant_id,
        mode=mode,
        duration_seconds=duration,
        success=False,
        error=str(e),
    )
```
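classify_failure looks only at the outermost exception. If a lower layer wraps errors (for example, raise SomeError(...) from exc), a recognizable cause would fall through to UNKNOWN. One possible refinement, sketched below, walks the __cause__/__context__ chain and uses a lookup table. The table mirrors the exception names above, but it uses plain strings in place of the FailureCategory/FailureSeverity enums so the block runs without the package. The names classify_chain and _RULES are hypothetical, and so is the stand-in exception class in the usage.

```python
# Exception class name -> (category value, severity value); mirrors classify_failure().
_RULES: dict[str, tuple[str, str]] = {
    "UnexpectedModelBehavior": ("scoring_pydantic_retry_exhausted", "fatal"),
    "EmbeddingDimensionMismatchError": ("embedding_dimension_mismatch", "fatal"),
    "EmbeddingArtifactMismatchError": ("embedding_dimension_mismatch", "fatal"),
    "EmbeddingValidationError": ("embedding_nan", "fatal"),
}


def classify_chain(exc: BaseException) -> tuple[str, str, dict[str, object]]:
    """Classify by the first recognized exception along the cause/context chain."""
    seen: set[int] = set()
    current: BaseException | None = exc
    while current is not None and id(current) not in seen:
        seen.add(id(current))  # guard against cyclic chains
        rule = _RULES.get(type(current).__name__)
        if rule is not None:
            category, severity = rule
            # Report the outermost type so the wrapping stays visible in the output.
            return category, severity, {"exception_type": type(exc).__name__}
        # Prefer the explicit cause (raise ... from ...) over the implicit context.
        current = current.__cause__ or current.__context__
    return "unknown", "error", {"exception_type": type(exc).__name__}


# Usage: a stand-in for the project's exception, wrapped by a generic error.
class EmbeddingValidationError(Exception):
    pass


try:
    try:
        raise EmbeddingValidationError("NaN in embedding")
    except EmbeddingValidationError as inner:
        raise RuntimeError("participant evaluation failed") from inner
except RuntimeError as outer:
    result = classify_chain(outer)
```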
2) Evidence extraction parse failures (optional, adds hash/length)
For extra observability into deterministic JSON failures, also record them at the source in src/ai_psychiatrist/agents/quantitative.py:_extract_evidence(), where the sanitized JSON string is available. Record only:
- response_hash / response_len (never the raw output)
- exception_type
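At the parse site, one detail matters for the privacy constraint: json.JSONDecodeError keeps the entire input string in its doc attribute. Its msg attribute holds only the bare reason. The sketch below builds the message from exc.msg and reduces the raw response to a hash and a length. The helper name evidence_parse_failure and the truncated SHA-256 are assumptions. The returned (message, context) pair could then be passed to record_failure(FailureCategory.EVIDENCE_JSON_PARSE, ...).

```python
import hashlib
import json


def evidence_parse_failure(raw: str, exc: json.JSONDecodeError) -> tuple[str, dict[str, object]]:
    """Build a privacy-safe (message, context) pair for a failed evidence parse."""
    # exc.msg is the bare reason (e.g. "Expecting ',' delimiter"); exc.doc is the
    # raw LLM output and must never be copied into the message or context.
    message = f"Evidence JSON parse failed: {exc.msg}"
    context: dict[str, object] = {
        "response_hash": hashlib.sha256(raw.encode("utf-8")).hexdigest()[:12],
        "response_len": len(raw),
        "exception_type": type(exc).__name__,
    }
    return message, context


raw = '{"PHQ8_Sleep": ["quote"] "PHQ8_Tired": []}'  # made-up, malformed: missing comma
try:
    json.loads(raw)
except json.JSONDecodeError as e:
    message, context = evidence_parse_failure(raw, e)
```

str(exc) would also be free of raw text, since it appends only line/column positions. exc.msg is used here because it yields the shorter message shown in the JSON example.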
3) Run initialization + persistence (required)
```python
# scripts/reproduce_results.py (inside main_async; RunMetadata is already captured)
from pathlib import Path

from ai_psychiatrist.infrastructure.observability import init_failure_registry

# Initialize at start of run (SSOT run id)
failure_registry = init_failure_registry(run_metadata.run_id)

# ... run evaluation ...

# At end of run
failure_registry.print_summary()
failure_path = failure_registry.save(Path("data/outputs"))
print(f"Failures saved to: {failure_path}")
```
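Because print_summary() and save() run only at the end of the run, a crash partway through would lose the failures collected so far. One way to guard against that is to await the evaluation inside try/finally, as sketched below. The run_with_failure_report wrapper and the Reportable protocol are hypothetical. The fake registry in the usage exists only so the sketch runs without the package.

```python
import asyncio
from collections.abc import Awaitable, Callable
from pathlib import Path
from typing import Protocol


class Reportable(Protocol):
    """The two FailureRegistry methods this wrapper relies on."""

    def print_summary(self) -> None: ...

    def save(self, output_dir: Path) -> Path: ...


async def run_with_failure_report(
    registry: Reportable,
    evaluate: Callable[[], Awaitable[None]],
    output_dir: Path,
) -> None:
    """Await `evaluate()`, then always print and persist the failure summary."""
    try:
        await evaluate()
    finally:
        # Runs on success, on errors, and on task cancellation, so the failures
        # collected so far are written before any exception propagates.
        registry.print_summary()
        failure_path = registry.save(output_dir)
        print(f"Failures saved to: {failure_path}")


# Usage with a fake registry so the sketch runs without the package.
class FakeRegistry:
    def __init__(self) -> None:
        self.saved_to: Path | None = None

    def print_summary(self) -> None:
        print("FAILURE SUMMARY")

    def save(self, output_dir: Path) -> Path:
        self.saved_to = output_dir / "failures_demo.json"  # nothing is written
        return self.saved_to


class SimulatedCrash(Exception):
    pass


async def crashing_evaluation() -> None:
    raise SimulatedCrash("mid-run failure")


fake = FakeRegistry()
try:
    asyncio.run(run_with_failure_report(fake, crashing_evaluation, Path("data/outputs")))
except SimulatedCrash:
    pass  # the crash still propagates, but only after the report was saved
```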
Output Format
Summary (printed to console)
```text
============================================================
FAILURE SUMMARY
============================================================
Run ID: 19b42478
Total failures: 12
  Fatal: 3
  Error: 7

By Category:
  evidence_hallucination: 7
  evidence_json_parse: 2
  embedding_nan: 2
  scoring_pydantic_retry_exhausted: 1

By Stage:
  evidence_extraction: 9
  scoring: 1
  embedding_generation: 2

Most Failing Participants:
  Participant 373: 3 failures
  Participant 444: 2 failures
  Participant 318: 1 failures
============================================================
```
Full JSON (saved to file; the failures list is abbreviated to one entry)
```json
{
  "summary": {
    "run_id": "19b42478",
    "start_time": "2026-01-03T14:30:22.123456+00:00",
    "end_time": "2026-01-03T16:45:33.789012+00:00",
    "total_failures": 12,
    "by_category": {
      "evidence_hallucination": 7,
      "evidence_json_parse": 2,
      "embedding_nan": 2,
      "scoring_pydantic_retry_exhausted": 1
    },
    "by_severity": {
      "fatal": 3,
      "error": 7,
      "warning": 2
    },
    "by_participant": {
      "373": 3,
      "444": 2,
      "318": 1
    },
    "by_stage": {
      "evidence_extraction": 9,
      "scoring": 1,
      "embedding_generation": 2
    },
    "fatal_count": 3,
    "error_count": 7
  },
  "failures": [
    {
      "category": "evidence_json_parse",
      "severity": "fatal",
      "message": "Evidence JSON parse failed: Expecting ',' delimiter",
      "participant_id": 373,
      "phq8_item": null,
      "stage": "evidence_extraction",
      "timestamp": "2026-01-03T14:35:12.456789+00:00",
      "context": {
        "response_hash": "4f1c2b6a19d0",
        "response_len": 1289,
        "exception_type": "JSONDecodeError"
      }
    }
  ]
}
```
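When this file is read back for cross-run analysis, keep in mind that JSON object keys are always strings. json.dumps turns the integer participant ids in by_participant into "373", "444", and so on, which is why the example above shows quoted ids. The minimal loader sketched below restores integer keys. The name load_failure_summary is hypothetical, and the sketch assumes the file layout shown above.

```python
import json
import tempfile
from pathlib import Path
from typing import Any


def load_failure_summary(path: Path) -> dict[str, Any]:
    """Load the "summary" block of a failures_{run_id}.json file."""
    data = json.loads(path.read_text(encoding="utf-8"))
    summary = data["summary"]
    # JSON object keys are strings, so participant ids come back as "373", etc.
    summary["by_participant"] = {int(pid): n for pid, n in summary["by_participant"].items()}
    return summary


# Round trip: int keys are stringified on save and restored on load.
with tempfile.TemporaryDirectory() as tmp:
    path = Path(tmp) / "failures_demo.json"
    path.write_text(json.dumps({"summary": {"by_participant": {373: 3, 444: 2}}}), encoding="utf-8")
    saved_text = path.read_text(encoding="utf-8")
    loaded = load_failure_summary(path)
```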
Testing
```python
# tests/unit/infrastructure/test_observability.py
import json

from ai_psychiatrist.infrastructure.observability import (
    FailureRegistry,
    FailureCategory,
    FailureSeverity,
    get_failure_registry,
    init_failure_registry,
    record_failure,
)


def test_record_failure():
    registry = FailureRegistry(run_id="test_run")
    registry.record(
        FailureCategory.EVIDENCE_JSON_PARSE,
        FailureSeverity.FATAL,
        "Test failure",
        participant_id=300,
        stage="evidence_extraction",
    )
    assert len(registry.failures) == 1
    assert registry.failures[0].category == FailureCategory.EVIDENCE_JSON_PARSE


def test_summary_aggregation():
    registry = FailureRegistry(run_id="test")
    # Add multiple failures
    for i in range(3):
        registry.record(
            FailureCategory.EVIDENCE_JSON_PARSE,
            FailureSeverity.FATAL,
            f"Failure {i}",
            participant_id=300,
        )
    registry.record(
        FailureCategory.EMBEDDING_NAN,
        FailureSeverity.ERROR,
        "NaN failure",
        participant_id=301,
    )
    summary = registry.summary()
    assert summary["total_failures"] == 4
    assert summary["by_category"]["evidence_json_parse"] == 3
    assert summary["by_category"]["embedding_nan"] == 1
    # In memory, participant ids are int keys (they become strings only in saved JSON)
    assert summary["by_participant"][300] == 3


def test_save_and_load(tmp_path):
    registry = FailureRegistry(run_id="test")
    registry.record(
        FailureCategory.SCORING_LLM_TIMEOUT,
        FailureSeverity.FATAL,
        "Timeout",
    )
    output_path = registry.save(tmp_path)
    assert output_path.exists()
    data = json.loads(output_path.read_text(encoding="utf-8"))
    assert data["summary"]["total_failures"] == 1
    assert len(data["failures"]) == 1


def test_global_registry():
    # init_failure_registry() replaces any previous global registry, so it starts empty
    init_failure_registry("global_test")
    record_failure(
        FailureCategory.GROUND_TRUTH_MISSING,
        FailureSeverity.FATAL,
        "Missing ground truth",
        participant_id=999,
    )
    registry = get_failure_registry()
    assert len(registry.failures) == 1
```
Rollout Plan
- Phase 1: Implement FailureRegistry and taxonomy
- Phase 2: Integrate with evidence extraction failures
- Phase 3: Integrate with embedding failures
- Phase 4: Integrate with scoring failures
- Phase 5: Add to reproduction script with console summary
Success Criteria
- All failure types have a defined category
- Every fatal/error failure is recorded
- Summary printed at end of each run
- JSON file persisted for cross-run analysis
- No performance regression (registry is append-only)
Future Enhancements
- Dashboard: Web UI to visualize failure trends
- Alerting: Notify when failure rate exceeds threshold
- Cross-run comparison: Compare failure rates across runs
- Root cause analysis: Auto-detect correlated failures