1use super::*;
4
/// Error-side payload produced by `fill_from_match_result` when the orderbook
/// reports `MatchResult::SelfTrade` for an incoming order.
struct SelfTradeMatch {
    /// ID of the resting (maker) order reported in the self-trade match.
    maker_order_id: u64,
}
8
/// Quantity totals tracked for one underlying; every field starts at its
/// `Default` value.
///
/// NOTE(review): the code that fills and reads this struct lies outside the
/// visible part of the file, so the field notes below are inferred from the
/// field names only — confirm against the users of this type.
#[derive(Default)]
struct PmShortOptionQuantity {
    /// Underlying symbol these quantities refer to (presumably; verify).
    underlying: String,
    /// Quantity held long (presumably option contracts; verify units).
    long_position_quantity: rust_decimal::Decimal,
    /// Quantity held short (presumably option contracts; verify units).
    short_position_quantity: rust_decimal::Decimal,
    /// Quantity in open sell orders (presumably resting orders; verify).
    open_sell_quantity: rust_decimal::Decimal,
}
16
17impl UnifiedEngine {
18 fn get_next_order_id(&mut self) -> u64 {
20 let order_id = self.ctx.next_order_id;
21 self.ctx.next_order_id += 1;
22 order_id
23 }
24
25 pub(super) fn validate_order(
29 &self,
30 order_info: &OrderInfo,
31 wallet: &WalletAddress,
32 ) -> Result<(), String> {
33 let validation_start = Instant::now();
34
35 hypercall_engine::admission::validate_order_shape(order_info)?;
36
37 let mut expired_instruments: std::collections::HashSet<String> = self
38 .ctx
39 .expired_instruments
40 .iter()
41 .filter_map(|(symbol, expired)| expired.then_some(symbol.clone()))
42 .collect();
43 if self
44 .expiry_manager
45 .is_instrument_expired(&order_info.symbol, &self.ctx.orderbooks)
46 {
47 expired_instruments.insert(order_info.symbol.clone());
48 }
49 let open_symbols: std::collections::HashSet<String> =
50 self.ctx.orderbooks.keys().cloned().collect();
51
52 if let Err(reason) = hypercall_engine::admission::validate_instrument_open(
53 order_info,
54 &expired_instruments,
55 &open_symbols,
56 ) {
57 warn!(
58 "Order rejected during engine admission for client_id {:?}: {}",
59 order_info.client_id, reason
60 );
61 return Err(reason);
62 }
63
64 match self.margin_manager.get_risk_account(
67 &self.ctx.deps,
68 &self.ctx.engine_positions,
69 &self.ctx.balance_ledger,
70 wallet,
71 ) {
72 Ok(account) if account.cash <= 0.0 => {
73 let reason = "Account has no funds. Please deposit before trading.".to_string();
74 warn!("Order rejected for wallet {} - {}", wallet, reason);
75 return Err(reason);
76 }
77 Err(reason) => {
78 warn!("Order rejected for wallet {} - {}", wallet, reason);
79 return Err(reason);
80 }
81 Ok(_) => {}
82 }
83
84 histogram!("ht_engine_phase_seconds", "phase" => "validation")
85 .record(validation_start.elapsed().as_secs_f64());
86
87 Ok(())
88 }
89
90 pub(super) fn check_order_restrictions(
92 &self,
93 order_info: &OrderInfo,
94 wallet: &WalletAddress,
95 ) -> Result<(), String> {
96 debug!(
98 "Checking order limits for wallet {} on symbol {}",
99 wallet, order_info.symbol
100 );
101 if let Err(limit_error) = self.check_order_limits(wallet, order_info) {
102 warn!(
103 "Order rejected due to order limits for client_id {:?}: {}",
104 order_info.client_id, limit_error
105 );
106 return Err(limit_error);
107 }
108
109 if let Err(block_reason) = LiquidationManager::check_preliquidation_order_allowed(
111 &self.ctx.deps,
112 &self.ctx.engine_positions,
113 wallet,
114 order_info,
115 ) {
116 warn!(
117 "Order blocked due to pre-liquidation for client_id {:?}: {}",
118 order_info.client_id, block_reason
119 );
120 return Err(block_reason);
121 }
122
123 if let Err(pm_pool_error) =
124 self.check_pm_settlement_pool_gate_for_orders(wallet, std::slice::from_ref(order_info))
125 {
126 warn!(
127 "Order blocked by PM settlement pool gate for client_id {:?}: {}",
128 order_info.client_id, pm_pool_error
129 );
130 return Err(pm_pool_error);
131 }
132
133 debug!(
135 "Running unified margin check before order acceptance for client_id {:?}: {}",
136 order_info.client_id, wallet
137 );
138 if let Err(margin_error) = self.margin_manager.check_margin_for_order(
139 &self.ctx.deps,
140 &self.ctx.engine_positions,
141 &self.ctx.balance_ledger,
142 wallet,
143 order_info,
144 &self.ctx.order_index,
145 ) {
146 warn!(
147 "Order rejected due to insufficient margin for client_id {:?}: {}",
148 order_info.client_id, margin_error
149 );
150 return Err(margin_error);
151 }
152
153 debug!(
155 "Checking tier restrictions for wallet {} on symbol {}",
156 wallet, order_info.symbol
157 );
158 if let Err(tier_error) = self.margin_manager.check_tier_restrictions(
159 &self.ctx.deps,
160 &self.ctx.engine_positions,
161 &self.ctx.balance_ledger,
162 wallet,
163 order_info,
164 &self.ctx.order_index,
165 ) {
166 warn!(
167 "Order rejected due to tier restrictions for client_id {:?}: {}",
168 order_info.client_id, tier_error
169 );
170 return Err(tier_error);
171 }
172
173 Ok(())
174 }
175
176 pub(super) async fn allocate_and_ack(
178 &mut self,
179 request: &UnifiedEngineRequest,
180 order_info: &OrderInfo,
181 wallet: &WalletAddress,
182 timestamp: u64,
183 ) -> AllocatedOrder {
184 let order_id = self.get_next_order_id();
185 info!(
186 "Pre-allocated order ID {} for order: symbol={}, wallet={}",
187 order_id, order_info.symbol, wallet
188 );
189
190 let order_info_msg = hypercall_types::OrderInfoMessage {
192 timestamp,
193 order_id,
194 wallet: *wallet,
195 info: order_info.clone(),
196 };
197 self.ctx
198 .deps
199 .emit_event(&EngineMessage::OrderInfo(order_info_msg));
200
201 let acked_response = OrderUpdateMessage {
203 timestamp,
204 info: order_info.clone(),
205 status: OrderUpdateStatus::Acked,
206 reason: Some("Order accepted and queued for matching".to_string()),
207 filled_size: dec!(0),
208 order_id: Some(order_id),
209 wallet_address: *wallet,
210 mmp_triggered: false,
211 request_id: request.message.request_id.clone(),
212 };
213
214 self.ctx
215 .deps
216 .emit_event(&EngineMessage::OrderUpdate(acked_response.clone()));
217
218 self.ctx.order_index.add_order(
220 wallet,
221 hypercall_engine::order_index::OrderSummary {
222 order_id,
223 symbol: order_info.symbol.clone(),
224 side: order_info.side,
225 price: order_info.price,
226 original_size: order_info.size,
227 remaining_size: order_info.size,
228 is_perp: order_info.is_perp,
229 mmp_enabled: order_info.mmp_enabled,
230 client_id: order_info.client_id.clone(),
231 created_at: timestamp as i64,
232 },
233 );
234
235 debug!(
236 "Sending Acked response for client_id {:?}: {:?}",
237 request.message.info.client_id, acked_response
238 );
239 let _ = request.response_tx.send(acked_response).await;
240
241 AllocatedOrder { order_id }
242 }
243
244 pub(super) fn allocate_order_output(
245 &mut self,
246 message: &OrderActionMessage,
247 order_info: &OrderInfo,
248 wallet: &WalletAddress,
249 timestamp: u64,
250 output: &mut crate::rsm::apply::ApplyOutput,
251 ) -> u64 {
252 let order_id = self.get_next_order_id();
253 let order_info_msg = hypercall_types::OrderInfoMessage {
254 timestamp,
255 order_id,
256 wallet: *wallet,
257 info: order_info.clone(),
258 };
259 output.push(EngineMessage::OrderInfo(order_info_msg));
260
261 let acked_response = OrderUpdateMessage {
262 timestamp,
263 info: order_info.clone(),
264 status: OrderUpdateStatus::Acked,
265 reason: Some("Order accepted and queued for matching".to_string()),
266 filled_size: dec!(0),
267 order_id: Some(order_id),
268 wallet_address: *wallet,
269 mmp_triggered: false,
270 request_id: message.request_id.clone(),
271 };
272 output.push(EngineMessage::OrderUpdate(acked_response.clone()));
273 output.order_responses.push(acked_response);
274
275 self.ctx.order_index.add_order(
276 wallet,
277 hypercall_engine::order_index::OrderSummary {
278 order_id,
279 symbol: order_info.symbol.clone(),
280 side: order_info.side,
281 price: order_info.price,
282 original_size: order_info.size,
283 remaining_size: order_info.size,
284 is_perp: order_info.is_perp,
285 mmp_enabled: order_info.mmp_enabled,
286 client_id: order_info.client_id.clone(),
287 created_at: timestamp as i64,
288 },
289 );
290
291 order_id
292 }
293
294 pub(super) async fn execute_matching(
296 &mut self,
297 order_id: u64,
298 order_info: &OrderInfo,
299 wallet: &WalletAddress,
300 timestamp: u64,
301 ) -> MatchingResult {
302 let orderbook_start = Instant::now();
303 let orderbook = self.ctx.orderbooks.get_mut(&order_info.symbol).unwrap();
304
305 let underlying_symbol = if let Ok(parsed) = ParsedSymbol::from_symbol(&order_info.symbol) {
306 parsed.underlying
307 } else {
308 order_info
309 .underlying
310 .clone()
311 .unwrap_or(order_info.symbol.clone())
312 };
313
314 info!(
315 "Sending order to orderbook for incremental matching: symbol={}, price={}, size={}, side={:?}, underlying={}",
316 order_info.symbol, order_info.price, order_info.size, order_info.side, underlying_symbol
317 );
318
319 let mut remaining_quantity = order_info.size;
320 let mut all_fills = Vec::new();
321 let mut mmp_triggered = false;
322 let mut self_trade_maker_order_id: Option<u64> = None;
323
324 while remaining_quantity > dec!(0) {
325 let trade_id = self.ctx.next_trade_id;
326 self.ctx.next_trade_id += 1;
327
328 let next_l2_seq = self.ctx.l2_update_seq.fetch_add(1, Ordering::SeqCst) + 1;
329 orderbook.set_pending_l2_sequence(next_l2_seq);
330
331 let (match_result, should_continue) = orderbook.process_order_with_metadata(
332 order_id,
333 order_info.price,
334 remaining_quantity,
335 order_info.side,
336 *wallet,
337 timestamp,
338 trade_id,
339 order_info.client_id.clone(),
340 order_info.mmp_enabled,
341 order_info.size, );
343
344 match Self::fill_from_match_result(match_result) {
345 Ok(Some(mut fill)) => {
346 let price_f64 = fill
347 .price
348 .to_f64()
349 .expect("CRITICAL_FAILURE: fill.price is not representable as f64");
350 let size_u64 = fill.size.trunc().to_u64().expect(
351 "CRITICAL_FAILURE: fill.size is not representable as u64 contract units",
352 );
353 let fee_calc = self.fee_service.get_fees(
354 &fill.maker_wallet_address.as_hex(),
355 &fill.taker_wallet_address.as_hex(),
356 price_f64,
357 size_u64,
358 order_info.builder_code_address.as_ref(),
359 );
360
361 fill.fee = Decimal::from_f64_retain(fee_calc.taker_fee)
362 .expect("CRITICAL_FAILURE: taker_fee is not representable as Decimal");
363 fill.builder_code_address = order_info.builder_code_address;
364 fill.builder_code_fee = fee_calc.builder_code_fee;
365
366 all_fills.push(fill.clone());
367 remaining_quantity -= fill.size;
368
369 self.ctx.order_index.fill_order(wallet, order_id, fill.size);
371 self.ctx.order_index.fill_order(
372 &fill.maker_wallet_address,
373 fill.maker_order_id,
374 fill.size,
375 );
376
377 if order_info.mmp_enabled {
381 let mmp_key = (*wallet, underlying_symbol.clone());
382 let mmp_state_entry = self.ctx.mmp_state.get_mut(&mmp_key);
383 if let Some(mmp_state) = mmp_state_entry {
384 if mmp_state.enabled {
385 let greeks_result =
390 hypercall_engine::greeks::compute_fill_greeks_sync(
391 &self.ctx.spot_prices,
392 |underlying, strike, expiry_ts, spot| {
393 self.ctx.iv_surfaces.get(underlying).and_then(
394 |surface| {
395 surface.get(strike, expiry_ts).or_else(|| {
396 surface.get_interpolated_with_spot(
397 strike,
398 expiry_ts,
399 Some(spot),
400 )
401 })
402 },
403 )
404 },
405 &fill.symbol,
406 fill.size,
407 fill.taker_side,
408 timestamp,
409 );
410
411 let (delta, vega) = match greeks_result {
412 Some(dv) => dv,
413 None => {
414 warn!(
418 "MMP TRIGGERED (fail closed): wallet={}, underlying={}, reason=greeks unavailable for {}",
419 wallet, underlying_symbol, fill.symbol
420 );
421 mmp_triggered = true;
422 self.handle_mmp_trigger(
423 wallet,
424 &underlying_symbol,
425 "MMP greeks unavailable -- fail closed".to_string(),
426 timestamp,
427 )
428 .await;
429 break;
430 }
431 };
432
433 let size_contract_units = fill
434 .size
435 .trunc()
436 .to_u64()
437 .expect(
438 "CRITICAL_FAILURE: fill.size is not representable as u64 contract units",
439 );
440
441 let record = crate::rsm::engine_deps::MmpFillRecord {
442 timestamp_ms: timestamp,
443 quantity: size_contract_units,
444 delta,
445 vega,
446 };
447
448 match mmp_state.process_fill(record, timestamp) {
449 Ok(()) => {
450 debug!(
451 "MMP check passed for fill: wallet={}, underlying={}, fill_qty={}",
452 wallet, underlying_symbol, fill.size
453 );
454 }
455 Err(trigger_reason) => {
456 warn!(
457 "MMP TRIGGERED: wallet={}, underlying={}, reason={}",
458 wallet, underlying_symbol, trigger_reason
459 );
460 mmp_triggered = true;
461 self.handle_mmp_trigger(
462 wallet,
463 &underlying_symbol,
464 trigger_reason,
465 timestamp,
466 )
467 .await;
468 break;
469 }
470 }
471 }
472 } else {
473 warn!(
474 "MMP TRIGGERED (fail closed): wallet={}, underlying={}, reason=missing MMP state",
475 wallet, underlying_symbol
476 );
477 mmp_triggered = true;
478 self.handle_mmp_trigger(
479 wallet,
480 &underlying_symbol,
481 "MMP state missing -- fail closed".to_string(),
482 timestamp,
483 )
484 .await;
485 break;
486 }
487 }
488 }
489 Err(self_trade) => {
490 warn!(
491 "SELF-TRADE DETECTED: taker_order={}, maker_order={}, wallet={}, symbol={}",
492 order_id, self_trade.maker_order_id, wallet, order_info.symbol
493 );
494 self_trade_maker_order_id = Some(self_trade.maker_order_id);
495 break;
496 }
497 Ok(None) => {}
498 }
499
500 if !should_continue {
501 break;
502 }
503 }
504
505 if remaining_quantity > dec!(0)
507 && !mmp_triggered
508 && !all_fills.is_empty()
509 && self_trade_maker_order_id.is_none()
510 && order_info.tif == TimeInForce::GTC
511 {
512 let orderbook = self.ctx.orderbooks.get_mut(&order_info.symbol).unwrap();
513 let next_l2_seq = self.ctx.l2_update_seq.fetch_add(1, Ordering::SeqCst) + 1;
514 orderbook.set_pending_l2_sequence(next_l2_seq);
515 orderbook.add_order_to_book_with_events_full(
516 order_id,
517 order_info.price,
518 remaining_quantity,
519 order_info.side,
520 *wallet,
521 timestamp,
522 order_info.client_id.clone(),
523 order_info.mmp_enabled,
524 order_info.size, );
526 }
527
528 histogram!("ht_engine_phase_seconds", "phase" => "orderbook_match")
529 .record(orderbook_start.elapsed().as_secs_f64());
530
531 let filled_size: Decimal = all_fills.iter().map(|f| f.size).sum();
532 MatchingResult {
533 fills: all_fills,
534 filled_size,
535 mmp_triggered,
536 self_trade_maker_id: self_trade_maker_order_id,
537 }
538 }
539
540 pub(super) fn execute_matching_sync(
541 &mut self,
542 order_id: u64,
543 order_info: &OrderInfo,
544 wallet: &WalletAddress,
545 timestamp: u64,
546 output: &mut crate::rsm::apply::ApplyOutput,
547 ) -> MatchingResult {
548 let underlying_symbol = if let Ok(parsed) = ParsedSymbol::from_symbol(&order_info.symbol) {
549 parsed.underlying
550 } else {
551 order_info
552 .underlying
553 .clone()
554 .unwrap_or(order_info.symbol.clone())
555 };
556
557 let mut remaining_quantity = order_info.size;
558 let mut all_fills = Vec::new();
559 let mut mmp_triggered = false;
560 let mut self_trade_maker_order_id: Option<u64> = None;
561
562 while remaining_quantity > dec!(0) {
563 let trade_id = self.ctx.next_trade_id;
564 self.ctx.next_trade_id += 1;
565
566 let orderbook = self
567 .ctx
568 .orderbooks
569 .get_mut(&order_info.symbol)
570 .expect("validated orderbook disappeared during apply");
571 let next_l2_seq = self.ctx.l2_update_seq.fetch_add(1, Ordering::SeqCst) + 1;
572 orderbook.set_pending_l2_sequence(next_l2_seq);
573
574 let (match_result, should_continue) = orderbook.process_order_with_metadata(
575 order_id,
576 order_info.price,
577 remaining_quantity,
578 order_info.side,
579 *wallet,
580 timestamp,
581 trade_id,
582 order_info.client_id.clone(),
583 order_info.mmp_enabled,
584 order_info.size,
585 );
586
587 match Self::fill_from_match_result(match_result) {
588 Ok(Some(mut fill)) => {
589 let price_f64 = fill
590 .price
591 .to_f64()
592 .expect("CRITICAL_FAILURE: fill.price is not representable as f64");
593 let size_u64 = fill.size.trunc().to_u64().expect(
594 "CRITICAL_FAILURE: fill.size is not representable as u64 contract units",
595 );
596 let fee_calc = self.fee_service.get_fees(
597 &fill.maker_wallet_address.as_hex(),
598 &fill.taker_wallet_address.as_hex(),
599 price_f64,
600 size_u64,
601 order_info.builder_code_address.as_ref(),
602 );
603 fill.fee = Decimal::from_f64_retain(fee_calc.taker_fee)
604 .expect("CRITICAL_FAILURE: taker_fee is not representable as Decimal");
605 fill.builder_code_address = order_info.builder_code_address;
606 fill.builder_code_fee = fee_calc.builder_code_fee;
607
608 remaining_quantity -= fill.size;
609 self.ctx.order_index.fill_order(wallet, order_id, fill.size);
610 self.ctx.order_index.fill_order(
611 &fill.maker_wallet_address,
612 fill.maker_order_id,
613 fill.size,
614 );
615
616 if order_info.mmp_enabled
617 && self.mmp_should_trigger(wallet, &underlying_symbol, &fill, timestamp)
618 {
619 mmp_triggered = true;
620 self.handle_mmp_trigger_output(
621 wallet,
622 &underlying_symbol,
623 "MMP triggered during fill processing".to_string(),
624 timestamp,
625 output,
626 );
627 all_fills.push(fill);
628 break;
629 }
630
631 all_fills.push(fill);
632 }
633 Err(self_trade) => {
634 self_trade_maker_order_id = Some(self_trade.maker_order_id);
635 break;
636 }
637 Ok(None) => {}
638 }
639
640 if !should_continue {
641 break;
642 }
643 }
644
645 if remaining_quantity > dec!(0)
646 && !mmp_triggered
647 && !all_fills.is_empty()
648 && self_trade_maker_order_id.is_none()
649 && order_info.tif == TimeInForce::GTC
650 {
651 let orderbook = self.ctx.orderbooks.get_mut(&order_info.symbol).unwrap();
652 let next_l2_seq = self.ctx.l2_update_seq.fetch_add(1, Ordering::SeqCst) + 1;
653 orderbook.set_pending_l2_sequence(next_l2_seq);
654 orderbook.add_order_to_book_with_events_full(
655 order_id,
656 order_info.price,
657 remaining_quantity,
658 order_info.side,
659 *wallet,
660 timestamp,
661 order_info.client_id.clone(),
662 order_info.mmp_enabled,
663 order_info.size,
664 );
665 }
666
667 let filled_size: Decimal = all_fills.iter().map(|f| f.size).sum();
668 MatchingResult {
669 fills: all_fills,
670 filled_size,
671 mmp_triggered,
672 self_trade_maker_id: self_trade_maker_order_id,
673 }
674 }
675
676 fn fill_from_match_result(
677 match_result: MatchResult,
678 ) -> Result<Option<hypercall_types::Fill>, SelfTradeMatch> {
679 match match_result {
680 MatchResult::Fill(fill) => Ok(Some(fill)),
681 MatchResult::SelfTrade { maker_order_id } => Err(SelfTradeMatch { maker_order_id }),
682 MatchResult::NoMatch => Ok(None),
683 }
684 }
685
686 fn mmp_should_trigger(
687 &mut self,
688 wallet: &WalletAddress,
689 underlying_symbol: &str,
690 fill: &hypercall_types::Fill,
691 timestamp: u64,
692 ) -> bool {
693 let Some(mmp_state) = self
694 .ctx
695 .mmp_state
696 .get_mut(&(*wallet, underlying_symbol.to_string()))
697 else {
698 warn!(
699 "MMP TRIGGERED (fail closed): wallet={}, underlying={}, reason=missing MMP state",
700 wallet, underlying_symbol
701 );
702 return true;
703 };
704 if !mmp_state.enabled {
705 return false;
706 }
707 let Some((delta, vega)) = hypercall_engine::greeks::compute_fill_greeks_sync(
708 &self.ctx.spot_prices,
709 |underlying, strike, expiry_ts, spot| {
710 self.ctx.iv_surfaces.get(underlying).and_then(|surface| {
711 surface.get(strike, expiry_ts).or_else(|| {
712 surface.get_interpolated_with_spot(strike, expiry_ts, Some(spot))
713 })
714 })
715 },
716 &fill.symbol,
717 fill.size,
718 fill.taker_side,
719 timestamp,
720 ) else {
721 warn!(
722 "MMP TRIGGERED (fail closed): wallet={}, underlying={}, reason=greeks unavailable for {}",
723 wallet, underlying_symbol, fill.symbol
724 );
725 return true;
726 };
727 let quantity = fill
728 .size
729 .trunc()
730 .to_u64()
731 .expect("CRITICAL_FAILURE: fill.size is not representable as u64 contract units");
732 let record = crate::rsm::engine_deps::MmpFillRecord {
733 timestamp_ms: timestamp,
734 quantity,
735 delta,
736 vega,
737 };
738 mmp_state.process_fill(record, timestamp).is_err()
739 }
740
741 fn record_fill_metrics(result: &MatchingResult) {
742 let fill_count = result.fills.len();
743 if fill_count == 0 {
744 return;
745 }
746
747 counter!("ht_engine_fills_total").increment(fill_count as u64);
748
749 let total_fill_value: Decimal = result
750 .fills
751 .iter()
752 .map(|f| {
753 let size_human = to_human_readable_decimal(&f.symbol, f.size);
754 f.price * size_human
755 })
756 .sum();
757 if let Some(value_f64) = total_fill_value.to_f64() {
758 let value_usd = value_f64.max(0.0).round() as u64;
759 if value_usd > 0 {
760 counter!("ht_engine_fill_value_usd_total").increment(value_usd);
761 }
762 }
763 }
764
765 pub(super) async fn finalize_order(
767 &mut self,
768 request: &UnifiedEngineRequest,
769 order_id: u64,
770 order_info: &OrderInfo,
771 wallet: &WalletAddress,
772 timestamp: u64,
773 result: &MatchingResult,
774 ) {
775 let fill_count = result.fills.len();
776
777 Self::record_fill_metrics(result);
778
779 info!(
780 "Order processed: client_id {:?}: order_id={}, fills_count={}, filled_size={}, order_size={}, mmp_triggered={}",
781 order_info.client_id, order_id, fill_count, result.filled_size, order_info.size, result.mmp_triggered
782 );
783
784 let status = if result.mmp_triggered && result.filled_size < order_info.size {
786 info!(
787 "Order status: CANCELED (MMP triggered after {} fills)",
788 result.filled_size
789 );
790 self.ctx.order_index.remove_order(wallet, order_id);
792 OrderUpdateStatus::Canceled
793 } else if result.self_trade_maker_id.is_some() && result.filled_size < order_info.size {
794 info!(
795 "Order status: CANCELED (self-trade prevention, maker_order_id={:?}, filled_size={}, order_size={})",
796 result.self_trade_maker_id, result.filled_size, order_info.size
797 );
798 self.ctx.order_index.remove_order(wallet, order_id);
800 OrderUpdateStatus::Canceled
801 } else if result.filled_size >= order_info.size {
802 info!(
803 "Order status: FILLED (filled_size={} >= order_size={})",
804 result.filled_size, order_info.size
805 );
806 OrderUpdateStatus::Filled
807 } else if result.filled_size > dec!(0) && order_info.tif != TimeInForce::GTC {
808 info!(
809 "Order status: CANCELED (IOC/FOK partial fill, filled_size={}, order_size={})",
810 result.filled_size, order_info.size
811 );
812 self.ctx.order_index.remove_order(wallet, order_id);
813 OrderUpdateStatus::Canceled
814 } else if result.filled_size > dec!(0) {
815 info!(
816 "Order status: PARTIALLY_FILLED (filled_size={}, order_size={})",
817 result.filled_size, order_info.size
818 );
819 OrderUpdateStatus::PartiallyFilled
820 } else if order_info.tif != TimeInForce::GTC {
821 info!("Order status: CANCELED (IOC/FOK, no fills)");
822 self.ctx.order_index.remove_order(wallet, order_id);
823 if let Some(ob) = self.ctx.orderbooks.get_mut(&order_info.symbol) {
824 ob.cancel_order(order_id);
825 }
826 OrderUpdateStatus::Canceled
827 } else {
828 info!("Order status: OPEN (no fills, filled_size=0)");
829 OrderUpdateStatus::Open
830 };
831
832 let update_msg = OrderUpdateMessage {
834 timestamp,
835 info: order_info.clone(),
836 status,
837 reason: if result.mmp_triggered {
838 Some("MMP triggered during fill processing".to_string())
839 } else if result.self_trade_maker_id.is_some() {
840 Some("Self-trade prevention triggered".to_string())
841 } else {
842 None
843 },
844 filled_size: result.filled_size,
845 order_id: Some(order_id),
846 wallet_address: *wallet,
847 mmp_triggered: result.mmp_triggered,
848 request_id: request.message.request_id.clone(),
849 };
850
851 info!(
852 "Sending OrderUpdate event: client_id {:?}: order_id={}, status={:?}, filled_size={}",
853 order_info.client_id, order_id, status, result.filled_size
854 );
855 self.ctx
856 .deps
857 .emit_event(&EngineMessage::OrderUpdate(update_msg.clone()));
858 }
859
860 pub(super) fn finalize_order_output(
861 &mut self,
862 message: &OrderActionMessage,
863 order_id: u64,
864 order_info: &OrderInfo,
865 wallet: &WalletAddress,
866 timestamp: u64,
867 result: &MatchingResult,
868 output: &mut crate::rsm::apply::ApplyOutput,
869 ) {
870 Self::record_fill_metrics(result);
871
872 let forced_cancel = (result.mmp_triggered || result.self_trade_maker_id.is_some())
873 && result.filled_size < order_info.size;
874 let status = if forced_cancel {
875 self.ctx.order_index.remove_order(wallet, order_id);
876 OrderUpdateStatus::Canceled
877 } else if result.filled_size >= order_info.size {
878 OrderUpdateStatus::Filled
879 } else if result.filled_size > dec!(0) && order_info.tif != TimeInForce::GTC {
880 self.ctx.order_index.remove_order(wallet, order_id);
881 OrderUpdateStatus::Canceled
882 } else if result.filled_size > dec!(0) {
883 OrderUpdateStatus::PartiallyFilled
884 } else if order_info.tif != TimeInForce::GTC {
885 self.ctx.order_index.remove_order(wallet, order_id);
886 if let Some(ob) = self.ctx.orderbooks.get_mut(&order_info.symbol) {
887 ob.cancel_order(order_id);
888 }
889 OrderUpdateStatus::Canceled
890 } else {
891 OrderUpdateStatus::Open
892 };
893
894 let update_msg = OrderUpdateMessage {
895 timestamp,
896 info: order_info.clone(),
897 status,
898 reason: if result.mmp_triggered {
899 Some("MMP triggered during fill processing".to_string())
900 } else if result.self_trade_maker_id.is_some() {
901 Some("Self-trade prevention triggered".to_string())
902 } else {
903 None
904 },
905 filled_size: result.filled_size,
906 order_id: Some(order_id),
907 wallet_address: *wallet,
908 mmp_triggered: result.mmp_triggered,
909 request_id: message.request_id.clone(),
910 };
911 output.push(EngineMessage::OrderUpdate(update_msg.clone()));
912 output.order_responses.push(update_msg);
913 }
914
915 pub(super) fn flush_orderbook_events(&mut self, output: &mut crate::rsm::apply::ApplyOutput) {
916 let drained = self.drain_orderbook_events();
917 for event in self.account_for_events(drained, output) {
918 output.push(event);
919 }
920 }
921
922 pub(super) fn handle_mmp_trigger_output(
923 &mut self,
924 wallet: &WalletAddress,
925 underlying_symbol: &str,
926 trigger_reason: String,
927 timestamp: u64,
928 output: &mut crate::rsm::apply::ApplyOutput,
929 ) {
930 output.push(EngineMessage::MmpTriggered(
931 hypercall_types::MmpTriggeredMessage {
932 wallet: *wallet,
933 currency: underlying_symbol.to_string(),
934 reason: trigger_reason,
935 timestamp,
936 },
937 ));
938
939 for (order_id, symbol) in self
940 .ctx
941 .order_index
942 .get_mmp_order_ids(wallet, underlying_symbol)
943 {
944 let cancel_info = OrderInfo {
945 symbol,
946 price: dec!(0),
947 size: dec!(0),
948 side: hypercall_types::Side::Buy,
949 tif: TimeInForce::GTC,
950 client_id: None,
951 order_id: Some(order_id),
952 is_perp: false,
953 underlying: None,
954 reduce_only: None,
955 nonce: None,
956 signature: None,
957 mmp_enabled: true,
958 builder_code_address: None,
959 };
960 let cancel_message = OrderActionMessage {
961 timestamp,
962 info: cancel_info,
963 action: OrderAction::CancelOrder,
964 wallet: *wallet,
965 api_wallet_address: None,
966 mmp_triggered: true,
967 request_id: Some(format!("mmp:{}:{}:{}", timestamp, wallet, order_id)),
968 };
969 self.cancel_order_output(cancel_message, timestamp, output, false);
970 }
971 }
972
973 pub(super) fn handle_self_trade_prevention_output(
974 &mut self,
975 message: &OrderActionMessage,
976 taker_order_id: u64,
977 maker_order_id: u64,
978 order_info: &OrderInfo,
979 wallet: &WalletAddress,
980 partial_fills: &[hypercall_types::Fill],
981 timestamp: u64,
982 output: &mut crate::rsm::apply::ApplyOutput,
983 ) {
984 let symbol = &order_info.symbol;
985 let filled_size: Decimal = partial_fills.iter().map(|f| f.size).sum();
986
987 if let Some(orderbook) = self.ctx.orderbooks.get_mut(symbol) {
988 let next_l2_seq = self.ctx.l2_update_seq.fetch_add(1, Ordering::SeqCst) + 1;
989 orderbook.set_pending_l2_sequence(next_l2_seq);
990 if let Some(canceled) = orderbook.cancel_order(maker_order_id) {
991 orderbook.emit_orderbook_events(timestamp);
992 self.ctx.order_index.remove_order(wallet, maker_order_id);
993 output.push(EngineMessage::OrderUpdate(OrderUpdateMessage {
994 timestamp,
995 info: OrderInfo {
996 symbol: symbol.clone(),
997 price: canceled.price,
998 size: canceled.quantity,
999 side: canceled.side,
1000 tif: TimeInForce::GTC,
1001 client_id: None,
1002 order_id: Some(maker_order_id),
1003 is_perp: order_info.is_perp,
1004 underlying: order_info.underlying.clone(),
1005 reduce_only: None,
1006 nonce: None,
1007 signature: None,
1008 mmp_enabled: false,
1009 builder_code_address: None,
1010 },
1011 status: OrderUpdateStatus::Canceled,
1012 reason: Some("Self-trade prevention".to_string()),
1013 filled_size: dec!(0),
1014 order_id: Some(maker_order_id),
1015 wallet_address: *wallet,
1016 mmp_triggered: false,
1017 request_id: None,
1018 }));
1019 }
1020 }
1021
1022 self.ctx.order_index.remove_order(wallet, taker_order_id);
1023 let (status, reason) = if filled_size > dec!(0) {
1024 (
1025 OrderUpdateStatus::Canceled,
1026 "Self-trade prevention (partial fill kept)",
1027 )
1028 } else {
1029 (OrderUpdateStatus::Rejected, "Self-trade prevention")
1030 };
1031 let taker_response = OrderUpdateMessage {
1032 timestamp,
1033 info: order_info.clone(),
1034 status,
1035 reason: Some(reason.to_string()),
1036 filled_size,
1037 order_id: Some(taker_order_id),
1038 wallet_address: *wallet,
1039 mmp_triggered: false,
1040 request_id: message.request_id.clone(),
1041 };
1042 output.push(EngineMessage::OrderUpdate(taker_response.clone()));
1043 output.order_responses.push(taker_response);
1044 }
1045
1046 pub(super) fn check_order_limits(
1054 &self,
1055 wallet: &WalletAddress,
1056 order_info: &OrderInfo,
1057 ) -> Result<(), String> {
1058 let limits = self
1059 .ctx
1060 .deps
1061 .wallet_trading_limits
1062 .get(wallet)
1063 .cloned()
1064 .unwrap_or(self.ctx.deps.default_trading_limits);
1065
1066 hypercall_engine::admission::validate_order_limits(
1067 wallet,
1068 order_info,
1069 &self.ctx.order_index,
1070 &self.ctx.engine_positions,
1071 hypercall_engine::admission::TradingLimits {
1072 max_open_orders: limits.max_open_orders,
1073 max_open_positions: limits.max_open_positions,
1074 },
1075 )
1076 }
1077
1078 pub(crate) fn check_pm_settlement_pool_gate_for_orders(
1079 &self,
1080 wallet: &WalletAddress,
1081 orders: &[OrderInfo],
1082 ) -> Result<(), String> {
1083 self.check_pm_settlement_pool_gate_for_order_groups(&[(wallet, orders)])
1084 }
1085
1086 pub(crate) fn check_pm_settlement_pool_gate_for_order_groups(
1087 &self,
1088 order_groups: &[(&WalletAddress, &[OrderInfo])],
1089 ) -> Result<(), String> {
1090 if !self.ctx.deps.portfolio_margin_pool_enabled {
1091 return Ok(());
1092 }
1093
1094 let mut combined_exposure =
1095 std::collections::BTreeMap::<String, rust_decimal::Decimal>::new();
1096 let mut wallet_exposures = Vec::new();
1097 for (wallet, orders) in order_groups {
1098 if !self.is_pm_settlement_pool_eligible_wallet(wallet)? {
1099 continue;
1100 }
1101
1102 let risk_increasing_exposure =
1103 self.pm_risk_increasing_short_option_exposure_usdc(wallet, orders)?;
1104 if risk_increasing_exposure.is_empty() {
1105 continue;
1106 }
1107 for (underlying, notional_usdc) in &risk_increasing_exposure {
1108 *combined_exposure.entry(underlying.clone()).or_default() += *notional_usdc;
1109 }
1110 wallet_exposures.push((**wallet, risk_increasing_exposure));
1111 }
1112
1113 if combined_exposure.is_empty() {
1114 return Ok(());
1115 }
1116
1117 let now_ms = self.ctx.deps.margin_timestamp_s.saturating_mul(1000);
1118 for (wallet, risk_increasing_exposure) in wallet_exposures {
1119 for underlying in risk_increasing_exposure.keys() {
1120 let incoming_short_notional_usdc = *combined_exposure
1121 .get(underlying)
1122 .expect("wallet exposure underlying must be included in combined exposure");
1123 let pool = self
1124 .ctx
1125 .pm_settlement_state
1126 .pools
1127 .get(underlying)
1128 .ok_or_else(|| {
1129 format!(
1130 "PM settlement gate rejected {underlying}: missing settlement pool facts"
1131 )
1132 })?;
1133 let config = pool.config.as_ref().ok_or_else(|| {
1134 format!(
1135 "PM settlement gate rejected {underlying}: missing settlement pool config"
1136 )
1137 })?;
1138
1139 let account = self.ctx.pm_settlement_state.accounts.get(
1140 &crate::rsm::portfolio_margin::settlement_state::PmSettlementAccountKey {
1141 wallet,
1142 underlying: underlying.clone(),
1143 },
1144 );
1145 let account_bridge_usdc = account
1146 .map(|account| account.bridge_principal_usdc)
1147 .unwrap_or(rust_decimal::Decimal::ZERO);
1148 let account_debt_usdc = account
1149 .map(|account| account.debt_principal_usdc)
1150 .unwrap_or(rust_decimal::Decimal::ZERO);
1151 let bridge_overdue = account
1152 .and_then(|account| account.bridge_deadline_ms)
1153 .is_some_and(|deadline| {
1154 account_bridge_usdc > rust_decimal::Decimal::ZERO && deadline < now_ms
1155 });
1156 let current_short_option_oi_usdc =
1157 self.pm_current_short_option_oi_usdc(underlying)?;
1158 let active_liability_usdc =
1159 pool.active_timing_bridge_usdc + pool.active_settlement_debt_usdc;
1160 let projected_short_oi_target_usdc = config.target_short_oi_notional_multiplier
1161 * (current_short_option_oi_usdc + incoming_short_notional_usdc);
1162 let projected_pool_target_usdc =
1163 active_liability_usdc.max(projected_short_oi_target_usdc);
1164 let projected_active_liability_usdc = pool.active_timing_bridge_usdc
1165 + pool.active_settlement_debt_usdc
1166 + incoming_short_notional_usdc;
1167 let current_capacity_usdc = pool.pool_available_usdc + active_liability_usdc;
1168 let projected_post_order_utilization = (current_capacity_usdc
1169 > rust_decimal::Decimal::ZERO)
1170 .then_some(projected_active_liability_usdc / current_capacity_usdc);
1171
1172 let context = hypercall_engine::margin_admission::PmAdmissionSettlementContext {
1173 wallet,
1174 underlying: underlying.clone(),
1175 pool_available_usdc: pool.pool_available_usdc,
1176 pool_target_usdc: projected_pool_target_usdc,
1177 active_timing_bridge_usdc: pool.active_timing_bridge_usdc,
1178 active_settlement_debt_usdc: pool.active_settlement_debt_usdc,
1179 current_utilization: pool.utilization,
1180 projected_post_order_utilization,
1181 account_bridge_usdc,
1182 account_debt_usdc,
1183 bridge_overdue,
1184 facts_as_of_ms: pool.updated_at_ms as i64,
1185 policy_version: config.policy_version,
1186 normal_utilization_cap: config.normal_utilization_cap,
1187 crisis_utilization_cap: config.crisis_utilization_cap,
1188 };
1189
1190 if let hypercall_engine::margin_admission::MarginAdmissionDecision::Rejected(
1191 reason,
1192 ) = hypercall_engine::margin_admission::decide_portfolio_margin(
1193 hypercall_engine::margin_admission::PortfolioMarginAdmissionInput {
1194 is_reduce_only: false,
1195 available_collateral: rust_decimal::Decimal::ZERO,
1196 margin_required: rust_decimal::Decimal::ZERO,
1197 settlement_context: Some(context),
1198 },
1199 ) {
1200 return Err(format!(
1201 "PM settlement gate rejected {underlying}: {reason}"
1202 ));
1203 }
1204 }
1205 }
1206
1207 Ok(())
1208 }
1209
1210 fn pm_current_short_option_oi_usdc(
1211 &self,
1212 underlying: &str,
1213 ) -> Result<rust_decimal::Decimal, String> {
1214 let mut quantities =
1215 std::collections::BTreeMap::<(WalletAddress, String), PmShortOptionQuantity>::new();
1216
1217 for ((wallet, symbol), position) in &self.ctx.engine_positions {
1218 if !self.is_pm_settlement_pool_eligible_wallet(wallet)? {
1219 continue;
1220 }
1221 let Ok(parsed) = hypercall_engine::instrument::ParsedInstrument::parse(symbol) else {
1222 continue;
1223 };
1224 if parsed.underlying != underlying {
1225 continue;
1226 }
1227
1228 let quantity = quantities.entry((*wallet, symbol.clone())).or_default();
1229 quantity.underlying = parsed.underlying;
1230 if position.quantity < rust_decimal::Decimal::ZERO {
1231 quantity.short_position_quantity = position.quantity.abs();
1232 } else {
1233 quantity.long_position_quantity = position.quantity;
1234 }
1235 }
1236
1237 for (wallet, orders) in self.ctx.order_index.snapshot_orders() {
1238 if !self.is_pm_settlement_pool_eligible_wallet(&wallet)? {
1239 continue;
1240 }
1241 for order in orders {
1242 if order.is_perp
1243 || !matches!(order.side, Side::Sell)
1244 || order.remaining_size <= rust_decimal::Decimal::ZERO
1245 {
1246 continue;
1247 }
1248 let Ok(parsed) =
1249 hypercall_engine::instrument::ParsedInstrument::parse(&order.symbol)
1250 else {
1251 continue;
1252 };
1253 if parsed.underlying != underlying {
1254 continue;
1255 }
1256
1257 let quantity = quantities.entry((wallet, order.symbol)).or_default();
1258 quantity.underlying = parsed.underlying;
1259 quantity.open_sell_quantity += order.remaining_size;
1261 }
1262 }
1263
1264 let mut short_option_oi_usdc = rust_decimal::Decimal::ZERO;
1265 for quantity in quantities.values() {
1266 let short_open_order_quantity = (quantity.open_sell_quantity
1267 - quantity.long_position_quantity)
1268 .max(rust_decimal::Decimal::ZERO);
1269 let aggregate_short_quantity =
1270 quantity.short_position_quantity + short_open_order_quantity;
1271 if aggregate_short_quantity <= rust_decimal::Decimal::ZERO {
1272 continue;
1273 }
1274
1275 let spot_price = self
1276 .ctx
1277 .spot_prices
1278 .get(&quantity.underlying)
1279 .copied()
1280 .ok_or_else(|| {
1281 format!(
1282 "PM settlement gate rejected {}: missing spot price facts",
1283 quantity.underlying
1284 )
1285 })?;
1286 if spot_price <= rust_decimal::Decimal::ZERO {
1287 return Err(format!(
1288 "PM settlement gate rejected {}: nonpositive spot price {}",
1289 quantity.underlying, spot_price
1290 ));
1291 }
1292 short_option_oi_usdc += spot_price * aggregate_short_quantity;
1293 }
1294
1295 Ok(short_option_oi_usdc)
1296 }
1297
1298 fn is_pm_settlement_pool_eligible_wallet(
1299 &self,
1300 wallet: &WalletAddress,
1301 ) -> Result<bool, String> {
1302 if !self.ctx.deps.portfolio_margin_pool_enabled
1303 || !self
1304 .ctx
1305 .deps
1306 .portfolio_margin_settlement_allowlist
1307 .contains(wallet)
1308 {
1309 return Ok(false);
1310 }
1311
1312 Ok(self
1313 .margin_manager
1314 .get_margin_mode(&self.ctx.deps, wallet)?
1315 == crate::rsm::margin_mode::MarginMode::Portfolio)
1316 }
1317
1318 fn pm_risk_increasing_short_option_exposure_usdc(
1319 &self,
1320 wallet: &WalletAddress,
1321 orders: &[OrderInfo],
1322 ) -> Result<std::collections::BTreeMap<String, rust_decimal::Decimal>, String> {
1323 let mut delta_by_symbol =
1324 std::collections::BTreeMap::<String, (String, rust_decimal::Decimal)>::new();
1325 for order in orders {
1326 if order.is_perp {
1327 continue;
1328 }
1329 let Ok(parsed) = hypercall_engine::instrument::ParsedInstrument::parse(&order.symbol)
1330 else {
1331 continue;
1332 };
1333 let size_human = hypercall_types::to_human_readable_decimal(&order.symbol, order.size);
1334 let signed_delta = match order.side {
1335 Side::Buy => size_human,
1336 Side::Sell => -size_human,
1337 };
1338 let entry = delta_by_symbol
1339 .entry(order.symbol.clone())
1340 .or_insert_with(|| (parsed.underlying, rust_decimal::Decimal::ZERO));
1341 entry.1 += signed_delta;
1342 }
1343
1344 let mut exposure_by_underlying = std::collections::BTreeMap::new();
1345 for (symbol, (underlying, signed_delta)) in delta_by_symbol {
1346 if signed_delta >= rust_decimal::Decimal::ZERO {
1347 continue;
1348 }
1349 let existing_quantity = self
1350 .ctx
1351 .engine_positions
1352 .get(&(*wallet, symbol.clone()))
1353 .map(|position| position.quantity)
1354 .unwrap_or(rust_decimal::Decimal::ZERO);
1355 let effective_quantity =
1356 existing_quantity - self.pm_resting_open_sell_quantity(wallet, &symbol);
1357 let existing_short_quantity = (-effective_quantity).max(rust_decimal::Decimal::ZERO);
1358 let projected_quantity = effective_quantity + signed_delta;
1359 let projected_short_quantity = (-projected_quantity).max(rust_decimal::Decimal::ZERO);
1360 let risk_increasing_short_quantity = projected_short_quantity - existing_short_quantity;
1361 if risk_increasing_short_quantity <= rust_decimal::Decimal::ZERO {
1362 continue;
1363 }
1364
1365 let spot_price = self
1366 .ctx
1367 .spot_prices
1368 .get(&underlying)
1369 .copied()
1370 .ok_or_else(|| {
1371 format!(
1372 "PM settlement gate rejected {}: missing spot price facts",
1373 underlying
1374 )
1375 })?;
1376 if spot_price <= rust_decimal::Decimal::ZERO {
1377 return Err(format!(
1378 "PM settlement gate rejected {}: nonpositive spot price {}",
1379 underlying, spot_price
1380 ));
1381 }
1382 let notional_usdc = spot_price * risk_increasing_short_quantity;
1383 if notional_usdc <= rust_decimal::Decimal::ZERO {
1384 continue;
1385 }
1386 *exposure_by_underlying.entry(underlying).or_default() += notional_usdc;
1387 }
1388
1389 Ok(exposure_by_underlying)
1390 }
1391
1392 fn pm_resting_open_sell_quantity(
1393 &self,
1394 wallet: &WalletAddress,
1395 symbol: &str,
1396 ) -> rust_decimal::Decimal {
1397 let mut resting_sell_quantity = rust_decimal::Decimal::ZERO;
1398 for (order_wallet, orders) in self.ctx.order_index.snapshot_orders() {
1399 if order_wallet != *wallet {
1400 continue;
1401 }
1402 for order in orders {
1403 if !order.is_perp
1404 && order.symbol == symbol
1405 && matches!(order.side, Side::Sell)
1406 && order.remaining_size > rust_decimal::Decimal::ZERO
1407 {
1408 resting_sell_quantity += order.remaining_size;
1410 }
1411 }
1412 }
1413 resting_sell_quantity
1414 }
1415}