Skip to main content

hypercall_api/handlers/
historical_pnl.rs

1use crate::sonic_json::SonicJson;
2use axum::{
3    extract::{Query, State},
4    http::StatusCode,
5};
6use hypercall_db::AnalyticsReader;
7use hypercall_types::{
8    HistoricalPnlInterval, HistoricalPnlPoint, HistoricalPnlResponse, WalletAddress,
9};
10use rust_decimal::Decimal;
11use rust_decimal_macros::dec;
12use serde::{Deserialize, Serialize};
13use utoipa::{IntoParams, ToSchema};
14
15use super::AppState;
16use crate::models::ApiResponse;
17
18/// Short helper to mask a wallet address in log output: keeps the first 6 and
19/// last 4 hex characters, so operators can correlate errors without logging
20/// the full identifier.
21fn mask_wallet(wallet: &WalletAddress) -> String {
22    let s = wallet.to_string();
23    let head = &s[..6.min(s.len())];
24    let tail = &s[s.len().saturating_sub(4)..];
25    format!("{head}..{tail}")
26}
27
28/// One deposit/withdraw ledger event, chronologically ordered by caller.
29#[derive(Debug, Clone, Copy)]
30pub(crate) struct DepositEvent {
31    pub event_ts_ms: i64,
32    pub delta: Decimal,
33}
34
35/// Walk an ordered deposit/withdraw event stream against a snapshot-timestamp
36/// stream, returning the cumulative `net_deposits` at each snapshot. Events
37/// with `event_ts_ms <= snapshot_ts` count toward that snapshot; events with
38/// equal timestamps are applied in the order they appear in `events`.
39///
40/// Pure function, no I/O — kept outside the handler so it's unit-testable.
41pub(crate) fn cumulative_net_deposits_per_snapshot(
42    events: &[DepositEvent],
43    snapshot_timestamps_ms: &[i64],
44) -> Vec<Decimal> {
45    let mut out = Vec::with_capacity(snapshot_timestamps_ms.len());
46    let mut cumulative = dec!(0);
47    let mut evt_idx = 0usize;
48    for &ts in snapshot_timestamps_ms {
49        while evt_idx < events.len() && events[evt_idx].event_ts_ms <= ts {
50            cumulative += events[evt_idx].delta;
51            evt_idx += 1;
52        }
53        out.push(cumulative);
54    }
55    out
56}
57
58#[derive(Debug, Deserialize, IntoParams)]
59pub struct HistoricalPnlQuery {
60    /// Wallet address to query
61    #[param(example = "0x1234567890abcdef1234567890abcdef12345678")]
62    pub wallet: WalletAddress,
63    /// Interval bucket size (`5m`, `1h`, `1d`)
64    #[param(example = "1h")]
65    pub interval: HistoricalPnlInterval,
66    /// Maximum number of periods to return (default: 100, max: 100)
67    #[param(maximum = 100, example = 100)]
68    pub limit: Option<usize>,
69    /// Return per-symbol attribution blobs on each point. Defaults to `false` because the blob is a few KB per row and sits in TOAST, opt in only when the caller actually renders the breakdown view.
70    #[param(example = false)]
71    pub include_attribution: Option<bool>,
72}
73
74#[derive(Debug, Serialize, ToSchema)]
75pub struct HistoricalPnlApiResponse {
76    pub success: bool,
77    pub data: Option<HistoricalPnlResponse>,
78    pub error: Option<String>,
79}
80
81/// Get historical equity snapshots for a wallet.
82#[utoipa::path(
83    get,
84    path = "/historical-pnl",
85    params(HistoricalPnlQuery),
86    responses(
87        (status = 200, description = "Historical equity snapshots", body = HistoricalPnlApiResponse),
88        (status = 400, description = "Invalid query parameters"),
89        (status = 500, description = "Internal server error")
90    ),
91    security(("wallet_query" = [])),
92    tag = "Portfolio"
93)]
94pub async fn get_historical_pnl(
95    State(app_state): State<AppState>,
96    Query(params): Query<HistoricalPnlQuery>,
97) -> Result<SonicJson<ApiResponse<HistoricalPnlResponse>>, StatusCode> {
98    let limit = params.limit.unwrap_or(100).min(100);
99    let include_attribution = params.include_attribution.unwrap_or(false);
100
101    // Both fetches are independent — run them in parallel so the handler's
102    // wall-clock time is max(snapshots, ledger_events) instead of their sum.
103    let analytics: &dyn AnalyticsReader = app_state.db.as_ref();
104    let points_fut = analytics.get_historical_pnl(
105        &params.wallet,
106        params.interval.as_ms(),
107        limit,
108        include_attribution,
109    );
110    let wallet_for_ledger = params.wallet;
111    let ledger_fut = analytics.get_deposit_withdraw_events(&wallet_for_ledger);
112    let (points, deposit_event_rows) = tokio::try_join!(
113        async {
114            points_fut.await.map_err(|e| {
115                tracing::error!(
116                    "Failed to fetch historical pnl for wallet={} interval={}: {}",
117                    mask_wallet(&params.wallet),
118                    params.interval.as_str(),
119                    e
120                );
121                StatusCode::INTERNAL_SERVER_ERROR
122            })
123        },
124        async {
125            ledger_fut.await.map_err(|e| {
126                tracing::error!(
127                    "historical-pnl deposit-event lookup failed wallet={}: {}",
128                    mask_wallet(&params.wallet),
129                    e
130                );
131                StatusCode::INTERNAL_SERVER_ERROR
132            })
133        },
134    )?;
135
136    let deposit_events: Vec<DepositEvent> = deposit_event_rows
137        .into_iter()
138        .map(|r| DepositEvent {
139            event_ts_ms: r.event_ts_ms,
140            delta: r.delta,
141        })
142        .collect();
143
144    let snapshot_timestamps: Vec<i64> = points.iter().map(|p| p.timestamp_ms).collect();
145    let cumulatives = cumulative_net_deposits_per_snapshot(&deposit_events, &snapshot_timestamps);
146
147    let points = points
148        .into_iter()
149        .zip(cumulatives)
150        .map(|(point, cumulative)| {
151            let attribution = point
152                .attribution
153                .and_then(|bytes| hypercall_db::decode_pnl_attribution_values(&bytes).ok());
154            HistoricalPnlPoint {
155                timestamp: point.timestamp_ms,
156                equity: point.total_equity,
157                attribution,
158                net_deposits: Some(cumulative),
159            }
160        })
161        .collect();
162
163    let response = HistoricalPnlResponse {
164        wallet_address: params.wallet,
165        interval: params.interval,
166        points,
167    };
168
169    Ok(SonicJson(ApiResponse {
170        success: true,
171        data: Some(response),
172        error: None,
173    }))
174}
175
#[cfg(test)]
mod tests {
    use super::*;
    use hypercall_types::{
        HISTORICAL_PNL_INTERVAL_1D_MS, HISTORICAL_PNL_INTERVAL_1H_MS, HISTORICAL_PNL_INTERVAL_5M_MS,
    };

    /// Test shorthand: a ledger event at `ts` whose delta is parsed from a
    /// decimal literal.
    fn ev(ts: i64, delta: &str) -> DepositEvent {
        let delta = delta.parse().expect("valid decimal");
        DepositEvent {
            event_ts_ms: ts,
            delta,
        }
    }

    /// Builds an event list from `(timestamp, delta)` pairs.
    fn ledger(entries: &[(i64, &str)]) -> Vec<DepositEvent> {
        entries.iter().map(|&(ts, delta)| ev(ts, delta)).collect()
    }

    #[test]
    fn test_historical_pnl_interval_mapping() {
        // (variant, expected bucket width, expected wire label)
        let table = [
            (
                HistoricalPnlInterval::FiveMinutes,
                HISTORICAL_PNL_INTERVAL_5M_MS,
                "5m",
            ),
            (
                HistoricalPnlInterval::OneHour,
                HISTORICAL_PNL_INTERVAL_1H_MS,
                "1h",
            ),
            (
                HistoricalPnlInterval::OneDay,
                HISTORICAL_PNL_INTERVAL_1D_MS,
                "1d",
            ),
        ];

        // Millisecond widths first, then labels.
        for (interval, expected_ms, _) in &table {
            assert_eq!(interval.as_ms(), *expected_ms);
        }
        for (interval, _, expected_label) in &table {
            assert_eq!(interval.as_str(), *expected_label);
        }
    }

    #[test]
    fn test_historical_pnl_interval_deserialization() {
        let parse = |raw: &str| sonic_rs::from_str::<HistoricalPnlInterval>(raw);

        let five_minutes = parse("\"5m\"").expect("parse 5m");
        let one_hour = parse("\"1h\"").expect("parse 1h");
        let one_day = parse("\"1d\"").expect("parse 1d");

        assert_eq!(five_minutes, HistoricalPnlInterval::FiveMinutes);
        assert_eq!(one_hour, HistoricalPnlInterval::OneHour);
        assert_eq!(one_day, HistoricalPnlInterval::OneDay);

        // Anything outside the three supported labels must be rejected.
        let unsupported = parse("\"10m\"");
        assert!(
            unsupported.is_err(),
            "invalid interval should fail to deserialize"
        );
    }

    #[test]
    fn net_deposits_is_zero_before_any_event() {
        // The t=50 snapshot predates the only deposit (t=100), so it reads 0.
        let events = ledger(&[(100, "10000")]);
        let totals = cumulative_net_deposits_per_snapshot(&events, &[50, 100, 150]);
        let expected = vec![dec!(0), dec!(10000), dec!(10000)];
        assert_eq!(
            totals, expected,
            "snapshot before the first event should be 0"
        );
    }

    #[test]
    fn net_deposits_accumulates_across_buckets() {
        // Modelled on a real account: three deposits over time, $100K total.
        let events = ledger(&[(1_000, "10000"), (2_000, "10000"), (3_000, "80000")]);
        let totals = cumulative_net_deposits_per_snapshot(&events, &[500, 1_500, 2_500, 3_500]);
        let expected = vec![dec!(0), dec!(10000), dec!(20000), dec!(100000)];
        assert_eq!(totals, expected);
    }

    #[test]
    fn net_deposits_counts_events_equal_to_snapshot_timestamp() {
        // The comparison is inclusive: an event landing exactly on the
        // snapshot boundary belongs to that snapshot.
        let events = ledger(&[(1_000, "5000")]);
        let totals = cumulative_net_deposits_per_snapshot(&events, &[999, 1_000, 1_001]);
        let expected = vec![dec!(0), dec!(5000), dec!(5000)];
        assert_eq!(totals, expected);
    }

    #[test]
    fn net_deposits_applies_same_timestamp_events_in_order() {
        // Several events sharing one timestamp all land in the same snapshot.
        let events = ledger(&[(500, "10000"), (500, "-2500"), (500, "1000")]);
        let totals = cumulative_net_deposits_per_snapshot(&events, &[499, 500, 501]);
        let expected = vec![dec!(0), dec!(8500), dec!(8500)];
        assert_eq!(totals, expected);
    }

    #[test]
    fn net_deposits_handles_withdraws() {
        // deposit, withdraw, deposit — negative deltas pull the total down.
        let events = ledger(&[(100, "50000"), (200, "-20000"), (300, "10000")]);
        let totals = cumulative_net_deposits_per_snapshot(&events, &[50, 150, 250, 350]);
        let expected = vec![dec!(0), dec!(50000), dec!(30000), dec!(40000)];
        assert_eq!(totals, expected);
    }

    #[test]
    fn net_deposits_empty_events_yields_zero_at_every_snapshot() {
        let totals = cumulative_net_deposits_per_snapshot(&[], &[100, 200, 300]);
        let expected = vec![dec!(0); 3];
        assert_eq!(totals, expected);
    }

    #[test]
    fn net_deposits_empty_snapshots_yields_empty_vec() {
        let events = ledger(&[(100, "1000")]);
        let totals = cumulative_net_deposits_per_snapshot(&events, &[]);
        assert_eq!(totals, Vec::<Decimal>::new());
    }
}