// hypercall_api/handlers/bulk_orders.rs

1use std::str::FromStr;
2use std::time::Instant;
3
4use rust_decimal::prelude::ToPrimitive;
5use rust_decimal::Decimal;
6use rust_decimal_macros::dec;
7use utoipa::ToSchema;
8
9use crate::error::ApiError;
10use crate::request_auth::verify_request;
11use crate::sonic_json::SonicJson;
12use axum::extract::State;
13use hypercall_runtime_api::increment_pending_requests;
14use hypercall_types::utils::get_timestamp_millis;
15use hypercall_types::{
16    to_contract_units_decimal, validate_price_precision, CancelOrderRequest, PlaceOrderRequest,
17    MAX_PRICE_SIGNIFICANT_FIGURES,
18};
19use hypercall_types::{
20    OrderAction, OrderActionMessage, OrderInfo, OrderUpdateMessage, OrderUpdateStatus, TimeInForce,
21};
22use hypercall_types::{ParsedOptionSymbol, Side};
23use serde::{Deserialize, Serialize};
24use tokio::sync::mpsc;
25use tokio::time::timeout;
26use uuid::Uuid;
27
28#[cfg(feature = "otel-tracing")]
29use tracing_opentelemetry::OpenTelemetrySpanExt;
30
31use super::{ensure_order_creation_allowed, AppState, ENGINE_RESPONSE_TIMEOUT};
32
33use hypercall_types::CancelOrderByCloidRequest;
34
/// Upper bound on the number of entries accepted in one bulk place or bulk
/// cancel request. The handlers in this file compare the incoming array length
/// against it and answer with `ApiError::bad_request` when it is exceeded,
/// before any entry is processed.
pub(crate) const MAX_BULK_ORDER_SIZE: usize = 50;
36
// Request body of `bulk_place_order`. Every entry is a complete
// `PlaceOrderRequest` that is verified and processed on its own, in array order.
#[derive(Debug, Deserialize, ToSchema)]
pub struct BulkPlaceOrderRequest {
    // The limit is enforced by the handler against `MAX_BULK_ORDER_SIZE`.
    /// Array of orders to place (max 50)
    pub orders: Vec<PlaceOrderRequest>,
}
42
// Outcome of a single entry of a bulk request. The same shape is produced by
// bulk placement and by both bulk cancel endpoints.
#[derive(Debug, Clone, Serialize, ToSchema)]
pub struct BulkOrderResult {
    // Position of the entry in the submitted array, so callers can correlate
    // results with what they sent.
    /// Index of the order in the original request
    pub index: usize,
    // Placements: true when the engine status is Acked, Open, Filled or
    // PartiallyFilled. Cancels: true when the engine status is Canceled.
    /// Whether the operation succeeded
    pub success: bool,
    // Populated whenever the engine replied, including replies whose status
    // leaves `success` false.
    /// Order update data if successful
    pub data: Option<OrderUpdateMessage>,
    // Set for failures detected on the API side (verification, validation, send
    // failure, missing reply, timeout). It stays `None` whenever the engine
    // replied, even if `success` is false.
    /// Error message if failed
    pub error: Option<String>,
}
54
// Response body of `bulk_place_order`.
#[derive(Debug, Serialize, ToSchema)]
pub struct BulkPlaceOrderResponse {
    // One entry per submitted order, pushed in request order.
    /// Results for each order in the request
    pub results: Vec<BulkOrderResult>,
}
60
// Request body of `bulk_cancel_order`. Every entry is verified and processed on
// its own, in array order.
#[derive(Debug, Deserialize, ToSchema)]
pub struct BulkCancelOrderRequest {
    // The limit is enforced by the handler against `MAX_BULK_ORDER_SIZE`.
    /// Array of cancel requests by order_id (max 50)
    pub cancels: Vec<CancelOrderRequest>,
}
66
// Request body of `bulk_cancel_order_by_cloid`. Every entry is verified and
// processed on its own, in array order.
#[derive(Debug, Deserialize, ToSchema)]
pub struct BulkCancelOrderByCloidRequest {
    // The limit is enforced by the handler against `MAX_BULK_ORDER_SIZE`.
    /// Array of cancel requests by client_id (max 50)
    pub cancels: Vec<CancelOrderByCloidRequest>,
}
72
// Response body shared by `bulk_cancel_order` and `bulk_cancel_order_by_cloid`.
#[derive(Debug, Serialize, ToSchema)]
pub struct BulkCancelOrderResponse {
    // One entry per submitted cancel, pushed in request order.
    /// Results for each cancel in the request
    pub results: Vec<BulkOrderResult>,
}
78
79/// Place multiple orders in a single request
80#[utoipa::path(
81    post,
82    path = "/bulk_order",
83    request_body = BulkPlaceOrderRequest,
84    responses(
85        (status = 200, description = "Bulk order results", body = BulkPlaceOrderResponse),
86        (status = 400, description = "Invalid request (e.g., too many orders)"),
87        (status = 401, description = "Unauthorized"),
88        (status = 500, description = "Internal server error")
89    ),
90    security(("eip712_signature" = [])),
91    tag = "Trading"
92)]
93pub async fn bulk_place_order(
94    State(state): State<AppState>,
95    SonicJson(request): SonicJson<BulkPlaceOrderRequest>,
96) -> Result<SonicJson<BulkPlaceOrderResponse>, ApiError> {
97    // Validate batch size
98    if request.orders.len() > MAX_BULK_ORDER_SIZE {
99        tracing::warn!(
100            "Bulk order request exceeds max size: {} > {}",
101            request.orders.len(),
102            MAX_BULK_ORDER_SIZE
103        );
104        return Err(ApiError::bad_request(format!(
105            "Bulk order request exceeds max size: {} > {}",
106            request.orders.len(),
107            MAX_BULK_ORDER_SIZE
108        )));
109    }
110
111    tracing::info!(
112        "Processing bulk order request with {} orders",
113        request.orders.len()
114    );
115
116    let mut results = Vec::with_capacity(request.orders.len());
117
118    for (index, order_req) in request.orders.into_iter().enumerate() {
119        let result = process_single_order(&state, order_req, index).await;
120        results.push(result);
121    }
122
123    Ok(SonicJson(BulkPlaceOrderResponse { results }))
124}
125
126/// Helper function to process a single order within a bulk request
127async fn process_single_order(
128    state: &AppState,
129    order_req: PlaceOrderRequest,
130    index: usize,
131) -> BulkOrderResult {
132    let authorized = match verify_request(
133        state.agent_auth.as_ref(),
134        &order_req,
135        state.runtime_config.signing_chain_id,
136    ) {
137        Ok(authorized) => authorized,
138        Err(e) => {
139            tracing::warn!("Failed to verify bulk order {}: {}", index, e);
140            return BulkOrderResult {
141                index,
142                success: false,
143                data: None,
144                error: Some(e.to_string()),
145            };
146        }
147    };
148
149    // Validate the order by parsing the symbol
150    let _parsed_symbol = match ParsedOptionSymbol::from_symbol(&order_req.symbol) {
151        Ok(s) => s,
152        Err(e) => {
153            return BulkOrderResult {
154                index,
155                success: false,
156                data: None,
157                error: Some(format!("Invalid symbol: {}", e)),
158            };
159        }
160    };
161
162    if let Err(err) = ensure_order_creation_allowed(state, &order_req.symbol).await {
163        return BulkOrderResult {
164            index,
165            success: false,
166            data: None,
167            error: Some(err.message),
168        };
169    }
170
171    // Parse price and size from strings as Decimal for precision
172    let price: Decimal = match Decimal::from_str(&order_req.price) {
173        Ok(p) => p,
174        Err(_) => {
175            return BulkOrderResult {
176                index,
177                success: false,
178                data: None,
179                error: Some(format!("Invalid price format: {}", order_req.price)),
180            };
181        }
182    };
183    let size: Decimal = match Decimal::from_str(&order_req.size) {
184        Ok(s) => s,
185        Err(_) => {
186            return BulkOrderResult {
187                index,
188                success: false,
189                data: None,
190                error: Some(format!("Invalid size format: {}", order_req.size)),
191            };
192        }
193    };
194
195    // Validate order parameters
196    if price <= dec!(0) {
197        return BulkOrderResult {
198            index,
199            success: false,
200            data: None,
201            error: Some("Price must be greater than 0".to_string()),
202        };
203    }
204    if size <= dec!(0) {
205        return BulkOrderResult {
206            index,
207            success: false,
208            data: None,
209            error: Some("Size must be greater than 0".to_string()),
210        };
211    }
212
213    let price_f64 = match price.to_f64() {
214        Some(p) => p,
215        None => {
216            return BulkOrderResult {
217                index,
218                success: false,
219                data: None,
220                error: Some("Price value out of range".to_string()),
221            };
222        }
223    };
224    if let Err(e) = validate_price_precision(price_f64, MAX_PRICE_SIGNIFICANT_FIGURES) {
225        return BulkOrderResult {
226            index,
227            success: false,
228            data: None,
229            error: Some(format!("Price validation failed: {}", e)),
230        };
231    }
232
233    // Create OrderInfo
234    let order_info = OrderInfo {
235        symbol: order_req.symbol.clone(),
236        price,
237        size: to_contract_units_decimal(&order_req.symbol, size),
238        side: order_req.side,
239        tif: order_req.tif,
240        client_id: order_req.client_id,
241        order_id: None,
242        is_perp: false,
243        underlying: None,
244        reduce_only: None,
245        nonce: Some(order_req.nonce),
246        signature: None,
247        mmp_enabled: order_req.mmp_enabled,
248        builder_code_address: order_req.builder_code_address,
249    };
250
251    // Create OrderActionMessage
252    let order_action_msg = OrderActionMessage {
253        timestamp: get_timestamp_millis(),
254        info: order_info,
255        action: OrderAction::CreateOrder,
256        wallet: order_req.wallet,
257        api_wallet_address: Some(authorized.signer.signer_address),
258        mmp_triggered: false,
259        request_id: Some(Uuid::now_v7().to_string()),
260    };
261
262    // Create a channel for the response
263    let (response_tx, mut response_rx) = mpsc::channel(1);
264
265    // Create engine request
266    let engine_request = hypercall_runtime_api::UnifiedEngineRequest {
267        message: order_action_msg,
268        response_tx,
269        enqueued_at: Instant::now(),
270        #[cfg(feature = "otel-tracing")]
271        trace_context: Some(tracing::Span::current().context()),
272    };
273
274    // Send order to RSM engine
275    increment_pending_requests();
276    if state.order_sender.send(engine_request).await.is_err() {
277        tracing::error!("Failed to send order {} to engine", index);
278        return BulkOrderResult {
279            index,
280            success: false,
281            data: None,
282            error: Some("Failed to send order to engine".to_string()),
283        };
284    }
285
286    // Wait for response from engine with timeout
287    match timeout(ENGINE_RESPONSE_TIMEOUT, response_rx.recv()).await {
288        Ok(Some(response)) => {
289            let success = response.status == OrderUpdateStatus::Acked
290                || response.status == OrderUpdateStatus::Open
291                || response.status == OrderUpdateStatus::Filled
292                || response.status == OrderUpdateStatus::PartiallyFilled;
293            BulkOrderResult {
294                index,
295                success,
296                data: Some(response),
297                error: None,
298            }
299        }
300        Ok(None) => {
301            tracing::error!("No response from engine for order {}", index);
302            BulkOrderResult {
303                index,
304                success: false,
305                data: None,
306                error: Some("No response from engine".to_string()),
307            }
308        }
309        Err(_) => {
310            tracing::error!("Timeout waiting for engine response for order {}", index);
311            BulkOrderResult {
312                index,
313                success: false,
314                data: None,
315                error: Some("Engine response timeout".to_string()),
316            }
317        }
318    }
319}
320
321/// Cancel multiple orders by order_id in a single request
322#[utoipa::path(
323    delete,
324    path = "/bulk_order",
325    request_body = BulkCancelOrderRequest,
326    responses(
327        (status = 200, description = "Bulk cancel results", body = BulkCancelOrderResponse),
328        (status = 400, description = "Invalid request (e.g., too many cancels)"),
329        (status = 401, description = "Unauthorized"),
330        (status = 500, description = "Internal server error")
331    ),
332    security(("eip712_signature" = [])),
333    tag = "Trading"
334)]
335pub async fn bulk_cancel_order(
336    State(state): State<AppState>,
337    SonicJson(request): SonicJson<BulkCancelOrderRequest>,
338) -> Result<SonicJson<BulkCancelOrderResponse>, ApiError> {
339    // Validate batch size
340    if request.cancels.len() > MAX_BULK_ORDER_SIZE {
341        tracing::warn!(
342            "Bulk cancel request exceeds max size: {} > {}",
343            request.cancels.len(),
344            MAX_BULK_ORDER_SIZE
345        );
346        return Err(ApiError::bad_request(format!(
347            "Bulk cancel request exceeds max size: {} > {}",
348            request.cancels.len(),
349            MAX_BULK_ORDER_SIZE
350        )));
351    }
352
353    tracing::info!(
354        "Processing bulk cancel request with {} cancels",
355        request.cancels.len()
356    );
357
358    let mut results = Vec::with_capacity(request.cancels.len());
359
360    for (index, cancel_req) in request.cancels.into_iter().enumerate() {
361        let result = process_single_cancel_by_order_id(&state, cancel_req, index).await;
362        results.push(result);
363    }
364
365    Ok(SonicJson(BulkCancelOrderResponse { results }))
366}
367
368/// Helper function to process a single cancel by order_id within a bulk request
369async fn process_single_cancel_by_order_id(
370    state: &AppState,
371    cancel_req: CancelOrderRequest,
372    index: usize,
373) -> BulkOrderResult {
374    let authorized = match verify_request(
375        state.agent_auth.as_ref(),
376        &cancel_req,
377        state.runtime_config.signing_chain_id,
378    ) {
379        Ok(authorized) => authorized,
380        Err(e) => {
381            tracing::warn!("Failed to verify bulk cancel {}: {}", index, e);
382            return BulkOrderResult {
383                index,
384                success: false,
385                data: None,
386                error: Some(e.to_string()),
387            };
388        }
389    };
390
391    // Create minimal OrderInfo for cancellation
392    let order_info = OrderInfo {
393        symbol: String::new(), // Will be looked up by order_id in the engine
394        price: dec!(0),
395        size: dec!(0),
396        side: Side::Buy, // Dummy value
397        tif: TimeInForce::GTC,
398        client_id: None,
399        order_id: Some(cancel_req.order_id),
400        is_perp: false,
401        underlying: None,
402        reduce_only: None,
403        nonce: Some(cancel_req.nonce),
404        signature: None,
405        mmp_enabled: false,
406        builder_code_address: None,
407    };
408
409    // Create OrderActionMessage with CancelOrder action
410    let order_action_msg = OrderActionMessage {
411        timestamp: get_timestamp_millis(),
412        info: order_info,
413        action: OrderAction::CancelOrder,
414        wallet: cancel_req.wallet,
415        api_wallet_address: Some(authorized.signer.signer_address),
416        mmp_triggered: false,
417        request_id: Some(Uuid::now_v7().to_string()),
418    };
419
420    // Create a channel for the response
421    let (response_tx, mut response_rx) = mpsc::channel(1);
422
423    // Create engine request
424    let engine_request = hypercall_runtime_api::UnifiedEngineRequest {
425        message: order_action_msg,
426        response_tx,
427        enqueued_at: Instant::now(),
428        #[cfg(feature = "otel-tracing")]
429        trace_context: Some(tracing::Span::current().context()),
430    };
431
432    // Send cancel request to RSM engine
433    increment_pending_requests();
434    if state.order_sender.send(engine_request).await.is_err() {
435        tracing::error!("Failed to send cancel {} to engine", index);
436        return BulkOrderResult {
437            index,
438            success: false,
439            data: None,
440            error: Some("Failed to send cancel to engine".to_string()),
441        };
442    }
443
444    // Wait for response from engine with timeout
445    match timeout(ENGINE_RESPONSE_TIMEOUT, response_rx.recv()).await {
446        Ok(Some(response)) => {
447            let success = response.status == OrderUpdateStatus::Canceled;
448            BulkOrderResult {
449                index,
450                success,
451                data: Some(response),
452                error: None,
453            }
454        }
455        Ok(None) => {
456            tracing::error!("No response from engine for cancel {}", index);
457            BulkOrderResult {
458                index,
459                success: false,
460                data: None,
461                error: Some("No response from engine".to_string()),
462            }
463        }
464        Err(_) => {
465            tracing::error!("Timeout waiting for engine response for cancel {}", index);
466            BulkOrderResult {
467                index,
468                success: false,
469                data: None,
470                error: Some("Engine response timeout".to_string()),
471            }
472        }
473    }
474}
475
476/// Cancel multiple orders by client_id in a single request
477#[utoipa::path(
478    delete,
479    path = "/bulk_order_cloid",
480    request_body = BulkCancelOrderByCloidRequest,
481    responses(
482        (status = 200, description = "Bulk cancel results", body = BulkCancelOrderResponse),
483        (status = 400, description = "Invalid request (e.g., too many cancels)"),
484        (status = 401, description = "Unauthorized"),
485        (status = 500, description = "Internal server error")
486    ),
487    security(("eip712_signature" = [])),
488    tag = "Trading"
489)]
490pub async fn bulk_cancel_order_by_cloid(
491    State(state): State<AppState>,
492    SonicJson(request): SonicJson<BulkCancelOrderByCloidRequest>,
493) -> Result<SonicJson<BulkCancelOrderResponse>, ApiError> {
494    // Validate batch size
495    if request.cancels.len() > MAX_BULK_ORDER_SIZE {
496        tracing::warn!(
497            "Bulk cancel by cloid request exceeds max size: {} > {}",
498            request.cancels.len(),
499            MAX_BULK_ORDER_SIZE
500        );
501        return Err(ApiError::bad_request(format!(
502            "Bulk cancel by cloid request exceeds max size: {} > {}",
503            request.cancels.len(),
504            MAX_BULK_ORDER_SIZE
505        )));
506    }
507
508    tracing::info!(
509        "Processing bulk cancel by cloid request with {} cancels",
510        request.cancels.len()
511    );
512
513    let mut results = Vec::with_capacity(request.cancels.len());
514
515    for (index, cancel_req) in request.cancels.into_iter().enumerate() {
516        let result = process_single_cancel_by_cloid(&state, cancel_req, index).await;
517        results.push(result);
518    }
519
520    Ok(SonicJson(BulkCancelOrderResponse { results }))
521}
522
523/// Helper function to process a single cancel by client_id within a bulk request
524async fn process_single_cancel_by_cloid(
525    state: &AppState,
526    cancel_req: CancelOrderByCloidRequest,
527    index: usize,
528) -> BulkOrderResult {
529    let authorized = match verify_request(
530        state.agent_auth.as_ref(),
531        &cancel_req,
532        state.runtime_config.signing_chain_id,
533    ) {
534        Ok(authorized) => authorized,
535        Err(e) => {
536            tracing::warn!("Failed to verify bulk cancel by cloid {}: {}", index, e);
537            return BulkOrderResult {
538                index,
539                success: false,
540                data: None,
541                error: Some(e.to_string()),
542            };
543        }
544    };
545
546    // Create minimal OrderInfo for cancellation
547    let order_info = OrderInfo {
548        symbol: String::new(), // Will be looked up by client_id in the engine
549        price: dec!(0),
550        size: dec!(0),
551        side: Side::Buy, // Dummy value
552        tif: TimeInForce::GTC,
553        client_id: Some(cancel_req.client_id.clone()),
554        order_id: None,
555        is_perp: false,
556        underlying: None,
557        reduce_only: None,
558        nonce: Some(cancel_req.nonce),
559        signature: None,
560        mmp_enabled: false,
561        builder_code_address: None,
562    };
563
564    // Create OrderActionMessage with CancelOrder action
565    let order_action_msg = OrderActionMessage {
566        timestamp: get_timestamp_millis(),
567        info: order_info,
568        action: OrderAction::CancelOrder,
569        wallet: cancel_req.wallet,
570        api_wallet_address: Some(authorized.signer.signer_address),
571        mmp_triggered: false,
572        request_id: Some(Uuid::now_v7().to_string()),
573    };
574
575    // Create a channel for the response
576    let (response_tx, mut response_rx) = mpsc::channel(1);
577
578    // Create engine request
579    let engine_request = hypercall_runtime_api::UnifiedEngineRequest {
580        message: order_action_msg,
581        response_tx,
582        enqueued_at: Instant::now(),
583        #[cfg(feature = "otel-tracing")]
584        trace_context: Some(tracing::Span::current().context()),
585    };
586
587    // Send cancel request to RSM engine
588    increment_pending_requests();
589    if state.order_sender.send(engine_request).await.is_err() {
590        tracing::error!("Failed to send cancel by cloid {} to engine", index);
591        return BulkOrderResult {
592            index,
593            success: false,
594            data: None,
595            error: Some("Failed to send cancel to engine".to_string()),
596        };
597    }
598
599    // Wait for response from engine with timeout
600    match timeout(ENGINE_RESPONSE_TIMEOUT, response_rx.recv()).await {
601        Ok(Some(response)) => {
602            let success = response.status == OrderUpdateStatus::Canceled;
603            BulkOrderResult {
604                index,
605                success,
606                data: Some(response),
607                error: None,
608            }
609        }
610        Ok(None) => {
611            tracing::error!("No response from engine for cancel by cloid {}", index);
612            BulkOrderResult {
613                index,
614                success: false,
615                data: None,
616                error: Some("No response from engine".to_string()),
617            }
618        }
619        Err(_) => {
620            tracing::error!(
621                "Timeout waiting for engine response for cancel by cloid {}",
622                index
623            );
624            BulkOrderResult {
625                index,
626                success: false,
627                data: None,
628                error: Some("Engine response timeout".to_string()),
629            }
630        }
631    }
632}