// hypercall_api/caches/marquee_movers.rs
use std::collections::HashSet;
use std::sync::Arc;
use std::time::SystemTime;

use hypercall_db::AnalyticsReader;
use hypercall_types::HISTORICAL_THEO_INTERVAL_1H_MS;
use rust_decimal::prelude::ToPrimitive;
use serde::Serialize;
use tokio::sync::RwLock;
use tracing::{debug, warn};

use super::options_summary::OptionsSummarySnapshotCache;
12
/// Maximum number of entries published in one marquee snapshot.
const MAX_MOVERS: usize = 10;
/// Smallest absolute percent change for an instrument to be listed as a mover.
const MIN_MOVE_THRESHOLD: f64 = 2.0;
/// Both the reference theo and the current theo must be at least this value
/// before a percent change is computed.
// NOTE(review): presumably guards against huge percentages on near-zero prices — confirm.
const MIN_THEO_FLOOR: f64 = 0.50;
19
20#[derive(Debug, Clone, Serialize)]
21pub struct MarqueeMover {
22 pub instrument_name: String,
23 pub underlying: String,
24 pub mark_price: f64,
25 pub price_change: Option<f64>,
27 pub volume: f64,
28}
29
30#[derive(Debug, Clone, Serialize)]
31pub struct MarqueeMoversSnapshot {
32 pub built_at_ms: u64,
33 pub movers: Vec<MarqueeMover>,
34}
35
/// A pre-serialized snapshot together with the time it was built.
struct CachedSnapshot {
    /// Serialized JSON bytes, shared by reference count so readers do not copy them.
    payload: Arc<Vec<u8>>,
    /// Wall-clock time captured at the start of the refresh that produced `payload`.
    built_at: SystemTime,
}
40
41pub struct MarqueeMoversCache {
42 options_summary_cache: Arc<OptionsSummarySnapshotCache>,
43 db: Arc<dyn AnalyticsReader>,
44 cached: RwLock<Option<CachedSnapshot>>,
45}
46
47impl MarqueeMoversCache {
48 pub fn new(
49 options_summary_cache: Arc<OptionsSummarySnapshotCache>,
50 db: Arc<dyn AnalyticsReader>,
51 ) -> Self {
52 Self {
53 options_summary_cache,
54 db,
55 cached: RwLock::new(None),
56 }
57 }
58
59 pub async fn refresh(&self) {
60 let now = SystemTime::now();
61 let built_at_ms = now
62 .duration_since(std::time::UNIX_EPOCH)
63 .map(|d| d.as_millis() as u64)
64 .unwrap_or(0);
65
66 let underlyings = self.options_summary_cache.available_underlyings();
67 if underlyings.is_empty() {
68 self.store_snapshot(built_at_ms, vec![], now).await;
69 return;
70 }
71
72 let (summaries, _built_at) = self.options_summary_cache.get_for_underlyings(&underlyings);
73
74 let quoted: Vec<_> = summaries
77 .iter()
78 .filter(|s| {
79 let has_quote = s.bid_price > 0.0
80 || s.ask_price > 0.0
81 || s.indicative_bid_price.is_some_and(|p| p > 0.0)
82 || s.indicative_ask_price.is_some_and(|p| p > 0.0);
83 let has_theo = s.theoretical_price.is_some_and(|t| t > 0.0);
84 has_quote && has_theo
85 })
86 .collect();
87
88 if quoted.is_empty() {
89 self.store_snapshot(built_at_ms, vec![], now).await;
90 return;
91 }
92
93 let symbols: Vec<String> = quoted.iter().map(|s| s.instrument_name.clone()).collect();
94
95 let cutoff_ms = chrono::Utc::now().timestamp_millis() - 86_400_000;
96 let historical = self
97 .db
98 .get_historical_theos_batch(&symbols, HISTORICAL_THEO_INTERVAL_1H_MS, 25)
99 .await
100 .unwrap_or_default();
101
102 let mut movers: Vec<MarqueeMover> = Vec::new();
103 let mut filler_candidates: Vec<MarqueeMover> = Vec::new();
104
105 for summary in "ed {
106 let Some(current_theo) = summary.theoretical_price else {
107 continue;
108 };
109 if current_theo <= 0.0 {
110 continue;
111 }
112
113 filler_candidates.push(MarqueeMover {
114 instrument_name: summary.instrument_name.clone(),
115 underlying: summary.base_currency.clone(),
116 mark_price: current_theo,
117 price_change: None,
118 volume: summary.volume,
119 });
120
121 let ref_theo = historical
122 .get(&summary.instrument_name)
123 .and_then(|series| {
124 series
125 .iter()
126 .rfind(|p| p.timestamp_ms <= cutoff_ms)
127 .and_then(|p| p.theoretical_price.to_f64())
128 })
129 .unwrap_or(0.0);
130
131 if ref_theo < MIN_THEO_FLOOR || current_theo < MIN_THEO_FLOOR {
132 continue;
133 }
134
135 let price_change = ((current_theo - ref_theo) / ref_theo) * 100.0;
136 if !price_change.is_finite() || price_change.abs() < MIN_MOVE_THRESHOLD {
137 continue;
138 }
139
140 movers.push(MarqueeMover {
141 instrument_name: summary.instrument_name.clone(),
142 underlying: summary.base_currency.clone(),
143 mark_price: current_theo,
144 price_change: Some(price_change),
145 volume: summary.volume,
146 });
147 }
148
149 movers.sort_by(|a, b| {
150 b.price_change
151 .map(f64::abs)
152 .partial_cmp(&a.price_change.map(f64::abs))
153 .unwrap_or(std::cmp::Ordering::Equal)
154 });
155 movers.truncate(MAX_MOVERS);
156
157 if movers.len() < MAX_MOVERS {
158 let mut seen: HashSet<String> = movers
159 .iter()
160 .map(|mover| mover.instrument_name.clone())
161 .collect();
162
163 filler_candidates.sort_by(|a, b| {
164 b.volume
165 .partial_cmp(&a.volume)
166 .unwrap_or(std::cmp::Ordering::Equal)
167 .then_with(|| a.instrument_name.cmp(&b.instrument_name))
168 });
169
170 for candidate in filler_candidates {
171 if movers.len() >= MAX_MOVERS {
172 break;
173 }
174 if seen.insert(candidate.instrument_name.clone()) {
175 movers.push(candidate);
176 }
177 }
178 }
179
180 debug!(count = movers.len(), "MarqueeMoversCache refreshed");
181
182 self.store_snapshot(built_at_ms, movers, now).await;
183 }
184
185 async fn store_snapshot(
186 &self,
187 built_at_ms: u64,
188 movers: Vec<MarqueeMover>,
189 built_at: SystemTime,
190 ) {
191 let snapshot = MarqueeMoversSnapshot {
192 built_at_ms,
193 movers,
194 };
195 if let Ok(payload) = serde_json::to_vec(&snapshot) {
196 let mut cached = self.cached.write().await;
197 *cached = Some(CachedSnapshot {
198 payload: Arc::new(payload),
199 built_at,
200 });
201 }
202 }
203
204 pub async fn published_response(&self) -> (Arc<Vec<u8>>, SystemTime) {
205 let cached = self.cached.read().await;
206 match cached.as_ref() {
207 Some(snapshot) => (snapshot.payload.clone(), snapshot.built_at),
208 None => (Arc::new(b"{}".to_vec()), SystemTime::now()),
209 }
210 }
211}