hypercall/observability/metrics_collector/
settlement.rs1use super::*;
2use crate::read_cache::portfolio::PortfolioSummary;
3use crate::rsm::margin_mode::MarginMode;
4use hypercall_db::PmSettlementPoolProjection;
5use hypercall_engine::instrument::ParsedInstrument;
6use hypercall_types::{Side, WalletAddress};
7use rust_decimal::Decimal;
8use std::collections::{BTreeMap, BTreeSet};
9
/// Recovery-plan statuses that always get an `ht_pm_recovery_plan_total` series
/// for every reported underlying, with a value of zero when no plan currently
/// has that status.
const PM_RECOVERY_PLAN_STATUSES: &[&str] = &[
    "Planned",
    "Submitted",
    "Confirmed",
    "Canceled",
    "Failed",
    "EscalatedManual",
    "PartiallyRepaid",
    "Repaid",
    "Placeholder",
];
21
22#[derive(Default)]
23struct PmShortOptionQuantity {
24 underlying: String,
25 long_position_quantity: Decimal,
26 short_position_quantity: Decimal,
27 open_sell_quantity: Decimal,
28}
29
30impl MetricsCollector {
31 pub(super) async fn collect_settlement_metrics(&self) {
34 if let Some(ref handler) = self.db {
35 let handler_clone = handler.clone();
36
37 let db_results = tokio::task::spawn_blocking(move || {
39 let status_counts = handler_clone.get_instrument_status_counts_sync();
40 let expiring_count = handler_clone.get_markets_expiring_within_sync(24 * 60 * 60);
41 (status_counts, expiring_count)
42 })
43 .await;
44
45 match db_results {
46 Ok((status_counts_result, expiring_result)) => {
47 match status_counts_result {
49 Ok(counts) => {
50 for (status, count) in counts {
51 gauge!("ht_instruments_by_status", "status" => status)
52 .set(count as f64);
53 }
54 }
55 Err(e) => {
56 warn!("Failed to get instrument status counts: {}", e);
57 }
58 }
59
60 match expiring_result {
62 Ok(count) => {
63 gauge!("ht_markets_expiring_24h").set(count as f64);
64 }
65 Err(e) => {
66 warn!("Failed to get expiring markets count: {}", e);
67 }
68 }
69 }
70 Err(e) => {
71 warn!("spawn_blocking for settlement metrics failed: {}", e);
72 }
73 }
74
75 let Some(pm_settlement_db) = &self.pm_settlement_db else {
76 return;
77 };
78
79 let mut pm_pool_underlyings = std::collections::BTreeSet::new();
80 match pm_settlement_db.list_pm_settlement_pools().await {
81 Ok(pools) => {
82 gauge!("ht_pm_pool_facts_available").set(1.0);
83 for pool in pools {
84 let underlying = pool.underlying.clone();
85 pm_pool_underlyings.insert(underlying.clone());
86 if let Some(value) = pool.pool_utilization.and_then(|v| v.to_f64()) {
87 gauge!("ht_pm_pool_utilization", "underlying" => underlying.clone())
88 .set(value);
89 }
90 if let Some(value) = pool.pool_available_usdc.to_f64() {
91 gauge!("ht_pm_pool_available_usdc", "underlying" => underlying.clone())
92 .set(value);
93 }
94 match self.recomputed_pm_pool_target_usdc(&pool).await {
95 Ok(pool_target_usdc) => {
96 gauge!(
97 "ht_pm_pool_target_facts_available",
98 "underlying" => underlying.clone()
99 )
100 .set(1.0);
101 if let Some(value) = pool_target_usdc.to_f64() {
102 gauge!(
103 "ht_pm_pool_target_usdc",
104 "underlying" => underlying.clone()
105 )
106 .set(value);
107 }
108 }
109 Err(error) => {
110 gauge!(
111 "ht_pm_pool_target_facts_available",
112 "underlying" => underlying.clone()
113 )
114 .set(0.0);
115 gauge!("ht_pm_pool_target_usdc", "underlying" => underlying.clone())
116 .set(f64::NAN);
117 warn!(
118 "Failed to recompute PM settlement pool target for {}: {}",
119 pool.underlying, error
120 );
121 }
122 }
123 if let Some(value) = pool.active_timing_bridge_usdc.to_f64() {
124 gauge!("ht_pm_timing_bridge_usdc", "underlying" => underlying.clone())
125 .set(value);
126 }
127 if let Some(value) = pool.active_settlement_debt_usdc.to_f64() {
128 gauge!("ht_pm_settlement_debt_usdc", "underlying" => underlying)
129 .set(value);
130 }
131 }
132 }
133 Err(e) => {
134 gauge!("ht_pm_pool_facts_available").set(0.0);
135 warn!("Failed to get PM settlement pool projections: {}", e);
136 }
137 }
138
139 match pm_settlement_db.list_pm_settlement_accounts().await {
140 Ok(accounts) => {
141 let mut bridge_accounts: HashMap<String, usize> = HashMap::new();
142 let mut debt_accounts: HashMap<String, usize> = HashMap::new();
143 let mut account_underlyings = pm_pool_underlyings.clone();
144 for account in accounts {
145 account_underlyings.insert(account.underlying.clone());
146 if account.timing_bridge_principal_usdc > Decimal::ZERO {
147 *bridge_accounts
148 .entry(account.underlying.clone())
149 .or_default() += 1;
150 }
151 if account.settlement_debt_principal_usdc > Decimal::ZERO {
152 *debt_accounts.entry(account.underlying).or_default() += 1;
153 }
154 }
155 for underlying in account_underlyings {
156 let bridge_count = bridge_accounts.get(&underlying).copied().unwrap_or(0);
157 let debt_count = debt_accounts.get(&underlying).copied().unwrap_or(0);
158 gauge!("ht_pm_bridge_accounts", "underlying" => underlying.clone())
159 .set(bridge_count as f64);
160 gauge!("ht_pm_debt_accounts", "underlying" => underlying)
161 .set(debt_count as f64);
162 }
163 }
164 Err(e) => {
165 warn!("Failed to get PM settlement account projections: {}", e);
166 }
167 }
168
169 match pm_settlement_db.list_pm_recovery_plans().await {
170 Ok(plans) => {
171 let mut counts: HashMap<(String, String), usize> = HashMap::new();
172 let mut recovery_underlyings = pm_pool_underlyings.clone();
173 for plan in plans {
174 recovery_underlyings.insert(plan.underlying.clone());
175 *counts.entry((plan.underlying, plan.status)).or_default() += 1;
176 }
177 for underlying in recovery_underlyings {
178 for status in PM_RECOVERY_PLAN_STATUSES {
179 let count = counts
180 .get(&(underlying.clone(), status.to_string()))
181 .copied()
182 .unwrap_or(0);
183 gauge!(
184 "ht_pm_recovery_plan_total",
185 "underlying" => underlying.clone(),
186 "status" => *status
187 )
188 .set(count as f64);
189 }
190 }
191 for ((underlying, status), count) in counts {
192 if PM_RECOVERY_PLAN_STATUSES.contains(&status.as_str()) {
193 continue;
194 }
195 gauge!(
196 "ht_pm_recovery_plan_total",
197 "underlying" => underlying,
198 "status" => status
199 )
200 .set(count as f64);
201 }
202 }
203 Err(e) => {
204 warn!("Failed to get PM recovery plan projections: {}", e);
205 }
206 }
207 }
208 }
209
210 async fn recomputed_pm_pool_target_usdc(
211 &self,
212 pool: &PmSettlementPoolProjection,
213 ) -> Result<Decimal, String> {
214 let multiplier = pm_target_short_oi_notional_multiplier(pool)?;
215 let eligible_wallets = self.pm_settlement_pool_eligible_wallets().await?;
216 let short_option_oi_usdc = self
217 .pm_current_short_option_oi_usdc(&pool.underlying, &eligible_wallets)
218 .await?;
219 let active_liability_usdc =
220 pool.active_timing_bridge_usdc + pool.active_settlement_debt_usdc;
221 Ok(active_liability_usdc.max(multiplier * short_option_oi_usdc))
222 }
223
224 async fn pm_settlement_pool_eligible_wallets(&self) -> Result<BTreeSet<WalletAddress>, String> {
225 let Some(tier_cache) = &self.tier_cache else {
226 return Err("missing tier cache for PM settlement pool target metric".to_string());
227 };
228
229 let mut eligible_wallets = BTreeSet::new();
230 for wallet in &self.pm_settlement_allowlist {
231 let margin_mode = tier_cache.get_margin_mode(wallet).await.map_err(|error| {
232 format!("failed to load margin mode for PM settlement allowlisted wallet: {error}")
233 })?;
234 if margin_mode == MarginMode::Portfolio {
235 eligible_wallets.insert(*wallet);
236 }
237 }
238 Ok(eligible_wallets)
239 }
240
241 async fn pm_current_short_option_oi_usdc(
242 &self,
243 underlying: &str,
244 eligible_wallets: &BTreeSet<WalletAddress>,
245 ) -> Result<Decimal, String> {
246 let mut quantities = BTreeMap::<(WalletAddress, String), PmShortOptionQuantity>::new();
247 let portfolios = self.portfolio_cache.get_all_portfolios().await;
248 self.collect_pm_position_quantities(
249 underlying,
250 eligible_wallets,
251 &portfolios,
252 &mut quantities,
253 );
254 self.collect_pm_open_sell_quantities(underlying, eligible_wallets, &mut quantities)?;
255
256 let mut short_option_oi_usdc = Decimal::ZERO;
257 for quantity in quantities.values() {
258 let short_open_order_quantity =
259 (quantity.open_sell_quantity - quantity.long_position_quantity).max(Decimal::ZERO);
260 let aggregate_short_quantity =
261 quantity.short_position_quantity + short_open_order_quantity;
262 if aggregate_short_quantity <= Decimal::ZERO {
263 continue;
264 }
265
266 let spot_price = self.pm_spot_price(&quantity.underlying).await?;
267 short_option_oi_usdc += spot_price * aggregate_short_quantity;
268 }
269
270 Ok(short_option_oi_usdc)
271 }
272
273 fn collect_pm_position_quantities(
274 &self,
275 underlying: &str,
276 eligible_wallets: &BTreeSet<WalletAddress>,
277 portfolios: &HashMap<WalletAddress, PortfolioSummary>,
278 quantities: &mut BTreeMap<(WalletAddress, String), PmShortOptionQuantity>,
279 ) {
280 for (wallet, portfolio) in portfolios {
281 if !eligible_wallets.contains(wallet) {
282 continue;
283 }
284 for (symbol, position) in &portfolio.positions {
285 let Ok(parsed) = ParsedInstrument::parse(symbol) else {
286 continue;
287 };
288 if parsed.underlying != underlying {
289 continue;
290 }
291
292 let quantity = quantities.entry((*wallet, symbol.clone())).or_default();
293 quantity.underlying = parsed.underlying;
294 if position.amount < Decimal::ZERO {
295 quantity.short_position_quantity = position.amount.abs();
296 } else {
297 quantity.long_position_quantity = position.amount;
298 }
299 }
300 }
301 }
302
303 fn collect_pm_open_sell_quantities(
304 &self,
305 underlying: &str,
306 eligible_wallets: &BTreeSet<WalletAddress>,
307 quantities: &mut BTreeMap<(WalletAddress, String), PmShortOptionQuantity>,
308 ) -> Result<(), String> {
309 let Some(order_snapshot) = &self.order_snapshot else {
310 return Err("missing order snapshot for PM settlement pool target metric".to_string());
311 };
312
313 for (order, wallet) in order_snapshot.get_all_orders() {
314 if !eligible_wallets.contains(&wallet)
315 || order.is_perp
316 || !matches!(order.side, Side::Sell)
317 || order.remaining_size <= Decimal::ZERO
318 {
319 continue;
320 }
321 let Ok(parsed) = ParsedInstrument::parse(&order.symbol) else {
322 continue;
323 };
324 if parsed.underlying != underlying {
325 continue;
326 }
327
328 let quantity = quantities.entry((wallet, order.symbol)).or_default();
329 quantity.underlying = parsed.underlying;
330 quantity.open_sell_quantity += order.remaining_size;
332 }
333 Ok(())
334 }
335
336 async fn pm_spot_price(&self, underlying: &str) -> Result<Decimal, String> {
337 let Some(greeks_cache) = &self.greeks_cache else {
338 return Err("missing greeks cache for PM settlement pool target metric".to_string());
339 };
340 let spot_price = greeks_cache
341 .get_spot_price(underlying)
342 .await
343 .ok_or_else(|| format!("missing spot price facts for {underlying}"))?;
344 let spot_price = Decimal::from_f64_retain(spot_price)
345 .ok_or_else(|| format!("invalid spot price facts for {underlying}: {spot_price}"))?;
346 if spot_price <= Decimal::ZERO {
347 return Err(format!(
348 "nonpositive spot price facts for {underlying}: {spot_price}"
349 ));
350 }
351 Ok(spot_price)
352 }
353}
354
355fn pm_target_short_oi_notional_multiplier(
356 pool: &PmSettlementPoolProjection,
357) -> Result<Decimal, String> {
358 pool.target_short_oi_notional_multiplier.ok_or_else(|| {
359 format!(
360 "missing PM settlement pool config facts for {} target recomputation",
361 pool.underlying
362 )
363 })
364}
365
#[cfg(test)]
mod tests {
    use super::*;
    use chrono::Utc;
    use rust_decimal_macros::dec;

    /// Builds a BTC pool projection in which only the target multiplier varies.
    fn btc_pool_with_multiplier(multiplier: Option<Decimal>) -> PmSettlementPoolProjection {
        PmSettlementPoolProjection {
            underlying: String::from("BTC"),
            config_version: 1,
            policy_version: 1,
            pool_available_usdc: dec!(1000),
            pool_target_usdc: dec!(500),
            pool_capacity_usdc: dec!(2000),
            pool_utilization: Some(dec!(0.5)),
            active_timing_bridge_usdc: Decimal::ZERO,
            active_settlement_debt_usdc: Decimal::ZERO,
            target_short_oi_notional_multiplier: multiplier,
            utilization_kink: Some(dec!(0.6)),
            apr_at_kink: Some(dec!(0.10)),
            max_apr: Some(dec!(1.00)),
            normal_utilization_cap: Some(dec!(0.80)),
            crisis_utilization_cap: Some(dec!(0.95)),
            bridge_window_ms: Some(86_400_000),
            last_engine_command_id: 1,
            projection_seq: 1,
            updated_at: Utc::now(),
        }
    }

    /// A projection without a multiplier must yield the "missing config facts" error.
    #[test]
    fn pm_pool_target_multiplier_missing_config_is_unavailable() {
        let result = pm_target_short_oi_notional_multiplier(&btc_pool_with_multiplier(None));

        let error =
            result.expect_err("missing target multiplier must mark target facts unavailable");

        assert!(error.contains("missing PM settlement pool config facts"));
    }

    /// A configured multiplier is returned unchanged.
    #[test]
    fn pm_pool_target_multiplier_reads_config_value() {
        let pool = btc_pool_with_multiplier(Some(dec!(0.25)));

        let multiplier =
            pm_target_short_oi_notional_multiplier(&pool).expect("config value should be present");

        assert_eq!(multiplier, dec!(0.25));
    }
}