1use super::*;
4
5impl UnifiedEngine {
6 pub(super) fn apply_order_action(
7 &mut self,
8 message: OrderActionMessage,
9 timestamp: u64,
10 output: &mut crate::rsm::apply::ApplyOutput,
11 ) {
12 self.ctx.deps.margin_timestamp_s = (timestamp / 1000) as i64;
13 output.push(EngineMessage::OrderAction(message.clone()));
14 let action_label = match message.action {
15 OrderAction::CreateOrder => "create",
16 OrderAction::CancelOrder => "cancel",
17 OrderAction::ReplaceOrder => "replace",
18 };
19 counter!("ht_engine_orders_total", "action" => action_label).increment(1);
20
21 match message.action {
22 OrderAction::CreateOrder => {
23 if message.info.is_perp {
24 self.apply_perp_order(message, timestamp, output);
25 } else {
26 self.apply_create_order(message, timestamp, output);
27 }
28 }
29 OrderAction::CancelOrder => self.apply_cancel_order(message, timestamp, output),
30 OrderAction::ReplaceOrder => self.apply_replace_order(message, timestamp, output),
31 }
32 }
33
34 fn apply_create_order(
35 &mut self,
36 message: OrderActionMessage,
37 timestamp: u64,
38 output: &mut crate::rsm::apply::ApplyOutput,
39 ) {
40 let order_info = message.info.clone();
41 let wallet = message.wallet;
42
43 if let Err(reason) = self.validate_order(&order_info, &wallet) {
44 self.push_order_rejection(message, reason, timestamp, output);
45 return;
46 }
47 if let Err(reason) = self.check_order_restrictions(&order_info, &wallet) {
48 self.push_order_rejection(message, reason, timestamp, output);
49 return;
50 }
51
52 let order_id =
53 self.allocate_order_output(&message, &order_info, &wallet, timestamp, output);
54 let result = self.execute_matching_sync(order_id, &order_info, &wallet, timestamp, output);
55
56 if let Some(maker_id) = result.self_trade_maker_id {
57 self.handle_self_trade_prevention_output(
58 &message,
59 order_id,
60 maker_id,
61 &order_info,
62 &wallet,
63 &result.fills,
64 timestamp,
65 output,
66 );
67 self.flush_orderbook_events(output);
68 return;
69 }
70
71 self.finalize_order_output(
72 &message,
73 order_id,
74 &order_info,
75 &wallet,
76 timestamp,
77 &result,
78 output,
79 );
80 self.flush_orderbook_events(output);
81 }
82
83 fn apply_perp_order(
84 &mut self,
85 message: OrderActionMessage,
86 timestamp: u64,
87 output: &mut crate::rsm::apply::ApplyOutput,
88 ) {
89 let order_info = &message.info;
90 let wallet = &message.wallet;
91
92 match self.margin_manager.get_risk_account(
93 &self.ctx.deps,
94 &self.ctx.engine_positions,
95 &self.ctx.balance_ledger,
96 wallet,
97 ) {
98 Ok(account) if account.cash <= 0.0 => {
99 self.push_order_rejection(
100 message,
101 "Account has no funds. Please deposit before trading.".to_string(),
102 timestamp,
103 output,
104 );
105 return;
106 }
107 Err(reason) => {
108 self.push_order_rejection(message, reason, timestamp, output);
109 return;
110 }
111 Ok(_) => {}
112 }
113
114 if order_info.underlying.is_none() {
115 self.push_order_rejection(
116 message,
117 "Perp order missing underlying symbol".to_string(),
118 timestamp,
119 output,
120 );
121 return;
122 }
123
124 if let Err(reason) = self.margin_manager.check_margin_for_order(
125 &self.ctx.deps,
126 &self.ctx.engine_positions,
127 &self.ctx.balance_ledger,
128 wallet,
129 order_info,
130 &self.ctx.order_index,
131 ) {
132 self.push_order_rejection(message, reason, timestamp, output);
133 return;
134 }
135
136 let response = OrderUpdateMessage {
137 timestamp,
138 info: order_info.clone(),
139 status: OrderUpdateStatus::Acked,
140 reason: Some("Margin validation passed - order queued for execution".to_string()),
141 filled_size: dec!(0),
142 order_id: None,
143 wallet_address: *wallet,
144 mmp_triggered: false,
145 request_id: message.request_id.clone(),
146 };
147 output.push(EngineMessage::OrderUpdate(response.clone()));
148 output.order_responses.push(response);
149 }
150
151 fn apply_cancel_order(
152 &mut self,
153 message: OrderActionMessage,
154 timestamp: u64,
155 output: &mut crate::rsm::apply::ApplyOutput,
156 ) {
157 self.cancel_order_output(message, timestamp, output, true);
158 self.flush_orderbook_events(output);
159 }
160
161 fn apply_replace_order(
162 &mut self,
163 message: OrderActionMessage,
164 timestamp: u64,
165 output: &mut crate::rsm::apply::ApplyOutput,
166 ) {
167 let Some(cancel_order_id) = message.info.order_id else {
168 self.push_order_rejection(
169 message,
170 "Replace order missing order_id to cancel".to_string(),
171 timestamp,
172 output,
173 );
174 return;
175 };
176 let wallet = message.wallet;
177 let Some(cancel_symbol) = self.lookup_symbol_by_order_id(&wallet, cancel_order_id) else {
178 self.push_order_rejection(
179 message,
180 "Cancel target order has already been completed or cancelled".to_string(),
181 timestamp,
182 output,
183 );
184 return;
185 };
186
187 let Some(orderbook) = self.ctx.orderbooks.get_mut(&cancel_symbol) else {
188 self.push_order_rejection(
189 message,
190 format!("Orderbook not found for symbol: {}", cancel_symbol),
191 timestamp,
192 output,
193 );
194 return;
195 };
196
197 match orderbook.cancel_order(cancel_order_id) {
198 Some(canceled_order) => {
199 let next_l2_seq = self.ctx.l2_update_seq.fetch_add(1, Ordering::SeqCst) + 1;
200 orderbook.set_pending_l2_sequence(next_l2_seq);
201 orderbook.emit_orderbook_events(timestamp);
202 self.ctx.order_index.remove_order(&wallet, cancel_order_id);
203
204 let cancel_response = OrderUpdateMessage {
205 timestamp,
206 info: OrderInfo {
207 symbol: cancel_symbol,
208 price: canceled_order.price,
209 size: canceled_order.quantity,
210 side: canceled_order.side,
211 tif: message.info.tif,
212 client_id: None,
213 order_id: Some(cancel_order_id),
214 is_perp: false,
215 underlying: None,
216 reduce_only: None,
217 nonce: None,
218 signature: None,
219 mmp_enabled: false,
220 builder_code_address: None,
221 },
222 status: OrderUpdateStatus::Canceled,
223 reason: Some("Order canceled by replace".to_string()),
224 filled_size: dec!(0),
225 order_id: Some(cancel_order_id),
226 wallet_address: wallet,
227 mmp_triggered: false,
228 request_id: message.request_id.clone(),
229 };
230 output.push(EngineMessage::OrderUpdate(cancel_response));
231 }
232 None => {
233 self.push_order_rejection(
234 message,
235 format!(
236 "Replace failed: order {} not found in orderbook {} (already filled or cancelled)",
237 cancel_order_id, cancel_symbol
238 ),
239 timestamp,
240 output,
241 );
242 return;
243 }
244 }
245
246 let mut create_info = message.info.clone();
247 create_info.order_id = None;
248 let create_message = OrderActionMessage {
249 timestamp: message.timestamp,
250 info: create_info,
251 action: OrderAction::CreateOrder,
252 wallet: message.wallet,
253 api_wallet_address: message.api_wallet_address,
254 mmp_triggered: false,
255 request_id: message.request_id,
256 };
257 self.apply_create_order(create_message, timestamp, output);
258 self.flush_orderbook_events(output);
259 }
260
261 fn push_order_rejection(
262 &mut self,
263 message: OrderActionMessage,
264 reason: String,
265 timestamp: u64,
266 output: &mut crate::rsm::apply::ApplyOutput,
267 ) {
268 self.push_order_rejection_with_response(message, reason, timestamp, output, true);
269 }
270
271 fn push_order_rejection_with_response(
272 &mut self,
273 message: OrderActionMessage,
274 reason: String,
275 timestamp: u64,
276 output: &mut crate::rsm::apply::ApplyOutput,
277 include_response: bool,
278 ) {
279 let response = OrderUpdateMessage {
280 timestamp,
281 info: message.info,
282 status: OrderUpdateStatus::Rejected,
283 reason: Some(reason),
284 filled_size: dec!(0),
285 order_id: None,
286 wallet_address: message.wallet,
287 mmp_triggered: false,
288 request_id: message.request_id,
289 };
290 output.push(EngineMessage::OrderUpdate(response.clone()));
291 if include_response {
292 output.order_responses.push(response);
293 }
294 }
295
296 pub(super) fn cancel_order_output(
297 &mut self,
298 message: OrderActionMessage,
299 timestamp: u64,
300 output: &mut crate::rsm::apply::ApplyOutput,
301 include_response: bool,
302 ) {
303 let order_info = &message.info;
304 let wallet = &message.wallet;
305 let (order_id_opt, symbol_opt) = if let Some(oid) = order_info.order_id {
306 (Some(oid), self.lookup_symbol_by_order_id(wallet, oid))
307 } else if let Some(ref client_id) = order_info.client_id {
308 if let Some(oid_str) = client_id.strip_prefix("oid:") {
309 if let Ok(oid) = oid_str.parse::<u64>() {
310 (Some(oid), self.lookup_symbol_by_order_id(wallet, oid))
311 } else {
312 (None, None)
313 }
314 } else {
315 self.lookup_by_client_id(wallet, client_id)
316 }
317 } else {
318 (None, None)
319 };
320
321 let (order_id, symbol) = match (order_id_opt, symbol_opt) {
322 (Some(oid), Some(sym)) => (oid, sym),
323 _ => {
324 self.push_order_rejection_with_response(
325 message,
326 "This order has already been completed or cancelled".to_string(),
327 timestamp,
328 output,
329 include_response,
330 );
331 return;
332 }
333 };
334
335 let Some(orderbook) = self.ctx.orderbooks.get_mut(&symbol) else {
336 self.push_order_rejection_with_response(
337 message,
338 format!("Orderbook not found for symbol: {}", symbol),
339 timestamp,
340 output,
341 include_response,
342 );
343 return;
344 };
345
346 match orderbook.cancel_order(order_id) {
347 Some(canceled_order) => {
348 let next_l2_seq = self.ctx.l2_update_seq.fetch_add(1, Ordering::SeqCst) + 1;
349 orderbook.set_pending_l2_sequence(next_l2_seq);
350 orderbook.emit_orderbook_events(timestamp);
351 self.ctx.order_index.remove_order(wallet, order_id);
352
353 let reason_text = if message.mmp_triggered {
354 "Order canceled by MMP trigger".to_string()
355 } else {
356 "Order canceled successfully".to_string()
357 };
358 let response = OrderUpdateMessage {
359 timestamp,
360 info: OrderInfo {
361 symbol,
362 price: canceled_order.price,
363 size: canceled_order.quantity,
364 side: canceled_order.side,
365 tif: order_info.tif,
366 client_id: order_info.client_id.clone(),
367 order_id: Some(order_id),
368 is_perp: order_info.is_perp,
369 underlying: order_info.underlying.clone(),
370 reduce_only: order_info.reduce_only,
371 nonce: order_info.nonce,
372 signature: order_info.signature.clone(),
373 mmp_enabled: order_info.mmp_enabled,
374 builder_code_address: order_info.builder_code_address,
375 },
376 status: OrderUpdateStatus::Canceled,
377 reason: Some(reason_text),
378 filled_size: dec!(0),
379 order_id: Some(order_id),
380 wallet_address: *wallet,
381 mmp_triggered: message.mmp_triggered,
382 request_id: message.request_id.clone(),
383 };
384 output.push(EngineMessage::OrderUpdate(response.clone()));
385 if include_response {
386 output.order_responses.push(response);
387 }
388 }
389 None => {
390 self.push_order_rejection_with_response(
391 message,
392 format!(
393 "Order {} not found in orderbook {} (already filled or cancelled)",
394 order_id, symbol
395 ),
396 timestamp,
397 output,
398 include_response,
399 );
400 }
401 }
402 }
403
404 async fn handle_create_order(&mut self, request: UnifiedEngineRequest, timestamp: u64) {
407 let start = Instant::now();
408 let is_standalone_create = request.message.action == OrderAction::CreateOrder;
411 let order_info = request.message.info.clone();
412 let wallet = request.message.wallet;
413
414 if let Err(reason) = OrderAdmission::validate_order(self, &order_info, &wallet) {
416 self.send_rejection(request, reason, timestamp).await;
417 return;
418 }
419
420 if let Err(reason) = OrderAdmission::check_order_restrictions(self, &order_info, &wallet) {
422 self.send_rejection(request, reason, timestamp).await;
423 return;
424 }
425
426 let allocated =
428 OrderExecution::allocate_and_ack(self, &request, &order_info, &wallet, timestamp).await;
429
430 let result = OrderExecution::execute_matching(
432 self,
433 allocated.order_id,
434 &order_info,
435 &wallet,
436 timestamp,
437 )
438 .await;
439
440 if let Some(maker_id) = result.self_trade_maker_id {
442 self.handle_self_trade_prevention(
443 &request,
444 allocated.order_id,
445 maker_id,
446 &order_info,
447 &wallet,
448 &result.fills,
449 timestamp,
450 )
451 .await;
452 return;
453 }
454
455 OrderExecution::finalize_order(
457 self,
458 &request,
459 allocated.order_id,
460 &order_info,
461 &wallet,
462 timestamp,
463 &result,
464 )
465 .await;
466
467 if is_standalone_create {
468 histogram!("ht_engine_order_processing_seconds", "action" => "create")
469 .record(start.elapsed().as_secs_f64());
470 }
471 }
472
473 pub(super) async fn process_order(&mut self, request: UnifiedEngineRequest) {
475 #[cfg(feature = "otel-tracing")]
477 if let Some(ref parent_cx) = request.trace_context {
478 tracing::Span::current().set_parent(parent_cx.clone());
479 }
480
481 let start = Instant::now();
482
483 let queue_wait = request.enqueued_at.elapsed().as_secs_f64();
485 histogram!("ht_engine_queue_wait_seconds").record(queue_wait);
486
487 decrement_pending_requests();
489
490 gauge!("ht_engine_pending_requests").set(get_pending_requests() as f64);
492
493 debug!(
494 "Processing order with client_id {:?}: {:?} (queue_wait: {:.3}s)",
495 request.message.info.client_id, request, queue_wait
496 );
497 let timestamp = request.message.timestamp;
499 self.ctx.deps.margin_timestamp_s = (timestamp / 1000) as i64;
500
501 let order_info = &request.message.info;
505 let wallet = &request.message.wallet;
506
507 let action_label = match request.message.action {
509 OrderAction::CreateOrder => "create",
510 OrderAction::CancelOrder => "cancel",
511 OrderAction::ReplaceOrder => "replace",
512 };
513 counter!("ht_engine_orders_total", "action" => action_label).increment(1);
514
515 let _ = self
517 .ctx
518 .deps
519 .event_sender
520 .send(EngineMessage::OrderAction(request.message.clone()));
521
522 if request.message.action == OrderAction::CancelOrder {
524 self.process_cancel_order(request, timestamp).await;
525 histogram!("ht_engine_order_processing_seconds", "action" => "cancel")
526 .record(start.elapsed().as_secs_f64());
527 return;
528 }
529
530 if request.message.action == OrderAction::ReplaceOrder {
532 self.process_replace_order(request, timestamp).await;
533 histogram!("ht_engine_order_processing_seconds", "action" => "replace")
534 .record(start.elapsed().as_secs_f64());
535 return;
536 }
537
538 if request.message.action != OrderAction::CreateOrder {
540 warn!(
541 "Unsupported order action for client_id {:?}: {:?}",
542 request.message.info.client_id, request.message.action
543 );
544 return;
545 }
546
547 if order_info.is_perp {
549 info!("🔄 RSM: Processing PERP order for wallet: {}, underlying: {:?}, price: {}, size: {}, side: {:?}",
550 wallet, order_info.underlying, order_info.price, order_info.size, order_info.side);
551 self.process_perp_order(request, timestamp).await;
552 histogram!("ht_engine_order_processing_seconds", "action" => "create")
553 .record(start.elapsed().as_secs_f64());
554 return;
555 }
556
557 self.handle_create_order(request, timestamp).await;
559 }
560
561 fn lookup_symbol_by_order_id(&self, wallet: &WalletAddress, order_id: u64) -> Option<String> {
563 self.ctx
564 .order_index
565 .get_order_symbol(wallet, order_id)
566 .map(|s| s.to_string())
567 }
568
569 fn lookup_by_client_id(
571 &self,
572 wallet: &WalletAddress,
573 client_id: &str,
574 ) -> (Option<u64>, Option<String>) {
575 if let Some((oid, sym)) = self
576 .ctx
577 .order_index
578 .get_order_by_client_id(wallet, client_id)
579 {
580 (Some(oid), Some(sym.to_string()))
581 } else {
582 (None, None)
583 }
584 }
585
586 pub(super) async fn process_cancel_order(
588 &mut self,
589 request: UnifiedEngineRequest,
590 timestamp: u64,
591 ) {
592 let is_mmp_triggered = request.message.mmp_triggered;
593
594 if is_mmp_triggered {
595 debug!(
596 "Processing MMP-triggered cancel order with client_id {:?}: {:?}",
597 request.message.info.client_id, request
598 );
599 } else {
600 debug!(
601 "Processing user cancel order with client_id {:?}: {:?}",
602 request.message.info.client_id, request
603 );
604 }
605
606 let order_info = &request.message.info;
607 let wallet = &request.message.wallet;
608
609 let (order_id_opt, symbol_opt) = if let Some(oid) = order_info.order_id {
611 let sym = self.lookup_symbol_by_order_id(wallet, oid);
612 (Some(oid), sym)
613 } else if let Some(ref client_id) = order_info.client_id {
614 if let Some(oid_str) = client_id.strip_prefix("oid:") {
615 if let Ok(oid) = oid_str.parse::<u64>() {
616 let sym = self.lookup_symbol_by_order_id(wallet, oid);
617 (Some(oid), sym)
618 } else {
619 (None, None)
620 }
621 } else {
622 self.lookup_by_client_id(wallet, client_id)
623 }
624 } else {
625 (None, None)
626 };
627
628 let (order_id, symbol) = match (order_id_opt, symbol_opt) {
630 (Some(oid), Some(sym)) => (oid, sym),
631 _ => {
632 error!(
633 "Cancel failed for wallet {}: order not found (order_id={:?}, client_id={:?}), order may have already been filled/canceled",
634 wallet, order_info.order_id, order_info.client_id
635 );
636 let reason = "This order has already been completed or cancelled".to_string();
637 self.send_rejection(request, reason, timestamp).await;
638 return;
639 }
640 };
641
642 info!(
643 "Processing cancel request for order_id={}, symbol={}, wallet={}",
644 order_id, symbol, wallet
645 );
646
647 if self
651 .expiry_manager
652 .is_instrument_expired(&symbol, &self.ctx.orderbooks)
653 {
654 debug!(
655 "Cancel request for expired instrument: {} (allowing cancel if orderbook exists)",
656 symbol
657 );
658 }
659
660 let orderbook = match self.ctx.orderbooks.get_mut(&symbol) {
662 Some(book) => book,
663 None => {
664 let reason = format!("Orderbook not found for symbol: {}", symbol);
665 error!("{}", reason);
666 self.send_rejection(request, reason, timestamp).await;
667 return;
668 }
669 };
670
671 match orderbook.cancel_order(order_id) {
677 Some(canceled_order) => {
678 info!(
679 "Successfully canceled order_id={}, symbol={}, price={}, quantity={}",
680 order_id, symbol, canceled_order.price, canceled_order.quantity
681 );
682
683 let next_l2_seq = self.ctx.l2_update_seq.fetch_add(1, Ordering::SeqCst) + 1;
685 orderbook.set_pending_l2_sequence(next_l2_seq);
686
687 orderbook.emit_orderbook_events(timestamp);
689
690 self.ctx.order_index.remove_order(wallet, order_id);
692
693 let remaining_quantity = canceled_order.quantity;
694
695 let canceled_order_info = OrderInfo {
696 symbol: symbol.clone(),
697 price: canceled_order.price,
698 size: remaining_quantity,
699 side: canceled_order.side,
700 tif: order_info.tif,
701 client_id: order_info.client_id.clone(),
702 order_id: Some(order_id),
703 is_perp: order_info.is_perp,
704 underlying: order_info.underlying.clone(),
705 reduce_only: order_info.reduce_only,
706 nonce: order_info.nonce,
707 signature: order_info.signature.clone(),
708 mmp_enabled: order_info.mmp_enabled,
709 builder_code_address: order_info.builder_code_address,
710 };
711
712 let reason_text = if is_mmp_triggered {
714 "Order canceled by MMP trigger".to_string()
715 } else {
716 "Order canceled successfully".to_string()
717 };
718
719 let response = OrderUpdateMessage {
720 timestamp,
721 info: canceled_order_info,
722 status: OrderUpdateStatus::Canceled,
723 reason: Some(reason_text),
724 filled_size: dec!(0),
725 order_id: Some(order_id),
726 wallet_address: *wallet,
727 mmp_triggered: is_mmp_triggered,
728 request_id: request.message.request_id.clone(),
729 };
730
731 let _ = self
733 .ctx
734 .deps
735 .event_sender
736 .send(EngineMessage::OrderUpdate(response.clone()));
737
738 let _ = request.response_tx.send(response).await;
740 }
741 None => {
742 let reason = format!(
744 "Order {} not found in orderbook {} (already filled or cancelled)",
745 order_id, symbol
746 );
747 warn!("{}", reason);
748 self.send_rejection(request, reason, timestamp).await;
749 }
750 }
751 }
752
753 pub(super) async fn process_replace_order(
759 &mut self,
760 request: UnifiedEngineRequest,
761 timestamp: u64,
762 ) {
763 let wallet = request.message.wallet;
764 let order_info = &request.message.info;
765
766 let cancel_order_id = match order_info.order_id {
767 Some(oid) => oid,
768 None => {
769 self.send_rejection(
770 request,
771 "Replace order missing order_id to cancel".to_string(),
772 timestamp,
773 )
774 .await;
775 return;
776 }
777 };
778
779 info!(
780 "Processing replace order: cancel order_id={}, new symbol={}, wallet={}",
781 cancel_order_id, order_info.symbol, wallet
782 );
783
784 let cancel_symbol = match self.lookup_symbol_by_order_id(&wallet, cancel_order_id) {
788 Some(sym) => sym,
789 None => {
790 error!(
791 "Replace failed: cancel target order_id={} not found for wallet {}",
792 cancel_order_id, wallet
793 );
794 self.send_rejection(
795 request,
796 "Cancel target order has already been completed or cancelled".to_string(),
797 timestamp,
798 )
799 .await;
800 return;
801 }
802 };
803
804 let orderbook = match self.ctx.orderbooks.get_mut(&cancel_symbol) {
806 Some(book) => book,
807 None => {
808 self.send_rejection(
809 request,
810 format!("Orderbook not found for symbol: {}", cancel_symbol),
811 timestamp,
812 )
813 .await;
814 return;
815 }
816 };
817
818 match orderbook.cancel_order(cancel_order_id) {
819 Some(canceled_order) => {
820 info!(
821 "Replace: canceled order_id={}, symbol={}, price={}, quantity={}",
822 cancel_order_id, cancel_symbol, canceled_order.price, canceled_order.quantity
823 );
824
825 let next_l2_seq = self.ctx.l2_update_seq.fetch_add(1, Ordering::SeqCst) + 1;
827 let orderbook = self
829 .ctx
830 .orderbooks
831 .get_mut(&cancel_symbol)
832 .expect("orderbook disappeared between cancel and event emission");
833 orderbook.set_pending_l2_sequence(next_l2_seq);
834 orderbook.emit_orderbook_events(timestamp);
835
836 self.ctx.order_index.remove_order(&wallet, cancel_order_id);
838
839 let canceled_order_info = OrderInfo {
841 symbol: cancel_symbol.clone(),
842 price: canceled_order.price,
843 size: canceled_order.quantity,
844 side: canceled_order.side,
845 tif: order_info.tif,
846 client_id: None,
847 order_id: Some(cancel_order_id),
848 is_perp: false,
849 underlying: None,
850 reduce_only: None,
851 nonce: None,
852 signature: None,
853 mmp_enabled: false,
854 builder_code_address: None,
855 };
856
857 let cancel_response = OrderUpdateMessage {
858 timestamp,
859 info: canceled_order_info,
860 status: OrderUpdateStatus::Canceled,
861 reason: Some("Order canceled by replace".to_string()),
862 filled_size: dec!(0),
863 order_id: Some(cancel_order_id),
864 wallet_address: wallet,
865 mmp_triggered: false,
866 request_id: request.message.request_id.clone(),
867 };
868
869 let _ = self
871 .ctx
872 .deps
873 .event_sender
874 .send(EngineMessage::OrderUpdate(cancel_response));
875 }
876 None => {
877 let reason = format!(
879 "Replace failed: order {} not found in orderbook {} (already filled or cancelled)",
880 cancel_order_id, cancel_symbol
881 );
882 warn!("{}", reason);
883 self.send_rejection(request, reason, timestamp).await;
884 return;
885 }
886 }
887
888 let mut create_info = request.message.info.clone();
891 create_info.order_id = None; let create_message = OrderActionMessage {
894 timestamp: request.message.timestamp,
895 info: create_info,
896 action: OrderAction::CreateOrder,
897 wallet: request.message.wallet,
898 api_wallet_address: request.message.api_wallet_address,
899 mmp_triggered: false,
900 request_id: request.message.request_id.clone(),
901 };
902
903 let create_request = UnifiedEngineRequest {
904 message: create_message,
905 response_tx: request.response_tx,
906 enqueued_at: request.enqueued_at,
907 #[cfg(feature = "otel-tracing")]
908 trace_context: request.trace_context,
909 };
910
911 self.handle_create_order(create_request, timestamp).await;
913 }
914
915 pub(super) async fn process_perp_order(
917 &mut self,
918 request: UnifiedEngineRequest,
919 timestamp: u64,
920 ) {
921 debug!(
922 "Processing perp order with client_id {:?}: {:?}",
923 request.message.info.client_id, request
924 );
925 let order_info = &request.message.info;
926 let wallet = &request.message.wallet;
927
928 info!(
929 "🔍 RSM: Starting perp order margin validation for wallet: {}",
930 wallet
931 );
932
933 match self.margin_manager.get_risk_account(
936 &self.ctx.deps,
937 &self.ctx.engine_positions,
938 &self.ctx.balance_ledger,
939 wallet,
940 ) {
941 Ok(account) => {
942 if account.cash <= 0.0 {
943 warn!(
944 "Perp order rejected for wallet {} - account has no funds (cash: {})",
945 wallet, account.cash
946 );
947 self.send_rejection(
948 request,
949 "Account has no funds. Please deposit before trading.".to_string(),
950 timestamp,
951 )
952 .await;
953 return;
954 }
955 }
956 Err(e) => {
957 warn!("Perp order rejected for wallet {} - {}", wallet, e);
958 self.send_rejection(request, e, timestamp).await;
959 return;
960 }
961 }
962
963 let underlying = match &order_info.underlying {
965 Some(u) => u.clone(),
966 None => {
967 error!("❌ RSM: Perp order missing underlying symbol");
968 self.send_rejection(
969 request,
970 "Perp order missing underlying symbol".to_string(),
971 timestamp,
972 )
973 .await;
974 return;
975 }
976 };
977
978 info!(
979 "📊 RSM: Perp order details - underlying: {}, price: {}, size: {}, side: {:?}",
980 underlying, order_info.price, order_info.size, order_info.side
981 );
982
983 debug!("🧮 RSM: Running unified margin check for perp order");
985 let margin_check_result = self.margin_manager.check_margin_for_order(
986 &self.ctx.deps,
987 &self.ctx.engine_positions,
988 &self.ctx.balance_ledger,
989 wallet,
990 order_info,
991 &self.ctx.order_index,
992 );
993
994 if let Err(margin_error) = margin_check_result {
995 warn!(
996 "⚠️ RSM: Perp order rejected due to insufficient margin: {}",
997 margin_error
998 );
999 self.send_rejection(request, margin_error, timestamp).await;
1000 return;
1001 }
1002
1003 info!(
1004 "✅ RSM: Perp order passed margin validation - wallet: {}, underlying: {}",
1005 wallet, underlying
1006 );
1007
1008 let response = OrderUpdateMessage {
1010 timestamp,
1011 info: order_info.clone(),
1012 status: OrderUpdateStatus::Acked,
1013 reason: Some("Margin validation passed - order queued for execution".to_string()),
1014 filled_size: dec!(0),
1015 order_id: None,
1016 wallet_address: *wallet,
1017 mmp_triggered: false,
1018 request_id: request.message.request_id.clone(),
1019 };
1020
1021 let _ = self
1023 .ctx
1024 .deps
1025 .event_sender
1026 .send(EngineMessage::OrderUpdate(response.clone()));
1027
1028 let _ = request.response_tx.send(response).await;
1030 }
1031
1032 pub(super) async fn send_rejection(
1033 &self,
1034 request: UnifiedEngineRequest,
1035 reason: String,
1036 timestamp: u64,
1037 ) {
1038 debug!(
1039 "Sending rejection response for client_id {:?}: {:?}",
1040 request.message.info.client_id, reason
1041 );
1042
1043 let reason_label = crate::rsm::engine_deps::classify_rejection_reason(&reason);
1044 counter!("ht_engine_rejections_total", "reason" => reason_label).increment(1);
1045
1046 let response = OrderUpdateMessage {
1047 timestamp,
1048 info: request.message.info.clone(),
1049 status: OrderUpdateStatus::Rejected,
1050 reason: Some(reason),
1051 filled_size: dec!(0),
1052 order_id: None,
1053 wallet_address: request.message.wallet,
1054 mmp_triggered: false,
1055 request_id: request.message.request_id.clone(),
1056 };
1057
1058 self.ctx
1059 .deps
1060 .event_sender
1061 .send(EngineMessage::OrderUpdate(response.clone()))
1062 .unwrap_or_else(|e| {
1063 panic!(
1064 "CRITICAL_FAILURE: Failed to send OrderUpdate (Rejected): {}. \
1065 Event bus is dead. Restart required.",
1066 e
1067 )
1068 });
1069 let _ = request.response_tx.send(response).await;
1070 }
1071
1072 pub(super) async fn handle_mmp_trigger(
1073 &mut self,
1074 wallet: &WalletAddress,
1075 underlying_symbol: &str,
1076 trigger_reason: String,
1077 timestamp: u64,
1078 ) {
1079 warn!(
1080 "MMP TRIGGERED: wallet={}, underlying={}, reason={}",
1081 wallet, underlying_symbol, trigger_reason
1082 );
1083
1084 let mmp_event = hypercall_types::MmpTriggeredMessage {
1085 wallet: *wallet,
1086 currency: underlying_symbol.to_string(),
1087 reason: trigger_reason.clone(),
1088 timestamp,
1089 };
1090 self.ctx
1091 .deps
1092 .event_sender
1093 .send(EngineMessage::MmpTriggered(mmp_event))
1094 .unwrap_or_else(|e| {
1095 panic!(
1096 "CRITICAL_FAILURE: Failed to send MmpTriggered event for wallet {}: {}. \
1097 Event bus is dead. Restart required.",
1098 wallet, e
1099 )
1100 });
1101
1102 let mmp_order_ids = self
1103 .ctx
1104 .order_index
1105 .get_mmp_order_ids(wallet, underlying_symbol);
1106
1107 info!(
1108 "MMP trigger: Found {} MMP-enabled orders for wallet {} and underlying {}",
1109 mmp_order_ids.len(),
1110 wallet,
1111 underlying_symbol
1112 );
1113
1114 for (order_id, symbol) in mmp_order_ids {
1115 info!(
1116 "MMP auto-cancel: order_id={}, symbol={}, wallet={}, underlying={}",
1117 order_id, symbol, wallet, underlying_symbol
1118 );
1119
1120 let cancel_info = OrderInfo {
1121 symbol,
1122 price: dec!(0),
1123 size: dec!(0),
1124 side: Side::Buy,
1125 tif: TimeInForce::GTC,
1126 client_id: None,
1127 order_id: Some(order_id),
1128 is_perp: false,
1129 underlying: None,
1130 reduce_only: None,
1131 nonce: None,
1132 signature: None,
1133 mmp_enabled: true,
1134 builder_code_address: None,
1135 };
1136
1137 let (response_tx, _response_rx) = mpsc::channel(1);
1138 let deterministic_request_id = format!("mmp:{}:{}:{}", timestamp, wallet, order_id);
1139 let cancel_request = UnifiedEngineRequest {
1140 message: OrderActionMessage {
1141 timestamp,
1142 info: cancel_info,
1143 action: OrderAction::CancelOrder,
1144 wallet: *wallet,
1145 api_wallet_address: None,
1146 mmp_triggered: true,
1147 request_id: Some(deterministic_request_id),
1148 },
1149 response_tx,
1150 enqueued_at: Instant::now(),
1151 #[cfg(feature = "otel-tracing")]
1152 trace_context: None,
1153 };
1154
1155 self.mmp_cancel_order(cancel_request, timestamp).await;
1156 }
1157 }
1158
1159 async fn mmp_cancel_order(&mut self, request: UnifiedEngineRequest, timestamp: u64) {
1160 let order_info = &request.message.info;
1161 let wallet = &request.message.wallet;
1162
1163 let (order_id_opt, symbol_opt) = if let Some(oid) = order_info.order_id {
1164 let sym = self
1165 .ctx
1166 .order_index
1167 .get_order_symbol(wallet, oid)
1168 .map(|s| s.to_string());
1169 (Some(oid), sym)
1170 } else if let Some(ref client_id) = order_info.client_id {
1171 if let Some(oid_str) = client_id.strip_prefix("oid:") {
1172 if let Ok(oid) = oid_str.parse::<u64>() {
1173 let sym = self
1174 .ctx
1175 .order_index
1176 .get_order_symbol(wallet, oid)
1177 .map(|s| s.to_string());
1178 (Some(oid), sym)
1179 } else {
1180 (None, None)
1181 }
1182 } else if let Some((oid, sym)) = self
1183 .ctx
1184 .order_index
1185 .get_order_by_client_id(wallet, client_id)
1186 {
1187 (Some(oid), Some(sym.to_string()))
1188 } else {
1189 (None, None)
1190 }
1191 } else {
1192 (None, None)
1193 };
1194
1195 let (order_id, symbol) = match (order_id_opt, symbol_opt) {
1196 (Some(oid), Some(sym)) => (oid, sym),
1197 _ => {
1198 error!(
1199 "Cancel failed for wallet {}: order not found (order_id={:?}, client_id={:?})",
1200 wallet, order_info.order_id, order_info.client_id
1201 );
1202 let reason = "This order has already been completed or cancelled".to_string();
1203 self.send_rejection(request, reason, timestamp).await;
1204 return;
1205 }
1206 };
1207
1208 let is_mmp_triggered = request.message.mmp_triggered;
1209
1210 let orderbook = match self.ctx.orderbooks.get_mut(&symbol) {
1211 Some(book) => book,
1212 None => {
1213 let reason = format!("Orderbook not found for symbol: {}", symbol);
1214 error!("{}", reason);
1215 self.send_rejection(request, reason, timestamp).await;
1216 return;
1217 }
1218 };
1219
1220 match orderbook.cancel_order(order_id) {
1221 Some(canceled_order) => {
1222 let next_l2_seq = self.ctx.l2_update_seq.fetch_add(1, Ordering::SeqCst) + 1;
1223 orderbook.set_pending_l2_sequence(next_l2_seq);
1224
1225 info!(
1226 "Successfully canceled order_id={}, symbol={}, price={}, quantity={}",
1227 order_id, symbol, canceled_order.price, canceled_order.quantity
1228 );
1229
1230 orderbook.emit_orderbook_events(timestamp);
1231 self.ctx.order_index.remove_order(wallet, order_id);
1232
1233 let canceled_order_info = OrderInfo {
1234 symbol: symbol.clone(),
1235 price: canceled_order.price,
1236 size: canceled_order.quantity,
1237 side: canceled_order.side,
1238 tif: order_info.tif,
1239 client_id: order_info.client_id.clone(),
1240 order_id: Some(order_id),
1241 is_perp: order_info.is_perp,
1242 underlying: order_info.underlying.clone(),
1243 reduce_only: order_info.reduce_only,
1244 nonce: order_info.nonce,
1245 signature: order_info.signature.clone(),
1246 mmp_enabled: order_info.mmp_enabled,
1247 builder_code_address: order_info.builder_code_address,
1248 };
1249
1250 let reason_text = if is_mmp_triggered {
1251 "Order canceled by MMP trigger".to_string()
1252 } else {
1253 "Order canceled successfully".to_string()
1254 };
1255
1256 let response = OrderUpdateMessage {
1257 timestamp,
1258 info: canceled_order_info,
1259 status: OrderUpdateStatus::Canceled,
1260 reason: Some(reason_text),
1261 filled_size: dec!(0),
1262 order_id: Some(order_id),
1263 wallet_address: *wallet,
1264 mmp_triggered: is_mmp_triggered,
1265 request_id: request.message.request_id.clone(),
1266 };
1267
1268 self.ctx
1269 .deps
1270 .event_sender
1271 .send(EngineMessage::OrderUpdate(response.clone()))
1272 .unwrap_or_else(|e| {
1273 panic!(
1274 "CRITICAL_FAILURE: Failed to send OrderUpdate (Canceled) for order {}: {}. \
1275 Event bus is dead. Restart required.",
1276 order_id, e
1277 )
1278 });
1279
1280 let _ = request.response_tx.send(response).await;
1281 }
1282 None => {
1283 let reason = format!(
1284 "Order {} not found in orderbook {} (already filled or cancelled)",
1285 order_id, symbol
1286 );
1287 warn!("{}", reason);
1288 self.send_rejection(request, reason, timestamp).await;
1289 }
1290 }
1291 }
1292
1293 pub(super) async fn handle_self_trade_prevention(
1294 &mut self,
1295 request: &UnifiedEngineRequest,
1296 taker_order_id: u64,
1297 maker_order_id: u64,
1298 order_info: &OrderInfo,
1299 wallet: &WalletAddress,
1300 partial_fills: &[Fill],
1301 timestamp: u64,
1302 ) {
1303 let symbol = &order_info.symbol;
1304 let filled_size: Decimal = partial_fills.iter().map(|f| f.size).sum();
1305
1306 info!(
1307 "Self-trade prevention: maker={}, taker={}, symbol={}, partial_fills={}",
1308 maker_order_id,
1309 taker_order_id,
1310 symbol,
1311 partial_fills.len()
1312 );
1313
1314 if let Some(orderbook) = self.ctx.orderbooks.get_mut(symbol) {
1315 if let Some(canceled) = orderbook.cancel_order(maker_order_id) {
1316 let next_l2_seq = self.ctx.l2_update_seq.fetch_add(1, Ordering::SeqCst) + 1;
1317 orderbook.set_pending_l2_sequence(next_l2_seq);
1318 orderbook.emit_orderbook_events(timestamp);
1319 self.ctx.order_index.remove_order(wallet, maker_order_id);
1320
1321 let maker_response = OrderUpdateMessage {
1322 timestamp,
1323 info: OrderInfo {
1324 symbol: symbol.clone(),
1325 price: canceled.price,
1326 size: canceled.quantity,
1327 side: canceled.side,
1328 tif: TimeInForce::GTC,
1329 client_id: None,
1330 order_id: Some(maker_order_id),
1331 is_perp: order_info.is_perp,
1332 underlying: order_info.underlying.clone(),
1333 reduce_only: None,
1334 nonce: None,
1335 signature: None,
1336 mmp_enabled: false,
1337 builder_code_address: None,
1338 },
1339 status: OrderUpdateStatus::Canceled,
1340 reason: Some("Self-trade prevention".to_string()),
1341 filled_size: dec!(0),
1342 order_id: Some(maker_order_id),
1343 wallet_address: *wallet,
1344 mmp_triggered: false,
1345 request_id: None,
1346 };
1347
1348 self.ctx
1349 .deps
1350 .event_sender
1351 .send(EngineMessage::OrderUpdate(maker_response.clone()))
1352 .unwrap_or_else(|e| {
1353 panic!(
1354 "CRITICAL_FAILURE: Failed to send OrderUpdate (STP maker cancel) for order {}: {}. \
1355 Event bus is dead. Restart required.",
1356 maker_order_id, e
1357 )
1358 });
1359 if let Some(ref ws_tx) = self.ctx.deps.ws_event_sender {
1360 let _ = ws_tx.send(EngineMessage::OrderUpdate(maker_response));
1361 }
1362 }
1363 }
1364
1365 self.ctx.order_index.remove_order(wallet, taker_order_id);
1366
1367 counter!("ht_engine_self_trade_prevented_total").increment(1);
1368
1369 let (status, reason) = if filled_size > dec!(0) {
1370 (
1371 OrderUpdateStatus::Canceled,
1372 "Self-trade prevention (partial fill kept)",
1373 )
1374 } else {
1375 (OrderUpdateStatus::Rejected, "Self-trade prevention")
1376 };
1377
1378 let taker_response = OrderUpdateMessage {
1379 timestamp,
1380 info: order_info.clone(),
1381 status,
1382 reason: Some(reason.to_string()),
1383 filled_size,
1384 order_id: Some(taker_order_id),
1385 wallet_address: *wallet,
1386 mmp_triggered: false,
1387 request_id: request.message.request_id.clone(),
1388 };
1389
1390 self.ctx
1391 .deps
1392 .event_sender
1393 .send(EngineMessage::OrderUpdate(taker_response.clone()))
1394 .unwrap_or_else(|e| {
1395 panic!(
1396 "CRITICAL_FAILURE: Failed to send OrderUpdate (STP taker) for order {}: {}. \
1397 Event bus is dead. Restart required.",
1398 taker_order_id, e
1399 )
1400 });
1401 if let Some(ref ws_tx) = self.ctx.deps.ws_event_sender {
1402 let _ = ws_tx.send(EngineMessage::OrderUpdate(taker_response.clone()));
1403 }
1404
1405 let _ = request.response_tx.send(taker_response).await;
1406 }
1407}