Skip to main content

hypercall/rsm/
expiry_manager.rs

1//! Expiry and settlement management.
2//!
3//! `ExpiryManager` owns the expiry schedules and implements the two-phase
4//! settlement process: transition to pending → settle when TWAP is available.
5//! The canonical `expired_instruments` map lives in `EngineCtx`, and
6//! `reference_prices` lives in `EngineDeps` to avoid duplication.
7
8use crate::observability::record_settlement;
9use crate::rsm::apply::{
10    ApplyOutput, ExpiryEffect, ExpirySettlementIntent, TickExpiryContext, TickExpiryDueGroup,
11    TickExpiryPendingGroup, TickExpiryPmSettlement, TickExpirySettlementPrice,
12    TickExpiryWalletMarginMode,
13};
14use crate::rsm::engine_deps::{EngineCtx, EngineDeps};
15use crate::rsm::margin_manager::{expiry_date_to_timestamp, MarginManager};
16use crate::rsm::margin_mode::MarginMode;
17use crate::shared::order_types::ParsedSymbol;
18use crate::shared::traits::MarkPriceOracle;
19use hypercall_db::{InstrumentReader, InstrumentWriter, OrderWriter, TierReader};
20use hypercall_db_diesel::DatabaseHandler;
21use hypercall_engine::command::PmSettlementEventKey;
22use hypercall_margin::portfolio::{
23    classify_liquidity_gap, PmAccountSettlementFacts, PmLiquidityClassification,
24    PmSettlementObligation,
25};
26use hypercall_settlement::{
27    self, build_position_expired_message, plan_position_settlements,
28    OptionType as SettlementOptionType, SettlementInstrument, SettlementPosition,
29};
30use hypercall_types::WalletAddress;
31use hypercall_types::{
32    EngineMessage, MarketUpdateMessage, MarketUpdateStatus, OptionType as MessageOptionType,
33    OrderInfo, OrderUpdateMessage, OrderUpdateStatus,
34};
35use rust_decimal::Decimal;
36use rust_decimal_macros::dec;
37use sha2::{Digest, Sha256};
38use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
39use tracing::{debug, error, info, warn};
40
41/// Manages instrument expiry schedules and settlement.
42///
43/// Owns the expiry schedules. The canonical `expired_instruments` map lives in
44/// `EngineCtx`, and `reference_prices` lives in `EngineDeps` to avoid duplication.
45pub struct ExpiryManager {
46    /// expiry_timestamp → list of symbols
47    pub expiry_schedules: BTreeMap<u64, Vec<String>>,
48}
49
50#[derive(Debug, Clone, Copy, Default)]
51struct ExpireInstrumentApplyResult {
52    settled: bool,
53    blocked: bool,
54}
55
56fn to_settlement_option_type(ot: &crate::types::OptionType) -> SettlementOptionType {
57    match ot {
58        crate::types::OptionType::Call => SettlementOptionType::Call,
59        crate::types::OptionType::Put => SettlementOptionType::Put,
60    }
61}
62
63fn expiry_cash_delta_for_margin_mode(
64    margin_mode: MarginMode,
65    settlement_value: Decimal,
66    net_pnl: Decimal,
67) -> Decimal {
68    hypercall_settlement::settlement_cash_delta_for_margin_mode(
69        margin_mode,
70        settlement_value,
71        Some(net_pnl),
72        "expiry apply intent",
73    )
74    .expect("net_pnl is present for expiry apply intent")
75}
76
77fn simulated_pm_liquid_after_settlement(
78    current_liquid: Decimal,
79    settlement_cash_delta: Decimal,
80    pool_front_usdc: Decimal,
81) -> Decimal {
82    (current_liquid + settlement_cash_delta + pool_front_usdc).max(Decimal::ZERO)
83}
84
85fn initial_pm_liquid_with_precredit(balance: Decimal, precredited_cash: Decimal) -> Decimal {
86    (balance + precredited_cash).max(Decimal::ZERO)
87}
88
89fn effective_pm_cash_delta_after_precredit(
90    settlement_cash_delta: Decimal,
91    remaining_precredited_cash: &mut Decimal,
92) -> Decimal {
93    if settlement_cash_delta <= Decimal::ZERO {
94        return settlement_cash_delta;
95    }
96
97    let applied_precredit = (*remaining_precredited_cash).min(settlement_cash_delta);
98    *remaining_precredited_cash -= applied_precredit;
99    settlement_cash_delta - applied_precredit
100}
101
102fn simulated_recoverable_after_pm_front(
103    current_recoverable: Decimal,
104    pool_front_usdc: Decimal,
105) -> Decimal {
106    (current_recoverable - pool_front_usdc).max(Decimal::ZERO)
107}
108
109fn apply_engine_balance_update(
110    ctx: &mut EngineCtx,
111    output: Option<&mut ApplyOutput>,
112    wallet: WalletAddress,
113    delta: Decimal,
114    reason: hypercall_types::BalanceUpdateReason,
115    reference_id: Option<String>,
116    timestamp_ms: u64,
117) -> Decimal {
118    let balance_after = ctx.balance_ledger.balance(&wallet) + delta;
119    let update = hypercall_types::BalanceUpdate {
120        balance_update_seq: ctx.balance_ledger.next_balance_update_seq(),
121        wallet,
122        delta,
123        balance_after,
124        reason,
125        reference_id,
126        source_command_id: None,
127        timestamp_ms,
128    };
129    ctx.balance_ledger
130        .apply_balance_update(&update)
131        .unwrap_or_else(|error| {
132            panic!(
133                "CRITICAL: failed to apply engine balance update during expiry: {}",
134                error
135            )
136        });
137    if let Some(output) = output {
138        output.balance_updates.push(update);
139    }
140    balance_after
141}
142
143fn parse_settlement_instrument_or_panic(symbol: &str, context: &str) -> SettlementInstrument {
144    SettlementInstrument::from_symbol(symbol).unwrap_or_else(|e| {
145        panic!(
146            "CRITICAL_FAILURE: failed to parse canonical settlement symbol for {}: {} ({})",
147            context, symbol, e
148        )
149    })
150}
151
152fn digest_pm_settlement_inputs(
153    event_key: &PmSettlementEventKey,
154    obligation: &PmSettlementObligation,
155    liquid_usdc: Decimal,
156    pm_facts: Option<&PmAccountSettlementFacts>,
157    pool_snapshot: Option<&hypercall_margin::portfolio::PmSettlementPoolSnapshot>,
158    policy_version: u32,
159    unavailable_reason: Option<&str>,
160) -> Result<String, String> {
161    let payload = serde_json::json!({
162        "event_key": event_key,
163        "obligation": obligation,
164        "liquid_usdc": liquid_usdc,
165        "pm_facts": pm_facts,
166        "pool_snapshot": pool_snapshot,
167        "policy_version": policy_version,
168        "unavailable_reason": unavailable_reason,
169    });
170    let bytes = serde_json::to_vec(&payload)
171        .map_err(|error| format!("failed to serialize PM settlement facts digest: {error}"))?;
172    let mut hasher = Sha256::new();
173    hasher.update(bytes);
174    Ok(format!("sha256:{}", hex::encode(hasher.finalize())))
175}
176
177fn deterministic_pm_settlement_request_id(event_key: &PmSettlementEventKey) -> uuid::Uuid {
178    uuid::Uuid::new_v5(
179        &uuid::Uuid::NAMESPACE_OID,
180        format!(
181            "pm-settlement:{}:{}:{}:{}:{}",
182            event_key.wallet,
183            event_key.market_id,
184            event_key.expiry_ts_ms,
185            event_key.margin_mode,
186            event_key.settlement_event_sequence
187        )
188        .as_bytes(),
189    )
190}
191
192fn margin_mode_for_settlement(
193    ctx: &EngineCtx,
194    wallet: WalletAddress,
195) -> Result<MarginMode, String> {
196    if let Some(margin_mode) = ctx.deps.wallet_margin_modes.get(&wallet).copied() {
197        return Ok(margin_mode);
198    }
199
200    let Some(handler) = ctx.db.as_ref() else {
201        return Ok(MarginMode::Standard);
202    };
203    let margin_mode = handler.get_margin_mode_sync(&wallet).map_err(|error| {
204        format!(
205            "missing durable margin mode for settlement wallet {}: {}",
206            wallet, error
207        )
208    })?;
209    Ok(margin_mode)
210}
211
212struct SettlementApplyInput<'a> {
213    wallet: WalletAddress,
214    symbol: &'a str,
215    option_type: SettlementOptionType,
216    strike: Decimal,
217    reference_price: Decimal,
218    position_size: Decimal,
219    entry_price: Decimal,
220    margin_mode: MarginMode,
221    timestamp_ms: u64,
222}
223
224struct SettlementApplyIntent {
225    message: hypercall_types::PositionExpiredMessage,
226    cash_delta: Decimal,
227}
228
229fn build_settlement_apply_intent(input: SettlementApplyInput<'_>) -> SettlementApplyIntent {
230    let instrument = parse_settlement_instrument_or_panic(input.symbol, "settlement apply intent");
231    let position = SettlementPosition {
232        wallet: input.wallet,
233        position_size: input.position_size,
234        entry_price: input.entry_price,
235    };
236    let settlement =
237        hypercall_settlement::settle_position(&hypercall_settlement::SettlementInput {
238            option_type: input.option_type,
239            strike: input.strike,
240            reference_price: input.reference_price,
241            position_size: input.position_size,
242            entry_price: input.entry_price,
243        });
244    let cash_delta = expiry_cash_delta_for_margin_mode(
245        input.margin_mode,
246        settlement.settlement_value,
247        settlement.net_pnl,
248    );
249
250    SettlementApplyIntent {
251        message: build_position_expired_message(
252            &instrument,
253            &position,
254            &settlement,
255            input.margin_mode,
256            input.timestamp_ms,
257        ),
258        cash_delta,
259    }
260}
261
262impl Default for ExpiryManager {
263    fn default() -> Self {
264        Self::new()
265    }
266}
267
268impl ExpiryManager {
269    /// Create an empty `ExpiryManager`.
270    pub fn new() -> Self {
271        Self {
272            expiry_schedules: BTreeMap::new(),
273        }
274    }
275
276    // ===== Query helpers =====
277
278    /// Check if an instrument is expired using canonical orderbook expiry.
279    pub fn is_instrument_expired(
280        &self,
281        symbol: &str,
282        orderbooks: &HashMap<String, hypercall_engine::OrderBook>,
283    ) -> bool {
284        if !orderbooks.contains_key(symbol) {
285            return false;
286        };
287        let parsed = ParsedSymbol::from_symbol(symbol).unwrap_or_else(|error| {
288            panic!(
289                "STATE_CORRUPTION: orderbook symbol {} cannot be parsed for expiry validation: {}",
290                symbol, error
291            )
292        });
293        let expiry_ts = expiry_date_to_timestamp(&parsed.underlying, parsed.expiry);
294        if expiry_ts == 0 {
295            panic!(
296                "STATE_CORRUPTION: orderbook symbol {} has invalid expiry code {}",
297                symbol, parsed.expiry
298            );
299        }
300        let now = crate::shared::clock::unix_now_secs();
301        now >= expiry_ts
302    }
303
304    /// Schedule an expiry for a symbol.
305    pub fn schedule_expiry(&mut self, symbol: String, expiry_timestamp: u64) {
306        self.expiry_schedules
307            .entry(expiry_timestamp)
308            .or_default()
309            .push(symbol);
310    }
311
312    // ===== Settlement price =====
313
314    /// Get settlement price from oracle for a specific expiry.
315    pub async fn get_settlement_price(
316        &self,
317        deps: &EngineDeps,
318        underlying: &str,
319        expiry_ts: i64,
320    ) -> Option<Decimal> {
321        if let Some(oracle) = deps.mark_price_oracles.get(underlying) {
322            if let Some(price) = oracle.get_settlement_price(expiry_ts).await {
323                match Decimal::from_f64_retain(price) {
324                    Some(decimal_price) => {
325                        debug!(
326                            "Got settlement price from oracle for {} expiry {}: {}",
327                            underlying, expiry_ts, decimal_price
328                        );
329                        return Some(decimal_price);
330                    }
331                    None => {
332                        error!(
333                            "Invalid oracle settlement price for {} expiry {}: {} - cannot convert to Decimal",
334                            underlying, expiry_ts, price
335                        );
336                        return None;
337                    }
338                }
339            }
340        }
341
342        if let Some(&price) = deps.reference_prices.get(underlying) {
343            match Decimal::from_f64_retain(price) {
344                Some(decimal_price) => {
345                    debug!(
346                        "Using test reference price for {} expiry {}: {}",
347                        underlying, expiry_ts, decimal_price
348                    );
349                    return Some(decimal_price);
350                }
351                None => {
352                    error!(
353                        "Invalid reference price for {} expiry {}: {} - cannot convert to Decimal",
354                        underlying, expiry_ts, price
355                    );
356                    return None;
357                }
358            }
359        }
360
361        None
362    }
363
364    // ===== Settlement processing =====
365
366    /// Transition instruments to EXPIRED_PENDING_PRICE status.
367    ///
368    /// Blocks new orders, cancels open orders, removes orderbooks.
369    pub fn transition_to_pending_settlement(
370        &mut self,
371        symbols: &[String],
372        now_ms: u64,
373        ctx: &mut EngineCtx,
374    ) {
375        if symbols.is_empty() {
376            return;
377        }
378
379        let timestamp = now_ms;
380        let mut total_cancelled = 0;
381        let mut cancelled_order_ids: Vec<i64> = Vec::new();
382
383        for symbol in symbols {
384            ctx.expired_instruments.insert(symbol.clone(), true);
385
386            if let Some(orderbook) = ctx.orderbooks.get(symbol) {
387                let open_orders = orderbook.get_all_orders();
388                let order_count = open_orders.len();
389
390                let underlying =
391                    parse_settlement_instrument_or_panic(symbol, "expiry transition").underlying;
392
393                for r in open_orders {
394                    let (order_id, price, size, side, wallet) =
395                        (r.order_id, r.price, r.quantity, r.side, r.wallet);
396                    let cancel_message = OrderUpdateMessage {
397                        timestamp,
398                        info: OrderInfo {
399                            symbol: symbol.clone(),
400                            price,
401                            size,
402                            side,
403                            tif: hypercall_types::TimeInForce::GTC,
404                            client_id: None,
405                            order_id: Some(order_id),
406                            is_perp: false,
407                            underlying: Some(underlying.clone()),
408                            reduce_only: None,
409                            nonce: None,
410                            signature: None,
411                            mmp_enabled: false,
412                            builder_code_address: None,
413                        },
414                        status: OrderUpdateStatus::Canceled,
415                        reason: Some("Instrument expired".to_string()),
416                        filled_size: dec!(0),
417                        order_id: Some(order_id),
418                        wallet_address: wallet,
419                        mmp_triggered: false,
420                        request_id: None,
421                    };
422
423                    // Remove from order index so the engine snapshot no longer
424                    // reports these as open orders.  Without this, the /orders
425                    // API continues to serve them and users see un-cancellable
426                    // ghost orders on expired instruments (CALL-553).
427                    ctx.order_index.remove_order(&wallet, order_id);
428
429                    cancelled_order_ids.push(order_id as i64);
430
431                    ctx.deps
432                        .event_sender
433                        .send(EngineMessage::OrderUpdate(cancel_message))
434                        .unwrap_or_else(|e| {
435                            panic!(
436                                "CRITICAL_FAILURE: Failed to send OrderUpdate (cancel) for order {} on symbol {}: {}. \
437                                 Order cancellation event lost during expiry. Restart required.",
438                                order_id, symbol, e
439                            )
440                        });
441                }
442
443                total_cancelled += order_count;
444            }
445
446            ctx.orderbooks.remove(symbol);
447        }
448
449        if let Some(handler) = ctx.db.as_ref() {
450            if let Err(e) = handler.transition_active_instruments_to_expired_pending_sync(symbols) {
451                panic!(
452                    "CRITICAL_FAILURE: Failed to update instrument status to EXPIRED_PENDING_PRICE for {:?}: {}. \
453                     Memory state shows instruments as expired but database disagrees. Restart required.",
454                    symbols, e
455                );
456            }
457
458            // Persist cancel status to order_infos so the DB reflects
459            // the in-memory state. Without this, settlement cancels go
460            // through the EventBus (WS delivery) but bypass the journal
461            // batcher, leaving order_infos with stale OPEN/ACKED status.
462            if !cancelled_order_ids.is_empty() {
463                for chunk in cancelled_order_ids.chunks(5000) {
464                    if let Err(e) =
465                        handler.batch_cancel_orders_for_settlement_sync(chunk, now_ms as i64)
466                    {
467                        error!(
468                            order_count = chunk.len(),
469                            "Failed to persist settlement order cancels to order_infos: {}. \
470                             Orders are cancelled in-memory but DB still shows them as open. \
471                             Will appear as orphaned orders in integrity checks.",
472                            e
473                        );
474                    }
475                }
476            }
477
478            // Defensive cleanup: cancel any non-terminal order_infos rows on
479            // these symbols that were NOT in the orderbook. This catches orders
480            // whose cancel was lost across restarts (journal archived before
481            // materialization). Without this, order_infos keeps stale OPEN
482            // status and the orphaned-settlement-orders canary fires (CALL-1066).
483            let symbol_list: Vec<String> = symbols.to_vec();
484            match handler.cancel_orphaned_orders_by_symbols_sync(&symbol_list) {
485                Ok(0) => {}
486                Ok(n) => {
487                    warn!(
488                        "Settlement defensive cleanup: cancelled {} orphaned order_infos rows \
489                         on {} settling instruments (these were not in the orderbook)",
490                        n,
491                        symbols.len()
492                    );
493                }
494                Err(e) => {
495                    error!(
496                        "Failed settlement defensive cleanup for {:?}: {}. \
497                         Orphaned orders may remain in order_infos.",
498                        symbols, e
499                    );
500                }
501            }
502        }
503
504        info!(
505            "Transitioned {} instruments to EXPIRED_PENDING_PRICE, cancelled {} orders",
506            symbols.len(),
507            total_cancelled
508        );
509    }
510
511    /// Update instrument status in database.
512    pub fn update_instrument_status(
513        &self,
514        symbols: &[String],
515        status: &str,
516        db: Option<&DatabaseHandler>,
517    ) {
518        if symbols.is_empty() {
519            return;
520        }
521        if let Some(handler) = db {
522            let update_result = if status == "EXPIRED_PENDING_PRICE" {
523                handler.transition_active_instruments_to_expired_pending_sync(symbols)
524            } else {
525                handler.update_instrument_status_sync(symbols, status)
526            };
527            if let Err(e) = update_result {
528                panic!(
529                    "CRITICAL_FAILURE: Failed to update instrument status to {} for {:?}: {}. \
530                     Memory state and database are now inconsistent. Restart required.",
531                    status, symbols, e
532                );
533            }
534        }
535        debug!("Updated {} instruments to status {}", symbols.len(), status);
536    }
537
538    /// Get all instruments pending settlement.
539    pub fn get_pending_settlement_instruments(
540        &self,
541        db: Option<&DatabaseHandler>,
542    ) -> Vec<(String, i64, Vec<String>)> {
543        let Some(handler) = db else {
544            return Vec::new();
545        };
546
547        let instruments = match handler.get_instruments_by_status_sync("EXPIRED_PENDING_PRICE") {
548            Ok(instruments) => instruments,
549            Err(e) => {
550                error!("Failed to query pending settlement instruments: {}", e);
551                return Vec::new();
552            }
553        };
554
555        let mut grouped: HashMap<(String, u64), Vec<String>> = HashMap::new();
556        for instrument in instruments {
557            let parsed =
558                parse_settlement_instrument_or_panic(&instrument.id, "pending settlement query");
559            grouped
560                .entry((parsed.underlying.clone(), parsed.expiry_ts as u64))
561                .or_default()
562                .push(instrument.id);
563        }
564
565        grouped
566            .into_iter()
567            .map(|((underlying, expiry_ts), symbols)| (underlying, expiry_ts as i64, symbols))
568            .collect()
569    }
570
571    /// Prepare deterministic context for a TickExpiry command.
572    ///
573    /// This is the runtime side of the TickExpiry boundary. It may read DB
574    /// state and settlement oracles; apply() must only consume the returned
575    /// context.
576    pub async fn prepare_tick_expiry_context(
577        &self,
578        now_ms: u64,
579        ctx: &EngineCtx,
580        margin_manager: &MarginManager,
581    ) -> Result<TickExpiryContext, String> {
582        let current_time = now_ms / 1000;
583        let due_expiries = self.collect_due_expiries(current_time, ctx.db.as_ref())?;
584
585        let pending_settlements: Vec<TickExpiryPendingGroup> = self
586            .get_pending_settlement_instruments(ctx.db.as_ref())
587            .into_iter()
588            .map(|(underlying, expiry_ts, symbols)| TickExpiryPendingGroup {
589                underlying,
590                expiry_ts,
591                symbols,
592            })
593            .collect();
594
595        let mut price_keys: BTreeSet<(String, i64)> = BTreeSet::new();
596        for group in &due_expiries {
597            for symbol in &group.symbols {
598                let parsed =
599                    parse_settlement_instrument_or_panic(symbol, "tick expiry price lookup");
600                price_keys.insert((parsed.underlying, group.expiry_ts));
601            }
602        }
603        for group in &pending_settlements {
604            price_keys.insert((group.underlying.clone(), group.expiry_ts));
605        }
606
607        let mut settlement_prices = Vec::new();
608        for (underlying, expiry_ts) in &price_keys {
609            if let Some(price) = self
610                .get_settlement_price(&ctx.deps, underlying, *expiry_ts)
611                .await
612            {
613                settlement_prices.push(TickExpirySettlementPrice {
614                    underlying: underlying.clone(),
615                    expiry_ts: *expiry_ts,
616                    price,
617                });
618            }
619        }
620
621        let priced_symbols = Self::priced_symbols_for_context(
622            &due_expiries,
623            &pending_settlements,
624            &settlement_prices,
625        );
626
627        let positions_requiring_margin_modes: Vec<(WalletAddress, String)> = ctx
628            .engine_positions
629            .iter()
630            .filter_map(|((wallet, symbol), position)| {
631                if priced_symbols.contains(symbol) && position.quantity != Decimal::ZERO {
632                    Some((*wallet, symbol.clone()))
633                } else {
634                    None
635                }
636            })
637            .collect();
638
639        let mut margin_modes_by_wallet = BTreeMap::new();
640        for (wallet, _symbol) in positions_requiring_margin_modes {
641            let margin_mode =
642                if let Some(margin_mode) = ctx.deps.wallet_margin_modes.get(&wallet).copied() {
643                    margin_mode
644                } else if let Some(handler) = ctx.db.as_ref() {
645                    handler.get_margin_mode_sync(&wallet).map_err(|error| {
646                        format!(
647                            "failed to load margin mode for expiry settlement wallet {}: {}",
648                            wallet, error
649                        )
650                    })?
651                } else {
652                    MarginMode::Standard
653                };
654            margin_modes_by_wallet.insert(wallet, margin_mode);
655        }
656
657        let margin_modes = margin_modes_by_wallet
658            .into_iter()
659            .map(|(wallet, margin_mode)| TickExpiryWalletMarginMode {
660                pm_settlement_required: margin_mode == MarginMode::Portfolio
661                    && ctx.deps.portfolio_margin_pool_enabled
662                    && ctx
663                        .deps
664                        .portfolio_margin_settlement_allowlist
665                        .contains(&wallet),
666                wallet,
667                margin_mode,
668            })
669            .collect::<Vec<_>>();
670        let pm_settlements = self.prepare_pm_settlements(
671            &due_expiries,
672            &pending_settlements,
673            &settlement_prices,
674            &margin_modes,
675            now_ms,
676            ctx,
677            margin_manager,
678        )?;
679
680        Ok(TickExpiryContext {
681            due_expiries,
682            pending_settlements,
683            settlement_prices,
684            margin_modes,
685            pm_settlements,
686        })
687    }
688
689    #[allow(clippy::too_many_arguments)]
690    pub(crate) fn prepare_pm_settlements(
691        &self,
692        due_expiries: &[TickExpiryDueGroup],
693        pending_settlements: &[TickExpiryPendingGroup],
694        settlement_prices: &[TickExpirySettlementPrice],
695        margin_modes: &[TickExpiryWalletMarginMode],
696        now_ms: u64,
697        ctx: &EngineCtx,
698        margin_manager: &MarginManager,
699    ) -> Result<Vec<TickExpiryPmSettlement>, String> {
700        if !ctx.deps.portfolio_margin_pool_enabled
701            || ctx.deps.portfolio_margin_settlement_allowlist.is_empty()
702        {
703            return Ok(Vec::new());
704        }
705
706        let margin_modes_by_wallet: HashMap<WalletAddress, MarginMode> = margin_modes
707            .iter()
708            .map(|mode| (mode.wallet, mode.margin_mode))
709            .collect();
710        let price_map: HashMap<(String, i64), Decimal> = settlement_prices
711            .iter()
712            .map(|price| ((price.underlying.clone(), price.expiry_ts), price.price))
713            .collect();
714        let mut symbol_expiries = Vec::new();
715        let mut seen_symbols = HashSet::new();
716        for group in due_expiries {
717            for symbol in Self::ordered_due_group_symbols(group) {
718                if seen_symbols.insert(symbol.clone()) {
719                    symbol_expiries.push((symbol, group.expiry_ts));
720                }
721            }
722        }
723        for group in pending_settlements {
724            for symbol in &group.symbols {
725                if seen_symbols.insert(symbol.clone()) {
726                    symbol_expiries.push((symbol.clone(), group.expiry_ts));
727                }
728            }
729        }
730
731        let mut precredited_positive_cash_by_wallet: HashMap<WalletAddress, Decimal> =
732            HashMap::new();
733        for (symbol, expiry_ts) in &symbol_expiries {
734            let instrument =
735                parse_settlement_instrument_or_panic(symbol, "PM settlement gain preparation");
736            let Some(reference_price) = price_map
737                .get(&(instrument.underlying.clone(), *expiry_ts))
738                .copied()
739            else {
740                continue;
741            };
742            let mut positions = ctx
743                .engine_positions
744                .iter()
745                .filter_map(|((wallet, position_symbol), position)| {
746                    if position_symbol == symbol && position.quantity != Decimal::ZERO {
747                        Some((*wallet, position.quantity, position.entry_price))
748                    } else {
749                        None
750                    }
751                })
752                .collect::<Vec<_>>();
753            positions.sort_by_key(|(wallet, _, _)| *wallet);
754
755            for (wallet, quantity, entry_price) in positions {
756                let margin_mode = margin_modes_by_wallet
757                    .get(&wallet)
758                    .copied()
759                    .or_else(|| ctx.deps.wallet_margin_modes.get(&wallet).copied())
760                    .unwrap_or(MarginMode::Standard);
761                if margin_mode != MarginMode::Portfolio
762                    || !ctx
763                        .deps
764                        .portfolio_margin_settlement_allowlist
765                        .contains(&wallet)
766                {
767                    continue;
768                }
769                let settlements = plan_position_settlements(
770                    &instrument,
771                    reference_price,
772                    &[SettlementPosition {
773                        wallet,
774                        position_size: quantity,
775                        entry_price,
776                    }],
777                );
778                let Some((_, settlement)) = settlements.into_iter().next() else {
779                    continue;
780                };
781                let cash_delta = expiry_cash_delta_for_margin_mode(
782                    MarginMode::Portfolio,
783                    settlement.settlement_value,
784                    settlement.net_pnl,
785                );
786                if cash_delta <= Decimal::ZERO {
787                    continue;
788                }
789                let (_, _, _, unavailable_reason) = self.prepare_pm_settlement_fact_bundle(
790                    wallet,
791                    &instrument.underlying,
792                    ctx.balance_ledger.balance(&wallet).max(Decimal::ZERO),
793                    ctx.pm_settlement_state.pools.get(&instrument.underlying),
794                    now_ms,
795                    ctx,
796                    margin_manager,
797                );
798                if unavailable_reason.is_none() {
799                    *precredited_positive_cash_by_wallet
800                        .entry(wallet)
801                        .or_default() += cash_delta;
802                }
803            }
804        }
805
806        let mut remaining_precredited_cash_by_wallet = precredited_positive_cash_by_wallet.clone();
807        let mut pm_settlements = Vec::new();
808        let mut settlement_event_sequence = 0_u64;
809        let mut available_liquid_by_wallet: HashMap<WalletAddress, Decimal> = HashMap::new();
810        let mut recoverable_by_wallet: HashMap<WalletAddress, Decimal> = HashMap::new();
811        let mut simulated_pools = ctx.pm_settlement_state.pools.clone();
812        for (symbol, expiry_ts) in symbol_expiries {
813            let instrument =
814                parse_settlement_instrument_or_panic(&symbol, "PM settlement fact preparation");
815            let Some(reference_price) = price_map
816                .get(&(instrument.underlying.clone(), expiry_ts))
817                .copied()
818            else {
819                continue;
820            };
821
822            let mut positions = ctx
823                .engine_positions
824                .iter()
825                .filter_map(|((wallet, position_symbol), position)| {
826                    if position_symbol == &symbol && position.quantity != Decimal::ZERO {
827                        Some((*wallet, position.quantity, position.entry_price))
828                    } else {
829                        None
830                    }
831                })
832                .collect::<Vec<_>>();
833            positions.sort_by_key(|(wallet, _, _)| *wallet);
834
835            for (wallet, quantity, entry_price) in positions {
836                let margin_mode = margin_modes_by_wallet
837                    .get(&wallet)
838                    .copied()
839                    .or_else(|| ctx.deps.wallet_margin_modes.get(&wallet).copied())
840                    .unwrap_or(MarginMode::Standard);
841                if margin_mode != MarginMode::Portfolio
842                    || !ctx
843                        .deps
844                        .portfolio_margin_settlement_allowlist
845                        .contains(&wallet)
846                {
847                    continue;
848                }
849
850                let settlements = plan_position_settlements(
851                    &instrument,
852                    reference_price,
853                    &[SettlementPosition {
854                        wallet,
855                        position_size: quantity,
856                        entry_price,
857                    }],
858                );
859                let Some((_, settlement)) = settlements.into_iter().next() else {
860                    continue;
861                };
862                let expiry_ts_ms = expiry_ts
863                    .checked_mul(1000)
864                    .ok_or_else(|| format!("expiry timestamp overflow for {symbol}"))?;
865                let obligation = PmSettlementObligation {
866                    wallet,
867                    market_id: symbol.clone(),
868                    expiry_ts_ms,
869                    underlying: instrument.underlying.clone(),
870                    net_pnl_usdc: settlement.net_pnl,
871                    settlement_obligation_usdc: Decimal::ZERO.max(-settlement.net_pnl),
872                };
873                let liquid_usdc = *available_liquid_by_wallet.entry(wallet).or_insert_with(|| {
874                    initial_pm_liquid_with_precredit(
875                        ctx.balance_ledger.balance(&wallet),
876                        precredited_positive_cash_by_wallet
877                            .get(&wallet)
878                            .copied()
879                            .unwrap_or(Decimal::ZERO),
880                    )
881                });
882                let event_key = PmSettlementEventKey {
883                    wallet,
884                    market_id: symbol.clone(),
885                    expiry_ts_ms,
886                    margin_mode: "portfolio".to_string(),
887                    settlement_event_sequence,
888                };
889                settlement_event_sequence =
890                    settlement_event_sequence.checked_add(1).ok_or_else(|| {
891                        "PM settlement event sequence overflow during expiry preparation"
892                            .to_string()
893                    })?;
894                let request_id = deterministic_pm_settlement_request_id(&event_key);
895                let (mut pm_facts, pool_snapshot, policy_version, unavailable_reason) = self
896                    .prepare_pm_settlement_fact_bundle(
897                        wallet,
898                        &instrument.underlying,
899                        liquid_usdc,
900                        simulated_pools.get(&instrument.underlying),
901                        now_ms,
902                        ctx,
903                        margin_manager,
904                    );
905                if let Some(facts) = pm_facts.as_mut() {
906                    let remaining_recoverable = recoverable_by_wallet
907                        .entry(wallet)
908                        .or_insert(facts.recoverable_collateral_usdc);
909                    facts.recoverable_collateral_usdc = *remaining_recoverable;
910                }
911                let facts_digest = digest_pm_settlement_inputs(
912                    &event_key,
913                    &obligation,
914                    liquid_usdc,
915                    pm_facts.as_ref(),
916                    pool_snapshot.as_ref(),
917                    policy_version,
918                    unavailable_reason.as_deref(),
919                )?;
920                let reserved_classification = match (
921                    unavailable_reason.as_ref(),
922                    pm_facts.as_ref(),
923                    pool_snapshot.as_ref(),
924                    simulated_pools.get(&instrument.underlying),
925                ) {
926                    (None, Some(facts), Some(snapshot), Some(pool)) => {
927                        pool.config.as_ref().and_then(|config| {
928                            classify_liquidity_gap(&obligation, facts, snapshot, config).ok()
929                        })
930                    }
931                    _ => None,
932                };
933                if let Some(classification) = reserved_classification {
934                    let planned_pool_front_usdc = if matches!(
935                        classification,
936                        PmLiquidityClassification::TimingBridge
937                            | PmLiquidityClassification::SettlementDebt
938                    ) {
939                        (obligation.settlement_obligation_usdc - liquid_usdc).max(Decimal::ZERO)
940                    } else {
941                        Decimal::ZERO
942                    };
943                    if classification != PmLiquidityClassification::Unavailable {
944                        if let Some(available) = available_liquid_by_wallet.get_mut(&wallet) {
945                            let settlement_cash_delta = expiry_cash_delta_for_margin_mode(
946                                MarginMode::Portfolio,
947                                settlement.settlement_value,
948                                settlement.net_pnl,
949                            );
950                            let effective_cash_delta = effective_pm_cash_delta_after_precredit(
951                                settlement_cash_delta,
952                                remaining_precredited_cash_by_wallet
953                                    .entry(wallet)
954                                    .or_default(),
955                            );
956                            *available = simulated_pm_liquid_after_settlement(
957                                *available,
958                                effective_cash_delta,
959                                planned_pool_front_usdc,
960                            );
961                        }
962                        if let Some(remaining_recoverable) = recoverable_by_wallet.get_mut(&wallet)
963                        {
964                            *remaining_recoverable = simulated_recoverable_after_pm_front(
965                                *remaining_recoverable,
966                                planned_pool_front_usdc,
967                            );
968                        }
969                    }
970                    if matches!(
971                        classification,
972                        PmLiquidityClassification::TimingBridge
973                            | PmLiquidityClassification::SettlementDebt
974                    ) {
975                        if planned_pool_front_usdc > Decimal::ZERO {
976                            if let Some(pool) = simulated_pools.get_mut(&instrument.underlying) {
977                                pool.pool_available_usdc -= planned_pool_front_usdc;
978                                match classification {
979                                    PmLiquidityClassification::TimingBridge => {
980                                        pool.active_timing_bridge_usdc += planned_pool_front_usdc;
981                                    }
982                                    PmLiquidityClassification::SettlementDebt => {
983                                        pool.active_settlement_debt_usdc += planned_pool_front_usdc;
984                                    }
985                                    PmLiquidityClassification::Paid
986                                    | PmLiquidityClassification::Unavailable => {}
987                                }
988                            }
989                        }
990                    }
991                }
992
993                pm_settlements.push(TickExpiryPmSettlement {
994                    request_id,
995                    event_key,
996                    wallet,
997                    market_id: symbol.clone(),
998                    underlying: instrument.underlying.clone(),
999                    expiry_ts_ms,
1000                    settlement_obligation_usdc: obligation.settlement_obligation_usdc,
1001                    liquid_usdc,
1002                    pm_facts,
1003                    pool_snapshot,
1004                    policy_version,
1005                    facts_digest,
1006                    unavailable_reason,
1007                    obligation,
1008                });
1009            }
1010        }
1011        Ok(pm_settlements)
1012    }
1013
1014    fn ordered_due_group_symbols(group: &TickExpiryDueGroup) -> Vec<String> {
1015        let mut underlying_symbols: BTreeMap<String, Vec<String>> = BTreeMap::new();
1016        for symbol in &group.symbols {
1017            let parsed = parse_settlement_instrument_or_panic(symbol, "tick expiry grouping");
1018            underlying_symbols
1019                .entry(parsed.underlying)
1020                .or_default()
1021                .push(symbol.clone());
1022        }
1023        underlying_symbols.into_values().flatten().collect()
1024    }
1025
1026    fn prepare_pm_settlement_fact_bundle(
1027        &self,
1028        wallet: WalletAddress,
1029        underlying: &str,
1030        liquid_usdc: Decimal,
1031        pool: Option<&crate::rsm::portfolio_margin::settlement_state::PmSettlementPoolState>,
1032        now_ms: u64,
1033        ctx: &EngineCtx,
1034        margin_manager: &MarginManager,
1035    ) -> (
1036        Option<PmAccountSettlementFacts>,
1037        Option<hypercall_margin::portfolio::PmSettlementPoolSnapshot>,
1038        u32,
1039        Option<String>,
1040    ) {
1041        let Some(pool) = pool else {
1042            return (
1043                None,
1044                None,
1045                0,
1046                Some(format!("missing PM settlement pool for {underlying}")),
1047            );
1048        };
1049        let Some(config) = pool.config.as_ref() else {
1050            return (
1051                None,
1052                Some(pool.snapshot()),
1053                pool.policy_version,
1054                Some(format!("missing PM settlement config for {underlying}")),
1055            );
1056        };
1057        let margin_details = match margin_manager.get_span_margin_for_wallet(
1058            &ctx.deps,
1059            &ctx.engine_positions,
1060            ctx.balance_ledger.balances(),
1061            &wallet,
1062        ) {
1063            Ok(Some(details)) => details,
1064            Ok(None) => {
1065                return (
1066                    None,
1067                    Some(pool.snapshot()),
1068                    config.policy_version,
1069                    Some(format!("missing PM margin details for {wallet}")),
1070                );
1071            }
1072            Err(error) => {
1073                return (
1074                    None,
1075                    Some(pool.snapshot()),
1076                    config.policy_version,
1077                    Some(format!("PM margin facts unavailable for {wallet}: {error}")),
1078                );
1079            }
1080        };
1081        let recoverable =
1082            (margin_details.equity - margin_details.maintenance_margin_required).max(Decimal::ZERO);
1083        (
1084            Some(PmAccountSettlementFacts {
1085                wallet,
1086                underlying: underlying.to_string(),
1087                liquid_usdc,
1088                pm_equity_usdc: margin_details.equity,
1089                pm_maintenance_requirement_usdc: margin_details.maintenance_margin_required,
1090                recoverable_collateral_usdc: recoverable,
1091                facts_as_of_ms: now_ms as i64,
1092                stale: false,
1093            }),
1094            Some(pool.snapshot()),
1095            config.policy_version,
1096            None,
1097        )
1098    }
1099
1100    fn collect_due_expiries(
1101        &self,
1102        current_time: u64,
1103        db: Option<&DatabaseHandler>,
1104    ) -> Result<Vec<TickExpiryDueGroup>, String> {
1105        let mut grouped: BTreeMap<u64, Vec<String>> = self
1106            .expiry_schedules
1107            .range(..=current_time)
1108            .map(|(expiry_ts, symbols)| (*expiry_ts, symbols.clone()))
1109            .collect();
1110        let mut scheduled_symbols: HashSet<String> = grouped.values().flatten().cloned().collect();
1111
1112        let Some(handler) = db else {
1113            return Ok(grouped
1114                .into_iter()
1115                .map(|(expiry_ts, symbols)| TickExpiryDueGroup {
1116                    expiry_ts: expiry_ts as i64,
1117                    symbols,
1118                })
1119                .collect());
1120        };
1121
1122        let expired_active = handler
1123            .get_active_instruments_expired_by_sync(current_time)
1124            .map_err(|error| {
1125                format!(
1126                    "Failed to query expired ACTIVE instruments during expiry reconciliation: {}",
1127                    error
1128                )
1129            })?;
1130
1131        let mut reconciled_count = 0usize;
1132        for instrument in expired_active {
1133            if !scheduled_symbols.insert(instrument.id.clone()) {
1134                continue;
1135            }
1136
1137            let expiry_code = u64::try_from(instrument.expiry).map_err(|_| {
1138                format!(
1139                    "Invalid negative expiry {} for active instrument {}",
1140                    instrument.expiry, instrument.id
1141                )
1142            })?;
1143            let expiry_ts = expiry_date_to_timestamp(&instrument.underlying, expiry_code);
1144            if expiry_ts == 0 {
1145                return Err(format!(
1146                    "Invalid expiry {} for active instrument {}",
1147                    instrument.expiry, instrument.id
1148                ));
1149            }
1150
1151            grouped.entry(expiry_ts).or_default().push(instrument.id);
1152            reconciled_count += 1;
1153        }
1154
1155        if reconciled_count > 0 {
1156            warn!(
1157                reconciled_count,
1158                "Expiry tick found expired ACTIVE instruments missing from in-memory schedule; \
1159                 routing them through TickExpiry reconciliation"
1160            );
1161        }
1162
1163        Ok(grouped
1164            .into_iter()
1165            .map(|(expiry_ts, symbols)| TickExpiryDueGroup {
1166                expiry_ts: expiry_ts as i64,
1167                symbols,
1168            })
1169            .collect())
1170    }
1171
1172    /// Apply a TickExpiry command using only explicit command context.
1173    pub fn apply_tick_expiry(
1174        &mut self,
1175        now_ms: u64,
1176        context: TickExpiryContext,
1177        ctx: &mut EngineCtx,
1178        margin_manager: &MarginManager,
1179        output: &mut ApplyOutput,
1180    ) -> Result<(), String> {
1181        let margin_modes = self.effective_tick_expiry_margin_modes(&context, ctx)?;
1182        let mut pm_settlement_required_wallets: HashSet<WalletAddress> = context
1183            .margin_modes
1184            .iter()
1185            .filter_map(|mode| mode.pm_settlement_required.then_some(mode.wallet))
1186            .collect();
1187        pm_settlement_required_wallets.extend(margin_modes.iter().filter_map(
1188            |(wallet, margin_mode)| {
1189                (*margin_mode == MarginMode::Portfolio
1190                    && ctx.deps.portfolio_margin_pool_enabled
1191                    && ctx
1192                        .deps
1193                        .portfolio_margin_settlement_allowlist
1194                        .contains(wallet))
1195                .then_some(*wallet)
1196            },
1197        ));
1198        let price_map: HashMap<(String, i64), Decimal> = context
1199            .settlement_prices
1200            .iter()
1201            .map(|price| ((price.underlying.clone(), price.expiry_ts), price.price))
1202            .collect();
1203        self.validate_tick_expiry_pm_settlements(now_ms, &context, ctx, margin_manager)?;
1204        let expected_pm_settlement_count = context.pm_settlements.len() as u64;
1205        let pm_settlements: HashMap<(WalletAddress, String), TickExpiryPmSettlement> = context
1206            .pm_settlements
1207            .into_iter()
1208            .map(|settlement| {
1209                (
1210                    (settlement.wallet, settlement.market_id.clone()),
1211                    settlement,
1212                )
1213            })
1214            .collect();
1215        let mut next_pm_settlement_sequence = 0_u64;
1216
1217        for group in context.due_expiries {
1218            let group_symbols: HashSet<&str> = group.symbols.iter().map(String::as_str).collect();
1219            let Some(schedule_expiry_ts) = u64::try_from(group.expiry_ts).ok() else {
1220                continue;
1221            };
1222            let remove_expiry_bucket = if let Some(scheduled_symbols) =
1223                self.expiry_schedules.get_mut(&schedule_expiry_ts)
1224            {
1225                scheduled_symbols.retain(|symbol| !group_symbols.contains(symbol.as_str()));
1226                scheduled_symbols.is_empty()
1227            } else {
1228                false
1229            };
1230            if remove_expiry_bucket {
1231                self.expiry_schedules.remove(&schedule_expiry_ts);
1232            }
1233            self.apply_transition_to_pending_settlement(&group.symbols, now_ms, ctx, output);
1234
1235            let mut underlying_symbols: BTreeMap<String, Vec<String>> = BTreeMap::new();
1236            for symbol in Self::ordered_due_group_symbols(&group) {
1237                let parsed = parse_settlement_instrument_or_panic(&symbol, "tick expiry grouping");
1238                underlying_symbols
1239                    .entry(parsed.underlying)
1240                    .or_default()
1241                    .push(symbol);
1242            }
1243
1244            let mut settled_symbols = Vec::new();
1245            for (underlying, symbols) in underlying_symbols {
1246                let expiry_ts = group.expiry_ts;
1247                let Some(price) = price_map.get(&(underlying.clone(), expiry_ts)).copied() else {
1248                    continue;
1249                };
1250                for symbol in &symbols {
1251                    let result = self.apply_expire_instrument(
1252                        symbol,
1253                        price,
1254                        now_ms,
1255                        ctx,
1256                        output,
1257                        &margin_modes,
1258                        &pm_settlement_required_wallets,
1259                        &pm_settlements,
1260                        &mut next_pm_settlement_sequence,
1261                    )?;
1262                    if result.settled && !result.blocked {
1263                        settled_symbols.push(symbol.clone());
1264                    }
1265                }
1266            }
1267            if !settled_symbols.is_empty() {
1268                output
1269                    .expiry_effects
1270                    .push(ExpiryEffect::UpdateInstrumentStatus {
1271                        symbols: settled_symbols,
1272                        status: "SETTLED".to_string(),
1273                    });
1274            }
1275        }
1276
1277        for group in context.pending_settlements {
1278            let Some(price) = price_map
1279                .get(&(group.underlying.clone(), group.expiry_ts))
1280                .copied()
1281            else {
1282                continue;
1283            };
1284
1285            let mut settled_symbols = Vec::new();
1286            for symbol in &group.symbols {
1287                let result = self.apply_expire_instrument(
1288                    symbol,
1289                    price,
1290                    now_ms,
1291                    ctx,
1292                    output,
1293                    &margin_modes,
1294                    &pm_settlement_required_wallets,
1295                    &pm_settlements,
1296                    &mut next_pm_settlement_sequence,
1297                )?;
1298                if result.settled && !result.blocked {
1299                    settled_symbols.push(symbol.clone());
1300                }
1301            }
1302            if !settled_symbols.is_empty() {
1303                output
1304                    .expiry_effects
1305                    .push(ExpiryEffect::CancelOrphanedOrdersBySymbols {
1306                        symbols: settled_symbols.clone(),
1307                    });
1308                output
1309                    .expiry_effects
1310                    .push(ExpiryEffect::UpdateInstrumentStatus {
1311                        symbols: settled_symbols,
1312                        status: "SETTLED".to_string(),
1313                    });
1314            }
1315        }
1316
1317        if next_pm_settlement_sequence != expected_pm_settlement_count {
1318            return Err(format!(
1319                "TickExpiry applied {} PM settlement classifications but command carried {}",
1320                next_pm_settlement_sequence, expected_pm_settlement_count
1321            ));
1322        }
1323
1324        Ok(())
1325    }
1326
1327    fn validate_tick_expiry_pm_settlements(
1328        &self,
1329        now_ms: u64,
1330        context: &TickExpiryContext,
1331        ctx: &EngineCtx,
1332        margin_manager: &MarginManager,
1333    ) -> Result<(), String> {
1334        let expected = self.prepare_pm_settlements(
1335            &context.due_expiries,
1336            &context.pending_settlements,
1337            &context.settlement_prices,
1338            &context.margin_modes,
1339            now_ms,
1340            ctx,
1341            margin_manager,
1342        )?;
1343        let mut pm_settlement_state = ctx.pm_settlement_state.clone();
1344
1345        let mut actual = HashMap::new();
1346        for settlement in &context.pm_settlements {
1347            let key = (settlement.wallet, settlement.market_id.clone());
1348            if actual.insert(key.clone(), settlement).is_some() {
1349                return Err(format!(
1350                    "duplicate PM settlement classification facts for wallet {} on {}",
1351                    key.0, key.1
1352                ));
1353            }
1354        }
1355
1356        for expected_settlement in &expected {
1357            let wallet = expected_settlement.wallet;
1358            let symbol = expected_settlement.market_id.as_str();
1359            let Some(settlement) = actual.get(&(wallet, expected_settlement.market_id.clone()))
1360            else {
1361                return Err(format!(
1362                    "missing PM settlement classification facts for required wallet {wallet} on {symbol}"
1363                ));
1364            };
1365            let actual_sequence = settlement.event_key.settlement_event_sequence;
1366            if actual_sequence != expected_settlement.event_key.settlement_event_sequence {
1367                return Err(format!(
1368                    "PM settlement application order mismatch for {}/{}: expected sequence {}, got {}",
1369                    wallet, symbol, expected_settlement.event_key.settlement_event_sequence, actual_sequence
1370                ));
1371            }
1372            validate_pm_settlement_matches_expiry(
1373                settlement,
1374                wallet,
1375                symbol,
1376                &expected_settlement.underlying,
1377                expected_settlement.expiry_ts_ms,
1378                expected_settlement.obligation.net_pnl_usdc,
1379            )?;
1380            validate_pm_settlement_matches_prepared(
1381                settlement,
1382                expected_settlement,
1383                wallet,
1384                symbol,
1385            )?;
1386            pm_settlement_state.apply_classify_settlement((*settlement).clone(), now_ms)?;
1387        }
1388
1389        if actual.len() != expected.len() {
1390            let expected_keys: HashSet<(WalletAddress, String)> = expected
1391                .iter()
1392                .map(|expected| (expected.wallet, expected.market_id.clone()))
1393                .collect();
1394            let mut extra = actual
1395                .keys()
1396                .filter(|key| !expected_keys.contains(*key))
1397                .collect::<Vec<_>>();
1398            extra.sort_by_key(|(wallet, symbol)| (*wallet, symbol.clone()));
1399            if let Some((wallet, symbol)) = extra.first() {
1400                return Err(format!(
1401                    "unexpected PM settlement classification facts for wallet {} on {}",
1402                    wallet, symbol
1403                ));
1404            }
1405            return Err(format!(
1406                "TickExpiry expected {} PM settlement classifications but command carried {}",
1407                expected.len(),
1408                actual.len()
1409            ));
1410        }
1411
1412        Ok(())
1413    }
1414
1415    fn effective_tick_expiry_margin_modes(
1416        &self,
1417        context: &TickExpiryContext,
1418        ctx: &EngineCtx,
1419    ) -> Result<HashMap<WalletAddress, MarginMode>, String> {
1420        let context_margin_modes: HashMap<WalletAddress, MarginMode> = context
1421            .margin_modes
1422            .iter()
1423            .map(|mode| (mode.wallet, mode.margin_mode))
1424            .collect();
1425        let mut margin_modes = HashMap::new();
1426        let priced_symbols = Self::priced_symbols_for_context(
1427            &context.due_expiries,
1428            &context.pending_settlements,
1429            &context.settlement_prices,
1430        );
1431
1432        for ((wallet, symbol), position) in &ctx.engine_positions {
1433            if !priced_symbols.contains(symbol) || position.quantity == Decimal::ZERO {
1434                continue;
1435            }
1436
1437            let engine_margin_mode = ctx.deps.wallet_margin_modes.get(wallet).copied();
1438            if let (Some(command_mode), Some(engine_mode)) = (
1439                context_margin_modes.get(wallet).copied(),
1440                engine_margin_mode,
1441            ) {
1442                if command_mode != engine_mode {
1443                    return Err(format!(
1444                        "TickExpiry margin mode mismatch for wallet {wallet}: command carried {command_mode:?}, engine state has {engine_mode:?}"
1445                    ));
1446                }
1447            }
1448            if margin_modes.contains_key(wallet) {
1449                continue;
1450            }
1451
1452            let margin_mode = engine_margin_mode
1453                .or_else(|| context_margin_modes.get(wallet).copied())
1454                .unwrap_or(MarginMode::Standard);
1455            margin_modes.insert(*wallet, margin_mode);
1456        }
1457
1458        Ok(margin_modes)
1459    }
1460
1461    fn priced_symbols_for_context(
1462        due_expiries: &[TickExpiryDueGroup],
1463        pending_settlements: &[TickExpiryPendingGroup],
1464        settlement_prices: &[TickExpirySettlementPrice],
1465    ) -> HashSet<String> {
1466        let priced_keys: HashSet<(String, i64)> = settlement_prices
1467            .iter()
1468            .map(|price| (price.underlying.clone(), price.expiry_ts))
1469            .collect();
1470        let mut priced_symbols = HashSet::new();
1471
1472        for group in due_expiries {
1473            for symbol in &group.symbols {
1474                let parsed = parse_settlement_instrument_or_panic(symbol, "priced expiry context");
1475                if priced_keys.contains(&(parsed.underlying, group.expiry_ts)) {
1476                    priced_symbols.insert(symbol.clone());
1477                }
1478            }
1479        }
1480        for group in pending_settlements {
1481            if priced_keys.contains(&(group.underlying.clone(), group.expiry_ts)) {
1482                priced_symbols.extend(group.symbols.iter().cloned());
1483            }
1484        }
1485
1486        priced_symbols
1487    }
1488
1489    fn apply_transition_to_pending_settlement(
1490        &mut self,
1491        symbols: &[String],
1492        now_ms: u64,
1493        ctx: &mut EngineCtx,
1494        output: &mut ApplyOutput,
1495    ) {
1496        if symbols.is_empty() {
1497            return;
1498        }
1499
1500        let mut cancelled_order_ids = Vec::new();
1501        for symbol in symbols {
1502            ctx.expired_instruments.insert(symbol.clone(), true);
1503
1504            if let Some(orderbook) = ctx.orderbooks.get(symbol) {
1505                let underlying =
1506                    parse_settlement_instrument_or_panic(symbol, "apply expiry transition")
1507                        .underlying;
1508
1509                for r in orderbook.get_all_orders() {
1510                    let cancel_message = OrderUpdateMessage {
1511                        timestamp: now_ms,
1512                        info: OrderInfo {
1513                            symbol: symbol.clone(),
1514                            price: r.price,
1515                            size: r.quantity,
1516                            side: r.side,
1517                            tif: hypercall_types::TimeInForce::GTC,
1518                            client_id: None,
1519                            order_id: Some(r.order_id),
1520                            is_perp: false,
1521                            underlying: Some(underlying.clone()),
1522                            reduce_only: None,
1523                            nonce: None,
1524                            signature: None,
1525                            mmp_enabled: false,
1526                            builder_code_address: None,
1527                        },
1528                        status: OrderUpdateStatus::Canceled,
1529                        reason: Some("Instrument expired".to_string()),
1530                        filled_size: dec!(0),
1531                        order_id: Some(r.order_id),
1532                        wallet_address: r.wallet,
1533                        mmp_triggered: false,
1534                        request_id: None,
1535                    };
1536                    ctx.order_index.remove_order(&r.wallet, r.order_id);
1537                    cancelled_order_ids.push(r.order_id as i64);
1538                    output.push(EngineMessage::OrderUpdate(cancel_message));
1539                }
1540            }
1541
1542            ctx.orderbooks.remove(symbol);
1543        }
1544
1545        output
1546            .expiry_effects
1547            .push(ExpiryEffect::UpdateInstrumentStatus {
1548                symbols: symbols.to_vec(),
1549                status: "EXPIRED_PENDING_PRICE".to_string(),
1550            });
1551        if !cancelled_order_ids.is_empty() {
1552            output
1553                .expiry_effects
1554                .push(ExpiryEffect::BatchCancelOrdersForSettlement {
1555                    order_ids: cancelled_order_ids,
1556                    now_ms,
1557                });
1558        }
1559        output
1560            .expiry_effects
1561            .push(ExpiryEffect::CancelOrphanedOrdersBySymbols {
1562                symbols: symbols.to_vec(),
1563            });
1564    }
1565
1566    fn apply_expire_instrument(
1567        &mut self,
1568        symbol: &str,
1569        reference_price: Decimal,
1570        now_ms: u64,
1571        ctx: &mut EngineCtx,
1572        output: &mut ApplyOutput,
1573        margin_modes: &HashMap<WalletAddress, MarginMode>,
1574        pm_settlement_required_wallets: &HashSet<WalletAddress>,
1575        pm_settlements: &HashMap<(WalletAddress, String), TickExpiryPmSettlement>,
1576        next_pm_settlement_sequence: &mut u64,
1577    ) -> Result<ExpireInstrumentApplyResult, String> {
1578        let instrument = SettlementInstrument::from_symbol(symbol)
1579            .map_err(|e| format!("Failed to parse symbol {}: {}", symbol, e))?;
1580
1581        let mut settlements_to_apply: Vec<SettlementPosition> = Vec::new();
1582        for ((wallet, sym), pos) in &ctx.engine_positions {
1583            if sym == symbol && pos.quantity != Decimal::ZERO {
1584                settlements_to_apply.push(SettlementPosition {
1585                    wallet: *wallet,
1586                    position_size: pos.quantity,
1587                    entry_price: pos.entry_price,
1588                });
1589            }
1590        }
1591        settlements_to_apply.sort_by_key(|position| position.wallet);
1592
1593        if settlements_to_apply.is_empty() && ctx.expired_instruments.get(symbol) == Some(&true) {
1594            return Ok(ExpireInstrumentApplyResult {
1595                settled: true,
1596                blocked: false,
1597            });
1598        }
1599
1600        ctx.expired_instruments.insert(symbol.to_string(), true);
1601
1602        let market = hypercall_types::Market {
1603            symbol: symbol.to_string(),
1604            underlying: instrument.underlying.clone(),
1605            expiry: instrument.expiry,
1606            strike: instrument.strike,
1607            option_type: match instrument.option_type {
1608                SettlementOptionType::Call => MessageOptionType::Call,
1609                SettlementOptionType::Put => MessageOptionType::Put,
1610            },
1611        };
1612
1613        ctx.orderbooks.remove(symbol);
1614
1615        let settlement_outputs =
1616            plan_position_settlements(&instrument, reference_price, &settlements_to_apply);
1617        let expiry_ts_ms = instrument
1618            .expiry_ts
1619            .checked_mul(1000)
1620            .ok_or_else(|| format!("expiry timestamp overflow for {symbol}"))?;
1621
1622        let mut result = ExpireInstrumentApplyResult::default();
1623        if settlement_outputs.is_empty() {
1624            result.settled = true;
1625        }
1626        for (position, out) in settlement_outputs {
1627            let wallet = position.wallet;
1628            let margin_mode = margin_modes
1629                .get(&wallet)
1630                .copied()
1631                .or_else(|| ctx.deps.wallet_margin_modes.get(&wallet).copied())
1632                .unwrap_or(MarginMode::Standard);
1633            let settlement_value = out.settlement_value;
1634            let cost_basis = out.cost_basis;
1635            let net_pnl = out.net_pnl;
1636            let pm_settlement = if margin_mode == MarginMode::Portfolio {
1637                pm_settlements.get(&(wallet, symbol.to_string()))
1638            } else {
1639                None
1640            };
1641            if margin_mode == MarginMode::Portfolio
1642                && pm_settlement_required_wallets.contains(&wallet)
1643                && pm_settlement.is_none()
1644            {
1645                return Err(format!(
1646                    "missing PM settlement classification facts for required wallet {wallet} on {symbol}"
1647                ));
1648            }
1649            let mut pool_front_usdc = Decimal::ZERO;
1650            if let Some(pm_settlement) = pm_settlement {
1651                validate_pm_settlement_matches_expiry(
1652                    pm_settlement,
1653                    wallet,
1654                    symbol,
1655                    &instrument.underlying,
1656                    expiry_ts_ms,
1657                    net_pnl,
1658                )?;
1659                let actual_sequence = pm_settlement.event_key.settlement_event_sequence;
1660                if actual_sequence != *next_pm_settlement_sequence {
1661                    return Err(format!(
1662                        "PM settlement application order mismatch for {}/{}: expected sequence {}, got {}",
1663                        wallet, symbol, *next_pm_settlement_sequence, actual_sequence
1664                    ));
1665                }
1666                *next_pm_settlement_sequence = (*next_pm_settlement_sequence)
1667                    .checked_add(1)
1668                    .ok_or_else(|| {
1669                        "PM settlement event sequence overflow during expiry apply".to_string()
1670                    })?;
1671                let effects = ctx
1672                    .pm_settlement_state
1673                    .apply_classify_settlement(pm_settlement.clone(), now_ms)?;
1674                output.pm_settlement_effects.extend(effects);
1675                let status = ctx
1676                    .pm_settlement_state
1677                    .events
1678                    .get(&pm_settlement.event_key)
1679                    .map(|event| event.status.as_str())
1680                    .unwrap_or("Unavailable");
1681                match status {
1682                    "Unavailable" => {
1683                        result.blocked = true;
1684                        continue;
1685                    }
1686                    "TimingBridge" | "SettlementDebt" => {
1687                        pool_front_usdc = (pm_settlement.settlement_obligation_usdc
1688                            - pm_settlement.liquid_usdc)
1689                            .max(Decimal::ZERO);
1690                    }
1691                    "Paid" => {}
1692                    other => {
1693                        return Err(format!("unknown PM settlement classification {other}"));
1694                    }
1695                }
1696            }
1697
1698            let cash_delta =
1699                expiry_cash_delta_for_margin_mode(margin_mode, settlement_value, net_pnl);
1700            apply_engine_balance_update(
1701                ctx,
1702                Some(output),
1703                wallet,
1704                cash_delta,
1705                hypercall_types::BalanceUpdateReason::Settlement,
1706                Some(symbol.to_string()),
1707                now_ms,
1708            );
1709            if pool_front_usdc > Decimal::ZERO {
1710                apply_engine_balance_update(
1711                    ctx,
1712                    Some(output),
1713                    wallet,
1714                    pool_front_usdc,
1715                    hypercall_types::BalanceUpdateReason::Settlement,
1716                    Some(format!("pm-settlement-pool-front:{symbol}")),
1717                    now_ms,
1718                );
1719            }
1720
1721            output
1722                .expiry_effects
1723                .push(ExpiryEffect::ApplySettlement(ExpirySettlementIntent {
1724                    wallet,
1725                    symbol: symbol.to_string(),
1726                    position_size: position.position_size,
1727                    settlement_price: out.intrinsic_value,
1728                    settlement_value,
1729                    margin_mode,
1730                    event_ts_ms: now_ms as i64,
1731                    settlement_entry_price: Some(position.entry_price),
1732                    cost_basis: Some(cost_basis),
1733                    net_pnl: Some(net_pnl),
1734                }));
1735
1736            output.push(EngineMessage::PositionExpired(
1737                build_position_expired_message(&instrument, &position, &out, margin_mode, now_ms),
1738            ));
1739
1740            ctx.engine_positions.remove(&(wallet, symbol.to_string()));
1741            result.settled = true;
1742        }
1743
1744        if result.settled && !result.blocked {
1745            output.push(EngineMessage::MarketUpdate(MarketUpdateMessage {
1746                market,
1747                status: MarketUpdateStatus::MarketExpired,
1748                timestamp: now_ms,
1749                reason: None,
1750            }));
1751        }
1752
1753        Ok(result)
1754    }
1755
1756    /// Check for expired instruments and process expiries.
1757    pub async fn check_and_process_expiries(&mut self, now_ms: u64, ctx: &mut EngineCtx) {
1758        let current_time = now_ms / 1000;
1759
1760        // Phase 1: Process newly expired instruments
1761        let expired_times: Vec<u64> = self
1762            .expiry_schedules
1763            .range(..=current_time)
1764            .map(|(time, _)| *time)
1765            .collect();
1766
1767        for expiry_time in expired_times {
1768            if let Some(symbols) = self.expiry_schedules.remove(&expiry_time) {
1769                info!(
1770                    "Processing expiry at {} for {} instruments",
1771                    expiry_time,
1772                    symbols.len()
1773                );
1774
1775                self.transition_to_pending_settlement(&symbols, now_ms, ctx);
1776
1777                let mut underlying_symbols: HashMap<String, Vec<String>> = HashMap::new();
1778                for symbol in &symbols {
1779                    let parsed =
1780                        parse_settlement_instrument_or_panic(symbol, "expiry price grouping");
1781                    underlying_symbols
1782                        .entry(parsed.underlying.clone())
1783                        .or_default()
1784                        .push(symbol.clone());
1785                }
1786
1787                for (underlying, option_symbols) in underlying_symbols {
1788                    let expiry_ts_i64 = expiry_time as i64;
1789                    if let Some(price) = self
1790                        .get_settlement_price(&ctx.deps, &underlying, expiry_ts_i64)
1791                        .await
1792                    {
1793                        let mut settled_symbols = Vec::new();
1794                        for symbol in &option_symbols {
1795                            if let Err(e) = self.expire_instrument(symbol, price, now_ms, ctx).await
1796                            {
1797                                panic!(
1798                                    "CRITICAL_FAILURE: Failed to expire instrument {}: {}. \
1799                                     Partial settlement is worse than no settlement. Restart required.",
1800                                    symbol, e
1801                                );
1802                            } else {
1803                                settled_symbols.push(symbol.clone());
1804                                record_settlement(&underlying, true);
1805                            }
1806                        }
1807                        if !settled_symbols.is_empty() {
1808                            self.update_instrument_status(
1809                                &settled_symbols,
1810                                "SETTLED",
1811                                ctx.db.as_ref(),
1812                            );
1813                        }
1814                    } else {
1815                        warn!(
1816                            "Settlement price not yet available for {} expiry {}, will retry",
1817                            underlying, expiry_ts_i64
1818                        );
1819                    }
1820                }
1821            }
1822        }
1823
1824        // Phase 2: Retry settlement for instruments pending from previous ticks/restarts
1825        let pending = self.get_pending_settlement_instruments(ctx.db.as_ref());
1826        for (underlying, expiry_ts, symbols) in pending {
1827            if let Some(price) = self
1828                .get_settlement_price(&ctx.deps, &underlying, expiry_ts)
1829                .await
1830            {
1831                info!(
1832                    "Retrying settlement for {} instruments of {} expiry {}",
1833                    symbols.len(),
1834                    underlying,
1835                    expiry_ts
1836                );
1837
1838                let mut settled_symbols = Vec::new();
1839                for symbol in &symbols {
1840                    if let Err(e) = self.expire_instrument(symbol, price, now_ms, ctx).await {
1841                        panic!(
1842                            "CRITICAL_FAILURE: Failed to expire instrument {} during retry: {}. \
1843                             Partial settlement is worse than no settlement. Restart required.",
1844                            symbol, e
1845                        );
1846                    } else {
1847                        settled_symbols.push(symbol.clone());
1848                        record_settlement(&underlying, true);
1849                    }
1850                }
1851                if !settled_symbols.is_empty() {
1852                    if let Some(ref handler) = ctx.db {
1853                        match handler.cancel_orphaned_orders_by_symbols_sync(&settled_symbols) {
1854                            Ok(0) => {}
1855                            Ok(n) => {
1856                                warn!(
1857                                    "Settlement retry cleanup: cancelled {} orphaned order_infos \
1858                                     rows on {} instruments",
1859                                    n,
1860                                    settled_symbols.len()
1861                                );
1862                            }
1863                            Err(e) => {
1864                                error!(
1865                                    "Failed settlement retry cleanup for {:?}: {}",
1866                                    settled_symbols, e
1867                                );
1868                            }
1869                        }
1870                    }
1871
1872                    self.update_instrument_status(&settled_symbols, "SETTLED", ctx.db.as_ref());
1873                }
1874            }
1875        }
1876    }
1877
1878    /// Expire a specific instrument and settle all positions.
1879    pub async fn expire_instrument(
1880        &mut self,
1881        symbol: &str,
1882        reference_price: Decimal,
1883        now_ms: u64,
1884        ctx: &mut EngineCtx,
1885    ) -> Result<(), String> {
1886        let parsed = ParsedSymbol::from_symbol(symbol)
1887            .map_err(|e| format!("Failed to parse symbol {}: {}", symbol, e))?;
1888
1889        let mut settlements_to_apply: Vec<(WalletAddress, Decimal, Decimal)> = Vec::new();
1890        for ((wallet, sym), pos) in &ctx.engine_positions {
1891            if sym == symbol && pos.quantity != Decimal::ZERO {
1892                settlements_to_apply.push((*wallet, pos.quantity, pos.entry_price));
1893            }
1894        }
1895        let has_positions = !settlements_to_apply.is_empty();
1896        let settlement_option_type = to_settlement_option_type(&parsed.option_type);
1897        let mut settlement_intents: Vec<(WalletAddress, SettlementApplyIntent)> = Vec::new();
1898
1899        for (wallet, position_size, settlement_entry_price) in settlements_to_apply {
1900            let margin_mode = margin_mode_for_settlement(ctx, wallet)?;
1901            let intent = build_settlement_apply_intent(SettlementApplyInput {
1902                wallet,
1903                symbol,
1904                option_type: settlement_option_type,
1905                strike: parsed.strike,
1906                reference_price,
1907                position_size,
1908                entry_price: settlement_entry_price,
1909                margin_mode,
1910                timestamp_ms: now_ms,
1911            });
1912            settlement_intents.push((wallet, intent));
1913        }
1914
1915        if !has_positions && ctx.expired_instruments.get(symbol) == Some(&true) {
1916            debug!(
1917                "Instrument {} already expired and settled (no positions), skipping",
1918                symbol
1919            );
1920            return Ok(());
1921        }
1922
1923        ctx.expired_instruments.insert(symbol.to_string(), true);
1924
1925        info!(
1926            "Expiring {} with reference price {}",
1927            symbol, reference_price
1928        );
1929
1930        let market = hypercall_types::Market {
1931            symbol: symbol.to_string(),
1932            underlying: parsed.underlying.clone(),
1933            expiry: parsed.expiry,
1934            strike: parsed.strike,
1935            option_type: match parsed.option_type {
1936                crate::types::OptionType::Call => MessageOptionType::Call,
1937                crate::types::OptionType::Put => MessageOptionType::Put,
1938            },
1939        };
1940
1941        let market_update = MarketUpdateMessage {
1942            market,
1943            status: MarketUpdateStatus::MarketExpired,
1944            timestamp: now_ms,
1945            reason: None,
1946        };
1947
1948        ctx.deps
1949            .event_sender
1950            .send(EngineMessage::MarketUpdate(market_update))
1951            .unwrap_or_else(|e| {
1952                panic!(
1953                    "CRITICAL_FAILURE: Failed to send MarketUpdate (expired) for symbol {}: {}. \
1954                     Market expiry event lost. Restart required.",
1955                    symbol, e
1956                )
1957            });
1958
1959        if ctx.orderbooks.contains_key(symbol) {
1960            ctx.orderbooks.remove(symbol);
1961        }
1962
1963        for (wallet, intent) in settlement_intents {
1964            if let Some(ref handler) = ctx.db {
1965                let settlement: &dyn hypercall_db::SettlementWriter = handler;
1966                let outcome = settlement
1967                    .try_apply_settlement_sync(
1968                        &wallet,
1969                        symbol,
1970                        intent.message.position_size,
1971                        intent.message.settlement_price,
1972                        intent.message.settlement_value,
1973                        intent.message.margin_mode,
1974                        now_ms as i64,
1975                        intent.message.settlement_entry_price,
1976                        intent.message.cost_basis,
1977                        intent.message.net_pnl,
1978                    )
1979                    .map_err(|e| {
1980                        format!(
1981                            "Failed to persist settlement for {}/{}: {}",
1982                            wallet, symbol, e
1983                        )
1984                    })?;
1985
1986                let balance_before = ctx.balance_ledger.balance(&wallet);
1987                let balance_after = apply_engine_balance_update(
1988                    ctx,
1989                    None,
1990                    wallet,
1991                    intent.cash_delta,
1992                    hypercall_types::BalanceUpdateReason::Settlement,
1993                    Some(symbol.to_string()),
1994                    now_ms,
1995                );
1996                debug!(
1997                    wallet = %wallet,
1998                    symbol = %symbol,
1999                    balance_before = %balance_before,
2000                    balance_after = %balance_after,
2001                    newly_persisted = outcome.newly_persisted,
2002                    "Applied settlement cashflow to balance_ledger after durable claim"
2003                );
2004
2005                if !outcome.newly_persisted {
2006                    info!(
2007                        "Settlement for {}/{} already durably claimed; applying local expiry state",
2008                        wallet, symbol
2009                    );
2010                }
2011            } else {
2012                let balance_before = ctx.balance_ledger.balance(&wallet);
2013                let balance_after = apply_engine_balance_update(
2014                    ctx,
2015                    None,
2016                    wallet,
2017                    intent.cash_delta,
2018                    hypercall_types::BalanceUpdateReason::Settlement,
2019                    Some(symbol.to_string()),
2020                    now_ms,
2021                );
2022                debug!(
2023                    wallet = %wallet,
2024                    symbol = %symbol,
2025                    cash_delta = %intent.cash_delta,
2026                    balance_before = %balance_before,
2027                    balance_after = %balance_after,
2028                    "Applied in-memory settlement cashflow to balance_ledger"
2029                );
2030            }
2031
2032            let expiry_msg = EngineMessage::PositionExpired(intent.message.clone());
2033
2034            ctx.deps
2035                .event_sender
2036                .send(expiry_msg.clone())
2037                .unwrap_or_else(|e| {
2038                    panic!(
2039                        "CRITICAL_FAILURE: Failed to send PositionExpired event for {}/{}: {}. \
2040                     Settlement event lost. Restart required.",
2041                        wallet, symbol, e
2042                    )
2043                });
2044            // Also send to direct WS channel (best-effort)
2045            if let Some(ref ws_tx) = ctx.deps.ws_event_sender {
2046                let _ = ws_tx.send(expiry_msg);
2047            }
2048
2049            ctx.engine_positions.remove(&(wallet, symbol.to_string()));
2050
2051            info!(
2052                "Position expired for account {}: symbol={}, size={}, settlement_price={}, settlement_value={}, settlement_entry_price={}, cost_basis={}, net_pnl={}",
2053                wallet,
2054                symbol,
2055                intent.message.position_size,
2056                intent.message.settlement_price,
2057                intent.message.settlement_value,
2058                intent
2059                    .message
2060                    .settlement_entry_price
2061                    .expect("settlement intent includes entry price"),
2062                intent.message.cost_basis.expect("settlement intent includes cost_basis"),
2063                intent.message.net_pnl.expect("settlement intent includes net_pnl")
2064            );
2065        }
2066
2067        info!("Successfully expired instrument: {}", symbol);
2068        Ok(())
2069    }
2070
2071    /// Register TWAP settlement windows for all active instruments.
2072    pub fn register_settlements_for_existing_instruments(
2073        &self,
2074        deps: &EngineDeps,
2075        orderbooks: &HashMap<String, hypercall_engine::OrderBook>,
2076    ) {
2077        if deps.mark_price_oracles.is_empty() {
2078            debug!("No oracles configured, skipping settlement registration");
2079            return;
2080        }
2081
2082        let mut registered = 0;
2083        for (symbol, orderbook) in orderbooks {
2084            let parsed =
2085                parse_settlement_instrument_or_panic(symbol, "existing settlement registration");
2086            let expiry_timestamp = expiry_date_to_timestamp(&parsed.underlying, orderbook.expiry);
2087            if let Some(oracle) = deps.mark_price_oracles.get(&parsed.underlying) {
2088                let expiry_ts_i64 = expiry_timestamp as i64;
2089                let oracle = oracle.clone();
2090                let symbol_clone = symbol.clone();
2091                tokio::spawn(async move {
2092                    oracle.register_settlement(expiry_ts_i64, 1800).await;
2093                    debug!(
2094                        "Registered TWAP settlement for {} at expiry {}",
2095                        symbol_clone, expiry_ts_i64
2096                    );
2097                });
2098                registered += 1;
2099            }
2100        }
2101
2102        info!(
2103            "Registered TWAP settlements for {} existing instruments",
2104            registered
2105        );
2106    }
2107}
2108
2109fn validate_pm_settlement_matches_expiry(
2110    pm_settlement: &TickExpiryPmSettlement,
2111    wallet: WalletAddress,
2112    symbol: &str,
2113    underlying: &str,
2114    expiry_ts_ms: i64,
2115    net_pnl_usdc: Decimal,
2116) -> Result<(), String> {
2117    let settlement_obligation_usdc = Decimal::ZERO.max(-net_pnl_usdc);
2118    if pm_settlement.wallet != wallet {
2119        return Err(format!(
2120            "PM settlement classification facts do not match expiry for {wallet}/{symbol}: wallet mismatch"
2121        ));
2122    }
2123    if pm_settlement.market_id != symbol {
2124        return Err(format!(
2125            "PM settlement classification facts do not match expiry for {wallet}/{symbol}: market_id mismatch"
2126        ));
2127    }
2128    if pm_settlement.underlying != underlying {
2129        return Err(format!(
2130            "PM settlement classification facts do not match expiry for {wallet}/{symbol}: underlying mismatch"
2131        ));
2132    }
2133    if pm_settlement.expiry_ts_ms != expiry_ts_ms {
2134        return Err(format!(
2135            "PM settlement classification facts do not match expiry for {wallet}/{symbol}: expiry_ts_ms mismatch"
2136        ));
2137    }
2138    if pm_settlement.settlement_obligation_usdc != settlement_obligation_usdc {
2139        return Err(format!(
2140            "PM settlement classification facts do not match expiry for {wallet}/{symbol}: settlement_obligation_usdc mismatch"
2141        ));
2142    }
2143    let obligation = &pm_settlement.obligation;
2144    if obligation.wallet != wallet
2145        || obligation.market_id != symbol
2146        || obligation.underlying != underlying
2147        || obligation.expiry_ts_ms != expiry_ts_ms
2148        || obligation.net_pnl_usdc != net_pnl_usdc
2149        || obligation.settlement_obligation_usdc != settlement_obligation_usdc
2150    {
2151        return Err(format!(
2152            "PM settlement classification facts do not match expiry for {wallet}/{symbol}: obligation mismatch"
2153        ));
2154    }
2155    if pm_settlement.event_key.wallet != wallet
2156        || pm_settlement.event_key.market_id != symbol
2157        || pm_settlement.event_key.expiry_ts_ms != expiry_ts_ms
2158    {
2159        return Err(format!(
2160            "PM settlement classification facts do not match expiry for {wallet}/{symbol}: event key mismatch"
2161        ));
2162    }
2163    Ok(())
2164}
2165
2166fn validate_pm_settlement_matches_prepared(
2167    pm_settlement: &TickExpiryPmSettlement,
2168    expected: &TickExpiryPmSettlement,
2169    wallet: WalletAddress,
2170    symbol: &str,
2171) -> Result<(), String> {
2172    if pm_settlement != expected {
2173        return Err(format!(
2174            "PM settlement classification facts do not match prepared PM facts/digest for {wallet}/{symbol}"
2175        ));
2176    }
2177    Ok(())
2178}
2179
2180#[cfg(test)]
2181mod tests {
2182    use super::*;
2183    use hypercall_types::wallet_address::test_wallet;
2184    use rust_decimal_macros::dec;
2185
2186    #[test]
2187    fn settlement_crate_matches_legacy_long() {
2188        let out = hypercall_settlement::settle_position(&hypercall_settlement::SettlementInput {
2189            option_type: SettlementOptionType::Call,
2190            strike: dec!(0),
2191            reference_price: dec!(130),
2192            position_size: dec!(2),
2193            entry_price: dec!(100),
2194        });
2195        assert_eq!(out.cost_basis, dec!(200));
2196        assert_eq!(out.net_pnl, dec!(60));
2197    }
2198
2199    #[test]
2200    fn settlement_crate_matches_legacy_short() {
2201        let out = hypercall_settlement::settle_position(&hypercall_settlement::SettlementInput {
2202            option_type: SettlementOptionType::Put,
2203            strike: dec!(1000),
2204            reference_price: dec!(910),
2205            position_size: dec!(-3),
2206            entry_price: dec!(120),
2207        });
2208        assert_eq!(out.cost_basis, dec!(-360));
2209        assert_eq!(out.net_pnl, dec!(90));
2210    }
2211
2212    #[test]
2213    fn settlement_apply_intent_uses_standard_terminal_cashflow() {
2214        let intent = build_settlement_apply_intent(SettlementApplyInput {
2215            wallet: test_wallet(1),
2216            symbol: "BTC-20261231-100000-C",
2217            option_type: SettlementOptionType::Call,
2218            strike: dec!(100000),
2219            reference_price: dec!(105000),
2220            position_size: dec!(1),
2221            entry_price: dec!(1000),
2222            margin_mode: MarginMode::Standard,
2223            timestamp_ms: 1777017633000,
2224        });
2225
2226        assert_eq!(intent.message.settlement_price, dec!(5000));
2227        assert_eq!(intent.message.settlement_value, dec!(5000));
2228        assert_eq!(intent.message.cost_basis, Some(dec!(1000)));
2229        assert_eq!(intent.message.net_pnl, Some(dec!(4000)));
2230        assert_eq!(intent.cash_delta, dec!(5000));
2231    }
2232
2233    #[test]
2234    fn settlement_apply_intent_uses_portfolio_net_pnl() {
2235        let intent = build_settlement_apply_intent(SettlementApplyInput {
2236            wallet: test_wallet(2),
2237            symbol: "BTC-20261231-100000-C",
2238            option_type: SettlementOptionType::Call,
2239            strike: dec!(100000),
2240            reference_price: dec!(99000),
2241            position_size: dec!(1),
2242            entry_price: dec!(1000),
2243            margin_mode: MarginMode::Portfolio,
2244            timestamp_ms: 1777017633000,
2245        });
2246
2247        assert_eq!(intent.message.settlement_price, dec!(0));
2248        assert_eq!(intent.message.settlement_value, dec!(0));
2249        assert_eq!(intent.message.cost_basis, Some(dec!(1000)));
2250        assert_eq!(intent.message.net_pnl, Some(dec!(-1000)));
2251        assert_eq!(intent.cash_delta, dec!(-1000));
2252    }
2253
2254    #[test]
2255    fn simulated_pm_liquid_reserves_same_tick_gains_for_later_losses() {
2256        let after_winner =
2257            simulated_pm_liquid_after_settlement(Decimal::ZERO, dec!(500), Decimal::ZERO);
2258        assert_eq!(after_winner, dec!(500));
2259
2260        let after_paid_loss =
2261            simulated_pm_liquid_after_settlement(after_winner, dec!(-400), Decimal::ZERO);
2262        assert_eq!(after_paid_loss, dec!(100));
2263    }
2264
2265    #[test]
2266    fn simulated_pm_liquid_includes_pool_front_for_bridge_and_debt() {
2267        let after_fronted_loss =
2268            simulated_pm_liquid_after_settlement(dec!(500), dec!(-900), dec!(400));
2269        assert_eq!(after_fronted_loss, Decimal::ZERO);
2270    }
2271
2272    #[test]
2273    fn initial_pm_liquid_precredit_consumes_existing_negative_cash() {
2274        assert_eq!(
2275            initial_pm_liquid_with_precredit(dec!(-300), dec!(500)),
2276            dec!(200)
2277        );
2278        assert_eq!(
2279            initial_pm_liquid_with_precredit(dec!(-700), dec!(500)),
2280            Decimal::ZERO
2281        );
2282        assert_eq!(
2283            initial_pm_liquid_with_precredit(dec!(100), dec!(500)),
2284            dec!(600)
2285        );
2286    }
2287
2288    #[test]
2289    fn precredited_pm_gain_nets_loss_when_losing_symbol_is_visited_first() {
2290        let mut remaining_precredited = dec!(500);
2291        let after_loss = simulated_pm_liquid_after_settlement(
2292            dec!(500),
2293            effective_pm_cash_delta_after_precredit(dec!(-400), &mut remaining_precredited),
2294            Decimal::ZERO,
2295        );
2296        assert_eq!(after_loss, dec!(100));
2297        assert_eq!(remaining_precredited, dec!(500));
2298
2299        let after_winner = simulated_pm_liquid_after_settlement(
2300            after_loss,
2301            effective_pm_cash_delta_after_precredit(dec!(500), &mut remaining_precredited),
2302            Decimal::ZERO,
2303        );
2304        assert_eq!(after_winner, dec!(100));
2305        assert_eq!(remaining_precredited, Decimal::ZERO);
2306    }
2307
2308    #[test]
2309    fn recoverable_reservation_uses_pool_front_not_gross_obligation() {
2310        assert_eq!(
2311            simulated_recoverable_after_pm_front(dec!(100), Decimal::ZERO),
2312            dec!(100)
2313        );
2314        assert_eq!(
2315            simulated_recoverable_after_pm_front(dec!(100), dec!(40)),
2316            dec!(60)
2317        );
2318        assert_eq!(
2319            simulated_recoverable_after_pm_front(dec!(100), dec!(140)),
2320            Decimal::ZERO
2321        );
2322    }
2323}