// hypercall_api/caches/marquee_movers.rs

use rust_decimal::prelude::ToPrimitive;
use serde::Serialize;
use std::collections::HashSet;
use std::sync::Arc;
use std::time::SystemTime;
use tokio::sync::RwLock;
use tracing::debug;
use tracing::warn;

use super::options_summary::OptionsSummarySnapshotCache;
use hypercall_db::AnalyticsReader;
use hypercall_types::HISTORICAL_THEO_INTERVAL_1H_MS;

/// Maximum number of entries published in a movers snapshot.
const MAX_MOVERS: usize = 10;
/// Minimum absolute 24h change, in percentage points, for an instrument to
/// count as a mover.
const MIN_MOVE_THRESHOLD: f64 = 2.0;
/// Skip options where either the current or 24h-ago theo is below this floor.
/// Percentage changes on near-zero theos are mathematically correct but
/// misleading (e.g. $0.004 -> $0.18 = +4000%).
const MIN_THEO_FLOOR: f64 = 0.50;

20#[derive(Debug, Clone, Serialize)]
21pub struct MarqueeMover {
22    pub instrument_name: String,
23    pub underlying: String,
24    pub mark_price: f64,
25    /// 24h price change in percentage points (already * 100).
26    pub price_change: Option<f64>,
27    pub volume: f64,
28}
29
30#[derive(Debug, Clone, Serialize)]
31pub struct MarqueeMoversSnapshot {
32    pub built_at_ms: u64,
33    pub movers: Vec<MarqueeMover>,
34}
35
/// A snapshot that has already been serialized, ready to be handed out.
struct CachedSnapshot {
    /// JSON-encoded [`MarqueeMoversSnapshot`], shared so reads never copy the bytes.
    payload: Arc<Vec<u8>>,
    /// Time at which the refresh that produced `payload` started.
    built_at: SystemTime,
}

41pub struct MarqueeMoversCache {
42    options_summary_cache: Arc<OptionsSummarySnapshotCache>,
43    db: Arc<dyn AnalyticsReader>,
44    cached: RwLock<Option<CachedSnapshot>>,
45}
46
47impl MarqueeMoversCache {
48    pub fn new(
49        options_summary_cache: Arc<OptionsSummarySnapshotCache>,
50        db: Arc<dyn AnalyticsReader>,
51    ) -> Self {
52        Self {
53            options_summary_cache,
54            db,
55            cached: RwLock::new(None),
56        }
57    }
58
59    pub async fn refresh(&self) {
60        let now = SystemTime::now();
61        let built_at_ms = now
62            .duration_since(std::time::UNIX_EPOCH)
63            .map(|d| d.as_millis() as u64)
64            .unwrap_or(0);
65
66        let underlyings = self.options_summary_cache.available_underlyings();
67        if underlyings.is_empty() {
68            self.store_snapshot(built_at_ms, vec![], now).await;
69            return;
70        }
71
72        let (summaries, _built_at) = self.options_summary_cache.get_for_underlyings(&underlyings);
73
74        // Filter to instruments with a real quote (orderbook or indicative)
75        // and a current theoretical price.
76        let quoted: Vec<_> = summaries
77            .iter()
78            .filter(|s| {
79                let has_quote = s.bid_price > 0.0
80                    || s.ask_price > 0.0
81                    || s.indicative_bid_price.is_some_and(|p| p > 0.0)
82                    || s.indicative_ask_price.is_some_and(|p| p > 0.0);
83                let has_theo = s.theoretical_price.is_some_and(|t| t > 0.0);
84                has_quote && has_theo
85            })
86            .collect();
87
88        if quoted.is_empty() {
89            self.store_snapshot(built_at_ms, vec![], now).await;
90            return;
91        }
92
93        let symbols: Vec<String> = quoted.iter().map(|s| s.instrument_name.clone()).collect();
94
95        let cutoff_ms = chrono::Utc::now().timestamp_millis() - 86_400_000;
96        let historical = self
97            .db
98            .get_historical_theos_batch(&symbols, HISTORICAL_THEO_INTERVAL_1H_MS, 25)
99            .await
100            .unwrap_or_default();
101
102        let mut movers: Vec<MarqueeMover> = Vec::new();
103        let mut filler_candidates: Vec<MarqueeMover> = Vec::new();
104
105        for summary in &quoted {
106            let Some(current_theo) = summary.theoretical_price else {
107                continue;
108            };
109            if current_theo <= 0.0 {
110                continue;
111            }
112
113            filler_candidates.push(MarqueeMover {
114                instrument_name: summary.instrument_name.clone(),
115                underlying: summary.base_currency.clone(),
116                mark_price: current_theo,
117                price_change: None,
118                volume: summary.volume,
119            });
120
121            let ref_theo = historical
122                .get(&summary.instrument_name)
123                .and_then(|series| {
124                    series
125                        .iter()
126                        .rfind(|p| p.timestamp_ms <= cutoff_ms)
127                        .and_then(|p| p.theoretical_price.to_f64())
128                })
129                .unwrap_or(0.0);
130
131            if ref_theo < MIN_THEO_FLOOR || current_theo < MIN_THEO_FLOOR {
132                continue;
133            }
134
135            let price_change = ((current_theo - ref_theo) / ref_theo) * 100.0;
136            if !price_change.is_finite() || price_change.abs() < MIN_MOVE_THRESHOLD {
137                continue;
138            }
139
140            movers.push(MarqueeMover {
141                instrument_name: summary.instrument_name.clone(),
142                underlying: summary.base_currency.clone(),
143                mark_price: current_theo,
144                price_change: Some(price_change),
145                volume: summary.volume,
146            });
147        }
148
149        movers.sort_by(|a, b| {
150            b.price_change
151                .map(f64::abs)
152                .partial_cmp(&a.price_change.map(f64::abs))
153                .unwrap_or(std::cmp::Ordering::Equal)
154        });
155        movers.truncate(MAX_MOVERS);
156
157        if movers.len() < MAX_MOVERS {
158            let mut seen: HashSet<String> = movers
159                .iter()
160                .map(|mover| mover.instrument_name.clone())
161                .collect();
162
163            filler_candidates.sort_by(|a, b| {
164                b.volume
165                    .partial_cmp(&a.volume)
166                    .unwrap_or(std::cmp::Ordering::Equal)
167                    .then_with(|| a.instrument_name.cmp(&b.instrument_name))
168            });
169
170            for candidate in filler_candidates {
171                if movers.len() >= MAX_MOVERS {
172                    break;
173                }
174                if seen.insert(candidate.instrument_name.clone()) {
175                    movers.push(candidate);
176                }
177            }
178        }
179
180        debug!(count = movers.len(), "MarqueeMoversCache refreshed");
181
182        self.store_snapshot(built_at_ms, movers, now).await;
183    }
184
185    async fn store_snapshot(
186        &self,
187        built_at_ms: u64,
188        movers: Vec<MarqueeMover>,
189        built_at: SystemTime,
190    ) {
191        let snapshot = MarqueeMoversSnapshot {
192            built_at_ms,
193            movers,
194        };
195        if let Ok(payload) = serde_json::to_vec(&snapshot) {
196            let mut cached = self.cached.write().await;
197            *cached = Some(CachedSnapshot {
198                payload: Arc::new(payload),
199                built_at,
200            });
201        }
202    }
203
204    pub async fn published_response(&self) -> (Arc<Vec<u8>>, SystemTime) {
205        let cached = self.cached.read().await;
206        match cached.as_ref() {
207            Some(snapshot) => (snapshot.payload.clone(), snapshot.built_at),
208            None => (Arc::new(b"{}".to_vec()), SystemTime::now()),
209        }
210    }
211}