Skip to main content

hypercall/rsm/
engine_state_snapshot.rs

1//! Persistent engine state snapshots for fast restart recovery.
2//!
3//! After journal replay the engine writes a snapshot of its core state
4//! (orderbooks, counters, expired instruments) to disk. On the next
5//! restart the snapshot is loaded first, and only the delta since the
6//! snapshot's `last_command_id` needs to be replayed from the journal.
7//!
8//! **File format**: `[msgpack payload][4-byte CRC32 LE]`
9//!
10//! **Atomic write**: temp → fsync → rename (same as journal checkpoint writes).
11
12use crate::rsm::engine_deps::EngineCtx;
13use crc::{Crc, CRC_32_ISO_HDLC};
14use hypercall_journal::checkpoint::WalCheckpointMetadata;
15use hypercall_types::{Side, WalletAddress};
16use rust_decimal::Decimal;
17use serde::{Deserialize, Serialize};
18use std::collections::HashMap;
19use std::fs::{create_dir_all, OpenOptions};
20use std::io::Write;
21use std::path::{Path, PathBuf};
22use tracing::{info, warn};
23
/// CRC-32 (ISO-HDLC) calculator used for the 4-byte checksum that trails the
/// msgpack payload in a snapshot file.
const SNAPSHOT_CRC: Crc<u32> = Crc::<u32>::new(&CRC_32_ISO_HDLC);

/// Current snapshot format version.
const SNAPSHOT_VERSION: u32 = 3;
/// Version tag of the legacy v2 snapshot format.
/// NOTE(review): presumably matched by the snapshot reader to pick the legacy decode path — confirm.
const LEGACY_SNAPSHOT_VERSION_V2: u32 = 2;
/// Version tag of the legacy v1 snapshot format.
/// NOTE(review): presumably matched by the snapshot reader to pick the legacy decode path — confirm.
const LEGACY_SNAPSHOT_VERSION_V1: u32 = 1;
/// Value stored in `last_balance_update_seq` when a legacy snapshot (which has
/// no such field) is converted by `LegacyEngineStateSnapshot::into_migration_snapshot`.
const LEGACY_BALANCE_UPDATE_SEQ_MIGRATION_EPOCH: u64 = 0;
31
/// Parse a msgpack array header at the start of `payload`.
///
/// Returns `(element_count, header_len_in_bytes)` for fixarray (`0x90..=0x9f`),
/// array16 (`0xdc`) and array32 (`0xdd`) headers. Returns `None` when the
/// payload is empty, truncated, or does not start with an array marker.
fn msgpack_array_start(payload: &[u8]) -> Option<(usize, usize)> {
    let (&marker, rest) = payload.split_first()?;

    // fixarray: the element count lives in the low nibble of the marker.
    if (0x90..=0x9f).contains(&marker) {
        return Some((usize::from(marker & 0x0f), 1));
    }

    match marker {
        0xdc => {
            let count_bytes: [u8; 2] = rest.get(..2)?.try_into().ok()?;
            Some((usize::from(u16::from_be_bytes(count_bytes)), 3))
        }
        0xdd => {
            let count_bytes: [u8; 4] = rest.get(..4)?.try_into().ok()?;
            Some((u32::from_be_bytes(count_bytes) as usize, 5))
        }
        _ => None,
    }
}
47
/// Parse a msgpack map header at the start of `payload`.
///
/// Returns `(entry_count, header_len_in_bytes)` for fixmap (`0x80..=0x8f`),
/// map16 (`0xde`) and map32 (`0xdf`) headers. Returns `None` when the payload
/// is empty, truncated, or does not start with a map marker.
fn msgpack_map_start(payload: &[u8]) -> Option<(usize, usize)> {
    let marker = *payload.first()?;

    // Total header size (marker + big-endian count) for the wide encodings.
    let header_len = match marker {
        0x80..=0x8f => return Some((usize::from(marker & 0x0f), 1)),
        0xde => 3,
        0xdf => 5,
        _ => return None,
    };

    // Fold the big-endian count bytes that follow the marker.
    let entry_count = payload
        .get(1..header_len)?
        .iter()
        .fold(0usize, |acc, &byte| (acc << 8) | usize::from(byte));
    Some((entry_count, header_len))
}
63
/// Decode an unsigned msgpack integer that fits in `u32`, located at `offset`.
///
/// Accepts positive fixint (`0x00..=0x7f`), uint8 (`0xcc`), uint16 (`0xcd`)
/// and uint32 (`0xce`). Any other marker, an out-of-range offset, or a
/// truncated operand yields `None`.
fn msgpack_u32_at(payload: &[u8], offset: usize) -> Option<u32> {
    let (&marker, operand) = payload.get(offset..)?.split_first()?;
    match marker {
        0x00..=0x7f => Some(u32::from(marker)),
        0xcc => operand.first().map(|&byte| u32::from(byte)),
        0xcd => {
            let raw: [u8; 2] = operand.get(..2)?.try_into().ok()?;
            Some(u32::from(u16::from_be_bytes(raw)))
        }
        0xce => {
            let raw: [u8; 4] = operand.get(..4)?.try_into().ok()?;
            Some(u32::from_be_bytes(raw))
        }
        _ => None,
    }
}
80
/// Decode a msgpack string located at `offset`.
///
/// Accepts fixstr (`0xa0..=0xbf`), str8 (`0xd9`), str16 (`0xda`) and str32
/// (`0xdb`). On success returns the borrowed UTF-8 text together with the
/// offset of the first byte after the string. Returns `None` for any other
/// marker, a truncated payload, or bytes that are not valid UTF-8.
fn msgpack_str_at(payload: &[u8], offset: usize) -> Option<(&str, usize)> {
    let marker = *payload.get(offset)?;

    // Number of big-endian length bytes that follow the marker (0 for fixstr).
    let length_width = match marker {
        0xa0..=0xbf => 0,
        0xd9 => 1,
        0xda => 2,
        0xdb => 4,
        _ => return None,
    };

    let start = offset + 1 + length_width;
    let len = if length_width == 0 {
        // fixstr keeps its length in the low five bits of the marker.
        usize::from(marker & 0x1f)
    } else {
        payload
            .get(offset + 1..start)?
            .iter()
            .fold(0usize, |acc, &byte| (acc << 8) | usize::from(byte))
    };

    let end = start.checked_add(len)?;
    let text = std::str::from_utf8(payload.get(start..end)?).ok()?;
    Some((text, end))
}
101
102fn snapshot_payload_version(payload: &[u8]) -> Option<u32> {
103    if let Some((len, offset)) = msgpack_array_start(payload) {
104        if len == 0 {
105            return None;
106        }
107        return msgpack_u32_at(payload, offset);
108    }
109
110    if let Some((len, offset)) = msgpack_map_start(payload) {
111        if len == 0 {
112            return None;
113        }
114        if let Some((key, value_offset)) = msgpack_str_at(payload, offset) {
115            if key == "version" {
116                return msgpack_u32_at(payload, value_offset);
117            }
118        }
119    }
120
121    let value: serde_json::Value = rmp_serde::from_slice(payload).ok()?;
122    match value {
123        serde_json::Value::Array(fields) => fields.first()?.as_u64()?.try_into().ok(),
124        serde_json::Value::Object(fields) => fields.get("version")?.as_u64()?.try_into().ok(),
125        _ => None,
126    }
127}
128
/// Persistent engine state snapshot for fast restart recovery.
///
/// `build_snapshot` populates this struct from the live `EngineCtx` and
/// `write_snapshot` serializes it with `rmp_serde::to_vec`, followed by a
/// CRC32 of the payload.
///
/// NOTE(review): `snapshot_payload_version` reads the version from the first
/// array element (or a leading `version` map key), and the legacy layouts in
/// this file are described as "append-only". Field order therefore looks like
/// part of the on-disk format — confirm before reordering fields or inserting
/// new ones anywhere but the end.
#[derive(Debug, Serialize, Deserialize)]
pub struct EngineStateSnapshot {
    /// Format version for forward compatibility.
    pub version: u32,
    /// Last command ID replicated to Postgres (from checkpoint at write time).
    pub last_command_id: i64,
    /// Last L2 sequence number (from checkpoint at write time).
    pub last_l2_seq: i64,
    /// Next order ID counter.
    pub next_order_id: u64,
    /// Next trade ID counter.
    pub next_trade_id: u64,
    /// Expired instruments map (symbol → expired flag).
    pub expired_instruments: HashMap<String, bool>,
    /// Orderbook orders keyed by symbol.
    pub orderbooks: HashMap<String, Vec<OrderSnapshotEntry>>,
    /// Engine-owned positions (wallet+symbol -> quantity+entry_price).
    #[serde(default)]
    pub engine_positions: HashMap<(WalletAddress, String), crate::rsm::engine_deps::EnginePosition>,
    /// Engine-owned cash balances per wallet.
    pub balance_ledger: HashMap<WalletAddress, rust_decimal::Decimal>,
    /// Last applied canonical balance update sequence.
    ///
    /// Snapshots migrated from a legacy layout carry
    /// `LEGACY_BALANCE_UPDATE_SEQ_MIGRATION_EPOCH` here.
    pub last_balance_update_seq: u64,
    /// Last applied deposit update per wallet.
    #[serde(default)]
    pub deposit_update_watermarks:
        HashMap<WalletAddress, crate::rsm::engine_deps::DepositUpdateWatermark>,
    /// Agent authorizations: owner wallet → list of (agent wallet, optional expiry ms).
    ///
    /// `build_snapshot` writes each list sorted by agent wallet.
    #[serde(default)]
    pub agent_authorizations: HashMap<WalletAddress, Vec<(WalletAddress, Option<u64>)>>,
    /// Legacy marker retained only so existing snapshots deserialize.
    /// `build_snapshot` always writes `true`.
    #[serde(default)]
    pub agent_auth_loaded: bool,
    /// Per-wallet nonce high-water marks for replay protection (legacy format).
    /// `build_snapshot` always writes an empty map.
    #[serde(default)]
    pub nonce_watermarks: HashMap<WalletAddress, u64>,
    /// Per-signer bounded nonce sets (HL model). Serialized as sorted Vec<u64>.
    ///
    /// `build_snapshot` purges expired nonces first and omits signers whose set
    /// becomes empty.
    #[serde(default)]
    pub nonce_sets: HashMap<WalletAddress, Vec<u64>>,
    /// HyperCore account equity per wallet for PM margin calculations.
    #[serde(default)]
    pub hypercore_account_equity: HashMap<WalletAddress, rust_decimal::Decimal>,
    /// Last applied HyperCore equity update timestamp per wallet.
    ///
    /// `build_snapshot` panics if a wallet has equity but no timestamp here.
    #[serde(default)]
    pub hypercore_equity_timestamps: HashMap<WalletAddress, u64>,
    /// HyperCore perp positions per account and coin for PM margin calculations.
    #[serde(default)]
    pub perp_positions: HashMap<(String, String), crate::hypercore::PerpPosition>,
    /// Canonical spot prices used by deterministic margin checks.
    #[serde(default)]
    pub spot_prices: HashMap<String, rust_decimal::Decimal>,
    /// Canonical IV surfaces used by deterministic margin checks.
    #[serde(default)]
    pub iv_surfaces: HashMap<String, VolatilitySurfaceSnapshot>,
    /// Source timestamps for canonical IV surfaces.
    #[serde(default)]
    pub iv_source_timestamps: HashMap<String, i64>,
    /// Per-instrument trading mode.
    #[serde(default)]
    pub instrument_trading_modes: HashMap<String, hypercall_types::TradingModes>,
    /// Engine-owned MMP state per wallet/currency.
    #[serde(default)]
    pub mmp_state: HashMap<(WalletAddress, String), crate::rsm::engine_deps::EngineMmpState>,
    /// Engine-owned liquidation state per wallet.
    #[serde(default)]
    pub liquidation_states: HashMap<WalletAddress, hypercall_types::LiquidationStateType>,
    /// Engine-owned margin mode per wallet.
    #[serde(default)]
    pub wallet_margin_modes: HashMap<WalletAddress, crate::rsm::margin_mode::MarginMode>,
    /// Engine-owned MMP enabled state per wallet/currency.
    #[serde(default)]
    pub mmp_enabled: HashMap<(WalletAddress, String), bool>,
    /// Engine-owned trading limits per wallet.
    #[serde(default)]
    pub wallet_trading_limits: HashMap<WalletAddress, hypercall_types::api_models::TradingLimits>,
    /// Default trading limits used when a wallet has no explicit tier.
    #[serde(default)]
    pub default_trading_limits: hypercall_types::api_models::TradingLimits,
    /// Engine-owned tier name per wallet.
    #[serde(default)]
    pub wallet_tiers: HashMap<WalletAddress, String>,
    /// Canonical next RSM nonce per signer.
    #[serde(default)]
    pub rsm_signer_nonces: HashMap<WalletAddress, u64>,
    /// Applied cash DepositUpdate source event hashes.
    ///
    /// This is intentionally unbounded incident-stabilization state. See
    /// EngineCtx field docs for the replacement plan.
    #[serde(default)]
    pub applied_deposit_source_event_hashes:
        std::collections::BTreeSet<alloy::primitives::FixedBytes<32>>,
    /// Engine-owned PM settlement-pool state. Projection rows are downstream
    /// and must never hydrate this field.
    ///
    /// Absent from the legacy layouts; migrated snapshots get the `Default` value.
    #[serde(default)]
    pub pm_settlement_state: crate::rsm::portfolio_margin::settlement_state::PmSettlementState,
}
226
/// Pre-CALL-1707 snapshot layout without `last_balance_update_seq`.
///
/// Temporary legacy migration hook: this is only for environments that still
/// have a v1/v2 snapshot and a hot journal tail that starts after command 1.
/// It must be removed once every deployed environment has written a v3
/// snapshot with canonical balance-update sequence metadata.
///
/// Compared with `EngineStateSnapshot`, this layout lacks
/// `last_balance_update_seq` and `pm_settlement_state`; `into_migration_snapshot`
/// fills those in when converting. Field meanings otherwise match the
/// same-named fields of `EngineStateSnapshot`.
///
/// NOTE(review): the declaration order below appears to mirror the legacy
/// on-disk field order — do not reorder without confirming.
#[derive(Debug, Deserialize)]
struct LegacyEngineStateSnapshot {
    // Stored version tag; discarded on migration (the result is stamped with SNAPSHOT_VERSION).
    version: u32,
    last_command_id: i64,
    last_l2_seq: i64,
    next_order_id: u64,
    next_trade_id: u64,
    expired_instruments: HashMap<String, bool>,
    orderbooks: HashMap<String, Vec<OrderSnapshotEntry>>,
    #[serde(default)]
    engine_positions: HashMap<(WalletAddress, String), crate::rsm::engine_deps::EnginePosition>,
    balance_ledger: HashMap<WalletAddress, rust_decimal::Decimal>,
    #[serde(default)]
    deposit_update_watermarks:
        HashMap<WalletAddress, crate::rsm::engine_deps::DepositUpdateWatermark>,
    #[serde(default)]
    agent_authorizations: HashMap<WalletAddress, Vec<(WalletAddress, Option<u64>)>>,
    #[serde(default)]
    agent_auth_loaded: bool,
    #[serde(default)]
    nonce_watermarks: HashMap<WalletAddress, u64>,
    #[serde(default)]
    nonce_sets: HashMap<WalletAddress, Vec<u64>>,
    #[serde(default)]
    hypercore_account_equity: HashMap<WalletAddress, rust_decimal::Decimal>,
    // May be missing entries for wallets present in `hypercore_account_equity`;
    // migration seeds zero timestamps for those wallets.
    #[serde(default)]
    hypercore_equity_timestamps: HashMap<WalletAddress, u64>,
    #[serde(default)]
    perp_positions: HashMap<(String, String), crate::hypercore::PerpPosition>,
    #[serde(default)]
    spot_prices: HashMap<String, rust_decimal::Decimal>,
    #[serde(default)]
    iv_surfaces: HashMap<String, VolatilitySurfaceSnapshot>,
    #[serde(default)]
    iv_source_timestamps: HashMap<String, i64>,
    #[serde(default)]
    instrument_trading_modes: HashMap<String, hypercall_types::TradingModes>,
    #[serde(default)]
    mmp_state: HashMap<(WalletAddress, String), crate::rsm::engine_deps::EngineMmpState>,
    #[serde(default)]
    liquidation_states: HashMap<WalletAddress, hypercall_types::LiquidationStateType>,
    #[serde(default)]
    wallet_margin_modes: HashMap<WalletAddress, crate::rsm::margin_mode::MarginMode>,
    #[serde(default)]
    mmp_enabled: HashMap<(WalletAddress, String), bool>,
    #[serde(default)]
    wallet_trading_limits: HashMap<WalletAddress, hypercall_types::api_models::TradingLimits>,
    #[serde(default)]
    default_trading_limits: hypercall_types::api_models::TradingLimits,
    #[serde(default)]
    wallet_tiers: HashMap<WalletAddress, String>,
    #[serde(default)]
    rsm_signer_nonces: HashMap<WalletAddress, u64>,
    // In this (append-only) layout the deposit hash set is the last field.
    #[serde(default)]
    applied_deposit_source_event_hashes:
        std::collections::BTreeSet<alloy::primitives::FixedBytes<32>>,
}
290
/// Older v2 layout from before snapshot fields were made append-only.
///
/// Temporary legacy migration hook: remove with `LegacyEngineStateSnapshot`
/// after all deployed environments have written v3 snapshots.
///
/// This struct has the same fields as `LegacyEngineStateSnapshot`; the only
/// difference is that `applied_deposit_source_event_hashes` sits directly after
/// `deposit_update_watermarks` instead of at the end. The `From` impl converts
/// it into `LegacyEngineStateSnapshot` field by field.
#[derive(Debug, Deserialize)]
struct LegacyEngineStateSnapshotPreAppendOnlyV2 {
    version: u32,
    last_command_id: i64,
    last_l2_seq: i64,
    next_order_id: u64,
    next_trade_id: u64,
    expired_instruments: HashMap<String, bool>,
    orderbooks: HashMap<String, Vec<OrderSnapshotEntry>>,
    #[serde(default)]
    engine_positions: HashMap<(WalletAddress, String), crate::rsm::engine_deps::EnginePosition>,
    balance_ledger: HashMap<WalletAddress, rust_decimal::Decimal>,
    #[serde(default)]
    deposit_update_watermarks:
        HashMap<WalletAddress, crate::rsm::engine_deps::DepositUpdateWatermark>,
    // Mid-struct position is what distinguishes this layout from the append-only one.
    #[serde(default)]
    applied_deposit_source_event_hashes:
        std::collections::BTreeSet<alloy::primitives::FixedBytes<32>>,
    #[serde(default)]
    agent_authorizations: HashMap<WalletAddress, Vec<(WalletAddress, Option<u64>)>>,
    #[serde(default)]
    agent_auth_loaded: bool,
    #[serde(default)]
    nonce_watermarks: HashMap<WalletAddress, u64>,
    #[serde(default)]
    nonce_sets: HashMap<WalletAddress, Vec<u64>>,
    #[serde(default)]
    hypercore_account_equity: HashMap<WalletAddress, rust_decimal::Decimal>,
    #[serde(default)]
    hypercore_equity_timestamps: HashMap<WalletAddress, u64>,
    #[serde(default)]
    perp_positions: HashMap<(String, String), crate::hypercore::PerpPosition>,
    #[serde(default)]
    spot_prices: HashMap<String, rust_decimal::Decimal>,
    #[serde(default)]
    iv_surfaces: HashMap<String, VolatilitySurfaceSnapshot>,
    #[serde(default)]
    iv_source_timestamps: HashMap<String, i64>,
    #[serde(default)]
    instrument_trading_modes: HashMap<String, hypercall_types::TradingModes>,
    #[serde(default)]
    mmp_state: HashMap<(WalletAddress, String), crate::rsm::engine_deps::EngineMmpState>,
    #[serde(default)]
    liquidation_states: HashMap<WalletAddress, hypercall_types::LiquidationStateType>,
    #[serde(default)]
    wallet_margin_modes: HashMap<WalletAddress, crate::rsm::margin_mode::MarginMode>,
    #[serde(default)]
    mmp_enabled: HashMap<(WalletAddress, String), bool>,
    #[serde(default)]
    wallet_trading_limits: HashMap<WalletAddress, hypercall_types::api_models::TradingLimits>,
    #[serde(default)]
    default_trading_limits: hypercall_types::api_models::TradingLimits,
    #[serde(default)]
    wallet_tiers: HashMap<WalletAddress, String>,
    #[serde(default)]
    rsm_signer_nonces: HashMap<WalletAddress, u64>,
}
352
353impl From<LegacyEngineStateSnapshotPreAppendOnlyV2> for LegacyEngineStateSnapshot {
354    fn from(snapshot: LegacyEngineStateSnapshotPreAppendOnlyV2) -> Self {
355        Self {
356            version: snapshot.version,
357            last_command_id: snapshot.last_command_id,
358            last_l2_seq: snapshot.last_l2_seq,
359            next_order_id: snapshot.next_order_id,
360            next_trade_id: snapshot.next_trade_id,
361            expired_instruments: snapshot.expired_instruments,
362            orderbooks: snapshot.orderbooks,
363            engine_positions: snapshot.engine_positions,
364            balance_ledger: snapshot.balance_ledger,
365            deposit_update_watermarks: snapshot.deposit_update_watermarks,
366            agent_authorizations: snapshot.agent_authorizations,
367            agent_auth_loaded: snapshot.agent_auth_loaded,
368            nonce_watermarks: snapshot.nonce_watermarks,
369            nonce_sets: snapshot.nonce_sets,
370            hypercore_account_equity: snapshot.hypercore_account_equity,
371            hypercore_equity_timestamps: snapshot.hypercore_equity_timestamps,
372            perp_positions: snapshot.perp_positions,
373            spot_prices: snapshot.spot_prices,
374            iv_surfaces: snapshot.iv_surfaces,
375            iv_source_timestamps: snapshot.iv_source_timestamps,
376            instrument_trading_modes: snapshot.instrument_trading_modes,
377            mmp_state: snapshot.mmp_state,
378            liquidation_states: snapshot.liquidation_states,
379            wallet_margin_modes: snapshot.wallet_margin_modes,
380            mmp_enabled: snapshot.mmp_enabled,
381            wallet_trading_limits: snapshot.wallet_trading_limits,
382            default_trading_limits: snapshot.default_trading_limits,
383            wallet_tiers: snapshot.wallet_tiers,
384            rsm_signer_nonces: snapshot.rsm_signer_nonces,
385            applied_deposit_source_event_hashes: snapshot.applied_deposit_source_event_hashes,
386        }
387    }
388}
389
390impl LegacyEngineStateSnapshot {
391    fn into_migration_snapshot(mut self, path: &Path) -> EngineStateSnapshot {
392        let missing_hypercore_equity_watermarks: Vec<WalletAddress> = self
393            .hypercore_account_equity
394            .keys()
395            .filter(|wallet| !self.hypercore_equity_timestamps.contains_key(wallet))
396            .copied()
397            .collect();
398        if !missing_hypercore_equity_watermarks.is_empty() {
399            warn!(
400                snapshot_path = %path.display(),
401                missing_watermark_count = missing_hypercore_equity_watermarks.len(),
402                "Legacy engine snapshot has HyperCore equity without timestamp watermarks; seeding zero watermarks for one-time v3 migration replay"
403            );
404            for wallet in missing_hypercore_equity_watermarks {
405                self.hypercore_equity_timestamps.insert(wallet, 0);
406            }
407        }
408
409        EngineStateSnapshot {
410            version: SNAPSHOT_VERSION,
411            last_command_id: self.last_command_id,
412            last_l2_seq: self.last_l2_seq,
413            next_order_id: self.next_order_id,
414            next_trade_id: self.next_trade_id,
415            expired_instruments: self.expired_instruments,
416            orderbooks: self.orderbooks,
417            engine_positions: self.engine_positions,
418            balance_ledger: self.balance_ledger,
419            last_balance_update_seq: LEGACY_BALANCE_UPDATE_SEQ_MIGRATION_EPOCH,
420            deposit_update_watermarks: self.deposit_update_watermarks,
421            agent_authorizations: self.agent_authorizations,
422            agent_auth_loaded: self.agent_auth_loaded,
423            nonce_watermarks: self.nonce_watermarks,
424            nonce_sets: self.nonce_sets,
425            hypercore_account_equity: self.hypercore_account_equity,
426            hypercore_equity_timestamps: self.hypercore_equity_timestamps,
427            perp_positions: self.perp_positions,
428            spot_prices: self.spot_prices,
429            iv_surfaces: self.iv_surfaces,
430            iv_source_timestamps: self.iv_source_timestamps,
431            instrument_trading_modes: self.instrument_trading_modes,
432            mmp_state: self.mmp_state,
433            liquidation_states: self.liquidation_states,
434            wallet_margin_modes: self.wallet_margin_modes,
435            mmp_enabled: self.mmp_enabled,
436            wallet_trading_limits: self.wallet_trading_limits,
437            default_trading_limits: self.default_trading_limits,
438            wallet_tiers: self.wallet_tiers,
439            rsm_signer_nonces: self.rsm_signer_nonces,
440            applied_deposit_source_event_hashes: self.applied_deposit_source_event_hashes,
441            pm_settlement_state:
442                crate::rsm::portfolio_margin::settlement_state::PmSettlementState::default(),
443        }
444    }
445}
446
447fn decode_legacy_engine_state_snapshot(
448    payload: &[u8],
449    path: &Path,
450) -> Result<(LegacyEngineStateSnapshot, &'static str), String> {
451    match rmp_serde::from_slice::<LegacyEngineStateSnapshot>(payload) {
452        Ok(snapshot) => Ok((snapshot, "append-only-v1-v2")),
453        Err(append_only_error) => {
454            let pre_append_snapshot =
455                rmp_serde::from_slice::<LegacyEngineStateSnapshotPreAppendOnlyV2>(payload)
456                    .map_err(|pre_append_error| {
457                        format!(
458                            "failed to deserialize legacy snapshot {} for CALL-1707 migration replay; append-only layout error: {}; pre-append-only v2 layout error: {}",
459                            path.display(),
460                            append_only_error,
461                            pre_append_error,
462                        )
463                    })?;
464            Ok((pre_append_snapshot.into(), "pre-append-only-v2"))
465        }
466    }
467}
468
/// Serializable export of a `VolatilitySurface`.
///
/// Built from a surface via `From<&VolatilitySurface>` and turned back into a
/// surface with `into_surface`. Every field defaults to empty when absent from
/// the serialized data.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct VolatilitySurfaceSnapshot {
    /// Points returned by `VolatilitySurface::export_all_points`; each is
    /// re-inserted by its `strike`, `expiry` and `iv`.
    #[serde(default)]
    pub strike_points: Vec<crate::vol_oracle::vol_surface_cache::VolPoint>,
    /// `(expiry, iv)` pairs returned by `VolatilitySurface::export_atm_vols`.
    #[serde(default)]
    pub atm_vols: Vec<(i64, f64)>,
    /// Per-expiry delta curves returned by `VolatilitySurface::export_delta_curves`.
    #[serde(default)]
    pub delta_curves: Vec<crate::vol_oracle::vol_surface_cache::DeltaCurveExport>,
}
478
479impl From<&crate::vol_oracle::vol_surface_cache::VolatilitySurface> for VolatilitySurfaceSnapshot {
480    fn from(surface: &crate::vol_oracle::vol_surface_cache::VolatilitySurface) -> Self {
481        Self {
482            strike_points: surface.export_all_points(),
483            atm_vols: surface.export_atm_vols(),
484            delta_curves: surface.export_delta_curves(),
485        }
486    }
487}
488
489impl VolatilitySurfaceSnapshot {
490    pub fn into_surface(self) -> crate::vol_oracle::vol_surface_cache::VolatilitySurface {
491        let mut surface = crate::vol_oracle::vol_surface_cache::VolatilitySurface::new();
492        for point in self.strike_points {
493            surface.insert(point.strike, point.expiry, point.iv);
494        }
495        for (expiry, iv) in self.atm_vols {
496            surface.set_atm_vol(expiry, iv);
497        }
498        for curve in self.delta_curves {
499            for point in curve.points {
500                surface.set_delta_iv(curve.expiry, point.delta, point.iv);
501            }
502        }
503        surface
504    }
505}
506
/// A single order in the snapshot, matching `OrderBook::get_all_orders()` output.
///
/// New fields use `#[serde(default)]` for backwards compatibility: old snapshots
/// without these fields will deserialize with `None`/`false`/`None` defaults.
///
/// `build_snapshot` copies each field straight from the corresponding order
/// record returned by the orderbook.
#[derive(Debug, Serialize, Deserialize)]
pub struct OrderSnapshotEntry {
    /// Engine-assigned order ID.
    pub order_id: u64,
    /// Order price.
    pub price: Decimal,
    /// Remaining (unfilled) size of the order.
    pub quantity: Decimal,
    /// Side of the book the order is on.
    pub side: Side,
    /// Wallet that owns the order.
    pub wallet: WalletAddress,
    /// Order timestamp as reported by the orderbook.
    /// NOTE(review): unit is not visible here (presumably ms since epoch) — confirm.
    pub timestamp: u64,
    /// Client ID (e.g. "cli_123" for MM orders). Absent in old snapshots.
    #[serde(default)]
    pub client_id: Option<String>,
    /// Whether MMP (market-maker protection) is enabled. Absent in old snapshots.
    #[serde(default)]
    pub mmp_enabled: bool,
    /// Original order size at placement time. `None` in old snapshots means
    /// fall back to `quantity` (remaining size) as a best-effort default.
    /// `build_snapshot` always writes `Some`.
    #[serde(default)]
    pub original_size: Option<Decimal>,
}
530
/// Derive the snapshot file path from the WAL path.
///
/// The suffix `.engine_snapshot` is appended to the full WAL path (any
/// existing extension is kept, not replaced).
pub fn snapshot_path_from_wal_path(wal_path: &Path) -> PathBuf {
    let mut snapshot_name = wal_path.to_path_buf().into_os_string();
    snapshot_name.push(".engine_snapshot");
    snapshot_name.into()
}
537
538/// Build a snapshot from current engine state, tagged with the checkpoint boundary.
539///
540/// **Critical**: `last_command_id` and `last_l2_seq` come from the WAL checkpoint
541/// (the Postgres-replicated boundary), NOT from the live engine. This ensures
542/// that delta replay on the next restart queries Postgres starting from a point
543/// where all commands have been replicated.
544pub fn build_snapshot(ctx: &EngineCtx, checkpoint: &WalCheckpointMetadata) -> EngineStateSnapshot {
545    let mut orderbooks = HashMap::new();
546    let mut total_orders = 0u64;
547
548    for (symbol, orderbook) in &ctx.orderbooks {
549        let all_orders = orderbook.get_all_orders();
550        let entries: Vec<OrderSnapshotEntry> = all_orders
551            .into_iter()
552            .map(|r| OrderSnapshotEntry {
553                order_id: r.order_id,
554                price: r.price,
555                quantity: r.quantity,
556                side: r.side,
557                wallet: r.wallet,
558                timestamp: r.timestamp,
559                client_id: r.client_id,
560                mmp_enabled: r.mmp_enabled,
561                original_size: Some(r.original_size),
562            })
563            .collect();
564        total_orders += entries.len() as u64;
565        orderbooks.insert(symbol.clone(), entries);
566    }
567
568    info!(
569        "Built engine state snapshot: last_command_id={}, last_l2_seq={}, \
570         next_order_id={}, next_trade_id={}, orderbooks={}, orders={}",
571        checkpoint.last_command_id,
572        checkpoint.last_l2_seq,
573        ctx.next_order_id,
574        ctx.next_trade_id,
575        orderbooks.len(),
576        total_orders,
577    );
578
579    let agent_authorizations: HashMap<WalletAddress, Vec<(WalletAddress, Option<u64>)>> = ctx
580        .agent_authorizations
581        .iter()
582        .map(|(wallet, agents)| {
583            let mut sorted: Vec<(WalletAddress, Option<u64>)> =
584                agents.iter().map(|(a, e)| (*a, *e)).collect();
585            sorted.sort_by_key(|(a, _)| *a);
586            (*wallet, sorted)
587        })
588        .collect();
589
590    if let Some(wallet) = ctx
591        .deps
592        .hypercore_account_equity
593        .keys()
594        .find(|wallet| !ctx.deps.hypercore_equity_timestamps.contains_key(wallet))
595    {
596        panic!(
597            "build_snapshot refusing to serialize hypercore_account_equity without \
598             hypercore_equity_timestamps for wallet {}; read_snapshot enforces the same invariant",
599            wallet
600        );
601    }
602
603    EngineStateSnapshot {
604        version: SNAPSHOT_VERSION,
605        last_command_id: checkpoint.last_command_id,
606        last_l2_seq: checkpoint.last_l2_seq,
607        next_order_id: ctx.next_order_id,
608        next_trade_id: ctx.next_trade_id,
609        expired_instruments: ctx.expired_instruments.clone(),
610        spot_prices: ctx.spot_prices.clone(),
611        iv_surfaces: ctx
612            .iv_surfaces
613            .iter()
614            .map(|(underlying, surface)| (underlying.clone(), surface.into()))
615            .collect(),
616        iv_source_timestamps: ctx.iv_source_timestamps.clone(),
617        instrument_trading_modes: ctx.instrument_trading_modes.clone(),
618        orderbooks,
619        engine_positions: ctx.engine_positions.clone(),
620        mmp_state: ctx.mmp_state.clone(),
621        liquidation_states: ctx.deps.liquidation_states.clone(),
622        wallet_margin_modes: ctx.deps.wallet_margin_modes.clone(),
623        mmp_enabled: ctx.deps.mmp_enabled.clone(),
624        wallet_trading_limits: ctx.deps.wallet_trading_limits.clone(),
625        default_trading_limits: ctx.deps.default_trading_limits,
626        wallet_tiers: ctx.deps.wallet_tiers.clone(),
627        balance_ledger: ctx.balance_ledger.snapshot_map(),
628        last_balance_update_seq: ctx.balance_ledger.last_balance_update_seq(),
629        deposit_update_watermarks: ctx.deposit_update_watermarks.clone(),
630        applied_deposit_source_event_hashes: ctx.applied_deposit_source_event_hashes.clone(),
631        pm_settlement_state: ctx.pm_settlement_state.clone(),
632        agent_authorizations,
633        agent_auth_loaded: true,
634        nonce_watermarks: HashMap::new(),
635        nonce_sets: {
636            let now_ms = std::time::SystemTime::now()
637                .duration_since(std::time::UNIX_EPOCH)
638                .map(|d| d.as_millis() as u64)
639                .unwrap_or(0);
640            ctx.nonce_sets
641                .iter()
642                .filter_map(|(signer, set)| {
643                    let mut pruned = set.clone();
644                    pruned.purge_expired(now_ms);
645                    if pruned.count() > 0 {
646                        Some((*signer, pruned.to_vec()))
647                    } else {
648                        None
649                    }
650                })
651                .collect()
652        },
653        rsm_signer_nonces: ctx.rsm_signer_nonces.clone(),
654        hypercore_account_equity: ctx.deps.hypercore_account_equity.clone(),
655        hypercore_equity_timestamps: ctx.deps.hypercore_equity_timestamps.clone(),
656        perp_positions: ctx.deps.perp_positions.clone(),
657    }
658}
659
660/// Write a snapshot atomically: temp → fsync → rename → dir sync.
661pub fn write_snapshot(path: &Path, snapshot: &EngineStateSnapshot) -> Result<(), String> {
662    if let Some(parent) = path.parent() {
663        if !parent.as_os_str().is_empty() {
664            create_dir_all(parent).map_err(|e| {
665                format!(
666                    "failed to create snapshot directory {}: {}",
667                    parent.display(),
668                    e
669                )
670            })?;
671        }
672    }
673
674    let mut tmp_os = path.as_os_str().to_os_string();
675    tmp_os.push(".tmp");
676    let tmp_path = PathBuf::from(tmp_os);
677
678    #[cfg(unix)]
679    let mut file = {
680        use std::os::unix::fs::OpenOptionsExt;
681        OpenOptions::new()
682            .create(true)
683            .truncate(true)
684            .write(true)
685            .mode(0o600)
686            .open(&tmp_path)
687            .map_err(|e| format!("failed to open snapshot temp {}: {}", tmp_path.display(), e))?
688    };
689    #[cfg(not(unix))]
690    let mut file = OpenOptions::new()
691        .create(true)
692        .truncate(true)
693        .write(true)
694        .open(&tmp_path)
695        .map_err(|e| format!("failed to open snapshot temp {}: {}", tmp_path.display(), e))?;
696
697    let payload =
698        rmp_serde::to_vec(snapshot).map_err(|e| format!("failed to serialize snapshot: {}", e))?;
699
700    let crc = SNAPSHOT_CRC.checksum(&payload);
701
702    file.write_all(&payload).map_err(|e| {
703        format!(
704            "failed to write snapshot temp {}: {}",
705            tmp_path.display(),
706            e
707        )
708    })?;
709    file.write_all(&crc.to_le_bytes())
710        .map_err(|e| format!("failed to write snapshot CRC {}: {}", tmp_path.display(), e))?;
711    file.sync_data()
712        .map_err(|e| format!("failed to sync snapshot temp {}: {}", tmp_path.display(), e))?;
713
714    std::fs::rename(&tmp_path, path).map_err(|e| {
715        format!(
716            "failed to atomically replace snapshot {} with {}: {}",
717            path.display(),
718            tmp_path.display(),
719            e
720        )
721    })?;
722
723    #[cfg(unix)]
724    {
725        let parent = path.parent().ok_or_else(|| {
726            format!(
727                "snapshot {} has no parent directory to sync",
728                path.display()
729            )
730        })?;
731        let parent = if parent.as_os_str().is_empty() {
732            Path::new(".")
733        } else {
734            parent
735        };
736
737        OpenOptions::new()
738            .read(true)
739            .open(parent)
740            .and_then(|dir| dir.sync_all())
741            .map_err(|e| {
742                format!(
743                    "failed to sync snapshot directory {}: {}",
744                    parent.display(),
745                    e
746                )
747            })?;
748    }
749
750    Ok(())
751}
752
753/// Read a snapshot from disk with CRC32 integrity verification.
754///
755/// Returns `Ok(None)` if the file does not exist.
756/// Returns `Err` if the file exists but is corrupt or has an unsupported version.
757pub fn read_snapshot(path: &Path) -> Result<Option<EngineStateSnapshot>, String> {
758    if !path.exists() {
759        return Ok(None);
760    }
761
762    let data = std::fs::read(path)
763        .map_err(|e| format!("failed to read snapshot {}: {}", path.display(), e))?;
764
765    if data.len() < 4 {
766        return Err(format!(
767            "snapshot {} too small ({} bytes, need at least 4 for CRC)",
768            path.display(),
769            data.len()
770        ));
771    }
772
773    let (payload, crc_bytes) = data.split_at(data.len() - 4);
774    let stored_crc = u32::from_le_bytes(
775        crc_bytes
776            .try_into()
777            .map_err(|_| format!("snapshot {} CRC slice length mismatch", path.display()))?,
778    );
779    let computed_crc = SNAPSHOT_CRC.checksum(payload);
780
781    if stored_crc != computed_crc {
782        return Err(format!(
783            "snapshot {} CRC mismatch: stored={:#010x}, computed={:#010x}",
784            path.display(),
785            stored_crc,
786            computed_crc
787        ));
788    }
789
790    let payload_version = snapshot_payload_version(payload);
791    match payload_version {
792        Some(LEGACY_SNAPSHOT_VERSION_V2 | LEGACY_SNAPSHOT_VERSION_V1) => {
793            let (legacy, legacy_layout) = decode_legacy_engine_state_snapshot(payload, path)?;
794            if !matches!(
795                legacy.version,
796                LEGACY_SNAPSHOT_VERSION_V2 | LEGACY_SNAPSHOT_VERSION_V1
797            ) {
798                return Err(format!(
799                    "snapshot {} payload version probe returned {}, but decoded legacy version was {}",
800                    path.display(),
801                    payload_version.expect("legacy version branch has a version"),
802                    legacy.version
803                ));
804            }
805            info!(
806                snapshot_path = %path.display(),
807                legacy_version = legacy.version,
808                legacy_layout = legacy_layout,
809                legacy_last_command_id = legacy.last_command_id,
810                balance_update_seq = LEGACY_BALANCE_UPDATE_SEQ_MIGRATION_EPOCH,
811                "Restoring legacy engine snapshot as temporary CALL-1707 migration base; WAL tail replay will rebuild canonical balance-update sequence state"
812            );
813            return Ok(Some(legacy.into_migration_snapshot(path)));
814        }
815        Some(SNAPSHOT_VERSION) | None => {}
816        Some(unsupported) => {
817            return Err(format!(
818                "snapshot {} has unsupported version {} (expected {})",
819                path.display(),
820                unsupported,
821                SNAPSHOT_VERSION,
822            ));
823        }
824    }
825    if payload_version.is_none() && msgpack_map_start(payload).is_some() {
826        info!(
827            snapshot_path = %path.display(),
828            "Ignoring legacy map-shaped engine snapshot without readable version metadata; WAL replay will rebuild engine state"
829        );
830        // Temporary CALL-1707 legacy hook: map-shaped artifacts without a
831        // readable version cannot be migrated with a trustworthy replay
832        // watermark. Treat them as absent and let the existing hot-journal
833        // coverage gate decide whether full WAL rebuild is safe. Remove this
834        // after every deployed environment has written a v3 snapshot.
835        return Ok(None);
836    }
837
838    let snapshot: EngineStateSnapshot = rmp_serde::from_slice(payload)
839        .map_err(|e| format!("failed to deserialize snapshot {}: {}", path.display(), e))?;
840
841    match snapshot.version {
842        SNAPSHOT_VERSION => {}
843        unsupported => {
844            return Err(format!(
845                "snapshot {} has unsupported version {} (expected {})",
846                path.display(),
847                unsupported,
848                SNAPSHOT_VERSION,
849            ));
850        }
851    }
852
853    if snapshot.version == SNAPSHOT_VERSION {
854        let missing_hypercore_equity_watermarks = snapshot
855            .hypercore_account_equity
856            .keys()
857            .filter(|wallet| !snapshot.hypercore_equity_timestamps.contains_key(wallet))
858            .count();
859        if missing_hypercore_equity_watermarks > 0 {
860            return Err(format!(
861                "snapshot {} has {} HyperCore equity entries without timestamp watermarks",
862                path.display(),
863                missing_hypercore_equity_watermarks
864            ));
865        }
866    }
867
868    Ok(Some(snapshot))
869}
870
871/// Count total orders across all orderbooks in a snapshot.
872pub fn snapshot_order_count(snapshot: &EngineStateSnapshot) -> usize {
873    snapshot.orderbooks.values().map(|v| v.len()).sum()
874}
875
876#[cfg(test)]
877mod tests {
878    use super::*;
879
880    fn make_test_snapshot() -> EngineStateSnapshot {
881        let mut orderbooks = HashMap::new();
882        orderbooks.insert(
883            "ETH-20260131-4000-C".to_string(),
884            vec![
885                OrderSnapshotEntry {
886                    order_id: 1,
887                    price: Decimal::new(1000, 1),  // 100.0
888                    quantity: Decimal::new(50, 1), // 5.0
889                    side: Side::Buy,
890                    wallet: WalletAddress::from(alloy::primitives::Address::repeat_byte(1)),
891                    timestamp: 1000,
892                    client_id: None,
893                    mmp_enabled: false,
894                    original_size: None,
895                },
896                OrderSnapshotEntry {
897                    order_id: 2,
898                    price: Decimal::new(1100, 1),  // 110.0
899                    quantity: Decimal::new(30, 1), // 3.0
900                    side: Side::Sell,
901                    wallet: WalletAddress::from(alloy::primitives::Address::repeat_byte(2)),
902                    timestamp: 2000,
903                    client_id: None,
904                    mmp_enabled: false,
905                    original_size: None,
906                },
907            ],
908        );
909
910        let mut expired = HashMap::new();
911        expired.insert("ETH-20250101-3000-C".to_string(), true);
912
913        EngineStateSnapshot {
914            version: SNAPSHOT_VERSION,
915            last_command_id: 42,
916            last_l2_seq: 99,
917            next_order_id: 100,
918            next_trade_id: 50,
919            expired_instruments: expired,
920            spot_prices: HashMap::new(),
921            iv_surfaces: HashMap::new(),
922            iv_source_timestamps: HashMap::new(),
923            instrument_trading_modes: HashMap::new(),
924            orderbooks,
925            engine_positions: HashMap::new(),
926            mmp_state: HashMap::new(),
927            liquidation_states: HashMap::new(),
928            wallet_margin_modes: HashMap::new(),
929            mmp_enabled: HashMap::new(),
930            wallet_trading_limits: HashMap::new(),
931            default_trading_limits: hypercall_types::api_models::TradingLimits::default(),
932            wallet_tiers: HashMap::new(),
933            balance_ledger: HashMap::new(),
934            last_balance_update_seq: 0,
935            deposit_update_watermarks: HashMap::new(),
936            applied_deposit_source_event_hashes: std::collections::BTreeSet::new(),
937            pm_settlement_state:
938                crate::rsm::portfolio_margin::settlement_state::PmSettlementState::default(),
939            agent_authorizations: HashMap::new(),
940            agent_auth_loaded: false,
941            nonce_watermarks: HashMap::new(),
942            nonce_sets: HashMap::new(),
943            rsm_signer_nonces: HashMap::new(),
944            hypercore_account_equity: HashMap::new(),
945            hypercore_equity_timestamps: HashMap::new(),
946            perp_positions: HashMap::new(),
947        }
948    }
949
950    #[test]
951    fn snapshot_roundtrip() {
952        let dir = tempfile::tempdir().expect("create tempdir");
953        let path = dir.path().join("test.engine_snapshot");
954        let snapshot = make_test_snapshot();
955
956        write_snapshot(&path, &snapshot).expect("write snapshot");
957        let loaded = read_snapshot(&path)
958            .expect("read snapshot")
959            .expect("snapshot should exist");
960
961        assert_eq!(loaded.version, SNAPSHOT_VERSION);
962        assert_eq!(loaded.last_command_id, 42);
963        assert_eq!(loaded.last_l2_seq, 99);
964        assert_eq!(loaded.next_order_id, 100);
965        assert_eq!(loaded.next_trade_id, 50);
966        assert_eq!(loaded.expired_instruments.len(), 1);
967        assert_eq!(
968            loaded.expired_instruments.get("ETH-20250101-3000-C"),
969            Some(&true)
970        );
971
972        let orders = loaded
973            .orderbooks
974            .get("ETH-20260131-4000-C")
975            .expect("should have orderbook");
976        assert_eq!(orders.len(), 2);
977        assert_eq!(orders[0].order_id, 1);
978        assert_eq!(orders[1].order_id, 2);
979    }
980
981    #[test]
982    fn snapshot_missing_returns_none() {
983        let dir = tempfile::tempdir().expect("create tempdir");
984        let path = dir.path().join("missing.engine_snapshot");
985        assert!(read_snapshot(&path).expect("should not error").is_none());
986    }
987
988    #[test]
989    fn snapshot_corrupt_crc_returns_err() {
990        let dir = tempfile::tempdir().expect("create tempdir");
991        let path = dir.path().join("corrupt.engine_snapshot");
992        let snapshot = make_test_snapshot();
993
994        write_snapshot(&path, &snapshot).expect("write snapshot");
995
996        // Corrupt one byte in the payload
997        let mut data = std::fs::read(&path).unwrap();
998        if data.len() > 4 {
999            data[0] ^= 0xFF;
1000        }
1001        std::fs::write(&path, &data).unwrap();
1002
1003        let err = read_snapshot(&path).expect_err("should detect corruption");
1004        assert!(err.contains("CRC mismatch"), "unexpected error: {}", err);
1005    }
1006
1007    #[test]
1008    fn snapshot_too_small_returns_err() {
1009        let dir = tempfile::tempdir().expect("create tempdir");
1010        let path = dir.path().join("tiny.engine_snapshot");
1011        std::fs::write(&path, b"ab").unwrap();
1012
1013        let err = read_snapshot(&path).expect_err("should reject too-small file");
1014        assert!(err.contains("too small"), "unexpected error: {}", err);
1015    }
1016
1017    #[test]
1018    fn snapshot_path_from_wal_path_appends_suffix() {
1019        let wal = PathBuf::from("/data/engine-journal.wal");
1020        let snap = snapshot_path_from_wal_path(&wal);
1021        assert_eq!(
1022            snap,
1023            PathBuf::from("/data/engine-journal.wal.engine_snapshot")
1024        );
1025    }
1026
1027    #[test]
1028    fn snapshot_order_count_works() {
1029        let snapshot = make_test_snapshot();
1030        assert_eq!(snapshot_order_count(&snapshot), 2);
1031    }
1032
1033    #[test]
1034    fn snapshot_metadata_roundtrip() {
1035        // Snapshot with metadata populated should preserve it across write/read.
1036        let dir = tempfile::tempdir().expect("create tempdir");
1037        let path = dir.path().join("meta.engine_snapshot");
1038
1039        let mut orderbooks = HashMap::new();
1040        orderbooks.insert(
1041            "BTC-20260311-90000-C".to_string(),
1042            vec![
1043                OrderSnapshotEntry {
1044                    order_id: 10,
1045                    price: Decimal::new(5000, 0),
1046                    quantity: Decimal::new(3, 0),
1047                    side: Side::Sell,
1048                    wallet: WalletAddress::from(alloy::primitives::Address::repeat_byte(0xAA)),
1049                    timestamp: 9999,
1050                    client_id: Some("cli_42".to_string()),
1051                    mmp_enabled: true,
1052                    original_size: Some(Decimal::new(10, 0)),
1053                },
1054                OrderSnapshotEntry {
1055                    order_id: 11,
1056                    price: Decimal::new(4500, 0),
1057                    quantity: Decimal::new(7, 0),
1058                    side: Side::Buy,
1059                    wallet: WalletAddress::from(alloy::primitives::Address::repeat_byte(0xBB)),
1060                    timestamp: 10000,
1061                    client_id: None, // FE order — no client_id
1062                    mmp_enabled: false,
1063                    original_size: Some(Decimal::new(7, 0)),
1064                },
1065            ],
1066        );
1067
1068        let snapshot = EngineStateSnapshot {
1069            version: SNAPSHOT_VERSION,
1070            last_command_id: 100,
1071            last_l2_seq: 200,
1072            next_order_id: 500,
1073            next_trade_id: 300,
1074            expired_instruments: HashMap::new(),
1075            spot_prices: HashMap::new(),
1076            iv_surfaces: HashMap::new(),
1077            iv_source_timestamps: HashMap::new(),
1078            instrument_trading_modes: HashMap::new(),
1079            orderbooks,
1080            engine_positions: HashMap::new(),
1081            mmp_state: HashMap::new(),
1082            liquidation_states: HashMap::new(),
1083            wallet_margin_modes: HashMap::new(),
1084            mmp_enabled: HashMap::new(),
1085            wallet_trading_limits: HashMap::new(),
1086            default_trading_limits: hypercall_types::api_models::TradingLimits::default(),
1087            wallet_tiers: HashMap::new(),
1088            balance_ledger: HashMap::new(),
1089            last_balance_update_seq: 0,
1090            deposit_update_watermarks: HashMap::new(),
1091            applied_deposit_source_event_hashes: std::collections::BTreeSet::new(),
1092            pm_settlement_state:
1093                crate::rsm::portfolio_margin::settlement_state::PmSettlementState::default(),
1094            agent_authorizations: HashMap::new(),
1095            agent_auth_loaded: false,
1096            nonce_watermarks: HashMap::new(),
1097            nonce_sets: HashMap::new(),
1098            rsm_signer_nonces: HashMap::new(),
1099            hypercore_account_equity: HashMap::new(),
1100            hypercore_equity_timestamps: HashMap::new(),
1101            perp_positions: HashMap::new(),
1102        };
1103
1104        write_snapshot(&path, &snapshot).expect("write");
1105        let loaded = read_snapshot(&path).expect("read").expect("exists");
1106
1107        let orders = loaded
1108            .orderbooks
1109            .get("BTC-20260311-90000-C")
1110            .expect("orderbook");
1111        assert_eq!(orders.len(), 2);
1112
1113        // MM order: metadata preserved
1114        assert_eq!(orders[0].client_id, Some("cli_42".to_string()));
1115        assert!(orders[0].mmp_enabled);
1116        assert_eq!(orders[0].original_size, Some(Decimal::new(10, 0)));
1117
1118        // FE order: no client_id
1119        assert_eq!(orders[1].client_id, None);
1120        assert!(!orders[1].mmp_enabled);
1121        assert_eq!(orders[1].original_size, Some(Decimal::new(7, 0)));
1122    }
1123
    /// Legacy snapshot entry WITHOUT the new metadata fields
    /// (`client_id`, `mmp_enabled`, `original_size`).
    /// Used to produce a truly old-format payload for backwards compat testing.
    ///
    /// Serialize-only: tests encode it, the production decoder reads it back.
    /// Keep the field order as-is; presumably the compact msgpack encoding is
    /// positional — verify before reordering.
    #[derive(Debug, Serialize)]
    struct LegacyOrderSnapshotEntry {
        order_id: u64,
        price: Decimal,
        quantity: Decimal,
        side: Side,
        wallet: WalletAddress,
        timestamp: u64,
    }
1135
    /// Smallest legacy snapshot shape used by the migration tests: header
    /// counters, expired instruments, orderbooks, positions and the balance
    /// ledger, with no `last_balance_update_seq` field.
    ///
    /// Tests serialize it with `rmp_serde::to_vec`, append the CRC, and feed the
    /// file to `read_snapshot` with a legacy `version` value.
    #[derive(Debug, Serialize)]
    struct MinimalLegacyEngineStateSnapshot {
        version: u32,
        last_command_id: i64,
        last_l2_seq: i64,
        next_order_id: u64,
        next_trade_id: u64,
        expired_instruments: HashMap<String, bool>,
        orderbooks: HashMap<String, Vec<LegacyOrderSnapshotEntry>>,
        engine_positions: HashMap<(WalletAddress, String), crate::rsm::engine_deps::EnginePosition>,
        balance_ledger: HashMap<WalletAddress, Decimal>,
    }
1148
    /// Snapshot shape that has `last_balance_update_seq` in the slot where
    /// `MinimalLegacyEngineStateSnapshot` has `balance_ledger`, i.e. it carries
    /// no balance ledger at all.
    ///
    /// NOTE(review): its users are outside this part of the file; presumably it
    /// backs a test for payloads missing the balance ledger — confirm.
    #[derive(Debug, Serialize)]
    struct MissingBalanceLedgerSnapshot {
        version: u32,
        last_command_id: i64,
        last_l2_seq: i64,
        next_order_id: u64,
        next_trade_id: u64,
        expired_instruments: HashMap<String, bool>,
        orderbooks: HashMap<String, Vec<LegacyOrderSnapshotEntry>>,
        engine_positions: HashMap<(WalletAddress, String), crate::rsm::engine_deps::EnginePosition>,
        last_balance_update_seq: u64,
    }
1161
    /// Snapshot shape that includes `last_balance_update_seq` but has no
    /// `applied_deposit_source_event_hashes` field.
    ///
    /// NOTE(review): its users are outside this part of the file; judging by the
    /// name it models payloads written before the deposit source-hash field
    /// existed — confirm against the test that serializes it.
    #[derive(Debug, Serialize)]
    struct SnapshotBeforeDepositSourceHashField {
        version: u32,
        last_command_id: i64,
        last_l2_seq: i64,
        next_order_id: u64,
        next_trade_id: u64,
        expired_instruments: HashMap<String, bool>,
        orderbooks: HashMap<String, Vec<LegacyOrderSnapshotEntry>>,
        engine_positions: HashMap<(WalletAddress, String), crate::rsm::engine_deps::EnginePosition>,
        balance_ledger: HashMap<WalletAddress, Decimal>,
        last_balance_update_seq: u64,
        deposit_update_watermarks:
            HashMap<WalletAddress, crate::rsm::engine_deps::DepositUpdateWatermark>,
        agent_authorizations: HashMap<WalletAddress, Vec<(WalletAddress, Option<u64>)>>,
        agent_auth_loaded: bool,
        nonce_watermarks: HashMap<WalletAddress, u64>,
        nonce_sets: HashMap<WalletAddress, Vec<u64>>,
        hypercore_account_equity: HashMap<WalletAddress, Decimal>,
        hypercore_equity_timestamps: HashMap<WalletAddress, u64>,
        perp_positions: HashMap<(String, String), crate::hypercore::PerpPosition>,
        spot_prices: HashMap<String, Decimal>,
        iv_surfaces: HashMap<String, VolatilitySurfaceSnapshot>,
        iv_source_timestamps: HashMap<String, i64>,
        instrument_trading_modes: HashMap<String, hypercall_types::TradingModes>,
        mmp_state: HashMap<(WalletAddress, String), crate::rsm::engine_deps::EngineMmpState>,
        liquidation_states: HashMap<WalletAddress, hypercall_types::LiquidationStateType>,
        wallet_margin_modes: HashMap<WalletAddress, crate::rsm::margin_mode::MarginMode>,
        mmp_enabled: HashMap<(WalletAddress, String), bool>,
        wallet_trading_limits: HashMap<WalletAddress, hypercall_types::api_models::TradingLimits>,
        default_trading_limits: hypercall_types::api_models::TradingLimits,
        wallet_tiers: HashMap<WalletAddress, String>,
        rsm_signer_nonces: HashMap<WalletAddress, u64>,
    }
1196
    /// Legacy v2 snapshot shape in which `applied_deposit_source_event_hashes`
    /// sits directly after `deposit_update_watermarks` (before
    /// `agent_authorizations`) and there is no `last_balance_update_seq` field.
    ///
    /// Serialized with `rmp_serde::to_vec` by the pre-append-layout migration
    /// test, which checks that the decoder keeps source hashes and agent
    /// authorizations in their own slots. Do not reorder fields.
    #[derive(Debug, Serialize)]
    struct LegacyV2SnapshotPreAppendOnlySourceHashes {
        version: u32,
        last_command_id: i64,
        last_l2_seq: i64,
        next_order_id: u64,
        next_trade_id: u64,
        expired_instruments: HashMap<String, bool>,
        orderbooks: HashMap<String, Vec<LegacyOrderSnapshotEntry>>,
        engine_positions: HashMap<(WalletAddress, String), crate::rsm::engine_deps::EnginePosition>,
        balance_ledger: HashMap<WalletAddress, Decimal>,
        deposit_update_watermarks:
            HashMap<WalletAddress, crate::rsm::engine_deps::DepositUpdateWatermark>,
        applied_deposit_source_event_hashes:
            std::collections::BTreeSet<alloy::primitives::FixedBytes<32>>,
        agent_authorizations: HashMap<WalletAddress, Vec<(WalletAddress, Option<u64>)>>,
        agent_auth_loaded: bool,
        nonce_watermarks: HashMap<WalletAddress, u64>,
        nonce_sets: HashMap<WalletAddress, Vec<u64>>,
        hypercore_account_equity: HashMap<WalletAddress, Decimal>,
        hypercore_equity_timestamps: HashMap<WalletAddress, u64>,
        perp_positions: HashMap<(String, String), crate::hypercore::PerpPosition>,
        spot_prices: HashMap<String, Decimal>,
        iv_surfaces: HashMap<String, VolatilitySurfaceSnapshot>,
        iv_source_timestamps: HashMap<String, i64>,
        instrument_trading_modes: HashMap<String, hypercall_types::TradingModes>,
        mmp_state: HashMap<(WalletAddress, String), crate::rsm::engine_deps::EngineMmpState>,
        liquidation_states: HashMap<WalletAddress, hypercall_types::LiquidationStateType>,
        wallet_margin_modes: HashMap<WalletAddress, crate::rsm::margin_mode::MarginMode>,
        mmp_enabled: HashMap<(WalletAddress, String), bool>,
        wallet_trading_limits: HashMap<WalletAddress, hypercall_types::api_models::TradingLimits>,
        default_trading_limits: hypercall_types::api_models::TradingLimits,
        wallet_tiers: HashMap<WalletAddress, String>,
        rsm_signer_nonces: HashMap<WalletAddress, u64>,
    }
1232
1233    fn serialize_snapshot_without_hypercore_equity_timestamps(
1234        snapshot: &EngineStateSnapshot,
1235    ) -> Vec<u8> {
1236        let mut value =
1237            rmp_serde::to_vec_named(snapshot).expect("serialize snapshot to named msgpack");
1238        let mut json: serde_json::Value =
1239            rmp_serde::from_slice(&value).expect("deserialize snapshot into json");
1240        let object = json
1241            .as_object_mut()
1242            .expect("named snapshot encoding must deserialize as an object");
1243        let removed = object.remove("hypercore_equity_timestamps");
1244        assert!(
1245            removed.is_some(),
1246            "test snapshot must contain hypercore_equity_timestamps"
1247        );
1248        value.clear();
1249        rmp_serde::encode::write_named(&mut value, &json)
1250            .expect("re-encode named snapshot without watermarks");
1251        value
1252    }
1253
1254    #[test]
1255    fn legacy_snapshot_without_balance_update_seq_restores_as_migration_base() {
1256        let dir = tempfile::tempdir().expect("create tempdir");
1257        let path = dir.path().join("old.engine_snapshot");
1258        let wallet = WalletAddress::from(alloy::primitives::Address::repeat_byte(0x42));
1259
1260        let mut orderbooks = HashMap::new();
1261        orderbooks.insert(
1262            "ETH-20260131-4000-C".to_string(),
1263            vec![LegacyOrderSnapshotEntry {
1264                order_id: 1,
1265                price: Decimal::new(1000, 1),
1266                quantity: Decimal::new(50, 1),
1267                side: Side::Buy,
1268                wallet: WalletAddress::from(alloy::primitives::Address::repeat_byte(1)),
1269                timestamp: 1000,
1270            }],
1271        );
1272        let mut balance_ledger = HashMap::new();
1273        balance_ledger.insert(wallet, Decimal::new(2500, 0));
1274
1275        let legacy = MinimalLegacyEngineStateSnapshot {
1276            version: LEGACY_SNAPSHOT_VERSION_V1,
1277            last_command_id: 42,
1278            last_l2_seq: 99,
1279            next_order_id: 100,
1280            next_trade_id: 50,
1281            expired_instruments: HashMap::new(),
1282            orderbooks,
1283            engine_positions: HashMap::new(),
1284            balance_ledger,
1285        };
1286
1287        // Write using raw msgpack + CRC (same file format, but legacy struct)
1288        let payload = rmp_serde::to_vec(&legacy).expect("serialize legacy");
1289        let crc = SNAPSHOT_CRC.checksum(&payload);
1290        let mut data = payload;
1291        data.extend_from_slice(&crc.to_le_bytes());
1292        std::fs::write(&path, &data).expect("write legacy snapshot");
1293
1294        let loaded = read_snapshot(&path)
1295            .expect("legacy snapshot should migrate")
1296            .expect("legacy snapshot should be present after migration");
1297
1298        assert_eq!(loaded.version, SNAPSHOT_VERSION);
1299        assert_eq!(loaded.last_command_id, 42);
1300        assert_eq!(loaded.last_l2_seq, 99);
1301        assert_eq!(
1302            loaded.last_balance_update_seq, LEGACY_BALANCE_UPDATE_SEQ_MIGRATION_EPOCH,
1303            "legacy snapshots enter CALL-1707 migration at balance update sequence zero"
1304        );
1305        assert_eq!(
1306            loaded.balance_ledger.get(&wallet),
1307            Some(&Decimal::new(2500, 0))
1308        );
1309        let orders = loaded
1310            .orderbooks
1311            .get("ETH-20260131-4000-C")
1312            .expect("legacy orderbook should be restored");
1313        assert_eq!(orders.len(), 1);
1314        assert_eq!(orders[0].order_id, 1);
1315        assert_eq!(orders[0].client_id, None);
1316        assert_eq!(orders[0].original_size, None);
1317    }
1318
1319    #[test]
1320    fn temp_call_1707_legacy_snapshot_tail_replay_e2e_allows_archived_prefix() {
1321        // Temporary CALL-1707 e2e-style regression: remove this with the
1322        // legacy snapshot migration hook after every environment writes v3
1323        // snapshots. This simulates staging retaining only the hot journal tail
1324        // after a legacy snapshot checkpoint.
1325        let dir = tempfile::tempdir().expect("create tempdir");
1326        let path = dir.path().join("legacy-tail-replay.engine_snapshot");
1327        let legacy = MinimalLegacyEngineStateSnapshot {
1328            version: LEGACY_SNAPSHOT_VERSION_V2,
1329            last_command_id: 407_647_670,
1330            last_l2_seq: 200_000_000,
1331            next_order_id: 100,
1332            next_trade_id: 50,
1333            expired_instruments: HashMap::new(),
1334            orderbooks: HashMap::new(),
1335            engine_positions: HashMap::new(),
1336            balance_ledger: HashMap::new(),
1337        };
1338
1339        let payload = rmp_serde::to_vec(&legacy).expect("serialize legacy");
1340        let crc = SNAPSHOT_CRC.checksum(&payload);
1341        let mut data = payload;
1342        data.extend_from_slice(&crc.to_le_bytes());
1343        std::fs::write(&path, &data).expect("write legacy snapshot");
1344
1345        let loaded = read_snapshot(&path)
1346            .expect("legacy snapshot should migrate")
1347            .expect("legacy snapshot should be present after migration");
1348        assert_eq!(loaded.last_command_id, 407_647_670);
1349        assert_eq!(
1350            loaded.last_balance_update_seq,
1351            LEGACY_BALANCE_UPDATE_SEQ_MIGRATION_EPOCH
1352        );
1353
1354        crate::rsm::unified_engine::validate_hot_journal_covers_base_replay(
1355            416_984_317,
1356            Some(hypercall_db::JournalCommandIdBounds {
1357                min_command_id: 407_647_671,
1358                max_command_id: 416_984_317,
1359            }),
1360            loaded.last_command_id + 1,
1361            None,
1362        )
1363        .expect("legacy snapshot migration should allow replay from the retained hot journal tail");
1364
1365        let error = crate::rsm::unified_engine::validate_hot_journal_covers_base_replay(
1366            416_984_317,
1367            Some(hypercall_db::JournalCommandIdBounds {
1368                min_command_id: 407_647_672,
1369                max_command_id: 416_984_317,
1370            }),
1371            loaded.last_command_id + 1,
1372            None,
1373        )
1374        .expect_err("a missing first tail command must still fail closed");
1375        assert!(
1376            error.contains("requires replay from command_id 407647671"),
1377            "error should identify the missing post-snapshot tail command: {error}"
1378        );
1379    }
1380
1381    #[test]
1382    fn legacy_map_snapshot_without_version_is_ignored() {
1383        let dir = tempfile::tempdir().expect("create tempdir");
1384        let path = dir.path().join("legacy-map.engine_snapshot");
1385
1386        let mut payload = Vec::new();
1387        rmp_serde::encode::write_named(
1388            &mut payload,
1389            &serde_json::json!({
1390                "legacy_snapshot": true
1391            }),
1392        )
1393        .expect("serialize legacy map snapshot");
1394        let crc = SNAPSHOT_CRC.checksum(&payload);
1395        let mut data = payload;
1396        data.extend_from_slice(&crc.to_le_bytes());
1397        std::fs::write(&path, &data).expect("write legacy map snapshot");
1398
1399        assert!(
1400            read_snapshot(&path)
1401                .expect("legacy map snapshot should be ignored")
1402                .is_none(),
1403            "legacy map-shaped snapshot must be discarded so WAL replay rebuilds engine state"
1404        );
1405    }
1406
1407    #[test]
1408    fn legacy_v2_snapshot_pre_append_only_source_hash_layout_migrates() {
1409        let dir = tempfile::tempdir().expect("create tempdir");
1410        let path = dir
1411            .path()
1412            .join("legacy-v2-pre-append-source-hash.engine_snapshot");
1413        let owner = WalletAddress::from(alloy::primitives::Address::repeat_byte(0x22));
1414        let agent = WalletAddress::from(alloy::primitives::Address::repeat_byte(0x23));
1415        let source_event_hash = alloy::primitives::FixedBytes::<32>::from([0x7a; 32]);
1416        let mut source_event_hashes = std::collections::BTreeSet::new();
1417        source_event_hashes.insert(source_event_hash);
1418        let mut agent_authorizations = HashMap::new();
1419        agent_authorizations.insert(owner, vec![(agent, Some(5555))]);
1420
1421        let legacy = LegacyV2SnapshotPreAppendOnlySourceHashes {
1422            version: LEGACY_SNAPSHOT_VERSION_V2,
1423            last_command_id: 407_647_670,
1424            last_l2_seq: 200_000_000,
1425            next_order_id: 100,
1426            next_trade_id: 50,
1427            expired_instruments: HashMap::new(),
1428            orderbooks: HashMap::new(),
1429            engine_positions: HashMap::new(),
1430            balance_ledger: HashMap::new(),
1431            deposit_update_watermarks: HashMap::new(),
1432            applied_deposit_source_event_hashes: source_event_hashes,
1433            agent_authorizations,
1434            agent_auth_loaded: true,
1435            nonce_watermarks: HashMap::new(),
1436            nonce_sets: HashMap::new(),
1437            hypercore_account_equity: HashMap::new(),
1438            hypercore_equity_timestamps: HashMap::new(),
1439            perp_positions: HashMap::new(),
1440            spot_prices: HashMap::new(),
1441            iv_surfaces: HashMap::new(),
1442            iv_source_timestamps: HashMap::new(),
1443            instrument_trading_modes: HashMap::new(),
1444            mmp_state: HashMap::new(),
1445            liquidation_states: HashMap::new(),
1446            wallet_margin_modes: HashMap::new(),
1447            mmp_enabled: HashMap::new(),
1448            wallet_trading_limits: HashMap::new(),
1449            default_trading_limits: Default::default(),
1450            wallet_tiers: HashMap::new(),
1451            rsm_signer_nonces: HashMap::new(),
1452        };
1453
1454        let payload = rmp_serde::to_vec(&legacy).expect("serialize pre-append legacy v2 snapshot");
1455        let crc = SNAPSHOT_CRC.checksum(&payload);
1456        let mut data = payload;
1457        data.extend_from_slice(&crc.to_le_bytes());
1458        std::fs::write(&path, &data).expect("write legacy snapshot");
1459
1460        let loaded = read_snapshot(&path)
1461            .expect("pre-append legacy v2 snapshot should migrate")
1462            .expect("legacy snapshot should be present after migration");
1463
1464        assert_eq!(loaded.version, SNAPSHOT_VERSION);
1465        assert_eq!(loaded.last_command_id, 407_647_670);
1466        assert_eq!(
1467            loaded.last_balance_update_seq, LEGACY_BALANCE_UPDATE_SEQ_MIGRATION_EPOCH,
1468            "pre-append legacy snapshots enter CALL-1707 migration at balance update sequence zero"
1469        );
1470        assert!(
1471            loaded
1472                .applied_deposit_source_event_hashes
1473                .contains(&source_event_hash),
1474            "fallback decoder must preserve source event hash state from the old positional slot"
1475        );
1476        assert_eq!(
1477            loaded.agent_authorizations.get(&owner),
1478            Some(&vec![(agent, Some(5555))]),
1479            "fallback decoder must not shift source hashes into agent authorizations"
1480        );
1481    }
1482
1483    #[test]
1484    fn snapshot_backwards_compat_deposit_source_hash_field_appended() {
1485        let dir = tempfile::tempdir().expect("create tempdir");
1486        let path = dir
1487            .path()
1488            .join("old-v3-before-deposit-source-hash.engine_snapshot");
1489        let owner = WalletAddress::from(alloy::primitives::Address::repeat_byte(2));
1490        let agent = WalletAddress::from(alloy::primitives::Address::repeat_byte(3));
1491        let mut agent_authorizations = HashMap::new();
1492        agent_authorizations.insert(owner, vec![(agent, Some(1234))]);
1493
1494        let legacy = SnapshotBeforeDepositSourceHashField {
1495            version: SNAPSHOT_VERSION,
1496            last_command_id: 622489,
1497            last_l2_seq: 16735,
1498            next_order_id: 8478,
1499            next_trade_id: 8486,
1500            expired_instruments: HashMap::new(),
1501            orderbooks: HashMap::new(),
1502            engine_positions: HashMap::new(),
1503            balance_ledger: HashMap::new(),
1504            last_balance_update_seq: 12,
1505            deposit_update_watermarks: HashMap::new(),
1506            agent_authorizations,
1507            agent_auth_loaded: true,
1508            nonce_watermarks: HashMap::new(),
1509            nonce_sets: HashMap::new(),
1510            hypercore_account_equity: HashMap::new(),
1511            hypercore_equity_timestamps: HashMap::new(),
1512            perp_positions: HashMap::new(),
1513            spot_prices: HashMap::new(),
1514            iv_surfaces: HashMap::new(),
1515            iv_source_timestamps: HashMap::new(),
1516            instrument_trading_modes: HashMap::new(),
1517            mmp_state: HashMap::new(),
1518            liquidation_states: HashMap::new(),
1519            wallet_margin_modes: HashMap::new(),
1520            mmp_enabled: HashMap::new(),
1521            wallet_trading_limits: HashMap::new(),
1522            default_trading_limits: Default::default(),
1523            wallet_tiers: HashMap::new(),
1524            rsm_signer_nonces: HashMap::new(),
1525        };
1526
1527        let payload = rmp_serde::to_vec(&legacy).expect("serialize legacy v2 snapshot");
1528        let crc = SNAPSHOT_CRC.checksum(&payload);
1529        let mut data = payload;
1530        data.extend_from_slice(&crc.to_le_bytes());
1531        std::fs::write(&path, &data).expect("write legacy v2 snapshot");
1532
1533        let loaded = read_snapshot(&path).expect("read").expect("should exist");
1534
1535        assert_eq!(loaded.last_command_id, 622489);
1536        assert_eq!(loaded.last_balance_update_seq, 12);
1537        assert_eq!(
1538            loaded.agent_authorizations.get(&owner),
1539            Some(&vec![(agent, Some(1234))])
1540        );
1541        assert!(loaded.applied_deposit_source_event_hashes.is_empty());
1542    }
1543
1544    #[test]
1545    fn snapshot_missing_balance_ledger_fails_fast() {
1546        let dir = tempfile::tempdir().expect("create tempdir");
1547        let path = dir.path().join("missing-balance-ledger.engine_snapshot");
1548
1549        let missing_balance = MissingBalanceLedgerSnapshot {
1550            version: SNAPSHOT_VERSION,
1551            last_command_id: 42,
1552            last_l2_seq: 99,
1553            next_order_id: 100,
1554            next_trade_id: 50,
1555            expired_instruments: HashMap::new(),
1556            orderbooks: HashMap::new(),
1557            engine_positions: HashMap::new(),
1558            last_balance_update_seq: 0,
1559        };
1560
1561        let payload = rmp_serde::to_vec(&missing_balance).expect("serialize missing balance");
1562        let crc = SNAPSHOT_CRC.checksum(&payload);
1563        let mut data = payload;
1564        data.extend_from_slice(&crc.to_le_bytes());
1565        std::fs::write(&path, &data).expect("write missing balance snapshot");
1566
1567        let err = read_snapshot(&path).expect_err("missing balance_ledger must fail");
1568        assert!(
1569            err.contains("failed to deserialize snapshot"),
1570            "unexpected error: {}",
1571            err
1572        );
1573    }
1574
1575    #[test]
1576    fn snapshot_hypercore_equity_without_watermarks_fails_fast() {
1577        let dir = tempfile::tempdir().expect("create tempdir");
1578        let path = dir
1579            .path()
1580            .join("missing-hypercore-equity-watermarks.engine_snapshot");
1581        let wallet = WalletAddress::from(alloy::primitives::Address::repeat_byte(0x13));
1582        let mut snapshot = make_test_snapshot();
1583        snapshot.version = SNAPSHOT_VERSION;
1584        snapshot.last_command_id = 42;
1585        snapshot.last_l2_seq = 99;
1586        snapshot.next_order_id = 100;
1587        snapshot.next_trade_id = 50;
1588        snapshot.balance_ledger.clear();
1589        snapshot.orderbooks.clear();
1590        snapshot.engine_positions.clear();
1591        snapshot.hypercore_account_equity.clear();
1592        snapshot.hypercore_equity_timestamps.clear();
1593        snapshot
1594            .hypercore_account_equity
1595            .insert(wallet, Decimal::new(5000, 0));
1596
1597        let payload = serialize_snapshot_without_hypercore_equity_timestamps(&snapshot);
1598        let crc = SNAPSHOT_CRC.checksum(&payload);
1599        let mut data = payload;
1600        data.extend_from_slice(&crc.to_le_bytes());
1601        std::fs::write(&path, &data).expect("write missing watermarks snapshot");
1602
1603        let err = read_snapshot(&path).expect_err("missing equity watermarks must fail");
1604        assert!(
1605            err.contains("HyperCore equity entries without timestamp watermarks")
1606                || err.contains("failed to deserialize snapshot"),
1607            "unexpected error: {}",
1608            err
1609        );
1610    }
1611
1612    #[test]
1613    fn legacy_v1_snapshot_hypercore_equity_without_watermarks_migrates_zero_watermarks() {
1614        let dir = tempfile::tempdir().expect("create tempdir");
1615        let path = dir
1616            .path()
1617            .join("legacy-v1-hypercore-equity.engine_snapshot");
1618        let wallet = WalletAddress::from(alloy::primitives::Address::repeat_byte(0x14));
1619        let mut snapshot = make_test_snapshot();
1620        snapshot.version = LEGACY_SNAPSHOT_VERSION_V1;
1621        snapshot.last_command_id = 42;
1622        snapshot.last_l2_seq = 99;
1623        snapshot.next_order_id = 100;
1624        snapshot.next_trade_id = 50;
1625        snapshot.balance_ledger.clear();
1626        snapshot.orderbooks.clear();
1627        snapshot.engine_positions.clear();
1628        snapshot.hypercore_account_equity.clear();
1629        snapshot.hypercore_equity_timestamps.clear();
1630        snapshot
1631            .hypercore_account_equity
1632            .insert(wallet, Decimal::new(5000, 0));
1633
1634        let payload = serialize_snapshot_without_hypercore_equity_timestamps(&snapshot);
1635        let crc = SNAPSHOT_CRC.checksum(&payload);
1636        let mut data = payload;
1637        data.extend_from_slice(&crc.to_le_bytes());
1638        std::fs::write(&path, &data).expect("write legacy snapshot");
1639
1640        let loaded = read_snapshot(&path)
1641            .expect("legacy snapshot should migrate")
1642            .expect("legacy snapshot should be present after migration");
1643
1644        assert_eq!(loaded.version, SNAPSHOT_VERSION);
1645        assert_eq!(
1646            loaded.hypercore_equity_timestamps.get(&wallet),
1647            Some(&0),
1648            "temporary legacy migration seeds missing HyperCore equity timestamps at zero"
1649        );
1650    }
1651}