1pub mod event_forwarder;
2pub mod rfq;
3
4use crate::boundary::read_models::PortfolioCacheApi;
5use crate::options_chain::{parse_expiry_date_to_code, OptionsChainOptionTypeFilter};
6use crate::rfq::handler_state::RfqHandlerState;
7use crate::rfq::indicative_quote_cache::IndicativeQuoteCache;
8use axum::{
9 extract::{
10 ws::{close_code, CloseFrame, Message, WebSocket, WebSocketUpgrade},
11 Query, State,
12 },
13 response::Response,
14};
15use dashmap::DashMap;
16use futures::{SinkExt, StreamExt};
17use hypercall_types::api_models::OptionsChainStrikeRow;
18pub use hypercall_types::ws_protocol::{
19 IndexPriceEntry, PortfolioUpdate, WsCandleUpdate, WsCompetitionFinalStanding,
20 WsCompetitionFinalStats, WsCompetitionGapUpdate, WsCompetitionPnlStanding,
21 WsCompetitionPnlSummary, WsCompetitionRankChange, WsCompetitionUpdate, WsFillUpdate,
22 WsIndexPriceUpdate, WsIndicativeMarketData, WsLiquidationStateChange, WsMarketUpdate,
23 WsMessage, WsOptionsChainUpdate, WsOrderMessage, WsOrderResult, WsOrderbookUpdate,
24 WsPositionExpired, WsRfqLegRequest, WsRfqQuoteEntry, WsRfqQuotes, WsRfqStatusUpdate,
25 WsTradeUpdate,
26};
27use hypercall_types::{Side, WalletAddress};
28use metrics::{counter, gauge, histogram};
29use rust_decimal::Decimal;
30use serde::Deserialize;
31use std::collections::HashSet;
32use std::str::FromStr;
33use std::sync::Arc;
34use tokio::sync::{mpsc, RwLock};
35use tokio::time::{self, Duration, Instant, MissedTickBehavior};
36use tracing::{debug, warn};
37
38const AUTHENTICATED_CHANNELS: &[&str] = &[
41 "order_updates",
42 "fills",
43 "portfolio",
44 "liquidation",
45 "competition",
46 "competition_engagement",
47 "rfq",
48];
49const DEFAULT_WS_HEARTBEAT_INTERVAL: Duration = Duration::from_secs(20);
50const DEFAULT_WS_PONG_TIMEOUT: Duration = Duration::from_secs(60);
51
52fn validate_ws_heartbeat_config(
53 heartbeat_interval_ms: u64,
54 pong_timeout_ms: u64,
55) -> (Duration, Duration) {
56 if heartbeat_interval_ms == 0 || pong_timeout_ms == 0 {
57 warn!("WebSocket heartbeat settings must be positive, using defaults");
58 return (DEFAULT_WS_HEARTBEAT_INTERVAL, DEFAULT_WS_PONG_TIMEOUT);
59 }
60
61 let heartbeat_interval = Duration::from_millis(heartbeat_interval_ms);
62 let pong_timeout = Duration::from_millis(pong_timeout_ms);
63
64 if pong_timeout < heartbeat_interval {
65 warn!(
66 heartbeat_interval_ms,
67 pong_timeout_ms,
68 "WebSocket pong timeout must be greater than or equal to heartbeat interval, using defaults"
69 );
70 return (DEFAULT_WS_HEARTBEAT_INTERVAL, DEFAULT_WS_PONG_TIMEOUT);
71 }
72
73 (heartbeat_interval, pong_timeout)
74}
75
76pub fn ws_heartbeat_config(
77 api_config: &hypercall_config::ApiRuntimeConfig,
78) -> (Duration, Duration) {
79 validate_ws_heartbeat_config(
80 api_config.ws_heartbeat_interval_ms,
81 api_config.ws_pong_timeout_ms,
82 )
83}
84
85fn requires_authentication(channel: &str) -> bool {
87 AUTHENTICATED_CHANNELS.contains(&channel)
88}
89
90#[derive(Debug, Clone, PartialEq, Eq)]
91struct OptionsChainSubscription {
92 symbols: Option<Vec<String>>,
93 expiry: Option<u64>,
97 option_type: OptionsChainOptionTypeFilter,
98}
99
100impl Default for OptionsChainSubscription {
101 fn default() -> Self {
102 Self {
103 symbols: None,
104 expiry: None,
105 option_type: OptionsChainOptionTypeFilter::Both,
106 }
107 }
108}
109
110fn normalize_symbols(symbols: Option<Vec<String>>) -> Option<Vec<String>> {
111 let values = symbols?;
112 let mut deduped: Vec<String> = values
113 .into_iter()
114 .map(|value| value.trim().to_ascii_uppercase())
115 .filter(|value| !value.is_empty())
116 .collect();
117
118 if deduped.is_empty() {
119 return None;
120 }
121
122 deduped.sort();
123 deduped.dedup();
124 Some(deduped)
125}
126
127fn parse_options_chain_subscription(
128 symbols: Option<Vec<String>>,
129 expiry: Option<String>,
130 option_type: Option<String>,
131) -> Result<OptionsChainSubscription, String> {
132 let expiry = match expiry {
133 Some(value) if value.trim().is_empty() => None,
134 Some(value) => Some(parse_expiry_date_to_code(value.trim())?),
135 None => None,
136 };
137
138 Ok(OptionsChainSubscription {
139 symbols: normalize_symbols(symbols),
140 expiry,
141 option_type: OptionsChainOptionTypeFilter::parse(option_type.as_deref())?,
142 })
143}
144
145fn options_chain_update_symbols(update: &WsOptionsChainUpdate) -> HashSet<String> {
146 match update {
147 WsOptionsChainUpdate::Upsert { row, .. } => {
148 let mut symbols = HashSet::new();
149 if let Some(call) = &row.call {
150 symbols.insert(call.symbol.to_ascii_uppercase());
151 }
152 if let Some(put) = &row.put {
153 symbols.insert(put.symbol.to_ascii_uppercase());
154 }
155 symbols
156 }
157 WsOptionsChainUpdate::Remove { symbol, .. } => HashSet::from([symbol.to_ascii_uppercase()]),
158 }
159}
160
161fn expiry_timestamp_to_code(expiry_ts_secs: u64) -> Option<u64> {
163 use chrono::Datelike;
164 let timestamp = i64::try_from(expiry_ts_secs).ok()?;
165 let date = chrono::DateTime::from_timestamp(timestamp, 0)?.date_naive();
166 Some((date.year() as u64) * 10_000 + (date.month() as u64) * 100 + date.day() as u64)
167}
168
169fn options_chain_update_matches(
170 update: &WsOptionsChainUpdate,
171 subscription: &OptionsChainSubscription,
172) -> bool {
173 if let Some(expected_expiry_code) = subscription.expiry {
174 let update_expiry = match update {
175 WsOptionsChainUpdate::Upsert { expiry, .. } => *expiry,
176 WsOptionsChainUpdate::Remove { expiry, .. } => *expiry,
177 };
178 if expiry_timestamp_to_code(update_expiry) != Some(expected_expiry_code) {
182 return false;
183 }
184 }
185
186 if let Some(symbols_filter) = &subscription.symbols {
187 let symbols_set: HashSet<String> = symbols_filter.iter().cloned().collect();
188 let update_symbols = options_chain_update_symbols(update);
189 if symbols_set.is_disjoint(&update_symbols) {
190 return false;
191 }
192 }
193
194 match update {
195 WsOptionsChainUpdate::Upsert { row, .. } => {
196 let has_call = row.call.is_some();
197 let has_put = row.put.is_some();
198 match subscription.option_type {
199 OptionsChainOptionTypeFilter::Both => has_call || has_put,
200 OptionsChainOptionTypeFilter::Call => has_call,
201 OptionsChainOptionTypeFilter::Put => has_put,
202 }
203 }
204 WsOptionsChainUpdate::Remove { option_type, .. } => {
205 subscription.option_type.allows_option_type(option_type)
206 }
207 }
208}
209
210fn apply_option_type_filter_to_row(
211 mut row: OptionsChainStrikeRow,
212 option_type: OptionsChainOptionTypeFilter,
213) -> Option<OptionsChainStrikeRow> {
214 if row.call.is_some() && row.put.is_some() {
215 warn!(
216 strike = row.strike,
217 "options_chain upsert contained both call and put legs; pruning by subscription filter"
218 );
219 }
220
221 match option_type {
222 OptionsChainOptionTypeFilter::Both => {}
223 OptionsChainOptionTypeFilter::Call => row.put = None,
224 OptionsChainOptionTypeFilter::Put => row.call = None,
225 }
226
227 if row.call.is_none() && row.put.is_none() {
228 return None;
229 }
230
231 Some(row)
232}
233
234fn apply_options_chain_filters_to_update(
235 update: &WsOptionsChainUpdate,
236 option_type_filter: OptionsChainOptionTypeFilter,
237) -> Option<WsOptionsChainUpdate> {
238 match update {
239 WsOptionsChainUpdate::Upsert {
240 currency,
241 expiry,
242 row,
243 timestamp,
244 } => {
245 let row = apply_option_type_filter_to_row(row.clone(), option_type_filter)?;
246
247 Some(WsOptionsChainUpdate::Upsert {
248 currency: currency.clone(),
249 expiry: *expiry,
250 row,
251 timestamp: *timestamp,
252 })
253 }
254 WsOptionsChainUpdate::Remove {
255 currency,
256 expiry,
257 strike,
258 option_type,
259 symbol,
260 timestamp,
261 } => {
262 if !option_type_filter.allows_option_type(option_type) {
263 return None;
264 }
265
266 Some(WsOptionsChainUpdate::Remove {
267 currency: currency.clone(),
268 expiry: *expiry,
269 strike: *strike,
270 option_type: option_type.clone(),
271 symbol: symbol.clone(),
272 timestamp: *timestamp,
273 })
274 }
275 }
276}
277
278fn effective_option_type_for_matching_subscriptions(
279 subscriptions: &[OptionsChainSubscription],
280 update: &WsOptionsChainUpdate,
281) -> Option<OptionsChainOptionTypeFilter> {
282 let mut has_call = false;
283 let mut has_put = false;
284 let mut has_both = false;
285
286 for subscription in subscriptions {
287 if !options_chain_update_matches(update, subscription) {
288 continue;
289 }
290
291 match subscription.option_type {
292 OptionsChainOptionTypeFilter::Call => has_call = true,
293 OptionsChainOptionTypeFilter::Put => has_put = true,
294 OptionsChainOptionTypeFilter::Both => has_both = true,
295 }
296 }
297
298 if has_both || (has_call && has_put) {
299 Some(OptionsChainOptionTypeFilter::Both)
300 } else if has_call {
301 Some(OptionsChainOptionTypeFilter::Call)
302 } else if has_put {
303 Some(OptionsChainOptionTypeFilter::Put)
304 } else {
305 None
306 }
307}
308
309type ClientId = uuid::Uuid;
310
311struct Client {
312 sender: mpsc::UnboundedSender<Arc<WsMessage>>,
313 subscriptions: Vec<String>,
314 options_chain_subscriptions: Vec<OptionsChainSubscription>,
315 order_update_symbols: HashSet<String>,
318 public_channel_symbols: HashSet<String>,
321 authenticated: bool,
322 wallet_address: Option<WalletAddress>,
323}
324
325#[derive(Clone)]
326pub struct PubSubManager {
327 clients: Arc<DashMap<ClientId, Client>>,
328 channel_subscribers: Arc<DashMap<String, Vec<ClientId>>>,
329}
330
331impl Default for PubSubManager {
332 fn default() -> Self {
333 Self::new()
334 }
335}
336
337impl PubSubManager {
338 pub fn new() -> Self {
339 Self {
340 clients: Arc::new(DashMap::new()),
341 channel_subscribers: Arc::new(DashMap::new()),
342 }
343 }
344
345 pub fn add_client(
346 &self,
347 client_id: ClientId,
348 sender: mpsc::UnboundedSender<Arc<WsMessage>>,
349 authenticated: bool,
350 wallet_address: Option<WalletAddress>,
351 ) {
352 let client = Client {
353 sender,
354 subscriptions: Vec::new(),
355 options_chain_subscriptions: Vec::new(),
356 order_update_symbols: HashSet::new(),
357 public_channel_symbols: HashSet::new(),
358 authenticated,
359 wallet_address,
360 };
361 let prev = self.clients.insert(client_id, client);
362 if prev.is_none() {
364 gauge!("ht_ws_connections_active").increment(1.0);
365 counter!("ht_ws_connections_total").increment(1);
366 }
367 }
368
369 pub fn authenticate_client(
371 &self,
372 client_id: ClientId,
373 wallet_address: WalletAddress,
374 ) -> Result<(), String> {
375 let mut client = self.clients.get_mut(&client_id).ok_or("Client not found")?;
376 client.authenticated = true;
377 client.wallet_address = Some(wallet_address);
378 Ok(())
379 }
380
381 pub fn remove_client(&self, client_id: ClientId) {
382 if let Some((_, client)) = self.clients.remove(&client_id) {
383 gauge!("ht_ws_connections_active").decrement(1.0);
385 counter!("ht_ws_disconnections_total").increment(1);
386
387 for channel in &client.subscriptions {
388 if let Some(mut subs) = self.channel_subscribers.get_mut(channel) {
389 subs.retain(|id| id != &client_id);
390 if subs.is_empty() {
391 drop(subs);
393 self.channel_subscribers.remove(channel);
394 }
395 }
396 gauge!("ht_ws_channel_subscribers", "channel" => channel.clone()).decrement(1.0);
397 }
398 }
399 }
400
401 pub fn subscribe(
402 &self,
403 client_id: ClientId,
404 channel: String,
405 symbols: Option<Vec<String>>,
406 expiry: Option<String>,
407 option_type: Option<String>,
408 ) -> Result<(), String> {
409 let mut client = self.clients.get_mut(&client_id).ok_or("Client not found")?;
411
412 if requires_authentication(&channel) && !client.authenticated {
414 return Err("Authentication required for this channel".to_string());
415 }
416
417 if channel == "options_chain" {
418 let subscription =
419 parse_options_chain_subscription(symbols.clone(), expiry, option_type)?;
420
421 if !client.options_chain_subscriptions.contains(&subscription) {
422 client.options_chain_subscriptions.push(subscription);
423 }
424 }
425
426 if (channel == "order_updates" || channel == "fills") && symbols.is_some() {
430 if let Some(syms) = &symbols {
431 for sym in syms {
432 let normalized = sym.trim().to_uppercase();
433 if !normalized.is_empty() {
434 let underlying = normalized
437 .split('-')
438 .next()
439 .unwrap_or(&normalized)
440 .to_string();
441 client.order_update_symbols.insert(underlying);
442 }
443 }
444 }
445 }
446
447 if (channel == "orderbook" || channel == "trades" || channel == "indicative_market_data")
449 && symbols.is_some()
450 {
451 if let Some(syms) = &symbols {
452 for sym in syms {
453 let normalized = sym.trim().to_uppercase();
454 if !normalized.is_empty() {
455 client.public_channel_symbols.insert(normalized);
456 }
457 }
458 }
459 }
460
461 if client.subscriptions.contains(&channel) {
463 return Ok(());
464 }
465
466 client.subscriptions.push(channel.clone());
468
469 drop(client);
471
472 let mut subs = self.channel_subscribers.entry(channel.clone()).or_default();
474 if !subs.contains(&client_id) {
475 subs.push(client_id);
476 }
477
478 gauge!("ht_ws_channel_subscribers", "channel" => channel).set(subs.len() as f64);
479
480 Ok(())
481 }
482
483 pub fn unsubscribe(
484 &self,
485 client_id: ClientId,
486 channel: String,
487 symbols: Option<Vec<String>>,
488 expiry: Option<String>,
489 option_type: Option<String>,
490 ) -> Result<(), String> {
491 let mut client = self.clients.get_mut(&client_id).ok_or("Client not found")?;
492
493 if channel == "order_updates" || channel == "fills" {
496 if let Some(syms) = &symbols {
497 for sym in syms {
498 let normalized = sym.trim().to_uppercase();
499 let underlying = normalized
500 .split('-')
501 .next()
502 .unwrap_or(&normalized)
503 .to_string();
504 client.order_update_symbols.remove(&underlying);
505 }
506 if !client.order_update_symbols.is_empty() {
509 return Ok(());
510 }
511 } else {
512 client.order_update_symbols.clear();
514 }
515 }
516
517 if channel == "orderbook" || channel == "trades" || channel == "indicative_market_data" {
519 if let Some(syms) = &symbols {
520 for sym in syms {
521 let normalized = sym.trim().to_uppercase();
522 client.public_channel_symbols.remove(&normalized);
523 }
524 if !client.public_channel_symbols.is_empty() {
525 return Ok(());
526 }
527 } else {
528 client.public_channel_symbols.clear();
529 }
530 }
531
532 if channel == "options_chain" {
533 let normalized_symbols = normalize_symbols(symbols);
534 let normalized_expiry = expiry.and_then(|value| {
535 let trimmed = value.trim();
536 if trimmed.is_empty() {
537 None
538 } else {
539 Some(trimmed.to_string())
540 }
541 });
542 let normalized_option_type = option_type.and_then(|value| {
543 let trimmed = value.trim();
544 if trimmed.is_empty() {
545 None
546 } else {
547 Some(trimmed.to_string())
548 }
549 });
550
551 if normalized_symbols.is_none()
552 && normalized_expiry.is_none()
553 && normalized_option_type.is_none()
554 {
555 client.options_chain_subscriptions.clear();
556 } else {
557 let subscription = parse_options_chain_subscription(
558 normalized_symbols,
559 normalized_expiry,
560 normalized_option_type,
561 )?;
562 client
563 .options_chain_subscriptions
564 .retain(|existing| existing != &subscription);
565 }
566
567 if !client.options_chain_subscriptions.is_empty() {
568 return Ok(());
569 }
570 }
571
572 client.subscriptions.retain(|ch| ch != &channel);
574
575 drop(client);
577
578 let mut subscriber_count = 0;
580 if let Some(mut subs) = self.channel_subscribers.get_mut(&channel) {
581 subs.retain(|id| id != &client_id);
582 subscriber_count = subs.len();
583 if subs.is_empty() {
584 drop(subs);
585 self.channel_subscribers.remove(&channel);
586 }
587 }
588
589 gauge!("ht_ws_channel_subscribers", "channel" => channel).set(subscriber_count as f64);
590
591 Ok(())
592 }
593
594 pub fn publish_to_channel(&self, channel: &str, message: WsMessage) {
595 debug!(
596 "Publishing ws message to channel: {:?}: {:?}",
597 channel, message
598 );
599 counter!("ht_ws_messages_sent_total", "channel" => channel.to_string()).increment(1);
600
601 let start = Instant::now();
602 let msg = Arc::new(message);
603 let mut fan_out: u64 = 0;
604
605 if let Some(subscriber_ids) = self.channel_subscribers.get(channel) {
606 let is_authenticated = requires_authentication(channel);
607
608 for client_id in subscriber_ids.iter() {
609 if let Some(client) = self.clients.get(client_id) {
610 if is_authenticated {
611 let should_send = match msg.as_ref() {
612 WsMessage::OrderUpdate(order) => {
613 let wallet_match =
614 client.wallet_address.as_ref() == Some(&order.wallet_address);
615 let symbol_match = client.order_update_symbols.is_empty()
616 || order
617 .request
618 .symbol
619 .split('-')
620 .next()
621 .map(|u| {
622 client.order_update_symbols.contains(&u.to_uppercase())
623 })
624 .unwrap_or(true);
625 wallet_match && symbol_match
626 }
627 WsMessage::Fill(fill) => {
628 let wallet_match =
629 client.wallet_address.as_ref() == Some(&fill.wallet_address);
630 let symbol_match = client.order_update_symbols.is_empty()
631 || fill
632 .symbol
633 .split('-')
634 .next()
635 .map(|u| {
636 client.order_update_symbols.contains(&u.to_uppercase())
637 })
638 .unwrap_or(true);
639 wallet_match && symbol_match
640 }
641 WsMessage::PositionExpired(expired) => {
642 client.wallet_address.as_ref() == Some(&expired.wallet_address)
643 }
644 WsMessage::LiquidationStateChange(state_change) => {
645 client.wallet_address.as_ref() == Some(&state_change.wallet_address)
646 }
647 WsMessage::CompetitionUpdate(update) => {
648 client.wallet_address.as_ref() == Some(&update.wallet_address)
649 }
650 WsMessage::CompetitionPnlSummary(summary) => {
651 client.wallet_address.as_ref() == Some(&summary.wallet_address)
652 }
653 WsMessage::CompetitionFinalStats(final_stats) => {
654 client.wallet_address.as_ref() == Some(&final_stats.wallet_address)
655 }
656 WsMessage::CompetitionRankChange(change) => {
657 client.wallet_address.as_ref() == Some(&change.wallet_address)
658 }
659 WsMessage::CompetitionGapUpdate(gap) => {
660 client.wallet_address.as_ref() == Some(&gap.wallet_address)
661 }
662 WsMessage::CompetitionFinalStanding(final_standing) => {
663 client.wallet_address.as_ref()
664 == Some(&final_standing.wallet_address)
665 }
666 WsMessage::RfqQuotes(rfq) => {
667 client.wallet_address.as_ref() == Some(&rfq.taker_wallet)
668 }
669 WsMessage::RfqStatusUpdate(rfq) => {
670 client.wallet_address.as_ref() == Some(&rfq.taker_wallet)
671 }
672 WsMessage::RfqAcceptResult {
673 taker_wallet: Some(ref tw),
674 ..
675 } => client.wallet_address.as_ref() == Some(tw),
676 _ => true,
677 };
678
679 if should_send {
680 let _ = client.sender.send(Arc::clone(&msg));
681 fan_out += 1;
682 }
683 } else if channel == "options_chain" {
684 if let WsMessage::OptionsChainUpdate(update) = msg.as_ref() {
685 let effective_option_type =
686 effective_option_type_for_matching_subscriptions(
687 &client.options_chain_subscriptions,
688 update,
689 );
690 if let Some(option_type) = effective_option_type {
691 if let Some(filtered_update) =
692 apply_options_chain_filters_to_update(update, option_type)
693 {
694 let _ = client.sender.send(Arc::new(
695 WsMessage::OptionsChainUpdate(filtered_update),
696 ));
697 fan_out += 1;
698 }
699 }
700 }
701 } else {
702 let passes_filter = if client.public_channel_symbols.is_empty() {
704 true
705 } else {
706 match msg.as_ref() {
707 WsMessage::OrderbookUpdate(update) => client
708 .public_channel_symbols
709 .contains(&update.symbol.to_uppercase()),
710 WsMessage::Trade(trade) => client
711 .public_channel_symbols
712 .contains(&trade.symbol.to_uppercase()),
713 WsMessage::IndicativeMarketData(update) => client
714 .public_channel_symbols
715 .contains(&update.instrument.to_uppercase()),
716 _ => true,
717 }
718 };
719 if passes_filter {
720 let _ = client.sender.send(Arc::clone(&msg));
721 fan_out += 1;
722 }
723 }
724 }
725 }
726 }
727
728 let duration_seconds = start.elapsed().as_secs_f64();
729 histogram!("ht_ws_publish_fan_out", "channel" => channel.to_string())
730 .record(fan_out as f64);
731 histogram!("ht_ws_publish_duration_seconds", "channel" => channel.to_string())
732 .record(duration_seconds);
733 }
734
735 pub async fn active_channels_with_prefix(&self, prefix: &str) -> Vec<String> {
736 let mut channels: Vec<String> = self
737 .channel_subscribers
738 .iter()
739 .map(|entry| entry.key().clone())
740 .filter(|channel| channel.starts_with(prefix))
741 .collect();
742 channels.sort_unstable();
743 channels
744 }
745
746 pub fn send_to_client(&self, client_id: ClientId, message: WsMessage) {
747 if let Some(client) = self.clients.get(&client_id) {
748 let _ = client.sender.send(Arc::new(message));
749 }
750 }
751
752 pub fn publish_orderbook_update(&self, update: WsOrderbookUpdate) {
753 self.publish_to_channel("orderbook", WsMessage::OrderbookUpdate(update));
754 }
755
756 pub fn publish_order_update(&self, order: WsOrderMessage) {
757 self.publish_to_channel("order_updates", WsMessage::OrderUpdate(order));
758 }
759
760 pub fn publish_fill(&self, fill: WsFillUpdate) {
761 self.publish_to_channel("fills", WsMessage::Fill(fill));
762 }
763
764 pub fn publish_market_update(&self, update: WsMarketUpdate) {
765 self.publish_to_channel("market_updates", WsMessage::MarketUpdate(update));
766 }
767
768 pub fn publish_trade(&self, trade: WsTradeUpdate) {
769 self.publish_to_channel("trades", WsMessage::Trade(trade));
770 }
771
772 pub fn publish_options_chain_update(&self, update: WsOptionsChainUpdate) {
773 self.publish_to_channel("options_chain", WsMessage::OptionsChainUpdate(update));
774 }
775
776 pub fn publish_index_prices(&self, update: WsIndexPriceUpdate) {
777 self.publish_to_channel("index_prices", WsMessage::IndexPriceUpdate(update));
778 }
779
780 pub fn publish_indicative_market_data(&self, update: WsIndicativeMarketData) {
781 self.publish_to_channel(
782 "indicative_market_data",
783 WsMessage::IndicativeMarketData(update),
784 );
785 }
786
787 pub fn publish_position_expired(&self, expired: WsPositionExpired) {
788 self.publish_to_channel("portfolio", WsMessage::PositionExpired(expired));
789 }
790
791 pub fn publish_liquidation_state_change(&self, state_change: WsLiquidationStateChange) {
792 self.publish_to_channel(
793 "liquidation",
794 WsMessage::LiquidationStateChange(state_change),
795 );
796 }
797
798 pub async fn publish_competition_update(&self, update: WsCompetitionUpdate) {
799 self.publish_to_channel("competition", WsMessage::CompetitionUpdate(update));
800 }
801
802 pub async fn publish_competition_pnl_summary(&self, summary: WsCompetitionPnlSummary) {
803 self.publish_to_channel("competition", WsMessage::CompetitionPnlSummary(summary));
804 }
805
806 pub async fn publish_competition_final_stats(&self, final_stats: WsCompetitionFinalStats) {
807 self.publish_to_channel("competition", WsMessage::CompetitionFinalStats(final_stats));
808 }
809
810 pub async fn publish_competition_rank_change(&self, update: WsCompetitionRankChange) {
811 self.publish_to_channel(
812 "competition_engagement",
813 WsMessage::CompetitionRankChange(update),
814 );
815 }
816
817 pub async fn publish_competition_gap_update(&self, update: WsCompetitionGapUpdate) {
818 self.publish_to_channel(
819 "competition_engagement",
820 WsMessage::CompetitionGapUpdate(update),
821 );
822 }
823
824 pub async fn publish_competition_final_standing(&self, update: WsCompetitionFinalStanding) {
825 self.publish_to_channel(
826 "competition_engagement",
827 WsMessage::CompetitionFinalStanding(update),
828 );
829 }
830
831 pub fn publish_rfq_quotes(&self, quotes: WsRfqQuotes) {
832 self.publish_to_channel("rfq", WsMessage::RfqQuotes(quotes));
833 }
834
835 pub async fn subscribed_wallets(&self, channel: &str) -> Vec<WalletAddress> {
837 let mut wallets = HashSet::new();
838
839 if let Some(subscriber_ids) = self.channel_subscribers.get(channel) {
840 for client_id in subscriber_ids.iter() {
841 if let Some(client) = self.clients.get(client_id) {
842 if let Some(wallet) = client.wallet_address {
843 wallets.insert(wallet);
844 }
845 }
846 }
847 }
848
849 wallets.into_iter().collect()
850 }
851}
852
853impl crate::rfq::qp_ws_state::RfqWebsocketPublisher for PubSubManager {
854 fn publish_indicative_market_data(&self, update: WsIndicativeMarketData) {
855 PubSubManager::publish_indicative_market_data(self, update);
856 }
857
858 fn publish_rfq_quotes(&self, quotes: WsRfqQuotes) {
859 PubSubManager::publish_rfq_quotes(self, quotes);
860 }
861
862 fn publish_to_channel(&self, channel: &str, message: WsMessage) {
863 PubSubManager::publish_to_channel(self, channel, message);
864 }
865}
866
867#[derive(Clone)]
868pub struct WsState {
869 pub pubsub: PubSubManager,
870 pub indicative_cache: Option<Arc<IndicativeQuoteCache>>,
871 pub portfolio_cache: Option<Arc<dyn PortfolioCacheApi>>,
872 pub competition_service: Option<Arc<hypercall_competition::CompetitionService>>,
873 pub heartbeat_config: (Duration, Duration),
874 pub rfq_handler_state: Option<RfqHandlerState>,
875 pub order_sender: Option<mpsc::Sender<hypercall_runtime_api::UnifiedEngineRequest>>,
876 pub trading_halt: Option<Arc<RwLock<crate::trading_halt::TradingHaltState>>>,
877 pub agent_auth: Arc<dyn hypercall_runtime_api::AgentAuthProvider>,
878 pub signing_chain_id: u64,
879}
880
881#[derive(Deserialize)]
882pub struct WsQuery {
883 pub wallet: Option<String>,
884}
885
886pub async fn websocket_handler(
887 ws: WebSocketUpgrade,
888 State(state): State<WsState>,
889 Query(query): Query<WsQuery>,
890) -> Response {
891 let (authenticated, wallet_address) = if let Some(wallet) = query.wallet {
895 match WalletAddress::from_str(&wallet) {
898 Ok(addr) => (true, Some(addr)),
899 Err(_) => {
900 tracing::warn!(
901 "Invalid wallet address format in WebSocket query: {}",
902 wallet
903 );
904 (false, None)
905 }
906 }
907 } else {
908 (false, None)
910 };
911
912 ws.on_upgrade(move |socket| handle_socket(socket, state, authenticated, wallet_address))
913}
914
915async fn handle_socket(
916 socket: WebSocket,
917 state: WsState,
918 authenticated: bool,
919 wallet_address: Option<WalletAddress>,
920) {
921 let (heartbeat_interval, pong_timeout) = state.heartbeat_config;
922 let (mut sender, mut receiver) = socket.split();
923 let client_id = uuid::Uuid::new_v4();
924
925 let (tx, mut rx) = mpsc::unbounded_channel::<Arc<WsMessage>>();
926
927 let mut wallet_addr_clone = wallet_address;
929 let mut authenticated = authenticated;
930 state
931 .pubsub
932 .add_client(client_id, tx.clone(), authenticated, wallet_address);
933
934 let mut portfolio_task = None;
936 let mut portfolio_subscription: Option<(WalletAddress, u64)> = None;
937 if authenticated && wallet_addr_clone.is_some() && state.portfolio_cache.is_some() {
938 }
940
941 let mut heartbeat = time::interval(heartbeat_interval);
942 heartbeat.set_missed_tick_behavior(MissedTickBehavior::Delay);
943 heartbeat.tick().await;
944 let mut last_pong = Instant::now();
945
946 loop {
947 tokio::select! {
948 maybe_msg = rx.recv() => {
949 let Some(msg) = maybe_msg else {
950 break;
951 };
952
953 if let Ok(json) = sonic_rs::to_string(msg.as_ref()) {
954 debug!("Sending message to client: {:?}", msg);
955 if sender.send(Message::Text(json)).await.is_err() {
956 break;
957 }
958 }
959 }
960 maybe_frame = receiver.next() => {
961 let Some(frame_result) = maybe_frame else {
962 break;
963 };
964
965 let Ok(frame) = frame_result else {
966 break;
967 };
968
969 match frame {
970 Message::Text(text) => {
971 let deser_result = sonic_rs::from_str::<WsMessage>(&text);
972 if deser_result.is_err() && (text.contains("SubmitAutoExecuteRfq") || text.contains("SubmitRfq")) {
973 tracing::warn!(
974 client_id = ?client_id,
975 error = %deser_result.as_ref().unwrap_err(),
976 msg_len = text.len(),
977 "Failed to deserialize RFQ WS message"
978 );
979 }
980 if let Ok(ws_msg) = deser_result {
981 match ws_msg {
982 WsMessage::Subscribe {
983 channel,
984 symbols,
985 expiry,
986 option_type,
987 } => {
988 match state.pubsub.subscribe(
989 client_id,
990 channel.clone(),
991 symbols.clone(),
992 expiry,
993 option_type,
994 ) {
995 Ok(_) => {
996 if channel == "portfolio"
997 && authenticated
998 && portfolio_subscription.is_none()
999 {
1000 if let Some(ref wallet) = wallet_addr_clone {
1001 if let Some(ref cache) = state.portfolio_cache {
1002 let (subscriber_id, mut portfolio_rx) =
1003 cache.subscribe(*wallet).await;
1004 let tx_clone = tx.clone();
1005 portfolio_subscription =
1006 Some((*wallet, subscriber_id));
1007 portfolio_task =
1008 Some(tokio::spawn(async move {
1009 while let Some(update) =
1010 portfolio_rx.recv().await
1011 {
1012 if tx_clone
1013 .send(Arc::new(
1014 WsMessage::PortfolioUpdate(
1015 update,
1016 ),
1017 ))
1018 .is_err()
1019 {
1020 break;
1021 }
1022 }
1023 }));
1024 }
1025 }
1026 }
1027
1028 state.pubsub.send_to_client(
1029 client_id,
1030 WsMessage::Subscribed {
1031 channel: channel.clone(),
1032 },
1033 );
1034
1035 if channel == "indicative_market_data" {
1036 if let Some(ref cache) = state.indicative_cache {
1037 let symbol_filter = normalize_symbols(symbols);
1038 let mut snapshot_count = 0u64;
1039 for quote in cache.get_all_aggregates() {
1040 if let Some(filter) = &symbol_filter {
1041 if !filter.contains(
1042 "e.instrument.to_ascii_uppercase(),
1043 ) {
1044 continue;
1045 }
1046 }
1047 snapshot_count += 1;
1048 state
1049 .pubsub
1050 .send_to_client(
1051 client_id,
1052 WsMessage::IndicativeMarketData(
1053 WsIndicativeMarketData {
1054 instrument: quote.instrument,
1055 best_bid: Some(quote.best_bid),
1056 best_ask: Some(quote.best_ask),
1057 bid_iv: None,
1058 ask_iv: None,
1059 indicative_bid_size: quote
1060 .indicative_bid_size
1061 .into(),
1062 indicative_ask_size: quote
1063 .indicative_ask_size
1064 .into(),
1065 num_providers: quote
1066 .num_providers,
1067 timestamp: quote.updated_at,
1068 },
1069 ),
1070 );
1071 }
1072 counter!(
1073 "ht_ws_indicative_snapshot_sent_total"
1074 )
1075 .increment(snapshot_count);
1076 }
1077 }
1078
1079 if channel == "competition" && authenticated {
1080 if let (Some(wallet), Some(service)) = (
1081 wallet_addr_clone,
1082 state.competition_service.as_ref(),
1083 ) {
1084 match service
1085 .build_competition_pnl_summary_for_wallet(
1086 wallet,
1087 chrono::Utc::now().timestamp_millis(),
1088 )
1089 .await
1090 {
1091 Ok(summary) => {
1092 state.pubsub.send_to_client(
1093 client_id,
1094 WsMessage::CompetitionPnlSummary(
1095 summary,
1096 ),
1097 );
1098 }
1099 Err(err) => {
1100 state.pubsub.send_to_client(
1101 client_id,
1102 WsMessage::Error {
1103 message: format!(
1104 "failed to build competition summary: {}",
1105 err
1106 ),
1107 },
1108 );
1109 }
1110 }
1111 }
1112 }
1113 }
1114 Err(err) => {
1115 state.pubsub.send_to_client(
1116 client_id,
1117 WsMessage::Error { message: err },
1118 );
1119 }
1120 }
1121 }
1122 WsMessage::Unsubscribe {
1123 channel,
1124 symbols,
1125 expiry,
1126 option_type,
1127 } => {
1128 match state.pubsub.unsubscribe(
1129 client_id,
1130 channel.clone(),
1131 symbols,
1132 expiry,
1133 option_type,
1134 ) {
1135 Ok(_) => {
1136 if channel == "portfolio" {
1137 if let Some(task) = portfolio_task.take() {
1138 task.abort();
1139 }
1140 if let Some((wallet, subscriber_id)) =
1141 portfolio_subscription.take()
1142 {
1143 if let Some(ref cache) = state.portfolio_cache {
1144 cache.unsubscribe(&wallet, subscriber_id).await;
1145 }
1146 }
1147 }
1148
1149 state.pubsub.send_to_client(
1150 client_id,
1151 WsMessage::Unsubscribed { channel },
1152 );
1153 }
1154 Err(err) => {
1155 state.pubsub.send_to_client(
1156 client_id,
1157 WsMessage::Error { message: err },
1158 );
1159 }
1160 }
1161 }
1162 WsMessage::Authenticate { wallet } => {
1163 match WalletAddress::from_str(&wallet) {
1164 Ok(addr) => {
1165 match state.pubsub.authenticate_client(client_id, addr) {
1166 Ok(_) => {
1167 wallet_addr_clone = Some(addr);
1168 authenticated = true;
1169 state.pubsub.send_to_client(
1170 client_id,
1171 WsMessage::Authenticated {
1172 wallet: wallet.clone(),
1173 },
1174 );
1175 }
1176 Err(err) => {
1177 state.pubsub.send_to_client(
1178 client_id,
1179 WsMessage::Error { message: err },
1180 );
1181 }
1182 }
1183 }
1184 Err(_) => {
1185 state.pubsub.send_to_client(
1186 client_id,
1187 WsMessage::Error {
1188 message: format!(
1189 "Invalid wallet address: {}",
1190 wallet
1191 ),
1192 },
1193 );
1194 }
1195 }
1196 }
1197 WsMessage::SubmitRfq {
1198 rfq_id,
1199 legs,
1200 wallet_address: req_wallet,
1201 nonce,
1202 signature,
1203 } => {
1204 if !authenticated {
1205 state.pubsub.send_to_client(
1206 client_id,
1207 WsMessage::Error {
1208 message: "Not authenticated".to_string(),
1209 },
1210 );
1211 } else if let Some(ref rfq_state) = state.rfq_handler_state {
1212 let typed_legs: Vec<hypercall_types::RfqLegRequest> = legs
1214 .iter()
1215 .map(|l| hypercall_types::RfqLegRequest {
1216 instrument: l.instrument.clone(),
1217 side: l.side,
1218 size: l.size.clone(),
1219 })
1220 .collect();
1221
1222 let wallet = match WalletAddress::from_str(&req_wallet) {
1223 Ok(w) => w,
1224 Err(_) => {
1225 state.pubsub.send_to_client(
1226 client_id,
1227 WsMessage::Error {
1228 message: format!(
1229 "Invalid wallet address: {}",
1230 req_wallet
1231 ),
1232 },
1233 );
1234 continue;
1235 }
1236 };
1237
1238 match super::handlers::rfq::submit_rfq_inner(
1239 &rfq_state.rfq_manager,
1240 rfq_state.agent_auth.as_ref(),
1241 &rfq_id,
1242 &typed_legs,
1243 wallet,
1244 nonce,
1245 &signature,
1246 state.signing_chain_id,
1247 )
1248 .await
1249 {
1250 Ok(record) => {
1251 state.pubsub.send_to_client(
1252 client_id,
1253 WsMessage::RfqStatusUpdate(WsRfqStatusUpdate {
1254 rfq_id: record.rfq_id.to_string(),
1255 status: format!("{:?}", record.status),
1256 taker_wallet: record.taker_wallet,
1257 }),
1258 );
1259 }
1260 Err(err) => {
1261 state.pubsub.send_to_client(
1262 client_id,
1263 WsMessage::Error {
1264 message: format!(
1265 "RFQ submission failed: {}",
1266 err
1267 ),
1268 },
1269 );
1270 }
1271 }
1272 } else {
1273 state.pubsub.send_to_client(
1274 client_id,
1275 WsMessage::Error {
1276 message: "RFQ system not available".to_string(),
1277 },
1278 );
1279 }
1280 }
1281 WsMessage::SubmitAutoExecuteRfq {
1282 rfq_id,
1283 legs,
1284 wallet_address: req_wallet,
1285 limit_price,
1286 nonce,
1287 signature,
1288 } => {
1289 tracing::info!(
1290 client_id = ?client_id,
1291 rfq_id = %rfq_id,
1292 limit_price = %limit_price,
1293 authenticated,
1294 "WS SubmitAutoExecuteRfq received"
1295 );
1296 if !authenticated {
1297 tracing::warn!(client_id = ?client_id, rfq_id = %rfq_id, "SubmitAutoExecuteRfq rejected: not authenticated");
1298 state.pubsub.send_to_client(
1299 client_id,
1300 WsMessage::Error {
1301 message: "Not authenticated".to_string(),
1302 },
1303 );
1304 } else if let Some(ref rfq_state) = state.rfq_handler_state {
1305 let typed_legs: Vec<hypercall_types::RfqLegRequest> = legs
1306 .iter()
1307 .map(|l| hypercall_types::RfqLegRequest {
1308 instrument: l.instrument.clone(),
1309 side: l.side,
1310 size: l.size.clone(),
1311 })
1312 .collect();
1313
1314 let wallet = match WalletAddress::from_str(&req_wallet) {
1315 Ok(w) => w,
1316 Err(_) => {
1317 state.pubsub.send_to_client(
1318 client_id,
1319 WsMessage::Error {
1320 message: format!(
1321 "Invalid wallet address: {}",
1322 req_wallet
1323 ),
1324 },
1325 );
1326 continue;
1327 }
1328 };
1329
1330 match super::handlers::rfq::submit_auto_execute_rfq_inner(
1331 &rfq_state.rfq_manager,
1332 rfq_state.agent_auth.as_ref(),
1333 &rfq_id,
1334 &typed_legs,
1335 wallet,
1336 &limit_price,
1337 nonce,
1338 &signature,
1339 state.signing_chain_id,
1340 )
1341 .await
1342 {
1343 Ok(record) => {
1344 state.pubsub.send_to_client(
1345 client_id,
1346 WsMessage::RfqStatusUpdate(WsRfqStatusUpdate {
1347 rfq_id: record.rfq_id.to_string(),
1348 status: format!("{:?}", record.status),
1349 taker_wallet: record.taker_wallet,
1350 }),
1351 );
1352 }
1353 Err(err) => {
1354 state.pubsub.send_to_client(
1355 client_id,
1356 WsMessage::Error {
1357 message: format!(
1358 "Auto-execute RFQ submission failed: {}",
1359 err
1360 ),
1361 },
1362 );
1363 }
1364 }
1365 } else {
1366 state.pubsub.send_to_client(
1367 client_id,
1368 WsMessage::Error {
1369 message: "RFQ system not available".to_string(),
1370 },
1371 );
1372 }
1373 }
1374 WsMessage::AcceptRfqQuote {
1383 rfq_id,
1384 quote_id,
1385 wallet_address: req_wallet,
1386 nonce,
1387 signature,
1388 } => {
1389 if !authenticated {
1390 state.pubsub.send_to_client(
1391 client_id,
1392 WsMessage::Error {
1393 message: "Not authenticated".to_string(),
1394 },
1395 );
1396 } else if let Some(ref rfq_state) = state.rfq_handler_state {
1397 let wallet = match WalletAddress::from_str(&req_wallet) {
1398 Ok(w) => w,
1399 Err(_) => {
1400 state.pubsub.send_to_client(
1401 client_id,
1402 WsMessage::Error {
1403 message: format!(
1404 "Invalid wallet address: {}",
1405 req_wallet
1406 ),
1407 },
1408 );
1409 continue;
1410 }
1411 };
1412
1413 match super::handlers::rfq::accept_rfq_quote_inner(
1414 &rfq_state.rfq_manager,
1415 rfq_state.agent_auth.as_ref(),
1416 &rfq_id,
1417 "e_id,
1418 wallet,
1419 nonce,
1420 &signature,
1421 state.signing_chain_id,
1422 )
1423 .await
1424 {
1425 Ok(response) => {
1426 state.pubsub.send_to_client(
1427 client_id,
1428 WsMessage::RfqAcceptResult {
1429 rfq_id: response.rfq_id,
1430 quote_id: response.quote_id,
1431 status: response.status.as_str().to_string(),
1432 fill_id: Some(response.fill_id),
1433 taker_wallet: None,
1436 },
1437 );
1438 }
1439 Err(err) => {
1440 state.pubsub.send_to_client(
1441 client_id,
1442 WsMessage::Error {
1443 message: format!(
1444 "RFQ accept failed: {}",
1445 err
1446 ),
1447 },
1448 );
1449 }
1450 }
1451 } else {
1452 state.pubsub.send_to_client(
1453 client_id,
1454 WsMessage::Error {
1455 message: "RFQ system not available".to_string(),
1456 },
1457 );
1458 }
1459 }
1460 WsMessage::PlaceOrder {
1461 wallet: req_wallet,
1462 symbol,
1463 side,
1464 size,
1465 price,
1466 tif,
1467 client_id: req_client_id,
1468 nonce,
1469 signature,
1470 mmp_enabled,
1471 builder_code_address,
1472 } => {
1473 if !authenticated {
1474 state.pubsub.send_to_client(
1475 client_id,
1476 WsMessage::Error {
1477 message: "Not authenticated".to_string(),
1478 },
1479 );
1480 } else if let Some(ref order_sender) = state.order_sender {
1481 match handle_ws_place_order(
1482 &state,
1483 order_sender,
1484 &req_wallet,
1485 &symbol,
1486 &side,
1487 &size,
1488 &price,
1489 tif.as_deref(),
1490 req_client_id.as_deref(),
1491 nonce,
1492 &signature,
1493 mmp_enabled.unwrap_or(false),
1494 builder_code_address.as_deref(),
1495 )
1496 .await
1497 {
1498 Ok(result) => {
1499 state.pubsub.send_to_client(
1500 client_id,
1501 WsMessage::OrderResult(result),
1502 );
1503 }
1504 Err(err) => {
1505 state.pubsub.send_to_client(
1506 client_id,
1507 WsMessage::Error {
1508 message: format!(
1509 "Order placement failed: {}",
1510 err
1511 ),
1512 },
1513 );
1514 }
1515 }
1516 } else {
1517 state.pubsub.send_to_client(
1518 client_id,
1519 WsMessage::Error {
1520 message: "Order system not available"
1521 .to_string(),
1522 },
1523 );
1524 }
1525 }
1526 _ => {}
1527 }
1528 }
1529 }
1530 Message::Ping(payload) => {
1531 counter!("ht_ws_ping_received_total").increment(1);
1532 if sender.send(Message::Pong(payload)).await.is_err() {
1533 break;
1534 }
1535 }
1536 Message::Pong(_) => {
1537 last_pong = Instant::now();
1538 counter!("ht_ws_pongs_received_total").increment(1);
1539 }
1540 Message::Close(_) => break,
1541 _ => {}
1542 }
1543 }
1544 _ = heartbeat.tick() => {
1545 if Instant::now().duration_since(last_pong) >= pong_timeout {
1546 counter!("ht_ws_heartbeat_timeouts_total").increment(1);
1547 let _ = sender
1548 .send(Message::Close(Some(CloseFrame {
1549 code: close_code::POLICY,
1550 reason: "pong timeout".into(),
1551 })))
1552 .await;
1553 break;
1554 }
1555
1556 counter!("ht_ws_pings_sent_total").increment(1);
1557 if sender.send(Message::Ping(Vec::new())).await.is_err() {
1558 break;
1559 }
1560 }
1561 }
1562 }
1563
1564 if let Some(task) = portfolio_task.take() {
1566 task.abort();
1567 }
1568 if let Some((wallet, subscriber_id)) = portfolio_subscription.take() {
1569 if let Some(ref cache) = state.portfolio_cache {
1570 cache.unsubscribe(&wallet, subscriber_id).await;
1571 }
1572 }
1573 state.pubsub.remove_client(client_id);
1574}
1575
1576const WS_ENGINE_RESPONSE_TIMEOUT: Duration = Duration::from_secs(10);
1578
1579#[allow(clippy::too_many_arguments)]
1585async fn handle_ws_place_order(
1586 state: &WsState,
1587 order_sender: &mpsc::Sender<hypercall_runtime_api::UnifiedEngineRequest>,
1588 wallet_str: &str,
1589 symbol: &str,
1590 side_str: &str,
1591 size_str: &str,
1592 price_str: &str,
1593 tif_str: Option<&str>,
1594 client_id: Option<&str>,
1595 nonce: u64,
1596 signature: &str,
1597 mmp_enabled: bool,
1598 builder_code_address: Option<&str>,
1599) -> Result<WsOrderResult, String> {
1600 use hypercall_auth::SignatureRecovery;
1601 use hypercall_runtime_api::{increment_pending_requests, UnifiedEngineRequest};
1602 use hypercall_types::utils::get_timestamp_millis;
1603 use hypercall_types::{
1604 to_contract_units_decimal, validate_price_precision, ParsedOptionSymbol,
1605 MAX_PRICE_SIGNIFICANT_FIGURES,
1606 };
1607 use hypercall_types::{OrderAction, OrderActionMessage, OrderInfo, TimeInForce};
1608 use rust_decimal::prelude::ToPrimitive;
1609 use rust_decimal_macros::dec;
1610
1611 let wallet_address = WalletAddress::from_str(wallet_str)
1613 .map_err(|_| format!("Invalid wallet address: {}", wallet_str))?;
1614
1615 let side = match side_str.to_lowercase().as_str() {
1617 "buy" => Side::Buy,
1618 "sell" => Side::Sell,
1619 other => return Err(format!("Invalid side: {}", other)),
1620 };
1621
1622 let tif = match tif_str.unwrap_or("gtc").to_lowercase().as_str() {
1624 "gtc" => TimeInForce::GTC,
1625 "ioc" => TimeInForce::IOC,
1626 "fok" => TimeInForce::FOK,
1627 other => return Err(format!("Invalid tif: {}", other)),
1628 };
1629
1630 let tif_label = match tif {
1632 TimeInForce::GTC => "gtc",
1633 TimeInForce::IOC => "ioc",
1634 TimeInForce::FOK => "fok",
1635 };
1636 let signer_address = SignatureRecovery::recover_place_order_signer(
1637 wallet_address,
1638 symbol,
1639 &format!("{:?}", side),
1640 size_str,
1641 price_str,
1642 tif_label,
1643 client_id.unwrap_or(""),
1644 nonce,
1645 signature,
1646 state.signing_chain_id,
1647 )
1648 .map_err(|e| format!("Signature verification failed: {}", e))?;
1649 let signer_wallet = WalletAddress::from(signer_address);
1650
1651 let is_authorized = state
1652 .agent_auth
1653 .is_agent_authorized(&wallet_address, &signer_wallet);
1654 if !is_authorized {
1655 return Err(format!(
1656 "Unauthorized: signer {} not authorized for wallet {}",
1657 signer_wallet, wallet_address
1658 ));
1659 }
1660
1661 let _parsed_symbol =
1663 ParsedOptionSymbol::from_symbol(symbol).map_err(|e| format!("Invalid symbol: {}", e))?;
1664
1665 if let Some(ref trading_halt) = state.trading_halt {
1667 let halt_state = trading_halt.read().await;
1668 if let Some(reason) = halt_state.blocked_reason(symbol) {
1669 return Err(format!(
1670 "{}. Order placement is disabled while trading is halted.",
1671 reason
1672 ));
1673 }
1674 }
1675
1676 let price =
1678 Decimal::from_str(price_str).map_err(|_| format!("Invalid price format: {}", price_str))?;
1679 if price <= dec!(0) {
1680 return Err("Price must be positive".to_string());
1681 }
1682 let price_f64 = price
1683 .to_f64()
1684 .ok_or_else(|| format!("Price {} cannot be represented as f64", price))?;
1685 validate_price_precision(price_f64, MAX_PRICE_SIGNIFICANT_FIGURES)
1686 .map_err(|e| format!("Invalid price precision: {}", e))?;
1687
1688 let size =
1690 Decimal::from_str(size_str).map_err(|_| format!("Invalid size format: {}", size_str))?;
1691 if size <= dec!(0) {
1692 return Err("Size must be positive".to_string());
1693 }
1694
1695 let builder_code_wallet = match builder_code_address {
1697 Some(addr) => Some(
1698 WalletAddress::from_str(addr)
1699 .map_err(|_| format!("Invalid builder_code_address: {}", addr))?,
1700 ),
1701 None => None,
1702 };
1703
1704 let order_info = OrderInfo {
1706 symbol: symbol.to_string(),
1707 price,
1708 size: to_contract_units_decimal(symbol, size),
1709 side,
1710 tif,
1711 client_id: client_id.map(|s| s.to_string()),
1712 order_id: None,
1713 is_perp: false,
1714 underlying: None,
1715 reduce_only: None,
1716 nonce: None,
1717 signature: None,
1718 mmp_enabled,
1719 builder_code_address: builder_code_wallet,
1720 };
1721
1722 let order_action_msg = OrderActionMessage {
1724 timestamp: get_timestamp_millis(),
1725 info: order_info,
1726 action: OrderAction::CreateOrder,
1727 wallet: wallet_address,
1728 api_wallet_address: Some(signer_wallet),
1729 mmp_triggered: false,
1730 request_id: Some(::uuid::Uuid::now_v7().to_string()),
1731 };
1732
1733 let (response_tx, mut response_rx) = mpsc::channel(1);
1735
1736 let engine_request = UnifiedEngineRequest {
1737 message: order_action_msg,
1738 response_tx,
1739 enqueued_at: std::time::Instant::now(),
1740 #[cfg(feature = "otel-tracing")]
1741 trace_context: None,
1742 };
1743
1744 increment_pending_requests();
1746 order_sender
1747 .send(engine_request)
1748 .await
1749 .map_err(|_| "Failed to send order to engine".to_string())?;
1750
1751 let response = match tokio::time::timeout(WS_ENGINE_RESPONSE_TIMEOUT, response_rx.recv()).await
1753 {
1754 Ok(Some(resp)) => resp,
1755 Ok(None) => return Err("No response from engine".to_string()),
1756 Err(_) => return Err("Timeout waiting for engine response".to_string()),
1757 };
1758
1759 Ok(WsOrderResult {
1760 order_id: response.order_id,
1761 status: format!("{:?}", response.status),
1762 symbol: response.info.symbol.clone(),
1763 side: format!("{:?}", response.info.side),
1764 price: response.info.price.to_string(),
1765 size: size_str.to_string(),
1768 reason: response.reason.clone(),
1769 })
1770}
1771
1772pub mod uuid {
1774 use std::sync::atomic::{AtomicU64, Ordering};
1775
1776 static COUNTER: AtomicU64 = AtomicU64::new(0);
1777
1778 #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
1779 pub struct Uuid(u64);
1780
1781 impl Uuid {
1782 pub fn new_v4() -> Self {
1783 Uuid(COUNTER.fetch_add(1, Ordering::SeqCst))
1784 }
1785 }
1786}
1787
1788#[cfg(test)]
1789mod tests {
1790 use super::{
1791 validate_ws_heartbeat_config, ws_heartbeat_config, PubSubManager,
1792 DEFAULT_WS_HEARTBEAT_INTERVAL, DEFAULT_WS_PONG_TIMEOUT,
1793 };
1794 use hypercall_types::ws_protocol::WsIndicativeMarketData;
1795 use tokio::time::Duration;
1796
1797 #[test]
1798 fn test_ws_heartbeat_config_accepts_valid_custom_values() {
1799 let (heartbeat_interval, pong_timeout) = validate_ws_heartbeat_config(250, 750);
1800
1801 assert_eq!(heartbeat_interval, Duration::from_millis(250));
1802 assert_eq!(pong_timeout, Duration::from_millis(750));
1803 }
1804
1805 #[test]
1806 fn test_ws_heartbeat_config_rejects_zero_interval() {
1807 let (heartbeat_interval, pong_timeout) = validate_ws_heartbeat_config(0, 60_000);
1808
1809 assert_eq!(heartbeat_interval, DEFAULT_WS_HEARTBEAT_INTERVAL);
1810 assert_eq!(pong_timeout, DEFAULT_WS_PONG_TIMEOUT);
1811 }
1812
1813 #[test]
1814 fn test_ws_heartbeat_config_rejects_timeout_shorter_than_interval() {
1815 let (heartbeat_interval, pong_timeout) = validate_ws_heartbeat_config(500, 400);
1816
1817 assert_eq!(heartbeat_interval, DEFAULT_WS_HEARTBEAT_INTERVAL);
1818 assert_eq!(pong_timeout, DEFAULT_WS_PONG_TIMEOUT);
1819 }
1820
1821 #[test]
1822 fn test_ws_heartbeat_config_uses_defaults_without_runtime_config() {
1823 let (heartbeat_interval, pong_timeout) =
1824 ws_heartbeat_config(&hypercall_config::ApiRuntimeConfig::default());
1825
1826 assert_eq!(heartbeat_interval, DEFAULT_WS_HEARTBEAT_INTERVAL);
1827 assert_eq!(pong_timeout, DEFAULT_WS_PONG_TIMEOUT);
1828 }
1829
1830 #[test]
1831 fn test_indicative_market_data_honors_symbol_filter() {
1832 let pubsub = PubSubManager::new();
1833 let client_id = super::uuid::Uuid::new_v4();
1834 let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
1835
1836 pubsub.add_client(client_id, tx, false, None);
1837 pubsub
1838 .subscribe(
1839 client_id,
1840 "indicative_market_data".to_string(),
1841 Some(vec!["BTC-20260131-100000-C".to_string()]),
1842 None,
1843 None,
1844 )
1845 .expect("indicative subscription should succeed");
1846
1847 pubsub.publish_indicative_market_data(WsIndicativeMarketData {
1848 instrument: "ETH-20260131-100000-C".to_string(),
1849 best_bid: Some(dec!(10)),
1850 bid_iv: None,
1851 ask_iv: None,
1852 best_ask: Some(dec!(11)),
1853 indicative_bid_size: Some(dec!(1)),
1854 indicative_ask_size: Some(dec!(1)),
1855 num_providers: 1,
1856 timestamp: 1,
1857 });
1858 assert!(rx.try_recv().is_err());
1859
1860 pubsub.publish_indicative_market_data(WsIndicativeMarketData {
1861 instrument: "BTC-20260131-100000-C".to_string(),
1862 best_bid: Some(dec!(20)),
1863 bid_iv: None,
1864 ask_iv: None,
1865 best_ask: Some(dec!(21)),
1866 indicative_bid_size: Some(dec!(2)),
1867 indicative_ask_size: Some(dec!(2)),
1868 num_providers: 1,
1869 timestamp: 2,
1870 });
1871
1872 let message = rx
1873 .try_recv()
1874 .expect("matching indicative quote should be delivered");
1875 match message.as_ref() {
1876 WsMessage::IndicativeMarketData(update) => {
1877 assert_eq!(update.instrument, "BTC-20260131-100000-C");
1878 assert_eq!(update.best_bid, Some(dec!(20)));
1879 assert_eq!(update.best_ask, Some(dec!(21)));
1880 }
1881 other => panic!("expected IndicativeMarketData, got {other:?}"),
1882 }
1883 assert!(rx.try_recv().is_err());
1884 }
1885
1886 use super::{
1889 WsMessage, WsOrderResult, WsRfqLegRequest, WsRfqQuoteEntry, WsRfqQuotes, WsRfqStatusUpdate,
1890 };
1891 use hypercall_types::{Side, WalletAddress};
1892 use rust_decimal_macros::dec;
1893 use std::str::FromStr;
1894
1895 #[test]
1896 fn test_submit_rfq_roundtrip() {
1897 let msg = WsMessage::SubmitRfq {
1898 rfq_id: "test-123".to_string(),
1899 legs: vec![WsRfqLegRequest {
1900 instrument: "ETH-20260425-2400-C".to_string(),
1901 side: Side::Buy,
1902 size: "10".to_string(),
1903 }],
1904 wallet_address: "0x1234567890abcdef1234567890abcdef12345678".to_string(),
1905 nonce: 42,
1906 signature: "0xsig".to_string(),
1907 };
1908 let json = serde_json::to_string(&msg).unwrap();
1909 assert!(json.contains("\"type\":\"SubmitRfq\""));
1910 assert!(json.contains("\"rfq_id\":\"test-123\""));
1911 assert!(json.contains("\"nonce\":42"));
1912
1913 let parsed: WsMessage = serde_json::from_str(&json).unwrap();
1914 match parsed {
1915 WsMessage::SubmitRfq {
1916 rfq_id,
1917 legs,
1918 wallet_address,
1919 nonce,
1920 signature,
1921 } => {
1922 assert_eq!(rfq_id, "test-123");
1923 assert_eq!(legs.len(), 1);
1924 assert_eq!(legs[0].instrument, "ETH-20260425-2400-C");
1925 assert_eq!(legs[0].size, "10");
1926 assert_eq!(wallet_address, "0x1234567890abcdef1234567890abcdef12345678");
1927 assert_eq!(nonce, 42);
1928 assert_eq!(signature, "0xsig");
1929 }
1930 other => panic!("Expected SubmitRfq, got {:?}", other),
1931 }
1932 }
1933
1934 #[test]
1935 fn test_accept_rfq_quote_roundtrip() {
1936 let msg = WsMessage::AcceptRfqQuote {
1937 rfq_id: "rfq-abc".to_string(),
1938 quote_id: "quote-xyz".to_string(),
1939 wallet_address: "0xdeadbeefdeadbeefdeadbeefdeadbeefdeadbeef".to_string(),
1940 nonce: 99,
1941 signature: "0xdeadsig".to_string(),
1942 };
1943 let json = serde_json::to_string(&msg).unwrap();
1944 assert!(json.contains("\"type\":\"AcceptRfqQuote\""));
1945 assert!(json.contains("\"rfq_id\":\"rfq-abc\""));
1946 assert!(json.contains("\"quote_id\":\"quote-xyz\""));
1947
1948 let parsed: WsMessage = serde_json::from_str(&json).unwrap();
1949 match parsed {
1950 WsMessage::AcceptRfqQuote {
1951 rfq_id,
1952 quote_id,
1953 wallet_address,
1954 nonce,
1955 signature,
1956 } => {
1957 assert_eq!(rfq_id, "rfq-abc");
1958 assert_eq!(quote_id, "quote-xyz");
1959 assert_eq!(wallet_address, "0xdeadbeefdeadbeefdeadbeefdeadbeefdeadbeef");
1960 assert_eq!(nonce, 99);
1961 assert_eq!(signature, "0xdeadsig");
1962 }
1963 other => panic!("Expected AcceptRfqQuote, got {:?}", other),
1964 }
1965 }
1966
1967 #[test]
1968 fn test_place_order_roundtrip() {
1969 let msg = WsMessage::PlaceOrder {
1970 wallet: "0xaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa".to_string(),
1971 symbol: "BTC-20260131-100000-C".to_string(),
1972 side: "Buy".to_string(),
1973 size: "1.5".to_string(),
1974 price: "5000".to_string(),
1975 tif: Some("gtc".to_string()),
1976 client_id: Some("my-order-1".to_string()),
1977 nonce: 1000,
1978 signature: "0xordersig".to_string(),
1979 mmp_enabled: Some(true),
1980 builder_code_address: None,
1981 };
1982 let json = serde_json::to_string(&msg).unwrap();
1983 assert!(json.contains("\"type\":\"PlaceOrder\""));
1984 assert!(json.contains("\"symbol\":\"BTC-20260131-100000-C\""));
1985 assert!(json.contains("\"side\":\"Buy\""));
1986 assert!(json.contains("\"price\":\"5000\""));
1987 assert!(!json.contains("builder_code_address"));
1989
1990 let parsed: WsMessage = serde_json::from_str(&json).unwrap();
1991 match parsed {
1992 WsMessage::PlaceOrder {
1993 wallet,
1994 symbol,
1995 side,
1996 size,
1997 price,
1998 tif,
1999 client_id,
2000 nonce,
2001 signature,
2002 mmp_enabled,
2003 builder_code_address,
2004 } => {
2005 assert_eq!(wallet, "0xaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa");
2006 assert_eq!(symbol, "BTC-20260131-100000-C");
2007 assert_eq!(side, "Buy");
2008 assert_eq!(size, "1.5");
2009 assert_eq!(price, "5000");
2010 assert_eq!(tif, Some("gtc".to_string()));
2011 assert_eq!(client_id, Some("my-order-1".to_string()));
2012 assert_eq!(nonce, 1000);
2013 assert_eq!(signature, "0xordersig");
2014 assert_eq!(mmp_enabled, Some(true));
2015 assert!(builder_code_address.is_none());
2016 }
2017 other => panic!("Expected PlaceOrder, got {:?}", other),
2018 }
2019 }
2020
2021 #[test]
2022 fn test_place_order_minimal_fields_roundtrip() {
2023 let json = r#"{"type":"PlaceOrder","wallet":"0xaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa","symbol":"ETH-20260228-3000-P","side":"Sell","size":"2","price":"100","nonce":1,"signature":"0xsig"}"#;
2025 let parsed: WsMessage = serde_json::from_str(json).unwrap();
2026 match parsed {
2027 WsMessage::PlaceOrder {
2028 tif,
2029 client_id,
2030 mmp_enabled,
2031 builder_code_address,
2032 ..
2033 } => {
2034 assert!(tif.is_none());
2035 assert!(client_id.is_none());
2036 assert!(mmp_enabled.is_none());
2037 assert!(builder_code_address.is_none());
2038 }
2039 other => panic!("Expected PlaceOrder, got {:?}", other),
2040 }
2041 }
2042
2043 #[test]
2044 fn test_rfq_quotes_response_serialization() {
2045 let msg = WsMessage::RfqQuotes(WsRfqQuotes {
2046 rfq_id: "rfq-001".to_string(),
2047 quotes: vec![WsRfqQuoteEntry {
2048 quote_id: "q-1".to_string(),
2049 net_premium: dec!(150.50),
2050 expires_at: 1700000000,
2051 }],
2052 status: "quoted".to_string(),
2053 taker_wallet: WalletAddress::from_str("0xbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb")
2054 .unwrap(),
2055 });
2056 let json = serde_json::to_string(&msg).unwrap();
2057 assert!(json.contains("\"type\":\"RfqQuotes\""));
2058 assert!(json.contains("\"rfq_id\":\"rfq-001\""));
2059 assert!(json.contains("\"quote_id\":\"q-1\""));
2060 assert!(json.contains("\"status\":\"quoted\""));
2061
2062 let parsed: WsMessage = serde_json::from_str(&json).unwrap();
2064 match parsed {
2065 WsMessage::RfqQuotes(quotes) => {
2066 assert_eq!(quotes.rfq_id, "rfq-001");
2067 assert_eq!(quotes.quotes.len(), 1);
2068 assert_eq!(quotes.quotes[0].quote_id, "q-1");
2069 assert_eq!(quotes.status, "quoted");
2070 }
2071 other => panic!("Expected RfqQuotes, got {:?}", other),
2072 }
2073 }
2074
2075 #[test]
2076 fn test_rfq_accept_result_serialization() {
2077 let msg = WsMessage::RfqAcceptResult {
2078 rfq_id: "rfq-002".to_string(),
2079 quote_id: "q-2".to_string(),
2080 status: "filled".to_string(),
2081 fill_id: Some("fill-abc".to_string()),
2082 taker_wallet: None,
2083 };
2084 let json = serde_json::to_string(&msg).unwrap();
2085 assert!(json.contains("\"type\":\"RfqAcceptResult\""));
2086 assert!(json.contains("\"rfq_id\":\"rfq-002\""));
2087 assert!(json.contains("\"fill_id\":\"fill-abc\""));
2088 assert!(!json.contains("taker_wallet"));
2090
2091 let parsed: WsMessage = serde_json::from_str(&json).unwrap();
2092 match parsed {
2093 WsMessage::RfqAcceptResult {
2094 rfq_id,
2095 quote_id,
2096 status,
2097 fill_id,
2098 ..
2099 } => {
2100 assert_eq!(rfq_id, "rfq-002");
2101 assert_eq!(quote_id, "q-2");
2102 assert_eq!(status, "filled");
2103 assert_eq!(fill_id, Some("fill-abc".to_string()));
2104 }
2105 other => panic!("Expected RfqAcceptResult, got {:?}", other),
2106 }
2107 }
2108
2109 #[test]
2110 fn test_order_result_serialization() {
2111 let msg = WsMessage::OrderResult(WsOrderResult {
2112 order_id: Some(12345),
2113 status: "Open".to_string(),
2114 symbol: "BTC-20260131-100000-C".to_string(),
2115 side: "Buy".to_string(),
2116 price: "5000".to_string(),
2117 size: "1000000".to_string(),
2118 reason: None,
2119 });
2120 let json = serde_json::to_string(&msg).unwrap();
2121 assert!(json.contains("\"type\":\"OrderResult\""));
2122 assert!(json.contains("\"order_id\":12345"));
2123 assert!(json.contains("\"status\":\"Open\""));
2124 assert!(json.contains("\"symbol\":\"BTC-20260131-100000-C\""));
2125 assert!(!json.contains("\"reason\""));
2127
2128 let parsed: WsMessage = serde_json::from_str(&json).unwrap();
2129 match parsed {
2130 WsMessage::OrderResult(result) => {
2131 assert_eq!(result.order_id, Some(12345));
2132 assert_eq!(result.status, "Open");
2133 assert_eq!(result.symbol, "BTC-20260131-100000-C");
2134 assert_eq!(result.side, "Buy");
2135 assert!(result.reason.is_none());
2136 }
2137 other => panic!("Expected OrderResult, got {:?}", other),
2138 }
2139 }
2140
2141 #[test]
2142 fn test_order_result_with_rejection_reason() {
2143 let msg = WsMessage::OrderResult(WsOrderResult {
2144 order_id: None,
2145 status: "Rejected".to_string(),
2146 symbol: "ETH-20260228-3000-P".to_string(),
2147 side: "Sell".to_string(),
2148 price: "100".to_string(),
2149 size: "500000".to_string(),
2150 reason: Some("Insufficient margin".to_string()),
2151 });
2152 let json = serde_json::to_string(&msg).unwrap();
2153 assert!(json.contains("\"reason\":\"Insufficient margin\""));
2154 assert!(json.contains("\"order_id\":null"));
2155
2156 let parsed: WsMessage = serde_json::from_str(&json).unwrap();
2157 match parsed {
2158 WsMessage::OrderResult(result) => {
2159 assert!(result.order_id.is_none());
2160 assert_eq!(result.reason, Some("Insufficient margin".to_string()));
2161 }
2162 other => panic!("Expected OrderResult, got {:?}", other),
2163 }
2164 }
2165
2166 #[test]
2167 fn test_rfq_status_update_serialization() {
2168 let msg = WsMessage::RfqStatusUpdate(WsRfqStatusUpdate {
2169 rfq_id: "rfq-status-test".to_string(),
2170 status: "expired".to_string(),
2171 taker_wallet: WalletAddress::from_str("0xcccccccccccccccccccccccccccccccccccccccc")
2172 .unwrap(),
2173 });
2174 let json = serde_json::to_string(&msg).unwrap();
2175 assert!(json.contains("\"type\":\"RfqStatusUpdate\""));
2176 assert!(json.contains("\"status\":\"expired\""));
2177
2178 let parsed: WsMessage = serde_json::from_str(&json).unwrap();
2179 match parsed {
2180 WsMessage::RfqStatusUpdate(update) => {
2181 assert_eq!(update.rfq_id, "rfq-status-test");
2182 assert_eq!(update.status, "expired");
2183 }
2184 other => panic!("Expected RfqStatusUpdate, got {:?}", other),
2185 }
2186 }
2187}