1use std::sync::Arc;
2use std::time::Duration;
3
4use metrics::counter;
5use rust_decimal::Decimal;
6use tokio::time::{interval_at, Instant, MissedTickBehavior};
7use tracing::{debug, error, info};
8
9use super::historical_pnl::{INTERVAL_1D_MS, INTERVAL_1H_MS, INTERVAL_5M_MS};
10use crate::read_cache::greeks::GreeksCache;
11use crate::read_cache::instruments_registry::InstrumentsCache;
12use hypercall_db::AnalyticsWriter;
13use std::collections::BTreeSet;
14
15#[derive(Debug, Clone)]
17pub struct HistoricalTheoTaskConfig {
18 pub max_periods: i64,
20 pub capture_every_5m_ms: i64,
22 pub capture_every_1h_ms: i64,
24 pub capture_every_1d_ms: i64,
26}
27
28impl Default for HistoricalTheoTaskConfig {
29 fn default() -> Self {
30 Self {
31 max_periods: 100,
32 capture_every_5m_ms: INTERVAL_5M_MS,
33 capture_every_1h_ms: INTERVAL_1H_MS,
34 capture_every_1d_ms: INTERVAL_1D_MS,
35 }
36 }
37}
38
39impl HistoricalTheoTaskConfig {
40 pub fn from_env() -> Self {
41 let default = Self::default();
42 Self {
43 max_periods: read_positive_i64("HISTORICAL_THEO_MAX_PERIODS", default.max_periods),
44 capture_every_5m_ms: read_positive_i64(
45 "HISTORICAL_THEO_CAPTURE_EVERY_5M_MS",
46 default.capture_every_5m_ms,
47 ),
48 capture_every_1h_ms: read_positive_i64(
49 "HISTORICAL_THEO_CAPTURE_EVERY_1H_MS",
50 default.capture_every_1h_ms,
51 ),
52 capture_every_1d_ms: read_positive_i64(
53 "HISTORICAL_THEO_CAPTURE_EVERY_1D_MS",
54 default.capture_every_1d_ms,
55 ),
56 }
57 }
58}
59
60#[derive(Clone)]
61pub struct HistoricalTheoSnapshotTask {
62 instruments_cache: Arc<InstrumentsCache>,
63 greeks_cache: Arc<GreeksCache>,
64 db: Arc<dyn AnalyticsWriter>,
65 config: HistoricalTheoTaskConfig,
66}
67
68impl HistoricalTheoSnapshotTask {
69 pub fn new(
70 instruments_cache: Arc<InstrumentsCache>,
71 greeks_cache: Arc<GreeksCache>,
72 db: Arc<dyn AnalyticsWriter>,
73 config: HistoricalTheoTaskConfig,
74 ) -> Self {
75 Self {
76 instruments_cache,
77 greeks_cache,
78 db,
79 config,
80 }
81 }
82
83 async fn capture_interval(&self, interval_ms: i64, label: &'static str) {
84 let bucket_timestamp_ms = align_timestamp_ms(now_timestamp_ms(), interval_ms);
85 let active_instrument_symbols: Vec<String> = self
86 .instruments_cache
87 .get_all()
88 .await
89 .into_iter()
90 .filter(|instrument| instrument.status.is_active())
91 .map(|instrument| instrument.id)
92 .collect();
93 let configured_underlyings = self.greeks_cache.configured_underlyings();
94 let snapshot_symbols =
95 build_snapshot_symbols(active_instrument_symbols, configured_underlyings);
96
97 if snapshot_symbols.is_empty() {
98 debug!(
99 "Historical theo snapshot skipped for {}: no active instruments or configured underlyings",
100 label
101 );
102 return;
103 }
104
105 let mut snapshots: Vec<(String, Decimal)> = Vec::with_capacity(snapshot_symbols.len());
106 for symbol in snapshot_symbols {
107 let theoretical_mark = match self.greeks_cache.get_theoretical_mark(&symbol).await {
108 Ok(price) => price,
109 Err(error) => {
110 error!(
111 symbol,
112 interval = label,
113 bucket_timestamp_ms,
114 error = %error,
115 "Historical theo snapshot failed to compute theoretical mark"
116 );
117 counter!(
118 "ht_historical_theo_snapshot_total",
119 "interval" => label,
120 "status" => "error"
121 )
122 .increment(1);
123 continue;
124 }
125 };
126
127 let Some(theoretical_mark) = Decimal::from_f64_retain(theoretical_mark) else {
128 error!(
129 symbol,
130 interval = label,
131 bucket_timestamp_ms,
132 theoretical_mark,
133 "Historical theo snapshot failed to convert theoretical mark"
134 );
135 counter!(
136 "ht_historical_theo_snapshot_total",
137 "interval" => label,
138 "status" => "error"
139 )
140 .increment(1);
141 continue;
142 };
143
144 snapshots.push((symbol, theoretical_mark));
145 }
146
147 if snapshots.is_empty() {
148 debug!(
149 "Historical theo snapshot skipped for {}: no symbols with computed theos",
150 label
151 );
152 return;
153 }
154
155 match self
156 .db
157 .upsert_historical_theo_batch(
158 interval_ms,
159 bucket_timestamp_ms,
160 &snapshots,
161 self.config.max_periods,
162 )
163 .await
164 {
165 Ok(affected) => {
166 info!(
167 "Historical theo snapshot captured interval={} symbols={} affected_rows={} bucket_ts={}",
168 label,
169 snapshots.len(),
170 affected,
171 bucket_timestamp_ms
172 );
173 counter!("ht_historical_theo_snapshot_total", "interval" => label, "status" => "success")
174 .increment(snapshots.len() as u64);
175 }
176 Err(error) => {
177 error!(
178 interval = label,
179 bucket_timestamp_ms,
180 error = %error,
181 "Failed to persist historical theo snapshot"
182 );
183 counter!("ht_historical_theo_snapshot_total", "interval" => label, "status" => "error")
184 .increment(snapshots.len() as u64);
185 }
186 }
187 }
188}
189
190fn build_snapshot_symbols(
191 active_instrument_symbols: Vec<String>,
192 configured_underlyings: Vec<String>,
193) -> Vec<String> {
194 let configured_perp_symbols = configured_underlyings
195 .iter()
196 .map(|underlying| format!("{}-PERP", underlying))
197 .collect::<Vec<_>>();
198
199 active_instrument_symbols
200 .into_iter()
201 .chain(configured_underlyings.iter().cloned())
202 .chain(configured_perp_symbols)
203 .collect::<BTreeSet<_>>()
204 .into_iter()
205 .collect()
206}
207
208fn aligned_interval(interval_ms: i64) -> tokio::time::Interval {
209 let now_ms = now_timestamp_ms();
210 let next_ms = next_bucket_start_ms(now_ms, interval_ms);
211 let delay_ms = (next_ms - now_ms).max(0) as u64;
212
213 let start_at = Instant::now() + Duration::from_millis(delay_ms);
214 let mut ticker = interval_at(start_at, Duration::from_millis(interval_ms as u64));
215 ticker.set_missed_tick_behavior(MissedTickBehavior::Skip);
216 ticker
217}
218
219fn now_timestamp_ms() -> i64 {
220 chrono::Utc::now().timestamp_millis()
221}
222
223fn align_timestamp_ms(timestamp_ms: i64, interval_ms: i64) -> i64 {
224 timestamp_ms - (timestamp_ms % interval_ms)
225}
226
227fn next_bucket_start_ms(timestamp_ms: i64, interval_ms: i64) -> i64 {
228 align_timestamp_ms(timestamp_ms, interval_ms) + interval_ms
229}
230
231fn read_positive_i64(key: &str, default: i64) -> i64 {
232 match std::env::var(key) {
233 Ok(value) => match value.parse::<i64>() {
234 Ok(parsed) if parsed > 0 => parsed,
235 _ => {
236 error!(
237 "Invalid {}='{}' (must be positive integer), using default {}",
238 key, value, default
239 );
240 default
241 }
242 },
243 Err(_) => default,
244 }
245}
246
247#[async_trait::async_trait]
248impl crate::shared::service::Service for HistoricalTheoSnapshotTask {
249 fn name(&self) -> &'static str {
250 "HistoricalTheoSnapshotTask"
251 }
252
253 fn owner(&self) -> crate::shared::service::ServiceOwner {
254 crate::shared::service::ServiceOwner::Api
255 }
256
257 async fn run(
258 self: std::sync::Arc<Self>,
259 mut shutdown: crate::shared::ShutdownRx,
260 ) -> anyhow::Result<()> {
261 let mut ticker_5m = aligned_interval(self.config.capture_every_5m_ms);
262 let mut ticker_1h = aligned_interval(self.config.capture_every_1h_ms);
263 let mut ticker_1d = aligned_interval(self.config.capture_every_1d_ms);
264
265 loop {
266 tokio::select! {
267 _ = shutdown.recv() => {
268 info!("Historical theo snapshot task received shutdown signal");
269 break;
270 }
271 _ = ticker_5m.tick() => {
272 self.capture_interval(INTERVAL_5M_MS, "5m").await;
273 }
274 _ = ticker_1h.tick() => {
275 self.capture_interval(INTERVAL_1H_MS, "1h").await;
276 }
277 _ = ticker_1d.tick() => {
278 self.capture_interval(INTERVAL_1D_MS, "1d").await;
279 }
280 }
281 }
282 Ok(())
283 }
284}
285
286#[cfg(test)]
287mod tests {
288 use super::*;
289
290 #[test]
291 fn test_build_snapshot_symbols_includes_active_instruments_and_underlyings() {
292 let symbols = build_snapshot_symbols(
293 vec![
294 "BTC-CALL-50000-20261231".to_string(),
295 "BTC-PERP".to_string(),
296 "BTC-PERP".to_string(),
297 ],
298 vec!["BTC".to_string(), "ETH".to_string(), "BTC".to_string()],
299 );
300
301 assert_eq!(
302 symbols,
303 vec![
304 "BTC".to_string(),
305 "BTC-CALL-50000-20261231".to_string(),
306 "BTC-PERP".to_string(),
307 "ETH".to_string(),
308 "ETH-PERP".to_string(),
309 ]
310 );
311 }
312
313 #[test]
314 fn test_align_timestamp_ms() {
315 let ts = 1_700_000_123_456i64;
316 let aligned_5m = align_timestamp_ms(ts, INTERVAL_5M_MS);
317 assert_eq!(aligned_5m % INTERVAL_5M_MS, 0);
318 assert!(aligned_5m <= ts);
319 assert!((ts - aligned_5m) < INTERVAL_5M_MS);
320
321 let aligned_1h = align_timestamp_ms(ts, INTERVAL_1H_MS);
322 assert_eq!(aligned_1h % INTERVAL_1H_MS, 0);
323 assert!(aligned_1h <= ts);
324 assert!((ts - aligned_1h) < INTERVAL_1H_MS);
325 }
326
327 #[test]
328 fn test_next_bucket_start_ms() {
329 let ts = 1_700_000_123_456i64;
330 let next_5m = next_bucket_start_ms(ts, INTERVAL_5M_MS);
331 assert_eq!(next_5m % INTERVAL_5M_MS, 0);
332 assert!(next_5m > ts);
333 assert_eq!(
334 next_5m - align_timestamp_ms(ts, INTERVAL_5M_MS),
335 INTERVAL_5M_MS
336 );
337 }
338}