Skip to main content

hypercall_vol_oracle/
blockscholes_oracle.rs

1//! Block Scholes Volatility Oracle implementation.
2//!
3//! This module provides a WebSocket-based oracle for fetching real-time
4//! implied volatility data from the Block Scholes API.
5
6use std::collections::HashMap;
7use std::sync::atomic::{AtomicU64, Ordering};
8use std::sync::{Arc, RwLock};
9use std::time::{Duration, Instant};
10
11use anyhow::{Context, Result};
12use async_trait::async_trait;
13use chrono::Utc;
14use futures::{SinkExt, StreamExt};
15use metrics::{counter, gauge};
16use sonic_rs::{JsonContainerTrait, JsonValueTrait};
17use tokio::task::JoinHandle;
18use tokio_tungstenite::{connect_async, tungstenite::Message};
19use tracing::{debug, error, info, warn};
20
21use super::blockscholes_types::{
22    AtmVolUpdateMessage, BlockScholesAuth, BlockScholesMessage, BlockScholesSubscribe,
23    VolSurfaceUpdateMessage,
24};
25use super::risk_oracle::{
26    RiskVolOracle, VolLookupError, VolOracleStatus, VolProviderKind, VolSurfaceSnapshot,
27};
28use super::vol_surface_cache::VolatilitySurface;
29use crate::PollingVolOracle;
30
/// Default WebSocket URL for the Block Scholes API.
///
/// Used as `ws_url` by `BlockScholesVolOracleConfig::default()`.
pub const DEFAULT_WS_URL: &str = "wss://prod-websocket-api.blockscholes.com/";

/// Default reconnection delay in milliseconds (5 seconds).
///
/// Used as `reconnect_delay_ms` by `BlockScholesVolOracleConfig::default()`.
pub const DEFAULT_RECONNECT_DELAY_MS: u64 = 5000;

/// Default heartbeat interval in milliseconds (30 seconds).
///
/// Used as `heartbeat_interval_ms` by `BlockScholesVolOracleConfig::default()`.
pub const DEFAULT_HEARTBEAT_INTERVAL_MS: u64 = 30000;

/// Default cache TTL in milliseconds (60 seconds).
///
/// Used as `cache_ttl_ms` by `BlockScholesVolOracleConfig::default()`.
pub const DEFAULT_CACHE_TTL_MS: u64 = 60000;
42
/// Settings for the Block Scholes volatility oracle: where to connect, what to
/// subscribe to, how to reconnect, and how long cached data counts as fresh.
#[derive(Debug, Clone)]
pub struct BlockScholesVolOracleConfig {
    /// Endpoint of the Block Scholes WebSocket API.
    pub ws_url: String,
    /// API key; an authentication request is only sent when this is set.
    pub api_key: Option<String>,
    /// API secret passed along with the key in the authentication request.
    pub api_secret: Option<String>,
    /// Underlyings to track (e.g., ["BTC", "ETH"]).
    pub symbols: Vec<String>,
    /// Strikes per symbol (e.g., {"BTC": [80000, 90000, 100000]}).
    ///
    /// NOTE(review): the subscription code visible in this file requests the
    /// delta grid only and does not read this field — confirm whether it is
    /// still used elsewhere.
    pub strikes: HashMap<String, Vec<f64>>,
    /// Expiries to subscribe to, in ISO 8601 form (e.g., "2026-03-27T08:00:00Z").
    /// They should be "listed" exchange expiries (Deribit quarterly/monthly).
    pub expiries: Vec<String>,
    /// Pause between reconnection attempts, in milliseconds.
    pub reconnect_delay_ms: u64,
    /// Upper bound on reconnection attempts (0 = infinite).
    pub max_reconnect_attempts: u32,
    /// Interval between heartbeat pings, in milliseconds.
    pub heartbeat_interval_ms: u64,
    /// Cache TTL in milliseconds (how long data is considered fresh).
    pub cache_ttl_ms: u64,
}
69
70impl Default for BlockScholesVolOracleConfig {
71    fn default() -> Self {
72        Self {
73            ws_url: DEFAULT_WS_URL.to_string(),
74            api_key: None,
75            api_secret: None,
76            symbols: vec!["BTC".to_string(), "ETH".to_string()],
77            strikes: std::collections::HashMap::new(),
78            expiries: vec!["2026-03-27T08:00:00Z".to_string()], // Default Q1 2026 expiry
79            reconnect_delay_ms: DEFAULT_RECONNECT_DELAY_MS,
80            max_reconnect_attempts: 0, // Infinite
81            heartbeat_interval_ms: DEFAULT_HEARTBEAT_INTERVAL_MS,
82            cache_ttl_ms: DEFAULT_CACHE_TTL_MS,
83        }
84    }
85}
86
/// Internal state for the volatility oracle.
///
/// Shared behind an `Arc<RwLock<_>>` by `BlockScholesVolOracle`. The derived
/// `Default` yields empty maps, `is_connected == false`, zero reconnect
/// attempts, and no recorded error.
#[derive(Debug, Default)]
struct OracleState {
    /// Volatility surface cache by symbol
    surfaces: HashMap<String, VolatilitySurface>,
    /// Last successful update timestamp by symbol (milliseconds)
    last_update_timestamps: HashMap<String, i64>,
    /// Index (spot) prices by symbol, used for delta→strike conversion
    index_prices: HashMap<String, f64>,
    /// Connection health status
    is_connected: bool,
    /// Reconnection attempt counter; reset to zero on each successful connect
    reconnect_attempts: u32,
    /// Last provider error for reporting
    last_error: Option<String>,
}
103
/// Block Scholes Volatility Oracle implementation.
///
/// Connects to Block Scholes WebSocket API to receive real-time
/// volatility surface updates.
pub struct BlockScholesVolOracle {
    /// Immutable configuration supplied at construction time.
    config: BlockScholesVolOracleConfig,
    /// Mutable oracle state (surfaces, timestamps, connection flags) shared
    /// behind a read/write lock.
    state: Arc<RwLock<OracleState>>,
    /// Running count of processed update messages, reported in oracle status.
    messages_received: AtomicU64,
}
113
114impl BlockScholesVolOracle {
115    /// Create a new oracle with the given configuration.
116    pub fn new(config: BlockScholesVolOracleConfig) -> Self {
117        Self {
118            config,
119            state: Arc::new(RwLock::new(OracleState::default())),
120            messages_received: AtomicU64::new(0),
121        }
122    }
123
124    /// Create with default configuration.
125    pub fn with_defaults() -> Self {
126        Self::new(BlockScholesVolOracleConfig::default())
127    }
128
    /// Get the oracle configuration.
    ///
    /// Returns a shared reference to the configuration passed to `new`.
    pub fn config(&self) -> &BlockScholesVolOracleConfig {
        &self.config
    }
133
134    fn status_for(&self, symbol: &str) -> VolOracleStatus {
135        let state = self
136            .state
137            .read()
138            .expect("block scholes vol oracle state poisoned");
139        let last_update_ts_ms = state.last_update_timestamps.get(symbol).copied();
140        let staleness_seconds = last_update_ts_ms
141            .map(|ts| ((Utc::now().timestamp_millis() - ts) as f64 / 1000.0).max(0.0));
142        let surface_points = state
143            .surfaces
144            .get(symbol)
145            .map(VolatilitySurface::len)
146            .unwrap_or(0);
147        let ready = state.is_connected
148            && surface_points > 0
149            && staleness_seconds
150                .map(|age| age <= self.config.cache_ttl_ms as f64 / 1000.0)
151                .unwrap_or(false);
152
153        VolOracleStatus {
154            underlying: symbol.to_string(),
155            provider: VolProviderKind::BlockScholes,
156            route_facing: true,
157            connected: state.is_connected,
158            ready,
159            last_update_ts_ms,
160            staleness_seconds,
161            staleness_threshold_seconds: Some(self.config.cache_ttl_ms as f64 / 1000.0),
162            surface_points,
163            messages_received: self.messages_received.load(Ordering::Relaxed),
164            last_error: state.last_error.clone(),
165        }
166    }
167
168    /// Connect to WebSocket and process messages.
169    async fn connect_and_run(self: &Arc<Self>) -> Result<()> {
170        let url = &self.config.ws_url;
171        info!("Connecting to Block Scholes WebSocket: {}", url);
172
173        let (ws_stream, _) = connect_async(url)
174            .await
175            .context("Failed to connect to Block Scholes WebSocket")?;
176
177        let (mut write, mut read) = ws_stream.split();
178
179        // Update connection state
180        {
181            let mut state = self
182                .state
183                .write()
184                .expect("block scholes vol oracle state poisoned");
185            state.is_connected = true;
186            state.reconnect_attempts = 0;
187            state.last_error = None;
188        }
189
190        gauge!(
191            "ht_vol_oracle_connected",
192            "provider" => VolProviderKind::BlockScholes.as_str()
193        )
194        .set(1.0);
195
196        info!("Connected to Block Scholes WebSocket");
197
198        // Authenticate if credentials are available
199        if let Some(ref api_key) = self.config.api_key {
200            let auth_msg = BlockScholesAuth::new(api_key, self.config.api_secret.clone());
201            let json = sonic_rs::to_string(&auth_msg)?;
202            write.send(Message::Text(json)).await?;
203            info!("Sent authentication request");
204        }
205
206        // Subscribe to volatility feeds for each symbol and expiry
207        // Block Scholes requires "listed" expiries (actual exchange expiries)
208        for symbol in &self.config.symbols {
209            for expiry in &self.config.expiries {
210                // Subscribe to delta grid IV instead of strike-level IV.
211                // Strike-level subscriptions require exchange-listed strikes
212                // (Block Scholes rejects unlisted strikes with -2610 errors).
213                // Delta grid doesn't need listed strikes and gives us enough
214                // points to interpolate a full vol surface via BS delta inversion.
215                let delta_subscribe = BlockScholesSubscribe::delta_iv(
216                    symbol.clone(),
217                    expiry.clone(),
218                    Some(format!("delta_{}_{}", symbol, expiry)),
219                );
220                let json = sonic_rs::to_string(&delta_subscribe)?;
221                write.send(Message::Text(json)).await?;
222                info!(
223                    "Subscribed to delta.iv grid for {} at expiry {}",
224                    symbol, expiry
225                );
226            }
227
228            // Also subscribe to index price for the symbol
229            let index_subscribe = BlockScholesSubscribe::index_price(
230                symbol.clone(),
231                Some(format!("index_{}", symbol)),
232            );
233            let json = sonic_rs::to_string(&index_subscribe)?;
234            write.send(Message::Text(json)).await?;
235            info!("Subscribed to index price for {}", symbol);
236        }
237
238        // Set up heartbeat interval
239        let mut heartbeat =
240            tokio::time::interval(Duration::from_millis(self.config.heartbeat_interval_ms));
241
242        // Staleness watchdog: if no data messages arrive for 30s, the connection
243        // is stalled (TCP alive via pings but Block Scholes not sending data).
244        // Block Scholes sends data every 1s, so 30s = 30x tolerance.
245        // Vol surface stale PD alert fires at 120s, so 30s gives ~90s buffer
246        // for the reconnect + re-subscribe to complete before paging.
247        let staleness_timeout = Duration::from_secs(30);
248        let mut last_data_message = Instant::now();
249
250        // Process messages
251        loop {
252            tokio::select! {
253                msg = read.next() => {
254                    match msg {
255                        Some(Ok(Message::Text(text))) => {
256                            last_data_message = Instant::now();
257                            self.handle_message(&text).await;
258                        }
259                        Some(Ok(Message::Binary(data))) => {
260                            last_data_message = Instant::now();
261                            // Try to parse binary as JSON text
262                            if let Ok(text) = String::from_utf8(data) {
263                                self.handle_message(&text).await;
264                            }
265                        }
266                        Some(Ok(Message::Close(frame))) => {
267                            info!("WebSocket closed by server: {:?}", frame);
268                            break;
269                        }
270                        Some(Ok(Message::Ping(data))) => {
271                            if let Err(e) = write.send(Message::Pong(data)).await {
272                                error!("Failed to send pong: {}", e);
273                            }
274                        }
275                        Some(Ok(Message::Pong(_))) => {
276                            debug!("Received pong");
277                        }
278                        Some(Ok(Message::Frame(_))) => {
279                            // Raw frame, ignore
280                        }
281                        Some(Err(e)) => {
282                            error!("WebSocket error: {}", e);
283                            break;
284                        }
285                        None => {
286                            info!("WebSocket stream ended");
287                            break;
288                        }
289                    }
290                }
291                _ = heartbeat.tick() => {
292                    // Send ping to keep connection alive
293                    if let Err(e) = write.send(Message::Ping(vec![])).await {
294                        error!("Failed to send ping: {}", e);
295                        break;
296                    }
297                    debug!("Sent heartbeat ping");
298
299                    // Check for stalled connection
300                    if last_data_message.elapsed() > staleness_timeout {
301                        warn!(
302                            "Block Scholes WebSocket stalled: no data messages for {:.0}s, forcing reconnect",
303                            last_data_message.elapsed().as_secs_f64()
304                        );
305                        counter!("ht_vol_oracle_stale_reconnects_total", "provider" => VolProviderKind::BlockScholes.as_str()).increment(1);
306                        break;
307                    }
308                }
309            }
310        }
311
312        // Mark as disconnected
313        {
314            let mut state = self
315                .state
316                .write()
317                .expect("block scholes vol oracle state poisoned");
318            state.is_connected = false;
319        }
320
321        gauge!(
322            "ht_vol_oracle_connected",
323            "provider" => VolProviderKind::BlockScholes.as_str()
324        )
325        .set(0.0);
326
327        Ok(())
328    }
329
330    /// Handle incoming WebSocket message.
331    async fn handle_message(&self, text: &str) {
332        // First try to parse as JSON-RPC 2.0 response
333        if let Ok(rpc) = sonic_rs::from_str::<super::blockscholes_types::JsonRpcResponse>(text) {
334            if let Some(ref err) = rpc.error {
335                error!(
336                    "Block Scholes RPC error (code {}): {}",
337                    err.code, err.message
338                );
339                return;
340            }
341
342            if rpc.is_success() {
343                // Check if this is an auth response
344                if rpc.result == Some(sonic_rs::json!("ok")) {
345                    info!("Authentication successful");
346                    return;
347                }
348                // Other success responses
349                debug!("RPC success: {:?}", rpc.result);
350                return;
351            }
352
353            // Check for subscription data push (method="subscription" with params)
354            if rpc.method.as_deref() == Some("subscription") {
355                if let Some(params) = rpc.params {
356                    self.process_subscription_data(params).await;
357                }
358                return;
359            }
360        }
361
362        // Fall back to trying the custom message format
363        match sonic_rs::from_str::<BlockScholesMessage>(text) {
364            Ok(msg) => match msg {
365                BlockScholesMessage::VolSurfaceUpdate(update) => {
366                    self.process_vol_surface_update(update).await;
367                }
368                BlockScholesMessage::AtmVolUpdate(update) => {
369                    self.process_atm_vol_update(update).await;
370                }
371                BlockScholesMessage::Subscribed { channel, symbol } => {
372                    info!("Confirmed subscription: {} for {}", channel, symbol);
373                }
374                BlockScholesMessage::Unsubscribed { channel, symbol } => {
375                    info!("Confirmed unsubscription: {} for {}", channel, symbol);
376                }
377                BlockScholesMessage::Authenticated { message } => {
378                    info!("Authentication successful: {:?}", message);
379                }
380                BlockScholesMessage::Error { message, code } => {
381                    error!("Block Scholes error (code {:?}): {}", code, message);
382                }
383                BlockScholesMessage::Heartbeat { timestamp } => {
384                    debug!("Received server heartbeat: {}", timestamp);
385                }
386                BlockScholesMessage::Info { message, version } => {
387                    info!("Server info: {:?}, version: {:?}", message, version);
388                }
389            },
390            Err(e) => {
391                debug!("Failed to parse message: {} - {}", e, text);
392            }
393        }
394    }
395
396    /// Process Block Scholes subscription data.
397    /// Format: [{"data": {"values": [{"sid": "...", "v": 0.42, ...}], "timestamp": 123}, "client_id": "..."}]
398    async fn process_subscription_data(&self, params: sonic_rs::Value) {
399        let params_array = match params.as_array() {
400            Some(arr) => arr,
401            None => {
402                debug!("Subscription params is not an array: {:?}", params);
403                return;
404            }
405        };
406
407        let mut state = self
408            .state
409            .write()
410            .expect("block scholes vol oracle state poisoned");
411
412        for param in params_array {
413            let client_id = param
414                .get("client_id")
415                .and_then(|v| v.as_str())
416                .unwrap_or("");
417            let data = match param.get("data") {
418                Some(d) => d,
419                None => continue,
420            };
421
422            let timestamp = data.get("timestamp").and_then(|v| v.as_i64());
423            let values = match data.get("values").and_then(|v| v.as_array()) {
424                Some(v) => v,
425                None => continue,
426            };
427
428            for value in values {
429                let sid = value.get("sid").and_then(|v| v.as_str()).unwrap_or("");
430
431                // Extract base asset from client_id (format: "delta_BTC_expiry", "strike_BTC_expiry", or "index_BTC")
432                let base_asset = if client_id.starts_with("delta_")
433                    || client_id.starts_with("atm_")
434                    || client_id.starts_with("strike_")
435                {
436                    client_id.split('_').nth(1).unwrap_or("BTC")
437                } else if let Some(index_symbol) = client_id.strip_prefix("index_") {
438                    index_symbol
439                } else {
440                    // Try to extract from sid
441                    sid.split('_').nth(1).unwrap_or("BTC")
442                };
443
444                // Extract expiry timestamp from client_id if available
445                // Format: "strike_BTC_2026-03-27T08:00:00Z" or "delta_BTC_2026-03-27T08:00:00Z"
446                let expiry_ts = client_id
447                    .split('_')
448                    .nth(2)
449                    .and_then(|exp| chrono::DateTime::parse_from_rfc3339(exp).ok())
450                    .map(|dt| dt.timestamp())
451                    .unwrap_or_else(|| timestamp.unwrap_or(0) / 1000);
452
453                // Handle strike.iv feed (IV by strike price)
454                if let Some(strikes) = value.get("strike").and_then(|v| v.as_array()) {
455                    if let Some(ivs) = value.get("v").and_then(|v| v.as_array()) {
456                        let surface = state.surfaces.entry(base_asset.to_string()).or_default();
457
458                        for (strike, iv) in strikes.iter().zip(ivs.iter()) {
459                            if let (Some(s), Some(v)) = (strike.as_f64(), iv.as_f64()) {
460                                surface.insert(s, expiry_ts, v);
461                                debug!(
462                                    "Updated strike IV for {} strike={} expiry={}: {:.2}%",
463                                    base_asset,
464                                    s,
465                                    expiry_ts,
466                                    v * 100.0
467                                );
468                            }
469                        }
470                        info!(
471                            "Updated {} strike IVs for {} at expiry {}",
472                            strikes.len(),
473                            base_asset,
474                            expiry_ts
475                        );
476                    }
477                }
478                // Handle delta.iv feed — store delta-IV curve for smile interpolation
479                else if let Some(deltas) = value.get("delta").and_then(|v| v.as_array()) {
480                    if let Some(ivs) = value.get("v").and_then(|v| v.as_array()) {
481                        let surface = state.surfaces.entry(base_asset.to_string()).or_default();
482
483                        let mut count = 0u32;
484                        for (delta, iv) in deltas.iter().zip(ivs.iter()) {
485                            if let (Some(d), Some(v)) = (delta.as_f64(), iv.as_f64()) {
486                                // Store in delta-IV curve for smile interpolation
487                                surface.set_delta_iv(expiry_ts, d, v);
488                                count += 1;
489
490                                // Also store ATM vol for delta ≈ 0.5
491                                if (d - 0.5).abs() < 0.01 {
492                                    surface.set_atm_vol(expiry_ts, v);
493                                }
494                            }
495                        }
496
497                        if count > 0 {
498                            debug!(
499                                "Updated {} delta IVs for {} at expiry {}",
500                                count, base_asset, expiry_ts
501                            );
502                        }
503                    }
504                }
505                // Handle index.px feed — store spot price for delta→strike conversion
506                else if let Some(price) = value.get("v").and_then(|v| v.as_f64()) {
507                    state.index_prices.insert(base_asset.to_string(), price);
508                    debug!("Index price for {}: ${:.2}", base_asset, price);
509                    // Don't update last_update_timestamp for index prices —
510                    // only IV data should affect staleness checks.
511                    continue;
512                }
513                if let Some(timestamp) = timestamp {
514                    state
515                        .last_update_timestamps
516                        .insert(base_asset.to_string(), timestamp);
517                }
518            }
519        }
520
521        self.messages_received.fetch_add(1, Ordering::Relaxed);
522        counter!(
523            "ht_vol_oracle_messages_received_total",
524            "provider" => VolProviderKind::BlockScholes.as_str()
525        )
526        .increment(1);
527    }
528
529    /// Process volatility surface update.
530    async fn process_vol_surface_update(&self, update: VolSurfaceUpdateMessage) {
531        let mut state = self
532            .state
533            .write()
534            .expect("block scholes vol oracle state poisoned");
535
536        let surface = state.surfaces.entry(update.symbol.clone()).or_default();
537
538        // Update the surface with new data points
539        for point in &update.points {
540            surface.insert(point.strike, point.expiry, point.iv);
541        }
542
543        // Update ATM vols if provided
544        for atm in &update.atm_vols {
545            surface.set_atm_vol(atm.expiry, atm.iv);
546        }
547
548        state
549            .last_update_timestamps
550            .insert(update.symbol.clone(), Utc::now().timestamp_millis());
551        self.messages_received.fetch_add(1, Ordering::Relaxed);
552        counter!(
553            "ht_vol_oracle_messages_received_total",
554            "provider" => VolProviderKind::BlockScholes.as_str(),
555            "underlying" => update.symbol.clone()
556        )
557        .increment(1);
558
559        debug!(
560            "Updated vol surface for {}: {} points, {} ATM vols",
561            update.symbol,
562            update.points.len(),
563            update.atm_vols.len()
564        );
565    }
566
567    /// Process ATM volatility update.
568    async fn process_atm_vol_update(&self, update: AtmVolUpdateMessage) {
569        let mut state = self
570            .state
571            .write()
572            .expect("block scholes vol oracle state poisoned");
573
574        let surface = state.surfaces.entry(update.symbol.clone()).or_default();
575
576        for point in &update.points {
577            surface.set_atm_vol(point.expiry, point.iv);
578        }
579
580        state
581            .last_update_timestamps
582            .insert(update.symbol.clone(), Utc::now().timestamp_millis());
583        self.messages_received.fetch_add(1, Ordering::Relaxed);
584        counter!(
585            "ht_vol_oracle_messages_received_total",
586            "provider" => VolProviderKind::BlockScholes.as_str(),
587            "underlying" => update.symbol.clone()
588        )
589        .increment(1);
590
591        debug!(
592            "Updated ATM vols for {}: {} expiries",
593            update.symbol,
594            update.points.len()
595        );
596    }
597
598    /// Set volatility data directly (for testing).
599    ///
600    /// This allows tests to inject volatility data without making network requests.
601    pub async fn set_vol_for_testing(&self, symbol: &str, strike: f64, expiry: i64, iv: f64) {
602        let mut state = self
603            .state
604            .write()
605            .expect("block scholes vol oracle state poisoned");
606        let surface = state.surfaces.entry(symbol.to_string()).or_default();
607        surface.insert(strike, expiry, iv);
608        state
609            .last_update_timestamps
610            .insert(symbol.to_string(), Utc::now().timestamp_millis());
611        state.is_connected = true;
612        info!(
613            "Test mode: Set vol for {} strike={} expiry={} to {:.2}%",
614            symbol,
615            strike,
616            expiry,
617            iv * 100.0
618        );
619    }
620
621    /// Set ATM volatility directly (for testing).
622    pub async fn set_atm_vol_for_testing(&self, symbol: &str, expiry: i64, iv: f64) {
623        let mut state = self
624            .state
625            .write()
626            .expect("block scholes vol oracle state poisoned");
627        let surface = state.surfaces.entry(symbol.to_string()).or_default();
628        surface.set_atm_vol(expiry, iv);
629        state
630            .last_update_timestamps
631            .insert(symbol.to_string(), Utc::now().timestamp_millis());
632        state.is_connected = true;
633        info!(
634            "Test mode: Set ATM vol for {} expiry={} to {:.2}%",
635            symbol,
636            expiry,
637            iv * 100.0
638        );
639    }
640}
641
642#[async_trait]
643impl PollingVolOracle for BlockScholesVolOracle {
644    fn start_polling(self: Arc<Self>) -> JoinHandle<()> {
645        let oracle = Arc::clone(&self);
646
647        tokio::spawn(async move {
648            loop {
649                match oracle.connect_and_run().await {
650                    Ok(()) => {
651                        info!("WebSocket connection closed gracefully");
652                        // Check if this was intentional shutdown
653                        // For now, always attempt to reconnect
654                    }
655                    Err(e) => {
656                        error!("WebSocket connection error: {}", e);
657                    }
658                }
659
660                // Update state and check reconnection limit
661                let should_stop = {
662                    let mut state = oracle
663                        .state
664                        .write()
665                        .expect("block scholes vol oracle state poisoned");
666                    state.is_connected = false;
667                    state.reconnect_attempts += 1;
668                    state.last_error = Some("websocket disconnected".to_string());
669
670                    oracle.config.max_reconnect_attempts > 0
671                        && state.reconnect_attempts >= oracle.config.max_reconnect_attempts
672                };
673
674                if should_stop {
675                    error!("Max reconnection attempts reached, stopping oracle");
676                    break;
677                }
678
679                let attempts = oracle
680                    .state
681                    .read()
682                    .expect("block scholes vol oracle state poisoned")
683                    .reconnect_attempts;
684                info!(
685                    "Reconnecting in {}ms (attempt {})",
686                    oracle.config.reconnect_delay_ms, attempts
687                );
688
689                tokio::time::sleep(Duration::from_millis(oracle.config.reconnect_delay_ms)).await;
690            }
691        })
692    }
693
694    async fn get_vol(&self, symbol: &str, strike: f64, expiry: i64) -> Option<f64> {
695        let state = self
696            .state
697            .read()
698            .expect("block scholes vol oracle state poisoned");
699        let spot = state.index_prices.get(symbol).copied();
700        state
701            .surfaces
702            .get(symbol)
703            .and_then(|surface| surface.get_interpolated_with_spot(strike, expiry, spot))
704    }
705
706    async fn get_atm_vol(&self, symbol: &str, expiry: i64) -> Option<f64> {
707        let state = self
708            .state
709            .read()
710            .expect("block scholes vol oracle state poisoned");
711        state
712            .surfaces
713            .get(symbol)
714            .and_then(|surface| surface.get_atm(expiry))
715    }
716
717    async fn is_healthy(&self) -> bool {
718        let state = self
719            .state
720            .read()
721            .expect("block scholes vol oracle state poisoned");
722
723        // Check if connected
724        if !state.is_connected {
725            return false;
726        }
727
728        // Check if data is fresh (within cache TTL)
729        if let Some(last_update) = state.last_update_timestamps.values().copied().max() {
730            let now = Utc::now().timestamp_millis();
731            let age = now - last_update;
732            return age < self.config.cache_ttl_ms as i64;
733        }
734
735        false
736    }
737
738    async fn last_update_timestamp(&self) -> Option<i64> {
739        self.state
740            .read()
741            .expect("block scholes vol oracle state poisoned")
742            .last_update_timestamps
743            .values()
744            .copied()
745            .max()
746    }
747}
748
749impl RiskVolOracle for BlockScholesVolOracle {
750    fn get_iv(&self, underlying: &str, strike: f64, expiry_ts: i64) -> Result<f64, VolLookupError> {
751        let status = self.status_for(underlying);
752        if !status.connected {
753            return Err(VolLookupError::UnhealthyProvider {
754                underlying: underlying.to_string(),
755                provider: VolProviderKind::BlockScholes,
756                reason: status
757                    .last_error
758                    .unwrap_or_else(|| "not connected".to_string()),
759            });
760        }
761
762        // Skip the ready check — let the lookup itself determine if data exists.
763        // The readiness gate was causing false "stale surface" errors when index
764        // prices updated the timestamp but no IV data had arrived yet.
765
766        let state = self
767            .state
768            .read()
769            .expect("block scholes vol oracle state poisoned");
770        let spot = state.index_prices.get(underlying).copied();
771        let iv = state
772            .surfaces
773            .get(underlying)
774            .and_then(|surface| surface.get_interpolated_with_spot(strike, expiry_ts, spot))
775            .ok_or_else(|| VolLookupError::MissingSurface {
776                underlying: underlying.to_string(),
777                provider: VolProviderKind::BlockScholes,
778                strike,
779                expiry_ts,
780            })?;
781
782        debug!(
783            underlying,
784            strike,
785            expiry_ts,
786            iv,
787            spot = ?spot,
788            provider = VolProviderKind::BlockScholes.as_str(),
789            "Backend vol oracle value used"
790        );
791
792        Ok(iv)
793    }
794
795    fn statuses(&self) -> Vec<VolOracleStatus> {
796        self.config
797            .symbols
798            .iter()
799            .map(|symbol| self.status_for(symbol))
800            .collect()
801    }
802
803    fn get_surface_snapshot(&self, underlying: &str) -> Option<VolSurfaceSnapshot> {
804        let state = self
805            .state
806            .read()
807            .expect("block scholes vol oracle state poisoned");
808        let surface = state.surfaces.get(underlying)?;
809        Some(VolSurfaceSnapshot {
810            underlying: underlying.to_string(),
811            last_update_ts_ms: state.last_update_timestamps.get(underlying).copied(),
812            expiries: surface.expiries().iter().copied().collect(),
813            strike_points: surface.export_all_points(),
814            delta_curves: surface.export_delta_curves(),
815            atm_vols: surface.export_atm_vols(),
816            spot_price: state.index_prices.get(underlying).copied(),
817        })
818    }
819
820    fn supports_surface_snapshots(&self) -> bool {
821        true
822    }
823}
824
/// Mock polling oracle for unit testing margin calculations.
///
/// Returns a fixed volatility value for all queries.
#[derive(Debug, Clone)]
pub struct MockVolOracle {
    /// Volatility handed back for every lookup.
    fixed_vol: f64,
}

impl MockVolOracle {
    /// Create a new mock oracle with a fixed volatility value.
    pub fn new(fixed_vol: f64) -> Self {
        Self { fixed_vol }
    }
}
838
839#[async_trait]
840impl PollingVolOracle for MockVolOracle {
841    fn start_polling(self: Arc<Self>) -> JoinHandle<()> {
842        tokio::spawn(async {})
843    }
844
845    async fn get_vol(&self, _symbol: &str, _strike: f64, _expiry: i64) -> Option<f64> {
846        Some(self.fixed_vol)
847    }
848
849    async fn get_atm_vol(&self, _symbol: &str, _expiry: i64) -> Option<f64> {
850        Some(self.fixed_vol)
851    }
852
853    async fn is_healthy(&self) -> bool {
854        true
855    }
856
857    async fn last_update_timestamp(&self) -> Option<i64> {
858        Some(Utc::now().timestamp_millis())
859    }
860}
861
#[cfg(test)]
mod tests {
    use super::*;

    /// The default configuration tracks BTC and ETH, uses the default reconnect
    /// delay, has `max_reconnect_attempts == 0` and carries no API key.
    #[test]
    fn test_config_default() {
        let defaults = BlockScholesVolOracleConfig::default();
        assert_eq!(defaults.symbols, vec!["BTC", "ETH"]);
        assert_eq!(defaults.reconnect_delay_ms, DEFAULT_RECONNECT_DELAY_MS);
        assert_eq!(defaults.max_reconnect_attempts, 0);
        assert!(defaults.api_key.is_none());
    }

    /// An oracle built with defaults exposes the default symbol list.
    #[test]
    fn test_oracle_creation() {
        let subject = BlockScholesVolOracle::with_defaults();
        let tracked = &subject.config().symbols;
        assert_eq!(*tracked, vec!["BTC", "ETH"]);
    }

    /// A strike vol injected through the test hook is returned unchanged.
    #[tokio::test]
    async fn test_set_vol_for_testing() {
        let oracle = BlockScholesVolOracle::with_defaults();

        oracle
            .set_vol_for_testing("BTC", 100000.0, 1735689600, 0.75)
            .await;

        let looked_up = oracle.get_vol("BTC", 100000.0, 1735689600).await;
        assert_eq!(looked_up, Some(0.75));
    }

    /// An ATM vol injected through the test hook is returned unchanged.
    #[tokio::test]
    async fn test_set_atm_vol_for_testing() {
        let oracle = BlockScholesVolOracle::with_defaults();

        oracle
            .set_atm_vol_for_testing("ETH", 1735689600, 0.80)
            .await;

        let looked_up = oracle.get_atm_vol("ETH", 1735689600).await;
        assert_eq!(looked_up, Some(0.80));
    }

    /// With no data loaded, a lookup yields `None`.
    #[tokio::test]
    async fn test_get_vol_not_found() {
        let oracle = BlockScholesVolOracle::with_defaults();

        let looked_up = oracle.get_vol("BTC", 100000.0, 1735689600).await;
        assert_eq!(looked_up, None);
    }

    /// A freshly constructed oracle is not healthy.
    #[tokio::test]
    async fn test_is_healthy_when_not_connected() {
        let oracle = BlockScholesVolOracle::with_defaults();

        let healthy = oracle.is_healthy().await;
        assert!(!healthy);
    }

    /// After test data is injected the oracle reports healthy.
    #[tokio::test]
    async fn test_is_healthy_after_test_data() {
        let oracle = BlockScholesVolOracle::with_defaults();

        // Setting test data marks the oracle as connected
        oracle
            .set_vol_for_testing("BTC", 100000.0, 1735689600, 0.75)
            .await;

        let healthy = oracle.is_healthy().await;
        assert!(healthy);
    }

    /// The mock answers every query with its fixed value and is always healthy.
    #[tokio::test]
    async fn test_mock_vol_oracle() {
        let mock = MockVolOracle::new(0.65);

        let strike_vol = mock.get_vol("ANY", 0.0, 0).await;
        assert_eq!(strike_vol, Some(0.65));
        let atm_vol = mock.get_atm_vol("ANY", 0).await;
        assert_eq!(atm_vol, Some(0.65));
        assert!(mock.is_healthy().await);
        assert!(mock.last_update_timestamp().await.is_some());
    }

    /// Querying the centre of a 2x2 strike/expiry grid yields a value inside
    /// the range spanned by the corner vols.
    #[tokio::test]
    async fn test_interpolation_through_oracle() {
        let oracle = BlockScholesVolOracle::with_defaults();

        // Set up a 2x2 grid
        let corners = [
            (40000.0, 1735689600, 0.70),
            (40000.0, 1735776000, 0.75),
            (60000.0, 1735689600, 0.80),
            (60000.0, 1735776000, 0.85),
        ];
        for (strike, expiry, vol) in corners {
            oracle
                .set_vol_for_testing("BTC", strike, expiry, vol)
                .await;
        }

        // Query center point
        let mid_expiry = (1735689600 + 1735776000) / 2;
        let centre = oracle.get_vol("BTC", 50000.0, mid_expiry).await;

        assert!(centre.is_some());
        let interpolated = centre.unwrap();
        // Should be around 0.775 (average of corners)
        assert!(
            0.70 < interpolated && interpolated < 0.90,
            "Expected ~0.775, got {}",
            interpolated
        );
    }

    /// When only ATM data exists (no delta curves, no strikes), strike lookups
    /// fall back to the ATM vol.
    #[tokio::test]
    async fn test_atm_fallback_when_no_strikes() {
        let oracle = BlockScholesVolOracle::with_defaults();

        for (expiry, atm) in [(1735689600, 0.65), (1735776000, 0.70)] {
            oracle.set_atm_vol_for_testing("BTC", expiry, atm).await;
        }

        // Query an arbitrary strike — should fall back to ATM
        let at_expiry = oracle.get_vol("BTC", 95000.0, 1735689600).await;
        assert_eq!(at_expiry, Some(0.65), "Should fall back to ATM vol");

        // Query between expiries — should interpolate ATM
        let mid = (1735689600 + 1735776000) / 2;
        let between = oracle.get_vol("BTC", 95000.0, mid).await;
        assert!(between.is_some());
        let interpolated = between.unwrap();
        assert!(
            0.64 < interpolated && interpolated < 0.71,
            "Expected interpolated ATM ~0.675, got {}",
            interpolated
        );
    }
}
998// CALL-681: force rebuild for fixed vol catalog change