1use super::{CommandType, COMMAND_WIRE_VERSION_V1, LEGACY_COMMAND_WIRE_VERSION};
2use crate::rsm::apply::{
3 AccruePmSettlementInterestCommand, ApplyPmSettlementRepaymentCommand, ApproveAgentPayload,
4 BalanceCommandPayload, DepositUpdatePayload, EngineCommand, HypercoreEquityUpdatePayload,
5 HypercorePositionUpdatePayload, IvUpdatePayload, JournalPmRecoveryPlanCommand,
6 MarkPmRecoveryActionSubmittedCommand, MarketActionCommand, MmpConfigUpdatePayload,
7 NonceAdvancePayload, PriceUpdatePayload, RecordPmVaultDepositCommand,
8 RequestPmVaultWithdrawalCommand, ResolvePmRecoveryActionCommand, RevokeAgentPayload,
9 SetPmSettlementPoolConfigCommand, TierMarginModeUpdatePayload, TierUpdatePayload,
10 TradingModeUpdatePayload,
11};
12use crate::rsm::margin_mode::MarginMode;
13use alloy::primitives::FixedBytes;
14use hypercall_types::WalletAddress;
15use hypercall_types::{LiquidationStateMessage, OrderActionMessage};
16use tracing::warn;
17
18fn from_wire_bytes<T: serde::de::DeserializeOwned>(data: &[u8]) -> anyhow::Result<T> {
19 if data.is_empty() {
20 return Err(anyhow::anyhow!("Empty command data"));
21 }
22 if data[0] != hypercall_types::WIRE_FORMAT_VERSION {
23 anyhow::bail!("Unsupported inner wire version: {}", data[0]);
24 }
25 rmp_serde::from_slice(&data[1..]).map_err(|e| anyhow::anyhow!("Deserialize failed: {}", e))
26}
27
28fn from_named_wire_bytes<T: serde::de::DeserializeOwned>(data: &[u8]) -> anyhow::Result<T> {
29 if data.len() < 2 {
30 anyhow::bail!("Named command data missing MessagePack payload");
31 }
32 if !is_msgpack_map_marker(data[1]) {
33 anyhow::bail!(
34 "Named command payload must be encoded as a MessagePack map, got marker 0x{:02x}",
35 data[1]
36 );
37 }
38 from_wire_bytes(data)
39}
40
/// Returns `true` when `marker` is one of the MessagePack map type bytes:
/// a fixmap (`0x80..=0x8f`), map16 (`0xde`) or map32 (`0xdf`).
fn is_msgpack_map_marker(marker: u8) -> bool {
    const MAP16: u8 = 0xde;
    const MAP32: u8 = 0xdf;

    // Every fixmap marker shares the high nibble 0x8; the low nibble is the entry count.
    let is_fixmap = marker & 0xf0 == 0x80;
    is_fixmap || marker == MAP16 || marker == MAP32
}
44
45type LegacyMmpConfigUpdatePayload = (
46 hypercall_types::WalletAddress,
47 String,
48 bool,
49 i64,
50 i64,
51 Option<f64>,
52 Option<f64>,
53 Option<f64>,
54);
55
56pub fn deserialize_command(
58 cmd_type: CommandType,
59 command_version: u8,
60 data: &[u8],
61) -> anyhow::Result<EngineCommand> {
62 match command_version {
63 LEGACY_COMMAND_WIRE_VERSION => deserialize_legacy_command(cmd_type, data),
64 COMMAND_WIRE_VERSION_V1 => deserialize_command_v1(cmd_type, data),
65 other => anyhow::bail!(
66 "Unsupported NATS command wire version {} for {:?}",
67 other,
68 cmd_type
69 ),
70 }
71}
72
73pub(crate) fn deserialize_command_for_replay(
95 cmd_type: CommandType,
96 command_version: u8,
97 data: &[u8],
98) -> anyhow::Result<EngineCommand> {
99 match (cmd_type, command_version) {
100 (CommandType::DepositUpdate, COMMAND_WIRE_VERSION_V1) => {
101 deserialize_deposit_update_for_replay(data)
102 }
103 (CommandType::DepositUpdate, LEGACY_COMMAND_WIRE_VERSION) => {
104 deserialize_legacy_deposit_update_for_replay(data)
105 }
106 _ => deserialize_command(cmd_type, command_version, data),
107 }
108}
109
110fn deserialize_command_v1(cmd_type: CommandType, data: &[u8]) -> anyhow::Result<EngineCommand> {
111 match cmd_type {
112 CommandType::Order => {
113 let msg: OrderActionMessage = from_named_wire_bytes(data)?;
114 Ok(EngineCommand::OrderAction(msg))
115 }
116 CommandType::PriceUpdate => {
117 let p: PriceUpdatePayload = from_named_wire_bytes(data)?;
118 Ok(price_update_command(p))
119 }
120 CommandType::IvUpdate => {
121 let p: IvUpdatePayload = from_named_wire_bytes(data)?;
122 Ok(iv_update_command(p))
123 }
124 CommandType::MarketAction => {
125 let cmd: MarketActionCommand = from_named_wire_bytes(data)?;
126 Ok(EngineCommand::MarketAction(cmd))
127 }
128 CommandType::LiquidationState => {
129 let msg: LiquidationStateMessage = from_named_wire_bytes(data)?;
130 Ok(EngineCommand::LiquidationState(msg))
131 }
132 CommandType::TierUpdate => deserialize_tier_update_v1(data),
133 CommandType::HypercorePositionUpdate => {
134 let payload: HypercorePositionUpdatePayload = from_named_wire_bytes(data)?;
135 Ok(EngineCommand::HypercorePositionUpdate {
136 account: payload.account,
137 coin: payload.coin,
138 size: payload.size,
139 entry_price: payload.entry_price,
140 unrealized_pnl: payload.unrealized_pnl,
141 timestamp_ms: payload.timestamp_ms,
142 })
143 }
144 CommandType::MmpConfigUpdate => {
145 let payload: MmpConfigUpdatePayload = from_named_wire_bytes(data)?;
146 Ok(EngineCommand::MmpConfigUpdate {
147 wallet: payload.wallet,
148 currency: payload.currency,
149 enabled: payload.enabled,
150 interval_ms: payload.interval_ms,
151 frozen_time_ms: payload.frozen_time_ms,
152 qty_limit: payload.qty_limit,
153 delta_limit: payload.delta_limit,
154 vega_limit: payload.vega_limit,
155 })
156 }
157 CommandType::RfqExecute => {
158 let cmd: hypercall_runtime_api::RfqExecuteCommand = from_named_wire_bytes(data)?;
159 Ok(EngineCommand::RfqExecute(cmd))
160 }
161 CommandType::TradingModeUpdate => {
162 let payload: TradingModeUpdatePayload = from_named_wire_bytes(data)?;
163 Ok(EngineCommand::TradingModeUpdate {
164 modes: payload.modes,
165 timestamp_ms: payload.timestamp_ms,
166 })
167 }
168 CommandType::TickExpiry => deserialize_tick_expiry(data),
169 CommandType::ApproveAgent => {
170 let payload: ApproveAgentPayload = from_named_wire_bytes(data)?;
171 Ok(EngineCommand::ApproveAgent {
172 wallet: payload.wallet,
173 agent: payload.agent,
174 expires_at_ms: payload.expires_at_ms,
175 nonce: payload.nonce,
176 timestamp_ms: payload.timestamp_ms,
177 })
178 }
179 CommandType::RevokeAgent => {
180 let payload: RevokeAgentPayload = from_named_wire_bytes(data)?;
181 Ok(EngineCommand::RevokeAgent {
182 wallet: payload.wallet,
183 agent: payload.agent,
184 nonce: payload.nonce,
185 timestamp_ms: payload.timestamp_ms,
186 })
187 }
188 CommandType::NonceAdvance => {
189 let payload: NonceAdvancePayload = from_named_wire_bytes(data)?;
190 Ok(EngineCommand::NonceAdvance {
191 wallet: payload.wallet,
192 nonce: payload.nonce,
193 timestamp_ms: payload.timestamp_ms,
194 })
195 }
196 CommandType::HypercoreEquityUpdate => {
197 let payload: HypercoreEquityUpdatePayload = from_named_wire_bytes(data)?;
198 Ok(EngineCommand::HypercoreEquityUpdate {
199 wallet: payload.wallet,
200 account_value: payload.account_value,
201 timestamp_ms: payload.timestamp_ms,
202 })
203 }
204 CommandType::DepositUpdate => {
205 let payload: DepositUpdatePayload = from_named_wire_bytes(data)?;
206 Ok(deposit_update_command(payload))
207 }
208 CommandType::LiquidationBonusUpdate => {
209 let payload: BalanceCommandPayload = from_named_wire_bytes(data)?;
210 Ok(balance_update_command(payload))
211 }
212 CommandType::OptionDepositUpdate => {
213 let (request_id, wallet, symbol, quantity, timestamp_ms): (
214 String,
215 hypercall_types::WalletAddress,
216 String,
217 rust_decimal::Decimal,
218 u64,
219 ) = from_wire_bytes(data)?;
220 Ok(EngineCommand::OptionDepositUpdate {
221 request_id,
222 wallet,
223 symbol,
224 quantity,
225 timestamp_ms,
226 })
227 }
228 CommandType::OptionWithdrawalUpdate => {
229 let (
230 request_id,
231 wallet,
232 account,
233 signer,
234 rsm_signer,
235 symbol,
236 quantity,
237 nonce,
238 action,
239 timestamp_ms,
240 ): (
241 String,
242 hypercall_types::WalletAddress,
243 hypercall_types::WalletAddress,
244 hypercall_types::WalletAddress,
245 hypercall_types::WalletAddress,
246 String,
247 rust_decimal::Decimal,
248 Option<u64>,
249 Vec<u8>,
250 u64,
251 ) = from_wire_bytes(data)?;
252 Ok(EngineCommand::OptionWithdrawalUpdate {
253 request_id,
254 wallet,
255 account,
256 signer,
257 rsm_signer,
258 symbol,
259 quantity,
260 nonce,
261 action,
262 timestamp_ms,
263 })
264 }
265 CommandType::CashWithdrawalUpdate => {
266 let (
267 request_id,
268 wallet,
269 account,
270 destination,
271 signer,
272 rsm_signer,
273 amount,
274 amount_wei,
275 nonce,
276 timestamp_ms,
277 ): (
278 String,
279 hypercall_types::WalletAddress,
280 hypercall_types::WalletAddress,
281 hypercall_types::WalletAddress,
282 hypercall_types::WalletAddress,
283 hypercall_types::WalletAddress,
284 rust_decimal::Decimal,
285 u64,
286 Option<u64>,
287 u64,
288 ) = from_wire_bytes(data)?;
289 Ok(EngineCommand::CashWithdrawalUpdate {
290 request_id,
291 wallet,
292 account,
293 destination,
294 signer,
295 rsm_signer,
296 amount,
297 amount_wei,
298 nonce,
299 timestamp_ms,
300 })
301 }
302 CommandType::SetPmSettlementPoolConfig => {
303 let command: SetPmSettlementPoolConfigCommand = from_named_wire_bytes(data)?;
304 Ok(EngineCommand::SetPmSettlementPoolConfig(command))
305 }
306 CommandType::RecordPmVaultDeposit => {
307 let command: RecordPmVaultDepositCommand = from_named_wire_bytes(data)?;
308 Ok(EngineCommand::RecordPmVaultDeposit(command))
309 }
310 CommandType::RequestPmVaultWithdrawal => {
311 let command: RequestPmVaultWithdrawalCommand = from_named_wire_bytes(data)?;
312 Ok(EngineCommand::RequestPmVaultWithdrawal(command))
313 }
314 CommandType::AccruePmSettlementInterest => {
315 let command: AccruePmSettlementInterestCommand = from_named_wire_bytes(data)?;
316 Ok(EngineCommand::AccruePmSettlementInterest(command))
317 }
318 CommandType::ApplyPmSettlementRepayment => {
319 let command: ApplyPmSettlementRepaymentCommand = from_named_wire_bytes(data)?;
320 Ok(EngineCommand::ApplyPmSettlementRepayment(command))
321 }
322 CommandType::JournalPmRecoveryPlan => {
323 let command: JournalPmRecoveryPlanCommand = from_named_wire_bytes(data)?;
324 Ok(EngineCommand::JournalPmRecoveryPlan(command))
325 }
326 CommandType::MarkPmRecoveryActionSubmitted => {
327 let command: MarkPmRecoveryActionSubmittedCommand = from_named_wire_bytes(data)?;
328 Ok(EngineCommand::MarkPmRecoveryActionSubmitted(command))
329 }
330 CommandType::ResolvePmRecoveryAction => {
331 let command: ResolvePmRecoveryActionCommand = from_named_wire_bytes(data)?;
332 Ok(EngineCommand::ResolvePmRecoveryAction(command))
333 }
334 }
335}
336
337fn deserialize_deposit_update_for_replay(data: &[u8]) -> anyhow::Result<EngineCommand> {
338 match from_named_wire_bytes::<DepositUpdatePayload>(data) {
339 Ok(payload) => return Ok(deposit_update_command(payload)),
340 Err(current_err) => {
341 let legacy_payload: BalanceCommandPayload =
342 from_named_wire_bytes(data).map_err(|legacy_err| {
343 anyhow::anyhow!(
344 "Deserialize replay DepositUpdate failed as current payload ({}) and legacy balance payload ({})",
345 current_err,
346 legacy_err
347 )
348 })?;
349 let sequence = legacy_payload.sequence.ok_or_else(|| {
350 anyhow::anyhow!(
351 "Legacy replay DepositUpdate balance payload missing sequence; cannot synthesize source_event_hash"
352 )
353 })?;
354
355 Ok(EngineCommand::DepositUpdate {
356 wallet: legacy_payload.wallet,
357 amount: legacy_payload.amount,
358 timestamp_ms: legacy_payload.timestamp_ms,
359 sequence: Some(sequence),
360 source_event_hash: legacy_deposit_source_event_hash(sequence),
361 })
362 }
363 }
364}
365
366fn legacy_deposit_source_event_hash(sequence: u64) -> FixedBytes<32> {
367 let mut bytes = [0_u8; 32];
368 bytes[0..4].copy_from_slice(b"LDPT");
369 bytes[24..32].copy_from_slice(&sequence.to_be_bytes());
370 FixedBytes::from(bytes)
371}
372
373fn deserialize_legacy_deposit_update_for_replay(data: &[u8]) -> anyhow::Result<EngineCommand> {
380 let (wallet, amount, _balance_after, timestamp_ms, sequence) = from_wire_bytes::<(
381 hypercall_types::WalletAddress,
382 rust_decimal::Decimal,
383 rust_decimal::Decimal,
384 u64,
385 Option<u64>,
386 )>(data)
387 .map_err(|e| anyhow::anyhow!("legacy replay DepositUpdate tuple decode failed: {}", e))?;
388
389 let sequence = sequence.ok_or_else(|| {
390 anyhow::anyhow!(
391 "legacy replay DepositUpdate payload missing durable sequence; cannot synthesize source_event_hash"
392 )
393 })?;
394
395 Ok(EngineCommand::DepositUpdate {
396 wallet,
397 amount,
398 timestamp_ms,
399 sequence: Some(sequence),
400 source_event_hash: legacy_deposit_source_event_hash(sequence),
401 })
402}
403
404fn deserialize_tier_update_v1(data: &[u8]) -> anyhow::Result<EngineCommand> {
405 match from_named_wire_bytes::<TierUpdatePayload>(data) {
406 Ok(payload) => Ok(tier_update_command(payload)),
407 Err(full_update_err) => {
408 let payload: TierMarginModeUpdatePayload =
409 from_named_wire_bytes(data).map_err(|mode_err| {
410 anyhow::anyhow!(
411 "Deserialize TierUpdate failed as full named payload ({}) and margin-mode named payload ({})",
412 full_update_err,
413 mode_err
414 )
415 })?;
416 Ok(EngineCommand::LegacyTierMarginModeUpdate {
417 wallet: payload.wallet,
418 margin_mode: payload.margin_mode,
419 })
420 }
421 }
422}
423
424fn deserialize_legacy_command(cmd_type: CommandType, data: &[u8]) -> anyhow::Result<EngineCommand> {
425 match cmd_type {
426 CommandType::Order => {
427 let msg: OrderActionMessage = from_wire_bytes(data)?;
428 Ok(EngineCommand::OrderAction(msg))
429 }
430 CommandType::PriceUpdate => {
431 let payload: PriceUpdatePayload = from_wire_bytes(data)?;
432 Ok(price_update_command(payload))
433 }
434 CommandType::IvUpdate => {
435 let payload: IvUpdatePayload = from_wire_bytes(data)?;
436 Ok(iv_update_command(payload))
437 }
438 CommandType::MarketAction => {
439 let cmd: MarketActionCommand = from_wire_bytes(data)?;
440 Ok(EngineCommand::MarketAction(cmd))
441 }
442 CommandType::LiquidationState => {
443 let msg: LiquidationStateMessage = from_wire_bytes(data)?;
444 Ok(EngineCommand::LiquidationState(msg))
445 }
446 CommandType::TierUpdate => deserialize_legacy_tier_update(data),
447 CommandType::HypercorePositionUpdate => {
448 let (account, coin, size, entry_price, unrealized_pnl, timestamp_ms): (
449 String,
450 String,
451 f64,
452 f64,
453 f64,
454 u64,
455 ) = from_wire_bytes(data)?;
456 Ok(EngineCommand::HypercorePositionUpdate {
457 account,
458 coin,
459 size,
460 entry_price,
461 unrealized_pnl,
462 timestamp_ms,
463 })
464 }
465 CommandType::MmpConfigUpdate => {
466 let (
467 wallet,
468 currency,
469 enabled,
470 interval_ms,
471 frozen_time_ms,
472 qty_limit,
473 delta_limit,
474 vega_limit,
475 ): LegacyMmpConfigUpdatePayload = from_wire_bytes(data)?;
476 Ok(EngineCommand::MmpConfigUpdate {
477 wallet,
478 currency,
479 enabled,
480 interval_ms,
481 frozen_time_ms,
482 qty_limit,
483 delta_limit,
484 vega_limit,
485 })
486 }
487 CommandType::TradingModeUpdate => {
488 let (modes, timestamp_ms): (
489 std::collections::HashMap<String, hypercall_types::TradingModes>,
490 u64,
491 ) = from_wire_bytes(data)?;
492 Ok(EngineCommand::TradingModeUpdate {
493 modes,
494 timestamp_ms,
495 })
496 }
497 CommandType::RfqExecute => {
498 let cmd: hypercall_runtime_api::RfqExecuteCommand = from_wire_bytes(data)?;
499 Ok(EngineCommand::RfqExecute(cmd))
500 }
501 CommandType::TickExpiry => deserialize_tick_expiry(data),
502 CommandType::ApproveAgent => deserialize_legacy_approve_agent(data),
503 CommandType::RevokeAgent => deserialize_legacy_revoke_agent(data),
504 CommandType::NonceAdvance => {
505 let (wallet, nonce, timestamp_ms): (hypercall_types::WalletAddress, u64, u64) =
506 from_wire_bytes(data)?;
507 Ok(EngineCommand::NonceAdvance {
508 wallet,
509 nonce,
510 timestamp_ms,
511 })
512 }
513 CommandType::OptionDepositUpdate => {
514 let (request_id, wallet, symbol, quantity, timestamp_ms): (
515 String,
516 hypercall_types::WalletAddress,
517 String,
518 rust_decimal::Decimal,
519 u64,
520 ) = from_wire_bytes(data)?;
521 Ok(EngineCommand::OptionDepositUpdate {
522 request_id,
523 wallet,
524 symbol,
525 quantity,
526 timestamp_ms,
527 })
528 }
529 CommandType::OptionWithdrawalUpdate => {
530 let (
531 request_id,
532 wallet,
533 account,
534 signer,
535 rsm_signer,
536 symbol,
537 quantity,
538 nonce,
539 action,
540 timestamp_ms,
541 ): (
542 String,
543 hypercall_types::WalletAddress,
544 hypercall_types::WalletAddress,
545 hypercall_types::WalletAddress,
546 hypercall_types::WalletAddress,
547 String,
548 rust_decimal::Decimal,
549 Option<u64>,
550 Vec<u8>,
551 u64,
552 ) = from_wire_bytes(data)?;
553 Ok(EngineCommand::OptionWithdrawalUpdate {
554 request_id,
555 wallet,
556 account,
557 signer,
558 rsm_signer,
559 symbol,
560 quantity,
561 nonce,
562 action,
563 timestamp_ms,
564 })
565 }
566 CommandType::CashWithdrawalUpdate => {
567 let (
568 request_id,
569 wallet,
570 account,
571 destination,
572 signer,
573 rsm_signer,
574 amount,
575 amount_wei,
576 nonce,
577 timestamp_ms,
578 ): (
579 String,
580 hypercall_types::WalletAddress,
581 hypercall_types::WalletAddress,
582 hypercall_types::WalletAddress,
583 hypercall_types::WalletAddress,
584 hypercall_types::WalletAddress,
585 rust_decimal::Decimal,
586 u64,
587 Option<u64>,
588 u64,
589 ) = from_wire_bytes(data)?;
590 Ok(EngineCommand::CashWithdrawalUpdate {
591 request_id,
592 wallet,
593 account,
594 destination,
595 signer,
596 rsm_signer,
597 amount,
598 amount_wei,
599 nonce,
600 timestamp_ms,
601 })
602 }
603 CommandType::HypercoreEquityUpdate => {
604 let (wallet, account_value, timestamp_ms): (
605 hypercall_types::WalletAddress,
606 rust_decimal::Decimal,
607 u64,
608 ) = from_wire_bytes(data)?;
609 Ok(EngineCommand::HypercoreEquityUpdate {
610 wallet,
611 account_value,
612 timestamp_ms,
613 })
614 }
615 typ @ (CommandType::DepositUpdate | CommandType::LiquidationBonusUpdate) => {
616 deserialize_legacy_balance_update(typ, data)
617 }
618 CommandType::SetPmSettlementPoolConfig
619 | CommandType::RecordPmVaultDeposit
620 | CommandType::RequestPmVaultWithdrawal
621 | CommandType::AccruePmSettlementInterest
622 | CommandType::ApplyPmSettlementRepayment
623 | CommandType::JournalPmRecoveryPlan
624 | CommandType::MarkPmRecoveryActionSubmitted
625 | CommandType::ResolvePmRecoveryAction => {
626 anyhow::bail!("PM settlement commands do not support legacy NATS payloads")
627 }
628 }
629}
630
631fn deserialize_tick_expiry(data: &[u8]) -> anyhow::Result<EngineCommand> {
632 let (now_ms, context): (u64, crate::rsm::apply::TickExpiryContext) = from_wire_bytes(data)?;
633 Ok(EngineCommand::TickExpiry { now_ms, context })
634}
635
636fn deserialize_legacy_tier_update(data: &[u8]) -> anyhow::Result<EngineCommand> {
637 match from_wire_bytes::<TierUpdatePayload>(data) {
638 Ok(payload) => Ok(tier_update_command(payload)),
639 Err(full_update_err) => {
640 let (wallet, margin_mode): (WalletAddress, MarginMode) =
641 from_wire_bytes(data).map_err(|mode_err| {
642 anyhow::anyhow!(
643 "Deserialize legacy TierUpdate failed as full payload ({}) and margin-mode payload ({})",
644 full_update_err,
645 mode_err
646 )
647 })?;
648 Ok(EngineCommand::LegacyTierMarginModeUpdate {
649 wallet,
650 margin_mode,
651 })
652 }
653 }
654}
655
656fn deserialize_legacy_approve_agent(data: &[u8]) -> anyhow::Result<EngineCommand> {
657 if let Ok((wallet, agent, expires_at_ms, nonce, timestamp_ms)) = from_wire_bytes::<(
658 hypercall_types::WalletAddress,
659 hypercall_types::WalletAddress,
660 Option<u64>,
661 Option<u64>,
662 u64,
663 )>(data)
664 {
665 Ok(EngineCommand::ApproveAgent {
666 wallet,
667 agent,
668 expires_at_ms,
669 nonce,
670 timestamp_ms,
671 })
672 } else if let Ok((wallet, agent, expires_at_ms, nonce)) = from_wire_bytes::<(
673 hypercall_types::WalletAddress,
674 hypercall_types::WalletAddress,
675 Option<u64>,
676 Option<u64>,
677 )>(data)
678 {
679 Ok(EngineCommand::ApproveAgent {
680 wallet,
681 agent,
682 expires_at_ms,
683 nonce,
684 timestamp_ms: 0,
685 })
686 } else {
687 let (wallet, agent, expires_at_ms): (
688 hypercall_types::WalletAddress,
689 hypercall_types::WalletAddress,
690 Option<u64>,
691 ) = from_wire_bytes(data)?;
692 Ok(EngineCommand::ApproveAgent {
693 wallet,
694 agent,
695 expires_at_ms,
696 nonce: None,
697 timestamp_ms: 0,
698 })
699 }
700}
701
702fn deserialize_legacy_revoke_agent(data: &[u8]) -> anyhow::Result<EngineCommand> {
703 if let Ok((wallet, agent, nonce, timestamp_ms)) = from_wire_bytes::<(
704 hypercall_types::WalletAddress,
705 hypercall_types::WalletAddress,
706 Option<u64>,
707 u64,
708 )>(data)
709 {
710 Ok(EngineCommand::RevokeAgent {
711 wallet,
712 agent,
713 nonce,
714 timestamp_ms,
715 })
716 } else if let Ok((wallet, agent, nonce)) = from_wire_bytes::<(
717 hypercall_types::WalletAddress,
718 hypercall_types::WalletAddress,
719 Option<u64>,
720 )>(data)
721 {
722 Ok(EngineCommand::RevokeAgent {
723 wallet,
724 agent,
725 nonce,
726 timestamp_ms: 0,
727 })
728 } else {
729 let (wallet, agent): (
730 hypercall_types::WalletAddress,
731 hypercall_types::WalletAddress,
732 ) = from_wire_bytes(data)?;
733 Ok(EngineCommand::RevokeAgent {
734 wallet,
735 agent,
736 nonce: None,
737 timestamp_ms: 0,
738 })
739 }
740}
741
742fn deserialize_legacy_balance_update(
743 typ: CommandType,
744 data: &[u8],
745) -> anyhow::Result<EngineCommand> {
746 if typ == CommandType::DepositUpdate {
747 anyhow::bail!("legacy DepositUpdate payloads are unsupported");
748 }
749
750 if let Ok(payload) = from_named_wire_bytes::<BalanceCommandPayload>(data) {
751 return Ok(balance_update_command(payload));
752 }
753
754 if let Ok((wallet, amount, balance_after, timestamp_ms, sequence)) = from_wire_bytes::<(
755 hypercall_types::WalletAddress,
756 rust_decimal::Decimal,
757 rust_decimal::Decimal,
758 u64,
759 Option<u64>,
760 )>(data)
761 {
762 return Ok(balance_update_command(BalanceCommandPayload {
763 wallet,
764 amount,
765 balance_after,
766 timestamp_ms,
767 sequence,
768 }));
769 }
770
771 if let Ok((wallet, amount, balance_after, timestamp_ms)) = from_wire_bytes::<(
772 hypercall_types::WalletAddress,
773 rust_decimal::Decimal,
774 rust_decimal::Decimal,
775 u64,
776 )>(data)
777 {
778 return Ok(balance_update_command(BalanceCommandPayload {
779 wallet,
780 amount,
781 balance_after,
782 timestamp_ms,
783 sequence: None,
784 }));
785 }
786
787 warn!(
788 payload_len = data.len(),
789 command_type = ?typ,
790 "Received unsupported legacy balance update payload shape from NATS"
791 );
792 anyhow::bail!("Unsupported legacy balance update payload shape")
793}
794
795fn tier_update_command(payload: TierUpdatePayload) -> EngineCommand {
796 EngineCommand::TierUpdate {
797 wallet: payload.wallet,
798 margin_mode: payload.margin_mode,
799 tier: payload.tier,
800 trading_limits: payload.trading_limits,
801 }
802}
803
804fn price_update_command(payload: PriceUpdatePayload) -> EngineCommand {
805 EngineCommand::PriceUpdate {
806 underlying: payload.underlying,
807 spot_price: payload.spot_price,
808 timestamp_ms: payload.timestamp_ms,
809 }
810}
811
812fn iv_update_command(payload: IvUpdatePayload) -> EngineCommand {
813 let mut surface = crate::vol_oracle::vol_surface_cache::VolatilitySurface::new();
814 for point in &payload.strike_points {
815 surface.insert(point.strike, point.expiry, point.iv);
816 }
817 EngineCommand::IvUpdate {
818 underlying: payload.underlying,
819 surface,
820 journal_data: None,
821 timestamp_ms: payload.timestamp_ms,
822 }
823}
824
825fn deposit_update_command(payload: DepositUpdatePayload) -> EngineCommand {
826 EngineCommand::DepositUpdate {
827 wallet: payload.wallet,
828 amount: payload.amount,
829 timestamp_ms: payload.timestamp_ms,
830 sequence: Some(payload.sequence),
831 source_event_hash: payload.source_event_hash,
832 }
833}
834
835fn balance_update_command(payload: BalanceCommandPayload) -> EngineCommand {
836 EngineCommand::LiquidationBonusUpdate {
837 wallet: payload.wallet,
838 amount: payload.amount,
839 balance_after: payload.balance_after,
840 timestamp_ms: payload.timestamp_ms,
841 sequence: payload.sequence,
842 }
843}
844
#[cfg(test)]
mod tests {
    use super::*;
    use hypercall_types::api_models::TradingLimits;
    use hypercall_types::serialize_to_wire_bytes;
    use std::str::FromStr;

    /// Parses a hex address literal into a `WalletAddress`, panicking on invalid input.
    fn parse_wallet(hex: &str) -> WalletAddress {
        WalletAddress::from_str(hex).expect("valid test wallet")
    }

    /// Default wallet shared by most tests.
    fn test_wallet() -> WalletAddress {
        parse_wallet("0x1234567890123456789012345678901234567890")
    }

    #[test]
    fn test_deserialize_tier_update_current_payload() {
        let wallet = test_wallet();
        let trading_limits = TradingLimits {
            max_open_orders: 7,
            max_open_positions: 8,
            orders_per_minute: 70,
            cancels_per_minute: 80,
            api_requests_per_minute: 90,
        };
        let data = serialize_to_wire_bytes(&TierUpdatePayload {
            wallet,
            margin_mode: MarginMode::Portfolio,
            tier: "tier1".to_string(),
            trading_limits,
        });

        let command = deserialize_command(CommandType::TierUpdate, COMMAND_WIRE_VERSION_V1, &data)
            .expect("deserialize TierUpdate");

        match command {
            EngineCommand::TierUpdate {
                wallet: actual_wallet,
                margin_mode,
                tier,
                trading_limits: actual_limits,
            } => {
                assert_eq!(actual_wallet, wallet);
                assert_eq!(margin_mode, MarginMode::Portfolio);
                assert_eq!(tier, "tier1");
                assert_eq!(actual_limits.max_open_orders, 7);
                assert_eq!(actual_limits.max_open_positions, 8);
            }
            other => panic!("expected current TierUpdate payload, got {other:?}"),
        }
    }

    #[test]
    fn test_deserialize_tier_update_legacy_margin_mode_payload() {
        let wallet = test_wallet();
        let data = serialize_to_wire_bytes(&(wallet, MarginMode::Standard));

        let command =
            deserialize_command(CommandType::TierUpdate, LEGACY_COMMAND_WIRE_VERSION, &data)
                .expect("deserialize legacy TierUpdate");

        match command {
            EngineCommand::LegacyTierMarginModeUpdate {
                wallet: actual_wallet,
                margin_mode,
            } => {
                assert_eq!(actual_wallet, wallet);
                assert_eq!(margin_mode, MarginMode::Standard);
            }
            other => panic!("expected legacy TierUpdate payload, got {other:?}"),
        }
    }

    /// Named (v1) commands must reject positional payloads before decoding them.
    #[test]
    fn test_deserialize_named_command_rejects_tuple_payload() {
        let data = serialize_to_wire_bytes(&(
            "BTC".to_string(),
            rust_decimal_macros::dec!(95000),
            123_u64,
        ));

        let error = deserialize_command(CommandType::PriceUpdate, COMMAND_WIRE_VERSION_V1, &data)
            .expect_err("v1 PriceUpdate must reject positional payloads");

        assert!(
            error.to_string().contains("MessagePack map"),
            "unexpected error: {error}"
        );
    }

    #[test]
    fn test_deserialize_option_deposit_update_payload() {
        let wallet = test_wallet();
        let data = serialize_to_wire_bytes(&(
            "deposit-request-1".to_string(),
            wallet,
            "BTC-20260130-100000-C".to_string(),
            rust_decimal_macros::dec!(1.5),
            1234_u64,
        ));

        let command = deserialize_command(
            CommandType::OptionDepositUpdate,
            COMMAND_WIRE_VERSION_V1,
            &data,
        )
        .expect("deserialize OptionDepositUpdate");

        match command {
            EngineCommand::OptionDepositUpdate {
                request_id,
                wallet: actual_wallet,
                symbol,
                quantity,
                timestamp_ms,
            } => {
                assert_eq!(request_id, "deposit-request-1");
                assert_eq!(actual_wallet, wallet);
                assert_eq!(symbol, "BTC-20260130-100000-C");
                assert_eq!(quantity, rust_decimal_macros::dec!(1.5));
                assert_eq!(timestamp_ms, 1234);
            }
            other => panic!("expected OptionDepositUpdate payload, got {other:?}"),
        }
    }

    #[test]
    fn test_deserialize_option_withdrawal_update_payload() {
        // Each role gets its own address so that a positional mix-up between the
        // four same-typed tuple slots is caught by the assertions below.
        let wallet = test_wallet();
        let account = parse_wallet("0x1111111111111111111111111111111111111111");
        let signer = parse_wallet("0x2222222222222222222222222222222222222222");
        let rsm_signer = parse_wallet("0x3333333333333333333333333333333333333333");
        let data = serialize_to_wire_bytes(&(
            "withdrawal-request-1".to_string(),
            wallet,
            account,
            signer,
            rsm_signer,
            "BTC-20260130-100000-C".to_string(),
            rust_decimal_macros::dec!(0.75),
            Some(5678_u64),
            vec![1, 2, 3],
            5678_u64,
        ));

        let command = deserialize_command(
            CommandType::OptionWithdrawalUpdate,
            COMMAND_WIRE_VERSION_V1,
            &data,
        )
        .expect("deserialize OptionWithdrawalUpdate");

        match command {
            EngineCommand::OptionWithdrawalUpdate {
                request_id,
                wallet: actual_wallet,
                account: actual_account,
                signer: actual_signer,
                rsm_signer: actual_rsm_signer,
                symbol,
                quantity,
                nonce,
                action,
                timestamp_ms,
            } => {
                assert_eq!(request_id, "withdrawal-request-1");
                assert_eq!(actual_wallet, wallet);
                assert_eq!(actual_account, account);
                assert_eq!(actual_signer, signer);
                assert_eq!(actual_rsm_signer, rsm_signer);
                assert_eq!(symbol, "BTC-20260130-100000-C");
                assert_eq!(quantity, rust_decimal_macros::dec!(0.75));
                assert_eq!(nonce, Some(5678));
                assert_eq!(action, vec![1, 2, 3]);
                assert_eq!(timestamp_ms, 5678);
            }
            other => panic!("expected OptionWithdrawalUpdate payload, got {other:?}"),
        }
    }

    #[test]
    fn test_deserialize_deposit_update_current_payload() {
        let wallet = test_wallet();
        let source_event_hash = alloy::primitives::FixedBytes::<32>::from([7u8; 32]);
        let data = serialize_to_wire_bytes(&DepositUpdatePayload {
            wallet,
            amount: rust_decimal_macros::dec!(125.50),
            timestamp_ms: 6789,
            sequence: 42,
            source_event_hash,
        });

        let command =
            deserialize_command(CommandType::DepositUpdate, COMMAND_WIRE_VERSION_V1, &data)
                .expect("deserialize current DepositUpdate");

        match command {
            EngineCommand::DepositUpdate {
                wallet: actual_wallet,
                amount,
                timestamp_ms,
                sequence,
                source_event_hash: actual_source_event_hash,
            } => {
                assert_eq!(actual_wallet, wallet);
                assert_eq!(amount, rust_decimal_macros::dec!(125.50));
                assert_eq!(timestamp_ms, 6789);
                assert_eq!(sequence, Some(42));
                assert_eq!(actual_source_event_hash, source_event_hash);
            }
            other => panic!("expected current DepositUpdate payload, got {other:?}"),
        }
    }

    /// The live path rejects a balance-shaped payload, while the replay path
    /// accepts it and synthesizes the event hash from the sequence.
    #[test]
    fn test_deserialize_deposit_update_rejects_legacy_balance_payload() {
        let wallet = test_wallet();
        let data = serialize_to_wire_bytes(&BalanceCommandPayload {
            wallet,
            amount: rust_decimal_macros::dec!(125.50),
            balance_after: rust_decimal_macros::dec!(125.50),
            timestamp_ms: 6789,
            sequence: Some(42),
        });

        let error = deserialize_command(CommandType::DepositUpdate, COMMAND_WIRE_VERSION_V1, &data)
            .expect_err("live DepositUpdate must reject legacy balance payload");

        assert!(
            error.to_string().contains("source_event_hash"),
            "unexpected error: {error}"
        );

        let replay_command = deserialize_command_for_replay(
            CommandType::DepositUpdate,
            COMMAND_WIRE_VERSION_V1,
            &data,
        )
        .expect("replay accepts legacy persisted DepositUpdate payload with sequence");

        match replay_command {
            EngineCommand::DepositUpdate {
                wallet: actual_wallet,
                amount,
                timestamp_ms,
                sequence,
                source_event_hash,
            } => {
                assert_eq!(actual_wallet, wallet);
                assert_eq!(amount, rust_decimal_macros::dec!(125.50));
                assert_eq!(timestamp_ms, 6789);
                assert_eq!(sequence, Some(42));
                assert_eq!(source_event_hash, legacy_deposit_source_event_hash(42));
            }
            other => panic!("expected replay DepositUpdate payload, got {other:?}"),
        }
    }

    #[test]
    fn test_replay_deserializes_legacy_deposit_update_tuple_with_sequence() {
        let wallet = test_wallet();
        let data = serialize_to_wire_bytes(&(
            wallet,
            rust_decimal_macros::dec!(125.50),
            rust_decimal_macros::dec!(1000.25),
            6789_u64,
            Some(42_u64),
        ));

        deserialize_command(
            CommandType::DepositUpdate,
            LEGACY_COMMAND_WIRE_VERSION,
            &data,
        )
        .expect_err("live legacy DepositUpdate tuple must stay rejected");

        let replay_command = deserialize_command_for_replay(
            CommandType::DepositUpdate,
            LEGACY_COMMAND_WIRE_VERSION,
            &data,
        )
        .expect("replay accepts sequenced legacy DepositUpdate tuple");

        match replay_command {
            EngineCommand::DepositUpdate {
                wallet: actual_wallet,
                amount,
                timestamp_ms,
                sequence,
                source_event_hash,
            } => {
                assert_eq!(actual_wallet, wallet);
                assert_eq!(amount, rust_decimal_macros::dec!(125.50));
                assert_eq!(timestamp_ms, 6789);
                assert_eq!(sequence, Some(42));
                assert_eq!(source_event_hash, legacy_deposit_source_event_hash(42));
            }
            other => panic!("expected replay DepositUpdate payload, got {other:?}"),
        }
    }

    #[test]
    fn test_deserialize_tier_update_legacy_full_payload() {
        let wallet = test_wallet();
        let trading_limits = TradingLimits {
            max_open_orders: 17,
            max_open_positions: 18,
            orders_per_minute: 170,
            cancels_per_minute: 180,
            api_requests_per_minute: 190,
        };
        let data = serialize_to_wire_bytes(&TierUpdatePayload {
            wallet,
            margin_mode: MarginMode::Portfolio,
            tier: "legacy-tier".to_string(),
            trading_limits,
        });

        let command =
            deserialize_command(CommandType::TierUpdate, LEGACY_COMMAND_WIRE_VERSION, &data)
                .expect("deserialize legacy full TierUpdate");

        match command {
            EngineCommand::TierUpdate {
                wallet: actual_wallet,
                margin_mode,
                tier,
                trading_limits: actual_limits,
            } => {
                assert_eq!(actual_wallet, wallet);
                assert_eq!(margin_mode, MarginMode::Portfolio);
                assert_eq!(tier, "legacy-tier");
                assert_eq!(actual_limits.max_open_orders, 17);
                assert_eq!(actual_limits.max_open_positions, 18);
            }
            other => panic!("expected legacy full TierUpdate payload, got {other:?}"),
        }
    }

    #[test]
    fn test_deserialize_price_update_legacy_struct_payload() {
        let mut data = vec![hypercall_types::WIRE_FORMAT_VERSION];
        data.extend(
            rmp_serde::to_vec(&PriceUpdatePayload {
                underlying: "BTC".to_string(),
                spot_price: rust_decimal_macros::dec!(95000),
                timestamp_ms: 123,
            })
            .expect("serialize legacy PriceUpdate payload"),
        );

        let command =
            deserialize_command(CommandType::PriceUpdate, LEGACY_COMMAND_WIRE_VERSION, &data)
                .expect("deserialize legacy PriceUpdate");

        match command {
            EngineCommand::PriceUpdate {
                underlying,
                spot_price,
                timestamp_ms,
            } => {
                assert_eq!(underlying, "BTC");
                assert_eq!(spot_price, rust_decimal_macros::dec!(95000));
                assert_eq!(timestamp_ms, 123);
            }
            other => panic!("expected legacy PriceUpdate payload, got {other:?}"),
        }
    }
}