Skip to main content

hypercall_api/caches/
sparklines.rs

1use anyhow::{anyhow, Result};
2use arc_swap::ArcSwap;
3use axum::body::Bytes;
4use serde::Serialize;
5use std::collections::HashMap;
6use std::sync::Arc;
7use std::time::{Duration, Instant};
8use tracing::error;
9
/// Refresh period, in milliseconds, used when no override is supplied via
/// `with_refresh_interval`.
const DEFAULT_REFRESH_INTERVAL_MS: u64 = 15_000;

/// Value written to the `schema_version` field of the published payload.
const SPARKLINES_SNAPSHOT_ENVELOPE_SCHEMA_VERSION: u32 = 1;

/// Hyperliquid info endpoint used when the config URL is empty and testnet
/// mode is off.
const DEFAULT_HYPERLIQUID_INFO_URL: &str = "https://api.hyperliquid.xyz/info";

/// Hyperliquid info endpoint used when the config URL is empty and testnet
/// mode is on.
const DEFAULT_HYPERLIQUID_TESTNET_INFO_URL: &str = "https://api.hyperliquid-testnet.xyz/info";

/// Timeout configured on the HTTP client used for candle requests.
const HL_REQUEST_TIMEOUT: Duration = Duration::from_secs(10);

/// Candle interval sent in every `candleSnapshot` request and echoed in the
/// published payload.
const SPARKLINES_INTERVAL: &str = "2h";

/// Width of the requested candle window in milliseconds (48 hours).
const SPARKLINES_LOOKBACK_MS: i64 = 48 * 60 * 60 * 1_000;
17
/// Built-in `(ticker, candle coin)` pairs used when the
/// `SPARKLINES_UPSTASH_TICKERS` environment variable yields no usable entry.
///
/// The first element is the key under which candles are published; the second
/// is the coin name sent to the candle endpoint. Intended to mirror the
/// frontend `ASSET_METADATA` table (not verifiable from this file).
const DEFAULT_TICKER_MAP: &[(&str, &str)] = &[
    ("BTC", "BTC"),
    ("ETH", "ETH"),
    ("HYPE", "HYPE"),
    ("US500", "km:US500"),
    ("USOIL", "km:USOIL"),
];
26
27#[derive(Clone)]
28struct SparklinesSnapshotData {
29    response_bytes_upstash: Bytes,
30    built_at: std::time::SystemTime,
31}
32
33pub struct SparklinesSnapshotCache {
34    http_client: reqwest::Client,
35    info_url: String,
36    tickers: Vec<(String, String)>,
37    snapshot: ArcSwap<SparklinesSnapshotData>,
38    refresh_interval: Duration,
39}
40
41#[derive(Serialize)]
42struct PublishedSparklinesSnapshot {
43    schema_version: u32,
44    built_at_ms: u64,
45    interval: &'static str,
46    tickers: HashMap<String, serde_json::Value>,
47}
48
49#[derive(Serialize)]
50#[serde(rename_all = "camelCase")]
51struct CandleSnapshotReq {
52    coin: String,
53    interval: &'static str,
54    start_time: i64,
55    end_time: i64,
56}
57
58#[derive(Serialize)]
59struct CandleSnapshotEnvelope {
60    #[serde(rename = "type")]
61    request_type: &'static str,
62    req: CandleSnapshotReq,
63}
64
65impl SparklinesSnapshotCache {
66    pub fn from_config(
67        pricing: &hypercall_config::PricingConfig,
68        is_testnet_mode: bool,
69    ) -> Result<Self> {
70        let info_url = if pricing.hyperliquid_info_url.is_empty() {
71            if is_testnet_mode {
72                DEFAULT_HYPERLIQUID_TESTNET_INFO_URL.to_string()
73            } else {
74                DEFAULT_HYPERLIQUID_INFO_URL.to_string()
75            }
76        } else {
77            pricing.hyperliquid_info_url.clone()
78        };
79
80        let tickers = parse_ticker_map_from_env();
81
82        let http_client = reqwest::Client::builder()
83            .timeout(HL_REQUEST_TIMEOUT)
84            .build()
85            .map_err(|e| anyhow!("Failed to build HTTP client for sparklines: {}", e))?;
86
87        let empty_payload = Bytes::from_static(
88            br#"{"schema_version":1,"built_at_ms":0,"interval":"2h","tickers":{}}"#,
89        );
90
91        Ok(Self {
92            http_client,
93            info_url,
94            tickers,
95            snapshot: ArcSwap::from_pointee(SparklinesSnapshotData {
96                response_bytes_upstash: empty_payload,
97                built_at: std::time::SystemTime::UNIX_EPOCH,
98            }),
99            refresh_interval: Duration::from_millis(DEFAULT_REFRESH_INTERVAL_MS),
100        })
101    }
102
103    pub fn with_refresh_interval(mut self, refresh_interval: Duration) -> Self {
104        self.refresh_interval = refresh_interval;
105        self
106    }
107
108    pub async fn refresh_once(&self) -> Result<()> {
109        let start = Instant::now();
110        let now_ms = std::time::SystemTime::now()
111            .duration_since(std::time::SystemTime::UNIX_EPOCH)
112            .map_err(|e| anyhow!("System time error: {}", e))?
113            .as_millis() as i64;
114        let start_time = now_ms - SPARKLINES_LOOKBACK_MS;
115
116        let fetches: Vec<_> = self
117            .tickers
118            .iter()
119            .map(|(ticker, coin)| {
120                let client = self.http_client.clone();
121                let url = self.info_url.clone();
122                let coin = coin.clone();
123                let ticker = ticker.clone();
124                async move {
125                    let body = CandleSnapshotEnvelope {
126                        request_type: "candleSnapshot",
127                        req: CandleSnapshotReq {
128                            coin,
129                            interval: SPARKLINES_INTERVAL,
130                            start_time,
131                            end_time: now_ms,
132                        },
133                    };
134                    match client.post(&url).json(&body).send().await {
135                        Ok(resp) if resp.status().is_success() => {
136                            match resp.json::<serde_json::Value>().await {
137                                Ok(val) if val.is_array() => (ticker, val),
138                                Ok(_) => {
139                                    metrics::counter!("ht_sparklines_fetch_total", "status" => "bad_response").increment(1);
140                                    (ticker, serde_json::Value::Array(vec![]))
141                                }
142                                Err(e) => {
143                                    metrics::counter!("ht_sparklines_fetch_total", "status" => "parse_error").increment(1);
144                                    tracing::warn!(error = %e, "Failed to parse sparkline response");
145                                    (ticker, serde_json::Value::Array(vec![]))
146                                }
147                            }
148                        }
149                        Ok(resp) => {
150                            metrics::counter!("ht_sparklines_fetch_total", "status" => "http_error").increment(1);
151                            tracing::warn!(status = %resp.status(), "HL API returned error for sparklines");
152                            (ticker, serde_json::Value::Array(vec![]))
153                        }
154                        Err(e) => {
155                            metrics::counter!("ht_sparklines_fetch_total", "status" => "network_error").increment(1);
156                            tracing::warn!(error = %e, "Failed to fetch sparkline from HL API");
157                            (ticker, serde_json::Value::Array(vec![]))
158                        }
159                    }
160                }
161            })
162            .collect();
163
164        let results = futures::future::join_all(fetches).await;
165
166        let mut tickers_map = HashMap::with_capacity(results.len());
167        for (ticker, candles) in results {
168            tickers_map.insert(ticker, candles);
169        }
170
171        let built_at = std::time::SystemTime::now();
172        let envelope = PublishedSparklinesSnapshot {
173            schema_version: SPARKLINES_SNAPSHOT_ENVELOPE_SCHEMA_VERSION,
174            built_at_ms: system_time_to_millis(built_at)?,
175            interval: SPARKLINES_INTERVAL,
176            tickers: tickers_map,
177        };
178        let response_bytes_upstash = Bytes::from(
179            serde_json::to_vec(&envelope)
180                .map_err(|e| anyhow!("Failed to serialize sparklines snapshot: {}", e))?,
181        );
182
183        self.snapshot.store(Arc::new(SparklinesSnapshotData {
184            response_bytes_upstash,
185            built_at,
186        }));
187
188        metrics::counter!("ht_sparklines_snapshot_refresh_total", "status" => "success")
189            .increment(1);
190        metrics::histogram!("ht_sparklines_snapshot_refresh_seconds")
191            .record(start.elapsed().as_secs_f64());
192        Ok(())
193    }
194
195    pub async fn initialize(&self) {
196        if let Err(e) = self.refresh_once().await {
197            metrics::counter!("ht_sparklines_snapshot_refresh_total", "status" => "error")
198                .increment(1);
199            error!(error = %e, "Initial sparklines snapshot build failed; serving cold-start payload");
200        }
201    }
202
203    pub fn start_with_shutdown(
204        self: Arc<Self>,
205        mut shutdown_rx: tokio::sync::broadcast::Receiver<()>,
206    ) -> tokio::task::JoinHandle<()> {
207        tokio::spawn(async move {
208            let mut interval = tokio::time::interval(self.refresh_interval);
209            loop {
210                tokio::select! {
211                    _ = shutdown_rx.recv() => {
212                        break;
213                    }
214                    _ = interval.tick() => {
215                        if let Err(e) = self.refresh_once().await {
216                            metrics::counter!("ht_sparklines_snapshot_refresh_total", "status" => "error").increment(1);
217                            error!(error = %e, "Failed to refresh sparklines snapshot; keeping last-good");
218                        }
219                    }
220                }
221            }
222        })
223    }
224
225    pub fn published_response(&self) -> (Bytes, std::time::SystemTime) {
226        let snap = self.snapshot.load();
227        (snap.response_bytes_upstash.clone(), snap.built_at)
228    }
229}
230
231fn parse_ticker_map_from_env() -> Vec<(String, String)> {
232    if let Ok(raw) = std::env::var("SPARKLINES_UPSTASH_TICKERS") {
233        let parsed: Vec<(String, String)> = raw
234            .split(',')
235            .filter_map(|entry| {
236                let mut parts = entry.splitn(2, ':');
237                let ticker = parts.next()?.trim();
238                let coin = parts.next()?.trim();
239                if ticker.is_empty() || coin.is_empty() {
240                    None
241                } else {
242                    Some((ticker.to_string(), coin.to_string()))
243                }
244            })
245            .collect();
246        if !parsed.is_empty() {
247            return parsed;
248        }
249    }
250
251    DEFAULT_TICKER_MAP
252        .iter()
253        .map(|(t, c)| (t.to_string(), c.to_string()))
254        .collect()
255}
256
257fn system_time_to_millis(value: std::time::SystemTime) -> Result<u64> {
258    Ok(value
259        .duration_since(std::time::SystemTime::UNIX_EPOCH)
260        .map_err(|e| anyhow!("Invalid snapshot build time: {}", e))?
261        .as_millis() as u64)
262}