Skip to main content

hypercall_vol_oracle/
realized_vol_oracle.rs

1//! Realized-volatility oracle for assets without live option-market anchors.
2//!
3//! The provider derives a single annualized volatility from underlying candles.
4//! Any smile or wing behavior must be added as explicit risk policy elsewhere;
5//! this oracle intentionally does not pretend to calibrate an option surface.
6
7use std::collections::HashMap;
8use std::sync::atomic::{AtomicU64, Ordering};
9use std::sync::{Arc, RwLock};
10use std::time::Duration;
11
12use anyhow::{Context, Result};
13use chrono::Utc;
14use metrics::counter;
15use reqwest::Client;
16use serde::{Deserialize, Serialize};
17use tokio::task::JoinHandle;
18use tracing::{error, info, warn};
19
20use super::risk_oracle::{
21    RiskVolOracle, VolLookupError, VolOracleStatus, VolProviderKind, VolSurfaceSnapshot,
22};
23use super::vol_surface_cache::VolPoint;
24
/// Settings for [`RealizedVolOracle`]: where candles come from, how the
/// volatility estimate is shaped, and which synthetic grid is published.
#[derive(Debug, Clone)]
pub struct RealizedVolOracleConfig {
    /// Endpoint that candle-snapshot requests are POSTed to.
    pub info_url: String,
    /// Coin whose candles feed the estimate.
    pub candle_coin: String,
    /// Length of the candle window, in days, counted back from "now".
    pub lookback_days: u32,
    /// Minimum number of return samples required before a value is published.
    pub min_samples: usize,
    /// Days per year used when annualizing (multiplied by 24 for hourly samples).
    pub annualization_days: f64,
    /// Lower clamp applied to the published volatility.
    pub floor_iv: f64,
    /// Upper clamp applied to the published volatility.
    pub cap_iv: f64,
    /// Factor applied to the annualized volatility before clamping.
    pub multiplier: f64,
    /// Period between refresh attempts.
    pub poll_interval: Duration,
    /// Maximum age of the last good value before lookups report it as stale.
    pub staleness_threshold: Duration,
    /// First strike of the synthetic surface grid.
    pub strike_min: f64,
    /// Upper bound of the synthetic surface grid.
    pub strike_max: f64,
    /// Spacing between consecutive synthetic strikes.
    pub strike_step: f64,
    /// Symbols under which the single realized-vol value is published.
    pub symbols: Vec<String>,
}
42
/// One successful realized-volatility computation.
#[derive(Debug, Clone)]
struct RealizedVolState {
    /// Annualized volatility after the multiplier and floor/cap clamp.
    iv: f64,
    /// Close of the most recent candle in the fetched window.
    spot: f64,
    /// Number of log returns the estimate was computed from.
    sample_count: usize,
    /// Wall-clock time (ms since the Unix epoch) at which the fetch started.
    last_update_ts_ms: i64,
}
50
51#[derive(Debug, Default)]
52struct RealizedVolOracleState {
53    values: HashMap<String, RealizedVolState>,
54    connected: HashMap<String, bool>,
55    last_error: HashMap<String, String>,
56}
57
58#[derive(Debug, Serialize)]
59struct CandleSnapshotRequest {
60    #[serde(rename = "type")]
61    request_type: &'static str,
62    req: CandleSnapshotRequestBody,
63}
64
65#[derive(Debug, Serialize)]
66#[serde(rename_all = "camelCase")]
67struct CandleSnapshotRequestBody {
68    coin: String,
69    interval: &'static str,
70    start_time: i64,
71    end_time: i64,
72}
73
74#[derive(Debug, Deserialize)]
75struct CandleSnapshot {
76    #[serde(rename = "t")]
77    start_time_ms: i64,
78    #[serde(rename = "c")]
79    close: String,
80}
81
82pub struct RealizedVolOracle {
83    client: Client,
84    config: RealizedVolOracleConfig,
85    state: Arc<RwLock<RealizedVolOracleState>>,
86    messages_received: AtomicU64,
87}
88
89impl RealizedVolOracle {
90    pub fn new(config: RealizedVolOracleConfig) -> Self {
91        Self {
92            client: Client::builder()
93                .timeout(Duration::from_secs(15))
94                .build()
95                .expect("failed to build HTTP client"),
96            config,
97            state: Arc::new(RwLock::new(RealizedVolOracleState::default())),
98            messages_received: AtomicU64::new(0),
99        }
100    }
101
102    pub fn start_polling(self: Arc<Self>) -> JoinHandle<()> {
103        tokio::spawn(async move {
104            let mut interval = tokio::time::interval(self.config.poll_interval);
105            loop {
106                interval.tick().await;
107                if let Err(err) = self.refresh_all().await {
108                    error!("Realized vol oracle refresh failed: {err:#}");
109                }
110            }
111        })
112    }
113
114    async fn refresh_all(&self) -> Result<()> {
115        match self.fetch_realized_vol().await {
116            Ok(update) => {
117                let sample_count = update.sample_count;
118                let iv = update.iv;
119                let spot = update.spot;
120                {
121                    let mut state = self
122                        .state
123                        .write()
124                        .expect("realized vol oracle state poisoned");
125                    for symbol in &self.config.symbols {
126                        state.values.insert(symbol.clone(), update.clone());
127                        state.connected.insert(symbol.clone(), true);
128                        state.last_error.remove(symbol);
129                    }
130                }
131                self.messages_received.fetch_add(1, Ordering::Relaxed);
132                for symbol in &self.config.symbols {
133                    counter!(
134                        "ht_vol_oracle_messages_received_total",
135                        "provider" => VolProviderKind::RealizedVol.as_str(),
136                        "underlying" => symbol.clone()
137                    )
138                    .increment(1);
139                }
140                info!(
141                    coin = %self.config.candle_coin,
142                    iv,
143                    spot,
144                    sample_count,
145                    "Updated realized-vol oracle"
146                );
147            }
148            Err(err) => {
149                let message = err.to_string();
150                let mut state = self
151                    .state
152                    .write()
153                    .expect("realized vol oracle state poisoned");
154                for symbol in &self.config.symbols {
155                    state.connected.insert(symbol.clone(), false);
156                    state.last_error.insert(symbol.clone(), message.clone());
157                    if state.values.contains_key(symbol) {
158                        warn!(
159                            "Realized vol refresh failed for {} (keeping last good data): {}",
160                            symbol, message
161                        );
162                    } else {
163                        warn!("Realized vol refresh failed for {}: {}", symbol, message);
164                    }
165                }
166            }
167        }
168        Ok(())
169    }
170
171    async fn fetch_realized_vol(&self) -> Result<RealizedVolState> {
172        let end_ms = Utc::now().timestamp_millis();
173        let lookback_ms = i64::from(self.config.lookback_days) * 24 * 60 * 60 * 1000;
174        let start_ms = end_ms
175            .checked_sub(lookback_ms)
176            .context("realized-vol lookback overflow")?;
177        let request = CandleSnapshotRequest {
178            request_type: "candleSnapshot",
179            req: CandleSnapshotRequestBody {
180                coin: self.config.candle_coin.clone(),
181                interval: "1h",
182                start_time: start_ms,
183                end_time: end_ms,
184            },
185        };
186
187        let candles = self
188            .client
189            .post(&self.config.info_url)
190            .json(&request)
191            .send()
192            .await
193            .with_context(|| {
194                format!(
195                    "Failed to fetch candles for {} from {}",
196                    self.config.candle_coin, self.config.info_url
197                )
198            })?
199            .error_for_status()
200            .with_context(|| "Hyperliquid candle API returned non-success")?
201            .json::<Vec<CandleSnapshot>>()
202            .await
203            .with_context(|| "Failed to decode Hyperliquid candle response")?;
204
205        let mut closes = candles
206            .into_iter()
207            .map(|candle| {
208                let close = candle.close.parse::<f64>().with_context(|| {
209                    format!(
210                        "Failed to parse close={} for candle {}",
211                        candle.close, candle.start_time_ms
212                    )
213                })?;
214                if !close.is_finite() || close <= 0.0 {
215                    anyhow::bail!(
216                        "Invalid close={} for candle {}",
217                        close,
218                        candle.start_time_ms
219                    );
220                }
221                Ok((candle.start_time_ms, close))
222            })
223            .collect::<Result<Vec<_>>>()?;
224        closes.sort_by_key(|(ts, _)| *ts);
225
226        let (iv, sample_count) = realized_vol_from_closes(
227            closes.iter().map(|(_, close)| *close),
228            self.config.annualization_days,
229            self.config.multiplier,
230            self.config.floor_iv,
231            self.config.cap_iv,
232        )?;
233        if sample_count < self.config.min_samples {
234            anyhow::bail!(
235                "Realized vol had only {} return samples (need >= {})",
236                sample_count,
237                self.config.min_samples
238            );
239        }
240        let spot = closes
241            .last()
242            .map(|(_, close)| *close)
243            .context("Realized vol candle response was empty")?;
244
245        Ok(RealizedVolState {
246            iv,
247            spot,
248            sample_count,
249            last_update_ts_ms: end_ms,
250        })
251    }
252
253    fn status_for(&self, symbol: &str) -> VolOracleStatus {
254        let state = self
255            .state
256            .read()
257            .expect("realized vol oracle state poisoned");
258        let value = state.values.get(symbol);
259        let staleness_seconds = value.map(|v| {
260            ((Utc::now().timestamp_millis() - v.last_update_ts_ms) as f64 / 1000.0).max(0.0)
261        });
262        let ready = staleness_seconds
263            .map(|age| age <= self.config.staleness_threshold.as_secs_f64())
264            .unwrap_or(false);
265
266        VolOracleStatus {
267            underlying: symbol.to_string(),
268            provider: VolProviderKind::RealizedVol,
269            route_facing: true,
270            connected: state.connected.get(symbol).copied().unwrap_or(false),
271            ready,
272            last_update_ts_ms: value.map(|v| v.last_update_ts_ms),
273            staleness_seconds,
274            staleness_threshold_seconds: Some(self.config.staleness_threshold.as_secs_f64()),
275            surface_points: value
276                .map(|_| self.synthetic_surface_point_count())
277                .unwrap_or(0),
278            messages_received: self.messages_received.load(Ordering::Relaxed),
279            last_error: state.last_error.get(symbol).cloned(),
280        }
281    }
282
283    fn synthetic_surface_point_count(&self) -> usize {
284        let strike_count = (((self.config.strike_max - self.config.strike_min)
285            / self.config.strike_step)
286            .floor() as usize)
287            + 1;
288        strike_count * synthetic_expiries(Utc::now().timestamp()).len()
289    }
290}
291
292impl RiskVolOracle for RealizedVolOracle {
293    fn get_iv(
294        &self,
295        underlying: &str,
296        _strike: f64,
297        _expiry_ts: i64,
298    ) -> Result<f64, VolLookupError> {
299        let state = self
300            .state
301            .read()
302            .expect("realized vol oracle state poisoned");
303        let value = match state.values.get(underlying) {
304            Some(value) => value,
305            None => {
306                return Err(VolLookupError::UnhealthyProvider {
307                    underlying: underlying.to_string(),
308                    provider: VolProviderKind::RealizedVol,
309                    reason: state
310                        .last_error
311                        .get(underlying)
312                        .cloned()
313                        .unwrap_or_else(|| "not connected".to_string()),
314                });
315            }
316        };
317
318        let staleness_seconds =
319            ((Utc::now().timestamp_millis() - value.last_update_ts_ms) as f64 / 1000.0).max(0.0);
320        if staleness_seconds > self.config.staleness_threshold.as_secs_f64() {
321            return Err(VolLookupError::StaleSurface {
322                underlying: underlying.to_string(),
323                provider: VolProviderKind::RealizedVol,
324                staleness_seconds,
325                threshold_seconds: self.config.staleness_threshold.as_secs_f64(),
326            });
327        }
328
329        Ok(value.iv)
330    }
331
332    fn statuses(&self) -> Vec<VolOracleStatus> {
333        self.config
334            .symbols
335            .iter()
336            .map(|symbol| self.status_for(symbol))
337            .collect()
338    }
339
340    fn get_surface_snapshot(&self, underlying: &str) -> Option<VolSurfaceSnapshot> {
341        let state = self
342            .state
343            .read()
344            .expect("realized vol oracle state poisoned");
345        let value = state.values.get(underlying)?;
346        let now_ts = Utc::now().timestamp();
347        let expiries = synthetic_expiries(now_ts);
348        let mut strike_points = Vec::new();
349        let mut strike = self.config.strike_min;
350        while strike <= self.config.strike_max + f64::EPSILON {
351            for &expiry in &expiries {
352                strike_points.push(VolPoint {
353                    strike,
354                    expiry,
355                    iv: value.iv,
356                    timestamp: value.last_update_ts_ms,
357                });
358            }
359            strike += self.config.strike_step;
360        }
361
362        Some(VolSurfaceSnapshot {
363            underlying: underlying.to_string(),
364            last_update_ts_ms: Some(value.last_update_ts_ms),
365            expiries,
366            strike_points,
367            delta_curves: Vec::new(),
368            atm_vols: Vec::new(),
369            spot_price: Some(value.spot),
370        })
371    }
372
373    fn supports_surface_snapshots(&self) -> bool {
374        true
375    }
376}
377
/// Returns the synthetic expiry timestamps (Unix seconds) relative to `now_ts`.
///
/// The ladder is, in order: `now_ts` itself, every hour from 1 to 48 hours
/// out, every day from 3 to 14 days out, and every week from 3 to 12 weeks
/// out. That is 71 strictly increasing timestamps.
fn synthetic_expiries(now_ts: i64) -> Vec<i64> {
    const HOUR_SECS: i64 = 60 * 60;
    const DAY_SECS: i64 = 24 * HOUR_SECS;
    const WEEK_SECS: i64 = 7 * DAY_SECS;

    let hourly = (1..=48).map(|hours| hours * HOUR_SECS);
    let daily = (3..=14).map(|days| days * DAY_SECS);
    let weekly = (3..=12).map(|weeks| weeks * WEEK_SECS);

    std::iter::once(0)
        .chain(hourly)
        .chain(daily)
        .chain(weekly)
        .map(|offset| now_ts + offset)
        .collect()
}
394
395fn realized_vol_from_closes(
396    closes: impl IntoIterator<Item = f64>,
397    annualization_days: f64,
398    multiplier: f64,
399    floor_iv: f64,
400    cap_iv: f64,
401) -> Result<(f64, usize)> {
402    let closes = closes.into_iter().collect::<Vec<_>>();
403    if closes.len() < 2 {
404        anyhow::bail!("Need at least 2 closes to compute realized vol");
405    }
406
407    let mut returns = Vec::with_capacity(closes.len() - 1);
408    for pair in closes.windows(2) {
409        let prev = pair[0];
410        let next = pair[1];
411        if !prev.is_finite() || !next.is_finite() || prev <= 0.0 || next <= 0.0 {
412            anyhow::bail!("Realized vol closes must be finite and positive");
413        }
414        returns.push((next / prev).ln());
415    }
416    if returns.len() < 2 {
417        anyhow::bail!("Need at least 2 returns to compute realized vol variance");
418    }
419
420    let mean = returns.iter().sum::<f64>() / returns.len() as f64;
421    let variance = returns
422        .iter()
423        .map(|r| {
424            let diff = r - mean;
425            diff * diff
426        })
427        .sum::<f64>()
428        / (returns.len() - 1) as f64;
429    let annualized = variance.sqrt() * (annualization_days * 24.0).sqrt();
430    let adjusted = annualized * multiplier;
431    Ok((adjusted.clamp(floor_iv, cap_iv), returns.len()))
432}
433
#[cfg(test)]
mod tests {
    use super::*;

    /// Six closes give five log returns; unclamped hourly vol annualized over
    /// 365 days comes out at roughly 3.0 for this series.
    const CLOSES: [f64; 6] = [100.0, 101.0, 99.0, 103.0, 100.0, 104.0];

    #[test]
    fn realized_vol_applies_multiplier_and_bounds() {
        let (iv, samples) = realized_vol_from_closes(CLOSES, 365.0, 1.25, 0.10, 5.0).expect("rv");

        assert_eq!(samples, 5);
        assert!(iv > 0.10);
        assert!(iv < 5.0);

        // With bounds wide enough not to bind, the multiplier must scale the
        // result linearly.
        let (base, _) = realized_vol_from_closes(CLOSES, 365.0, 1.0, 0.0, 100.0).expect("rv");
        let (scaled, _) = realized_vol_from_closes(CLOSES, 365.0, 1.25, 0.0, 100.0).expect("rv");
        assert!(base > 0.0);
        assert!((scaled - base * 1.25).abs() < 1e-12);
        assert!((iv - scaled).abs() < 1e-12);
    }

    #[test]
    fn realized_vol_clamps_to_floor_and_cap() {
        // Unclamped value is about 3.0, so a cap of 2.0 and a floor of 4.0
        // must each bind.
        let (capped, _) = realized_vol_from_closes(CLOSES, 365.0, 1.0, 0.10, 2.0).expect("rv");
        assert_eq!(capped, 2.0);

        let (floored, _) = realized_vol_from_closes(CLOSES, 365.0, 1.0, 4.0, 5.0).expect("rv");
        assert_eq!(floored, 4.0);
    }

    #[test]
    fn realized_vol_fails_without_enough_returns() {
        let err = realized_vol_from_closes([100.0, 101.0], 365.0, 1.0, 0.10, 5.0)
            .expect_err("single return cannot estimate sample variance");

        assert!(err.to_string().contains("Need at least 2 returns"));
    }

    #[test]
    fn realized_vol_rejects_non_positive_closes() {
        let err = realized_vol_from_closes([100.0, 0.0, 101.0], 365.0, 1.0, 0.10, 5.0)
            .expect_err("zero close has no log return");

        assert!(err.to_string().contains("finite and positive"));
    }
}
455}