Skip to main content

hypercall/observability/metrics_collector/
integrity.rs

1use super::*;
2use crate::shared::order_types::perp_underlying;
3use rust_decimal::Decimal;
4
5fn position_imbalance_alert_threshold() -> Decimal {
6    Decimal::new(1, 4)
7}
8
9#[derive(Debug, Default, PartialEq)]
10struct PositionImbalanceMetrics {
11    total_imbalance: Decimal,
12    imbalanced_symbols: i64,
13    excluded_perp_symbols: i64,
14}
15
16fn evaluate_position_imbalance(
17    net_positions_by_symbol: &HashMap<String, Decimal>,
18) -> PositionImbalanceMetrics {
19    let mut metrics = PositionImbalanceMetrics::default();
20
21    for (symbol, net_position) in net_positions_by_symbol {
22        if perp_underlying(symbol).is_some() {
23            if *net_position != Decimal::ZERO {
24                metrics.excluded_perp_symbols += 1;
25            }
26            continue;
27        }
28
29        let abs_net = net_position.abs();
30        metrics.total_imbalance += abs_net;
31        if abs_net > position_imbalance_alert_threshold() {
32            metrics.imbalanced_symbols += 1;
33        }
34    }
35
36    metrics
37}
38
39impl MetricsCollector {
40    // ===== Position Integrity Metrics =====
41    // Positions live in the in-memory portfolio cache, not the database.
42
43    pub(super) async fn collect_position_metrics(&self) {
44        let portfolios = self.portfolio_cache.get_all_portfolios().await;
45        let mut net_positions_by_symbol: HashMap<String, Decimal> = HashMap::new();
46
47        for portfolio in portfolios.values() {
48            for position in portfolio.positions.values() {
49                *net_positions_by_symbol
50                    .entry(position.symbol.clone())
51                    .or_insert(Decimal::ZERO) += position.amount;
52            }
53        }
54
55        let metrics = evaluate_position_imbalance(&net_positions_by_symbol);
56        match metrics.total_imbalance.to_f64() {
57            Some(total_imbalance) => {
58                gauge!("ht_integrity_position_imbalance").set(total_imbalance);
59                gauge!("ht_integrity_position_imbalance_conversion_failures").set(0.0);
60                gauge!("ht_integrity_position_imbalance_complete").set(1.0);
61            }
62            None => {
63                error!(
64                    imbalance = %metrics.total_imbalance,
65                    "Failed to convert position imbalance to f64"
66                );
67                gauge!("ht_integrity_position_imbalance_conversion_failures").set(1.0);
68                gauge!("ht_integrity_position_imbalance_complete").set(0.0);
69            }
70        }
71
72        gauge!("ht_integrity_imbalanced_symbols").set(metrics.imbalanced_symbols as f64);
73        gauge!("ht_integrity_excluded_perp_symbols").set(metrics.excluded_perp_symbols as f64);
74
75        if metrics.imbalanced_symbols > 0 {
76            error!(
77                imbalance = %metrics.total_imbalance,
78                symbols = metrics.imbalanced_symbols,
79                "🚨 POSITION IMBALANCE: matching-engine option positions do not net to zero"
80            );
81        }
82    }
83
84    // ===== Integrity Metrics =====
85    // These metrics help detect margin engine bugs by tracking system-wide invariants
86
87    pub(super) async fn collect_integrity_metrics(&self) {
88        if let Some(ref integrity_db) = self.integrity_db {
89            let results = integrity_db.get_integrity_query_results().await;
90            {
91                let fill_volume_result = results.fill_volume;
92                let settlement_result = results.settlement_stats;
93                let oi_result = results.open_interest_by_underlying;
94
95                // NOTE: Position imbalance metrics (ht_integrity_position_imbalance,
96                // ht_integrity_imbalanced_symbols) are computed from the in-memory
97                // portfolio cache in collect_position_metrics(), not from the DB.
98                // Positions are not persisted to the database.
99
100                // Fill volume stats
101                if let Ok((fill_count, total_notional)) = fill_volume_result {
102                    gauge!("ht_ledger_total_fills").set(fill_count as f64);
103                    if let Some(v) = total_notional.to_f64() {
104                        gauge!("ht_ledger_total_volume_usd").set(v);
105                    } else {
106                        error!(
107                            "Failed to convert total_notional to f64: {}",
108                            total_notional
109                        );
110                    }
111                }
112
113                // Settlement integrity
114                if let Ok((total, applied, pending, total_value)) = settlement_result {
115                    gauge!("ht_settlement_total").set(total as f64);
116                    gauge!("ht_settlement_applied").set(applied as f64);
117                    gauge!("ht_settlement_pending").set(pending as f64);
118                    if let Some(v) = total_value.to_f64() {
119                        gauge!("ht_settlement_total_value_usd").set(v);
120                    } else {
121                        error!(
122                            "Failed to convert settlement total_value to f64: {}",
123                            total_value
124                        );
125                    }
126
127                    // Warn if there are pending settlements that might be stuck
128                    if pending > 0 {
129                        debug!(
130                            "Settlements: {} total, {} applied, {} pending",
131                            total, applied, pending
132                        );
133                    }
134                }
135
136                // Orphaned orders on settled instruments (CALL-553 canary)
137                if let Ok(count) = results.orphaned_settlement_orders {
138                    gauge!("ht_orphaned_settlement_orders").set(count as f64);
139                }
140
141                // Settlement events sum from ledger_events (drift detection)
142                if let Ok(total) = results.ledger_events_settlement_total {
143                    if let Some(v) = total.to_f64() {
144                        gauge!("ht_ledger_events_settlement_sum").set(v);
145                    } else {
146                        error!(
147                            "Failed to convert ledger_events_settlement_total to f64: {}",
148                            total
149                        );
150                    }
151                }
152
153                // Open interest by underlying
154                if let Ok(oi_list) = oi_result {
155                    for (underlying, oi) in oi_list {
156                        if let Some(v) = oi.to_f64() {
157                            gauge!("ht_open_interest_by_underlying", "underlying" => underlying.clone())
158                                    .set(v);
159                        } else {
160                            error!("Failed to convert OI for {} to f64: {}", underlying, oi);
161                        }
162                    }
163                }
164            }
165        }
166    }
167}
168
169#[cfg(test)]
170mod tests {
171    use super::*;
172    use rust_decimal_macros::dec;
173
174    #[test]
175    fn position_imbalance_sums_option_exposure() {
176        let net_positions = HashMap::from([
177            ("BTC-20260115-100000-C".to_string(), dec!(1.25)),
178            ("ETH-20260115-5000-P".to_string(), dec!(-0.5)),
179        ]);
180
181        let metrics = evaluate_position_imbalance(&net_positions);
182
183        assert_eq!(metrics.total_imbalance, dec!(1.75));
184        assert_eq!(metrics.imbalanced_symbols, 2);
185        assert_eq!(metrics.excluded_perp_symbols, 0);
186    }
187
188    #[test]
189    fn position_imbalance_excludes_external_perps() {
190        let net_positions = HashMap::from([
191            ("BTC-PERP".to_string(), dec!(7)),
192            ("BTC-20260115-100000-C".to_string(), dec!(0)),
193        ]);
194
195        let metrics = evaluate_position_imbalance(&net_positions);
196
197        assert_eq!(metrics.total_imbalance, dec!(0));
198        assert_eq!(metrics.imbalanced_symbols, 0);
199        assert_eq!(metrics.excluded_perp_symbols, 1);
200    }
201
202    #[test]
203    fn position_imbalance_ignores_dust_for_symbol_count() {
204        let net_positions = HashMap::from([("BTC-20260115-100000-C".to_string(), dec!(0.0001))]);
205
206        let metrics = evaluate_position_imbalance(&net_positions);
207
208        assert_eq!(metrics.total_imbalance, dec!(0.0001));
209        assert_eq!(metrics.imbalanced_symbols, 0);
210    }
211}