Skip to main content

hypercall/nats/
deserialize.rs

1use super::{CommandType, COMMAND_WIRE_VERSION_V1, LEGACY_COMMAND_WIRE_VERSION};
2use crate::rsm::apply::{
3    AccruePmSettlementInterestCommand, ApplyPmSettlementRepaymentCommand, ApproveAgentPayload,
4    BalanceCommandPayload, DepositUpdatePayload, EngineCommand, HypercoreEquityUpdatePayload,
5    HypercorePositionUpdatePayload, IvUpdatePayload, JournalPmRecoveryPlanCommand,
6    MarkPmRecoveryActionSubmittedCommand, MarketActionCommand, MmpConfigUpdatePayload,
7    NonceAdvancePayload, PriceUpdatePayload, RecordPmVaultDepositCommand,
8    RequestPmVaultWithdrawalCommand, ResolvePmRecoveryActionCommand, RevokeAgentPayload,
9    SetPmSettlementPoolConfigCommand, TierMarginModeUpdatePayload, TierUpdatePayload,
10    TradingModeUpdatePayload,
11};
12use crate::rsm::margin_mode::MarginMode;
13use alloy::primitives::FixedBytes;
14use hypercall_types::WalletAddress;
15use hypercall_types::{LiquidationStateMessage, OrderActionMessage};
16use tracing::warn;
17
18fn from_wire_bytes<T: serde::de::DeserializeOwned>(data: &[u8]) -> anyhow::Result<T> {
19    if data.is_empty() {
20        return Err(anyhow::anyhow!("Empty command data"));
21    }
22    if data[0] != hypercall_types::WIRE_FORMAT_VERSION {
23        anyhow::bail!("Unsupported inner wire version: {}", data[0]);
24    }
25    rmp_serde::from_slice(&data[1..]).map_err(|e| anyhow::anyhow!("Deserialize failed: {}", e))
26}
27
28fn from_named_wire_bytes<T: serde::de::DeserializeOwned>(data: &[u8]) -> anyhow::Result<T> {
29    if data.len() < 2 {
30        anyhow::bail!("Named command data missing MessagePack payload");
31    }
32    if !is_msgpack_map_marker(data[1]) {
33        anyhow::bail!(
34            "Named command payload must be encoded as a MessagePack map, got marker 0x{:02x}",
35            data[1]
36        );
37    }
38    from_wire_bytes(data)
39}
40
41fn is_msgpack_map_marker(marker: u8) -> bool {
42    matches!(marker, 0x80..=0x8f | 0xde | 0xdf)
43}
44
45type LegacyMmpConfigUpdatePayload = (
46    hypercall_types::WalletAddress,
47    String,
48    bool,
49    i64,
50    i64,
51    Option<f64>,
52    Option<f64>,
53    Option<f64>,
54);
55
56/// Deserialize NATS wire bytes into an EngineCommand.
57pub fn deserialize_command(
58    cmd_type: CommandType,
59    command_version: u8,
60    data: &[u8],
61) -> anyhow::Result<EngineCommand> {
62    match command_version {
63        LEGACY_COMMAND_WIRE_VERSION => deserialize_legacy_command(cmd_type, data),
64        COMMAND_WIRE_VERSION_V1 => deserialize_command_v1(cmd_type, data),
65        other => anyhow::bail!(
66            "Unsupported NATS command wire version {} for {:?}",
67            other,
68            cmd_type
69        ),
70    }
71}
72
73/// Deserialize a persisted engine command during restart recovery.
74///
75/// This intentionally has a narrower compatibility surface than live NATS. The
76/// source-hash cutover changed `DepositUpdate` from `BalanceCommandPayload` to
77/// `DepositUpdatePayload`, but existing WAL rows can still contain the old
78/// named payload. Recovery accepts only that persisted shape and only when a
79/// sequence exists, so live messages stay on the new deterministic source-hash
80/// contract.
81///
82/// Compatibility exception audit:
83/// - Tracking ticket: CALL-1900.
84/// - Environment: AWS staging restart recovery after PR #2555, observed at
85///   command_id=284965 with command_data_len=73.
86/// - Historical shape: legacy wire-version tuple
87///   `(WalletAddress, Decimal amount, Decimal balance_after, u64 timestamp_ms,
88///   Option<u64> sequence)`.
89/// - Hard cutover risk: staging cannot boot far enough to replay the journal
90///   and write a current snapshot while the historical WAL row is undecodable.
91/// - Removal plan: remove the legacy tuple branch after AWS staging boots on
92///   the hotfix image, replays the row, writes a compatible snapshot, and
93///   restarts without `ENGINE_SNAPSHOT_RESTORE_ALLOW_UNSAFE_REPLAY=true`.
94pub(crate) fn deserialize_command_for_replay(
95    cmd_type: CommandType,
96    command_version: u8,
97    data: &[u8],
98) -> anyhow::Result<EngineCommand> {
99    match (cmd_type, command_version) {
100        (CommandType::DepositUpdate, COMMAND_WIRE_VERSION_V1) => {
101            deserialize_deposit_update_for_replay(data)
102        }
103        (CommandType::DepositUpdate, LEGACY_COMMAND_WIRE_VERSION) => {
104            deserialize_legacy_deposit_update_for_replay(data)
105        }
106        _ => deserialize_command(cmd_type, command_version, data),
107    }
108}
109
110fn deserialize_command_v1(cmd_type: CommandType, data: &[u8]) -> anyhow::Result<EngineCommand> {
111    match cmd_type {
112        CommandType::Order => {
113            let msg: OrderActionMessage = from_named_wire_bytes(data)?;
114            Ok(EngineCommand::OrderAction(msg))
115        }
116        CommandType::PriceUpdate => {
117            let p: PriceUpdatePayload = from_named_wire_bytes(data)?;
118            Ok(price_update_command(p))
119        }
120        CommandType::IvUpdate => {
121            let p: IvUpdatePayload = from_named_wire_bytes(data)?;
122            Ok(iv_update_command(p))
123        }
124        CommandType::MarketAction => {
125            let cmd: MarketActionCommand = from_named_wire_bytes(data)?;
126            Ok(EngineCommand::MarketAction(cmd))
127        }
128        CommandType::LiquidationState => {
129            let msg: LiquidationStateMessage = from_named_wire_bytes(data)?;
130            Ok(EngineCommand::LiquidationState(msg))
131        }
132        CommandType::TierUpdate => deserialize_tier_update_v1(data),
133        CommandType::HypercorePositionUpdate => {
134            let payload: HypercorePositionUpdatePayload = from_named_wire_bytes(data)?;
135            Ok(EngineCommand::HypercorePositionUpdate {
136                account: payload.account,
137                coin: payload.coin,
138                size: payload.size,
139                entry_price: payload.entry_price,
140                unrealized_pnl: payload.unrealized_pnl,
141                timestamp_ms: payload.timestamp_ms,
142            })
143        }
144        CommandType::MmpConfigUpdate => {
145            let payload: MmpConfigUpdatePayload = from_named_wire_bytes(data)?;
146            Ok(EngineCommand::MmpConfigUpdate {
147                wallet: payload.wallet,
148                currency: payload.currency,
149                enabled: payload.enabled,
150                interval_ms: payload.interval_ms,
151                frozen_time_ms: payload.frozen_time_ms,
152                qty_limit: payload.qty_limit,
153                delta_limit: payload.delta_limit,
154                vega_limit: payload.vega_limit,
155            })
156        }
157        CommandType::RfqExecute => {
158            let cmd: hypercall_runtime_api::RfqExecuteCommand = from_named_wire_bytes(data)?;
159            Ok(EngineCommand::RfqExecute(cmd))
160        }
161        CommandType::TradingModeUpdate => {
162            let payload: TradingModeUpdatePayload = from_named_wire_bytes(data)?;
163            Ok(EngineCommand::TradingModeUpdate {
164                modes: payload.modes,
165                timestamp_ms: payload.timestamp_ms,
166            })
167        }
168        CommandType::TickExpiry => deserialize_tick_expiry(data),
169        CommandType::ApproveAgent => {
170            let payload: ApproveAgentPayload = from_named_wire_bytes(data)?;
171            Ok(EngineCommand::ApproveAgent {
172                wallet: payload.wallet,
173                agent: payload.agent,
174                expires_at_ms: payload.expires_at_ms,
175                nonce: payload.nonce,
176                timestamp_ms: payload.timestamp_ms,
177            })
178        }
179        CommandType::RevokeAgent => {
180            let payload: RevokeAgentPayload = from_named_wire_bytes(data)?;
181            Ok(EngineCommand::RevokeAgent {
182                wallet: payload.wallet,
183                agent: payload.agent,
184                nonce: payload.nonce,
185                timestamp_ms: payload.timestamp_ms,
186            })
187        }
188        CommandType::NonceAdvance => {
189            let payload: NonceAdvancePayload = from_named_wire_bytes(data)?;
190            Ok(EngineCommand::NonceAdvance {
191                wallet: payload.wallet,
192                nonce: payload.nonce,
193                timestamp_ms: payload.timestamp_ms,
194            })
195        }
196        CommandType::HypercoreEquityUpdate => {
197            let payload: HypercoreEquityUpdatePayload = from_named_wire_bytes(data)?;
198            Ok(EngineCommand::HypercoreEquityUpdate {
199                wallet: payload.wallet,
200                account_value: payload.account_value,
201                timestamp_ms: payload.timestamp_ms,
202            })
203        }
204        CommandType::DepositUpdate => {
205            let payload: DepositUpdatePayload = from_named_wire_bytes(data)?;
206            Ok(deposit_update_command(payload))
207        }
208        CommandType::LiquidationBonusUpdate => {
209            let payload: BalanceCommandPayload = from_named_wire_bytes(data)?;
210            Ok(balance_update_command(payload))
211        }
212        CommandType::OptionDepositUpdate => {
213            let (request_id, wallet, symbol, quantity, timestamp_ms): (
214                String,
215                hypercall_types::WalletAddress,
216                String,
217                rust_decimal::Decimal,
218                u64,
219            ) = from_wire_bytes(data)?;
220            Ok(EngineCommand::OptionDepositUpdate {
221                request_id,
222                wallet,
223                symbol,
224                quantity,
225                timestamp_ms,
226            })
227        }
228        CommandType::OptionWithdrawalUpdate => {
229            let (
230                request_id,
231                wallet,
232                account,
233                signer,
234                rsm_signer,
235                symbol,
236                quantity,
237                nonce,
238                action,
239                timestamp_ms,
240            ): (
241                String,
242                hypercall_types::WalletAddress,
243                hypercall_types::WalletAddress,
244                hypercall_types::WalletAddress,
245                hypercall_types::WalletAddress,
246                String,
247                rust_decimal::Decimal,
248                Option<u64>,
249                Vec<u8>,
250                u64,
251            ) = from_wire_bytes(data)?;
252            Ok(EngineCommand::OptionWithdrawalUpdate {
253                request_id,
254                wallet,
255                account,
256                signer,
257                rsm_signer,
258                symbol,
259                quantity,
260                nonce,
261                action,
262                timestamp_ms,
263            })
264        }
265        CommandType::CashWithdrawalUpdate => {
266            let (
267                request_id,
268                wallet,
269                account,
270                destination,
271                signer,
272                rsm_signer,
273                amount,
274                amount_wei,
275                nonce,
276                timestamp_ms,
277            ): (
278                String,
279                hypercall_types::WalletAddress,
280                hypercall_types::WalletAddress,
281                hypercall_types::WalletAddress,
282                hypercall_types::WalletAddress,
283                hypercall_types::WalletAddress,
284                rust_decimal::Decimal,
285                u64,
286                Option<u64>,
287                u64,
288            ) = from_wire_bytes(data)?;
289            Ok(EngineCommand::CashWithdrawalUpdate {
290                request_id,
291                wallet,
292                account,
293                destination,
294                signer,
295                rsm_signer,
296                amount,
297                amount_wei,
298                nonce,
299                timestamp_ms,
300            })
301        }
302        CommandType::SetPmSettlementPoolConfig => {
303            let command: SetPmSettlementPoolConfigCommand = from_named_wire_bytes(data)?;
304            Ok(EngineCommand::SetPmSettlementPoolConfig(command))
305        }
306        CommandType::RecordPmVaultDeposit => {
307            let command: RecordPmVaultDepositCommand = from_named_wire_bytes(data)?;
308            Ok(EngineCommand::RecordPmVaultDeposit(command))
309        }
310        CommandType::RequestPmVaultWithdrawal => {
311            let command: RequestPmVaultWithdrawalCommand = from_named_wire_bytes(data)?;
312            Ok(EngineCommand::RequestPmVaultWithdrawal(command))
313        }
314        CommandType::AccruePmSettlementInterest => {
315            let command: AccruePmSettlementInterestCommand = from_named_wire_bytes(data)?;
316            Ok(EngineCommand::AccruePmSettlementInterest(command))
317        }
318        CommandType::ApplyPmSettlementRepayment => {
319            let command: ApplyPmSettlementRepaymentCommand = from_named_wire_bytes(data)?;
320            Ok(EngineCommand::ApplyPmSettlementRepayment(command))
321        }
322        CommandType::JournalPmRecoveryPlan => {
323            let command: JournalPmRecoveryPlanCommand = from_named_wire_bytes(data)?;
324            Ok(EngineCommand::JournalPmRecoveryPlan(command))
325        }
326        CommandType::MarkPmRecoveryActionSubmitted => {
327            let command: MarkPmRecoveryActionSubmittedCommand = from_named_wire_bytes(data)?;
328            Ok(EngineCommand::MarkPmRecoveryActionSubmitted(command))
329        }
330        CommandType::ResolvePmRecoveryAction => {
331            let command: ResolvePmRecoveryActionCommand = from_named_wire_bytes(data)?;
332            Ok(EngineCommand::ResolvePmRecoveryAction(command))
333        }
334    }
335}
336
337fn deserialize_deposit_update_for_replay(data: &[u8]) -> anyhow::Result<EngineCommand> {
338    match from_named_wire_bytes::<DepositUpdatePayload>(data) {
339        Ok(payload) => return Ok(deposit_update_command(payload)),
340        Err(current_err) => {
341            let legacy_payload: BalanceCommandPayload =
342                from_named_wire_bytes(data).map_err(|legacy_err| {
343                    anyhow::anyhow!(
344                        "Deserialize replay DepositUpdate failed as current payload ({}) and legacy balance payload ({})",
345                        current_err,
346                        legacy_err
347                    )
348                })?;
349            let sequence = legacy_payload.sequence.ok_or_else(|| {
350                anyhow::anyhow!(
351                    "Legacy replay DepositUpdate balance payload missing sequence; cannot synthesize source_event_hash"
352                )
353            })?;
354
355            Ok(EngineCommand::DepositUpdate {
356                wallet: legacy_payload.wallet,
357                amount: legacy_payload.amount,
358                timestamp_ms: legacy_payload.timestamp_ms,
359                sequence: Some(sequence),
360                source_event_hash: legacy_deposit_source_event_hash(sequence),
361            })
362        }
363    }
364}
365
366fn legacy_deposit_source_event_hash(sequence: u64) -> FixedBytes<32> {
367    let mut bytes = [0_u8; 32];
368    bytes[0..4].copy_from_slice(b"LDPT");
369    bytes[24..32].copy_from_slice(&sequence.to_be_bytes());
370    FixedBytes::from(bytes)
371}
372
373// Replay-only bridge for CALL-1900. Live legacy `DepositUpdate` messages remain
374// rejected in `deserialize_legacy_balance_command`; this path exists only so
375// AWS staging can replay historical WAL rows written before the named
376// `DepositUpdatePayload` cutover. The legacy `balance_after` field is ignored
377// because the journaled command effect is the deposit amount, and the required
378// durable sequence is used to reconstruct the deterministic legacy source hash.
379fn deserialize_legacy_deposit_update_for_replay(data: &[u8]) -> anyhow::Result<EngineCommand> {
380    let (wallet, amount, _balance_after, timestamp_ms, sequence) = from_wire_bytes::<(
381        hypercall_types::WalletAddress,
382        rust_decimal::Decimal,
383        rust_decimal::Decimal,
384        u64,
385        Option<u64>,
386    )>(data)
387    .map_err(|e| anyhow::anyhow!("legacy replay DepositUpdate tuple decode failed: {}", e))?;
388
389    let sequence = sequence.ok_or_else(|| {
390        anyhow::anyhow!(
391            "legacy replay DepositUpdate payload missing durable sequence; cannot synthesize source_event_hash"
392        )
393    })?;
394
395    Ok(EngineCommand::DepositUpdate {
396        wallet,
397        amount,
398        timestamp_ms,
399        sequence: Some(sequence),
400        source_event_hash: legacy_deposit_source_event_hash(sequence),
401    })
402}
403
404fn deserialize_tier_update_v1(data: &[u8]) -> anyhow::Result<EngineCommand> {
405    match from_named_wire_bytes::<TierUpdatePayload>(data) {
406        Ok(payload) => Ok(tier_update_command(payload)),
407        Err(full_update_err) => {
408            let payload: TierMarginModeUpdatePayload =
409                from_named_wire_bytes(data).map_err(|mode_err| {
410                    anyhow::anyhow!(
411                        "Deserialize TierUpdate failed as full named payload ({}) and margin-mode named payload ({})",
412                        full_update_err,
413                        mode_err
414                    )
415                })?;
416            Ok(EngineCommand::LegacyTierMarginModeUpdate {
417                wallet: payload.wallet,
418                margin_mode: payload.margin_mode,
419            })
420        }
421    }
422}
423
424fn deserialize_legacy_command(cmd_type: CommandType, data: &[u8]) -> anyhow::Result<EngineCommand> {
425    match cmd_type {
426        CommandType::Order => {
427            let msg: OrderActionMessage = from_wire_bytes(data)?;
428            Ok(EngineCommand::OrderAction(msg))
429        }
430        CommandType::PriceUpdate => {
431            let payload: PriceUpdatePayload = from_wire_bytes(data)?;
432            Ok(price_update_command(payload))
433        }
434        CommandType::IvUpdate => {
435            let payload: IvUpdatePayload = from_wire_bytes(data)?;
436            Ok(iv_update_command(payload))
437        }
438        CommandType::MarketAction => {
439            let cmd: MarketActionCommand = from_wire_bytes(data)?;
440            Ok(EngineCommand::MarketAction(cmd))
441        }
442        CommandType::LiquidationState => {
443            let msg: LiquidationStateMessage = from_wire_bytes(data)?;
444            Ok(EngineCommand::LiquidationState(msg))
445        }
446        CommandType::TierUpdate => deserialize_legacy_tier_update(data),
447        CommandType::HypercorePositionUpdate => {
448            let (account, coin, size, entry_price, unrealized_pnl, timestamp_ms): (
449                String,
450                String,
451                f64,
452                f64,
453                f64,
454                u64,
455            ) = from_wire_bytes(data)?;
456            Ok(EngineCommand::HypercorePositionUpdate {
457                account,
458                coin,
459                size,
460                entry_price,
461                unrealized_pnl,
462                timestamp_ms,
463            })
464        }
465        CommandType::MmpConfigUpdate => {
466            let (
467                wallet,
468                currency,
469                enabled,
470                interval_ms,
471                frozen_time_ms,
472                qty_limit,
473                delta_limit,
474                vega_limit,
475            ): LegacyMmpConfigUpdatePayload = from_wire_bytes(data)?;
476            Ok(EngineCommand::MmpConfigUpdate {
477                wallet,
478                currency,
479                enabled,
480                interval_ms,
481                frozen_time_ms,
482                qty_limit,
483                delta_limit,
484                vega_limit,
485            })
486        }
487        CommandType::TradingModeUpdate => {
488            let (modes, timestamp_ms): (
489                std::collections::HashMap<String, hypercall_types::TradingModes>,
490                u64,
491            ) = from_wire_bytes(data)?;
492            Ok(EngineCommand::TradingModeUpdate {
493                modes,
494                timestamp_ms,
495            })
496        }
497        CommandType::RfqExecute => {
498            let cmd: hypercall_runtime_api::RfqExecuteCommand = from_wire_bytes(data)?;
499            Ok(EngineCommand::RfqExecute(cmd))
500        }
501        CommandType::TickExpiry => deserialize_tick_expiry(data),
502        CommandType::ApproveAgent => deserialize_legacy_approve_agent(data),
503        CommandType::RevokeAgent => deserialize_legacy_revoke_agent(data),
504        CommandType::NonceAdvance => {
505            let (wallet, nonce, timestamp_ms): (hypercall_types::WalletAddress, u64, u64) =
506                from_wire_bytes(data)?;
507            Ok(EngineCommand::NonceAdvance {
508                wallet,
509                nonce,
510                timestamp_ms,
511            })
512        }
513        CommandType::OptionDepositUpdate => {
514            let (request_id, wallet, symbol, quantity, timestamp_ms): (
515                String,
516                hypercall_types::WalletAddress,
517                String,
518                rust_decimal::Decimal,
519                u64,
520            ) = from_wire_bytes(data)?;
521            Ok(EngineCommand::OptionDepositUpdate {
522                request_id,
523                wallet,
524                symbol,
525                quantity,
526                timestamp_ms,
527            })
528        }
529        CommandType::OptionWithdrawalUpdate => {
530            let (
531                request_id,
532                wallet,
533                account,
534                signer,
535                rsm_signer,
536                symbol,
537                quantity,
538                nonce,
539                action,
540                timestamp_ms,
541            ): (
542                String,
543                hypercall_types::WalletAddress,
544                hypercall_types::WalletAddress,
545                hypercall_types::WalletAddress,
546                hypercall_types::WalletAddress,
547                String,
548                rust_decimal::Decimal,
549                Option<u64>,
550                Vec<u8>,
551                u64,
552            ) = from_wire_bytes(data)?;
553            Ok(EngineCommand::OptionWithdrawalUpdate {
554                request_id,
555                wallet,
556                account,
557                signer,
558                rsm_signer,
559                symbol,
560                quantity,
561                nonce,
562                action,
563                timestamp_ms,
564            })
565        }
566        CommandType::CashWithdrawalUpdate => {
567            let (
568                request_id,
569                wallet,
570                account,
571                destination,
572                signer,
573                rsm_signer,
574                amount,
575                amount_wei,
576                nonce,
577                timestamp_ms,
578            ): (
579                String,
580                hypercall_types::WalletAddress,
581                hypercall_types::WalletAddress,
582                hypercall_types::WalletAddress,
583                hypercall_types::WalletAddress,
584                hypercall_types::WalletAddress,
585                rust_decimal::Decimal,
586                u64,
587                Option<u64>,
588                u64,
589            ) = from_wire_bytes(data)?;
590            Ok(EngineCommand::CashWithdrawalUpdate {
591                request_id,
592                wallet,
593                account,
594                destination,
595                signer,
596                rsm_signer,
597                amount,
598                amount_wei,
599                nonce,
600                timestamp_ms,
601            })
602        }
603        CommandType::HypercoreEquityUpdate => {
604            let (wallet, account_value, timestamp_ms): (
605                hypercall_types::WalletAddress,
606                rust_decimal::Decimal,
607                u64,
608            ) = from_wire_bytes(data)?;
609            Ok(EngineCommand::HypercoreEquityUpdate {
610                wallet,
611                account_value,
612                timestamp_ms,
613            })
614        }
615        typ @ (CommandType::DepositUpdate | CommandType::LiquidationBonusUpdate) => {
616            deserialize_legacy_balance_update(typ, data)
617        }
618        CommandType::SetPmSettlementPoolConfig
619        | CommandType::RecordPmVaultDeposit
620        | CommandType::RequestPmVaultWithdrawal
621        | CommandType::AccruePmSettlementInterest
622        | CommandType::ApplyPmSettlementRepayment
623        | CommandType::JournalPmRecoveryPlan
624        | CommandType::MarkPmRecoveryActionSubmitted
625        | CommandType::ResolvePmRecoveryAction => {
626            anyhow::bail!("PM settlement commands do not support legacy NATS payloads")
627        }
628    }
629}
630
631fn deserialize_tick_expiry(data: &[u8]) -> anyhow::Result<EngineCommand> {
632    let (now_ms, context): (u64, crate::rsm::apply::TickExpiryContext) = from_wire_bytes(data)?;
633    Ok(EngineCommand::TickExpiry { now_ms, context })
634}
635
636fn deserialize_legacy_tier_update(data: &[u8]) -> anyhow::Result<EngineCommand> {
637    match from_wire_bytes::<TierUpdatePayload>(data) {
638        Ok(payload) => Ok(tier_update_command(payload)),
639        Err(full_update_err) => {
640            let (wallet, margin_mode): (WalletAddress, MarginMode) =
641                from_wire_bytes(data).map_err(|mode_err| {
642                    anyhow::anyhow!(
643                        "Deserialize legacy TierUpdate failed as full payload ({}) and margin-mode payload ({})",
644                        full_update_err,
645                        mode_err
646                    )
647                })?;
648            Ok(EngineCommand::LegacyTierMarginModeUpdate {
649                wallet,
650                margin_mode,
651            })
652        }
653    }
654}
655
656fn deserialize_legacy_approve_agent(data: &[u8]) -> anyhow::Result<EngineCommand> {
657    if let Ok((wallet, agent, expires_at_ms, nonce, timestamp_ms)) = from_wire_bytes::<(
658        hypercall_types::WalletAddress,
659        hypercall_types::WalletAddress,
660        Option<u64>,
661        Option<u64>,
662        u64,
663    )>(data)
664    {
665        Ok(EngineCommand::ApproveAgent {
666            wallet,
667            agent,
668            expires_at_ms,
669            nonce,
670            timestamp_ms,
671        })
672    } else if let Ok((wallet, agent, expires_at_ms, nonce)) = from_wire_bytes::<(
673        hypercall_types::WalletAddress,
674        hypercall_types::WalletAddress,
675        Option<u64>,
676        Option<u64>,
677    )>(data)
678    {
679        Ok(EngineCommand::ApproveAgent {
680            wallet,
681            agent,
682            expires_at_ms,
683            nonce,
684            timestamp_ms: 0,
685        })
686    } else {
687        let (wallet, agent, expires_at_ms): (
688            hypercall_types::WalletAddress,
689            hypercall_types::WalletAddress,
690            Option<u64>,
691        ) = from_wire_bytes(data)?;
692        Ok(EngineCommand::ApproveAgent {
693            wallet,
694            agent,
695            expires_at_ms,
696            nonce: None,
697            timestamp_ms: 0,
698        })
699    }
700}
701
702fn deserialize_legacy_revoke_agent(data: &[u8]) -> anyhow::Result<EngineCommand> {
703    if let Ok((wallet, agent, nonce, timestamp_ms)) = from_wire_bytes::<(
704        hypercall_types::WalletAddress,
705        hypercall_types::WalletAddress,
706        Option<u64>,
707        u64,
708    )>(data)
709    {
710        Ok(EngineCommand::RevokeAgent {
711            wallet,
712            agent,
713            nonce,
714            timestamp_ms,
715        })
716    } else if let Ok((wallet, agent, nonce)) = from_wire_bytes::<(
717        hypercall_types::WalletAddress,
718        hypercall_types::WalletAddress,
719        Option<u64>,
720    )>(data)
721    {
722        Ok(EngineCommand::RevokeAgent {
723            wallet,
724            agent,
725            nonce,
726            timestamp_ms: 0,
727        })
728    } else {
729        let (wallet, agent): (
730            hypercall_types::WalletAddress,
731            hypercall_types::WalletAddress,
732        ) = from_wire_bytes(data)?;
733        Ok(EngineCommand::RevokeAgent {
734            wallet,
735            agent,
736            nonce: None,
737            timestamp_ms: 0,
738        })
739    }
740}
741
742fn deserialize_legacy_balance_update(
743    typ: CommandType,
744    data: &[u8],
745) -> anyhow::Result<EngineCommand> {
746    if typ == CommandType::DepositUpdate {
747        anyhow::bail!("legacy DepositUpdate payloads are unsupported");
748    }
749
750    if let Ok(payload) = from_named_wire_bytes::<BalanceCommandPayload>(data) {
751        return Ok(balance_update_command(payload));
752    }
753
754    if let Ok((wallet, amount, balance_after, timestamp_ms, sequence)) = from_wire_bytes::<(
755        hypercall_types::WalletAddress,
756        rust_decimal::Decimal,
757        rust_decimal::Decimal,
758        u64,
759        Option<u64>,
760    )>(data)
761    {
762        return Ok(balance_update_command(BalanceCommandPayload {
763            wallet,
764            amount,
765            balance_after,
766            timestamp_ms,
767            sequence,
768        }));
769    }
770
771    if let Ok((wallet, amount, balance_after, timestamp_ms)) = from_wire_bytes::<(
772        hypercall_types::WalletAddress,
773        rust_decimal::Decimal,
774        rust_decimal::Decimal,
775        u64,
776    )>(data)
777    {
778        return Ok(balance_update_command(BalanceCommandPayload {
779            wallet,
780            amount,
781            balance_after,
782            timestamp_ms,
783            sequence: None,
784        }));
785    }
786
787    warn!(
788        payload_len = data.len(),
789        command_type = ?typ,
790        "Received unsupported legacy balance update payload shape from NATS"
791    );
792    anyhow::bail!("Unsupported legacy balance update payload shape")
793}
794
795fn tier_update_command(payload: TierUpdatePayload) -> EngineCommand {
796    EngineCommand::TierUpdate {
797        wallet: payload.wallet,
798        margin_mode: payload.margin_mode,
799        tier: payload.tier,
800        trading_limits: payload.trading_limits,
801    }
802}
803
804fn price_update_command(payload: PriceUpdatePayload) -> EngineCommand {
805    EngineCommand::PriceUpdate {
806        underlying: payload.underlying,
807        spot_price: payload.spot_price,
808        timestamp_ms: payload.timestamp_ms,
809    }
810}
811
812fn iv_update_command(payload: IvUpdatePayload) -> EngineCommand {
813    let mut surface = crate::vol_oracle::vol_surface_cache::VolatilitySurface::new();
814    for point in &payload.strike_points {
815        surface.insert(point.strike, point.expiry, point.iv);
816    }
817    EngineCommand::IvUpdate {
818        underlying: payload.underlying,
819        surface,
820        journal_data: None,
821        timestamp_ms: payload.timestamp_ms,
822    }
823}
824
825fn deposit_update_command(payload: DepositUpdatePayload) -> EngineCommand {
826    EngineCommand::DepositUpdate {
827        wallet: payload.wallet,
828        amount: payload.amount,
829        timestamp_ms: payload.timestamp_ms,
830        sequence: Some(payload.sequence),
831        source_event_hash: payload.source_event_hash,
832    }
833}
834
835fn balance_update_command(payload: BalanceCommandPayload) -> EngineCommand {
836    EngineCommand::LiquidationBonusUpdate {
837        wallet: payload.wallet,
838        amount: payload.amount,
839        balance_after: payload.balance_after,
840        timestamp_ms: payload.timestamp_ms,
841        sequence: payload.sequence,
842    }
843}
844
845#[cfg(test)]
846mod tests {
847    use super::*;
848    use hypercall_types::api_models::TradingLimits;
849    use hypercall_types::serialize_to_wire_bytes;
850    use std::str::FromStr;
851
852    fn test_wallet() -> WalletAddress {
853        WalletAddress::from_str("0x1234567890123456789012345678901234567890")
854            .expect("valid test wallet")
855    }
856
857    #[test]
858    fn test_deserialize_tier_update_current_payload() {
859        let wallet = test_wallet();
860        let trading_limits = TradingLimits {
861            max_open_orders: 7,
862            max_open_positions: 8,
863            orders_per_minute: 70,
864            cancels_per_minute: 80,
865            api_requests_per_minute: 90,
866        };
867        let data = serialize_to_wire_bytes(&TierUpdatePayload {
868            wallet,
869            margin_mode: MarginMode::Portfolio,
870            tier: "tier1".to_string(),
871            trading_limits,
872        });
873
874        let command = deserialize_command(CommandType::TierUpdate, COMMAND_WIRE_VERSION_V1, &data)
875            .expect("deserialize TierUpdate");
876
877        match command {
878            EngineCommand::TierUpdate {
879                wallet: actual_wallet,
880                margin_mode,
881                tier,
882                trading_limits: actual_limits,
883            } => {
884                assert_eq!(actual_wallet, wallet);
885                assert_eq!(margin_mode, MarginMode::Portfolio);
886                assert_eq!(tier, "tier1");
887                assert_eq!(actual_limits.max_open_orders, 7);
888                assert_eq!(actual_limits.max_open_positions, 8);
889            }
890            other => panic!("expected current TierUpdate payload, got {other:?}"),
891        }
892    }
893
894    #[test]
895    fn test_deserialize_tier_update_legacy_margin_mode_payload() {
896        let wallet = test_wallet();
897        let data = serialize_to_wire_bytes(&(wallet, MarginMode::Standard));
898
899        let command =
900            deserialize_command(CommandType::TierUpdate, LEGACY_COMMAND_WIRE_VERSION, &data)
901                .expect("deserialize legacy TierUpdate");
902
903        match command {
904            EngineCommand::LegacyTierMarginModeUpdate {
905                wallet: actual_wallet,
906                margin_mode,
907            } => {
908                assert_eq!(actual_wallet, wallet);
909                assert_eq!(margin_mode, MarginMode::Standard);
910            }
911            other => panic!("expected legacy TierUpdate payload, got {other:?}"),
912        }
913    }
914
915    #[test]
916    fn test_deserialize_option_deposit_update_payload() {
917        let wallet = test_wallet();
918        let data = serialize_to_wire_bytes(&(
919            "deposit-request-1".to_string(),
920            wallet,
921            "BTC-20260130-100000-C".to_string(),
922            rust_decimal_macros::dec!(1.5),
923            1234_u64,
924        ));
925
926        let command = deserialize_command(
927            CommandType::OptionDepositUpdate,
928            COMMAND_WIRE_VERSION_V1,
929            &data,
930        )
931        .expect("deserialize OptionDepositUpdate");
932
933        match command {
934            EngineCommand::OptionDepositUpdate {
935                request_id,
936                wallet: actual_wallet,
937                symbol,
938                quantity,
939                timestamp_ms,
940            } => {
941                assert_eq!(request_id, "deposit-request-1");
942                assert_eq!(actual_wallet, wallet);
943                assert_eq!(symbol, "BTC-20260130-100000-C");
944                assert_eq!(quantity, rust_decimal_macros::dec!(1.5));
945                assert_eq!(timestamp_ms, 1234);
946            }
947            other => panic!("expected OptionDepositUpdate payload, got {other:?}"),
948        }
949    }
950
951    #[test]
952    fn test_deserialize_option_withdrawal_update_payload() {
953        let wallet = test_wallet();
954        let account = test_wallet();
955        let signer = test_wallet();
956        let rsm_signer = test_wallet();
957        let data = serialize_to_wire_bytes(&(
958            "withdrawal-request-1".to_string(),
959            wallet,
960            account,
961            signer,
962            rsm_signer,
963            "BTC-20260130-100000-C".to_string(),
964            rust_decimal_macros::dec!(0.75),
965            Some(5678_u64),
966            vec![1, 2, 3],
967            5678_u64,
968        ));
969
970        let command = deserialize_command(
971            CommandType::OptionWithdrawalUpdate,
972            COMMAND_WIRE_VERSION_V1,
973            &data,
974        )
975        .expect("deserialize OptionWithdrawalUpdate");
976
977        match command {
978            EngineCommand::OptionWithdrawalUpdate {
979                request_id,
980                wallet: actual_wallet,
981                account: actual_account,
982                signer: actual_signer,
983                rsm_signer: actual_rsm_signer,
984                symbol,
985                quantity,
986                nonce,
987                action,
988                timestamp_ms,
989            } => {
990                assert_eq!(request_id, "withdrawal-request-1");
991                assert_eq!(actual_wallet, wallet);
992                assert_eq!(actual_account, account);
993                assert_eq!(actual_signer, signer);
994                assert_eq!(actual_rsm_signer, rsm_signer);
995                assert_eq!(symbol, "BTC-20260130-100000-C");
996                assert_eq!(quantity, rust_decimal_macros::dec!(0.75));
997                assert_eq!(nonce, Some(5678));
998                assert_eq!(action, vec![1, 2, 3]);
999                assert_eq!(timestamp_ms, 5678);
1000            }
1001            other => panic!("expected OptionWithdrawalUpdate payload, got {other:?}"),
1002        }
1003    }
1004
1005    #[test]
1006    fn test_deserialize_deposit_update_current_payload() {
1007        let wallet = test_wallet();
1008        let source_event_hash = alloy::primitives::FixedBytes::<32>::from([7u8; 32]);
1009        let data = serialize_to_wire_bytes(&DepositUpdatePayload {
1010            wallet,
1011            amount: rust_decimal_macros::dec!(125.50),
1012            timestamp_ms: 6789,
1013            sequence: 42,
1014            source_event_hash,
1015        });
1016
1017        let command =
1018            deserialize_command(CommandType::DepositUpdate, COMMAND_WIRE_VERSION_V1, &data)
1019                .expect("deserialize current DepositUpdate");
1020
1021        match command {
1022            EngineCommand::DepositUpdate {
1023                wallet: actual_wallet,
1024                amount,
1025                timestamp_ms,
1026                sequence,
1027                source_event_hash: actual_source_event_hash,
1028            } => {
1029                assert_eq!(actual_wallet, wallet);
1030                assert_eq!(amount, rust_decimal_macros::dec!(125.50));
1031                assert_eq!(timestamp_ms, 6789);
1032                assert_eq!(sequence, Some(42));
1033                assert_eq!(actual_source_event_hash, source_event_hash);
1034            }
1035            other => panic!("expected current DepositUpdate payload, got {other:?}"),
1036        }
1037    }
1038
1039    #[test]
1040    fn test_deserialize_deposit_update_rejects_legacy_balance_payload() {
1041        let wallet = test_wallet();
1042        let data = serialize_to_wire_bytes(&BalanceCommandPayload {
1043            wallet,
1044            amount: rust_decimal_macros::dec!(125.50),
1045            balance_after: rust_decimal_macros::dec!(125.50),
1046            timestamp_ms: 6789,
1047            sequence: Some(42),
1048        });
1049
1050        let error = deserialize_command(CommandType::DepositUpdate, COMMAND_WIRE_VERSION_V1, &data)
1051            .expect_err("live DepositUpdate must reject legacy balance payload");
1052
1053        assert!(
1054            error.to_string().contains("source_event_hash"),
1055            "unexpected error: {error}"
1056        );
1057
1058        let replay_command = deserialize_command_for_replay(
1059            CommandType::DepositUpdate,
1060            COMMAND_WIRE_VERSION_V1,
1061            &data,
1062        )
1063        .expect("replay accepts legacy persisted DepositUpdate payload with sequence");
1064
1065        match replay_command {
1066            EngineCommand::DepositUpdate {
1067                wallet: actual_wallet,
1068                amount,
1069                timestamp_ms,
1070                sequence,
1071                source_event_hash,
1072            } => {
1073                assert_eq!(actual_wallet, wallet);
1074                assert_eq!(amount, rust_decimal_macros::dec!(125.50));
1075                assert_eq!(timestamp_ms, 6789);
1076                assert_eq!(sequence, Some(42));
1077                assert_eq!(source_event_hash, legacy_deposit_source_event_hash(42));
1078            }
1079            other => panic!("expected replay DepositUpdate payload, got {other:?}"),
1080        }
1081    }
1082
1083    #[test]
1084    fn test_replay_deserializes_legacy_deposit_update_tuple_with_sequence() {
1085        let wallet = test_wallet();
1086        let data = serialize_to_wire_bytes(&(
1087            wallet,
1088            rust_decimal_macros::dec!(125.50),
1089            rust_decimal_macros::dec!(1000.25),
1090            6789_u64,
1091            Some(42_u64),
1092        ));
1093
1094        deserialize_command(
1095            CommandType::DepositUpdate,
1096            LEGACY_COMMAND_WIRE_VERSION,
1097            &data,
1098        )
1099        .expect_err("live legacy DepositUpdate tuple must stay rejected");
1100
1101        let replay_command = deserialize_command_for_replay(
1102            CommandType::DepositUpdate,
1103            LEGACY_COMMAND_WIRE_VERSION,
1104            &data,
1105        )
1106        .expect("replay accepts sequenced legacy DepositUpdate tuple");
1107
1108        match replay_command {
1109            EngineCommand::DepositUpdate {
1110                wallet: actual_wallet,
1111                amount,
1112                timestamp_ms,
1113                sequence,
1114                source_event_hash,
1115            } => {
1116                assert_eq!(actual_wallet, wallet);
1117                assert_eq!(amount, rust_decimal_macros::dec!(125.50));
1118                assert_eq!(timestamp_ms, 6789);
1119                assert_eq!(sequence, Some(42));
1120                assert_eq!(source_event_hash, legacy_deposit_source_event_hash(42));
1121            }
1122            other => panic!("expected replay DepositUpdate payload, got {other:?}"),
1123        }
1124    }
1125
1126    #[test]
1127    fn test_deserialize_tier_update_legacy_full_payload() {
1128        let wallet = test_wallet();
1129        let trading_limits = TradingLimits {
1130            max_open_orders: 17,
1131            max_open_positions: 18,
1132            orders_per_minute: 170,
1133            cancels_per_minute: 180,
1134            api_requests_per_minute: 190,
1135        };
1136        let data = serialize_to_wire_bytes(&TierUpdatePayload {
1137            wallet,
1138            margin_mode: MarginMode::Portfolio,
1139            tier: "legacy-tier".to_string(),
1140            trading_limits,
1141        });
1142
1143        let command =
1144            deserialize_command(CommandType::TierUpdate, LEGACY_COMMAND_WIRE_VERSION, &data)
1145                .expect("deserialize legacy full TierUpdate");
1146
1147        match command {
1148            EngineCommand::TierUpdate {
1149                wallet: actual_wallet,
1150                margin_mode,
1151                tier,
1152                trading_limits: actual_limits,
1153            } => {
1154                assert_eq!(actual_wallet, wallet);
1155                assert_eq!(margin_mode, MarginMode::Portfolio);
1156                assert_eq!(tier, "legacy-tier");
1157                assert_eq!(actual_limits.max_open_orders, 17);
1158                assert_eq!(actual_limits.max_open_positions, 18);
1159            }
1160            other => panic!("expected legacy full TierUpdate payload, got {other:?}"),
1161        }
1162    }
1163
1164    #[test]
1165    fn test_deserialize_price_update_legacy_struct_payload() {
1166        let mut data = vec![hypercall_types::WIRE_FORMAT_VERSION];
1167        data.extend(
1168            rmp_serde::to_vec(&PriceUpdatePayload {
1169                underlying: "BTC".to_string(),
1170                spot_price: rust_decimal_macros::dec!(95000),
1171                timestamp_ms: 123,
1172            })
1173            .expect("serialize legacy PriceUpdate payload"),
1174        );
1175
1176        let command =
1177            deserialize_command(CommandType::PriceUpdate, LEGACY_COMMAND_WIRE_VERSION, &data)
1178                .expect("deserialize legacy PriceUpdate");
1179
1180        match command {
1181            EngineCommand::PriceUpdate {
1182                underlying,
1183                spot_price,
1184                timestamp_ms,
1185            } => {
1186                assert_eq!(underlying, "BTC");
1187                assert_eq!(spot_price, rust_decimal_macros::dec!(95000));
1188                assert_eq!(timestamp_ms, 123);
1189            }
1190            other => panic!("expected legacy PriceUpdate payload, got {other:?}"),
1191        }
1192    }
1193}