1use std::sync::Arc;
2use tokio::sync::{mpsc, oneshot, Mutex};
3use tracing::info;
4
5use crate::rsm::unified_engine::UnifiedEngineRequest;
6
/// Cloneable handle to a standby / draining / active state machine.
///
/// Cloning copies only the `Arc`, so every clone observes and mutates the same
/// underlying state.
#[derive(Clone)]
pub struct StandbyController {
    /// Shared state, guarded by a `tokio::sync::Mutex` (locking is an `.await`).
    state: Arc<Mutex<StandbyState>>,
}
18
/// Internal state of a `StandbyController`.
///
/// `promote` moves `Standby` to `Draining`; `take_drain_batch` moves `Draining`
/// to `Active` once it finds the queue empty. `new_active` starts directly in
/// `Active`.
enum StandbyState {
    /// Not yet promoted: incoming requests are buffered in `queue`.
    Standby {
        /// Sender half of the promotion signal; the receiver half is returned
        /// by `new_standby`. Consumed when `promote` fires the signal.
        promote_tx: Option<oneshot::Sender<()>>,
        /// Requests buffered so far, in the order they were pushed.
        queue: Vec<UnifiedEngineRequest>,
        /// Maximum number of requests `queue` is allowed to hold.
        capacity: usize,
    },
    /// Promoted, but the buffered requests have not all been handed out yet.
    /// New requests are still accepted into `queue` in this state.
    Draining {
        /// Requests waiting to be taken by `take_drain_batch`.
        queue: Vec<UnifiedEngineRequest>,
        /// Maximum number of requests `queue` is allowed to hold.
        capacity: usize,
    },
    /// Nothing is buffered; `try_enqueue` hands requests straight back.
    Active,
}
31
/// Outcome of `StandbyController::try_enqueue`.
///
/// The two rejecting variants give the request back to the caller so that it
/// is never silently dropped.
pub enum EnqueueResult {
    /// The request was appended to the controller's queue.
    Queued,
    /// The controller is in the active state and does not buffer; the request
    /// is returned untouched.
    NotStandby(UnifiedEngineRequest),
    /// The queue already holds `capacity` requests; the request is returned
    /// untouched.
    QueueFull(UnifiedEngineRequest),
}
41
/// Outcome of `StandbyController::promote`.
///
/// A plain value type: it is `Copy` and fully comparable (`Eq`), so callers can
/// freely pass it around, log it, and match or compare it after the fact.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PromoteResult {
    /// This call performed the promotion.
    Promoted {
        /// Number of requests that were buffered at the moment of promotion.
        queued_orders: usize,
    },
    /// The controller had already left the standby state (it was draining or
    /// active), so this call changed nothing.
    AlreadyActive,
}
50
51impl StandbyController {
52 pub fn new_standby(capacity: usize) -> (Self, oneshot::Receiver<()>) {
55 let (promote_tx, promote_rx) = oneshot::channel();
56 let controller = Self {
57 state: Arc::new(Mutex::new(StandbyState::Standby {
58 promote_tx: Some(promote_tx),
59 queue: Vec::with_capacity(capacity.min(1024)),
60 capacity,
61 })),
62 };
63 (controller, promote_rx)
64 }
65
66 pub fn new_active() -> Self {
68 Self {
69 state: Arc::new(Mutex::new(StandbyState::Active)),
70 }
71 }
72
73 pub async fn is_standby(&self) -> bool {
76 !matches!(*self.state.lock().await, StandbyState::Active)
77 }
78
79 pub async fn is_promoted(&self) -> bool {
81 !matches!(*self.state.lock().await, StandbyState::Standby { .. })
82 }
83
84 pub async fn try_enqueue(&self, request: UnifiedEngineRequest) -> EnqueueResult {
89 let mut state = self.state.lock().await;
90 match &mut *state {
91 StandbyState::Active => EnqueueResult::NotStandby(request),
92 StandbyState::Standby {
93 queue, capacity, ..
94 }
95 | StandbyState::Draining { queue, capacity } => {
96 if queue.len() >= *capacity {
97 EnqueueResult::QueueFull(request)
98 } else {
99 queue.push(request);
100 EnqueueResult::Queued
101 }
102 }
103 }
104 }
105
106 pub async fn promote(&self) -> PromoteResult {
113 let mut state = self.state.lock().await;
114 match &*state {
115 StandbyState::Standby { .. } => {
116 let capacity = match &*state {
117 StandbyState::Standby { capacity, .. } => *capacity,
118 _ => unreachable!(),
119 };
120 match std::mem::replace(
121 &mut *state,
122 StandbyState::Draining {
123 queue: Vec::new(),
124 capacity,
125 },
126 ) {
127 StandbyState::Standby {
128 promote_tx,
129 queue,
130 capacity,
131 ..
132 } => {
133 let count = queue.len();
134 *state = StandbyState::Draining { queue, capacity };
136 if let Some(tx) = promote_tx {
137 let _ = tx.send(());
138 }
139 info!(queued_orders = count, "Promoted from standby to draining");
140 PromoteResult::Promoted {
141 queued_orders: count,
142 }
143 }
144 _ => unreachable!(),
145 }
146 }
147 StandbyState::Draining { .. } | StandbyState::Active => PromoteResult::AlreadyActive,
148 }
149 }
150
151 pub async fn take_drain_batch(&self) -> Vec<UnifiedEngineRequest> {
156 let mut state = self.state.lock().await;
157 match &mut *state {
158 StandbyState::Draining { queue, .. } => {
159 if queue.is_empty() {
160 *state = StandbyState::Active;
161 info!("Drain complete, transitioned to active");
162 Vec::new()
163 } else {
164 let orders = std::mem::take(queue);
165 info!(batch_size = orders.len(), "Taking drain batch");
166 orders
167 }
168 }
169 _ => Vec::new(),
170 }
171 }
172
173 pub async fn drain_to(
177 orders: Vec<UnifiedEngineRequest>,
178 engine_tx: &mpsc::Sender<UnifiedEngineRequest>,
179 ) -> (usize, Vec<UnifiedEngineRequest>) {
180 let total = orders.len();
181 let mut sent = 0;
182 let mut iter = orders.into_iter();
183 for order in iter.by_ref() {
184 match engine_tx.send(order).await {
185 Ok(()) => sent += 1,
186 Err(mpsc::error::SendError(failed_order)) => {
187 let remaining = total - sent - 1;
188 tracing::error!(
189 sent = sent,
190 remaining = remaining + 1,
191 "Engine channel closed during queue drain"
192 );
193 let mut unsent = vec![failed_order];
194 unsent.extend(iter);
195 return (sent, unsent);
196 }
197 }
198 }
199 (sent, Vec::new())
200 }
201}
202
// Unit tests for `StandbyController`: state transitions, queue capacity,
// ordering guarantees, and the behaviour of the channels carried by requests.
#[cfg(test)]
mod tests {
    use super::*;
    use rust_decimal::Decimal;
    use tokio::sync::mpsc as tokio_mpsc;

    // Builds a minimal cancel-order request together with the receiving end of
    // its response channel, so a test can observe replies sent to the request.
    fn make_test_request() -> (
        UnifiedEngineRequest,
        mpsc::Receiver<hypercall_types::OrderUpdateMessage>,
    ) {
        use hypercall_types::{OrderAction, OrderActionMessage};
        use hypercall_types::{OrderInfo, Side, TimeInForce, WalletAddress};

        // Capacity 1 suffices: no test sends more than one response per request.
        let (response_tx, response_rx) = tokio_mpsc::channel(1);
        let request = UnifiedEngineRequest {
            message: OrderActionMessage {
                timestamp: 0,
                info: OrderInfo {
                    symbol: "BTC-20260501-100000-C".to_string(),
                    price: Decimal::ZERO,
                    size: Decimal::ONE,
                    side: Side::Buy,
                    tif: TimeInForce::GTC,
                    client_id: None,
                    order_id: None,
                    is_perp: false,
                    underlying: None,
                    reduce_only: None,
                    nonce: None,
                    signature: None,
                    mmp_enabled: false,
                    builder_code_address: None,
                },
                action: OrderAction::CancelOrder,
                wallet: WalletAddress::default(),
                api_wallet_address: None,
                mmp_triggered: false,
                request_id: None,
            },
            response_tx,
            enqueued_at: std::time::Instant::now(),
            #[cfg(feature = "otel-tracing")]
            trace_context: None,
        };
        (request, response_rx)
    }

    // An active controller never buffers: the request comes straight back.
    #[tokio::test]
    async fn active_mode_does_not_enqueue() {
        let controller = StandbyController::new_active();
        let (req, _rx) = make_test_request();

        let result = controller.try_enqueue(req).await;
        assert!(
            matches!(result, EnqueueResult::NotStandby(_)),
            "Active controller should return NotStandby"
        );
        assert!(!controller.is_standby().await);
    }

    // A standby controller accepts requests into its queue.
    #[tokio::test]
    async fn standby_mode_queues_orders() {
        let (controller, _promote_rx) = StandbyController::new_standby(100);
        assert!(controller.is_standby().await);

        let (req, _rx) = make_test_request();
        let result = controller.try_enqueue(req).await;
        assert!(matches!(result, EnqueueResult::Queued));
    }

    // With capacity 2, the third request is rejected with `QueueFull`.
    #[tokio::test]
    async fn standby_rejects_when_full() {
        let (controller, _promote_rx) = StandbyController::new_standby(2);

        let (req1, _rx1) = make_test_request();
        let (req2, _rx2) = make_test_request();
        let (req3, _rx3) = make_test_request();

        assert!(matches!(
            controller.try_enqueue(req1).await,
            EnqueueResult::Queued
        ));
        assert!(matches!(
            controller.try_enqueue(req2).await,
            EnqueueResult::Queued
        ));
        assert!(
            matches!(
                controller.try_enqueue(req3).await,
                EnqueueResult::QueueFull(_)
            ),
            "Should reject when at capacity"
        );
    }

    // Promotion reports the queued count, the first batch returns everything,
    // and only the following (empty) batch flips the controller to active.
    #[tokio::test]
    async fn promote_drains_queued_orders() {
        let (controller, _promote_rx) = StandbyController::new_standby(100);

        let (req1, _rx1) = make_test_request();
        let (req2, _rx2) = make_test_request();
        controller.try_enqueue(req1).await;
        controller.try_enqueue(req2).await;

        let result = controller.promote().await;
        let orders = controller.take_drain_batch().await;
        assert_eq!(result, PromoteResult::Promoted { queued_orders: 2 });
        assert_eq!(orders.len(), 2);
        // Still draining: a non-empty batch does not complete the drain.
        assert!(controller.is_standby().await);

        let remaining = controller.take_drain_batch().await;
        assert!(remaining.is_empty());
        assert!(!controller.is_standby().await);
    }

    // Promotion fires the one-shot signal handed out by `new_standby`.
    #[tokio::test]
    async fn promote_sends_signal() {
        let (controller, promote_rx) = StandbyController::new_standby(100);

        let result = controller.promote().await;
        assert_eq!(result, PromoteResult::Promoted { queued_orders: 0 });

        assert!(promote_rx.await.is_ok());
    }

    // A second promote is a no-op, both while draining and once active.
    #[tokio::test]
    async fn double_promote_returns_already_active() {
        let (controller, _promote_rx) = StandbyController::new_standby(100);

        let result1 = controller.promote().await;
        assert_eq!(result1, PromoteResult::Promoted { queued_orders: 0 });

        // Draining state.
        let result2 = controller.promote().await;
        assert_eq!(result2, PromoteResult::AlreadyActive);

        // Empty batch while draining -> active state.
        controller.take_drain_batch().await;
        let result3 = controller.promote().await;
        assert_eq!(result3, PromoteResult::AlreadyActive);
    }

    // Requests arriving between promotion and drain completion are buffered
    // too; after the drain completes they are handed back as `NotStandby`.
    #[tokio::test]
    async fn during_drain_orders_still_queue() {
        let (controller, _promote_rx) = StandbyController::new_standby(100);
        controller.promote().await;

        let (req, _rx) = make_test_request();
        let result = controller.try_enqueue(req).await;
        assert!(
            matches!(result, EnqueueResult::Queued),
            "Should still queue during drain"
        );

        let remaining = controller.take_drain_batch().await;
        assert_eq!(remaining.len(), 1);
        let remaining2 = controller.take_drain_batch().await;
        assert!(remaining2.is_empty());
        assert!(!controller.is_standby().await);

        let (req2, _rx2) = make_test_request();
        let result2 = controller.try_enqueue(req2).await;
        assert!(
            matches!(result2, EnqueueResult::NotStandby(_)),
            "Should return NotStandby after drain complete"
        );
    }

    // `drain_to` delivers every request when the engine channel is open.
    #[tokio::test]
    async fn drain_to_sends_all_orders() {
        let (controller, _promote_rx) = StandbyController::new_standby(100);

        let (req1, _rx1) = make_test_request();
        let (req2, _rx2) = make_test_request();
        let (req3, _rx3) = make_test_request();
        controller.try_enqueue(req1).await;
        controller.try_enqueue(req2).await;
        controller.try_enqueue(req3).await;

        controller.promote().await;
        let orders = controller.take_drain_batch().await;

        let (engine_tx, mut engine_rx) = tokio_mpsc::channel(100);
        let (sent, unsent) = StandbyController::drain_to(orders, &engine_tx).await;
        assert_eq!(sent, 3);
        assert!(unsent.is_empty());

        // Count what actually arrived on the receiving side.
        let mut received = 0;
        while engine_rx.try_recv().is_ok() {
            received += 1;
        }
        assert_eq!(received, 3);
    }

    // With the receiver dropped up front nothing is sent and every request is
    // returned as unsent.
    #[tokio::test]
    async fn drain_to_handles_closed_channel() {
        let (controller, _promote_rx) = StandbyController::new_standby(100);

        let (req1, _rx1) = make_test_request();
        let (req2, _rx2) = make_test_request();
        controller.try_enqueue(req1).await;
        controller.try_enqueue(req2).await;

        controller.promote().await;
        let orders = controller.take_drain_batch().await;

        let (engine_tx, engine_rx) = tokio_mpsc::channel(100);
        drop(engine_rx);
        let (sent, unsent) = StandbyController::drain_to(orders, &engine_tx).await;
        assert_eq!(sent, 0);
        assert_eq!(unsent.len(), 2);
    }

    // 100 tasks enqueue through clones of the same controller; none is lost.
    #[tokio::test]
    async fn concurrent_enqueue_is_safe() {
        let (controller, _promote_rx) = StandbyController::new_standby(1000);
        let mut handles = Vec::new();

        for _ in 0..100 {
            let c = controller.clone();
            handles.push(tokio::spawn(async move {
                let (req, _rx) = make_test_request();
                matches!(c.try_enqueue(req).await, EnqueueResult::Queued)
            }));
        }

        let mut queued = 0;
        for h in handles {
            if h.await.unwrap() {
                queued += 1;
            }
        }
        assert_eq!(queued, 100);

        let result = controller.promote().await;
        let orders = controller.take_drain_batch().await;
        assert_eq!(result, PromoteResult::Promoted { queued_orders: 100 });
        assert_eq!(orders.len(), 100);
    }

    // 100 tasks compete for 50 slots; exactly 50 enqueues may succeed.
    #[tokio::test]
    async fn concurrent_enqueue_respects_capacity() {
        let (controller, _promote_rx) = StandbyController::new_standby(50);
        let mut handles = Vec::new();

        for _ in 0..100 {
            let c = controller.clone();
            handles.push(tokio::spawn(async move {
                let (req, _rx) = make_test_request();
                matches!(c.try_enqueue(req).await, EnqueueResult::Queued)
            }));
        }

        let mut queued = 0;
        for h in handles {
            if h.await.unwrap() {
                queued += 1;
            }
        }
        assert_eq!(queued, 50);
    }

    // Promotes while another task is still enqueuing. The first batch must
    // contain exactly the requests counted by `promote`.
    #[tokio::test]
    async fn promote_during_concurrent_enqueue() {
        let (controller, _promote_rx) = StandbyController::new_standby(1000);

        let c1 = controller.clone();
        let enqueue_handle = tokio::spawn(async move {
            let mut count = 0;
            for _ in 0..50 {
                let (req, _rx) = make_test_request();
                if matches!(c1.try_enqueue(req).await, EnqueueResult::Queued) {
                    count += 1;
                }
                // Give the promoting task a chance to interleave.
                tokio::task::yield_now().await;
            }
            count
        });

        // Let the enqueuing task start before promoting.
        tokio::task::yield_now().await;
        let result = controller.promote().await;
        let orders = controller.take_drain_batch().await;

        let enqueued_after = enqueue_handle.await.unwrap();
        match result {
            PromoteResult::Promoted { queued_orders } => {
                assert!(queued_orders <= 50);
                assert!(enqueued_after <= 50);
                assert_eq!(orders.len(), queued_orders);
            }
            _ => panic!("Should have promoted"),
        }
    }

    // The drain batch keeps requests in the order they were enqueued.
    #[tokio::test]
    async fn drain_preserves_order() {
        let (controller, _promote_rx) = StandbyController::new_standby(100);

        for i in 0..5 {
            // Tag each request so its position can be checked afterwards.
            let (mut req, _rx) = make_test_request();
            req.message.request_id = Some(format!("order-{}", i));
            controller.try_enqueue(req).await;
        }

        controller.promote().await;
        let orders = controller.take_drain_batch().await;
        let ids: Vec<String> = orders
            .iter()
            .map(|o| o.message.request_id.clone().unwrap())
            .collect();
        assert_eq!(
            ids,
            vec!["order-0", "order-1", "order-2", "order-3", "order-4"]
        );
    }

    // Enqueue order is also the order in which the engine channel receives.
    #[tokio::test]
    async fn drain_to_preserves_fifo_into_engine() {
        let (controller, _promote_rx) = StandbyController::new_standby(100);

        for i in 0..3 {
            let (mut req, _rx) = make_test_request();
            req.message.request_id = Some(format!("req-{}", i));
            controller.try_enqueue(req).await;
        }

        controller.promote().await;
        let orders = controller.take_drain_batch().await;
        let (engine_tx, mut engine_rx) = tokio_mpsc::channel(100);
        let (sent, unsent) = StandbyController::drain_to(orders, &engine_tx).await;
        assert_eq!(sent, 3);
        assert!(unsent.is_empty());

        let mut received_ids = Vec::new();
        while let Ok(req) = engine_rx.try_recv() {
            received_ids.push(req.message.request_id.clone().unwrap());
        }
        assert_eq!(received_ids, vec!["req-0", "req-1", "req-2"]);
    }

    // A request that went through queue -> drain -> engine channel can still
    // deliver a response to its original receiver.
    #[tokio::test]
    async fn response_channels_survive_queue() {
        let (controller, _promote_rx) = StandbyController::new_standby(100);

        let (req, mut response_rx) = make_test_request();
        // Extra sender clone held for the duration of the test.
        let _response_tx = req.response_tx.clone();
        controller.try_enqueue(req).await;

        controller.promote().await;
        let orders = controller.take_drain_batch().await;
        let (engine_tx, mut engine_rx) = tokio_mpsc::channel(100);
        StandbyController::drain_to(orders, &engine_tx).await;

        let received = engine_rx.try_recv().unwrap();
        // Reply through the sender that travelled with the request.
        let response = hypercall_types::OrderUpdateMessage {
            order_id: Some(42),
            info: received.message.info.clone(),
            status: hypercall_types::OrderUpdateStatus::Canceled,
            timestamp: 1,
            reason: None,
            filled_size: Decimal::ZERO,
            wallet_address: hypercall_types::WalletAddress::default(),
            mmp_triggered: false,
            request_id: None,
        };
        received.response_tx.send(response).await.unwrap();

        let resp = response_rx.recv().await.unwrap();
        assert_eq!(resp.order_id, Some(42));
    }

    // Capacity 0 rejects every request, yet promotion still succeeds.
    #[tokio::test]
    async fn zero_capacity_rejects_everything() {
        let (controller, _promote_rx) = StandbyController::new_standby(0);

        let (req, _rx) = make_test_request();
        assert!(matches!(
            controller.try_enqueue(req).await,
            EnqueueResult::QueueFull(_)
        ));

        let result = controller.promote().await;
        let orders = controller.take_drain_batch().await;
        assert_eq!(result, PromoteResult::Promoted { queued_orders: 0 });
        assert!(orders.is_empty());
    }

    // Dropping the controller drops the queued request and with it the only
    // response sender, so the waiting receiver sees a closed channel.
    #[tokio::test]
    async fn controller_dropped_closes_response_channels() {
        let (req, mut response_rx) = make_test_request();

        {
            // Controller (and its queue) go out of scope at the end of this block.
            let (controller, _promote_rx) = StandbyController::new_standby(100);
            controller.try_enqueue(req).await;
        }

        let result =
            tokio::time::timeout(std::time::Duration::from_millis(100), response_rx.recv()).await;
        assert!(
            result.is_ok(),
            "recv should return immediately, not timeout"
        );
        assert!(
            result.unwrap().is_none(),
            "Should get None (channel closed)"
        );
    }

    // Dropping a never-promoted controller drops the promote sender; the
    // receiver must resolve with an error instead of hanging.
    #[tokio::test]
    async fn promote_signal_dropped_without_sending() {
        let promote_rx;
        {
            let (controller, rx) = StandbyController::new_standby(100);
            promote_rx = rx;
            drop(controller);
        }

        let result = tokio::time::timeout(std::time::Duration::from_millis(100), promote_rx).await;
        assert!(result.is_ok(), "promote_rx should resolve, not hang");
        assert!(
            result.unwrap().is_err(),
            "Should get RecvError (sender dropped)"
        );
    }

    // The engine receiver goes away in the middle of a drain: `drain_to` must
    // terminate and account for every request as either sent or unsent.
    #[tokio::test]
    async fn partial_drain_does_not_corrupt_remaining() {
        let (controller, _promote_rx) = StandbyController::new_standby(100);

        for i in 0..3 {
            let (mut req, _rx) = make_test_request();
            req.message.request_id = Some(format!("order-{}", i));
            controller.try_enqueue(req).await;
        }

        controller.promote().await;
        let orders = controller.take_drain_batch().await;

        // Buffer of 1 so the drain task has to wait on the receiver.
        let (engine_tx, mut engine_rx) = tokio_mpsc::channel(1);

        let drain_handle =
            tokio::spawn(async move { StandbyController::drain_to(orders, &engine_tx).await });

        let first = engine_rx.recv().await.unwrap();
        assert_eq!(first.message.request_id.as_deref(), Some("order-0"));
        drop(engine_rx);

        let (sent, unsent) =
            tokio::time::timeout(std::time::Duration::from_millis(500), drain_handle)
                .await
                .expect("drain should finish, not hang")
                .expect("drain task should not panic");

        // The exact split presumably depends on task scheduling, so only the
        // range and the total are checked.
        assert!((1..=3).contains(&sent));
        assert_eq!(sent + unsent.len(), 3);
    }

    // End-to-end: requests queued in standby are drained into a simulated
    // engine after promotion, and each original receiver gets its own reply.
    #[tokio::test]
    async fn queued_order_response_tx_remains_valid_through_promote() {
        let (controller, promote_rx) = StandbyController::new_standby(100);

        let mut response_rxs = Vec::new();
        for i in 0..3 {
            let (mut req, rx) = make_test_request();
            req.message.request_id = Some(format!("deploy-order-{}", i));
            controller.try_enqueue(req).await;
            response_rxs.push(rx);
        }

        let (engine_tx, mut engine_rx) = tokio_mpsc::channel::<UnifiedEngineRequest>(100);
        // Simulated engine: waits for the promotion signal, then answers every
        // request it receives until the channel is closed.
        let engine_handle = tokio::spawn(async move {
            promote_rx.await.ok();

            let mut processed = 0;
            while let Some(req) = engine_rx.recv().await {
                let resp: hypercall_types::OrderUpdateMessage =
                    hypercall_types::OrderUpdateMessage {
                        order_id: Some(100 + processed as u64),
                        info: req.message.info.clone(),
                        status: hypercall_types::OrderUpdateStatus::Open,
                        timestamp: 1,
                        reason: None,
                        filled_size: Decimal::ZERO,
                        wallet_address: hypercall_types::WalletAddress::default(),
                        mmp_triggered: false,
                        request_id: req.message.request_id.clone(),
                    };
                req.response_tx.send(resp).await.ok();
                processed += 1;
            }
            processed
        });

        controller.promote().await;
        let orders = controller.take_drain_batch().await;
        StandbyController::drain_to(orders, &engine_tx).await;
        // Keep draining until an empty batch signals completion.
        loop {
            let additional = controller.take_drain_batch().await;
            if additional.is_empty() {
                break;
            }
            StandbyController::drain_to(additional, &engine_tx).await;
        }
        // Closing the channel ends the simulated engine's receive loop.
        drop(engine_tx);
        let processed = tokio::time::timeout(std::time::Duration::from_millis(500), engine_handle)
            .await
            .unwrap()
            .unwrap();
        assert_eq!(processed, 3);

        for (i, mut rx) in response_rxs.into_iter().enumerate() {
            let resp = rx.recv().await.unwrap();
            assert_eq!(resp.order_id, Some(100 + i as u64));
            assert_eq!(
                resp.request_id.as_deref(),
                Some(&format!("deploy-order-{}", i) as &str)
            );
        }
    }

    // A task parked on the promotion receiver wakes up once `promote` runs on
    // a clone of the controller.
    #[tokio::test]
    async fn promote_signal_unblocks_waiting_task() {
        let (controller, promote_rx) = StandbyController::new_standby(100);

        let controller_clone = controller.clone();
        let waiter = tokio::spawn(async move {
            promote_rx.await.ok();
            true
        });

        // Give the waiter time to start waiting before promoting.
        tokio::time::sleep(std::time::Duration::from_millis(10)).await;

        controller_clone.promote().await;

        let unblocked = tokio::time::timeout(std::time::Duration::from_millis(100), waiter)
            .await
            .expect("Waiter should complete within timeout")
            .expect("Waiter task should not panic");

        assert!(unblocked);
    }
}