Skip to main content

hypercall/rsm/unified_engine/
apply_interface.rs

1//! Apply/state-machine interface for UnifiedEngine.
2
3use super::*;
4
5// =============================================================================
6// State Machine Interface (apply)
7// =============================================================================
8
9use crate::rsm::apply::{ApplyOutput, CommandEnvelope, EngineCommand};
10
11/// Error type for apply operations
12#[derive(Debug, Clone)]
13pub enum EngineError {
14    /// Order was rejected (with reason)
15    Rejected(String),
16    /// Internal error during processing
17    Internal(String),
18    /// Market operation error
19    Market(String),
20}
21
22impl std::fmt::Display for EngineError {
23    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
24        match self {
25            EngineError::Rejected(reason) => write!(f, "Rejected: {}", reason),
26            EngineError::Internal(reason) => write!(f, "Internal error: {}", reason),
27            EngineError::Market(reason) => write!(f, "Market error: {}", reason),
28        }
29    }
30}
31
32impl std::error::Error for EngineError {}
33
34impl UnifiedEngine {
35    fn validate_nonce_preview(
36        &self,
37        signer: &hypercall_types::WalletAddress,
38        nonce: Option<u64>,
39        command_timestamp_ms: u64,
40        preview_sets: &mut std::collections::HashMap<
41            hypercall_types::WalletAddress,
42            hypercall_engine::BoundedNonceSet,
43        >,
44    ) -> Result<(), EngineError> {
45        let Some(nonce) = nonce else {
46            return Ok(());
47        };
48        if !hypercall_engine::nonce_within_time_bounds(nonce, command_timestamp_ms) {
49            return Err(EngineError::Rejected(format!(
50                "nonce {} is outside time bounds for signer {} (command_ts={})",
51                nonce, signer, command_timestamp_ms
52            )));
53        }
54        let set = preview_sets.entry(*signer).or_insert_with(|| {
55            let mut set = self.ctx.nonce_sets.get(signer).cloned().unwrap_or_else(|| {
56                hypercall_engine::BoundedNonceSet::new(
57                    hypercall_engine::nonce::DEFAULT_NONCE_SET_CAPACITY,
58                )
59            });
60            set.purge_expired(command_timestamp_ms);
61            set
62        });
63        if !set.insert(nonce) {
64            return Err(EngineError::Rejected(format!(
65                "nonce {} rejected for signer {} (duplicate or below set minimum {})",
66                nonce,
67                signer,
68                set.min().map(|m| m.to_string()).unwrap_or_default()
69            )));
70        }
71        Ok(())
72    }
73
74    /// Validate a nonce against the per-signer bounded nonce set (HL model).
75    /// The nonce must be within the time bounds of the command timestamp,
76    /// not already used, and greater than the smallest nonce in the set.
77    /// Skips validation when `nonce` is `None` (engine-internal commands).
78    fn validate_and_advance_nonce(
79        &mut self,
80        signer: &hypercall_types::WalletAddress,
81        nonce: Option<u64>,
82        command_timestamp_ms: u64,
83    ) -> Result<(), EngineError> {
84        let Some(nonce) = nonce else {
85            return Ok(());
86        };
87        if !hypercall_engine::nonce_within_time_bounds(nonce, command_timestamp_ms) {
88            return Err(EngineError::Rejected(format!(
89                "nonce {} is outside time bounds for signer {} (command_ts={})",
90                nonce, signer, command_timestamp_ms
91            )));
92        }
93        let set = self.ctx.nonce_sets.entry(*signer).or_insert_with(|| {
94            hypercall_engine::BoundedNonceSet::new(
95                hypercall_engine::nonce::DEFAULT_NONCE_SET_CAPACITY,
96            )
97        });
98        set.purge_expired(command_timestamp_ms);
99        if !set.insert(nonce) {
100            return Err(EngineError::Rejected(format!(
101                "nonce {} rejected for signer {} (duplicate or below set minimum {})",
102                nonce,
103                signer,
104                set.min().map(|m| m.to_string()).unwrap_or_default()
105            )));
106        }
107        Ok(())
108    }
109
110    fn allocate_rsm_nonce(&mut self, signer: hypercall_types::WalletAddress) -> u64 {
111        let next = self.ctx.rsm_signer_nonces.entry(signer).or_insert(0);
112        let allocated = *next;
113        *next = next.saturating_add(1);
114        allocated
115    }
116
117    pub fn seed_rsm_signer_nonce_floor(
118        &mut self,
119        signer: hypercall_types::WalletAddress,
120        next_nonce: u64,
121    ) {
122        let current = self.ctx.rsm_signer_nonces.entry(signer).or_insert(0);
123        if *current < next_nonce {
124            *current = next_nonce;
125        }
126    }
127
128    fn validate_cash_withdrawal_margin(
129        &self,
130        wallet: hypercall_types::WalletAddress,
131        amount: rust_decimal::Decimal,
132    ) -> Result<(), EngineError> {
133        if matches!(
134            self.ctx.deps.liquidation_states.get(&wallet),
135            Some(hypercall_types::LiquidationStateType::PreLiquidation)
136                | Some(hypercall_types::LiquidationStateType::InLiquidation)
137        ) {
138            return Err(EngineError::Rejected(
139                "withdrawals are disabled while the account is in liquidation".to_string(),
140            ));
141        }
142
143        let margin_mode = self
144            .margin_manager
145            .get_margin_mode(&self.ctx.deps, &wallet)
146            .map_err(|error| {
147                EngineError::Internal(format!(
148                    "failed to validate cash withdrawal margin for {}: {}",
149                    wallet, error
150                ))
151            })?;
152
153        match margin_mode {
154            crate::rsm::MarginMode::Standard => {
155                let mut simulated_balances = self.ctx.balance_ledger.clone();
156                let post_balance = simulated_balances
157                    .get(&wallet)
158                    .copied()
159                    .unwrap_or(rust_decimal::Decimal::ZERO)
160                    - amount;
161                simulated_balances.insert(wallet, post_balance);
162                let account =
163                    crate::standard_margin::StandardAccountBuilder::build_from_engine_state(
164                        &wallet,
165                        &simulated_balances,
166                        &self.ctx.engine_positions,
167                        &self.ctx.deps.reference_prices,
168                    )
169                    .map_err(|error| {
170                        EngineError::Internal(format!(
171                            "failed to build standard margin account for cash withdrawal {}: {}",
172                            wallet, error
173                        ))
174                    })?;
175                let margin = self
176                    .margin_manager
177                    .standard_margin_service
178                    .compute_margin(&account);
179                if margin.maintenance_margin < rust_decimal::Decimal::ZERO {
180                    return Err(EngineError::Rejected(format!(
181                        "withdrawal would put account below maintenance margin: post_withdraw_equity={}, maintenance_margin_shortfall={}",
182                        margin.equity,
183                        -margin.maintenance_margin
184                    )));
185                }
186            }
187            crate::rsm::MarginMode::Portfolio => {
188                let margin = self
189                    .margin_manager
190                    .get_span_margin_for_wallet(
191                        &self.ctx.deps,
192                        &self.ctx.engine_positions,
193                        &self.ctx.balance_ledger,
194                        &wallet,
195                    )
196                    .map_err(|error| {
197                        EngineError::Internal(format!(
198                            "failed to compute portfolio margin for cash withdrawal {}: {}",
199                            wallet, error
200                        ))
201                    })?
202                    .ok_or_else(|| {
203                        EngineError::Internal(format!(
204                            "missing portfolio margin details for cash withdrawal {}",
205                            wallet
206                        ))
207                    })?;
208                let post_withdraw_equity = margin.equity - amount;
209                if post_withdraw_equity < margin.maintenance_margin_required {
210                    return Err(EngineError::Rejected(format!(
211                        "withdrawal would put account below maintenance margin: post_withdraw_equity={}, maintenance_required={}",
212                        post_withdraw_equity, margin.maintenance_margin_required
213                    )));
214                }
215            }
216        }
217
218        Ok(())
219    }
220
221    fn validate_option_withdrawal_margin(
222        &self,
223        wallet: hypercall_types::WalletAddress,
224        symbol: &str,
225        quantity: rust_decimal::Decimal,
226    ) -> Result<(), EngineError> {
227        if matches!(
228            self.ctx.deps.liquidation_states.get(&wallet),
229            Some(hypercall_types::LiquidationStateType::PreLiquidation)
230                | Some(hypercall_types::LiquidationStateType::InLiquidation)
231        ) {
232            return Err(EngineError::Rejected(
233                "withdrawals are disabled while the account is in liquidation".to_string(),
234            ));
235        }
236
237        let margin_mode = self
238            .margin_manager
239            .get_margin_mode(&self.ctx.deps, &wallet)
240            .map_err(|error| {
241                EngineError::Internal(format!(
242                    "failed to validate option withdrawal margin for {}: {}",
243                    wallet, error
244                ))
245            })?;
246
247        let mut simulated_positions = self.ctx.engine_positions.clone();
248        crate::rsm::engine_deps::apply_option_withdrawal_to_positions(
249            &mut simulated_positions,
250            wallet,
251            symbol.to_string(),
252            quantity,
253        )
254        .map_err(EngineError::Rejected)?;
255
256        match margin_mode {
257            crate::rsm::MarginMode::Standard => {
258                let account =
259                    crate::standard_margin::StandardAccountBuilder::build_from_engine_state(
260                        &wallet,
261                        &self.ctx.balance_ledger,
262                        &simulated_positions,
263                        &self.ctx.deps.reference_prices,
264                    )
265                    .map_err(|error| {
266                        EngineError::Internal(format!(
267                            "failed to build standard margin account for option withdrawal {}: {}",
268                            wallet, error
269                        ))
270                    })?;
271                let margin = self
272                    .margin_manager
273                    .standard_margin_service
274                    .compute_margin(&account);
275                if margin.maintenance_margin < rust_decimal::Decimal::ZERO {
276                    return Err(EngineError::Rejected(format!(
277                        "option withdrawal would put account below maintenance margin: equity={}, maintenance_margin_shortfall={}",
278                        margin.equity,
279                        -margin.maintenance_margin
280                    )));
281                }
282            }
283            crate::rsm::MarginMode::Portfolio => {
284                let margin = self
285                    .margin_manager
286                    .get_span_margin_for_wallet(
287                        &self.ctx.deps,
288                        &simulated_positions,
289                        &self.ctx.balance_ledger,
290                        &wallet,
291                    )
292                    .map_err(|error| {
293                        EngineError::Internal(format!(
294                            "failed to compute portfolio margin for option withdrawal {}: {}",
295                            wallet, error
296                        ))
297                    })?
298                    .ok_or_else(|| {
299                        EngineError::Internal(format!(
300                            "missing portfolio margin details for option withdrawal {}",
301                            wallet
302                        ))
303                    })?;
304                if margin.equity < margin.maintenance_margin_required {
305                    return Err(EngineError::Rejected(format!(
306                        "option withdrawal would put account below maintenance margin: equity={}, maintenance_required={}",
307                        margin.equity, margin.maintenance_margin_required
308                    )));
309                }
310            }
311        }
312
313        Ok(())
314    }
315
316    pub(super) fn apply_deposit_update_to_balance_ledger(
317        &mut self,
318        update_kind: &'static str,
319        wallet: hypercall_types::WalletAddress,
320        amount: rust_decimal::Decimal,
321        timestamp_ms: u64,
322        sequence: Option<u64>,
323        source_event_hash: &alloy::primitives::FixedBytes<32>,
324        output: &mut ApplyOutput,
325    ) -> Result<(), EngineError> {
326        if sequence.is_none() {
327            return Err(EngineError::Internal(format!(
328                "{} for {} missing durable sequence",
329                update_kind, wallet
330            )));
331        }
332        if source_event_hash.is_zero() {
333            return Err(EngineError::Internal(format!(
334                "{} for {} missing source_event_hash",
335                update_kind, wallet
336            )));
337        }
338
339        // Cash DepositUpdate commands are additive deltas. Deduplicate by the
340        // deterministic source event hash, not by the per-wallet watermark. A
341        // lower sequence can be a still-unseen delta that arrived after a later
342        // one, and dropping it would under-credit engine cash.
343        //
344        // This is an explicit short-term tradeoff. Persisting every applied
345        // source hash in EngineStateSnapshot is unbounded, but it avoids silent
346        // financial corruption now. Replace this with a bounded per-wallet cash
347        // sequencer: applied_through + a small pending gap buffer, contiguous
348        // application, and fail-closed behavior when the pending window is
349        // exceeded. Do not replace it with an unjournaled projection repair.
350        if self
351            .ctx
352            .applied_deposit_source_event_hashes
353            .contains(source_event_hash)
354        {
355            let current_balance = self.ctx.balance_ledger.balance(&wallet);
356            warn!(
357                wallet = %wallet,
358                amount = %amount,
359                current_balance = %current_balance,
360                timestamp_ms = timestamp_ms,
361                sequence = ?sequence,
362                update_kind = update_kind,
363                source_event_hash = %source_event_hash,
364                "Skipping duplicate balance update command"
365            );
366            return Ok(());
367        }
368
369        if let Some(last) = self.last_deposit_update(&wallet) {
370            let stale_update = match last.source {
371                crate::rsm::engine_deps::BalanceLedgerMutationSource::DepositUpdate => {
372                    match (sequence, last.sequence) {
373                        // A ledger sequence identifies one durable cash event.
374                        // This also covers legacy replay rows whose synthetic
375                        // source hash differs from a later live retry's real
376                        // event hash.
377                        (Some(sequence), Some(last_sequence)) if sequence == last_sequence => true,
378                        // Cash deposits are deltas and source_event_hash has
379                        // already ruled out duplicates. Do not reject a lower
380                        // sequence here; it may be an out-of-order but still
381                        // unapplied cash delta.
382                        (Some(_), Some(_)) => false,
383                        // New typed deposits must carry sequence, but tolerate
384                        // old replay state that had no watermark by accepting
385                        // the first sequenced row and letting it advance state.
386                        (Some(_), None) => false,
387                        // Do not let an unsequenced update overwrite state that
388                        // has already seen sequenced cash evidence.
389                        (None, Some(_)) => true,
390                        // Last-resort legacy path. This must only run when
391                        // neither side has durable ordering evidence.
392                        (None, None) => timestamp_ms <= last.timestamp_ms,
393                    }
394                }
395                crate::rsm::engine_deps::BalanceLedgerMutationSource::CashWithdrawal => {
396                    // CashWithdrawal watermarks preserve the last observed deposit
397                    // sequence; they are not themselves deposit ledger events.
398                    // Even a lower sequence may be a valid catch-up delta after
399                    // the withdrawal; duplicates were already rejected by
400                    // source_event_hash.
401                    match (sequence, last.sequence) {
402                        // Same sequence is the same durable event even if
403                        // legacy replay used a synthetic source hash before a
404                        // live retry supplied the real event hash.
405                        (Some(sequence), Some(last_sequence)) if sequence == last_sequence => true,
406                        // Source_event_hash is the idempotency key. Apply as a
407                        // delta, not as a snapshot restore.
408                        (Some(_), Some(_)) => false,
409                        // Unsequenced cash evidence cannot safely follow a
410                        // withdrawal watermark with a known deposit sequence.
411                        (None, Some(_)) => true,
412                        // Only legacy/unsequenced states fall back to time.
413                        _ => timestamp_ms <= last.timestamp_ms,
414                    }
415                }
416            };
417            if stale_update {
418                let current_balance = self.ctx.balance_ledger.balance(&wallet);
419                warn!(
420                    wallet = %wallet,
421                    amount = %amount,
422                    current_balance = %current_balance,
423                    timestamp_ms = timestamp_ms,
424                    sequence = ?sequence,
425                    last_timestamp_ms = last.timestamp_ms,
426                    last_sequence = ?last.sequence,
427                    last_balance_after = %last.balance_after,
428                    update_kind = update_kind,
429                    "Skipping stale balance update command"
430                );
431                return Ok(());
432            }
433        }
434
435        // Cash deposits are deltas. Never restore from a projection snapshot
436        // here, because a queued older deposit can arrive after an engine-owned
437        // withdrawal and must not overwrite the post-withdrawal balance.
438        let previous_balance = self.ctx.balance_ledger.balance(&wallet);
439        let applied_balance_after = previous_balance + amount;
440        let update = self.build_balance_update(
441            wallet,
442            amount,
443            applied_balance_after,
444            hypercall_types::BalanceUpdateReason::Deposit,
445            sequence.map(|seq| seq.to_string()),
446            timestamp_ms,
447        );
448        self.apply_balance_update(update, output)?;
449        self.ctx
450            .applied_deposit_source_event_hashes
451            .insert(*source_event_hash);
452        let watermark_sequence = match (
453            sequence,
454            self.last_deposit_update(&wallet)
455                .and_then(|last| last.sequence),
456        ) {
457            (Some(sequence), Some(last_sequence)) => Some(sequence.max(last_sequence)),
458            (Some(sequence), None) => Some(sequence),
459            (None, Some(last_sequence)) => Some(last_sequence),
460            (None, None) => None,
461        };
462        let watermark_timestamp_ms = self
463            .last_deposit_update(&wallet)
464            .map(|last| timestamp_ms.max(last.timestamp_ms))
465            .unwrap_or(timestamp_ms);
466        self.ctx.deposit_update_watermarks.insert(
467            wallet,
468            crate::rsm::engine_deps::DepositUpdateWatermark {
469                sequence: watermark_sequence,
470                timestamp_ms: watermark_timestamp_ms,
471                balance_after: applied_balance_after,
472                source: crate::rsm::engine_deps::BalanceLedgerMutationSource::DepositUpdate,
473            },
474        );
475        debug!(
476            wallet = %wallet,
477            amount = %amount,
478            previous_balance = %previous_balance,
479            applied_balance_after = %applied_balance_after,
480            timestamp_ms = timestamp_ms,
481            sequence = ?sequence,
482            update_kind = update_kind,
483            source_event_hash = %source_event_hash,
484            "Applied balance update command"
485        );
486        Ok(())
487    }
488
489    pub(super) fn apply_balance_snapshot_update_to_balance_ledger(
490        &mut self,
491        update_kind: &'static str,
492        wallet: hypercall_types::WalletAddress,
493        amount: rust_decimal::Decimal,
494        balance_after: rust_decimal::Decimal,
495        timestamp_ms: u64,
496        sequence: Option<u64>,
497        output: &mut ApplyOutput,
498    ) -> Result<(), EngineError> {
499        if let Some(last) = self.last_deposit_update(&wallet) {
500            // Snapshot-style commands still carry absolute balance_after. These
501            // are separate from cash DepositUpdate deltas and need stricter
502            // ordering around withdrawals because applying a stale snapshot can
503            // restore cash that the engine already debited.
504            let stale_update = match last.source {
505                crate::rsm::engine_deps::BalanceLedgerMutationSource::DepositUpdate => {
506                    match (sequence, last.sequence) {
507                        // Same source domain: both values are durable event ids.
508                        (Some(sequence), Some(last_sequence)) => sequence <= last_sequence,
509                        (Some(_), None) => false,
510                        (None, Some(_)) => true,
511                        (None, None) => {
512                            // Without durable ordering, only suppress snapshots
513                            // that are same-timestamp no-ops or reversals. A
514                            // later snapshot may still be the only available
515                            // balance_after evidence for this legacy path.
516                            timestamp_ms == last.timestamp_ms
517                                && ((amount >= rust_decimal::Decimal::ZERO
518                                    && balance_after <= last.balance_after)
519                                    || (amount < rust_decimal::Decimal::ZERO
520                                        && balance_after >= last.balance_after))
521                        }
522                    }
523                }
524                crate::rsm::engine_deps::BalanceLedgerMutationSource::CashWithdrawal => {
525                    // After a withdrawal, wall-clock ordering is the only safe
526                    // ordering domain for absolute snapshots. The preserved
527                    // sequence belongs to the previous deposit watermark, not to
528                    // the withdrawal debit itself.
529                    if timestamp_ms <= last.timestamp_ms {
530                        // Absolute snapshots at or before the withdrawal can
531                        // restore pre-withdrawal cash, so reject them even if
532                        // their sequence looks higher in another source domain.
533                        true
534                    } else {
535                        match (sequence, last.sequence) {
536                            // Only compare sequences after the snapshot is known
537                            // to be newer than the withdrawal timestamp.
538                            (Some(sequence), Some(last_sequence)) => sequence <= last_sequence,
539                            (Some(_), None) => false,
540                            (None, Some(_)) => true,
541                            (None, None) => false,
542                        }
543                    }
544                }
545            };
546            if stale_update {
547                let current_balance = self.ctx.balance_ledger.balance(&wallet);
548                warn!(
549                    wallet = %wallet,
550                    amount = %amount,
551                    current_balance = %current_balance,
552                    balance_after = %balance_after,
553                    timestamp_ms = timestamp_ms,
554                    sequence = ?sequence,
555                    last_timestamp_ms = last.timestamp_ms,
556                    last_sequence = ?last.sequence,
557                    last_balance_after = %last.balance_after,
558                    update_kind = update_kind,
559                    "Skipping stale balance snapshot command"
560                );
561                return Ok(());
562            }
563        }
564
565        let previous_balance = self.ctx.balance_ledger.balance(&wallet);
566        let reason = match update_kind {
567            "LiquidationBonusUpdate" => hypercall_types::BalanceUpdateReason::LiquidationBonus,
568            _ if amount < rust_decimal::Decimal::ZERO => {
569                hypercall_types::BalanceUpdateReason::Withdrawal
570            }
571            _ => hypercall_types::BalanceUpdateReason::Deposit,
572        };
573        let update = self.build_balance_update(
574            wallet,
575            amount,
576            balance_after,
577            reason,
578            sequence.map(|seq| seq.to_string()),
579            timestamp_ms,
580        );
581        self.apply_balance_update(update, output)?;
582        self.ctx.deposit_update_watermarks.insert(
583            wallet,
584            crate::rsm::engine_deps::DepositUpdateWatermark {
585                sequence,
586                timestamp_ms,
587                balance_after,
588                source: crate::rsm::engine_deps::BalanceLedgerMutationSource::DepositUpdate,
589            },
590        );
591        debug!(
592            wallet = %wallet,
593            amount = %amount,
594            previous_balance = %previous_balance,
595            balance_after = %balance_after,
596            timestamp_ms = timestamp_ms,
597            sequence = ?sequence,
598            update_kind = update_kind,
599            "Applied balance snapshot command"
600        );
601        Ok(())
602    }
603
604    fn build_balance_update(
605        &self,
606        wallet: hypercall_types::WalletAddress,
607        delta: rust_decimal::Decimal,
608        balance_after: rust_decimal::Decimal,
609        reason: hypercall_types::BalanceUpdateReason,
610        reference_id: Option<String>,
611        timestamp_ms: u64,
612    ) -> hypercall_types::BalanceUpdate {
613        hypercall_types::BalanceUpdate {
614            balance_update_seq: self.ctx.balance_ledger.next_balance_update_seq(),
615            wallet,
616            delta,
617            balance_after,
618            reason,
619            reference_id,
620            source_command_id: None,
621            timestamp_ms,
622        }
623    }
624
625    fn apply_balance_update(
626        &mut self,
627        update: hypercall_types::BalanceUpdate,
628        output: &mut ApplyOutput,
629    ) -> Result<(), EngineError> {
630        let applied = self
631            .ctx
632            .balance_ledger
633            .apply_balance_update(&update)
634            .map_err(|error| EngineError::Internal(error.to_string()))?;
635        if !applied {
636            return Err(EngineError::Internal(format!(
637                "balance update seq {} for {} was not applied",
638                update.balance_update_seq, update.wallet
639            )));
640        }
641        output.balance_updates.push(update);
642        Ok(())
643    }
644
645    fn last_deposit_update(
646        &self,
647        wallet: &hypercall_types::WalletAddress,
648    ) -> Option<crate::rsm::engine_deps::DepositUpdateWatermark> {
649        self.ctx.deposit_update_watermarks.get(wallet).copied()
650    }
651
652    pub(super) fn apply_option_deposit_update(
653        &mut self,
654        request_id: String,
655        wallet: hypercall_types::WalletAddress,
656        symbol: String,
657        quantity: rust_decimal::Decimal,
658        timestamp_ms: u64,
659    ) -> Result<(), EngineError> {
660        if quantity <= rust_decimal::Decimal::ZERO {
661            return Err(EngineError::Internal(format!(
662                "option deposit {} for {} {} has non-positive quantity {}",
663                request_id, wallet, symbol, quantity
664            )));
665        }
666        let request_uuid = uuid::Uuid::parse_str(&request_id).map_err(|error| {
667            EngineError::Internal(format!(
668                "OptionDepositUpdate request_id {} is not a UUID: {}",
669                request_id, error
670            ))
671        })?;
672
673        crate::rsm::engine_deps::apply_option_deposit_to_positions(
674            &mut self.ctx.engine_positions,
675            wallet,
676            symbol.clone(),
677            quantity,
678        );
679        debug!(
680            wallet = %wallet,
681            symbol = %symbol,
682            quantity = %quantity,
683            timestamp_ms = timestamp_ms,
684            request_id = %request_uuid,
685            "Applied OptionDepositUpdate command"
686        );
687        Ok(())
688    }
689
690    pub(super) fn apply_option_withdrawal_update(
691        &mut self,
692        request_id: String,
693        wallet: hypercall_types::WalletAddress,
694        account: hypercall_types::WalletAddress,
695        signer: hypercall_types::WalletAddress,
696        rsm_signer: hypercall_types::WalletAddress,
697        symbol: String,
698        quantity: rust_decimal::Decimal,
699        nonce: Option<u64>,
700        action: Vec<u8>,
701        timestamp_ms: u64,
702        output: &mut ApplyOutput,
703    ) -> Result<(), EngineError> {
704        let request_uuid = uuid::Uuid::parse_str(&request_id).map_err(|error| {
705            EngineError::Internal(format!(
706                "OptionWithdrawalUpdate request_id {} is not a UUID: {}",
707                request_id, error
708            ))
709        })?;
710
711        if quantity <= rust_decimal::Decimal::ZERO {
712            return Err(EngineError::Rejected(
713                "option withdrawal quantity must be positive".to_string(),
714            ));
715        }
716        let current_quantity = self
717            .ctx
718            .engine_positions
719            .get(&(wallet, symbol.clone()))
720            .map(|position| position.quantity)
721            .unwrap_or_default();
722        if current_quantity < quantity {
723            return Err(EngineError::Rejected(format!(
724                "insufficient option balance for withdrawal: have {}, need {}",
725                current_quantity, quantity
726            )));
727        }
728        self.validate_option_withdrawal_margin(wallet, &symbol, quantity)?;
729        self.validate_and_advance_nonce(&signer, nonce, timestamp_ms)?;
730        let rsm_nonce = self.allocate_rsm_nonce(rsm_signer);
731        crate::rsm::engine_deps::apply_option_withdrawal_to_positions(
732            &mut self.ctx.engine_positions,
733            wallet,
734            symbol.clone(),
735            quantity,
736        )
737        .map_err(EngineError::Rejected)?;
738        output.outbox_appends.push(
739            crate::directive_outbox::DirectiveOutboxAppend::needs_rsm_signature(
740                request_uuid.to_string(),
741                hypercall_types::directives::ActionKey::SystemCreditOption,
742                wallet,
743                account,
744                rsm_signer,
745                rsm_nonce,
746                format!("rsm:{}:{}", rsm_signer, request_uuid),
747                action,
748                timestamp_ms,
749                Some(timestamp_ms.saturating_add(300_000)),
750            ),
751        );
752        debug!(
753            wallet = %wallet,
754            symbol = %symbol,
755            quantity = %quantity,
756            timestamp_ms = timestamp_ms,
757            request_id = %request_uuid,
758            "Applied OptionWithdrawalUpdate command"
759        );
760        Ok(())
761    }
762
763    pub(super) fn apply_cash_withdrawal_update(
764        &mut self,
765        request_id: String,
766        wallet: hypercall_types::WalletAddress,
767        destination: hypercall_types::WalletAddress,
768        signer: hypercall_types::WalletAddress,
769        rsm_signer: hypercall_types::WalletAddress,
770        amount: rust_decimal::Decimal,
771        amount_wei: u64,
772        account: hypercall_types::WalletAddress,
773        nonce: Option<u64>,
774        timestamp_ms: u64,
775        output: &mut ApplyOutput,
776    ) -> Result<rust_decimal::Decimal, EngineError> {
777        let request_uuid = uuid::Uuid::parse_str(&request_id).map_err(|error| {
778            EngineError::Internal(format!(
779                "CashWithdrawalUpdate request_id {} is not a UUID: {}",
780                request_id, error
781            ))
782        })?;
783        if amount <= rust_decimal::Decimal::ZERO {
784            return Err(EngineError::Rejected(
785                "cash withdrawal amount must be positive".to_string(),
786            ));
787        }
788        if amount_wei == 0 {
789            return Err(EngineError::Rejected(
790                "cash withdrawal amount_wei must be positive".to_string(),
791            ));
792        }
793        let current_balance = self.ctx.balance_ledger.balance(&wallet);
794        if current_balance < amount {
795            return Err(EngineError::Rejected(format!(
796                "insufficient USDC balance for withdrawal: available={}, requested={}",
797                current_balance, amount
798            )));
799        }
800        self.validate_cash_withdrawal_margin(wallet, amount)?;
801        let action = crate::rsm_credit_directive_producer::encode_system_withdraw_token_action(
802            crate::rsm_credit_directive_producer::SystemWithdrawTokenDirective {
803                destination,
804                sub_account: hypercall_types::WalletAddress::default(),
805                src_dex: 0,
806                dst_dex: 0,
807                token: 0,
808                amount_wei,
809            },
810        )
811        .map_err(|error| {
812            EngineError::Internal(format!("failed to encode withdrawal action: {}", error))
813        })?;
814        self.validate_and_advance_nonce(&signer, nonce, timestamp_ms)?;
815        let balance_after = current_balance - amount;
816        let update = self.build_balance_update(
817            wallet,
818            -amount,
819            balance_after,
820            hypercall_types::BalanceUpdateReason::Withdrawal,
821            Some(request_uuid.to_string()),
822            timestamp_ms,
823        );
824        self.apply_balance_update(update, output)?;
825        let last_deposit_sequence = self
826            .last_deposit_update(&wallet)
827            .and_then(|watermark| watermark.sequence);
828        self.ctx.deposit_update_watermarks.insert(
829            wallet,
830            crate::rsm::engine_deps::DepositUpdateWatermark {
831                sequence: last_deposit_sequence,
832                timestamp_ms,
833                balance_after,
834                source: crate::rsm::engine_deps::BalanceLedgerMutationSource::CashWithdrawal,
835            },
836        );
837        let rsm_nonce = self.allocate_rsm_nonce(rsm_signer);
838        output.outbox_appends.push(
839            crate::directive_outbox::DirectiveOutboxAppend::needs_rsm_signature(
840                request_uuid.to_string(),
841                hypercall_types::directives::ActionKey::SystemWithdrawToken,
842                wallet,
843                account,
844                rsm_signer,
845                rsm_nonce,
846                format!("rsm:{}:{}", rsm_signer, request_uuid),
847                action,
848                timestamp_ms,
849                Some(timestamp_ms.saturating_add(1_800_000)),
850            ),
851        );
852        debug!(
853            wallet = %wallet,
854            amount = %amount,
855            balance_after = %balance_after,
856            timestamp_ms = timestamp_ms,
857            request_id = %request_uuid,
858            "Applied CashWithdrawalUpdate command"
859        );
860        Ok(balance_after)
861    }
862
863    /// Apply a command to the engine state and return the resulting events.
864    ///
865    /// This is the core state machine interface that enables:
866    /// - Reasoning about correctness as a pure state transition
867    /// - Observing "input command -> output events" per request
868    /// - Testing determinism and replay
869    ///
870    /// # Contract
871    /// - `apply` must NOT publish to the event bus, write to DB, or mutate external caches
872    /// - It only mutates the engine's in-memory state and returns `Vec<EngineMessage>`
873    /// - The runtime is responsible for publishing events after apply returns
874    ///
875    /// # Event Ordering
876    /// Events are returned in the order they should be published. The current
877    /// implementation preserves the same ordering as the direct-send approach.
878    pub fn apply(&mut self, env: CommandEnvelope) -> Result<ApplyOutput, EngineError> {
879        let timestamp = env.received_ts_ms;
880        self.ctx.deps.margin_timestamp_s = (timestamp / 1000) as i64;
881        let mut output = ApplyOutput::with_capacity(8);
882
883        #[cfg(feature = "rsm-state")]
884        {
885            output.command_identity_hash = env.command.identity_hash();
886        }
887
888        match env.command {
889            EngineCommand::OrderAction(order_msg) => {
890                let signer = order_msg.api_wallet_address.unwrap_or(order_msg.wallet);
891                self.validate_and_advance_nonce(&signer, order_msg.info.nonce, timestamp)?;
892                self.apply_order_action(order_msg, timestamp, &mut output);
893            }
894            EngineCommand::MarketAction(market_cmd) => {
895                self.apply_market_action(market_cmd, timestamp, &mut output)?;
896            }
897            EngineCommand::LiquidationState(liq_msg) => {
898                self.ctx
899                    .deps
900                    .liquidation_states
901                    .insert(liq_msg.wallet, liq_msg.new_state);
902                output.push(EngineMessage::LiquidationStateChange(liq_msg));
903            }
904            EngineCommand::TickExpiry { now_ms, context } => {
905                // Process expiry tick
906                self.process_expiry_tick_collecting(now_ms, context, &mut output, None)?;
907            }
908            EngineCommand::TickSnapshot { now_ms: _ } => {
909                // Snapshot generation handled by existing emit_all_orderbook_snapshots
910                // For V1, we don't collect these events as they're high-volume
911                // This could be enhanced in a future version
912            }
913            EngineCommand::PriceUpdate {
914                underlying,
915                spot_price,
916                timestamp_ms,
917            } => {
918                use rust_decimal::prelude::ToPrimitive;
919                self.ctx.spot_prices.insert(underlying.clone(), spot_price);
920                if let Some(price_f64) = spot_price.to_f64() {
921                    self.ctx
922                        .deps
923                        .reference_prices
924                        .insert(underlying.clone(), price_f64);
925                }
926                debug!(
927                    underlying = %underlying,
928                    spot_price = %spot_price,
929                    timestamp_ms = timestamp_ms,
930                    "Applied PriceUpdate command"
931                );
932            }
933            EngineCommand::IvUpdate {
934                underlying,
935                surface,
936                timestamp_ms,
937                ..
938            } => {
939                self.ctx
940                    .iv_surfaces
941                    .insert(underlying.clone(), surface.clone());
942                self.engine_iv_surfaces
943                    .write()
944                    .expect("engine IV surfaces lock poisoned")
945                    .insert(underlying.clone(), surface);
946                debug!(
947                    underlying = %underlying,
948                    timestamp_ms = timestamp_ms,
949                    "Applied IvUpdate command"
950                );
951            }
952            EngineCommand::TierUpdate {
953                wallet,
954                margin_mode,
955                tier,
956                trading_limits,
957            } => {
958                self.ctx
959                    .deps
960                    .wallet_margin_modes
961                    .insert(wallet, margin_mode);
962                self.ctx.deps.wallet_tiers.insert(wallet, tier.clone());
963                self.ctx
964                    .deps
965                    .wallet_trading_limits
966                    .insert(wallet, trading_limits);
967                debug!(
968                    wallet = %wallet,
969                    margin_mode = ?margin_mode,
970                    tier = %tier,
971                    max_open_orders = trading_limits.max_open_orders,
972                    max_open_positions = trading_limits.max_open_positions,
973                    "Applied TierUpdate command"
974                );
975            }
976            EngineCommand::LegacyTierMarginModeUpdate {
977                wallet,
978                margin_mode,
979            } => {
980                self.ctx
981                    .deps
982                    .wallet_margin_modes
983                    .insert(wallet, margin_mode);
984                debug!(
985                    wallet = %wallet,
986                    margin_mode = ?margin_mode,
987                    "Applied legacy TierUpdate margin-mode command"
988                );
989            }
990            EngineCommand::HypercorePositionUpdate {
991                ref account,
992                ref coin,
993                size,
994                entry_price,
995                unrealized_pnl,
996                timestamp_ms,
997            } => {
998                let key = (account.to_lowercase(), coin.clone());
999                if size == 0.0 {
1000                    self.ctx.deps.perp_positions.remove(&key);
1001                } else {
1002                    self.ctx.deps.perp_positions.insert(
1003                        key,
1004                        crate::hypercore::PerpPosition {
1005                            coin: coin.clone(),
1006                            size,
1007                            entry_price: Some(entry_price),
1008                            position_value: size * entry_price,
1009                            unrealized_pnl,
1010                            margin_used: 0.0,
1011                            liquidation_price: None,
1012                        },
1013                    );
1014                }
1015                debug!(
1016                    account = %account,
1017                    coin = %coin,
1018                    size = size,
1019                    timestamp_ms = timestamp_ms,
1020                    "Applied HypercorePositionUpdate command"
1021                );
1022            }
1023            EngineCommand::HypercoreEquityUpdate {
1024                wallet,
1025                account_value,
1026                timestamp_ms,
1027            } => {
1028                let stale = self
1029                    .ctx
1030                    .deps
1031                    .hypercore_equity_timestamps
1032                    .get(&wallet)
1033                    .map(|&last_ts| timestamp_ms <= last_ts)
1034                    .unwrap_or(false);
1035                if stale {
1036                    debug!(
1037                        wallet = %wallet,
1038                        account_value = %account_value,
1039                        timestamp_ms = timestamp_ms,
1040                        "Skipping stale HypercoreEquityUpdate"
1041                    );
1042                } else {
1043                    self.ctx
1044                        .deps
1045                        .hypercore_account_equity
1046                        .insert(wallet, account_value);
1047                    self.ctx
1048                        .deps
1049                        .hypercore_equity_timestamps
1050                        .insert(wallet, timestamp_ms);
1051                    debug!(
1052                        wallet = %wallet,
1053                        account_value = %account_value,
1054                        timestamp_ms = timestamp_ms,
1055                        "Applied HypercoreEquityUpdate command"
1056                    );
1057                }
1058            }
1059            EngineCommand::SetPmSettlementPoolConfig(command) => {
1060                let effects = self
1061                    .ctx
1062                    .pm_settlement_state
1063                    .apply_set_config(command)
1064                    .map_err(EngineError::Rejected)?;
1065                output.pm_settlement_effects.extend(effects);
1066            }
1067            EngineCommand::RecordPmVaultDeposit(command) => {
1068                let effects = self
1069                    .ctx
1070                    .pm_settlement_state
1071                    .apply_record_vault_deposit(command)
1072                    .map_err(EngineError::Rejected)?;
1073                output.pm_settlement_effects.extend(effects);
1074            }
1075            EngineCommand::RequestPmVaultWithdrawal(command) => {
1076                let effects = self
1077                    .ctx
1078                    .pm_settlement_state
1079                    .apply_request_vault_withdrawal(command)
1080                    .map_err(EngineError::Rejected)?;
1081                output.pm_settlement_effects.extend(effects);
1082            }
1083            EngineCommand::AccruePmSettlementInterest(command) => {
1084                let effects = self
1085                    .ctx
1086                    .pm_settlement_state
1087                    .apply_accrue_interest(command)
1088                    .map_err(EngineError::Rejected)?;
1089                output.pm_settlement_effects.extend(effects);
1090            }
1091            EngineCommand::ApplyPmSettlementRepayment(command) => {
1092                let effects = self
1093                    .ctx
1094                    .pm_settlement_state
1095                    .apply_repayment(command)
1096                    .map_err(EngineError::Rejected)?;
1097                output.pm_settlement_effects.extend(effects);
1098            }
1099            EngineCommand::JournalPmRecoveryPlan(command) => {
1100                let effects = self
1101                    .ctx
1102                    .pm_settlement_state
1103                    .apply_journal_recovery_plan(command)
1104                    .unwrap_or_else(|error| {
1105                        panic!("RUNTIME_INVARIANT: replayed JournalPmRecoveryPlan failed: {error}")
1106                    });
1107                output.pm_settlement_effects.extend(effects);
1108            }
1109            EngineCommand::MarkPmRecoveryActionSubmitted(command) => {
1110                let effects = self
1111                    .ctx
1112                    .pm_settlement_state
1113                    .apply_mark_recovery_action_submitted(command)
1114                    .unwrap_or_else(|error| {
1115                        panic!(
1116                            "RUNTIME_INVARIANT: replayed MarkPmRecoveryActionSubmitted failed: {error}"
1117                        )
1118                    });
1119                output.pm_settlement_effects.extend(effects);
1120            }
1121            EngineCommand::ResolvePmRecoveryAction(command) => {
1122                let effects = self
1123                    .ctx
1124                    .pm_settlement_state
1125                    .apply_resolve_recovery_action(command)
1126                    .unwrap_or_else(|error| {
1127                        panic!(
1128                            "RUNTIME_INVARIANT: replayed ResolvePmRecoveryAction failed: {error}"
1129                        )
1130                    });
1131                output.pm_settlement_effects.extend(effects);
1132            }
1133            EngineCommand::MmpConfigUpdate {
1134                wallet,
1135                currency,
1136                enabled,
1137                interval_ms,
1138                frozen_time_ms,
1139                qty_limit,
1140                delta_limit,
1141                vega_limit,
1142            } => {
1143                self.ctx
1144                    .deps
1145                    .mmp_enabled
1146                    .insert((wallet, currency.clone()), enabled);
1147
1148                // Update or create engine-internal MMP state so the matching
1149                // loop can check MMP synchronously without async cache reads.
1150                let key = (wallet, currency.clone());
1151                let state = self.ctx.mmp_state.entry(key).or_insert_with(|| {
1152                    crate::rsm::engine_deps::EngineMmpState::new(
1153                        enabled,
1154                        interval_ms,
1155                        frozen_time_ms,
1156                        qty_limit,
1157                        delta_limit,
1158                        vega_limit,
1159                    )
1160                });
1161                // Always update config fields (the entry may already exist
1162                // from a previous MmpConfigUpdate).
1163                state.enabled = enabled;
1164                state.interval_ms = interval_ms;
1165                state.frozen_time_ms = frozen_time_ms;
1166                state.qty_limit = qty_limit;
1167                state.delta_limit = delta_limit;
1168                state.vega_limit = vega_limit;
1169
1170                debug!(
1171                    wallet = %wallet,
1172                    currency = %currency,
1173                    enabled = enabled,
1174                    "Applied MmpConfigUpdate command"
1175                );
1176            }
1177            EngineCommand::RfqExecute(rfq_cmd) => {
1178                let taker_submit_signer = rfq_cmd.taker_submit_signer();
1179                let taker_accept_signer = rfq_cmd.taker_accept_signer();
1180                let mut nonce_preview = std::collections::HashMap::new();
1181                self.validate_nonce_preview(
1182                    &taker_submit_signer,
1183                    rfq_cmd.taker_nonce,
1184                    timestamp,
1185                    &mut nonce_preview,
1186                )?;
1187                self.validate_nonce_preview(
1188                    &taker_accept_signer,
1189                    rfq_cmd.taker_accept_nonce,
1190                    timestamp,
1191                    &mut nonce_preview,
1192                )?;
1193
1194                match self.plan_rfq_execution(&rfq_cmd) {
1195                    Ok(plan) => {
1196                        self.validate_and_advance_nonce(
1197                            &taker_submit_signer,
1198                            rfq_cmd.taker_nonce,
1199                            timestamp,
1200                        )?;
1201                        self.validate_and_advance_nonce(
1202                            &taker_accept_signer,
1203                            rfq_cmd.taker_accept_nonce,
1204                            timestamp,
1205                        )?;
1206                        output.rfq_plan = Some(Ok(crate::rsm::apply::RfqPlanOutput {
1207                            fill_id: plan.fill_id,
1208                            mmp_updates: plan.mmp_updates,
1209                        }));
1210                        for event in self.account_for_events(plan.events, &mut output) {
1211                            output.push(event);
1212                        }
1213                    }
1214                    Err(failure) => {
1215                        output.rfq_plan = Some(Err(failure));
1216                    }
1217                }
1218            }
1219            EngineCommand::TradingModeUpdate { modes, .. } => {
1220                self.apply_underlying_trading_mode_update(&modes);
1221            }
1222            EngineCommand::DepositUpdate {
1223                wallet,
1224                amount,
1225                timestamp_ms,
1226                sequence,
1227                source_event_hash,
1228            } => {
1229                self.apply_deposit_update_to_balance_ledger(
1230                    "DepositUpdate",
1231                    wallet,
1232                    amount,
1233                    timestamp_ms,
1234                    sequence,
1235                    &source_event_hash,
1236                    &mut output,
1237                )?;
1238            }
1239            EngineCommand::OptionDepositUpdate {
1240                request_id,
1241                wallet,
1242                symbol,
1243                quantity,
1244                timestamp_ms,
1245            } => {
1246                self.apply_option_deposit_update(
1247                    request_id,
1248                    wallet,
1249                    symbol,
1250                    quantity,
1251                    timestamp_ms,
1252                )?;
1253            }
1254            EngineCommand::OptionWithdrawalUpdate {
1255                request_id,
1256                wallet,
1257                account,
1258                signer,
1259                rsm_signer,
1260                symbol,
1261                quantity,
1262                nonce,
1263                action,
1264                timestamp_ms,
1265            } => {
1266                self.apply_option_withdrawal_update(
1267                    request_id,
1268                    wallet,
1269                    account,
1270                    signer,
1271                    rsm_signer,
1272                    symbol,
1273                    quantity,
1274                    nonce,
1275                    action,
1276                    timestamp_ms,
1277                    &mut output,
1278                )?;
1279            }
1280            EngineCommand::CashWithdrawalUpdate {
1281                request_id,
1282                wallet,
1283                account,
1284                destination,
1285                signer,
1286                rsm_signer,
1287                amount,
1288                amount_wei,
1289                nonce,
1290                timestamp_ms,
1291            } => {
1292                self.apply_cash_withdrawal_update(
1293                    request_id,
1294                    wallet,
1295                    destination,
1296                    signer,
1297                    rsm_signer,
1298                    amount,
1299                    amount_wei,
1300                    account,
1301                    nonce,
1302                    timestamp_ms,
1303                    &mut output,
1304                )?;
1305            }
1306            EngineCommand::LiquidationBonusUpdate {
1307                wallet,
1308                amount,
1309                balance_after,
1310                timestamp_ms,
1311                sequence,
1312            } => {
1313                self.apply_balance_snapshot_update_to_balance_ledger(
1314                    "LiquidationBonusUpdate",
1315                    wallet,
1316                    amount,
1317                    balance_after,
1318                    timestamp_ms,
1319                    sequence,
1320                    &mut output,
1321                )?;
1322            }
1323            EngineCommand::ApproveAgent {
1324                wallet,
1325                agent,
1326                expires_at_ms,
1327                nonce,
1328                ..
1329            } => {
1330                if nonce.is_none() {
1331                    return Err(EngineError::Rejected(
1332                        "ApproveAgent requires a signed nonce".to_string(),
1333                    ));
1334                }
1335                const MAX_AGENTS_PER_WALLET: usize = 50;
1336                let agent_count = self
1337                    .ctx
1338                    .agent_authorizations
1339                    .get(&wallet)
1340                    .map(|m| m.len())
1341                    .unwrap_or(0);
1342                if agent_count >= MAX_AGENTS_PER_WALLET
1343                    && !self
1344                        .ctx
1345                        .agent_authorizations
1346                        .get(&wallet)
1347                        .map(|m| m.contains_key(&agent))
1348                        .unwrap_or(false)
1349                {
1350                    return Err(EngineError::Rejected(format!(
1351                        "wallet {} already has {} authorized agents (max {})",
1352                        wallet, agent_count, MAX_AGENTS_PER_WALLET
1353                    )));
1354                }
1355                self.validate_and_advance_nonce(&wallet, nonce, timestamp)?;
1356                self.ctx
1357                    .agent_authorizations
1358                    .entry(wallet)
1359                    .or_default()
1360                    .insert(agent, expires_at_ms);
1361                debug!(
1362                    wallet = %wallet,
1363                    agent = %agent,
1364                    expires_at_ms = ?expires_at_ms,
1365                    "Applied ApproveAgent command"
1366                );
1367            }
1368            EngineCommand::RevokeAgent {
1369                wallet,
1370                agent,
1371                nonce,
1372                ..
1373            } => {
1374                if nonce.is_none() {
1375                    return Err(EngineError::Rejected(
1376                        "RevokeAgent requires a signed nonce".to_string(),
1377                    ));
1378                }
1379                self.validate_and_advance_nonce(&wallet, nonce, timestamp)?;
1380                if let Some(map) = self.ctx.agent_authorizations.get_mut(&wallet) {
1381                    map.remove(&agent);
1382                    if map.is_empty() {
1383                        self.ctx.agent_authorizations.remove(&wallet);
1384                    }
1385                }
1386                debug!(
1387                    wallet = %wallet,
1388                    agent = %agent,
1389                    "Applied RevokeAgent command"
1390                );
1391            }
1392            EngineCommand::NonceAdvance { wallet, nonce, .. } => {
1393                self.validate_and_advance_nonce(&wallet, Some(nonce), timestamp)?;
1394                debug!(
1395                    wallet = %wallet,
1396                    nonce = nonce,
1397                    "Applied NonceAdvance command"
1398                );
1399            }
1400        }
1401
1402        Ok(output)
1403    }
1404
1405    pub(super) fn account_for_fill(
1406        &mut self,
1407        fill: hypercall_types::Fill,
1408        output: &mut ApplyOutput,
1409    ) -> EngineMessage {
1410        if !hypercall_types::utils::is_option_symbol(&fill.symbol) {
1411            use crate::rsm::engine_deps::apply_fill_to_positions;
1412
1413            let human_size = hypercall_types::to_human_readable_decimal(&fill.symbol, fill.size);
1414            let taker_signed_qty = match fill.taker_side {
1415                hypercall_types::Side::Buy => human_size,
1416                hypercall_types::Side::Sell => -human_size,
1417            };
1418
1419            apply_fill_to_positions(
1420                &mut self.ctx.engine_positions,
1421                fill.taker_wallet_address,
1422                fill.symbol.clone(),
1423                taker_signed_qty,
1424                fill.price,
1425            );
1426            apply_fill_to_positions(
1427                &mut self.ctx.engine_positions,
1428                fill.maker_wallet_address,
1429                fill.symbol.clone(),
1430                -taker_signed_qty,
1431                fill.price,
1432            );
1433
1434            return EngineMessage::OrderFilled {
1435                accounting: hypercall_engine::FillAccounting::zero(fill.trade_id),
1436                fill,
1437            };
1438        }
1439
1440        let accounting =
1441            hypercall_engine::apply_fill_position_accounting(&mut self.ctx.engine_positions, &fill);
1442        let reference_id = Some(fill.trade_id.to_string());
1443        let taker_balance_after = self.ctx.balance_ledger.balance(&fill.taker_wallet_address)
1444            + accounting.taker_net_cash_delta;
1445        let taker_update = self.build_balance_update(
1446            fill.taker_wallet_address,
1447            accounting.taker_net_cash_delta,
1448            taker_balance_after,
1449            hypercall_types::BalanceUpdateReason::OptionFillPremium,
1450            reference_id.clone(),
1451            fill.timestamp,
1452        );
1453        self.apply_balance_update(taker_update, output)
1454            .expect("CRITICAL: option fill taker balance update must apply");
1455
1456        let maker_balance_after = self.ctx.balance_ledger.balance(&fill.maker_wallet_address)
1457            + accounting.maker_net_cash_delta;
1458        let maker_update = self.build_balance_update(
1459            fill.maker_wallet_address,
1460            accounting.maker_net_cash_delta,
1461            maker_balance_after,
1462            hypercall_types::BalanceUpdateReason::OptionFillPremium,
1463            reference_id,
1464            fill.timestamp,
1465        );
1466        self.apply_balance_update(maker_update, output)
1467            .expect("CRITICAL: option fill maker balance update must apply");
1468        debug!(
1469            trade_id = fill.trade_id,
1470            symbol = %fill.symbol,
1471            "Applied option fill cashflow to balance_ledger"
1472        );
1473
1474        let mut enriched_fill = fill;
1475        self.attach_fill_underlying_notional(&mut enriched_fill);
1476        enriched_fill.taker_realized_pnl = Some(accounting.taker_realized_pnl);
1477        enriched_fill.maker_realized_pnl = Some(accounting.maker_realized_pnl);
1478
1479        EngineMessage::OrderFilled {
1480            fill: enriched_fill,
1481            accounting,
1482        }
1483    }
1484
1485    pub(super) fn account_for_events(
1486        &mut self,
1487        events: Vec<EngineMessage>,
1488        output: &mut ApplyOutput,
1489    ) -> Vec<EngineMessage> {
1490        events
1491            .into_iter()
1492            .map(|event| match event {
1493                EngineMessage::OrderFilled { fill, .. } => self.account_for_fill(fill, output),
1494                other => other,
1495            })
1496            .collect()
1497    }
1498
1499    pub(super) fn drain_orderbook_events(&mut self) -> Vec<EngineMessage> {
1500        let mut events = Vec::new();
1501        let spot_prices = self.ctx.spot_prices.clone();
1502        let reference_prices = self.ctx.deps.reference_prices.clone();
1503        for orderbook in self.ctx.orderbooks.values_mut() {
1504            for ob_event in orderbook.drain_events() {
1505                match ob_event {
1506                    hypercall_engine::OrderBookEvent::OrderFilled(mut fill) => {
1507                        fill.underlying_notional = fill_metadata::resolve_fill_underlying_notional(
1508                            &fill,
1509                            &spot_prices,
1510                            &reference_prices,
1511                        );
1512                        events.push(EngineMessage::OrderFilled {
1513                            accounting: hypercall_engine::FillAccounting::zero(fill.trade_id),
1514                            fill,
1515                        });
1516                    }
1517                    hypercall_engine::OrderBookEvent::Trade(trade_msg) => {
1518                        events.push(EngineMessage::Trade(trade_msg));
1519                    }
1520                    hypercall_engine::OrderBookEvent::L2Update(l2_msg) => {
1521                        events.push(EngineMessage::L2Update(l2_msg));
1522                    }
1523                    hypercall_engine::OrderBookEvent::OrderbookUpdated(update) => {
1524                        events.push(EngineMessage::OrderbookUpdated(update));
1525                    }
1526                }
1527            }
1528        }
1529        events
1530    }
1531
1532    /// Process expiry tick using runtime-provided context.
1533    pub(super) fn process_expiry_tick_collecting(
1534        &mut self,
1535        now_ms: u64,
1536        context: crate::rsm::apply::TickExpiryContext,
1537        output: &mut ApplyOutput,
1538        response_market_symbol: Option<&str>,
1539    ) -> Result<Option<MarketUpdateMessage>, EngineError> {
1540        let first_new_event = output.events.len();
1541        self.expiry_manager
1542            .apply_tick_expiry(now_ms, context, &mut self.ctx, &self.margin_manager, output)
1543            .map_err(EngineError::Internal)?;
1544
1545        Ok(output.events[first_new_event..]
1546            .iter()
1547            .find_map(|event| match event {
1548                EngineMessage::MarketUpdate(update)
1549                    if update.status == MarketUpdateStatus::MarketExpired
1550                        && response_market_symbol
1551                            .map(|symbol| update.market.symbol == symbol)
1552                            .unwrap_or(true) =>
1553                {
1554                    Some(update.clone())
1555                }
1556                _ => None,
1557            }))
1558    }
1559}