Skip to main content

hypercall/price_oracle/
hyperliquid_ws.rs

1//! WebSocket-based price feed from Hyperliquid.
2//!
3//! Maintains a single persistent WebSocket connection to the Hyperliquid public API,
4//! subscribing to `activeAssetCtx` for configured coins. This replaces HTTP polling
5//! of `metaAndAssetCtxs` which isn't available on self-hosted nodes and hits rate limits.
6
7use std::collections::HashMap;
8use std::sync::Arc;
9use std::time::{Duration, Instant};
10
11use futures::{SinkExt, StreamExt};
12use serde::{Deserialize, Serialize};
13use tokio::sync::RwLock;
14use tokio_tungstenite::tungstenite::Message;
15use tracing::{debug, error, info, warn};
16
17/// Default WebSocket URL for Hyperliquid mainnet.
18pub const DEFAULT_WS_URL: &str = "wss://api.hyperliquid.xyz/ws";
19
20/// Default WebSocket URL for Hyperliquid testnet.
21pub const DEFAULT_TESTNET_WS_URL: &str = "wss://api.hyperliquid-testnet.xyz/ws";
22
23/// A cached oracle price with its timestamp.
24#[derive(Debug, Clone)]
25struct PriceEntry {
26    price: f64,
27    prev_day_px: Option<f64>,
28    updated_at: Instant,
29}
30
31/// Shared WebSocket price feed for Hyperliquid oracle prices.
32///
33/// One instance serves multiple oracle consumers. Subscribes to `activeAssetCtx`
34/// for each configured coin and stores the latest oracle price.
35pub struct HyperliquidWsFeed {
36    ws_url: String,
37    coins: Vec<String>,
38    prices: Arc<RwLock<HashMap<String, PriceEntry>>>,
39}
40
41impl HyperliquidWsFeed {
42    pub fn new(ws_url: String, coins: Vec<String>) -> Self {
43        Self {
44            ws_url,
45            coins,
46            prices: Arc::new(RwLock::new(HashMap::new())),
47        }
48    }
49
50    /// Create a mock feed with default prices for each coin. Used in test mode
51    /// so that CatalogManager can create markets without a live WS connection.
52    /// Prices are refreshed on every `get_price` call so they never go stale.
53    pub fn new_mock(coins: Vec<String>, default_prices: &HashMap<String, f64>) -> Self {
54        let mut prices = HashMap::new();
55        for coin in &coins {
56            if let Some(&price) = default_prices.get(coin) {
57                prices.insert(
58                    coin.clone(),
59                    PriceEntry {
60                        price,
61                        prev_day_px: Some(price),
62                        updated_at: Instant::now(),
63                    },
64                );
65            }
66        }
67        Self {
68            ws_url: String::new(),
69            coins,
70            prices: Arc::new(RwLock::new(prices)),
71        }
72    }
73
74    /// Refresh the `updated_at` timestamp on all mock prices so they never go stale.
75    /// Called periodically in test mode since `get_price` returns None after 10s.
76    pub async fn refresh_mock_timestamps(&self) {
77        let mut prices = self.prices.write().await;
78        for entry in prices.values_mut() {
79            entry.updated_at = Instant::now();
80        }
81    }
82
83    /// Get the latest oracle price for a coin.
84    ///
85    /// Returns None if no price has been received or if the price is stale (>10s old).
86    pub async fn get_price(&self, coin: &str) -> Option<f64> {
87        let prices = self.prices.read().await;
88        prices.get(coin).and_then(|entry| {
89            if entry.updated_at.elapsed() < Duration::from_secs(10) {
90                Some(entry.price)
91            } else {
92                None
93            }
94        })
95    }
96
97    /// Get the latest previous-day price for a coin from the WS feed.
98    pub async fn get_prev_day_price(&self, coin: &str) -> Option<f64> {
99        let prices = self.prices.read().await;
100        prices.get(coin).and_then(|entry| entry.prev_day_px)
101    }
102
103    /// Start the WebSocket feed. Runs until shutdown signal.
104    ///
105    /// Auto-reconnects with exponential backoff on disconnection.
106    pub async fn start(&self, mut shutdown_rx: tokio::sync::broadcast::Receiver<()>) {
107        let prices = self.prices.clone();
108        let ws_url = self.ws_url.clone();
109        let coins = self.coins.clone();
110
111        const BASE_BACKOFF: Duration = Duration::from_secs(1);
112        const MAX_BACKOFF: Duration = Duration::from_secs(30);
113        let mut consecutive_failures: u32 = 0;
114
115        loop {
116            let before = Instant::now();
117            tokio::select! {
118                _ = shutdown_rx.recv() => {
119                    info!("HyperliquidWsFeed: shutdown signal received");
120                    return;
121                }
122                _ = Self::run_connection(&ws_url, &coins, &prices) => {
123                    // If the connection stayed up for >60s it was healthy;
124                    // reset backoff so the next reconnect is fast.
125                    if before.elapsed() > Duration::from_secs(60) {
126                        consecutive_failures = 0;
127                    }
128                    consecutive_failures += 1;
129                    let backoff = std::cmp::min(
130                        BASE_BACKOFF * 2u32.saturating_pow(consecutive_failures - 1),
131                        MAX_BACKOFF,
132                    );
133                    warn!(
134                        "HyperliquidWsFeed: connection lost (failures: {}), reconnecting in {:?}",
135                        consecutive_failures, backoff
136                    );
137                    tokio::select! {
138                        _ = shutdown_rx.recv() => {
139                            info!("HyperliquidWsFeed: shutdown during reconnect backoff");
140                            return;
141                        }
142                        _ = tokio::time::sleep(backoff) => {}
143                    }
144                }
145            }
146        }
147    }
148
149    async fn run_connection(
150        ws_url: &str,
151        coins: &[String],
152        prices: &Arc<RwLock<HashMap<String, PriceEntry>>>,
153    ) {
154        info!("HyperliquidWsFeed: connecting to {}", ws_url);
155
156        let (ws_stream, _) = match tokio_tungstenite::connect_async(ws_url).await {
157            Ok(conn) => {
158                info!("HyperliquidWsFeed: connected");
159                conn
160            }
161            Err(e) => {
162                error!("HyperliquidWsFeed: connection failed: {}", e);
163                return;
164            }
165        };
166
167        let (mut write, mut read) = ws_stream.split();
168
169        // Subscribe to activeAssetCtx for each coin
170        for coin in coins {
171            let sub = WsSubscription {
172                method: "subscribe".to_string(),
173                subscription: ActiveAssetCtxSub {
174                    sub_type: "activeAssetCtx".to_string(),
175                    coin: coin.clone(),
176                },
177            };
178            let msg = sonic_rs::to_string(&sub).expect("serialize subscription");
179            if let Err(e) = write.send(Message::Text(msg)).await {
180                error!("HyperliquidWsFeed: failed to subscribe to {}: {}", coin, e);
181                return;
182            }
183            info!("HyperliquidWsFeed: subscribed to activeAssetCtx:{}", coin);
184        }
185
186        // Read messages with a read timeout to detect hung connections.
187        // Without this, a half-open connection (connected but not sending data)
188        // blocks read.next() forever, preventing reconnect. This was the root
189        // cause of CALL-720 (22h stale spot prices in the market-maker) and
190        // CALL-731 (same issue in the API server).
191        const WS_READ_TIMEOUT: Duration = Duration::from_secs(60);
192
193        loop {
194            match tokio::time::timeout(WS_READ_TIMEOUT, read.next()).await {
195                Ok(Some(msg)) => match msg {
196                    Ok(Message::Text(text)) => {
197                        Self::handle_message(&text, prices).await;
198                    }
199                    Ok(Message::Ping(data)) => {
200                        if let Err(e) = write.send(Message::Pong(data)).await {
201                            error!("HyperliquidWsFeed: failed to send pong: {}", e);
202                            return;
203                        }
204                    }
205                    Ok(Message::Close(_)) => {
206                        info!("HyperliquidWsFeed: server sent close frame");
207                        return;
208                    }
209                    Err(e) => {
210                        error!("HyperliquidWsFeed: read error: {}", e);
211                        return;
212                    }
213                    _ => {}
214                },
215                Ok(None) => {
216                    info!("HyperliquidWsFeed: stream ended");
217                    return;
218                }
219                Err(_) => {
220                    warn!(
221                        "HyperliquidWsFeed: read timeout ({}s) — no messages received, reconnecting",
222                        WS_READ_TIMEOUT.as_secs()
223                    );
224                    return;
225                }
226            }
227        }
228    }
229
230    async fn handle_message(text: &str, prices: &Arc<RwLock<HashMap<String, PriceEntry>>>) {
231        // Parse the WS message envelope
232        let envelope: WsMessage = match sonic_rs::from_str(text) {
233            Ok(m) => m,
234            Err(_) => {
235                // Subscription confirmations and other non-data messages
236                debug!(
237                    "HyperliquidWsFeed: non-data message: {}",
238                    &text[..text.len().min(200)]
239                );
240                return;
241            }
242        };
243
244        // We only care about activeAssetCtx channels
245        if !envelope.channel.starts_with("activeAssetCtx") {
246            return;
247        }
248
249        // Extract coin from channel name: "activeAssetCtx:BTC" -> "BTC"
250        // The channel is "activeAssetCtx" but coin comes from the data
251        let ctx: ActiveAssetCtxData = match sonic_rs::from_value(&envelope.data) {
252            Ok(d) => d,
253            Err(e) => {
254                warn!(
255                    "HyperliquidWsFeed: failed to parse activeAssetCtx data: {}",
256                    e
257                );
258                return;
259            }
260        };
261
262        let coin = ctx.coin;
263        let prev_day_px = ctx
264            .ctx
265            .prev_day_px
266            .parse::<f64>()
267            .ok()
268            .filter(|v| v.is_finite() && *v > 0.0);
269        if let Some(oracle_price) = ctx
270            .ctx
271            .oracle_px
272            .parse::<f64>()
273            .ok()
274            .filter(|v| v.is_finite() && *v > 0.0)
275        {
276            debug!(
277                "HyperliquidWsFeed: {} oracle_px={} mark_px={} prev_day_px={:?}",
278                coin, oracle_price, ctx.ctx.mark_px, prev_day_px
279            );
280            let mut prices = prices.write().await;
281            // Preserve existing prev_day_px if the WS message omitted it
282            let effective_prev_day_px =
283                prev_day_px.or_else(|| prices.get(&coin).and_then(|e| e.prev_day_px));
284            prices.insert(
285                coin,
286                PriceEntry {
287                    price: oracle_price,
288                    prev_day_px: effective_prev_day_px,
289                    updated_at: Instant::now(),
290                },
291            );
292        }
293    }
294}
295
296// --- Serde types for WS protocol ---
297
298#[derive(Serialize)]
299struct WsSubscription {
300    method: String,
301    subscription: ActiveAssetCtxSub,
302}
303
304#[derive(Serialize)]
305struct ActiveAssetCtxSub {
306    #[serde(rename = "type")]
307    sub_type: String,
308    coin: String,
309}
310
311#[derive(Deserialize)]
312struct WsMessage {
313    channel: String,
314    data: sonic_rs::Value,
315}
316
317#[derive(Deserialize)]
318struct ActiveAssetCtxData {
319    coin: String,
320    ctx: AssetCtxWs,
321}
322
323#[derive(Deserialize)]
324#[serde(rename_all = "camelCase")]
325struct AssetCtxWs {
326    oracle_px: String,
327    mark_px: String,
328    #[serde(default)]
329    prev_day_px: String,
330}
331
332#[async_trait::async_trait]
333impl crate::shared::service::Service for HyperliquidWsFeed {
334    fn name(&self) -> &'static str {
335        "HyperliquidWsFeed"
336    }
337
338    fn owner(&self) -> crate::shared::service::ServiceOwner {
339        crate::shared::service::ServiceOwner::Engine
340    }
341
342    async fn run(
343        self: std::sync::Arc<Self>,
344        shutdown: crate::shared::ShutdownRx,
345    ) -> anyhow::Result<()> {
346        self.start(shutdown).await;
347        Ok(())
348    }
349}
350
351#[cfg(test)]
352mod tests {
353    use super::*;
354
355    #[test]
356    fn test_parse_active_asset_ctx_message() {
357        let json = r#"{
358            "channel": "activeAssetCtx:BTC",
359            "data": {
360                "coin": "BTC",
361                "ctx": {
362                    "funding": "0.0001",
363                    "openInterest": "500000",
364                    "prevDayPx": "64000.0",
365                    "dayNtlVlm": "1000000000",
366                    "premium": "0.001",
367                    "oraclePx": "64500.5",
368                    "markPx": "64550.0"
369                }
370            }
371        }"#;
372
373        let msg: WsMessage = sonic_rs::from_str(json).unwrap();
374        assert_eq!(msg.channel, "activeAssetCtx:BTC");
375
376        let data: ActiveAssetCtxData = sonic_rs::from_value(&msg.data).unwrap();
377        assert_eq!(data.coin, "BTC");
378        assert_eq!(data.ctx.oracle_px, "64500.5");
379        assert_eq!(data.ctx.mark_px, "64550.0");
380    }
381
382    #[test]
383    fn test_subscription_serialization() {
384        let sub = WsSubscription {
385            method: "subscribe".to_string(),
386            subscription: ActiveAssetCtxSub {
387                sub_type: "activeAssetCtx".to_string(),
388                coin: "ETH".to_string(),
389            },
390        };
391        let json = sonic_rs::to_string(&sub).unwrap();
392        assert!(json.contains("\"method\":\"subscribe\""));
393        assert!(json.contains("\"type\":\"activeAssetCtx\""));
394        assert!(json.contains("\"coin\":\"ETH\""));
395    }
396}