Skip to main content

hypercall/rsm/
engine_deps.rs

1//! Shared dependency bundle for the UnifiedEngine and its sub-managers.
2//!
3//! `EngineDeps` groups all `Arc`-shared services that the engine's sub-managers
4//! need to borrow during order processing, margin checks, and expiry handling.
5//! By bundling these into a dedicated struct the engine avoids passing 10+ parameters
6//! to every delegated method call.
7
8use crate::portfolio::PortfolioService;
9use crate::price_oracle::hyperliquid_oracle::HyperliquidMarkPriceOracle;
10use crate::read_cache::greeks::GreeksCache;
11use crate::read_cache::portfolio::PortfolioCache;
12use crate::rsm::engine_state_snapshot::EngineStateSnapshot;
13use crate::rsm::ledger::BalanceLedger;
14use crate::shared::order_types::ParsedSymbol;
15use crate::standard_margin::StandardAccountBuilder;
16use crate::types::Config;
17use hypercall_db_diesel::DatabaseHandler;
18use hypercall_engine::order_index::EngineOrderIndex;
19use hypercall_engine::OrderBook;
20use hypercall_types::TradingModes;
21use hypercall_types::WalletAddress;
22use rust_decimal::prelude::ToPrimitive;
23use rust_decimal::Decimal;
24use serde::{Deserialize, Serialize};
25use std::collections::{BTreeSet, HashMap};
26use std::sync::atomic::{AtomicI64, Ordering};
27use std::sync::Arc;
28use tracing::{info, warn};
29
30// Re-export pure MMP and position types from hypercall-engine.
31pub use hypercall_engine::mmp::{EngineMmpState, MmpFillRecord};
32pub use hypercall_engine::position::EnginePosition;
33
34#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
35pub struct DepositUpdateWatermark {
36    #[serde(default)]
37    pub sequence: Option<u64>,
38    pub timestamp_ms: u64,
39    pub balance_after: Decimal,
40    #[serde(default)]
41    pub source: BalanceLedgerMutationSource,
42}
43
44#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
45pub enum BalanceLedgerMutationSource {
46    #[default]
47    DepositUpdate,
48    CashWithdrawal,
49}
50
51/// Shared, immutable (from the manager's perspective) dependencies.
52///
53/// The engine owns these fields and lends `&EngineDeps` to sub-managers
54/// so they can read caches, services, and configuration without owning clones.
55pub struct EngineDeps {
56    /// MMP (Market Maker Protection) cache.
57    pub mmp_cache: Option<Arc<crate::read_cache::mmp::MmpCache>>,
58
59    /// Tier cache for position-limit and margin-mode lookups.
60    pub tier_cache: Option<Arc<crate::read_cache::tier::TierCache>>,
61
62    /// Canonical source of truth for executed positions and cash.
63    pub portfolio_service: Option<Arc<dyn PortfolioService + Send + Sync>>,
64
65    /// Portfolio cache for synchronous fill application in the engine hot path.
66    pub portfolio_cache: Option<Arc<PortfolioCache>>,
67
68    /// Single source of truth for building a risk-ready `Account` for PM.
69    pub risk_account_builder:
70        Option<Arc<crate::rsm::portfolio_margin::risk_account_builder::RiskAccountBuilder>>,
71
72    /// Builder for Standard margin mode accounts.
73    pub standard_account_builder: Option<Arc<StandardAccountBuilder>>,
74
75    /// Spot price provider for margin calculations (legacy / test fallback).
76    pub greeks_cache: Option<Arc<GreeksCache>>,
77
78    /// Mark price oracles keyed by underlying (e.g. "BTC", "ETH").
79    pub mark_price_oracles: HashMap<String, Arc<HyperliquidMarkPriceOracle>>,
80
81    /// Legacy reference prices for test / expiry settlement fallback.
82    pub reference_prices: HashMap<String, f64>,
83
84    /// Pre-liquidation order blocking cache.
85    pub liquidation_cache: Option<Arc<crate::liquidator::LiquidationCache>>,
86
87    /// Engine configuration (scenarios, fee config, tolerances, etc.).
88    pub config: Config,
89
90    /// Global PM settlement-pool expiry classification gate.
91    pub portfolio_margin_pool_enabled: bool,
92
93    /// Wallets allowed to use PM settlement-pool expiry classification.
94    pub portfolio_margin_settlement_allowlist: BTreeSet<WalletAddress>,
95
96    /// Broadcast channel for engine events (order updates, L2, trades, …).
97    /// Goes to EventBus for persistence and cross-service sync.
98    pub event_sender: tokio::sync::mpsc::UnboundedSender<hypercall_types::EngineMessage>,
99
100    /// Direct channel for WebSocket event forwarding.
101    /// When set, events are sent here in addition to `event_sender` so that the
102    /// WsEventForwarder receives them with minimal latency.
103    pub ws_event_sender: Option<tokio::sync::mpsc::UnboundedSender<hypercall_types::EngineMessage>>,
104
105    /// Current command timestamp in seconds. Set from CommandEnvelope.received_ts_ms
106    /// at the top of apply(). Used by SPAN instead of wall clock for deterministic
107    /// time-to-expiry calculations.
108    pub margin_timestamp_s: i64,
109
110    /// Internal liquidation state per wallet, updated by LiquidationState commands.
111    pub liquidation_states: HashMap<WalletAddress, hypercall_types::LiquidationStateType>,
112
113    /// Internal margin mode per wallet, updated by TierUpdate commands.
114    pub wallet_margin_modes: HashMap<WalletAddress, crate::rsm::margin_mode::MarginMode>,
115
116    /// Internal MMP enabled state per (wallet, currency), updated by MmpConfigUpdate commands.
117    pub mmp_enabled: HashMap<(WalletAddress, String), bool>,
118
119    /// Internal trading limits per wallet, seeded from tier_cache at startup.
120    /// Updated by TierUpdate commands.
121    pub wallet_trading_limits: HashMap<WalletAddress, hypercall_types::api_models::TradingLimits>,
122
123    /// Configured fallback trading limits for wallets without explicit tier rows.
124    pub default_trading_limits: hypercall_types::api_models::TradingLimits,
125
126    /// Internal tier string per wallet (e.g. "tier1", "tier2", "market_maker").
127    /// Seeded from tier_cache at startup, updated by TierUpdate commands.
128    pub wallet_tiers: HashMap<WalletAddress, String>,
129
130    /// Internal perp positions per (account, coin), updated by HypercorePositionUpdate commands.
131    /// Keyed by (lowercase account string, coin string).
132    pub perp_positions: HashMap<(String, String), crate::hypercore::PerpPosition>,
133
134    /// HyperCore account equity per wallet (manager address), updated by
135    /// HypercoreEquityUpdate commands from the Hydromancer feed. For PM users,
136    /// this replaces the balance_ledger as the cash base for margin calculations.
137    /// The value is the HyperCore `accountValue` which includes perp UPNL.
138    pub hypercore_account_equity: HashMap<WalletAddress, Decimal>,
139
140    /// Last applied HyperCore equity update timestamp per wallet, in milliseconds.
141    /// Written by the Hydromancer HyperCore sync path and restored from snapshots
142    /// so runtime apply and replay reject stale accountValue updates.
143    pub hypercore_equity_timestamps: HashMap<WalletAddress, u64>,
144}
145
146impl EngineDeps {
147    /// Emit an engine event to both the event bus (persistence) and the direct WS channel.
148    ///
149    /// Both paths are best-effort (errors silently dropped). A dropped WS receiver
150    /// just means no WS clients.
151    pub fn emit_event(&self, event: &hypercall_types::EngineMessage) {
152        let _ = self.event_sender.send(event.clone());
153        if let Some(ref ws_tx) = self.ws_event_sender {
154            let _ = ws_tx.send(event.clone());
155        }
156    }
157}
158
159/// Classify a rejection reason string into a metric label.
160///
161/// Used by all order managers when recording rejection metrics.
162pub fn classify_rejection_reason(reason: &str) -> &'static str {
163    hypercall_engine::admission::classify_rejection_reason(reason)
164}
165
166/// Convert engine positions to a `PortfolioBalance` for margin calculations.
167///
168/// This is a runtime helper that uses crate-local types (`PortfolioBalance`,
169/// `PositionData`) which cannot live in the pure `hypercall-engine` crate.
170pub fn engine_positions_to_portfolio_balance(
171    positions: &HashMap<(WalletAddress, String), EnginePosition>,
172    wallet: &WalletAddress,
173    reference_prices: &HashMap<String, f64>,
174) -> crate::portfolio::PortfolioBalance {
175    use crate::portfolio::{PortfolioBalance, PositionData};
176    use rust_decimal_macros::dec;
177
178    let mut pos_map = HashMap::new();
179    for ((w, symbol), pos) in positions {
180        if w != wallet {
181            continue;
182        }
183        let upnl = if symbol.ends_with("-PERP") {
184            let underlying = symbol.split('-').next().unwrap_or(symbol);
185            if let Some(&spot) = reference_prices.get(underlying) {
186                let spot_dec = rust_decimal::Decimal::try_from(spot).unwrap_or(dec!(0));
187                (spot_dec - pos.entry_price) * pos.quantity
188            } else {
189                dec!(0)
190            }
191        } else {
192            dec!(0)
193        };
194        pos_map.insert(
195            symbol.clone(),
196            PositionData {
197                symbol: symbol.clone(),
198                amount: pos.quantity,
199                entry_price: pos.entry_price,
200                margin_posted: dec!(0),
201                realized_pnl: dec!(0),
202                unrealized_pnl: upnl,
203            },
204        );
205    }
206    PortfolioBalance {
207        positions: pos_map,
208        total_margin_used: dec!(0),
209    }
210}
211
212/// Apply a fill to the engine position map, creating or removing entries as needed.
213///
214/// This is a map-level helper that wraps [`EnginePosition::apply_fill`] with
215/// HashMap insert/remove semantics. The pure position math lives in the engine crate.
216pub fn apply_fill_to_positions(
217    positions: &mut HashMap<(WalletAddress, String), EnginePosition>,
218    wallet: WalletAddress,
219    symbol: String,
220    signed_qty: rust_decimal::Decimal,
221    fill_price: rust_decimal::Decimal,
222) {
223    use rust_decimal_macros::dec;
224
225    let key = (wallet, symbol);
226    let pos = positions.entry(key.clone()).or_insert(EnginePosition {
227        quantity: dec!(0),
228        entry_price: dec!(0),
229    });
230
231    pos.apply_fill(signed_qty, fill_price);
232
233    if pos.quantity == dec!(0) {
234        positions.remove(&key);
235    }
236}
237
238/// Credit externally deposited option-token inventory into engine positions.
239///
240/// This is deliberately not modeled as a zero-price fill. Depositing tokens
241/// moves already-owned inventory into HyperCall custody; it must not realize
242/// PnL, change cash, or rewrite the cost basis of an existing same-side
243/// position. If a deposit crosses a short through zero, the newly long
244/// remainder has unknown off-chain cost basis, so the engine records zero.
245pub fn apply_option_deposit_to_positions(
246    positions: &mut HashMap<(WalletAddress, String), EnginePosition>,
247    wallet: WalletAddress,
248    symbol: String,
249    quantity: rust_decimal::Decimal,
250) {
251    use rust_decimal_macros::dec;
252
253    assert!(
254        quantity > dec!(0),
255        "RUNTIME_INVARIANT: option deposit quantity must be positive"
256    );
257
258    let key = (wallet, symbol);
259    let Some(pos) = positions.get_mut(&key) else {
260        positions.insert(
261            key,
262            EnginePosition {
263                quantity,
264                entry_price: dec!(0),
265            },
266        );
267        return;
268    };
269
270    let old_qty = pos.quantity;
271    let old_entry = pos.entry_price;
272    let new_qty = old_qty + quantity;
273
274    if new_qty == dec!(0) {
275        positions.remove(&key);
276    } else {
277        pos.quantity = new_qty;
278        pos.entry_price = if old_qty > dec!(0) {
279            old_entry
280        } else if new_qty < dec!(0) {
281            old_entry
282        } else {
283            dec!(0)
284        };
285    }
286}
287
288#[cfg(test)]
289mod tests {
290    use super::*;
291    use alloy::primitives::Address;
292    use rust_decimal_macros::dec;
293
294    fn wallet(byte: u8) -> WalletAddress {
295        WalletAddress::from(Address::repeat_byte(byte))
296    }
297
298    #[test]
299    fn option_deposit_preserves_existing_long_cost_basis() {
300        let wallet = wallet(1);
301        let symbol = "BTC-20260130-100000-C".to_string();
302        let mut positions = HashMap::from([(
303            (wallet, symbol.clone()),
304            EnginePosition {
305                quantity: dec!(1),
306                entry_price: dec!(100),
307            },
308        )]);
309
310        apply_option_deposit_to_positions(&mut positions, wallet, symbol.clone(), dec!(1));
311
312        let position = positions
313            .get(&(wallet, symbol))
314            .expect("position should remain open");
315        assert_eq!(position.quantity, dec!(2));
316        assert_eq!(position.entry_price, dec!(100));
317    }
318}
319
320/// Debit option-token inventory for an on-chain withdrawal.
321///
322/// Withdrawals are custody movements out of the engine, not sells. They can
323/// only reduce existing long inventory and must not realize PnL or mutate cash.
324pub fn apply_option_withdrawal_to_positions(
325    positions: &mut HashMap<(WalletAddress, String), EnginePosition>,
326    wallet: WalletAddress,
327    symbol: String,
328    quantity: rust_decimal::Decimal,
329) -> Result<(), String> {
330    use rust_decimal_macros::dec;
331
332    if quantity <= dec!(0) {
333        return Err("option withdrawal quantity must be positive".to_string());
334    }
335
336    let key = (wallet, symbol);
337    let Some(pos) = positions.get_mut(&key) else {
338        return Err("insufficient option balance for withdrawal".to_string());
339    };
340    if pos.quantity < quantity {
341        return Err(format!(
342            "insufficient option balance for withdrawal: have {}, need {}",
343            pos.quantity, quantity
344        ));
345    }
346
347    pos.quantity -= quantity;
348    if pos.quantity == dec!(0) {
349        positions.remove(&key);
350    }
351    Ok(())
352}
353
354/// All mutable + immutable shared state that managers need to access.
355///
356/// Kept as a single struct field on `UnifiedEngine` to enable split borrows
357/// between `self.ctx` and `self.<manager>`.
358pub struct EngineCtx {
359    // ----- Mutable order state -----
360    /// Active orderbooks keyed by symbol.
361    pub orderbooks: HashMap<String, OrderBook>,
362
363    /// Next order ID to allocate (global monotonic counter).
364    pub next_order_id: u64,
365
366    /// Next trade ID to allocate (global monotonic counter).
367    pub next_trade_id: u64,
368
369    /// L2 sequence counter for orderbook snapshots.
370    pub l2_update_seq: Arc<AtomicI64>,
371
372    // ----- Expiry state -----
373    /// symbol → expired flag (shared between engine and ExpiryManager).
374    pub expired_instruments: HashMap<String, bool>,
375
376    // ----- Deterministic price state -----
377    /// Canonical spot prices per underlying, updated only by PriceUpdate commands.
378    /// Margin checks read from here instead of external caches, making the engine
379    /// deterministic and replayable.
380    pub spot_prices: HashMap<String, rust_decimal::Decimal>,
381
382    /// Canonical IV surfaces per underlying, updated only by IvUpdate commands.
383    /// The SpanMarginService reads from these instead of external vol oracles.
384    pub iv_surfaces: HashMap<String, crate::vol_oracle::vol_surface_cache::VolatilitySurface>,
385
386    /// Source timestamps for IV surfaces (from the upstream oracle snapshot).
387    /// Used for change detection to skip ingestion when the oracle hasn't updated.
388    pub iv_source_timestamps: HashMap<String, i64>,
389
390    // ----- Trading mode -----
391    /// Per-instrument trading mode (orderbook-only, rfq-only, or both).
392    pub instrument_trading_modes: HashMap<String, TradingModes>,
393
394    // ----- In-process order index -----
395    pub order_index: EngineOrderIndex,
396
397    // ----- Position tracking -----
398    /// Engine-owned option positions, updated from fills inside apply().
399    /// Single source of truth for margin checks. PortfolioService is a
400    /// downstream projection, not consulted during apply().
401    /// Perp positions are in deps.perp_positions (fed by HypercorePositionUpdate).
402    pub engine_positions: HashMap<(WalletAddress, String), EnginePosition>,
403
404    // ----- Cash tracking -----
405    /// Engine-owned cash balances per wallet. Updated from fill side effects
406    /// (realized PnL + premium deltas) and deposits. Margin checks read from
407    /// here instead of the async Ledger.
408    pub balance_ledger: BalanceLedger,
409
410    /// Last applied deposit update per wallet. This prevents an older
411    /// post-balance command from rolling balance_ledger backward after a newer
412    /// deposit has already applied.
413    pub deposit_update_watermarks: HashMap<WalletAddress, DepositUpdateWatermark>,
414
415    /// Applied cash DepositUpdate source event hashes.
416    ///
417    /// This is intentionally an incident-stabilization tradeoff, not the target
418    /// architecture. Cash DepositUpdate is now an additive delta, so the old
419    /// single per-wallet sequence watermark can under-credit if sequence 11 is
420    /// applied before an unseen sequence 10. The source_event_hash is the
421    /// deterministic idempotency key, and tracking it here makes duplicate
422    /// replay safe without dropping out-of-order lower-sequence deltas.
423    ///
424    /// The cost is that this set grows with every applied cash deposit and is
425    /// serialized in EngineStateSnapshot. That is unacceptable long-term. The
426    /// intended replacement is a bounded engine-owned cash sequencer, similar in
427    /// spirit to BoundedNonceSet but gap-buffered for ordered deltas: store only
428    /// applied_through plus a small pending window, require a per-wallet cash
429    /// sequence, apply contiguous deltas, and fail closed when the gap window is
430    /// exceeded. A DB-owned cash queue would also fix ordering, but it moves
431    /// authority away from the engine and back toward projection-driven cash
432    /// mutation, which is the wrong architectural direction.
433    pub applied_deposit_source_event_hashes: BTreeSet<alloy::primitives::FixedBytes<32>>,
434
435    /// Engine-owned PM settlement-pool authority.
436    ///
437    /// Projection tables are downstream audit surfaces and must never hydrate
438    /// this state on startup or rebuild.
439    pub pm_settlement_state: crate::rsm::portfolio_margin::settlement_state::PmSettlementState,
440
441    // ----- MMP state -----
442    /// Engine-internal MMP fill tracking per (wallet, currency).
443    /// Updated synchronously inside apply() so the matching loop never
444    /// performs async reads against the external MmpCache.
445    pub mmp_state: HashMap<(WalletAddress, String), EngineMmpState>,
446
447    // ----- Agent authorization state -----
448    /// Engine-owned agent authorizations: owner wallet → (agent wallet → optional expiry ms).
449    /// `None` = never expires. Updated by ApproveAgent/RevokeAgent commands.
450    /// Read via EngineSnapshot for lock-free API access.
451    pub agent_authorizations: HashMap<WalletAddress, HashMap<WalletAddress, Option<u64>>>,
452
453    // ----- Nonce replay protection -----
454    /// Per-signer bounded nonce sets (HL model). The engine stores the N highest
455    /// nonces per signer. A new nonce must be greater than the smallest in the
456    /// set and not already used. Shared across orders, agent auth, and QP
457    /// handshakes. Keyed by signer address (api_wallet for orders, recovered
458    /// signer for agent auth, QP wallet for handshakes).
459    pub nonce_sets: HashMap<WalletAddress, hypercall_engine::BoundedNonceSet>,
460
461    /// Canonical RSM signer nonce allocation. Directive-producing commands
462    /// allocate here during apply(), then append a NeedsRsmSignature outbox
463    /// item carrying the allocated nonce.
464    pub rsm_signer_nonces: HashMap<WalletAddress, u64>,
465
466    // ----- Immutable shared dependencies -----
467    pub deps: EngineDeps,
468
469    // ----- Persistence -----
470    pub db: Option<DatabaseHandler>,
471}
472
473impl EngineCtx {
474    fn symbol_is_expired_by_wall_clock(symbol: &str) -> Option<bool> {
475        let parsed = match ParsedSymbol::from_symbol(symbol) {
476            Ok(parsed) => parsed,
477            Err(error) => {
478                warn!(
479                    "Snapshot expired instrument {} has invalid symbol format: {}",
480                    symbol, error
481                );
482                return None;
483            }
484        };
485        let expiry_ts =
486            hypercall_types::expiry_date_to_timestamp(&parsed.underlying, parsed.expiry);
487        if expiry_ts == 0 {
488            warn!(
489                "Snapshot expired instrument {} has invalid expiry code {}",
490                symbol, parsed.expiry
491            );
492            return None;
493        }
494        let expiry_ts = u64::try_from(expiry_ts).unwrap_or_else(|_| {
495            panic!(
496                "STATE_CORRUPTION: expiry timestamp {} for {} is negative",
497                expiry_ts, symbol
498            )
499        });
500        let now = crate::shared::clock::unix_now_secs();
501        Some(now >= expiry_ts)
502    }
503
504    /// Restore engine state from a persistent snapshot.
505    ///
506    /// Sets counters, restores orderbook orders, rebuilds the order index,
507    /// and restores expired instruments. Called during startup before delta
508    /// replay to skip full journal replay.
509    pub fn restore_from_snapshot(&mut self, snapshot: &EngineStateSnapshot) {
510        // 1. Restore counters
511        self.next_order_id = snapshot.next_order_id;
512        self.next_trade_id = snapshot.next_trade_id;
513        self.l2_update_seq
514            .store(snapshot.last_l2_seq, Ordering::SeqCst);
515
516        // 2. Restore orderbook orders
517        let snapshot_symbols: BTreeSet<String> = snapshot.orderbooks.keys().cloned().collect();
518        let engine_symbols: BTreeSet<String> = self.orderbooks.keys().cloned().collect();
519        let snapshot_only: Vec<String> = snapshot_symbols
520            .difference(&engine_symbols)
521            .cloned()
522            .collect();
523        let snapshot_only_with_orders: Vec<String> = snapshot_only
524            .iter()
525            .filter_map(|symbol| {
526                snapshot
527                    .orderbooks
528                    .get(symbol)
529                    .filter(|entries| !entries.is_empty())
530                    .map(|_| symbol.clone())
531            })
532            .collect();
533        if !snapshot_only_with_orders.is_empty() {
534            let orphaned_order_count: usize = snapshot_only_with_orders
535                .iter()
536                .filter_map(|s| snapshot.orderbooks.get(s).map(|e| e.len()))
537                .sum();
538            warn!(
539                "Dropping {} orphaned orders across {} expired/unknown symbols from snapshot: {:?}",
540                orphaned_order_count,
541                snapshot_only_with_orders.len(),
542                snapshot_only_with_orders
543            );
544        }
545        if !snapshot_only.is_empty() {
546            warn!(
547                "Snapshot contains symbols not present in current engine orderbooks; skipping restore for {:?}",
548                snapshot_only
549            );
550        }
551
552        let mut total_orders = 0usize;
553        for (symbol, orderbook) in &mut self.orderbooks {
554            let orders: Vec<_> = snapshot
555                .orderbooks
556                .get(symbol)
557                .map(|entries| {
558                    entries
559                        .iter()
560                        .map(|e| {
561                            hypercall_engine::OrderRecord {
562                                order_id: e.order_id,
563                                price: e.price,
564                                quantity: e.quantity,
565                                side: e.side,
566                                wallet: e.wallet,
567                                timestamp: e.timestamp,
568                                client_id: e.client_id.clone(),
569                                mmp_enabled: e.mmp_enabled,
570                                // Old snapshots have original_size=None; fall back to remaining qty.
571                                original_size: e.original_size.unwrap_or(e.quantity),
572                            }
573                        })
574                        .collect()
575                })
576                .unwrap_or_default();
577            total_orders += orders.len();
578            orderbook.restore_from_orders(orders);
579        }
580
581        // 3. Rebuild order index from restored orderbooks
582        self.order_index.rebuild_from_orderbooks(&self.orderbooks);
583
584        // 4. Restore expired instruments, but keep DB startup recovery authoritative.
585        // A stale snapshot file can outlive a fresh database in smoke tests and
586        // must not mark active future instruments expired.
587        let db_recovered_expired = self.expired_instruments.clone();
588        let mut restored_expired = HashMap::new();
589        let mut dropped_stale_expired = 0usize;
590        for (symbol, expired) in &snapshot.expired_instruments {
591            if !*expired {
592                continue;
593            }
594            if db_recovered_expired.get(symbol) == Some(&true) {
595                restored_expired.insert(symbol.clone(), true);
596                continue;
597            }
598            match Self::symbol_is_expired_by_wall_clock(symbol) {
599                Some(true) | None => {
600                    restored_expired.insert(symbol.clone(), true);
601                }
602                Some(false) => {
603                    dropped_stale_expired += 1;
604                }
605            }
606        }
607        if dropped_stale_expired > 0 {
608            warn!(
609                "Dropped {} stale expired instrument flag(s) from engine state snapshot",
610                dropped_stale_expired
611            );
612        }
613        self.expired_instruments = restored_expired;
614
615        // 5. Restore deterministic market and policy state that is owned by
616        // engine commands. Snapshot restore must not depend on live caches for
617        // these replayable inputs.
618        self.spot_prices = snapshot.spot_prices.clone();
619        self.deps.reference_prices = snapshot
620            .spot_prices
621            .iter()
622            .filter_map(|(underlying, price)| {
623                price
624                    .to_f64()
625                    .map(|price| (underlying.clone(), price))
626                    .or_else(|| {
627                        warn!(
628                            underlying = %underlying,
629                            price = %price,
630                            "Skipping restored spot price that cannot be represented as f64"
631                        );
632                        None
633                    })
634            })
635            .collect();
636        self.iv_surfaces = snapshot
637            .iv_surfaces
638            .clone()
639            .into_iter()
640            .map(|(underlying, surface)| (underlying, surface.into_surface()))
641            .collect();
642        self.iv_source_timestamps = snapshot.iv_source_timestamps.clone();
643        if snapshot.instrument_trading_modes.is_empty() && !self.instrument_trading_modes.is_empty()
644        {
645            warn!(
646                seeded_modes = self.instrument_trading_modes.len(),
647                "Preserving DB-seeded instrument trading modes because snapshot has none"
648            );
649        } else {
650            self.instrument_trading_modes = snapshot.instrument_trading_modes.clone();
651        }
652
653        // 6. Restore engine positions
654        self.engine_positions = snapshot.engine_positions.clone();
655        self.mmp_state = snapshot.mmp_state.clone();
656        self.deps.liquidation_states = snapshot.liquidation_states.clone();
657        if snapshot.wallet_margin_modes.is_empty() && !self.deps.wallet_margin_modes.is_empty() {
658            warn!(
659                seeded_modes = self.deps.wallet_margin_modes.len(),
660                "Preserving tier-cache seeded wallet margin modes because snapshot has none"
661            );
662        } else {
663            self.deps.wallet_margin_modes = snapshot.wallet_margin_modes.clone();
664        }
665        self.deps.mmp_enabled = snapshot.mmp_enabled.clone();
666        if snapshot.wallet_trading_limits.is_empty()
667            && snapshot.wallet_tiers.is_empty()
668            && !self.deps.wallet_trading_limits.is_empty()
669        {
670            warn!(
671                seeded_limits = self.deps.wallet_trading_limits.len(),
672                seeded_tiers = self.deps.wallet_tiers.len(),
673                "Preserving tier-cache seeded trading limits because snapshot has none"
674            );
675        } else {
676            self.deps.wallet_trading_limits = snapshot.wallet_trading_limits.clone();
677            self.deps.default_trading_limits = snapshot.default_trading_limits;
678            self.deps.wallet_tiers = snapshot.wallet_tiers.clone();
679        }
680
681        // 7. Restore runtime balances from the replay-bound engine snapshot.
682        // Engine snapshots plus WAL are the only cash restart authority.
683        self.balance_ledger = BalanceLedger::from_map_with_sequence(
684            snapshot.balance_ledger.clone(),
685            snapshot.last_balance_update_seq,
686        );
687        info!(
688            balance_wallet_count = self.balance_ledger.len(),
689            snapshot_last_command_id = snapshot.last_command_id,
690            snapshot_last_l2_seq = snapshot.last_l2_seq,
691            "Restored balance_ledger from engine state snapshot"
692        );
693
694        // 8. Restore deposit update watermarks
695        self.deposit_update_watermarks = snapshot.deposit_update_watermarks.clone();
696        self.applied_deposit_source_event_hashes =
697            snapshot.applied_deposit_source_event_hashes.clone();
698        self.pm_settlement_state = snapshot.pm_settlement_state.clone();
699
700        // 9. Restore agent authorizations
701        self.agent_authorizations = snapshot
702            .agent_authorizations
703            .iter()
704            .map(|(wallet, agents)| {
705                let map: HashMap<WalletAddress, Option<u64>> = agents
706                    .iter()
707                    .map(|(agent, expires)| (*agent, *expires))
708                    .collect();
709                (*wallet, map)
710            })
711            .collect();
712        // 10. Restore nonce sets
713        self.nonce_sets.clear();
714        for (signer, nonces_vec) in &snapshot.nonce_sets {
715            self.nonce_sets.insert(
716                *signer,
717                hypercall_engine::BoundedNonceSet::from_vec(
718                    nonces_vec.clone(),
719                    hypercall_engine::nonce::DEFAULT_NONCE_SET_CAPACITY,
720                ),
721            );
722        }
723        // Backward compat: migrate old watermark entries as single-element sets
724        for (wallet, watermark) in &snapshot.nonce_watermarks {
725            self.nonce_sets.entry(*wallet).or_insert_with(|| {
726                let mut set = hypercall_engine::BoundedNonceSet::new(
727                    hypercall_engine::nonce::DEFAULT_NONCE_SET_CAPACITY,
728                );
729                set.insert(*watermark);
730                set
731            });
732        }
733        self.rsm_signer_nonces = snapshot.rsm_signer_nonces.clone();
734
735        // 11. Restore HyperCore account equity
736        self.deps.hypercore_account_equity = snapshot.hypercore_account_equity.clone();
737        self.deps.hypercore_equity_timestamps = snapshot.hypercore_equity_timestamps.clone();
738        self.deps.perp_positions = snapshot.perp_positions.clone();
739
740        info!(
741            "Restored engine state from snapshot: next_order_id={}, next_trade_id={}, \
742             l2_seq={}, orderbooks={}, orders={}, expired={}, hypercore_equity_wallets={}",
743            self.next_order_id,
744            self.next_trade_id,
745            snapshot.last_l2_seq,
746            snapshot.orderbooks.len(),
747            total_orders,
748            self.expired_instruments.len(),
749            self.deps.hypercore_account_equity.len(),
750        );
751    }
752}