1use anyhow::{anyhow, Result};
2use arc_swap::ArcSwap;
3use axum::body::Bytes;
4use serde::Serialize;
5use std::collections::HashMap;
6use std::sync::Arc;
7use std::time::{Duration, Instant};
8use tracing::error;
9
/// Refresh period, in milliseconds, used by `SparklinesSnapshotCache::from_config`
/// unless overridden through `with_refresh_interval`.
const DEFAULT_REFRESH_INTERVAL_MS: u64 = 15 * 1_000;

/// Value written to the `schema_version` field of the published snapshot envelope.
const SPARKLINES_SNAPSHOT_ENVELOPE_SCHEMA_VERSION: u32 = 1;

/// Info endpoint used when the pricing config leaves the URL empty and testnet mode is off.
const DEFAULT_HYPERLIQUID_INFO_URL: &str = "https://api.hyperliquid.xyz/info";

/// Info endpoint used when the pricing config leaves the URL empty and testnet mode is on.
const DEFAULT_HYPERLIQUID_TESTNET_INFO_URL: &str = "https://api.hyperliquid-testnet.xyz/info";

/// Timeout configured on the HTTP client that issues the candle requests.
const HL_REQUEST_TIMEOUT: Duration = Duration::from_millis(10_000);

/// Candle interval sent in every candle request and echoed in the published envelope.
const SPARKLINES_INTERVAL: &str = "2h";

/// Width of the requested candle window in milliseconds (48 hours): each refresh asks for
/// candles from `now - SPARKLINES_LOOKBACK_MS` up to `now`.
const SPARKLINES_LOOKBACK_MS: i64 = 48 * 60 * 60 * 1_000;
17
/// Fallback `(ticker, coin)` pairs, used when the `SPARKLINES_UPSTASH_TICKERS` environment
/// variable is unset or contains no valid entry.
///
/// The first element is the key under which candles are published; the second is the coin
/// identifier sent to the upstream candle endpoint.
const DEFAULT_TICKER_MAP: &[(&str, &str)] = &[
    ("BTC", "BTC"),
    ("ETH", "ETH"),
    ("HYPE", "HYPE"),
    ("US500", "km:US500"),
    ("USOIL", "km:USOIL"),
];
26
27#[derive(Clone)]
28struct SparklinesSnapshotData {
29 response_bytes_upstash: Bytes,
30 built_at: std::time::SystemTime,
31}
32
33pub struct SparklinesSnapshotCache {
34 http_client: reqwest::Client,
35 info_url: String,
36 tickers: Vec<(String, String)>,
37 snapshot: ArcSwap<SparklinesSnapshotData>,
38 refresh_interval: Duration,
39}
40
41#[derive(Serialize)]
42struct PublishedSparklinesSnapshot {
43 schema_version: u32,
44 built_at_ms: u64,
45 interval: &'static str,
46 tickers: HashMap<String, serde_json::Value>,
47}
48
49#[derive(Serialize)]
50#[serde(rename_all = "camelCase")]
51struct CandleSnapshotReq {
52 coin: String,
53 interval: &'static str,
54 start_time: i64,
55 end_time: i64,
56}
57
58#[derive(Serialize)]
59struct CandleSnapshotEnvelope {
60 #[serde(rename = "type")]
61 request_type: &'static str,
62 req: CandleSnapshotReq,
63}
64
65impl SparklinesSnapshotCache {
66 pub fn from_config(
67 pricing: &hypercall_config::PricingConfig,
68 is_testnet_mode: bool,
69 ) -> Result<Self> {
70 let info_url = if pricing.hyperliquid_info_url.is_empty() {
71 if is_testnet_mode {
72 DEFAULT_HYPERLIQUID_TESTNET_INFO_URL.to_string()
73 } else {
74 DEFAULT_HYPERLIQUID_INFO_URL.to_string()
75 }
76 } else {
77 pricing.hyperliquid_info_url.clone()
78 };
79
80 let tickers = parse_ticker_map_from_env();
81
82 let http_client = reqwest::Client::builder()
83 .timeout(HL_REQUEST_TIMEOUT)
84 .build()
85 .map_err(|e| anyhow!("Failed to build HTTP client for sparklines: {}", e))?;
86
87 let empty_payload = Bytes::from_static(
88 br#"{"schema_version":1,"built_at_ms":0,"interval":"2h","tickers":{}}"#,
89 );
90
91 Ok(Self {
92 http_client,
93 info_url,
94 tickers,
95 snapshot: ArcSwap::from_pointee(SparklinesSnapshotData {
96 response_bytes_upstash: empty_payload,
97 built_at: std::time::SystemTime::UNIX_EPOCH,
98 }),
99 refresh_interval: Duration::from_millis(DEFAULT_REFRESH_INTERVAL_MS),
100 })
101 }
102
103 pub fn with_refresh_interval(mut self, refresh_interval: Duration) -> Self {
104 self.refresh_interval = refresh_interval;
105 self
106 }
107
108 pub async fn refresh_once(&self) -> Result<()> {
109 let start = Instant::now();
110 let now_ms = std::time::SystemTime::now()
111 .duration_since(std::time::SystemTime::UNIX_EPOCH)
112 .map_err(|e| anyhow!("System time error: {}", e))?
113 .as_millis() as i64;
114 let start_time = now_ms - SPARKLINES_LOOKBACK_MS;
115
116 let fetches: Vec<_> = self
117 .tickers
118 .iter()
119 .map(|(ticker, coin)| {
120 let client = self.http_client.clone();
121 let url = self.info_url.clone();
122 let coin = coin.clone();
123 let ticker = ticker.clone();
124 async move {
125 let body = CandleSnapshotEnvelope {
126 request_type: "candleSnapshot",
127 req: CandleSnapshotReq {
128 coin,
129 interval: SPARKLINES_INTERVAL,
130 start_time,
131 end_time: now_ms,
132 },
133 };
134 match client.post(&url).json(&body).send().await {
135 Ok(resp) if resp.status().is_success() => {
136 match resp.json::<serde_json::Value>().await {
137 Ok(val) if val.is_array() => (ticker, val),
138 Ok(_) => {
139 metrics::counter!("ht_sparklines_fetch_total", "status" => "bad_response").increment(1);
140 (ticker, serde_json::Value::Array(vec![]))
141 }
142 Err(e) => {
143 metrics::counter!("ht_sparklines_fetch_total", "status" => "parse_error").increment(1);
144 tracing::warn!(error = %e, "Failed to parse sparkline response");
145 (ticker, serde_json::Value::Array(vec![]))
146 }
147 }
148 }
149 Ok(resp) => {
150 metrics::counter!("ht_sparklines_fetch_total", "status" => "http_error").increment(1);
151 tracing::warn!(status = %resp.status(), "HL API returned error for sparklines");
152 (ticker, serde_json::Value::Array(vec![]))
153 }
154 Err(e) => {
155 metrics::counter!("ht_sparklines_fetch_total", "status" => "network_error").increment(1);
156 tracing::warn!(error = %e, "Failed to fetch sparkline from HL API");
157 (ticker, serde_json::Value::Array(vec![]))
158 }
159 }
160 }
161 })
162 .collect();
163
164 let results = futures::future::join_all(fetches).await;
165
166 let mut tickers_map = HashMap::with_capacity(results.len());
167 for (ticker, candles) in results {
168 tickers_map.insert(ticker, candles);
169 }
170
171 let built_at = std::time::SystemTime::now();
172 let envelope = PublishedSparklinesSnapshot {
173 schema_version: SPARKLINES_SNAPSHOT_ENVELOPE_SCHEMA_VERSION,
174 built_at_ms: system_time_to_millis(built_at)?,
175 interval: SPARKLINES_INTERVAL,
176 tickers: tickers_map,
177 };
178 let response_bytes_upstash = Bytes::from(
179 serde_json::to_vec(&envelope)
180 .map_err(|e| anyhow!("Failed to serialize sparklines snapshot: {}", e))?,
181 );
182
183 self.snapshot.store(Arc::new(SparklinesSnapshotData {
184 response_bytes_upstash,
185 built_at,
186 }));
187
188 metrics::counter!("ht_sparklines_snapshot_refresh_total", "status" => "success")
189 .increment(1);
190 metrics::histogram!("ht_sparklines_snapshot_refresh_seconds")
191 .record(start.elapsed().as_secs_f64());
192 Ok(())
193 }
194
195 pub async fn initialize(&self) {
196 if let Err(e) = self.refresh_once().await {
197 metrics::counter!("ht_sparklines_snapshot_refresh_total", "status" => "error")
198 .increment(1);
199 error!(error = %e, "Initial sparklines snapshot build failed; serving cold-start payload");
200 }
201 }
202
203 pub fn start_with_shutdown(
204 self: Arc<Self>,
205 mut shutdown_rx: tokio::sync::broadcast::Receiver<()>,
206 ) -> tokio::task::JoinHandle<()> {
207 tokio::spawn(async move {
208 let mut interval = tokio::time::interval(self.refresh_interval);
209 loop {
210 tokio::select! {
211 _ = shutdown_rx.recv() => {
212 break;
213 }
214 _ = interval.tick() => {
215 if let Err(e) = self.refresh_once().await {
216 metrics::counter!("ht_sparklines_snapshot_refresh_total", "status" => "error").increment(1);
217 error!(error = %e, "Failed to refresh sparklines snapshot; keeping last-good");
218 }
219 }
220 }
221 }
222 })
223 }
224
225 pub fn published_response(&self) -> (Bytes, std::time::SystemTime) {
226 let snap = self.snapshot.load();
227 (snap.response_bytes_upstash.clone(), snap.built_at)
228 }
229}
230
231fn parse_ticker_map_from_env() -> Vec<(String, String)> {
232 if let Ok(raw) = std::env::var("SPARKLINES_UPSTASH_TICKERS") {
233 let parsed: Vec<(String, String)> = raw
234 .split(',')
235 .filter_map(|entry| {
236 let mut parts = entry.splitn(2, ':');
237 let ticker = parts.next()?.trim();
238 let coin = parts.next()?.trim();
239 if ticker.is_empty() || coin.is_empty() {
240 None
241 } else {
242 Some((ticker.to_string(), coin.to_string()))
243 }
244 })
245 .collect();
246 if !parsed.is_empty() {
247 return parsed;
248 }
249 }
250
251 DEFAULT_TICKER_MAP
252 .iter()
253 .map(|(t, c)| (t.to_string(), c.to_string()))
254 .collect()
255}
256
257fn system_time_to_millis(value: std::time::SystemTime) -> Result<u64> {
258 Ok(value
259 .duration_since(std::time::SystemTime::UNIX_EPOCH)
260 .map_err(|e| anyhow!("Invalid snapshot build time: {}", e))?
261 .as_millis() as u64)
262}