1use std::collections::{BTreeSet, HashMap};
15use std::sync::atomic::{AtomicU64, Ordering};
16use std::sync::{Arc, RwLock};
17use std::time::Duration;
18
19use anyhow::{Context, Result};
20use chrono::Utc;
21use metrics::counter;
22use reqwest::Client;
23use serde::Deserialize;
24use tokio::task::JoinHandle;
25use tracing::{debug, error, info, warn};
26
27use super::risk_oracle::{
28 RiskVolOracle, VolLookupError, VolOracleStatus, VolProviderKind, VolSurfaceSnapshot,
29};
30use super::vol_surface_cache::VolatilitySurface;
31
/// Default base URL for the Derive HTTP API.
///
/// Endpoint paths such as `/public/get_instruments` and `/public/get_tickers`
/// are appended to whichever base URL ends up in
/// `DeriveVolOracleConfig::base_url`.
pub const DEFAULT_DERIVE_BASE_URL: &str = "https://api.lyra.finance";
33
/// Settings that control how the Derive volatility oracle polls and how long
/// its data is considered usable.
#[derive(Debug, Clone)]
pub struct DeriveVolOracleConfig {
    /// Scheme and host that the `/public/...` endpoint paths are appended to.
    pub base_url: String,
    /// Period of the background refresh loop.
    pub poll_interval: Duration,
    /// Maximum age of the last accepted surface before lookups report it as
    /// stale.
    pub staleness_threshold: Duration,
    /// Underlyings to poll; each one is sent as the `currency` of the API
    /// requests and used as the key of the cached surface.
    pub symbols: Vec<String>,
}
42
43#[derive(Debug, Default)]
44struct DeriveSurfaceState {
45 surfaces: HashMap<String, VolatilitySurface>,
46 connected: HashMap<String, bool>,
47 last_update_ts_ms: HashMap<String, i64>,
48 last_error: HashMap<String, String>,
49}
50
51#[derive(Debug, Deserialize)]
54struct DeriveInstrumentsResponse {
55 result: Vec<DeriveInstrument>,
56}
57
58#[derive(Debug, Deserialize)]
59#[allow(dead_code)]
60struct DeriveInstrument {
61 instrument_name: String,
62 option_details: Option<DeriveOptionDetails>,
63}
64
65#[derive(Debug, Deserialize)]
66#[allow(dead_code)]
67struct DeriveOptionDetails {
68 expiry: i64,
69 strike: String,
70}
71
72#[derive(Debug, Deserialize)]
74struct DeriveTickersResponse {
75 result: DeriveTickersResult,
76}
77
78#[derive(Debug, Deserialize)]
79struct DeriveTickersResult {
80 tickers: HashMap<String, DeriveTicker>,
81}
82
83#[derive(Debug, Deserialize)]
84struct DeriveTicker {
85 #[serde(rename = "I")]
87 index_price: Option<String>,
88 option_pricing: Option<DeriveOptionPricing>,
90}
91
92#[derive(Debug, Deserialize)]
93#[allow(dead_code)]
94struct DeriveOptionPricing {
95 #[serde(rename = "i")]
97 iv: Option<String>,
98 #[serde(rename = "m")]
100 mark_price: Option<String>,
101}
102
103pub struct DeriveVolOracle {
104 client: Client,
105 config: DeriveVolOracleConfig,
106 state: Arc<RwLock<DeriveSurfaceState>>,
107 messages_received: AtomicU64,
108}
109
110impl DeriveVolOracle {
111 pub fn new(config: DeriveVolOracleConfig) -> Self {
112 Self {
113 client: Client::builder()
114 .timeout(Duration::from_secs(15))
115 .build()
116 .expect("failed to build HTTP client"),
117 config,
118 state: Arc::new(RwLock::new(DeriveSurfaceState::default())),
119 messages_received: AtomicU64::new(0),
120 }
121 }
122
123 pub fn start_polling(self: Arc<Self>) -> JoinHandle<()> {
124 tokio::spawn(async move {
125 let mut interval = tokio::time::interval(self.config.poll_interval);
126 loop {
127 interval.tick().await;
128 if let Err(err) = self.refresh_all().await {
129 error!("Derive vol oracle refresh failed: {err:#}");
130 }
131 }
132 })
133 }
134
135 async fn refresh_all(&self) -> Result<()> {
136 for symbol in &self.config.symbols {
137 match self.fetch_surface(symbol).await {
138 Ok((surface, spot)) => {
139 let now_ms = Utc::now().timestamp_millis();
140 let point_count = surface.len();
141 {
142 let mut state = self
143 .state
144 .write()
145 .expect("derive vol oracle state poisoned");
146 let existing_points =
150 state.surfaces.get(symbol).map(|s| s.len()).unwrap_or(0);
151 let accepted = point_count > 0
152 && (existing_points == 0 || point_count >= existing_points / 2);
153 if accepted {
154 state.surfaces.insert(symbol.clone(), surface);
155 state.connected.insert(symbol.clone(), true);
156 state.last_update_ts_ms.insert(symbol.clone(), now_ms);
157 state.last_error.remove(symbol);
158 } else if existing_points > 0 {
159 warn!(
162 "Derive partial refresh for {} ({} points vs {} existing), keeping old surface",
163 symbol, point_count, existing_points
164 );
165 }
166 }
167 self.messages_received.fetch_add(1, Ordering::Relaxed);
168 counter!(
169 "ht_vol_oracle_messages_received_total",
170 "provider" => VolProviderKind::Derive.as_str(),
171 "underlying" => symbol.clone()
172 )
173 .increment(1);
174 info!(
175 "Updated Derive vol surface for {} with {} points (spot ${:.2})",
176 symbol, point_count, spot
177 );
178 }
179 Err(err) => {
180 let message = err.to_string();
181 let mut state = self
182 .state
183 .write()
184 .expect("derive vol oracle state poisoned");
185 state.connected.insert(symbol.clone(), false);
186 state.last_error.insert(symbol.clone(), message.clone());
187 if state.surfaces.contains_key(symbol) {
188 warn!(
189 "Derive vol surface refresh failed for {} (keeping last good data): {}",
190 symbol, message
191 );
192 } else {
193 warn!(
194 "Derive vol surface refresh failed for {}: {}",
195 symbol, message
196 );
197 }
198 }
199 }
200 }
201
202 Ok(())
203 }
204
205 async fn fetch_surface(&self, symbol: &str) -> Result<(VolatilitySurface, f64)> {
211 let instruments_url = format!(
213 "{}/public/get_instruments?instrument_type=option¤cy={}&expired=false",
214 self.config.base_url, symbol
215 );
216 let instruments_resp = self
217 .client
218 .get(&instruments_url)
219 .send()
220 .await
221 .with_context(|| format!("Failed to fetch Derive instruments for {symbol}"))?
222 .error_for_status()
223 .with_context(|| format!("Derive instruments returned non-success for {symbol}"))?
224 .json::<DeriveInstrumentsResponse>()
225 .await
226 .with_context(|| format!("Failed to decode Derive instruments for {symbol}"))?;
227
228 let now_ts = Utc::now().timestamp();
230 let mut expiry_dates: BTreeSet<(i64, String)> = BTreeSet::new();
231 for inst in &instruments_resp.result {
232 if let Some(ref details) = inst.option_details {
233 if details.expiry > now_ts {
234 if let Some(dt) = chrono::DateTime::from_timestamp(details.expiry, 0) {
235 let date_str = dt.format("%Y%m%d").to_string();
236 expiry_dates.insert((details.expiry, date_str));
237 } else {
238 warn!(
239 "Skipping invalid Derive expiry timestamp: {}",
240 details.expiry
241 );
242 }
243 }
244 }
245 }
246
247 if expiry_dates.is_empty() {
248 anyhow::bail!("Derive returned no active expiries for {symbol}");
249 }
250
251 let mut surface = VolatilitySurface::with_precision(0.5);
256 let mut spot_price = 0.0_f64;
257 let mut total_inserted = 0_u32;
258
259 let tickers_url = format!("{}/public/get_tickers", self.config.base_url);
260
261 for (expiry_ts, expiry_date) in &expiry_dates {
262 let body = serde_json::json!({
263 "currency": symbol,
264 "instrument_type": "option",
265 "expiry_date": expiry_date,
266 });
267
268 let resp = match self.client.post(&tickers_url).json(&body).send().await {
269 Ok(r) => match r.error_for_status() {
270 Ok(r) => r,
271 Err(e) => {
272 warn!(
273 "Derive get_tickers HTTP error for {} expiry {}: {}",
274 symbol, expiry_date, e
275 );
276 continue;
277 }
278 },
279 Err(e) => {
280 warn!(
281 "Derive get_tickers failed for {} expiry {}: {}",
282 symbol, expiry_date, e
283 );
284 continue;
285 }
286 };
287
288 let tickers: DeriveTickersResponse = match resp.json().await {
289 Ok(t) => t,
290 Err(e) => {
291 warn!(
292 "Derive get_tickers decode failed for {} expiry {}: {}",
293 symbol, expiry_date, e
294 );
295 continue;
296 }
297 };
298
299 let mut expiry_count = 0_u32;
300 for (inst_name, ticker) in &tickers.result.tickers {
301 let strike = match parse_strike_from_instrument(inst_name) {
304 Some(s) => s,
305 None => continue,
306 };
307
308 let iv = match ticker
310 .option_pricing
311 .as_ref()
312 .and_then(|op| op.iv.as_ref())
313 .and_then(|s| s.parse::<f64>().ok())
314 {
315 Some(v) if v > 0.01 && v < 5.0 => v,
316 _ => continue,
317 };
318
319 if let Some(ref idx) = ticker.index_price {
321 if let Ok(p) = idx.parse::<f64>() {
322 if p > 0.0 {
323 spot_price = p;
324 }
325 }
326 }
327
328 surface.insert(strike, *expiry_ts, iv);
329 expiry_count += 1;
330
331 if spot_price > 0.0 {
333 let moneyness = (strike / spot_price - 1.0).abs();
334 if moneyness < 0.03 {
335 surface.set_atm_vol(*expiry_ts, iv);
336 }
337 }
338 }
339
340 total_inserted += expiry_count;
341 debug!(
342 "Derive {} expiry {}: {} IV points",
343 symbol, expiry_date, expiry_count
344 );
345 }
346
347 if total_inserted == 0 {
348 anyhow::bail!("Derive returned no usable IV points for {symbol}");
349 }
350
351 if spot_price > 0.0 {
356 let clamps = surface.sanitize_arb_free(spot_price, 0.0);
357 if clamps > 0 {
358 counter!(
359 "ht_vol_surface_arb_clamps_total",
360 "provider" => VolProviderKind::Derive.as_str(),
361 "underlying" => symbol.to_string()
362 )
363 .increment(clamps as u64);
364 info!(
365 underlying = symbol,
366 clamps, "Clamped non-arb-free vol surface points (derive)"
367 );
368 }
369 }
370
371 Ok((surface, spot_price))
372 }
373
374 fn status_for(&self, symbol: &str) -> VolOracleStatus {
375 let state = self.state.read().expect("derive vol oracle state poisoned");
376 let last_update_ts_ms = state.last_update_ts_ms.get(symbol).copied();
377 let staleness_seconds = last_update_ts_ms
378 .map(|ts| ((Utc::now().timestamp_millis() - ts) as f64 / 1000.0).max(0.0));
379 let ready = staleness_seconds
380 .map(|age| age <= self.config.staleness_threshold.as_secs_f64())
381 .unwrap_or(false);
382 let surface_points = state
383 .surfaces
384 .get(symbol)
385 .map(VolatilitySurface::len)
386 .unwrap_or(0);
387
388 VolOracleStatus {
389 underlying: symbol.to_string(),
390 provider: VolProviderKind::Derive,
391 route_facing: true,
392 connected: state.connected.get(symbol).copied().unwrap_or(false),
393 ready,
394 last_update_ts_ms,
395 staleness_seconds,
396 staleness_threshold_seconds: Some(self.config.staleness_threshold.as_secs_f64()),
397 surface_points,
398 messages_received: self.messages_received.load(Ordering::Relaxed),
399 last_error: state.last_error.get(symbol).cloned(),
400 }
401 }
402}
403
404impl RiskVolOracle for DeriveVolOracle {
405 fn get_iv(&self, underlying: &str, strike: f64, expiry_ts: i64) -> Result<f64, VolLookupError> {
406 let state = self.state.read().expect("derive vol oracle state poisoned");
407 let has_surface = state.surfaces.contains_key(underlying);
408
409 let connected = state.connected.get(underlying).copied().unwrap_or(false);
411 if !connected && !has_surface {
412 return Err(VolLookupError::UnhealthyProvider {
413 underlying: underlying.to_string(),
414 provider: VolProviderKind::Derive,
415 reason: state
416 .last_error
417 .get(underlying)
418 .cloned()
419 .unwrap_or_else(|| "not connected".to_string()),
420 });
421 }
422
423 let last_update_ts_ms = state.last_update_ts_ms.get(underlying).copied();
425 let staleness_seconds = last_update_ts_ms
426 .map(|ts| ((Utc::now().timestamp_millis() - ts) as f64 / 1000.0).max(0.0));
427 let ready = staleness_seconds
428 .map(|age| age <= self.config.staleness_threshold.as_secs_f64())
429 .unwrap_or(false);
430
431 if !ready {
432 return Err(VolLookupError::StaleSurface {
433 underlying: underlying.to_string(),
434 provider: VolProviderKind::Derive,
435 staleness_seconds: staleness_seconds.unwrap_or(f64::INFINITY),
436 threshold_seconds: self.config.staleness_threshold.as_secs_f64(),
437 });
438 }
439
440 let iv = state
441 .surfaces
442 .get(underlying)
443 .and_then(|surface| surface.get_interpolated(strike, expiry_ts))
444 .ok_or_else(|| VolLookupError::MissingSurface {
445 underlying: underlying.to_string(),
446 provider: VolProviderKind::Derive,
447 strike,
448 expiry_ts,
449 })?;
450
451 debug!(
452 underlying,
453 strike,
454 expiry_ts,
455 iv,
456 provider = VolProviderKind::Derive.as_str(),
457 "Backend vol oracle value used"
458 );
459
460 Ok(iv)
461 }
462
463 fn statuses(&self) -> Vec<VolOracleStatus> {
464 self.config
465 .symbols
466 .iter()
467 .map(|symbol| self.status_for(symbol))
468 .collect()
469 }
470
471 fn get_surface_snapshot(&self, underlying: &str) -> Option<VolSurfaceSnapshot> {
472 let state = self.state.read().expect("derive vol oracle state poisoned");
473 let surface = state.surfaces.get(underlying)?;
474 Some(VolSurfaceSnapshot {
475 underlying: underlying.to_string(),
476 last_update_ts_ms: state.last_update_ts_ms.get(underlying).copied(),
477 expiries: surface.expiries().iter().copied().collect(),
478 strike_points: surface.export_all_points(),
479 delta_curves: surface.export_delta_curves(),
480 atm_vols: surface.export_atm_vols(),
481 spot_price: None,
482 })
483 }
484
485 fn supports_surface_snapshots(&self) -> bool {
486 true
487 }
488}
489
/// Extracts the strike from an instrument name.
///
/// The name is split on `-` and must have at least four segments; the third
/// one is the strike, with `_` standing in for the decimal point
/// (`"HYPE-20260410-34_5-P"` yields `34.5`).
///
/// Returns `None` when the name has fewer than four segments, when the strike
/// segment is not a number, or when it parses to a non-finite or non-positive
/// value (`"nan"`, `"inf"`, `"0"`), so such values never reach the surface.
fn parse_strike_from_instrument(name: &str) -> Option<f64> {
    let mut segments = name.split('-');
    let strike_segment = segments.nth(2)?;
    // A fourth segment must follow the strike, otherwise the name is
    // malformed.
    segments.next()?;

    let strike = strike_segment.replace('_', ".").parse::<f64>().ok()?;
    if strike.is_finite() && strike > 0.0 {
        Some(strike)
    } else {
        None
    }
}
504
#[cfg(test)]
mod tests {
    use super::*;

    /// Configuration for a single-symbol oracle that is never polled.
    fn hype_config() -> DeriveVolOracleConfig {
        DeriveVolOracleConfig {
            base_url: DEFAULT_DERIVE_BASE_URL.to_string(),
            poll_interval: Duration::from_secs(30),
            staleness_threshold: Duration::from_secs(120),
            symbols: vec!["HYPE".to_string()],
        }
    }

    /// Strikes come from the third `-` segment, with `_` as decimal point.
    #[test]
    fn test_parse_strike() {
        let cases = [
            ("HYPE-20260410-37-C", Some(37.0)),
            ("HYPE-20260410-34_5-P", Some(34.5)),
            ("HYPE-20260410-100-C", Some(100.0)),
            ("bad", None),
        ];
        for &(instrument, expected) in &cases {
            assert_eq!(parse_strike_from_instrument(instrument), expected);
        }
    }

    /// Before any poll has run, lookups report the provider as unhealthy.
    #[test]
    fn test_oracle_not_ready_before_poll() {
        let oracle = DeriveVolOracle::new(hype_config());

        let result = oracle.get_iv("HYPE", 35.0, 1800000000);
        let is_unhealthy = matches!(result, Err(VolLookupError::UnhealthyProvider { .. }));
        assert!(is_unhealthy);
    }
}