1use super::bounded_idempotence_cache::BoundedIdempotenceCache;
2use super::engine_snapshot::EngineSnapshot;
3use crate::engine_enums_ext::EventTypeExt;
4use crate::portfolio::PortfolioService;
5use crate::price_oracle::hyperliquid_oracle::HyperliquidMarkPriceOracle;
6use crate::read_cache::greeks::GreeksCache;
7use crate::read_cache::portfolio::PortfolioCache;
8use crate::rsm::ledger::BalanceLedger;
9use crate::rsm::liquidation_manager::LiquidationManager;
10use crate::shared::order_types::{get_timestamp_millis, ParsedSymbol};
11use crate::shared::traits::MarkPriceOracle;
12use crate::snapshot::SyncStatus;
13use crate::standard_margin::StandardAccountBuilder;
14use crate::types::Config;
15use arc_swap::ArcSwap;
16use hypercall_db::{InstrumentReader, InstrumentWriter, JournalReplayReader, OrderReader};
17use hypercall_db_diesel::DatabaseHandler;
18use hypercall_engine::{FeeService, MatchResult, OrderBook};
19use hypercall_journal::checkpoint::WalCheckpointMetadata;
20use hypercall_types::Side;
21use hypercall_types::{to_human_readable_decimal, WalletAddress};
22use hypercall_types::{
23 EngineMessage, Fill, MarketAction, MarketActionMessage, MarketUpdateMessage,
24 MarketUpdateStatus, OptionType as MessageOptionType, OrderAction, OrderActionMessage,
25 OrderInfo, OrderUpdateMessage, OrderUpdateStatus, TimeInForce,
26};
27use metrics::{counter, gauge, histogram};
28use rust_decimal::prelude::ToPrimitive;
29use rust_decimal::Decimal;
30use rust_decimal_macros::dec;
31use std::collections::HashMap;
32use std::path::PathBuf;
33use std::sync::atomic::{AtomicI64, Ordering};
34use std::sync::Arc;
35use std::time::{Duration, Instant};
36use tokio::select;
37use tokio::sync::{broadcast, mpsc};
38use tracing::{debug, error, info, info_span, warn, Instrument};
39
40use hypercall_recovery::RestartStateComponent;
41pub use hypercall_runtime_api::{
42 decrement_pending_requests, get_pending_requests, increment_pending_requests, AgentAuthRequest,
43 CashWithdrawalApplyReceipt, CashWithdrawalRequest, DepositRequest, EngineQuiesceAction,
44 EngineQuiesceReport, EngineQuiesceRequest, HypercoreEquityRequest, LiquidationBonusRequest,
45 MarginModeUpdateRequest, MarketRequest, NonceCheckRequest, OptionDepositRequest,
46 OptionWithdrawalApplyReceipt, OptionWithdrawalRequest, TierUpdateRequest, UnifiedEngineRequest,
47};
48
49#[cfg(feature = "otel-tracing")]
50use tracing_opentelemetry::OpenTelemetrySpanExt;
51
52mod apply_interface;
53mod builder;
54mod fill_metadata;
55mod journaling;
56mod markets;
57mod matching;
58mod order_routing;
59pub(crate) mod recovery;
60mod rfq_handler;
61mod runtime;
62#[cfg(test)]
63mod tests;
64mod traits;
65
66pub use apply_interface::EngineError;
67pub use builder::{UnifiedEngineBuilder, UnifiedEngineBuilderResult};
68pub(crate) use traits::{DurableJournaling, OrderAdmission, OrderExecution, ReplayRecovery};
69
/// Size cap in bytes (1 MiB). Judging by the name it bounds expiry-related NATS payloads;
/// the use site is outside this section — NOTE(review): confirm.
const MAX_EXPIRY_NATS_PAYLOAD_BYTES: usize = 1024 * 1024;
/// Environment flag read at startup via `env_flag_enabled`. When truthy and a database is
/// present, `UnifiedEngine::init` panics if no explicit WAL path is configured, if the WAL
/// checkpoint is missing or zero, or if no engine state snapshot is found at the snapshot path.
const ENGINE_REQUIRE_SNAPSHOT_RESTORE_ENV: &str = "ENGINE_REQUIRE_SNAPSHOT_RESTORE";
/// Environment flag read at startup via `env_flag_enabled`; its value is recorded in the
/// recovery safety report. NOTE(review): how it relaxes replay checks is not visible here.
const UNSAFE_SNAPSHOT_REPLAY_OVERRIDE_ENV: &str = "ENGINE_SNAPSHOT_RESTORE_ALLOW_UNSAFE_REPLAY";
/// Environment flag read at startup via `env_flag_enabled`; its value is recorded in the
/// recovery safety report. Presumably permits startup despite snapshot/durable cash
/// divergence — confirm.
const SNAPSHOT_CASH_VALIDATION_OVERRIDE_ENV: &str = "ENGINE_SNAPSHOT_RESTORE_ALLOW_CASH_DIVERGENCE";
/// Absolute tolerance for `validate_snapshot_cash_evidence`: a snapshot/durable cash
/// difference strictly larger than this (in either direction) is reported as a failure.
#[cfg(test)]
const DURABLE_CASH_EVIDENCE_TOLERANCE: Decimal = dec!(0.00000001);

/// Presumably the default capacity of order request channels — use site not visible here.
const DEFAULT_ORDER_BUFFER_SIZE: usize = 1000;

/// Presumably the capacity of the market request channel — use site not visible here.
const MARKET_REQUEST_BUFFER_SIZE: usize = 100;

/// TWAP window length in seconds (1800 s = 30 min); use site not visible here.
const TWAP_WINDOW_SECONDS: u32 = 1800;

/// Lookback in hours, presumably for seeding the idempotency cache — use site not visible here.
const IDEMPOTENCY_CACHE_LOOKBACK_HOURS: i64 = 24;
88
/// One durable (presumably database-derived) cash row consumed by
/// `validate_snapshot_cash_evidence`.
#[cfg(test)]
#[derive(Clone)]
pub(super) struct DurableCashEvidence {
    /// Wallet the row belongs to.
    pub(super) wallet: WalletAddress,
    /// Durable cash balance, compared against the snapshot's `balance_ledger` entry.
    pub(super) ledger_balance: Decimal,
    /// Durable margin mode; the exact string `"portfolio"` marks a portfolio-margin wallet.
    pub(super) margin_mode: Option<String>,
}
96
/// Cross-checks standard-margin cash in an engine snapshot against durable cash evidence.
///
/// A wallet is skipped as portfolio-margin when the snapshot marks it
/// `MarginMode::Portfolio`. When the snapshot carries no margin modes at all, a wallet is
/// also skipped if its durable `margin_mode` is `"portfolio"`.
///
/// Every other wallet with a durable row must have a snapshot balance within
/// `DURABLE_CASH_EVIDENCE_TOLERANCE` of the durable balance. A wallet missing from the
/// snapshot counts as zero. Conversely, every nonzero snapshot balance of a
/// non-portfolio wallet must be backed by a durable row.
///
/// If `rows` contains several entries for the same wallet, the last one wins.
///
/// # Errors
/// Returns a message quoting up to 10 failures. Failures are sorted before sampling, so
/// the message does not depend on `HashMap` iteration order.
#[cfg(test)]
pub(super) fn validate_snapshot_cash_evidence(
    snapshot: &crate::rsm::engine_state_snapshot::EngineStateSnapshot,
    rows: impl IntoIterator<Item = DurableCashEvidence>,
) -> Result<(), String> {
    /// Maximum number of individual failures quoted in the error message.
    const MAX_REPORTED_FAILURES: usize = 10;

    let durable_by_wallet: HashMap<WalletAddress, (Decimal, Option<String>)> = rows
        .into_iter()
        .map(|row| (row.wallet, (row.ledger_balance, row.margin_mode)))
        .collect();

    let snapshot_marks_portfolio = |wallet: &WalletAddress| {
        matches!(
            snapshot.wallet_margin_modes.get(wallet),
            Some(crate::rsm::margin_mode::MarginMode::Portfolio)
        )
    };
    // If the snapshot records any margin modes, it alone decides which wallets are
    // portfolio-margin; otherwise fall back to the durable margin_mode column.
    let snapshot_modes_are_authoritative = !snapshot.wallet_margin_modes.is_empty();

    let mut failures = Vec::new();
    for (wallet, (ledger_balance, margin_mode)) in &durable_by_wallet {
        let db_portfolio = margin_mode.as_deref() == Some("portfolio");
        if snapshot_marks_portfolio(wallet) || (!snapshot_modes_are_authoritative && db_portfolio)
        {
            continue;
        }

        let snapshot_balance = snapshot
            .balance_ledger
            .get(wallet)
            .copied()
            .unwrap_or(Decimal::ZERO);
        if (snapshot_balance - *ledger_balance).abs() > DURABLE_CASH_EVIDENCE_TOLERANCE {
            failures.push(format!(
                "{} snapshot_cash={} durable_cash={}",
                wallet, snapshot_balance, ledger_balance
            ));
        }
    }

    // Nonzero snapshot cash for a standard-margin wallet must be backed by a durable row.
    for (wallet, snapshot_balance) in &snapshot.balance_ledger {
        if *snapshot_balance == Decimal::ZERO
            || durable_by_wallet.contains_key(wallet)
            || snapshot_marks_portfolio(wallet)
        {
            continue;
        }
        failures.push(format!(
            "{} snapshot_cash={} missing durable cash evidence",
            wallet, snapshot_balance
        ));
    }

    if failures.is_empty() {
        return Ok(());
    }

    // Sort so the quoted sample is deterministic across runs.
    failures.sort_unstable();
    let sample = failures
        .into_iter()
        .take(MAX_REPORTED_FAILURES)
        .collect::<Vec<_>>()
        .join("; ");
    Err(format!(
        "engine snapshot cash validation failed for standard-margin wallets: {sample}"
    ))
}
161
162#[derive(Clone, Copy, Debug, Eq, PartialEq)]
163pub(super) enum CashRestartAuthority {
164 EmptyState,
165 SnapshotAtCheckpoint {
166 command_id: i64,
167 },
168 SnapshotPlusReplay {
169 snapshot_command_id: i64,
170 replay_target_command_id: i64,
171 },
172 HotJournalBaseReplay {
173 replay_target_command_id: i64,
174 },
175}
176
177impl CashRestartAuthority {
178 #[cfg(test)]
179 pub(super) fn durable_cash_validation_command_id(self) -> Option<i64> {
180 match self {
181 Self::SnapshotAtCheckpoint { command_id } => Some(command_id),
182 Self::EmptyState
183 | Self::SnapshotPlusReplay { .. }
184 | Self::HotJournalBaseReplay { .. } => None,
185 }
186 }
187}
188
189pub(super) fn cash_restart_authority_for_startup(
190 snapshot_command_id: Option<i64>,
191 checkpoint_command_id: i64,
192) -> Result<CashRestartAuthority, String> {
193 match (snapshot_command_id, checkpoint_command_id) {
194 (Some(snapshot_command_id), checkpoint_command_id)
195 if snapshot_command_id > checkpoint_command_id =>
196 {
197 Err(format!(
198 "engine snapshot last_command_id {snapshot_command_id} is ahead of WAL checkpoint last_command_id {checkpoint_command_id}"
199 ))
200 }
201 (Some(command_id), checkpoint_command_id) if command_id == checkpoint_command_id => {
202 Ok(CashRestartAuthority::SnapshotAtCheckpoint { command_id })
203 }
204 (Some(snapshot_command_id), replay_target_command_id) => {
205 Ok(CashRestartAuthority::SnapshotPlusReplay {
206 snapshot_command_id,
207 replay_target_command_id,
208 })
209 }
210 (None, checkpoint_command_id) if checkpoint_command_id > 0 => {
211 Ok(CashRestartAuthority::HotJournalBaseReplay {
212 replay_target_command_id: checkpoint_command_id,
213 })
214 }
215 (None, _) => Ok(CashRestartAuthority::EmptyState),
216 }
217}
218
/// Test helper: `true` when `cash_restart_authority_for_startup` treats this snapshot as
/// sitting exactly on the WAL checkpoint (`SnapshotAtCheckpoint`).
#[cfg(test)]
pub(super) fn snapshot_cash_validation_boundary_matches(
    snapshot_command_id: i64,
    checkpoint_command_id: i64,
) -> bool {
    let authority =
        cash_restart_authority_for_startup(Some(snapshot_command_id), checkpoint_command_id);
    matches!(authority, Ok(CashRestartAuthority::SnapshotAtCheckpoint { .. }))
}
229
/// Test helper: snapshot cash is comparable to durable accounting only when two conditions
/// hold. The snapshot must sit exactly on the checkpoint. Any known hot-journal range must
/// also not extend past that checkpoint.
#[cfg(test)]
pub(super) fn should_validate_snapshot_cash_against_durable_accounting(
    snapshot_command_id: i64,
    checkpoint_command_id: i64,
    replay_bounds: Option<hypercall_db::JournalCommandIdBounds>,
) -> bool {
    let snapshot_on_checkpoint = snapshot_command_id == checkpoint_command_id;
    let journal_beyond_checkpoint = matches!(
        replay_bounds,
        Some(bounds) if bounds.max_command_id > checkpoint_command_id
    );
    snapshot_on_checkpoint && !journal_beyond_checkpoint
}
245
246pub(super) fn validate_hot_journal_covers_base_replay(
247 checkpoint_command_id: i64,
248 bounds: Option<hypercall_db::JournalCommandIdBounds>,
249 replay_from_command_id: i64,
250 non_replayable_tail_count: Option<i64>,
251) -> Result<(), String> {
252 if replay_from_command_id < 1 {
253 return Err(format!(
254 "invalid replay window start command_id {replay_from_command_id}; command ids must be >=1"
255 ));
256 }
257
258 if checkpoint_command_id <= 0 {
259 if let Some(bounds) = bounds {
260 if bounds.min_command_id != replay_from_command_id {
261 return Err(format!(
262 "hot journal starts at command_id {}, but zero-checkpoint startup with existing engine_commands rows requires replay from command_id {}",
263 bounds.min_command_id, replay_from_command_id
264 ));
265 }
266 }
267 return Ok(());
268 }
269
270 let Some(bounds) = bounds else {
271 if replay_from_command_id > checkpoint_command_id {
272 return Ok(());
276 }
277 validate_non_replayable_tail(
278 replay_from_command_id.saturating_sub(1),
279 checkpoint_command_id,
280 non_replayable_tail_count,
281 )?;
282 return Ok(());
283 };
284
285 if bounds.max_command_id < replay_from_command_id {
286 validate_non_replayable_tail(
287 replay_from_command_id.saturating_sub(1),
288 checkpoint_command_id,
289 non_replayable_tail_count,
290 )?;
291 return Ok(());
292 }
293
294 if bounds.min_command_id > replay_from_command_id {
295 return Err(format!(
296 "hot journal starts at command_id {}, but base reconstruction for checkpoint command_id {} requires replay from command_id {} or earlier",
297 bounds.min_command_id, checkpoint_command_id, replay_from_command_id
298 ));
299 }
300
301 validate_non_replayable_tail(
302 bounds.max_command_id,
303 checkpoint_command_id,
304 non_replayable_tail_count,
305 )?;
306
307 Ok(())
308}
309
/// Checks that every command after `replayable_tail_command_id` up to
/// `checkpoint_command_id` is proven non-replayable.
///
/// Succeeds immediately when the replayable commands already reach the checkpoint.
/// Otherwise `non_replayable_tail_count` must be `Some` and equal to the gap
/// `checkpoint_command_id - replayable_tail_command_id`.
///
/// # Errors
/// Fails when the count was not computed (`None`) or does not match the gap.
fn validate_non_replayable_tail(
    replayable_tail_command_id: i64,
    checkpoint_command_id: i64,
    non_replayable_tail_count: Option<i64>,
) -> Result<(), String> {
    if replayable_tail_command_id >= checkpoint_command_id {
        return Ok(());
    }

    let expected_tail_count = checkpoint_command_id - replayable_tail_command_id;
    match non_replayable_tail_count {
        Some(proven) if proven == expected_tail_count => Ok(()),
        Some(proven) => Err(format!(
            "hot journal replayable commands end at command_id {}, before checkpoint command_id {}, and only {} of {} tail commands are proven non-replayable",
            replayable_tail_command_id, checkpoint_command_id, proven, expected_tail_count
        )),
        None => Err(format!(
            "hot journal replayable commands end at command_id {}, before checkpoint command_id {}, and non-replayable tail coverage was not checked",
            replayable_tail_command_id, checkpoint_command_id
        )),
    }
}
339
340fn non_replayable_tail_count_for_coverage_check(
341 db: &impl hypercall_db::JournalReplayReader,
342 checkpoint_command_id: i64,
343 bounds: Option<hypercall_db::JournalCommandIdBounds>,
344 replay_from_command_id: i64,
345) -> Result<Option<i64>, String> {
346 if checkpoint_command_id <= 0 {
347 return Ok(None);
348 }
349
350 let replayable_tail_command_id = match bounds {
351 Some(bounds) if bounds.max_command_id >= replay_from_command_id => bounds.max_command_id,
352 _ => replay_from_command_id.saturating_sub(1),
353 };
354 if replayable_tail_command_id >= checkpoint_command_id {
355 return Ok(None);
356 }
357
358 db.count_non_replayable_commands_in_range_sync(
359 replayable_tail_command_id,
360 checkpoint_command_id,
361 )
362 .map(Some)
363 .map_err(|error| {
364 format!(
365 "failed to count non-replayable hot journal tail rows from command_id {} through {}: {error}",
366 replayable_tail_command_id + 1,
367 checkpoint_command_id
368 )
369 })
370}
371
372pub(super) fn should_validate_hot_journal_coverage_for_base_replay(
373 db_present: bool,
374 startup_checkpoint_command_id: i64,
375 wal_path_configured: bool,
376 snapshot_loaded: bool,
377 replay_checkpoint_command_id: i64,
378) -> bool {
379 if !db_present || !wal_path_configured {
380 return false;
381 }
382
383 !snapshot_loaded || startup_checkpoint_command_id >= replay_checkpoint_command_id
384}
385
386pub(super) fn post_replay_cash_validation_checkpoint(
387 db_present: bool,
388 wal_path_configured: bool,
389 startup_checkpoint: WalCheckpointMetadata,
390 journal_bounds: Option<hypercall_db::JournalCommandIdBounds>,
391) -> Option<WalCheckpointMetadata> {
392 if !db_present || !wal_path_configured {
393 return None;
394 }
395
396 if startup_checkpoint.last_command_id > 0 {
397 return Some(startup_checkpoint);
398 }
399
400 journal_bounds
401 .filter(|bounds| bounds.min_command_id == 1)
402 .map(|bounds| WalCheckpointMetadata {
403 last_command_id: bounds.max_command_id,
404 ..startup_checkpoint
405 })
406}
407
/// Returns `true` when environment variable `name` holds a truthy value.
///
/// After trimming surrounding whitespace, the value is compared ASCII case-insensitively
/// against `1`, `true`, `yes` and `on`. Any other value yields `false`, as does an unset
/// variable or one that is not valid Unicode.
fn env_flag_enabled(name: &str) -> bool {
    // Case-insensitive so that spellings like `True` or `On` are not silently treated as
    // disabled; several of these flags gate startup safety checks.
    const TRUTHY: [&str; 4] = ["1", "true", "yes", "on"];
    std::env::var(name).is_ok_and(|raw| {
        let value = raw.trim();
        TRUTHY
            .iter()
            .any(|truthy| value.eq_ignore_ascii_case(truthy))
    })
}
416
/// The unified engine. It owns the engine state (`ctx`) and the receiving ends of its
/// request channels. It also holds the journaling, snapshot and publishing handles used
/// at runtime.
pub struct UnifiedEngine {
    /// Engine state (orderbooks, id counters, balance ledger, positions, ...) together with
    /// its dependencies and the optional database handle; see `EngineCtx`.
    pub(crate) ctx: crate::rsm::engine_deps::EngineCtx,

    /// Built from the engine `Config` in `init`.
    margin_manager: crate::rsm::margin_manager::MarginManager,

    expiry_manager: crate::rsm::expiry_manager::ExpiryManager,

    /// Order request channel supplied to `init`.
    order_receiver: mpsc::Receiver<UnifiedEngineRequest>,
    /// Market request channel supplied to `init`.
    market_receiver: mpsc::Receiver<MarketRequest>,
    // All optional receivers below are `None` after `init`; presumably they are attached
    // afterwards (e.g. by `UnifiedEngineBuilder`) — NOTE(review): confirm.
    pub(crate) rfq_receiver: Option<mpsc::Receiver<hypercall_runtime_api::RfqExecuteRequest>>,
    pub(crate) deposit_receiver: Option<mpsc::Receiver<DepositRequest>>,
    pub(crate) option_deposit_receiver: Option<mpsc::Receiver<OptionDepositRequest>>,
    pub(crate) option_withdrawal_receiver: Option<mpsc::Receiver<OptionWithdrawalRequest>>,
    pub(crate) cash_withdrawal_receiver: Option<mpsc::Receiver<CashWithdrawalRequest>>,
    pub(crate) liquidation_bonus_receiver: Option<mpsc::Receiver<LiquidationBonusRequest>>,
    pub(crate) margin_mode_receiver: Option<mpsc::Receiver<MarginModeUpdateRequest>>,
    pub(crate) agent_auth_receiver: Option<mpsc::Receiver<AgentAuthRequest>>,
    pub(crate) nonce_check_receiver: Option<mpsc::Receiver<NonceCheckRequest>>,
    pub(crate) tier_update_receiver: Option<mpsc::Receiver<TierUpdateRequest>>,
    pub(crate) hypercore_equity_receiver: Option<mpsc::Receiver<HypercoreEquityRequest>>,
    pub(crate) pm_settlement_admin_receiver:
        Option<mpsc::Receiver<hypercall_runtime_api::PmSettlementAdminRequest>>,
    pub(crate) quiesce_receiver: Option<mpsc::Receiver<EngineQuiesceRequest>>,
    /// Watch channel carrying a map of `TradingModes` keyed by string (presumably instrument
    /// names — NOTE(review): confirm).
    pub(crate) trading_mode_receiver: Option<
        tokio::sync::watch::Receiver<
            std::collections::HashMap<String, hypercall_types::TradingModes>,
        >,
    >,
    /// Broadcast shutdown signal supplied to `init`.
    shutdown_receiver: broadcast::Receiver<()>,

    /// Built from `config.fee_config` in `init`.
    fee_service: FeeService,

    /// Copied from `EngineRuntimeSettings::snapshot_interval`.
    snapshot_interval: Duration,
    /// Copied from `EngineRuntimeSettings::read_snapshot_interval`.
    read_snapshot_interval: Duration,
    /// Copied from `EngineRuntimeSettings::post_startup_reconcile_delay`.
    post_startup_reconcile_delay: Duration,
    /// Set to `Instant::now()` in `init`; presumably the time of the last persistent
    /// snapshot — NOTE(review): confirm.
    last_snapshot: Instant,
    /// WAL path as given in `EngineRuntimeSettings` (may be `None`); the effective path is
    /// resolved at startup via `wal_path_from_config`.
    wal_path: Option<PathBuf>,

    /// Marked as catching up (`set_catching_up`) during `init`.
    sync_status: Arc<SyncStatus>,

    /// `None` after `init`.
    journal_writer: Option<crate::journal::SharedEngineJournalWriter>,

    /// `None` after `init`.
    journal_batch_sender: Option<crate::journal::JournalBatchSender>,

    /// `None` after `init`.
    nats_publisher: Option<crate::nats::NatsPublisher>,
    /// `None` after `init`.
    nats_balance_update_publisher: Option<crate::nats::NatsBalanceUpdatePublisher>,

    /// Bounded cache of known request ids (presumably for idempotent request handling).
    known_request_ids: BoundedIdempotenceCache,

    /// Snapshot shared with readers; `init` uses the caller-supplied handle or an empty one.
    read_snapshot: Arc<ArcSwap<EngineSnapshot>>,

    /// `false` after `init`; presumably set once post-startup reconciliation has run.
    post_startup_reconciled: bool,

    /// Empty after `init`.
    startup_replayed_events: Vec<(String, Vec<EngineMessage>)>,

    /// Constructed as `WalCheckpointMetadata::ZERO` and updated during startup recovery.
    /// After a snapshot restore, `last_command_id` and `last_l2_seq` take the snapshot's values.
    replay_checkpoint: WalCheckpointMetadata,

    /// Initialized from `EngineRuntimeSettings::start_quiesced`.
    runtime_quiesced: bool,

    /// Set to `true` in `init` once a persistent engine state snapshot has been restored.
    snapshot_loaded: bool,

    /// Empty after `init`.
    startup_expired_order_cancels: Vec<recovery::StartupExpiredOrderCancel>,

    /// `None` after `init`.
    external_vol_oracle: Option<crate::vol_oracle::SharedVolOracle>,

    /// Shared map of volatility surfaces keyed by string; rebuilt through
    /// `rebuild_shared_engine_iv_surfaces` after a snapshot restore.
    engine_iv_surfaces: Arc<
        std::sync::RwLock<HashMap<String, crate::vol_oracle::vol_surface_cache::VolatilitySurface>>,
    >,
}
533
/// An order id handed out by the engine.
/// NOTE(review): the allocation site is not in this section; presumably the id comes from
/// `EngineCtx::next_order_id` — confirm.
pub(crate) struct AllocatedOrder {
    /// The allocated order id.
    order_id: u64,
}
540
/// Outcome of running an order through matching (the producer is outside this section).
pub(crate) struct MatchingResult {
    /// Fills generated by the match.
    fills: Vec<hypercall_types::Fill>,
    /// Total size filled (presumably the sum of the fill sizes — confirm).
    filled_size: Decimal,
    /// Whether MMP (presumably market-maker protection) was triggered.
    mmp_triggered: bool,
    /// Maker order id involved in a self-trade, if one was detected (presumably).
    self_trade_maker_id: Option<u64>,
}
548
/// Runtime knobs passed to `UnifiedEngine::init`; see the `Default` impl for default values.
#[derive(Debug, Clone)]
pub struct EngineRuntimeSettings {
    /// Copied into the engine's `snapshot_interval` (default 60 s).
    pub snapshot_interval: Duration,
    /// Copied into the engine's `read_snapshot_interval` (default 500 ms).
    pub read_snapshot_interval: Duration,
    /// Copied into the engine's `post_startup_reconcile_delay` (default 5 s).
    pub post_startup_reconcile_delay: Duration,
    /// Configured engine WAL path. At startup it is passed to
    /// `wal_path_is_explicitly_configured` and `wal_path_from_config` to resolve the
    /// effective WAL path.
    pub wal_path: Option<PathBuf>,
    /// Start with the runtime quiesced. `init` also reports this as a persistent drain
    /// marker being present at startup.
    pub start_quiesced: bool,
    /// Shared slot into which `init` stores the recovery safety report (including right
    /// before panicking on a failed startup check).
    pub recovery_safety_report: Option<hypercall_api::recovery_safety::SharedRecoverySafetyReport>,
    /// Forwarded to `EngineDeps::portfolio_margin_pool_enabled` (default `false`).
    pub portfolio_margin_pool_enabled: bool,
    /// Forwarded to `EngineDeps::portfolio_margin_settlement_allowlist` (default empty).
    pub portfolio_margin_settlement_allowlist: std::collections::BTreeSet<WalletAddress>,
}
560
561impl Default for EngineRuntimeSettings {
562 fn default() -> Self {
563 Self {
564 snapshot_interval: Duration::from_secs(60),
565 read_snapshot_interval: Duration::from_millis(500),
566 post_startup_reconcile_delay: Duration::from_secs(5),
567 wal_path: None,
568 start_quiesced: false,
569 recovery_safety_report: None,
570 portfolio_margin_pool_enabled: false,
571 portfolio_margin_settlement_allowlist: std::collections::BTreeSet::new(),
572 }
573 }
574}
575
576impl UnifiedEngine {
577 pub fn init(
579 config: Config,
580 order_receiver: mpsc::Receiver<UnifiedEngineRequest>,
581 market_receiver: mpsc::Receiver<MarketRequest>,
582 event_sender: mpsc::UnboundedSender<EngineMessage>,
583 read_snapshot: Option<Arc<ArcSwap<EngineSnapshot>>>,
584 shutdown_receiver: broadcast::Receiver<()>,
585 runtime_settings: EngineRuntimeSettings,
586 database_path: Option<String>,
587 db_auth: Option<hypercall_db_diesel::DbAuthConfig>,
588 allow_no_database: bool,
589 skip_db_migrations: bool,
590 ) -> Self {
591 let diesel_handler = if let Some(db_path) = database_path {
593 info!(
594 "UnifiedEngine attempting to connect to database: {}",
595 db_path
596 );
597 let auth =
598 db_auth.unwrap_or_else(|| hypercall_db_diesel::DbAuthConfig::password(&db_path));
599 let result = if skip_db_migrations {
600 DatabaseHandler::new_readonly_auth(auth, 3)
601 } else {
602 DatabaseHandler::new_with_auth(auth, 3)
603 };
604 match result {
605 Ok(handler) => {
606 info!(
607 "UnifiedEngine successfully connected to persistence database: {} (migrations_skipped={})",
608 db_path, skip_db_migrations
609 );
610 Some(handler)
611 }
612 Err(e) => {
613 if allow_no_database {
615 warn!("UnifiedEngine failed to connect to persistence database: {}, continuing without persistence (allowed for tests)", e);
616 None
617 } else {
618 error!(
619 "FATAL: UnifiedEngine failed to connect to persistence database {}: {}",
620 db_path, e
621 );
622 panic!(
623 "Cannot start UnifiedEngine without database connection: {}",
624 e
625 );
626 }
627 }
628 }
629 } else {
630 if allow_no_database {
632 info!("UnifiedEngine: No database path provided, running without persistence (allowed for tests)");
633 None
634 } else {
635 error!("FATAL: No database path provided for UnifiedEngine");
636 panic!("Cannot start UnifiedEngine without database configuration");
637 }
638 };
639
640 let l2_update_seq = if let Some(ref handler) = diesel_handler {
642 let replay: &dyn hypercall_db::JournalReplayReader = handler;
643 match replay.get_max_l2_seq_from_events_sync() {
644 Ok(seq) => {
645 info!("Loaded L2 update sequence from engine_events: {}", seq);
646 Arc::new(AtomicI64::new(seq))
647 }
648 Err(e) => {
649 if allow_no_database {
650 warn!(
651 "Failed to load L2 update sequence (allowed for tests): {}",
652 e
653 );
654 Arc::new(AtomicI64::new(0))
655 } else {
656 panic!(
657 "CRITICAL_FAILURE: Failed to load L2 update sequence from database: {}. \
658 Starting from 0 would risk ID collisions. Restart required.",
659 e
660 );
661 }
662 }
663 }
664 } else {
665 Arc::new(AtomicI64::new(0))
666 };
667
668 let next_order_id = if let Some(ref handler) = diesel_handler {
670 match handler.get_max_order_id_sync() {
671 Ok(max_order_id) => {
672 let next_id = max_order_id + 1;
673 info!(
674 "Loaded max order_id from database: {}, starting from {}",
675 max_order_id, next_id
676 );
677 next_id
678 }
679 Err(e) => {
680 if allow_no_database {
681 warn!("Failed to load max order_id (allowed for tests): {}", e);
682 1
683 } else {
684 panic!(
685 "CRITICAL_FAILURE: Failed to load max order_id from database: {}. \
686 Starting from 1 would risk duplicate order IDs. Restart required.",
687 e
688 );
689 }
690 }
691 }
692 } else {
693 info!("No database handler, starting order_id from 1");
694 1
695 };
696
697 let next_trade_id = if let Some(ref handler) = diesel_handler {
699 match handler.get_max_trade_id_sync() {
700 Ok(max_trade_id) => {
701 let next_id = max_trade_id + 1;
702 info!(
703 "Loaded max trade_id from database: {}, starting from {}",
704 max_trade_id, next_id
705 );
706 next_id
707 }
708 Err(e) => {
709 if allow_no_database {
710 warn!("Failed to load max trade_id (allowed for tests): {}", e);
711 1
712 } else {
713 panic!(
714 "CRITICAL_FAILURE: Failed to load max trade_id from database: {}. \
715 Starting from 1 would risk duplicate trade IDs. Restart required.",
716 e
717 );
718 }
719 }
720 }
721 } else {
722 info!("No database handler, starting trade_id from 1");
723 1
724 };
725
726 let fee_service = FeeService::new(config.fee_config.clone());
727
728 let margin_manager = crate::rsm::margin_manager::MarginManager::new(&config);
730 let expiry_manager = crate::rsm::expiry_manager::ExpiryManager::new();
731 let ctx = crate::rsm::engine_deps::EngineCtx {
732 orderbooks: HashMap::new(),
733 next_order_id,
734 next_trade_id,
735 l2_update_seq,
736 expired_instruments: HashMap::new(),
737 spot_prices: HashMap::new(),
738 iv_surfaces: HashMap::new(),
739 iv_source_timestamps: HashMap::new(),
740 instrument_trading_modes: HashMap::new(),
741 order_index: hypercall_engine::order_index::EngineOrderIndex::new(),
742 engine_positions: HashMap::new(),
743 balance_ledger: BalanceLedger::new(),
744 deposit_update_watermarks: HashMap::new(),
745 applied_deposit_source_event_hashes: std::collections::BTreeSet::new(),
746 pm_settlement_state:
747 crate::rsm::portfolio_margin::settlement_state::PmSettlementState::default(),
748 mmp_state: HashMap::new(),
749 agent_authorizations: HashMap::new(),
750 nonce_sets: HashMap::new(),
751 rsm_signer_nonces: HashMap::new(),
752 deps: crate::rsm::engine_deps::EngineDeps {
753 mmp_cache: None,
754 tier_cache: None,
755 portfolio_service: None,
756 portfolio_cache: None,
757 risk_account_builder: None,
758 standard_account_builder: None,
759 greeks_cache: None,
760 mark_price_oracles: HashMap::new(),
761 reference_prices: HashMap::new(),
762 liquidation_cache: None,
763 config,
764 portfolio_margin_pool_enabled: runtime_settings.portfolio_margin_pool_enabled,
765 portfolio_margin_settlement_allowlist: runtime_settings
766 .portfolio_margin_settlement_allowlist
767 .clone(),
768 event_sender,
769 ws_event_sender: None,
770 margin_timestamp_s: chrono::Utc::now().timestamp(),
771 liquidation_states: HashMap::new(),
772 wallet_margin_modes: HashMap::new(),
773 mmp_enabled: HashMap::new(),
774 wallet_trading_limits: HashMap::new(),
775 default_trading_limits: hypercall_types::api_models::TradingLimits::default(),
776 wallet_tiers: HashMap::new(),
777 perp_positions: HashMap::new(),
778 hypercore_account_equity: HashMap::new(),
779 hypercore_equity_timestamps: HashMap::new(),
780 },
781 db: diesel_handler,
782 };
783
784 let mut engine = Self {
785 ctx,
786 margin_manager,
787 expiry_manager,
788 order_receiver,
789 market_receiver,
790 rfq_receiver: None,
791 deposit_receiver: None,
792 option_deposit_receiver: None,
793 option_withdrawal_receiver: None,
794 cash_withdrawal_receiver: None,
795 liquidation_bonus_receiver: None,
796 margin_mode_receiver: None,
797 agent_auth_receiver: None,
798 nonce_check_receiver: None,
799 tier_update_receiver: None,
800 hypercore_equity_receiver: None,
801 pm_settlement_admin_receiver: None,
802 quiesce_receiver: None,
803 trading_mode_receiver: None,
804 shutdown_receiver,
805 fee_service,
806 snapshot_interval: runtime_settings.snapshot_interval,
807 read_snapshot_interval: runtime_settings.read_snapshot_interval,
808 post_startup_reconcile_delay: runtime_settings.post_startup_reconcile_delay,
809 last_snapshot: Instant::now(),
810 wal_path: runtime_settings.wal_path,
811 sync_status: Arc::new(SyncStatus::new()),
812 journal_writer: None,
813 journal_batch_sender: None,
814 nats_publisher: None,
815 nats_balance_update_publisher: None,
816 known_request_ids: BoundedIdempotenceCache::new(),
817 read_snapshot: read_snapshot
818 .unwrap_or_else(|| Arc::new(ArcSwap::from_pointee(EngineSnapshot::empty()))),
819 post_startup_reconciled: false,
820 startup_replayed_events: Vec::new(),
821 replay_checkpoint: WalCheckpointMetadata::ZERO,
822 runtime_quiesced: runtime_settings.start_quiesced,
823 snapshot_loaded: false,
824 startup_expired_order_cancels: Vec::new(),
825 external_vol_oracle: None,
826 engine_iv_surfaces: Arc::new(std::sync::RwLock::new(HashMap::new())),
827 };
828
829 ReplayRecovery::load_markets_from_db(&mut engine);
831
832 ReplayRecovery::load_orderbook_snapshots(&mut engine);
834 engine.sync_status.set_catching_up();
835
836 let wal_path_configured = hypercall_journal::checkpoint::wal_path_is_explicitly_configured(
843 engine.wal_path.as_ref(),
844 );
845 let wal_path =
846 hypercall_journal::checkpoint::wal_path_from_config(engine.wal_path.as_ref());
847 let env_wal_path = std::env::var("ENGINE_JOURNAL_WAL_PATH").ok();
848 let require_snapshot_restore = env_flag_enabled(ENGINE_REQUIRE_SNAPSHOT_RESTORE_ENV);
849 let unsafe_replay_override = env_flag_enabled(UNSAFE_SNAPSHOT_REPLAY_OVERRIDE_ENV);
850 let cash_divergence_override = env_flag_enabled(SNAPSHOT_CASH_VALIDATION_OVERRIDE_ENV);
851 let startup_replay_checkpoint = engine.replay_checkpoint;
852 let snapshot_path =
853 crate::rsm::engine_state_snapshot::snapshot_path_from_wal_path(&wal_path);
854 let drain_marker_path = hypercall_api::handlers::health::drain_marker_path(&wal_path);
855 let mut recovery_safety_report = hypercall_api::recovery_safety::RecoverySafetyReport::new(
856 wal_path.display().to_string(),
857 wal_path_configured,
858 require_snapshot_restore,
859 unsafe_replay_override,
860 cash_divergence_override,
861 hypercall_api::recovery_safety::RecoverySafetyCheckpoint {
862 last_command_id: engine.replay_checkpoint.last_command_id,
863 last_l2_seq: engine.replay_checkpoint.last_l2_seq,
864 wal_offset: engine.replay_checkpoint.wal_offset as u64,
865 },
866 hypercall_api::recovery_safety::RecoverySafetyDrainMarker {
867 path: drain_marker_path.display().to_string(),
868 present_at_startup: runtime_settings.start_quiesced,
869 },
870 );
871 if runtime_settings.start_quiesced {
872 recovery_safety_report.pass(
873 "drain_marker",
874 format!(
875 "persistent drain marker present at startup; engine starts quiesced until operator undrains: {}",
876 drain_marker_path.display()
877 ),
878 );
879 } else {
880 recovery_safety_report.pass("drain_marker", "no persistent drain marker at startup");
881 }
882 info!(
883 configured_wal_path = ?engine.wal_path,
884 env_wal_path = ?env_wal_path,
885 effective_wal_path = %wal_path.display(),
886 wal_checkpoint_last_command_id = engine.replay_checkpoint.last_command_id,
887 require_snapshot_restore,
888 "Resolved engine recovery WAL path"
889 );
890 if require_snapshot_restore && engine.ctx.db.is_some() {
891 if !wal_path_configured {
892 recovery_safety_report.fail(
893 "wal_path_configured",
894 format!(
895 "{} is set but no explicit engine WAL path is configured",
896 ENGINE_REQUIRE_SNAPSHOT_RESTORE_ENV
897 ),
898 );
899 hypercall_api::recovery_safety::store_report(
900 &runtime_settings.recovery_safety_report,
901 &recovery_safety_report,
902 );
903 panic!(
904 "CRITICAL_FAILURE: {} is set but no explicit engine WAL path is configured",
905 ENGINE_REQUIRE_SNAPSHOT_RESTORE_ENV
906 );
907 }
908 if engine.replay_checkpoint.last_command_id <= 0 {
909 recovery_safety_report.fail(
910 "checkpoint",
911 format!(
912 "{} is set but WAL checkpoint at {} is missing or zero",
913 ENGINE_REQUIRE_SNAPSHOT_RESTORE_ENV,
914 wal_path.display()
915 ),
916 );
917 hypercall_api::recovery_safety::store_report(
918 &runtime_settings.recovery_safety_report,
919 &recovery_safety_report,
920 );
921 panic!(
922 "CRITICAL_FAILURE: {} is set but WAL checkpoint at {} is missing or zero. \
923 Blue/green standby must start from a cloned WAL/checkpoint/snapshot boundary.",
924 ENGINE_REQUIRE_SNAPSHOT_RESTORE_ENV,
925 wal_path.display(),
926 );
927 }
928 }
929 if wal_path_configured {
930 recovery_safety_report.pass("wal_path_configured", "explicit WAL path configured");
931 } else {
932 recovery_safety_report.pass(
933 "wal_path_configured",
934 "explicit WAL path not configured; persistent snapshot restore not attempted",
935 );
936 }
937 if engine.replay_checkpoint.last_command_id > 0 {
938 recovery_safety_report.pass(
939 "checkpoint",
940 format!(
941 "checkpoint loaded at command_id {}",
942 engine.replay_checkpoint.last_command_id
943 ),
944 );
945 } else {
946 recovery_safety_report.pass("checkpoint", "no nonzero WAL checkpoint");
947 }
948 let mut hot_journal_bounds = None;
949
950 if engine.ctx.db.is_some()
951 && engine.replay_checkpoint.last_command_id > 0
952 && wal_path_configured
953 {
954 use crate::rsm::engine_state_snapshot::{read_snapshot, snapshot_order_count};
955 match read_snapshot(&snapshot_path) {
956 Ok(Some(snap))
957 if snap.last_command_id <= engine.replay_checkpoint.last_command_id =>
958 {
959 recovery_safety_report.snapshot =
960 Some(hypercall_api::recovery_safety::RecoverySafetySnapshot {
961 path: snapshot_path.display().to_string(),
962 present: true,
963 loaded: false,
964 last_command_id: Some(snap.last_command_id),
965 last_l2_seq: Some(snap.last_l2_seq),
966 order_count: Some(snapshot_order_count(&snap)),
967 });
968 recovery_safety_report.pass(
969 "snapshot_decode",
970 format!(
971 "decoded engine snapshot at command_id {}",
972 snap.last_command_id
973 ),
974 );
975 let cash_authority = cash_restart_authority_for_startup(
976 Some(snap.last_command_id),
977 engine.replay_checkpoint.last_command_id,
978 )
979 .expect("snapshot command_id checked not ahead of WAL checkpoint");
980 info!(
981 snapshot_last_command_id = snap.last_command_id,
982 checkpoint_last_command_id = engine.replay_checkpoint.last_command_id,
983 ?cash_authority,
984 snapshot_balance_update_seq = snap.last_balance_update_seq,
985 "Skipping pre-restore DB cash validation; engine snapshot plus WAL boundary is authoritative"
986 );
987 recovery_safety_report.cash.validation_checked = false;
988 recovery_safety_report.cash.validation_passed = None;
989 recovery_safety_report.cash.message = Some(format!(
990 "pre-restore DB cash validation skipped for {:?}; engine snapshot plus WAL is authoritative",
991 cash_authority
992 ));
993
994 let order_count = snapshot_order_count(&snap);
995 info!(
996 "Loading engine state snapshot: last_command_id={}, orders={}",
997 snap.last_command_id, order_count,
998 );
999 crate::rsm::restart_components::PersistentEngineStateComponent::restore(
1000 &mut engine.ctx,
1001 &snap,
1002 );
1003 engine.rebuild_shared_engine_iv_surfaces();
1004 engine.replay_checkpoint.last_command_id = snap.last_command_id;
1005 engine.replay_checkpoint.last_l2_seq = snap.last_l2_seq;
1006 engine.snapshot_loaded = true;
1007 if let Some(snapshot) = recovery_safety_report.snapshot.as_mut() {
1008 snapshot.loaded = true;
1009 }
1010
1011 gauge!("ht_engine_snapshot_restored").set(1.0);
1012 gauge!("ht_engine_snapshot_last_command_id").set(snap.last_command_id as f64);
1013 gauge!("ht_engine_snapshot_orders_count").set(order_count as f64);
1014 }
1015 Ok(Some(snap)) => {
1016 recovery_safety_report.snapshot =
1017 Some(hypercall_api::recovery_safety::RecoverySafetySnapshot {
1018 path: snapshot_path.display().to_string(),
1019 present: true,
1020 loaded: false,
1021 last_command_id: Some(snap.last_command_id),
1022 last_l2_seq: Some(snap.last_l2_seq),
1023 order_count: Some(snapshot_order_count(&snap)),
1024 });
1025 recovery_safety_report.fail(
1026 "snapshot_boundary",
1027 format!(
1028 "snapshot command_id {} is ahead of WAL checkpoint command_id {}",
1029 snap.last_command_id, engine.replay_checkpoint.last_command_id
1030 ),
1031 );
1032 hypercall_api::recovery_safety::store_report(
1033 &runtime_settings.recovery_safety_report,
1034 &recovery_safety_report,
1035 );
1036 panic!(
1037 "CRITICAL_FAILURE: engine state snapshot last_command_id {} is ahead of \
1038 WAL checkpoint last_command_id {}. This is a logic bug — the snapshot \
1039 should always be tagged from the Postgres-replicated boundary.",
1040 snap.last_command_id, engine.replay_checkpoint.last_command_id,
1041 );
1042 }
1043 Ok(None) => {
1044 gauge!("ht_engine_snapshot_restored").set(0.0);
1045 recovery_safety_report.snapshot =
1046 Some(hypercall_api::recovery_safety::RecoverySafetySnapshot {
1047 path: snapshot_path.display().to_string(),
1048 present: false,
1049 loaded: false,
1050 last_command_id: None,
1051 last_l2_seq: None,
1052 order_count: None,
1053 });
1054 if require_snapshot_restore {
1055 recovery_safety_report.fail(
1056 "snapshot_decode",
1057 format!(
1058 "no engine state snapshot found at {}",
1059 snapshot_path.display()
1060 ),
1061 );
1062 hypercall_api::recovery_safety::store_report(
1063 &runtime_settings.recovery_safety_report,
1064 &recovery_safety_report,
1065 );
1066 panic!(
1067 "CRITICAL_FAILURE: no engine state snapshot found at {} with WAL \
1068 checkpoint command_id {} while {} is set. Refusing standby restore \
1069 without a cloned engine snapshot.",
1070 snapshot_path.display(),
1071 engine.replay_checkpoint.last_command_id,
1072 ENGINE_REQUIRE_SNAPSHOT_RESTORE_ENV,
1073 );
1074 } else {
1075 recovery_safety_report.pass(
1076 "snapshot_decode",
1077 "no snapshot found; base replay requires hot journal coverage validation",
1078 );
1079 warn!(
1080 snapshot_path = %snapshot_path.display(),
1081 checkpoint_command_id = engine.replay_checkpoint.last_command_id,
1082 "No engine state snapshot found; falling back to full durable journal replay after coverage validation"
1083 );
1084 }
1085 }
1086 Err(e) => {
1087 gauge!("ht_engine_snapshot_restored").set(0.0);
1088 recovery_safety_report.snapshot =
1089 Some(hypercall_api::recovery_safety::RecoverySafetySnapshot {
1090 path: snapshot_path.display().to_string(),
1091 present: true,
1092 loaded: false,
1093 last_command_id: None,
1094 last_l2_seq: None,
1095 order_count: None,
1096 });
1097 if unsafe_replay_override {
1098 recovery_safety_report.fail(
1099 "snapshot_decode",
1100 format!("corrupt engine state snapshot accepted by override: {e}"),
1101 );
1102 warn!(
1103 error = %e,
1104 override_env = UNSAFE_SNAPSHOT_REPLAY_OVERRIDE_ENV,
1105 checkpoint_command_id = engine.replay_checkpoint.last_command_id,
1106 "UNSAFE_OPERATOR_OVERRIDE: corrupt engine state snapshot, continuing to full replay"
1107 );
1108 } else {
1109 recovery_safety_report.fail(
1110 "snapshot_decode",
1111 format!("corrupt engine state snapshot: {e}"),
1112 );
1113 hypercall_api::recovery_safety::store_report(
1114 &runtime_settings.recovery_safety_report,
1115 &recovery_safety_report,
1116 );
1117 panic!(
1118 "CRITICAL_FAILURE: corrupt engine state snapshot at {} with WAL \
1119 checkpoint command_id {}: {}. Refusing unsafe hot-journal replay. \
1120 Set {}=true only for an explicit operator-controlled repair or \
1121 incident investigation.",
1122 snapshot_path.display(),
1123 engine.replay_checkpoint.last_command_id,
1124 e,
1125 UNSAFE_SNAPSHOT_REPLAY_OVERRIDE_ENV,
1126 );
1127 }
1128 }
1129 }
1130 }
1131
1132 if should_validate_hot_journal_coverage_for_base_replay(
1133 engine.ctx.db.is_some(),
1134 startup_replay_checkpoint.last_command_id,
1135 wal_path_configured,
1136 engine.snapshot_loaded,
1137 engine.replay_checkpoint.last_command_id,
1138 ) {
1139 let checkpoint_command_id = startup_replay_checkpoint.last_command_id;
1140 let replay_from_command_id = if engine.snapshot_loaded {
1141 engine.replay_checkpoint.last_command_id.saturating_add(1)
1142 } else {
1143 1
1144 };
1145 let db = engine
1146 .ctx
1147 .db
1148 .as_ref()
1149 .expect("checked db is present before hot journal coverage validation");
1150 let coverage_result = match hot_journal_bounds {
1151 Some(bounds) => Ok(Some(bounds)),
1152 None => db
1153 .get_journal_command_id_bounds_sync()
1154 .map_err(|error| format!("failed to load hot journal command bounds: {error}")),
1155 }
1156 .and_then(|bounds| {
1157 recovery_safety_report.replay_coverage.checked = true;
1158 if let Some(bounds) = bounds {
1159 hot_journal_bounds = Some(bounds);
1160 recovery_safety_report.replay_coverage.min_command_id =
1161 Some(bounds.min_command_id);
1162 recovery_safety_report.replay_coverage.max_command_id =
1163 Some(bounds.max_command_id);
1164 }
1165 let non_replayable_tail_count = non_replayable_tail_count_for_coverage_check(
1166 db,
1167 checkpoint_command_id,
1168 bounds,
1169 replay_from_command_id,
1170 )?;
1171 validate_hot_journal_covers_base_replay(
1172 checkpoint_command_id,
1173 bounds,
1174 replay_from_command_id,
1175 non_replayable_tail_count,
1176 )
1177 });
1178
1179 match coverage_result {
1180 Ok(()) => {
1181 recovery_safety_report.pass(
1182 "hot_journal_coverage",
1183 "hot journal covers base reconstruction window",
1184 );
1185 info!(
1186 checkpoint_command_id = checkpoint_command_id,
1187 "Hot journal coverage validation passed for base reconstruction"
1188 );
1189 }
1190 Err(error) if unsafe_replay_override => {
1191 recovery_safety_report.fail("hot_journal_coverage", error.clone());
1192 warn!(
1193 %error,
1194 override_env = UNSAFE_SNAPSHOT_REPLAY_OVERRIDE_ENV,
1195 checkpoint_command_id = checkpoint_command_id,
1196 "UNSAFE_OPERATOR_OVERRIDE: continuing with incomplete hot-journal base reconstruction"
1197 );
1198 }
1199 Err(error) => {
1200 recovery_safety_report.fail("hot_journal_coverage", error.clone());
1201 hypercall_api::recovery_safety::store_report(
1202 &runtime_settings.recovery_safety_report,
1203 &recovery_safety_report,
1204 );
1205 panic!(
1206 "CRITICAL_FAILURE: {error}. Refusing base reconstruction from incomplete \
1207 hot Postgres journal at checkpoint command_id {}. Set {}=true only for \
1208 an explicit operator-controlled repair or incident investigation.",
1209 checkpoint_command_id, UNSAFE_SNAPSHOT_REPLAY_OVERRIDE_ENV,
1210 );
1211 }
1212 }
1213 } else {
1214 recovery_safety_report.pass(
1215 "hot_journal_coverage",
1216 "base reconstruction coverage validation not required",
1217 );
1218 }
1219 ReplayRecovery::replay_commands_from_journal(&mut engine);
1225
1226 if let Some(cash_validation_checkpoint) = post_replay_cash_validation_checkpoint(
1227 engine.ctx.db.is_some(),
1228 wal_path_configured,
1229 startup_replay_checkpoint,
1230 hot_journal_bounds,
1231 ) {
1232 recovery_safety_report.cash.validation_checked = false;
1233 recovery_safety_report.cash.validation_passed = None;
1234 recovery_safety_report.cash.message = Some(format!(
1235 "post-replay DB cash validation skipped at checkpoint command_id {}; engine balance ledger is authoritative",
1236 cash_validation_checkpoint.last_command_id
1237 ));
1238 recovery_safety_report.pass(
1239 "standard_margin_cash_replay_watermark",
1240 "post-replay DB cash validation skipped because engine balance ledger is authoritative",
1241 );
1242 }
1243 hypercall_api::recovery_safety::store_report(
1244 &runtime_settings.recovery_safety_report,
1245 &recovery_safety_report,
1246 );
1247
1248 ReplayRecovery::check_restored_orderbooks_for_crosses(&engine);
1250
1251 engine.publish_snapshot();
1253
1254 engine.persist_engine_state_snapshot();
1256
1257 if engine.ctx.db.is_none() {
1260 engine.post_startup_reconciled = true;
1261 engine.sync_status.set_ready();
1262 info!("UnifiedEngine sync status: Ready (no database configured)");
1263 }
1264
1265 engine
1266 }
1267
1268 fn publish_snapshot(&self) {
1270 let l2_seq = self.ctx.l2_update_seq.load(Ordering::SeqCst);
1271 let snapshot = EngineSnapshot::build_with_ctx(&self.ctx, l2_seq);
1272 tracing::debug!(
1273 l2_seq = snapshot.l2_seq,
1274 cash_wallet_count = snapshot.engine_state_digest.cash_wallet_count,
1275 cash_digest = %snapshot.engine_state_digest.cash_digest,
1276 "Publishing EngineSnapshot balance_ledger"
1277 );
1278 self.read_snapshot.store(Arc::new(snapshot));
1279 }
1280
1281 pub fn set_portfolio_service(&mut self, service: Arc<dyn PortfolioService + Send + Sync>) {
1286 info!("Setting PortfolioService for UnifiedEngine");
1287 self.ctx.deps.portfolio_service = Some(service);
1288 }
1289
1290 pub fn set_portfolio_cache(&mut self, cache: Arc<PortfolioCache>) {
1292 info!("Setting PortfolioCache for UnifiedEngine");
1293 self.ctx.deps.portfolio_cache = Some(cache);
1294 }
1295
1296 pub fn set_risk_account_builder(
1301 &mut self,
1302 builder: Arc<crate::rsm::portfolio_margin::risk_account_builder::RiskAccountBuilder>,
1303 ) {
1304 info!("Setting RiskAccountBuilder for UnifiedEngine");
1305 self.ctx.deps.risk_account_builder = Some(builder);
1306 }
1307
1308 pub fn set_standard_account_builder(&mut self, builder: Arc<StandardAccountBuilder>) {
1312 info!("Setting StandardAccountBuilder for UnifiedEngine");
1313 self.ctx.deps.standard_account_builder = Some(builder);
1314 }
1315
1316 pub fn set_liquidation_cache(&mut self, cache: Arc<crate::liquidator::LiquidationCache>) {
1321 info!("Setting LiquidationCache for UnifiedEngine");
1322 self.ctx.deps.liquidation_cache = Some(cache);
1323 }
1324
1325 pub fn set_db(&mut self, handler: DatabaseHandler) {
1331 info!("Replacing DieselEventHandler on UnifiedEngine (standby promote)");
1332 self.ctx.db = Some(handler);
1333 }
1334
1335 pub fn set_journal_writer(&mut self, writer: crate::journal::SharedEngineJournalWriter) {
1339 info!("Setting EngineJournalWriter for UnifiedEngine");
1340
1341 match writer.get_recent_request_ids(IDEMPOTENCY_CACHE_LOOKBACK_HOURS) {
1345 Ok(request_ids) => {
1346 let count = request_ids.len();
1347 self.known_request_ids.extend(request_ids);
1348 info!(
1349 "Loaded {} request_ids into idempotency cache (24h lookback, capacity: {})",
1350 count,
1351 self.known_request_ids.capacity()
1352 );
1353 }
1354 Err(e) => {
1355 warn!(
1356 "Failed to load request_ids for idempotency cache: {} - idempotency checks will query DB",
1357 e
1358 );
1359 }
1360 }
1361
1362 self.journal_writer = Some(writer);
1363 }
1364
1365 pub fn set_journal_batch_sender(&mut self, sender: crate::journal::JournalBatchSender) {
1369 info!("Setting JournalBatchSender for UnifiedEngine (async batched writes)");
1370 self.journal_batch_sender = Some(sender);
1371 }
1372
1373 pub fn set_nats_publisher(&mut self, publisher: crate::nats::NatsPublisher) {
1375 info!("Setting NatsPublisher for UnifiedEngine (real-time replication)");
1376 self.nats_publisher = Some(publisher);
1377 }
1378
1379 pub fn set_nats_balance_update_publisher(
1380 &mut self,
1381 publisher: crate::nats::NatsBalanceUpdatePublisher,
1382 ) {
1383 info!("Setting NatsBalanceUpdatePublisher for UnifiedEngine");
1384 self.nats_balance_update_publisher = Some(publisher);
1385 }
1386
    /// Publishes a command envelope to the NATS replication stream.
    ///
    /// This is a no-op unless a publisher has been installed in `nats_publisher`.
    /// Each `EngineCommand` variant is mapped to a `CommandType` tag plus a payload
    /// encoded with `hypercall_types::serialize_to_wire_bytes`. The one exception
    /// is `IvUpdate`, which forwards its `journal_data` bytes unchanged. Some
    /// commands are deliberately not published; see the early `return`s in the match.
    ///
    /// # Panics
    /// - If a `DepositUpdate` command has no durable `sequence`.
    /// - If an expiry-related payload (a `TickExpiry` command, or a `MarketAction`
    ///   whose `expiry_context` is set) plus `crate::nats::COMMAND_PAYLOAD_PREFIX_LEN`
    ///   exceeds `MAX_EXPIRY_NATS_PAYLOAD_BYTES`.
    pub(crate) async fn publish_to_nats(&self, env: &crate::rsm::apply::CommandEnvelope) {
        // Replication is optional: with no publisher configured there is nothing to do.
        let Some(ref publisher) = self.nats_publisher else {
            return;
        };

        use crate::nats::CommandType;
        use crate::rsm::apply::{
            ApproveAgentPayload, BalanceCommandPayload, DepositUpdatePayload, EngineCommand,
            HypercoreEquityUpdatePayload, HypercorePositionUpdatePayload, MmpConfigUpdatePayload,
            NonceAdvancePayload, RevokeAgentPayload, TierMarginModeUpdatePayload,
            TradingModeUpdatePayload,
        };

        // Map each command to (wire tag, encoded payload bytes).
        let (cmd_type, data) = match &env.command {
            EngineCommand::OrderAction(msg) => (
                CommandType::Order,
                hypercall_types::serialize_to_wire_bytes(msg),
            ),
            EngineCommand::PriceUpdate {
                underlying,
                spot_price,
                timestamp_ms,
            } => (
                CommandType::PriceUpdate,
                hypercall_types::serialize_to_wire_bytes(&crate::rsm::apply::PriceUpdatePayload {
                    underlying: underlying.clone(),
                    spot_price: *spot_price,
                    timestamp_ms: *timestamp_ms,
                }),
            ),
            EngineCommand::IvUpdate {
                underlying: _,
                journal_data,
                timestamp_ms: _,
                ..
            } => {
                // Only IV updates carrying `journal_data` are replicated; those bytes
                // are used as the payload as-is (no re-serialization here).
                let data = match journal_data {
                    Some(d) => d.clone(),
                    // No journal bytes: this IV update is not published.
                    None => return,
                };
                (CommandType::IvUpdate, data)
            }
            EngineCommand::MarketAction(cmd) => (
                CommandType::MarketAction,
                hypercall_types::serialize_to_wire_bytes(cmd),
            ),
            EngineCommand::LiquidationState(msg) => (
                CommandType::LiquidationState,
                hypercall_types::serialize_to_wire_bytes(msg),
            ),
            EngineCommand::TierUpdate {
                wallet,
                margin_mode,
                tier,
                trading_limits,
            } => (
                CommandType::TierUpdate,
                hypercall_types::serialize_to_wire_bytes(&crate::rsm::apply::TierUpdatePayload {
                    wallet: *wallet,
                    margin_mode: *margin_mode,
                    tier: tier.clone(),
                    trading_limits: *trading_limits,
                }),
            ),
            // NOTE(review): published under the same `CommandType::TierUpdate` tag as
            // `TierUpdate`, but with a `TierMarginModeUpdatePayload` body; confirm the
            // consumer can distinguish the two payload shapes.
            EngineCommand::LegacyTierMarginModeUpdate {
                wallet,
                margin_mode,
            } => (
                CommandType::TierUpdate,
                hypercall_types::serialize_to_wire_bytes(&TierMarginModeUpdatePayload {
                    wallet: *wallet,
                    margin_mode: *margin_mode,
                }),
            ),
            EngineCommand::HypercorePositionUpdate {
                account,
                coin,
                size,
                entry_price,
                unrealized_pnl,
                timestamp_ms,
            } => (
                CommandType::HypercorePositionUpdate,
                hypercall_types::serialize_to_wire_bytes(&HypercorePositionUpdatePayload {
                    account: account.clone(),
                    coin: coin.clone(),
                    size: *size,
                    entry_price: *entry_price,
                    unrealized_pnl: *unrealized_pnl,
                    timestamp_ms: *timestamp_ms,
                }),
            ),
            EngineCommand::MmpConfigUpdate {
                wallet,
                currency,
                enabled,
                interval_ms,
                frozen_time_ms,
                qty_limit,
                delta_limit,
                vega_limit,
            } => (
                CommandType::MmpConfigUpdate,
                hypercall_types::serialize_to_wire_bytes(&MmpConfigUpdatePayload {
                    wallet: *wallet,
                    currency: currency.clone(),
                    enabled: *enabled,
                    interval_ms: *interval_ms,
                    frozen_time_ms: *frozen_time_ms,
                    qty_limit: *qty_limit,
                    delta_limit: *delta_limit,
                    vega_limit: *vega_limit,
                }),
            ),
            EngineCommand::RfqExecute(cmd) => (
                CommandType::RfqExecute,
                hypercall_types::serialize_to_wire_bytes(cmd),
            ),
            EngineCommand::TradingModeUpdate {
                modes,
                timestamp_ms,
            } => (
                CommandType::TradingModeUpdate,
                hypercall_types::serialize_to_wire_bytes(&TradingModeUpdatePayload {
                    modes: modes.clone(),
                    timestamp_ms: *timestamp_ms,
                }),
            ),
            // Encoded as a `(now_ms, context)` tuple rather than a named payload struct.
            EngineCommand::TickExpiry { now_ms, context } => (
                CommandType::TickExpiry,
                hypercall_types::serialize_to_wire_bytes(&(*now_ms, context)),
            ),
            // Tick snapshots are never published to NATS.
            EngineCommand::TickSnapshot { .. } => return,
            EngineCommand::DepositUpdate {
                wallet,
                amount,
                timestamp_ms,
                sequence,
                source_event_hash,
            } => (
                CommandType::DepositUpdate,
                hypercall_types::serialize_to_wire_bytes(&DepositUpdatePayload {
                    wallet: *wallet,
                    amount: *amount,
                    timestamp_ms: *timestamp_ms,
                    // A deposit reaching this point must already carry its durable
                    // sequence; a missing one is treated as an invariant violation.
                    sequence: sequence.unwrap_or_else(|| {
                        panic!("RUNTIME_INVARIANT: DepositUpdate missing durable sequence")
                    }),
                    source_event_hash: source_event_hash.clone(),
                }),
            ),
            // The option/cash deposit and withdrawal variants below are encoded as
            // positional tuples; field order is therefore part of the wire format.
            EngineCommand::OptionDepositUpdate {
                request_id,
                wallet,
                symbol,
                quantity,
                timestamp_ms,
            } => (
                CommandType::OptionDepositUpdate,
                hypercall_types::serialize_to_wire_bytes(&(
                    request_id,
                    wallet,
                    symbol,
                    quantity,
                    timestamp_ms,
                )),
            ),
            EngineCommand::OptionWithdrawalUpdate {
                request_id,
                wallet,
                account,
                signer,
                rsm_signer,
                symbol,
                quantity,
                nonce,
                action,
                timestamp_ms,
            } => (
                CommandType::OptionWithdrawalUpdate,
                hypercall_types::serialize_to_wire_bytes(&(
                    request_id,
                    wallet,
                    account,
                    signer,
                    rsm_signer,
                    symbol,
                    quantity,
                    nonce,
                    action,
                    timestamp_ms,
                )),
            ),
            EngineCommand::CashWithdrawalUpdate {
                request_id,
                wallet,
                account,
                destination,
                signer,
                rsm_signer,
                amount,
                amount_wei,
                nonce,
                timestamp_ms,
            } => (
                CommandType::CashWithdrawalUpdate,
                hypercall_types::serialize_to_wire_bytes(&(
                    request_id,
                    wallet,
                    account,
                    destination,
                    signer,
                    rsm_signer,
                    amount,
                    amount_wei,
                    nonce,
                    timestamp_ms,
                )),
            ),
            EngineCommand::LiquidationBonusUpdate {
                wallet,
                amount,
                balance_after,
                timestamp_ms,
                sequence,
            } => (
                CommandType::LiquidationBonusUpdate,
                hypercall_types::serialize_to_wire_bytes(&BalanceCommandPayload {
                    wallet: *wallet,
                    amount: *amount,
                    balance_after: *balance_after,
                    timestamp_ms: *timestamp_ms,
                    sequence: *sequence,
                }),
            ),
            EngineCommand::ApproveAgent {
                wallet,
                agent,
                expires_at_ms,
                nonce,
                timestamp_ms,
            } => (
                CommandType::ApproveAgent,
                hypercall_types::serialize_to_wire_bytes(&ApproveAgentPayload {
                    wallet: *wallet,
                    agent: *agent,
                    expires_at_ms: *expires_at_ms,
                    nonce: *nonce,
                    timestamp_ms: *timestamp_ms,
                }),
            ),
            EngineCommand::RevokeAgent {
                wallet,
                agent,
                nonce,
                timestamp_ms,
            } => (
                CommandType::RevokeAgent,
                hypercall_types::serialize_to_wire_bytes(&RevokeAgentPayload {
                    wallet: *wallet,
                    agent: *agent,
                    nonce: *nonce,
                    timestamp_ms: *timestamp_ms,
                }),
            ),
            // Only wallet and nonce are taken from the command; the payload timestamp
            // is the envelope's `received_ts_ms`.
            EngineCommand::NonceAdvance { wallet, nonce, .. } => (
                CommandType::NonceAdvance,
                hypercall_types::serialize_to_wire_bytes(&NonceAdvancePayload {
                    wallet: *wallet,
                    nonce: *nonce,
                    timestamp_ms: env.received_ts_ms,
                }),
            ),
            EngineCommand::HypercoreEquityUpdate {
                wallet,
                account_value,
                timestamp_ms,
            } => (
                CommandType::HypercoreEquityUpdate,
                hypercall_types::serialize_to_wire_bytes(&HypercoreEquityUpdatePayload {
                    wallet: *wallet,
                    account_value: *account_value,
                    timestamp_ms: *timestamp_ms,
                }),
            ),
            // PM settlement / vault / recovery commands serialize their command
            // struct directly.
            EngineCommand::SetPmSettlementPoolConfig(command) => (
                CommandType::SetPmSettlementPoolConfig,
                hypercall_types::serialize_to_wire_bytes(command),
            ),
            EngineCommand::RecordPmVaultDeposit(command) => (
                CommandType::RecordPmVaultDeposit,
                hypercall_types::serialize_to_wire_bytes(command),
            ),
            EngineCommand::RequestPmVaultWithdrawal(command) => (
                CommandType::RequestPmVaultWithdrawal,
                hypercall_types::serialize_to_wire_bytes(command),
            ),
            EngineCommand::AccruePmSettlementInterest(command) => (
                CommandType::AccruePmSettlementInterest,
                hypercall_types::serialize_to_wire_bytes(command),
            ),
            EngineCommand::ApplyPmSettlementRepayment(command) => (
                CommandType::ApplyPmSettlementRepayment,
                hypercall_types::serialize_to_wire_bytes(command),
            ),
            EngineCommand::JournalPmRecoveryPlan(command) => (
                CommandType::JournalPmRecoveryPlan,
                hypercall_types::serialize_to_wire_bytes(command),
            ),
            EngineCommand::MarkPmRecoveryActionSubmitted(command) => (
                CommandType::MarkPmRecoveryActionSubmitted,
                hypercall_types::serialize_to_wire_bytes(command),
            ),
            EngineCommand::ResolvePmRecoveryAction(command) => (
                CommandType::ResolvePmRecoveryAction,
                hypercall_types::serialize_to_wire_bytes(command),
            ),
        };

        // Expiry-related payloads get an explicit size ceiling, including the
        // command prefix that is prepended on the wire. An oversized payload is
        // treated as fatal rather than being published.
        let is_expiry_payload = matches!(cmd_type, CommandType::TickExpiry)
            || matches!(
                &env.command,
                EngineCommand::MarketAction(command) if command.expiry_context.is_some()
            );
        if is_expiry_payload {
            let total_payload_len = crate::nats::COMMAND_PAYLOAD_PREFIX_LEN + data.len();
            if total_payload_len > MAX_EXPIRY_NATS_PAYLOAD_BYTES {
                panic!(
                    "CRITICAL_FAILURE: expiry NATS payload is {} bytes, exceeding {} byte limit. \
                     Refusing to publish an oversized replication command.",
                    total_payload_len, MAX_EXPIRY_NATS_PAYLOAD_BYTES
                );
            }
        }

        // NOTE(review): nothing from `publish` is inspected here; any failure
        // handling is presumably inside `NatsPublisher::publish` (confirm).
        publisher.publish(cmd_type, &data).await;
    }
1727
1728 pub(crate) async fn publish_balance_updates_to_nats(
1729 &self,
1730 updates: &[hypercall_types::BalanceUpdate],
1731 ) {
1732 let Some(ref publisher) = self.nats_balance_update_publisher else {
1733 return;
1734 };
1735 for update in updates {
1736 publisher.publish(update).await;
1737 }
1738 }
1739
1740 pub fn set_mark_price_oracles(
1745 &mut self,
1746 oracles: HashMap<String, Arc<HyperliquidMarkPriceOracle>>,
1747 ) {
1748 info!(
1749 "Setting mark price oracles for UnifiedEngine: {:?}",
1750 oracles.keys().collect::<Vec<_>>()
1751 );
1752 self.ctx.deps.mark_price_oracles = oracles;
1753 }
1754
1755 pub fn set_mmp_cache(&mut self, cache: Arc<crate::read_cache::mmp::MmpCache>) {
1757 info!("Setting MMP cache for UnifiedEngine");
1758 self.ctx.deps.mmp_cache = Some(cache);
1759 }
1760
1761 pub fn set_tier_cache(&mut self, cache: Arc<crate::read_cache::tier::TierCache>) {
1766 let modes = cache.get_all_margin_modes_sync().expect(
1767 "CRITICAL_FAILURE: tier cache unavailable during engine seed -- margin modes unavailable",
1768 );
1769 let default_limits = cache.default_trading_limits();
1770 let limits = cache
1771 .get_all_trading_limits_sync()
1772 .expect("CRITICAL_FAILURE: tier cache lock contention during engine seed -- trading limits unavailable");
1773 let tiers = cache.get_all_tiers_sync().expect(
1774 "CRITICAL_FAILURE: tier cache lock contention during engine seed -- tiers unavailable",
1775 );
1776 let count = modes.len();
1777 self.ctx.deps.wallet_margin_modes = modes;
1778 self.ctx.deps.wallet_trading_limits = limits;
1779 self.ctx.deps.default_trading_limits = default_limits;
1780 self.ctx.deps.wallet_tiers = tiers;
1781 info!(
1782 "Setting Tier cache for UnifiedEngine, seeded {} wallet tiers",
1783 count
1784 );
1785 self.ctx.deps.tier_cache = Some(cache);
1786 }
1787
1788 pub fn set_ws_event_sender(
1790 &mut self,
1791 sender: tokio::sync::mpsc::UnboundedSender<hypercall_types::EngineMessage>,
1792 ) {
1793 self.ctx.deps.ws_event_sender = Some(sender);
1794 }
1795
1796 pub fn set_rfq_receiver(
1798 &mut self,
1799 receiver: mpsc::Receiver<hypercall_runtime_api::RfqExecuteRequest>,
1800 ) {
1801 self.rfq_receiver = Some(receiver);
1802 }
1803
1804 pub fn set_deposit_receiver(&mut self, receiver: mpsc::Receiver<DepositRequest>) {
1806 self.deposit_receiver = Some(receiver);
1807 }
1808
1809 pub fn set_option_deposit_receiver(&mut self, receiver: mpsc::Receiver<OptionDepositRequest>) {
1811 self.option_deposit_receiver = Some(receiver);
1812 }
1813
1814 pub fn set_option_withdrawal_receiver(
1815 &mut self,
1816 receiver: mpsc::Receiver<OptionWithdrawalRequest>,
1817 ) {
1818 self.option_withdrawal_receiver = Some(receiver);
1819 }
1820
1821 pub fn set_cash_withdrawal_receiver(
1823 &mut self,
1824 receiver: mpsc::Receiver<CashWithdrawalRequest>,
1825 ) {
1826 self.cash_withdrawal_receiver = Some(receiver);
1827 }
1828
1829 pub fn set_liquidation_bonus_receiver(
1831 &mut self,
1832 receiver: mpsc::Receiver<LiquidationBonusRequest>,
1833 ) {
1834 self.liquidation_bonus_receiver = Some(receiver);
1835 }
1836
1837 pub fn set_tier_update_receiver(&mut self, receiver: mpsc::Receiver<TierUpdateRequest>) {
1839 self.tier_update_receiver = Some(receiver);
1840 }
1841
1842 pub fn set_hypercore_equity_receiver(
1844 &mut self,
1845 receiver: mpsc::Receiver<HypercoreEquityRequest>,
1846 ) {
1847 self.hypercore_equity_receiver = Some(receiver);
1848 }
1849
1850 pub fn set_trading_mode_receiver(
1857 &mut self,
1858 receiver: tokio::sync::watch::Receiver<
1859 std::collections::HashMap<String, hypercall_types::TradingModes>,
1860 >,
1861 ) {
1862 self.trading_mode_receiver = Some(receiver);
1863 }
1864
1865 pub fn sync_status(&self) -> Arc<SyncStatus> {
1870 self.sync_status.clone()
1871 }
1872}
1873
1874impl UnifiedEngine {
1875 fn rebuild_shared_engine_iv_surfaces(&self) {
1876 let mut shared = self
1877 .engine_iv_surfaces
1878 .write()
1879 .expect("engine IV surfaces lock poisoned");
1880 *shared = self.ctx.iv_surfaces.clone();
1881 }
1882
1883 pub async fn get_settlement_price(&self, underlying: &str, expiry_ts: i64) -> Option<Decimal> {
1885 self.expiry_manager
1886 .get_settlement_price(&self.ctx.deps, underlying, expiry_ts)
1887 .await
1888 }
1889
1890 #[cfg(any(test, feature = "test-utils"))]
1895 pub fn set_reference_price(&mut self, underlying: String, price: Decimal) {
1896 let price_f64 = price
1897 .to_f64()
1898 .expect("set_reference_price: invalid Decimal -> f64 conversion");
1899 self.ctx.deps.reference_prices.insert(underlying, price_f64);
1900 }
1901
1902 #[cfg(any(test, feature = "test-utils"))]
1907 pub fn set_account_cash(&mut self, wallet: &WalletAddress, cash: f64) {
1908 let cash_decimal = Decimal::from_f64_retain(cash)
1909 .expect("set_account_cash: invalid f64 -> Decimal conversion");
1910
1911 self.ctx.balance_ledger.set(*wallet, cash_decimal);
1912 }
1913
1914 pub async fn expire_instrument(
1916 &mut self,
1917 symbol: &str,
1918 reference_price: Decimal,
1919 now_ms: u64,
1920 ) -> Result<(), String> {
1921 self.expiry_manager
1922 .expire_instrument(symbol, reference_price, now_ms, &mut self.ctx)
1923 .await
1924 }
1925
1926 pub fn transition_to_pending_settlement(&mut self, symbols: &[String], now_ms: u64) {
1931 self.expiry_manager
1932 .transition_to_pending_settlement(symbols, now_ms, &mut self.ctx);
1933 }
1934
1935 pub fn get_pending_settlement_instruments(&self) -> Vec<(String, i64, Vec<String>)> {
1937 self.expiry_manager
1938 .get_pending_settlement_instruments(self.ctx.db.as_ref())
1939 }
1940
1941 #[cfg(any(test, feature = "test-utils"))]
1943 pub fn orderbooks_contains_key(&self, symbol: &str) -> bool {
1944 self.ctx.orderbooks.contains_key(symbol)
1945 }
1946
1947 #[cfg(any(test, feature = "test-utils"))]
1949 pub fn get_event_sender(&self) -> Option<mpsc::UnboundedSender<EngineMessage>> {
1950 Some(self.ctx.deps.event_sender.clone())
1951 }
1952
1953 pub fn get_all_book_snapshots(
1957 &self,
1958 ) -> HashMap<
1959 String,
1960 (
1961 Vec<(rust_decimal::Decimal, rust_decimal::Decimal)>,
1962 Vec<(rust_decimal::Decimal, rust_decimal::Decimal)>,
1963 ),
1964 > {
1965 self.ctx
1966 .orderbooks
1967 .iter()
1968 .map(|(symbol, book)| (symbol.clone(), book.get_orderbook_snapshot()))
1969 .collect()
1970 }
1971
1972 #[cfg(any(test, feature = "test-utils"))]
1974 pub fn insert_orderbook(&mut self, symbol: String, orderbook: OrderBook) {
1975 self.ctx.orderbooks.insert(symbol, orderbook);
1976 }
1977
1978 #[cfg(any(test, feature = "test-utils"))]
1980 pub fn track_expiry(&mut self, symbol: String, expiry: u64) {
1981 let underlying = symbol.split('-').next().unwrap_or_default().to_string();
1982 let expiry_timestamp =
1983 crate::rsm::margin_manager::expiry_date_to_timestamp(&underlying, expiry);
1984 self.expiry_manager
1985 .schedule_expiry(symbol, expiry_timestamp);
1986 }
1987
1988 #[cfg(any(test, feature = "test-utils"))]
1990 pub fn persist_test_orderbook(&self, symbol: &str, parsed_symbol: &ParsedSymbol) {
1991 if let Some(ref handler) = self.ctx.db {
1992 let option_type = match parsed_symbol.option_type {
1993 crate::types::OptionType::Call => hypercall_types::OptionType::Call,
1994 crate::types::OptionType::Put => hypercall_types::OptionType::Put,
1995 };
1996 let option_token_address =
1997 match crate::shared::option_token_address::derive_option_token_address(
1998 &parsed_symbol.underlying,
1999 parsed_symbol.expiry,
2000 parsed_symbol.strike,
2001 option_type.as_db_str(),
2002 ) {
2003 Ok(option_token_address) => option_token_address,
2004 Err(e) => {
2005 error!(
2006 symbol = symbol,
2007 error = %e,
2008 "Failed to derive option token address for persisted test orderbook"
2009 );
2010 return;
2011 }
2012 };
2013
2014 let new_instrument = hypercall_db::InstrumentRecord {
2016 instrument_numeric_id: 0,
2017 id: symbol.to_string(),
2018 underlying: parsed_symbol.underlying.clone(),
2019 strike: parsed_symbol.strike,
2020 expiry: parsed_symbol.expiry as i64,
2021 option_type,
2022 option_token_address: Some(option_token_address),
2023 status: hypercall_types::api_models::InstrumentStatus::Active,
2024 trading_mode: "orderbook".to_string(),
2025 };
2026
2027 if let Err(e) = handler.save_market_and_instrument_sync(
2029 &parsed_symbol.underlying,
2030 parsed_symbol.expiry as i64,
2031 &new_instrument,
2032 ) {
2033 error!("Failed to persist test orderbook to database: {}", e);
2034 } else {
2036 info!("Persisted test orderbook {} to database", symbol);
2037 }
2038 }
2039 }
2040}