1use crate::rsm::apply::EngineCommand;
4use crate::rsm::engine_deps::EngineCtx;
5use crate::rsm::engine_snapshot::EngineStateDigest;
6use crate::rsm::engine_state_snapshot::{build_snapshot, EngineStateSnapshot};
7use hypercall_journal::checkpoint::WalCheckpointMetadata;
8use hypercall_recovery::{
9 MutationSource, ReplayDisposition, RestartComponentDescriptor, RestartComponentName,
10 RestartStateComponent,
11};
12use std::collections::BTreeSet;
13
14pub struct EngineRecoveryCapture<'a> {
15 pub ctx: &'a EngineCtx,
16 pub checkpoint: Option<&'a WalCheckpointMetadata>,
17 pub l2_seq: i64,
18}
19
20impl<'a> EngineRecoveryCapture<'a> {
21 pub fn for_snapshot(ctx: &'a EngineCtx, checkpoint: &'a WalCheckpointMetadata) -> Self {
22 Self {
23 ctx,
24 checkpoint: Some(checkpoint),
25 l2_seq: checkpoint.last_l2_seq,
26 }
27 }
28
29 pub fn for_digest(ctx: &'a EngineCtx, l2_seq: i64) -> Self {
30 Self {
31 ctx,
32 checkpoint: None,
33 l2_seq,
34 }
35 }
36}
37
/// Declares a public unit struct and implements [`RestartStateComponent`] for it.
///
/// Arguments, in order:
/// * the struct identifier to declare,
/// * the [`RestartComponentName`] expression used for `NAME`,
/// * `sources: [...]` — expressions returned by `mutation_sources()`,
/// * `snapshot: [...]` — string literals reported as `snapshot_fields`,
/// * `digest: [...]` — string literals reported as `digest_fields`.
///
/// The generated `capture`, `restore` and `digest` forward to
/// [`PersistentEngineStateComponent`]; `replay` always answers
/// `ReplayDisposition::NotOwned`.
macro_rules! restart_component {
    (
        $component:ident,
        $component_name:expr,
        sources: [$($mutation_source:expr),* $(,)?],
        snapshot: [$($snapshot_key:literal),* $(,)?],
        digest: [$($digest_key:literal),* $(,)?]
    ) => {
        pub struct $component;

        impl<'a>
            RestartStateComponent<
                EngineRecoveryCapture<'a>,
                EngineCtx,
                EngineCommand,
                EngineStateDigest,
            > for $component
        {
            type Snapshot = EngineStateSnapshot;
            const NAME: RestartComponentName = $component_name;

            // Descriptor metadata: the name plus the lists given at the
            // macro call site.
            fn descriptor() -> RestartComponentDescriptor {
                RestartComponentDescriptor {
                    digest_fields: &[$($digest_key),*],
                    snapshot_fields: &[$($snapshot_key),*],
                    mutation_sources: Self::mutation_sources(),
                    name: Self::NAME,
                }
            }

            fn mutation_sources() -> &'static [MutationSource] {
                &[$($mutation_source),*]
            }

            // The three state operations below forward to the shared
            // `PersistentEngineStateComponent` implementation.
            fn capture(ctx: &EngineRecoveryCapture<'a>) -> EngineStateSnapshot {
                PersistentEngineStateComponent::capture(ctx)
            }

            fn restore(ctx: &mut EngineCtx, snapshot: &EngineStateSnapshot) {
                PersistentEngineStateComponent::restore(ctx, snapshot);
            }

            fn digest(ctx: &EngineRecoveryCapture<'a>) -> EngineStateDigest {
                PersistentEngineStateComponent::digest(ctx)
            }

            // Generated components never claim a command during replay.
            fn replay(_command: &EngineCommand, _ctx: &mut EngineCtx) -> ReplayDisposition {
                ReplayDisposition::NotOwned
            }
        }
    };
}
90
91restart_component!(
92 OrderbookStateComponent,
93 RestartComponentName::OrderbookState,
94 sources: [MutationSource::UserCommand, MutationSource::RfqExecution, MutationSource::Expiry],
95 snapshot: ["next_order_id", "next_trade_id", "orderbooks"],
96 digest: ["orders_digest", "positions_digest"]
97);
98
99restart_component!(
100 CashLedgerComponent,
101 RestartComponentName::CashLedger,
102 sources: [MutationSource::Deposit, MutationSource::LiquidationBonus, MutationSource::UserCommand],
103 snapshot: ["engine_positions", "balance_ledger", "deposit_update_watermarks", "applied_deposit_source_event_hashes"],
104 digest: ["positions_digest", "cash_digest", "deposit_watermarks_digest"]
105);
106
107restart_component!(
108 MarketCatalogComponent,
109 RestartComponentName::MarketCatalog,
110 sources: [MutationSource::MarketAction, MutationSource::TradingMode, MutationSource::Expiry],
111 snapshot: ["expired_instruments", "instrument_trading_modes"],
112 digest: ["markets_digest", "wallet_margin_modes_digest"]
113);
114
115restart_component!(
116 PriceIvInputsComponent,
117 RestartComponentName::PriceIvInputs,
118 sources: [MutationSource::PriceIngestion, MutationSource::IvIngestion],
119 snapshot: ["spot_prices", "iv_surfaces", "iv_source_timestamps"],
120 digest: ["prices_digest", "iv_source_timestamps_digest"]
121);
122
123restart_component!(
124 HypercoreStateComponent,
125 RestartComponentName::HypercoreState,
126 sources: [MutationSource::HypercoreEquity],
127 snapshot: ["hypercore_account_equity", "hypercore_equity_timestamps", "perp_positions"],
128 digest: ["perp_positions_digest", "hypercore_equity_digest"]
129);
130
131restart_component!(
132 MmpLiquidationComponent,
133 RestartComponentName::MmpLiquidation,
134 sources: [MutationSource::LiquidationBonus, MutationSource::UserCommand],
135 snapshot: ["mmp_state", "mmp_enabled", "liquidation_states"],
136 digest: ["mmp_digest", "mmp_enabled_digest", "liquidation_states_digest"]
137);
138
139restart_component!(
140 AuthNonceComponent,
141 RestartComponentName::AuthNonce,
142 sources: [MutationSource::AgentAuth, MutationSource::NonceCheck],
143 snapshot: ["agent_authorizations", "nonce_watermarks", "nonce_sets"],
144 digest: ["agent_auth_digest", "nonce_sets_digest"]
145);
146
147restart_component!(
148 TierRiskLimitsComponent,
149 RestartComponentName::TierRiskLimits,
150 sources: [MutationSource::TierUpdate, MutationSource::MarginMode],
151 snapshot: ["wallet_margin_modes", "wallet_trading_limits", "default_trading_limits", "wallet_tiers"],
152 digest: ["wallet_margin_modes_digest", "wallet_trading_limits_digest", "wallet_tiers_digest"]
153);
154
155restart_component!(
156 PmSettlementStateComponent,
157 RestartComponentName::PmSettlementState,
158 sources: [MutationSource::UserCommand],
159 snapshot: ["pm_settlement_state"],
160 digest: ["pm_settlement_digest"]
161);
162
163pub struct PersistentEngineStateComponent;
164
165impl<'a>
166 RestartStateComponent<EngineRecoveryCapture<'a>, EngineCtx, EngineCommand, EngineStateDigest>
167 for PersistentEngineStateComponent
168{
169 const NAME: RestartComponentName = RestartComponentName::PersistentEngineStateEnvelope;
170 type Snapshot = EngineStateSnapshot;
171
172 fn capture(ctx: &EngineRecoveryCapture<'a>) -> Self::Snapshot {
173 let checkpoint = ctx.checkpoint.unwrap_or_else(|| {
174 panic!("PersistentEngineStateComponent::capture requires a WAL checkpoint boundary")
175 });
176 build_snapshot(ctx.ctx, checkpoint)
177 }
178
179 fn restore(ctx: &mut EngineCtx, snapshot: &Self::Snapshot) {
180 ctx.restore_from_snapshot(snapshot);
181 }
182
183 fn digest(ctx: &EngineRecoveryCapture<'a>) -> EngineStateDigest {
184 EngineStateDigest::from_ctx(ctx.ctx, ctx.l2_seq)
185 }
186
187 fn replay(_command: &EngineCommand, _ctx: &mut EngineCtx) -> ReplayDisposition {
188 ReplayDisposition::NotOwned
189 }
190
191 fn mutation_sources() -> &'static [MutationSource] {
192 &[]
193 }
194}
195
196pub fn restart_component_descriptors() -> Vec<RestartComponentDescriptor> {
197 vec![
198 OrderbookStateComponent::descriptor(),
199 CashLedgerComponent::descriptor(),
200 MarketCatalogComponent::descriptor(),
201 PriceIvInputsComponent::descriptor(),
202 HypercoreStateComponent::descriptor(),
203 MmpLiquidationComponent::descriptor(),
204 AuthNonceComponent::descriptor(),
205 TierRiskLimitsComponent::descriptor(),
206 PmSettlementStateComponent::descriptor(),
207 ]
208}
209
210pub fn registered_mutation_sources() -> Vec<MutationSource> {
211 let mut sources = BTreeSet::new();
212 for descriptor in restart_component_descriptors() {
213 sources.extend(descriptor.mutation_sources.iter().copied());
214 }
215 sources.into_iter().collect()
216}
217
218pub fn replay_disposition_for_command(command: &EngineCommand) -> ReplayDisposition {
219 match command {
220 EngineCommand::PriceUpdate { .. }
221 | EngineCommand::IvUpdate { .. }
222 | EngineCommand::HypercorePositionUpdate { .. }
223 | EngineCommand::TradingModeUpdate { .. }
224 | EngineCommand::DepositUpdate { .. }
225 | EngineCommand::OptionDepositUpdate { .. }
226 | EngineCommand::OptionWithdrawalUpdate { .. }
227 | EngineCommand::CashWithdrawalUpdate { .. }
228 | EngineCommand::LiquidationBonusUpdate { .. }
229 | EngineCommand::LiquidationState(_)
230 | EngineCommand::TierUpdate { .. }
231 | EngineCommand::LegacyTierMarginModeUpdate { .. }
232 | EngineCommand::MmpConfigUpdate { .. }
233 | EngineCommand::ApproveAgent { .. }
234 | EngineCommand::RevokeAgent { .. }
235 | EngineCommand::NonceAdvance { .. }
236 | EngineCommand::HypercoreEquityUpdate { .. }
237 | EngineCommand::SetPmSettlementPoolConfig(_)
238 | EngineCommand::RecordPmVaultDeposit(_)
239 | EngineCommand::RequestPmVaultWithdrawal(_)
240 | EngineCommand::AccruePmSettlementInterest(_)
241 | EngineCommand::ApplyPmSettlementRepayment(_)
242 | EngineCommand::JournalPmRecoveryPlan(_)
243 | EngineCommand::MarkPmRecoveryActionSubmitted(_)
244 | EngineCommand::ResolvePmRecoveryAction(_) => ReplayDisposition::Applied,
245 _ => ReplayDisposition::NotOwned,
246 }
247}
248
/// Test-only: the union of `snapshot_fields` across all registered descriptors.
#[cfg(test)]
pub fn registered_snapshot_fields() -> BTreeSet<&'static str> {
    let mut fields = BTreeSet::new();
    for descriptor in restart_component_descriptors() {
        fields.extend(descriptor.snapshot_fields.iter().copied());
    }
    fields
}
256
/// Test-only: the union of `digest_fields` across all registered descriptors.
#[cfg(test)]
pub fn registered_digest_fields() -> BTreeSet<&'static str> {
    let mut fields = BTreeSet::new();
    for descriptor in restart_component_descriptors() {
        fields.extend(descriptor.digest_fields.iter().copied());
    }
    fields
}