1use std::collections::HashSet;
2use std::sync::Arc;
3use std::time::Duration;
4
5use metrics::counter;
6use tokio::time::{interval_at, Instant, MissedTickBehavior};
7use tracing::{debug, error, info};
8
9use super::historical_pnl::{INTERVAL_1D_MS, INTERVAL_1H_MS, INTERVAL_5M_MS};
10use crate::vol_oracle::SharedVolOracle;
11use hypercall_db::AnalyticsWriter;
12
/// Alias for [`VolSurfaceSnapshotTaskConfig`] under a shorter name.
// NOTE(review): presumably kept so callers using this name keep compiling — confirm.
pub type VolSurfaceSnapshotConfig = VolSurfaceSnapshotTaskConfig;
15
/// Tunables for [`VolSurfaceSnapshotTask`].
///
/// `Default` yields 1000 periods and the shared `INTERVAL_*_MS` constants;
/// `from_env` allows each field to be overridden through an environment
/// variable.
#[derive(Debug, Clone)]
pub struct VolSurfaceSnapshotTaskConfig {
    /// Forwarded unchanged to `AnalyticsWriter::upsert_vol_surface_snapshot`
    /// on every write.
    // NOTE(review): presumably the number of buckets retained per interval —
    // confirm against the writer implementation.
    pub max_periods: i64,
    /// Tick period, in milliseconds, of the ticker that triggers "5m" captures.
    pub capture_every_5m_ms: i64,
    /// Tick period, in milliseconds, of the ticker that triggers "1h" captures.
    pub capture_every_1h_ms: i64,
    /// Tick period, in milliseconds, of the ticker that triggers "1d" captures.
    pub capture_every_1d_ms: i64,
}
28
29impl Default for VolSurfaceSnapshotTaskConfig {
30 fn default() -> Self {
31 Self {
32 max_periods: 1000,
33 capture_every_5m_ms: INTERVAL_5M_MS,
34 capture_every_1h_ms: INTERVAL_1H_MS,
35 capture_every_1d_ms: INTERVAL_1D_MS,
36 }
37 }
38}
39
40impl VolSurfaceSnapshotTaskConfig {
41 pub fn from_env() -> Self {
42 let default = Self::default();
43 Self {
44 max_periods: read_positive_i64("VOL_SURFACE_SNAPSHOT_MAX_PERIODS", default.max_periods),
45 capture_every_5m_ms: read_positive_i64(
46 "VOL_SURFACE_SNAPSHOT_CAPTURE_EVERY_5M_MS",
47 default.capture_every_5m_ms,
48 ),
49 capture_every_1h_ms: read_positive_i64(
50 "VOL_SURFACE_SNAPSHOT_CAPTURE_EVERY_1H_MS",
51 default.capture_every_1h_ms,
52 ),
53 capture_every_1d_ms: read_positive_i64(
54 "VOL_SURFACE_SNAPSHOT_CAPTURE_EVERY_1D_MS",
55 default.capture_every_1d_ms,
56 ),
57 }
58 }
59}
60
/// Periodic task that reads vol surface snapshots from the oracle and
/// persists them through an [`AnalyticsWriter`].
#[derive(Clone)]
pub struct VolSurfaceSnapshotTask {
    /// Source of the per-underlying statuses and surface snapshots.
    vol_oracle: SharedVolOracle,
    /// Sink that snapshots are upserted into.
    db: Arc<dyn AnalyticsWriter>,
    /// Ticker periods plus the `max_periods` value handed to the writer.
    config: VolSurfaceSnapshotTaskConfig,
}
67
68impl VolSurfaceSnapshotTask {
69 pub fn new(
70 vol_oracle: SharedVolOracle,
71 db: Arc<dyn AnalyticsWriter>,
72 config: VolSurfaceSnapshotTaskConfig,
73 ) -> Self {
74 Self {
75 vol_oracle,
76 db,
77 config,
78 }
79 }
80
81 async fn capture_interval(&self, interval_ms: i64, label: &'static str) {
82 let bucket_timestamp_ms = align_timestamp_ms(now_timestamp_ms(), interval_ms);
83
84 let statuses = self.vol_oracle.statuses();
86 let mut seen = HashSet::new();
87 let underlyings: Vec<String> = statuses
88 .into_iter()
89 .filter(|s| seen.insert(s.underlying.clone()))
90 .map(|s| s.underlying)
91 .collect();
92
93 if underlyings.is_empty() {
94 debug!("Vol surface snapshot skipped for {}: no underlyings", label);
95 return;
96 }
97
98 let mut captured = 0u32;
99 for underlying in &underlyings {
100 let snapshot = match self.vol_oracle.get_surface_snapshot(underlying) {
101 Some(s) => s,
102 None => {
103 debug!(
104 underlying,
105 interval = label,
106 "Vol surface snapshot skipped: no surface data"
107 );
108 continue;
109 }
110 };
111
112 let surface_json = match serde_json::to_value(&snapshot) {
113 Ok(v) => v,
114 Err(err) => {
115 error!(
116 underlying,
117 interval = label,
118 error = %err,
119 "Vol surface snapshot failed to serialize"
120 );
121 counter!(
122 "ht_vol_surface_snapshot_total",
123 "interval" => label,
124 "status" => "error"
125 )
126 .increment(1);
127 continue;
128 }
129 };
130
131 match self
132 .db
133 .upsert_vol_surface_snapshot(
134 interval_ms,
135 bucket_timestamp_ms,
136 underlying,
137 surface_json,
138 self.config.max_periods,
139 )
140 .await
141 {
142 Ok(_affected) => {
143 captured += 1;
144 }
145 Err(err) => {
146 error!(
147 underlying,
148 interval = label,
149 bucket_timestamp_ms,
150 error = %err,
151 "Failed to persist vol surface snapshot"
152 );
153 counter!(
154 "ht_vol_surface_snapshot_total",
155 "interval" => label,
156 "status" => "error"
157 )
158 .increment(1);
159 }
160 }
161 }
162
163 if captured > 0 {
164 info!(
165 "Vol surface snapshot captured interval={} underlyings={} bucket_ts={}",
166 label, captured, bucket_timestamp_ms
167 );
168 counter!("ht_vol_surface_snapshot_total", "interval" => label, "status" => "success")
169 .increment(captured as u64);
170 }
171 }
172}
173
174fn aligned_interval(interval_ms: i64) -> tokio::time::Interval {
175 let now_ms = now_timestamp_ms();
176 let next_ms = next_bucket_start_ms(now_ms, interval_ms);
177 let delay_ms = (next_ms - now_ms).max(0) as u64;
178
179 let start_at = Instant::now() + Duration::from_millis(delay_ms);
180 let mut ticker = interval_at(start_at, Duration::from_millis(interval_ms as u64));
181 ticker.set_missed_tick_behavior(MissedTickBehavior::Skip);
182 ticker
183}
184
185fn now_timestamp_ms() -> i64 {
186 chrono::Utc::now().timestamp_millis()
187}
188
/// Rounds `timestamp_ms` down to the start of the `interval_ms`-wide bucket
/// that contains it.
///
/// The result is always a multiple of `interval_ms`, is never greater than
/// `timestamp_ms`, and is less than one interval below it. The Euclidean
/// remainder is used so this also holds for negative (pre-epoch) timestamps;
/// the plain `%` operator truncates toward zero and would round those *up*.
///
/// # Panics
///
/// Panics if `interval_ms` is zero. In builds with overflow checks it also
/// panics when the bucket start is not representable (timestamps within one
/// interval of `i64::MIN`).
fn align_timestamp_ms(timestamp_ms: i64, interval_ms: i64) -> i64 {
    timestamp_ms - timestamp_ms.rem_euclid(interval_ms)
}
192
193fn next_bucket_start_ms(timestamp_ms: i64, interval_ms: i64) -> i64 {
194 align_timestamp_ms(timestamp_ms, interval_ms) + interval_ms
195}
196
197fn read_positive_i64(key: &str, default: i64) -> i64 {
198 match std::env::var(key) {
199 Ok(value) => match value.parse::<i64>() {
200 Ok(parsed) if parsed > 0 => parsed,
201 _ => {
202 error!(
203 "Invalid {}='{}' (must be positive integer), using default {}",
204 key, value, default
205 );
206 default
207 }
208 },
209 Err(_) => default,
210 }
211}
212
213#[async_trait::async_trait]
214impl crate::shared::service::Service for VolSurfaceSnapshotTask {
215 fn name(&self) -> &'static str {
216 "VolSurfaceSnapshotTask"
217 }
218
219 fn owner(&self) -> crate::shared::service::ServiceOwner {
220 crate::shared::service::ServiceOwner::Api
221 }
222
223 async fn run(
224 self: std::sync::Arc<Self>,
225 mut shutdown: crate::shared::ShutdownRx,
226 ) -> anyhow::Result<()> {
227 let mut ticker_5m = aligned_interval(self.config.capture_every_5m_ms);
228 let mut ticker_1h = aligned_interval(self.config.capture_every_1h_ms);
229 let mut ticker_1d = aligned_interval(self.config.capture_every_1d_ms);
230
231 loop {
232 tokio::select! {
233 _ = shutdown.recv() => {
234 info!("Vol surface snapshot task received shutdown signal");
235 break;
236 }
237 _ = ticker_5m.tick() => {
238 self.capture_interval(INTERVAL_5M_MS, "5m").await;
239 }
240 _ = ticker_1h.tick() => {
241 self.capture_interval(INTERVAL_1H_MS, "1h").await;
242 }
243 _ = ticker_1d.tick() => {
244 self.capture_interval(INTERVAL_1D_MS, "1d").await;
245 }
246 }
247 }
248 Ok(())
249 }
250}
251
#[cfg(test)]
mod tests {
    use super::*;

    /// Arbitrary sample instant (milliseconds since the epoch) shared by the
    /// alignment tests.
    const SAMPLE_TS_MS: i64 = 1_700_000_123_456;

    /// The default configuration uses 1000 periods and the stock intervals.
    #[test]
    fn test_config_defaults() {
        let VolSurfaceSnapshotTaskConfig {
            max_periods,
            capture_every_5m_ms,
            capture_every_1h_ms,
            capture_every_1d_ms,
        } = VolSurfaceSnapshotTaskConfig::default();

        assert_eq!(max_periods, 1000);
        assert_eq!(capture_every_5m_ms, INTERVAL_5M_MS);
        assert_eq!(capture_every_1h_ms, INTERVAL_1H_MS);
        assert_eq!(capture_every_1d_ms, INTERVAL_1D_MS);
    }

    /// Aligned timestamps sit on a bucket boundary at or before the input and
    /// less than one interval away from it.
    #[test]
    fn test_align_timestamp_ms() {
        for interval_ms in [INTERVAL_5M_MS, INTERVAL_1H_MS] {
            let aligned = align_timestamp_ms(SAMPLE_TS_MS, interval_ms);
            assert_eq!(aligned % interval_ms, 0);
            assert!(aligned <= SAMPLE_TS_MS);
            assert!((SAMPLE_TS_MS - aligned) < interval_ms);
        }
    }

    /// The next bucket start is a boundary strictly after the input, exactly
    /// one interval past the current bucket start.
    #[test]
    fn test_next_bucket_start_ms() {
        let upcoming = next_bucket_start_ms(SAMPLE_TS_MS, INTERVAL_5M_MS);
        let current = align_timestamp_ms(SAMPLE_TS_MS, INTERVAL_5M_MS);

        assert_eq!(upcoming % INTERVAL_5M_MS, 0);
        assert!(upcoming > SAMPLE_TS_MS);
        assert_eq!(upcoming - current, INTERVAL_5M_MS);
    }
}