Skip to main content

hypercall/rsm/unified_engine/
recovery.rs

1//! Replay and recovery logic for UnifiedEngine.
2
3use super::*;
4use crate::engine_enums_ext::ReplayCommandExt;
5use crate::rsm::apply::{CommandEnvelope, EngineCommand};
6use hypercall_db::JournalReplayReader;
7use std::collections::BTreeMap;
8
9type ReplayFillEvent = (hypercall_types::Fill, hypercall_types::FillAccounting);
10type ReplayFillEventsByCommand = BTreeMap<i64, Vec<ReplayFillEvent>>;
11
/// Test-only helper: builds a 32-byte value in which every byte equals `byte`.
#[cfg(test)]
fn test_source_hash(byte: u8) -> alloy::primitives::FixedBytes<32> {
    let repeated = [byte; 32];
    alloy::primitives::FixedBytes::from(repeated)
}
16
17fn replay_state_command_type(command_type: &str) -> Option<crate::nats::CommandType> {
18    match command_type {
19        "ExpireMarket" => Some(crate::nats::CommandType::MarketAction),
20        "TickExpiry" => Some(crate::nats::CommandType::TickExpiry),
21        "PriceUpdate" => Some(crate::nats::CommandType::PriceUpdate),
22        "IvUpdate" => Some(crate::nats::CommandType::IvUpdate),
23        "LiquidationState" => Some(crate::nats::CommandType::LiquidationState),
24        "TierUpdate" => Some(crate::nats::CommandType::TierUpdate),
25        "HypercorePositionUpdate" => Some(crate::nats::CommandType::HypercorePositionUpdate),
26        "MmpConfigUpdate" => Some(crate::nats::CommandType::MmpConfigUpdate),
27        "TradingModeUpdate" => Some(crate::nats::CommandType::TradingModeUpdate),
28        "DepositUpdate" => Some(crate::nats::CommandType::DepositUpdate),
29        "OptionDepositUpdate" => Some(crate::nats::CommandType::OptionDepositUpdate),
30        "OptionWithdrawalUpdate" => Some(crate::nats::CommandType::OptionWithdrawalUpdate),
31        "CashWithdrawalUpdate" => Some(crate::nats::CommandType::CashWithdrawalUpdate),
32        "LiquidationBonusUpdate" => Some(crate::nats::CommandType::LiquidationBonusUpdate),
33        "ApproveAgent" => Some(crate::nats::CommandType::ApproveAgent),
34        "RevokeAgent" => Some(crate::nats::CommandType::RevokeAgent),
35        "NonceAdvance" => Some(crate::nats::CommandType::NonceAdvance),
36        "HypercoreEquityUpdate" => Some(crate::nats::CommandType::HypercoreEquityUpdate),
37        "SetPmSettlementPoolConfig" => Some(crate::nats::CommandType::SetPmSettlementPoolConfig),
38        "RecordPmVaultDeposit" => Some(crate::nats::CommandType::RecordPmVaultDeposit),
39        "RequestPmVaultWithdrawal" => Some(crate::nats::CommandType::RequestPmVaultWithdrawal),
40        "AccruePmSettlementInterest" => Some(crate::nats::CommandType::AccruePmSettlementInterest),
41        "ApplyPmSettlementRepayment" => Some(crate::nats::CommandType::ApplyPmSettlementRepayment),
42        "JournalPmRecoveryPlan" => Some(crate::nats::CommandType::JournalPmRecoveryPlan),
43        "MarkPmRecoveryActionSubmitted" => {
44            Some(crate::nats::CommandType::MarkPmRecoveryActionSubmitted)
45        }
46        "ResolvePmRecoveryAction" => Some(crate::nats::CommandType::ResolvePmRecoveryAction),
47        _ => None,
48    }
49}
50
51fn decode_state_command_for_replay(cmd: &hypercall_db::ReplayCommand) -> Option<EngineCommand> {
52    let command_type = replay_state_command_type(&cmd.command_type)?;
53    let command_wire_version = replay_command_wire_version(&cmd.command_data);
54    Some(
55        crate::nats::deserialize::deserialize_command_for_replay(
56            command_type,
57            command_wire_version,
58            &cmd.command_data,
59        )
60        .unwrap_or_else(|e| {
61            panic!(
62                "CRITICAL_FAILURE: Failed to decode {} for replay \
63                     (command_id={}, command_data_len={}): {}",
64                cmd.command_type,
65                cmd.command_id,
66                cmd.command_data.len(),
67                e
68            )
69        }),
70    )
71}
72
73fn replay_command_wire_version(command_data: &[u8]) -> u8 {
74    if replay_command_data_is_named_payload(command_data) {
75        crate::nats::COMMAND_WIRE_VERSION_V1
76    } else {
77        crate::nats::LEGACY_COMMAND_WIRE_VERSION
78    }
79}
80
81fn replay_command_data_is_named_payload(command_data: &[u8]) -> bool {
82    command_data.first() == Some(&hypercall_types::WIRE_FORMAT_VERSION)
83        && command_data
84            .get(1)
85            .is_some_and(|marker| matches!(*marker, 0x80..=0x8f | 0xde | 0xdf))
86}
87
88fn replay_envelope_timestamp(command: &EngineCommand) -> u64 {
89    match command {
90        EngineCommand::PriceUpdate { timestamp_ms, .. }
91        | EngineCommand::IvUpdate { timestamp_ms, .. }
92        | EngineCommand::HypercorePositionUpdate { timestamp_ms, .. }
93        | EngineCommand::TradingModeUpdate { timestamp_ms, .. }
94        | EngineCommand::DepositUpdate { timestamp_ms, .. }
95        | EngineCommand::OptionDepositUpdate { timestamp_ms, .. }
96        | EngineCommand::OptionWithdrawalUpdate { timestamp_ms, .. }
97        | EngineCommand::CashWithdrawalUpdate { timestamp_ms, .. }
98        | EngineCommand::LiquidationBonusUpdate { timestamp_ms, .. }
99        | EngineCommand::ApproveAgent { timestamp_ms, .. }
100        | EngineCommand::RevokeAgent { timestamp_ms, .. }
101        | EngineCommand::NonceAdvance { timestamp_ms, .. }
102        | EngineCommand::HypercoreEquityUpdate { timestamp_ms, .. } => *timestamp_ms,
103        EngineCommand::SetPmSettlementPoolConfig(command) => command.timestamp_ms,
104        EngineCommand::RecordPmVaultDeposit(command) => command.timestamp_ms,
105        EngineCommand::RequestPmVaultWithdrawal(command) => command.timestamp_ms,
106        EngineCommand::AccruePmSettlementInterest(command) => command.timestamp_ms,
107        EngineCommand::ApplyPmSettlementRepayment(command) => command.timestamp_ms,
108        EngineCommand::JournalPmRecoveryPlan(command) => command.timestamp_ms,
109        EngineCommand::MarkPmRecoveryActionSubmitted(command) => command.timestamp_ms,
110        EngineCommand::ResolvePmRecoveryAction(command) => command.timestamp_ms,
111        EngineCommand::MarketAction(command) => command.message.timestamp,
112        EngineCommand::TickExpiry { now_ms, .. } => *now_ms,
113        EngineCommand::LiquidationState(message) => message.timestamp,
114        _ => 0,
115    }
116}
117
/// Details of a single order recorded for a startup expired-order cancel.
///
/// NOTE(review): presumably these are queued during recovery and drained by
/// `flush_startup_expired_order_cancels` (called at the end of journal
/// replay) — the producer and consumer are outside this section, so confirm.
pub(crate) struct StartupExpiredOrderCancel {
    pub order_id: u64,
    pub wallet: WalletAddress,
    pub symbol: String,
    pub price: Decimal,
    pub size: Decimal,
    pub side: hypercall_types::Side,
    pub tif: hypercall_types::TimeInForce,
    pub is_perp: bool,
    // Optional; presumably only set for some instrument kinds — TODO confirm.
    pub underlying: Option<String>,
    pub reduce_only: Option<bool>,
    pub nonce: Option<u64>,
    pub signature: Option<String>,
    pub mmp_enabled: bool,
    // Presumably the quantity already filled before the cancel — TODO confirm.
    pub filled_size: Decimal,
    pub client_id: Option<String>,
    // NOTE(review): unit (seconds vs milliseconds) is not visible here.
    pub timestamp: u64,
}
136
137impl UnifiedEngine {
138    pub(super) fn load_markets_from_db(&mut self) {
139        if let Some(ref handler) = self.ctx.db {
140            info!("Loading instruments from database...");
141            match handler.get_all_instruments_sync() {
142                Ok(instruments) => {
143                    info!("Loading {} instruments from database", instruments.len());
144                    let mut loaded_count = 0;
145                    let mut expired_on_startup = 0;
146                    let mut scheduled_for_expiry = 0;
147                    let mut startup_pending_symbols: Vec<String> = Vec::new();
148
149                    // Startup expiry classification. A clock failure must not
150                    // silently classify every expired instrument as live, so
151                    // unix_now_secs panics instead of falling back.
152                    let now = crate::shared::clock::unix_now_secs();
153
154                    for instrument in instruments {
155                        // Parse option type
156                        let option_type = instrument.option_type;
157
158                        // Rebuild expiry maps for restart-safety
159                        // Convert YYYYMMDD expiry to Unix timestamp
160                        let expiry_timestamp = crate::rsm::margin_manager::expiry_date_to_timestamp(
161                            &instrument.underlying,
162                            instrument.expiry as u64,
163                        );
164
165                        if expiry_timestamp <= now {
166                            // Already expired at startup, always block trading in-memory.
167                            self.ctx
168                                .expired_instruments
169                                .insert(instrument.id.clone(), true);
170                            let trading_mode = hypercall_types::TradingModes::from_db_str(
171                                &instrument.trading_mode,
172                            );
173                            self.ctx
174                                .instrument_trading_modes
175                                .insert(instrument.id.clone(), trading_mode);
176                            expired_on_startup += 1;
177
178                            match instrument.status {
179                                hypercall_types::api_models::InstrumentStatus::Active => {
180                                    startup_pending_symbols.push(instrument.id.clone());
181                                    warn!(
182                                        "Instrument {} expired while offline (expiry_ts={}) with status Active - \
183                                         queuing startup DB transition to EXPIRED_PENDING_PRICE for settlement retry.",
184                                        instrument.id, expiry_timestamp
185                                    );
186                                }
187                                hypercall_types::api_models::InstrumentStatus::ExpiredPendingPrice => {
188                                    info!(
189                                        "Instrument {} expired while offline (expiry_ts={}) and already ExpiredPendingPrice - \
190                                         settlement retry will resume from persisted state.",
191                                        instrument.id, expiry_timestamp
192                                    );
193                                }
194                                hypercall_types::api_models::InstrumentStatus::Settled => {
195                                    debug!(
196                                        "Instrument {} expired while offline (expiry_ts={}) and already Settled - \
197                                         no startup status transition needed.",
198                                        instrument.id, expiry_timestamp
199                                    );
200                                }
201                            }
202
203                            // Emit metric for monitoring
204                            metrics::gauge!("ht_settlement_instruments_pending_price")
205                                .increment(1.0);
206                        } else {
207                            // Not yet expired - create orderbook and schedule for expiry
208                            let orderbook = OrderBook::with_symbol(
209                                instrument.expiry as u64,
210                                instrument.strike,
211                                option_type,
212                                instrument.id.clone(),
213                            );
214
215                            self.ctx.orderbooks.insert(instrument.id.clone(), orderbook);
216                            let trading_mode = hypercall_types::TradingModes::from_db_str(
217                                &instrument.trading_mode,
218                            );
219                            self.ctx
220                                .instrument_trading_modes
221                                .insert(instrument.id.clone(), trading_mode);
222                            debug!("Loaded orderbook for symbol: {}", instrument.id);
223                            loaded_count += 1;
224
225                            // Schedule for expiry
226                            self.expiry_manager
227                                .expiry_schedules
228                                .entry(expiry_timestamp)
229                                .or_default()
230                                .push(instrument.id.clone());
231                            scheduled_for_expiry += 1;
232                        }
233                    }
234
235                    if !startup_pending_symbols.is_empty() {
236                        if let Err(e) = handler
237                            .transition_active_instruments_to_expired_pending_sync(
238                                &startup_pending_symbols,
239                            )
240                        {
241                            panic!(
242                                "CRITICAL_FAILURE: Failed startup transition of expired ACTIVE instruments to EXPIRED_PENDING_PRICE for {:?}: {}. \
243                                 Persisted state must match in-memory expiry state before continuing.",
244                                startup_pending_symbols, e
245                            );
246                        }
247                        info!(
248                            "Startup transitioned {} expired ACTIVE instruments to EXPIRED_PENDING_PRICE",
249                            startup_pending_symbols.len()
250                        );
251                    }
252
253                    // Observability: log rebuild summary
254                    info!(
255                        "Expiry rebuild complete: {} instruments loaded, {} expired at startup, {} transitioned to EXPIRED_PENDING_PRICE, {} scheduled for expiry",
256                        loaded_count,
257                        expired_on_startup,
258                        startup_pending_symbols.len(),
259                        scheduled_for_expiry
260                    );
261
262                    // Emit metrics for observability
263                    metrics::gauge!("ht_expiry_rebuild_loaded").set(loaded_count as f64);
264                    metrics::gauge!("ht_expiry_rebuild_pending_price")
265                        .set(expired_on_startup as f64);
266                    metrics::gauge!("ht_expiry_rebuild_scheduled").set(scheduled_for_expiry as f64);
267
268                    info!(
269                        "Successfully loaded {} orderbooks from database",
270                        loaded_count
271                    );
272
273                    if loaded_count != self.ctx.orderbooks.len() {
274                        warn!(
275                            "Mismatch: loaded {} instruments but have {} orderbooks",
276                            loaded_count,
277                            self.ctx.orderbooks.len()
278                        );
279                    }
280
281                    info!(
282                        "UnifiedEngine sync status remains CatchingUp (journal replay and post-startup reconciliation pending)"
283                    );
284                }
285                Err(e) => {
286                    error!("FATAL: Failed to load instruments from database: {}", e);
287                    panic!(
288                        "Cannot start UnifiedEngine without loading instruments: {}",
289                        e
290                    );
291                }
292            }
293        } else {
294            // Allow running without diesel_handler if no database is configured
295            info!("No diesel_handler configured, skipping market loading from database");
296        }
297    }
298
299    /// Load WAL checkpoint metadata and prepare replay bounds.
300    pub(super) fn load_orderbook_snapshots(&mut self) {
301        if self.ctx.db.is_none() {
302            self.replay_checkpoint = hypercall_journal::checkpoint::WalCheckpointMetadata::ZERO;
303            self.ctx
304                .l2_update_seq
305                .store(self.replay_checkpoint.last_l2_seq, Ordering::SeqCst);
306            return;
307        }
308
309        let checkpoint_path =
310            hypercall_journal::checkpoint::checkpoint_path_from_config(self.wal_path.as_ref());
311        let checkpoint = hypercall_journal::checkpoint::read_checkpoint(&checkpoint_path)
312            .unwrap_or_else(|e| {
313                panic!(
314                    "CRITICAL_FAILURE: Failed to load WAL checkpoint metadata from {}: {}",
315                    checkpoint_path.display(),
316                    e
317                )
318            });
319
320        self.replay_checkpoint = checkpoint;
321        self.ctx
322            .l2_update_seq
323            .store(checkpoint.last_l2_seq, Ordering::SeqCst);
324        info!(
325            "Loaded WAL checkpoint for recovery: wal_offset={}, last_command_id={}, last_l2_seq={}",
326            checkpoint.wal_offset, checkpoint.last_command_id, checkpoint.last_l2_seq
327        );
328    }
329
330    /// Check restored orderbooks for crosses and log warnings.
331    /// Crossed orderbooks after restore indicate a bug in the matching engine
332    /// or snapshot corruption - the matching engine should never allow bid >= ask.
333    pub(super) fn check_restored_orderbooks_for_crosses(&self) {
334        let mut crossed_count = 0;
335        let mut crossed_details = Vec::new();
336
337        for (symbol, orderbook) in &self.ctx.orderbooks {
338            if orderbook.is_crossed() {
339                crossed_count += 1;
340                let best_bid = orderbook.get_best_bid();
341                let best_ask = orderbook.get_best_ask();
342                crossed_details.push(format!("{}: bid={:?} ask={:?}", symbol, best_bid, best_ask));
343            }
344        }
345
346        if crossed_count > 0 {
347            error!(
348                "🚨 CRITICAL: {} orderbooks have CROSSED state after restore! \
349                This indicates a matching engine bug - orders should have executed. \
350                Details: {:?}",
351                crossed_count, crossed_details
352            );
353            panic!(
354                "CRITICAL_FAILURE: {} crossed orderbooks after restore: {:?}",
355                crossed_count, crossed_details
356            );
357        }
358    }
359
360    /// Replay commands from the journal to reconstruct orderbook state.
361    ///
362    /// Recovery runs in two bounded windows:
363    /// 1) Base reconstruction: commands (0, checkpoint.last_command_id]
364    /// 2) Delta replay: commands (checkpoint.last_command_id, +inf)
365    pub(super) fn replay_commands_from_journal(&mut self) {
366        let snapshot_seq = self.ctx.l2_update_seq.load(Ordering::SeqCst);
367        let checkpoint_command_id = self.replay_checkpoint.last_command_id;
368        const REPLAY_CHUNK_SIZE: i64 = 5_000;
369
370        if self.ctx.db.is_none() {
371            debug!("No diesel handler, skipping journal replay");
372            return;
373        }
374
375        let mut replay_count = 0;
376        let mut max_replayed_order_id: u64 = 0;
377        if checkpoint_command_id > 0 && !self.snapshot_loaded {
378            info!(
379                "Recovery base reconstruction: replaying journal commands up to checkpoint command_id {}",
380                checkpoint_command_id
381            );
382            self.replay_command_window(
383                0,
384                Some(checkpoint_command_id),
385                REPLAY_CHUNK_SIZE,
386                &mut replay_count,
387                &mut max_replayed_order_id,
388            );
389        } else if self.snapshot_loaded {
390            info!(
391                "Skipping base reconstruction — engine state snapshot loaded (last_command_id={})",
392                checkpoint_command_id
393            );
394        }
395
396        info!(
397            "Recovery delta replay: replaying commands after checkpoint command_id {}",
398            checkpoint_command_id
399        );
400        self.replay_command_window(
401            checkpoint_command_id,
402            None,
403            REPLAY_CHUNK_SIZE,
404            &mut replay_count,
405            &mut max_replayed_order_id,
406        );
407
408        // Pass 4 (ghost order reconciliation) is deferred to
409        // run_post_startup_reconciliation(), which runs ~5s after the engine
410        // loop starts. This gives the journal batcher time to materialize
411        // pending terminal statuses into order_infos.
412        // Running the expensive get_terminal_order_ids_sync query here
413        // (before flush completes) would produce stale results
414        // AND double the startup time by querying the same large table twice.
415
416        // After replay, ensure next_order_id is higher than any order_id seen
417        // in the replayed commands. Without this, the engine could reassign order IDs
418        // that were used by replayed commands (especially rejected/filled/cancelled
419        // orders that don't appear in the orderbook snapshot), causing the MM to
420        // detect "phantom regressions" where a terminal order_id reappears as open.
421        if max_replayed_order_id >= self.ctx.next_order_id {
422            let old = self.ctx.next_order_id;
423            self.ctx.next_order_id = max_replayed_order_id + 1;
424            info!(
425                "Advanced next_order_id after replay: {} -> {} (max replayed order_id was {})",
426                old, self.ctx.next_order_id, max_replayed_order_id
427            );
428        }
429
430        // After replay, update the engine's L2 sequence to match the database.
431        // The replayed commands already emitted L2 updates with sequences, so
432        // we query the max sequence from the events table.
433        // Re-borrow handler for the final query.
434        let max_seq_result = {
435            let Some(ref handler) = self.ctx.db else {
436                warn!(
437                    "No diesel handler for max L2 seq query. L2 seq remains at {}",
438                    snapshot_seq
439                );
440                return;
441            };
442            let replay: &dyn JournalReplayReader = handler;
443            replay.get_max_l2_seq_from_events_sync()
444        };
445
446        match max_seq_result {
447            Ok(max_seq) => {
448                if max_seq > snapshot_seq {
449                    self.ctx.l2_update_seq.store(max_seq, Ordering::SeqCst);
450                    info!(
451                        "Journal replay complete: {} commands replayed, L2 seq {} -> {}",
452                        replay_count, snapshot_seq, max_seq
453                    );
454                } else {
455                    info!(
456                        "Journal replay complete: {} commands replayed, L2 seq unchanged at {}",
457                        replay_count, snapshot_seq
458                    );
459                }
460            }
461            Err(e) => {
462                panic!(
463                    "CRITICAL_FAILURE: Failed to get max L2 seq after replay: {}. \
464                     Cannot safely continue with stale sequence {}.",
465                    e, snapshot_seq
466                );
467            }
468        }
469
470        // Sync L2 snapshot baselines for all orderbooks after replay.
471        //
472        // During replay, orders are added/removed from orderbooks but
473        // emit_orderbook_events() is never called, so last_bid_snapshot
474        // and last_ask_snapshot remain empty. Without this sync, the
475        // first cancel after restart produces an empty L2 diff
476        // (empty→empty), and the CancelOrder command gets no L2Update
477        // event in engine_events — making it invisible to the replay
478        // query on the NEXT restart (JOIN on event_type = 'L2Update').
479        for ob in self.ctx.orderbooks.values_mut() {
480            ob.sync_l2_snapshot_baseline();
481        }
482
483        self.flush_startup_expired_order_cancels();
484    }
485
486    fn replay_command_window(
487        &mut self,
488        start_command_id: i64,
489        end_command_id: Option<i64>,
490        chunk_size: i64,
491        replay_count: &mut usize,
492        max_replayed_order_id: &mut u64,
493    ) {
494        let mut cursor = start_command_id;
495        loop {
496            let commands = {
497                let replay: &dyn JournalReplayReader = self
498                    .ctx
499                    .db
500                    .as_ref()
501                    .expect("diesel handler required for replay");
502                replay
503                    .get_replay_commands_after_command_id_sync(cursor, end_command_id, chunk_size)
504                    .unwrap_or_else(|e| {
505                        panic!(
506                            "CRITICAL_FAILURE: Failed to query replay commands after command_id {} \
507                             (end={:?}, limit={}): {}",
508                            cursor, end_command_id, chunk_size, e
509                        )
510                    })
511            };
512
513            if commands.is_empty() {
514                break;
515            }
516
517            let chunk_end_command_id = commands
518                .last()
519                .expect("replay command chunk unexpectedly empty")
520                .command_id;
521            if chunk_end_command_id <= cursor {
522                panic!(
523                    "CRITICAL_FAILURE: Replay cursor did not advance (cursor={}, chunk_end={})",
524                    cursor, chunk_end_command_id
525                );
526            }
527
528            let mut fill_events_by_command =
529                self.replay_fill_events_by_command_for_window(cursor, chunk_end_command_id);
530            for cmd in commands {
531                self.replay_command_row(&cmd, replay_count, max_replayed_order_id);
532                if let Some(fill_events) = fill_events_by_command.remove(&cmd.command_id) {
533                    self.apply_fill_events_for_replay(&fill_events);
534                }
535            }
536            if !fill_events_by_command.is_empty() {
537                let command_ids = fill_events_by_command.keys().copied().collect::<Vec<_>>();
538                panic!(
539                    "CRITICAL_FAILURE: Fill replay found OrderFilled events for command_ids {:?} \
540                     without matching replay commands in window ({}..={}). Persisted journal is inconsistent.",
541                    command_ids, cursor, chunk_end_command_id
542                );
543            }
544
545            self.replay_order_update_events_for_window(cursor, chunk_end_command_id);
546            cursor = chunk_end_command_id;
547        }
548    }
549
550    fn replay_command_row(
551        &mut self,
552        cmd: &hypercall_db::ReplayCommand,
553        replay_count: &mut usize,
554        max_replayed_order_id: &mut u64,
555    ) {
556        // RFQ command rows: replay is event-driven, not command-driven. The
557        // journaled OrderFilled events are picked up by the fill-event replay
558        // pass. Replay must not re-run matching or validation for an accepted
559        // RFQ, but it must restore any nonces burned by the original command.
560        if cmd.command_type == "RfqExecute" {
561            if let Ok(rfq_cmd) = rmp_serde::from_slice::<hypercall_runtime_api::RfqExecuteCommand>(
562                &cmd.command_data[1..],
563            ) {
564                if let Some(nonce) = rfq_cmd.taker_nonce {
565                    let nonce_signer = rfq_cmd.taker_submit_signer();
566                    self.ctx
567                        .nonce_sets
568                        .entry(nonce_signer)
569                        .or_insert_with(|| {
570                            hypercall_engine::BoundedNonceSet::new(
571                                hypercall_engine::nonce::DEFAULT_NONCE_SET_CAPACITY,
572                            )
573                        })
574                        .insert(nonce);
575                }
576                if let Some(nonce) = rfq_cmd.taker_accept_nonce {
577                    let nonce_signer = rfq_cmd.taker_accept_signer();
578                    self.ctx
579                        .nonce_sets
580                        .entry(nonce_signer)
581                        .or_insert_with(|| {
582                            hypercall_engine::BoundedNonceSet::new(
583                                hypercall_engine::nonce::DEFAULT_NONCE_SET_CAPACITY,
584                            )
585                        })
586                        .insert(nonce);
587                }
588            }
589            *replay_count += 1;
590            return;
591        }
592
593        if let Some(command) = decode_state_command_for_replay(cmd) {
594            if Self::is_expiry_replay_command(&command) {
595                self.replay_expiry_command_row(cmd, command);
596                *replay_count += 1;
597                return;
598            }
599            let disposition =
600                crate::rsm::restart_components::replay_disposition_for_command(&command);
601            assert_eq!(
602                disposition,
603                hypercall_recovery::ReplayDisposition::Applied,
604                "CRITICAL_FAILURE: decoded restart-owned command {} has no component replay owner",
605                cmd.command_type
606            );
607            if self.apply_legacy_agent_auth_replay_command(&command) {
608                *replay_count += 1;
609                return;
610            }
611            let timestamp_ms = replay_envelope_timestamp(&command);
612            let output = self
613                .apply(CommandEnvelope::new(timestamp_ms, command))
614                .unwrap_or_else(|e| {
615                    panic!(
616                        "CRITICAL_FAILURE: Replay failed for {} command {} \
617                         (request_id={}): {}",
618                        cmd.command_type, cmd.command_id, cmd.request_id, e
619                    )
620                });
621            self.apply_replayed_expiry_effects(&output.expiry_effects)
622                .unwrap_or_else(|e| {
623                    panic!(
624                        "CRITICAL_FAILURE: Replay failed to apply expiry runtime effects for {} \
625                         command {} (request_id={}): {}",
626                        cmd.command_type, cmd.command_id, cmd.request_id, e
627                    )
628                });
629            self.apply_pm_settlement_projection_effects_sync(
630                &output.pm_settlement_effects,
631                &cmd.request_id,
632            )
633            .unwrap_or_else(|e| {
634                panic!(
635                    "CRITICAL_FAILURE: Replay failed to apply PM settlement projection effects for {} \
636                     command {} (request_id={}): {}",
637                    cmd.command_type, cmd.command_id, cmd.request_id, e
638                )
639            });
640            *replay_count += 1;
641            return;
642        }
643
644        let decoded_response = cmd.decode_response();
645        if let Some(resp) = decoded_response.as_ref() {
646            if let Some(order_id) = resp.order_id {
647                *max_replayed_order_id = (*max_replayed_order_id).max(order_id);
648            }
649        }
650
651        let order_action = cmd.decode_command();
652
653        match self.process_order_for_replay(&order_action, decoded_response.as_ref()) {
654            Ok(_) => {
655                *replay_count += 1;
656                debug!(
657                    "Replayed command {} (request_id: {})",
658                    cmd.command_id, cmd.request_id
659                );
660            }
661            Err(e) => {
662                panic!(
663                    "CRITICAL_FAILURE: Replay failed for command {} (request_id: {}): {}",
664                    cmd.command_id, cmd.request_id, e
665                );
666            }
667        }
668    }
669
670    fn is_expiry_replay_command(command: &EngineCommand) -> bool {
671        matches!(
672            command,
673            EngineCommand::MarketAction(market_command)
674                if matches!(
675                    market_command.message.action,
676                    hypercall_types::MarketAction::ExpireMarket
677                )
678        ) || matches!(command, EngineCommand::TickExpiry { .. })
679    }
680
681    fn replay_expiry_command_row(
682        &mut self,
683        cmd: &hypercall_db::ReplayCommand,
684        command: EngineCommand,
685    ) {
686        let timestamp_ms = replay_envelope_timestamp(&command);
687        let output = self
688            .apply(CommandEnvelope::new(timestamp_ms, command))
689            .unwrap_or_else(|error| {
690                panic!(
691                    "CRITICAL_FAILURE: Replay failed for expiry command {} \
692                     (request_id={}): {}",
693                    cmd.command_id, cmd.request_id, error
694                )
695            });
696
697        self.apply_replayed_expiry_effects(&output.expiry_effects)
698            .unwrap_or_else(|error| {
699                panic!(
700                    "CRITICAL_FAILURE: Failed to apply replayed expiry effects while \
701                     replaying command {} (request_id={}): {}",
702                    cmd.command_id, cmd.request_id, error
703                )
704            });
705
706        self.apply_pm_settlement_projection_effects_sync(
707            &output.pm_settlement_effects,
708            &cmd.request_id,
709        )
710        .unwrap_or_else(|error| {
711            panic!(
712                "CRITICAL_FAILURE: Failed to apply replayed PM settlement projection effects while \
713                 replaying command {} (request_id={}): {}",
714                cmd.command_id, cmd.request_id, error
715            )
716        });
717
718        if !output.events.is_empty() {
719            self.startup_replayed_events
720                .push((cmd.request_id.clone(), output.events));
721        }
722    }
723
724    pub(super) fn apply_legacy_agent_auth_replay_command(
725        &mut self,
726        command: &EngineCommand,
727    ) -> bool {
728        // Temporary replay shim for WAL rows written before agent-auth commands
729        // required signed nonces. Remove this after the pre-nonce WAL retention
730        // window has expired and all restart replay ranges are guaranteed to
731        // contain only nonce-bearing ApproveAgent/RevokeAgent commands.
732        match command {
733            EngineCommand::ApproveAgent {
734                wallet,
735                agent,
736                expires_at_ms,
737                nonce: None,
738                ..
739            } => {
740                self.ctx
741                    .agent_authorizations
742                    .entry(*wallet)
743                    .or_default()
744                    .insert(*agent, *expires_at_ms);
745                debug!(
746                    wallet = %wallet,
747                    agent = %agent,
748                    expires_at_ms = ?expires_at_ms,
749                    "Replayed legacy ApproveAgent command without nonce"
750                );
751                true
752            }
753            EngineCommand::RevokeAgent {
754                wallet,
755                agent,
756                nonce: None,
757                ..
758            } => {
759                if let Some(agents) = self.ctx.agent_authorizations.get_mut(wallet) {
760                    agents.remove(agent);
761                    if agents.is_empty() {
762                        self.ctx.agent_authorizations.remove(wallet);
763                    }
764                }
765                debug!(
766                    wallet = %wallet,
767                    agent = %agent,
768                    "Replayed legacy RevokeAgent command without nonce"
769                );
770                true
771            }
772            _ => false,
773        }
774    }
775
776    fn replay_fill_events_by_command_for_window(
777        &mut self,
778        start_command_id: i64,
779        end_command_id: i64,
780    ) -> ReplayFillEventsByCommand {
781        let rows = {
782            let replay: &dyn JournalReplayReader = self
783                .ctx
784                .db
785                .as_ref()
786                .expect("diesel handler required for fill replay");
787            replay
788                .get_portfolio_events_for_command_range_sync(start_command_id + 1, end_command_id)
789                .unwrap_or_else(|e| {
790                    panic!(
791                        "CRITICAL_FAILURE: Failed to query OrderFilled events for replay window \
792                         ({}..={}): {}",
793                        start_command_id, end_command_id, e
794                    )
795                })
796        };
797        if rows.is_empty() {
798            return BTreeMap::new();
799        }
800
801        let mut fill_events_by_command: ReplayFillEventsByCommand = BTreeMap::new();
802        for (idx, row) in rows.into_iter().enumerate() {
803            if row.event_type != hypercall_db::EventType::OrderFilled {
804                continue;
805            }
806            fill_events_by_command
807                .entry(row.command_id)
808                .or_default()
809                .push(Self::decode_fill_event(&row.event_data, idx));
810        }
811        let max_fill_trade_id = fill_events_by_command
812            .values()
813            .flat_map(|fill_events| fill_events.iter().map(|(fill, _)| fill.trade_id))
814            .max()
815            .unwrap_or(0);
816        if max_fill_trade_id >= self.ctx.next_trade_id {
817            let old = self.ctx.next_trade_id;
818            self.ctx.next_trade_id = max_fill_trade_id + 1;
819            info!(
820                "Advanced next_trade_id after replay: {} -> {} (max replayed trade_id was {})",
821                old, self.ctx.next_trade_id, max_fill_trade_id
822            );
823        }
824        fill_events_by_command
825    }
826
827    pub(super) fn apply_fill_events_for_replay(
828        &mut self,
829        fill_events: &[(hypercall_types::Fill, hypercall_types::FillAccounting)],
830    ) {
831        if fill_events.is_empty() {
832            return;
833        }
834
835        let fills: Vec<_> = fill_events.iter().map(|(fill, _)| fill.clone()).collect();
836        self.apply_fills_for_replay(&fills);
837
838        // Update engine_positions from replayed fills so margin checks
839        // and expiry settlement see the correct position state after restart.
840        for (fill, accounting) in fill_events {
841            for (wallet, delta) in [
842                (fill.taker_wallet_address, accounting.taker_net_cash_delta),
843                (fill.maker_wallet_address, accounting.maker_net_cash_delta),
844            ] {
845                if delta == rust_decimal::Decimal::ZERO {
846                    continue;
847                }
848                let balance_after = self.ctx.balance_ledger.balance(&wallet) + delta;
849                let update = hypercall_types::BalanceUpdate {
850                    balance_update_seq: self.ctx.balance_ledger.next_balance_update_seq(),
851                    wallet,
852                    delta,
853                    balance_after,
854                    reason: hypercall_types::BalanceUpdateReason::OptionFillPremium,
855                    reference_id: Some(fill.trade_id.to_string()),
856                    source_command_id: None,
857                    timestamp_ms: fill.timestamp,
858                };
859                self.ctx
860                    .balance_ledger
861                    .apply_balance_update(&update)
862                    .unwrap_or_else(|error| {
863                        panic!(
864                            "CRITICAL: failed to apply replayed fill balance update: {}",
865                            error
866                        )
867                    });
868            }
869
870            use crate::rsm::engine_deps::apply_fill_to_positions;
871            use hypercall_types::to_human_readable_decimal;
872            let human_size = to_human_readable_decimal(&fill.symbol, fill.size);
873            let is_buy = matches!(fill.taker_side, hypercall_types::Side::Buy);
874            let taker_signed = if is_buy { human_size } else { -human_size };
875            apply_fill_to_positions(
876                &mut self.ctx.engine_positions,
877                fill.taker_wallet_address,
878                fill.symbol.clone(),
879                taker_signed,
880                fill.price,
881            );
882            apply_fill_to_positions(
883                &mut self.ctx.engine_positions,
884                fill.maker_wallet_address,
885                fill.symbol.clone(),
886                -taker_signed,
887                fill.price,
888            );
889        }
890    }
891
892    fn replay_order_update_events_for_window(
893        &mut self,
894        start_command_id: i64,
895        end_command_id: i64,
896    ) {
897        let rows = {
898            let replay: &dyn JournalReplayReader = self
899                .ctx
900                .db
901                .as_ref()
902                .expect("diesel handler required for order update replay");
903            replay
904                .get_order_update_events_for_command_range_sync(start_command_id, end_command_id)
905                .unwrap_or_else(|e| {
906                    panic!(
907                        "CRITICAL_FAILURE: Failed to query OrderUpdate events for replay window \
908                         ({}..={}): {}",
909                        start_command_id, end_command_id, e
910                    )
911                })
912        };
913        if rows.is_empty() {
914            return;
915        }
916
917        let updates = Self::decode_order_update_events(&rows);
918        self.apply_cancel_events_for_replay(&updates);
919    }
920
921    fn decode_fill_events(
922        raw_rows: &[Vec<u8>],
923    ) -> Vec<(hypercall_types::Fill, hypercall_types::FillAccounting)> {
924        raw_rows
925            .iter()
926            .enumerate()
927            .map(|(idx, raw)| Self::decode_fill_event(raw, idx))
928            .collect()
929    }
930
931    fn decode_fill_event(
932        raw: &[u8],
933        idx: usize,
934    ) -> (hypercall_types::Fill, hypercall_types::FillAccounting) {
935        match hypercall_types::EngineMessage::deserialize_from_wire(
936            crate::shared::topics::TOPIC_FILLS,
937            raw,
938        ) {
939            Ok(hypercall_types::EngineMessage::OrderFilled { fill, accounting }) => {
940                (fill, accounting)
941            }
942            Ok(other) => {
943                panic!(
944                    "CRITICAL_FAILURE: Expected OrderFilled event at index {} during fill replay, got {}. \
945                     Persisted data is corrupt; refusing to continue.",
946                    idx,
947                    other.type_name()
948                );
949            }
950            Err(e) => {
951                panic!(
952                    "CRITICAL_FAILURE: Failed to deserialize OrderFilled event at index {}: {}. \
953                     Persisted data is corrupt; refusing to continue.",
954                    idx, e
955                );
956            }
957        }
958    }
959
960    fn decode_order_update_events(
961        raw_rows: &[Vec<u8>],
962    ) -> Vec<hypercall_types::OrderUpdateMessage> {
963        raw_rows
964            .iter()
965            .enumerate()
966            .map(|(idx, raw)| {
967                if raw.len() < 2 {
968                    panic!(
969                        "CRITICAL_FAILURE: OrderUpdate event data too short at index {} ({} bytes). \
970                         Persisted data is corrupt.",
971                        idx,
972                        raw.len()
973                    );
974                }
975                let version = raw[0];
976                rmp_serde::from_slice::<hypercall_types::OrderUpdateMessage>(&raw[1..])
977                    .unwrap_or_else(|e| {
978                        panic!(
979                            "CRITICAL_FAILURE: Failed to deserialize OrderUpdate event at index {} \
980                             (version={}): {}. Persisted data is corrupt; refusing to continue.",
981                            idx, version, e
982                        )
983                    })
984            })
985            .collect()
986    }
987
988    /// Post-startup ghost order reconciliation.
989    ///
990    /// Runs ~5s after engine start, giving the journal batcher time to flush
991    /// pending terminal statuses to order_infos. Queries for any
992    /// orders in the orderbook that have a terminal status and removes them.
993    /// This is the sole safeguard against ghost orders from the outbox pattern
994    /// crash scenario (SIGKILL between journal fsync and outbox publish).
995    pub(super) fn run_post_startup_reconciliation(&mut self) {
996        let all_order_ids: Vec<i64> = self
997            .ctx
998            .orderbooks
999            .values()
1000            .flat_map(|ob| ob.get_all_order_ids())
1001            .map(|id| id as i64)
1002            .collect();
1003
1004        if all_order_ids.is_empty() {
1005            info!("Post-startup reconciliation: no orders in orderbook, skipping");
1006            return;
1007        }
1008
1009        if let Some(ref handler) = self.ctx.db {
1010            match handler.get_terminal_order_ids_sync(&all_order_ids) {
1011                Ok(terminal_ids) if !terminal_ids.is_empty() => {
1012                    let mut removed = 0;
1013                    for order_id in &terminal_ids {
1014                        let oid = *order_id as u64;
1015                        for ob in self.ctx.orderbooks.values_mut() {
1016                            if ob.has_order(oid) {
1017                                ob.cancel_order(oid);
1018                                removed += 1;
1019                                break;
1020                            }
1021                        }
1022                        self.ctx.order_index.remove_order_by_id(oid);
1023                    }
1024                    if removed > 0 {
1025                        warn!(
1026                            "Post-startup reconciliation: removed {} ghost orders that were \
1027                             terminal in order_infos but present in orderbook after replay.",
1028                            removed
1029                        );
1030                        // Publish updated snapshot so API reflects the corrected state
1031                        self.publish_snapshot();
1032                    }
1033                }
1034                Ok(_) => {
1035                    info!(
1036                        "Post-startup reconciliation: checked {} orders, all non-terminal — no ghost orders",
1037                        all_order_ids.len()
1038                    );
1039                }
1040                Err(e) => {
1041                    panic!(
1042                        "CRITICAL_FAILURE: Post-startup reconciliation query failed: {}. \
1043                         Ghost orders may remain in orderbook; refusing to serve live traffic.",
1044                        e
1045                    );
1046                }
1047            }
1048        }
1049    }
1050
1051    /// Process an order for replay purposes (no journaling, no response).
1052    ///
1053    /// This is used during startup to replay commands from the journal after
1054    /// loading a snapshot. The commands are already persisted, so we just need
1055    /// to apply their state changes.
1056    ///
1057    /// `response_json` is the original response from when this command was first
1058    /// processed. It tells us the final order state (filled, open, partially_filled,
1059    /// cancelled) so we can reconstruct the correct resting quantity without
1060    /// re-running matching.
1061    pub(super) fn process_order_for_replay(
1062        &mut self,
1063        msg: &OrderActionMessage,
1064        response: Option<&OrderUpdateMessage>,
1065    ) -> Result<(), String> {
1066        if let Some(nonce) = msg.info.nonce {
1067            let signer = msg.api_wallet_address.unwrap_or(msg.wallet);
1068            self.ctx
1069                .nonce_sets
1070                .entry(signer)
1071                .or_insert_with(|| {
1072                    hypercall_engine::BoundedNonceSet::new(
1073                        hypercall_engine::nonce::DEFAULT_NONCE_SET_CAPACITY,
1074                    )
1075                })
1076                .insert(nonce);
1077        }
1078        match msg.action {
1079            OrderAction::CreateOrder => self.replay_create_order(msg, response),
1080            OrderAction::CancelOrder => self.replay_cancel_order(msg),
1081            OrderAction::ReplaceOrder => self.replay_replace_order(msg, response),
1082        }
1083    }
1084
1085    /// Replay a create order command using the journal's response to determine
1086    /// what resting state to restore.
1087    ///
1088    /// Instead of re-running matching (which would require determinism of all
1089    /// inputs: timestamps, fees, oracle state, etc.), we read the response_json
1090    /// from the journal to determine the order's final status and filled_size.
1091    /// This tells us exactly what resting quantity belongs in the orderbook:
1092    ///
1093    /// - FILLED → don't add (fully consumed)
1094    /// - CANCELED / REJECTED → don't add
1095    /// - OPEN → add with full original quantity
1096    /// - PARTIALLY_FILLED → add with (original_size - filled_size)
1097    fn replay_create_order(
1098        &mut self,
1099        msg: &OrderActionMessage,
1100        response: Option<&OrderUpdateMessage>,
1101    ) -> Result<(), String> {
1102        let symbol = &msg.info.symbol;
1103
1104        // Perp orders are validated and sent to HyperCore; they never rest in
1105        // the internal orderbook. Skip replay entirely.
1106        if symbol.ends_with("-PERP") || msg.info.is_perp {
1107            return Ok(());
1108        }
1109
1110        // No orderbook for this symbol — the instrument may have expired while
1111        // offline, been removed, or the snapshot predates the instrument.
1112        // If the journal response shows a resting order (Open/PartiallyFilled),
1113        // we must cancel it in the DB; otherwise the persisted order state
1114        // retains a stale OPEN row with no engine-side counterpart.
1115        //
1116        // We intentionally do NOT gate on expired_instruments here: the snapshot
1117        // restore can overwrite that map with older state, so an instrument that
1118        // expired during downtime may not appear in it. The absence of an
1119        // orderbook is sufficient — any resting order without an orderbook is
1120        // orphaned.
1121        if !self.ctx.orderbooks.contains_key(symbol) {
1122            let resp = match response {
1123                Some(r) => r,
1124                None => {
1125                    return Err(format!(
1126                        "Replay create for missing-orderbook instrument {} has no response; \
1127                         cannot determine order state for reconciliation",
1128                        symbol
1129                    ));
1130                }
1131            };
1132
1133            match resp.status {
1134                OrderUpdateStatus::Open | OrderUpdateStatus::PartiallyFilled => {
1135                    let order_id = match (msg.info.order_id, resp.order_id) {
1136                        (Some(cmd_id), Some(resp_id)) if cmd_id != resp_id => {
1137                            return Err(format!(
1138                                "Replay create for missing-orderbook instrument {} has mismatched \
1139                                 order_id (command={}, response={})",
1140                                symbol, cmd_id, resp_id
1141                            ));
1142                        }
1143                        (Some(id), _) | (_, Some(id)) => id,
1144                        (None, None) => {
1145                            return Err(format!(
1146                                "Replay create for missing-orderbook instrument {} has no order_id \
1147                                 in command or response",
1148                                symbol
1149                            ));
1150                        }
1151                    };
1152                    let filled_size = match resp.status {
1153                        OrderUpdateStatus::PartiallyFilled => {
1154                            if resp.filled_size < dec!(0) || resp.filled_size >= msg.info.size {
1155                                return Err(format!(
1156                                    "Replay create for missing-orderbook instrument {} has invalid \
1157                                     partially-filled payload: size={}, filled={}",
1158                                    symbol, msg.info.size, resp.filled_size
1159                                ));
1160                            }
1161                            resp.filled_size
1162                        }
1163                        _ => dec!(0),
1164                    };
1165                    warn!(
1166                        "Replay: order {} on missing-orderbook instrument {} was {:?} — \
1167                         collecting for startup DB cancellation",
1168                        order_id, symbol, resp.status
1169                    );
1170                    self.startup_expired_order_cancels
1171                        .push(StartupExpiredOrderCancel {
1172                            order_id,
1173                            wallet: msg.wallet,
1174                            symbol: symbol.clone(),
1175                            price: msg.info.price,
1176                            size: msg.info.size,
1177                            side: msg.info.side,
1178                            tif: msg.info.tif,
1179                            is_perp: msg.info.is_perp,
1180                            underlying: msg.info.underlying.clone(),
1181                            reduce_only: msg.info.reduce_only,
1182                            nonce: msg.info.nonce,
1183                            signature: msg.info.signature.clone(),
1184                            mmp_enabled: msg.info.mmp_enabled,
1185                            filled_size,
1186                            client_id: msg.info.client_id.clone(),
1187                            timestamp: msg.timestamp,
1188                        });
1189                }
1190                OrderUpdateStatus::Acked => {
1191                    return Err(format!(
1192                        "Replay create for missing-orderbook instrument {} has non-final Acked \
1193                         status; refusing to skip reconciliation",
1194                        symbol
1195                    ));
1196                }
1197                // Filled, Canceled, Rejected — terminal, safe to skip
1198                _ => {}
1199            }
1200
1201            return Ok(());
1202        }
1203
1204        let response = match response {
1205            Some(resp) => resp,
1206            None => {
1207                return Err(format!(
1208                    "Replay create for symbol {} has no response; refusing to guess order state",
1209                    symbol
1210                ));
1211            }
1212        };
1213
1214        // Determine resting quantity from the journal response.
1215        // Deserialize into OrderUpdateMessage to use proper enum types
1216        // instead of fragile string matching.
1217        let resting_quantity = match response.status {
1218            OrderUpdateStatus::Filled => {
1219                // Taker was fully filled — don't add to book.
1220                // Maker fills are applied by Pass 2 (exact fill events
1221                // from engine_events, same DB transaction as commands).
1222                debug!(
1223                    "Replay create for {} was fully filled, not adding to orderbook",
1224                    symbol
1225                );
1226                return Ok(());
1227            }
1228            OrderUpdateStatus::Canceled | OrderUpdateStatus::Rejected => {
1229                debug!(
1230                    "Replay create for {} was {:?}, not adding to orderbook",
1231                    symbol, response.status
1232                );
1233                return Ok(());
1234            }
1235            OrderUpdateStatus::Open => {
1236                // No fills occurred — full quantity rests
1237                msg.info.size
1238            }
1239            OrderUpdateStatus::PartiallyFilled => {
1240                if response.filled_size < dec!(0) || response.filled_size >= msg.info.size {
1241                    return Err(format!(
1242                        "Replay create for symbol {} has invalid partially-filled payload: size={}, filled={}",
1243                        symbol, msg.info.size, response.filled_size
1244                    ));
1245                }
1246
1247                // Taker was partially filled — add remaining to book.
1248                // Maker fills are applied by Pass 2 (exact fill events).
1249                let remaining = msg.info.size - response.filled_size;
1250                debug!(
1251                    "Replay create for {} partially filled: original={}, filled={}, resting={}",
1252                    symbol, msg.info.size, response.filled_size, remaining
1253                );
1254                remaining
1255            }
1256            OrderUpdateStatus::Acked => {
1257                return Err(format!(
1258                    "Replay create for symbol {} has non-final Acked status; refusing to infer resting quantity",
1259                    symbol
1260                ));
1261            }
1262        };
1263
1264        let order_id = match (msg.info.order_id, response.order_id) {
1265            (Some(command_order_id), Some(response_order_id)) => {
1266                if command_order_id != response_order_id {
1267                    return Err(format!(
1268                        "Replay create for symbol {} has mismatched order_id (command={}, response={})",
1269                        symbol, command_order_id, response_order_id
1270                    ));
1271                }
1272                command_order_id
1273            }
1274            (Some(order_id), None) | (None, Some(order_id)) => order_id,
1275            (None, None) => {
1276                return Err(format!(
1277                    "Replay create for symbol {} with status {:?} has no order_id in command or response",
1278                    symbol, response.status
1279                ));
1280            }
1281        };
1282
1283        // Validate order isn't already present.
1284        // Use a scoped immutable borrow so we can call &mut self methods below.
1285        {
1286            let orderbook = self.ctx.orderbooks.get(symbol).unwrap();
1287
1288            if orderbook.has_order(order_id) {
1289                debug!(
1290                    "Order {} already exists in orderbook during replay, skipping",
1291                    order_id
1292                );
1293                return Ok(());
1294            }
1295        }
1296
1297        let timestamp = msg.timestamp;
1298        let price = msg.info.price;
1299        let side = msg.info.side;
1300        let wallet = msg.wallet;
1301
1302        // Re-borrow orderbook mutably now that fill inference is done
1303        let orderbook = self.ctx.orderbooks.get_mut(symbol).unwrap();
1304        orderbook.add_order_with_metadata(
1305            order_id,
1306            price,
1307            resting_quantity,
1308            side,
1309            wallet,
1310            timestamp,
1311            msg.info.client_id.clone(),
1312            msg.info.mmp_enabled,
1313            msg.info.size, // original_size from the journal entry
1314        );
1315
1316        // Update order index: remove any snapshot-loaded stub, then add with replay metadata
1317        self.ctx.order_index.remove_order(&wallet, order_id);
1318        self.ctx.order_index.add_order(
1319            &wallet,
1320            hypercall_engine::order_index::OrderSummary {
1321                order_id,
1322                symbol: symbol.clone(),
1323                side,
1324                price,
1325                original_size: msg.info.size,
1326                remaining_size: resting_quantity,
1327                is_perp: msg.info.is_perp,
1328                mmp_enabled: msg.info.mmp_enabled,
1329                client_id: msg.info.client_id.clone(),
1330                created_at: timestamp as i64,
1331            },
1332        );
1333
1334        debug!(
1335            "Replayed create order {}: symbol={}, resting_qty={}, side={:?}",
1336            order_id, symbol, resting_quantity, side
1337        );
1338
1339        // NOTE: We do NOT increment L2 sequence here because:
1340        // 1. The original order processing already emitted L2 updates
1341        // 2. The cache will replay those L2 updates from the event stream
1342        // 3. The engine's L2 seq will be updated after all replays complete
1343        // Incrementing here would cause engine/cache seq divergence.
1344
1345        Ok(())
1346    }
1347
1348    /// Replay a cancel order command.
1349    fn replay_cancel_order(&mut self, msg: &OrderActionMessage) -> Result<(), String> {
1350        let symbol = &msg.info.symbol;
1351        let wallet = &msg.wallet;
1352
1353        // Skip commands for instruments that no longer have an orderbook
1354        // (expired instruments are not loaded during startup).
1355        if !self.ctx.orderbooks.contains_key(symbol) {
1356            debug!(
1357                "Skipping replay cancel for expired/removed instrument {}",
1358                symbol
1359            );
1360            return Ok(());
1361        }
1362
1363        // Get the orderbook
1364        let orderbook = self.ctx.orderbooks.get_mut(symbol).unwrap();
1365
1366        // Try to find the order by order_id or client_id
1367        let order_id = if let Some(oid) = msg.info.order_id {
1368            oid
1369        } else if let Some(ref client_id) = msg.info.client_id {
1370            // Look up by client_id via order_index (synchronous, no cache lag)
1371            match self
1372                .ctx
1373                .order_index
1374                .get_order_by_client_id(wallet, client_id)
1375            {
1376                Some((oid, _)) => oid,
1377                None => {
1378                    debug!(
1379                        "Order with client_id {} not found during replay cancel, may already be cancelled",
1380                        client_id
1381                    );
1382                    return Ok(());
1383                }
1384            }
1385        } else {
1386            return Err("Cancel order has neither order_id nor client_id".to_string());
1387        };
1388
1389        // Replay cancel is authoritative. Scrub this order_id from the book
1390        // even if snapshot metadata drift left it at the wrong level or side.
1391        if orderbook.cancel_order_for_replay(order_id).is_none() {
1392            debug!(
1393                "Order {} not present in replay cancel scan for symbol {}, may already be cancelled/filled",
1394                order_id, symbol
1395            );
1396        }
1397
1398        // Update order index
1399        self.ctx.order_index.remove_order(wallet, order_id);
1400
1401        // NOTE: We do NOT increment L2 sequence here - see replay_create_order comment.
1402
1403        Ok(())
1404    }
1405
1406    /// Recovery-only cancel helper.
1407    ///
1408    /// Tries likely symbols first, then scans every orderbook as a fallback.
1409    /// Replay is authoritative by order_id, so it is better to scrub a ghost
1410    /// from an unexpected book than to leave a stale resting order behind.
1411    fn cancel_order_for_replay_with_fallback(
1412        &mut self,
1413        order_id: u64,
1414        candidate_symbols: Vec<String>,
1415    ) -> Option<String> {
1416        let mut attempted_symbols = Vec::new();
1417
1418        for symbol in candidate_symbols {
1419            if attempted_symbols.iter().any(|seen| seen == &symbol) {
1420                continue;
1421            }
1422            attempted_symbols.push(symbol.clone());
1423
1424            if let Some(orderbook) = self.ctx.orderbooks.get_mut(&symbol) {
1425                if orderbook.cancel_order_for_replay(order_id).is_some() {
1426                    return Some(symbol);
1427                }
1428            }
1429        }
1430
1431        let remaining_symbols: Vec<String> = self
1432            .ctx
1433            .orderbooks
1434            .keys()
1435            .filter(|symbol| !attempted_symbols.iter().any(|seen| seen == *symbol))
1436            .cloned()
1437            .collect();
1438
1439        for symbol in remaining_symbols {
1440            if let Some(orderbook) = self.ctx.orderbooks.get_mut(&symbol) {
1441                if orderbook.cancel_order_for_replay(order_id).is_some() {
1442                    error!(
1443                        "RECOVERY_INVARIANT: order {} was found in unexpected orderbook {} during replay cancel fallback",
1444                        order_id, symbol
1445                    );
1446                    return Some(symbol);
1447                }
1448            }
1449        }
1450
1451        None
1452    }
1453
1454    /// Replay a replace order: cancel the old order and create the new one.
1455    ///
1456    /// For replay, `msg.info.order_id` is the cancel target (not patched by journal).
1457    /// The new order's assigned ID comes from `response.order_id`.
1458    fn replay_replace_order(
1459        &mut self,
1460        msg: &OrderActionMessage,
1461        response: Option<&OrderUpdateMessage>,
1462    ) -> Result<(), String> {
1463        let wallet = &msg.wallet;
1464
1465        // Phase 1: Always attempt to cancel the old order.
1466        // This is idempotent: if the order was already cancelled/filled by a
1467        // prior replay command, nothing happens. We must always try because
1468        // even a Rejected replace may have succeeded at the cancel phase
1469        // (e.g., cancel succeeded but the new order was rejected for margin).
1470        let cancel_order_id = msg
1471            .info
1472            .order_id
1473            .ok_or("Replace order missing order_id for cancel target".to_string())?;
1474
1475        let mut candidate_symbols = Vec::new();
1476        if let Some(index_symbol) = self
1477            .ctx
1478            .order_index
1479            .get_order_symbol(wallet, cancel_order_id)
1480            .map(|s| s.to_string())
1481        {
1482            candidate_symbols.push(index_symbol);
1483        }
1484
1485        if self
1486            .cancel_order_for_replay_with_fallback(cancel_order_id, candidate_symbols)
1487            .is_none()
1488        {
1489            debug!(
1490                "Replay replace: cancel target {} not present in any replay cancel scan",
1491                cancel_order_id
1492            );
1493        }
1494        self.ctx.order_index.remove_order(wallet, cancel_order_id);
1495
1496        // Phase 2: Only create the new order if the response shows it succeeded.
1497        // Rejected means the new order was not placed (either cancel target was
1498        // not found, or cancel succeeded but create failed for margin/validation).
1499        if let Some(resp) = response {
1500            if resp.status == OrderUpdateStatus::Rejected {
1501                debug!(
1502                    "Replay replace: skipping create phase (rejected) for wallet {}, cancel_target={}",
1503                    wallet, cancel_order_id
1504                );
1505                return Ok(());
1506            }
1507        }
1508
1509        let mut create_msg = msg.clone();
1510        create_msg.action = OrderAction::CreateOrder;
1511        if let Some(resp) = response {
1512            create_msg.info.order_id = resp.order_id;
1513        } else {
1514            create_msg.info.order_id = None;
1515        }
1516
1517        self.replay_create_order(&create_msg, response)
1518    }
1519
1520    /// Apply fill events from the journal to resting maker orders.
1521    ///
1522    /// During `replay_create_order`, each command's `response_json` only
1523    /// reflects the order's state at the time *that* command was processed.
1524    /// A maker order whose response said "Open" may have been filled later
1525    /// by a subsequent taker order. This method applies those fills to both
1526    /// the orderbook and order_index, closing the gap.
1527    ///
1528    /// `fills` should contain all `OrderFilled` events from engine_events
1529    /// for commands in the replay window (after the snapshot L2 sequence).
1530    pub(super) fn apply_fills_for_replay(&mut self, fills: &[hypercall_types::Fill]) {
1531        if fills.is_empty() {
1532            return;
1533        }
1534
1535        let mut applied = 0;
1536        let mut skipped = 0;
1537
1538        for fill in fills {
1539            // Update the order_index (handles partial/full fill, converts raw→human)
1540            let fully_filled = self.ctx.order_index.fill_order(
1541                &fill.maker_wallet_address,
1542                fill.maker_order_id,
1543                fill.size,
1544            );
1545
1546            // Update the orderbook
1547            if let Some(orderbook) = self.ctx.orderbooks.get_mut(&fill.symbol) {
1548                if orderbook.has_order(fill.maker_order_id) {
1549                    orderbook.reduce_order_quantity(fill.maker_order_id, fill.size);
1550                    applied += 1;
1551                } else {
1552                    skipped += 1;
1553                }
1554            } else {
1555                // No orderbook for this symbol. Expected for expired instruments;
1556                // suspicious otherwise. We warn rather than panic because
1557                // expired_instruments can be stale after snapshot restore (see
1558                // replay_create_order comment), so we can't hard-gate on it.
1559                if self.ctx.expired_instruments.contains_key(&fill.symbol) {
1560                    debug!(
1561                        "Replay fill for maker order {} on expired instrument {} \
1562                         — no orderbook, fill applied to order_index only",
1563                        fill.maker_order_id, fill.symbol
1564                    );
1565                } else {
1566                    warn!(
1567                        "Replay fill for maker order {} references non-existent \
1568                         orderbook {} (not in expired_instruments) — fill applied \
1569                         to order_index but not orderbook",
1570                        fill.maker_order_id, fill.symbol
1571                    );
1572                }
1573                skipped += 1;
1574            }
1575
1576            if fully_filled {
1577                debug!(
1578                    "Replay fill: maker order {} fully filled (trade_id={})",
1579                    fill.maker_order_id, fill.trade_id
1580                );
1581            } else {
1582                debug!(
1583                    "Replay fill: maker order {} partially filled by {} (trade_id={})",
1584                    fill.maker_order_id, fill.size, fill.trade_id
1585                );
1586            }
1587        }
1588
1589        if applied > 0 || skipped > 0 {
1590            info!(
1591                "Applied {} replay fills ({} skipped - order not in book)",
1592                applied, skipped
1593            );
1594        }
1595    }
1596
1597    pub(super) fn flush_startup_expired_order_cancels(&mut self) {
1598        if self.startup_expired_order_cancels.is_empty() {
1599            return;
1600        }
1601
1602        let count = self.startup_expired_order_cancels.len();
1603        warn!(
1604            "Startup recovery: {} orders on expired instruments need DB cancellation",
1605            count
1606        );
1607
1608        for cancel in &self.startup_expired_order_cancels {
1609            warn!(
1610                "  order_id={}, symbol={}, wallet={}, side={:?}, price={}, size={}, filled={}",
1611                cancel.order_id,
1612                cancel.symbol,
1613                cancel.wallet,
1614                cancel.side,
1615                cancel.price,
1616                cancel.size,
1617                cancel.filled_size,
1618            );
1619        }
1620
1621        if let Some(ref handler) = self.ctx.db {
1622            let event_handler =
1623                crate::db_handler::DieselEventHandler::with_pool_no_migrations(handler.pool());
1624            let chunk_size = 5000;
1625            let mut total_rows = 0usize;
1626            for (i, chunk) in self
1627                .startup_expired_order_cancels
1628                .chunks(chunk_size)
1629                .enumerate()
1630            {
1631                match event_handler
1632                    .batch_cancel_expired_orders_sync(chunk, "Instrument expired during offline")
1633                {
1634                    Ok(rows) => {
1635                        total_rows += rows;
1636                        info!(
1637                            "Startup recovery: cancelled chunk {}, {} rows written ({} in chunk, {} total so far)",
1638                            i + 1, rows, chunk.len(), total_rows
1639                        );
1640                    }
1641                    Err(e) => {
1642                        error!(
1643                            "Startup recovery: failed to cancel chunk {} ({} orders): {}. \
1644                             Remaining orders will show stale OPEN status.",
1645                            i + 1,
1646                            chunk.len(),
1647                            e
1648                        );
1649                    }
1650                }
1651            }
1652            info!(
1653                "Startup recovery: {} candidates, {} total rows cancelled",
1654                count, total_rows
1655            );
1656            metrics::counter!("ht_recovery_expired_order_cancels_total")
1657                .increment(total_rows as u64);
1658        }
1659        self.startup_expired_order_cancels.clear();
1660    }
1661
1662    /// Apply side-effect cancels from OrderUpdate events during replay (Pass 3).
1663    ///
1664    /// During live processing, some commands produce cancel side effects:
1665    /// - MMP-triggered cancels: A taker fill triggers MMP, which cancels all
1666    ///   MMP-enabled orders for that wallet+underlying as a side effect.
1667    /// - Self-trade prevention: A taker's order is cancelled to prevent self-trade,
1668    ///   and the matching maker may also be cancelled.
1669    ///
1670    /// These cancels are captured as OrderUpdate events in the taker's journal entry
1671    /// but are NOT separate journal commands. Pass 1 only processes commands, so these
1672    /// side-effect cancels are lost during replay. This pass re-applies them.
1673    pub(super) fn apply_cancel_events_for_replay(
1674        &mut self,
1675        order_updates: &[hypercall_types::OrderUpdateMessage],
1676    ) {
1677        if order_updates.is_empty() {
1678            return;
1679        }
1680
1681        let mut applied = 0;
1682        let mut skipped = 0;
1683
1684        for update in order_updates {
1685            // Only process terminal statuses
1686            let is_terminal = matches!(
1687                update.status,
1688                hypercall_types::OrderUpdateStatus::Canceled
1689                    | hypercall_types::OrderUpdateStatus::Filled
1690                    | hypercall_types::OrderUpdateStatus::Rejected
1691            );
1692            if !is_terminal {
1693                continue;
1694            }
1695
1696            let Some(order_id) = update.order_id else {
1697                warn!(
1698                    "Replay Pass 3: terminal OrderUpdate (status={:?}, wallet={}) \
1699                     has no order_id — cannot reconcile, skipping",
1700                    update.status, update.wallet_address
1701                );
1702                continue;
1703            };
1704
1705            let wallet = &update.wallet_address;
1706            let symbol = &update.info.symbol;
1707
1708            if let Some(orderbook) = self.ctx.orderbooks.get_mut(symbol) {
1709                if orderbook.cancel_order_for_replay(order_id).is_some() {
1710                    applied += 1;
1711                    debug!(
1712                        "Replay Pass 3: removed order {} (status={:?}, wallet={}, symbol={})",
1713                        order_id, update.status, wallet, symbol
1714                    );
1715                } else {
1716                    skipped += 1;
1717                }
1718            }
1719
1720            self.ctx.order_index.remove_order(wallet, order_id);
1721        }
1722
1723        if applied > 0 || skipped > 0 {
1724            info!(
1725                "Replay Pass 3: applied {} side-effect cancels ({} skipped - not in book)",
1726                applied, skipped
1727            );
1728        }
1729    }
1730}
1731
1732#[cfg(test)]
1733mod tests {
1734    use super::*;
1735    use crate::rsm::MarginMode;
1736    use hypercall_db::SettlementWriter;
1737    use hypercall_engine::FillAccounting;
1738    use hypercall_types::wallet_address::test_wallet;
1739    use hypercall_types::FillSource;
1740    use hypercall_types::Side;
1741    use rust_decimal::Decimal;
1742    use rust_decimal_macros::dec;
1743
1744    #[test]
1745    fn decode_fill_events_accepts_stored_fill_payload() {
1746        let fill = hypercall_types::Fill {
1747            trade_id: 77,
1748            taker_order_id: 101,
1749            maker_order_id: 102,
1750            symbol: "BTC-20261231-100000-C".to_string(),
1751            price: dec!(250),
1752            size: dec!(100000000),
1753            taker_side: Side::Buy,
1754            taker_wallet_address: test_wallet(1),
1755            maker_wallet_address: test_wallet(2),
1756            fee: Decimal::ZERO,
1757            is_taker: true,
1758            timestamp: 1_700_000_000_000,
1759            builder_code_address: None,
1760            builder_code_fee: None,
1761            source: FillSource::Orderbook,
1762            taker_realized_pnl: Some(dec!(10)),
1763            maker_realized_pnl: Some(dec!(-10)),
1764            underlying_notional: Some(dec!(6500000)),
1765        };
1766        let message = hypercall_types::EngineMessage::OrderFilled {
1767            accounting: FillAccounting::from_fill(&fill),
1768            fill: fill.clone(),
1769        };
1770        let wire = message.serialize_inner().expect("serialize OrderFilled");
1771
1772        let decoded = UnifiedEngine::decode_fill_events(&[wire]);
1773        let (decoded_fill, decoded_accounting) = decoded.first().expect("decoded fill");
1774
1775        assert_eq!(decoded.len(), 1);
1776        assert_eq!(decoded_fill.trade_id, fill.trade_id);
1777        assert_eq!(decoded_fill.taker_order_id, fill.taker_order_id);
1778        assert_eq!(decoded_fill.maker_order_id, fill.maker_order_id);
1779        assert_eq!(decoded_fill.symbol, fill.symbol);
1780        assert_eq!(decoded_fill.price, fill.price);
1781        assert_eq!(decoded_fill.size, fill.size);
1782        assert_eq!(decoded_fill.taker_side, fill.taker_side);
1783        assert_eq!(decoded_fill.taker_wallet_address, fill.taker_wallet_address);
1784        assert_eq!(decoded_fill.maker_wallet_address, fill.maker_wallet_address);
1785        assert_eq!(decoded_fill.taker_realized_pnl, fill.taker_realized_pnl);
1786        assert_eq!(decoded_fill.maker_realized_pnl, fill.maker_realized_pnl);
1787        assert_eq!(*decoded_accounting, FillAccounting::from_fill(&fill));
1788    }
1789
1790    #[test]
1791    fn state_command_replay_decodes_hypercore_equity_update() {
1792        let wallet = test_wallet(12);
1793        let mut command_data = vec![1];
1794        command_data.extend(
1795            rmp_serde::to_vec(&(wallet, dec!(1234), 300u64)).expect("serialize replay payload"),
1796        );
1797        let cmd = hypercall_db::ReplayCommand {
1798            command_id: 42,
1799            request_id: "req".to_string(),
1800            command_type: "HypercoreEquityUpdate".to_string(),
1801            command_data,
1802            response_data: None,
1803        };
1804
1805        let decoded = decode_state_command_for_replay(&cmd).expect("decoded state command");
1806
1807        assert_eq!(replay_envelope_timestamp(&decoded), 300);
1808        assert_eq!(
1809            crate::rsm::restart_components::replay_disposition_for_command(&decoded),
1810            hypercall_recovery::ReplayDisposition::Applied
1811        );
1812        assert!(matches!(
1813            decoded,
1814            EngineCommand::HypercoreEquityUpdate {
1815                wallet: actual_wallet,
1816                account_value,
1817                timestamp_ms: 300
1818            } if actual_wallet == wallet && account_value == dec!(1234)
1819        ));
1820    }
1821
1822    #[test]
1823    fn state_command_replay_decodes_price_update() {
1824        let mut command_data = vec![1];
1825        command_data.extend(
1826            rmp_serde::to_vec(&crate::rsm::apply::PriceUpdatePayload {
1827                underlying: "BTC".to_string(),
1828                spot_price: dec!(95000),
1829                timestamp_ms: 123,
1830            })
1831            .expect("serialize PriceUpdate replay payload"),
1832        );
1833        let cmd = hypercall_db::ReplayCommand {
1834            command_id: 43,
1835            request_id: "req".to_string(),
1836            command_type: "PriceUpdate".to_string(),
1837            command_data,
1838            response_data: None,
1839        };
1840
1841        let decoded = decode_state_command_for_replay(&cmd).expect("decoded state command");
1842
1843        assert_eq!(replay_envelope_timestamp(&decoded), 123);
1844        assert!(matches!(
1845            decoded,
1846            EngineCommand::PriceUpdate {
1847                underlying,
1848                spot_price,
1849                timestamp_ms: 123
1850            } if underlying == "BTC" && spot_price == dec!(95000)
1851        ));
1852    }
1853
1854    #[test]
1855    fn state_command_replay_decodes_expiry_commands() {
1856        let market = hypercall_types::Market {
1857            symbol: "BTC-20260619-110000-C".to_string(),
1858            underlying: "BTC".to_string(),
1859            expiry: 20260619,
1860            strike: dec!(110000),
1861            option_type: hypercall_types::OptionType::Call,
1862        };
1863        let expire = hypercall_db::ReplayCommand {
1864            command_id: 44,
1865            request_id: "expire-market".to_string(),
1866            command_type: "ExpireMarket".to_string(),
1867            command_data: hypercall_types::serialize_to_wire_bytes(
1868                &crate::rsm::apply::MarketActionCommand::with_expiry_context(
1869                    hypercall_types::MarketActionMessage {
1870                        market: market.clone(),
1871                        action: hypercall_types::MarketAction::ExpireMarket,
1872                        timestamp: 459,
1873                    },
1874                    crate::rsm::apply::TickExpiryContext {
1875                        due_expiries: vec![crate::rsm::apply::TickExpiryDueGroup {
1876                            expiry_ts: 20260619,
1877                            symbols: vec![market.symbol.clone()],
1878                        }],
1879                        pending_settlements: Vec::new(),
1880                        settlement_prices: vec![crate::rsm::apply::TickExpirySettlementPrice {
1881                            underlying: "BTC".to_string(),
1882                            expiry_ts: 20260619,
1883                            price: dec!(120000),
1884                        }],
1885                        margin_modes: Vec::new(),
1886                        pm_settlements: Vec::new(),
1887                    },
1888                ),
1889            ),
1890            response_data: None,
1891        };
1892
1893        let decoded = decode_state_command_for_replay(&expire).expect("decoded expiry command");
1894
1895        assert_eq!(replay_envelope_timestamp(&decoded), 459);
1896        assert!(UnifiedEngine::is_expiry_replay_command(&decoded));
1897        assert!(matches!(
1898            decoded,
1899            EngineCommand::MarketAction(command)
1900                if command.message.action == hypercall_types::MarketAction::ExpireMarket
1901                    && command.expiry_context.is_some()
1902        ));
1903
1904        let tick = hypercall_db::ReplayCommand {
1905            command_id: 45,
1906            request_id: "tick-expiry".to_string(),
1907            command_type: "TickExpiry".to_string(),
1908            command_data: hypercall_types::serialize_to_wire_bytes(&(
1909                460u64,
1910                crate::rsm::apply::TickExpiryContext::empty(),
1911            )),
1912            response_data: None,
1913        };
1914
1915        let decoded_tick = decode_state_command_for_replay(&tick).expect("decoded tick command");
1916
1917        assert_eq!(replay_envelope_timestamp(&decoded_tick), 460);
1918        assert!(UnifiedEngine::is_expiry_replay_command(&decoded_tick));
1919        assert!(matches!(decoded_tick, EngineCommand::TickExpiry { .. }));
1920    }
1921
1922    #[tokio::test]
1923    async fn replay_command_row_tick_expiry_never_reconciles_from_durable_settlement() {
1924        let context = crate::rsm::unified_engine::tests::setup_test_context().await;
1925        let diesel_handler = std::sync::Arc::new(
1926            DatabaseHandler::new(&context.database_url).expect("diesel handler should connect"),
1927        );
1928
1929        let wallet = test_wallet(156);
1930        let symbol = "BTC-20261231-100000-C".to_string();
1931        let expiry_ts = hypercall_types::expiry_date_to_timestamp("BTC", 20261231) as u64;
1932        let reference_price = dec!(105000);
1933        let settlement_price = dec!(5000);
1934        let position_size = dec!(1);
1935        let settlement_entry_price = dec!(100000);
1936        let settlement_value = settlement_price * position_size;
1937        let cost_basis = settlement_entry_price * position_size;
1938        let net_pnl = settlement_value - cost_basis;
1939
1940        diesel_handler
1941            .try_apply_settlement_sync(
1942                &wallet,
1943                &symbol,
1944                position_size,
1945                settlement_price,
1946                settlement_value,
1947                MarginMode::Standard,
1948                1_798_675_200_000,
1949                Some(settlement_entry_price),
1950                Some(cost_basis),
1951                Some(net_pnl),
1952            )
1953            .expect("durable settlement should be present before replay");
1954
1955        let (engine_event_tx, _engine_event_rx) = tokio::sync::mpsc::unbounded_channel();
1956        let (shutdown_tx, _) = tokio::sync::broadcast::channel::<()>(1);
1957        let (mut engine, _order_tx, _market_tx) =
1958            UnifiedEngineBuilder::new(crate::rsm::unified_engine::tests::create_test_config())
1959                .with_database(&context.database_url)
1960                .allow_no_database_for_tests()
1961                .with_mock_journal()
1962                .build(engine_event_tx, shutdown_tx);
1963
1964        engine
1965            .expiry_manager
1966            .schedule_expiry(symbol.clone(), expiry_ts);
1967        engine.ctx.engine_positions.insert(
1968            (wallet, symbol.clone()),
1969            crate::rsm::engine_deps::EnginePosition {
1970                quantity: position_size,
1971                entry_price: settlement_entry_price,
1972            },
1973        );
1974        engine.ctx.balance_ledger.set(wallet, dec!(10000));
1975
1976        let cmd = hypercall_db::ReplayCommand {
1977            command_id: 156,
1978            request_id: "tick-expiry-divergent-cash".to_string(),
1979            command_type: "TickExpiry".to_string(),
1980            command_data: hypercall_types::serialize_to_wire_bytes(&(
1981                1_798_675_200_000_u64,
1982                crate::rsm::apply::TickExpiryContext {
1983                    due_expiries: vec![crate::rsm::apply::TickExpiryDueGroup {
1984                        expiry_ts: expiry_ts as i64,
1985                        symbols: vec![symbol],
1986                    }],
1987                    pending_settlements: Vec::new(),
1988                    settlement_prices: vec![crate::rsm::apply::TickExpirySettlementPrice {
1989                        underlying: "BTC".to_string(),
1990                        expiry_ts: expiry_ts as i64,
1991                        price: reference_price,
1992                    }],
1993                    margin_modes: vec![crate::rsm::apply::TickExpiryWalletMarginMode {
1994                        wallet,
1995                        margin_mode: MarginMode::Standard,
1996                        pm_settlement_required: false,
1997                    }],
1998                    pm_settlements: Vec::new(),
1999                },
2000            )),
2001            response_data: None,
2002        };
2003
2004        let mut replay_count = 0;
2005        let mut max_replayed_order_id = 0;
2006        engine.replay_command_row(&cmd, &mut replay_count, &mut max_replayed_order_id);
2007
2008        assert_eq!(
2009            engine.ctx.balance_ledger.balance(&wallet),
2010            dec!(15000),
2011            "replay must keep deterministic engine cash and never reconcile from durable settlement rows"
2012        );
2013    }
2014
2015    #[test]
2016    fn state_command_replay_ignores_non_state_commands() {
2017        let cmd = hypercall_db::ReplayCommand {
2018            command_id: 46,
2019            request_id: "req".to_string(),
2020            command_type: "CreateOrder".to_string(),
2021            command_data: vec![1],
2022            response_data: None,
2023        };
2024
2025        assert!(decode_state_command_for_replay(&cmd).is_none());
2026    }
2027
2028    #[test]
2029    fn replay_command_row_applies_legacy_agent_auth_without_nonce() {
2030        let (mut engine, _order_tx, _market_tx) =
2031            crate::rsm::unified_engine::tests::build_replay_test_engine();
2032        let wallet = test_wallet(52);
2033        let agent = test_wallet(53);
2034        let mut replay_count = 0;
2035        let mut max_replayed_order_id = 0;
2036
2037        let approve = hypercall_db::ReplayCommand {
2038            command_id: 45,
2039            request_id: "legacy-approve-agent".to_string(),
2040            command_type: "ApproveAgent".to_string(),
2041            command_data: hypercall_types::serialize_to_wire_bytes(&(wallet, agent, None::<u64>)),
2042            response_data: None,
2043        };
2044
2045        engine.replay_command_row(&approve, &mut replay_count, &mut max_replayed_order_id);
2046
2047        assert_eq!(replay_count, 1);
2048        assert_eq!(max_replayed_order_id, 0);
2049        assert_eq!(
2050            engine
2051                .ctx
2052                .agent_authorizations
2053                .get(&wallet)
2054                .and_then(|agents| agents.get(&agent)),
2055            Some(&None),
2056            "legacy persisted ApproveAgent replay row without nonce must restore auth"
2057        );
2058        let revoke = hypercall_db::ReplayCommand {
2059            command_id: 46,
2060            request_id: "legacy-revoke-agent".to_string(),
2061            command_type: "RevokeAgent".to_string(),
2062            command_data: hypercall_types::serialize_to_wire_bytes(&(wallet, agent)),
2063            response_data: None,
2064        };
2065
2066        engine.replay_command_row(&revoke, &mut replay_count, &mut max_replayed_order_id);
2067
2068        assert_eq!(replay_count, 2);
2069        assert!(
2070            !engine.ctx.agent_authorizations.contains_key(&wallet),
2071            "legacy persisted RevokeAgent replay row without nonce must clear auth"
2072        );
2073    }
2074
2075    #[test]
2076    fn state_command_replay_decodes_named_agent_auth_payloads() {
2077        let wallet = test_wallet(54);
2078        let agent = test_wallet(55);
2079        let approve = hypercall_db::ReplayCommand {
2080            command_id: 47,
2081            request_id: "approve-agent".to_string(),
2082            command_type: "ApproveAgent".to_string(),
2083            command_data: hypercall_types::serialize_to_wire_bytes(
2084                &crate::rsm::apply::ApproveAgentPayload {
2085                    wallet,
2086                    agent,
2087                    expires_at_ms: Some(999),
2088                    nonce: Some(123),
2089                    timestamp_ms: 456,
2090                },
2091            ),
2092            response_data: None,
2093        };
2094
2095        let decoded = decode_state_command_for_replay(&approve).expect("decoded approve");
2096
2097        assert!(matches!(
2098            decoded,
2099            EngineCommand::ApproveAgent {
2100                wallet: decoded_wallet,
2101                agent: decoded_agent,
2102                expires_at_ms: Some(999),
2103                nonce: Some(123),
2104                timestamp_ms: 456,
2105            } if decoded_wallet == wallet && decoded_agent == agent
2106        ));
2107
2108        let revoke = hypercall_db::ReplayCommand {
2109            command_id: 48,
2110            request_id: "revoke-agent".to_string(),
2111            command_type: "RevokeAgent".to_string(),
2112            command_data: hypercall_types::serialize_to_wire_bytes(
2113                &crate::rsm::apply::RevokeAgentPayload {
2114                    wallet,
2115                    agent,
2116                    nonce: Some(124),
2117                    timestamp_ms: 457,
2118                },
2119            ),
2120            response_data: None,
2121        };
2122
2123        let decoded = decode_state_command_for_replay(&revoke).expect("decoded revoke");
2124
2125        assert!(matches!(
2126            decoded,
2127            EngineCommand::RevokeAgent {
2128                wallet: decoded_wallet,
2129                agent: decoded_agent,
2130                nonce: Some(124),
2131                timestamp_ms: 457,
2132            } if decoded_wallet == wallet && decoded_agent == agent
2133        ));
2134    }
2135
/// A journaled `DepositUpdate` whose `command_data` was serialized from the
/// named `DepositUpdatePayload` must decode into `EngineCommand::DepositUpdate`
/// with the wallet, amount, timestamp and source event hash carried through
/// unchanged, and the payload's bare `sequence` surfaced as `Some(sequence)`.
///
/// Every field is asserted on its own so that a regression reports the exact
/// field (and both values) instead of a bare `matches!` failure.
#[test]
fn state_command_replay_decodes_named_balance_update_payloads() {
    let wallet = test_wallet(56);
    let cmd = hypercall_db::ReplayCommand {
        command_id: 49,
        request_id: "deposit-update".to_string(),
        command_type: "DepositUpdate".to_string(),
        command_data: hypercall_types::serialize_to_wire_bytes(
            &crate::rsm::apply::DepositUpdatePayload {
                wallet,
                amount: dec!(50),
                timestamp_ms: 458,
                sequence: 125,
                source_event_hash: test_source_hash(125),
            },
        ),
        response_data: None,
    };

    let decoded = decode_state_command_for_replay(&cmd).expect("decoded deposit update");

    match decoded {
        EngineCommand::DepositUpdate {
            wallet: decoded_wallet,
            amount,
            timestamp_ms,
            sequence,
            source_event_hash,
        } => {
            assert_eq!(decoded_wallet, wallet);
            assert_eq!(amount, dec!(50));
            assert_eq!(timestamp_ms, 458);
            // The named payload stores a plain sequence; the engine command wraps it.
            assert_eq!(sequence, Some(125));
            assert_eq!(source_event_hash, test_source_hash(125));
        }
        // Name the variant we actually got rather than failing opaquely.
        other => panic!("expected DepositUpdate, got {other:?}"),
    }
}
2168
/// A `DepositUpdate` journal entry written with the legacy
/// `BalanceCommandPayload` shape must still decode into
/// `EngineCommand::DepositUpdate`.
///
/// The legacy payload has no `source_event_hash` field, so the test only
/// requires that the decoded command carries a non-zero one.
#[test]
fn state_command_replay_decodes_legacy_deposit_balance_payloads() {
    let wallet = test_wallet(57);

    // Legacy wire shape: carries `balance_after` and an optional sequence.
    let legacy_payload = crate::rsm::apply::BalanceCommandPayload {
        wallet,
        amount: dec!(50),
        balance_after: dec!(150),
        timestamp_ms: 459,
        sequence: Some(126),
    };
    let replay = hypercall_db::ReplayCommand {
        command_id: 50,
        request_id: "legacy-deposit-update".to_string(),
        command_type: "DepositUpdate".to_string(),
        command_data: hypercall_types::serialize_to_wire_bytes(&legacy_payload),
        response_data: None,
    };

    let command =
        decode_state_command_for_replay(&replay).expect("decoded legacy deposit update");

    // Pull the fields out first; any other variant is an immediate failure.
    let (decoded_wallet, decoded_amount, decoded_ts, decoded_seq, decoded_hash) = match command {
        EngineCommand::DepositUpdate {
            wallet: w,
            amount,
            timestamp_ms,
            sequence,
            source_event_hash,
        } => (w, amount, timestamp_ms, sequence, source_event_hash),
        other => panic!("expected DepositUpdate, got {other:?}"),
    };

    assert_eq!(decoded_wallet, wallet);
    assert_eq!(decoded_amount, dec!(50));
    assert_eq!(decoded_ts, 459);
    assert_eq!(decoded_seq, Some(126));
    assert_ne!(decoded_hash, alloy::primitives::FixedBytes::<32>::ZERO);
}
2207
/// Decoding a legacy `BalanceCommandPayload` deposit entry whose `sequence`
/// is `None` is expected to panic with the "missing sequence" message.
#[test]
#[should_panic(expected = "Legacy replay DepositUpdate balance payload missing sequence")]
fn state_command_replay_rejects_legacy_deposit_balance_payloads_without_sequence() {
    let wallet = test_wallet(58);

    // Same legacy shape as the accepted case, but with the sequence omitted.
    let payload_without_sequence = crate::rsm::apply::BalanceCommandPayload {
        wallet,
        amount: dec!(50),
        balance_after: dec!(150),
        timestamp_ms: 460,
        sequence: None,
    };
    let replay = hypercall_db::ReplayCommand {
        command_id: 51,
        request_id: "legacy-deposit-update-no-sequence".to_string(),
        command_type: "DepositUpdate".to_string(),
        command_data: hypercall_types::serialize_to_wire_bytes(&payload_without_sequence),
        response_data: None,
    };

    // The result is irrelevant: the call itself must panic.
    let _ = decode_state_command_for_replay(&replay);
}
2230}