1use std::time::Duration;
2
3use metrics::counter;
4use rust_decimal::Decimal;
5use tokio::time::{interval_at, Instant, MissedTickBehavior};
6use tracing::{debug, error, info};
7
8use crate::pnl_attribution::{encode_attribution, Attribution, SymbolAttribution};
9use crate::read_cache::portfolio::PortfolioCache;
10use hypercall_db::AnalyticsWriter;
11use hypercall_types::{
12 WalletAddress, HISTORICAL_PNL_INTERVAL_1D_MS, HISTORICAL_PNL_INTERVAL_1H_MS,
13 HISTORICAL_PNL_INTERVAL_5M_MS,
14};
15use rust_decimal_macros::dec;
16use std::collections::HashMap;
17use std::sync::Arc;
18
/// Alias of `HISTORICAL_PNL_INTERVAL_5M_MS`; passed as the bucket interval for the "5m" capture.
pub const INTERVAL_5M_MS: i64 = HISTORICAL_PNL_INTERVAL_5M_MS;
/// Alias of `HISTORICAL_PNL_INTERVAL_1H_MS`; passed as the bucket interval for the "1h" capture.
pub const INTERVAL_1H_MS: i64 = HISTORICAL_PNL_INTERVAL_1H_MS;
/// Alias of `HISTORICAL_PNL_INTERVAL_1D_MS`; passed as the bucket interval for the "1d" capture.
pub const INTERVAL_1D_MS: i64 = HISTORICAL_PNL_INTERVAL_1D_MS;
22
/// Tunables for the historical pnl snapshot task.
#[derive(Debug, Clone)]
pub struct HistoricalPnlTaskConfig {
    /// Forwarded unchanged to `upsert_historical_pnl_batch` on every capture.
    /// NOTE(review): presumably the number of buckets retained per interval — confirm
    /// against the `AnalyticsWriter` implementation.
    pub max_periods: i64,
    /// Ticker period, in milliseconds, driving the "5m" capture.
    pub capture_every_5m_ms: i64,
    /// Ticker period, in milliseconds, driving the "1h" capture.
    pub capture_every_1h_ms: i64,
    /// Ticker period, in milliseconds, driving the "1d" capture.
    pub capture_every_1d_ms: i64,
}
35
36impl Default for HistoricalPnlTaskConfig {
37 fn default() -> Self {
38 Self {
39 max_periods: 100,
40 capture_every_5m_ms: INTERVAL_5M_MS,
41 capture_every_1h_ms: INTERVAL_1H_MS,
42 capture_every_1d_ms: INTERVAL_1D_MS,
43 }
44 }
45}
46
47impl HistoricalPnlTaskConfig {
48 pub fn from_config(config: &crate::backend_config::HistoricalPnlRuntimeConfig) -> Self {
49 Self {
50 max_periods: config.max_periods,
51 capture_every_5m_ms: config.capture_every_5m_ms,
52 capture_every_1h_ms: config.capture_every_1h_ms,
53 capture_every_1d_ms: config.capture_every_1d_ms,
54 }
55 }
56}
57
/// Periodic task that snapshots per-wallet equity (plus an optional pnl attribution
/// blob) and persists it through the analytics writer.
#[derive(Clone)]
pub struct HistoricalPnlSnapshotTask {
    /// Source of the wallet set and of per-wallet margin snapshots.
    portfolio_cache: Arc<PortfolioCache>,
    /// Used for theo-mark and settled-pnl lookups and for persisting the snapshot batch.
    db: Arc<dyn AnalyticsWriter>,
    /// Capture cadence and the `max_periods` value handed to the writer.
    config: HistoricalPnlTaskConfig,
}
65
66impl HistoricalPnlSnapshotTask {
67 pub fn new(
68 portfolio_cache: Arc<PortfolioCache>,
69 db: Arc<dyn AnalyticsWriter>,
70 config: HistoricalPnlTaskConfig,
71 ) -> Self {
72 Self {
73 portfolio_cache,
74 db,
75 config,
76 }
77 }
78
79 async fn capture_interval(&self, interval_ms: i64, label: &'static str) {
80 let bucket_timestamp_ms = align_timestamp_ms(now_timestamp_ms(), interval_ms);
81
82 let portfolios = self.portfolio_cache.get_service().all_portfolios().await;
83 let wallets: Vec<WalletAddress> = portfolios.keys().copied().collect();
84
85 if wallets.is_empty() {
86 debug!("Historical pnl snapshot skipped for {}: no wallets", label);
87 return;
88 }
89
90 let mut snapshots: Vec<(WalletAddress, Decimal, Option<Vec<u8>>)> =
91 Vec::with_capacity(wallets.len());
92
93 for wallet in &wallets {
94 match self
95 .portfolio_cache
96 .compute_wallet_margin_snapshot(wallet)
97 .await
98 {
99 Ok(snapshot) => {
100 let attr_bytes = {
112 let mut by_symbol = HashMap::new();
113 let mut total_pnl = dec!(0);
114
115 let open_symbols: Vec<String> = portfolios
117 .get(wallet)
118 .map(|b| {
119 b.positions
120 .iter()
121 .filter(|(_, p)| p.amount != dec!(0))
122 .map(|(sym, _)| sym.clone())
123 .collect()
124 })
125 .unwrap_or_default();
126
127 let marks = if open_symbols.is_empty() {
128 HashMap::new()
129 } else {
130 match self
131 .db
132 .get_theo_marks_at_timestamp(&open_symbols, bucket_timestamp_ms)
133 .await
134 {
135 Ok(m) => m,
136 Err(e) => {
137 error!(
138 "attribution theo-mark lookup failed wallet={} ts={}: {}",
139 wallet, bucket_timestamp_ms, e
140 );
141 HashMap::new()
142 }
143 }
144 };
145
146 if let Some(balance) = portfolios.get(wallet) {
148 for (symbol, pos) in &balance.positions {
149 if pos.amount == dec!(0) && pos.realized_pnl == dec!(0) {
150 continue;
151 }
152 let unrealized = if pos.amount == dec!(0) {
153 dec!(0)
154 } else {
155 let mark = marks.get(symbol).copied().unwrap_or(dec!(0));
156 (mark - pos.entry_price) * pos.amount
157 };
158 let sa = SymbolAttribution {
159 position: pos.amount,
160 entry_price: pos.entry_price,
161 realized_pnl: pos.realized_pnl,
162 unrealized_pnl: unrealized,
163 total_pnl: pos.realized_pnl + unrealized,
164 };
165 total_pnl += sa.total_pnl;
166 by_symbol.insert(symbol.clone(), sa);
167 }
168 }
169
170 let settlement_result: anyhow::Result<Vec<(String, Decimal)>> = self
171 .db
172 .get_settled_pnl_by_symbol(wallet, bucket_timestamp_ms)
173 .await;
174 if let Ok(settlements) = settlement_result {
175 for (reference_symbol, pnl) in settlements {
176 if by_symbol.contains_key(&reference_symbol) {
177 continue;
178 }
179 if pnl == dec!(0) {
180 continue;
181 }
182 let sa = SymbolAttribution {
183 position: dec!(0),
184 entry_price: dec!(0),
185 realized_pnl: pnl,
186 unrealized_pnl: dec!(0),
187 total_pnl: pnl,
188 };
189 total_pnl += sa.total_pnl;
190 by_symbol.insert(reference_symbol, sa);
191 }
192 }
193
194 let attr = Attribution {
200 by_symbol,
201 total_pnl,
202 };
203
204 if attr.by_symbol.is_empty() {
205 None
206 } else {
207 Some(encode_attribution(&attr))
208 }
209 };
210
211 snapshots.push((*wallet, snapshot.margin_summary.equity, attr_bytes));
212 }
213 Err(e) => {
214 error!(
215 "Historical pnl snapshot failed wallet={} interval={} bucket_ts={}: {}",
216 wallet, label, bucket_timestamp_ms, e
217 );
218 counter!(
219 "ht_historical_pnl_snapshot_total",
220 "interval" => label,
221 "status" => "error"
222 )
223 .increment(1);
224 continue;
225 }
226 }
227 }
228
229 if snapshots.is_empty() {
230 debug!(
231 "Historical pnl snapshot skipped for {}: no wallets with computed equity",
232 label
233 );
234 return;
235 }
236
237 match self
238 .db
239 .upsert_historical_pnl_batch(
240 interval_ms,
241 bucket_timestamp_ms,
242 &snapshots,
243 self.config.max_periods,
244 )
245 .await
246 {
247 Ok(affected) => {
248 info!(
249 "Historical pnl snapshot captured interval={} wallets={} affected_rows={} bucket_ts={}",
250 label,
251 snapshots.len(),
252 affected,
253 bucket_timestamp_ms
254 );
255 counter!("ht_historical_pnl_snapshot_total", "interval" => label, "status" => "success")
256 .increment(1);
257 }
258 Err(e) => {
259 error!(
260 "Failed to persist historical pnl snapshot interval={} bucket_ts={}: {}",
261 label, bucket_timestamp_ms, e
262 );
263 counter!("ht_historical_pnl_snapshot_total", "interval" => label, "status" => "error")
264 .increment(1);
265 }
266 }
267 }
268}
269
270fn aligned_interval(interval_ms: i64) -> tokio::time::Interval {
271 let now_ms = now_timestamp_ms();
272 let next_ms = next_bucket_start_ms(now_ms, interval_ms);
273 let delay_ms = (next_ms - now_ms).max(0) as u64;
274
275 let start_at = Instant::now() + Duration::from_millis(delay_ms);
276 let mut ticker = interval_at(start_at, Duration::from_millis(interval_ms as u64));
277 ticker.set_missed_tick_behavior(MissedTickBehavior::Skip);
278 ticker
279}
280
281fn now_timestamp_ms() -> i64 {
282 chrono::Utc::now().timestamp_millis()
283}
284
/// Rounds `timestamp_ms` down to the start of the `interval_ms`-wide bucket containing it.
///
/// Uses the Euclidean remainder so that negative timestamps also round toward
/// negative infinity (the truncating `%` would round them up past the input).
/// For non-negative timestamps the result is the same as with `%`.
///
/// # Panics
///
/// Panics if `interval_ms` is zero.
fn align_timestamp_ms(timestamp_ms: i64, interval_ms: i64) -> i64 {
    timestamp_ms - timestamp_ms.rem_euclid(interval_ms)
}
288
289fn next_bucket_start_ms(timestamp_ms: i64, interval_ms: i64) -> i64 {
290 align_timestamp_ms(timestamp_ms, interval_ms) + interval_ms
291}
292
293#[async_trait::async_trait]
294impl crate::shared::service::Service for HistoricalPnlSnapshotTask {
295 fn name(&self) -> &'static str {
296 "HistoricalPnlSnapshotTask"
297 }
298
299 fn owner(&self) -> crate::shared::service::ServiceOwner {
300 crate::shared::service::ServiceOwner::Api
301 }
302
303 async fn run(
304 self: std::sync::Arc<Self>,
305 mut shutdown: crate::shared::ShutdownRx,
306 ) -> anyhow::Result<()> {
307 let mut ticker_5m = aligned_interval(self.config.capture_every_5m_ms);
308 let mut ticker_1h = aligned_interval(self.config.capture_every_1h_ms);
309 let mut ticker_1d = aligned_interval(self.config.capture_every_1d_ms);
310
311 loop {
312 tokio::select! {
313 _ = shutdown.recv() => {
314 info!("Historical pnl snapshot task received shutdown signal");
315 break;
316 }
317 _ = ticker_5m.tick() => {
318 self.capture_interval(INTERVAL_5M_MS, "5m").await;
319 }
320 _ = ticker_1h.tick() => {
321 self.capture_interval(INTERVAL_1H_MS, "1h").await;
322 }
323 _ = ticker_1d.tick() => {
324 self.capture_interval(INTERVAL_1D_MS, "1d").await;
325 }
326 }
327 }
328 Ok(())
329 }
330}
331
#[cfg(test)]
mod tests {
    use super::*;

    /// Alignment must land on a multiple of the interval, never exceed the input,
    /// and stay within one interval of it.
    #[test]
    fn test_align_timestamp_ms() {
        let ts = 1_700_000_123_456i64;

        for interval_ms in [INTERVAL_5M_MS, INTERVAL_1H_MS] {
            let aligned = align_timestamp_ms(ts, interval_ms);
            assert_eq!(aligned % interval_ms, 0);
            assert!(aligned <= ts);
            assert!((ts - aligned) < interval_ms);
        }
    }

    /// Both a boundary timestamp and one shortly after it map to the same next bucket.
    /// Assumes `exact_5m` sits on a 5m boundary, i.e. `INTERVAL_5M_MS` divides it.
    #[test]
    fn test_next_bucket_start_ms() {
        let exact_5m = 1_700_000_100_000i64;
        let expected_next = exact_5m + INTERVAL_5M_MS;

        for offset in [0i64, 1234] {
            assert_eq!(
                next_bucket_start_ms(exact_5m + offset, INTERVAL_5M_MS),
                expected_next
            );
        }
    }
}