// hypercall_api/rfq/qp_sessions.rs
use crate::rfq::rfq_manager::QpSessionRegistry;
use dashmap::mapref::entry::Entry;
use dashmap::DashMap;
use hypercall_types::WalletAddress;
use hypercall_ws_protocol::QpServerMessage as QpOutboundMessage;
use metrics::gauge;
use tokio::sync::mpsc;
use uuid::Uuid;
8
9pub struct QpSessionManager {
11 sessions: DashMap<WalletAddress, Vec<QpSession>>,
12}
13
14struct QpSession {
15 session_id: Uuid,
16 sender: mpsc::UnboundedSender<QpOutboundMessage>,
17}
18
19impl Default for QpSessionManager {
20 fn default() -> Self {
21 Self::new()
22 }
23}
24
25impl QpSessionManager {
26 pub fn new() -> Self {
27 Self {
28 sessions: DashMap::new(),
29 }
30 }
31
32 pub fn register_session(
33 &self,
34 wallet: WalletAddress,
35 sender: mpsc::UnboundedSender<QpOutboundMessage>,
36 ) -> Uuid {
37 let session_id = Uuid::now_v7();
38 self.sessions
39 .entry(wallet)
40 .or_default()
41 .push(QpSession { session_id, sender });
42 let total: usize = self.sessions.iter().map(|e| e.value().len()).sum();
43 gauge!("ht_qp_session_count").set(total as f64);
44 session_id
45 }
46
47 pub fn remove_session(&self, wallet: &WalletAddress, session_id: Uuid) -> bool {
55 let fully_disconnected = {
66 let Some(mut sessions) = self.sessions.get_mut(wallet) else {
67 return true;
69 };
70 sessions.retain(|s| s.session_id != session_id);
71 sessions.is_empty()
72 }; if fully_disconnected {
75 self.sessions.remove(wallet);
76 }
77
78 let total: usize = self.sessions.iter().map(|e| e.value().len()).sum();
79 gauge!("ht_qp_session_count").set(total as f64);
80 fully_disconnected
81 }
82
83 pub fn send_to_qp(&self, wallet: &WalletAddress, message: QpOutboundMessage) {
85 if let Some(sessions) = self.sessions.get(wallet) {
86 for session in sessions.iter() {
87 let _ = session.sender.send(message.clone());
88 }
89 }
90 }
91
92 pub fn send_rfq_to_eligible(
94 &self,
95 eligible_wallets: &[WalletAddress],
96 message: QpOutboundMessage,
97 ) {
98 for wallet in eligible_wallets {
99 self.send_to_qp(wallet, message.clone());
100 }
101 }
102
103 pub fn get_connected_wallets(&self) -> Vec<WalletAddress> {
105 self.sessions.iter().map(|r| *r.key()).collect()
106 }
107}
108
109impl QpSessionRegistry for QpSessionManager {
110 fn send_rfq_to_eligible(&self, eligible_wallets: &[WalletAddress], message: QpOutboundMessage) {
111 QpSessionManager::send_rfq_to_eligible(self, eligible_wallets, message);
112 }
113
114 fn get_connected_wallets(&self) -> Vec<WalletAddress> {
115 QpSessionManager::get_connected_wallets(self)
116 }
117}