Skip to main content

hypercall_db_diesel/
analytics.rs

1//! Analytics trait implementations for DieselDb.
2//!
3//! Implements `AnalyticsReader` and `AnalyticsWriter` from `hypercall-db`,
4//! covering historical PnL, theos, BBO snapshots, vol surfaces, settlement
5//! payouts, instrument existence checks, and trade/fill/order queries.
6
7use std::collections::{HashMap, HashSet};
8
9use anyhow::Result;
10use diesel::prelude::*;
11use diesel::upsert::excluded;
12use diesel_async::{AsyncConnection, RunQueryDsl};
13use rust_decimal::Decimal;
14use struct_convert::Convert;
15
16use crate::diesel_db::DieselDb;
17use crate::models::{
18    BboSnapshot, HistoricalTheoSnapshot, NewBboSnapshot, NewHistoricalPnlSnapshot,
19    NewHistoricalTheoSnapshot, NewVolSurfaceSnapshotRow, SettlementPayout, VolSurfaceSnapshotRow,
20};
21use crate::schema;
22use hypercall_db::{
23    AnalyticsReader, AnalyticsWriter, BboReferenceData, BboSnapshotRecord, FillRecord,
24    HistoricalPnlPoint, HistoricalTheoPoint, NewBboSnapshotInput, OrderRecord,
25    SettlementPayoutRecord, TradeRecord, VolSurfaceSnapshot,
26};
27use hypercall_types::{
28    WalletAddress, HISTORICAL_PNL_INTERVAL_1D_MS, HISTORICAL_PNL_INTERVAL_1H_MS,
29    HISTORICAL_PNL_INTERVAL_5M_MS, HISTORICAL_THEO_INTERVAL_1D_MS, HISTORICAL_THEO_INTERVAL_1H_MS,
30    HISTORICAL_THEO_INTERVAL_5M_MS,
31};
32
33use crate::models::NewSettlementPayoutSeen;
34
35// ---- Private row types for raw SQL queries ----
36
37#[derive(QueryableByName, Debug)]
38struct BboReferenceRow {
39    #[diesel(sql_type = diesel::sql_types::Text)]
40    pub symbol: String,
41    #[diesel(sql_type = diesel::sql_types::Numeric)]
42    pub reference_ask: Decimal,
43    #[diesel(sql_type = diesel::sql_types::BigInt)]
44    pub reference_ts: i64,
45    #[diesel(sql_type = diesel::sql_types::Bool)]
46    pub used_earliest_fallback: bool,
47}
48
49#[derive(QueryableByName, Debug, Convert)]
50#[convert(from_on = "TradeRecord")]
51struct TradeRow {
52    #[diesel(sql_type = diesel::sql_types::BigInt)]
53    pub trade_id: i64,
54    #[diesel(sql_type = diesel::sql_types::Text)]
55    pub symbol: String,
56    #[diesel(sql_type = diesel::sql_types::Numeric)]
57    pub price: Decimal,
58    #[diesel(sql_type = diesel::sql_types::Numeric)]
59    pub size: Decimal,
60    #[diesel(sql_type = diesel::sql_types::Bytea)]
61    pub maker_address: WalletAddress,
62    #[diesel(sql_type = diesel::sql_types::Bytea)]
63    pub taker_address: WalletAddress,
64    #[diesel(sql_type = diesel::sql_types::Numeric)]
65    pub maker_fee: Decimal,
66    #[diesel(sql_type = diesel::sql_types::Numeric)]
67    pub taker_fee: Decimal,
68    #[diesel(sql_type = diesel::sql_types::BigInt)]
69    pub timestamp: i64,
70    #[diesel(sql_type = diesel::sql_types::Nullable<diesel::sql_types::Timestamp>)]
71    #[convert_field(
72        custom_fn = ".map(|dt| chrono::DateTime::<chrono::Utc>::from_naive_utc_and_offset(dt, chrono::Utc))"
73    )]
74    pub created_at: Option<chrono::NaiveDateTime>,
75}
76
77#[derive(QueryableByName, Debug, Convert)]
78#[convert(from_on = "FillRecord")]
79struct FillRow {
80    #[diesel(sql_type = diesel::sql_types::Nullable<diesel::sql_types::BigInt>)]
81    pub fill_id: Option<i64>,
82    #[diesel(sql_type = diesel::sql_types::BigInt)]
83    pub trade_id: i64,
84    #[diesel(sql_type = diesel::sql_types::Bytea)]
85    pub wallet_address: WalletAddress,
86    #[diesel(sql_type = diesel::sql_types::Text)]
87    pub symbol: String,
88    #[diesel(sql_type = diesel::sql_types::Numeric)]
89    pub price: Decimal,
90    #[diesel(sql_type = diesel::sql_types::Numeric)]
91    pub size: Decimal,
92    #[diesel(sql_type = diesel::sql_types::Numeric)]
93    pub fee: Decimal,
94    #[diesel(sql_type = diesel::sql_types::Bool)]
95    pub is_taker: bool,
96    #[diesel(sql_type = diesel::sql_types::BigInt)]
97    pub timestamp: i64,
98    #[diesel(sql_type = diesel::sql_types::Nullable<diesel::sql_types::Timestamp>)]
99    #[convert_field(
100        custom_fn = ".map(|dt| chrono::DateTime::<chrono::Utc>::from_naive_utc_and_offset(dt, chrono::Utc))"
101    )]
102    pub created_at: Option<chrono::NaiveDateTime>,
103    #[diesel(sql_type = diesel::sql_types::Nullable<diesel::sql_types::Bytea>)]
104    pub builder_code_address: Option<WalletAddress>,
105    #[diesel(sql_type = diesel::sql_types::Nullable<diesel::sql_types::Numeric>)]
106    pub builder_code_fee: Option<Decimal>,
107    #[diesel(sql_type = diesel::sql_types::Nullable<diesel::sql_types::Numeric>)]
108    pub realized_pnl: Option<Decimal>,
109    #[diesel(sql_type = diesel::sql_types::Nullable<diesel::sql_types::Numeric>)]
110    pub underlying_notional: Option<Decimal>,
111}
112
113#[derive(QueryableByName, Debug, Convert)]
114#[convert(from_on = "OrderRecord")]
115struct OrderRow {
116    #[diesel(sql_type = diesel::sql_types::BigInt)]
117    pub order_id: i64,
118    #[diesel(sql_type = diesel::sql_types::Bytea)]
119    pub wallet_address: WalletAddress,
120    #[diesel(sql_type = diesel::sql_types::Text)]
121    pub symbol: String,
122    #[diesel(sql_type = diesel::sql_types::Text)]
123    pub side: String,
124    #[diesel(sql_type = diesel::sql_types::Numeric)]
125    pub price: Decimal,
126    #[diesel(sql_type = diesel::sql_types::Numeric)]
127    pub size: Decimal,
128    #[diesel(sql_type = diesel::sql_types::Text)]
129    pub tif: String,
130    #[diesel(sql_type = diesel::sql_types::Nullable<diesel::sql_types::Text>)]
131    pub client_id: Option<String>,
132    #[diesel(sql_type = diesel::sql_types::Bool)]
133    pub is_perp: bool,
134    #[diesel(sql_type = diesel::sql_types::Nullable<diesel::sql_types::Text>)]
135    pub underlying: Option<String>,
136    #[diesel(sql_type = diesel::sql_types::Nullable<diesel::sql_types::Bool>)]
137    pub reduce_only: Option<bool>,
138    #[diesel(sql_type = diesel::sql_types::Text)]
139    pub status: String,
140    #[diesel(sql_type = diesel::sql_types::BigInt)]
141    #[convert_field(rename = "timestamp")]
142    pub created_at: i64,
143    #[diesel(sql_type = diesel::sql_types::Timestamp)]
144    #[convert_field(
145        rename = "created_at",
146        custom_fn = "Some(chrono::DateTime::<chrono::Utc>::from_naive_utc_and_offset(this.updated_at, chrono::Utc))"
147    )]
148    pub updated_at: chrono::NaiveDateTime,
149    #[diesel(sql_type = diesel::sql_types::Numeric)]
150    pub filled_size: Decimal,
151    #[diesel(sql_type = diesel::sql_types::Bool)]
152    pub mmp_enabled: bool,
153}
154
155fn normalize_materialized_order_status_filter(status: &str) -> String {
156    match status.trim().to_ascii_lowercase().as_str() {
157        "open" => "OPEN".to_string(),
158        "acked" => "ACKED".to_string(),
159        "partially_filled" | "partially-filled" => "PARTIALLY_FILLED".to_string(),
160        "filled" => "FILLED".to_string(),
161        "canceled" | "cancelled" => "CANCELED".to_string(),
162        "rejected" => "REJECTED".to_string(),
163        other => other.to_ascii_uppercase().replace('-', "_"),
164    }
165}
166
167// =============================================================================
168// AnalyticsReader implementation
169// =============================================================================
170
171#[async_trait::async_trait]
172impl AnalyticsReader for DieselDb {
173    async fn get_all_trades(&self, limit: usize, offset: usize) -> Result<Vec<TradeRecord>> {
174        let mut conn = self.get_conn().await?;
175
176        let trades = diesel::sql_query(
177            "SELECT trade_id, symbol, price, size,
178                    maker_address, taker_address, maker_fee,
179                    taker_fee, timestamp, created_at
180             FROM trades
181             ORDER BY timestamp DESC
182             LIMIT $1 OFFSET $2",
183        )
184        .bind::<diesel::sql_types::Integer, _>(limit as i32)
185        .bind::<diesel::sql_types::Integer, _>(offset as i32)
186        .load::<TradeRow>(&mut conn)
187        .await?;
188
189        Ok(trades.into_iter().map(Into::into).collect())
190    }
191
192    async fn get_trades_by_option(
193        &self,
194        option_id: &str,
195        limit: usize,
196    ) -> Result<Vec<TradeRecord>> {
197        let mut conn = self.get_conn().await?;
198
199        let trades = diesel::sql_query(
200            "SELECT trade_id, symbol, price, size,
201                    maker_address, taker_address, maker_fee,
202                    taker_fee, timestamp, created_at
203             FROM trades
204             WHERE symbol = $1
205             ORDER BY timestamp DESC
206             LIMIT $2",
207        )
208        .bind::<diesel::sql_types::Text, _>(option_id)
209        .bind::<diesel::sql_types::Integer, _>(limit as i32)
210        .load::<TradeRow>(&mut conn)
211        .await?;
212
213        Ok(trades.into_iter().map(Into::into).collect())
214    }
215
216    async fn get_trades_by_underlying(
217        &self,
218        underlying: &str,
219        limit: usize,
220        offset: usize,
221    ) -> Result<Vec<TradeRecord>> {
222        let mut conn = self.get_conn().await?;
223
224        let trades = diesel::sql_query(
225            "SELECT t.trade_id, t.symbol, t.price, t.size,
226                    t.maker_address, t.taker_address, t.maker_fee,
227                    t.taker_fee, t.timestamp, t.created_at
228             FROM trades t
229             INNER JOIN instruments i ON t.symbol = i.id
230             WHERE i.underlying = $1
231             ORDER BY t.timestamp DESC
232             LIMIT $2 OFFSET $3",
233        )
234        .bind::<diesel::sql_types::Text, _>(underlying)
235        .bind::<diesel::sql_types::Integer, _>(limit as i32)
236        .bind::<diesel::sql_types::Integer, _>(offset as i32)
237        .load::<TradeRow>(&mut conn)
238        .await?;
239
240        Ok(trades.into_iter().map(Into::into).collect())
241    }
242
243    async fn get_trades_by_account(
244        &self,
245        account: &WalletAddress,
246        limit: usize,
247        offset: usize,
248    ) -> Result<Vec<TradeRecord>> {
249        let mut conn = self.get_conn().await?;
250
251        let trades = diesel::sql_query(
252            "SELECT trade_id, symbol, price, size,
253                    maker_address, taker_address, maker_fee,
254                    taker_fee, timestamp, created_at
255             FROM trades
256             WHERE maker_address = $1 OR taker_address = $1
257             ORDER BY timestamp DESC
258             LIMIT $2 OFFSET $3",
259        )
260        .bind::<diesel::sql_types::Bytea, _>(account)
261        .bind::<diesel::sql_types::Integer, _>(limit as i32)
262        .bind::<diesel::sql_types::Integer, _>(offset as i32)
263        .load::<TradeRow>(&mut conn)
264        .await?;
265
266        Ok(trades.into_iter().map(Into::into).collect())
267    }
268
269    async fn get_trades_for_symbol_in_range(
270        &self,
271        symbol: &str,
272        start_time_ms: i64,
273        end_time_ms: i64,
274    ) -> Result<Vec<TradeRecord>> {
275        let mut conn = self.get_conn().await?;
276
277        let trades = diesel::sql_query(
278            "SELECT trade_id, symbol, price, size,
279                    maker_address, taker_address, maker_fee,
280                    taker_fee, timestamp, created_at
281             FROM trades
282             WHERE symbol = $1 AND timestamp >= $2 AND timestamp <= $3
283             ORDER BY timestamp ASC",
284        )
285        .bind::<diesel::sql_types::Text, _>(symbol)
286        .bind::<diesel::sql_types::BigInt, _>(start_time_ms)
287        .bind::<diesel::sql_types::BigInt, _>(end_time_ms)
288        .load::<TradeRow>(&mut conn)
289        .await?;
290
291        Ok(trades.into_iter().map(Into::into).collect())
292    }
293
294    async fn get_fills_by_account(
295        &self,
296        account: &WalletAddress,
297        limit: usize,
298        offset: usize,
299    ) -> Result<Vec<FillRecord>> {
300        let mut conn = self.get_conn().await?;
301
302        let fills = diesel::sql_query(
303            "SELECT fill_id, trade_id, wallet_address, symbol, price, size, fee, is_taker, timestamp, created_at, builder_code_address, builder_code_fee, realized_pnl, underlying_notional
304             FROM fills
305             WHERE wallet_address = $1
306             ORDER BY timestamp DESC
307             LIMIT $2 OFFSET $3",
308        )
309        .bind::<diesel::sql_types::Binary, _>(account)
310        .bind::<diesel::sql_types::Integer, _>(limit as i32)
311        .bind::<diesel::sql_types::Integer, _>(offset as i32)
312        .load::<FillRow>(&mut conn)
313        .await?;
314
315        Ok(fills.into_iter().map(Into::into).collect())
316    }
317
318    async fn get_orders_by_account(
319        &self,
320        account: &WalletAddress,
321        status: Option<&str>,
322        limit: usize,
323        offset: usize,
324    ) -> Result<Vec<OrderRecord>> {
325        let mut conn = self.get_conn().await?;
326
327        let orders = if let Some(status_filter) = status {
328            let normalized_status = normalize_materialized_order_status_filter(status_filter);
329            diesel::sql_query(
330                "SELECT
331                    oi.order_id,
332                    oi.wallet_address,
333                    oi.symbol,
334                    oi.side,
335                    oi.price,
336                    oi.size,
337                    oi.tif,
338                    oi.client_id,
339                    oi.is_perp,
340                    oi.underlying,
341                    oi.reduce_only,
342                    oi.status,
343                    oi.timestamp as created_at,
344                    oi.updated_at,
345                    oi.filled_size,
346                    oi.mmp_enabled
347                FROM order_infos oi
348                WHERE oi.wallet_address = $1 AND oi.status = $2
349                ORDER BY oi.timestamp DESC
350                LIMIT $3 OFFSET $4",
351            )
352            .bind::<diesel::sql_types::Binary, _>(account)
353            .bind::<diesel::sql_types::Text, _>(normalized_status)
354            .bind::<diesel::sql_types::Integer, _>(limit as i32)
355            .bind::<diesel::sql_types::Integer, _>(offset as i32)
356            .load::<OrderRow>(&mut conn)
357            .await?
358        } else {
359            diesel::sql_query(
360                "SELECT
361                    oi.order_id,
362                    oi.wallet_address,
363                    oi.symbol,
364                    oi.side,
365                    oi.price,
366                    oi.size,
367                    oi.tif,
368                    oi.client_id,
369                    oi.is_perp,
370                    oi.underlying,
371                    oi.reduce_only,
372                    oi.status,
373                    oi.timestamp as created_at,
374                    oi.updated_at,
375                    oi.filled_size,
376                    oi.mmp_enabled
377                FROM order_infos oi
378                WHERE oi.wallet_address = $1
379                ORDER BY oi.timestamp DESC
380                LIMIT $2 OFFSET $3",
381            )
382            .bind::<diesel::sql_types::Binary, _>(account)
383            .bind::<diesel::sql_types::Integer, _>(limit as i32)
384            .bind::<diesel::sql_types::Integer, _>(offset as i32)
385            .load::<OrderRow>(&mut conn)
386            .await?
387        };
388
389        Ok(orders.into_iter().map(Into::into).collect())
390    }
391
392    async fn trade_history_exists_for_symbol(&self, symbol: &str) -> Result<bool> {
393        let mut conn = self.get_conn().await?;
394
395        #[derive(QueryableByName, Debug)]
396        struct ExistsRow {
397            #[diesel(sql_type = diesel::sql_types::Bool)]
398            exists: bool,
399        }
400
401        let row =
402            diesel::sql_query("SELECT EXISTS(SELECT 1 FROM trades WHERE symbol = $1) AS exists")
403                .bind::<diesel::sql_types::Text, _>(symbol)
404                .get_result::<ExistsRow>(&mut conn)
405                .await?;
406
407        Ok(row.exists)
408    }
409
410    async fn historical_theo_history_exists_for_symbol(&self, symbol: &str) -> Result<bool> {
411        use schema::historical_theo_snapshots::dsl as ht;
412
413        let mut conn = self.get_conn().await?;
414
415        let exists = diesel::select(diesel::dsl::exists(
416            ht::historical_theo_snapshots.filter(ht::symbol.eq(symbol)),
417        ))
418        .get_result::<bool>(&mut conn)
419        .await?;
420
421        Ok(exists)
422    }
423
424    /// Historical equity point from DB, used by GET /historical-pnl.
425    ///
426    /// When `include_attribution` is false, the large `attribution` bytea is
427    /// omitted from the SELECT.
428    async fn get_historical_pnl(
429        &self,
430        wallet: &WalletAddress,
431        interval_ms: i64,
432        limit: usize,
433        include_attribution: bool,
434    ) -> Result<Vec<HistoricalPnlPoint>> {
435        use schema::historical_pnl_snapshots::dsl as hp;
436
437        let mut conn = self.get_conn().await?;
438
439        let mut rows: Vec<HistoricalPnlPoint> = if include_attribution {
440            hp::historical_pnl_snapshots
441                .filter(hp::wallet_address.eq(wallet))
442                .filter(hp::interval_ms.eq(interval_ms))
443                .order(hp::timestamp_ms.desc())
444                .limit(limit as i64)
445                .select((hp::timestamp_ms, hp::total_equity, hp::attribution))
446                .load::<(i64, Decimal, Option<Vec<u8>>)>(&mut conn)
447                .await?
448                .into_iter()
449                .map(
450                    |(timestamp_ms, total_equity, attribution)| HistoricalPnlPoint {
451                        timestamp_ms,
452                        total_equity,
453                        attribution,
454                    },
455                )
456                .collect()
457        } else {
458            hp::historical_pnl_snapshots
459                .filter(hp::wallet_address.eq(wallet))
460                .filter(hp::interval_ms.eq(interval_ms))
461                .order(hp::timestamp_ms.desc())
462                .limit(limit as i64)
463                .select((hp::timestamp_ms, hp::total_equity))
464                .load::<(i64, Decimal)>(&mut conn)
465                .await?
466                .into_iter()
467                .map(|(timestamp_ms, total_equity)| HistoricalPnlPoint {
468                    timestamp_ms,
469                    total_equity,
470                    attribution: None,
471                })
472                .collect()
473        };
474
475        rows.reverse();
476        Ok(rows)
477    }
478
479    async fn get_historical_theos(
480        &self,
481        symbol: &str,
482        interval_ms: i64,
483        limit: usize,
484    ) -> Result<Vec<HistoricalTheoPoint>> {
485        use schema::historical_theo_snapshots::dsl as ht;
486
487        let mut conn = self.get_conn().await?;
488
489        let mut rows = ht::historical_theo_snapshots
490            .filter(ht::symbol.eq(symbol))
491            .filter(ht::interval_ms.eq(interval_ms))
492            .order(ht::timestamp_ms.desc())
493            .limit(limit as i64)
494            .load::<HistoricalTheoSnapshot>(&mut conn)
495            .await?;
496
497        rows.reverse();
498
499        Ok(rows
500            .into_iter()
501            .map(|row| HistoricalTheoPoint {
502                timestamp_ms: row.timestamp_ms,
503                theoretical_price: row.theoretical_price,
504            })
505            .collect())
506    }
507
508    async fn get_historical_theos_batch(
509        &self,
510        symbols: &[String],
511        interval_ms: i64,
512        limit: usize,
513    ) -> Result<HashMap<String, Vec<HistoricalTheoPoint>>> {
514        use schema::historical_theo_snapshots::dsl as ht;
515
516        if symbols.is_empty() {
517            return Ok(HashMap::new());
518        }
519
520        let mut conn = self.get_conn().await?;
521
522        let rows = ht::historical_theo_snapshots
523            .filter(ht::symbol.eq_any(symbols))
524            .filter(ht::interval_ms.eq(interval_ms))
525            .order((ht::symbol.asc(), ht::timestamp_ms.desc()))
526            .load::<HistoricalTheoSnapshot>(&mut conn)
527            .await?;
528
529        let mut result: HashMap<String, Vec<HistoricalTheoPoint>> = HashMap::new();
530        for row in rows {
531            let entry = result.entry(row.symbol.clone()).or_default();
532            if entry.len() < limit {
533                entry.push(HistoricalTheoPoint {
534                    timestamp_ms: row.timestamp_ms,
535                    theoretical_price: row.theoretical_price,
536                });
537            }
538        }
539
540        for points in result.values_mut() {
541            points.reverse();
542        }
543
544        Ok(result)
545    }
546
547    async fn get_vol_surface_history(
548        &self,
549        underlying: &str,
550        interval_ms: i64,
551        limit: usize,
552    ) -> Result<Vec<VolSurfaceSnapshot>> {
553        use schema::vol_surface_snapshots::dsl as vs;
554
555        let mut conn = self.get_conn().await?;
556
557        let mut rows = vs::vol_surface_snapshots
558            .filter(vs::underlying.eq(underlying))
559            .filter(vs::interval_ms.eq(interval_ms))
560            .order(vs::timestamp_ms.desc())
561            .limit(limit as i64)
562            .load::<VolSurfaceSnapshotRow>(&mut conn)
563            .await?;
564
565        rows.reverse();
566
567        Ok(rows
568            .into_iter()
569            .map(|row| VolSurfaceSnapshot {
570                id: row.id,
571                underlying: row.underlying,
572                interval_ms: row.interval_ms,
573                timestamp_ms: row.timestamp_ms,
574                surface_json: row.surface_json,
575                created_at: row.created_at,
576            })
577            .collect())
578    }
579
580    async fn load_bbo_snapshots_since(&self, cutoff_ts: i64) -> Result<Vec<BboSnapshotRecord>> {
581        use schema::bbo_snapshots::dsl as bbo;
582
583        let mut conn = self.get_conn().await?;
584
585        let rows = bbo::bbo_snapshots
586            .filter(bbo::snapshot_ts.ge(cutoff_ts))
587            .order((bbo::symbol.asc(), bbo::snapshot_ts.asc()))
588            .load::<BboSnapshot>(&mut conn)
589            .await?;
590
591        Ok(rows
592            .into_iter()
593            .map(|row| BboSnapshotRecord {
594                id: row.id,
595                symbol: row.symbol,
596                best_bid: row.best_bid,
597                best_ask: row.best_ask,
598                best_bid_size: row.best_bid_size,
599                best_ask_size: row.best_ask_size,
600                snapshot_ts: row.snapshot_ts,
601                created_at: row.created_at,
602            })
603            .collect())
604    }
605
606    async fn get_bbo_reference_asks(
607        &self,
608        symbols: &[String],
609        cutoff_ts: i64,
610    ) -> Result<HashMap<String, BboReferenceData>> {
611        if symbols.is_empty() {
612            return Ok(HashMap::new());
613        }
614
615        let mut conn = self.get_conn().await?;
616
617        let rows = diesel::sql_query(
618            "WITH requested AS (
619                SELECT UNNEST($1::text[]) AS symbol
620            ),
621            latest_before AS (
622                SELECT DISTINCT ON (b.symbol) b.symbol, b.best_ask, b.snapshot_ts
623                FROM bbo_snapshots b
624                JOIN requested r ON r.symbol = b.symbol
625                WHERE b.snapshot_ts <= $2
626                ORDER BY b.symbol, b.snapshot_ts DESC
627            ),
628            earliest_any AS (
629                SELECT DISTINCT ON (b.symbol) b.symbol, b.best_ask, b.snapshot_ts
630                FROM bbo_snapshots b
631                JOIN requested r ON r.symbol = b.symbol
632                ORDER BY b.symbol, b.snapshot_ts ASC
633            )
634            SELECT r.symbol,
635                   COALESCE(lb.best_ask, ea.best_ask) AS reference_ask,
636                   COALESCE(lb.snapshot_ts, ea.snapshot_ts) AS reference_ts,
637                   (lb.symbol IS NULL AND ea.symbol IS NOT NULL) AS used_earliest_fallback
638            FROM requested r
639            LEFT JOIN latest_before lb USING (symbol)
640            LEFT JOIN earliest_any ea USING (symbol)
641            WHERE COALESCE(lb.symbol, ea.symbol) IS NOT NULL",
642        )
643        .bind::<diesel::sql_types::Array<diesel::sql_types::Text>, _>(symbols)
644        .bind::<diesel::sql_types::BigInt, _>(cutoff_ts)
645        .load::<BboReferenceRow>(&mut conn)
646        .await?;
647
648        Ok(rows
649            .into_iter()
650            .map(|row| {
651                (
652                    row.symbol,
653                    BboReferenceData {
654                        reference_ask: row.reference_ask,
655                        reference_ts: row.reference_ts,
656                        used_earliest_fallback: row.used_earliest_fallback,
657                    },
658                )
659            })
660            .collect())
661    }
662
663    async fn get_settlement_payouts(
664        &self,
665        wallet: &WalletAddress,
666        limit: i64,
667        offset: i64,
668        symbol: Option<&str>,
669        ledger_applied: Option<bool>,
670    ) -> Result<(Vec<SettlementPayoutRecord>, i64)> {
671        use schema::settlement_payouts::dsl as sp;
672
673        let mut conn = self.get_conn().await?;
674
675        let mut count_query = sp::settlement_payouts
676            .filter(sp::wallet.eq(wallet))
677            .into_boxed();
678        let mut rows_query = sp::settlement_payouts
679            .filter(sp::wallet.eq(wallet))
680            .into_boxed();
681
682        if let Some(symbol_filter) = symbol {
683            count_query = count_query.filter(sp::symbol.eq(symbol_filter));
684            rows_query = rows_query.filter(sp::symbol.eq(symbol_filter));
685        }
686
687        if let Some(applied_filter) = ledger_applied {
688            count_query = count_query.filter(sp::ledger_applied.eq(applied_filter));
689            rows_query = rows_query.filter(sp::ledger_applied.eq(applied_filter));
690        }
691
692        let count: i64 = count_query.count().get_result(&mut conn).await?;
693        let rows = rows_query
694            .order(sp::id.desc())
695            .limit(limit)
696            .offset(offset)
697            .load::<SettlementPayout>(&mut conn)
698            .await?;
699
700        Ok((rows.into_iter().map(Into::into).collect(), count))
701    }
702
703    async fn get_seen_settlement_payout_ids(
704        &self,
705        wallet: &WalletAddress,
706        payout_ids: &[i64],
707    ) -> Result<HashSet<i64>> {
708        use schema::settlement_payout_seen::dsl as sps;
709
710        if payout_ids.is_empty() {
711            return Ok(HashSet::new());
712        }
713
714        let mut conn = self.get_conn().await?;
715
716        let rows: Vec<i64> = sps::settlement_payout_seen
717            .filter(sps::wallet.eq(wallet))
718            .filter(sps::payout_id.eq_any(payout_ids))
719            .select(sps::payout_id)
720            .load::<i64>(&mut conn)
721            .await?;
722
723        Ok(rows.into_iter().collect())
724    }
725
726    async fn get_client_ids_by_order_ids(
727        &self,
728        order_ids: &[i64],
729    ) -> Result<HashMap<i64, Option<String>>> {
730        if order_ids.is_empty() {
731            return Ok(HashMap::new());
732        }
733        let mut conn = self.get_conn().await?;
734        let results = crate::schema::order_infos::table
735            .filter(crate::schema::order_infos::order_id.eq_any(order_ids))
736            .select((
737                crate::schema::order_infos::order_id,
738                crate::schema::order_infos::client_id,
739            ))
740            .load::<(i64, Option<String>)>(&mut conn)
741            .await?;
742        Ok(results.into_iter().collect())
743    }
744
745    async fn get_theo_marks_at_timestamp(
746        &self,
747        symbols: &[String],
748        timestamp_ms: i64,
749    ) -> Result<HashMap<String, Decimal>> {
750        if symbols.is_empty() {
751            return Ok(HashMap::new());
752        }
753
754        #[derive(QueryableByName)]
755        struct TheoRow {
756            #[diesel(sql_type = diesel::sql_types::Text)]
757            symbol: String,
758            #[diesel(sql_type = diesel::sql_types::Numeric)]
759            theoretical_price: Decimal,
760        }
761
762        let mut conn = self.get_conn().await?;
763        let rows = diesel::sql_query(
764            "SELECT DISTINCT ON (symbol) symbol, theoretical_price
765             FROM historical_theo_snapshots
766             WHERE symbol = ANY($1) AND timestamp_ms <= $2
767             ORDER BY symbol, timestamp_ms DESC, interval_ms ASC",
768        )
769        .bind::<diesel::sql_types::Array<diesel::sql_types::Text>, _>(symbols)
770        .bind::<diesel::sql_types::BigInt, _>(timestamp_ms)
771        .get_results::<TheoRow>(&mut conn)
772        .await?;
773
774        Ok(rows
775            .into_iter()
776            .map(|row| (row.symbol, row.theoretical_price))
777            .collect())
778    }
779
780    async fn get_deposit_withdraw_events(
781        &self,
782        wallet: &WalletAddress,
783    ) -> Result<Vec<hypercall_db::LedgerEventTimeDelta>> {
784        #[derive(QueryableByName)]
785        struct LedgerEventRow {
786            #[diesel(sql_type = diesel::sql_types::BigInt)]
787            event_ts_ms: i64,
788            #[diesel(sql_type = diesel::sql_types::Numeric)]
789            delta: Decimal,
790        }
791
792        let mut conn = self.get_conn().await?;
793        let rows = diesel::sql_query(
794            "SELECT event_ts_ms, delta
795             FROM ledger_events
796             WHERE wallet = $1 AND event_type IN ('deposit', 'withdraw')
797             ORDER BY event_ts_ms ASC, id ASC",
798        )
799        .bind::<diesel::sql_types::Bytea, _>(wallet)
800        .get_results::<LedgerEventRow>(&mut conn)
801        .await?;
802
803        Ok(rows
804            .into_iter()
805            .map(|r| hypercall_db::LedgerEventTimeDelta {
806                event_ts_ms: r.event_ts_ms,
807                delta: r.delta,
808            })
809            .collect())
810    }
811
812    async fn get_settled_pnl_by_symbol(
813        &self,
814        wallet: &WalletAddress,
815        before_ts_ms: i64,
816    ) -> Result<Vec<(String, Decimal)>> {
817        #[derive(QueryableByName)]
818        struct SettlementRow {
819            #[diesel(sql_type = diesel::sql_types::Text)]
820            reference_symbol: String,
821            #[diesel(sql_type = diesel::sql_types::Numeric)]
822            total_pnl: Decimal,
823        }
824
825        let mut conn = self.get_conn().await?;
826        let rows = diesel::sql_query(
827            "SELECT reference_symbol, SUM(delta) AS total_pnl
828             FROM ledger_events
829             WHERE wallet = $1
830               AND event_ts_ms <= $2
831               AND event_type IN (
832                   'fill_premium',
833                   'fill_realized_pnl',
834                   'settlement_realized_pnl'
835               )
836               AND reference_symbol IS NOT NULL
837             GROUP BY reference_symbol",
838        )
839        .bind::<diesel::sql_types::Bytea, _>(wallet)
840        .bind::<diesel::sql_types::BigInt, _>(before_ts_ms)
841        .get_results::<SettlementRow>(&mut conn)
842        .await?;
843
844        Ok(rows
845            .into_iter()
846            .map(|r| (r.reference_symbol, r.total_pnl))
847            .collect())
848    }
849
850    async fn get_fills_since_timestamp(
851        &self,
852        cutoff_timestamp: i64,
853    ) -> Result<Vec<hypercall_db::FillBackfillRecord>> {
854        #[derive(QueryableByName)]
855        struct FillBackfillRow {
856            #[diesel(sql_type = diesel::sql_types::Text)]
857            symbol: String,
858            #[diesel(sql_type = diesel::sql_types::Numeric)]
859            price: Decimal,
860            #[diesel(sql_type = diesel::sql_types::Numeric)]
861            size: Decimal,
862            #[diesel(sql_type = diesel::sql_types::Nullable<diesel::sql_types::Numeric>)]
863            underlying_notional: Option<Decimal>,
864            #[diesel(sql_type = diesel::sql_types::BigInt)]
865            timestamp: i64,
866        }
867
868        let mut conn = self.get_conn().await?;
869        let rows = diesel::sql_query(
870            "SELECT symbol, price, size, underlying_notional, timestamp
871             FROM fills
872             WHERE timestamp >= $1
873               AND is_taker = true
874             ORDER BY timestamp ASC",
875        )
876        .bind::<diesel::sql_types::BigInt, _>(cutoff_timestamp)
877        .get_results::<FillBackfillRow>(&mut conn)
878        .await?;
879
880        Ok(rows
881            .into_iter()
882            .map(|r| hypercall_db::FillBackfillRecord {
883                symbol: r.symbol,
884                price: r.price,
885                size: r.size,
886                underlying_notional: r.underlying_notional,
887                timestamp: r.timestamp,
888            })
889            .collect())
890    }
891}
892
893// =============================================================================
894// AnalyticsWriter implementation
895// =============================================================================
896
897#[async_trait::async_trait]
898impl AnalyticsWriter for DieselDb {
899    async fn upsert_historical_pnl_batch(
900        &self,
901        interval_ms: i64,
902        timestamp_ms: i64,
903        snapshots: &[(WalletAddress, Decimal, Option<Vec<u8>>)],
904        max_periods: i64,
905    ) -> Result<usize> {
906        use schema::historical_pnl_snapshots::dsl as hp;
907
908        if snapshots.is_empty() {
909            return Ok(0);
910        }
911        if max_periods <= 0 {
912            anyhow::bail!("max_periods must be positive, got {}", max_periods);
913        }
914        if ![
915            HISTORICAL_PNL_INTERVAL_5M_MS,
916            HISTORICAL_PNL_INTERVAL_1H_MS,
917            HISTORICAL_PNL_INTERVAL_1D_MS,
918        ]
919        .contains(&interval_ms)
920        {
921            anyhow::bail!(
922                "Invalid interval_ms {}: must be one of the known intervals (5m, 1h, 1d)",
923                interval_ms
924            );
925        }
926
927        let mut conn = self.get_conn().await?;
928
929        let new_rows: Vec<NewHistoricalPnlSnapshot> = snapshots
930            .iter()
931            .map(|(wallet, equity, attr)| NewHistoricalPnlSnapshot {
932                wallet_address: *wallet,
933                interval_ms,
934                timestamp_ms,
935                total_equity: *equity,
936                attribution: attr.clone(),
937            })
938            .collect();
939
940        let wallet_bytes: Vec<Vec<u8>> = snapshots
941            .iter()
942            .map(|(w, _, _)| w.as_bytes().to_vec())
943            .collect();
944
945        conn.transaction::<_, diesel::result::Error, _>(async |conn| {
946            let affected = diesel::insert_into(hp::historical_pnl_snapshots)
947                .values(&new_rows)
948                .on_conflict((hp::wallet_address, hp::interval_ms, hp::timestamp_ms))
949                .do_update()
950                .set((
951                    hp::total_equity.eq(excluded(hp::total_equity)),
952                    hp::attribution.eq(excluded(hp::attribution)),
953                ))
954                .execute(&mut *conn)
955                .await?;
956
957            diesel::sql_query(
958                "DELETE FROM historical_pnl_snapshots h
959                     USING (
960                        SELECT id FROM (
961                            SELECT id,
962                                   ROW_NUMBER() OVER (
963                                       PARTITION BY wallet_address, interval_ms
964                                       ORDER BY timestamp_ms DESC
965                                   ) AS row_num
966                            FROM historical_pnl_snapshots
967                            WHERE interval_ms = $1
968                              AND wallet_address = ANY($3)
969                        ) ranked
970                        WHERE ranked.row_num > $2
971                     ) stale
972                     WHERE h.id = stale.id",
973            )
974            .bind::<diesel::sql_types::BigInt, _>(interval_ms)
975            .bind::<diesel::sql_types::BigInt, _>(max_periods)
976            .bind::<diesel::sql_types::Array<diesel::sql_types::Bytea>, _>(&wallet_bytes)
977            .execute(&mut *conn)
978            .await?;
979
980            Ok(affected)
981        })
982        .await
983        .map_err(Into::into)
984    }
985
986    async fn upsert_historical_theo_batch(
987        &self,
988        interval_ms: i64,
989        timestamp_ms: i64,
990        snapshots: &[(String, Decimal)],
991        max_periods: i64,
992    ) -> Result<usize> {
993        use schema::historical_theo_snapshots::dsl as ht;
994
995        if snapshots.is_empty() {
996            return Ok(0);
997        }
998        if max_periods <= 0 {
999            anyhow::bail!("max_periods must be positive, got {}", max_periods);
1000        }
1001        if ![
1002            HISTORICAL_THEO_INTERVAL_5M_MS,
1003            HISTORICAL_THEO_INTERVAL_1H_MS,
1004            HISTORICAL_THEO_INTERVAL_1D_MS,
1005        ]
1006        .contains(&interval_ms)
1007        {
1008            anyhow::bail!(
1009                "Invalid interval_ms {}: must be one of the known intervals (5m, 1h, 1d)",
1010                interval_ms
1011            );
1012        }
1013
1014        let mut conn = self.get_conn().await?;
1015
1016        let new_rows: Vec<NewHistoricalTheoSnapshot> = snapshots
1017            .iter()
1018            .map(|(symbol, theoretical_price)| NewHistoricalTheoSnapshot {
1019                symbol: symbol.clone(),
1020                interval_ms,
1021                timestamp_ms,
1022                theoretical_price: *theoretical_price,
1023            })
1024            .collect();
1025
1026        let symbols: Vec<String> = snapshots.iter().map(|(symbol, _)| symbol.clone()).collect();
1027
1028        conn.transaction::<_, diesel::result::Error, _>(async |conn| {
1029            let affected = diesel::insert_into(ht::historical_theo_snapshots)
1030                .values(&new_rows)
1031                .on_conflict((ht::symbol, ht::interval_ms, ht::timestamp_ms))
1032                .do_update()
1033                .set(ht::theoretical_price.eq(excluded(ht::theoretical_price)))
1034                .execute(&mut *conn)
1035                .await?;
1036
1037            diesel::sql_query(
1038                "DELETE FROM historical_theo_snapshots h
1039                     USING (
1040                        SELECT id FROM (
1041                            SELECT id,
1042                                   ROW_NUMBER() OVER (
1043                                       PARTITION BY symbol, interval_ms
1044                                       ORDER BY timestamp_ms DESC
1045                                   ) AS row_num
1046                            FROM historical_theo_snapshots
1047                            WHERE interval_ms = $1
1048                              AND symbol = ANY($3)
1049                        ) ranked
1050                        WHERE ranked.row_num > $2
1051                     ) stale
1052                     WHERE h.id = stale.id",
1053            )
1054            .bind::<diesel::sql_types::BigInt, _>(interval_ms)
1055            .bind::<diesel::sql_types::BigInt, _>(max_periods)
1056            .bind::<diesel::sql_types::Array<diesel::sql_types::Text>, _>(&symbols)
1057            .execute(&mut *conn)
1058            .await?;
1059
1060            Ok(affected)
1061        })
1062        .await
1063        .map_err(Into::into)
1064    }
1065
1066    async fn upsert_vol_surface_snapshot(
1067        &self,
1068        interval_ms: i64,
1069        timestamp_ms: i64,
1070        underlying: &str,
1071        surface_json: serde_json::Value,
1072        max_periods: i64,
1073    ) -> Result<usize> {
1074        use schema::vol_surface_snapshots::dsl as vs;
1075
1076        if max_periods <= 0 {
1077            anyhow::bail!("max_periods must be positive, got {}", max_periods);
1078        }
1079        if ![
1080            HISTORICAL_THEO_INTERVAL_5M_MS,
1081            HISTORICAL_THEO_INTERVAL_1H_MS,
1082            HISTORICAL_THEO_INTERVAL_1D_MS,
1083        ]
1084        .contains(&interval_ms)
1085        {
1086            anyhow::bail!(
1087                "Invalid interval_ms {}: must be one of the known intervals (5m, 1h, 1d)",
1088                interval_ms
1089            );
1090        }
1091
1092        let mut conn = self.get_conn().await?;
1093
1094        let new_row = NewVolSurfaceSnapshotRow {
1095            underlying: underlying.to_string(),
1096            interval_ms,
1097            timestamp_ms,
1098            surface_json,
1099        };
1100
1101        let underlying_owned = underlying.to_string();
1102
1103        conn.transaction::<_, diesel::result::Error, _>(async |conn| {
1104            let affected = diesel::insert_into(vs::vol_surface_snapshots)
1105                .values(&new_row)
1106                .on_conflict((vs::underlying, vs::interval_ms, vs::timestamp_ms))
1107                .do_update()
1108                .set(vs::surface_json.eq(excluded(vs::surface_json)))
1109                .execute(&mut *conn)
1110                .await?;
1111
1112            diesel::sql_query(
1113                "DELETE FROM vol_surface_snapshots v
1114                     USING (
1115                        SELECT id FROM (
1116                            SELECT id,
1117                                   ROW_NUMBER() OVER (
1118                                       PARTITION BY underlying, interval_ms
1119                                       ORDER BY timestamp_ms DESC
1120                                   ) AS row_num
1121                            FROM vol_surface_snapshots
1122                            WHERE interval_ms = $1
1123                              AND underlying = $3
1124                        ) ranked
1125                        WHERE ranked.row_num > $2
1126                     ) stale
1127                     WHERE v.id = stale.id",
1128            )
1129            .bind::<diesel::sql_types::BigInt, _>(interval_ms)
1130            .bind::<diesel::sql_types::BigInt, _>(max_periods)
1131            .bind::<diesel::sql_types::Text, _>(&underlying_owned)
1132            .execute(&mut *conn)
1133            .await?;
1134
1135            Ok(affected)
1136        })
1137        .await
1138        .map_err(Into::into)
1139    }
1140
1141    async fn upsert_bbo_snapshots(
1142        &self,
1143        snapshot_ts: i64,
1144        snapshots: &[NewBboSnapshotInput],
1145    ) -> Result<usize> {
1146        use schema::bbo_snapshots::dsl as bbo;
1147
1148        if snapshots.is_empty() {
1149            return Ok(0);
1150        }
1151
1152        let mut conn = self.get_conn().await?;
1153
1154        let rows: Vec<NewBboSnapshot> = snapshots
1155            .iter()
1156            .map(|snapshot| NewBboSnapshot {
1157                symbol: snapshot.symbol.clone(),
1158                best_bid: snapshot.best_bid,
1159                best_ask: snapshot.best_ask,
1160                best_bid_size: snapshot.best_bid_size,
1161                best_ask_size: snapshot.best_ask_size,
1162                snapshot_ts,
1163            })
1164            .collect();
1165
1166        let affected = diesel::insert_into(bbo::bbo_snapshots)
1167            .values(&rows)
1168            .on_conflict((bbo::symbol, bbo::snapshot_ts))
1169            .do_update()
1170            .set((
1171                bbo::best_bid.eq(excluded(bbo::best_bid)),
1172                bbo::best_ask.eq(excluded(bbo::best_ask)),
1173                bbo::best_bid_size.eq(excluded(bbo::best_bid_size)),
1174                bbo::best_ask_size.eq(excluded(bbo::best_ask_size)),
1175            ))
1176            .execute(&mut conn)
1177            .await?;
1178
1179        Ok(affected)
1180    }
1181
1182    async fn delete_bbo_snapshots_older_than(&self, cutoff_ts: i64) -> Result<usize> {
1183        use schema::bbo_snapshots::dsl as bbo;
1184
1185        let mut conn = self.get_conn().await?;
1186
1187        let deleted = diesel::delete(bbo::bbo_snapshots.filter(bbo::snapshot_ts.lt(cutoff_ts)))
1188            .execute(&mut conn)
1189            .await?;
1190
1191        Ok(deleted)
1192    }
1193
1194    async fn mark_settlement_payouts_seen(
1195        &self,
1196        wallet: &WalletAddress,
1197        payout_ids: &[i64],
1198    ) -> Result<usize> {
1199        use schema::settlement_payout_seen::dsl as sps;
1200        use schema::settlement_payouts::dsl as sp;
1201
1202        if payout_ids.is_empty() {
1203            return Ok(0);
1204        }
1205
1206        let mut conn = self.get_conn().await?;
1207
1208        let valid_ids: Vec<i64> = sp::settlement_payouts
1209            .filter(sp::wallet.eq(wallet))
1210            .filter(sp::id.eq_any(payout_ids))
1211            .select(sp::id)
1212            .load::<i64>(&mut conn)
1213            .await?;
1214
1215        if valid_ids.is_empty() {
1216            return Ok(0);
1217        }
1218
1219        let inserts: Vec<NewSettlementPayoutSeen> = valid_ids
1220            .into_iter()
1221            .map(|payout_id| NewSettlementPayoutSeen {
1222                wallet: *wallet,
1223                payout_id,
1224            })
1225            .collect();
1226
1227        let inserted_rows = diesel::insert_into(sps::settlement_payout_seen)
1228            .values(&inserts)
1229            .on_conflict((sps::wallet, sps::payout_id))
1230            .do_nothing()
1231            .execute(&mut conn)
1232            .await?;
1233
1234        Ok(inserted_rows)
1235    }
1236}
1237
1238#[cfg(test)]
1239mod tests {
1240    use crate::test_helpers::TestDb;
1241    use hypercall_db::AnalyticsReader;
1242    use hypercall_db::CatalogReader;
1243    use hypercall_types::wallet_address::test_wallet;
1244
1245    #[tokio::test]
1246    async fn instrument_exists_empty_db_returns_false() {
1247        let test_db = TestDb::new().await.unwrap();
1248        let db = test_db.diesel_db().await;
1249        let exists = db.instrument_exists("BTC-20260131-100000-C").await.unwrap();
1250        assert!(!exists);
1251    }
1252
1253    #[tokio::test]
1254    async fn get_all_trades_empty_db_returns_empty() {
1255        let test_db = TestDb::new().await.unwrap();
1256        let db = test_db.diesel_db().await;
1257        let trades = db.get_all_trades(100, 0).await.unwrap();
1258        assert!(trades.is_empty());
1259    }
1260
1261    #[tokio::test]
1262    async fn get_historical_pnl_empty_db_returns_empty() {
1263        let test_db = TestDb::new().await.unwrap();
1264        let db = test_db.diesel_db().await;
1265        let pnl = db
1266            .get_historical_pnl(
1267                &test_wallet(60),
1268                hypercall_types::HISTORICAL_PNL_INTERVAL_1H_MS,
1269                100,
1270                false,
1271            )
1272            .await
1273            .unwrap();
1274        assert!(pnl.is_empty());
1275    }
1276
1277    // ---- Convert derive roundtrip tests (no DB needed) ----
1278
1279    use super::*;
1280    use hypercall_db::{FillRecord, OrderRecord, TradeRecord};
1281    use rust_decimal_macros::dec;
1282
1283    #[test]
1284    fn trade_row_to_record_roundtrip() {
1285        let naive_dt = chrono::DateTime::from_timestamp(1_700_000_000, 0)
1286            .unwrap()
1287            .naive_utc();
1288        let row = TradeRow {
1289            trade_id: 42,
1290            symbol: "BTC-20260115-95000-C".to_string(),
1291            price: dec!(100.5),
1292            size: dec!(1000000),
1293            maker_address: test_wallet(1),
1294            taker_address: test_wallet(2),
1295            maker_fee: dec!(0.5),
1296            taker_fee: dec!(0.3),
1297            timestamp: 1_700_000_000_000,
1298            created_at: Some(naive_dt),
1299        };
1300        let record: TradeRecord = row.into();
1301        assert_eq!(record.trade_id, 42);
1302        assert_eq!(record.symbol, "BTC-20260115-95000-C");
1303        assert_eq!(record.price, dec!(100.5));
1304        assert_eq!(record.size, dec!(1000000));
1305        assert_eq!(record.maker_address, test_wallet(1));
1306        assert_eq!(record.taker_address, test_wallet(2));
1307        assert_eq!(record.maker_fee, dec!(0.5));
1308        assert_eq!(record.taker_fee, dec!(0.3));
1309        assert_eq!(record.timestamp, 1_700_000_000_000);
1310        // NaiveDateTime should be converted to DateTime<Utc>
1311        let created = record.created_at.expect("created_at should be Some");
1312        assert_eq!(created.naive_utc(), naive_dt);
1313    }
1314
1315    #[test]
1316    fn trade_row_to_record_none_created_at() {
1317        let row = TradeRow {
1318            trade_id: 1,
1319            symbol: "ETH-20260115-3000-P".to_string(),
1320            price: dec!(50),
1321            size: dec!(100),
1322            maker_address: test_wallet(3),
1323            taker_address: test_wallet(4),
1324            maker_fee: dec!(0),
1325            taker_fee: dec!(0),
1326            timestamp: 0,
1327            created_at: None,
1328        };
1329        let record: TradeRecord = row.into();
1330        assert!(record.created_at.is_none());
1331    }
1332
1333    #[test]
1334    fn fill_row_to_record_roundtrip() {
1335        let naive_dt = chrono::DateTime::from_timestamp(1_700_000_000, 0)
1336            .unwrap()
1337            .naive_utc();
1338        let row = FillRow {
1339            fill_id: Some(99),
1340            trade_id: 42,
1341            wallet_address: test_wallet(5),
1342            symbol: "BTC-20260115-95000-C".to_string(),
1343            price: dec!(100.5),
1344            size: dec!(500),
1345            fee: dec!(0.25),
1346            is_taker: true,
1347            timestamp: 1_700_000_000_000,
1348            created_at: Some(naive_dt),
1349            builder_code_address: Some(test_wallet(10)),
1350            builder_code_fee: Some(dec!(0.1)),
1351            realized_pnl: Some(dec!(42.5)),
1352            underlying_notional: Some(dec!(12345)),
1353        };
1354        let record: FillRecord = row.into();
1355        assert_eq!(record.fill_id, Some(99));
1356        assert_eq!(record.trade_id, 42);
1357        assert_eq!(record.wallet_address, test_wallet(5));
1358        assert_eq!(record.symbol, "BTC-20260115-95000-C");
1359        assert_eq!(record.price, dec!(100.5));
1360        assert_eq!(record.size, dec!(500));
1361        assert_eq!(record.fee, dec!(0.25));
1362        assert!(record.is_taker);
1363        assert_eq!(record.timestamp, 1_700_000_000_000);
1364        let created = record.created_at.expect("created_at should be Some");
1365        assert_eq!(created.naive_utc(), naive_dt);
1366        assert_eq!(record.builder_code_address, Some(test_wallet(10)));
1367        assert_eq!(record.builder_code_fee, Some(dec!(0.1)));
1368        assert_eq!(record.realized_pnl, Some(dec!(42.5)));
1369        assert_eq!(record.underlying_notional, Some(dec!(12345)));
1370    }
1371
1372    #[test]
1373    fn fill_row_to_record_all_optionals_none() {
1374        let row = FillRow {
1375            fill_id: None,
1376            trade_id: 1,
1377            wallet_address: test_wallet(6),
1378            symbol: "ETH-20260115-3000-P".to_string(),
1379            price: dec!(10),
1380            size: dec!(100),
1381            fee: dec!(0),
1382            is_taker: false,
1383            timestamp: 0,
1384            created_at: None,
1385            builder_code_address: None,
1386            builder_code_fee: None,
1387            underlying_notional: None,
1388            realized_pnl: None,
1389        };
1390        let record: FillRecord = row.into();
1391        assert!(record.fill_id.is_none());
1392        assert!(record.created_at.is_none());
1393        assert!(record.builder_code_address.is_none());
1394        assert!(record.builder_code_fee.is_none());
1395        assert!(record.realized_pnl.is_none());
1396        assert!(!record.is_taker);
1397    }
1398
1399    #[test]
1400    fn order_row_to_record_roundtrip() {
1401        let naive_dt = chrono::DateTime::from_timestamp(1_700_000_000, 0)
1402            .unwrap()
1403            .naive_utc();
1404        let row = OrderRow {
1405            order_id: 77,
1406            wallet_address: test_wallet(7),
1407            symbol: "BTC-20260115-95000-C".to_string(),
1408            side: "Buy".to_string(),
1409            price: dec!(200),
1410            size: dec!(10),
1411            tif: "GTC".to_string(),
1412            client_id: Some("my-order-1".to_string()),
1413            is_perp: false,
1414            underlying: Some("BTC".to_string()),
1415            reduce_only: Some(true),
1416            status: "OPEN".to_string(),
1417            // In OrderRow: created_at (i64) is renamed to timestamp in OrderRecord
1418            created_at: 1_700_000_000_000,
1419            // In OrderRow: updated_at (NaiveDateTime) is renamed to created_at in OrderRecord
1420            updated_at: naive_dt,
1421            filled_size: dec!(5),
1422            mmp_enabled: true,
1423        };
1424        let record: OrderRecord = row.into();
1425        assert_eq!(record.order_id, 77);
1426        assert_eq!(record.wallet_address, test_wallet(7));
1427        assert_eq!(record.symbol, "BTC-20260115-95000-C");
1428        assert_eq!(record.side, "Buy");
1429        assert_eq!(record.price, dec!(200));
1430        assert_eq!(record.size, dec!(10));
1431        assert_eq!(record.tif, "GTC");
1432        assert_eq!(record.client_id, Some("my-order-1".to_string()));
1433        assert!(!record.is_perp);
1434        assert_eq!(record.underlying, Some("BTC".to_string()));
1435        assert_eq!(record.reduce_only, Some(true));
1436        assert_eq!(record.status, "OPEN");
1437        assert_eq!(record.filled_size, dec!(5));
1438        // Verify rename mapping: row.created_at (i64) -> record.timestamp
1439        assert_eq!(record.timestamp, 1_700_000_000_000);
1440        // Verify rename mapping: row.updated_at (NaiveDateTime) -> record.created_at (Option<DateTime<Utc>>)
1441        let created = record
1442            .created_at
1443            .expect("created_at should be Some after conversion");
1444        assert_eq!(created.naive_utc(), naive_dt);
1445    }
1446
1447    #[test]
1448    fn order_row_to_record_optionals_none() {
1449        let naive_dt = chrono::DateTime::from_timestamp(1_600_000_000, 0)
1450            .unwrap()
1451            .naive_utc();
1452        let row = OrderRow {
1453            order_id: 1,
1454            wallet_address: test_wallet(8),
1455            symbol: "ETH-20260115-3000-P".to_string(),
1456            side: "Sell".to_string(),
1457            price: dec!(50),
1458            size: dec!(1),
1459            tif: "IOC".to_string(),
1460            client_id: None,
1461            is_perp: true,
1462            underlying: None,
1463            reduce_only: None,
1464            status: "FILLED".to_string(),
1465            created_at: 1_600_000_000_000,
1466            updated_at: naive_dt,
1467            filled_size: dec!(1),
1468            mmp_enabled: false,
1469        };
1470        let record: OrderRecord = row.into();
1471        assert!(record.client_id.is_none());
1472        assert!(record.is_perp);
1473        assert!(record.underlying.is_none());
1474        assert!(record.reduce_only.is_none());
1475        assert!(!record.mmp_enabled);
1476        assert_eq!(record.timestamp, 1_600_000_000_000);
1477        assert!(record.created_at.is_some());
1478    }
1479
1480    // ========================================================================
1481    // Roundtrip integration tests (insert via raw SQL or Writer, read via trait)
1482    // ========================================================================
1483
1484    use hypercall_db::{AnalyticsWriter, NewBboSnapshotInput};
1485    use hypercall_types::HISTORICAL_PNL_INTERVAL_5M_MS;
1486
1487    /// Helper: hex-encode a 20-byte wallet address for SQL literals.
1488    fn wallet_hex(wallet: &hypercall_types::WalletAddress) -> String {
1489        format!("'\\x{}'", hex::encode(wallet.as_bytes()))
1490    }
1491
1492    /// Helper to run raw SQL without ambiguity between sync/async RunQueryDsl.
1493    fn exec_sql(conn: &mut diesel::pg::PgConnection, sql: &str) {
1494        diesel::RunQueryDsl::execute(diesel::sql_query(sql), conn).unwrap();
1495    }
1496
1497    /// Helper to load rows via raw SQL without ambiguity.
1498    fn load_sql<T>(conn: &mut diesel::pg::PgConnection, sql: &str) -> Vec<T>
1499    where
1500        T: diesel::deserialize::QueryableByName<diesel::pg::Pg> + 'static,
1501    {
1502        diesel::RunQueryDsl::load(diesel::sql_query(sql), conn).unwrap()
1503    }
1504
1505    #[tokio::test]
1506    async fn get_all_trades_returns_inserted() {
1507        let test_db = TestDb::new().await.unwrap();
1508        let db = test_db.diesel_db().await;
1509
1510        let mut conn = test_db.handler.pool().get().unwrap();
1511        let w1 = wallet_hex(&test_wallet(1));
1512        let w2 = wallet_hex(&test_wallet(2));
1513
1514        exec_sql(
1515            &mut conn,
1516            &format!(
1517                "INSERT INTO trades (trade_id, symbol, price, size, maker_address, taker_address, maker_fee, taker_fee, timestamp)
1518                 VALUES
1519                   (1, 'BTC-20260131-100000-C', 5.0, 1000000, {w1}, {w2}, 0.01, 0.02, 1000),
1520                   (2, 'BTC-20260131-100000-C', 5.5, 2000000, {w1}, {w2}, 0.01, 0.02, 2000),
1521                   (3, 'ETH-20260131-4000-P', 1.0, 500000, {w2}, {w1}, 0.005, 0.01, 3000)"
1522            ),
1523        );
1524
1525        let trades = AnalyticsReader::get_all_trades(&db, 10, 0).await.unwrap();
1526        assert_eq!(trades.len(), 3);
1527        // Ordered DESC by timestamp
1528        assert_eq!(trades[0].trade_id, 3);
1529        assert_eq!(trades[1].trade_id, 2);
1530        assert_eq!(trades[2].trade_id, 1);
1531    }
1532
1533    #[tokio::test]
1534    async fn get_trades_by_option_filters_correctly() {
1535        let test_db = TestDb::new().await.unwrap();
1536        let db = test_db.diesel_db().await;
1537
1538        let mut conn = test_db.handler.pool().get().unwrap();
1539        let w1 = wallet_hex(&test_wallet(1));
1540        let w2 = wallet_hex(&test_wallet(2));
1541
1542        exec_sql(
1543            &mut conn,
1544            &format!(
1545                "INSERT INTO trades (trade_id, symbol, price, size, maker_address, taker_address, maker_fee, taker_fee, timestamp)
1546                 VALUES
1547                   (10, 'BTC-20260131-100000-C', 5.0, 1000000, {w1}, {w2}, 0.01, 0.02, 1000),
1548                   (11, 'BTC-20260131-100000-C', 6.0, 1000000, {w1}, {w2}, 0.01, 0.02, 2000),
1549                   (12, 'ETH-20260131-4000-P', 1.0, 500000, {w2}, {w1}, 0.005, 0.01, 3000)"
1550            ),
1551        );
1552
1553        let trades = db
1554            .get_trades_by_option("BTC-20260131-100000-C", 10)
1555            .await
1556            .unwrap();
1557        assert_eq!(trades.len(), 2);
1558        // All should be BTC option
1559        assert!(trades.iter().all(|t| t.symbol == "BTC-20260131-100000-C"));
1560        // DESC ordering
1561        assert_eq!(trades[0].trade_id, 11);
1562        assert_eq!(trades[1].trade_id, 10);
1563
1564        let eth_trades = db
1565            .get_trades_by_option("ETH-20260131-4000-P", 10)
1566            .await
1567            .unwrap();
1568        assert_eq!(eth_trades.len(), 1);
1569        assert_eq!(eth_trades[0].trade_id, 12);
1570    }
1571
1572    #[tokio::test]
1573    async fn get_trades_by_underlying_filters_correctly() {
1574        let test_db = TestDb::new().await.unwrap();
1575        let db = test_db.diesel_db().await;
1576
1577        let mut conn = test_db.handler.pool().get().unwrap();
1578        let w1 = wallet_hex(&test_wallet(1));
1579        let w2 = wallet_hex(&test_wallet(2));
1580
1581        // Insert instruments first (needed for JOIN)
1582        exec_sql(
1583            &mut conn,
1584            "INSERT INTO instruments (id, underlying, strike, expiry, option_type, status)
1585             VALUES
1586               ('BTC-20260131-100000-C', 'BTC', 100000, 1738300800, 'C', 'ACTIVE'),
1587               ('ETH-20260131-4000-P', 'ETH', 4000, 1738300800, 'P', 'ACTIVE')",
1588        );
1589
1590        exec_sql(
1591            &mut conn,
1592            &format!(
1593                "INSERT INTO trades (trade_id, symbol, price, size, maker_address, taker_address, maker_fee, taker_fee, timestamp)
1594                 VALUES
1595                   (20, 'BTC-20260131-100000-C', 5.0, 1000000, {w1}, {w2}, 0.01, 0.02, 1000),
1596                   (21, 'BTC-20260131-100000-C', 6.0, 2000000, {w1}, {w2}, 0.01, 0.02, 2000),
1597                   (22, 'ETH-20260131-4000-P', 1.0, 500000, {w2}, {w1}, 0.005, 0.01, 3000)"
1598            ),
1599        );
1600
1601        let btc_trades = db.get_trades_by_underlying("BTC", 10, 0).await.unwrap();
1602        assert_eq!(btc_trades.len(), 2);
1603        assert!(btc_trades
1604            .iter()
1605            .all(|t| t.symbol == "BTC-20260131-100000-C"));
1606
1607        let eth_trades = db.get_trades_by_underlying("ETH", 10, 0).await.unwrap();
1608        assert_eq!(eth_trades.len(), 1);
1609        assert_eq!(eth_trades[0].symbol, "ETH-20260131-4000-P");
1610    }
1611
1612    #[tokio::test]
1613    async fn get_fills_by_account_roundtrip() {
1614        let test_db = TestDb::new().await.unwrap();
1615        let db = test_db.diesel_db().await;
1616
1617        let mut conn = test_db.handler.pool().get().unwrap();
1618        let w1 = wallet_hex(&test_wallet(1));
1619        let w2 = wallet_hex(&test_wallet(2));
1620        let wallet1 = test_wallet(1);
1621        let wallet2 = test_wallet(2);
1622
1623        // Insert parent trades first (fills reference trade_id FK)
1624        exec_sql(
1625            &mut conn,
1626            &format!(
1627                "INSERT INTO trades (trade_id, symbol, price, size, maker_address, taker_address, maker_fee, taker_fee, timestamp)
1628                 VALUES
1629                   (30, 'BTC-20260131-100000-C', 5.0, 1000000, {w1}, {w2}, 0.01, 0.02, 1000),
1630                   (31, 'BTC-20260131-100000-C', 6.0, 2000000, {w1}, {w2}, 0.01, 0.02, 2000)"
1631            ),
1632        );
1633
1634        // Insert fills for two wallets
1635        exec_sql(
1636            &mut conn,
1637            &format!(
1638                "INSERT INTO fills (trade_id, wallet_address, symbol, price, size, fee, is_taker, timestamp)
1639                 VALUES
1640                   (30, {w1}, 'BTC-20260131-100000-C', 5.0, 1000000, 0.01, false, 1000),
1641                   (30, {w2}, 'BTC-20260131-100000-C', 5.0, 1000000, 0.02, true, 1000),
1642                   (31, {w1}, 'BTC-20260131-100000-C', 6.0, 2000000, 0.01, false, 2000)"
1643            ),
1644        );
1645
1646        let fills_w1 = db.get_fills_by_account(&wallet1, 10, 0).await.unwrap();
1647        assert_eq!(fills_w1.len(), 2);
1648        // All for wallet1
1649        assert!(fills_w1.iter().all(|f| f.wallet_address == wallet1));
1650        // DESC ordering
1651        assert_eq!(fills_w1[0].trade_id, 31);
1652        assert_eq!(fills_w1[1].trade_id, 30);
1653
1654        let fills_w2 = db.get_fills_by_account(&wallet2, 10, 0).await.unwrap();
1655        assert_eq!(fills_w2.len(), 1);
1656        assert_eq!(fills_w2[0].wallet_address, wallet2);
1657        assert!(fills_w2[0].is_taker);
1658    }
1659
1660    #[tokio::test]
1661    async fn get_orders_by_account_roundtrip() {
1662        let test_db = TestDb::new().await.unwrap();
1663        let db = test_db.diesel_db().await;
1664
1665        let mut conn = test_db.handler.pool().get().unwrap();
1666        let w1 = wallet_hex(&test_wallet(1));
1667        let w2 = wallet_hex(&test_wallet(2));
1668        let wallet1 = test_wallet(1);
1669
1670        exec_sql(
1671            &mut conn,
1672            &format!(
1673                "INSERT INTO order_infos (order_id, wallet_address, symbol, side, price, size, tif, is_perp, timestamp, status, filled_size, updated_at)
1674                 VALUES
1675                   (100, {w1}, 'BTC-20260131-100000-C', 'Buy', 5.0, 1000000, 'GTC', false, 1000, 'OPEN', 0, '2026-01-01 00:00:00'),
1676                   (101, {w1}, 'BTC-20260131-100000-C', 'Sell', 6.0, 2000000, 'GTC', false, 2000, 'FILLED', 2000000, '2026-01-01 00:01:00'),
1677                   (102, {w2}, 'ETH-20260131-4000-P', 'Buy', 1.0, 500000, 'IOC', false, 3000, 'CANCELED', 0, '2026-01-01 00:02:00')"
1678            ),
1679        );
1680
1681        let orders = db
1682            .get_orders_by_account(&wallet1, None, 10, 0)
1683            .await
1684            .unwrap();
1685        assert_eq!(orders.len(), 2);
1686        assert!(orders.iter().all(|o| o.wallet_address == wallet1));
1687        // DESC by timestamp
1688        assert_eq!(orders[0].order_id, 101);
1689        assert_eq!(orders[1].order_id, 100);
1690    }
1691
1692    #[tokio::test]
1693    async fn get_orders_by_account_with_status_filter() {
1694        let test_db = TestDb::new().await.unwrap();
1695        let db = test_db.diesel_db().await;
1696
1697        let mut conn = test_db.handler.pool().get().unwrap();
1698        let w1 = wallet_hex(&test_wallet(1));
1699        let wallet1 = test_wallet(1);
1700
1701        exec_sql(
1702            &mut conn,
1703            &format!(
1704                "INSERT INTO order_infos (order_id, wallet_address, symbol, side, price, size, tif, is_perp, timestamp, status, filled_size, updated_at)
1705                 VALUES
1706                   (200, {w1}, 'BTC-20260131-100000-C', 'Buy', 5.0, 1000000, 'GTC', false, 1000, 'OPEN', 0, '2026-01-01 00:00:00'),
1707                   (201, {w1}, 'BTC-20260131-100000-C', 'Buy', 5.5, 1000000, 'GTC', false, 2000, 'OPEN', 0, '2026-01-01 00:01:00'),
1708                   (202, {w1}, 'BTC-20260131-100000-C', 'Sell', 6.0, 2000000, 'GTC', false, 3000, 'FILLED', 2000000, '2026-01-01 00:02:00'),
1709                   (203, {w1}, 'ETH-20260131-4000-P', 'Buy', 1.0, 500000, 'IOC', false, 4000, 'CANCELED', 0, '2026-01-01 00:03:00')"
1710            ),
1711        );
1712
1713        let open_orders = db
1714            .get_orders_by_account(&wallet1, Some("OPEN"), 10, 0)
1715            .await
1716            .unwrap();
1717        assert_eq!(open_orders.len(), 2);
1718        assert!(open_orders.iter().all(|o| o.status == "OPEN"));
1719
1720        let filled_orders = db
1721            .get_orders_by_account(&wallet1, Some("FILLED"), 10, 0)
1722            .await
1723            .unwrap();
1724        assert_eq!(filled_orders.len(), 1);
1725        assert_eq!(filled_orders[0].order_id, 202);
1726
1727        // Test lowercase normalization
1728        let canceled = db
1729            .get_orders_by_account(&wallet1, Some("canceled"), 10, 0)
1730            .await
1731            .unwrap();
1732        assert_eq!(canceled.len(), 1);
1733        assert_eq!(canceled[0].order_id, 203);
1734    }
1735
1736    #[tokio::test]
1737    async fn upsert_and_get_historical_pnl_roundtrip() {
1738        let test_db = TestDb::new().await.unwrap();
1739        let db = test_db.diesel_db().await;
1740
1741        let wallet = test_wallet(50);
1742        let interval = HISTORICAL_PNL_INTERVAL_5M_MS;
1743        let attribution_data = vec![1u8, 2, 3, 4];
1744
1745        // Call with single wallet, two distinct timestamps
1746        let snap_t1 = vec![(wallet, dec!(1000.5), Some(attribution_data.clone()))];
1747        let snap_t2 = vec![(wallet, dec!(1050.25), None)];
1748
1749        AnalyticsWriter::upsert_historical_pnl_batch(&db, interval, 300_000, &snap_t1, 100)
1750            .await
1751            .unwrap();
1752        AnalyticsWriter::upsert_historical_pnl_batch(&db, interval, 600_000, &snap_t2, 100)
1753            .await
1754            .unwrap();
1755
1756        // Read back (with attribution)
1757        let points = db
1758            .get_historical_pnl(&wallet, interval, 100, true)
1759            .await
1760            .unwrap();
1761        assert_eq!(points.len(), 2);
1762        // Returned in ASC order (reversed from DESC query)
1763        assert_eq!(points[0].timestamp_ms, 300_000);
1764        assert_eq!(points[0].total_equity, dec!(1000.5));
1765        assert_eq!(points[0].attribution, Some(attribution_data));
1766        assert_eq!(points[1].timestamp_ms, 600_000);
1767        assert_eq!(points[1].total_equity, dec!(1050.25));
1768        assert_eq!(points[1].attribution, None);
1769
1770        // Read without attribution
1771        let points_no_attr = db
1772            .get_historical_pnl(&wallet, interval, 100, false)
1773            .await
1774            .unwrap();
1775        assert_eq!(points_no_attr.len(), 2);
1776        assert!(points_no_attr[0].attribution.is_none());
1777        assert!(points_no_attr[1].attribution.is_none());
1778    }
1779
1780    #[tokio::test]
1781    async fn upsert_and_get_historical_theos_roundtrip() {
1782        let test_db = TestDb::new().await.unwrap();
1783        let db = test_db.diesel_db().await;
1784
1785        let interval = hypercall_types::HISTORICAL_THEO_INTERVAL_5M_MS;
1786        let symbol = "BTC-20260131-100000-C".to_string();
1787
1788        let snap_t1 = vec![(symbol.clone(), dec!(5.25))];
1789        let snap_t2 = vec![(symbol.clone(), dec!(5.50))];
1790
1791        AnalyticsWriter::upsert_historical_theo_batch(&db, interval, 300_000, &snap_t1, 100)
1792            .await
1793            .unwrap();
1794        AnalyticsWriter::upsert_historical_theo_batch(&db, interval, 600_000, &snap_t2, 100)
1795            .await
1796            .unwrap();
1797
1798        let points = db
1799            .get_historical_theos(&symbol, interval, 100)
1800            .await
1801            .unwrap();
1802        assert_eq!(points.len(), 2);
1803        // Returned in ASC order
1804        assert_eq!(points[0].timestamp_ms, 300_000);
1805        assert_eq!(points[0].theoretical_price, dec!(5.25));
1806        assert_eq!(points[1].timestamp_ms, 600_000);
1807        assert_eq!(points[1].theoretical_price, dec!(5.50));
1808    }
1809
1810    #[tokio::test]
1811    async fn upsert_and_load_bbo_snapshots_roundtrip() {
1812        let test_db = TestDb::new().await.unwrap();
1813        let db = test_db.diesel_db().await;
1814
1815        let snapshots = vec![
1816            NewBboSnapshotInput {
1817                symbol: "BTC-20260131-100000-C".to_string(),
1818                best_bid: dec!(4.5),
1819                best_ask: dec!(5.0),
1820                best_bid_size: Some(dec!(100)),
1821                best_ask_size: Some(dec!(200)),
1822                snapshot_ts: 1000,
1823            },
1824            NewBboSnapshotInput {
1825                symbol: "ETH-20260131-4000-P".to_string(),
1826                best_bid: dec!(0.8),
1827                best_ask: dec!(1.0),
1828                best_bid_size: None,
1829                best_ask_size: None,
1830                snapshot_ts: 1000,
1831            },
1832        ];
1833
1834        let affected = AnalyticsWriter::upsert_bbo_snapshots(&db, 1000, &snapshots)
1835            .await
1836            .unwrap();
1837        assert_eq!(affected, 2);
1838
1839        let loaded = db.load_bbo_snapshots_since(0).await.unwrap();
1840        assert_eq!(loaded.len(), 2);
1841        // Ordered by symbol ASC, snapshot_ts ASC
1842        assert_eq!(loaded[0].symbol, "BTC-20260131-100000-C");
1843        assert_eq!(loaded[0].best_bid, dec!(4.5));
1844        assert_eq!(loaded[0].best_ask, dec!(5.0));
1845        assert_eq!(loaded[0].best_bid_size, Some(dec!(100)));
1846        assert_eq!(loaded[0].best_ask_size, Some(dec!(200)));
1847        assert_eq!(loaded[0].snapshot_ts, 1000);
1848
1849        assert_eq!(loaded[1].symbol, "ETH-20260131-4000-P");
1850        assert_eq!(loaded[1].best_bid, dec!(0.8));
1851        assert_eq!(loaded[1].best_ask, dec!(1.0));
1852        assert_eq!(loaded[1].best_bid_size, None);
1853        assert_eq!(loaded[1].best_ask_size, None);
1854    }
1855
1856    #[tokio::test]
1857    async fn delete_bbo_snapshots_older_than_removes_old() {
1858        let test_db = TestDb::new().await.unwrap();
1859        let db = test_db.diesel_db().await;
1860
1861        let old_snap = vec![NewBboSnapshotInput {
1862            symbol: "BTC-20260131-100000-C".to_string(),
1863            best_bid: dec!(4.0),
1864            best_ask: dec!(4.5),
1865            best_bid_size: Some(dec!(50)),
1866            best_ask_size: Some(dec!(60)),
1867            snapshot_ts: 100,
1868        }];
1869        let new_snap = vec![NewBboSnapshotInput {
1870            symbol: "BTC-20260131-100000-C".to_string(),
1871            best_bid: dec!(5.0),
1872            best_ask: dec!(5.5),
1873            best_bid_size: Some(dec!(70)),
1874            best_ask_size: Some(dec!(80)),
1875            snapshot_ts: 2000,
1876        }];
1877
1878        AnalyticsWriter::upsert_bbo_snapshots(&db, 100, &old_snap)
1879            .await
1880            .unwrap();
1881        AnalyticsWriter::upsert_bbo_snapshots(&db, 2000, &new_snap)
1882            .await
1883            .unwrap();
1884
1885        // Delete snapshots older than ts=1000
1886        let deleted = db.delete_bbo_snapshots_older_than(1000).await.unwrap();
1887        assert_eq!(deleted, 1);
1888
1889        let remaining = db.load_bbo_snapshots_since(0).await.unwrap();
1890        assert_eq!(remaining.len(), 1);
1891        assert_eq!(remaining[0].snapshot_ts, 2000);
1892    }
1893
1894    #[tokio::test]
1895    async fn upsert_and_get_vol_surface_roundtrip() {
1896        let test_db = TestDb::new().await.unwrap();
1897        let db = test_db.diesel_db().await;
1898
1899        let interval = hypercall_types::HISTORICAL_THEO_INTERVAL_5M_MS;
1900        let surface = serde_json::json!({
1901            "model": "ssvi",
1902            "params": [0.1, 0.2, 0.3]
1903        });
1904
1905        let affected = AnalyticsWriter::upsert_vol_surface_snapshot(
1906            &db,
1907            interval,
1908            300_000,
1909            "BTC",
1910            surface.clone(),
1911            100,
1912        )
1913        .await
1914        .unwrap();
1915        assert_eq!(affected, 1);
1916
1917        // Insert a second snapshot at different timestamp
1918        let surface2 = serde_json::json!({"model": "sabr", "params": [0.4, 0.5]});
1919        AnalyticsWriter::upsert_vol_surface_snapshot(
1920            &db,
1921            interval,
1922            600_000,
1923            "BTC",
1924            surface2.clone(),
1925            100,
1926        )
1927        .await
1928        .unwrap();
1929
1930        let history = db
1931            .get_vol_surface_history("BTC", interval, 100)
1932            .await
1933            .unwrap();
1934        assert_eq!(history.len(), 2);
1935        // Returned in ASC order
1936        assert_eq!(history[0].timestamp_ms, 300_000);
1937        assert_eq!(history[0].surface_json, surface);
1938        assert_eq!(history[0].underlying, "BTC");
1939        assert_eq!(history[1].timestamp_ms, 600_000);
1940        assert_eq!(history[1].surface_json, surface2);
1941    }
1942
1943    #[tokio::test]
1944    async fn mark_and_get_seen_settlement_payouts() {
1945        let test_db = TestDb::new().await.unwrap();
1946        let db = test_db.diesel_db().await;
1947
1948        let mut conn = test_db.handler.pool().get().unwrap();
1949        let wallet = test_wallet(70);
1950        let w_hex = wallet_hex(&wallet);
1951
1952        // Insert settlement payouts directly
1953        exec_sql(
1954            &mut conn,
1955            &format!(
1956                "INSERT INTO settlement_payouts (wallet, symbol, expiry_ts, position_size, settlement_price, payout_amount, ledger_applied)
1957                 VALUES
1958                   ({w_hex}, 'BTC-20260131-100000-C', 1738300800, 1000000, 100000, 5000, true),
1959                   ({w_hex}, 'ETH-20260131-4000-P', 1738300800, 500000, 4000, 200, true)"
1960            ),
1961        );
1962
1963        // Get the IDs of inserted payouts
1964        #[derive(diesel::QueryableByName)]
1965        struct IdRow {
1966            #[diesel(sql_type = diesel::sql_types::BigInt)]
1967            id: i64,
1968        }
1969        let ids: Vec<IdRow> = load_sql(
1970            &mut conn,
1971            &format!("SELECT id FROM settlement_payouts WHERE wallet = {w_hex} ORDER BY id"),
1972        );
1973        let payout_ids: Vec<i64> = ids.iter().map(|r| r.id).collect();
1974        assert_eq!(payout_ids.len(), 2);
1975
1976        // Before marking, none should be seen
1977        let seen_before = db
1978            .get_seen_settlement_payout_ids(&wallet, &payout_ids)
1979            .await
1980            .unwrap();
1981        assert!(seen_before.is_empty());
1982
1983        // Mark first payout as seen
1984        let marked = AnalyticsWriter::mark_settlement_payouts_seen(&db, &wallet, &[payout_ids[0]])
1985            .await
1986            .unwrap();
1987        assert_eq!(marked, 1);
1988
1989        // Verify only first is seen
1990        let seen_after = db
1991            .get_seen_settlement_payout_ids(&wallet, &payout_ids)
1992            .await
1993            .unwrap();
1994        assert_eq!(seen_after.len(), 1);
1995        assert!(seen_after.contains(&payout_ids[0]));
1996        assert!(!seen_after.contains(&payout_ids[1]));
1997
1998        // Mark both (idempotent for already-seen)
1999        AnalyticsWriter::mark_settlement_payouts_seen(&db, &wallet, &payout_ids)
2000            .await
2001            .unwrap();
2002        let seen_all = db
2003            .get_seen_settlement_payout_ids(&wallet, &payout_ids)
2004            .await
2005            .unwrap();
2006        assert_eq!(seen_all.len(), 2);
2007    }
2008
2009    #[tokio::test]
2010    async fn instrument_exists_true_after_insert() {
2011        let test_db = TestDb::new().await.unwrap();
2012        let db = test_db.diesel_db().await;
2013
2014        let mut conn = test_db.handler.pool().get().unwrap();
2015
2016        // Before insert
2017        assert!(!db.instrument_exists("BTC-20260131-100000-C").await.unwrap());
2018
2019        exec_sql(
2020            &mut conn,
2021            "INSERT INTO instruments (id, underlying, strike, expiry, option_type, status)
2022             VALUES ('BTC-20260131-100000-C', 'BTC', 100000, 1738300800, 'C', 'ACTIVE')",
2023        );
2024
2025        // After insert
2026        assert!(db.instrument_exists("BTC-20260131-100000-C").await.unwrap());
2027        // Different symbol still false
2028        assert!(!db.instrument_exists("ETH-20260131-4000-P").await.unwrap());
2029    }
2030
2031    #[tokio::test]
2032    async fn get_client_ids_by_order_ids_roundtrip() {
2033        let test_db = TestDb::new().await.unwrap();
2034        let db = test_db.diesel_db().await;
2035
2036        let mut conn = test_db.handler.pool().get().unwrap();
2037        let w1 = wallet_hex(&test_wallet(1));
2038
2039        exec_sql(
2040            &mut conn,
2041            &format!(
2042                "INSERT INTO order_infos (order_id, wallet_address, symbol, side, price, size, tif, is_perp, timestamp, status, filled_size, updated_at, client_id)
2043                 VALUES
2044                   (300, {w1}, 'BTC-20260131-100000-C', 'Buy', 5.0, 1000000, 'GTC', false, 1000, 'OPEN', 0, '2026-01-01 00:00:00', 'client-alpha'),
2045                   (301, {w1}, 'BTC-20260131-100000-C', 'Sell', 6.0, 2000000, 'GTC', false, 2000, 'OPEN', 0, '2026-01-01 00:01:00', NULL),
2046                   (302, {w1}, 'ETH-20260131-4000-P', 'Buy', 1.0, 500000, 'IOC', false, 3000, 'FILLED', 500000, '2026-01-01 00:02:00', 'client-beta')"
2047            ),
2048        );
2049
2050        let result = db
2051            .get_client_ids_by_order_ids(&[300, 301, 302, 999])
2052            .await
2053            .unwrap();
2054        assert_eq!(result.len(), 3); // 999 not found
2055        assert_eq!(result[&300], Some("client-alpha".to_string()));
2056        assert_eq!(result[&301], None);
2057        assert_eq!(result[&302], Some("client-beta".to_string()));
2058        assert!(!result.contains_key(&999));
2059    }
2060
2061    // ========================================================================
2062    // Tests for newly added trait methods
2063    // ========================================================================
2064
2065    #[tokio::test]
2066    async fn get_theo_marks_at_timestamp_picks_latest_before() {
2067        let test_db = TestDb::new().await.unwrap();
2068        let db = test_db.diesel_db().await;
2069
2070        let interval = hypercall_types::HISTORICAL_THEO_INTERVAL_5M_MS;
2071        let symbol = "BTC-20260131-100000-C".to_string();
2072
2073        // Insert theos at different timestamps
2074        AnalyticsWriter::upsert_historical_theo_batch(
2075            &db,
2076            interval,
2077            1000,
2078            &[(symbol.clone(), dec!(5.0))],
2079            100,
2080        )
2081        .await
2082        .unwrap();
2083
2084        AnalyticsWriter::upsert_historical_theo_batch(
2085            &db,
2086            interval,
2087            2000,
2088            &[(symbol.clone(), dec!(7.5))],
2089            100,
2090        )
2091        .await
2092        .unwrap();
2093
2094        AnalyticsWriter::upsert_historical_theo_batch(
2095            &db,
2096            interval,
2097            3000,
2098            &[(symbol.clone(), dec!(9.0))],
2099            100,
2100        )
2101        .await
2102        .unwrap();
2103
2104        // Query at ts=2500 should pick ts=2000 (latest before 2500)
2105        let marks = db
2106            .get_theo_marks_at_timestamp(&[symbol.clone()], 2500)
2107            .await
2108            .unwrap();
2109        assert_eq!(marks.len(), 1);
2110        assert_eq!(marks[&symbol], dec!(7.5));
2111
2112        // Query at ts=3000 should pick ts=3000 (equal counts)
2113        let marks2 = db
2114            .get_theo_marks_at_timestamp(&[symbol.clone()], 3000)
2115            .await
2116            .unwrap();
2117        assert_eq!(marks2[&symbol], dec!(9.0));
2118
2119        // Query at ts=500 should return empty (no data before 500)
2120        let marks3 = db
2121            .get_theo_marks_at_timestamp(&[symbol.clone()], 500)
2122            .await
2123            .unwrap();
2124        assert!(marks3.is_empty());
2125    }
2126
2127    #[tokio::test]
2128    async fn get_theo_marks_at_timestamp_empty_symbols() {
2129        let test_db = TestDb::new().await.unwrap();
2130        let db = test_db.diesel_db().await;
2131
2132        let marks = db.get_theo_marks_at_timestamp(&[], 1000).await.unwrap();
2133        assert!(marks.is_empty());
2134    }
2135
2136    #[tokio::test]
2137    async fn get_deposit_withdraw_events_returns_ordered() {
2138        let test_db = TestDb::new().await.unwrap();
2139        let db = test_db.diesel_db().await;
2140
2141        let mut conn = test_db.handler.pool().get().unwrap();
2142        let wallet = test_wallet(80);
2143        let w_hex = wallet_hex(&wallet);
2144
2145        // Insert deposit and withdraw events
2146        exec_sql(
2147            &mut conn,
2148            &format!(
2149                "INSERT INTO ledger_events (wallet, event_ts_ms, delta, event_type) VALUES \
2150                 ({w_hex}, 1000, 500.0, 'deposit'), \
2151                 ({w_hex}, 2000, -100.0, 'withdraw'), \
2152                 ({w_hex}, 3000, 200.0, 'deposit'), \
2153                 ({w_hex}, 2500, 50.0, 'fill_premium')"
2154            ),
2155        );
2156
2157        let events = db.get_deposit_withdraw_events(&wallet).await.unwrap();
2158        // Should only return deposit and withdraw, not fill_premium
2159        assert_eq!(events.len(), 3);
2160        // Ordered by event_ts_ms ASC
2161        assert_eq!(events[0].event_ts_ms, 1000);
2162        assert_eq!(events[0].delta, dec!(500.0));
2163        assert_eq!(events[1].event_ts_ms, 2000);
2164        assert_eq!(events[1].delta, dec!(-100.0));
2165        assert_eq!(events[2].event_ts_ms, 3000);
2166        assert_eq!(events[2].delta, dec!(200.0));
2167    }
2168
2169    #[tokio::test]
2170    async fn get_deposit_withdraw_events_empty() {
2171        let test_db = TestDb::new().await.unwrap();
2172        let db = test_db.diesel_db().await;
2173
2174        let wallet = test_wallet(81);
2175        let events = db.get_deposit_withdraw_events(&wallet).await.unwrap();
2176        assert!(events.is_empty());
2177    }
2178
2179    #[tokio::test]
2180    async fn get_settled_pnl_by_symbol_groups_correctly() {
2181        let test_db = TestDb::new().await.unwrap();
2182        let db = test_db.diesel_db().await;
2183
2184        let mut conn = test_db.handler.pool().get().unwrap();
2185        let wallet = test_wallet(82);
2186        let w_hex = wallet_hex(&wallet);
2187
2188        // Insert ledger events with different event types and reference symbols
2189        exec_sql(
2190            &mut conn,
2191            &format!(
2192                "INSERT INTO ledger_events (wallet, event_ts_ms, delta, event_type, reference_symbol) VALUES \
2193                 ({w_hex}, 1000, 100.0, 'fill_premium', 'BTC-20260131-100000-C'), \
2194                 ({w_hex}, 1500, -30.0, 'fill_premium', 'BTC-20260131-100000-C'), \
2195                 ({w_hex}, 2000, 50.0, 'settlement_realized_pnl', 'ETH-20260131-4000-P'), \
2196                 ({w_hex}, 2500, 25.0, 'fill_realized_pnl', 'ETH-20260131-4000-P'), \
2197                 ({w_hex}, 3000, 200.0, 'deposit', NULL)"
2198            ),
2199        );
2200
2201        // Query before ts=3000 should include all matching events
2202        let pnl = db.get_settled_pnl_by_symbol(&wallet, 3000).await.unwrap();
2203        assert_eq!(pnl.len(), 2);
2204
2205        let btc_pnl = pnl.iter().find(|(s, _)| s == "BTC-20260131-100000-C");
2206        assert_eq!(btc_pnl.unwrap().1, dec!(70.0)); // 100 - 30
2207
2208        let eth_pnl = pnl.iter().find(|(s, _)| s == "ETH-20260131-4000-P");
2209        assert_eq!(eth_pnl.unwrap().1, dec!(75.0)); // 50 + 25
2210    }
2211
2212    #[tokio::test]
2213    async fn get_settled_pnl_by_symbol_respects_timestamp_cutoff() {
2214        let test_db = TestDb::new().await.unwrap();
2215        let db = test_db.diesel_db().await;
2216
2217        let mut conn = test_db.handler.pool().get().unwrap();
2218        let wallet = test_wallet(83);
2219        let w_hex = wallet_hex(&wallet);
2220
2221        exec_sql(
2222            &mut conn,
2223            &format!(
2224                "INSERT INTO ledger_events (wallet, event_ts_ms, delta, event_type, reference_symbol) VALUES \
2225                 ({w_hex}, 1000, 100.0, 'fill_premium', 'BTC-20260131-100000-C'), \
2226                 ({w_hex}, 5000, 200.0, 'fill_premium', 'BTC-20260131-100000-C')"
2227            ),
2228        );
2229
2230        // Query before ts=2000 should only include the first event
2231        let pnl = db.get_settled_pnl_by_symbol(&wallet, 2000).await.unwrap();
2232        assert_eq!(pnl.len(), 1);
2233        assert_eq!(pnl[0].1, dec!(100.0));
2234    }
2235
2236    #[tokio::test]
2237    async fn get_fills_since_timestamp_filters_correctly() {
2238        let test_db = TestDb::new().await.unwrap();
2239        let db = test_db.diesel_db().await;
2240
2241        let mut conn = test_db.handler.pool().get().unwrap();
2242        let w1 = wallet_hex(&test_wallet(1));
2243        let w2 = wallet_hex(&test_wallet(2));
2244
2245        // Insert parent trades first
2246        exec_sql(
2247            &mut conn,
2248            &format!(
2249                "INSERT INTO trades (trade_id, symbol, price, size, maker_address, taker_address, maker_fee, taker_fee, timestamp) \
2250                 VALUES \
2251                   (40, 'BTC-20260131-100000-C', 5.0, 1000000, {w1}, {w2}, 0.01, 0.02, 1000), \
2252                   (41, 'BTC-20260131-100000-C', 6.0, 2000000, {w1}, {w2}, 0.01, 0.02, 2000), \
2253                   (42, 'ETH-20260131-4000-P', 1.0, 500000, {w2}, {w1}, 0.005, 0.01, 3000)"
2254            ),
2255        );
2256
2257        // Insert fills at different timestamps. Market-stats backfill reads
2258        // only taker rows so it does not double-count both wallet fill rows.
2259        exec_sql(
2260            &mut conn,
2261            &format!(
2262                "INSERT INTO fills (trade_id, wallet_address, symbol, price, size, fee, is_taker, timestamp, underlying_notional) \
2263                 VALUES \
2264                   (40, {w1}, 'BTC-20260131-100000-C', 5.0, 1000000, 0.01, false, 1000, 100.0), \
2265                   (41, {w1}, 'BTC-20260131-100000-C', 6.0, 2000000, 0.01, false, 2000, 200.0), \
2266                   (42, {w2}, 'ETH-20260131-4000-P', 1.0, 500000, 0.02, true, 3000, 300.0)"
2267            ),
2268        );
2269
2270        // Query since ts=2000 should return only the taker fill at 3000.
2271        let fills = db.get_fills_since_timestamp(2000).await.unwrap();
2272        assert_eq!(fills.len(), 1);
2273        // Ordered ASC by timestamp
2274        assert_eq!(fills[0].timestamp, 3000);
2275        assert_eq!(fills[0].symbol, "ETH-20260131-4000-P");
2276        assert_eq!(fills[0].underlying_notional, Some(dec!(300.0)));
2277
2278        // Query since ts=4000 should return empty
2279        let fills_empty = db.get_fills_since_timestamp(4000).await.unwrap();
2280        assert!(fills_empty.is_empty());
2281    }
2282}