1use super::cache::LiquidationCache;
7use super::executor::LiquidationExecutor;
8use super::health_check::{check_portfolio_health, check_standard_health, LiquidationHealthResult};
9use super::partial::{
10 compute_partial_bonus_bps, greedy_partial_plan, is_above_partial_target, required_mm_relief,
11 target_equity_from_mm, LiquidationSliceCandidate, PartialLiquidationOrderPlan,
12};
13use super::state::{
14 has_material_projection_change, AccountLiquidationStatus, LiquidationState,
15 PartialLiquidationMetadata,
16};
17use crate::portfolio::{PortfolioService, PositionData};
18use crate::read_cache::tier::TierCache;
19use crate::rsm::liquidation_manager::LiquidationManager;
20use crate::rsm::margin_service::SpanMarginService;
21use crate::rsm::unified_engine::{increment_pending_requests, UnifiedEngineRequest};
22use crate::rsm::MarginMode;
23use crate::shared::order_types::{to_contract_units_decimal, ParsedSymbol};
24use crate::standard_margin::StandardAccountBuilder;
25use anyhow::{anyhow, Context};
26use hypercall_margin::standard::StandardAccount;
27use hypercall_margin::{PortfolioMarginOptionKey, PortfolioMarginSnapshot};
28use hypercall_runtime_api::{OrderSnapshotProvider, QuoteProvider, RuntimeOrderSummary};
29use hypercall_types::WalletAddress;
30use hypercall_types::{
31 EngineMessage, LiquidationStateMessage, LiquidationStateType, OrderAction, OrderActionMessage,
32 OrderInfo, OrderUpdateMessage, OrderUpdateStatus, Side, TimeInForce,
33};
34use metrics::{counter, gauge};
35use rust_decimal::Decimal;
36use rust_decimal_macros::dec;
37use std::collections::{HashMap, HashSet};
38use std::sync::Arc;
39use std::time::{Duration, Instant};
40use tokio::sync::broadcast;
41use tokio::sync::mpsc;
42use tokio::time::{interval, timeout};
43use tracing::{debug, error, info, warn};
44use uuid::Uuid;
45
/// Engine response timeout of 10 seconds.
///
/// NOTE(review): not referenced in the visible part of this file; presumably bounds how long
/// the watcher waits for an engine reply to a submitted request — confirm at the use site.
const ENGINE_RESPONSE_TIMEOUT: Duration = Duration::from_secs(10);
/// Dust threshold (1e-8): close sizes, required relief amounts and residual position sizes at
/// or below this value are treated as zero by the partial-liquidation planning code.
const PARTIAL_LIQ_MIN_SIZE: Decimal = dec!(0.00000001);
/// Upper bound on the number of plan-and-simulate rounds run when building
/// partial-liquidation orders.
const PARTIAL_PLAN_MAX_ROUNDS: usize = 3;
49
/// Result of one partial-liquidation cycle for a wallet.
#[derive(Debug)]
struct PartialCycleOutcome {
    /// Pre-liquidation metadata as updated by the cycle.
    metadata: PartialLiquidationMetadata,
    /// `true` only when the account is above the partial target and none of the tracked
    /// liquidation orders were still open; the caller then moves the wallet back to healthy.
    can_recover: bool,
}
55
/// Outcome of synchronizing a wallet's open orders against the tracked liquidation orders.
#[derive(Debug)]
struct OpenOrderSyncResult {
    /// Client ids of tracked liquidation orders that were present in the open-order snapshot
    /// (captured before any cancel request is sent).
    open_liquidation_client_ids: Vec<String>,
}
60
/// An open order selected for cancellation.
#[derive(Debug, Clone)]
struct CancelTarget {
    /// Order id of the order to cancel.
    order_id: u64,
    /// Client id carried by the order, if it has one.
    client_id: Option<String>,
}
66
/// Tunable parameters of the [`LiquidationWatcher`].
#[derive(Clone, Debug)]
pub struct LiquidationWatcherConfig {
    /// Period of the health-check loop.
    pub poll_interval: Duration,
    /// Span added to the entry time to obtain the escalation deadline; the same span past
    /// that deadline is used to expire a stale pending full-liquidation request.
    pub full_escalation_timeout: Duration,
    /// Minimum shortfall a liquidatable healthy account must have before it is moved into
    /// pre-liquidation.
    pub min_shortfall_threshold: Decimal,
    /// Buffer, in basis points, passed to the partial-target helpers
    /// (`target_equity_from_mm`, `is_above_partial_target`, `required_mm_relief`).
    pub partial_target_buffer_bps: u32,
    /// Minimum time between two repricing rounds of partial-liquidation orders.
    pub partial_reprice_interval: Duration,
    /// Bonus, in basis points, assigned when an account enters pre-liquidation; also the
    /// start value handed to `compute_partial_bonus_bps`.
    pub partial_bonus_start_bps: u32,
    /// Maximum bonus, in basis points, handed to `compute_partial_bonus_bps`.
    pub partial_bonus_max_bps: u32,
    /// Ramp duration, in milliseconds, handed to `compute_partial_bonus_bps`.
    pub partial_bonus_ramp_ms: u64,
}
87
88impl Default for LiquidationWatcherConfig {
89 fn default() -> Self {
90 Self {
91 poll_interval: Duration::from_secs(5),
92 full_escalation_timeout: Duration::from_secs(60),
93 min_shortfall_threshold: dec!(0),
94 partial_target_buffer_bps: 500,
95 partial_reprice_interval: Duration::from_secs(5),
96 partial_bonus_start_bps: 50,
97 partial_bonus_max_bps: 1_000,
98 partial_bonus_ramp_ms: 300_000,
99 }
100 }
101}
102
103impl LiquidationWatcherConfig {
104 pub fn from_runtime_config(config: &crate::backend_config::LiquidationRuntimeConfig) -> Self {
105 Self {
106 poll_interval: Duration::from_millis(config.health_poll_interval_ms),
107 full_escalation_timeout: Duration::from_millis(config.full_escalation_timeout_ms),
108 min_shortfall_threshold: config.min_shortfall_threshold,
109 partial_target_buffer_bps: config.partial_target_buffer_bps,
110 partial_reprice_interval: Duration::from_millis(config.partial_reprice_interval_ms),
111 partial_bonus_start_bps: config.partial_bonus_start_bps,
112 partial_bonus_max_bps: config.partial_bonus_max_bps,
113 partial_bonus_ramp_ms: config.partial_bonus_ramp_ms,
114 }
115 }
116}
117
/// Periodically checks account health and drives wallets through the liquidation states.
pub struct LiquidationWatcher {
    /// Per-wallet liquidation status store.
    cache: Arc<LiquidationCache>,
    /// Source of the portfolios (and their positions) to check.
    portfolio_service: Arc<dyn PortfolioService + Send + Sync>,
    /// Margin-mode lookup; when absent the mode is taken from the wallet's cached status.
    tier_cache: Option<Arc<TierCache>>,
    /// Margin service used for portfolio-margin health checks and simulations.
    span_margin_service: Arc<SpanMarginService>,
    /// Margin service used for standard-margin health checks and simulations.
    standard_margin_service: Arc<crate::standard_margin::StandardMarginService>,
    /// Builds portfolio-margin risk accounts and snapshots; needed for portfolio-margin wallets.
    risk_account_builder: Option<Arc<crate::rsm::portfolio_margin::RiskAccountBuilder>>,
    /// Builds standard-margin accounts; needed for standard-margin wallets.
    standard_account_builder: Option<Arc<StandardAccountBuilder>>,
    /// Optional quote source, attached with `with_quote_provider`.
    quote_provider: Option<Arc<dyn QuoteProvider>>,
    /// Optional greeks cache, attached with `with_greeks_cache`.
    greeks_cache: Option<Arc<crate::read_cache::greeks::GreeksCache>>,
    /// Open-order snapshot used when synchronizing and cancelling orders.
    order_snapshot: Option<Arc<dyn OrderSnapshotProvider>>,
    /// Optional sender of unified-engine requests, attached with `with_order_sender`.
    order_sender: Option<mpsc::Sender<UnifiedEngineRequest>>,
    /// Optional sender of engine messages, attached with `with_event_sender`.
    event_sender: Option<mpsc::UnboundedSender<EngineMessage>>,
    /// Starts and stops liquidation auctions; without it a full-liquidation escalation is
    /// only logged.
    executor: Option<Arc<LiquidationExecutor>>,
    /// Watcher configuration.
    config: LiquidationWatcherConfig,
}
151
152impl LiquidationWatcher {
153 pub fn new(
155 cache: Arc<LiquidationCache>,
156 portfolio_service: Arc<dyn PortfolioService + Send + Sync>,
157 tier_cache: Arc<TierCache>,
158 span_margin_service: Arc<SpanMarginService>,
159 standard_margin_service: Arc<crate::standard_margin::StandardMarginService>,
160 config: LiquidationWatcherConfig,
161 ) -> Self {
162 Self {
163 cache,
164 portfolio_service,
165 tier_cache: Some(tier_cache),
166 span_margin_service,
167 standard_margin_service,
168 risk_account_builder: None,
169 standard_account_builder: None,
170 quote_provider: None,
171 greeks_cache: None,
172 order_snapshot: None,
173 order_sender: None,
174 event_sender: None,
175 executor: None,
176 config,
177 }
178 }
179
180 #[cfg(test)]
181 fn new_for_tests_without_tier_cache(
182 cache: Arc<LiquidationCache>,
183 portfolio_service: Arc<dyn PortfolioService + Send + Sync>,
184 span_margin_service: Arc<SpanMarginService>,
185 standard_margin_service: Arc<crate::standard_margin::StandardMarginService>,
186 config: LiquidationWatcherConfig,
187 ) -> Self {
188 Self {
189 cache,
190 portfolio_service,
191 tier_cache: None,
192 span_margin_service,
193 standard_margin_service,
194 risk_account_builder: None,
195 standard_account_builder: None,
196 quote_provider: None,
197 greeks_cache: None,
198 order_snapshot: None,
199 order_sender: None,
200 event_sender: None,
201 executor: None,
202 config,
203 }
204 }
205
206 pub fn with_risk_account_builder(
208 mut self,
209 builder: Arc<crate::rsm::portfolio_margin::RiskAccountBuilder>,
210 ) -> Self {
211 self.risk_account_builder = Some(builder);
212 self
213 }
214
215 pub fn with_standard_account_builder(mut self, builder: Arc<StandardAccountBuilder>) -> Self {
217 self.standard_account_builder = Some(builder);
218 self
219 }
220
221 pub fn with_quote_provider(mut self, provider: Arc<dyn QuoteProvider>) -> Self {
223 self.quote_provider = Some(provider);
224 self
225 }
226
227 pub fn with_greeks_cache(mut self, cache: Arc<crate::read_cache::greeks::GreeksCache>) -> Self {
229 self.greeks_cache = Some(cache);
230 self
231 }
232
233 pub fn with_order_snapshot(mut self, snapshot: Arc<dyn OrderSnapshotProvider>) -> Self {
235 self.order_snapshot = Some(snapshot);
236 self
237 }
238
239 pub fn with_order_sender(mut self, sender: mpsc::Sender<UnifiedEngineRequest>) -> Self {
241 self.order_sender = Some(sender);
242 self
243 }
244
245 pub fn with_event_sender(mut self, sender: mpsc::UnboundedSender<EngineMessage>) -> Self {
247 self.event_sender = Some(sender);
248 self
249 }
250
251 pub fn with_executor(mut self, executor: Arc<LiquidationExecutor>) -> Self {
253 self.executor = Some(executor);
254 self
255 }
256
257 pub async fn run(&self) {
259 info!(
260 "Starting liquidation watcher with poll interval {:?}",
261 self.config.poll_interval
262 );
263
264 let mut interval = interval(self.config.poll_interval);
265
266 loop {
267 interval.tick().await;
268
269 if let Err(e) = self.check_all_accounts().await {
270 error!("Error checking accounts for liquidation: {}", e);
271 }
272 }
273 }
274
275 pub async fn run_with_shutdown(&self, mut shutdown_rx: broadcast::Receiver<()>) {
276 info!(
277 "Starting liquidation watcher with poll interval {:?}",
278 self.config.poll_interval
279 );
280
281 let mut interval = interval(self.config.poll_interval);
282
283 loop {
284 tokio::select! {
285 _ = shutdown_rx.recv() => {
286 info!("Liquidation watcher received shutdown signal");
287 break;
288 }
289 _ = interval.tick() => {
290 if let Err(e) = self.check_all_accounts().await {
291 error!("Error checking accounts for liquidation: {}", e);
292 }
293 }
294 }
295 }
296 }
297
298 async fn check_all_accounts(&self) -> anyhow::Result<()> {
300 let now = get_timestamp_millis();
301
302 let all_portfolios = self.portfolio_service.all_portfolios().await;
306 let mut wallets_to_check: HashSet<WalletAddress> = all_portfolios.keys().copied().collect();
307 for wallet in self.cache.get_all_wallets().await {
308 if matches!(
309 self.cache.get_state(&wallet).await,
310 LiquidationState::PreLiquidation(..) | LiquidationState::InLiquidation(..)
311 ) {
312 wallets_to_check.insert(wallet);
313 }
314 }
315
316 let wallet_count = wallets_to_check.len();
317 if wallet_count == 0 {
318 gauge!("ht_liquidation_partial_active_orders").set(0.0);
319 return Ok(());
320 }
321
322 debug!("Checking {} accounts for liquidation health", wallet_count);
323
324 let mut pre_liq_count = 0;
325 let mut healthy_count = 0;
326 let mut active_partial_orders = 0usize;
327
328 for wallet in wallets_to_check {
329 let has_positions = all_portfolios
330 .get(&wallet)
331 .is_some_and(|portfolio| !portfolio.positions.is_empty());
332 let Some(existing_status) = self.cache.get_status(&wallet).await else {
333 if !has_positions {
334 continue;
335 }
336 let Ok(margin_mode) = self.margin_mode_for_wallet(&wallet).await else {
339 warn!(
340 "Failed to determine margin mode for newly discovered wallet {}",
341 wallet
342 );
343 continue;
344 };
345 let health_result = match margin_mode {
346 MarginMode::Portfolio => self.check_portfolio_margin_health(&wallet).await,
347 MarginMode::Standard => self.check_standard_margin_health(&wallet).await,
348 };
349
350 let health = match health_result {
351 Ok(h) => h,
352 Err(e) => {
353 warn!("Failed to check health for {}: {}", wallet, e);
354 continue;
355 }
356 };
357
358 self.process_health_check(&wallet, &health, margin_mode, now)
359 .await;
360
361 if let Some(status) = self.cache.get_status(&wallet).await {
362 if let LiquidationState::PreLiquidation(metadata) = &status.state {
363 active_partial_orders += metadata.active_order_client_ids.len();
364 }
365 }
366
367 if health.is_liquidatable {
368 pre_liq_count += 1;
369 } else {
370 healthy_count += 1;
371 }
372 continue;
373 };
374 if !has_positions && existing_status.state.is_healthy() {
375 continue;
376 }
377
378 let Ok(margin_mode) = self.margin_mode_for_wallet(&wallet).await else {
379 warn!(
380 "Failed to determine margin mode for tracked wallet {}",
381 wallet
382 );
383 continue;
384 };
385 let health_result = match margin_mode {
386 MarginMode::Portfolio => self.check_portfolio_margin_health(&wallet).await,
387 MarginMode::Standard => self.check_standard_margin_health(&wallet).await,
388 };
389
390 let health = match health_result {
391 Ok(h) => h,
392 Err(e) => {
393 warn!("Failed to check health for {}: {}", wallet, e);
394 continue;
395 }
396 };
397
398 self.process_health_check(&wallet, &health, margin_mode, now)
399 .await;
400
401 if let Some(status) = self.cache.get_status(&wallet).await {
402 if let LiquidationState::PreLiquidation(metadata) = &status.state {
403 active_partial_orders += metadata.active_order_client_ids.len();
404 }
405 }
406
407 if health.is_liquidatable {
408 pre_liq_count += 1;
409 } else {
410 healthy_count += 1;
411 }
412 }
413
414 gauge!("ht_liquidation_partial_active_orders").set(active_partial_orders as f64);
415
416 if pre_liq_count > 0 {
417 info!(
418 "Liquidation check complete: {} healthy, {} in/near pre-liquidation",
419 healthy_count, pre_liq_count
420 );
421 }
422
423 Ok(())
424 }
425
426 async fn margin_mode_for_wallet(&self, wallet: &WalletAddress) -> anyhow::Result<MarginMode> {
427 if let Some(tier_cache) = &self.tier_cache {
428 return tier_cache.get_margin_mode(wallet).await;
429 }
430
431 self.cache
432 .get_status(wallet)
433 .await
434 .map(|status| status.margin_mode)
435 .ok_or_else(|| {
436 anyhow!(
437 "tier_cache not configured and wallet {} not tracked",
438 wallet
439 )
440 })
441 }
442
443 async fn check_portfolio_margin_health(
445 &self,
446 wallet: &WalletAddress,
447 ) -> anyhow::Result<LiquidationHealthResult> {
448 let builder = self
449 .risk_account_builder
450 .as_ref()
451 .ok_or_else(|| anyhow!("RiskAccountBuilder not set"))?;
452
453 let account = builder
454 .build_executed_account_for_risk(wallet)
455 .await
456 .map_err(|e| anyhow!("Failed to build risk account: {:?}", e))?;
457
458 Ok(check_portfolio_health(
459 &wallet.to_string(),
460 &account,
461 self.span_margin_service.as_ref(),
462 )
463 .await)
464 }
465
466 async fn check_standard_margin_health(
468 &self,
469 wallet: &WalletAddress,
470 ) -> anyhow::Result<LiquidationHealthResult> {
471 let builder = self
472 .standard_account_builder
473 .as_ref()
474 .ok_or_else(|| anyhow!("StandardAccountBuilder not set"))?;
475
476 let account = builder
477 .build(wallet)
478 .await
479 .map_err(|e| anyhow!("Failed to build standard account: {}", e))?;
480
481 Ok(check_standard_health(
482 &wallet.to_string(),
483 &account,
484 self.standard_margin_service.as_ref(),
485 ))
486 }
487
488 async fn process_health_check(
490 &self,
491 wallet: &WalletAddress,
492 health: &LiquidationHealthResult,
493 margin_mode: MarginMode,
494 now: u64,
495 ) {
496 let equity = decimal_from_f64(health.equity, "equity");
497 let mm_required = decimal_from_f64(health.mm_required, "mm_required");
498 let shortfall = decimal_from_f64(health.shortfall(), "shortfall");
499
500 self.cache
501 .init_if_absent(*wallet, margin_mode, equity, mm_required, now)
502 .await;
503 self.cache
504 .update_health(wallet, equity, mm_required, now)
505 .await;
506
507 let Some(status) = self.cache.get_status(wallet).await else {
508 return;
509 };
510
511 match status.state.clone() {
512 LiquidationState::Healthy => {
513 if health.is_liquidatable && shortfall >= self.config.min_shortfall_threshold {
514 let metadata = PartialLiquidationMetadata {
515 entered_at: now,
516 mm_shortfall: shortfall,
517 target_equity: target_equity_from_mm(
518 mm_required,
519 self.config.partial_target_buffer_bps,
520 ),
521 escalation_deadline: now
522 + self.config.full_escalation_timeout.as_millis() as u64,
523 last_reprice_at: None,
524 active_order_request_ids: Vec::new(),
525 active_order_client_ids: Vec::new(),
526 bonus_bps: self.config.partial_bonus_start_bps,
527 pending_full_auction_id: None,
528 pending_full_request_id: None,
529 pending_full_tx_hash: None,
530 pending_full_margin_needed: None,
531 };
532
533 if let Some(previous) = self
534 .cache
535 .enter_pre_liquidation(wallet, metadata, now)
536 .await
537 {
538 counter!("ht_liquidation_partial_enter_total").increment(1);
539 if let Some(updated_status) = self.cache.get_status(wallet).await {
540 self.emit_state_change(&previous, &updated_status, now);
541 }
542 }
543
544 self.handle_pre_liquidation_state(wallet, margin_mode, now)
545 .await;
546 }
547 }
548 LiquidationState::PreLiquidation(..) => {
549 self.handle_pre_liquidation_state(wallet, margin_mode, now)
550 .await;
551 }
552 LiquidationState::InLiquidation(metadata) => {
553 if !health.is_liquidatable {
554 if metadata.stop_request_id.is_some() {
555 debug!(
556 "Wallet {} recovered while liquidation stop request is already pending",
557 wallet
558 );
559 } else if let Some(chain_start_time) = metadata.chain_start_time {
560 if let Some(executor) = &self.executor {
561 match executor.stop_auction(wallet, chain_start_time).await {
562 Ok(request_id) => {
563 info!(
564 "Enqueued stop liquidation request {} for recovered wallet {}",
565 request_id, wallet
566 );
567 }
568 Err(e) => {
569 error!(
570 "Failed to enqueue stop liquidation for wallet {}: {}",
571 wallet, e
572 );
573 }
574 }
575 }
576 } else {
577 debug!(
578 "Wallet {} recovered but liquidation has no observed chain_start_time yet",
579 wallet
580 );
581 }
582 }
583 }
584 LiquidationState::Liquidated(..) => {}
585 }
586 }
587
588 async fn handle_pre_liquidation_state(
589 &self,
590 wallet: &WalletAddress,
591 margin_mode: MarginMode,
592 now: u64,
593 ) {
594 let Some(status) = self.cache.get_status(wallet).await else {
595 return;
596 };
597
598 let LiquidationState::PreLiquidation(mut metadata) = status.state.clone() else {
599 return;
600 };
601
602 let original_metadata = metadata.clone();
603 metadata.mm_shortfall = status.shortfall();
604 metadata.target_equity =
605 target_equity_from_mm(status.mm_required, self.config.partial_target_buffer_bps);
606 let above_partial_target = is_above_partial_target(
607 status.equity,
608 status.mm_required,
609 self.config.partial_target_buffer_bps,
610 );
611
612 if metadata.pending_full_request_id.is_some() {
613 if now
614 > metadata.escalation_deadline
615 + self.config.full_escalation_timeout.as_millis() as u64
616 {
617 warn!(
618 "Clearing stale pending_full_request_id for wallet {} (deadline+timeout exceeded)",
619 wallet
620 );
621 metadata.pending_full_auction_id = None;
622 metadata.pending_full_request_id = None;
623 metadata.pending_full_tx_hash = None;
624 metadata.pending_full_margin_needed = None;
625 self.persist_pre_liquidation_metadata(wallet, metadata.clone(), now)
627 .await;
628 } else {
629 if metadata != original_metadata {
630 self.persist_pre_liquidation_metadata(wallet, metadata, now)
631 .await;
632 }
633 return;
634 }
635 }
636
637 let has_short_option_positions = self.has_short_option_positions(wallet).await;
638 if !above_partial_target
639 && (status.equity < Decimal::ZERO
640 || matches!(has_short_option_positions, Some(false))
641 || now >= metadata.escalation_deadline)
642 {
643 if let Err(error) = self
644 .sync_and_cancel_orders(wallet, &metadata.active_order_client_ids, true)
645 .await
646 {
647 warn!(
648 "Failed to synchronize orders before full-liquidation escalation for {}: {}",
649 wallet, error
650 );
651 }
652
653 if let Some(executor) = &self.executor {
654 match executor.start_auction(wallet).await {
655 Ok(auction_id) => {
656 info!(
657 "Enqueued liquidation auction {} for wallet {} from pre-liquidation",
658 auction_id, wallet
659 );
660 }
661 Err(error) => {
662 error!(
663 "Failed to enqueue liquidation auction for wallet {}: {}",
664 wallet, error
665 );
666 }
667 }
668 } else {
669 debug!(
670 "Wallet {} reached full-liquidation escalation but no executor is configured",
671 wallet
672 );
673 }
674 return;
675 }
676
677 match self
678 .run_partial_liquidation_cycle(wallet, margin_mode, &status, metadata, now)
679 .await
680 {
681 Ok(outcome) => {
682 if outcome.can_recover {
683 if let Some(previous) = self.cache.recover_to_healthy(wallet, now).await {
684 counter!("ht_liquidation_partial_exit_total").increment(1);
685 if let Some(updated_status) = self.cache.get_status(wallet).await {
686 self.emit_state_change(&previous, &updated_status, now);
687 }
688 }
689 } else if let Some(current_status) = self.cache.get_status(wallet).await {
690 if let LiquidationState::PreLiquidation(current_metadata) = current_status.state
691 {
692 if current_metadata != outcome.metadata {
693 self.persist_pre_liquidation_metadata(wallet, outcome.metadata, now)
694 .await;
695 }
696 }
697 }
698 }
699 Err(error) => {
700 error!(
701 "Failed to run partial liquidation cycle for wallet {}: {}",
702 wallet, error
703 );
704 }
705 }
706 }
707
708 async fn run_partial_liquidation_cycle(
715 &self,
716 wallet: &WalletAddress,
717 margin_mode: MarginMode,
718 status: &AccountLiquidationStatus,
719 mut metadata: PartialLiquidationMetadata,
720 now: u64,
721 ) -> anyhow::Result<PartialCycleOutcome> {
722 metadata.mm_shortfall = status.shortfall();
723 metadata.target_equity =
724 target_equity_from_mm(status.mm_required, self.config.partial_target_buffer_bps);
725 metadata.bonus_bps = compute_partial_bonus_bps(
726 self.config.partial_bonus_start_bps,
727 self.config.partial_bonus_max_bps,
728 self.config.partial_bonus_ramp_ms,
729 metadata.entered_at,
730 now,
731 );
732
733 let above_target = is_above_partial_target(
734 status.equity,
735 status.mm_required,
736 self.config.partial_target_buffer_bps,
737 );
738 let reprice_due = metadata.last_reprice_at.is_none_or(|last_reprice_at| {
739 now.saturating_sub(last_reprice_at)
740 >= self.config.partial_reprice_interval.as_millis() as u64
741 });
742
743 let sync_result = self
744 .sync_and_cancel_orders(
745 wallet,
746 &metadata.active_order_client_ids,
747 above_target || reprice_due,
748 )
749 .await?;
750
751 if above_target {
752 metadata.active_order_request_ids.clear();
753 metadata.active_order_client_ids = sync_result.open_liquidation_client_ids;
754 let can_recover = metadata.active_order_client_ids.is_empty();
755 return Ok(PartialCycleOutcome {
756 metadata,
757 can_recover,
758 });
759 }
760
761 metadata.active_order_client_ids = sync_result.open_liquidation_client_ids;
762 if !reprice_due {
763 if metadata.active_order_client_ids.is_empty() {
764 metadata.active_order_request_ids.clear();
765 }
766 return Ok(PartialCycleOutcome {
767 metadata,
768 can_recover: false,
769 });
770 }
771
772 let plans = match margin_mode {
773 MarginMode::Standard => {
774 self.build_standard_partial_orders(wallet, metadata.bonus_bps)
775 .await?
776 }
777 MarginMode::Portfolio => {
778 self.build_portfolio_partial_orders(wallet, metadata.bonus_bps)
779 .await?
780 }
781 };
782
783 metadata.last_reprice_at = Some(now);
784 metadata.active_order_request_ids.clear();
785 metadata.active_order_client_ids.clear();
786
787 if plans.is_empty() {
788 return Ok(PartialCycleOutcome {
789 metadata,
790 can_recover: false,
791 });
792 }
793
794 let mut submitted_orders = 0u64;
795 for (index, plan) in plans.iter().enumerate() {
796 match self
797 .submit_partial_liquidation_order(wallet, plan, now, index)
798 .await
799 {
800 Ok((request_id, client_id, response)) => {
801 if response.status != OrderUpdateStatus::Rejected {
802 metadata.active_order_request_ids.push(request_id);
803 metadata.active_order_client_ids.push(client_id);
804 submitted_orders += 1;
805 } else {
806 warn!(
807 "Partial liquidation order rejected for wallet {} symbol {}: {}",
808 wallet,
809 plan.symbol,
810 response
811 .reason
812 .as_deref()
813 .unwrap_or("missing rejection reason")
814 );
815 }
816 }
817 Err(error) => {
818 error!(
819 "Failed to submit partial liquidation order for wallet {} symbol {}: {}",
820 wallet, plan.symbol, error
821 );
822 }
823 }
824 }
825
826 if submitted_orders > 0 {
827 counter!("ht_liquidation_partial_orders_submitted_total").increment(submitted_orders);
828 }
829
830 Ok(PartialCycleOutcome {
831 metadata,
832 can_recover: false,
833 })
834 }
835
836 async fn persist_pre_liquidation_metadata(
837 &self,
838 wallet: &WalletAddress,
839 metadata: PartialLiquidationMetadata,
840 now: u64,
841 ) {
842 let Some(current_status) = self.cache.get_status(wallet).await else {
843 return;
844 };
845
846 if !matches!(current_status.state, LiquidationState::PreLiquidation(..)) {
847 return;
848 }
849
850 let previous_state = current_status.state.clone();
851 let mut updated_status = current_status;
852 updated_status.state = LiquidationState::PreLiquidation(metadata);
853 updated_status.updated_at = now;
854 self.cache.set_status(updated_status.clone()).await;
855 self.emit_state_change(&previous_state, &updated_status, now);
856 }
857
858 async fn sync_and_cancel_orders(
859 &self,
860 wallet: &WalletAddress,
861 tracked_liquidation_client_ids: &[String],
862 cancel_tracked_liquidation_orders: bool,
863 ) -> anyhow::Result<OpenOrderSyncResult> {
864 let order_snapshot = self
865 .order_snapshot
866 .as_ref()
867 .context("OrderSnapshotProvider not set")?;
868 let open_orders = order_snapshot.get_open_orders_for_wallet(wallet);
869 let tracked_liquidation_client_ids: HashSet<&str> = tracked_liquidation_client_ids
870 .iter()
871 .map(String::as_str)
872 .collect();
873 let open_liquidation_client_ids = open_orders
874 .iter()
875 .filter_map(|order| order.client_id.as_deref())
876 .filter(|client_id| tracked_liquidation_client_ids.contains(client_id))
877 .map(str::to_owned)
878 .collect::<Vec<_>>();
879 let Some(current_positions) = self.try_current_position_sizes(wallet).await else {
880 warn!(
881 wallet = %wallet,
882 "Skipping open-order liquidation cancel sync because portfolio snapshot is unavailable"
883 );
884 return Ok(OpenOrderSyncResult {
885 open_liquidation_client_ids,
886 });
887 };
888 let reduce_only_order_ids =
889 self.reduce_only_open_order_ids(¤t_positions, &open_orders);
890
891 let mut cancel_targets = Vec::new();
892 let mut seen_targets = HashSet::new();
893
894 for order in &open_orders {
895 let tracked_client_id = order
896 .client_id
897 .as_deref()
898 .filter(|client_id| tracked_liquidation_client_ids.contains(client_id))
899 .map(ToOwned::to_owned);
900
901 let should_cancel = !reduce_only_order_ids.contains(&order.order_id)
902 || (cancel_tracked_liquidation_orders && tracked_client_id.is_some());
903 if !should_cancel {
904 continue;
905 }
906
907 let target_key = tracked_client_id
908 .clone()
909 .unwrap_or_else(|| format!("oid:{}", order.order_id));
910 if seen_targets.insert(target_key) {
911 cancel_targets.push(CancelTarget {
912 order_id: order.order_id,
913 client_id: tracked_client_id.or_else(|| order.client_id.clone()),
914 });
915 }
916 }
917
918 for target in cancel_targets {
919 if let Err(error) = self.cancel_open_order(wallet, &target).await {
920 warn!(
921 "Failed to cancel open order {} for wallet {}: {}",
922 target.order_id, wallet, error
923 );
924 }
925 }
926
927 Ok(OpenOrderSyncResult {
928 open_liquidation_client_ids,
929 })
930 }
931
932 async fn build_standard_partial_orders(
933 &self,
934 wallet: &WalletAddress,
935 bonus_bps: u32,
936 ) -> anyhow::Result<Vec<PartialLiquidationOrderPlan>> {
937 let builder = self
938 .standard_account_builder
939 .as_ref()
940 .context("StandardAccountBuilder not set")?;
941 let expected_prices = self.partial_execution_prices(wallet, bonus_bps).await?;
942 let mut simulated_account = builder.build(wallet).await.map_err(|error| {
943 anyhow!("Failed to build standard account for {}: {}", wallet, error)
944 })?;
945 let mut aggregated_plans = Vec::new();
946
947 for _ in 0..PARTIAL_PLAN_MAX_ROUNDS {
948 let margin = self
949 .standard_margin_service
950 .compute_margin(&simulated_account);
951 let required_relief_amount = required_mm_relief(
952 margin.equity,
953 margin.position_mm,
954 self.config.partial_target_buffer_bps,
955 );
956 if required_relief_amount <= PARTIAL_LIQ_MIN_SIZE {
957 break;
958 }
959
960 let candidates = self.standard_partial_candidates(&simulated_account, &expected_prices);
961 let round_plans = greedy_partial_plan(&candidates, required_relief_amount)
962 .into_iter()
963 .filter(|plan| plan.close_size > PARTIAL_LIQ_MIN_SIZE)
964 .collect::<Vec<_>>();
965 if round_plans.is_empty() {
966 break;
967 }
968
969 for plan in round_plans {
970 self.apply_standard_close(
971 &mut simulated_account,
972 &plan.symbol,
973 plan.close_size,
974 plan.limit_price,
975 )?;
976 merge_partial_plan(&mut aggregated_plans, plan);
977 }
978 }
979
980 Ok(aggregated_plans)
981 }
982
983 async fn build_portfolio_partial_orders(
984 &self,
985 wallet: &WalletAddress,
986 bonus_bps: u32,
987 ) -> anyhow::Result<Vec<PartialLiquidationOrderPlan>> {
988 let builder = self
989 .risk_account_builder
990 .as_ref()
991 .context("RiskAccountBuilder not set")?;
992 let expected_prices = self.partial_execution_prices(wallet, bonus_bps).await?;
993 let mut simulated_snapshot = builder
994 .build_snapshot(wallet)
995 .await
996 .map_err(|error| anyhow!("Failed to build PM snapshot for {}: {:?}", wallet, error))?
997 .without_open_orders();
998 let mut aggregated_plans = Vec::new();
999
1000 for _ in 0..PARTIAL_PLAN_MAX_ROUNDS {
1001 let margin = self.compute_pm_margin(builder, &simulated_snapshot)?;
1002 let required_relief_amount = required_mm_relief(
1003 margin.equity,
1004 margin.maintenance_margin_required,
1005 self.config.partial_target_buffer_bps,
1006 );
1007 if required_relief_amount <= PARTIAL_LIQ_MIN_SIZE {
1008 break;
1009 }
1010
1011 let candidates =
1012 self.pm_partial_candidates(builder, &simulated_snapshot, &expected_prices)?;
1013 let round_plans = greedy_partial_plan(&candidates, required_relief_amount)
1014 .into_iter()
1015 .filter(|plan| plan.close_size > PARTIAL_LIQ_MIN_SIZE)
1016 .collect::<Vec<_>>();
1017 if round_plans.is_empty() {
1018 break;
1019 }
1020
1021 for plan in round_plans {
1022 self.apply_pm_option_close(
1023 &mut simulated_snapshot,
1024 &plan.symbol,
1025 plan.close_size,
1026 plan.limit_price,
1027 )?;
1028 merge_partial_plan(&mut aggregated_plans, plan);
1029 }
1030 }
1031
1032 Ok(aggregated_plans)
1033 }
1034
1035 fn standard_partial_candidates(
1036 &self,
1037 account: &StandardAccount,
1038 expected_prices: &HashMap<String, Decimal>,
1039 ) -> Vec<LiquidationSliceCandidate> {
1040 let current_margin = self.standard_margin_service.compute_margin(account);
1041 let mut candidates = Vec::new();
1042
1043 for option_position in account.short_options() {
1044 let Some(expected_price) = expected_prices.get(&option_position.symbol).copied() else {
1045 continue;
1046 };
1047 let close_step = option_position.abs_size().min(Decimal::ONE);
1048 if close_step <= PARTIAL_LIQ_MIN_SIZE {
1049 continue;
1050 }
1051
1052 let mut simulated_account = account.clone();
1053 if self
1054 .apply_standard_close(
1055 &mut simulated_account,
1056 &option_position.symbol,
1057 close_step,
1058 expected_price,
1059 )
1060 .is_err()
1061 {
1062 continue;
1063 }
1064
1065 let new_margin = self
1066 .standard_margin_service
1067 .compute_margin(&simulated_account);
1068 let mm_relief_per_unit =
1069 (current_margin.position_mm - new_margin.position_mm) / close_step;
1070 if mm_relief_per_unit <= Decimal::ZERO {
1071 continue;
1072 }
1073
1074 candidates.push(LiquidationSliceCandidate {
1075 symbol: option_position.symbol.clone(),
1076 max_close_size: option_position.abs_size(),
1077 expected_fill_price: expected_price,
1078 mm_relief_per_unit,
1079 });
1080 }
1081
1082 candidates
1083 }
1084
1085 fn pm_partial_candidates(
1086 &self,
1087 builder: &crate::rsm::portfolio_margin::RiskAccountBuilder,
1088 snapshot: &PortfolioMarginSnapshot,
1089 expected_prices: &HashMap<String, Decimal>,
1090 ) -> anyhow::Result<Vec<LiquidationSliceCandidate>> {
1091 let current_margin = self.compute_pm_margin(builder, snapshot)?;
1092 let mut candidates = Vec::new();
1093
1094 for underlying in &snapshot.underlyings {
1095 for option in &underlying.executed_options {
1096 if option.quantity >= Decimal::ZERO {
1097 continue;
1098 }
1099
1100 let symbol = format_pm_option_symbol(&option.key);
1101 let Some(expected_price) = expected_prices.get(&symbol).copied() else {
1102 continue;
1103 };
1104
1105 let close_step = option.quantity.abs().min(Decimal::ONE);
1106 if close_step <= PARTIAL_LIQ_MIN_SIZE {
1107 continue;
1108 }
1109
1110 let mut simulated_snapshot = snapshot.clone();
1111 if self
1112 .apply_pm_option_close(
1113 &mut simulated_snapshot,
1114 &symbol,
1115 close_step,
1116 expected_price,
1117 )
1118 .is_err()
1119 {
1120 continue;
1121 }
1122
1123 let new_margin = self.compute_pm_margin(builder, &simulated_snapshot)?;
1124 let mm_relief_per_unit = (current_margin.maintenance_margin_required
1125 - new_margin.maintenance_margin_required)
1126 / close_step;
1127 if mm_relief_per_unit <= Decimal::ZERO {
1128 continue;
1129 }
1130
1131 candidates.push(LiquidationSliceCandidate {
1132 symbol,
1133 max_close_size: option.quantity.abs(),
1134 expected_fill_price: expected_price,
1135 mm_relief_per_unit,
1136 });
1137 }
1138 }
1139
1140 Ok(candidates)
1141 }
1142
1143 fn compute_pm_margin(
1144 &self,
1145 builder: &crate::rsm::portfolio_margin::RiskAccountBuilder,
1146 snapshot: &PortfolioMarginSnapshot,
1147 ) -> anyhow::Result<crate::types::MarginDetails> {
1148 let market_state = builder
1149 .resolve_market_state(snapshot, self.span_margin_service.config())
1150 .map_err(|error| anyhow!("Failed to resolve PM market state: {:?}", error))?;
1151
1152 self.span_margin_service
1153 .compute_margin_from_snapshot(snapshot, market_state)
1154 .map_err(|error| anyhow!("Failed to compute PM margin: {}", error))
1155 }
1156
1157 fn apply_standard_close(
1158 &self,
1159 account: &mut StandardAccount,
1160 symbol: &str,
1161 close_size: Decimal,
1162 execution_price: Decimal,
1163 ) -> anyhow::Result<()> {
1164 if close_size <= PARTIAL_LIQ_MIN_SIZE {
1165 return Ok(());
1166 }
1167
1168 let option_index = account
1169 .option_positions
1170 .iter()
1171 .position(|position| position.symbol == symbol)
1172 .ok_or_else(|| anyhow!("Missing standard option position for {}", symbol))?;
1173 let current_size = account.option_positions[option_index].size;
1174 if current_size >= Decimal::ZERO {
1175 return Err(anyhow!(
1176 "Expected short standard option position for {}, found {}",
1177 symbol,
1178 current_size
1179 ));
1180 }
1181 if close_size > current_size.abs() + PARTIAL_LIQ_MIN_SIZE {
1182 return Err(anyhow!(
1183 "Close size {} exceeds standard short position {} for {}",
1184 close_size,
1185 current_size.abs(),
1186 symbol
1187 ));
1188 }
1189
1190 account.usdc_balance -= execution_price * close_size;
1191 account.option_positions[option_index].size += close_size;
1192 if account.option_positions[option_index].size.abs() <= PARTIAL_LIQ_MIN_SIZE {
1193 account.option_positions.remove(option_index);
1194 }
1195
1196 Ok(())
1197 }
1198
1199 fn apply_pm_option_close(
1200 &self,
1201 snapshot: &mut PortfolioMarginSnapshot,
1202 symbol: &str,
1203 close_size: Decimal,
1204 execution_price: Decimal,
1205 ) -> anyhow::Result<()> {
1206 if close_size <= PARTIAL_LIQ_MIN_SIZE {
1207 return Ok(());
1208 }
1209
1210 let parsed = ParsedSymbol::from_symbol(symbol)
1211 .map_err(|error| anyhow!("Failed to parse PM option symbol {}: {}", symbol, error))?;
1212 let option_key = PortfolioMarginOptionKey {
1213 underlying: parsed.underlying.clone(),
1214 option_type: parsed.option_type,
1215 strike: parsed.strike,
1216 expiry_ts: hypercall_types::expiry_date_to_timestamp(&parsed.underlying, parsed.expiry),
1217 };
1218
1219 let underlying_snapshot = snapshot
1220 .underlyings
1221 .iter_mut()
1222 .find(|underlying| underlying.underlying == option_key.underlying)
1223 .ok_or_else(|| anyhow!("Missing PM underlying snapshot for {}", symbol))?;
1224
1225 let option_exposure = underlying_snapshot
1226 .executed_options
1227 .iter_mut()
1228 .find(|option| option.key == option_key)
1229 .ok_or_else(|| anyhow!("Missing PM executed option exposure for {}", symbol))?;
1230 if option_exposure.quantity >= Decimal::ZERO {
1231 return Err(anyhow!(
1232 "Expected short PM option exposure for {}, found {}",
1233 symbol,
1234 option_exposure.quantity
1235 ));
1236 }
1237 if close_size > option_exposure.quantity.abs() + PARTIAL_LIQ_MIN_SIZE {
1238 return Err(anyhow!(
1239 "Close size {} exceeds PM short position {} for {}",
1240 close_size,
1241 option_exposure.quantity.abs(),
1242 symbol
1243 ));
1244 }
1245
1246 let signed_closed_quantity = -close_size;
1247 snapshot.cash_balance +=
1248 signed_closed_quantity * (execution_price - option_exposure.entry_price);
1249 option_exposure.quantity += close_size;
1250
1251 underlying_snapshot
1252 .executed_options
1253 .retain(|option| option.quantity.abs() > PARTIAL_LIQ_MIN_SIZE);
1254 snapshot.underlyings.retain(|underlying| {
1255 !underlying.executed_options.is_empty()
1256 || !underlying.executed_perps.is_empty()
1257 || !underlying.hypothetical_open_order_options.is_empty()
1258 || !underlying.hypothetical_open_order_perps.is_empty()
1259 });
1260
1261 Ok(())
1262 }
1263
1264 async fn partial_execution_prices(
1265 &self,
1266 wallet: &WalletAddress,
1267 bonus_bps: u32,
1268 ) -> anyhow::Result<HashMap<String, Decimal>> {
1269 let positions = self.current_positions(wallet).await;
1270 let mut prices = HashMap::new();
1271
1272 for (symbol, position) in positions {
1273 if position.amount >= Decimal::ZERO {
1274 continue;
1275 }
1276 if ParsedSymbol::from_symbol(&symbol).is_err() {
1277 continue;
1278 }
1279
1280 let fallback_mark_price = derive_portfolio_mark_price(&position)
1281 .with_context(|| format!("Failed to derive fallback mark for {}", symbol))?;
1282 let reference_price = self
1283 .quote_reference_price(&symbol)
1284 .unwrap_or(fallback_mark_price);
1285 if reference_price <= Decimal::ZERO {
1286 return Err(anyhow!(
1287 "Expected positive liquidation reference price for {}, got {}",
1288 symbol,
1289 reference_price
1290 ));
1291 }
1292
1293 prices.insert(
1294 symbol,
1295 reference_price * (Decimal::ONE + (Decimal::from(bonus_bps) / dec!(10000))),
1296 );
1297 }
1298
1299 Ok(prices)
1300 }
1301
1302 fn quote_reference_price(&self, symbol: &str) -> Option<Decimal> {
1303 if let Some(quote_provider) = self.quote_provider.as_ref() {
1305 if let Some(quote) = quote_provider.get_quote(symbol) {
1306 let bbo_price = quote
1307 .best_ask
1308 .filter(|price| *price > 0.0)
1309 .or_else(|| quote.mid.filter(|price| *price > 0.0))
1310 .or_else(|| quote.best_bid.filter(|price| *price > 0.0));
1311 if let Some(price) = bbo_price {
1312 return Some(decimal_from_f64(price, "partial_quote_bbo"));
1313 }
1314 }
1315 }
1316
1317 if let Some(greeks_cache) = &self.greeks_cache {
1319 if let Ok(theo) =
1320 futures::executor::block_on(greeks_cache.get_theoretical_price(symbol))
1321 {
1322 if theo > 0.0 && theo.is_finite() {
1323 return Some(decimal_from_f64(theo, "partial_quote_theo"));
1324 }
1325 }
1326 }
1327
1328 None
1329 }
1330
1331 async fn current_positions(&self, wallet: &WalletAddress) -> HashMap<String, PositionData> {
1332 self.portfolio_service
1333 .get_portfolio_balance(wallet)
1334 .await
1335 .unwrap_or_default()
1336 .positions
1337 }
1338
1339 async fn try_current_position_sizes(
1340 &self,
1341 wallet: &WalletAddress,
1342 ) -> Option<HashMap<String, Decimal>> {
1343 self.portfolio_service
1344 .get_portfolio_balance(wallet)
1345 .await
1346 .map(|balance| {
1347 balance
1348 .positions
1349 .into_iter()
1350 .map(|(symbol, position)| (symbol, position.amount))
1351 .collect()
1352 })
1353 }
1354
1355 fn is_order_reduce_only(
1356 &self,
1357 current_positions: &HashMap<String, Decimal>,
1358 order: &RuntimeOrderSummary,
1359 ) -> bool {
1360 let current_position = current_positions
1361 .get(&order.symbol)
1362 .copied()
1363 .unwrap_or(Decimal::ZERO);
1364 let signed_order_size = if matches!(order.side, Side::Buy) {
1365 order.remaining_size
1366 } else {
1367 -order.remaining_size
1368 };
1369 let new_position = current_position + signed_order_size;
1370 LiquidationManager::is_position_reduce_only(current_position, new_position)
1371 }
1372
1373 fn reduce_only_open_order_ids(
1374 &self,
1375 current_positions: &HashMap<String, Decimal>,
1376 open_orders: &[RuntimeOrderSummary],
1377 ) -> HashSet<u64> {
1378 let mut remaining_positions = current_positions.clone();
1379 let mut sorted_orders: Vec<&RuntimeOrderSummary> = open_orders.iter().collect();
1380 sorted_orders.sort_by_key(|order| (order.created_at, order.order_id));
1381
1382 let mut reduce_only_order_ids = HashSet::new();
1383 for order in sorted_orders {
1384 if !self.is_order_reduce_only(&remaining_positions, order) {
1385 continue;
1386 }
1387
1388 let current_position = remaining_positions
1389 .get(&order.symbol)
1390 .copied()
1391 .unwrap_or(Decimal::ZERO);
1392 let signed_order_size = if matches!(order.side, Side::Buy) {
1393 order.remaining_size
1394 } else {
1395 -order.remaining_size
1396 };
1397 remaining_positions.insert(order.symbol.clone(), current_position + signed_order_size);
1398 reduce_only_order_ids.insert(order.order_id);
1399 }
1400
1401 reduce_only_order_ids
1402 }
1403
1404 async fn cancel_open_order(
1405 &self,
1406 wallet: &WalletAddress,
1407 target: &CancelTarget,
1408 ) -> anyhow::Result<()> {
1409 let request_id = Uuid::now_v7().to_string();
1410 let order_info = OrderInfo {
1411 symbol: String::new(),
1412 price: Decimal::ZERO,
1413 size: Decimal::ZERO,
1414 side: Side::Buy,
1415 tif: TimeInForce::GTC,
1416 client_id: target.client_id.clone(),
1417 order_id: Some(target.order_id),
1418 is_perp: false,
1419 underlying: None,
1420 reduce_only: None,
1421 nonce: None,
1422 signature: None,
1423 mmp_enabled: false,
1424 builder_code_address: None,
1425 };
1426
1427 let response = self
1428 .send_engine_order(OrderActionMessage {
1429 timestamp: get_timestamp_millis(),
1430 info: order_info,
1431 action: OrderAction::CancelOrder,
1432 wallet: *wallet,
1433 api_wallet_address: Some(*wallet),
1434 mmp_triggered: false,
1435 request_id: Some(request_id),
1436 })
1437 .await?;
1438
1439 let is_not_found = response.status == OrderUpdateStatus::Rejected
1440 && response.reason.as_deref().is_some_and(|reason| {
1441 reason.contains("already been completed")
1442 || reason.contains("already filled")
1443 || reason.contains("already cancelled")
1444 || reason.contains("already canceled")
1445 || reason.contains("not found")
1446 });
1447
1448 if response.status == OrderUpdateStatus::Canceled || is_not_found {
1449 return Ok(());
1450 }
1451
1452 Err(anyhow!(
1453 "Cancel rejected for wallet {} order {}: {}",
1454 wallet,
1455 target.order_id,
1456 response
1457 .reason
1458 .as_deref()
1459 .unwrap_or("missing cancellation reason")
1460 ))
1461 }
1462
1463 async fn submit_partial_liquidation_order(
1464 &self,
1465 wallet: &WalletAddress,
1466 plan: &PartialLiquidationOrderPlan,
1467 now: u64,
1468 order_index: usize,
1469 ) -> anyhow::Result<(String, String, OrderUpdateMessage)> {
1470 let request_id = Uuid::now_v7().to_string();
1471 let client_id = format!("liq-{}-{}-{}", wallet, now, order_index);
1472
1473 let response = self
1474 .send_engine_order(OrderActionMessage {
1475 timestamp: get_timestamp_millis(),
1476 info: OrderInfo {
1477 symbol: plan.symbol.clone(),
1478 price: plan.limit_price,
1479 size: to_contract_units_decimal(&plan.symbol, plan.close_size),
1480 side: Side::Buy,
1481 tif: TimeInForce::GTC,
1482 client_id: Some(client_id.clone()),
1483 order_id: None,
1484 is_perp: false,
1485 underlying: None,
1486 reduce_only: Some(true),
1487 nonce: None,
1488 signature: None,
1489 mmp_enabled: false,
1490 builder_code_address: None,
1491 },
1492 action: OrderAction::CreateOrder,
1493 wallet: *wallet,
1494 api_wallet_address: Some(*wallet),
1495 mmp_triggered: false,
1496 request_id: Some(request_id.clone()),
1497 })
1498 .await?;
1499
1500 Ok((request_id, client_id, response))
1501 }
1502
1503 async fn send_engine_order(
1504 &self,
1505 message: OrderActionMessage,
1506 ) -> anyhow::Result<OrderUpdateMessage> {
1507 let order_sender = self
1508 .order_sender
1509 .as_ref()
1510 .context("order sender not configured")?;
1511 let (response_tx, mut response_rx) = mpsc::channel(1);
1512 let request = UnifiedEngineRequest {
1513 message,
1514 response_tx,
1515 enqueued_at: Instant::now(),
1516 #[cfg(feature = "otel-tracing")]
1517 trace_context: None,
1518 };
1519
1520 increment_pending_requests();
1521 order_sender
1522 .send(request)
1523 .await
1524 .context("failed to send liquidation order to engine")?;
1525
1526 match timeout(ENGINE_RESPONSE_TIMEOUT, response_rx.recv()).await {
1527 Ok(Some(response)) => Ok(response),
1528 Ok(None) => Err(anyhow!("engine closed liquidation response channel")),
1529 Err(_) => Err(anyhow!("timed out waiting for liquidation engine response")),
1530 }
1531 }
1532
1533 fn emit_state_change(
1535 &self,
1536 previous: &LiquidationState,
1537 status: &AccountLiquidationStatus,
1538 timestamp: u64,
1539 ) {
1540 info!(
1541 "Liquidation state change: wallet={}, {} -> {}",
1542 status.wallet,
1543 previous.as_str(),
1544 status.state.as_str()
1545 );
1546
1547 if let Some(sender) = &self.event_sender {
1549 let msg = LiquidationStateMessage {
1550 wallet: status.wallet,
1551 previous_state: liquidation_state_to_type(previous),
1552 new_state: liquidation_state_to_type(&status.state),
1553 previous_liquidation_mode: previous
1554 .liquidation_mode()
1555 .map(|mode| mode.as_str().to_string()),
1556 liquidation_mode: status
1557 .liquidation_mode()
1558 .map(|mode| mode.as_str().to_string()),
1559 margin_mode: status.margin_mode.as_str().to_string(),
1560 equity: status.equity,
1561 mm_required: status.mm_required,
1562 maintenance_margin: status.maintenance_margin,
1563 shortfall: status.shortfall(),
1564 previous_auction_id: previous.auction_id().map(ToOwned::to_owned),
1565 projection_changed: has_material_projection_change(previous, &status.state),
1566 auction_id: status
1567 .state
1568 .auction_id()
1569 .map(ToOwned::to_owned)
1570 .or_else(|| previous.auction_id().map(ToOwned::to_owned)),
1571 status: status.clone(),
1572 timestamp,
1573 };
1574
1575 if let Err(error) = sender.send(EngineMessage::LiquidationStateChange(msg)) {
1576 warn!("Failed to send liquidation state change event: {}", error);
1577 }
1578 }
1579 }
1580}
1581
/// Returns the current wall-clock time as milliseconds since the Unix epoch.
///
/// A system clock set before the epoch yields `0`.
fn get_timestamp_millis() -> u64 {
    use std::time::{Duration, SystemTime, UNIX_EPOCH};

    let since_epoch = match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed,
        Err(_) => Duration::ZERO,
    };
    since_epoch.as_millis() as u64
}
1589
1590fn liquidation_state_to_type(state: &LiquidationState) -> LiquidationStateType {
1592 match state {
1593 LiquidationState::Healthy => LiquidationStateType::Healthy,
1594 LiquidationState::PreLiquidation(..) => LiquidationStateType::PreLiquidation,
1595 LiquidationState::InLiquidation(..) => LiquidationStateType::InLiquidation,
1596 LiquidationState::Liquidated(..) => LiquidationStateType::Liquidated,
1597 }
1598}
1599
1600fn decimal_from_f64(value: f64, field: &str) -> Decimal {
1601 if !value.is_finite() {
1602 warn!(
1603 "Liquidation {} value {} is not finite, clamping to zero",
1604 field, value
1605 );
1606 return Decimal::ZERO;
1607 }
1608 Decimal::from_f64_retain(value).unwrap_or_else(|| {
1609 warn!(
1610 "Liquidation {} value {} is not representable as Decimal, clamping to zero",
1611 field, value
1612 );
1613 Decimal::ZERO
1614 })
1615}
1616
1617fn derive_portfolio_mark_price(position: &PositionData) -> anyhow::Result<Decimal> {
1618 if position.amount.abs() <= PARTIAL_LIQ_MIN_SIZE {
1619 return Err(anyhow!(
1620 "position amount {} is too small to derive mark price for {}",
1621 position.amount,
1622 position.symbol
1623 ));
1624 }
1625
1626 Ok(position.entry_price + (position.unrealized_pnl / position.amount))
1627}
1628
1629fn merge_partial_plan(
1630 aggregated_plans: &mut Vec<PartialLiquidationOrderPlan>,
1631 plan: PartialLiquidationOrderPlan,
1632) {
1633 if let Some(existing_plan) = aggregated_plans
1634 .iter_mut()
1635 .find(|existing| existing.symbol == plan.symbol && existing.limit_price == plan.limit_price)
1636 {
1637 existing_plan.close_size += plan.close_size;
1638 existing_plan.expected_mm_relief += plan.expected_mm_relief;
1639 return;
1640 }
1641
1642 aggregated_plans.push(plan);
1643}
1644
1645fn format_pm_option_symbol(key: &PortfolioMarginOptionKey) -> String {
1646 format!(
1647 "{}-{}-{}-{}",
1648 key.underlying,
1649 chrono::DateTime::from_timestamp(key.expiry_ts, 0)
1650 .expect("option expiry_ts must be valid")
1651 .format("%Y%m%d"),
1652 key.strike.normalize(),
1653 match key.option_type {
1654 crate::types::OptionType::Call => "C",
1655 crate::types::OptionType::Put => "P",
1656 }
1657 )
1658}
1659
1660impl LiquidationWatcher {
1661 async fn has_short_option_positions(&self, wallet: &WalletAddress) -> Option<bool> {
1662 let Some(balance) = self.portfolio_service.get_portfolio_balance(wallet).await else {
1663 return None;
1664 };
1665
1666 Some(balance.positions.iter().any(|(symbol, position)| {
1667 position.amount < Decimal::ZERO && ParsedSymbol::from_symbol(symbol).is_ok()
1668 }))
1669 }
1670}
1671
1672#[async_trait::async_trait]
1673impl crate::shared::service::Service for LiquidationWatcher {
1674 fn name(&self) -> &'static str {
1675 "LiquidationWatcher"
1676 }
1677
1678 fn owner(&self) -> crate::shared::service::ServiceOwner {
1679 crate::shared::service::ServiceOwner::Engine
1680 }
1681
1682 async fn run(
1683 self: std::sync::Arc<Self>,
1684 shutdown: crate::shared::ShutdownRx,
1685 ) -> anyhow::Result<()> {
1686 self.run_with_shutdown(shutdown).await;
1687 Ok(())
1688 }
1689}
1690
1691#[cfg(test)]
1692mod tests {
1693 use super::*;
1694 use crate::portfolio::{PortfolioBalance, PortfolioChange, PortfolioError};
1695 use crate::rsm::ledger::{InMemoryLedger, Ledger};
1696 use crate::rsm::portfolio_margin::risk_account_builder::SpotPriceSource;
1697 use crate::types::{Config, Scenario, ScenarioType};
1698 use async_trait::async_trait;
1699 use hypercall_engine::FeeConfig;
1700 use hypercall_runtime_api::RuntimeOrderSummary;
1701 use hypercall_types::api_models::Portfolio;
1702 use hypercall_types::{to_human_readable_decimal, wallet_address::test_wallet};
1703 use std::any::Any;
1704 use std::collections::HashMap;
1705 use std::sync::atomic::{AtomicU64, Ordering};
1706 use std::sync::RwLock;
1707 use tokio::sync::RwLock as AsyncRwLock;
1708 use tokio::task::JoinHandle;
1709
1710 #[derive(Default)]
1711 struct MockPortfolioService {
1712 balances: AsyncRwLock<HashMap<WalletAddress, PortfolioBalance>>,
1713 }
1714
1715 impl MockPortfolioService {
1716 async fn set_balance(&self, wallet: WalletAddress, balance: PortfolioBalance) {
1717 self.balances.write().await.insert(wallet, balance);
1718 }
1719
1720 async fn clear_balance(&self, wallet: &WalletAddress) {
1721 self.balances.write().await.remove(wallet);
1722 }
1723 }
1724
1725 #[async_trait]
1726 impl PortfolioService for MockPortfolioService {
1727 async fn get_portfolio(&self, account: &WalletAddress) -> Portfolio {
1728 let balance = self
1729 .get_portfolio_balance(account)
1730 .await
1731 .unwrap_or_default();
1732 Portfolio {
1733 wallet_address: *account,
1734 positions: balance
1735 .positions
1736 .values()
1737 .cloned()
1738 .map(
1739 |position| hypercall_types::api_models::PositionWithMetrics {
1740 position: hypercall_types::api_models::Position {
1741 wallet_address: *account,
1742 symbol: position.symbol,
1743 amount: position.amount,
1744 entry_price: position.entry_price,
1745 margin_posted: position.margin_posted,
1746 realized_pnl: position.realized_pnl,
1747 unrealized_pnl: position.unrealized_pnl,
1748 updated_at: chrono::Utc::now(),
1749 },
1750 notional_value: Decimal::ZERO,
1751 maintenance_margin: Decimal::ZERO,
1752 liquidation_price: Decimal::ZERO,
1753 margin_ratio: Decimal::ZERO,
1754 },
1755 )
1756 .collect(),
1757 total_margin_used: balance.total_margin_used,
1758 available_balance: Decimal::ZERO,
1759 span_margin: None,
1760 margin_mode: "standard".to_string(),
1761 margin_summary: None,
1762 }
1763 }
1764
1765 async fn get_portfolio_balance(&self, account: &WalletAddress) -> Option<PortfolioBalance> {
1766 self.balances.read().await.get(account).cloned()
1767 }
1768
1769 async fn all_portfolios(&self) -> HashMap<WalletAddress, PortfolioBalance> {
1770 self.balances.read().await.clone()
1771 }
1772
1773 async fn apply_event(
1774 &self,
1775 _event: &hypercall_types::EngineMessage,
1776 ) -> Result<Vec<PortfolioChange>, PortfolioError> {
1777 Ok(Vec::new())
1778 }
1779
1780 async fn remove_expired_position(&self, _wallet: &WalletAddress, _symbol: &str) {}
1781
1782 async fn apply_hypercore_position_update(
1783 &self,
1784 _update: &crate::portfolio::HypercorePositionUpdate,
1785 ) {
1786 }
1787
1788 async fn set_hypercore_position(
1789 &self,
1790 _update: &crate::portfolio::HypercorePositionUpdate,
1791 ) {
1792 }
1793
1794 fn as_any(&self) -> &dyn Any {
1795 self
1796 }
1797
1798 async fn calculate_fill_accounting(
1799 &self,
1800 fill: &hypercall_types::Fill,
1801 ) -> Result<hypercall_types::FillAccounting, crate::portfolio::PortfolioError> {
1802 Ok(hypercall_types::FillAccounting::zero(fill.trade_id))
1803 }
1804
1805 async fn apply_fill_to_memory(
1806 &self,
1807 _wallet: &WalletAddress,
1808 _symbol: &str,
1809 _side: &Side,
1810 _price: Decimal,
1811 _quantity: Decimal,
1812 ) {
1813 }
1814 }
1815
1816 struct MockSpotPriceSource {
1817 prices: HashMap<String, f64>,
1818 }
1819
1820 #[async_trait]
1821 impl SpotPriceSource for MockSpotPriceSource {
1822 async fn get_spot_price(&self, underlying: &str) -> Option<f64> {
1823 self.prices.get(underlying).copied()
1824 }
1825 }
1826
1827 #[derive(Default)]
1828 struct MutableOrderSnapshotProvider {
1829 orders: RwLock<HashMap<WalletAddress, Vec<RuntimeOrderSummary>>>,
1830 }
1831
1832 impl MutableOrderSnapshotProvider {
1833 fn insert_order(&self, wallet: WalletAddress, order: RuntimeOrderSummary) {
1834 self.orders
1835 .write()
1836 .expect("order lock poisoned")
1837 .entry(wallet)
1838 .or_default()
1839 .push(order);
1840 }
1841
1842 fn remove_by_order_id(&self, wallet: &WalletAddress, order_id: u64) -> bool {
1843 let mut orders = self.orders.write().expect("order lock poisoned");
1844 let Some(wallet_orders) = orders.get_mut(wallet) else {
1845 return false;
1846 };
1847 let original_len = wallet_orders.len();
1848 wallet_orders.retain(|order| order.order_id != order_id);
1849 wallet_orders.len() != original_len
1850 }
1851
1852 fn clear_wallet(&self, wallet: &WalletAddress) {
1853 self.orders
1854 .write()
1855 .expect("order lock poisoned")
1856 .remove(wallet);
1857 }
1858 }
1859
1860 impl OrderSnapshotProvider for MutableOrderSnapshotProvider {
1861 fn get_open_orders_for_wallet(&self, wallet: &WalletAddress) -> Vec<RuntimeOrderSummary> {
1862 self.orders
1863 .read()
1864 .expect("order lock poisoned")
1865 .get(wallet)
1866 .cloned()
1867 .unwrap_or_default()
1868 }
1869
1870 fn get_all_orders(&self) -> Vec<(RuntimeOrderSummary, WalletAddress)> {
1871 self.orders
1872 .read()
1873 .expect("order lock poisoned")
1874 .iter()
1875 .flat_map(|(wallet, orders)| {
1876 orders.iter().cloned().map(move |order| (order, *wallet))
1877 })
1878 .collect()
1879 }
1880 }
1881
1882 fn test_margin_config() -> Config {
1883 Config {
1884 risk_free_rate: 0.05,
1885 base_volatility: 0.8,
1886 base_skew: 0.0,
1887 base_excess_kurtosis: 0.0,
1888 scenarios: vec![
1889 Scenario {
1890 scenario_type: ScenarioType::SpotChange,
1891 value: 0.15,
1892 },
1893 Scenario {
1894 scenario_type: ScenarioType::SpotChange,
1895 value: -0.15,
1896 },
1897 ],
1898 delta_threshold: 0.0001,
1899 strike_match_tolerance: 0.01,
1900 expiry_match_tolerance_years: 0.001,
1901 allow_standard_margin_shorts: false,
1902 fee_config: FeeConfig::default(),
1903 }
1904 }
1905
1906 fn liquidatable_health(wallet: &WalletAddress) -> LiquidationHealthResult {
1907 LiquidationHealthResult {
1908 wallet: wallet.to_string(),
1909 margin_mode: MarginMode::Standard,
1910 is_liquidatable: true,
1911 maintenance_margin: -200.0,
1912 equity: 100.0,
1913 mm_required: 300.0,
1914 }
1915 }
1916
1917 fn recovered_health(wallet: &WalletAddress) -> LiquidationHealthResult {
1918 LiquidationHealthResult {
1919 wallet: wallet.to_string(),
1920 margin_mode: MarginMode::Standard,
1921 is_liquidatable: false,
1922 maintenance_margin: 120.0,
1923 equity: 420.0,
1924 mm_required: 300.0,
1925 }
1926 }
1927
1928 struct StandardWatcherHarness {
1929 wallet: WalletAddress,
1930 symbol: String,
1931 cache: Arc<LiquidationCache>,
1932 portfolio_service: Arc<MockPortfolioService>,
1933 order_snapshot: Arc<MutableOrderSnapshotProvider>,
1934 watcher: LiquidationWatcher,
1935 engine_task: JoinHandle<()>,
1936 }
1937
1938 async fn setup_standard_partial_liquidation_harness(
1939 reject_partial_orders: bool,
1940 ) -> StandardWatcherHarness {
1941 let wallet = test_wallet(42);
1942 let symbol = "BTC-20261231-100000-C".to_string();
1943 let cache = Arc::new(LiquidationCache::new());
1944 let portfolio_service = Arc::new(MockPortfolioService::default());
1945 portfolio_service
1946 .set_balance(
1947 wallet,
1948 PortfolioBalance {
1949 positions: HashMap::from([(
1950 symbol.clone(),
1951 PositionData {
1952 symbol: symbol.clone(),
1953 amount: dec!(-4),
1954 entry_price: dec!(100),
1955 margin_posted: Decimal::ZERO,
1956 realized_pnl: Decimal::ZERO,
1957 unrealized_pnl: Decimal::ZERO,
1958 },
1959 )]),
1960 total_margin_used: Decimal::ZERO,
1961 },
1962 )
1963 .await;
1964
1965 let ledger = Arc::new(InMemoryLedger::new());
1966 ledger
1967 .set_balance(&wallet, dec!(100))
1968 .await
1969 .expect("ledger balance should be seeded");
1970
1971 let standard_account_builder = Arc::new(StandardAccountBuilder::new(
1972 ledger,
1973 portfolio_service.clone(),
1974 Arc::new(MockSpotPriceSource {
1975 prices: HashMap::from([("BTC".to_string(), 105000.0)]),
1976 }),
1977 ));
1978 let quote_provider = Arc::new(crate::rsm::engine_snapshot::MockQuoteProvider::new());
1979 quote_provider.set_quote(
1980 &symbol,
1981 hypercall_runtime_api::SnapshotBookQuote {
1982 best_bid: Some(149.0),
1983 best_bid_size: Some(10.0),
1984 best_ask: Some(150.0),
1985 best_ask_size: Some(10.0),
1986 mid: Some(149.5),
1987 bids: vec![(149.0, 10.0)],
1988 asks: vec![(150.0, 10.0)],
1989 },
1990 );
1991
1992 let order_snapshot = Arc::new(MutableOrderSnapshotProvider::default());
1993 order_snapshot.insert_order(
1994 wallet,
1995 RuntimeOrderSummary {
1996 order_id: 7,
1997 symbol: symbol.clone(),
1998 side: Side::Sell,
1999 price: dec!(95),
2000 original_size: dec!(1),
2001 remaining_size: dec!(1),
2002 is_perp: false,
2003 mmp_enabled: false,
2004 client_id: Some("risk-increasing".to_string()),
2005 created_at: 1,
2006 },
2007 );
2008
2009 let (order_tx, mut order_rx) = mpsc::channel::<UnifiedEngineRequest>(8);
2010 let next_order_id = Arc::new(AtomicU64::new(1_000));
2011 let mock_orders = order_snapshot.clone();
2012 let order_id_counter = next_order_id.clone();
2013 let engine_task = tokio::spawn(async move {
2014 while let Some(request) = order_rx.recv().await {
2015 let message = request.message.clone();
2016 let response = match message.action {
2017 OrderAction::CreateOrder if reject_partial_orders => OrderUpdateMessage {
2018 timestamp: message.timestamp,
2019 info: message.info,
2020 status: OrderUpdateStatus::Rejected,
2021 reason: Some("forced rejection".to_string()),
2022 filled_size: Decimal::ZERO,
2023 order_id: None,
2024 wallet_address: message.wallet,
2025 mmp_triggered: false,
2026 request_id: message.request_id,
2027 },
2028 OrderAction::CreateOrder => {
2029 let order_id = order_id_counter.fetch_add(1, Ordering::Relaxed);
2030 let human_size =
2031 to_human_readable_decimal(&message.info.symbol, message.info.size);
2032 mock_orders.insert_order(
2033 message.wallet,
2034 RuntimeOrderSummary {
2035 order_id,
2036 symbol: message.info.symbol.clone(),
2037 side: message.info.side,
2038 price: message.info.price,
2039 original_size: human_size,
2040 remaining_size: human_size,
2041 is_perp: message.info.is_perp,
2042 mmp_enabled: message.info.mmp_enabled,
2043 client_id: message.info.client_id.clone(),
2044 created_at: message.timestamp as i64,
2045 },
2046 );
2047 OrderUpdateMessage {
2048 timestamp: message.timestamp,
2049 info: message.info,
2050 status: OrderUpdateStatus::Acked,
2051 reason: None,
2052 filled_size: Decimal::ZERO,
2053 order_id: Some(order_id),
2054 wallet_address: message.wallet,
2055 mmp_triggered: false,
2056 request_id: message.request_id,
2057 }
2058 }
2059 OrderAction::CancelOrder => {
2060 let removed = message
2061 .info
2062 .order_id
2063 .map(|order_id| {
2064 mock_orders.remove_by_order_id(&message.wallet, order_id)
2065 })
2066 .unwrap_or(false);
2067 OrderUpdateMessage {
2068 timestamp: message.timestamp,
2069 info: message.info,
2070 status: if removed {
2071 OrderUpdateStatus::Canceled
2072 } else {
2073 OrderUpdateStatus::Rejected
2074 },
2075 reason: if removed {
2076 None
2077 } else {
2078 Some("order not found".to_string())
2079 },
2080 filled_size: Decimal::ZERO,
2081 order_id: None,
2082 wallet_address: message.wallet,
2083 mmp_triggered: false,
2084 request_id: message.request_id,
2085 }
2086 }
2087 OrderAction::ReplaceOrder => OrderUpdateMessage {
2088 timestamp: message.timestamp,
2089 info: message.info,
2090 status: OrderUpdateStatus::Rejected,
2091 reason: Some(
2092 "replace orders unsupported in liquidation mock engine".to_string(),
2093 ),
2094 filled_size: Decimal::ZERO,
2095 order_id: None,
2096 wallet_address: message.wallet,
2097 mmp_triggered: false,
2098 request_id: message.request_id,
2099 },
2100 };
2101
2102 request
2103 .response_tx
2104 .send(response)
2105 .await
2106 .expect("mock engine should deliver response");
2107 }
2108 });
2109
2110 let watcher = LiquidationWatcher::new_for_tests_without_tier_cache(
2111 cache.clone(),
2112 portfolio_service.clone(),
2113 Arc::new(SpanMarginService::new_for_tests(test_margin_config())),
2114 Arc::new(crate::standard_margin::StandardMarginService::new()),
2115 LiquidationWatcherConfig {
2116 poll_interval: Duration::from_secs(5),
2117 full_escalation_timeout: Duration::from_secs(60),
2118 min_shortfall_threshold: Decimal::ZERO,
2119 partial_target_buffer_bps: 500,
2120 partial_reprice_interval: Duration::from_secs(5),
2121 partial_bonus_start_bps: 50,
2122 partial_bonus_max_bps: 500,
2123 partial_bonus_ramp_ms: 60_000,
2124 },
2125 )
2126 .with_standard_account_builder(standard_account_builder)
2127 .with_quote_provider(quote_provider)
2128 .with_order_snapshot(order_snapshot.clone())
2129 .with_order_sender(order_tx);
2130
2131 StandardWatcherHarness {
2132 wallet,
2133 symbol,
2134 cache,
2135 portfolio_service,
2136 order_snapshot,
2137 watcher,
2138 engine_task,
2139 }
2140 }
2141
/// The default watcher configuration carries the expected timings,
/// thresholds, and bonus parameters.
#[test]
fn test_config_default() {
    let defaults = LiquidationWatcherConfig::default();
    let five_seconds = Duration::from_secs(5);

    // Timings.
    assert_eq!(defaults.poll_interval, five_seconds);
    assert_eq!(defaults.full_escalation_timeout, Duration::from_secs(60));

    // Thresholds and partial-liquidation parameters.
    assert_eq!(defaults.min_shortfall_threshold, dec!(0));
    assert_eq!(defaults.partial_target_buffer_bps, 500);
    assert_eq!(defaults.partial_reprice_interval, five_seconds);
    assert_eq!(defaults.partial_bonus_start_bps, 50);
    assert_eq!(defaults.partial_bonus_max_bps, 1_000);
    assert_eq!(defaults.partial_bonus_ramp_ms, 300_000);
}
2154
/// `from_runtime_config` copies the watcher-relevant runtime settings,
/// converting millisecond values to `Duration`s.
#[test]
fn test_config_from_runtime() {
    let runtime_settings = crate::backend_config::LiquidationRuntimeConfig {
        enabled: true,
        health_poll_interval_ms: 1_500,
        partial_target_buffer_bps: 250,
        partial_reprice_interval_ms: 2_500,
        partial_bonus_start_bps: 75,
        partial_bonus_max_bps: 500,
        partial_bonus_ramp_ms: 120_000,
        min_shortfall_threshold: dec!(42),
        full_escalation_timeout_ms: 9_000,
        full_target_buffer_bps: 700,
        chain_observer_poll_interval_ms: 1_000,
        chain_observer_max_lag_blocks: 16,
    };

    let derived = LiquidationWatcherConfig::from_runtime_config(&runtime_settings);

    let expected_poll = Duration::from_millis(1_500);
    let expected_escalation = Duration::from_millis(9_000);
    let expected_reprice = Duration::from_millis(2_500);

    assert_eq!(derived.poll_interval, expected_poll);
    assert_eq!(derived.full_escalation_timeout, expected_escalation);
    assert_eq!(derived.min_shortfall_threshold, dec!(42));
    assert_eq!(derived.partial_target_buffer_bps, 250);
    assert_eq!(derived.partial_reprice_interval, expected_reprice);
    assert_eq!(derived.partial_bonus_start_bps, 75);
    assert_eq!(derived.partial_bonus_max_bps, 500);
    assert_eq!(derived.partial_bonus_ramp_ms, 120_000);
}
2185
/// Merging a plan with the same symbol and limit price sums sizes and relief
/// into the existing entry instead of appending a new one.
#[test]
fn test_merge_partial_plan_accumulates_same_symbol_and_price() {
    let first = PartialLiquidationOrderPlan {
        symbol: "BTC-20251231-100000-C".to_string(),
        close_size: dec!(1),
        limit_price: dec!(100),
        expected_mm_relief: dec!(50),
    };
    let second = PartialLiquidationOrderPlan {
        symbol: "BTC-20251231-100000-C".to_string(),
        close_size: dec!(2),
        limit_price: dec!(100),
        expected_mm_relief: dec!(75),
    };

    let mut aggregated = vec![first];
    merge_partial_plan(&mut aggregated, second);

    assert_eq!(aggregated.len(), 1);
    let merged = &aggregated[0];
    assert_eq!(merged.close_size, dec!(3));
    assert_eq!(merged.expected_mm_relief, dec!(125));
}
2209
/// For a position of amount -2 entered at 100 with 20 of unrealized PnL,
/// `derive_portfolio_mark_price` is expected to yield a mark price of 90.
#[test]
fn test_derive_portfolio_mark_price() {
    let short_position = PositionData {
        symbol: "BTC-20251231-100000-C".to_string(),
        amount: dec!(-2),
        entry_price: dec!(100),
        margin_posted: Decimal::ZERO,
        realized_pnl: Decimal::ZERO,
        unrealized_pnl: dec!(20),
    };

    let derived_mark = derive_portfolio_mark_price(&short_position).unwrap();

    assert_eq!(derived_mark, dec!(90));
}
2223
/// A BTC call key with strike 100000 and the 20251231 expiry must format as
/// the symbol string `BTC-20251231-100000-C`.
#[test]
fn test_format_pm_option_symbol() {
    let option_key = PortfolioMarginOptionKey {
        underlying: "BTC".to_string(),
        option_type: crate::types::OptionType::Call,
        strike: dec!(100000),
        expiry_ts: hypercall_types::expiry_date_to_timestamp("BTC", 20251231),
    };

    let formatted = format_pm_option_symbol(&option_key);

    assert_eq!(formatted, "BTC-20251231-100000-C");
}
2235
2236 #[tokio::test]
2237 async fn test_watcher_enters_partial_liquidation_and_recovers_after_orders_clear() {
2238 let harness = setup_standard_partial_liquidation_harness(false).await;
2239
2240 harness
2241 .watcher
2242 .process_health_check(
2243 &harness.wallet,
2244 &liquidatable_health(&harness.wallet),
2245 MarginMode::Standard,
2246 10_000,
2247 )
2248 .await;
2249
2250 let status = harness
2251 .cache
2252 .get_status(&harness.wallet)
2253 .await
2254 .expect("wallet should have liquidation state");
2255 let metadata = match status.state {
2256 LiquidationState::PreLiquidation(metadata) => metadata,
2257 other => panic!("expected pre-liquidation state, got {:?}", other),
2258 };
2259 assert_eq!(metadata.mm_shortfall, dec!(200));
2260 assert_eq!(metadata.target_equity, dec!(315.00));
2261 assert_eq!(metadata.active_order_client_ids.len(), 1);
2262 assert!(
2263 metadata
2264 .active_order_client_ids
2265 .iter()
2266 .all(|client_id| client_id.starts_with("liq-")),
2267 "partial liquidation orders should use liquidation client ids"
2268 );
2269
2270 let open_orders = harness
2271 .order_snapshot
2272 .get_open_orders_for_wallet(&harness.wallet);
2273 assert_eq!(open_orders.len(), 1);
2274 assert_eq!(open_orders[0].side, Side::Buy);
2275 assert_eq!(open_orders[0].symbol, harness.symbol);
2276 assert!(open_orders[0]
2277 .client_id
2278 .as_deref()
2279 .is_some_and(|client_id| client_id.starts_with("liq-")));
2280
2281 harness.order_snapshot.clear_wallet(&harness.wallet);
2282
2283 harness
2284 .watcher
2285 .process_health_check(
2286 &harness.wallet,
2287 &recovered_health(&harness.wallet),
2288 MarginMode::Standard,
2289 20_000,
2290 )
2291 .await;
2292
2293 let recovered_status = harness
2294 .cache
2295 .get_status(&harness.wallet)
2296 .await
2297 .expect("wallet should remain tracked after recovery");
2298 assert!(recovered_status.state.is_healthy());
2299
2300 harness.engine_task.abort();
2301 }
2302
2303 #[tokio::test]
2304 async fn test_watcher_requires_one_extra_cycle_to_recover_after_canceling_open_orders() {
2305 let harness = setup_standard_partial_liquidation_harness(false).await;
2306
2307 harness
2308 .watcher
2309 .process_health_check(
2310 &harness.wallet,
2311 &liquidatable_health(&harness.wallet),
2312 MarginMode::Standard,
2313 10_000,
2314 )
2315 .await;
2316
2317 harness
2318 .watcher
2319 .process_health_check(
2320 &harness.wallet,
2321 &recovered_health(&harness.wallet),
2322 MarginMode::Standard,
2323 20_000,
2324 )
2325 .await;
2326
2327 let intermediate_status = harness
2328 .cache
2329 .get_status(&harness.wallet)
2330 .await
2331 .expect("wallet should remain tracked after cancel cycle");
2332 let intermediate_metadata = match intermediate_status.state {
2333 LiquidationState::PreLiquidation(metadata) => metadata,
2334 other => panic!(
2335 "expected pre-liquidation state after cancel cycle, got {:?}",
2336 other
2337 ),
2338 };
2339 assert_eq!(intermediate_metadata.active_order_client_ids.len(), 1);
2340 assert!(
2341 harness
2342 .order_snapshot
2343 .get_open_orders_for_wallet(&harness.wallet)
2344 .is_empty(),
2345 "tracked liquidation orders should have been canceled in the recovery cycle"
2346 );
2347
2348 harness
2349 .watcher
2350 .process_health_check(
2351 &harness.wallet,
2352 &recovered_health(&harness.wallet),
2353 MarginMode::Standard,
2354 25_000,
2355 )
2356 .await;
2357
2358 let recovered_status = harness
2359 .cache
2360 .get_status(&harness.wallet)
2361 .await
2362 .expect("wallet should remain tracked after recovery");
2363 assert!(recovered_status.state.is_healthy());
2364
2365 harness.engine_task.abort();
2366 }
2367
2368 #[tokio::test]
2369 async fn test_sync_and_cancel_orders_cancels_excess_reduce_only_orders() {
2370 let harness = setup_standard_partial_liquidation_harness(false).await;
2371 harness.order_snapshot.clear_wallet(&harness.wallet);
2372 harness
2373 .portfolio_service
2374 .set_balance(
2375 harness.wallet,
2376 PortfolioBalance {
2377 positions: HashMap::from([(
2378 harness.symbol.clone(),
2379 PositionData {
2380 symbol: harness.symbol.clone(),
2381 amount: dec!(-1),
2382 entry_price: dec!(100),
2383 margin_posted: Decimal::ZERO,
2384 realized_pnl: Decimal::ZERO,
2385 unrealized_pnl: Decimal::ZERO,
2386 },
2387 )]),
2388 total_margin_used: Decimal::ZERO,
2389 },
2390 )
2391 .await;
2392 harness.order_snapshot.insert_order(
2393 harness.wallet,
2394 RuntimeOrderSummary {
2395 order_id: 100,
2396 symbol: harness.symbol.clone(),
2397 side: Side::Buy,
2398 price: dec!(150),
2399 original_size: dec!(1),
2400 remaining_size: dec!(1),
2401 is_perp: false,
2402 mmp_enabled: false,
2403 client_id: Some("close-1".to_string()),
2404 created_at: 10,
2405 },
2406 );
2407 harness.order_snapshot.insert_order(
2408 harness.wallet,
2409 RuntimeOrderSummary {
2410 order_id: 101,
2411 symbol: harness.symbol.clone(),
2412 side: Side::Buy,
2413 price: dec!(151),
2414 original_size: dec!(1),
2415 remaining_size: dec!(1),
2416 is_perp: false,
2417 mmp_enabled: false,
2418 client_id: Some("close-2".to_string()),
2419 created_at: 20,
2420 },
2421 );
2422
2423 harness
2424 .watcher
2425 .sync_and_cancel_orders(&harness.wallet, &[], false)
2426 .await
2427 .expect("sync should succeed");
2428
2429 let open_orders = harness
2430 .order_snapshot
2431 .get_open_orders_for_wallet(&harness.wallet);
2432 assert_eq!(open_orders.len(), 1);
2433 assert_eq!(open_orders[0].order_id, 100);
2434
2435 harness.engine_task.abort();
2436 }
2437
2438 #[tokio::test]
2439 async fn test_sync_and_cancel_orders_skips_when_portfolio_snapshot_missing() {
2440 let harness = setup_standard_partial_liquidation_harness(false).await;
2441 harness.order_snapshot.clear_wallet(&harness.wallet);
2442 harness
2443 .portfolio_service
2444 .clear_balance(&harness.wallet)
2445 .await;
2446 harness.order_snapshot.insert_order(
2447 harness.wallet,
2448 RuntimeOrderSummary {
2449 order_id: 100,
2450 symbol: harness.symbol.clone(),
2451 side: Side::Buy,
2452 price: dec!(150),
2453 original_size: dec!(1),
2454 remaining_size: dec!(1),
2455 is_perp: false,
2456 mmp_enabled: false,
2457 client_id: Some("liq-keep".to_string()),
2458 created_at: 10,
2459 },
2460 );
2461
2462 let sync_result = harness
2463 .watcher
2464 .sync_and_cancel_orders(&harness.wallet, &["liq-keep".to_string()], false)
2465 .await
2466 .expect("sync should succeed");
2467
2468 assert_eq!(sync_result.open_liquidation_client_ids, vec!["liq-keep"]);
2469 let open_orders = harness
2470 .order_snapshot
2471 .get_open_orders_for_wallet(&harness.wallet);
2472 assert_eq!(open_orders.len(), 1);
2473 assert_eq!(open_orders[0].order_id, 100);
2474
2475 harness.engine_task.abort();
2476 }
2477
2478 #[tokio::test]
2479 async fn test_watcher_keeps_pre_liquidation_when_partial_order_is_rejected() {
2480 let harness = setup_standard_partial_liquidation_harness(true).await;
2481
2482 harness
2483 .watcher
2484 .process_health_check(
2485 &harness.wallet,
2486 &liquidatable_health(&harness.wallet),
2487 MarginMode::Standard,
2488 10_000,
2489 )
2490 .await;
2491
2492 let status = harness
2493 .cache
2494 .get_status(&harness.wallet)
2495 .await
2496 .expect("wallet should have liquidation state");
2497 let metadata = match status.state {
2498 LiquidationState::PreLiquidation(metadata) => metadata,
2499 other => panic!("expected pre-liquidation state, got {:?}", other),
2500 };
2501
2502 assert!(
2503 metadata.active_order_request_ids.is_empty(),
2504 "rejected liquidation orders must not remain tracked as active requests"
2505 );
2506 assert!(
2507 metadata.active_order_client_ids.is_empty(),
2508 "rejected liquidation orders must not remain tracked as active client ids"
2509 );
2510 assert_eq!(metadata.last_reprice_at, Some(10_000));
2511 assert!(
2512 harness
2513 .order_snapshot
2514 .get_open_orders_for_wallet(&harness.wallet)
2515 .iter()
2516 .all(|order| !order
2517 .client_id
2518 .as_deref()
2519 .is_some_and(|client_id| client_id.starts_with("liq-"))),
2520 "rejected liquidation orders must not appear in the open-order snapshot"
2521 );
2522
2523 harness.engine_task.abort();
2524 }
2525
2526 #[tokio::test]
2527 async fn test_watcher_continues_polling_tracked_wallets_after_positions_close() {
2528 let harness = setup_standard_partial_liquidation_harness(false).await;
2529
2530 harness
2531 .watcher
2532 .process_health_check(
2533 &harness.wallet,
2534 &liquidatable_health(&harness.wallet),
2535 MarginMode::Standard,
2536 10_000,
2537 )
2538 .await;
2539
2540 harness
2541 .portfolio_service
2542 .set_balance(
2543 harness.wallet,
2544 PortfolioBalance {
2545 positions: HashMap::new(),
2546 total_margin_used: Decimal::ZERO,
2547 },
2548 )
2549 .await;
2550
2551 harness.watcher.check_all_accounts().await.unwrap();
2552
2553 let intermediate_status = harness
2554 .cache
2555 .get_status(&harness.wallet)
2556 .await
2557 .expect("tracked wallet should still be processed after positions close");
2558 assert!(
2559 matches!(
2560 intermediate_status.state,
2561 LiquidationState::PreLiquidation(..)
2562 ),
2563 "first recovery cycle should cancel tracked liquidation orders before clearing state"
2564 );
2565 assert!(
2566 harness
2567 .order_snapshot
2568 .get_open_orders_for_wallet(&harness.wallet)
2569 .is_empty(),
2570 "tracked liquidation orders should be canceled even after positions close"
2571 );
2572
2573 harness.watcher.check_all_accounts().await.unwrap();
2574
2575 let recovered_status = harness
2576 .cache
2577 .get_status(&harness.wallet)
2578 .await
2579 .expect("wallet should remain tracked after recovery");
2580 assert!(recovered_status.state.is_healthy());
2581
2582 harness.engine_task.abort();
2583 }
2584}