Skip to main content

hypercall_admin/monitoring/
integrity.rs

1//! System integrity admin endpoints (margin engine + orderbook).
2
3use axum::{extract::State, http::StatusCode, response::IntoResponse};
4use rust_decimal::Decimal;
5use rust_decimal_macros::dec;
6use serde::Serialize;
7use sonic_rs::json;
8use std::collections::HashMap;
9use tracing::warn;
10
11use crate::state::AdminState;
12use hypercall_runtime_api::sonic_json::SonicJson;
13use hypercall_types::perp_underlying;
14
15/// Detailed integrity report for the margin engine.
16#[derive(Debug, Serialize, utoipa::ToSchema)]
17pub struct IntegrityReport {
18    /// ISO 8601 timestamp of when the report was generated
19    pub timestamp: String,
20    /// Overall system health status
21    pub overall_status: IntegrityStatus,
22    /// Individual integrity checks performed
23    pub checks: Vec<IntegrityCheck>,
24    /// Summary of ledger balances
25    pub ledger_summary: LedgerSummary,
26    /// Summary of open positions
27    pub position_summary: PositionSummary,
28    /// Summary of settlements
29    pub settlement_summary: SettlementSummary,
30}
31
32/// System health status level
33#[derive(Debug, Clone, Copy, Serialize, PartialEq, utoipa::ToSchema)]
34#[serde(rename_all = "UPPERCASE")]
35pub enum IntegrityStatus {
36    /// All systems operating normally
37    Healthy,
38    /// Minor issues detected, monitoring recommended
39    Warning,
40    /// Significant issues detected, investigation required
41    Critical,
42}
43
44/// Individual integrity check result
45#[derive(Debug, Serialize, utoipa::ToSchema)]
46pub struct IntegrityCheck {
47    /// Name of the check performed
48    pub name: String,
49    /// Status result of the check
50    pub status: IntegrityStatus,
51    /// Human-readable description of the result
52    pub message: String,
53    /// Additional details (varies by check type)
54    #[schema(value_type = Option<Object>)]
55    pub details: Option<sonic_rs::Value>,
56}
57
58/// Summary of ledger balances across all accounts
59#[derive(Debug, Serialize, utoipa::ToSchema)]
60pub struct LedgerSummary {
61    /// Total DB-projected balance across all accounts (USD), unavailable when engine-owned.
62    pub total_balance: Option<String>,
63    /// Current-balance authority for this report.
64    pub authority_state: String,
65    /// Number of accounts with balances
66    pub account_count: i64,
67    /// Total number of fills processed
68    pub total_fills: i64,
69    /// Total trading volume (USD)
70    pub total_volume: String,
71}
72
73/// Summary of open positions across all accounts
74#[derive(Debug, Serialize, utoipa::ToSchema)]
75pub struct PositionSummary {
76    /// Total open interest across all instruments
77    pub total_open_interest: String,
78    /// Number of unique instruments with positions
79    pub unique_instruments: i64,
80    /// List of symbols with position imbalances (should be empty)
81    pub net_position_imbalances: Vec<PositionImbalance>,
82}
83
84/// Position imbalance details for a symbol
85#[derive(Debug, Clone, Serialize, PartialEq, utoipa::ToSchema)]
86pub struct PositionImbalance {
87    /// Option symbol
88    pub symbol: String,
89    /// Net position (should be zero for balanced book)
90    pub net_position: String,
91    /// Severity of the imbalance
92    pub severity: IntegrityStatus,
93}
94
95#[derive(Debug, Clone, Serialize, PartialEq)]
96struct PerpExposure {
97    symbol: String,
98    net_position: String,
99}
100
101#[derive(Debug, Clone, PartialEq)]
102struct PositionBalanceEvaluation {
103    imbalances: Vec<PositionImbalance>,
104    excluded_perp_exposures: Vec<PerpExposure>,
105    total_imbalance: Decimal,
106    status: IntegrityStatus,
107}
108
109fn position_balance_severity(abs_net: Decimal) -> IntegrityStatus {
110    if abs_net > dec!(0.01) {
111        IntegrityStatus::Critical
112    } else if abs_net > dec!(0.0001) {
113        IntegrityStatus::Warning
114    } else {
115        IntegrityStatus::Healthy
116    }
117}
118
119fn evaluate_position_balance(
120    net_positions_by_symbol: &HashMap<String, Decimal>,
121) -> PositionBalanceEvaluation {
122    let mut imbalances = Vec::new();
123    let mut excluded_perp_exposures = Vec::new();
124    let mut total_imbalance = dec!(0);
125
126    for (symbol, net) in net_positions_by_symbol {
127        if perp_underlying(symbol).is_some() {
128            if *net != Decimal::ZERO {
129                excluded_perp_exposures.push(PerpExposure {
130                    symbol: symbol.clone(),
131                    net_position: net.to_string(),
132                });
133            }
134            continue;
135        }
136
137        let abs_net = net.abs();
138        total_imbalance += abs_net;
139        let severity = position_balance_severity(abs_net);
140
141        if severity != IntegrityStatus::Healthy {
142            imbalances.push(PositionImbalance {
143                symbol: symbol.clone(),
144                net_position: net.to_string(),
145                severity,
146            });
147        }
148    }
149
150    imbalances.sort_by(|a, b| a.symbol.cmp(&b.symbol));
151    excluded_perp_exposures.sort_by(|a, b| a.symbol.cmp(&b.symbol));
152
153    PositionBalanceEvaluation {
154        imbalances,
155        excluded_perp_exposures,
156        total_imbalance,
157        status: position_balance_severity(total_imbalance),
158    }
159}
160
161/// Summary of settlement processing
162#[derive(Debug, Serialize, utoipa::ToSchema)]
163pub struct SettlementSummary {
164    /// Total number of settlements
165    pub total: i64,
166    /// Number of settlements applied
167    pub applied: i64,
168    /// Number of settlements pending
169    pub pending: i64,
170    /// Total value of settlements (USD)
171    pub total_value: String,
172}
173
174/// GET /monitoring/integrity - Comprehensive integrity check
175///
176/// Returns detailed integrity report including:
177/// - Position balance verification (longs should equal shorts)
178/// - Ledger totals
179/// - Settlement status
180/// - Any detected anomalies
181///
182/// **Authentication**: Requires `X-Admin-Key` header (performance protection).
183/// These endpoints execute synchronous database queries.
184#[utoipa::path(
185    get,
186    path = "/monitoring/integrity",
187    responses(
188        (status = 200, description = "Integrity report", body = IntegrityReport),
189        (status = 401, description = "Invalid or missing X-Admin-Key header")
190    ),
191    tag = "Monitoring",
192    security(("admin_key" = []))
193)]
194pub async fn integrity_check(State(app_state): State<AdminState>) -> impl IntoResponse {
195    let timestamp = chrono::Utc::now().to_rfc3339();
196    let mut checks = Vec::new();
197    let mut overall_status = IntegrityStatus::Healthy;
198
199    // Get sync DB handler (early-return if not configured; reads go through async db)
200    let _db_handler = match app_state.sync_db.as_ref() {
201        Some(h) => h.clone(),
202        None => {
203            return (
204                StatusCode::SERVICE_UNAVAILABLE,
205                SonicJson(json!({
206                    "error": "Database handler not available"
207                })),
208            )
209                .into_response();
210        }
211    };
212
213    // Get position data from portfolio cache (not DB - positions table doesn't exist)
214    let portfolios = app_state.portfolio_cache.get_all_portfolios().await;
215
216    // Calculate net positions by symbol from cache
217    let mut net_positions_by_symbol: std::collections::HashMap<String, Decimal> =
218        std::collections::HashMap::new();
219    let mut oi_by_underlying: std::collections::HashMap<String, Decimal> =
220        std::collections::HashMap::new();
221
222    for summary in portfolios.values() {
223        for (symbol, pos) in &summary.positions {
224            // Net position for zero-sum check
225            *net_positions_by_symbol.entry(symbol.clone()).or_default() += pos.amount;
226
227            // Open interest by underlying (sum of absolute positions)
228            if let Some(underlying) = symbol.split('-').next() {
229                *oi_by_underlying.entry(underlying.to_string()).or_default() += pos.amount.abs();
230            }
231        }
232    }
233
234    // Run integrity checks via async persistence pool.
235    let integrity: &dyn hypercall_db::IntegrityReader = app_state.db.as_ref();
236    let db_results = integrity.get_integrity_query_results().await;
237
238    let engine_digest = app_state.engine_state_digest_provider.engine_state_digest();
239    let fill_volume_result = db_results.fill_volume;
240    let settlement_result = db_results.settlement_stats;
241
242    // ===== Check 0: Database Connectivity =====
243    // Track DB query failures - don't silently zero them out (fail loudly principle)
244    let mut db_errors: Vec<String> = Vec::new();
245
246    if let Err(e) = &fill_volume_result {
247        db_errors.push(format!("fill_volume: {}", e));
248    }
249    if let Err(e) = &settlement_result {
250        db_errors.push(format!("settlement_stats: {}", e));
251    }
252
253    if !db_errors.is_empty() {
254        overall_status = IntegrityStatus::Critical;
255        checks.push(IntegrityCheck {
256            name: "Database Connectivity".to_string(),
257            status: IntegrityStatus::Critical,
258            message: format!("{} database queries failed", db_errors.len()),
259            details: Some(json!({ "failed_queries": db_errors })),
260        });
261        warn!("INTEGRITY CHECK: Database queries failed: {:?}", db_errors);
262    }
263
264    // ===== Check 1: Position Balance (Zero-Sum) =====
265    //
266    // HyperCore perp positions are external venue exposure imported into the
267    // portfolio cache for margin. They are not expected to net to zero inside
268    // Hypercall, so the matching-engine zero-sum invariant only applies to
269    // option symbols.
270    let position_balance = evaluate_position_balance(&net_positions_by_symbol);
271    let position_check_status = position_balance.status;
272
273    if position_check_status == IntegrityStatus::Critical {
274        overall_status = IntegrityStatus::Critical;
275    } else if position_check_status == IntegrityStatus::Warning
276        && overall_status != IntegrityStatus::Critical
277    {
278        overall_status = IntegrityStatus::Warning;
279    }
280
281    checks.push(IntegrityCheck {
282        name: "Position Balance (Zero-Sum)".to_string(),
283        status: position_check_status,
284        message: if position_balance.imbalances.is_empty() {
285            if position_balance.excluded_perp_exposures.is_empty() {
286                "All option positions are balanced (sum of longs equals sum of shorts)".to_string()
287            } else {
288                format!(
289                    "All option positions are balanced; {} external perp exposure symbol(s) excluded from zero-sum check",
290                    position_balance.excluded_perp_exposures.len()
291                )
292            }
293        } else {
294            format!(
295                "{} option symbol(s) have position imbalance totaling {}",
296                position_balance.imbalances.len(),
297                position_balance.total_imbalance
298            )
299        },
300        details: if position_balance.imbalances.is_empty()
301            && position_balance.excluded_perp_exposures.is_empty()
302        {
303            None
304        } else if position_balance.excluded_perp_exposures.is_empty() {
305            Some(sonic_rs::to_value(&position_balance.imbalances).unwrap_or_default())
306        } else if position_balance.imbalances.is_empty() {
307            Some(json!({
308                "excluded_perp_exposures": position_balance.excluded_perp_exposures.clone(),
309            }))
310        } else {
311            Some(json!({
312                "imbalances": position_balance.imbalances.clone(),
313                "excluded_perp_exposures": position_balance.excluded_perp_exposures.clone(),
314            }))
315        },
316    });
317
318    // ===== Check 2: Settlement Status =====
319    let settlement_check_status = match &settlement_result {
320        Ok((_, _, pending, _)) => {
321            if *pending > 10 {
322                IntegrityStatus::Critical
323            } else if *pending > 0 {
324                IntegrityStatus::Warning
325            } else {
326                IntegrityStatus::Healthy
327            }
328        }
329        Err(_) => IntegrityStatus::Critical, // Already reported in DB check
330    };
331
332    let (settlement_total, settlement_applied, settlement_pending, settlement_value) =
333        settlement_result
334            .as_ref()
335            .map(|(t, a, p, v)| (*t, *a, *p, *v))
336            .unwrap_or((0, 0, 0, dec!(0)));
337
338    if settlement_check_status == IntegrityStatus::Critical && settlement_result.is_ok() {
339        overall_status = IntegrityStatus::Critical;
340    } else if settlement_check_status == IntegrityStatus::Warning
341        && overall_status != IntegrityStatus::Critical
342    {
343        overall_status = IntegrityStatus::Warning;
344    }
345
346    checks.push(IntegrityCheck {
347        name: "Settlement Processing".to_string(),
348        status: settlement_check_status,
349        message: if settlement_result.is_err() {
350            "Unable to check - database query failed".to_string()
351        } else {
352            format!(
353                "{} total settlements, {} applied, {} pending",
354                settlement_total, settlement_applied, settlement_pending
355            )
356        },
357        details: None,
358    });
359
360    // ===== Check 4: Settlement Balance Authority =====
361    checks.push(IntegrityCheck {
362        name: "Settlement Balance Authority".to_string(),
363        status: IntegrityStatus::Healthy,
364        message:
365            "Settlement cash authority is EngineSnapshot.balance_ledger; DB settlement rows are audit evidence"
366                .to_string(),
367        details: None,
368    });
369
370    // ===== Check 3: Engine Balance Ledger =====
371    // Saturate rather than panic: this is a read-only monitoring endpoint, and a
372    // wallet count above i64::MAX is not physically reachable, so a clamp keeps
373    // the endpoint serving instead of crashing the server.
374    let account_count = i64::try_from(engine_digest.cash_wallet_count).unwrap_or(i64::MAX);
375    let (fill_count, fill_volume) = fill_volume_result
376        .as_ref()
377        .map(|(c, v)| (*c, *v))
378        .unwrap_or((0, dec!(0)));
379
380    checks.push(IntegrityCheck {
381        name: "Engine Balance Ledger".to_string(),
382        status: IntegrityStatus::Healthy,
383        message: format!(
384            "Engine balance ledger has {} nonzero wallet(s); DB current-balance projection is write-only/downstream",
385            account_count
386        ),
387        details: Some(json!({
388            "cash_digest": engine_digest.cash_digest
389        })),
390    });
391
392    // ===== Check 5: WAL Unreplicated Tail Size =====
393    {
394        use hypercall_journal::checkpoint::{checkpoint_path_for, read_checkpoint};
395
396        let wal_path = app_state.runtime_config.wal_path.clone();
397        let checkpoint_path = checkpoint_path_for(&wal_path);
398
399        // Use async metadata so a slow filesystem (NFS, overloaded disk) cannot
400        // stall the Tokio runtime. read_checkpoint reads a tiny fixed-size header,
401        // so it stays synchronous.
402        let wal_metadata = tokio::fs::metadata(&wal_path).await;
403        let wal_check = match (wal_metadata, read_checkpoint(&checkpoint_path)) {
404            (Ok(meta), Ok(checkpoint)) => {
405                let file_size = meta.len();
406                let unreplicated = file_size.saturating_sub(checkpoint.wal_offset);
407                let unreplicated_mb = unreplicated as f64 / (1024.0 * 1024.0);
408
409                const WARNING_THRESHOLD: u64 = 100 * 1024 * 1024; // 100 MB
410                const CRITICAL_THRESHOLD: u64 = 1024 * 1024 * 1024; // 1 GB
411
412                let status = if unreplicated > CRITICAL_THRESHOLD {
413                    IntegrityStatus::Critical
414                } else if unreplicated > WARNING_THRESHOLD {
415                    IntegrityStatus::Warning
416                } else {
417                    IntegrityStatus::Healthy
418                };
419
420                if status == IntegrityStatus::Critical {
421                    overall_status = IntegrityStatus::Critical;
422                } else if status == IntegrityStatus::Warning
423                    && overall_status != IntegrityStatus::Critical
424                {
425                    overall_status = IntegrityStatus::Warning;
426                }
427
428                IntegrityCheck {
429                    name: "WAL Unreplicated Tail".to_string(),
430                    status,
431                    message: format!(
432                        "Unreplicated tail: {:.1} MB (file: {} bytes, checkpoint: {} bytes)",
433                        unreplicated_mb, file_size, checkpoint.wal_offset
434                    ),
435                    details: Some(json!({
436                        "wal_file_bytes": file_size,
437                        "checkpoint_offset_bytes": checkpoint.wal_offset,
438                        "unreplicated_bytes": unreplicated,
439                        "unreplicated_mb": format!("{:.1}", unreplicated_mb),
440                    })),
441                }
442            }
443            (Err(e), _) if e.kind() == std::io::ErrorKind::NotFound => {
444                // WAL file doesn't exist - journaling is disabled
445                IntegrityCheck {
446                    name: "WAL Unreplicated Tail".to_string(),
447                    status: IntegrityStatus::Healthy,
448                    message: "WAL file not present (journaling may be disabled)".to_string(),
449                    details: None,
450                }
451            }
452            (Err(e), _) => {
453                // Permission error, broken mount, etc. - flag it
454                if overall_status != IntegrityStatus::Critical {
455                    overall_status = IntegrityStatus::Warning;
456                }
457                IntegrityCheck {
458                    name: "WAL Unreplicated Tail".to_string(),
459                    status: IntegrityStatus::Warning,
460                    message: format!("Unable to stat WAL file: {}", e),
461                    details: None,
462                }
463            }
464            (Ok(_), Err(e)) => {
465                if overall_status != IntegrityStatus::Critical {
466                    overall_status = IntegrityStatus::Warning;
467                }
468                IntegrityCheck {
469                    name: "WAL Unreplicated Tail".to_string(),
470                    status: IntegrityStatus::Warning,
471                    message: format!("Unable to read checkpoint: {}", e),
472                    details: None,
473                }
474            }
475        };
476        checks.push(wal_check);
477    }
478
479    // ===== Calculate Open Interest =====
480    let total_oi: Decimal = oi_by_underlying.values().copied().sum();
481
482    let unique_instruments = net_positions_by_symbol.len() as i64;
483
484    // Build response
485    let report = IntegrityReport {
486        timestamp,
487        overall_status,
488        checks,
489        ledger_summary: LedgerSummary {
490            total_balance: None,
491            authority_state: "engine_owned_not_db_aggregated".to_string(),
492            account_count,
493            total_fills: fill_count,
494            total_volume: format!("{:.2}", fill_volume),
495        },
496        position_summary: PositionSummary {
497            total_open_interest: format!("{:.4}", total_oi),
498            unique_instruments,
499            net_position_imbalances: position_balance.imbalances,
500        },
501        settlement_summary: SettlementSummary {
502            total: settlement_total,
503            applied: settlement_applied,
504            pending: settlement_pending,
505            total_value: format!("{:.2}", settlement_value),
506        },
507    };
508
509    (StatusCode::OK, SonicJson(report)).into_response()
510}
511
512/// Crossed orderbook details for debugging
513#[derive(Debug, Serialize, utoipa::ToSchema)]
514pub struct CrossedOrderbook {
515    /// Symbol (e.g., "BTC-20260131-100000-C")
516    pub symbol: String,
517    /// Best bid price
518    pub best_bid: f64,
519    /// Best ask price
520    pub best_ask: f64,
521    /// Spread (ask - bid, negative if crossed)
522    pub spread: f64,
523    /// Age of last update in seconds
524    pub age_seconds: i64,
525    /// Last L2 update sequence number
526    pub last_l2_seq: Option<i64>,
527    /// Number of bid price levels
528    pub bid_levels: usize,
529    /// Number of ask price levels
530    pub ask_levels: usize,
531    /// Top 10 bid levels (price, size)
532    pub top_bids: Vec<(f64, f64)>,
533    /// Top 10 ask levels (price, size)
534    pub top_asks: Vec<(f64, f64)>,
535}
536
537/// Orderbook integrity report
538#[derive(Debug, Serialize, utoipa::ToSchema)]
539pub struct OrderbookIntegrityReport {
540    /// Timestamp of the report
541    pub timestamp: String,
542    /// Total number of orderbooks in cache
543    pub total_orderbooks: usize,
544    /// Number of crossed orderbooks (bid >= ask)
545    pub crossed_count: usize,
546    /// Number of stale orderbooks (>60s since last update)
547    pub stale_count: usize,
548    /// Details of each crossed orderbook
549    pub crossed_orderbooks: Vec<CrossedOrderbook>,
550}
551
552/// GET /monitoring/orderbooks - List all orderbooks with crossed state detection
553///
554/// **Authentication**: Requires `X-Admin-Key` header (performance protection).
555#[utoipa::path(
556    get,
557    path = "/monitoring/orderbooks",
558    responses(
559        (status = 200, description = "Orderbook integrity report", body = OrderbookIntegrityReport),
560        (status = 401, description = "Invalid or missing X-Admin-Key header")
561    ),
562    tag = "Monitoring",
563    security(("admin_key" = []))
564)]
565pub async fn orderbook_integrity(State(app_state): State<AdminState>) -> impl IntoResponse {
566    let quotes = app_state.quote_provider.all_quotes();
567    let staleness = app_state.quote_provider.staleness();
568    let l2_seq = app_state.quote_provider.l2_seq();
569    let now = chrono::Utc::now();
570
571    let mut crossed_orderbooks = Vec::new();
572    let age_seconds = staleness.as_secs() as i64;
573
574    for (symbol, quote) in &quotes {
575        if let (Some(bid), Some(ask)) = (quote.best_bid, quote.best_ask) {
576            if bid > 0.0 && ask > 0.0 && bid >= ask {
577                crossed_orderbooks.push(CrossedOrderbook {
578                    symbol: symbol.clone(),
579                    best_bid: bid,
580                    best_ask: ask,
581                    spread: ask - bid,
582                    age_seconds,
583                    last_l2_seq: Some(l2_seq),
584                    bid_levels: quote.bids.len(),
585                    ask_levels: quote.asks.len(),
586                    top_bids: quote.bids.iter().take(10).cloned().collect(),
587                    top_asks: quote.asks.iter().take(10).cloned().collect(),
588                });
589            }
590        }
591    }
592
593    // Sort by spread (most negative/crossed first)
594    crossed_orderbooks.sort_by(|a, b| {
595        a.spread
596            .partial_cmp(&b.spread)
597            .unwrap_or(std::cmp::Ordering::Equal)
598    });
599
600    // Stale if no snapshot update in 30+ seconds
601    let stale_count = if age_seconds > 30 { quotes.len() } else { 0 };
602
603    let report = OrderbookIntegrityReport {
604        timestamp: now.to_rfc3339(),
605        total_orderbooks: quotes.len(),
606        crossed_count: crossed_orderbooks.len(),
607        stale_count,
608        crossed_orderbooks,
609    };
610
611    (StatusCode::OK, SonicJson(report)).into_response()
612}
613
#[cfg(test)]
mod tests {
    use super::*;

    const PERP: &str = "BTC-PERP";
    const OPTION: &str = "ETH-20260501-2000-C";

    /// Builds the symbol -> net position map that `evaluate_position_balance` consumes.
    fn net_positions(entries: &[(&str, Decimal)]) -> HashMap<String, Decimal> {
        entries
            .iter()
            .map(|&(symbol, net)| (symbol.to_owned(), net))
            .collect()
    }

    #[test]
    fn position_balance_excludes_external_perp_exposure() {
        // A perp exposure next to a perfectly balanced option must not trip the check.
        let evaluation =
            evaluate_position_balance(&net_positions(&[(PERP, dec!(-0.031)), (OPTION, dec!(0))]));

        let expected_perps = vec![PerpExposure {
            symbol: PERP.to_string(),
            net_position: "-0.031".to_string(),
        }];

        assert_eq!(evaluation.status, IntegrityStatus::Healthy);
        assert!(evaluation.imbalances.is_empty());
        assert_eq!(evaluation.total_imbalance, dec!(0));
        assert_eq!(evaluation.excluded_perp_exposures, expected_perps);
    }

    #[test]
    fn position_balance_flags_option_imbalance_even_with_perp_exposure() {
        // The option imbalance alone decides the status; the perp is only listed.
        let evaluation = evaluate_position_balance(&net_positions(&[
            (PERP, dec!(-0.031)),
            (OPTION, dec!(0.02)),
        ]));

        let expected_imbalances = vec![PositionImbalance {
            symbol: OPTION.to_string(),
            net_position: "0.02".to_string(),
            severity: IntegrityStatus::Critical,
        }];

        assert_eq!(evaluation.status, IntegrityStatus::Critical);
        assert_eq!(evaluation.total_imbalance, dec!(0.02));
        assert_eq!(evaluation.imbalances, expected_imbalances);
        assert_eq!(evaluation.excluded_perp_exposures.len(), 1);
    }

    #[test]
    fn position_balance_preserves_warning_threshold_for_small_option_imbalance() {
        // 0.001 sits between the warning (0.0001) and critical (0.01) thresholds.
        let evaluation = evaluate_position_balance(&net_positions(&[(OPTION, dec!(0.001))]));

        assert_eq!(evaluation.status, IntegrityStatus::Warning);
        assert_eq!(evaluation.imbalances[0].severity, IntegrityStatus::Warning);
    }
}