Skip to main content

hypercall_api/websocket/
mod.rs

1pub mod event_forwarder;
2pub mod rfq;
3
4use crate::boundary::read_models::PortfolioCacheApi;
5use crate::options_chain::{parse_expiry_date_to_code, OptionsChainOptionTypeFilter};
6use crate::rfq::handler_state::RfqHandlerState;
7use crate::rfq::indicative_quote_cache::IndicativeQuoteCache;
8use axum::{
9    extract::{
10        ws::{close_code, CloseFrame, Message, WebSocket, WebSocketUpgrade},
11        Query, State,
12    },
13    response::Response,
14};
15use dashmap::DashMap;
16use futures::{SinkExt, StreamExt};
17use hypercall_types::api_models::OptionsChainStrikeRow;
18pub use hypercall_types::ws_protocol::{
19    IndexPriceEntry, PortfolioUpdate, WsCandleUpdate, WsCompetitionFinalStanding,
20    WsCompetitionFinalStats, WsCompetitionGapUpdate, WsCompetitionPnlStanding,
21    WsCompetitionPnlSummary, WsCompetitionRankChange, WsCompetitionUpdate, WsFillUpdate,
22    WsIndexPriceUpdate, WsIndicativeMarketData, WsLiquidationStateChange, WsMarketUpdate,
23    WsMessage, WsOptionsChainUpdate, WsOrderMessage, WsOrderResult, WsOrderbookUpdate,
24    WsPositionExpired, WsRfqLegRequest, WsRfqQuoteEntry, WsRfqQuotes, WsRfqStatusUpdate,
25    WsTradeUpdate,
26};
27use hypercall_types::{Side, WalletAddress};
28use metrics::{counter, gauge, histogram};
29use rust_decimal::Decimal;
30use serde::Deserialize;
31use std::collections::HashSet;
32use std::str::FromStr;
33use std::sync::Arc;
34use tokio::sync::{mpsc, RwLock};
35use tokio::time::{self, Duration, Instant, MissedTickBehavior};
36use tracing::{debug, warn};
37
38/// Channels that require authentication before subscribing.
39/// These channels contain wallet-specific data and filter messages by authenticated wallet.
40const AUTHENTICATED_CHANNELS: &[&str] = &[
41    "order_updates",
42    "fills",
43    "portfolio",
44    "liquidation",
45    "competition",
46    "competition_engagement",
47    "rfq",
48];
49const DEFAULT_WS_HEARTBEAT_INTERVAL: Duration = Duration::from_secs(20);
50const DEFAULT_WS_PONG_TIMEOUT: Duration = Duration::from_secs(60);
51
52fn validate_ws_heartbeat_config(
53    heartbeat_interval_ms: u64,
54    pong_timeout_ms: u64,
55) -> (Duration, Duration) {
56    if heartbeat_interval_ms == 0 || pong_timeout_ms == 0 {
57        warn!("WebSocket heartbeat settings must be positive, using defaults");
58        return (DEFAULT_WS_HEARTBEAT_INTERVAL, DEFAULT_WS_PONG_TIMEOUT);
59    }
60
61    let heartbeat_interval = Duration::from_millis(heartbeat_interval_ms);
62    let pong_timeout = Duration::from_millis(pong_timeout_ms);
63
64    if pong_timeout < heartbeat_interval {
65        warn!(
66            heartbeat_interval_ms,
67            pong_timeout_ms,
68            "WebSocket pong timeout must be greater than or equal to heartbeat interval, using defaults"
69        );
70        return (DEFAULT_WS_HEARTBEAT_INTERVAL, DEFAULT_WS_PONG_TIMEOUT);
71    }
72
73    (heartbeat_interval, pong_timeout)
74}
75
76pub fn ws_heartbeat_config(
77    api_config: &hypercall_config::ApiRuntimeConfig,
78) -> (Duration, Duration) {
79    validate_ws_heartbeat_config(
80        api_config.ws_heartbeat_interval_ms,
81        api_config.ws_pong_timeout_ms,
82    )
83}
84
85/// Check if a channel requires authentication.
86fn requires_authentication(channel: &str) -> bool {
87    AUTHENTICATED_CHANNELS.contains(&channel)
88}
89
90#[derive(Debug, Clone, PartialEq, Eq)]
91struct OptionsChainSubscription {
92    symbols: Option<Vec<String>>,
93    /// UTC calendar date code in YYYYMMDD form (e.g. 20260131), matched
94    /// against the UTC date of instrument expiry timestamps so one filter
95    /// spans underlyings with different expiry times of day.
96    expiry: Option<u64>,
97    option_type: OptionsChainOptionTypeFilter,
98}
99
100impl Default for OptionsChainSubscription {
101    fn default() -> Self {
102        Self {
103            symbols: None,
104            expiry: None,
105            option_type: OptionsChainOptionTypeFilter::Both,
106        }
107    }
108}
109
110fn normalize_symbols(symbols: Option<Vec<String>>) -> Option<Vec<String>> {
111    let values = symbols?;
112    let mut deduped: Vec<String> = values
113        .into_iter()
114        .map(|value| value.trim().to_ascii_uppercase())
115        .filter(|value| !value.is_empty())
116        .collect();
117
118    if deduped.is_empty() {
119        return None;
120    }
121
122    deduped.sort();
123    deduped.dedup();
124    Some(deduped)
125}
126
127fn parse_options_chain_subscription(
128    symbols: Option<Vec<String>>,
129    expiry: Option<String>,
130    option_type: Option<String>,
131) -> Result<OptionsChainSubscription, String> {
132    let expiry = match expiry {
133        Some(value) if value.trim().is_empty() => None,
134        Some(value) => Some(parse_expiry_date_to_code(value.trim())?),
135        None => None,
136    };
137
138    Ok(OptionsChainSubscription {
139        symbols: normalize_symbols(symbols),
140        expiry,
141        option_type: OptionsChainOptionTypeFilter::parse(option_type.as_deref())?,
142    })
143}
144
145fn options_chain_update_symbols(update: &WsOptionsChainUpdate) -> HashSet<String> {
146    match update {
147        WsOptionsChainUpdate::Upsert { row, .. } => {
148            let mut symbols = HashSet::new();
149            if let Some(call) = &row.call {
150                symbols.insert(call.symbol.to_ascii_uppercase());
151            }
152            if let Some(put) = &row.put {
153                symbols.insert(put.symbol.to_ascii_uppercase());
154            }
155            symbols
156        }
157        WsOptionsChainUpdate::Remove { symbol, .. } => HashSet::from([symbol.to_ascii_uppercase()]),
158    }
159}
160
161/// UTC calendar date code (YYYYMMDD) of an expiry timestamp in seconds.
162fn expiry_timestamp_to_code(expiry_ts_secs: u64) -> Option<u64> {
163    use chrono::Datelike;
164    let timestamp = i64::try_from(expiry_ts_secs).ok()?;
165    let date = chrono::DateTime::from_timestamp(timestamp, 0)?.date_naive();
166    Some((date.year() as u64) * 10_000 + (date.month() as u64) * 100 + date.day() as u64)
167}
168
169fn options_chain_update_matches(
170    update: &WsOptionsChainUpdate,
171    subscription: &OptionsChainSubscription,
172) -> bool {
173    if let Some(expected_expiry_code) = subscription.expiry {
174        let update_expiry = match update {
175            WsOptionsChainUpdate::Upsert { expiry, .. } => *expiry,
176            WsOptionsChainUpdate::Remove { expiry, .. } => *expiry,
177        };
178        // The subscription filter is a calendar date (YYYYMMDD). Compare on
179        // the UTC date of the update's expiry timestamp so the filter works
180        // across underlyings with different expiry times of day.
181        if expiry_timestamp_to_code(update_expiry) != Some(expected_expiry_code) {
182            return false;
183        }
184    }
185
186    if let Some(symbols_filter) = &subscription.symbols {
187        let symbols_set: HashSet<String> = symbols_filter.iter().cloned().collect();
188        let update_symbols = options_chain_update_symbols(update);
189        if symbols_set.is_disjoint(&update_symbols) {
190            return false;
191        }
192    }
193
194    match update {
195        WsOptionsChainUpdate::Upsert { row, .. } => {
196            let has_call = row.call.is_some();
197            let has_put = row.put.is_some();
198            match subscription.option_type {
199                OptionsChainOptionTypeFilter::Both => has_call || has_put,
200                OptionsChainOptionTypeFilter::Call => has_call,
201                OptionsChainOptionTypeFilter::Put => has_put,
202            }
203        }
204        WsOptionsChainUpdate::Remove { option_type, .. } => {
205            subscription.option_type.allows_option_type(option_type)
206        }
207    }
208}
209
210fn apply_option_type_filter_to_row(
211    mut row: OptionsChainStrikeRow,
212    option_type: OptionsChainOptionTypeFilter,
213) -> Option<OptionsChainStrikeRow> {
214    if row.call.is_some() && row.put.is_some() {
215        warn!(
216            strike = row.strike,
217            "options_chain upsert contained both call and put legs; pruning by subscription filter"
218        );
219    }
220
221    match option_type {
222        OptionsChainOptionTypeFilter::Both => {}
223        OptionsChainOptionTypeFilter::Call => row.put = None,
224        OptionsChainOptionTypeFilter::Put => row.call = None,
225    }
226
227    if row.call.is_none() && row.put.is_none() {
228        return None;
229    }
230
231    Some(row)
232}
233
234fn apply_options_chain_filters_to_update(
235    update: &WsOptionsChainUpdate,
236    option_type_filter: OptionsChainOptionTypeFilter,
237) -> Option<WsOptionsChainUpdate> {
238    match update {
239        WsOptionsChainUpdate::Upsert {
240            currency,
241            expiry,
242            row,
243            timestamp,
244        } => {
245            let row = apply_option_type_filter_to_row(row.clone(), option_type_filter)?;
246
247            Some(WsOptionsChainUpdate::Upsert {
248                currency: currency.clone(),
249                expiry: *expiry,
250                row,
251                timestamp: *timestamp,
252            })
253        }
254        WsOptionsChainUpdate::Remove {
255            currency,
256            expiry,
257            strike,
258            option_type,
259            symbol,
260            timestamp,
261        } => {
262            if !option_type_filter.allows_option_type(option_type) {
263                return None;
264            }
265
266            Some(WsOptionsChainUpdate::Remove {
267                currency: currency.clone(),
268                expiry: *expiry,
269                strike: *strike,
270                option_type: option_type.clone(),
271                symbol: symbol.clone(),
272                timestamp: *timestamp,
273            })
274        }
275    }
276}
277
278fn effective_option_type_for_matching_subscriptions(
279    subscriptions: &[OptionsChainSubscription],
280    update: &WsOptionsChainUpdate,
281) -> Option<OptionsChainOptionTypeFilter> {
282    let mut has_call = false;
283    let mut has_put = false;
284    let mut has_both = false;
285
286    for subscription in subscriptions {
287        if !options_chain_update_matches(update, subscription) {
288            continue;
289        }
290
291        match subscription.option_type {
292            OptionsChainOptionTypeFilter::Call => has_call = true,
293            OptionsChainOptionTypeFilter::Put => has_put = true,
294            OptionsChainOptionTypeFilter::Both => has_both = true,
295        }
296    }
297
298    if has_both || (has_call && has_put) {
299        Some(OptionsChainOptionTypeFilter::Both)
300    } else if has_call {
301        Some(OptionsChainOptionTypeFilter::Call)
302    } else if has_put {
303        Some(OptionsChainOptionTypeFilter::Put)
304    } else {
305        None
306    }
307}
308
309type ClientId = uuid::Uuid;
310
311struct Client {
312    sender: mpsc::UnboundedSender<Arc<WsMessage>>,
313    subscriptions: Vec<String>,
314    options_chain_subscriptions: Vec<OptionsChainSubscription>,
315    /// When set, only forward order_updates/fills whose symbol matches one of
316    /// these underlyings (e.g. "BTC", "ETH"). When empty, all orders are forwarded.
317    order_update_symbols: HashSet<String>,
318    /// When set, only forward orderbook/trades updates whose full instrument
319    /// symbol matches. When empty, all updates are forwarded.
320    public_channel_symbols: HashSet<String>,
321    authenticated: bool,
322    wallet_address: Option<WalletAddress>,
323}
324
325#[derive(Clone)]
326pub struct PubSubManager {
327    clients: Arc<DashMap<ClientId, Client>>,
328    channel_subscribers: Arc<DashMap<String, Vec<ClientId>>>,
329}
330
331impl Default for PubSubManager {
332    fn default() -> Self {
333        Self::new()
334    }
335}
336
337impl PubSubManager {
338    pub fn new() -> Self {
339        Self {
340            clients: Arc::new(DashMap::new()),
341            channel_subscribers: Arc::new(DashMap::new()),
342        }
343    }
344
345    pub fn add_client(
346        &self,
347        client_id: ClientId,
348        sender: mpsc::UnboundedSender<Arc<WsMessage>>,
349        authenticated: bool,
350        wallet_address: Option<WalletAddress>,
351    ) {
352        let client = Client {
353            sender,
354            subscriptions: Vec::new(),
355            options_chain_subscriptions: Vec::new(),
356            order_update_symbols: HashSet::new(),
357            public_channel_symbols: HashSet::new(),
358            authenticated,
359            wallet_address,
360        };
361        let prev = self.clients.insert(client_id, client);
362        // Only increment metrics if this is a new client (not a replacement)
363        if prev.is_none() {
364            gauge!("ht_ws_connections_active").increment(1.0);
365            counter!("ht_ws_connections_total").increment(1);
366        }
367    }
368
369    /// Set the wallet address on a connected client (message-based auth).
370    pub fn authenticate_client(
371        &self,
372        client_id: ClientId,
373        wallet_address: WalletAddress,
374    ) -> Result<(), String> {
375        let mut client = self.clients.get_mut(&client_id).ok_or("Client not found")?;
376        client.authenticated = true;
377        client.wallet_address = Some(wallet_address);
378        Ok(())
379    }
380
381    pub fn remove_client(&self, client_id: ClientId) {
382        if let Some((_, client)) = self.clients.remove(&client_id) {
383            // Only decrement metrics if client actually existed
384            gauge!("ht_ws_connections_active").decrement(1.0);
385            counter!("ht_ws_disconnections_total").increment(1);
386
387            for channel in &client.subscriptions {
388                if let Some(mut subs) = self.channel_subscribers.get_mut(channel) {
389                    subs.retain(|id| id != &client_id);
390                    if subs.is_empty() {
391                        // Drop the mutable ref before removing the key
392                        drop(subs);
393                        self.channel_subscribers.remove(channel);
394                    }
395                }
396                gauge!("ht_ws_channel_subscribers", "channel" => channel.clone()).decrement(1.0);
397            }
398        }
399    }
400
401    pub fn subscribe(
402        &self,
403        client_id: ClientId,
404        channel: String,
405        symbols: Option<Vec<String>>,
406        expiry: Option<String>,
407        option_type: Option<String>,
408    ) -> Result<(), String> {
409        // Get mutable access to client for auth check + subscription update
410        let mut client = self.clients.get_mut(&client_id).ok_or("Client not found")?;
411
412        // Check authentication for protected channels
413        if requires_authentication(&channel) && !client.authenticated {
414            return Err("Authentication required for this channel".to_string());
415        }
416
417        if channel == "options_chain" {
418            let subscription =
419                parse_options_chain_subscription(symbols.clone(), expiry, option_type)?;
420
421            if !client.options_chain_subscriptions.contains(&subscription) {
422                client.options_chain_subscriptions.push(subscription);
423            }
424        }
425
426        // For order_updates and fills: track symbol filters.
427        // Symbols are underlyings (e.g. "BTC", "ETH") extracted from the
428        // full instrument symbol (e.g. "BTC-20260131-100000-C" -> "BTC").
429        if (channel == "order_updates" || channel == "fills") && symbols.is_some() {
430            if let Some(syms) = &symbols {
431                for sym in syms {
432                    let normalized = sym.trim().to_uppercase();
433                    if !normalized.is_empty() {
434                        // Accept both full instrument names and bare underlyings.
435                        // Extract the underlying (first segment before '-').
436                        let underlying = normalized
437                            .split('-')
438                            .next()
439                            .unwrap_or(&normalized)
440                            .to_string();
441                        client.order_update_symbols.insert(underlying);
442                    }
443                }
444            }
445        }
446
447        // For public market-data channels: track exact instrument symbol filters.
448        if (channel == "orderbook" || channel == "trades" || channel == "indicative_market_data")
449            && symbols.is_some()
450        {
451            if let Some(syms) = &symbols {
452                for sym in syms {
453                    let normalized = sym.trim().to_uppercase();
454                    if !normalized.is_empty() {
455                        client.public_channel_symbols.insert(normalized);
456                    }
457                }
458            }
459        }
460
461        // Check if already subscribed to prevent duplicates
462        if client.subscriptions.contains(&channel) {
463            return Ok(());
464        }
465
466        // Add to client's subscriptions
467        client.subscriptions.push(channel.clone());
468
469        // Drop client ref before touching channel_subscribers (separate DashMap)
470        drop(client);
471
472        // Add to channel subscribers (with duplicate check)
473        let mut subs = self.channel_subscribers.entry(channel.clone()).or_default();
474        if !subs.contains(&client_id) {
475            subs.push(client_id);
476        }
477
478        gauge!("ht_ws_channel_subscribers", "channel" => channel).set(subs.len() as f64);
479
480        Ok(())
481    }
482
483    pub fn unsubscribe(
484        &self,
485        client_id: ClientId,
486        channel: String,
487        symbols: Option<Vec<String>>,
488        expiry: Option<String>,
489        option_type: Option<String>,
490    ) -> Result<(), String> {
491        let mut client = self.clients.get_mut(&client_id).ok_or("Client not found")?;
492
493        // For order_updates and fills: remove specific symbol filters,
494        // or clear all filters if no symbols specified.
495        if channel == "order_updates" || channel == "fills" {
496            if let Some(syms) = &symbols {
497                for sym in syms {
498                    let normalized = sym.trim().to_uppercase();
499                    let underlying = normalized
500                        .split('-')
501                        .next()
502                        .unwrap_or(&normalized)
503                        .to_string();
504                    client.order_update_symbols.remove(&underlying);
505                }
506                // If there are still symbol filters, don't fully unsubscribe
507                // from the channel - just acknowledge the symbol removal.
508                if !client.order_update_symbols.is_empty() {
509                    return Ok(());
510                }
511            } else {
512                // No symbols specified - clear all filters (full unsubscribe)
513                client.order_update_symbols.clear();
514            }
515        }
516
517        // For public market-data channels: remove specific instrument symbol filters.
518        if channel == "orderbook" || channel == "trades" || channel == "indicative_market_data" {
519            if let Some(syms) = &symbols {
520                for sym in syms {
521                    let normalized = sym.trim().to_uppercase();
522                    client.public_channel_symbols.remove(&normalized);
523                }
524                if !client.public_channel_symbols.is_empty() {
525                    return Ok(());
526                }
527            } else {
528                client.public_channel_symbols.clear();
529            }
530        }
531
532        if channel == "options_chain" {
533            let normalized_symbols = normalize_symbols(symbols);
534            let normalized_expiry = expiry.and_then(|value| {
535                let trimmed = value.trim();
536                if trimmed.is_empty() {
537                    None
538                } else {
539                    Some(trimmed.to_string())
540                }
541            });
542            let normalized_option_type = option_type.and_then(|value| {
543                let trimmed = value.trim();
544                if trimmed.is_empty() {
545                    None
546                } else {
547                    Some(trimmed.to_string())
548                }
549            });
550
551            if normalized_symbols.is_none()
552                && normalized_expiry.is_none()
553                && normalized_option_type.is_none()
554            {
555                client.options_chain_subscriptions.clear();
556            } else {
557                let subscription = parse_options_chain_subscription(
558                    normalized_symbols,
559                    normalized_expiry,
560                    normalized_option_type,
561                )?;
562                client
563                    .options_chain_subscriptions
564                    .retain(|existing| existing != &subscription);
565            }
566
567            if !client.options_chain_subscriptions.is_empty() {
568                return Ok(());
569            }
570        }
571
572        // Remove from client's subscriptions
573        client.subscriptions.retain(|ch| ch != &channel);
574
575        // Drop client ref before touching channel_subscribers
576        drop(client);
577
578        // Remove from channel subscribers
579        let mut subscriber_count = 0;
580        if let Some(mut subs) = self.channel_subscribers.get_mut(&channel) {
581            subs.retain(|id| id != &client_id);
582            subscriber_count = subs.len();
583            if subs.is_empty() {
584                drop(subs);
585                self.channel_subscribers.remove(&channel);
586            }
587        }
588
589        gauge!("ht_ws_channel_subscribers", "channel" => channel).set(subscriber_count as f64);
590
591        Ok(())
592    }
593
594    pub fn publish_to_channel(&self, channel: &str, message: WsMessage) {
595        debug!(
596            "Publishing ws message to channel: {:?}: {:?}",
597            channel, message
598        );
599        counter!("ht_ws_messages_sent_total", "channel" => channel.to_string()).increment(1);
600
601        let start = Instant::now();
602        let msg = Arc::new(message);
603        let mut fan_out: u64 = 0;
604
605        if let Some(subscriber_ids) = self.channel_subscribers.get(channel) {
606            let is_authenticated = requires_authentication(channel);
607
608            for client_id in subscriber_ids.iter() {
609                if let Some(client) = self.clients.get(client_id) {
610                    if is_authenticated {
611                        let should_send = match msg.as_ref() {
612                            WsMessage::OrderUpdate(order) => {
613                                let wallet_match =
614                                    client.wallet_address.as_ref() == Some(&order.wallet_address);
615                                let symbol_match = client.order_update_symbols.is_empty()
616                                    || order
617                                        .request
618                                        .symbol
619                                        .split('-')
620                                        .next()
621                                        .map(|u| {
622                                            client.order_update_symbols.contains(&u.to_uppercase())
623                                        })
624                                        .unwrap_or(true);
625                                wallet_match && symbol_match
626                            }
627                            WsMessage::Fill(fill) => {
628                                let wallet_match =
629                                    client.wallet_address.as_ref() == Some(&fill.wallet_address);
630                                let symbol_match = client.order_update_symbols.is_empty()
631                                    || fill
632                                        .symbol
633                                        .split('-')
634                                        .next()
635                                        .map(|u| {
636                                            client.order_update_symbols.contains(&u.to_uppercase())
637                                        })
638                                        .unwrap_or(true);
639                                wallet_match && symbol_match
640                            }
641                            WsMessage::PositionExpired(expired) => {
642                                client.wallet_address.as_ref() == Some(&expired.wallet_address)
643                            }
644                            WsMessage::LiquidationStateChange(state_change) => {
645                                client.wallet_address.as_ref() == Some(&state_change.wallet_address)
646                            }
647                            WsMessage::CompetitionUpdate(update) => {
648                                client.wallet_address.as_ref() == Some(&update.wallet_address)
649                            }
650                            WsMessage::CompetitionPnlSummary(summary) => {
651                                client.wallet_address.as_ref() == Some(&summary.wallet_address)
652                            }
653                            WsMessage::CompetitionFinalStats(final_stats) => {
654                                client.wallet_address.as_ref() == Some(&final_stats.wallet_address)
655                            }
656                            WsMessage::CompetitionRankChange(change) => {
657                                client.wallet_address.as_ref() == Some(&change.wallet_address)
658                            }
659                            WsMessage::CompetitionGapUpdate(gap) => {
660                                client.wallet_address.as_ref() == Some(&gap.wallet_address)
661                            }
662                            WsMessage::CompetitionFinalStanding(final_standing) => {
663                                client.wallet_address.as_ref()
664                                    == Some(&final_standing.wallet_address)
665                            }
666                            WsMessage::RfqQuotes(rfq) => {
667                                client.wallet_address.as_ref() == Some(&rfq.taker_wallet)
668                            }
669                            WsMessage::RfqStatusUpdate(rfq) => {
670                                client.wallet_address.as_ref() == Some(&rfq.taker_wallet)
671                            }
672                            WsMessage::RfqAcceptResult {
673                                taker_wallet: Some(ref tw),
674                                ..
675                            } => client.wallet_address.as_ref() == Some(tw),
676                            _ => true,
677                        };
678
679                        if should_send {
680                            let _ = client.sender.send(Arc::clone(&msg));
681                            fan_out += 1;
682                        }
683                    } else if channel == "options_chain" {
684                        if let WsMessage::OptionsChainUpdate(update) = msg.as_ref() {
685                            let effective_option_type =
686                                effective_option_type_for_matching_subscriptions(
687                                    &client.options_chain_subscriptions,
688                                    update,
689                                );
690                            if let Some(option_type) = effective_option_type {
691                                if let Some(filtered_update) =
692                                    apply_options_chain_filters_to_update(update, option_type)
693                                {
694                                    let _ = client.sender.send(Arc::new(
695                                        WsMessage::OptionsChainUpdate(filtered_update),
696                                    ));
697                                    fan_out += 1;
698                                }
699                            }
700                        }
701                    } else {
702                        // Public channel - apply symbol filter if set
703                        let passes_filter = if client.public_channel_symbols.is_empty() {
704                            true
705                        } else {
706                            match msg.as_ref() {
707                                WsMessage::OrderbookUpdate(update) => client
708                                    .public_channel_symbols
709                                    .contains(&update.symbol.to_uppercase()),
710                                WsMessage::Trade(trade) => client
711                                    .public_channel_symbols
712                                    .contains(&trade.symbol.to_uppercase()),
713                                WsMessage::IndicativeMarketData(update) => client
714                                    .public_channel_symbols
715                                    .contains(&update.instrument.to_uppercase()),
716                                _ => true,
717                            }
718                        };
719                        if passes_filter {
720                            let _ = client.sender.send(Arc::clone(&msg));
721                            fan_out += 1;
722                        }
723                    }
724                }
725            }
726        }
727
728        let duration_seconds = start.elapsed().as_secs_f64();
729        histogram!("ht_ws_publish_fan_out", "channel" => channel.to_string())
730            .record(fan_out as f64);
731        histogram!("ht_ws_publish_duration_seconds", "channel" => channel.to_string())
732            .record(duration_seconds);
733    }
734
735    pub async fn active_channels_with_prefix(&self, prefix: &str) -> Vec<String> {
736        let mut channels: Vec<String> = self
737            .channel_subscribers
738            .iter()
739            .map(|entry| entry.key().clone())
740            .filter(|channel| channel.starts_with(prefix))
741            .collect();
742        channels.sort_unstable();
743        channels
744    }
745
746    pub fn send_to_client(&self, client_id: ClientId, message: WsMessage) {
747        if let Some(client) = self.clients.get(&client_id) {
748            let _ = client.sender.send(Arc::new(message));
749        }
750    }
751
752    pub fn publish_orderbook_update(&self, update: WsOrderbookUpdate) {
753        self.publish_to_channel("orderbook", WsMessage::OrderbookUpdate(update));
754    }
755
756    pub fn publish_order_update(&self, order: WsOrderMessage) {
757        self.publish_to_channel("order_updates", WsMessage::OrderUpdate(order));
758    }
759
760    pub fn publish_fill(&self, fill: WsFillUpdate) {
761        self.publish_to_channel("fills", WsMessage::Fill(fill));
762    }
763
764    pub fn publish_market_update(&self, update: WsMarketUpdate) {
765        self.publish_to_channel("market_updates", WsMessage::MarketUpdate(update));
766    }
767
768    pub fn publish_trade(&self, trade: WsTradeUpdate) {
769        self.publish_to_channel("trades", WsMessage::Trade(trade));
770    }
771
772    pub fn publish_options_chain_update(&self, update: WsOptionsChainUpdate) {
773        self.publish_to_channel("options_chain", WsMessage::OptionsChainUpdate(update));
774    }
775
776    pub fn publish_index_prices(&self, update: WsIndexPriceUpdate) {
777        self.publish_to_channel("index_prices", WsMessage::IndexPriceUpdate(update));
778    }
779
780    pub fn publish_indicative_market_data(&self, update: WsIndicativeMarketData) {
781        self.publish_to_channel(
782            "indicative_market_data",
783            WsMessage::IndicativeMarketData(update),
784        );
785    }
786
787    pub fn publish_position_expired(&self, expired: WsPositionExpired) {
788        self.publish_to_channel("portfolio", WsMessage::PositionExpired(expired));
789    }
790
791    pub fn publish_liquidation_state_change(&self, state_change: WsLiquidationStateChange) {
792        self.publish_to_channel(
793            "liquidation",
794            WsMessage::LiquidationStateChange(state_change),
795        );
796    }
797
798    pub async fn publish_competition_update(&self, update: WsCompetitionUpdate) {
799        self.publish_to_channel("competition", WsMessage::CompetitionUpdate(update));
800    }
801
802    pub async fn publish_competition_pnl_summary(&self, summary: WsCompetitionPnlSummary) {
803        self.publish_to_channel("competition", WsMessage::CompetitionPnlSummary(summary));
804    }
805
806    pub async fn publish_competition_final_stats(&self, final_stats: WsCompetitionFinalStats) {
807        self.publish_to_channel("competition", WsMessage::CompetitionFinalStats(final_stats));
808    }
809
810    pub async fn publish_competition_rank_change(&self, update: WsCompetitionRankChange) {
811        self.publish_to_channel(
812            "competition_engagement",
813            WsMessage::CompetitionRankChange(update),
814        );
815    }
816
817    pub async fn publish_competition_gap_update(&self, update: WsCompetitionGapUpdate) {
818        self.publish_to_channel(
819            "competition_engagement",
820            WsMessage::CompetitionGapUpdate(update),
821        );
822    }
823
824    pub async fn publish_competition_final_standing(&self, update: WsCompetitionFinalStanding) {
825        self.publish_to_channel(
826            "competition_engagement",
827            WsMessage::CompetitionFinalStanding(update),
828        );
829    }
830
831    pub fn publish_rfq_quotes(&self, quotes: WsRfqQuotes) {
832        self.publish_to_channel("rfq", WsMessage::RfqQuotes(quotes));
833    }
834
835    /// Return wallets subscribed to the given channel (authenticated wallets only).
836    pub async fn subscribed_wallets(&self, channel: &str) -> Vec<WalletAddress> {
837        let mut wallets = HashSet::new();
838
839        if let Some(subscriber_ids) = self.channel_subscribers.get(channel) {
840            for client_id in subscriber_ids.iter() {
841                if let Some(client) = self.clients.get(client_id) {
842                    if let Some(wallet) = client.wallet_address {
843                        wallets.insert(wallet);
844                    }
845                }
846            }
847        }
848
849        wallets.into_iter().collect()
850    }
851}
852
853impl crate::rfq::qp_ws_state::RfqWebsocketPublisher for PubSubManager {
854    fn publish_indicative_market_data(&self, update: WsIndicativeMarketData) {
855        PubSubManager::publish_indicative_market_data(self, update);
856    }
857
858    fn publish_rfq_quotes(&self, quotes: WsRfqQuotes) {
859        PubSubManager::publish_rfq_quotes(self, quotes);
860    }
861
862    fn publish_to_channel(&self, channel: &str, message: WsMessage) {
863        PubSubManager::publish_to_channel(self, channel, message);
864    }
865}
866
867#[derive(Clone)]
868pub struct WsState {
869    pub pubsub: PubSubManager,
870    pub indicative_cache: Option<Arc<IndicativeQuoteCache>>,
871    pub portfolio_cache: Option<Arc<dyn PortfolioCacheApi>>,
872    pub competition_service: Option<Arc<hypercall_competition::CompetitionService>>,
873    pub heartbeat_config: (Duration, Duration),
874    pub rfq_handler_state: Option<RfqHandlerState>,
875    pub order_sender: Option<mpsc::Sender<hypercall_runtime_api::UnifiedEngineRequest>>,
876    pub trading_halt: Option<Arc<RwLock<crate::trading_halt::TradingHaltState>>>,
877    pub agent_auth: Arc<dyn hypercall_runtime_api::AgentAuthProvider>,
878    pub signing_chain_id: u64,
879}
880
881#[derive(Deserialize)]
882pub struct WsQuery {
883    pub wallet: Option<String>,
884}
885
886pub async fn websocket_handler(
887    ws: WebSocketUpgrade,
888    State(state): State<WsState>,
889    Query(query): Query<WsQuery>,
890) -> Response {
891    // WebSocket authentication modes:
892    // - No query params => public streams only
893    // - ?wallet=0x... => authenticated for that wallet, allows private streams
894    let (authenticated, wallet_address) = if let Some(wallet) = query.wallet {
895        // Wallet-based authentication for EIP-712 flow
896        // Parse wallet string to WalletAddress
897        match WalletAddress::from_str(&wallet) {
898            Ok(addr) => (true, Some(addr)),
899            Err(_) => {
900                tracing::warn!(
901                    "Invalid wallet address format in WebSocket query: {}",
902                    wallet
903                );
904                (false, None)
905            }
906        }
907    } else {
908        // Public streams only
909        (false, None)
910    };
911
912    ws.on_upgrade(move |socket| handle_socket(socket, state, authenticated, wallet_address))
913}
914
915async fn handle_socket(
916    socket: WebSocket,
917    state: WsState,
918    authenticated: bool,
919    wallet_address: Option<WalletAddress>,
920) {
921    let (heartbeat_interval, pong_timeout) = state.heartbeat_config;
922    let (mut sender, mut receiver) = socket.split();
923    let client_id = uuid::Uuid::new_v4();
924
925    let (tx, mut rx) = mpsc::unbounded_channel::<Arc<WsMessage>>();
926
927    // Add client to pubsub manager
928    let mut wallet_addr_clone = wallet_address;
929    let mut authenticated = authenticated;
930    state
931        .pubsub
932        .add_client(client_id, tx.clone(), authenticated, wallet_address);
933
934    // Task for portfolio updates if authenticated
935    let mut portfolio_task = None;
936    let mut portfolio_subscription: Option<(WalletAddress, u64)> = None;
937    if authenticated && wallet_addr_clone.is_some() && state.portfolio_cache.is_some() {
938        // This will be set up when client subscribes to portfolio channel
939    }
940
941    let mut heartbeat = time::interval(heartbeat_interval);
942    heartbeat.set_missed_tick_behavior(MissedTickBehavior::Delay);
943    heartbeat.tick().await;
944    let mut last_pong = Instant::now();
945
946    loop {
947        tokio::select! {
948            maybe_msg = rx.recv() => {
949                let Some(msg) = maybe_msg else {
950                    break;
951                };
952
953                if let Ok(json) = sonic_rs::to_string(msg.as_ref()) {
954                    debug!("Sending message to client: {:?}", msg);
955                    if sender.send(Message::Text(json)).await.is_err() {
956                        break;
957                    }
958                }
959            }
960            maybe_frame = receiver.next() => {
961                let Some(frame_result) = maybe_frame else {
962                    break;
963                };
964
965                let Ok(frame) = frame_result else {
966                    break;
967                };
968
969                match frame {
970                    Message::Text(text) => {
971                        let deser_result = sonic_rs::from_str::<WsMessage>(&text);
972                        if deser_result.is_err() && (text.contains("SubmitAutoExecuteRfq") || text.contains("SubmitRfq")) {
973                            tracing::warn!(
974                                client_id = ?client_id,
975                                error = %deser_result.as_ref().unwrap_err(),
976                                msg_len = text.len(),
977                                "Failed to deserialize RFQ WS message"
978                            );
979                        }
980                        if let Ok(ws_msg) = deser_result {
981                            match ws_msg {
982                                WsMessage::Subscribe {
983                                    channel,
984                                    symbols,
985                                    expiry,
986                                    option_type,
987                                } => {
988                                    match state.pubsub.subscribe(
989                                        client_id,
990                                        channel.clone(),
991                                        symbols.clone(),
992                                        expiry,
993                                        option_type,
994                                    ) {
995                                        Ok(_) => {
996                                            if channel == "portfolio"
997                                                && authenticated
998                                                && portfolio_subscription.is_none()
999                                            {
1000                                                if let Some(ref wallet) = wallet_addr_clone {
1001                                                    if let Some(ref cache) = state.portfolio_cache {
1002                                                        let (subscriber_id, mut portfolio_rx) =
1003                                                            cache.subscribe(*wallet).await;
1004                                                        let tx_clone = tx.clone();
1005                                                        portfolio_subscription =
1006                                                            Some((*wallet, subscriber_id));
1007                                                        portfolio_task =
1008                                                            Some(tokio::spawn(async move {
1009                                                                while let Some(update) =
1010                                                                    portfolio_rx.recv().await
1011                                                                {
1012                                                                    if tx_clone
1013                                                                        .send(Arc::new(
1014                                                                            WsMessage::PortfolioUpdate(
1015                                                                                update,
1016                                                                            ),
1017                                                                        ))
1018                                                                        .is_err()
1019                                                                    {
1020                                                                        break;
1021                                                                    }
1022                                                                }
1023                                                            }));
1024                                                    }
1025                                                }
1026                                            }
1027
1028                                            state.pubsub.send_to_client(
1029                                                client_id,
1030                                                WsMessage::Subscribed {
1031                                                    channel: channel.clone(),
1032                                                },
1033                                            );
1034
1035                                            if channel == "indicative_market_data" {
1036                                                if let Some(ref cache) = state.indicative_cache {
1037                                                    let symbol_filter = normalize_symbols(symbols);
1038                                                    let mut snapshot_count = 0u64;
1039                                                    for quote in cache.get_all_aggregates() {
1040                                                        if let Some(filter) = &symbol_filter {
1041                                                            if !filter.contains(
1042                                                                &quote.instrument.to_ascii_uppercase(),
1043                                                            ) {
1044                                                                continue;
1045                                                            }
1046                                                        }
1047                                                        snapshot_count += 1;
1048                                                        state
1049                                                            .pubsub
1050                                                            .send_to_client(
1051                                                                client_id,
1052                                                                WsMessage::IndicativeMarketData(
1053                                                                    WsIndicativeMarketData {
1054                                                                        instrument: quote.instrument,
1055                                                                        best_bid: Some(quote.best_bid),
1056                                                                        best_ask: Some(quote.best_ask),
1057                                                                        bid_iv: None,
1058                                                                        ask_iv: None,
1059                                                                        indicative_bid_size: quote
1060                                                                            .indicative_bid_size
1061                                                                            .into(),
1062                                                                        indicative_ask_size: quote
1063                                                                            .indicative_ask_size
1064                                                                            .into(),
1065                                                                        num_providers: quote
1066                                                                            .num_providers,
1067                                                                        timestamp: quote.updated_at,
1068                                                                    },
1069                                                                ),
1070                                                            );
1071                                                    }
1072                                                    counter!(
1073                                                        "ht_ws_indicative_snapshot_sent_total"
1074                                                    )
1075                                                    .increment(snapshot_count);
1076                                                }
1077                                            }
1078
1079                                            if channel == "competition" && authenticated {
1080                                                if let (Some(wallet), Some(service)) = (
1081                                                    wallet_addr_clone,
1082                                                    state.competition_service.as_ref(),
1083                                                ) {
1084                                                    match service
1085                                                        .build_competition_pnl_summary_for_wallet(
1086                                                            wallet,
1087                                                            chrono::Utc::now().timestamp_millis(),
1088                                                        )
1089                                                        .await
1090                                                    {
1091                                                        Ok(summary) => {
1092                                                            state.pubsub.send_to_client(
1093                                                                client_id,
1094                                                                WsMessage::CompetitionPnlSummary(
1095                                                                    summary,
1096                                                                ),
1097                                                            );
1098                                                        }
1099                                                        Err(err) => {
1100                                                            state.pubsub.send_to_client(
1101                                                                client_id,
1102                                                                WsMessage::Error {
1103                                                                    message: format!(
1104                                                                        "failed to build competition summary: {}",
1105                                                                        err
1106                                                                    ),
1107                                                                },
1108                                                            );
1109                                                        }
1110                                                    }
1111                                                }
1112                                            }
1113                                        }
1114                                        Err(err) => {
1115                                            state.pubsub.send_to_client(
1116                                                client_id,
1117                                                WsMessage::Error { message: err },
1118                                            );
1119                                        }
1120                                    }
1121                                }
1122                                WsMessage::Unsubscribe {
1123                                    channel,
1124                                    symbols,
1125                                    expiry,
1126                                    option_type,
1127                                } => {
1128                                    match state.pubsub.unsubscribe(
1129                                        client_id,
1130                                        channel.clone(),
1131                                        symbols,
1132                                        expiry,
1133                                        option_type,
1134                                    ) {
1135                                        Ok(_) => {
1136                                            if channel == "portfolio" {
1137                                                if let Some(task) = portfolio_task.take() {
1138                                                    task.abort();
1139                                                }
1140                                                if let Some((wallet, subscriber_id)) =
1141                                                    portfolio_subscription.take()
1142                                                {
1143                                                    if let Some(ref cache) = state.portfolio_cache {
1144                                                        cache.unsubscribe(&wallet, subscriber_id).await;
1145                                                    }
1146                                                }
1147                                            }
1148
1149                                            state.pubsub.send_to_client(
1150                                                client_id,
1151                                                WsMessage::Unsubscribed { channel },
1152                                            );
1153                                        }
1154                                        Err(err) => {
1155                                            state.pubsub.send_to_client(
1156                                                client_id,
1157                                                WsMessage::Error { message: err },
1158                                            );
1159                                        }
1160                                    }
1161                                }
1162                                WsMessage::Authenticate { wallet } => {
1163                                    match WalletAddress::from_str(&wallet) {
1164                                        Ok(addr) => {
1165                                            match state.pubsub.authenticate_client(client_id, addr) {
1166                                                Ok(_) => {
1167                                                    wallet_addr_clone = Some(addr);
1168                                                    authenticated = true;
1169                                                    state.pubsub.send_to_client(
1170                                                        client_id,
1171                                                        WsMessage::Authenticated {
1172                                                            wallet: wallet.clone(),
1173                                                        },
1174                                                    );
1175                                                }
1176                                                Err(err) => {
1177                                                    state.pubsub.send_to_client(
1178                                                        client_id,
1179                                                        WsMessage::Error { message: err },
1180                                                    );
1181                                                }
1182                                            }
1183                                        }
1184                                        Err(_) => {
1185                                            state.pubsub.send_to_client(
1186                                                client_id,
1187                                                WsMessage::Error {
1188                                                    message: format!(
1189                                                        "Invalid wallet address: {}",
1190                                                        wallet
1191                                                    ),
1192                                                },
1193                                            );
1194                                        }
1195                                    }
1196                                }
1197                                WsMessage::SubmitRfq {
1198                                    rfq_id,
1199                                    legs,
1200                                    wallet_address: req_wallet,
1201                                    nonce,
1202                                    signature,
1203                                } => {
1204                                    if !authenticated {
1205                                        state.pubsub.send_to_client(
1206                                            client_id,
1207                                            WsMessage::Error {
1208                                                message: "Not authenticated".to_string(),
1209                                            },
1210                                        );
1211                                    } else if let Some(ref rfq_state) = state.rfq_handler_state {
1212                                        // Convert WsRfqLegRequest -> RfqLegRequest
1213                                        let typed_legs: Vec<hypercall_types::RfqLegRequest> = legs
1214                                            .iter()
1215                                            .map(|l| hypercall_types::RfqLegRequest {
1216                                                instrument: l.instrument.clone(),
1217                                                side: l.side,
1218                                                size: l.size.clone(),
1219                                            })
1220                                            .collect();
1221
1222                                        let wallet = match WalletAddress::from_str(&req_wallet) {
1223                                            Ok(w) => w,
1224                                            Err(_) => {
1225                                                state.pubsub.send_to_client(
1226                                                    client_id,
1227                                                    WsMessage::Error {
1228                                                        message: format!(
1229                                                            "Invalid wallet address: {}",
1230                                                            req_wallet
1231                                                        ),
1232                                                    },
1233                                                );
1234                                                continue;
1235                                            }
1236                                        };
1237
1238                                        match super::handlers::rfq::submit_rfq_inner(
1239                                            &rfq_state.rfq_manager,
1240                                            rfq_state.agent_auth.as_ref(),
1241                                            &rfq_id,
1242                                            &typed_legs,
1243                                            wallet,
1244                                            nonce,
1245                                            &signature,
1246                                            state.signing_chain_id,
1247                                        )
1248                                        .await
1249                                        {
1250                                            Ok(record) => {
1251                                                state.pubsub.send_to_client(
1252                                                    client_id,
1253                                                    WsMessage::RfqStatusUpdate(WsRfqStatusUpdate {
1254                                                        rfq_id: record.rfq_id.to_string(),
1255                                                        status: format!("{:?}", record.status),
1256                                                        taker_wallet: record.taker_wallet,
1257                                                    }),
1258                                                );
1259                                            }
1260                                            Err(err) => {
1261                                                state.pubsub.send_to_client(
1262                                                    client_id,
1263                                                    WsMessage::Error {
1264                                                        message: format!(
1265                                                            "RFQ submission failed: {}",
1266                                                            err
1267                                                        ),
1268                                                    },
1269                                                );
1270                                            }
1271                                        }
1272                                    } else {
1273                                        state.pubsub.send_to_client(
1274                                            client_id,
1275                                            WsMessage::Error {
1276                                                message: "RFQ system not available".to_string(),
1277                                            },
1278                                        );
1279                                    }
1280                                }
1281                                WsMessage::SubmitAutoExecuteRfq {
1282                                    rfq_id,
1283                                    legs,
1284                                    wallet_address: req_wallet,
1285                                    limit_price,
1286                                    nonce,
1287                                    signature,
1288                                } => {
1289                                    tracing::info!(
1290                                        client_id = ?client_id,
1291                                        rfq_id = %rfq_id,
1292                                        limit_price = %limit_price,
1293                                        authenticated,
1294                                        "WS SubmitAutoExecuteRfq received"
1295                                    );
1296                                    if !authenticated {
1297                                        tracing::warn!(client_id = ?client_id, rfq_id = %rfq_id, "SubmitAutoExecuteRfq rejected: not authenticated");
1298                                        state.pubsub.send_to_client(
1299                                            client_id,
1300                                            WsMessage::Error {
1301                                                message: "Not authenticated".to_string(),
1302                                            },
1303                                        );
1304                                    } else if let Some(ref rfq_state) = state.rfq_handler_state {
1305                                        let typed_legs: Vec<hypercall_types::RfqLegRequest> = legs
1306                                            .iter()
1307                                            .map(|l| hypercall_types::RfqLegRequest {
1308                                                instrument: l.instrument.clone(),
1309                                                side: l.side,
1310                                                size: l.size.clone(),
1311                                            })
1312                                            .collect();
1313
1314                                        let wallet = match WalletAddress::from_str(&req_wallet) {
1315                                            Ok(w) => w,
1316                                            Err(_) => {
1317                                                state.pubsub.send_to_client(
1318                                                    client_id,
1319                                                    WsMessage::Error {
1320                                                        message: format!(
1321                                                            "Invalid wallet address: {}",
1322                                                            req_wallet
1323                                                        ),
1324                                                    },
1325                                                );
1326                                                continue;
1327                                            }
1328                                        };
1329
1330                                        match super::handlers::rfq::submit_auto_execute_rfq_inner(
1331                                            &rfq_state.rfq_manager,
1332                                            rfq_state.agent_auth.as_ref(),
1333                                            &rfq_id,
1334                                            &typed_legs,
1335                                            wallet,
1336                                            &limit_price,
1337                                            nonce,
1338                                            &signature,
1339                                            state.signing_chain_id,
1340                                        )
1341                                        .await
1342                                        {
1343                                            Ok(record) => {
1344                                                state.pubsub.send_to_client(
1345                                                    client_id,
1346                                                    WsMessage::RfqStatusUpdate(WsRfqStatusUpdate {
1347                                                        rfq_id: record.rfq_id.to_string(),
1348                                                        status: format!("{:?}", record.status),
1349                                                        taker_wallet: record.taker_wallet,
1350                                                    }),
1351                                                );
1352                                            }
1353                                            Err(err) => {
1354                                                state.pubsub.send_to_client(
1355                                                    client_id,
1356                                                    WsMessage::Error {
1357                                                        message: format!(
1358                                                            "Auto-execute RFQ submission failed: {}",
1359                                                            err
1360                                                        ),
1361                                                    },
1362                                                );
1363                                            }
1364                                        }
1365                                    } else {
1366                                        state.pubsub.send_to_client(
1367                                            client_id,
1368                                            WsMessage::Error {
1369                                                message: "RFQ system not available".to_string(),
1370                                            },
1371                                        );
1372                                    }
1373                                }
1374                                // TODO(rate-limit): WS PlaceOrder and AcceptRfqQuote bypass
1375                                // the HTTP `write_route_rate_limit_middleware`. The
1376                                // `RateLimitCache::check_and_increment` API is callable
1377                                // outside middleware, but `WsState` does not currently
1378                                // hold a `RateLimitCache` reference. Adding one requires
1379                                // threading it through `WsState` and plumbing the cache
1380                                // into the integrated server builder -- scope for a
1381                                // follow-up PR.
1382                                WsMessage::AcceptRfqQuote {
1383                                    rfq_id,
1384                                    quote_id,
1385                                    wallet_address: req_wallet,
1386                                    nonce,
1387                                    signature,
1388                                } => {
1389                                    if !authenticated {
1390                                        state.pubsub.send_to_client(
1391                                            client_id,
1392                                            WsMessage::Error {
1393                                                message: "Not authenticated".to_string(),
1394                                            },
1395                                        );
1396                                    } else if let Some(ref rfq_state) = state.rfq_handler_state {
1397                                        let wallet = match WalletAddress::from_str(&req_wallet) {
1398                                            Ok(w) => w,
1399                                            Err(_) => {
1400                                                state.pubsub.send_to_client(
1401                                                    client_id,
1402                                                    WsMessage::Error {
1403                                                        message: format!(
1404                                                            "Invalid wallet address: {}",
1405                                                            req_wallet
1406                                                        ),
1407                                                    },
1408                                                );
1409                                                continue;
1410                                            }
1411                                        };
1412
1413                                        match super::handlers::rfq::accept_rfq_quote_inner(
1414                                            &rfq_state.rfq_manager,
1415                                            rfq_state.agent_auth.as_ref(),
1416                                            &rfq_id,
1417                                            &quote_id,
1418                                            wallet,
1419                                            nonce,
1420                                            &signature,
1421                                            state.signing_chain_id,
1422                                        )
1423                                        .await
1424                                        {
1425                                            Ok(response) => {
1426                                                state.pubsub.send_to_client(
1427                                                    client_id,
1428                                                    WsMessage::RfqAcceptResult {
1429                                                        rfq_id: response.rfq_id,
1430                                                        quote_id: response.quote_id,
1431                                                        status: response.status.as_str().to_string(),
1432                                                        fill_id: Some(response.fill_id),
1433                                                        // send_to_client is already targeted; no
1434                                                        // channel-level filtering needed.
1435                                                        taker_wallet: None,
1436                                                    },
1437                                                );
1438                                            }
1439                                            Err(err) => {
1440                                                state.pubsub.send_to_client(
1441                                                    client_id,
1442                                                    WsMessage::Error {
1443                                                        message: format!(
1444                                                            "RFQ accept failed: {}",
1445                                                            err
1446                                                        ),
1447                                                    },
1448                                                );
1449                                            }
1450                                        }
1451                                    } else {
1452                                        state.pubsub.send_to_client(
1453                                            client_id,
1454                                            WsMessage::Error {
1455                                                message: "RFQ system not available".to_string(),
1456                                            },
1457                                        );
1458                                    }
1459                                }
1460                                WsMessage::PlaceOrder {
1461                                    wallet: req_wallet,
1462                                    symbol,
1463                                    side,
1464                                    size,
1465                                    price,
1466                                    tif,
1467                                    client_id: req_client_id,
1468                                    nonce,
1469                                    signature,
1470                                    mmp_enabled,
1471                                    builder_code_address,
1472                                } => {
1473                                    if !authenticated {
1474                                        state.pubsub.send_to_client(
1475                                            client_id,
1476                                            WsMessage::Error {
1477                                                message: "Not authenticated".to_string(),
1478                                            },
1479                                        );
1480                                    } else if let Some(ref order_sender) = state.order_sender {
1481                                        match handle_ws_place_order(
1482                                            &state,
1483                                            order_sender,
1484                                            &req_wallet,
1485                                            &symbol,
1486                                            &side,
1487                                            &size,
1488                                            &price,
1489                                            tif.as_deref(),
1490                                            req_client_id.as_deref(),
1491                                            nonce,
1492                                            &signature,
1493                                            mmp_enabled.unwrap_or(false),
1494                                            builder_code_address.as_deref(),
1495                                        )
1496                                        .await
1497                                        {
1498                                            Ok(result) => {
1499                                                state.pubsub.send_to_client(
1500                                                    client_id,
1501                                                    WsMessage::OrderResult(result),
1502                                                );
1503                                            }
1504                                            Err(err) => {
1505                                                state.pubsub.send_to_client(
1506                                                    client_id,
1507                                                    WsMessage::Error {
1508                                                        message: format!(
1509                                                            "Order placement failed: {}",
1510                                                            err
1511                                                        ),
1512                                                    },
1513                                                );
1514                                            }
1515                                        }
1516                                    } else {
1517                                        state.pubsub.send_to_client(
1518                                            client_id,
1519                                            WsMessage::Error {
1520                                                message: "Order system not available"
1521                                                    .to_string(),
1522                                            },
1523                                        );
1524                                    }
1525                                }
1526                                _ => {}
1527                            }
1528                        }
1529                    }
1530                    Message::Ping(payload) => {
1531                        counter!("ht_ws_ping_received_total").increment(1);
1532                        if sender.send(Message::Pong(payload)).await.is_err() {
1533                            break;
1534                        }
1535                    }
1536                    Message::Pong(_) => {
1537                        last_pong = Instant::now();
1538                        counter!("ht_ws_pongs_received_total").increment(1);
1539                    }
1540                    Message::Close(_) => break,
1541                    _ => {}
1542                }
1543            }
1544            _ = heartbeat.tick() => {
1545                if Instant::now().duration_since(last_pong) >= pong_timeout {
1546                    counter!("ht_ws_heartbeat_timeouts_total").increment(1);
1547                    let _ = sender
1548                        .send(Message::Close(Some(CloseFrame {
1549                            code: close_code::POLICY,
1550                            reason: "pong timeout".into(),
1551                        })))
1552                        .await;
1553                    break;
1554                }
1555
1556                counter!("ht_ws_pings_sent_total").increment(1);
1557                if sender.send(Message::Ping(Vec::new())).await.is_err() {
1558                    break;
1559                }
1560            }
1561        }
1562    }
1563
1564    // Clean up
1565    if let Some(task) = portfolio_task.take() {
1566        task.abort();
1567    }
1568    if let Some((wallet, subscriber_id)) = portfolio_subscription.take() {
1569        if let Some(ref cache) = state.portfolio_cache {
1570            cache.unsubscribe(&wallet, subscriber_id).await;
1571        }
1572    }
1573    state.pubsub.remove_client(client_id);
1574}
1575
1576/// Timeout for waiting for engine response on WS order placement (10 seconds).
1577const WS_ENGINE_RESPONSE_TIMEOUT: Duration = Duration::from_secs(10);
1578
1579/// Handle a PlaceOrder message received via WebSocket.
1580///
1581/// Replicates the core logic of the REST `place_order` / `process_single_order`
1582/// handlers: signature recovery, agent authorization, validation, engine
1583/// dispatch, and response mapping.
1584#[allow(clippy::too_many_arguments)]
1585async fn handle_ws_place_order(
1586    state: &WsState,
1587    order_sender: &mpsc::Sender<hypercall_runtime_api::UnifiedEngineRequest>,
1588    wallet_str: &str,
1589    symbol: &str,
1590    side_str: &str,
1591    size_str: &str,
1592    price_str: &str,
1593    tif_str: Option<&str>,
1594    client_id: Option<&str>,
1595    nonce: u64,
1596    signature: &str,
1597    mmp_enabled: bool,
1598    builder_code_address: Option<&str>,
1599) -> Result<WsOrderResult, String> {
1600    use hypercall_auth::SignatureRecovery;
1601    use hypercall_runtime_api::{increment_pending_requests, UnifiedEngineRequest};
1602    use hypercall_types::utils::get_timestamp_millis;
1603    use hypercall_types::{
1604        to_contract_units_decimal, validate_price_precision, ParsedOptionSymbol,
1605        MAX_PRICE_SIGNIFICANT_FIGURES,
1606    };
1607    use hypercall_types::{OrderAction, OrderActionMessage, OrderInfo, TimeInForce};
1608    use rust_decimal::prelude::ToPrimitive;
1609    use rust_decimal_macros::dec;
1610
1611    // Parse wallet address
1612    let wallet_address = WalletAddress::from_str(wallet_str)
1613        .map_err(|_| format!("Invalid wallet address: {}", wallet_str))?;
1614
1615    // Parse side
1616    let side = match side_str.to_lowercase().as_str() {
1617        "buy" => Side::Buy,
1618        "sell" => Side::Sell,
1619        other => return Err(format!("Invalid side: {}", other)),
1620    };
1621
1622    // Parse TIF
1623    let tif = match tif_str.unwrap_or("gtc").to_lowercase().as_str() {
1624        "gtc" => TimeInForce::GTC,
1625        "ioc" => TimeInForce::IOC,
1626        "fok" => TimeInForce::FOK,
1627        other => return Err(format!("Invalid tif: {}", other)),
1628    };
1629
1630    // Recover signer from EIP-712 signature
1631    let tif_label = match tif {
1632        TimeInForce::GTC => "gtc",
1633        TimeInForce::IOC => "ioc",
1634        TimeInForce::FOK => "fok",
1635    };
1636    let signer_address = SignatureRecovery::recover_place_order_signer(
1637        wallet_address,
1638        symbol,
1639        &format!("{:?}", side),
1640        size_str,
1641        price_str,
1642        tif_label,
1643        client_id.unwrap_or(""),
1644        nonce,
1645        signature,
1646        state.signing_chain_id,
1647    )
1648    .map_err(|e| format!("Signature verification failed: {}", e))?;
1649    let signer_wallet = WalletAddress::from(signer_address);
1650
1651    let is_authorized = state
1652        .agent_auth
1653        .is_agent_authorized(&wallet_address, &signer_wallet);
1654    if !is_authorized {
1655        return Err(format!(
1656            "Unauthorized: signer {} not authorized for wallet {}",
1657            signer_wallet, wallet_address
1658        ));
1659    }
1660
1661    // Validate the symbol
1662    let _parsed_symbol =
1663        ParsedOptionSymbol::from_symbol(symbol).map_err(|e| format!("Invalid symbol: {}", e))?;
1664
1665    // Check trading halt
1666    if let Some(ref trading_halt) = state.trading_halt {
1667        let halt_state = trading_halt.read().await;
1668        if let Some(reason) = halt_state.blocked_reason(symbol) {
1669            return Err(format!(
1670                "{}. Order placement is disabled while trading is halted.",
1671                reason
1672            ));
1673        }
1674    }
1675
1676    // Parse and validate price
1677    let price =
1678        Decimal::from_str(price_str).map_err(|_| format!("Invalid price format: {}", price_str))?;
1679    if price <= dec!(0) {
1680        return Err("Price must be positive".to_string());
1681    }
1682    let price_f64 = price
1683        .to_f64()
1684        .ok_or_else(|| format!("Price {} cannot be represented as f64", price))?;
1685    validate_price_precision(price_f64, MAX_PRICE_SIGNIFICANT_FIGURES)
1686        .map_err(|e| format!("Invalid price precision: {}", e))?;
1687
1688    // Parse and validate size
1689    let size =
1690        Decimal::from_str(size_str).map_err(|_| format!("Invalid size format: {}", size_str))?;
1691    if size <= dec!(0) {
1692        return Err("Size must be positive".to_string());
1693    }
1694
1695    // Parse optional builder code address
1696    let builder_code_wallet = match builder_code_address {
1697        Some(addr) => Some(
1698            WalletAddress::from_str(addr)
1699                .map_err(|_| format!("Invalid builder_code_address: {}", addr))?,
1700        ),
1701        None => None,
1702    };
1703
1704    // Build OrderInfo
1705    let order_info = OrderInfo {
1706        symbol: symbol.to_string(),
1707        price,
1708        size: to_contract_units_decimal(symbol, size),
1709        side,
1710        tif,
1711        client_id: client_id.map(|s| s.to_string()),
1712        order_id: None,
1713        is_perp: false,
1714        underlying: None,
1715        reduce_only: None,
1716        nonce: None,
1717        signature: None,
1718        mmp_enabled,
1719        builder_code_address: builder_code_wallet,
1720    };
1721
1722    // Build OrderActionMessage
1723    let order_action_msg = OrderActionMessage {
1724        timestamp: get_timestamp_millis(),
1725        info: order_info,
1726        action: OrderAction::CreateOrder,
1727        wallet: wallet_address,
1728        api_wallet_address: Some(signer_wallet),
1729        mmp_triggered: false,
1730        request_id: Some(::uuid::Uuid::now_v7().to_string()),
1731    };
1732
1733    // Create response channel
1734    let (response_tx, mut response_rx) = mpsc::channel(1);
1735
1736    let engine_request = UnifiedEngineRequest {
1737        message: order_action_msg,
1738        response_tx,
1739        enqueued_at: std::time::Instant::now(),
1740        #[cfg(feature = "otel-tracing")]
1741        trace_context: None,
1742    };
1743
1744    // Send to engine
1745    increment_pending_requests();
1746    order_sender
1747        .send(engine_request)
1748        .await
1749        .map_err(|_| "Failed to send order to engine".to_string())?;
1750
1751    // Wait for engine response
1752    let response = match tokio::time::timeout(WS_ENGINE_RESPONSE_TIMEOUT, response_rx.recv()).await
1753    {
1754        Ok(Some(resp)) => resp,
1755        Ok(None) => return Err("No response from engine".to_string()),
1756        Err(_) => return Err("Timeout waiting for engine response".to_string()),
1757    };
1758
1759    Ok(WsOrderResult {
1760        order_id: response.order_id,
1761        status: format!("{:?}", response.status),
1762        symbol: response.info.symbol.clone(),
1763        side: format!("{:?}", response.info.side),
1764        price: response.info.price.to_string(),
1765        // Echo back the human-readable size the client submitted, not the
1766        // engine's internal contract units (which are multiplied by 1e8).
1767        size: size_str.to_string(),
1768        reason: response.reason.clone(),
1769    })
1770}
1771
1772// Helper module for UUID
1773pub mod uuid {
1774    use std::sync::atomic::{AtomicU64, Ordering};
1775
1776    static COUNTER: AtomicU64 = AtomicU64::new(0);
1777
1778    #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
1779    pub struct Uuid(u64);
1780
1781    impl Uuid {
1782        pub fn new_v4() -> Self {
1783            Uuid(COUNTER.fetch_add(1, Ordering::SeqCst))
1784        }
1785    }
1786}
1787
1788#[cfg(test)]
1789mod tests {
1790    use super::{
1791        validate_ws_heartbeat_config, ws_heartbeat_config, PubSubManager,
1792        DEFAULT_WS_HEARTBEAT_INTERVAL, DEFAULT_WS_PONG_TIMEOUT,
1793    };
1794    use hypercall_types::ws_protocol::WsIndicativeMarketData;
1795    use tokio::time::Duration;
1796
1797    #[test]
1798    fn test_ws_heartbeat_config_accepts_valid_custom_values() {
1799        let (heartbeat_interval, pong_timeout) = validate_ws_heartbeat_config(250, 750);
1800
1801        assert_eq!(heartbeat_interval, Duration::from_millis(250));
1802        assert_eq!(pong_timeout, Duration::from_millis(750));
1803    }
1804
1805    #[test]
1806    fn test_ws_heartbeat_config_rejects_zero_interval() {
1807        let (heartbeat_interval, pong_timeout) = validate_ws_heartbeat_config(0, 60_000);
1808
1809        assert_eq!(heartbeat_interval, DEFAULT_WS_HEARTBEAT_INTERVAL);
1810        assert_eq!(pong_timeout, DEFAULT_WS_PONG_TIMEOUT);
1811    }
1812
1813    #[test]
1814    fn test_ws_heartbeat_config_rejects_timeout_shorter_than_interval() {
1815        let (heartbeat_interval, pong_timeout) = validate_ws_heartbeat_config(500, 400);
1816
1817        assert_eq!(heartbeat_interval, DEFAULT_WS_HEARTBEAT_INTERVAL);
1818        assert_eq!(pong_timeout, DEFAULT_WS_PONG_TIMEOUT);
1819    }
1820
1821    #[test]
1822    fn test_ws_heartbeat_config_uses_defaults_without_runtime_config() {
1823        let (heartbeat_interval, pong_timeout) =
1824            ws_heartbeat_config(&hypercall_config::ApiRuntimeConfig::default());
1825
1826        assert_eq!(heartbeat_interval, DEFAULT_WS_HEARTBEAT_INTERVAL);
1827        assert_eq!(pong_timeout, DEFAULT_WS_PONG_TIMEOUT);
1828    }
1829
1830    #[test]
1831    fn test_indicative_market_data_honors_symbol_filter() {
1832        let pubsub = PubSubManager::new();
1833        let client_id = super::uuid::Uuid::new_v4();
1834        let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
1835
1836        pubsub.add_client(client_id, tx, false, None);
1837        pubsub
1838            .subscribe(
1839                client_id,
1840                "indicative_market_data".to_string(),
1841                Some(vec!["BTC-20260131-100000-C".to_string()]),
1842                None,
1843                None,
1844            )
1845            .expect("indicative subscription should succeed");
1846
1847        pubsub.publish_indicative_market_data(WsIndicativeMarketData {
1848            instrument: "ETH-20260131-100000-C".to_string(),
1849            best_bid: Some(dec!(10)),
1850            bid_iv: None,
1851            ask_iv: None,
1852            best_ask: Some(dec!(11)),
1853            indicative_bid_size: Some(dec!(1)),
1854            indicative_ask_size: Some(dec!(1)),
1855            num_providers: 1,
1856            timestamp: 1,
1857        });
1858        assert!(rx.try_recv().is_err());
1859
1860        pubsub.publish_indicative_market_data(WsIndicativeMarketData {
1861            instrument: "BTC-20260131-100000-C".to_string(),
1862            best_bid: Some(dec!(20)),
1863            bid_iv: None,
1864            ask_iv: None,
1865            best_ask: Some(dec!(21)),
1866            indicative_bid_size: Some(dec!(2)),
1867            indicative_ask_size: Some(dec!(2)),
1868            num_providers: 1,
1869            timestamp: 2,
1870        });
1871
1872        let message = rx
1873            .try_recv()
1874            .expect("matching indicative quote should be delivered");
1875        match message.as_ref() {
1876            WsMessage::IndicativeMarketData(update) => {
1877                assert_eq!(update.instrument, "BTC-20260131-100000-C");
1878                assert_eq!(update.best_bid, Some(dec!(20)));
1879                assert_eq!(update.best_ask, Some(dec!(21)));
1880            }
1881            other => panic!("expected IndicativeMarketData, got {other:?}"),
1882        }
1883        assert!(rx.try_recv().is_err());
1884    }
1885
1886    // --- WS message serialization round-trip tests ---
1887
1888    use super::{
1889        WsMessage, WsOrderResult, WsRfqLegRequest, WsRfqQuoteEntry, WsRfqQuotes, WsRfqStatusUpdate,
1890    };
1891    use hypercall_types::{Side, WalletAddress};
1892    use rust_decimal_macros::dec;
1893    use std::str::FromStr;
1894
1895    #[test]
1896    fn test_submit_rfq_roundtrip() {
1897        let msg = WsMessage::SubmitRfq {
1898            rfq_id: "test-123".to_string(),
1899            legs: vec![WsRfqLegRequest {
1900                instrument: "ETH-20260425-2400-C".to_string(),
1901                side: Side::Buy,
1902                size: "10".to_string(),
1903            }],
1904            wallet_address: "0x1234567890abcdef1234567890abcdef12345678".to_string(),
1905            nonce: 42,
1906            signature: "0xsig".to_string(),
1907        };
1908        let json = serde_json::to_string(&msg).unwrap();
1909        assert!(json.contains("\"type\":\"SubmitRfq\""));
1910        assert!(json.contains("\"rfq_id\":\"test-123\""));
1911        assert!(json.contains("\"nonce\":42"));
1912
1913        let parsed: WsMessage = serde_json::from_str(&json).unwrap();
1914        match parsed {
1915            WsMessage::SubmitRfq {
1916                rfq_id,
1917                legs,
1918                wallet_address,
1919                nonce,
1920                signature,
1921            } => {
1922                assert_eq!(rfq_id, "test-123");
1923                assert_eq!(legs.len(), 1);
1924                assert_eq!(legs[0].instrument, "ETH-20260425-2400-C");
1925                assert_eq!(legs[0].size, "10");
1926                assert_eq!(wallet_address, "0x1234567890abcdef1234567890abcdef12345678");
1927                assert_eq!(nonce, 42);
1928                assert_eq!(signature, "0xsig");
1929            }
1930            other => panic!("Expected SubmitRfq, got {:?}", other),
1931        }
1932    }
1933
1934    #[test]
1935    fn test_accept_rfq_quote_roundtrip() {
1936        let msg = WsMessage::AcceptRfqQuote {
1937            rfq_id: "rfq-abc".to_string(),
1938            quote_id: "quote-xyz".to_string(),
1939            wallet_address: "0xdeadbeefdeadbeefdeadbeefdeadbeefdeadbeef".to_string(),
1940            nonce: 99,
1941            signature: "0xdeadsig".to_string(),
1942        };
1943        let json = serde_json::to_string(&msg).unwrap();
1944        assert!(json.contains("\"type\":\"AcceptRfqQuote\""));
1945        assert!(json.contains("\"rfq_id\":\"rfq-abc\""));
1946        assert!(json.contains("\"quote_id\":\"quote-xyz\""));
1947
1948        let parsed: WsMessage = serde_json::from_str(&json).unwrap();
1949        match parsed {
1950            WsMessage::AcceptRfqQuote {
1951                rfq_id,
1952                quote_id,
1953                wallet_address,
1954                nonce,
1955                signature,
1956            } => {
1957                assert_eq!(rfq_id, "rfq-abc");
1958                assert_eq!(quote_id, "quote-xyz");
1959                assert_eq!(wallet_address, "0xdeadbeefdeadbeefdeadbeefdeadbeefdeadbeef");
1960                assert_eq!(nonce, 99);
1961                assert_eq!(signature, "0xdeadsig");
1962            }
1963            other => panic!("Expected AcceptRfqQuote, got {:?}", other),
1964        }
1965    }
1966
1967    #[test]
1968    fn test_place_order_roundtrip() {
1969        let msg = WsMessage::PlaceOrder {
1970            wallet: "0xaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa".to_string(),
1971            symbol: "BTC-20260131-100000-C".to_string(),
1972            side: "Buy".to_string(),
1973            size: "1.5".to_string(),
1974            price: "5000".to_string(),
1975            tif: Some("gtc".to_string()),
1976            client_id: Some("my-order-1".to_string()),
1977            nonce: 1000,
1978            signature: "0xordersig".to_string(),
1979            mmp_enabled: Some(true),
1980            builder_code_address: None,
1981        };
1982        let json = serde_json::to_string(&msg).unwrap();
1983        assert!(json.contains("\"type\":\"PlaceOrder\""));
1984        assert!(json.contains("\"symbol\":\"BTC-20260131-100000-C\""));
1985        assert!(json.contains("\"side\":\"Buy\""));
1986        assert!(json.contains("\"price\":\"5000\""));
1987        // builder_code_address should be omitted via skip_serializing_if
1988        assert!(!json.contains("builder_code_address"));
1989
1990        let parsed: WsMessage = serde_json::from_str(&json).unwrap();
1991        match parsed {
1992            WsMessage::PlaceOrder {
1993                wallet,
1994                symbol,
1995                side,
1996                size,
1997                price,
1998                tif,
1999                client_id,
2000                nonce,
2001                signature,
2002                mmp_enabled,
2003                builder_code_address,
2004            } => {
2005                assert_eq!(wallet, "0xaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa");
2006                assert_eq!(symbol, "BTC-20260131-100000-C");
2007                assert_eq!(side, "Buy");
2008                assert_eq!(size, "1.5");
2009                assert_eq!(price, "5000");
2010                assert_eq!(tif, Some("gtc".to_string()));
2011                assert_eq!(client_id, Some("my-order-1".to_string()));
2012                assert_eq!(nonce, 1000);
2013                assert_eq!(signature, "0xordersig");
2014                assert_eq!(mmp_enabled, Some(true));
2015                assert!(builder_code_address.is_none());
2016            }
2017            other => panic!("Expected PlaceOrder, got {:?}", other),
2018        }
2019    }
2020
2021    #[test]
2022    fn test_place_order_minimal_fields_roundtrip() {
2023        // Test with only required fields (no tif, client_id, mmp_enabled, builder_code_address)
2024        let json = r#"{"type":"PlaceOrder","wallet":"0xaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa","symbol":"ETH-20260228-3000-P","side":"Sell","size":"2","price":"100","nonce":1,"signature":"0xsig"}"#;
2025        let parsed: WsMessage = serde_json::from_str(json).unwrap();
2026        match parsed {
2027            WsMessage::PlaceOrder {
2028                tif,
2029                client_id,
2030                mmp_enabled,
2031                builder_code_address,
2032                ..
2033            } => {
2034                assert!(tif.is_none());
2035                assert!(client_id.is_none());
2036                assert!(mmp_enabled.is_none());
2037                assert!(builder_code_address.is_none());
2038            }
2039            other => panic!("Expected PlaceOrder, got {:?}", other),
2040        }
2041    }
2042
2043    #[test]
2044    fn test_rfq_quotes_response_serialization() {
2045        let msg = WsMessage::RfqQuotes(WsRfqQuotes {
2046            rfq_id: "rfq-001".to_string(),
2047            quotes: vec![WsRfqQuoteEntry {
2048                quote_id: "q-1".to_string(),
2049                net_premium: dec!(150.50),
2050                expires_at: 1700000000,
2051            }],
2052            status: "quoted".to_string(),
2053            taker_wallet: WalletAddress::from_str("0xbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb")
2054                .unwrap(),
2055        });
2056        let json = serde_json::to_string(&msg).unwrap();
2057        assert!(json.contains("\"type\":\"RfqQuotes\""));
2058        assert!(json.contains("\"rfq_id\":\"rfq-001\""));
2059        assert!(json.contains("\"quote_id\":\"q-1\""));
2060        assert!(json.contains("\"status\":\"quoted\""));
2061
2062        // Verify it round-trips
2063        let parsed: WsMessage = serde_json::from_str(&json).unwrap();
2064        match parsed {
2065            WsMessage::RfqQuotes(quotes) => {
2066                assert_eq!(quotes.rfq_id, "rfq-001");
2067                assert_eq!(quotes.quotes.len(), 1);
2068                assert_eq!(quotes.quotes[0].quote_id, "q-1");
2069                assert_eq!(quotes.status, "quoted");
2070            }
2071            other => panic!("Expected RfqQuotes, got {:?}", other),
2072        }
2073    }
2074
2075    #[test]
2076    fn test_rfq_accept_result_serialization() {
2077        let msg = WsMessage::RfqAcceptResult {
2078            rfq_id: "rfq-002".to_string(),
2079            quote_id: "q-2".to_string(),
2080            status: "filled".to_string(),
2081            fill_id: Some("fill-abc".to_string()),
2082            taker_wallet: None,
2083        };
2084        let json = serde_json::to_string(&msg).unwrap();
2085        assert!(json.contains("\"type\":\"RfqAcceptResult\""));
2086        assert!(json.contains("\"rfq_id\":\"rfq-002\""));
2087        assert!(json.contains("\"fill_id\":\"fill-abc\""));
2088        // taker_wallet is skip_serializing so must NOT appear in JSON
2089        assert!(!json.contains("taker_wallet"));
2090
2091        let parsed: WsMessage = serde_json::from_str(&json).unwrap();
2092        match parsed {
2093            WsMessage::RfqAcceptResult {
2094                rfq_id,
2095                quote_id,
2096                status,
2097                fill_id,
2098                ..
2099            } => {
2100                assert_eq!(rfq_id, "rfq-002");
2101                assert_eq!(quote_id, "q-2");
2102                assert_eq!(status, "filled");
2103                assert_eq!(fill_id, Some("fill-abc".to_string()));
2104            }
2105            other => panic!("Expected RfqAcceptResult, got {:?}", other),
2106        }
2107    }
2108
2109    #[test]
2110    fn test_order_result_serialization() {
2111        let msg = WsMessage::OrderResult(WsOrderResult {
2112            order_id: Some(12345),
2113            status: "Open".to_string(),
2114            symbol: "BTC-20260131-100000-C".to_string(),
2115            side: "Buy".to_string(),
2116            price: "5000".to_string(),
2117            size: "1000000".to_string(),
2118            reason: None,
2119        });
2120        let json = serde_json::to_string(&msg).unwrap();
2121        assert!(json.contains("\"type\":\"OrderResult\""));
2122        assert!(json.contains("\"order_id\":12345"));
2123        assert!(json.contains("\"status\":\"Open\""));
2124        assert!(json.contains("\"symbol\":\"BTC-20260131-100000-C\""));
2125        // reason should be omitted via skip_serializing_if
2126        assert!(!json.contains("\"reason\""));
2127
2128        let parsed: WsMessage = serde_json::from_str(&json).unwrap();
2129        match parsed {
2130            WsMessage::OrderResult(result) => {
2131                assert_eq!(result.order_id, Some(12345));
2132                assert_eq!(result.status, "Open");
2133                assert_eq!(result.symbol, "BTC-20260131-100000-C");
2134                assert_eq!(result.side, "Buy");
2135                assert!(result.reason.is_none());
2136            }
2137            other => panic!("Expected OrderResult, got {:?}", other),
2138        }
2139    }
2140
2141    #[test]
2142    fn test_order_result_with_rejection_reason() {
2143        let msg = WsMessage::OrderResult(WsOrderResult {
2144            order_id: None,
2145            status: "Rejected".to_string(),
2146            symbol: "ETH-20260228-3000-P".to_string(),
2147            side: "Sell".to_string(),
2148            price: "100".to_string(),
2149            size: "500000".to_string(),
2150            reason: Some("Insufficient margin".to_string()),
2151        });
2152        let json = serde_json::to_string(&msg).unwrap();
2153        assert!(json.contains("\"reason\":\"Insufficient margin\""));
2154        assert!(json.contains("\"order_id\":null"));
2155
2156        let parsed: WsMessage = serde_json::from_str(&json).unwrap();
2157        match parsed {
2158            WsMessage::OrderResult(result) => {
2159                assert!(result.order_id.is_none());
2160                assert_eq!(result.reason, Some("Insufficient margin".to_string()));
2161            }
2162            other => panic!("Expected OrderResult, got {:?}", other),
2163        }
2164    }
2165
2166    #[test]
2167    fn test_rfq_status_update_serialization() {
2168        let msg = WsMessage::RfqStatusUpdate(WsRfqStatusUpdate {
2169            rfq_id: "rfq-status-test".to_string(),
2170            status: "expired".to_string(),
2171            taker_wallet: WalletAddress::from_str("0xcccccccccccccccccccccccccccccccccccccccc")
2172                .unwrap(),
2173        });
2174        let json = serde_json::to_string(&msg).unwrap();
2175        assert!(json.contains("\"type\":\"RfqStatusUpdate\""));
2176        assert!(json.contains("\"status\":\"expired\""));
2177
2178        let parsed: WsMessage = serde_json::from_str(&json).unwrap();
2179        match parsed {
2180            WsMessage::RfqStatusUpdate(update) => {
2181                assert_eq!(update.rfq_id, "rfq-status-test");
2182                assert_eq!(update.status, "expired");
2183            }
2184            other => panic!("Expected RfqStatusUpdate, got {:?}", other),
2185        }
2186    }
2187}