Skip to main content

hypercall/runtime/tasks/
historical_theo.rs

1use std::sync::Arc;
2use std::time::Duration;
3
4use metrics::counter;
5use rust_decimal::Decimal;
6use tokio::time::{interval_at, Instant, MissedTickBehavior};
7use tracing::{debug, error, info};
8
9use super::historical_pnl::{INTERVAL_1D_MS, INTERVAL_1H_MS, INTERVAL_5M_MS};
10use crate::read_cache::greeks::GreeksCache;
11use crate::read_cache::instruments_registry::InstrumentsCache;
12use hypercall_db::AnalyticsWriter;
13use std::collections::BTreeSet;
14
15/// Configuration for the historical theo snapshot background task.
16#[derive(Debug, Clone)]
17pub struct HistoricalTheoTaskConfig {
18    /// Retention cap per `(symbol, interval)` in persisted snapshots.
19    pub max_periods: i64,
20    /// Capture cadence for the 5m bucket writer.
21    pub capture_every_5m_ms: i64,
22    /// Capture cadence for the 1h bucket writer.
23    pub capture_every_1h_ms: i64,
24    /// Capture cadence for the 1d bucket writer.
25    pub capture_every_1d_ms: i64,
26}
27
28impl Default for HistoricalTheoTaskConfig {
29    fn default() -> Self {
30        Self {
31            max_periods: 100,
32            capture_every_5m_ms: INTERVAL_5M_MS,
33            capture_every_1h_ms: INTERVAL_1H_MS,
34            capture_every_1d_ms: INTERVAL_1D_MS,
35        }
36    }
37}
38
39impl HistoricalTheoTaskConfig {
40    pub fn from_env() -> Self {
41        let default = Self::default();
42        Self {
43            max_periods: read_positive_i64("HISTORICAL_THEO_MAX_PERIODS", default.max_periods),
44            capture_every_5m_ms: read_positive_i64(
45                "HISTORICAL_THEO_CAPTURE_EVERY_5M_MS",
46                default.capture_every_5m_ms,
47            ),
48            capture_every_1h_ms: read_positive_i64(
49                "HISTORICAL_THEO_CAPTURE_EVERY_1H_MS",
50                default.capture_every_1h_ms,
51            ),
52            capture_every_1d_ms: read_positive_i64(
53                "HISTORICAL_THEO_CAPTURE_EVERY_1D_MS",
54                default.capture_every_1d_ms,
55            ),
56        }
57    }
58}
59
60#[derive(Clone)]
61pub struct HistoricalTheoSnapshotTask {
62    instruments_cache: Arc<InstrumentsCache>,
63    greeks_cache: Arc<GreeksCache>,
64    db: Arc<dyn AnalyticsWriter>,
65    config: HistoricalTheoTaskConfig,
66}
67
68impl HistoricalTheoSnapshotTask {
69    pub fn new(
70        instruments_cache: Arc<InstrumentsCache>,
71        greeks_cache: Arc<GreeksCache>,
72        db: Arc<dyn AnalyticsWriter>,
73        config: HistoricalTheoTaskConfig,
74    ) -> Self {
75        Self {
76            instruments_cache,
77            greeks_cache,
78            db,
79            config,
80        }
81    }
82
83    async fn capture_interval(&self, interval_ms: i64, label: &'static str) {
84        let bucket_timestamp_ms = align_timestamp_ms(now_timestamp_ms(), interval_ms);
85        let active_instrument_symbols: Vec<String> = self
86            .instruments_cache
87            .get_all()
88            .await
89            .into_iter()
90            .filter(|instrument| instrument.status.is_active())
91            .map(|instrument| instrument.id)
92            .collect();
93        let configured_underlyings = self.greeks_cache.configured_underlyings();
94        let snapshot_symbols =
95            build_snapshot_symbols(active_instrument_symbols, configured_underlyings);
96
97        if snapshot_symbols.is_empty() {
98            debug!(
99                "Historical theo snapshot skipped for {}: no active instruments or configured underlyings",
100                label
101            );
102            return;
103        }
104
105        let mut snapshots: Vec<(String, Decimal)> = Vec::with_capacity(snapshot_symbols.len());
106        for symbol in snapshot_symbols {
107            let theoretical_mark = match self.greeks_cache.get_theoretical_mark(&symbol).await {
108                Ok(price) => price,
109                Err(error) => {
110                    error!(
111                        symbol,
112                        interval = label,
113                        bucket_timestamp_ms,
114                        error = %error,
115                        "Historical theo snapshot failed to compute theoretical mark"
116                    );
117                    counter!(
118                        "ht_historical_theo_snapshot_total",
119                        "interval" => label,
120                        "status" => "error"
121                    )
122                    .increment(1);
123                    continue;
124                }
125            };
126
127            let Some(theoretical_mark) = Decimal::from_f64_retain(theoretical_mark) else {
128                error!(
129                    symbol,
130                    interval = label,
131                    bucket_timestamp_ms,
132                    theoretical_mark,
133                    "Historical theo snapshot failed to convert theoretical mark"
134                );
135                counter!(
136                    "ht_historical_theo_snapshot_total",
137                    "interval" => label,
138                    "status" => "error"
139                )
140                .increment(1);
141                continue;
142            };
143
144            snapshots.push((symbol, theoretical_mark));
145        }
146
147        if snapshots.is_empty() {
148            debug!(
149                "Historical theo snapshot skipped for {}: no symbols with computed theos",
150                label
151            );
152            return;
153        }
154
155        match self
156            .db
157            .upsert_historical_theo_batch(
158                interval_ms,
159                bucket_timestamp_ms,
160                &snapshots,
161                self.config.max_periods,
162            )
163            .await
164        {
165            Ok(affected) => {
166                info!(
167                    "Historical theo snapshot captured interval={} symbols={} affected_rows={} bucket_ts={}",
168                    label,
169                    snapshots.len(),
170                    affected,
171                    bucket_timestamp_ms
172                );
173                counter!("ht_historical_theo_snapshot_total", "interval" => label, "status" => "success")
174                    .increment(snapshots.len() as u64);
175            }
176            Err(error) => {
177                error!(
178                    interval = label,
179                    bucket_timestamp_ms,
180                    error = %error,
181                    "Failed to persist historical theo snapshot"
182                );
183                counter!("ht_historical_theo_snapshot_total", "interval" => label, "status" => "error")
184                    .increment(snapshots.len() as u64);
185            }
186        }
187    }
188}
189
190fn build_snapshot_symbols(
191    active_instrument_symbols: Vec<String>,
192    configured_underlyings: Vec<String>,
193) -> Vec<String> {
194    let configured_perp_symbols = configured_underlyings
195        .iter()
196        .map(|underlying| format!("{}-PERP", underlying))
197        .collect::<Vec<_>>();
198
199    active_instrument_symbols
200        .into_iter()
201        .chain(configured_underlyings.iter().cloned())
202        .chain(configured_perp_symbols)
203        .collect::<BTreeSet<_>>()
204        .into_iter()
205        .collect()
206}
207
208fn aligned_interval(interval_ms: i64) -> tokio::time::Interval {
209    let now_ms = now_timestamp_ms();
210    let next_ms = next_bucket_start_ms(now_ms, interval_ms);
211    let delay_ms = (next_ms - now_ms).max(0) as u64;
212
213    let start_at = Instant::now() + Duration::from_millis(delay_ms);
214    let mut ticker = interval_at(start_at, Duration::from_millis(interval_ms as u64));
215    ticker.set_missed_tick_behavior(MissedTickBehavior::Skip);
216    ticker
217}
218
219fn now_timestamp_ms() -> i64 {
220    chrono::Utc::now().timestamp_millis()
221}
222
223fn align_timestamp_ms(timestamp_ms: i64, interval_ms: i64) -> i64 {
224    timestamp_ms - (timestamp_ms % interval_ms)
225}
226
227fn next_bucket_start_ms(timestamp_ms: i64, interval_ms: i64) -> i64 {
228    align_timestamp_ms(timestamp_ms, interval_ms) + interval_ms
229}
230
231fn read_positive_i64(key: &str, default: i64) -> i64 {
232    match std::env::var(key) {
233        Ok(value) => match value.parse::<i64>() {
234            Ok(parsed) if parsed > 0 => parsed,
235            _ => {
236                error!(
237                    "Invalid {}='{}' (must be positive integer), using default {}",
238                    key, value, default
239                );
240                default
241            }
242        },
243        Err(_) => default,
244    }
245}
246
247#[async_trait::async_trait]
248impl crate::shared::service::Service for HistoricalTheoSnapshotTask {
249    fn name(&self) -> &'static str {
250        "HistoricalTheoSnapshotTask"
251    }
252
253    fn owner(&self) -> crate::shared::service::ServiceOwner {
254        crate::shared::service::ServiceOwner::Api
255    }
256
257    async fn run(
258        self: std::sync::Arc<Self>,
259        mut shutdown: crate::shared::ShutdownRx,
260    ) -> anyhow::Result<()> {
261        let mut ticker_5m = aligned_interval(self.config.capture_every_5m_ms);
262        let mut ticker_1h = aligned_interval(self.config.capture_every_1h_ms);
263        let mut ticker_1d = aligned_interval(self.config.capture_every_1d_ms);
264
265        loop {
266            tokio::select! {
267                _ = shutdown.recv() => {
268                    info!("Historical theo snapshot task received shutdown signal");
269                    break;
270                }
271                _ = ticker_5m.tick() => {
272                    self.capture_interval(INTERVAL_5M_MS, "5m").await;
273                }
274                _ = ticker_1h.tick() => {
275                    self.capture_interval(INTERVAL_1H_MS, "1h").await;
276                }
277                _ = ticker_1d.tick() => {
278                    self.capture_interval(INTERVAL_1D_MS, "1d").await;
279                }
280            }
281        }
282        Ok(())
283    }
284}
285
286#[cfg(test)]
287mod tests {
288    use super::*;
289
290    #[test]
291    fn test_build_snapshot_symbols_includes_active_instruments_and_underlyings() {
292        let symbols = build_snapshot_symbols(
293            vec![
294                "BTC-CALL-50000-20261231".to_string(),
295                "BTC-PERP".to_string(),
296                "BTC-PERP".to_string(),
297            ],
298            vec!["BTC".to_string(), "ETH".to_string(), "BTC".to_string()],
299        );
300
301        assert_eq!(
302            symbols,
303            vec![
304                "BTC".to_string(),
305                "BTC-CALL-50000-20261231".to_string(),
306                "BTC-PERP".to_string(),
307                "ETH".to_string(),
308                "ETH-PERP".to_string(),
309            ]
310        );
311    }
312
313    #[test]
314    fn test_align_timestamp_ms() {
315        let ts = 1_700_000_123_456i64;
316        let aligned_5m = align_timestamp_ms(ts, INTERVAL_5M_MS);
317        assert_eq!(aligned_5m % INTERVAL_5M_MS, 0);
318        assert!(aligned_5m <= ts);
319        assert!((ts - aligned_5m) < INTERVAL_5M_MS);
320
321        let aligned_1h = align_timestamp_ms(ts, INTERVAL_1H_MS);
322        assert_eq!(aligned_1h % INTERVAL_1H_MS, 0);
323        assert!(aligned_1h <= ts);
324        assert!((ts - aligned_1h) < INTERVAL_1H_MS);
325    }
326
327    #[test]
328    fn test_next_bucket_start_ms() {
329        let ts = 1_700_000_123_456i64;
330        let next_5m = next_bucket_start_ms(ts, INTERVAL_5M_MS);
331        assert_eq!(next_5m % INTERVAL_5M_MS, 0);
332        assert!(next_5m > ts);
333        assert_eq!(
334            next_5m - align_timestamp_ms(ts, INTERVAL_5M_MS),
335            INTERVAL_5M_MS
336        );
337    }
338}