Skip to main content

hypercall/runtime/tasks/
historical_pnl.rs

1use std::time::Duration;
2
3use metrics::counter;
4use rust_decimal::Decimal;
5use tokio::time::{interval_at, Instant, MissedTickBehavior};
6use tracing::{debug, error, info};
7
8use crate::pnl_attribution::{encode_attribution, Attribution, SymbolAttribution};
9use crate::read_cache::portfolio::PortfolioCache;
10use hypercall_db::AnalyticsWriter;
11use hypercall_types::{
12    WalletAddress, HISTORICAL_PNL_INTERVAL_1D_MS, HISTORICAL_PNL_INTERVAL_1H_MS,
13    HISTORICAL_PNL_INTERVAL_5M_MS,
14};
15use rust_decimal_macros::dec;
16use std::collections::HashMap;
17use std::sync::Arc;
18
19pub const INTERVAL_5M_MS: i64 = HISTORICAL_PNL_INTERVAL_5M_MS;
20pub const INTERVAL_1H_MS: i64 = HISTORICAL_PNL_INTERVAL_1H_MS;
21pub const INTERVAL_1D_MS: i64 = HISTORICAL_PNL_INTERVAL_1D_MS;
22
23/// Configuration for the historical equity snapshot background task.
24#[derive(Debug, Clone)]
25pub struct HistoricalPnlTaskConfig {
26    /// Retention cap per `(wallet_address, interval)` in persisted snapshots.
27    pub max_periods: i64,
28    /// Capture cadence for the 5m bucket writer.
29    pub capture_every_5m_ms: i64,
30    /// Capture cadence for the 1h bucket writer.
31    pub capture_every_1h_ms: i64,
32    /// Capture cadence for the 1d bucket writer.
33    pub capture_every_1d_ms: i64,
34}
35
36impl Default for HistoricalPnlTaskConfig {
37    fn default() -> Self {
38        Self {
39            max_periods: 100,
40            capture_every_5m_ms: INTERVAL_5M_MS,
41            capture_every_1h_ms: INTERVAL_1H_MS,
42            capture_every_1d_ms: INTERVAL_1D_MS,
43        }
44    }
45}
46
47impl HistoricalPnlTaskConfig {
48    pub fn from_config(config: &crate::backend_config::HistoricalPnlRuntimeConfig) -> Self {
49        Self {
50            max_periods: config.max_periods,
51            capture_every_5m_ms: config.capture_every_5m_ms,
52            capture_every_1h_ms: config.capture_every_1h_ms,
53            capture_every_1d_ms: config.capture_every_1d_ms,
54        }
55    }
56}
57
58#[derive(Clone)]
59/// Background task that periodically snapshots wallet equity into the database.
60pub struct HistoricalPnlSnapshotTask {
61    portfolio_cache: Arc<PortfolioCache>,
62    db: Arc<dyn AnalyticsWriter>,
63    config: HistoricalPnlTaskConfig,
64}
65
66impl HistoricalPnlSnapshotTask {
67    pub fn new(
68        portfolio_cache: Arc<PortfolioCache>,
69        db: Arc<dyn AnalyticsWriter>,
70        config: HistoricalPnlTaskConfig,
71    ) -> Self {
72        Self {
73            portfolio_cache,
74            db,
75            config,
76        }
77    }
78
79    async fn capture_interval(&self, interval_ms: i64, label: &'static str) {
80        let bucket_timestamp_ms = align_timestamp_ms(now_timestamp_ms(), interval_ms);
81
82        let portfolios = self.portfolio_cache.get_service().all_portfolios().await;
83        let wallets: Vec<WalletAddress> = portfolios.keys().copied().collect();
84
85        if wallets.is_empty() {
86            debug!("Historical pnl snapshot skipped for {}: no wallets", label);
87            return;
88        }
89
90        let mut snapshots: Vec<(WalletAddress, Decimal, Option<Vec<u8>>)> =
91            Vec::with_capacity(wallets.len());
92
93        for wallet in &wallets {
94            match self
95                .portfolio_cache
96                .compute_wallet_margin_snapshot(wallet)
97                .await
98            {
99                Ok(snapshot) => {
100                    // Build attribution from portfolio positions + settlement PnL.
101                    //
102                    // Unrealized is (re)computed from the wallet's position + theo mark at
103                    // bucket_timestamp_ms rather than read from pos.unrealized_pnl, because
104                    // the live portfolio cache falls back to intrinsic value when UPNL=0
105                    // (see commit 5afaa73af) which makes the tail of the chart go flat and
106                    // disagree with the backfill's theo-based historical values.
107                    //
108                    // The final Attribution is then rolled up to collapse closed positions
109                    // into per-underlying realized buckets, bounding payload growth as
110                    // settled options accumulate forever.
111                    let attr_bytes = {
112                        let mut by_symbol = HashMap::new();
113                        let mut total_pnl = dec!(0);
114
115                        // Collect open-position symbols to look marks up for in one query.
116                        let open_symbols: Vec<String> = portfolios
117                            .get(wallet)
118                            .map(|b| {
119                                b.positions
120                                    .iter()
121                                    .filter(|(_, p)| p.amount != dec!(0))
122                                    .map(|(sym, _)| sym.clone())
123                                    .collect()
124                            })
125                            .unwrap_or_default();
126
127                        let marks = if open_symbols.is_empty() {
128                            HashMap::new()
129                        } else {
130                            match self
131                                .db
132                                .get_theo_marks_at_timestamp(&open_symbols, bucket_timestamp_ms)
133                                .await
134                            {
135                                Ok(m) => m,
136                                Err(e) => {
137                                    error!(
138                                        "attribution theo-mark lookup failed wallet={} ts={}: {}",
139                                        wallet, bucket_timestamp_ms, e
140                                    );
141                                    HashMap::new()
142                                }
143                            }
144                        };
145
146                        // Open positions from portfolio, unrealized from theo marks.
147                        if let Some(balance) = portfolios.get(wallet) {
148                            for (symbol, pos) in &balance.positions {
149                                if pos.amount == dec!(0) && pos.realized_pnl == dec!(0) {
150                                    continue;
151                                }
152                                let unrealized = if pos.amount == dec!(0) {
153                                    dec!(0)
154                                } else {
155                                    let mark = marks.get(symbol).copied().unwrap_or(dec!(0));
156                                    (mark - pos.entry_price) * pos.amount
157                                };
158                                let sa = SymbolAttribution {
159                                    position: pos.amount,
160                                    entry_price: pos.entry_price,
161                                    realized_pnl: pos.realized_pnl,
162                                    unrealized_pnl: unrealized,
163                                    total_pnl: pos.realized_pnl + unrealized,
164                                };
165                                total_pnl += sa.total_pnl;
166                                by_symbol.insert(symbol.clone(), sa);
167                            }
168                        }
169
170                        let settlement_result: anyhow::Result<Vec<(String, Decimal)>> = self
171                            .db
172                            .get_settled_pnl_by_symbol(wallet, bucket_timestamp_ms)
173                            .await;
174                        if let Ok(settlements) = settlement_result {
175                            for (reference_symbol, pnl) in settlements {
176                                if by_symbol.contains_key(&reference_symbol) {
177                                    continue;
178                                }
179                                if pnl == dec!(0) {
180                                    continue;
181                                }
182                                let sa = SymbolAttribution {
183                                    position: dec!(0),
184                                    entry_price: dec!(0),
185                                    realized_pnl: pnl,
186                                    unrealized_pnl: dec!(0),
187                                    total_pnl: pnl,
188                                };
189                                total_pnl += sa.total_pnl;
190                                by_symbol.insert(reference_symbol, sa);
191                            }
192                        }
193
194                        // Emit individual option symbols (no per-underlying
195                        // rollup): the frontend wants option names like
196                        // "BTC-20260414-55000-C" and does its own top-N
197                        // consolidation if a wallet has too many settled
198                        // rows to render.
199                        let attr = Attribution {
200                            by_symbol,
201                            total_pnl,
202                        };
203
204                        if attr.by_symbol.is_empty() {
205                            None
206                        } else {
207                            Some(encode_attribution(&attr))
208                        }
209                    };
210
211                    snapshots.push((*wallet, snapshot.margin_summary.equity, attr_bytes));
212                }
213                Err(e) => {
214                    error!(
215                        "Historical pnl snapshot failed wallet={} interval={} bucket_ts={}: {}",
216                        wallet, label, bucket_timestamp_ms, e
217                    );
218                    counter!(
219                        "ht_historical_pnl_snapshot_total",
220                        "interval" => label,
221                        "status" => "error"
222                    )
223                    .increment(1);
224                    continue;
225                }
226            }
227        }
228
229        if snapshots.is_empty() {
230            debug!(
231                "Historical pnl snapshot skipped for {}: no wallets with computed equity",
232                label
233            );
234            return;
235        }
236
237        match self
238            .db
239            .upsert_historical_pnl_batch(
240                interval_ms,
241                bucket_timestamp_ms,
242                &snapshots,
243                self.config.max_periods,
244            )
245            .await
246        {
247            Ok(affected) => {
248                info!(
249                    "Historical pnl snapshot captured interval={} wallets={} affected_rows={} bucket_ts={}",
250                    label,
251                    snapshots.len(),
252                    affected,
253                    bucket_timestamp_ms
254                );
255                counter!("ht_historical_pnl_snapshot_total", "interval" => label, "status" => "success")
256                    .increment(1);
257            }
258            Err(e) => {
259                error!(
260                    "Failed to persist historical pnl snapshot interval={} bucket_ts={}: {}",
261                    label, bucket_timestamp_ms, e
262                );
263                counter!("ht_historical_pnl_snapshot_total", "interval" => label, "status" => "error")
264                    .increment(1);
265            }
266        }
267    }
268}
269
270fn aligned_interval(interval_ms: i64) -> tokio::time::Interval {
271    let now_ms = now_timestamp_ms();
272    let next_ms = next_bucket_start_ms(now_ms, interval_ms);
273    let delay_ms = (next_ms - now_ms).max(0) as u64;
274
275    let start_at = Instant::now() + Duration::from_millis(delay_ms);
276    let mut ticker = interval_at(start_at, Duration::from_millis(interval_ms as u64));
277    ticker.set_missed_tick_behavior(MissedTickBehavior::Skip);
278    ticker
279}
280
281fn now_timestamp_ms() -> i64 {
282    chrono::Utc::now().timestamp_millis()
283}
284
285fn align_timestamp_ms(timestamp_ms: i64, interval_ms: i64) -> i64 {
286    timestamp_ms - (timestamp_ms % interval_ms)
287}
288
289fn next_bucket_start_ms(timestamp_ms: i64, interval_ms: i64) -> i64 {
290    align_timestamp_ms(timestamp_ms, interval_ms) + interval_ms
291}
292
293#[async_trait::async_trait]
294impl crate::shared::service::Service for HistoricalPnlSnapshotTask {
295    fn name(&self) -> &'static str {
296        "HistoricalPnlSnapshotTask"
297    }
298
299    fn owner(&self) -> crate::shared::service::ServiceOwner {
300        crate::shared::service::ServiceOwner::Api
301    }
302
303    async fn run(
304        self: std::sync::Arc<Self>,
305        mut shutdown: crate::shared::ShutdownRx,
306    ) -> anyhow::Result<()> {
307        let mut ticker_5m = aligned_interval(self.config.capture_every_5m_ms);
308        let mut ticker_1h = aligned_interval(self.config.capture_every_1h_ms);
309        let mut ticker_1d = aligned_interval(self.config.capture_every_1d_ms);
310
311        loop {
312            tokio::select! {
313                _ = shutdown.recv() => {
314                    info!("Historical pnl snapshot task received shutdown signal");
315                    break;
316                }
317                _ = ticker_5m.tick() => {
318                    self.capture_interval(INTERVAL_5M_MS, "5m").await;
319                }
320                _ = ticker_1h.tick() => {
321                    self.capture_interval(INTERVAL_1H_MS, "1h").await;
322                }
323                _ = ticker_1d.tick() => {
324                    self.capture_interval(INTERVAL_1D_MS, "1d").await;
325                }
326            }
327        }
328        Ok(())
329    }
330}
331
332#[cfg(test)]
333mod tests {
334    use super::*;
335
336    #[test]
337    fn test_align_timestamp_ms() {
338        let ts = 1_700_000_123_456i64;
339        let aligned_5m = align_timestamp_ms(ts, INTERVAL_5M_MS);
340        assert_eq!(aligned_5m % INTERVAL_5M_MS, 0);
341        assert!(aligned_5m <= ts);
342        assert!((ts - aligned_5m) < INTERVAL_5M_MS);
343
344        let aligned_1h = align_timestamp_ms(ts, INTERVAL_1H_MS);
345        assert_eq!(aligned_1h % INTERVAL_1H_MS, 0);
346        assert!(aligned_1h <= ts);
347        assert!((ts - aligned_1h) < INTERVAL_1H_MS);
348    }
349
350    #[test]
351    fn test_next_bucket_start_ms() {
352        let exact_5m = 1_700_000_100_000i64;
353        assert_eq!(
354            next_bucket_start_ms(exact_5m, INTERVAL_5M_MS),
355            exact_5m + INTERVAL_5M_MS
356        );
357
358        let between = exact_5m + 1234;
359        assert_eq!(
360            next_bucket_start_ms(between, INTERVAL_5M_MS),
361            exact_5m + INTERVAL_5M_MS
362        );
363    }
364}