1use tracing::{debug, warn};
2
3use super::CommandType;
4use crate::rsm::apply::CommandEnvelope;
5use crate::rsm::unified_engine::UnifiedEngine;
6use crate::shared::traits::MarkPriceOracle;
7
8pub struct StandbyEngineHandler {
11 engine: UnifiedEngine,
12}
13
14impl StandbyEngineHandler {
15 pub fn new(engine: UnifiedEngine) -> Self {
16 Self { engine }
17 }
18
19 pub fn into_engine(self) -> UnifiedEngine {
20 self.engine
21 }
22
23 pub fn engine(&self) -> &UnifiedEngine {
24 &self.engine
25 }
26
27 pub fn engine_mut(&mut self) -> &mut UnifiedEngine {
28 &mut self.engine
29 }
30
31 async fn apply_standby_market_effects(
32 engine: &UnifiedEngine,
33 output: &crate::rsm::apply::ApplyOutput,
34 ) -> anyhow::Result<()> {
35 for effect in &output.market_effects {
36 match effect {
37 crate::rsm::apply::MarketEffect::RegisterSettlement {
38 underlying,
39 symbol,
40 expiry_ts,
41 twap_window_seconds,
42 } => {
43 let Some(oracle) = engine.ctx.deps.mark_price_oracles.get(underlying) else {
44 warn!(
45 underlying,
46 symbol,
47 expiry_ts,
48 twap_window_seconds,
49 "Skipping standby TWAP settlement registration because mark price oracle is missing"
50 );
51 continue;
52 };
53 oracle
54 .register_settlement(*expiry_ts, *twap_window_seconds)
55 .await;
56 debug!(
57 underlying,
58 symbol,
59 expiry_ts,
60 "Registered standby TWAP settlement window during replay"
61 );
62 }
63 crate::rsm::apply::MarketEffect::SaveMarketAndInstrument { .. }
64 | crate::rsm::apply::MarketEffect::DeleteMarketAndInstrument { .. } => {
65 debug!("Skipping primary-only market persistence effect during standby replay");
66 }
67 }
68 }
69 Ok(())
70 }
71
72 fn apply_standby_expiry_effects(
73 engine: &mut UnifiedEngine,
74 output: &crate::rsm::apply::ApplyOutput,
75 ) -> anyhow::Result<()> {
76 for effect in &output.expiry_effects {
77 engine
78 .apply_standby_expiry_effect(effect)
79 .map_err(|error| anyhow::anyhow!(error.to_string()))?;
80 }
81 Ok(())
82 }
83
84 fn standby_replay_timestamp(cmd: &crate::rsm::apply::EngineCommand) -> anyhow::Result<u64> {
85 let timestamp = match cmd {
86 crate::rsm::apply::EngineCommand::OrderAction(msg) => msg.timestamp,
87 crate::rsm::apply::EngineCommand::PriceUpdate { timestamp_ms, .. } => *timestamp_ms,
88 crate::rsm::apply::EngineCommand::IvUpdate { timestamp_ms, .. } => *timestamp_ms,
89 crate::rsm::apply::EngineCommand::MarketAction(cmd) => cmd.message.timestamp,
90 crate::rsm::apply::EngineCommand::HypercorePositionUpdate { timestamp_ms, .. } => {
91 *timestamp_ms
92 }
93 crate::rsm::apply::EngineCommand::TradingModeUpdate { timestamp_ms, .. } => {
94 *timestamp_ms
95 }
96 crate::rsm::apply::EngineCommand::TickExpiry { now_ms, .. } => *now_ms,
97 crate::rsm::apply::EngineCommand::DepositUpdate { timestamp_ms, .. }
98 | crate::rsm::apply::EngineCommand::OptionDepositUpdate { timestamp_ms, .. }
99 | crate::rsm::apply::EngineCommand::OptionWithdrawalUpdate { timestamp_ms, .. }
100 | crate::rsm::apply::EngineCommand::CashWithdrawalUpdate { timestamp_ms, .. }
101 | crate::rsm::apply::EngineCommand::LiquidationBonusUpdate { timestamp_ms, .. } => {
102 *timestamp_ms
103 }
104 crate::rsm::apply::EngineCommand::RfqExecute(cmd) => {
105 if cmd.timestamp_ms == 0 || cmd.fill_id.is_empty() {
106 anyhow::bail!(
107 "RfqExecute command {} missing timestamp_ms or fill_id during standby replay",
108 cmd.request_id
109 );
110 }
111 cmd.timestamp_ms
112 }
113 crate::rsm::apply::EngineCommand::NonceAdvance { timestamp_ms, .. }
114 | crate::rsm::apply::EngineCommand::ApproveAgent { timestamp_ms, .. }
115 | crate::rsm::apply::EngineCommand::RevokeAgent { timestamp_ms, .. }
116 | crate::rsm::apply::EngineCommand::HypercoreEquityUpdate { timestamp_ms, .. }
117 | crate::rsm::apply::EngineCommand::SetPmSettlementPoolConfig(
118 hypercall_engine::command::SetPmSettlementPoolConfigCommand {
119 timestamp_ms, ..
120 },
121 )
122 | crate::rsm::apply::EngineCommand::RecordPmVaultDeposit(
123 hypercall_engine::command::RecordPmVaultDepositCommand { timestamp_ms, .. },
124 )
125 | crate::rsm::apply::EngineCommand::RequestPmVaultWithdrawal(
126 hypercall_engine::command::RequestPmVaultWithdrawalCommand { timestamp_ms, .. },
127 )
128 | crate::rsm::apply::EngineCommand::AccruePmSettlementInterest(
129 hypercall_engine::command::AccruePmSettlementInterestCommand {
130 timestamp_ms, ..
131 },
132 )
133 | crate::rsm::apply::EngineCommand::ApplyPmSettlementRepayment(
134 hypercall_engine::command::ApplyPmSettlementRepaymentCommand {
135 timestamp_ms, ..
136 },
137 )
138 | crate::rsm::apply::EngineCommand::JournalPmRecoveryPlan(
139 hypercall_engine::command::JournalPmRecoveryPlanCommand { timestamp_ms, .. },
140 )
141 | crate::rsm::apply::EngineCommand::MarkPmRecoveryActionSubmitted(
142 hypercall_engine::command::MarkPmRecoveryActionSubmittedCommand {
143 timestamp_ms,
144 ..
145 },
146 )
147 | crate::rsm::apply::EngineCommand::ResolvePmRecoveryAction(
148 hypercall_engine::command::ResolvePmRecoveryActionCommand { timestamp_ms, .. },
149 ) => *timestamp_ms,
150 _ => 0,
151 };
152 Ok(timestamp)
153 }
154}
155
156impl super::replay_loop::ReplayHandler for StandbyEngineHandler {
157 async fn handle_command(
158 &mut self,
159 seq: u64,
160 command_type: CommandType,
161 command_version: u8,
162 command_data: Vec<u8>,
163 ) -> anyhow::Result<()> {
164 let cmd =
165 super::deserialize::deserialize_command(command_type, command_version, &command_data)?;
166
167 if let crate::rsm::apply::EngineCommand::RfqExecute(rfq_cmd) = cmd {
168 if rfq_cmd.timestamp_ms == 0 || rfq_cmd.fill_id.is_empty() {
169 anyhow::bail!(
170 "RfqExecute command {} missing timestamp_ms or fill_id during standby replay",
171 rfq_cmd.request_id
172 );
173 }
174 let plan = self.engine.build_rfq_execution_plan_unchecked(&rfq_cmd);
175 self.engine
176 .apply_replayed_events_sync(&plan.events, &rfq_cmd.request_id)
177 .await;
178 debug!(seq, "Replayed RFQ command without live revalidation");
179 return Ok(());
180 }
181
182 let timestamp = Self::standby_replay_timestamp(&cmd)?;
183
184 let is_order = matches!(command_type, CommandType::Order);
185 let req_id = cmd
186 .request_id()
187 .unwrap_or_else(|| format!("nats-replay-{}", seq));
188 let env = CommandEnvelope::new(timestamp, cmd);
189
190 match self.engine.apply(env) {
191 Ok(output) => {
192 if let Some(Err(error)) = &output.rfq_plan {
193 anyhow::bail!("RFQ replay rejected for seq={}: {:?}", seq, error);
194 }
195 Self::apply_standby_market_effects(&self.engine, &output).await?;
196 Self::apply_standby_expiry_effects(&mut self.engine, &output)?;
197 self.engine
198 .apply_pm_settlement_projection_effects_sync(
199 &output.pm_settlement_effects,
200 &req_id,
201 )
202 .map_err(|error| {
203 anyhow::anyhow!(
204 "PM settlement projection replay failed for seq={}: {}",
205 seq,
206 error
207 )
208 })?;
209 self.engine
210 .apply_replayed_events_sync(&output.events, &req_id)
211 .await;
212 debug!(seq, "Replayed command");
213 }
214 Err(e) => {
215 if is_order {
216 debug!(seq, error = %e, "Order rejected during replay (expected)");
217 } else {
218 return Err(anyhow::anyhow!(
219 "Command {:?} failed during replay for seq={}: {}",
220 command_type,
221 seq,
222 e
223 ));
224 }
225 }
226 }
227
228 Ok(())
229 }
230}
231
#[cfg(test)]
mod tests {
    use super::*;
    use alloy::primitives::Address;
    use hypercall_types::WalletAddress;
    use rust_decimal_macros::dec;

    /// A `CashWithdrawalUpdate` command must replay with its own `timestamp_ms`.
    #[test]
    fn standby_replay_timestamp_uses_cash_withdrawal_timestamp() {
        let expected_ms = 1_700_000_000_000;
        let wallet_of = |byte: u8| WalletAddress::from(Address::repeat_byte(byte));

        let cmd = crate::rsm::apply::EngineCommand::CashWithdrawalUpdate {
            request_id: uuid::Uuid::now_v7().to_string(),
            wallet: wallet_of(1),
            account: wallet_of(1),
            destination: wallet_of(2),
            signer: wallet_of(1),
            rsm_signer: wallet_of(3),
            amount: dec!(1),
            amount_wei: 1_000_000,
            // Deliberately different from the timestamp so the two fields
            // cannot be confused by the code under test.
            nonce: Some(expected_ms + 1),
            timestamp_ms: expected_ms,
        };

        let resolved = StandbyEngineHandler::standby_replay_timestamp(&cmd)
            .expect("cash withdrawal timestamp should be available");
        assert_eq!(resolved, expected_ms);
    }

    /// A `SetPmSettlementPoolConfig` command must replay with the
    /// `timestamp_ms` of the command struct it wraps.
    #[test]
    fn standby_replay_timestamp_uses_pm_settlement_command_timestamp() {
        let expected_ms = 1_700_000_000_001;

        let pool_config = hypercall_margin::portfolio::PmSettlementPoolConfig {
            target_short_oi_notional_multiplier: dec!(0.10),
            utilization_kink: dec!(0.60),
            apr_at_kink: dec!(0.04),
            max_apr: dec!(4.00),
            normal_utilization_cap: dec!(0.80),
            crisis_utilization_cap: dec!(0.98),
            bridge_window_ms: 86_400_000,
            policy_version: 1,
        };
        let cmd = crate::rsm::apply::EngineCommand::SetPmSettlementPoolConfig(
            hypercall_engine::command::SetPmSettlementPoolConfigCommand {
                request_id: uuid::Uuid::from_u128(0x018f_0000_0000_7000_8000_0000_0000_0001),
                input_digest: "digest-config-btc".to_string(),
                underlying: "BTC".to_string(),
                config_version: 1,
                config: pool_config,
                timestamp_ms: expected_ms,
            },
        );

        let resolved = StandbyEngineHandler::standby_replay_timestamp(&cmd)
            .expect("PM settlement timestamp should be available");
        assert_eq!(resolved, expected_ms);
    }
}