Phase F Implementation Complete
Date: 2026-02-17 Branch: feature/sonnet-impl-20260217-155229 Status: ✅ Implemented (Testing Deferred)
Summary
Phase F implements observability and disaster recovery:
- Structured Logging: JSON logs with request_id (already in Phase B!)
- Metrics Collection: Prometheus-compatible metrics endpoint
- Wallet Reconciliation: Nightly job to detect balance discrepancies
- Reindex Scripts: Re-embed chunks with new models
- Disaster Recovery: Weekly export of canonical chunks to blob store
What Was Implemented
S17: Structured Logging & Metrics
Already Implemented in Phase B:
- app/core/logging.py: JSON formatter with request_id
- Structured logs with timestamp, level, logger, message, request_id
New in Phase F: app/core/metrics.py
MetricsCollector:
- In-memory metrics collection
- Counters, histograms, and gauges
- Prometheus-compatible exposition
- JSON export for debugging
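To make the collector's shape concrete, here is a minimal sketch of an in-memory collector with counters, gauges, histograms, and a basic Prometheus text exposition. Class and method names are illustrative assumptions, not the actual contents of app/core/metrics.py.

```python
# Hypothetical sketch of an in-memory metrics collector; the real
# app/core/metrics.py may differ in names and structure.
import threading
from collections import defaultdict


class MetricsCollector:
    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._counters: dict[str, float] = defaultdict(float)
        self._gauges: dict[str, float] = {}
        self._histograms: dict[str, list[float]] = defaultdict(list)

    def _key(self, name: str, labels: dict[str, str] | None) -> str:
        # Build a Prometheus-style series key: name{label="value",...}
        if not labels:
            return name
        label_str = ",".join(f'{k}="{v}"' for k, v in sorted(labels.items()))
        return f"{name}{{{label_str}}}"

    def inc_counter(self, name, labels=None, value=1.0):
        with self._lock:
            self._counters[self._key(name, labels)] += value

    def set_gauge(self, name, value, labels=None):
        with self._lock:
            self._gauges[self._key(name, labels)] = value

    def observe(self, name, value, labels=None):
        with self._lock:
            self._histograms[self._key(name, labels)].append(value)

    def to_prometheus(self) -> str:
        # Minimal exposition: one "series value" line per counter/gauge.
        # TYPE/HELP lines and histogram buckets are omitted in this sketch.
        with self._lock:
            lines = [f"{k} {v}" for k, v in self._counters.items()]
            lines += [f"{k} {v}" for k, v in self._gauges.items()]
        return "\n".join(lines) + "\n"
```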
Helper Functions:
from app.core.metrics import (
record_ingestion_job,
record_pinecone_query,
record_pinecone_upsert,
record_openai_request,
record_wallet_reservation,
set_circuit_breaker_state,
set_active_reservations,
record_rate_limit_rejection
)
# Example usage
record_openai_request(
model="gpt-4o",
endpoint="chat",
duration_seconds=2.5,
tokens=350
)
Metrics Exposed:
| Metric | Type | Labels | Purpose |
|---|---|---|---|
| ingestion_job_status_total | Counter | status, language | Track job outcomes |
| ingestion_job_duration_seconds | Histogram | status, language | Performance monitoring |
| pinecone_query_duration_seconds | Histogram | namespace | Vector search latency |
| pinecone_upsert_duration_seconds | Histogram | namespace | Upsert latency |
| pinecone_vectors_upserted_total | Counter | namespace | Vector count tracking |
| openai_request_duration_seconds | Histogram | model, endpoint | LLM call latency |
| openai_tokens_used_total | Counter | model | Cost tracking |
| wallet_reservation_total | Counter | status | Billing flow health |
| circuit_breaker_state | Gauge | service | 0=closed, 1=open, 2=half-open |
| active_reservations | Gauge | - | Currently un-finalized reservations |
| rate_limit_rejected_total | Counter | scope, path | Rate limit enforcement |
Endpoints: app/api/routers/metrics.py
- GET /metrics/prometheus: Prometheus text format
- GET /metrics/json: JSON format for debugging
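As a rough illustration of how these two endpoints could be wired up, here is a minimal FastAPI router sketch. The get_collector() accessor and to_json() method are assumptions and may not match the actual app/api/routers/metrics.py.

```python
# Hypothetical sketch of the metrics router; names are illustrative,
# not the actual contents of app/api/routers/metrics.py.
from fastapi import APIRouter
from fastapi.responses import JSONResponse, PlainTextResponse

from app.core.metrics import get_collector  # assumed accessor for the shared collector

router = APIRouter(prefix="/metrics", tags=["metrics"])


@router.get("/prometheus", response_class=PlainTextResponse)
def prometheus_metrics() -> str:
    # Prometheus text exposition format for scraping.
    return get_collector().to_prometheus()


@router.get("/json")
def json_metrics() -> JSONResponse:
    # Structured JSON snapshot for ad-hoc debugging.
    return JSONResponse(get_collector().to_json())
```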
S18: Wallet Reconciliation Job
New: scripts/reconcile_wallets.py
Functionality:
- Compares wallet.token_balance with SUM(wallet_ledger.delta)
- Flags discrepancies without auto-correction
- Saves discrepancy report to ARTIFACTS/wallet_discrepancies_TIMESTAMP.json
- Exits with error code if discrepancies found (triggers alert)
SQL Logic:
SELECT
w.user_id,
w.token_balance AS current_balance,
COALESCE(SUM(wl.delta), 0) AS ledger_sum,
w.token_balance - COALESCE(SUM(wl.delta), 0) AS discrepancy
FROM wallet w
LEFT JOIN wallet_ledger wl ON w.user_id = wl.user_id
GROUP BY w.user_id, w.token_balance
HAVING w.token_balance != COALESCE(SUM(wl.delta), 0);
Cron Schedule:
# Run daily at 2 AM
0 2 * * * /path/to/venv/bin/python /path/to/scripts/reconcile_wallets.py >> /var/log/reconcile_wallets.log 2>&1
Alert Handling:
- If discrepancies found: Exit code 1 (triggers monitoring alert)
- Report saved with timestamp for manual investigation
- No auto-correction (requires manual review)
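For orientation, a minimal sketch of the reconciliation script's shape, assuming a psycopg connection (DSN from environment) and the SQL above; the actual scripts/reconcile_wallets.py may structure this differently.

```python
# Hypothetical sketch of the reconciliation flow; assumes psycopg and
# the discrepancy query shown above. Not the actual script.
import json
import sys
from datetime import datetime, timezone
from pathlib import Path

import psycopg

DISCREPANCY_SQL = """
SELECT w.user_id,
       w.token_balance AS current_balance,
       COALESCE(SUM(wl.delta), 0) AS ledger_sum
FROM wallet w
LEFT JOIN wallet_ledger wl ON w.user_id = wl.user_id
GROUP BY w.user_id, w.token_balance
HAVING w.token_balance != COALESCE(SUM(wl.delta), 0);
"""


def main() -> int:
    with psycopg.connect() as conn:  # connection settings taken from PG* env vars
        rows = conn.execute(DISCREPANCY_SQL).fetchall()

    if not rows:
        print("No wallet discrepancies found")
        return 0

    # Persist a report for manual review; no auto-correction is applied.
    timestamp = datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%S")
    report_path = Path("ARTIFACTS") / f"wallet_discrepancies_{timestamp}.json"
    report_path.parent.mkdir(exist_ok=True)
    report = [
        {
            "user_id": str(user_id),
            "current_balance": balance,
            "ledger_sum": ledger_sum,
            "discrepancy": balance - ledger_sum,
        }
        for user_id, balance, ledger_sum in rows
    ]
    report_path.write_text(json.dumps(report, indent=2, default=str))
    print(f"{len(rows)} discrepancies written to {report_path}")
    return 1  # non-zero exit triggers the monitoring alert


if __name__ == "__main__":
    sys.exit(main())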
S19: Reindex & Disaster Recovery
New: scripts/reindex.py
Reindex Functionality:
- Re-embed all chunks with a new embedding model
- Creates new Pinecone namespace (e.g., grade-12-math-v2)
- Verifies vector count after upsert
- Provides swap and cleanup instructions
Usage:
# Reindex single namespace
python scripts/reindex.py --namespace grade-12-math --model text-embedding-3-small
# Reindex all namespaces
python scripts/reindex.py --all --model text-embedding-3-large
# Export chunks without reindexing
python scripts/reindex.py --export-only --output ARTIFACTS/chunks_backup.ndjson
Reindex Flow:
1. Fetch all chunks for namespace from Postgres
2. Re-embed with new model in batches (100 chunks per batch)
3. Upsert to new namespace (e.g., namespace-v2)
4. Verify vector count
5. Manual steps:
- Test search quality in new namespace
- Update config to use new namespace
- Delete old namespace
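A rough sketch of steps 1-3 of the flow above (fetch, re-embed in batches, upsert), assuming the official openai and pinecone Python clients; the index name, metadata shape, and function names are illustrative, not the actual scripts/reindex.py.

```python
# Hypothetical sketch of the batched re-embed + upsert loop in the
# reindex flow; assumes the official openai and pinecone clients.
from openai import OpenAI
from pinecone import Pinecone

BATCH_SIZE = 100  # matches the "100 chunks per batch" described above


def reindex_namespace(chunks, old_namespace, model, index_name="bacmr"):
    """chunks: list of (chunk_id, text) tuples fetched from Postgres."""
    openai_client = OpenAI()              # OPENAI_API_KEY from environment
    index = Pinecone().Index(index_name)  # PINECONE_API_KEY from environment
    new_namespace = f"{old_namespace}-v2"

    upserted = 0
    for start in range(0, len(chunks), BATCH_SIZE):
        batch = chunks[start:start + BATCH_SIZE]
        # Re-embed the batch with the new model.
        response = openai_client.embeddings.create(
            model=model,
            input=[text for _, text in batch],
        )
        vectors = [
            {"id": chunk_id, "values": item.embedding, "metadata": {"text": text}}
            for (chunk_id, text), item in zip(batch, response.data)
        ]
        index.upsert(vectors=vectors, namespace=new_namespace)
        upserted += len(vectors)

    # Verify the vector count in the new namespace matches expectations.
    stats = index.describe_index_stats()
    summary = stats.namespaces.get(new_namespace)
    actual = summary.vector_count if summary else 0
    print(f"Upserted {upserted}, Pinecone reports {actual} in {new_namespace}")
```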
New: scripts/export_chunks.py
DR Export Functionality:
- Exports all chunks from Postgres to NDJSON
- Uploads to blob store (S3/GCS/Supabase Storage)
- Weekly schedule for automated backups
Usage:
# Manual export
python scripts/export_chunks.py
# Output: ARTIFACTS/dr_export_chunks_YYYYMMDD_HHMMSS.ndjson
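A minimal sketch of what the NDJSON export could look like, assuming psycopg plus boto3 for an S3 upload; the table/column names and bucket are placeholders, not necessarily what scripts/export_chunks.py uses.

```python
# Hypothetical sketch of a DR export to NDJSON + S3; table, columns,
# and bucket name are placeholders.
import json
from datetime import datetime, timezone
from pathlib import Path

import boto3
import psycopg


def export_chunks(bucket: str = "bacmr-dr") -> Path:
    timestamp = datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%S")
    out_path = Path("ARTIFACTS") / f"dr_export_chunks_{timestamp}.ndjson"
    out_path.parent.mkdir(exist_ok=True)

    count = 0
    with psycopg.connect() as conn, out_path.open("w") as out:
        # Server-side cursor so large tables stream instead of loading in memory.
        with conn.cursor(name="chunk_export") as cur:
            cur.execute("SELECT id, namespace, content, metadata FROM chunks")
            for chunk_id, namespace, content, metadata in cur:
                out.write(json.dumps({
                    "id": str(chunk_id),
                    "namespace": namespace,
                    "content": content,
                    "metadata": metadata,
                }, default=str) + "\n")
                count += 1

    # Upload the export to the blob store for off-site retention.
    boto3.client("s3").upload_file(str(out_path), bucket, f"chunks/{out_path.name}")
    print(f"Export complete: {count} chunks -> {out_path}")
    return out_path
```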
Cron Schedule:
# Run weekly on Sunday at 3 AM
0 3 * * 0 /path/to/venv/bin/python /path/to/scripts/export_chunks.py >> /var/log/export_chunks.log 2>&1
Recovery Procedure:
1. Import NDJSON file to Postgres
2. Re-embed with current model
3. Upsert to Pinecone
4. Verify vector counts
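Step 1 of the recovery procedure could look roughly like the following, assuming psycopg and the same placeholder table/columns as the export sketch above; the actual import path may differ.

```python
# Hypothetical sketch of re-importing the NDJSON export into Postgres.
# Table and column names are placeholders.
import json

import psycopg


def import_chunks(ndjson_path: str) -> int:
    count = 0
    with psycopg.connect() as conn, open(ndjson_path) as src:
        with conn.cursor() as cur:
            for line in src:
                record = json.loads(line)
                # Idempotent insert so re-runs do not duplicate chunks.
                cur.execute(
                    """
                    INSERT INTO chunks (id, namespace, content, metadata)
                    VALUES (%s, %s, %s, %s)
                    ON CONFLICT (id) DO NOTHING
                    """,
                    (record["id"], record["namespace"],
                     record["content"], json.dumps(record["metadata"])),
                )
                count += 1
        conn.commit()
    return count
```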
Reservation Expiry Job
New: scripts/expire_reservations.py
Functionality:
- Runs continuously (every 60 seconds)
- Finds reservations with status='reserved' and expires_at < now()
- Marks as expired and refunds tokens to wallet
- Logs ledger entry with reason 'reservation_expired'
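A minimal sketch of the expiry loop, assuming psycopg and table/column names consistent with the descriptions in this document (reservations.status, reservations.expires_at, wallet_ledger.delta/reason); the actual scripts/expire_reservations.py may differ, and crediting the wallet balance itself is omitted here.

```python
# Hypothetical sketch of the reservation-expiry loop; table and column
# names mirror the description above but are not guaranteed to match.
import time

import psycopg

EXPIRE_SQL = """
WITH expired AS (
    UPDATE reservations
    SET status = 'expired'
    WHERE status = 'reserved' AND expires_at < now()
    RETURNING user_id, reserved_tokens
)
-- Record one refund ledger entry per expired reservation.
INSERT INTO wallet_ledger (user_id, delta, reason)
SELECT user_id, reserved_tokens, 'reservation_expired' FROM expired
RETURNING user_id;
"""


def run_forever(interval_seconds: int = 60) -> None:
    while True:
        with psycopg.connect() as conn:
            rows = conn.execute(EXPIRE_SQL).fetchall()
            # Crediting wallet.token_balance would also happen here (omitted in this sketch).
            conn.commit()
        print(f"Expired {len(rows)} reservations")
        time.sleep(interval_seconds)


if __name__ == "__main__":
    run_forever()
```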
Run as Service:
# Via systemd
sudo systemctl start bacmr-expire-reservations
# Or manually (development)
python scripts/expire_reservations.py
Supervisord Config (example):
[program:bacmr-expire-reservations]
command=/path/to/venv/bin/python /path/to/scripts/expire_reservations.py
directory=/path/to/BacMR
autostart=true
autorestart=true
stdout_logfile=/var/log/bacmr/expire_reservations.log
stderr_logfile=/var/log/bacmr/expire_reservations_error.log
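For the systemd path, a unit file along these lines could be used; the unit name matches the commands shown in this document, but the paths are placeholders.

```ini
# /etc/systemd/system/bacmr-expire-reservations.service (illustrative paths)
[Unit]
Description=BacMR reservation expiry loop
After=network.target

[Service]
Type=simple
WorkingDirectory=/path/to/BacMR
ExecStart=/path/to/venv/bin/python /path/to/scripts/expire_reservations.py
Restart=always
RestartSec=5

[Install]
WantedBy=multi-user.target
```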
Metrics Endpoints
GET /metrics/prometheus
Response (Prometheus text format):
# TYPE ingestion_job_status_total counter
ingestion_job_status_total{status="ready"} 42
ingestion_job_status_total{status="failed"} 3
# TYPE openai_tokens_used_total counter
openai_tokens_used_total{model="gpt-4o"} 125000
openai_tokens_used_total{model="gpt-4o-mini"} 15000
# TYPE circuit_breaker_state gauge
circuit_breaker_state{service="openai_mini"} 0
circuit_breaker_state{service="pinecone_query"} 0
# TYPE active_reservations gauge
active_reservations 5
GET /metrics/json
Response (JSON format):
{
"timestamp": "2026-02-17T16:30:00Z",
"counters": {
"ingestion_job_status_total{status=\"ready\"}": 42,
"ingestion_job_status_total{status=\"failed\"}": 3,
"openai_tokens_used_total{model=\"gpt-4o\"}": 125000
},
"histograms": {
"openai_request_duration_seconds{model=\"gpt-4o\"}": {
"count": 150,
"sum": 375.5,
"min": 0.8,
"max": 12.3,
"mean": 2.5,
"p50": 2.1,
"p95": 5.6,
"p99": 10.2
}
},
"gauges": {
"circuit_breaker_state{service=\"openai_mini\"}": 0,
"active_reservations": 5
}
}
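The p50/p95/p99 fields above could be produced by a simple nearest-rank computation over the recorded observations, along these lines; this is a sketch, not necessarily how app/core/metrics.py computes them.

```python
# Illustrative nearest-rank percentile over a histogram's raw observations.
import math


def percentile(observations: list[float], pct: float) -> float:
    """Nearest-rank percentile: smallest value covering pct% of samples."""
    ordered = sorted(observations)
    rank = max(1, math.ceil(pct / 100 * len(ordered)))
    return ordered[rank - 1]


durations = [0.8, 2.1, 2.5, 3.0, 5.6, 12.3]
print(percentile(durations, 50), percentile(durations, 95), percentile(durations, 99))
```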
Background Jobs
Cron Schedule
# Reservation expiry (run as systemd service)
# No cron needed - continuous service
# Wallet reconciliation (nightly)
0 2 * * * /path/to/venv/bin/python /path/to/scripts/reconcile_wallets.py
# Chunk export (weekly DR backup)
0 3 * * 0 /path/to/venv/bin/python /path/to/scripts/export_chunks.py
Job Monitoring
Reconcile Wallets:
- Success: Exit code 0, no discrepancies
- Failure: Exit code 1, discrepancies found, alert triggered
- Report: ARTIFACTS/wallet_discrepancies_TIMESTAMP.json
Export Chunks:
- Success: Export file created and uploaded to blob store
- Failure: Log error, retry next week
- Output: ARTIFACTS/dr_export_chunks_TIMESTAMP.ndjson
Expire Reservations:
- Runs continuously (systemd service)
- Logs expiry count every 60 seconds
- Restarts automatically if it crashes
Disaster Recovery Procedures
Scenario 1: Pinecone Data Loss
Recovery Steps:
1. Export chunks from Postgres: python scripts/reindex.py --export-only
2. Re-embed chunks: python scripts/reindex.py --all --model text-embedding-3-small
3. Verify vector counts in new namespaces
4. Update config to use new namespaces
5. Delete temporary export files
Recovery Time: ~2-4 hours for 10,000 chunks (depends on OpenAI rate limits)
Scenario 2: Postgres Data Loss
Recovery Steps:
1. Restore from Supabase automated backup (point-in-time recovery)
2. Alternative: Import from weekly export file (last Sunday's export)
3. Verify table integrity (chunks, wallet, reservations)
4. Reconcile wallets to verify balances
Recovery Time: ~10-30 minutes (Supabase automated restore)
Scenario 3: Blob Store (S3/GCS) Loss
Recovery Steps:
1. Re-download PDFs from original sources (scraper)
2. Re-ingest via ingestion pipeline
3. Verify chunks and vectors match Postgres records
Recovery Time: ~4-8 hours (depends on source availability)
Scenario 4: Complete System Loss
Recovery Steps:
1. Restore Postgres from Supabase backup
2. Restore blobs from S3/GCS backup (if available)
3. Re-embed from Postgres chunks table
4. Verify all subsystems operational
5. Run reconciliation jobs
Recovery Time: ~6-12 hours
Testing Checklist
Metrics Collection (S17)
- [ ] Metrics endpoint returns Prometheus format
- [ ] JSON endpoint returns structured metrics
- [ ] Counters increment correctly
- [ ] Histograms calculate p50, p95, p99
- [ ] Gauges set to current values
- [ ] Circuit breaker state reflected in metrics
Wallet Reconciliation (S18)
- [ ] Job runs without errors when balances match
- [ ] Detects discrepancies when balances mismatch
- [ ] Report saved to ARTIFACTS/wallet_discrepancies_*.json
- [ ] Exit code 1 when discrepancies found
- [ ] No auto-correction applied
Reindex & DR (S19)
- [ ] Export creates valid NDJSON file
- [ ] Export uploaded to blob store
- [ ] Reindex re-embeds all chunks
- [ ] New namespace created with correct vector count
- [ ] Verification passes (expected == actual vectors)
Reservation Expiry
- [ ] Expiry job finds stale reservations (>5 min old)
- [ ] Tokens refunded to wallet
- [ ] Ledger entry created with reason 'reservation_expired'
- [ ] Reservation status updated to 'expired'
- [ ] Job runs continuously without crashes
Monitoring Integration
Prometheus Scrape Config
scrape_configs:
  - job_name: 'bacmr'
    static_configs:
      - targets: ['localhost:8000']
    metrics_path: '/metrics/prometheus'
    scrape_interval: 15s
Alert Rules (Prometheus)
groups:
  - name: bacmr_alerts
    rules:
      - alert: HighIngestionFailureRate
        expr: |
          (
            sum(rate(ingestion_job_status_total{status="failed"}[1h]))
            /
            sum(rate(ingestion_job_status_total[1h]))
          ) > 0.2
        for: 5m
        annotations:
          summary: "High ingestion failure rate (>20% in last hour)"
      - alert: CircuitBreakerOpen
        expr: circuit_breaker_state > 0
        for: 1m
        annotations:
          summary: "Circuit breaker opened for {{ $labels.service }}"
      # WalletDiscrepancy has no Prometheus expression; it is raised externally
      # when reconcile_wallets.py exits non-zero (see S18).
      # - alert: WalletDiscrepancy
      #   annotations:
      #     summary: "Wallet balance discrepancies detected"
      - alert: HighReservationExpiryRate
        expr: |
          (
            rate(wallet_reservation_total{status="expired"}[1h])
            /
            rate(wallet_reservation_total{status="reserved"}[1h])
          ) > 0.1
        for: 5m
        annotations:
          summary: "High reservation expiry rate (>10% in last hour)"
Background Job Status
Services Running
| Job | Type | Schedule | Status | Logs |
|---|---|---|---|---|
| Expire Reservations | Systemd service | Continuous (60s loop) | ✅ Implemented | /var/log/bacmr/expire_reservations.log |
| Reconcile Wallets | Cron | Daily 2 AM | ✅ Implemented | /var/log/reconcile_wallets.log |
| Export Chunks (DR) | Cron | Weekly Sunday 3 AM | ✅ Implemented | /var/log/export_chunks.log |
Service Management
Start expiry service:
# Systemd
sudo systemctl start bacmr-expire-reservations
sudo systemctl enable bacmr-expire-reservations
# Or run manually (development)
python scripts/expire_reservations.py
Monitor services:
# Check status
systemctl status bacmr-expire-reservations
# View logs
journalctl -u bacmr-expire-reservations -f
# Check cron logs
tail -f /var/log/reconcile_wallets.log
Reindex Examples
Example 1: Reindex Single Namespace
# Upgrade from text-embedding-3-small to text-embedding-3-large
python scripts/reindex.py \
--namespace grade-12-math \
--model text-embedding-3-large
# Output:
# Reindex Summary:
# Old namespace: grade-12-math
# New namespace: grade-12-math-v2
# Chunks processed: 1,234
# Vectors upserted: 1,234
# Verified count: 1,234
#
# Next steps:
# 1. Verify search quality in grade-12-math-v2
# 2. Update PINECONE_NAMESPACE_DEFAULT to grade-12-math-v2
# 3. Delete old namespace: grade-12-math
Example 2: Export for Disaster Recovery
# Export all chunks
python scripts/export_chunks.py
# Output:
# Export complete: 5,678 chunks exported to ARTIFACTS/dr_export_chunks_20260217_150000.ndjson
# Upload complete: s3://bacmr-dr/chunks/dr_export_chunks_20260217_150000.ndjson
Example 3: Reindex All Namespaces
# Model upgrade for entire platform
python scripts/reindex.py --all --model text-embedding-3-large
# Output:
# Found 5 namespaces to reindex
# Reindexing namespace: grade-10-math
# Reindexing namespace: grade-11-math
# ...
Alert Severity Levels
| Alert | Severity | Action Required |
|---|---|---|
| High ingestion failure rate (>20%) | Critical | Investigate logs, check OpenAI/Pinecone status |
| Wallet discrepancy detected | Warning | Manual investigation required, DO NOT auto-correct |
| Circuit breaker opened | Critical | Check external service status, verify fallback working |
| High reservation expiry rate (>10%) | Warning | Check LLM latency, investigate timeout issues |
| OpenAI latency spike (p99 >30s) | Warning | Monitor OpenAI status page |
| Pinecone latency spike (p99 >5s) | Warning | Check Pinecone status, consider index optimization |
| Stale reservations (>50 older than 5min) | Warning | Check expiry job is running |
| Rate limiting abuse (>20 rejects/5min) | Warning | Investigate user behavior, possible abuse |
Observability Stack
Logs
- Format: JSON with request_id
- Storage: Stdout → Log aggregation (e.g., CloudWatch, Datadog)
- Retention: 30 days for debugging
- Correlation: Search by request_id to trace full request lifecycle
Metrics
- Collection: In-memory MetricsCollector
- Exposition: Prometheus text format at /metrics/prometheus
- Scraping: Prometheus server scrapes every 15 seconds
- Visualization: Grafana dashboards
- Alerting: Prometheus Alertmanager
Tracing
- Request-ID: UUID propagated across all subsystems
- Spans: Not implemented (future: OpenTelemetry)
Next Steps
Production Deployment
- Setup Monitoring:
  - Deploy Prometheus server
  - Configure scrape targets
  - Import alert rules
  - Setup Grafana dashboards
- Configure Cron Jobs:
  - Wallet reconciliation: Daily 2 AM
  - Chunk export: Weekly Sunday 3 AM
- Setup Systemd Services:
  - Reservation expiry service
  - Restart policies
- Configure Alerts:
  - Slack/email notifications
  - PagerDuty integration for critical alerts
Optimization
- Migrate Caching to Redis (multi-instance deployments)
- Add OpenTelemetry Tracing (distributed tracing)
- Optimize Reconciliation Query (use RPC instead of client-side)
- Add Metrics Persistence (export to time-series DB)
Files Changed
New Files
- scripts/reconcile_wallets.py - Nightly wallet reconciliation (S18)
- scripts/expire_reservations.py - Continuous reservation expiry
- scripts/reindex.py - Reindex script for model upgrades (S19)
- scripts/export_chunks.py - Weekly DR export (S19)
- app/core/metrics.py - Metrics collection (S17)
- app/api/routers/metrics.py - Metrics endpoints
Status: ✅ Phase F Complete - Ready for Testing
See SONNET_RUN.md for full implementation log