1use std::collections::{HashMap, HashSet};
2use std::str::FromStr;
3use std::time::Instant;
4
5use rust_decimal::prelude::ToPrimitive;
6use rust_decimal::Decimal;
7use rust_decimal_macros::dec;
8use utoipa::ToSchema;
9
10use crate::error::ApiError;
11use crate::middleware::SignerContext;
12use crate::request_auth::verify_request;
13use crate::sonic_json::SonicJson;
14use crate::trading_halt::TradingHaltState;
15use axum::extract::State;
16use hypercall_runtime_api::increment_pending_requests;
17use hypercall_types::utils::get_timestamp_millis;
18use hypercall_types::{
19 to_contract_units_decimal, validate_price_precision, ReplaceOrderRequest, WalletAddress,
20 MAX_PRICE_SIGNIFICANT_FIGURES,
21};
22use hypercall_types::{
23 OrderAction, OrderActionMessage, OrderInfo, OrderUpdateMessage, OrderUpdateStatus, TimeInForce,
24};
25use hypercall_types::{ParsedOptionSymbol, Side};
26use serde::{Deserialize, Serialize};
27use tokio::sync::{mpsc, RwLock};
28use tokio::time::timeout;
29use tracing::Instrument;
30use uuid::Uuid;
31
32#[cfg(feature = "otel-tracing")]
33use tracing_opentelemetry::OpenTelemetrySpanExt;
34
35use super::{ensure_order_creation_allowed, AppState, ENGINE_RESPONSE_TIMEOUT};
36
37pub use super::bulk_orders::BulkOrderResult;
38use super::bulk_orders::MAX_BULK_ORDER_SIZE;
39
40#[derive(Debug, Deserialize, ToSchema)]
41pub struct BulkReplaceOrderRequest {
42 pub replacements: Vec<ReplaceOrderRequest>,
44}
45
46#[derive(Debug, Serialize, ToSchema)]
47pub struct BulkReplaceOrderResponse {
48 pub results: Vec<BulkOrderResult>,
50}
51
52#[utoipa::path(
59 put,
60 path = "/order",
61 request_body = ReplaceOrderRequest,
62 responses(
63 (status = 200, description = "Replace result (new order status)", body = OrderUpdateMessage),
64 (status = 400, description = "Invalid request"),
65 (status = 401, description = "Unauthorized"),
66 (status = 500, description = "Internal server error")
67 ),
68 security(("eip712_signature" = [])),
69 tag = "Trading"
70)]
71pub async fn replace_order(
72 State(state): State<AppState>,
73 signer_ctx: SignerContext,
74 SonicJson(request): SonicJson<ReplaceOrderRequest>,
75) -> Result<SonicJson<OrderUpdateMessage>, ApiError> {
76 let span = tracing::info_span!(
77 "api.replace_order",
78 wallet = %signer_ctx.wallet_address,
79 cancel_order_id = %request.order_id,
80 symbol = %request.symbol,
81 side = ?request.side,
82 size = %request.size,
83 price = %request.price,
84 client_id = ?request.client_id,
85 action = "ReplaceOrder",
86 );
87
88 async move {
89 let price: Decimal = Decimal::from_str(&request.price).map_err(|_| {
91 ApiError::bad_request(format!("Invalid price format: {}", request.price))
92 })?;
93 let size: Decimal = Decimal::from_str(&request.size)
94 .map_err(|_| ApiError::bad_request(format!("Invalid size format: {}", request.size)))?;
95
96 let _parsed_symbol = ParsedOptionSymbol::from_symbol(&request.symbol)
98 .map_err(|e| ApiError::bad_request(format!("Invalid symbol: {}", e)))?;
99
100 ensure_order_creation_allowed(&state, &request.symbol).await?;
101
102 if price <= dec!(0) {
103 return Err(ApiError::bad_request("Price must be positive"));
104 }
105 if size <= dec!(0) {
106 return Err(ApiError::bad_request("Size must be positive"));
107 }
108
109 let price_f64 = price
110 .to_f64()
111 .ok_or_else(|| ApiError::bad_request("Price value out of range"))?;
112 if let Err(e) = validate_price_precision(price_f64, MAX_PRICE_SIGNIFICANT_FIGURES) {
113 return Err(ApiError::bad_request(format!(
114 "Invalid price precision: {}",
115 e
116 )));
117 }
118
119 tracing::info!(
120 "Replacing order {} for wallet: {} (signed by: {}), new symbol: {}",
121 request.order_id,
122 signer_ctx.wallet_address,
123 signer_ctx.signer_address,
124 request.symbol
125 );
126
127 let order_info = OrderInfo {
129 symbol: request.symbol.clone(),
130 price,
131 size: to_contract_units_decimal(&request.symbol, size),
132 side: request.side,
133 tif: request.tif,
134 client_id: request.client_id,
135 order_id: Some(request.order_id),
136 is_perp: false,
137 underlying: None,
138 reduce_only: None,
139 nonce: Some(request.nonce),
140 signature: None,
141 mmp_enabled: request.mmp_enabled,
142 builder_code_address: request.builder_code_address,
143 };
144
145 let order_action_msg = OrderActionMessage {
146 timestamp: get_timestamp_millis(),
147 info: order_info,
148 action: OrderAction::ReplaceOrder,
149 wallet: request.wallet,
150 api_wallet_address: Some(signer_ctx.signer_address),
151 mmp_triggered: false,
152 request_id: Some(Uuid::now_v7().to_string()),
153 };
154
155 let (response_tx, mut response_rx) = mpsc::channel(1);
156
157 let engine_request = hypercall_runtime_api::UnifiedEngineRequest {
158 message: order_action_msg,
159 response_tx,
160 enqueued_at: Instant::now(),
161 #[cfg(feature = "otel-tracing")]
162 trace_context: Some(tracing::Span::current().context()),
163 };
164
165 increment_pending_requests();
166 state
167 .order_sender
168 .send(engine_request)
169 .await
170 .map_err(|_| ApiError::internal_error("Failed to send replace order to engine"))?;
171
172 let response = match timeout(ENGINE_RESPONSE_TIMEOUT, response_rx.recv()).await {
173 Ok(Some(resp)) => resp,
174 Ok(None) => {
175 return Err(ApiError::internal_error("No response from engine"));
176 }
177 Err(_) => {
178 return Err(ApiError::gateway_timeout(
179 "Timeout waiting for engine response",
180 ));
181 }
182 };
183
184 Ok(SonicJson(response))
185 }
186 .instrument(span)
187 .await
188}
189
190#[utoipa::path(
204 put,
205 path = "/bulk_order",
206 request_body = BulkReplaceOrderRequest,
207 responses(
208 (status = 200, description = "Bulk replace results (per-leg, in input order)", body = BulkReplaceOrderResponse),
209 (status = 400, description = "Invalid request (e.g., too many replacements)"),
210 (status = 500, description = "Internal server error")
211 ),
212 security(("eip712_signature" = [])),
213 tag = "Trading"
214)]
215pub async fn bulk_replace_order(
243 State(state): State<AppState>,
244 SonicJson(request): SonicJson<BulkReplaceOrderRequest>,
245) -> Result<SonicJson<BulkReplaceOrderResponse>, ApiError> {
246 if request.replacements.len() > MAX_BULK_ORDER_SIZE {
247 return Err(ApiError::bad_request(format!(
248 "Bulk replace request exceeds max size: {} > {}",
249 request.replacements.len(),
250 MAX_BULK_ORDER_SIZE
251 )));
252 }
253
254 let n = request.replacements.len();
255 tracing::info!(
256 n,
257 "Processing bulk replace (cancel-all-then-create-all ordering)"
258 );
259
260 let mut validated: Vec<Result<ValidatedReplace, BulkOrderResult>> = Vec::with_capacity(n);
262 for (index, replace_req) in request.replacements.into_iter().enumerate() {
263 validated.push(validate_replace_request(&state, replace_req, index).await);
264 }
265
266 let results = orchestrate_bulk_replace(
267 &state.order_sender,
268 validated,
269 BULK_REPLACE_PHASE_DEADLINE,
270 BULK_REPLACE_PHASE_DEADLINE,
271 Some(&state.trading_halt),
272 )
273 .await;
274
275 Ok(SonicJson(BulkReplaceOrderResponse { results }))
276}
277
278const BULK_REPLACE_PHASE_DEADLINE: std::time::Duration = std::time::Duration::from_secs(5);
286
287async fn orchestrate_bulk_replace(
308 order_sender: &mpsc::Sender<hypercall_runtime_api::UnifiedEngineRequest>,
309 validated: Vec<Result<ValidatedReplace, BulkOrderResult>>,
310 cancel_phase_deadline: std::time::Duration,
311 create_phase_deadline: std::time::Duration,
312 trading_halt: Option<&std::sync::Arc<RwLock<TradingHaltState>>>,
313) -> Vec<BulkOrderResult> {
314 let mut cancel_rxs: Vec<(usize, mpsc::Receiver<OrderUpdateMessage>)> = Vec::new();
316 let mut cancel_enqueue_failed: HashSet<usize> = HashSet::new();
317 for (i, v) in validated.iter().enumerate() {
318 let Ok(v) = v else { continue };
319 match dispatch_to_engine(order_sender, build_cancel_message(v)).await {
320 Ok(rx) => {
321 cancel_rxs.push((i, rx));
322 }
323 Err(_) => {
324 tracing::error!(index = i, "Failed to enqueue cancel to engine");
325 cancel_enqueue_failed.insert(i);
326 }
327 }
328 }
329
330 let mut cancel_responses: HashMap<usize, OrderUpdateMessage> = HashMap::new();
333 let cancel_deadline = tokio::time::Instant::now() + cancel_phase_deadline;
334 for (i, mut rx) in cancel_rxs {
335 let now = tokio::time::Instant::now();
336 let remaining = cancel_deadline.saturating_duration_since(now);
337 match timeout(remaining, rx.recv()).await {
338 Ok(Some(resp)) => {
339 cancel_responses.insert(i, resp);
340 }
341 Ok(None) => tracing::error!(index = i, "No cancel response from engine"),
342 Err(_) => tracing::warn!(
343 index = i,
344 "Cancel response not received before phase deadline; \
345 skipping create because nonce consumption is unknown"
346 ),
347 }
348 }
349
350 let mut halted_after_cancel: HashSet<usize> = HashSet::new();
357 if let Some(halt_lock) = trading_halt {
358 let halt_state = halt_lock.read().await;
359 for (i, v) in validated.iter().enumerate() {
360 let Ok(v) = v else { continue };
361 if halt_state
362 .blocked_reason(&v.new_order_info.symbol)
363 .is_some()
364 {
365 halted_after_cancel.insert(i);
366 }
367 }
368 }
369
370 let mut create_rxs: Vec<(usize, mpsc::Receiver<OrderUpdateMessage>)> = Vec::new();
387 for (i, v) in validated.iter().enumerate() {
388 let Ok(v) = v else { continue };
389 if cancel_enqueue_failed.contains(&i) {
390 continue;
391 }
392 if halted_after_cancel.contains(&i) {
393 continue;
394 }
395 let cancel_allows_create = match cancel_responses.get(&i) {
396 Some(resp) => resp.status == OrderUpdateStatus::Canceled,
397 None => false,
398 };
399 if !cancel_allows_create {
400 continue;
401 }
402 match dispatch_to_engine(order_sender, build_create_message(v)).await {
403 Ok(rx) => create_rxs.push((i, rx)),
404 Err(_) => {
405 tracing::error!(index = i, "Failed to enqueue create to engine after cancel");
406 }
407 }
408 }
409
410 let mut create_responses: HashMap<usize, OrderUpdateMessage> = HashMap::new();
411 let create_deadline = tokio::time::Instant::now() + create_phase_deadline;
412 for (i, mut rx) in create_rxs {
413 let now = tokio::time::Instant::now();
414 let remaining = create_deadline.saturating_duration_since(now);
415 match timeout(remaining, rx.recv()).await {
416 Ok(Some(resp)) => {
417 create_responses.insert(i, resp);
418 }
419 Ok(None) => tracing::error!(index = i, "No create response from engine"),
420 Err(_) => tracing::warn!(
421 index = i,
422 "Create response not received before phase deadline"
423 ),
424 }
425 }
426
427 validated
429 .into_iter()
430 .enumerate()
431 .map(|(i, v)| {
432 merge_leg_result(
433 i,
434 v,
435 cancel_responses.remove(&i),
436 create_responses.remove(&i),
437 cancel_enqueue_failed.contains(&i),
438 halted_after_cancel.contains(&i),
439 )
440 })
441 .collect()
442}
443
444fn merge_leg_result(
445 index: usize,
446 validated: Result<ValidatedReplace, BulkOrderResult>,
447 cancel: Option<OrderUpdateMessage>,
448 create: Option<OrderUpdateMessage>,
449 cancel_enqueue_failed: bool,
450 halted_after_cancel: bool,
451) -> BulkOrderResult {
452 if let Err(validation_failure) = validated {
453 return validation_failure;
454 }
455 if cancel_enqueue_failed {
456 return BulkOrderResult {
457 index,
458 success: false,
459 data: None,
460 error: Some("Failed to enqueue cancel to engine (queue closed or full)".to_string()),
461 };
462 }
463 if let Some(create_resp) = create {
464 let success = matches!(
465 create_resp.status,
466 OrderUpdateStatus::Acked
467 | OrderUpdateStatus::Open
468 | OrderUpdateStatus::Filled
469 | OrderUpdateStatus::PartiallyFilled
470 );
471 return BulkOrderResult {
472 index,
473 success,
474 data: Some(create_resp),
475 error: None,
476 };
477 }
478 if halted_after_cancel {
479 return BulkOrderResult {
480 index,
481 success: false,
482 data: cancel,
483 error: Some(
484 "Trading halted for this symbol after cancel phase; create was skipped".to_string(),
485 ),
486 };
487 }
488 if let Some(cancel_resp) = cancel {
489 return if cancel_resp.status == OrderUpdateStatus::Canceled {
490 BulkOrderResult {
491 index,
492 success: false,
493 data: Some(cancel_resp),
494 error: Some(
495 "Cancel succeeded but create response did not arrive before the phase deadline"
496 .to_string(),
497 ),
498 }
499 } else {
500 BulkOrderResult {
501 index,
502 success: false,
503 data: Some(cancel_resp),
504 error: Some("Replace aborted: cancel of existing order failed".to_string()),
505 }
506 };
507 }
508 BulkOrderResult {
512 index,
513 success: false,
514 data: None,
515 error: Some(
516 "Cancel enqueued but no cancel response was received before the phase deadline; create skipped because nonce consumption is unknown"
517 .to_string(),
518 ),
519 }
520}
521
522#[derive(Debug, Clone)]
528struct ValidatedReplace {
529 wallet: WalletAddress,
530 signer_address: WalletAddress,
531 old_order_id: u64,
532 new_order_info: OrderInfo,
536}
537
538async fn validate_replace_request(
543 state: &AppState,
544 req: ReplaceOrderRequest,
545 index: usize,
546) -> Result<ValidatedReplace, BulkOrderResult> {
547 let authorized = verify_request(
548 state.agent_auth.as_ref(),
549 &req,
550 state.runtime_config.signing_chain_id,
551 )
552 .map_err(|e| BulkOrderResult {
553 index,
554 success: false,
555 data: None,
556 error: Some(e.to_string()),
557 })?;
558
559 if authorized.signer.wallet_address != req.wallet {
560 return Err(BulkOrderResult {
561 index,
562 success: false,
563 data: None,
564 error: Some("Verified wallet does not match request wallet".to_string()),
565 });
566 }
567
568 ParsedOptionSymbol::from_symbol(&req.symbol).map_err(|e| BulkOrderResult {
569 index,
570 success: false,
571 data: None,
572 error: Some(format!("Invalid symbol: {}", e)),
573 })?;
574
575 ensure_order_creation_allowed(state, &req.symbol)
576 .await
577 .map_err(|err| BulkOrderResult {
578 index,
579 success: false,
580 data: None,
581 error: Some(err.message),
582 })?;
583
584 let price = Decimal::from_str(&req.price).map_err(|_| BulkOrderResult {
585 index,
586 success: false,
587 data: None,
588 error: Some(format!("Invalid price format: {}", req.price)),
589 })?;
590 let size = Decimal::from_str(&req.size).map_err(|_| BulkOrderResult {
591 index,
592 success: false,
593 data: None,
594 error: Some(format!("Invalid size format: {}", req.size)),
595 })?;
596 if price <= dec!(0) {
597 return Err(BulkOrderResult {
598 index,
599 success: false,
600 data: None,
601 error: Some("Price must be greater than 0".to_string()),
602 });
603 }
604 if size <= dec!(0) {
605 return Err(BulkOrderResult {
606 index,
607 success: false,
608 data: None,
609 error: Some("Size must be greater than 0".to_string()),
610 });
611 }
612 let price_f64 = price.to_f64().ok_or_else(|| BulkOrderResult {
613 index,
614 success: false,
615 data: None,
616 error: Some("Price value out of range".to_string()),
617 })?;
618 validate_price_precision(price_f64, MAX_PRICE_SIGNIFICANT_FIGURES).map_err(|e| {
619 BulkOrderResult {
620 index,
621 success: false,
622 data: None,
623 error: Some(format!("Price validation failed: {}", e)),
624 }
625 })?;
626
627 let new_order_info = OrderInfo {
628 symbol: req.symbol.clone(),
629 price,
630 size: to_contract_units_decimal(&req.symbol, size),
631 side: req.side,
632 tif: req.tif,
633 client_id: req.client_id,
634 order_id: None,
635 is_perp: false,
636 underlying: None,
637 reduce_only: None,
638 nonce: Some(req.nonce),
639 signature: None,
640 mmp_enabled: req.mmp_enabled,
641 builder_code_address: req.builder_code_address,
642 };
643
644 Ok(ValidatedReplace {
645 wallet: req.wallet,
646 signer_address: authorized.signer.signer_address,
647 old_order_id: req.order_id,
648 new_order_info,
649 })
650}
651
652fn build_cancel_message(v: &ValidatedReplace) -> OrderActionMessage {
657 let cancel_info = OrderInfo {
658 symbol: String::new(),
659 price: dec!(0),
660 size: dec!(0),
661 side: Side::Buy,
662 tif: TimeInForce::GTC,
663 client_id: None,
664 order_id: Some(v.old_order_id),
665 is_perp: false,
666 underlying: None,
667 reduce_only: None,
668 nonce: v.new_order_info.nonce,
672 signature: None,
673 mmp_enabled: false,
674 builder_code_address: None,
675 };
676 OrderActionMessage {
677 timestamp: get_timestamp_millis(),
678 info: cancel_info,
679 action: OrderAction::CancelOrder,
680 wallet: v.wallet,
681 api_wallet_address: Some(v.signer_address),
682 mmp_triggered: false,
683 request_id: Some(Uuid::now_v7().to_string()),
684 }
685}
686
687fn build_create_message(v: &ValidatedReplace) -> OrderActionMessage {
688 let mut info = v.new_order_info.clone();
689 info.nonce = None;
690 OrderActionMessage {
691 timestamp: get_timestamp_millis(),
692 info,
693 action: OrderAction::CreateOrder,
694 wallet: v.wallet,
695 api_wallet_address: Some(v.signer_address),
696 mmp_triggered: false,
697 request_id: Some(Uuid::now_v7().to_string()),
698 }
699}
700
701async fn dispatch_to_engine(
704 order_sender: &mpsc::Sender<hypercall_runtime_api::UnifiedEngineRequest>,
705 msg: OrderActionMessage,
706) -> Result<mpsc::Receiver<OrderUpdateMessage>, ()> {
707 let (response_tx, response_rx) = mpsc::channel(1);
708 let engine_request = hypercall_runtime_api::UnifiedEngineRequest {
709 message: msg,
710 response_tx,
711 enqueued_at: Instant::now(),
712 #[cfg(feature = "otel-tracing")]
713 trace_context: Some(tracing::Span::current().context()),
714 };
715 increment_pending_requests();
716 order_sender.send(engine_request).await.map_err(|_| ())?;
717 Ok(response_rx)
718}
719
720#[cfg(test)]
721mod bulk_replace_tests {
722 use super::*;
736 use rust_decimal_macros::dec;
737 use std::time::Duration;
738 use tokio::sync::mpsc;
739
740 fn test_wallet(id: u8) -> WalletAddress {
741 let mut bytes = [0u8; 20];
742 bytes[19] = id;
743 WalletAddress::from(bytes)
744 }
745
746 fn mk_validated(old_order_id: u64) -> ValidatedReplace {
747 mk_validated_with(old_order_id, "BTC-20260101-75000-C".to_string(), false)
748 }
749
750 fn mk_validated_with_symbol(old_order_id: u64, symbol: String) -> ValidatedReplace {
751 mk_validated_with(old_order_id, symbol, false)
752 }
753
754 fn mk_validated_mmp(old_order_id: u64) -> ValidatedReplace {
755 mk_validated_with(old_order_id, "BTC-20260101-75000-C".to_string(), true)
756 }
757
758 fn mk_validated_with(old_order_id: u64, symbol: String, mmp_enabled: bool) -> ValidatedReplace {
759 let order_info = OrderInfo {
760 symbol,
761 price: dec!(100),
762 size: dec!(1),
763 side: Side::Buy,
764 tif: TimeInForce::GTC,
765 client_id: None,
766 order_id: None,
767 is_perp: false,
768 underlying: None,
769 reduce_only: None,
770 nonce: Some(1_700_000_000_000 + old_order_id),
771 signature: None,
772 mmp_enabled,
773 builder_code_address: None,
774 };
775 ValidatedReplace {
776 wallet: test_wallet(1),
777 signer_address: test_wallet(2),
778 old_order_id,
779 new_order_info: order_info,
780 }
781 }
782
783 fn mk_validation_error(index: usize) -> BulkOrderResult {
784 BulkOrderResult {
785 index,
786 success: false,
787 data: None,
788 error: Some("Signature verification failed: test".to_string()),
789 }
790 }
791
792 fn mk_response(
793 order_id: Option<u64>,
794 status: OrderUpdateStatus,
795 wallet: WalletAddress,
796 ) -> OrderUpdateMessage {
797 OrderUpdateMessage {
798 timestamp: 0,
799 info: OrderInfo {
800 symbol: "BTC-20260101-75000-C".to_string(),
801 price: dec!(100),
802 size: dec!(1),
803 side: Side::Buy,
804 tif: TimeInForce::GTC,
805 client_id: None,
806 order_id,
807 is_perp: false,
808 underlying: None,
809 reduce_only: None,
810 nonce: None,
811 signature: None,
812 mmp_enabled: false,
813 builder_code_address: None,
814 },
815 status,
816 reason: None,
817 filled_size: dec!(0),
818 order_id,
819 wallet_address: wallet,
820 mmp_triggered: false,
821 request_id: None,
822 }
823 }
824
825 async fn run_mock_engine<F>(
829 mut engine_rx: mpsc::Receiver<hypercall_runtime_api::UnifiedEngineRequest>,
830 mut responder: F,
831 ) -> Vec<OrderAction>
832 where
833 F: FnMut(&hypercall_runtime_api::UnifiedEngineRequest) -> Option<OrderUpdateMessage>
834 + Send
835 + 'static,
836 {
837 let mut observed: Vec<OrderAction> = Vec::new();
838 while let Some(req) = engine_rx.recv().await {
839 observed.push(req.message.action.clone());
840 if let Some(resp) = responder(&req) {
841 let _ = req.response_tx.send(resp).await;
842 }
843 }
844 observed
845 }
846
847 #[tokio::test]
850 async fn cancels_dispatched_before_creates_for_batch_of_three() {
851 let (engine_tx, engine_rx) = mpsc::channel(32);
852 let wallet = test_wallet(1);
853
854 let engine_handle = tokio::spawn({
855 async move {
856 run_mock_engine(engine_rx, move |req| {
857 let status = match req.message.action {
858 OrderAction::CancelOrder => OrderUpdateStatus::Canceled,
859 OrderAction::CreateOrder => OrderUpdateStatus::Open,
860 _ => OrderUpdateStatus::Rejected,
861 };
862 Some(mk_response(req.message.info.order_id, status, wallet))
863 })
864 .await
865 }
866 });
867
868 let validated = vec![
869 Ok(mk_validated(101)),
870 Ok(mk_validated(102)),
871 Ok(mk_validated(103)),
872 ];
873 let results = orchestrate_bulk_replace(
874 &engine_tx,
875 validated,
876 Duration::from_secs(2),
877 Duration::from_secs(2),
878 None,
879 )
880 .await;
881 drop(engine_tx);
882 let observed = engine_handle.await.expect("engine task");
883
884 assert_eq!(results.len(), 3);
885 assert!(results.iter().all(|r| r.success), "all 3 replaces succeed");
886 assert_eq!(observed.len(), 6, "3 cancels + 3 creates");
887
888 let cancel_idxs: Vec<usize> = observed
889 .iter()
890 .enumerate()
891 .filter(|(_, a)| **a == OrderAction::CancelOrder)
892 .map(|(i, _)| i)
893 .collect();
894 let create_idxs: Vec<usize> = observed
895 .iter()
896 .enumerate()
897 .filter(|(_, a)| **a == OrderAction::CreateOrder)
898 .map(|(i, _)| i)
899 .collect();
900 assert_eq!(cancel_idxs, vec![0, 1, 2], "cancels are first three");
901 assert_eq!(create_idxs, vec![3, 4, 5], "creates come after all cancels");
902 }
903
904 #[tokio::test]
909 async fn cancel_failure_skips_create_for_that_leg() {
910 let (engine_tx, engine_rx) = mpsc::channel(32);
911 let wallet = test_wallet(1);
912
913 let engine_handle = tokio::spawn({
914 async move {
915 run_mock_engine(engine_rx, move |req| {
916 let oid = req.message.info.order_id;
917 let status = match (&req.message.action, oid) {
918 (OrderAction::CancelOrder, Some(201)) => OrderUpdateStatus::Rejected,
920 (OrderAction::CancelOrder, _) => OrderUpdateStatus::Canceled,
921 (OrderAction::CreateOrder, _) => OrderUpdateStatus::Open,
922 _ => OrderUpdateStatus::Rejected,
923 };
924 Some(mk_response(oid, status, wallet))
925 })
926 .await
927 }
928 });
929
930 let validated = vec![
931 Ok(mk_validated(200)), Ok(mk_validated(201)), Ok(mk_validated(202)), ];
935 let results = orchestrate_bulk_replace(
936 &engine_tx,
937 validated,
938 Duration::from_secs(2),
939 Duration::from_secs(2),
940 None,
941 )
942 .await;
943 drop(engine_tx);
944 let observed = engine_handle.await.expect("engine task");
945
946 assert_eq!(observed.len(), 5);
948 assert_eq!(
949 observed
950 .iter()
951 .filter(|a| **a == OrderAction::CancelOrder)
952 .count(),
953 3
954 );
955 assert_eq!(
956 observed
957 .iter()
958 .filter(|a| **a == OrderAction::CreateOrder)
959 .count(),
960 2
961 );
962
963 assert!(results[0].success);
964 assert!(!results[1].success);
965 assert!(results[1]
966 .error
967 .as_deref()
968 .unwrap_or("")
969 .contains("cancel of existing order failed"));
970 assert!(results[2].success);
971 }
972
973 #[tokio::test]
977 async fn validation_failures_skip_engine_dispatch() {
978 let (engine_tx, engine_rx) = mpsc::channel(32);
979 let wallet = test_wallet(1);
980
981 let engine_handle = tokio::spawn({
982 async move {
983 run_mock_engine(engine_rx, move |req| {
984 let status = match req.message.action {
985 OrderAction::CancelOrder => OrderUpdateStatus::Canceled,
986 OrderAction::CreateOrder => OrderUpdateStatus::Open,
987 _ => OrderUpdateStatus::Rejected,
988 };
989 Some(mk_response(req.message.info.order_id, status, wallet))
990 })
991 .await
992 }
993 });
994
995 let validated = vec![
996 Ok(mk_validated(301)),
997 Err(mk_validation_error(1)),
998 Ok(mk_validated(303)),
999 ];
1000 let results = orchestrate_bulk_replace(
1001 &engine_tx,
1002 validated,
1003 Duration::from_secs(2),
1004 Duration::from_secs(2),
1005 None,
1006 )
1007 .await;
1008 drop(engine_tx);
1009 let observed = engine_handle.await.expect("engine task");
1010
1011 assert_eq!(observed.len(), 4);
1013 assert!(results[0].success);
1014 assert!(!results[1].success);
1015 assert_eq!(results[1].index, 1);
1016 assert!(results[1]
1017 .error
1018 .as_deref()
1019 .unwrap_or("")
1020 .contains("Signature verification failed"));
1021 assert!(results[2].success);
1022 }
1023
1024 #[tokio::test]
1031 async fn cancel_response_timeout_skips_create() {
1032 let (engine_tx, engine_rx) = mpsc::channel(32);
1033 let wallet = test_wallet(1);
1034
1035 let engine_handle = tokio::spawn({
1038 async move {
1039 run_mock_engine(engine_rx, move |req| {
1040 let oid = req.message.info.order_id;
1041 if oid == Some(401) && req.message.action == OrderAction::CancelOrder {
1042 return None; }
1044 let status = match req.message.action {
1045 OrderAction::CancelOrder => OrderUpdateStatus::Canceled,
1046 OrderAction::CreateOrder => OrderUpdateStatus::Open,
1047 _ => OrderUpdateStatus::Rejected,
1048 };
1049 Some(mk_response(oid, status, wallet))
1050 })
1051 .await
1052 }
1053 });
1054
1055 let validated = vec![
1056 Ok(mk_validated(400)),
1057 Ok(mk_validated(401)), Ok(mk_validated(402)),
1059 ];
1060 let short_deadline = Duration::from_millis(200);
1061 let start = std::time::Instant::now();
1062 let results = orchestrate_bulk_replace(
1063 &engine_tx,
1064 validated,
1065 short_deadline,
1066 Duration::from_secs(2),
1067 None,
1068 )
1069 .await;
1070 let elapsed = start.elapsed();
1071 drop(engine_tx);
1072 let observed = engine_handle.await.expect("engine task");
1073
1074 assert!(elapsed < Duration::from_secs(3), "elapsed {:?}", elapsed);
1077
1078 assert_eq!(observed.len(), 5);
1081 assert_eq!(
1082 observed
1083 .iter()
1084 .filter(|a| **a == OrderAction::CreateOrder)
1085 .count(),
1086 2,
1087 "create for the hung-cancel leg must be skipped"
1088 );
1089
1090 assert!(results[0].success);
1091 assert!(
1092 !results[1].success,
1093 "leg with silenced cancel response must not create"
1094 );
1095 assert!(
1096 results[1]
1097 .error
1098 .as_deref()
1099 .unwrap_or("")
1100 .contains("nonce consumption is unknown"),
1101 "unexpected timeout error: {:?}",
1102 results[1].error
1103 );
1104 assert!(results[2].success);
1105 }
1106
1107 #[tokio::test]
1112 async fn create_response_timeout_returns_cancel_only_result() {
1113 let (engine_tx, engine_rx) = mpsc::channel(32);
1114 let wallet = test_wallet(1);
1115
1116 let engine_handle = tokio::spawn({
1117 async move {
1118 run_mock_engine(engine_rx, move |req| {
1119 let oid = req.message.info.order_id;
1120 match req.message.action {
1121 OrderAction::CancelOrder => {
1122 Some(mk_response(oid, OrderUpdateStatus::Canceled, wallet))
1123 }
1124 OrderAction::CreateOrder => None, _ => None,
1126 }
1127 })
1128 .await
1129 }
1130 });
1131
1132 let validated = vec![Ok(mk_validated(501))];
1133 let short_deadline = Duration::from_millis(200);
1134 let results = orchestrate_bulk_replace(
1135 &engine_tx,
1136 validated,
1137 Duration::from_secs(2),
1138 short_deadline,
1139 None,
1140 )
1141 .await;
1142 drop(engine_tx);
1143 let _ = engine_handle.await;
1144
1145 assert_eq!(results.len(), 1);
1146 assert!(!results[0].success);
1147 assert_eq!(
1148 results[0].data.as_ref().map(|r| r.status),
1149 Some(OrderUpdateStatus::Canceled)
1150 );
1151 assert!(
1152 results[0]
1153 .error
1154 .as_deref()
1155 .unwrap_or("")
1156 .contains("create response did not arrive"),
1157 "unexpected error: {:?}",
1158 results[0].error
1159 );
1160 }
1161
1162 #[tokio::test]
1171 async fn halt_fired_during_phase_one_blocks_only_matching_leg_in_phase_two() {
1172 let (engine_tx, engine_rx) = mpsc::channel(32);
1173 let wallet = test_wallet(1);
1174
1175 let halt_state = std::sync::Arc::new(RwLock::new(TradingHaltState::new()));
1179 let halt_state_engine = halt_state.clone();
1180 let halted_symbol = "BTC-20260101-80000-C".to_string();
1181 let halted_symbol_engine = halted_symbol.clone();
1182
1183 let engine_handle = tokio::spawn({
1184 async move {
1185 let mut cancels_seen = 0usize;
1186 run_mock_engine(engine_rx, move |req| {
1187 let status = match req.message.action {
1188 OrderAction::CancelOrder => OrderUpdateStatus::Canceled,
1189 OrderAction::CreateOrder => OrderUpdateStatus::Open,
1190 _ => OrderUpdateStatus::Rejected,
1191 };
1192 if matches!(req.message.action, OrderAction::CancelOrder) {
1196 cancels_seen += 1;
1197 if cancels_seen == 3 {
1198 let halt_state = halt_state_engine.clone();
1199 let sym = halted_symbol_engine.clone();
1200 tokio::spawn(async move {
1201 halt_state.write().await.set_market_halt(
1202 &sym,
1203 true,
1204 "test halt".to_string(),
1205 "test".to_string(),
1206 );
1207 });
1208 }
1209 }
1210 Some(mk_response(req.message.info.order_id, status, wallet))
1211 })
1212 .await
1213 }
1214 });
1215
1216 let validated = vec![
1217 Ok(mk_validated(701)),
1218 Ok(mk_validated_with_symbol(702, halted_symbol.clone())),
1219 Ok(mk_validated(703)),
1220 ];
1221
1222 let results = orchestrate_bulk_replace(
1223 &engine_tx,
1224 validated,
1225 Duration::from_secs(2),
1226 Duration::from_secs(2),
1227 Some(&halt_state),
1228 )
1229 .await;
1230 drop(engine_tx);
1231 let observed = engine_handle.await.expect("engine task");
1232
1233 assert_eq!(
1236 observed
1237 .iter()
1238 .filter(|a| **a == OrderAction::CancelOrder)
1239 .count(),
1240 3
1241 );
1242 assert_eq!(
1243 observed
1244 .iter()
1245 .filter(|a| **a == OrderAction::CreateOrder)
1246 .count(),
1247 2
1248 );
1249
1250 assert!(results[0].success);
1251 assert!(!results[1].success);
1252 assert!(
1253 results[1]
1254 .error
1255 .as_deref()
1256 .unwrap_or("")
1257 .contains("Trading halted"),
1258 "unexpected error: {:?}",
1259 results[1].error
1260 );
1261 assert!(results[2].success);
1262 }
1263
1264 #[tokio::test]
1270 async fn cancel_enqueue_failure_reports_explicit_error() {
1271 let (engine_tx, engine_rx) = mpsc::channel(32);
1272 drop(engine_rx);
1274
1275 let validated = vec![Ok(mk_validated(801))];
1276 let results = orchestrate_bulk_replace(
1277 &engine_tx,
1278 validated,
1279 Duration::from_millis(200),
1280 Duration::from_millis(200),
1281 None,
1282 )
1283 .await;
1284
1285 assert_eq!(results.len(), 1);
1286 assert!(!results[0].success);
1287 assert!(
1288 results[0]
1289 .error
1290 .as_deref()
1291 .unwrap_or("")
1292 .contains("Failed to enqueue cancel"),
1293 "unexpected error: {:?}",
1294 results[0].error
1295 );
1296 }
1297
1298 #[tokio::test]
1303 async fn phase_deadline_bounds_total_bulk_latency() {
1304 let (engine_tx, engine_rx) = mpsc::channel(32);
1305 let wallet = test_wallet(1);
1306
1307 let engine_handle = tokio::spawn({
1310 async move {
1311 run_mock_engine(engine_rx, move |req| match req.message.action {
1312 OrderAction::CancelOrder => None,
1313 OrderAction::CreateOrder => Some(mk_response(
1314 req.message.info.order_id,
1315 OrderUpdateStatus::Open,
1316 wallet,
1317 )),
1318 _ => None,
1319 })
1320 .await
1321 }
1322 });
1323
1324 let validated = vec![
1325 Ok(mk_validated(901)),
1326 Ok(mk_validated(902)),
1327 Ok(mk_validated(903)),
1328 Ok(mk_validated(904)),
1329 Ok(mk_validated(905)),
1330 ];
1331 let cancel_deadline = Duration::from_millis(300);
1332 let create_deadline = Duration::from_millis(300);
1333 let start = std::time::Instant::now();
1334 let results = orchestrate_bulk_replace(
1335 &engine_tx,
1336 validated,
1337 cancel_deadline,
1338 create_deadline,
1339 None,
1340 )
1341 .await;
1342 let elapsed = start.elapsed();
1343 drop(engine_tx);
1344 let _ = engine_handle.await;
1345
1346 assert!(
1350 elapsed < cancel_deadline + create_deadline + Duration::from_millis(500),
1351 "elapsed {:?} exceeded phase-deadline budget",
1352 elapsed
1353 );
1354
1355 assert_eq!(results.len(), 5);
1356 assert!(results.iter().all(|r| !r.success));
1357 assert!(results.iter().all(|r| r
1358 .error
1359 .as_deref()
1360 .unwrap_or("")
1361 .contains("nonce consumption is unknown")));
1362 }
1363
1364 #[tokio::test]
1367 async fn result_order_matches_input_order() {
1368 let (engine_tx, engine_rx) = mpsc::channel(32);
1369 let wallet = test_wallet(1);
1370
1371 let engine_handle = tokio::spawn({
1372 async move {
1373 run_mock_engine(engine_rx, move |req| {
1374 let status = match req.message.action {
1375 OrderAction::CancelOrder => OrderUpdateStatus::Canceled,
1376 OrderAction::CreateOrder => OrderUpdateStatus::Open,
1377 _ => OrderUpdateStatus::Rejected,
1378 };
1379 Some(mk_response(req.message.info.order_id, status, wallet))
1380 })
1381 .await
1382 }
1383 });
1384
1385 let validated = vec![
1386 Ok(mk_validated(600)),
1387 Err(mk_validation_error(1)),
1388 Ok(mk_validated(602)),
1389 Err(mk_validation_error(3)),
1390 Ok(mk_validated(604)),
1391 ];
1392 let results = orchestrate_bulk_replace(
1393 &engine_tx,
1394 validated,
1395 Duration::from_secs(2),
1396 Duration::from_secs(2),
1397 None,
1398 )
1399 .await;
1400 drop(engine_tx);
1401 let _ = engine_handle.await;
1402
1403 assert_eq!(results.len(), 5);
1404 for (i, r) in results.iter().enumerate() {
1405 assert_eq!(r.index, i, "result {} has wrong index", i);
1406 }
1407 assert!(results[0].success);
1408 assert!(!results[1].success);
1409 assert!(results[2].success);
1410 assert!(!results[3].success);
1411 assert!(results[4].success);
1412 }
1413
1414 #[test]
1417 fn build_cancel_message_sets_action_and_order_id() {
1418 let v = mk_validated(999);
1419 let msg = build_cancel_message(&v);
1420 assert_eq!(msg.action, OrderAction::CancelOrder);
1421 assert_eq!(msg.info.order_id, Some(999));
1422 assert_eq!(msg.info.nonce, v.new_order_info.nonce);
1423 assert_eq!(msg.info.symbol, "");
1426 assert_eq!(msg.wallet, v.wallet);
1427 }
1428
1429 #[test]
1432 fn build_create_message_uses_new_order_info() {
1433 let v = mk_validated(1234);
1434 let msg = build_create_message(&v);
1435 assert_eq!(msg.action, OrderAction::CreateOrder);
1436 assert_eq!(msg.info.order_id, None);
1437 assert_eq!(msg.info.price, v.new_order_info.price);
1438 assert_eq!(msg.info.size, v.new_order_info.size);
1439 assert_eq!(msg.info.nonce, None);
1440 assert_eq!(msg.wallet, v.wallet);
1441 }
1442
1443 #[test]
1449 fn merge_leg_result_branches() {
1450 let wallet = test_wallet(1);
1451
1452 let ve = mk_validation_error(7);
1454 let r = merge_leg_result(7, Err(ve.clone()), None, None, false, false);
1455 assert_eq!(r.index, 7);
1456 assert!(!r.success);
1457 assert_eq!(r.error, ve.error);
1458
1459 let cancel = mk_response(Some(1), OrderUpdateStatus::Canceled, wallet);
1461 let create = mk_response(Some(10), OrderUpdateStatus::Open, wallet);
1462 let r = merge_leg_result(
1463 0,
1464 Ok(mk_validated(1)),
1465 Some(cancel),
1466 Some(create),
1467 false,
1468 false,
1469 );
1470 assert!(r.success);
1471 assert_eq!(r.data.unwrap().status, OrderUpdateStatus::Open);
1472
1473 let cancel = mk_response(Some(1), OrderUpdateStatus::Canceled, wallet);
1475 let r = merge_leg_result(0, Ok(mk_validated(1)), Some(cancel), None, false, false);
1476 assert!(!r.success);
1477 assert!(r
1478 .error
1479 .as_deref()
1480 .unwrap()
1481 .contains("create response did not arrive"));
1482
1483 let cancel = mk_response(Some(1), OrderUpdateStatus::Rejected, wallet);
1485 let r = merge_leg_result(0, Ok(mk_validated(1)), Some(cancel), None, false, false);
1486 assert!(!r.success);
1487 assert!(r
1488 .error
1489 .as_deref()
1490 .unwrap()
1491 .contains("cancel of existing order failed"));
1492
1493 let cancel = mk_response(Some(1), OrderUpdateStatus::Canceled, wallet);
1495 let r = merge_leg_result(0, Ok(mk_validated(1)), Some(cancel), None, false, true);
1496 assert!(!r.success);
1497 assert!(r.error.as_deref().unwrap().contains("Trading halted"));
1498
1499 let r = merge_leg_result(0, Ok(mk_validated(1)), None, None, true, false);
1501 assert!(!r.success);
1502 assert!(r
1503 .error
1504 .as_deref()
1505 .unwrap()
1506 .contains("Failed to enqueue cancel"));
1507
1508 let r = merge_leg_result(0, Ok(mk_validated(1)), None, None, false, false);
1511 assert!(!r.success);
1512 assert!(r
1513 .error
1514 .as_deref()
1515 .unwrap()
1516 .contains("nonce consumption is unknown"));
1517 }
1518
1519 #[tokio::test]
1544 async fn mmp_flag_propagates_to_create_and_not_to_cancel() {
1545 let (engine_tx, engine_rx) = mpsc::channel(32);
1546 let wallet = test_wallet(1);
1547
1548 let engine_handle = tokio::spawn({
1549 async move {
1550 let mut rx: mpsc::Receiver<hypercall_runtime_api::UnifiedEngineRequest> = engine_rx;
1553 let mut observed: Vec<(OrderAction, Option<u64>, bool)> = Vec::new();
1554 while let Some(req) = rx.recv().await {
1555 let action = req.message.action.clone();
1556 let oid = req.message.info.order_id;
1557 let mmp = req.message.info.mmp_enabled;
1558 observed.push((action.clone(), oid, mmp));
1559 let status = match action {
1560 OrderAction::CancelOrder => OrderUpdateStatus::Canceled,
1561 OrderAction::CreateOrder => OrderUpdateStatus::Filled,
1562 _ => OrderUpdateStatus::Rejected,
1563 };
1564 let mut resp = mk_response(oid, status, wallet);
1568 if matches!(action, OrderAction::CreateOrder) && mmp {
1569 resp.mmp_triggered = true;
1570 }
1571 let _ = req.response_tx.send(resp).await;
1572 }
1573 observed
1574 }
1575 });
1576
1577 let validated = vec![
1578 Ok(mk_validated(501)), Ok(mk_validated_mmp(502)), ];
1581 let results = orchestrate_bulk_replace(
1582 &engine_tx,
1583 validated,
1584 Duration::from_secs(2),
1585 Duration::from_secs(2),
1586 None,
1587 )
1588 .await;
1589 drop(engine_tx);
1590 let observed = engine_handle.await.expect("engine task");
1591
1592 assert_eq!(observed.len(), 4, "2 cancels + 2 creates");
1593
1594 let creates: Vec<_> = observed
1596 .iter()
1597 .filter(|(a, _, _)| *a == OrderAction::CreateOrder)
1598 .collect();
1599 assert_eq!(creates.len(), 2);
1600 assert!(
1602 creates.iter().any(|(_, _, mmp)| !*mmp),
1603 "leg 501 create must have mmp_enabled=false"
1604 );
1605 assert!(
1606 creates.iter().any(|(_, _, mmp)| *mmp),
1607 "leg 502 create must have mmp_enabled=true"
1608 );
1609
1610 let cancels: Vec<_> = observed
1614 .iter()
1615 .filter(|(a, _, _)| *a == OrderAction::CancelOrder)
1616 .collect();
1617 assert_eq!(cancels.len(), 2);
1618 assert!(
1619 cancels.iter().all(|(_, _, mmp)| !*mmp),
1620 "cancels must never carry mmp_enabled=true"
1621 );
1622
1623 assert_eq!(results.len(), 2);
1626 assert!(results[0].success);
1627 let leg0 = results[0].data.as_ref().expect("leg 0 has response data");
1628 assert!(
1629 !leg0.mmp_triggered,
1630 "leg 0 (mmp_enabled=false) must not report mmp_triggered"
1631 );
1632 assert!(results[1].success);
1633 let leg1 = results[1].data.as_ref().expect("leg 1 has response data");
1634 assert!(
1635 leg1.mmp_triggered,
1636 "leg 1 (mmp_enabled=true) must surface engine's mmp_triggered=true"
1637 );
1638 }
1639}