Skip to main content

hypercall/liquidator/
watcher.rs

1//! Liquidation watcher background task.
2//!
3//! Polls accounts for liquidation health and drives both backend-managed
4//! partial liquidation and on-chain full liquidation transitions.
5
6use super::cache::LiquidationCache;
7use super::executor::LiquidationExecutor;
8use super::health_check::{check_portfolio_health, check_standard_health, LiquidationHealthResult};
9use super::partial::{
10    compute_partial_bonus_bps, greedy_partial_plan, is_above_partial_target, required_mm_relief,
11    target_equity_from_mm, LiquidationSliceCandidate, PartialLiquidationOrderPlan,
12};
13use super::state::{
14    has_material_projection_change, AccountLiquidationStatus, LiquidationState,
15    PartialLiquidationMetadata,
16};
17use crate::portfolio::{PortfolioService, PositionData};
18use crate::read_cache::tier::TierCache;
19use crate::rsm::liquidation_manager::LiquidationManager;
20use crate::rsm::margin_service::SpanMarginService;
21use crate::rsm::unified_engine::{increment_pending_requests, UnifiedEngineRequest};
22use crate::rsm::MarginMode;
23use crate::shared::order_types::{to_contract_units_decimal, ParsedSymbol};
24use crate::standard_margin::StandardAccountBuilder;
25use anyhow::{anyhow, Context};
26use hypercall_margin::standard::StandardAccount;
27use hypercall_margin::{PortfolioMarginOptionKey, PortfolioMarginSnapshot};
28use hypercall_runtime_api::{OrderSnapshotProvider, QuoteProvider, RuntimeOrderSummary};
29use hypercall_types::WalletAddress;
30use hypercall_types::{
31    EngineMessage, LiquidationStateMessage, LiquidationStateType, OrderAction, OrderActionMessage,
32    OrderInfo, OrderUpdateMessage, OrderUpdateStatus, Side, TimeInForce,
33};
34use metrics::{counter, gauge};
35use rust_decimal::Decimal;
36use rust_decimal_macros::dec;
37use std::collections::{HashMap, HashSet};
38use std::sync::Arc;
39use std::time::{Duration, Instant};
40use tokio::sync::broadcast;
41use tokio::sync::mpsc;
42use tokio::time::{interval, timeout};
43use tracing::{debug, error, info, warn};
44use uuid::Uuid;
45
46const ENGINE_RESPONSE_TIMEOUT: Duration = Duration::from_secs(10);
47const PARTIAL_LIQ_MIN_SIZE: Decimal = dec!(0.00000001);
48const PARTIAL_PLAN_MAX_ROUNDS: usize = 3;
49
50#[derive(Debug)]
51struct PartialCycleOutcome {
52    metadata: PartialLiquidationMetadata,
53    can_recover: bool,
54}
55
56#[derive(Debug)]
57struct OpenOrderSyncResult {
58    open_liquidation_client_ids: Vec<String>,
59}
60
61#[derive(Debug, Clone)]
62struct CancelTarget {
63    order_id: u64,
64    client_id: Option<String>,
65}
66
67/// Configuration for the liquidation watcher.
68#[derive(Clone, Debug)]
69pub struct LiquidationWatcherConfig {
70    /// How often to poll accounts for health (default: 5 seconds).
71    pub poll_interval: Duration,
72    /// Full-liquidation escalation timeout after pre-liquidation begins.
73    pub full_escalation_timeout: Duration,
74    /// Minimum shortfall to trigger pre-liquidation (default: 0, any shortfall triggers).
75    pub min_shortfall_threshold: Decimal,
76    /// Health target buffer for partial liquidation sizing metadata.
77    pub partial_target_buffer_bps: u32,
78    /// Minimum time between partial-liquidation reprices.
79    pub partial_reprice_interval: Duration,
80    /// Starting liquidation bonus in basis points.
81    pub partial_bonus_start_bps: u32,
82    /// Maximum liquidation bonus in basis points.
83    pub partial_bonus_max_bps: u32,
84    /// Time for bonus ramp from start to max.
85    pub partial_bonus_ramp_ms: u64,
86}
87
88impl Default for LiquidationWatcherConfig {
89    fn default() -> Self {
90        Self {
91            poll_interval: Duration::from_secs(5),
92            full_escalation_timeout: Duration::from_secs(60),
93            min_shortfall_threshold: dec!(0),
94            partial_target_buffer_bps: 500,
95            partial_reprice_interval: Duration::from_secs(5),
96            partial_bonus_start_bps: 50,
97            partial_bonus_max_bps: 1_000,
98            partial_bonus_ramp_ms: 300_000,
99        }
100    }
101}
102
103impl LiquidationWatcherConfig {
104    pub fn from_runtime_config(config: &crate::backend_config::LiquidationRuntimeConfig) -> Self {
105        Self {
106            poll_interval: Duration::from_millis(config.health_poll_interval_ms),
107            full_escalation_timeout: Duration::from_millis(config.full_escalation_timeout_ms),
108            min_shortfall_threshold: config.min_shortfall_threshold,
109            partial_target_buffer_bps: config.partial_target_buffer_bps,
110            partial_reprice_interval: Duration::from_millis(config.partial_reprice_interval_ms),
111            partial_bonus_start_bps: config.partial_bonus_start_bps,
112            partial_bonus_max_bps: config.partial_bonus_max_bps,
113            partial_bonus_ramp_ms: config.partial_bonus_ramp_ms,
114        }
115    }
116}
117
118/// Liquidation watcher background task.
119///
120/// Monitors accounts for margin health and triggers liquidation state transitions.
121pub struct LiquidationWatcher {
122    /// Liquidation state cache.
123    cache: Arc<LiquidationCache>,
124    /// Executed portfolio state used for liquidation health and sizing.
125    portfolio_service: Arc<dyn PortfolioService + Send + Sync>,
126    /// Tier cache for margin mode lookup.
127    tier_cache: Option<Arc<TierCache>>,
128    /// SPAN margin service for portfolio margin accounts.
129    span_margin_service: Arc<SpanMarginService>,
130    /// Standard margin service.
131    standard_margin_service: Arc<crate::standard_margin::StandardMarginService>,
132    /// Risk account builder for portfolio margin.
133    risk_account_builder: Option<Arc<crate::rsm::portfolio_margin::RiskAccountBuilder>>,
134    /// Standard account builder for standard margin.
135    standard_account_builder: Option<Arc<StandardAccountBuilder>>,
136    /// Quote provider for liquidation pricing (BBO from orderbook).
137    quote_provider: Option<Arc<dyn QuoteProvider>>,
138    /// Greeks cache for theoretical price fallback when BBO unavailable.
139    greeks_cache: Option<Arc<crate::read_cache::greeks::GreeksCache>>,
140    /// Open-order snapshot provider for cancel/reprice decisions.
141    order_snapshot: Option<Arc<dyn OrderSnapshotProvider>>,
142    /// Order sender for backend-generated partial liquidation orders.
143    order_sender: Option<mpsc::Sender<UnifiedEngineRequest>>,
144    /// Event sender for publishing state changes.
145    event_sender: Option<mpsc::UnboundedSender<EngineMessage>>,
146    /// Liquidation executor for onchain calls.
147    executor: Option<Arc<LiquidationExecutor>>,
148    /// Configuration.
149    config: LiquidationWatcherConfig,
150}
151
152impl LiquidationWatcher {
153    /// Create a new liquidation watcher.
154    pub fn new(
155        cache: Arc<LiquidationCache>,
156        portfolio_service: Arc<dyn PortfolioService + Send + Sync>,
157        tier_cache: Arc<TierCache>,
158        span_margin_service: Arc<SpanMarginService>,
159        standard_margin_service: Arc<crate::standard_margin::StandardMarginService>,
160        config: LiquidationWatcherConfig,
161    ) -> Self {
162        Self {
163            cache,
164            portfolio_service,
165            tier_cache: Some(tier_cache),
166            span_margin_service,
167            standard_margin_service,
168            risk_account_builder: None,
169            standard_account_builder: None,
170            quote_provider: None,
171            greeks_cache: None,
172            order_snapshot: None,
173            order_sender: None,
174            event_sender: None,
175            executor: None,
176            config,
177        }
178    }
179
180    #[cfg(test)]
181    fn new_for_tests_without_tier_cache(
182        cache: Arc<LiquidationCache>,
183        portfolio_service: Arc<dyn PortfolioService + Send + Sync>,
184        span_margin_service: Arc<SpanMarginService>,
185        standard_margin_service: Arc<crate::standard_margin::StandardMarginService>,
186        config: LiquidationWatcherConfig,
187    ) -> Self {
188        Self {
189            cache,
190            portfolio_service,
191            tier_cache: None,
192            span_margin_service,
193            standard_margin_service,
194            risk_account_builder: None,
195            standard_account_builder: None,
196            quote_provider: None,
197            greeks_cache: None,
198            order_snapshot: None,
199            order_sender: None,
200            event_sender: None,
201            executor: None,
202            config,
203        }
204    }
205
206    /// Set the risk account builder for portfolio margin accounts.
207    pub fn with_risk_account_builder(
208        mut self,
209        builder: Arc<crate::rsm::portfolio_margin::RiskAccountBuilder>,
210    ) -> Self {
211        self.risk_account_builder = Some(builder);
212        self
213    }
214
215    /// Set the standard account builder.
216    pub fn with_standard_account_builder(mut self, builder: Arc<StandardAccountBuilder>) -> Self {
217        self.standard_account_builder = Some(builder);
218        self
219    }
220
221    /// Set the quote provider used for liquidation repricing.
222    pub fn with_quote_provider(mut self, provider: Arc<dyn QuoteProvider>) -> Self {
223        self.quote_provider = Some(provider);
224        self
225    }
226
227    /// Set the greeks cache for theoretical price fallback.
228    pub fn with_greeks_cache(mut self, cache: Arc<crate::read_cache::greeks::GreeksCache>) -> Self {
229        self.greeks_cache = Some(cache);
230        self
231    }
232
233    /// Set the order snapshot provider used for liquidation cancels.
234    pub fn with_order_snapshot(mut self, snapshot: Arc<dyn OrderSnapshotProvider>) -> Self {
235        self.order_snapshot = Some(snapshot);
236        self
237    }
238
239    /// Set the order sender used for backend-generated liquidation orders.
240    pub fn with_order_sender(mut self, sender: mpsc::Sender<UnifiedEngineRequest>) -> Self {
241        self.order_sender = Some(sender);
242        self
243    }
244
245    /// Set the event sender for publishing state changes.
246    pub fn with_event_sender(mut self, sender: mpsc::UnboundedSender<EngineMessage>) -> Self {
247        self.event_sender = Some(sender);
248        self
249    }
250
251    /// Set the liquidation executor for onchain calls.
252    pub fn with_executor(mut self, executor: Arc<LiquidationExecutor>) -> Self {
253        self.executor = Some(executor);
254        self
255    }
256
257    /// Run the watcher loop.
258    pub async fn run(&self) {
259        info!(
260            "Starting liquidation watcher with poll interval {:?}",
261            self.config.poll_interval
262        );
263
264        let mut interval = interval(self.config.poll_interval);
265
266        loop {
267            interval.tick().await;
268
269            if let Err(e) = self.check_all_accounts().await {
270                error!("Error checking accounts for liquidation: {}", e);
271            }
272        }
273    }
274
275    pub async fn run_with_shutdown(&self, mut shutdown_rx: broadcast::Receiver<()>) {
276        info!(
277            "Starting liquidation watcher with poll interval {:?}",
278            self.config.poll_interval
279        );
280
281        let mut interval = interval(self.config.poll_interval);
282
283        loop {
284            tokio::select! {
285                _ = shutdown_rx.recv() => {
286                    info!("Liquidation watcher received shutdown signal");
287                    break;
288                }
289                _ = interval.tick() => {
290                    if let Err(e) = self.check_all_accounts().await {
291                        error!("Error checking accounts for liquidation: {}", e);
292                    }
293                }
294            }
295        }
296    }
297
298    /// Check all accounts for liquidation health.
299    async fn check_all_accounts(&self) -> anyhow::Result<()> {
300        let now = get_timestamp_millis();
301
302        // Health polling is driven by executed portfolios, but wallets already in the
303        // liquidation state machine must continue to be processed even after their last
304        // short option is closed. Otherwise pre-liquidation can never clear.
305        let all_portfolios = self.portfolio_service.all_portfolios().await;
306        let mut wallets_to_check: HashSet<WalletAddress> = all_portfolios.keys().copied().collect();
307        for wallet in self.cache.get_all_wallets().await {
308            if matches!(
309                self.cache.get_state(&wallet).await,
310                LiquidationState::PreLiquidation(..) | LiquidationState::InLiquidation(..)
311            ) {
312                wallets_to_check.insert(wallet);
313            }
314        }
315
316        let wallet_count = wallets_to_check.len();
317        if wallet_count == 0 {
318            gauge!("ht_liquidation_partial_active_orders").set(0.0);
319            return Ok(());
320        }
321
322        debug!("Checking {} accounts for liquidation health", wallet_count);
323
324        let mut pre_liq_count = 0;
325        let mut healthy_count = 0;
326        let mut active_partial_orders = 0usize;
327
328        for wallet in wallets_to_check {
329            let has_positions = all_portfolios
330                .get(&wallet)
331                .is_some_and(|portfolio| !portfolio.positions.is_empty());
332            let Some(existing_status) = self.cache.get_status(&wallet).await else {
333                if !has_positions {
334                    continue;
335                }
336                // Status will be initialized after health is computed below.
337                // Fall through for accounts discovered via the portfolio cache.
338                let Ok(margin_mode) = self.margin_mode_for_wallet(&wallet).await else {
339                    warn!(
340                        "Failed to determine margin mode for newly discovered wallet {}",
341                        wallet
342                    );
343                    continue;
344                };
345                let health_result = match margin_mode {
346                    MarginMode::Portfolio => self.check_portfolio_margin_health(&wallet).await,
347                    MarginMode::Standard => self.check_standard_margin_health(&wallet).await,
348                };
349
350                let health = match health_result {
351                    Ok(h) => h,
352                    Err(e) => {
353                        warn!("Failed to check health for {}: {}", wallet, e);
354                        continue;
355                    }
356                };
357
358                self.process_health_check(&wallet, &health, margin_mode, now)
359                    .await;
360
361                if let Some(status) = self.cache.get_status(&wallet).await {
362                    if let LiquidationState::PreLiquidation(metadata) = &status.state {
363                        active_partial_orders += metadata.active_order_client_ids.len();
364                    }
365                }
366
367                if health.is_liquidatable {
368                    pre_liq_count += 1;
369                } else {
370                    healthy_count += 1;
371                }
372                continue;
373            };
374            if !has_positions && existing_status.state.is_healthy() {
375                continue;
376            }
377
378            let Ok(margin_mode) = self.margin_mode_for_wallet(&wallet).await else {
379                warn!(
380                    "Failed to determine margin mode for tracked wallet {}",
381                    wallet
382                );
383                continue;
384            };
385            let health_result = match margin_mode {
386                MarginMode::Portfolio => self.check_portfolio_margin_health(&wallet).await,
387                MarginMode::Standard => self.check_standard_margin_health(&wallet).await,
388            };
389
390            let health = match health_result {
391                Ok(h) => h,
392                Err(e) => {
393                    warn!("Failed to check health for {}: {}", wallet, e);
394                    continue;
395                }
396            };
397
398            self.process_health_check(&wallet, &health, margin_mode, now)
399                .await;
400
401            if let Some(status) = self.cache.get_status(&wallet).await {
402                if let LiquidationState::PreLiquidation(metadata) = &status.state {
403                    active_partial_orders += metadata.active_order_client_ids.len();
404                }
405            }
406
407            if health.is_liquidatable {
408                pre_liq_count += 1;
409            } else {
410                healthy_count += 1;
411            }
412        }
413
414        gauge!("ht_liquidation_partial_active_orders").set(active_partial_orders as f64);
415
416        if pre_liq_count > 0 {
417            info!(
418                "Liquidation check complete: {} healthy, {} in/near pre-liquidation",
419                healthy_count, pre_liq_count
420            );
421        }
422
423        Ok(())
424    }
425
426    async fn margin_mode_for_wallet(&self, wallet: &WalletAddress) -> anyhow::Result<MarginMode> {
427        if let Some(tier_cache) = &self.tier_cache {
428            return tier_cache.get_margin_mode(wallet).await;
429        }
430
431        self.cache
432            .get_status(wallet)
433            .await
434            .map(|status| status.margin_mode)
435            .ok_or_else(|| {
436                anyhow!(
437                    "tier_cache not configured and wallet {} not tracked",
438                    wallet
439                )
440            })
441    }
442
443    /// Check health for a portfolio margin account.
444    async fn check_portfolio_margin_health(
445        &self,
446        wallet: &WalletAddress,
447    ) -> anyhow::Result<LiquidationHealthResult> {
448        let builder = self
449            .risk_account_builder
450            .as_ref()
451            .ok_or_else(|| anyhow!("RiskAccountBuilder not set"))?;
452
453        let account = builder
454            .build_executed_account_for_risk(wallet)
455            .await
456            .map_err(|e| anyhow!("Failed to build risk account: {:?}", e))?;
457
458        Ok(check_portfolio_health(
459            &wallet.to_string(),
460            &account,
461            self.span_margin_service.as_ref(),
462        )
463        .await)
464    }
465
466    /// Check health for a standard margin account.
467    async fn check_standard_margin_health(
468        &self,
469        wallet: &WalletAddress,
470    ) -> anyhow::Result<LiquidationHealthResult> {
471        let builder = self
472            .standard_account_builder
473            .as_ref()
474            .ok_or_else(|| anyhow!("StandardAccountBuilder not set"))?;
475
476        let account = builder
477            .build(wallet)
478            .await
479            .map_err(|e| anyhow!("Failed to build standard account: {}", e))?;
480
481        Ok(check_standard_health(
482            &wallet.to_string(),
483            &account,
484            self.standard_margin_service.as_ref(),
485        ))
486    }
487
488    /// Process health check result and trigger state transitions.
489    async fn process_health_check(
490        &self,
491        wallet: &WalletAddress,
492        health: &LiquidationHealthResult,
493        margin_mode: MarginMode,
494        now: u64,
495    ) {
496        let equity = decimal_from_f64(health.equity, "equity");
497        let mm_required = decimal_from_f64(health.mm_required, "mm_required");
498        let shortfall = decimal_from_f64(health.shortfall(), "shortfall");
499
500        self.cache
501            .init_if_absent(*wallet, margin_mode, equity, mm_required, now)
502            .await;
503        self.cache
504            .update_health(wallet, equity, mm_required, now)
505            .await;
506
507        let Some(status) = self.cache.get_status(wallet).await else {
508            return;
509        };
510
511        match status.state.clone() {
512            LiquidationState::Healthy => {
513                if health.is_liquidatable && shortfall >= self.config.min_shortfall_threshold {
514                    let metadata = PartialLiquidationMetadata {
515                        entered_at: now,
516                        mm_shortfall: shortfall,
517                        target_equity: target_equity_from_mm(
518                            mm_required,
519                            self.config.partial_target_buffer_bps,
520                        ),
521                        escalation_deadline: now
522                            + self.config.full_escalation_timeout.as_millis() as u64,
523                        last_reprice_at: None,
524                        active_order_request_ids: Vec::new(),
525                        active_order_client_ids: Vec::new(),
526                        bonus_bps: self.config.partial_bonus_start_bps,
527                        pending_full_auction_id: None,
528                        pending_full_request_id: None,
529                        pending_full_tx_hash: None,
530                        pending_full_margin_needed: None,
531                    };
532
533                    if let Some(previous) = self
534                        .cache
535                        .enter_pre_liquidation(wallet, metadata, now)
536                        .await
537                    {
538                        counter!("ht_liquidation_partial_enter_total").increment(1);
539                        if let Some(updated_status) = self.cache.get_status(wallet).await {
540                            self.emit_state_change(&previous, &updated_status, now);
541                        }
542                    }
543
544                    self.handle_pre_liquidation_state(wallet, margin_mode, now)
545                        .await;
546                }
547            }
548            LiquidationState::PreLiquidation(..) => {
549                self.handle_pre_liquidation_state(wallet, margin_mode, now)
550                    .await;
551            }
552            LiquidationState::InLiquidation(metadata) => {
553                if !health.is_liquidatable {
554                    if metadata.stop_request_id.is_some() {
555                        debug!(
556                            "Wallet {} recovered while liquidation stop request is already pending",
557                            wallet
558                        );
559                    } else if let Some(chain_start_time) = metadata.chain_start_time {
560                        if let Some(executor) = &self.executor {
561                            match executor.stop_auction(wallet, chain_start_time).await {
562                                Ok(request_id) => {
563                                    info!(
564                                        "Enqueued stop liquidation request {} for recovered wallet {}",
565                                        request_id, wallet
566                                    );
567                                }
568                                Err(e) => {
569                                    error!(
570                                        "Failed to enqueue stop liquidation for wallet {}: {}",
571                                        wallet, e
572                                    );
573                                }
574                            }
575                        }
576                    } else {
577                        debug!(
578                            "Wallet {} recovered but liquidation has no observed chain_start_time yet",
579                            wallet
580                        );
581                    }
582                }
583            }
584            LiquidationState::Liquidated(..) => {}
585        }
586    }
587
588    async fn handle_pre_liquidation_state(
589        &self,
590        wallet: &WalletAddress,
591        margin_mode: MarginMode,
592        now: u64,
593    ) {
594        let Some(status) = self.cache.get_status(wallet).await else {
595            return;
596        };
597
598        let LiquidationState::PreLiquidation(mut metadata) = status.state.clone() else {
599            return;
600        };
601
602        let original_metadata = metadata.clone();
603        metadata.mm_shortfall = status.shortfall();
604        metadata.target_equity =
605            target_equity_from_mm(status.mm_required, self.config.partial_target_buffer_bps);
606        let above_partial_target = is_above_partial_target(
607            status.equity,
608            status.mm_required,
609            self.config.partial_target_buffer_bps,
610        );
611
612        if metadata.pending_full_request_id.is_some() {
613            if now
614                > metadata.escalation_deadline
615                    + self.config.full_escalation_timeout.as_millis() as u64
616            {
617                warn!(
618                    "Clearing stale pending_full_request_id for wallet {} (deadline+timeout exceeded)",
619                    wallet
620                );
621                metadata.pending_full_auction_id = None;
622                metadata.pending_full_request_id = None;
623                metadata.pending_full_tx_hash = None;
624                metadata.pending_full_margin_needed = None;
625                // Persist cleared state to cache before escalation retry
626                self.persist_pre_liquidation_metadata(wallet, metadata.clone(), now)
627                    .await;
628            } else {
629                if metadata != original_metadata {
630                    self.persist_pre_liquidation_metadata(wallet, metadata, now)
631                        .await;
632                }
633                return;
634            }
635        }
636
637        let has_short_option_positions = self.has_short_option_positions(wallet).await;
638        if !above_partial_target
639            && (status.equity < Decimal::ZERO
640                || matches!(has_short_option_positions, Some(false))
641                || now >= metadata.escalation_deadline)
642        {
643            if let Err(error) = self
644                .sync_and_cancel_orders(wallet, &metadata.active_order_client_ids, true)
645                .await
646            {
647                warn!(
648                    "Failed to synchronize orders before full-liquidation escalation for {}: {}",
649                    wallet, error
650                );
651            }
652
653            if let Some(executor) = &self.executor {
654                match executor.start_auction(wallet).await {
655                    Ok(auction_id) => {
656                        info!(
657                            "Enqueued liquidation auction {} for wallet {} from pre-liquidation",
658                            auction_id, wallet
659                        );
660                    }
661                    Err(error) => {
662                        error!(
663                            "Failed to enqueue liquidation auction for wallet {}: {}",
664                            wallet, error
665                        );
666                    }
667                }
668            } else {
669                debug!(
670                    "Wallet {} reached full-liquidation escalation but no executor is configured",
671                    wallet
672                );
673            }
674            return;
675        }
676
677        match self
678            .run_partial_liquidation_cycle(wallet, margin_mode, &status, metadata, now)
679            .await
680        {
681            Ok(outcome) => {
682                if outcome.can_recover {
683                    if let Some(previous) = self.cache.recover_to_healthy(wallet, now).await {
684                        counter!("ht_liquidation_partial_exit_total").increment(1);
685                        if let Some(updated_status) = self.cache.get_status(wallet).await {
686                            self.emit_state_change(&previous, &updated_status, now);
687                        }
688                    }
689                } else if let Some(current_status) = self.cache.get_status(wallet).await {
690                    if let LiquidationState::PreLiquidation(current_metadata) = current_status.state
691                    {
692                        if current_metadata != outcome.metadata {
693                            self.persist_pre_liquidation_metadata(wallet, outcome.metadata, now)
694                                .await;
695                        }
696                    }
697                }
698            }
699            Err(error) => {
700                error!(
701                    "Failed to run partial liquidation cycle for wallet {}: {}",
702                    wallet, error
703                );
704            }
705        }
706    }
707
708    // TODO: The reprice decision is driven by a wall-clock timer
709    // (partial_reprice_interval_ms), not an engine command. For green/blue
710    // deploys or multi-instance setups, this should become a
711    // TickLiquidation engine command so all nodes agree on reprice timing
712    // and only one instance submits orders. See also: NATS standby
713    // controller could gate the watcher loop as a shorter-term fix.
714    async fn run_partial_liquidation_cycle(
715        &self,
716        wallet: &WalletAddress,
717        margin_mode: MarginMode,
718        status: &AccountLiquidationStatus,
719        mut metadata: PartialLiquidationMetadata,
720        now: u64,
721    ) -> anyhow::Result<PartialCycleOutcome> {
722        metadata.mm_shortfall = status.shortfall();
723        metadata.target_equity =
724            target_equity_from_mm(status.mm_required, self.config.partial_target_buffer_bps);
725        metadata.bonus_bps = compute_partial_bonus_bps(
726            self.config.partial_bonus_start_bps,
727            self.config.partial_bonus_max_bps,
728            self.config.partial_bonus_ramp_ms,
729            metadata.entered_at,
730            now,
731        );
732
733        let above_target = is_above_partial_target(
734            status.equity,
735            status.mm_required,
736            self.config.partial_target_buffer_bps,
737        );
738        let reprice_due = metadata.last_reprice_at.is_none_or(|last_reprice_at| {
739            now.saturating_sub(last_reprice_at)
740                >= self.config.partial_reprice_interval.as_millis() as u64
741        });
742
743        let sync_result = self
744            .sync_and_cancel_orders(
745                wallet,
746                &metadata.active_order_client_ids,
747                above_target || reprice_due,
748            )
749            .await?;
750
751        if above_target {
752            metadata.active_order_request_ids.clear();
753            metadata.active_order_client_ids = sync_result.open_liquidation_client_ids;
754            let can_recover = metadata.active_order_client_ids.is_empty();
755            return Ok(PartialCycleOutcome {
756                metadata,
757                can_recover,
758            });
759        }
760
761        metadata.active_order_client_ids = sync_result.open_liquidation_client_ids;
762        if !reprice_due {
763            if metadata.active_order_client_ids.is_empty() {
764                metadata.active_order_request_ids.clear();
765            }
766            return Ok(PartialCycleOutcome {
767                metadata,
768                can_recover: false,
769            });
770        }
771
772        let plans = match margin_mode {
773            MarginMode::Standard => {
774                self.build_standard_partial_orders(wallet, metadata.bonus_bps)
775                    .await?
776            }
777            MarginMode::Portfolio => {
778                self.build_portfolio_partial_orders(wallet, metadata.bonus_bps)
779                    .await?
780            }
781        };
782
783        metadata.last_reprice_at = Some(now);
784        metadata.active_order_request_ids.clear();
785        metadata.active_order_client_ids.clear();
786
787        if plans.is_empty() {
788            return Ok(PartialCycleOutcome {
789                metadata,
790                can_recover: false,
791            });
792        }
793
794        let mut submitted_orders = 0u64;
795        for (index, plan) in plans.iter().enumerate() {
796            match self
797                .submit_partial_liquidation_order(wallet, plan, now, index)
798                .await
799            {
800                Ok((request_id, client_id, response)) => {
801                    if response.status != OrderUpdateStatus::Rejected {
802                        metadata.active_order_request_ids.push(request_id);
803                        metadata.active_order_client_ids.push(client_id);
804                        submitted_orders += 1;
805                    } else {
806                        warn!(
807                            "Partial liquidation order rejected for wallet {} symbol {}: {}",
808                            wallet,
809                            plan.symbol,
810                            response
811                                .reason
812                                .as_deref()
813                                .unwrap_or("missing rejection reason")
814                        );
815                    }
816                }
817                Err(error) => {
818                    error!(
819                        "Failed to submit partial liquidation order for wallet {} symbol {}: {}",
820                        wallet, plan.symbol, error
821                    );
822                }
823            }
824        }
825
826        if submitted_orders > 0 {
827            counter!("ht_liquidation_partial_orders_submitted_total").increment(submitted_orders);
828        }
829
830        Ok(PartialCycleOutcome {
831            metadata,
832            can_recover: false,
833        })
834    }
835
836    async fn persist_pre_liquidation_metadata(
837        &self,
838        wallet: &WalletAddress,
839        metadata: PartialLiquidationMetadata,
840        now: u64,
841    ) {
842        let Some(current_status) = self.cache.get_status(wallet).await else {
843            return;
844        };
845
846        if !matches!(current_status.state, LiquidationState::PreLiquidation(..)) {
847            return;
848        }
849
850        let previous_state = current_status.state.clone();
851        let mut updated_status = current_status;
852        updated_status.state = LiquidationState::PreLiquidation(metadata);
853        updated_status.updated_at = now;
854        self.cache.set_status(updated_status.clone()).await;
855        self.emit_state_change(&previous_state, &updated_status, now);
856    }
857
858    async fn sync_and_cancel_orders(
859        &self,
860        wallet: &WalletAddress,
861        tracked_liquidation_client_ids: &[String],
862        cancel_tracked_liquidation_orders: bool,
863    ) -> anyhow::Result<OpenOrderSyncResult> {
864        let order_snapshot = self
865            .order_snapshot
866            .as_ref()
867            .context("OrderSnapshotProvider not set")?;
868        let open_orders = order_snapshot.get_open_orders_for_wallet(wallet);
869        let tracked_liquidation_client_ids: HashSet<&str> = tracked_liquidation_client_ids
870            .iter()
871            .map(String::as_str)
872            .collect();
873        let open_liquidation_client_ids = open_orders
874            .iter()
875            .filter_map(|order| order.client_id.as_deref())
876            .filter(|client_id| tracked_liquidation_client_ids.contains(client_id))
877            .map(str::to_owned)
878            .collect::<Vec<_>>();
879        let Some(current_positions) = self.try_current_position_sizes(wallet).await else {
880            warn!(
881                wallet = %wallet,
882                "Skipping open-order liquidation cancel sync because portfolio snapshot is unavailable"
883            );
884            return Ok(OpenOrderSyncResult {
885                open_liquidation_client_ids,
886            });
887        };
888        let reduce_only_order_ids =
889            self.reduce_only_open_order_ids(&current_positions, &open_orders);
890
891        let mut cancel_targets = Vec::new();
892        let mut seen_targets = HashSet::new();
893
894        for order in &open_orders {
895            let tracked_client_id = order
896                .client_id
897                .as_deref()
898                .filter(|client_id| tracked_liquidation_client_ids.contains(client_id))
899                .map(ToOwned::to_owned);
900
901            let should_cancel = !reduce_only_order_ids.contains(&order.order_id)
902                || (cancel_tracked_liquidation_orders && tracked_client_id.is_some());
903            if !should_cancel {
904                continue;
905            }
906
907            let target_key = tracked_client_id
908                .clone()
909                .unwrap_or_else(|| format!("oid:{}", order.order_id));
910            if seen_targets.insert(target_key) {
911                cancel_targets.push(CancelTarget {
912                    order_id: order.order_id,
913                    client_id: tracked_client_id.or_else(|| order.client_id.clone()),
914                });
915            }
916        }
917
918        for target in cancel_targets {
919            if let Err(error) = self.cancel_open_order(wallet, &target).await {
920                warn!(
921                    "Failed to cancel open order {} for wallet {}: {}",
922                    target.order_id, wallet, error
923                );
924            }
925        }
926
927        Ok(OpenOrderSyncResult {
928            open_liquidation_client_ids,
929        })
930    }
931
932    async fn build_standard_partial_orders(
933        &self,
934        wallet: &WalletAddress,
935        bonus_bps: u32,
936    ) -> anyhow::Result<Vec<PartialLiquidationOrderPlan>> {
937        let builder = self
938            .standard_account_builder
939            .as_ref()
940            .context("StandardAccountBuilder not set")?;
941        let expected_prices = self.partial_execution_prices(wallet, bonus_bps).await?;
942        let mut simulated_account = builder.build(wallet).await.map_err(|error| {
943            anyhow!("Failed to build standard account for {}: {}", wallet, error)
944        })?;
945        let mut aggregated_plans = Vec::new();
946
947        for _ in 0..PARTIAL_PLAN_MAX_ROUNDS {
948            let margin = self
949                .standard_margin_service
950                .compute_margin(&simulated_account);
951            let required_relief_amount = required_mm_relief(
952                margin.equity,
953                margin.position_mm,
954                self.config.partial_target_buffer_bps,
955            );
956            if required_relief_amount <= PARTIAL_LIQ_MIN_SIZE {
957                break;
958            }
959
960            let candidates = self.standard_partial_candidates(&simulated_account, &expected_prices);
961            let round_plans = greedy_partial_plan(&candidates, required_relief_amount)
962                .into_iter()
963                .filter(|plan| plan.close_size > PARTIAL_LIQ_MIN_SIZE)
964                .collect::<Vec<_>>();
965            if round_plans.is_empty() {
966                break;
967            }
968
969            for plan in round_plans {
970                self.apply_standard_close(
971                    &mut simulated_account,
972                    &plan.symbol,
973                    plan.close_size,
974                    plan.limit_price,
975                )?;
976                merge_partial_plan(&mut aggregated_plans, plan);
977            }
978        }
979
980        Ok(aggregated_plans)
981    }
982
983    async fn build_portfolio_partial_orders(
984        &self,
985        wallet: &WalletAddress,
986        bonus_bps: u32,
987    ) -> anyhow::Result<Vec<PartialLiquidationOrderPlan>> {
988        let builder = self
989            .risk_account_builder
990            .as_ref()
991            .context("RiskAccountBuilder not set")?;
992        let expected_prices = self.partial_execution_prices(wallet, bonus_bps).await?;
993        let mut simulated_snapshot = builder
994            .build_snapshot(wallet)
995            .await
996            .map_err(|error| anyhow!("Failed to build PM snapshot for {}: {:?}", wallet, error))?
997            .without_open_orders();
998        let mut aggregated_plans = Vec::new();
999
1000        for _ in 0..PARTIAL_PLAN_MAX_ROUNDS {
1001            let margin = self.compute_pm_margin(builder, &simulated_snapshot)?;
1002            let required_relief_amount = required_mm_relief(
1003                margin.equity,
1004                margin.maintenance_margin_required,
1005                self.config.partial_target_buffer_bps,
1006            );
1007            if required_relief_amount <= PARTIAL_LIQ_MIN_SIZE {
1008                break;
1009            }
1010
1011            let candidates =
1012                self.pm_partial_candidates(builder, &simulated_snapshot, &expected_prices)?;
1013            let round_plans = greedy_partial_plan(&candidates, required_relief_amount)
1014                .into_iter()
1015                .filter(|plan| plan.close_size > PARTIAL_LIQ_MIN_SIZE)
1016                .collect::<Vec<_>>();
1017            if round_plans.is_empty() {
1018                break;
1019            }
1020
1021            for plan in round_plans {
1022                self.apply_pm_option_close(
1023                    &mut simulated_snapshot,
1024                    &plan.symbol,
1025                    plan.close_size,
1026                    plan.limit_price,
1027                )?;
1028                merge_partial_plan(&mut aggregated_plans, plan);
1029            }
1030        }
1031
1032        Ok(aggregated_plans)
1033    }
1034
1035    fn standard_partial_candidates(
1036        &self,
1037        account: &StandardAccount,
1038        expected_prices: &HashMap<String, Decimal>,
1039    ) -> Vec<LiquidationSliceCandidate> {
1040        let current_margin = self.standard_margin_service.compute_margin(account);
1041        let mut candidates = Vec::new();
1042
1043        for option_position in account.short_options() {
1044            let Some(expected_price) = expected_prices.get(&option_position.symbol).copied() else {
1045                continue;
1046            };
1047            let close_step = option_position.abs_size().min(Decimal::ONE);
1048            if close_step <= PARTIAL_LIQ_MIN_SIZE {
1049                continue;
1050            }
1051
1052            let mut simulated_account = account.clone();
1053            if self
1054                .apply_standard_close(
1055                    &mut simulated_account,
1056                    &option_position.symbol,
1057                    close_step,
1058                    expected_price,
1059                )
1060                .is_err()
1061            {
1062                continue;
1063            }
1064
1065            let new_margin = self
1066                .standard_margin_service
1067                .compute_margin(&simulated_account);
1068            let mm_relief_per_unit =
1069                (current_margin.position_mm - new_margin.position_mm) / close_step;
1070            if mm_relief_per_unit <= Decimal::ZERO {
1071                continue;
1072            }
1073
1074            candidates.push(LiquidationSliceCandidate {
1075                symbol: option_position.symbol.clone(),
1076                max_close_size: option_position.abs_size(),
1077                expected_fill_price: expected_price,
1078                mm_relief_per_unit,
1079            });
1080        }
1081
1082        candidates
1083    }
1084
1085    fn pm_partial_candidates(
1086        &self,
1087        builder: &crate::rsm::portfolio_margin::RiskAccountBuilder,
1088        snapshot: &PortfolioMarginSnapshot,
1089        expected_prices: &HashMap<String, Decimal>,
1090    ) -> anyhow::Result<Vec<LiquidationSliceCandidate>> {
1091        let current_margin = self.compute_pm_margin(builder, snapshot)?;
1092        let mut candidates = Vec::new();
1093
1094        for underlying in &snapshot.underlyings {
1095            for option in &underlying.executed_options {
1096                if option.quantity >= Decimal::ZERO {
1097                    continue;
1098                }
1099
1100                let symbol = format_pm_option_symbol(&option.key);
1101                let Some(expected_price) = expected_prices.get(&symbol).copied() else {
1102                    continue;
1103                };
1104
1105                let close_step = option.quantity.abs().min(Decimal::ONE);
1106                if close_step <= PARTIAL_LIQ_MIN_SIZE {
1107                    continue;
1108                }
1109
1110                let mut simulated_snapshot = snapshot.clone();
1111                if self
1112                    .apply_pm_option_close(
1113                        &mut simulated_snapshot,
1114                        &symbol,
1115                        close_step,
1116                        expected_price,
1117                    )
1118                    .is_err()
1119                {
1120                    continue;
1121                }
1122
1123                let new_margin = self.compute_pm_margin(builder, &simulated_snapshot)?;
1124                let mm_relief_per_unit = (current_margin.maintenance_margin_required
1125                    - new_margin.maintenance_margin_required)
1126                    / close_step;
1127                if mm_relief_per_unit <= Decimal::ZERO {
1128                    continue;
1129                }
1130
1131                candidates.push(LiquidationSliceCandidate {
1132                    symbol,
1133                    max_close_size: option.quantity.abs(),
1134                    expected_fill_price: expected_price,
1135                    mm_relief_per_unit,
1136                });
1137            }
1138        }
1139
1140        Ok(candidates)
1141    }
1142
1143    fn compute_pm_margin(
1144        &self,
1145        builder: &crate::rsm::portfolio_margin::RiskAccountBuilder,
1146        snapshot: &PortfolioMarginSnapshot,
1147    ) -> anyhow::Result<crate::types::MarginDetails> {
1148        let market_state = builder
1149            .resolve_market_state(snapshot, self.span_margin_service.config())
1150            .map_err(|error| anyhow!("Failed to resolve PM market state: {:?}", error))?;
1151
1152        self.span_margin_service
1153            .compute_margin_from_snapshot(snapshot, market_state)
1154            .map_err(|error| anyhow!("Failed to compute PM margin: {}", error))
1155    }
1156
1157    fn apply_standard_close(
1158        &self,
1159        account: &mut StandardAccount,
1160        symbol: &str,
1161        close_size: Decimal,
1162        execution_price: Decimal,
1163    ) -> anyhow::Result<()> {
1164        if close_size <= PARTIAL_LIQ_MIN_SIZE {
1165            return Ok(());
1166        }
1167
1168        let option_index = account
1169            .option_positions
1170            .iter()
1171            .position(|position| position.symbol == symbol)
1172            .ok_or_else(|| anyhow!("Missing standard option position for {}", symbol))?;
1173        let current_size = account.option_positions[option_index].size;
1174        if current_size >= Decimal::ZERO {
1175            return Err(anyhow!(
1176                "Expected short standard option position for {}, found {}",
1177                symbol,
1178                current_size
1179            ));
1180        }
1181        if close_size > current_size.abs() + PARTIAL_LIQ_MIN_SIZE {
1182            return Err(anyhow!(
1183                "Close size {} exceeds standard short position {} for {}",
1184                close_size,
1185                current_size.abs(),
1186                symbol
1187            ));
1188        }
1189
1190        account.usdc_balance -= execution_price * close_size;
1191        account.option_positions[option_index].size += close_size;
1192        if account.option_positions[option_index].size.abs() <= PARTIAL_LIQ_MIN_SIZE {
1193            account.option_positions.remove(option_index);
1194        }
1195
1196        Ok(())
1197    }
1198
1199    fn apply_pm_option_close(
1200        &self,
1201        snapshot: &mut PortfolioMarginSnapshot,
1202        symbol: &str,
1203        close_size: Decimal,
1204        execution_price: Decimal,
1205    ) -> anyhow::Result<()> {
1206        if close_size <= PARTIAL_LIQ_MIN_SIZE {
1207            return Ok(());
1208        }
1209
1210        let parsed = ParsedSymbol::from_symbol(symbol)
1211            .map_err(|error| anyhow!("Failed to parse PM option symbol {}: {}", symbol, error))?;
1212        let option_key = PortfolioMarginOptionKey {
1213            underlying: parsed.underlying.clone(),
1214            option_type: parsed.option_type,
1215            strike: parsed.strike,
1216            expiry_ts: hypercall_types::expiry_date_to_timestamp(&parsed.underlying, parsed.expiry),
1217        };
1218
1219        let underlying_snapshot = snapshot
1220            .underlyings
1221            .iter_mut()
1222            .find(|underlying| underlying.underlying == option_key.underlying)
1223            .ok_or_else(|| anyhow!("Missing PM underlying snapshot for {}", symbol))?;
1224
1225        let option_exposure = underlying_snapshot
1226            .executed_options
1227            .iter_mut()
1228            .find(|option| option.key == option_key)
1229            .ok_or_else(|| anyhow!("Missing PM executed option exposure for {}", symbol))?;
1230        if option_exposure.quantity >= Decimal::ZERO {
1231            return Err(anyhow!(
1232                "Expected short PM option exposure for {}, found {}",
1233                symbol,
1234                option_exposure.quantity
1235            ));
1236        }
1237        if close_size > option_exposure.quantity.abs() + PARTIAL_LIQ_MIN_SIZE {
1238            return Err(anyhow!(
1239                "Close size {} exceeds PM short position {} for {}",
1240                close_size,
1241                option_exposure.quantity.abs(),
1242                symbol
1243            ));
1244        }
1245
1246        let signed_closed_quantity = -close_size;
1247        snapshot.cash_balance +=
1248            signed_closed_quantity * (execution_price - option_exposure.entry_price);
1249        option_exposure.quantity += close_size;
1250
1251        underlying_snapshot
1252            .executed_options
1253            .retain(|option| option.quantity.abs() > PARTIAL_LIQ_MIN_SIZE);
1254        snapshot.underlyings.retain(|underlying| {
1255            !underlying.executed_options.is_empty()
1256                || !underlying.executed_perps.is_empty()
1257                || !underlying.hypothetical_open_order_options.is_empty()
1258                || !underlying.hypothetical_open_order_perps.is_empty()
1259        });
1260
1261        Ok(())
1262    }
1263
1264    async fn partial_execution_prices(
1265        &self,
1266        wallet: &WalletAddress,
1267        bonus_bps: u32,
1268    ) -> anyhow::Result<HashMap<String, Decimal>> {
1269        let positions = self.current_positions(wallet).await;
1270        let mut prices = HashMap::new();
1271
1272        for (symbol, position) in positions {
1273            if position.amount >= Decimal::ZERO {
1274                continue;
1275            }
1276            if ParsedSymbol::from_symbol(&symbol).is_err() {
1277                continue;
1278            }
1279
1280            let fallback_mark_price = derive_portfolio_mark_price(&position)
1281                .with_context(|| format!("Failed to derive fallback mark for {}", symbol))?;
1282            let reference_price = self
1283                .quote_reference_price(&symbol)
1284                .unwrap_or(fallback_mark_price);
1285            if reference_price <= Decimal::ZERO {
1286                return Err(anyhow!(
1287                    "Expected positive liquidation reference price for {}, got {}",
1288                    symbol,
1289                    reference_price
1290                ));
1291            }
1292
1293            prices.insert(
1294                symbol,
1295                reference_price * (Decimal::ONE + (Decimal::from(bonus_bps) / dec!(10000))),
1296            );
1297        }
1298
1299        Ok(prices)
1300    }
1301
1302    fn quote_reference_price(&self, symbol: &str) -> Option<Decimal> {
1303        // 1. Try BBO from orderbook
1304        if let Some(quote_provider) = self.quote_provider.as_ref() {
1305            if let Some(quote) = quote_provider.get_quote(symbol) {
1306                let bbo_price = quote
1307                    .best_ask
1308                    .filter(|price| *price > 0.0)
1309                    .or_else(|| quote.mid.filter(|price| *price > 0.0))
1310                    .or_else(|| quote.best_bid.filter(|price| *price > 0.0));
1311                if let Some(price) = bbo_price {
1312                    return Some(decimal_from_f64(price, "partial_quote_bbo"));
1313                }
1314            }
1315        }
1316
1317        // 2. Fall back to theoretical price from vol surface
1318        if let Some(greeks_cache) = &self.greeks_cache {
1319            if let Ok(theo) =
1320                futures::executor::block_on(greeks_cache.get_theoretical_price(symbol))
1321            {
1322                if theo > 0.0 && theo.is_finite() {
1323                    return Some(decimal_from_f64(theo, "partial_quote_theo"));
1324                }
1325            }
1326        }
1327
1328        None
1329    }
1330
1331    async fn current_positions(&self, wallet: &WalletAddress) -> HashMap<String, PositionData> {
1332        self.portfolio_service
1333            .get_portfolio_balance(wallet)
1334            .await
1335            .unwrap_or_default()
1336            .positions
1337    }
1338
1339    async fn try_current_position_sizes(
1340        &self,
1341        wallet: &WalletAddress,
1342    ) -> Option<HashMap<String, Decimal>> {
1343        self.portfolio_service
1344            .get_portfolio_balance(wallet)
1345            .await
1346            .map(|balance| {
1347                balance
1348                    .positions
1349                    .into_iter()
1350                    .map(|(symbol, position)| (symbol, position.amount))
1351                    .collect()
1352            })
1353    }
1354
1355    fn is_order_reduce_only(
1356        &self,
1357        current_positions: &HashMap<String, Decimal>,
1358        order: &RuntimeOrderSummary,
1359    ) -> bool {
1360        let current_position = current_positions
1361            .get(&order.symbol)
1362            .copied()
1363            .unwrap_or(Decimal::ZERO);
1364        let signed_order_size = if matches!(order.side, Side::Buy) {
1365            order.remaining_size
1366        } else {
1367            -order.remaining_size
1368        };
1369        let new_position = current_position + signed_order_size;
1370        LiquidationManager::is_position_reduce_only(current_position, new_position)
1371    }
1372
1373    fn reduce_only_open_order_ids(
1374        &self,
1375        current_positions: &HashMap<String, Decimal>,
1376        open_orders: &[RuntimeOrderSummary],
1377    ) -> HashSet<u64> {
1378        let mut remaining_positions = current_positions.clone();
1379        let mut sorted_orders: Vec<&RuntimeOrderSummary> = open_orders.iter().collect();
1380        sorted_orders.sort_by_key(|order| (order.created_at, order.order_id));
1381
1382        let mut reduce_only_order_ids = HashSet::new();
1383        for order in sorted_orders {
1384            if !self.is_order_reduce_only(&remaining_positions, order) {
1385                continue;
1386            }
1387
1388            let current_position = remaining_positions
1389                .get(&order.symbol)
1390                .copied()
1391                .unwrap_or(Decimal::ZERO);
1392            let signed_order_size = if matches!(order.side, Side::Buy) {
1393                order.remaining_size
1394            } else {
1395                -order.remaining_size
1396            };
1397            remaining_positions.insert(order.symbol.clone(), current_position + signed_order_size);
1398            reduce_only_order_ids.insert(order.order_id);
1399        }
1400
1401        reduce_only_order_ids
1402    }
1403
1404    async fn cancel_open_order(
1405        &self,
1406        wallet: &WalletAddress,
1407        target: &CancelTarget,
1408    ) -> anyhow::Result<()> {
1409        let request_id = Uuid::now_v7().to_string();
1410        let order_info = OrderInfo {
1411            symbol: String::new(),
1412            price: Decimal::ZERO,
1413            size: Decimal::ZERO,
1414            side: Side::Buy,
1415            tif: TimeInForce::GTC,
1416            client_id: target.client_id.clone(),
1417            order_id: Some(target.order_id),
1418            is_perp: false,
1419            underlying: None,
1420            reduce_only: None,
1421            nonce: None,
1422            signature: None,
1423            mmp_enabled: false,
1424            builder_code_address: None,
1425        };
1426
1427        let response = self
1428            .send_engine_order(OrderActionMessage {
1429                timestamp: get_timestamp_millis(),
1430                info: order_info,
1431                action: OrderAction::CancelOrder,
1432                wallet: *wallet,
1433                api_wallet_address: Some(*wallet),
1434                mmp_triggered: false,
1435                request_id: Some(request_id),
1436            })
1437            .await?;
1438
1439        let is_not_found = response.status == OrderUpdateStatus::Rejected
1440            && response.reason.as_deref().is_some_and(|reason| {
1441                reason.contains("already been completed")
1442                    || reason.contains("already filled")
1443                    || reason.contains("already cancelled")
1444                    || reason.contains("already canceled")
1445                    || reason.contains("not found")
1446            });
1447
1448        if response.status == OrderUpdateStatus::Canceled || is_not_found {
1449            return Ok(());
1450        }
1451
1452        Err(anyhow!(
1453            "Cancel rejected for wallet {} order {}: {}",
1454            wallet,
1455            target.order_id,
1456            response
1457                .reason
1458                .as_deref()
1459                .unwrap_or("missing cancellation reason")
1460        ))
1461    }
1462
1463    async fn submit_partial_liquidation_order(
1464        &self,
1465        wallet: &WalletAddress,
1466        plan: &PartialLiquidationOrderPlan,
1467        now: u64,
1468        order_index: usize,
1469    ) -> anyhow::Result<(String, String, OrderUpdateMessage)> {
1470        let request_id = Uuid::now_v7().to_string();
1471        let client_id = format!("liq-{}-{}-{}", wallet, now, order_index);
1472
1473        let response = self
1474            .send_engine_order(OrderActionMessage {
1475                timestamp: get_timestamp_millis(),
1476                info: OrderInfo {
1477                    symbol: plan.symbol.clone(),
1478                    price: plan.limit_price,
1479                    size: to_contract_units_decimal(&plan.symbol, plan.close_size),
1480                    side: Side::Buy,
1481                    tif: TimeInForce::GTC,
1482                    client_id: Some(client_id.clone()),
1483                    order_id: None,
1484                    is_perp: false,
1485                    underlying: None,
1486                    reduce_only: Some(true),
1487                    nonce: None,
1488                    signature: None,
1489                    mmp_enabled: false,
1490                    builder_code_address: None,
1491                },
1492                action: OrderAction::CreateOrder,
1493                wallet: *wallet,
1494                api_wallet_address: Some(*wallet),
1495                mmp_triggered: false,
1496                request_id: Some(request_id.clone()),
1497            })
1498            .await?;
1499
1500        Ok((request_id, client_id, response))
1501    }
1502
1503    async fn send_engine_order(
1504        &self,
1505        message: OrderActionMessage,
1506    ) -> anyhow::Result<OrderUpdateMessage> {
1507        let order_sender = self
1508            .order_sender
1509            .as_ref()
1510            .context("order sender not configured")?;
1511        let (response_tx, mut response_rx) = mpsc::channel(1);
1512        let request = UnifiedEngineRequest {
1513            message,
1514            response_tx,
1515            enqueued_at: Instant::now(),
1516            #[cfg(feature = "otel-tracing")]
1517            trace_context: None,
1518        };
1519
1520        increment_pending_requests();
1521        order_sender
1522            .send(request)
1523            .await
1524            .context("failed to send liquidation order to engine")?;
1525
1526        match timeout(ENGINE_RESPONSE_TIMEOUT, response_rx.recv()).await {
1527            Ok(Some(response)) => Ok(response),
1528            Ok(None) => Err(anyhow!("engine closed liquidation response channel")),
1529            Err(_) => Err(anyhow!("timed out waiting for liquidation engine response")),
1530        }
1531    }
1532
1533    /// Emit a state change event.
1534    fn emit_state_change(
1535        &self,
1536        previous: &LiquidationState,
1537        status: &AccountLiquidationStatus,
1538        timestamp: u64,
1539    ) {
1540        info!(
1541            "Liquidation state change: wallet={}, {} -> {}",
1542            status.wallet,
1543            previous.as_str(),
1544            status.state.as_str()
1545        );
1546
1547        // Publish to event bus via event sender
1548        if let Some(sender) = &self.event_sender {
1549            let msg = LiquidationStateMessage {
1550                wallet: status.wallet,
1551                previous_state: liquidation_state_to_type(previous),
1552                new_state: liquidation_state_to_type(&status.state),
1553                previous_liquidation_mode: previous
1554                    .liquidation_mode()
1555                    .map(|mode| mode.as_str().to_string()),
1556                liquidation_mode: status
1557                    .liquidation_mode()
1558                    .map(|mode| mode.as_str().to_string()),
1559                margin_mode: status.margin_mode.as_str().to_string(),
1560                equity: status.equity,
1561                mm_required: status.mm_required,
1562                maintenance_margin: status.maintenance_margin,
1563                shortfall: status.shortfall(),
1564                previous_auction_id: previous.auction_id().map(ToOwned::to_owned),
1565                projection_changed: has_material_projection_change(previous, &status.state),
1566                auction_id: status
1567                    .state
1568                    .auction_id()
1569                    .map(ToOwned::to_owned)
1570                    .or_else(|| previous.auction_id().map(ToOwned::to_owned)),
1571                status: status.clone(),
1572                timestamp,
1573            };
1574
1575            if let Err(error) = sender.send(EngineMessage::LiquidationStateChange(msg)) {
1576                warn!("Failed to send liquidation state change event: {}", error);
1577            }
1578        }
1579    }
1580}
1581
1582/// Get current timestamp in milliseconds.
1583fn get_timestamp_millis() -> u64 {
1584    std::time::SystemTime::now()
1585        .duration_since(std::time::UNIX_EPOCH)
1586        .unwrap_or_default()
1587        .as_millis() as u64
1588}
1589
1590/// Convert LiquidationState to LiquidationStateType for messages.
1591fn liquidation_state_to_type(state: &LiquidationState) -> LiquidationStateType {
1592    match state {
1593        LiquidationState::Healthy => LiquidationStateType::Healthy,
1594        LiquidationState::PreLiquidation(..) => LiquidationStateType::PreLiquidation,
1595        LiquidationState::InLiquidation(..) => LiquidationStateType::InLiquidation,
1596        LiquidationState::Liquidated(..) => LiquidationStateType::Liquidated,
1597    }
1598}
1599
1600fn decimal_from_f64(value: f64, field: &str) -> Decimal {
1601    if !value.is_finite() {
1602        warn!(
1603            "Liquidation {} value {} is not finite, clamping to zero",
1604            field, value
1605        );
1606        return Decimal::ZERO;
1607    }
1608    Decimal::from_f64_retain(value).unwrap_or_else(|| {
1609        warn!(
1610            "Liquidation {} value {} is not representable as Decimal, clamping to zero",
1611            field, value
1612        );
1613        Decimal::ZERO
1614    })
1615}
1616
1617fn derive_portfolio_mark_price(position: &PositionData) -> anyhow::Result<Decimal> {
1618    if position.amount.abs() <= PARTIAL_LIQ_MIN_SIZE {
1619        return Err(anyhow!(
1620            "position amount {} is too small to derive mark price for {}",
1621            position.amount,
1622            position.symbol
1623        ));
1624    }
1625
1626    Ok(position.entry_price + (position.unrealized_pnl / position.amount))
1627}
1628
1629fn merge_partial_plan(
1630    aggregated_plans: &mut Vec<PartialLiquidationOrderPlan>,
1631    plan: PartialLiquidationOrderPlan,
1632) {
1633    if let Some(existing_plan) = aggregated_plans
1634        .iter_mut()
1635        .find(|existing| existing.symbol == plan.symbol && existing.limit_price == plan.limit_price)
1636    {
1637        existing_plan.close_size += plan.close_size;
1638        existing_plan.expected_mm_relief += plan.expected_mm_relief;
1639        return;
1640    }
1641
1642    aggregated_plans.push(plan);
1643}
1644
1645fn format_pm_option_symbol(key: &PortfolioMarginOptionKey) -> String {
1646    format!(
1647        "{}-{}-{}-{}",
1648        key.underlying,
1649        chrono::DateTime::from_timestamp(key.expiry_ts, 0)
1650            .expect("option expiry_ts must be valid")
1651            .format("%Y%m%d"),
1652        key.strike.normalize(),
1653        match key.option_type {
1654            crate::types::OptionType::Call => "C",
1655            crate::types::OptionType::Put => "P",
1656        }
1657    )
1658}
1659
1660impl LiquidationWatcher {
1661    async fn has_short_option_positions(&self, wallet: &WalletAddress) -> Option<bool> {
1662        let Some(balance) = self.portfolio_service.get_portfolio_balance(wallet).await else {
1663            return None;
1664        };
1665
1666        Some(balance.positions.iter().any(|(symbol, position)| {
1667            position.amount < Decimal::ZERO && ParsedSymbol::from_symbol(symbol).is_ok()
1668        }))
1669    }
1670}
1671
1672#[async_trait::async_trait]
1673impl crate::shared::service::Service for LiquidationWatcher {
1674    fn name(&self) -> &'static str {
1675        "LiquidationWatcher"
1676    }
1677
1678    fn owner(&self) -> crate::shared::service::ServiceOwner {
1679        crate::shared::service::ServiceOwner::Engine
1680    }
1681
1682    async fn run(
1683        self: std::sync::Arc<Self>,
1684        shutdown: crate::shared::ShutdownRx,
1685    ) -> anyhow::Result<()> {
1686        self.run_with_shutdown(shutdown).await;
1687        Ok(())
1688    }
1689}
1690
1691#[cfg(test)]
1692mod tests {
1693    use super::*;
1694    use crate::portfolio::{PortfolioBalance, PortfolioChange, PortfolioError};
1695    use crate::rsm::ledger::{InMemoryLedger, Ledger};
1696    use crate::rsm::portfolio_margin::risk_account_builder::SpotPriceSource;
1697    use crate::types::{Config, Scenario, ScenarioType};
1698    use async_trait::async_trait;
1699    use hypercall_engine::FeeConfig;
1700    use hypercall_runtime_api::RuntimeOrderSummary;
1701    use hypercall_types::api_models::Portfolio;
1702    use hypercall_types::{to_human_readable_decimal, wallet_address::test_wallet};
1703    use std::any::Any;
1704    use std::collections::HashMap;
1705    use std::sync::atomic::{AtomicU64, Ordering};
1706    use std::sync::RwLock;
1707    use tokio::sync::RwLock as AsyncRwLock;
1708    use tokio::task::JoinHandle;
1709
1710    #[derive(Default)]
1711    struct MockPortfolioService {
1712        balances: AsyncRwLock<HashMap<WalletAddress, PortfolioBalance>>,
1713    }
1714
1715    impl MockPortfolioService {
1716        async fn set_balance(&self, wallet: WalletAddress, balance: PortfolioBalance) {
1717            self.balances.write().await.insert(wallet, balance);
1718        }
1719
1720        async fn clear_balance(&self, wallet: &WalletAddress) {
1721            self.balances.write().await.remove(wallet);
1722        }
1723    }
1724
1725    #[async_trait]
1726    impl PortfolioService for MockPortfolioService {
1727        async fn get_portfolio(&self, account: &WalletAddress) -> Portfolio {
1728            let balance = self
1729                .get_portfolio_balance(account)
1730                .await
1731                .unwrap_or_default();
1732            Portfolio {
1733                wallet_address: *account,
1734                positions: balance
1735                    .positions
1736                    .values()
1737                    .cloned()
1738                    .map(
1739                        |position| hypercall_types::api_models::PositionWithMetrics {
1740                            position: hypercall_types::api_models::Position {
1741                                wallet_address: *account,
1742                                symbol: position.symbol,
1743                                amount: position.amount,
1744                                entry_price: position.entry_price,
1745                                margin_posted: position.margin_posted,
1746                                realized_pnl: position.realized_pnl,
1747                                unrealized_pnl: position.unrealized_pnl,
1748                                updated_at: chrono::Utc::now(),
1749                            },
1750                            notional_value: Decimal::ZERO,
1751                            maintenance_margin: Decimal::ZERO,
1752                            liquidation_price: Decimal::ZERO,
1753                            margin_ratio: Decimal::ZERO,
1754                        },
1755                    )
1756                    .collect(),
1757                total_margin_used: balance.total_margin_used,
1758                available_balance: Decimal::ZERO,
1759                span_margin: None,
1760                margin_mode: "standard".to_string(),
1761                margin_summary: None,
1762            }
1763        }
1764
1765        async fn get_portfolio_balance(&self, account: &WalletAddress) -> Option<PortfolioBalance> {
1766            self.balances.read().await.get(account).cloned()
1767        }
1768
1769        async fn all_portfolios(&self) -> HashMap<WalletAddress, PortfolioBalance> {
1770            self.balances.read().await.clone()
1771        }
1772
1773        async fn apply_event(
1774            &self,
1775            _event: &hypercall_types::EngineMessage,
1776        ) -> Result<Vec<PortfolioChange>, PortfolioError> {
1777            Ok(Vec::new())
1778        }
1779
1780        async fn remove_expired_position(&self, _wallet: &WalletAddress, _symbol: &str) {}
1781
1782        async fn apply_hypercore_position_update(
1783            &self,
1784            _update: &crate::portfolio::HypercorePositionUpdate,
1785        ) {
1786        }
1787
1788        async fn set_hypercore_position(
1789            &self,
1790            _update: &crate::portfolio::HypercorePositionUpdate,
1791        ) {
1792        }
1793
1794        fn as_any(&self) -> &dyn Any {
1795            self
1796        }
1797
1798        async fn calculate_fill_accounting(
1799            &self,
1800            fill: &hypercall_types::Fill,
1801        ) -> Result<hypercall_types::FillAccounting, crate::portfolio::PortfolioError> {
1802            Ok(hypercall_types::FillAccounting::zero(fill.trade_id))
1803        }
1804
1805        async fn apply_fill_to_memory(
1806            &self,
1807            _wallet: &WalletAddress,
1808            _symbol: &str,
1809            _side: &Side,
1810            _price: Decimal,
1811            _quantity: Decimal,
1812        ) {
1813        }
1814    }
1815
1816    struct MockSpotPriceSource {
1817        prices: HashMap<String, f64>,
1818    }
1819
1820    #[async_trait]
1821    impl SpotPriceSource for MockSpotPriceSource {
1822        async fn get_spot_price(&self, underlying: &str) -> Option<f64> {
1823            self.prices.get(underlying).copied()
1824        }
1825    }
1826
1827    #[derive(Default)]
1828    struct MutableOrderSnapshotProvider {
1829        orders: RwLock<HashMap<WalletAddress, Vec<RuntimeOrderSummary>>>,
1830    }
1831
1832    impl MutableOrderSnapshotProvider {
1833        fn insert_order(&self, wallet: WalletAddress, order: RuntimeOrderSummary) {
1834            self.orders
1835                .write()
1836                .expect("order lock poisoned")
1837                .entry(wallet)
1838                .or_default()
1839                .push(order);
1840        }
1841
1842        fn remove_by_order_id(&self, wallet: &WalletAddress, order_id: u64) -> bool {
1843            let mut orders = self.orders.write().expect("order lock poisoned");
1844            let Some(wallet_orders) = orders.get_mut(wallet) else {
1845                return false;
1846            };
1847            let original_len = wallet_orders.len();
1848            wallet_orders.retain(|order| order.order_id != order_id);
1849            wallet_orders.len() != original_len
1850        }
1851
1852        fn clear_wallet(&self, wallet: &WalletAddress) {
1853            self.orders
1854                .write()
1855                .expect("order lock poisoned")
1856                .remove(wallet);
1857        }
1858    }
1859
1860    impl OrderSnapshotProvider for MutableOrderSnapshotProvider {
1861        fn get_open_orders_for_wallet(&self, wallet: &WalletAddress) -> Vec<RuntimeOrderSummary> {
1862            self.orders
1863                .read()
1864                .expect("order lock poisoned")
1865                .get(wallet)
1866                .cloned()
1867                .unwrap_or_default()
1868        }
1869
1870        fn get_all_orders(&self) -> Vec<(RuntimeOrderSummary, WalletAddress)> {
1871            self.orders
1872                .read()
1873                .expect("order lock poisoned")
1874                .iter()
1875                .flat_map(|(wallet, orders)| {
1876                    orders.iter().cloned().map(move |order| (order, *wallet))
1877                })
1878                .collect()
1879        }
1880    }
1881
1882    fn test_margin_config() -> Config {
1883        Config {
1884            risk_free_rate: 0.05,
1885            base_volatility: 0.8,
1886            base_skew: 0.0,
1887            base_excess_kurtosis: 0.0,
1888            scenarios: vec![
1889                Scenario {
1890                    scenario_type: ScenarioType::SpotChange,
1891                    value: 0.15,
1892                },
1893                Scenario {
1894                    scenario_type: ScenarioType::SpotChange,
1895                    value: -0.15,
1896                },
1897            ],
1898            delta_threshold: 0.0001,
1899            strike_match_tolerance: 0.01,
1900            expiry_match_tolerance_years: 0.001,
1901            allow_standard_margin_shorts: false,
1902            fee_config: FeeConfig::default(),
1903        }
1904    }
1905
1906    fn liquidatable_health(wallet: &WalletAddress) -> LiquidationHealthResult {
1907        LiquidationHealthResult {
1908            wallet: wallet.to_string(),
1909            margin_mode: MarginMode::Standard,
1910            is_liquidatable: true,
1911            maintenance_margin: -200.0,
1912            equity: 100.0,
1913            mm_required: 300.0,
1914        }
1915    }
1916
1917    fn recovered_health(wallet: &WalletAddress) -> LiquidationHealthResult {
1918        LiquidationHealthResult {
1919            wallet: wallet.to_string(),
1920            margin_mode: MarginMode::Standard,
1921            is_liquidatable: false,
1922            maintenance_margin: 120.0,
1923            equity: 420.0,
1924            mm_required: 300.0,
1925        }
1926    }
1927
1928    struct StandardWatcherHarness {
1929        wallet: WalletAddress,
1930        symbol: String,
1931        cache: Arc<LiquidationCache>,
1932        portfolio_service: Arc<MockPortfolioService>,
1933        order_snapshot: Arc<MutableOrderSnapshotProvider>,
1934        watcher: LiquidationWatcher,
1935        engine_task: JoinHandle<()>,
1936    }
1937
1938    async fn setup_standard_partial_liquidation_harness(
1939        reject_partial_orders: bool,
1940    ) -> StandardWatcherHarness {
1941        let wallet = test_wallet(42);
1942        let symbol = "BTC-20261231-100000-C".to_string();
1943        let cache = Arc::new(LiquidationCache::new());
1944        let portfolio_service = Arc::new(MockPortfolioService::default());
1945        portfolio_service
1946            .set_balance(
1947                wallet,
1948                PortfolioBalance {
1949                    positions: HashMap::from([(
1950                        symbol.clone(),
1951                        PositionData {
1952                            symbol: symbol.clone(),
1953                            amount: dec!(-4),
1954                            entry_price: dec!(100),
1955                            margin_posted: Decimal::ZERO,
1956                            realized_pnl: Decimal::ZERO,
1957                            unrealized_pnl: Decimal::ZERO,
1958                        },
1959                    )]),
1960                    total_margin_used: Decimal::ZERO,
1961                },
1962            )
1963            .await;
1964
1965        let ledger = Arc::new(InMemoryLedger::new());
1966        ledger
1967            .set_balance(&wallet, dec!(100))
1968            .await
1969            .expect("ledger balance should be seeded");
1970
1971        let standard_account_builder = Arc::new(StandardAccountBuilder::new(
1972            ledger,
1973            portfolio_service.clone(),
1974            Arc::new(MockSpotPriceSource {
1975                prices: HashMap::from([("BTC".to_string(), 105000.0)]),
1976            }),
1977        ));
1978        let quote_provider = Arc::new(crate::rsm::engine_snapshot::MockQuoteProvider::new());
1979        quote_provider.set_quote(
1980            &symbol,
1981            hypercall_runtime_api::SnapshotBookQuote {
1982                best_bid: Some(149.0),
1983                best_bid_size: Some(10.0),
1984                best_ask: Some(150.0),
1985                best_ask_size: Some(10.0),
1986                mid: Some(149.5),
1987                bids: vec![(149.0, 10.0)],
1988                asks: vec![(150.0, 10.0)],
1989            },
1990        );
1991
1992        let order_snapshot = Arc::new(MutableOrderSnapshotProvider::default());
1993        order_snapshot.insert_order(
1994            wallet,
1995            RuntimeOrderSummary {
1996                order_id: 7,
1997                symbol: symbol.clone(),
1998                side: Side::Sell,
1999                price: dec!(95),
2000                original_size: dec!(1),
2001                remaining_size: dec!(1),
2002                is_perp: false,
2003                mmp_enabled: false,
2004                client_id: Some("risk-increasing".to_string()),
2005                created_at: 1,
2006            },
2007        );
2008
2009        let (order_tx, mut order_rx) = mpsc::channel::<UnifiedEngineRequest>(8);
2010        let next_order_id = Arc::new(AtomicU64::new(1_000));
2011        let mock_orders = order_snapshot.clone();
2012        let order_id_counter = next_order_id.clone();
2013        let engine_task = tokio::spawn(async move {
2014            while let Some(request) = order_rx.recv().await {
2015                let message = request.message.clone();
2016                let response = match message.action {
2017                    OrderAction::CreateOrder if reject_partial_orders => OrderUpdateMessage {
2018                        timestamp: message.timestamp,
2019                        info: message.info,
2020                        status: OrderUpdateStatus::Rejected,
2021                        reason: Some("forced rejection".to_string()),
2022                        filled_size: Decimal::ZERO,
2023                        order_id: None,
2024                        wallet_address: message.wallet,
2025                        mmp_triggered: false,
2026                        request_id: message.request_id,
2027                    },
2028                    OrderAction::CreateOrder => {
2029                        let order_id = order_id_counter.fetch_add(1, Ordering::Relaxed);
2030                        let human_size =
2031                            to_human_readable_decimal(&message.info.symbol, message.info.size);
2032                        mock_orders.insert_order(
2033                            message.wallet,
2034                            RuntimeOrderSummary {
2035                                order_id,
2036                                symbol: message.info.symbol.clone(),
2037                                side: message.info.side,
2038                                price: message.info.price,
2039                                original_size: human_size,
2040                                remaining_size: human_size,
2041                                is_perp: message.info.is_perp,
2042                                mmp_enabled: message.info.mmp_enabled,
2043                                client_id: message.info.client_id.clone(),
2044                                created_at: message.timestamp as i64,
2045                            },
2046                        );
2047                        OrderUpdateMessage {
2048                            timestamp: message.timestamp,
2049                            info: message.info,
2050                            status: OrderUpdateStatus::Acked,
2051                            reason: None,
2052                            filled_size: Decimal::ZERO,
2053                            order_id: Some(order_id),
2054                            wallet_address: message.wallet,
2055                            mmp_triggered: false,
2056                            request_id: message.request_id,
2057                        }
2058                    }
2059                    OrderAction::CancelOrder => {
2060                        let removed = message
2061                            .info
2062                            .order_id
2063                            .map(|order_id| {
2064                                mock_orders.remove_by_order_id(&message.wallet, order_id)
2065                            })
2066                            .unwrap_or(false);
2067                        OrderUpdateMessage {
2068                            timestamp: message.timestamp,
2069                            info: message.info,
2070                            status: if removed {
2071                                OrderUpdateStatus::Canceled
2072                            } else {
2073                                OrderUpdateStatus::Rejected
2074                            },
2075                            reason: if removed {
2076                                None
2077                            } else {
2078                                Some("order not found".to_string())
2079                            },
2080                            filled_size: Decimal::ZERO,
2081                            order_id: None,
2082                            wallet_address: message.wallet,
2083                            mmp_triggered: false,
2084                            request_id: message.request_id,
2085                        }
2086                    }
2087                    OrderAction::ReplaceOrder => OrderUpdateMessage {
2088                        timestamp: message.timestamp,
2089                        info: message.info,
2090                        status: OrderUpdateStatus::Rejected,
2091                        reason: Some(
2092                            "replace orders unsupported in liquidation mock engine".to_string(),
2093                        ),
2094                        filled_size: Decimal::ZERO,
2095                        order_id: None,
2096                        wallet_address: message.wallet,
2097                        mmp_triggered: false,
2098                        request_id: message.request_id,
2099                    },
2100                };
2101
2102                request
2103                    .response_tx
2104                    .send(response)
2105                    .await
2106                    .expect("mock engine should deliver response");
2107            }
2108        });
2109
2110        let watcher = LiquidationWatcher::new_for_tests_without_tier_cache(
2111            cache.clone(),
2112            portfolio_service.clone(),
2113            Arc::new(SpanMarginService::new_for_tests(test_margin_config())),
2114            Arc::new(crate::standard_margin::StandardMarginService::new()),
2115            LiquidationWatcherConfig {
2116                poll_interval: Duration::from_secs(5),
2117                full_escalation_timeout: Duration::from_secs(60),
2118                min_shortfall_threshold: Decimal::ZERO,
2119                partial_target_buffer_bps: 500,
2120                partial_reprice_interval: Duration::from_secs(5),
2121                partial_bonus_start_bps: 50,
2122                partial_bonus_max_bps: 500,
2123                partial_bonus_ramp_ms: 60_000,
2124            },
2125        )
2126        .with_standard_account_builder(standard_account_builder)
2127        .with_quote_provider(quote_provider)
2128        .with_order_snapshot(order_snapshot.clone())
2129        .with_order_sender(order_tx);
2130
2131        StandardWatcherHarness {
2132            wallet,
2133            symbol,
2134            cache,
2135            portfolio_service,
2136            order_snapshot,
2137            watcher,
2138            engine_task,
2139        }
2140    }
2141
2142    #[test]
2143    fn test_config_default() {
2144        let config = LiquidationWatcherConfig::default();
2145        assert_eq!(config.poll_interval, Duration::from_secs(5));
2146        assert_eq!(config.full_escalation_timeout, Duration::from_secs(60));
2147        assert_eq!(config.min_shortfall_threshold, dec!(0));
2148        assert_eq!(config.partial_target_buffer_bps, 500);
2149        assert_eq!(config.partial_reprice_interval, Duration::from_secs(5));
2150        assert_eq!(config.partial_bonus_start_bps, 50);
2151        assert_eq!(config.partial_bonus_max_bps, 1_000);
2152        assert_eq!(config.partial_bonus_ramp_ms, 300_000);
2153    }
2154
2155    #[test]
2156    fn test_config_from_runtime() {
2157        let runtime = crate::backend_config::LiquidationRuntimeConfig {
2158            enabled: true,
2159            health_poll_interval_ms: 1_500,
2160            partial_target_buffer_bps: 250,
2161            partial_reprice_interval_ms: 2_500,
2162            partial_bonus_start_bps: 75,
2163            partial_bonus_max_bps: 500,
2164            partial_bonus_ramp_ms: 120_000,
2165            min_shortfall_threshold: dec!(42),
2166            full_escalation_timeout_ms: 9_000,
2167            full_target_buffer_bps: 700,
2168            chain_observer_poll_interval_ms: 1_000,
2169            chain_observer_max_lag_blocks: 16,
2170        };
2171
2172        let config = LiquidationWatcherConfig::from_runtime_config(&runtime);
2173        assert_eq!(config.poll_interval, Duration::from_millis(1_500));
2174        assert_eq!(config.full_escalation_timeout, Duration::from_millis(9_000));
2175        assert_eq!(config.min_shortfall_threshold, dec!(42));
2176        assert_eq!(config.partial_target_buffer_bps, 250);
2177        assert_eq!(
2178            config.partial_reprice_interval,
2179            Duration::from_millis(2_500)
2180        );
2181        assert_eq!(config.partial_bonus_start_bps, 75);
2182        assert_eq!(config.partial_bonus_max_bps, 500);
2183        assert_eq!(config.partial_bonus_ramp_ms, 120_000);
2184    }
2185
2186    #[test]
2187    fn test_merge_partial_plan_accumulates_same_symbol_and_price() {
2188        let mut plans = vec![PartialLiquidationOrderPlan {
2189            symbol: "BTC-20251231-100000-C".to_string(),
2190            close_size: dec!(1),
2191            limit_price: dec!(100),
2192            expected_mm_relief: dec!(50),
2193        }];
2194
2195        merge_partial_plan(
2196            &mut plans,
2197            PartialLiquidationOrderPlan {
2198                symbol: "BTC-20251231-100000-C".to_string(),
2199                close_size: dec!(2),
2200                limit_price: dec!(100),
2201                expected_mm_relief: dec!(75),
2202            },
2203        );
2204
2205        assert_eq!(plans.len(), 1);
2206        assert_eq!(plans[0].close_size, dec!(3));
2207        assert_eq!(plans[0].expected_mm_relief, dec!(125));
2208    }
2209
2210    #[test]
2211    fn test_derive_portfolio_mark_price() {
2212        let position = PositionData {
2213            symbol: "BTC-20251231-100000-C".to_string(),
2214            amount: dec!(-2),
2215            entry_price: dec!(100),
2216            margin_posted: Decimal::ZERO,
2217            realized_pnl: Decimal::ZERO,
2218            unrealized_pnl: dec!(20),
2219        };
2220
2221        assert_eq!(derive_portfolio_mark_price(&position).unwrap(), dec!(90));
2222    }
2223
2224    #[test]
2225    fn test_format_pm_option_symbol() {
2226        let symbol = format_pm_option_symbol(&PortfolioMarginOptionKey {
2227            underlying: "BTC".to_string(),
2228            option_type: crate::types::OptionType::Call,
2229            strike: dec!(100000),
2230            expiry_ts: hypercall_types::expiry_date_to_timestamp("BTC", 20251231),
2231        });
2232
2233        assert_eq!(symbol, "BTC-20251231-100000-C");
2234    }
2235
2236    #[tokio::test]
2237    async fn test_watcher_enters_partial_liquidation_and_recovers_after_orders_clear() {
2238        let harness = setup_standard_partial_liquidation_harness(false).await;
2239
2240        harness
2241            .watcher
2242            .process_health_check(
2243                &harness.wallet,
2244                &liquidatable_health(&harness.wallet),
2245                MarginMode::Standard,
2246                10_000,
2247            )
2248            .await;
2249
2250        let status = harness
2251            .cache
2252            .get_status(&harness.wallet)
2253            .await
2254            .expect("wallet should have liquidation state");
2255        let metadata = match status.state {
2256            LiquidationState::PreLiquidation(metadata) => metadata,
2257            other => panic!("expected pre-liquidation state, got {:?}", other),
2258        };
2259        assert_eq!(metadata.mm_shortfall, dec!(200));
2260        assert_eq!(metadata.target_equity, dec!(315.00));
2261        assert_eq!(metadata.active_order_client_ids.len(), 1);
2262        assert!(
2263            metadata
2264                .active_order_client_ids
2265                .iter()
2266                .all(|client_id| client_id.starts_with("liq-")),
2267            "partial liquidation orders should use liquidation client ids"
2268        );
2269
2270        let open_orders = harness
2271            .order_snapshot
2272            .get_open_orders_for_wallet(&harness.wallet);
2273        assert_eq!(open_orders.len(), 1);
2274        assert_eq!(open_orders[0].side, Side::Buy);
2275        assert_eq!(open_orders[0].symbol, harness.symbol);
2276        assert!(open_orders[0]
2277            .client_id
2278            .as_deref()
2279            .is_some_and(|client_id| client_id.starts_with("liq-")));
2280
2281        harness.order_snapshot.clear_wallet(&harness.wallet);
2282
2283        harness
2284            .watcher
2285            .process_health_check(
2286                &harness.wallet,
2287                &recovered_health(&harness.wallet),
2288                MarginMode::Standard,
2289                20_000,
2290            )
2291            .await;
2292
2293        let recovered_status = harness
2294            .cache
2295            .get_status(&harness.wallet)
2296            .await
2297            .expect("wallet should remain tracked after recovery");
2298        assert!(recovered_status.state.is_healthy());
2299
2300        harness.engine_task.abort();
2301    }
2302
2303    #[tokio::test]
2304    async fn test_watcher_requires_one_extra_cycle_to_recover_after_canceling_open_orders() {
2305        let harness = setup_standard_partial_liquidation_harness(false).await;
2306
2307        harness
2308            .watcher
2309            .process_health_check(
2310                &harness.wallet,
2311                &liquidatable_health(&harness.wallet),
2312                MarginMode::Standard,
2313                10_000,
2314            )
2315            .await;
2316
2317        harness
2318            .watcher
2319            .process_health_check(
2320                &harness.wallet,
2321                &recovered_health(&harness.wallet),
2322                MarginMode::Standard,
2323                20_000,
2324            )
2325            .await;
2326
2327        let intermediate_status = harness
2328            .cache
2329            .get_status(&harness.wallet)
2330            .await
2331            .expect("wallet should remain tracked after cancel cycle");
2332        let intermediate_metadata = match intermediate_status.state {
2333            LiquidationState::PreLiquidation(metadata) => metadata,
2334            other => panic!(
2335                "expected pre-liquidation state after cancel cycle, got {:?}",
2336                other
2337            ),
2338        };
2339        assert_eq!(intermediate_metadata.active_order_client_ids.len(), 1);
2340        assert!(
2341            harness
2342                .order_snapshot
2343                .get_open_orders_for_wallet(&harness.wallet)
2344                .is_empty(),
2345            "tracked liquidation orders should have been canceled in the recovery cycle"
2346        );
2347
2348        harness
2349            .watcher
2350            .process_health_check(
2351                &harness.wallet,
2352                &recovered_health(&harness.wallet),
2353                MarginMode::Standard,
2354                25_000,
2355            )
2356            .await;
2357
2358        let recovered_status = harness
2359            .cache
2360            .get_status(&harness.wallet)
2361            .await
2362            .expect("wallet should remain tracked after recovery");
2363        assert!(recovered_status.state.is_healthy());
2364
2365        harness.engine_task.abort();
2366    }
2367
2368    #[tokio::test]
2369    async fn test_sync_and_cancel_orders_cancels_excess_reduce_only_orders() {
2370        let harness = setup_standard_partial_liquidation_harness(false).await;
2371        harness.order_snapshot.clear_wallet(&harness.wallet);
2372        harness
2373            .portfolio_service
2374            .set_balance(
2375                harness.wallet,
2376                PortfolioBalance {
2377                    positions: HashMap::from([(
2378                        harness.symbol.clone(),
2379                        PositionData {
2380                            symbol: harness.symbol.clone(),
2381                            amount: dec!(-1),
2382                            entry_price: dec!(100),
2383                            margin_posted: Decimal::ZERO,
2384                            realized_pnl: Decimal::ZERO,
2385                            unrealized_pnl: Decimal::ZERO,
2386                        },
2387                    )]),
2388                    total_margin_used: Decimal::ZERO,
2389                },
2390            )
2391            .await;
2392        harness.order_snapshot.insert_order(
2393            harness.wallet,
2394            RuntimeOrderSummary {
2395                order_id: 100,
2396                symbol: harness.symbol.clone(),
2397                side: Side::Buy,
2398                price: dec!(150),
2399                original_size: dec!(1),
2400                remaining_size: dec!(1),
2401                is_perp: false,
2402                mmp_enabled: false,
2403                client_id: Some("close-1".to_string()),
2404                created_at: 10,
2405            },
2406        );
2407        harness.order_snapshot.insert_order(
2408            harness.wallet,
2409            RuntimeOrderSummary {
2410                order_id: 101,
2411                symbol: harness.symbol.clone(),
2412                side: Side::Buy,
2413                price: dec!(151),
2414                original_size: dec!(1),
2415                remaining_size: dec!(1),
2416                is_perp: false,
2417                mmp_enabled: false,
2418                client_id: Some("close-2".to_string()),
2419                created_at: 20,
2420            },
2421        );
2422
2423        harness
2424            .watcher
2425            .sync_and_cancel_orders(&harness.wallet, &[], false)
2426            .await
2427            .expect("sync should succeed");
2428
2429        let open_orders = harness
2430            .order_snapshot
2431            .get_open_orders_for_wallet(&harness.wallet);
2432        assert_eq!(open_orders.len(), 1);
2433        assert_eq!(open_orders[0].order_id, 100);
2434
2435        harness.engine_task.abort();
2436    }
2437
2438    #[tokio::test]
2439    async fn test_sync_and_cancel_orders_skips_when_portfolio_snapshot_missing() {
2440        let harness = setup_standard_partial_liquidation_harness(false).await;
2441        harness.order_snapshot.clear_wallet(&harness.wallet);
2442        harness
2443            .portfolio_service
2444            .clear_balance(&harness.wallet)
2445            .await;
2446        harness.order_snapshot.insert_order(
2447            harness.wallet,
2448            RuntimeOrderSummary {
2449                order_id: 100,
2450                symbol: harness.symbol.clone(),
2451                side: Side::Buy,
2452                price: dec!(150),
2453                original_size: dec!(1),
2454                remaining_size: dec!(1),
2455                is_perp: false,
2456                mmp_enabled: false,
2457                client_id: Some("liq-keep".to_string()),
2458                created_at: 10,
2459            },
2460        );
2461
2462        let sync_result = harness
2463            .watcher
2464            .sync_and_cancel_orders(&harness.wallet, &["liq-keep".to_string()], false)
2465            .await
2466            .expect("sync should succeed");
2467
2468        assert_eq!(sync_result.open_liquidation_client_ids, vec!["liq-keep"]);
2469        let open_orders = harness
2470            .order_snapshot
2471            .get_open_orders_for_wallet(&harness.wallet);
2472        assert_eq!(open_orders.len(), 1);
2473        assert_eq!(open_orders[0].order_id, 100);
2474
2475        harness.engine_task.abort();
2476    }
2477
2478    #[tokio::test]
2479    async fn test_watcher_keeps_pre_liquidation_when_partial_order_is_rejected() {
2480        let harness = setup_standard_partial_liquidation_harness(true).await;
2481
2482        harness
2483            .watcher
2484            .process_health_check(
2485                &harness.wallet,
2486                &liquidatable_health(&harness.wallet),
2487                MarginMode::Standard,
2488                10_000,
2489            )
2490            .await;
2491
2492        let status = harness
2493            .cache
2494            .get_status(&harness.wallet)
2495            .await
2496            .expect("wallet should have liquidation state");
2497        let metadata = match status.state {
2498            LiquidationState::PreLiquidation(metadata) => metadata,
2499            other => panic!("expected pre-liquidation state, got {:?}", other),
2500        };
2501
2502        assert!(
2503            metadata.active_order_request_ids.is_empty(),
2504            "rejected liquidation orders must not remain tracked as active requests"
2505        );
2506        assert!(
2507            metadata.active_order_client_ids.is_empty(),
2508            "rejected liquidation orders must not remain tracked as active client ids"
2509        );
2510        assert_eq!(metadata.last_reprice_at, Some(10_000));
2511        assert!(
2512            harness
2513                .order_snapshot
2514                .get_open_orders_for_wallet(&harness.wallet)
2515                .iter()
2516                .all(|order| !order
2517                    .client_id
2518                    .as_deref()
2519                    .is_some_and(|client_id| client_id.starts_with("liq-"))),
2520            "rejected liquidation orders must not appear in the open-order snapshot"
2521        );
2522
2523        harness.engine_task.abort();
2524    }
2525
2526    #[tokio::test]
2527    async fn test_watcher_continues_polling_tracked_wallets_after_positions_close() {
2528        let harness = setup_standard_partial_liquidation_harness(false).await;
2529
2530        harness
2531            .watcher
2532            .process_health_check(
2533                &harness.wallet,
2534                &liquidatable_health(&harness.wallet),
2535                MarginMode::Standard,
2536                10_000,
2537            )
2538            .await;
2539
2540        harness
2541            .portfolio_service
2542            .set_balance(
2543                harness.wallet,
2544                PortfolioBalance {
2545                    positions: HashMap::new(),
2546                    total_margin_used: Decimal::ZERO,
2547                },
2548            )
2549            .await;
2550
2551        harness.watcher.check_all_accounts().await.unwrap();
2552
2553        let intermediate_status = harness
2554            .cache
2555            .get_status(&harness.wallet)
2556            .await
2557            .expect("tracked wallet should still be processed after positions close");
2558        assert!(
2559            matches!(
2560                intermediate_status.state,
2561                LiquidationState::PreLiquidation(..)
2562            ),
2563            "first recovery cycle should cancel tracked liquidation orders before clearing state"
2564        );
2565        assert!(
2566            harness
2567                .order_snapshot
2568                .get_open_orders_for_wallet(&harness.wallet)
2569                .is_empty(),
2570            "tracked liquidation orders should be canceled even after positions close"
2571        );
2572
2573        harness.watcher.check_all_accounts().await.unwrap();
2574
2575        let recovered_status = harness
2576            .cache
2577            .get_status(&harness.wallet)
2578            .await
2579            .expect("wallet should remain tracked after recovery");
2580        assert!(recovered_status.state.is_healthy());
2581
2582        harness.engine_task.abort();
2583    }
2584}