1use axum::{extract::State, http::StatusCode, response::IntoResponse};
4use rust_decimal::Decimal;
5use rust_decimal_macros::dec;
6use serde::Serialize;
7use sonic_rs::json;
8use std::collections::HashMap;
9use tracing::warn;
10
11use crate::state::AdminState;
12use hypercall_runtime_api::sonic_json::SonicJson;
13use hypercall_types::perp_underlying;
14
15#[derive(Debug, Serialize, utoipa::ToSchema)]
17pub struct IntegrityReport {
18 pub timestamp: String,
20 pub overall_status: IntegrityStatus,
22 pub checks: Vec<IntegrityCheck>,
24 pub ledger_summary: LedgerSummary,
26 pub position_summary: PositionSummary,
28 pub settlement_summary: SettlementSummary,
30}
31
32#[derive(Debug, Clone, Copy, Serialize, PartialEq, utoipa::ToSchema)]
34#[serde(rename_all = "UPPERCASE")]
35pub enum IntegrityStatus {
36 Healthy,
38 Warning,
40 Critical,
42}
43
44#[derive(Debug, Serialize, utoipa::ToSchema)]
46pub struct IntegrityCheck {
47 pub name: String,
49 pub status: IntegrityStatus,
51 pub message: String,
53 #[schema(value_type = Option<Object>)]
55 pub details: Option<sonic_rs::Value>,
56}
57
58#[derive(Debug, Serialize, utoipa::ToSchema)]
60pub struct LedgerSummary {
61 pub total_balance: Option<String>,
63 pub authority_state: String,
65 pub account_count: i64,
67 pub total_fills: i64,
69 pub total_volume: String,
71}
72
73#[derive(Debug, Serialize, utoipa::ToSchema)]
75pub struct PositionSummary {
76 pub total_open_interest: String,
78 pub unique_instruments: i64,
80 pub net_position_imbalances: Vec<PositionImbalance>,
82}
83
84#[derive(Debug, Clone, Serialize, PartialEq, utoipa::ToSchema)]
86pub struct PositionImbalance {
87 pub symbol: String,
89 pub net_position: String,
91 pub severity: IntegrityStatus,
93}
94
95#[derive(Debug, Clone, Serialize, PartialEq)]
96struct PerpExposure {
97 symbol: String,
98 net_position: String,
99}
100
101#[derive(Debug, Clone, PartialEq)]
102struct PositionBalanceEvaluation {
103 imbalances: Vec<PositionImbalance>,
104 excluded_perp_exposures: Vec<PerpExposure>,
105 total_imbalance: Decimal,
106 status: IntegrityStatus,
107}
108
109fn position_balance_severity(abs_net: Decimal) -> IntegrityStatus {
110 if abs_net > dec!(0.01) {
111 IntegrityStatus::Critical
112 } else if abs_net > dec!(0.0001) {
113 IntegrityStatus::Warning
114 } else {
115 IntegrityStatus::Healthy
116 }
117}
118
119fn evaluate_position_balance(
120 net_positions_by_symbol: &HashMap<String, Decimal>,
121) -> PositionBalanceEvaluation {
122 let mut imbalances = Vec::new();
123 let mut excluded_perp_exposures = Vec::new();
124 let mut total_imbalance = dec!(0);
125
126 for (symbol, net) in net_positions_by_symbol {
127 if perp_underlying(symbol).is_some() {
128 if *net != Decimal::ZERO {
129 excluded_perp_exposures.push(PerpExposure {
130 symbol: symbol.clone(),
131 net_position: net.to_string(),
132 });
133 }
134 continue;
135 }
136
137 let abs_net = net.abs();
138 total_imbalance += abs_net;
139 let severity = position_balance_severity(abs_net);
140
141 if severity != IntegrityStatus::Healthy {
142 imbalances.push(PositionImbalance {
143 symbol: symbol.clone(),
144 net_position: net.to_string(),
145 severity,
146 });
147 }
148 }
149
150 imbalances.sort_by(|a, b| a.symbol.cmp(&b.symbol));
151 excluded_perp_exposures.sort_by(|a, b| a.symbol.cmp(&b.symbol));
152
153 PositionBalanceEvaluation {
154 imbalances,
155 excluded_perp_exposures,
156 total_imbalance,
157 status: position_balance_severity(total_imbalance),
158 }
159}
160
161#[derive(Debug, Serialize, utoipa::ToSchema)]
163pub struct SettlementSummary {
164 pub total: i64,
166 pub applied: i64,
168 pub pending: i64,
170 pub total_value: String,
172}
173
174#[utoipa::path(
185 get,
186 path = "/monitoring/integrity",
187 responses(
188 (status = 200, description = "Integrity report", body = IntegrityReport),
189 (status = 401, description = "Invalid or missing X-Admin-Key header")
190 ),
191 tag = "Monitoring",
192 security(("admin_key" = []))
193)]
194pub async fn integrity_check(State(app_state): State<AdminState>) -> impl IntoResponse {
195 let timestamp = chrono::Utc::now().to_rfc3339();
196 let mut checks = Vec::new();
197 let mut overall_status = IntegrityStatus::Healthy;
198
199 let _db_handler = match app_state.sync_db.as_ref() {
201 Some(h) => h.clone(),
202 None => {
203 return (
204 StatusCode::SERVICE_UNAVAILABLE,
205 SonicJson(json!({
206 "error": "Database handler not available"
207 })),
208 )
209 .into_response();
210 }
211 };
212
213 let portfolios = app_state.portfolio_cache.get_all_portfolios().await;
215
216 let mut net_positions_by_symbol: std::collections::HashMap<String, Decimal> =
218 std::collections::HashMap::new();
219 let mut oi_by_underlying: std::collections::HashMap<String, Decimal> =
220 std::collections::HashMap::new();
221
222 for summary in portfolios.values() {
223 for (symbol, pos) in &summary.positions {
224 *net_positions_by_symbol.entry(symbol.clone()).or_default() += pos.amount;
226
227 if let Some(underlying) = symbol.split('-').next() {
229 *oi_by_underlying.entry(underlying.to_string()).or_default() += pos.amount.abs();
230 }
231 }
232 }
233
234 let integrity: &dyn hypercall_db::IntegrityReader = app_state.db.as_ref();
236 let db_results = integrity.get_integrity_query_results().await;
237
238 let engine_digest = app_state.engine_state_digest_provider.engine_state_digest();
239 let fill_volume_result = db_results.fill_volume;
240 let settlement_result = db_results.settlement_stats;
241
242 let mut db_errors: Vec<String> = Vec::new();
245
246 if let Err(e) = &fill_volume_result {
247 db_errors.push(format!("fill_volume: {}", e));
248 }
249 if let Err(e) = &settlement_result {
250 db_errors.push(format!("settlement_stats: {}", e));
251 }
252
253 if !db_errors.is_empty() {
254 overall_status = IntegrityStatus::Critical;
255 checks.push(IntegrityCheck {
256 name: "Database Connectivity".to_string(),
257 status: IntegrityStatus::Critical,
258 message: format!("{} database queries failed", db_errors.len()),
259 details: Some(json!({ "failed_queries": db_errors })),
260 });
261 warn!("INTEGRITY CHECK: Database queries failed: {:?}", db_errors);
262 }
263
264 let position_balance = evaluate_position_balance(&net_positions_by_symbol);
271 let position_check_status = position_balance.status;
272
273 if position_check_status == IntegrityStatus::Critical {
274 overall_status = IntegrityStatus::Critical;
275 } else if position_check_status == IntegrityStatus::Warning
276 && overall_status != IntegrityStatus::Critical
277 {
278 overall_status = IntegrityStatus::Warning;
279 }
280
281 checks.push(IntegrityCheck {
282 name: "Position Balance (Zero-Sum)".to_string(),
283 status: position_check_status,
284 message: if position_balance.imbalances.is_empty() {
285 if position_balance.excluded_perp_exposures.is_empty() {
286 "All option positions are balanced (sum of longs equals sum of shorts)".to_string()
287 } else {
288 format!(
289 "All option positions are balanced; {} external perp exposure symbol(s) excluded from zero-sum check",
290 position_balance.excluded_perp_exposures.len()
291 )
292 }
293 } else {
294 format!(
295 "{} option symbol(s) have position imbalance totaling {}",
296 position_balance.imbalances.len(),
297 position_balance.total_imbalance
298 )
299 },
300 details: if position_balance.imbalances.is_empty()
301 && position_balance.excluded_perp_exposures.is_empty()
302 {
303 None
304 } else if position_balance.excluded_perp_exposures.is_empty() {
305 Some(sonic_rs::to_value(&position_balance.imbalances).unwrap_or_default())
306 } else if position_balance.imbalances.is_empty() {
307 Some(json!({
308 "excluded_perp_exposures": position_balance.excluded_perp_exposures.clone(),
309 }))
310 } else {
311 Some(json!({
312 "imbalances": position_balance.imbalances.clone(),
313 "excluded_perp_exposures": position_balance.excluded_perp_exposures.clone(),
314 }))
315 },
316 });
317
318 let settlement_check_status = match &settlement_result {
320 Ok((_, _, pending, _)) => {
321 if *pending > 10 {
322 IntegrityStatus::Critical
323 } else if *pending > 0 {
324 IntegrityStatus::Warning
325 } else {
326 IntegrityStatus::Healthy
327 }
328 }
329 Err(_) => IntegrityStatus::Critical, };
331
332 let (settlement_total, settlement_applied, settlement_pending, settlement_value) =
333 settlement_result
334 .as_ref()
335 .map(|(t, a, p, v)| (*t, *a, *p, *v))
336 .unwrap_or((0, 0, 0, dec!(0)));
337
338 if settlement_check_status == IntegrityStatus::Critical && settlement_result.is_ok() {
339 overall_status = IntegrityStatus::Critical;
340 } else if settlement_check_status == IntegrityStatus::Warning
341 && overall_status != IntegrityStatus::Critical
342 {
343 overall_status = IntegrityStatus::Warning;
344 }
345
346 checks.push(IntegrityCheck {
347 name: "Settlement Processing".to_string(),
348 status: settlement_check_status,
349 message: if settlement_result.is_err() {
350 "Unable to check - database query failed".to_string()
351 } else {
352 format!(
353 "{} total settlements, {} applied, {} pending",
354 settlement_total, settlement_applied, settlement_pending
355 )
356 },
357 details: None,
358 });
359
360 checks.push(IntegrityCheck {
362 name: "Settlement Balance Authority".to_string(),
363 status: IntegrityStatus::Healthy,
364 message:
365 "Settlement cash authority is EngineSnapshot.balance_ledger; DB settlement rows are audit evidence"
366 .to_string(),
367 details: None,
368 });
369
370 let account_count = i64::try_from(engine_digest.cash_wallet_count).unwrap_or(i64::MAX);
375 let (fill_count, fill_volume) = fill_volume_result
376 .as_ref()
377 .map(|(c, v)| (*c, *v))
378 .unwrap_or((0, dec!(0)));
379
380 checks.push(IntegrityCheck {
381 name: "Engine Balance Ledger".to_string(),
382 status: IntegrityStatus::Healthy,
383 message: format!(
384 "Engine balance ledger has {} nonzero wallet(s); DB current-balance projection is write-only/downstream",
385 account_count
386 ),
387 details: Some(json!({
388 "cash_digest": engine_digest.cash_digest
389 })),
390 });
391
392 {
394 use hypercall_journal::checkpoint::{checkpoint_path_for, read_checkpoint};
395
396 let wal_path = app_state.runtime_config.wal_path.clone();
397 let checkpoint_path = checkpoint_path_for(&wal_path);
398
399 let wal_metadata = tokio::fs::metadata(&wal_path).await;
403 let wal_check = match (wal_metadata, read_checkpoint(&checkpoint_path)) {
404 (Ok(meta), Ok(checkpoint)) => {
405 let file_size = meta.len();
406 let unreplicated = file_size.saturating_sub(checkpoint.wal_offset);
407 let unreplicated_mb = unreplicated as f64 / (1024.0 * 1024.0);
408
409 const WARNING_THRESHOLD: u64 = 100 * 1024 * 1024; const CRITICAL_THRESHOLD: u64 = 1024 * 1024 * 1024; let status = if unreplicated > CRITICAL_THRESHOLD {
413 IntegrityStatus::Critical
414 } else if unreplicated > WARNING_THRESHOLD {
415 IntegrityStatus::Warning
416 } else {
417 IntegrityStatus::Healthy
418 };
419
420 if status == IntegrityStatus::Critical {
421 overall_status = IntegrityStatus::Critical;
422 } else if status == IntegrityStatus::Warning
423 && overall_status != IntegrityStatus::Critical
424 {
425 overall_status = IntegrityStatus::Warning;
426 }
427
428 IntegrityCheck {
429 name: "WAL Unreplicated Tail".to_string(),
430 status,
431 message: format!(
432 "Unreplicated tail: {:.1} MB (file: {} bytes, checkpoint: {} bytes)",
433 unreplicated_mb, file_size, checkpoint.wal_offset
434 ),
435 details: Some(json!({
436 "wal_file_bytes": file_size,
437 "checkpoint_offset_bytes": checkpoint.wal_offset,
438 "unreplicated_bytes": unreplicated,
439 "unreplicated_mb": format!("{:.1}", unreplicated_mb),
440 })),
441 }
442 }
443 (Err(e), _) if e.kind() == std::io::ErrorKind::NotFound => {
444 IntegrityCheck {
446 name: "WAL Unreplicated Tail".to_string(),
447 status: IntegrityStatus::Healthy,
448 message: "WAL file not present (journaling may be disabled)".to_string(),
449 details: None,
450 }
451 }
452 (Err(e), _) => {
453 if overall_status != IntegrityStatus::Critical {
455 overall_status = IntegrityStatus::Warning;
456 }
457 IntegrityCheck {
458 name: "WAL Unreplicated Tail".to_string(),
459 status: IntegrityStatus::Warning,
460 message: format!("Unable to stat WAL file: {}", e),
461 details: None,
462 }
463 }
464 (Ok(_), Err(e)) => {
465 if overall_status != IntegrityStatus::Critical {
466 overall_status = IntegrityStatus::Warning;
467 }
468 IntegrityCheck {
469 name: "WAL Unreplicated Tail".to_string(),
470 status: IntegrityStatus::Warning,
471 message: format!("Unable to read checkpoint: {}", e),
472 details: None,
473 }
474 }
475 };
476 checks.push(wal_check);
477 }
478
479 let total_oi: Decimal = oi_by_underlying.values().copied().sum();
481
482 let unique_instruments = net_positions_by_symbol.len() as i64;
483
484 let report = IntegrityReport {
486 timestamp,
487 overall_status,
488 checks,
489 ledger_summary: LedgerSummary {
490 total_balance: None,
491 authority_state: "engine_owned_not_db_aggregated".to_string(),
492 account_count,
493 total_fills: fill_count,
494 total_volume: format!("{:.2}", fill_volume),
495 },
496 position_summary: PositionSummary {
497 total_open_interest: format!("{:.4}", total_oi),
498 unique_instruments,
499 net_position_imbalances: position_balance.imbalances,
500 },
501 settlement_summary: SettlementSummary {
502 total: settlement_total,
503 applied: settlement_applied,
504 pending: settlement_pending,
505 total_value: format!("{:.2}", settlement_value),
506 },
507 };
508
509 (StatusCode::OK, SonicJson(report)).into_response()
510}
511
512#[derive(Debug, Serialize, utoipa::ToSchema)]
514pub struct CrossedOrderbook {
515 pub symbol: String,
517 pub best_bid: f64,
519 pub best_ask: f64,
521 pub spread: f64,
523 pub age_seconds: i64,
525 pub last_l2_seq: Option<i64>,
527 pub bid_levels: usize,
529 pub ask_levels: usize,
531 pub top_bids: Vec<(f64, f64)>,
533 pub top_asks: Vec<(f64, f64)>,
535}
536
537#[derive(Debug, Serialize, utoipa::ToSchema)]
539pub struct OrderbookIntegrityReport {
540 pub timestamp: String,
542 pub total_orderbooks: usize,
544 pub crossed_count: usize,
546 pub stale_count: usize,
548 pub crossed_orderbooks: Vec<CrossedOrderbook>,
550}
551
552#[utoipa::path(
556 get,
557 path = "/monitoring/orderbooks",
558 responses(
559 (status = 200, description = "Orderbook integrity report", body = OrderbookIntegrityReport),
560 (status = 401, description = "Invalid or missing X-Admin-Key header")
561 ),
562 tag = "Monitoring",
563 security(("admin_key" = []))
564)]
565pub async fn orderbook_integrity(State(app_state): State<AdminState>) -> impl IntoResponse {
566 let quotes = app_state.quote_provider.all_quotes();
567 let staleness = app_state.quote_provider.staleness();
568 let l2_seq = app_state.quote_provider.l2_seq();
569 let now = chrono::Utc::now();
570
571 let mut crossed_orderbooks = Vec::new();
572 let age_seconds = staleness.as_secs() as i64;
573
574 for (symbol, quote) in "es {
575 if let (Some(bid), Some(ask)) = (quote.best_bid, quote.best_ask) {
576 if bid > 0.0 && ask > 0.0 && bid >= ask {
577 crossed_orderbooks.push(CrossedOrderbook {
578 symbol: symbol.clone(),
579 best_bid: bid,
580 best_ask: ask,
581 spread: ask - bid,
582 age_seconds,
583 last_l2_seq: Some(l2_seq),
584 bid_levels: quote.bids.len(),
585 ask_levels: quote.asks.len(),
586 top_bids: quote.bids.iter().take(10).cloned().collect(),
587 top_asks: quote.asks.iter().take(10).cloned().collect(),
588 });
589 }
590 }
591 }
592
593 crossed_orderbooks.sort_by(|a, b| {
595 a.spread
596 .partial_cmp(&b.spread)
597 .unwrap_or(std::cmp::Ordering::Equal)
598 });
599
600 let stale_count = if age_seconds > 30 { quotes.len() } else { 0 };
602
603 let report = OrderbookIntegrityReport {
604 timestamp: now.to_rfc3339(),
605 total_orderbooks: quotes.len(),
606 crossed_count: crossed_orderbooks.len(),
607 stale_count,
608 crossed_orderbooks,
609 };
610
611 (StatusCode::OK, SonicJson(report)).into_response()
612}
613
#[cfg(test)]
mod tests {
    use super::*;

    const PERP_SYMBOL: &str = "BTC-PERP";
    const OPTION_SYMBOL: &str = "ETH-20260501-2000-C";

    /// Builds the per-symbol net position map consumed by `evaluate_position_balance`.
    fn net_positions(entries: &[(&str, Decimal)]) -> HashMap<String, Decimal> {
        entries
            .iter()
            .map(|(symbol, net)| ((*symbol).to_owned(), *net))
            .collect()
    }

    /// A non-zero perp position must not affect the zero-sum status; it is only reported.
    #[test]
    fn position_balance_excludes_external_perp_exposure() {
        let evaluation = evaluate_position_balance(&net_positions(&[
            (PERP_SYMBOL, dec!(-0.031)),
            (OPTION_SYMBOL, dec!(0)),
        ]));

        assert_eq!(evaluation.status, IntegrityStatus::Healthy);
        assert!(evaluation.imbalances.is_empty());
        assert_eq!(evaluation.total_imbalance, dec!(0));

        let expected_exposures = vec![PerpExposure {
            symbol: PERP_SYMBOL.to_string(),
            net_position: "-0.031".to_string(),
        }];
        assert_eq!(evaluation.excluded_perp_exposures, expected_exposures);
    }

    /// An option imbalance is still flagged when a perp exposure is present alongside it.
    #[test]
    fn position_balance_flags_option_imbalance_even_with_perp_exposure() {
        let evaluation = evaluate_position_balance(&net_positions(&[
            (PERP_SYMBOL, dec!(-0.031)),
            (OPTION_SYMBOL, dec!(0.02)),
        ]));

        assert_eq!(evaluation.status, IntegrityStatus::Critical);
        assert_eq!(evaluation.total_imbalance, dec!(0.02));

        let expected_imbalances = vec![PositionImbalance {
            symbol: OPTION_SYMBOL.to_string(),
            net_position: "0.02".to_string(),
            severity: IntegrityStatus::Critical,
        }];
        assert_eq!(evaluation.imbalances, expected_imbalances);
        assert_eq!(evaluation.excluded_perp_exposures.len(), 1);
    }

    /// An imbalance between the warning and critical thresholds is reported as a warning.
    #[test]
    fn position_balance_preserves_warning_threshold_for_small_option_imbalance() {
        let evaluation =
            evaluate_position_balance(&net_positions(&[(OPTION_SYMBOL, dec!(0.001))]));

        assert_eq!(evaluation.status, IntegrityStatus::Warning);
        assert_eq!(evaluation.imbalances[0].severity, IntegrityStatus::Warning);
    }
}