1use std::collections::HashMap;
7use std::sync::atomic::{AtomicU64, Ordering};
8use std::sync::{Arc, RwLock};
9use std::time::{Duration, Instant};
10
11use anyhow::{Context, Result};
12use async_trait::async_trait;
13use chrono::Utc;
14use futures::{SinkExt, StreamExt};
15use metrics::{counter, gauge};
16use sonic_rs::{JsonContainerTrait, JsonValueTrait};
17use tokio::task::JoinHandle;
18use tokio_tungstenite::{connect_async, tungstenite::Message};
19use tracing::{debug, error, info, warn};
20
21use super::blockscholes_types::{
22 AtmVolUpdateMessage, BlockScholesAuth, BlockScholesMessage, BlockScholesSubscribe,
23 VolSurfaceUpdateMessage,
24};
25use super::risk_oracle::{
26 RiskVolOracle, VolLookupError, VolOracleStatus, VolProviderKind, VolSurfaceSnapshot,
27};
28use super::vol_surface_cache::VolatilitySurface;
29use crate::PollingVolOracle;
30
/// WebSocket endpoint used when a configuration does not override `ws_url`.
pub const DEFAULT_WS_URL: &str = "wss://prod-websocket-api.blockscholes.com/";

/// Default pause between reconnection attempts, in milliseconds (5 s).
pub const DEFAULT_RECONNECT_DELAY_MS: u64 = 5_000;

/// Default spacing between heartbeat pings, in milliseconds (30 s).
pub const DEFAULT_HEARTBEAT_INTERVAL_MS: u64 = 30_000;

/// Default maximum age of cached data before it counts as stale, in
/// milliseconds (60 s).
pub const DEFAULT_CACHE_TTL_MS: u64 = 60_000;
42
/// Connection and subscription settings for the Block Scholes vol oracle.
#[derive(Debug, Clone)]
pub struct BlockScholesVolOracleConfig {
    /// WebSocket URL the oracle connects to.
    pub ws_url: String,
    /// API key; when present an authentication message is sent right after
    /// the connection is established.
    pub api_key: Option<String>,
    /// API secret forwarded together with `api_key` in the auth message.
    pub api_secret: Option<String>,
    /// Underlying symbols to subscribe to (one status entry is reported per
    /// symbol).
    pub symbols: Vec<String>,
    /// Strike lists keyed by symbol.
    // NOTE(review): not read by the code visible in this file - confirm usage.
    pub strikes: HashMap<String, Vec<f64>>,
    /// Expiries to subscribe to for every symbol. Incoming data is matched
    /// back to an expiry by parsing this string as RFC 3339.
    pub expiries: Vec<String>,
    /// Delay between reconnection attempts, in milliseconds.
    pub reconnect_delay_ms: u64,
    /// Maximum number of consecutive reconnection attempts; `0` means no
    /// limit.
    pub max_reconnect_attempts: u32,
    /// Interval between heartbeat pings, in milliseconds.
    pub heartbeat_interval_ms: u64,
    /// Maximum age of the latest update before data is considered stale, in
    /// milliseconds.
    pub cache_ttl_ms: u64,
}
69
70impl Default for BlockScholesVolOracleConfig {
71 fn default() -> Self {
72 Self {
73 ws_url: DEFAULT_WS_URL.to_string(),
74 api_key: None,
75 api_secret: None,
76 symbols: vec!["BTC".to_string(), "ETH".to_string()],
77 strikes: std::collections::HashMap::new(),
78 expiries: vec!["2026-03-27T08:00:00Z".to_string()], reconnect_delay_ms: DEFAULT_RECONNECT_DELAY_MS,
80 max_reconnect_attempts: 0, heartbeat_interval_ms: DEFAULT_HEARTBEAT_INTERVAL_MS,
82 cache_ttl_ms: DEFAULT_CACHE_TTL_MS,
83 }
84 }
85}
86
/// Mutable oracle state shared between the WebSocket task and readers.
#[derive(Debug, Default)]
struct OracleState {
    /// Volatility surface per underlying symbol.
    surfaces: HashMap<String, VolatilitySurface>,
    /// Timestamp of the latest vol update per symbol; compared against
    /// `Utc::now().timestamp_millis()`, i.e. treated as epoch milliseconds.
    last_update_timestamps: HashMap<String, i64>,
    /// Latest index (spot) price per symbol.
    index_prices: HashMap<String, f64>,
    /// Whether a WebSocket session is currently established.
    is_connected: bool,
    /// Reconnection attempts since the last successful connect.
    reconnect_attempts: u32,
    /// Most recent disconnect/error description, if any.
    last_error: Option<String>,
}
103
/// Volatility oracle fed by the Block Scholes WebSocket API.
pub struct BlockScholesVolOracle {
    /// Connection and subscription settings.
    config: BlockScholesVolOracleConfig,
    /// Surfaces, prices and connection flags, guarded by a read/write lock.
    state: Arc<RwLock<OracleState>>,
    /// Number of data messages processed so far.
    messages_received: AtomicU64,
}
113
114impl BlockScholesVolOracle {
115 pub fn new(config: BlockScholesVolOracleConfig) -> Self {
117 Self {
118 config,
119 state: Arc::new(RwLock::new(OracleState::default())),
120 messages_received: AtomicU64::new(0),
121 }
122 }
123
124 pub fn with_defaults() -> Self {
126 Self::new(BlockScholesVolOracleConfig::default())
127 }
128
    /// Returns the configuration this oracle was created with.
    pub fn config(&self) -> &BlockScholesVolOracleConfig {
        &self.config
    }
133
134 fn status_for(&self, symbol: &str) -> VolOracleStatus {
135 let state = self
136 .state
137 .read()
138 .expect("block scholes vol oracle state poisoned");
139 let last_update_ts_ms = state.last_update_timestamps.get(symbol).copied();
140 let staleness_seconds = last_update_ts_ms
141 .map(|ts| ((Utc::now().timestamp_millis() - ts) as f64 / 1000.0).max(0.0));
142 let surface_points = state
143 .surfaces
144 .get(symbol)
145 .map(VolatilitySurface::len)
146 .unwrap_or(0);
147 let ready = state.is_connected
148 && surface_points > 0
149 && staleness_seconds
150 .map(|age| age <= self.config.cache_ttl_ms as f64 / 1000.0)
151 .unwrap_or(false);
152
153 VolOracleStatus {
154 underlying: symbol.to_string(),
155 provider: VolProviderKind::BlockScholes,
156 route_facing: true,
157 connected: state.is_connected,
158 ready,
159 last_update_ts_ms,
160 staleness_seconds,
161 staleness_threshold_seconds: Some(self.config.cache_ttl_ms as f64 / 1000.0),
162 surface_points,
163 messages_received: self.messages_received.load(Ordering::Relaxed),
164 last_error: state.last_error.clone(),
165 }
166 }
167
168 async fn connect_and_run(self: &Arc<Self>) -> Result<()> {
170 let url = &self.config.ws_url;
171 info!("Connecting to Block Scholes WebSocket: {}", url);
172
173 let (ws_stream, _) = connect_async(url)
174 .await
175 .context("Failed to connect to Block Scholes WebSocket")?;
176
177 let (mut write, mut read) = ws_stream.split();
178
179 {
181 let mut state = self
182 .state
183 .write()
184 .expect("block scholes vol oracle state poisoned");
185 state.is_connected = true;
186 state.reconnect_attempts = 0;
187 state.last_error = None;
188 }
189
190 gauge!(
191 "ht_vol_oracle_connected",
192 "provider" => VolProviderKind::BlockScholes.as_str()
193 )
194 .set(1.0);
195
196 info!("Connected to Block Scholes WebSocket");
197
198 if let Some(ref api_key) = self.config.api_key {
200 let auth_msg = BlockScholesAuth::new(api_key, self.config.api_secret.clone());
201 let json = sonic_rs::to_string(&auth_msg)?;
202 write.send(Message::Text(json)).await?;
203 info!("Sent authentication request");
204 }
205
206 for symbol in &self.config.symbols {
209 for expiry in &self.config.expiries {
210 let delta_subscribe = BlockScholesSubscribe::delta_iv(
216 symbol.clone(),
217 expiry.clone(),
218 Some(format!("delta_{}_{}", symbol, expiry)),
219 );
220 let json = sonic_rs::to_string(&delta_subscribe)?;
221 write.send(Message::Text(json)).await?;
222 info!(
223 "Subscribed to delta.iv grid for {} at expiry {}",
224 symbol, expiry
225 );
226 }
227
228 let index_subscribe = BlockScholesSubscribe::index_price(
230 symbol.clone(),
231 Some(format!("index_{}", symbol)),
232 );
233 let json = sonic_rs::to_string(&index_subscribe)?;
234 write.send(Message::Text(json)).await?;
235 info!("Subscribed to index price for {}", symbol);
236 }
237
238 let mut heartbeat =
240 tokio::time::interval(Duration::from_millis(self.config.heartbeat_interval_ms));
241
242 let staleness_timeout = Duration::from_secs(30);
248 let mut last_data_message = Instant::now();
249
250 loop {
252 tokio::select! {
253 msg = read.next() => {
254 match msg {
255 Some(Ok(Message::Text(text))) => {
256 last_data_message = Instant::now();
257 self.handle_message(&text).await;
258 }
259 Some(Ok(Message::Binary(data))) => {
260 last_data_message = Instant::now();
261 if let Ok(text) = String::from_utf8(data) {
263 self.handle_message(&text).await;
264 }
265 }
266 Some(Ok(Message::Close(frame))) => {
267 info!("WebSocket closed by server: {:?}", frame);
268 break;
269 }
270 Some(Ok(Message::Ping(data))) => {
271 if let Err(e) = write.send(Message::Pong(data)).await {
272 error!("Failed to send pong: {}", e);
273 }
274 }
275 Some(Ok(Message::Pong(_))) => {
276 debug!("Received pong");
277 }
278 Some(Ok(Message::Frame(_))) => {
279 }
281 Some(Err(e)) => {
282 error!("WebSocket error: {}", e);
283 break;
284 }
285 None => {
286 info!("WebSocket stream ended");
287 break;
288 }
289 }
290 }
291 _ = heartbeat.tick() => {
292 if let Err(e) = write.send(Message::Ping(vec![])).await {
294 error!("Failed to send ping: {}", e);
295 break;
296 }
297 debug!("Sent heartbeat ping");
298
299 if last_data_message.elapsed() > staleness_timeout {
301 warn!(
302 "Block Scholes WebSocket stalled: no data messages for {:.0}s, forcing reconnect",
303 last_data_message.elapsed().as_secs_f64()
304 );
305 counter!("ht_vol_oracle_stale_reconnects_total", "provider" => VolProviderKind::BlockScholes.as_str()).increment(1);
306 break;
307 }
308 }
309 }
310 }
311
312 {
314 let mut state = self
315 .state
316 .write()
317 .expect("block scholes vol oracle state poisoned");
318 state.is_connected = false;
319 }
320
321 gauge!(
322 "ht_vol_oracle_connected",
323 "provider" => VolProviderKind::BlockScholes.as_str()
324 )
325 .set(0.0);
326
327 Ok(())
328 }
329
330 async fn handle_message(&self, text: &str) {
332 if let Ok(rpc) = sonic_rs::from_str::<super::blockscholes_types::JsonRpcResponse>(text) {
334 if let Some(ref err) = rpc.error {
335 error!(
336 "Block Scholes RPC error (code {}): {}",
337 err.code, err.message
338 );
339 return;
340 }
341
342 if rpc.is_success() {
343 if rpc.result == Some(sonic_rs::json!("ok")) {
345 info!("Authentication successful");
346 return;
347 }
348 debug!("RPC success: {:?}", rpc.result);
350 return;
351 }
352
353 if rpc.method.as_deref() == Some("subscription") {
355 if let Some(params) = rpc.params {
356 self.process_subscription_data(params).await;
357 }
358 return;
359 }
360 }
361
362 match sonic_rs::from_str::<BlockScholesMessage>(text) {
364 Ok(msg) => match msg {
365 BlockScholesMessage::VolSurfaceUpdate(update) => {
366 self.process_vol_surface_update(update).await;
367 }
368 BlockScholesMessage::AtmVolUpdate(update) => {
369 self.process_atm_vol_update(update).await;
370 }
371 BlockScholesMessage::Subscribed { channel, symbol } => {
372 info!("Confirmed subscription: {} for {}", channel, symbol);
373 }
374 BlockScholesMessage::Unsubscribed { channel, symbol } => {
375 info!("Confirmed unsubscription: {} for {}", channel, symbol);
376 }
377 BlockScholesMessage::Authenticated { message } => {
378 info!("Authentication successful: {:?}", message);
379 }
380 BlockScholesMessage::Error { message, code } => {
381 error!("Block Scholes error (code {:?}): {}", code, message);
382 }
383 BlockScholesMessage::Heartbeat { timestamp } => {
384 debug!("Received server heartbeat: {}", timestamp);
385 }
386 BlockScholesMessage::Info { message, version } => {
387 info!("Server info: {:?}, version: {:?}", message, version);
388 }
389 },
390 Err(e) => {
391 debug!("Failed to parse message: {} - {}", e, text);
392 }
393 }
394 }
395
396 async fn process_subscription_data(&self, params: sonic_rs::Value) {
399 let params_array = match params.as_array() {
400 Some(arr) => arr,
401 None => {
402 debug!("Subscription params is not an array: {:?}", params);
403 return;
404 }
405 };
406
407 let mut state = self
408 .state
409 .write()
410 .expect("block scholes vol oracle state poisoned");
411
412 for param in params_array {
413 let client_id = param
414 .get("client_id")
415 .and_then(|v| v.as_str())
416 .unwrap_or("");
417 let data = match param.get("data") {
418 Some(d) => d,
419 None => continue,
420 };
421
422 let timestamp = data.get("timestamp").and_then(|v| v.as_i64());
423 let values = match data.get("values").and_then(|v| v.as_array()) {
424 Some(v) => v,
425 None => continue,
426 };
427
428 for value in values {
429 let sid = value.get("sid").and_then(|v| v.as_str()).unwrap_or("");
430
431 let base_asset = if client_id.starts_with("delta_")
433 || client_id.starts_with("atm_")
434 || client_id.starts_with("strike_")
435 {
436 client_id.split('_').nth(1).unwrap_or("BTC")
437 } else if let Some(index_symbol) = client_id.strip_prefix("index_") {
438 index_symbol
439 } else {
440 sid.split('_').nth(1).unwrap_or("BTC")
442 };
443
444 let expiry_ts = client_id
447 .split('_')
448 .nth(2)
449 .and_then(|exp| chrono::DateTime::parse_from_rfc3339(exp).ok())
450 .map(|dt| dt.timestamp())
451 .unwrap_or_else(|| timestamp.unwrap_or(0) / 1000);
452
453 if let Some(strikes) = value.get("strike").and_then(|v| v.as_array()) {
455 if let Some(ivs) = value.get("v").and_then(|v| v.as_array()) {
456 let surface = state.surfaces.entry(base_asset.to_string()).or_default();
457
458 for (strike, iv) in strikes.iter().zip(ivs.iter()) {
459 if let (Some(s), Some(v)) = (strike.as_f64(), iv.as_f64()) {
460 surface.insert(s, expiry_ts, v);
461 debug!(
462 "Updated strike IV for {} strike={} expiry={}: {:.2}%",
463 base_asset,
464 s,
465 expiry_ts,
466 v * 100.0
467 );
468 }
469 }
470 info!(
471 "Updated {} strike IVs for {} at expiry {}",
472 strikes.len(),
473 base_asset,
474 expiry_ts
475 );
476 }
477 }
478 else if let Some(deltas) = value.get("delta").and_then(|v| v.as_array()) {
480 if let Some(ivs) = value.get("v").and_then(|v| v.as_array()) {
481 let surface = state.surfaces.entry(base_asset.to_string()).or_default();
482
483 let mut count = 0u32;
484 for (delta, iv) in deltas.iter().zip(ivs.iter()) {
485 if let (Some(d), Some(v)) = (delta.as_f64(), iv.as_f64()) {
486 surface.set_delta_iv(expiry_ts, d, v);
488 count += 1;
489
490 if (d - 0.5).abs() < 0.01 {
492 surface.set_atm_vol(expiry_ts, v);
493 }
494 }
495 }
496
497 if count > 0 {
498 debug!(
499 "Updated {} delta IVs for {} at expiry {}",
500 count, base_asset, expiry_ts
501 );
502 }
503 }
504 }
505 else if let Some(price) = value.get("v").and_then(|v| v.as_f64()) {
507 state.index_prices.insert(base_asset.to_string(), price);
508 debug!("Index price for {}: ${:.2}", base_asset, price);
509 continue;
512 }
513 if let Some(timestamp) = timestamp {
514 state
515 .last_update_timestamps
516 .insert(base_asset.to_string(), timestamp);
517 }
518 }
519 }
520
521 self.messages_received.fetch_add(1, Ordering::Relaxed);
522 counter!(
523 "ht_vol_oracle_messages_received_total",
524 "provider" => VolProviderKind::BlockScholes.as_str()
525 )
526 .increment(1);
527 }
528
529 async fn process_vol_surface_update(&self, update: VolSurfaceUpdateMessage) {
531 let mut state = self
532 .state
533 .write()
534 .expect("block scholes vol oracle state poisoned");
535
536 let surface = state.surfaces.entry(update.symbol.clone()).or_default();
537
538 for point in &update.points {
540 surface.insert(point.strike, point.expiry, point.iv);
541 }
542
543 for atm in &update.atm_vols {
545 surface.set_atm_vol(atm.expiry, atm.iv);
546 }
547
548 state
549 .last_update_timestamps
550 .insert(update.symbol.clone(), Utc::now().timestamp_millis());
551 self.messages_received.fetch_add(1, Ordering::Relaxed);
552 counter!(
553 "ht_vol_oracle_messages_received_total",
554 "provider" => VolProviderKind::BlockScholes.as_str(),
555 "underlying" => update.symbol.clone()
556 )
557 .increment(1);
558
559 debug!(
560 "Updated vol surface for {}: {} points, {} ATM vols",
561 update.symbol,
562 update.points.len(),
563 update.atm_vols.len()
564 );
565 }
566
567 async fn process_atm_vol_update(&self, update: AtmVolUpdateMessage) {
569 let mut state = self
570 .state
571 .write()
572 .expect("block scholes vol oracle state poisoned");
573
574 let surface = state.surfaces.entry(update.symbol.clone()).or_default();
575
576 for point in &update.points {
577 surface.set_atm_vol(point.expiry, point.iv);
578 }
579
580 state
581 .last_update_timestamps
582 .insert(update.symbol.clone(), Utc::now().timestamp_millis());
583 self.messages_received.fetch_add(1, Ordering::Relaxed);
584 counter!(
585 "ht_vol_oracle_messages_received_total",
586 "provider" => VolProviderKind::BlockScholes.as_str(),
587 "underlying" => update.symbol.clone()
588 )
589 .increment(1);
590
591 debug!(
592 "Updated ATM vols for {}: {} expiries",
593 update.symbol,
594 update.points.len()
595 );
596 }
597
598 pub async fn set_vol_for_testing(&self, symbol: &str, strike: f64, expiry: i64, iv: f64) {
602 let mut state = self
603 .state
604 .write()
605 .expect("block scholes vol oracle state poisoned");
606 let surface = state.surfaces.entry(symbol.to_string()).or_default();
607 surface.insert(strike, expiry, iv);
608 state
609 .last_update_timestamps
610 .insert(symbol.to_string(), Utc::now().timestamp_millis());
611 state.is_connected = true;
612 info!(
613 "Test mode: Set vol for {} strike={} expiry={} to {:.2}%",
614 symbol,
615 strike,
616 expiry,
617 iv * 100.0
618 );
619 }
620
621 pub async fn set_atm_vol_for_testing(&self, symbol: &str, expiry: i64, iv: f64) {
623 let mut state = self
624 .state
625 .write()
626 .expect("block scholes vol oracle state poisoned");
627 let surface = state.surfaces.entry(symbol.to_string()).or_default();
628 surface.set_atm_vol(expiry, iv);
629 state
630 .last_update_timestamps
631 .insert(symbol.to_string(), Utc::now().timestamp_millis());
632 state.is_connected = true;
633 info!(
634 "Test mode: Set ATM vol for {} expiry={} to {:.2}%",
635 symbol,
636 expiry,
637 iv * 100.0
638 );
639 }
640}
641
642#[async_trait]
643impl PollingVolOracle for BlockScholesVolOracle {
644 fn start_polling(self: Arc<Self>) -> JoinHandle<()> {
645 let oracle = Arc::clone(&self);
646
647 tokio::spawn(async move {
648 loop {
649 match oracle.connect_and_run().await {
650 Ok(()) => {
651 info!("WebSocket connection closed gracefully");
652 }
655 Err(e) => {
656 error!("WebSocket connection error: {}", e);
657 }
658 }
659
660 let should_stop = {
662 let mut state = oracle
663 .state
664 .write()
665 .expect("block scholes vol oracle state poisoned");
666 state.is_connected = false;
667 state.reconnect_attempts += 1;
668 state.last_error = Some("websocket disconnected".to_string());
669
670 oracle.config.max_reconnect_attempts > 0
671 && state.reconnect_attempts >= oracle.config.max_reconnect_attempts
672 };
673
674 if should_stop {
675 error!("Max reconnection attempts reached, stopping oracle");
676 break;
677 }
678
679 let attempts = oracle
680 .state
681 .read()
682 .expect("block scholes vol oracle state poisoned")
683 .reconnect_attempts;
684 info!(
685 "Reconnecting in {}ms (attempt {})",
686 oracle.config.reconnect_delay_ms, attempts
687 );
688
689 tokio::time::sleep(Duration::from_millis(oracle.config.reconnect_delay_ms)).await;
690 }
691 })
692 }
693
694 async fn get_vol(&self, symbol: &str, strike: f64, expiry: i64) -> Option<f64> {
695 let state = self
696 .state
697 .read()
698 .expect("block scholes vol oracle state poisoned");
699 let spot = state.index_prices.get(symbol).copied();
700 state
701 .surfaces
702 .get(symbol)
703 .and_then(|surface| surface.get_interpolated_with_spot(strike, expiry, spot))
704 }
705
706 async fn get_atm_vol(&self, symbol: &str, expiry: i64) -> Option<f64> {
707 let state = self
708 .state
709 .read()
710 .expect("block scholes vol oracle state poisoned");
711 state
712 .surfaces
713 .get(symbol)
714 .and_then(|surface| surface.get_atm(expiry))
715 }
716
717 async fn is_healthy(&self) -> bool {
718 let state = self
719 .state
720 .read()
721 .expect("block scholes vol oracle state poisoned");
722
723 if !state.is_connected {
725 return false;
726 }
727
728 if let Some(last_update) = state.last_update_timestamps.values().copied().max() {
730 let now = Utc::now().timestamp_millis();
731 let age = now - last_update;
732 return age < self.config.cache_ttl_ms as i64;
733 }
734
735 false
736 }
737
738 async fn last_update_timestamp(&self) -> Option<i64> {
739 self.state
740 .read()
741 .expect("block scholes vol oracle state poisoned")
742 .last_update_timestamps
743 .values()
744 .copied()
745 .max()
746 }
747}
748
749impl RiskVolOracle for BlockScholesVolOracle {
750 fn get_iv(&self, underlying: &str, strike: f64, expiry_ts: i64) -> Result<f64, VolLookupError> {
751 let status = self.status_for(underlying);
752 if !status.connected {
753 return Err(VolLookupError::UnhealthyProvider {
754 underlying: underlying.to_string(),
755 provider: VolProviderKind::BlockScholes,
756 reason: status
757 .last_error
758 .unwrap_or_else(|| "not connected".to_string()),
759 });
760 }
761
762 let state = self
767 .state
768 .read()
769 .expect("block scholes vol oracle state poisoned");
770 let spot = state.index_prices.get(underlying).copied();
771 let iv = state
772 .surfaces
773 .get(underlying)
774 .and_then(|surface| surface.get_interpolated_with_spot(strike, expiry_ts, spot))
775 .ok_or_else(|| VolLookupError::MissingSurface {
776 underlying: underlying.to_string(),
777 provider: VolProviderKind::BlockScholes,
778 strike,
779 expiry_ts,
780 })?;
781
782 debug!(
783 underlying,
784 strike,
785 expiry_ts,
786 iv,
787 spot = ?spot,
788 provider = VolProviderKind::BlockScholes.as_str(),
789 "Backend vol oracle value used"
790 );
791
792 Ok(iv)
793 }
794
795 fn statuses(&self) -> Vec<VolOracleStatus> {
796 self.config
797 .symbols
798 .iter()
799 .map(|symbol| self.status_for(symbol))
800 .collect()
801 }
802
803 fn get_surface_snapshot(&self, underlying: &str) -> Option<VolSurfaceSnapshot> {
804 let state = self
805 .state
806 .read()
807 .expect("block scholes vol oracle state poisoned");
808 let surface = state.surfaces.get(underlying)?;
809 Some(VolSurfaceSnapshot {
810 underlying: underlying.to_string(),
811 last_update_ts_ms: state.last_update_timestamps.get(underlying).copied(),
812 expiries: surface.expiries().iter().copied().collect(),
813 strike_points: surface.export_all_points(),
814 delta_curves: surface.export_delta_curves(),
815 atm_vols: surface.export_atm_vols(),
816 spot_price: state.index_prices.get(underlying).copied(),
817 })
818 }
819
    /// Always `true`: this oracle can export surface snapshots via
    /// `get_surface_snapshot`.
    fn supports_surface_snapshots(&self) -> bool {
        true
    }
823}
824
/// Oracle stand-in that answers every lookup with one fixed volatility.
pub struct MockVolOracle {
    /// Volatility returned for every symbol, strike and expiry.
    fixed_vol: f64,
}

impl MockVolOracle {
    /// Creates a mock that always reports `fixed_vol`.
    pub fn new(fixed_vol: f64) -> Self {
        MockVolOracle { fixed_vol }
    }
}
838
839#[async_trait]
840impl PollingVolOracle for MockVolOracle {
841 fn start_polling(self: Arc<Self>) -> JoinHandle<()> {
842 tokio::spawn(async {})
843 }
844
845 async fn get_vol(&self, _symbol: &str, _strike: f64, _expiry: i64) -> Option<f64> {
846 Some(self.fixed_vol)
847 }
848
849 async fn get_atm_vol(&self, _symbol: &str, _expiry: i64) -> Option<f64> {
850 Some(self.fixed_vol)
851 }
852
853 async fn is_healthy(&self) -> bool {
854 true
855 }
856
857 async fn last_update_timestamp(&self) -> Option<i64> {
858 Some(Utc::now().timestamp_millis())
859 }
860}
861
#[cfg(test)]
mod tests {
    use super::*;

    /// Earlier of the two expiry timestamps used throughout these tests.
    const NEAR_EXPIRY: i64 = 1735689600;
    /// Later of the two expiry timestamps used throughout these tests.
    const FAR_EXPIRY: i64 = 1735776000;

    /// The default configuration carries the documented defaults.
    #[test]
    fn test_config_default() {
        let defaults = BlockScholesVolOracleConfig::default();
        assert_eq!(defaults.symbols, vec!["BTC", "ETH"]);
        assert_eq!(defaults.reconnect_delay_ms, DEFAULT_RECONNECT_DELAY_MS);
        assert_eq!(defaults.max_reconnect_attempts, 0);
        assert!(defaults.api_key.is_none());
    }

    /// `with_defaults` wires the default configuration into the oracle.
    #[test]
    fn test_oracle_creation() {
        let oracle = BlockScholesVolOracle::with_defaults();
        assert_eq!(oracle.config().symbols, vec!["BTC", "ETH"]);
    }

    /// An injected strike point is returned exactly.
    #[tokio::test]
    async fn test_set_vol_for_testing() {
        let oracle = BlockScholesVolOracle::with_defaults();

        oracle
            .set_vol_for_testing("BTC", 100000.0, NEAR_EXPIRY, 0.75)
            .await;

        let looked_up = oracle.get_vol("BTC", 100000.0, NEAR_EXPIRY).await;
        assert_eq!(looked_up, Some(0.75));
    }

    /// An injected ATM vol is returned exactly.
    #[tokio::test]
    async fn test_set_atm_vol_for_testing() {
        let oracle = BlockScholesVolOracle::with_defaults();

        oracle
            .set_atm_vol_for_testing("ETH", NEAR_EXPIRY, 0.80)
            .await;

        let looked_up = oracle.get_atm_vol("ETH", NEAR_EXPIRY).await;
        assert_eq!(looked_up, Some(0.80));
    }

    /// A fresh oracle has no data to return.
    #[tokio::test]
    async fn test_get_vol_not_found() {
        let oracle = BlockScholesVolOracle::with_defaults();

        let looked_up = oracle.get_vol("BTC", 100000.0, NEAR_EXPIRY).await;
        assert_eq!(looked_up, None);
    }

    /// A fresh oracle is not connected and therefore not healthy.
    #[tokio::test]
    async fn test_is_healthy_when_not_connected() {
        let oracle = BlockScholesVolOracle::with_defaults();

        assert!(!oracle.is_healthy().await);
    }

    /// Injecting test data marks the oracle connected and fresh.
    #[tokio::test]
    async fn test_is_healthy_after_test_data() {
        let oracle = BlockScholesVolOracle::with_defaults();

        oracle
            .set_vol_for_testing("BTC", 100000.0, NEAR_EXPIRY, 0.75)
            .await;

        assert!(oracle.is_healthy().await);
    }

    /// The mock answers every query with its fixed value.
    #[tokio::test]
    async fn test_mock_vol_oracle() {
        let mock = MockVolOracle::new(0.65);

        assert_eq!(mock.get_vol("ANY", 0.0, 0).await, Some(0.65));
        assert_eq!(mock.get_atm_vol("ANY", 0).await, Some(0.65));
        assert!(mock.is_healthy().await);
        assert!(mock.last_update_timestamp().await.is_some());
    }

    /// A query between four grid corners yields a value inside their range.
    #[tokio::test]
    async fn test_interpolation_through_oracle() {
        let oracle = BlockScholesVolOracle::with_defaults();

        let corners = [
            (40000.0, NEAR_EXPIRY, 0.70),
            (40000.0, FAR_EXPIRY, 0.75),
            (60000.0, NEAR_EXPIRY, 0.80),
            (60000.0, FAR_EXPIRY, 0.85),
        ];
        for (strike, expiry, iv) in corners {
            oracle.set_vol_for_testing("BTC", strike, expiry, iv).await;
        }

        let mid_expiry = (NEAR_EXPIRY + FAR_EXPIRY) / 2;
        let vol = oracle.get_vol("BTC", 50000.0, mid_expiry).await;

        assert!(vol.is_some());
        let v = vol.unwrap();
        assert!(v > 0.70 && v < 0.90, "Expected ~0.775, got {}", v);
    }

    /// With only ATM vols stored, strike lookups fall back to ATM values.
    #[tokio::test]
    async fn test_atm_fallback_when_no_strikes() {
        let oracle = BlockScholesVolOracle::with_defaults();

        oracle
            .set_atm_vol_for_testing("BTC", NEAR_EXPIRY, 0.65)
            .await;
        oracle
            .set_atm_vol_for_testing("BTC", FAR_EXPIRY, 0.70)
            .await;

        // Exact expiry: the stored ATM vol comes back unchanged.
        let vol = oracle.get_vol("BTC", 95000.0, NEAR_EXPIRY).await;
        assert_eq!(vol, Some(0.65), "Should fall back to ATM vol");

        // Between expiries: the result lies between the two ATM vols.
        let mid = (NEAR_EXPIRY + FAR_EXPIRY) / 2;
        let vol = oracle.get_vol("BTC", 95000.0, mid).await;
        assert!(vol.is_some());
        let v = vol.unwrap();
        assert!(
            v > 0.64 && v < 0.71,
            "Expected interpolated ATM ~0.675, got {}",
            v
        );
    }
}
998