Skip to main content

hypercall/rsm/unified_engine/
runtime.rs

1//! Runtime loop and flush coordination for UnifiedEngine.
2
3use super::*;
4use crate::shared::order_types::ParsedSymbol;
5use crate::vol_oracle::{VolPoint, VolProviderKind, VolSurfaceSnapshot};
6use hypercall_db::{
7    DirectiveOutboxReader, OrderWriter, PmSettlementProjectionSyncWriter, SettlementWriter,
8};
9use rust_decimal::prelude::ToPrimitive;
10use std::collections::BTreeSet;
11
12const STANDBY_SETTLEMENT_OBSERVE_ATTEMPTS: usize = 8;
13const STANDBY_SETTLEMENT_OBSERVE_INITIAL_BACKOFF_MS: u64 = 25;
14const STANDBY_SETTLEMENT_OBSERVE_MAX_BACKOFF_MS: u64 = 500;
15
16fn synthesize_fixed_surface_snapshot(
17    orderbooks: &std::collections::HashMap<String, hypercall_engine::OrderBook>,
18    vol_oracle: &crate::vol_oracle::SharedVolOracle,
19    statuses: &[crate::vol_oracle::VolOracleStatus],
20    underlying: &str,
21    now_ms: i64,
22) -> Option<VolSurfaceSnapshot> {
23    let has_ready_fixed = statuses.iter().any(|status| {
24        status.underlying == underlying && status.provider == VolProviderKind::Fixed && status.ready
25    });
26    if !has_ready_fixed {
27        return None;
28    }
29
30    let mut strike_points = Vec::new();
31    let mut seen = BTreeSet::new();
32    for symbol in orderbooks.keys() {
33        let Ok(parsed) = ParsedSymbol::from_symbol(symbol) else {
34            continue;
35        };
36        if parsed.underlying != underlying {
37            continue;
38        }
39
40        let Some(strike) = parsed.strike.to_f64() else {
41            panic!(
42                "STATE_CORRUPTION: strike {} for {} is not representable as f64",
43                parsed.strike, symbol
44            );
45        };
46        let expiry_ts =
47            hypercall_types::expiry_date_to_timestamp(&parsed.underlying, parsed.expiry) as i64;
48        let dedupe_key = (expiry_ts, parsed.strike.to_string());
49        if !seen.insert(dedupe_key) {
50            continue;
51        }
52
53        let iv = vol_oracle.get_iv(underlying, strike, expiry_ts).ok()?;
54        strike_points.push(VolPoint {
55            strike,
56            expiry: expiry_ts,
57            iv,
58            timestamp: now_ms,
59        });
60    }
61
62    if strike_points.is_empty() {
63        return None;
64    }
65
66    let expiries = strike_points
67        .iter()
68        .map(|point| point.expiry)
69        .collect::<BTreeSet<_>>()
70        .into_iter()
71        .collect();
72
73    Some(VolSurfaceSnapshot {
74        underlying: underlying.to_string(),
75        last_update_ts_ms: Some(now_ms),
76        expiries,
77        strike_points,
78        delta_curves: Vec::new(),
79        atm_vols: Vec::new(),
80        spot_price: None,
81    })
82}
83
84fn max_listed_option_expiry_ts_ms(
85    orderbooks: &std::collections::HashMap<String, hypercall_engine::OrderBook>,
86    underlying: &str,
87) -> Result<i64, String> {
88    orderbooks
89        .keys()
90        .filter_map(|symbol| ParsedSymbol::from_symbol(symbol).ok())
91        .filter(|parsed| parsed.underlying == underlying)
92        .map(|parsed| {
93            hypercall_types::expiry_date_to_timestamp_checked(&parsed.underlying, parsed.expiry)
94                .map_err(|error| {
95                    format!(
96                        "invalid listed option expiry {} for {}: {}",
97                        parsed.expiry, parsed.underlying, error
98                    )
99                })
100                .and_then(|expiry_ts| {
101                    i64::try_from(expiry_ts)
102                        .map_err(|_| "listed option expiry exceeds i64".to_string())?
103                        .checked_mul(1_000)
104                        .ok_or_else(|| "listed option expiry ms overflow".to_string())
105                })
106        })
107        .try_fold(None, |max: Option<i64>, expiry_ts_ms| {
108            let expiry_ts_ms = expiry_ts_ms?;
109            Ok::<_, String>(Some(
110                max.map_or(expiry_ts_ms, |current| current.max(expiry_ts_ms)),
111            ))
112        })?
113        .ok_or_else(|| format!("no listed option markets for underlying {underlying}"))
114}
115
116impl UnifiedEngine {
117    /// Fail before mutation when an external durable operation enters the
118    /// engine without a UUID-backed journal identity.
119    fn require_external_durable_mutation_uuid(command_type: &str, request_id: &str) -> uuid::Uuid {
120        uuid::Uuid::parse_str(request_id).unwrap_or_else(|error| {
121            panic!(
122                "RUNTIME_INVARIANT: external engine command {} request_id {} is not a UUID: {}",
123                command_type, request_id, error
124            )
125        })
126    }
127
128    fn parse_directive_domain_status(
129        value: &str,
130    ) -> Result<crate::directive_outbox::DirectiveDomainStatus, String> {
131        match value {
132            "accepted" => Ok(crate::directive_outbox::DirectiveDomainStatus::Accepted),
133            "rejected" => Ok(crate::directive_outbox::DirectiveDomainStatus::Rejected),
134            "pending_chain_effect" => {
135                Ok(crate::directive_outbox::DirectiveDomainStatus::PendingChainEffect)
136            }
137            "completed" => Ok(crate::directive_outbox::DirectiveDomainStatus::Completed),
138            "failed" => Ok(crate::directive_outbox::DirectiveDomainStatus::Failed),
139            other => Err(format!("unknown directive domain_status {}", other)),
140        }
141    }
142
143    fn parse_directive_delivery_status(
144        value: &str,
145    ) -> Result<crate::directive_outbox::DirectiveDeliveryStatus, String> {
146        match value {
147            "pending" => Ok(crate::directive_outbox::DirectiveDeliveryStatus::Pending),
148            "broadcasted" => Ok(crate::directive_outbox::DirectiveDeliveryStatus::Broadcasted),
149            "included" => Ok(crate::directive_outbox::DirectiveDeliveryStatus::Included),
150            "finalized" => Ok(crate::directive_outbox::DirectiveDeliveryStatus::Finalized),
151            "reverted" => Ok(crate::directive_outbox::DirectiveDeliveryStatus::Reverted),
152            "expired" => Ok(crate::directive_outbox::DirectiveDeliveryStatus::Expired),
153            "dead_lettered" => Ok(crate::directive_outbox::DirectiveDeliveryStatus::DeadLettered),
154            other => Err(format!("unknown directive delivery_status {}", other)),
155        }
156    }
157
158    fn persisted_withdrawal_directive_status(
159        &self,
160        request_id: &str,
161    ) -> Result<
162        Option<(
163            crate::directive_outbox::DirectiveDomainStatus,
164            crate::directive_outbox::DirectiveDeliveryStatus,
165        )>,
166        String,
167    > {
168        let Some(db) = self.ctx.db.as_ref() else {
169            warn!(
170                request_id = %request_id,
171                "Cannot read persisted directive status without DB handle; returning pending retry receipt"
172            );
173            return Ok(None);
174        };
175        let row = db
176            .get_directive_status_sync(request_id)
177            .map_err(|error| format!("failed to read directive status for {}: {}", request_id, error))?
178            .unwrap_or_else(|| {
179                panic!(
180                    "JOURNAL_FATAL: withdrawal request_id {} is journaled but missing directive_outbox row",
181                    request_id
182                )
183            });
184
185        Ok(Some((
186            Self::parse_directive_domain_status(&row.domain_status)?,
187            Self::parse_directive_delivery_status(&row.delivery_status)?,
188        )))
189    }
190
191    async fn external_option_command_already_journaled(
192        &self,
193        request_id: &str,
194        command_type: &str,
195    ) -> bool {
196        let request_uuid = Self::require_external_durable_mutation_uuid(command_type, request_id);
197        let Some(journal_writer) = self.journal_writer.clone() else {
198            return false;
199        };
200        let command_type = command_type.to_string();
201        let result =
202            tokio::task::spawn_blocking(move || journal_writer.get_by_request_id(&request_uuid))
203                .await
204                .unwrap_or_else(|error| {
205                    panic!(
206                        "JOURNAL_FATAL: idempotency lookup task failed for {}: {}",
207                        request_id, error
208                    )
209                })
210                .unwrap_or_else(|error| {
211                    panic!(
212                        "JOURNAL_FATAL: idempotency lookup failed for {}: {}",
213                        request_id, error
214                    )
215                });
216
217        match result {
218            Some(record) if record.command_type == command_type => true,
219            Some(record) => {
220                panic!(
221                    "JOURNAL_FATAL: request_id {} already used for {}, not {}",
222                    request_id, record.command_type, command_type
223                )
224            }
225            None => false,
226        }
227    }
228
229    async fn journal_external_option_position_command(
230        &self,
231        env: &crate::rsm::apply::CommandEnvelope,
232        request_id: &str,
233        command_type_enum: hypercall_db_diesel::engine_enums::CommandType,
234        outbox_appends: Vec<crate::directive_outbox::DirectiveOutboxAppend>,
235    ) {
236        use crate::journal::{JournalEntry, JournalMessage};
237        use crate::rsm::apply::EngineCommand;
238        use hypercall_db_diesel::engine_enums::DbUuid;
239
240        let request_uuid =
241            Self::require_external_durable_mutation_uuid(env.command.command_type(), request_id);
242        #[cfg(feature = "rsm-state")]
243        let command_identity_hash = env.command.identity_hash();
244        let command_data = match &env.command {
245            EngineCommand::OptionDepositUpdate {
246                request_id,
247                wallet,
248                symbol,
249                quantity,
250                timestamp_ms,
251            } => hypercall_types::serialize_to_wire_bytes(&(
252                request_id,
253                wallet,
254                symbol,
255                quantity,
256                timestamp_ms,
257            )),
258            EngineCommand::OptionWithdrawalUpdate {
259                request_id,
260                wallet,
261                account,
262                signer,
263                rsm_signer,
264                symbol,
265                quantity,
266                nonce,
267                action,
268                timestamp_ms,
269            } => hypercall_types::serialize_to_wire_bytes(&(
270                request_id,
271                wallet,
272                account,
273                signer,
274                rsm_signer,
275                symbol,
276                quantity,
277                nonce,
278                action,
279                timestamp_ms,
280            )),
281            other => panic!(
282                "RUNTIME_INVARIANT: journal_external_option_position_command called for {}",
283                other.command_type()
284            ),
285        };
286
287        if let Some(ref batch_sender) = self.journal_batch_sender {
288            let (ack_tx, ack_rx) = tokio::sync::oneshot::channel();
289            let entry = JournalEntry {
290                received_ts_ms: env.received_ts_ms,
291                command_data,
292                response_data: None,
293                order_id: None,
294                pre_digest: Default::default(),
295                post_digest: Default::default(),
296                duration_ms: 0,
297                events: Vec::new(),
298                outbox_appends,
299                fill_side_effects: Vec::new(),
300                cash_withdrawal_side_effect: None,
301                balance_updates: Vec::new(),
302                created_at: Instant::now(),
303                commit_ack: Some(ack_tx),
304                request_uuid: DbUuid(request_uuid),
305                command_type_enum: Some(command_type_enum),
306                #[cfg(feature = "rsm-state")]
307                command_identity_hash,
308                #[cfg(feature = "rsm-state")]
309                rsm_state_digest: None,
310            };
311            if let Err(error) = batch_sender.send(JournalMessage::Entry(entry)).await {
312                panic!(
313                    "JOURNAL_FATAL: failed to journal {} for request_id {}: {}",
314                    env.command.command_type(),
315                    request_id,
316                    error
317                );
318            }
319            if ack_rx.await.is_err() {
320                panic!(
321                    "CRITICAL_FAILURE: journal commit_ack dropped for {} request_id {}. \
322                     Durability boundary is unknown after state mutation.",
323                    env.command.command_type(),
324                    request_id
325                );
326            }
327            return;
328        }
329
330        let Some(ref journal_writer) = self.journal_writer else {
331            panic!(
332                "JOURNAL_FATAL: no journal configured for {} request_id {}",
333                env.command.command_type(),
334                request_id
335            );
336        };
337        if !outbox_appends.is_empty() {
338            panic!(
339                "JOURNAL_FATAL: synchronous journal path cannot persist directive outbox appends for {} request_id {}",
340                env.command.command_type(),
341                request_id
342            );
343        }
344        let balance_updates = Vec::new();
345        journal_writer
346            .append_transition_with_fill_side_effects(
347                env.received_ts_ms,
348                &command_data,
349                None,
350                None,
351                &Default::default(),
352                &Default::default(),
353                0,
354                &[],
355                &[],
356                &balance_updates,
357                DbUuid(request_uuid),
358                Some(command_type_enum),
359            )
360            .unwrap_or_else(|error| {
361                panic!(
362                    "JOURNAL_FATAL: failed to journal {} for request_id {}: {}",
363                    env.command.command_type(),
364                    request_id,
365                    error
366                )
367            });
368    }
369
370    async fn journal_external_balance_update_command(
371        &self,
372        env: &crate::rsm::apply::CommandEnvelope,
373        request_id: &str,
374        outbox_appends: Vec<crate::directive_outbox::DirectiveOutboxAppend>,
375        balance_updates: Vec<hypercall_types::BalanceUpdate>,
376    ) {
377        use crate::journal::{JournalEntry, JournalMessage};
378        use crate::rsm::apply::{BalanceCommandPayload, DepositUpdatePayload, EngineCommand};
379        use hypercall_db_diesel::engine_enums::{CommandType, DbUuid};
380
381        let request_uuid =
382            Self::require_external_durable_mutation_uuid(env.command.command_type(), request_id);
383        let (command_data, command_type_enum) = match &env.command {
384            EngineCommand::DepositUpdate {
385                wallet,
386                amount,
387                timestamp_ms,
388                sequence,
389                source_event_hash,
390            } => {
391                let Some(sequence) = sequence else {
392                    panic!(
393                        "RUNTIME_INVARIANT: DepositUpdate {} missing durable sequence",
394                        request_id
395                    );
396                };
397                (
398                    hypercall_types::serialize_to_wire_bytes(&DepositUpdatePayload {
399                        wallet: *wallet,
400                        amount: *amount,
401                        timestamp_ms: *timestamp_ms,
402                        sequence: *sequence,
403                        source_event_hash: source_event_hash.clone(),
404                    }),
405                    CommandType::DepositUpdate,
406                )
407            }
408            EngineCommand::LiquidationBonusUpdate {
409                wallet,
410                amount,
411                balance_after,
412                timestamp_ms,
413                sequence,
414            } => (
415                hypercall_types::serialize_to_wire_bytes(&BalanceCommandPayload {
416                    wallet: *wallet,
417                    amount: *amount,
418                    balance_after: *balance_after,
419                    timestamp_ms: *timestamp_ms,
420                    sequence: *sequence,
421                }),
422                CommandType::LiquidationBonusUpdate,
423            ),
424            _ => {
425                panic!(
426                    "RUNTIME_INVARIANT: journal_external_balance_update_command called for {}",
427                    env.command.command_type()
428                );
429            }
430        };
431        #[cfg(feature = "rsm-state")]
432        let command_identity_hash = env.command.identity_hash();
433
434        if let Some(ref batch_sender) = self.journal_batch_sender {
435            let (ack_tx, ack_rx) = tokio::sync::oneshot::channel();
436            let entry = JournalEntry {
437                received_ts_ms: env.received_ts_ms,
438                command_data,
439                response_data: None,
440                order_id: None,
441                pre_digest: Default::default(),
442                post_digest: Default::default(),
443                duration_ms: 0,
444                events: Vec::new(),
445                outbox_appends,
446                fill_side_effects: Vec::new(),
447                cash_withdrawal_side_effect: None,
448                balance_updates,
449                created_at: Instant::now(),
450                commit_ack: Some(ack_tx),
451                request_uuid: DbUuid(request_uuid),
452                command_type_enum: Some(command_type_enum),
453                #[cfg(feature = "rsm-state")]
454                command_identity_hash,
455                #[cfg(feature = "rsm-state")]
456                rsm_state_digest: None,
457            };
458            if let Err(error) = batch_sender.send(JournalMessage::Entry(entry)).await {
459                panic!(
460                    "JOURNAL_FATAL: failed to journal {} for request_id {}: {}",
461                    env.command.command_type(),
462                    request_id,
463                    error
464                );
465            }
466            if ack_rx.await.is_err() {
467                panic!(
468                    "CRITICAL_FAILURE: journal commit_ack dropped for {} request_id {}. \
469                     Durability boundary is unknown after state mutation.",
470                    env.command.command_type(),
471                    request_id
472                );
473            }
474            return;
475        }
476
477        let Some(ref journal_writer) = self.journal_writer else {
478            panic!(
479                "JOURNAL_FATAL: no journal configured for {} request_id {}",
480                env.command.command_type(),
481                request_id
482            );
483        };
484        if !outbox_appends.is_empty() {
485            panic!(
486                "JOURNAL_FATAL: synchronous journal path cannot persist directive outbox appends for {} request_id {}",
487                env.command.command_type(),
488                request_id
489            );
490        }
491        journal_writer
492            .append_transition_with_fill_side_effects(
493                env.received_ts_ms,
494                &command_data,
495                None,
496                None,
497                &Default::default(),
498                &Default::default(),
499                0,
500                &[],
501                &[],
502                &balance_updates,
503                DbUuid(request_uuid),
504                Some(command_type_enum),
505            )
506            .unwrap_or_else(|error| {
507                panic!(
508                    "JOURNAL_FATAL: failed to journal {} for request_id {}: {}",
509                    env.command.command_type(),
510                    request_id,
511                    error
512                )
513            });
514    }
515
516    fn pm_settlement_request_id(command: &crate::rsm::apply::EngineCommand) -> Option<uuid::Uuid> {
517        match command {
518            crate::rsm::apply::EngineCommand::SetPmSettlementPoolConfig(command) => {
519                Some(command.request_id)
520            }
521            crate::rsm::apply::EngineCommand::RecordPmVaultDeposit(command) => {
522                Some(command.request_id)
523            }
524            crate::rsm::apply::EngineCommand::RequestPmVaultWithdrawal(command) => {
525                Some(command.request_id)
526            }
527            crate::rsm::apply::EngineCommand::AccruePmSettlementInterest(command) => {
528                Some(command.request_id)
529            }
530            crate::rsm::apply::EngineCommand::ApplyPmSettlementRepayment(command) => {
531                Some(command.request_id)
532            }
533            crate::rsm::apply::EngineCommand::JournalPmRecoveryPlan(command) => {
534                Some(command.request_id)
535            }
536            crate::rsm::apply::EngineCommand::MarkPmRecoveryActionSubmitted(command) => {
537                Some(command.request_id)
538            }
539            crate::rsm::apply::EngineCommand::ResolvePmRecoveryAction(command) => {
540                Some(command.request_id)
541            }
542            _ => None,
543        }
544    }
545
546    fn preflight_pm_settlement_admin_command(
547        &self,
548        command: &crate::rsm::apply::EngineCommand,
549    ) -> Result<(), String> {
550        let mut pm_state = self.ctx.pm_settlement_state.clone();
551        match command.clone() {
552            crate::rsm::apply::EngineCommand::SetPmSettlementPoolConfig(command) => {
553                pm_state.apply_set_config(command).map(|_| ())
554            }
555            crate::rsm::apply::EngineCommand::RecordPmVaultDeposit(command) => {
556                let engine_max_expiry_ts_ms =
557                    max_listed_option_expiry_ts_ms(&self.ctx.orderbooks, &command.underlying)?;
558                if command.max_listed_expiry_ts_ms != engine_max_expiry_ts_ms {
559                    return Err(format!(
560                        "RecordPmVaultDeposit max_listed_expiry_ts_ms {} does not match engine max listed expiry {} for {}",
561                        command.max_listed_expiry_ts_ms,
562                        engine_max_expiry_ts_ms,
563                        command.underlying
564                    ));
565                }
566                pm_state.apply_record_vault_deposit(command).map(|_| ())
567            }
568            crate::rsm::apply::EngineCommand::RequestPmVaultWithdrawal(command) => {
569                pm_state.apply_request_vault_withdrawal(command).map(|_| ())
570            }
571            crate::rsm::apply::EngineCommand::AccruePmSettlementInterest(command) => {
572                pm_state.apply_accrue_interest(command).map(|_| ())
573            }
574            crate::rsm::apply::EngineCommand::ApplyPmSettlementRepayment(command) => {
575                pm_state.apply_repayment(command).map(|_| ())
576            }
577            crate::rsm::apply::EngineCommand::JournalPmRecoveryPlan(command) => {
578                pm_state.apply_journal_recovery_plan(command).map(|_| ())
579            }
580            crate::rsm::apply::EngineCommand::MarkPmRecoveryActionSubmitted(command) => pm_state
581                .apply_mark_recovery_action_submitted(command)
582                .map(|_| ()),
583            crate::rsm::apply::EngineCommand::ResolvePmRecoveryAction(command) => {
584                pm_state.apply_resolve_recovery_action(command).map(|_| ())
585            }
586            other => Err(format!(
587                "unsupported PM settlement admin command {}",
588                other.command_type()
589            )),
590        }
591    }
592
593    fn stamp_pm_settlement_admin_command(
594        command: crate::rsm::apply::EngineCommand,
595        timestamp_ms: u64,
596    ) -> crate::rsm::apply::EngineCommand {
597        match command {
598            crate::rsm::apply::EngineCommand::SetPmSettlementPoolConfig(mut command) => {
599                command.timestamp_ms = timestamp_ms;
600                crate::rsm::apply::EngineCommand::SetPmSettlementPoolConfig(command)
601            }
602            crate::rsm::apply::EngineCommand::RecordPmVaultDeposit(mut command) => {
603                command.timestamp_ms = timestamp_ms;
604                crate::rsm::apply::EngineCommand::RecordPmVaultDeposit(command)
605            }
606            crate::rsm::apply::EngineCommand::RequestPmVaultWithdrawal(mut command) => {
607                command.timestamp_ms = timestamp_ms;
608                crate::rsm::apply::EngineCommand::RequestPmVaultWithdrawal(command)
609            }
610            crate::rsm::apply::EngineCommand::AccruePmSettlementInterest(mut command) => {
611                command.timestamp_ms = timestamp_ms;
612                crate::rsm::apply::EngineCommand::AccruePmSettlementInterest(command)
613            }
614            crate::rsm::apply::EngineCommand::ApplyPmSettlementRepayment(mut command) => {
615                command.timestamp_ms = timestamp_ms;
616                crate::rsm::apply::EngineCommand::ApplyPmSettlementRepayment(command)
617            }
618            crate::rsm::apply::EngineCommand::JournalPmRecoveryPlan(mut command) => {
619                command.timestamp_ms = timestamp_ms;
620                crate::rsm::apply::EngineCommand::JournalPmRecoveryPlan(command)
621            }
622            crate::rsm::apply::EngineCommand::MarkPmRecoveryActionSubmitted(mut command) => {
623                command.timestamp_ms = timestamp_ms;
624                crate::rsm::apply::EngineCommand::MarkPmRecoveryActionSubmitted(command)
625            }
626            crate::rsm::apply::EngineCommand::ResolvePmRecoveryAction(mut command) => {
627                command.timestamp_ms = timestamp_ms;
628                crate::rsm::apply::EngineCommand::ResolvePmRecoveryAction(command)
629            }
630            command => command,
631        }
632    }
633
634    async fn journal_pm_settlement_admin_command(
635        &self,
636        env: &crate::rsm::apply::CommandEnvelope,
637        request_uuid: uuid::Uuid,
638    ) {
639        use crate::engine_enums_ext::CommandTypeExt;
640        use crate::journal::{JournalEntry, JournalMessage};
641        use hypercall_db_diesel::engine_enums::{CommandType, DbUuid};
642
643        let command_type_enum = CommandType::from_command(&env.command);
644        let command_data = match &env.command {
645            crate::rsm::apply::EngineCommand::SetPmSettlementPoolConfig(command) => {
646                hypercall_types::serialize_to_wire_bytes(command)
647            }
648            crate::rsm::apply::EngineCommand::RecordPmVaultDeposit(command) => {
649                hypercall_types::serialize_to_wire_bytes(command)
650            }
651            crate::rsm::apply::EngineCommand::RequestPmVaultWithdrawal(command) => {
652                hypercall_types::serialize_to_wire_bytes(command)
653            }
654            crate::rsm::apply::EngineCommand::AccruePmSettlementInterest(command) => {
655                hypercall_types::serialize_to_wire_bytes(command)
656            }
657            crate::rsm::apply::EngineCommand::ApplyPmSettlementRepayment(command) => {
658                hypercall_types::serialize_to_wire_bytes(command)
659            }
660            crate::rsm::apply::EngineCommand::JournalPmRecoveryPlan(command) => {
661                hypercall_types::serialize_to_wire_bytes(command)
662            }
663            crate::rsm::apply::EngineCommand::MarkPmRecoveryActionSubmitted(command) => {
664                hypercall_types::serialize_to_wire_bytes(command)
665            }
666            crate::rsm::apply::EngineCommand::ResolvePmRecoveryAction(command) => {
667                hypercall_types::serialize_to_wire_bytes(command)
668            }
669            _ => {
670                panic!(
671                    "RUNTIME_INVARIANT: journal_pm_settlement_admin_command called for {}",
672                    env.command.command_type()
673                );
674            }
675        };
676        let request_id = request_uuid.to_string();
677        #[cfg(feature = "rsm-state")]
678        let command_identity_hash = env.command.identity_hash();
679
680        if let Some(ref batch_sender) = self.journal_batch_sender {
681            let (ack_tx, ack_rx) = tokio::sync::oneshot::channel();
682            let entry = JournalEntry {
683                received_ts_ms: env.received_ts_ms,
684                command_data,
685                response_data: None,
686                order_id: None,
687                pre_digest: Default::default(),
688                post_digest: Default::default(),
689                duration_ms: 0,
690                events: Vec::new(),
691                outbox_appends: Vec::new(),
692                fill_side_effects: Vec::new(),
693                cash_withdrawal_side_effect: None,
694                balance_updates: Vec::new(),
695                created_at: Instant::now(),
696                commit_ack: Some(ack_tx),
697                request_uuid: DbUuid(request_uuid),
698                command_type_enum: Some(command_type_enum),
699                #[cfg(feature = "rsm-state")]
700                command_identity_hash,
701                #[cfg(feature = "rsm-state")]
702                rsm_state_digest: None,
703            };
704            if let Err(error) = batch_sender.send(JournalMessage::Entry(entry)).await {
705                panic!(
706                    "JOURNAL_FATAL: failed to journal {} for request_id {}: {}",
707                    env.command.command_type(),
708                    request_id,
709                    error
710                );
711            }
712            if ack_rx.await.is_err() {
713                panic!(
714                    "CRITICAL_FAILURE: journal commit_ack dropped for {} request_id {}. \
715                     Durability boundary is unknown after state mutation.",
716                    env.command.command_type(),
717                    request_id
718                );
719            }
720            return;
721        }
722
723        let Some(ref journal_writer) = self.journal_writer else {
724            panic!(
725                "JOURNAL_FATAL: no journal configured for {} request_id {}",
726                env.command.command_type(),
727                request_id
728            );
729        };
730        journal_writer
731            .append_transition_with_fill_side_effects(
732                env.received_ts_ms,
733                &command_data,
734                None,
735                None,
736                &Default::default(),
737                &Default::default(),
738                0,
739                &[],
740                &[],
741                &[],
742                DbUuid(request_uuid),
743                Some(command_type_enum),
744            )
745            .unwrap_or_else(|error| {
746                panic!(
747                    "JOURNAL_FATAL: failed to journal {} for request_id {}: {}",
748                    env.command.command_type(),
749                    request_id,
750                    error
751                )
752            });
753    }
754
755    async fn journal_agent_auth_command(&self, env: &crate::rsm::apply::CommandEnvelope) {
756        use crate::journal::{JournalEntry, JournalMessage};
757        use crate::rsm::apply::{ApproveAgentPayload, EngineCommand, RevokeAgentPayload};
758        use hypercall_db_diesel::engine_enums::{CommandType, DbUuid};
759
760        #[cfg(feature = "rsm-state")]
761        let command_identity_hash = env.command.identity_hash();
762
763        let (command_data, command_type_enum) = match &env.command {
764            EngineCommand::ApproveAgent {
765                wallet,
766                agent,
767                expires_at_ms,
768                nonce,
769                timestamp_ms,
770            } => (
771                hypercall_types::serialize_to_wire_bytes(&ApproveAgentPayload {
772                    wallet: *wallet,
773                    agent: *agent,
774                    expires_at_ms: *expires_at_ms,
775                    nonce: *nonce,
776                    timestamp_ms: *timestamp_ms,
777                }),
778                CommandType::ApproveAgent,
779            ),
780            EngineCommand::RevokeAgent {
781                wallet,
782                agent,
783                nonce,
784                timestamp_ms,
785            } => (
786                hypercall_types::serialize_to_wire_bytes(&RevokeAgentPayload {
787                    wallet: *wallet,
788                    agent: *agent,
789                    nonce: *nonce,
790                    timestamp_ms: *timestamp_ms,
791                }),
792                CommandType::RevokeAgent,
793            ),
794            other => panic!(
795                "RUNTIME_INVARIANT: journal_agent_auth_command called for {}",
796                other.command_type()
797            ),
798        };
799
800        if let Some(ref batch_sender) = self.journal_batch_sender {
801            let (ack_tx, ack_rx) = tokio::sync::oneshot::channel();
802            let entry = JournalEntry {
803                received_ts_ms: env.received_ts_ms,
804                command_data,
805                response_data: None,
806                order_id: None,
807                pre_digest: Default::default(),
808                post_digest: Default::default(),
809                duration_ms: 0,
810                events: Vec::new(),
811                outbox_appends: Vec::new(),
812                fill_side_effects: Vec::new(),
813                cash_withdrawal_side_effect: None,
814                balance_updates: Vec::new(),
815                created_at: Instant::now(),
816                commit_ack: Some(ack_tx),
817                request_uuid: DbUuid(uuid::Uuid::new_v4()),
818                command_type_enum: Some(command_type_enum),
819                #[cfg(feature = "rsm-state")]
820                command_identity_hash,
821                #[cfg(feature = "rsm-state")]
822                rsm_state_digest: None,
823            };
824            if let Err(error) = batch_sender.send(JournalMessage::Entry(entry)).await {
825                panic!(
826                    "JOURNAL_FATAL: failed to journal {}: {}",
827                    env.command.command_type(),
828                    error
829                );
830            }
831            if ack_rx.await.is_err() {
832                panic!(
833                    "CRITICAL_FAILURE: journal commit_ack dropped for {}. \
834                     Durability boundary is unknown after state mutation.",
835                    env.command.command_type()
836                );
837            }
838            return;
839        }
840
841        let Some(ref journal_writer) = self.journal_writer else {
842            panic!(
843                "JOURNAL_FATAL: no journal configured for {}",
844                env.command.command_type()
845            );
846        };
847        journal_writer
848            .append_transition(
849                env.received_ts_ms,
850                &command_data,
851                None,
852                None,
853                &Default::default(),
854                &Default::default(),
855                0,
856                &[],
857                DbUuid(uuid::Uuid::new_v4()),
858                Some(command_type_enum),
859            )
860            .unwrap_or_else(|error| {
861                panic!(
862                    "JOURNAL_FATAL: failed to journal {}: {}",
863                    env.command.command_type(),
864                    error
865                )
866            });
867    }
868
869    fn agent_auth_journal_available(&self) -> bool {
870        self.journal_batch_sender.is_some()
871            || self
872                .journal_writer
873                .as_ref()
874                .is_some_and(|writer| writer.is_durable())
875    }
876
877    pub(crate) fn withdrawal_directive_journal_unavailable_error(
878        &self,
879        command_type: &str,
880        request_id: &str,
881    ) -> Option<String> {
882        if self.journal_batch_sender.is_some() {
883            return None;
884        }
885        Some(format!(
886            "{command_type} requires ENGINE_JOURNAL_ENABLED=true because directive outbox appends \
887             must be persisted atomically with the state transition; request_id {request_id} was not applied"
888        ))
889    }
890
891    async fn journal_external_cash_withdrawal_command(
892        &self,
893        env: &crate::rsm::apply::CommandEnvelope,
894        request_id: &str,
895        outbox_appends: Vec<crate::directive_outbox::DirectiveOutboxAppend>,
896        wallet: hypercall_types::WalletAddress,
897        amount: rust_decimal::Decimal,
898        balance_after: rust_decimal::Decimal,
899        timestamp_ms: u64,
900        balance_updates: Vec<hypercall_types::BalanceUpdate>,
901    ) {
902        use crate::journal::engine_journal_batcher::{
903            JournalCashWithdrawalSideEffect, JournalEntry, JournalMessage,
904        };
905        use crate::rsm::apply::EngineCommand;
906        use hypercall_db_diesel::engine_enums::{CommandType, DbUuid};
907
908        let request_uuid =
909            Self::require_external_durable_mutation_uuid("CashWithdrawalUpdate", request_id);
910        let EngineCommand::CashWithdrawalUpdate {
911            request_id: ref rid,
912            wallet: w,
913            account: a,
914            destination: d,
915            signer: s,
916            rsm_signer: rs,
917            amount: amt,
918            amount_wei: aw,
919            nonce: n,
920            timestamp_ms: ts,
921        } = env.command
922        else {
923            panic!(
924                "RUNTIME_INVARIANT: journal_external_cash_withdrawal_command called for {}",
925                env.command.command_type()
926            );
927        };
928        let command_data = hypercall_types::serialize_to_wire_bytes(&(
929            rid, &w, &a, &d, &s, &rs, &amt, &aw, &n, &ts,
930        ));
931        #[cfg(feature = "rsm-state")]
932        let command_identity_hash = env.command.identity_hash();
933
934        let side_effect = JournalCashWithdrawalSideEffect {
935            wallet,
936            request_id: request_id.to_string(),
937            amount,
938            balance_after,
939            timestamp_ms,
940        };
941
942        if let Some(ref batch_sender) = self.journal_batch_sender {
943            let (ack_tx, ack_rx) = tokio::sync::oneshot::channel();
944            let entry = JournalEntry {
945                received_ts_ms: env.received_ts_ms,
946                command_data,
947                response_data: None,
948                order_id: None,
949                pre_digest: Default::default(),
950                post_digest: Default::default(),
951                duration_ms: 0,
952                events: Vec::new(),
953                outbox_appends,
954                fill_side_effects: Vec::new(),
955                cash_withdrawal_side_effect: Some(side_effect),
956                balance_updates,
957                created_at: std::time::Instant::now(),
958                commit_ack: Some(ack_tx),
959                request_uuid: DbUuid(request_uuid),
960                command_type_enum: Some(CommandType::CashWithdrawalUpdate),
961                #[cfg(feature = "rsm-state")]
962                command_identity_hash,
963                #[cfg(feature = "rsm-state")]
964                rsm_state_digest: None,
965            };
966            if let Err(error) = batch_sender.send(JournalMessage::Entry(entry)).await {
967                panic!(
968                    "JOURNAL_FATAL: failed to journal CashWithdrawalUpdate for request_id {}: {}",
969                    request_id, error
970                );
971            }
972            if ack_rx.await.is_err() {
973                panic!(
974                    "CRITICAL_FAILURE: journal commit_ack dropped for CashWithdrawalUpdate request_id {}. \
975                     Durability boundary is unknown after state mutation.",
976                    request_id
977                );
978            }
979            return;
980        }
981
982        let Some(ref journal_writer) = self.journal_writer else {
983            panic!(
984                "JOURNAL_FATAL: no journal configured for CashWithdrawalUpdate request_id {}",
985                request_id
986            );
987        };
988        if !outbox_appends.is_empty() {
989            panic!(
990                "JOURNAL_FATAL: synchronous journal path cannot persist directive outbox appends for CashWithdrawalUpdate request_id {}",
991                request_id
992            );
993        }
994        journal_writer
995            .append_transition_with_fill_side_effects(
996                env.received_ts_ms,
997                &command_data,
998                None,
999                None,
1000                &Default::default(),
1001                &Default::default(),
1002                0,
1003                &[],
1004                &[],
1005                &balance_updates,
1006                DbUuid(request_uuid),
1007                Some(CommandType::CashWithdrawalUpdate),
1008            )
1009            .unwrap_or_else(|error| {
1010                panic!(
1011                    "JOURNAL_FATAL: failed to journal CashWithdrawalUpdate for request_id {}: {}",
1012                    request_id, error
1013                )
1014            });
1015    }
1016
1017    async fn prepare_tick_expiry_env(
1018        &self,
1019        now_ms: u64,
1020    ) -> Result<crate::rsm::apply::CommandEnvelope, EngineError> {
1021        let context = self
1022            .expiry_manager
1023            .prepare_tick_expiry_context(now_ms, &self.ctx, &self.margin_manager)
1024            .await
1025            .map_err(EngineError::Internal)?;
1026        self.validate_tick_expiry_nats_payload(now_ms, &context)?;
1027        Ok(crate::rsm::apply::CommandEnvelope::new(
1028            now_ms,
1029            crate::rsm::apply::EngineCommand::TickExpiry { now_ms, context },
1030        ))
1031    }
1032
1033    fn replay_owned_expiry_command_payload(
1034        env: &crate::rsm::apply::CommandEnvelope,
1035    ) -> Option<(
1036        hypercall_db_diesel::engine_enums::CommandType,
1037        Vec<u8>,
1038        &'static str,
1039    )> {
1040        match &env.command {
1041            crate::rsm::apply::EngineCommand::MarketAction(command)
1042                if matches!(
1043                    command.message.action,
1044                    hypercall_types::MarketAction::ExpireMarket
1045                ) =>
1046            {
1047                Some((
1048                    hypercall_db_diesel::engine_enums::CommandType::ExpireMarket,
1049                    hypercall_types::serialize_to_wire_bytes(command),
1050                    "ExpireMarket",
1051                ))
1052            }
1053            crate::rsm::apply::EngineCommand::TickExpiry { now_ms, context }
1054                if !context.due_expiries.is_empty() || !context.pending_settlements.is_empty() =>
1055            {
1056                Some((
1057                    hypercall_db_diesel::engine_enums::CommandType::TickExpiry,
1058                    hypercall_types::serialize_to_wire_bytes(&(*now_ms, context)),
1059                    "TickExpiry",
1060                ))
1061            }
1062            _ => None,
1063        }
1064    }
1065
1066    async fn journal_replay_owned_expiry_command(
1067        &self,
1068        env: &crate::rsm::apply::CommandEnvelope,
1069        balance_updates: &[hypercall_types::BalanceUpdate],
1070    ) {
1071        use crate::journal::JournalMessage;
1072
1073        if let Some(ref batch_sender) = self.journal_batch_sender {
1074            let (ack_tx, ack_rx) = tokio::sync::oneshot::channel();
1075            let Some((entry, command_label)) = Self::replay_owned_expiry_journal_entry(
1076                env,
1077                balance_updates,
1078                hypercall_db_diesel::engine_enums::DbUuid(uuid::Uuid::now_v7()),
1079                Some(ack_tx),
1080            ) else {
1081                return;
1082            };
1083
1084            if let Err(error) = batch_sender.send(JournalMessage::Entry(entry)).await {
1085                panic!(
1086                    "JOURNAL_FATAL: failed to enqueue {} for replay journal: {}",
1087                    command_label, error
1088                );
1089            }
1090            if ack_rx.await.is_err() {
1091                panic!(
1092                    "JOURNAL_FATAL: replay journal commit ack dropped for {}",
1093                    command_label
1094                );
1095            }
1096            return;
1097        }
1098
1099        let Some((entry, command_label)) = Self::replay_owned_expiry_journal_entry(
1100            env,
1101            balance_updates,
1102            hypercall_db_diesel::engine_enums::DbUuid(uuid::Uuid::now_v7()),
1103            None,
1104        ) else {
1105            return;
1106        };
1107
1108        let Some(ref journal_writer) = self.journal_writer else {
1109            if self.ctx.db.is_some() {
1110                panic!("JOURNAL_FATAL: no journal configured for {}", command_label);
1111            }
1112            return;
1113        };
1114        journal_writer
1115            .append_transition_with_fill_side_effects(
1116                entry.received_ts_ms,
1117                &entry.command_data,
1118                None,
1119                None,
1120                &Default::default(),
1121                &Default::default(),
1122                0,
1123                &[],
1124                &[],
1125                &entry.balance_updates,
1126                entry.request_uuid,
1127                entry.command_type_enum,
1128            )
1129            .unwrap_or_else(|error| {
1130                panic!(
1131                    "JOURNAL_FATAL: failed to journal {} for replay: {}",
1132                    command_label, error
1133                )
1134            });
1135    }
1136
1137    fn replay_owned_expiry_journal_entry(
1138        env: &crate::rsm::apply::CommandEnvelope,
1139        balance_updates: &[hypercall_types::BalanceUpdate],
1140        request_uuid: hypercall_db_diesel::engine_enums::DbUuid,
1141        commit_ack: Option<tokio::sync::oneshot::Sender<()>>,
1142    ) -> Option<(crate::journal::JournalEntry, &'static str)> {
1143        let (command_type_enum, command_data, command_label) =
1144            Self::replay_owned_expiry_command_payload(env)?;
1145        #[cfg(feature = "rsm-state")]
1146        let command_identity_hash = env.command.identity_hash();
1147
1148        Some((
1149            crate::journal::JournalEntry {
1150                received_ts_ms: env.received_ts_ms,
1151                command_data,
1152                response_data: None,
1153                order_id: None,
1154                pre_digest: Default::default(),
1155                post_digest: Default::default(),
1156                duration_ms: 0,
1157                events: Vec::new(),
1158                outbox_appends: Vec::new(),
1159                fill_side_effects: Vec::new(),
1160                cash_withdrawal_side_effect: None,
1161                balance_updates: balance_updates.to_vec(),
1162                created_at: Instant::now(),
1163                commit_ack,
1164                request_uuid,
1165                command_type_enum: Some(command_type_enum),
1166                #[cfg(feature = "rsm-state")]
1167                command_identity_hash,
1168                #[cfg(feature = "rsm-state")]
1169                rsm_state_digest: None,
1170            },
1171            command_label,
1172        ))
1173    }
1174
1175    fn validate_tick_expiry_nats_payload(
1176        &self,
1177        now_ms: u64,
1178        context: &crate::rsm::apply::TickExpiryContext,
1179    ) -> Result<(), EngineError> {
1180        if self.nats_publisher.is_none() {
1181            return Ok(());
1182        }
1183
1184        let command_data = hypercall_types::serialize_to_wire_bytes(&(now_ms, context));
1185        let total_payload_len = crate::nats::COMMAND_PAYLOAD_PREFIX_LEN + command_data.len();
1186        if total_payload_len > super::MAX_EXPIRY_NATS_PAYLOAD_BYTES {
1187            return Err(EngineError::Internal(format!(
1188                "TickExpiry NATS payload is {} bytes, exceeding {} byte limit. due_groups={}, pending_groups={}, settlement_prices={}, margin_modes={}",
1189                total_payload_len,
1190                super::MAX_EXPIRY_NATS_PAYLOAD_BYTES,
1191                context.due_expiries.len(),
1192                context.pending_settlements.len(),
1193                context.settlement_prices.len(),
1194                context.margin_modes.len()
1195            )));
1196        }
1197
1198        Ok(())
1199    }
1200
1201    fn validate_manual_market_expiry_nats_payload(
1202        &self,
1203        command: &crate::rsm::apply::MarketActionCommand,
1204    ) -> Result<(), EngineError> {
1205        if self.nats_publisher.is_none() || command.expiry_context.is_none() {
1206            return Ok(());
1207        }
1208
1209        let command_data = hypercall_types::serialize_to_wire_bytes(command);
1210        let total_payload_len = crate::nats::COMMAND_PAYLOAD_PREFIX_LEN + command_data.len();
1211        if total_payload_len > super::MAX_EXPIRY_NATS_PAYLOAD_BYTES {
1212            let context = command.expiry_context.as_ref().expect("checked above");
1213            return Err(EngineError::Internal(format!(
1214                "ExpireMarket NATS payload is {} bytes, exceeding {} byte limit for {}. due_groups={}, settlement_prices={}, margin_modes={}",
1215                total_payload_len,
1216                super::MAX_EXPIRY_NATS_PAYLOAD_BYTES,
1217                command.message.market.symbol,
1218                context.due_expiries.len(),
1219                context.settlement_prices.len(),
1220                context.margin_modes.len()
1221            )));
1222        }
1223
1224        Ok(())
1225    }
1226
1227    async fn prepare_manual_market_expiry_context(
1228        &self,
1229        message: &MarketActionMessage,
1230    ) -> Result<crate::rsm::apply::TickExpiryContext, EngineError> {
1231        let expiry_ts = crate::rsm::margin_manager::expiry_date_to_timestamp(
1232            &message.market.underlying,
1233            message.market.expiry,
1234        );
1235        let settlement_price = self
1236            .expiry_manager
1237            .get_settlement_price(&self.ctx.deps, &message.market.underlying, expiry_ts as i64)
1238            .await;
1239        let settlement_prices = match settlement_price {
1240            Some(price) => {
1241                vec![crate::rsm::apply::TickExpirySettlementPrice {
1242                    underlying: message.market.underlying.clone(),
1243                    expiry_ts: expiry_ts as i64,
1244                    price,
1245                }]
1246            }
1247            None => Vec::new(),
1248        };
1249
1250        let mut margin_modes = Vec::new();
1251        if !settlement_prices.is_empty() {
1252            for ((wallet, symbol), position) in &self.ctx.engine_positions {
1253                if symbol != &message.market.symbol
1254                    || position.quantity == rust_decimal::Decimal::ZERO
1255                {
1256                    continue;
1257                }
1258                let margin_mode = self
1259                    .ctx
1260                    .deps
1261                    .wallet_margin_modes
1262                    .get(wallet)
1263                    .copied()
1264                    .ok_or_else(|| {
1265                        EngineError::Internal(format!(
1266                            "Missing margin mode for {} while preparing manual expiry for {}",
1267                            wallet, message.market.symbol
1268                        ))
1269                    })?;
1270                margin_modes.push(crate::rsm::apply::TickExpiryWalletMarginMode {
1271                    wallet: *wallet,
1272                    margin_mode,
1273                    pm_settlement_required: margin_mode
1274                        == hypercall_margin::margin_mode::MarginMode::Portfolio
1275                        && self.ctx.deps.portfolio_margin_pool_enabled
1276                        && self
1277                            .ctx
1278                            .deps
1279                            .portfolio_margin_settlement_allowlist
1280                            .contains(wallet),
1281                });
1282            }
1283        }
1284        margin_modes.sort_by(|left, right| left.wallet.as_bytes().cmp(right.wallet.as_bytes()));
1285        margin_modes.dedup_by_key(|mode| mode.wallet);
1286
1287        let due_expiries = vec![crate::rsm::apply::TickExpiryDueGroup {
1288            expiry_ts: expiry_ts as i64,
1289            symbols: vec![message.market.symbol.clone()],
1290        }];
1291        let pending_settlements = Vec::new();
1292        let pm_settlements = self
1293            .expiry_manager
1294            .prepare_pm_settlements(
1295                &due_expiries,
1296                &pending_settlements,
1297                &settlement_prices,
1298                &margin_modes,
1299                message.timestamp,
1300                &self.ctx,
1301                &self.margin_manager,
1302            )
1303            .map_err(EngineError::Internal)?;
1304
1305        Ok(crate::rsm::apply::TickExpiryContext {
1306            due_expiries,
1307            pending_settlements,
1308            settlement_prices,
1309            margin_modes,
1310            pm_settlements,
1311        })
1312    }
1313
1314    async fn apply_expiry_effects_and_events(
1315        &mut self,
1316        output: &crate::rsm::apply::ApplyOutput,
1317        req_id: &str,
1318    ) -> Result<(), EngineError> {
1319        for effect in &output.expiry_effects {
1320            self.apply_expiry_effect(effect)?;
1321        }
1322        self.apply_pm_settlement_projection_effects_sync(&output.pm_settlement_effects, req_id)
1323            .map_err(EngineError::Internal)?;
1324        self.apply_replayed_events_sync(&output.events, req_id)
1325            .await;
1326        Ok(())
1327    }
1328
1329    async fn publish_tick_expiry_balance_updates(
1330        &self,
1331        output: &crate::rsm::apply::ApplyOutput,
1332        context: &'static str,
1333    ) {
1334        if output.balance_updates.is_empty() {
1335            return;
1336        }
1337
1338        debug!(
1339            balance_update_count = output.balance_updates.len(),
1340            context, "Publishing TickExpiry settlement balance updates"
1341        );
1342        self.publish_balance_updates_to_nats(&output.balance_updates)
1343            .await;
1344    }
1345
1346    async fn apply_market_effects(
1347        &self,
1348        output: &crate::rsm::apply::ApplyOutput,
1349    ) -> Result<(), EngineError> {
1350        for effect in &output.market_effects {
1351            match effect {
1352                crate::rsm::apply::MarketEffect::SaveMarketAndInstrument {
1353                    underlying,
1354                    expiry,
1355                    instrument,
1356                } => {
1357                    if let Some(handler) = self.ctx.db.as_ref() {
1358                        if let Err(error) =
1359                            handler.save_market_and_instrument_sync(underlying, *expiry, instrument)
1360                        {
1361                            panic!(
1362                                "CRITICAL_FAILURE: Failed to persist market {} to database: {}. \
1363                                 Market will be lost on restart. Restart required.",
1364                                instrument.id, error
1365                            );
1366                        }
1367                    }
1368                }
1369                crate::rsm::apply::MarketEffect::DeleteMarketAndInstrument { symbol } => {
1370                    if let Some(handler) = self.ctx.db.as_ref() {
1371                        if let Err(error) = handler.delete_market_and_instrument_sync(symbol) {
1372                            panic!(
1373                                "CRITICAL_FAILURE: Failed to delete market {} from database: {}. \
1374                                 In-memory and persisted state would diverge.",
1375                                symbol, error
1376                            );
1377                        }
1378                    }
1379                }
1380                crate::rsm::apply::MarketEffect::RegisterSettlement {
1381                    underlying,
1382                    symbol,
1383                    expiry_ts,
1384                    twap_window_seconds,
1385                } => {
1386                    let Some(oracle) = self.ctx.deps.mark_price_oracles.get(underlying) else {
1387                        warn!(
1388                            "Missing mark price oracle for {} while registering settlement for {} expiry {} window {}s",
1389                            underlying, symbol, expiry_ts, twap_window_seconds
1390                        );
1391                        continue;
1392                    };
1393                    oracle
1394                        .register_settlement(*expiry_ts, *twap_window_seconds)
1395                        .await;
1396                    debug!(
1397                        "Registered TWAP settlement for {} at expiry {}",
1398                        symbol, expiry_ts
1399                    );
1400                }
1401            }
1402        }
1403        Ok(())
1404    }
1405
1406    pub(crate) fn apply_pm_settlement_projection_effects_sync(
1407        &self,
1408        effects: &[crate::rsm::portfolio_margin::settlement_state::PmSettlementProjectionEffect],
1409        _request_id: &str,
1410    ) -> Result<(), String> {
1411        if effects.is_empty() {
1412            return Ok(());
1413        }
1414        let handler = self
1415            .ctx
1416            .db
1417            .as_ref()
1418            .ok_or_else(|| "PM settlement projection write requires a database".to_string())?;
1419        let writes = Self::pm_settlement_projection_writes_from_effects(effects)?;
1420        handler
1421            .apply_pm_settlement_projection_writes_sync(&writes)
1422            .map_err(|error| format!("PM settlement projection write failed: {error}"))
1423    }
1424
1425    pub(crate) fn pm_settlement_projection_writes_from_effects(
1426        effects: &[crate::rsm::portfolio_margin::settlement_state::PmSettlementProjectionEffect],
1427    ) -> Result<Vec<hypercall_db::PmSettlementProjectionWrite>, String> {
1428        let mut writes = Vec::with_capacity(effects.len());
1429        for effect in effects {
1430            match effect {
1431                crate::rsm::portfolio_margin::settlement_state::PmSettlementProjectionEffect::PoolUpsert(pool_state) => {
1432                    let capacity = pool_state.pool_available_usdc
1433                        + pool_state.active_timing_bridge_usdc
1434                        + pool_state.active_settlement_debt_usdc;
1435                    let config = pool_state.config.as_ref();
1436                    let updated_at_ms = i64::try_from(pool_state.updated_at_ms)
1437                        .map_err(|_| "PM settlement pool projection timestamp exceeds i64".to_string())?;
1438                    writes.push(hypercall_db::PmSettlementProjectionWrite::Pool(
1439                        hypercall_db::PmSettlementPoolProjectionWrite {
1440                            underlying: pool_state.underlying.clone(),
1441                            config_version: Self::pm_projection_version_i32(
1442                                "config_version",
1443                                pool_state.config_version,
1444                            )?,
1445                            policy_version: Self::pm_projection_version_i32(
1446                                "policy_version",
1447                                pool_state.policy_version,
1448                            )?,
1449                            pool_available_usdc: pool_state.pool_available_usdc,
1450                            pool_target_usdc: pool_state.pool_target_usdc,
1451                            pool_capacity_usdc: capacity,
1452                            pool_utilization: pool_state.utilization,
1453                            active_timing_bridge_usdc: pool_state.active_timing_bridge_usdc,
1454                            active_settlement_debt_usdc: pool_state.active_settlement_debt_usdc,
1455                            target_short_oi_notional_multiplier: config
1456                                .map(|c| c.target_short_oi_notional_multiplier),
1457                            utilization_kink: config.map(|c| c.utilization_kink),
1458                            apr_at_kink: config.map(|c| c.apr_at_kink),
1459                            max_apr: config.map(|c| c.max_apr),
1460                            normal_utilization_cap: config.map(|c| c.normal_utilization_cap),
1461                            crisis_utilization_cap: config.map(|c| c.crisis_utilization_cap),
1462                            bridge_window_ms: config.map(|c| c.bridge_window_ms),
1463                            projection_seq: updated_at_ms,
1464                            updated_at_ms,
1465                        },
1466                    ));
1467                }
1468                crate::rsm::portfolio_margin::settlement_state::PmSettlementProjectionEffect::AccountUpsert(account) => {
1469                    let updated_at_ms = i64::try_from(account.updated_at_ms)
1470                        .map_err(|_| "PM settlement account projection timestamp exceeds i64".to_string())?;
1471                    writes.push(hypercall_db::PmSettlementProjectionWrite::Account(
1472                        hypercall_db::PmSettlementAccountProjectionWrite {
1473                            wallet: account.wallet,
1474                            underlying: account.underlying.clone(),
1475                            status: format!("{:?}", account.status),
1476                            timing_bridge_principal_usdc: account.bridge_principal_usdc,
1477                            settlement_debt_principal_usdc: account.debt_principal_usdc,
1478                            accrued_interest_usdc: account.accrued_interest_usdc,
1479                            interest_cursor_ms: account.last_interest_accrual_ms,
1480                            bridge_deadline_ms: account.bridge_deadline_ms,
1481                            active_recovery_plan_id: account.active_recovery_plan_id.clone(),
1482                            policy_version: Self::pm_projection_version_i32(
1483                                "policy_version",
1484                                account.policy_version,
1485                            )?,
1486                            projection_seq: updated_at_ms,
1487                            updated_at_ms,
1488                        },
1489                    ));
1490                }
1491                crate::rsm::portfolio_margin::settlement_state::PmSettlementProjectionEffect::InterestEvent(event) => {
1492                    writes.push(hypercall_db::PmSettlementProjectionWrite::InterestEvent(
1493                        hypercall_db::PmSettlementInterestEventProjectionWrite {
1494                            request_id: event.request_id,
1495                            wallet: event.wallet,
1496                            underlying: event.underlying.clone(),
1497                            from_ms: event.accrual_start_ms,
1498                            to_ms: event.accrual_end_ms,
1499                            utilization: event.utilization,
1500                            apr: event.apr,
1501                            interest_usdc: event.interest_amount_usdc,
1502                            policy_version: Self::pm_projection_version_i32(
1503                                "policy_version",
1504                                event.policy_version,
1505                            )?,
1506                        },
1507                    ));
1508                }
1509                crate::rsm::portfolio_margin::settlement_state::PmSettlementProjectionEffect::RepaymentEvent(event) => {
1510                    writes.push(hypercall_db::PmSettlementProjectionWrite::RepaymentEvent(
1511                        hypercall_db::PmSettlementRepaymentEventProjectionWrite {
1512                            request_id: event.request_id,
1513                            wallet: event.wallet,
1514                            underlying: event.underlying.clone(),
1515                            amount_usdc: event.amount_usdc,
1516                            interest_paid_usdc: event.interest_paid_usdc,
1517                            principal_paid_usdc: event.principal_paid_usdc,
1518                            reason: event.reason.clone(),
1519                            source_event_id: event.source_event_id.clone(),
1520                        },
1521                    ));
1522                }
1523                crate::rsm::portfolio_margin::settlement_state::PmSettlementProjectionEffect::VaultDepositUpsert(deposit) => {
1524                    let projection_seq = i64::try_from(deposit.updated_at_ms)
1525                        .map_err(|_| "PM vault deposit projection timestamp exceeds i64".to_string())?;
1526                    let created_at_ms = i64::try_from(deposit.created_at_ms)
1527                        .map_err(|_| "PM vault deposit created_at timestamp exceeds i64".to_string())?;
1528                    let chain_id = i64::try_from(deposit.chain_id)
1529                        .map_err(|_| "PM vault deposit chain_id exceeds i64".to_string())?;
1530                    let log_index = i32::try_from(deposit.log_index)
1531                        .map_err(|_| "PM vault deposit log_index exceeds i32".to_string())?;
1532                    writes.push(hypercall_db::PmSettlementProjectionWrite::VaultDeposit(
1533                        hypercall_db::PmVaultDepositProjectionWrite {
1534                            deposit_id: deposit.deposit_id,
1535                            depositor: deposit.depositor,
1536                            underlying: deposit.underlying.clone(),
1537                            principal_usdc: deposit.principal_usdc,
1538                            remaining_usdc: deposit.remaining_usdc,
1539                            withdrawn_usdc: deposit.withdrawn_usdc,
1540                            reserved_withdrawal_usdc: deposit.reserved_withdrawal_usdc,
1541                            chain_id,
1542                            source_contract_address: deposit.source_contract_address,
1543                            tx_hash: deposit.tx_hash.clone(),
1544                            log_index,
1545                            max_listed_expiry_ts_ms: deposit.max_listed_expiry_ts_ms,
1546                            settlement_grace_ms: deposit.settlement_grace_ms,
1547                            lock_until_ms: deposit.lock_until_ms,
1548                            status: match deposit.status {
1549                                crate::rsm::portfolio_margin::settlement_state::PmVaultDepositStatus::Active => {
1550                                    hypercall_db::PmVaultDepositProjectionStatus::Active
1551                                }
1552                                crate::rsm::portfolio_margin::settlement_state::PmVaultDepositStatus::PartiallyReserved => {
1553                                    hypercall_db::PmVaultDepositProjectionStatus::PartiallyReserved
1554                                }
1555                                crate::rsm::portfolio_margin::settlement_state::PmVaultDepositStatus::Reserved => {
1556                                    hypercall_db::PmVaultDepositProjectionStatus::Reserved
1557                                }
1558                            },
1559                            projection_seq,
1560                            created_at_ms,
1561                            updated_at_ms: projection_seq,
1562                        },
1563                    ));
1564                }
1565                crate::rsm::portfolio_margin::settlement_state::PmSettlementProjectionEffect::VaultWithdrawalUpsert(withdrawal) => {
1566                    let projection_seq = i64::try_from(withdrawal.updated_at_ms)
1567                        .map_err(|_| "PM vault withdrawal projection timestamp exceeds i64".to_string())?;
1568                    let requested_at_ms = i64::try_from(withdrawal.requested_at_ms)
1569                        .map_err(|_| "PM vault withdrawal requested_at timestamp exceeds i64".to_string())?;
1570                    writes.push(hypercall_db::PmSettlementProjectionWrite::VaultWithdrawal(
1571                        hypercall_db::PmVaultWithdrawalProjectionWrite {
1572                            withdrawal_id: withdrawal.withdrawal_id,
1573                            deposit_id: withdrawal.deposit_id,
1574                            depositor: withdrawal.depositor,
1575                            underlying: withdrawal.underlying.clone(),
1576                            amount_usdc: withdrawal.amount_usdc,
1577                            lock_until_ms: withdrawal.lock_until_ms,
1578                            status: match withdrawal.status {
1579                                crate::rsm::portfolio_margin::settlement_state::PmVaultWithdrawalStatus::Reserved => {
1580                                    hypercall_db::PmVaultWithdrawalProjectionStatus::Reserved
1581                                }
1582                            },
1583                            projection_seq,
1584                            requested_at_ms,
1585                            updated_at_ms: projection_seq,
1586                        },
1587                    ));
1588                }
1589                crate::rsm::portfolio_margin::settlement_state::PmSettlementProjectionEffect::EventUpsert(event) => {
1590                    let event_key = format!(
1591                        "{}:{}:{}:{}:{}",
1592                        event.event_key.wallet,
1593                        event.event_key.market_id,
1594                        event.event_key.expiry_ts_ms,
1595                        event.event_key.margin_mode,
1596                        event.event_key.settlement_event_sequence
1597                    );
1598                    writes.push(hypercall_db::PmSettlementProjectionWrite::Event(
1599                        hypercall_db::PmSettlementEventProjectionWrite {
1600                            event_key,
1601                            wallet: event.event_key.wallet,
1602                            underlying: event.underlying.clone(),
1603                            event_type: event.status.clone(),
1604                            amount_usdc: event.amount_usdc,
1605                            request_id: event.request_id,
1606                            input_digest: event.input_digest.clone(),
1607                        },
1608                    ));
1609                }
1610                crate::rsm::portfolio_margin::settlement_state::PmSettlementProjectionEffect::RecoveryPlanUpsert(plan) => {
1611                    let updated_at_ms = i64::try_from(plan.updated_at_ms)
1612                        .map_err(|_| "PM recovery plan projection timestamp exceeds i64".to_string())?;
1613                    let mut actions = Vec::with_capacity(plan.actions.len());
1614                    for action in &plan.actions {
1615                        let action_updated_at_ms = i64::try_from(action.updated_at_ms)
1616                            .map_err(|_| "PM recovery action projection timestamp exceeds i64".to_string())?;
1617                        actions.push(hypercall_db::PmRecoveryActionProjectionWrite {
1618                            plan_id: plan.plan_id.clone(),
1619                            action_id: format!("{}:{}", plan.plan_id, action.action_index),
1620                            action_type: action.action_type.clone(),
1621                            status: action.status.clone(),
1622                            target: action.target.clone(),
1623                            attempt: Self::pm_projection_version_i32("attempt", action.attempt)?,
1624                            external_id: action.submitted_external_id.clone(),
1625                            external_kind: action.external_kind.clone(),
1626                            result_external_id: action.result_external_id.clone(),
1627                            result: action.result.clone(),
1628                            expected_usdc_recovered: action.expected_usdc_recovered,
1629                            expected_obligation_reduced: action.expected_obligation_reduced,
1630                            expected_impact_usdc: action.expected_impact_usdc,
1631                            recovered_usdc: action.recovered_usdc,
1632                            liability_reduction_usdc: action.liability_reduction_usdc,
1633                            projection_seq: action_updated_at_ms,
1634                            updated_at_ms: action_updated_at_ms,
1635                        });
1636                    }
1637                    writes.push(hypercall_db::PmSettlementProjectionWrite::RecoveryPlan(
1638                        hypercall_db::PmRecoveryPlanProjectionWrite {
1639                            plan_id: plan.plan_id.clone(),
1640                            wallet: plan.wallet,
1641                            underlying: plan.underlying.clone(),
1642                            status: plan.status.clone(),
1643                            trigger: plan.trigger.clone(),
1644                            reason: plan.reason.clone(),
1645                            policy_version: Self::pm_projection_version_i32(
1646                                "policy_version",
1647                                plan.policy_version,
1648                            )?,
1649                            recovery_priority_version: Self::pm_projection_version_i32(
1650                                "recovery_priority_version",
1651                                plan.recovery_priority_version,
1652                            )?,
1653                            target_reduction_usdc: plan.target_reduction_usdc,
1654                            expected_usdc_recovered: plan.expected_usdc_recovered,
1655                            expected_obligation_reduced: plan.expected_obligation_reduced,
1656                            expected_impact_usdc: plan.expected_impact_usdc,
1657                            post_plan_utilization: plan.post_plan_utilization,
1658                            projection_seq: updated_at_ms,
1659                            updated_at_ms,
1660                            actions,
1661                        },
1662                    ));
1663                }
1664            }
1665        }
1666        Ok(writes)
1667    }
1668
1669    fn pm_projection_version_i32(field: &str, value: u32) -> Result<i32, String> {
1670        i32::try_from(value).map_err(|_| {
1671            format!("PM settlement projection {field} {value} exceeds signed 32-bit integer range")
1672        })
1673    }
1674
1675    pub(crate) fn apply_expiry_effect(
1676        &mut self,
1677        effect: &crate::rsm::apply::ExpiryEffect,
1678    ) -> Result<(), EngineError> {
1679        match effect {
1680            crate::rsm::apply::ExpiryEffect::UpdateInstrumentStatus { symbols, status } => {
1681                let Some(handler) = self.ctx.db.as_ref() else {
1682                    panic!(
1683                        "CRITICAL_FAILURE: Missing database handler for instrument status update {} on {:?}. \
1684                         Engine state has advanced without a durable expiry sink. Restart required.",
1685                        status, symbols
1686                    );
1687                };
1688                let update_result = if status == "EXPIRED_PENDING_PRICE" {
1689                    handler.transition_active_instruments_to_expired_pending_sync(symbols)
1690                } else {
1691                    handler.update_instrument_status_sync(symbols, status)
1692                };
1693                if let Err(error) = update_result {
1694                    panic!(
1695                        "CRITICAL_FAILURE: Failed to update instrument status to {} for {:?}: {}. \
1696                         Memory state and database are now inconsistent. Restart required.",
1697                        status, symbols, error
1698                    );
1699                }
1700                if status == "SETTLED" {
1701                    for symbol in symbols {
1702                        match crate::shared::ParsedSymbol::from_symbol(symbol) {
1703                            Ok(parsed) => {
1704                                crate::observability::record_settlement(&parsed.underlying, true);
1705                            }
1706                            Err(error) => {
1707                                warn!(
1708                                    "Failed to parse settled symbol {} for settlement metric: {}",
1709                                    symbol, error
1710                                );
1711                            }
1712                        }
1713                    }
1714                }
1715            }
1716            crate::rsm::apply::ExpiryEffect::BatchCancelOrdersForSettlement {
1717                order_ids,
1718                now_ms,
1719            } => {
1720                let Some(handler) = self.ctx.db.as_ref() else {
1721                    panic!(
1722                        "CRITICAL_FAILURE: Missing database handler for {} settlement order cancels at {}. \
1723                         Engine state has advanced without a durable expiry sink. Restart required.",
1724                        order_ids.len(),
1725                        now_ms
1726                    );
1727                };
1728                for chunk in order_ids.chunks(5000) {
1729                    if let Err(error) =
1730                        handler.batch_cancel_orders_for_settlement_sync(chunk, *now_ms as i64)
1731                    {
1732                        panic!(
1733                            "CRITICAL_FAILURE: Failed to persist {} settlement order cancels: {}. \
1734                             Memory state and database are now inconsistent. Restart required.",
1735                            chunk.len(),
1736                            error
1737                        );
1738                    }
1739                }
1740            }
1741            crate::rsm::apply::ExpiryEffect::CancelOrphanedOrdersBySymbols { symbols } => {
1742                let Some(handler) = self.ctx.db.as_ref() else {
1743                    panic!(
1744                        "CRITICAL_FAILURE: Missing database handler for orphaned order cleanup on {:?}. \
1745                         Engine state has advanced without a durable expiry sink. Restart required.",
1746                        symbols
1747                    );
1748                };
1749                match handler.cancel_orphaned_orders_by_symbols_sync(symbols) {
1750                    Ok(0) => {}
1751                    Ok(n) => {
1752                        warn!(
1753                            "Settlement cleanup: cancelled {} orphaned order_infos rows on {} instruments",
1754                            n,
1755                            symbols.len()
1756                        );
1757                    }
1758                    Err(error) => {
1759                        panic!(
1760                            "CRITICAL_FAILURE: Failed settlement orphan cleanup for {:?}: {}. \
1761                             Orphaned orders may remain in order_infos. Restart required.",
1762                            symbols, error
1763                        );
1764                    }
1765                }
1766            }
1767            crate::rsm::apply::ExpiryEffect::ApplySettlement(intent) => {
1768                let Some(handler) = self.ctx.db.as_ref() else {
1769                    panic!(
1770                        "CRITICAL_FAILURE: Missing database handler for durable settlement of {}/{} at {}. \
1771                         Engine state has advanced without a settlement sink. Restart required.",
1772                        intent.wallet, intent.symbol, intent.event_ts_ms
1773                    );
1774                };
1775                let outcome = match handler.try_apply_settlement_sync(
1776                    &intent.wallet,
1777                    &intent.symbol,
1778                    intent.position_size,
1779                    intent.settlement_price,
1780                    intent.settlement_value,
1781                    intent.margin_mode,
1782                    intent.event_ts_ms,
1783                    intent.settlement_entry_price,
1784                    intent.cost_basis,
1785                    intent.net_pnl,
1786                ) {
1787                    Ok(outcome) => outcome,
1788                    Err(error) => {
1789                        panic!(
1790                            "CRITICAL_FAILURE: Failed to apply durable settlement for {}/{}: {}. \
1791                             Engine state has advanced but durable settlement is unknown. Restart required.",
1792                            intent.wallet, intent.symbol, error
1793                        );
1794                    }
1795                };
1796                self.reconcile_settlement_balance(intent, outcome);
1797            }
1798        }
1799
1800        Ok(())
1801    }
1802
1803    pub(crate) fn apply_standby_expiry_effect(
1804        &mut self,
1805        effect: &crate::rsm::apply::ExpiryEffect,
1806    ) -> Result<(), EngineError> {
1807        match effect {
1808            crate::rsm::apply::ExpiryEffect::ApplySettlement(intent) => {
1809                let Some(handler) = self.ctx.db.as_ref() else {
1810                    panic!(
1811                        "CRITICAL_FAILURE: Missing database handler for replayed settlement of {}/{} at {}. \
1812                         Standby memory state has advanced without a durable settlement source.",
1813                        intent.wallet, intent.symbol, intent.event_ts_ms
1814                    );
1815                };
1816                let outcome = match Self::observe_standby_settlement_with_retry(handler, intent) {
1817                    Ok(outcome) => outcome,
1818                    Err(error) => {
1819                        panic!(
1820                            "CRITICAL_FAILURE: Failed to observe durable replayed settlement for {}/{}: {}. \
1821                             Standby memory state has advanced but durable settlement is unknown.",
1822                            intent.wallet, intent.symbol, error
1823                        );
1824                    }
1825                };
1826                self.reconcile_settlement_balance(intent, outcome);
1827            }
1828            crate::rsm::apply::ExpiryEffect::UpdateInstrumentStatus { symbols, status } => {
1829                debug!(
1830                    status,
1831                    symbols = ?symbols,
1832                    "Skipping primary-applied instrument status expiry effect during standby replay"
1833                );
1834            }
1835            crate::rsm::apply::ExpiryEffect::BatchCancelOrdersForSettlement {
1836                order_ids,
1837                now_ms,
1838            } => {
1839                debug!(
1840                    order_count = order_ids.len(),
1841                    now_ms,
1842                    "Skipping primary-applied settlement order cancel effect during standby replay"
1843                );
1844            }
1845            crate::rsm::apply::ExpiryEffect::CancelOrphanedOrdersBySymbols { symbols } => {
1846                debug!(
1847                    symbols = ?symbols,
1848                    "Skipping primary-applied orphan cleanup expiry effect during standby replay"
1849                );
1850            }
1851        }
1852
1853        Ok(())
1854    }
1855
1856    /// Apply TickExpiry runtime effects after deterministic command replay.
1857    ///
1858    /// The replayed command remains the authority for in-memory balance state.
1859    /// This recovery-only path only catches durable projections up to the WAL
1860    /// command: it writes idempotent settlement evidence, persists idempotent
1861    /// status/cancel projections, and then checks that replayed engine cash
1862    /// matches the durable balance. It must never set `balance_ledger` from
1863    /// durable accounting.
1864    pub(crate) fn apply_replayed_expiry_effects(
1865        &mut self,
1866        effects: &[crate::rsm::apply::ExpiryEffect],
1867    ) -> Result<(), EngineError> {
1868        for effect in effects {
1869            match effect {
1870                crate::rsm::apply::ExpiryEffect::ApplySettlement(intent) => {
1871                    let Some(handler) = self.ctx.db.as_ref() else {
1872                        panic!(
1873                            "CRITICAL_FAILURE: Missing database handler for replayed settlement of {}/{} at {}. \
1874                             Recovery cannot persist settlement cash replay.",
1875                            intent.wallet, intent.symbol, intent.event_ts_ms
1876                        );
1877                    };
1878                    let outcome = match handler.try_apply_settlement_sync(
1879                        &intent.wallet,
1880                        &intent.symbol,
1881                        intent.position_size,
1882                        intent.settlement_price,
1883                        intent.settlement_value,
1884                        intent.margin_mode,
1885                        intent.event_ts_ms,
1886                        intent.settlement_entry_price,
1887                        intent.cost_basis,
1888                        intent.net_pnl,
1889                    ) {
1890                        Ok(outcome) => outcome,
1891                        Err(error) => {
1892                            panic!(
1893                                "CRITICAL_FAILURE: Failed to apply durable replayed settlement for {}/{}: {}. \
1894                                 Recovery cannot persist settlement cash replay.",
1895                                intent.wallet, intent.symbol, error
1896                            );
1897                        }
1898                    };
1899                    debug!(
1900                        wallet = %intent.wallet,
1901                        symbol = %intent.symbol,
1902                        newly_persisted = outcome.newly_persisted,
1903                        "Applied replayed TickExpiry settlement effect"
1904                    );
1905                }
1906                crate::rsm::apply::ExpiryEffect::UpdateInstrumentStatus { symbols, status } => {
1907                    let Some(handler) = self.ctx.db.as_ref() else {
1908                        panic!(
1909                            "CRITICAL_FAILURE: Missing database handler for replayed instrument status update {} on {:?}. \
1910                             Recovery cannot persist expiry status projection.",
1911                            status, symbols
1912                        );
1913                    };
1914                    let update_result = if status == "EXPIRED_PENDING_PRICE" {
1915                        handler.transition_active_instruments_to_expired_pending_sync(symbols)
1916                    } else {
1917                        handler.update_instrument_status_sync(symbols, status)
1918                    };
1919                    if let Err(error) = update_result {
1920                        panic!(
1921                            "CRITICAL_FAILURE: Failed to persist replayed instrument status {} for {:?}: {}. \
1922                             Recovery cannot persist expiry status projection.",
1923                            status, symbols, error
1924                        );
1925                    }
1926                }
1927                crate::rsm::apply::ExpiryEffect::BatchCancelOrdersForSettlement {
1928                    order_ids,
1929                    now_ms,
1930                } => {
1931                    let Some(handler) = self.ctx.db.as_ref() else {
1932                        panic!(
1933                            "CRITICAL_FAILURE: Missing database handler for replayed settlement order cancels at {}. \
1934                             Recovery cannot persist expiry order projection.",
1935                            now_ms
1936                        );
1937                    };
1938                    for chunk in order_ids.chunks(5000) {
1939                        if let Err(error) =
1940                            handler.batch_cancel_orders_for_settlement_sync(chunk, *now_ms as i64)
1941                        {
1942                            panic!(
1943                                "CRITICAL_FAILURE: Failed to persist {} replayed settlement order cancels: {}. \
1944                                 Recovery cannot persist expiry order projection.",
1945                                chunk.len(),
1946                                error
1947                            );
1948                        }
1949                    }
1950                }
1951                crate::rsm::apply::ExpiryEffect::CancelOrphanedOrdersBySymbols { symbols } => {
1952                    let Some(handler) = self.ctx.db.as_ref() else {
1953                        panic!(
1954                            "CRITICAL_FAILURE: Missing database handler for replayed orphaned order cleanup on {:?}. \
1955                             Recovery cannot persist expiry order projection.",
1956                            symbols
1957                        );
1958                    };
1959                    if let Err(error) = handler.cancel_orphaned_orders_by_symbols_sync(symbols) {
1960                        panic!(
1961                            "CRITICAL_FAILURE: Failed replayed settlement orphan cleanup for {:?}: {}. \
1962                             Recovery cannot persist expiry order projection.",
1963                            symbols, error
1964                        );
1965                    }
1966                }
1967            }
1968        }
1969
1970        Ok(())
1971    }
1972
1973    fn observe_standby_settlement_with_retry(
1974        handler: &hypercall_db_diesel::DatabaseHandler,
1975        intent: &crate::rsm::apply::ExpirySettlementIntent,
1976    ) -> Result<hypercall_db::SettlementResult, anyhow::Error> {
1977        let mut backoff_ms = STANDBY_SETTLEMENT_OBSERVE_INITIAL_BACKOFF_MS;
1978        let mut last_error = None;
1979
1980        for attempt in 1..=STANDBY_SETTLEMENT_OBSERVE_ATTEMPTS {
1981            match handler.observe_applied_settlement_sync(
1982                &intent.wallet,
1983                &intent.symbol,
1984                intent.position_size,
1985                intent.settlement_price,
1986                intent.settlement_value,
1987                intent.margin_mode,
1988                intent.settlement_entry_price,
1989                intent.cost_basis,
1990                intent.net_pnl,
1991            ) {
1992                Ok(outcome) => return Ok(outcome),
1993                Err(error) => {
1994                    if attempt == STANDBY_SETTLEMENT_OBSERVE_ATTEMPTS {
1995                        last_error = Some(error);
1996                        break;
1997                    }
1998                    warn!(
1999                        wallet = %intent.wallet,
2000                        symbol = %intent.symbol,
2001                        attempt,
2002                        max_attempts = STANDBY_SETTLEMENT_OBSERVE_ATTEMPTS,
2003                        error = %error,
2004                        "Standby settlement observation missed durable row; retrying"
2005                    );
2006                    last_error = Some(error);
2007                    std::thread::sleep(Duration::from_millis(backoff_ms));
2008                    backoff_ms = (backoff_ms * 2).min(STANDBY_SETTLEMENT_OBSERVE_MAX_BACKOFF_MS);
2009                }
2010            }
2011        }
2012
2013        Err(last_error.expect("standby settlement observation must record a final error"))
2014    }
2015
2016    fn reconcile_settlement_balance(
2017        &mut self,
2018        intent: &crate::rsm::apply::ExpirySettlementIntent,
2019        outcome: hypercall_db::SettlementResult,
2020    ) {
2021        debug!(
2022            wallet = %intent.wallet,
2023            symbol = %intent.symbol,
2024            balance_after = %self.ctx.balance_ledger.balance(&intent.wallet),
2025            newly_persisted = outcome.newly_persisted,
2026            "Observed durable TickExpiry settlement outcome without mutating engine cash"
2027        );
2028    }
2029
2030    /// Flush the journal batcher, ensuring all pending entries are committed to DB.
2031    async fn flush_journal(&self) {
2032        if let Some(ref sender) = self.journal_batch_sender {
2033            let (ack_tx, ack_rx) = tokio::sync::oneshot::channel();
2034            if let Err(e) = sender
2035                .send(crate::journal::JournalMessage::Flush(ack_tx))
2036                .await
2037            {
2038                error!("Failed to send flush to journal batcher: {}", e);
2039                return;
2040            }
2041            match ack_rx.await {
2042                Ok(()) => {
2043                    debug!("Journal batcher flush completed");
2044                }
2045                Err(_) => {
2046                    error!("Journal batcher flush ack channel dropped");
2047                }
2048            }
2049        }
2050    }
2051
2052    async fn hydrate_runtime_state_from_dependencies(
2053        &mut self,
2054        context: &'static str,
2055        hydrate_market_data: bool,
2056    ) {
2057        // Seed engine_positions from PortfolioService on startup.
2058        // After startup, positions are updated from fills inside apply().
2059        if let Some(ref portfolio_service) = self.ctx.deps.portfolio_service {
2060            let all = portfolio_service.all_portfolios().await;
2061            let previous_count = self.ctx.engine_positions.len();
2062            self.ctx.engine_positions.clear();
2063
2064            let mut count = 0usize;
2065            for (wallet, balance) in all {
2066                for (symbol, pos) in balance.positions {
2067                    if pos.amount != rust_decimal::Decimal::ZERO {
2068                        self.ctx.engine_positions.insert(
2069                            (wallet, symbol),
2070                            crate::rsm::engine_deps::EnginePosition {
2071                                quantity: pos.amount,
2072                                entry_price: pos.entry_price,
2073                            },
2074                        );
2075                        count += 1;
2076                    }
2077                }
2078            }
2079            if count > 0 || previous_count > 0 {
2080                info!(
2081                    context,
2082                    previous_count, count, "Reconciled engine positions from PortfolioService"
2083                );
2084            }
2085        }
2086
2087        if hydrate_market_data {
2088            self.ingest_price_updates_without_side_effects().await;
2089            self.ingest_iv_updates_without_side_effects().await;
2090        }
2091    }
2092
2093    pub(crate) async fn hydrate_standby_base_state(&mut self) {
2094        self.hydrate_runtime_state_from_dependencies("standby_startup", !self.runtime_quiesced)
2095            .await;
2096        self.publish_snapshot();
2097        info!(
2098            positions = self.ctx.engine_positions.len(),
2099            cash_wallets = self.ctx.balance_ledger.len(),
2100            spot_prices = self.ctx.spot_prices.len(),
2101            iv_surfaces = self.ctx.iv_surfaces.len(),
2102            "Hydrated standby base engine state"
2103        );
2104    }
2105
2106    pub(crate) async fn hydrate_primary_base_state(&mut self) {
2107        self.hydrate_runtime_state_from_dependencies("startup", false)
2108            .await;
2109    }
2110
2111    pub(crate) async fn apply_startup_replayed_events(&mut self) {
2112        if self.startup_replayed_events.is_empty() {
2113            return;
2114        }
2115
2116        let batches = std::mem::take(&mut self.startup_replayed_events);
2117        let event_count = batches
2118            .iter()
2119            .map(|(_, events)| events.len())
2120            .sum::<usize>();
2121        for (req_id, events) in batches {
2122            self.apply_startup_replayed_events_sync(&events, &req_id)
2123                .await;
2124        }
2125        info!(
2126            event_count,
2127            "Applied startup-replayed TickExpiry event projections"
2128        );
2129    }
2130
2131    /// Persist engine state snapshot to disk for fast restart recovery.
2132    ///
2133    /// Reads the current WAL checkpoint and serializes the engine's in-memory
2134    /// state (orders, positions, balances) so the next startup can skip full
2135    /// replay from Postgres. Only runs when ENGINE_JOURNAL_WAL_PATH is set.
2136    pub(super) fn persist_engine_state_snapshot(&self) -> Option<i64> {
2137        let wal_path_configured = hypercall_journal::checkpoint::wal_path_is_explicitly_configured(
2138            self.wal_path.as_ref(),
2139        );
2140        if !wal_path_configured || self.ctx.db.is_none() {
2141            return None;
2142        }
2143
2144        use crate::rsm::engine_state_snapshot::{snapshot_path_from_wal_path, write_snapshot};
2145        use crate::rsm::restart_components::{
2146            EngineRecoveryCapture, PersistentEngineStateComponent,
2147        };
2148        use hypercall_journal::checkpoint::{
2149            checkpoint_path_for, read_checkpoint, wal_path_from_config,
2150        };
2151        use hypercall_recovery::RestartStateComponent;
2152
2153        let wal_path = wal_path_from_config(self.wal_path.as_ref());
2154        let checkpoint_path = checkpoint_path_for(&wal_path);
2155        match read_checkpoint(&checkpoint_path) {
2156            Ok(checkpoint) if checkpoint.last_command_id > 0 => {
2157                let snapshot_path = snapshot_path_from_wal_path(&wal_path);
2158                let snapshot = PersistentEngineStateComponent::capture(
2159                    &EngineRecoveryCapture::for_snapshot(&self.ctx, &checkpoint),
2160                );
2161                if let Err(e) = write_snapshot(&snapshot_path, &snapshot) {
2162                    warn!("Failed to write engine state snapshot: {}", e);
2163                    None
2164                } else {
2165                    if let Some(ref handler) = self.ctx.db {
2166                        if let Err(e) =
2167                            handler.update_snapshot_boundary_sync(checkpoint.last_command_id)
2168                        {
2169                            warn!("Failed to persist snapshot boundary to Postgres: {}", e);
2170                            return None;
2171                        }
2172                    }
2173                    info!(
2174                        "Persisted engine state snapshot: last_command_id={}",
2175                        checkpoint.last_command_id
2176                    );
2177                    Some(checkpoint.last_command_id)
2178                }
2179            }
2180            Ok(_) => {
2181                // last_command_id == 0: no commands replicated yet, skip snapshot
2182                None
2183            }
2184            Err(e) => {
2185                panic!(
2186                    "CRITICAL_FAILURE: failed to read checkpoint for snapshot write ({}): {}",
2187                    checkpoint_path.display(),
2188                    e
2189                );
2190            }
2191        }
2192    }
2193
2194    fn current_wal_checkpoint_command_id(&self) -> i64 {
2195        let wal_path_configured = hypercall_journal::checkpoint::wal_path_is_explicitly_configured(
2196            self.wal_path.as_ref(),
2197        );
2198        if !wal_path_configured {
2199            return self.replay_checkpoint.last_command_id;
2200        }
2201
2202        let wal_path = hypercall_journal::checkpoint::wal_path_from_config(self.wal_path.as_ref());
2203        let checkpoint_path = hypercall_journal::checkpoint::checkpoint_path_for(&wal_path);
2204        match hypercall_journal::checkpoint::read_checkpoint(&checkpoint_path) {
2205            Ok(checkpoint) => checkpoint.last_command_id,
2206            Err(error) => {
2207                warn!(
2208                    path = %checkpoint_path.display(),
2209                    error = %error,
2210                    "Failed to read current WAL checkpoint for quiesce report"
2211                );
2212                self.replay_checkpoint.last_command_id
2213            }
2214        }
2215    }
2216
2217    async fn apply_runtime_balance_update(
2218        &mut self,
2219        env: crate::rsm::apply::CommandEnvelope,
2220        applied_tx: Option<tokio::sync::oneshot::Sender<Result<(), String>>>,
2221        wallet: WalletAddress,
2222        amount: Decimal,
2223        update_kind: &'static str,
2224        last_read_snapshot: &mut Instant,
2225        journal_request_id: String,
2226        outbox_appends: Vec<crate::directive_outbox::DirectiveOutboxAppend>,
2227    ) {
2228        Self::require_external_durable_mutation_uuid(update_kind, &journal_request_id);
2229        match self.apply(env.clone()) {
2230            Ok(output) => {
2231                let output_is_empty = output.is_empty();
2232                let balance_updates = output.balance_updates;
2233                if balance_updates.is_empty()
2234                    && output_is_empty
2235                    && Self::is_idempotent_empty_balance_update(update_kind)
2236                {
2237                    debug!(
2238                        wallet = %wallet,
2239                        amount = %amount,
2240                        update_kind = update_kind,
2241                        "Observed duplicate balance update with no new balance effects"
2242                    );
2243                    if let Some(tx) = applied_tx {
2244                        let _ = tx.send(Ok(()));
2245                    }
2246                    return;
2247                }
2248                assert!(
2249                    !balance_updates.is_empty(),
2250                    "RUNTIME_INVARIANT: {} applied without canonical balance_updates",
2251                    update_kind
2252                );
2253                self.journal_external_balance_update_command(
2254                    &env,
2255                    &journal_request_id,
2256                    outbox_appends,
2257                    balance_updates.clone(),
2258                )
2259                .await;
2260                self.publish_to_nats(&env).await;
2261                self.publish_balance_updates_to_nats(&balance_updates).await;
2262                self.publish_snapshot();
2263                *last_read_snapshot = Instant::now();
2264                if let Some(tx) = applied_tx {
2265                    let _ = tx.send(Ok(()));
2266                }
2267            }
2268            Err(e) => {
2269                let err = e.to_string();
2270                if let Some(tx) = applied_tx {
2271                    let _ = tx.send(Err(err.clone()));
2272                }
2273                panic!(
2274                    "RUNTIME_INVARIANT: failed to apply {} balance_ledger update for {} amount {} before publish: {}",
2275                    update_kind, wallet, amount, err
2276                );
2277            }
2278        }
2279    }
2280
2281    fn is_idempotent_empty_balance_update(update_kind: &str) -> bool {
2282        matches!(update_kind, "DepositUpdate" | "LiquidationBonusUpdate")
2283    }
2284
2285    /// Start the engine event loop
2286    pub async fn start(mut self) {
2287        info!("Unified engine starting with SPAN calculations...");
2288
2289        self.apply_startup_replayed_events().await;
2290        self.hydrate_primary_base_state().await;
2291
2292        // Start MMP automatic eviction if cache is available
2293        if let Some(ref mmp_cache) = self.ctx.deps.mmp_cache {
2294            mmp_cache.start();
2295            info!("MMP cache automatic eviction started");
2296        }
2297
2298        // Create expiry check interval (check every minute)
2299        let mut expiry_interval = tokio::time::interval(Duration::from_secs(60));
2300
2301        // Create price update interval (feed spot prices into command stream every 2s)
2302        let mut price_update_interval = tokio::time::interval(Duration::from_secs(2));
2303
2304        // Hydrate prices before accepting orders so margin checks don't
2305        // reject during the startup window before the first periodic tick.
2306        // If the process is starting under a persisted admin drain marker,
2307        // preserve the snapshot/replay state until /admin/undrain resumes
2308        // live external ingestion.
2309        if !self.runtime_quiesced {
2310            self.ingest_price_updates().await;
2311            self.ingest_iv_updates().await;
2312        } else {
2313            info!("Unified engine starting quiesced; startup market ingestion is paused");
2314        }
2315
2316        // Settle any already-expired instruments synchronously before accepting
2317        // orders. Without this, expired positions linger for up to 60s (the
2318        // expiry tick interval), causing portfolio cache repricing errors and
2319        // the engine rejecting orders on expired chains.
2320        if !self.runtime_quiesced {
2321            let now_ms = get_timestamp_millis();
2322            let env = match self.prepare_tick_expiry_env(now_ms).await {
2323                Ok(env) => env,
2324                Err(e) => {
2325                    panic!(
2326                        "CRITICAL_FAILURE: Startup expiry preparation failed: {}. \
2327                         Expiry settlement cannot continue without explicit context.",
2328                        e
2329                    );
2330                }
2331            };
2332            let journal_env = env.clone();
2333            match self.apply(env) {
2334                Ok(output) => {
2335                    self.journal_replay_owned_expiry_command(&journal_env, &output.balance_updates)
2336                        .await;
2337                    if let Err(e) = self
2338                        .apply_expiry_effects_and_events(&output, "startup-tick-expiry")
2339                        .await
2340                    {
2341                        panic!(
2342                            "CRITICAL_FAILURE: Startup expiry effects failed: {}. \
2343                             Expiry runtime effects could not be applied after memory state advanced.",
2344                            e
2345                        );
2346                    }
2347                    self.publish_tick_expiry_balance_updates(&output, "startup-tick-expiry")
2348                        .await;
2349                    info!(
2350                        "Startup expiry check complete: {} events",
2351                        output.events.len()
2352                    );
2353                }
2354                Err(e) => {
2355                    panic!(
2356                        "CRITICAL_FAILURE: apply() failed for startup TickExpiry: {}. \
2357                         Expiry state could not be advanced deterministically.",
2358                        e
2359                    );
2360                }
2361            }
2362            self.publish_snapshot();
2363        } else {
2364            info!("Unified engine starting quiesced; startup expiry check is paused");
2365        }
2366
2367        // Spawn periodic snapshot task
2368        let mut snapshot_interval = tokio::time::interval(self.snapshot_interval);
2369        let post_startup_reconcile = tokio::time::sleep(self.post_startup_reconcile_delay);
2370        tokio::pin!(post_startup_reconcile);
2371
2372        let mut last_read_snapshot = Instant::now();
2373
2374        // Reuse Instant variables and histogram handles to avoid repeated lookups
2375        let mut loop_start;
2376        let mut market_start;
2377        let mut expiry_start;
2378        let mut snapshot_start;
2379
2380        // Get histogram handles once (they're cheap references into the global registry)
2381        let loop_iteration_hist = histogram!("ht_engine_loop_iteration_seconds");
2382        let market_request_hist = histogram!("ht_engine_market_request_seconds");
2383        let expiry_check_hist = histogram!("ht_engine_expiry_check_seconds");
2384        let snapshot_hist = histogram!("ht_engine_snapshot_seconds");
2385
2386        loop {
2387            loop_start = Instant::now();
2388            select! {
2389                Some(request) = async {
2390                    match self.quiesce_receiver.as_mut() {
2391                        Some(rx) => rx.recv().await,
2392                        None => std::future::pending().await,
2393                    }
2394                } => {
2395                    self.handle_quiesce_request(request).await;
2396                }
2397
2398                // Handle incoming orders
2399                Some(request) = self.order_receiver.recv(), if !self.runtime_quiesced => {
2400                    DurableJournaling::process_order_journaled(&mut self, request).await;
2401                    if last_read_snapshot.elapsed() >= self.read_snapshot_interval {
2402                        self.publish_snapshot();
2403                        last_read_snapshot = Instant::now();
2404                    }
2405                }
2406
2407                // Handle RFQ execution requests
2408                Some(request) = async {
2409                    match self.rfq_receiver.as_mut() {
2410                        Some(rx) => rx.recv().await,
2411                        None => std::future::pending().await,
2412                    }
2413                }, if !self.runtime_quiesced => {
2414                    self.handle_rfq_execute(request).await;
2415                    if last_read_snapshot.elapsed() >= self.read_snapshot_interval {
2416                        self.publish_snapshot();
2417                        last_read_snapshot = Instant::now();
2418                    }
2419                }
2420
2421                // Handle deposit requests (faucet/admin)
2422                Some(deposit) = async {
2423                    match self.deposit_receiver.as_mut() {
2424                        Some(rx) => rx.recv().await,
2425                        None => std::future::pending().await,
2426                    }
2427                } => {
2428                    if self.runtime_quiesced {
2429                        if let Some(tx) = deposit.applied_tx {
2430                            let _ = tx.send(Err("engine is quiesced".to_string()));
2431                        }
2432                        continue;
2433                    }
2434                    let wallet = deposit.wallet;
2435                    let amount = deposit.amount;
2436                    let source_event_hash = deposit.source_event_hash;
2437                    let journal_request_id = deposit.journal_request_id;
2438                    let outbox_appends = deposit.outbox_appends;
2439                    let applied_tx = deposit.applied_tx;
2440                    let env = crate::rsm::apply::CommandEnvelope::new(
2441                        deposit.timestamp_ms,
2442                        crate::rsm::apply::EngineCommand::DepositUpdate {
2443                            wallet,
2444                            amount,
2445                            timestamp_ms: deposit.timestamp_ms,
2446                            sequence: deposit.sequence,
2447                            source_event_hash,
2448                        },
2449                    );
2450                    self.apply_runtime_balance_update(
2451                        env,
2452                        applied_tx,
2453                        wallet,
2454                        amount,
2455                        "DepositUpdate",
2456                        &mut last_read_snapshot,
2457                        journal_request_id,
2458                        outbox_appends,
2459                    )
2460                    .await;
2461                }
2462
2463                // Handle option-token deposits observed on-chain.
2464                Some(deposit) = async {
2465                    match self.option_deposit_receiver.as_mut() {
2466                        Some(rx) => rx.recv().await,
2467                        None => std::future::pending().await,
2468                    }
2469                } => {
2470                    if self.runtime_quiesced {
2471                        if let Some(tx) = deposit.applied_tx {
2472                            let _ = tx.send(Err("engine is quiesced".to_string()));
2473                        }
2474                        continue;
2475                    }
2476                    let wallet = deposit.wallet;
2477                    let symbol = deposit.symbol;
2478                    let quantity = deposit.quantity;
2479                    let request_id = deposit.request_id;
2480                    let timestamp_ms = deposit.timestamp_ms;
2481                    let applied_tx = deposit.applied_tx;
2482                    if self
2483                        .external_option_command_already_journaled(
2484                            &request_id,
2485                            "OptionDepositUpdate",
2486                        )
2487                        .await
2488                    {
2489                        if let Some(tx) = applied_tx {
2490                            let _ = tx.send(Ok(()));
2491                        }
2492                        continue;
2493                    }
2494                    let env = crate::rsm::apply::CommandEnvelope::new(
2495                        timestamp_ms,
2496                        crate::rsm::apply::EngineCommand::OptionDepositUpdate {
2497                            request_id: request_id.clone(),
2498                            wallet,
2499                            symbol: symbol.clone(),
2500                            quantity,
2501                            timestamp_ms,
2502                        },
2503                    );
2504                    match self.apply(env.clone()) {
2505                        Ok(_) => {
2506                            self
2507                                .journal_external_option_position_command(
2508                                    &env,
2509                                    &request_id,
2510                                    hypercall_db_diesel::engine_enums::CommandType::OptionDepositUpdate,
2511                                    Vec::new(),
2512                                )
2513                                .await;
2514                            if let Some(portfolio_cache) = &self.ctx.deps.portfolio_cache {
2515                                portfolio_cache
2516                                    .handle_option_custody_delta(
2517                                        wallet,
2518                                        symbol.clone(),
2519                                        quantity,
2520                                        None,
2521                                    )
2522                                    .await;
2523                            }
2524                            self.publish_to_nats(&env).await;
2525                            self.publish_snapshot();
2526                            last_read_snapshot = Instant::now();
2527                            if let Some(tx) = applied_tx {
2528                                let _ = tx.send(Ok(()));
2529                            }
2530                        }
2531                        Err(e) => {
2532                            let err = e.to_string();
2533                            if let Some(tx) = applied_tx {
2534                                let _ = tx.send(Err(err.clone()));
2535                            }
2536                            panic!(
2537                                "RUNTIME_INVARIANT: failed to apply OptionDepositUpdate for {} {} quantity {} before publish: {}",
2538                                wallet, symbol, quantity, err
2539                            );
2540                        }
2541                    }
2542                }
2543
2544                Some(withdrawal) = async {
2545                    match self.option_withdrawal_receiver.as_mut() {
2546                        Some(rx) => rx.recv().await,
2547                        None => std::future::pending().await,
2548                    }
2549                } => {
2550                    if self.runtime_quiesced {
2551                        if let Some(tx) = withdrawal.applied_tx {
2552                            let _ = tx.send(Err("engine is quiesced".to_string()));
2553                        }
2554                        continue;
2555                    }
2556                    let wallet = withdrawal.wallet;
2557                    let account = withdrawal.account;
2558                    let signer = withdrawal.signer;
2559                    let rsm_signer = withdrawal.rsm_signer;
2560                    let symbol = withdrawal.symbol;
2561                    let quantity = withdrawal.quantity;
2562                    let nonce = withdrawal.nonce;
2563                    let action = withdrawal.action;
2564                    let request_id = withdrawal.request_id;
2565                    let timestamp_ms = withdrawal.timestamp_ms;
2566                    let applied_tx = withdrawal.applied_tx;
2567                    if self
2568                        .external_option_command_already_journaled(
2569                            &request_id,
2570                            "OptionWithdrawalUpdate",
2571                        )
2572                        .await
2573                    {
2574                        let persisted_status = self
2575                            .persisted_withdrawal_directive_status(&request_id)
2576                            .unwrap_or_else(|error| {
2577                                panic!(
2578                                    "JOURNAL_FATAL: failed to load persisted OptionWithdrawalUpdate status for {}: {}",
2579                                    request_id, error
2580                                )
2581                            });
2582                        let (domain_status, delivery_status) = persisted_status.unwrap_or((
2583                            crate::directive_outbox::DirectiveDomainStatus::PendingChainEffect,
2584                            crate::directive_outbox::DirectiveDeliveryStatus::Pending,
2585                        ));
2586                        let receipt = crate::rsm::unified_engine::OptionWithdrawalApplyReceipt {
2587                            directive_id: request_id.clone(),
2588                            domain_status,
2589                            delivery_status,
2590                        };
2591                        if let Some(tx) = applied_tx {
2592                            let _ = tx.send(Ok(receipt));
2593                        }
2594                        continue;
2595                    }
2596                    if let Some(err) = self.withdrawal_directive_journal_unavailable_error(
2597                        "OptionWithdrawalUpdate",
2598                        &request_id,
2599                    ) {
2600                        warn!("{}", err);
2601                        if let Some(tx) = applied_tx {
2602                            let _ = tx.send(Err(err));
2603                        }
2604                        continue;
2605                    }
2606                    let env = crate::rsm::apply::CommandEnvelope::new(
2607                        timestamp_ms,
2608                        crate::rsm::apply::EngineCommand::OptionWithdrawalUpdate {
2609                            request_id: request_id.clone(),
2610                            wallet,
2611                            account,
2612                            signer,
2613                            rsm_signer,
2614                            symbol: symbol.clone(),
2615                            quantity,
2616                            nonce: Some(nonce),
2617                            action,
2618                            timestamp_ms,
2619                        },
2620                    );
2621                    match self.apply(env.clone()) {
2622                        Ok(output) => {
2623                            let outbox_appends = output.outbox_appends;
2624                            let receipt = outbox_appends
2625                                .first()
2626                                .map(|append| {
2627                                    crate::rsm::unified_engine::OptionWithdrawalApplyReceipt {
2628                                        directive_id: append.directive_id.clone(),
2629                                        domain_status: append.domain_status.clone(),
2630                                        delivery_status: append.delivery_status.clone(),
2631                                    }
2632                                })
2633                                .unwrap_or_else(|| {
2634                                    crate::rsm::unified_engine::OptionWithdrawalApplyReceipt {
2635                                        directive_id: request_id.clone(),
2636                                        domain_status: crate::directive_outbox::DirectiveDomainStatus::PendingChainEffect,
2637                                        delivery_status: crate::directive_outbox::DirectiveDeliveryStatus::Pending,
2638                                    }
2639                                });
2640                            self
2641                                .journal_external_option_position_command(
2642                                    &env,
2643                                    &request_id,
2644                                    hypercall_db_diesel::engine_enums::CommandType::OptionWithdrawalUpdate,
2645                                    outbox_appends,
2646                                )
2647                                .await;
2648                            if let Some(portfolio_cache) = &self.ctx.deps.portfolio_cache {
2649                                portfolio_cache
2650                                    .handle_option_custody_delta(
2651                                        wallet,
2652                                        symbol.clone(),
2653                                        -quantity,
2654                                        None,
2655                                    )
2656                                    .await;
2657                            }
2658                            self.publish_to_nats(&env).await;
2659                            self.publish_snapshot();
2660                            last_read_snapshot = Instant::now();
2661                            if let Some(tx) = applied_tx {
2662                                let _ = tx.send(Ok(receipt));
2663                            }
2664                        }
2665                        Err(e) => {
2666                            let err = e.to_string();
2667                            if let Some(tx) = applied_tx {
2668                                let _ = tx.send(Err(err.clone()));
2669                            }
2670                        }
2671                    }
2672                }
2673
2674                // Handle USDC cash withdrawals through engine-owned state.
2675                Some(withdrawal) = async {
2676                    match self.cash_withdrawal_receiver.as_mut() {
2677                        Some(rx) => rx.recv().await,
2678                        None => std::future::pending().await,
2679                    }
2680                } => {
2681                    if self.runtime_quiesced {
2682                        if let Some(tx) = withdrawal.applied_tx {
2683                            let _ = tx.send(Err("engine is quiesced".to_string()));
2684                        }
2685                        continue;
2686                    }
2687                    let wallet = withdrawal.wallet;
2688                    let account = withdrawal.account;
2689                    let destination = withdrawal.destination;
2690                    let signer = withdrawal.signer;
2691                    let rsm_signer = withdrawal.rsm_signer;
2692                    let amount = withdrawal.amount;
2693                    let amount_wei = withdrawal.amount_wei;
2694                    let nonce = withdrawal.nonce;
2695                    let request_id = withdrawal.request_id;
2696                    let timestamp_ms = withdrawal.timestamp_ms;
2697                    let applied_tx = withdrawal.applied_tx;
2698                    if self
2699                        .external_option_command_already_journaled(
2700                            &request_id,
2701                            "CashWithdrawalUpdate",
2702                        )
2703                        .await
2704                    {
2705                        let persisted_status = self
2706                            .persisted_withdrawal_directive_status(&request_id)
2707                            .unwrap_or_else(|error| {
2708                                panic!(
2709                                    "JOURNAL_FATAL: failed to load persisted CashWithdrawalUpdate status for {}: {}",
2710                                    request_id, error
2711                                )
2712                            });
2713                        let (domain_status, delivery_status) = persisted_status.unwrap_or((
2714                            crate::directive_outbox::DirectiveDomainStatus::PendingChainEffect,
2715                            crate::directive_outbox::DirectiveDeliveryStatus::Pending,
2716                        ));
2717                        let receipt = crate::rsm::unified_engine::CashWithdrawalApplyReceipt {
2718                            directive_id: request_id.clone(),
2719                            domain_status,
2720                            delivery_status,
2721                            balance_after: self.ctx.balance_ledger.balance(&wallet),
2722                        };
2723                        if let Some(tx) = applied_tx {
2724                            let _ = tx.send(Ok(receipt));
2725                        }
2726                        continue;
2727                    }
2728                    if let Some(err) = self.withdrawal_directive_journal_unavailable_error(
2729                        "CashWithdrawalUpdate",
2730                        &request_id,
2731                    ) {
2732                        warn!("{}", err);
2733                        if let Some(tx) = applied_tx {
2734                            let _ = tx.send(Err(err));
2735                        }
2736                        continue;
2737                    }
2738                    let env = crate::rsm::apply::CommandEnvelope::new(
2739                        timestamp_ms,
2740                        crate::rsm::apply::EngineCommand::CashWithdrawalUpdate {
2741                            request_id: request_id.clone(),
2742                            wallet,
2743                            account,
2744                            destination,
2745                            signer,
2746                            rsm_signer,
2747                            amount,
2748                            amount_wei,
2749                            nonce: Some(nonce),
2750                            timestamp_ms,
2751                        },
2752                    );
2753                    match self.apply(env.clone()) {
2754                        Ok(output) => {
2755                            let balance_after = self.ctx.balance_ledger.balance(&wallet);
2756                            let outbox_appends = output.outbox_appends;
2757                            let balance_updates = output.balance_updates;
2758                            assert!(
2759                                !balance_updates.is_empty(),
2760                                "RUNTIME_INVARIANT: CashWithdrawalUpdate applied without canonical balance_updates"
2761                            );
2762                            let receipt = outbox_appends
2763                                .first()
2764                                .map(|append| {
2765                                    crate::rsm::unified_engine::CashWithdrawalApplyReceipt {
2766                                        directive_id: append.directive_id.clone(),
2767                                        domain_status: append.domain_status.clone(),
2768                                        delivery_status: append.delivery_status.clone(),
2769                                        balance_after,
2770                                    }
2771                                })
2772                                .unwrap_or_else(|| {
2773                                    crate::rsm::unified_engine::CashWithdrawalApplyReceipt {
2774                                        directive_id: request_id.clone(),
2775                                        domain_status: crate::directive_outbox::DirectiveDomainStatus::PendingChainEffect,
2776                                        delivery_status: crate::directive_outbox::DirectiveDeliveryStatus::Pending,
2777                                        balance_after,
2778                                    }
2779                                });
2780                            self.journal_external_cash_withdrawal_command(
2781                                &env,
2782                                &request_id,
2783                                outbox_appends,
2784                                wallet,
2785                                amount,
2786                                balance_after,
2787                                timestamp_ms,
2788                                balance_updates.clone(),
2789                            )
2790                            .await;
2791                            self.publish_to_nats(&env).await;
2792                            self.publish_balance_updates_to_nats(&balance_updates).await;
2793                            self.publish_snapshot();
2794                            last_read_snapshot = Instant::now();
2795                            if let Some(tx) = applied_tx {
2796                                let _ = tx.send(Ok(receipt));
2797                            }
2798                        }
2799                        Err(e) => {
2800                            let err = e.to_string();
2801                            if let Some(tx) = applied_tx {
2802                                let _ = tx.send(Err(err));
2803                            }
2804                        }
2805                    }
2806                }
2807
2808                // Handle liquidation bonus credits after durable DB persistence.
2809                Some(bonus) = async {
2810                    match self.liquidation_bonus_receiver.as_mut() {
2811                        Some(rx) => rx.recv().await,
2812                        None => std::future::pending().await,
2813                    }
2814                } => {
2815                    if self.runtime_quiesced {
2816                        if let Some(tx) = bonus.applied_tx {
2817                            let _ = tx.send(Err("engine is quiesced".to_string()));
2818                        }
2819                        continue;
2820                    }
2821                    let wallet = bonus.wallet;
2822                    let amount = bonus.amount;
2823                    let balance_after = self.ctx.balance_ledger.balance(&wallet) + amount;
2824                    let request_id = bonus.request_id;
2825                    let applied_tx = bonus.applied_tx;
2826                    let env = crate::rsm::apply::CommandEnvelope::new(
2827                        bonus.timestamp_ms,
2828                        crate::rsm::apply::EngineCommand::LiquidationBonusUpdate {
2829                            wallet,
2830                            amount,
2831                            balance_after,
2832                            timestamp_ms: bonus.timestamp_ms,
2833                            sequence: bonus.sequence,
2834                        },
2835                    );
2836                    self.apply_runtime_balance_update(
2837                        env,
2838                        applied_tx,
2839                        wallet,
2840                        amount,
2841                        "LiquidationBonusUpdate",
2842                        &mut last_read_snapshot,
2843                        request_id,
2844                        Vec::new(),
2845                    ).await;
2846                }
2847
2848                // Handle margin-mode changes through apply() so order admission reads
2849                // the same engine-owned state that is snapshotted and replicated.
2850                Some(update) = async {
2851                    match self.margin_mode_receiver.as_mut() {
2852                        Some(rx) => rx.recv().await,
2853                        None => std::future::pending().await,
2854                    }
2855                } => {
2856                    if self.runtime_quiesced {
2857                        let _ = update.applied_tx.send(Err("engine is quiesced".to_string()));
2858                        continue;
2859                    }
2860                    let wallet = update.wallet;
2861                    let margin_mode = update.margin_mode;
2862                    let applied_tx = update.applied_tx;
2863                    let env = crate::rsm::apply::CommandEnvelope::new(
2864                        update.timestamp_ms,
2865                        crate::rsm::apply::EngineCommand::LegacyTierMarginModeUpdate {
2866                            wallet,
2867                            margin_mode,
2868                        },
2869                    );
2870                    let nats_env = env.clone();
2871                    match self.apply(env) {
2872                        Ok(output) => {
2873                            self.publish_to_nats(&nats_env).await;
2874                            for event in &output.events {
2875                                self.ctx.deps.emit_event(event);
2876                            }
2877                            let _ = applied_tx.send(Ok(()));
2878                        }
2879                        Err(e) => {
2880                            let err = e.to_string();
2881                            warn!(
2882                                wallet = %wallet,
2883                                margin_mode = ?margin_mode,
2884                                error = %err,
2885                                "Failed to apply TierUpdate"
2886                            );
2887                            let _ = applied_tx.send(Err(err));
2888                        }
2889                    }
2890                }
2891
2892                // Handle agent authorization changes through apply()
2893                Some(request) = async {
2894                    match self.agent_auth_receiver.as_mut() {
2895                        Some(rx) => rx.recv().await,
2896                        None => std::future::pending().await,
2897                    }
2898                } => {
2899                    if self.runtime_quiesced {
2900                        let _ = request.applied_tx.send(Err("engine is quiesced".to_string()));
2901                        continue;
2902                    }
2903                    let wallet = request.wallet;
2904                    let agent = request.agent;
2905                    let applied_tx = request.applied_tx;
2906                    let expires_at_ms = request.expires_at_ms;
2907                    let nonce = request.nonce;
2908                    let ts = get_timestamp_millis();
2909                    let command = if request.approve {
2910                        crate::rsm::apply::EngineCommand::ApproveAgent { wallet, agent, expires_at_ms, nonce, timestamp_ms: ts }
2911                    } else {
2912                        crate::rsm::apply::EngineCommand::RevokeAgent { wallet, agent, nonce, timestamp_ms: ts }
2913                    };
2914                    let env = crate::rsm::apply::CommandEnvelope::new(
2915                        ts,
2916                        command,
2917                    );
2918                    if !self.agent_auth_journal_available() {
2919                        let _ = applied_tx.send(Err(
2920                            "agent authorization requires engine journaling to be enabled"
2921                                .to_string(),
2922                        ));
2923                        continue;
2924                    }
2925                    let nats_env = env.clone();
2926                    match self.apply(env) {
2927                        Ok(_) => {
2928                            self.journal_agent_auth_command(&nats_env).await;
2929                            self.publish_to_nats(&nats_env).await;
2930                            self.publish_snapshot();
2931                            last_read_snapshot = Instant::now();
2932                            let _ = applied_tx.send(Ok(()));
2933                        }
2934                        Err(e) => {
2935                            let _ = applied_tx.send(Err(e.to_string()));
2936                        }
2937                    }
2938                }
2939
2940                // Handle nonce-only checks (QP handshakes).
2941                Some(request) = async {
2942                    match self.nonce_check_receiver.as_mut() {
2943                        Some(rx) => rx.recv().await,
2944                        None => std::future::pending().await,
2945                    }
2946                } => {
2947                    if self.runtime_quiesced {
2948                        let _ = request.applied_tx.send(Err("engine is quiesced".to_string()));
2949                        continue;
2950                    }
2951                    let wallet = request.wallet;
2952                    let nonce = request.nonce;
2953                    let applied_tx = request.applied_tx;
2954                    let ts = get_timestamp_millis();
2955                    let env = crate::rsm::apply::CommandEnvelope::new(
2956                        ts,
2957                        crate::rsm::apply::EngineCommand::NonceAdvance { wallet, nonce, timestamp_ms: ts },
2958                    );
2959                    let nats_env = env.clone();
2960                    match self.apply(env) {
2961                        Ok(_) => {
2962                            self.publish_to_nats(&nats_env).await;
2963                            let _ = applied_tx.send(Ok(()));
2964                        }
2965                        Err(e) => {
2966                            let _ = applied_tx.send(Err(e.to_string()));
2967                        }
2968                    }
2969                }
2970
2971                // Handle tier updates that affect engine-owned admission state.
2972                Some(update) = async {
2973                    match self.tier_update_receiver.as_mut() {
2974                        Some(rx) => rx.recv().await,
2975                        None => std::future::pending().await,
2976                    }
2977                } => {
2978                    if self.runtime_quiesced {
2979                        if let Some(tx) = update.applied_tx {
2980                            let _ = tx.send(Err("engine is quiesced".to_string()));
2981                        }
2982                        continue;
2983                    }
2984                    let wallet = update.wallet;
2985                    let tier = update.tier;
2986                    let trading_limits = update.trading_limits;
2987                    let margin_mode = update.margin_mode;
2988                    let applied_tx = update.applied_tx;
2989                    let env = crate::rsm::apply::CommandEnvelope::new(
2990                        update.timestamp_ms,
2991                        crate::rsm::apply::EngineCommand::TierUpdate {
2992                            wallet,
2993                            margin_mode,
2994                            tier,
2995                            trading_limits,
2996                        },
2997                    );
2998                    match self.apply(env.clone()) {
2999                        Ok(_) => {
3000                            self.publish_to_nats(&env).await;
3001                            if let Some(tx) = applied_tx {
3002                                let _ = tx.send(Ok(()));
3003                            }
3004                        }
3005                        Err(e) => {
3006                            let err = e.to_string();
3007                            warn!(
3008                                wallet = %wallet,
3009                                error = %err,
3010                                "Failed to apply TierUpdate"
3011                            );
3012                            if let Some(tx) = applied_tx {
3013                                let _ = tx.send(Err(err));
3014                            }
3015                        }
3016                    }
3017                }
3018
3019                // Handle PM settlement admin commands.
3020                Some(request) = async {
3021                    match self.pm_settlement_admin_receiver.as_mut() {
3022                        Some(rx) => rx.recv().await,
3023                        None => std::future::pending().await,
3024                    }
3025                } => {
3026                    if self.runtime_quiesced {
3027                        let _ = request.applied_tx.send(Err("engine is quiesced".to_string()));
3028                        continue;
3029                    }
3030                    let timestamp_ms = get_timestamp_millis();
3031                    let command =
3032                        Self::stamp_pm_settlement_admin_command(request.command, timestamp_ms);
3033                    let request_uuid = Self::pm_settlement_request_id(&command)
3034                        .unwrap_or_else(|| {
3035                            panic!(
3036                                "RUNTIME_INVARIANT: PM settlement admin channel received unsupported command {}",
3037                                command.command_type()
3038                            )
3039                        });
3040                    let request_id = request_uuid.to_string();
3041                    let env = crate::rsm::apply::CommandEnvelope::new(timestamp_ms, command);
3042                    if matches!(
3043                        &env.command,
3044                        crate::rsm::apply::EngineCommand::RecordPmVaultDeposit(_)
3045                            | crate::rsm::apply::EngineCommand::RequestPmVaultWithdrawal(_)
3046                    ) && self
3047                        .external_option_command_already_journaled(
3048                            &request_id,
3049                            env.command.command_type(),
3050                        )
3051                        .await
3052                    {
3053                        let _ = request.applied_tx.send(Ok(()));
3054                        continue;
3055                    }
3056                    if let Err(error) = self.preflight_pm_settlement_admin_command(&env.command) {
3057                        let _ = request.applied_tx.send(Err(error));
3058                        continue;
3059                    }
3060                    self.journal_pm_settlement_admin_command(&env, request_uuid).await;
3061                    let output = self.apply(env.clone()).unwrap_or_else(|error| {
3062                        panic!(
3063                            "RUNTIME_INVARIANT: PM settlement command {} request_id {} passed preflight but apply rejected: {}",
3064                            env.command.command_type(),
3065                            request_id,
3066                            error
3067                        )
3068                    });
3069                    if let Err(error) = self.apply_pm_settlement_projection_effects_sync(
3070                        &output.pm_settlement_effects,
3071                        &request_id,
3072                    ) {
3073                        panic!(
3074                            "CRITICAL_FAILURE: failed to apply PM settlement projection effects for request_id {}: {}",
3075                            request_id, error
3076                        );
3077                    }
3078                    self.publish_to_nats(&env).await;
3079                    self.publish_snapshot();
3080                    last_read_snapshot = Instant::now();
3081                    let _ = request.applied_tx.send(Ok(()));
3082                }
3083
3084                // Handle HyperCore equity updates from Hydromancer feed.
3085                Some(update) = async {
3086                    match self.hypercore_equity_receiver.as_mut() {
3087                        Some(rx) => rx.recv().await,
3088                        None => std::future::pending().await,
3089                    }
3090                }, if !self.runtime_quiesced => {
3091                    let env = crate::rsm::apply::CommandEnvelope::new(
3092                        update.timestamp_ms,
3093                        crate::rsm::apply::EngineCommand::HypercoreEquityUpdate {
3094                            wallet: update.wallet,
3095                            account_value: update.account_value,
3096                            timestamp_ms: update.timestamp_ms,
3097                        },
3098                    );
3099                    match self.apply(env.clone()) {
3100                        Ok(_) => {
3101                            self.publish_to_nats(&env).await;
3102                        }
3103                        Err(e) => {
3104                            warn!(
3105                                wallet = %update.wallet,
3106                                error = %e,
3107                                "Failed to apply HypercoreEquityUpdate"
3108                            );
3109                        }
3110                    }
3111                }
3112
3113                // Handle market management requests through apply()
3114                Some(request) = self.market_receiver.recv() => {
3115                    if self.runtime_quiesced {
3116                        let status = match request.message.action {
3117                            MarketAction::CreateMarket => MarketUpdateStatus::MarketCreationFailed,
3118                            MarketAction::DeleteMarket => MarketUpdateStatus::MarketDeletionFailed,
3119                            MarketAction::ExpireMarket => MarketUpdateStatus::MarketPendingSettlement,
3120                        };
3121                        let _ = request
3122                            .response_tx
3123                            .send(MarketUpdateMessage {
3124                                market: request.message.market,
3125                                status,
3126                                timestamp: request.message.timestamp,
3127                                reason: Some("engine is quiesced".to_string()),
3128                            })
3129                            .await;
3130                        continue;
3131                    }
3132                    market_start = Instant::now();
3133                    let original_response_tx = request.response_tx;
3134                    let message = request.message;
3135                    let market_cmd = if matches!(message.action, MarketAction::ExpireMarket) {
3136                        match self.prepare_manual_market_expiry_context(&message).await {
3137                            Ok(context) => crate::rsm::apply::MarketActionCommand::with_expiry_context(message, context),
3138                            Err(e) => {
3139                                error!("Failed to prepare MarketAction expiry context: {}", e);
3140                                let _ = original_response_tx
3141                                    .send(MarketUpdateMessage {
3142                                        market: message.market,
3143                                        status: MarketUpdateStatus::MarketPendingSettlement,
3144                                        timestamp: message.timestamp,
3145                                        reason: Some(e.to_string()),
3146                                    })
3147                                    .await;
3148                                continue;
3149                            }
3150                        }
3151                    } else {
3152                        crate::rsm::apply::MarketActionCommand::new(message)
3153                    };
3154                    if let Err(e) = self.validate_manual_market_expiry_nats_payload(&market_cmd) {
3155                        error!("Failed to validate MarketAction NATS payload: {}", e);
3156                        let _ = original_response_tx
3157                            .send(MarketUpdateMessage {
3158                                market: market_cmd.message.market,
3159                                status: MarketUpdateStatus::MarketPendingSettlement,
3160                                timestamp: market_cmd.message.timestamp,
3161                                reason: Some(e.to_string()),
3162                            })
3163                            .await;
3164                        continue;
3165                    }
3166                    let failure_market = (
3167                        market_cmd.message.market.clone(),
3168                        market_cmd.message.timestamp,
3169                    );
3170                    let env = crate::rsm::apply::CommandEnvelope::new(
3171                        market_cmd.message.timestamp,
3172                        crate::rsm::apply::EngineCommand::MarketAction(market_cmd),
3173                    );
3174                    let nats_env = env.clone();
3175                    match self.apply(env) {
3176                        Ok(output) => {
3177                            self.journal_replay_owned_expiry_command(
3178                                &nats_env,
3179                                &output.balance_updates,
3180                            )
3181                            .await;
3182                            if let Err(e) = self.apply_market_effects(&output).await {
3183                                panic!(
3184                                    "CRITICAL_FAILURE: apply_market_effects() failed for MarketAction: {}",
3185                                    e
3186                                );
3187                            }
3188                            let events_emitted_by_expiry_effects =
3189                                if output.expiry_effects.is_empty() {
3190                                    false
3191                                } else if let Err(e) = self
3192                                    .apply_expiry_effects_and_events(&output, "manual-market-expiry")
3193                                    .await
3194                                {
3195                                    panic!(
3196                                        "CRITICAL_FAILURE: apply_expiry_effects_and_events() failed for MarketAction: {}. \
3197                                         Expiry runtime effects could not be applied after memory state advanced.",
3198                                        e
3199                                    );
3200                                } else {
3201                                    true
3202                                };
3203                            self.publish_to_nats(&nats_env).await;
3204                            if !events_emitted_by_expiry_effects {
3205                                for event in &output.events {
3206                                    self.ctx.deps.emit_event(event);
3207                                }
3208                            }
3209                            if let Some(response) = output.market_response {
3210                                let _ = original_response_tx.send(response).await;
3211                            }
3212                            self.publish_balance_updates_to_nats(&output.balance_updates)
3213                                .await;
3214                        }
3215                        Err(e) => {
3216                            error!("apply() failed for MarketAction: {}", e);
3217                            let (market, timestamp) = failure_market;
3218                            let _ = original_response_tx
3219                                .send(MarketUpdateMessage {
3220                                    market,
3221                                    status: MarketUpdateStatus::MarketPendingSettlement,
3222                                    timestamp,
3223                                    reason: Some(e.to_string()),
3224                                })
3225                                .await;
3226                            market_request_hist.record(market_start.elapsed().as_secs_f64());
3227                            continue;
3228                        }
3229                    }
3230                    self.publish_snapshot();
3231                    last_read_snapshot = Instant::now();
3232                    market_request_hist.record(market_start.elapsed().as_secs_f64());
3233                }
3234
3235                // Handle live trading_mode updates from catalog manager.
3236                // `watch::Receiver::changed()` returns Ok(()) when a new
3237                // value has been published and Err when the sender side
3238                // is dropped (e.g. catalog manager disabled or crashed).
3239                // On Err we park forever — continuing to poll would hot-
3240                // loop since `changed()` returns Err synchronously after
3241                // the sender is gone.
3242                Some(()) = async {
3243                    match self.trading_mode_receiver.as_mut() {
3244                        Some(rx) => match rx.changed().await {
3245                            Ok(()) => Some(()),
3246                            Err(_) => {
3247                                std::future::pending::<()>().await;
3248                                unreachable!()
3249                            }
3250                        },
3251                        None => std::future::pending().await,
3252                    }
3253                }, if !self.runtime_quiesced => {
3254                    if let Some(rx) = self.trading_mode_receiver.as_mut() {
3255                        let update = rx.borrow_and_update().clone();
3256                        if !update.is_empty() {
3257                            info!(
3258                                update_size = update.len(),
3259                                "Received live trading_mode update from catalog manager"
3260                            );
3261                            let now_ms = get_timestamp_millis();
3262                            let env = crate::rsm::apply::CommandEnvelope::new(
3263                                now_ms,
3264                                crate::rsm::apply::EngineCommand::TradingModeUpdate {
3265                                    modes: update,
3266                                    timestamp_ms: now_ms,
3267                                },
3268                            );
3269                            let nats_env = env.clone();
3270                            match self.apply(env) {
3271                                Ok(_) => { self.publish_to_nats(&nats_env).await; }
3272                                Err(e) => { error!("apply() failed for TradingModeUpdate: {}", e); }
3273                            }
3274                        }
3275                    }
3276                }
3277
3278                // Periodic expiry check — routed through apply() for hash chain
3279                _ = expiry_interval.tick(), if !self.runtime_quiesced => {
3280                    expiry_start = Instant::now();
3281                    let now_ms = get_timestamp_millis();
3282                    let env = match self.prepare_tick_expiry_env(now_ms).await {
3283                        Ok(env) => env,
3284                        Err(e) => {
3285                            panic!(
3286                                "CRITICAL_FAILURE: prepare_tick_expiry_env() failed: {}. \
3287                                 Expiry settlement cannot continue without explicit context.",
3288                                e
3289                            );
3290                        }
3291                    };
3292                    let nats_env = env.clone();
3293                    match self.apply(env) {
3294                        Ok(output) => {
3295                            self.journal_replay_owned_expiry_command(
3296                                &nats_env,
3297                                &output.balance_updates,
3298                            )
3299                            .await;
3300                            if let Err(e) = self
3301                                .apply_expiry_effects_and_events(&output, "tick-expiry")
3302                                .await
3303                            {
3304                                panic!(
3305                                    "CRITICAL_FAILURE: apply_expiry_effects_and_events() failed for TickExpiry: {}. \
3306                                     Expiry runtime effects could not be applied after memory state advanced.",
3307                                    e
3308                                );
3309                            } else {
3310                                self.publish_to_nats(&nats_env).await;
3311                            }
3312                            self.publish_tick_expiry_balance_updates(&output, "tick-expiry")
3313                                .await;
3314                        }
3315                        Err(e) => {
3316                            panic!(
3317                                "CRITICAL_FAILURE: apply() failed for TickExpiry: {}. \
3318                                 Expiry state could not be advanced deterministically.",
3319                                e
3320                            );
3321                        }
3322                    }
3323                    self.publish_snapshot();
3324                    last_read_snapshot = Instant::now();
3325                    expiry_check_hist.record(expiry_start.elapsed().as_secs_f64());
3326                }
3327
3328                // Feed spot prices into the engine as deterministic commands
3329                _ = price_update_interval.tick(), if !self.runtime_quiesced => {
3330                    self.ingest_price_updates().await;
3331                    self.ingest_iv_updates().await;
3332                }
3333
3334                // One-time post-startup reconciliation after persistence has had time to catch up.
3335                _ = &mut post_startup_reconcile, if !self.post_startup_reconciled => {
3336                    self.post_startup_reconciled = true;
3337                    ReplayRecovery::run_post_startup_reconciliation(&mut self);
3338                    if !self.sync_status.is_ready() {
3339                        self.sync_status.set_ready();
3340                        info!("UnifiedEngine sync status: Ready (post-startup reconciliation complete)");
3341                    }
3342                }
3343
3344                // Periodic journal flush + engine state snapshot
3345                _ = snapshot_interval.tick() => {
3346                    let now = Instant::now();
3347
3348                    if now.duration_since(self.last_snapshot) >= self.snapshot_interval {
3349                        snapshot_start = Instant::now();
3350                        self.flush_journal().await;
3351
3352                        self.persist_engine_state_snapshot();
3353                        snapshot_hist.record(snapshot_start.elapsed().as_secs_f64());
3354                        self.last_snapshot = now;
3355                    }
3356                }
3357
3358                // Handle shutdown signal
3359                _ = self.shutdown_receiver.recv() => {
3360                    info!("Unified engine received shutdown signal");
3361                    self.flush_journal().await;
3362                    self.persist_engine_state_snapshot();
3363                    break;
3364                }
3365
3366                // If channels are closed, exit
3367                else => {
3368                    info!("Channels closed, shutting down engine");
3369                    self.flush_journal().await;
3370                    self.persist_engine_state_snapshot();
3371                    break;
3372                }
3373            }
3374
3375            // Record loop iteration time (metrics crate uses lock-free atomics, very fast)
3376            loop_iteration_hist.record(loop_start.elapsed().as_secs_f64());
3377        }
3378
3379        info!("Unified engine stopped");
3380    }
3381
3382    async fn handle_quiesce_request(&mut self, request: EngineQuiesceRequest) {
3383        match request.action {
3384            EngineQuiesceAction::Quiesce => {
3385                self.runtime_quiesced = true;
3386                self.flush_journal().await;
3387                let snapshot_command_id = self.persist_engine_state_snapshot();
3388                let last_command_id =
3389                    snapshot_command_id.unwrap_or_else(|| self.current_wal_checkpoint_command_id());
3390                self.publish_snapshot();
3391                self.last_snapshot = Instant::now();
3392                info!(
3393                    last_command_id,
3394                    last_l2_seq = self.ctx.l2_update_seq.load(Ordering::SeqCst),
3395                    "Unified engine entered quiesced mode"
3396                );
3397                let _ = request.response_tx.send(EngineQuiesceReport {
3398                    phase: hypercall_recovery::RecoveryPhase::Snapshotting,
3399                    quiesced: true,
3400                    last_command_id,
3401                    last_l2_seq: self.ctx.l2_update_seq.load(Ordering::SeqCst),
3402                    snapshot_persisted: snapshot_command_id.is_some(),
3403                    paused_sources: crate::rsm::restart_components::registered_mutation_sources(),
3404                });
3405            }
3406            EngineQuiesceAction::Resume => {
3407                self.runtime_quiesced = false;
3408                info!("Unified engine resumed live mutation sources");
3409                let _ = request.response_tx.send(EngineQuiesceReport {
3410                    phase: hypercall_recovery::RecoveryPhase::Recovered,
3411                    quiesced: false,
3412                    last_command_id: self.current_wal_checkpoint_command_id(),
3413                    last_l2_seq: self.ctx.l2_update_seq.load(Ordering::SeqCst),
3414                    snapshot_persisted: false,
3415                    paused_sources: Vec::new(),
3416                });
3417            }
3418        }
3419    }
3420
3421    /// Read spot prices from the external greeks cache and apply them as
3422    /// deterministic PriceUpdate commands to the engine state.
3423    async fn ingest_price_updates(&mut self) {
3424        self.ingest_price_updates_with_side_effects(true).await;
3425    }
3426
3427    async fn ingest_price_updates_without_side_effects(&mut self) {
3428        self.ingest_price_updates_with_side_effects(false).await;
3429    }
3430
3431    async fn ingest_price_updates_with_side_effects(&mut self, emit_side_effects: bool) {
3432        use crate::journal::engine_journal_batcher::{JournalEntry, JournalMessage};
3433        use crate::rsm::apply::{CommandEnvelope, EngineCommand, PriceUpdatePayload};
3434        use rust_decimal::Decimal;
3435
3436        let greeks_cache = match &self.ctx.deps.greeks_cache {
3437            Some(cache) => cache.clone(),
3438            None => return,
3439        };
3440
3441        let mut underlyings = greeks_cache.get_configured_underlyings();
3442        underlyings.sort();
3443        let now_ms = get_timestamp_millis();
3444
3445        for underlying in &underlyings {
3446            if let Some(price_f64) = greeks_cache.get_spot_price(underlying).await {
3447                let spot_price = match Decimal::from_f64_retain(price_f64) {
3448                    Some(d) => d,
3449                    None => continue,
3450                };
3451
3452                if self.ctx.spot_prices.get(underlying) == Some(&spot_price) {
3453                    continue;
3454                }
3455
3456                let env = CommandEnvelope::new(
3457                    now_ms,
3458                    EngineCommand::PriceUpdate {
3459                        underlying: underlying.clone(),
3460                        spot_price,
3461                        timestamp_ms: now_ms,
3462                    },
3463                );
3464
3465                #[cfg(feature = "rsm-state")]
3466                let cmd_identity_hash = emit_side_effects.then(|| env.command.identity_hash());
3467                let nats_env = emit_side_effects.then(|| env.clone());
3468                if let Err(e) = self.apply(env) {
3469                    warn!(
3470                        underlying = %underlying,
3471                        error = %e,
3472                        "Failed to apply PriceUpdate command"
3473                    );
3474                    continue;
3475                }
3476                if let Some(nats_env) = nats_env {
3477                    self.publish_to_nats(&nats_env).await;
3478                }
3479
3480                if let (Some(ref batch_sender), Some(_cmd_identity_hash)) = (
3481                    &self.journal_batch_sender,
3482                    #[cfg(feature = "rsm-state")]
3483                    cmd_identity_hash,
3484                    #[cfg(not(feature = "rsm-state"))]
3485                    emit_side_effects.then_some(()),
3486                ) {
3487                    let payload = PriceUpdatePayload {
3488                        underlying: underlying.clone(),
3489                        spot_price,
3490                        timestamp_ms: now_ms,
3491                    };
3492                    let command_data = hypercall_types::serialize_to_wire_bytes(&payload);
3493
3494                    let entry = JournalEntry {
3495                        received_ts_ms: now_ms,
3496                        command_data,
3497                        response_data: None,
3498                        order_id: None,
3499                        pre_digest: Default::default(),
3500                        post_digest: Default::default(),
3501                        duration_ms: 0,
3502                        events: Vec::new(),
3503                        outbox_appends: Vec::new(),
3504                        fill_side_effects: Vec::new(),
3505                        cash_withdrawal_side_effect: None,
3506                        balance_updates: Vec::new(),
3507                        created_at: Instant::now(),
3508                        commit_ack: None,
3509                        request_uuid: hypercall_db_diesel::engine_enums::DbUuid(
3510                            uuid::Uuid::new_v4(),
3511                        ),
3512                        command_type_enum: Some(
3513                            hypercall_db_diesel::engine_enums::CommandType::PriceUpdate,
3514                        ),
3515                        #[cfg(feature = "rsm-state")]
3516                        command_identity_hash: _cmd_identity_hash,
3517                        #[cfg(feature = "rsm-state")]
3518                        rsm_state_digest: Some(
3519                            crate::rsm::engine_snapshot::EngineStateDigest::from_ctx(
3520                                &self.ctx,
3521                                self.ctx
3522                                    .l2_update_seq
3523                                    .load(std::sync::atomic::Ordering::SeqCst),
3524                            ),
3525                        ),
3526                    };
3527                    if let Err(e) = batch_sender.send(JournalMessage::Entry(entry)).await {
3528                        panic!(
3529                            "JOURNAL_FATAL: failed to journal PriceUpdate for {}: {}",
3530                            underlying, e
3531                        );
3532                    }
3533                }
3534            }
3535        }
3536    }
3537
3538    /// Read IV surfaces from the external vol oracle and apply them as
3539    /// deterministic IvUpdate commands to the engine state.
3540    ///
3541    /// Heavy work (surface rebuild, serialization) happens here, outside the
3542    /// engine's critical path. apply() only does two map inserts.
3543    async fn ingest_iv_updates(&mut self) {
3544        self.ingest_iv_updates_with_side_effects(true).await;
3545    }
3546
3547    async fn ingest_iv_updates_without_side_effects(&mut self) {
3548        self.ingest_iv_updates_with_side_effects(false).await;
3549    }
3550
3551    async fn ingest_iv_updates_with_side_effects(&mut self, emit_side_effects: bool) {
3552        use crate::journal::engine_journal_batcher::{JournalEntry, JournalMessage};
3553        use crate::rsm::apply::{CommandEnvelope, EngineCommand, IvUpdatePayload};
3554        use crate::vol_oracle::vol_surface_cache::VolatilitySurface;
3555
3556        let vol_oracle = match &self.external_vol_oracle {
3557            Some(oracle) => oracle.clone(),
3558            None => return,
3559        };
3560
3561        let statuses = vol_oracle.statuses();
3562        let now_ms = get_timestamp_millis();
3563
3564        let mut all_underlyings: Vec<String> =
3565            statuses.iter().map(|s| s.underlying.clone()).collect();
3566        all_underlyings.sort();
3567        all_underlyings.dedup();
3568
3569        // Clear engine surfaces for underlyings whose providers are no longer ready.
3570        // This ensures the fallback oracle takes over when feeds go stale.
3571        for underlying in &all_underlyings {
3572            let is_ready = statuses
3573                .iter()
3574                .any(|s| s.underlying == *underlying && s.ready);
3575            if !is_ready {
3576                if self.ctx.iv_surfaces.remove(underlying).is_some() {
3577                    if let Ok(mut shared) = self.engine_iv_surfaces.write() {
3578                        shared.remove(underlying);
3579                    }
3580                    debug!(
3581                        underlying = %underlying,
3582                        "Cleared stale engine IV surface (provider no longer ready)"
3583                    );
3584                }
3585                continue;
3586            }
3587        }
3588
3589        let ready_underlyings: Vec<String> = all_underlyings
3590            .iter()
3591            .filter(|u| statuses.iter().any(|s| s.underlying == **u && s.ready))
3592            .cloned()
3593            .collect();
3594
3595        for underlying in &ready_underlyings {
3596            let snapshot = match vol_oracle.get_surface_snapshot(underlying) {
3597                Some(s) if !s.strike_points.is_empty() => s,
3598                _ => match synthesize_fixed_surface_snapshot(
3599                    &self.ctx.orderbooks,
3600                    &vol_oracle,
3601                    &statuses,
3602                    underlying,
3603                    now_ms as i64,
3604                ) {
3605                    Some(snapshot) => snapshot,
3606                    None => continue,
3607                },
3608            };
3609
3610            // Always rebuild surface outside the engine loop
3611            let mut surface = VolatilitySurface::new();
3612            for point in &snapshot.strike_points {
3613                surface.insert(point.strike, point.expiry, point.iv);
3614            }
3615
3616            // Serialize in parallel with content comparison (both outside critical path)
3617            let journal_data = if emit_side_effects && self.journal_batch_sender.is_some() {
3618                let payload = IvUpdatePayload {
3619                    underlying: underlying.clone(),
3620                    strike_points: snapshot.strike_points.clone(),
3621                    timestamp_ms: now_ms,
3622                };
3623                Some(hypercall_types::serialize_to_wire_bytes(&payload))
3624            } else {
3625                None
3626            };
3627
3628            // Full content comparison: check every point against existing surface.
3629            // This is outside the engine loop so compute cost doesn't matter.
3630            // Only apply + journal if something actually changed.
3631            let changed = match self.ctx.iv_surfaces.get(underlying) {
3632                None => true,
3633                Some(existing) => {
3634                    // Detect point removals (expiry drops, grid shrinks)
3635                    snapshot.strike_points.len() != existing.len()
3636                        || snapshot.strike_points.iter().any(|point| {
3637                            existing
3638                                .get_interpolated(point.strike, point.expiry)
3639                                .map(|existing_iv| (existing_iv - point.iv).abs() > 1e-12)
3640                                .unwrap_or(true)
3641                        })
3642                }
3643            };
3644
3645            if !changed {
3646                if let Some(ts) = snapshot.last_update_ts_ms {
3647                    self.ctx.iv_source_timestamps.insert(underlying.clone(), ts);
3648                }
3649                continue;
3650            }
3651
3652            let env = CommandEnvelope::new(
3653                now_ms,
3654                EngineCommand::IvUpdate {
3655                    underlying: underlying.clone(),
3656                    surface,
3657                    journal_data: journal_data.clone(),
3658                    timestamp_ms: now_ms,
3659                },
3660            );
3661
3662            if let Some(ts) = snapshot.last_update_ts_ms {
3663                self.ctx.iv_source_timestamps.insert(underlying.clone(), ts);
3664            }
3665
3666            #[cfg(feature = "rsm-state")]
3667            let cmd_identity_hash = emit_side_effects.then(|| env.command.identity_hash());
3668            let nats_env = emit_side_effects.then(|| env.clone());
3669            if let Err(e) = self.apply(env) {
3670                warn!(
3671                    underlying = %underlying,
3672                    error = %e,
3673                    "Failed to apply IvUpdate command"
3674                );
3675                continue;
3676            }
3677            if let Some(nats_env) = nats_env {
3678                self.publish_to_nats(&nats_env).await;
3679            }
3680
3681            if let (Some(ref batch_sender), Some(command_data), Some(_cmd_identity_hash)) = (
3682                &self.journal_batch_sender,
3683                journal_data,
3684                #[cfg(feature = "rsm-state")]
3685                cmd_identity_hash,
3686                #[cfg(not(feature = "rsm-state"))]
3687                emit_side_effects.then_some(()),
3688            ) {
3689                let entry = JournalEntry {
3690                    received_ts_ms: now_ms,
3691                    command_data,
3692                    response_data: None,
3693                    order_id: None,
3694                    pre_digest: Default::default(),
3695                    post_digest: Default::default(),
3696                    duration_ms: 0,
3697                    events: Vec::new(),
3698                    outbox_appends: Vec::new(),
3699                    fill_side_effects: Vec::new(),
3700                    cash_withdrawal_side_effect: None,
3701                    balance_updates: Vec::new(),
3702                    created_at: Instant::now(),
3703                    commit_ack: None,
3704                    request_uuid: hypercall_db_diesel::engine_enums::DbUuid(uuid::Uuid::new_v4()),
3705                    command_type_enum: Some(
3706                        hypercall_db_diesel::engine_enums::CommandType::IvUpdate,
3707                    ),
3708                    #[cfg(feature = "rsm-state")]
3709                    command_identity_hash: _cmd_identity_hash,
3710                    #[cfg(feature = "rsm-state")]
3711                    rsm_state_digest: Some(
3712                        crate::rsm::engine_snapshot::EngineStateDigest::from_ctx(
3713                            &self.ctx,
3714                            self.ctx
3715                                .l2_update_seq
3716                                .load(std::sync::atomic::Ordering::SeqCst),
3717                        ),
3718                    ),
3719                };
3720                if let Err(e) = batch_sender.send(JournalMessage::Entry(entry)).await {
3721                    panic!(
3722                        "JOURNAL_FATAL: failed to journal IvUpdate for {}: {}",
3723                        underlying, e
3724                    );
3725                }
3726            }
3727        }
3728    }
3729}
3730
3731#[cfg(test)]
3732mod tests {
3733    use super::*;
3734    use hypercall_engine::OrderBook;
3735    use rust_decimal_macros::dec;
3736
3737    #[test]
3738    fn external_durable_mutation_uuid_accepts_uuid() {
3739        let request_id = uuid::Uuid::now_v7().to_string();
3740        let parsed =
3741            UnifiedEngine::require_external_durable_mutation_uuid("DepositUpdate", &request_id);
3742        assert_eq!(parsed.to_string(), request_id);
3743    }
3744
3745    #[test]
3746    #[should_panic(
3747        expected = "RUNTIME_INVARIANT: external engine command DepositUpdate request_id not-a-uuid is not a UUID"
3748    )]
3749    fn external_durable_mutation_uuid_panics_on_non_uuid() {
3750        UnifiedEngine::require_external_durable_mutation_uuid("DepositUpdate", "not-a-uuid");
3751    }
3752
3753    #[test]
3754    fn empty_external_balance_updates_are_idempotent_for_journal_retries() {
3755        assert!(UnifiedEngine::is_idempotent_empty_balance_update(
3756            "DepositUpdate"
3757        ));
3758        assert!(UnifiedEngine::is_idempotent_empty_balance_update(
3759            "LiquidationBonusUpdate"
3760        ));
3761        assert!(!UnifiedEngine::is_idempotent_empty_balance_update(
3762            "CashWithdrawalUpdate"
3763        ));
3764    }
3765
3766    #[test]
3767    fn pm_settlement_admin_stamp_replaces_client_timestamp() {
3768        let client_timestamp_ms = 1_800_000_000_000;
3769        let server_timestamp_ms = client_timestamp_ms + 60_000;
3770        let command = crate::rsm::apply::EngineCommand::AccruePmSettlementInterest(
3771            hypercall_engine::command::AccruePmSettlementInterestCommand {
3772                request_id: uuid::Uuid::now_v7(),
3773                input_digest: "admin-pm-interest-test".to_string(),
3774                wallet: hypercall_types::wallet_address::test_wallet(1),
3775                underlying: "BTC".to_string(),
3776                to_ms: client_timestamp_ms as i64,
3777                timestamp_ms: client_timestamp_ms,
3778            },
3779        );
3780
3781        let stamped =
3782            UnifiedEngine::stamp_pm_settlement_admin_command(command, server_timestamp_ms);
3783
3784        match stamped {
3785            crate::rsm::apply::EngineCommand::AccruePmSettlementInterest(command) => {
3786                assert_eq!(command.timestamp_ms, server_timestamp_ms);
3787                assert_eq!(command.to_ms, client_timestamp_ms as i64);
3788            }
3789            other => panic!("unexpected command {}", other.command_type()),
3790        }
3791    }
3792
3793    #[test]
3794    fn expire_market_is_replay_journal_owned() {
3795        let market = hypercall_types::Market {
3796            symbol: "BTC-20260531-70000-C".to_string(),
3797            underlying: "BTC".to_string(),
3798            expiry: 20260531,
3799            strike: dec!(70000),
3800            option_type: hypercall_types::OptionType::Call,
3801        };
3802        let command = crate::rsm::apply::MarketActionCommand::with_expiry_context(
3803            hypercall_types::MarketActionMessage {
3804                market,
3805                action: hypercall_types::MarketAction::ExpireMarket,
3806                timestamp: 1_800_000_000_000,
3807            },
3808            crate::rsm::apply::TickExpiryContext::empty(),
3809        );
3810        let env = crate::rsm::apply::CommandEnvelope::new(
3811            1_800_000_000_000,
3812            crate::rsm::apply::EngineCommand::MarketAction(command),
3813        );
3814
3815        let (command_type, data, label) = UnifiedEngine::replay_owned_expiry_command_payload(&env)
3816            .expect("ExpireMarket must be journaled for replay");
3817
3818        assert_eq!(
3819            command_type,
3820            hypercall_db_diesel::engine_enums::CommandType::ExpireMarket
3821        );
3822        assert_eq!(label, "ExpireMarket");
3823        let decoded = crate::nats::deserialize::deserialize_command(
3824            crate::nats::CommandType::MarketAction,
3825            crate::nats::COMMAND_WIRE_VERSION_V1,
3826            &data,
3827        )
3828        .expect("journaled ExpireMarket payload should decode for replay");
3829        assert!(matches!(
3830            decoded,
3831            crate::rsm::apply::EngineCommand::MarketAction(command)
3832                if command.message.action == hypercall_types::MarketAction::ExpireMarket
3833        ));
3834    }
3835
3836    #[test]
3837    fn non_empty_tick_expiry_is_replay_journal_owned() {
3838        let context = crate::rsm::apply::TickExpiryContext {
3839            due_expiries: vec![crate::rsm::apply::TickExpiryDueGroup {
3840                expiry_ts: 1_800_000_000,
3841                symbols: vec!["BTC-20260531-70000-C".to_string()],
3842            }],
3843            pending_settlements: Vec::new(),
3844            settlement_prices: Vec::new(),
3845            margin_modes: Vec::new(),
3846            pm_settlements: Vec::new(),
3847        };
3848        let env = crate::rsm::apply::CommandEnvelope::new(
3849            1_800_000_000_000,
3850            crate::rsm::apply::EngineCommand::TickExpiry {
3851                now_ms: 1_800_000_000_000,
3852                context,
3853            },
3854        );
3855
3856        let (command_type, data, label) = UnifiedEngine::replay_owned_expiry_command_payload(&env)
3857            .expect("non-empty TickExpiry must be journaled for replay");
3858
3859        assert_eq!(
3860            command_type,
3861            hypercall_db_diesel::engine_enums::CommandType::TickExpiry
3862        );
3863        assert_eq!(label, "TickExpiry");
3864        let decoded = crate::nats::deserialize::deserialize_command(
3865            crate::nats::CommandType::TickExpiry,
3866            crate::nats::COMMAND_WIRE_VERSION_V1,
3867            &data,
3868        )
3869        .expect("journaled TickExpiry payload should decode for replay");
3870        assert!(matches!(
3871            decoded,
3872            crate::rsm::apply::EngineCommand::TickExpiry { .. }
3873        ));
3874    }
3875
3876    #[test]
3877    fn replay_owned_expiry_journal_entry_carries_balance_updates() {
3878        let context = crate::rsm::apply::TickExpiryContext {
3879            due_expiries: vec![crate::rsm::apply::TickExpiryDueGroup {
3880                expiry_ts: 1_800_000_000,
3881                symbols: vec!["BTC-20260531-70000-C".to_string()],
3882            }],
3883            pending_settlements: Vec::new(),
3884            settlement_prices: Vec::new(),
3885            margin_modes: Vec::new(),
3886            pm_settlements: Vec::new(),
3887        };
3888        let env = crate::rsm::apply::CommandEnvelope::new(
3889            1_800_000_000_000,
3890            crate::rsm::apply::EngineCommand::TickExpiry {
3891                now_ms: 1_800_000_000_000,
3892                context,
3893            },
3894        );
3895        let balance_update = hypercall_types::BalanceUpdate {
3896            balance_update_seq: 17,
3897            wallet: hypercall_types::wallet_address::test_wallet(7),
3898            delta: dec!(12.5),
3899            balance_after: dec!(112.5),
3900            reason: hypercall_types::BalanceUpdateReason::Settlement,
3901            reference_id: Some("settlement:BTC-20260531-70000-C".to_string()),
3902            source_command_id: None,
3903            timestamp_ms: 1_800_000_000_000,
3904        };
3905        let request_uuid = hypercall_db_diesel::engine_enums::DbUuid(uuid::Uuid::nil());
3906
3907        let (entry, label) = UnifiedEngine::replay_owned_expiry_journal_entry(
3908            &env,
3909            std::slice::from_ref(&balance_update),
3910            request_uuid,
3911            None,
3912        )
3913        .expect("non-empty TickExpiry must produce a replay journal entry");
3914
3915        assert_eq!(label, "TickExpiry");
3916        assert_eq!(entry.balance_updates, vec![balance_update]);
3917        assert_eq!(entry.request_uuid, request_uuid);
3918        assert_eq!(
3919            entry.command_type_enum,
3920            Some(hypercall_db_diesel::engine_enums::CommandType::TickExpiry)
3921        );
3922    }
3923
3924    #[test]
3925    fn empty_tick_expiry_is_not_replay_journal_owned() {
3926        let env = crate::rsm::apply::CommandEnvelope::new(
3927            1_800_000_000_000,
3928            crate::rsm::apply::EngineCommand::TickExpiry {
3929                now_ms: 1_800_000_000_000,
3930                context: crate::rsm::apply::TickExpiryContext::empty(),
3931            },
3932        );
3933
3934        assert!(UnifiedEngine::replay_owned_expiry_command_payload(&env).is_none());
3935    }
3936
3937    #[test]
3938    fn synthesize_fixed_surface_snapshot_uses_live_orderbook_grid() {
3939        let mut orderbooks = std::collections::HashMap::new();
3940        orderbooks.insert(
3941            "GOLD-20261231-4700-C".to_string(),
3942            OrderBook::with_symbol(
3943                hypercall_types::expiry_date_to_timestamp("GOLD", 20261231) as u64,
3944                dec!(4700),
3945                hypercall_types::OptionType::Call,
3946                "GOLD-20261231-4700-C".to_string(),
3947            ),
3948        );
3949        orderbooks.insert(
3950            "GOLD-20261231-4700-P".to_string(),
3951            OrderBook::with_symbol(
3952                hypercall_types::expiry_date_to_timestamp("GOLD", 20261231) as u64,
3953                dec!(4700),
3954                hypercall_types::OptionType::Put,
3955                "GOLD-20261231-4700-P".to_string(),
3956            ),
3957        );
3958
3959        let oracle: crate::vol_oracle::SharedVolOracle =
3960            std::sync::Arc::new(crate::vol_oracle::FixedTestRiskVolOracle::with_underlyings(
3961                0.50,
3962                vec!["GOLD".to_string()],
3963            ));
3964        let statuses = oracle.statuses();
3965        let snapshot = synthesize_fixed_surface_snapshot(
3966            &orderbooks,
3967            &oracle,
3968            &statuses,
3969            "GOLD",
3970            1_800_000_000_000,
3971        )
3972        .expect("fixed snapshot should be synthesized");
3973
3974        assert_eq!(snapshot.strike_points.len(), 1);
3975        assert_eq!(snapshot.strike_points[0].strike, 4700.0);
3976        assert_eq!(
3977            snapshot.strike_points[0].expiry,
3978            hypercall_types::expiry_date_to_timestamp("GOLD", 20261231) as i64
3979        );
3980        assert_eq!(snapshot.strike_points[0].iv, 0.50);
3981    }
3982
3983    #[test]
3984    fn max_listed_option_expiry_uses_engine_orderbooks() {
3985        let mut orderbooks = std::collections::HashMap::new();
3986        orderbooks.insert(
3987            "GOLD-20261231-4700-C".to_string(),
3988            OrderBook::with_symbol(
3989                hypercall_types::expiry_date_to_timestamp("GOLD", 20261231) as u64,
3990                dec!(4700),
3991                hypercall_types::OptionType::Call,
3992                "GOLD-20261231-4700-C".to_string(),
3993            ),
3994        );
3995        orderbooks.insert(
3996            "GOLD-20270630-4700-C".to_string(),
3997            OrderBook::with_symbol(
3998                hypercall_types::expiry_date_to_timestamp("GOLD", 20270630) as u64,
3999                dec!(4700),
4000                hypercall_types::OptionType::Call,
4001                "GOLD-20270630-4700-C".to_string(),
4002            ),
4003        );
4004        orderbooks.insert(
4005            "BTC-20280630-100000-C".to_string(),
4006            OrderBook::with_symbol(
4007                hypercall_types::expiry_date_to_timestamp("BTC", 20280630) as u64,
4008                dec!(100000),
4009                hypercall_types::OptionType::Call,
4010                "BTC-20280630-100000-C".to_string(),
4011            ),
4012        );
4013
4014        assert_eq!(
4015            max_listed_option_expiry_ts_ms(&orderbooks, "GOLD").unwrap(),
4016            hypercall_types::expiry_date_to_timestamp("GOLD", 20270630) * 1_000
4017        );
4018    }
4019
4020    #[test]
4021    fn max_listed_option_expiry_rejects_unknown_underlying() {
4022        let orderbooks = std::collections::HashMap::new();
4023        let error = max_listed_option_expiry_ts_ms(&orderbooks, "GOLD")
4024            .expect_err("missing listed markets should reject");
4025        assert!(error.contains("no listed option markets"));
4026    }
4027}