1use std::collections::{HashMap, VecDeque};
2use std::sync::Arc;
3use std::time::Duration;
4
5use chrono::Utc;
6use metrics::counter;
7use rust_decimal::prelude::FromPrimitive;
8use rust_decimal::Decimal;
9use tokio::time::{interval_at, Instant, MissedTickBehavior};
10use tracing::{error, info, warn};
11
12use crate::read_cache::instruments_registry::InstrumentsCache;
13use hypercall_db::{AnalyticsWriter, BboReferenceData, NewBboSnapshotInput};
14use hypercall_runtime_api::{QuoteProvider, SnapshotBookQuote};
15
/// Number of seconds in one day; used to convert `retention_days` into seconds.
const SECONDS_PER_DAY: i64 = 86_400;

/// Age window in seconds (25 hours) for the in-memory snapshot cache. It bounds both the DB
/// bootstrap query and the periodic in-memory prune.
const CACHE_RETENTION_SECS: i64 = 25 * 3600;

/// Settings for the periodic BBO snapshot task.
#[derive(Debug, Clone)]
pub struct BboSnapshotTaskConfig {
    /// Seconds between captures; also the bucket width that snapshot timestamps are aligned to.
    pub interval_secs: u64,
    /// Days of persisted snapshots to keep before the retention cleanup deletes them.
    pub retention_days: i64,
}
27
28impl Default for BboSnapshotTaskConfig {
29 fn default() -> Self {
30 Self {
31 interval_secs: 300,
32 retention_days: 7,
33 }
34 }
35}
36
37impl BboSnapshotTaskConfig {
38 pub fn from_config(config: &crate::backend_config::BboSnapshotRuntimeConfig) -> Self {
39 let interval_secs = config.interval_secs;
40 let retention_days = config.retention_days;
41 assert!(
42 interval_secs > 0,
43 "background_tasks.bbo_snapshot.interval_secs must be > 0"
44 );
45 assert!(
46 retention_days > 0,
47 "background_tasks.bbo_snapshot.retention_days must be > 0"
48 );
49
50 Self {
51 interval_secs,
52 retention_days,
53 }
54 }
55
56 fn retention_secs(&self) -> i64 {
57 self.retention_days
58 .checked_mul(SECONDS_PER_DAY)
59 .expect("BBO snapshot retention days overflowed seconds conversion")
60 }
61}
62
/// Background service that periodically captures best-bid/offer snapshots, persists them, and
/// answers reference-ask lookups from an in-memory cache.
///
/// Clones share the same cache, quote provider and DB handle because those fields are `Arc`s.
#[derive(Clone)]
pub struct BboSnapshotService {
    /// Source of the instruments whose active entries are snapshotted on every tick.
    instruments_cache: Arc<InstrumentsCache>,
    /// Supplies the current book quote for a symbol.
    quote_provider: Arc<dyn QuoteProvider>,
    /// Persistence used to load, upsert and delete snapshot rows.
    db: Arc<dyn AnalyticsWriter>,
    /// Capture interval and retention settings.
    config: BboSnapshotTaskConfig,
    /// Per-symbol snapshot series. Lookups use binary search and pruning pops from the front,
    /// so each series is expected to be ordered by ascending `snapshot_ts`.
    in_memory_snapshots: Arc<tokio::sync::RwLock<HashMap<String, VecDeque<CachedBboSnapshot>>>>,
}
71
/// One cached point of a symbol's best-ask history.
#[derive(Debug, Clone)]
struct CachedBboSnapshot {
    /// Snapshot timestamp in seconds; the sort key of the per-symbol series.
    snapshot_ts: i64,
    /// Best ask price recorded for that timestamp.
    best_ask: Decimal,
}
77
78impl BboSnapshotService {
79 pub fn new(
80 instruments_cache: Arc<InstrumentsCache>,
81 quote_provider: Arc<dyn QuoteProvider>,
82 db: Arc<dyn AnalyticsWriter>,
83 config: BboSnapshotTaskConfig,
84 ) -> Self {
85 Self {
86 instruments_cache,
87 quote_provider,
88 db,
89 config,
90 in_memory_snapshots: Arc::new(tokio::sync::RwLock::new(HashMap::new())),
91 }
92 }
93
94 pub async fn bootstrap_from_db(&self) {
97 let cutoff_ts = Utc::now().timestamp() - CACHE_RETENTION_SECS;
98 match self.db.load_bbo_snapshots_since(cutoff_ts).await {
99 Ok(rows) => {
100 let mut cache = self.in_memory_snapshots.write().await;
101 for row in &rows {
102 let series = cache
103 .entry(row.symbol.clone())
104 .or_insert_with(VecDeque::new);
105 series.push_back(CachedBboSnapshot {
106 snapshot_ts: row.snapshot_ts,
107 best_ask: row.best_ask,
108 });
109 }
110 info!(
111 symbols = cache.len(),
112 rows = rows.len(),
113 "BBO snapshot cache bootstrapped from DB"
114 );
115 }
116 Err(e) => {
117 warn!(
118 error = %e,
119 "Failed to bootstrap BBO cache from DB; price_change will be null until snapshots accumulate"
120 );
121 }
122 }
123 }
124
125 pub async fn seed_snapshots(&self, snapshots: &[NewBboSnapshotInput]) {
128 let mut cache = self.in_memory_snapshots.write().await;
129 for snapshot in snapshots {
130 let series = cache
131 .entry(snapshot.symbol.clone())
132 .or_insert_with(VecDeque::new);
133
134 let pos = series
136 .binary_search_by_key(&snapshot.snapshot_ts, |s| s.snapshot_ts)
137 .unwrap_or_else(|pos| pos);
138
139 if pos < series.len() && series[pos].snapshot_ts == snapshot.snapshot_ts {
140 series[pos].best_ask = snapshot.best_ask;
142 } else {
143 series.insert(
144 pos,
145 CachedBboSnapshot {
146 snapshot_ts: snapshot.snapshot_ts,
147 best_ask: snapshot.best_ask,
148 },
149 );
150 }
151 }
152 }
153
154 pub async fn get_reference_asks(
155 &self,
156 symbols: &[String],
157 cutoff_ts: i64,
158 ) -> HashMap<String, BboReferenceData> {
159 if symbols.is_empty() {
160 return HashMap::new();
161 }
162
163 let snapshots = self.in_memory_snapshots.read().await;
164 build_reference_asks_from_cache(&snapshots, symbols, cutoff_ts)
165 }
166
167 pub fn start_with_shutdown(
168 self: Arc<Self>,
169 mut shutdown_rx: tokio::sync::broadcast::Receiver<()>,
170 ) -> tokio::task::JoinHandle<()> {
171 tokio::spawn(async move {
172 info!(
173 "Starting BBO snapshot service (interval_secs={}, retention_days={})",
174 self.config.interval_secs, self.config.retention_days
175 );
176
177 self.bootstrap_from_db().await;
178
179 let mut ticker = aligned_interval_secs(self.config.interval_secs);
180 let mut consecutive_write_failures = 0_u64;
181 let mut consecutive_cleanup_failures = 0_u64;
182
183 loop {
184 tokio::select! {
185 _ = shutdown_rx.recv() => {
186 info!("BBO snapshot service received shutdown signal");
187 break;
188 }
189 _ = ticker.tick() => {
190 let now_secs = Utc::now().timestamp();
191 self.capture_and_persist(now_secs, &mut consecutive_write_failures).await;
192 self.cleanup_retention(now_secs, &mut consecutive_cleanup_failures).await;
193 }
194 }
195 }
196
197 info!("BBO snapshot service stopped");
198 })
199 }
200
201 async fn capture_and_persist(&self, now_secs: i64, consecutive_failures: &mut u64) {
202 let snapshot_ts = align_timestamp_secs(now_secs, self.config.interval_secs);
203 let active_symbols: Vec<String> = self
204 .instruments_cache
205 .get_all()
206 .await
207 .into_iter()
208 .filter(|inst| inst.status.is_active())
209 .map(|inst| inst.id)
210 .collect();
211
212 counter!("ht_bbo_snapshot_rows_total", "status" => "attempted")
213 .increment(active_symbols.len() as u64);
214
215 let snapshots = build_snapshot_rows(&active_symbols, self.quote_provider.as_ref());
216 let skipped = active_symbols.len().saturating_sub(snapshots.len());
217 counter!("ht_bbo_snapshot_rows_total", "status" => "skipped_invalid")
218 .increment(skipped as u64);
219
220 if snapshots.is_empty() {
221 return;
222 }
223
224 match self.db.upsert_bbo_snapshots(snapshot_ts, &snapshots).await {
227 Ok(written_rows) => {
228 *consecutive_failures = 0;
229 counter!("ht_bbo_snapshot_rows_total", "status" => "written")
230 .increment(written_rows as u64);
231
232 self.update_in_memory_snapshots(snapshot_ts, &snapshots)
233 .await;
234 }
235 Err(e) => {
236 *consecutive_failures += 1;
237 counter!("ht_bbo_snapshot_rows_total", "status" => "write_error").increment(1);
238 if *consecutive_failures <= 3 || (*consecutive_failures).is_multiple_of(10) {
239 error!(
240 failures = *consecutive_failures,
241 snapshot_ts,
242 error = %e,
243 "Failed to persist BBO snapshot batch"
244 );
245 }
246 }
247 }
248 }
249
250 async fn cleanup_retention(&self, now_secs: i64, consecutive_failures: &mut u64) {
251 let cutoff_ts = now_secs - self.config.retention_secs();
252 let cache_cutoff = now_secs - CACHE_RETENTION_SECS;
254 self.prune_in_memory_snapshots(cache_cutoff).await;
255 match self.db.delete_bbo_snapshots_older_than(cutoff_ts).await {
256 Ok(deleted_rows) => {
257 *consecutive_failures = 0;
258 counter!("ht_bbo_snapshot_retention_rows_deleted_total")
259 .increment(deleted_rows as u64);
260 }
261 Err(e) => {
262 *consecutive_failures += 1;
263 counter!("ht_bbo_snapshot_retention_cleanup_total", "status" => "error")
264 .increment(1);
265 if *consecutive_failures <= 3 || (*consecutive_failures).is_multiple_of(10) {
266 error!(
267 failures = *consecutive_failures,
268 cutoff_ts,
269 error = %e,
270 "Failed to cleanup BBO snapshots retention"
271 );
272 }
273 }
274 }
275 }
276
277 async fn update_in_memory_snapshots(
278 &self,
279 snapshot_ts: i64,
280 snapshots: &[NewBboSnapshotInput],
281 ) {
282 let mut cache = self.in_memory_snapshots.write().await;
283 for snapshot in snapshots {
284 let series = cache
285 .entry(snapshot.symbol.clone())
286 .or_insert_with(VecDeque::new);
287
288 match series.back() {
289 Some(last) if last.snapshot_ts == snapshot_ts => {
290 series.back_mut().unwrap().best_ask = snapshot.best_ask;
292 }
293 Some(last) if last.snapshot_ts > snapshot_ts => {
294 let pos = series
296 .binary_search_by_key(&snapshot_ts, |s| s.snapshot_ts)
297 .unwrap_or_else(|pos| pos);
298 series.insert(
299 pos,
300 CachedBboSnapshot {
301 snapshot_ts,
302 best_ask: snapshot.best_ask,
303 },
304 );
305 }
306 _ => {
307 series.push_back(CachedBboSnapshot {
308 snapshot_ts,
309 best_ask: snapshot.best_ask,
310 });
311 }
312 }
313 }
314 }
315
316 async fn prune_in_memory_snapshots(&self, cutoff_ts: i64) {
317 let mut cache = self.in_memory_snapshots.write().await;
318 cache.retain(|_, series| {
319 while series
320 .front()
321 .is_some_and(|snapshot| snapshot.snapshot_ts < cutoff_ts)
322 {
323 let _ = series.pop_front();
324 }
325 !series.is_empty()
326 });
327 }
328}
329
330fn build_reference_asks_from_cache(
331 snapshots: &HashMap<String, VecDeque<CachedBboSnapshot>>,
332 symbols: &[String],
333 cutoff_ts: i64,
334) -> HashMap<String, BboReferenceData> {
335 let mut references = HashMap::new();
336
337 for symbol in symbols {
338 if let Some(series) = snapshots.get(symbol) {
339 if series.is_empty() {
340 continue;
341 }
342
343 let partition = series.partition_point(|entry| entry.snapshot_ts <= cutoff_ts);
347 let latest_before = if partition > 0 {
348 Some(&series[partition - 1])
349 } else {
350 None
351 };
352 if let Some(row) = latest_before {
357 references.insert(
358 symbol.clone(),
359 BboReferenceData {
360 reference_ask: row.best_ask,
361 reference_ts: row.snapshot_ts,
362 used_earliest_fallback: false,
363 },
364 );
365 }
366 }
367 }
368
369 references
370}
371
372fn build_snapshot_rows(
373 symbols: &[String],
374 quote_provider: &dyn QuoteProvider,
375) -> Vec<NewBboSnapshotInput> {
376 symbols
377 .iter()
378 .filter_map(|symbol| {
379 quote_provider
380 .get_quote(symbol)
381 .and_then(|quote| snapshot_row_from_quote(symbol, "e))
382 })
383 .collect()
384}
385
386fn snapshot_row_from_quote(symbol: &str, quote: &SnapshotBookQuote) -> Option<NewBboSnapshotInput> {
387 let best_bid = quote.best_bid?;
388 let best_ask = quote.best_ask?;
389 if best_bid <= 0.0 || best_ask <= 0.0 {
390 return None;
391 }
392
393 let best_bid_dec = Decimal::from_f64(best_bid)?;
394 let best_ask_dec = Decimal::from_f64(best_ask)?;
395
396 let best_bid_size = quote
397 .best_bid_size
398 .filter(|size| *size > 0.0)
399 .and_then(Decimal::from_f64);
400 let best_ask_size = quote
401 .best_ask_size
402 .filter(|size| *size > 0.0)
403 .and_then(Decimal::from_f64);
404
405 Some(NewBboSnapshotInput {
406 symbol: symbol.to_string(),
407 best_bid: best_bid_dec,
408 best_ask: best_ask_dec,
409 best_bid_size,
410 best_ask_size,
411 snapshot_ts: 0,
412 })
413}
414
415fn aligned_interval_secs(interval_secs: u64) -> tokio::time::Interval {
416 let now_secs = Utc::now().timestamp();
417 let next_secs = next_bucket_start_secs(now_secs, interval_secs);
418 let delay_secs = (next_secs - now_secs).max(0) as u64;
419
420 let start_at = Instant::now() + Duration::from_secs(delay_secs);
421 let mut ticker = interval_at(start_at, Duration::from_secs(interval_secs));
422 ticker.set_missed_tick_behavior(MissedTickBehavior::Skip);
423 ticker
424}
425
/// Rounds `timestamp_secs` down to the start of its `interval_secs`-wide bucket.
///
/// The result is always a multiple of the interval and satisfies
/// `result <= timestamp_secs < result + interval_secs`. This also holds for negative
/// timestamps, because the Euclidean remainder is used instead of `%`, which truncates toward
/// zero.
///
/// # Panics
///
/// Panics if `interval_secs` is zero or does not fit in an `i64`.
fn align_timestamp_secs(timestamp_secs: i64, interval_secs: u64) -> i64 {
    let interval = i64::try_from(interval_secs).expect("interval_secs exceeded i64::MAX");
    timestamp_secs - timestamp_secs.rem_euclid(interval)
}
430
431fn next_bucket_start_secs(timestamp_secs: i64, interval_secs: u64) -> i64 {
432 align_timestamp_secs(timestamp_secs, interval_secs)
433 + i64::try_from(interval_secs).expect("interval_secs exceeded i64::MAX")
434}
435
/// Exposes the in-memory reference-ask lookup through the options-summary reader trait.
#[async_trait::async_trait]
impl hypercall_api::caches::options_summary::BboReferenceAskReader for BboSnapshotService {
    /// Returns the reference ask per symbol as of `cutoff_ts` by delegating to the inherent
    /// `BboSnapshotService::get_reference_asks`.
    async fn get_reference_asks(
        &self,
        symbols: &[String],
        cutoff_ts: i64,
    ) -> HashMap<String, BboReferenceData> {
        // Method-call syntax prefers the inherent method over this trait method of the same
        // name, so this is a delegation, not a recursive call.
        self.get_reference_asks(symbols, cutoff_ts).await
    }
}
446
447#[async_trait::async_trait]
448impl crate::shared::service::Service for BboSnapshotService {
449 fn name(&self) -> &'static str {
450 "BboSnapshotService"
451 }
452
453 fn owner(&self) -> crate::shared::service::ServiceOwner {
454 crate::shared::service::ServiceOwner::Api
455 }
456
457 async fn run(
458 self: std::sync::Arc<Self>,
459 shutdown: crate::shared::ShutdownRx,
460 ) -> anyhow::Result<()> {
461 self.start_with_shutdown(shutdown)
462 .await
463 .map_err(|e| anyhow::anyhow!("BboSnapshotService task failed: {:?}", e))
464 }
465}
466
467#[cfg(test)]
468mod tests {
469 use super::*;
470 use crate::rsm::engine_snapshot::MockQuoteProvider;
471 use hypercall_runtime_api::SnapshotBookQuote;
472 use std::collections::VecDeque;
473
474 #[test]
475 fn test_align_timestamp_secs() {
476 let ts = 1_700_000_123_i64;
477 let aligned = align_timestamp_secs(ts, 300);
478 assert_eq!(aligned % 300, 0);
479 assert!(aligned <= ts);
480 assert!((ts - aligned) < 300);
481 }
482
483 #[test]
484 fn test_build_snapshot_rows_filters_invalid_quotes() {
485 let quote_provider = MockQuoteProvider::new();
486 quote_provider.set_quote(
487 "BTC-VALID",
488 SnapshotBookQuote {
489 best_bid: Some(100.0),
490 best_bid_size: Some(1.0),
491 best_ask: Some(101.0),
492 best_ask_size: Some(2.0),
493 mid: Some(100.5),
494 bids: vec![(100.0, 1.0)],
495 asks: vec![(101.0, 2.0)],
496 },
497 );
498 quote_provider.set_quote(
499 "BTC-NO-BID",
500 SnapshotBookQuote {
501 best_bid: None,
502 best_bid_size: None,
503 best_ask: Some(101.0),
504 best_ask_size: Some(2.0),
505 mid: Some(101.0),
506 bids: vec![],
507 asks: vec![(101.0, 2.0)],
508 },
509 );
510 quote_provider.set_quote(
511 "BTC-NON-POSITIVE",
512 SnapshotBookQuote {
513 best_bid: Some(0.0),
514 best_bid_size: Some(1.0),
515 best_ask: Some(101.0),
516 best_ask_size: Some(2.0),
517 mid: Some(50.5),
518 bids: vec![(0.0, 1.0)],
519 asks: vec![(101.0, 2.0)],
520 },
521 );
522
523 let symbols = vec![
524 "BTC-VALID".to_string(),
525 "BTC-NO-BID".to_string(),
526 "BTC-NON-POSITIVE".to_string(),
527 "BTC-MISSING".to_string(),
528 ];
529
530 let rows = build_snapshot_rows(&symbols, "e_provider);
531 assert_eq!(rows.len(), 1);
532 assert_eq!(rows[0].symbol, "BTC-VALID");
533 assert_eq!(rows[0].best_bid.to_string(), "100");
534 assert_eq!(rows[0].best_ask.to_string(), "101");
535 }
536
537 #[test]
538 fn test_retention_secs_default() {
539 let cfg = BboSnapshotTaskConfig::default();
540 assert_eq!(cfg.retention_secs(), 7 * SECONDS_PER_DAY);
541 }
542
543 #[tokio::test]
544 async fn test_get_reference_asks_uses_latest_before_cutoff_skips_too_new() {
545 let mut snapshots = HashMap::new();
546 snapshots.insert(
547 "BTC-1".to_string(),
548 VecDeque::from(vec![
549 CachedBboSnapshot {
550 snapshot_ts: 100,
551 best_ask: Decimal::from(10),
552 },
553 CachedBboSnapshot {
554 snapshot_ts: 200,
555 best_ask: Decimal::from(20),
556 },
557 CachedBboSnapshot {
558 snapshot_ts: 300,
559 best_ask: Decimal::from(30),
560 },
561 ]),
562 );
563 snapshots.insert(
566 "ETH-1".to_string(),
567 VecDeque::from(vec![CachedBboSnapshot {
568 snapshot_ts: 500,
569 best_ask: Decimal::from(50),
570 }]),
571 );
572
573 let refs = build_reference_asks_from_cache(
574 &snapshots,
575 &["BTC-1".to_string(), "ETH-1".to_string()],
576 250,
577 );
578
579 let btc = refs.get("BTC-1").expect("missing BTC reference");
580 assert_eq!(btc.reference_ts, 200);
581 assert_eq!(btc.reference_ask, Decimal::from(20));
582 assert!(!btc.used_earliest_fallback);
583
584 assert!(
586 !refs.contains_key("ETH-1"),
587 "instruments with no pre-cutoff snapshot should not get a reference"
588 );
589 }
590
591 #[tokio::test]
592 async fn test_exact_cutoff_boundary_returns_that_entry() {
593 let mut snapshots = HashMap::new();
594 snapshots.insert(
595 "SYM".to_string(),
596 VecDeque::from(vec![
597 CachedBboSnapshot {
598 snapshot_ts: 100,
599 best_ask: Decimal::from(10),
600 },
601 CachedBboSnapshot {
602 snapshot_ts: 200,
603 best_ask: Decimal::from(20),
604 },
605 ]),
606 );
607
608 let refs = build_reference_asks_from_cache(&snapshots, &["SYM".to_string()], 200);
610 let entry = refs
611 .get("SYM")
612 .expect("should find entry at exact boundary");
613 assert_eq!(entry.reference_ts, 200);
614 assert_eq!(entry.reference_ask, Decimal::from(20));
615 assert!(!entry.used_earliest_fallback);
616 }
617
618 #[tokio::test]
619 async fn test_duplicate_timestamp_upsert() {
620 let mut snapshots = HashMap::new();
621 snapshots.insert(
622 "SYM".to_string(),
623 VecDeque::from(vec![CachedBboSnapshot {
624 snapshot_ts: 100,
625 best_ask: Decimal::from(10),
626 }]),
627 );
628
629 let service_snapshots = Arc::new(tokio::sync::RwLock::new(snapshots));
631
632 {
634 let mut cache = service_snapshots.write().await;
635 let series = cache.get_mut("SYM").unwrap();
636 assert_eq!(series.back().unwrap().snapshot_ts, 100);
638 series.back_mut().unwrap().best_ask = Decimal::from(99);
639 assert_eq!(series.len(), 1);
640 assert_eq!(series.back().unwrap().best_ask, Decimal::from(99));
641 }
642 }
643
644 #[tokio::test]
645 async fn test_prune_boundary() {
646 let mut snapshots = HashMap::new();
647 snapshots.insert(
648 "SYM".to_string(),
649 VecDeque::from(vec![
650 CachedBboSnapshot {
651 snapshot_ts: 100,
652 best_ask: Decimal::from(10),
653 },
654 CachedBboSnapshot {
655 snapshot_ts: 200,
656 best_ask: Decimal::from(20),
657 },
658 CachedBboSnapshot {
659 snapshot_ts: 300,
660 best_ask: Decimal::from(30),
661 },
662 ]),
663 );
664
665 let cache = Arc::new(tokio::sync::RwLock::new(snapshots));
666
667 {
669 let mut c = cache.write().await;
670 c.retain(|_, series| {
671 while series.front().is_some_and(|s| s.snapshot_ts < 200) {
672 let _ = series.pop_front();
673 }
674 !series.is_empty()
675 });
676 }
677
678 let c = cache.read().await;
679 let series = c.get("SYM").unwrap();
680 assert_eq!(series.len(), 2);
681 assert_eq!(series[0].snapshot_ts, 200); assert_eq!(series[1].snapshot_ts, 300);
683 }
684
685 #[tokio::test]
686 async fn test_seed_snapshots_inserts_in_sorted_order() {
687 let _instruments_cache = Arc::new(InstrumentsCache::new());
688 let _quote_provider: Arc<dyn QuoteProvider> = Arc::new(MockQuoteProvider::new());
689 let cache: Arc<tokio::sync::RwLock<HashMap<String, VecDeque<CachedBboSnapshot>>>> =
693 Arc::new(tokio::sync::RwLock::new(HashMap::new()));
694
695 let snapshots = vec![
697 NewBboSnapshotInput {
698 symbol: "A".to_string(),
699 best_bid: Decimal::from(1),
700 best_ask: Decimal::from(30),
701 best_bid_size: None,
702 best_ask_size: None,
703 snapshot_ts: 300,
704 },
705 NewBboSnapshotInput {
706 symbol: "A".to_string(),
707 best_bid: Decimal::from(1),
708 best_ask: Decimal::from(10),
709 best_bid_size: None,
710 best_ask_size: None,
711 snapshot_ts: 100,
712 },
713 NewBboSnapshotInput {
714 symbol: "A".to_string(),
715 best_bid: Decimal::from(1),
716 best_ask: Decimal::from(20),
717 best_bid_size: None,
718 best_ask_size: None,
719 snapshot_ts: 200,
720 },
721 ];
722
723 {
724 let mut c = cache.write().await;
725 for snapshot in &snapshots {
726 let series = c
727 .entry(snapshot.symbol.clone())
728 .or_insert_with(VecDeque::new);
729 let pos = series
730 .binary_search_by_key(&snapshot.snapshot_ts, |s| s.snapshot_ts)
731 .unwrap_or_else(|pos| pos);
732 if pos < series.len() && series[pos].snapshot_ts == snapshot.snapshot_ts {
733 series[pos].best_ask = snapshot.best_ask;
734 } else {
735 series.insert(
736 pos,
737 CachedBboSnapshot {
738 snapshot_ts: snapshot.snapshot_ts,
739 best_ask: snapshot.best_ask,
740 },
741 );
742 }
743 }
744 }
745
746 let c = cache.read().await;
747 let series = c.get("A").unwrap();
748 assert_eq!(series.len(), 3);
749 assert_eq!(series[0].snapshot_ts, 100);
750 assert_eq!(series[1].snapshot_ts, 200);
751 assert_eq!(series[2].snapshot_ts, 300);
752 }
753}