Skip to main content

hypercall_vol_oracle/
polygon_oracle.rs

1// TODO(clients): Move Polygon HTTP transport and wire models into a dedicated client crate.
2
3use std::collections::HashMap;
4use std::sync::atomic::{AtomicU64, Ordering};
5use std::sync::{Arc, RwLock};
6use std::time::Duration;
7
8use anyhow::{Context, Result};
9use chrono::{Datelike, NaiveDate, Utc};
10use metrics::{counter, gauge};
11use reqwest::Client;
12use serde::Deserialize;
13use tokio::task::JoinHandle;
14use tracing::{debug, error, info, warn};
15
16use super::risk_oracle::{
17    RiskVolOracle, VolLookupError, VolOracleStatus, VolProviderKind, VolSurfaceSnapshot,
18};
19use super::vol_surface_cache::VolatilitySurface;
20
/// Base URL of the production Polygon REST API.
pub const DEFAULT_POLYGON_BASE_URL: &str = "https://api.polygon.io";

/// Runtime configuration for the Polygon volatility oracle.
///
/// `Debug` is implemented by hand (not derived) so that `api_key` is never
/// written to logs or panic messages when the config is formatted with `{:?}`.
#[derive(Clone)]
pub struct PolygonVolOracleConfig {
    /// Polygon API key; appended to request URLs as the `apiKey` query parameter.
    pub api_key: String,
    /// URL prefix that `/v3/snapshot/options/{ticker}` is appended to.
    pub base_url: String,
    /// Period of the background polling task.
    pub poll_interval: Duration,
    /// Maximum age of a surface's source timestamp before it is reported as not ready.
    pub staleness_threshold: Duration,
    /// Per-underlying settings, keyed by the platform underlying name.
    pub underlyings: HashMap<String, PolygonUnderlyingConfig>,
}

impl std::fmt::Debug for PolygonVolOracleConfig {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("PolygonVolOracleConfig")
            // Never print the secret itself.
            .field("api_key", &"<redacted>")
            .field("base_url", &self.base_url)
            .field("poll_interval", &self.poll_interval)
            .field("staleness_threshold", &self.staleness_threshold)
            .field("underlyings", &self.underlyings)
            .finish()
    }
}

/// Settings for one underlying served by the Polygon oracle.
#[derive(Debug, Clone)]
pub struct PolygonUnderlyingConfig {
    /// Polygon ticker whose option chain is fetched for this underlying.
    pub ticker: String,
    /// Static scale for local/test routes. Production routes should require live dynamic scaling.
    pub strike_scale: f64,
    /// When `true`, a refresh fails instead of falling back to a cached or
    /// static scale if the live platform/ETF spots are unavailable.
    pub require_live_strike_scale: bool,
}
39
/// Shared handle for injecting platform spot prices into the Polygon oracle.
/// The oracle uses these to compute a dynamic strike scale at each refresh.
///
/// Keys are the same underlying names used as keys of
/// `PolygonVolOracleConfig::underlyings`; values are the platform spot price.
/// The oracle only ever takes the read side of this lock.
pub type PlatformSpotPrices = Arc<RwLock<HashMap<String, f64>>>;
43
/// Mutable per-underlying state shared between the polling task and readers.
/// Every map is keyed by the platform underlying name.
#[derive(Debug, Default)]
struct PolygonSurfaceState {
    /// Most recently built (scaled) surface; kept when a later refresh fails.
    surfaces: HashMap<String, VolatilitySurface>,
    /// `true` after any successful refresh; set to `false` on failure only
    /// while no surface has ever been stored for the underlying.
    connected: HashMap<String, bool>,
    /// Newest source market-data timestamp (ms) among the points of the
    /// current surface. This is Polygon's timestamp, not the local fetch time.
    last_update_ts_ms: HashMap<String, i64>,
    /// Message of the most recent refresh failure; cleared on success.
    last_error: HashMap<String, String>,
    /// ETF spot prices extracted from the most recent Polygon response.
    etf_spots: HashMap<String, f64>,
    /// Platform spot used when the current scaled surface was built.
    surface_spots: HashMap<String, f64>,
    /// Last successfully computed dynamic strike scale per underlying.
    /// Reused by `compute_strike_scale` as a fallback when live spots are
    /// missing, but only when `require_live_strike_scale` is not set;
    /// production live refreshes still require current platform and ETF spots.
    last_dynamic_scale: HashMap<String, f64>,
}
59
/// One page of the Polygon option-chain snapshot response.
/// Every field is optional so a partial payload still deserializes.
#[derive(Debug, Deserialize)]
struct PolygonResponse {
    /// Response status; `fetch_surface` rejects any page whose status is not `"OK"`.
    #[serde(default)]
    status: Option<String>,
    /// Per-contract snapshots on this page; a missing list is treated as empty.
    #[serde(default)]
    results: Option<Vec<PolygonSnapshot>>,
    /// URL of the next page, absent on the last page.
    #[serde(default)]
    next_url: Option<String>,
}
69
/// Snapshot of a single option contract as returned by Polygon.
#[derive(Debug, Deserialize)]
struct PolygonSnapshot {
    /// Contract terms (strike and expiration).
    #[serde(default)]
    details: Option<PolygonDetails>,
    /// Implied volatility; `fetch_surface` keeps only values in `[0.01, 5.0)`.
    #[serde(default)]
    implied_volatility: Option<f64>,
    /// Underlying asset data; its price is used as the ETF spot.
    #[serde(default)]
    underlying_asset: Option<PolygonUnderlyingAsset>,
    /// Timestamps of the last quote.
    #[serde(default)]
    last_quote: Option<PolygonMarketTimestamp>,
    /// Timestamps of the last trade.
    #[serde(default)]
    last_trade: Option<PolygonMarketTimestamp>,
    /// Raw timestamp of the last fair-market-value update (unit normalised by
    /// `polygon_timestamp_to_ms`).
    #[serde(default)]
    fmv_last_updated: Option<i64>,
}
85
/// Contract terms of an option snapshot.
#[derive(Debug, Deserialize)]
struct PolygonDetails {
    /// Strike in the Polygon ticker's price space (before strike scaling).
    #[serde(default)]
    strike_price: Option<f64>,
    /// Expiration date; parsed by `fetch_surface` with the `%Y-%m-%d` format.
    #[serde(default)]
    expiration_date: Option<String>,
}
93
/// Underlying-asset section of an option snapshot.
#[derive(Debug, Deserialize)]
struct PolygonUnderlyingAsset {
    /// Price of the underlying; only strictly positive values are used as the ETF spot.
    #[serde(default)]
    price: Option<f64>,
}
99
/// Timestamp fields attached to a quote or trade in an option snapshot.
/// Values are raw Polygon integers; `polygon_timestamp_to_ms` normalises the unit.
#[derive(Debug, Deserialize)]
struct PolygonMarketTimestamp {
    #[serde(default)]
    last_updated: Option<i64>,
    #[serde(default)]
    sip_timestamp: Option<i64>,
    #[serde(default)]
    participant_timestamp: Option<i64>,
}
109
110impl PolygonSnapshot {
111    fn source_update_ts_ms(&self) -> Option<i64> {
112        [
113            self.fmv_last_updated.and_then(polygon_timestamp_to_ms),
114            self.last_quote
115                .as_ref()
116                .and_then(PolygonMarketTimestamp::source_update_ts_ms),
117            self.last_trade
118                .as_ref()
119                .and_then(PolygonMarketTimestamp::source_update_ts_ms),
120        ]
121        .into_iter()
122        .flatten()
123        .max()
124    }
125}
126
127impl PolygonMarketTimestamp {
128    fn source_update_ts_ms(&self) -> Option<i64> {
129        [
130            self.last_updated,
131            self.sip_timestamp,
132            self.participant_timestamp,
133        ]
134        .into_iter()
135        .flatten()
136        .filter_map(polygon_timestamp_to_ms)
137        .max()
138    }
139}
140
/// Normalise a raw Polygon epoch timestamp to milliseconds.
///
/// Polygon option snapshots report market-data timestamps in nanoseconds, but
/// fixtures or future fields may already be in a coarser unit, so the unit is
/// inferred from the magnitude. Each band starts at the value that represents
/// 2001-09-09 in that unit:
///
/// * `>= 1e18` — nanoseconds
/// * `>= 1e15` — microseconds
/// * `>= 1e12` — milliseconds
///
/// Returns `None` for non-positive values and for anything smaller than the
/// millisecond band (e.g. second-resolution timestamps), which cannot be a
/// valid recent market-data timestamp.
fn polygon_timestamp_to_ms(timestamp: i64) -> Option<i64> {
    const MIN_MILLIS: i64 = 1_000_000_000_000;
    const MIN_MICROS: i64 = 1_000_000_000_000_000;
    const MIN_NANOS: i64 = 1_000_000_000_000_000_000;

    match timestamp {
        ns if ns >= MIN_NANOS => Some(ns / 1_000_000),
        // Previously this band was divided as if it were nanoseconds, which
        // silently produced a 1970 timestamp for microsecond inputs.
        us if us >= MIN_MICROS => Some(us / 1_000),
        ms if ms >= MIN_MILLIS => Some(ms),
        // Covers zero, negatives and second-resolution values.
        _ => None,
    }
}
156
/// Volatility oracle that polls Polygon option-chain snapshots and serves
/// interpolated implied vols from the most recent surface per underlying.
pub struct PolygonVolOracle {
    /// HTTP client reused for every Polygon request.
    client: Client,
    /// Immutable configuration supplied at construction.
    config: PolygonVolOracleConfig,
    /// Surfaces and health bookkeeping, written by refreshes and read by lookups.
    state: Arc<RwLock<PolygonSurfaceState>>,
    /// Number of successful surface refreshes, summed over all underlyings.
    messages_received: AtomicU64,
    /// Platform spot prices injected externally (e.g. from GreeksCache).
    /// Used to compute dynamic strike_scale = platform_spot / etf_spot.
    platform_spots: PlatformSpotPrices,
}
166
167impl PolygonVolOracle {
168    pub fn new(config: PolygonVolOracleConfig, platform_spots: PlatformSpotPrices) -> Self {
169        Self {
170            client: Client::new(),
171            config,
172            state: Arc::new(RwLock::new(PolygonSurfaceState::default())),
173            messages_received: AtomicU64::new(0),
174            platform_spots,
175        }
176    }
177
178    pub fn start_polling(self: Arc<Self>) -> JoinHandle<()> {
179        tokio::spawn(async move {
180            let mut interval = tokio::time::interval(self.config.poll_interval);
181            loop {
182                interval.tick().await;
183                if let Err(err) = self.refresh_all().await {
184                    error!("Polygon vol oracle refresh failed: {err:#}");
185                }
186            }
187        })
188    }
189
190    /// Compute the strike scale for an underlying.
191    ///
192    /// Priority: live dynamic ratio > cached last-good dynamic > static config.
193    /// The last-good dynamic scale is stored in state so that transient spot
194    /// price gaps after warm-up don't silently revert to a stale config value.
195    fn compute_strike_scale(
196        &self,
197        underlying: &str,
198        etf_spot: Option<f64>,
199        config: &PolygonUnderlyingConfig,
200    ) -> Result<(f64, Option<f64>)> {
201        let platform_spot = self
202            .platform_spots
203            .read()
204            .expect("platform_spots poisoned")
205            .get(underlying)
206            .copied();
207
208        if let (Some(ps), Some(es)) = (platform_spot, etf_spot) {
209            if ps > 0.0 && es > 0.0 {
210                let dynamic = ps / es;
211                debug!(
212                    underlying,
213                    platform_spot = ps,
214                    etf_spot = es,
215                    dynamic_scale = dynamic,
216                    "Using dynamic strike scale"
217                );
218                // Cache for reuse during transient gaps.
219                self.state
220                    .write()
221                    .expect("polygon vol oracle state poisoned")
222                    .last_dynamic_scale
223                    .insert(underlying.to_string(), dynamic);
224                return Ok((dynamic, Some(ps)));
225            }
226        }
227
228        if config.require_live_strike_scale {
229            anyhow::bail!(
230                "Missing live strike scale inputs for {}: platform_spot={:?}, etf_spot={:?}",
231                underlying,
232                platform_spot,
233                etf_spot
234            );
235        }
236
237        // Prefer last-good dynamic over static config.
238        let cached = self
239            .state
240            .read()
241            .expect("polygon vol oracle state poisoned")
242            .last_dynamic_scale
243            .get(underlying)
244            .copied();
245
246        if let Some(scale) = cached {
247            debug!(
248                underlying,
249                cached_scale = scale,
250                "Using cached dynamic strike scale (live spots unavailable)"
251            );
252            return Ok((scale, None));
253        }
254
255        debug!(
256            underlying,
257            fallback_scale = config.strike_scale,
258            "Using static fallback strike scale (no dynamic history)"
259        );
260        Ok((config.strike_scale, None))
261    }
262
263    async fn refresh_all(&self) -> Result<()> {
264        let underlyings = self
265            .config
266            .underlyings
267            .iter()
268            .map(|(underlying, config)| (underlying.clone(), config.clone()))
269            .collect::<Vec<_>>();
270
271        for (underlying, config) in underlyings {
272            match self.fetch_surface(&underlying, &config).await {
273                Ok((surface, etf_spot, platform_spot, source_update_ts_ms)) => {
274                    let point_count = surface.len();
275                    {
276                        let mut state = self
277                            .state
278                            .write()
279                            .expect("polygon vol oracle state poisoned");
280                        state.connected.insert(underlying.clone(), true);
281                        state
282                            .last_update_ts_ms
283                            .insert(underlying.clone(), source_update_ts_ms);
284                        state.last_error.remove(&underlying);
285                        if let Some(spot) = etf_spot {
286                            state.etf_spots.insert(underlying.clone(), spot);
287                        }
288                        if let Some(spot) = platform_spot {
289                            state.surface_spots.insert(underlying.clone(), spot);
290                        } else {
291                            state.surface_spots.remove(&underlying);
292                        }
293                        state.surfaces.insert(underlying.clone(), surface);
294                    }
295                    self.messages_received.fetch_add(1, Ordering::Relaxed);
296                    counter!(
297                        "ht_vol_oracle_messages_received_total",
298                        "provider" => VolProviderKind::Polygon.as_str(),
299                        "underlying" => underlying.clone()
300                    )
301                    .increment(1);
302
303                    // Vol sanity check: warn if ATM vol looks unreasonable.
304                    self.check_atm_vol_sanity(&underlying);
305
306                    info!(
307                        "Updated Polygon vol surface for {} with {} points",
308                        underlying, point_count
309                    );
310                }
311                Err(err) => {
312                    let message = err.to_string();
313                    let has_existing = self
314                        .state
315                        .read()
316                        .expect("polygon vol oracle state poisoned")
317                        .surfaces
318                        .contains_key(&underlying);
319                    if has_existing {
320                        warn!(
321                            "Polygon vol surface refresh failed for {} (keeping last good data): {}",
322                            underlying, message
323                        );
324                    } else {
325                        warn!(
326                            "Polygon vol surface refresh failed for {}: {}",
327                            underlying, message
328                        );
329                        let mut state = self
330                            .state
331                            .write()
332                            .expect("polygon vol oracle state poisoned");
333                        state.connected.insert(underlying.clone(), false);
334                    }
335                    let mut state = self
336                        .state
337                        .write()
338                        .expect("polygon vol oracle state poisoned");
339                    state.last_error.insert(underlying, message);
340                }
341            }
342        }
343
344        Ok(())
345    }
346
347    /// Fetch vol surface from Polygon, returning the surface and the ETF spot price.
348    async fn fetch_surface(
349        &self,
350        underlying: &str,
351        config: &PolygonUnderlyingConfig,
352    ) -> Result<(VolatilitySurface, Option<f64>, Option<f64>, i64)> {
353        let mut raw_points: Vec<(f64, i64, f64, i64)> = Vec::new();
354        let mut etf_spot: Option<f64> = None;
355        let mut source_update_ts_ms: Option<i64> = None;
356        let mut url = format!(
357            "{}/v3/snapshot/options/{}?limit=250&apiKey={}",
358            self.config.base_url, config.ticker, self.config.api_key
359        );
360        let now_ts = Utc::now().timestamp();
361        let mut page = 0_u8;
362
363        while page < 6 {
364            page += 1;
365            let response = self
366                .client
367                .get(&url)
368                .send()
369                .await
370                .with_context(|| format!("Failed to fetch Polygon page {page} for {underlying}"))?
371                .error_for_status()
372                .with_context(|| {
373                    format!("Polygon returned non-success status on page {page} for {underlying}")
374                })?
375                .json::<PolygonResponse>()
376                .await
377                .with_context(|| {
378                    format!("Failed to decode Polygon response on page {page} for {underlying}")
379                })?;
380
381            if response.status.as_deref() != Some("OK") {
382                anyhow::bail!(
383                    "Polygon response status was {:?} on page {} for {}",
384                    response.status,
385                    page,
386                    underlying
387                );
388            }
389
390            for snapshot in response.results.unwrap_or_default() {
391                // Extract ETF spot from the first snapshot that has it.
392                if etf_spot.is_none() {
393                    if let Some(ref ua) = snapshot.underlying_asset {
394                        if let Some(price) = ua.price {
395                            if price > 0.0 {
396                                etf_spot = Some(price);
397                            }
398                        }
399                    }
400                }
401
402                let Some(details) = snapshot.details.as_ref() else {
403                    continue;
404                };
405                let Some(strike) = details.strike_price else {
406                    continue;
407                };
408                let Some(expiration_date) = details.expiration_date.as_deref() else {
409                    continue;
410                };
411                let Some(iv) = snapshot.implied_volatility else {
412                    continue;
413                };
414                if !(0.01..5.0).contains(&iv) {
415                    continue;
416                }
417                let expiry_date = NaiveDate::parse_from_str(&expiration_date, "%Y-%m-%d")
418                    .with_context(|| {
419                        format!("Invalid Polygon expiry {expiration_date} for {underlying}")
420                    })?;
421                let expiry_code = expiry_date.year() as u64 * 10_000
422                    + expiry_date.month() as u64 * 100
423                    + expiry_date.day() as u64;
424                let expiry_ts =
425                    hypercall_types::expiry_date_to_timestamp_checked(underlying, expiry_code)
426                        .with_context(|| {
427                            format!("Invalid Polygon expiry {expiration_date} for {underlying}")
428                        })? as i64;
429                if expiry_ts <= now_ts {
430                    continue;
431                }
432                let Some(point_update_ts_ms) = snapshot.source_update_ts_ms() else {
433                    continue;
434                };
435                source_update_ts_ms = Some(
436                    source_update_ts_ms
437                        .unwrap_or(point_update_ts_ms)
438                        .max(point_update_ts_ms),
439                );
440
441                raw_points.push((strike, expiry_ts, iv, point_update_ts_ms));
442            }
443
444            let Some(next_url) = response.next_url else {
445                break;
446            };
447            url = if next_url.contains("apiKey=") {
448                next_url
449            } else {
450                format!("{}&apiKey={}", next_url, self.config.api_key)
451            };
452        }
453
454        if raw_points.is_empty() {
455            anyhow::bail!(
456                "Polygon returned no usable surface points for {underlying} (market may be closed)"
457            );
458        }
459        let source_update_ts_ms = source_update_ts_ms.ok_or_else(|| {
460            anyhow::anyhow!(
461                "Polygon returned no source market timestamps for usable surface points for {underlying}"
462            )
463        })?;
464
465        // Compute the strike scale now that we know the ETF spot.
466        let (scale, platform_spot_at_build) =
467            self.compute_strike_scale(underlying, etf_spot, config)?;
468
469        let mut surface = VolatilitySurface::new();
470        for (strike, expiry_ts, iv, _point_update_ts_ms) in raw_points {
471            surface.insert(strike * scale, expiry_ts, iv);
472        }
473
474        // Polygon's per-instrument IVs are solved from exchange mids and can
475        // carry cross-model artifacts on the OTM wings (e.g. a jump/tail fit
476        // that makes a higher-strike call more expensive than a lower-strike
477        // call in the same expiry). Enforce static no-arbitrage in the surface
478        // itself — monotone and convex call prices per expiry — before anyone
479        // reads from it. Requires platform spot (surface strikes are already in
480        // platform space via `scale`).
481        let platform_spot = self
482            .platform_spots
483            .read()
484            .expect("platform_spots poisoned")
485            .get(underlying)
486            .copied();
487        if let Some(spot) = platform_spot {
488            let clamps = surface.sanitize_arb_free(spot, 0.0);
489            if clamps > 0 {
490                counter!(
491                    "ht_vol_surface_arb_clamps_total",
492                    "provider" => VolProviderKind::Polygon.as_str(),
493                    "underlying" => underlying.to_string()
494                )
495                .increment(clamps as u64);
496                info!(
497                    underlying,
498                    clamps, "Clamped non-arb-free vol surface points (polygon)"
499                );
500            }
501        }
502
503        Ok((
504            surface,
505            etf_spot,
506            platform_spot_at_build,
507            source_update_ts_ms,
508        ))
509    }
510
511    /// Emit a warning and metric if the ATM vol for an underlying looks unreasonable.
512    fn check_atm_vol_sanity(&self, underlying: &str) {
513        let atm_gauge = gauge!(
514            "ht_vol_oracle_atm_iv",
515            "provider" => VolProviderKind::Polygon.as_str(),
516            "underlying" => underlying.to_string()
517        );
518
519        let state = self
520            .state
521            .read()
522            .expect("polygon vol oracle state poisoned");
523        let Some(surface) = state.surfaces.get(underlying) else {
524            atm_gauge.set(-1.0);
525            return;
526        };
527
528        let now_ts = Utc::now().timestamp();
529        let nearest_expiry = surface.expiries().iter().find(|&&e| e > now_ts).copied();
530        let Some(expiry) = nearest_expiry else {
531            atm_gauge.set(-1.0);
532            return;
533        };
534
535        let platform_spot = self
536            .platform_spots
537            .read()
538            .expect("platform_spots poisoned")
539            .get(underlying)
540            .copied();
541        let Some(spot) = platform_spot else {
542            atm_gauge.set(-1.0);
543            return;
544        };
545
546        if let Some(atm_iv) = surface.get_interpolated(spot, expiry) {
547            atm_gauge.set(atm_iv);
548
549            if atm_iv > 2.0 {
550                warn!(
551                    underlying,
552                    atm_iv,
553                    expiry,
554                    "Polygon ATM vol exceeds 200% — possible strike scale mismatch or stale data"
555                );
556            }
557        }
558    }
559
560    fn status_for(&self, underlying: &str) -> VolOracleStatus {
561        let state = self
562            .state
563            .read()
564            .expect("polygon vol oracle state poisoned");
565        let last_update_ts_ms = state.last_update_ts_ms.get(underlying).copied();
566        let staleness_seconds = last_update_ts_ms
567            .map(|ts| ((Utc::now().timestamp_millis() - ts) as f64 / 1000.0).max(0.0));
568        let ready = staleness_seconds
569            .map(|age| age <= self.config.staleness_threshold.as_secs_f64())
570            .unwrap_or(false);
571        let surface_points = state
572            .surfaces
573            .get(underlying)
574            .map(VolatilitySurface::len)
575            .unwrap_or(0);
576
577        VolOracleStatus {
578            underlying: underlying.to_string(),
579            provider: VolProviderKind::Polygon,
580            route_facing: true,
581            connected: state.connected.get(underlying).copied().unwrap_or(false),
582            ready,
583            last_update_ts_ms,
584            staleness_seconds,
585            staleness_threshold_seconds: Some(self.config.staleness_threshold.as_secs_f64()),
586            surface_points,
587            messages_received: self.messages_received.load(Ordering::Relaxed),
588            last_error: state.last_error.get(underlying).cloned(),
589        }
590    }
591}
592
593impl RiskVolOracle for PolygonVolOracle {
594    fn get_iv(&self, underlying: &str, strike: f64, expiry_ts: i64) -> Result<f64, VolLookupError> {
595        let status = self.status_for(underlying);
596        if !status.connected {
597            return Err(VolLookupError::UnhealthyProvider {
598                underlying: underlying.to_string(),
599                provider: VolProviderKind::Polygon,
600                reason: status
601                    .last_error
602                    .unwrap_or_else(|| "not connected".to_string()),
603            });
604        }
605
606        if !status.ready {
607            return Err(VolLookupError::StaleSurface {
608                underlying: underlying.to_string(),
609                provider: VolProviderKind::Polygon,
610                staleness_seconds: status.staleness_seconds.unwrap_or(f64::INFINITY),
611                threshold_seconds: self.config.staleness_threshold.as_secs_f64(),
612            });
613        }
614
615        let state = self
616            .state
617            .read()
618            .expect("polygon vol oracle state poisoned");
619        let iv = state
620            .surfaces
621            .get(underlying)
622            .and_then(|surface| surface.get_interpolated(strike, expiry_ts))
623            .ok_or_else(|| VolLookupError::MissingSurface {
624                underlying: underlying.to_string(),
625                provider: VolProviderKind::Polygon,
626                strike,
627                expiry_ts,
628            })?;
629
630        debug!(
631            underlying,
632            strike,
633            expiry_ts,
634            iv,
635            provider = VolProviderKind::Polygon.as_str(),
636            "Backend vol oracle value used"
637        );
638
639        Ok(iv)
640    }
641
642    fn statuses(&self) -> Vec<VolOracleStatus> {
643        self.config
644            .underlyings
645            .keys()
646            .map(|underlying| self.status_for(underlying))
647            .collect()
648    }
649
650    fn get_surface_snapshot(&self, underlying: &str) -> Option<VolSurfaceSnapshot> {
651        let state = self
652            .state
653            .read()
654            .expect("polygon vol oracle state poisoned");
655        let surface = state.surfaces.get(underlying)?;
656        Some(VolSurfaceSnapshot {
657            underlying: underlying.to_string(),
658            last_update_ts_ms: state.last_update_ts_ms.get(underlying).copied(),
659            expiries: surface.expiries().iter().copied().collect(),
660            strike_points: surface.export_all_points(),
661            delta_curves: surface.export_delta_curves(),
662            atm_vols: surface.export_atm_vols(),
663            // Return the platform spot from the refresh that produced this
664            // scaled surface. Reading the live spot here would erase the
665            // moneyness shift needed by closed-session transforms.
666            spot_price: state.surface_spots.get(underlying).copied(),
667        })
668    }
669
670    fn supports_surface_snapshots(&self) -> bool {
671        true
672    }
673}
674
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a quote/trade timestamp record from raw Polygon values.
    fn market_ts(
        last_updated: Option<i64>,
        sip_timestamp: Option<i64>,
        participant_timestamp: Option<i64>,
    ) -> PolygonMarketTimestamp {
        PolygonMarketTimestamp {
            last_updated,
            sip_timestamp,
            participant_timestamp,
        }
    }

    /// Builds a snapshot that carries only timestamp information.
    fn timestamp_only_snapshot(
        last_quote: Option<PolygonMarketTimestamp>,
        last_trade: Option<PolygonMarketTimestamp>,
        fmv_last_updated: Option<i64>,
    ) -> PolygonSnapshot {
        PolygonSnapshot {
            details: None,
            implied_volatility: None,
            underlying_asset: None,
            last_quote,
            last_trade,
            fmv_last_updated,
        }
    }

    #[test]
    fn source_update_ts_uses_market_data_timestamps() {
        // The trade's SIP timestamp is the newest of the three sources.
        let snapshot = timestamp_only_snapshot(
            Some(market_ts(Some(1_700_000_000_123_456_789), None, None)),
            Some(market_ts(None, Some(1_700_000_001_000_000_000), None)),
            Some(1_699_999_999_000_000_000),
        );

        assert_eq!(snapshot.source_update_ts_ms(), Some(1_700_000_001_000));
    }

    #[test]
    fn source_update_ts_rejects_missing_or_invalid_timestamps() {
        // Zero and negative values are not usable timestamps.
        let snapshot =
            timestamp_only_snapshot(Some(market_ts(Some(0), Some(-1), None)), None, None);

        assert_eq!(snapshot.source_update_ts_ms(), None);
    }

    #[test]
    fn snapshot_uses_surface_build_spot_not_current_platform_spot() {
        let name = "OIL".to_string();
        let platform_spots = Arc::new(RwLock::new(HashMap::from([(name.clone(), 120.0)])));
        let underlying_config = PolygonUnderlyingConfig {
            ticker: "OIL".to_string(),
            strike_scale: 1.0,
            require_live_strike_scale: true,
        };
        let config = PolygonVolOracleConfig {
            api_key: "test".to_string(),
            base_url: "http://127.0.0.1:9".to_string(),
            poll_interval: Duration::from_secs(60),
            staleness_threshold: Duration::from_secs(180),
            underlyings: HashMap::from([(name.clone(), underlying_config)]),
        };
        let oracle = PolygonVolOracle::new(config, platform_spots.clone());

        // Seed the oracle with a surface that was built at spot 100.
        let expiry = Utc::now().timestamp() + 30 * 86_400;
        let mut surface = VolatilitySurface::new();
        surface.insert(100.0, expiry, 0.50);
        {
            let mut state = oracle
                .state
                .write()
                .expect("polygon vol oracle state poisoned");
            state.surfaces.insert(name.clone(), surface);
            state
                .last_update_ts_ms
                .insert(name.clone(), Utc::now().timestamp_millis());
            state.surface_spots.insert(name.clone(), 100.0);
        }

        // Move the live platform spot after the surface was built.
        platform_spots
            .write()
            .expect("platform_spots poisoned")
            .insert(name, 140.0);

        let snapshot = oracle
            .get_surface_snapshot("OIL")
            .expect("snapshot should exist");
        assert_eq!(snapshot.spot_price, Some(100.0));
    }
}