1use std::collections::HashMap;
8use std::sync::atomic::{AtomicU64, Ordering};
9use std::sync::{Arc, RwLock};
10use std::time::Duration;
11
12use anyhow::{Context, Result};
13use chrono::Utc;
14use metrics::counter;
15use reqwest::Client;
16use serde::{Deserialize, Serialize};
17use tokio::task::JoinHandle;
18use tracing::{error, info, warn};
19
20use super::risk_oracle::{
21 RiskVolOracle, VolLookupError, VolOracleStatus, VolProviderKind, VolSurfaceSnapshot,
22};
23use super::vol_surface_cache::VolPoint;
24
/// Settings for [`RealizedVolOracle`].
#[derive(Debug, Clone)]
pub struct RealizedVolOracleConfig {
    /// URL that candle snapshot requests are POSTed to.
    pub info_url: String,
    /// Coin identifier sent in the candle snapshot request.
    pub candle_coin: String,
    /// Length of the candle window, in days, ending at the time of the request.
    pub lookback_days: u32,
    /// Minimum number of return samples required to accept a computed value.
    pub min_samples: usize,
    /// Days per year used when annualizing the hourly return volatility.
    pub annualization_days: f64,
    /// Lower clamp bound applied to the published volatility.
    pub floor_iv: f64,
    /// Upper clamp bound applied to the published volatility.
    pub cap_iv: f64,
    /// Factor applied to the annualized volatility before clamping.
    pub multiplier: f64,
    /// Period of the background refresh loop.
    pub poll_interval: Duration,
    /// Maximum age of the last successful update before it is treated as stale.
    pub staleness_threshold: Duration,
    /// First strike of the synthetic surface grid.
    pub strike_min: f64,
    /// Upper bound of the synthetic surface grid.
    pub strike_max: f64,
    /// Spacing between consecutive strikes of the synthetic surface grid.
    pub strike_step: f64,
    /// Underlyings that each refreshed value is published under.
    pub symbols: Vec<String>,
}
42
/// Result of one successful realized-volatility refresh.
#[derive(Debug, Clone)]
struct RealizedVolState {
    /// Annualized volatility after the multiplier and clamp have been applied.
    iv: f64,
    /// Close of the most recent candle in the fetched window.
    spot: f64,
    /// Number of log returns the volatility was computed from.
    sample_count: usize,
    /// Millisecond timestamp captured when the fetch for this value started.
    last_update_ts_ms: i64,
}
50
51#[derive(Debug, Default)]
52struct RealizedVolOracleState {
53 values: HashMap<String, RealizedVolState>,
54 connected: HashMap<String, bool>,
55 last_error: HashMap<String, String>,
56}
57
58#[derive(Debug, Serialize)]
59struct CandleSnapshotRequest {
60 #[serde(rename = "type")]
61 request_type: &'static str,
62 req: CandleSnapshotRequestBody,
63}
64
65#[derive(Debug, Serialize)]
66#[serde(rename_all = "camelCase")]
67struct CandleSnapshotRequestBody {
68 coin: String,
69 interval: &'static str,
70 start_time: i64,
71 end_time: i64,
72}
73
74#[derive(Debug, Deserialize)]
75struct CandleSnapshot {
76 #[serde(rename = "t")]
77 start_time_ms: i64,
78 #[serde(rename = "c")]
79 close: String,
80}
81
82pub struct RealizedVolOracle {
83 client: Client,
84 config: RealizedVolOracleConfig,
85 state: Arc<RwLock<RealizedVolOracleState>>,
86 messages_received: AtomicU64,
87}
88
89impl RealizedVolOracle {
90 pub fn new(config: RealizedVolOracleConfig) -> Self {
91 Self {
92 client: Client::builder()
93 .timeout(Duration::from_secs(15))
94 .build()
95 .expect("failed to build HTTP client"),
96 config,
97 state: Arc::new(RwLock::new(RealizedVolOracleState::default())),
98 messages_received: AtomicU64::new(0),
99 }
100 }
101
102 pub fn start_polling(self: Arc<Self>) -> JoinHandle<()> {
103 tokio::spawn(async move {
104 let mut interval = tokio::time::interval(self.config.poll_interval);
105 loop {
106 interval.tick().await;
107 if let Err(err) = self.refresh_all().await {
108 error!("Realized vol oracle refresh failed: {err:#}");
109 }
110 }
111 })
112 }
113
114 async fn refresh_all(&self) -> Result<()> {
115 match self.fetch_realized_vol().await {
116 Ok(update) => {
117 let sample_count = update.sample_count;
118 let iv = update.iv;
119 let spot = update.spot;
120 {
121 let mut state = self
122 .state
123 .write()
124 .expect("realized vol oracle state poisoned");
125 for symbol in &self.config.symbols {
126 state.values.insert(symbol.clone(), update.clone());
127 state.connected.insert(symbol.clone(), true);
128 state.last_error.remove(symbol);
129 }
130 }
131 self.messages_received.fetch_add(1, Ordering::Relaxed);
132 for symbol in &self.config.symbols {
133 counter!(
134 "ht_vol_oracle_messages_received_total",
135 "provider" => VolProviderKind::RealizedVol.as_str(),
136 "underlying" => symbol.clone()
137 )
138 .increment(1);
139 }
140 info!(
141 coin = %self.config.candle_coin,
142 iv,
143 spot,
144 sample_count,
145 "Updated realized-vol oracle"
146 );
147 }
148 Err(err) => {
149 let message = err.to_string();
150 let mut state = self
151 .state
152 .write()
153 .expect("realized vol oracle state poisoned");
154 for symbol in &self.config.symbols {
155 state.connected.insert(symbol.clone(), false);
156 state.last_error.insert(symbol.clone(), message.clone());
157 if state.values.contains_key(symbol) {
158 warn!(
159 "Realized vol refresh failed for {} (keeping last good data): {}",
160 symbol, message
161 );
162 } else {
163 warn!("Realized vol refresh failed for {}: {}", symbol, message);
164 }
165 }
166 }
167 }
168 Ok(())
169 }
170
171 async fn fetch_realized_vol(&self) -> Result<RealizedVolState> {
172 let end_ms = Utc::now().timestamp_millis();
173 let lookback_ms = i64::from(self.config.lookback_days) * 24 * 60 * 60 * 1000;
174 let start_ms = end_ms
175 .checked_sub(lookback_ms)
176 .context("realized-vol lookback overflow")?;
177 let request = CandleSnapshotRequest {
178 request_type: "candleSnapshot",
179 req: CandleSnapshotRequestBody {
180 coin: self.config.candle_coin.clone(),
181 interval: "1h",
182 start_time: start_ms,
183 end_time: end_ms,
184 },
185 };
186
187 let candles = self
188 .client
189 .post(&self.config.info_url)
190 .json(&request)
191 .send()
192 .await
193 .with_context(|| {
194 format!(
195 "Failed to fetch candles for {} from {}",
196 self.config.candle_coin, self.config.info_url
197 )
198 })?
199 .error_for_status()
200 .with_context(|| "Hyperliquid candle API returned non-success")?
201 .json::<Vec<CandleSnapshot>>()
202 .await
203 .with_context(|| "Failed to decode Hyperliquid candle response")?;
204
205 let mut closes = candles
206 .into_iter()
207 .map(|candle| {
208 let close = candle.close.parse::<f64>().with_context(|| {
209 format!(
210 "Failed to parse close={} for candle {}",
211 candle.close, candle.start_time_ms
212 )
213 })?;
214 if !close.is_finite() || close <= 0.0 {
215 anyhow::bail!(
216 "Invalid close={} for candle {}",
217 close,
218 candle.start_time_ms
219 );
220 }
221 Ok((candle.start_time_ms, close))
222 })
223 .collect::<Result<Vec<_>>>()?;
224 closes.sort_by_key(|(ts, _)| *ts);
225
226 let (iv, sample_count) = realized_vol_from_closes(
227 closes.iter().map(|(_, close)| *close),
228 self.config.annualization_days,
229 self.config.multiplier,
230 self.config.floor_iv,
231 self.config.cap_iv,
232 )?;
233 if sample_count < self.config.min_samples {
234 anyhow::bail!(
235 "Realized vol had only {} return samples (need >= {})",
236 sample_count,
237 self.config.min_samples
238 );
239 }
240 let spot = closes
241 .last()
242 .map(|(_, close)| *close)
243 .context("Realized vol candle response was empty")?;
244
245 Ok(RealizedVolState {
246 iv,
247 spot,
248 sample_count,
249 last_update_ts_ms: end_ms,
250 })
251 }
252
253 fn status_for(&self, symbol: &str) -> VolOracleStatus {
254 let state = self
255 .state
256 .read()
257 .expect("realized vol oracle state poisoned");
258 let value = state.values.get(symbol);
259 let staleness_seconds = value.map(|v| {
260 ((Utc::now().timestamp_millis() - v.last_update_ts_ms) as f64 / 1000.0).max(0.0)
261 });
262 let ready = staleness_seconds
263 .map(|age| age <= self.config.staleness_threshold.as_secs_f64())
264 .unwrap_or(false);
265
266 VolOracleStatus {
267 underlying: symbol.to_string(),
268 provider: VolProviderKind::RealizedVol,
269 route_facing: true,
270 connected: state.connected.get(symbol).copied().unwrap_or(false),
271 ready,
272 last_update_ts_ms: value.map(|v| v.last_update_ts_ms),
273 staleness_seconds,
274 staleness_threshold_seconds: Some(self.config.staleness_threshold.as_secs_f64()),
275 surface_points: value
276 .map(|_| self.synthetic_surface_point_count())
277 .unwrap_or(0),
278 messages_received: self.messages_received.load(Ordering::Relaxed),
279 last_error: state.last_error.get(symbol).cloned(),
280 }
281 }
282
283 fn synthetic_surface_point_count(&self) -> usize {
284 let strike_count = (((self.config.strike_max - self.config.strike_min)
285 / self.config.strike_step)
286 .floor() as usize)
287 + 1;
288 strike_count * synthetic_expiries(Utc::now().timestamp()).len()
289 }
290}
291
292impl RiskVolOracle for RealizedVolOracle {
293 fn get_iv(
294 &self,
295 underlying: &str,
296 _strike: f64,
297 _expiry_ts: i64,
298 ) -> Result<f64, VolLookupError> {
299 let state = self
300 .state
301 .read()
302 .expect("realized vol oracle state poisoned");
303 let value = match state.values.get(underlying) {
304 Some(value) => value,
305 None => {
306 return Err(VolLookupError::UnhealthyProvider {
307 underlying: underlying.to_string(),
308 provider: VolProviderKind::RealizedVol,
309 reason: state
310 .last_error
311 .get(underlying)
312 .cloned()
313 .unwrap_or_else(|| "not connected".to_string()),
314 });
315 }
316 };
317
318 let staleness_seconds =
319 ((Utc::now().timestamp_millis() - value.last_update_ts_ms) as f64 / 1000.0).max(0.0);
320 if staleness_seconds > self.config.staleness_threshold.as_secs_f64() {
321 return Err(VolLookupError::StaleSurface {
322 underlying: underlying.to_string(),
323 provider: VolProviderKind::RealizedVol,
324 staleness_seconds,
325 threshold_seconds: self.config.staleness_threshold.as_secs_f64(),
326 });
327 }
328
329 Ok(value.iv)
330 }
331
332 fn statuses(&self) -> Vec<VolOracleStatus> {
333 self.config
334 .symbols
335 .iter()
336 .map(|symbol| self.status_for(symbol))
337 .collect()
338 }
339
340 fn get_surface_snapshot(&self, underlying: &str) -> Option<VolSurfaceSnapshot> {
341 let state = self
342 .state
343 .read()
344 .expect("realized vol oracle state poisoned");
345 let value = state.values.get(underlying)?;
346 let now_ts = Utc::now().timestamp();
347 let expiries = synthetic_expiries(now_ts);
348 let mut strike_points = Vec::new();
349 let mut strike = self.config.strike_min;
350 while strike <= self.config.strike_max + f64::EPSILON {
351 for &expiry in &expiries {
352 strike_points.push(VolPoint {
353 strike,
354 expiry,
355 iv: value.iv,
356 timestamp: value.last_update_ts_ms,
357 });
358 }
359 strike += self.config.strike_step;
360 }
361
362 Some(VolSurfaceSnapshot {
363 underlying: underlying.to_string(),
364 last_update_ts_ms: Some(value.last_update_ts_ms),
365 expiries,
366 strike_points,
367 delta_curves: Vec::new(),
368 atm_vols: Vec::new(),
369 spot_price: Some(value.spot),
370 })
371 }
372
373 fn supports_surface_snapshots(&self) -> bool {
374 true
375 }
376}
377
/// Returns the expiry timestamps (in seconds) of the synthetic surface,
/// relative to `now_ts`, in ascending order.
///
/// The ladder consists of `now_ts` itself, every hour from 1 to 48 hours out,
/// every day from 3 to 14 days out, and every week from 3 to 12 weeks out.
fn synthetic_expiries(now_ts: i64) -> Vec<i64> {
    const HOUR: i64 = 60 * 60;
    const DAY: i64 = 24 * HOUR;
    const WEEK: i64 = 7 * DAY;

    let hourly = (1..=48).map(|h| h * HOUR);
    let daily = (3..=14).map(|d| d * DAY);
    let weekly = (3..=12).map(|w| w * WEEK);

    std::iter::once(0)
        .chain(hourly)
        .chain(daily)
        .chain(weekly)
        .map(|offset| now_ts + offset)
        .collect()
}
394
395fn realized_vol_from_closes(
396 closes: impl IntoIterator<Item = f64>,
397 annualization_days: f64,
398 multiplier: f64,
399 floor_iv: f64,
400 cap_iv: f64,
401) -> Result<(f64, usize)> {
402 let closes = closes.into_iter().collect::<Vec<_>>();
403 if closes.len() < 2 {
404 anyhow::bail!("Need at least 2 closes to compute realized vol");
405 }
406
407 let mut returns = Vec::with_capacity(closes.len() - 1);
408 for pair in closes.windows(2) {
409 let prev = pair[0];
410 let next = pair[1];
411 if !prev.is_finite() || !next.is_finite() || prev <= 0.0 || next <= 0.0 {
412 anyhow::bail!("Realized vol closes must be finite and positive");
413 }
414 returns.push((next / prev).ln());
415 }
416 if returns.len() < 2 {
417 anyhow::bail!("Need at least 2 returns to compute realized vol variance");
418 }
419
420 let mean = returns.iter().sum::<f64>() / returns.len() as f64;
421 let variance = returns
422 .iter()
423 .map(|r| {
424 let diff = r - mean;
425 diff * diff
426 })
427 .sum::<f64>()
428 / (returns.len() - 1) as f64;
429 let annualized = variance.sqrt() * (annualization_days * 24.0).sqrt();
430 let adjusted = annualized * multiplier;
431 Ok((adjusted.clamp(floor_iv, cap_iv), returns.len()))
432}
433
#[cfg(test)]
mod tests {
    use super::*;

    /// A moderately volatile series must land strictly inside the bounds and
    /// report one return per consecutive pair of closes.
    #[test]
    fn realized_vol_applies_multiplier_and_bounds() {
        let (floor_iv, cap_iv) = (0.10, 5.0);
        let hourly_closes = vec![100.0, 101.0, 99.0, 103.0, 100.0, 104.0];

        let (iv, sample_count) =
            realized_vol_from_closes(hourly_closes, 365.0, 1.25, floor_iv, cap_iv).expect("rv");

        assert_eq!(sample_count, 5);
        assert!(iv > floor_iv);
        assert!(iv < cap_iv);
    }

    /// Two closes give a single return, which is not enough for a sample
    /// variance.
    #[test]
    fn realized_vol_fails_without_enough_returns() {
        let outcome = realized_vol_from_closes(vec![100.0, 101.0], 365.0, 1.0, 0.10, 5.0);

        let err = outcome.expect_err("single return cannot estimate sample variance");
        let message = err.to_string();

        assert!(message.contains("Need at least 2 returns"));
    }
}