Skip to main content

hypercall_api/handlers/
replace_orders.rs

1use std::collections::{HashMap, HashSet};
2use std::str::FromStr;
3use std::time::Instant;
4
5use rust_decimal::prelude::ToPrimitive;
6use rust_decimal::Decimal;
7use rust_decimal_macros::dec;
8use utoipa::ToSchema;
9
10use crate::error::ApiError;
11use crate::middleware::SignerContext;
12use crate::request_auth::verify_request;
13use crate::sonic_json::SonicJson;
14use crate::trading_halt::TradingHaltState;
15use axum::extract::State;
16use hypercall_runtime_api::increment_pending_requests;
17use hypercall_types::utils::get_timestamp_millis;
18use hypercall_types::{
19    to_contract_units_decimal, validate_price_precision, ReplaceOrderRequest, WalletAddress,
20    MAX_PRICE_SIGNIFICANT_FIGURES,
21};
22use hypercall_types::{
23    OrderAction, OrderActionMessage, OrderInfo, OrderUpdateMessage, OrderUpdateStatus, TimeInForce,
24};
25use hypercall_types::{ParsedOptionSymbol, Side};
26use serde::{Deserialize, Serialize};
27use tokio::sync::{mpsc, RwLock};
28use tokio::time::timeout;
29use tracing::Instrument;
30use uuid::Uuid;
31
32#[cfg(feature = "otel-tracing")]
33use tracing_opentelemetry::OpenTelemetrySpanExt;
34
35use super::{ensure_order_creation_allowed, AppState, ENGINE_RESPONSE_TIMEOUT};
36
37pub use super::bulk_orders::BulkOrderResult;
38use super::bulk_orders::MAX_BULK_ORDER_SIZE;
39
40#[derive(Debug, Deserialize, ToSchema)]
41pub struct BulkReplaceOrderRequest {
42    /// Array of replace requests (max 50)
43    pub replacements: Vec<ReplaceOrderRequest>,
44}
45
46#[derive(Debug, Serialize, ToSchema)]
47pub struct BulkReplaceOrderResponse {
48    /// Results for each replace in the request
49    pub results: Vec<BulkOrderResult>,
50}
51
52/// Atomically cancel an existing order and place a new one.
53///
54/// The old order is canceled first. If the cancel fails (order not found,
55/// already filled), the new order is NOT placed and the endpoint returns
56/// an error. If the cancel succeeds, the new order is placed in the same
57/// engine tick with no gap.
58#[utoipa::path(
59    put,
60    path = "/order",
61    request_body = ReplaceOrderRequest,
62    responses(
63        (status = 200, description = "Replace result (new order status)", body = OrderUpdateMessage),
64        (status = 400, description = "Invalid request"),
65        (status = 401, description = "Unauthorized"),
66        (status = 500, description = "Internal server error")
67    ),
68    security(("eip712_signature" = [])),
69    tag = "Trading"
70)]
71pub async fn replace_order(
72    State(state): State<AppState>,
73    signer_ctx: SignerContext,
74    SonicJson(request): SonicJson<ReplaceOrderRequest>,
75) -> Result<SonicJson<OrderUpdateMessage>, ApiError> {
76    let span = tracing::info_span!(
77        "api.replace_order",
78        wallet = %signer_ctx.wallet_address,
79        cancel_order_id = %request.order_id,
80        symbol = %request.symbol,
81        side = ?request.side,
82        size = %request.size,
83        price = %request.price,
84        client_id = ?request.client_id,
85        action = "ReplaceOrder",
86    );
87
88    async move {
89        // Parse price and size
90        let price: Decimal = Decimal::from_str(&request.price).map_err(|_| {
91            ApiError::bad_request(format!("Invalid price format: {}", request.price))
92        })?;
93        let size: Decimal = Decimal::from_str(&request.size)
94            .map_err(|_| ApiError::bad_request(format!("Invalid size format: {}", request.size)))?;
95
96        // Validate symbol
97        let _parsed_symbol = ParsedOptionSymbol::from_symbol(&request.symbol)
98            .map_err(|e| ApiError::bad_request(format!("Invalid symbol: {}", e)))?;
99
100        ensure_order_creation_allowed(&state, &request.symbol).await?;
101
102        if price <= dec!(0) {
103            return Err(ApiError::bad_request("Price must be positive"));
104        }
105        if size <= dec!(0) {
106            return Err(ApiError::bad_request("Size must be positive"));
107        }
108
109        let price_f64 = price
110            .to_f64()
111            .ok_or_else(|| ApiError::bad_request("Price value out of range"))?;
112        if let Err(e) = validate_price_precision(price_f64, MAX_PRICE_SIGNIFICANT_FIGURES) {
113            return Err(ApiError::bad_request(format!(
114                "Invalid price precision: {}",
115                e
116            )));
117        }
118
119        tracing::info!(
120            "Replacing order {} for wallet: {} (signed by: {}), new symbol: {}",
121            request.order_id,
122            signer_ctx.wallet_address,
123            signer_ctx.signer_address,
124            request.symbol
125        );
126
127        // Build OrderInfo with cancel target in order_id and new order details in the rest
128        let order_info = OrderInfo {
129            symbol: request.symbol.clone(),
130            price,
131            size: to_contract_units_decimal(&request.symbol, size),
132            side: request.side,
133            tif: request.tif,
134            client_id: request.client_id,
135            order_id: Some(request.order_id),
136            is_perp: false,
137            underlying: None,
138            reduce_only: None,
139            nonce: Some(request.nonce),
140            signature: None,
141            mmp_enabled: request.mmp_enabled,
142            builder_code_address: request.builder_code_address,
143        };
144
145        let order_action_msg = OrderActionMessage {
146            timestamp: get_timestamp_millis(),
147            info: order_info,
148            action: OrderAction::ReplaceOrder,
149            wallet: request.wallet,
150            api_wallet_address: Some(signer_ctx.signer_address),
151            mmp_triggered: false,
152            request_id: Some(Uuid::now_v7().to_string()),
153        };
154
155        let (response_tx, mut response_rx) = mpsc::channel(1);
156
157        let engine_request = hypercall_runtime_api::UnifiedEngineRequest {
158            message: order_action_msg,
159            response_tx,
160            enqueued_at: Instant::now(),
161            #[cfg(feature = "otel-tracing")]
162            trace_context: Some(tracing::Span::current().context()),
163        };
164
165        increment_pending_requests();
166        state
167            .order_sender
168            .send(engine_request)
169            .await
170            .map_err(|_| ApiError::internal_error("Failed to send replace order to engine"))?;
171
172        let response = match timeout(ENGINE_RESPONSE_TIMEOUT, response_rx.recv()).await {
173            Ok(Some(resp)) => resp,
174            Ok(None) => {
175                return Err(ApiError::internal_error("No response from engine"));
176            }
177            Err(_) => {
178                return Err(ApiError::gateway_timeout(
179                    "Timeout waiting for engine response",
180                ));
181            }
182        };
183
184        Ok(SonicJson(response))
185    }
186    .instrument(span)
187    .await
188}
189
190/// Replace multiple orders using cancel-all-then-create-all ordering.
191///
192/// Per-leg semantics mirror PUT /order (cancel old + place new), but across
193/// a batch every leg's cancel is enqueued before any create, so the engine
194/// cannot match a new leg against a sibling's still-resting opposite. This
195/// eliminates the self-trade-prevention kill path that fired when the bulk
196/// dispatched independent ReplaceOrder commands. A per-leg response is
197/// returned in original index order; a leg whose cancel failed (already
198/// filled / not found) is skipped for the create phase and returned with
199/// the cancel rejection details. Not atomic across the batch: the gap
200/// between a leg's cancel committing and its create landing is per-leg.
201/// See engineering-docs/docs/flows/order-lifecycle.mdx for tradeoffs vs.
202/// the single-command PUT /order path.
203#[utoipa::path(
204    put,
205    path = "/bulk_order",
206    request_body = BulkReplaceOrderRequest,
207    responses(
208        (status = 200, description = "Bulk replace results (per-leg, in input order)", body = BulkReplaceOrderResponse),
209        (status = 400, description = "Invalid request (e.g., too many replacements)"),
210        (status = 500, description = "Internal server error")
211    ),
212    security(("eip712_signature" = [])),
213    tag = "Trading"
214)]
215/// Bulk replace orders.
216///
217/// Executes **cancel-all-then-create-all** ordering across the batch: every
218/// leg's cancel is enqueued before any create, so the engine is guaranteed
219/// to process all cancels (removing every old leg from the book) before any
220/// new order matches. Without this, the engine's FIFO processing of
221/// independent `OrderAction::ReplaceOrder` commands meant each command's
222/// phase-2 place could match against a sibling's still-resting opposite
223/// leg, triggering self-trade prevention and losing the new leg entirely.
224///
225/// The path is:
226///   1. `validate_replace_request` per leg (sig / auth / parse / precision).
227///   2. `orchestrate_bulk_replace` dispatches all `CancelOrder` commands,
228///      awaits responses, then dispatches `CreateOrder` commands only for
229///      legs whose cancel actually reached `Canceled`.
230///   3. Per-leg results merge: prefer the create response; fall back to a
231///      cancel-only result when the cancel landed but the create didn't.
232///
233/// Semantics vs. single-command `ReplaceOrder`: the `PUT /order` single-
234/// replace path dispatches **one** `OrderAction::ReplaceOrder` engine command
235/// that handles the cancel and place atomically within one engine tick and
236/// one journal entry (`src/rsm/unified_engine/order_routing.rs:process_replace_order`).
237/// The bulk path, by contrast, issues separate `CancelOrder` and `CreateOrder`
238/// commands (2N journal entries per bulk of N), so there is a per-leg gap
239/// between a leg's cancel committing and its create landing. Real-world MMs
240/// re-quote every cycle, so a missed create is recovered on the next tick.
241/// The `PUT /order` single-replace endpoint is unchanged.
242pub async fn bulk_replace_order(
243    State(state): State<AppState>,
244    SonicJson(request): SonicJson<BulkReplaceOrderRequest>,
245) -> Result<SonicJson<BulkReplaceOrderResponse>, ApiError> {
246    if request.replacements.len() > MAX_BULK_ORDER_SIZE {
247        return Err(ApiError::bad_request(format!(
248            "Bulk replace request exceeds max size: {} > {}",
249            request.replacements.len(),
250            MAX_BULK_ORDER_SIZE
251        )));
252    }
253
254    let n = request.replacements.len();
255    tracing::info!(
256        n,
257        "Processing bulk replace (cancel-all-then-create-all ordering)"
258    );
259
260    // ---- Phase 0: per-leg validation (sig, auth, parsing) ----
261    let mut validated: Vec<Result<ValidatedReplace, BulkOrderResult>> = Vec::with_capacity(n);
262    for (index, replace_req) in request.replacements.into_iter().enumerate() {
263        validated.push(validate_replace_request(&state, replace_req, index).await);
264    }
265
266    let results = orchestrate_bulk_replace(
267        &state.order_sender,
268        validated,
269        BULK_REPLACE_PHASE_DEADLINE,
270        BULK_REPLACE_PHASE_DEADLINE,
271        Some(&state.trading_halt),
272    )
273    .await;
274
275    Ok(SonicJson(BulkReplaceOrderResponse { results }))
276}
277
278/// Per-phase deadline for the bulk-replace handler. Unlike
279/// `ENGINE_RESPONSE_TIMEOUT` (which is the per-message budget on the
280/// singular endpoints), this is the budget for *all* legs in a phase
281/// combined. Keeps worst-case bulk latency bounded even if a single
282/// leg's response is hung. Phase 1 and Phase 2 each get their own
283/// deadline of this size, so a 50-leg bulk is bounded by roughly
284/// 2 × this duration end-to-end.
285const BULK_REPLACE_PHASE_DEADLINE: std::time::Duration = std::time::Duration::from_secs(5);
286
287/// Drive the cancel-all-then-create-all dispatch against a provided engine
288/// request sender.
289///
290/// - `cancel_phase_deadline` / `create_phase_deadline`: single budget for
291///   *all* legs in each phase, measured from when that phase starts reading
292///   responses. A 50-leg bulk with one hung cancel is bounded by at most
293///   `cancel_phase_deadline + create_phase_deadline`, not
294///   `N × per_leg_timeout`.
295///
296/// - `trading_halt`: optional reference to the global halt state. When
297///   provided, a fresh snapshot is read **after phase 1 completes** (not
298///   before it starts) so halts that fire while cancels are in flight
299///   are caught before any create is dispatched. Tests that don't
300///   exercise halts can pass `None`.
301///
302/// - Phase 2 dispatches a create only after observing a `Canceled`
303///   response for that leg. The phase-1 cancel carries the signed nonce
304///   and the phase-2 create is engine-internal, so a timeout/closed
305///   response channel is treated as an unknown nonce-consumption outcome
306///   and does not get a create.
307async fn orchestrate_bulk_replace(
308    order_sender: &mpsc::Sender<hypercall_runtime_api::UnifiedEngineRequest>,
309    validated: Vec<Result<ValidatedReplace, BulkOrderResult>>,
310    cancel_phase_deadline: std::time::Duration,
311    create_phase_deadline: std::time::Duration,
312    trading_halt: Option<&std::sync::Arc<RwLock<TradingHaltState>>>,
313) -> Vec<BulkOrderResult> {
314    // ---- Phase 1: dispatch all CancelOrder commands ----
315    let mut cancel_rxs: Vec<(usize, mpsc::Receiver<OrderUpdateMessage>)> = Vec::new();
316    let mut cancel_enqueue_failed: HashSet<usize> = HashSet::new();
317    for (i, v) in validated.iter().enumerate() {
318        let Ok(v) = v else { continue };
319        match dispatch_to_engine(order_sender, build_cancel_message(v)).await {
320            Ok(rx) => {
321                cancel_rxs.push((i, rx));
322            }
323            Err(_) => {
324                tracing::error!(index = i, "Failed to enqueue cancel to engine");
325                cancel_enqueue_failed.insert(i);
326            }
327        }
328    }
329
330    // Phase-level deadline: start the clock *after* the last cancel was
331    // enqueued, and share it across all per-leg recv()s.
332    let mut cancel_responses: HashMap<usize, OrderUpdateMessage> = HashMap::new();
333    let cancel_deadline = tokio::time::Instant::now() + cancel_phase_deadline;
334    for (i, mut rx) in cancel_rxs {
335        let now = tokio::time::Instant::now();
336        let remaining = cancel_deadline.saturating_duration_since(now);
337        match timeout(remaining, rx.recv()).await {
338            Ok(Some(resp)) => {
339                cancel_responses.insert(i, resp);
340            }
341            Ok(None) => tracing::error!(index = i, "No cancel response from engine"),
342            Err(_) => tracing::warn!(
343                index = i,
344                "Cancel response not received before phase deadline; \
345                 skipping create because nonce consumption is unknown"
346            ),
347        }
348    }
349
350    // Re-check trading-halt *after* phase 1 completes (cancels have all
351    // been dispatched and their responses awaited). A halt can fire at
352    // any point during that window, so a snapshot taken before phase 1
353    // would be stale by now; reading it here guarantees that any leg
354    // whose symbol became halted during phase 1 is blocked from phase 2
355    // before we send any `CreateOrder`.
356    let mut halted_after_cancel: HashSet<usize> = HashSet::new();
357    if let Some(halt_lock) = trading_halt {
358        let halt_state = halt_lock.read().await;
359        for (i, v) in validated.iter().enumerate() {
360            let Ok(v) = v else { continue };
361            if halt_state
362                .blocked_reason(&v.new_order_info.symbol)
363                .is_some()
364            {
365                halted_after_cancel.insert(i);
366            }
367        }
368    }
369
370    // ---- Phase 2: dispatch CreateOrder ----
371    //
372    // Skip create for a leg when:
373    //   * validation failed (no ValidatedReplace to build the message from), OR
374    //   * cancel dispatch to the engine channel failed outright (we don't
375    //     know the engine state), OR
376    //   * cancel response came back as something other than `Canceled`
377    //     (engine saw the cancel request and rejected it — the old order
378    //     already filled, was already canceled, or the order id was bogus),
379    //     OR
380    //   * cancel response did not arrive before the phase deadline. The
381    //     cancel is the nonce-consuming command. Since `build_create_message`
382    //     clears the nonce for phase 2, creating after an unknown phase-1
383    //     outcome could turn a replay into an engine-internal create.
384    //     OR
385    //   * the symbol became trading-halted between phases.
386    let mut create_rxs: Vec<(usize, mpsc::Receiver<OrderUpdateMessage>)> = Vec::new();
387    for (i, v) in validated.iter().enumerate() {
388        let Ok(v) = v else { continue };
389        if cancel_enqueue_failed.contains(&i) {
390            continue;
391        }
392        if halted_after_cancel.contains(&i) {
393            continue;
394        }
395        let cancel_allows_create = match cancel_responses.get(&i) {
396            Some(resp) => resp.status == OrderUpdateStatus::Canceled,
397            None => false,
398        };
399        if !cancel_allows_create {
400            continue;
401        }
402        match dispatch_to_engine(order_sender, build_create_message(v)).await {
403            Ok(rx) => create_rxs.push((i, rx)),
404            Err(_) => {
405                tracing::error!(index = i, "Failed to enqueue create to engine after cancel");
406            }
407        }
408    }
409
410    let mut create_responses: HashMap<usize, OrderUpdateMessage> = HashMap::new();
411    let create_deadline = tokio::time::Instant::now() + create_phase_deadline;
412    for (i, mut rx) in create_rxs {
413        let now = tokio::time::Instant::now();
414        let remaining = create_deadline.saturating_duration_since(now);
415        match timeout(remaining, rx.recv()).await {
416            Ok(Some(resp)) => {
417                create_responses.insert(i, resp);
418            }
419            Ok(None) => tracing::error!(index = i, "No create response from engine"),
420            Err(_) => tracing::warn!(
421                index = i,
422                "Create response not received before phase deadline"
423            ),
424        }
425    }
426
427    // ---- Phase 3: merge per-leg results in original order ----
428    validated
429        .into_iter()
430        .enumerate()
431        .map(|(i, v)| {
432            merge_leg_result(
433                i,
434                v,
435                cancel_responses.remove(&i),
436                create_responses.remove(&i),
437                cancel_enqueue_failed.contains(&i),
438                halted_after_cancel.contains(&i),
439            )
440        })
441        .collect()
442}
443
444fn merge_leg_result(
445    index: usize,
446    validated: Result<ValidatedReplace, BulkOrderResult>,
447    cancel: Option<OrderUpdateMessage>,
448    create: Option<OrderUpdateMessage>,
449    cancel_enqueue_failed: bool,
450    halted_after_cancel: bool,
451) -> BulkOrderResult {
452    if let Err(validation_failure) = validated {
453        return validation_failure;
454    }
455    if cancel_enqueue_failed {
456        return BulkOrderResult {
457            index,
458            success: false,
459            data: None,
460            error: Some("Failed to enqueue cancel to engine (queue closed or full)".to_string()),
461        };
462    }
463    if let Some(create_resp) = create {
464        let success = matches!(
465            create_resp.status,
466            OrderUpdateStatus::Acked
467                | OrderUpdateStatus::Open
468                | OrderUpdateStatus::Filled
469                | OrderUpdateStatus::PartiallyFilled
470        );
471        return BulkOrderResult {
472            index,
473            success,
474            data: Some(create_resp),
475            error: None,
476        };
477    }
478    if halted_after_cancel {
479        return BulkOrderResult {
480            index,
481            success: false,
482            data: cancel,
483            error: Some(
484                "Trading halted for this symbol after cancel phase; create was skipped".to_string(),
485            ),
486        };
487    }
488    if let Some(cancel_resp) = cancel {
489        return if cancel_resp.status == OrderUpdateStatus::Canceled {
490            BulkOrderResult {
491                index,
492                success: false,
493                data: Some(cancel_resp),
494                error: Some(
495                    "Cancel succeeded but create response did not arrive before the phase deadline"
496                        .to_string(),
497                ),
498            }
499        } else {
500            BulkOrderResult {
501                index,
502                success: false,
503                data: Some(cancel_resp),
504                error: Some("Replace aborted: cancel of existing order failed".to_string()),
505            }
506        };
507    }
508    // Cancel enqueued, no response by phase deadline. Do not dispatch the
509    // engine-internal create because the nonce-consuming cancel outcome is
510    // unknown.
511    BulkOrderResult {
512        index,
513        success: false,
514        data: None,
515        error: Some(
516            "Cancel enqueued but no cancel response was received before the phase deadline; create skipped because nonce consumption is unknown"
517                .to_string(),
518        ),
519    }
520}
521
522/// A replace request that has passed signature / authorization / parsing
523/// validation and is ready to be turned into a `(CancelOrder, CreateOrder)`
524/// pair by the bulk-replace orchestrator. Splitting validation from
525/// dispatch lets `orchestrate_bulk_replace` be unit-tested with a mocked
526/// engine sender.
527#[derive(Debug, Clone)]
528struct ValidatedReplace {
529    wallet: WalletAddress,
530    signer_address: WalletAddress,
531    old_order_id: u64,
532    /// `new_order_info` already carries `mmp_enabled`, `builder_code_address`,
533    /// the size converted to contract units, and `order_id = None` so the
534    /// engine allocates a fresh id at place time.
535    new_order_info: OrderInfo,
536}
537
538/// Run signature verification, agent authorization, symbol parsing,
539/// trading-allowed checks, and price/size parsing + precision checks.
540/// Returns a `ValidatedReplace` on success, or a pre-populated
541/// `BulkOrderResult` carrying the rejection reason on any failure.
542async fn validate_replace_request(
543    state: &AppState,
544    req: ReplaceOrderRequest,
545    index: usize,
546) -> Result<ValidatedReplace, BulkOrderResult> {
547    let authorized = verify_request(
548        state.agent_auth.as_ref(),
549        &req,
550        state.runtime_config.signing_chain_id,
551    )
552    .map_err(|e| BulkOrderResult {
553        index,
554        success: false,
555        data: None,
556        error: Some(e.to_string()),
557    })?;
558
559    if authorized.signer.wallet_address != req.wallet {
560        return Err(BulkOrderResult {
561            index,
562            success: false,
563            data: None,
564            error: Some("Verified wallet does not match request wallet".to_string()),
565        });
566    }
567
568    ParsedOptionSymbol::from_symbol(&req.symbol).map_err(|e| BulkOrderResult {
569        index,
570        success: false,
571        data: None,
572        error: Some(format!("Invalid symbol: {}", e)),
573    })?;
574
575    ensure_order_creation_allowed(state, &req.symbol)
576        .await
577        .map_err(|err| BulkOrderResult {
578            index,
579            success: false,
580            data: None,
581            error: Some(err.message),
582        })?;
583
584    let price = Decimal::from_str(&req.price).map_err(|_| BulkOrderResult {
585        index,
586        success: false,
587        data: None,
588        error: Some(format!("Invalid price format: {}", req.price)),
589    })?;
590    let size = Decimal::from_str(&req.size).map_err(|_| BulkOrderResult {
591        index,
592        success: false,
593        data: None,
594        error: Some(format!("Invalid size format: {}", req.size)),
595    })?;
596    if price <= dec!(0) {
597        return Err(BulkOrderResult {
598            index,
599            success: false,
600            data: None,
601            error: Some("Price must be greater than 0".to_string()),
602        });
603    }
604    if size <= dec!(0) {
605        return Err(BulkOrderResult {
606            index,
607            success: false,
608            data: None,
609            error: Some("Size must be greater than 0".to_string()),
610        });
611    }
612    let price_f64 = price.to_f64().ok_or_else(|| BulkOrderResult {
613        index,
614        success: false,
615        data: None,
616        error: Some("Price value out of range".to_string()),
617    })?;
618    validate_price_precision(price_f64, MAX_PRICE_SIGNIFICANT_FIGURES).map_err(|e| {
619        BulkOrderResult {
620            index,
621            success: false,
622            data: None,
623            error: Some(format!("Price validation failed: {}", e)),
624        }
625    })?;
626
627    let new_order_info = OrderInfo {
628        symbol: req.symbol.clone(),
629        price,
630        size: to_contract_units_decimal(&req.symbol, size),
631        side: req.side,
632        tif: req.tif,
633        client_id: req.client_id,
634        order_id: None,
635        is_perp: false,
636        underlying: None,
637        reduce_only: None,
638        nonce: Some(req.nonce),
639        signature: None,
640        mmp_enabled: req.mmp_enabled,
641        builder_code_address: req.builder_code_address,
642    };
643
644    Ok(ValidatedReplace {
645        wallet: req.wallet,
646        signer_address: authorized.signer.signer_address,
647        old_order_id: req.order_id,
648        new_order_info,
649    })
650}
651
652/// Build an `OrderAction::CancelOrder` message targeting this replace's
653/// pre-existing order. `symbol` is intentionally empty — the engine resolves
654/// it from `order_id` in its per-wallet order index (see how
655/// `cancel_order()` does it in this file).
656fn build_cancel_message(v: &ValidatedReplace) -> OrderActionMessage {
657    let cancel_info = OrderInfo {
658        symbol: String::new(),
659        price: dec!(0),
660        size: dec!(0),
661        side: Side::Buy,
662        tif: TimeInForce::GTC,
663        client_id: None,
664        order_id: Some(v.old_order_id),
665        is_perp: false,
666        underlying: None,
667        reduce_only: None,
668        // Bulk replace splits one signed ReplaceOrder into cancel and
669        // create commands. Consume the signed nonce on the cancel command
670        // so a replay is rejected before the old order can be removed.
671        nonce: v.new_order_info.nonce,
672        signature: None,
673        mmp_enabled: false,
674        builder_code_address: None,
675    };
676    OrderActionMessage {
677        timestamp: get_timestamp_millis(),
678        info: cancel_info,
679        action: OrderAction::CancelOrder,
680        wallet: v.wallet,
681        api_wallet_address: Some(v.signer_address),
682        mmp_triggered: false,
683        request_id: Some(Uuid::now_v7().to_string()),
684    }
685}
686
687fn build_create_message(v: &ValidatedReplace) -> OrderActionMessage {
688    let mut info = v.new_order_info.clone();
689    info.nonce = None;
690    OrderActionMessage {
691        timestamp: get_timestamp_millis(),
692        info,
693        action: OrderAction::CreateOrder,
694        wallet: v.wallet,
695        api_wallet_address: Some(v.signer_address),
696        mmp_triggered: false,
697        request_id: Some(Uuid::now_v7().to_string()),
698    }
699}
700
701/// Send an `OrderActionMessage` through the engine queue. Returns the
702/// response receiver. Returns `Err(())` if the engine queue is closed.
703async fn dispatch_to_engine(
704    order_sender: &mpsc::Sender<hypercall_runtime_api::UnifiedEngineRequest>,
705    msg: OrderActionMessage,
706) -> Result<mpsc::Receiver<OrderUpdateMessage>, ()> {
707    let (response_tx, response_rx) = mpsc::channel(1);
708    let engine_request = hypercall_runtime_api::UnifiedEngineRequest {
709        message: msg,
710        response_tx,
711        enqueued_at: Instant::now(),
712        #[cfg(feature = "otel-tracing")]
713        trace_context: Some(tracing::Span::current().context()),
714    };
715    increment_pending_requests();
716    order_sender.send(engine_request).await.map_err(|_| ())?;
717    Ok(response_rx)
718}
719
720#[cfg(test)]
721mod bulk_replace_tests {
722    //! Unit tests for `orchestrate_bulk_replace` and its helpers.
723    //!
724    //! These tests wire up a mock engine task that reads the shared
725    //! `UnifiedEngineRequest` channel the handler would push into and
726    //! responds per-request with a scripted `OrderUpdateStatus`. The
727    //! orchestrator's job is:
728    //!   1. Send all `CancelOrder` messages before any `CreateOrder`.
729    //!   2. Skip the create phase for any leg whose cancel didn't reach
730    //!      `Canceled`.
731    //!   3. Pass through validation errors unchanged.
732    //!   4. Merge per-leg results in original index order.
733    //!
734    //! Each test below exercises one of these invariants.
735    use super::*;
736    use rust_decimal_macros::dec;
737    use std::time::Duration;
738    use tokio::sync::mpsc;
739
740    fn test_wallet(id: u8) -> WalletAddress {
741        let mut bytes = [0u8; 20];
742        bytes[19] = id;
743        WalletAddress::from(bytes)
744    }
745
746    fn mk_validated(old_order_id: u64) -> ValidatedReplace {
747        mk_validated_with(old_order_id, "BTC-20260101-75000-C".to_string(), false)
748    }
749
750    fn mk_validated_with_symbol(old_order_id: u64, symbol: String) -> ValidatedReplace {
751        mk_validated_with(old_order_id, symbol, false)
752    }
753
754    fn mk_validated_mmp(old_order_id: u64) -> ValidatedReplace {
755        mk_validated_with(old_order_id, "BTC-20260101-75000-C".to_string(), true)
756    }
757
758    fn mk_validated_with(old_order_id: u64, symbol: String, mmp_enabled: bool) -> ValidatedReplace {
759        let order_info = OrderInfo {
760            symbol,
761            price: dec!(100),
762            size: dec!(1),
763            side: Side::Buy,
764            tif: TimeInForce::GTC,
765            client_id: None,
766            order_id: None,
767            is_perp: false,
768            underlying: None,
769            reduce_only: None,
770            nonce: Some(1_700_000_000_000 + old_order_id),
771            signature: None,
772            mmp_enabled,
773            builder_code_address: None,
774        };
775        ValidatedReplace {
776            wallet: test_wallet(1),
777            signer_address: test_wallet(2),
778            old_order_id,
779            new_order_info: order_info,
780        }
781    }
782
783    fn mk_validation_error(index: usize) -> BulkOrderResult {
784        BulkOrderResult {
785            index,
786            success: false,
787            data: None,
788            error: Some("Signature verification failed: test".to_string()),
789        }
790    }
791
792    fn mk_response(
793        order_id: Option<u64>,
794        status: OrderUpdateStatus,
795        wallet: WalletAddress,
796    ) -> OrderUpdateMessage {
797        OrderUpdateMessage {
798            timestamp: 0,
799            info: OrderInfo {
800                symbol: "BTC-20260101-75000-C".to_string(),
801                price: dec!(100),
802                size: dec!(1),
803                side: Side::Buy,
804                tif: TimeInForce::GTC,
805                client_id: None,
806                order_id,
807                is_perp: false,
808                underlying: None,
809                reduce_only: None,
810                nonce: None,
811                signature: None,
812                mmp_enabled: false,
813                builder_code_address: None,
814            },
815            status,
816            reason: None,
817            filled_size: dec!(0),
818            order_id,
819            wallet_address: wallet,
820            mmp_triggered: false,
821            request_id: None,
822        }
823    }
824
825    /// Drives the mock engine: consume requests from the shared receiver,
826    /// record the `OrderAction` of each, reply using `responder`. Returns
827    /// the ordered list of actions the orchestrator enqueued.
828    async fn run_mock_engine<F>(
829        mut engine_rx: mpsc::Receiver<hypercall_runtime_api::UnifiedEngineRequest>,
830        mut responder: F,
831    ) -> Vec<OrderAction>
832    where
833        F: FnMut(&hypercall_runtime_api::UnifiedEngineRequest) -> Option<OrderUpdateMessage>
834            + Send
835            + 'static,
836    {
837        let mut observed: Vec<OrderAction> = Vec::new();
838        while let Some(req) = engine_rx.recv().await {
839            observed.push(req.message.action.clone());
840            if let Some(resp) = responder(&req) {
841                let _ = req.response_tx.send(resp).await;
842            }
843        }
844        observed
845    }
846
847    /// Invariant 1: every CancelOrder arrives at the engine before any
848    /// CreateOrder. This is the whole reason the handler exists.
849    #[tokio::test]
850    async fn cancels_dispatched_before_creates_for_batch_of_three() {
851        let (engine_tx, engine_rx) = mpsc::channel(32);
852        let wallet = test_wallet(1);
853
854        let engine_handle = tokio::spawn({
855            async move {
856                run_mock_engine(engine_rx, move |req| {
857                    let status = match req.message.action {
858                        OrderAction::CancelOrder => OrderUpdateStatus::Canceled,
859                        OrderAction::CreateOrder => OrderUpdateStatus::Open,
860                        _ => OrderUpdateStatus::Rejected,
861                    };
862                    Some(mk_response(req.message.info.order_id, status, wallet))
863                })
864                .await
865            }
866        });
867
868        let validated = vec![
869            Ok(mk_validated(101)),
870            Ok(mk_validated(102)),
871            Ok(mk_validated(103)),
872        ];
873        let results = orchestrate_bulk_replace(
874            &engine_tx,
875            validated,
876            Duration::from_secs(2),
877            Duration::from_secs(2),
878            None,
879        )
880        .await;
881        drop(engine_tx);
882        let observed = engine_handle.await.expect("engine task");
883
884        assert_eq!(results.len(), 3);
885        assert!(results.iter().all(|r| r.success), "all 3 replaces succeed");
886        assert_eq!(observed.len(), 6, "3 cancels + 3 creates");
887
888        let cancel_idxs: Vec<usize> = observed
889            .iter()
890            .enumerate()
891            .filter(|(_, a)| **a == OrderAction::CancelOrder)
892            .map(|(i, _)| i)
893            .collect();
894        let create_idxs: Vec<usize> = observed
895            .iter()
896            .enumerate()
897            .filter(|(_, a)| **a == OrderAction::CreateOrder)
898            .map(|(i, _)| i)
899            .collect();
900        assert_eq!(cancel_idxs, vec![0, 1, 2], "cancels are first three");
901        assert_eq!(create_idxs, vec![3, 4, 5], "creates come after all cancels");
902    }
903
904    /// Invariant 2: when a cancel comes back as anything other than
905    /// Canceled (e.g. the order was already filled), the create phase
906    /// for that leg is skipped. The leg returns a cancel-only result
907    /// with an informative error string.
908    #[tokio::test]
909    async fn cancel_failure_skips_create_for_that_leg() {
910        let (engine_tx, engine_rx) = mpsc::channel(32);
911        let wallet = test_wallet(1);
912
913        let engine_handle = tokio::spawn({
914            async move {
915                run_mock_engine(engine_rx, move |req| {
916                    let oid = req.message.info.order_id;
917                    let status = match (&req.message.action, oid) {
918                        // Leg with old_order_id = 201 is "already filled".
919                        (OrderAction::CancelOrder, Some(201)) => OrderUpdateStatus::Rejected,
920                        (OrderAction::CancelOrder, _) => OrderUpdateStatus::Canceled,
921                        (OrderAction::CreateOrder, _) => OrderUpdateStatus::Open,
922                        _ => OrderUpdateStatus::Rejected,
923                    };
924                    Some(mk_response(oid, status, wallet))
925                })
926                .await
927            }
928        });
929
930        let validated = vec![
931            Ok(mk_validated(200)), // will succeed
932            Ok(mk_validated(201)), // cancel fails
933            Ok(mk_validated(202)), // will succeed
934        ];
935        let results = orchestrate_bulk_replace(
936            &engine_tx,
937            validated,
938            Duration::from_secs(2),
939            Duration::from_secs(2),
940            None,
941        )
942        .await;
943        drop(engine_tx);
944        let observed = engine_handle.await.expect("engine task");
945
946        // 3 cancels + 2 creates (skipped 201's create).
947        assert_eq!(observed.len(), 5);
948        assert_eq!(
949            observed
950                .iter()
951                .filter(|a| **a == OrderAction::CancelOrder)
952                .count(),
953            3
954        );
955        assert_eq!(
956            observed
957                .iter()
958                .filter(|a| **a == OrderAction::CreateOrder)
959                .count(),
960            2
961        );
962
963        assert!(results[0].success);
964        assert!(!results[1].success);
965        assert!(results[1]
966            .error
967            .as_deref()
968            .unwrap_or("")
969            .contains("cancel of existing order failed"));
970        assert!(results[2].success);
971    }
972
973    /// Invariant 3: validation errors pass through unchanged to the
974    /// correct output index, and a validation-failed leg is never sent
975    /// to the engine.
976    #[tokio::test]
977    async fn validation_failures_skip_engine_dispatch() {
978        let (engine_tx, engine_rx) = mpsc::channel(32);
979        let wallet = test_wallet(1);
980
981        let engine_handle = tokio::spawn({
982            async move {
983                run_mock_engine(engine_rx, move |req| {
984                    let status = match req.message.action {
985                        OrderAction::CancelOrder => OrderUpdateStatus::Canceled,
986                        OrderAction::CreateOrder => OrderUpdateStatus::Open,
987                        _ => OrderUpdateStatus::Rejected,
988                    };
989                    Some(mk_response(req.message.info.order_id, status, wallet))
990                })
991                .await
992            }
993        });
994
995        let validated = vec![
996            Ok(mk_validated(301)),
997            Err(mk_validation_error(1)),
998            Ok(mk_validated(303)),
999        ];
1000        let results = orchestrate_bulk_replace(
1001            &engine_tx,
1002            validated,
1003            Duration::from_secs(2),
1004            Duration::from_secs(2),
1005            None,
1006        )
1007        .await;
1008        drop(engine_tx);
1009        let observed = engine_handle.await.expect("engine task");
1010
1011        // Validation failure means no engine traffic for that leg → 2 cancels + 2 creates.
1012        assert_eq!(observed.len(), 4);
1013        assert!(results[0].success);
1014        assert!(!results[1].success);
1015        assert_eq!(results[1].index, 1);
1016        assert!(results[1]
1017            .error
1018            .as_deref()
1019            .unwrap_or("")
1020            .contains("Signature verification failed"));
1021        assert!(results[2].success);
1022    }
1023
1024    /// Invariant 4: when a cancel *response* times out (no reply by the
1025    /// phase deadline), phase 2 skips the create for that leg. The cancel
1026    /// carries the signed nonce and the create is intentionally
1027    /// engine-internal, so a missing cancel response leaves nonce
1028    /// consumption unknown and must not permit a replay to place a new
1029    /// order.
1030    #[tokio::test]
1031    async fn cancel_response_timeout_skips_create() {
1032        let (engine_tx, engine_rx) = mpsc::channel(32);
1033        let wallet = test_wallet(1);
1034
1035        // Mock engine deliberately silences the cancel response for oid=401
1036        // so the orchestrator cannot know whether the signed nonce was consumed.
1037        let engine_handle = tokio::spawn({
1038            async move {
1039                run_mock_engine(engine_rx, move |req| {
1040                    let oid = req.message.info.order_id;
1041                    if oid == Some(401) && req.message.action == OrderAction::CancelOrder {
1042                        return None; // silence this cancel response
1043                    }
1044                    let status = match req.message.action {
1045                        OrderAction::CancelOrder => OrderUpdateStatus::Canceled,
1046                        OrderAction::CreateOrder => OrderUpdateStatus::Open,
1047                        _ => OrderUpdateStatus::Rejected,
1048                    };
1049                    Some(mk_response(oid, status, wallet))
1050                })
1051                .await
1052            }
1053        });
1054
1055        let validated = vec![
1056            Ok(mk_validated(400)),
1057            Ok(mk_validated(401)), // cancel response silenced
1058            Ok(mk_validated(402)),
1059        ];
1060        let short_deadline = Duration::from_millis(200);
1061        let start = std::time::Instant::now();
1062        let results = orchestrate_bulk_replace(
1063            &engine_tx,
1064            validated,
1065            short_deadline,
1066            Duration::from_secs(2),
1067            None,
1068        )
1069        .await;
1070        let elapsed = start.elapsed();
1071        drop(engine_tx);
1072        let observed = engine_handle.await.expect("engine task");
1073
1074        // Phase deadline is shared across legs, so one hung cancel can't
1075        // stretch total time beyond ~deadline + create phase.
1076        assert!(elapsed < Duration::from_secs(3), "elapsed {:?}", elapsed);
1077
1078        // All 3 cancels and only 2 creates were dispatched. The timed-out
1079        // cancel leg is skipped in phase 2.
1080        assert_eq!(observed.len(), 5);
1081        assert_eq!(
1082            observed
1083                .iter()
1084                .filter(|a| **a == OrderAction::CreateOrder)
1085                .count(),
1086            2,
1087            "create for the hung-cancel leg must be skipped"
1088        );
1089
1090        assert!(results[0].success);
1091        assert!(
1092            !results[1].success,
1093            "leg with silenced cancel response must not create"
1094        );
1095        assert!(
1096            results[1]
1097                .error
1098                .as_deref()
1099                .unwrap_or("")
1100                .contains("nonce consumption is unknown"),
1101            "unexpected timeout error: {:?}",
1102            results[1].error
1103        );
1104        assert!(results[2].success);
1105    }
1106
1107    /// Invariant 5: if the cancel succeeds but the create never responds,
1108    /// the leg ends with a "cancel succeeded but create response did not
1109    /// arrive" error.  The old order is gone engine-side; caller should
1110    /// know to re-quote on the next cycle.
1111    #[tokio::test]
1112    async fn create_response_timeout_returns_cancel_only_result() {
1113        let (engine_tx, engine_rx) = mpsc::channel(32);
1114        let wallet = test_wallet(1);
1115
1116        let engine_handle = tokio::spawn({
1117            async move {
1118                run_mock_engine(engine_rx, move |req| {
1119                    let oid = req.message.info.order_id;
1120                    match req.message.action {
1121                        OrderAction::CancelOrder => {
1122                            Some(mk_response(oid, OrderUpdateStatus::Canceled, wallet))
1123                        }
1124                        OrderAction::CreateOrder => None, // hang all creates
1125                        _ => None,
1126                    }
1127                })
1128                .await
1129            }
1130        });
1131
1132        let validated = vec![Ok(mk_validated(501))];
1133        let short_deadline = Duration::from_millis(200);
1134        let results = orchestrate_bulk_replace(
1135            &engine_tx,
1136            validated,
1137            Duration::from_secs(2),
1138            short_deadline,
1139            None,
1140        )
1141        .await;
1142        drop(engine_tx);
1143        let _ = engine_handle.await;
1144
1145        assert_eq!(results.len(), 1);
1146        assert!(!results[0].success);
1147        assert_eq!(
1148            results[0].data.as_ref().map(|r| r.status),
1149            Some(OrderUpdateStatus::Canceled)
1150        );
1151        assert!(
1152            results[0]
1153                .error
1154                .as_deref()
1155                .unwrap_or("")
1156                .contains("create response did not arrive"),
1157            "unexpected error: {:?}",
1158            results[0].error
1159        );
1160    }
1161
1162    /// Invariant 7: a leg whose symbol becomes trading-halted *between
1163    /// phase 1 and phase 2* does NOT get a create dispatched, and returns
1164    /// a cancel-only result with an explicit halt reason. `orchestrate_bulk_replace`
1165    /// itself reads the halt state after phase 1 completes (see
1166    /// "Recompute halt gate after cancel phase completes" in the codex
1167    /// review); this test verifies that path by flipping the market halt
1168    /// on leg 1's symbol *while phase 1 is in flight* and asserting the
1169    /// create phase correctly skips it.
1170    #[tokio::test]
1171    async fn halt_fired_during_phase_one_blocks_only_matching_leg_in_phase_two() {
1172        let (engine_tx, engine_rx) = mpsc::channel(32);
1173        let wallet = test_wallet(1);
1174
1175        // Build a halt state we'll mutate from inside the mock engine once
1176        // phase-1 cancels are observed — simulating a halt that fires
1177        // while cancels are in flight.
1178        let halt_state = std::sync::Arc::new(RwLock::new(TradingHaltState::new()));
1179        let halt_state_engine = halt_state.clone();
1180        let halted_symbol = "BTC-20260101-80000-C".to_string();
1181        let halted_symbol_engine = halted_symbol.clone();
1182
1183        let engine_handle = tokio::spawn({
1184            async move {
1185                let mut cancels_seen = 0usize;
1186                run_mock_engine(engine_rx, move |req| {
1187                    let status = match req.message.action {
1188                        OrderAction::CancelOrder => OrderUpdateStatus::Canceled,
1189                        OrderAction::CreateOrder => OrderUpdateStatus::Open,
1190                        _ => OrderUpdateStatus::Rejected,
1191                    };
1192                    // As soon as we've seen every cancel respond (phase 1 is
1193                    // effectively done), flip the halt. The orchestrator's
1194                    // post-phase-1 snapshot must see this.
1195                    if matches!(req.message.action, OrderAction::CancelOrder) {
1196                        cancels_seen += 1;
1197                        if cancels_seen == 3 {
1198                            let halt_state = halt_state_engine.clone();
1199                            let sym = halted_symbol_engine.clone();
1200                            tokio::spawn(async move {
1201                                halt_state.write().await.set_market_halt(
1202                                    &sym,
1203                                    true,
1204                                    "test halt".to_string(),
1205                                    "test".to_string(),
1206                                );
1207                            });
1208                        }
1209                    }
1210                    Some(mk_response(req.message.info.order_id, status, wallet))
1211                })
1212                .await
1213            }
1214        });
1215
1216        let validated = vec![
1217            Ok(mk_validated(701)),
1218            Ok(mk_validated_with_symbol(702, halted_symbol.clone())),
1219            Ok(mk_validated(703)),
1220        ];
1221
1222        let results = orchestrate_bulk_replace(
1223            &engine_tx,
1224            validated,
1225            Duration::from_secs(2),
1226            Duration::from_secs(2),
1227            Some(&halt_state),
1228        )
1229        .await;
1230        drop(engine_tx);
1231        let observed = engine_handle.await.expect("engine task");
1232
1233        // 3 cancels + 2 creates (leg 1 skipped in phase 2 because its
1234        // symbol got halted during phase 1).
1235        assert_eq!(
1236            observed
1237                .iter()
1238                .filter(|a| **a == OrderAction::CancelOrder)
1239                .count(),
1240            3
1241        );
1242        assert_eq!(
1243            observed
1244                .iter()
1245                .filter(|a| **a == OrderAction::CreateOrder)
1246                .count(),
1247            2
1248        );
1249
1250        assert!(results[0].success);
1251        assert!(!results[1].success);
1252        assert!(
1253            results[1]
1254                .error
1255                .as_deref()
1256                .unwrap_or("")
1257                .contains("Trading halted"),
1258            "unexpected error: {:?}",
1259            results[1].error
1260        );
1261        assert!(results[2].success);
1262    }
1263
1264    /// Invariant 8: if the engine request sender is closed (channel queue
1265    /// dropped — a liveness bug or shutdown race), the leg returns a
1266    /// distinct "Failed to enqueue cancel" error rather than a generic
1267    /// no-response-from-engine, so operators can tell a dead-queue state
1268    /// apart from an engine-timeout state.
1269    #[tokio::test]
1270    async fn cancel_enqueue_failure_reports_explicit_error() {
1271        let (engine_tx, engine_rx) = mpsc::channel(32);
1272        // Close the receiver side immediately so every `send(...)` returns Err.
1273        drop(engine_rx);
1274
1275        let validated = vec![Ok(mk_validated(801))];
1276        let results = orchestrate_bulk_replace(
1277            &engine_tx,
1278            validated,
1279            Duration::from_millis(200),
1280            Duration::from_millis(200),
1281            None,
1282        )
1283        .await;
1284
1285        assert_eq!(results.len(), 1);
1286        assert!(!results[0].success);
1287        assert!(
1288            results[0]
1289                .error
1290                .as_deref()
1291                .unwrap_or("")
1292                .contains("Failed to enqueue cancel"),
1293            "unexpected error: {:?}",
1294            results[0].error
1295        );
1296    }
1297
1298    /// Invariant 9: total bulk latency is bounded by the sum of the two
1299    /// phase deadlines, not N times a per-leg timeout. A 5-leg bulk with
1300    /// every cancel response silenced should finish in roughly the shared
1301    /// cancel deadline, not five times that.
1302    #[tokio::test]
1303    async fn phase_deadline_bounds_total_bulk_latency() {
1304        let (engine_tx, engine_rx) = mpsc::channel(32);
1305        let wallet = test_wallet(1);
1306
1307        // Engine silences every cancel response. No creates should be sent
1308        // because the nonce-consuming cancel outcome is unknown.
1309        let engine_handle = tokio::spawn({
1310            async move {
1311                run_mock_engine(engine_rx, move |req| match req.message.action {
1312                    OrderAction::CancelOrder => None,
1313                    OrderAction::CreateOrder => Some(mk_response(
1314                        req.message.info.order_id,
1315                        OrderUpdateStatus::Open,
1316                        wallet,
1317                    )),
1318                    _ => None,
1319                })
1320                .await
1321            }
1322        });
1323
1324        let validated = vec![
1325            Ok(mk_validated(901)),
1326            Ok(mk_validated(902)),
1327            Ok(mk_validated(903)),
1328            Ok(mk_validated(904)),
1329            Ok(mk_validated(905)),
1330        ];
1331        let cancel_deadline = Duration::from_millis(300);
1332        let create_deadline = Duration::from_millis(300);
1333        let start = std::time::Instant::now();
1334        let results = orchestrate_bulk_replace(
1335            &engine_tx,
1336            validated,
1337            cancel_deadline,
1338            create_deadline,
1339            None,
1340        )
1341        .await;
1342        let elapsed = start.elapsed();
1343        drop(engine_tx);
1344        let _ = engine_handle.await;
1345
1346        // Phase deadlines are shared, so total time is O(cancel_deadline),
1347        // not O(N * per_leg_timeout). Give some slack
1348        // for scheduling overhead but strictly bound below N × deadline.
1349        assert!(
1350            elapsed < cancel_deadline + create_deadline + Duration::from_millis(500),
1351            "elapsed {:?} exceeded phase-deadline budget",
1352            elapsed
1353        );
1354
1355        assert_eq!(results.len(), 5);
1356        assert!(results.iter().all(|r| !r.success));
1357        assert!(results.iter().all(|r| r
1358            .error
1359            .as_deref()
1360            .unwrap_or("")
1361            .contains("nonce consumption is unknown")));
1362    }
1363
1364    /// Invariant 6: result indices are preserved — result[i] corresponds
1365    /// to input[i], regardless of success/failure mix.
1366    #[tokio::test]
1367    async fn result_order_matches_input_order() {
1368        let (engine_tx, engine_rx) = mpsc::channel(32);
1369        let wallet = test_wallet(1);
1370
1371        let engine_handle = tokio::spawn({
1372            async move {
1373                run_mock_engine(engine_rx, move |req| {
1374                    let status = match req.message.action {
1375                        OrderAction::CancelOrder => OrderUpdateStatus::Canceled,
1376                        OrderAction::CreateOrder => OrderUpdateStatus::Open,
1377                        _ => OrderUpdateStatus::Rejected,
1378                    };
1379                    Some(mk_response(req.message.info.order_id, status, wallet))
1380                })
1381                .await
1382            }
1383        });
1384
1385        let validated = vec![
1386            Ok(mk_validated(600)),
1387            Err(mk_validation_error(1)),
1388            Ok(mk_validated(602)),
1389            Err(mk_validation_error(3)),
1390            Ok(mk_validated(604)),
1391        ];
1392        let results = orchestrate_bulk_replace(
1393            &engine_tx,
1394            validated,
1395            Duration::from_secs(2),
1396            Duration::from_secs(2),
1397            None,
1398        )
1399        .await;
1400        drop(engine_tx);
1401        let _ = engine_handle.await;
1402
1403        assert_eq!(results.len(), 5);
1404        for (i, r) in results.iter().enumerate() {
1405            assert_eq!(r.index, i, "result {} has wrong index", i);
1406        }
1407        assert!(results[0].success);
1408        assert!(!results[1].success);
1409        assert!(results[2].success);
1410        assert!(!results[3].success);
1411        assert!(results[4].success);
1412    }
1413
1414    /// `build_cancel_message` encodes enough information for the engine
1415    /// to cancel by order_id without needing the symbol up front.
1416    #[test]
1417    fn build_cancel_message_sets_action_and_order_id() {
1418        let v = mk_validated(999);
1419        let msg = build_cancel_message(&v);
1420        assert_eq!(msg.action, OrderAction::CancelOrder);
1421        assert_eq!(msg.info.order_id, Some(999));
1422        assert_eq!(msg.info.nonce, v.new_order_info.nonce);
1423        // Symbol is intentionally empty — engine resolves it from the per-
1424        // wallet order_id index (matches the singular cancel_order() path).
1425        assert_eq!(msg.info.symbol, "");
1426        assert_eq!(msg.wallet, v.wallet);
1427    }
1428
1429    /// `build_create_message` forwards the new order's price/size/side and
1430    /// clears the order_id so the engine allocates a fresh id.
1431    #[test]
1432    fn build_create_message_uses_new_order_info() {
1433        let v = mk_validated(1234);
1434        let msg = build_create_message(&v);
1435        assert_eq!(msg.action, OrderAction::CreateOrder);
1436        assert_eq!(msg.info.order_id, None);
1437        assert_eq!(msg.info.price, v.new_order_info.price);
1438        assert_eq!(msg.info.size, v.new_order_info.size);
1439        assert_eq!(msg.info.nonce, None);
1440        assert_eq!(msg.wallet, v.wallet);
1441    }
1442
1443    /// `merge_leg_result` branch coverage — each terminal outcome path
1444    /// should produce the right success flag, result data, and error
1445    /// string. Covers the new flags `cancel_enqueue_failed` and
1446    /// `halted_after_cancel` in addition to the original four response
1447    /// combinations.
1448    #[test]
1449    fn merge_leg_result_branches() {
1450        let wallet = test_wallet(1);
1451
1452        // (a) validation error → returned verbatim, flags ignored.
1453        let ve = mk_validation_error(7);
1454        let r = merge_leg_result(7, Err(ve.clone()), None, None, false, false);
1455        assert_eq!(r.index, 7);
1456        assert!(!r.success);
1457        assert_eq!(r.error, ve.error);
1458
1459        // (b) create present + Acked → success, cancel data discarded in favour of create.
1460        let cancel = mk_response(Some(1), OrderUpdateStatus::Canceled, wallet);
1461        let create = mk_response(Some(10), OrderUpdateStatus::Open, wallet);
1462        let r = merge_leg_result(
1463            0,
1464            Ok(mk_validated(1)),
1465            Some(cancel),
1466            Some(create),
1467            false,
1468            false,
1469        );
1470        assert!(r.success);
1471        assert_eq!(r.data.unwrap().status, OrderUpdateStatus::Open);
1472
1473        // (c) cancel Canceled, no create response → "create response did not arrive".
1474        let cancel = mk_response(Some(1), OrderUpdateStatus::Canceled, wallet);
1475        let r = merge_leg_result(0, Ok(mk_validated(1)), Some(cancel), None, false, false);
1476        assert!(!r.success);
1477        assert!(r
1478            .error
1479            .as_deref()
1480            .unwrap()
1481            .contains("create response did not arrive"));
1482
1483        // (d) cancel Rejected → cancel-failed path, no create attempted.
1484        let cancel = mk_response(Some(1), OrderUpdateStatus::Rejected, wallet);
1485        let r = merge_leg_result(0, Ok(mk_validated(1)), Some(cancel), None, false, false);
1486        assert!(!r.success);
1487        assert!(r
1488            .error
1489            .as_deref()
1490            .unwrap()
1491            .contains("cancel of existing order failed"));
1492
1493        // (e) halted_after_cancel=true → explicit halt error, never success.
1494        let cancel = mk_response(Some(1), OrderUpdateStatus::Canceled, wallet);
1495        let r = merge_leg_result(0, Ok(mk_validated(1)), Some(cancel), None, false, true);
1496        assert!(!r.success);
1497        assert!(r.error.as_deref().unwrap().contains("Trading halted"));
1498
1499        // (f) cancel enqueue failed → explicit "Failed to enqueue" error.
1500        let r = merge_leg_result(0, Ok(mk_validated(1)), None, None, true, false);
1501        assert!(!r.success);
1502        assert!(r
1503            .error
1504            .as_deref()
1505            .unwrap()
1506            .contains("Failed to enqueue cancel"));
1507
1508        // (g) no signals at all (cancel enqueued but no response AND no create
1509        //     response) — terminal, informative.
1510        let r = merge_leg_result(0, Ok(mk_validated(1)), None, None, false, false);
1511        assert!(!r.success);
1512        assert!(r
1513            .error
1514            .as_deref()
1515            .unwrap()
1516            .contains("nonce consumption is unknown"));
1517    }
1518
1519    /// MMP propagation invariant. The real MMP logic lives in the engine
1520    /// (`src/rsm/unified_engine/matching.rs` → `mmp_cache.process_fill`);
1521    /// the bulk-replace handler only has to guarantee two things:
1522    ///
1523    ///   1. `mmp_enabled=true` on the `ReplaceOrderRequest` arrives on the
1524    ///      `CreateOrder` message that the engine ultimately sees. A flag
1525    ///      that silently gets flipped to `false` by `build_create_message`
1526    ///      would defeat the market maker's MMP config — the engine can't
1527    ///      retroactively decide a fill was MMP-eligible.
1528    ///
1529    ///   2. Phase-1 `CancelOrder` messages never carry `mmp_enabled=true`.
1530    ///      Cancels don't fill and shouldn't count toward any MMP
1531    ///      accounting; more importantly, setting it would be a lie about
1532    ///      the order being replaced.
1533    ///
1534    ///   3. When the engine sends back an `OrderUpdateMessage` carrying
1535    ///      `mmp_triggered=true` on a create response (MMP fired during
1536    ///      the fill processing of that leg), the merged bulk result
1537    ///      must surface that status field faithfully so clients can
1538    ///      react (re-configure MMP, back off quoting, etc.).
1539    ///
1540    /// This test drives all three via the mock engine: one leg has
1541    /// `mmp_enabled=true`, one has it false, and the engine tags the
1542    /// mmp-enabled leg's create response with `mmp_triggered=true`.
1543    #[tokio::test]
1544    async fn mmp_flag_propagates_to_create_and_not_to_cancel() {
1545        let (engine_tx, engine_rx) = mpsc::channel(32);
1546        let wallet = test_wallet(1);
1547
1548        let engine_handle = tokio::spawn({
1549            async move {
1550                // Collect the full messages this time (not just the actions)
1551                // so we can inspect `info.mmp_enabled` per leg per phase.
1552                let mut rx: mpsc::Receiver<hypercall_runtime_api::UnifiedEngineRequest> = engine_rx;
1553                let mut observed: Vec<(OrderAction, Option<u64>, bool)> = Vec::new();
1554                while let Some(req) = rx.recv().await {
1555                    let action = req.message.action.clone();
1556                    let oid = req.message.info.order_id;
1557                    let mmp = req.message.info.mmp_enabled;
1558                    observed.push((action.clone(), oid, mmp));
1559                    let status = match action {
1560                        OrderAction::CancelOrder => OrderUpdateStatus::Canceled,
1561                        OrderAction::CreateOrder => OrderUpdateStatus::Filled,
1562                        _ => OrderUpdateStatus::Rejected,
1563                    };
1564                    // On the mmp-enabled leg's create (old_order_id=502 →
1565                    // new OrderInfo.order_id=None on create, so match via
1566                    // mmp flag), tag mmp_triggered=true on the response.
1567                    let mut resp = mk_response(oid, status, wallet);
1568                    if matches!(action, OrderAction::CreateOrder) && mmp {
1569                        resp.mmp_triggered = true;
1570                    }
1571                    let _ = req.response_tx.send(resp).await;
1572                }
1573                observed
1574            }
1575        });
1576
1577        let validated = vec![
1578            Ok(mk_validated(501)),     // mmp_enabled = false
1579            Ok(mk_validated_mmp(502)), // mmp_enabled = true
1580        ];
1581        let results = orchestrate_bulk_replace(
1582            &engine_tx,
1583            validated,
1584            Duration::from_secs(2),
1585            Duration::from_secs(2),
1586            None,
1587        )
1588        .await;
1589        drop(engine_tx);
1590        let observed = engine_handle.await.expect("engine task");
1591
1592        assert_eq!(observed.len(), 4, "2 cancels + 2 creates");
1593
1594        // Invariant 1: create's mmp_enabled mirrors the request.
1595        let creates: Vec<_> = observed
1596            .iter()
1597            .filter(|(a, _, _)| *a == OrderAction::CreateOrder)
1598            .collect();
1599        assert_eq!(creates.len(), 2);
1600        // The only leg with mmp_enabled=true on its new order is 502.
1601        assert!(
1602            creates.iter().any(|(_, _, mmp)| !*mmp),
1603            "leg 501 create must have mmp_enabled=false"
1604        );
1605        assert!(
1606            creates.iter().any(|(_, _, mmp)| *mmp),
1607            "leg 502 create must have mmp_enabled=true"
1608        );
1609
1610        // Invariant 2: every cancel carries mmp_enabled=false, regardless
1611        // of the leg's new-order mmp_enabled setting. Cancels don't fill;
1612        // MMP accounting must not be triggered by a cancel.
1613        let cancels: Vec<_> = observed
1614            .iter()
1615            .filter(|(a, _, _)| *a == OrderAction::CancelOrder)
1616            .collect();
1617        assert_eq!(cancels.len(), 2);
1618        assert!(
1619            cancels.iter().all(|(_, _, mmp)| !*mmp),
1620            "cancels must never carry mmp_enabled=true"
1621        );
1622
1623        // Invariant 3: the merged bulk result surfaces `mmp_triggered=true`
1624        // on the leg whose fill breached MMP, so clients can react.
1625        assert_eq!(results.len(), 2);
1626        assert!(results[0].success);
1627        let leg0 = results[0].data.as_ref().expect("leg 0 has response data");
1628        assert!(
1629            !leg0.mmp_triggered,
1630            "leg 0 (mmp_enabled=false) must not report mmp_triggered"
1631        );
1632        assert!(results[1].success);
1633        let leg1 = results[1].data.as_ref().expect("leg 1 has response data");
1634        assert!(
1635            leg1.mmp_triggered,
1636            "leg 1 (mmp_enabled=true) must surface engine's mmp_triggered=true"
1637        );
1638    }
1639}