1use crate::rsm::engine_deps::EngineCtx;
13use crc::{Crc, CRC_32_ISO_HDLC};
14use hypercall_journal::checkpoint::WalCheckpointMetadata;
15use hypercall_types::{Side, WalletAddress};
16use rust_decimal::Decimal;
17use serde::{Deserialize, Serialize};
18use std::collections::HashMap;
19use std::fs::{create_dir_all, OpenOptions};
20use std::io::Write;
21use std::path::{Path, PathBuf};
22use tracing::{info, warn};
23
/// CRC-32 (ISO-HDLC) used for the 4-byte checksum trailer that `write_snapshot`
/// appends to the payload and `read_snapshot` verifies.
const SNAPSHOT_CRC: Crc<u32> = Crc::<u32>::new(&CRC_32_ISO_HDLC);

/// Version written by `build_snapshot`, and the only version `read_snapshot`
/// deserializes directly as an `EngineStateSnapshot`.
const SNAPSHOT_VERSION: u32 = 3;
/// Legacy version that `read_snapshot` decodes through the legacy layouts and migrates.
const LEGACY_SNAPSHOT_VERSION_V2: u32 = 2;
/// Legacy version that `read_snapshot` decodes through the legacy layouts and migrates.
const LEGACY_SNAPSHOT_VERSION_V1: u32 = 1;
/// Value assigned to `last_balance_update_seq` when a legacy snapshot is migrated,
/// because the legacy layouts do not carry that field.
const LEGACY_BALANCE_UPDATE_SEQ_MIGRATION_EPOCH: u64 = 0;
31
/// Parses a MessagePack array header at the start of `payload`.
///
/// Returns `(element_count, header_len_in_bytes)` for fixarray (`0x90..=0x9f`),
/// array16 (`0xdc`) and array32 (`0xdd`) headers, or `None` when the payload is
/// empty, starts with any other marker, or is too short to hold the length bytes.
fn msgpack_array_start(payload: &[u8]) -> Option<(usize, usize)> {
    match payload {
        // fixarray: the count lives in the low nibble of the marker itself.
        [tag @ 0x90..=0x9f, ..] => Some((usize::from(*tag & 0x0f), 1)),
        // array16: big-endian u16 count follows the marker.
        [0xdc, hi, lo, ..] => Some((usize::from(u16::from_be_bytes([*hi, *lo])), 3)),
        // array32: big-endian u32 count follows the marker.
        [0xdd, b0, b1, b2, b3, ..] => Some((u32::from_be_bytes([*b0, *b1, *b2, *b3]) as usize, 5)),
        _ => None,
    }
}
47
/// Parses a MessagePack map header at the start of `payload`.
///
/// Returns `(entry_count, header_len_in_bytes)` for fixmap (`0x80..=0x8f`),
/// map16 (`0xde`) and map32 (`0xdf`) headers, or `None` when the payload is
/// empty, starts with any other marker, or is too short to hold the length bytes.
fn msgpack_map_start(payload: &[u8]) -> Option<(usize, usize)> {
    match payload {
        // fixmap: the count lives in the low nibble of the marker itself.
        [tag @ 0x80..=0x8f, ..] => Some((usize::from(*tag & 0x0f), 1)),
        // map16: big-endian u16 count follows the marker.
        [0xde, hi, lo, ..] => Some((usize::from(u16::from_be_bytes([*hi, *lo])), 3)),
        // map32: big-endian u32 count follows the marker.
        [0xdf, b0, b1, b2, b3, ..] => Some((u32::from_be_bytes([*b0, *b1, *b2, *b3]) as usize, 5)),
        _ => None,
    }
}
63
/// Decodes an unsigned MessagePack integer located at `offset` in `payload`.
///
/// Accepts positive fixint (`0x00..=0x7f`), uint8 (`0xcc`), uint16 (`0xcd`) and
/// uint32 (`0xce`). Any other marker, an out-of-range `offset`, or a truncated
/// value yields `None`.
fn msgpack_u32_at(payload: &[u8], offset: usize) -> Option<u32> {
    // `get(offset..)` is `None` past the end and an empty slice exactly at the
    // end; the empty slice falls into the catch-all arm below.
    match payload.get(offset..)? {
        [small @ 0x00..=0x7f, ..] => Some(u32::from(*small)),
        [0xcc, byte, ..] => Some(u32::from(*byte)),
        [0xcd, hi, lo, ..] => Some(u32::from(u16::from_be_bytes([*hi, *lo]))),
        [0xce, b0, b1, b2, b3, ..] => Some(u32::from_be_bytes([*b0, *b1, *b2, *b3])),
        _ => None,
    }
}
80
/// Decodes a MessagePack string located at `offset` in `payload`.
///
/// Accepts fixstr (`0xa0..=0xbf`), str8 (`0xd9`), str16 (`0xda`) and str32
/// (`0xdb`). On success returns the borrowed UTF-8 text together with the
/// offset of the first byte after the string. Returns `None` for any other
/// marker, a truncated header or body, or bytes that are not valid UTF-8.
fn msgpack_str_at(payload: &[u8], offset: usize) -> Option<(&str, usize)> {
    let (text_len, header_len) = match payload.get(offset..)? {
        // fixstr: length is the low five bits of the marker.
        [tag @ 0xa0..=0xbf, ..] => (usize::from(*tag & 0x1f), 1),
        [0xd9, len, ..] => (usize::from(*len), 2),
        [0xda, hi, lo, ..] => (usize::from(u16::from_be_bytes([*hi, *lo])), 3),
        [0xdb, b0, b1, b2, b3, ..] => (u32::from_be_bytes([*b0, *b1, *b2, *b3]) as usize, 5),
        _ => return None,
    };

    let start = offset + header_len;
    // The declared length is untrusted, so guard the addition against overflow.
    let end = start.checked_add(text_len)?;
    let raw = payload.get(start..end)?;
    let text = std::str::from_utf8(raw).ok()?;
    Some((text, end))
}
101
102fn snapshot_payload_version(payload: &[u8]) -> Option<u32> {
103 if let Some((len, offset)) = msgpack_array_start(payload) {
104 if len == 0 {
105 return None;
106 }
107 return msgpack_u32_at(payload, offset);
108 }
109
110 if let Some((len, offset)) = msgpack_map_start(payload) {
111 if len == 0 {
112 return None;
113 }
114 if let Some((key, value_offset)) = msgpack_str_at(payload, offset) {
115 if key == "version" {
116 return msgpack_u32_at(payload, value_offset);
117 }
118 }
119 }
120
121 let value: serde_json::Value = rmp_serde::from_slice(payload).ok()?;
122 match value {
123 serde_json::Value::Array(fields) => fields.first()?.as_u64()?.try_into().ok(),
124 serde_json::Value::Object(fields) => fields.get("version")?.as_u64()?.try_into().ok(),
125 _ => None,
126 }
127}
128
/// Engine state captured by `build_snapshot` and persisted by `write_snapshot`.
///
/// The payload is produced with `rmp_serde::to_vec`, and `snapshot_payload_version`
/// reads the first array element as the version, so `version` must remain the
/// first field.
/// NOTE(review): rmp_serde's default struct encoding appears to be positional;
/// confirm before reordering, inserting, or removing fields.
#[derive(Debug, Serialize, Deserialize)]
pub struct EngineStateSnapshot {
    /// Format version; `build_snapshot` always writes `SNAPSHOT_VERSION`.
    pub version: u32,
    /// Copied from the WAL checkpoint metadata when the snapshot is built.
    pub last_command_id: i64,
    /// Copied from the WAL checkpoint metadata when the snapshot is built.
    pub last_l2_seq: i64,
    /// Copied from the engine context's `next_order_id`.
    pub next_order_id: u64,
    /// Copied from the engine context's `next_trade_id`.
    pub next_trade_id: u64,
    /// Clone of the engine context's `expired_instruments` map.
    pub expired_instruments: HashMap<String, bool>,
    /// Orders per orderbook key, as returned by each book's `get_all_orders()`.
    pub orderbooks: HashMap<String, Vec<OrderSnapshotEntry>>,
    /// Clone of the engine context's `engine_positions` map.
    #[serde(default)]
    pub engine_positions: HashMap<(WalletAddress, String), crate::rsm::engine_deps::EnginePosition>,
    /// Result of the balance ledger's `snapshot_map()`.
    pub balance_ledger: HashMap<WalletAddress, rust_decimal::Decimal>,
    /// Result of the balance ledger's `last_balance_update_seq()`; set to
    /// `LEGACY_BALANCE_UPDATE_SEQ_MIGRATION_EPOCH` when migrated from a legacy snapshot.
    pub last_balance_update_seq: u64,
    /// Clone of the engine context's `deposit_update_watermarks` map.
    #[serde(default)]
    pub deposit_update_watermarks:
        HashMap<WalletAddress, crate::rsm::engine_deps::DepositUpdateWatermark>,
    /// Per wallet, its agent entries sorted by agent address at build time.
    /// NOTE(review): the `Option<u64>` is presumably an expiry; confirm against `EngineCtx`.
    #[serde(default)]
    pub agent_authorizations: HashMap<WalletAddress, Vec<(WalletAddress, Option<u64>)>>,
    /// `build_snapshot` always writes `true`; defaults to `false` when absent.
    #[serde(default)]
    pub agent_auth_loaded: bool,
    /// `build_snapshot` always writes an empty map here.
    #[serde(default)]
    pub nonce_watermarks: HashMap<WalletAddress, u64>,
    /// Per-signer nonce sets with expired entries purged at build time; signers
    /// whose set is empty after purging are omitted.
    #[serde(default)]
    pub nonce_sets: HashMap<WalletAddress, Vec<u64>>,
    /// Clone of the engine deps' `hypercore_account_equity` map.
    #[serde(default)]
    pub hypercore_account_equity: HashMap<WalletAddress, rust_decimal::Decimal>,
    /// Must contain an entry for every wallet in `hypercore_account_equity`:
    /// `build_snapshot` panics and `read_snapshot` errors when one is missing.
    #[serde(default)]
    pub hypercore_equity_timestamps: HashMap<WalletAddress, u64>,
    /// Clone of the engine deps' `perp_positions` map.
    #[serde(default)]
    pub perp_positions: HashMap<(String, String), crate::hypercore::PerpPosition>,
    /// Clone of the engine context's `spot_prices` map.
    #[serde(default)]
    pub spot_prices: HashMap<String, rust_decimal::Decimal>,
    /// Volatility surfaces converted to their snapshot form, keyed as in the engine context.
    #[serde(default)]
    pub iv_surfaces: HashMap<String, VolatilitySurfaceSnapshot>,
    /// Clone of the engine context's `iv_source_timestamps` map.
    #[serde(default)]
    pub iv_source_timestamps: HashMap<String, i64>,
    /// Clone of the engine context's `instrument_trading_modes` map.
    #[serde(default)]
    pub instrument_trading_modes: HashMap<String, hypercall_types::TradingModes>,
    /// Clone of the engine context's `mmp_state` map.
    #[serde(default)]
    pub mmp_state: HashMap<(WalletAddress, String), crate::rsm::engine_deps::EngineMmpState>,
    /// Clone of the engine deps' `liquidation_states` map.
    #[serde(default)]
    pub liquidation_states: HashMap<WalletAddress, hypercall_types::LiquidationStateType>,
    /// Clone of the engine deps' `wallet_margin_modes` map.
    #[serde(default)]
    pub wallet_margin_modes: HashMap<WalletAddress, crate::rsm::margin_mode::MarginMode>,
    /// Clone of the engine deps' `mmp_enabled` map.
    #[serde(default)]
    pub mmp_enabled: HashMap<(WalletAddress, String), bool>,
    /// Clone of the engine deps' `wallet_trading_limits` map.
    #[serde(default)]
    pub wallet_trading_limits: HashMap<WalletAddress, hypercall_types::api_models::TradingLimits>,
    /// Copy of the engine deps' `default_trading_limits`.
    #[serde(default)]
    pub default_trading_limits: hypercall_types::api_models::TradingLimits,
    /// Clone of the engine deps' `wallet_tiers` map.
    #[serde(default)]
    pub wallet_tiers: HashMap<WalletAddress, String>,
    /// Clone of the engine context's `rsm_signer_nonces` map.
    #[serde(default)]
    pub rsm_signer_nonces: HashMap<WalletAddress, u64>,
    /// Clone of the engine context's `applied_deposit_source_event_hashes` set.
    #[serde(default)]
    pub applied_deposit_source_event_hashes:
        std::collections::BTreeSet<alloy::primitives::FixedBytes<32>>,
    /// Clone of the engine context's `pm_settlement_state`. Absent from the
    /// legacy layouts, so migrated snapshots get the `Default` value.
    #[serde(default)]
    pub pm_settlement_state: crate::rsm::portfolio_margin::settlement_state::PmSettlementState,
}
226
/// Deserialize-only layout for legacy (version 1 / 2) snapshots.
///
/// Compared with `EngineStateSnapshot` it lacks `last_balance_update_seq` and
/// `pm_settlement_state`; `into_migration_snapshot` fills those in. This is the
/// first layout `decode_legacy_engine_state_snapshot` tries, reported there as
/// `"append-only-v1-v2"`.
/// NOTE(review): field order is presumably part of the wire layout; confirm
/// before reordering.
#[derive(Debug, Deserialize)]
struct LegacyEngineStateSnapshot {
    // Checked by `read_snapshot` against the probed version (must be 1 or 2).
    version: u32,
    last_command_id: i64,
    last_l2_seq: i64,
    next_order_id: u64,
    next_trade_id: u64,
    expired_instruments: HashMap<String, bool>,
    orderbooks: HashMap<String, Vec<OrderSnapshotEntry>>,
    #[serde(default)]
    engine_positions: HashMap<(WalletAddress, String), crate::rsm::engine_deps::EnginePosition>,
    balance_ledger: HashMap<WalletAddress, rust_decimal::Decimal>,
    #[serde(default)]
    deposit_update_watermarks:
        HashMap<WalletAddress, crate::rsm::engine_deps::DepositUpdateWatermark>,
    #[serde(default)]
    agent_authorizations: HashMap<WalletAddress, Vec<(WalletAddress, Option<u64>)>>,
    #[serde(default)]
    agent_auth_loaded: bool,
    #[serde(default)]
    nonce_watermarks: HashMap<WalletAddress, u64>,
    #[serde(default)]
    nonce_sets: HashMap<WalletAddress, Vec<u64>>,
    #[serde(default)]
    hypercore_account_equity: HashMap<WalletAddress, rust_decimal::Decimal>,
    // May lack entries for wallets present in `hypercore_account_equity`;
    // `into_migration_snapshot` seeds the missing ones with 0.
    #[serde(default)]
    hypercore_equity_timestamps: HashMap<WalletAddress, u64>,
    #[serde(default)]
    perp_positions: HashMap<(String, String), crate::hypercore::PerpPosition>,
    #[serde(default)]
    spot_prices: HashMap<String, rust_decimal::Decimal>,
    #[serde(default)]
    iv_surfaces: HashMap<String, VolatilitySurfaceSnapshot>,
    #[serde(default)]
    iv_source_timestamps: HashMap<String, i64>,
    #[serde(default)]
    instrument_trading_modes: HashMap<String, hypercall_types::TradingModes>,
    #[serde(default)]
    mmp_state: HashMap<(WalletAddress, String), crate::rsm::engine_deps::EngineMmpState>,
    #[serde(default)]
    liquidation_states: HashMap<WalletAddress, hypercall_types::LiquidationStateType>,
    #[serde(default)]
    wallet_margin_modes: HashMap<WalletAddress, crate::rsm::margin_mode::MarginMode>,
    #[serde(default)]
    mmp_enabled: HashMap<(WalletAddress, String), bool>,
    #[serde(default)]
    wallet_trading_limits: HashMap<WalletAddress, hypercall_types::api_models::TradingLimits>,
    #[serde(default)]
    default_trading_limits: hypercall_types::api_models::TradingLimits,
    #[serde(default)]
    wallet_tiers: HashMap<WalletAddress, String>,
    #[serde(default)]
    rsm_signer_nonces: HashMap<WalletAddress, u64>,
    // Last field in this layout; the pre-append-only v2 layout declares it
    // right after `deposit_update_watermarks` instead.
    #[serde(default)]
    applied_deposit_source_event_hashes:
        std::collections::BTreeSet<alloy::primitives::FixedBytes<32>>,
}
290
/// Deserialize-only fallback layout for legacy snapshots.
///
/// It has the same fields as `LegacyEngineStateSnapshot`, except that
/// `applied_deposit_source_event_hashes` is declared immediately after
/// `deposit_update_watermarks` rather than last.
/// `decode_legacy_engine_state_snapshot` tries this layout only after the
/// append-only layout fails, and reports it as `"pre-append-only-v2"`.
/// NOTE(review): field order is presumably part of the wire layout; confirm
/// before reordering.
#[derive(Debug, Deserialize)]
struct LegacyEngineStateSnapshotPreAppendOnlyV2 {
    version: u32,
    last_command_id: i64,
    last_l2_seq: i64,
    next_order_id: u64,
    next_trade_id: u64,
    expired_instruments: HashMap<String, bool>,
    orderbooks: HashMap<String, Vec<OrderSnapshotEntry>>,
    #[serde(default)]
    engine_positions: HashMap<(WalletAddress, String), crate::rsm::engine_deps::EnginePosition>,
    balance_ledger: HashMap<WalletAddress, rust_decimal::Decimal>,
    #[serde(default)]
    deposit_update_watermarks:
        HashMap<WalletAddress, crate::rsm::engine_deps::DepositUpdateWatermark>,
    // Declared here, mid-struct, in this layout; the append-only layout has it last.
    #[serde(default)]
    applied_deposit_source_event_hashes:
        std::collections::BTreeSet<alloy::primitives::FixedBytes<32>>,
    #[serde(default)]
    agent_authorizations: HashMap<WalletAddress, Vec<(WalletAddress, Option<u64>)>>,
    #[serde(default)]
    agent_auth_loaded: bool,
    #[serde(default)]
    nonce_watermarks: HashMap<WalletAddress, u64>,
    #[serde(default)]
    nonce_sets: HashMap<WalletAddress, Vec<u64>>,
    #[serde(default)]
    hypercore_account_equity: HashMap<WalletAddress, rust_decimal::Decimal>,
    #[serde(default)]
    hypercore_equity_timestamps: HashMap<WalletAddress, u64>,
    #[serde(default)]
    perp_positions: HashMap<(String, String), crate::hypercore::PerpPosition>,
    #[serde(default)]
    spot_prices: HashMap<String, rust_decimal::Decimal>,
    #[serde(default)]
    iv_surfaces: HashMap<String, VolatilitySurfaceSnapshot>,
    #[serde(default)]
    iv_source_timestamps: HashMap<String, i64>,
    #[serde(default)]
    instrument_trading_modes: HashMap<String, hypercall_types::TradingModes>,
    #[serde(default)]
    mmp_state: HashMap<(WalletAddress, String), crate::rsm::engine_deps::EngineMmpState>,
    #[serde(default)]
    liquidation_states: HashMap<WalletAddress, hypercall_types::LiquidationStateType>,
    #[serde(default)]
    wallet_margin_modes: HashMap<WalletAddress, crate::rsm::margin_mode::MarginMode>,
    #[serde(default)]
    mmp_enabled: HashMap<(WalletAddress, String), bool>,
    #[serde(default)]
    wallet_trading_limits: HashMap<WalletAddress, hypercall_types::api_models::TradingLimits>,
    #[serde(default)]
    default_trading_limits: hypercall_types::api_models::TradingLimits,
    #[serde(default)]
    wallet_tiers: HashMap<WalletAddress, String>,
    #[serde(default)]
    rsm_signer_nonces: HashMap<WalletAddress, u64>,
}
352
353impl From<LegacyEngineStateSnapshotPreAppendOnlyV2> for LegacyEngineStateSnapshot {
354 fn from(snapshot: LegacyEngineStateSnapshotPreAppendOnlyV2) -> Self {
355 Self {
356 version: snapshot.version,
357 last_command_id: snapshot.last_command_id,
358 last_l2_seq: snapshot.last_l2_seq,
359 next_order_id: snapshot.next_order_id,
360 next_trade_id: snapshot.next_trade_id,
361 expired_instruments: snapshot.expired_instruments,
362 orderbooks: snapshot.orderbooks,
363 engine_positions: snapshot.engine_positions,
364 balance_ledger: snapshot.balance_ledger,
365 deposit_update_watermarks: snapshot.deposit_update_watermarks,
366 agent_authorizations: snapshot.agent_authorizations,
367 agent_auth_loaded: snapshot.agent_auth_loaded,
368 nonce_watermarks: snapshot.nonce_watermarks,
369 nonce_sets: snapshot.nonce_sets,
370 hypercore_account_equity: snapshot.hypercore_account_equity,
371 hypercore_equity_timestamps: snapshot.hypercore_equity_timestamps,
372 perp_positions: snapshot.perp_positions,
373 spot_prices: snapshot.spot_prices,
374 iv_surfaces: snapshot.iv_surfaces,
375 iv_source_timestamps: snapshot.iv_source_timestamps,
376 instrument_trading_modes: snapshot.instrument_trading_modes,
377 mmp_state: snapshot.mmp_state,
378 liquidation_states: snapshot.liquidation_states,
379 wallet_margin_modes: snapshot.wallet_margin_modes,
380 mmp_enabled: snapshot.mmp_enabled,
381 wallet_trading_limits: snapshot.wallet_trading_limits,
382 default_trading_limits: snapshot.default_trading_limits,
383 wallet_tiers: snapshot.wallet_tiers,
384 rsm_signer_nonces: snapshot.rsm_signer_nonces,
385 applied_deposit_source_event_hashes: snapshot.applied_deposit_source_event_hashes,
386 }
387 }
388}
389
390impl LegacyEngineStateSnapshot {
391 fn into_migration_snapshot(mut self, path: &Path) -> EngineStateSnapshot {
392 let missing_hypercore_equity_watermarks: Vec<WalletAddress> = self
393 .hypercore_account_equity
394 .keys()
395 .filter(|wallet| !self.hypercore_equity_timestamps.contains_key(wallet))
396 .copied()
397 .collect();
398 if !missing_hypercore_equity_watermarks.is_empty() {
399 warn!(
400 snapshot_path = %path.display(),
401 missing_watermark_count = missing_hypercore_equity_watermarks.len(),
402 "Legacy engine snapshot has HyperCore equity without timestamp watermarks; seeding zero watermarks for one-time v3 migration replay"
403 );
404 for wallet in missing_hypercore_equity_watermarks {
405 self.hypercore_equity_timestamps.insert(wallet, 0);
406 }
407 }
408
409 EngineStateSnapshot {
410 version: SNAPSHOT_VERSION,
411 last_command_id: self.last_command_id,
412 last_l2_seq: self.last_l2_seq,
413 next_order_id: self.next_order_id,
414 next_trade_id: self.next_trade_id,
415 expired_instruments: self.expired_instruments,
416 orderbooks: self.orderbooks,
417 engine_positions: self.engine_positions,
418 balance_ledger: self.balance_ledger,
419 last_balance_update_seq: LEGACY_BALANCE_UPDATE_SEQ_MIGRATION_EPOCH,
420 deposit_update_watermarks: self.deposit_update_watermarks,
421 agent_authorizations: self.agent_authorizations,
422 agent_auth_loaded: self.agent_auth_loaded,
423 nonce_watermarks: self.nonce_watermarks,
424 nonce_sets: self.nonce_sets,
425 hypercore_account_equity: self.hypercore_account_equity,
426 hypercore_equity_timestamps: self.hypercore_equity_timestamps,
427 perp_positions: self.perp_positions,
428 spot_prices: self.spot_prices,
429 iv_surfaces: self.iv_surfaces,
430 iv_source_timestamps: self.iv_source_timestamps,
431 instrument_trading_modes: self.instrument_trading_modes,
432 mmp_state: self.mmp_state,
433 liquidation_states: self.liquidation_states,
434 wallet_margin_modes: self.wallet_margin_modes,
435 mmp_enabled: self.mmp_enabled,
436 wallet_trading_limits: self.wallet_trading_limits,
437 default_trading_limits: self.default_trading_limits,
438 wallet_tiers: self.wallet_tiers,
439 rsm_signer_nonces: self.rsm_signer_nonces,
440 applied_deposit_source_event_hashes: self.applied_deposit_source_event_hashes,
441 pm_settlement_state:
442 crate::rsm::portfolio_margin::settlement_state::PmSettlementState::default(),
443 }
444 }
445}
446
447fn decode_legacy_engine_state_snapshot(
448 payload: &[u8],
449 path: &Path,
450) -> Result<(LegacyEngineStateSnapshot, &'static str), String> {
451 match rmp_serde::from_slice::<LegacyEngineStateSnapshot>(payload) {
452 Ok(snapshot) => Ok((snapshot, "append-only-v1-v2")),
453 Err(append_only_error) => {
454 let pre_append_snapshot =
455 rmp_serde::from_slice::<LegacyEngineStateSnapshotPreAppendOnlyV2>(payload)
456 .map_err(|pre_append_error| {
457 format!(
458 "failed to deserialize legacy snapshot {} for CALL-1707 migration replay; append-only layout error: {}; pre-append-only v2 layout error: {}",
459 path.display(),
460 append_only_error,
461 pre_append_error,
462 )
463 })?;
464 Ok((pre_append_snapshot.into(), "pre-append-only-v2"))
465 }
466 }
467}
468
/// Serializable form of a `VolatilitySurface`, built from the surface's export
/// methods and turned back into a surface by `into_surface`.
///
/// Every field defaults to empty when absent from the payload.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct VolatilitySurfaceSnapshot {
    /// Points from `export_all_points()`; each is re-inserted by strike, expiry and IV.
    #[serde(default)]
    pub strike_points: Vec<crate::vol_oracle::vol_surface_cache::VolPoint>,
    /// Pairs from `export_atm_vols()`, restored as `set_atm_vol(expiry, iv)`.
    #[serde(default)]
    pub atm_vols: Vec<(i64, f64)>,
    /// Curves from `export_delta_curves()`; each point is restored via `set_delta_iv`.
    #[serde(default)]
    pub delta_curves: Vec<crate::vol_oracle::vol_surface_cache::DeltaCurveExport>,
}
478
479impl From<&crate::vol_oracle::vol_surface_cache::VolatilitySurface> for VolatilitySurfaceSnapshot {
480 fn from(surface: &crate::vol_oracle::vol_surface_cache::VolatilitySurface) -> Self {
481 Self {
482 strike_points: surface.export_all_points(),
483 atm_vols: surface.export_atm_vols(),
484 delta_curves: surface.export_delta_curves(),
485 }
486 }
487}
488
489impl VolatilitySurfaceSnapshot {
490 pub fn into_surface(self) -> crate::vol_oracle::vol_surface_cache::VolatilitySurface {
491 let mut surface = crate::vol_oracle::vol_surface_cache::VolatilitySurface::new();
492 for point in self.strike_points {
493 surface.insert(point.strike, point.expiry, point.iv);
494 }
495 for (expiry, iv) in self.atm_vols {
496 surface.set_atm_vol(expiry, iv);
497 }
498 for curve in self.delta_curves {
499 for point in curve.points {
500 surface.set_delta_iv(curve.expiry, point.delta, point.iv);
501 }
502 }
503 surface
504 }
505}
506
/// One order as stored inside `EngineStateSnapshot::orderbooks`.
///
/// `build_snapshot` fills every field from the corresponding field of an entry
/// returned by the orderbook's `get_all_orders()`.
#[derive(Debug, Serialize, Deserialize)]
pub struct OrderSnapshotEntry {
    pub order_id: u64,
    pub price: Decimal,
    pub quantity: Decimal,
    pub side: Side,
    pub wallet: WalletAddress,
    // NOTE(review): unit (ms vs ns) is not visible here; confirm against the orderbook.
    pub timestamp: u64,
    /// Defaults to `None` when absent from the payload.
    #[serde(default)]
    pub client_id: Option<String>,
    /// Defaults to `false` when absent from the payload.
    #[serde(default)]
    pub mmp_enabled: bool,
    /// `build_snapshot` always writes `Some(..)`; `None` only results from a
    /// payload that lacks the field.
    #[serde(default)]
    pub original_size: Option<Decimal>,
}
530
/// Derives the engine snapshot path from a WAL path by appending the literal
/// suffix `.engine_snapshot` to the full path. The existing extension is kept,
/// not replaced.
pub fn snapshot_path_from_wal_path(wal_path: &Path) -> PathBuf {
    // Work on the OS string so non-UTF-8 paths survive untouched.
    let mut snapshot_name = wal_path.as_os_str().to_owned();
    snapshot_name.push(".engine_snapshot");
    snapshot_name.into()
}
537
538pub fn build_snapshot(ctx: &EngineCtx, checkpoint: &WalCheckpointMetadata) -> EngineStateSnapshot {
545 let mut orderbooks = HashMap::new();
546 let mut total_orders = 0u64;
547
548 for (symbol, orderbook) in &ctx.orderbooks {
549 let all_orders = orderbook.get_all_orders();
550 let entries: Vec<OrderSnapshotEntry> = all_orders
551 .into_iter()
552 .map(|r| OrderSnapshotEntry {
553 order_id: r.order_id,
554 price: r.price,
555 quantity: r.quantity,
556 side: r.side,
557 wallet: r.wallet,
558 timestamp: r.timestamp,
559 client_id: r.client_id,
560 mmp_enabled: r.mmp_enabled,
561 original_size: Some(r.original_size),
562 })
563 .collect();
564 total_orders += entries.len() as u64;
565 orderbooks.insert(symbol.clone(), entries);
566 }
567
568 info!(
569 "Built engine state snapshot: last_command_id={}, last_l2_seq={}, \
570 next_order_id={}, next_trade_id={}, orderbooks={}, orders={}",
571 checkpoint.last_command_id,
572 checkpoint.last_l2_seq,
573 ctx.next_order_id,
574 ctx.next_trade_id,
575 orderbooks.len(),
576 total_orders,
577 );
578
579 let agent_authorizations: HashMap<WalletAddress, Vec<(WalletAddress, Option<u64>)>> = ctx
580 .agent_authorizations
581 .iter()
582 .map(|(wallet, agents)| {
583 let mut sorted: Vec<(WalletAddress, Option<u64>)> =
584 agents.iter().map(|(a, e)| (*a, *e)).collect();
585 sorted.sort_by_key(|(a, _)| *a);
586 (*wallet, sorted)
587 })
588 .collect();
589
590 if let Some(wallet) = ctx
591 .deps
592 .hypercore_account_equity
593 .keys()
594 .find(|wallet| !ctx.deps.hypercore_equity_timestamps.contains_key(wallet))
595 {
596 panic!(
597 "build_snapshot refusing to serialize hypercore_account_equity without \
598 hypercore_equity_timestamps for wallet {}; read_snapshot enforces the same invariant",
599 wallet
600 );
601 }
602
603 EngineStateSnapshot {
604 version: SNAPSHOT_VERSION,
605 last_command_id: checkpoint.last_command_id,
606 last_l2_seq: checkpoint.last_l2_seq,
607 next_order_id: ctx.next_order_id,
608 next_trade_id: ctx.next_trade_id,
609 expired_instruments: ctx.expired_instruments.clone(),
610 spot_prices: ctx.spot_prices.clone(),
611 iv_surfaces: ctx
612 .iv_surfaces
613 .iter()
614 .map(|(underlying, surface)| (underlying.clone(), surface.into()))
615 .collect(),
616 iv_source_timestamps: ctx.iv_source_timestamps.clone(),
617 instrument_trading_modes: ctx.instrument_trading_modes.clone(),
618 orderbooks,
619 engine_positions: ctx.engine_positions.clone(),
620 mmp_state: ctx.mmp_state.clone(),
621 liquidation_states: ctx.deps.liquidation_states.clone(),
622 wallet_margin_modes: ctx.deps.wallet_margin_modes.clone(),
623 mmp_enabled: ctx.deps.mmp_enabled.clone(),
624 wallet_trading_limits: ctx.deps.wallet_trading_limits.clone(),
625 default_trading_limits: ctx.deps.default_trading_limits,
626 wallet_tiers: ctx.deps.wallet_tiers.clone(),
627 balance_ledger: ctx.balance_ledger.snapshot_map(),
628 last_balance_update_seq: ctx.balance_ledger.last_balance_update_seq(),
629 deposit_update_watermarks: ctx.deposit_update_watermarks.clone(),
630 applied_deposit_source_event_hashes: ctx.applied_deposit_source_event_hashes.clone(),
631 pm_settlement_state: ctx.pm_settlement_state.clone(),
632 agent_authorizations,
633 agent_auth_loaded: true,
634 nonce_watermarks: HashMap::new(),
635 nonce_sets: {
636 let now_ms = std::time::SystemTime::now()
637 .duration_since(std::time::UNIX_EPOCH)
638 .map(|d| d.as_millis() as u64)
639 .unwrap_or(0);
640 ctx.nonce_sets
641 .iter()
642 .filter_map(|(signer, set)| {
643 let mut pruned = set.clone();
644 pruned.purge_expired(now_ms);
645 if pruned.count() > 0 {
646 Some((*signer, pruned.to_vec()))
647 } else {
648 None
649 }
650 })
651 .collect()
652 },
653 rsm_signer_nonces: ctx.rsm_signer_nonces.clone(),
654 hypercore_account_equity: ctx.deps.hypercore_account_equity.clone(),
655 hypercore_equity_timestamps: ctx.deps.hypercore_equity_timestamps.clone(),
656 perp_positions: ctx.deps.perp_positions.clone(),
657 }
658}
659
660pub fn write_snapshot(path: &Path, snapshot: &EngineStateSnapshot) -> Result<(), String> {
662 if let Some(parent) = path.parent() {
663 if !parent.as_os_str().is_empty() {
664 create_dir_all(parent).map_err(|e| {
665 format!(
666 "failed to create snapshot directory {}: {}",
667 parent.display(),
668 e
669 )
670 })?;
671 }
672 }
673
674 let mut tmp_os = path.as_os_str().to_os_string();
675 tmp_os.push(".tmp");
676 let tmp_path = PathBuf::from(tmp_os);
677
678 #[cfg(unix)]
679 let mut file = {
680 use std::os::unix::fs::OpenOptionsExt;
681 OpenOptions::new()
682 .create(true)
683 .truncate(true)
684 .write(true)
685 .mode(0o600)
686 .open(&tmp_path)
687 .map_err(|e| format!("failed to open snapshot temp {}: {}", tmp_path.display(), e))?
688 };
689 #[cfg(not(unix))]
690 let mut file = OpenOptions::new()
691 .create(true)
692 .truncate(true)
693 .write(true)
694 .open(&tmp_path)
695 .map_err(|e| format!("failed to open snapshot temp {}: {}", tmp_path.display(), e))?;
696
697 let payload =
698 rmp_serde::to_vec(snapshot).map_err(|e| format!("failed to serialize snapshot: {}", e))?;
699
700 let crc = SNAPSHOT_CRC.checksum(&payload);
701
702 file.write_all(&payload).map_err(|e| {
703 format!(
704 "failed to write snapshot temp {}: {}",
705 tmp_path.display(),
706 e
707 )
708 })?;
709 file.write_all(&crc.to_le_bytes())
710 .map_err(|e| format!("failed to write snapshot CRC {}: {}", tmp_path.display(), e))?;
711 file.sync_data()
712 .map_err(|e| format!("failed to sync snapshot temp {}: {}", tmp_path.display(), e))?;
713
714 std::fs::rename(&tmp_path, path).map_err(|e| {
715 format!(
716 "failed to atomically replace snapshot {} with {}: {}",
717 path.display(),
718 tmp_path.display(),
719 e
720 )
721 })?;
722
723 #[cfg(unix)]
724 {
725 let parent = path.parent().ok_or_else(|| {
726 format!(
727 "snapshot {} has no parent directory to sync",
728 path.display()
729 )
730 })?;
731 let parent = if parent.as_os_str().is_empty() {
732 Path::new(".")
733 } else {
734 parent
735 };
736
737 OpenOptions::new()
738 .read(true)
739 .open(parent)
740 .and_then(|dir| dir.sync_all())
741 .map_err(|e| {
742 format!(
743 "failed to sync snapshot directory {}: {}",
744 parent.display(),
745 e
746 )
747 })?;
748 }
749
750 Ok(())
751}
752
753pub fn read_snapshot(path: &Path) -> Result<Option<EngineStateSnapshot>, String> {
758 if !path.exists() {
759 return Ok(None);
760 }
761
762 let data = std::fs::read(path)
763 .map_err(|e| format!("failed to read snapshot {}: {}", path.display(), e))?;
764
765 if data.len() < 4 {
766 return Err(format!(
767 "snapshot {} too small ({} bytes, need at least 4 for CRC)",
768 path.display(),
769 data.len()
770 ));
771 }
772
773 let (payload, crc_bytes) = data.split_at(data.len() - 4);
774 let stored_crc = u32::from_le_bytes(
775 crc_bytes
776 .try_into()
777 .map_err(|_| format!("snapshot {} CRC slice length mismatch", path.display()))?,
778 );
779 let computed_crc = SNAPSHOT_CRC.checksum(payload);
780
781 if stored_crc != computed_crc {
782 return Err(format!(
783 "snapshot {} CRC mismatch: stored={:#010x}, computed={:#010x}",
784 path.display(),
785 stored_crc,
786 computed_crc
787 ));
788 }
789
790 let payload_version = snapshot_payload_version(payload);
791 match payload_version {
792 Some(LEGACY_SNAPSHOT_VERSION_V2 | LEGACY_SNAPSHOT_VERSION_V1) => {
793 let (legacy, legacy_layout) = decode_legacy_engine_state_snapshot(payload, path)?;
794 if !matches!(
795 legacy.version,
796 LEGACY_SNAPSHOT_VERSION_V2 | LEGACY_SNAPSHOT_VERSION_V1
797 ) {
798 return Err(format!(
799 "snapshot {} payload version probe returned {}, but decoded legacy version was {}",
800 path.display(),
801 payload_version.expect("legacy version branch has a version"),
802 legacy.version
803 ));
804 }
805 info!(
806 snapshot_path = %path.display(),
807 legacy_version = legacy.version,
808 legacy_layout = legacy_layout,
809 legacy_last_command_id = legacy.last_command_id,
810 balance_update_seq = LEGACY_BALANCE_UPDATE_SEQ_MIGRATION_EPOCH,
811 "Restoring legacy engine snapshot as temporary CALL-1707 migration base; WAL tail replay will rebuild canonical balance-update sequence state"
812 );
813 return Ok(Some(legacy.into_migration_snapshot(path)));
814 }
815 Some(SNAPSHOT_VERSION) | None => {}
816 Some(unsupported) => {
817 return Err(format!(
818 "snapshot {} has unsupported version {} (expected {})",
819 path.display(),
820 unsupported,
821 SNAPSHOT_VERSION,
822 ));
823 }
824 }
825 if payload_version.is_none() && msgpack_map_start(payload).is_some() {
826 info!(
827 snapshot_path = %path.display(),
828 "Ignoring legacy map-shaped engine snapshot without readable version metadata; WAL replay will rebuild engine state"
829 );
830 return Ok(None);
836 }
837
838 let snapshot: EngineStateSnapshot = rmp_serde::from_slice(payload)
839 .map_err(|e| format!("failed to deserialize snapshot {}: {}", path.display(), e))?;
840
841 match snapshot.version {
842 SNAPSHOT_VERSION => {}
843 unsupported => {
844 return Err(format!(
845 "snapshot {} has unsupported version {} (expected {})",
846 path.display(),
847 unsupported,
848 SNAPSHOT_VERSION,
849 ));
850 }
851 }
852
853 if snapshot.version == SNAPSHOT_VERSION {
854 let missing_hypercore_equity_watermarks = snapshot
855 .hypercore_account_equity
856 .keys()
857 .filter(|wallet| !snapshot.hypercore_equity_timestamps.contains_key(wallet))
858 .count();
859 if missing_hypercore_equity_watermarks > 0 {
860 return Err(format!(
861 "snapshot {} has {} HyperCore equity entries without timestamp watermarks",
862 path.display(),
863 missing_hypercore_equity_watermarks
864 ));
865 }
866 }
867
868 Ok(Some(snapshot))
869}
870
871pub fn snapshot_order_count(snapshot: &EngineStateSnapshot) -> usize {
873 snapshot.orderbooks.values().map(|v| v.len()).sum()
874}
875
876#[cfg(test)]
877mod tests {
878 use super::*;
879
880 fn make_test_snapshot() -> EngineStateSnapshot {
881 let mut orderbooks = HashMap::new();
882 orderbooks.insert(
883 "ETH-20260131-4000-C".to_string(),
884 vec![
885 OrderSnapshotEntry {
886 order_id: 1,
887 price: Decimal::new(1000, 1), quantity: Decimal::new(50, 1), side: Side::Buy,
890 wallet: WalletAddress::from(alloy::primitives::Address::repeat_byte(1)),
891 timestamp: 1000,
892 client_id: None,
893 mmp_enabled: false,
894 original_size: None,
895 },
896 OrderSnapshotEntry {
897 order_id: 2,
898 price: Decimal::new(1100, 1), quantity: Decimal::new(30, 1), side: Side::Sell,
901 wallet: WalletAddress::from(alloy::primitives::Address::repeat_byte(2)),
902 timestamp: 2000,
903 client_id: None,
904 mmp_enabled: false,
905 original_size: None,
906 },
907 ],
908 );
909
910 let mut expired = HashMap::new();
911 expired.insert("ETH-20250101-3000-C".to_string(), true);
912
913 EngineStateSnapshot {
914 version: SNAPSHOT_VERSION,
915 last_command_id: 42,
916 last_l2_seq: 99,
917 next_order_id: 100,
918 next_trade_id: 50,
919 expired_instruments: expired,
920 spot_prices: HashMap::new(),
921 iv_surfaces: HashMap::new(),
922 iv_source_timestamps: HashMap::new(),
923 instrument_trading_modes: HashMap::new(),
924 orderbooks,
925 engine_positions: HashMap::new(),
926 mmp_state: HashMap::new(),
927 liquidation_states: HashMap::new(),
928 wallet_margin_modes: HashMap::new(),
929 mmp_enabled: HashMap::new(),
930 wallet_trading_limits: HashMap::new(),
931 default_trading_limits: hypercall_types::api_models::TradingLimits::default(),
932 wallet_tiers: HashMap::new(),
933 balance_ledger: HashMap::new(),
934 last_balance_update_seq: 0,
935 deposit_update_watermarks: HashMap::new(),
936 applied_deposit_source_event_hashes: std::collections::BTreeSet::new(),
937 pm_settlement_state:
938 crate::rsm::portfolio_margin::settlement_state::PmSettlementState::default(),
939 agent_authorizations: HashMap::new(),
940 agent_auth_loaded: false,
941 nonce_watermarks: HashMap::new(),
942 nonce_sets: HashMap::new(),
943 rsm_signer_nonces: HashMap::new(),
944 hypercore_account_equity: HashMap::new(),
945 hypercore_equity_timestamps: HashMap::new(),
946 perp_positions: HashMap::new(),
947 }
948 }
949
950 #[test]
951 fn snapshot_roundtrip() {
952 let dir = tempfile::tempdir().expect("create tempdir");
953 let path = dir.path().join("test.engine_snapshot");
954 let snapshot = make_test_snapshot();
955
956 write_snapshot(&path, &snapshot).expect("write snapshot");
957 let loaded = read_snapshot(&path)
958 .expect("read snapshot")
959 .expect("snapshot should exist");
960
961 assert_eq!(loaded.version, SNAPSHOT_VERSION);
962 assert_eq!(loaded.last_command_id, 42);
963 assert_eq!(loaded.last_l2_seq, 99);
964 assert_eq!(loaded.next_order_id, 100);
965 assert_eq!(loaded.next_trade_id, 50);
966 assert_eq!(loaded.expired_instruments.len(), 1);
967 assert_eq!(
968 loaded.expired_instruments.get("ETH-20250101-3000-C"),
969 Some(&true)
970 );
971
972 let orders = loaded
973 .orderbooks
974 .get("ETH-20260131-4000-C")
975 .expect("should have orderbook");
976 assert_eq!(orders.len(), 2);
977 assert_eq!(orders[0].order_id, 1);
978 assert_eq!(orders[1].order_id, 2);
979 }
980
981 #[test]
982 fn snapshot_missing_returns_none() {
983 let dir = tempfile::tempdir().expect("create tempdir");
984 let path = dir.path().join("missing.engine_snapshot");
985 assert!(read_snapshot(&path).expect("should not error").is_none());
986 }
987
988 #[test]
989 fn snapshot_corrupt_crc_returns_err() {
990 let dir = tempfile::tempdir().expect("create tempdir");
991 let path = dir.path().join("corrupt.engine_snapshot");
992 let snapshot = make_test_snapshot();
993
994 write_snapshot(&path, &snapshot).expect("write snapshot");
995
996 let mut data = std::fs::read(&path).unwrap();
998 if data.len() > 4 {
999 data[0] ^= 0xFF;
1000 }
1001 std::fs::write(&path, &data).unwrap();
1002
1003 let err = read_snapshot(&path).expect_err("should detect corruption");
1004 assert!(err.contains("CRC mismatch"), "unexpected error: {}", err);
1005 }
1006
/// A two-byte file must be rejected by `read_snapshot` with a "too small" error.
#[test]
fn snapshot_too_small_returns_err() {
    let scratch = tempfile::tempdir().expect("create tempdir");
    let tiny_file = scratch.path().join("tiny.engine_snapshot");
    let two_bytes: &[u8] = b"ab";
    std::fs::write(&tiny_file, two_bytes).unwrap();

    let failure = read_snapshot(&tiny_file).expect_err("should reject too-small file");
    assert!(
        failure.contains("too small"),
        "unexpected error: {}",
        failure
    );
}
1016
/// The snapshot path is the WAL path with `.engine_snapshot` appended after the existing name.
#[test]
fn snapshot_path_from_wal_path_appends_suffix() {
    let wal_path = PathBuf::from("/data/engine-journal.wal");
    let expected = PathBuf::from("/data/engine-journal.wal.engine_snapshot");

    let derived = snapshot_path_from_wal_path(&wal_path);

    assert_eq!(derived, expected);
}
1026
/// The shared test snapshot is expected to report two orders in total.
#[test]
fn snapshot_order_count_works() {
    let fixture = make_test_snapshot();
    let counted = snapshot_order_count(&fixture);
    assert_eq!(counted, 2);
}
1032
/// Per-order metadata (`client_id`, `mmp_enabled`, `original_size`) must survive a
/// write-then-read cycle unchanged.
#[test]
fn snapshot_metadata_roundtrip() {
    const INSTRUMENT: &str = "BTC-20260311-90000-C";

    let scratch = tempfile::tempdir().expect("create tempdir");
    let snapshot_file = scratch.path().join("meta.engine_snapshot");

    // First order: every optional metadata field populated.
    let sell_with_metadata = OrderSnapshotEntry {
        order_id: 10,
        price: Decimal::new(5000, 0),
        quantity: Decimal::new(3, 0),
        side: Side::Sell,
        wallet: WalletAddress::from(alloy::primitives::Address::repeat_byte(0xAA)),
        timestamp: 9999,
        client_id: Some("cli_42".to_string()),
        mmp_enabled: true,
        original_size: Some(Decimal::new(10, 0)),
    };
    // Second order: no client id and MMP disabled.
    let buy_without_client_id = OrderSnapshotEntry {
        order_id: 11,
        price: Decimal::new(4500, 0),
        quantity: Decimal::new(7, 0),
        side: Side::Buy,
        wallet: WalletAddress::from(alloy::primitives::Address::repeat_byte(0xBB)),
        timestamp: 10000,
        client_id: None,
        mmp_enabled: false,
        original_size: Some(Decimal::new(7, 0)),
    };
    let orderbooks = HashMap::from([(
        INSTRUMENT.to_string(),
        vec![sell_with_metadata, buy_without_client_id],
    )]);

    let written = EngineStateSnapshot {
        version: SNAPSHOT_VERSION,
        last_command_id: 100,
        last_l2_seq: 200,
        next_order_id: 500,
        next_trade_id: 300,
        expired_instruments: HashMap::new(),
        spot_prices: HashMap::new(),
        iv_surfaces: HashMap::new(),
        iv_source_timestamps: HashMap::new(),
        instrument_trading_modes: HashMap::new(),
        orderbooks,
        engine_positions: HashMap::new(),
        mmp_state: HashMap::new(),
        liquidation_states: HashMap::new(),
        wallet_margin_modes: HashMap::new(),
        mmp_enabled: HashMap::new(),
        wallet_trading_limits: HashMap::new(),
        default_trading_limits: hypercall_types::api_models::TradingLimits::default(),
        wallet_tiers: HashMap::new(),
        balance_ledger: HashMap::new(),
        last_balance_update_seq: 0,
        deposit_update_watermarks: HashMap::new(),
        applied_deposit_source_event_hashes: std::collections::BTreeSet::new(),
        pm_settlement_state:
            crate::rsm::portfolio_margin::settlement_state::PmSettlementState::default(),
        agent_authorizations: HashMap::new(),
        agent_auth_loaded: false,
        nonce_watermarks: HashMap::new(),
        nonce_sets: HashMap::new(),
        rsm_signer_nonces: HashMap::new(),
        hypercore_account_equity: HashMap::new(),
        hypercore_equity_timestamps: HashMap::new(),
        perp_positions: HashMap::new(),
    };

    write_snapshot(&snapshot_file, &written).expect("write");
    let restored = read_snapshot(&snapshot_file)
        .expect("read")
        .expect("exists");

    let restored_orders = restored.orderbooks.get(INSTRUMENT).expect("orderbook");
    assert_eq!(restored_orders.len(), 2);
    let (first, second) = (&restored_orders[0], &restored_orders[1]);

    assert_eq!(first.client_id, Some("cli_42".to_string()));
    assert!(first.mmp_enabled);
    assert_eq!(first.original_size, Some(Decimal::new(10, 0)));

    assert_eq!(second.client_id, None);
    assert!(!second.mmp_enabled);
    assert_eq!(second.original_size, Some(Decimal::new(7, 0)));
}
1123
/// Serialize-only test fixture for an order entry in an older snapshot layout.
///
/// Carries only the six fields below; unlike the `OrderSnapshotEntry` values built elsewhere in
/// these tests it has no `client_id`, `mmp_enabled`, or `original_size`.
// NOTE(review): the tests encode this with `rmp_serde::to_vec`, so the field order is presumably
// part of the encoded layout — confirm before reordering.
#[derive(Debug, Serialize)]
struct LegacyOrderSnapshotEntry {
    order_id: u64,
    price: Decimal,
    quantity: Decimal,
    side: Side,
    wallet: WalletAddress,
    timestamp: u64,
}
1135
/// Serialize-only test fixture for a minimal legacy engine snapshot.
///
/// It has a `balance_ledger` but no `last_balance_update_seq`. The tests in this module write it
/// with `LEGACY_SNAPSHOT_VERSION_V1` or `LEGACY_SNAPSHOT_VERSION_V2` and expect `read_snapshot`
/// to migrate it.
// NOTE(review): encoded with `rmp_serde::to_vec`, so the field order is presumably significant —
// confirm before reordering.
#[derive(Debug, Serialize)]
struct MinimalLegacyEngineStateSnapshot {
    version: u32,
    last_command_id: i64,
    last_l2_seq: i64,
    next_order_id: u64,
    next_trade_id: u64,
    expired_instruments: HashMap<String, bool>,
    orderbooks: HashMap<String, Vec<LegacyOrderSnapshotEntry>>,
    engine_positions: HashMap<(WalletAddress, String), crate::rsm::engine_deps::EnginePosition>,
    balance_ledger: HashMap<WalletAddress, Decimal>,
}
1148
/// Serialize-only test fixture for a snapshot that carries `last_balance_update_seq` but has no
/// `balance_ledger` field at all.
///
/// Used to check that `read_snapshot` returns an error for such a payload.
// NOTE(review): encoded with `rmp_serde::to_vec`, so the field order is presumably significant —
// confirm before reordering.
#[derive(Debug, Serialize)]
struct MissingBalanceLedgerSnapshot {
    version: u32,
    last_command_id: i64,
    last_l2_seq: i64,
    next_order_id: u64,
    next_trade_id: u64,
    expired_instruments: HashMap<String, bool>,
    orderbooks: HashMap<String, Vec<LegacyOrderSnapshotEntry>>,
    engine_positions: HashMap<(WalletAddress, String), crate::rsm::engine_deps::EnginePosition>,
    last_balance_update_seq: u64,
}
1161
/// Serialize-only test fixture for a snapshot layout that includes `last_balance_update_seq` but
/// has no `applied_deposit_source_event_hashes` field.
///
/// The backwards-compatibility test writes it with the current `SNAPSHOT_VERSION` and expects the
/// loaded snapshot to have an empty source-event-hash set.
// NOTE(review): encoded with `rmp_serde::to_vec`, so the field order is presumably significant —
// confirm before reordering.
#[derive(Debug, Serialize)]
struct SnapshotBeforeDepositSourceHashField {
    version: u32,
    last_command_id: i64,
    last_l2_seq: i64,
    next_order_id: u64,
    next_trade_id: u64,
    expired_instruments: HashMap<String, bool>,
    orderbooks: HashMap<String, Vec<LegacyOrderSnapshotEntry>>,
    engine_positions: HashMap<(WalletAddress, String), crate::rsm::engine_deps::EnginePosition>,
    balance_ledger: HashMap<WalletAddress, Decimal>,
    last_balance_update_seq: u64,
    deposit_update_watermarks:
        HashMap<WalletAddress, crate::rsm::engine_deps::DepositUpdateWatermark>,
    // The next field follows the watermarks directly: this layout has no source-event-hash set.
    agent_authorizations: HashMap<WalletAddress, Vec<(WalletAddress, Option<u64>)>>,
    agent_auth_loaded: bool,
    nonce_watermarks: HashMap<WalletAddress, u64>,
    nonce_sets: HashMap<WalletAddress, Vec<u64>>,
    hypercore_account_equity: HashMap<WalletAddress, Decimal>,
    hypercore_equity_timestamps: HashMap<WalletAddress, u64>,
    perp_positions: HashMap<(String, String), crate::hypercore::PerpPosition>,
    spot_prices: HashMap<String, Decimal>,
    iv_surfaces: HashMap<String, VolatilitySurfaceSnapshot>,
    iv_source_timestamps: HashMap<String, i64>,
    instrument_trading_modes: HashMap<String, hypercall_types::TradingModes>,
    mmp_state: HashMap<(WalletAddress, String), crate::rsm::engine_deps::EngineMmpState>,
    liquidation_states: HashMap<WalletAddress, hypercall_types::LiquidationStateType>,
    wallet_margin_modes: HashMap<WalletAddress, crate::rsm::margin_mode::MarginMode>,
    mmp_enabled: HashMap<(WalletAddress, String), bool>,
    wallet_trading_limits: HashMap<WalletAddress, hypercall_types::api_models::TradingLimits>,
    default_trading_limits: hypercall_types::api_models::TradingLimits,
    wallet_tiers: HashMap<WalletAddress, String>,
    rsm_signer_nonces: HashMap<WalletAddress, u64>,
}
1196
/// Serialize-only test fixture for a legacy layout in which
/// `applied_deposit_source_event_hashes` sits directly after `deposit_update_watermarks` and
/// there is no `last_balance_update_seq` field.
///
/// The migration test writes it with `LEGACY_SNAPSHOT_VERSION_V2` and checks that the source
/// hashes and the agent authorizations each land in their own field after loading.
// NOTE(review): encoded with `rmp_serde::to_vec`, so the field order is presumably significant —
// confirm before reordering.
#[derive(Debug, Serialize)]
struct LegacyV2SnapshotPreAppendOnlySourceHashes {
    version: u32,
    last_command_id: i64,
    last_l2_seq: i64,
    next_order_id: u64,
    next_trade_id: u64,
    expired_instruments: HashMap<String, bool>,
    orderbooks: HashMap<String, Vec<LegacyOrderSnapshotEntry>>,
    engine_positions: HashMap<(WalletAddress, String), crate::rsm::engine_deps::EnginePosition>,
    balance_ledger: HashMap<WalletAddress, Decimal>,
    deposit_update_watermarks:
        HashMap<WalletAddress, crate::rsm::engine_deps::DepositUpdateWatermark>,
    // In this layout the source-event-hash set precedes the agent authorizations.
    applied_deposit_source_event_hashes:
        std::collections::BTreeSet<alloy::primitives::FixedBytes<32>>,
    agent_authorizations: HashMap<WalletAddress, Vec<(WalletAddress, Option<u64>)>>,
    agent_auth_loaded: bool,
    nonce_watermarks: HashMap<WalletAddress, u64>,
    nonce_sets: HashMap<WalletAddress, Vec<u64>>,
    hypercore_account_equity: HashMap<WalletAddress, Decimal>,
    hypercore_equity_timestamps: HashMap<WalletAddress, u64>,
    perp_positions: HashMap<(String, String), crate::hypercore::PerpPosition>,
    spot_prices: HashMap<String, Decimal>,
    iv_surfaces: HashMap<String, VolatilitySurfaceSnapshot>,
    iv_source_timestamps: HashMap<String, i64>,
    instrument_trading_modes: HashMap<String, hypercall_types::TradingModes>,
    mmp_state: HashMap<(WalletAddress, String), crate::rsm::engine_deps::EngineMmpState>,
    liquidation_states: HashMap<WalletAddress, hypercall_types::LiquidationStateType>,
    wallet_margin_modes: HashMap<WalletAddress, crate::rsm::margin_mode::MarginMode>,
    mmp_enabled: HashMap<(WalletAddress, String), bool>,
    wallet_trading_limits: HashMap<WalletAddress, hypercall_types::api_models::TradingLimits>,
    default_trading_limits: hypercall_types::api_models::TradingLimits,
    wallet_tiers: HashMap<WalletAddress, String>,
    rsm_signer_nonces: HashMap<WalletAddress, u64>,
}
1232
1233 fn serialize_snapshot_without_hypercore_equity_timestamps(
1234 snapshot: &EngineStateSnapshot,
1235 ) -> Vec<u8> {
1236 let mut value =
1237 rmp_serde::to_vec_named(snapshot).expect("serialize snapshot to named msgpack");
1238 let mut json: serde_json::Value =
1239 rmp_serde::from_slice(&value).expect("deserialize snapshot into json");
1240 let object = json
1241 .as_object_mut()
1242 .expect("named snapshot encoding must deserialize as an object");
1243 let removed = object.remove("hypercore_equity_timestamps");
1244 assert!(
1245 removed.is_some(),
1246 "test snapshot must contain hypercore_equity_timestamps"
1247 );
1248 value.clear();
1249 rmp_serde::encode::write_named(&mut value, &json)
1250 .expect("re-encode named snapshot without watermarks");
1251 value
1252 }
1253
/// A V1 snapshot with no balance-update sequence must load as the current version, keep its
/// ledger and orders, and start at the legacy migration epoch.
#[test]
fn legacy_snapshot_without_balance_update_seq_restores_as_migration_base() {
    const INSTRUMENT: &str = "ETH-20260131-4000-C";

    let scratch = tempfile::tempdir().expect("create tempdir");
    let snapshot_file = scratch.path().join("old.engine_snapshot");
    let wallet = WalletAddress::from(alloy::primitives::Address::repeat_byte(0x42));

    let resting_order = LegacyOrderSnapshotEntry {
        order_id: 1,
        price: Decimal::new(1000, 1),
        quantity: Decimal::new(50, 1),
        side: Side::Buy,
        wallet: WalletAddress::from(alloy::primitives::Address::repeat_byte(1)),
        timestamp: 1000,
    };

    let legacy = MinimalLegacyEngineStateSnapshot {
        version: LEGACY_SNAPSHOT_VERSION_V1,
        last_command_id: 42,
        last_l2_seq: 99,
        next_order_id: 100,
        next_trade_id: 50,
        expired_instruments: HashMap::new(),
        orderbooks: HashMap::from([(INSTRUMENT.to_string(), vec![resting_order])]),
        engine_positions: HashMap::new(),
        balance_ledger: HashMap::from([(wallet, Decimal::new(2500, 0))]),
    };

    // On-disk framing used by these tests: encoded payload followed by its little-endian CRC.
    let body = rmp_serde::to_vec(&legacy).expect("serialize legacy");
    let checksum = SNAPSHOT_CRC.checksum(&body);
    let framed = [body.as_slice(), checksum.to_le_bytes().as_slice()].concat();
    std::fs::write(&snapshot_file, &framed).expect("write legacy snapshot");

    let restored = read_snapshot(&snapshot_file)
        .expect("legacy snapshot should migrate")
        .expect("legacy snapshot should be present after migration");

    assert_eq!(restored.version, SNAPSHOT_VERSION);
    assert_eq!(restored.last_command_id, 42);
    assert_eq!(restored.last_l2_seq, 99);
    assert_eq!(
        restored.last_balance_update_seq, LEGACY_BALANCE_UPDATE_SEQ_MIGRATION_EPOCH,
        "legacy snapshots enter CALL-1707 migration at balance update sequence zero"
    );
    assert_eq!(
        restored.balance_ledger.get(&wallet),
        Some(&Decimal::new(2500, 0))
    );

    let restored_orders = restored
        .orderbooks
        .get(INSTRUMENT)
        .expect("legacy orderbook should be restored");
    assert_eq!(restored_orders.len(), 1);
    let only_order = &restored_orders[0];
    assert_eq!(only_order.order_id, 1);
    assert_eq!(only_order.client_id, None);
    assert_eq!(only_order.original_size, None);
}
1318
/// After migrating a V2 legacy snapshot, replay validation must accept a hot journal that begins
/// at the command right after the snapshot and reject one that begins one command later.
#[test]
fn temp_call_1707_legacy_snapshot_tail_replay_e2e_allows_archived_prefix() {
    let scratch = tempfile::tempdir().expect("create tempdir");
    let snapshot_file = scratch.path().join("legacy-tail-replay.engine_snapshot");

    let legacy = MinimalLegacyEngineStateSnapshot {
        version: LEGACY_SNAPSHOT_VERSION_V2,
        last_command_id: 407_647_670,
        last_l2_seq: 200_000_000,
        next_order_id: 100,
        next_trade_id: 50,
        expired_instruments: HashMap::new(),
        orderbooks: HashMap::new(),
        engine_positions: HashMap::new(),
        balance_ledger: HashMap::new(),
    };

    let body = rmp_serde::to_vec(&legacy).expect("serialize legacy");
    let checksum = SNAPSHOT_CRC.checksum(&body);
    let framed = [body.as_slice(), checksum.to_le_bytes().as_slice()].concat();
    std::fs::write(&snapshot_file, &framed).expect("write legacy snapshot");

    let restored = read_snapshot(&snapshot_file)
        .expect("legacy snapshot should migrate")
        .expect("legacy snapshot should be present after migration");
    assert_eq!(restored.last_command_id, 407_647_670);
    assert_eq!(
        restored.last_balance_update_seq,
        LEGACY_BALANCE_UPDATE_SEQ_MIGRATION_EPOCH
    );

    // Replay has to begin at the first command the snapshot does not already cover.
    let replay_from = restored.last_command_id + 1;

    // Hot journal whose lowest command id is exactly the first post-snapshot command.
    let contiguous_tail = Some(hypercall_db::JournalCommandIdBounds {
        min_command_id: 407_647_671,
        max_command_id: 416_984_317,
    });
    crate::rsm::unified_engine::validate_hot_journal_covers_base_replay(
        416_984_317,
        contiguous_tail,
        replay_from,
        None,
    )
    .expect("legacy snapshot migration should allow replay from the retained hot journal tail");

    // Hot journal that starts one command too late, leaving a gap after the snapshot.
    let gapped_tail = Some(hypercall_db::JournalCommandIdBounds {
        min_command_id: 407_647_672,
        max_command_id: 416_984_317,
    });
    let error = crate::rsm::unified_engine::validate_hot_journal_covers_base_replay(
        416_984_317,
        gapped_tail,
        replay_from,
        None,
    )
    .expect_err("a missing first tail command must still fail closed");
    assert!(
        error.contains("requires replay from command_id 407647671"),
        "error should identify the missing post-snapshot tail command: {error}"
    );
}
1380
/// A CRC-valid payload that is a map without a version must load as `Ok(None)`.
#[test]
fn legacy_map_snapshot_without_version_is_ignored() {
    let scratch = tempfile::tempdir().expect("create tempdir");
    let snapshot_file = scratch.path().join("legacy-map.engine_snapshot");

    let map_shaped = serde_json::json!({
        "legacy_snapshot": true
    });
    let mut body = Vec::new();
    rmp_serde::encode::write_named(&mut body, &map_shaped)
        .expect("serialize legacy map snapshot");

    let checksum = SNAPSHOT_CRC.checksum(&body);
    let framed = [body.as_slice(), checksum.to_le_bytes().as_slice()].concat();
    std::fs::write(&snapshot_file, &framed).expect("write legacy map snapshot");

    let outcome = read_snapshot(&snapshot_file).expect("legacy map snapshot should be ignored");
    assert!(
        outcome.is_none(),
        "legacy map-shaped snapshot must be discarded so WAL replay rebuilds engine state"
    );
}
1406
/// A V2 snapshot in the older layout (source hashes before agent authorizations, no
/// balance-update sequence) must migrate with both fields restored to the right place.
#[test]
fn legacy_v2_snapshot_pre_append_only_source_hash_layout_migrates() {
    let scratch = tempfile::tempdir().expect("create tempdir");
    let snapshot_file = scratch
        .path()
        .join("legacy-v2-pre-append-source-hash.engine_snapshot");

    let owner = WalletAddress::from(alloy::primitives::Address::repeat_byte(0x22));
    let agent = WalletAddress::from(alloy::primitives::Address::repeat_byte(0x23));
    let source_event_hash = alloy::primitives::FixedBytes::<32>::from([0x7a; 32]);

    let legacy = LegacyV2SnapshotPreAppendOnlySourceHashes {
        version: LEGACY_SNAPSHOT_VERSION_V2,
        last_command_id: 407_647_670,
        last_l2_seq: 200_000_000,
        next_order_id: 100,
        next_trade_id: 50,
        expired_instruments: HashMap::new(),
        orderbooks: HashMap::new(),
        engine_positions: HashMap::new(),
        balance_ledger: HashMap::new(),
        deposit_update_watermarks: HashMap::new(),
        applied_deposit_source_event_hashes: std::collections::BTreeSet::from([
            source_event_hash,
        ]),
        agent_authorizations: HashMap::from([(owner, vec![(agent, Some(5555))])]),
        agent_auth_loaded: true,
        nonce_watermarks: HashMap::new(),
        nonce_sets: HashMap::new(),
        hypercore_account_equity: HashMap::new(),
        hypercore_equity_timestamps: HashMap::new(),
        perp_positions: HashMap::new(),
        spot_prices: HashMap::new(),
        iv_surfaces: HashMap::new(),
        iv_source_timestamps: HashMap::new(),
        instrument_trading_modes: HashMap::new(),
        mmp_state: HashMap::new(),
        liquidation_states: HashMap::new(),
        wallet_margin_modes: HashMap::new(),
        mmp_enabled: HashMap::new(),
        wallet_trading_limits: HashMap::new(),
        default_trading_limits: Default::default(),
        wallet_tiers: HashMap::new(),
        rsm_signer_nonces: HashMap::new(),
    };

    let body = rmp_serde::to_vec(&legacy).expect("serialize pre-append legacy v2 snapshot");
    let checksum = SNAPSHOT_CRC.checksum(&body);
    let framed = [body.as_slice(), checksum.to_le_bytes().as_slice()].concat();
    std::fs::write(&snapshot_file, &framed).expect("write legacy snapshot");

    let restored = read_snapshot(&snapshot_file)
        .expect("pre-append legacy v2 snapshot should migrate")
        .expect("legacy snapshot should be present after migration");

    assert_eq!(restored.version, SNAPSHOT_VERSION);
    assert_eq!(restored.last_command_id, 407_647_670);
    assert_eq!(
        restored.last_balance_update_seq, LEGACY_BALANCE_UPDATE_SEQ_MIGRATION_EPOCH,
        "pre-append legacy snapshots enter CALL-1707 migration at balance update sequence zero"
    );

    let hash_preserved = restored
        .applied_deposit_source_event_hashes
        .contains(&source_event_hash);
    assert!(
        hash_preserved,
        "fallback decoder must preserve source event hash state from the old positional slot"
    );
    assert_eq!(
        restored.agent_authorizations.get(&owner),
        Some(&vec![(agent, Some(5555))]),
        "fallback decoder must not shift source hashes into agent authorizations"
    );
}
1482
/// A current-version snapshot written before `applied_deposit_source_event_hashes` existed must
/// still load, keep its other fields in place, and come back with an empty source-hash set.
#[test]
fn snapshot_backwards_compat_deposit_source_hash_field_appended() {
    let dir = tempfile::tempdir().expect("create tempdir");
    let path = dir
        .path()
        .join("old-v3-before-deposit-source-hash.engine_snapshot");
    let owner = WalletAddress::from(alloy::primitives::Address::repeat_byte(2));
    let agent = WalletAddress::from(alloy::primitives::Address::repeat_byte(3));
    let mut agent_authorizations = HashMap::new();
    agent_authorizations.insert(owner, vec![(agent, Some(1234))]);

    // The fixture uses the current SNAPSHOT_VERSION; only the source-hash field is absent.
    let pre_field = SnapshotBeforeDepositSourceHashField {
        version: SNAPSHOT_VERSION,
        last_command_id: 622489,
        last_l2_seq: 16735,
        next_order_id: 8478,
        next_trade_id: 8486,
        expired_instruments: HashMap::new(),
        orderbooks: HashMap::new(),
        engine_positions: HashMap::new(),
        balance_ledger: HashMap::new(),
        last_balance_update_seq: 12,
        deposit_update_watermarks: HashMap::new(),
        agent_authorizations,
        agent_auth_loaded: true,
        nonce_watermarks: HashMap::new(),
        nonce_sets: HashMap::new(),
        hypercore_account_equity: HashMap::new(),
        hypercore_equity_timestamps: HashMap::new(),
        perp_positions: HashMap::new(),
        spot_prices: HashMap::new(),
        iv_surfaces: HashMap::new(),
        iv_source_timestamps: HashMap::new(),
        instrument_trading_modes: HashMap::new(),
        mmp_state: HashMap::new(),
        liquidation_states: HashMap::new(),
        wallet_margin_modes: HashMap::new(),
        mmp_enabled: HashMap::new(),
        wallet_trading_limits: HashMap::new(),
        default_trading_limits: Default::default(),
        wallet_tiers: HashMap::new(),
        rsm_signer_nonces: HashMap::new(),
    };

    // Messages name the fixture accurately: it is not a legacy v2 snapshot.
    let payload = rmp_serde::to_vec(&pre_field)
        .expect("serialize snapshot without deposit source hash field");
    let crc = SNAPSHOT_CRC.checksum(&payload);
    let mut data = payload;
    data.extend_from_slice(&crc.to_le_bytes());
    std::fs::write(&path, &data).expect("write snapshot without deposit source hash field");

    let loaded = read_snapshot(&path)
        .expect("read snapshot without deposit source hash field")
        .expect("snapshot without deposit source hash field should exist");

    assert_eq!(loaded.last_command_id, 622489);
    assert_eq!(loaded.last_balance_update_seq, 12);
    assert_eq!(
        loaded.agent_authorizations.get(&owner),
        Some(&vec![(agent, Some(1234))])
    );
    assert!(loaded.applied_deposit_source_event_hashes.is_empty());
}
1543
/// A current-version snapshot whose payload has no `balance_ledger` must fail to deserialize.
#[test]
fn snapshot_missing_balance_ledger_fails_fast() {
    let scratch = tempfile::tempdir().expect("create tempdir");
    let snapshot_file = scratch
        .path()
        .join("missing-balance-ledger.engine_snapshot");

    let without_ledger = MissingBalanceLedgerSnapshot {
        version: SNAPSHOT_VERSION,
        last_command_id: 42,
        last_l2_seq: 99,
        next_order_id: 100,
        next_trade_id: 50,
        expired_instruments: HashMap::new(),
        orderbooks: HashMap::new(),
        engine_positions: HashMap::new(),
        last_balance_update_seq: 0,
    };

    let body = rmp_serde::to_vec(&without_ledger).expect("serialize missing balance");
    let checksum = SNAPSHOT_CRC.checksum(&body);
    let framed = [body.as_slice(), checksum.to_le_bytes().as_slice()].concat();
    std::fs::write(&snapshot_file, &framed).expect("write missing balance snapshot");

    let failure = read_snapshot(&snapshot_file).expect_err("missing balance_ledger must fail");
    let is_deserialize_failure = failure.contains("failed to deserialize snapshot");
    assert!(is_deserialize_failure, "unexpected error: {}", failure);
}
1574
/// A current-version snapshot holding a HyperCore equity entry but no
/// `hypercore_equity_timestamps` key must be rejected on read.
#[test]
fn snapshot_hypercore_equity_without_watermarks_fails_fast() {
    let scratch = tempfile::tempdir().expect("create tempdir");
    let snapshot_file = scratch
        .path()
        .join("missing-hypercore-equity-watermarks.engine_snapshot");
    let wallet = WalletAddress::from(alloy::primitives::Address::repeat_byte(0x13));

    let mut fixture = make_test_snapshot();
    // Pin the scalar header fields.
    fixture.version = SNAPSHOT_VERSION;
    fixture.last_command_id = 42;
    fixture.last_l2_seq = 99;
    fixture.next_order_id = 100;
    fixture.next_trade_id = 50;
    // Empty the collections this test resets, then add a single equity entry.
    fixture.balance_ledger.clear();
    fixture.orderbooks.clear();
    fixture.engine_positions.clear();
    fixture.hypercore_equity_timestamps.clear();
    fixture.hypercore_account_equity.clear();
    fixture
        .hypercore_account_equity
        .insert(wallet, Decimal::new(5000, 0));

    let body = serialize_snapshot_without_hypercore_equity_timestamps(&fixture);
    let checksum = SNAPSHOT_CRC.checksum(&body);
    let framed = [body.as_slice(), checksum.to_le_bytes().as_slice()].concat();
    std::fs::write(&snapshot_file, &framed).expect("write missing watermarks snapshot");

    let failure =
        read_snapshot(&snapshot_file).expect_err("missing equity watermarks must fail");
    // Either rejection message is accepted.
    let recognised = failure.contains("HyperCore equity entries without timestamp watermarks")
        || failure.contains("failed to deserialize snapshot");
    assert!(recognised, "unexpected error: {}", failure);
}
1611
/// A V1 snapshot holding a HyperCore equity entry but no `hypercore_equity_timestamps` key must
/// migrate to the current version with that wallet's timestamp set to zero.
#[test]
fn legacy_v1_snapshot_hypercore_equity_without_watermarks_migrates_zero_watermarks() {
    let scratch = tempfile::tempdir().expect("create tempdir");
    let snapshot_file = scratch
        .path()
        .join("legacy-v1-hypercore-equity.engine_snapshot");
    let wallet = WalletAddress::from(alloy::primitives::Address::repeat_byte(0x14));

    let mut fixture = make_test_snapshot();
    // Pin the scalar header fields; the version marks the payload as legacy V1.
    fixture.version = LEGACY_SNAPSHOT_VERSION_V1;
    fixture.last_command_id = 42;
    fixture.last_l2_seq = 99;
    fixture.next_order_id = 100;
    fixture.next_trade_id = 50;
    // Empty the collections this test resets, then add a single equity entry.
    fixture.balance_ledger.clear();
    fixture.orderbooks.clear();
    fixture.engine_positions.clear();
    fixture.hypercore_equity_timestamps.clear();
    fixture.hypercore_account_equity.clear();
    fixture
        .hypercore_account_equity
        .insert(wallet, Decimal::new(5000, 0));

    let body = serialize_snapshot_without_hypercore_equity_timestamps(&fixture);
    let checksum = SNAPSHOT_CRC.checksum(&body);
    let framed = [body.as_slice(), checksum.to_le_bytes().as_slice()].concat();
    std::fs::write(&snapshot_file, &framed).expect("write legacy snapshot");

    let restored = read_snapshot(&snapshot_file)
        .expect("legacy snapshot should migrate")
        .expect("legacy snapshot should be present after migration");

    assert_eq!(restored.version, SNAPSHOT_VERSION);
    let seeded_timestamp = restored.hypercore_equity_timestamps.get(&wallet);
    assert_eq!(
        seeded_timestamp,
        Some(&0),
        "temporary legacy migration seeds missing HyperCore equity timestamps at zero"
    );
}
1651}