// hypercall/rsm/unified_engine.rs

1use super::bounded_idempotence_cache::BoundedIdempotenceCache;
2use super::engine_snapshot::EngineSnapshot;
3use crate::engine_enums_ext::EventTypeExt;
4use crate::portfolio::PortfolioService;
5use crate::price_oracle::hyperliquid_oracle::HyperliquidMarkPriceOracle;
6use crate::read_cache::greeks::GreeksCache;
7use crate::read_cache::portfolio::PortfolioCache;
8use crate::rsm::ledger::BalanceLedger;
9use crate::rsm::liquidation_manager::LiquidationManager;
10use crate::shared::order_types::{get_timestamp_millis, ParsedSymbol};
11use crate::shared::traits::MarkPriceOracle;
12use crate::snapshot::SyncStatus;
13use crate::standard_margin::StandardAccountBuilder;
14use crate::types::Config;
15use arc_swap::ArcSwap;
16use hypercall_db::{InstrumentReader, InstrumentWriter, JournalReplayReader, OrderReader};
17use hypercall_db_diesel::DatabaseHandler;
18use hypercall_engine::{FeeService, MatchResult, OrderBook};
19use hypercall_journal::checkpoint::WalCheckpointMetadata;
20use hypercall_types::Side;
21use hypercall_types::{to_human_readable_decimal, WalletAddress};
22use hypercall_types::{
23    EngineMessage, Fill, MarketAction, MarketActionMessage, MarketUpdateMessage,
24    MarketUpdateStatus, OptionType as MessageOptionType, OrderAction, OrderActionMessage,
25    OrderInfo, OrderUpdateMessage, OrderUpdateStatus, TimeInForce,
26};
27use metrics::{counter, gauge, histogram};
28use rust_decimal::prelude::ToPrimitive;
29use rust_decimal::Decimal;
30use rust_decimal_macros::dec;
31use std::collections::HashMap;
32use std::path::PathBuf;
33use std::sync::atomic::{AtomicI64, Ordering};
34use std::sync::Arc;
35use std::time::{Duration, Instant};
36use tokio::select;
37use tokio::sync::{broadcast, mpsc};
38use tracing::{debug, error, info, info_span, warn, Instrument};
39
40use hypercall_recovery::RestartStateComponent;
41pub use hypercall_runtime_api::{
42    decrement_pending_requests, get_pending_requests, increment_pending_requests, AgentAuthRequest,
43    CashWithdrawalApplyReceipt, CashWithdrawalRequest, DepositRequest, EngineQuiesceAction,
44    EngineQuiesceReport, EngineQuiesceRequest, HypercoreEquityRequest, LiquidationBonusRequest,
45    MarginModeUpdateRequest, MarketRequest, NonceCheckRequest, OptionDepositRequest,
46    OptionWithdrawalApplyReceipt, OptionWithdrawalRequest, TierUpdateRequest, UnifiedEngineRequest,
47};
48
49#[cfg(feature = "otel-tracing")]
50use tracing_opentelemetry::OpenTelemetrySpanExt;
51
52mod apply_interface;
53mod builder;
54mod fill_metadata;
55mod journaling;
56mod markets;
57mod matching;
58mod order_routing;
59pub(crate) mod recovery;
60mod rfq_handler;
61mod runtime;
62#[cfg(test)]
63mod tests;
64mod traits;
65
66pub use apply_interface::EngineError;
67pub use builder::{UnifiedEngineBuilder, UnifiedEngineBuilderResult};
68pub(crate) use traits::{DurableJournaling, OrderAdmission, OrderExecution, ReplayRecovery};
69
/// Byte ceiling equal to 1 MiB.
/// NOTE(review): the name suggests it caps expiry payloads sent over NATS; the use site is
/// not visible in this section — confirm.
const MAX_EXPIRY_NATS_PAYLOAD_BYTES: usize = 1024 * 1024;

/// Environment variable name.
/// NOTE(review): presumably makes a snapshot restore mandatory at startup — confirm at the
/// read site.
const ENGINE_REQUIRE_SNAPSHOT_RESTORE_ENV: &str = "ENGINE_REQUIRE_SNAPSHOT_RESTORE";

/// Environment variable name.
/// NOTE(review): presumably an operator override permitting an otherwise-unsafe replay after a
/// snapshot restore — confirm at the read site.
const UNSAFE_SNAPSHOT_REPLAY_OVERRIDE_ENV: &str = "ENGINE_SNAPSHOT_RESTORE_ALLOW_UNSAFE_REPLAY";

/// Environment variable name.
/// NOTE(review): presumably an operator override tolerating cash divergence during snapshot
/// restore validation — confirm at the read site.
const SNAPSHOT_CASH_VALIDATION_OVERRIDE_ENV: &str = "ENGINE_SNAPSHOT_RESTORE_ALLOW_CASH_DIVERGENCE";

/// Largest absolute difference between snapshot cash and durable cash that
/// `validate_snapshot_cash_evidence` accepts (test builds only).
#[cfg(test)]
const DURABLE_CASH_EVIDENCE_TOLERANCE: Decimal = dec!(0.00000001);

/// Default capacity of the order channel.
const DEFAULT_ORDER_BUFFER_SIZE: usize = 1_000;

/// Capacity of the market request channel.
const MARKET_REQUEST_BUFFER_SIZE: usize = 100;

/// TWAP settlement window, in seconds (30 minutes).
const TWAP_WINDOW_SECONDS: u32 = 30 * 60;

/// How many hours back to look when loading the idempotency cache.
const IDEMPOTENCY_CACHE_LOOKBACK_HOURS: i64 = 24;
88
/// One durable cash-accounting row for a wallet, as consumed by
/// `validate_snapshot_cash_evidence`. Compiled for tests only.
#[cfg(test)]
#[derive(Clone)]
pub(super) struct DurableCashEvidence {
    /// Wallet this row describes.
    pub(super) wallet: WalletAddress,
    /// Durable ledger balance the snapshot balance is compared against.
    pub(super) ledger_balance: Decimal,
    /// Durable margin mode, when recorded. The validator treats `Some("portfolio")` as
    /// portfolio margin; any other value is handled as non-portfolio.
    pub(super) margin_mode: Option<String>,
}
96
/// Cross-checks the cash balances in an engine state snapshot against durable cash rows.
///
/// Wallets in portfolio margin mode are exempt. A wallet counts as portfolio when the snapshot
/// says so, or — only if the snapshot carries no margin modes at all — when the durable row
/// says `"portfolio"`.
///
/// For every remaining durable row, the snapshot balance (zero when absent) must be within
/// `DURABLE_CASH_EVIDENCE_TOLERANCE` of the durable balance. In addition, every non-zero,
/// non-portfolio snapshot balance must have a durable row.
///
/// # Errors
///
/// Returns a message listing up to ten offending wallets when any check fails.
#[cfg(test)]
pub(super) fn validate_snapshot_cash_evidence(
    snapshot: &crate::rsm::engine_state_snapshot::EngineStateSnapshot,
    rows: impl IntoIterator<Item = DurableCashEvidence>,
) -> Result<(), String> {
    // When a wallet appears more than once, the last row wins.
    let durable_by_wallet: HashMap<WalletAddress, (Decimal, Option<String>)> = rows
        .into_iter()
        .map(|row| (row.wallet, (row.ledger_balance, row.margin_mode)))
        .collect();

    // Durable margin modes are only consulted when the snapshot has none of its own.
    let snapshot_has_no_modes = snapshot.wallet_margin_modes.is_empty();
    let mut failures = Vec::new();

    for (wallet, (durable_balance, durable_mode)) in &durable_by_wallet {
        let portfolio_in_snapshot = matches!(
            snapshot.wallet_margin_modes.get(wallet),
            Some(crate::rsm::margin_mode::MarginMode::Portfolio)
        );
        let portfolio_in_durable = durable_mode.as_deref() == Some("portfolio");
        if portfolio_in_snapshot || (snapshot_has_no_modes && portfolio_in_durable) {
            continue;
        }

        let snapshot_balance = snapshot
            .balance_ledger
            .get(wallet)
            .copied()
            .unwrap_or(Decimal::ZERO);
        let drift = snapshot_balance - *durable_balance;
        if drift.abs() > DURABLE_CASH_EVIDENCE_TOLERANCE {
            failures.push(format!(
                "{} snapshot_cash={} durable_cash={}",
                wallet, snapshot_balance, durable_balance
            ));
        }
    }

    // Snapshot cash without any durable row is also a failure for standard-margin wallets.
    for (wallet, snapshot_balance) in &snapshot.balance_ledger {
        let holds_cash = *snapshot_balance != Decimal::ZERO;
        let has_durable_row = durable_by_wallet.contains_key(wallet);
        let portfolio_in_snapshot = matches!(
            snapshot.wallet_margin_modes.get(wallet),
            Some(crate::rsm::margin_mode::MarginMode::Portfolio)
        );
        if holds_cash && !has_durable_row && !portfolio_in_snapshot {
            failures.push(format!(
                "{} snapshot_cash={} missing durable cash evidence",
                wallet, snapshot_balance
            ));
        }
    }

    if failures.is_empty() {
        return Ok(());
    }

    // Keep the message bounded: report at most ten wallets.
    failures.truncate(10);
    let sample = failures.join("; ");
    Err(format!(
        "engine snapshot cash validation failed for standard-margin wallets: {sample}"
    ))
}
161
162#[derive(Clone, Copy, Debug, Eq, PartialEq)]
163pub(super) enum CashRestartAuthority {
164    EmptyState,
165    SnapshotAtCheckpoint {
166        command_id: i64,
167    },
168    SnapshotPlusReplay {
169        snapshot_command_id: i64,
170        replay_target_command_id: i64,
171    },
172    HotJournalBaseReplay {
173        replay_target_command_id: i64,
174    },
175}
176
177impl CashRestartAuthority {
178    #[cfg(test)]
179    pub(super) fn durable_cash_validation_command_id(self) -> Option<i64> {
180        match self {
181            Self::SnapshotAtCheckpoint { command_id } => Some(command_id),
182            Self::EmptyState
183            | Self::SnapshotPlusReplay { .. }
184            | Self::HotJournalBaseReplay { .. } => None,
185        }
186    }
187}
188
189pub(super) fn cash_restart_authority_for_startup(
190    snapshot_command_id: Option<i64>,
191    checkpoint_command_id: i64,
192) -> Result<CashRestartAuthority, String> {
193    match (snapshot_command_id, checkpoint_command_id) {
194        (Some(snapshot_command_id), checkpoint_command_id)
195            if snapshot_command_id > checkpoint_command_id =>
196        {
197            Err(format!(
198                "engine snapshot last_command_id {snapshot_command_id} is ahead of WAL checkpoint last_command_id {checkpoint_command_id}"
199            ))
200        }
201        (Some(command_id), checkpoint_command_id) if command_id == checkpoint_command_id => {
202            Ok(CashRestartAuthority::SnapshotAtCheckpoint { command_id })
203        }
204        (Some(snapshot_command_id), replay_target_command_id) => {
205            Ok(CashRestartAuthority::SnapshotPlusReplay {
206                snapshot_command_id,
207                replay_target_command_id,
208            })
209        }
210        (None, checkpoint_command_id) if checkpoint_command_id > 0 => {
211            Ok(CashRestartAuthority::HotJournalBaseReplay {
212                replay_target_command_id: checkpoint_command_id,
213            })
214        }
215        (None, _) => Ok(CashRestartAuthority::EmptyState),
216    }
217}
218
/// Returns `true` when `cash_restart_authority_for_startup` classifies the given snapshot and
/// checkpoint command ids as `SnapshotAtCheckpoint`. Compiled for tests only.
#[cfg(test)]
pub(super) fn snapshot_cash_validation_boundary_matches(
    snapshot_command_id: i64,
    checkpoint_command_id: i64,
) -> bool {
    let authority =
        cash_restart_authority_for_startup(Some(snapshot_command_id), checkpoint_command_id);
    matches!(
        authority,
        Ok(CashRestartAuthority::SnapshotAtCheckpoint { .. })
    )
}
229
/// Returns `true` when the snapshot sits exactly at the checkpoint and the journal bounds (if
/// any) do not extend past the checkpoint. Compiled for tests only.
///
/// With no journal bounds at all, an exact snapshot/checkpoint match is sufficient.
#[cfg(test)]
pub(super) fn should_validate_snapshot_cash_against_durable_accounting(
    snapshot_command_id: i64,
    checkpoint_command_id: i64,
    replay_bounds: Option<hypercall_db::JournalCommandIdBounds>,
) -> bool {
    let snapshot_at_checkpoint = snapshot_command_id == checkpoint_command_id;
    snapshot_at_checkpoint
        && replay_bounds.map_or(true, |journal| {
            journal.max_command_id <= checkpoint_command_id
        })
}
245
246pub(super) fn validate_hot_journal_covers_base_replay(
247    checkpoint_command_id: i64,
248    bounds: Option<hypercall_db::JournalCommandIdBounds>,
249    replay_from_command_id: i64,
250    non_replayable_tail_count: Option<i64>,
251) -> Result<(), String> {
252    if replay_from_command_id < 1 {
253        return Err(format!(
254            "invalid replay window start command_id {replay_from_command_id}; command ids must be >=1"
255        ));
256    }
257
258    if checkpoint_command_id <= 0 {
259        if let Some(bounds) = bounds {
260            if bounds.min_command_id != replay_from_command_id {
261                return Err(format!(
262                    "hot journal starts at command_id {}, but zero-checkpoint startup with existing engine_commands rows requires replay from command_id {}",
263                    bounds.min_command_id, replay_from_command_id
264                ));
265            }
266        }
267        return Ok(());
268    }
269
270    let Some(bounds) = bounds else {
271        if replay_from_command_id > checkpoint_command_id {
272            // Start replay after checkpoint implies this is an equal-snapshot, post-checkpoint
273            // resume path where a missing command-id window simply means no replayable
274            // deltas remain.
275            return Ok(());
276        }
277        validate_non_replayable_tail(
278            replay_from_command_id.saturating_sub(1),
279            checkpoint_command_id,
280            non_replayable_tail_count,
281        )?;
282        return Ok(());
283    };
284
285    if bounds.max_command_id < replay_from_command_id {
286        validate_non_replayable_tail(
287            replay_from_command_id.saturating_sub(1),
288            checkpoint_command_id,
289            non_replayable_tail_count,
290        )?;
291        return Ok(());
292    }
293
294    if bounds.min_command_id > replay_from_command_id {
295        return Err(format!(
296            "hot journal starts at command_id {}, but base reconstruction for checkpoint command_id {} requires replay from command_id {} or earlier",
297            bounds.min_command_id, checkpoint_command_id, replay_from_command_id
298        ));
299    }
300
301    validate_non_replayable_tail(
302        bounds.max_command_id,
303        checkpoint_command_id,
304        non_replayable_tail_count,
305    )?;
306
307    Ok(())
308}
309
/// Confirms that every command after `replayable_tail_command_id`, up to and including
/// `checkpoint_command_id`, has been proven non-replayable.
///
/// Passes immediately when the replayable tail already reaches the checkpoint. Otherwise
/// `non_replayable_tail_count` must be present and equal to the size of the gap
/// (`checkpoint_command_id - replayable_tail_command_id`).
///
/// # Errors
///
/// Returns a message when the count was not supplied, or when it differs from the gap size.
fn validate_non_replayable_tail(
    replayable_tail_command_id: i64,
    checkpoint_command_id: i64,
    non_replayable_tail_count: Option<i64>,
) -> Result<(), String> {
    if replayable_tail_command_id >= checkpoint_command_id {
        return Ok(());
    }

    // Saturate rather than overflow: for extreme ids a plain subtraction would panic in debug
    // builds and wrap to a negative gap in release builds. A saturated gap can never be matched
    // by a real count, so such input is still rejected below.
    let expected_tail_count = checkpoint_command_id.saturating_sub(replayable_tail_command_id);

    match non_replayable_tail_count {
        None => Err(format!(
            "hot journal replayable commands end at command_id {}, before checkpoint command_id {}, and non-replayable tail coverage was not checked",
            replayable_tail_command_id, checkpoint_command_id
        )),
        Some(proven_count) if proven_count != expected_tail_count => Err(format!(
            "hot journal replayable commands end at command_id {}, before checkpoint command_id {}, and only {} of {} tail commands are proven non-replayable",
            replayable_tail_command_id,
            checkpoint_command_id,
            proven_count,
            expected_tail_count
        )),
        Some(_) => Ok(()),
    }
}
339
340fn non_replayable_tail_count_for_coverage_check(
341    db: &impl hypercall_db::JournalReplayReader,
342    checkpoint_command_id: i64,
343    bounds: Option<hypercall_db::JournalCommandIdBounds>,
344    replay_from_command_id: i64,
345) -> Result<Option<i64>, String> {
346    if checkpoint_command_id <= 0 {
347        return Ok(None);
348    }
349
350    let replayable_tail_command_id = match bounds {
351        Some(bounds) if bounds.max_command_id >= replay_from_command_id => bounds.max_command_id,
352        _ => replay_from_command_id.saturating_sub(1),
353    };
354    if replayable_tail_command_id >= checkpoint_command_id {
355        return Ok(None);
356    }
357
358    db.count_non_replayable_commands_in_range_sync(
359        replayable_tail_command_id,
360        checkpoint_command_id,
361    )
362    .map(Some)
363    .map_err(|error| {
364        format!(
365            "failed to count non-replayable hot journal tail rows from command_id {} through {}: {error}",
366            replayable_tail_command_id + 1,
367            checkpoint_command_id
368        )
369    })
370}
371
372pub(super) fn should_validate_hot_journal_coverage_for_base_replay(
373    db_present: bool,
374    startup_checkpoint_command_id: i64,
375    wal_path_configured: bool,
376    snapshot_loaded: bool,
377    replay_checkpoint_command_id: i64,
378) -> bool {
379    if !db_present || !wal_path_configured {
380        return false;
381    }
382
383    !snapshot_loaded || startup_checkpoint_command_id >= replay_checkpoint_command_id
384}
385
386pub(super) fn post_replay_cash_validation_checkpoint(
387    db_present: bool,
388    wal_path_configured: bool,
389    startup_checkpoint: WalCheckpointMetadata,
390    journal_bounds: Option<hypercall_db::JournalCommandIdBounds>,
391) -> Option<WalCheckpointMetadata> {
392    if !db_present || !wal_path_configured {
393        return None;
394    }
395
396    if startup_checkpoint.last_command_id > 0 {
397        return Some(startup_checkpoint);
398    }
399
400    journal_bounds
401        .filter(|bounds| bounds.min_command_id == 1)
402        .map(|bounds| WalCheckpointMetadata {
403            last_command_id: bounds.max_command_id,
404            ..startup_checkpoint
405        })
406}
407
/// Reports whether the environment variable `name` is set to a truthy value.
///
/// After trimming surrounding whitespace, the value is truthy when it equals `1`, `true`,
/// `yes`, or `on`, compared ASCII case-insensitively (so `True` and `ON` both count).
/// An unset variable, a value that is not valid Unicode, or any other value yields `false`.
fn env_flag_enabled(name: &str) -> bool {
    const TRUTHY_VALUES: [&str; 4] = ["1", "true", "yes", "on"];

    std::env::var(name).is_ok_and(|raw| {
        let value = raw.trim();
        TRUTHY_VALUES
            .iter()
            .any(|truthy| value.eq_ignore_ascii_case(truthy))
    })
}
416
417/// The unified engine that combines order matching and margin calculations.
418///
419/// Margin calculations for order acceptance are delegated to MarginService
420/// (SpanMarginService implementation). The engine is responsible for:
421/// - Building hypothetical accounts that include open orders + proposed orders
422/// - Calling MarginService to compute margin requirements
423/// - Accepting/rejecting orders based on available collateral
424pub struct UnifiedEngine {
425    // Shared mutable state (passed to managers as &mut EngineCtx)
426    pub(crate) ctx: crate::rsm::engine_deps::EngineCtx,
427
428    // ===== Sub-managers (borrowed immutably during dispatch) =====
429    /// Margin checking logic (SPAN + Standard).
430    margin_manager: crate::rsm::margin_manager::MarginManager,
431
432    /// Expiry and settlement management.
433    expiry_manager: crate::rsm::expiry_manager::ExpiryManager,
434
435    // ===== Engine-internal (NOT passed to managers) =====
436    // Channels
437    order_receiver: mpsc::Receiver<UnifiedEngineRequest>,
438    market_receiver: mpsc::Receiver<MarketRequest>,
439    pub(crate) rfq_receiver: Option<mpsc::Receiver<hypercall_runtime_api::RfqExecuteRequest>>,
440    pub(crate) deposit_receiver: Option<mpsc::Receiver<DepositRequest>>,
441    pub(crate) option_deposit_receiver: Option<mpsc::Receiver<OptionDepositRequest>>,
442    pub(crate) option_withdrawal_receiver: Option<mpsc::Receiver<OptionWithdrawalRequest>>,
443    pub(crate) cash_withdrawal_receiver: Option<mpsc::Receiver<CashWithdrawalRequest>>,
444    pub(crate) liquidation_bonus_receiver: Option<mpsc::Receiver<LiquidationBonusRequest>>,
445    pub(crate) margin_mode_receiver: Option<mpsc::Receiver<MarginModeUpdateRequest>>,
446    pub(crate) agent_auth_receiver: Option<mpsc::Receiver<AgentAuthRequest>>,
447    pub(crate) nonce_check_receiver: Option<mpsc::Receiver<NonceCheckRequest>>,
448    pub(crate) tier_update_receiver: Option<mpsc::Receiver<TierUpdateRequest>>,
449    pub(crate) hypercore_equity_receiver: Option<mpsc::Receiver<HypercoreEquityRequest>>,
450    pub(crate) pm_settlement_admin_receiver:
451        Option<mpsc::Receiver<hypercall_runtime_api::PmSettlementAdminRequest>>,
452    pub(crate) quiesce_receiver: Option<mpsc::Receiver<EngineQuiesceRequest>>,
453    /// Live notify channel from `catalog_manager::manager` for
454    /// per-underlying `TradingModes` updates. When the catalog
455    /// manager rewrites any row in the `instruments.trading_mode`
456    /// column, it publishes the full current `underlying → mode`
457    /// map here. The main loop `changed()`s on it and calls
458    /// `apply_underlying_trading_mode_update`.
459    pub(crate) trading_mode_receiver: Option<
460        tokio::sync::watch::Receiver<
461            std::collections::HashMap<String, hypercall_types::TradingModes>,
462        >,
463    >,
464    shutdown_receiver: broadcast::Receiver<()>,
465
466    // Engine-level fee service (used in matching loop)
467    fee_service: FeeService,
468
469    // Snapshot timing
470    snapshot_interval: Duration,
471    read_snapshot_interval: Duration,
472    post_startup_reconcile_delay: Duration,
473    last_snapshot: Instant,
474    wal_path: Option<PathBuf>,
475
476    /// Sync status for readiness tracking.
477    sync_status: Arc<SyncStatus>,
478
479    /// Durable engine journal for restart-safe ACK and idempotency.
480    journal_writer: Option<crate::journal::SharedEngineJournalWriter>,
481
482    /// Batched journal sender for async PostgreSQL writes.
483    journal_batch_sender: Option<crate::journal::JournalBatchSender>,
484
485    /// NATS publisher for real-time command replication to standby instances.
486    nats_publisher: Option<crate::nats::NatsPublisher>,
487    /// NATS publisher for canonical balance update follower replication.
488    nats_balance_update_publisher: Option<crate::nats::NatsBalanceUpdatePublisher>,
489
490    /// Known request_ids for fast idempotency checks (bounded to prevent memory leaks).
491    known_request_ids: BoundedIdempotenceCache,
492
493    /// ArcSwap snapshot for lock-free reads by API handlers.
494    read_snapshot: Arc<ArcSwap<EngineSnapshot>>,
495
496    /// Whether post-startup reconciliation has been performed.
497    /// After SIGKILL, the journal batcher needs a few seconds to materialize
498    /// pending terminal order statuses into order_infos.
499    /// This flag ensures we re-check for ghost orders once after catchup.
500    post_startup_reconciled: bool,
501
502    /// Runtime events produced during synchronous startup journal replay.
503    /// These must be projected before `start()` hydrates engine positions from
504    /// PortfolioService, otherwise recovered TickExpiry events can leave stale
505    /// portfolio positions that resurrect settled engine positions.
506    startup_replayed_events: Vec<(String, Vec<EngineMessage>)>,
507
508    /// WAL checkpoint metadata loaded at startup and used to bound replay windows.
509    replay_checkpoint: WalCheckpointMetadata,
510
511    /// When true, the engine has crossed an admin drain barrier. Live mutation
512    /// sources remain parked until an explicit resume request.
513    runtime_quiesced: bool,
514
515    /// Whether an engine state snapshot was successfully loaded during startup.
516    /// When true, the base reconstruction window is skipped during replay.
517    snapshot_loaded: bool,
518
519    /// Orders on expired instruments discovered during replay that need DB cancellation.
520    startup_expired_order_cancels: Vec<recovery::StartupExpiredOrderCancel>,
521
522    /// External vol oracle reference for ingesting IV surfaces into the command stream.
523    /// Stored separately from the margin_manager's oracle so the runtime can read
524    /// surfaces and feed them as IvUpdate commands.
525    external_vol_oracle: Option<crate::vol_oracle::SharedVolOracle>,
526
527    /// Shared IV surfaces backing the EngineVolOracle. Updated by IvUpdate commands,
528    /// read by the SpanMarginService during margin checks.
529    engine_iv_surfaces: Arc<
530        std::sync::RwLock<HashMap<String, crate::vol_oracle::vol_surface_cache::VolatilitySurface>>,
531    >,
532}
533
534// ===== Helper types for process_order refactoring =====
535
/// Outcome of order allocation: carries the id assigned to the order.
pub(crate) struct AllocatedOrder {
    /// Identifier assigned to the order.
    order_id: u64,
}
540
541/// Result of matching execution
542pub(crate) struct MatchingResult {
543    fills: Vec<hypercall_types::Fill>,
544    filled_size: Decimal,
545    mmp_triggered: bool,
546    self_trade_maker_id: Option<u64>,
547}
548
549#[derive(Debug, Clone)]
550pub struct EngineRuntimeSettings {
551    pub snapshot_interval: Duration,
552    pub read_snapshot_interval: Duration,
553    pub post_startup_reconcile_delay: Duration,
554    pub wal_path: Option<PathBuf>,
555    pub start_quiesced: bool,
556    pub recovery_safety_report: Option<hypercall_api::recovery_safety::SharedRecoverySafetyReport>,
557    pub portfolio_margin_pool_enabled: bool,
558    pub portfolio_margin_settlement_allowlist: std::collections::BTreeSet<WalletAddress>,
559}
560
561impl Default for EngineRuntimeSettings {
562    fn default() -> Self {
563        Self {
564            snapshot_interval: Duration::from_secs(60),
565            read_snapshot_interval: Duration::from_millis(500),
566            post_startup_reconcile_delay: Duration::from_secs(5),
567            wal_path: None,
568            start_quiesced: false,
569            recovery_safety_report: None,
570            portfolio_margin_pool_enabled: false,
571            portfolio_margin_settlement_allowlist: std::collections::BTreeSet::new(),
572        }
573    }
574}
575
576impl UnifiedEngine {
577    /// Initialize the unified engine with all necessary components
578    pub fn init(
579        config: Config,
580        order_receiver: mpsc::Receiver<UnifiedEngineRequest>,
581        market_receiver: mpsc::Receiver<MarketRequest>,
582        event_sender: mpsc::UnboundedSender<EngineMessage>,
583        read_snapshot: Option<Arc<ArcSwap<EngineSnapshot>>>,
584        shutdown_receiver: broadcast::Receiver<()>,
585        runtime_settings: EngineRuntimeSettings,
586        database_path: Option<String>,
587        db_auth: Option<hypercall_db_diesel::DbAuthConfig>,
588        allow_no_database: bool,
589        skip_db_migrations: bool,
590    ) -> Self {
591        // Try to create diesel handler for persistence if path provided
592        let diesel_handler = if let Some(db_path) = database_path {
593            info!(
594                "UnifiedEngine attempting to connect to database: {}",
595                db_path
596            );
597            let auth =
598                db_auth.unwrap_or_else(|| hypercall_db_diesel::DbAuthConfig::password(&db_path));
599            let result = if skip_db_migrations {
600                DatabaseHandler::new_readonly_auth(auth, 3)
601            } else {
602                DatabaseHandler::new_with_auth(auth, 3)
603            };
604            match result {
605                Ok(handler) => {
606                    info!(
607                        "UnifiedEngine successfully connected to persistence database: {} (migrations_skipped={})",
608                        db_path, skip_db_migrations
609                    );
610                    Some(handler)
611                }
612                Err(e) => {
613                    // Allow running without database if explicitly allowed (for tests)
614                    if allow_no_database {
615                        warn!("UnifiedEngine failed to connect to persistence database: {}, continuing without persistence (allowed for tests)", e);
616                        None
617                    } else {
618                        error!(
619                            "FATAL: UnifiedEngine failed to connect to persistence database {}: {}",
620                            db_path, e
621                        );
622                        panic!(
623                            "Cannot start UnifiedEngine without database connection: {}",
624                            e
625                        );
626                    }
627                }
628            }
629        } else {
630            // Allow running without database if explicitly allowed (for tests)
631            if allow_no_database {
632                info!("UnifiedEngine: No database path provided, running without persistence (allowed for tests)");
633                None
634            } else {
635                error!("FATAL: No database path provided for UnifiedEngine");
636                panic!("Cannot start UnifiedEngine without database configuration");
637            }
638        };
639
640        // Get the latest L2 sequence number from engine_events table
641        let l2_update_seq = if let Some(ref handler) = diesel_handler {
642            let replay: &dyn hypercall_db::JournalReplayReader = handler;
643            match replay.get_max_l2_seq_from_events_sync() {
644                Ok(seq) => {
645                    info!("Loaded L2 update sequence from engine_events: {}", seq);
646                    Arc::new(AtomicI64::new(seq))
647                }
648                Err(e) => {
649                    if allow_no_database {
650                        warn!(
651                            "Failed to load L2 update sequence (allowed for tests): {}",
652                            e
653                        );
654                        Arc::new(AtomicI64::new(0))
655                    } else {
656                        panic!(
657                            "CRITICAL_FAILURE: Failed to load L2 update sequence from database: {}. \
658                             Starting from 0 would risk ID collisions. Restart required.",
659                            e
660                        );
661                    }
662                }
663            }
664        } else {
665            Arc::new(AtomicI64::new(0))
666        };
667
668        // Get the next order_id from the database (max existing order_id + 1)
669        let next_order_id = if let Some(ref handler) = diesel_handler {
670            match handler.get_max_order_id_sync() {
671                Ok(max_order_id) => {
672                    let next_id = max_order_id + 1;
673                    info!(
674                        "Loaded max order_id from database: {}, starting from {}",
675                        max_order_id, next_id
676                    );
677                    next_id
678                }
679                Err(e) => {
680                    if allow_no_database {
681                        warn!("Failed to load max order_id (allowed for tests): {}", e);
682                        1
683                    } else {
684                        panic!(
685                            "CRITICAL_FAILURE: Failed to load max order_id from database: {}. \
686                             Starting from 1 would risk duplicate order IDs. Restart required.",
687                            e
688                        );
689                    }
690                }
691            }
692        } else {
693            info!("No database handler, starting order_id from 1");
694            1
695        };
696
697        // Get the next trade_id from the database (max existing trade_id + 1)
698        let next_trade_id = if let Some(ref handler) = diesel_handler {
699            match handler.get_max_trade_id_sync() {
700                Ok(max_trade_id) => {
701                    let next_id = max_trade_id + 1;
702                    info!(
703                        "Loaded max trade_id from database: {}, starting from {}",
704                        max_trade_id, next_id
705                    );
706                    next_id
707                }
708                Err(e) => {
709                    if allow_no_database {
710                        warn!("Failed to load max trade_id (allowed for tests): {}", e);
711                        1
712                    } else {
713                        panic!(
714                            "CRITICAL_FAILURE: Failed to load max trade_id from database: {}. \
715                             Starting from 1 would risk duplicate trade IDs. Restart required.",
716                            e
717                        );
718                    }
719                }
720            }
721        } else {
722            info!("No database handler, starting trade_id from 1");
723            1
724        };
725
726        let fee_service = FeeService::new(config.fee_config.clone());
727
728        // Initialize sub-managers
729        let margin_manager = crate::rsm::margin_manager::MarginManager::new(&config);
730        let expiry_manager = crate::rsm::expiry_manager::ExpiryManager::new();
731        let ctx = crate::rsm::engine_deps::EngineCtx {
732            orderbooks: HashMap::new(),
733            next_order_id,
734            next_trade_id,
735            l2_update_seq,
736            expired_instruments: HashMap::new(),
737            spot_prices: HashMap::new(),
738            iv_surfaces: HashMap::new(),
739            iv_source_timestamps: HashMap::new(),
740            instrument_trading_modes: HashMap::new(),
741            order_index: hypercall_engine::order_index::EngineOrderIndex::new(),
742            engine_positions: HashMap::new(),
743            balance_ledger: BalanceLedger::new(),
744            deposit_update_watermarks: HashMap::new(),
745            applied_deposit_source_event_hashes: std::collections::BTreeSet::new(),
746            pm_settlement_state:
747                crate::rsm::portfolio_margin::settlement_state::PmSettlementState::default(),
748            mmp_state: HashMap::new(),
749            agent_authorizations: HashMap::new(),
750            nonce_sets: HashMap::new(),
751            rsm_signer_nonces: HashMap::new(),
752            deps: crate::rsm::engine_deps::EngineDeps {
753                mmp_cache: None,
754                tier_cache: None,
755                portfolio_service: None,
756                portfolio_cache: None,
757                risk_account_builder: None,
758                standard_account_builder: None,
759                greeks_cache: None,
760                mark_price_oracles: HashMap::new(),
761                reference_prices: HashMap::new(),
762                liquidation_cache: None,
763                config,
764                portfolio_margin_pool_enabled: runtime_settings.portfolio_margin_pool_enabled,
765                portfolio_margin_settlement_allowlist: runtime_settings
766                    .portfolio_margin_settlement_allowlist
767                    .clone(),
768                event_sender,
769                ws_event_sender: None,
770                margin_timestamp_s: chrono::Utc::now().timestamp(),
771                liquidation_states: HashMap::new(),
772                wallet_margin_modes: HashMap::new(),
773                mmp_enabled: HashMap::new(),
774                wallet_trading_limits: HashMap::new(),
775                default_trading_limits: hypercall_types::api_models::TradingLimits::default(),
776                wallet_tiers: HashMap::new(),
777                perp_positions: HashMap::new(),
778                hypercore_account_equity: HashMap::new(),
779                hypercore_equity_timestamps: HashMap::new(),
780            },
781            db: diesel_handler,
782        };
783
784        let mut engine = Self {
785            ctx,
786            margin_manager,
787            expiry_manager,
788            order_receiver,
789            market_receiver,
790            rfq_receiver: None,
791            deposit_receiver: None,
792            option_deposit_receiver: None,
793            option_withdrawal_receiver: None,
794            cash_withdrawal_receiver: None,
795            liquidation_bonus_receiver: None,
796            margin_mode_receiver: None,
797            agent_auth_receiver: None,
798            nonce_check_receiver: None,
799            tier_update_receiver: None,
800            hypercore_equity_receiver: None,
801            pm_settlement_admin_receiver: None,
802            quiesce_receiver: None,
803            trading_mode_receiver: None,
804            shutdown_receiver,
805            fee_service,
806            snapshot_interval: runtime_settings.snapshot_interval,
807            read_snapshot_interval: runtime_settings.read_snapshot_interval,
808            post_startup_reconcile_delay: runtime_settings.post_startup_reconcile_delay,
809            last_snapshot: Instant::now(),
810            wal_path: runtime_settings.wal_path,
811            sync_status: Arc::new(SyncStatus::new()),
812            journal_writer: None,
813            journal_batch_sender: None,
814            nats_publisher: None,
815            nats_balance_update_publisher: None,
816            known_request_ids: BoundedIdempotenceCache::new(),
817            read_snapshot: read_snapshot
818                .unwrap_or_else(|| Arc::new(ArcSwap::from_pointee(EngineSnapshot::empty()))),
819            post_startup_reconciled: false,
820            startup_replayed_events: Vec::new(),
821            replay_checkpoint: WalCheckpointMetadata::ZERO,
822            runtime_quiesced: runtime_settings.start_quiesced,
823            snapshot_loaded: false,
824            startup_expired_order_cancels: Vec::new(),
825            external_vol_oracle: None,
826            engine_iv_surfaces: Arc::new(std::sync::RwLock::new(HashMap::new())),
827        };
828
829        // Load existing markets from database
830        ReplayRecovery::load_markets_from_db(&mut engine);
831
832        // Load WAL checkpoint metadata and replay bounds before journal recovery.
833        ReplayRecovery::load_orderbook_snapshots(&mut engine);
834        engine.sync_status.set_catching_up();
835
836        // Try to load a persistent engine state snapshot to skip full replay.
837        // The snapshot's last_command_id must be <= the checkpoint's last_command_id
838        // (snapshot is tagged from the Postgres-replicated boundary).
839        // Only attempt when ENGINE_JOURNAL_WAL_PATH is explicitly set — the default
840        // path (/var/tmp/...) is shared across test processes with different databases,
841        // so snapshot files from one test would corrupt another's state.
842        let wal_path_configured = hypercall_journal::checkpoint::wal_path_is_explicitly_configured(
843            engine.wal_path.as_ref(),
844        );
845        let wal_path =
846            hypercall_journal::checkpoint::wal_path_from_config(engine.wal_path.as_ref());
847        let env_wal_path = std::env::var("ENGINE_JOURNAL_WAL_PATH").ok();
848        let require_snapshot_restore = env_flag_enabled(ENGINE_REQUIRE_SNAPSHOT_RESTORE_ENV);
849        let unsafe_replay_override = env_flag_enabled(UNSAFE_SNAPSHOT_REPLAY_OVERRIDE_ENV);
850        let cash_divergence_override = env_flag_enabled(SNAPSHOT_CASH_VALIDATION_OVERRIDE_ENV);
851        let startup_replay_checkpoint = engine.replay_checkpoint;
852        let snapshot_path =
853            crate::rsm::engine_state_snapshot::snapshot_path_from_wal_path(&wal_path);
854        let drain_marker_path = hypercall_api::handlers::health::drain_marker_path(&wal_path);
855        let mut recovery_safety_report = hypercall_api::recovery_safety::RecoverySafetyReport::new(
856            wal_path.display().to_string(),
857            wal_path_configured,
858            require_snapshot_restore,
859            unsafe_replay_override,
860            cash_divergence_override,
861            hypercall_api::recovery_safety::RecoverySafetyCheckpoint {
862                last_command_id: engine.replay_checkpoint.last_command_id,
863                last_l2_seq: engine.replay_checkpoint.last_l2_seq,
864                wal_offset: engine.replay_checkpoint.wal_offset as u64,
865            },
866            hypercall_api::recovery_safety::RecoverySafetyDrainMarker {
867                path: drain_marker_path.display().to_string(),
868                present_at_startup: runtime_settings.start_quiesced,
869            },
870        );
871        if runtime_settings.start_quiesced {
872            recovery_safety_report.pass(
873                "drain_marker",
874                format!(
875                    "persistent drain marker present at startup; engine starts quiesced until operator undrains: {}",
876                    drain_marker_path.display()
877                ),
878            );
879        } else {
880            recovery_safety_report.pass("drain_marker", "no persistent drain marker at startup");
881        }
882        info!(
883            configured_wal_path = ?engine.wal_path,
884            env_wal_path = ?env_wal_path,
885            effective_wal_path = %wal_path.display(),
886            wal_checkpoint_last_command_id = engine.replay_checkpoint.last_command_id,
887            require_snapshot_restore,
888            "Resolved engine recovery WAL path"
889        );
890        if require_snapshot_restore && engine.ctx.db.is_some() {
891            if !wal_path_configured {
892                recovery_safety_report.fail(
893                    "wal_path_configured",
894                    format!(
895                        "{} is set but no explicit engine WAL path is configured",
896                        ENGINE_REQUIRE_SNAPSHOT_RESTORE_ENV
897                    ),
898                );
899                hypercall_api::recovery_safety::store_report(
900                    &runtime_settings.recovery_safety_report,
901                    &recovery_safety_report,
902                );
903                panic!(
904                    "CRITICAL_FAILURE: {} is set but no explicit engine WAL path is configured",
905                    ENGINE_REQUIRE_SNAPSHOT_RESTORE_ENV
906                );
907            }
908            if engine.replay_checkpoint.last_command_id <= 0 {
909                recovery_safety_report.fail(
910                    "checkpoint",
911                    format!(
912                        "{} is set but WAL checkpoint at {} is missing or zero",
913                        ENGINE_REQUIRE_SNAPSHOT_RESTORE_ENV,
914                        wal_path.display()
915                    ),
916                );
917                hypercall_api::recovery_safety::store_report(
918                    &runtime_settings.recovery_safety_report,
919                    &recovery_safety_report,
920                );
921                panic!(
922                    "CRITICAL_FAILURE: {} is set but WAL checkpoint at {} is missing or zero. \
923                     Blue/green standby must start from a cloned WAL/checkpoint/snapshot boundary.",
924                    ENGINE_REQUIRE_SNAPSHOT_RESTORE_ENV,
925                    wal_path.display(),
926                );
927            }
928        }
929        if wal_path_configured {
930            recovery_safety_report.pass("wal_path_configured", "explicit WAL path configured");
931        } else {
932            recovery_safety_report.pass(
933                "wal_path_configured",
934                "explicit WAL path not configured; persistent snapshot restore not attempted",
935            );
936        }
937        if engine.replay_checkpoint.last_command_id > 0 {
938            recovery_safety_report.pass(
939                "checkpoint",
940                format!(
941                    "checkpoint loaded at command_id {}",
942                    engine.replay_checkpoint.last_command_id
943                ),
944            );
945        } else {
946            recovery_safety_report.pass("checkpoint", "no nonzero WAL checkpoint");
947        }
948        let mut hot_journal_bounds = None;
949
950        if engine.ctx.db.is_some()
951            && engine.replay_checkpoint.last_command_id > 0
952            && wal_path_configured
953        {
954            use crate::rsm::engine_state_snapshot::{read_snapshot, snapshot_order_count};
955            match read_snapshot(&snapshot_path) {
956                Ok(Some(snap))
957                    if snap.last_command_id <= engine.replay_checkpoint.last_command_id =>
958                {
959                    recovery_safety_report.snapshot =
960                        Some(hypercall_api::recovery_safety::RecoverySafetySnapshot {
961                            path: snapshot_path.display().to_string(),
962                            present: true,
963                            loaded: false,
964                            last_command_id: Some(snap.last_command_id),
965                            last_l2_seq: Some(snap.last_l2_seq),
966                            order_count: Some(snapshot_order_count(&snap)),
967                        });
968                    recovery_safety_report.pass(
969                        "snapshot_decode",
970                        format!(
971                            "decoded engine snapshot at command_id {}",
972                            snap.last_command_id
973                        ),
974                    );
975                    let cash_authority = cash_restart_authority_for_startup(
976                        Some(snap.last_command_id),
977                        engine.replay_checkpoint.last_command_id,
978                    )
979                    .expect("snapshot command_id checked not ahead of WAL checkpoint");
980                    info!(
981                        snapshot_last_command_id = snap.last_command_id,
982                        checkpoint_last_command_id = engine.replay_checkpoint.last_command_id,
983                        ?cash_authority,
984                        snapshot_balance_update_seq = snap.last_balance_update_seq,
985                        "Skipping pre-restore DB cash validation; engine snapshot plus WAL boundary is authoritative"
986                    );
987                    recovery_safety_report.cash.validation_checked = false;
988                    recovery_safety_report.cash.validation_passed = None;
989                    recovery_safety_report.cash.message = Some(format!(
990                        "pre-restore DB cash validation skipped for {:?}; engine snapshot plus WAL is authoritative",
991                        cash_authority
992                    ));
993
994                    let order_count = snapshot_order_count(&snap);
995                    info!(
996                        "Loading engine state snapshot: last_command_id={}, orders={}",
997                        snap.last_command_id, order_count,
998                    );
999                    crate::rsm::restart_components::PersistentEngineStateComponent::restore(
1000                        &mut engine.ctx,
1001                        &snap,
1002                    );
1003                    engine.rebuild_shared_engine_iv_surfaces();
1004                    engine.replay_checkpoint.last_command_id = snap.last_command_id;
1005                    engine.replay_checkpoint.last_l2_seq = snap.last_l2_seq;
1006                    engine.snapshot_loaded = true;
1007                    if let Some(snapshot) = recovery_safety_report.snapshot.as_mut() {
1008                        snapshot.loaded = true;
1009                    }
1010
1011                    gauge!("ht_engine_snapshot_restored").set(1.0);
1012                    gauge!("ht_engine_snapshot_last_command_id").set(snap.last_command_id as f64);
1013                    gauge!("ht_engine_snapshot_orders_count").set(order_count as f64);
1014                }
1015                Ok(Some(snap)) => {
1016                    recovery_safety_report.snapshot =
1017                        Some(hypercall_api::recovery_safety::RecoverySafetySnapshot {
1018                            path: snapshot_path.display().to_string(),
1019                            present: true,
1020                            loaded: false,
1021                            last_command_id: Some(snap.last_command_id),
1022                            last_l2_seq: Some(snap.last_l2_seq),
1023                            order_count: Some(snapshot_order_count(&snap)),
1024                        });
1025                    recovery_safety_report.fail(
1026                        "snapshot_boundary",
1027                        format!(
1028                            "snapshot command_id {} is ahead of WAL checkpoint command_id {}",
1029                            snap.last_command_id, engine.replay_checkpoint.last_command_id
1030                        ),
1031                    );
1032                    hypercall_api::recovery_safety::store_report(
1033                        &runtime_settings.recovery_safety_report,
1034                        &recovery_safety_report,
1035                    );
1036                    panic!(
1037                        "CRITICAL_FAILURE: engine state snapshot last_command_id {} is ahead of \
1038                         WAL checkpoint last_command_id {}. This is a logic bug — the snapshot \
1039                         should always be tagged from the Postgres-replicated boundary.",
1040                        snap.last_command_id, engine.replay_checkpoint.last_command_id,
1041                    );
1042                }
1043                Ok(None) => {
1044                    gauge!("ht_engine_snapshot_restored").set(0.0);
1045                    recovery_safety_report.snapshot =
1046                        Some(hypercall_api::recovery_safety::RecoverySafetySnapshot {
1047                            path: snapshot_path.display().to_string(),
1048                            present: false,
1049                            loaded: false,
1050                            last_command_id: None,
1051                            last_l2_seq: None,
1052                            order_count: None,
1053                        });
1054                    if require_snapshot_restore {
1055                        recovery_safety_report.fail(
1056                            "snapshot_decode",
1057                            format!(
1058                                "no engine state snapshot found at {}",
1059                                snapshot_path.display()
1060                            ),
1061                        );
1062                        hypercall_api::recovery_safety::store_report(
1063                            &runtime_settings.recovery_safety_report,
1064                            &recovery_safety_report,
1065                        );
1066                        panic!(
1067                            "CRITICAL_FAILURE: no engine state snapshot found at {} with WAL \
1068                             checkpoint command_id {} while {} is set. Refusing standby restore \
1069                             without a cloned engine snapshot.",
1070                            snapshot_path.display(),
1071                            engine.replay_checkpoint.last_command_id,
1072                            ENGINE_REQUIRE_SNAPSHOT_RESTORE_ENV,
1073                        );
1074                    } else {
1075                        recovery_safety_report.pass(
1076                            "snapshot_decode",
1077                            "no snapshot found; base replay requires hot journal coverage validation",
1078                        );
1079                        warn!(
1080                            snapshot_path = %snapshot_path.display(),
1081                            checkpoint_command_id = engine.replay_checkpoint.last_command_id,
1082                            "No engine state snapshot found; falling back to full durable journal replay after coverage validation"
1083                        );
1084                    }
1085                }
1086                Err(e) => {
1087                    gauge!("ht_engine_snapshot_restored").set(0.0);
1088                    recovery_safety_report.snapshot =
1089                        Some(hypercall_api::recovery_safety::RecoverySafetySnapshot {
1090                            path: snapshot_path.display().to_string(),
1091                            present: true,
1092                            loaded: false,
1093                            last_command_id: None,
1094                            last_l2_seq: None,
1095                            order_count: None,
1096                        });
1097                    if unsafe_replay_override {
1098                        recovery_safety_report.fail(
1099                            "snapshot_decode",
1100                            format!("corrupt engine state snapshot accepted by override: {e}"),
1101                        );
1102                        warn!(
1103                            error = %e,
1104                            override_env = UNSAFE_SNAPSHOT_REPLAY_OVERRIDE_ENV,
1105                            checkpoint_command_id = engine.replay_checkpoint.last_command_id,
1106                            "UNSAFE_OPERATOR_OVERRIDE: corrupt engine state snapshot, continuing to full replay"
1107                        );
1108                    } else {
1109                        recovery_safety_report.fail(
1110                            "snapshot_decode",
1111                            format!("corrupt engine state snapshot: {e}"),
1112                        );
1113                        hypercall_api::recovery_safety::store_report(
1114                            &runtime_settings.recovery_safety_report,
1115                            &recovery_safety_report,
1116                        );
1117                        panic!(
1118                            "CRITICAL_FAILURE: corrupt engine state snapshot at {} with WAL \
1119                             checkpoint command_id {}: {}. Refusing unsafe hot-journal replay. \
1120                             Set {}=true only for an explicit operator-controlled repair or \
1121                             incident investigation.",
1122                            snapshot_path.display(),
1123                            engine.replay_checkpoint.last_command_id,
1124                            e,
1125                            UNSAFE_SNAPSHOT_REPLAY_OVERRIDE_ENV,
1126                        );
1127                    }
1128                }
1129            }
1130        }
1131
1132        if should_validate_hot_journal_coverage_for_base_replay(
1133            engine.ctx.db.is_some(),
1134            startup_replay_checkpoint.last_command_id,
1135            wal_path_configured,
1136            engine.snapshot_loaded,
1137            engine.replay_checkpoint.last_command_id,
1138        ) {
1139            let checkpoint_command_id = startup_replay_checkpoint.last_command_id;
1140            let replay_from_command_id = if engine.snapshot_loaded {
1141                engine.replay_checkpoint.last_command_id.saturating_add(1)
1142            } else {
1143                1
1144            };
1145            let db = engine
1146                .ctx
1147                .db
1148                .as_ref()
1149                .expect("checked db is present before hot journal coverage validation");
1150            let coverage_result = match hot_journal_bounds {
1151                Some(bounds) => Ok(Some(bounds)),
1152                None => db
1153                    .get_journal_command_id_bounds_sync()
1154                    .map_err(|error| format!("failed to load hot journal command bounds: {error}")),
1155            }
1156            .and_then(|bounds| {
1157                recovery_safety_report.replay_coverage.checked = true;
1158                if let Some(bounds) = bounds {
1159                    hot_journal_bounds = Some(bounds);
1160                    recovery_safety_report.replay_coverage.min_command_id =
1161                        Some(bounds.min_command_id);
1162                    recovery_safety_report.replay_coverage.max_command_id =
1163                        Some(bounds.max_command_id);
1164                }
1165                let non_replayable_tail_count = non_replayable_tail_count_for_coverage_check(
1166                    db,
1167                    checkpoint_command_id,
1168                    bounds,
1169                    replay_from_command_id,
1170                )?;
1171                validate_hot_journal_covers_base_replay(
1172                    checkpoint_command_id,
1173                    bounds,
1174                    replay_from_command_id,
1175                    non_replayable_tail_count,
1176                )
1177            });
1178
1179            match coverage_result {
1180                Ok(()) => {
1181                    recovery_safety_report.pass(
1182                        "hot_journal_coverage",
1183                        "hot journal covers base reconstruction window",
1184                    );
1185                    info!(
1186                        checkpoint_command_id = checkpoint_command_id,
1187                        "Hot journal coverage validation passed for base reconstruction"
1188                    );
1189                }
1190                Err(error) if unsafe_replay_override => {
1191                    recovery_safety_report.fail("hot_journal_coverage", error.clone());
1192                    warn!(
1193                        %error,
1194                        override_env = UNSAFE_SNAPSHOT_REPLAY_OVERRIDE_ENV,
1195                        checkpoint_command_id = checkpoint_command_id,
1196                        "UNSAFE_OPERATOR_OVERRIDE: continuing with incomplete hot-journal base reconstruction"
1197                    );
1198                }
1199                Err(error) => {
1200                    recovery_safety_report.fail("hot_journal_coverage", error.clone());
1201                    hypercall_api::recovery_safety::store_report(
1202                        &runtime_settings.recovery_safety_report,
1203                        &recovery_safety_report,
1204                    );
1205                    panic!(
1206                        "CRITICAL_FAILURE: {error}. Refusing base reconstruction from incomplete \
1207                         hot Postgres journal at checkpoint command_id {}. Set {}=true only for \
1208                         an explicit operator-controlled repair or incident investigation.",
1209                        checkpoint_command_id, UNSAFE_SNAPSHOT_REPLAY_OVERRIDE_ENV,
1210                    );
1211                }
1212            }
1213        } else {
1214            recovery_safety_report.pass(
1215                "hot_journal_coverage",
1216                "base reconstruction coverage validation not required",
1217            );
1218        }
1219        // CRITICAL: Replay commands from the journal in checkpoint-bounded windows
1220        // to reconstruct orderbook state without materializing the full replay set.
1221        // The replay also builds order_index with full metadata (client_id,
1222        // mmp_enabled, original_size), so rebuild_from_orderbooks is NOT
1223        // called afterwards — it would destroy that metadata.
1224        ReplayRecovery::replay_commands_from_journal(&mut engine);
1225
1226        if let Some(cash_validation_checkpoint) = post_replay_cash_validation_checkpoint(
1227            engine.ctx.db.is_some(),
1228            wal_path_configured,
1229            startup_replay_checkpoint,
1230            hot_journal_bounds,
1231        ) {
1232            recovery_safety_report.cash.validation_checked = false;
1233            recovery_safety_report.cash.validation_passed = None;
1234            recovery_safety_report.cash.message = Some(format!(
1235                "post-replay DB cash validation skipped at checkpoint command_id {}; engine balance ledger is authoritative",
1236                cash_validation_checkpoint.last_command_id
1237            ));
1238            recovery_safety_report.pass(
1239                "standard_margin_cash_replay_watermark",
1240                "post-replay DB cash validation skipped because engine balance ledger is authoritative",
1241            );
1242        }
1243        hypercall_api::recovery_safety::store_report(
1244            &runtime_settings.recovery_safety_report,
1245            &recovery_safety_report,
1246        );
1247
1248        // Check for crossed orderbooks after replay — should NEVER happen.
1249        ReplayRecovery::check_restored_orderbooks_for_crosses(&engine);
1250
1251        // Publish initial snapshot after replay
1252        engine.publish_snapshot();
1253
1254        // Persist a fresh engine state snapshot after replay for the next restart.
1255        engine.persist_engine_state_snapshot();
1256
1257        // Without a database there is no delayed post-startup reconciliation.
1258        // Mark the engine ready immediately after replay/bootstrap.
1259        if engine.ctx.db.is_none() {
1260            engine.post_startup_reconciled = true;
1261            engine.sync_status.set_ready();
1262            info!("UnifiedEngine sync status: Ready (no database configured)");
1263        }
1264
1265        engine
1266    }
1267
1268    /// Publish a new read snapshot from the current orderbook state.
1269    fn publish_snapshot(&self) {
1270        let l2_seq = self.ctx.l2_update_seq.load(Ordering::SeqCst);
1271        let snapshot = EngineSnapshot::build_with_ctx(&self.ctx, l2_seq);
1272        tracing::debug!(
1273            l2_seq = snapshot.l2_seq,
1274            cash_wallet_count = snapshot.engine_state_digest.cash_wallet_count,
1275            cash_digest = %snapshot.engine_state_digest.cash_digest,
1276            "Publishing EngineSnapshot balance_ledger"
1277        );
1278        self.read_snapshot.store(Arc::new(snapshot));
1279    }
1280
1281    /// Set the portfolio service for executed state.
1282    ///
1283    /// This is the canonical source of truth for positions + cash.
1284    /// Must be set for margin checks to use real executed state.
1285    pub fn set_portfolio_service(&mut self, service: Arc<dyn PortfolioService + Send + Sync>) {
1286        info!("Setting PortfolioService for UnifiedEngine");
1287        self.ctx.deps.portfolio_service = Some(service);
1288    }
1289
1290    /// Set the portfolio cache for synchronous fill processing.
1291    pub fn set_portfolio_cache(&mut self, cache: Arc<PortfolioCache>) {
1292        info!("Setting PortfolioCache for UnifiedEngine");
1293        self.ctx.deps.portfolio_cache = Some(cache);
1294    }
1295
1296    /// Set the RiskAccountBuilder for building risk accounts.
1297    ///
1298    /// This is the single source of truth for "how to build an Account for PM".
1299    /// Must be set for margin calculations to work correctly.
1300    pub fn set_risk_account_builder(
1301        &mut self,
1302        builder: Arc<crate::rsm::portfolio_margin::risk_account_builder::RiskAccountBuilder>,
1303    ) {
1304        info!("Setting RiskAccountBuilder for UnifiedEngine");
1305        self.ctx.deps.risk_account_builder = Some(builder);
1306    }
1307
1308    /// Set the StandardAccountBuilder for building StandardAccounts.
1309    ///
1310    /// Used for Standard margin mode accounts (Deribit-style linear margin).
1311    pub fn set_standard_account_builder(&mut self, builder: Arc<StandardAccountBuilder>) {
1312        info!("Setting StandardAccountBuilder for UnifiedEngine");
1313        self.ctx.deps.standard_account_builder = Some(builder);
1314    }
1315
1316    /// Set the LiquidationCache for pre-liquidation order blocking.
1317    ///
1318    /// When set, orders from accounts in pre-liquidation state will be checked
1319    /// to ensure they don't increase risk. Risk-reducing orders are still allowed.
1320    pub fn set_liquidation_cache(&mut self, cache: Arc<crate::liquidator::LiquidationCache>) {
1321        info!("Setting LiquidationCache for UnifiedEngine");
1322        self.ctx.deps.liquidation_cache = Some(cache);
1323    }
1324
1325    /// Replace the engine's DieselEventHandler.
1326    ///
1327    /// Used during standby-to-active promotion: the engine starts with a
1328    /// readonly handler (or None), and on promote we swap in a read-write
1329    /// handler so the engine can persist fills, settlements, etc.
1330    pub fn set_db(&mut self, handler: DatabaseHandler) {
1331        info!("Replacing DieselEventHandler on UnifiedEngine (standby promote)");
1332        self.ctx.db = Some(handler);
1333    }
1334
1335    /// Set the durable engine journal writer.
1336    /// When set and journaling is enabled, commands are persisted before ACK.
1337    /// Also loads recent request_ids into the idempotency cache for fast lookups.
1338    pub fn set_journal_writer(&mut self, writer: crate::journal::SharedEngineJournalWriter) {
1339        info!("Setting EngineJournalWriter for UnifiedEngine");
1340
1341        // Load recent request_ids for idempotency cache warm-up.
1342        // This eliminates DB lookups for requests that can't be duplicates from previous runs.
1343        // We load request_ids from the last 24 hours, which should cover any retry scenarios.
1344        match writer.get_recent_request_ids(IDEMPOTENCY_CACHE_LOOKBACK_HOURS) {
1345            Ok(request_ids) => {
1346                let count = request_ids.len();
1347                self.known_request_ids.extend(request_ids);
1348                info!(
1349                    "Loaded {} request_ids into idempotency cache (24h lookback, capacity: {})",
1350                    count,
1351                    self.known_request_ids.capacity()
1352                );
1353            }
1354            Err(e) => {
1355                warn!(
1356                    "Failed to load request_ids for idempotency cache: {} - idempotency checks will query DB",
1357                    e
1358                );
1359            }
1360        }
1361
1362        self.journal_writer = Some(writer);
1363    }
1364
1365    /// Set the batched journal sender for async PostgreSQL writes.
1366    /// When set, the engine pushes journal entries to a channel instead of
1367    /// blocking on synchronous inserts.
1368    pub fn set_journal_batch_sender(&mut self, sender: crate::journal::JournalBatchSender) {
1369        info!("Setting JournalBatchSender for UnifiedEngine (async batched writes)");
1370        self.journal_batch_sender = Some(sender);
1371    }
1372
1373    /// Set the NATS publisher for real-time command replication.
1374    pub fn set_nats_publisher(&mut self, publisher: crate::nats::NatsPublisher) {
1375        info!("Setting NatsPublisher for UnifiedEngine (real-time replication)");
1376        self.nats_publisher = Some(publisher);
1377    }
1378
1379    pub fn set_nats_balance_update_publisher(
1380        &mut self,
1381        publisher: crate::nats::NatsBalanceUpdatePublisher,
1382    ) {
1383        info!("Setting NatsBalanceUpdatePublisher for UnifiedEngine");
1384        self.nats_balance_update_publisher = Some(publisher);
1385    }
1386
1387    /// Publish a command envelope to NATS for standby replication.
1388    /// Call this after apply() succeeds for any command that should be replicated.
1389    pub(crate) async fn publish_to_nats(&self, env: &crate::rsm::apply::CommandEnvelope) {
1390        let Some(ref publisher) = self.nats_publisher else {
1391            return;
1392        };
1393
1394        use crate::nats::CommandType;
1395        use crate::rsm::apply::{
1396            ApproveAgentPayload, BalanceCommandPayload, DepositUpdatePayload, EngineCommand,
1397            HypercoreEquityUpdatePayload, HypercorePositionUpdatePayload, MmpConfigUpdatePayload,
1398            NonceAdvancePayload, RevokeAgentPayload, TierMarginModeUpdatePayload,
1399            TradingModeUpdatePayload,
1400        };
1401
1402        let (cmd_type, data) = match &env.command {
1403            EngineCommand::OrderAction(msg) => (
1404                CommandType::Order,
1405                hypercall_types::serialize_to_wire_bytes(msg),
1406            ),
1407            EngineCommand::PriceUpdate {
1408                underlying,
1409                spot_price,
1410                timestamp_ms,
1411            } => (
1412                CommandType::PriceUpdate,
1413                hypercall_types::serialize_to_wire_bytes(&crate::rsm::apply::PriceUpdatePayload {
1414                    underlying: underlying.clone(),
1415                    spot_price: *spot_price,
1416                    timestamp_ms: *timestamp_ms,
1417                }),
1418            ),
1419            EngineCommand::IvUpdate {
1420                underlying: _,
1421                journal_data,
1422                timestamp_ms: _,
1423                ..
1424            } => {
1425                // Use pre-serialized journal_data if available (avoids re-serializing the surface)
1426                let data = match journal_data {
1427                    Some(d) => d.clone(),
1428                    None => return, // No journal data = nothing to publish
1429                };
1430                (CommandType::IvUpdate, data)
1431            }
1432            EngineCommand::MarketAction(cmd) => (
1433                CommandType::MarketAction,
1434                hypercall_types::serialize_to_wire_bytes(cmd),
1435            ),
1436            EngineCommand::LiquidationState(msg) => (
1437                CommandType::LiquidationState,
1438                hypercall_types::serialize_to_wire_bytes(msg),
1439            ),
1440            EngineCommand::TierUpdate {
1441                wallet,
1442                margin_mode,
1443                tier,
1444                trading_limits,
1445            } => (
1446                CommandType::TierUpdate,
1447                hypercall_types::serialize_to_wire_bytes(&crate::rsm::apply::TierUpdatePayload {
1448                    wallet: *wallet,
1449                    margin_mode: *margin_mode,
1450                    tier: tier.clone(),
1451                    trading_limits: *trading_limits,
1452                }),
1453            ),
1454            EngineCommand::LegacyTierMarginModeUpdate {
1455                wallet,
1456                margin_mode,
1457            } => (
1458                CommandType::TierUpdate,
1459                hypercall_types::serialize_to_wire_bytes(&TierMarginModeUpdatePayload {
1460                    wallet: *wallet,
1461                    margin_mode: *margin_mode,
1462                }),
1463            ),
1464            EngineCommand::HypercorePositionUpdate {
1465                account,
1466                coin,
1467                size,
1468                entry_price,
1469                unrealized_pnl,
1470                timestamp_ms,
1471            } => (
1472                CommandType::HypercorePositionUpdate,
1473                hypercall_types::serialize_to_wire_bytes(&HypercorePositionUpdatePayload {
1474                    account: account.clone(),
1475                    coin: coin.clone(),
1476                    size: *size,
1477                    entry_price: *entry_price,
1478                    unrealized_pnl: *unrealized_pnl,
1479                    timestamp_ms: *timestamp_ms,
1480                }),
1481            ),
1482            EngineCommand::MmpConfigUpdate {
1483                wallet,
1484                currency,
1485                enabled,
1486                interval_ms,
1487                frozen_time_ms,
1488                qty_limit,
1489                delta_limit,
1490                vega_limit,
1491            } => (
1492                CommandType::MmpConfigUpdate,
1493                hypercall_types::serialize_to_wire_bytes(&MmpConfigUpdatePayload {
1494                    wallet: *wallet,
1495                    currency: currency.clone(),
1496                    enabled: *enabled,
1497                    interval_ms: *interval_ms,
1498                    frozen_time_ms: *frozen_time_ms,
1499                    qty_limit: *qty_limit,
1500                    delta_limit: *delta_limit,
1501                    vega_limit: *vega_limit,
1502                }),
1503            ),
1504            EngineCommand::RfqExecute(cmd) => (
1505                CommandType::RfqExecute,
1506                hypercall_types::serialize_to_wire_bytes(cmd),
1507            ),
1508            EngineCommand::TradingModeUpdate {
1509                modes,
1510                timestamp_ms,
1511            } => (
1512                CommandType::TradingModeUpdate,
1513                hypercall_types::serialize_to_wire_bytes(&TradingModeUpdatePayload {
1514                    modes: modes.clone(),
1515                    timestamp_ms: *timestamp_ms,
1516                }),
1517            ),
1518            EngineCommand::TickExpiry { now_ms, context } => (
1519                CommandType::TickExpiry,
1520                hypercall_types::serialize_to_wire_bytes(&(*now_ms, context)),
1521            ),
1522            EngineCommand::TickSnapshot { .. } => return, // Snapshots are local-only
1523            EngineCommand::DepositUpdate {
1524                wallet,
1525                amount,
1526                timestamp_ms,
1527                sequence,
1528                source_event_hash,
1529            } => (
1530                CommandType::DepositUpdate,
1531                hypercall_types::serialize_to_wire_bytes(&DepositUpdatePayload {
1532                    wallet: *wallet,
1533                    amount: *amount,
1534                    timestamp_ms: *timestamp_ms,
1535                    sequence: sequence.unwrap_or_else(|| {
1536                        panic!("RUNTIME_INVARIANT: DepositUpdate missing durable sequence")
1537                    }),
1538                    source_event_hash: source_event_hash.clone(),
1539                }),
1540            ),
1541            EngineCommand::OptionDepositUpdate {
1542                request_id,
1543                wallet,
1544                symbol,
1545                quantity,
1546                timestamp_ms,
1547            } => (
1548                CommandType::OptionDepositUpdate,
1549                hypercall_types::serialize_to_wire_bytes(&(
1550                    request_id,
1551                    wallet,
1552                    symbol,
1553                    quantity,
1554                    timestamp_ms,
1555                )),
1556            ),
1557            EngineCommand::OptionWithdrawalUpdate {
1558                request_id,
1559                wallet,
1560                account,
1561                signer,
1562                rsm_signer,
1563                symbol,
1564                quantity,
1565                nonce,
1566                action,
1567                timestamp_ms,
1568            } => (
1569                CommandType::OptionWithdrawalUpdate,
1570                hypercall_types::serialize_to_wire_bytes(&(
1571                    request_id,
1572                    wallet,
1573                    account,
1574                    signer,
1575                    rsm_signer,
1576                    symbol,
1577                    quantity,
1578                    nonce,
1579                    action,
1580                    timestamp_ms,
1581                )),
1582            ),
1583            EngineCommand::CashWithdrawalUpdate {
1584                request_id,
1585                wallet,
1586                account,
1587                destination,
1588                signer,
1589                rsm_signer,
1590                amount,
1591                amount_wei,
1592                nonce,
1593                timestamp_ms,
1594            } => (
1595                CommandType::CashWithdrawalUpdate,
1596                hypercall_types::serialize_to_wire_bytes(&(
1597                    request_id,
1598                    wallet,
1599                    account,
1600                    destination,
1601                    signer,
1602                    rsm_signer,
1603                    amount,
1604                    amount_wei,
1605                    nonce,
1606                    timestamp_ms,
1607                )),
1608            ),
1609            EngineCommand::LiquidationBonusUpdate {
1610                wallet,
1611                amount,
1612                balance_after,
1613                timestamp_ms,
1614                sequence,
1615            } => (
1616                CommandType::LiquidationBonusUpdate,
1617                hypercall_types::serialize_to_wire_bytes(&BalanceCommandPayload {
1618                    wallet: *wallet,
1619                    amount: *amount,
1620                    balance_after: *balance_after,
1621                    timestamp_ms: *timestamp_ms,
1622                    sequence: *sequence,
1623                }),
1624            ),
1625            EngineCommand::ApproveAgent {
1626                wallet,
1627                agent,
1628                expires_at_ms,
1629                nonce,
1630                timestamp_ms,
1631            } => (
1632                CommandType::ApproveAgent,
1633                hypercall_types::serialize_to_wire_bytes(&ApproveAgentPayload {
1634                    wallet: *wallet,
1635                    agent: *agent,
1636                    expires_at_ms: *expires_at_ms,
1637                    nonce: *nonce,
1638                    timestamp_ms: *timestamp_ms,
1639                }),
1640            ),
1641            EngineCommand::RevokeAgent {
1642                wallet,
1643                agent,
1644                nonce,
1645                timestamp_ms,
1646            } => (
1647                CommandType::RevokeAgent,
1648                hypercall_types::serialize_to_wire_bytes(&RevokeAgentPayload {
1649                    wallet: *wallet,
1650                    agent: *agent,
1651                    nonce: *nonce,
1652                    timestamp_ms: *timestamp_ms,
1653                }),
1654            ),
1655            EngineCommand::NonceAdvance { wallet, nonce, .. } => (
1656                CommandType::NonceAdvance,
1657                hypercall_types::serialize_to_wire_bytes(&NonceAdvancePayload {
1658                    wallet: *wallet,
1659                    nonce: *nonce,
1660                    timestamp_ms: env.received_ts_ms,
1661                }),
1662            ),
1663            EngineCommand::HypercoreEquityUpdate {
1664                wallet,
1665                account_value,
1666                timestamp_ms,
1667            } => (
1668                CommandType::HypercoreEquityUpdate,
1669                hypercall_types::serialize_to_wire_bytes(&HypercoreEquityUpdatePayload {
1670                    wallet: *wallet,
1671                    account_value: *account_value,
1672                    timestamp_ms: *timestamp_ms,
1673                }),
1674            ),
1675            EngineCommand::SetPmSettlementPoolConfig(command) => (
1676                CommandType::SetPmSettlementPoolConfig,
1677                hypercall_types::serialize_to_wire_bytes(command),
1678            ),
1679            EngineCommand::RecordPmVaultDeposit(command) => (
1680                CommandType::RecordPmVaultDeposit,
1681                hypercall_types::serialize_to_wire_bytes(command),
1682            ),
1683            EngineCommand::RequestPmVaultWithdrawal(command) => (
1684                CommandType::RequestPmVaultWithdrawal,
1685                hypercall_types::serialize_to_wire_bytes(command),
1686            ),
1687            EngineCommand::AccruePmSettlementInterest(command) => (
1688                CommandType::AccruePmSettlementInterest,
1689                hypercall_types::serialize_to_wire_bytes(command),
1690            ),
1691            EngineCommand::ApplyPmSettlementRepayment(command) => (
1692                CommandType::ApplyPmSettlementRepayment,
1693                hypercall_types::serialize_to_wire_bytes(command),
1694            ),
1695            EngineCommand::JournalPmRecoveryPlan(command) => (
1696                CommandType::JournalPmRecoveryPlan,
1697                hypercall_types::serialize_to_wire_bytes(command),
1698            ),
1699            EngineCommand::MarkPmRecoveryActionSubmitted(command) => (
1700                CommandType::MarkPmRecoveryActionSubmitted,
1701                hypercall_types::serialize_to_wire_bytes(command),
1702            ),
1703            EngineCommand::ResolvePmRecoveryAction(command) => (
1704                CommandType::ResolvePmRecoveryAction,
1705                hypercall_types::serialize_to_wire_bytes(command),
1706            ),
1707        };
1708
1709        let is_expiry_payload = matches!(cmd_type, CommandType::TickExpiry)
1710            || matches!(
1711                &env.command,
1712                EngineCommand::MarketAction(command) if command.expiry_context.is_some()
1713            );
1714        if is_expiry_payload {
1715            let total_payload_len = crate::nats::COMMAND_PAYLOAD_PREFIX_LEN + data.len();
1716            if total_payload_len > MAX_EXPIRY_NATS_PAYLOAD_BYTES {
1717                panic!(
1718                    "CRITICAL_FAILURE: expiry NATS payload is {} bytes, exceeding {} byte limit. \
1719                     Refusing to publish an oversized replication command.",
1720                    total_payload_len, MAX_EXPIRY_NATS_PAYLOAD_BYTES
1721                );
1722            }
1723        }
1724
1725        publisher.publish(cmd_type, &data).await;
1726    }
1727
1728    pub(crate) async fn publish_balance_updates_to_nats(
1729        &self,
1730        updates: &[hypercall_types::BalanceUpdate],
1731    ) {
1732        let Some(ref publisher) = self.nats_balance_update_publisher else {
1733            return;
1734        };
1735        for update in updates {
1736            publisher.publish(update).await;
1737        }
1738    }
1739
1740    /// Set the mark price oracles for spot/forward price lookups.
1741    ///
1742    /// Maps underlying symbol (e.g., "BTC", "ETH") to oracle instance.
1743    /// This is the canonical source for spot prices in margin calculations.
1744    pub fn set_mark_price_oracles(
1745        &mut self,
1746        oracles: HashMap<String, Arc<HyperliquidMarkPriceOracle>>,
1747    ) {
1748        info!(
1749            "Setting mark price oracles for UnifiedEngine: {:?}",
1750            oracles.keys().collect::<Vec<_>>()
1751        );
1752        self.ctx.deps.mark_price_oracles = oracles;
1753    }
1754
1755    /// Set the MMP cache for market maker protection
1756    pub fn set_mmp_cache(&mut self, cache: Arc<crate::read_cache::mmp::MmpCache>) {
1757        info!("Setting MMP cache for UnifiedEngine");
1758        self.ctx.deps.mmp_cache = Some(cache);
1759    }
1760
1761    /// Set the Tier cache for position limit checks.
1762    ///
1763    /// Also seeds wallet_margin_modes from the cache so the engine fails closed
1764    /// when a wallet's mode is missing rather than consulting a default.
1765    pub fn set_tier_cache(&mut self, cache: Arc<crate::read_cache::tier::TierCache>) {
1766        let modes = cache.get_all_margin_modes_sync().expect(
1767            "CRITICAL_FAILURE: tier cache unavailable during engine seed -- margin modes unavailable",
1768        );
1769        let default_limits = cache.default_trading_limits();
1770        let limits = cache
1771            .get_all_trading_limits_sync()
1772            .expect("CRITICAL_FAILURE: tier cache lock contention during engine seed -- trading limits unavailable");
1773        let tiers = cache.get_all_tiers_sync().expect(
1774            "CRITICAL_FAILURE: tier cache lock contention during engine seed -- tiers unavailable",
1775        );
1776        let count = modes.len();
1777        self.ctx.deps.wallet_margin_modes = modes;
1778        self.ctx.deps.wallet_trading_limits = limits;
1779        self.ctx.deps.default_trading_limits = default_limits;
1780        self.ctx.deps.wallet_tiers = tiers;
1781        info!(
1782            "Setting Tier cache for UnifiedEngine, seeded {} wallet tiers",
1783            count
1784        );
1785        self.ctx.deps.tier_cache = Some(cache);
1786    }
1787
1788    /// Set the direct WS event sender (low-latency WS delivery).
1789    pub fn set_ws_event_sender(
1790        &mut self,
1791        sender: tokio::sync::mpsc::UnboundedSender<hypercall_types::EngineMessage>,
1792    ) {
1793        self.ctx.deps.ws_event_sender = Some(sender);
1794    }
1795
1796    /// Set the RFQ execution receiver channel.
1797    pub fn set_rfq_receiver(
1798        &mut self,
1799        receiver: mpsc::Receiver<hypercall_runtime_api::RfqExecuteRequest>,
1800    ) {
1801        self.rfq_receiver = Some(receiver);
1802    }
1803
1804    /// Set the deposit receiver channel (faucet/admin deposits).
1805    pub fn set_deposit_receiver(&mut self, receiver: mpsc::Receiver<DepositRequest>) {
1806        self.deposit_receiver = Some(receiver);
1807    }
1808
1809    /// Set the option deposit receiver channel.
1810    pub fn set_option_deposit_receiver(&mut self, receiver: mpsc::Receiver<OptionDepositRequest>) {
1811        self.option_deposit_receiver = Some(receiver);
1812    }
1813
1814    pub fn set_option_withdrawal_receiver(
1815        &mut self,
1816        receiver: mpsc::Receiver<OptionWithdrawalRequest>,
1817    ) {
1818        self.option_withdrawal_receiver = Some(receiver);
1819    }
1820
1821    /// Set the cash withdrawal receiver channel.
1822    pub fn set_cash_withdrawal_receiver(
1823        &mut self,
1824        receiver: mpsc::Receiver<CashWithdrawalRequest>,
1825    ) {
1826        self.cash_withdrawal_receiver = Some(receiver);
1827    }
1828
1829    /// Set the liquidation bonus receiver channel.
1830    pub fn set_liquidation_bonus_receiver(
1831        &mut self,
1832        receiver: mpsc::Receiver<LiquidationBonusRequest>,
1833    ) {
1834        self.liquidation_bonus_receiver = Some(receiver);
1835    }
1836
1837    /// Set the tier update receiver channel.
1838    pub fn set_tier_update_receiver(&mut self, receiver: mpsc::Receiver<TierUpdateRequest>) {
1839        self.tier_update_receiver = Some(receiver);
1840    }
1841
1842    /// Set the HyperCore equity update receiver channel.
1843    pub fn set_hypercore_equity_receiver(
1844        &mut self,
1845        receiver: mpsc::Receiver<HypercoreEquityRequest>,
1846    ) {
1847        self.hypercore_equity_receiver = Some(receiver);
1848    }
1849
1850    /// Attach a `tokio::sync::watch::Receiver` that the engine main
1851    /// loop will poll for per-underlying `trading_mode` updates. See
1852    /// `apply_underlying_trading_mode_update` for the handler
1853    /// semantics. Optional — call sites that don't need live catalog
1854    /// notifications can omit this and the engine behaves exactly as
1855    /// before (modes only refresh on recovery / CreateMarket).
1856    pub fn set_trading_mode_receiver(
1857        &mut self,
1858        receiver: tokio::sync::watch::Receiver<
1859            std::collections::HashMap<String, hypercall_types::TradingModes>,
1860        >,
1861    ) {
1862        self.trading_mode_receiver = Some(receiver);
1863    }
1864
1865    /// Get the sync status for readiness checks.
1866    ///
1867    /// API endpoints should check this before serving requests that depend on
1868    /// engine state (e.g., order placement).
1869    pub fn sync_status(&self) -> Arc<SyncStatus> {
1870        self.sync_status.clone()
1871    }
1872}
1873
1874impl UnifiedEngine {
1875    fn rebuild_shared_engine_iv_surfaces(&self) {
1876        let mut shared = self
1877            .engine_iv_surfaces
1878            .write()
1879            .expect("engine IV surfaces lock poisoned");
1880        *shared = self.ctx.iv_surfaces.clone();
1881    }
1882
1883    /// Get settlement price from oracle for a specific expiry.
1884    pub async fn get_settlement_price(&self, underlying: &str, expiry_ts: i64) -> Option<Decimal> {
1885        self.expiry_manager
1886            .get_settlement_price(&self.ctx.deps, underlying, expiry_ts)
1887            .await
1888    }
1889
1890    /// Set reference price for testing.
1891    ///
1892    /// Updates the canonical `deps.reference_prices` map used by margin checks
1893    /// and settlement price lookups.
1894    #[cfg(any(test, feature = "test-utils"))]
1895    pub fn set_reference_price(&mut self, underlying: String, price: Decimal) {
1896        let price_f64 = price
1897            .to_f64()
1898            .expect("set_reference_price: invalid Decimal -> f64 conversion");
1899        self.ctx.deps.reference_prices.insert(underlying, price_f64);
1900    }
1901
1902    /// Set test cash for an account.
1903    ///
1904    /// This is a TEST-ONLY function for funding accounts in unit/integration tests.
1905    /// It seeds balance_ledger, which is authoritative for margin checks.
1906    #[cfg(any(test, feature = "test-utils"))]
1907    pub fn set_account_cash(&mut self, wallet: &WalletAddress, cash: f64) {
1908        let cash_decimal = Decimal::from_f64_retain(cash)
1909            .expect("set_account_cash: invalid f64 -> Decimal conversion");
1910
1911        self.ctx.balance_ledger.set(*wallet, cash_decimal);
1912    }
1913
1914    /// Expire a specific instrument and settle all positions.
1915    pub async fn expire_instrument(
1916        &mut self,
1917        symbol: &str,
1918        reference_price: Decimal,
1919        now_ms: u64,
1920    ) -> Result<(), String> {
1921        self.expiry_manager
1922            .expire_instrument(symbol, reference_price, now_ms, &mut self.ctx)
1923            .await
1924    }
1925
1926    // ===== Test Helper Methods =====
1927    // These methods are public to support test utilities but should not be used in production code
1928
1929    /// Transition instruments to EXPIRED_PENDING_PRICE status (test helper).
1930    pub fn transition_to_pending_settlement(&mut self, symbols: &[String], now_ms: u64) {
1931        self.expiry_manager
1932            .transition_to_pending_settlement(symbols, now_ms, &mut self.ctx);
1933    }
1934
1935    /// Get all instruments pending settlement (test helper).
1936    pub fn get_pending_settlement_instruments(&self) -> Vec<(String, i64, Vec<String>)> {
1937        self.expiry_manager
1938            .get_pending_settlement_instruments(self.ctx.db.as_ref())
1939    }
1940
1941    /// Check if orderbooks contains a key (for test utilities)
1942    #[cfg(any(test, feature = "test-utils"))]
1943    pub fn orderbooks_contains_key(&self, symbol: &str) -> bool {
1944        self.ctx.orderbooks.contains_key(symbol)
1945    }
1946
1947    /// Get a clone of the event sender (for test utilities)
1948    #[cfg(any(test, feature = "test-utils"))]
1949    pub fn get_event_sender(&self) -> Option<mpsc::UnboundedSender<EngineMessage>> {
1950        Some(self.ctx.deps.event_sender.clone())
1951    }
1952
1953    /// Get L2 snapshots of all orderbooks.
1954    /// Returns a map of symbol → (bids, asks) where each side is Vec<(price, size)>.
1955    /// Used for cache reconciliation after unclean restart (SIGKILL).
1956    pub fn get_all_book_snapshots(
1957        &self,
1958    ) -> HashMap<
1959        String,
1960        (
1961            Vec<(rust_decimal::Decimal, rust_decimal::Decimal)>,
1962            Vec<(rust_decimal::Decimal, rust_decimal::Decimal)>,
1963        ),
1964    > {
1965        self.ctx
1966            .orderbooks
1967            .iter()
1968            .map(|(symbol, book)| (symbol.clone(), book.get_orderbook_snapshot()))
1969            .collect()
1970    }
1971
1972    /// Insert an orderbook directly (for test utilities)
1973    #[cfg(any(test, feature = "test-utils"))]
1974    pub fn insert_orderbook(&mut self, symbol: String, orderbook: OrderBook) {
1975        self.ctx.orderbooks.insert(symbol, orderbook);
1976    }
1977
1978    /// Track expiry for a symbol (for test utilities)
1979    #[cfg(any(test, feature = "test-utils"))]
1980    pub fn track_expiry(&mut self, symbol: String, expiry: u64) {
1981        let underlying = symbol.split('-').next().unwrap_or_default().to_string();
1982        let expiry_timestamp =
1983            crate::rsm::margin_manager::expiry_date_to_timestamp(&underlying, expiry);
1984        self.expiry_manager
1985            .schedule_expiry(symbol, expiry_timestamp);
1986    }
1987
1988    /// Persist an orderbook to database if handler is available (for test utilities)
1989    #[cfg(any(test, feature = "test-utils"))]
1990    pub fn persist_test_orderbook(&self, symbol: &str, parsed_symbol: &ParsedSymbol) {
1991        if let Some(ref handler) = self.ctx.db {
1992            let option_type = match parsed_symbol.option_type {
1993                crate::types::OptionType::Call => hypercall_types::OptionType::Call,
1994                crate::types::OptionType::Put => hypercall_types::OptionType::Put,
1995            };
1996            let option_token_address =
1997                match crate::shared::option_token_address::derive_option_token_address(
1998                    &parsed_symbol.underlying,
1999                    parsed_symbol.expiry,
2000                    parsed_symbol.strike,
2001                    option_type.as_db_str(),
2002                ) {
2003                    Ok(option_token_address) => option_token_address,
2004                    Err(e) => {
2005                        error!(
2006                            symbol = symbol,
2007                            error = %e,
2008                            "Failed to derive option token address for persisted test orderbook"
2009                        );
2010                        return;
2011                    }
2012                };
2013
2014            // Create new instrument record
2015            let new_instrument = hypercall_db::InstrumentRecord {
2016                instrument_numeric_id: 0,
2017                id: symbol.to_string(),
2018                underlying: parsed_symbol.underlying.clone(),
2019                strike: parsed_symbol.strike,
2020                expiry: parsed_symbol.expiry as i64,
2021                option_type,
2022                option_token_address: Some(option_token_address),
2023                status: hypercall_types::api_models::InstrumentStatus::Active,
2024                trading_mode: "orderbook".to_string(),
2025            };
2026
2027            // Save to database synchronously
2028            if let Err(e) = handler.save_market_and_instrument_sync(
2029                &parsed_symbol.underlying,
2030                parsed_symbol.expiry as i64,
2031                &new_instrument,
2032            ) {
2033                error!("Failed to persist test orderbook to database: {}", e);
2034                // Don't fail - orderbook is already created in memory
2035            } else {
2036                info!("Persisted test orderbook {} to database", symbol);
2037            }
2038        }
2039    }
2040}