Skip to main content

hypercall/rsm/
restart_components.rs

//! Concrete restart-owned state components wired to `hypercall-recovery`.
2
3use crate::rsm::apply::EngineCommand;
4use crate::rsm::engine_deps::EngineCtx;
5use crate::rsm::engine_snapshot::EngineStateDigest;
6use crate::rsm::engine_state_snapshot::{build_snapshot, EngineStateSnapshot};
7use hypercall_journal::checkpoint::WalCheckpointMetadata;
8use hypercall_recovery::{
9    MutationSource, ReplayDisposition, RestartComponentDescriptor, RestartComponentName,
10    RestartStateComponent,
11};
12use std::collections::BTreeSet;
13
14pub struct EngineRecoveryCapture<'a> {
15    pub ctx: &'a EngineCtx,
16    pub checkpoint: Option<&'a WalCheckpointMetadata>,
17    pub l2_seq: i64,
18}
19
20impl<'a> EngineRecoveryCapture<'a> {
21    pub fn for_snapshot(ctx: &'a EngineCtx, checkpoint: &'a WalCheckpointMetadata) -> Self {
22        Self {
23            ctx,
24            checkpoint: Some(checkpoint),
25            l2_seq: checkpoint.last_l2_seq,
26        }
27    }
28
29    pub fn for_digest(ctx: &'a EngineCtx, l2_seq: i64) -> Self {
30        Self {
31            ctx,
32            checkpoint: None,
33            l2_seq,
34        }
35    }
36}
37
/// Declares a public unit struct and implements `RestartStateComponent` for it.
///
/// Arguments, in order:
/// - the struct identifier,
/// - the `RestartComponentName` expression used for `NAME`,
/// - `sources: [...]` — expressions returned by `mutation_sources()`,
/// - `snapshot: [...]` — literals reported as `snapshot_fields` in the descriptor,
/// - `digest: [...]` — literals reported as `digest_fields` in the descriptor.
///
/// The generated `capture`, `restore`, and `digest` forward to
/// `PersistentEngineStateComponent`; the generated `replay` always returns
/// `ReplayDisposition::NotOwned`.
macro_rules! restart_component {
    (
        $component:ident,
        $component_name:expr,
        sources: [$($mutation_source:expr),* $(,)?],
        snapshot: [$($snapshot_entry:literal),* $(,)?],
        digest: [$($digest_entry:literal),* $(,)?]
    ) => {
        pub struct $component;

        impl<'a>
            RestartStateComponent<
                EngineRecoveryCapture<'a>,
                EngineCtx,
                EngineCommand,
                EngineStateDigest,
            > for $component
        {
            const NAME: RestartComponentName = $component_name;
            type Snapshot = EngineStateSnapshot;

            // Snapshot capture is delegated to the shared persistent-state component.
            fn capture(input: &EngineRecoveryCapture<'a>) -> Self::Snapshot {
                PersistentEngineStateComponent::capture(input)
            }

            // Restore is delegated to the shared persistent-state component.
            fn restore(engine: &mut EngineCtx, snapshot: &Self::Snapshot) {
                PersistentEngineStateComponent::restore(engine, snapshot);
            }

            // Digest computation is delegated to the shared persistent-state component.
            fn digest(input: &EngineRecoveryCapture<'a>) -> EngineStateDigest {
                PersistentEngineStateComponent::digest(input)
            }

            // Generated components never claim a command during replay.
            fn replay(_command: &EngineCommand, _engine: &mut EngineCtx) -> ReplayDisposition {
                ReplayDisposition::NotOwned
            }

            fn mutation_sources() -> &'static [MutationSource] {
                const SOURCES: &[MutationSource] = &[$($mutation_source),*];
                SOURCES
            }

            fn descriptor() -> RestartComponentDescriptor {
                let name = Self::NAME;
                let mutation_sources = Self::mutation_sources();
                RestartComponentDescriptor {
                    name,
                    mutation_sources,
                    snapshot_fields: &[$($snapshot_entry),*],
                    digest_fields: &[$($digest_entry),*],
                }
            }
        }
    };
}
90
91restart_component!(
92    OrderbookStateComponent,
93    RestartComponentName::OrderbookState,
94    sources: [MutationSource::UserCommand, MutationSource::RfqExecution, MutationSource::Expiry],
95    snapshot: ["next_order_id", "next_trade_id", "orderbooks"],
96    digest: ["orders_digest", "positions_digest"]
97);
98
99restart_component!(
100    CashLedgerComponent,
101    RestartComponentName::CashLedger,
102    sources: [MutationSource::Deposit, MutationSource::LiquidationBonus, MutationSource::UserCommand],
103    snapshot: ["engine_positions", "balance_ledger", "deposit_update_watermarks", "applied_deposit_source_event_hashes"],
104    digest: ["positions_digest", "cash_digest", "deposit_watermarks_digest"]
105);
106
107restart_component!(
108    MarketCatalogComponent,
109    RestartComponentName::MarketCatalog,
110    sources: [MutationSource::MarketAction, MutationSource::TradingMode, MutationSource::Expiry],
111    snapshot: ["expired_instruments", "instrument_trading_modes"],
112    digest: ["markets_digest", "wallet_margin_modes_digest"]
113);
114
115restart_component!(
116    PriceIvInputsComponent,
117    RestartComponentName::PriceIvInputs,
118    sources: [MutationSource::PriceIngestion, MutationSource::IvIngestion],
119    snapshot: ["spot_prices", "iv_surfaces", "iv_source_timestamps"],
120    digest: ["prices_digest", "iv_source_timestamps_digest"]
121);
122
123restart_component!(
124    HypercoreStateComponent,
125    RestartComponentName::HypercoreState,
126    sources: [MutationSource::HypercoreEquity],
127    snapshot: ["hypercore_account_equity", "hypercore_equity_timestamps", "perp_positions"],
128    digest: ["perp_positions_digest", "hypercore_equity_digest"]
129);
130
131restart_component!(
132    MmpLiquidationComponent,
133    RestartComponentName::MmpLiquidation,
134    sources: [MutationSource::LiquidationBonus, MutationSource::UserCommand],
135    snapshot: ["mmp_state", "mmp_enabled", "liquidation_states"],
136    digest: ["mmp_digest", "mmp_enabled_digest", "liquidation_states_digest"]
137);
138
139restart_component!(
140    AuthNonceComponent,
141    RestartComponentName::AuthNonce,
142    sources: [MutationSource::AgentAuth, MutationSource::NonceCheck],
143    snapshot: ["agent_authorizations", "nonce_watermarks", "nonce_sets"],
144    digest: ["agent_auth_digest", "nonce_sets_digest"]
145);
146
147restart_component!(
148    TierRiskLimitsComponent,
149    RestartComponentName::TierRiskLimits,
150    sources: [MutationSource::TierUpdate, MutationSource::MarginMode],
151    snapshot: ["wallet_margin_modes", "wallet_trading_limits", "default_trading_limits", "wallet_tiers"],
152    digest: ["wallet_margin_modes_digest", "wallet_trading_limits_digest", "wallet_tiers_digest"]
153);
154
155restart_component!(
156    PmSettlementStateComponent,
157    RestartComponentName::PmSettlementState,
158    sources: [MutationSource::UserCommand],
159    snapshot: ["pm_settlement_state"],
160    digest: ["pm_settlement_digest"]
161);
162
163pub struct PersistentEngineStateComponent;
164
165impl<'a>
166    RestartStateComponent<EngineRecoveryCapture<'a>, EngineCtx, EngineCommand, EngineStateDigest>
167    for PersistentEngineStateComponent
168{
169    const NAME: RestartComponentName = RestartComponentName::PersistentEngineStateEnvelope;
170    type Snapshot = EngineStateSnapshot;
171
172    fn capture(ctx: &EngineRecoveryCapture<'a>) -> Self::Snapshot {
173        let checkpoint = ctx.checkpoint.unwrap_or_else(|| {
174            panic!("PersistentEngineStateComponent::capture requires a WAL checkpoint boundary")
175        });
176        build_snapshot(ctx.ctx, checkpoint)
177    }
178
179    fn restore(ctx: &mut EngineCtx, snapshot: &Self::Snapshot) {
180        ctx.restore_from_snapshot(snapshot);
181    }
182
183    fn digest(ctx: &EngineRecoveryCapture<'a>) -> EngineStateDigest {
184        EngineStateDigest::from_ctx(ctx.ctx, ctx.l2_seq)
185    }
186
187    fn replay(_command: &EngineCommand, _ctx: &mut EngineCtx) -> ReplayDisposition {
188        ReplayDisposition::NotOwned
189    }
190
191    fn mutation_sources() -> &'static [MutationSource] {
192        &[]
193    }
194}
195
196pub fn restart_component_descriptors() -> Vec<RestartComponentDescriptor> {
197    vec![
198        OrderbookStateComponent::descriptor(),
199        CashLedgerComponent::descriptor(),
200        MarketCatalogComponent::descriptor(),
201        PriceIvInputsComponent::descriptor(),
202        HypercoreStateComponent::descriptor(),
203        MmpLiquidationComponent::descriptor(),
204        AuthNonceComponent::descriptor(),
205        TierRiskLimitsComponent::descriptor(),
206        PmSettlementStateComponent::descriptor(),
207    ]
208}
209
210pub fn registered_mutation_sources() -> Vec<MutationSource> {
211    let mut sources = BTreeSet::new();
212    for descriptor in restart_component_descriptors() {
213        sources.extend(descriptor.mutation_sources.iter().copied());
214    }
215    sources.into_iter().collect()
216}
217
218pub fn replay_disposition_for_command(command: &EngineCommand) -> ReplayDisposition {
219    match command {
220        EngineCommand::PriceUpdate { .. }
221        | EngineCommand::IvUpdate { .. }
222        | EngineCommand::HypercorePositionUpdate { .. }
223        | EngineCommand::TradingModeUpdate { .. }
224        | EngineCommand::DepositUpdate { .. }
225        | EngineCommand::OptionDepositUpdate { .. }
226        | EngineCommand::OptionWithdrawalUpdate { .. }
227        | EngineCommand::CashWithdrawalUpdate { .. }
228        | EngineCommand::LiquidationBonusUpdate { .. }
229        | EngineCommand::LiquidationState(_)
230        | EngineCommand::TierUpdate { .. }
231        | EngineCommand::LegacyTierMarginModeUpdate { .. }
232        | EngineCommand::MmpConfigUpdate { .. }
233        | EngineCommand::ApproveAgent { .. }
234        | EngineCommand::RevokeAgent { .. }
235        | EngineCommand::NonceAdvance { .. }
236        | EngineCommand::HypercoreEquityUpdate { .. }
237        | EngineCommand::SetPmSettlementPoolConfig(_)
238        | EngineCommand::RecordPmVaultDeposit(_)
239        | EngineCommand::RequestPmVaultWithdrawal(_)
240        | EngineCommand::AccruePmSettlementInterest(_)
241        | EngineCommand::ApplyPmSettlementRepayment(_)
242        | EngineCommand::JournalPmRecoveryPlan(_)
243        | EngineCommand::MarkPmRecoveryActionSubmitted(_)
244        | EngineCommand::ResolvePmRecoveryAction(_) => ReplayDisposition::Applied,
245        _ => ReplayDisposition::NotOwned,
246    }
247}
248
/// Test-only helper: the union of `snapshot_fields` across all registered descriptors.
#[cfg(test)]
pub fn registered_snapshot_fields() -> BTreeSet<&'static str> {
    let mut fields = BTreeSet::new();
    for descriptor in restart_component_descriptors() {
        fields.extend(descriptor.snapshot_fields.iter().copied());
    }
    fields
}
256
/// Test-only helper: the union of `digest_fields` across all registered descriptors.
#[cfg(test)]
pub fn registered_digest_fields() -> BTreeSet<&'static str> {
    let mut fields = BTreeSet::new();
    for descriptor in restart_component_descriptors() {
        fields.extend(descriptor.digest_fields.iter().copied());
    }
    fields
}
263}