1use super::*;
4use crate::shared::order_types::ParsedSymbol;
5use crate::vol_oracle::{VolPoint, VolProviderKind, VolSurfaceSnapshot};
6use hypercall_db::{
7 DirectiveOutboxReader, OrderWriter, PmSettlementProjectionSyncWriter, SettlementWriter,
8};
9use rust_decimal::prelude::ToPrimitive;
10use std::collections::BTreeSet;
11
12const STANDBY_SETTLEMENT_OBSERVE_ATTEMPTS: usize = 8;
13const STANDBY_SETTLEMENT_OBSERVE_INITIAL_BACKOFF_MS: u64 = 25;
14const STANDBY_SETTLEMENT_OBSERVE_MAX_BACKOFF_MS: u64 = 500;
15
16fn synthesize_fixed_surface_snapshot(
17 orderbooks: &std::collections::HashMap<String, hypercall_engine::OrderBook>,
18 vol_oracle: &crate::vol_oracle::SharedVolOracle,
19 statuses: &[crate::vol_oracle::VolOracleStatus],
20 underlying: &str,
21 now_ms: i64,
22) -> Option<VolSurfaceSnapshot> {
23 let has_ready_fixed = statuses.iter().any(|status| {
24 status.underlying == underlying && status.provider == VolProviderKind::Fixed && status.ready
25 });
26 if !has_ready_fixed {
27 return None;
28 }
29
30 let mut strike_points = Vec::new();
31 let mut seen = BTreeSet::new();
32 for symbol in orderbooks.keys() {
33 let Ok(parsed) = ParsedSymbol::from_symbol(symbol) else {
34 continue;
35 };
36 if parsed.underlying != underlying {
37 continue;
38 }
39
40 let Some(strike) = parsed.strike.to_f64() else {
41 panic!(
42 "STATE_CORRUPTION: strike {} for {} is not representable as f64",
43 parsed.strike, symbol
44 );
45 };
46 let expiry_ts =
47 hypercall_types::expiry_date_to_timestamp(&parsed.underlying, parsed.expiry) as i64;
48 let dedupe_key = (expiry_ts, parsed.strike.to_string());
49 if !seen.insert(dedupe_key) {
50 continue;
51 }
52
53 let iv = vol_oracle.get_iv(underlying, strike, expiry_ts).ok()?;
54 strike_points.push(VolPoint {
55 strike,
56 expiry: expiry_ts,
57 iv,
58 timestamp: now_ms,
59 });
60 }
61
62 if strike_points.is_empty() {
63 return None;
64 }
65
66 let expiries = strike_points
67 .iter()
68 .map(|point| point.expiry)
69 .collect::<BTreeSet<_>>()
70 .into_iter()
71 .collect();
72
73 Some(VolSurfaceSnapshot {
74 underlying: underlying.to_string(),
75 last_update_ts_ms: Some(now_ms),
76 expiries,
77 strike_points,
78 delta_curves: Vec::new(),
79 atm_vols: Vec::new(),
80 spot_price: None,
81 })
82}
83
84fn max_listed_option_expiry_ts_ms(
85 orderbooks: &std::collections::HashMap<String, hypercall_engine::OrderBook>,
86 underlying: &str,
87) -> Result<i64, String> {
88 orderbooks
89 .keys()
90 .filter_map(|symbol| ParsedSymbol::from_symbol(symbol).ok())
91 .filter(|parsed| parsed.underlying == underlying)
92 .map(|parsed| {
93 hypercall_types::expiry_date_to_timestamp_checked(&parsed.underlying, parsed.expiry)
94 .map_err(|error| {
95 format!(
96 "invalid listed option expiry {} for {}: {}",
97 parsed.expiry, parsed.underlying, error
98 )
99 })
100 .and_then(|expiry_ts| {
101 i64::try_from(expiry_ts)
102 .map_err(|_| "listed option expiry exceeds i64".to_string())?
103 .checked_mul(1_000)
104 .ok_or_else(|| "listed option expiry ms overflow".to_string())
105 })
106 })
107 .try_fold(None, |max: Option<i64>, expiry_ts_ms| {
108 let expiry_ts_ms = expiry_ts_ms?;
109 Ok::<_, String>(Some(
110 max.map_or(expiry_ts_ms, |current| current.max(expiry_ts_ms)),
111 ))
112 })?
113 .ok_or_else(|| format!("no listed option markets for underlying {underlying}"))
114}
115
116impl UnifiedEngine {
117 fn require_external_durable_mutation_uuid(command_type: &str, request_id: &str) -> uuid::Uuid {
120 uuid::Uuid::parse_str(request_id).unwrap_or_else(|error| {
121 panic!(
122 "RUNTIME_INVARIANT: external engine command {} request_id {} is not a UUID: {}",
123 command_type, request_id, error
124 )
125 })
126 }
127
128 fn parse_directive_domain_status(
129 value: &str,
130 ) -> Result<crate::directive_outbox::DirectiveDomainStatus, String> {
131 match value {
132 "accepted" => Ok(crate::directive_outbox::DirectiveDomainStatus::Accepted),
133 "rejected" => Ok(crate::directive_outbox::DirectiveDomainStatus::Rejected),
134 "pending_chain_effect" => {
135 Ok(crate::directive_outbox::DirectiveDomainStatus::PendingChainEffect)
136 }
137 "completed" => Ok(crate::directive_outbox::DirectiveDomainStatus::Completed),
138 "failed" => Ok(crate::directive_outbox::DirectiveDomainStatus::Failed),
139 other => Err(format!("unknown directive domain_status {}", other)),
140 }
141 }
142
143 fn parse_directive_delivery_status(
144 value: &str,
145 ) -> Result<crate::directive_outbox::DirectiveDeliveryStatus, String> {
146 match value {
147 "pending" => Ok(crate::directive_outbox::DirectiveDeliveryStatus::Pending),
148 "broadcasted" => Ok(crate::directive_outbox::DirectiveDeliveryStatus::Broadcasted),
149 "included" => Ok(crate::directive_outbox::DirectiveDeliveryStatus::Included),
150 "finalized" => Ok(crate::directive_outbox::DirectiveDeliveryStatus::Finalized),
151 "reverted" => Ok(crate::directive_outbox::DirectiveDeliveryStatus::Reverted),
152 "expired" => Ok(crate::directive_outbox::DirectiveDeliveryStatus::Expired),
153 "dead_lettered" => Ok(crate::directive_outbox::DirectiveDeliveryStatus::DeadLettered),
154 other => Err(format!("unknown directive delivery_status {}", other)),
155 }
156 }
157
158 fn persisted_withdrawal_directive_status(
159 &self,
160 request_id: &str,
161 ) -> Result<
162 Option<(
163 crate::directive_outbox::DirectiveDomainStatus,
164 crate::directive_outbox::DirectiveDeliveryStatus,
165 )>,
166 String,
167 > {
168 let Some(db) = self.ctx.db.as_ref() else {
169 warn!(
170 request_id = %request_id,
171 "Cannot read persisted directive status without DB handle; returning pending retry receipt"
172 );
173 return Ok(None);
174 };
175 let row = db
176 .get_directive_status_sync(request_id)
177 .map_err(|error| format!("failed to read directive status for {}: {}", request_id, error))?
178 .unwrap_or_else(|| {
179 panic!(
180 "JOURNAL_FATAL: withdrawal request_id {} is journaled but missing directive_outbox row",
181 request_id
182 )
183 });
184
185 Ok(Some((
186 Self::parse_directive_domain_status(&row.domain_status)?,
187 Self::parse_directive_delivery_status(&row.delivery_status)?,
188 )))
189 }
190
191 async fn external_option_command_already_journaled(
192 &self,
193 request_id: &str,
194 command_type: &str,
195 ) -> bool {
196 let request_uuid = Self::require_external_durable_mutation_uuid(command_type, request_id);
197 let Some(journal_writer) = self.journal_writer.clone() else {
198 return false;
199 };
200 let command_type = command_type.to_string();
201 let result =
202 tokio::task::spawn_blocking(move || journal_writer.get_by_request_id(&request_uuid))
203 .await
204 .unwrap_or_else(|error| {
205 panic!(
206 "JOURNAL_FATAL: idempotency lookup task failed for {}: {}",
207 request_id, error
208 )
209 })
210 .unwrap_or_else(|error| {
211 panic!(
212 "JOURNAL_FATAL: idempotency lookup failed for {}: {}",
213 request_id, error
214 )
215 });
216
217 match result {
218 Some(record) if record.command_type == command_type => true,
219 Some(record) => {
220 panic!(
221 "JOURNAL_FATAL: request_id {} already used for {}, not {}",
222 request_id, record.command_type, command_type
223 )
224 }
225 None => false,
226 }
227 }
228
229 async fn journal_external_option_position_command(
230 &self,
231 env: &crate::rsm::apply::CommandEnvelope,
232 request_id: &str,
233 command_type_enum: hypercall_db_diesel::engine_enums::CommandType,
234 outbox_appends: Vec<crate::directive_outbox::DirectiveOutboxAppend>,
235 ) {
236 use crate::journal::{JournalEntry, JournalMessage};
237 use crate::rsm::apply::EngineCommand;
238 use hypercall_db_diesel::engine_enums::DbUuid;
239
240 let request_uuid =
241 Self::require_external_durable_mutation_uuid(env.command.command_type(), request_id);
242 #[cfg(feature = "rsm-state")]
243 let command_identity_hash = env.command.identity_hash();
244 let command_data = match &env.command {
245 EngineCommand::OptionDepositUpdate {
246 request_id,
247 wallet,
248 symbol,
249 quantity,
250 timestamp_ms,
251 } => hypercall_types::serialize_to_wire_bytes(&(
252 request_id,
253 wallet,
254 symbol,
255 quantity,
256 timestamp_ms,
257 )),
258 EngineCommand::OptionWithdrawalUpdate {
259 request_id,
260 wallet,
261 account,
262 signer,
263 rsm_signer,
264 symbol,
265 quantity,
266 nonce,
267 action,
268 timestamp_ms,
269 } => hypercall_types::serialize_to_wire_bytes(&(
270 request_id,
271 wallet,
272 account,
273 signer,
274 rsm_signer,
275 symbol,
276 quantity,
277 nonce,
278 action,
279 timestamp_ms,
280 )),
281 other => panic!(
282 "RUNTIME_INVARIANT: journal_external_option_position_command called for {}",
283 other.command_type()
284 ),
285 };
286
287 if let Some(ref batch_sender) = self.journal_batch_sender {
288 let (ack_tx, ack_rx) = tokio::sync::oneshot::channel();
289 let entry = JournalEntry {
290 received_ts_ms: env.received_ts_ms,
291 command_data,
292 response_data: None,
293 order_id: None,
294 pre_digest: Default::default(),
295 post_digest: Default::default(),
296 duration_ms: 0,
297 events: Vec::new(),
298 outbox_appends,
299 fill_side_effects: Vec::new(),
300 cash_withdrawal_side_effect: None,
301 balance_updates: Vec::new(),
302 created_at: Instant::now(),
303 commit_ack: Some(ack_tx),
304 request_uuid: DbUuid(request_uuid),
305 command_type_enum: Some(command_type_enum),
306 #[cfg(feature = "rsm-state")]
307 command_identity_hash,
308 #[cfg(feature = "rsm-state")]
309 rsm_state_digest: None,
310 };
311 if let Err(error) = batch_sender.send(JournalMessage::Entry(entry)).await {
312 panic!(
313 "JOURNAL_FATAL: failed to journal {} for request_id {}: {}",
314 env.command.command_type(),
315 request_id,
316 error
317 );
318 }
319 if ack_rx.await.is_err() {
320 panic!(
321 "CRITICAL_FAILURE: journal commit_ack dropped for {} request_id {}. \
322 Durability boundary is unknown after state mutation.",
323 env.command.command_type(),
324 request_id
325 );
326 }
327 return;
328 }
329
330 let Some(ref journal_writer) = self.journal_writer else {
331 panic!(
332 "JOURNAL_FATAL: no journal configured for {} request_id {}",
333 env.command.command_type(),
334 request_id
335 );
336 };
337 if !outbox_appends.is_empty() {
338 panic!(
339 "JOURNAL_FATAL: synchronous journal path cannot persist directive outbox appends for {} request_id {}",
340 env.command.command_type(),
341 request_id
342 );
343 }
344 let balance_updates = Vec::new();
345 journal_writer
346 .append_transition_with_fill_side_effects(
347 env.received_ts_ms,
348 &command_data,
349 None,
350 None,
351 &Default::default(),
352 &Default::default(),
353 0,
354 &[],
355 &[],
356 &balance_updates,
357 DbUuid(request_uuid),
358 Some(command_type_enum),
359 )
360 .unwrap_or_else(|error| {
361 panic!(
362 "JOURNAL_FATAL: failed to journal {} for request_id {}: {}",
363 env.command.command_type(),
364 request_id,
365 error
366 )
367 });
368 }
369
370 async fn journal_external_balance_update_command(
371 &self,
372 env: &crate::rsm::apply::CommandEnvelope,
373 request_id: &str,
374 outbox_appends: Vec<crate::directive_outbox::DirectiveOutboxAppend>,
375 balance_updates: Vec<hypercall_types::BalanceUpdate>,
376 ) {
377 use crate::journal::{JournalEntry, JournalMessage};
378 use crate::rsm::apply::{BalanceCommandPayload, DepositUpdatePayload, EngineCommand};
379 use hypercall_db_diesel::engine_enums::{CommandType, DbUuid};
380
381 let request_uuid =
382 Self::require_external_durable_mutation_uuid(env.command.command_type(), request_id);
383 let (command_data, command_type_enum) = match &env.command {
384 EngineCommand::DepositUpdate {
385 wallet,
386 amount,
387 timestamp_ms,
388 sequence,
389 source_event_hash,
390 } => {
391 let Some(sequence) = sequence else {
392 panic!(
393 "RUNTIME_INVARIANT: DepositUpdate {} missing durable sequence",
394 request_id
395 );
396 };
397 (
398 hypercall_types::serialize_to_wire_bytes(&DepositUpdatePayload {
399 wallet: *wallet,
400 amount: *amount,
401 timestamp_ms: *timestamp_ms,
402 sequence: *sequence,
403 source_event_hash: source_event_hash.clone(),
404 }),
405 CommandType::DepositUpdate,
406 )
407 }
408 EngineCommand::LiquidationBonusUpdate {
409 wallet,
410 amount,
411 balance_after,
412 timestamp_ms,
413 sequence,
414 } => (
415 hypercall_types::serialize_to_wire_bytes(&BalanceCommandPayload {
416 wallet: *wallet,
417 amount: *amount,
418 balance_after: *balance_after,
419 timestamp_ms: *timestamp_ms,
420 sequence: *sequence,
421 }),
422 CommandType::LiquidationBonusUpdate,
423 ),
424 _ => {
425 panic!(
426 "RUNTIME_INVARIANT: journal_external_balance_update_command called for {}",
427 env.command.command_type()
428 );
429 }
430 };
431 #[cfg(feature = "rsm-state")]
432 let command_identity_hash = env.command.identity_hash();
433
434 if let Some(ref batch_sender) = self.journal_batch_sender {
435 let (ack_tx, ack_rx) = tokio::sync::oneshot::channel();
436 let entry = JournalEntry {
437 received_ts_ms: env.received_ts_ms,
438 command_data,
439 response_data: None,
440 order_id: None,
441 pre_digest: Default::default(),
442 post_digest: Default::default(),
443 duration_ms: 0,
444 events: Vec::new(),
445 outbox_appends,
446 fill_side_effects: Vec::new(),
447 cash_withdrawal_side_effect: None,
448 balance_updates,
449 created_at: Instant::now(),
450 commit_ack: Some(ack_tx),
451 request_uuid: DbUuid(request_uuid),
452 command_type_enum: Some(command_type_enum),
453 #[cfg(feature = "rsm-state")]
454 command_identity_hash,
455 #[cfg(feature = "rsm-state")]
456 rsm_state_digest: None,
457 };
458 if let Err(error) = batch_sender.send(JournalMessage::Entry(entry)).await {
459 panic!(
460 "JOURNAL_FATAL: failed to journal {} for request_id {}: {}",
461 env.command.command_type(),
462 request_id,
463 error
464 );
465 }
466 if ack_rx.await.is_err() {
467 panic!(
468 "CRITICAL_FAILURE: journal commit_ack dropped for {} request_id {}. \
469 Durability boundary is unknown after state mutation.",
470 env.command.command_type(),
471 request_id
472 );
473 }
474 return;
475 }
476
477 let Some(ref journal_writer) = self.journal_writer else {
478 panic!(
479 "JOURNAL_FATAL: no journal configured for {} request_id {}",
480 env.command.command_type(),
481 request_id
482 );
483 };
484 if !outbox_appends.is_empty() {
485 panic!(
486 "JOURNAL_FATAL: synchronous journal path cannot persist directive outbox appends for {} request_id {}",
487 env.command.command_type(),
488 request_id
489 );
490 }
491 journal_writer
492 .append_transition_with_fill_side_effects(
493 env.received_ts_ms,
494 &command_data,
495 None,
496 None,
497 &Default::default(),
498 &Default::default(),
499 0,
500 &[],
501 &[],
502 &balance_updates,
503 DbUuid(request_uuid),
504 Some(command_type_enum),
505 )
506 .unwrap_or_else(|error| {
507 panic!(
508 "JOURNAL_FATAL: failed to journal {} for request_id {}: {}",
509 env.command.command_type(),
510 request_id,
511 error
512 )
513 });
514 }
515
516 fn pm_settlement_request_id(command: &crate::rsm::apply::EngineCommand) -> Option<uuid::Uuid> {
517 match command {
518 crate::rsm::apply::EngineCommand::SetPmSettlementPoolConfig(command) => {
519 Some(command.request_id)
520 }
521 crate::rsm::apply::EngineCommand::RecordPmVaultDeposit(command) => {
522 Some(command.request_id)
523 }
524 crate::rsm::apply::EngineCommand::RequestPmVaultWithdrawal(command) => {
525 Some(command.request_id)
526 }
527 crate::rsm::apply::EngineCommand::AccruePmSettlementInterest(command) => {
528 Some(command.request_id)
529 }
530 crate::rsm::apply::EngineCommand::ApplyPmSettlementRepayment(command) => {
531 Some(command.request_id)
532 }
533 crate::rsm::apply::EngineCommand::JournalPmRecoveryPlan(command) => {
534 Some(command.request_id)
535 }
536 crate::rsm::apply::EngineCommand::MarkPmRecoveryActionSubmitted(command) => {
537 Some(command.request_id)
538 }
539 crate::rsm::apply::EngineCommand::ResolvePmRecoveryAction(command) => {
540 Some(command.request_id)
541 }
542 _ => None,
543 }
544 }
545
546 fn preflight_pm_settlement_admin_command(
547 &self,
548 command: &crate::rsm::apply::EngineCommand,
549 ) -> Result<(), String> {
550 let mut pm_state = self.ctx.pm_settlement_state.clone();
551 match command.clone() {
552 crate::rsm::apply::EngineCommand::SetPmSettlementPoolConfig(command) => {
553 pm_state.apply_set_config(command).map(|_| ())
554 }
555 crate::rsm::apply::EngineCommand::RecordPmVaultDeposit(command) => {
556 let engine_max_expiry_ts_ms =
557 max_listed_option_expiry_ts_ms(&self.ctx.orderbooks, &command.underlying)?;
558 if command.max_listed_expiry_ts_ms != engine_max_expiry_ts_ms {
559 return Err(format!(
560 "RecordPmVaultDeposit max_listed_expiry_ts_ms {} does not match engine max listed expiry {} for {}",
561 command.max_listed_expiry_ts_ms,
562 engine_max_expiry_ts_ms,
563 command.underlying
564 ));
565 }
566 pm_state.apply_record_vault_deposit(command).map(|_| ())
567 }
568 crate::rsm::apply::EngineCommand::RequestPmVaultWithdrawal(command) => {
569 pm_state.apply_request_vault_withdrawal(command).map(|_| ())
570 }
571 crate::rsm::apply::EngineCommand::AccruePmSettlementInterest(command) => {
572 pm_state.apply_accrue_interest(command).map(|_| ())
573 }
574 crate::rsm::apply::EngineCommand::ApplyPmSettlementRepayment(command) => {
575 pm_state.apply_repayment(command).map(|_| ())
576 }
577 crate::rsm::apply::EngineCommand::JournalPmRecoveryPlan(command) => {
578 pm_state.apply_journal_recovery_plan(command).map(|_| ())
579 }
580 crate::rsm::apply::EngineCommand::MarkPmRecoveryActionSubmitted(command) => pm_state
581 .apply_mark_recovery_action_submitted(command)
582 .map(|_| ()),
583 crate::rsm::apply::EngineCommand::ResolvePmRecoveryAction(command) => {
584 pm_state.apply_resolve_recovery_action(command).map(|_| ())
585 }
586 other => Err(format!(
587 "unsupported PM settlement admin command {}",
588 other.command_type()
589 )),
590 }
591 }
592
593 fn stamp_pm_settlement_admin_command(
594 command: crate::rsm::apply::EngineCommand,
595 timestamp_ms: u64,
596 ) -> crate::rsm::apply::EngineCommand {
597 match command {
598 crate::rsm::apply::EngineCommand::SetPmSettlementPoolConfig(mut command) => {
599 command.timestamp_ms = timestamp_ms;
600 crate::rsm::apply::EngineCommand::SetPmSettlementPoolConfig(command)
601 }
602 crate::rsm::apply::EngineCommand::RecordPmVaultDeposit(mut command) => {
603 command.timestamp_ms = timestamp_ms;
604 crate::rsm::apply::EngineCommand::RecordPmVaultDeposit(command)
605 }
606 crate::rsm::apply::EngineCommand::RequestPmVaultWithdrawal(mut command) => {
607 command.timestamp_ms = timestamp_ms;
608 crate::rsm::apply::EngineCommand::RequestPmVaultWithdrawal(command)
609 }
610 crate::rsm::apply::EngineCommand::AccruePmSettlementInterest(mut command) => {
611 command.timestamp_ms = timestamp_ms;
612 crate::rsm::apply::EngineCommand::AccruePmSettlementInterest(command)
613 }
614 crate::rsm::apply::EngineCommand::ApplyPmSettlementRepayment(mut command) => {
615 command.timestamp_ms = timestamp_ms;
616 crate::rsm::apply::EngineCommand::ApplyPmSettlementRepayment(command)
617 }
618 crate::rsm::apply::EngineCommand::JournalPmRecoveryPlan(mut command) => {
619 command.timestamp_ms = timestamp_ms;
620 crate::rsm::apply::EngineCommand::JournalPmRecoveryPlan(command)
621 }
622 crate::rsm::apply::EngineCommand::MarkPmRecoveryActionSubmitted(mut command) => {
623 command.timestamp_ms = timestamp_ms;
624 crate::rsm::apply::EngineCommand::MarkPmRecoveryActionSubmitted(command)
625 }
626 crate::rsm::apply::EngineCommand::ResolvePmRecoveryAction(mut command) => {
627 command.timestamp_ms = timestamp_ms;
628 crate::rsm::apply::EngineCommand::ResolvePmRecoveryAction(command)
629 }
630 command => command,
631 }
632 }
633
634 async fn journal_pm_settlement_admin_command(
635 &self,
636 env: &crate::rsm::apply::CommandEnvelope,
637 request_uuid: uuid::Uuid,
638 ) {
639 use crate::engine_enums_ext::CommandTypeExt;
640 use crate::journal::{JournalEntry, JournalMessage};
641 use hypercall_db_diesel::engine_enums::{CommandType, DbUuid};
642
643 let command_type_enum = CommandType::from_command(&env.command);
644 let command_data = match &env.command {
645 crate::rsm::apply::EngineCommand::SetPmSettlementPoolConfig(command) => {
646 hypercall_types::serialize_to_wire_bytes(command)
647 }
648 crate::rsm::apply::EngineCommand::RecordPmVaultDeposit(command) => {
649 hypercall_types::serialize_to_wire_bytes(command)
650 }
651 crate::rsm::apply::EngineCommand::RequestPmVaultWithdrawal(command) => {
652 hypercall_types::serialize_to_wire_bytes(command)
653 }
654 crate::rsm::apply::EngineCommand::AccruePmSettlementInterest(command) => {
655 hypercall_types::serialize_to_wire_bytes(command)
656 }
657 crate::rsm::apply::EngineCommand::ApplyPmSettlementRepayment(command) => {
658 hypercall_types::serialize_to_wire_bytes(command)
659 }
660 crate::rsm::apply::EngineCommand::JournalPmRecoveryPlan(command) => {
661 hypercall_types::serialize_to_wire_bytes(command)
662 }
663 crate::rsm::apply::EngineCommand::MarkPmRecoveryActionSubmitted(command) => {
664 hypercall_types::serialize_to_wire_bytes(command)
665 }
666 crate::rsm::apply::EngineCommand::ResolvePmRecoveryAction(command) => {
667 hypercall_types::serialize_to_wire_bytes(command)
668 }
669 _ => {
670 panic!(
671 "RUNTIME_INVARIANT: journal_pm_settlement_admin_command called for {}",
672 env.command.command_type()
673 );
674 }
675 };
676 let request_id = request_uuid.to_string();
677 #[cfg(feature = "rsm-state")]
678 let command_identity_hash = env.command.identity_hash();
679
680 if let Some(ref batch_sender) = self.journal_batch_sender {
681 let (ack_tx, ack_rx) = tokio::sync::oneshot::channel();
682 let entry = JournalEntry {
683 received_ts_ms: env.received_ts_ms,
684 command_data,
685 response_data: None,
686 order_id: None,
687 pre_digest: Default::default(),
688 post_digest: Default::default(),
689 duration_ms: 0,
690 events: Vec::new(),
691 outbox_appends: Vec::new(),
692 fill_side_effects: Vec::new(),
693 cash_withdrawal_side_effect: None,
694 balance_updates: Vec::new(),
695 created_at: Instant::now(),
696 commit_ack: Some(ack_tx),
697 request_uuid: DbUuid(request_uuid),
698 command_type_enum: Some(command_type_enum),
699 #[cfg(feature = "rsm-state")]
700 command_identity_hash,
701 #[cfg(feature = "rsm-state")]
702 rsm_state_digest: None,
703 };
704 if let Err(error) = batch_sender.send(JournalMessage::Entry(entry)).await {
705 panic!(
706 "JOURNAL_FATAL: failed to journal {} for request_id {}: {}",
707 env.command.command_type(),
708 request_id,
709 error
710 );
711 }
712 if ack_rx.await.is_err() {
713 panic!(
714 "CRITICAL_FAILURE: journal commit_ack dropped for {} request_id {}. \
715 Durability boundary is unknown after state mutation.",
716 env.command.command_type(),
717 request_id
718 );
719 }
720 return;
721 }
722
723 let Some(ref journal_writer) = self.journal_writer else {
724 panic!(
725 "JOURNAL_FATAL: no journal configured for {} request_id {}",
726 env.command.command_type(),
727 request_id
728 );
729 };
730 journal_writer
731 .append_transition_with_fill_side_effects(
732 env.received_ts_ms,
733 &command_data,
734 None,
735 None,
736 &Default::default(),
737 &Default::default(),
738 0,
739 &[],
740 &[],
741 &[],
742 DbUuid(request_uuid),
743 Some(command_type_enum),
744 )
745 .unwrap_or_else(|error| {
746 panic!(
747 "JOURNAL_FATAL: failed to journal {} for request_id {}: {}",
748 env.command.command_type(),
749 request_id,
750 error
751 )
752 });
753 }
754
755 async fn journal_agent_auth_command(&self, env: &crate::rsm::apply::CommandEnvelope) {
756 use crate::journal::{JournalEntry, JournalMessage};
757 use crate::rsm::apply::{ApproveAgentPayload, EngineCommand, RevokeAgentPayload};
758 use hypercall_db_diesel::engine_enums::{CommandType, DbUuid};
759
760 #[cfg(feature = "rsm-state")]
761 let command_identity_hash = env.command.identity_hash();
762
763 let (command_data, command_type_enum) = match &env.command {
764 EngineCommand::ApproveAgent {
765 wallet,
766 agent,
767 expires_at_ms,
768 nonce,
769 timestamp_ms,
770 } => (
771 hypercall_types::serialize_to_wire_bytes(&ApproveAgentPayload {
772 wallet: *wallet,
773 agent: *agent,
774 expires_at_ms: *expires_at_ms,
775 nonce: *nonce,
776 timestamp_ms: *timestamp_ms,
777 }),
778 CommandType::ApproveAgent,
779 ),
780 EngineCommand::RevokeAgent {
781 wallet,
782 agent,
783 nonce,
784 timestamp_ms,
785 } => (
786 hypercall_types::serialize_to_wire_bytes(&RevokeAgentPayload {
787 wallet: *wallet,
788 agent: *agent,
789 nonce: *nonce,
790 timestamp_ms: *timestamp_ms,
791 }),
792 CommandType::RevokeAgent,
793 ),
794 other => panic!(
795 "RUNTIME_INVARIANT: journal_agent_auth_command called for {}",
796 other.command_type()
797 ),
798 };
799
800 if let Some(ref batch_sender) = self.journal_batch_sender {
801 let (ack_tx, ack_rx) = tokio::sync::oneshot::channel();
802 let entry = JournalEntry {
803 received_ts_ms: env.received_ts_ms,
804 command_data,
805 response_data: None,
806 order_id: None,
807 pre_digest: Default::default(),
808 post_digest: Default::default(),
809 duration_ms: 0,
810 events: Vec::new(),
811 outbox_appends: Vec::new(),
812 fill_side_effects: Vec::new(),
813 cash_withdrawal_side_effect: None,
814 balance_updates: Vec::new(),
815 created_at: Instant::now(),
816 commit_ack: Some(ack_tx),
817 request_uuid: DbUuid(uuid::Uuid::new_v4()),
818 command_type_enum: Some(command_type_enum),
819 #[cfg(feature = "rsm-state")]
820 command_identity_hash,
821 #[cfg(feature = "rsm-state")]
822 rsm_state_digest: None,
823 };
824 if let Err(error) = batch_sender.send(JournalMessage::Entry(entry)).await {
825 panic!(
826 "JOURNAL_FATAL: failed to journal {}: {}",
827 env.command.command_type(),
828 error
829 );
830 }
831 if ack_rx.await.is_err() {
832 panic!(
833 "CRITICAL_FAILURE: journal commit_ack dropped for {}. \
834 Durability boundary is unknown after state mutation.",
835 env.command.command_type()
836 );
837 }
838 return;
839 }
840
841 let Some(ref journal_writer) = self.journal_writer else {
842 panic!(
843 "JOURNAL_FATAL: no journal configured for {}",
844 env.command.command_type()
845 );
846 };
847 journal_writer
848 .append_transition(
849 env.received_ts_ms,
850 &command_data,
851 None,
852 None,
853 &Default::default(),
854 &Default::default(),
855 0,
856 &[],
857 DbUuid(uuid::Uuid::new_v4()),
858 Some(command_type_enum),
859 )
860 .unwrap_or_else(|error| {
861 panic!(
862 "JOURNAL_FATAL: failed to journal {}: {}",
863 env.command.command_type(),
864 error
865 )
866 });
867 }
868
869 fn agent_auth_journal_available(&self) -> bool {
870 self.journal_batch_sender.is_some()
871 || self
872 .journal_writer
873 .as_ref()
874 .is_some_and(|writer| writer.is_durable())
875 }
876
877 pub(crate) fn withdrawal_directive_journal_unavailable_error(
878 &self,
879 command_type: &str,
880 request_id: &str,
881 ) -> Option<String> {
882 if self.journal_batch_sender.is_some() {
883 return None;
884 }
885 Some(format!(
886 "{command_type} requires ENGINE_JOURNAL_ENABLED=true because directive outbox appends \
887 must be persisted atomically with the state transition; request_id {request_id} was not applied"
888 ))
889 }
890
891 async fn journal_external_cash_withdrawal_command(
892 &self,
893 env: &crate::rsm::apply::CommandEnvelope,
894 request_id: &str,
895 outbox_appends: Vec<crate::directive_outbox::DirectiveOutboxAppend>,
896 wallet: hypercall_types::WalletAddress,
897 amount: rust_decimal::Decimal,
898 balance_after: rust_decimal::Decimal,
899 timestamp_ms: u64,
900 balance_updates: Vec<hypercall_types::BalanceUpdate>,
901 ) {
902 use crate::journal::engine_journal_batcher::{
903 JournalCashWithdrawalSideEffect, JournalEntry, JournalMessage,
904 };
905 use crate::rsm::apply::EngineCommand;
906 use hypercall_db_diesel::engine_enums::{CommandType, DbUuid};
907
908 let request_uuid =
909 Self::require_external_durable_mutation_uuid("CashWithdrawalUpdate", request_id);
910 let EngineCommand::CashWithdrawalUpdate {
911 request_id: ref rid,
912 wallet: w,
913 account: a,
914 destination: d,
915 signer: s,
916 rsm_signer: rs,
917 amount: amt,
918 amount_wei: aw,
919 nonce: n,
920 timestamp_ms: ts,
921 } = env.command
922 else {
923 panic!(
924 "RUNTIME_INVARIANT: journal_external_cash_withdrawal_command called for {}",
925 env.command.command_type()
926 );
927 };
928 let command_data = hypercall_types::serialize_to_wire_bytes(&(
929 rid, &w, &a, &d, &s, &rs, &amt, &aw, &n, &ts,
930 ));
931 #[cfg(feature = "rsm-state")]
932 let command_identity_hash = env.command.identity_hash();
933
934 let side_effect = JournalCashWithdrawalSideEffect {
935 wallet,
936 request_id: request_id.to_string(),
937 amount,
938 balance_after,
939 timestamp_ms,
940 };
941
942 if let Some(ref batch_sender) = self.journal_batch_sender {
943 let (ack_tx, ack_rx) = tokio::sync::oneshot::channel();
944 let entry = JournalEntry {
945 received_ts_ms: env.received_ts_ms,
946 command_data,
947 response_data: None,
948 order_id: None,
949 pre_digest: Default::default(),
950 post_digest: Default::default(),
951 duration_ms: 0,
952 events: Vec::new(),
953 outbox_appends,
954 fill_side_effects: Vec::new(),
955 cash_withdrawal_side_effect: Some(side_effect),
956 balance_updates,
957 created_at: std::time::Instant::now(),
958 commit_ack: Some(ack_tx),
959 request_uuid: DbUuid(request_uuid),
960 command_type_enum: Some(CommandType::CashWithdrawalUpdate),
961 #[cfg(feature = "rsm-state")]
962 command_identity_hash,
963 #[cfg(feature = "rsm-state")]
964 rsm_state_digest: None,
965 };
966 if let Err(error) = batch_sender.send(JournalMessage::Entry(entry)).await {
967 panic!(
968 "JOURNAL_FATAL: failed to journal CashWithdrawalUpdate for request_id {}: {}",
969 request_id, error
970 );
971 }
972 if ack_rx.await.is_err() {
973 panic!(
974 "CRITICAL_FAILURE: journal commit_ack dropped for CashWithdrawalUpdate request_id {}. \
975 Durability boundary is unknown after state mutation.",
976 request_id
977 );
978 }
979 return;
980 }
981
982 let Some(ref journal_writer) = self.journal_writer else {
983 panic!(
984 "JOURNAL_FATAL: no journal configured for CashWithdrawalUpdate request_id {}",
985 request_id
986 );
987 };
988 if !outbox_appends.is_empty() {
989 panic!(
990 "JOURNAL_FATAL: synchronous journal path cannot persist directive outbox appends for CashWithdrawalUpdate request_id {}",
991 request_id
992 );
993 }
994 journal_writer
995 .append_transition_with_fill_side_effects(
996 env.received_ts_ms,
997 &command_data,
998 None,
999 None,
1000 &Default::default(),
1001 &Default::default(),
1002 0,
1003 &[],
1004 &[],
1005 &balance_updates,
1006 DbUuid(request_uuid),
1007 Some(CommandType::CashWithdrawalUpdate),
1008 )
1009 .unwrap_or_else(|error| {
1010 panic!(
1011 "JOURNAL_FATAL: failed to journal CashWithdrawalUpdate for request_id {}: {}",
1012 request_id, error
1013 )
1014 });
1015 }
1016
1017 async fn prepare_tick_expiry_env(
1018 &self,
1019 now_ms: u64,
1020 ) -> Result<crate::rsm::apply::CommandEnvelope, EngineError> {
1021 let context = self
1022 .expiry_manager
1023 .prepare_tick_expiry_context(now_ms, &self.ctx, &self.margin_manager)
1024 .await
1025 .map_err(EngineError::Internal)?;
1026 self.validate_tick_expiry_nats_payload(now_ms, &context)?;
1027 Ok(crate::rsm::apply::CommandEnvelope::new(
1028 now_ms,
1029 crate::rsm::apply::EngineCommand::TickExpiry { now_ms, context },
1030 ))
1031 }
1032
1033 fn replay_owned_expiry_command_payload(
1034 env: &crate::rsm::apply::CommandEnvelope,
1035 ) -> Option<(
1036 hypercall_db_diesel::engine_enums::CommandType,
1037 Vec<u8>,
1038 &'static str,
1039 )> {
1040 match &env.command {
1041 crate::rsm::apply::EngineCommand::MarketAction(command)
1042 if matches!(
1043 command.message.action,
1044 hypercall_types::MarketAction::ExpireMarket
1045 ) =>
1046 {
1047 Some((
1048 hypercall_db_diesel::engine_enums::CommandType::ExpireMarket,
1049 hypercall_types::serialize_to_wire_bytes(command),
1050 "ExpireMarket",
1051 ))
1052 }
1053 crate::rsm::apply::EngineCommand::TickExpiry { now_ms, context }
1054 if !context.due_expiries.is_empty() || !context.pending_settlements.is_empty() =>
1055 {
1056 Some((
1057 hypercall_db_diesel::engine_enums::CommandType::TickExpiry,
1058 hypercall_types::serialize_to_wire_bytes(&(*now_ms, context)),
1059 "TickExpiry",
1060 ))
1061 }
1062 _ => None,
1063 }
1064 }
1065
1066 async fn journal_replay_owned_expiry_command(
1067 &self,
1068 env: &crate::rsm::apply::CommandEnvelope,
1069 balance_updates: &[hypercall_types::BalanceUpdate],
1070 ) {
1071 use crate::journal::JournalMessage;
1072
1073 if let Some(ref batch_sender) = self.journal_batch_sender {
1074 let (ack_tx, ack_rx) = tokio::sync::oneshot::channel();
1075 let Some((entry, command_label)) = Self::replay_owned_expiry_journal_entry(
1076 env,
1077 balance_updates,
1078 hypercall_db_diesel::engine_enums::DbUuid(uuid::Uuid::now_v7()),
1079 Some(ack_tx),
1080 ) else {
1081 return;
1082 };
1083
1084 if let Err(error) = batch_sender.send(JournalMessage::Entry(entry)).await {
1085 panic!(
1086 "JOURNAL_FATAL: failed to enqueue {} for replay journal: {}",
1087 command_label, error
1088 );
1089 }
1090 if ack_rx.await.is_err() {
1091 panic!(
1092 "JOURNAL_FATAL: replay journal commit ack dropped for {}",
1093 command_label
1094 );
1095 }
1096 return;
1097 }
1098
1099 let Some((entry, command_label)) = Self::replay_owned_expiry_journal_entry(
1100 env,
1101 balance_updates,
1102 hypercall_db_diesel::engine_enums::DbUuid(uuid::Uuid::now_v7()),
1103 None,
1104 ) else {
1105 return;
1106 };
1107
1108 let Some(ref journal_writer) = self.journal_writer else {
1109 if self.ctx.db.is_some() {
1110 panic!("JOURNAL_FATAL: no journal configured for {}", command_label);
1111 }
1112 return;
1113 };
1114 journal_writer
1115 .append_transition_with_fill_side_effects(
1116 entry.received_ts_ms,
1117 &entry.command_data,
1118 None,
1119 None,
1120 &Default::default(),
1121 &Default::default(),
1122 0,
1123 &[],
1124 &[],
1125 &entry.balance_updates,
1126 entry.request_uuid,
1127 entry.command_type_enum,
1128 )
1129 .unwrap_or_else(|error| {
1130 panic!(
1131 "JOURNAL_FATAL: failed to journal {} for replay: {}",
1132 command_label, error
1133 )
1134 });
1135 }
1136
1137 fn replay_owned_expiry_journal_entry(
1138 env: &crate::rsm::apply::CommandEnvelope,
1139 balance_updates: &[hypercall_types::BalanceUpdate],
1140 request_uuid: hypercall_db_diesel::engine_enums::DbUuid,
1141 commit_ack: Option<tokio::sync::oneshot::Sender<()>>,
1142 ) -> Option<(crate::journal::JournalEntry, &'static str)> {
1143 let (command_type_enum, command_data, command_label) =
1144 Self::replay_owned_expiry_command_payload(env)?;
1145 #[cfg(feature = "rsm-state")]
1146 let command_identity_hash = env.command.identity_hash();
1147
1148 Some((
1149 crate::journal::JournalEntry {
1150 received_ts_ms: env.received_ts_ms,
1151 command_data,
1152 response_data: None,
1153 order_id: None,
1154 pre_digest: Default::default(),
1155 post_digest: Default::default(),
1156 duration_ms: 0,
1157 events: Vec::new(),
1158 outbox_appends: Vec::new(),
1159 fill_side_effects: Vec::new(),
1160 cash_withdrawal_side_effect: None,
1161 balance_updates: balance_updates.to_vec(),
1162 created_at: Instant::now(),
1163 commit_ack,
1164 request_uuid,
1165 command_type_enum: Some(command_type_enum),
1166 #[cfg(feature = "rsm-state")]
1167 command_identity_hash,
1168 #[cfg(feature = "rsm-state")]
1169 rsm_state_digest: None,
1170 },
1171 command_label,
1172 ))
1173 }
1174
1175 fn validate_tick_expiry_nats_payload(
1176 &self,
1177 now_ms: u64,
1178 context: &crate::rsm::apply::TickExpiryContext,
1179 ) -> Result<(), EngineError> {
1180 if self.nats_publisher.is_none() {
1181 return Ok(());
1182 }
1183
1184 let command_data = hypercall_types::serialize_to_wire_bytes(&(now_ms, context));
1185 let total_payload_len = crate::nats::COMMAND_PAYLOAD_PREFIX_LEN + command_data.len();
1186 if total_payload_len > super::MAX_EXPIRY_NATS_PAYLOAD_BYTES {
1187 return Err(EngineError::Internal(format!(
1188 "TickExpiry NATS payload is {} bytes, exceeding {} byte limit. due_groups={}, pending_groups={}, settlement_prices={}, margin_modes={}",
1189 total_payload_len,
1190 super::MAX_EXPIRY_NATS_PAYLOAD_BYTES,
1191 context.due_expiries.len(),
1192 context.pending_settlements.len(),
1193 context.settlement_prices.len(),
1194 context.margin_modes.len()
1195 )));
1196 }
1197
1198 Ok(())
1199 }
1200
1201 fn validate_manual_market_expiry_nats_payload(
1202 &self,
1203 command: &crate::rsm::apply::MarketActionCommand,
1204 ) -> Result<(), EngineError> {
1205 if self.nats_publisher.is_none() || command.expiry_context.is_none() {
1206 return Ok(());
1207 }
1208
1209 let command_data = hypercall_types::serialize_to_wire_bytes(command);
1210 let total_payload_len = crate::nats::COMMAND_PAYLOAD_PREFIX_LEN + command_data.len();
1211 if total_payload_len > super::MAX_EXPIRY_NATS_PAYLOAD_BYTES {
1212 let context = command.expiry_context.as_ref().expect("checked above");
1213 return Err(EngineError::Internal(format!(
1214 "ExpireMarket NATS payload is {} bytes, exceeding {} byte limit for {}. due_groups={}, settlement_prices={}, margin_modes={}",
1215 total_payload_len,
1216 super::MAX_EXPIRY_NATS_PAYLOAD_BYTES,
1217 command.message.market.symbol,
1218 context.due_expiries.len(),
1219 context.settlement_prices.len(),
1220 context.margin_modes.len()
1221 )));
1222 }
1223
1224 Ok(())
1225 }
1226
1227 async fn prepare_manual_market_expiry_context(
1228 &self,
1229 message: &MarketActionMessage,
1230 ) -> Result<crate::rsm::apply::TickExpiryContext, EngineError> {
1231 let expiry_ts = crate::rsm::margin_manager::expiry_date_to_timestamp(
1232 &message.market.underlying,
1233 message.market.expiry,
1234 );
1235 let settlement_price = self
1236 .expiry_manager
1237 .get_settlement_price(&self.ctx.deps, &message.market.underlying, expiry_ts as i64)
1238 .await;
1239 let settlement_prices = match settlement_price {
1240 Some(price) => {
1241 vec![crate::rsm::apply::TickExpirySettlementPrice {
1242 underlying: message.market.underlying.clone(),
1243 expiry_ts: expiry_ts as i64,
1244 price,
1245 }]
1246 }
1247 None => Vec::new(),
1248 };
1249
1250 let mut margin_modes = Vec::new();
1251 if !settlement_prices.is_empty() {
1252 for ((wallet, symbol), position) in &self.ctx.engine_positions {
1253 if symbol != &message.market.symbol
1254 || position.quantity == rust_decimal::Decimal::ZERO
1255 {
1256 continue;
1257 }
1258 let margin_mode = self
1259 .ctx
1260 .deps
1261 .wallet_margin_modes
1262 .get(wallet)
1263 .copied()
1264 .ok_or_else(|| {
1265 EngineError::Internal(format!(
1266 "Missing margin mode for {} while preparing manual expiry for {}",
1267 wallet, message.market.symbol
1268 ))
1269 })?;
1270 margin_modes.push(crate::rsm::apply::TickExpiryWalletMarginMode {
1271 wallet: *wallet,
1272 margin_mode,
1273 pm_settlement_required: margin_mode
1274 == hypercall_margin::margin_mode::MarginMode::Portfolio
1275 && self.ctx.deps.portfolio_margin_pool_enabled
1276 && self
1277 .ctx
1278 .deps
1279 .portfolio_margin_settlement_allowlist
1280 .contains(wallet),
1281 });
1282 }
1283 }
1284 margin_modes.sort_by(|left, right| left.wallet.as_bytes().cmp(right.wallet.as_bytes()));
1285 margin_modes.dedup_by_key(|mode| mode.wallet);
1286
1287 let due_expiries = vec![crate::rsm::apply::TickExpiryDueGroup {
1288 expiry_ts: expiry_ts as i64,
1289 symbols: vec![message.market.symbol.clone()],
1290 }];
1291 let pending_settlements = Vec::new();
1292 let pm_settlements = self
1293 .expiry_manager
1294 .prepare_pm_settlements(
1295 &due_expiries,
1296 &pending_settlements,
1297 &settlement_prices,
1298 &margin_modes,
1299 message.timestamp,
1300 &self.ctx,
1301 &self.margin_manager,
1302 )
1303 .map_err(EngineError::Internal)?;
1304
1305 Ok(crate::rsm::apply::TickExpiryContext {
1306 due_expiries,
1307 pending_settlements,
1308 settlement_prices,
1309 margin_modes,
1310 pm_settlements,
1311 })
1312 }
1313
1314 async fn apply_expiry_effects_and_events(
1315 &mut self,
1316 output: &crate::rsm::apply::ApplyOutput,
1317 req_id: &str,
1318 ) -> Result<(), EngineError> {
1319 for effect in &output.expiry_effects {
1320 self.apply_expiry_effect(effect)?;
1321 }
1322 self.apply_pm_settlement_projection_effects_sync(&output.pm_settlement_effects, req_id)
1323 .map_err(EngineError::Internal)?;
1324 self.apply_replayed_events_sync(&output.events, req_id)
1325 .await;
1326 Ok(())
1327 }
1328
1329 async fn publish_tick_expiry_balance_updates(
1330 &self,
1331 output: &crate::rsm::apply::ApplyOutput,
1332 context: &'static str,
1333 ) {
1334 if output.balance_updates.is_empty() {
1335 return;
1336 }
1337
1338 debug!(
1339 balance_update_count = output.balance_updates.len(),
1340 context, "Publishing TickExpiry settlement balance updates"
1341 );
1342 self.publish_balance_updates_to_nats(&output.balance_updates)
1343 .await;
1344 }
1345
1346 async fn apply_market_effects(
1347 &self,
1348 output: &crate::rsm::apply::ApplyOutput,
1349 ) -> Result<(), EngineError> {
1350 for effect in &output.market_effects {
1351 match effect {
1352 crate::rsm::apply::MarketEffect::SaveMarketAndInstrument {
1353 underlying,
1354 expiry,
1355 instrument,
1356 } => {
1357 if let Some(handler) = self.ctx.db.as_ref() {
1358 if let Err(error) =
1359 handler.save_market_and_instrument_sync(underlying, *expiry, instrument)
1360 {
1361 panic!(
1362 "CRITICAL_FAILURE: Failed to persist market {} to database: {}. \
1363 Market will be lost on restart. Restart required.",
1364 instrument.id, error
1365 );
1366 }
1367 }
1368 }
1369 crate::rsm::apply::MarketEffect::DeleteMarketAndInstrument { symbol } => {
1370 if let Some(handler) = self.ctx.db.as_ref() {
1371 if let Err(error) = handler.delete_market_and_instrument_sync(symbol) {
1372 panic!(
1373 "CRITICAL_FAILURE: Failed to delete market {} from database: {}. \
1374 In-memory and persisted state would diverge.",
1375 symbol, error
1376 );
1377 }
1378 }
1379 }
1380 crate::rsm::apply::MarketEffect::RegisterSettlement {
1381 underlying,
1382 symbol,
1383 expiry_ts,
1384 twap_window_seconds,
1385 } => {
1386 let Some(oracle) = self.ctx.deps.mark_price_oracles.get(underlying) else {
1387 warn!(
1388 "Missing mark price oracle for {} while registering settlement for {} expiry {} window {}s",
1389 underlying, symbol, expiry_ts, twap_window_seconds
1390 );
1391 continue;
1392 };
1393 oracle
1394 .register_settlement(*expiry_ts, *twap_window_seconds)
1395 .await;
1396 debug!(
1397 "Registered TWAP settlement for {} at expiry {}",
1398 symbol, expiry_ts
1399 );
1400 }
1401 }
1402 }
1403 Ok(())
1404 }
1405
1406 pub(crate) fn apply_pm_settlement_projection_effects_sync(
1407 &self,
1408 effects: &[crate::rsm::portfolio_margin::settlement_state::PmSettlementProjectionEffect],
1409 _request_id: &str,
1410 ) -> Result<(), String> {
1411 if effects.is_empty() {
1412 return Ok(());
1413 }
1414 let handler = self
1415 .ctx
1416 .db
1417 .as_ref()
1418 .ok_or_else(|| "PM settlement projection write requires a database".to_string())?;
1419 let writes = Self::pm_settlement_projection_writes_from_effects(effects)?;
1420 handler
1421 .apply_pm_settlement_projection_writes_sync(&writes)
1422 .map_err(|error| format!("PM settlement projection write failed: {error}"))
1423 }
1424
1425 pub(crate) fn pm_settlement_projection_writes_from_effects(
1426 effects: &[crate::rsm::portfolio_margin::settlement_state::PmSettlementProjectionEffect],
1427 ) -> Result<Vec<hypercall_db::PmSettlementProjectionWrite>, String> {
1428 let mut writes = Vec::with_capacity(effects.len());
1429 for effect in effects {
1430 match effect {
1431 crate::rsm::portfolio_margin::settlement_state::PmSettlementProjectionEffect::PoolUpsert(pool_state) => {
1432 let capacity = pool_state.pool_available_usdc
1433 + pool_state.active_timing_bridge_usdc
1434 + pool_state.active_settlement_debt_usdc;
1435 let config = pool_state.config.as_ref();
1436 let updated_at_ms = i64::try_from(pool_state.updated_at_ms)
1437 .map_err(|_| "PM settlement pool projection timestamp exceeds i64".to_string())?;
1438 writes.push(hypercall_db::PmSettlementProjectionWrite::Pool(
1439 hypercall_db::PmSettlementPoolProjectionWrite {
1440 underlying: pool_state.underlying.clone(),
1441 config_version: Self::pm_projection_version_i32(
1442 "config_version",
1443 pool_state.config_version,
1444 )?,
1445 policy_version: Self::pm_projection_version_i32(
1446 "policy_version",
1447 pool_state.policy_version,
1448 )?,
1449 pool_available_usdc: pool_state.pool_available_usdc,
1450 pool_target_usdc: pool_state.pool_target_usdc,
1451 pool_capacity_usdc: capacity,
1452 pool_utilization: pool_state.utilization,
1453 active_timing_bridge_usdc: pool_state.active_timing_bridge_usdc,
1454 active_settlement_debt_usdc: pool_state.active_settlement_debt_usdc,
1455 target_short_oi_notional_multiplier: config
1456 .map(|c| c.target_short_oi_notional_multiplier),
1457 utilization_kink: config.map(|c| c.utilization_kink),
1458 apr_at_kink: config.map(|c| c.apr_at_kink),
1459 max_apr: config.map(|c| c.max_apr),
1460 normal_utilization_cap: config.map(|c| c.normal_utilization_cap),
1461 crisis_utilization_cap: config.map(|c| c.crisis_utilization_cap),
1462 bridge_window_ms: config.map(|c| c.bridge_window_ms),
1463 projection_seq: updated_at_ms,
1464 updated_at_ms,
1465 },
1466 ));
1467 }
1468 crate::rsm::portfolio_margin::settlement_state::PmSettlementProjectionEffect::AccountUpsert(account) => {
1469 let updated_at_ms = i64::try_from(account.updated_at_ms)
1470 .map_err(|_| "PM settlement account projection timestamp exceeds i64".to_string())?;
1471 writes.push(hypercall_db::PmSettlementProjectionWrite::Account(
1472 hypercall_db::PmSettlementAccountProjectionWrite {
1473 wallet: account.wallet,
1474 underlying: account.underlying.clone(),
1475 status: format!("{:?}", account.status),
1476 timing_bridge_principal_usdc: account.bridge_principal_usdc,
1477 settlement_debt_principal_usdc: account.debt_principal_usdc,
1478 accrued_interest_usdc: account.accrued_interest_usdc,
1479 interest_cursor_ms: account.last_interest_accrual_ms,
1480 bridge_deadline_ms: account.bridge_deadline_ms,
1481 active_recovery_plan_id: account.active_recovery_plan_id.clone(),
1482 policy_version: Self::pm_projection_version_i32(
1483 "policy_version",
1484 account.policy_version,
1485 )?,
1486 projection_seq: updated_at_ms,
1487 updated_at_ms,
1488 },
1489 ));
1490 }
1491 crate::rsm::portfolio_margin::settlement_state::PmSettlementProjectionEffect::InterestEvent(event) => {
1492 writes.push(hypercall_db::PmSettlementProjectionWrite::InterestEvent(
1493 hypercall_db::PmSettlementInterestEventProjectionWrite {
1494 request_id: event.request_id,
1495 wallet: event.wallet,
1496 underlying: event.underlying.clone(),
1497 from_ms: event.accrual_start_ms,
1498 to_ms: event.accrual_end_ms,
1499 utilization: event.utilization,
1500 apr: event.apr,
1501 interest_usdc: event.interest_amount_usdc,
1502 policy_version: Self::pm_projection_version_i32(
1503 "policy_version",
1504 event.policy_version,
1505 )?,
1506 },
1507 ));
1508 }
1509 crate::rsm::portfolio_margin::settlement_state::PmSettlementProjectionEffect::RepaymentEvent(event) => {
1510 writes.push(hypercall_db::PmSettlementProjectionWrite::RepaymentEvent(
1511 hypercall_db::PmSettlementRepaymentEventProjectionWrite {
1512 request_id: event.request_id,
1513 wallet: event.wallet,
1514 underlying: event.underlying.clone(),
1515 amount_usdc: event.amount_usdc,
1516 interest_paid_usdc: event.interest_paid_usdc,
1517 principal_paid_usdc: event.principal_paid_usdc,
1518 reason: event.reason.clone(),
1519 source_event_id: event.source_event_id.clone(),
1520 },
1521 ));
1522 }
1523 crate::rsm::portfolio_margin::settlement_state::PmSettlementProjectionEffect::VaultDepositUpsert(deposit) => {
1524 let projection_seq = i64::try_from(deposit.updated_at_ms)
1525 .map_err(|_| "PM vault deposit projection timestamp exceeds i64".to_string())?;
1526 let created_at_ms = i64::try_from(deposit.created_at_ms)
1527 .map_err(|_| "PM vault deposit created_at timestamp exceeds i64".to_string())?;
1528 let chain_id = i64::try_from(deposit.chain_id)
1529 .map_err(|_| "PM vault deposit chain_id exceeds i64".to_string())?;
1530 let log_index = i32::try_from(deposit.log_index)
1531 .map_err(|_| "PM vault deposit log_index exceeds i32".to_string())?;
1532 writes.push(hypercall_db::PmSettlementProjectionWrite::VaultDeposit(
1533 hypercall_db::PmVaultDepositProjectionWrite {
1534 deposit_id: deposit.deposit_id,
1535 depositor: deposit.depositor,
1536 underlying: deposit.underlying.clone(),
1537 principal_usdc: deposit.principal_usdc,
1538 remaining_usdc: deposit.remaining_usdc,
1539 withdrawn_usdc: deposit.withdrawn_usdc,
1540 reserved_withdrawal_usdc: deposit.reserved_withdrawal_usdc,
1541 chain_id,
1542 source_contract_address: deposit.source_contract_address,
1543 tx_hash: deposit.tx_hash.clone(),
1544 log_index,
1545 max_listed_expiry_ts_ms: deposit.max_listed_expiry_ts_ms,
1546 settlement_grace_ms: deposit.settlement_grace_ms,
1547 lock_until_ms: deposit.lock_until_ms,
1548 status: match deposit.status {
1549 crate::rsm::portfolio_margin::settlement_state::PmVaultDepositStatus::Active => {
1550 hypercall_db::PmVaultDepositProjectionStatus::Active
1551 }
1552 crate::rsm::portfolio_margin::settlement_state::PmVaultDepositStatus::PartiallyReserved => {
1553 hypercall_db::PmVaultDepositProjectionStatus::PartiallyReserved
1554 }
1555 crate::rsm::portfolio_margin::settlement_state::PmVaultDepositStatus::Reserved => {
1556 hypercall_db::PmVaultDepositProjectionStatus::Reserved
1557 }
1558 },
1559 projection_seq,
1560 created_at_ms,
1561 updated_at_ms: projection_seq,
1562 },
1563 ));
1564 }
1565 crate::rsm::portfolio_margin::settlement_state::PmSettlementProjectionEffect::VaultWithdrawalUpsert(withdrawal) => {
1566 let projection_seq = i64::try_from(withdrawal.updated_at_ms)
1567 .map_err(|_| "PM vault withdrawal projection timestamp exceeds i64".to_string())?;
1568 let requested_at_ms = i64::try_from(withdrawal.requested_at_ms)
1569 .map_err(|_| "PM vault withdrawal requested_at timestamp exceeds i64".to_string())?;
1570 writes.push(hypercall_db::PmSettlementProjectionWrite::VaultWithdrawal(
1571 hypercall_db::PmVaultWithdrawalProjectionWrite {
1572 withdrawal_id: withdrawal.withdrawal_id,
1573 deposit_id: withdrawal.deposit_id,
1574 depositor: withdrawal.depositor,
1575 underlying: withdrawal.underlying.clone(),
1576 amount_usdc: withdrawal.amount_usdc,
1577 lock_until_ms: withdrawal.lock_until_ms,
1578 status: match withdrawal.status {
1579 crate::rsm::portfolio_margin::settlement_state::PmVaultWithdrawalStatus::Reserved => {
1580 hypercall_db::PmVaultWithdrawalProjectionStatus::Reserved
1581 }
1582 },
1583 projection_seq,
1584 requested_at_ms,
1585 updated_at_ms: projection_seq,
1586 },
1587 ));
1588 }
1589 crate::rsm::portfolio_margin::settlement_state::PmSettlementProjectionEffect::EventUpsert(event) => {
1590 let event_key = format!(
1591 "{}:{}:{}:{}:{}",
1592 event.event_key.wallet,
1593 event.event_key.market_id,
1594 event.event_key.expiry_ts_ms,
1595 event.event_key.margin_mode,
1596 event.event_key.settlement_event_sequence
1597 );
1598 writes.push(hypercall_db::PmSettlementProjectionWrite::Event(
1599 hypercall_db::PmSettlementEventProjectionWrite {
1600 event_key,
1601 wallet: event.event_key.wallet,
1602 underlying: event.underlying.clone(),
1603 event_type: event.status.clone(),
1604 amount_usdc: event.amount_usdc,
1605 request_id: event.request_id,
1606 input_digest: event.input_digest.clone(),
1607 },
1608 ));
1609 }
1610 crate::rsm::portfolio_margin::settlement_state::PmSettlementProjectionEffect::RecoveryPlanUpsert(plan) => {
1611 let updated_at_ms = i64::try_from(plan.updated_at_ms)
1612 .map_err(|_| "PM recovery plan projection timestamp exceeds i64".to_string())?;
1613 let mut actions = Vec::with_capacity(plan.actions.len());
1614 for action in &plan.actions {
1615 let action_updated_at_ms = i64::try_from(action.updated_at_ms)
1616 .map_err(|_| "PM recovery action projection timestamp exceeds i64".to_string())?;
1617 actions.push(hypercall_db::PmRecoveryActionProjectionWrite {
1618 plan_id: plan.plan_id.clone(),
1619 action_id: format!("{}:{}", plan.plan_id, action.action_index),
1620 action_type: action.action_type.clone(),
1621 status: action.status.clone(),
1622 target: action.target.clone(),
1623 attempt: Self::pm_projection_version_i32("attempt", action.attempt)?,
1624 external_id: action.submitted_external_id.clone(),
1625 external_kind: action.external_kind.clone(),
1626 result_external_id: action.result_external_id.clone(),
1627 result: action.result.clone(),
1628 expected_usdc_recovered: action.expected_usdc_recovered,
1629 expected_obligation_reduced: action.expected_obligation_reduced,
1630 expected_impact_usdc: action.expected_impact_usdc,
1631 recovered_usdc: action.recovered_usdc,
1632 liability_reduction_usdc: action.liability_reduction_usdc,
1633 projection_seq: action_updated_at_ms,
1634 updated_at_ms: action_updated_at_ms,
1635 });
1636 }
1637 writes.push(hypercall_db::PmSettlementProjectionWrite::RecoveryPlan(
1638 hypercall_db::PmRecoveryPlanProjectionWrite {
1639 plan_id: plan.plan_id.clone(),
1640 wallet: plan.wallet,
1641 underlying: plan.underlying.clone(),
1642 status: plan.status.clone(),
1643 trigger: plan.trigger.clone(),
1644 reason: plan.reason.clone(),
1645 policy_version: Self::pm_projection_version_i32(
1646 "policy_version",
1647 plan.policy_version,
1648 )?,
1649 recovery_priority_version: Self::pm_projection_version_i32(
1650 "recovery_priority_version",
1651 plan.recovery_priority_version,
1652 )?,
1653 target_reduction_usdc: plan.target_reduction_usdc,
1654 expected_usdc_recovered: plan.expected_usdc_recovered,
1655 expected_obligation_reduced: plan.expected_obligation_reduced,
1656 expected_impact_usdc: plan.expected_impact_usdc,
1657 post_plan_utilization: plan.post_plan_utilization,
1658 projection_seq: updated_at_ms,
1659 updated_at_ms,
1660 actions,
1661 },
1662 ));
1663 }
1664 }
1665 }
1666 Ok(writes)
1667 }
1668
1669 fn pm_projection_version_i32(field: &str, value: u32) -> Result<i32, String> {
1670 i32::try_from(value).map_err(|_| {
1671 format!("PM settlement projection {field} {value} exceeds signed 32-bit integer range")
1672 })
1673 }
1674
1675 pub(crate) fn apply_expiry_effect(
1676 &mut self,
1677 effect: &crate::rsm::apply::ExpiryEffect,
1678 ) -> Result<(), EngineError> {
1679 match effect {
1680 crate::rsm::apply::ExpiryEffect::UpdateInstrumentStatus { symbols, status } => {
1681 let Some(handler) = self.ctx.db.as_ref() else {
1682 panic!(
1683 "CRITICAL_FAILURE: Missing database handler for instrument status update {} on {:?}. \
1684 Engine state has advanced without a durable expiry sink. Restart required.",
1685 status, symbols
1686 );
1687 };
1688 let update_result = if status == "EXPIRED_PENDING_PRICE" {
1689 handler.transition_active_instruments_to_expired_pending_sync(symbols)
1690 } else {
1691 handler.update_instrument_status_sync(symbols, status)
1692 };
1693 if let Err(error) = update_result {
1694 panic!(
1695 "CRITICAL_FAILURE: Failed to update instrument status to {} for {:?}: {}. \
1696 Memory state and database are now inconsistent. Restart required.",
1697 status, symbols, error
1698 );
1699 }
1700 if status == "SETTLED" {
1701 for symbol in symbols {
1702 match crate::shared::ParsedSymbol::from_symbol(symbol) {
1703 Ok(parsed) => {
1704 crate::observability::record_settlement(&parsed.underlying, true);
1705 }
1706 Err(error) => {
1707 warn!(
1708 "Failed to parse settled symbol {} for settlement metric: {}",
1709 symbol, error
1710 );
1711 }
1712 }
1713 }
1714 }
1715 }
1716 crate::rsm::apply::ExpiryEffect::BatchCancelOrdersForSettlement {
1717 order_ids,
1718 now_ms,
1719 } => {
1720 let Some(handler) = self.ctx.db.as_ref() else {
1721 panic!(
1722 "CRITICAL_FAILURE: Missing database handler for {} settlement order cancels at {}. \
1723 Engine state has advanced without a durable expiry sink. Restart required.",
1724 order_ids.len(),
1725 now_ms
1726 );
1727 };
1728 for chunk in order_ids.chunks(5000) {
1729 if let Err(error) =
1730 handler.batch_cancel_orders_for_settlement_sync(chunk, *now_ms as i64)
1731 {
1732 panic!(
1733 "CRITICAL_FAILURE: Failed to persist {} settlement order cancels: {}. \
1734 Memory state and database are now inconsistent. Restart required.",
1735 chunk.len(),
1736 error
1737 );
1738 }
1739 }
1740 }
1741 crate::rsm::apply::ExpiryEffect::CancelOrphanedOrdersBySymbols { symbols } => {
1742 let Some(handler) = self.ctx.db.as_ref() else {
1743 panic!(
1744 "CRITICAL_FAILURE: Missing database handler for orphaned order cleanup on {:?}. \
1745 Engine state has advanced without a durable expiry sink. Restart required.",
1746 symbols
1747 );
1748 };
1749 match handler.cancel_orphaned_orders_by_symbols_sync(symbols) {
1750 Ok(0) => {}
1751 Ok(n) => {
1752 warn!(
1753 "Settlement cleanup: cancelled {} orphaned order_infos rows on {} instruments",
1754 n,
1755 symbols.len()
1756 );
1757 }
1758 Err(error) => {
1759 panic!(
1760 "CRITICAL_FAILURE: Failed settlement orphan cleanup for {:?}: {}. \
1761 Orphaned orders may remain in order_infos. Restart required.",
1762 symbols, error
1763 );
1764 }
1765 }
1766 }
1767 crate::rsm::apply::ExpiryEffect::ApplySettlement(intent) => {
1768 let Some(handler) = self.ctx.db.as_ref() else {
1769 panic!(
1770 "CRITICAL_FAILURE: Missing database handler for durable settlement of {}/{} at {}. \
1771 Engine state has advanced without a settlement sink. Restart required.",
1772 intent.wallet, intent.symbol, intent.event_ts_ms
1773 );
1774 };
1775 let outcome = match handler.try_apply_settlement_sync(
1776 &intent.wallet,
1777 &intent.symbol,
1778 intent.position_size,
1779 intent.settlement_price,
1780 intent.settlement_value,
1781 intent.margin_mode,
1782 intent.event_ts_ms,
1783 intent.settlement_entry_price,
1784 intent.cost_basis,
1785 intent.net_pnl,
1786 ) {
1787 Ok(outcome) => outcome,
1788 Err(error) => {
1789 panic!(
1790 "CRITICAL_FAILURE: Failed to apply durable settlement for {}/{}: {}. \
1791 Engine state has advanced but durable settlement is unknown. Restart required.",
1792 intent.wallet, intent.symbol, error
1793 );
1794 }
1795 };
1796 self.reconcile_settlement_balance(intent, outcome);
1797 }
1798 }
1799
1800 Ok(())
1801 }
1802
1803 pub(crate) fn apply_standby_expiry_effect(
1804 &mut self,
1805 effect: &crate::rsm::apply::ExpiryEffect,
1806 ) -> Result<(), EngineError> {
1807 match effect {
1808 crate::rsm::apply::ExpiryEffect::ApplySettlement(intent) => {
1809 let Some(handler) = self.ctx.db.as_ref() else {
1810 panic!(
1811 "CRITICAL_FAILURE: Missing database handler for replayed settlement of {}/{} at {}. \
1812 Standby memory state has advanced without a durable settlement source.",
1813 intent.wallet, intent.symbol, intent.event_ts_ms
1814 );
1815 };
1816 let outcome = match Self::observe_standby_settlement_with_retry(handler, intent) {
1817 Ok(outcome) => outcome,
1818 Err(error) => {
1819 panic!(
1820 "CRITICAL_FAILURE: Failed to observe durable replayed settlement for {}/{}: {}. \
1821 Standby memory state has advanced but durable settlement is unknown.",
1822 intent.wallet, intent.symbol, error
1823 );
1824 }
1825 };
1826 self.reconcile_settlement_balance(intent, outcome);
1827 }
1828 crate::rsm::apply::ExpiryEffect::UpdateInstrumentStatus { symbols, status } => {
1829 debug!(
1830 status,
1831 symbols = ?symbols,
1832 "Skipping primary-applied instrument status expiry effect during standby replay"
1833 );
1834 }
1835 crate::rsm::apply::ExpiryEffect::BatchCancelOrdersForSettlement {
1836 order_ids,
1837 now_ms,
1838 } => {
1839 debug!(
1840 order_count = order_ids.len(),
1841 now_ms,
1842 "Skipping primary-applied settlement order cancel effect during standby replay"
1843 );
1844 }
1845 crate::rsm::apply::ExpiryEffect::CancelOrphanedOrdersBySymbols { symbols } => {
1846 debug!(
1847 symbols = ?symbols,
1848 "Skipping primary-applied orphan cleanup expiry effect during standby replay"
1849 );
1850 }
1851 }
1852
1853 Ok(())
1854 }
1855
1856 pub(crate) fn apply_replayed_expiry_effects(
1865 &mut self,
1866 effects: &[crate::rsm::apply::ExpiryEffect],
1867 ) -> Result<(), EngineError> {
1868 for effect in effects {
1869 match effect {
1870 crate::rsm::apply::ExpiryEffect::ApplySettlement(intent) => {
1871 let Some(handler) = self.ctx.db.as_ref() else {
1872 panic!(
1873 "CRITICAL_FAILURE: Missing database handler for replayed settlement of {}/{} at {}. \
1874 Recovery cannot persist settlement cash replay.",
1875 intent.wallet, intent.symbol, intent.event_ts_ms
1876 );
1877 };
1878 let outcome = match handler.try_apply_settlement_sync(
1879 &intent.wallet,
1880 &intent.symbol,
1881 intent.position_size,
1882 intent.settlement_price,
1883 intent.settlement_value,
1884 intent.margin_mode,
1885 intent.event_ts_ms,
1886 intent.settlement_entry_price,
1887 intent.cost_basis,
1888 intent.net_pnl,
1889 ) {
1890 Ok(outcome) => outcome,
1891 Err(error) => {
1892 panic!(
1893 "CRITICAL_FAILURE: Failed to apply durable replayed settlement for {}/{}: {}. \
1894 Recovery cannot persist settlement cash replay.",
1895 intent.wallet, intent.symbol, error
1896 );
1897 }
1898 };
1899 debug!(
1900 wallet = %intent.wallet,
1901 symbol = %intent.symbol,
1902 newly_persisted = outcome.newly_persisted,
1903 "Applied replayed TickExpiry settlement effect"
1904 );
1905 }
1906 crate::rsm::apply::ExpiryEffect::UpdateInstrumentStatus { symbols, status } => {
1907 let Some(handler) = self.ctx.db.as_ref() else {
1908 panic!(
1909 "CRITICAL_FAILURE: Missing database handler for replayed instrument status update {} on {:?}. \
1910 Recovery cannot persist expiry status projection.",
1911 status, symbols
1912 );
1913 };
1914 let update_result = if status == "EXPIRED_PENDING_PRICE" {
1915 handler.transition_active_instruments_to_expired_pending_sync(symbols)
1916 } else {
1917 handler.update_instrument_status_sync(symbols, status)
1918 };
1919 if let Err(error) = update_result {
1920 panic!(
1921 "CRITICAL_FAILURE: Failed to persist replayed instrument status {} for {:?}: {}. \
1922 Recovery cannot persist expiry status projection.",
1923 status, symbols, error
1924 );
1925 }
1926 }
1927 crate::rsm::apply::ExpiryEffect::BatchCancelOrdersForSettlement {
1928 order_ids,
1929 now_ms,
1930 } => {
1931 let Some(handler) = self.ctx.db.as_ref() else {
1932 panic!(
1933 "CRITICAL_FAILURE: Missing database handler for replayed settlement order cancels at {}. \
1934 Recovery cannot persist expiry order projection.",
1935 now_ms
1936 );
1937 };
1938 for chunk in order_ids.chunks(5000) {
1939 if let Err(error) =
1940 handler.batch_cancel_orders_for_settlement_sync(chunk, *now_ms as i64)
1941 {
1942 panic!(
1943 "CRITICAL_FAILURE: Failed to persist {} replayed settlement order cancels: {}. \
1944 Recovery cannot persist expiry order projection.",
1945 chunk.len(),
1946 error
1947 );
1948 }
1949 }
1950 }
1951 crate::rsm::apply::ExpiryEffect::CancelOrphanedOrdersBySymbols { symbols } => {
1952 let Some(handler) = self.ctx.db.as_ref() else {
1953 panic!(
1954 "CRITICAL_FAILURE: Missing database handler for replayed orphaned order cleanup on {:?}. \
1955 Recovery cannot persist expiry order projection.",
1956 symbols
1957 );
1958 };
1959 if let Err(error) = handler.cancel_orphaned_orders_by_symbols_sync(symbols) {
1960 panic!(
1961 "CRITICAL_FAILURE: Failed replayed settlement orphan cleanup for {:?}: {}. \
1962 Recovery cannot persist expiry order projection.",
1963 symbols, error
1964 );
1965 }
1966 }
1967 }
1968 }
1969
1970 Ok(())
1971 }
1972
1973 fn observe_standby_settlement_with_retry(
1974 handler: &hypercall_db_diesel::DatabaseHandler,
1975 intent: &crate::rsm::apply::ExpirySettlementIntent,
1976 ) -> Result<hypercall_db::SettlementResult, anyhow::Error> {
1977 let mut backoff_ms = STANDBY_SETTLEMENT_OBSERVE_INITIAL_BACKOFF_MS;
1978 let mut last_error = None;
1979
1980 for attempt in 1..=STANDBY_SETTLEMENT_OBSERVE_ATTEMPTS {
1981 match handler.observe_applied_settlement_sync(
1982 &intent.wallet,
1983 &intent.symbol,
1984 intent.position_size,
1985 intent.settlement_price,
1986 intent.settlement_value,
1987 intent.margin_mode,
1988 intent.settlement_entry_price,
1989 intent.cost_basis,
1990 intent.net_pnl,
1991 ) {
1992 Ok(outcome) => return Ok(outcome),
1993 Err(error) => {
1994 if attempt == STANDBY_SETTLEMENT_OBSERVE_ATTEMPTS {
1995 last_error = Some(error);
1996 break;
1997 }
1998 warn!(
1999 wallet = %intent.wallet,
2000 symbol = %intent.symbol,
2001 attempt,
2002 max_attempts = STANDBY_SETTLEMENT_OBSERVE_ATTEMPTS,
2003 error = %error,
2004 "Standby settlement observation missed durable row; retrying"
2005 );
2006 last_error = Some(error);
2007 std::thread::sleep(Duration::from_millis(backoff_ms));
2008 backoff_ms = (backoff_ms * 2).min(STANDBY_SETTLEMENT_OBSERVE_MAX_BACKOFF_MS);
2009 }
2010 }
2011 }
2012
2013 Err(last_error.expect("standby settlement observation must record a final error"))
2014 }
2015
2016 fn reconcile_settlement_balance(
2017 &mut self,
2018 intent: &crate::rsm::apply::ExpirySettlementIntent,
2019 outcome: hypercall_db::SettlementResult,
2020 ) {
2021 debug!(
2022 wallet = %intent.wallet,
2023 symbol = %intent.symbol,
2024 balance_after = %self.ctx.balance_ledger.balance(&intent.wallet),
2025 newly_persisted = outcome.newly_persisted,
2026 "Observed durable TickExpiry settlement outcome without mutating engine cash"
2027 );
2028 }
2029
2030 async fn flush_journal(&self) {
2032 if let Some(ref sender) = self.journal_batch_sender {
2033 let (ack_tx, ack_rx) = tokio::sync::oneshot::channel();
2034 if let Err(e) = sender
2035 .send(crate::journal::JournalMessage::Flush(ack_tx))
2036 .await
2037 {
2038 error!("Failed to send flush to journal batcher: {}", e);
2039 return;
2040 }
2041 match ack_rx.await {
2042 Ok(()) => {
2043 debug!("Journal batcher flush completed");
2044 }
2045 Err(_) => {
2046 error!("Journal batcher flush ack channel dropped");
2047 }
2048 }
2049 }
2050 }
2051
2052 async fn hydrate_runtime_state_from_dependencies(
2053 &mut self,
2054 context: &'static str,
2055 hydrate_market_data: bool,
2056 ) {
2057 if let Some(ref portfolio_service) = self.ctx.deps.portfolio_service {
2060 let all = portfolio_service.all_portfolios().await;
2061 let previous_count = self.ctx.engine_positions.len();
2062 self.ctx.engine_positions.clear();
2063
2064 let mut count = 0usize;
2065 for (wallet, balance) in all {
2066 for (symbol, pos) in balance.positions {
2067 if pos.amount != rust_decimal::Decimal::ZERO {
2068 self.ctx.engine_positions.insert(
2069 (wallet, symbol),
2070 crate::rsm::engine_deps::EnginePosition {
2071 quantity: pos.amount,
2072 entry_price: pos.entry_price,
2073 },
2074 );
2075 count += 1;
2076 }
2077 }
2078 }
2079 if count > 0 || previous_count > 0 {
2080 info!(
2081 context,
2082 previous_count, count, "Reconciled engine positions from PortfolioService"
2083 );
2084 }
2085 }
2086
2087 if hydrate_market_data {
2088 self.ingest_price_updates_without_side_effects().await;
2089 self.ingest_iv_updates_without_side_effects().await;
2090 }
2091 }
2092
2093 pub(crate) async fn hydrate_standby_base_state(&mut self) {
2094 self.hydrate_runtime_state_from_dependencies("standby_startup", !self.runtime_quiesced)
2095 .await;
2096 self.publish_snapshot();
2097 info!(
2098 positions = self.ctx.engine_positions.len(),
2099 cash_wallets = self.ctx.balance_ledger.len(),
2100 spot_prices = self.ctx.spot_prices.len(),
2101 iv_surfaces = self.ctx.iv_surfaces.len(),
2102 "Hydrated standby base engine state"
2103 );
2104 }
2105
2106 pub(crate) async fn hydrate_primary_base_state(&mut self) {
2107 self.hydrate_runtime_state_from_dependencies("startup", false)
2108 .await;
2109 }
2110
2111 pub(crate) async fn apply_startup_replayed_events(&mut self) {
2112 if self.startup_replayed_events.is_empty() {
2113 return;
2114 }
2115
2116 let batches = std::mem::take(&mut self.startup_replayed_events);
2117 let event_count = batches
2118 .iter()
2119 .map(|(_, events)| events.len())
2120 .sum::<usize>();
2121 for (req_id, events) in batches {
2122 self.apply_startup_replayed_events_sync(&events, &req_id)
2123 .await;
2124 }
2125 info!(
2126 event_count,
2127 "Applied startup-replayed TickExpiry event projections"
2128 );
2129 }
2130
2131 pub(super) fn persist_engine_state_snapshot(&self) -> Option<i64> {
2137 let wal_path_configured = hypercall_journal::checkpoint::wal_path_is_explicitly_configured(
2138 self.wal_path.as_ref(),
2139 );
2140 if !wal_path_configured || self.ctx.db.is_none() {
2141 return None;
2142 }
2143
2144 use crate::rsm::engine_state_snapshot::{snapshot_path_from_wal_path, write_snapshot};
2145 use crate::rsm::restart_components::{
2146 EngineRecoveryCapture, PersistentEngineStateComponent,
2147 };
2148 use hypercall_journal::checkpoint::{
2149 checkpoint_path_for, read_checkpoint, wal_path_from_config,
2150 };
2151 use hypercall_recovery::RestartStateComponent;
2152
2153 let wal_path = wal_path_from_config(self.wal_path.as_ref());
2154 let checkpoint_path = checkpoint_path_for(&wal_path);
2155 match read_checkpoint(&checkpoint_path) {
2156 Ok(checkpoint) if checkpoint.last_command_id > 0 => {
2157 let snapshot_path = snapshot_path_from_wal_path(&wal_path);
2158 let snapshot = PersistentEngineStateComponent::capture(
2159 &EngineRecoveryCapture::for_snapshot(&self.ctx, &checkpoint),
2160 );
2161 if let Err(e) = write_snapshot(&snapshot_path, &snapshot) {
2162 warn!("Failed to write engine state snapshot: {}", e);
2163 None
2164 } else {
2165 if let Some(ref handler) = self.ctx.db {
2166 if let Err(e) =
2167 handler.update_snapshot_boundary_sync(checkpoint.last_command_id)
2168 {
2169 warn!("Failed to persist snapshot boundary to Postgres: {}", e);
2170 return None;
2171 }
2172 }
2173 info!(
2174 "Persisted engine state snapshot: last_command_id={}",
2175 checkpoint.last_command_id
2176 );
2177 Some(checkpoint.last_command_id)
2178 }
2179 }
2180 Ok(_) => {
2181 None
2183 }
2184 Err(e) => {
2185 panic!(
2186 "CRITICAL_FAILURE: failed to read checkpoint for snapshot write ({}): {}",
2187 checkpoint_path.display(),
2188 e
2189 );
2190 }
2191 }
2192 }
2193
2194 fn current_wal_checkpoint_command_id(&self) -> i64 {
2195 let wal_path_configured = hypercall_journal::checkpoint::wal_path_is_explicitly_configured(
2196 self.wal_path.as_ref(),
2197 );
2198 if !wal_path_configured {
2199 return self.replay_checkpoint.last_command_id;
2200 }
2201
2202 let wal_path = hypercall_journal::checkpoint::wal_path_from_config(self.wal_path.as_ref());
2203 let checkpoint_path = hypercall_journal::checkpoint::checkpoint_path_for(&wal_path);
2204 match hypercall_journal::checkpoint::read_checkpoint(&checkpoint_path) {
2205 Ok(checkpoint) => checkpoint.last_command_id,
2206 Err(error) => {
2207 warn!(
2208 path = %checkpoint_path.display(),
2209 error = %error,
2210 "Failed to read current WAL checkpoint for quiesce report"
2211 );
2212 self.replay_checkpoint.last_command_id
2213 }
2214 }
2215 }
2216
2217 async fn apply_runtime_balance_update(
2218 &mut self,
2219 env: crate::rsm::apply::CommandEnvelope,
2220 applied_tx: Option<tokio::sync::oneshot::Sender<Result<(), String>>>,
2221 wallet: WalletAddress,
2222 amount: Decimal,
2223 update_kind: &'static str,
2224 last_read_snapshot: &mut Instant,
2225 journal_request_id: String,
2226 outbox_appends: Vec<crate::directive_outbox::DirectiveOutboxAppend>,
2227 ) {
2228 Self::require_external_durable_mutation_uuid(update_kind, &journal_request_id);
2229 match self.apply(env.clone()) {
2230 Ok(output) => {
2231 let output_is_empty = output.is_empty();
2232 let balance_updates = output.balance_updates;
2233 if balance_updates.is_empty()
2234 && output_is_empty
2235 && Self::is_idempotent_empty_balance_update(update_kind)
2236 {
2237 debug!(
2238 wallet = %wallet,
2239 amount = %amount,
2240 update_kind = update_kind,
2241 "Observed duplicate balance update with no new balance effects"
2242 );
2243 if let Some(tx) = applied_tx {
2244 let _ = tx.send(Ok(()));
2245 }
2246 return;
2247 }
2248 assert!(
2249 !balance_updates.is_empty(),
2250 "RUNTIME_INVARIANT: {} applied without canonical balance_updates",
2251 update_kind
2252 );
2253 self.journal_external_balance_update_command(
2254 &env,
2255 &journal_request_id,
2256 outbox_appends,
2257 balance_updates.clone(),
2258 )
2259 .await;
2260 self.publish_to_nats(&env).await;
2261 self.publish_balance_updates_to_nats(&balance_updates).await;
2262 self.publish_snapshot();
2263 *last_read_snapshot = Instant::now();
2264 if let Some(tx) = applied_tx {
2265 let _ = tx.send(Ok(()));
2266 }
2267 }
2268 Err(e) => {
2269 let err = e.to_string();
2270 if let Some(tx) = applied_tx {
2271 let _ = tx.send(Err(err.clone()));
2272 }
2273 panic!(
2274 "RUNTIME_INVARIANT: failed to apply {} balance_ledger update for {} amount {} before publish: {}",
2275 update_kind, wallet, amount, err
2276 );
2277 }
2278 }
2279 }
2280
2281 fn is_idempotent_empty_balance_update(update_kind: &str) -> bool {
2282 matches!(update_kind, "DepositUpdate" | "LiquidationBonusUpdate")
2283 }
2284
2285 pub async fn start(mut self) {
2287 info!("Unified engine starting with SPAN calculations...");
2288
2289 self.apply_startup_replayed_events().await;
2290 self.hydrate_primary_base_state().await;
2291
2292 if let Some(ref mmp_cache) = self.ctx.deps.mmp_cache {
2294 mmp_cache.start();
2295 info!("MMP cache automatic eviction started");
2296 }
2297
2298 let mut expiry_interval = tokio::time::interval(Duration::from_secs(60));
2300
2301 let mut price_update_interval = tokio::time::interval(Duration::from_secs(2));
2303
2304 if !self.runtime_quiesced {
2310 self.ingest_price_updates().await;
2311 self.ingest_iv_updates().await;
2312 } else {
2313 info!("Unified engine starting quiesced; startup market ingestion is paused");
2314 }
2315
2316 if !self.runtime_quiesced {
2321 let now_ms = get_timestamp_millis();
2322 let env = match self.prepare_tick_expiry_env(now_ms).await {
2323 Ok(env) => env,
2324 Err(e) => {
2325 panic!(
2326 "CRITICAL_FAILURE: Startup expiry preparation failed: {}. \
2327 Expiry settlement cannot continue without explicit context.",
2328 e
2329 );
2330 }
2331 };
2332 let journal_env = env.clone();
2333 match self.apply(env) {
2334 Ok(output) => {
2335 self.journal_replay_owned_expiry_command(&journal_env, &output.balance_updates)
2336 .await;
2337 if let Err(e) = self
2338 .apply_expiry_effects_and_events(&output, "startup-tick-expiry")
2339 .await
2340 {
2341 panic!(
2342 "CRITICAL_FAILURE: Startup expiry effects failed: {}. \
2343 Expiry runtime effects could not be applied after memory state advanced.",
2344 e
2345 );
2346 }
2347 self.publish_tick_expiry_balance_updates(&output, "startup-tick-expiry")
2348 .await;
2349 info!(
2350 "Startup expiry check complete: {} events",
2351 output.events.len()
2352 );
2353 }
2354 Err(e) => {
2355 panic!(
2356 "CRITICAL_FAILURE: apply() failed for startup TickExpiry: {}. \
2357 Expiry state could not be advanced deterministically.",
2358 e
2359 );
2360 }
2361 }
2362 self.publish_snapshot();
2363 } else {
2364 info!("Unified engine starting quiesced; startup expiry check is paused");
2365 }
2366
2367 let mut snapshot_interval = tokio::time::interval(self.snapshot_interval);
2369 let post_startup_reconcile = tokio::time::sleep(self.post_startup_reconcile_delay);
2370 tokio::pin!(post_startup_reconcile);
2371
2372 let mut last_read_snapshot = Instant::now();
2373
2374 let mut loop_start;
2376 let mut market_start;
2377 let mut expiry_start;
2378 let mut snapshot_start;
2379
2380 let loop_iteration_hist = histogram!("ht_engine_loop_iteration_seconds");
2382 let market_request_hist = histogram!("ht_engine_market_request_seconds");
2383 let expiry_check_hist = histogram!("ht_engine_expiry_check_seconds");
2384 let snapshot_hist = histogram!("ht_engine_snapshot_seconds");
2385
2386 loop {
2387 loop_start = Instant::now();
2388 select! {
2389 Some(request) = async {
2390 match self.quiesce_receiver.as_mut() {
2391 Some(rx) => rx.recv().await,
2392 None => std::future::pending().await,
2393 }
2394 } => {
2395 self.handle_quiesce_request(request).await;
2396 }
2397
2398 Some(request) = self.order_receiver.recv(), if !self.runtime_quiesced => {
2400 DurableJournaling::process_order_journaled(&mut self, request).await;
2401 if last_read_snapshot.elapsed() >= self.read_snapshot_interval {
2402 self.publish_snapshot();
2403 last_read_snapshot = Instant::now();
2404 }
2405 }
2406
2407 Some(request) = async {
2409 match self.rfq_receiver.as_mut() {
2410 Some(rx) => rx.recv().await,
2411 None => std::future::pending().await,
2412 }
2413 }, if !self.runtime_quiesced => {
2414 self.handle_rfq_execute(request).await;
2415 if last_read_snapshot.elapsed() >= self.read_snapshot_interval {
2416 self.publish_snapshot();
2417 last_read_snapshot = Instant::now();
2418 }
2419 }
2420
2421 Some(deposit) = async {
2423 match self.deposit_receiver.as_mut() {
2424 Some(rx) => rx.recv().await,
2425 None => std::future::pending().await,
2426 }
2427 } => {
2428 if self.runtime_quiesced {
2429 if let Some(tx) = deposit.applied_tx {
2430 let _ = tx.send(Err("engine is quiesced".to_string()));
2431 }
2432 continue;
2433 }
2434 let wallet = deposit.wallet;
2435 let amount = deposit.amount;
2436 let source_event_hash = deposit.source_event_hash;
2437 let journal_request_id = deposit.journal_request_id;
2438 let outbox_appends = deposit.outbox_appends;
2439 let applied_tx = deposit.applied_tx;
2440 let env = crate::rsm::apply::CommandEnvelope::new(
2441 deposit.timestamp_ms,
2442 crate::rsm::apply::EngineCommand::DepositUpdate {
2443 wallet,
2444 amount,
2445 timestamp_ms: deposit.timestamp_ms,
2446 sequence: deposit.sequence,
2447 source_event_hash,
2448 },
2449 );
2450 self.apply_runtime_balance_update(
2451 env,
2452 applied_tx,
2453 wallet,
2454 amount,
2455 "DepositUpdate",
2456 &mut last_read_snapshot,
2457 journal_request_id,
2458 outbox_appends,
2459 )
2460 .await;
2461 }
2462
2463 Some(deposit) = async {
2465 match self.option_deposit_receiver.as_mut() {
2466 Some(rx) => rx.recv().await,
2467 None => std::future::pending().await,
2468 }
2469 } => {
2470 if self.runtime_quiesced {
2471 if let Some(tx) = deposit.applied_tx {
2472 let _ = tx.send(Err("engine is quiesced".to_string()));
2473 }
2474 continue;
2475 }
2476 let wallet = deposit.wallet;
2477 let symbol = deposit.symbol;
2478 let quantity = deposit.quantity;
2479 let request_id = deposit.request_id;
2480 let timestamp_ms = deposit.timestamp_ms;
2481 let applied_tx = deposit.applied_tx;
2482 if self
2483 .external_option_command_already_journaled(
2484 &request_id,
2485 "OptionDepositUpdate",
2486 )
2487 .await
2488 {
2489 if let Some(tx) = applied_tx {
2490 let _ = tx.send(Ok(()));
2491 }
2492 continue;
2493 }
2494 let env = crate::rsm::apply::CommandEnvelope::new(
2495 timestamp_ms,
2496 crate::rsm::apply::EngineCommand::OptionDepositUpdate {
2497 request_id: request_id.clone(),
2498 wallet,
2499 symbol: symbol.clone(),
2500 quantity,
2501 timestamp_ms,
2502 },
2503 );
2504 match self.apply(env.clone()) {
2505 Ok(_) => {
2506 self
2507 .journal_external_option_position_command(
2508 &env,
2509 &request_id,
2510 hypercall_db_diesel::engine_enums::CommandType::OptionDepositUpdate,
2511 Vec::new(),
2512 )
2513 .await;
2514 if let Some(portfolio_cache) = &self.ctx.deps.portfolio_cache {
2515 portfolio_cache
2516 .handle_option_custody_delta(
2517 wallet,
2518 symbol.clone(),
2519 quantity,
2520 None,
2521 )
2522 .await;
2523 }
2524 self.publish_to_nats(&env).await;
2525 self.publish_snapshot();
2526 last_read_snapshot = Instant::now();
2527 if let Some(tx) = applied_tx {
2528 let _ = tx.send(Ok(()));
2529 }
2530 }
2531 Err(e) => {
2532 let err = e.to_string();
2533 if let Some(tx) = applied_tx {
2534 let _ = tx.send(Err(err.clone()));
2535 }
2536 panic!(
2537 "RUNTIME_INVARIANT: failed to apply OptionDepositUpdate for {} {} quantity {} before publish: {}",
2538 wallet, symbol, quantity, err
2539 );
2540 }
2541 }
2542 }
2543
2544 Some(withdrawal) = async {
2545 match self.option_withdrawal_receiver.as_mut() {
2546 Some(rx) => rx.recv().await,
2547 None => std::future::pending().await,
2548 }
2549 } => {
2550 if self.runtime_quiesced {
2551 if let Some(tx) = withdrawal.applied_tx {
2552 let _ = tx.send(Err("engine is quiesced".to_string()));
2553 }
2554 continue;
2555 }
2556 let wallet = withdrawal.wallet;
2557 let account = withdrawal.account;
2558 let signer = withdrawal.signer;
2559 let rsm_signer = withdrawal.rsm_signer;
2560 let symbol = withdrawal.symbol;
2561 let quantity = withdrawal.quantity;
2562 let nonce = withdrawal.nonce;
2563 let action = withdrawal.action;
2564 let request_id = withdrawal.request_id;
2565 let timestamp_ms = withdrawal.timestamp_ms;
2566 let applied_tx = withdrawal.applied_tx;
2567 if self
2568 .external_option_command_already_journaled(
2569 &request_id,
2570 "OptionWithdrawalUpdate",
2571 )
2572 .await
2573 {
2574 let persisted_status = self
2575 .persisted_withdrawal_directive_status(&request_id)
2576 .unwrap_or_else(|error| {
2577 panic!(
2578 "JOURNAL_FATAL: failed to load persisted OptionWithdrawalUpdate status for {}: {}",
2579 request_id, error
2580 )
2581 });
2582 let (domain_status, delivery_status) = persisted_status.unwrap_or((
2583 crate::directive_outbox::DirectiveDomainStatus::PendingChainEffect,
2584 crate::directive_outbox::DirectiveDeliveryStatus::Pending,
2585 ));
2586 let receipt = crate::rsm::unified_engine::OptionWithdrawalApplyReceipt {
2587 directive_id: request_id.clone(),
2588 domain_status,
2589 delivery_status,
2590 };
2591 if let Some(tx) = applied_tx {
2592 let _ = tx.send(Ok(receipt));
2593 }
2594 continue;
2595 }
2596 if let Some(err) = self.withdrawal_directive_journal_unavailable_error(
2597 "OptionWithdrawalUpdate",
2598 &request_id,
2599 ) {
2600 warn!("{}", err);
2601 if let Some(tx) = applied_tx {
2602 let _ = tx.send(Err(err));
2603 }
2604 continue;
2605 }
2606 let env = crate::rsm::apply::CommandEnvelope::new(
2607 timestamp_ms,
2608 crate::rsm::apply::EngineCommand::OptionWithdrawalUpdate {
2609 request_id: request_id.clone(),
2610 wallet,
2611 account,
2612 signer,
2613 rsm_signer,
2614 symbol: symbol.clone(),
2615 quantity,
2616 nonce: Some(nonce),
2617 action,
2618 timestamp_ms,
2619 },
2620 );
2621 match self.apply(env.clone()) {
2622 Ok(output) => {
2623 let outbox_appends = output.outbox_appends;
2624 let receipt = outbox_appends
2625 .first()
2626 .map(|append| {
2627 crate::rsm::unified_engine::OptionWithdrawalApplyReceipt {
2628 directive_id: append.directive_id.clone(),
2629 domain_status: append.domain_status.clone(),
2630 delivery_status: append.delivery_status.clone(),
2631 }
2632 })
2633 .unwrap_or_else(|| {
2634 crate::rsm::unified_engine::OptionWithdrawalApplyReceipt {
2635 directive_id: request_id.clone(),
2636 domain_status: crate::directive_outbox::DirectiveDomainStatus::PendingChainEffect,
2637 delivery_status: crate::directive_outbox::DirectiveDeliveryStatus::Pending,
2638 }
2639 });
2640 self
2641 .journal_external_option_position_command(
2642 &env,
2643 &request_id,
2644 hypercall_db_diesel::engine_enums::CommandType::OptionWithdrawalUpdate,
2645 outbox_appends,
2646 )
2647 .await;
2648 if let Some(portfolio_cache) = &self.ctx.deps.portfolio_cache {
2649 portfolio_cache
2650 .handle_option_custody_delta(
2651 wallet,
2652 symbol.clone(),
2653 -quantity,
2654 None,
2655 )
2656 .await;
2657 }
2658 self.publish_to_nats(&env).await;
2659 self.publish_snapshot();
2660 last_read_snapshot = Instant::now();
2661 if let Some(tx) = applied_tx {
2662 let _ = tx.send(Ok(receipt));
2663 }
2664 }
2665 Err(e) => {
2666 let err = e.to_string();
2667 if let Some(tx) = applied_tx {
2668 let _ = tx.send(Err(err.clone()));
2669 }
2670 }
2671 }
2672 }
2673
2674 Some(withdrawal) = async {
2676 match self.cash_withdrawal_receiver.as_mut() {
2677 Some(rx) => rx.recv().await,
2678 None => std::future::pending().await,
2679 }
2680 } => {
2681 if self.runtime_quiesced {
2682 if let Some(tx) = withdrawal.applied_tx {
2683 let _ = tx.send(Err("engine is quiesced".to_string()));
2684 }
2685 continue;
2686 }
2687 let wallet = withdrawal.wallet;
2688 let account = withdrawal.account;
2689 let destination = withdrawal.destination;
2690 let signer = withdrawal.signer;
2691 let rsm_signer = withdrawal.rsm_signer;
2692 let amount = withdrawal.amount;
2693 let amount_wei = withdrawal.amount_wei;
2694 let nonce = withdrawal.nonce;
2695 let request_id = withdrawal.request_id;
2696 let timestamp_ms = withdrawal.timestamp_ms;
2697 let applied_tx = withdrawal.applied_tx;
2698 if self
2699 .external_option_command_already_journaled(
2700 &request_id,
2701 "CashWithdrawalUpdate",
2702 )
2703 .await
2704 {
2705 let persisted_status = self
2706 .persisted_withdrawal_directive_status(&request_id)
2707 .unwrap_or_else(|error| {
2708 panic!(
2709 "JOURNAL_FATAL: failed to load persisted CashWithdrawalUpdate status for {}: {}",
2710 request_id, error
2711 )
2712 });
2713 let (domain_status, delivery_status) = persisted_status.unwrap_or((
2714 crate::directive_outbox::DirectiveDomainStatus::PendingChainEffect,
2715 crate::directive_outbox::DirectiveDeliveryStatus::Pending,
2716 ));
2717 let receipt = crate::rsm::unified_engine::CashWithdrawalApplyReceipt {
2718 directive_id: request_id.clone(),
2719 domain_status,
2720 delivery_status,
2721 balance_after: self.ctx.balance_ledger.balance(&wallet),
2722 };
2723 if let Some(tx) = applied_tx {
2724 let _ = tx.send(Ok(receipt));
2725 }
2726 continue;
2727 }
2728 if let Some(err) = self.withdrawal_directive_journal_unavailable_error(
2729 "CashWithdrawalUpdate",
2730 &request_id,
2731 ) {
2732 warn!("{}", err);
2733 if let Some(tx) = applied_tx {
2734 let _ = tx.send(Err(err));
2735 }
2736 continue;
2737 }
2738 let env = crate::rsm::apply::CommandEnvelope::new(
2739 timestamp_ms,
2740 crate::rsm::apply::EngineCommand::CashWithdrawalUpdate {
2741 request_id: request_id.clone(),
2742 wallet,
2743 account,
2744 destination,
2745 signer,
2746 rsm_signer,
2747 amount,
2748 amount_wei,
2749 nonce: Some(nonce),
2750 timestamp_ms,
2751 },
2752 );
2753 match self.apply(env.clone()) {
2754 Ok(output) => {
2755 let balance_after = self.ctx.balance_ledger.balance(&wallet);
2756 let outbox_appends = output.outbox_appends;
2757 let balance_updates = output.balance_updates;
2758 assert!(
2759 !balance_updates.is_empty(),
2760 "RUNTIME_INVARIANT: CashWithdrawalUpdate applied without canonical balance_updates"
2761 );
2762 let receipt = outbox_appends
2763 .first()
2764 .map(|append| {
2765 crate::rsm::unified_engine::CashWithdrawalApplyReceipt {
2766 directive_id: append.directive_id.clone(),
2767 domain_status: append.domain_status.clone(),
2768 delivery_status: append.delivery_status.clone(),
2769 balance_after,
2770 }
2771 })
2772 .unwrap_or_else(|| {
2773 crate::rsm::unified_engine::CashWithdrawalApplyReceipt {
2774 directive_id: request_id.clone(),
2775 domain_status: crate::directive_outbox::DirectiveDomainStatus::PendingChainEffect,
2776 delivery_status: crate::directive_outbox::DirectiveDeliveryStatus::Pending,
2777 balance_after,
2778 }
2779 });
2780 self.journal_external_cash_withdrawal_command(
2781 &env,
2782 &request_id,
2783 outbox_appends,
2784 wallet,
2785 amount,
2786 balance_after,
2787 timestamp_ms,
2788 balance_updates.clone(),
2789 )
2790 .await;
2791 self.publish_to_nats(&env).await;
2792 self.publish_balance_updates_to_nats(&balance_updates).await;
2793 self.publish_snapshot();
2794 last_read_snapshot = Instant::now();
2795 if let Some(tx) = applied_tx {
2796 let _ = tx.send(Ok(receipt));
2797 }
2798 }
2799 Err(e) => {
2800 let err = e.to_string();
2801 if let Some(tx) = applied_tx {
2802 let _ = tx.send(Err(err));
2803 }
2804 }
2805 }
2806 }
2807
2808 Some(bonus) = async {
2810 match self.liquidation_bonus_receiver.as_mut() {
2811 Some(rx) => rx.recv().await,
2812 None => std::future::pending().await,
2813 }
2814 } => {
2815 if self.runtime_quiesced {
2816 if let Some(tx) = bonus.applied_tx {
2817 let _ = tx.send(Err("engine is quiesced".to_string()));
2818 }
2819 continue;
2820 }
2821 let wallet = bonus.wallet;
2822 let amount = bonus.amount;
2823 let balance_after = self.ctx.balance_ledger.balance(&wallet) + amount;
2824 let request_id = bonus.request_id;
2825 let applied_tx = bonus.applied_tx;
2826 let env = crate::rsm::apply::CommandEnvelope::new(
2827 bonus.timestamp_ms,
2828 crate::rsm::apply::EngineCommand::LiquidationBonusUpdate {
2829 wallet,
2830 amount,
2831 balance_after,
2832 timestamp_ms: bonus.timestamp_ms,
2833 sequence: bonus.sequence,
2834 },
2835 );
2836 self.apply_runtime_balance_update(
2837 env,
2838 applied_tx,
2839 wallet,
2840 amount,
2841 "LiquidationBonusUpdate",
2842 &mut last_read_snapshot,
2843 request_id,
2844 Vec::new(),
2845 ).await;
2846 }
2847
2848 Some(update) = async {
2851 match self.margin_mode_receiver.as_mut() {
2852 Some(rx) => rx.recv().await,
2853 None => std::future::pending().await,
2854 }
2855 } => {
2856 if self.runtime_quiesced {
2857 let _ = update.applied_tx.send(Err("engine is quiesced".to_string()));
2858 continue;
2859 }
2860 let wallet = update.wallet;
2861 let margin_mode = update.margin_mode;
2862 let applied_tx = update.applied_tx;
2863 let env = crate::rsm::apply::CommandEnvelope::new(
2864 update.timestamp_ms,
2865 crate::rsm::apply::EngineCommand::LegacyTierMarginModeUpdate {
2866 wallet,
2867 margin_mode,
2868 },
2869 );
2870 let nats_env = env.clone();
2871 match self.apply(env) {
2872 Ok(output) => {
2873 self.publish_to_nats(&nats_env).await;
2874 for event in &output.events {
2875 self.ctx.deps.emit_event(event);
2876 }
2877 let _ = applied_tx.send(Ok(()));
2878 }
2879 Err(e) => {
2880 let err = e.to_string();
2881 warn!(
2882 wallet = %wallet,
2883 margin_mode = ?margin_mode,
2884 error = %err,
2885 "Failed to apply TierUpdate"
2886 );
2887 let _ = applied_tx.send(Err(err));
2888 }
2889 }
2890 }
2891
2892 Some(request) = async {
2894 match self.agent_auth_receiver.as_mut() {
2895 Some(rx) => rx.recv().await,
2896 None => std::future::pending().await,
2897 }
2898 } => {
2899 if self.runtime_quiesced {
2900 let _ = request.applied_tx.send(Err("engine is quiesced".to_string()));
2901 continue;
2902 }
2903 let wallet = request.wallet;
2904 let agent = request.agent;
2905 let applied_tx = request.applied_tx;
2906 let expires_at_ms = request.expires_at_ms;
2907 let nonce = request.nonce;
2908 let ts = get_timestamp_millis();
2909 let command = if request.approve {
2910 crate::rsm::apply::EngineCommand::ApproveAgent { wallet, agent, expires_at_ms, nonce, timestamp_ms: ts }
2911 } else {
2912 crate::rsm::apply::EngineCommand::RevokeAgent { wallet, agent, nonce, timestamp_ms: ts }
2913 };
2914 let env = crate::rsm::apply::CommandEnvelope::new(
2915 ts,
2916 command,
2917 );
2918 if !self.agent_auth_journal_available() {
2919 let _ = applied_tx.send(Err(
2920 "agent authorization requires engine journaling to be enabled"
2921 .to_string(),
2922 ));
2923 continue;
2924 }
2925 let nats_env = env.clone();
2926 match self.apply(env) {
2927 Ok(_) => {
2928 self.journal_agent_auth_command(&nats_env).await;
2929 self.publish_to_nats(&nats_env).await;
2930 self.publish_snapshot();
2931 last_read_snapshot = Instant::now();
2932 let _ = applied_tx.send(Ok(()));
2933 }
2934 Err(e) => {
2935 let _ = applied_tx.send(Err(e.to_string()));
2936 }
2937 }
2938 }
2939
2940 Some(request) = async {
2942 match self.nonce_check_receiver.as_mut() {
2943 Some(rx) => rx.recv().await,
2944 None => std::future::pending().await,
2945 }
2946 } => {
2947 if self.runtime_quiesced {
2948 let _ = request.applied_tx.send(Err("engine is quiesced".to_string()));
2949 continue;
2950 }
2951 let wallet = request.wallet;
2952 let nonce = request.nonce;
2953 let applied_tx = request.applied_tx;
2954 let ts = get_timestamp_millis();
2955 let env = crate::rsm::apply::CommandEnvelope::new(
2956 ts,
2957 crate::rsm::apply::EngineCommand::NonceAdvance { wallet, nonce, timestamp_ms: ts },
2958 );
2959 let nats_env = env.clone();
2960 match self.apply(env) {
2961 Ok(_) => {
2962 self.publish_to_nats(&nats_env).await;
2963 let _ = applied_tx.send(Ok(()));
2964 }
2965 Err(e) => {
2966 let _ = applied_tx.send(Err(e.to_string()));
2967 }
2968 }
2969 }
2970
2971 Some(update) = async {
2973 match self.tier_update_receiver.as_mut() {
2974 Some(rx) => rx.recv().await,
2975 None => std::future::pending().await,
2976 }
2977 } => {
2978 if self.runtime_quiesced {
2979 if let Some(tx) = update.applied_tx {
2980 let _ = tx.send(Err("engine is quiesced".to_string()));
2981 }
2982 continue;
2983 }
2984 let wallet = update.wallet;
2985 let tier = update.tier;
2986 let trading_limits = update.trading_limits;
2987 let margin_mode = update.margin_mode;
2988 let applied_tx = update.applied_tx;
2989 let env = crate::rsm::apply::CommandEnvelope::new(
2990 update.timestamp_ms,
2991 crate::rsm::apply::EngineCommand::TierUpdate {
2992 wallet,
2993 margin_mode,
2994 tier,
2995 trading_limits,
2996 },
2997 );
2998 match self.apply(env.clone()) {
2999 Ok(_) => {
3000 self.publish_to_nats(&env).await;
3001 if let Some(tx) = applied_tx {
3002 let _ = tx.send(Ok(()));
3003 }
3004 }
3005 Err(e) => {
3006 let err = e.to_string();
3007 warn!(
3008 wallet = %wallet,
3009 error = %err,
3010 "Failed to apply TierUpdate"
3011 );
3012 if let Some(tx) = applied_tx {
3013 let _ = tx.send(Err(err));
3014 }
3015 }
3016 }
3017 }
3018
3019 Some(request) = async {
3021 match self.pm_settlement_admin_receiver.as_mut() {
3022 Some(rx) => rx.recv().await,
3023 None => std::future::pending().await,
3024 }
3025 } => {
3026 if self.runtime_quiesced {
3027 let _ = request.applied_tx.send(Err("engine is quiesced".to_string()));
3028 continue;
3029 }
3030 let timestamp_ms = get_timestamp_millis();
3031 let command =
3032 Self::stamp_pm_settlement_admin_command(request.command, timestamp_ms);
3033 let request_uuid = Self::pm_settlement_request_id(&command)
3034 .unwrap_or_else(|| {
3035 panic!(
3036 "RUNTIME_INVARIANT: PM settlement admin channel received unsupported command {}",
3037 command.command_type()
3038 )
3039 });
3040 let request_id = request_uuid.to_string();
3041 let env = crate::rsm::apply::CommandEnvelope::new(timestamp_ms, command);
3042 if matches!(
3043 &env.command,
3044 crate::rsm::apply::EngineCommand::RecordPmVaultDeposit(_)
3045 | crate::rsm::apply::EngineCommand::RequestPmVaultWithdrawal(_)
3046 ) && self
3047 .external_option_command_already_journaled(
3048 &request_id,
3049 env.command.command_type(),
3050 )
3051 .await
3052 {
3053 let _ = request.applied_tx.send(Ok(()));
3054 continue;
3055 }
3056 if let Err(error) = self.preflight_pm_settlement_admin_command(&env.command) {
3057 let _ = request.applied_tx.send(Err(error));
3058 continue;
3059 }
3060 self.journal_pm_settlement_admin_command(&env, request_uuid).await;
3061 let output = self.apply(env.clone()).unwrap_or_else(|error| {
3062 panic!(
3063 "RUNTIME_INVARIANT: PM settlement command {} request_id {} passed preflight but apply rejected: {}",
3064 env.command.command_type(),
3065 request_id,
3066 error
3067 )
3068 });
3069 if let Err(error) = self.apply_pm_settlement_projection_effects_sync(
3070 &output.pm_settlement_effects,
3071 &request_id,
3072 ) {
3073 panic!(
3074 "CRITICAL_FAILURE: failed to apply PM settlement projection effects for request_id {}: {}",
3075 request_id, error
3076 );
3077 }
3078 self.publish_to_nats(&env).await;
3079 self.publish_snapshot();
3080 last_read_snapshot = Instant::now();
3081 let _ = request.applied_tx.send(Ok(()));
3082 }
3083
3084 Some(update) = async {
3086 match self.hypercore_equity_receiver.as_mut() {
3087 Some(rx) => rx.recv().await,
3088 None => std::future::pending().await,
3089 }
3090 }, if !self.runtime_quiesced => {
3091 let env = crate::rsm::apply::CommandEnvelope::new(
3092 update.timestamp_ms,
3093 crate::rsm::apply::EngineCommand::HypercoreEquityUpdate {
3094 wallet: update.wallet,
3095 account_value: update.account_value,
3096 timestamp_ms: update.timestamp_ms,
3097 },
3098 );
3099 match self.apply(env.clone()) {
3100 Ok(_) => {
3101 self.publish_to_nats(&env).await;
3102 }
3103 Err(e) => {
3104 warn!(
3105 wallet = %update.wallet,
3106 error = %e,
3107 "Failed to apply HypercoreEquityUpdate"
3108 );
3109 }
3110 }
3111 }
3112
3113 Some(request) = self.market_receiver.recv() => {
3115 if self.runtime_quiesced {
3116 let status = match request.message.action {
3117 MarketAction::CreateMarket => MarketUpdateStatus::MarketCreationFailed,
3118 MarketAction::DeleteMarket => MarketUpdateStatus::MarketDeletionFailed,
3119 MarketAction::ExpireMarket => MarketUpdateStatus::MarketPendingSettlement,
3120 };
3121 let _ = request
3122 .response_tx
3123 .send(MarketUpdateMessage {
3124 market: request.message.market,
3125 status,
3126 timestamp: request.message.timestamp,
3127 reason: Some("engine is quiesced".to_string()),
3128 })
3129 .await;
3130 continue;
3131 }
3132 market_start = Instant::now();
3133 let original_response_tx = request.response_tx;
3134 let message = request.message;
3135 let market_cmd = if matches!(message.action, MarketAction::ExpireMarket) {
3136 match self.prepare_manual_market_expiry_context(&message).await {
3137 Ok(context) => crate::rsm::apply::MarketActionCommand::with_expiry_context(message, context),
3138 Err(e) => {
3139 error!("Failed to prepare MarketAction expiry context: {}", e);
3140 let _ = original_response_tx
3141 .send(MarketUpdateMessage {
3142 market: message.market,
3143 status: MarketUpdateStatus::MarketPendingSettlement,
3144 timestamp: message.timestamp,
3145 reason: Some(e.to_string()),
3146 })
3147 .await;
3148 continue;
3149 }
3150 }
3151 } else {
3152 crate::rsm::apply::MarketActionCommand::new(message)
3153 };
3154 if let Err(e) = self.validate_manual_market_expiry_nats_payload(&market_cmd) {
3155 error!("Failed to validate MarketAction NATS payload: {}", e);
3156 let _ = original_response_tx
3157 .send(MarketUpdateMessage {
3158 market: market_cmd.message.market,
3159 status: MarketUpdateStatus::MarketPendingSettlement,
3160 timestamp: market_cmd.message.timestamp,
3161 reason: Some(e.to_string()),
3162 })
3163 .await;
3164 continue;
3165 }
3166 let failure_market = (
3167 market_cmd.message.market.clone(),
3168 market_cmd.message.timestamp,
3169 );
3170 let env = crate::rsm::apply::CommandEnvelope::new(
3171 market_cmd.message.timestamp,
3172 crate::rsm::apply::EngineCommand::MarketAction(market_cmd),
3173 );
3174 let nats_env = env.clone();
3175 match self.apply(env) {
3176 Ok(output) => {
3177 self.journal_replay_owned_expiry_command(
3178 &nats_env,
3179 &output.balance_updates,
3180 )
3181 .await;
3182 if let Err(e) = self.apply_market_effects(&output).await {
3183 panic!(
3184 "CRITICAL_FAILURE: apply_market_effects() failed for MarketAction: {}",
3185 e
3186 );
3187 }
3188 let events_emitted_by_expiry_effects =
3189 if output.expiry_effects.is_empty() {
3190 false
3191 } else if let Err(e) = self
3192 .apply_expiry_effects_and_events(&output, "manual-market-expiry")
3193 .await
3194 {
3195 panic!(
3196 "CRITICAL_FAILURE: apply_expiry_effects_and_events() failed for MarketAction: {}. \
3197 Expiry runtime effects could not be applied after memory state advanced.",
3198 e
3199 );
3200 } else {
3201 true
3202 };
3203 self.publish_to_nats(&nats_env).await;
3204 if !events_emitted_by_expiry_effects {
3205 for event in &output.events {
3206 self.ctx.deps.emit_event(event);
3207 }
3208 }
3209 if let Some(response) = output.market_response {
3210 let _ = original_response_tx.send(response).await;
3211 }
3212 self.publish_balance_updates_to_nats(&output.balance_updates)
3213 .await;
3214 }
3215 Err(e) => {
3216 error!("apply() failed for MarketAction: {}", e);
3217 let (market, timestamp) = failure_market;
3218 let _ = original_response_tx
3219 .send(MarketUpdateMessage {
3220 market,
3221 status: MarketUpdateStatus::MarketPendingSettlement,
3222 timestamp,
3223 reason: Some(e.to_string()),
3224 })
3225 .await;
3226 market_request_hist.record(market_start.elapsed().as_secs_f64());
3227 continue;
3228 }
3229 }
3230 self.publish_snapshot();
3231 last_read_snapshot = Instant::now();
3232 market_request_hist.record(market_start.elapsed().as_secs_f64());
3233 }
3234
3235 Some(()) = async {
3243 match self.trading_mode_receiver.as_mut() {
3244 Some(rx) => match rx.changed().await {
3245 Ok(()) => Some(()),
3246 Err(_) => {
3247 std::future::pending::<()>().await;
3248 unreachable!()
3249 }
3250 },
3251 None => std::future::pending().await,
3252 }
3253 }, if !self.runtime_quiesced => {
3254 if let Some(rx) = self.trading_mode_receiver.as_mut() {
3255 let update = rx.borrow_and_update().clone();
3256 if !update.is_empty() {
3257 info!(
3258 update_size = update.len(),
3259 "Received live trading_mode update from catalog manager"
3260 );
3261 let now_ms = get_timestamp_millis();
3262 let env = crate::rsm::apply::CommandEnvelope::new(
3263 now_ms,
3264 crate::rsm::apply::EngineCommand::TradingModeUpdate {
3265 modes: update,
3266 timestamp_ms: now_ms,
3267 },
3268 );
3269 let nats_env = env.clone();
3270 match self.apply(env) {
3271 Ok(_) => { self.publish_to_nats(&nats_env).await; }
3272 Err(e) => { error!("apply() failed for TradingModeUpdate: {}", e); }
3273 }
3274 }
3275 }
3276 }
3277
3278 _ = expiry_interval.tick(), if !self.runtime_quiesced => {
3280 expiry_start = Instant::now();
3281 let now_ms = get_timestamp_millis();
3282 let env = match self.prepare_tick_expiry_env(now_ms).await {
3283 Ok(env) => env,
3284 Err(e) => {
3285 panic!(
3286 "CRITICAL_FAILURE: prepare_tick_expiry_env() failed: {}. \
3287 Expiry settlement cannot continue without explicit context.",
3288 e
3289 );
3290 }
3291 };
3292 let nats_env = env.clone();
3293 match self.apply(env) {
3294 Ok(output) => {
3295 self.journal_replay_owned_expiry_command(
3296 &nats_env,
3297 &output.balance_updates,
3298 )
3299 .await;
3300 if let Err(e) = self
3301 .apply_expiry_effects_and_events(&output, "tick-expiry")
3302 .await
3303 {
3304 panic!(
3305 "CRITICAL_FAILURE: apply_expiry_effects_and_events() failed for TickExpiry: {}. \
3306 Expiry runtime effects could not be applied after memory state advanced.",
3307 e
3308 );
3309 } else {
3310 self.publish_to_nats(&nats_env).await;
3311 }
3312 self.publish_tick_expiry_balance_updates(&output, "tick-expiry")
3313 .await;
3314 }
3315 Err(e) => {
3316 panic!(
3317 "CRITICAL_FAILURE: apply() failed for TickExpiry: {}. \
3318 Expiry state could not be advanced deterministically.",
3319 e
3320 );
3321 }
3322 }
3323 self.publish_snapshot();
3324 last_read_snapshot = Instant::now();
3325 expiry_check_hist.record(expiry_start.elapsed().as_secs_f64());
3326 }
3327
3328 _ = price_update_interval.tick(), if !self.runtime_quiesced => {
3330 self.ingest_price_updates().await;
3331 self.ingest_iv_updates().await;
3332 }
3333
3334 _ = &mut post_startup_reconcile, if !self.post_startup_reconciled => {
3336 self.post_startup_reconciled = true;
3337 ReplayRecovery::run_post_startup_reconciliation(&mut self);
3338 if !self.sync_status.is_ready() {
3339 self.sync_status.set_ready();
3340 info!("UnifiedEngine sync status: Ready (post-startup reconciliation complete)");
3341 }
3342 }
3343
3344 _ = snapshot_interval.tick() => {
3346 let now = Instant::now();
3347
3348 if now.duration_since(self.last_snapshot) >= self.snapshot_interval {
3349 snapshot_start = Instant::now();
3350 self.flush_journal().await;
3351
3352 self.persist_engine_state_snapshot();
3353 snapshot_hist.record(snapshot_start.elapsed().as_secs_f64());
3354 self.last_snapshot = now;
3355 }
3356 }
3357
3358 _ = self.shutdown_receiver.recv() => {
3360 info!("Unified engine received shutdown signal");
3361 self.flush_journal().await;
3362 self.persist_engine_state_snapshot();
3363 break;
3364 }
3365
3366 else => {
3368 info!("Channels closed, shutting down engine");
3369 self.flush_journal().await;
3370 self.persist_engine_state_snapshot();
3371 break;
3372 }
3373 }
3374
3375 loop_iteration_hist.record(loop_start.elapsed().as_secs_f64());
3377 }
3378
3379 info!("Unified engine stopped");
3380 }
3381
3382 async fn handle_quiesce_request(&mut self, request: EngineQuiesceRequest) {
3383 match request.action {
3384 EngineQuiesceAction::Quiesce => {
3385 self.runtime_quiesced = true;
3386 self.flush_journal().await;
3387 let snapshot_command_id = self.persist_engine_state_snapshot();
3388 let last_command_id =
3389 snapshot_command_id.unwrap_or_else(|| self.current_wal_checkpoint_command_id());
3390 self.publish_snapshot();
3391 self.last_snapshot = Instant::now();
3392 info!(
3393 last_command_id,
3394 last_l2_seq = self.ctx.l2_update_seq.load(Ordering::SeqCst),
3395 "Unified engine entered quiesced mode"
3396 );
3397 let _ = request.response_tx.send(EngineQuiesceReport {
3398 phase: hypercall_recovery::RecoveryPhase::Snapshotting,
3399 quiesced: true,
3400 last_command_id,
3401 last_l2_seq: self.ctx.l2_update_seq.load(Ordering::SeqCst),
3402 snapshot_persisted: snapshot_command_id.is_some(),
3403 paused_sources: crate::rsm::restart_components::registered_mutation_sources(),
3404 });
3405 }
3406 EngineQuiesceAction::Resume => {
3407 self.runtime_quiesced = false;
3408 info!("Unified engine resumed live mutation sources");
3409 let _ = request.response_tx.send(EngineQuiesceReport {
3410 phase: hypercall_recovery::RecoveryPhase::Recovered,
3411 quiesced: false,
3412 last_command_id: self.current_wal_checkpoint_command_id(),
3413 last_l2_seq: self.ctx.l2_update_seq.load(Ordering::SeqCst),
3414 snapshot_persisted: false,
3415 paused_sources: Vec::new(),
3416 });
3417 }
3418 }
3419 }
3420
3421 async fn ingest_price_updates(&mut self) {
3424 self.ingest_price_updates_with_side_effects(true).await;
3425 }
3426
3427 async fn ingest_price_updates_without_side_effects(&mut self) {
3428 self.ingest_price_updates_with_side_effects(false).await;
3429 }
3430
3431 async fn ingest_price_updates_with_side_effects(&mut self, emit_side_effects: bool) {
3432 use crate::journal::engine_journal_batcher::{JournalEntry, JournalMessage};
3433 use crate::rsm::apply::{CommandEnvelope, EngineCommand, PriceUpdatePayload};
3434 use rust_decimal::Decimal;
3435
3436 let greeks_cache = match &self.ctx.deps.greeks_cache {
3437 Some(cache) => cache.clone(),
3438 None => return,
3439 };
3440
3441 let mut underlyings = greeks_cache.get_configured_underlyings();
3442 underlyings.sort();
3443 let now_ms = get_timestamp_millis();
3444
3445 for underlying in &underlyings {
3446 if let Some(price_f64) = greeks_cache.get_spot_price(underlying).await {
3447 let spot_price = match Decimal::from_f64_retain(price_f64) {
3448 Some(d) => d,
3449 None => continue,
3450 };
3451
3452 if self.ctx.spot_prices.get(underlying) == Some(&spot_price) {
3453 continue;
3454 }
3455
3456 let env = CommandEnvelope::new(
3457 now_ms,
3458 EngineCommand::PriceUpdate {
3459 underlying: underlying.clone(),
3460 spot_price,
3461 timestamp_ms: now_ms,
3462 },
3463 );
3464
3465 #[cfg(feature = "rsm-state")]
3466 let cmd_identity_hash = emit_side_effects.then(|| env.command.identity_hash());
3467 let nats_env = emit_side_effects.then(|| env.clone());
3468 if let Err(e) = self.apply(env) {
3469 warn!(
3470 underlying = %underlying,
3471 error = %e,
3472 "Failed to apply PriceUpdate command"
3473 );
3474 continue;
3475 }
3476 if let Some(nats_env) = nats_env {
3477 self.publish_to_nats(&nats_env).await;
3478 }
3479
3480 if let (Some(ref batch_sender), Some(_cmd_identity_hash)) = (
3481 &self.journal_batch_sender,
3482 #[cfg(feature = "rsm-state")]
3483 cmd_identity_hash,
3484 #[cfg(not(feature = "rsm-state"))]
3485 emit_side_effects.then_some(()),
3486 ) {
3487 let payload = PriceUpdatePayload {
3488 underlying: underlying.clone(),
3489 spot_price,
3490 timestamp_ms: now_ms,
3491 };
3492 let command_data = hypercall_types::serialize_to_wire_bytes(&payload);
3493
3494 let entry = JournalEntry {
3495 received_ts_ms: now_ms,
3496 command_data,
3497 response_data: None,
3498 order_id: None,
3499 pre_digest: Default::default(),
3500 post_digest: Default::default(),
3501 duration_ms: 0,
3502 events: Vec::new(),
3503 outbox_appends: Vec::new(),
3504 fill_side_effects: Vec::new(),
3505 cash_withdrawal_side_effect: None,
3506 balance_updates: Vec::new(),
3507 created_at: Instant::now(),
3508 commit_ack: None,
3509 request_uuid: hypercall_db_diesel::engine_enums::DbUuid(
3510 uuid::Uuid::new_v4(),
3511 ),
3512 command_type_enum: Some(
3513 hypercall_db_diesel::engine_enums::CommandType::PriceUpdate,
3514 ),
3515 #[cfg(feature = "rsm-state")]
3516 command_identity_hash: _cmd_identity_hash,
3517 #[cfg(feature = "rsm-state")]
3518 rsm_state_digest: Some(
3519 crate::rsm::engine_snapshot::EngineStateDigest::from_ctx(
3520 &self.ctx,
3521 self.ctx
3522 .l2_update_seq
3523 .load(std::sync::atomic::Ordering::SeqCst),
3524 ),
3525 ),
3526 };
3527 if let Err(e) = batch_sender.send(JournalMessage::Entry(entry)).await {
3528 panic!(
3529 "JOURNAL_FATAL: failed to journal PriceUpdate for {}: {}",
3530 underlying, e
3531 );
3532 }
3533 }
3534 }
3535 }
3536 }
3537
3538 async fn ingest_iv_updates(&mut self) {
3544 self.ingest_iv_updates_with_side_effects(true).await;
3545 }
3546
3547 async fn ingest_iv_updates_without_side_effects(&mut self) {
3548 self.ingest_iv_updates_with_side_effects(false).await;
3549 }
3550
3551 async fn ingest_iv_updates_with_side_effects(&mut self, emit_side_effects: bool) {
3552 use crate::journal::engine_journal_batcher::{JournalEntry, JournalMessage};
3553 use crate::rsm::apply::{CommandEnvelope, EngineCommand, IvUpdatePayload};
3554 use crate::vol_oracle::vol_surface_cache::VolatilitySurface;
3555
3556 let vol_oracle = match &self.external_vol_oracle {
3557 Some(oracle) => oracle.clone(),
3558 None => return,
3559 };
3560
3561 let statuses = vol_oracle.statuses();
3562 let now_ms = get_timestamp_millis();
3563
3564 let mut all_underlyings: Vec<String> =
3565 statuses.iter().map(|s| s.underlying.clone()).collect();
3566 all_underlyings.sort();
3567 all_underlyings.dedup();
3568
3569 for underlying in &all_underlyings {
3572 let is_ready = statuses
3573 .iter()
3574 .any(|s| s.underlying == *underlying && s.ready);
3575 if !is_ready {
3576 if self.ctx.iv_surfaces.remove(underlying).is_some() {
3577 if let Ok(mut shared) = self.engine_iv_surfaces.write() {
3578 shared.remove(underlying);
3579 }
3580 debug!(
3581 underlying = %underlying,
3582 "Cleared stale engine IV surface (provider no longer ready)"
3583 );
3584 }
3585 continue;
3586 }
3587 }
3588
3589 let ready_underlyings: Vec<String> = all_underlyings
3590 .iter()
3591 .filter(|u| statuses.iter().any(|s| s.underlying == **u && s.ready))
3592 .cloned()
3593 .collect();
3594
3595 for underlying in &ready_underlyings {
3596 let snapshot = match vol_oracle.get_surface_snapshot(underlying) {
3597 Some(s) if !s.strike_points.is_empty() => s,
3598 _ => match synthesize_fixed_surface_snapshot(
3599 &self.ctx.orderbooks,
3600 &vol_oracle,
3601 &statuses,
3602 underlying,
3603 now_ms as i64,
3604 ) {
3605 Some(snapshot) => snapshot,
3606 None => continue,
3607 },
3608 };
3609
3610 let mut surface = VolatilitySurface::new();
3612 for point in &snapshot.strike_points {
3613 surface.insert(point.strike, point.expiry, point.iv);
3614 }
3615
3616 let journal_data = if emit_side_effects && self.journal_batch_sender.is_some() {
3618 let payload = IvUpdatePayload {
3619 underlying: underlying.clone(),
3620 strike_points: snapshot.strike_points.clone(),
3621 timestamp_ms: now_ms,
3622 };
3623 Some(hypercall_types::serialize_to_wire_bytes(&payload))
3624 } else {
3625 None
3626 };
3627
3628 let changed = match self.ctx.iv_surfaces.get(underlying) {
3632 None => true,
3633 Some(existing) => {
3634 snapshot.strike_points.len() != existing.len()
3636 || snapshot.strike_points.iter().any(|point| {
3637 existing
3638 .get_interpolated(point.strike, point.expiry)
3639 .map(|existing_iv| (existing_iv - point.iv).abs() > 1e-12)
3640 .unwrap_or(true)
3641 })
3642 }
3643 };
3644
3645 if !changed {
3646 if let Some(ts) = snapshot.last_update_ts_ms {
3647 self.ctx.iv_source_timestamps.insert(underlying.clone(), ts);
3648 }
3649 continue;
3650 }
3651
3652 let env = CommandEnvelope::new(
3653 now_ms,
3654 EngineCommand::IvUpdate {
3655 underlying: underlying.clone(),
3656 surface,
3657 journal_data: journal_data.clone(),
3658 timestamp_ms: now_ms,
3659 },
3660 );
3661
3662 if let Some(ts) = snapshot.last_update_ts_ms {
3663 self.ctx.iv_source_timestamps.insert(underlying.clone(), ts);
3664 }
3665
3666 #[cfg(feature = "rsm-state")]
3667 let cmd_identity_hash = emit_side_effects.then(|| env.command.identity_hash());
3668 let nats_env = emit_side_effects.then(|| env.clone());
3669 if let Err(e) = self.apply(env) {
3670 warn!(
3671 underlying = %underlying,
3672 error = %e,
3673 "Failed to apply IvUpdate command"
3674 );
3675 continue;
3676 }
3677 if let Some(nats_env) = nats_env {
3678 self.publish_to_nats(&nats_env).await;
3679 }
3680
3681 if let (Some(ref batch_sender), Some(command_data), Some(_cmd_identity_hash)) = (
3682 &self.journal_batch_sender,
3683 journal_data,
3684 #[cfg(feature = "rsm-state")]
3685 cmd_identity_hash,
3686 #[cfg(not(feature = "rsm-state"))]
3687 emit_side_effects.then_some(()),
3688 ) {
3689 let entry = JournalEntry {
3690 received_ts_ms: now_ms,
3691 command_data,
3692 response_data: None,
3693 order_id: None,
3694 pre_digest: Default::default(),
3695 post_digest: Default::default(),
3696 duration_ms: 0,
3697 events: Vec::new(),
3698 outbox_appends: Vec::new(),
3699 fill_side_effects: Vec::new(),
3700 cash_withdrawal_side_effect: None,
3701 balance_updates: Vec::new(),
3702 created_at: Instant::now(),
3703 commit_ack: None,
3704 request_uuid: hypercall_db_diesel::engine_enums::DbUuid(uuid::Uuid::new_v4()),
3705 command_type_enum: Some(
3706 hypercall_db_diesel::engine_enums::CommandType::IvUpdate,
3707 ),
3708 #[cfg(feature = "rsm-state")]
3709 command_identity_hash: _cmd_identity_hash,
3710 #[cfg(feature = "rsm-state")]
3711 rsm_state_digest: Some(
3712 crate::rsm::engine_snapshot::EngineStateDigest::from_ctx(
3713 &self.ctx,
3714 self.ctx
3715 .l2_update_seq
3716 .load(std::sync::atomic::Ordering::SeqCst),
3717 ),
3718 ),
3719 };
3720 if let Err(e) = batch_sender.send(JournalMessage::Entry(entry)).await {
3721 panic!(
3722 "JOURNAL_FATAL: failed to journal IvUpdate for {}: {}",
3723 underlying, e
3724 );
3725 }
3726 }
3727 }
3728 }
3729}
3730
3731#[cfg(test)]
3732mod tests {
3733 use super::*;
3734 use hypercall_engine::OrderBook;
3735 use rust_decimal_macros::dec;
3736
3737 #[test]
3738 fn external_durable_mutation_uuid_accepts_uuid() {
3739 let request_id = uuid::Uuid::now_v7().to_string();
3740 let parsed =
3741 UnifiedEngine::require_external_durable_mutation_uuid("DepositUpdate", &request_id);
3742 assert_eq!(parsed.to_string(), request_id);
3743 }
3744
3745 #[test]
3746 #[should_panic(
3747 expected = "RUNTIME_INVARIANT: external engine command DepositUpdate request_id not-a-uuid is not a UUID"
3748 )]
3749 fn external_durable_mutation_uuid_panics_on_non_uuid() {
3750 UnifiedEngine::require_external_durable_mutation_uuid("DepositUpdate", "not-a-uuid");
3751 }
3752
3753 #[test]
3754 fn empty_external_balance_updates_are_idempotent_for_journal_retries() {
3755 assert!(UnifiedEngine::is_idempotent_empty_balance_update(
3756 "DepositUpdate"
3757 ));
3758 assert!(UnifiedEngine::is_idempotent_empty_balance_update(
3759 "LiquidationBonusUpdate"
3760 ));
3761 assert!(!UnifiedEngine::is_idempotent_empty_balance_update(
3762 "CashWithdrawalUpdate"
3763 ));
3764 }
3765
3766 #[test]
3767 fn pm_settlement_admin_stamp_replaces_client_timestamp() {
3768 let client_timestamp_ms = 1_800_000_000_000;
3769 let server_timestamp_ms = client_timestamp_ms + 60_000;
3770 let command = crate::rsm::apply::EngineCommand::AccruePmSettlementInterest(
3771 hypercall_engine::command::AccruePmSettlementInterestCommand {
3772 request_id: uuid::Uuid::now_v7(),
3773 input_digest: "admin-pm-interest-test".to_string(),
3774 wallet: hypercall_types::wallet_address::test_wallet(1),
3775 underlying: "BTC".to_string(),
3776 to_ms: client_timestamp_ms as i64,
3777 timestamp_ms: client_timestamp_ms,
3778 },
3779 );
3780
3781 let stamped =
3782 UnifiedEngine::stamp_pm_settlement_admin_command(command, server_timestamp_ms);
3783
3784 match stamped {
3785 crate::rsm::apply::EngineCommand::AccruePmSettlementInterest(command) => {
3786 assert_eq!(command.timestamp_ms, server_timestamp_ms);
3787 assert_eq!(command.to_ms, client_timestamp_ms as i64);
3788 }
3789 other => panic!("unexpected command {}", other.command_type()),
3790 }
3791 }
3792
3793 #[test]
3794 fn expire_market_is_replay_journal_owned() {
3795 let market = hypercall_types::Market {
3796 symbol: "BTC-20260531-70000-C".to_string(),
3797 underlying: "BTC".to_string(),
3798 expiry: 20260531,
3799 strike: dec!(70000),
3800 option_type: hypercall_types::OptionType::Call,
3801 };
3802 let command = crate::rsm::apply::MarketActionCommand::with_expiry_context(
3803 hypercall_types::MarketActionMessage {
3804 market,
3805 action: hypercall_types::MarketAction::ExpireMarket,
3806 timestamp: 1_800_000_000_000,
3807 },
3808 crate::rsm::apply::TickExpiryContext::empty(),
3809 );
3810 let env = crate::rsm::apply::CommandEnvelope::new(
3811 1_800_000_000_000,
3812 crate::rsm::apply::EngineCommand::MarketAction(command),
3813 );
3814
3815 let (command_type, data, label) = UnifiedEngine::replay_owned_expiry_command_payload(&env)
3816 .expect("ExpireMarket must be journaled for replay");
3817
3818 assert_eq!(
3819 command_type,
3820 hypercall_db_diesel::engine_enums::CommandType::ExpireMarket
3821 );
3822 assert_eq!(label, "ExpireMarket");
3823 let decoded = crate::nats::deserialize::deserialize_command(
3824 crate::nats::CommandType::MarketAction,
3825 crate::nats::COMMAND_WIRE_VERSION_V1,
3826 &data,
3827 )
3828 .expect("journaled ExpireMarket payload should decode for replay");
3829 assert!(matches!(
3830 decoded,
3831 crate::rsm::apply::EngineCommand::MarketAction(command)
3832 if command.message.action == hypercall_types::MarketAction::ExpireMarket
3833 ));
3834 }
3835
3836 #[test]
3837 fn non_empty_tick_expiry_is_replay_journal_owned() {
3838 let context = crate::rsm::apply::TickExpiryContext {
3839 due_expiries: vec![crate::rsm::apply::TickExpiryDueGroup {
3840 expiry_ts: 1_800_000_000,
3841 symbols: vec!["BTC-20260531-70000-C".to_string()],
3842 }],
3843 pending_settlements: Vec::new(),
3844 settlement_prices: Vec::new(),
3845 margin_modes: Vec::new(),
3846 pm_settlements: Vec::new(),
3847 };
3848 let env = crate::rsm::apply::CommandEnvelope::new(
3849 1_800_000_000_000,
3850 crate::rsm::apply::EngineCommand::TickExpiry {
3851 now_ms: 1_800_000_000_000,
3852 context,
3853 },
3854 );
3855
3856 let (command_type, data, label) = UnifiedEngine::replay_owned_expiry_command_payload(&env)
3857 .expect("non-empty TickExpiry must be journaled for replay");
3858
3859 assert_eq!(
3860 command_type,
3861 hypercall_db_diesel::engine_enums::CommandType::TickExpiry
3862 );
3863 assert_eq!(label, "TickExpiry");
3864 let decoded = crate::nats::deserialize::deserialize_command(
3865 crate::nats::CommandType::TickExpiry,
3866 crate::nats::COMMAND_WIRE_VERSION_V1,
3867 &data,
3868 )
3869 .expect("journaled TickExpiry payload should decode for replay");
3870 assert!(matches!(
3871 decoded,
3872 crate::rsm::apply::EngineCommand::TickExpiry { .. }
3873 ));
3874 }
3875
3876 #[test]
3877 fn replay_owned_expiry_journal_entry_carries_balance_updates() {
3878 let context = crate::rsm::apply::TickExpiryContext {
3879 due_expiries: vec![crate::rsm::apply::TickExpiryDueGroup {
3880 expiry_ts: 1_800_000_000,
3881 symbols: vec!["BTC-20260531-70000-C".to_string()],
3882 }],
3883 pending_settlements: Vec::new(),
3884 settlement_prices: Vec::new(),
3885 margin_modes: Vec::new(),
3886 pm_settlements: Vec::new(),
3887 };
3888 let env = crate::rsm::apply::CommandEnvelope::new(
3889 1_800_000_000_000,
3890 crate::rsm::apply::EngineCommand::TickExpiry {
3891 now_ms: 1_800_000_000_000,
3892 context,
3893 },
3894 );
3895 let balance_update = hypercall_types::BalanceUpdate {
3896 balance_update_seq: 17,
3897 wallet: hypercall_types::wallet_address::test_wallet(7),
3898 delta: dec!(12.5),
3899 balance_after: dec!(112.5),
3900 reason: hypercall_types::BalanceUpdateReason::Settlement,
3901 reference_id: Some("settlement:BTC-20260531-70000-C".to_string()),
3902 source_command_id: None,
3903 timestamp_ms: 1_800_000_000_000,
3904 };
3905 let request_uuid = hypercall_db_diesel::engine_enums::DbUuid(uuid::Uuid::nil());
3906
3907 let (entry, label) = UnifiedEngine::replay_owned_expiry_journal_entry(
3908 &env,
3909 std::slice::from_ref(&balance_update),
3910 request_uuid,
3911 None,
3912 )
3913 .expect("non-empty TickExpiry must produce a replay journal entry");
3914
3915 assert_eq!(label, "TickExpiry");
3916 assert_eq!(entry.balance_updates, vec![balance_update]);
3917 assert_eq!(entry.request_uuid, request_uuid);
3918 assert_eq!(
3919 entry.command_type_enum,
3920 Some(hypercall_db_diesel::engine_enums::CommandType::TickExpiry)
3921 );
3922 }
3923
3924 #[test]
3925 fn empty_tick_expiry_is_not_replay_journal_owned() {
3926 let env = crate::rsm::apply::CommandEnvelope::new(
3927 1_800_000_000_000,
3928 crate::rsm::apply::EngineCommand::TickExpiry {
3929 now_ms: 1_800_000_000_000,
3930 context: crate::rsm::apply::TickExpiryContext::empty(),
3931 },
3932 );
3933
3934 assert!(UnifiedEngine::replay_owned_expiry_command_payload(&env).is_none());
3935 }
3936
3937 #[test]
3938 fn synthesize_fixed_surface_snapshot_uses_live_orderbook_grid() {
3939 let mut orderbooks = std::collections::HashMap::new();
3940 orderbooks.insert(
3941 "GOLD-20261231-4700-C".to_string(),
3942 OrderBook::with_symbol(
3943 hypercall_types::expiry_date_to_timestamp("GOLD", 20261231) as u64,
3944 dec!(4700),
3945 hypercall_types::OptionType::Call,
3946 "GOLD-20261231-4700-C".to_string(),
3947 ),
3948 );
3949 orderbooks.insert(
3950 "GOLD-20261231-4700-P".to_string(),
3951 OrderBook::with_symbol(
3952 hypercall_types::expiry_date_to_timestamp("GOLD", 20261231) as u64,
3953 dec!(4700),
3954 hypercall_types::OptionType::Put,
3955 "GOLD-20261231-4700-P".to_string(),
3956 ),
3957 );
3958
3959 let oracle: crate::vol_oracle::SharedVolOracle =
3960 std::sync::Arc::new(crate::vol_oracle::FixedTestRiskVolOracle::with_underlyings(
3961 0.50,
3962 vec!["GOLD".to_string()],
3963 ));
3964 let statuses = oracle.statuses();
3965 let snapshot = synthesize_fixed_surface_snapshot(
3966 &orderbooks,
3967 &oracle,
3968 &statuses,
3969 "GOLD",
3970 1_800_000_000_000,
3971 )
3972 .expect("fixed snapshot should be synthesized");
3973
3974 assert_eq!(snapshot.strike_points.len(), 1);
3975 assert_eq!(snapshot.strike_points[0].strike, 4700.0);
3976 assert_eq!(
3977 snapshot.strike_points[0].expiry,
3978 hypercall_types::expiry_date_to_timestamp("GOLD", 20261231) as i64
3979 );
3980 assert_eq!(snapshot.strike_points[0].iv, 0.50);
3981 }
3982
3983 #[test]
3984 fn max_listed_option_expiry_uses_engine_orderbooks() {
3985 let mut orderbooks = std::collections::HashMap::new();
3986 orderbooks.insert(
3987 "GOLD-20261231-4700-C".to_string(),
3988 OrderBook::with_symbol(
3989 hypercall_types::expiry_date_to_timestamp("GOLD", 20261231) as u64,
3990 dec!(4700),
3991 hypercall_types::OptionType::Call,
3992 "GOLD-20261231-4700-C".to_string(),
3993 ),
3994 );
3995 orderbooks.insert(
3996 "GOLD-20270630-4700-C".to_string(),
3997 OrderBook::with_symbol(
3998 hypercall_types::expiry_date_to_timestamp("GOLD", 20270630) as u64,
3999 dec!(4700),
4000 hypercall_types::OptionType::Call,
4001 "GOLD-20270630-4700-C".to_string(),
4002 ),
4003 );
4004 orderbooks.insert(
4005 "BTC-20280630-100000-C".to_string(),
4006 OrderBook::with_symbol(
4007 hypercall_types::expiry_date_to_timestamp("BTC", 20280630) as u64,
4008 dec!(100000),
4009 hypercall_types::OptionType::Call,
4010 "BTC-20280630-100000-C".to_string(),
4011 ),
4012 );
4013
4014 assert_eq!(
4015 max_listed_option_expiry_ts_ms(&orderbooks, "GOLD").unwrap(),
4016 hypercall_types::expiry_date_to_timestamp("GOLD", 20270630) * 1_000
4017 );
4018 }
4019
4020 #[test]
4021 fn max_listed_option_expiry_rejects_unknown_underlying() {
4022 let orderbooks = std::collections::HashMap::new();
4023 let error = max_listed_option_expiry_ts_ms(&orderbooks, "GOLD")
4024 .expect_err("missing listed markets should reject");
4025 assert!(error.contains("no listed option markets"));
4026 }
4027}