Skip to main content

hypercall/runtime/
standby.rs

1use std::sync::Arc;
2use tokio::sync::{mpsc, oneshot, Mutex};
3use tracing::info;
4
5use crate::rsm::unified_engine::UnifiedEngineRequest;
6
7/// Controls the engine standby → draining → active lifecycle.
8///
9/// In standby mode, orders are queued in memory. On `promote()`, the current
10/// queue is handed off for draining and the controller enters a draining phase;
11/// orders arriving during that phase continue to queue until `finish_drain()`
12/// transitions the controller to active mode, after which `try_enqueue()`
13/// returns `NotStandby` and callers forward directly to the engine.
14#[derive(Clone)]
15pub struct StandbyController {
16    state: Arc<Mutex<StandbyState>>,
17}
18
19enum StandbyState {
20    Standby {
21        promote_tx: Option<oneshot::Sender<()>>,
22        queue: Vec<UnifiedEngineRequest>,
23        capacity: usize,
24    },
25    Draining {
26        queue: Vec<UnifiedEngineRequest>,
27        capacity: usize,
28    },
29    Active,
30}
31
32/// Result of trying to enqueue an order during standby.
33pub enum EnqueueResult {
34    /// Order was queued (standby mode). Will be drained on promote.
35    Queued,
36    /// Not in standby mode — caller should send directly to engine.
37    NotStandby(UnifiedEngineRequest),
38    /// Queue is full. Caller should return 503.
39    QueueFull(UnifiedEngineRequest),
40}
41
42/// Result of a promote attempt.
43#[derive(Debug, PartialEq)]
44pub enum PromoteResult {
45    /// Successfully promoted. Returns number of queued orders to drain.
46    Promoted { queued_orders: usize },
47    /// Already active — promote was already called.
48    AlreadyActive,
49}
50
51impl StandbyController {
52    /// Create a new controller in standby mode.
53    /// Returns the controller and a future that resolves when promote is called.
54    pub fn new_standby(capacity: usize) -> (Self, oneshot::Receiver<()>) {
55        let (promote_tx, promote_rx) = oneshot::channel();
56        let controller = Self {
57            state: Arc::new(Mutex::new(StandbyState::Standby {
58                promote_tx: Some(promote_tx),
59                queue: Vec::with_capacity(capacity.min(1024)),
60                capacity,
61            })),
62        };
63        (controller, promote_rx)
64    }
65
66    /// Create a controller that starts in active mode (no standby).
67    pub fn new_active() -> Self {
68        Self {
69            state: Arc::new(Mutex::new(StandbyState::Active)),
70        }
71    }
72
73    /// Returns true if the controller is currently in standby or draining mode
74    /// (i.e., not yet fully active — orders are still being queued).
75    pub async fn is_standby(&self) -> bool {
76        !matches!(*self.state.lock().await, StandbyState::Active)
77    }
78
79    /// Returns true if promote() has been called (in Draining or Active state).
80    pub async fn is_promoted(&self) -> bool {
81        !matches!(*self.state.lock().await, StandbyState::Standby { .. })
82    }
83
84    /// Try to enqueue an order. Returns the appropriate result for each state:
85    /// - `Queued`: order was accepted into the standby queue
86    /// - `NotStandby(request)`: not in standby mode, caller should forward to engine directly
87    /// - `QueueFull(request)`: queue at capacity, caller should return 503
88    pub async fn try_enqueue(&self, request: UnifiedEngineRequest) -> EnqueueResult {
89        let mut state = self.state.lock().await;
90        match &mut *state {
91            StandbyState::Active => EnqueueResult::NotStandby(request),
92            StandbyState::Standby {
93                queue, capacity, ..
94            }
95            | StandbyState::Draining { queue, capacity } => {
96                if queue.len() >= *capacity {
97                    EnqueueResult::QueueFull(request)
98                } else {
99                    queue.push(request);
100                    EnqueueResult::Queued
101                }
102            }
103        }
104    }
105
106    /// Promote from standby to draining.
107    /// Sends the promote signal so any waiting tasks (engine, batcher, etc.) unblock.
108    ///
109    /// Orders remain in the controller — call `take_drain_batch()` to retrieve
110    /// them for draining. New orders arriving during drain are still queued
111    /// (not forwarded directly) to preserve ordering.
112    pub async fn promote(&self) -> PromoteResult {
113        let mut state = self.state.lock().await;
114        match &*state {
115            StandbyState::Standby { .. } => {
116                let capacity = match &*state {
117                    StandbyState::Standby { capacity, .. } => *capacity,
118                    _ => unreachable!(),
119                };
120                match std::mem::replace(
121                    &mut *state,
122                    StandbyState::Draining {
123                        queue: Vec::new(),
124                        capacity,
125                    },
126                ) {
127                    StandbyState::Standby {
128                        promote_tx,
129                        queue,
130                        capacity,
131                        ..
132                    } => {
133                        let count = queue.len();
134                        // Move existing orders into the new Draining queue
135                        *state = StandbyState::Draining { queue, capacity };
136                        if let Some(tx) = promote_tx {
137                            let _ = tx.send(());
138                        }
139                        info!(queued_orders = count, "Promoted from standby to draining");
140                        PromoteResult::Promoted {
141                            queued_orders: count,
142                        }
143                    }
144                    _ => unreachable!(),
145                }
146            }
147            StandbyState::Draining { .. } | StandbyState::Active => PromoteResult::AlreadyActive,
148        }
149    }
150
151    /// Take a batch of orders to drain into the engine.
152    /// Returns queued orders and keeps the controller in Draining state.
153    /// When the returned vec is empty, transitions to Active.
154    /// Call this in a loop until it returns an empty vec.
155    pub async fn take_drain_batch(&self) -> Vec<UnifiedEngineRequest> {
156        let mut state = self.state.lock().await;
157        match &mut *state {
158            StandbyState::Draining { queue, .. } => {
159                if queue.is_empty() {
160                    *state = StandbyState::Active;
161                    info!("Drain complete, transitioned to active");
162                    Vec::new()
163                } else {
164                    let orders = std::mem::take(queue);
165                    info!(batch_size = orders.len(), "Taking drain batch");
166                    orders
167                }
168            }
169            _ => Vec::new(),
170        }
171    }
172
173    /// Drain queued orders into the engine channel.
174    /// Returns (sent_count, unsent_orders) — unsent orders are returned on channel failure
175    /// so callers can retry or persist them.
176    pub async fn drain_to(
177        orders: Vec<UnifiedEngineRequest>,
178        engine_tx: &mpsc::Sender<UnifiedEngineRequest>,
179    ) -> (usize, Vec<UnifiedEngineRequest>) {
180        let total = orders.len();
181        let mut sent = 0;
182        let mut iter = orders.into_iter();
183        for order in iter.by_ref() {
184            match engine_tx.send(order).await {
185                Ok(()) => sent += 1,
186                Err(mpsc::error::SendError(failed_order)) => {
187                    let remaining = total - sent - 1;
188                    tracing::error!(
189                        sent = sent,
190                        remaining = remaining + 1,
191                        "Engine channel closed during queue drain"
192                    );
193                    let mut unsent = vec![failed_order];
194                    unsent.extend(iter);
195                    return (sent, unsent);
196                }
197            }
198        }
199        (sent, Vec::new())
200    }
201}
202
203#[cfg(test)]
204mod tests {
205    use super::*;
206    use rust_decimal::Decimal;
207    use tokio::sync::mpsc as tokio_mpsc;
208
209    fn make_test_request() -> (
210        UnifiedEngineRequest,
211        mpsc::Receiver<hypercall_types::OrderUpdateMessage>,
212    ) {
213        use hypercall_types::{OrderAction, OrderActionMessage};
214        use hypercall_types::{OrderInfo, Side, TimeInForce, WalletAddress};
215
216        let (response_tx, response_rx) = tokio_mpsc::channel(1);
217        let request = UnifiedEngineRequest {
218            message: OrderActionMessage {
219                timestamp: 0,
220                info: OrderInfo {
221                    symbol: "BTC-20260501-100000-C".to_string(),
222                    price: Decimal::ZERO,
223                    size: Decimal::ONE,
224                    side: Side::Buy,
225                    tif: TimeInForce::GTC,
226                    client_id: None,
227                    order_id: None,
228                    is_perp: false,
229                    underlying: None,
230                    reduce_only: None,
231                    nonce: None,
232                    signature: None,
233                    mmp_enabled: false,
234                    builder_code_address: None,
235                },
236                action: OrderAction::CancelOrder,
237                wallet: WalletAddress::default(),
238                api_wallet_address: None,
239                mmp_triggered: false,
240                request_id: None,
241            },
242            response_tx,
243            enqueued_at: std::time::Instant::now(),
244            #[cfg(feature = "otel-tracing")]
245            trace_context: None,
246        };
247        (request, response_rx)
248    }
249
250    #[tokio::test]
251    async fn active_mode_does_not_enqueue() {
252        let controller = StandbyController::new_active();
253        let (req, _rx) = make_test_request();
254
255        let result = controller.try_enqueue(req).await;
256        assert!(
257            matches!(result, EnqueueResult::NotStandby(_)),
258            "Active controller should return NotStandby"
259        );
260        assert!(!controller.is_standby().await);
261    }
262
263    #[tokio::test]
264    async fn standby_mode_queues_orders() {
265        let (controller, _promote_rx) = StandbyController::new_standby(100);
266        assert!(controller.is_standby().await);
267
268        let (req, _rx) = make_test_request();
269        let result = controller.try_enqueue(req).await;
270        assert!(matches!(result, EnqueueResult::Queued));
271    }
272
273    #[tokio::test]
274    async fn standby_rejects_when_full() {
275        let (controller, _promote_rx) = StandbyController::new_standby(2);
276
277        let (req1, _rx1) = make_test_request();
278        let (req2, _rx2) = make_test_request();
279        let (req3, _rx3) = make_test_request();
280
281        assert!(matches!(
282            controller.try_enqueue(req1).await,
283            EnqueueResult::Queued
284        ));
285        assert!(matches!(
286            controller.try_enqueue(req2).await,
287            EnqueueResult::Queued
288        ));
289        assert!(
290            matches!(
291                controller.try_enqueue(req3).await,
292                EnqueueResult::QueueFull(_)
293            ),
294            "Should reject when at capacity"
295        );
296    }
297
298    #[tokio::test]
299    async fn promote_drains_queued_orders() {
300        let (controller, _promote_rx) = StandbyController::new_standby(100);
301
302        let (req1, _rx1) = make_test_request();
303        let (req2, _rx2) = make_test_request();
304        controller.try_enqueue(req1).await;
305        controller.try_enqueue(req2).await;
306
307        let result = controller.promote().await;
308        let orders = controller.take_drain_batch().await;
309        assert_eq!(result, PromoteResult::Promoted { queued_orders: 2 });
310        assert_eq!(orders.len(), 2);
311        // Still in draining state until finish_drain
312        assert!(controller.is_standby().await);
313
314        let remaining = controller.take_drain_batch().await;
315        assert!(remaining.is_empty());
316        assert!(!controller.is_standby().await);
317    }
318
319    #[tokio::test]
320    async fn promote_sends_signal() {
321        let (controller, promote_rx) = StandbyController::new_standby(100);
322
323        let result = controller.promote().await;
324        assert_eq!(result, PromoteResult::Promoted { queued_orders: 0 });
325
326        // The receiver should have gotten the signal
327        assert!(promote_rx.await.is_ok());
328    }
329
330    #[tokio::test]
331    async fn double_promote_returns_already_active() {
332        let (controller, _promote_rx) = StandbyController::new_standby(100);
333
334        let result1 = controller.promote().await;
335        assert_eq!(result1, PromoteResult::Promoted { queued_orders: 0 });
336
337        // Second promote while draining returns AlreadyActive
338        let result2 = controller.promote().await;
339        assert_eq!(result2, PromoteResult::AlreadyActive);
340
341        // Also after finish_drain
342        controller.take_drain_batch().await;
343        let result3 = controller.promote().await;
344        assert_eq!(result3, PromoteResult::AlreadyActive);
345    }
346
347    #[tokio::test]
348    async fn during_drain_orders_still_queue() {
349        let (controller, _promote_rx) = StandbyController::new_standby(100);
350        controller.promote().await;
351
352        // During drain phase, orders still queue (preserving ordering)
353        let (req, _rx) = make_test_request();
354        let result = controller.try_enqueue(req).await;
355        assert!(
356            matches!(result, EnqueueResult::Queued),
357            "Should still queue during drain"
358        );
359
360        // After finish_drain, new orders get NotStandby
361        let remaining = controller.take_drain_batch().await;
362        assert_eq!(remaining.len(), 1);
363        // Drain the additional order
364        let remaining2 = controller.take_drain_batch().await;
365        assert!(remaining2.is_empty());
366        assert!(!controller.is_standby().await);
367
368        let (req2, _rx2) = make_test_request();
369        let result2 = controller.try_enqueue(req2).await;
370        assert!(
371            matches!(result2, EnqueueResult::NotStandby(_)),
372            "Should return NotStandby after drain complete"
373        );
374    }
375
376    #[tokio::test]
377    async fn drain_to_sends_all_orders() {
378        let (controller, _promote_rx) = StandbyController::new_standby(100);
379
380        let (req1, _rx1) = make_test_request();
381        let (req2, _rx2) = make_test_request();
382        let (req3, _rx3) = make_test_request();
383        controller.try_enqueue(req1).await;
384        controller.try_enqueue(req2).await;
385        controller.try_enqueue(req3).await;
386
387        controller.promote().await;
388        let orders = controller.take_drain_batch().await;
389
390        let (engine_tx, mut engine_rx) = tokio_mpsc::channel(100);
391        let (sent, unsent) = StandbyController::drain_to(orders, &engine_tx).await;
392        assert_eq!(sent, 3);
393        assert!(unsent.is_empty());
394
395        // Verify orders arrived
396        let mut received = 0;
397        while engine_rx.try_recv().is_ok() {
398            received += 1;
399        }
400        assert_eq!(received, 3);
401    }
402
403    #[tokio::test]
404    async fn drain_to_handles_closed_channel() {
405        let (controller, _promote_rx) = StandbyController::new_standby(100);
406
407        let (req1, _rx1) = make_test_request();
408        let (req2, _rx2) = make_test_request();
409        controller.try_enqueue(req1).await;
410        controller.try_enqueue(req2).await;
411
412        controller.promote().await;
413        let orders = controller.take_drain_batch().await;
414
415        let (engine_tx, engine_rx) = tokio_mpsc::channel(100);
416        drop(engine_rx);
417        let (sent, unsent) = StandbyController::drain_to(orders, &engine_tx).await;
418        // Should handle gracefully — sends fail but doesn't panic
419        assert_eq!(sent, 0);
420        // All orders preserved (failed order + remaining)
421        assert_eq!(unsent.len(), 2);
422    }
423
424    #[tokio::test]
425    async fn concurrent_enqueue_is_safe() {
426        let (controller, _promote_rx) = StandbyController::new_standby(1000);
427        let mut handles = Vec::new();
428
429        for _ in 0..100 {
430            let c = controller.clone();
431            handles.push(tokio::spawn(async move {
432                let (req, _rx) = make_test_request();
433                matches!(c.try_enqueue(req).await, EnqueueResult::Queued)
434            }));
435        }
436
437        let mut queued = 0;
438        for h in handles {
439            if h.await.unwrap() {
440                queued += 1;
441            }
442        }
443        assert_eq!(queued, 100);
444
445        let result = controller.promote().await;
446        let orders = controller.take_drain_batch().await;
447        assert_eq!(result, PromoteResult::Promoted { queued_orders: 100 });
448        assert_eq!(orders.len(), 100);
449    }
450
451    #[tokio::test]
452    async fn concurrent_enqueue_respects_capacity() {
453        let (controller, _promote_rx) = StandbyController::new_standby(50);
454        let mut handles = Vec::new();
455
456        for _ in 0..100 {
457            let c = controller.clone();
458            handles.push(tokio::spawn(async move {
459                let (req, _rx) = make_test_request();
460                matches!(c.try_enqueue(req).await, EnqueueResult::Queued)
461            }));
462        }
463
464        let mut queued = 0;
465        for h in handles {
466            if h.await.unwrap() {
467                queued += 1;
468            }
469        }
470        assert_eq!(queued, 50);
471    }
472
473    #[tokio::test]
474    async fn promote_during_concurrent_enqueue() {
475        let (controller, _promote_rx) = StandbyController::new_standby(1000);
476
477        // Start enqueueing
478        let c1 = controller.clone();
479        let enqueue_handle = tokio::spawn(async move {
480            let mut count = 0;
481            for _ in 0..50 {
482                let (req, _rx) = make_test_request();
483                if matches!(c1.try_enqueue(req).await, EnqueueResult::Queued) {
484                    count += 1;
485                }
486                tokio::task::yield_now().await;
487            }
488            count
489        });
490
491        // Promote mid-stream
492        tokio::task::yield_now().await;
493        let result = controller.promote().await;
494        let orders = controller.take_drain_batch().await;
495
496        let enqueued_after = enqueue_handle.await.unwrap();
497        match result {
498            PromoteResult::Promoted { queued_orders } => {
499                // All 50 orders should have been accepted: some before promote
500                // (in standby queue returned as `orders`), rest during drain
501                // (in drain queue, retrievable via finish_drain).
502                // enqueued_after counts ALL successful enqueues from the spawned task,
503                // which includes both pre-promote and during-drain enqueues.
504                assert!(queued_orders <= 50);
505                assert!(enqueued_after <= 50);
506                assert_eq!(orders.len(), queued_orders);
507            }
508            _ => panic!("Should have promoted"),
509        }
510    }
511
512    #[tokio::test]
513    async fn drain_preserves_order() {
514        let (controller, _promote_rx) = StandbyController::new_standby(100);
515
516        // Enqueue orders with distinguishable request_ids
517        for i in 0..5 {
518            let (mut req, _rx) = make_test_request();
519            req.message.request_id = Some(format!("order-{}", i));
520            controller.try_enqueue(req).await;
521        }
522
523        controller.promote().await;
524        let orders = controller.take_drain_batch().await;
525        let ids: Vec<String> = orders
526            .iter()
527            .map(|o| o.message.request_id.clone().unwrap())
528            .collect();
529        assert_eq!(
530            ids,
531            vec!["order-0", "order-1", "order-2", "order-3", "order-4"]
532        );
533    }
534
535    #[tokio::test]
536    async fn drain_to_preserves_fifo_into_engine() {
537        let (controller, _promote_rx) = StandbyController::new_standby(100);
538
539        for i in 0..3 {
540            let (mut req, _rx) = make_test_request();
541            req.message.request_id = Some(format!("req-{}", i));
542            controller.try_enqueue(req).await;
543        }
544
545        controller.promote().await;
546        let orders = controller.take_drain_batch().await;
547        let (engine_tx, mut engine_rx) = tokio_mpsc::channel(100);
548        let (sent, unsent) = StandbyController::drain_to(orders, &engine_tx).await;
549        assert_eq!(sent, 3);
550        assert!(unsent.is_empty());
551
552        let mut received_ids = Vec::new();
553        while let Ok(req) = engine_rx.try_recv() {
554            received_ids.push(req.message.request_id.clone().unwrap());
555        }
556        assert_eq!(received_ids, vec!["req-0", "req-1", "req-2"]);
557    }
558
559    #[tokio::test]
560    async fn response_channels_survive_queue() {
561        let (controller, _promote_rx) = StandbyController::new_standby(100);
562
563        let (req, mut response_rx) = make_test_request();
564        let _response_tx = req.response_tx.clone();
565        controller.try_enqueue(req).await;
566
567        controller.promote().await;
568        let orders = controller.take_drain_batch().await;
569        let (engine_tx, mut engine_rx) = tokio_mpsc::channel(100);
570        StandbyController::drain_to(orders, &engine_tx).await;
571
572        // Simulate engine processing: receive order and send response
573        let received = engine_rx.try_recv().unwrap();
574        let response = hypercall_types::OrderUpdateMessage {
575            order_id: Some(42),
576            info: received.message.info.clone(),
577            status: hypercall_types::OrderUpdateStatus::Canceled,
578            timestamp: 1,
579            reason: None,
580            filled_size: Decimal::ZERO,
581            wallet_address: hypercall_types::WalletAddress::default(),
582            mmp_triggered: false,
583            request_id: None,
584        };
585        received.response_tx.send(response).await.unwrap();
586
587        // The original handler's response_rx should receive it
588        let resp = response_rx.recv().await.unwrap();
589        assert_eq!(resp.order_id, Some(42));
590    }
591
592    #[tokio::test]
593    async fn zero_capacity_rejects_everything() {
594        let (controller, _promote_rx) = StandbyController::new_standby(0);
595
596        let (req, _rx) = make_test_request();
597        assert!(matches!(
598            controller.try_enqueue(req).await,
599            EnqueueResult::QueueFull(_)
600        ));
601
602        // But promote still works
603        let result = controller.promote().await;
604        let orders = controller.take_drain_batch().await;
605        assert_eq!(result, PromoteResult::Promoted { queued_orders: 0 });
606        assert!(orders.is_empty());
607    }
608
609    #[tokio::test]
610    async fn controller_dropped_closes_response_channels() {
611        // Simulates crash during standby: controller is dropped,
612        // HTTP handlers waiting on response_rx should get a clean error
613        let (req, mut response_rx) = make_test_request();
614
615        {
616            let (controller, _promote_rx) = StandbyController::new_standby(100);
617            controller.try_enqueue(req).await;
618            // controller + queued orders dropped here
619        }
620
621        // The response_tx inside the queued request is now dropped
622        // HTTP handler should see None (channel closed), not hang forever
623        let result =
624            tokio::time::timeout(std::time::Duration::from_millis(100), response_rx.recv()).await;
625        assert!(
626            result.is_ok(),
627            "recv should return immediately, not timeout"
628        );
629        assert!(
630            result.unwrap().is_none(),
631            "Should get None (channel closed)"
632        );
633    }
634
635    #[tokio::test]
636    async fn promote_signal_dropped_without_sending() {
637        // Simulates: controller is dropped without calling promote().
638        // The engine task waiting on promote_rx should get an error, not hang.
639        let promote_rx;
640        {
641            let (controller, rx) = StandbyController::new_standby(100);
642            promote_rx = rx;
643            // controller dropped without promote()
644            drop(controller);
645        }
646
647        let result = tokio::time::timeout(std::time::Duration::from_millis(100), promote_rx).await;
648        assert!(result.is_ok(), "promote_rx should resolve, not hang");
649        assert!(
650            result.unwrap().is_err(),
651            "Should get RecvError (sender dropped)"
652        );
653    }
654
655    #[tokio::test]
656    async fn partial_drain_does_not_corrupt_remaining() {
657        // Engine channel has capacity 1. We drain 3 orders.
658        // First goes through, channel blocks, we verify partial drain is clean.
659        let (controller, _promote_rx) = StandbyController::new_standby(100);
660
661        for i in 0..3 {
662            let (mut req, _rx) = make_test_request();
663            req.message.request_id = Some(format!("order-{}", i));
664            controller.try_enqueue(req).await;
665        }
666
667        controller.promote().await;
668        let orders = controller.take_drain_batch().await;
669
670        // Channel capacity 1: first send succeeds, second blocks
671        let (engine_tx, mut engine_rx) = tokio_mpsc::channel(1);
672
673        // Drain with a timeout to simulate partial drain before crash
674        let drain_handle =
675            tokio::spawn(async move { StandbyController::drain_to(orders, &engine_tx).await });
676
677        // Read one order to make room, then drop the receiver (simulate crash)
678        let first = engine_rx.recv().await.unwrap();
679        assert_eq!(first.message.request_id.as_deref(), Some("order-0"));
680        drop(engine_rx);
681
682        let (sent, unsent) =
683            tokio::time::timeout(std::time::Duration::from_millis(500), drain_handle)
684                .await
685                .expect("drain should finish, not hang")
686                .expect("drain task should not panic");
687
688        // Some orders sent before channel closed — at least 1 (order-0),
689        // possibly 2 (order-1 was buffered in the channel capacity)
690        assert!((1..=3).contains(&sent));
691        // All orders accounted for: sent + unsent = total
692        assert_eq!(sent + unsent.len(), 3);
693    }
694
695    #[tokio::test]
696    async fn queued_order_response_tx_remains_valid_through_promote() {
697        // End-to-end: enqueue → promote → drain → engine processes → HTTP handler gets response
698        // This is the golden path for the standby queue during deploy switchover
699        let (controller, promote_rx) = StandbyController::new_standby(100);
700
701        // HTTP handler enqueues 3 orders and holds response receivers
702        let mut response_rxs = Vec::new();
703        for i in 0..3 {
704            let (mut req, rx) = make_test_request();
705            req.message.request_id = Some(format!("deploy-order-{}", i));
706            controller.try_enqueue(req).await;
707            response_rxs.push(rx);
708        }
709
710        // Simulate engine waiting for promote, then starting
711        let (engine_tx, mut engine_rx) = tokio_mpsc::channel::<UnifiedEngineRequest>(100);
712        let engine_handle = tokio::spawn(async move {
713            // Wait for promote signal
714            promote_rx.await.ok();
715
716            // Process all orders
717            let mut processed = 0;
718            while let Some(req) = engine_rx.recv().await {
719                let resp: hypercall_types::OrderUpdateMessage =
720                    hypercall_types::OrderUpdateMessage {
721                        order_id: Some(100 + processed as u64),
722                        info: req.message.info.clone(),
723                        status: hypercall_types::OrderUpdateStatus::Open,
724                        timestamp: 1,
725                        reason: None,
726                        filled_size: Decimal::ZERO,
727                        wallet_address: hypercall_types::WalletAddress::default(),
728                        mmp_triggered: false,
729                        request_id: req.message.request_id.clone(),
730                    };
731                req.response_tx.send(resp).await.ok();
732                processed += 1;
733            }
734            processed
735        });
736
737        // Deploy script: promote and drain
738        controller.promote().await;
739        let orders = controller.take_drain_batch().await;
740        StandbyController::drain_to(orders, &engine_tx).await;
741        // Drain any orders that arrived during the drain phase
742        loop {
743            let additional = controller.take_drain_batch().await;
744            if additional.is_empty() {
745                break;
746            }
747            StandbyController::drain_to(additional, &engine_tx).await;
748        }
749        drop(engine_tx); // Signal no more orders
750
751        // Engine should process all 3
752        let processed = tokio::time::timeout(std::time::Duration::from_millis(500), engine_handle)
753            .await
754            .unwrap()
755            .unwrap();
756        assert_eq!(processed, 3);
757
758        // All HTTP handlers should have received their responses
759        for (i, mut rx) in response_rxs.into_iter().enumerate() {
760            let resp = rx.recv().await.unwrap();
761            assert_eq!(resp.order_id, Some(100 + i as u64));
762            assert_eq!(
763                resp.request_id.as_deref(),
764                Some(&format!("deploy-order-{}", i) as &str)
765            );
766        }
767    }
768
769    #[tokio::test]
770    async fn promote_signal_unblocks_waiting_task() {
771        let (controller, promote_rx) = StandbyController::new_standby(100);
772
773        let controller_clone = controller.clone();
774        let waiter = tokio::spawn(async move {
775            // Simulate engine waiting for promote
776            promote_rx.await.ok();
777            true
778        });
779
780        // Small delay to ensure waiter is parked
781        tokio::time::sleep(std::time::Duration::from_millis(10)).await;
782
783        controller_clone.promote().await;
784
785        let unblocked = tokio::time::timeout(std::time::Duration::from_millis(100), waiter)
786            .await
787            .expect("Waiter should complete within timeout")
788            .expect("Waiter task should not panic");
789
790        assert!(unblocked);
791    }
792}