1#![allow(dead_code)]
13
/// Maps a CME futures month-code letter to its calendar month number.
///
/// The recognised codes are the uppercase letters `F G H J K M N Q U V X Z`,
/// which correspond to months `1..=12` in that order. Any other character
/// (including the lowercase forms) yields `None`.
fn cme_month_code_to_number(code: char) -> Option<u32> {
    // A letter's position in this table is its month number minus one.
    const MONTH_CODES: [char; 12] = [
        'F', 'G', 'H', 'J', 'K', 'M', 'N', 'Q', 'U', 'V', 'X', 'Z',
    ];

    MONTH_CODES
        .iter()
        .zip(1u32..)
        .find_map(|(&letter, month)| (letter == code).then_some(month))
}
45
46#[derive(Debug, Clone, PartialEq, Eq)]
53struct CmeFutureSymbol {
54 root: String,
55 month_code: char,
56 year_digit: u8,
57}
58
59impl CmeFutureSymbol {
60 fn parse(raw: &str) -> Result<Self, String> {
61 let trimmed = raw.trim();
62 if trimmed.len() != 4 {
63 return Err(format!(
64 "expected 4-char CME future symbol, got {trimmed:?}"
65 ));
66 }
67 let mut chars = trimmed.chars();
68 let root: String = [chars.next().unwrap(), chars.next().unwrap()]
69 .iter()
70 .collect();
71 let month_code = chars.next().unwrap();
72 let year_char = chars.next().unwrap();
73
74 if !root.chars().all(|c| c.is_ascii_uppercase()) {
75 return Err(format!("future root must be uppercase ASCII: {root:?}"));
76 }
77 if cme_month_code_to_number(month_code).is_none() {
78 return Err(format!("invalid CME month code {month_code:?}"));
79 }
80 let year_digit = year_char
81 .to_digit(10)
82 .ok_or_else(|| format!("year digit must be 0-9, got {year_char:?}"))?
83 as u8;
84
85 Ok(Self {
86 root,
87 month_code,
88 year_digit,
89 })
90 }
91}
92
/// Which side of the option contract a symbol or quote refers to.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum CmeOptionSide {
    /// Call option (parsed from `C` / `c`).
    Call,
    /// Put option (parsed from `P` / `p`).
    Put,
}
99
100#[derive(Debug, Clone, PartialEq)]
105struct CmeOptionSymbol {
106 root: String,
107 month_code: char,
108 year_digit: u8,
109 side: CmeOptionSide,
110 strike: f64,
111}
112
113impl CmeOptionSymbol {
114 fn parse(raw: &str) -> Result<Self, String> {
115 let parts: Vec<&str> = raw.split_ascii_whitespace().collect();
116 if parts.len() != 3 {
117 return Err(format!(
118 "expected '<root><month><year> <C|P> <strike>', got {raw:?}"
119 ));
120 }
121 let (contract, side_str, strike_str) = (parts[0], parts[1], parts[2]);
122
123 if contract.len() != 4 {
124 return Err(format!("contract segment must be 4 chars: {contract:?}"));
125 }
126 let mut chars = contract.chars();
127 let root: String = [chars.next().unwrap(), chars.next().unwrap()]
128 .iter()
129 .collect();
130 let month_code = chars.next().unwrap();
131 let year_char = chars.next().unwrap();
132
133 if !root.chars().all(|c| c.is_ascii_uppercase()) {
134 return Err(format!("option root must be uppercase ASCII: {root:?}"));
135 }
136 if cme_month_code_to_number(month_code).is_none() {
137 return Err(format!("invalid CME month code {month_code:?}"));
138 }
139 let year_digit = year_char
140 .to_digit(10)
141 .ok_or_else(|| format!("year digit must be 0-9, got {year_char:?}"))?
142 as u8;
143
144 let side = match side_str {
145 "C" | "c" => CmeOptionSide::Call,
146 "P" | "p" => CmeOptionSide::Put,
147 other => return Err(format!("option side must be C or P, got {other:?}")),
148 };
149
150 let strike = strike_str
151 .parse::<f64>()
152 .map_err(|err| format!("invalid strike {strike_str:?}: {err}"))?;
153 if !strike.is_finite() || strike <= 0.0 {
154 return Err(format!("strike must be positive finite, got {strike}"));
155 }
156
157 Ok(Self {
158 root,
159 month_code,
160 year_digit,
161 side,
162 strike,
163 })
164 }
165}
166
167use chrono::{DateTime, Datelike, NaiveDate, TimeZone, Utc, Weekday};
172
173fn og_monthly_expiry(option_year: i32, option_month: u32) -> Option<DateTime<Utc>> {
193 if !(1..=12).contains(&option_month) {
194 return None;
195 }
196
197 let (prior_year, prior_month) = if option_month == 1 {
198 (option_year - 1, 12)
199 } else {
200 (option_year, option_month - 1)
201 };
202
203 let last_day = last_day_of_month(prior_year, prior_month)?;
204 let mut date = NaiveDate::from_ymd_opt(prior_year, prior_month, last_day)?;
205
206 let mut business_days_found = 0u32;
207 loop {
208 if is_weekday(date) {
209 business_days_found += 1;
210 if business_days_found == 4 {
211 break;
212 }
213 }
214 date = date.pred_opt()?;
215 }
216
217 Some(Utc.from_utc_datetime(&date.and_hms_opt(0, 0, 0)?))
218}
219
220fn last_day_of_month(year: i32, month: u32) -> Option<u32> {
221 let (next_year, next_month) = if month == 12 {
222 (year + 1, 1)
223 } else {
224 (year, month + 1)
225 };
226 NaiveDate::from_ymd_opt(next_year, next_month, 1)
227 .and_then(|d| d.pred_opt())
228 .map(|d| d.day())
229}
230
231fn is_weekday(date: NaiveDate) -> bool {
232 !matches!(date.weekday(), Weekday::Sat | Weekday::Sun)
233}
234
/// Unit tests for `og_monthly_expiry` and its calendar helpers.
#[cfg(test)]
mod expiry_tests {
    use super::*;
    use chrono::TimeZone;

    /// June 2026 contract: fourth weekday from the end of May 2026.
    #[test]
    fn ogm6_expires_on_2026_05_26() {
        let expiry = og_monthly_expiry(2026, 6).expect("OGM6 should have a valid expiry");
        let expected = Utc.with_ymd_and_hms(2026, 5, 26, 0, 0, 0).unwrap();
        assert_eq!(expiry, expected, "got {expiry}");
    }

    /// September 2026 contract: fourth weekday from the end of August 2026.
    #[test]
    fn ogu6_expires_on_2026_08_26() {
        let expiry = og_monthly_expiry(2026, 9).expect("OGU6 should have a valid expiry");
        let expected = Utc.with_ymd_and_hms(2026, 8, 26, 0, 0, 0).unwrap();
        assert_eq!(expiry, expected, "got {expiry}");
    }

    /// January contract: the prior month is December of the previous year.
    #[test]
    fn ogf7_wraps_to_prior_year_december() {
        let expiry = og_monthly_expiry(2027, 1).expect("OGF7 should have a valid expiry");
        let expected = Utc.with_ymd_and_hms(2026, 12, 28, 0, 0, 0).unwrap();
        assert_eq!(expiry, expected, "got {expiry}");
    }

    /// March contract: the prior month is a 28-day February.
    #[test]
    fn ogh7_expires_on_2027_02_23() {
        let expiry = og_monthly_expiry(2027, 3).expect("OGH7 should have a valid expiry");
        let expected = Utc.with_ymd_and_hms(2027, 2, 23, 0, 0, 0).unwrap();
        assert_eq!(expiry, expected, "got {expiry}");
    }

    /// Months outside 1..=12 are rejected.
    #[test]
    fn invalid_month_returns_none() {
        assert!(og_monthly_expiry(2026, 0).is_none());
        assert!(og_monthly_expiry(2026, 13).is_none());
    }

    /// Covers leap/non-leap February, the December year wrap, and a 30-day month.
    #[test]
    fn last_day_of_month_handles_leap_and_year_wrap() {
        assert_eq!(last_day_of_month(2024, 2), Some(29));
        assert_eq!(last_day_of_month(2025, 2), Some(28));
        assert_eq!(last_day_of_month(2026, 12), Some(31));
        assert_eq!(last_day_of_month(2026, 4), Some(30));
    }

    /// Checks both weekend days and the two weekdays adjacent to them.
    #[test]
    fn is_weekday_classifies_all_seven_days() {
        let sunday = NaiveDate::from_ymd_opt(2026, 4, 5).unwrap();
        let monday = NaiveDate::from_ymd_opt(2026, 4, 6).unwrap();
        let friday = NaiveDate::from_ymd_opt(2026, 4, 10).unwrap();
        let saturday = NaiveDate::from_ymd_opt(2026, 4, 11).unwrap();
        assert!(!is_weekday(sunday));
        assert!(is_weekday(monday));
        assert!(is_weekday(friday));
        assert!(!is_weekday(saturday));
    }
}
304
305use hypercall_margin::black_76::black_76_implied_vol;
310use hypercall_margin::OptionType;
311
/// Market context and filter thresholds shared by every quote in one
/// `compute_iv_points` call.
#[derive(Debug, Clone)]
struct ChainContext {
    /// Forward price passed to the Black-76 inversion; must be finite and
    /// positive for any point to be fitted.
    forward: f64,
    /// Risk-free rate passed to the Black-76 inversion.
    risk_free_rate: f64,
    /// Current time in milliseconds; compared against `ingested_at_ms` and
    /// (divided by 1000) against `expiry_ts`.
    now_ms: i64,
    /// Largest accepted `(ask - bid) / mid`; wider quotes are dropped.
    max_spread_fraction: f64,
    /// Largest accepted quote age in milliseconds; older quotes are dropped.
    max_staleness_ms: i64,
}

impl Default for ChainContext {
    /// Defaults: rate 5%, max spread 20% of mid, max staleness 60 s.
    /// `forward` and `now_ms` default to zero and must be set by the caller —
    /// a zero forward makes `compute_iv_points` return nothing.
    fn default() -> Self {
        Self {
            forward: 0.0,
            risk_free_rate: 0.05,
            now_ms: 0,
            max_spread_fraction: 0.20,
            max_staleness_ms: 60_000,
        }
    }
}
345
/// One two-sided option quote fed into `compute_iv_points`.
#[derive(Debug, Clone)]
struct OptionQuote {
    /// Strike price, in the same units as `ChainContext::forward`.
    strike: f64,
    /// Expiry timestamp in seconds (it is compared against `now_ms / 1000`).
    expiry_ts: i64,
    /// Call or put.
    side: CmeOptionSide,
    /// Best bid price.
    bid: f64,
    /// Best ask price.
    ask: f64,
    /// Time the quote was ingested, in milliseconds; used for the staleness filter.
    ingested_at_ms: i64,
}
361
/// A single implied-volatility point produced by `compute_iv_points`.
#[derive(Debug, Clone, Copy, PartialEq)]
struct FittedIvPoint {
    /// Strike copied from the source quote.
    strike: f64,
    /// Expiry timestamp (seconds) copied from the source quote.
    expiry_ts: i64,
    /// Fitted implied volatility; always finite and strictly positive.
    iv: f64,
}
370
/// Number of seconds in a 365-day year; used to convert seconds-to-expiry
/// into a year fraction.
const SECONDS_PER_YEAR: f64 = 365.0 * 86_400.0;
375
376fn compute_iv_points(quotes: &[OptionQuote], ctx: &ChainContext) -> Vec<FittedIvPoint> {
380 let mut fitted = Vec::with_capacity(quotes.len());
381
382 if !ctx.forward.is_finite() || ctx.forward <= 0.0 {
383 return fitted;
384 }
385
386 for quote in quotes {
387 if quote.bid <= 0.0 || quote.ask <= 0.0 || quote.bid >= quote.ask {
389 continue;
390 }
391 if !quote.bid.is_finite() || !quote.ask.is_finite() {
392 continue;
393 }
394
395 if ctx.now_ms.saturating_sub(quote.ingested_at_ms) > ctx.max_staleness_ms {
397 continue;
398 }
399
400 let mid = 0.5 * (quote.bid + quote.ask);
402 let spread = quote.ask - quote.bid;
403 if mid <= 0.0 || spread / mid > ctx.max_spread_fraction {
404 continue;
405 }
406
407 let now_secs = ctx.now_ms / 1000;
410 let seconds_to_expiry = quote.expiry_ts.saturating_sub(now_secs);
411 if seconds_to_expiry <= 0 {
412 continue;
413 }
414 let tau = seconds_to_expiry as f64 / SECONDS_PER_YEAR;
415 if !tau.is_finite() || tau <= 0.0 {
416 continue;
417 }
418
419 let option_type = match quote.side {
420 CmeOptionSide::Call => OptionType::Call,
421 CmeOptionSide::Put => OptionType::Put,
422 };
423
424 let Some(iv) = black_76_implied_vol(
428 &option_type,
429 ctx.forward,
430 quote.strike,
431 tau,
432 ctx.risk_free_rate,
433 mid,
434 ) else {
435 continue;
436 };
437
438 if !iv.is_finite() || iv <= 0.0 {
439 continue;
440 }
441
442 fitted.push(FittedIvPoint {
443 strike: quote.strike,
444 expiry_ts: quote.expiry_ts,
445 iv,
446 });
447 }
448
449 fitted
450}
451
/// Unit tests for `compute_iv_points`: round-trip recovery of a known sigma
/// and each of the quote filters.
#[cfg(test)]
mod chain_tests {
    use super::*;
    use hypercall_margin::black_76::black_76_price;

    /// Builds a context with the given forward and clock, defaults otherwise.
    fn ctx(forward: f64, now_ms: i64) -> ChainContext {
        ChainContext {
            forward,
            now_ms,
            ..ChainContext::default()
        }
    }

    /// Builds a quote whose mid is the Black-76 price at `sigma`, with a
    /// symmetric `half_spread` around it.
    fn quote_at_sigma(
        strike: f64,
        expiry_ts: i64,
        side: CmeOptionSide,
        ctx: &ChainContext,
        sigma: f64,
        half_spread: f64,
        ingested_at_ms: i64,
    ) -> OptionQuote {
        let option_type = match side {
            CmeOptionSide::Call => OptionType::Call,
            CmeOptionSide::Put => OptionType::Put,
        };
        // Same year-fraction convention as `compute_iv_points`.
        let tau = ((expiry_ts - ctx.now_ms / 1000) as f64) / SECONDS_PER_YEAR;
        let mid = black_76_price(
            &option_type,
            ctx.forward,
            strike,
            tau,
            ctx.risk_free_rate,
            sigma,
        );
        OptionQuote {
            strike,
            expiry_ts,
            side,
            bid: mid - half_spread,
            ask: mid + half_spread,
            ingested_at_ms,
        }
    }

    /// Expiry timestamp (seconds) 30 days after `now_ms`.
    fn expiry_30d(now_ms: i64) -> i64 {
        now_ms / 1000 + 30 * 86_400
    }

    #[test]
    fn happy_path_recovers_sigma_for_three_strikes() {
        let now_ms = 1_775_000_000_000_i64;
        let ctx = ctx(4700.0, now_ms);
        let exp = expiry_30d(now_ms);
        let sigma = 0.35;

        let quotes = vec![
            quote_at_sigma(4500.0, exp, CmeOptionSide::Put, &ctx, sigma, 1.0, now_ms),
            quote_at_sigma(4700.0, exp, CmeOptionSide::Call, &ctx, sigma, 1.0, now_ms),
            quote_at_sigma(4900.0, exp, CmeOptionSide::Call, &ctx, sigma, 1.0, now_ms),
        ];

        let points = compute_iv_points(&quotes, &ctx);
        assert_eq!(points.len(), 3, "all 3 clean quotes should fit");
        for p in &points {
            assert!(
                (p.iv - sigma).abs() < 1e-3,
                "fitted iv {} should be close to {sigma}",
                p.iv
            );
        }
    }

    #[test]
    fn drops_zero_or_inverted_bids_and_asks() {
        let now_ms = 1_775_000_000_000i64;
        let ctx = ctx(4700.0, now_ms);
        let exp = expiry_30d(now_ms);

        let mut q = quote_at_sigma(4700.0, exp, CmeOptionSide::Call, &ctx, 0.3, 1.0, now_ms);
        let zero_bid = OptionQuote {
            bid: 0.0,
            ..q.clone()
        };
        let zero_ask = OptionQuote {
            ask: 0.0,
            ..q.clone()
        };
        let inverted = OptionQuote {
            bid: q.ask,
            ask: q.bid,
            ..q.clone()
        };
        // Keep the control quote's bid strictly positive so it survives.
        q.bid = q.bid.max(0.001);

        let points = compute_iv_points(&[zero_bid, zero_ask, inverted, q], &ctx);
        assert_eq!(points.len(), 1, "only the clean quote should survive");
    }

    #[test]
    fn drops_quotes_with_spread_wider_than_context() {
        let now_ms = 1_775_000_000_000i64;
        let ctx = ctx(4700.0, now_ms);
        let exp = expiry_30d(now_ms);

        let wide = {
            let mut q = quote_at_sigma(3800.0, exp, CmeOptionSide::Put, &ctx, 0.3, 20.0, now_ms);
            // Force a spread far wider than 20% of mid.
            q.bid = 0.01;
            q.ask = 40.0;
            q
        };

        assert!(compute_iv_points(&[wide], &ctx).is_empty());
    }

    #[test]
    fn drops_stale_quotes() {
        let now_ms = 1_775_000_000_000i64;
        let ctx = ctx(4700.0, now_ms);
        let exp = expiry_30d(now_ms);

        // Ingested 120 s ago, beyond the 60 s default staleness limit.
        let stale = quote_at_sigma(
            4700.0,
            exp,
            CmeOptionSide::Call,
            &ctx,
            0.3,
            1.0,
            now_ms - 120_000,
        );
        assert!(compute_iv_points(&[stale], &ctx).is_empty());
    }

    #[test]
    fn drops_below_intrinsic_quotes() {
        let now_ms = 1_775_000_000_000i64;
        let ctx = ctx(5000.0, now_ms);
        let exp = expiry_30d(now_ms);

        // A 4000-strike call with forward 5000 quoted around 100 is far below
        // its intrinsic value, so no volatility can reproduce the price.
        let q = OptionQuote {
            strike: 4000.0,
            expiry_ts: exp,
            side: CmeOptionSide::Call,
            bid: 95.0,
            ask: 105.0,
            ingested_at_ms: now_ms,
        };
        assert!(compute_iv_points(&[q], &ctx).is_empty());
    }

    #[test]
    fn drops_already_expired_quotes() {
        let now_ms = 1_775_000_000_000i64;
        let ctx = ctx(4700.0, now_ms);
        let already_expired = now_ms / 1000 - 1;

        let q = OptionQuote {
            strike: 4700.0,
            expiry_ts: already_expired,
            side: CmeOptionSide::Call,
            bid: 50.0,
            ask: 52.0,
            ingested_at_ms: now_ms,
        };
        assert!(compute_iv_points(&[q], &ctx).is_empty());
    }

    #[test]
    fn empty_input_is_not_an_error() {
        let ctx = ctx(4700.0, 1_775_000_000_000);
        assert!(compute_iv_points(&[], &ctx).is_empty());
    }

    #[test]
    fn non_positive_forward_returns_empty() {
        let ctx = ctx(-1.0, 1_775_000_000_000);
        let exp = expiry_30d(ctx.now_ms);
        let q = OptionQuote {
            strike: 4700.0,
            expiry_ts: exp,
            side: CmeOptionSide::Call,
            bid: 50.0,
            ask: 52.0,
            ingested_at_ms: ctx.now_ms,
        };
        assert!(compute_iv_points(&[q], &ctx).is_empty());
    }
}
653
654use std::collections::HashMap;
659
/// Numeric identifier that keys instruments in the book cache and the
/// instrument-definition map.
type InstrumentId = u64;
662
/// Best bid / best offer snapshot for one instrument.
#[derive(Debug, Clone, Copy, PartialEq)]
pub(crate) struct Bbo {
    /// Best bid price.
    bid: f64,
    /// Size available at the best bid.
    bid_size: u64,
    /// Best ask price.
    ask: f64,
    /// Size available at the best ask.
    ask_size: u64,
    /// Event timestamp in milliseconds.
    /// NOTE(review): presumably the exchange event time — confirm against the feed adapter.
    ts_event_ms: i64,
}
676
677#[derive(Debug, Default)]
685struct BookCache {
686 books: HashMap<InstrumentId, Bbo>,
687}
688
689impl BookCache {
690 fn new() -> Self {
691 Self::default()
692 }
693
694 fn apply(&mut self, instrument_id: InstrumentId, bbo: Bbo) {
698 self.books.insert(instrument_id, bbo);
699 }
700
701 fn get(&self, instrument_id: InstrumentId) -> Option<&Bbo> {
704 self.books.get(&instrument_id)
705 }
706
707 fn delete(&mut self, instrument_id: InstrumentId) -> bool {
710 self.books.remove(&instrument_id).is_some()
711 }
712
713 fn len(&self) -> usize {
714 self.books.len()
715 }
716
717 #[allow(dead_code)]
718 fn is_empty(&self) -> bool {
719 self.books.is_empty()
720 }
721
722 fn iter(&self) -> impl Iterator<Item = (InstrumentId, &Bbo)> {
724 self.books.iter().map(|(id, bbo)| (*id, bbo))
725 }
726}
727
/// Unit tests for `BookCache`.
#[cfg(test)]
mod book_cache_tests {
    use super::*;

    /// Builds a snapshot with fixed sizes of 10 on both sides.
    fn sample_bbo(bid: f64, ask: f64, ts_ms: i64) -> Bbo {
        Bbo {
            bid,
            bid_size: 10,
            ask,
            ask_size: 10,
            ts_event_ms: ts_ms,
        }
    }

    #[test]
    fn apply_then_get_returns_the_bbo() {
        let mut cache = BookCache::new();
        let bbo = sample_bbo(100.0, 101.0, 1_700_000_000_000);
        cache.apply(42, bbo);
        assert_eq!(cache.get(42), Some(&bbo));
        assert_eq!(cache.len(), 1);
    }

    /// A second `apply` for the same id overwrites rather than appends.
    #[test]
    fn apply_replaces_existing_snapshot() {
        let mut cache = BookCache::new();
        cache.apply(42, sample_bbo(100.0, 101.0, 1));
        cache.apply(42, sample_bbo(100.5, 101.5, 2));
        let got = cache.get(42).expect("should still be present");
        assert_eq!(got.bid, 100.5);
        assert_eq!(got.ask, 101.5);
        assert_eq!(got.ts_event_ms, 2);
        assert_eq!(cache.len(), 1, "same instrument, still one entry");
    }

    #[test]
    fn get_on_unknown_instrument_is_none() {
        let cache = BookCache::new();
        assert!(cache.get(999).is_none());
    }

    /// `delete` reports `true` once, then `false` for the already-removed id.
    #[test]
    fn delete_removes_and_reports() {
        let mut cache = BookCache::new();
        cache.apply(1, sample_bbo(1.0, 2.0, 0));
        cache.apply(2, sample_bbo(3.0, 4.0, 0));
        assert!(cache.delete(1));
        assert!(!cache.delete(1));
        assert_eq!(cache.len(), 1);
        assert!(cache.get(1).is_none());
        assert!(cache.get(2).is_some());
    }

    #[test]
    fn iter_yields_all_entries() {
        let mut cache = BookCache::new();
        cache.apply(10, sample_bbo(1.0, 2.0, 100));
        cache.apply(20, sample_bbo(3.0, 4.0, 200));
        cache.apply(30, sample_bbo(5.0, 6.0, 300));

        // HashMap iteration order is unspecified, so sort before asserting.
        let mut collected: Vec<(InstrumentId, Bbo)> =
            cache.iter().map(|(id, bbo)| (id, *bbo)).collect();
        collected.sort_by_key(|(id, _)| *id);

        assert_eq!(collected.len(), 3);
        assert_eq!(collected[0].0, 10);
        assert_eq!(collected[1].0, 20);
        assert_eq!(collected[2].0, 30);
    }

    #[test]
    fn new_cache_is_empty() {
        let cache = BookCache::new();
        assert!(cache.is_empty());
        assert_eq!(cache.len(), 0);
    }
}
805
806use async_trait::async_trait;
811
/// Static description of an instrument, delivered by a
/// `FeedRecord::Definition`.
#[derive(Debug, Clone, PartialEq)]
pub(crate) enum InstrumentKind {
    /// A futures contract.
    Future {
        /// Product root, e.g. `"GC"`.
        root: String,
        /// CME month code letter.
        month_code: char,
        /// Single year digit.
        year_digit: u8,
    },
    /// An option contract.
    Option {
        /// Product root, e.g. `"OG"`.
        root: String,
        /// CME month code letter.
        month_code: char,
        /// Single year digit.
        year_digit: u8,
        /// Call or put.
        side: CmeOptionSide,
        /// Strike price.
        strike: f64,
        /// Expiry timestamp in seconds (copied into `OptionQuote::expiry_ts`).
        expiry_ts: i64,
    },
}
830
/// One record yielded by a `DatabentoFeed`.
#[derive(Debug, Clone, PartialEq)]
pub(crate) enum FeedRecord {
    /// Instrument definition: associates `instrument_id` with its `kind`.
    Definition {
        instrument_id: InstrumentId,
        kind: InstrumentKind,
    },
    /// Top-of-book update for `instrument_id`.
    Mbp1 {
        instrument_id: InstrumentId,
        bbo: Bbo,
    },
    /// Feed-level error carrying a message; the stream may continue after it.
    Error(String),
    /// In-band marker signalling the end of the stream.
    EndOfStream,
}
859
/// Asynchronous source of `FeedRecord`s consumed by the oracle run loop.
#[async_trait]
pub(crate) trait DatabentoFeed: Send + Sync {
    /// Yields the next record, or `None` when the feed has nothing further to
    /// deliver (the run loop stops on `None`).
    async fn next_record(&mut self) -> Option<FeedRecord>;
}
874
875struct FakeFeed {
879 records: std::collections::VecDeque<FeedRecord>,
880}
881
882impl FakeFeed {
883 fn new(records: Vec<FeedRecord>) -> Self {
884 Self {
885 records: records.into(),
886 }
887 }
888}
889
#[async_trait]
impl DatabentoFeed for FakeFeed {
    /// Pops and returns the next scripted record, or `None` once the queue is
    /// empty. Never actually suspends.
    async fn next_record(&mut self) -> Option<FeedRecord> {
        self.records.pop_front()
    }
}
896
/// Unit tests for `FakeFeed` as a `DatabentoFeed` implementation.
#[cfg(test)]
mod feed_tests {
    use super::*;

    /// Definition record for a `GC` future with instrument id 1.
    fn sample_gc_definition() -> FeedRecord {
        FeedRecord::Definition {
            instrument_id: 1,
            kind: InstrumentKind::Future {
                root: "GC".to_string(),
                month_code: 'M',
                year_digit: 6,
            },
        }
    }

    /// Definition record for an `OG` call option with instrument id 2.
    fn sample_og_definition() -> FeedRecord {
        FeedRecord::Definition {
            instrument_id: 2,
            kind: InstrumentKind::Option {
                root: "OG".to_string(),
                month_code: 'M',
                year_digit: 6,
                side: CmeOptionSide::Call,
                strike: 4700.0,
                expiry_ts: 1_777_000_000,
            },
        }
    }

    /// Fixed 100/101 top-of-book update for the given instrument.
    fn sample_bbo_update(instrument_id: InstrumentId) -> FeedRecord {
        FeedRecord::Mbp1 {
            instrument_id,
            bbo: Bbo {
                bid: 100.0,
                bid_size: 5,
                ask: 101.0,
                ask_size: 5,
                ts_event_ms: 1_700_000_000_000,
            },
        }
    }

    #[tokio::test]
    async fn fake_feed_replays_records_in_order() {
        let mut feed = FakeFeed::new(vec![
            sample_gc_definition(),
            sample_og_definition(),
            sample_bbo_update(1),
            sample_bbo_update(2),
        ]);

        let r1 = feed.next_record().await.expect("record 1");
        let r2 = feed.next_record().await.expect("record 2");
        let r3 = feed.next_record().await.expect("record 3");
        let r4 = feed.next_record().await.expect("record 4");
        let r5 = feed.next_record().await;

        assert!(matches!(
            r1,
            FeedRecord::Definition {
                instrument_id: 1,
                ..
            }
        ));
        assert!(matches!(
            r2,
            FeedRecord::Definition {
                instrument_id: 2,
                ..
            }
        ));
        assert!(matches!(
            r3,
            FeedRecord::Mbp1 {
                instrument_id: 1,
                ..
            }
        ));
        assert!(matches!(
            r4,
            FeedRecord::Mbp1 {
                instrument_id: 2,
                ..
            }
        ));
        assert!(r5.is_none(), "feed should be exhausted after 4 records");
    }

    /// An `Error` record is just another item: later records still arrive.
    #[tokio::test]
    async fn fake_feed_propagates_error_and_continues() {
        let mut feed = FakeFeed::new(vec![
            FeedRecord::Error("transient: connection reset".to_string()),
            sample_gc_definition(),
            FeedRecord::EndOfStream,
        ]);

        let r1 = feed.next_record().await.expect("error record");
        assert!(matches!(r1, FeedRecord::Error(_)));

        let r2 = feed.next_record().await.expect("definition record");
        assert!(matches!(r2, FeedRecord::Definition { .. }));

        let r3 = feed.next_record().await.expect("end of stream marker");
        assert!(matches!(r3, FeedRecord::EndOfStream));

        let r4 = feed.next_record().await;
        assert!(
            r4.is_none(),
            "after EndOfStream and no more records, feed is exhausted"
        );
    }

    #[tokio::test]
    async fn empty_fake_feed_is_immediately_exhausted() {
        let mut feed = FakeFeed::new(vec![]);
        assert!(feed.next_record().await.is_none());
    }
}
1017
1018use super::vol_surface_cache::VolatilitySurface;
1023
/// Accumulates feed records (definitions and top-of-book updates) and turns
/// the current state into a `VolatilitySurface` on demand.
#[derive(Debug)]
struct DatabentoOrchestrator {
    /// Latest top-of-book snapshot per instrument.
    book_cache: BookCache,
    /// Instrument definitions received so far, keyed by instrument id.
    instruments: HashMap<InstrumentId, InstrumentKind>,
    /// Mid price of the most recent valid `GC` future quote, if any.
    current_forward: Option<f64>,
    /// Risk-free rate forwarded to the IV fit.
    risk_free_rate: f64,
    /// Maximum relative spread forwarded to the IV fit.
    max_spread_fraction: f64,
    /// Maximum quote age (ms) forwarded to the IV fit.
    max_staleness_ms: i64,
    /// Multiplier applied to strikes and to the forward when writing into the surface.
    strike_scale: f64,
}
1055
1056impl DatabentoOrchestrator {
1057 fn new(risk_free_rate: f64, strike_scale: f64) -> Self {
1058 Self {
1059 book_cache: BookCache::new(),
1060 instruments: HashMap::new(),
1061 current_forward: None,
1062 risk_free_rate,
1063 max_spread_fraction: 0.20,
1064 max_staleness_ms: 60_000,
1065 strike_scale,
1066 }
1067 }
1068
1069 fn ingest(&mut self, record: FeedRecord) {
1072 match record {
1073 FeedRecord::Definition {
1074 instrument_id,
1075 kind,
1076 } => {
1077 self.instruments.insert(instrument_id, kind);
1078 }
1079 FeedRecord::Mbp1 { instrument_id, bbo } => {
1080 self.book_cache.apply(instrument_id, bbo);
1081 if let Some(InstrumentKind::Future { root, .. }) =
1082 self.instruments.get(&instrument_id)
1083 {
1084 if root == "GC" {
1085 let mid = 0.5 * (bbo.bid + bbo.ask);
1086 if mid.is_finite() && mid > 0.0 {
1087 self.current_forward = Some(mid);
1088 }
1089 }
1090 }
1091 }
1092 FeedRecord::Error(_) | FeedRecord::EndOfStream => {}
1093 }
1094 }
1095
1096 fn build_surface(&self, now_ms: i64) -> VolatilitySurface {
1101 let mut surface = VolatilitySurface::new();
1102 let Some(forward) = self.current_forward else {
1103 return surface;
1104 };
1105
1106 let ctx = ChainContext {
1107 forward,
1108 risk_free_rate: self.risk_free_rate,
1109 now_ms,
1110 max_spread_fraction: self.max_spread_fraction,
1111 max_staleness_ms: self.max_staleness_ms,
1112 };
1113
1114 let mut quotes = Vec::new();
1120 for (instrument_id, bbo) in self.book_cache.iter() {
1121 let Some(kind) = self.instruments.get(&instrument_id) else {
1122 continue;
1123 };
1124 if let InstrumentKind::Option {
1125 side,
1126 strike,
1127 expiry_ts,
1128 ..
1129 } = kind
1130 {
1131 quotes.push(OptionQuote {
1132 strike: *strike,
1133 expiry_ts: *expiry_ts,
1134 side: *side,
1135 bid: bbo.bid,
1136 ask: bbo.ask,
1137 ingested_at_ms: now_ms,
1149 });
1150 }
1151 }
1152
1153 for point in compute_iv_points("es, &ctx) {
1154 surface.insert(point.strike * self.strike_scale, point.expiry_ts, point.iv);
1155 }
1156
1157 let scaled_forward = forward * self.strike_scale;
1163 if scaled_forward > 0.0 {
1164 let clamps = surface.sanitize_arb_free(scaled_forward, 0.0);
1165 if clamps > 0 {
1166 metrics::counter!(
1170 "ht_vol_surface_arb_clamps_total",
1171 "provider" => "databento",
1172 "underlying" => "GOLD"
1173 )
1174 .increment(clamps as u64);
1175 }
1176 }
1177
1178 surface
1179 }
1180
1181 fn current_forward(&self) -> Option<f64> {
1182 self.current_forward
1183 }
1184}
1185
/// End-to-end tests for `DatabentoOrchestrator`: ingesting records and
/// building a surface.
#[cfg(test)]
mod orchestrator_tests {
    use super::*;
    use hypercall_margin::black_76::black_76_price;

    /// Fixed test clock in milliseconds.
    fn now_ms() -> i64 {
        1_775_000_000_000
    }

    /// Expiry timestamp (seconds) 30 days after the test clock.
    fn expiry_30d() -> i64 {
        now_ms() / 1000 + 30 * 86_400
    }

    /// Builds an `Mbp1` record whose mid is the Black-76 price at `sigma`
    /// (30/365 years to expiry, 5% rate) with a +/- 1.0 spread around it.
    fn option_mbp1(
        instrument_id: InstrumentId,
        forward: f64,
        strike: f64,
        side: CmeOptionSide,
        sigma: f64,
    ) -> FeedRecord {
        let tau = 30.0 / 365.0;
        let opt_type = match side {
            CmeOptionSide::Call => OptionType::Call,
            CmeOptionSide::Put => OptionType::Put,
        };
        let mid = black_76_price(&opt_type, forward, strike, tau, 0.05, sigma);
        FeedRecord::Mbp1 {
            instrument_id,
            bbo: Bbo {
                bid: mid - 1.0,
                bid_size: 10,
                ask: mid + 1.0,
                ask_size: 10,
                ts_event_ms: now_ms(),
            },
        }
    }

    #[test]
    fn new_orchestrator_has_no_forward_and_empty_surface() {
        let orch = DatabentoOrchestrator::new(0.05, 1.0);
        assert!(orch.current_forward().is_none());
        assert!(orch.build_surface(now_ms()).is_empty());
    }

    /// A quote on a defined GC future sets the forward to its mid.
    #[test]
    fn gc_future_update_populates_forward() {
        let mut orch = DatabentoOrchestrator::new(0.05, 1.0);
        orch.ingest(FeedRecord::Definition {
            instrument_id: 1,
            kind: InstrumentKind::Future {
                root: "GC".to_string(),
                month_code: 'M',
                year_digit: 6,
            },
        });
        orch.ingest(FeedRecord::Mbp1 {
            instrument_id: 1,
            bbo: Bbo {
                bid: 4700.0,
                bid_size: 5,
                ask: 4702.0,
                ask_size: 5,
                ts_event_ms: now_ms(),
            },
        });
        assert_eq!(orch.current_forward(), Some(4701.0));
    }

    /// Option quotes alone cannot produce a surface: a forward is required.
    #[test]
    fn option_quote_without_forward_yields_empty_surface() {
        let mut orch = DatabentoOrchestrator::new(0.05, 1.0);
        orch.ingest(FeedRecord::Definition {
            instrument_id: 10,
            kind: InstrumentKind::Option {
                root: "OG".to_string(),
                month_code: 'M',
                year_digit: 6,
                side: CmeOptionSide::Call,
                strike: 4700.0,
                expiry_ts: expiry_30d(),
            },
        });
        orch.ingest(option_mbp1(
            10,
            4700.0,
            4700.0,
            CmeOptionSide::Call,
            0.35,
        ));

        assert!(orch.build_surface(now_ms()).is_empty());
    }

    /// Future + option definitions and quotes yield a surface point whose IV
    /// matches the sigma used to price the option.
    #[test]
    fn full_pipeline_yields_surface_with_expected_point() {
        let mut orch = DatabentoOrchestrator::new(0.05, 1.0);

        orch.ingest(FeedRecord::Definition {
            instrument_id: 1,
            kind: InstrumentKind::Future {
                root: "GC".to_string(),
                month_code: 'M',
                year_digit: 6,
            },
        });
        orch.ingest(FeedRecord::Mbp1 {
            instrument_id: 1,
            bbo: Bbo {
                bid: 4699.5,
                bid_size: 5,
                ask: 4700.5,
                ask_size: 5,
                ts_event_ms: now_ms(),
            },
        });

        let exp = expiry_30d();
        orch.ingest(FeedRecord::Definition {
            instrument_id: 10,
            kind: InstrumentKind::Option {
                root: "OG".to_string(),
                month_code: 'M',
                year_digit: 6,
                side: CmeOptionSide::Call,
                strike: 4700.0,
                expiry_ts: exp,
            },
        });
        orch.ingest(option_mbp1(10, 4700.0, 4700.0, CmeOptionSide::Call, 0.32));

        let surface = orch.build_surface(now_ms());
        assert!(!surface.is_empty(), "surface should have the option point");

        let iv = surface
            .get(4700.0, exp)
            .expect("4700@exp should be in the surface");
        assert!(
            (iv - 0.32).abs() < 5e-3,
            "recovered iv {iv} should be near 0.32",
        );
    }

    #[test]
    fn multiple_options_populate_multiple_points() {
        let mut orch = DatabentoOrchestrator::new(0.05, 1.0);

        orch.ingest(FeedRecord::Definition {
            instrument_id: 1,
            kind: InstrumentKind::Future {
                root: "GC".to_string(),
                month_code: 'M',
                year_digit: 6,
            },
        });
        orch.ingest(FeedRecord::Mbp1 {
            instrument_id: 1,
            bbo: Bbo {
                bid: 4699.5,
                bid_size: 5,
                ask: 4700.5,
                ask_size: 5,
                ts_event_ms: now_ms(),
            },
        });

        let exp = expiry_30d();
        for (id, strike, side) in [
            (10, 4500.0, CmeOptionSide::Put),
            (11, 4700.0, CmeOptionSide::Call),
            (12, 4900.0, CmeOptionSide::Call),
        ] {
            orch.ingest(FeedRecord::Definition {
                instrument_id: id,
                kind: InstrumentKind::Option {
                    root: "OG".to_string(),
                    month_code: 'M',
                    year_digit: 6,
                    side,
                    strike,
                    expiry_ts: exp,
                },
            });
            orch.ingest(option_mbp1(id, 4700.0, strike, side, 0.3));
        }

        let surface = orch.build_surface(now_ms());
        assert_eq!(surface.len(), 3);
        assert!(surface.get(4500.0, exp).is_some());
        assert!(surface.get(4700.0, exp).is_some());
        assert!(surface.get(4900.0, exp).is_some());
    }

    #[test]
    fn error_and_end_of_stream_are_noop_on_orchestrator() {
        let mut orch = DatabentoOrchestrator::new(0.05, 1.0);
        orch.ingest(FeedRecord::Error("transient".to_string()));
        orch.ingest(FeedRecord::EndOfStream);
        assert!(orch.current_forward().is_none());
        assert!(orch.build_surface(now_ms()).is_empty());
    }

    /// With `strike_scale = 2.0` a 2350 strike is stored at 4700 and the
    /// fitted IV is unchanged by the scaling.
    #[test]
    fn strike_scale_places_points_in_km_space() {
        let mut orch = DatabentoOrchestrator::new(0.05, 2.0);

        orch.ingest(FeedRecord::Definition {
            instrument_id: 1,
            kind: InstrumentKind::Future {
                root: "GC".to_string(),
                month_code: 'M',
                year_digit: 6,
            },
        });
        orch.ingest(FeedRecord::Mbp1 {
            instrument_id: 1,
            bbo: Bbo {
                bid: 2349.5,
                bid_size: 5,
                ask: 2350.5,
                ask_size: 5,
                ts_event_ms: now_ms(),
            },
        });

        let exp = expiry_30d();
        orch.ingest(FeedRecord::Definition {
            instrument_id: 10,
            kind: InstrumentKind::Option {
                root: "OG".to_string(),
                month_code: 'M',
                year_digit: 6,
                side: CmeOptionSide::Call,
                strike: 2350.0,
                expiry_ts: exp,
            },
        });
        orch.ingest(option_mbp1(10, 2350.0, 2350.0, CmeOptionSide::Call, 0.3));

        let surface = orch.build_surface(now_ms());
        let iv = surface
            .get(4700.0, exp)
            .expect("scale=2 should place the point at km-space strike 4700");
        assert!(
            (iv - 0.3).abs() < 5e-3,
            "iv should be scale-invariant: got {iv}, expected 0.3",
        );
        assert!(
            surface.get(2350.0, exp).is_none(),
            "CME-space strike should not exist in a km-space surface",
        );
    }
}
1453
1454use super::risk_oracle::{
1459 RiskVolOracle, VolLookupError, VolOracleStatus, VolProviderKind, VolSurfaceSnapshot,
1460};
1461use std::sync::{Arc, RwLock};
1462use std::time::Duration;
1463
/// Configuration for a `DatabentoVolOracle`.
#[derive(Debug, Clone)]
pub struct DatabentoVolOracleConfig {
    /// Underlying name reported in logs and in the oracle status.
    pub underlying: String,
    /// Risk-free rate handed to the orchestrator's IV fit.
    pub risk_free_rate: f64,
    /// Strike multiplier handed to the orchestrator.
    pub strike_scale: f64,
    /// Maximum age of the last surface update for the oracle to report ready.
    pub staleness_threshold: Duration,
}
1479
/// Mutable oracle state guarded by the oracle's `RwLock`.
#[derive(Debug)]
struct DatabentoOracleState {
    /// Accumulates feed records and builds surfaces.
    orchestrator: DatabentoOrchestrator,
    /// Surface rebuilt after the most recent ingested record.
    surface: VolatilitySurface,
    /// Timestamp (ms) of the last ingest that produced a non-empty surface.
    last_update_ts_ms: Option<i64>,
    /// Most recent error message; cleared by the next `Mbp1` record.
    last_error: Option<String>,
    /// Count of `Mbp1` records ingested.
    messages_received: u64,
}
1488
/// Volatility oracle fed by a `DatabentoFeed`; all mutable state lives behind
/// a shared `RwLock`.
pub struct DatabentoVolOracle {
    /// Immutable configuration supplied at construction.
    config: DatabentoVolOracleConfig,
    /// Shared, lock-protected mutable state.
    state: Arc<RwLock<DatabentoOracleState>>,
}
1503
1504impl DatabentoVolOracle {
1505 pub fn new(config: DatabentoVolOracleConfig) -> Self {
1506 let orchestrator = DatabentoOrchestrator::new(config.risk_free_rate, config.strike_scale);
1507 let state = DatabentoOracleState {
1508 orchestrator,
1509 surface: VolatilitySurface::new(),
1510 last_update_ts_ms: None,
1511 last_error: None,
1512 messages_received: 0,
1513 };
1514 Self {
1515 config,
1516 state: Arc::new(RwLock::new(state)),
1517 }
1518 }
1519
1520 pub fn record_external_error(&self, message: impl Into<String>) {
1525 let mut state = self.state.write().expect("databento oracle state poisoned");
1526 state.last_error = Some(message.into());
1527 }
1528
1529 fn ingest(&self, record: FeedRecord, now_ms: i64) {
1532 let mut state = self.state.write().expect("databento oracle state poisoned");
1533 match &record {
1534 FeedRecord::Error(msg) => {
1535 state.last_error = Some(msg.clone());
1536 }
1537 FeedRecord::Mbp1 { .. } => {
1538 state.messages_received += 1;
1539 state.last_error = None;
1540 }
1541 _ => {}
1542 }
1543 state.orchestrator.ingest(record);
1544 state.surface = state.orchestrator.build_surface(now_ms);
1545 if !state.surface.is_empty() {
1546 state.last_update_ts_ms = Some(now_ms);
1547 }
1548 }
1549
1550 pub(crate) async fn run<F: DatabentoFeed>(&self, mut feed: F) {
1555 let mut record_count: u64 = 0;
1556 let mut last_log = std::time::Instant::now();
1557 tracing::info!(
1558 underlying = %self.config.underlying,
1559 "Databento oracle run loop starting — waiting for first record"
1560 );
1561 while let Some(record) = feed.next_record().await {
1562 record_count += 1;
1563 if record_count <= 5 || last_log.elapsed().as_secs() >= 30 {
1565 let state = self.state.read().expect("state");
1566 tracing::info!(
1567 underlying = %self.config.underlying,
1568 record_count,
1569 messages_received = state.messages_received,
1570 has_forward = state.orchestrator.current_forward().is_some(),
1571 forward = ?state.orchestrator.current_forward(),
1572 surface_points = state.surface.len(),
1573 record_type = match &record {
1574 FeedRecord::Definition { .. } => "definition",
1575 FeedRecord::Mbp1 { .. } => "mbp1",
1576 FeedRecord::Error(_) => "error",
1577 FeedRecord::EndOfStream => "end_of_stream",
1578 },
1579 "Databento oracle ingesting record"
1580 );
1581 last_log = std::time::Instant::now();
1582 }
1583 self.ingest(record, Utc::now().timestamp_millis());
1584 }
1585 tracing::warn!(
1586 underlying = %self.config.underlying,
1587 record_count,
1588 "Databento oracle run loop ended (feed exhausted)"
1589 );
1590 }
1591
1592 fn status(&self) -> VolOracleStatus {
1594 let state = self.state.read().expect("databento oracle state poisoned");
1595 let last_update_ts_ms = state.last_update_ts_ms;
1596 let staleness_seconds = last_update_ts_ms
1597 .map(|ts| ((Utc::now().timestamp_millis() - ts) as f64 / 1000.0).max(0.0));
1598 let ready = staleness_seconds
1599 .map(|age| age <= self.config.staleness_threshold.as_secs_f64())
1600 .unwrap_or(false)
1601 && !state.surface.is_empty();
1602 let connected = state.orchestrator.current_forward().is_some();
1603
1604 VolOracleStatus {
1605 underlying: self.config.underlying.clone(),
1606 provider: VolProviderKind::Databento,
1607 route_facing: true,
1608 connected,
1609 ready,
1610 last_update_ts_ms,
1611 staleness_seconds,
1612 staleness_threshold_seconds: Some(self.config.staleness_threshold.as_secs_f64()),
1613 surface_points: state.surface.len(),
1614 messages_received: state.messages_received,
1615 last_error: state.last_error.clone(),
1616 }
1617 }
1618}
1619
1620impl RiskVolOracle for DatabentoVolOracle {
1621 fn get_iv(&self, underlying: &str, strike: f64, expiry_ts: i64) -> Result<f64, VolLookupError> {
1622 if underlying != self.config.underlying {
1623 return Err(VolLookupError::UnsupportedUnderlying {
1624 underlying: underlying.to_string(),
1625 });
1626 }
1627
1628 let status = self.status();
1629 if !status.connected {
1630 return Err(VolLookupError::UnhealthyProvider {
1631 underlying: underlying.to_string(),
1632 provider: VolProviderKind::Databento,
1633 reason: status
1634 .last_error
1635 .unwrap_or_else(|| "no GC forward received yet".to_string()),
1636 });
1637 }
1638 if !status.ready {
1639 return Err(VolLookupError::StaleSurface {
1640 underlying: underlying.to_string(),
1641 provider: VolProviderKind::Databento,
1642 staleness_seconds: status.staleness_seconds.unwrap_or(f64::INFINITY),
1643 threshold_seconds: self.config.staleness_threshold.as_secs_f64(),
1644 });
1645 }
1646
1647 let state = self.state.read().expect("databento oracle state poisoned");
1648 state
1649 .surface
1650 .get_interpolated(strike, expiry_ts)
1651 .ok_or_else(|| VolLookupError::MissingSurface {
1652 underlying: underlying.to_string(),
1653 provider: VolProviderKind::Databento,
1654 strike,
1655 expiry_ts,
1656 })
1657 }
1658
1659 fn statuses(&self) -> Vec<VolOracleStatus> {
1660 vec![self.status()]
1661 }
1662
1663 fn get_surface_snapshot(&self, underlying: &str) -> Option<VolSurfaceSnapshot> {
1670 if underlying != self.config.underlying {
1671 return None;
1672 }
1673 let state = self.state.read().expect("databento oracle state poisoned");
1674 if state.surface.is_empty() {
1675 return None;
1676 }
1677 Some(VolSurfaceSnapshot {
1678 underlying: underlying.to_string(),
1679 last_update_ts_ms: state.last_update_ts_ms,
1680 expiries: state.surface.expiries().iter().copied().collect(),
1681 strike_points: state.surface.export_all_points(),
1682 delta_curves: state.surface.export_delta_curves(),
1683 atm_vols: state.surface.export_atm_vols(),
1684 spot_price: state.orchestrator.current_forward(),
1685 })
1686 }
1687
1688 fn supports_surface_snapshots(&self) -> bool {
1689 true
1690 }
1691}
1692
#[cfg(test)]
mod oracle_tests {
    use super::*;
    use hypercall_margin::black_76::black_76_price;

    /// Instrument id used for the GC future in every fixture.
    const GC_ID: InstrumentId = 1;
    /// Instrument id used for the OG call in every fixture.
    const OG_CALL_ID: InstrumentId = 10;

    /// Fixed "current time" (milliseconds) shared by the deterministic tests.
    fn now_ms() -> i64 {
        1_775_000_000_000
    }

    /// Expiry (seconds) thirty days after `now_ms`.
    fn expiry_30d() -> i64 {
        now_ms() / 1000 + 30 * 86_400
    }

    /// GOLD configuration with a staleness threshold of roughly a century, so
    /// the fixed `now_ms` never counts as stale against the real clock.
    fn config_for_gold() -> DatabentoVolOracleConfig {
        DatabentoVolOracleConfig {
            underlying: "GOLD".to_string(),
            risk_free_rate: 0.05,
            strike_scale: 1.0,
            staleness_threshold: Duration::from_secs(60 * 60 * 24 * 365 * 100),
        }
    }

    /// MBP-1 quote for a 30-day call whose mid is the Black-76 price at
    /// volatility `sigma`, quoted one unit either side of the mid.
    fn option_mbp1(
        instrument_id: InstrumentId,
        forward: f64,
        strike: f64,
        sigma: f64,
    ) -> FeedRecord {
        let tau = 30.0 / 365.0;
        let mid = black_76_price(&OptionType::Call, forward, strike, tau, 0.05, sigma);
        let bbo = Bbo {
            bid: mid - 1.0,
            bid_size: 10,
            ask: mid + 1.0,
            ask_size: 10,
            ts_event_ms: now_ms(),
        };
        FeedRecord::Mbp1 { instrument_id, bbo }
    }

    /// Definition record for the GCM6 future.
    fn gc_future_definition() -> FeedRecord {
        FeedRecord::Definition {
            instrument_id: GC_ID,
            kind: InstrumentKind::Future {
                root: "GC".to_string(),
                month_code: 'M',
                year_digit: 6,
            },
        }
    }

    /// GC quote of 4699.5 / 4700.5 stamped with `ts_event_ms`.
    fn gc_quote(ts_event_ms: i64) -> FeedRecord {
        FeedRecord::Mbp1 {
            instrument_id: GC_ID,
            bbo: Bbo {
                bid: 4699.5,
                bid_size: 5,
                ask: 4700.5,
                ask_size: 5,
                ts_event_ms,
            },
        }
    }

    /// Definition record for an OGM6 call at `strike` expiring at `expiry_ts`.
    fn og_call_definition(strike: f64, expiry_ts: i64) -> FeedRecord {
        FeedRecord::Definition {
            instrument_id: OG_CALL_ID,
            kind: InstrumentKind::Option {
                root: "OG".to_string(),
                month_code: 'M',
                year_digit: 6,
                side: CmeOptionSide::Call,
                strike,
                expiry_ts,
            },
        }
    }

    /// Ingests the GC definition followed by a GC quote, both at `ts_ms`.
    fn ingest_gc_forward(oracle: &DatabentoVolOracle, ts_ms: i64) {
        oracle.ingest(gc_future_definition(), ts_ms);
        oracle.ingest(gc_quote(ts_ms), ts_ms);
    }

    /// Ingests the 4700 call definition and a matching quote priced at
    /// `sigma`, both at `ingest_ts_ms`.
    fn ingest_atm_call(
        oracle: &DatabentoVolOracle,
        expiry_ts: i64,
        sigma: f64,
        ingest_ts_ms: i64,
    ) {
        oracle.ingest(og_call_definition(4700.0, expiry_ts), ingest_ts_ms);
        oracle.ingest(option_mbp1(OG_CALL_ID, 4700.0, 4700.0, sigma), ingest_ts_ms);
    }

    #[test]
    fn brand_new_oracle_reports_disconnected_and_get_iv_errors() {
        let oracle = DatabentoVolOracle::new(config_for_gold());

        let status = oracle.statuses();
        assert_eq!(status.len(), 1);
        let only = &status[0];
        assert!(!only.connected);
        assert!(!only.ready);
        assert_eq!(only.surface_points, 0);

        let err = oracle
            .get_iv("GOLD", 4700.0, expiry_30d())
            .expect_err("should fail without any data");
        assert!(matches!(err, VolLookupError::UnhealthyProvider { .. }));
    }

    #[test]
    fn ingesting_gc_only_is_connected_but_not_ready() {
        let oracle = DatabentoVolOracle::new(config_for_gold());
        ingest_gc_forward(&oracle, now_ms());

        let status = oracle.statuses();
        assert!(status[0].connected, "forward arrived -> connected");
        assert!(!status[0].ready, "no option points yet -> not ready");
    }

    #[test]
    fn full_ingest_yields_queryable_surface_and_ready_status() {
        let oracle = DatabentoVolOracle::new(config_for_gold());
        ingest_gc_forward(&oracle, now_ms());
        let exp = expiry_30d();
        ingest_atm_call(&oracle, exp, 0.31, now_ms());

        let status = oracle.statuses();
        assert!(status[0].connected);
        assert!(status[0].ready);
        assert_eq!(status[0].surface_points, 1);
        assert!(status[0].messages_received >= 1);

        let iv = oracle
            .get_iv("GOLD", 4700.0, exp)
            .expect("populated surface should lookup cleanly");
        assert!((iv - 0.31).abs() < 5e-3, "got {iv}");
    }

    #[test]
    fn unsupported_underlying_rejects() {
        let oracle = DatabentoVolOracle::new(config_for_gold());
        let err = oracle
            .get_iv("BTC", 100_000.0, expiry_30d())
            .expect_err("BTC is not our underlying");
        assert!(matches!(err, VolLookupError::UnsupportedUnderlying { .. }));
    }

    #[test]
    fn staleness_threshold_gates_readiness() {
        let mut config = config_for_gold();
        config.staleness_threshold = Duration::from_millis(10);
        let oracle = DatabentoVolOracle::new(config);

        // Everything is ingested at a timestamp far in the past relative to
        // the real clock that `status` compares against.
        let ancient = 100_000_000i64;
        ingest_gc_forward(&oracle, ancient);
        let exp = expiry_30d();
        ingest_atm_call(&oracle, exp, 0.3, ancient);

        let err = oracle
            .get_iv("GOLD", 4700.0, exp)
            .expect_err("ancient surface should be stale");
        assert!(matches!(err, VolLookupError::StaleSurface { .. }));
    }

    #[test]
    fn error_records_track_last_error_field() {
        let oracle = DatabentoVolOracle::new(config_for_gold());
        let failure = FeedRecord::Error("connection reset by peer".to_string());
        oracle.ingest(failure, now_ms());

        let status = oracle.statuses();
        assert_eq!(
            status[0].last_error.as_deref(),
            Some("connection reset by peer")
        );
    }

    #[tokio::test]
    async fn run_drains_a_fake_feed() {
        use chrono::Utc;
        let oracle = DatabentoVolOracle::new(config_for_gold());
        // `run` stamps records with the real clock, so this fixture is built
        // around the real "now" rather than the fixed `now_ms`.
        let real_now_ms = Utc::now().timestamp_millis();
        let real_exp = Utc::now().timestamp() + 30 * 86_400;

        let forward = 4700.0;
        let strike = 4700.0;
        let sigma = 0.33;
        let tau = 30.0 / 365.0;
        let mid = black_76_price(&OptionType::Call, forward, strike, tau, 0.05, sigma);

        let option_quote = FeedRecord::Mbp1 {
            instrument_id: OG_CALL_ID,
            bbo: Bbo {
                bid: mid - 1.0,
                bid_size: 5,
                ask: mid + 1.0,
                ask_size: 5,
                ts_event_ms: real_now_ms,
            },
        };
        let feed = FakeFeed::new(vec![
            gc_future_definition(),
            // Same 4699.5 / 4700.5 quote as `forward -/+ 0.5`.
            gc_quote(real_now_ms),
            og_call_definition(strike, real_exp),
            option_quote,
            FeedRecord::EndOfStream,
        ]);

        oracle.run(feed).await;

        let status = oracle.statuses();
        assert!(
            status[0].ready,
            "feed should have populated a ready surface"
        );
        assert_eq!(status[0].surface_points, 1);
    }
}
1994
1995use databento::dbn::{InstrumentClass, InstrumentDefMsg, Mbp1Msg, SType, Schema};
2000use databento::live::Subscription as DatabentoSubscription;
2001use databento::{LiveClient, Symbols};
2002use time::{Duration as TimeDuration, OffsetDateTime};
2003
/// How far before "now", in minutes, the MBP-1 subscription's start time is
/// set when the live feed connects.
const DATABENTO_MBP1_REPLAY_MINUTES: i64 = 5;

/// Seconds without a new record after which the definition reader, once it
/// has seen at least one record, treats the definition snapshot as drained.
const DATABENTO_DEFINITION_DRAIN_IDLE_SECS: u64 = 3;
2013
2014pub struct DatabentoLiveFeed {
2029 rx: tokio::sync::mpsc::UnboundedReceiver<FeedRecord>,
2030 _def_task: tokio::task::JoinHandle<()>,
2035 _mbp1_task: tokio::task::JoinHandle<()>,
2036}
2037
2038impl DatabentoLiveFeed {
2039 pub async fn connect(api_key: &str, dataset: &str) -> anyhow::Result<Self> {
2043 let symbols = Symbols::Symbols(vec!["GC.FUT".to_string(), "OG.OPT".to_string()]);
2044
2045 let mut def_client = LiveClient::builder()
2047 .key(api_key)?
2048 .dataset(dataset.to_string())
2049 .build()
2050 .await?;
2051 def_client
2052 .subscribe(
2053 DatabentoSubscription::builder()
2054 .symbols(symbols.clone())
2055 .schema(Schema::Definition)
2056 .stype_in(SType::Parent)
2057 .start(OffsetDateTime::UNIX_EPOCH)
2058 .build(),
2059 )
2060 .await?;
2061 def_client.start().await?;
2062
2063 let mbp1_start =
2065 OffsetDateTime::now_utc() - TimeDuration::minutes(DATABENTO_MBP1_REPLAY_MINUTES);
2066 let mut mbp1_client = LiveClient::builder()
2067 .key(api_key)?
2068 .dataset(dataset.to_string())
2069 .build()
2070 .await?;
2071 mbp1_client
2072 .subscribe(
2073 DatabentoSubscription::builder()
2074 .symbols(symbols)
2075 .schema(Schema::Mbp1)
2076 .stype_in(SType::Parent)
2077 .start(mbp1_start)
2078 .build(),
2079 )
2080 .await?;
2081 mbp1_client.start().await?;
2082
2083 let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
2084
2085 let def_tx = tx.clone();
2090 let def_task = tokio::spawn(Self::run_definition_reader(def_client, def_tx));
2091
2092 let mbp1_tx = tx;
2094 let mbp1_task = tokio::spawn(Self::run_mbp1_reader(mbp1_client, mbp1_tx));
2095
2096 Ok(Self {
2097 rx,
2098 _def_task: def_task,
2099 _mbp1_task: mbp1_task,
2100 })
2101 }
2102
2103 async fn run_definition_reader(
2106 mut client: LiveClient,
2107 tx: tokio::sync::mpsc::UnboundedSender<FeedRecord>,
2108 ) {
2109 let idle = std::time::Duration::from_secs(DATABENTO_DEFINITION_DRAIN_IDLE_SECS);
2110 let poll = std::time::Duration::from_millis(500);
2111 let mut seen = 0u64;
2112 let mut sent = 0u64;
2113 let mut last_activity = std::time::Instant::now();
2114 loop {
2115 match tokio::time::timeout(poll, client.next_record()).await {
2116 Err(_) => {
2117 if seen > 0 && last_activity.elapsed() > idle {
2121 tracing::info!(
2122 defs_seen = seen,
2123 defs_sent = sent,
2124 "Databento definition snapshot drained"
2125 );
2126 break;
2127 }
2128 }
2129 Ok(Ok(Some(rec))) => {
2130 seen += 1;
2131 last_activity = std::time::Instant::now();
2132 if let Some(def) = rec.get::<InstrumentDefMsg>() {
2133 if let Some(kind) = Self::definition_to_kind(def) {
2134 sent += 1;
2135 if tx
2136 .send(FeedRecord::Definition {
2137 instrument_id: def.hd.instrument_id as InstrumentId,
2138 kind,
2139 })
2140 .is_err()
2141 {
2142 tracing::info!(
2143 "Databento def receiver dropped; stopping def reader"
2144 );
2145 return;
2146 }
2147 }
2148 }
2149 }
2150 Ok(Ok(None)) => {
2151 tracing::info!(defs_seen = seen, "Databento def stream ended");
2152 break;
2153 }
2154 Ok(Err(e)) => {
2155 tracing::error!(error = %e, "Databento def client error");
2156 let _ = tx.send(FeedRecord::Error(format!("databento def: {e}")));
2157 break;
2158 }
2159 }
2160 }
2161 }
2162
2163 async fn run_mbp1_reader(
2166 mut client: LiveClient,
2167 tx: tokio::sync::mpsc::UnboundedSender<FeedRecord>,
2168 ) {
2169 loop {
2170 match client.next_record().await {
2171 Ok(Some(rec)) => {
2172 if let Some(msg) = rec.get::<Mbp1Msg>() {
2173 let Some(bbo) = Self::mbp1_to_bbo(msg) else {
2174 continue;
2175 };
2176 if tx
2177 .send(FeedRecord::Mbp1 {
2178 instrument_id: msg.hd.instrument_id as InstrumentId,
2179 bbo,
2180 })
2181 .is_err()
2182 {
2183 tracing::info!("Databento mbp1 receiver dropped; stopping mbp1 reader");
2184 return;
2185 }
2186 }
2187 }
2188 Ok(None) => {
2189 tracing::warn!("Databento mbp1 stream ended");
2190 let _ = tx.send(FeedRecord::EndOfStream);
2191 return;
2192 }
2193 Err(e) => {
2194 tracing::error!(error = %e, "Databento mbp1 client error");
2195 let _ = tx.send(FeedRecord::Error(format!("databento mbp1: {e}")));
2196 return;
2197 }
2198 }
2199 }
2200 }
2201
2202 fn definition_to_kind(def: &InstrumentDefMsg) -> Option<InstrumentKind> {
2206 let class = def.instrument_class().ok()?;
2207 let asset = def.asset().ok()?.to_string();
2208
2209 match class {
2210 InstrumentClass::Future => {
2211 if asset != "GC" {
2212 return None;
2213 }
2214 Some(InstrumentKind::Future {
2218 root: asset,
2219 month_code: '?',
2220 year_digit: 0,
2221 })
2222 }
2223 InstrumentClass::Call | InstrumentClass::Put => {
2224 if asset != "OG" {
2231 return None;
2232 }
2233 let side = if matches!(class, InstrumentClass::Call) {
2234 CmeOptionSide::Call
2235 } else {
2236 CmeOptionSide::Put
2237 };
2238 let strike = def.strike_price as f64 / 1e9;
2239 if !strike.is_finite() || strike <= 0.0 {
2240 return None;
2241 }
2242 let expiry_ts = (def.expiration / 1_000_000_000) as i64;
2244 if expiry_ts <= 0 {
2245 return None;
2246 }
2247 Some(InstrumentKind::Option {
2248 root: "OG".to_string(),
2249 month_code: '?',
2250 year_digit: 0,
2251 side,
2252 strike,
2253 expiry_ts,
2254 })
2255 }
2256 _ => None,
2257 }
2258 }
2259
2260 fn mbp1_to_bbo(msg: &Mbp1Msg) -> Option<Bbo> {
2261 let level = msg.levels.first()?;
2262 let bid = level.bid_px as f64 / 1e9;
2263 let ask = level.ask_px as f64 / 1e9;
2264 if !bid.is_finite() || !ask.is_finite() {
2265 return None;
2266 }
2267 Some(Bbo {
2268 bid,
2269 bid_size: level.bid_sz as u64,
2270 ask,
2271 ask_size: level.ask_sz as u64,
2272 ts_event_ms: (msg.ts_recv / 1_000_000) as i64,
2273 })
2274 }
2275}
2276
2277#[async_trait]
2278impl DatabentoFeed for DatabentoLiveFeed {
2279 async fn next_record(&mut self) -> Option<FeedRecord> {
2280 self.rx.recv().await
2285 }
2286}
2287
/// Delay before the first reconnect attempt; later attempts double it.
const RECONNECT_BASE_DELAY: std::time::Duration = std::time::Duration::from_secs(5);
/// Upper bound on the reconnect delay (five minutes).
const RECONNECT_MAX_DELAY: std::time::Duration = std::time::Duration::from_secs(5 * 60);
2292
2293pub fn spawn_databento_live_feed_task(
2297 oracle: Arc<DatabentoVolOracle>,
2298 api_key: String,
2299 dataset: String,
2300) {
2301 tokio::spawn(async move {
2302 let mut attempt: u32 = 0;
2303 loop {
2304 match DatabentoLiveFeed::connect(&api_key, &dataset).await {
2305 Ok(feed) => {
2306 let connected_at = tokio::time::Instant::now();
2307 tracing::info!(
2308 dataset = %dataset,
2309 "Databento live feed connected — running oracle loop"
2310 );
2311 oracle.run(feed).await;
2312 if connected_at.elapsed() > std::time::Duration::from_secs(60) {
2313 attempt = 0;
2314 }
2315 let msg = "Databento live feed run loop exited — will reconnect";
2316 tracing::warn!("{msg}");
2317 oracle.record_external_error(msg.to_string());
2318 }
2319 Err(err) => {
2320 let msg = format!("Failed to connect Databento live feed ({dataset}): {err}");
2321 tracing::error!(error = %msg, attempt, "Databento connect failed — will retry");
2322 oracle.record_external_error(msg);
2323 }
2324 }
2325
2326 let delay = RECONNECT_BASE_DELAY.saturating_mul(2u32.saturating_pow(attempt.min(6)));
2327 let delay = delay.min(RECONNECT_MAX_DELAY);
2328 tracing::info!(
2329 dataset = %dataset,
2330 delay_secs = delay.as_secs(),
2331 attempt,
2332 "Databento feed reconnecting after backoff"
2333 );
2334 tokio::time::sleep(delay).await;
2335 attempt = attempt.saturating_add(1);
2336 }
2337 });
2338}
2339
#[cfg(test)]
mod symbology_tests {
    use super::*;

    /// Asserts that every input is rejected by the future-symbol parser.
    fn assert_futures_rejected(inputs: &[&str]) {
        for input in inputs {
            assert!(CmeFutureSymbol::parse(input).is_err());
        }
    }

    /// Asserts that every input is rejected by the option-symbol parser.
    fn assert_options_rejected(inputs: &[&str]) {
        for input in inputs {
            assert!(CmeOptionSymbol::parse(input).is_err());
        }
    }

    #[test]
    fn month_codes_map_to_numbers() {
        // CME month letters in calendar order, January through December.
        const CODES_IN_ORDER: [char; 12] =
            ['F', 'G', 'H', 'J', 'K', 'M', 'N', 'Q', 'U', 'V', 'X', 'Z'];
        for (month, code) in (1u32..).zip(CODES_IN_ORDER) {
            assert_eq!(cme_month_code_to_number(code), Some(month), "{code}");
        }
        for not_a_code in ['A', 'I'] {
            assert_eq!(cme_month_code_to_number(not_a_code), None);
        }
    }

    #[test]
    fn parses_gcm6_as_gold_june_2026_future() {
        let expected = CmeFutureSymbol {
            root: "GC".to_string(),
            month_code: 'M',
            year_digit: 6,
        };
        let parsed = CmeFutureSymbol::parse("GCM6").expect("GCM6 should parse");
        assert_eq!(parsed, expected);
    }

    #[test]
    fn parses_ogm6_c_2400_as_june_2026_gold_call_at_2400() {
        let expected = CmeOptionSymbol {
            root: "OG".to_string(),
            month_code: 'M',
            year_digit: 6,
            side: CmeOptionSide::Call,
            strike: 2400.0,
        };
        let parsed = CmeOptionSymbol::parse("OGM6 C 2400").expect("OGM6 C 2400 should parse");
        assert_eq!(parsed, expected);
    }

    #[test]
    fn parses_put_side() {
        let CmeOptionSymbol { side, strike, .. } =
            CmeOptionSymbol::parse("OGM6 P 4800").expect("put should parse");
        assert_eq!(side, CmeOptionSide::Put);
        assert_eq!(strike, 4800.0);
    }

    #[test]
    fn parses_fractional_strike() {
        let parsed = CmeOptionSymbol::parse("OGM6 C 4700.5").expect("fractional should parse");
        assert_eq!(parsed.strike, 4700.5);
    }

    #[test]
    fn parses_future_with_extra_whitespace() {
        let padded = CmeFutureSymbol::parse(" GCM6 ").unwrap();
        assert_eq!(padded.root, "GC".to_string());
    }

    #[test]
    fn rejects_wrong_length_future() {
        assert_futures_rejected(&["", "GC", "GCM62"]);
    }

    #[test]
    fn rejects_lowercase_root() {
        assert_futures_rejected(&["gcM6"]);
        assert_options_rejected(&["ogM6 C 2400"]);
    }

    #[test]
    fn rejects_invalid_month_code() {
        assert_futures_rejected(&["GCI6"]);
        assert_options_rejected(&["OGI6 C 2400"]);
    }

    #[test]
    fn rejects_non_digit_year() {
        assert_futures_rejected(&["GCMX"]);
    }

    #[test]
    fn rejects_non_cp_side() {
        assert_options_rejected(&["OGM6 X 2400"]);
    }

    #[test]
    fn rejects_non_positive_strike() {
        assert_options_rejected(&["OGM6 C 0", "OGM6 C -100", "OGM6 C not_a_number"]);
    }

    #[test]
    fn rejects_option_missing_parts() {
        assert_options_rejected(&["OGM6 C", "OGM6", ""]);
    }
}