Skip to main content

hypercall/observability/metrics_collector/
settlement.rs

1use super::*;
2use crate::read_cache::portfolio::PortfolioSummary;
3use crate::rsm::margin_mode::MarginMode;
4use hypercall_db::PmSettlementPoolProjection;
5use hypercall_engine::instrument::ParsedInstrument;
6use hypercall_types::{Side, WalletAddress};
7use rust_decimal::Decimal;
8use std::collections::{BTreeMap, BTreeSet};
9
/// Recovery plan statuses that always get an `ht_pm_recovery_plan_total` gauge for every
/// known underlying, so a status with no matching plans is reported as 0 instead of being
/// absent. Statuses outside this list are still reported, but only when at least one plan
/// carries them.
10const PM_RECOVERY_PLAN_STATUSES: &[&str] = &[
11    "Planned",
12    "Submitted",
13    "Confirmed",
14    "Canceled",
15    "Failed",
16    "EscalatedManual",
17    "PartiallyRepaid",
18    "Repaid",
19    "Placeholder",
20];
21
/// Quantities accumulated per (wallet, symbol) while computing the short option open
/// interest that feeds the PM settlement pool target metric.
22#[derive(Default)]
23struct PmShortOptionQuantity {
    /// Underlying parsed from the instrument symbol of the position or order.
24    underlying: String,
    /// Position amount when the wallet's position in the symbol is not negative.
25    long_position_quantity: Decimal,
    /// Absolute position amount when the wallet's position in the symbol is negative.
26    short_position_quantity: Decimal,
    /// Sum of `remaining_size` over the wallet's open non-perp sell orders on the symbol.
27    open_sell_quantity: Decimal,
28}
29
30impl MetricsCollector {
31    // ===== Settlement Metrics =====
32
33    pub(super) async fn collect_settlement_metrics(&self) {
34        if let Some(ref handler) = self.db {
35            let handler_clone = handler.clone();
36
37            // Run blocking DB queries in spawn_blocking to avoid blocking the async runtime
38            let db_results = tokio::task::spawn_blocking(move || {
39                let status_counts = handler_clone.get_instrument_status_counts_sync();
40                let expiring_count = handler_clone.get_markets_expiring_within_sync(24 * 60 * 60);
41                (status_counts, expiring_count)
42            })
43            .await;
44
45            match db_results {
46                Ok((status_counts_result, expiring_result)) => {
47                    // Process instrument status counts
48                    match status_counts_result {
49                        Ok(counts) => {
50                            for (status, count) in counts {
51                                gauge!("ht_instruments_by_status", "status" => status)
52                                    .set(count as f64);
53                            }
54                        }
55                        Err(e) => {
56                            warn!("Failed to get instrument status counts: {}", e);
57                        }
58                    }
59
60                    // Process expiring markets count
61                    match expiring_result {
62                        Ok(count) => {
63                            gauge!("ht_markets_expiring_24h").set(count as f64);
64                        }
65                        Err(e) => {
66                            warn!("Failed to get expiring markets count: {}", e);
67                        }
68                    }
69                }
70                Err(e) => {
71                    warn!("spawn_blocking for settlement metrics failed: {}", e);
72                }
73            }
74
75            let Some(pm_settlement_db) = &self.pm_settlement_db else {
76                return;
77            };
78
79            let mut pm_pool_underlyings = std::collections::BTreeSet::new();
80            match pm_settlement_db.list_pm_settlement_pools().await {
81                Ok(pools) => {
82                    gauge!("ht_pm_pool_facts_available").set(1.0);
83                    for pool in pools {
84                        let underlying = pool.underlying.clone();
85                        pm_pool_underlyings.insert(underlying.clone());
86                        if let Some(value) = pool.pool_utilization.and_then(|v| v.to_f64()) {
87                            gauge!("ht_pm_pool_utilization", "underlying" => underlying.clone())
88                                .set(value);
89                        }
90                        if let Some(value) = pool.pool_available_usdc.to_f64() {
91                            gauge!("ht_pm_pool_available_usdc", "underlying" => underlying.clone())
92                                .set(value);
93                        }
94                        match self.recomputed_pm_pool_target_usdc(&pool).await {
95                            Ok(pool_target_usdc) => {
96                                gauge!(
97                                    "ht_pm_pool_target_facts_available",
98                                    "underlying" => underlying.clone()
99                                )
100                                .set(1.0);
101                                if let Some(value) = pool_target_usdc.to_f64() {
102                                    gauge!(
103                                        "ht_pm_pool_target_usdc",
104                                        "underlying" => underlying.clone()
105                                    )
106                                    .set(value);
107                                }
108                            }
109                            Err(error) => {
110                                gauge!(
111                                    "ht_pm_pool_target_facts_available",
112                                    "underlying" => underlying.clone()
113                                )
114                                .set(0.0);
115                                gauge!("ht_pm_pool_target_usdc", "underlying" => underlying.clone())
116                                    .set(f64::NAN);
117                                warn!(
118                                    "Failed to recompute PM settlement pool target for {}: {}",
119                                    pool.underlying, error
120                                );
121                            }
122                        }
123                        if let Some(value) = pool.active_timing_bridge_usdc.to_f64() {
124                            gauge!("ht_pm_timing_bridge_usdc", "underlying" => underlying.clone())
125                                .set(value);
126                        }
127                        if let Some(value) = pool.active_settlement_debt_usdc.to_f64() {
128                            gauge!("ht_pm_settlement_debt_usdc", "underlying" => underlying)
129                                .set(value);
130                        }
131                    }
132                }
133                Err(e) => {
134                    gauge!("ht_pm_pool_facts_available").set(0.0);
135                    warn!("Failed to get PM settlement pool projections: {}", e);
136                }
137            }
138
139            match pm_settlement_db.list_pm_settlement_accounts().await {
140                Ok(accounts) => {
141                    let mut bridge_accounts: HashMap<String, usize> = HashMap::new();
142                    let mut debt_accounts: HashMap<String, usize> = HashMap::new();
143                    let mut account_underlyings = pm_pool_underlyings.clone();
144                    for account in accounts {
145                        account_underlyings.insert(account.underlying.clone());
146                        if account.timing_bridge_principal_usdc > Decimal::ZERO {
147                            *bridge_accounts
148                                .entry(account.underlying.clone())
149                                .or_default() += 1;
150                        }
151                        if account.settlement_debt_principal_usdc > Decimal::ZERO {
152                            *debt_accounts.entry(account.underlying).or_default() += 1;
153                        }
154                    }
155                    for underlying in account_underlyings {
156                        let bridge_count = bridge_accounts.get(&underlying).copied().unwrap_or(0);
157                        let debt_count = debt_accounts.get(&underlying).copied().unwrap_or(0);
158                        gauge!("ht_pm_bridge_accounts", "underlying" => underlying.clone())
159                            .set(bridge_count as f64);
160                        gauge!("ht_pm_debt_accounts", "underlying" => underlying)
161                            .set(debt_count as f64);
162                    }
163                }
164                Err(e) => {
165                    warn!("Failed to get PM settlement account projections: {}", e);
166                }
167            }
168
169            match pm_settlement_db.list_pm_recovery_plans().await {
170                Ok(plans) => {
171                    let mut counts: HashMap<(String, String), usize> = HashMap::new();
172                    let mut recovery_underlyings = pm_pool_underlyings.clone();
173                    for plan in plans {
174                        recovery_underlyings.insert(plan.underlying.clone());
175                        *counts.entry((plan.underlying, plan.status)).or_default() += 1;
176                    }
177                    for underlying in recovery_underlyings {
178                        for status in PM_RECOVERY_PLAN_STATUSES {
179                            let count = counts
180                                .get(&(underlying.clone(), status.to_string()))
181                                .copied()
182                                .unwrap_or(0);
183                            gauge!(
184                                "ht_pm_recovery_plan_total",
185                                "underlying" => underlying.clone(),
186                                "status" => *status
187                            )
188                            .set(count as f64);
189                        }
190                    }
191                    for ((underlying, status), count) in counts {
192                        if PM_RECOVERY_PLAN_STATUSES.contains(&status.as_str()) {
193                            continue;
194                        }
195                        gauge!(
196                            "ht_pm_recovery_plan_total",
197                            "underlying" => underlying,
198                            "status" => status
199                        )
200                        .set(count as f64);
201                    }
202                }
203                Err(e) => {
204                    warn!("Failed to get PM recovery plan projections: {}", e);
205                }
206            }
207        }
208    }
209
210    async fn recomputed_pm_pool_target_usdc(
211        &self,
212        pool: &PmSettlementPoolProjection,
213    ) -> Result<Decimal, String> {
214        let multiplier = pm_target_short_oi_notional_multiplier(pool)?;
215        let eligible_wallets = self.pm_settlement_pool_eligible_wallets().await?;
216        let short_option_oi_usdc = self
217            .pm_current_short_option_oi_usdc(&pool.underlying, &eligible_wallets)
218            .await?;
219        let active_liability_usdc =
220            pool.active_timing_bridge_usdc + pool.active_settlement_debt_usdc;
221        Ok(active_liability_usdc.max(multiplier * short_option_oi_usdc))
222    }
223
224    async fn pm_settlement_pool_eligible_wallets(&self) -> Result<BTreeSet<WalletAddress>, String> {
225        let Some(tier_cache) = &self.tier_cache else {
226            return Err("missing tier cache for PM settlement pool target metric".to_string());
227        };
228
229        let mut eligible_wallets = BTreeSet::new();
230        for wallet in &self.pm_settlement_allowlist {
231            let margin_mode = tier_cache.get_margin_mode(wallet).await.map_err(|error| {
232                format!("failed to load margin mode for PM settlement allowlisted wallet: {error}")
233            })?;
234            if margin_mode == MarginMode::Portfolio {
235                eligible_wallets.insert(*wallet);
236            }
237        }
238        Ok(eligible_wallets)
239    }
240
241    async fn pm_current_short_option_oi_usdc(
242        &self,
243        underlying: &str,
244        eligible_wallets: &BTreeSet<WalletAddress>,
245    ) -> Result<Decimal, String> {
246        let mut quantities = BTreeMap::<(WalletAddress, String), PmShortOptionQuantity>::new();
247        let portfolios = self.portfolio_cache.get_all_portfolios().await;
248        self.collect_pm_position_quantities(
249            underlying,
250            eligible_wallets,
251            &portfolios,
252            &mut quantities,
253        );
254        self.collect_pm_open_sell_quantities(underlying, eligible_wallets, &mut quantities)?;
255
256        let mut short_option_oi_usdc = Decimal::ZERO;
257        for quantity in quantities.values() {
258            let short_open_order_quantity =
259                (quantity.open_sell_quantity - quantity.long_position_quantity).max(Decimal::ZERO);
260            let aggregate_short_quantity =
261                quantity.short_position_quantity + short_open_order_quantity;
262            if aggregate_short_quantity <= Decimal::ZERO {
263                continue;
264            }
265
266            let spot_price = self.pm_spot_price(&quantity.underlying).await?;
267            short_option_oi_usdc += spot_price * aggregate_short_quantity;
268        }
269
270        Ok(short_option_oi_usdc)
271    }
272
273    fn collect_pm_position_quantities(
274        &self,
275        underlying: &str,
276        eligible_wallets: &BTreeSet<WalletAddress>,
277        portfolios: &HashMap<WalletAddress, PortfolioSummary>,
278        quantities: &mut BTreeMap<(WalletAddress, String), PmShortOptionQuantity>,
279    ) {
280        for (wallet, portfolio) in portfolios {
281            if !eligible_wallets.contains(wallet) {
282                continue;
283            }
284            for (symbol, position) in &portfolio.positions {
285                let Ok(parsed) = ParsedInstrument::parse(symbol) else {
286                    continue;
287                };
288                if parsed.underlying != underlying {
289                    continue;
290                }
291
292                let quantity = quantities.entry((*wallet, symbol.clone())).or_default();
293                quantity.underlying = parsed.underlying;
294                if position.amount < Decimal::ZERO {
295                    quantity.short_position_quantity = position.amount.abs();
296                } else {
297                    quantity.long_position_quantity = position.amount;
298                }
299            }
300        }
301    }
302
303    fn collect_pm_open_sell_quantities(
304        &self,
305        underlying: &str,
306        eligible_wallets: &BTreeSet<WalletAddress>,
307        quantities: &mut BTreeMap<(WalletAddress, String), PmShortOptionQuantity>,
308    ) -> Result<(), String> {
309        let Some(order_snapshot) = &self.order_snapshot else {
310            return Err("missing order snapshot for PM settlement pool target metric".to_string());
311        };
312
313        for (order, wallet) in order_snapshot.get_all_orders() {
314            if !eligible_wallets.contains(&wallet)
315                || order.is_perp
316                || !matches!(order.side, Side::Sell)
317                || order.remaining_size <= Decimal::ZERO
318            {
319                continue;
320            }
321            let Ok(parsed) = ParsedInstrument::parse(&order.symbol) else {
322                continue;
323            };
324            if parsed.underlying != underlying {
325                continue;
326            }
327
328            let quantity = quantities.entry((wallet, order.symbol)).or_default();
329            quantity.underlying = parsed.underlying;
330            // RuntimeOrderSummary sizes are published in human contract units.
331            quantity.open_sell_quantity += order.remaining_size;
332        }
333        Ok(())
334    }
335
336    async fn pm_spot_price(&self, underlying: &str) -> Result<Decimal, String> {
337        let Some(greeks_cache) = &self.greeks_cache else {
338            return Err("missing greeks cache for PM settlement pool target metric".to_string());
339        };
340        let spot_price = greeks_cache
341            .get_spot_price(underlying)
342            .await
343            .ok_or_else(|| format!("missing spot price facts for {underlying}"))?;
344        let spot_price = Decimal::from_f64_retain(spot_price)
345            .ok_or_else(|| format!("invalid spot price facts for {underlying}: {spot_price}"))?;
346        if spot_price <= Decimal::ZERO {
347            return Err(format!(
348                "nonpositive spot price facts for {underlying}: {spot_price}"
349            ));
350        }
351        Ok(spot_price)
352    }
353}
354
355fn pm_target_short_oi_notional_multiplier(
356    pool: &PmSettlementPoolProjection,
357) -> Result<Decimal, String> {
358    pool.target_short_oi_notional_multiplier.ok_or_else(|| {
359        format!(
360            "missing PM settlement pool config facts for {} target recomputation",
361            pool.underlying
362        )
363    })
364}
365
#[cfg(test)]
mod tests {
    use super::*;
    use chrono::Utc;
    use rust_decimal_macros::dec;

    /// Builds a BTC pool projection fixture in which only the target multiplier varies.
    fn pm_pool_projection_with_multiplier(
        target_short_oi_notional_multiplier: Option<Decimal>,
    ) -> PmSettlementPoolProjection {
        let multiplier = target_short_oi_notional_multiplier;
        PmSettlementPoolProjection {
            underlying: String::from("BTC"),
            config_version: 1,
            policy_version: 1,
            pool_available_usdc: dec!(1000),
            pool_target_usdc: dec!(500),
            pool_capacity_usdc: dec!(2000),
            pool_utilization: Some(dec!(0.5)),
            active_timing_bridge_usdc: Decimal::ZERO,
            active_settlement_debt_usdc: Decimal::ZERO,
            target_short_oi_notional_multiplier: multiplier,
            utilization_kink: Some(dec!(0.6)),
            apr_at_kink: Some(dec!(0.10)),
            max_apr: Some(dec!(1.00)),
            normal_utilization_cap: Some(dec!(0.80)),
            crisis_utilization_cap: Some(dec!(0.95)),
            bridge_window_ms: Some(86_400_000),
            last_engine_command_id: 1,
            projection_seq: 1,
            updated_at: Utc::now(),
        }
    }

    /// A projection without a multiplier must yield the "missing config facts" error.
    #[test]
    fn pm_pool_target_multiplier_missing_config_is_unavailable() {
        let projection = pm_pool_projection_with_multiplier(None);
        let outcome = pm_target_short_oi_notional_multiplier(&projection);

        let message =
            outcome.expect_err("missing target multiplier must mark target facts unavailable");
        assert!(message.contains("missing PM settlement pool config facts"));
    }

    /// A configured multiplier is returned unchanged.
    #[test]
    fn pm_pool_target_multiplier_reads_config_value() {
        let projection = pm_pool_projection_with_multiplier(Some(dec!(0.25)));
        let outcome = pm_target_short_oi_notional_multiplier(&projection);

        let value = outcome.expect("config value should be present");
        assert_eq!(value, dec!(0.25));
    }
}