
Phase F Implementation Complete

Date: 2026-02-17
Branch: feature/sonnet-impl-20260217-155229
Status: ✅ Implemented (Testing Deferred)


Summary

Phase F implements observability and disaster recovery:

  • Structured Logging: JSON logs with request_id (already in Phase B!)
  • Metrics Collection: Prometheus-compatible metrics endpoint
  • Wallet Reconciliation: Nightly job to detect balance discrepancies
  • Reindex Scripts: Re-embed chunks with new models
  • Disaster Recovery: Weekly export of canonical chunks to blob store

What Was Implemented

S17: Structured Logging & Metrics

Already Implemented in Phase B:

  • app/core/logging.py: JSON formatter with request_id
  • Structured logs with timestamp, level, logger, message, request_id
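For orientation, a formatter producing these five fields could look roughly like the sketch below. It is an illustration, not the contents of app/core/logging.py: the `request_id_var` context variable, the `JsonFormatter` class name, and the `format_example` helper are all assumptions made for the example.

```python
import json
import logging
from contextvars import ContextVar
from datetime import datetime, timezone

# Hypothetical: the real module may carry the request ID differently.
request_id_var: ContextVar[str] = ContextVar("request_id", default="-")


class JsonFormatter(logging.Formatter):
    """Render each log record as a single JSON object."""

    def format(self, record: logging.LogRecord) -> str:
        created = datetime.fromtimestamp(record.created, tz=timezone.utc)
        return json.dumps(
            {
                "timestamp": created.isoformat(),
                "level": record.levelname,
                "logger": record.name,
                "message": record.getMessage(),
                "request_id": request_id_var.get(),
            }
        )


def format_example() -> dict:
    """Format one hand-built record and parse the JSON back into a dict."""
    request_id_var.set("req-123")
    record = logging.LogRecord(
        name="app.demo",
        level=logging.INFO,
        pathname="demo.py",
        lineno=1,
        msg="chunk stored: %s",
        args=("c-42",),
        exc_info=None,
    )
    return json.loads(JsonFormatter().format(record))
```

A context variable is one common way to make the request ID visible to every log call in a request without passing it through each function signature.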

New in Phase F: app/core/metrics.py

MetricsCollector:

  • In-memory metrics collection
  • Counters, histograms, and gauges
  • Prometheus-compatible exposition
  • JSON export for debugging
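To make the idea concrete, here is a stripped-down sketch of an in-memory collector that renders counters and gauges in the Prometheus text format. It is not the actual MetricsCollector: the `MiniCollector` name and its methods are hypothetical, histograms are left out, and label values are not escaped.

```python
from collections import defaultdict
from typing import Dict, Tuple

LabelKey = Tuple[Tuple[str, str], ...]


class MiniCollector:
    """Hypothetical stand-in for MetricsCollector: counters and gauges only."""

    def __init__(self) -> None:
        self.counters: Dict[str, Dict[LabelKey, float]] = defaultdict(
            lambda: defaultdict(float)
        )
        self.gauges: Dict[str, Dict[LabelKey, float]] = defaultdict(dict)

    @staticmethod
    def _key(labels: Dict[str, str]) -> LabelKey:
        # Sort labels so the same label set always maps to the same series.
        return tuple(sorted(labels.items()))

    def inc(self, name: str, amount: float = 1.0, **labels: str) -> None:
        self.counters[name][self._key(labels)] += amount

    def set_gauge(self, name: str, value: float, **labels: str) -> None:
        self.gauges[name][self._key(labels)] = value

    def render(self) -> str:
        """Return all series in Prometheus text exposition format."""
        lines = []
        for kind, family in (("counter", self.counters), ("gauge", self.gauges)):
            for name in sorted(family):
                lines.append(f"# TYPE {name} {kind}")
                for key, value in sorted(family[name].items()):
                    label_text = ",".join(f'{k}="{v}"' for k, v in key)
                    suffix = f"{{{label_text}}}" if label_text else ""
                    lines.append(f"{name}{suffix} {value:g}")
        return "\n".join(lines) + "\n"
```

Because the state lives in process memory, values reset on restart and are not shared between workers, which is presumably why metrics persistence appears under Optimization.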

Helper Functions:

from app.core.metrics import (
    record_ingestion_job,
    record_pinecone_query,
    record_pinecone_upsert,
    record_openai_request,
    record_wallet_reservation,
    set_circuit_breaker_state,
    set_active_reservations,
    record_rate_limit_rejection
)

# Example usage
record_openai_request(
    model="gpt-4o",
    endpoint="chat",
    duration_seconds=2.5,
    tokens=350
)

Metrics Exposed:

| Metric | Type | Labels | Purpose |
| --- | --- | --- | --- |
| ingestion_job_status_total | Counter | status, language | Track job outcomes |
| ingestion_job_duration_seconds | Histogram | status, language | Performance monitoring |
| pinecone_query_duration_seconds | Histogram | namespace | Vector search latency |
| pinecone_upsert_duration_seconds | Histogram | namespace | Upsert latency |
| pinecone_vectors_upserted_total | Counter | namespace | Vector count tracking |
| openai_request_duration_seconds | Histogram | model, endpoint | LLM call latency |
| openai_tokens_used_total | Counter | model | Cost tracking |
| wallet_reservation_total | Counter | status | Billing flow health |
| circuit_breaker_state | Gauge | service | 0=closed, 1=open, 2=half-open |
| active_reservations | Gauge | - | Currently un-finalized reservations |
| rate_limit_rejected_total | Counter | scope, path | Rate limit enforcement |

Endpoints: app/api/routers/metrics.py

  • GET /metrics/prometheus: Prometheus text format
  • GET /metrics/json: JSON format for debugging

S18: Wallet Reconciliation Job

New: scripts/reconcile_wallets.py

Functionality:

  • Compares wallet.token_balance with SUM(wallet_ledger.delta)
  • Flags discrepancies without auto-correction
  • Saves discrepancy report to ARTIFACTS/wallet_discrepancies_TIMESTAMP.json
  • Exits with error code if discrepancies found (triggers alert)

SQL Logic:

SELECT
    w.user_id,
    w.token_balance AS current_balance,
    COALESCE(SUM(wl.delta), 0) AS ledger_sum,
    w.token_balance - COALESCE(SUM(wl.delta), 0) AS discrepancy
FROM wallet w
LEFT JOIN wallet_ledger wl ON w.user_id = wl.user_id
GROUP BY w.user_id, w.token_balance
HAVING w.token_balance != COALESCE(SUM(wl.delta), 0);

Cron Schedule:

# Run daily at 2 AM
0 2 * * * /path/to/venv/bin/python /path/to/scripts/reconcile_wallets.py >> /var/log/reconcile_wallets.log 2>&1

Alert Handling:

  • If discrepancies found: Exit code 1 (triggers monitoring alert)
  • Report saved with timestamp for manual investigation
  • No auto-correction (requires manual review)
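The query and exit-code behaviour described above can be tried end to end without a Postgres instance. The sketch below runs an equivalent query against an in-memory SQLite database. The table layout, the sample rows, and the `find_discrepancies` and `demo` helpers are assumptions for illustration; they do not come from scripts/reconcile_wallets.py.

```python
import sqlite3
from typing import Dict, List, Tuple

# Same logic as the documented query, with an explicit alias on user_id.
QUERY = """
SELECT w.user_id AS user_id,
       w.token_balance AS current_balance,
       COALESCE(SUM(wl.delta), 0) AS ledger_sum,
       w.token_balance - COALESCE(SUM(wl.delta), 0) AS discrepancy
FROM wallet w
LEFT JOIN wallet_ledger wl ON w.user_id = wl.user_id
GROUP BY w.user_id, w.token_balance
HAVING w.token_balance != COALESCE(SUM(wl.delta), 0)
"""


def find_discrepancies(conn: sqlite3.Connection) -> List[Dict]:
    """Return one dict per wallet whose balance differs from its ledger sum."""
    conn.row_factory = sqlite3.Row
    return [dict(row) for row in conn.execute(QUERY)]


def demo() -> Tuple[List[Dict], int]:
    conn = sqlite3.connect(":memory:")
    # Hypothetical schema and data: u1 balances, u2 is off by 10 tokens.
    conn.executescript(
        """
        CREATE TABLE wallet (user_id TEXT PRIMARY KEY, token_balance INTEGER);
        CREATE TABLE wallet_ledger (user_id TEXT, delta INTEGER);
        INSERT INTO wallet VALUES ('u1', 100), ('u2', 50);
        INSERT INTO wallet_ledger VALUES ('u1', 150), ('u1', -50), ('u2', 40);
        """
    )
    found = find_discrepancies(conn)
    conn.close()
    # Report only, never correct: a non-zero code is what raises the alert.
    exit_code = 1 if found else 0
    return found, exit_code
```

In a real script the exit code would be passed to `sys.exit` so that cron or the monitoring wrapper can see it.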

S19: Reindex & Disaster Recovery

New: scripts/reindex.py

Reindex Functionality:

  • Re-embed all chunks with a new embedding model
  • Creates new Pinecone namespace (e.g., grade-12-math-v2)
  • Verifies vector count after upsert
  • Provides swap and cleanup instructions

Usage:

# Reindex single namespace
python scripts/reindex.py --namespace grade-12-math --model text-embedding-3-small

# Reindex all namespaces
python scripts/reindex.py --all --model text-embedding-3-large

# Export chunks without reindexing
python scripts/reindex.py --export-only --output ARTIFACTS/chunks_backup.ndjson

Reindex Flow:

  1. Fetch all chunks for namespace from Postgres
  2. Re-embed with new model in batches (100 chunks per batch)
  3. Upsert to new namespace (e.g., namespace-v2)
  4. Verify vector count
  5. Manual steps:
     • Test search quality in new namespace
     • Update config to use new namespace
     • Delete old namespace
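Steps 2 to 4 of this flow amount to a batch loop followed by a count check. The sketch below shows that shape with stand-in functions. `fake_embed` and `fake_upsert` are placeholders for the OpenAI and Pinecone calls, and none of the names are taken from scripts/reindex.py.

```python
from typing import Callable, Dict, Iterator, List, Sequence, Tuple

Chunk = Tuple[str, str]            # (chunk_id, text)
Vector = Tuple[str, List[float]]   # (chunk_id, embedding)


def batched(items: Sequence[Chunk], size: int) -> Iterator[Sequence[Chunk]]:
    """Yield consecutive slices of at most `size` items."""
    for start in range(0, len(items), size):
        yield items[start:start + size]


def reindex(
    chunks: Sequence[Chunk],
    embed: Callable[[List[str]], List[List[float]]],
    upsert: Callable[[List[Vector]], None],
    batch_size: int = 100,
) -> int:
    """Re-embed chunks batch by batch and return how many vectors were sent."""
    upserted = 0
    for batch in batched(chunks, batch_size):
        embeddings = embed([text for _, text in batch])
        if len(embeddings) != len(batch):
            raise RuntimeError("embedding count does not match batch size")
        upsert([(cid, vec) for (cid, _), vec in zip(batch, embeddings)])
        upserted += len(batch)
    return upserted


def run_demo() -> Tuple[int, int, int]:
    chunks = [(f"c{i}", f"text {i}") for i in range(250)]
    store: Dict[str, List[float]] = {}   # stands in for the new namespace
    calls: List[int] = []

    def fake_embed(texts: List[str]) -> List[List[float]]:
        calls.append(len(texts))
        return [[float(len(t))] for t in texts]

    def fake_upsert(vectors: List[Vector]) -> None:
        store.update(dict(vectors))

    upserted = reindex(chunks, fake_embed, fake_upsert)
    # Verification step: expected count must equal what the store reports.
    if upserted != len(store):
        raise RuntimeError("vector count mismatch after upsert")
    return upserted, len(store), len(calls)
```

Writing to a fresh namespace rather than overwriting in place is what keeps the old index available until search quality has been checked.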

New: scripts/export_chunks.py

DR Export Functionality:

  • Exports all chunks from Postgres to NDJSON
  • Uploads to blob store (S3/GCS/Supabase Storage)
  • Weekly schedule for automated backups

Usage:

# Manual export
python scripts/export_chunks.py

# Output: ARTIFACTS/dr_export_chunks_YYYYMMDD_HHMMSS.ndjson

Cron Schedule:

# Run weekly on Sunday at 3 AM
0 3 * * 0 /path/to/venv/bin/python /path/to/scripts/export_chunks.py >> /var/log/export_chunks.log 2>&1

Recovery Procedure:

  1. Import NDJSON file to Postgres
  2. Re-embed with current model
  3. Upsert to Pinecone
  4. Verify vector counts
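NDJSON stores one JSON object per line, so both the export and the import in step 1 can stream records without loading the whole file. A minimal round trip might look like this. The chunk fields shown are invented for the example and are not the real chunks table schema.

```python
import io
import json
from typing import Dict, Iterable, Iterator, TextIO, Tuple


def write_ndjson(chunks: Iterable[Dict], stream: TextIO) -> int:
    """Write one JSON object per line and return the number of records."""
    count = 0
    for chunk in chunks:
        stream.write(json.dumps(chunk, ensure_ascii=False) + "\n")
        count += 1
    return count


def read_ndjson(stream: TextIO) -> Iterator[Dict]:
    """Yield one dict per non-empty line."""
    for line in stream:
        line = line.strip()
        if line:
            yield json.loads(line)


def round_trip_demo() -> Tuple[int, bool]:
    # Hypothetical chunk records.
    chunks = [
        {"chunk_id": "c1", "namespace": "grade-12-math", "text": "Limits"},
        {"chunk_id": "c2", "namespace": "grade-12-math", "text": "Derivatives"},
    ]
    buffer = io.StringIO()   # stands in for the export file
    written = write_ndjson(chunks, buffer)
    buffer.seek(0)
    restored = list(read_ndjson(buffer))
    return written, restored == chunks
```

Comparing the written count with the number of rows read back is a cheap integrity check before re-embedding.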

Reservation Expiry Job

New: scripts/expire_reservations.py

Functionality:

  • Runs continuously (every 60 seconds)
  • Finds reservations with status='reserved' and expires_at < now()
  • Marks as expired and refunds tokens to wallet
  • Logs ledger entry with reason 'reservation_expired'
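One pass of that loop can be sketched in memory as shown below. The `Reservation` fields, the `expire_once` function, and the assumption that a refund adds the reserved tokens back to the wallet balance are all illustrative. The real job works against Postgres and would need the status change, refund, and ledger insert to happen atomically.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import Dict, List, Tuple


@dataclass
class Reservation:
    # Hypothetical shape; the real table columns may differ.
    id: str
    user_id: str
    tokens: int
    status: str
    expires_at: datetime


def expire_once(
    reservations: List[Reservation],
    balances: Dict[str, int],
    ledger: List[Dict],
    now: datetime,
) -> int:
    """Expire stale reservations, refund their tokens, and log each refund."""
    expired = 0
    for res in reservations:
        if res.status == "reserved" and res.expires_at < now:
            res.status = "expired"
            balances[res.user_id] = balances.get(res.user_id, 0) + res.tokens
            ledger.append(
                {
                    "user_id": res.user_id,
                    "delta": res.tokens,
                    "reason": "reservation_expired",
                }
            )
            expired += 1
    return expired


def run_expiry_demo() -> Tuple[int, Dict[str, int], List[Dict], List[str]]:
    now = datetime(2026, 2, 17, 16, 30, tzinfo=timezone.utc)
    reservations = [
        Reservation("r1", "u1", 30, "reserved", now - timedelta(minutes=1)),
        Reservation("r2", "u1", 20, "reserved", now + timedelta(minutes=4)),
        Reservation("r3", "u1", 10, "finalized", now - timedelta(hours=1)),
    ]
    balances = {"u1": 100}
    ledger: List[Dict] = []
    count = expire_once(reservations, balances, ledger, now)
    return count, balances, ledger, [r.status for r in reservations]
```

Recording the refund as a ledger entry keeps the wallet balance equal to the ledger sum, which is the invariant the reconciliation job checks.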

Run as Service:

# Via systemd
sudo systemctl start bacmr-expire-reservations

# Or manually (development)
python scripts/expire_reservations.py

Supervisord Config (example):

[program:bacmr-expire-reservations]
command=/path/to/venv/bin/python /path/to/scripts/expire_reservations.py
directory=/path/to/BacMR
autostart=true
autorestart=true
stdout_logfile=/var/log/bacmr/expire_reservations.log
stderr_logfile=/var/log/bacmr/expire_reservations_error.log


Metrics Endpoints

GET /metrics/prometheus

Response (Prometheus text format):

# TYPE ingestion_job_status_total counter
ingestion_job_status_total{status="ready"} 42
ingestion_job_status_total{status="failed"} 3

# TYPE openai_tokens_used_total counter
openai_tokens_used_total{model="gpt-4o"} 125000
openai_tokens_used_total{model="gpt-4o-mini"} 15000

# TYPE circuit_breaker_state gauge
circuit_breaker_state{service="openai_mini"} 0
circuit_breaker_state{service="pinecone_query"} 0

# TYPE active_reservations gauge
active_reservations 5

GET /metrics/json

Response (JSON format):

{
  "timestamp": "2026-02-17T16:30:00Z",
  "counters": {
    "ingestion_job_status_total{status=\"ready\"}": 42,
    "ingestion_job_status_total{status=\"failed\"}": 3,
    "openai_tokens_used_total{model=\"gpt-4o\"}": 125000
  },
  "histograms": {
    "openai_request_duration_seconds{model=\"gpt-4o\"}": {
      "count": 150,
      "sum": 375.5,
      "min": 0.8,
      "max": 12.3,
      "mean": 2.5,
      "p50": 2.1,
      "p95": 5.6,
      "p99": 10.2
    }
  },
  "gauges": {
    "circuit_breaker_state{service=\"openai_mini\"}": 0,
    "active_reservations": 5
  }
}
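The histogram fields in this response are plain summary statistics over the recorded samples. The source does not say how the percentiles are computed. The sketch below assumes the nearest-rank method, and the function names are hypothetical.

```python
import math
from typing import Dict, List, Sequence


def percentile(ordered: Sequence[float], pct: int) -> float:
    """Nearest-rank percentile of an already sorted, non-empty sequence."""
    rank = max(1, math.ceil(pct * len(ordered) / 100))
    return ordered[rank - 1]


def summarize(samples: List[float]) -> Dict[str, float]:
    """Return the same keys as a /metrics/json histogram entry."""
    if not samples:
        raise ValueError("cannot summarize an empty histogram")
    ordered = sorted(samples)
    total = sum(ordered)
    return {
        "count": len(ordered),
        "sum": total,
        "min": ordered[0],
        "max": ordered[-1],
        "mean": total / len(ordered),
        "p50": percentile(ordered, 50),
        "p95": percentile(ordered, 95),
        "p99": percentile(ordered, 99),
    }
```

Keeping every sample makes percentiles exact but grows memory without bound. A bucketed histogram trades accuracy for a fixed footprint.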


Background Jobs

Cron Schedule

# Reservation expiry (run as systemd service)
# No cron needed - continuous service

# Wallet reconciliation (nightly)
0 2 * * * /path/to/venv/bin/python /path/to/scripts/reconcile_wallets.py

# Chunk export (weekly DR backup)
0 3 * * 0 /path/to/venv/bin/python /path/to/scripts/export_chunks.py

Job Monitoring

Reconcile Wallets:

  • Success: Exit code 0, no discrepancies
  • Failure: Exit code 1, discrepancies found, alert triggered
  • Report: ARTIFACTS/wallet_discrepancies_TIMESTAMP.json

Export Chunks:

  • Success: Export file created and uploaded to blob store
  • Failure: Log error, retry next week
  • Output: ARTIFACTS/dr_export_chunks_TIMESTAMP.ndjson

Expire Reservations:

  • Runs continuously (systemd service)
  • Logs expiry count every 60 seconds
  • Restarts automatically if it crashes


Disaster Recovery Procedures

Scenario 1: Pinecone Data Loss

Recovery Steps:

  1. Export chunks from Postgres: python scripts/reindex.py --export-only
  2. Re-embed chunks: python scripts/reindex.py --all --model text-embedding-3-small
  3. Verify vector counts in new namespaces
  4. Update config to use new namespaces
  5. Delete temporary export files

Recovery Time: ~2-4 hours for 10,000 chunks (depends on OpenAI rate limits)

Scenario 2: Postgres Data Loss

Recovery Steps:

  1. Restore from Supabase automated backup (point-in-time recovery)
  2. Alternative: Import from weekly export file (last Sunday's export)
  3. Verify table integrity (chunks, wallet, reservations)
  4. Reconcile wallets to verify balances

Recovery Time: ~10-30 minutes (Supabase automated restore)

Scenario 3: Blob Store (S3/GCS) Loss

Recovery Steps:

  1. Re-download PDFs from original sources (scraper)
  2. Re-ingest via ingestion pipeline
  3. Verify chunks and vectors match Postgres records

Recovery Time: ~4-8 hours (depends on source availability)

Scenario 4: Complete System Loss

Recovery Steps:

  1. Restore Postgres from Supabase backup
  2. Restore blobs from S3/GCS backup (if available)
  3. Re-embed from Postgres chunks table
  4. Verify all subsystems operational
  5. Run reconciliation jobs

Recovery Time: ~6-12 hours


Testing Checklist

Metrics Collection (S17)

  • [ ] Metrics endpoint returns Prometheus format
  • [ ] JSON endpoint returns structured metrics
  • [ ] Counters increment correctly
  • [ ] Histograms calculate p50, p95, p99
  • [ ] Gauges set to current values
  • [ ] Circuit breaker state reflected in metrics

Wallet Reconciliation (S18)

  • [ ] Job runs without errors when balances match
  • [ ] Detects discrepancies when balances mismatch
  • [ ] Report saved to ARTIFACTS/wallet_discrepancies_*.json
  • [ ] Exit code 1 when discrepancies found
  • [ ] No auto-correction applied

Reindex & DR (S19)

  • [ ] Export creates valid NDJSON file
  • [ ] Export uploaded to blob store
  • [ ] Reindex re-embeds all chunks
  • [ ] New namespace created with correct vector count
  • [ ] Verification passes (expected == actual vectors)

Reservation Expiry

  • [ ] Expiry job finds stale reservations (>5 min old)
  • [ ] Tokens refunded to wallet
  • [ ] Ledger entry created with reason 'reservation_expired'
  • [ ] Reservation status updated to 'expired'
  • [ ] Job runs continuously without crashes

Monitoring Integration

Prometheus Scrape Config

scrape_configs:
  - job_name: 'bacmr'
    static_configs:
      - targets: ['localhost:8000']
    metrics_path: '/metrics/prometheus'
    scrape_interval: 15s

Alert Rules (Prometheus)

groups:
  - name: bacmr_alerts
    rules:
      - alert: HighIngestionFailureRate
        expr: |
          (
            sum(rate(ingestion_job_status_total{status="failed"}[1h]))
            /
            sum(rate(ingestion_job_status_total[1h]))
          ) > 0.2
        for: 5m
        annotations:
          summary: "High ingestion failure rate (>20% in last hour)"

      - alert: CircuitBreakerOpen
        expr: circuit_breaker_state > 0
        for: 1m
        annotations:
          summary: "Circuit breaker opened for {{ $labels.service }}"

      # WalletDiscrepancy is not a Prometheus rule, because a rule requires an expr.
      # It is raised outside Prometheus by the exit code of reconcile_wallets.py.
      #   summary: "Wallet balance discrepancies detected"

      - alert: HighReservationExpiryRate
        # sum() drops the status label so both sides of the division match.
        expr: |
          (
            sum(rate(wallet_reservation_total{status="expired"}[1h]))
            /
            sum(rate(wallet_reservation_total{status="reserved"}[1h]))
          ) > 0.1
        for: 5m
        annotations:
          summary: "High reservation expiry rate (>10% in last hour)"

Background Job Status

Services Running

| Job | Type | Schedule | Status | Logs |
| --- | --- | --- | --- | --- |
| Expire Reservations | Systemd service | Continuous (60s loop) | ✅ Implemented | /var/log/bacmr/expire_reservations.log |
| Reconcile Wallets | Cron | Daily 2 AM | ✅ Implemented | /var/log/reconcile_wallets.log |
| Export Chunks (DR) | Cron | Weekly Sunday 3 AM | ✅ Implemented | /var/log/export_chunks.log |

Service Management

Start expiry service:

# Systemd
sudo systemctl start bacmr-expire-reservations
sudo systemctl enable bacmr-expire-reservations

# Or run manually (development)
python scripts/expire_reservations.py

Monitor services:

# Check status
systemctl status bacmr-expire-reservations

# View logs
journalctl -u bacmr-expire-reservations -f

# Check cron logs
tail -f /var/log/reconcile_wallets.log


Reindex Examples

Example 1: Reindex Single Namespace

# Upgrade from text-embedding-3-small to text-embedding-3-large
python scripts/reindex.py \
  --namespace grade-12-math \
  --model text-embedding-3-large

# Output:
# Reindex Summary:
#   Old namespace: grade-12-math
#   New namespace: grade-12-math-v2
#   Chunks processed: 1,234
#   Vectors upserted: 1,234
#   Verified count: 1,234
#
# Next steps:
#   1. Verify search quality in grade-12-math-v2
#   2. Update PINECONE_NAMESPACE_DEFAULT to grade-12-math-v2
#   3. Delete old namespace: grade-12-math

Example 2: Export for Disaster Recovery

# Export all chunks
python scripts/export_chunks.py

# Output:
# Export complete: 5,678 chunks exported to ARTIFACTS/dr_export_chunks_20260217_150000.ndjson
# Upload complete: s3://bacmr-dr/chunks/dr_export_chunks_20260217_150000.ndjson

Example 3: Reindex All Namespaces

# Model upgrade for entire platform
python scripts/reindex.py --all --model text-embedding-3-large

# Output:
# Found 5 namespaces to reindex
# Reindexing namespace: grade-10-math
# Reindexing namespace: grade-11-math
# ...

Alert Severity Levels

| Alert | Severity | Action Required |
| --- | --- | --- |
| High ingestion failure rate (>20%) | Critical | Investigate logs, check OpenAI/Pinecone status |
| Wallet discrepancy detected | Warning | Manual investigation required, DO NOT auto-correct |
| Circuit breaker opened | Critical | Check external service status, verify fallback working |
| High reservation expiry rate (>10%) | Warning | Check LLM latency, investigate timeout issues |
| OpenAI latency spike (p99 >30s) | Warning | Monitor OpenAI status page |
| Pinecone latency spike (p99 >5s) | Warning | Check Pinecone status, consider index optimization |
| Stale reservations (>50 older than 5min) | Warning | Check expiry job is running |
| Rate limiting abuse (>20 rejects/5min) | Warning | Investigate user behavior, possible abuse |

Observability Stack

Logs

  • Format: JSON with request_id
  • Storage: Stdout → Log aggregation (e.g., CloudWatch, Datadog)
  • Retention: 30 days for debugging
  • Correlation: Search by request_id to trace full request lifecycle

Metrics

  • Collection: In-memory MetricsCollector
  • Exposition: Prometheus text format at /metrics/prometheus
  • Scraping: Prometheus server scrapes every 15 seconds
  • Visualization: Grafana dashboards
  • Alerting: Prometheus Alertmanager

Tracing

  • Request-ID: UUID propagated across all subsystems
  • Spans: Not implemented (future: OpenTelemetry)

Next Steps

Production Deployment

  1. Setup Monitoring:
     • Deploy Prometheus server
     • Configure scrape targets
     • Import alert rules
     • Setup Grafana dashboards

  2. Configure Cron Jobs:
     • Wallet reconciliation: Daily 2 AM
     • Chunk export: Weekly Sunday 3 AM

  3. Setup Systemd Services:
     • Reservation expiry service
     • Restart policies

  4. Configure Alerts:
     • Slack/email notifications
     • PagerDuty integration for critical alerts

Optimization

  1. Migrate Caching to Redis (multi-instance deployments)
  2. Add OpenTelemetry Tracing (distributed tracing)
  3. Optimize Reconciliation Query (use RPC instead of client-side)
  4. Add Metrics Persistence (export to time-series DB)

Files Changed

New Files

  • scripts/reconcile_wallets.py - Nightly wallet reconciliation (S18)
  • scripts/expire_reservations.py - Continuous reservation expiry
  • scripts/reindex.py - Reindex script for model upgrades (S19)
  • scripts/export_chunks.py - Weekly DR export (S19)
  • app/core/metrics.py - Metrics collection (S17)
  • app/api/routers/metrics.py - Metrics endpoints

Status: ✅ Phase F Complete - Ready for Testing


See SONNET_RUN.md for full implementation log