Skip to main content

hypercall/rsm/unified_engine/
order_routing.rs

1//! Order routing and order-type specific request handling for `UnifiedEngine`.
2
3use super::*;
4
5impl UnifiedEngine {
6    pub(super) fn apply_order_action(
7        &mut self,
8        message: OrderActionMessage,
9        timestamp: u64,
10        output: &mut crate::rsm::apply::ApplyOutput,
11    ) {
12        self.ctx.deps.margin_timestamp_s = (timestamp / 1000) as i64;
13        output.push(EngineMessage::OrderAction(message.clone()));
14        let action_label = match message.action {
15            OrderAction::CreateOrder => "create",
16            OrderAction::CancelOrder => "cancel",
17            OrderAction::ReplaceOrder => "replace",
18        };
19        counter!("ht_engine_orders_total", "action" => action_label).increment(1);
20
21        match message.action {
22            OrderAction::CreateOrder => {
23                if message.info.is_perp {
24                    self.apply_perp_order(message, timestamp, output);
25                } else {
26                    self.apply_create_order(message, timestamp, output);
27                }
28            }
29            OrderAction::CancelOrder => self.apply_cancel_order(message, timestamp, output),
30            OrderAction::ReplaceOrder => self.apply_replace_order(message, timestamp, output),
31        }
32    }
33
34    fn apply_create_order(
35        &mut self,
36        message: OrderActionMessage,
37        timestamp: u64,
38        output: &mut crate::rsm::apply::ApplyOutput,
39    ) {
40        let order_info = message.info.clone();
41        let wallet = message.wallet;
42
43        if let Err(reason) = self.validate_order(&order_info, &wallet) {
44            self.push_order_rejection(message, reason, timestamp, output);
45            return;
46        }
47        if let Err(reason) = self.check_order_restrictions(&order_info, &wallet) {
48            self.push_order_rejection(message, reason, timestamp, output);
49            return;
50        }
51
52        let order_id =
53            self.allocate_order_output(&message, &order_info, &wallet, timestamp, output);
54        let result = self.execute_matching_sync(order_id, &order_info, &wallet, timestamp, output);
55
56        if let Some(maker_id) = result.self_trade_maker_id {
57            self.handle_self_trade_prevention_output(
58                &message,
59                order_id,
60                maker_id,
61                &order_info,
62                &wallet,
63                &result.fills,
64                timestamp,
65                output,
66            );
67            self.flush_orderbook_events(output);
68            return;
69        }
70
71        self.finalize_order_output(
72            &message,
73            order_id,
74            &order_info,
75            &wallet,
76            timestamp,
77            &result,
78            output,
79        );
80        self.flush_orderbook_events(output);
81    }
82
83    fn apply_perp_order(
84        &mut self,
85        message: OrderActionMessage,
86        timestamp: u64,
87        output: &mut crate::rsm::apply::ApplyOutput,
88    ) {
89        let order_info = &message.info;
90        let wallet = &message.wallet;
91
92        match self.margin_manager.get_risk_account(
93            &self.ctx.deps,
94            &self.ctx.engine_positions,
95            &self.ctx.balance_ledger,
96            wallet,
97        ) {
98            Ok(account) if account.cash <= 0.0 => {
99                self.push_order_rejection(
100                    message,
101                    "Account has no funds. Please deposit before trading.".to_string(),
102                    timestamp,
103                    output,
104                );
105                return;
106            }
107            Err(reason) => {
108                self.push_order_rejection(message, reason, timestamp, output);
109                return;
110            }
111            Ok(_) => {}
112        }
113
114        if order_info.underlying.is_none() {
115            self.push_order_rejection(
116                message,
117                "Perp order missing underlying symbol".to_string(),
118                timestamp,
119                output,
120            );
121            return;
122        }
123
124        if let Err(reason) = self.margin_manager.check_margin_for_order(
125            &self.ctx.deps,
126            &self.ctx.engine_positions,
127            &self.ctx.balance_ledger,
128            wallet,
129            order_info,
130            &self.ctx.order_index,
131        ) {
132            self.push_order_rejection(message, reason, timestamp, output);
133            return;
134        }
135
136        let response = OrderUpdateMessage {
137            timestamp,
138            info: order_info.clone(),
139            status: OrderUpdateStatus::Acked,
140            reason: Some("Margin validation passed - order queued for execution".to_string()),
141            filled_size: dec!(0),
142            order_id: None,
143            wallet_address: *wallet,
144            mmp_triggered: false,
145            request_id: message.request_id.clone(),
146        };
147        output.push(EngineMessage::OrderUpdate(response.clone()));
148        output.order_responses.push(response);
149    }
150
151    fn apply_cancel_order(
152        &mut self,
153        message: OrderActionMessage,
154        timestamp: u64,
155        output: &mut crate::rsm::apply::ApplyOutput,
156    ) {
157        self.cancel_order_output(message, timestamp, output, true);
158        self.flush_orderbook_events(output);
159    }
160
161    fn apply_replace_order(
162        &mut self,
163        message: OrderActionMessage,
164        timestamp: u64,
165        output: &mut crate::rsm::apply::ApplyOutput,
166    ) {
167        let Some(cancel_order_id) = message.info.order_id else {
168            self.push_order_rejection(
169                message,
170                "Replace order missing order_id to cancel".to_string(),
171                timestamp,
172                output,
173            );
174            return;
175        };
176        let wallet = message.wallet;
177        let Some(cancel_symbol) = self.lookup_symbol_by_order_id(&wallet, cancel_order_id) else {
178            self.push_order_rejection(
179                message,
180                "Cancel target order has already been completed or cancelled".to_string(),
181                timestamp,
182                output,
183            );
184            return;
185        };
186
187        let Some(orderbook) = self.ctx.orderbooks.get_mut(&cancel_symbol) else {
188            self.push_order_rejection(
189                message,
190                format!("Orderbook not found for symbol: {}", cancel_symbol),
191                timestamp,
192                output,
193            );
194            return;
195        };
196
197        match orderbook.cancel_order(cancel_order_id) {
198            Some(canceled_order) => {
199                let next_l2_seq = self.ctx.l2_update_seq.fetch_add(1, Ordering::SeqCst) + 1;
200                orderbook.set_pending_l2_sequence(next_l2_seq);
201                orderbook.emit_orderbook_events(timestamp);
202                self.ctx.order_index.remove_order(&wallet, cancel_order_id);
203
204                let cancel_response = OrderUpdateMessage {
205                    timestamp,
206                    info: OrderInfo {
207                        symbol: cancel_symbol,
208                        price: canceled_order.price,
209                        size: canceled_order.quantity,
210                        side: canceled_order.side,
211                        tif: message.info.tif,
212                        client_id: None,
213                        order_id: Some(cancel_order_id),
214                        is_perp: false,
215                        underlying: None,
216                        reduce_only: None,
217                        nonce: None,
218                        signature: None,
219                        mmp_enabled: false,
220                        builder_code_address: None,
221                    },
222                    status: OrderUpdateStatus::Canceled,
223                    reason: Some("Order canceled by replace".to_string()),
224                    filled_size: dec!(0),
225                    order_id: Some(cancel_order_id),
226                    wallet_address: wallet,
227                    mmp_triggered: false,
228                    request_id: message.request_id.clone(),
229                };
230                output.push(EngineMessage::OrderUpdate(cancel_response));
231            }
232            None => {
233                self.push_order_rejection(
234                    message,
235                    format!(
236                        "Replace failed: order {} not found in orderbook {} (already filled or cancelled)",
237                        cancel_order_id, cancel_symbol
238                    ),
239                    timestamp,
240                    output,
241                );
242                return;
243            }
244        }
245
246        let mut create_info = message.info.clone();
247        create_info.order_id = None;
248        let create_message = OrderActionMessage {
249            timestamp: message.timestamp,
250            info: create_info,
251            action: OrderAction::CreateOrder,
252            wallet: message.wallet,
253            api_wallet_address: message.api_wallet_address,
254            mmp_triggered: false,
255            request_id: message.request_id,
256        };
257        self.apply_create_order(create_message, timestamp, output);
258        self.flush_orderbook_events(output);
259    }
260
261    fn push_order_rejection(
262        &mut self,
263        message: OrderActionMessage,
264        reason: String,
265        timestamp: u64,
266        output: &mut crate::rsm::apply::ApplyOutput,
267    ) {
268        self.push_order_rejection_with_response(message, reason, timestamp, output, true);
269    }
270
271    fn push_order_rejection_with_response(
272        &mut self,
273        message: OrderActionMessage,
274        reason: String,
275        timestamp: u64,
276        output: &mut crate::rsm::apply::ApplyOutput,
277        include_response: bool,
278    ) {
279        let response = OrderUpdateMessage {
280            timestamp,
281            info: message.info,
282            status: OrderUpdateStatus::Rejected,
283            reason: Some(reason),
284            filled_size: dec!(0),
285            order_id: None,
286            wallet_address: message.wallet,
287            mmp_triggered: false,
288            request_id: message.request_id,
289        };
290        output.push(EngineMessage::OrderUpdate(response.clone()));
291        if include_response {
292            output.order_responses.push(response);
293        }
294    }
295
296    pub(super) fn cancel_order_output(
297        &mut self,
298        message: OrderActionMessage,
299        timestamp: u64,
300        output: &mut crate::rsm::apply::ApplyOutput,
301        include_response: bool,
302    ) {
303        let order_info = &message.info;
304        let wallet = &message.wallet;
305        let (order_id_opt, symbol_opt) = if let Some(oid) = order_info.order_id {
306            (Some(oid), self.lookup_symbol_by_order_id(wallet, oid))
307        } else if let Some(ref client_id) = order_info.client_id {
308            if let Some(oid_str) = client_id.strip_prefix("oid:") {
309                if let Ok(oid) = oid_str.parse::<u64>() {
310                    (Some(oid), self.lookup_symbol_by_order_id(wallet, oid))
311                } else {
312                    (None, None)
313                }
314            } else {
315                self.lookup_by_client_id(wallet, client_id)
316            }
317        } else {
318            (None, None)
319        };
320
321        let (order_id, symbol) = match (order_id_opt, symbol_opt) {
322            (Some(oid), Some(sym)) => (oid, sym),
323            _ => {
324                self.push_order_rejection_with_response(
325                    message,
326                    "This order has already been completed or cancelled".to_string(),
327                    timestamp,
328                    output,
329                    include_response,
330                );
331                return;
332            }
333        };
334
335        let Some(orderbook) = self.ctx.orderbooks.get_mut(&symbol) else {
336            self.push_order_rejection_with_response(
337                message,
338                format!("Orderbook not found for symbol: {}", symbol),
339                timestamp,
340                output,
341                include_response,
342            );
343            return;
344        };
345
346        match orderbook.cancel_order(order_id) {
347            Some(canceled_order) => {
348                let next_l2_seq = self.ctx.l2_update_seq.fetch_add(1, Ordering::SeqCst) + 1;
349                orderbook.set_pending_l2_sequence(next_l2_seq);
350                orderbook.emit_orderbook_events(timestamp);
351                self.ctx.order_index.remove_order(wallet, order_id);
352
353                let reason_text = if message.mmp_triggered {
354                    "Order canceled by MMP trigger".to_string()
355                } else {
356                    "Order canceled successfully".to_string()
357                };
358                let response = OrderUpdateMessage {
359                    timestamp,
360                    info: OrderInfo {
361                        symbol,
362                        price: canceled_order.price,
363                        size: canceled_order.quantity,
364                        side: canceled_order.side,
365                        tif: order_info.tif,
366                        client_id: order_info.client_id.clone(),
367                        order_id: Some(order_id),
368                        is_perp: order_info.is_perp,
369                        underlying: order_info.underlying.clone(),
370                        reduce_only: order_info.reduce_only,
371                        nonce: order_info.nonce,
372                        signature: order_info.signature.clone(),
373                        mmp_enabled: order_info.mmp_enabled,
374                        builder_code_address: order_info.builder_code_address,
375                    },
376                    status: OrderUpdateStatus::Canceled,
377                    reason: Some(reason_text),
378                    filled_size: dec!(0),
379                    order_id: Some(order_id),
380                    wallet_address: *wallet,
381                    mmp_triggered: message.mmp_triggered,
382                    request_id: message.request_id.clone(),
383                };
384                output.push(EngineMessage::OrderUpdate(response.clone()));
385                if include_response {
386                    output.order_responses.push(response);
387                }
388            }
389            None => {
390                self.push_order_rejection_with_response(
391                    message,
392                    format!(
393                        "Order {} not found in orderbook {} (already filled or cancelled)",
394                        order_id, symbol
395                    ),
396                    timestamp,
397                    output,
398                    include_response,
399                );
400            }
401        }
402    }
403
404    /// Handle CREATE_ORDER action - orchestrates validation, matching, and finalization.
405    /// Also used as phase 2 of ReplaceOrder (after the cancel phase).
406    async fn handle_create_order(&mut self, request: UnifiedEngineRequest, timestamp: u64) {
407        let start = Instant::now();
408        // Track whether this is a standalone create or part of a replace
409        // to avoid double-counting metrics (process_order already records "replace")
410        let is_standalone_create = request.message.action == OrderAction::CreateOrder;
411        let order_info = request.message.info.clone();
412        let wallet = request.message.wallet;
413
414        // Validate
415        if let Err(reason) = OrderAdmission::validate_order(self, &order_info, &wallet) {
416            self.send_rejection(request, reason, timestamp).await;
417            return;
418        }
419
420        // Check restrictions
421        if let Err(reason) = OrderAdmission::check_order_restrictions(self, &order_info, &wallet) {
422            self.send_rejection(request, reason, timestamp).await;
423            return;
424        }
425
426        // Allocate and ack
427        let allocated =
428            OrderExecution::allocate_and_ack(self, &request, &order_info, &wallet, timestamp).await;
429
430        // Execute matching
431        let result = OrderExecution::execute_matching(
432            self,
433            allocated.order_id,
434            &order_info,
435            &wallet,
436            timestamp,
437        )
438        .await;
439
440        // Handle self-trade if detected
441        if let Some(maker_id) = result.self_trade_maker_id {
442            self.handle_self_trade_prevention(
443                &request,
444                allocated.order_id,
445                maker_id,
446                &order_info,
447                &wallet,
448                &result.fills,
449                timestamp,
450            )
451            .await;
452            return;
453        }
454
455        // Finalize
456        OrderExecution::finalize_order(
457            self,
458            &request,
459            allocated.order_id,
460            &order_info,
461            &wallet,
462            timestamp,
463            &result,
464        )
465        .await;
466
467        if is_standalone_create {
468            histogram!("ht_engine_order_processing_seconds", "action" => "create")
469                .record(start.elapsed().as_secs_f64());
470        }
471    }
472
473    /// Process a single order with SPAN calculations
474    pub(super) async fn process_order(&mut self, request: UnifiedEngineRequest) {
475        // Restore parent trace context from the API handler
476        #[cfg(feature = "otel-tracing")]
477        if let Some(ref parent_cx) = request.trace_context {
478            tracing::Span::current().set_parent(parent_cx.clone());
479        }
480
481        let start = Instant::now();
482
483        // Record queue wait time (time from enqueue to processing start)
484        let queue_wait = request.enqueued_at.elapsed().as_secs_f64();
485        histogram!("ht_engine_queue_wait_seconds").record(queue_wait);
486
487        // Decrement pending counter now that we're processing
488        decrement_pending_requests();
489
490        // Export current pending requests as gauge
491        gauge!("ht_engine_pending_requests").set(get_pending_requests() as f64);
492
493        debug!(
494            "Processing order with client_id {:?}: {:?} (queue_wait: {:.3}s)",
495            request.message.info.client_id, request, queue_wait
496        );
497        // Use command timestamp for determinism (not wall-clock time)
498        let timestamp = request.message.timestamp;
499        self.ctx.deps.margin_timestamp_s = (timestamp / 1000) as i64;
500
501        // Hash chain is advanced by apply() — process_order is the inner
502        // execution method and must not double-advance.
503
504        let order_info = &request.message.info;
505        let wallet = &request.message.wallet;
506
507        // Record order received metric
508        let action_label = match request.message.action {
509            OrderAction::CreateOrder => "create",
510            OrderAction::CancelOrder => "cancel",
511            OrderAction::ReplaceOrder => "replace",
512        };
513        counter!("ht_engine_orders_total", "action" => action_label).increment(1);
514
515        // First, send the OrderAction event (the request)
516        let _ = self
517            .ctx
518            .deps
519            .event_sender
520            .send(EngineMessage::OrderAction(request.message.clone()));
521
522        // Handle CANCEL_ORDER actions
523        if request.message.action == OrderAction::CancelOrder {
524            self.process_cancel_order(request, timestamp).await;
525            histogram!("ht_engine_order_processing_seconds", "action" => "cancel")
526                .record(start.elapsed().as_secs_f64());
527            return;
528        }
529
530        // Handle REPLACE_ORDER actions (atomic cancel + create)
531        if request.message.action == OrderAction::ReplaceOrder {
532            self.process_replace_order(request, timestamp).await;
533            histogram!("ht_engine_order_processing_seconds", "action" => "replace")
534                .record(start.elapsed().as_secs_f64());
535            return;
536        }
537
538        // Only process CREATE_ORDER actions beyond this point
539        if request.message.action != OrderAction::CreateOrder {
540            warn!(
541                "Unsupported order action for client_id {:?}: {:?}",
542                request.message.info.client_id, request.message.action
543            );
544            return;
545        }
546
547        // Route perp orders separately
548        if order_info.is_perp {
549            info!("🔄 RSM: Processing PERP order for wallet: {}, underlying: {:?}, price: {}, size: {}, side: {:?}",
550                wallet, order_info.underlying, order_info.price, order_info.size, order_info.side);
551            self.process_perp_order(request, timestamp).await;
552            histogram!("ht_engine_order_processing_seconds", "action" => "create")
553                .record(start.elapsed().as_secs_f64());
554            return;
555        }
556
557        // Handle option orders via orchestrator
558        self.handle_create_order(request, timestamp).await;
559    }
560
561    /// Look up a symbol for an order_id using the in-process order index.
562    fn lookup_symbol_by_order_id(&self, wallet: &WalletAddress, order_id: u64) -> Option<String> {
563        self.ctx
564            .order_index
565            .get_order_symbol(wallet, order_id)
566            .map(|s| s.to_string())
567    }
568
569    /// Look up (order_id, symbol) by client_id using the in-process order index.
570    fn lookup_by_client_id(
571        &self,
572        wallet: &WalletAddress,
573        client_id: &str,
574    ) -> (Option<u64>, Option<String>) {
575        if let Some((oid, sym)) = self
576            .ctx
577            .order_index
578            .get_order_by_client_id(wallet, client_id)
579        {
580            (Some(oid), Some(sym.to_string()))
581        } else {
582            (None, None)
583        }
584    }
585
586    /// Process a cancel order request
587    pub(super) async fn process_cancel_order(
588        &mut self,
589        request: UnifiedEngineRequest,
590        timestamp: u64,
591    ) {
592        let is_mmp_triggered = request.message.mmp_triggered;
593
594        if is_mmp_triggered {
595            debug!(
596                "Processing MMP-triggered cancel order with client_id {:?}: {:?}",
597                request.message.info.client_id, request
598            );
599        } else {
600            debug!(
601                "Processing user cancel order with client_id {:?}: {:?}",
602                request.message.info.client_id, request
603            );
604        }
605
606        let order_info = &request.message.info;
607        let wallet = &request.message.wallet;
608
609        // Check order_id field first, then fall back to client_id lookup
610        let (order_id_opt, symbol_opt) = if let Some(oid) = order_info.order_id {
611            let sym = self.lookup_symbol_by_order_id(wallet, oid);
612            (Some(oid), sym)
613        } else if let Some(ref client_id) = order_info.client_id {
614            if let Some(oid_str) = client_id.strip_prefix("oid:") {
615                if let Ok(oid) = oid_str.parse::<u64>() {
616                    let sym = self.lookup_symbol_by_order_id(wallet, oid);
617                    (Some(oid), sym)
618                } else {
619                    (None, None)
620                }
621            } else {
622                self.lookup_by_client_id(wallet, client_id)
623            }
624        } else {
625            (None, None)
626        };
627
628        // Validate we found the order
629        let (order_id, symbol) = match (order_id_opt, symbol_opt) {
630            (Some(oid), Some(sym)) => (oid, sym),
631            _ => {
632                error!(
633                    "Cancel failed for wallet {}: order not found (order_id={:?}, client_id={:?}), order may have already been filled/canceled",
634                    wallet, order_info.order_id, order_info.client_id
635                );
636                let reason = "This order has already been completed or cancelled".to_string();
637                self.send_rejection(request, reason, timestamp).await;
638                return;
639            }
640        };
641
642        info!(
643            "Processing cancel request for order_id={}, symbol={}, wallet={}",
644            order_id, symbol, wallet
645        );
646
647        // Check if instrument is expired (defense-in-depth check)
648        // Note: We still allow canceling orders on expired instruments if orderbook exists
649        // (needed to cancel open orders at expiry time)
650        if self
651            .expiry_manager
652            .is_instrument_expired(&symbol, &self.ctx.orderbooks)
653        {
654            debug!(
655                "Cancel request for expired instrument: {} (allowing cancel if orderbook exists)",
656                symbol
657            );
658        }
659
660        // Get the orderbook for this symbol
661        let orderbook = match self.ctx.orderbooks.get_mut(&symbol) {
662            Some(book) => book,
663            None => {
664                let reason = format!("Orderbook not found for symbol: {}", symbol);
665                error!("{}", reason);
666                self.send_rejection(request, reason, timestamp).await;
667                return;
668            }
669        };
670
671        // Cancel the order in the orderbook
672        // NOTE: We allocate L2 sequence AFTER successful cancel to avoid sequence leaks.
673        // Previously, sequences were allocated before cancel_order(), causing gaps when
674        // cancels failed (order not found). This caused engine_seq to drift ahead of
675        // cache_seq indefinitely.
676        match orderbook.cancel_order(order_id) {
677            Some(canceled_order) => {
678                info!(
679                    "Successfully canceled order_id={}, symbol={}, price={}, quantity={}",
680                    order_id, symbol, canceled_order.price, canceled_order.quantity
681                );
682
683                // Only allocate L2 sequence on successful cancel
684                let next_l2_seq = self.ctx.l2_update_seq.fetch_add(1, Ordering::SeqCst) + 1;
685                orderbook.set_pending_l2_sequence(next_l2_seq);
686
687                // Emit L2 and orderbook events
688                orderbook.emit_orderbook_events(timestamp);
689
690                // Update order index
691                self.ctx.order_index.remove_order(wallet, order_id);
692
693                let remaining_quantity = canceled_order.quantity;
694
695                let canceled_order_info = OrderInfo {
696                    symbol: symbol.clone(),
697                    price: canceled_order.price,
698                    size: remaining_quantity,
699                    side: canceled_order.side,
700                    tif: order_info.tif,
701                    client_id: order_info.client_id.clone(),
702                    order_id: Some(order_id),
703                    is_perp: order_info.is_perp,
704                    underlying: order_info.underlying.clone(),
705                    reduce_only: order_info.reduce_only,
706                    nonce: order_info.nonce,
707                    signature: order_info.signature.clone(),
708                    mmp_enabled: order_info.mmp_enabled,
709                    builder_code_address: order_info.builder_code_address,
710                };
711
712                // Send success response with Canceled status
713                let reason_text = if is_mmp_triggered {
714                    "Order canceled by MMP trigger".to_string()
715                } else {
716                    "Order canceled successfully".to_string()
717                };
718
719                let response = OrderUpdateMessage {
720                    timestamp,
721                    info: canceled_order_info,
722                    status: OrderUpdateStatus::Canceled,
723                    reason: Some(reason_text),
724                    filled_size: dec!(0),
725                    order_id: Some(order_id),
726                    wallet_address: *wallet,
727                    mmp_triggered: is_mmp_triggered,
728                    request_id: request.message.request_id.clone(),
729                };
730
731                // Send engine event
732                let _ = self
733                    .ctx
734                    .deps
735                    .event_sender
736                    .send(EngineMessage::OrderUpdate(response.clone()));
737
738                // Send response to API
739                let _ = request.response_tx.send(response).await;
740            }
741            None => {
742                // Order not in orderbook - it was already filled or cancelled
743                let reason = format!(
744                    "Order {} not found in orderbook {} (already filled or cancelled)",
745                    order_id, symbol
746                );
747                warn!("{}", reason);
748                self.send_rejection(request, reason, timestamp).await;
749            }
750        }
751    }
752
753    /// Process a replace order: atomically cancel an existing order, then create a new one.
754    ///
755    /// The order_id of the order to cancel is in `request.message.info.order_id`.
756    /// The new order details (symbol, price, size, side, tif, client_id) are in `request.message.info`.
757    /// If the cancel fails, the entire operation is rejected and no new order is placed.
758    pub(super) async fn process_replace_order(
759        &mut self,
760        request: UnifiedEngineRequest,
761        timestamp: u64,
762    ) {
763        let wallet = request.message.wallet;
764        let order_info = &request.message.info;
765
766        let cancel_order_id = match order_info.order_id {
767            Some(oid) => oid,
768            None => {
769                self.send_rejection(
770                    request,
771                    "Replace order missing order_id to cancel".to_string(),
772                    timestamp,
773                )
774                .await;
775                return;
776            }
777        };
778
779        info!(
780            "Processing replace order: cancel order_id={}, new symbol={}, wallet={}",
781            cancel_order_id, order_info.symbol, wallet
782        );
783
784        // --- Phase 1: Cancel the old order ---
785
786        // Look up the old order's symbol
787        let cancel_symbol = match self.lookup_symbol_by_order_id(&wallet, cancel_order_id) {
788            Some(sym) => sym,
789            None => {
790                error!(
791                    "Replace failed: cancel target order_id={} not found for wallet {}",
792                    cancel_order_id, wallet
793                );
794                self.send_rejection(
795                    request,
796                    "Cancel target order has already been completed or cancelled".to_string(),
797                    timestamp,
798                )
799                .await;
800                return;
801            }
802        };
803
804        // Get the orderbook and cancel the order
805        let orderbook = match self.ctx.orderbooks.get_mut(&cancel_symbol) {
806            Some(book) => book,
807            None => {
808                self.send_rejection(
809                    request,
810                    format!("Orderbook not found for symbol: {}", cancel_symbol),
811                    timestamp,
812                )
813                .await;
814                return;
815            }
816        };
817
818        match orderbook.cancel_order(cancel_order_id) {
819            Some(canceled_order) => {
820                info!(
821                    "Replace: canceled order_id={}, symbol={}, price={}, quantity={}",
822                    cancel_order_id, cancel_symbol, canceled_order.price, canceled_order.quantity
823                );
824
825                // Allocate L2 sequence and emit events for the cancel
826                let next_l2_seq = self.ctx.l2_update_seq.fetch_add(1, Ordering::SeqCst) + 1;
827                // Re-borrow orderbook after the cancel
828                let orderbook = self
829                    .ctx
830                    .orderbooks
831                    .get_mut(&cancel_symbol)
832                    .expect("orderbook disappeared between cancel and event emission");
833                orderbook.set_pending_l2_sequence(next_l2_seq);
834                orderbook.emit_orderbook_events(timestamp);
835
836                // Update order index
837                self.ctx.order_index.remove_order(&wallet, cancel_order_id);
838
839                // Emit cancel OrderUpdate event
840                let canceled_order_info = OrderInfo {
841                    symbol: cancel_symbol.clone(),
842                    price: canceled_order.price,
843                    size: canceled_order.quantity,
844                    side: canceled_order.side,
845                    tif: order_info.tif,
846                    client_id: None,
847                    order_id: Some(cancel_order_id),
848                    is_perp: false,
849                    underlying: None,
850                    reduce_only: None,
851                    nonce: None,
852                    signature: None,
853                    mmp_enabled: false,
854                    builder_code_address: None,
855                };
856
857                let cancel_response = OrderUpdateMessage {
858                    timestamp,
859                    info: canceled_order_info,
860                    status: OrderUpdateStatus::Canceled,
861                    reason: Some("Order canceled by replace".to_string()),
862                    filled_size: dec!(0),
863                    order_id: Some(cancel_order_id),
864                    wallet_address: wallet,
865                    mmp_triggered: false,
866                    request_id: request.message.request_id.clone(),
867                };
868
869                // Emit cancel event (NOT to response_tx)
870                let _ = self
871                    .ctx
872                    .deps
873                    .event_sender
874                    .send(EngineMessage::OrderUpdate(cancel_response));
875            }
876            None => {
877                // Order not found in orderbook
878                let reason = format!(
879                    "Replace failed: order {} not found in orderbook {} (already filled or cancelled)",
880                    cancel_order_id, cancel_symbol
881                );
882                warn!("{}", reason);
883                self.send_rejection(request, reason, timestamp).await;
884                return;
885            }
886        }
887
888        // --- Phase 2: Create the new order ---
889        // Build a CreateOrder request with the new order details (order_id cleared)
890        let mut create_info = request.message.info.clone();
891        create_info.order_id = None; // Will be assigned by allocate_and_ack
892
893        let create_message = OrderActionMessage {
894            timestamp: request.message.timestamp,
895            info: create_info,
896            action: OrderAction::CreateOrder,
897            wallet: request.message.wallet,
898            api_wallet_address: request.message.api_wallet_address,
899            mmp_triggered: false,
900            request_id: request.message.request_id.clone(),
901        };
902
903        let create_request = UnifiedEngineRequest {
904            message: create_message,
905            response_tx: request.response_tx,
906            enqueued_at: request.enqueued_at,
907            #[cfg(feature = "otel-tracing")]
908            trace_context: request.trace_context,
909        };
910
911        // Delegate to the existing create order handler
912        self.handle_create_order(create_request, timestamp).await;
913    }
914
915    /// Process a perp order (margin validation only, execution happens async)
916    pub(super) async fn process_perp_order(
917        &mut self,
918        request: UnifiedEngineRequest,
919        timestamp: u64,
920    ) {
921        debug!(
922            "Processing perp order with client_id {:?}: {:?}",
923            request.message.info.client_id, request
924        );
925        let order_info = &request.message.info;
926        let wallet = &request.message.wallet;
927
928        info!(
929            "🔍 RSM: Starting perp order margin validation for wallet: {}",
930            wallet
931        );
932
933        // Validate account has funds before processing order
934        // Use PortfolioService as canonical source for cash balance
935        match self.margin_manager.get_risk_account(
936            &self.ctx.deps,
937            &self.ctx.engine_positions,
938            &self.ctx.balance_ledger,
939            wallet,
940        ) {
941            Ok(account) => {
942                if account.cash <= 0.0 {
943                    warn!(
944                        "Perp order rejected for wallet {} - account has no funds (cash: {})",
945                        wallet, account.cash
946                    );
947                    self.send_rejection(
948                        request,
949                        "Account has no funds. Please deposit before trading.".to_string(),
950                        timestamp,
951                    )
952                    .await;
953                    return;
954                }
955            }
956            Err(e) => {
957                warn!("Perp order rejected for wallet {} - {}", wallet, e);
958                self.send_rejection(request, e, timestamp).await;
959                return;
960            }
961        }
962
963        // Get the underlying symbol
964        let underlying = match &order_info.underlying {
965            Some(u) => u.clone(),
966            None => {
967                error!("❌ RSM: Perp order missing underlying symbol");
968                self.send_rejection(
969                    request,
970                    "Perp order missing underlying symbol".to_string(),
971                    timestamp,
972                )
973                .await;
974                return;
975            }
976        };
977
978        info!(
979            "📊 RSM: Perp order details - underlying: {}, price: {}, size: {}, side: {:?}",
980            underlying, order_info.price, order_info.size, order_info.side
981        );
982
983        // Check margin requirements using unified check
984        debug!("🧮 RSM: Running unified margin check for perp order");
985        let margin_check_result = self.margin_manager.check_margin_for_order(
986            &self.ctx.deps,
987            &self.ctx.engine_positions,
988            &self.ctx.balance_ledger,
989            wallet,
990            order_info,
991            &self.ctx.order_index,
992        );
993
994        if let Err(margin_error) = margin_check_result {
995            warn!(
996                "⚠️ RSM: Perp order rejected due to insufficient margin: {}",
997                margin_error
998            );
999            self.send_rejection(request, margin_error, timestamp).await;
1000            return;
1001        }
1002
1003        info!(
1004            "✅ RSM: Perp order passed margin validation - wallet: {}, underlying: {}",
1005            wallet, underlying
1006        );
1007
1008        // Send success response (Acked status since we're not executing, just validating)
1009        let response = OrderUpdateMessage {
1010            timestamp,
1011            info: order_info.clone(),
1012            status: OrderUpdateStatus::Acked,
1013            reason: Some("Margin validation passed - order queued for execution".to_string()),
1014            filled_size: dec!(0),
1015            order_id: None,
1016            wallet_address: *wallet,
1017            mmp_triggered: false,
1018            request_id: request.message.request_id.clone(),
1019        };
1020
1021        // Send engine event
1022        let _ = self
1023            .ctx
1024            .deps
1025            .event_sender
1026            .send(EngineMessage::OrderUpdate(response.clone()));
1027
1028        // Send response to API
1029        let _ = request.response_tx.send(response).await;
1030    }
1031
1032    pub(super) async fn send_rejection(
1033        &self,
1034        request: UnifiedEngineRequest,
1035        reason: String,
1036        timestamp: u64,
1037    ) {
1038        debug!(
1039            "Sending rejection response for client_id {:?}: {:?}",
1040            request.message.info.client_id, reason
1041        );
1042
1043        let reason_label = crate::rsm::engine_deps::classify_rejection_reason(&reason);
1044        counter!("ht_engine_rejections_total", "reason" => reason_label).increment(1);
1045
1046        let response = OrderUpdateMessage {
1047            timestamp,
1048            info: request.message.info.clone(),
1049            status: OrderUpdateStatus::Rejected,
1050            reason: Some(reason),
1051            filled_size: dec!(0),
1052            order_id: None,
1053            wallet_address: request.message.wallet,
1054            mmp_triggered: false,
1055            request_id: request.message.request_id.clone(),
1056        };
1057
1058        self.ctx
1059            .deps
1060            .event_sender
1061            .send(EngineMessage::OrderUpdate(response.clone()))
1062            .unwrap_or_else(|e| {
1063                panic!(
1064                    "CRITICAL_FAILURE: Failed to send OrderUpdate (Rejected): {}. \
1065                     Event bus is dead. Restart required.",
1066                    e
1067                )
1068            });
1069        let _ = request.response_tx.send(response).await;
1070    }
1071
1072    pub(super) async fn handle_mmp_trigger(
1073        &mut self,
1074        wallet: &WalletAddress,
1075        underlying_symbol: &str,
1076        trigger_reason: String,
1077        timestamp: u64,
1078    ) {
1079        warn!(
1080            "MMP TRIGGERED: wallet={}, underlying={}, reason={}",
1081            wallet, underlying_symbol, trigger_reason
1082        );
1083
1084        let mmp_event = hypercall_types::MmpTriggeredMessage {
1085            wallet: *wallet,
1086            currency: underlying_symbol.to_string(),
1087            reason: trigger_reason.clone(),
1088            timestamp,
1089        };
1090        self.ctx
1091            .deps
1092            .event_sender
1093            .send(EngineMessage::MmpTriggered(mmp_event))
1094            .unwrap_or_else(|e| {
1095                panic!(
1096                    "CRITICAL_FAILURE: Failed to send MmpTriggered event for wallet {}: {}. \
1097                     Event bus is dead. Restart required.",
1098                    wallet, e
1099                )
1100            });
1101
1102        let mmp_order_ids = self
1103            .ctx
1104            .order_index
1105            .get_mmp_order_ids(wallet, underlying_symbol);
1106
1107        info!(
1108            "MMP trigger: Found {} MMP-enabled orders for wallet {} and underlying {}",
1109            mmp_order_ids.len(),
1110            wallet,
1111            underlying_symbol
1112        );
1113
1114        for (order_id, symbol) in mmp_order_ids {
1115            info!(
1116                "MMP auto-cancel: order_id={}, symbol={}, wallet={}, underlying={}",
1117                order_id, symbol, wallet, underlying_symbol
1118            );
1119
1120            let cancel_info = OrderInfo {
1121                symbol,
1122                price: dec!(0),
1123                size: dec!(0),
1124                side: Side::Buy,
1125                tif: TimeInForce::GTC,
1126                client_id: None,
1127                order_id: Some(order_id),
1128                is_perp: false,
1129                underlying: None,
1130                reduce_only: None,
1131                nonce: None,
1132                signature: None,
1133                mmp_enabled: true,
1134                builder_code_address: None,
1135            };
1136
1137            let (response_tx, _response_rx) = mpsc::channel(1);
1138            let deterministic_request_id = format!("mmp:{}:{}:{}", timestamp, wallet, order_id);
1139            let cancel_request = UnifiedEngineRequest {
1140                message: OrderActionMessage {
1141                    timestamp,
1142                    info: cancel_info,
1143                    action: OrderAction::CancelOrder,
1144                    wallet: *wallet,
1145                    api_wallet_address: None,
1146                    mmp_triggered: true,
1147                    request_id: Some(deterministic_request_id),
1148                },
1149                response_tx,
1150                enqueued_at: Instant::now(),
1151                #[cfg(feature = "otel-tracing")]
1152                trace_context: None,
1153            };
1154
1155            self.mmp_cancel_order(cancel_request, timestamp).await;
1156        }
1157    }
1158
1159    async fn mmp_cancel_order(&mut self, request: UnifiedEngineRequest, timestamp: u64) {
1160        let order_info = &request.message.info;
1161        let wallet = &request.message.wallet;
1162
1163        let (order_id_opt, symbol_opt) = if let Some(oid) = order_info.order_id {
1164            let sym = self
1165                .ctx
1166                .order_index
1167                .get_order_symbol(wallet, oid)
1168                .map(|s| s.to_string());
1169            (Some(oid), sym)
1170        } else if let Some(ref client_id) = order_info.client_id {
1171            if let Some(oid_str) = client_id.strip_prefix("oid:") {
1172                if let Ok(oid) = oid_str.parse::<u64>() {
1173                    let sym = self
1174                        .ctx
1175                        .order_index
1176                        .get_order_symbol(wallet, oid)
1177                        .map(|s| s.to_string());
1178                    (Some(oid), sym)
1179                } else {
1180                    (None, None)
1181                }
1182            } else if let Some((oid, sym)) = self
1183                .ctx
1184                .order_index
1185                .get_order_by_client_id(wallet, client_id)
1186            {
1187                (Some(oid), Some(sym.to_string()))
1188            } else {
1189                (None, None)
1190            }
1191        } else {
1192            (None, None)
1193        };
1194
1195        let (order_id, symbol) = match (order_id_opt, symbol_opt) {
1196            (Some(oid), Some(sym)) => (oid, sym),
1197            _ => {
1198                error!(
1199                    "Cancel failed for wallet {}: order not found (order_id={:?}, client_id={:?})",
1200                    wallet, order_info.order_id, order_info.client_id
1201                );
1202                let reason = "This order has already been completed or cancelled".to_string();
1203                self.send_rejection(request, reason, timestamp).await;
1204                return;
1205            }
1206        };
1207
1208        let is_mmp_triggered = request.message.mmp_triggered;
1209
1210        let orderbook = match self.ctx.orderbooks.get_mut(&symbol) {
1211            Some(book) => book,
1212            None => {
1213                let reason = format!("Orderbook not found for symbol: {}", symbol);
1214                error!("{}", reason);
1215                self.send_rejection(request, reason, timestamp).await;
1216                return;
1217            }
1218        };
1219
1220        match orderbook.cancel_order(order_id) {
1221            Some(canceled_order) => {
1222                let next_l2_seq = self.ctx.l2_update_seq.fetch_add(1, Ordering::SeqCst) + 1;
1223                orderbook.set_pending_l2_sequence(next_l2_seq);
1224
1225                info!(
1226                    "Successfully canceled order_id={}, symbol={}, price={}, quantity={}",
1227                    order_id, symbol, canceled_order.price, canceled_order.quantity
1228                );
1229
1230                orderbook.emit_orderbook_events(timestamp);
1231                self.ctx.order_index.remove_order(wallet, order_id);
1232
1233                let canceled_order_info = OrderInfo {
1234                    symbol: symbol.clone(),
1235                    price: canceled_order.price,
1236                    size: canceled_order.quantity,
1237                    side: canceled_order.side,
1238                    tif: order_info.tif,
1239                    client_id: order_info.client_id.clone(),
1240                    order_id: Some(order_id),
1241                    is_perp: order_info.is_perp,
1242                    underlying: order_info.underlying.clone(),
1243                    reduce_only: order_info.reduce_only,
1244                    nonce: order_info.nonce,
1245                    signature: order_info.signature.clone(),
1246                    mmp_enabled: order_info.mmp_enabled,
1247                    builder_code_address: order_info.builder_code_address,
1248                };
1249
1250                let reason_text = if is_mmp_triggered {
1251                    "Order canceled by MMP trigger".to_string()
1252                } else {
1253                    "Order canceled successfully".to_string()
1254                };
1255
1256                let response = OrderUpdateMessage {
1257                    timestamp,
1258                    info: canceled_order_info,
1259                    status: OrderUpdateStatus::Canceled,
1260                    reason: Some(reason_text),
1261                    filled_size: dec!(0),
1262                    order_id: Some(order_id),
1263                    wallet_address: *wallet,
1264                    mmp_triggered: is_mmp_triggered,
1265                    request_id: request.message.request_id.clone(),
1266                };
1267
1268                self.ctx
1269                    .deps
1270                    .event_sender
1271                    .send(EngineMessage::OrderUpdate(response.clone()))
1272                    .unwrap_or_else(|e| {
1273                        panic!(
1274                            "CRITICAL_FAILURE: Failed to send OrderUpdate (Canceled) for order {}: {}. \
1275                             Event bus is dead. Restart required.",
1276                            order_id, e
1277                        )
1278                    });
1279
1280                let _ = request.response_tx.send(response).await;
1281            }
1282            None => {
1283                let reason = format!(
1284                    "Order {} not found in orderbook {} (already filled or cancelled)",
1285                    order_id, symbol
1286                );
1287                warn!("{}", reason);
1288                self.send_rejection(request, reason, timestamp).await;
1289            }
1290        }
1291    }
1292
1293    pub(super) async fn handle_self_trade_prevention(
1294        &mut self,
1295        request: &UnifiedEngineRequest,
1296        taker_order_id: u64,
1297        maker_order_id: u64,
1298        order_info: &OrderInfo,
1299        wallet: &WalletAddress,
1300        partial_fills: &[Fill],
1301        timestamp: u64,
1302    ) {
1303        let symbol = &order_info.symbol;
1304        let filled_size: Decimal = partial_fills.iter().map(|f| f.size).sum();
1305
1306        info!(
1307            "Self-trade prevention: maker={}, taker={}, symbol={}, partial_fills={}",
1308            maker_order_id,
1309            taker_order_id,
1310            symbol,
1311            partial_fills.len()
1312        );
1313
1314        if let Some(orderbook) = self.ctx.orderbooks.get_mut(symbol) {
1315            if let Some(canceled) = orderbook.cancel_order(maker_order_id) {
1316                let next_l2_seq = self.ctx.l2_update_seq.fetch_add(1, Ordering::SeqCst) + 1;
1317                orderbook.set_pending_l2_sequence(next_l2_seq);
1318                orderbook.emit_orderbook_events(timestamp);
1319                self.ctx.order_index.remove_order(wallet, maker_order_id);
1320
1321                let maker_response = OrderUpdateMessage {
1322                    timestamp,
1323                    info: OrderInfo {
1324                        symbol: symbol.clone(),
1325                        price: canceled.price,
1326                        size: canceled.quantity,
1327                        side: canceled.side,
1328                        tif: TimeInForce::GTC,
1329                        client_id: None,
1330                        order_id: Some(maker_order_id),
1331                        is_perp: order_info.is_perp,
1332                        underlying: order_info.underlying.clone(),
1333                        reduce_only: None,
1334                        nonce: None,
1335                        signature: None,
1336                        mmp_enabled: false,
1337                        builder_code_address: None,
1338                    },
1339                    status: OrderUpdateStatus::Canceled,
1340                    reason: Some("Self-trade prevention".to_string()),
1341                    filled_size: dec!(0),
1342                    order_id: Some(maker_order_id),
1343                    wallet_address: *wallet,
1344                    mmp_triggered: false,
1345                    request_id: None,
1346                };
1347
1348                self.ctx
1349                    .deps
1350                    .event_sender
1351                    .send(EngineMessage::OrderUpdate(maker_response.clone()))
1352                    .unwrap_or_else(|e| {
1353                        panic!(
1354                            "CRITICAL_FAILURE: Failed to send OrderUpdate (STP maker cancel) for order {}: {}. \
1355                             Event bus is dead. Restart required.",
1356                            maker_order_id, e
1357                        )
1358                    });
1359                if let Some(ref ws_tx) = self.ctx.deps.ws_event_sender {
1360                    let _ = ws_tx.send(EngineMessage::OrderUpdate(maker_response));
1361                }
1362            }
1363        }
1364
1365        self.ctx.order_index.remove_order(wallet, taker_order_id);
1366
1367        counter!("ht_engine_self_trade_prevented_total").increment(1);
1368
1369        let (status, reason) = if filled_size > dec!(0) {
1370            (
1371                OrderUpdateStatus::Canceled,
1372                "Self-trade prevention (partial fill kept)",
1373            )
1374        } else {
1375            (OrderUpdateStatus::Rejected, "Self-trade prevention")
1376        };
1377
1378        let taker_response = OrderUpdateMessage {
1379            timestamp,
1380            info: order_info.clone(),
1381            status,
1382            reason: Some(reason.to_string()),
1383            filled_size,
1384            order_id: Some(taker_order_id),
1385            wallet_address: *wallet,
1386            mmp_triggered: false,
1387            request_id: request.message.request_id.clone(),
1388        };
1389
1390        self.ctx
1391            .deps
1392            .event_sender
1393            .send(EngineMessage::OrderUpdate(taker_response.clone()))
1394            .unwrap_or_else(|e| {
1395                panic!(
1396                    "CRITICAL_FAILURE: Failed to send OrderUpdate (STP taker) for order {}: {}. \
1397                     Event bus is dead. Restart required.",
1398                    taker_order_id, e
1399                )
1400            });
1401        if let Some(ref ws_tx) = self.ctx.deps.ws_event_sender {
1402            let _ = ws_tx.send(EngineMessage::OrderUpdate(taker_response.clone()));
1403        }
1404
1405        let _ = request.response_tx.send(taker_response).await;
1406    }
1407}