1use std::collections::HashMap;
9use std::sync::atomic::{AtomicU64, Ordering};
10use std::sync::{Arc, RwLock};
11use std::time::Duration;
12
13use anyhow::{Context, Result};
14use chrono::Utc;
15use metrics::counter;
16use reqwest::Client;
17use serde::Deserialize;
18use tokio::task::JoinHandle;
19use tracing::{debug, error, info, warn};
20
21use super::risk_oracle::{
22 RiskVolOracle, VolLookupError, VolOracleStatus, VolProviderKind, VolSurfaceSnapshot,
23};
24use super::vol_surface_cache::VolatilitySurface;
25
26const GAMMA_API_BASE: &str = "https://gamma-api.polymarket.com";
27
28#[derive(Debug, Clone)]
29pub struct PolymarketVolOracleConfig {
30 pub event_slug: String,
31 pub shares_outstanding: f64,
33 pub reference_tte_days: f64,
36 pub poll_interval: Duration,
37 pub staleness_threshold: Duration,
38 pub symbols: Vec<String>,
39}
40
41#[derive(Debug, Default)]
42struct PolymarketSurfaceState {
43 surfaces: HashMap<String, VolatilitySurface>,
44 connected: HashMap<String, bool>,
45 last_update_ts_ms: HashMap<String, i64>,
46 last_error: HashMap<String, String>,
47 spot_prices: HashMap<String, f64>,
48}
49
50#[derive(Debug, Deserialize)]
53struct GammaEvent {
54 markets: Option<Vec<GammaMarket>>,
55}
56
57#[derive(Debug, Deserialize)]
58#[allow(dead_code)]
59struct GammaMarket {
60 question: Option<String>,
61 #[serde(rename = "outcomePrices")]
62 outcome_prices: Option<String>,
63}
64
65pub struct PolymarketVolOracle {
66 client: Client,
67 config: PolymarketVolOracleConfig,
68 state: Arc<RwLock<PolymarketSurfaceState>>,
69 messages_received: AtomicU64,
70}
71
72impl PolymarketVolOracle {
73 pub fn new(config: PolymarketVolOracleConfig) -> Self {
74 Self {
75 client: Client::builder()
76 .timeout(Duration::from_secs(15))
77 .build()
78 .expect("failed to build HTTP client"),
79 config,
80 state: Arc::new(RwLock::new(PolymarketSurfaceState::default())),
81 messages_received: AtomicU64::new(0),
82 }
83 }
84
85 pub fn start_polling(self: Arc<Self>) -> JoinHandle<()> {
86 tokio::spawn(async move {
87 let mut interval = tokio::time::interval(self.config.poll_interval);
88 loop {
89 interval.tick().await;
90 if let Err(err) = self.refresh_all().await {
91 error!("Polymarket vol oracle refresh failed: {err:#}");
92 }
93 }
94 })
95 }
96
97 async fn refresh_all(&self) -> Result<()> {
98 match self.fetch_surface().await {
99 Ok((surface, spot)) => {
100 let now_ms = Utc::now().timestamp_millis();
101 let point_count = surface.len();
102 {
103 let mut state = self
104 .state
105 .write()
106 .expect("polymarket vol oracle state poisoned");
107 for symbol in &self.config.symbols {
108 state.surfaces.insert(symbol.clone(), surface.clone());
109 state.connected.insert(symbol.clone(), true);
110 state.last_update_ts_ms.insert(symbol.clone(), now_ms);
111 state.last_error.remove(symbol);
112 state.spot_prices.insert(symbol.clone(), spot);
113 }
114 }
115 self.messages_received.fetch_add(1, Ordering::Relaxed);
116 for symbol in &self.config.symbols {
117 counter!(
118 "ht_vol_oracle_messages_received_total",
119 "provider" => VolProviderKind::Polymarket.as_str(),
120 "underlying" => symbol.clone()
121 )
122 .increment(1);
123 }
124 info!(
125 "Updated Polymarket vol surface with {} points (spot ${:.2})",
126 point_count, spot
127 );
128 }
129 Err(err) => {
130 let message = err.to_string();
131 let mut state = self
132 .state
133 .write()
134 .expect("polymarket vol oracle state poisoned");
135 for symbol in &self.config.symbols {
136 state.connected.insert(symbol.clone(), false);
137 state.last_error.insert(symbol.clone(), message.clone());
138 if state.surfaces.contains_key(symbol) {
139 warn!(
140 "Polymarket vol surface refresh failed for {} (keeping last good data): {}",
141 symbol, message
142 );
143 } else {
144 warn!(
145 "Polymarket vol surface refresh failed for {}: {}",
146 symbol, message
147 );
148 }
149 }
150 }
151 }
152 Ok(())
153 }
154
155 async fn fetch_surface(&self) -> Result<(VolatilitySurface, f64)> {
156 let url = format!("{}/events?slug={}", GAMMA_API_BASE, self.config.event_slug);
157 let events: Vec<GammaEvent> = self
158 .client
159 .get(&url)
160 .send()
161 .await
162 .with_context(|| {
163 format!(
164 "Failed to fetch Polymarket event {}",
165 self.config.event_slug
166 )
167 })?
168 .error_for_status()
169 .with_context(|| "Polymarket Gamma API returned non-success")?
170 .json()
171 .await
172 .with_context(|| "Failed to decode Polymarket event response")?;
173
174 let event = events
175 .into_iter()
176 .next()
177 .context("Polymarket returned no events for slug")?;
178
179 let markets = event.markets.context("Event has no markets")?;
180 let mut strike_probs: Vec<(f64, f64)> = Vec::new();
181
182 for market in &markets {
183 let question = match &market.question {
184 Some(q) => q,
185 None => continue,
186 };
187
188 let cap_dollars = match parse_market_cap_from_question(question) {
189 Some(cap) => cap,
190 None => {
191 debug!("Skipping unparseable Polymarket question: {}", question);
192 continue;
193 }
194 };
195
196 let strike = cap_dollars / self.config.shares_outstanding;
197
198 let prob = match &market.outcome_prices {
199 Some(prices_str) => match parse_yes_probability(prices_str) {
200 Some(p) => p,
201 None => continue,
202 },
203 None => continue,
204 };
205
206 if prob > 0.02 && prob < 0.98 {
207 strike_probs.push((strike, prob));
208 }
209 }
210
211 if strike_probs.len() < 3 {
212 anyhow::bail!(
213 "Polymarket returned only {} usable strike-probability pairs (need >= 3)",
214 strike_probs.len()
215 );
216 }
217
218 strike_probs.sort_by(|a, b| a.0.partial_cmp(&b.0).unwrap());
219
220 let spot = estimate_spot_from_cdf(&strike_probs);
222
223 let tte_years = self.config.reference_tte_days / 365.25;
224
225 let mut surface = VolatilitySurface::with_precision(1.0);
227
228 let now_ts = Utc::now().timestamp();
230 let day_secs: i64 = 86400;
231 let mut expiry_timestamps = Vec::new();
232 for d in 1..=7 {
233 expiry_timestamps.push(now_ts + d * day_secs);
234 }
235 for w in 2..=12 {
236 expiry_timestamps.push(now_ts + w * 7 * day_secs);
237 }
238
239 let mut total_inserted = 0_u32;
240 for (strike, prob) in &strike_probs {
241 let iv = match solve_digital_iv(spot, *strike, tte_years, *prob) {
242 Some(v) if v > 0.05 && v < 5.0 => v,
243 _ => {
244 debug!(
245 "Skipping Polymarket strike {:.1} prob {:.3}: IV solve failed",
246 strike, prob
247 );
248 continue;
249 }
250 };
251
252 for &expiry_ts in &expiry_timestamps {
253 surface.insert(*strike, expiry_ts, iv);
254 total_inserted += 1;
255 }
256
257 let moneyness = (*strike / spot - 1.0).abs();
258 if moneyness < 0.05 {
259 for &expiry_ts in &expiry_timestamps {
260 surface.set_atm_vol(expiry_ts, iv);
261 }
262 }
263 }
264
265 if total_inserted == 0 {
266 anyhow::bail!("Failed to extract any IVs from Polymarket data");
267 }
268
269 if spot > 0.0 {
270 let clamps = surface.sanitize_arb_free(spot, 0.0);
271 if clamps > 0 {
272 counter!(
273 "ht_vol_surface_arb_clamps_total",
274 "provider" => VolProviderKind::Polymarket.as_str(),
275 "underlying" => "SPCX".to_string()
276 )
277 .increment(clamps as u64);
278 info!(
279 clamps,
280 "Clamped non-arb-free vol surface points (polymarket)"
281 );
282 }
283 }
284
285 Ok((surface, spot))
286 }
287
288 fn status_for(&self, symbol: &str) -> VolOracleStatus {
289 let state = self
290 .state
291 .read()
292 .expect("polymarket vol oracle state poisoned");
293 let last_update_ts_ms = state.last_update_ts_ms.get(symbol).copied();
294 let staleness_seconds = last_update_ts_ms
295 .map(|ts| ((Utc::now().timestamp_millis() - ts) as f64 / 1000.0).max(0.0));
296 let ready = staleness_seconds
297 .map(|age| age <= self.config.staleness_threshold.as_secs_f64())
298 .unwrap_or(false);
299 let surface_points = state
300 .surfaces
301 .get(symbol)
302 .map(VolatilitySurface::len)
303 .unwrap_or(0);
304
305 VolOracleStatus {
306 underlying: symbol.to_string(),
307 provider: VolProviderKind::Polymarket,
308 route_facing: true,
309 connected: state.connected.get(symbol).copied().unwrap_or(false),
310 ready,
311 last_update_ts_ms,
312 staleness_seconds,
313 staleness_threshold_seconds: Some(self.config.staleness_threshold.as_secs_f64()),
314 surface_points,
315 messages_received: self.messages_received.load(Ordering::Relaxed),
316 last_error: state.last_error.get(symbol).cloned(),
317 }
318 }
319}
320
321impl RiskVolOracle for PolymarketVolOracle {
322 fn get_iv(&self, underlying: &str, strike: f64, expiry_ts: i64) -> Result<f64, VolLookupError> {
323 let state = self
324 .state
325 .read()
326 .expect("polymarket vol oracle state poisoned");
327 let has_surface = state.surfaces.contains_key(underlying);
328
329 let connected = state.connected.get(underlying).copied().unwrap_or(false);
330 if !connected && !has_surface {
331 return Err(VolLookupError::UnhealthyProvider {
332 underlying: underlying.to_string(),
333 provider: VolProviderKind::Polymarket,
334 reason: state
335 .last_error
336 .get(underlying)
337 .cloned()
338 .unwrap_or_else(|| "not connected".to_string()),
339 });
340 }
341
342 let last_update_ts_ms = state.last_update_ts_ms.get(underlying).copied();
343 let staleness_seconds = last_update_ts_ms
344 .map(|ts| ((Utc::now().timestamp_millis() - ts) as f64 / 1000.0).max(0.0));
345 let ready = staleness_seconds
346 .map(|age| age <= self.config.staleness_threshold.as_secs_f64())
347 .unwrap_or(false);
348
349 if !ready {
350 return Err(VolLookupError::StaleSurface {
351 underlying: underlying.to_string(),
352 provider: VolProviderKind::Polymarket,
353 staleness_seconds: staleness_seconds.unwrap_or(f64::INFINITY),
354 threshold_seconds: self.config.staleness_threshold.as_secs_f64(),
355 });
356 }
357
358 let iv = state
359 .surfaces
360 .get(underlying)
361 .and_then(|surface| surface.get_interpolated(strike, expiry_ts))
362 .ok_or_else(|| VolLookupError::MissingSurface {
363 underlying: underlying.to_string(),
364 provider: VolProviderKind::Polymarket,
365 strike,
366 expiry_ts,
367 })?;
368
369 debug!(
370 underlying,
371 strike,
372 expiry_ts,
373 iv,
374 provider = VolProviderKind::Polymarket.as_str(),
375 "Backend vol oracle value used"
376 );
377
378 Ok(iv)
379 }
380
381 fn statuses(&self) -> Vec<VolOracleStatus> {
382 self.config
383 .symbols
384 .iter()
385 .map(|symbol| self.status_for(symbol))
386 .collect()
387 }
388
389 fn get_surface_snapshot(&self, underlying: &str) -> Option<VolSurfaceSnapshot> {
390 let state = self
391 .state
392 .read()
393 .expect("polymarket vol oracle state poisoned");
394 let surface = state.surfaces.get(underlying)?;
395 Some(VolSurfaceSnapshot {
396 underlying: underlying.to_string(),
397 last_update_ts_ms: state.last_update_ts_ms.get(underlying).copied(),
398 expiries: surface.expiries().iter().copied().collect(),
399 strike_points: surface.export_all_points(),
400 delta_curves: surface.export_delta_curves(),
401 atm_vols: surface.export_atm_vols(),
402 spot_price: state.spot_prices.get(underlying).copied(),
403 })
404 }
405
406 fn supports_surface_snapshots(&self) -> bool {
407 true
408 }
409}
410
411fn parse_market_cap_from_question(question: &str) -> Option<f64> {
416 let dollar_pos = question.find('$')?;
417 let after_dollar = &question[dollar_pos + 1..];
418 let end = after_dollar
419 .find(|c: char| !c.is_ascii_digit() && c != '.' && c != 'T' && c != 'B' && c != 'M')
420 .unwrap_or(after_dollar.len());
421 let token = &after_dollar[..end];
422
423 if let Some(num_str) = token.strip_suffix('T') {
424 num_str.parse::<f64>().ok().map(|n| n * 1e12)
425 } else if let Some(num_str) = token.strip_suffix('B') {
426 num_str.parse::<f64>().ok().map(|n| n * 1e9)
427 } else if let Some(num_str) = token.strip_suffix('M') {
428 num_str.parse::<f64>().ok().map(|n| n * 1e6)
429 } else {
430 token.parse::<f64>().ok()
431 }
432}
433
434fn parse_yes_probability(prices_json: &str) -> Option<f64> {
437 let prices: Vec<String> = serde_json::from_str(prices_json).ok()?;
438 prices.first()?.parse::<f64>().ok()
439}
440
441fn estimate_spot_from_cdf(strike_probs: &[(f64, f64)]) -> f64 {
443 let mut best_strike = strike_probs[0].0;
444 let mut best_dist = f64::MAX;
445 for &(strike, prob) in strike_probs {
446 let dist = (prob - 0.50).abs();
447 if dist < best_dist {
448 best_dist = dist;
449 best_strike = strike;
450 }
451 }
452
453 for window in strike_probs.windows(2) {
455 let (k1, p1) = window[0];
456 let (k2, p2) = window[1];
457 if (p1 >= 0.50 && p2 <= 0.50) || (p1 <= 0.50 && p2 >= 0.50) {
458 let denom = p2 - p1;
459 if denom.abs() < 1e-12 {
460 return best_strike;
461 }
462 let t = (0.50 - p1) / denom;
463 return k1 + t * (k2 - k1);
464 }
465 }
466
467 best_strike
468}
469
470fn norm_cdf(x: f64) -> f64 {
473 0.5 * (1.0 + libm::erf(x / std::f64::consts::SQRT_2))
474}
475
476fn bs_digital_call_price(spot: f64, strike: f64, sigma: f64, tte_years: f64) -> f64 {
478 let d2 = (libm::log(spot / strike) - 0.5 * sigma * sigma * tte_years)
479 / (sigma * libm::sqrt(tte_years));
480 norm_cdf(d2)
481}
482
483fn solve_digital_iv(spot: f64, strike: f64, tte_years: f64, target_prob: f64) -> Option<f64> {
486 if tte_years <= 0.0 || spot <= 0.0 || strike <= 0.0 {
487 return None;
488 }
489 if target_prob <= 0.01 || target_prob >= 0.99 {
490 return None;
491 }
492
493 let mut lo = 0.05_f64;
494 let mut hi = 4.0_f64;
495
496 if strike > spot {
500 let m = libm::log(strike / spot);
501 let sigma_peak = libm::sqrt(2.0 * m / tte_years);
502 let peak_price = bs_digital_call_price(spot, strike, sigma_peak, tte_years);
503 if target_prob > peak_price + 0.001 {
504 return None;
505 }
506 hi = hi.min(sigma_peak);
507 }
508
509 let lo_price = bs_digital_call_price(spot, strike, lo, tte_years);
510 let hi_price = bs_digital_call_price(spot, strike, hi, tte_years);
511
512 let increasing = hi_price > lo_price;
513 if increasing {
514 if target_prob < lo_price || target_prob > hi_price {
515 return None;
516 }
517 } else if target_prob > lo_price || target_prob < hi_price {
518 return None;
519 }
520
521 for _ in 0..100 {
522 let mid = 0.5 * (lo + hi);
523 let price = bs_digital_call_price(spot, strike, mid, tte_years);
524 if (price - target_prob).abs() < 1e-6 {
525 return Some(mid);
526 }
527 if (price > target_prob) == increasing {
528 hi = mid;
529 } else {
530 lo = mid;
531 }
532 }
533
534 Some(0.5 * (lo + hi))
535}
536
537#[cfg(test)]
538mod tests {
539 use super::*;
540
541 #[test]
542 fn test_parse_market_cap() {
543 assert_eq!(
544 parse_market_cap_from_question("SpaceX IPO closing market cap above $2T?"),
545 Some(2e12)
546 );
547 assert_eq!(
548 parse_market_cap_from_question("SpaceX IPO closing market cap above $1.5T?"),
549 Some(1.5e12)
550 );
551 assert_eq!(
552 parse_market_cap_from_question("SpaceX IPO closing market cap above $3.2T?"),
553 Some(3.2e12)
554 );
555 assert_eq!(parse_market_cap_from_question("No IPO by 2028"), None);
556 }
557
558 #[test]
559 fn test_parse_yes_probability() {
560 assert_eq!(parse_yes_probability(r#"["0.705","0.295"]"#), Some(0.705));
561 assert_eq!(parse_yes_probability(r#"["0.984","0.016"]"#), Some(0.984));
562 }
563
564 #[test]
565 fn test_norm_cdf() {
566 assert!((norm_cdf(0.0) - 0.5).abs() < 1e-10);
567 assert!((norm_cdf(1.0) - 0.8413).abs() < 1e-3);
568 assert!((norm_cdf(-1.0) - 0.1587).abs() < 1e-3);
569 }
570
571 #[test]
572 fn test_bs_digital_roundtrip() {
573 let spot = 200.0;
574 let strike = 168.5;
575 let sigma = 0.65;
576 let tte = 30.0 / 365.25;
577
578 let price = bs_digital_call_price(spot, strike, sigma, tte);
579 assert!(
580 price > 0.5 && price < 1.0,
581 "ITM digital call should be > 0.5"
582 );
583
584 let recovered = solve_digital_iv(spot, strike, tte, price).unwrap();
585 assert!(
586 (recovered - sigma).abs() < 0.01,
587 "Roundtrip IV should match: expected {sigma}, got {recovered}"
588 );
589 }
590
591 #[test]
592 fn test_solve_iv_otm() {
593 let spot = 200.0;
594 let strike = 250.0;
595 let sigma = 0.80;
596 let tte = 30.0 / 365.25;
597
598 let price = bs_digital_call_price(spot, strike, sigma, tte);
599 assert!(
600 price > 0.0 && price < 0.5,
601 "OTM digital call should be < 0.5"
602 );
603
604 let recovered = solve_digital_iv(spot, strike, tte, price).unwrap();
605 assert!(
606 (recovered - sigma).abs() < 0.02,
607 "Roundtrip IV: expected {sigma}, got {recovered}"
608 );
609 }
610
611 #[test]
612 fn test_estimate_spot() {
613 let data = vec![
614 (150.0, 0.85),
615 (170.0, 0.75),
616 (200.0, 0.50),
617 (230.0, 0.25),
618 (260.0, 0.10),
619 ];
620 let spot = estimate_spot_from_cdf(&data);
621 assert!(
622 (spot - 200.0).abs() < 1.0,
623 "Spot should be near 200, got {spot}"
624 );
625 }
626
627 #[test]
628 fn test_oracle_not_ready_before_poll() {
629 let config = PolymarketVolOracleConfig {
630 event_slug: "test".to_string(),
631 shares_outstanding: 11.87e9,
632 reference_tte_days: 30.0,
633 poll_interval: Duration::from_secs(60),
634 staleness_threshold: Duration::from_secs(300),
635 symbols: vec!["SPCX".to_string()],
636 };
637 let oracle = PolymarketVolOracle::new(config);
638 let result = oracle.get_iv("SPCX", 200.0, 1800000000);
639 assert!(matches!(
640 result,
641 Err(VolLookupError::UnhealthyProvider { .. })
642 ));
643 }
644}