Skip to main content

hypercall/runtime/tasks/
vol_surface_snapshot.rs

1use std::collections::HashSet;
2use std::sync::Arc;
3use std::time::Duration;
4
5use metrics::counter;
6use tokio::time::{interval_at, Instant, MissedTickBehavior};
7use tracing::{debug, error, info};
8
9use super::historical_pnl::{INTERVAL_1D_MS, INTERVAL_1H_MS, INTERVAL_5M_MS};
10use crate::vol_oracle::SharedVolOracle;
11use hypercall_db::AnalyticsWriter;
12
13/// Alias used by integrated.rs wiring.
14pub type VolSurfaceSnapshotConfig = VolSurfaceSnapshotTaskConfig;
15
16/// Configuration for the vol surface snapshot background task.
17#[derive(Debug, Clone)]
18pub struct VolSurfaceSnapshotTaskConfig {
19    /// Retention cap per `(underlying, interval)` in persisted snapshots.
20    pub max_periods: i64,
21    /// Capture cadence for the 5m bucket writer.
22    pub capture_every_5m_ms: i64,
23    /// Capture cadence for the 1h bucket writer.
24    pub capture_every_1h_ms: i64,
25    /// Capture cadence for the 1d bucket writer.
26    pub capture_every_1d_ms: i64,
27}
28
29impl Default for VolSurfaceSnapshotTaskConfig {
30    fn default() -> Self {
31        Self {
32            max_periods: 1000,
33            capture_every_5m_ms: INTERVAL_5M_MS,
34            capture_every_1h_ms: INTERVAL_1H_MS,
35            capture_every_1d_ms: INTERVAL_1D_MS,
36        }
37    }
38}
39
40impl VolSurfaceSnapshotTaskConfig {
41    pub fn from_env() -> Self {
42        let default = Self::default();
43        Self {
44            max_periods: read_positive_i64("VOL_SURFACE_SNAPSHOT_MAX_PERIODS", default.max_periods),
45            capture_every_5m_ms: read_positive_i64(
46                "VOL_SURFACE_SNAPSHOT_CAPTURE_EVERY_5M_MS",
47                default.capture_every_5m_ms,
48            ),
49            capture_every_1h_ms: read_positive_i64(
50                "VOL_SURFACE_SNAPSHOT_CAPTURE_EVERY_1H_MS",
51                default.capture_every_1h_ms,
52            ),
53            capture_every_1d_ms: read_positive_i64(
54                "VOL_SURFACE_SNAPSHOT_CAPTURE_EVERY_1D_MS",
55                default.capture_every_1d_ms,
56            ),
57        }
58    }
59}
60
61#[derive(Clone)]
62pub struct VolSurfaceSnapshotTask {
63    vol_oracle: SharedVolOracle,
64    db: Arc<dyn AnalyticsWriter>,
65    config: VolSurfaceSnapshotTaskConfig,
66}
67
68impl VolSurfaceSnapshotTask {
69    pub fn new(
70        vol_oracle: SharedVolOracle,
71        db: Arc<dyn AnalyticsWriter>,
72        config: VolSurfaceSnapshotTaskConfig,
73    ) -> Self {
74        Self {
75            vol_oracle,
76            db,
77            config,
78        }
79    }
80
81    async fn capture_interval(&self, interval_ms: i64, label: &'static str) {
82        let bucket_timestamp_ms = align_timestamp_ms(now_timestamp_ms(), interval_ms);
83
84        // Collect deduplicated underlyings from oracle statuses
85        let statuses = self.vol_oracle.statuses();
86        let mut seen = HashSet::new();
87        let underlyings: Vec<String> = statuses
88            .into_iter()
89            .filter(|s| seen.insert(s.underlying.clone()))
90            .map(|s| s.underlying)
91            .collect();
92
93        if underlyings.is_empty() {
94            debug!("Vol surface snapshot skipped for {}: no underlyings", label);
95            return;
96        }
97
98        let mut captured = 0u32;
99        for underlying in &underlyings {
100            let snapshot = match self.vol_oracle.get_surface_snapshot(underlying) {
101                Some(s) => s,
102                None => {
103                    debug!(
104                        underlying,
105                        interval = label,
106                        "Vol surface snapshot skipped: no surface data"
107                    );
108                    continue;
109                }
110            };
111
112            let surface_json = match serde_json::to_value(&snapshot) {
113                Ok(v) => v,
114                Err(err) => {
115                    error!(
116                        underlying,
117                        interval = label,
118                        error = %err,
119                        "Vol surface snapshot failed to serialize"
120                    );
121                    counter!(
122                        "ht_vol_surface_snapshot_total",
123                        "interval" => label,
124                        "status" => "error"
125                    )
126                    .increment(1);
127                    continue;
128                }
129            };
130
131            match self
132                .db
133                .upsert_vol_surface_snapshot(
134                    interval_ms,
135                    bucket_timestamp_ms,
136                    underlying,
137                    surface_json,
138                    self.config.max_periods,
139                )
140                .await
141            {
142                Ok(_affected) => {
143                    captured += 1;
144                }
145                Err(err) => {
146                    error!(
147                        underlying,
148                        interval = label,
149                        bucket_timestamp_ms,
150                        error = %err,
151                        "Failed to persist vol surface snapshot"
152                    );
153                    counter!(
154                        "ht_vol_surface_snapshot_total",
155                        "interval" => label,
156                        "status" => "error"
157                    )
158                    .increment(1);
159                }
160            }
161        }
162
163        if captured > 0 {
164            info!(
165                "Vol surface snapshot captured interval={} underlyings={} bucket_ts={}",
166                label, captured, bucket_timestamp_ms
167            );
168            counter!("ht_vol_surface_snapshot_total", "interval" => label, "status" => "success")
169                .increment(captured as u64);
170        }
171    }
172}
173
174fn aligned_interval(interval_ms: i64) -> tokio::time::Interval {
175    let now_ms = now_timestamp_ms();
176    let next_ms = next_bucket_start_ms(now_ms, interval_ms);
177    let delay_ms = (next_ms - now_ms).max(0) as u64;
178
179    let start_at = Instant::now() + Duration::from_millis(delay_ms);
180    let mut ticker = interval_at(start_at, Duration::from_millis(interval_ms as u64));
181    ticker.set_missed_tick_behavior(MissedTickBehavior::Skip);
182    ticker
183}
184
185fn now_timestamp_ms() -> i64 {
186    chrono::Utc::now().timestamp_millis()
187}
188
189fn align_timestamp_ms(timestamp_ms: i64, interval_ms: i64) -> i64 {
190    timestamp_ms - (timestamp_ms % interval_ms)
191}
192
193fn next_bucket_start_ms(timestamp_ms: i64, interval_ms: i64) -> i64 {
194    align_timestamp_ms(timestamp_ms, interval_ms) + interval_ms
195}
196
197fn read_positive_i64(key: &str, default: i64) -> i64 {
198    match std::env::var(key) {
199        Ok(value) => match value.parse::<i64>() {
200            Ok(parsed) if parsed > 0 => parsed,
201            _ => {
202                error!(
203                    "Invalid {}='{}' (must be positive integer), using default {}",
204                    key, value, default
205                );
206                default
207            }
208        },
209        Err(_) => default,
210    }
211}
212
213#[async_trait::async_trait]
214impl crate::shared::service::Service for VolSurfaceSnapshotTask {
215    fn name(&self) -> &'static str {
216        "VolSurfaceSnapshotTask"
217    }
218
219    fn owner(&self) -> crate::shared::service::ServiceOwner {
220        crate::shared::service::ServiceOwner::Api
221    }
222
223    async fn run(
224        self: std::sync::Arc<Self>,
225        mut shutdown: crate::shared::ShutdownRx,
226    ) -> anyhow::Result<()> {
227        let mut ticker_5m = aligned_interval(self.config.capture_every_5m_ms);
228        let mut ticker_1h = aligned_interval(self.config.capture_every_1h_ms);
229        let mut ticker_1d = aligned_interval(self.config.capture_every_1d_ms);
230
231        loop {
232            tokio::select! {
233                _ = shutdown.recv() => {
234                    info!("Vol surface snapshot task received shutdown signal");
235                    break;
236                }
237                _ = ticker_5m.tick() => {
238                    self.capture_interval(INTERVAL_5M_MS, "5m").await;
239                }
240                _ = ticker_1h.tick() => {
241                    self.capture_interval(INTERVAL_1H_MS, "1h").await;
242                }
243                _ = ticker_1d.tick() => {
244                    self.capture_interval(INTERVAL_1D_MS, "1d").await;
245                }
246            }
247        }
248        Ok(())
249    }
250}
251
252#[cfg(test)]
253mod tests {
254    use super::*;
255
256    #[test]
257    fn test_config_defaults() {
258        let config = VolSurfaceSnapshotTaskConfig::default();
259        assert_eq!(config.max_periods, 1000);
260        assert_eq!(config.capture_every_5m_ms, INTERVAL_5M_MS);
261        assert_eq!(config.capture_every_1h_ms, INTERVAL_1H_MS);
262        assert_eq!(config.capture_every_1d_ms, INTERVAL_1D_MS);
263    }
264
265    #[test]
266    fn test_align_timestamp_ms() {
267        let ts = 1_700_000_123_456i64;
268        let aligned_5m = align_timestamp_ms(ts, INTERVAL_5M_MS);
269        assert_eq!(aligned_5m % INTERVAL_5M_MS, 0);
270        assert!(aligned_5m <= ts);
271        assert!((ts - aligned_5m) < INTERVAL_5M_MS);
272
273        let aligned_1h = align_timestamp_ms(ts, INTERVAL_1H_MS);
274        assert_eq!(aligned_1h % INTERVAL_1H_MS, 0);
275        assert!(aligned_1h <= ts);
276        assert!((ts - aligned_1h) < INTERVAL_1H_MS);
277    }
278
279    #[test]
280    fn test_next_bucket_start_ms() {
281        let ts = 1_700_000_123_456i64;
282        let next_5m = next_bucket_start_ms(ts, INTERVAL_5M_MS);
283        assert_eq!(next_5m % INTERVAL_5M_MS, 0);
284        assert!(next_5m > ts);
285        assert_eq!(
286            next_5m - align_timestamp_ms(ts, INTERVAL_5M_MS),
287            INTERVAL_5M_MS
288        );
289    }
290}