Skip to main content

hypercall_vol_oracle/
derive_oracle.rs

1//! Derive (Lyra) Volatility Oracle implementation.
2//!
3//! TODO(clients): Move Derive HTTP transport and wire models into a dedicated client crate.
4//!
5//! Polls Derive's public API for option tickers and populates a volatility
6//! surface with exchange-computed IV values. No API key required.
7//!
8//! API pattern:
9//! 1. GET /public/get_instruments?currency=X&instrument_type=option&expired=false
10//!    -> list of instruments with expiry timestamps
11//! 2. POST /public/get_tickers {currency, instrument_type, expiry_date}
12//!    -> bulk ticker data per expiry with IV in option_pricing.i
13
14use std::collections::{BTreeSet, HashMap};
15use std::sync::atomic::{AtomicU64, Ordering};
16use std::sync::{Arc, RwLock};
17use std::time::Duration;
18
19use anyhow::{Context, Result};
20use chrono::Utc;
21use metrics::counter;
22use reqwest::Client;
23use serde::Deserialize;
24use tokio::task::JoinHandle;
25use tracing::{debug, error, info, warn};
26
27use super::risk_oracle::{
28    RiskVolOracle, VolLookupError, VolOracleStatus, VolProviderKind, VolSurfaceSnapshot,
29};
30use super::vol_surface_cache::VolatilitySurface;
31
32pub const DEFAULT_DERIVE_BASE_URL: &str = "https://api.lyra.finance";
33
34#[derive(Debug, Clone)]
35pub struct DeriveVolOracleConfig {
36    pub base_url: String,
37    pub poll_interval: Duration,
38    pub staleness_threshold: Duration,
39    /// Underlyings to poll (e.g., ["HYPE"]).
40    pub symbols: Vec<String>,
41}
42
43#[derive(Debug, Default)]
44struct DeriveSurfaceState {
45    surfaces: HashMap<String, VolatilitySurface>,
46    connected: HashMap<String, bool>,
47    last_update_ts_ms: HashMap<String, i64>,
48    last_error: HashMap<String, String>,
49}
50
51// --- API response types ---
52
53#[derive(Debug, Deserialize)]
54struct DeriveInstrumentsResponse {
55    result: Vec<DeriveInstrument>,
56}
57
58#[derive(Debug, Deserialize)]
59#[allow(dead_code)]
60struct DeriveInstrument {
61    instrument_name: String,
62    option_details: Option<DeriveOptionDetails>,
63}
64
65#[derive(Debug, Deserialize)]
66#[allow(dead_code)]
67struct DeriveOptionDetails {
68    expiry: i64,
69    strike: String,
70}
71
72/// Bulk tickers response. Keyed by instrument name with abbreviated fields.
73#[derive(Debug, Deserialize)]
74struct DeriveTickersResponse {
75    result: DeriveTickersResult,
76}
77
78#[derive(Debug, Deserialize)]
79struct DeriveTickersResult {
80    tickers: HashMap<String, DeriveTicker>,
81}
82
83#[derive(Debug, Deserialize)]
84struct DeriveTicker {
85    /// Index (spot) price
86    #[serde(rename = "I")]
87    index_price: Option<String>,
88    /// Option pricing with abbreviated keys
89    option_pricing: Option<DeriveOptionPricing>,
90}
91
92#[derive(Debug, Deserialize)]
93#[allow(dead_code)]
94struct DeriveOptionPricing {
95    /// Implied volatility as decimal string (e.g., "0.812" = 81.2%)
96    #[serde(rename = "i")]
97    iv: Option<String>,
98    /// Mark price
99    #[serde(rename = "m")]
100    mark_price: Option<String>,
101}
102
103pub struct DeriveVolOracle {
104    client: Client,
105    config: DeriveVolOracleConfig,
106    state: Arc<RwLock<DeriveSurfaceState>>,
107    messages_received: AtomicU64,
108}
109
110impl DeriveVolOracle {
111    pub fn new(config: DeriveVolOracleConfig) -> Self {
112        Self {
113            client: Client::builder()
114                .timeout(Duration::from_secs(15))
115                .build()
116                .expect("failed to build HTTP client"),
117            config,
118            state: Arc::new(RwLock::new(DeriveSurfaceState::default())),
119            messages_received: AtomicU64::new(0),
120        }
121    }
122
123    pub fn start_polling(self: Arc<Self>) -> JoinHandle<()> {
124        tokio::spawn(async move {
125            let mut interval = tokio::time::interval(self.config.poll_interval);
126            loop {
127                interval.tick().await;
128                if let Err(err) = self.refresh_all().await {
129                    error!("Derive vol oracle refresh failed: {err:#}");
130                }
131            }
132        })
133    }
134
135    async fn refresh_all(&self) -> Result<()> {
136        for symbol in &self.config.symbols {
137            match self.fetch_surface(symbol).await {
138                Ok((surface, spot)) => {
139                    let now_ms = Utc::now().timestamp_millis();
140                    let point_count = surface.len();
141                    {
142                        let mut state = self
143                            .state
144                            .write()
145                            .expect("derive vol oracle state poisoned");
146                        // Only swap if new surface is at least 50% the size of the
147                        // existing one to avoid replacing a full surface with a
148                        // partial one when some per-expiry ticker fetches fail.
149                        let existing_points =
150                            state.surfaces.get(symbol).map(|s| s.len()).unwrap_or(0);
151                        let accepted = point_count > 0
152                            && (existing_points == 0 || point_count >= existing_points / 2);
153                        if accepted {
154                            state.surfaces.insert(symbol.clone(), surface);
155                            state.connected.insert(symbol.clone(), true);
156                            state.last_update_ts_ms.insert(symbol.clone(), now_ms);
157                            state.last_error.remove(symbol);
158                        } else if existing_points > 0 {
159                            // Partial refresh rejected - don't update timestamp so
160                            // staleness gate will eventually fire if this persists.
161                            warn!(
162                                "Derive partial refresh for {} ({} points vs {} existing), keeping old surface",
163                                symbol, point_count, existing_points
164                            );
165                        }
166                    }
167                    self.messages_received.fetch_add(1, Ordering::Relaxed);
168                    counter!(
169                        "ht_vol_oracle_messages_received_total",
170                        "provider" => VolProviderKind::Derive.as_str(),
171                        "underlying" => symbol.clone()
172                    )
173                    .increment(1);
174                    info!(
175                        "Updated Derive vol surface for {} with {} points (spot ${:.2})",
176                        symbol, point_count, spot
177                    );
178                }
179                Err(err) => {
180                    let message = err.to_string();
181                    let mut state = self
182                        .state
183                        .write()
184                        .expect("derive vol oracle state poisoned");
185                    state.connected.insert(symbol.clone(), false);
186                    state.last_error.insert(symbol.clone(), message.clone());
187                    if state.surfaces.contains_key(symbol) {
188                        warn!(
189                            "Derive vol surface refresh failed for {} (keeping last good data): {}",
190                            symbol, message
191                        );
192                    } else {
193                        warn!(
194                            "Derive vol surface refresh failed for {}: {}",
195                            symbol, message
196                        );
197                    }
198                }
199            }
200        }
201
202        Ok(())
203    }
204
205    /// Fetch the full vol surface for a single underlying from Derive.
206    ///
207    /// 1. Fetch instruments to discover expiry dates
208    /// 2. For each expiry, POST get_tickers to get bulk IV data
209    /// 3. Parse instrument names for strike, read IV from option_pricing.i
210    async fn fetch_surface(&self, symbol: &str) -> Result<(VolatilitySurface, f64)> {
211        // Step 1: Fetch instruments to discover expiries
212        let instruments_url = format!(
213            "{}/public/get_instruments?instrument_type=option&currency={}&expired=false",
214            self.config.base_url, symbol
215        );
216        let instruments_resp = self
217            .client
218            .get(&instruments_url)
219            .send()
220            .await
221            .with_context(|| format!("Failed to fetch Derive instruments for {symbol}"))?
222            .error_for_status()
223            .with_context(|| format!("Derive instruments returned non-success for {symbol}"))?
224            .json::<DeriveInstrumentsResponse>()
225            .await
226            .with_context(|| format!("Failed to decode Derive instruments for {symbol}"))?;
227
228        // Collect unique expiry timestamps
229        let now_ts = Utc::now().timestamp();
230        let mut expiry_dates: BTreeSet<(i64, String)> = BTreeSet::new();
231        for inst in &instruments_resp.result {
232            if let Some(ref details) = inst.option_details {
233                if details.expiry > now_ts {
234                    if let Some(dt) = chrono::DateTime::from_timestamp(details.expiry, 0) {
235                        let date_str = dt.format("%Y%m%d").to_string();
236                        expiry_dates.insert((details.expiry, date_str));
237                    } else {
238                        warn!(
239                            "Skipping invalid Derive expiry timestamp: {}",
240                            details.expiry
241                        );
242                    }
243                }
244            }
245        }
246
247        if expiry_dates.is_empty() {
248            anyhow::bail!("Derive returned no active expiries for {symbol}");
249        }
250
251        // Step 2: For each expiry, fetch tickers
252        // Use $0.5 strike precision since Derive lists half-dollar strikes
253        // (e.g., HYPE at $34.5, $35.0). The default $100 precision would
254        // quantize all HYPE strikes into ~2 buckets.
255        let mut surface = VolatilitySurface::with_precision(0.5);
256        let mut spot_price = 0.0_f64;
257        let mut total_inserted = 0_u32;
258
259        let tickers_url = format!("{}/public/get_tickers", self.config.base_url);
260
261        for (expiry_ts, expiry_date) in &expiry_dates {
262            let body = serde_json::json!({
263                "currency": symbol,
264                "instrument_type": "option",
265                "expiry_date": expiry_date,
266            });
267
268            let resp = match self.client.post(&tickers_url).json(&body).send().await {
269                Ok(r) => match r.error_for_status() {
270                    Ok(r) => r,
271                    Err(e) => {
272                        warn!(
273                            "Derive get_tickers HTTP error for {} expiry {}: {}",
274                            symbol, expiry_date, e
275                        );
276                        continue;
277                    }
278                },
279                Err(e) => {
280                    warn!(
281                        "Derive get_tickers failed for {} expiry {}: {}",
282                        symbol, expiry_date, e
283                    );
284                    continue;
285                }
286            };
287
288            let tickers: DeriveTickersResponse = match resp.json().await {
289                Ok(t) => t,
290                Err(e) => {
291                    warn!(
292                        "Derive get_tickers decode failed for {} expiry {}: {}",
293                        symbol, expiry_date, e
294                    );
295                    continue;
296                }
297            };
298
299            let mut expiry_count = 0_u32;
300            for (inst_name, ticker) in &tickers.result.tickers {
301                // Parse strike from instrument name: "HYPE-20260410-37-C" -> 37
302                // Also handles half-strikes like "HYPE-20260410-34_5-C" -> 34.5
303                let strike = match parse_strike_from_instrument(inst_name) {
304                    Some(s) => s,
305                    None => continue,
306                };
307
308                // Get IV
309                let iv = match ticker
310                    .option_pricing
311                    .as_ref()
312                    .and_then(|op| op.iv.as_ref())
313                    .and_then(|s| s.parse::<f64>().ok())
314                {
315                    Some(v) if v > 0.01 && v < 5.0 => v,
316                    _ => continue,
317                };
318
319                // Get spot price
320                if let Some(ref idx) = ticker.index_price {
321                    if let Ok(p) = idx.parse::<f64>() {
322                        if p > 0.0 {
323                            spot_price = p;
324                        }
325                    }
326                }
327
328                surface.insert(strike, *expiry_ts, iv);
329                expiry_count += 1;
330
331                // Set ATM vol if close to spot
332                if spot_price > 0.0 {
333                    let moneyness = (strike / spot_price - 1.0).abs();
334                    if moneyness < 0.03 {
335                        surface.set_atm_vol(*expiry_ts, iv);
336                    }
337                }
338            }
339
340            total_inserted += expiry_count;
341            debug!(
342                "Derive {} expiry {}: {} IV points",
343                symbol, expiry_date, expiry_count
344            );
345        }
346
347        if total_inserted == 0 {
348            anyhow::bail!("Derive returned no usable IV points for {symbol}");
349        }
350
351        // Enforce static no-arbitrage on the ingested surface. Derive quotes
352        // HYPE 0DTE/weeklies from a thin order book where sparse strike
353        // coverage routinely leaves monotonicity/butterfly violations in the
354        // raw mark IVs.
355        if spot_price > 0.0 {
356            let clamps = surface.sanitize_arb_free(spot_price, 0.0);
357            if clamps > 0 {
358                counter!(
359                    "ht_vol_surface_arb_clamps_total",
360                    "provider" => VolProviderKind::Derive.as_str(),
361                    "underlying" => symbol.to_string()
362                )
363                .increment(clamps as u64);
364                info!(
365                    underlying = symbol,
366                    clamps, "Clamped non-arb-free vol surface points (derive)"
367                );
368            }
369        }
370
371        Ok((surface, spot_price))
372    }
373
374    fn status_for(&self, symbol: &str) -> VolOracleStatus {
375        let state = self.state.read().expect("derive vol oracle state poisoned");
376        let last_update_ts_ms = state.last_update_ts_ms.get(symbol).copied();
377        let staleness_seconds = last_update_ts_ms
378            .map(|ts| ((Utc::now().timestamp_millis() - ts) as f64 / 1000.0).max(0.0));
379        let ready = staleness_seconds
380            .map(|age| age <= self.config.staleness_threshold.as_secs_f64())
381            .unwrap_or(false);
382        let surface_points = state
383            .surfaces
384            .get(symbol)
385            .map(VolatilitySurface::len)
386            .unwrap_or(0);
387
388        VolOracleStatus {
389            underlying: symbol.to_string(),
390            provider: VolProviderKind::Derive,
391            route_facing: true,
392            connected: state.connected.get(symbol).copied().unwrap_or(false),
393            ready,
394            last_update_ts_ms,
395            staleness_seconds,
396            staleness_threshold_seconds: Some(self.config.staleness_threshold.as_secs_f64()),
397            surface_points,
398            messages_received: self.messages_received.load(Ordering::Relaxed),
399            last_error: state.last_error.get(symbol).cloned(),
400        }
401    }
402}
403
404impl RiskVolOracle for DeriveVolOracle {
405    fn get_iv(&self, underlying: &str, strike: f64, expiry_ts: i64) -> Result<f64, VolLookupError> {
406        let state = self.state.read().expect("derive vol oracle state poisoned");
407        let has_surface = state.surfaces.contains_key(underlying);
408
409        // If disconnected and no cached surface, fail immediately
410        let connected = state.connected.get(underlying).copied().unwrap_or(false);
411        if !connected && !has_surface {
412            return Err(VolLookupError::UnhealthyProvider {
413                underlying: underlying.to_string(),
414                provider: VolProviderKind::Derive,
415                reason: state
416                    .last_error
417                    .get(underlying)
418                    .cloned()
419                    .unwrap_or_else(|| "not connected".to_string()),
420            });
421        }
422
423        // Check staleness (applies whether connected or not)
424        let last_update_ts_ms = state.last_update_ts_ms.get(underlying).copied();
425        let staleness_seconds = last_update_ts_ms
426            .map(|ts| ((Utc::now().timestamp_millis() - ts) as f64 / 1000.0).max(0.0));
427        let ready = staleness_seconds
428            .map(|age| age <= self.config.staleness_threshold.as_secs_f64())
429            .unwrap_or(false);
430
431        if !ready {
432            return Err(VolLookupError::StaleSurface {
433                underlying: underlying.to_string(),
434                provider: VolProviderKind::Derive,
435                staleness_seconds: staleness_seconds.unwrap_or(f64::INFINITY),
436                threshold_seconds: self.config.staleness_threshold.as_secs_f64(),
437            });
438        }
439
440        let iv = state
441            .surfaces
442            .get(underlying)
443            .and_then(|surface| surface.get_interpolated(strike, expiry_ts))
444            .ok_or_else(|| VolLookupError::MissingSurface {
445                underlying: underlying.to_string(),
446                provider: VolProviderKind::Derive,
447                strike,
448                expiry_ts,
449            })?;
450
451        debug!(
452            underlying,
453            strike,
454            expiry_ts,
455            iv,
456            provider = VolProviderKind::Derive.as_str(),
457            "Backend vol oracle value used"
458        );
459
460        Ok(iv)
461    }
462
463    fn statuses(&self) -> Vec<VolOracleStatus> {
464        self.config
465            .symbols
466            .iter()
467            .map(|symbol| self.status_for(symbol))
468            .collect()
469    }
470
471    fn get_surface_snapshot(&self, underlying: &str) -> Option<VolSurfaceSnapshot> {
472        let state = self.state.read().expect("derive vol oracle state poisoned");
473        let surface = state.surfaces.get(underlying)?;
474        Some(VolSurfaceSnapshot {
475            underlying: underlying.to_string(),
476            last_update_ts_ms: state.last_update_ts_ms.get(underlying).copied(),
477            expiries: surface.expiries().iter().copied().collect(),
478            strike_points: surface.export_all_points(),
479            delta_curves: surface.export_delta_curves(),
480            atm_vols: surface.export_atm_vols(),
481            spot_price: None,
482        })
483    }
484
485    fn supports_surface_snapshots(&self) -> bool {
486        true
487    }
488}
489
490/// Parse strike price from a Derive instrument name.
491///
492/// Formats:
493/// - "HYPE-20260410-37-C" -> 37.0
494/// - "HYPE-20260410-34_5-C" -> 34.5  (underscore = decimal point)
495fn parse_strike_from_instrument(name: &str) -> Option<f64> {
496    let parts: Vec<&str> = name.split('-').collect();
497    if parts.len() < 4 {
498        return None;
499    }
500    // Third part is the strike, with underscore for decimal
501    let strike_str = parts[2].replace('_', ".");
502    strike_str.parse::<f64>().ok()
503}
504
505#[cfg(test)]
506mod tests {
507    use super::*;
508
509    #[test]
510    fn test_parse_strike() {
511        assert_eq!(
512            parse_strike_from_instrument("HYPE-20260410-37-C"),
513            Some(37.0)
514        );
515        assert_eq!(
516            parse_strike_from_instrument("HYPE-20260410-34_5-P"),
517            Some(34.5)
518        );
519        assert_eq!(
520            parse_strike_from_instrument("HYPE-20260410-100-C"),
521            Some(100.0)
522        );
523        assert_eq!(parse_strike_from_instrument("bad"), None);
524    }
525
526    #[test]
527    fn test_oracle_not_ready_before_poll() {
528        let config = DeriveVolOracleConfig {
529            base_url: DEFAULT_DERIVE_BASE_URL.to_string(),
530            poll_interval: Duration::from_secs(30),
531            staleness_threshold: Duration::from_secs(120),
532            symbols: vec!["HYPE".to_string()],
533        };
534        let oracle = DeriveVolOracle::new(config);
535
536        let result = oracle.get_iv("HYPE", 35.0, 1800000000);
537        assert!(matches!(
538            result,
539            Err(VolLookupError::UnhealthyProvider { .. })
540        ));
541    }
542}