1use super::*;
4
5use crate::rsm::apply::{ApplyOutput, CommandEnvelope, EngineCommand};
10
11#[derive(Debug, Clone)]
13pub enum EngineError {
14 Rejected(String),
16 Internal(String),
18 Market(String),
20}
21
22impl std::fmt::Display for EngineError {
23 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
24 match self {
25 EngineError::Rejected(reason) => write!(f, "Rejected: {}", reason),
26 EngineError::Internal(reason) => write!(f, "Internal error: {}", reason),
27 EngineError::Market(reason) => write!(f, "Market error: {}", reason),
28 }
29 }
30}
31
32impl std::error::Error for EngineError {}
33
34impl UnifiedEngine {
35 fn validate_nonce_preview(
36 &self,
37 signer: &hypercall_types::WalletAddress,
38 nonce: Option<u64>,
39 command_timestamp_ms: u64,
40 preview_sets: &mut std::collections::HashMap<
41 hypercall_types::WalletAddress,
42 hypercall_engine::BoundedNonceSet,
43 >,
44 ) -> Result<(), EngineError> {
45 let Some(nonce) = nonce else {
46 return Ok(());
47 };
48 if !hypercall_engine::nonce_within_time_bounds(nonce, command_timestamp_ms) {
49 return Err(EngineError::Rejected(format!(
50 "nonce {} is outside time bounds for signer {} (command_ts={})",
51 nonce, signer, command_timestamp_ms
52 )));
53 }
54 let set = preview_sets.entry(*signer).or_insert_with(|| {
55 let mut set = self.ctx.nonce_sets.get(signer).cloned().unwrap_or_else(|| {
56 hypercall_engine::BoundedNonceSet::new(
57 hypercall_engine::nonce::DEFAULT_NONCE_SET_CAPACITY,
58 )
59 });
60 set.purge_expired(command_timestamp_ms);
61 set
62 });
63 if !set.insert(nonce) {
64 return Err(EngineError::Rejected(format!(
65 "nonce {} rejected for signer {} (duplicate or below set minimum {})",
66 nonce,
67 signer,
68 set.min().map(|m| m.to_string()).unwrap_or_default()
69 )));
70 }
71 Ok(())
72 }
73
74 fn validate_and_advance_nonce(
79 &mut self,
80 signer: &hypercall_types::WalletAddress,
81 nonce: Option<u64>,
82 command_timestamp_ms: u64,
83 ) -> Result<(), EngineError> {
84 let Some(nonce) = nonce else {
85 return Ok(());
86 };
87 if !hypercall_engine::nonce_within_time_bounds(nonce, command_timestamp_ms) {
88 return Err(EngineError::Rejected(format!(
89 "nonce {} is outside time bounds for signer {} (command_ts={})",
90 nonce, signer, command_timestamp_ms
91 )));
92 }
93 let set = self.ctx.nonce_sets.entry(*signer).or_insert_with(|| {
94 hypercall_engine::BoundedNonceSet::new(
95 hypercall_engine::nonce::DEFAULT_NONCE_SET_CAPACITY,
96 )
97 });
98 set.purge_expired(command_timestamp_ms);
99 if !set.insert(nonce) {
100 return Err(EngineError::Rejected(format!(
101 "nonce {} rejected for signer {} (duplicate or below set minimum {})",
102 nonce,
103 signer,
104 set.min().map(|m| m.to_string()).unwrap_or_default()
105 )));
106 }
107 Ok(())
108 }
109
110 fn allocate_rsm_nonce(&mut self, signer: hypercall_types::WalletAddress) -> u64 {
111 let next = self.ctx.rsm_signer_nonces.entry(signer).or_insert(0);
112 let allocated = *next;
113 *next = next.saturating_add(1);
114 allocated
115 }
116
117 pub fn seed_rsm_signer_nonce_floor(
118 &mut self,
119 signer: hypercall_types::WalletAddress,
120 next_nonce: u64,
121 ) {
122 let current = self.ctx.rsm_signer_nonces.entry(signer).or_insert(0);
123 if *current < next_nonce {
124 *current = next_nonce;
125 }
126 }
127
128 fn validate_cash_withdrawal_margin(
129 &self,
130 wallet: hypercall_types::WalletAddress,
131 amount: rust_decimal::Decimal,
132 ) -> Result<(), EngineError> {
133 if matches!(
134 self.ctx.deps.liquidation_states.get(&wallet),
135 Some(hypercall_types::LiquidationStateType::PreLiquidation)
136 | Some(hypercall_types::LiquidationStateType::InLiquidation)
137 ) {
138 return Err(EngineError::Rejected(
139 "withdrawals are disabled while the account is in liquidation".to_string(),
140 ));
141 }
142
143 let margin_mode = self
144 .margin_manager
145 .get_margin_mode(&self.ctx.deps, &wallet)
146 .map_err(|error| {
147 EngineError::Internal(format!(
148 "failed to validate cash withdrawal margin for {}: {}",
149 wallet, error
150 ))
151 })?;
152
153 match margin_mode {
154 crate::rsm::MarginMode::Standard => {
155 let mut simulated_balances = self.ctx.balance_ledger.clone();
156 let post_balance = simulated_balances
157 .get(&wallet)
158 .copied()
159 .unwrap_or(rust_decimal::Decimal::ZERO)
160 - amount;
161 simulated_balances.insert(wallet, post_balance);
162 let account =
163 crate::standard_margin::StandardAccountBuilder::build_from_engine_state(
164 &wallet,
165 &simulated_balances,
166 &self.ctx.engine_positions,
167 &self.ctx.deps.reference_prices,
168 )
169 .map_err(|error| {
170 EngineError::Internal(format!(
171 "failed to build standard margin account for cash withdrawal {}: {}",
172 wallet, error
173 ))
174 })?;
175 let margin = self
176 .margin_manager
177 .standard_margin_service
178 .compute_margin(&account);
179 if margin.maintenance_margin < rust_decimal::Decimal::ZERO {
180 return Err(EngineError::Rejected(format!(
181 "withdrawal would put account below maintenance margin: post_withdraw_equity={}, maintenance_margin_shortfall={}",
182 margin.equity,
183 -margin.maintenance_margin
184 )));
185 }
186 }
187 crate::rsm::MarginMode::Portfolio => {
188 let margin = self
189 .margin_manager
190 .get_span_margin_for_wallet(
191 &self.ctx.deps,
192 &self.ctx.engine_positions,
193 &self.ctx.balance_ledger,
194 &wallet,
195 )
196 .map_err(|error| {
197 EngineError::Internal(format!(
198 "failed to compute portfolio margin for cash withdrawal {}: {}",
199 wallet, error
200 ))
201 })?
202 .ok_or_else(|| {
203 EngineError::Internal(format!(
204 "missing portfolio margin details for cash withdrawal {}",
205 wallet
206 ))
207 })?;
208 let post_withdraw_equity = margin.equity - amount;
209 if post_withdraw_equity < margin.maintenance_margin_required {
210 return Err(EngineError::Rejected(format!(
211 "withdrawal would put account below maintenance margin: post_withdraw_equity={}, maintenance_required={}",
212 post_withdraw_equity, margin.maintenance_margin_required
213 )));
214 }
215 }
216 }
217
218 Ok(())
219 }
220
221 fn validate_option_withdrawal_margin(
222 &self,
223 wallet: hypercall_types::WalletAddress,
224 symbol: &str,
225 quantity: rust_decimal::Decimal,
226 ) -> Result<(), EngineError> {
227 if matches!(
228 self.ctx.deps.liquidation_states.get(&wallet),
229 Some(hypercall_types::LiquidationStateType::PreLiquidation)
230 | Some(hypercall_types::LiquidationStateType::InLiquidation)
231 ) {
232 return Err(EngineError::Rejected(
233 "withdrawals are disabled while the account is in liquidation".to_string(),
234 ));
235 }
236
237 let margin_mode = self
238 .margin_manager
239 .get_margin_mode(&self.ctx.deps, &wallet)
240 .map_err(|error| {
241 EngineError::Internal(format!(
242 "failed to validate option withdrawal margin for {}: {}",
243 wallet, error
244 ))
245 })?;
246
247 let mut simulated_positions = self.ctx.engine_positions.clone();
248 crate::rsm::engine_deps::apply_option_withdrawal_to_positions(
249 &mut simulated_positions,
250 wallet,
251 symbol.to_string(),
252 quantity,
253 )
254 .map_err(EngineError::Rejected)?;
255
256 match margin_mode {
257 crate::rsm::MarginMode::Standard => {
258 let account =
259 crate::standard_margin::StandardAccountBuilder::build_from_engine_state(
260 &wallet,
261 &self.ctx.balance_ledger,
262 &simulated_positions,
263 &self.ctx.deps.reference_prices,
264 )
265 .map_err(|error| {
266 EngineError::Internal(format!(
267 "failed to build standard margin account for option withdrawal {}: {}",
268 wallet, error
269 ))
270 })?;
271 let margin = self
272 .margin_manager
273 .standard_margin_service
274 .compute_margin(&account);
275 if margin.maintenance_margin < rust_decimal::Decimal::ZERO {
276 return Err(EngineError::Rejected(format!(
277 "option withdrawal would put account below maintenance margin: equity={}, maintenance_margin_shortfall={}",
278 margin.equity,
279 -margin.maintenance_margin
280 )));
281 }
282 }
283 crate::rsm::MarginMode::Portfolio => {
284 let margin = self
285 .margin_manager
286 .get_span_margin_for_wallet(
287 &self.ctx.deps,
288 &simulated_positions,
289 &self.ctx.balance_ledger,
290 &wallet,
291 )
292 .map_err(|error| {
293 EngineError::Internal(format!(
294 "failed to compute portfolio margin for option withdrawal {}: {}",
295 wallet, error
296 ))
297 })?
298 .ok_or_else(|| {
299 EngineError::Internal(format!(
300 "missing portfolio margin details for option withdrawal {}",
301 wallet
302 ))
303 })?;
304 if margin.equity < margin.maintenance_margin_required {
305 return Err(EngineError::Rejected(format!(
306 "option withdrawal would put account below maintenance margin: equity={}, maintenance_required={}",
307 margin.equity, margin.maintenance_margin_required
308 )));
309 }
310 }
311 }
312
313 Ok(())
314 }
315
316 pub(super) fn apply_deposit_update_to_balance_ledger(
317 &mut self,
318 update_kind: &'static str,
319 wallet: hypercall_types::WalletAddress,
320 amount: rust_decimal::Decimal,
321 timestamp_ms: u64,
322 sequence: Option<u64>,
323 source_event_hash: &alloy::primitives::FixedBytes<32>,
324 output: &mut ApplyOutput,
325 ) -> Result<(), EngineError> {
326 if sequence.is_none() {
327 return Err(EngineError::Internal(format!(
328 "{} for {} missing durable sequence",
329 update_kind, wallet
330 )));
331 }
332 if source_event_hash.is_zero() {
333 return Err(EngineError::Internal(format!(
334 "{} for {} missing source_event_hash",
335 update_kind, wallet
336 )));
337 }
338
339 if self
351 .ctx
352 .applied_deposit_source_event_hashes
353 .contains(source_event_hash)
354 {
355 let current_balance = self.ctx.balance_ledger.balance(&wallet);
356 warn!(
357 wallet = %wallet,
358 amount = %amount,
359 current_balance = %current_balance,
360 timestamp_ms = timestamp_ms,
361 sequence = ?sequence,
362 update_kind = update_kind,
363 source_event_hash = %source_event_hash,
364 "Skipping duplicate balance update command"
365 );
366 return Ok(());
367 }
368
369 if let Some(last) = self.last_deposit_update(&wallet) {
370 let stale_update = match last.source {
371 crate::rsm::engine_deps::BalanceLedgerMutationSource::DepositUpdate => {
372 match (sequence, last.sequence) {
373 (Some(sequence), Some(last_sequence)) if sequence == last_sequence => true,
378 (Some(_), Some(_)) => false,
383 (Some(_), None) => false,
387 (None, Some(_)) => true,
390 (None, None) => timestamp_ms <= last.timestamp_ms,
393 }
394 }
395 crate::rsm::engine_deps::BalanceLedgerMutationSource::CashWithdrawal => {
396 match (sequence, last.sequence) {
402 (Some(sequence), Some(last_sequence)) if sequence == last_sequence => true,
406 (Some(_), Some(_)) => false,
409 (None, Some(_)) => true,
412 _ => timestamp_ms <= last.timestamp_ms,
414 }
415 }
416 };
417 if stale_update {
418 let current_balance = self.ctx.balance_ledger.balance(&wallet);
419 warn!(
420 wallet = %wallet,
421 amount = %amount,
422 current_balance = %current_balance,
423 timestamp_ms = timestamp_ms,
424 sequence = ?sequence,
425 last_timestamp_ms = last.timestamp_ms,
426 last_sequence = ?last.sequence,
427 last_balance_after = %last.balance_after,
428 update_kind = update_kind,
429 "Skipping stale balance update command"
430 );
431 return Ok(());
432 }
433 }
434
435 let previous_balance = self.ctx.balance_ledger.balance(&wallet);
439 let applied_balance_after = previous_balance + amount;
440 let update = self.build_balance_update(
441 wallet,
442 amount,
443 applied_balance_after,
444 hypercall_types::BalanceUpdateReason::Deposit,
445 sequence.map(|seq| seq.to_string()),
446 timestamp_ms,
447 );
448 self.apply_balance_update(update, output)?;
449 self.ctx
450 .applied_deposit_source_event_hashes
451 .insert(*source_event_hash);
452 let watermark_sequence = match (
453 sequence,
454 self.last_deposit_update(&wallet)
455 .and_then(|last| last.sequence),
456 ) {
457 (Some(sequence), Some(last_sequence)) => Some(sequence.max(last_sequence)),
458 (Some(sequence), None) => Some(sequence),
459 (None, Some(last_sequence)) => Some(last_sequence),
460 (None, None) => None,
461 };
462 let watermark_timestamp_ms = self
463 .last_deposit_update(&wallet)
464 .map(|last| timestamp_ms.max(last.timestamp_ms))
465 .unwrap_or(timestamp_ms);
466 self.ctx.deposit_update_watermarks.insert(
467 wallet,
468 crate::rsm::engine_deps::DepositUpdateWatermark {
469 sequence: watermark_sequence,
470 timestamp_ms: watermark_timestamp_ms,
471 balance_after: applied_balance_after,
472 source: crate::rsm::engine_deps::BalanceLedgerMutationSource::DepositUpdate,
473 },
474 );
475 debug!(
476 wallet = %wallet,
477 amount = %amount,
478 previous_balance = %previous_balance,
479 applied_balance_after = %applied_balance_after,
480 timestamp_ms = timestamp_ms,
481 sequence = ?sequence,
482 update_kind = update_kind,
483 source_event_hash = %source_event_hash,
484 "Applied balance update command"
485 );
486 Ok(())
487 }
488
489 pub(super) fn apply_balance_snapshot_update_to_balance_ledger(
490 &mut self,
491 update_kind: &'static str,
492 wallet: hypercall_types::WalletAddress,
493 amount: rust_decimal::Decimal,
494 balance_after: rust_decimal::Decimal,
495 timestamp_ms: u64,
496 sequence: Option<u64>,
497 output: &mut ApplyOutput,
498 ) -> Result<(), EngineError> {
499 if let Some(last) = self.last_deposit_update(&wallet) {
500 let stale_update = match last.source {
505 crate::rsm::engine_deps::BalanceLedgerMutationSource::DepositUpdate => {
506 match (sequence, last.sequence) {
507 (Some(sequence), Some(last_sequence)) => sequence <= last_sequence,
509 (Some(_), None) => false,
510 (None, Some(_)) => true,
511 (None, None) => {
512 timestamp_ms == last.timestamp_ms
517 && ((amount >= rust_decimal::Decimal::ZERO
518 && balance_after <= last.balance_after)
519 || (amount < rust_decimal::Decimal::ZERO
520 && balance_after >= last.balance_after))
521 }
522 }
523 }
524 crate::rsm::engine_deps::BalanceLedgerMutationSource::CashWithdrawal => {
525 if timestamp_ms <= last.timestamp_ms {
530 true
534 } else {
535 match (sequence, last.sequence) {
536 (Some(sequence), Some(last_sequence)) => sequence <= last_sequence,
539 (Some(_), None) => false,
540 (None, Some(_)) => true,
541 (None, None) => false,
542 }
543 }
544 }
545 };
546 if stale_update {
547 let current_balance = self.ctx.balance_ledger.balance(&wallet);
548 warn!(
549 wallet = %wallet,
550 amount = %amount,
551 current_balance = %current_balance,
552 balance_after = %balance_after,
553 timestamp_ms = timestamp_ms,
554 sequence = ?sequence,
555 last_timestamp_ms = last.timestamp_ms,
556 last_sequence = ?last.sequence,
557 last_balance_after = %last.balance_after,
558 update_kind = update_kind,
559 "Skipping stale balance snapshot command"
560 );
561 return Ok(());
562 }
563 }
564
565 let previous_balance = self.ctx.balance_ledger.balance(&wallet);
566 let reason = match update_kind {
567 "LiquidationBonusUpdate" => hypercall_types::BalanceUpdateReason::LiquidationBonus,
568 _ if amount < rust_decimal::Decimal::ZERO => {
569 hypercall_types::BalanceUpdateReason::Withdrawal
570 }
571 _ => hypercall_types::BalanceUpdateReason::Deposit,
572 };
573 let update = self.build_balance_update(
574 wallet,
575 amount,
576 balance_after,
577 reason,
578 sequence.map(|seq| seq.to_string()),
579 timestamp_ms,
580 );
581 self.apply_balance_update(update, output)?;
582 self.ctx.deposit_update_watermarks.insert(
583 wallet,
584 crate::rsm::engine_deps::DepositUpdateWatermark {
585 sequence,
586 timestamp_ms,
587 balance_after,
588 source: crate::rsm::engine_deps::BalanceLedgerMutationSource::DepositUpdate,
589 },
590 );
591 debug!(
592 wallet = %wallet,
593 amount = %amount,
594 previous_balance = %previous_balance,
595 balance_after = %balance_after,
596 timestamp_ms = timestamp_ms,
597 sequence = ?sequence,
598 update_kind = update_kind,
599 "Applied balance snapshot command"
600 );
601 Ok(())
602 }
603
604 fn build_balance_update(
605 &self,
606 wallet: hypercall_types::WalletAddress,
607 delta: rust_decimal::Decimal,
608 balance_after: rust_decimal::Decimal,
609 reason: hypercall_types::BalanceUpdateReason,
610 reference_id: Option<String>,
611 timestamp_ms: u64,
612 ) -> hypercall_types::BalanceUpdate {
613 hypercall_types::BalanceUpdate {
614 balance_update_seq: self.ctx.balance_ledger.next_balance_update_seq(),
615 wallet,
616 delta,
617 balance_after,
618 reason,
619 reference_id,
620 source_command_id: None,
621 timestamp_ms,
622 }
623 }
624
625 fn apply_balance_update(
626 &mut self,
627 update: hypercall_types::BalanceUpdate,
628 output: &mut ApplyOutput,
629 ) -> Result<(), EngineError> {
630 let applied = self
631 .ctx
632 .balance_ledger
633 .apply_balance_update(&update)
634 .map_err(|error| EngineError::Internal(error.to_string()))?;
635 if !applied {
636 return Err(EngineError::Internal(format!(
637 "balance update seq {} for {} was not applied",
638 update.balance_update_seq, update.wallet
639 )));
640 }
641 output.balance_updates.push(update);
642 Ok(())
643 }
644
645 fn last_deposit_update(
646 &self,
647 wallet: &hypercall_types::WalletAddress,
648 ) -> Option<crate::rsm::engine_deps::DepositUpdateWatermark> {
649 self.ctx.deposit_update_watermarks.get(wallet).copied()
650 }
651
652 pub(super) fn apply_option_deposit_update(
653 &mut self,
654 request_id: String,
655 wallet: hypercall_types::WalletAddress,
656 symbol: String,
657 quantity: rust_decimal::Decimal,
658 timestamp_ms: u64,
659 ) -> Result<(), EngineError> {
660 if quantity <= rust_decimal::Decimal::ZERO {
661 return Err(EngineError::Internal(format!(
662 "option deposit {} for {} {} has non-positive quantity {}",
663 request_id, wallet, symbol, quantity
664 )));
665 }
666 let request_uuid = uuid::Uuid::parse_str(&request_id).map_err(|error| {
667 EngineError::Internal(format!(
668 "OptionDepositUpdate request_id {} is not a UUID: {}",
669 request_id, error
670 ))
671 })?;
672
673 crate::rsm::engine_deps::apply_option_deposit_to_positions(
674 &mut self.ctx.engine_positions,
675 wallet,
676 symbol.clone(),
677 quantity,
678 );
679 debug!(
680 wallet = %wallet,
681 symbol = %symbol,
682 quantity = %quantity,
683 timestamp_ms = timestamp_ms,
684 request_id = %request_uuid,
685 "Applied OptionDepositUpdate command"
686 );
687 Ok(())
688 }
689
690 pub(super) fn apply_option_withdrawal_update(
691 &mut self,
692 request_id: String,
693 wallet: hypercall_types::WalletAddress,
694 account: hypercall_types::WalletAddress,
695 signer: hypercall_types::WalletAddress,
696 rsm_signer: hypercall_types::WalletAddress,
697 symbol: String,
698 quantity: rust_decimal::Decimal,
699 nonce: Option<u64>,
700 action: Vec<u8>,
701 timestamp_ms: u64,
702 output: &mut ApplyOutput,
703 ) -> Result<(), EngineError> {
704 let request_uuid = uuid::Uuid::parse_str(&request_id).map_err(|error| {
705 EngineError::Internal(format!(
706 "OptionWithdrawalUpdate request_id {} is not a UUID: {}",
707 request_id, error
708 ))
709 })?;
710
711 if quantity <= rust_decimal::Decimal::ZERO {
712 return Err(EngineError::Rejected(
713 "option withdrawal quantity must be positive".to_string(),
714 ));
715 }
716 let current_quantity = self
717 .ctx
718 .engine_positions
719 .get(&(wallet, symbol.clone()))
720 .map(|position| position.quantity)
721 .unwrap_or_default();
722 if current_quantity < quantity {
723 return Err(EngineError::Rejected(format!(
724 "insufficient option balance for withdrawal: have {}, need {}",
725 current_quantity, quantity
726 )));
727 }
728 self.validate_option_withdrawal_margin(wallet, &symbol, quantity)?;
729 self.validate_and_advance_nonce(&signer, nonce, timestamp_ms)?;
730 let rsm_nonce = self.allocate_rsm_nonce(rsm_signer);
731 crate::rsm::engine_deps::apply_option_withdrawal_to_positions(
732 &mut self.ctx.engine_positions,
733 wallet,
734 symbol.clone(),
735 quantity,
736 )
737 .map_err(EngineError::Rejected)?;
738 output.outbox_appends.push(
739 crate::directive_outbox::DirectiveOutboxAppend::needs_rsm_signature(
740 request_uuid.to_string(),
741 hypercall_types::directives::ActionKey::SystemCreditOption,
742 wallet,
743 account,
744 rsm_signer,
745 rsm_nonce,
746 format!("rsm:{}:{}", rsm_signer, request_uuid),
747 action,
748 timestamp_ms,
749 Some(timestamp_ms.saturating_add(300_000)),
750 ),
751 );
752 debug!(
753 wallet = %wallet,
754 symbol = %symbol,
755 quantity = %quantity,
756 timestamp_ms = timestamp_ms,
757 request_id = %request_uuid,
758 "Applied OptionWithdrawalUpdate command"
759 );
760 Ok(())
761 }
762
763 pub(super) fn apply_cash_withdrawal_update(
764 &mut self,
765 request_id: String,
766 wallet: hypercall_types::WalletAddress,
767 destination: hypercall_types::WalletAddress,
768 signer: hypercall_types::WalletAddress,
769 rsm_signer: hypercall_types::WalletAddress,
770 amount: rust_decimal::Decimal,
771 amount_wei: u64,
772 account: hypercall_types::WalletAddress,
773 nonce: Option<u64>,
774 timestamp_ms: u64,
775 output: &mut ApplyOutput,
776 ) -> Result<rust_decimal::Decimal, EngineError> {
777 let request_uuid = uuid::Uuid::parse_str(&request_id).map_err(|error| {
778 EngineError::Internal(format!(
779 "CashWithdrawalUpdate request_id {} is not a UUID: {}",
780 request_id, error
781 ))
782 })?;
783 if amount <= rust_decimal::Decimal::ZERO {
784 return Err(EngineError::Rejected(
785 "cash withdrawal amount must be positive".to_string(),
786 ));
787 }
788 if amount_wei == 0 {
789 return Err(EngineError::Rejected(
790 "cash withdrawal amount_wei must be positive".to_string(),
791 ));
792 }
793 let current_balance = self.ctx.balance_ledger.balance(&wallet);
794 if current_balance < amount {
795 return Err(EngineError::Rejected(format!(
796 "insufficient USDC balance for withdrawal: available={}, requested={}",
797 current_balance, amount
798 )));
799 }
800 self.validate_cash_withdrawal_margin(wallet, amount)?;
801 let action = crate::rsm_credit_directive_producer::encode_system_withdraw_token_action(
802 crate::rsm_credit_directive_producer::SystemWithdrawTokenDirective {
803 destination,
804 sub_account: hypercall_types::WalletAddress::default(),
805 src_dex: 0,
806 dst_dex: 0,
807 token: 0,
808 amount_wei,
809 },
810 )
811 .map_err(|error| {
812 EngineError::Internal(format!("failed to encode withdrawal action: {}", error))
813 })?;
814 self.validate_and_advance_nonce(&signer, nonce, timestamp_ms)?;
815 let balance_after = current_balance - amount;
816 let update = self.build_balance_update(
817 wallet,
818 -amount,
819 balance_after,
820 hypercall_types::BalanceUpdateReason::Withdrawal,
821 Some(request_uuid.to_string()),
822 timestamp_ms,
823 );
824 self.apply_balance_update(update, output)?;
825 let last_deposit_sequence = self
826 .last_deposit_update(&wallet)
827 .and_then(|watermark| watermark.sequence);
828 self.ctx.deposit_update_watermarks.insert(
829 wallet,
830 crate::rsm::engine_deps::DepositUpdateWatermark {
831 sequence: last_deposit_sequence,
832 timestamp_ms,
833 balance_after,
834 source: crate::rsm::engine_deps::BalanceLedgerMutationSource::CashWithdrawal,
835 },
836 );
837 let rsm_nonce = self.allocate_rsm_nonce(rsm_signer);
838 output.outbox_appends.push(
839 crate::directive_outbox::DirectiveOutboxAppend::needs_rsm_signature(
840 request_uuid.to_string(),
841 hypercall_types::directives::ActionKey::SystemWithdrawToken,
842 wallet,
843 account,
844 rsm_signer,
845 rsm_nonce,
846 format!("rsm:{}:{}", rsm_signer, request_uuid),
847 action,
848 timestamp_ms,
849 Some(timestamp_ms.saturating_add(1_800_000)),
850 ),
851 );
852 debug!(
853 wallet = %wallet,
854 amount = %amount,
855 balance_after = %balance_after,
856 timestamp_ms = timestamp_ms,
857 request_id = %request_uuid,
858 "Applied CashWithdrawalUpdate command"
859 );
860 Ok(balance_after)
861 }
862
863 pub fn apply(&mut self, env: CommandEnvelope) -> Result<ApplyOutput, EngineError> {
879 let timestamp = env.received_ts_ms;
880 self.ctx.deps.margin_timestamp_s = (timestamp / 1000) as i64;
881 let mut output = ApplyOutput::with_capacity(8);
882
883 #[cfg(feature = "rsm-state")]
884 {
885 output.command_identity_hash = env.command.identity_hash();
886 }
887
888 match env.command {
889 EngineCommand::OrderAction(order_msg) => {
890 let signer = order_msg.api_wallet_address.unwrap_or(order_msg.wallet);
891 self.validate_and_advance_nonce(&signer, order_msg.info.nonce, timestamp)?;
892 self.apply_order_action(order_msg, timestamp, &mut output);
893 }
894 EngineCommand::MarketAction(market_cmd) => {
895 self.apply_market_action(market_cmd, timestamp, &mut output)?;
896 }
897 EngineCommand::LiquidationState(liq_msg) => {
898 self.ctx
899 .deps
900 .liquidation_states
901 .insert(liq_msg.wallet, liq_msg.new_state);
902 output.push(EngineMessage::LiquidationStateChange(liq_msg));
903 }
904 EngineCommand::TickExpiry { now_ms, context } => {
905 self.process_expiry_tick_collecting(now_ms, context, &mut output, None)?;
907 }
908 EngineCommand::TickSnapshot { now_ms: _ } => {
909 }
913 EngineCommand::PriceUpdate {
914 underlying,
915 spot_price,
916 timestamp_ms,
917 } => {
918 use rust_decimal::prelude::ToPrimitive;
919 self.ctx.spot_prices.insert(underlying.clone(), spot_price);
920 if let Some(price_f64) = spot_price.to_f64() {
921 self.ctx
922 .deps
923 .reference_prices
924 .insert(underlying.clone(), price_f64);
925 }
926 debug!(
927 underlying = %underlying,
928 spot_price = %spot_price,
929 timestamp_ms = timestamp_ms,
930 "Applied PriceUpdate command"
931 );
932 }
933 EngineCommand::IvUpdate {
934 underlying,
935 surface,
936 timestamp_ms,
937 ..
938 } => {
939 self.ctx
940 .iv_surfaces
941 .insert(underlying.clone(), surface.clone());
942 self.engine_iv_surfaces
943 .write()
944 .expect("engine IV surfaces lock poisoned")
945 .insert(underlying.clone(), surface);
946 debug!(
947 underlying = %underlying,
948 timestamp_ms = timestamp_ms,
949 "Applied IvUpdate command"
950 );
951 }
952 EngineCommand::TierUpdate {
953 wallet,
954 margin_mode,
955 tier,
956 trading_limits,
957 } => {
958 self.ctx
959 .deps
960 .wallet_margin_modes
961 .insert(wallet, margin_mode);
962 self.ctx.deps.wallet_tiers.insert(wallet, tier.clone());
963 self.ctx
964 .deps
965 .wallet_trading_limits
966 .insert(wallet, trading_limits);
967 debug!(
968 wallet = %wallet,
969 margin_mode = ?margin_mode,
970 tier = %tier,
971 max_open_orders = trading_limits.max_open_orders,
972 max_open_positions = trading_limits.max_open_positions,
973 "Applied TierUpdate command"
974 );
975 }
976 EngineCommand::LegacyTierMarginModeUpdate {
977 wallet,
978 margin_mode,
979 } => {
980 self.ctx
981 .deps
982 .wallet_margin_modes
983 .insert(wallet, margin_mode);
984 debug!(
985 wallet = %wallet,
986 margin_mode = ?margin_mode,
987 "Applied legacy TierUpdate margin-mode command"
988 );
989 }
990 EngineCommand::HypercorePositionUpdate {
991 ref account,
992 ref coin,
993 size,
994 entry_price,
995 unrealized_pnl,
996 timestamp_ms,
997 } => {
998 let key = (account.to_lowercase(), coin.clone());
999 if size == 0.0 {
1000 self.ctx.deps.perp_positions.remove(&key);
1001 } else {
1002 self.ctx.deps.perp_positions.insert(
1003 key,
1004 crate::hypercore::PerpPosition {
1005 coin: coin.clone(),
1006 size,
1007 entry_price: Some(entry_price),
1008 position_value: size * entry_price,
1009 unrealized_pnl,
1010 margin_used: 0.0,
1011 liquidation_price: None,
1012 },
1013 );
1014 }
1015 debug!(
1016 account = %account,
1017 coin = %coin,
1018 size = size,
1019 timestamp_ms = timestamp_ms,
1020 "Applied HypercorePositionUpdate command"
1021 );
1022 }
1023 EngineCommand::HypercoreEquityUpdate {
1024 wallet,
1025 account_value,
1026 timestamp_ms,
1027 } => {
1028 let stale = self
1029 .ctx
1030 .deps
1031 .hypercore_equity_timestamps
1032 .get(&wallet)
1033 .map(|&last_ts| timestamp_ms <= last_ts)
1034 .unwrap_or(false);
1035 if stale {
1036 debug!(
1037 wallet = %wallet,
1038 account_value = %account_value,
1039 timestamp_ms = timestamp_ms,
1040 "Skipping stale HypercoreEquityUpdate"
1041 );
1042 } else {
1043 self.ctx
1044 .deps
1045 .hypercore_account_equity
1046 .insert(wallet, account_value);
1047 self.ctx
1048 .deps
1049 .hypercore_equity_timestamps
1050 .insert(wallet, timestamp_ms);
1051 debug!(
1052 wallet = %wallet,
1053 account_value = %account_value,
1054 timestamp_ms = timestamp_ms,
1055 "Applied HypercoreEquityUpdate command"
1056 );
1057 }
1058 }
1059 EngineCommand::SetPmSettlementPoolConfig(command) => {
1060 let effects = self
1061 .ctx
1062 .pm_settlement_state
1063 .apply_set_config(command)
1064 .map_err(EngineError::Rejected)?;
1065 output.pm_settlement_effects.extend(effects);
1066 }
1067 EngineCommand::RecordPmVaultDeposit(command) => {
1068 let effects = self
1069 .ctx
1070 .pm_settlement_state
1071 .apply_record_vault_deposit(command)
1072 .map_err(EngineError::Rejected)?;
1073 output.pm_settlement_effects.extend(effects);
1074 }
1075 EngineCommand::RequestPmVaultWithdrawal(command) => {
1076 let effects = self
1077 .ctx
1078 .pm_settlement_state
1079 .apply_request_vault_withdrawal(command)
1080 .map_err(EngineError::Rejected)?;
1081 output.pm_settlement_effects.extend(effects);
1082 }
1083 EngineCommand::AccruePmSettlementInterest(command) => {
1084 let effects = self
1085 .ctx
1086 .pm_settlement_state
1087 .apply_accrue_interest(command)
1088 .map_err(EngineError::Rejected)?;
1089 output.pm_settlement_effects.extend(effects);
1090 }
1091 EngineCommand::ApplyPmSettlementRepayment(command) => {
1092 let effects = self
1093 .ctx
1094 .pm_settlement_state
1095 .apply_repayment(command)
1096 .map_err(EngineError::Rejected)?;
1097 output.pm_settlement_effects.extend(effects);
1098 }
1099 EngineCommand::JournalPmRecoveryPlan(command) => {
1100 let effects = self
1101 .ctx
1102 .pm_settlement_state
1103 .apply_journal_recovery_plan(command)
1104 .unwrap_or_else(|error| {
1105 panic!("RUNTIME_INVARIANT: replayed JournalPmRecoveryPlan failed: {error}")
1106 });
1107 output.pm_settlement_effects.extend(effects);
1108 }
1109 EngineCommand::MarkPmRecoveryActionSubmitted(command) => {
1110 let effects = self
1111 .ctx
1112 .pm_settlement_state
1113 .apply_mark_recovery_action_submitted(command)
1114 .unwrap_or_else(|error| {
1115 panic!(
1116 "RUNTIME_INVARIANT: replayed MarkPmRecoveryActionSubmitted failed: {error}"
1117 )
1118 });
1119 output.pm_settlement_effects.extend(effects);
1120 }
1121 EngineCommand::ResolvePmRecoveryAction(command) => {
1122 let effects = self
1123 .ctx
1124 .pm_settlement_state
1125 .apply_resolve_recovery_action(command)
1126 .unwrap_or_else(|error| {
1127 panic!(
1128 "RUNTIME_INVARIANT: replayed ResolvePmRecoveryAction failed: {error}"
1129 )
1130 });
1131 output.pm_settlement_effects.extend(effects);
1132 }
1133 EngineCommand::MmpConfigUpdate {
1134 wallet,
1135 currency,
1136 enabled,
1137 interval_ms,
1138 frozen_time_ms,
1139 qty_limit,
1140 delta_limit,
1141 vega_limit,
1142 } => {
1143 self.ctx
1144 .deps
1145 .mmp_enabled
1146 .insert((wallet, currency.clone()), enabled);
1147
1148 let key = (wallet, currency.clone());
1151 let state = self.ctx.mmp_state.entry(key).or_insert_with(|| {
1152 crate::rsm::engine_deps::EngineMmpState::new(
1153 enabled,
1154 interval_ms,
1155 frozen_time_ms,
1156 qty_limit,
1157 delta_limit,
1158 vega_limit,
1159 )
1160 });
1161 state.enabled = enabled;
1164 state.interval_ms = interval_ms;
1165 state.frozen_time_ms = frozen_time_ms;
1166 state.qty_limit = qty_limit;
1167 state.delta_limit = delta_limit;
1168 state.vega_limit = vega_limit;
1169
1170 debug!(
1171 wallet = %wallet,
1172 currency = %currency,
1173 enabled = enabled,
1174 "Applied MmpConfigUpdate command"
1175 );
1176 }
1177 EngineCommand::RfqExecute(rfq_cmd) => {
1178 let taker_submit_signer = rfq_cmd.taker_submit_signer();
1179 let taker_accept_signer = rfq_cmd.taker_accept_signer();
1180 let mut nonce_preview = std::collections::HashMap::new();
1181 self.validate_nonce_preview(
1182 &taker_submit_signer,
1183 rfq_cmd.taker_nonce,
1184 timestamp,
1185 &mut nonce_preview,
1186 )?;
1187 self.validate_nonce_preview(
1188 &taker_accept_signer,
1189 rfq_cmd.taker_accept_nonce,
1190 timestamp,
1191 &mut nonce_preview,
1192 )?;
1193
1194 match self.plan_rfq_execution(&rfq_cmd) {
1195 Ok(plan) => {
1196 self.validate_and_advance_nonce(
1197 &taker_submit_signer,
1198 rfq_cmd.taker_nonce,
1199 timestamp,
1200 )?;
1201 self.validate_and_advance_nonce(
1202 &taker_accept_signer,
1203 rfq_cmd.taker_accept_nonce,
1204 timestamp,
1205 )?;
1206 output.rfq_plan = Some(Ok(crate::rsm::apply::RfqPlanOutput {
1207 fill_id: plan.fill_id,
1208 mmp_updates: plan.mmp_updates,
1209 }));
1210 for event in self.account_for_events(plan.events, &mut output) {
1211 output.push(event);
1212 }
1213 }
1214 Err(failure) => {
1215 output.rfq_plan = Some(Err(failure));
1216 }
1217 }
1218 }
1219 EngineCommand::TradingModeUpdate { modes, .. } => {
1220 self.apply_underlying_trading_mode_update(&modes);
1221 }
1222 EngineCommand::DepositUpdate {
1223 wallet,
1224 amount,
1225 timestamp_ms,
1226 sequence,
1227 source_event_hash,
1228 } => {
1229 self.apply_deposit_update_to_balance_ledger(
1230 "DepositUpdate",
1231 wallet,
1232 amount,
1233 timestamp_ms,
1234 sequence,
1235 &source_event_hash,
1236 &mut output,
1237 )?;
1238 }
1239 EngineCommand::OptionDepositUpdate {
1240 request_id,
1241 wallet,
1242 symbol,
1243 quantity,
1244 timestamp_ms,
1245 } => {
1246 self.apply_option_deposit_update(
1247 request_id,
1248 wallet,
1249 symbol,
1250 quantity,
1251 timestamp_ms,
1252 )?;
1253 }
1254 EngineCommand::OptionWithdrawalUpdate {
1255 request_id,
1256 wallet,
1257 account,
1258 signer,
1259 rsm_signer,
1260 symbol,
1261 quantity,
1262 nonce,
1263 action,
1264 timestamp_ms,
1265 } => {
1266 self.apply_option_withdrawal_update(
1267 request_id,
1268 wallet,
1269 account,
1270 signer,
1271 rsm_signer,
1272 symbol,
1273 quantity,
1274 nonce,
1275 action,
1276 timestamp_ms,
1277 &mut output,
1278 )?;
1279 }
1280 EngineCommand::CashWithdrawalUpdate {
1281 request_id,
1282 wallet,
1283 account,
1284 destination,
1285 signer,
1286 rsm_signer,
1287 amount,
1288 amount_wei,
1289 nonce,
1290 timestamp_ms,
1291 } => {
1292 self.apply_cash_withdrawal_update(
1293 request_id,
1294 wallet,
1295 destination,
1296 signer,
1297 rsm_signer,
1298 amount,
1299 amount_wei,
1300 account,
1301 nonce,
1302 timestamp_ms,
1303 &mut output,
1304 )?;
1305 }
1306 EngineCommand::LiquidationBonusUpdate {
1307 wallet,
1308 amount,
1309 balance_after,
1310 timestamp_ms,
1311 sequence,
1312 } => {
1313 self.apply_balance_snapshot_update_to_balance_ledger(
1314 "LiquidationBonusUpdate",
1315 wallet,
1316 amount,
1317 balance_after,
1318 timestamp_ms,
1319 sequence,
1320 &mut output,
1321 )?;
1322 }
1323 EngineCommand::ApproveAgent {
1324 wallet,
1325 agent,
1326 expires_at_ms,
1327 nonce,
1328 ..
1329 } => {
1330 if nonce.is_none() {
1331 return Err(EngineError::Rejected(
1332 "ApproveAgent requires a signed nonce".to_string(),
1333 ));
1334 }
1335 const MAX_AGENTS_PER_WALLET: usize = 50;
1336 let agent_count = self
1337 .ctx
1338 .agent_authorizations
1339 .get(&wallet)
1340 .map(|m| m.len())
1341 .unwrap_or(0);
1342 if agent_count >= MAX_AGENTS_PER_WALLET
1343 && !self
1344 .ctx
1345 .agent_authorizations
1346 .get(&wallet)
1347 .map(|m| m.contains_key(&agent))
1348 .unwrap_or(false)
1349 {
1350 return Err(EngineError::Rejected(format!(
1351 "wallet {} already has {} authorized agents (max {})",
1352 wallet, agent_count, MAX_AGENTS_PER_WALLET
1353 )));
1354 }
1355 self.validate_and_advance_nonce(&wallet, nonce, timestamp)?;
1356 self.ctx
1357 .agent_authorizations
1358 .entry(wallet)
1359 .or_default()
1360 .insert(agent, expires_at_ms);
1361 debug!(
1362 wallet = %wallet,
1363 agent = %agent,
1364 expires_at_ms = ?expires_at_ms,
1365 "Applied ApproveAgent command"
1366 );
1367 }
1368 EngineCommand::RevokeAgent {
1369 wallet,
1370 agent,
1371 nonce,
1372 ..
1373 } => {
1374 if nonce.is_none() {
1375 return Err(EngineError::Rejected(
1376 "RevokeAgent requires a signed nonce".to_string(),
1377 ));
1378 }
1379 self.validate_and_advance_nonce(&wallet, nonce, timestamp)?;
1380 if let Some(map) = self.ctx.agent_authorizations.get_mut(&wallet) {
1381 map.remove(&agent);
1382 if map.is_empty() {
1383 self.ctx.agent_authorizations.remove(&wallet);
1384 }
1385 }
1386 debug!(
1387 wallet = %wallet,
1388 agent = %agent,
1389 "Applied RevokeAgent command"
1390 );
1391 }
1392 EngineCommand::NonceAdvance { wallet, nonce, .. } => {
1393 self.validate_and_advance_nonce(&wallet, Some(nonce), timestamp)?;
1394 debug!(
1395 wallet = %wallet,
1396 nonce = nonce,
1397 "Applied NonceAdvance command"
1398 );
1399 }
1400 }
1401
1402 Ok(output)
1403 }
1404
1405 pub(super) fn account_for_fill(
1406 &mut self,
1407 fill: hypercall_types::Fill,
1408 output: &mut ApplyOutput,
1409 ) -> EngineMessage {
1410 if !hypercall_types::utils::is_option_symbol(&fill.symbol) {
1411 use crate::rsm::engine_deps::apply_fill_to_positions;
1412
1413 let human_size = hypercall_types::to_human_readable_decimal(&fill.symbol, fill.size);
1414 let taker_signed_qty = match fill.taker_side {
1415 hypercall_types::Side::Buy => human_size,
1416 hypercall_types::Side::Sell => -human_size,
1417 };
1418
1419 apply_fill_to_positions(
1420 &mut self.ctx.engine_positions,
1421 fill.taker_wallet_address,
1422 fill.symbol.clone(),
1423 taker_signed_qty,
1424 fill.price,
1425 );
1426 apply_fill_to_positions(
1427 &mut self.ctx.engine_positions,
1428 fill.maker_wallet_address,
1429 fill.symbol.clone(),
1430 -taker_signed_qty,
1431 fill.price,
1432 );
1433
1434 return EngineMessage::OrderFilled {
1435 accounting: hypercall_engine::FillAccounting::zero(fill.trade_id),
1436 fill,
1437 };
1438 }
1439
1440 let accounting =
1441 hypercall_engine::apply_fill_position_accounting(&mut self.ctx.engine_positions, &fill);
1442 let reference_id = Some(fill.trade_id.to_string());
1443 let taker_balance_after = self.ctx.balance_ledger.balance(&fill.taker_wallet_address)
1444 + accounting.taker_net_cash_delta;
1445 let taker_update = self.build_balance_update(
1446 fill.taker_wallet_address,
1447 accounting.taker_net_cash_delta,
1448 taker_balance_after,
1449 hypercall_types::BalanceUpdateReason::OptionFillPremium,
1450 reference_id.clone(),
1451 fill.timestamp,
1452 );
1453 self.apply_balance_update(taker_update, output)
1454 .expect("CRITICAL: option fill taker balance update must apply");
1455
1456 let maker_balance_after = self.ctx.balance_ledger.balance(&fill.maker_wallet_address)
1457 + accounting.maker_net_cash_delta;
1458 let maker_update = self.build_balance_update(
1459 fill.maker_wallet_address,
1460 accounting.maker_net_cash_delta,
1461 maker_balance_after,
1462 hypercall_types::BalanceUpdateReason::OptionFillPremium,
1463 reference_id,
1464 fill.timestamp,
1465 );
1466 self.apply_balance_update(maker_update, output)
1467 .expect("CRITICAL: option fill maker balance update must apply");
1468 debug!(
1469 trade_id = fill.trade_id,
1470 symbol = %fill.symbol,
1471 "Applied option fill cashflow to balance_ledger"
1472 );
1473
1474 let mut enriched_fill = fill;
1475 self.attach_fill_underlying_notional(&mut enriched_fill);
1476 enriched_fill.taker_realized_pnl = Some(accounting.taker_realized_pnl);
1477 enriched_fill.maker_realized_pnl = Some(accounting.maker_realized_pnl);
1478
1479 EngineMessage::OrderFilled {
1480 fill: enriched_fill,
1481 accounting,
1482 }
1483 }
1484
1485 pub(super) fn account_for_events(
1486 &mut self,
1487 events: Vec<EngineMessage>,
1488 output: &mut ApplyOutput,
1489 ) -> Vec<EngineMessage> {
1490 events
1491 .into_iter()
1492 .map(|event| match event {
1493 EngineMessage::OrderFilled { fill, .. } => self.account_for_fill(fill, output),
1494 other => other,
1495 })
1496 .collect()
1497 }
1498
1499 pub(super) fn drain_orderbook_events(&mut self) -> Vec<EngineMessage> {
1500 let mut events = Vec::new();
1501 let spot_prices = self.ctx.spot_prices.clone();
1502 let reference_prices = self.ctx.deps.reference_prices.clone();
1503 for orderbook in self.ctx.orderbooks.values_mut() {
1504 for ob_event in orderbook.drain_events() {
1505 match ob_event {
1506 hypercall_engine::OrderBookEvent::OrderFilled(mut fill) => {
1507 fill.underlying_notional = fill_metadata::resolve_fill_underlying_notional(
1508 &fill,
1509 &spot_prices,
1510 &reference_prices,
1511 );
1512 events.push(EngineMessage::OrderFilled {
1513 accounting: hypercall_engine::FillAccounting::zero(fill.trade_id),
1514 fill,
1515 });
1516 }
1517 hypercall_engine::OrderBookEvent::Trade(trade_msg) => {
1518 events.push(EngineMessage::Trade(trade_msg));
1519 }
1520 hypercall_engine::OrderBookEvent::L2Update(l2_msg) => {
1521 events.push(EngineMessage::L2Update(l2_msg));
1522 }
1523 hypercall_engine::OrderBookEvent::OrderbookUpdated(update) => {
1524 events.push(EngineMessage::OrderbookUpdated(update));
1525 }
1526 }
1527 }
1528 }
1529 events
1530 }
1531
1532 pub(super) fn process_expiry_tick_collecting(
1534 &mut self,
1535 now_ms: u64,
1536 context: crate::rsm::apply::TickExpiryContext,
1537 output: &mut ApplyOutput,
1538 response_market_symbol: Option<&str>,
1539 ) -> Result<Option<MarketUpdateMessage>, EngineError> {
1540 let first_new_event = output.events.len();
1541 self.expiry_manager
1542 .apply_tick_expiry(now_ms, context, &mut self.ctx, &self.margin_manager, output)
1543 .map_err(EngineError::Internal)?;
1544
1545 Ok(output.events[first_new_event..]
1546 .iter()
1547 .find_map(|event| match event {
1548 EngineMessage::MarketUpdate(update)
1549 if update.status == MarketUpdateStatus::MarketExpired
1550 && response_market_symbol
1551 .map(|symbol| update.market.symbol == symbol)
1552 .unwrap_or(true) =>
1553 {
1554 Some(update.clone())
1555 }
1556 _ => None,
1557 }))
1558 }
1559}