hypercall/observability/metrics_collector/
integrity.rs1use super::*;
2use crate::shared::order_types::perp_underlying;
3use rust_decimal::Decimal;
4
5fn position_imbalance_alert_threshold() -> Decimal {
6 Decimal::new(1, 4)
7}
8
9#[derive(Debug, Default, PartialEq)]
10struct PositionImbalanceMetrics {
11 total_imbalance: Decimal,
12 imbalanced_symbols: i64,
13 excluded_perp_symbols: i64,
14}
15
16fn evaluate_position_imbalance(
17 net_positions_by_symbol: &HashMap<String, Decimal>,
18) -> PositionImbalanceMetrics {
19 let mut metrics = PositionImbalanceMetrics::default();
20
21 for (symbol, net_position) in net_positions_by_symbol {
22 if perp_underlying(symbol).is_some() {
23 if *net_position != Decimal::ZERO {
24 metrics.excluded_perp_symbols += 1;
25 }
26 continue;
27 }
28
29 let abs_net = net_position.abs();
30 metrics.total_imbalance += abs_net;
31 if abs_net > position_imbalance_alert_threshold() {
32 metrics.imbalanced_symbols += 1;
33 }
34 }
35
36 metrics
37}
38
39impl MetricsCollector {
40 pub(super) async fn collect_position_metrics(&self) {
44 let portfolios = self.portfolio_cache.get_all_portfolios().await;
45 let mut net_positions_by_symbol: HashMap<String, Decimal> = HashMap::new();
46
47 for portfolio in portfolios.values() {
48 for position in portfolio.positions.values() {
49 *net_positions_by_symbol
50 .entry(position.symbol.clone())
51 .or_insert(Decimal::ZERO) += position.amount;
52 }
53 }
54
55 let metrics = evaluate_position_imbalance(&net_positions_by_symbol);
56 match metrics.total_imbalance.to_f64() {
57 Some(total_imbalance) => {
58 gauge!("ht_integrity_position_imbalance").set(total_imbalance);
59 gauge!("ht_integrity_position_imbalance_conversion_failures").set(0.0);
60 gauge!("ht_integrity_position_imbalance_complete").set(1.0);
61 }
62 None => {
63 error!(
64 imbalance = %metrics.total_imbalance,
65 "Failed to convert position imbalance to f64"
66 );
67 gauge!("ht_integrity_position_imbalance_conversion_failures").set(1.0);
68 gauge!("ht_integrity_position_imbalance_complete").set(0.0);
69 }
70 }
71
72 gauge!("ht_integrity_imbalanced_symbols").set(metrics.imbalanced_symbols as f64);
73 gauge!("ht_integrity_excluded_perp_symbols").set(metrics.excluded_perp_symbols as f64);
74
75 if metrics.imbalanced_symbols > 0 {
76 error!(
77 imbalance = %metrics.total_imbalance,
78 symbols = metrics.imbalanced_symbols,
79 "🚨 POSITION IMBALANCE: matching-engine option positions do not net to zero"
80 );
81 }
82 }
83
84 pub(super) async fn collect_integrity_metrics(&self) {
88 if let Some(ref integrity_db) = self.integrity_db {
89 let results = integrity_db.get_integrity_query_results().await;
90 {
91 let fill_volume_result = results.fill_volume;
92 let settlement_result = results.settlement_stats;
93 let oi_result = results.open_interest_by_underlying;
94
95 if let Ok((fill_count, total_notional)) = fill_volume_result {
102 gauge!("ht_ledger_total_fills").set(fill_count as f64);
103 if let Some(v) = total_notional.to_f64() {
104 gauge!("ht_ledger_total_volume_usd").set(v);
105 } else {
106 error!(
107 "Failed to convert total_notional to f64: {}",
108 total_notional
109 );
110 }
111 }
112
113 if let Ok((total, applied, pending, total_value)) = settlement_result {
115 gauge!("ht_settlement_total").set(total as f64);
116 gauge!("ht_settlement_applied").set(applied as f64);
117 gauge!("ht_settlement_pending").set(pending as f64);
118 if let Some(v) = total_value.to_f64() {
119 gauge!("ht_settlement_total_value_usd").set(v);
120 } else {
121 error!(
122 "Failed to convert settlement total_value to f64: {}",
123 total_value
124 );
125 }
126
127 if pending > 0 {
129 debug!(
130 "Settlements: {} total, {} applied, {} pending",
131 total, applied, pending
132 );
133 }
134 }
135
136 if let Ok(count) = results.orphaned_settlement_orders {
138 gauge!("ht_orphaned_settlement_orders").set(count as f64);
139 }
140
141 if let Ok(total) = results.ledger_events_settlement_total {
143 if let Some(v) = total.to_f64() {
144 gauge!("ht_ledger_events_settlement_sum").set(v);
145 } else {
146 error!(
147 "Failed to convert ledger_events_settlement_total to f64: {}",
148 total
149 );
150 }
151 }
152
153 if let Ok(oi_list) = oi_result {
155 for (underlying, oi) in oi_list {
156 if let Some(v) = oi.to_f64() {
157 gauge!("ht_open_interest_by_underlying", "underlying" => underlying.clone())
158 .set(v);
159 } else {
160 error!("Failed to convert OI for {} to f64: {}", underlying, oi);
161 }
162 }
163 }
164 }
165 }
166 }
167}
168
#[cfg(test)]
mod tests {
    use super::*;
    use rust_decimal_macros::dec;

    /// Builds the symbol → net-position map that
    /// `evaluate_position_imbalance` takes as input.
    fn net_positions(entries: &[(&str, Decimal)]) -> HashMap<String, Decimal> {
        entries
            .iter()
            .map(|(symbol, net)| (symbol.to_string(), *net))
            .collect()
    }

    /// Option symbols contribute their absolute net position, long or short.
    #[test]
    fn position_imbalance_sums_option_exposure() {
        let input = net_positions(&[
            ("BTC-20260115-100000-C", dec!(1.25)),
            ("ETH-20260115-5000-P", dec!(-0.5)),
        ]);

        let metrics = evaluate_position_imbalance(&input);

        assert_eq!(metrics.total_imbalance, dec!(1.75));
        assert_eq!(metrics.imbalanced_symbols, 2);
        assert_eq!(metrics.excluded_perp_symbols, 0);
    }

    /// A non-flat perp is counted as excluded and never as an imbalance.
    #[test]
    fn position_imbalance_excludes_external_perps() {
        let input = net_positions(&[
            ("BTC-PERP", dec!(7)),
            ("BTC-20260115-100000-C", dec!(0)),
        ]);

        let metrics = evaluate_position_imbalance(&input);

        assert_eq!(metrics.total_imbalance, dec!(0));
        assert_eq!(metrics.imbalanced_symbols, 0);
        assert_eq!(metrics.excluded_perp_symbols, 1);
    }

    /// A net position exactly at the threshold adds to the total but is not
    /// counted as an imbalanced symbol.
    #[test]
    fn position_imbalance_ignores_dust_for_symbol_count() {
        let input = net_positions(&[("BTC-20260115-100000-C", dec!(0.0001))]);

        let metrics = evaluate_position_imbalance(&input);

        assert_eq!(metrics.total_imbalance, dec!(0.0001));
        assert_eq!(metrics.imbalanced_symbols, 0);
    }
}