Skip to main content

hypercall_vol_oracle/
databento_oracle.rs

1//! Databento-backed vol oracle for CME products (initially GC gold options).
2//!
3//! This module holds everything Databento-specific behind a single file: the
4//! CME symbology parser, expiry calendar, MBP-1 book cache, chain-to-IV
5//! aggregator, a mockable feed trait, and the `RiskVolOracle` implementation
6//! that publishes surfaces to the rest of the API. Nothing here is exposed
7//! to consumers except the final `DatabentoVolOracle` struct.
8//!
9//! The build-out follows a test-driven sequence; see the cycle markers in
10//! the `tests` submodules.
11
12#![allow(dead_code)]
13
14// ---------------------------------------------------------------------------
15// Symbology (cycle 3)
16// ---------------------------------------------------------------------------
17
/// Map a CME Globex month code to its calendar month number (1–12).
///
/// The codes are standard across every listed product:
///
/// | Code | Month     | Code | Month     |
/// |------|-----------|------|-----------|
/// | F    | January   | N    | July      |
/// | G    | February  | Q    | August    |
/// | H    | March     | U    | September |
/// | J    | April     | V    | October   |
/// | K    | May       | X    | November  |
/// | M    | June      | Z    | December  |
///
/// Matching is case-sensitive; any other character yields `None`.
fn cme_month_code_to_number(code: char) -> Option<u32> {
    match code {
        'F' => Some(1),
        'G' => Some(2),
        'H' => Some(3),
        'J' => Some(4),
        'K' => Some(5),
        'M' => Some(6),
        'N' => Some(7),
        'Q' => Some(8),
        'U' => Some(9),
        'V' => Some(10),
        'X' => Some(11),
        'Z' => Some(12),
        _ => None,
    }
}

/// Split a contract code into its four characters.
///
/// Returns `None` unless `code` consists of exactly four `char`s. Counting
/// characters rather than bytes matters: a byte-length check lets a short
/// multi-byte string such as `"GCé"` (4 bytes, 3 chars) through, and pulling
/// a fourth character out of it would then fail.
fn split_contract_code(code: &str) -> Option<[char; 4]> {
    let mut chars = code.chars();
    let split = [chars.next()?, chars.next()?, chars.next()?, chars.next()?];
    if chars.next().is_some() {
        return None;
    }
    Some(split)
}

/// Validate the `<ROOT><MONTH><YEAR>` fields shared by futures and options.
///
/// `label` (`"future"` / `"option"`) only prefixes the root error message.
/// On success returns `(root, month_code, year_digit)`.
fn validate_contract_fields(
    [root_a, root_b, month_code, year_char]: [char; 4],
    label: &str,
) -> Result<(String, char, u8), String> {
    let root: String = [root_a, root_b].iter().collect();
    if !root.chars().all(|c| c.is_ascii_uppercase()) {
        return Err(format!("{label} root must be uppercase ASCII: {root:?}"));
    }
    if cme_month_code_to_number(month_code).is_none() {
        return Err(format!("invalid CME month code {month_code:?}"));
    }
    if !year_char.is_ascii_digit() {
        return Err(format!("year digit must be 0-9, got {year_char:?}"));
    }
    // `year_char` is one of b'0'..=b'9', so this lands in 0..=9.
    Ok((root, month_code, year_char as u8 - b'0'))
}

/// A parsed CME futures contract symbol.
///
/// The accepted wire format is `<ROOT><MONTH><YEAR>`: a two-character
/// uppercase ASCII product root (`GC` for gold futures), one CME month code
/// (see [`cme_month_code_to_number`]) and the last digit of the year.
/// Example: `GCM6` is a June gold future with year digit 6.
#[derive(Debug, Clone, PartialEq, Eq)]
struct CmeFutureSymbol {
    root: String,
    month_code: char,
    year_digit: u8,
}

impl CmeFutureSymbol {
    /// Parse a futures symbol, ignoring surrounding whitespace.
    ///
    /// # Errors
    ///
    /// Returns a human-readable message when the trimmed input is not
    /// exactly four characters, the root is not uppercase ASCII, the month
    /// code is unknown, or the year is not an ASCII digit. Never panics,
    /// including on non-ASCII input.
    fn parse(raw: &str) -> Result<Self, String> {
        let trimmed = raw.trim();
        let chars = split_contract_code(trimmed)
            .ok_or_else(|| format!("expected 4-char CME future symbol, got {trimmed:?}"))?;
        let (root, month_code, year_digit) = validate_contract_fields(chars, "future")?;

        Ok(Self {
            root,
            month_code,
            year_digit,
        })
    }
}

/// Which side of the option — call or put.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum CmeOptionSide {
    Call,
    Put,
}

/// A parsed CME option-on-future symbol.
///
/// The accepted wire format is `<ROOT><MONTH><YEAR> <C|P> <STRIKE>`, for
/// example `OGM6 C 2400`. Fields are separated by any run of ASCII
/// whitespace, and the side letter is accepted in either case.
#[derive(Debug, Clone, PartialEq)]
struct CmeOptionSymbol {
    root: String,
    month_code: char,
    year_digit: u8,
    side: CmeOptionSide,
    strike: f64,
}

impl CmeOptionSymbol {
    /// Parse an option symbol of the form `<ROOT><MONTH><YEAR> <C|P> <STRIKE>`.
    ///
    /// # Errors
    ///
    /// Returns a human-readable message when the input does not have exactly
    /// three whitespace-separated fields, the contract segment is malformed
    /// (same rules as [`CmeFutureSymbol::parse`]), the side is not `C`/`P`,
    /// or the strike is not a positive finite number. Never panics,
    /// including on non-ASCII input.
    fn parse(raw: &str) -> Result<Self, String> {
        let mut fields = raw.split_ascii_whitespace();
        let (Some(contract), Some(side_str), Some(strike_str), None) =
            (fields.next(), fields.next(), fields.next(), fields.next())
        else {
            return Err(format!(
                "expected '<root><month><year> <C|P> <strike>', got {raw:?}"
            ));
        };

        let chars = split_contract_code(contract)
            .ok_or_else(|| format!("contract segment must be 4 chars: {contract:?}"))?;
        let (root, month_code, year_digit) = validate_contract_fields(chars, "option")?;

        let side = match side_str {
            "C" | "c" => CmeOptionSide::Call,
            "P" | "p" => CmeOptionSide::Put,
            other => return Err(format!("option side must be C or P, got {other:?}")),
        };

        let strike = strike_str
            .parse::<f64>()
            .map_err(|err| format!("invalid strike {strike_str:?}: {err}"))?;
        // `f64::from_str` accepts "inf" and "NaN", so reject those explicitly.
        if !strike.is_finite() || strike <= 0.0 {
            return Err(format!("strike must be positive finite, got {strike}"));
        }

        Ok(Self {
            root,
            month_code,
            year_digit,
            side,
            strike,
        })
    }
}
166
167// ---------------------------------------------------------------------------
168// Expiry calendar (cycle 4)
169// ---------------------------------------------------------------------------
170
171use chrono::{DateTime, Datelike, NaiveDate, TimeZone, Utc, Weekday};
172
173/// Compute the nominal expiry for a CME OG (gold-on-futures) monthly option
174/// contract in the requested option-contract month.
175///
176/// CME rule: *"Trading terminates at 1:30 p.m. CT on the 4th last business
177/// day of the month prior to the option contract month."* This function
178/// implements the "4th last business day of the prior month" piece at
179/// **daily** resolution and returns UTC midnight of that day.
180///
181/// # Limitations
182///
183/// The US-holiday calendar is **not** applied here — we treat Mon–Fri as
184/// business days regardless of federal holidays. Near-end-of-month
185/// holidays (e.g. Memorial Day on the last Monday of May) can shift the
186/// real CME expiry by one calendar day vs. what this function returns.
187/// For the nearest Jan–Dec 2026 months none of the federal holidays fall
188/// inside the "last 4 business days" window, so the approximation is
189/// correct for our current deployment surface; the cycle-4 unit tests
190/// pin the specific months we've verified by hand. Production-grade
191/// holiday handling is a v2 item.
192fn og_monthly_expiry(option_year: i32, option_month: u32) -> Option<DateTime<Utc>> {
193    if !(1..=12).contains(&option_month) {
194        return None;
195    }
196
197    let (prior_year, prior_month) = if option_month == 1 {
198        (option_year - 1, 12)
199    } else {
200        (option_year, option_month - 1)
201    };
202
203    let last_day = last_day_of_month(prior_year, prior_month)?;
204    let mut date = NaiveDate::from_ymd_opt(prior_year, prior_month, last_day)?;
205
206    let mut business_days_found = 0u32;
207    loop {
208        if is_weekday(date) {
209            business_days_found += 1;
210            if business_days_found == 4 {
211                break;
212            }
213        }
214        date = date.pred_opt()?;
215    }
216
217    Some(Utc.from_utc_datetime(&date.and_hms_opt(0, 0, 0)?))
218}
219
/// Number of days in `month` of `year` under the proleptic Gregorian calendar.
///
/// Returns `None` when `month` is outside `1..=12`. Previously `month == 0`
/// was silently answered with December of the preceding year. Computed
/// arithmetically, so no year can overflow or fall outside a date library's
/// supported range.
fn last_day_of_month(year: i32, month: u32) -> Option<u32> {
    let is_leap_year = year % 4 == 0 && (year % 100 != 0 || year % 400 == 0);
    match month {
        1 | 3 | 5 | 7 | 8 | 10 | 12 => Some(31),
        4 | 6 | 9 | 11 => Some(30),
        2 => Some(if is_leap_year { 29 } else { 28 }),
        _ => None,
    }
}
230
231fn is_weekday(date: NaiveDate) -> bool {
232    !matches!(date.weekday(), Weekday::Sat | Weekday::Sun)
233}
234
#[cfg(test)]
mod expiry_tests {
    use super::*;
    use chrono::TimeZone;

    /// Assert that option contract `(year, month)` expires at UTC midnight
    /// of the calendar date `(y, m, d)`. `contract` only labels the failure.
    fn assert_expiry(contract: &str, (year, month): (i32, u32), (y, m, d): (i32, u32, u32)) {
        let expiry = og_monthly_expiry(year, month)
            .unwrap_or_else(|| panic!("{contract} should have a valid expiry"));
        let expected = Utc.with_ymd_and_hms(y, m, d, 0, 0, 0).unwrap();
        assert_eq!(expiry, expected, "got {expiry}");
    }

    /// OGM6 (June 2026 gold options): counting weekdays back from the end of
    /// May 2026 gives Fri 29 (1), Thu 28 (2), Wed 27 (3), Tue 26 (4).
    /// Memorial Day 2026 is Mon 25 May, just outside that window, so the
    /// weekend-only approximation is unaffected for this contract.
    #[test]
    fn ogm6_expires_on_2026_05_26() {
        assert_expiry("OGM6", (2026, 6), (2026, 5, 26));
    }

    /// OGU6 (September 2026): Mon 31 Aug (1), Fri 28 (2), Thu 27 (3),
    /// Wed 26 (4). No holidays in the window.
    #[test]
    fn ogu6_expires_on_2026_08_26() {
        assert_expiry("OGU6", (2026, 9), (2026, 8, 26));
    }

    /// OGF7 (January 2027): the prior month is December of the prior year.
    /// Thu 31 Dec (1), Wed 30 (2), Tue 29 (3), Mon 28 (4).
    #[test]
    fn ogf7_wraps_to_prior_year_december() {
        assert_expiry("OGF7", (2027, 1), (2026, 12, 28));
    }

    /// OGH7 (March 2027): Fri 26 Feb (1), Thu 25 (2), Wed 24 (3), Tue 23 (4).
    /// Presidents Day 2027 (15 Feb) is outside the window.
    #[test]
    fn ogh7_expires_on_2027_02_23() {
        assert_expiry("OGH7", (2027, 3), (2027, 2, 23));
    }

    /// Months outside `1..=12` have no expiry.
    #[test]
    fn invalid_month_returns_none() {
        for bad_month in [0, 13] {
            assert!(og_monthly_expiry(2026, bad_month).is_none());
        }
    }

    /// Covers a leap February, a common February, December (whose successor
    /// month is in the next year) and a 30-day month.
    #[test]
    fn last_day_of_month_handles_leap_and_year_wrap() {
        let cases = [
            ((2024, 2), 29), // leap
            ((2025, 2), 28),
            ((2026, 12), 31), // wraps forward
            ((2026, 4), 30),
        ];
        for ((year, month), days) in cases {
            assert_eq!(last_day_of_month(year, month), Some(days));
        }
    }

    /// Checks both weekend days and the two weekdays adjacent to them.
    #[test]
    fn is_weekday_classifies_all_seven_days() {
        let on = |day: u32| NaiveDate::from_ymd_opt(2026, 4, day).unwrap();
        assert!(!is_weekday(on(5))); // Sunday
        assert!(is_weekday(on(6))); // Monday
        assert!(is_weekday(on(10))); // Friday
        assert!(!is_weekday(on(11))); // Saturday
    }
}
304
305// ---------------------------------------------------------------------------
306// Chain → IV points (cycle 5)
307// ---------------------------------------------------------------------------
308
309use hypercall_margin::black_76::black_76_implied_vol;
310use hypercall_margin::OptionType;
311
/// Market inputs and quality thresholds shared by every quote in one chain
/// fit.
///
/// The thresholds are meant to reject obvious garbage (one-sided markets,
/// locked books, stale prints) before an IV solve is attempted, without
/// being so strict that the thin wings of a real chain disappear.
#[derive(Clone, Debug)]
struct ChainContext {
    /// Front-month futures price, used as the forward `F` in Black-76.
    forward: f64,
    /// Risk-free rate used for discounting. Defaults to `0.05`.
    risk_free_rate: f64,
    /// "Now" in unix milliseconds. Caller-supplied so tests stay
    /// deterministic.
    now_ms: i64,
    /// Maximum tolerated `(ask − bid) / mid`. `0.20` means "drop anything
    /// with a spread wider than 20% of mid".
    max_spread_fraction: f64,
    /// Maximum tolerated quote age in milliseconds, measured from the
    /// quote's `ingested_at_ms` to `now_ms`.
    max_staleness_ms: i64,
}

impl Default for ChainContext {
    /// Default thresholds with placeholder market inputs: `forward` and
    /// `now_ms` start at zero and are expected to be overridden by the caller.
    fn default() -> Self {
        ChainContext {
            // Placeholders — callers supply the real values.
            forward: 0.0,
            now_ms: 0,
            // Tuned defaults.
            risk_free_rate: 0.05,
            max_spread_fraction: 0.20,
            max_staleness_ms: 60_000,
        }
    }
}
345
346/// A raw option quote as we pull it off Databento. Normalised to the shape
347/// the aggregator wants: one strike, one expiry, one side, BBO, and the
348/// timestamp we received it at.
349#[derive(Debug, Clone)]
350struct OptionQuote {
351    strike: f64,
352    /// Expiry timestamp in unix **seconds** — matches `VolatilitySurface`'s
353    /// existing convention.
354    expiry_ts: i64,
355    side: CmeOptionSide,
356    bid: f64,
357    ask: f64,
358    /// Ingest timestamp in unix **milliseconds**.
359    ingested_at_ms: i64,
360}
361
/// One fitted `(strike, expiry, iv)` point produced from a quote that
/// passed the quality checks.
#[derive(Clone, Copy, Debug, PartialEq)]
struct FittedIvPoint {
    /// Strike price the point was fitted at.
    strike: f64,
    /// Expiry timestamp, copied from the source quote.
    expiry_ts: i64,
    /// Fitted implied volatility.
    iv: f64,
}
370
/// Length of one year in seconds, using a 365-day year. Divides
/// seconds-to-expiry to obtain the Black-76 time-to-expiry τ in years.
const SECONDS_PER_YEAR: f64 = 86_400.0 * 365.0;
375
376/// Fit an implied-vol point from each quote in the chain that survives
377/// quality checks. Filters silently — the count of dropped quotes is
378/// observable by comparing input/output lengths if a caller needs it.
379fn compute_iv_points(quotes: &[OptionQuote], ctx: &ChainContext) -> Vec<FittedIvPoint> {
380    let mut fitted = Vec::with_capacity(quotes.len());
381
382    if !ctx.forward.is_finite() || ctx.forward <= 0.0 {
383        return fitted;
384    }
385
386    for quote in quotes {
387        // Structural BBO checks.
388        if quote.bid <= 0.0 || quote.ask <= 0.0 || quote.bid >= quote.ask {
389            continue;
390        }
391        if !quote.bid.is_finite() || !quote.ask.is_finite() {
392            continue;
393        }
394
395        // Staleness.
396        if ctx.now_ms.saturating_sub(quote.ingested_at_ms) > ctx.max_staleness_ms {
397            continue;
398        }
399
400        // Spread width.
401        let mid = 0.5 * (quote.bid + quote.ask);
402        let spread = quote.ask - quote.bid;
403        if mid <= 0.0 || spread / mid > ctx.max_spread_fraction {
404            continue;
405        }
406
407        // Time to expiry in years. Drop anything already expired or
408        // missing a positive tau.
409        let now_secs = ctx.now_ms / 1000;
410        let seconds_to_expiry = quote.expiry_ts.saturating_sub(now_secs);
411        if seconds_to_expiry <= 0 {
412            continue;
413        }
414        let tau = seconds_to_expiry as f64 / SECONDS_PER_YEAR;
415        if !tau.is_finite() || tau <= 0.0 {
416            continue;
417        }
418
419        let option_type = match quote.side {
420            CmeOptionSide::Call => OptionType::Call,
421            CmeOptionSide::Put => OptionType::Put,
422        };
423
424        // Below-intrinsic and non-convergence are both handled inside
425        // the Black-76 wrapper via the underlying BS solver — they
426        // come back as None, which we silently drop.
427        let Some(iv) = black_76_implied_vol(
428            &option_type,
429            ctx.forward,
430            quote.strike,
431            tau,
432            ctx.risk_free_rate,
433            mid,
434        ) else {
435            continue;
436        };
437
438        if !iv.is_finite() || iv <= 0.0 {
439            continue;
440        }
441
442        fitted.push(FittedIvPoint {
443            strike: quote.strike,
444            expiry_ts: quote.expiry_ts,
445            iv,
446        });
447    }
448
449    fitted
450}
451
#[cfg(test)]
mod chain_tests {
    use super::*;
    use hypercall_margin::black_76::black_76_price;

    /// Fixed "now" (unix milliseconds) shared by every test in this module.
    const NOW_MS: i64 = 1_775_000_000_000;

    /// Default thresholds with the supplied forward and clock.
    fn chain_ctx(forward: f64, now_ms: i64) -> ChainContext {
        ChainContext {
            now_ms,
            forward,
            ..ChainContext::default()
        }
    }

    /// Fabricate a quote whose mid is the Black-76 price at `sigma`, with
    /// bid and ask placed `half_spread` either side of it. Fitting such a
    /// quote should recover `sigma` closely.
    fn quote_at_sigma(
        strike: f64,
        expiry_ts: i64,
        side: CmeOptionSide,
        ctx: &ChainContext,
        sigma: f64,
        half_spread: f64,
        ingested_at_ms: i64,
    ) -> OptionQuote {
        let seconds_left = expiry_ts - ctx.now_ms / 1000;
        let tau = (seconds_left as f64) / SECONDS_PER_YEAR;
        let option_type = match side {
            CmeOptionSide::Put => OptionType::Put,
            CmeOptionSide::Call => OptionType::Call,
        };
        let mid = black_76_price(
            &option_type,
            ctx.forward,
            strike,
            tau,
            ctx.risk_free_rate,
            sigma,
        );
        OptionQuote {
            strike,
            expiry_ts,
            side,
            bid: mid - half_spread,
            ask: mid + half_spread,
            ingested_at_ms,
        }
    }

    /// Unix seconds 30 days after the supplied `now_ms`.
    fn expiry_30d(now_ms: i64) -> i64 {
        30 * 86_400 + now_ms / 1000
    }

    #[test]
    fn happy_path_recovers_sigma_for_three_strikes() {
        let context = chain_ctx(4700.0, NOW_MS);
        let expiry = expiry_30d(NOW_MS);
        let sigma = 0.35;

        let chain: Vec<OptionQuote> = [
            (4500.0, CmeOptionSide::Put),
            (4700.0, CmeOptionSide::Call),
            (4900.0, CmeOptionSide::Call),
        ]
        .into_iter()
        .map(|(strike, side)| quote_at_sigma(strike, expiry, side, &context, sigma, 1.0, NOW_MS))
        .collect();

        let points = compute_iv_points(&chain, &context);
        assert_eq!(points.len(), 3, "all 3 clean quotes should fit");
        for p in &points {
            assert!(
                (p.iv - sigma).abs() < 1e-3,
                "fitted iv {} should be close to {sigma}",
                p.iv
            );
        }
    }

    #[test]
    fn drops_zero_or_inverted_bids_and_asks() {
        let context = chain_ctx(4700.0, NOW_MS);
        let expiry = expiry_30d(NOW_MS);
        let base = quote_at_sigma(
            4700.0,
            expiry,
            CmeOptionSide::Call,
            &context,
            0.3,
            1.0,
            NOW_MS,
        );

        let zero_bid = OptionQuote {
            bid: 0.0,
            ..base.clone()
        };
        let zero_ask = OptionQuote {
            ask: 0.0,
            ..base.clone()
        };
        let inverted = OptionQuote {
            bid: base.ask,
            ask: base.bid,
            ..base.clone()
        };
        // A clean quote anchors the test: exactly one survivor is expected.
        let clean = OptionQuote {
            bid: base.bid.max(0.001),
            ..base
        };

        let points = compute_iv_points(&[zero_bid, zero_ask, inverted, clean], &context);
        assert_eq!(points.len(), 1, "only the clean quote should survive");
    }

    #[test]
    fn drops_quotes_with_spread_wider_than_context() {
        // Default max_spread_fraction = 0.20.
        let context = chain_ctx(4700.0, NOW_MS);
        let expiry = expiry_30d(NOW_MS);

        // Deep-OTM put template with the BBO overridden so the spread is
        // far wider than 20% of mid.
        let wide = OptionQuote {
            bid: 0.01,
            ask: 40.0,
            ..quote_at_sigma(
                3800.0,
                expiry,
                CmeOptionSide::Put,
                &context,
                0.3,
                20.0,
                NOW_MS,
            )
        };

        assert!(compute_iv_points(&[wide], &context).is_empty());
    }

    #[test]
    fn drops_stale_quotes() {
        let context = chain_ctx(4700.0, NOW_MS);
        let expiry = expiry_30d(NOW_MS);

        // Ingested two minutes ago — beyond the 60s default staleness.
        let two_minutes_ago = NOW_MS - 120_000;
        let stale = quote_at_sigma(
            4700.0,
            expiry,
            CmeOptionSide::Call,
            &context,
            0.3,
            1.0,
            two_minutes_ago,
        );
        assert!(compute_iv_points(&[stale], &context).is_empty());
    }

    #[test]
    fn drops_below_intrinsic_quotes() {
        let context = chain_ctx(5000.0, NOW_MS);

        // A 4000-strike call against a 5000 forward has intrinsic ≈ 1000 in
        // forward space; a mid of 100 is far below it and must be rejected.
        let underpriced = OptionQuote {
            strike: 4000.0,
            expiry_ts: expiry_30d(NOW_MS),
            side: CmeOptionSide::Call,
            bid: 95.0,
            ask: 105.0,
            ingested_at_ms: NOW_MS,
        };
        assert!(compute_iv_points(&[underpriced], &context).is_empty());
    }

    #[test]
    fn drops_already_expired_quotes() {
        let context = chain_ctx(4700.0, NOW_MS);

        let expired = OptionQuote {
            strike: 4700.0,
            // One second before "now".
            expiry_ts: NOW_MS / 1000 - 1,
            side: CmeOptionSide::Call,
            bid: 50.0,
            ask: 52.0,
            ingested_at_ms: NOW_MS,
        };
        assert!(compute_iv_points(&[expired], &context).is_empty());
    }

    #[test]
    fn empty_input_is_not_an_error() {
        let context = chain_ctx(4700.0, NOW_MS);
        assert!(compute_iv_points(&[], &context).is_empty());
    }

    #[test]
    fn non_positive_forward_returns_empty() {
        let context = chain_ctx(-1.0, NOW_MS);
        // Even a well-formed quote must be dropped when the context is bad.
        let well_formed = OptionQuote {
            strike: 4700.0,
            expiry_ts: expiry_30d(context.now_ms),
            side: CmeOptionSide::Call,
            bid: 50.0,
            ask: 52.0,
            ingested_at_ms: context.now_ms,
        };
        assert!(compute_iv_points(&[well_formed], &context).is_empty());
    }
}
653
654// ---------------------------------------------------------------------------
655// MBP-1 book cache (cycle 6)
656// ---------------------------------------------------------------------------
657
658use std::collections::HashMap;
659
/// Databento's 64-bit instrument identifier.
type InstrumentId = u64;

/// One top-of-book snapshot: best bid and ask with their sizes, plus the
/// event timestamp. Stored exactly as received.
#[derive(Clone, Copy, Debug, PartialEq)]
pub(crate) struct Bbo {
    /// Best bid price.
    bid: f64,
    /// Size at the best bid.
    bid_size: u64,
    /// Best ask price.
    ask: f64,
    /// Size at the best ask.
    ask_size: u64,
    /// Databento event timestamp in unix **milliseconds**.
    ts_event_ms: i64,
}

/// Latest-snapshot-per-instrument cache.
///
/// Deliberately minimal: it holds only the most recent [`Bbo`] for each
/// instrument and maintains no order book. The type performs no I/O and
/// contains no locks or async code; callers that share it must provide
/// their own synchronisation.
#[derive(Debug, Default)]
struct BookCache {
    books: HashMap<InstrumentId, Bbo>,
}

impl BookCache {
    /// Create an empty cache.
    fn new() -> Self {
        Self {
            books: HashMap::new(),
        }
    }

    /// Store `bbo` as the current snapshot for `instrument_id`,
    /// unconditionally overwriting any previous one ("latest wins").
    fn apply(&mut self, instrument_id: InstrumentId, bbo: Bbo) {
        self.books.insert(instrument_id, bbo);
    }

    /// Current snapshot for `instrument_id`, or `None` if no snapshot is
    /// stored for it.
    fn get(&self, instrument_id: InstrumentId) -> Option<&Bbo> {
        self.books.get(&instrument_id)
    }

    /// Drop the snapshot for `instrument_id`. Returns `true` when an entry
    /// existed and was removed, `false` when there was nothing to remove.
    fn delete(&mut self, instrument_id: InstrumentId) -> bool {
        let removed = self.books.remove(&instrument_id);
        removed.is_some()
    }

    /// Number of instruments with a stored snapshot.
    fn len(&self) -> usize {
        self.books.len()
    }

    /// `true` when no snapshot is stored.
    #[allow(dead_code)]
    fn is_empty(&self) -> bool {
        self.books.is_empty()
    }

    /// Iterate over `(instrument_id, bbo)` pairs in unspecified order.
    fn iter(&self) -> impl Iterator<Item = (InstrumentId, &Bbo)> {
        self.books.iter().map(|(&id, snapshot)| (id, snapshot))
    }
}
727
#[cfg(test)]
mod book_cache_tests {
    use super::*;

    /// Build a snapshot with fixed sizes of 10 on both sides.
    fn sample_bbo(bid: f64, ask: f64, ts_ms: i64) -> Bbo {
        Bbo {
            ts_event_ms: ts_ms,
            bid_size: 10,
            ask_size: 10,
            bid,
            ask,
        }
    }

    #[test]
    fn apply_then_get_returns_the_bbo() {
        let snapshot = sample_bbo(100.0, 101.0, 1_700_000_000_000);
        let mut cache = BookCache::new();
        cache.apply(42, snapshot);
        assert_eq!(cache.get(42), Some(&snapshot));
        assert_eq!(cache.len(), 1);
    }

    #[test]
    fn apply_replaces_existing_snapshot() {
        let mut cache = BookCache::new();
        for (bid, ask, ts) in [(100.0, 101.0, 1), (100.5, 101.5, 2)] {
            cache.apply(42, sample_bbo(bid, ask, ts));
        }
        let latest = cache.get(42).expect("should still be present");
        assert_eq!(latest.bid, 100.5);
        assert_eq!(latest.ask, 101.5);
        assert_eq!(latest.ts_event_ms, 2);
        assert_eq!(cache.len(), 1, "same instrument, still one entry");
    }

    #[test]
    fn get_on_unknown_instrument_is_none() {
        assert!(BookCache::new().get(999).is_none());
    }

    #[test]
    fn delete_removes_and_reports() {
        let mut cache = BookCache::new();
        cache.apply(1, sample_bbo(1.0, 2.0, 0));
        cache.apply(2, sample_bbo(3.0, 4.0, 0));

        assert!(cache.delete(1));
        // Second delete: nothing left to remove.
        assert!(!cache.delete(1));

        assert_eq!(cache.len(), 1);
        assert!(cache.get(1).is_none());
        assert!(cache.get(2).is_some());
    }

    #[test]
    fn iter_yields_all_entries() {
        let mut cache = BookCache::new();
        cache.apply(10, sample_bbo(1.0, 2.0, 100));
        cache.apply(20, sample_bbo(3.0, 4.0, 200));
        cache.apply(30, sample_bbo(5.0, 6.0, 300));

        // Iteration order is unspecified, so sort before comparing.
        let mut ids: Vec<InstrumentId> = cache
            .iter()
            .map(|(id, bbo)| (id, *bbo))
            .map(|(id, _)| id)
            .collect();
        ids.sort();

        assert_eq!(ids.len(), 3);
        assert_eq!(ids[0], 10);
        assert_eq!(ids[1], 20);
        assert_eq!(ids[2], 30);
    }

    #[test]
    fn new_cache_is_empty() {
        let cache = BookCache::new();
        assert_eq!(cache.len(), 0);
        assert!(cache.is_empty());
    }
}
805
806// ---------------------------------------------------------------------------
807// Feed abstraction + fake feed for tests (cycle 7)
808// ---------------------------------------------------------------------------
809
810use async_trait::async_trait;
811
812/// A product-type-agnostic view of a single Databento instrument
813/// definition, normalised to the fields we actually consume.
814#[derive(Debug, Clone, PartialEq)]
815pub(crate) enum InstrumentKind {
816    Future {
817        root: String,
818        month_code: char,
819        year_digit: u8,
820    },
821    Option {
822        root: String,
823        month_code: char,
824        year_digit: u8,
825        side: CmeOptionSide,
826        strike: f64,
827        expiry_ts: i64,
828    },
829}
830
831/// Events we consume from a Databento feed, after the SDK-specific DBN
832/// records have been adapted to our internal representation.
833///
834/// We deliberately do **not** expose Databento's native `dbn::Record`
835/// types across this boundary — that would couple our provider to a
836/// particular SDK version and make the fake-feed test harness awkward.
837/// The real feed adapter (in cycle 8) is responsible for translating DBN
838/// records into these four variants.
839#[derive(Debug, Clone, PartialEq)]
840pub(crate) enum FeedRecord {
841    /// An instrument definition update. Fired once per instrument when the
842    /// stream starts, and again if a new instrument is listed mid-stream.
843    Definition {
844        instrument_id: InstrumentId,
845        kind: InstrumentKind,
846    },
847    /// A top-of-book update for a previously-defined instrument.
848    Mbp1 {
849        instrument_id: InstrumentId,
850        bbo: Bbo,
851    },
852    /// A transient feed error. The provider should treat this as a
853    /// reconnect signal — not a hard failure — and continue.
854    Error(String),
855    /// The feed has closed cleanly (e.g. historical replay done). No
856    /// further records will follow.
857    EndOfStream,
858}
859
860/// Abstraction over a Databento live-or-historical feed, narrowed to the
861/// two calls the provider needs: pull the next record, or stop.
862///
863/// Async because the real implementation wraps `databento::live::Client`
864/// which is async; a synchronous `Iterator` would force us to block inside
865/// the provider task.
866#[async_trait]
867pub(crate) trait DatabentoFeed: Send + Sync {
868    /// Return the next record from the feed, or `None` once the feed has
869    /// ended and will never produce another record. Transient errors are
870    /// returned as `Some(FeedRecord::Error(..))` — only a terminal state
871    /// yields `None`.
872    async fn next_record(&mut self) -> Option<FeedRecord>;
873}
874
875/// Deterministic in-memory feed used exclusively in tests. Hand it a
876/// `Vec<FeedRecord>` and it replays them in order, one per `next_record`
877/// call, then returns `None` forever.
878struct FakeFeed {
879    records: std::collections::VecDeque<FeedRecord>,
880}
881
882impl FakeFeed {
883    fn new(records: Vec<FeedRecord>) -> Self {
884        Self {
885            records: records.into(),
886        }
887    }
888}
889
890#[async_trait]
891impl DatabentoFeed for FakeFeed {
892    async fn next_record(&mut self) -> Option<FeedRecord> {
893        self.records.pop_front()
894    }
895}
896
#[cfg(test)]
mod feed_tests {
    use super::*;

    /// Definition record registering instrument 1 as a GC future
    /// (month code `M`, year digit 6).
    fn sample_gc_definition() -> FeedRecord {
        let kind = InstrumentKind::Future {
            root: "GC".to_string(),
            month_code: 'M',
            year_digit: 6,
        };
        FeedRecord::Definition {
            instrument_id: 1,
            kind,
        }
    }

    /// Definition record registering instrument 2 as an OG call struck
    /// at 4700.
    fn sample_og_definition() -> FeedRecord {
        let kind = InstrumentKind::Option {
            root: "OG".to_string(),
            month_code: 'M',
            year_digit: 6,
            side: CmeOptionSide::Call,
            strike: 4700.0,
            expiry_ts: 1_777_000_000,
        };
        FeedRecord::Definition {
            instrument_id: 2,
            kind,
        }
    }

    /// A fixed 100.0 / 101.0 top-of-book update for `instrument_id`.
    fn sample_bbo_update(instrument_id: InstrumentId) -> FeedRecord {
        let bbo = Bbo {
            bid: 100.0,
            bid_size: 5,
            ask: 101.0,
            ask_size: 5,
            ts_event_ms: 1_700_000_000_000,
        };
        FeedRecord::Mbp1 { instrument_id, bbo }
    }

    #[tokio::test]
    async fn fake_feed_replays_records_in_order() {
        let script = vec![
            sample_gc_definition(),
            sample_og_definition(),
            sample_bbo_update(1),
            sample_bbo_update(2),
        ];
        let mut feed = FakeFeed::new(script);

        // Drain the whole script first, then check what came out.
        let first = feed.next_record().await.expect("record 1");
        let second = feed.next_record().await.expect("record 2");
        let third = feed.next_record().await.expect("record 3");
        let fourth = feed.next_record().await.expect("record 4");
        let past_the_end = feed.next_record().await;

        assert!(matches!(
            first,
            FeedRecord::Definition {
                instrument_id: 1,
                ..
            }
        ));
        assert!(matches!(
            second,
            FeedRecord::Definition {
                instrument_id: 2,
                ..
            }
        ));
        assert!(matches!(
            third,
            FeedRecord::Mbp1 {
                instrument_id: 1,
                ..
            }
        ));
        assert!(matches!(
            fourth,
            FeedRecord::Mbp1 {
                instrument_id: 2,
                ..
            }
        ));
        assert!(
            past_the_end.is_none(),
            "feed should be exhausted after 4 records"
        );
    }

    #[tokio::test]
    async fn fake_feed_propagates_error_and_continues() {
        // An Error record is just another item in the stream: the
        // consumer sees it and keeps reading. Only `None` is terminal.
        let script = vec![
            FeedRecord::Error("transient: connection reset".to_string()),
            sample_gc_definition(),
            FeedRecord::EndOfStream,
        ];
        let mut feed = FakeFeed::new(script);

        let error = feed.next_record().await.expect("error record");
        assert!(matches!(error, FeedRecord::Error(_)));

        let definition = feed.next_record().await.expect("definition record");
        assert!(matches!(definition, FeedRecord::Definition { .. }));

        let eos = feed.next_record().await.expect("end of stream marker");
        assert!(matches!(eos, FeedRecord::EndOfStream));

        let past_the_end = feed.next_record().await;
        assert!(
            past_the_end.is_none(),
            "after EndOfStream and no more records, feed is exhausted"
        );
    }

    #[tokio::test]
    async fn empty_fake_feed_is_immediately_exhausted() {
        let mut feed = FakeFeed::new(Vec::new());
        let first = feed.next_record().await;
        assert!(first.is_none());
    }
}
1017
1018// ---------------------------------------------------------------------------
1019// Orchestrator (cycle 8)
1020// ---------------------------------------------------------------------------
1021
1022use super::vol_surface_cache::VolatilitySurface;
1023
1024/// Stateful glue that turns a stream of `FeedRecord`s into a live vol
1025/// surface. Holds the instrument registry, the BBO cache, and the most
1026/// recent forward we've seen for the underlying futures contract.
1027///
1028/// The orchestrator does not own a feed — the oracle (cycle 11) drives
1029/// the async loop and calls `ingest` for each record. This makes
1030/// testing trivial: hand-roll a `Vec<FeedRecord>` and ingest them
1031/// synchronously.
1032///
1033/// # Front-month selection
1034///
1035/// The forward price used for Black-76 is the most recent GC futures
1036/// BBO mid we've observed. If multiple GC contracts stream
1037/// simultaneously (e.g. June + August) the forward will flip-flop
1038/// between them, which is wrong. In practice the Databento subscription
1039/// is pinned to the front-month contract only, so this isn't a problem
1040/// in dev-jake's configuration; a smarter "nearest non-expired contract"
1041/// rule is a v2 item if we ever subscribe to the full curve.
1042#[derive(Debug)]
1043struct DatabentoOrchestrator {
1044    book_cache: BookCache,
1045    instruments: HashMap<InstrumentId, InstrumentKind>,
1046    current_forward: Option<f64>,
1047    risk_free_rate: f64,
1048    max_spread_fraction: f64,
1049    max_staleness_ms: i64,
1050    /// km:GOLD / CME GC scale factor. 1.0 for dev-jake since km:GOLD
1051    /// tracks XAU/USD 1:1 per the 2026-04 research. Stored here so
1052    /// `build_surface` can apply it uniformly on every rebuild.
1053    strike_scale: f64,
1054}
1055
1056impl DatabentoOrchestrator {
1057    fn new(risk_free_rate: f64, strike_scale: f64) -> Self {
1058        Self {
1059            book_cache: BookCache::new(),
1060            instruments: HashMap::new(),
1061            current_forward: None,
1062            risk_free_rate,
1063            max_spread_fraction: 0.20,
1064            max_staleness_ms: 60_000,
1065            strike_scale,
1066        }
1067    }
1068
1069    /// Process one record from the feed. Quiet on transient errors and
1070    /// end-of-stream — those are the caller's concern.
1071    fn ingest(&mut self, record: FeedRecord) {
1072        match record {
1073            FeedRecord::Definition {
1074                instrument_id,
1075                kind,
1076            } => {
1077                self.instruments.insert(instrument_id, kind);
1078            }
1079            FeedRecord::Mbp1 { instrument_id, bbo } => {
1080                self.book_cache.apply(instrument_id, bbo);
1081                if let Some(InstrumentKind::Future { root, .. }) =
1082                    self.instruments.get(&instrument_id)
1083                {
1084                    if root == "GC" {
1085                        let mid = 0.5 * (bbo.bid + bbo.ask);
1086                        if mid.is_finite() && mid > 0.0 {
1087                            self.current_forward = Some(mid);
1088                        }
1089                    }
1090                }
1091            }
1092            FeedRecord::Error(_) | FeedRecord::EndOfStream => {}
1093        }
1094    }
1095
1096    /// Build a fresh `VolatilitySurface` from the current cached state.
1097    /// Returns an empty surface if we don't yet have a forward (no GC
1098    /// BBO has arrived) — callers can check `.is_empty()` and decide
1099    /// whether to expose the oracle as ready.
1100    fn build_surface(&self, now_ms: i64) -> VolatilitySurface {
1101        let mut surface = VolatilitySurface::new();
1102        let Some(forward) = self.current_forward else {
1103            return surface;
1104        };
1105
1106        let ctx = ChainContext {
1107            forward,
1108            risk_free_rate: self.risk_free_rate,
1109            now_ms,
1110            max_spread_fraction: self.max_spread_fraction,
1111            max_staleness_ms: self.max_staleness_ms,
1112        };
1113
1114        // Solve IV in CME space (forward and strike both in CME units), then
1115        // apply `strike_scale` only at the surface-insert step. Black-76 IV
1116        // is scale-invariant under uniform (F, K) rescaling — log-moneyness
1117        // is unchanged — so this produces the correct IV and places the
1118        // point at a km-space strike that downstream lookups can find.
1119        let mut quotes = Vec::new();
1120        for (instrument_id, bbo) in self.book_cache.iter() {
1121            let Some(kind) = self.instruments.get(&instrument_id) else {
1122                continue;
1123            };
1124            if let InstrumentKind::Option {
1125                side,
1126                strike,
1127                expiry_ts,
1128                ..
1129            } = kind
1130            {
1131                quotes.push(OptionQuote {
1132                    strike: *strike,
1133                    expiry_ts: *expiry_ts,
1134                    side: *side,
1135                    bid: bbo.bid,
1136                    ask: bbo.ask,
1137                    // Use the surface-build time, not CME's ts_recv. CME
1138                    // emits MBP1 updates per instrument only when the
1139                    // BBO changes, so for illiquid OG options the "last
1140                    // event" can be many minutes old even though the
1141                    // quote in book_cache is the current best. The 60s
1142                    // per-quote staleness in compute_iv_points was then
1143                    // dropping every option — surface stayed at 0 points
1144                    // with 800k+ MBP1s in flight. Feed-level freshness is
1145                    // already gated by `state.last_update_ts_ms` against
1146                    // `config.staleness_threshold` in `status()`, so we
1147                    // don't lose the dead-feed protection by doing this.
1148                    ingested_at_ms: now_ms,
1149                });
1150            }
1151        }
1152
1153        for point in compute_iv_points(&quotes, &ctx) {
1154            surface.insert(point.strike * self.strike_scale, point.expiry_ts, point.iv);
1155        }
1156
1157        // Project onto the no-arb cone in price space. Black-76 call prices
1158        // equal BS call prices at r=0 with spot replaced by the (already
1159        // discounted) forward, so we can reuse the shared BS-based sanitizer
1160        // by passing the scaled forward and rate=0. Scale the forward the
1161        // same way we scale strikes so both live in the platform's km-space.
1162        let scaled_forward = forward * self.strike_scale;
1163        if scaled_forward > 0.0 {
1164            let clamps = surface.sanitize_arb_free(scaled_forward, 0.0);
1165            if clamps > 0 {
1166                // This orchestrator is GOLD-only (keyed off CME GC as the
1167                // forward root above), so we can hardcode the underlying
1168                // label on the metric.
1169                metrics::counter!(
1170                    "ht_vol_surface_arb_clamps_total",
1171                    "provider" => "databento",
1172                    "underlying" => "GOLD"
1173                )
1174                .increment(clamps as u64);
1175            }
1176        }
1177
1178        surface
1179    }
1180
1181    fn current_forward(&self) -> Option<f64> {
1182        self.current_forward
1183    }
1184}
1185
#[cfg(test)]
mod orchestrator_tests {
    use super::*;
    use hypercall_margin::black_76::black_76_price;

    /// Pinned "current time" in epoch milliseconds used by every test.
    fn now_ms() -> i64 {
        1_775_000_000_000
    }

    /// Expiry timestamp (epoch seconds) 30 days after `now_ms()`.
    fn expiry_30d() -> i64 {
        now_ms() / 1000 + 30 * 86_400
    }

    /// Definition record for a GC future (month code `M`, year digit 6).
    fn gc_future_definition(instrument_id: InstrumentId) -> FeedRecord {
        FeedRecord::Definition {
            instrument_id,
            kind: InstrumentKind::Future {
                root: "GC".to_string(),
                month_code: 'M',
                year_digit: 6,
            },
        }
    }

    /// Futures top-of-book update with size 5 on both sides, stamped at
    /// `now_ms()`.
    fn futures_bbo(instrument_id: InstrumentId, bid: f64, ask: f64) -> FeedRecord {
        FeedRecord::Mbp1 {
            instrument_id,
            bbo: Bbo {
                bid,
                bid_size: 5,
                ask,
                ask_size: 5,
                ts_event_ms: now_ms(),
            },
        }
    }

    /// Definition record for an OG option (month code `M`, year digit 6).
    fn og_definition(
        instrument_id: InstrumentId,
        side: CmeOptionSide,
        strike: f64,
        expiry_ts: i64,
    ) -> FeedRecord {
        FeedRecord::Definition {
            instrument_id,
            kind: InstrumentKind::Option {
                root: "OG".to_string(),
                month_code: 'M',
                year_digit: 6,
                side,
                strike,
                expiry_ts,
            },
        }
    }

    /// Helper: build an MBP-1 record for an option whose bid/ask sit
    /// 1.0 either side of the Black-76 price at a known sigma, matching
    /// the chain_tests helper.
    fn option_mbp1(
        instrument_id: InstrumentId,
        forward: f64,
        strike: f64,
        side: CmeOptionSide,
        sigma: f64,
    ) -> FeedRecord {
        let opt_type = match side {
            CmeOptionSide::Call => OptionType::Call,
            CmeOptionSide::Put => OptionType::Put,
        };
        let years_to_expiry = 30.0 / 365.0;
        let fair = black_76_price(&opt_type, forward, strike, years_to_expiry, 0.05, sigma);
        let bbo = Bbo {
            bid: fair - 1.0,
            bid_size: 10,
            ask: fair + 1.0,
            ask_size: 10,
            ts_event_ms: now_ms(),
        };
        FeedRecord::Mbp1 { instrument_id, bbo }
    }

    #[test]
    fn new_orchestrator_has_no_forward_and_empty_surface() {
        let orch = DatabentoOrchestrator::new(0.05, 1.0);
        assert!(orch.current_forward().is_none());
        assert!(orch.build_surface(now_ms()).is_empty());
    }

    #[test]
    fn gc_future_update_populates_forward() {
        let mut orch = DatabentoOrchestrator::new(0.05, 1.0);
        orch.ingest(gc_future_definition(1));
        orch.ingest(futures_bbo(1, 4700.0, 4702.0));
        assert_eq!(orch.current_forward(), Some(4701.0));
    }

    #[test]
    fn option_quote_without_forward_yields_empty_surface() {
        let mut orch = DatabentoOrchestrator::new(0.05, 1.0);
        // An OG option is defined and quoted, but no GC future ever
        // shows up. With no forward, build_surface must return an empty
        // surface even though a valid BBO is sitting in the cache.
        orch.ingest(og_definition(
            10,
            CmeOptionSide::Call,
            4700.0,
            expiry_30d(),
        ));
        orch.ingest(option_mbp1(
            10,
            4700.0, // "pretend forward" for pricing, not actually known to orch
            4700.0,
            CmeOptionSide::Call,
            0.35,
        ));

        assert!(orch.build_surface(now_ms()).is_empty());
    }

    #[test]
    fn full_pipeline_yields_surface_with_expected_point() {
        let mut orch = DatabentoOrchestrator::new(0.05, 1.0);

        // GC future definition + BBO -> populates forward
        orch.ingest(gc_future_definition(1));
        orch.ingest(futures_bbo(1, 4699.5, 4700.5));

        // OG option definition + BBO (priced at known sigma = 0.32)
        let exp = expiry_30d();
        orch.ingest(og_definition(10, CmeOptionSide::Call, 4700.0, exp));
        orch.ingest(option_mbp1(10, 4700.0, 4700.0, CmeOptionSide::Call, 0.32));

        let surface = orch.build_surface(now_ms());
        assert!(!surface.is_empty(), "surface should have the option point");

        let iv = surface
            .get(4700.0, exp)
            .expect("4700@exp should be in the surface");
        assert!(
            (iv - 0.32).abs() < 5e-3,
            "recovered iv {iv} should be near 0.32",
        );
    }

    #[test]
    fn multiple_options_populate_multiple_points() {
        let mut orch = DatabentoOrchestrator::new(0.05, 1.0);

        orch.ingest(gc_future_definition(1));
        orch.ingest(futures_bbo(1, 4699.5, 4700.5));

        let exp = expiry_30d();
        let chain = [
            (10, 4500.0, CmeOptionSide::Put),
            (11, 4700.0, CmeOptionSide::Call),
            (12, 4900.0, CmeOptionSide::Call),
        ];
        for (id, strike, side) in chain {
            orch.ingest(og_definition(id, side, strike, exp));
            orch.ingest(option_mbp1(id, 4700.0, strike, side, 0.3));
        }

        let surface = orch.build_surface(now_ms());
        assert_eq!(surface.len(), 3);
        assert!(surface.get(4500.0, exp).is_some());
        assert!(surface.get(4700.0, exp).is_some());
        assert!(surface.get(4900.0, exp).is_some());
    }

    #[test]
    fn error_and_end_of_stream_are_noop_on_orchestrator() {
        let mut orch = DatabentoOrchestrator::new(0.05, 1.0);
        orch.ingest(FeedRecord::Error("transient".to_string()));
        orch.ingest(FeedRecord::EndOfStream);
        // State is untouched: no forward, and the surface is still empty.
        assert!(orch.current_forward().is_none());
        assert!(orch.build_surface(now_ms()).is_empty());
    }

    #[test]
    fn strike_scale_places_points_in_km_space() {
        // CME GC quotes in ~$2350 range; km:GOLD target space is ~$4700.
        // strike_scale = 2.0 mirrors a hypothetical 2x rescaling. Since
        // IV is scale-invariant, the orchestrator should solve in CME
        // space and rescale strikes only when inserting into the surface.
        let mut orch = DatabentoOrchestrator::new(0.05, 2.0);

        orch.ingest(gc_future_definition(1));
        orch.ingest(futures_bbo(1, 2349.5, 2350.5));

        let exp = expiry_30d();
        orch.ingest(og_definition(10, CmeOptionSide::Call, 2350.0, exp));
        // Mid generated against CME forward = 2350, CME strike = 2350.
        orch.ingest(option_mbp1(10, 2350.0, 2350.0, CmeOptionSide::Call, 0.3));

        let surface = orch.build_surface(now_ms());
        let iv = surface
            .get(4700.0, exp)
            .expect("scale=2 should place the point at km-space strike 4700");
        assert!(
            (iv - 0.3).abs() < 5e-3,
            "iv should be scale-invariant: got {iv}, expected 0.3",
        );
        // The unscaled CME strike must not appear in the surface.
        assert!(
            surface.get(2350.0, exp).is_none(),
            "CME-space strike should not exist in a km-space surface",
        );
    }
}
1453
1454// ---------------------------------------------------------------------------
1455// DatabentoVolOracle — the public face (cycle 11)
1456// ---------------------------------------------------------------------------
1457
1458use super::risk_oracle::{
1459    RiskVolOracle, VolLookupError, VolOracleStatus, VolProviderKind, VolSurfaceSnapshot,
1460};
1461use std::sync::{Arc, RwLock};
1462use std::time::Duration;
1463
/// Settings for a [`DatabentoVolOracle`].
#[derive(Debug, Clone)]
pub struct DatabentoVolOracleConfig {
    /// The single Hypercall underlying served by this oracle (for
    /// example `"GOLD"`). `get_iv` requests naming any other underlying
    /// are rejected.
    pub underlying: String,
    /// Risk-free rate handed to the Black-76 wrapper. The project
    /// default is 0.05, per the r-rate-research agreement.
    pub risk_free_rate: f64,
    /// Factor that maps CME option strikes into the price space of the
    /// Hypercall underlying. `1.0` for km:GOLD.
    pub strike_scale: f64,
    /// Maximum age of the last surface refresh before the surface is
    /// treated as stale.
    pub staleness_threshold: Duration,
}
1479
1480#[derive(Debug)]
1481struct DatabentoOracleState {
1482    orchestrator: DatabentoOrchestrator,
1483    surface: VolatilitySurface,
1484    last_update_ts_ms: Option<i64>,
1485    last_error: Option<String>,
1486    messages_received: u64,
1487}
1488
1489/// A `RiskVolOracle` backed by a Databento feed. Owns a
1490/// `DatabentoOrchestrator` under an `RwLock` and a cached
1491/// `VolatilitySurface` that is rebuilt on every ingested record.
1492///
1493/// Callers drive the oracle by calling [`ingest`](Self::ingest) for each
1494/// `FeedRecord`, or by awaiting [`run`](Self::run) with an async
1495/// `DatabentoFeed`. In production the factory (cycle 12) spawns a task
1496/// that holds a `DatabentoFeed` over `databento::live::Client` and loops
1497/// on `run`. In tests, `ingest` is called synchronously with records
1498/// from `FakeFeed`.
1499pub struct DatabentoVolOracle {
1500    config: DatabentoVolOracleConfig,
1501    state: Arc<RwLock<DatabentoOracleState>>,
1502}
1503
1504impl DatabentoVolOracle {
1505    pub fn new(config: DatabentoVolOracleConfig) -> Self {
1506        let orchestrator = DatabentoOrchestrator::new(config.risk_free_rate, config.strike_scale);
1507        let state = DatabentoOracleState {
1508            orchestrator,
1509            surface: VolatilitySurface::new(),
1510            last_update_ts_ms: None,
1511            last_error: None,
1512            messages_received: 0,
1513        };
1514        Self {
1515            config,
1516            state: Arc::new(RwLock::new(state)),
1517        }
1518    }
1519
1520    /// Directly set the `last_error` field without ingesting a record.
1521    /// Used by the live feed task helper to surface connect-time errors
1522    /// (which don't flow through the normal `FeedRecord::Error` path
1523    /// because they happen before the run loop starts).
1524    pub fn record_external_error(&self, message: impl Into<String>) {
1525        let mut state = self.state.write().expect("databento oracle state poisoned");
1526        state.last_error = Some(message.into());
1527    }
1528
1529    /// Ingest a single record at "now" = `now_ms`. Tests pass a pinned
1530    /// timestamp; the async `run` loop uses `Utc::now().timestamp_millis()`.
1531    fn ingest(&self, record: FeedRecord, now_ms: i64) {
1532        let mut state = self.state.write().expect("databento oracle state poisoned");
1533        match &record {
1534            FeedRecord::Error(msg) => {
1535                state.last_error = Some(msg.clone());
1536            }
1537            FeedRecord::Mbp1 { .. } => {
1538                state.messages_received += 1;
1539                state.last_error = None;
1540            }
1541            _ => {}
1542        }
1543        state.orchestrator.ingest(record);
1544        state.surface = state.orchestrator.build_surface(now_ms);
1545        if !state.surface.is_empty() {
1546            state.last_update_ts_ms = Some(now_ms);
1547        }
1548    }
1549
1550    /// Drive an async feed until it is exhausted. In production this is
1551    /// spawned on a long-lived tokio task; it returns only when the feed
1552    /// yields `None` (terminal state). Transient `FeedRecord::Error`
1553    /// records are recorded on the oracle but do not break the loop.
1554    pub(crate) async fn run<F: DatabentoFeed>(&self, mut feed: F) {
1555        let mut record_count: u64 = 0;
1556        let mut last_log = std::time::Instant::now();
1557        tracing::info!(
1558            underlying = %self.config.underlying,
1559            "Databento oracle run loop starting — waiting for first record"
1560        );
1561        while let Some(record) = feed.next_record().await {
1562            record_count += 1;
1563            // Log progress every 30s or on the first few records
1564            if record_count <= 5 || last_log.elapsed().as_secs() >= 30 {
1565                let state = self.state.read().expect("state");
1566                tracing::info!(
1567                    underlying = %self.config.underlying,
1568                    record_count,
1569                    messages_received = state.messages_received,
1570                    has_forward = state.orchestrator.current_forward().is_some(),
1571                    forward = ?state.orchestrator.current_forward(),
1572                    surface_points = state.surface.len(),
1573                    record_type = match &record {
1574                        FeedRecord::Definition { .. } => "definition",
1575                        FeedRecord::Mbp1 { .. } => "mbp1",
1576                        FeedRecord::Error(_) => "error",
1577                        FeedRecord::EndOfStream => "end_of_stream",
1578                    },
1579                    "Databento oracle ingesting record"
1580                );
1581                last_log = std::time::Instant::now();
1582            }
1583            self.ingest(record, Utc::now().timestamp_millis());
1584        }
1585        tracing::warn!(
1586            underlying = %self.config.underlying,
1587            record_count,
1588            "Databento oracle run loop ended (feed exhausted)"
1589        );
1590    }
1591
1592    /// Current oracle health for the single configured underlying.
1593    fn status(&self) -> VolOracleStatus {
1594        let state = self.state.read().expect("databento oracle state poisoned");
1595        let last_update_ts_ms = state.last_update_ts_ms;
1596        let staleness_seconds = last_update_ts_ms
1597            .map(|ts| ((Utc::now().timestamp_millis() - ts) as f64 / 1000.0).max(0.0));
1598        let ready = staleness_seconds
1599            .map(|age| age <= self.config.staleness_threshold.as_secs_f64())
1600            .unwrap_or(false)
1601            && !state.surface.is_empty();
1602        let connected = state.orchestrator.current_forward().is_some();
1603
1604        VolOracleStatus {
1605            underlying: self.config.underlying.clone(),
1606            provider: VolProviderKind::Databento,
1607            route_facing: true,
1608            connected,
1609            ready,
1610            last_update_ts_ms,
1611            staleness_seconds,
1612            staleness_threshold_seconds: Some(self.config.staleness_threshold.as_secs_f64()),
1613            surface_points: state.surface.len(),
1614            messages_received: state.messages_received,
1615            last_error: state.last_error.clone(),
1616        }
1617    }
1618}
1619
1620impl RiskVolOracle for DatabentoVolOracle {
1621    fn get_iv(&self, underlying: &str, strike: f64, expiry_ts: i64) -> Result<f64, VolLookupError> {
1622        if underlying != self.config.underlying {
1623            return Err(VolLookupError::UnsupportedUnderlying {
1624                underlying: underlying.to_string(),
1625            });
1626        }
1627
1628        let status = self.status();
1629        if !status.connected {
1630            return Err(VolLookupError::UnhealthyProvider {
1631                underlying: underlying.to_string(),
1632                provider: VolProviderKind::Databento,
1633                reason: status
1634                    .last_error
1635                    .unwrap_or_else(|| "no GC forward received yet".to_string()),
1636            });
1637        }
1638        if !status.ready {
1639            return Err(VolLookupError::StaleSurface {
1640                underlying: underlying.to_string(),
1641                provider: VolProviderKind::Databento,
1642                staleness_seconds: status.staleness_seconds.unwrap_or(f64::INFINITY),
1643                threshold_seconds: self.config.staleness_threshold.as_secs_f64(),
1644            });
1645        }
1646
1647        let state = self.state.read().expect("databento oracle state poisoned");
1648        state
1649            .surface
1650            .get_interpolated(strike, expiry_ts)
1651            .ok_or_else(|| VolLookupError::MissingSurface {
1652                underlying: underlying.to_string(),
1653                provider: VolProviderKind::Databento,
1654                strike,
1655                expiry_ts,
1656            })
1657    }
1658
1659    fn statuses(&self) -> Vec<VolOracleStatus> {
1660        vec![self.status()]
1661    }
1662
1663    /// Expose the live surface so `/monitoring/vol-surface` and
1664    /// `ServerVolProvider` (the MM's client) can read it. Without this
1665    /// override the trait default returned `None` and the MM never saw
1666    /// databento-sourced GOLD quotes even while the oracle held them —
1667    /// Prometheus showed `ht_vol_surface_points{databento,GOLD} = 1896`
1668    /// while the endpoint returned `surfaces: []`.
1669    fn get_surface_snapshot(&self, underlying: &str) -> Option<VolSurfaceSnapshot> {
1670        if underlying != self.config.underlying {
1671            return None;
1672        }
1673        let state = self.state.read().expect("databento oracle state poisoned");
1674        if state.surface.is_empty() {
1675            return None;
1676        }
1677        Some(VolSurfaceSnapshot {
1678            underlying: underlying.to_string(),
1679            last_update_ts_ms: state.last_update_ts_ms,
1680            expiries: state.surface.expiries().iter().copied().collect(),
1681            strike_points: state.surface.export_all_points(),
1682            delta_curves: state.surface.export_delta_curves(),
1683            atm_vols: state.surface.export_atm_vols(),
1684            spot_price: state.orchestrator.current_forward(),
1685        })
1686    }
1687
1688    fn supports_surface_snapshots(&self) -> bool {
1689        true
1690    }
1691}
1692
#[cfg(test)]
mod oracle_tests {
    use super::*;
    use hypercall_margin::black_76::black_76_price;

    /// Risk-free rate used both in the oracle config and when pricing the
    /// synthetic option quotes.
    const TEST_RATE: f64 = 0.05;

    /// Year fraction used when pricing the synthetic 30-day option quotes.
    const TAU_30D: f64 = 30.0 / 365.0;

    /// Frozen "now" (epoch milliseconds) used by the synchronous tests.
    fn now_ms() -> i64 {
        1_775_000_000_000
    }

    /// Expiry (epoch seconds) 30 days after the frozen `now_ms()`.
    fn expiry_30d() -> i64 {
        now_ms() / 1000 + 30 * 86_400
    }

    /// Oracle config for the GOLD underlying used by every test here.
    fn config_for_gold() -> DatabentoVolOracleConfig {
        DatabentoVolOracleConfig {
            underlying: "GOLD".to_string(),
            risk_free_rate: TEST_RATE,
            strike_scale: 1.0,
            // ~100 years, i.e. effectively unbounded. The fixtures are
            // stamped with the frozen `now_ms()`, so a realistic threshold
            // would mark them stale under the Utc::now() staleness check.
            // Tests that exercise staleness override this field.
            staleness_threshold: Duration::from_secs(60 * 60 * 24 * 365 * 100),
        }
    }

    /// Definition record for the GC future (instrument id 1).
    fn gc_future_definition() -> FeedRecord {
        FeedRecord::Definition {
            instrument_id: 1,
            kind: InstrumentKind::Future {
                root: "GC".to_string(),
                month_code: 'M',
                year_digit: 6,
            },
        }
    }

    /// 4699.5 / 4700.5 top-of-book quote for the GC future (instrument id 1).
    fn gc_future_quote(ts_event_ms: i64) -> FeedRecord {
        FeedRecord::Mbp1 {
            instrument_id: 1,
            bbo: Bbo {
                bid: 4699.5,
                bid_size: 5,
                ask: 4700.5,
                ask_size: 5,
                ts_event_ms,
            },
        }
    }

    /// Definition record for the 4700-strike OG call (instrument id 10).
    fn og_call_definition(expiry_ts: i64) -> FeedRecord {
        FeedRecord::Definition {
            instrument_id: 10,
            kind: InstrumentKind::Option {
                root: "OG".to_string(),
                month_code: 'M',
                year_digit: 6,
                side: CmeOptionSide::Call,
                strike: 4700.0,
                expiry_ts,
            },
        }
    }

    /// Call quote whose mid is the Black-76 price at `sigma`, with a
    /// +/- 1.0 spread around it and `size` on both sides.
    fn option_quote(
        instrument_id: InstrumentId,
        forward: f64,
        strike: f64,
        sigma: f64,
        size: u64,
        ts_event_ms: i64,
    ) -> FeedRecord {
        let mid = black_76_price(
            &OptionType::Call,
            forward,
            strike,
            TAU_30D,
            TEST_RATE,
            sigma,
        );
        FeedRecord::Mbp1 {
            instrument_id,
            bbo: Bbo {
                bid: mid - 1.0,
                bid_size: size,
                ask: mid + 1.0,
                ask_size: size,
                ts_event_ms,
            },
        }
    }

    /// Size-10 option quote stamped with the frozen `now_ms()`.
    fn option_mbp1(
        instrument_id: InstrumentId,
        forward: f64,
        strike: f64,
        sigma: f64,
    ) -> FeedRecord {
        option_quote(instrument_id, forward, strike, sigma, 10, now_ms())
    }

    /// Ingest the GC future definition plus one quote, both at `ts_ms`.
    fn ingest_gc_future(oracle: &DatabentoVolOracle, ts_ms: i64) {
        oracle.ingest(gc_future_definition(), ts_ms);
        oracle.ingest(gc_future_quote(ts_ms), ts_ms);
    }

    /// Ingest the ATM OG call definition plus one quote priced at `sigma`.
    /// The quote's event time is the frozen `now_ms()`; `ingest_ts_ms` is
    /// the ingest clock passed to the oracle.
    fn ingest_atm_call(
        oracle: &DatabentoVolOracle,
        expiry_ts: i64,
        sigma: f64,
        ingest_ts_ms: i64,
    ) {
        oracle.ingest(og_call_definition(expiry_ts), ingest_ts_ms);
        oracle.ingest(option_mbp1(10, 4700.0, 4700.0, sigma), ingest_ts_ms);
    }

    #[test]
    fn brand_new_oracle_reports_disconnected_and_get_iv_errors() {
        let oracle = DatabentoVolOracle::new(config_for_gold());
        let status = oracle.statuses();
        assert_eq!(status.len(), 1);
        assert!(!status[0].connected);
        assert!(!status[0].ready);
        assert_eq!(status[0].surface_points, 0);

        let err = oracle
            .get_iv("GOLD", 4700.0, expiry_30d())
            .expect_err("should fail without any data");
        assert!(matches!(err, VolLookupError::UnhealthyProvider { .. }));
    }

    #[test]
    fn ingesting_gc_only_is_connected_but_not_ready() {
        let oracle = DatabentoVolOracle::new(config_for_gold());
        ingest_gc_future(&oracle, now_ms());

        let status = oracle.statuses();
        assert!(status[0].connected, "forward arrived -> connected");
        assert!(!status[0].ready, "no option points yet -> not ready");
    }

    #[test]
    fn full_ingest_yields_queryable_surface_and_ready_status() {
        let oracle = DatabentoVolOracle::new(config_for_gold());
        let exp = expiry_30d();
        ingest_gc_future(&oracle, now_ms());
        ingest_atm_call(&oracle, exp, 0.31, now_ms());

        let status = oracle.statuses();
        assert!(status[0].connected);
        assert!(status[0].ready);
        assert_eq!(status[0].surface_points, 1);
        assert!(status[0].messages_received >= 1);

        let iv = oracle
            .get_iv("GOLD", 4700.0, exp)
            .expect("populated surface should lookup cleanly");
        assert!((iv - 0.31).abs() < 5e-3, "got {iv}");
    }

    #[test]
    fn unsupported_underlying_rejects() {
        let oracle = DatabentoVolOracle::new(config_for_gold());
        let err = oracle
            .get_iv("BTC", 100_000.0, expiry_30d())
            .expect_err("BTC is not our underlying");
        assert!(matches!(err, VolLookupError::UnsupportedUnderlying { .. }));
    }

    #[test]
    fn staleness_threshold_gates_readiness() {
        // Tight staleness: 10ms. Even a fresh ingest should flip to stale
        // almost immediately once the Utc::now() status check runs.
        let mut config = config_for_gold();
        config.staleness_threshold = Duration::from_millis(10);
        let oracle = DatabentoVolOracle::new(config);

        // Seed with a far-past timestamp so staleness_seconds is huge.
        let ancient = 100_000_000i64;
        let exp = expiry_30d();
        ingest_gc_future(&oracle, ancient);
        ingest_atm_call(&oracle, exp, 0.3, ancient);

        let err = oracle
            .get_iv("GOLD", 4700.0, exp)
            .expect_err("ancient surface should be stale");
        assert!(matches!(err, VolLookupError::StaleSurface { .. }));
    }

    #[test]
    fn error_records_track_last_error_field() {
        let oracle = DatabentoVolOracle::new(config_for_gold());
        oracle.ingest(
            FeedRecord::Error("connection reset by peer".to_string()),
            now_ms(),
        );
        let status = oracle.statuses();
        assert_eq!(
            status[0].last_error.as_deref(),
            Some("connection reset by peer")
        );
    }

    #[tokio::test]
    async fn run_drains_a_fake_feed() {
        // Unlike the synchronous ingest tests (which use a frozen now_ms
        // so the chain's staleness check always passes), `run` uses the
        // real wall clock. Every record's ts_event_ms and the option
        // expiry must be anchored to real-now, otherwise the chain
        // aggregator drops everything as stale.
        use chrono::Utc;
        let oracle = DatabentoVolOracle::new(config_for_gold());
        let real_now_ms = Utc::now().timestamp_millis();
        let real_exp = Utc::now().timestamp() + 30 * 86_400;

        let feed = FakeFeed::new(vec![
            gc_future_definition(),
            gc_future_quote(real_now_ms),
            og_call_definition(real_exp),
            option_quote(10, 4700.0, 4700.0, 0.33, 5, real_now_ms),
            FeedRecord::EndOfStream,
        ]);

        oracle.run(feed).await;

        let status = oracle.statuses();
        assert!(
            status[0].ready,
            "feed should have populated a ready surface"
        );
        assert_eq!(status[0].surface_points, 1);
    }
}
1994
1995// ---------------------------------------------------------------------------
1996// Real Databento live feed adapter (cycle 15)
1997// ---------------------------------------------------------------------------
1998
1999use databento::dbn::{InstrumentClass, InstrumentDefMsg, Mbp1Msg, SType, Schema};
2000use databento::live::Subscription as DatabentoSubscription;
2001use databento::{LiveClient, Symbols};
2002use time::{Duration as TimeDuration, OffsetDateTime};
2003
/// MBP1 replay window, in minutes, requested on connect. The replay only
/// re-seeds `book_cache` after a pod restart — 5 minutes is plenty, the
/// live tail does the rest.
const DATABENTO_MBP1_REPLAY_MINUTES: i64 = 5;

/// Idle time, in seconds, after which the Definition snapshot is
/// considered drained and its client is shut down. Local reproduction
/// shows the full `GLBX.MDP3` GC+OG directory (43 730 records) drains in
/// ~5s, so 3s without a record is a comfortable cutoff.
const DATABENTO_DEFINITION_DRAIN_IDLE_SECS: u64 = 3;
2013
2014/// Adapter that wraps Databento Live behind our `DatabentoFeed` trait.
2015///
2016/// Databento's Live gateway deadlocks the MBP1 stream when a single
2017/// session carries a Definition subscription with `start = UNIX_EPOCH`
2018/// (full snapshot) alongside an MBP1 subscription with any other start —
2019/// verified both on staging (no MBP1 records after the 43k def snapshot)
2020/// and in a local harness. Splitting into two independent Live clients
2021/// on separate sessions avoids the deadlock: the Definition client
2022/// drains the snapshot and is dropped, and the MBP1 client streams
2023/// live quotes with a short 5-minute replay to re-seed the book.
2024///
2025/// The `DatabentoFeed::next_record` surface is preserved: both reader
2026/// tasks feed a shared unbounded mpsc and `next_record` just receives
2027/// from it.
2028pub struct DatabentoLiveFeed {
2029    rx: tokio::sync::mpsc::UnboundedReceiver<FeedRecord>,
2030    // Tokio JoinHandles don't abort on drop, but the reader tasks will
2031    // exit cleanly when their `UnboundedSender` drops (i.e. when this
2032    // struct and its `rx` are dropped and they try to send). For a
2033    // process-lifetime oracle that's good enough.
2034    _def_task: tokio::task::JoinHandle<()>,
2035    _mbp1_task: tokio::task::JoinHandle<()>,
2036}
2037
2038impl DatabentoLiveFeed {
2039    /// Connect to Databento Live using two independent sessions:
2040    /// one for the Definition snapshot (`start = EPOCH`), one for
2041    /// live MBP1 quotes (`start = now − 5 min` for a short re-seed).
2042    pub async fn connect(api_key: &str, dataset: &str) -> anyhow::Result<Self> {
2043        let symbols = Symbols::Symbols(vec!["GC.FUT".to_string(), "OG.OPT".to_string()]);
2044
2045        // --- Session 1: Definition snapshot ---
2046        let mut def_client = LiveClient::builder()
2047            .key(api_key)?
2048            .dataset(dataset.to_string())
2049            .build()
2050            .await?;
2051        def_client
2052            .subscribe(
2053                DatabentoSubscription::builder()
2054                    .symbols(symbols.clone())
2055                    .schema(Schema::Definition)
2056                    .stype_in(SType::Parent)
2057                    .start(OffsetDateTime::UNIX_EPOCH)
2058                    .build(),
2059            )
2060            .await?;
2061        def_client.start().await?;
2062
2063        // --- Session 2: live MBP1 with short replay ---
2064        let mbp1_start =
2065            OffsetDateTime::now_utc() - TimeDuration::minutes(DATABENTO_MBP1_REPLAY_MINUTES);
2066        let mut mbp1_client = LiveClient::builder()
2067            .key(api_key)?
2068            .dataset(dataset.to_string())
2069            .build()
2070            .await?;
2071        mbp1_client
2072            .subscribe(
2073                DatabentoSubscription::builder()
2074                    .symbols(symbols)
2075                    .schema(Schema::Mbp1)
2076                    .stype_in(SType::Parent)
2077                    .start(mbp1_start)
2078                    .build(),
2079            )
2080            .await?;
2081        mbp1_client.start().await?;
2082
2083        let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
2084
2085        // Def reader: drain the snapshot until the stream goes idle,
2086        // then exit (and drop the client). Live definitions that arrive
2087        // after the initial snapshot are rare and would require
2088        // long-lived session management to forward cleanly.
2089        let def_tx = tx.clone();
2090        let def_task = tokio::spawn(Self::run_definition_reader(def_client, def_tx));
2091
2092        // MBP1 reader: pump records into the channel forever.
2093        let mbp1_tx = tx;
2094        let mbp1_task = tokio::spawn(Self::run_mbp1_reader(mbp1_client, mbp1_tx));
2095
2096        Ok(Self {
2097            rx,
2098            _def_task: def_task,
2099            _mbp1_task: mbp1_task,
2100        })
2101    }
2102
2103    /// Drive the Definition-only client until it goes idle, converting
2104    /// each matching `InstrumentDefMsg` into `FeedRecord::Definition`.
2105    async fn run_definition_reader(
2106        mut client: LiveClient,
2107        tx: tokio::sync::mpsc::UnboundedSender<FeedRecord>,
2108    ) {
2109        let idle = std::time::Duration::from_secs(DATABENTO_DEFINITION_DRAIN_IDLE_SECS);
2110        let poll = std::time::Duration::from_millis(500);
2111        let mut seen = 0u64;
2112        let mut sent = 0u64;
2113        let mut last_activity = std::time::Instant::now();
2114        loop {
2115            match tokio::time::timeout(poll, client.next_record()).await {
2116                Err(_) => {
2117                    // Idle tick. Only treat quiet as "snapshot done" after
2118                    // we've received something — otherwise we'd bail before
2119                    // the first record on a slow-start session.
2120                    if seen > 0 && last_activity.elapsed() > idle {
2121                        tracing::info!(
2122                            defs_seen = seen,
2123                            defs_sent = sent,
2124                            "Databento definition snapshot drained"
2125                        );
2126                        break;
2127                    }
2128                }
2129                Ok(Ok(Some(rec))) => {
2130                    seen += 1;
2131                    last_activity = std::time::Instant::now();
2132                    if let Some(def) = rec.get::<InstrumentDefMsg>() {
2133                        if let Some(kind) = Self::definition_to_kind(def) {
2134                            sent += 1;
2135                            if tx
2136                                .send(FeedRecord::Definition {
2137                                    instrument_id: def.hd.instrument_id as InstrumentId,
2138                                    kind,
2139                                })
2140                                .is_err()
2141                            {
2142                                tracing::info!(
2143                                    "Databento def receiver dropped; stopping def reader"
2144                                );
2145                                return;
2146                            }
2147                        }
2148                    }
2149                }
2150                Ok(Ok(None)) => {
2151                    tracing::info!(defs_seen = seen, "Databento def stream ended");
2152                    break;
2153                }
2154                Ok(Err(e)) => {
2155                    tracing::error!(error = %e, "Databento def client error");
2156                    let _ = tx.send(FeedRecord::Error(format!("databento def: {e}")));
2157                    break;
2158                }
2159            }
2160        }
2161    }
2162
2163    /// Drive the MBP1 client forever, converting matching `Mbp1Msg` into
2164    /// `FeedRecord::Mbp1` and forwarding them through the channel.
2165    async fn run_mbp1_reader(
2166        mut client: LiveClient,
2167        tx: tokio::sync::mpsc::UnboundedSender<FeedRecord>,
2168    ) {
2169        loop {
2170            match client.next_record().await {
2171                Ok(Some(rec)) => {
2172                    if let Some(msg) = rec.get::<Mbp1Msg>() {
2173                        let Some(bbo) = Self::mbp1_to_bbo(msg) else {
2174                            continue;
2175                        };
2176                        if tx
2177                            .send(FeedRecord::Mbp1 {
2178                                instrument_id: msg.hd.instrument_id as InstrumentId,
2179                                bbo,
2180                            })
2181                            .is_err()
2182                        {
2183                            tracing::info!("Databento mbp1 receiver dropped; stopping mbp1 reader");
2184                            return;
2185                        }
2186                    }
2187                }
2188                Ok(None) => {
2189                    tracing::warn!("Databento mbp1 stream ended");
2190                    let _ = tx.send(FeedRecord::EndOfStream);
2191                    return;
2192                }
2193                Err(e) => {
2194                    tracing::error!(error = %e, "Databento mbp1 client error");
2195                    let _ = tx.send(FeedRecord::Error(format!("databento mbp1: {e}")));
2196                    return;
2197                }
2198            }
2199        }
2200    }
2201
2202    /// Translate a Databento `InstrumentDefMsg` into our internal
2203    /// `InstrumentKind`, or return `None` for instruments we don't
2204    /// care about (non-GC futures, non-gold options, unknown classes).
2205    fn definition_to_kind(def: &InstrumentDefMsg) -> Option<InstrumentKind> {
2206        let class = def.instrument_class().ok()?;
2207        let asset = def.asset().ok()?.to_string();
2208
2209        match class {
2210            InstrumentClass::Future => {
2211                if asset != "GC" {
2212                    return None;
2213                }
2214                // We don't use month_code/year_digit downstream — the
2215                // orchestrator only gates on `root == "GC"`. Use
2216                // placeholder values.
2217                Some(InstrumentKind::Future {
2218                    root: asset,
2219                    month_code: '?',
2220                    year_digit: 0,
2221                })
2222            }
2223            InstrumentClass::Call | InstrumentClass::Put => {
2224                // OG options on GC futures have `asset == "OG"` in the
2225                // current GLBX.MDP3 definitions (verified against a live
2226                // snapshot: 19 445 Calls + 19 445 Puts all carry asset=OG).
2227                // The previous check insisted on "GC" and silently filtered
2228                // every option out, leaving the oracle with no options to
2229                // fit. Anything else is a different product.
2230                if asset != "OG" {
2231                    return None;
2232                }
2233                let side = if matches!(class, InstrumentClass::Call) {
2234                    CmeOptionSide::Call
2235                } else {
2236                    CmeOptionSide::Put
2237                };
2238                let strike = def.strike_price as f64 / 1e9;
2239                if !strike.is_finite() || strike <= 0.0 {
2240                    return None;
2241                }
2242                // expiration is u64 nanoseconds since epoch.
2243                let expiry_ts = (def.expiration / 1_000_000_000) as i64;
2244                if expiry_ts <= 0 {
2245                    return None;
2246                }
2247                Some(InstrumentKind::Option {
2248                    root: "OG".to_string(),
2249                    month_code: '?',
2250                    year_digit: 0,
2251                    side,
2252                    strike,
2253                    expiry_ts,
2254                })
2255            }
2256            _ => None,
2257        }
2258    }
2259
2260    fn mbp1_to_bbo(msg: &Mbp1Msg) -> Option<Bbo> {
2261        let level = msg.levels.first()?;
2262        let bid = level.bid_px as f64 / 1e9;
2263        let ask = level.ask_px as f64 / 1e9;
2264        if !bid.is_finite() || !ask.is_finite() {
2265            return None;
2266        }
2267        Some(Bbo {
2268            bid,
2269            bid_size: level.bid_sz as u64,
2270            ask,
2271            ask_size: level.ask_sz as u64,
2272            ts_event_ms: (msg.ts_recv / 1_000_000) as i64,
2273        })
2274    }
2275}
2276
2277#[async_trait]
2278impl DatabentoFeed for DatabentoLiveFeed {
2279    async fn next_record(&mut self) -> Option<FeedRecord> {
2280        // The reader tasks do all the filtering and variant-building; here
2281        // we just receive. `recv` returns `None` when both tasks have exited
2282        // (and therefore both halves of the channel are dropped), which the
2283        // oracle treats as a terminal condition.
2284        self.rx.recv().await
2285    }
2286}
2287
/// Delay before the first reconnect attempt; later attempts double it.
const RECONNECT_BASE_DELAY: std::time::Duration = std::time::Duration::from_secs(5);
/// Ceiling for the delay between reconnect attempts (capped exponential
/// backoff): five minutes.
const RECONNECT_MAX_DELAY: std::time::Duration = std::time::Duration::from_secs(5 * 60);
2292
2293/// Spawns a detached tokio task that connects to Databento, drives the
2294/// oracle's `run` loop, and automatically reconnects with exponential
2295/// backoff when the feed drops or fails to connect.
2296pub fn spawn_databento_live_feed_task(
2297    oracle: Arc<DatabentoVolOracle>,
2298    api_key: String,
2299    dataset: String,
2300) {
2301    tokio::spawn(async move {
2302        let mut attempt: u32 = 0;
2303        loop {
2304            match DatabentoLiveFeed::connect(&api_key, &dataset).await {
2305                Ok(feed) => {
2306                    let connected_at = tokio::time::Instant::now();
2307                    tracing::info!(
2308                        dataset = %dataset,
2309                        "Databento live feed connected — running oracle loop"
2310                    );
2311                    oracle.run(feed).await;
2312                    if connected_at.elapsed() > std::time::Duration::from_secs(60) {
2313                        attempt = 0;
2314                    }
2315                    let msg = "Databento live feed run loop exited — will reconnect";
2316                    tracing::warn!("{msg}");
2317                    oracle.record_external_error(msg.to_string());
2318                }
2319                Err(err) => {
2320                    let msg = format!("Failed to connect Databento live feed ({dataset}): {err}");
2321                    tracing::error!(error = %msg, attempt, "Databento connect failed — will retry");
2322                    oracle.record_external_error(msg);
2323                }
2324            }
2325
2326            let delay = RECONNECT_BASE_DELAY.saturating_mul(2u32.saturating_pow(attempt.min(6)));
2327            let delay = delay.min(RECONNECT_MAX_DELAY);
2328            tracing::info!(
2329                dataset = %dataset,
2330                delay_secs = delay.as_secs(),
2331                attempt,
2332                "Databento feed reconnecting after backoff"
2333            );
2334            tokio::time::sleep(delay).await;
2335            attempt = attempt.saturating_add(1);
2336        }
2337    });
2338}
2339
#[cfg(test)]
mod symbology_tests {
    use super::*;

    /// Every CME month code maps to its calendar month; other letters
    /// map to `None`.
    #[test]
    fn month_codes_map_to_numbers() {
        // Codes listed in calendar order, January through December.
        let codes = "FGHJKMNQUVXZ";
        for (expected, code) in (1u32..).zip(codes.chars()) {
            assert_eq!(cme_month_code_to_number(code), Some(expected), "{code}");
        }
        // 'A' and 'I' are not valid CME codes.
        for invalid in ['A', 'I'] {
            assert_eq!(cme_month_code_to_number(invalid), None);
        }
    }

    #[test]
    fn parses_gcm6_as_gold_june_2026_future() {
        let expected = CmeFutureSymbol {
            root: "GC".to_string(),
            month_code: 'M',
            year_digit: 6,
        };
        let parsed = CmeFutureSymbol::parse("GCM6").expect("GCM6 should parse");
        assert_eq!(parsed, expected);
    }

    #[test]
    fn parses_ogm6_c_2400_as_june_2026_gold_call_at_2400() {
        let expected = CmeOptionSymbol {
            root: "OG".to_string(),
            month_code: 'M',
            year_digit: 6,
            side: CmeOptionSide::Call,
            strike: 2400.0,
        };
        let parsed = CmeOptionSymbol::parse("OGM6 C 2400").expect("OGM6 C 2400 should parse");
        assert_eq!(parsed, expected);
    }

    #[test]
    fn parses_put_side() {
        let put = CmeOptionSymbol::parse("OGM6 P 4800").expect("put should parse");
        assert_eq!(put.side, CmeOptionSide::Put);
        assert_eq!(put.strike, 4800.0);
    }

    #[test]
    fn parses_fractional_strike() {
        // Some CME products list fractional strikes (e.g. $0.25 ticks).
        let fractional =
            CmeOptionSymbol::parse("OGM6 C 4700.5").expect("fractional should parse");
        assert_eq!(fractional.strike, 4700.5);
    }

    #[test]
    fn parses_future_with_extra_whitespace() {
        let padded = CmeFutureSymbol::parse("  GCM6  ").unwrap();
        assert_eq!(padded.root, "GC".to_string());
    }

    #[test]
    fn rejects_wrong_length_future() {
        for symbol in ["", "GC", "GCM62"] {
            assert!(CmeFutureSymbol::parse(symbol).is_err());
        }
    }

    #[test]
    fn rejects_lowercase_root() {
        assert!(CmeFutureSymbol::parse("gcM6").is_err());
        assert!(CmeOptionSymbol::parse("ogM6 C 2400").is_err());
    }

    #[test]
    fn rejects_invalid_month_code() {
        // 'I' is not in the CME codes.
        assert!(CmeFutureSymbol::parse("GCI6").is_err());
        assert!(CmeOptionSymbol::parse("OGI6 C 2400").is_err());
    }

    #[test]
    fn rejects_non_digit_year() {
        assert!(CmeFutureSymbol::parse("GCMX").is_err());
    }

    #[test]
    fn rejects_non_cp_side() {
        assert!(CmeOptionSymbol::parse("OGM6 X 2400").is_err());
    }

    #[test]
    fn rejects_non_positive_strike() {
        for symbol in ["OGM6 C 0", "OGM6 C -100", "OGM6 C not_a_number"] {
            assert!(CmeOptionSymbol::parse(symbol).is_err());
        }
    }

    #[test]
    fn rejects_option_missing_parts() {
        for symbol in ["OGM6 C", "OGM6", ""] {
            assert!(CmeOptionSymbol::parse(symbol).is_err());
        }
    }
}