1use super::state::{
7 state_str, AccountLiquidationStatus, FullLiquidationMetadata, LiquidatedMetadata,
8 LiquidationState, PartialLiquidationMetadata,
9};
10use crate::rsm::MarginMode;
11use hypercall_db::LiquidationReader;
12use hypercall_types::WalletAddress;
13use rust_decimal::Decimal;
14use serde_json::Value as JsonValue;
15use std::collections::HashMap;
16use std::sync::{Arc, RwLock};
17use tracing::{debug, info, warn};
18
19const DEFAULT_STATE: LiquidationState = LiquidationState::Healthy;
21
22pub struct LiquidationCache {
29 states: Arc<RwLock<HashMap<WalletAddress, AccountLiquidationStatus>>>,
31}
32
33impl LiquidationCache {
34 pub fn new() -> Self {
36 Self {
37 states: Arc::new(RwLock::new(HashMap::new())),
38 }
39 }
40
41 pub fn get_state_sync(&self, wallet: &WalletAddress) -> LiquidationState {
49 let cache = self.states.read().expect("RwLock poisoned");
50 cache
51 .get(wallet)
52 .map(|s| s.state.clone())
53 .unwrap_or(DEFAULT_STATE)
54 }
55
56 pub fn is_pre_liquidation_sync(&self, wallet: &WalletAddress) -> bool {
58 self.get_state_sync(wallet).is_pre_liquidation()
59 }
60
61 pub fn should_block_risk_increasing_sync(&self, wallet: &WalletAddress) -> bool {
63 self.get_state_sync(wallet).should_block_risk_increasing()
64 }
65
66 pub async fn get_status(&self, wallet: &WalletAddress) -> Option<AccountLiquidationStatus> {
74 let cache = self.states.read().expect("RwLock poisoned");
75 cache.get(wallet).cloned()
76 }
77
78 pub async fn get_state(&self, wallet: &WalletAddress) -> LiquidationState {
80 let cache = self.states.read().expect("RwLock poisoned");
81 cache
82 .get(wallet)
83 .map(|s| s.state.clone())
84 .unwrap_or(DEFAULT_STATE)
85 }
86
87 pub async fn get_all_wallets(&self) -> Vec<WalletAddress> {
89 let cache = self.states.read().expect("RwLock poisoned");
90 cache.keys().cloned().collect()
91 }
92
93 pub async fn get_wallets_in_state(&self, state_type: &str) -> Vec<WalletAddress> {
95 let cache = self.states.read().expect("RwLock poisoned");
96 cache
97 .iter()
98 .filter(|(_, status)| status.state.as_str() == state_type)
99 .map(|(wallet, _)| *wallet)
100 .collect()
101 }
102
103 pub async fn get_state_counts(&self) -> HashMap<String, usize> {
105 let cache = self.states.read().expect("RwLock poisoned");
106 let mut counts: HashMap<String, usize> = HashMap::new();
107
108 for status in cache.values() {
109 *counts.entry(status.state.as_str().to_string()).or_insert(0) += 1;
110 }
111
112 counts
113 }
114
115 pub async fn set_status(&self, status: AccountLiquidationStatus) {
117 let wallet = status.wallet;
118 let state_str = status.state.as_str();
119
120 let mut cache = self.states.write().expect("RwLock poisoned");
121 cache.insert(wallet, status);
122
123 debug!("Updated liquidation status for {}: {}", wallet, state_str);
124 }
125
126 pub async fn update_health(
128 &self,
129 wallet: &WalletAddress,
130 equity: Decimal,
131 mm_required: Decimal,
132 timestamp: u64,
133 ) {
134 let mut cache = self.states.write().expect("RwLock poisoned");
135 if let Some(status) = cache.get_mut(wallet) {
136 status.update_health(equity, mm_required, timestamp);
137 }
138 }
139
140 pub async fn enter_pre_liquidation(
144 &self,
145 wallet: &WalletAddress,
146 metadata: PartialLiquidationMetadata,
147 timestamp: u64,
148 ) -> Option<LiquidationState> {
149 let mut cache = self.states.write().expect("RwLock poisoned");
150 if let Some(status) = cache.get_mut(wallet) {
151 let previous = status.state.clone();
152 status.enter_pre_liquidation(metadata, timestamp);
153 info!(
154 "Wallet {} entered pre-liquidation (shortfall: {})",
155 wallet,
156 status.shortfall()
157 );
158 return Some(previous);
159 }
160 None
161 }
162
163 pub async fn recover_to_healthy(
167 &self,
168 wallet: &WalletAddress,
169 timestamp: u64,
170 ) -> Option<LiquidationState> {
171 let mut cache = self.states.write().expect("RwLock poisoned");
172 if let Some(status) = cache.get_mut(wallet) {
173 let previous = status.state.clone();
174 if previous.is_pre_liquidation() {
175 status.recover_to_healthy(timestamp);
176 info!("Wallet {} recovered to healthy state", wallet);
177 return Some(previous);
178 }
179 }
180 None
181 }
182
183 pub async fn enter_liquidation(
187 &self,
188 wallet: &WalletAddress,
189 metadata: FullLiquidationMetadata,
190 timestamp: u64,
191 ) -> Option<LiquidationState> {
192 let mut cache = self.states.write().expect("RwLock poisoned");
193 if let Some(status) = cache.get_mut(wallet) {
194 let previous = status.state.clone();
195 let auction_id = metadata.auction_id.clone();
196 status.enter_liquidation(metadata, timestamp);
197 info!(
198 "Wallet {} entered liquidation, auction_id={}",
199 wallet, auction_id
200 );
201 return Some(previous);
202 }
203 None
204 }
205
206 pub async fn complete_liquidation(
210 &self,
211 wallet: &WalletAddress,
212 metadata: LiquidatedMetadata,
213 timestamp: u64,
214 ) -> Option<LiquidationState> {
215 let mut cache = self.states.write().expect("RwLock poisoned");
216 if let Some(status) = cache.get_mut(wallet) {
217 let previous = status.state.clone();
218 status.complete_liquidation(metadata, timestamp);
219 info!("Wallet {} liquidation completed", wallet);
220 return Some(previous);
221 }
222 None
223 }
224
225 pub async fn remove(&self, wallet: &WalletAddress) {
227 let mut cache = self.states.write().expect("RwLock poisoned");
228 cache.remove(wallet);
229 debug!("Removed wallet {} from liquidation cache", wallet);
230 }
231
232 pub async fn settle_and_complete_liquidation(
233 &self,
234 wallet: &WalletAddress,
235 auction_id: &str,
236 liquidator: &WalletAddress,
237 settlement_value: Decimal,
238 timestamp: u64,
239 ) -> Option<LiquidationState> {
240 info!(
241 target: "liquidation::onchain",
242 wallet = %wallet,
243 auction_id = %auction_id,
244 liquidator = %liquidator,
245 settlement_value = %settlement_value,
246 "ONCHAIN_CALL: LiquidationContract.settleAuction(auction_id={}, liquidator={}, settlement_value={})",
247 auction_id, liquidator, settlement_value
248 );
249
250 info!(
251 target: "liquidation::onchain",
252 wallet = %wallet,
253 auction_id = %auction_id,
254 "ONCHAIN_CALL: RootCommitter.commitMarginRoot() - updating merkle root post-liquidation"
255 );
256
257 let Some(cached_auction_id) = self
258 .get_state(wallet)
259 .await
260 .auction_id()
261 .map(str::to_string)
262 else {
263 return None;
264 };
265 if cached_auction_id != auction_id {
266 warn!(
267 wallet = %wallet,
268 requested_auction_id = %auction_id,
269 cached_auction_id = %cached_auction_id,
270 "Refusing to complete liquidation with mismatched auction ids"
271 );
272 return None;
273 }
274
275 self.complete_liquidation(
276 wallet,
277 LiquidatedMetadata {
278 auction_id: cached_auction_id,
279 completed_at: timestamp,
280 winner: Some(*liquidator),
281 bonus: Decimal::ZERO,
282 tx_hash: None,
283 },
284 timestamp,
285 )
286 .await
287 }
288
289 pub async fn init_if_absent(
291 &self,
292 wallet: WalletAddress,
293 margin_mode: MarginMode,
294 equity: Decimal,
295 mm_required: Decimal,
296 timestamp: u64,
297 ) {
298 let mut cache = self.states.write().expect("RwLock poisoned");
299 cache.entry(wallet).or_insert_with(|| {
300 AccountLiquidationStatus::healthy(wallet, margin_mode, equity, mm_required, timestamp)
301 });
302 }
303
304 #[cfg(test)]
306 pub async fn clear(&self) {
307 let mut cache = self.states.write().expect("RwLock poisoned");
308 cache.clear();
309 }
310
311 pub async fn load_from_db(
317 &self,
318 handler: &hypercall_db_diesel::DieselDb,
319 ) -> anyhow::Result<usize> {
320 let records = handler.get_all_liquidation_states().await?;
321 let count = records.len();
322
323 let mut cache = self.states.write().expect("RwLock poisoned");
324 for record in records {
325 let wallet = record.wallet_address;
326 let status = Self::record_to_status(record).map_err(|error| {
327 anyhow::anyhow!(
328 "failed to parse persisted liquidation state for {}: {}",
329 wallet,
330 error
331 )
332 })?;
333 debug!(
334 "Loaded liquidation state for {}: {}",
335 status.wallet,
336 status.state.as_str()
337 );
338 cache.insert(status.wallet, status);
339 }
340
341 info!("Loaded {} liquidation states from database", count);
342 Ok(count)
343 }
344
345 pub(crate) fn record_to_status(
347 record: hypercall_db::LiquidationStateRecord,
348 ) -> anyhow::Result<AccountLiquidationStatus> {
349 let updated_at = persisted_updated_at_ms(&record)?;
350 let margin_mode: MarginMode = record
351 .margin_mode
352 .parse()
353 .map_err(|_| anyhow::anyhow!("Unknown margin mode: {}", record.margin_mode))?;
354
355 let state = match record.state.as_str() {
356 state_str::HEALTHY => LiquidationState::Healthy,
357 state_str::PRE_LIQUIDATION => {
358 let mm_shortfall = record
359 .mm_shortfall
360 .unwrap_or_else(|| (record.mm_required - record.equity).max(Decimal::ZERO));
361 LiquidationState::PreLiquidation(PartialLiquidationMetadata {
362 entered_at: required_i64(record.entered_pre_liq_at, "entered_pre_liq_at")?,
363 mm_shortfall,
364 target_equity: required_decimal(record.target_equity, "target_equity")?,
365 escalation_deadline: required_i64(
366 record.escalation_deadline,
367 "escalation_deadline",
368 )?,
369 last_reprice_at: optional_i64(record.last_reprice_at),
370 active_order_request_ids: match record.partial_order_request_ids.as_ref() {
371 Some(_) => json_string_array(
372 record.partial_order_request_ids.as_ref(),
373 "partial_order_request_ids",
374 )?,
375 None => Vec::new(),
376 },
377 active_order_client_ids: match record.partial_order_client_ids.as_ref() {
378 Some(_) => json_string_array(
379 record.partial_order_client_ids.as_ref(),
380 "partial_order_client_ids",
381 )?,
382 None => Vec::new(),
383 },
384 bonus_bps: {
385 let raw = required_i32(record.partial_bonus_bps, "partial_bonus_bps")?;
386 if raw < 0 {
387 warn!("partial_bonus_bps is negative ({}), clamping to 0", raw);
388 0u32
389 } else {
390 raw as u32
391 }
392 },
393 pending_full_auction_id: record.auction_id,
394 pending_full_request_id: record.request_id,
395 pending_full_tx_hash: record.tx_hash,
396 pending_full_margin_needed: record.margin_needed,
397 })
398 }
399 state_str::IN_LIQUIDATION => LiquidationState::InLiquidation(FullLiquidationMetadata {
400 auction_id: required_string(record.auction_id, "auction_id")?,
401 request_id: record.request_id,
402 tx_hash: record.tx_hash,
403 started_at: required_i64(record.auction_started_at, "auction_started_at")?,
404 chain_start_time: optional_i64(record.chain_start_time),
405 margin_needed: required_decimal(record.margin_needed, "margin_needed")?,
406 stop_request_id: record.stop_request_id,
407 stop_tx_hash: record.stop_tx_hash,
408 }),
409 state_str::LIQUIDATED => LiquidationState::Liquidated(LiquidatedMetadata {
410 auction_id: required_string(record.auction_id, "auction_id")?,
411 completed_at: required_i64(record.liquidated_at, "liquidated_at")?,
412 winner: record.resolved_winner,
413 bonus: required_decimal(record.resolved_bonus, "resolved_bonus")?,
414 tx_hash: record.resolution_tx_hash,
415 }),
416 _ => return Err(anyhow::anyhow!("Unknown state: {}", record.state)),
417 };
418
419 Ok(AccountLiquidationStatus {
420 wallet: record.wallet_address,
421 state,
422 margin_mode,
423 equity: record.equity,
424 mm_required: record.mm_required,
425 maintenance_margin: record.maintenance_margin,
426 updated_at,
427 })
428 }
429}
430
431fn required_decimal(value: Option<Decimal>, field: &str) -> anyhow::Result<Decimal> {
432 value.ok_or_else(|| anyhow::anyhow!("missing required liquidation field '{}'", field))
433}
434
435fn required_i64(value: Option<i64>, field: &str) -> anyhow::Result<u64> {
436 let raw =
437 value.ok_or_else(|| anyhow::anyhow!("missing required liquidation field '{}'", field))?;
438 u64::try_from(raw)
439 .map_err(|_| anyhow::anyhow!("negative value in liquidation field '{}': {}", field, raw))
440}
441
442fn required_i32(value: Option<i32>, field: &str) -> anyhow::Result<i32> {
443 value.ok_or_else(|| anyhow::anyhow!("missing required liquidation field '{}'", field))
444}
445
446fn optional_i64(value: Option<i64>) -> Option<u64> {
447 value.map(|raw| {
448 u64::try_from(raw).unwrap_or_else(|_| {
449 panic!(
450 "STATE_CORRUPTION: negative persisted liquidation timestamp/block value {}",
451 raw
452 )
453 })
454 })
455}
456
457fn persisted_updated_at_ms(record: &hypercall_db::LiquidationStateRecord) -> anyhow::Result<u64> {
458 if let Some(updated_at_ms) = optional_i64(record.updated_at_ms) {
459 return Ok(updated_at_ms);
460 }
461
462 if let Some(updated_at) = record.updated_at {
463 return Ok(updated_at.and_utc().timestamp_millis() as u64);
464 }
465
466 if let Some(created_at) = record.created_at {
467 return Ok(created_at.and_utc().timestamp_millis() as u64);
468 }
469
470 anyhow::bail!(
471 "missing liquidation updated_at_ms and database timestamps for wallet {}",
472 record.wallet_address
473 )
474}
475
476fn required_string(value: Option<String>, field: &str) -> anyhow::Result<String> {
477 let value =
478 value.ok_or_else(|| anyhow::anyhow!("missing required liquidation field '{}'", field))?;
479 if value.trim().is_empty() {
480 anyhow::bail!("liquidation field '{}' must not be empty", field);
481 }
482 Ok(value)
483}
484
485fn json_string_array(value: Option<&JsonValue>, field: &str) -> anyhow::Result<Vec<String>> {
486 let Some(value) = value else {
487 return Ok(Vec::new());
488 };
489 let array = value.as_array().ok_or_else(|| {
490 anyhow::anyhow!(
491 "liquidation field '{}' must be a JSON array of strings",
492 field
493 )
494 })?;
495 array
496 .iter()
497 .map(|entry| {
498 entry.as_str().map(ToOwned::to_owned).ok_or_else(|| {
499 anyhow::anyhow!(
500 "liquidation field '{}' must contain only string values",
501 field
502 )
503 })
504 })
505 .collect()
506}
507
508impl Default for LiquidationCache {
509 fn default() -> Self {
510 Self::new()
511 }
512}
513
514#[cfg(test)]
515mod tests {
516 use super::*;
517 use chrono::Utc;
518 use rust_decimal_macros::dec;
519
520 fn test_wallet() -> WalletAddress {
521 "0x1234567890123456789012345678901234567890"
522 .parse()
523 .unwrap()
524 }
525
526 fn test_wallet_2() -> WalletAddress {
527 "0xabcdefabcdefabcdefabcdefabcdefabcdefabcd"
528 .parse()
529 .unwrap()
530 }
531
532 fn partial_metadata(shortfall: Decimal) -> PartialLiquidationMetadata {
533 PartialLiquidationMetadata {
534 entered_at: 1_500,
535 mm_shortfall: shortfall,
536 target_equity: dec!(5_500),
537 escalation_deadline: 61_500,
538 last_reprice_at: Some(1_750),
539 active_order_request_ids: vec!["req-1".to_string()],
540 active_order_client_ids: vec!["liq-1".to_string()],
541 bonus_bps: 100,
542 pending_full_auction_id: None,
543 pending_full_request_id: None,
544 pending_full_tx_hash: None,
545 pending_full_margin_needed: None,
546 }
547 }
548
549 #[tokio::test]
550 async fn test_empty_cache_returns_healthy() {
551 let cache = LiquidationCache::new();
552 let wallet = test_wallet();
553
554 assert!(cache.get_state(&wallet).await.is_healthy());
556
557 assert!(cache.get_state_sync(&wallet).is_healthy());
559 assert!(!cache.should_block_risk_increasing_sync(&wallet));
560 }
561
562 #[tokio::test]
563 async fn test_set_and_get_status() {
564 let cache = LiquidationCache::new();
565 let wallet = test_wallet();
566
567 let status = AccountLiquidationStatus::healthy(
568 wallet,
569 MarginMode::Standard,
570 dec!(10000),
571 dec!(5000),
572 1000,
573 );
574
575 cache.set_status(status).await;
576
577 let retrieved = cache.get_status(&wallet).await.unwrap();
578 assert!(retrieved.state.is_healthy());
579 assert_eq!(retrieved.equity, dec!(10000));
580 }
581
582 #[tokio::test]
583 async fn test_pre_liquidation_transition() {
584 let cache = LiquidationCache::new();
585 let wallet = test_wallet();
586
587 let status = AccountLiquidationStatus::healthy(
589 wallet,
590 MarginMode::Standard,
591 dec!(4000),
592 dec!(5000),
593 1000,
594 );
595 cache.set_status(status).await;
596
597 let previous = cache
599 .enter_pre_liquidation(&wallet, partial_metadata(dec!(1000)), 2000)
600 .await;
601 assert!(previous.is_some());
602 assert!(previous.unwrap().is_healthy());
603
604 assert!(cache.is_pre_liquidation_sync(&wallet));
606 assert!(cache.should_block_risk_increasing_sync(&wallet));
607 }
608
609 #[tokio::test]
610 async fn test_recovery_transition() {
611 let cache = LiquidationCache::new();
612 let wallet = test_wallet();
613
614 let mut status = AccountLiquidationStatus::healthy(
616 wallet,
617 MarginMode::Standard,
618 dec!(4000),
619 dec!(5000),
620 1000,
621 );
622 status.enter_pre_liquidation(partial_metadata(dec!(1000)), 1500);
623 cache.set_status(status).await;
624
625 assert!(cache.is_pre_liquidation_sync(&wallet));
626
627 let previous = cache.recover_to_healthy(&wallet, 2000).await;
629 assert!(previous.is_some());
630 assert!(previous.unwrap().is_pre_liquidation());
631
632 assert!(!cache.is_pre_liquidation_sync(&wallet));
634 assert!(!cache.should_block_risk_increasing_sync(&wallet));
635 }
636
637 #[tokio::test]
638 async fn test_get_wallets_in_state() {
639 let cache = LiquidationCache::new();
640 let wallet1 = test_wallet();
641 let wallet2 = test_wallet_2();
642
643 let status1 = AccountLiquidationStatus::healthy(
645 wallet1,
646 MarginMode::Standard,
647 dec!(10000),
648 dec!(5000),
649 1000,
650 );
651 cache.set_status(status1).await;
652
653 let mut status2 = AccountLiquidationStatus::healthy(
654 wallet2,
655 MarginMode::Portfolio,
656 dec!(4000),
657 dec!(5000),
658 1000,
659 );
660 status2.enter_pre_liquidation(partial_metadata(dec!(1000)), 1500);
661 cache.set_status(status2).await;
662
663 let healthy = cache.get_wallets_in_state("Healthy").await;
665 assert_eq!(healthy.len(), 1);
666 assert_eq!(healthy[0], wallet1);
667
668 let pre_liq = cache.get_wallets_in_state("PreLiquidation").await;
669 assert_eq!(pre_liq.len(), 1);
670 assert_eq!(pre_liq[0], wallet2);
671 }
672
673 #[tokio::test]
674 async fn test_state_counts() {
675 let cache = LiquidationCache::new();
676
677 for i in 0..5 {
679 let wallet: WalletAddress = format!("0x{:040x}", i).parse().unwrap();
680 let status = AccountLiquidationStatus::healthy(
681 wallet,
682 MarginMode::Standard,
683 dec!(10000),
684 dec!(5000),
685 1000,
686 );
687 cache.set_status(status).await;
688 }
689
690 for i in 5..7 {
692 let wallet: WalletAddress = format!("0x{:040x}", i).parse().unwrap();
693 let mut status = AccountLiquidationStatus::healthy(
694 wallet,
695 MarginMode::Standard,
696 dec!(4000),
697 dec!(5000),
698 1000,
699 );
700 status.enter_pre_liquidation(partial_metadata(dec!(1000)), 1500);
701 cache.set_status(status).await;
702 }
703
704 let counts = cache.get_state_counts().await;
705 assert_eq!(counts.get("Healthy"), Some(&5));
706 assert_eq!(counts.get("PreLiquidation"), Some(&2));
707 }
708
709 #[tokio::test]
710 async fn test_init_if_absent() {
711 let cache = LiquidationCache::new();
712 let wallet = test_wallet();
713
714 cache
716 .init_if_absent(wallet, MarginMode::Standard, dec!(10000), dec!(5000), 1000)
717 .await;
718
719 let status = cache.get_status(&wallet).await.unwrap();
720 assert_eq!(status.equity, dec!(10000));
721
722 cache
724 .init_if_absent(wallet, MarginMode::Standard, dec!(20000), dec!(5000), 2000)
725 .await;
726
727 let status = cache.get_status(&wallet).await.unwrap();
728 assert_eq!(status.equity, dec!(10000)); }
730
731 #[test]
732 fn test_record_to_status_requires_partial_bonus_bps() {
733 let now = Utc::now().naive_utc();
734 let error = LiquidationCache::record_to_status(hypercall_db::LiquidationStateRecord {
735 wallet_address: test_wallet(),
736 state: state_str::PRE_LIQUIDATION.to_string(),
737 margin_mode: MarginMode::Standard.as_str().to_string(),
738 equity: dec!(4000),
739 mm_required: dec!(5000),
740 maintenance_margin: dec!(-1000),
741 liquidation_mode: Some("partial".to_string()),
742 target_equity: Some(dec!(6000)),
743 entered_pre_liq_at: Some(2_000),
744 mm_shortfall: Some(dec!(1000)),
745 escalation_deadline: Some(62_000),
746 last_reprice_at: Some(3_000),
747 partial_order_request_ids: Some(JsonValue::Array(vec![JsonValue::String(
748 "req-1".to_string(),
749 )])),
750 partial_order_client_ids: Some(JsonValue::Array(vec![JsonValue::String(
751 "liq-1".to_string(),
752 )])),
753 partial_bonus_bps: None,
754 auction_id: None,
755 request_id: None,
756 tx_hash: None,
757 auction_started_at: None,
758 chain_start_time: None,
759 margin_needed: None,
760 stop_request_id: None,
761 stop_tx_hash: None,
762 liquidated_at: None,
763 resolved_winner: None,
764 resolved_bonus: None,
765 resolution_tx_hash: None,
766 last_observed_block: None,
767 updated_at_ms: Some(3_500),
768 created_at: Some(now),
769 updated_at: Some(now),
770 })
771 .unwrap_err();
772
773 assert!(error
774 .to_string()
775 .contains("missing required liquidation field 'partial_bonus_bps'"));
776 }
777}