Skip to main content

hypercall_api/rfq/
qp_sessions.rs

1use crate::rfq::rfq_manager::QpSessionRegistry;
2use dashmap::DashMap;
3use hypercall_types::WalletAddress;
4use hypercall_ws_protocol::QpServerMessage as QpOutboundMessage;
5use metrics::gauge;
6use tokio::sync::mpsc;
7use uuid::Uuid;
8
9/// Manages active quote-provider WebSocket sessions.
10pub struct QpSessionManager {
11    sessions: DashMap<WalletAddress, Vec<QpSession>>,
12}
13
14struct QpSession {
15    session_id: Uuid,
16    sender: mpsc::UnboundedSender<QpOutboundMessage>,
17}
18
19impl Default for QpSessionManager {
20    fn default() -> Self {
21        Self::new()
22    }
23}
24
25impl QpSessionManager {
26    pub fn new() -> Self {
27        Self {
28            sessions: DashMap::new(),
29        }
30    }
31
32    pub fn register_session(
33        &self,
34        wallet: WalletAddress,
35        sender: mpsc::UnboundedSender<QpOutboundMessage>,
36    ) -> Uuid {
37        let session_id = Uuid::now_v7();
38        self.sessions
39            .entry(wallet)
40            .or_default()
41            .push(QpSession { session_id, sender });
42        let total: usize = self.sessions.iter().map(|e| e.value().len()).sum();
43        gauge!("ht_qp_session_count").set(total as f64);
44        session_id
45    }
46
47    /// Remove a single QP session. Returns `true` if the wallet now has
48    /// zero remaining sessions (i.e. the QP is fully disconnected),
49    /// `false` if other sessions for the same wallet are still active.
50    /// Callers that need to cascade cleanup (e.g. evicting indicative
51    /// quotes from the aggregate cache) should only do so when this
52    /// returns `true`, otherwise a single tab/socket closing would
53    /// clobber quotes for a QP that still has other live connections.
54    pub fn remove_session(&self, wallet: &WalletAddress, session_id: Uuid) -> bool {
55        // Two invariants:
56        //   1. Every `self.sessions.iter()` (which acquires read guards on *all*
57        //      shards) must run *after* we drop the per-wallet write guard,
58        //      otherwise DashMap deadlocks on the shard that still has the
59        //      write lock from `get_mut` held. The old control flow at the
60        //      non-empty branch fell through into `self.sessions.iter()` with
61        //      the `RefMut` still alive and hung the sync `test_session_manager_
62        //      multiple_sessions_per_wallet` test past the 180s nextest cap.
63        //   2. `self.sessions.remove(wallet)` also needs the same shard's
64        //      write lock, so it also must run after the guard is dropped.
65        let fully_disconnected = {
66            let Some(mut sessions) = self.sessions.get_mut(wallet) else {
67                // Wallet had no entry, treat as fully disconnected.
68                return true;
69            };
70            sessions.retain(|s| s.session_id != session_id);
71            sessions.is_empty()
72        }; // <- RefMut dropped here
73
74        if fully_disconnected {
75            self.sessions.remove(wallet);
76        }
77
78        let total: usize = self.sessions.iter().map(|e| e.value().len()).sum();
79        gauge!("ht_qp_session_count").set(total as f64);
80        fully_disconnected
81    }
82
83    /// Send a message to all sessions for a given QP wallet.
84    pub fn send_to_qp(&self, wallet: &WalletAddress, message: QpOutboundMessage) {
85        if let Some(sessions) = self.sessions.get(wallet) {
86            for session in sessions.iter() {
87                let _ = session.sender.send(message.clone());
88            }
89        }
90    }
91
92    /// Send an RFQ request to a list of eligible QPs.
93    pub fn send_rfq_to_eligible(
94        &self,
95        eligible_wallets: &[WalletAddress],
96        message: QpOutboundMessage,
97    ) {
98        for wallet in eligible_wallets {
99            self.send_to_qp(wallet, message.clone());
100        }
101    }
102
103    /// Get wallets of all currently connected QPs.
104    pub fn get_connected_wallets(&self) -> Vec<WalletAddress> {
105        self.sessions.iter().map(|r| *r.key()).collect()
106    }
107}
108
109impl QpSessionRegistry for QpSessionManager {
110    fn send_rfq_to_eligible(&self, eligible_wallets: &[WalletAddress], message: QpOutboundMessage) {
111        QpSessionManager::send_rfq_to_eligible(self, eligible_wallets, message);
112    }
113
114    fn get_connected_wallets(&self) -> Vec<WalletAddress> {
115        QpSessionManager::get_connected_wallets(self)
116    }
117}