1use std::collections::HashMap;
8use std::sync::Arc;
9use std::time::{Duration, Instant};
10
11use futures::{SinkExt, StreamExt};
12use serde::{Deserialize, Serialize};
13use tokio::sync::RwLock;
14use tokio_tungstenite::tungstenite::Message;
15use tracing::{debug, error, info, warn};
16
/// Default WebSocket endpoint on the main `api.hyperliquid.xyz` host.
pub const DEFAULT_WS_URL: &str = "wss://api.hyperliquid.xyz/ws";

/// Default WebSocket endpoint on the `api.hyperliquid-testnet.xyz` (testnet) host.
pub const DEFAULT_TESTNET_WS_URL: &str = "wss://api.hyperliquid-testnet.xyz/ws";
22
/// One cached price record for a single coin.
#[derive(Debug, Clone)]
struct PriceEntry {
    /// Most recent price stored for the coin.
    price: f64,
    /// Previous-day price, when one is known.
    prev_day_px: Option<f64>,
    /// Moment this entry was last written; used to judge staleness.
    updated_at: Instant,
}
30
31pub struct HyperliquidWsFeed {
36 ws_url: String,
37 coins: Vec<String>,
38 prices: Arc<RwLock<HashMap<String, PriceEntry>>>,
39}
40
41impl HyperliquidWsFeed {
42 pub fn new(ws_url: String, coins: Vec<String>) -> Self {
43 Self {
44 ws_url,
45 coins,
46 prices: Arc::new(RwLock::new(HashMap::new())),
47 }
48 }
49
50 pub fn new_mock(coins: Vec<String>, default_prices: &HashMap<String, f64>) -> Self {
54 let mut prices = HashMap::new();
55 for coin in &coins {
56 if let Some(&price) = default_prices.get(coin) {
57 prices.insert(
58 coin.clone(),
59 PriceEntry {
60 price,
61 prev_day_px: Some(price),
62 updated_at: Instant::now(),
63 },
64 );
65 }
66 }
67 Self {
68 ws_url: String::new(),
69 coins,
70 prices: Arc::new(RwLock::new(prices)),
71 }
72 }
73
74 pub async fn refresh_mock_timestamps(&self) {
77 let mut prices = self.prices.write().await;
78 for entry in prices.values_mut() {
79 entry.updated_at = Instant::now();
80 }
81 }
82
83 pub async fn get_price(&self, coin: &str) -> Option<f64> {
87 let prices = self.prices.read().await;
88 prices.get(coin).and_then(|entry| {
89 if entry.updated_at.elapsed() < Duration::from_secs(10) {
90 Some(entry.price)
91 } else {
92 None
93 }
94 })
95 }
96
97 pub async fn get_prev_day_price(&self, coin: &str) -> Option<f64> {
99 let prices = self.prices.read().await;
100 prices.get(coin).and_then(|entry| entry.prev_day_px)
101 }
102
103 pub async fn start(&self, mut shutdown_rx: tokio::sync::broadcast::Receiver<()>) {
107 let prices = self.prices.clone();
108 let ws_url = self.ws_url.clone();
109 let coins = self.coins.clone();
110
111 const BASE_BACKOFF: Duration = Duration::from_secs(1);
112 const MAX_BACKOFF: Duration = Duration::from_secs(30);
113 let mut consecutive_failures: u32 = 0;
114
115 loop {
116 let before = Instant::now();
117 tokio::select! {
118 _ = shutdown_rx.recv() => {
119 info!("HyperliquidWsFeed: shutdown signal received");
120 return;
121 }
122 _ = Self::run_connection(&ws_url, &coins, &prices) => {
123 if before.elapsed() > Duration::from_secs(60) {
126 consecutive_failures = 0;
127 }
128 consecutive_failures += 1;
129 let backoff = std::cmp::min(
130 BASE_BACKOFF * 2u32.saturating_pow(consecutive_failures - 1),
131 MAX_BACKOFF,
132 );
133 warn!(
134 "HyperliquidWsFeed: connection lost (failures: {}), reconnecting in {:?}",
135 consecutive_failures, backoff
136 );
137 tokio::select! {
138 _ = shutdown_rx.recv() => {
139 info!("HyperliquidWsFeed: shutdown during reconnect backoff");
140 return;
141 }
142 _ = tokio::time::sleep(backoff) => {}
143 }
144 }
145 }
146 }
147 }
148
149 async fn run_connection(
150 ws_url: &str,
151 coins: &[String],
152 prices: &Arc<RwLock<HashMap<String, PriceEntry>>>,
153 ) {
154 info!("HyperliquidWsFeed: connecting to {}", ws_url);
155
156 let (ws_stream, _) = match tokio_tungstenite::connect_async(ws_url).await {
157 Ok(conn) => {
158 info!("HyperliquidWsFeed: connected");
159 conn
160 }
161 Err(e) => {
162 error!("HyperliquidWsFeed: connection failed: {}", e);
163 return;
164 }
165 };
166
167 let (mut write, mut read) = ws_stream.split();
168
169 for coin in coins {
171 let sub = WsSubscription {
172 method: "subscribe".to_string(),
173 subscription: ActiveAssetCtxSub {
174 sub_type: "activeAssetCtx".to_string(),
175 coin: coin.clone(),
176 },
177 };
178 let msg = sonic_rs::to_string(&sub).expect("serialize subscription");
179 if let Err(e) = write.send(Message::Text(msg)).await {
180 error!("HyperliquidWsFeed: failed to subscribe to {}: {}", coin, e);
181 return;
182 }
183 info!("HyperliquidWsFeed: subscribed to activeAssetCtx:{}", coin);
184 }
185
186 const WS_READ_TIMEOUT: Duration = Duration::from_secs(60);
192
193 loop {
194 match tokio::time::timeout(WS_READ_TIMEOUT, read.next()).await {
195 Ok(Some(msg)) => match msg {
196 Ok(Message::Text(text)) => {
197 Self::handle_message(&text, prices).await;
198 }
199 Ok(Message::Ping(data)) => {
200 if let Err(e) = write.send(Message::Pong(data)).await {
201 error!("HyperliquidWsFeed: failed to send pong: {}", e);
202 return;
203 }
204 }
205 Ok(Message::Close(_)) => {
206 info!("HyperliquidWsFeed: server sent close frame");
207 return;
208 }
209 Err(e) => {
210 error!("HyperliquidWsFeed: read error: {}", e);
211 return;
212 }
213 _ => {}
214 },
215 Ok(None) => {
216 info!("HyperliquidWsFeed: stream ended");
217 return;
218 }
219 Err(_) => {
220 warn!(
221 "HyperliquidWsFeed: read timeout ({}s) — no messages received, reconnecting",
222 WS_READ_TIMEOUT.as_secs()
223 );
224 return;
225 }
226 }
227 }
228 }
229
230 async fn handle_message(text: &str, prices: &Arc<RwLock<HashMap<String, PriceEntry>>>) {
231 let envelope: WsMessage = match sonic_rs::from_str(text) {
233 Ok(m) => m,
234 Err(_) => {
235 debug!(
237 "HyperliquidWsFeed: non-data message: {}",
238 &text[..text.len().min(200)]
239 );
240 return;
241 }
242 };
243
244 if !envelope.channel.starts_with("activeAssetCtx") {
246 return;
247 }
248
249 let ctx: ActiveAssetCtxData = match sonic_rs::from_value(&envelope.data) {
252 Ok(d) => d,
253 Err(e) => {
254 warn!(
255 "HyperliquidWsFeed: failed to parse activeAssetCtx data: {}",
256 e
257 );
258 return;
259 }
260 };
261
262 let coin = ctx.coin;
263 let prev_day_px = ctx
264 .ctx
265 .prev_day_px
266 .parse::<f64>()
267 .ok()
268 .filter(|v| v.is_finite() && *v > 0.0);
269 if let Some(oracle_price) = ctx
270 .ctx
271 .oracle_px
272 .parse::<f64>()
273 .ok()
274 .filter(|v| v.is_finite() && *v > 0.0)
275 {
276 debug!(
277 "HyperliquidWsFeed: {} oracle_px={} mark_px={} prev_day_px={:?}",
278 coin, oracle_price, ctx.ctx.mark_px, prev_day_px
279 );
280 let mut prices = prices.write().await;
281 let effective_prev_day_px =
283 prev_day_px.or_else(|| prices.get(&coin).and_then(|e| e.prev_day_px));
284 prices.insert(
285 coin,
286 PriceEntry {
287 price: oracle_price,
288 prev_day_px: effective_prev_day_px,
289 updated_at: Instant::now(),
290 },
291 );
292 }
293 }
294}
295
296#[derive(Serialize)]
299struct WsSubscription {
300 method: String,
301 subscription: ActiveAssetCtxSub,
302}
303
304#[derive(Serialize)]
305struct ActiveAssetCtxSub {
306 #[serde(rename = "type")]
307 sub_type: String,
308 coin: String,
309}
310
311#[derive(Deserialize)]
312struct WsMessage {
313 channel: String,
314 data: sonic_rs::Value,
315}
316
317#[derive(Deserialize)]
318struct ActiveAssetCtxData {
319 coin: String,
320 ctx: AssetCtxWs,
321}
322
323#[derive(Deserialize)]
324#[serde(rename_all = "camelCase")]
325struct AssetCtxWs {
326 oracle_px: String,
327 mark_px: String,
328 #[serde(default)]
329 prev_day_px: String,
330}
331
332#[async_trait::async_trait]
333impl crate::shared::service::Service for HyperliquidWsFeed {
334 fn name(&self) -> &'static str {
335 "HyperliquidWsFeed"
336 }
337
338 fn owner(&self) -> crate::shared::service::ServiceOwner {
339 crate::shared::service::ServiceOwner::Engine
340 }
341
342 async fn run(
343 self: std::sync::Arc<Self>,
344 shutdown: crate::shared::ShutdownRx,
345 ) -> anyhow::Result<()> {
346 self.start(shutdown).await;
347 Ok(())
348 }
349}
350
#[cfg(test)]
mod tests {
    use super::*;

    /// A complete `activeAssetCtx` frame decodes into the envelope and typed payload.
    #[test]
    fn test_parse_active_asset_ctx_message() {
        let json = r#"{
            "channel": "activeAssetCtx:BTC",
            "data": {
                "coin": "BTC",
                "ctx": {
                    "funding": "0.0001",
                    "openInterest": "500000",
                    "prevDayPx": "64000.0",
                    "dayNtlVlm": "1000000000",
                    "premium": "0.001",
                    "oraclePx": "64500.5",
                    "markPx": "64550.0"
                }
            }
        }"#;

        let msg: WsMessage = sonic_rs::from_str(json).unwrap();
        assert_eq!(msg.channel, "activeAssetCtx:BTC");

        let data: ActiveAssetCtxData = sonic_rs::from_value(&msg.data).unwrap();
        assert_eq!(data.coin, "BTC");
        assert_eq!(data.ctx.oracle_px, "64500.5");
        assert_eq!(data.ctx.mark_px, "64550.0");
        // `prev_day_px` is `#[serde(default)]`, so a broken key mapping would silently
        // yield an empty string; assert the value to catch that.
        assert_eq!(data.ctx.prev_day_px, "64000.0");
    }

    /// A frame without `prevDayPx` still decodes, leaving the field empty.
    #[test]
    fn test_parse_active_asset_ctx_without_prev_day_px() {
        let json = r#"{
            "channel": "activeAssetCtx:ETH",
            "data": {
                "coin": "ETH",
                "ctx": {
                    "oraclePx": "3000.5",
                    "markPx": "3001.0"
                }
            }
        }"#;

        let msg: WsMessage = sonic_rs::from_str(json).unwrap();
        let data: ActiveAssetCtxData = sonic_rs::from_value(&msg.data).unwrap();
        assert_eq!(data.coin, "ETH");
        assert_eq!(data.ctx.oracle_px, "3000.5");
        assert!(data.ctx.prev_day_px.is_empty());
    }

    /// The subscription request serializes with the `method`, `type` and `coin` keys.
    #[test]
    fn test_subscription_serialization() {
        let sub = WsSubscription {
            method: "subscribe".to_string(),
            subscription: ActiveAssetCtxSub {
                sub_type: "activeAssetCtx".to_string(),
                coin: "ETH".to_string(),
            },
        };
        let json = sonic_rs::to_string(&sub).unwrap();
        assert!(json.contains("\"method\":\"subscribe\""));
        assert!(json.contains("\"type\":\"activeAssetCtx\""));
        assert!(json.contains("\"coin\":\"ETH\""));
    }
}