Skip to main content

hypercall/rsm/unified_engine/
matching.rs

1//! Order processing pipeline helpers for UnifiedEngine.
2
3use super::*;
4
/// Error payload produced when the orderbook reports that the incoming order
/// would trade against a resting order from the same wallet.
///
/// Carries the id of the resting (maker) order so callers can report it.
/// `Debug` is derived so a `Result<_, SelfTradeMatch>` can be logged with
/// `{:?}` or passed through `unwrap`/`expect`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct SelfTradeMatch {
    /// Order id of the resting order that the taker would have matched.
    maker_order_id: u64,
}
8
9#[derive(Default)]
10struct PmShortOptionQuantity {
11    underlying: String,
12    long_position_quantity: rust_decimal::Decimal,
13    short_position_quantity: rust_decimal::Decimal,
14    open_sell_quantity: rust_decimal::Decimal,
15}
16
17impl UnifiedEngine {
18    /// Get the next order ID from the global sequence counter
19    fn get_next_order_id(&mut self) -> u64 {
20        let order_id = self.ctx.next_order_id;
21        self.ctx.next_order_id += 1;
22        order_id
23    }
24
25    // ===== Helper methods for process_order refactoring =====
26
27    /// Validate order: check expiry, orderbook existence, and account funds.
28    pub(super) fn validate_order(
29        &self,
30        order_info: &OrderInfo,
31        wallet: &WalletAddress,
32    ) -> Result<(), String> {
33        let validation_start = Instant::now();
34
35        hypercall_engine::admission::validate_order_shape(order_info)?;
36
37        let mut expired_instruments: std::collections::HashSet<String> = self
38            .ctx
39            .expired_instruments
40            .iter()
41            .filter_map(|(symbol, expired)| expired.then_some(symbol.clone()))
42            .collect();
43        if self
44            .expiry_manager
45            .is_instrument_expired(&order_info.symbol, &self.ctx.orderbooks)
46        {
47            expired_instruments.insert(order_info.symbol.clone());
48        }
49        let open_symbols: std::collections::HashSet<String> =
50            self.ctx.orderbooks.keys().cloned().collect();
51
52        if let Err(reason) = hypercall_engine::admission::validate_instrument_open(
53            order_info,
54            &expired_instruments,
55            &open_symbols,
56        ) {
57            warn!(
58                "Order rejected during engine admission for client_id {:?}: {}",
59                order_info.client_id, reason
60            );
61            return Err(reason);
62        }
63
64        // Use get_risk_account() so PM users are checked against HyperCore
65        // equity rather than the (empty) engine balance ledger.
66        match self.margin_manager.get_risk_account(
67            &self.ctx.deps,
68            &self.ctx.engine_positions,
69            &self.ctx.balance_ledger,
70            wallet,
71        ) {
72            Ok(account) if account.cash <= 0.0 => {
73                let reason = "Account has no funds. Please deposit before trading.".to_string();
74                warn!("Order rejected for wallet {} - {}", wallet, reason);
75                return Err(reason);
76            }
77            Err(reason) => {
78                warn!("Order rejected for wallet {} - {}", wallet, reason);
79                return Err(reason);
80            }
81            Ok(_) => {}
82        }
83
84        histogram!("ht_engine_phase_seconds", "phase" => "validation")
85            .record(validation_start.elapsed().as_secs_f64());
86
87        Ok(())
88    }
89
90    /// Check order restrictions: pre-liquidation, margin, and tier.
91    pub(super) fn check_order_restrictions(
92        &self,
93        order_info: &OrderInfo,
94        wallet: &WalletAddress,
95    ) -> Result<(), String> {
96        // Order limits check (max open orders, max positions)
97        debug!(
98            "Checking order limits for wallet {} on symbol {}",
99            wallet, order_info.symbol
100        );
101        if let Err(limit_error) = self.check_order_limits(wallet, order_info) {
102            warn!(
103                "Order rejected due to order limits for client_id {:?}: {}",
104                order_info.client_id, limit_error
105            );
106            return Err(limit_error);
107        }
108
109        // Pre-liquidation check
110        if let Err(block_reason) = LiquidationManager::check_preliquidation_order_allowed(
111            &self.ctx.deps,
112            &self.ctx.engine_positions,
113            wallet,
114            order_info,
115        ) {
116            warn!(
117                "Order blocked due to pre-liquidation for client_id {:?}: {}",
118                order_info.client_id, block_reason
119            );
120            return Err(block_reason);
121        }
122
123        if let Err(pm_pool_error) =
124            self.check_pm_settlement_pool_gate_for_orders(wallet, std::slice::from_ref(order_info))
125        {
126            warn!(
127                "Order blocked by PM settlement pool gate for client_id {:?}: {}",
128                order_info.client_id, pm_pool_error
129            );
130            return Err(pm_pool_error);
131        }
132
133        // Margin check
134        debug!(
135            "Running unified margin check before order acceptance for client_id {:?}: {}",
136            order_info.client_id, wallet
137        );
138        if let Err(margin_error) = self.margin_manager.check_margin_for_order(
139            &self.ctx.deps,
140            &self.ctx.engine_positions,
141            &self.ctx.balance_ledger,
142            wallet,
143            order_info,
144            &self.ctx.order_index,
145        ) {
146            warn!(
147                "Order rejected due to insufficient margin for client_id {:?}: {}",
148                order_info.client_id, margin_error
149            );
150            return Err(margin_error);
151        }
152
153        // Tier restrictions check
154        debug!(
155            "Checking tier restrictions for wallet {} on symbol {}",
156            wallet, order_info.symbol
157        );
158        if let Err(tier_error) = self.margin_manager.check_tier_restrictions(
159            &self.ctx.deps,
160            &self.ctx.engine_positions,
161            &self.ctx.balance_ledger,
162            wallet,
163            order_info,
164            &self.ctx.order_index,
165        ) {
166            warn!(
167                "Order rejected due to tier restrictions for client_id {:?}: {}",
168                order_info.client_id, tier_error
169            );
170            return Err(tier_error);
171        }
172
173        Ok(())
174    }
175
176    /// Allocate order ID and send Acked response.
177    pub(super) async fn allocate_and_ack(
178        &mut self,
179        request: &UnifiedEngineRequest,
180        order_info: &OrderInfo,
181        wallet: &WalletAddress,
182        timestamp: u64,
183    ) -> AllocatedOrder {
184        let order_id = self.get_next_order_id();
185        info!(
186            "Pre-allocated order ID {} for order: symbol={}, wallet={}",
187            order_id, order_info.symbol, wallet
188        );
189
190        // Publish static order info
191        let order_info_msg = hypercall_types::OrderInfoMessage {
192            timestamp,
193            order_id,
194            wallet: *wallet,
195            info: order_info.clone(),
196        };
197        self.ctx
198            .deps
199            .emit_event(&EngineMessage::OrderInfo(order_info_msg));
200
201        // Send Acked response
202        let acked_response = OrderUpdateMessage {
203            timestamp,
204            info: order_info.clone(),
205            status: OrderUpdateStatus::Acked,
206            reason: Some("Order accepted and queued for matching".to_string()),
207            filled_size: dec!(0),
208            order_id: Some(order_id),
209            wallet_address: *wallet,
210            mmp_triggered: false,
211            request_id: request.message.request_id.clone(),
212        };
213
214        self.ctx
215            .deps
216            .emit_event(&EngineMessage::OrderUpdate(acked_response.clone()));
217
218        // Track in order index (synchronous, zero-lag)
219        self.ctx.order_index.add_order(
220            wallet,
221            hypercall_engine::order_index::OrderSummary {
222                order_id,
223                symbol: order_info.symbol.clone(),
224                side: order_info.side,
225                price: order_info.price,
226                original_size: order_info.size,
227                remaining_size: order_info.size,
228                is_perp: order_info.is_perp,
229                mmp_enabled: order_info.mmp_enabled,
230                client_id: order_info.client_id.clone(),
231                created_at: timestamp as i64,
232            },
233        );
234
235        debug!(
236            "Sending Acked response for client_id {:?}: {:?}",
237            request.message.info.client_id, acked_response
238        );
239        let _ = request.response_tx.send(acked_response).await;
240
241        AllocatedOrder { order_id }
242    }
243
244    pub(super) fn allocate_order_output(
245        &mut self,
246        message: &OrderActionMessage,
247        order_info: &OrderInfo,
248        wallet: &WalletAddress,
249        timestamp: u64,
250        output: &mut crate::rsm::apply::ApplyOutput,
251    ) -> u64 {
252        let order_id = self.get_next_order_id();
253        let order_info_msg = hypercall_types::OrderInfoMessage {
254            timestamp,
255            order_id,
256            wallet: *wallet,
257            info: order_info.clone(),
258        };
259        output.push(EngineMessage::OrderInfo(order_info_msg));
260
261        let acked_response = OrderUpdateMessage {
262            timestamp,
263            info: order_info.clone(),
264            status: OrderUpdateStatus::Acked,
265            reason: Some("Order accepted and queued for matching".to_string()),
266            filled_size: dec!(0),
267            order_id: Some(order_id),
268            wallet_address: *wallet,
269            mmp_triggered: false,
270            request_id: message.request_id.clone(),
271        };
272        output.push(EngineMessage::OrderUpdate(acked_response.clone()));
273        output.order_responses.push(acked_response);
274
275        self.ctx.order_index.add_order(
276            wallet,
277            hypercall_engine::order_index::OrderSummary {
278                order_id,
279                symbol: order_info.symbol.clone(),
280                side: order_info.side,
281                price: order_info.price,
282                original_size: order_info.size,
283                remaining_size: order_info.size,
284                is_perp: order_info.is_perp,
285                mmp_enabled: order_info.mmp_enabled,
286                client_id: order_info.client_id.clone(),
287                created_at: timestamp as i64,
288            },
289        );
290
291        order_id
292    }
293
294    /// Execute orderbook matching with MMP and self-trade handling.
295    pub(super) async fn execute_matching(
296        &mut self,
297        order_id: u64,
298        order_info: &OrderInfo,
299        wallet: &WalletAddress,
300        timestamp: u64,
301    ) -> MatchingResult {
302        let orderbook_start = Instant::now();
303        let orderbook = self.ctx.orderbooks.get_mut(&order_info.symbol).unwrap();
304
305        let underlying_symbol = if let Ok(parsed) = ParsedSymbol::from_symbol(&order_info.symbol) {
306            parsed.underlying
307        } else {
308            order_info
309                .underlying
310                .clone()
311                .unwrap_or(order_info.symbol.clone())
312        };
313
314        info!(
315            "Sending order to orderbook for incremental matching: symbol={}, price={}, size={}, side={:?}, underlying={}",
316            order_info.symbol, order_info.price, order_info.size, order_info.side, underlying_symbol
317        );
318
319        let mut remaining_quantity = order_info.size;
320        let mut all_fills = Vec::new();
321        let mut mmp_triggered = false;
322        let mut self_trade_maker_order_id: Option<u64> = None;
323
324        while remaining_quantity > dec!(0) {
325            let trade_id = self.ctx.next_trade_id;
326            self.ctx.next_trade_id += 1;
327
328            let next_l2_seq = self.ctx.l2_update_seq.fetch_add(1, Ordering::SeqCst) + 1;
329            orderbook.set_pending_l2_sequence(next_l2_seq);
330
331            let (match_result, should_continue) = orderbook.process_order_with_metadata(
332                order_id,
333                order_info.price,
334                remaining_quantity,
335                order_info.side,
336                *wallet,
337                timestamp,
338                trade_id,
339                order_info.client_id.clone(),
340                order_info.mmp_enabled,
341                order_info.size, // original_size from the request
342            );
343
344            match Self::fill_from_match_result(match_result) {
345                Ok(Some(mut fill)) => {
346                    let price_f64 = fill
347                        .price
348                        .to_f64()
349                        .expect("CRITICAL_FAILURE: fill.price is not representable as f64");
350                    let size_u64 = fill.size.trunc().to_u64().expect(
351                        "CRITICAL_FAILURE: fill.size is not representable as u64 contract units",
352                    );
353                    let fee_calc = self.fee_service.get_fees(
354                        &fill.maker_wallet_address.as_hex(),
355                        &fill.taker_wallet_address.as_hex(),
356                        price_f64,
357                        size_u64,
358                        order_info.builder_code_address.as_ref(),
359                    );
360
361                    fill.fee = Decimal::from_f64_retain(fee_calc.taker_fee)
362                        .expect("CRITICAL_FAILURE: taker_fee is not representable as Decimal");
363                    fill.builder_code_address = order_info.builder_code_address;
364                    fill.builder_code_fee = fee_calc.builder_code_fee;
365
366                    all_fills.push(fill.clone());
367                    remaining_quantity -= fill.size;
368
369                    // Update order index: taker and maker fills
370                    self.ctx.order_index.fill_order(wallet, order_id, fill.size);
371                    self.ctx.order_index.fill_order(
372                        &fill.maker_wallet_address,
373                        fill.maker_order_id,
374                        fill.size,
375                    );
376
377                    // Check MMP if enabled -- synchronous, engine-internal state only.
378                    // Fail closed: if the wallet has mmp_enabled but no state entry
379                    // exists (e.g. after restart before config replay), reject the fill.
380                    if order_info.mmp_enabled {
381                        let mmp_key = (*wallet, underlying_symbol.clone());
382                        let mmp_state_entry = self.ctx.mmp_state.get_mut(&mmp_key);
383                        if let Some(mmp_state) = mmp_state_entry {
384                            if mmp_state.enabled {
385                                // Compute greeks synchronously from engine-owned
386                                // spot prices and IV surfaces. Fail closed: if
387                                // greeks cannot be computed for an options fill,
388                                // trigger MMP to prevent untracked risk.
389                                let greeks_result =
390                                    hypercall_engine::greeks::compute_fill_greeks_sync(
391                                        &self.ctx.spot_prices,
392                                        |underlying, strike, expiry_ts, spot| {
393                                            self.ctx.iv_surfaces.get(underlying).and_then(
394                                                |surface| {
395                                                    surface.get(strike, expiry_ts).or_else(|| {
396                                                        surface.get_interpolated_with_spot(
397                                                            strike,
398                                                            expiry_ts,
399                                                            Some(spot),
400                                                        )
401                                                    })
402                                                },
403                                            )
404                                        },
405                                        &fill.symbol,
406                                        fill.size,
407                                        fill.taker_side,
408                                        timestamp,
409                                    );
410
411                                let (delta, vega) = match greeks_result {
412                                    Some(dv) => dv,
413                                    None => {
414                                        // Missing market data for an options fill.
415                                        // Fail closed: trigger MMP rather than
416                                        // allowing untracked risk.
417                                        warn!(
418                                        "MMP TRIGGERED (fail closed): wallet={}, underlying={}, reason=greeks unavailable for {}",
419                                        wallet, underlying_symbol, fill.symbol
420                                    );
421                                        mmp_triggered = true;
422                                        self.handle_mmp_trigger(
423                                            wallet,
424                                            &underlying_symbol,
425                                            "MMP greeks unavailable -- fail closed".to_string(),
426                                            timestamp,
427                                        )
428                                        .await;
429                                        break;
430                                    }
431                                };
432
433                                let size_contract_units = fill
434                                .size
435                                .trunc()
436                                .to_u64()
437                                .expect(
438                                    "CRITICAL_FAILURE: fill.size is not representable as u64 contract units",
439                                );
440
441                                let record = crate::rsm::engine_deps::MmpFillRecord {
442                                    timestamp_ms: timestamp,
443                                    quantity: size_contract_units,
444                                    delta,
445                                    vega,
446                                };
447
448                                match mmp_state.process_fill(record, timestamp) {
449                                    Ok(()) => {
450                                        debug!(
451                                        "MMP check passed for fill: wallet={}, underlying={}, fill_qty={}",
452                                        wallet, underlying_symbol, fill.size
453                                    );
454                                    }
455                                    Err(trigger_reason) => {
456                                        warn!(
457                                            "MMP TRIGGERED: wallet={}, underlying={}, reason={}",
458                                            wallet, underlying_symbol, trigger_reason
459                                        );
460                                        mmp_triggered = true;
461                                        self.handle_mmp_trigger(
462                                            wallet,
463                                            &underlying_symbol,
464                                            trigger_reason,
465                                            timestamp,
466                                        )
467                                        .await;
468                                        break;
469                                    }
470                                }
471                            }
472                        } else {
473                            warn!(
474                                "MMP TRIGGERED (fail closed): wallet={}, underlying={}, reason=missing MMP state",
475                                wallet, underlying_symbol
476                            );
477                            mmp_triggered = true;
478                            self.handle_mmp_trigger(
479                                wallet,
480                                &underlying_symbol,
481                                "MMP state missing -- fail closed".to_string(),
482                                timestamp,
483                            )
484                            .await;
485                            break;
486                        }
487                    }
488                }
489                Err(self_trade) => {
490                    warn!(
491                        "SELF-TRADE DETECTED: taker_order={}, maker_order={}, wallet={}, symbol={}",
492                        order_id, self_trade.maker_order_id, wallet, order_info.symbol
493                    );
494                    self_trade_maker_order_id = Some(self_trade.maker_order_id);
495                    break;
496                }
497                Ok(None) => {}
498            }
499
500            if !should_continue {
501                break;
502            }
503        }
504
505        // Add remaining quantity to book if needed (GTC only; IOC/FOK never rest)
506        if remaining_quantity > dec!(0)
507            && !mmp_triggered
508            && !all_fills.is_empty()
509            && self_trade_maker_order_id.is_none()
510            && order_info.tif == TimeInForce::GTC
511        {
512            let orderbook = self.ctx.orderbooks.get_mut(&order_info.symbol).unwrap();
513            let next_l2_seq = self.ctx.l2_update_seq.fetch_add(1, Ordering::SeqCst) + 1;
514            orderbook.set_pending_l2_sequence(next_l2_seq);
515            orderbook.add_order_to_book_with_events_full(
516                order_id,
517                order_info.price,
518                remaining_quantity,
519                order_info.side,
520                *wallet,
521                timestamp,
522                order_info.client_id.clone(),
523                order_info.mmp_enabled,
524                order_info.size, // original_size from the request
525            );
526        }
527
528        histogram!("ht_engine_phase_seconds", "phase" => "orderbook_match")
529            .record(orderbook_start.elapsed().as_secs_f64());
530
531        let filled_size: Decimal = all_fills.iter().map(|f| f.size).sum();
532        MatchingResult {
533            fills: all_fills,
534            filled_size,
535            mmp_triggered,
536            self_trade_maker_id: self_trade_maker_order_id,
537        }
538    }
539
540    pub(super) fn execute_matching_sync(
541        &mut self,
542        order_id: u64,
543        order_info: &OrderInfo,
544        wallet: &WalletAddress,
545        timestamp: u64,
546        output: &mut crate::rsm::apply::ApplyOutput,
547    ) -> MatchingResult {
548        let underlying_symbol = if let Ok(parsed) = ParsedSymbol::from_symbol(&order_info.symbol) {
549            parsed.underlying
550        } else {
551            order_info
552                .underlying
553                .clone()
554                .unwrap_or(order_info.symbol.clone())
555        };
556
557        let mut remaining_quantity = order_info.size;
558        let mut all_fills = Vec::new();
559        let mut mmp_triggered = false;
560        let mut self_trade_maker_order_id: Option<u64> = None;
561
562        while remaining_quantity > dec!(0) {
563            let trade_id = self.ctx.next_trade_id;
564            self.ctx.next_trade_id += 1;
565
566            let orderbook = self
567                .ctx
568                .orderbooks
569                .get_mut(&order_info.symbol)
570                .expect("validated orderbook disappeared during apply");
571            let next_l2_seq = self.ctx.l2_update_seq.fetch_add(1, Ordering::SeqCst) + 1;
572            orderbook.set_pending_l2_sequence(next_l2_seq);
573
574            let (match_result, should_continue) = orderbook.process_order_with_metadata(
575                order_id,
576                order_info.price,
577                remaining_quantity,
578                order_info.side,
579                *wallet,
580                timestamp,
581                trade_id,
582                order_info.client_id.clone(),
583                order_info.mmp_enabled,
584                order_info.size,
585            );
586
587            match Self::fill_from_match_result(match_result) {
588                Ok(Some(mut fill)) => {
589                    let price_f64 = fill
590                        .price
591                        .to_f64()
592                        .expect("CRITICAL_FAILURE: fill.price is not representable as f64");
593                    let size_u64 = fill.size.trunc().to_u64().expect(
594                        "CRITICAL_FAILURE: fill.size is not representable as u64 contract units",
595                    );
596                    let fee_calc = self.fee_service.get_fees(
597                        &fill.maker_wallet_address.as_hex(),
598                        &fill.taker_wallet_address.as_hex(),
599                        price_f64,
600                        size_u64,
601                        order_info.builder_code_address.as_ref(),
602                    );
603                    fill.fee = Decimal::from_f64_retain(fee_calc.taker_fee)
604                        .expect("CRITICAL_FAILURE: taker_fee is not representable as Decimal");
605                    fill.builder_code_address = order_info.builder_code_address;
606                    fill.builder_code_fee = fee_calc.builder_code_fee;
607
608                    remaining_quantity -= fill.size;
609                    self.ctx.order_index.fill_order(wallet, order_id, fill.size);
610                    self.ctx.order_index.fill_order(
611                        &fill.maker_wallet_address,
612                        fill.maker_order_id,
613                        fill.size,
614                    );
615
616                    if order_info.mmp_enabled
617                        && self.mmp_should_trigger(wallet, &underlying_symbol, &fill, timestamp)
618                    {
619                        mmp_triggered = true;
620                        self.handle_mmp_trigger_output(
621                            wallet,
622                            &underlying_symbol,
623                            "MMP triggered during fill processing".to_string(),
624                            timestamp,
625                            output,
626                        );
627                        all_fills.push(fill);
628                        break;
629                    }
630
631                    all_fills.push(fill);
632                }
633                Err(self_trade) => {
634                    self_trade_maker_order_id = Some(self_trade.maker_order_id);
635                    break;
636                }
637                Ok(None) => {}
638            }
639
640            if !should_continue {
641                break;
642            }
643        }
644
645        if remaining_quantity > dec!(0)
646            && !mmp_triggered
647            && !all_fills.is_empty()
648            && self_trade_maker_order_id.is_none()
649            && order_info.tif == TimeInForce::GTC
650        {
651            let orderbook = self.ctx.orderbooks.get_mut(&order_info.symbol).unwrap();
652            let next_l2_seq = self.ctx.l2_update_seq.fetch_add(1, Ordering::SeqCst) + 1;
653            orderbook.set_pending_l2_sequence(next_l2_seq);
654            orderbook.add_order_to_book_with_events_full(
655                order_id,
656                order_info.price,
657                remaining_quantity,
658                order_info.side,
659                *wallet,
660                timestamp,
661                order_info.client_id.clone(),
662                order_info.mmp_enabled,
663                order_info.size,
664            );
665        }
666
667        let filled_size: Decimal = all_fills.iter().map(|f| f.size).sum();
668        MatchingResult {
669            fills: all_fills,
670            filled_size,
671            mmp_triggered,
672            self_trade_maker_id: self_trade_maker_order_id,
673        }
674    }
675
676    fn fill_from_match_result(
677        match_result: MatchResult,
678    ) -> Result<Option<hypercall_types::Fill>, SelfTradeMatch> {
679        match match_result {
680            MatchResult::Fill(fill) => Ok(Some(fill)),
681            MatchResult::SelfTrade { maker_order_id } => Err(SelfTradeMatch { maker_order_id }),
682            MatchResult::NoMatch => Ok(None),
683        }
684    }
685
686    fn mmp_should_trigger(
687        &mut self,
688        wallet: &WalletAddress,
689        underlying_symbol: &str,
690        fill: &hypercall_types::Fill,
691        timestamp: u64,
692    ) -> bool {
693        let Some(mmp_state) = self
694            .ctx
695            .mmp_state
696            .get_mut(&(*wallet, underlying_symbol.to_string()))
697        else {
698            warn!(
699                "MMP TRIGGERED (fail closed): wallet={}, underlying={}, reason=missing MMP state",
700                wallet, underlying_symbol
701            );
702            return true;
703        };
704        if !mmp_state.enabled {
705            return false;
706        }
707        let Some((delta, vega)) = hypercall_engine::greeks::compute_fill_greeks_sync(
708            &self.ctx.spot_prices,
709            |underlying, strike, expiry_ts, spot| {
710                self.ctx.iv_surfaces.get(underlying).and_then(|surface| {
711                    surface.get(strike, expiry_ts).or_else(|| {
712                        surface.get_interpolated_with_spot(strike, expiry_ts, Some(spot))
713                    })
714                })
715            },
716            &fill.symbol,
717            fill.size,
718            fill.taker_side,
719            timestamp,
720        ) else {
721            warn!(
722                "MMP TRIGGERED (fail closed): wallet={}, underlying={}, reason=greeks unavailable for {}",
723                wallet, underlying_symbol, fill.symbol
724            );
725            return true;
726        };
727        let quantity = fill
728            .size
729            .trunc()
730            .to_u64()
731            .expect("CRITICAL_FAILURE: fill.size is not representable as u64 contract units");
732        let record = crate::rsm::engine_deps::MmpFillRecord {
733            timestamp_ms: timestamp,
734            quantity,
735            delta,
736            vega,
737        };
738        mmp_state.process_fill(record, timestamp).is_err()
739    }
740
741    fn record_fill_metrics(result: &MatchingResult) {
742        let fill_count = result.fills.len();
743        if fill_count == 0 {
744            return;
745        }
746
747        counter!("ht_engine_fills_total").increment(fill_count as u64);
748
749        let total_fill_value: Decimal = result
750            .fills
751            .iter()
752            .map(|f| {
753                let size_human = to_human_readable_decimal(&f.symbol, f.size);
754                f.price * size_human
755            })
756            .sum();
757        if let Some(value_f64) = total_fill_value.to_f64() {
758            let value_usd = value_f64.max(0.0).round() as u64;
759            if value_usd > 0 {
760                counter!("ht_engine_fill_value_usd_total").increment(value_usd);
761            }
762        }
763    }
764
765    /// Finalize order: determine status, record metrics, emit events.
766    pub(super) async fn finalize_order(
767        &mut self,
768        request: &UnifiedEngineRequest,
769        order_id: u64,
770        order_info: &OrderInfo,
771        wallet: &WalletAddress,
772        timestamp: u64,
773        result: &MatchingResult,
774    ) {
775        let fill_count = result.fills.len();
776
777        Self::record_fill_metrics(result);
778
779        info!(
780            "Order processed: client_id {:?}: order_id={}, fills_count={}, filled_size={}, order_size={}, mmp_triggered={}",
781            order_info.client_id, order_id, fill_count, result.filled_size, order_info.size, result.mmp_triggered
782        );
783
784        // Determine order status
785        let status = if result.mmp_triggered && result.filled_size < order_info.size {
786            info!(
787                "Order status: CANCELED (MMP triggered after {} fills)",
788                result.filled_size
789            );
790            // MMP canceled — remove taker order from index (remaining qty won't rest)
791            self.ctx.order_index.remove_order(wallet, order_id);
792            OrderUpdateStatus::Canceled
793        } else if result.self_trade_maker_id.is_some() && result.filled_size < order_info.size {
794            info!(
795                "Order status: CANCELED (self-trade prevention, maker_order_id={:?}, filled_size={}, order_size={})",
796                result.self_trade_maker_id, result.filled_size, order_info.size
797            );
798            // Self-trade prevention canceled remaining quantity, so order must not remain open.
799            self.ctx.order_index.remove_order(wallet, order_id);
800            OrderUpdateStatus::Canceled
801        } else if result.filled_size >= order_info.size {
802            info!(
803                "Order status: FILLED (filled_size={} >= order_size={})",
804                result.filled_size, order_info.size
805            );
806            OrderUpdateStatus::Filled
807        } else if result.filled_size > dec!(0) && order_info.tif != TimeInForce::GTC {
808            info!(
809                "Order status: CANCELED (IOC/FOK partial fill, filled_size={}, order_size={})",
810                result.filled_size, order_info.size
811            );
812            self.ctx.order_index.remove_order(wallet, order_id);
813            OrderUpdateStatus::Canceled
814        } else if result.filled_size > dec!(0) {
815            info!(
816                "Order status: PARTIALLY_FILLED (filled_size={}, order_size={})",
817                result.filled_size, order_info.size
818            );
819            OrderUpdateStatus::PartiallyFilled
820        } else if order_info.tif != TimeInForce::GTC {
821            info!("Order status: CANCELED (IOC/FOK, no fills)");
822            self.ctx.order_index.remove_order(wallet, order_id);
823            if let Some(ob) = self.ctx.orderbooks.get_mut(&order_info.symbol) {
824                ob.cancel_order(order_id);
825            }
826            OrderUpdateStatus::Canceled
827        } else {
828            info!("Order status: OPEN (no fills, filled_size=0)");
829            OrderUpdateStatus::Open
830        };
831
832        // Emit order update event
833        let update_msg = OrderUpdateMessage {
834            timestamp,
835            info: order_info.clone(),
836            status,
837            reason: if result.mmp_triggered {
838                Some("MMP triggered during fill processing".to_string())
839            } else if result.self_trade_maker_id.is_some() {
840                Some("Self-trade prevention triggered".to_string())
841            } else {
842                None
843            },
844            filled_size: result.filled_size,
845            order_id: Some(order_id),
846            wallet_address: *wallet,
847            mmp_triggered: result.mmp_triggered,
848            request_id: request.message.request_id.clone(),
849        };
850
851        info!(
852            "Sending OrderUpdate event: client_id {:?}: order_id={}, status={:?}, filled_size={}",
853            order_info.client_id, order_id, status, result.filled_size
854        );
855        self.ctx
856            .deps
857            .emit_event(&EngineMessage::OrderUpdate(update_msg.clone()));
858    }
859
860    pub(super) fn finalize_order_output(
861        &mut self,
862        message: &OrderActionMessage,
863        order_id: u64,
864        order_info: &OrderInfo,
865        wallet: &WalletAddress,
866        timestamp: u64,
867        result: &MatchingResult,
868        output: &mut crate::rsm::apply::ApplyOutput,
869    ) {
870        Self::record_fill_metrics(result);
871
872        let forced_cancel = (result.mmp_triggered || result.self_trade_maker_id.is_some())
873            && result.filled_size < order_info.size;
874        let status = if forced_cancel {
875            self.ctx.order_index.remove_order(wallet, order_id);
876            OrderUpdateStatus::Canceled
877        } else if result.filled_size >= order_info.size {
878            OrderUpdateStatus::Filled
879        } else if result.filled_size > dec!(0) && order_info.tif != TimeInForce::GTC {
880            self.ctx.order_index.remove_order(wallet, order_id);
881            OrderUpdateStatus::Canceled
882        } else if result.filled_size > dec!(0) {
883            OrderUpdateStatus::PartiallyFilled
884        } else if order_info.tif != TimeInForce::GTC {
885            self.ctx.order_index.remove_order(wallet, order_id);
886            if let Some(ob) = self.ctx.orderbooks.get_mut(&order_info.symbol) {
887                ob.cancel_order(order_id);
888            }
889            OrderUpdateStatus::Canceled
890        } else {
891            OrderUpdateStatus::Open
892        };
893
894        let update_msg = OrderUpdateMessage {
895            timestamp,
896            info: order_info.clone(),
897            status,
898            reason: if result.mmp_triggered {
899                Some("MMP triggered during fill processing".to_string())
900            } else if result.self_trade_maker_id.is_some() {
901                Some("Self-trade prevention triggered".to_string())
902            } else {
903                None
904            },
905            filled_size: result.filled_size,
906            order_id: Some(order_id),
907            wallet_address: *wallet,
908            mmp_triggered: result.mmp_triggered,
909            request_id: message.request_id.clone(),
910        };
911        output.push(EngineMessage::OrderUpdate(update_msg.clone()));
912        output.order_responses.push(update_msg);
913    }
914
915    pub(super) fn flush_orderbook_events(&mut self, output: &mut crate::rsm::apply::ApplyOutput) {
916        let drained = self.drain_orderbook_events();
917        for event in self.account_for_events(drained, output) {
918            output.push(event);
919        }
920    }
921
922    pub(super) fn handle_mmp_trigger_output(
923        &mut self,
924        wallet: &WalletAddress,
925        underlying_symbol: &str,
926        trigger_reason: String,
927        timestamp: u64,
928        output: &mut crate::rsm::apply::ApplyOutput,
929    ) {
930        output.push(EngineMessage::MmpTriggered(
931            hypercall_types::MmpTriggeredMessage {
932                wallet: *wallet,
933                currency: underlying_symbol.to_string(),
934                reason: trigger_reason,
935                timestamp,
936            },
937        ));
938
939        for (order_id, symbol) in self
940            .ctx
941            .order_index
942            .get_mmp_order_ids(wallet, underlying_symbol)
943        {
944            let cancel_info = OrderInfo {
945                symbol,
946                price: dec!(0),
947                size: dec!(0),
948                side: hypercall_types::Side::Buy,
949                tif: TimeInForce::GTC,
950                client_id: None,
951                order_id: Some(order_id),
952                is_perp: false,
953                underlying: None,
954                reduce_only: None,
955                nonce: None,
956                signature: None,
957                mmp_enabled: true,
958                builder_code_address: None,
959            };
960            let cancel_message = OrderActionMessage {
961                timestamp,
962                info: cancel_info,
963                action: OrderAction::CancelOrder,
964                wallet: *wallet,
965                api_wallet_address: None,
966                mmp_triggered: true,
967                request_id: Some(format!("mmp:{}:{}:{}", timestamp, wallet, order_id)),
968            };
969            self.cancel_order_output(cancel_message, timestamp, output, false);
970        }
971    }
972
973    pub(super) fn handle_self_trade_prevention_output(
974        &mut self,
975        message: &OrderActionMessage,
976        taker_order_id: u64,
977        maker_order_id: u64,
978        order_info: &OrderInfo,
979        wallet: &WalletAddress,
980        partial_fills: &[hypercall_types::Fill],
981        timestamp: u64,
982        output: &mut crate::rsm::apply::ApplyOutput,
983    ) {
984        let symbol = &order_info.symbol;
985        let filled_size: Decimal = partial_fills.iter().map(|f| f.size).sum();
986
987        if let Some(orderbook) = self.ctx.orderbooks.get_mut(symbol) {
988            let next_l2_seq = self.ctx.l2_update_seq.fetch_add(1, Ordering::SeqCst) + 1;
989            orderbook.set_pending_l2_sequence(next_l2_seq);
990            if let Some(canceled) = orderbook.cancel_order(maker_order_id) {
991                orderbook.emit_orderbook_events(timestamp);
992                self.ctx.order_index.remove_order(wallet, maker_order_id);
993                output.push(EngineMessage::OrderUpdate(OrderUpdateMessage {
994                    timestamp,
995                    info: OrderInfo {
996                        symbol: symbol.clone(),
997                        price: canceled.price,
998                        size: canceled.quantity,
999                        side: canceled.side,
1000                        tif: TimeInForce::GTC,
1001                        client_id: None,
1002                        order_id: Some(maker_order_id),
1003                        is_perp: order_info.is_perp,
1004                        underlying: order_info.underlying.clone(),
1005                        reduce_only: None,
1006                        nonce: None,
1007                        signature: None,
1008                        mmp_enabled: false,
1009                        builder_code_address: None,
1010                    },
1011                    status: OrderUpdateStatus::Canceled,
1012                    reason: Some("Self-trade prevention".to_string()),
1013                    filled_size: dec!(0),
1014                    order_id: Some(maker_order_id),
1015                    wallet_address: *wallet,
1016                    mmp_triggered: false,
1017                    request_id: None,
1018                }));
1019            }
1020        }
1021
1022        self.ctx.order_index.remove_order(wallet, taker_order_id);
1023        let (status, reason) = if filled_size > dec!(0) {
1024            (
1025                OrderUpdateStatus::Canceled,
1026                "Self-trade prevention (partial fill kept)",
1027            )
1028        } else {
1029            (OrderUpdateStatus::Rejected, "Self-trade prevention")
1030        };
1031        let taker_response = OrderUpdateMessage {
1032            timestamp,
1033            info: order_info.clone(),
1034            status,
1035            reason: Some(reason.to_string()),
1036            filled_size,
1037            order_id: Some(taker_order_id),
1038            wallet_address: *wallet,
1039            mmp_triggered: false,
1040            request_id: message.request_id.clone(),
1041        };
1042        output.push(EngineMessage::OrderUpdate(taker_response.clone()));
1043        output.order_responses.push(taker_response);
1044    }
1045
1046    /// Check if order would exceed open order or position limits.
1047    ///
1048    /// Enforces per-tier limits on:
1049    /// - `max_open_orders`: Maximum number of active (open/acked/partial) orders
1050    /// - `max_open_positions`: Maximum number of distinct instruments with positions
1051    ///
1052    /// Returns Ok(()) if within limits, Err(reason) if limits exceeded.
1053    pub(super) fn check_order_limits(
1054        &self,
1055        wallet: &WalletAddress,
1056        order_info: &OrderInfo,
1057    ) -> Result<(), String> {
1058        let limits = self
1059            .ctx
1060            .deps
1061            .wallet_trading_limits
1062            .get(wallet)
1063            .cloned()
1064            .unwrap_or(self.ctx.deps.default_trading_limits);
1065
1066        hypercall_engine::admission::validate_order_limits(
1067            wallet,
1068            order_info,
1069            &self.ctx.order_index,
1070            &self.ctx.engine_positions,
1071            hypercall_engine::admission::TradingLimits {
1072                max_open_orders: limits.max_open_orders,
1073                max_open_positions: limits.max_open_positions,
1074            },
1075        )
1076    }
1077
1078    pub(crate) fn check_pm_settlement_pool_gate_for_orders(
1079        &self,
1080        wallet: &WalletAddress,
1081        orders: &[OrderInfo],
1082    ) -> Result<(), String> {
1083        self.check_pm_settlement_pool_gate_for_order_groups(&[(wallet, orders)])
1084    }
1085
1086    pub(crate) fn check_pm_settlement_pool_gate_for_order_groups(
1087        &self,
1088        order_groups: &[(&WalletAddress, &[OrderInfo])],
1089    ) -> Result<(), String> {
1090        if !self.ctx.deps.portfolio_margin_pool_enabled {
1091            return Ok(());
1092        }
1093
1094        let mut combined_exposure =
1095            std::collections::BTreeMap::<String, rust_decimal::Decimal>::new();
1096        let mut wallet_exposures = Vec::new();
1097        for (wallet, orders) in order_groups {
1098            if !self.is_pm_settlement_pool_eligible_wallet(wallet)? {
1099                continue;
1100            }
1101
1102            let risk_increasing_exposure =
1103                self.pm_risk_increasing_short_option_exposure_usdc(wallet, orders)?;
1104            if risk_increasing_exposure.is_empty() {
1105                continue;
1106            }
1107            for (underlying, notional_usdc) in &risk_increasing_exposure {
1108                *combined_exposure.entry(underlying.clone()).or_default() += *notional_usdc;
1109            }
1110            wallet_exposures.push((**wallet, risk_increasing_exposure));
1111        }
1112
1113        if combined_exposure.is_empty() {
1114            return Ok(());
1115        }
1116
1117        let now_ms = self.ctx.deps.margin_timestamp_s.saturating_mul(1000);
1118        for (wallet, risk_increasing_exposure) in wallet_exposures {
1119            for underlying in risk_increasing_exposure.keys() {
1120                let incoming_short_notional_usdc = *combined_exposure
1121                    .get(underlying)
1122                    .expect("wallet exposure underlying must be included in combined exposure");
1123                let pool = self
1124                    .ctx
1125                    .pm_settlement_state
1126                    .pools
1127                    .get(underlying)
1128                    .ok_or_else(|| {
1129                        format!(
1130                            "PM settlement gate rejected {underlying}: missing settlement pool facts"
1131                        )
1132                    })?;
1133                let config = pool.config.as_ref().ok_or_else(|| {
1134                    format!(
1135                        "PM settlement gate rejected {underlying}: missing settlement pool config"
1136                    )
1137                })?;
1138
1139                let account = self.ctx.pm_settlement_state.accounts.get(
1140                    &crate::rsm::portfolio_margin::settlement_state::PmSettlementAccountKey {
1141                        wallet,
1142                        underlying: underlying.clone(),
1143                    },
1144                );
1145                let account_bridge_usdc = account
1146                    .map(|account| account.bridge_principal_usdc)
1147                    .unwrap_or(rust_decimal::Decimal::ZERO);
1148                let account_debt_usdc = account
1149                    .map(|account| account.debt_principal_usdc)
1150                    .unwrap_or(rust_decimal::Decimal::ZERO);
1151                let bridge_overdue = account
1152                    .and_then(|account| account.bridge_deadline_ms)
1153                    .is_some_and(|deadline| {
1154                        account_bridge_usdc > rust_decimal::Decimal::ZERO && deadline < now_ms
1155                    });
1156                let current_short_option_oi_usdc =
1157                    self.pm_current_short_option_oi_usdc(underlying)?;
1158                let active_liability_usdc =
1159                    pool.active_timing_bridge_usdc + pool.active_settlement_debt_usdc;
1160                let projected_short_oi_target_usdc = config.target_short_oi_notional_multiplier
1161                    * (current_short_option_oi_usdc + incoming_short_notional_usdc);
1162                let projected_pool_target_usdc =
1163                    active_liability_usdc.max(projected_short_oi_target_usdc);
1164                let projected_active_liability_usdc = pool.active_timing_bridge_usdc
1165                    + pool.active_settlement_debt_usdc
1166                    + incoming_short_notional_usdc;
1167                let current_capacity_usdc = pool.pool_available_usdc + active_liability_usdc;
1168                let projected_post_order_utilization = (current_capacity_usdc
1169                    > rust_decimal::Decimal::ZERO)
1170                    .then_some(projected_active_liability_usdc / current_capacity_usdc);
1171
1172                let context = hypercall_engine::margin_admission::PmAdmissionSettlementContext {
1173                    wallet,
1174                    underlying: underlying.clone(),
1175                    pool_available_usdc: pool.pool_available_usdc,
1176                    pool_target_usdc: projected_pool_target_usdc,
1177                    active_timing_bridge_usdc: pool.active_timing_bridge_usdc,
1178                    active_settlement_debt_usdc: pool.active_settlement_debt_usdc,
1179                    current_utilization: pool.utilization,
1180                    projected_post_order_utilization,
1181                    account_bridge_usdc,
1182                    account_debt_usdc,
1183                    bridge_overdue,
1184                    facts_as_of_ms: pool.updated_at_ms as i64,
1185                    policy_version: config.policy_version,
1186                    normal_utilization_cap: config.normal_utilization_cap,
1187                    crisis_utilization_cap: config.crisis_utilization_cap,
1188                };
1189
1190                if let hypercall_engine::margin_admission::MarginAdmissionDecision::Rejected(
1191                    reason,
1192                ) = hypercall_engine::margin_admission::decide_portfolio_margin(
1193                    hypercall_engine::margin_admission::PortfolioMarginAdmissionInput {
1194                        is_reduce_only: false,
1195                        available_collateral: rust_decimal::Decimal::ZERO,
1196                        margin_required: rust_decimal::Decimal::ZERO,
1197                        settlement_context: Some(context),
1198                    },
1199                ) {
1200                    return Err(format!(
1201                        "PM settlement gate rejected {underlying}: {reason}"
1202                    ));
1203                }
1204            }
1205        }
1206
1207        Ok(())
1208    }
1209
1210    fn pm_current_short_option_oi_usdc(
1211        &self,
1212        underlying: &str,
1213    ) -> Result<rust_decimal::Decimal, String> {
1214        let mut quantities =
1215            std::collections::BTreeMap::<(WalletAddress, String), PmShortOptionQuantity>::new();
1216
1217        for ((wallet, symbol), position) in &self.ctx.engine_positions {
1218            if !self.is_pm_settlement_pool_eligible_wallet(wallet)? {
1219                continue;
1220            }
1221            let Ok(parsed) = hypercall_engine::instrument::ParsedInstrument::parse(symbol) else {
1222                continue;
1223            };
1224            if parsed.underlying != underlying {
1225                continue;
1226            }
1227
1228            let quantity = quantities.entry((*wallet, symbol.clone())).or_default();
1229            quantity.underlying = parsed.underlying;
1230            if position.quantity < rust_decimal::Decimal::ZERO {
1231                quantity.short_position_quantity = position.quantity.abs();
1232            } else {
1233                quantity.long_position_quantity = position.quantity;
1234            }
1235        }
1236
1237        for (wallet, orders) in self.ctx.order_index.snapshot_orders() {
1238            if !self.is_pm_settlement_pool_eligible_wallet(&wallet)? {
1239                continue;
1240            }
1241            for order in orders {
1242                if order.is_perp
1243                    || !matches!(order.side, Side::Sell)
1244                    || order.remaining_size <= rust_decimal::Decimal::ZERO
1245                {
1246                    continue;
1247                }
1248                let Ok(parsed) =
1249                    hypercall_engine::instrument::ParsedInstrument::parse(&order.symbol)
1250                else {
1251                    continue;
1252                };
1253                if parsed.underlying != underlying {
1254                    continue;
1255                }
1256
1257                let quantity = quantities.entry((wallet, order.symbol)).or_default();
1258                quantity.underlying = parsed.underlying;
1259                // OrderIndex normalizes option sizes to human contract units when orders are added.
1260                quantity.open_sell_quantity += order.remaining_size;
1261            }
1262        }
1263
1264        let mut short_option_oi_usdc = rust_decimal::Decimal::ZERO;
1265        for quantity in quantities.values() {
1266            let short_open_order_quantity = (quantity.open_sell_quantity
1267                - quantity.long_position_quantity)
1268                .max(rust_decimal::Decimal::ZERO);
1269            let aggregate_short_quantity =
1270                quantity.short_position_quantity + short_open_order_quantity;
1271            if aggregate_short_quantity <= rust_decimal::Decimal::ZERO {
1272                continue;
1273            }
1274
1275            let spot_price = self
1276                .ctx
1277                .spot_prices
1278                .get(&quantity.underlying)
1279                .copied()
1280                .ok_or_else(|| {
1281                    format!(
1282                        "PM settlement gate rejected {}: missing spot price facts",
1283                        quantity.underlying
1284                    )
1285                })?;
1286            if spot_price <= rust_decimal::Decimal::ZERO {
1287                return Err(format!(
1288                    "PM settlement gate rejected {}: nonpositive spot price {}",
1289                    quantity.underlying, spot_price
1290                ));
1291            }
1292            short_option_oi_usdc += spot_price * aggregate_short_quantity;
1293        }
1294
1295        Ok(short_option_oi_usdc)
1296    }
1297
1298    fn is_pm_settlement_pool_eligible_wallet(
1299        &self,
1300        wallet: &WalletAddress,
1301    ) -> Result<bool, String> {
1302        if !self.ctx.deps.portfolio_margin_pool_enabled
1303            || !self
1304                .ctx
1305                .deps
1306                .portfolio_margin_settlement_allowlist
1307                .contains(wallet)
1308        {
1309            return Ok(false);
1310        }
1311
1312        Ok(self
1313            .margin_manager
1314            .get_margin_mode(&self.ctx.deps, wallet)?
1315            == crate::rsm::margin_mode::MarginMode::Portfolio)
1316    }
1317
1318    fn pm_risk_increasing_short_option_exposure_usdc(
1319        &self,
1320        wallet: &WalletAddress,
1321        orders: &[OrderInfo],
1322    ) -> Result<std::collections::BTreeMap<String, rust_decimal::Decimal>, String> {
1323        let mut delta_by_symbol =
1324            std::collections::BTreeMap::<String, (String, rust_decimal::Decimal)>::new();
1325        for order in orders {
1326            if order.is_perp {
1327                continue;
1328            }
1329            let Ok(parsed) = hypercall_engine::instrument::ParsedInstrument::parse(&order.symbol)
1330            else {
1331                continue;
1332            };
1333            let size_human = hypercall_types::to_human_readable_decimal(&order.symbol, order.size);
1334            let signed_delta = match order.side {
1335                Side::Buy => size_human,
1336                Side::Sell => -size_human,
1337            };
1338            let entry = delta_by_symbol
1339                .entry(order.symbol.clone())
1340                .or_insert_with(|| (parsed.underlying, rust_decimal::Decimal::ZERO));
1341            entry.1 += signed_delta;
1342        }
1343
1344        let mut exposure_by_underlying = std::collections::BTreeMap::new();
1345        for (symbol, (underlying, signed_delta)) in delta_by_symbol {
1346            if signed_delta >= rust_decimal::Decimal::ZERO {
1347                continue;
1348            }
1349            let existing_quantity = self
1350                .ctx
1351                .engine_positions
1352                .get(&(*wallet, symbol.clone()))
1353                .map(|position| position.quantity)
1354                .unwrap_or(rust_decimal::Decimal::ZERO);
1355            let effective_quantity =
1356                existing_quantity - self.pm_resting_open_sell_quantity(wallet, &symbol);
1357            let existing_short_quantity = (-effective_quantity).max(rust_decimal::Decimal::ZERO);
1358            let projected_quantity = effective_quantity + signed_delta;
1359            let projected_short_quantity = (-projected_quantity).max(rust_decimal::Decimal::ZERO);
1360            let risk_increasing_short_quantity = projected_short_quantity - existing_short_quantity;
1361            if risk_increasing_short_quantity <= rust_decimal::Decimal::ZERO {
1362                continue;
1363            }
1364
1365            let spot_price = self
1366                .ctx
1367                .spot_prices
1368                .get(&underlying)
1369                .copied()
1370                .ok_or_else(|| {
1371                    format!(
1372                        "PM settlement gate rejected {}: missing spot price facts",
1373                        underlying
1374                    )
1375                })?;
1376            if spot_price <= rust_decimal::Decimal::ZERO {
1377                return Err(format!(
1378                    "PM settlement gate rejected {}: nonpositive spot price {}",
1379                    underlying, spot_price
1380                ));
1381            }
1382            let notional_usdc = spot_price * risk_increasing_short_quantity;
1383            if notional_usdc <= rust_decimal::Decimal::ZERO {
1384                continue;
1385            }
1386            *exposure_by_underlying.entry(underlying).or_default() += notional_usdc;
1387        }
1388
1389        Ok(exposure_by_underlying)
1390    }
1391
1392    fn pm_resting_open_sell_quantity(
1393        &self,
1394        wallet: &WalletAddress,
1395        symbol: &str,
1396    ) -> rust_decimal::Decimal {
1397        let mut resting_sell_quantity = rust_decimal::Decimal::ZERO;
1398        for (order_wallet, orders) in self.ctx.order_index.snapshot_orders() {
1399            if order_wallet != *wallet {
1400                continue;
1401            }
1402            for order in orders {
1403                if !order.is_perp
1404                    && order.symbol == symbol
1405                    && matches!(order.side, Side::Sell)
1406                    && order.remaining_size > rust_decimal::Decimal::ZERO
1407                {
1408                    // OrderIndex snapshot quantities are already in human contract units.
1409                    resting_sell_quantity += order.remaining_size;
1410                }
1411            }
1412        }
1413        resting_sell_quantity
1414    }
1415}