1use std::collections::HashMap;
12use std::sync::atomic::{AtomicU64, Ordering};
13use std::sync::{Arc, RwLock};
14use std::time::Duration;
15
16use anyhow::{Context, Result};
17use chrono::Utc;
18use metrics::counter;
19use reqwest::Client;
20use serde::Deserialize;
21use tokio::task::JoinHandle;
22use tracing::{debug, error, info, warn};
23
24use super::risk_oracle::{
25 RiskVolOracle, VolLookupError, VolOracleStatus, VolProviderKind, VolSurfaceSnapshot,
26};
27use super::vol_surface_cache::VolatilitySurface;
28
/// Default value for [`DeribitVolOracleConfig::base_url`]: the root of Deribit's v2 HTTP API.
///
/// Endpoint paths such as `/public/get_instruments` are appended directly to this string,
/// so it must not end with a trailing slash.
pub const DEFAULT_DERIBIT_BASE_URL: &str = "https://www.deribit.com/api/v2";
30
/// Settings for a [`DeribitVolOracle`].
#[derive(Debug, Clone)]
pub struct DeribitVolOracleConfig {
    /// API root that endpoint paths (e.g. `/public/get_instruments`) are appended to.
    pub base_url: String,
    /// Period of the background refresh loop started by `DeribitVolOracle::start_polling`.
    pub poll_interval: Duration,
    /// Maximum age of a symbol's last successful refresh before it is reported as
    /// not ready / stale.
    pub staleness_threshold: Duration,
    /// Symbols to poll; each one is sent to Deribit as the `currency` query parameter.
    pub symbols: Vec<String>,
}
39
/// Mutable per-symbol cache behind the oracle's `RwLock`.
///
/// Every map is keyed by the symbol string taken from the config.
#[derive(Debug, Default)]
struct DeribitSurfaceState {
    /// Surface from the most recent successful refresh; left untouched when a later
    /// refresh fails.
    surfaces: HashMap<String, VolatilitySurface>,
    /// Whether the most recent refresh attempt for the symbol succeeded.
    connected: HashMap<String, bool>,
    /// Wall-clock time (Unix milliseconds) of the last successful refresh.
    last_update_ts_ms: HashMap<String, i64>,
    /// Message of the most recent failed refresh; removed on the next success.
    last_error: HashMap<String, String>,
}
47
/// Response body of `/public/get_book_summary_by_currency`; only the `result` array is decoded.
#[derive(Debug, Deserialize)]
struct DeribitResponse {
    /// One summary per instrument returned by the endpoint.
    result: Vec<BookSummary>,
}
53
/// The subset of a Deribit book-summary entry that the surface builder reads.
#[derive(Debug, Deserialize)]
struct BookSummary {
    /// Instrument identifier; joined against `DeribitInstrument::instrument_name` to
    /// recover strike and expiry.
    instrument_name: String,
    /// Mark implied volatility. `fetch_surface` divides it by 100, i.e. treats it as a
    /// percentage; entries without a value are skipped.
    mark_iv: Option<f64>,
    /// Underlying price reported with this summary; positive values are used as the
    /// spot reference by `fetch_surface`.
    underlying_price: Option<f64>,
}
61
/// Response body of `/public/get_instruments`; only the `result` array is decoded.
#[derive(Debug, Deserialize)]
struct DeribitInstrumentsResponse {
    /// Instruments returned for the requested currency.
    result: Vec<DeribitInstrument>,
}
67
/// The subset of a Deribit instrument description that the surface builder reads.
#[derive(Debug, Deserialize)]
struct DeribitInstrument {
    /// Instrument identifier; the join key against `BookSummary::instrument_name`.
    instrument_name: String,
    /// Strike price of the option.
    strike: f64,
    /// Expiry timestamp. `fetch_surface` divides it by 1000 before comparing it with
    /// Unix seconds, i.e. treats it as milliseconds.
    expiration_timestamp: i64,
}
75
/// Volatility oracle that polls Deribit over HTTP and serves implied vols from the
/// cached per-symbol surfaces.
pub struct DeribitVolOracle {
    /// HTTP client shared by all Deribit requests.
    client: Client,
    /// Endpoint, polling cadence, staleness limit and symbol list.
    config: DeribitVolOracleConfig,
    /// Per-symbol cache: written by the refresh loop, read by lookups and status reports.
    state: Arc<RwLock<DeribitSurfaceState>>,
    /// Number of successful per-symbol refreshes since construction.
    messages_received: AtomicU64,
}
82
83impl DeribitVolOracle {
84 pub fn new(config: DeribitVolOracleConfig) -> Self {
85 Self {
86 client: Client::builder()
87 .timeout(Duration::from_secs(15))
88 .build()
89 .expect("failed to build HTTP client"),
90 config,
91 state: Arc::new(RwLock::new(DeribitSurfaceState::default())),
92 messages_received: AtomicU64::new(0),
93 }
94 }
95
96 pub fn start_polling(self: Arc<Self>) -> JoinHandle<()> {
97 tokio::spawn(async move {
98 let mut interval = tokio::time::interval(self.config.poll_interval);
99 loop {
100 interval.tick().await;
101 if let Err(err) = self.refresh_all().await {
102 error!("Deribit vol oracle refresh failed: {err:#}");
103 }
104 }
105 })
106 }
107
108 async fn refresh_all(&self) -> Result<()> {
109 for symbol in &self.config.symbols {
110 match self.fetch_surface(symbol).await {
111 Ok((surface, spot)) => {
112 let now_ms = Utc::now().timestamp_millis();
113 let point_count = surface.len();
114 {
115 let mut state = self
116 .state
117 .write()
118 .expect("deribit vol oracle state poisoned");
119 state.connected.insert(symbol.clone(), true);
120 state.last_update_ts_ms.insert(symbol.clone(), now_ms);
121 state.last_error.remove(symbol);
122 state.surfaces.insert(symbol.clone(), surface);
123 }
124 self.messages_received.fetch_add(1, Ordering::Relaxed);
125 counter!(
126 "ht_vol_oracle_messages_received_total",
127 "provider" => VolProviderKind::Deribit.as_str(),
128 "underlying" => symbol.clone()
129 )
130 .increment(1);
131 info!(
132 "Updated Deribit vol surface for {} with {} points (spot ${:.0})",
133 symbol, point_count, spot
134 );
135 }
136 Err(err) => {
137 let message = err.to_string();
138 let mut state = self
139 .state
140 .write()
141 .expect("deribit vol oracle state poisoned");
142 state.connected.insert(symbol.clone(), false);
143 state.last_error.insert(symbol.clone(), message.clone());
144 if state.surfaces.contains_key(symbol) {
145 warn!(
146 "Deribit vol surface refresh failed for {} (keeping last good data): {}",
147 symbol, message
148 );
149 } else {
150 warn!(
151 "Deribit vol surface refresh failed for {}: {}",
152 symbol, message
153 );
154 }
155 }
156 }
157 }
158
159 Ok(())
160 }
161
162 async fn fetch_surface(&self, symbol: &str) -> Result<(VolatilitySurface, f64)> {
167 let instruments = self.fetch_instruments(symbol).await?;
170
171 let mut instrument_map: HashMap<String, (f64, i64)> = HashMap::new();
173 for inst in &instruments {
174 instrument_map.insert(
176 inst.instrument_name.clone(),
177 (inst.strike, inst.expiration_timestamp / 1000),
178 );
179 }
180
181 let url = format!(
183 "{}/public/get_book_summary_by_currency",
184 self.config.base_url
185 );
186 let resp = self
187 .client
188 .get(&url)
189 .query(&[("currency", symbol), ("kind", "option")])
190 .send()
191 .await
192 .with_context(|| format!("Failed to fetch Deribit book summary for {symbol}"))?
193 .error_for_status()
194 .with_context(|| format!("Deribit returned non-success status for {symbol}"))?
195 .json::<DeribitResponse>()
196 .await
197 .with_context(|| format!("Failed to decode Deribit book summary for {symbol}"))?;
198
199 let mut surface = VolatilitySurface::with_precision(1.0);
203 let now_ts = Utc::now().timestamp();
204 let mut spot_price = 0.0_f64;
205 let mut inserted = 0_u32;
206
207 for summary in &resp.result {
208 if let Some(price) = summary.underlying_price {
210 if price > 0.0 {
211 spot_price = price;
212 }
213 }
214
215 let Some(mark_iv_pct) = summary.mark_iv else {
216 continue;
217 };
218 let iv = mark_iv_pct / 100.0;
220
221 if !(0.01..5.0).contains(&iv) {
223 debug!(
224 "Skipping {} with IV {:.1}% (out of range)",
225 summary.instrument_name, mark_iv_pct
226 );
227 continue;
228 }
229
230 let Some(&(strike, expiry_ts)) = instrument_map.get(&summary.instrument_name) else {
232 continue;
233 };
234
235 if expiry_ts <= now_ts {
237 continue;
238 }
239
240 surface.insert(strike, expiry_ts, iv);
241 inserted += 1;
242
243 if spot_price > 0.0 {
245 let moneyness = (strike / spot_price - 1.0).abs();
246 if moneyness < 0.02 {
247 surface.set_atm_vol(expiry_ts, iv);
248 }
249 }
250 }
251
252 if inserted == 0 {
253 anyhow::bail!("Deribit returned no usable IV points for {symbol}");
254 }
255
256 debug!(
257 "Deribit {} surface: {} points from {} book summaries, {} instruments",
258 symbol,
259 inserted,
260 resp.result.len(),
261 instruments.len()
262 );
263
264 if spot_price > 0.0 {
268 let clamps = surface.sanitize_arb_free(spot_price, 0.0);
269 if clamps > 0 {
270 counter!(
271 "ht_vol_surface_arb_clamps_total",
272 "provider" => VolProviderKind::Deribit.as_str(),
273 "underlying" => symbol.to_string()
274 )
275 .increment(clamps as u64);
276 info!(
277 underlying = symbol,
278 clamps, "Clamped non-arb-free vol surface points (deribit)"
279 );
280 }
281 }
282
283 Ok((surface, spot_price))
284 }
285
286 async fn fetch_instruments(&self, symbol: &str) -> Result<Vec<DeribitInstrument>> {
288 let url = format!("{}/public/get_instruments", self.config.base_url);
289 let resp = self
290 .client
291 .get(&url)
292 .query(&[
293 ("currency", symbol),
294 ("kind", "option"),
295 ("expired", "false"),
296 ])
297 .send()
298 .await
299 .with_context(|| format!("Failed to fetch Deribit instruments for {symbol}"))?
300 .error_for_status()
301 .with_context(|| {
302 format!("Deribit instruments returned non-success status for {symbol}")
303 })?
304 .json::<DeribitInstrumentsResponse>()
305 .await
306 .with_context(|| format!("Failed to decode Deribit instruments for {symbol}"))?;
307
308 Ok(resp.result)
309 }
310
311 fn status_for(&self, symbol: &str) -> VolOracleStatus {
312 let state = self
313 .state
314 .read()
315 .expect("deribit vol oracle state poisoned");
316 let last_update_ts_ms = state.last_update_ts_ms.get(symbol).copied();
317 let staleness_seconds = last_update_ts_ms
318 .map(|ts| ((Utc::now().timestamp_millis() - ts) as f64 / 1000.0).max(0.0));
319 let ready = staleness_seconds
320 .map(|age| age <= self.config.staleness_threshold.as_secs_f64())
321 .unwrap_or(false);
322 let surface_points = state
323 .surfaces
324 .get(symbol)
325 .map(VolatilitySurface::len)
326 .unwrap_or(0);
327
328 VolOracleStatus {
329 underlying: symbol.to_string(),
330 provider: VolProviderKind::Deribit,
331 route_facing: true,
332 connected: state.connected.get(symbol).copied().unwrap_or(false),
333 ready,
334 last_update_ts_ms,
335 staleness_seconds,
336 staleness_threshold_seconds: Some(self.config.staleness_threshold.as_secs_f64()),
337 surface_points,
338 messages_received: self.messages_received.load(Ordering::Relaxed),
339 last_error: state.last_error.get(symbol).cloned(),
340 }
341 }
342}
343
344impl RiskVolOracle for DeribitVolOracle {
345 fn get_iv(&self, underlying: &str, strike: f64, expiry_ts: i64) -> Result<f64, VolLookupError> {
346 let state = self
347 .state
348 .read()
349 .expect("deribit vol oracle state poisoned");
350 let has_surface = state.surfaces.contains_key(underlying);
351
352 let connected = state.connected.get(underlying).copied().unwrap_or(false);
354 if !connected && !has_surface {
355 return Err(VolLookupError::UnhealthyProvider {
356 underlying: underlying.to_string(),
357 provider: VolProviderKind::Deribit,
358 reason: state
359 .last_error
360 .get(underlying)
361 .cloned()
362 .unwrap_or_else(|| "not connected".to_string()),
363 });
364 }
365
366 let last_update_ts_ms = state.last_update_ts_ms.get(underlying).copied();
368 let staleness_seconds = last_update_ts_ms
369 .map(|ts| ((Utc::now().timestamp_millis() - ts) as f64 / 1000.0).max(0.0));
370 let ready = staleness_seconds
371 .map(|age| age <= self.config.staleness_threshold.as_secs_f64())
372 .unwrap_or(false);
373
374 if !ready {
375 return Err(VolLookupError::StaleSurface {
376 underlying: underlying.to_string(),
377 provider: VolProviderKind::Deribit,
378 staleness_seconds: staleness_seconds.unwrap_or(f64::INFINITY),
379 threshold_seconds: self.config.staleness_threshold.as_secs_f64(),
380 });
381 }
382
383 let iv = state
385 .surfaces
386 .get(underlying)
387 .and_then(|surface| surface.get_interpolated(strike, expiry_ts))
388 .ok_or_else(|| VolLookupError::MissingSurface {
389 underlying: underlying.to_string(),
390 provider: VolProviderKind::Deribit,
391 strike,
392 expiry_ts,
393 })?;
394
395 debug!(
396 underlying,
397 strike,
398 expiry_ts,
399 iv,
400 provider = VolProviderKind::Deribit.as_str(),
401 "Backend vol oracle value used"
402 );
403
404 Ok(iv)
405 }
406
407 fn statuses(&self) -> Vec<VolOracleStatus> {
408 self.config
409 .symbols
410 .iter()
411 .map(|symbol| self.status_for(symbol))
412 .collect()
413 }
414
415 fn get_surface_snapshot(&self, underlying: &str) -> Option<VolSurfaceSnapshot> {
416 let state = self
417 .state
418 .read()
419 .expect("deribit vol oracle state poisoned");
420 let surface = state.surfaces.get(underlying)?;
421 Some(VolSurfaceSnapshot {
422 underlying: underlying.to_string(),
423 last_update_ts_ms: state.last_update_ts_ms.get(underlying).copied(),
424 expiries: surface.expiries().iter().copied().collect(),
425 strike_points: surface.export_all_points(),
426 delta_curves: surface.export_delta_curves(),
427 atm_vols: surface.export_atm_vols(),
428 spot_price: None,
429 })
430 }
431
432 fn supports_surface_snapshots(&self) -> bool {
433 true
434 }
435}
436
#[cfg(test)]
mod tests {
    use super::*;

    /// With no poll performed yet there is neither a connection flag nor a cached
    /// surface, so a lookup must be rejected as coming from an unhealthy provider.
    #[test]
    fn test_oracle_not_ready_before_poll() {
        let oracle = DeribitVolOracle::new(DeribitVolOracleConfig {
            base_url: DEFAULT_DERIBIT_BASE_URL.to_string(),
            poll_interval: Duration::from_secs(15),
            staleness_threshold: Duration::from_secs(120),
            symbols: vec!["BTC".to_string()],
        });

        let lookup = oracle.get_iv("BTC", 85000.0, 1800000000);

        assert!(matches!(
            lookup,
            Err(VolLookupError::UnhealthyProvider { .. })
        ));
    }
}