Skip to main content

hypercall/runtime/tasks/
bbo_snapshot.rs

1use std::collections::{HashMap, VecDeque};
2use std::sync::Arc;
3use std::time::Duration;
4
5use chrono::Utc;
6use metrics::counter;
7use rust_decimal::prelude::FromPrimitive;
8use rust_decimal::Decimal;
9use tokio::time::{interval_at, Instant, MissedTickBehavior};
10use tracing::{error, info, warn};
11
12use crate::read_cache::instruments_registry::InstrumentsCache;
13use hypercall_db::{AnalyticsWriter, BboReferenceData, NewBboSnapshotInput};
14use hypercall_runtime_api::{QuoteProvider, SnapshotBookQuote};
15
/// Number of seconds in one day (24 h); used to convert the day-based
/// retention setting into seconds.
const SECONDS_PER_DAY: i64 = 24 * 60 * 60;
17
/// Retention window, in seconds, of the in-memory snapshot cache (25 hours).
///
/// The reference-ask lookup only needs ~24h of history; the extra hour is a
/// small buffer so that edge-of-window queries still resolve correctly.
const CACHE_RETENTION_SECS: i64 = 25 * 60 * 60;
21
/// Tunable settings for the periodic BBO snapshot task.
#[derive(Debug, Clone)]
pub struct BboSnapshotTaskConfig {
    /// Seconds between two snapshot captures; also the bucket width used to
    /// align snapshot timestamps.
    pub interval_secs: u64,
    /// Number of days persisted snapshots are kept before the retention
    /// cleanup deletes them.
    pub retention_days: i64,
}

impl Default for BboSnapshotTaskConfig {
    /// Defaults to one snapshot every 5 minutes, retained for 7 days.
    fn default() -> Self {
        let interval_secs = 5 * 60;
        let retention_days = 7;
        Self {
            interval_secs,
            retention_days,
        }
    }
}
36
37impl BboSnapshotTaskConfig {
38    pub fn from_config(config: &crate::backend_config::BboSnapshotRuntimeConfig) -> Self {
39        let interval_secs = config.interval_secs;
40        let retention_days = config.retention_days;
41        assert!(
42            interval_secs > 0,
43            "background_tasks.bbo_snapshot.interval_secs must be > 0"
44        );
45        assert!(
46            retention_days > 0,
47            "background_tasks.bbo_snapshot.retention_days must be > 0"
48        );
49
50        Self {
51            interval_secs,
52            retention_days,
53        }
54    }
55
56    fn retention_secs(&self) -> i64 {
57        self.retention_days
58            .checked_mul(SECONDS_PER_DAY)
59            .expect("BBO snapshot retention days overflowed seconds conversion")
60    }
61}
62
63#[derive(Clone)]
64pub struct BboSnapshotService {
65    instruments_cache: Arc<InstrumentsCache>,
66    quote_provider: Arc<dyn QuoteProvider>,
67    db: Arc<dyn AnalyticsWriter>,
68    config: BboSnapshotTaskConfig,
69    in_memory_snapshots: Arc<tokio::sync::RwLock<HashMap<String, VecDeque<CachedBboSnapshot>>>>,
70}
71
72#[derive(Debug, Clone)]
73struct CachedBboSnapshot {
74    snapshot_ts: i64,
75    best_ask: Decimal,
76}
77
78impl BboSnapshotService {
79    pub fn new(
80        instruments_cache: Arc<InstrumentsCache>,
81        quote_provider: Arc<dyn QuoteProvider>,
82        db: Arc<dyn AnalyticsWriter>,
83        config: BboSnapshotTaskConfig,
84    ) -> Self {
85        Self {
86            instruments_cache,
87            quote_provider,
88            db,
89            config,
90            in_memory_snapshots: Arc::new(tokio::sync::RwLock::new(HashMap::new())),
91        }
92    }
93
94    /// Bootstrap the in-memory cache from DB so that `price_change` is
95    /// available immediately after restart (no 24h gap).
96    pub async fn bootstrap_from_db(&self) {
97        let cutoff_ts = Utc::now().timestamp() - CACHE_RETENTION_SECS;
98        match self.db.load_bbo_snapshots_since(cutoff_ts).await {
99            Ok(rows) => {
100                let mut cache = self.in_memory_snapshots.write().await;
101                for row in &rows {
102                    let series = cache
103                        .entry(row.symbol.clone())
104                        .or_insert_with(VecDeque::new);
105                    series.push_back(CachedBboSnapshot {
106                        snapshot_ts: row.snapshot_ts,
107                        best_ask: row.best_ask,
108                    });
109                }
110                info!(
111                    symbols = cache.len(),
112                    rows = rows.len(),
113                    "BBO snapshot cache bootstrapped from DB"
114                );
115            }
116            Err(e) => {
117                warn!(
118                    error = %e,
119                    "Failed to bootstrap BBO cache from DB; price_change will be null until snapshots accumulate"
120                );
121            }
122        }
123    }
124
125    /// Directly seed the in-memory cache. Used by integration tests that insert
126    /// BBO rows via `upsert_bbo_snapshots` and need the handler to see them.
127    pub async fn seed_snapshots(&self, snapshots: &[NewBboSnapshotInput]) {
128        let mut cache = self.in_memory_snapshots.write().await;
129        for snapshot in snapshots {
130            let series = cache
131                .entry(snapshot.symbol.clone())
132                .or_insert_with(VecDeque::new);
133
134            // Insert in sorted order using binary search
135            let pos = series
136                .binary_search_by_key(&snapshot.snapshot_ts, |s| s.snapshot_ts)
137                .unwrap_or_else(|pos| pos);
138
139            if pos < series.len() && series[pos].snapshot_ts == snapshot.snapshot_ts {
140                // Duplicate timestamp -- update in place
141                series[pos].best_ask = snapshot.best_ask;
142            } else {
143                series.insert(
144                    pos,
145                    CachedBboSnapshot {
146                        snapshot_ts: snapshot.snapshot_ts,
147                        best_ask: snapshot.best_ask,
148                    },
149                );
150            }
151        }
152    }
153
154    pub async fn get_reference_asks(
155        &self,
156        symbols: &[String],
157        cutoff_ts: i64,
158    ) -> HashMap<String, BboReferenceData> {
159        if symbols.is_empty() {
160            return HashMap::new();
161        }
162
163        let snapshots = self.in_memory_snapshots.read().await;
164        build_reference_asks_from_cache(&snapshots, symbols, cutoff_ts)
165    }
166
167    pub fn start_with_shutdown(
168        self: Arc<Self>,
169        mut shutdown_rx: tokio::sync::broadcast::Receiver<()>,
170    ) -> tokio::task::JoinHandle<()> {
171        tokio::spawn(async move {
172            info!(
173                "Starting BBO snapshot service (interval_secs={}, retention_days={})",
174                self.config.interval_secs, self.config.retention_days
175            );
176
177            self.bootstrap_from_db().await;
178
179            let mut ticker = aligned_interval_secs(self.config.interval_secs);
180            let mut consecutive_write_failures = 0_u64;
181            let mut consecutive_cleanup_failures = 0_u64;
182
183            loop {
184                tokio::select! {
185                    _ = shutdown_rx.recv() => {
186                        info!("BBO snapshot service received shutdown signal");
187                        break;
188                    }
189                    _ = ticker.tick() => {
190                        let now_secs = Utc::now().timestamp();
191                        self.capture_and_persist(now_secs, &mut consecutive_write_failures).await;
192                        self.cleanup_retention(now_secs, &mut consecutive_cleanup_failures).await;
193                    }
194                }
195            }
196
197            info!("BBO snapshot service stopped");
198        })
199    }
200
201    async fn capture_and_persist(&self, now_secs: i64, consecutive_failures: &mut u64) {
202        let snapshot_ts = align_timestamp_secs(now_secs, self.config.interval_secs);
203        let active_symbols: Vec<String> = self
204            .instruments_cache
205            .get_all()
206            .await
207            .into_iter()
208            .filter(|inst| inst.status.is_active())
209            .map(|inst| inst.id)
210            .collect();
211
212        counter!("ht_bbo_snapshot_rows_total", "status" => "attempted")
213            .increment(active_symbols.len() as u64);
214
215        let snapshots = build_snapshot_rows(&active_symbols, self.quote_provider.as_ref());
216        let skipped = active_symbols.len().saturating_sub(snapshots.len());
217        counter!("ht_bbo_snapshot_rows_total", "status" => "skipped_invalid")
218            .increment(skipped as u64);
219
220        if snapshots.is_empty() {
221            return;
222        }
223
224        // Write to DB first -- only update in-memory cache on success so that
225        // the cache never contains data that wasn't persisted.
226        match self.db.upsert_bbo_snapshots(snapshot_ts, &snapshots).await {
227            Ok(written_rows) => {
228                *consecutive_failures = 0;
229                counter!("ht_bbo_snapshot_rows_total", "status" => "written")
230                    .increment(written_rows as u64);
231
232                self.update_in_memory_snapshots(snapshot_ts, &snapshots)
233                    .await;
234            }
235            Err(e) => {
236                *consecutive_failures += 1;
237                counter!("ht_bbo_snapshot_rows_total", "status" => "write_error").increment(1);
238                if *consecutive_failures <= 3 || (*consecutive_failures).is_multiple_of(10) {
239                    error!(
240                        failures = *consecutive_failures,
241                        snapshot_ts,
242                        error = %e,
243                        "Failed to persist BBO snapshot batch"
244                    );
245                }
246            }
247        }
248    }
249
250    async fn cleanup_retention(&self, now_secs: i64, consecutive_failures: &mut u64) {
251        let cutoff_ts = now_secs - self.config.retention_secs();
252        // Prune in-memory cache more aggressively -- we only need ~25h of data.
253        let cache_cutoff = now_secs - CACHE_RETENTION_SECS;
254        self.prune_in_memory_snapshots(cache_cutoff).await;
255        match self.db.delete_bbo_snapshots_older_than(cutoff_ts).await {
256            Ok(deleted_rows) => {
257                *consecutive_failures = 0;
258                counter!("ht_bbo_snapshot_retention_rows_deleted_total")
259                    .increment(deleted_rows as u64);
260            }
261            Err(e) => {
262                *consecutive_failures += 1;
263                counter!("ht_bbo_snapshot_retention_cleanup_total", "status" => "error")
264                    .increment(1);
265                if *consecutive_failures <= 3 || (*consecutive_failures).is_multiple_of(10) {
266                    error!(
267                        failures = *consecutive_failures,
268                        cutoff_ts,
269                        error = %e,
270                        "Failed to cleanup BBO snapshots retention"
271                    );
272                }
273            }
274        }
275    }
276
277    async fn update_in_memory_snapshots(
278        &self,
279        snapshot_ts: i64,
280        snapshots: &[NewBboSnapshotInput],
281    ) {
282        let mut cache = self.in_memory_snapshots.write().await;
283        for snapshot in snapshots {
284            let series = cache
285                .entry(snapshot.symbol.clone())
286                .or_insert_with(VecDeque::new);
287
288            match series.back() {
289                Some(last) if last.snapshot_ts == snapshot_ts => {
290                    // Same timestamp as the tail -- update in place.
291                    series.back_mut().unwrap().best_ask = snapshot.best_ask;
292                }
293                Some(last) if last.snapshot_ts > snapshot_ts => {
294                    // Out-of-order arrival -- insert in sorted position.
295                    let pos = series
296                        .binary_search_by_key(&snapshot_ts, |s| s.snapshot_ts)
297                        .unwrap_or_else(|pos| pos);
298                    series.insert(
299                        pos,
300                        CachedBboSnapshot {
301                            snapshot_ts,
302                            best_ask: snapshot.best_ask,
303                        },
304                    );
305                }
306                _ => {
307                    series.push_back(CachedBboSnapshot {
308                        snapshot_ts,
309                        best_ask: snapshot.best_ask,
310                    });
311                }
312            }
313        }
314    }
315
316    async fn prune_in_memory_snapshots(&self, cutoff_ts: i64) {
317        let mut cache = self.in_memory_snapshots.write().await;
318        cache.retain(|_, series| {
319            while series
320                .front()
321                .is_some_and(|snapshot| snapshot.snapshot_ts < cutoff_ts)
322            {
323                let _ = series.pop_front();
324            }
325            !series.is_empty()
326        });
327    }
328}
329
330fn build_reference_asks_from_cache(
331    snapshots: &HashMap<String, VecDeque<CachedBboSnapshot>>,
332    symbols: &[String],
333    cutoff_ts: i64,
334) -> HashMap<String, BboReferenceData> {
335    let mut references = HashMap::new();
336
337    for symbol in symbols {
338        if let Some(series) = snapshots.get(symbol) {
339            if series.is_empty() {
340                continue;
341            }
342
343            // Binary search for the last entry at or before cutoff_ts.
344            // partition_point returns the first index where snapshot_ts > cutoff_ts,
345            // so the entry just before it (if any) is the latest <= cutoff_ts.
346            let partition = series.partition_point(|entry| entry.snapshot_ts <= cutoff_ts);
347            let latest_before = if partition > 0 {
348                Some(&series[partition - 1])
349            } else {
350                None
351            };
352            // Only use a real 24h-ago reference. If there's no snapshot
353            // before the cutoff, the instrument is too new for a meaningful
354            // price_change — skip it rather than comparing against a
355            // minutes-old price that produces absurd percentages.
356            if let Some(row) = latest_before {
357                references.insert(
358                    symbol.clone(),
359                    BboReferenceData {
360                        reference_ask: row.best_ask,
361                        reference_ts: row.snapshot_ts,
362                        used_earliest_fallback: false,
363                    },
364                );
365            }
366        }
367    }
368
369    references
370}
371
372fn build_snapshot_rows(
373    symbols: &[String],
374    quote_provider: &dyn QuoteProvider,
375) -> Vec<NewBboSnapshotInput> {
376    symbols
377        .iter()
378        .filter_map(|symbol| {
379            quote_provider
380                .get_quote(symbol)
381                .and_then(|quote| snapshot_row_from_quote(symbol, &quote))
382        })
383        .collect()
384}
385
386fn snapshot_row_from_quote(symbol: &str, quote: &SnapshotBookQuote) -> Option<NewBboSnapshotInput> {
387    let best_bid = quote.best_bid?;
388    let best_ask = quote.best_ask?;
389    if best_bid <= 0.0 || best_ask <= 0.0 {
390        return None;
391    }
392
393    let best_bid_dec = Decimal::from_f64(best_bid)?;
394    let best_ask_dec = Decimal::from_f64(best_ask)?;
395
396    let best_bid_size = quote
397        .best_bid_size
398        .filter(|size| *size > 0.0)
399        .and_then(Decimal::from_f64);
400    let best_ask_size = quote
401        .best_ask_size
402        .filter(|size| *size > 0.0)
403        .and_then(Decimal::from_f64);
404
405    Some(NewBboSnapshotInput {
406        symbol: symbol.to_string(),
407        best_bid: best_bid_dec,
408        best_ask: best_ask_dec,
409        best_bid_size,
410        best_ask_size,
411        snapshot_ts: 0,
412    })
413}
414
415fn aligned_interval_secs(interval_secs: u64) -> tokio::time::Interval {
416    let now_secs = Utc::now().timestamp();
417    let next_secs = next_bucket_start_secs(now_secs, interval_secs);
418    let delay_secs = (next_secs - now_secs).max(0) as u64;
419
420    let start_at = Instant::now() + Duration::from_secs(delay_secs);
421    let mut ticker = interval_at(start_at, Duration::from_secs(interval_secs));
422    ticker.set_missed_tick_behavior(MissedTickBehavior::Skip);
423    ticker
424}
425
/// Rounds `timestamp_secs` down to the start of its `interval_secs` bucket.
///
/// The result is the largest multiple of `interval_secs` that is less than
/// or equal to `timestamp_secs`. Euclidean remainder is used so that
/// negative timestamps are floored as well; the `%` operator truncates
/// toward zero and would round them up to a later bucket.
///
/// # Panics
///
/// Panics if `interval_secs` is `0` or does not fit in an `i64`.
fn align_timestamp_secs(timestamp_secs: i64, interval_secs: u64) -> i64 {
    let interval = i64::try_from(interval_secs).expect("interval_secs exceeded i64::MAX");
    timestamp_secs - timestamp_secs.rem_euclid(interval)
}
430
431fn next_bucket_start_secs(timestamp_secs: i64, interval_secs: u64) -> i64 {
432    align_timestamp_secs(timestamp_secs, interval_secs)
433        + i64::try_from(interval_secs).expect("interval_secs exceeded i64::MAX")
434}
435
436#[async_trait::async_trait]
437impl hypercall_api::caches::options_summary::BboReferenceAskReader for BboSnapshotService {
438    async fn get_reference_asks(
439        &self,
440        symbols: &[String],
441        cutoff_ts: i64,
442    ) -> HashMap<String, BboReferenceData> {
443        self.get_reference_asks(symbols, cutoff_ts).await
444    }
445}
446
447#[async_trait::async_trait]
448impl crate::shared::service::Service for BboSnapshotService {
449    fn name(&self) -> &'static str {
450        "BboSnapshotService"
451    }
452
453    fn owner(&self) -> crate::shared::service::ServiceOwner {
454        crate::shared::service::ServiceOwner::Api
455    }
456
457    async fn run(
458        self: std::sync::Arc<Self>,
459        shutdown: crate::shared::ShutdownRx,
460    ) -> anyhow::Result<()> {
461        self.start_with_shutdown(shutdown)
462            .await
463            .map_err(|e| anyhow::anyhow!("BboSnapshotService task failed: {:?}", e))
464    }
465}
466
467#[cfg(test)]
468mod tests {
469    use super::*;
470    use crate::rsm::engine_snapshot::MockQuoteProvider;
471    use hypercall_runtime_api::SnapshotBookQuote;
472    use std::collections::VecDeque;
473
474    #[test]
475    fn test_align_timestamp_secs() {
476        let ts = 1_700_000_123_i64;
477        let aligned = align_timestamp_secs(ts, 300);
478        assert_eq!(aligned % 300, 0);
479        assert!(aligned <= ts);
480        assert!((ts - aligned) < 300);
481    }
482
483    #[test]
484    fn test_build_snapshot_rows_filters_invalid_quotes() {
485        let quote_provider = MockQuoteProvider::new();
486        quote_provider.set_quote(
487            "BTC-VALID",
488            SnapshotBookQuote {
489                best_bid: Some(100.0),
490                best_bid_size: Some(1.0),
491                best_ask: Some(101.0),
492                best_ask_size: Some(2.0),
493                mid: Some(100.5),
494                bids: vec![(100.0, 1.0)],
495                asks: vec![(101.0, 2.0)],
496            },
497        );
498        quote_provider.set_quote(
499            "BTC-NO-BID",
500            SnapshotBookQuote {
501                best_bid: None,
502                best_bid_size: None,
503                best_ask: Some(101.0),
504                best_ask_size: Some(2.0),
505                mid: Some(101.0),
506                bids: vec![],
507                asks: vec![(101.0, 2.0)],
508            },
509        );
510        quote_provider.set_quote(
511            "BTC-NON-POSITIVE",
512            SnapshotBookQuote {
513                best_bid: Some(0.0),
514                best_bid_size: Some(1.0),
515                best_ask: Some(101.0),
516                best_ask_size: Some(2.0),
517                mid: Some(50.5),
518                bids: vec![(0.0, 1.0)],
519                asks: vec![(101.0, 2.0)],
520            },
521        );
522
523        let symbols = vec![
524            "BTC-VALID".to_string(),
525            "BTC-NO-BID".to_string(),
526            "BTC-NON-POSITIVE".to_string(),
527            "BTC-MISSING".to_string(),
528        ];
529
530        let rows = build_snapshot_rows(&symbols, &quote_provider);
531        assert_eq!(rows.len(), 1);
532        assert_eq!(rows[0].symbol, "BTC-VALID");
533        assert_eq!(rows[0].best_bid.to_string(), "100");
534        assert_eq!(rows[0].best_ask.to_string(), "101");
535    }
536
537    #[test]
538    fn test_retention_secs_default() {
539        let cfg = BboSnapshotTaskConfig::default();
540        assert_eq!(cfg.retention_secs(), 7 * SECONDS_PER_DAY);
541    }
542
543    #[tokio::test]
544    async fn test_get_reference_asks_uses_latest_before_cutoff_skips_too_new() {
545        let mut snapshots = HashMap::new();
546        snapshots.insert(
547            "BTC-1".to_string(),
548            VecDeque::from(vec![
549                CachedBboSnapshot {
550                    snapshot_ts: 100,
551                    best_ask: Decimal::from(10),
552                },
553                CachedBboSnapshot {
554                    snapshot_ts: 200,
555                    best_ask: Decimal::from(20),
556                },
557                CachedBboSnapshot {
558                    snapshot_ts: 300,
559                    best_ask: Decimal::from(30),
560                },
561            ]),
562        );
563        // ETH-1 only has a snapshot AFTER the cutoff — too new for a
564        // meaningful 24h reference, so it should be skipped entirely.
565        snapshots.insert(
566            "ETH-1".to_string(),
567            VecDeque::from(vec![CachedBboSnapshot {
568                snapshot_ts: 500,
569                best_ask: Decimal::from(50),
570            }]),
571        );
572
573        let refs = build_reference_asks_from_cache(
574            &snapshots,
575            &["BTC-1".to_string(), "ETH-1".to_string()],
576            250,
577        );
578
579        let btc = refs.get("BTC-1").expect("missing BTC reference");
580        assert_eq!(btc.reference_ts, 200);
581        assert_eq!(btc.reference_ask, Decimal::from(20));
582        assert!(!btc.used_earliest_fallback);
583
584        // ETH-1 should NOT have a reference — no snapshot before cutoff
585        assert!(
586            !refs.contains_key("ETH-1"),
587            "instruments with no pre-cutoff snapshot should not get a reference"
588        );
589    }
590
591    #[tokio::test]
592    async fn test_exact_cutoff_boundary_returns_that_entry() {
593        let mut snapshots = HashMap::new();
594        snapshots.insert(
595            "SYM".to_string(),
596            VecDeque::from(vec![
597                CachedBboSnapshot {
598                    snapshot_ts: 100,
599                    best_ask: Decimal::from(10),
600                },
601                CachedBboSnapshot {
602                    snapshot_ts: 200,
603                    best_ask: Decimal::from(20),
604                },
605            ]),
606        );
607
608        // cutoff_ts == snapshot_ts should match (<=)
609        let refs = build_reference_asks_from_cache(&snapshots, &["SYM".to_string()], 200);
610        let entry = refs
611            .get("SYM")
612            .expect("should find entry at exact boundary");
613        assert_eq!(entry.reference_ts, 200);
614        assert_eq!(entry.reference_ask, Decimal::from(20));
615        assert!(!entry.used_earliest_fallback);
616    }
617
618    #[tokio::test]
619    async fn test_duplicate_timestamp_upsert() {
620        let mut snapshots = HashMap::new();
621        snapshots.insert(
622            "SYM".to_string(),
623            VecDeque::from(vec![CachedBboSnapshot {
624                snapshot_ts: 100,
625                best_ask: Decimal::from(10),
626            }]),
627        );
628
629        // Insert duplicate timestamp via update_in_memory_snapshots path
630        let service_snapshots = Arc::new(tokio::sync::RwLock::new(snapshots));
631
632        // Simulate the update logic directly
633        {
634            let mut cache = service_snapshots.write().await;
635            let series = cache.get_mut("SYM").unwrap();
636            // Same timestamp as back -- should update
637            assert_eq!(series.back().unwrap().snapshot_ts, 100);
638            series.back_mut().unwrap().best_ask = Decimal::from(99);
639            assert_eq!(series.len(), 1);
640            assert_eq!(series.back().unwrap().best_ask, Decimal::from(99));
641        }
642    }
643
644    #[tokio::test]
645    async fn test_prune_boundary() {
646        let mut snapshots = HashMap::new();
647        snapshots.insert(
648            "SYM".to_string(),
649            VecDeque::from(vec![
650                CachedBboSnapshot {
651                    snapshot_ts: 100,
652                    best_ask: Decimal::from(10),
653                },
654                CachedBboSnapshot {
655                    snapshot_ts: 200,
656                    best_ask: Decimal::from(20),
657                },
658                CachedBboSnapshot {
659                    snapshot_ts: 300,
660                    best_ask: Decimal::from(30),
661                },
662            ]),
663        );
664
665        let cache = Arc::new(tokio::sync::RwLock::new(snapshots));
666
667        // Prune with cutoff_ts = 200: entries with ts < 200 should be removed
668        {
669            let mut c = cache.write().await;
670            c.retain(|_, series| {
671                while series.front().is_some_and(|s| s.snapshot_ts < 200) {
672                    let _ = series.pop_front();
673                }
674                !series.is_empty()
675            });
676        }
677
678        let c = cache.read().await;
679        let series = c.get("SYM").unwrap();
680        assert_eq!(series.len(), 2);
681        assert_eq!(series[0].snapshot_ts, 200); // ts=200 is NOT pruned (< 200 is false)
682        assert_eq!(series[1].snapshot_ts, 300);
683    }
684
685    #[tokio::test]
686    async fn test_seed_snapshots_inserts_in_sorted_order() {
687        let _instruments_cache = Arc::new(InstrumentsCache::new());
688        let _quote_provider: Arc<dyn QuoteProvider> = Arc::new(MockQuoteProvider::new());
689        // We need a DieselDb but seed_snapshots doesn't use it -- construct with
690        // a dummy URL that won't actually be connected to in this test.
691        // Instead, just test the cache logic directly.
692        let cache: Arc<tokio::sync::RwLock<HashMap<String, VecDeque<CachedBboSnapshot>>>> =
693            Arc::new(tokio::sync::RwLock::new(HashMap::new()));
694
695        // Simulate seed_snapshots logic
696        let snapshots = vec![
697            NewBboSnapshotInput {
698                symbol: "A".to_string(),
699                best_bid: Decimal::from(1),
700                best_ask: Decimal::from(30),
701                best_bid_size: None,
702                best_ask_size: None,
703                snapshot_ts: 300,
704            },
705            NewBboSnapshotInput {
706                symbol: "A".to_string(),
707                best_bid: Decimal::from(1),
708                best_ask: Decimal::from(10),
709                best_bid_size: None,
710                best_ask_size: None,
711                snapshot_ts: 100,
712            },
713            NewBboSnapshotInput {
714                symbol: "A".to_string(),
715                best_bid: Decimal::from(1),
716                best_ask: Decimal::from(20),
717                best_bid_size: None,
718                best_ask_size: None,
719                snapshot_ts: 200,
720            },
721        ];
722
723        {
724            let mut c = cache.write().await;
725            for snapshot in &snapshots {
726                let series = c
727                    .entry(snapshot.symbol.clone())
728                    .or_insert_with(VecDeque::new);
729                let pos = series
730                    .binary_search_by_key(&snapshot.snapshot_ts, |s| s.snapshot_ts)
731                    .unwrap_or_else(|pos| pos);
732                if pos < series.len() && series[pos].snapshot_ts == snapshot.snapshot_ts {
733                    series[pos].best_ask = snapshot.best_ask;
734                } else {
735                    series.insert(
736                        pos,
737                        CachedBboSnapshot {
738                            snapshot_ts: snapshot.snapshot_ts,
739                            best_ask: snapshot.best_ask,
740                        },
741                    );
742                }
743            }
744        }
745
746        let c = cache.read().await;
747        let series = c.get("A").unwrap();
748        assert_eq!(series.len(), 3);
749        assert_eq!(series[0].snapshot_ts, 100);
750        assert_eq!(series[1].snapshot_ts, 200);
751        assert_eq!(series[2].snapshot_ts, 300);
752    }
753}