1use std::collections::HashMap;
4use std::sync::atomic::{AtomicU64, Ordering};
5use std::sync::{Arc, RwLock};
6use std::time::Duration;
7
8use anyhow::{Context, Result};
9use chrono::{Datelike, NaiveDate, Utc};
10use metrics::{counter, gauge};
11use reqwest::Client;
12use serde::Deserialize;
13use tokio::task::JoinHandle;
14use tracing::{debug, error, info, warn};
15
16use super::risk_oracle::{
17 RiskVolOracle, VolLookupError, VolOracleStatus, VolProviderKind, VolSurfaceSnapshot,
18};
19use super::vol_surface_cache::VolatilitySurface;
20
/// Default Polygon REST base URL.
///
/// NOTE(review): nothing in this file reads the constant; presumably callers use
/// it to populate `PolygonVolOracleConfig::base_url` — confirm at the call site.
pub const DEFAULT_POLYGON_BASE_URL: &str = "https://api.polygon.io";
22
/// Static configuration for [`PolygonVolOracle`].
#[derive(Debug, Clone)]
pub struct PolygonVolOracleConfig {
    /// Sent as the `apiKey` query parameter on every snapshot request.
    pub api_key: String,
    /// Prefix of the request URL; `/v3/snapshot/options/{ticker}` is appended to it.
    pub base_url: String,
    /// Period of the polling timer created in `start_polling`.
    pub poll_interval: Duration,
    /// Maximum age of the newest source timestamp for a surface to be reported `ready`.
    pub staleness_threshold: Duration,
    /// Per-underlying settings. The map key is the name used to look up platform
    /// spots and to key every per-underlying map in the oracle state.
    pub underlyings: HashMap<String, PolygonUnderlyingConfig>,
}
31
/// Settings for one underlying served by the Polygon oracle.
#[derive(Debug, Clone)]
pub struct PolygonUnderlyingConfig {
    /// Ticker placed in the snapshot URL path (`/v3/snapshot/options/{ticker}`).
    pub ticker: String,
    /// Static strike multiplier, used only when neither a live nor a cached
    /// dynamic scale is available.
    pub strike_scale: f64,
    /// When `true`, `compute_strike_scale` returns an error instead of falling
    /// back to a cached or static scale if live spot inputs are missing.
    pub require_live_strike_scale: bool,
}
39
/// Shared map from underlying name to a spot price. This file only ever reads
/// it (to derive the strike scale, sanitize the surface and look up ATM vol).
/// NOTE(review): the writer lives outside this file.
pub type PlatformSpotPrices = Arc<RwLock<HashMap<String, f64>>>;
43
/// Mutable oracle state; every map is keyed by underlying name.
#[derive(Debug, Default)]
struct PolygonSurfaceState {
    /// Surface stored by the most recent successful refresh.
    surfaces: HashMap<String, VolatilitySurface>,
    /// `true` after a successful refresh; set to `false` only when a refresh
    /// fails and no surface has ever been stored for the underlying.
    connected: HashMap<String, bool>,
    /// Newest source market timestamp (milliseconds) among the points of the
    /// last successful refresh.
    last_update_ts_ms: HashMap<String, i64>,
    /// Message of the most recent refresh failure; cleared on success.
    last_error: HashMap<String, String>,
    /// Last positive underlying-asset price reported by Polygon. Written on
    /// refresh; not read anywhere in this file.
    etf_spots: HashMap<String, f64>,
    /// Platform spot that produced the dynamic strike scale for the stored
    /// surface; removed when the surface was built from a cached or static scale.
    surface_spots: HashMap<String, f64>,
    /// Last `platform_spot / etf_spot` ratio, reused when live spots are unavailable.
    last_dynamic_scale: HashMap<String, f64>,
}
59
/// One page of the options snapshot response. Every field is optional and
/// defaults to `None` when absent from the JSON.
#[derive(Debug, Deserialize)]
struct PolygonResponse {
    /// Response status string; `fetch_surface` rejects anything other than `"OK"`.
    #[serde(default)]
    status: Option<String>,
    /// Snapshots on this page; a missing list is treated as empty.
    #[serde(default)]
    results: Option<Vec<PolygonSnapshot>>,
    /// URL of the next page; paging stops when it is absent.
    #[serde(default)]
    next_url: Option<String>,
}
69
/// One snapshot entry from the response. All fields are optional; entries
/// missing details, strike, expiry, implied volatility or a usable timestamp
/// are skipped when the surface is built.
#[derive(Debug, Deserialize)]
struct PolygonSnapshot {
    /// Strike and expiration date.
    #[serde(default)]
    details: Option<PolygonDetails>,
    /// Implied volatility; only values in `[0.01, 5.0)` are accepted.
    #[serde(default)]
    implied_volatility: Option<f64>,
    /// Underlying asset block; its `price` supplies the ETF spot.
    #[serde(default)]
    underlying_asset: Option<PolygonUnderlyingAsset>,
    /// Timestamps attached to the last quote.
    #[serde(default)]
    last_quote: Option<PolygonMarketTimestamp>,
    /// Timestamps attached to the last trade.
    #[serde(default)]
    last_trade: Option<PolygonMarketTimestamp>,
    /// Raw `fmv_last_updated` timestamp, normalised through `polygon_timestamp_to_ms`.
    #[serde(default)]
    fmv_last_updated: Option<i64>,
}
85
/// Contract details of a snapshot entry.
#[derive(Debug, Deserialize)]
struct PolygonDetails {
    /// Strike as quoted by Polygon; multiplied by the strike scale before it
    /// is inserted into the surface.
    #[serde(default)]
    strike_price: Option<f64>,
    /// Expiration date, parsed with the `%Y-%m-%d` format.
    #[serde(default)]
    expiration_date: Option<String>,
}
93
/// Underlying-asset block of a snapshot entry.
#[derive(Debug, Deserialize)]
struct PolygonUnderlyingAsset {
    /// Underlying price; only strictly positive values are used as the ETF spot.
    #[serde(default)]
    price: Option<f64>,
}
99
/// Raw timestamps attached to a quote or trade. Each is normalised through
/// `polygon_timestamp_to_ms`, and the newest convertible one is used.
#[derive(Debug, Deserialize)]
struct PolygonMarketTimestamp {
    #[serde(default)]
    last_updated: Option<i64>,
    #[serde(default)]
    sip_timestamp: Option<i64>,
    #[serde(default)]
    participant_timestamp: Option<i64>,
}
109
110impl PolygonSnapshot {
111 fn source_update_ts_ms(&self) -> Option<i64> {
112 [
113 self.fmv_last_updated.and_then(polygon_timestamp_to_ms),
114 self.last_quote
115 .as_ref()
116 .and_then(PolygonMarketTimestamp::source_update_ts_ms),
117 self.last_trade
118 .as_ref()
119 .and_then(PolygonMarketTimestamp::source_update_ts_ms),
120 ]
121 .into_iter()
122 .flatten()
123 .max()
124 }
125}
126
127impl PolygonMarketTimestamp {
128 fn source_update_ts_ms(&self) -> Option<i64> {
129 [
130 self.last_updated,
131 self.sip_timestamp,
132 self.participant_timestamp,
133 ]
134 .into_iter()
135 .flatten()
136 .filter_map(polygon_timestamp_to_ms)
137 .max()
138 }
139}
140
/// Normalises a raw Polygon timestamp to epoch milliseconds by magnitude.
///
/// * `>= 10^15` — treated as nanoseconds and divided by 1,000,000.
/// * `>= 10^12` and `< 10^15` — taken to be milliseconds already.
/// * anything smaller, including zero and negatives — rejected with `None`.
fn polygon_timestamp_to_ms(timestamp: i64) -> Option<i64> {
    const MIN_MILLIS: i64 = 1_000_000_000_000;
    const MIN_NANOS: i64 = 1_000_000_000_000_000;
    const NANOS_PER_MILLI: i64 = 1_000_000;

    match timestamp {
        nanos if nanos >= MIN_NANOS => Some(nanos / NANOS_PER_MILLI),
        millis if millis >= MIN_MILLIS => Some(millis),
        // Covers non-positive values as well as second-resolution timestamps.
        _ => None,
    }
}
156
/// Volatility oracle that polls Polygon option snapshots over HTTP and serves
/// the resulting surfaces through `RiskVolOracle`.
pub struct PolygonVolOracle {
    /// HTTP client used for every snapshot request.
    client: Client,
    /// Immutable configuration supplied at construction.
    config: PolygonVolOracleConfig,
    /// Per-underlying surfaces and health bookkeeping.
    state: Arc<RwLock<PolygonSurfaceState>>,
    /// Number of successful surface refreshes, summed over all underlyings.
    messages_received: AtomicU64,
    /// Shared platform spot prices; this type only reads them.
    platform_spots: PlatformSpotPrices,
}
166
167impl PolygonVolOracle {
168 pub fn new(config: PolygonVolOracleConfig, platform_spots: PlatformSpotPrices) -> Self {
169 Self {
170 client: Client::new(),
171 config,
172 state: Arc::new(RwLock::new(PolygonSurfaceState::default())),
173 messages_received: AtomicU64::new(0),
174 platform_spots,
175 }
176 }
177
178 pub fn start_polling(self: Arc<Self>) -> JoinHandle<()> {
179 tokio::spawn(async move {
180 let mut interval = tokio::time::interval(self.config.poll_interval);
181 loop {
182 interval.tick().await;
183 if let Err(err) = self.refresh_all().await {
184 error!("Polygon vol oracle refresh failed: {err:#}");
185 }
186 }
187 })
188 }
189
190 fn compute_strike_scale(
196 &self,
197 underlying: &str,
198 etf_spot: Option<f64>,
199 config: &PolygonUnderlyingConfig,
200 ) -> Result<(f64, Option<f64>)> {
201 let platform_spot = self
202 .platform_spots
203 .read()
204 .expect("platform_spots poisoned")
205 .get(underlying)
206 .copied();
207
208 if let (Some(ps), Some(es)) = (platform_spot, etf_spot) {
209 if ps > 0.0 && es > 0.0 {
210 let dynamic = ps / es;
211 debug!(
212 underlying,
213 platform_spot = ps,
214 etf_spot = es,
215 dynamic_scale = dynamic,
216 "Using dynamic strike scale"
217 );
218 self.state
220 .write()
221 .expect("polygon vol oracle state poisoned")
222 .last_dynamic_scale
223 .insert(underlying.to_string(), dynamic);
224 return Ok((dynamic, Some(ps)));
225 }
226 }
227
228 if config.require_live_strike_scale {
229 anyhow::bail!(
230 "Missing live strike scale inputs for {}: platform_spot={:?}, etf_spot={:?}",
231 underlying,
232 platform_spot,
233 etf_spot
234 );
235 }
236
237 let cached = self
239 .state
240 .read()
241 .expect("polygon vol oracle state poisoned")
242 .last_dynamic_scale
243 .get(underlying)
244 .copied();
245
246 if let Some(scale) = cached {
247 debug!(
248 underlying,
249 cached_scale = scale,
250 "Using cached dynamic strike scale (live spots unavailable)"
251 );
252 return Ok((scale, None));
253 }
254
255 debug!(
256 underlying,
257 fallback_scale = config.strike_scale,
258 "Using static fallback strike scale (no dynamic history)"
259 );
260 Ok((config.strike_scale, None))
261 }
262
263 async fn refresh_all(&self) -> Result<()> {
264 let underlyings = self
265 .config
266 .underlyings
267 .iter()
268 .map(|(underlying, config)| (underlying.clone(), config.clone()))
269 .collect::<Vec<_>>();
270
271 for (underlying, config) in underlyings {
272 match self.fetch_surface(&underlying, &config).await {
273 Ok((surface, etf_spot, platform_spot, source_update_ts_ms)) => {
274 let point_count = surface.len();
275 {
276 let mut state = self
277 .state
278 .write()
279 .expect("polygon vol oracle state poisoned");
280 state.connected.insert(underlying.clone(), true);
281 state
282 .last_update_ts_ms
283 .insert(underlying.clone(), source_update_ts_ms);
284 state.last_error.remove(&underlying);
285 if let Some(spot) = etf_spot {
286 state.etf_spots.insert(underlying.clone(), spot);
287 }
288 if let Some(spot) = platform_spot {
289 state.surface_spots.insert(underlying.clone(), spot);
290 } else {
291 state.surface_spots.remove(&underlying);
292 }
293 state.surfaces.insert(underlying.clone(), surface);
294 }
295 self.messages_received.fetch_add(1, Ordering::Relaxed);
296 counter!(
297 "ht_vol_oracle_messages_received_total",
298 "provider" => VolProviderKind::Polygon.as_str(),
299 "underlying" => underlying.clone()
300 )
301 .increment(1);
302
303 self.check_atm_vol_sanity(&underlying);
305
306 info!(
307 "Updated Polygon vol surface for {} with {} points",
308 underlying, point_count
309 );
310 }
311 Err(err) => {
312 let message = err.to_string();
313 let has_existing = self
314 .state
315 .read()
316 .expect("polygon vol oracle state poisoned")
317 .surfaces
318 .contains_key(&underlying);
319 if has_existing {
320 warn!(
321 "Polygon vol surface refresh failed for {} (keeping last good data): {}",
322 underlying, message
323 );
324 } else {
325 warn!(
326 "Polygon vol surface refresh failed for {}: {}",
327 underlying, message
328 );
329 let mut state = self
330 .state
331 .write()
332 .expect("polygon vol oracle state poisoned");
333 state.connected.insert(underlying.clone(), false);
334 }
335 let mut state = self
336 .state
337 .write()
338 .expect("polygon vol oracle state poisoned");
339 state.last_error.insert(underlying, message);
340 }
341 }
342 }
343
344 Ok(())
345 }
346
347 async fn fetch_surface(
349 &self,
350 underlying: &str,
351 config: &PolygonUnderlyingConfig,
352 ) -> Result<(VolatilitySurface, Option<f64>, Option<f64>, i64)> {
353 let mut raw_points: Vec<(f64, i64, f64, i64)> = Vec::new();
354 let mut etf_spot: Option<f64> = None;
355 let mut source_update_ts_ms: Option<i64> = None;
356 let mut url = format!(
357 "{}/v3/snapshot/options/{}?limit=250&apiKey={}",
358 self.config.base_url, config.ticker, self.config.api_key
359 );
360 let now_ts = Utc::now().timestamp();
361 let mut page = 0_u8;
362
363 while page < 6 {
364 page += 1;
365 let response = self
366 .client
367 .get(&url)
368 .send()
369 .await
370 .with_context(|| format!("Failed to fetch Polygon page {page} for {underlying}"))?
371 .error_for_status()
372 .with_context(|| {
373 format!("Polygon returned non-success status on page {page} for {underlying}")
374 })?
375 .json::<PolygonResponse>()
376 .await
377 .with_context(|| {
378 format!("Failed to decode Polygon response on page {page} for {underlying}")
379 })?;
380
381 if response.status.as_deref() != Some("OK") {
382 anyhow::bail!(
383 "Polygon response status was {:?} on page {} for {}",
384 response.status,
385 page,
386 underlying
387 );
388 }
389
390 for snapshot in response.results.unwrap_or_default() {
391 if etf_spot.is_none() {
393 if let Some(ref ua) = snapshot.underlying_asset {
394 if let Some(price) = ua.price {
395 if price > 0.0 {
396 etf_spot = Some(price);
397 }
398 }
399 }
400 }
401
402 let Some(details) = snapshot.details.as_ref() else {
403 continue;
404 };
405 let Some(strike) = details.strike_price else {
406 continue;
407 };
408 let Some(expiration_date) = details.expiration_date.as_deref() else {
409 continue;
410 };
411 let Some(iv) = snapshot.implied_volatility else {
412 continue;
413 };
414 if !(0.01..5.0).contains(&iv) {
415 continue;
416 }
417 let expiry_date = NaiveDate::parse_from_str(&expiration_date, "%Y-%m-%d")
418 .with_context(|| {
419 format!("Invalid Polygon expiry {expiration_date} for {underlying}")
420 })?;
421 let expiry_code = expiry_date.year() as u64 * 10_000
422 + expiry_date.month() as u64 * 100
423 + expiry_date.day() as u64;
424 let expiry_ts =
425 hypercall_types::expiry_date_to_timestamp_checked(underlying, expiry_code)
426 .with_context(|| {
427 format!("Invalid Polygon expiry {expiration_date} for {underlying}")
428 })? as i64;
429 if expiry_ts <= now_ts {
430 continue;
431 }
432 let Some(point_update_ts_ms) = snapshot.source_update_ts_ms() else {
433 continue;
434 };
435 source_update_ts_ms = Some(
436 source_update_ts_ms
437 .unwrap_or(point_update_ts_ms)
438 .max(point_update_ts_ms),
439 );
440
441 raw_points.push((strike, expiry_ts, iv, point_update_ts_ms));
442 }
443
444 let Some(next_url) = response.next_url else {
445 break;
446 };
447 url = if next_url.contains("apiKey=") {
448 next_url
449 } else {
450 format!("{}&apiKey={}", next_url, self.config.api_key)
451 };
452 }
453
454 if raw_points.is_empty() {
455 anyhow::bail!(
456 "Polygon returned no usable surface points for {underlying} (market may be closed)"
457 );
458 }
459 let source_update_ts_ms = source_update_ts_ms.ok_or_else(|| {
460 anyhow::anyhow!(
461 "Polygon returned no source market timestamps for usable surface points for {underlying}"
462 )
463 })?;
464
465 let (scale, platform_spot_at_build) =
467 self.compute_strike_scale(underlying, etf_spot, config)?;
468
469 let mut surface = VolatilitySurface::new();
470 for (strike, expiry_ts, iv, _point_update_ts_ms) in raw_points {
471 surface.insert(strike * scale, expiry_ts, iv);
472 }
473
474 let platform_spot = self
482 .platform_spots
483 .read()
484 .expect("platform_spots poisoned")
485 .get(underlying)
486 .copied();
487 if let Some(spot) = platform_spot {
488 let clamps = surface.sanitize_arb_free(spot, 0.0);
489 if clamps > 0 {
490 counter!(
491 "ht_vol_surface_arb_clamps_total",
492 "provider" => VolProviderKind::Polygon.as_str(),
493 "underlying" => underlying.to_string()
494 )
495 .increment(clamps as u64);
496 info!(
497 underlying,
498 clamps, "Clamped non-arb-free vol surface points (polygon)"
499 );
500 }
501 }
502
503 Ok((
504 surface,
505 etf_spot,
506 platform_spot_at_build,
507 source_update_ts_ms,
508 ))
509 }
510
511 fn check_atm_vol_sanity(&self, underlying: &str) {
513 let atm_gauge = gauge!(
514 "ht_vol_oracle_atm_iv",
515 "provider" => VolProviderKind::Polygon.as_str(),
516 "underlying" => underlying.to_string()
517 );
518
519 let state = self
520 .state
521 .read()
522 .expect("polygon vol oracle state poisoned");
523 let Some(surface) = state.surfaces.get(underlying) else {
524 atm_gauge.set(-1.0);
525 return;
526 };
527
528 let now_ts = Utc::now().timestamp();
529 let nearest_expiry = surface.expiries().iter().find(|&&e| e > now_ts).copied();
530 let Some(expiry) = nearest_expiry else {
531 atm_gauge.set(-1.0);
532 return;
533 };
534
535 let platform_spot = self
536 .platform_spots
537 .read()
538 .expect("platform_spots poisoned")
539 .get(underlying)
540 .copied();
541 let Some(spot) = platform_spot else {
542 atm_gauge.set(-1.0);
543 return;
544 };
545
546 if let Some(atm_iv) = surface.get_interpolated(spot, expiry) {
547 atm_gauge.set(atm_iv);
548
549 if atm_iv > 2.0 {
550 warn!(
551 underlying,
552 atm_iv,
553 expiry,
554 "Polygon ATM vol exceeds 200% — possible strike scale mismatch or stale data"
555 );
556 }
557 }
558 }
559
560 fn status_for(&self, underlying: &str) -> VolOracleStatus {
561 let state = self
562 .state
563 .read()
564 .expect("polygon vol oracle state poisoned");
565 let last_update_ts_ms = state.last_update_ts_ms.get(underlying).copied();
566 let staleness_seconds = last_update_ts_ms
567 .map(|ts| ((Utc::now().timestamp_millis() - ts) as f64 / 1000.0).max(0.0));
568 let ready = staleness_seconds
569 .map(|age| age <= self.config.staleness_threshold.as_secs_f64())
570 .unwrap_or(false);
571 let surface_points = state
572 .surfaces
573 .get(underlying)
574 .map(VolatilitySurface::len)
575 .unwrap_or(0);
576
577 VolOracleStatus {
578 underlying: underlying.to_string(),
579 provider: VolProviderKind::Polygon,
580 route_facing: true,
581 connected: state.connected.get(underlying).copied().unwrap_or(false),
582 ready,
583 last_update_ts_ms,
584 staleness_seconds,
585 staleness_threshold_seconds: Some(self.config.staleness_threshold.as_secs_f64()),
586 surface_points,
587 messages_received: self.messages_received.load(Ordering::Relaxed),
588 last_error: state.last_error.get(underlying).cloned(),
589 }
590 }
591}
592
593impl RiskVolOracle for PolygonVolOracle {
594 fn get_iv(&self, underlying: &str, strike: f64, expiry_ts: i64) -> Result<f64, VolLookupError> {
595 let status = self.status_for(underlying);
596 if !status.connected {
597 return Err(VolLookupError::UnhealthyProvider {
598 underlying: underlying.to_string(),
599 provider: VolProviderKind::Polygon,
600 reason: status
601 .last_error
602 .unwrap_or_else(|| "not connected".to_string()),
603 });
604 }
605
606 if !status.ready {
607 return Err(VolLookupError::StaleSurface {
608 underlying: underlying.to_string(),
609 provider: VolProviderKind::Polygon,
610 staleness_seconds: status.staleness_seconds.unwrap_or(f64::INFINITY),
611 threshold_seconds: self.config.staleness_threshold.as_secs_f64(),
612 });
613 }
614
615 let state = self
616 .state
617 .read()
618 .expect("polygon vol oracle state poisoned");
619 let iv = state
620 .surfaces
621 .get(underlying)
622 .and_then(|surface| surface.get_interpolated(strike, expiry_ts))
623 .ok_or_else(|| VolLookupError::MissingSurface {
624 underlying: underlying.to_string(),
625 provider: VolProviderKind::Polygon,
626 strike,
627 expiry_ts,
628 })?;
629
630 debug!(
631 underlying,
632 strike,
633 expiry_ts,
634 iv,
635 provider = VolProviderKind::Polygon.as_str(),
636 "Backend vol oracle value used"
637 );
638
639 Ok(iv)
640 }
641
642 fn statuses(&self) -> Vec<VolOracleStatus> {
643 self.config
644 .underlyings
645 .keys()
646 .map(|underlying| self.status_for(underlying))
647 .collect()
648 }
649
650 fn get_surface_snapshot(&self, underlying: &str) -> Option<VolSurfaceSnapshot> {
651 let state = self
652 .state
653 .read()
654 .expect("polygon vol oracle state poisoned");
655 let surface = state.surfaces.get(underlying)?;
656 Some(VolSurfaceSnapshot {
657 underlying: underlying.to_string(),
658 last_update_ts_ms: state.last_update_ts_ms.get(underlying).copied(),
659 expiries: surface.expiries().iter().copied().collect(),
660 strike_points: surface.export_all_points(),
661 delta_curves: surface.export_delta_curves(),
662 atm_vols: surface.export_atm_vols(),
663 spot_price: state.surface_spots.get(underlying).copied(),
667 })
668 }
669
/// Always `true`: this provider implements `get_surface_snapshot`.
fn supports_surface_snapshots(&self) -> bool {
    true
}
673}
674
#[cfg(test)]
mod tests {
    use super::*;

    /// The newest of the fmv, quote and trade timestamps wins, after each is
    /// converted from nanoseconds to milliseconds.
    #[test]
    fn source_update_ts_uses_market_data_timestamps() {
        let snapshot = PolygonSnapshot {
            details: None,
            implied_volatility: None,
            underlying_asset: None,
            last_quote: Some(PolygonMarketTimestamp {
                last_updated: Some(1_700_000_000_123_456_789),
                sip_timestamp: None,
                participant_timestamp: None,
            }),
            last_trade: Some(PolygonMarketTimestamp {
                last_updated: None,
                // Largest of the three candidates, so it decides the result.
                sip_timestamp: Some(1_700_000_001_000_000_000),
                participant_timestamp: None,
            }),
            fmv_last_updated: Some(1_699_999_999_000_000_000),
        };

        assert_eq!(snapshot.source_update_ts_ms(), Some(1_700_000_001_000));
    }

    /// Zero, negative and absent timestamps all yield no source timestamp.
    #[test]
    fn source_update_ts_rejects_missing_or_invalid_timestamps() {
        let snapshot = PolygonSnapshot {
            details: None,
            implied_volatility: None,
            underlying_asset: None,
            last_quote: Some(PolygonMarketTimestamp {
                last_updated: Some(0),
                sip_timestamp: Some(-1),
                participant_timestamp: None,
            }),
            last_trade: None,
            fmv_last_updated: None,
        };

        assert_eq!(snapshot.source_update_ts_ms(), None);
    }

    /// The exported snapshot reports the spot stored with the surface, even
    /// after the shared platform spot has moved.
    #[test]
    fn snapshot_uses_surface_build_spot_not_current_platform_spot() {
        let platform_spots = Arc::new(RwLock::new(HashMap::from([("OIL".to_string(), 120.0)])));
        let oracle = PolygonVolOracle::new(
            PolygonVolOracleConfig {
                api_key: "test".to_string(),
                // No request is issued by this test; state is seeded directly below.
                base_url: "http://127.0.0.1:9".to_string(),
                poll_interval: Duration::from_secs(60),
                staleness_threshold: Duration::from_secs(180),
                underlyings: HashMap::from([(
                    "OIL".to_string(),
                    PolygonUnderlyingConfig {
                        ticker: "OIL".to_string(),
                        strike_scale: 1.0,
                        require_live_strike_scale: true,
                    },
                )]),
            },
            platform_spots.clone(),
        );
        let expiry = Utc::now().timestamp() + 30 * 86_400;
        let mut surface = VolatilitySurface::new();
        surface.insert(100.0, expiry, 0.50);
        {
            // Seed a surface together with its build-time spot of 100.0.
            let mut state = oracle
                .state
                .write()
                .expect("polygon vol oracle state poisoned");
            state.surfaces.insert("OIL".to_string(), surface);
            state
                .last_update_ts_ms
                .insert("OIL".to_string(), Utc::now().timestamp_millis());
            state.surface_spots.insert("OIL".to_string(), 100.0);
        }
        // Move the live platform spot away from the build-time value.
        platform_spots
            .write()
            .expect("platform_spots poisoned")
            .insert("OIL".to_string(), 140.0);

        let snapshot = oracle
            .get_surface_snapshot("OIL")
            .expect("snapshot should exist");
        assert_eq!(snapshot.spot_price, Some(100.0));
    }
}