1use super::*;
4use crate::engine_enums_ext::ReplayCommandExt;
5use crate::rsm::apply::{CommandEnvelope, EngineCommand};
6use hypercall_db::JournalReplayReader;
7use std::collections::BTreeMap;
8
/// One replayed fill together with its accounting record, as produced by
/// decoding a persisted `OrderFilled` event.
type ReplayFillEvent = (hypercall_types::Fill, hypercall_types::FillAccounting);
/// Replayed fill events grouped by the `command_id` of the event row they were
/// read from (a `BTreeMap`, so iteration is in ascending command id order).
type ReplayFillEventsByCommand = BTreeMap<i64, Vec<ReplayFillEvent>>;
11
/// Test-only helper: builds a 32-byte value in which every byte equals `byte`.
#[cfg(test)]
fn test_source_hash(byte: u8) -> alloy::primitives::FixedBytes<32> {
    let repeated = [byte; 32];
    repeated.into()
}
16
17fn replay_state_command_type(command_type: &str) -> Option<crate::nats::CommandType> {
18 match command_type {
19 "ExpireMarket" => Some(crate::nats::CommandType::MarketAction),
20 "TickExpiry" => Some(crate::nats::CommandType::TickExpiry),
21 "PriceUpdate" => Some(crate::nats::CommandType::PriceUpdate),
22 "IvUpdate" => Some(crate::nats::CommandType::IvUpdate),
23 "LiquidationState" => Some(crate::nats::CommandType::LiquidationState),
24 "TierUpdate" => Some(crate::nats::CommandType::TierUpdate),
25 "HypercorePositionUpdate" => Some(crate::nats::CommandType::HypercorePositionUpdate),
26 "MmpConfigUpdate" => Some(crate::nats::CommandType::MmpConfigUpdate),
27 "TradingModeUpdate" => Some(crate::nats::CommandType::TradingModeUpdate),
28 "DepositUpdate" => Some(crate::nats::CommandType::DepositUpdate),
29 "OptionDepositUpdate" => Some(crate::nats::CommandType::OptionDepositUpdate),
30 "OptionWithdrawalUpdate" => Some(crate::nats::CommandType::OptionWithdrawalUpdate),
31 "CashWithdrawalUpdate" => Some(crate::nats::CommandType::CashWithdrawalUpdate),
32 "LiquidationBonusUpdate" => Some(crate::nats::CommandType::LiquidationBonusUpdate),
33 "ApproveAgent" => Some(crate::nats::CommandType::ApproveAgent),
34 "RevokeAgent" => Some(crate::nats::CommandType::RevokeAgent),
35 "NonceAdvance" => Some(crate::nats::CommandType::NonceAdvance),
36 "HypercoreEquityUpdate" => Some(crate::nats::CommandType::HypercoreEquityUpdate),
37 "SetPmSettlementPoolConfig" => Some(crate::nats::CommandType::SetPmSettlementPoolConfig),
38 "RecordPmVaultDeposit" => Some(crate::nats::CommandType::RecordPmVaultDeposit),
39 "RequestPmVaultWithdrawal" => Some(crate::nats::CommandType::RequestPmVaultWithdrawal),
40 "AccruePmSettlementInterest" => Some(crate::nats::CommandType::AccruePmSettlementInterest),
41 "ApplyPmSettlementRepayment" => Some(crate::nats::CommandType::ApplyPmSettlementRepayment),
42 "JournalPmRecoveryPlan" => Some(crate::nats::CommandType::JournalPmRecoveryPlan),
43 "MarkPmRecoveryActionSubmitted" => {
44 Some(crate::nats::CommandType::MarkPmRecoveryActionSubmitted)
45 }
46 "ResolvePmRecoveryAction" => Some(crate::nats::CommandType::ResolvePmRecoveryAction),
47 _ => None,
48 }
49}
50
51fn decode_state_command_for_replay(cmd: &hypercall_db::ReplayCommand) -> Option<EngineCommand> {
52 let command_type = replay_state_command_type(&cmd.command_type)?;
53 let command_wire_version = replay_command_wire_version(&cmd.command_data);
54 Some(
55 crate::nats::deserialize::deserialize_command_for_replay(
56 command_type,
57 command_wire_version,
58 &cmd.command_data,
59 )
60 .unwrap_or_else(|e| {
61 panic!(
62 "CRITICAL_FAILURE: Failed to decode {} for replay \
63 (command_id={}, command_data_len={}): {}",
64 cmd.command_type,
65 cmd.command_id,
66 cmd.command_data.len(),
67 e
68 )
69 }),
70 )
71}
72
73fn replay_command_wire_version(command_data: &[u8]) -> u8 {
74 if replay_command_data_is_named_payload(command_data) {
75 crate::nats::COMMAND_WIRE_VERSION_V1
76 } else {
77 crate::nats::LEGACY_COMMAND_WIRE_VERSION
78 }
79}
80
81fn replay_command_data_is_named_payload(command_data: &[u8]) -> bool {
82 command_data.first() == Some(&hypercall_types::WIRE_FORMAT_VERSION)
83 && command_data
84 .get(1)
85 .is_some_and(|marker| matches!(*marker, 0x80..=0x8f | 0xde | 0xdf))
86}
87
88fn replay_envelope_timestamp(command: &EngineCommand) -> u64 {
89 match command {
90 EngineCommand::PriceUpdate { timestamp_ms, .. }
91 | EngineCommand::IvUpdate { timestamp_ms, .. }
92 | EngineCommand::HypercorePositionUpdate { timestamp_ms, .. }
93 | EngineCommand::TradingModeUpdate { timestamp_ms, .. }
94 | EngineCommand::DepositUpdate { timestamp_ms, .. }
95 | EngineCommand::OptionDepositUpdate { timestamp_ms, .. }
96 | EngineCommand::OptionWithdrawalUpdate { timestamp_ms, .. }
97 | EngineCommand::CashWithdrawalUpdate { timestamp_ms, .. }
98 | EngineCommand::LiquidationBonusUpdate { timestamp_ms, .. }
99 | EngineCommand::ApproveAgent { timestamp_ms, .. }
100 | EngineCommand::RevokeAgent { timestamp_ms, .. }
101 | EngineCommand::NonceAdvance { timestamp_ms, .. }
102 | EngineCommand::HypercoreEquityUpdate { timestamp_ms, .. } => *timestamp_ms,
103 EngineCommand::SetPmSettlementPoolConfig(command) => command.timestamp_ms,
104 EngineCommand::RecordPmVaultDeposit(command) => command.timestamp_ms,
105 EngineCommand::RequestPmVaultWithdrawal(command) => command.timestamp_ms,
106 EngineCommand::AccruePmSettlementInterest(command) => command.timestamp_ms,
107 EngineCommand::ApplyPmSettlementRepayment(command) => command.timestamp_ms,
108 EngineCommand::JournalPmRecoveryPlan(command) => command.timestamp_ms,
109 EngineCommand::MarkPmRecoveryActionSubmitted(command) => command.timestamp_ms,
110 EngineCommand::ResolvePmRecoveryAction(command) => command.timestamp_ms,
111 EngineCommand::MarketAction(command) => command.message.timestamp,
112 EngineCommand::TickExpiry { now_ms, .. } => *now_ms,
113 EngineCommand::LiquidationState(message) => message.timestamp,
114 _ => 0,
115 }
116}
117
/// A create-order command found during replay whose instrument has no
/// orderbook and whose recorded response status was `Open` or
/// `PartiallyFilled`.
///
/// `replay_create_order` pushes these onto `startup_expired_order_cancels`
/// (its log message says they are collected "for startup DB cancellation").
/// Apart from `order_id` and `filled_size`, every field is copied from the
/// original create-order message.
pub(crate) struct StartupExpiredOrderCancel {
    /// Order id taken from the command or, failing that, from its response.
    pub order_id: u64,
    /// Wallet of the replayed order message (`msg.wallet`).
    pub wallet: WalletAddress,
    /// Instrument symbol the order was placed on.
    pub symbol: String,
    /// Price from the original order.
    pub price: Decimal,
    /// Size from the original order.
    pub size: Decimal,
    /// Side from the original order.
    pub side: hypercall_types::Side,
    /// Time-in-force from the original order.
    pub tif: hypercall_types::TimeInForce,
    /// `is_perp` flag from the original order.
    pub is_perp: bool,
    /// Underlying from the original order, if it carried one.
    pub underlying: Option<String>,
    /// `reduce_only` flag from the original order, if it carried one.
    pub reduce_only: Option<bool>,
    /// Nonce from the original order, if it carried one.
    pub nonce: Option<u64>,
    /// Signature from the original order, if it carried one.
    pub signature: Option<String>,
    /// `mmp_enabled` flag from the original order.
    pub mmp_enabled: bool,
    /// Filled size from the recorded response when it was `PartiallyFilled`; zero otherwise.
    pub filled_size: Decimal,
    /// Client id from the original order, if it carried one.
    pub client_id: Option<String>,
    /// Timestamp of the original order message (`msg.timestamp`).
    pub timestamp: u64,
}
136
137impl UnifiedEngine {
138 pub(super) fn load_markets_from_db(&mut self) {
139 if let Some(ref handler) = self.ctx.db {
140 info!("Loading instruments from database...");
141 match handler.get_all_instruments_sync() {
142 Ok(instruments) => {
143 info!("Loading {} instruments from database", instruments.len());
144 let mut loaded_count = 0;
145 let mut expired_on_startup = 0;
146 let mut scheduled_for_expiry = 0;
147 let mut startup_pending_symbols: Vec<String> = Vec::new();
148
149 let now = crate::shared::clock::unix_now_secs();
153
154 for instrument in instruments {
155 let option_type = instrument.option_type;
157
158 let expiry_timestamp = crate::rsm::margin_manager::expiry_date_to_timestamp(
161 &instrument.underlying,
162 instrument.expiry as u64,
163 );
164
165 if expiry_timestamp <= now {
166 self.ctx
168 .expired_instruments
169 .insert(instrument.id.clone(), true);
170 let trading_mode = hypercall_types::TradingModes::from_db_str(
171 &instrument.trading_mode,
172 );
173 self.ctx
174 .instrument_trading_modes
175 .insert(instrument.id.clone(), trading_mode);
176 expired_on_startup += 1;
177
178 match instrument.status {
179 hypercall_types::api_models::InstrumentStatus::Active => {
180 startup_pending_symbols.push(instrument.id.clone());
181 warn!(
182 "Instrument {} expired while offline (expiry_ts={}) with status Active - \
183 queuing startup DB transition to EXPIRED_PENDING_PRICE for settlement retry.",
184 instrument.id, expiry_timestamp
185 );
186 }
187 hypercall_types::api_models::InstrumentStatus::ExpiredPendingPrice => {
188 info!(
189 "Instrument {} expired while offline (expiry_ts={}) and already ExpiredPendingPrice - \
190 settlement retry will resume from persisted state.",
191 instrument.id, expiry_timestamp
192 );
193 }
194 hypercall_types::api_models::InstrumentStatus::Settled => {
195 debug!(
196 "Instrument {} expired while offline (expiry_ts={}) and already Settled - \
197 no startup status transition needed.",
198 instrument.id, expiry_timestamp
199 );
200 }
201 }
202
203 metrics::gauge!("ht_settlement_instruments_pending_price")
205 .increment(1.0);
206 } else {
207 let orderbook = OrderBook::with_symbol(
209 instrument.expiry as u64,
210 instrument.strike,
211 option_type,
212 instrument.id.clone(),
213 );
214
215 self.ctx.orderbooks.insert(instrument.id.clone(), orderbook);
216 let trading_mode = hypercall_types::TradingModes::from_db_str(
217 &instrument.trading_mode,
218 );
219 self.ctx
220 .instrument_trading_modes
221 .insert(instrument.id.clone(), trading_mode);
222 debug!("Loaded orderbook for symbol: {}", instrument.id);
223 loaded_count += 1;
224
225 self.expiry_manager
227 .expiry_schedules
228 .entry(expiry_timestamp)
229 .or_default()
230 .push(instrument.id.clone());
231 scheduled_for_expiry += 1;
232 }
233 }
234
235 if !startup_pending_symbols.is_empty() {
236 if let Err(e) = handler
237 .transition_active_instruments_to_expired_pending_sync(
238 &startup_pending_symbols,
239 )
240 {
241 panic!(
242 "CRITICAL_FAILURE: Failed startup transition of expired ACTIVE instruments to EXPIRED_PENDING_PRICE for {:?}: {}. \
243 Persisted state must match in-memory expiry state before continuing.",
244 startup_pending_symbols, e
245 );
246 }
247 info!(
248 "Startup transitioned {} expired ACTIVE instruments to EXPIRED_PENDING_PRICE",
249 startup_pending_symbols.len()
250 );
251 }
252
253 info!(
255 "Expiry rebuild complete: {} instruments loaded, {} expired at startup, {} transitioned to EXPIRED_PENDING_PRICE, {} scheduled for expiry",
256 loaded_count,
257 expired_on_startup,
258 startup_pending_symbols.len(),
259 scheduled_for_expiry
260 );
261
262 metrics::gauge!("ht_expiry_rebuild_loaded").set(loaded_count as f64);
264 metrics::gauge!("ht_expiry_rebuild_pending_price")
265 .set(expired_on_startup as f64);
266 metrics::gauge!("ht_expiry_rebuild_scheduled").set(scheduled_for_expiry as f64);
267
268 info!(
269 "Successfully loaded {} orderbooks from database",
270 loaded_count
271 );
272
273 if loaded_count != self.ctx.orderbooks.len() {
274 warn!(
275 "Mismatch: loaded {} instruments but have {} orderbooks",
276 loaded_count,
277 self.ctx.orderbooks.len()
278 );
279 }
280
281 info!(
282 "UnifiedEngine sync status remains CatchingUp (journal replay and post-startup reconciliation pending)"
283 );
284 }
285 Err(e) => {
286 error!("FATAL: Failed to load instruments from database: {}", e);
287 panic!(
288 "Cannot start UnifiedEngine without loading instruments: {}",
289 e
290 );
291 }
292 }
293 } else {
294 info!("No diesel_handler configured, skipping market loading from database");
296 }
297 }
298
299 pub(super) fn load_orderbook_snapshots(&mut self) {
301 if self.ctx.db.is_none() {
302 self.replay_checkpoint = hypercall_journal::checkpoint::WalCheckpointMetadata::ZERO;
303 self.ctx
304 .l2_update_seq
305 .store(self.replay_checkpoint.last_l2_seq, Ordering::SeqCst);
306 return;
307 }
308
309 let checkpoint_path =
310 hypercall_journal::checkpoint::checkpoint_path_from_config(self.wal_path.as_ref());
311 let checkpoint = hypercall_journal::checkpoint::read_checkpoint(&checkpoint_path)
312 .unwrap_or_else(|e| {
313 panic!(
314 "CRITICAL_FAILURE: Failed to load WAL checkpoint metadata from {}: {}",
315 checkpoint_path.display(),
316 e
317 )
318 });
319
320 self.replay_checkpoint = checkpoint;
321 self.ctx
322 .l2_update_seq
323 .store(checkpoint.last_l2_seq, Ordering::SeqCst);
324 info!(
325 "Loaded WAL checkpoint for recovery: wal_offset={}, last_command_id={}, last_l2_seq={}",
326 checkpoint.wal_offset, checkpoint.last_command_id, checkpoint.last_l2_seq
327 );
328 }
329
330 pub(super) fn check_restored_orderbooks_for_crosses(&self) {
334 let mut crossed_count = 0;
335 let mut crossed_details = Vec::new();
336
337 for (symbol, orderbook) in &self.ctx.orderbooks {
338 if orderbook.is_crossed() {
339 crossed_count += 1;
340 let best_bid = orderbook.get_best_bid();
341 let best_ask = orderbook.get_best_ask();
342 crossed_details.push(format!("{}: bid={:?} ask={:?}", symbol, best_bid, best_ask));
343 }
344 }
345
346 if crossed_count > 0 {
347 error!(
348 "🚨 CRITICAL: {} orderbooks have CROSSED state after restore! \
349 This indicates a matching engine bug - orders should have executed. \
350 Details: {:?}",
351 crossed_count, crossed_details
352 );
353 panic!(
354 "CRITICAL_FAILURE: {} crossed orderbooks after restore: {:?}",
355 crossed_count, crossed_details
356 );
357 }
358 }
359
360 pub(super) fn replay_commands_from_journal(&mut self) {
366 let snapshot_seq = self.ctx.l2_update_seq.load(Ordering::SeqCst);
367 let checkpoint_command_id = self.replay_checkpoint.last_command_id;
368 const REPLAY_CHUNK_SIZE: i64 = 5_000;
369
370 if self.ctx.db.is_none() {
371 debug!("No diesel handler, skipping journal replay");
372 return;
373 }
374
375 let mut replay_count = 0;
376 let mut max_replayed_order_id: u64 = 0;
377 if checkpoint_command_id > 0 && !self.snapshot_loaded {
378 info!(
379 "Recovery base reconstruction: replaying journal commands up to checkpoint command_id {}",
380 checkpoint_command_id
381 );
382 self.replay_command_window(
383 0,
384 Some(checkpoint_command_id),
385 REPLAY_CHUNK_SIZE,
386 &mut replay_count,
387 &mut max_replayed_order_id,
388 );
389 } else if self.snapshot_loaded {
390 info!(
391 "Skipping base reconstruction — engine state snapshot loaded (last_command_id={})",
392 checkpoint_command_id
393 );
394 }
395
396 info!(
397 "Recovery delta replay: replaying commands after checkpoint command_id {}",
398 checkpoint_command_id
399 );
400 self.replay_command_window(
401 checkpoint_command_id,
402 None,
403 REPLAY_CHUNK_SIZE,
404 &mut replay_count,
405 &mut max_replayed_order_id,
406 );
407
408 if max_replayed_order_id >= self.ctx.next_order_id {
422 let old = self.ctx.next_order_id;
423 self.ctx.next_order_id = max_replayed_order_id + 1;
424 info!(
425 "Advanced next_order_id after replay: {} -> {} (max replayed order_id was {})",
426 old, self.ctx.next_order_id, max_replayed_order_id
427 );
428 }
429
430 let max_seq_result = {
435 let Some(ref handler) = self.ctx.db else {
436 warn!(
437 "No diesel handler for max L2 seq query. L2 seq remains at {}",
438 snapshot_seq
439 );
440 return;
441 };
442 let replay: &dyn JournalReplayReader = handler;
443 replay.get_max_l2_seq_from_events_sync()
444 };
445
446 match max_seq_result {
447 Ok(max_seq) => {
448 if max_seq > snapshot_seq {
449 self.ctx.l2_update_seq.store(max_seq, Ordering::SeqCst);
450 info!(
451 "Journal replay complete: {} commands replayed, L2 seq {} -> {}",
452 replay_count, snapshot_seq, max_seq
453 );
454 } else {
455 info!(
456 "Journal replay complete: {} commands replayed, L2 seq unchanged at {}",
457 replay_count, snapshot_seq
458 );
459 }
460 }
461 Err(e) => {
462 panic!(
463 "CRITICAL_FAILURE: Failed to get max L2 seq after replay: {}. \
464 Cannot safely continue with stale sequence {}.",
465 e, snapshot_seq
466 );
467 }
468 }
469
470 for ob in self.ctx.orderbooks.values_mut() {
480 ob.sync_l2_snapshot_baseline();
481 }
482
483 self.flush_startup_expired_order_cancels();
484 }
485
486 fn replay_command_window(
487 &mut self,
488 start_command_id: i64,
489 end_command_id: Option<i64>,
490 chunk_size: i64,
491 replay_count: &mut usize,
492 max_replayed_order_id: &mut u64,
493 ) {
494 let mut cursor = start_command_id;
495 loop {
496 let commands = {
497 let replay: &dyn JournalReplayReader = self
498 .ctx
499 .db
500 .as_ref()
501 .expect("diesel handler required for replay");
502 replay
503 .get_replay_commands_after_command_id_sync(cursor, end_command_id, chunk_size)
504 .unwrap_or_else(|e| {
505 panic!(
506 "CRITICAL_FAILURE: Failed to query replay commands after command_id {} \
507 (end={:?}, limit={}): {}",
508 cursor, end_command_id, chunk_size, e
509 )
510 })
511 };
512
513 if commands.is_empty() {
514 break;
515 }
516
517 let chunk_end_command_id = commands
518 .last()
519 .expect("replay command chunk unexpectedly empty")
520 .command_id;
521 if chunk_end_command_id <= cursor {
522 panic!(
523 "CRITICAL_FAILURE: Replay cursor did not advance (cursor={}, chunk_end={})",
524 cursor, chunk_end_command_id
525 );
526 }
527
528 let mut fill_events_by_command =
529 self.replay_fill_events_by_command_for_window(cursor, chunk_end_command_id);
530 for cmd in commands {
531 self.replay_command_row(&cmd, replay_count, max_replayed_order_id);
532 if let Some(fill_events) = fill_events_by_command.remove(&cmd.command_id) {
533 self.apply_fill_events_for_replay(&fill_events);
534 }
535 }
536 if !fill_events_by_command.is_empty() {
537 let command_ids = fill_events_by_command.keys().copied().collect::<Vec<_>>();
538 panic!(
539 "CRITICAL_FAILURE: Fill replay found OrderFilled events for command_ids {:?} \
540 without matching replay commands in window ({}..={}). Persisted journal is inconsistent.",
541 command_ids, cursor, chunk_end_command_id
542 );
543 }
544
545 self.replay_order_update_events_for_window(cursor, chunk_end_command_id);
546 cursor = chunk_end_command_id;
547 }
548 }
549
550 fn replay_command_row(
551 &mut self,
552 cmd: &hypercall_db::ReplayCommand,
553 replay_count: &mut usize,
554 max_replayed_order_id: &mut u64,
555 ) {
556 if cmd.command_type == "RfqExecute" {
561 if let Ok(rfq_cmd) = rmp_serde::from_slice::<hypercall_runtime_api::RfqExecuteCommand>(
562 &cmd.command_data[1..],
563 ) {
564 if let Some(nonce) = rfq_cmd.taker_nonce {
565 let nonce_signer = rfq_cmd.taker_submit_signer();
566 self.ctx
567 .nonce_sets
568 .entry(nonce_signer)
569 .or_insert_with(|| {
570 hypercall_engine::BoundedNonceSet::new(
571 hypercall_engine::nonce::DEFAULT_NONCE_SET_CAPACITY,
572 )
573 })
574 .insert(nonce);
575 }
576 if let Some(nonce) = rfq_cmd.taker_accept_nonce {
577 let nonce_signer = rfq_cmd.taker_accept_signer();
578 self.ctx
579 .nonce_sets
580 .entry(nonce_signer)
581 .or_insert_with(|| {
582 hypercall_engine::BoundedNonceSet::new(
583 hypercall_engine::nonce::DEFAULT_NONCE_SET_CAPACITY,
584 )
585 })
586 .insert(nonce);
587 }
588 }
589 *replay_count += 1;
590 return;
591 }
592
593 if let Some(command) = decode_state_command_for_replay(cmd) {
594 if Self::is_expiry_replay_command(&command) {
595 self.replay_expiry_command_row(cmd, command);
596 *replay_count += 1;
597 return;
598 }
599 let disposition =
600 crate::rsm::restart_components::replay_disposition_for_command(&command);
601 assert_eq!(
602 disposition,
603 hypercall_recovery::ReplayDisposition::Applied,
604 "CRITICAL_FAILURE: decoded restart-owned command {} has no component replay owner",
605 cmd.command_type
606 );
607 if self.apply_legacy_agent_auth_replay_command(&command) {
608 *replay_count += 1;
609 return;
610 }
611 let timestamp_ms = replay_envelope_timestamp(&command);
612 let output = self
613 .apply(CommandEnvelope::new(timestamp_ms, command))
614 .unwrap_or_else(|e| {
615 panic!(
616 "CRITICAL_FAILURE: Replay failed for {} command {} \
617 (request_id={}): {}",
618 cmd.command_type, cmd.command_id, cmd.request_id, e
619 )
620 });
621 self.apply_replayed_expiry_effects(&output.expiry_effects)
622 .unwrap_or_else(|e| {
623 panic!(
624 "CRITICAL_FAILURE: Replay failed to apply expiry runtime effects for {} \
625 command {} (request_id={}): {}",
626 cmd.command_type, cmd.command_id, cmd.request_id, e
627 )
628 });
629 self.apply_pm_settlement_projection_effects_sync(
630 &output.pm_settlement_effects,
631 &cmd.request_id,
632 )
633 .unwrap_or_else(|e| {
634 panic!(
635 "CRITICAL_FAILURE: Replay failed to apply PM settlement projection effects for {} \
636 command {} (request_id={}): {}",
637 cmd.command_type, cmd.command_id, cmd.request_id, e
638 )
639 });
640 *replay_count += 1;
641 return;
642 }
643
644 let decoded_response = cmd.decode_response();
645 if let Some(resp) = decoded_response.as_ref() {
646 if let Some(order_id) = resp.order_id {
647 *max_replayed_order_id = (*max_replayed_order_id).max(order_id);
648 }
649 }
650
651 let order_action = cmd.decode_command();
652
653 match self.process_order_for_replay(&order_action, decoded_response.as_ref()) {
654 Ok(_) => {
655 *replay_count += 1;
656 debug!(
657 "Replayed command {} (request_id: {})",
658 cmd.command_id, cmd.request_id
659 );
660 }
661 Err(e) => {
662 panic!(
663 "CRITICAL_FAILURE: Replay failed for command {} (request_id: {}): {}",
664 cmd.command_id, cmd.request_id, e
665 );
666 }
667 }
668 }
669
670 fn is_expiry_replay_command(command: &EngineCommand) -> bool {
671 matches!(
672 command,
673 EngineCommand::MarketAction(market_command)
674 if matches!(
675 market_command.message.action,
676 hypercall_types::MarketAction::ExpireMarket
677 )
678 ) || matches!(command, EngineCommand::TickExpiry { .. })
679 }
680
681 fn replay_expiry_command_row(
682 &mut self,
683 cmd: &hypercall_db::ReplayCommand,
684 command: EngineCommand,
685 ) {
686 let timestamp_ms = replay_envelope_timestamp(&command);
687 let output = self
688 .apply(CommandEnvelope::new(timestamp_ms, command))
689 .unwrap_or_else(|error| {
690 panic!(
691 "CRITICAL_FAILURE: Replay failed for expiry command {} \
692 (request_id={}): {}",
693 cmd.command_id, cmd.request_id, error
694 )
695 });
696
697 self.apply_replayed_expiry_effects(&output.expiry_effects)
698 .unwrap_or_else(|error| {
699 panic!(
700 "CRITICAL_FAILURE: Failed to apply replayed expiry effects while \
701 replaying command {} (request_id={}): {}",
702 cmd.command_id, cmd.request_id, error
703 )
704 });
705
706 self.apply_pm_settlement_projection_effects_sync(
707 &output.pm_settlement_effects,
708 &cmd.request_id,
709 )
710 .unwrap_or_else(|error| {
711 panic!(
712 "CRITICAL_FAILURE: Failed to apply replayed PM settlement projection effects while \
713 replaying command {} (request_id={}): {}",
714 cmd.command_id, cmd.request_id, error
715 )
716 });
717
718 if !output.events.is_empty() {
719 self.startup_replayed_events
720 .push((cmd.request_id.clone(), output.events));
721 }
722 }
723
724 pub(super) fn apply_legacy_agent_auth_replay_command(
725 &mut self,
726 command: &EngineCommand,
727 ) -> bool {
728 match command {
733 EngineCommand::ApproveAgent {
734 wallet,
735 agent,
736 expires_at_ms,
737 nonce: None,
738 ..
739 } => {
740 self.ctx
741 .agent_authorizations
742 .entry(*wallet)
743 .or_default()
744 .insert(*agent, *expires_at_ms);
745 debug!(
746 wallet = %wallet,
747 agent = %agent,
748 expires_at_ms = ?expires_at_ms,
749 "Replayed legacy ApproveAgent command without nonce"
750 );
751 true
752 }
753 EngineCommand::RevokeAgent {
754 wallet,
755 agent,
756 nonce: None,
757 ..
758 } => {
759 if let Some(agents) = self.ctx.agent_authorizations.get_mut(wallet) {
760 agents.remove(agent);
761 if agents.is_empty() {
762 self.ctx.agent_authorizations.remove(wallet);
763 }
764 }
765 debug!(
766 wallet = %wallet,
767 agent = %agent,
768 "Replayed legacy RevokeAgent command without nonce"
769 );
770 true
771 }
772 _ => false,
773 }
774 }
775
776 fn replay_fill_events_by_command_for_window(
777 &mut self,
778 start_command_id: i64,
779 end_command_id: i64,
780 ) -> ReplayFillEventsByCommand {
781 let rows = {
782 let replay: &dyn JournalReplayReader = self
783 .ctx
784 .db
785 .as_ref()
786 .expect("diesel handler required for fill replay");
787 replay
788 .get_portfolio_events_for_command_range_sync(start_command_id + 1, end_command_id)
789 .unwrap_or_else(|e| {
790 panic!(
791 "CRITICAL_FAILURE: Failed to query OrderFilled events for replay window \
792 ({}..={}): {}",
793 start_command_id, end_command_id, e
794 )
795 })
796 };
797 if rows.is_empty() {
798 return BTreeMap::new();
799 }
800
801 let mut fill_events_by_command: ReplayFillEventsByCommand = BTreeMap::new();
802 for (idx, row) in rows.into_iter().enumerate() {
803 if row.event_type != hypercall_db::EventType::OrderFilled {
804 continue;
805 }
806 fill_events_by_command
807 .entry(row.command_id)
808 .or_default()
809 .push(Self::decode_fill_event(&row.event_data, idx));
810 }
811 let max_fill_trade_id = fill_events_by_command
812 .values()
813 .flat_map(|fill_events| fill_events.iter().map(|(fill, _)| fill.trade_id))
814 .max()
815 .unwrap_or(0);
816 if max_fill_trade_id >= self.ctx.next_trade_id {
817 let old = self.ctx.next_trade_id;
818 self.ctx.next_trade_id = max_fill_trade_id + 1;
819 info!(
820 "Advanced next_trade_id after replay: {} -> {} (max replayed trade_id was {})",
821 old, self.ctx.next_trade_id, max_fill_trade_id
822 );
823 }
824 fill_events_by_command
825 }
826
827 pub(super) fn apply_fill_events_for_replay(
828 &mut self,
829 fill_events: &[(hypercall_types::Fill, hypercall_types::FillAccounting)],
830 ) {
831 if fill_events.is_empty() {
832 return;
833 }
834
835 let fills: Vec<_> = fill_events.iter().map(|(fill, _)| fill.clone()).collect();
836 self.apply_fills_for_replay(&fills);
837
838 for (fill, accounting) in fill_events {
841 for (wallet, delta) in [
842 (fill.taker_wallet_address, accounting.taker_net_cash_delta),
843 (fill.maker_wallet_address, accounting.maker_net_cash_delta),
844 ] {
845 if delta == rust_decimal::Decimal::ZERO {
846 continue;
847 }
848 let balance_after = self.ctx.balance_ledger.balance(&wallet) + delta;
849 let update = hypercall_types::BalanceUpdate {
850 balance_update_seq: self.ctx.balance_ledger.next_balance_update_seq(),
851 wallet,
852 delta,
853 balance_after,
854 reason: hypercall_types::BalanceUpdateReason::OptionFillPremium,
855 reference_id: Some(fill.trade_id.to_string()),
856 source_command_id: None,
857 timestamp_ms: fill.timestamp,
858 };
859 self.ctx
860 .balance_ledger
861 .apply_balance_update(&update)
862 .unwrap_or_else(|error| {
863 panic!(
864 "CRITICAL: failed to apply replayed fill balance update: {}",
865 error
866 )
867 });
868 }
869
870 use crate::rsm::engine_deps::apply_fill_to_positions;
871 use hypercall_types::to_human_readable_decimal;
872 let human_size = to_human_readable_decimal(&fill.symbol, fill.size);
873 let is_buy = matches!(fill.taker_side, hypercall_types::Side::Buy);
874 let taker_signed = if is_buy { human_size } else { -human_size };
875 apply_fill_to_positions(
876 &mut self.ctx.engine_positions,
877 fill.taker_wallet_address,
878 fill.symbol.clone(),
879 taker_signed,
880 fill.price,
881 );
882 apply_fill_to_positions(
883 &mut self.ctx.engine_positions,
884 fill.maker_wallet_address,
885 fill.symbol.clone(),
886 -taker_signed,
887 fill.price,
888 );
889 }
890 }
891
892 fn replay_order_update_events_for_window(
893 &mut self,
894 start_command_id: i64,
895 end_command_id: i64,
896 ) {
897 let rows = {
898 let replay: &dyn JournalReplayReader = self
899 .ctx
900 .db
901 .as_ref()
902 .expect("diesel handler required for order update replay");
903 replay
904 .get_order_update_events_for_command_range_sync(start_command_id, end_command_id)
905 .unwrap_or_else(|e| {
906 panic!(
907 "CRITICAL_FAILURE: Failed to query OrderUpdate events for replay window \
908 ({}..={}): {}",
909 start_command_id, end_command_id, e
910 )
911 })
912 };
913 if rows.is_empty() {
914 return;
915 }
916
917 let updates = Self::decode_order_update_events(&rows);
918 self.apply_cancel_events_for_replay(&updates);
919 }
920
921 fn decode_fill_events(
922 raw_rows: &[Vec<u8>],
923 ) -> Vec<(hypercall_types::Fill, hypercall_types::FillAccounting)> {
924 raw_rows
925 .iter()
926 .enumerate()
927 .map(|(idx, raw)| Self::decode_fill_event(raw, idx))
928 .collect()
929 }
930
931 fn decode_fill_event(
932 raw: &[u8],
933 idx: usize,
934 ) -> (hypercall_types::Fill, hypercall_types::FillAccounting) {
935 match hypercall_types::EngineMessage::deserialize_from_wire(
936 crate::shared::topics::TOPIC_FILLS,
937 raw,
938 ) {
939 Ok(hypercall_types::EngineMessage::OrderFilled { fill, accounting }) => {
940 (fill, accounting)
941 }
942 Ok(other) => {
943 panic!(
944 "CRITICAL_FAILURE: Expected OrderFilled event at index {} during fill replay, got {}. \
945 Persisted data is corrupt; refusing to continue.",
946 idx,
947 other.type_name()
948 );
949 }
950 Err(e) => {
951 panic!(
952 "CRITICAL_FAILURE: Failed to deserialize OrderFilled event at index {}: {}. \
953 Persisted data is corrupt; refusing to continue.",
954 idx, e
955 );
956 }
957 }
958 }
959
960 fn decode_order_update_events(
961 raw_rows: &[Vec<u8>],
962 ) -> Vec<hypercall_types::OrderUpdateMessage> {
963 raw_rows
964 .iter()
965 .enumerate()
966 .map(|(idx, raw)| {
967 if raw.len() < 2 {
968 panic!(
969 "CRITICAL_FAILURE: OrderUpdate event data too short at index {} ({} bytes). \
970 Persisted data is corrupt.",
971 idx,
972 raw.len()
973 );
974 }
975 let version = raw[0];
976 rmp_serde::from_slice::<hypercall_types::OrderUpdateMessage>(&raw[1..])
977 .unwrap_or_else(|e| {
978 panic!(
979 "CRITICAL_FAILURE: Failed to deserialize OrderUpdate event at index {} \
980 (version={}): {}. Persisted data is corrupt; refusing to continue.",
981 idx, version, e
982 )
983 })
984 })
985 .collect()
986 }
987
988 pub(super) fn run_post_startup_reconciliation(&mut self) {
996 let all_order_ids: Vec<i64> = self
997 .ctx
998 .orderbooks
999 .values()
1000 .flat_map(|ob| ob.get_all_order_ids())
1001 .map(|id| id as i64)
1002 .collect();
1003
1004 if all_order_ids.is_empty() {
1005 info!("Post-startup reconciliation: no orders in orderbook, skipping");
1006 return;
1007 }
1008
1009 if let Some(ref handler) = self.ctx.db {
1010 match handler.get_terminal_order_ids_sync(&all_order_ids) {
1011 Ok(terminal_ids) if !terminal_ids.is_empty() => {
1012 let mut removed = 0;
1013 for order_id in &terminal_ids {
1014 let oid = *order_id as u64;
1015 for ob in self.ctx.orderbooks.values_mut() {
1016 if ob.has_order(oid) {
1017 ob.cancel_order(oid);
1018 removed += 1;
1019 break;
1020 }
1021 }
1022 self.ctx.order_index.remove_order_by_id(oid);
1023 }
1024 if removed > 0 {
1025 warn!(
1026 "Post-startup reconciliation: removed {} ghost orders that were \
1027 terminal in order_infos but present in orderbook after replay.",
1028 removed
1029 );
1030 self.publish_snapshot();
1032 }
1033 }
1034 Ok(_) => {
1035 info!(
1036 "Post-startup reconciliation: checked {} orders, all non-terminal — no ghost orders",
1037 all_order_ids.len()
1038 );
1039 }
1040 Err(e) => {
1041 panic!(
1042 "CRITICAL_FAILURE: Post-startup reconciliation query failed: {}. \
1043 Ghost orders may remain in orderbook; refusing to serve live traffic.",
1044 e
1045 );
1046 }
1047 }
1048 }
1049 }
1050
1051 pub(super) fn process_order_for_replay(
1062 &mut self,
1063 msg: &OrderActionMessage,
1064 response: Option<&OrderUpdateMessage>,
1065 ) -> Result<(), String> {
1066 if let Some(nonce) = msg.info.nonce {
1067 let signer = msg.api_wallet_address.unwrap_or(msg.wallet);
1068 self.ctx
1069 .nonce_sets
1070 .entry(signer)
1071 .or_insert_with(|| {
1072 hypercall_engine::BoundedNonceSet::new(
1073 hypercall_engine::nonce::DEFAULT_NONCE_SET_CAPACITY,
1074 )
1075 })
1076 .insert(nonce);
1077 }
1078 match msg.action {
1079 OrderAction::CreateOrder => self.replay_create_order(msg, response),
1080 OrderAction::CancelOrder => self.replay_cancel_order(msg),
1081 OrderAction::ReplaceOrder => self.replay_replace_order(msg, response),
1082 }
1083 }
1084
1085 fn replay_create_order(
1098 &mut self,
1099 msg: &OrderActionMessage,
1100 response: Option<&OrderUpdateMessage>,
1101 ) -> Result<(), String> {
1102 let symbol = &msg.info.symbol;
1103
1104 if symbol.ends_with("-PERP") || msg.info.is_perp {
1107 return Ok(());
1108 }
1109
1110 if !self.ctx.orderbooks.contains_key(symbol) {
1122 let resp = match response {
1123 Some(r) => r,
1124 None => {
1125 return Err(format!(
1126 "Replay create for missing-orderbook instrument {} has no response; \
1127 cannot determine order state for reconciliation",
1128 symbol
1129 ));
1130 }
1131 };
1132
1133 match resp.status {
1134 OrderUpdateStatus::Open | OrderUpdateStatus::PartiallyFilled => {
1135 let order_id = match (msg.info.order_id, resp.order_id) {
1136 (Some(cmd_id), Some(resp_id)) if cmd_id != resp_id => {
1137 return Err(format!(
1138 "Replay create for missing-orderbook instrument {} has mismatched \
1139 order_id (command={}, response={})",
1140 symbol, cmd_id, resp_id
1141 ));
1142 }
1143 (Some(id), _) | (_, Some(id)) => id,
1144 (None, None) => {
1145 return Err(format!(
1146 "Replay create for missing-orderbook instrument {} has no order_id \
1147 in command or response",
1148 symbol
1149 ));
1150 }
1151 };
1152 let filled_size = match resp.status {
1153 OrderUpdateStatus::PartiallyFilled => {
1154 if resp.filled_size < dec!(0) || resp.filled_size >= msg.info.size {
1155 return Err(format!(
1156 "Replay create for missing-orderbook instrument {} has invalid \
1157 partially-filled payload: size={}, filled={}",
1158 symbol, msg.info.size, resp.filled_size
1159 ));
1160 }
1161 resp.filled_size
1162 }
1163 _ => dec!(0),
1164 };
1165 warn!(
1166 "Replay: order {} on missing-orderbook instrument {} was {:?} — \
1167 collecting for startup DB cancellation",
1168 order_id, symbol, resp.status
1169 );
1170 self.startup_expired_order_cancels
1171 .push(StartupExpiredOrderCancel {
1172 order_id,
1173 wallet: msg.wallet,
1174 symbol: symbol.clone(),
1175 price: msg.info.price,
1176 size: msg.info.size,
1177 side: msg.info.side,
1178 tif: msg.info.tif,
1179 is_perp: msg.info.is_perp,
1180 underlying: msg.info.underlying.clone(),
1181 reduce_only: msg.info.reduce_only,
1182 nonce: msg.info.nonce,
1183 signature: msg.info.signature.clone(),
1184 mmp_enabled: msg.info.mmp_enabled,
1185 filled_size,
1186 client_id: msg.info.client_id.clone(),
1187 timestamp: msg.timestamp,
1188 });
1189 }
1190 OrderUpdateStatus::Acked => {
1191 return Err(format!(
1192 "Replay create for missing-orderbook instrument {} has non-final Acked \
1193 status; refusing to skip reconciliation",
1194 symbol
1195 ));
1196 }
1197 _ => {}
1199 }
1200
1201 return Ok(());
1202 }
1203
1204 let response = match response {
1205 Some(resp) => resp,
1206 None => {
1207 return Err(format!(
1208 "Replay create for symbol {} has no response; refusing to guess order state",
1209 symbol
1210 ));
1211 }
1212 };
1213
1214 let resting_quantity = match response.status {
1218 OrderUpdateStatus::Filled => {
1219 debug!(
1223 "Replay create for {} was fully filled, not adding to orderbook",
1224 symbol
1225 );
1226 return Ok(());
1227 }
1228 OrderUpdateStatus::Canceled | OrderUpdateStatus::Rejected => {
1229 debug!(
1230 "Replay create for {} was {:?}, not adding to orderbook",
1231 symbol, response.status
1232 );
1233 return Ok(());
1234 }
1235 OrderUpdateStatus::Open => {
1236 msg.info.size
1238 }
1239 OrderUpdateStatus::PartiallyFilled => {
1240 if response.filled_size < dec!(0) || response.filled_size >= msg.info.size {
1241 return Err(format!(
1242 "Replay create for symbol {} has invalid partially-filled payload: size={}, filled={}",
1243 symbol, msg.info.size, response.filled_size
1244 ));
1245 }
1246
1247 let remaining = msg.info.size - response.filled_size;
1250 debug!(
1251 "Replay create for {} partially filled: original={}, filled={}, resting={}",
1252 symbol, msg.info.size, response.filled_size, remaining
1253 );
1254 remaining
1255 }
1256 OrderUpdateStatus::Acked => {
1257 return Err(format!(
1258 "Replay create for symbol {} has non-final Acked status; refusing to infer resting quantity",
1259 symbol
1260 ));
1261 }
1262 };
1263
1264 let order_id = match (msg.info.order_id, response.order_id) {
1265 (Some(command_order_id), Some(response_order_id)) => {
1266 if command_order_id != response_order_id {
1267 return Err(format!(
1268 "Replay create for symbol {} has mismatched order_id (command={}, response={})",
1269 symbol, command_order_id, response_order_id
1270 ));
1271 }
1272 command_order_id
1273 }
1274 (Some(order_id), None) | (None, Some(order_id)) => order_id,
1275 (None, None) => {
1276 return Err(format!(
1277 "Replay create for symbol {} with status {:?} has no order_id in command or response",
1278 symbol, response.status
1279 ));
1280 }
1281 };
1282
1283 {
1286 let orderbook = self.ctx.orderbooks.get(symbol).unwrap();
1287
1288 if orderbook.has_order(order_id) {
1289 debug!(
1290 "Order {} already exists in orderbook during replay, skipping",
1291 order_id
1292 );
1293 return Ok(());
1294 }
1295 }
1296
1297 let timestamp = msg.timestamp;
1298 let price = msg.info.price;
1299 let side = msg.info.side;
1300 let wallet = msg.wallet;
1301
1302 let orderbook = self.ctx.orderbooks.get_mut(symbol).unwrap();
1304 orderbook.add_order_with_metadata(
1305 order_id,
1306 price,
1307 resting_quantity,
1308 side,
1309 wallet,
1310 timestamp,
1311 msg.info.client_id.clone(),
1312 msg.info.mmp_enabled,
1313 msg.info.size, );
1315
1316 self.ctx.order_index.remove_order(&wallet, order_id);
1318 self.ctx.order_index.add_order(
1319 &wallet,
1320 hypercall_engine::order_index::OrderSummary {
1321 order_id,
1322 symbol: symbol.clone(),
1323 side,
1324 price,
1325 original_size: msg.info.size,
1326 remaining_size: resting_quantity,
1327 is_perp: msg.info.is_perp,
1328 mmp_enabled: msg.info.mmp_enabled,
1329 client_id: msg.info.client_id.clone(),
1330 created_at: timestamp as i64,
1331 },
1332 );
1333
1334 debug!(
1335 "Replayed create order {}: symbol={}, resting_qty={}, side={:?}",
1336 order_id, symbol, resting_quantity, side
1337 );
1338
1339 Ok(())
1346 }
1347
1348 fn replay_cancel_order(&mut self, msg: &OrderActionMessage) -> Result<(), String> {
1350 let symbol = &msg.info.symbol;
1351 let wallet = &msg.wallet;
1352
1353 if !self.ctx.orderbooks.contains_key(symbol) {
1356 debug!(
1357 "Skipping replay cancel for expired/removed instrument {}",
1358 symbol
1359 );
1360 return Ok(());
1361 }
1362
1363 let orderbook = self.ctx.orderbooks.get_mut(symbol).unwrap();
1365
1366 let order_id = if let Some(oid) = msg.info.order_id {
1368 oid
1369 } else if let Some(ref client_id) = msg.info.client_id {
1370 match self
1372 .ctx
1373 .order_index
1374 .get_order_by_client_id(wallet, client_id)
1375 {
1376 Some((oid, _)) => oid,
1377 None => {
1378 debug!(
1379 "Order with client_id {} not found during replay cancel, may already be cancelled",
1380 client_id
1381 );
1382 return Ok(());
1383 }
1384 }
1385 } else {
1386 return Err("Cancel order has neither order_id nor client_id".to_string());
1387 };
1388
1389 if orderbook.cancel_order_for_replay(order_id).is_none() {
1392 debug!(
1393 "Order {} not present in replay cancel scan for symbol {}, may already be cancelled/filled",
1394 order_id, symbol
1395 );
1396 }
1397
1398 self.ctx.order_index.remove_order(wallet, order_id);
1400
1401 Ok(())
1404 }
1405
1406 fn cancel_order_for_replay_with_fallback(
1412 &mut self,
1413 order_id: u64,
1414 candidate_symbols: Vec<String>,
1415 ) -> Option<String> {
1416 let mut attempted_symbols = Vec::new();
1417
1418 for symbol in candidate_symbols {
1419 if attempted_symbols.iter().any(|seen| seen == &symbol) {
1420 continue;
1421 }
1422 attempted_symbols.push(symbol.clone());
1423
1424 if let Some(orderbook) = self.ctx.orderbooks.get_mut(&symbol) {
1425 if orderbook.cancel_order_for_replay(order_id).is_some() {
1426 return Some(symbol);
1427 }
1428 }
1429 }
1430
1431 let remaining_symbols: Vec<String> = self
1432 .ctx
1433 .orderbooks
1434 .keys()
1435 .filter(|symbol| !attempted_symbols.iter().any(|seen| seen == *symbol))
1436 .cloned()
1437 .collect();
1438
1439 for symbol in remaining_symbols {
1440 if let Some(orderbook) = self.ctx.orderbooks.get_mut(&symbol) {
1441 if orderbook.cancel_order_for_replay(order_id).is_some() {
1442 error!(
1443 "RECOVERY_INVARIANT: order {} was found in unexpected orderbook {} during replay cancel fallback",
1444 order_id, symbol
1445 );
1446 return Some(symbol);
1447 }
1448 }
1449 }
1450
1451 None
1452 }
1453
1454 fn replay_replace_order(
1459 &mut self,
1460 msg: &OrderActionMessage,
1461 response: Option<&OrderUpdateMessage>,
1462 ) -> Result<(), String> {
1463 let wallet = &msg.wallet;
1464
1465 let cancel_order_id = msg
1471 .info
1472 .order_id
1473 .ok_or("Replace order missing order_id for cancel target".to_string())?;
1474
1475 let mut candidate_symbols = Vec::new();
1476 if let Some(index_symbol) = self
1477 .ctx
1478 .order_index
1479 .get_order_symbol(wallet, cancel_order_id)
1480 .map(|s| s.to_string())
1481 {
1482 candidate_symbols.push(index_symbol);
1483 }
1484
1485 if self
1486 .cancel_order_for_replay_with_fallback(cancel_order_id, candidate_symbols)
1487 .is_none()
1488 {
1489 debug!(
1490 "Replay replace: cancel target {} not present in any replay cancel scan",
1491 cancel_order_id
1492 );
1493 }
1494 self.ctx.order_index.remove_order(wallet, cancel_order_id);
1495
1496 if let Some(resp) = response {
1500 if resp.status == OrderUpdateStatus::Rejected {
1501 debug!(
1502 "Replay replace: skipping create phase (rejected) for wallet {}, cancel_target={}",
1503 wallet, cancel_order_id
1504 );
1505 return Ok(());
1506 }
1507 }
1508
1509 let mut create_msg = msg.clone();
1510 create_msg.action = OrderAction::CreateOrder;
1511 if let Some(resp) = response {
1512 create_msg.info.order_id = resp.order_id;
1513 } else {
1514 create_msg.info.order_id = None;
1515 }
1516
1517 self.replay_create_order(&create_msg, response)
1518 }
1519
1520 pub(super) fn apply_fills_for_replay(&mut self, fills: &[hypercall_types::Fill]) {
1531 if fills.is_empty() {
1532 return;
1533 }
1534
1535 let mut applied = 0;
1536 let mut skipped = 0;
1537
1538 for fill in fills {
1539 let fully_filled = self.ctx.order_index.fill_order(
1541 &fill.maker_wallet_address,
1542 fill.maker_order_id,
1543 fill.size,
1544 );
1545
1546 if let Some(orderbook) = self.ctx.orderbooks.get_mut(&fill.symbol) {
1548 if orderbook.has_order(fill.maker_order_id) {
1549 orderbook.reduce_order_quantity(fill.maker_order_id, fill.size);
1550 applied += 1;
1551 } else {
1552 skipped += 1;
1553 }
1554 } else {
1555 if self.ctx.expired_instruments.contains_key(&fill.symbol) {
1560 debug!(
1561 "Replay fill for maker order {} on expired instrument {} \
1562 — no orderbook, fill applied to order_index only",
1563 fill.maker_order_id, fill.symbol
1564 );
1565 } else {
1566 warn!(
1567 "Replay fill for maker order {} references non-existent \
1568 orderbook {} (not in expired_instruments) — fill applied \
1569 to order_index but not orderbook",
1570 fill.maker_order_id, fill.symbol
1571 );
1572 }
1573 skipped += 1;
1574 }
1575
1576 if fully_filled {
1577 debug!(
1578 "Replay fill: maker order {} fully filled (trade_id={})",
1579 fill.maker_order_id, fill.trade_id
1580 );
1581 } else {
1582 debug!(
1583 "Replay fill: maker order {} partially filled by {} (trade_id={})",
1584 fill.maker_order_id, fill.size, fill.trade_id
1585 );
1586 }
1587 }
1588
1589 if applied > 0 || skipped > 0 {
1590 info!(
1591 "Applied {} replay fills ({} skipped - order not in book)",
1592 applied, skipped
1593 );
1594 }
1595 }
1596
1597 pub(super) fn flush_startup_expired_order_cancels(&mut self) {
1598 if self.startup_expired_order_cancels.is_empty() {
1599 return;
1600 }
1601
1602 let count = self.startup_expired_order_cancels.len();
1603 warn!(
1604 "Startup recovery: {} orders on expired instruments need DB cancellation",
1605 count
1606 );
1607
1608 for cancel in &self.startup_expired_order_cancels {
1609 warn!(
1610 " order_id={}, symbol={}, wallet={}, side={:?}, price={}, size={}, filled={}",
1611 cancel.order_id,
1612 cancel.symbol,
1613 cancel.wallet,
1614 cancel.side,
1615 cancel.price,
1616 cancel.size,
1617 cancel.filled_size,
1618 );
1619 }
1620
1621 if let Some(ref handler) = self.ctx.db {
1622 let event_handler =
1623 crate::db_handler::DieselEventHandler::with_pool_no_migrations(handler.pool());
1624 let chunk_size = 5000;
1625 let mut total_rows = 0usize;
1626 for (i, chunk) in self
1627 .startup_expired_order_cancels
1628 .chunks(chunk_size)
1629 .enumerate()
1630 {
1631 match event_handler
1632 .batch_cancel_expired_orders_sync(chunk, "Instrument expired during offline")
1633 {
1634 Ok(rows) => {
1635 total_rows += rows;
1636 info!(
1637 "Startup recovery: cancelled chunk {}, {} rows written ({} in chunk, {} total so far)",
1638 i + 1, rows, chunk.len(), total_rows
1639 );
1640 }
1641 Err(e) => {
1642 error!(
1643 "Startup recovery: failed to cancel chunk {} ({} orders): {}. \
1644 Remaining orders will show stale OPEN status.",
1645 i + 1,
1646 chunk.len(),
1647 e
1648 );
1649 }
1650 }
1651 }
1652 info!(
1653 "Startup recovery: {} candidates, {} total rows cancelled",
1654 count, total_rows
1655 );
1656 metrics::counter!("ht_recovery_expired_order_cancels_total")
1657 .increment(total_rows as u64);
1658 }
1659 self.startup_expired_order_cancels.clear();
1660 }
1661
1662 pub(super) fn apply_cancel_events_for_replay(
1674 &mut self,
1675 order_updates: &[hypercall_types::OrderUpdateMessage],
1676 ) {
1677 if order_updates.is_empty() {
1678 return;
1679 }
1680
1681 let mut applied = 0;
1682 let mut skipped = 0;
1683
1684 for update in order_updates {
1685 let is_terminal = matches!(
1687 update.status,
1688 hypercall_types::OrderUpdateStatus::Canceled
1689 | hypercall_types::OrderUpdateStatus::Filled
1690 | hypercall_types::OrderUpdateStatus::Rejected
1691 );
1692 if !is_terminal {
1693 continue;
1694 }
1695
1696 let Some(order_id) = update.order_id else {
1697 warn!(
1698 "Replay Pass 3: terminal OrderUpdate (status={:?}, wallet={}) \
1699 has no order_id — cannot reconcile, skipping",
1700 update.status, update.wallet_address
1701 );
1702 continue;
1703 };
1704
1705 let wallet = &update.wallet_address;
1706 let symbol = &update.info.symbol;
1707
1708 if let Some(orderbook) = self.ctx.orderbooks.get_mut(symbol) {
1709 if orderbook.cancel_order_for_replay(order_id).is_some() {
1710 applied += 1;
1711 debug!(
1712 "Replay Pass 3: removed order {} (status={:?}, wallet={}, symbol={})",
1713 order_id, update.status, wallet, symbol
1714 );
1715 } else {
1716 skipped += 1;
1717 }
1718 }
1719
1720 self.ctx.order_index.remove_order(wallet, order_id);
1721 }
1722
1723 if applied > 0 || skipped > 0 {
1724 info!(
1725 "Replay Pass 3: applied {} side-effect cancels ({} skipped - not in book)",
1726 applied, skipped
1727 );
1728 }
1729 }
1730}
1731
1732#[cfg(test)]
1733mod tests {
1734 use super::*;
1735 use crate::rsm::MarginMode;
1736 use hypercall_db::SettlementWriter;
1737 use hypercall_engine::FillAccounting;
1738 use hypercall_types::wallet_address::test_wallet;
1739 use hypercall_types::FillSource;
1740 use hypercall_types::Side;
1741 use rust_decimal::Decimal;
1742 use rust_decimal_macros::dec;
1743
/// A serialized `OrderFilled` engine message must decode back into the same
/// fill and fill accounting.
#[test]
fn decode_fill_events_accepts_stored_fill_payload() {
    let expected = hypercall_types::Fill {
        trade_id: 77,
        taker_order_id: 101,
        maker_order_id: 102,
        symbol: "BTC-20261231-100000-C".to_string(),
        price: dec!(250),
        size: dec!(100000000),
        taker_side: Side::Buy,
        taker_wallet_address: test_wallet(1),
        maker_wallet_address: test_wallet(2),
        fee: Decimal::ZERO,
        is_taker: true,
        timestamp: 1_700_000_000_000,
        builder_code_address: None,
        builder_code_fee: None,
        source: FillSource::Orderbook,
        taker_realized_pnl: Some(dec!(10)),
        maker_realized_pnl: Some(dec!(-10)),
        underlying_notional: Some(dec!(6500000)),
    };
    let event = hypercall_types::EngineMessage::OrderFilled {
        accounting: FillAccounting::from_fill(&expected),
        fill: expected.clone(),
    };
    let encoded = event.serialize_inner().expect("serialize OrderFilled");

    let decoded = UnifiedEngine::decode_fill_events(&[encoded]);
    let (actual, actual_accounting) = decoded.first().expect("decoded fill");

    assert_eq!(decoded.len(), 1);

    // Identity and pricing fields.
    assert_eq!(actual.trade_id, expected.trade_id);
    assert_eq!(actual.taker_order_id, expected.taker_order_id);
    assert_eq!(actual.maker_order_id, expected.maker_order_id);
    assert_eq!(actual.symbol, expected.symbol);
    assert_eq!(actual.price, expected.price);
    assert_eq!(actual.size, expected.size);

    // Counterparty fields.
    assert_eq!(actual.taker_side, expected.taker_side);
    assert_eq!(actual.taker_wallet_address, expected.taker_wallet_address);
    assert_eq!(actual.maker_wallet_address, expected.maker_wallet_address);

    // PnL and accounting.
    assert_eq!(actual.taker_realized_pnl, expected.taker_realized_pnl);
    assert_eq!(actual.maker_realized_pnl, expected.maker_realized_pnl);
    assert_eq!(*actual_accounting, FillAccounting::from_fill(&expected));
}
1789
/// A `HypercoreEquityUpdate` row (leading byte `1` followed by a MessagePack
/// tuple) decodes into the matching engine command.
#[test]
fn state_command_replay_decodes_hypercore_equity_update() {
    let wallet = test_wallet(12);

    let payload =
        rmp_serde::to_vec(&(wallet, dec!(1234), 300u64)).expect("serialize replay payload");
    let mut command_data = Vec::with_capacity(payload.len() + 1);
    command_data.push(1u8);
    command_data.extend(payload);

    let row = hypercall_db::ReplayCommand {
        command_id: 42,
        request_id: "req".to_string(),
        command_type: "HypercoreEquityUpdate".to_string(),
        command_data,
        response_data: None,
    };

    let command = decode_state_command_for_replay(&row).expect("decoded state command");

    assert_eq!(replay_envelope_timestamp(&command), 300);
    assert_eq!(
        crate::rsm::restart_components::replay_disposition_for_command(&command),
        hypercall_recovery::ReplayDisposition::Applied
    );
    assert!(matches!(
        command,
        EngineCommand::HypercoreEquityUpdate {
            wallet: decoded_wallet,
            account_value: decoded_value,
            timestamp_ms: 300
        } if decoded_wallet == wallet && decoded_value == dec!(1234)
    ));
}
1821
/// A `PriceUpdate` row (leading byte `1` followed by a MessagePack
/// `PriceUpdatePayload`) decodes into the matching engine command.
#[test]
fn state_command_replay_decodes_price_update() {
    let payload = rmp_serde::to_vec(&crate::rsm::apply::PriceUpdatePayload {
        underlying: "BTC".to_string(),
        spot_price: dec!(95000),
        timestamp_ms: 123,
    })
    .expect("serialize PriceUpdate replay payload");
    let mut command_data = Vec::with_capacity(payload.len() + 1);
    command_data.push(1u8);
    command_data.extend(payload);

    let row = hypercall_db::ReplayCommand {
        command_id: 43,
        request_id: "req".to_string(),
        command_type: "PriceUpdate".to_string(),
        command_data,
        response_data: None,
    };

    let command = decode_state_command_for_replay(&row).expect("decoded state command");

    assert_eq!(replay_envelope_timestamp(&command), 123);
    assert!(matches!(
        command,
        EngineCommand::PriceUpdate {
            underlying: decoded_underlying,
            spot_price: decoded_price,
            timestamp_ms: 123
        } if decoded_underlying == "BTC" && decoded_price == dec!(95000)
    ));
}
1853
/// Both `ExpireMarket` (with an expiry context) and `TickExpiry` rows decode
/// into engine commands that are recognised as expiry replay commands.
#[test]
fn state_command_replay_decodes_expiry_commands() {
    let market = hypercall_types::Market {
        symbol: "BTC-20260619-110000-C".to_string(),
        underlying: "BTC".to_string(),
        expiry: 20260619,
        strike: dec!(110000),
        option_type: hypercall_types::OptionType::Call,
    };

    // --- ExpireMarket ---
    let expire_message = hypercall_types::MarketActionMessage {
        market: market.clone(),
        action: hypercall_types::MarketAction::ExpireMarket,
        timestamp: 459,
    };
    let expire_context = crate::rsm::apply::TickExpiryContext {
        due_expiries: vec![crate::rsm::apply::TickExpiryDueGroup {
            expiry_ts: 20260619,
            symbols: vec![market.symbol.clone()],
        }],
        pending_settlements: Vec::new(),
        settlement_prices: vec![crate::rsm::apply::TickExpirySettlementPrice {
            underlying: "BTC".to_string(),
            expiry_ts: 20260619,
            price: dec!(120000),
        }],
        margin_modes: Vec::new(),
        pm_settlements: Vec::new(),
    };
    let expire_payload = crate::rsm::apply::MarketActionCommand::with_expiry_context(
        expire_message,
        expire_context,
    );
    let expire_row = hypercall_db::ReplayCommand {
        command_id: 44,
        request_id: "expire-market".to_string(),
        command_type: "ExpireMarket".to_string(),
        command_data: hypercall_types::serialize_to_wire_bytes(&expire_payload),
        response_data: None,
    };

    let decoded_expire =
        decode_state_command_for_replay(&expire_row).expect("decoded expiry command");

    assert_eq!(replay_envelope_timestamp(&decoded_expire), 459);
    assert!(UnifiedEngine::is_expiry_replay_command(&decoded_expire));
    assert!(matches!(
        decoded_expire,
        EngineCommand::MarketAction(command)
            if command.message.action == hypercall_types::MarketAction::ExpireMarket
                && command.expiry_context.is_some()
    ));

    // --- TickExpiry ---
    let tick_payload = (460u64, crate::rsm::apply::TickExpiryContext::empty());
    let tick_row = hypercall_db::ReplayCommand {
        command_id: 45,
        request_id: "tick-expiry".to_string(),
        command_type: "TickExpiry".to_string(),
        command_data: hypercall_types::serialize_to_wire_bytes(&tick_payload),
        response_data: None,
    };

    let decoded_tick =
        decode_state_command_for_replay(&tick_row).expect("decoded tick command");

    assert_eq!(replay_envelope_timestamp(&decoded_tick), 460);
    assert!(UnifiedEngine::is_expiry_replay_command(&decoded_tick));
    assert!(matches!(decoded_tick, EngineCommand::TickExpiry { .. }));
}
1921
1922 #[tokio::test]
1923 async fn replay_command_row_tick_expiry_never_reconciles_from_durable_settlement() {
1924 let context = crate::rsm::unified_engine::tests::setup_test_context().await;
1925 let diesel_handler = std::sync::Arc::new(
1926 DatabaseHandler::new(&context.database_url).expect("diesel handler should connect"),
1927 );
1928
1929 let wallet = test_wallet(156);
1930 let symbol = "BTC-20261231-100000-C".to_string();
1931 let expiry_ts = hypercall_types::expiry_date_to_timestamp("BTC", 20261231) as u64;
1932 let reference_price = dec!(105000);
1933 let settlement_price = dec!(5000);
1934 let position_size = dec!(1);
1935 let settlement_entry_price = dec!(100000);
1936 let settlement_value = settlement_price * position_size;
1937 let cost_basis = settlement_entry_price * position_size;
1938 let net_pnl = settlement_value - cost_basis;
1939
1940 diesel_handler
1941 .try_apply_settlement_sync(
1942 &wallet,
1943 &symbol,
1944 position_size,
1945 settlement_price,
1946 settlement_value,
1947 MarginMode::Standard,
1948 1_798_675_200_000,
1949 Some(settlement_entry_price),
1950 Some(cost_basis),
1951 Some(net_pnl),
1952 )
1953 .expect("durable settlement should be present before replay");
1954
1955 let (engine_event_tx, _engine_event_rx) = tokio::sync::mpsc::unbounded_channel();
1956 let (shutdown_tx, _) = tokio::sync::broadcast::channel::<()>(1);
1957 let (mut engine, _order_tx, _market_tx) =
1958 UnifiedEngineBuilder::new(crate::rsm::unified_engine::tests::create_test_config())
1959 .with_database(&context.database_url)
1960 .allow_no_database_for_tests()
1961 .with_mock_journal()
1962 .build(engine_event_tx, shutdown_tx);
1963
1964 engine
1965 .expiry_manager
1966 .schedule_expiry(symbol.clone(), expiry_ts);
1967 engine.ctx.engine_positions.insert(
1968 (wallet, symbol.clone()),
1969 crate::rsm::engine_deps::EnginePosition {
1970 quantity: position_size,
1971 entry_price: settlement_entry_price,
1972 },
1973 );
1974 engine.ctx.balance_ledger.set(wallet, dec!(10000));
1975
1976 let cmd = hypercall_db::ReplayCommand {
1977 command_id: 156,
1978 request_id: "tick-expiry-divergent-cash".to_string(),
1979 command_type: "TickExpiry".to_string(),
1980 command_data: hypercall_types::serialize_to_wire_bytes(&(
1981 1_798_675_200_000_u64,
1982 crate::rsm::apply::TickExpiryContext {
1983 due_expiries: vec![crate::rsm::apply::TickExpiryDueGroup {
1984 expiry_ts: expiry_ts as i64,
1985 symbols: vec![symbol],
1986 }],
1987 pending_settlements: Vec::new(),
1988 settlement_prices: vec![crate::rsm::apply::TickExpirySettlementPrice {
1989 underlying: "BTC".to_string(),
1990 expiry_ts: expiry_ts as i64,
1991 price: reference_price,
1992 }],
1993 margin_modes: vec![crate::rsm::apply::TickExpiryWalletMarginMode {
1994 wallet,
1995 margin_mode: MarginMode::Standard,
1996 pm_settlement_required: false,
1997 }],
1998 pm_settlements: Vec::new(),
1999 },
2000 )),
2001 response_data: None,
2002 };
2003
2004 let mut replay_count = 0;
2005 let mut max_replayed_order_id = 0;
2006 engine.replay_command_row(&cmd, &mut replay_count, &mut max_replayed_order_id);
2007
2008 assert_eq!(
2009 engine.ctx.balance_ledger.balance(&wallet),
2010 dec!(15000),
2011 "replay must keep deterministic engine cash and never reconcile from durable settlement rows"
2012 );
2013 }
2014
/// A `CreateOrder` row is not a state command, so the state-command decoder
/// must return `None` for it.
#[test]
fn state_command_replay_ignores_non_state_commands() {
    let order_row = hypercall_db::ReplayCommand {
        command_id: 46,
        request_id: "req".to_string(),
        command_type: "CreateOrder".to_string(),
        command_data: vec![1],
        response_data: None,
    };

    let decoded = decode_state_command_for_replay(&order_row);

    assert!(decoded.is_none());
}
2027
/// Legacy tuple-encoded `ApproveAgent` / `RevokeAgent` rows (no nonce) must
/// still add and then remove the agent authorization when replayed.
#[test]
fn replay_command_row_applies_legacy_agent_auth_without_nonce() {
    let (mut engine, _order_tx, _market_tx) =
        crate::rsm::unified_engine::tests::build_replay_test_engine();
    let wallet = test_wallet(52);
    let agent = test_wallet(53);
    let mut replay_count = 0;
    let mut max_replayed_order_id = 0;

    // Approve: legacy payload is the tuple (wallet, agent, expiry).
    let approve_row = hypercall_db::ReplayCommand {
        command_id: 45,
        request_id: "legacy-approve-agent".to_string(),
        command_type: "ApproveAgent".to_string(),
        command_data: hypercall_types::serialize_to_wire_bytes(&(wallet, agent, None::<u64>)),
        response_data: None,
    };
    engine.replay_command_row(&approve_row, &mut replay_count, &mut max_replayed_order_id);

    assert_eq!(replay_count, 1);
    assert_eq!(max_replayed_order_id, 0);
    let stored_authorization = engine
        .ctx
        .agent_authorizations
        .get(&wallet)
        .and_then(|agents| agents.get(&agent));
    assert_eq!(
        stored_authorization,
        Some(&None),
        "legacy persisted ApproveAgent replay row without nonce must restore auth"
    );

    // Revoke: legacy payload is the tuple (wallet, agent).
    let revoke_row = hypercall_db::ReplayCommand {
        command_id: 46,
        request_id: "legacy-revoke-agent".to_string(),
        command_type: "RevokeAgent".to_string(),
        command_data: hypercall_types::serialize_to_wire_bytes(&(wallet, agent)),
        response_data: None,
    };
    engine.replay_command_row(&revoke_row, &mut replay_count, &mut max_replayed_order_id);

    assert_eq!(replay_count, 2);
    assert!(
        !engine.ctx.agent_authorizations.contains_key(&wallet),
        "legacy persisted RevokeAgent replay row without nonce must clear auth"
    );
}
2074
/// Struct-encoded `ApproveAgentPayload` / `RevokeAgentPayload` rows decode
/// into engine commands carrying every field of the payload.
#[test]
fn state_command_replay_decodes_named_agent_auth_payloads() {
    let wallet = test_wallet(54);
    let agent = test_wallet(55);

    // --- ApproveAgent ---
    let approve_payload = crate::rsm::apply::ApproveAgentPayload {
        wallet,
        agent,
        expires_at_ms: Some(999),
        nonce: Some(123),
        timestamp_ms: 456,
    };
    let approve_row = hypercall_db::ReplayCommand {
        command_id: 47,
        request_id: "approve-agent".to_string(),
        command_type: "ApproveAgent".to_string(),
        command_data: hypercall_types::serialize_to_wire_bytes(&approve_payload),
        response_data: None,
    };

    let decoded_approve =
        decode_state_command_for_replay(&approve_row).expect("decoded approve");

    assert!(matches!(
        decoded_approve,
        EngineCommand::ApproveAgent {
            wallet: got_wallet,
            agent: got_agent,
            expires_at_ms: Some(999),
            nonce: Some(123),
            timestamp_ms: 456,
        } if got_wallet == wallet && got_agent == agent
    ));

    // --- RevokeAgent ---
    let revoke_payload = crate::rsm::apply::RevokeAgentPayload {
        wallet,
        agent,
        nonce: Some(124),
        timestamp_ms: 457,
    };
    let revoke_row = hypercall_db::ReplayCommand {
        command_id: 48,
        request_id: "revoke-agent".to_string(),
        command_type: "RevokeAgent".to_string(),
        command_data: hypercall_types::serialize_to_wire_bytes(&revoke_payload),
        response_data: None,
    };

    let decoded_revoke = decode_state_command_for_replay(&revoke_row).expect("decoded revoke");

    assert!(matches!(
        decoded_revoke,
        EngineCommand::RevokeAgent {
            wallet: got_wallet,
            agent: got_agent,
            nonce: Some(124),
            timestamp_ms: 457,
        } if got_wallet == wallet && got_agent == agent
    ));
}
2135
/// A struct-encoded `DepositUpdatePayload` row decodes into a `DepositUpdate`
/// command that keeps the sequence and the source event hash.
#[test]
fn state_command_replay_decodes_named_balance_update_payloads() {
    let wallet = test_wallet(56);
    let deposit_payload = crate::rsm::apply::DepositUpdatePayload {
        wallet,
        amount: dec!(50),
        timestamp_ms: 458,
        sequence: 125,
        source_event_hash: test_source_hash(125),
    };
    let row = hypercall_db::ReplayCommand {
        command_id: 49,
        request_id: "deposit-update".to_string(),
        command_type: "DepositUpdate".to_string(),
        command_data: hypercall_types::serialize_to_wire_bytes(&deposit_payload),
        response_data: None,
    };

    let command = decode_state_command_for_replay(&row).expect("decoded deposit update");

    assert!(matches!(
        command,
        EngineCommand::DepositUpdate {
            wallet: got_wallet,
            amount: got_amount,
            timestamp_ms: 458,
            sequence: Some(125),
            source_event_hash: got_hash,
        } if got_wallet == wallet && got_amount == dec!(50) && got_hash == test_source_hash(125)
    ));
}
2168
/// A legacy `BalanceCommandPayload`-encoded `DepositUpdate` row that carries
/// a sequence still decodes, and the resulting command has a non-zero source
/// event hash.
#[test]
fn state_command_replay_decodes_legacy_deposit_balance_payloads() {
    let wallet = test_wallet(57);
    let legacy_payload = crate::rsm::apply::BalanceCommandPayload {
        wallet,
        amount: dec!(50),
        balance_after: dec!(150),
        timestamp_ms: 459,
        sequence: Some(126),
    };
    let row = hypercall_db::ReplayCommand {
        command_id: 50,
        request_id: "legacy-deposit-update".to_string(),
        command_type: "DepositUpdate".to_string(),
        command_data: hypercall_types::serialize_to_wire_bytes(&legacy_payload),
        response_data: None,
    };

    let command = decode_state_command_for_replay(&row).expect("decoded legacy deposit update");

    match command {
        EngineCommand::DepositUpdate {
            wallet: got_wallet,
            amount: got_amount,
            timestamp_ms: got_timestamp,
            sequence: got_sequence,
            source_event_hash: got_hash,
        } => {
            assert_eq!(got_wallet, wallet);
            assert_eq!(got_amount, dec!(50));
            assert_eq!(got_timestamp, 459);
            assert_eq!(got_sequence, Some(126));
            assert_ne!(got_hash, alloy::primitives::FixedBytes::<32>::ZERO);
        }
        other => panic!("expected DepositUpdate, got {other:?}"),
    }
}
2207
/// Decoding a legacy `BalanceCommandPayload`-encoded `DepositUpdate` row that
/// has no sequence must panic with the expected message.
#[test]
#[should_panic(expected = "Legacy replay DepositUpdate balance payload missing sequence")]
fn state_command_replay_rejects_legacy_deposit_balance_payloads_without_sequence() {
    let wallet = test_wallet(58);
    let legacy_payload = crate::rsm::apply::BalanceCommandPayload {
        wallet,
        amount: dec!(50),
        balance_after: dec!(150),
        timestamp_ms: 460,
        sequence: None,
    };
    let row = hypercall_db::ReplayCommand {
        command_id: 51,
        request_id: "legacy-deposit-update-no-sequence".to_string(),
        command_type: "DepositUpdate".to_string(),
        command_data: hypercall_types::serialize_to_wire_bytes(&legacy_payload),
        response_data: None,
    };

    // The decoder is expected to panic; the result is never inspected.
    let _ = decode_state_command_for_replay(&row);
}
2230}