Skip to main content

hypercall/nats/
standby_handler.rs

1use tracing::{debug, warn};
2
3use super::CommandType;
4use crate::rsm::apply::CommandEnvelope;
5use crate::rsm::unified_engine::UnifiedEngine;
6use crate::shared::traits::MarkPriceOracle;
7
8/// Replay handler that applies commands to a standby engine instance.
9/// Deserializes NATS messages via shared deserialize module and calls apply().
10pub struct StandbyEngineHandler {
11    engine: UnifiedEngine,
12}
13
14impl StandbyEngineHandler {
15    pub fn new(engine: UnifiedEngine) -> Self {
16        Self { engine }
17    }
18
19    pub fn into_engine(self) -> UnifiedEngine {
20        self.engine
21    }
22
23    pub fn engine(&self) -> &UnifiedEngine {
24        &self.engine
25    }
26
27    pub fn engine_mut(&mut self) -> &mut UnifiedEngine {
28        &mut self.engine
29    }
30
31    async fn apply_standby_market_effects(
32        engine: &UnifiedEngine,
33        output: &crate::rsm::apply::ApplyOutput,
34    ) -> anyhow::Result<()> {
35        for effect in &output.market_effects {
36            match effect {
37                crate::rsm::apply::MarketEffect::RegisterSettlement {
38                    underlying,
39                    symbol,
40                    expiry_ts,
41                    twap_window_seconds,
42                } => {
43                    let Some(oracle) = engine.ctx.deps.mark_price_oracles.get(underlying) else {
44                        warn!(
45                            underlying,
46                            symbol,
47                            expiry_ts,
48                            twap_window_seconds,
49                            "Skipping standby TWAP settlement registration because mark price oracle is missing"
50                        );
51                        continue;
52                    };
53                    oracle
54                        .register_settlement(*expiry_ts, *twap_window_seconds)
55                        .await;
56                    debug!(
57                        underlying,
58                        symbol,
59                        expiry_ts,
60                        "Registered standby TWAP settlement window during replay"
61                    );
62                }
63                crate::rsm::apply::MarketEffect::SaveMarketAndInstrument { .. }
64                | crate::rsm::apply::MarketEffect::DeleteMarketAndInstrument { .. } => {
65                    debug!("Skipping primary-only market persistence effect during standby replay");
66                }
67            }
68        }
69        Ok(())
70    }
71
72    fn apply_standby_expiry_effects(
73        engine: &mut UnifiedEngine,
74        output: &crate::rsm::apply::ApplyOutput,
75    ) -> anyhow::Result<()> {
76        for effect in &output.expiry_effects {
77            engine
78                .apply_standby_expiry_effect(effect)
79                .map_err(|error| anyhow::anyhow!(error.to_string()))?;
80        }
81        Ok(())
82    }
83
84    fn standby_replay_timestamp(cmd: &crate::rsm::apply::EngineCommand) -> anyhow::Result<u64> {
85        let timestamp = match cmd {
86            crate::rsm::apply::EngineCommand::OrderAction(msg) => msg.timestamp,
87            crate::rsm::apply::EngineCommand::PriceUpdate { timestamp_ms, .. } => *timestamp_ms,
88            crate::rsm::apply::EngineCommand::IvUpdate { timestamp_ms, .. } => *timestamp_ms,
89            crate::rsm::apply::EngineCommand::MarketAction(cmd) => cmd.message.timestamp,
90            crate::rsm::apply::EngineCommand::HypercorePositionUpdate { timestamp_ms, .. } => {
91                *timestamp_ms
92            }
93            crate::rsm::apply::EngineCommand::TradingModeUpdate { timestamp_ms, .. } => {
94                *timestamp_ms
95            }
96            crate::rsm::apply::EngineCommand::TickExpiry { now_ms, .. } => *now_ms,
97            crate::rsm::apply::EngineCommand::DepositUpdate { timestamp_ms, .. }
98            | crate::rsm::apply::EngineCommand::OptionDepositUpdate { timestamp_ms, .. }
99            | crate::rsm::apply::EngineCommand::OptionWithdrawalUpdate { timestamp_ms, .. }
100            | crate::rsm::apply::EngineCommand::CashWithdrawalUpdate { timestamp_ms, .. }
101            | crate::rsm::apply::EngineCommand::LiquidationBonusUpdate { timestamp_ms, .. } => {
102                *timestamp_ms
103            }
104            crate::rsm::apply::EngineCommand::RfqExecute(cmd) => {
105                if cmd.timestamp_ms == 0 || cmd.fill_id.is_empty() {
106                    anyhow::bail!(
107                        "RfqExecute command {} missing timestamp_ms or fill_id during standby replay",
108                        cmd.request_id
109                    );
110                }
111                cmd.timestamp_ms
112            }
113            crate::rsm::apply::EngineCommand::NonceAdvance { timestamp_ms, .. }
114            | crate::rsm::apply::EngineCommand::ApproveAgent { timestamp_ms, .. }
115            | crate::rsm::apply::EngineCommand::RevokeAgent { timestamp_ms, .. }
116            | crate::rsm::apply::EngineCommand::HypercoreEquityUpdate { timestamp_ms, .. }
117            | crate::rsm::apply::EngineCommand::SetPmSettlementPoolConfig(
118                hypercall_engine::command::SetPmSettlementPoolConfigCommand {
119                    timestamp_ms, ..
120                },
121            )
122            | crate::rsm::apply::EngineCommand::RecordPmVaultDeposit(
123                hypercall_engine::command::RecordPmVaultDepositCommand { timestamp_ms, .. },
124            )
125            | crate::rsm::apply::EngineCommand::RequestPmVaultWithdrawal(
126                hypercall_engine::command::RequestPmVaultWithdrawalCommand { timestamp_ms, .. },
127            )
128            | crate::rsm::apply::EngineCommand::AccruePmSettlementInterest(
129                hypercall_engine::command::AccruePmSettlementInterestCommand {
130                    timestamp_ms, ..
131                },
132            )
133            | crate::rsm::apply::EngineCommand::ApplyPmSettlementRepayment(
134                hypercall_engine::command::ApplyPmSettlementRepaymentCommand {
135                    timestamp_ms, ..
136                },
137            )
138            | crate::rsm::apply::EngineCommand::JournalPmRecoveryPlan(
139                hypercall_engine::command::JournalPmRecoveryPlanCommand { timestamp_ms, .. },
140            )
141            | crate::rsm::apply::EngineCommand::MarkPmRecoveryActionSubmitted(
142                hypercall_engine::command::MarkPmRecoveryActionSubmittedCommand {
143                    timestamp_ms,
144                    ..
145                },
146            )
147            | crate::rsm::apply::EngineCommand::ResolvePmRecoveryAction(
148                hypercall_engine::command::ResolvePmRecoveryActionCommand { timestamp_ms, .. },
149            ) => *timestamp_ms,
150            _ => 0,
151        };
152        Ok(timestamp)
153    }
154}
155
156impl super::replay_loop::ReplayHandler for StandbyEngineHandler {
157    async fn handle_command(
158        &mut self,
159        seq: u64,
160        command_type: CommandType,
161        command_version: u8,
162        command_data: Vec<u8>,
163    ) -> anyhow::Result<()> {
164        let cmd =
165            super::deserialize::deserialize_command(command_type, command_version, &command_data)?;
166
167        if let crate::rsm::apply::EngineCommand::RfqExecute(rfq_cmd) = cmd {
168            if rfq_cmd.timestamp_ms == 0 || rfq_cmd.fill_id.is_empty() {
169                anyhow::bail!(
170                    "RfqExecute command {} missing timestamp_ms or fill_id during standby replay",
171                    rfq_cmd.request_id
172                );
173            }
174            let plan = self.engine.build_rfq_execution_plan_unchecked(&rfq_cmd);
175            self.engine
176                .apply_replayed_events_sync(&plan.events, &rfq_cmd.request_id)
177                .await;
178            debug!(seq, "Replayed RFQ command without live revalidation");
179            return Ok(());
180        }
181
182        let timestamp = Self::standby_replay_timestamp(&cmd)?;
183
184        let is_order = matches!(command_type, CommandType::Order);
185        let req_id = cmd
186            .request_id()
187            .unwrap_or_else(|| format!("nats-replay-{}", seq));
188        let env = CommandEnvelope::new(timestamp, cmd);
189
190        match self.engine.apply(env) {
191            Ok(output) => {
192                if let Some(Err(error)) = &output.rfq_plan {
193                    anyhow::bail!("RFQ replay rejected for seq={}: {:?}", seq, error);
194                }
195                Self::apply_standby_market_effects(&self.engine, &output).await?;
196                Self::apply_standby_expiry_effects(&mut self.engine, &output)?;
197                self.engine
198                    .apply_pm_settlement_projection_effects_sync(
199                        &output.pm_settlement_effects,
200                        &req_id,
201                    )
202                    .map_err(|error| {
203                        anyhow::anyhow!(
204                            "PM settlement projection replay failed for seq={}: {}",
205                            seq,
206                            error
207                        )
208                    })?;
209                self.engine
210                    .apply_replayed_events_sync(&output.events, &req_id)
211                    .await;
212                debug!(seq, "Replayed command");
213            }
214            Err(e) => {
215                if is_order {
216                    debug!(seq, error = %e, "Order rejected during replay (expected)");
217                } else {
218                    return Err(anyhow::anyhow!(
219                        "Command {:?} failed during replay for seq={}: {}",
220                        command_type,
221                        seq,
222                        e
223                    ));
224                }
225            }
226        }
227
228        Ok(())
229    }
230}
231
#[cfg(test)]
mod tests {
    use super::*;
    use alloy::primitives::Address;
    use hypercall_types::WalletAddress;
    use rust_decimal_macros::dec;

    /// A cash-withdrawal command must resolve to its own `timestamp_ms`.
    #[test]
    fn standby_replay_timestamp_uses_cash_withdrawal_timestamp() {
        let expected_ts = 1_700_000_000_000;
        // Builds a wallet address whose bytes are all `byte`.
        let wallet_of = |byte: u8| WalletAddress::from(Address::repeat_byte(byte));

        let cmd = crate::rsm::apply::EngineCommand::CashWithdrawalUpdate {
            request_id: uuid::Uuid::now_v7().to_string(),
            wallet: wallet_of(1),
            account: wallet_of(1),
            destination: wallet_of(2),
            signer: wallet_of(1),
            rsm_signer: wallet_of(3),
            amount: dec!(1),
            amount_wei: 1_000_000,
            nonce: Some(expected_ts + 1),
            timestamp_ms: expected_ts,
        };

        let resolved = StandbyEngineHandler::standby_replay_timestamp(&cmd)
            .expect("cash withdrawal timestamp should be available");
        assert_eq!(resolved, expected_ts);
    }

    /// A PM settlement pool config command must resolve to the
    /// `timestamp_ms` stored inside its payload struct.
    #[test]
    fn standby_replay_timestamp_uses_pm_settlement_command_timestamp() {
        let expected_ts = 1_700_000_000_001;

        let pool_config = hypercall_margin::portfolio::PmSettlementPoolConfig {
            target_short_oi_notional_multiplier: dec!(0.10),
            utilization_kink: dec!(0.60),
            apr_at_kink: dec!(0.04),
            max_apr: dec!(4.00),
            normal_utilization_cap: dec!(0.80),
            crisis_utilization_cap: dec!(0.98),
            bridge_window_ms: 86_400_000,
            policy_version: 1,
        };
        let payload = hypercall_engine::command::SetPmSettlementPoolConfigCommand {
            request_id: uuid::Uuid::from_u128(0x018f_0000_0000_7000_8000_0000_0000_0001),
            input_digest: "digest-config-btc".to_string(),
            underlying: "BTC".to_string(),
            config_version: 1,
            config: pool_config,
            timestamp_ms: expected_ts,
        };
        let cmd = crate::rsm::apply::EngineCommand::SetPmSettlementPoolConfig(payload);

        let resolved = StandbyEngineHandler::standby_replay_timestamp(&cmd)
            .expect("PM settlement timestamp should be available");
        assert_eq!(resolved, expected_ts);
    }
}