1use std::collections::{HashMap, HashSet};
8
9use anyhow::Result;
10use diesel::prelude::*;
11use diesel::upsert::excluded;
12use diesel_async::{AsyncConnection, RunQueryDsl};
13use rust_decimal::Decimal;
14use struct_convert::Convert;
15
16use crate::diesel_db::DieselDb;
17use crate::models::{
18 BboSnapshot, HistoricalTheoSnapshot, NewBboSnapshot, NewHistoricalPnlSnapshot,
19 NewHistoricalTheoSnapshot, NewVolSurfaceSnapshotRow, SettlementPayout, VolSurfaceSnapshotRow,
20};
21use crate::schema;
22use hypercall_db::{
23 AnalyticsReader, AnalyticsWriter, BboReferenceData, BboSnapshotRecord, FillRecord,
24 HistoricalPnlPoint, HistoricalTheoPoint, NewBboSnapshotInput, OrderRecord,
25 SettlementPayoutRecord, TradeRecord, VolSurfaceSnapshot,
26};
27use hypercall_types::{
28 WalletAddress, HISTORICAL_PNL_INTERVAL_1D_MS, HISTORICAL_PNL_INTERVAL_1H_MS,
29 HISTORICAL_PNL_INTERVAL_5M_MS, HISTORICAL_THEO_INTERVAL_1D_MS, HISTORICAL_THEO_INTERVAL_1H_MS,
30 HISTORICAL_THEO_INTERVAL_5M_MS,
31};
32
33use crate::models::NewSettlementPayoutSeen;
34
/// Row returned by the raw SQL in `get_bbo_reference_asks`: the reference ask chosen for one
/// requested symbol.
#[derive(QueryableByName, Debug)]
struct BboReferenceRow {
    /// Symbol the reference quote belongs to.
    #[diesel(sql_type = diesel::sql_types::Text)]
    pub symbol: String,
    /// `best_ask` of the snapshot that was selected as the reference.
    #[diesel(sql_type = diesel::sql_types::Numeric)]
    pub reference_ask: Decimal,
    /// `snapshot_ts` of the snapshot that was selected as the reference.
    #[diesel(sql_type = diesel::sql_types::BigInt)]
    pub reference_ts: i64,
    /// True when no snapshot existed at or before the cutoff and the earliest available
    /// snapshot for the symbol was used instead.
    #[diesel(sql_type = diesel::sql_types::Bool)]
    pub used_earliest_fallback: bool,
}
48
/// Row shape for the raw `trades` queries in this file.
///
/// Loaded by column name via `QueryableByName` and turned into a `TradeRecord` through the
/// `Convert` derive (callers use `Into::into`).
#[derive(QueryableByName, Debug, Convert)]
#[convert(from_on = "TradeRecord")]
struct TradeRow {
    #[diesel(sql_type = diesel::sql_types::BigInt)]
    pub trade_id: i64,
    #[diesel(sql_type = diesel::sql_types::Text)]
    pub symbol: String,
    #[diesel(sql_type = diesel::sql_types::Numeric)]
    pub price: Decimal,
    #[diesel(sql_type = diesel::sql_types::Numeric)]
    pub size: Decimal,
    #[diesel(sql_type = diesel::sql_types::Bytea)]
    pub maker_address: WalletAddress,
    #[diesel(sql_type = diesel::sql_types::Bytea)]
    pub taker_address: WalletAddress,
    #[diesel(sql_type = diesel::sql_types::Numeric)]
    pub maker_fee: Decimal,
    #[diesel(sql_type = diesel::sql_types::Numeric)]
    pub taker_fee: Decimal,
    #[diesel(sql_type = diesel::sql_types::BigInt)]
    pub timestamp: i64,
    // Nullable naive timestamp; the conversion below attaches the UTC offset when building the
    // record's `created_at`.
    #[diesel(sql_type = diesel::sql_types::Nullable<diesel::sql_types::Timestamp>)]
    #[convert_field(
        custom_fn = ".map(|dt| chrono::DateTime::<chrono::Utc>::from_naive_utc_and_offset(dt, chrono::Utc))"
    )]
    pub created_at: Option<chrono::NaiveDateTime>,
}
76
/// Row shape for the raw `fills` query in `get_fills_by_account`.
///
/// Loaded by column name via `QueryableByName` and turned into a `FillRecord` through the
/// `Convert` derive (callers use `Into::into`).
#[derive(QueryableByName, Debug, Convert)]
#[convert(from_on = "FillRecord")]
struct FillRow {
    #[diesel(sql_type = diesel::sql_types::Nullable<diesel::sql_types::BigInt>)]
    pub fill_id: Option<i64>,
    #[diesel(sql_type = diesel::sql_types::BigInt)]
    pub trade_id: i64,
    #[diesel(sql_type = diesel::sql_types::Bytea)]
    pub wallet_address: WalletAddress,
    #[diesel(sql_type = diesel::sql_types::Text)]
    pub symbol: String,
    #[diesel(sql_type = diesel::sql_types::Numeric)]
    pub price: Decimal,
    #[diesel(sql_type = diesel::sql_types::Numeric)]
    pub size: Decimal,
    #[diesel(sql_type = diesel::sql_types::Numeric)]
    pub fee: Decimal,
    #[diesel(sql_type = diesel::sql_types::Bool)]
    pub is_taker: bool,
    #[diesel(sql_type = diesel::sql_types::BigInt)]
    pub timestamp: i64,
    // Nullable naive timestamp; the conversion below attaches the UTC offset when building the
    // record's `created_at`.
    #[diesel(sql_type = diesel::sql_types::Nullable<diesel::sql_types::Timestamp>)]
    #[convert_field(
        custom_fn = ".map(|dt| chrono::DateTime::<chrono::Utc>::from_naive_utc_and_offset(dt, chrono::Utc))"
    )]
    pub created_at: Option<chrono::NaiveDateTime>,
    #[diesel(sql_type = diesel::sql_types::Nullable<diesel::sql_types::Bytea>)]
    pub builder_code_address: Option<WalletAddress>,
    #[diesel(sql_type = diesel::sql_types::Nullable<diesel::sql_types::Numeric>)]
    pub builder_code_fee: Option<Decimal>,
    #[diesel(sql_type = diesel::sql_types::Nullable<diesel::sql_types::Numeric>)]
    pub realized_pnl: Option<Decimal>,
    #[diesel(sql_type = diesel::sql_types::Nullable<diesel::sql_types::Numeric>)]
    pub underlying_notional: Option<Decimal>,
}
112
/// Row shape for the raw `order_infos` queries in `get_orders_by_account`.
///
/// Loaded by column name via `QueryableByName` and turned into an `OrderRecord` through the
/// `Convert` derive (callers use `Into::into`). Two fields are renamed during conversion; see
/// the notes on `created_at` and `updated_at`.
#[derive(QueryableByName, Debug, Convert)]
#[convert(from_on = "OrderRecord")]
struct OrderRow {
    #[diesel(sql_type = diesel::sql_types::BigInt)]
    pub order_id: i64,
    #[diesel(sql_type = diesel::sql_types::Bytea)]
    pub wallet_address: WalletAddress,
    #[diesel(sql_type = diesel::sql_types::Text)]
    pub symbol: String,
    #[diesel(sql_type = diesel::sql_types::Text)]
    pub side: String,
    #[diesel(sql_type = diesel::sql_types::Numeric)]
    pub price: Decimal,
    #[diesel(sql_type = diesel::sql_types::Numeric)]
    pub size: Decimal,
    #[diesel(sql_type = diesel::sql_types::Text)]
    pub tif: String,
    #[diesel(sql_type = diesel::sql_types::Nullable<diesel::sql_types::Text>)]
    pub client_id: Option<String>,
    #[diesel(sql_type = diesel::sql_types::Bool)]
    pub is_perp: bool,
    #[diesel(sql_type = diesel::sql_types::Nullable<diesel::sql_types::Text>)]
    pub underlying: Option<String>,
    #[diesel(sql_type = diesel::sql_types::Nullable<diesel::sql_types::Bool>)]
    pub reduce_only: Option<bool>,
    #[diesel(sql_type = diesel::sql_types::Text)]
    pub status: String,
    // The queries select `oi.timestamp AS created_at`, so this integer is the order's
    // `timestamp` column; the conversion writes it to the record's `timestamp` field.
    #[diesel(sql_type = diesel::sql_types::BigInt)]
    #[convert_field(rename = "timestamp")]
    pub created_at: i64,
    // The conversion writes this value (with a UTC offset attached) to the record's
    // `created_at` field.
    // NOTE(review): the record's `created_at` is therefore populated from `updated_at` —
    // confirm this is intentional and not a mapping mistake.
    #[diesel(sql_type = diesel::sql_types::Timestamp)]
    #[convert_field(
        rename = "created_at",
        custom_fn = "Some(chrono::DateTime::<chrono::Utc>::from_naive_utc_and_offset(this.updated_at, chrono::Utc))"
    )]
    pub updated_at: chrono::NaiveDateTime,
    #[diesel(sql_type = diesel::sql_types::Numeric)]
    pub filled_size: Decimal,
    #[diesel(sql_type = diesel::sql_types::Bool)]
    pub mmp_enabled: bool,
}
154
/// Canonicalises a caller-supplied order status filter into the upper-case form that is compared
/// against `order_infos.status`.
///
/// Surrounding whitespace is dropped, matching is ASCII case-insensitive, hyphens become
/// underscores, and the spelling `cancelled` is folded into `CANCELED`. Values that are not
/// recognised are not rejected; they are passed through in upper case.
fn normalize_materialized_order_status_filter(status: &str) -> String {
    let canonical = status.trim().to_ascii_lowercase().replace('-', "_");

    // The double-L spelling is the only input whose canonical form is not simply its upper-case.
    if canonical == "cancelled" {
        return String::from("CANCELED");
    }

    canonical.to_ascii_uppercase()
}
166
167#[async_trait::async_trait]
172impl AnalyticsReader for DieselDb {
173 async fn get_all_trades(&self, limit: usize, offset: usize) -> Result<Vec<TradeRecord>> {
174 let mut conn = self.get_conn().await?;
175
176 let trades = diesel::sql_query(
177 "SELECT trade_id, symbol, price, size,
178 maker_address, taker_address, maker_fee,
179 taker_fee, timestamp, created_at
180 FROM trades
181 ORDER BY timestamp DESC
182 LIMIT $1 OFFSET $2",
183 )
184 .bind::<diesel::sql_types::Integer, _>(limit as i32)
185 .bind::<diesel::sql_types::Integer, _>(offset as i32)
186 .load::<TradeRow>(&mut conn)
187 .await?;
188
189 Ok(trades.into_iter().map(Into::into).collect())
190 }
191
192 async fn get_trades_by_option(
193 &self,
194 option_id: &str,
195 limit: usize,
196 ) -> Result<Vec<TradeRecord>> {
197 let mut conn = self.get_conn().await?;
198
199 let trades = diesel::sql_query(
200 "SELECT trade_id, symbol, price, size,
201 maker_address, taker_address, maker_fee,
202 taker_fee, timestamp, created_at
203 FROM trades
204 WHERE symbol = $1
205 ORDER BY timestamp DESC
206 LIMIT $2",
207 )
208 .bind::<diesel::sql_types::Text, _>(option_id)
209 .bind::<diesel::sql_types::Integer, _>(limit as i32)
210 .load::<TradeRow>(&mut conn)
211 .await?;
212
213 Ok(trades.into_iter().map(Into::into).collect())
214 }
215
216 async fn get_trades_by_underlying(
217 &self,
218 underlying: &str,
219 limit: usize,
220 offset: usize,
221 ) -> Result<Vec<TradeRecord>> {
222 let mut conn = self.get_conn().await?;
223
224 let trades = diesel::sql_query(
225 "SELECT t.trade_id, t.symbol, t.price, t.size,
226 t.maker_address, t.taker_address, t.maker_fee,
227 t.taker_fee, t.timestamp, t.created_at
228 FROM trades t
229 INNER JOIN instruments i ON t.symbol = i.id
230 WHERE i.underlying = $1
231 ORDER BY t.timestamp DESC
232 LIMIT $2 OFFSET $3",
233 )
234 .bind::<diesel::sql_types::Text, _>(underlying)
235 .bind::<diesel::sql_types::Integer, _>(limit as i32)
236 .bind::<diesel::sql_types::Integer, _>(offset as i32)
237 .load::<TradeRow>(&mut conn)
238 .await?;
239
240 Ok(trades.into_iter().map(Into::into).collect())
241 }
242
243 async fn get_trades_by_account(
244 &self,
245 account: &WalletAddress,
246 limit: usize,
247 offset: usize,
248 ) -> Result<Vec<TradeRecord>> {
249 let mut conn = self.get_conn().await?;
250
251 let trades = diesel::sql_query(
252 "SELECT trade_id, symbol, price, size,
253 maker_address, taker_address, maker_fee,
254 taker_fee, timestamp, created_at
255 FROM trades
256 WHERE maker_address = $1 OR taker_address = $1
257 ORDER BY timestamp DESC
258 LIMIT $2 OFFSET $3",
259 )
260 .bind::<diesel::sql_types::Bytea, _>(account)
261 .bind::<diesel::sql_types::Integer, _>(limit as i32)
262 .bind::<diesel::sql_types::Integer, _>(offset as i32)
263 .load::<TradeRow>(&mut conn)
264 .await?;
265
266 Ok(trades.into_iter().map(Into::into).collect())
267 }
268
269 async fn get_trades_for_symbol_in_range(
270 &self,
271 symbol: &str,
272 start_time_ms: i64,
273 end_time_ms: i64,
274 ) -> Result<Vec<TradeRecord>> {
275 let mut conn = self.get_conn().await?;
276
277 let trades = diesel::sql_query(
278 "SELECT trade_id, symbol, price, size,
279 maker_address, taker_address, maker_fee,
280 taker_fee, timestamp, created_at
281 FROM trades
282 WHERE symbol = $1 AND timestamp >= $2 AND timestamp <= $3
283 ORDER BY timestamp ASC",
284 )
285 .bind::<diesel::sql_types::Text, _>(symbol)
286 .bind::<diesel::sql_types::BigInt, _>(start_time_ms)
287 .bind::<diesel::sql_types::BigInt, _>(end_time_ms)
288 .load::<TradeRow>(&mut conn)
289 .await?;
290
291 Ok(trades.into_iter().map(Into::into).collect())
292 }
293
294 async fn get_fills_by_account(
295 &self,
296 account: &WalletAddress,
297 limit: usize,
298 offset: usize,
299 ) -> Result<Vec<FillRecord>> {
300 let mut conn = self.get_conn().await?;
301
302 let fills = diesel::sql_query(
303 "SELECT fill_id, trade_id, wallet_address, symbol, price, size, fee, is_taker, timestamp, created_at, builder_code_address, builder_code_fee, realized_pnl, underlying_notional
304 FROM fills
305 WHERE wallet_address = $1
306 ORDER BY timestamp DESC
307 LIMIT $2 OFFSET $3",
308 )
309 .bind::<diesel::sql_types::Binary, _>(account)
310 .bind::<diesel::sql_types::Integer, _>(limit as i32)
311 .bind::<diesel::sql_types::Integer, _>(offset as i32)
312 .load::<FillRow>(&mut conn)
313 .await?;
314
315 Ok(fills.into_iter().map(Into::into).collect())
316 }
317
318 async fn get_orders_by_account(
319 &self,
320 account: &WalletAddress,
321 status: Option<&str>,
322 limit: usize,
323 offset: usize,
324 ) -> Result<Vec<OrderRecord>> {
325 let mut conn = self.get_conn().await?;
326
327 let orders = if let Some(status_filter) = status {
328 let normalized_status = normalize_materialized_order_status_filter(status_filter);
329 diesel::sql_query(
330 "SELECT
331 oi.order_id,
332 oi.wallet_address,
333 oi.symbol,
334 oi.side,
335 oi.price,
336 oi.size,
337 oi.tif,
338 oi.client_id,
339 oi.is_perp,
340 oi.underlying,
341 oi.reduce_only,
342 oi.status,
343 oi.timestamp as created_at,
344 oi.updated_at,
345 oi.filled_size,
346 oi.mmp_enabled
347 FROM order_infos oi
348 WHERE oi.wallet_address = $1 AND oi.status = $2
349 ORDER BY oi.timestamp DESC
350 LIMIT $3 OFFSET $4",
351 )
352 .bind::<diesel::sql_types::Binary, _>(account)
353 .bind::<diesel::sql_types::Text, _>(normalized_status)
354 .bind::<diesel::sql_types::Integer, _>(limit as i32)
355 .bind::<diesel::sql_types::Integer, _>(offset as i32)
356 .load::<OrderRow>(&mut conn)
357 .await?
358 } else {
359 diesel::sql_query(
360 "SELECT
361 oi.order_id,
362 oi.wallet_address,
363 oi.symbol,
364 oi.side,
365 oi.price,
366 oi.size,
367 oi.tif,
368 oi.client_id,
369 oi.is_perp,
370 oi.underlying,
371 oi.reduce_only,
372 oi.status,
373 oi.timestamp as created_at,
374 oi.updated_at,
375 oi.filled_size,
376 oi.mmp_enabled
377 FROM order_infos oi
378 WHERE oi.wallet_address = $1
379 ORDER BY oi.timestamp DESC
380 LIMIT $2 OFFSET $3",
381 )
382 .bind::<diesel::sql_types::Binary, _>(account)
383 .bind::<diesel::sql_types::Integer, _>(limit as i32)
384 .bind::<diesel::sql_types::Integer, _>(offset as i32)
385 .load::<OrderRow>(&mut conn)
386 .await?
387 };
388
389 Ok(orders.into_iter().map(Into::into).collect())
390 }
391
392 async fn trade_history_exists_for_symbol(&self, symbol: &str) -> Result<bool> {
393 let mut conn = self.get_conn().await?;
394
395 #[derive(QueryableByName, Debug)]
396 struct ExistsRow {
397 #[diesel(sql_type = diesel::sql_types::Bool)]
398 exists: bool,
399 }
400
401 let row =
402 diesel::sql_query("SELECT EXISTS(SELECT 1 FROM trades WHERE symbol = $1) AS exists")
403 .bind::<diesel::sql_types::Text, _>(symbol)
404 .get_result::<ExistsRow>(&mut conn)
405 .await?;
406
407 Ok(row.exists)
408 }
409
410 async fn historical_theo_history_exists_for_symbol(&self, symbol: &str) -> Result<bool> {
411 use schema::historical_theo_snapshots::dsl as ht;
412
413 let mut conn = self.get_conn().await?;
414
415 let exists = diesel::select(diesel::dsl::exists(
416 ht::historical_theo_snapshots.filter(ht::symbol.eq(symbol)),
417 ))
418 .get_result::<bool>(&mut conn)
419 .await?;
420
421 Ok(exists)
422 }
423
424 async fn get_historical_pnl(
429 &self,
430 wallet: &WalletAddress,
431 interval_ms: i64,
432 limit: usize,
433 include_attribution: bool,
434 ) -> Result<Vec<HistoricalPnlPoint>> {
435 use schema::historical_pnl_snapshots::dsl as hp;
436
437 let mut conn = self.get_conn().await?;
438
439 let mut rows: Vec<HistoricalPnlPoint> = if include_attribution {
440 hp::historical_pnl_snapshots
441 .filter(hp::wallet_address.eq(wallet))
442 .filter(hp::interval_ms.eq(interval_ms))
443 .order(hp::timestamp_ms.desc())
444 .limit(limit as i64)
445 .select((hp::timestamp_ms, hp::total_equity, hp::attribution))
446 .load::<(i64, Decimal, Option<Vec<u8>>)>(&mut conn)
447 .await?
448 .into_iter()
449 .map(
450 |(timestamp_ms, total_equity, attribution)| HistoricalPnlPoint {
451 timestamp_ms,
452 total_equity,
453 attribution,
454 },
455 )
456 .collect()
457 } else {
458 hp::historical_pnl_snapshots
459 .filter(hp::wallet_address.eq(wallet))
460 .filter(hp::interval_ms.eq(interval_ms))
461 .order(hp::timestamp_ms.desc())
462 .limit(limit as i64)
463 .select((hp::timestamp_ms, hp::total_equity))
464 .load::<(i64, Decimal)>(&mut conn)
465 .await?
466 .into_iter()
467 .map(|(timestamp_ms, total_equity)| HistoricalPnlPoint {
468 timestamp_ms,
469 total_equity,
470 attribution: None,
471 })
472 .collect()
473 };
474
475 rows.reverse();
476 Ok(rows)
477 }
478
479 async fn get_historical_theos(
480 &self,
481 symbol: &str,
482 interval_ms: i64,
483 limit: usize,
484 ) -> Result<Vec<HistoricalTheoPoint>> {
485 use schema::historical_theo_snapshots::dsl as ht;
486
487 let mut conn = self.get_conn().await?;
488
489 let mut rows = ht::historical_theo_snapshots
490 .filter(ht::symbol.eq(symbol))
491 .filter(ht::interval_ms.eq(interval_ms))
492 .order(ht::timestamp_ms.desc())
493 .limit(limit as i64)
494 .load::<HistoricalTheoSnapshot>(&mut conn)
495 .await?;
496
497 rows.reverse();
498
499 Ok(rows
500 .into_iter()
501 .map(|row| HistoricalTheoPoint {
502 timestamp_ms: row.timestamp_ms,
503 theoretical_price: row.theoretical_price,
504 })
505 .collect())
506 }
507
508 async fn get_historical_theos_batch(
509 &self,
510 symbols: &[String],
511 interval_ms: i64,
512 limit: usize,
513 ) -> Result<HashMap<String, Vec<HistoricalTheoPoint>>> {
514 use schema::historical_theo_snapshots::dsl as ht;
515
516 if symbols.is_empty() {
517 return Ok(HashMap::new());
518 }
519
520 let mut conn = self.get_conn().await?;
521
522 let rows = ht::historical_theo_snapshots
523 .filter(ht::symbol.eq_any(symbols))
524 .filter(ht::interval_ms.eq(interval_ms))
525 .order((ht::symbol.asc(), ht::timestamp_ms.desc()))
526 .load::<HistoricalTheoSnapshot>(&mut conn)
527 .await?;
528
529 let mut result: HashMap<String, Vec<HistoricalTheoPoint>> = HashMap::new();
530 for row in rows {
531 let entry = result.entry(row.symbol.clone()).or_default();
532 if entry.len() < limit {
533 entry.push(HistoricalTheoPoint {
534 timestamp_ms: row.timestamp_ms,
535 theoretical_price: row.theoretical_price,
536 });
537 }
538 }
539
540 for points in result.values_mut() {
541 points.reverse();
542 }
543
544 Ok(result)
545 }
546
547 async fn get_vol_surface_history(
548 &self,
549 underlying: &str,
550 interval_ms: i64,
551 limit: usize,
552 ) -> Result<Vec<VolSurfaceSnapshot>> {
553 use schema::vol_surface_snapshots::dsl as vs;
554
555 let mut conn = self.get_conn().await?;
556
557 let mut rows = vs::vol_surface_snapshots
558 .filter(vs::underlying.eq(underlying))
559 .filter(vs::interval_ms.eq(interval_ms))
560 .order(vs::timestamp_ms.desc())
561 .limit(limit as i64)
562 .load::<VolSurfaceSnapshotRow>(&mut conn)
563 .await?;
564
565 rows.reverse();
566
567 Ok(rows
568 .into_iter()
569 .map(|row| VolSurfaceSnapshot {
570 id: row.id,
571 underlying: row.underlying,
572 interval_ms: row.interval_ms,
573 timestamp_ms: row.timestamp_ms,
574 surface_json: row.surface_json,
575 created_at: row.created_at,
576 })
577 .collect())
578 }
579
580 async fn load_bbo_snapshots_since(&self, cutoff_ts: i64) -> Result<Vec<BboSnapshotRecord>> {
581 use schema::bbo_snapshots::dsl as bbo;
582
583 let mut conn = self.get_conn().await?;
584
585 let rows = bbo::bbo_snapshots
586 .filter(bbo::snapshot_ts.ge(cutoff_ts))
587 .order((bbo::symbol.asc(), bbo::snapshot_ts.asc()))
588 .load::<BboSnapshot>(&mut conn)
589 .await?;
590
591 Ok(rows
592 .into_iter()
593 .map(|row| BboSnapshotRecord {
594 id: row.id,
595 symbol: row.symbol,
596 best_bid: row.best_bid,
597 best_ask: row.best_ask,
598 best_bid_size: row.best_bid_size,
599 best_ask_size: row.best_ask_size,
600 snapshot_ts: row.snapshot_ts,
601 created_at: row.created_at,
602 })
603 .collect())
604 }
605
606 async fn get_bbo_reference_asks(
607 &self,
608 symbols: &[String],
609 cutoff_ts: i64,
610 ) -> Result<HashMap<String, BboReferenceData>> {
611 if symbols.is_empty() {
612 return Ok(HashMap::new());
613 }
614
615 let mut conn = self.get_conn().await?;
616
617 let rows = diesel::sql_query(
618 "WITH requested AS (
619 SELECT UNNEST($1::text[]) AS symbol
620 ),
621 latest_before AS (
622 SELECT DISTINCT ON (b.symbol) b.symbol, b.best_ask, b.snapshot_ts
623 FROM bbo_snapshots b
624 JOIN requested r ON r.symbol = b.symbol
625 WHERE b.snapshot_ts <= $2
626 ORDER BY b.symbol, b.snapshot_ts DESC
627 ),
628 earliest_any AS (
629 SELECT DISTINCT ON (b.symbol) b.symbol, b.best_ask, b.snapshot_ts
630 FROM bbo_snapshots b
631 JOIN requested r ON r.symbol = b.symbol
632 ORDER BY b.symbol, b.snapshot_ts ASC
633 )
634 SELECT r.symbol,
635 COALESCE(lb.best_ask, ea.best_ask) AS reference_ask,
636 COALESCE(lb.snapshot_ts, ea.snapshot_ts) AS reference_ts,
637 (lb.symbol IS NULL AND ea.symbol IS NOT NULL) AS used_earliest_fallback
638 FROM requested r
639 LEFT JOIN latest_before lb USING (symbol)
640 LEFT JOIN earliest_any ea USING (symbol)
641 WHERE COALESCE(lb.symbol, ea.symbol) IS NOT NULL",
642 )
643 .bind::<diesel::sql_types::Array<diesel::sql_types::Text>, _>(symbols)
644 .bind::<diesel::sql_types::BigInt, _>(cutoff_ts)
645 .load::<BboReferenceRow>(&mut conn)
646 .await?;
647
648 Ok(rows
649 .into_iter()
650 .map(|row| {
651 (
652 row.symbol,
653 BboReferenceData {
654 reference_ask: row.reference_ask,
655 reference_ts: row.reference_ts,
656 used_earliest_fallback: row.used_earliest_fallback,
657 },
658 )
659 })
660 .collect())
661 }
662
663 async fn get_settlement_payouts(
664 &self,
665 wallet: &WalletAddress,
666 limit: i64,
667 offset: i64,
668 symbol: Option<&str>,
669 ledger_applied: Option<bool>,
670 ) -> Result<(Vec<SettlementPayoutRecord>, i64)> {
671 use schema::settlement_payouts::dsl as sp;
672
673 let mut conn = self.get_conn().await?;
674
675 let mut count_query = sp::settlement_payouts
676 .filter(sp::wallet.eq(wallet))
677 .into_boxed();
678 let mut rows_query = sp::settlement_payouts
679 .filter(sp::wallet.eq(wallet))
680 .into_boxed();
681
682 if let Some(symbol_filter) = symbol {
683 count_query = count_query.filter(sp::symbol.eq(symbol_filter));
684 rows_query = rows_query.filter(sp::symbol.eq(symbol_filter));
685 }
686
687 if let Some(applied_filter) = ledger_applied {
688 count_query = count_query.filter(sp::ledger_applied.eq(applied_filter));
689 rows_query = rows_query.filter(sp::ledger_applied.eq(applied_filter));
690 }
691
692 let count: i64 = count_query.count().get_result(&mut conn).await?;
693 let rows = rows_query
694 .order(sp::id.desc())
695 .limit(limit)
696 .offset(offset)
697 .load::<SettlementPayout>(&mut conn)
698 .await?;
699
700 Ok((rows.into_iter().map(Into::into).collect(), count))
701 }
702
703 async fn get_seen_settlement_payout_ids(
704 &self,
705 wallet: &WalletAddress,
706 payout_ids: &[i64],
707 ) -> Result<HashSet<i64>> {
708 use schema::settlement_payout_seen::dsl as sps;
709
710 if payout_ids.is_empty() {
711 return Ok(HashSet::new());
712 }
713
714 let mut conn = self.get_conn().await?;
715
716 let rows: Vec<i64> = sps::settlement_payout_seen
717 .filter(sps::wallet.eq(wallet))
718 .filter(sps::payout_id.eq_any(payout_ids))
719 .select(sps::payout_id)
720 .load::<i64>(&mut conn)
721 .await?;
722
723 Ok(rows.into_iter().collect())
724 }
725
726 async fn get_client_ids_by_order_ids(
727 &self,
728 order_ids: &[i64],
729 ) -> Result<HashMap<i64, Option<String>>> {
730 if order_ids.is_empty() {
731 return Ok(HashMap::new());
732 }
733 let mut conn = self.get_conn().await?;
734 let results = crate::schema::order_infos::table
735 .filter(crate::schema::order_infos::order_id.eq_any(order_ids))
736 .select((
737 crate::schema::order_infos::order_id,
738 crate::schema::order_infos::client_id,
739 ))
740 .load::<(i64, Option<String>)>(&mut conn)
741 .await?;
742 Ok(results.into_iter().collect())
743 }
744
745 async fn get_theo_marks_at_timestamp(
746 &self,
747 symbols: &[String],
748 timestamp_ms: i64,
749 ) -> Result<HashMap<String, Decimal>> {
750 if symbols.is_empty() {
751 return Ok(HashMap::new());
752 }
753
754 #[derive(QueryableByName)]
755 struct TheoRow {
756 #[diesel(sql_type = diesel::sql_types::Text)]
757 symbol: String,
758 #[diesel(sql_type = diesel::sql_types::Numeric)]
759 theoretical_price: Decimal,
760 }
761
762 let mut conn = self.get_conn().await?;
763 let rows = diesel::sql_query(
764 "SELECT DISTINCT ON (symbol) symbol, theoretical_price
765 FROM historical_theo_snapshots
766 WHERE symbol = ANY($1) AND timestamp_ms <= $2
767 ORDER BY symbol, timestamp_ms DESC, interval_ms ASC",
768 )
769 .bind::<diesel::sql_types::Array<diesel::sql_types::Text>, _>(symbols)
770 .bind::<diesel::sql_types::BigInt, _>(timestamp_ms)
771 .get_results::<TheoRow>(&mut conn)
772 .await?;
773
774 Ok(rows
775 .into_iter()
776 .map(|row| (row.symbol, row.theoretical_price))
777 .collect())
778 }
779
780 async fn get_deposit_withdraw_events(
781 &self,
782 wallet: &WalletAddress,
783 ) -> Result<Vec<hypercall_db::LedgerEventTimeDelta>> {
784 #[derive(QueryableByName)]
785 struct LedgerEventRow {
786 #[diesel(sql_type = diesel::sql_types::BigInt)]
787 event_ts_ms: i64,
788 #[diesel(sql_type = diesel::sql_types::Numeric)]
789 delta: Decimal,
790 }
791
792 let mut conn = self.get_conn().await?;
793 let rows = diesel::sql_query(
794 "SELECT event_ts_ms, delta
795 FROM ledger_events
796 WHERE wallet = $1 AND event_type IN ('deposit', 'withdraw')
797 ORDER BY event_ts_ms ASC, id ASC",
798 )
799 .bind::<diesel::sql_types::Bytea, _>(wallet)
800 .get_results::<LedgerEventRow>(&mut conn)
801 .await?;
802
803 Ok(rows
804 .into_iter()
805 .map(|r| hypercall_db::LedgerEventTimeDelta {
806 event_ts_ms: r.event_ts_ms,
807 delta: r.delta,
808 })
809 .collect())
810 }
811
812 async fn get_settled_pnl_by_symbol(
813 &self,
814 wallet: &WalletAddress,
815 before_ts_ms: i64,
816 ) -> Result<Vec<(String, Decimal)>> {
817 #[derive(QueryableByName)]
818 struct SettlementRow {
819 #[diesel(sql_type = diesel::sql_types::Text)]
820 reference_symbol: String,
821 #[diesel(sql_type = diesel::sql_types::Numeric)]
822 total_pnl: Decimal,
823 }
824
825 let mut conn = self.get_conn().await?;
826 let rows = diesel::sql_query(
827 "SELECT reference_symbol, SUM(delta) AS total_pnl
828 FROM ledger_events
829 WHERE wallet = $1
830 AND event_ts_ms <= $2
831 AND event_type IN (
832 'fill_premium',
833 'fill_realized_pnl',
834 'settlement_realized_pnl'
835 )
836 AND reference_symbol IS NOT NULL
837 GROUP BY reference_symbol",
838 )
839 .bind::<diesel::sql_types::Bytea, _>(wallet)
840 .bind::<diesel::sql_types::BigInt, _>(before_ts_ms)
841 .get_results::<SettlementRow>(&mut conn)
842 .await?;
843
844 Ok(rows
845 .into_iter()
846 .map(|r| (r.reference_symbol, r.total_pnl))
847 .collect())
848 }
849
850 async fn get_fills_since_timestamp(
851 &self,
852 cutoff_timestamp: i64,
853 ) -> Result<Vec<hypercall_db::FillBackfillRecord>> {
854 #[derive(QueryableByName)]
855 struct FillBackfillRow {
856 #[diesel(sql_type = diesel::sql_types::Text)]
857 symbol: String,
858 #[diesel(sql_type = diesel::sql_types::Numeric)]
859 price: Decimal,
860 #[diesel(sql_type = diesel::sql_types::Numeric)]
861 size: Decimal,
862 #[diesel(sql_type = diesel::sql_types::Nullable<diesel::sql_types::Numeric>)]
863 underlying_notional: Option<Decimal>,
864 #[diesel(sql_type = diesel::sql_types::BigInt)]
865 timestamp: i64,
866 }
867
868 let mut conn = self.get_conn().await?;
869 let rows = diesel::sql_query(
870 "SELECT symbol, price, size, underlying_notional, timestamp
871 FROM fills
872 WHERE timestamp >= $1
873 AND is_taker = true
874 ORDER BY timestamp ASC",
875 )
876 .bind::<diesel::sql_types::BigInt, _>(cutoff_timestamp)
877 .get_results::<FillBackfillRow>(&mut conn)
878 .await?;
879
880 Ok(rows
881 .into_iter()
882 .map(|r| hypercall_db::FillBackfillRecord {
883 symbol: r.symbol,
884 price: r.price,
885 size: r.size,
886 underlying_notional: r.underlying_notional,
887 timestamp: r.timestamp,
888 })
889 .collect())
890 }
891}
892
893#[async_trait::async_trait]
898impl AnalyticsWriter for DieselDb {
899 async fn upsert_historical_pnl_batch(
900 &self,
901 interval_ms: i64,
902 timestamp_ms: i64,
903 snapshots: &[(WalletAddress, Decimal, Option<Vec<u8>>)],
904 max_periods: i64,
905 ) -> Result<usize> {
906 use schema::historical_pnl_snapshots::dsl as hp;
907
908 if snapshots.is_empty() {
909 return Ok(0);
910 }
911 if max_periods <= 0 {
912 anyhow::bail!("max_periods must be positive, got {}", max_periods);
913 }
914 if ![
915 HISTORICAL_PNL_INTERVAL_5M_MS,
916 HISTORICAL_PNL_INTERVAL_1H_MS,
917 HISTORICAL_PNL_INTERVAL_1D_MS,
918 ]
919 .contains(&interval_ms)
920 {
921 anyhow::bail!(
922 "Invalid interval_ms {}: must be one of the known intervals (5m, 1h, 1d)",
923 interval_ms
924 );
925 }
926
927 let mut conn = self.get_conn().await?;
928
929 let new_rows: Vec<NewHistoricalPnlSnapshot> = snapshots
930 .iter()
931 .map(|(wallet, equity, attr)| NewHistoricalPnlSnapshot {
932 wallet_address: *wallet,
933 interval_ms,
934 timestamp_ms,
935 total_equity: *equity,
936 attribution: attr.clone(),
937 })
938 .collect();
939
940 let wallet_bytes: Vec<Vec<u8>> = snapshots
941 .iter()
942 .map(|(w, _, _)| w.as_bytes().to_vec())
943 .collect();
944
945 conn.transaction::<_, diesel::result::Error, _>(async |conn| {
946 let affected = diesel::insert_into(hp::historical_pnl_snapshots)
947 .values(&new_rows)
948 .on_conflict((hp::wallet_address, hp::interval_ms, hp::timestamp_ms))
949 .do_update()
950 .set((
951 hp::total_equity.eq(excluded(hp::total_equity)),
952 hp::attribution.eq(excluded(hp::attribution)),
953 ))
954 .execute(&mut *conn)
955 .await?;
956
957 diesel::sql_query(
958 "DELETE FROM historical_pnl_snapshots h
959 USING (
960 SELECT id FROM (
961 SELECT id,
962 ROW_NUMBER() OVER (
963 PARTITION BY wallet_address, interval_ms
964 ORDER BY timestamp_ms DESC
965 ) AS row_num
966 FROM historical_pnl_snapshots
967 WHERE interval_ms = $1
968 AND wallet_address = ANY($3)
969 ) ranked
970 WHERE ranked.row_num > $2
971 ) stale
972 WHERE h.id = stale.id",
973 )
974 .bind::<diesel::sql_types::BigInt, _>(interval_ms)
975 .bind::<diesel::sql_types::BigInt, _>(max_periods)
976 .bind::<diesel::sql_types::Array<diesel::sql_types::Bytea>, _>(&wallet_bytes)
977 .execute(&mut *conn)
978 .await?;
979
980 Ok(affected)
981 })
982 .await
983 .map_err(Into::into)
984 }
985
986 async fn upsert_historical_theo_batch(
987 &self,
988 interval_ms: i64,
989 timestamp_ms: i64,
990 snapshots: &[(String, Decimal)],
991 max_periods: i64,
992 ) -> Result<usize> {
993 use schema::historical_theo_snapshots::dsl as ht;
994
995 if snapshots.is_empty() {
996 return Ok(0);
997 }
998 if max_periods <= 0 {
999 anyhow::bail!("max_periods must be positive, got {}", max_periods);
1000 }
1001 if ![
1002 HISTORICAL_THEO_INTERVAL_5M_MS,
1003 HISTORICAL_THEO_INTERVAL_1H_MS,
1004 HISTORICAL_THEO_INTERVAL_1D_MS,
1005 ]
1006 .contains(&interval_ms)
1007 {
1008 anyhow::bail!(
1009 "Invalid interval_ms {}: must be one of the known intervals (5m, 1h, 1d)",
1010 interval_ms
1011 );
1012 }
1013
1014 let mut conn = self.get_conn().await?;
1015
1016 let new_rows: Vec<NewHistoricalTheoSnapshot> = snapshots
1017 .iter()
1018 .map(|(symbol, theoretical_price)| NewHistoricalTheoSnapshot {
1019 symbol: symbol.clone(),
1020 interval_ms,
1021 timestamp_ms,
1022 theoretical_price: *theoretical_price,
1023 })
1024 .collect();
1025
1026 let symbols: Vec<String> = snapshots.iter().map(|(symbol, _)| symbol.clone()).collect();
1027
1028 conn.transaction::<_, diesel::result::Error, _>(async |conn| {
1029 let affected = diesel::insert_into(ht::historical_theo_snapshots)
1030 .values(&new_rows)
1031 .on_conflict((ht::symbol, ht::interval_ms, ht::timestamp_ms))
1032 .do_update()
1033 .set(ht::theoretical_price.eq(excluded(ht::theoretical_price)))
1034 .execute(&mut *conn)
1035 .await?;
1036
1037 diesel::sql_query(
1038 "DELETE FROM historical_theo_snapshots h
1039 USING (
1040 SELECT id FROM (
1041 SELECT id,
1042 ROW_NUMBER() OVER (
1043 PARTITION BY symbol, interval_ms
1044 ORDER BY timestamp_ms DESC
1045 ) AS row_num
1046 FROM historical_theo_snapshots
1047 WHERE interval_ms = $1
1048 AND symbol = ANY($3)
1049 ) ranked
1050 WHERE ranked.row_num > $2
1051 ) stale
1052 WHERE h.id = stale.id",
1053 )
1054 .bind::<diesel::sql_types::BigInt, _>(interval_ms)
1055 .bind::<diesel::sql_types::BigInt, _>(max_periods)
1056 .bind::<diesel::sql_types::Array<diesel::sql_types::Text>, _>(&symbols)
1057 .execute(&mut *conn)
1058 .await?;
1059
1060 Ok(affected)
1061 })
1062 .await
1063 .map_err(Into::into)
1064 }
1065
1066 async fn upsert_vol_surface_snapshot(
1067 &self,
1068 interval_ms: i64,
1069 timestamp_ms: i64,
1070 underlying: &str,
1071 surface_json: serde_json::Value,
1072 max_periods: i64,
1073 ) -> Result<usize> {
1074 use schema::vol_surface_snapshots::dsl as vs;
1075
1076 if max_periods <= 0 {
1077 anyhow::bail!("max_periods must be positive, got {}", max_periods);
1078 }
1079 if ![
1080 HISTORICAL_THEO_INTERVAL_5M_MS,
1081 HISTORICAL_THEO_INTERVAL_1H_MS,
1082 HISTORICAL_THEO_INTERVAL_1D_MS,
1083 ]
1084 .contains(&interval_ms)
1085 {
1086 anyhow::bail!(
1087 "Invalid interval_ms {}: must be one of the known intervals (5m, 1h, 1d)",
1088 interval_ms
1089 );
1090 }
1091
1092 let mut conn = self.get_conn().await?;
1093
1094 let new_row = NewVolSurfaceSnapshotRow {
1095 underlying: underlying.to_string(),
1096 interval_ms,
1097 timestamp_ms,
1098 surface_json,
1099 };
1100
1101 let underlying_owned = underlying.to_string();
1102
1103 conn.transaction::<_, diesel::result::Error, _>(async |conn| {
1104 let affected = diesel::insert_into(vs::vol_surface_snapshots)
1105 .values(&new_row)
1106 .on_conflict((vs::underlying, vs::interval_ms, vs::timestamp_ms))
1107 .do_update()
1108 .set(vs::surface_json.eq(excluded(vs::surface_json)))
1109 .execute(&mut *conn)
1110 .await?;
1111
1112 diesel::sql_query(
1113 "DELETE FROM vol_surface_snapshots v
1114 USING (
1115 SELECT id FROM (
1116 SELECT id,
1117 ROW_NUMBER() OVER (
1118 PARTITION BY underlying, interval_ms
1119 ORDER BY timestamp_ms DESC
1120 ) AS row_num
1121 FROM vol_surface_snapshots
1122 WHERE interval_ms = $1
1123 AND underlying = $3
1124 ) ranked
1125 WHERE ranked.row_num > $2
1126 ) stale
1127 WHERE v.id = stale.id",
1128 )
1129 .bind::<diesel::sql_types::BigInt, _>(interval_ms)
1130 .bind::<diesel::sql_types::BigInt, _>(max_periods)
1131 .bind::<diesel::sql_types::Text, _>(&underlying_owned)
1132 .execute(&mut *conn)
1133 .await?;
1134
1135 Ok(affected)
1136 })
1137 .await
1138 .map_err(Into::into)
1139 }
1140
1141 async fn upsert_bbo_snapshots(
1142 &self,
1143 snapshot_ts: i64,
1144 snapshots: &[NewBboSnapshotInput],
1145 ) -> Result<usize> {
1146 use schema::bbo_snapshots::dsl as bbo;
1147
1148 if snapshots.is_empty() {
1149 return Ok(0);
1150 }
1151
1152 let mut conn = self.get_conn().await?;
1153
1154 let rows: Vec<NewBboSnapshot> = snapshots
1155 .iter()
1156 .map(|snapshot| NewBboSnapshot {
1157 symbol: snapshot.symbol.clone(),
1158 best_bid: snapshot.best_bid,
1159 best_ask: snapshot.best_ask,
1160 best_bid_size: snapshot.best_bid_size,
1161 best_ask_size: snapshot.best_ask_size,
1162 snapshot_ts,
1163 })
1164 .collect();
1165
1166 let affected = diesel::insert_into(bbo::bbo_snapshots)
1167 .values(&rows)
1168 .on_conflict((bbo::symbol, bbo::snapshot_ts))
1169 .do_update()
1170 .set((
1171 bbo::best_bid.eq(excluded(bbo::best_bid)),
1172 bbo::best_ask.eq(excluded(bbo::best_ask)),
1173 bbo::best_bid_size.eq(excluded(bbo::best_bid_size)),
1174 bbo::best_ask_size.eq(excluded(bbo::best_ask_size)),
1175 ))
1176 .execute(&mut conn)
1177 .await?;
1178
1179 Ok(affected)
1180 }
1181
1182 async fn delete_bbo_snapshots_older_than(&self, cutoff_ts: i64) -> Result<usize> {
1183 use schema::bbo_snapshots::dsl as bbo;
1184
1185 let mut conn = self.get_conn().await?;
1186
1187 let deleted = diesel::delete(bbo::bbo_snapshots.filter(bbo::snapshot_ts.lt(cutoff_ts)))
1188 .execute(&mut conn)
1189 .await?;
1190
1191 Ok(deleted)
1192 }
1193
1194 async fn mark_settlement_payouts_seen(
1195 &self,
1196 wallet: &WalletAddress,
1197 payout_ids: &[i64],
1198 ) -> Result<usize> {
1199 use schema::settlement_payout_seen::dsl as sps;
1200 use schema::settlement_payouts::dsl as sp;
1201
1202 if payout_ids.is_empty() {
1203 return Ok(0);
1204 }
1205
1206 let mut conn = self.get_conn().await?;
1207
1208 let valid_ids: Vec<i64> = sp::settlement_payouts
1209 .filter(sp::wallet.eq(wallet))
1210 .filter(sp::id.eq_any(payout_ids))
1211 .select(sp::id)
1212 .load::<i64>(&mut conn)
1213 .await?;
1214
1215 if valid_ids.is_empty() {
1216 return Ok(0);
1217 }
1218
1219 let inserts: Vec<NewSettlementPayoutSeen> = valid_ids
1220 .into_iter()
1221 .map(|payout_id| NewSettlementPayoutSeen {
1222 wallet: *wallet,
1223 payout_id,
1224 })
1225 .collect();
1226
1227 let inserted_rows = diesel::insert_into(sps::settlement_payout_seen)
1228 .values(&inserts)
1229 .on_conflict((sps::wallet, sps::payout_id))
1230 .do_nothing()
1231 .execute(&mut conn)
1232 .await?;
1233
1234 Ok(inserted_rows)
1235 }
1236}
1237
1238#[cfg(test)]
1239mod tests {
1240 use crate::test_helpers::TestDb;
1241 use hypercall_db::AnalyticsReader;
1242 use hypercall_db::CatalogReader;
1243 use hypercall_types::wallet_address::test_wallet;
1244
1245 #[tokio::test]
1246 async fn instrument_exists_empty_db_returns_false() {
1247 let test_db = TestDb::new().await.unwrap();
1248 let db = test_db.diesel_db().await;
1249 let exists = db.instrument_exists("BTC-20260131-100000-C").await.unwrap();
1250 assert!(!exists);
1251 }
1252
1253 #[tokio::test]
1254 async fn get_all_trades_empty_db_returns_empty() {
1255 let test_db = TestDb::new().await.unwrap();
1256 let db = test_db.diesel_db().await;
1257 let trades = db.get_all_trades(100, 0).await.unwrap();
1258 assert!(trades.is_empty());
1259 }
1260
1261 #[tokio::test]
1262 async fn get_historical_pnl_empty_db_returns_empty() {
1263 let test_db = TestDb::new().await.unwrap();
1264 let db = test_db.diesel_db().await;
1265 let pnl = db
1266 .get_historical_pnl(
1267 &test_wallet(60),
1268 hypercall_types::HISTORICAL_PNL_INTERVAL_1H_MS,
1269 100,
1270 false,
1271 )
1272 .await
1273 .unwrap();
1274 assert!(pnl.is_empty());
1275 }
1276
1277 use super::*;
1280 use hypercall_db::{FillRecord, OrderRecord, TradeRecord};
1281 use rust_decimal_macros::dec;
1282
/// A fully populated `TradeRow` converts into a `TradeRecord` with every
/// field carried over, including `created_at`.
#[test]
fn trade_row_to_record_roundtrip() {
    let created_naive = chrono::DateTime::from_timestamp(1_700_000_000, 0)
        .unwrap()
        .naive_utc();
    let symbol = "BTC-20260115-95000-C";
    let (maker, taker) = (test_wallet(1), test_wallet(2));

    let record: TradeRecord = TradeRow {
        trade_id: 42,
        symbol: symbol.to_string(),
        price: dec!(100.5),
        size: dec!(1000000),
        maker_address: maker,
        taker_address: taker,
        maker_fee: dec!(0.5),
        taker_fee: dec!(0.3),
        timestamp: 1_700_000_000_000,
        created_at: Some(created_naive),
    }
    .into();

    // Identity and instrument.
    assert_eq!(record.trade_id, 42);
    assert_eq!(record.symbol, symbol);
    // Economics.
    assert_eq!(record.price, dec!(100.5));
    assert_eq!(record.size, dec!(1000000));
    assert_eq!(record.maker_fee, dec!(0.5));
    assert_eq!(record.taker_fee, dec!(0.3));
    // Counterparties.
    assert_eq!(record.maker_address, maker);
    assert_eq!(record.taker_address, taker);
    // Timing.
    assert_eq!(record.timestamp, 1_700_000_000_000);
    let created = record.created_at.expect("created_at should be Some");
    assert_eq!(created.naive_utc(), created_naive);
}
1314
/// A `TradeRow` without `created_at` converts into a record whose
/// `created_at` is also `None`.
#[test]
fn trade_row_to_record_none_created_at() {
    let record: TradeRecord = TradeRow {
        trade_id: 1,
        symbol: "ETH-20260115-3000-P".to_string(),
        price: dec!(50),
        size: dec!(100),
        maker_address: test_wallet(3),
        taker_address: test_wallet(4),
        maker_fee: dec!(0),
        taker_fee: dec!(0),
        timestamp: 0,
        created_at: None,
    }
    .into();

    assert!(record.created_at.is_none());
}
1332
/// A fully populated `FillRow` converts into a `FillRecord` with every
/// field carried over, including all optional columns.
#[test]
fn fill_row_to_record_roundtrip() {
    let created_naive = chrono::DateTime::from_timestamp(1_700_000_000, 0)
        .unwrap()
        .naive_utc();
    let symbol = "BTC-20260115-95000-C";
    let owner = test_wallet(5);
    let builder = test_wallet(10);

    let record: FillRecord = FillRow {
        fill_id: Some(99),
        trade_id: 42,
        wallet_address: owner,
        symbol: symbol.to_string(),
        price: dec!(100.5),
        size: dec!(500),
        fee: dec!(0.25),
        is_taker: true,
        timestamp: 1_700_000_000_000,
        created_at: Some(created_naive),
        builder_code_address: Some(builder),
        builder_code_fee: Some(dec!(0.1)),
        realized_pnl: Some(dec!(42.5)),
        underlying_notional: Some(dec!(12345)),
    }
    .into();

    // Required columns.
    assert_eq!(record.fill_id, Some(99));
    assert_eq!(record.trade_id, 42);
    assert_eq!(record.wallet_address, owner);
    assert_eq!(record.symbol, symbol);
    assert_eq!(record.price, dec!(100.5));
    assert_eq!(record.size, dec!(500));
    assert_eq!(record.fee, dec!(0.25));
    assert!(record.is_taker);
    assert_eq!(record.timestamp, 1_700_000_000_000);

    // Optional columns.
    let created = record.created_at.expect("created_at should be Some");
    assert_eq!(created.naive_utc(), created_naive);
    assert_eq!(record.builder_code_address, Some(builder));
    assert_eq!(record.builder_code_fee, Some(dec!(0.1)));
    assert_eq!(record.realized_pnl, Some(dec!(42.5)));
    assert_eq!(record.underlying_notional, Some(dec!(12345)));
}
1371
/// A `FillRow` whose optional columns are all `None` converts into a
/// `FillRecord` whose optional fields are all `None` as well.
///
/// Every optional field set to `None` on the row is asserted on the record,
/// including `underlying_notional`.
#[test]
fn fill_row_to_record_all_optionals_none() {
    let row = FillRow {
        fill_id: None,
        trade_id: 1,
        wallet_address: test_wallet(6),
        symbol: "ETH-20260115-3000-P".to_string(),
        price: dec!(10),
        size: dec!(100),
        fee: dec!(0),
        is_taker: false,
        timestamp: 0,
        created_at: None,
        builder_code_address: None,
        builder_code_fee: None,
        underlying_notional: None,
        realized_pnl: None,
    };
    let record: FillRecord = row.into();

    assert!(record.fill_id.is_none());
    assert!(record.created_at.is_none());
    assert!(record.builder_code_address.is_none());
    assert!(record.builder_code_fee.is_none());
    assert!(record.realized_pnl.is_none());
    assert!(record.underlying_notional.is_none());
    assert!(!record.is_taker);
}
1398
/// A fully populated `OrderRow` converts into an `OrderRecord` with every
/// field carried over.
///
/// The row's millisecond `created_at` is expected to surface as
/// `record.timestamp`, and `record.created_at` is expected to be `Some`.
/// NOTE(review): the row's `created_at` (ms) and `updated_at` encode the same
/// instant here, so this test cannot tell which of the two feeds
/// `record.created_at` — confirm against the conversion impl.
#[test]
fn order_row_to_record_roundtrip() {
    let naive_dt = chrono::DateTime::from_timestamp(1_700_000_000, 0)
        .unwrap()
        .naive_utc();
    let row = OrderRow {
        order_id: 77,
        wallet_address: test_wallet(7),
        symbol: "BTC-20260115-95000-C".to_string(),
        side: "Buy".to_string(),
        price: dec!(200),
        size: dec!(10),
        tif: "GTC".to_string(),
        client_id: Some("my-order-1".to_string()),
        is_perp: false,
        underlying: Some("BTC".to_string()),
        reduce_only: Some(true),
        status: "OPEN".to_string(),
        created_at: 1_700_000_000_000,
        updated_at: naive_dt,
        filled_size: dec!(5),
        mmp_enabled: true,
    };
    let record: OrderRecord = row.into();

    assert_eq!(record.order_id, 77);
    assert_eq!(record.wallet_address, test_wallet(7));
    assert_eq!(record.symbol, "BTC-20260115-95000-C");
    assert_eq!(record.side, "Buy");
    assert_eq!(record.price, dec!(200));
    assert_eq!(record.size, dec!(10));
    assert_eq!(record.tif, "GTC");
    assert_eq!(record.client_id, Some("my-order-1".to_string()));
    assert!(!record.is_perp);
    assert_eq!(record.underlying, Some("BTC".to_string()));
    assert_eq!(record.reduce_only, Some(true));
    assert_eq!(record.status, "OPEN");
    assert_eq!(record.filled_size, dec!(5));
    // The row sets `mmp_enabled: true`; make sure the flag survives conversion.
    assert!(record.mmp_enabled);
    assert_eq!(record.timestamp, 1_700_000_000_000);
    let created = record
        .created_at
        .expect("created_at should be Some after conversion");
    assert_eq!(created.naive_utc(), naive_dt);
}
1446
/// An `OrderRow` with its optional columns unset converts into a record
/// whose optional fields are `None`, while `timestamp` and `created_at`
/// are still populated.
#[test]
fn order_row_to_record_optionals_none() {
    let updated_naive = chrono::DateTime::from_timestamp(1_600_000_000, 0)
        .unwrap()
        .naive_utc();

    let record: OrderRecord = OrderRow {
        order_id: 1,
        wallet_address: test_wallet(8),
        symbol: "ETH-20260115-3000-P".to_string(),
        side: "Sell".to_string(),
        price: dec!(50),
        size: dec!(1),
        tif: "IOC".to_string(),
        client_id: None,
        is_perp: true,
        underlying: None,
        reduce_only: None,
        status: "FILLED".to_string(),
        created_at: 1_600_000_000_000,
        updated_at: updated_naive,
        filled_size: dec!(1),
        mmp_enabled: false,
    }
    .into();

    // Optional columns stay empty.
    assert!(record.client_id.is_none());
    assert!(record.underlying.is_none());
    assert!(record.reduce_only.is_none());

    // Flags are carried over as-is.
    assert!(record.is_perp);
    assert!(!record.mmp_enabled);

    // Timing fields are always populated.
    assert_eq!(record.timestamp, 1_600_000_000_000);
    assert!(record.created_at.is_some());
}
1479
1480 use hypercall_db::{AnalyticsWriter, NewBboSnapshotInput};
1485 use hypercall_types::HISTORICAL_PNL_INTERVAL_5M_MS;
1486
1487 fn wallet_hex(wallet: &hypercall_types::WalletAddress) -> String {
1489 format!("'\\x{}'", hex::encode(wallet.as_bytes()))
1490 }
1491
1492 fn exec_sql(conn: &mut diesel::pg::PgConnection, sql: &str) {
1494 diesel::RunQueryDsl::execute(diesel::sql_query(sql), conn).unwrap();
1495 }
1496
1497 fn load_sql<T>(conn: &mut diesel::pg::PgConnection, sql: &str) -> Vec<T>
1499 where
1500 T: diesel::deserialize::QueryableByName<diesel::pg::Pg> + 'static,
1501 {
1502 diesel::RunQueryDsl::load(diesel::sql_query(sql), conn).unwrap()
1503 }
1504
1505 #[tokio::test]
1506 async fn get_all_trades_returns_inserted() {
1507 let test_db = TestDb::new().await.unwrap();
1508 let db = test_db.diesel_db().await;
1509
1510 let mut conn = test_db.handler.pool().get().unwrap();
1511 let w1 = wallet_hex(&test_wallet(1));
1512 let w2 = wallet_hex(&test_wallet(2));
1513
1514 exec_sql(
1515 &mut conn,
1516 &format!(
1517 "INSERT INTO trades (trade_id, symbol, price, size, maker_address, taker_address, maker_fee, taker_fee, timestamp)
1518 VALUES
1519 (1, 'BTC-20260131-100000-C', 5.0, 1000000, {w1}, {w2}, 0.01, 0.02, 1000),
1520 (2, 'BTC-20260131-100000-C', 5.5, 2000000, {w1}, {w2}, 0.01, 0.02, 2000),
1521 (3, 'ETH-20260131-4000-P', 1.0, 500000, {w2}, {w1}, 0.005, 0.01, 3000)"
1522 ),
1523 );
1524
1525 let trades = AnalyticsReader::get_all_trades(&db, 10, 0).await.unwrap();
1526 assert_eq!(trades.len(), 3);
1527 assert_eq!(trades[0].trade_id, 3);
1529 assert_eq!(trades[1].trade_id, 2);
1530 assert_eq!(trades[2].trade_id, 1);
1531 }
1532
1533 #[tokio::test]
1534 async fn get_trades_by_option_filters_correctly() {
1535 let test_db = TestDb::new().await.unwrap();
1536 let db = test_db.diesel_db().await;
1537
1538 let mut conn = test_db.handler.pool().get().unwrap();
1539 let w1 = wallet_hex(&test_wallet(1));
1540 let w2 = wallet_hex(&test_wallet(2));
1541
1542 exec_sql(
1543 &mut conn,
1544 &format!(
1545 "INSERT INTO trades (trade_id, symbol, price, size, maker_address, taker_address, maker_fee, taker_fee, timestamp)
1546 VALUES
1547 (10, 'BTC-20260131-100000-C', 5.0, 1000000, {w1}, {w2}, 0.01, 0.02, 1000),
1548 (11, 'BTC-20260131-100000-C', 6.0, 1000000, {w1}, {w2}, 0.01, 0.02, 2000),
1549 (12, 'ETH-20260131-4000-P', 1.0, 500000, {w2}, {w1}, 0.005, 0.01, 3000)"
1550 ),
1551 );
1552
1553 let trades = db
1554 .get_trades_by_option("BTC-20260131-100000-C", 10)
1555 .await
1556 .unwrap();
1557 assert_eq!(trades.len(), 2);
1558 assert!(trades.iter().all(|t| t.symbol == "BTC-20260131-100000-C"));
1560 assert_eq!(trades[0].trade_id, 11);
1562 assert_eq!(trades[1].trade_id, 10);
1563
1564 let eth_trades = db
1565 .get_trades_by_option("ETH-20260131-4000-P", 10)
1566 .await
1567 .unwrap();
1568 assert_eq!(eth_trades.len(), 1);
1569 assert_eq!(eth_trades[0].trade_id, 12);
1570 }
1571
1572 #[tokio::test]
1573 async fn get_trades_by_underlying_filters_correctly() {
1574 let test_db = TestDb::new().await.unwrap();
1575 let db = test_db.diesel_db().await;
1576
1577 let mut conn = test_db.handler.pool().get().unwrap();
1578 let w1 = wallet_hex(&test_wallet(1));
1579 let w2 = wallet_hex(&test_wallet(2));
1580
1581 exec_sql(
1583 &mut conn,
1584 "INSERT INTO instruments (id, underlying, strike, expiry, option_type, status)
1585 VALUES
1586 ('BTC-20260131-100000-C', 'BTC', 100000, 1738300800, 'C', 'ACTIVE'),
1587 ('ETH-20260131-4000-P', 'ETH', 4000, 1738300800, 'P', 'ACTIVE')",
1588 );
1589
1590 exec_sql(
1591 &mut conn,
1592 &format!(
1593 "INSERT INTO trades (trade_id, symbol, price, size, maker_address, taker_address, maker_fee, taker_fee, timestamp)
1594 VALUES
1595 (20, 'BTC-20260131-100000-C', 5.0, 1000000, {w1}, {w2}, 0.01, 0.02, 1000),
1596 (21, 'BTC-20260131-100000-C', 6.0, 2000000, {w1}, {w2}, 0.01, 0.02, 2000),
1597 (22, 'ETH-20260131-4000-P', 1.0, 500000, {w2}, {w1}, 0.005, 0.01, 3000)"
1598 ),
1599 );
1600
1601 let btc_trades = db.get_trades_by_underlying("BTC", 10, 0).await.unwrap();
1602 assert_eq!(btc_trades.len(), 2);
1603 assert!(btc_trades
1604 .iter()
1605 .all(|t| t.symbol == "BTC-20260131-100000-C"));
1606
1607 let eth_trades = db.get_trades_by_underlying("ETH", 10, 0).await.unwrap();
1608 assert_eq!(eth_trades.len(), 1);
1609 assert_eq!(eth_trades[0].symbol, "ETH-20260131-4000-P");
1610 }
1611
1612 #[tokio::test]
1613 async fn get_fills_by_account_roundtrip() {
1614 let test_db = TestDb::new().await.unwrap();
1615 let db = test_db.diesel_db().await;
1616
1617 let mut conn = test_db.handler.pool().get().unwrap();
1618 let w1 = wallet_hex(&test_wallet(1));
1619 let w2 = wallet_hex(&test_wallet(2));
1620 let wallet1 = test_wallet(1);
1621 let wallet2 = test_wallet(2);
1622
1623 exec_sql(
1625 &mut conn,
1626 &format!(
1627 "INSERT INTO trades (trade_id, symbol, price, size, maker_address, taker_address, maker_fee, taker_fee, timestamp)
1628 VALUES
1629 (30, 'BTC-20260131-100000-C', 5.0, 1000000, {w1}, {w2}, 0.01, 0.02, 1000),
1630 (31, 'BTC-20260131-100000-C', 6.0, 2000000, {w1}, {w2}, 0.01, 0.02, 2000)"
1631 ),
1632 );
1633
1634 exec_sql(
1636 &mut conn,
1637 &format!(
1638 "INSERT INTO fills (trade_id, wallet_address, symbol, price, size, fee, is_taker, timestamp)
1639 VALUES
1640 (30, {w1}, 'BTC-20260131-100000-C', 5.0, 1000000, 0.01, false, 1000),
1641 (30, {w2}, 'BTC-20260131-100000-C', 5.0, 1000000, 0.02, true, 1000),
1642 (31, {w1}, 'BTC-20260131-100000-C', 6.0, 2000000, 0.01, false, 2000)"
1643 ),
1644 );
1645
1646 let fills_w1 = db.get_fills_by_account(&wallet1, 10, 0).await.unwrap();
1647 assert_eq!(fills_w1.len(), 2);
1648 assert!(fills_w1.iter().all(|f| f.wallet_address == wallet1));
1650 assert_eq!(fills_w1[0].trade_id, 31);
1652 assert_eq!(fills_w1[1].trade_id, 30);
1653
1654 let fills_w2 = db.get_fills_by_account(&wallet2, 10, 0).await.unwrap();
1655 assert_eq!(fills_w2.len(), 1);
1656 assert_eq!(fills_w2[0].wallet_address, wallet2);
1657 assert!(fills_w2[0].is_taker);
1658 }
1659
1660 #[tokio::test]
1661 async fn get_orders_by_account_roundtrip() {
1662 let test_db = TestDb::new().await.unwrap();
1663 let db = test_db.diesel_db().await;
1664
1665 let mut conn = test_db.handler.pool().get().unwrap();
1666 let w1 = wallet_hex(&test_wallet(1));
1667 let w2 = wallet_hex(&test_wallet(2));
1668 let wallet1 = test_wallet(1);
1669
1670 exec_sql(
1671 &mut conn,
1672 &format!(
1673 "INSERT INTO order_infos (order_id, wallet_address, symbol, side, price, size, tif, is_perp, timestamp, status, filled_size, updated_at)
1674 VALUES
1675 (100, {w1}, 'BTC-20260131-100000-C', 'Buy', 5.0, 1000000, 'GTC', false, 1000, 'OPEN', 0, '2026-01-01 00:00:00'),
1676 (101, {w1}, 'BTC-20260131-100000-C', 'Sell', 6.0, 2000000, 'GTC', false, 2000, 'FILLED', 2000000, '2026-01-01 00:01:00'),
1677 (102, {w2}, 'ETH-20260131-4000-P', 'Buy', 1.0, 500000, 'IOC', false, 3000, 'CANCELED', 0, '2026-01-01 00:02:00')"
1678 ),
1679 );
1680
1681 let orders = db
1682 .get_orders_by_account(&wallet1, None, 10, 0)
1683 .await
1684 .unwrap();
1685 assert_eq!(orders.len(), 2);
1686 assert!(orders.iter().all(|o| o.wallet_address == wallet1));
1687 assert_eq!(orders[0].order_id, 101);
1689 assert_eq!(orders[1].order_id, 100);
1690 }
1691
1692 #[tokio::test]
1693 async fn get_orders_by_account_with_status_filter() {
1694 let test_db = TestDb::new().await.unwrap();
1695 let db = test_db.diesel_db().await;
1696
1697 let mut conn = test_db.handler.pool().get().unwrap();
1698 let w1 = wallet_hex(&test_wallet(1));
1699 let wallet1 = test_wallet(1);
1700
1701 exec_sql(
1702 &mut conn,
1703 &format!(
1704 "INSERT INTO order_infos (order_id, wallet_address, symbol, side, price, size, tif, is_perp, timestamp, status, filled_size, updated_at)
1705 VALUES
1706 (200, {w1}, 'BTC-20260131-100000-C', 'Buy', 5.0, 1000000, 'GTC', false, 1000, 'OPEN', 0, '2026-01-01 00:00:00'),
1707 (201, {w1}, 'BTC-20260131-100000-C', 'Buy', 5.5, 1000000, 'GTC', false, 2000, 'OPEN', 0, '2026-01-01 00:01:00'),
1708 (202, {w1}, 'BTC-20260131-100000-C', 'Sell', 6.0, 2000000, 'GTC', false, 3000, 'FILLED', 2000000, '2026-01-01 00:02:00'),
1709 (203, {w1}, 'ETH-20260131-4000-P', 'Buy', 1.0, 500000, 'IOC', false, 4000, 'CANCELED', 0, '2026-01-01 00:03:00')"
1710 ),
1711 );
1712
1713 let open_orders = db
1714 .get_orders_by_account(&wallet1, Some("OPEN"), 10, 0)
1715 .await
1716 .unwrap();
1717 assert_eq!(open_orders.len(), 2);
1718 assert!(open_orders.iter().all(|o| o.status == "OPEN"));
1719
1720 let filled_orders = db
1721 .get_orders_by_account(&wallet1, Some("FILLED"), 10, 0)
1722 .await
1723 .unwrap();
1724 assert_eq!(filled_orders.len(), 1);
1725 assert_eq!(filled_orders[0].order_id, 202);
1726
1727 let canceled = db
1729 .get_orders_by_account(&wallet1, Some("canceled"), 10, 0)
1730 .await
1731 .unwrap();
1732 assert_eq!(canceled.len(), 1);
1733 assert_eq!(canceled[0].order_id, 203);
1734 }
1735
1736 #[tokio::test]
1737 async fn upsert_and_get_historical_pnl_roundtrip() {
1738 let test_db = TestDb::new().await.unwrap();
1739 let db = test_db.diesel_db().await;
1740
1741 let wallet = test_wallet(50);
1742 let interval = HISTORICAL_PNL_INTERVAL_5M_MS;
1743 let attribution_data = vec![1u8, 2, 3, 4];
1744
1745 let snap_t1 = vec![(wallet, dec!(1000.5), Some(attribution_data.clone()))];
1747 let snap_t2 = vec![(wallet, dec!(1050.25), None)];
1748
1749 AnalyticsWriter::upsert_historical_pnl_batch(&db, interval, 300_000, &snap_t1, 100)
1750 .await
1751 .unwrap();
1752 AnalyticsWriter::upsert_historical_pnl_batch(&db, interval, 600_000, &snap_t2, 100)
1753 .await
1754 .unwrap();
1755
1756 let points = db
1758 .get_historical_pnl(&wallet, interval, 100, true)
1759 .await
1760 .unwrap();
1761 assert_eq!(points.len(), 2);
1762 assert_eq!(points[0].timestamp_ms, 300_000);
1764 assert_eq!(points[0].total_equity, dec!(1000.5));
1765 assert_eq!(points[0].attribution, Some(attribution_data));
1766 assert_eq!(points[1].timestamp_ms, 600_000);
1767 assert_eq!(points[1].total_equity, dec!(1050.25));
1768 assert_eq!(points[1].attribution, None);
1769
1770 let points_no_attr = db
1772 .get_historical_pnl(&wallet, interval, 100, false)
1773 .await
1774 .unwrap();
1775 assert_eq!(points_no_attr.len(), 2);
1776 assert!(points_no_attr[0].attribution.is_none());
1777 assert!(points_no_attr[1].attribution.is_none());
1778 }
1779
1780 #[tokio::test]
1781 async fn upsert_and_get_historical_theos_roundtrip() {
1782 let test_db = TestDb::new().await.unwrap();
1783 let db = test_db.diesel_db().await;
1784
1785 let interval = hypercall_types::HISTORICAL_THEO_INTERVAL_5M_MS;
1786 let symbol = "BTC-20260131-100000-C".to_string();
1787
1788 let snap_t1 = vec![(symbol.clone(), dec!(5.25))];
1789 let snap_t2 = vec![(symbol.clone(), dec!(5.50))];
1790
1791 AnalyticsWriter::upsert_historical_theo_batch(&db, interval, 300_000, &snap_t1, 100)
1792 .await
1793 .unwrap();
1794 AnalyticsWriter::upsert_historical_theo_batch(&db, interval, 600_000, &snap_t2, 100)
1795 .await
1796 .unwrap();
1797
1798 let points = db
1799 .get_historical_theos(&symbol, interval, 100)
1800 .await
1801 .unwrap();
1802 assert_eq!(points.len(), 2);
1803 assert_eq!(points[0].timestamp_ms, 300_000);
1805 assert_eq!(points[0].theoretical_price, dec!(5.25));
1806 assert_eq!(points[1].timestamp_ms, 600_000);
1807 assert_eq!(points[1].theoretical_price, dec!(5.50));
1808 }
1809
1810 #[tokio::test]
1811 async fn upsert_and_load_bbo_snapshots_roundtrip() {
1812 let test_db = TestDb::new().await.unwrap();
1813 let db = test_db.diesel_db().await;
1814
1815 let snapshots = vec![
1816 NewBboSnapshotInput {
1817 symbol: "BTC-20260131-100000-C".to_string(),
1818 best_bid: dec!(4.5),
1819 best_ask: dec!(5.0),
1820 best_bid_size: Some(dec!(100)),
1821 best_ask_size: Some(dec!(200)),
1822 snapshot_ts: 1000,
1823 },
1824 NewBboSnapshotInput {
1825 symbol: "ETH-20260131-4000-P".to_string(),
1826 best_bid: dec!(0.8),
1827 best_ask: dec!(1.0),
1828 best_bid_size: None,
1829 best_ask_size: None,
1830 snapshot_ts: 1000,
1831 },
1832 ];
1833
1834 let affected = AnalyticsWriter::upsert_bbo_snapshots(&db, 1000, &snapshots)
1835 .await
1836 .unwrap();
1837 assert_eq!(affected, 2);
1838
1839 let loaded = db.load_bbo_snapshots_since(0).await.unwrap();
1840 assert_eq!(loaded.len(), 2);
1841 assert_eq!(loaded[0].symbol, "BTC-20260131-100000-C");
1843 assert_eq!(loaded[0].best_bid, dec!(4.5));
1844 assert_eq!(loaded[0].best_ask, dec!(5.0));
1845 assert_eq!(loaded[0].best_bid_size, Some(dec!(100)));
1846 assert_eq!(loaded[0].best_ask_size, Some(dec!(200)));
1847 assert_eq!(loaded[0].snapshot_ts, 1000);
1848
1849 assert_eq!(loaded[1].symbol, "ETH-20260131-4000-P");
1850 assert_eq!(loaded[1].best_bid, dec!(0.8));
1851 assert_eq!(loaded[1].best_ask, dec!(1.0));
1852 assert_eq!(loaded[1].best_bid_size, None);
1853 assert_eq!(loaded[1].best_ask_size, None);
1854 }
1855
1856 #[tokio::test]
1857 async fn delete_bbo_snapshots_older_than_removes_old() {
1858 let test_db = TestDb::new().await.unwrap();
1859 let db = test_db.diesel_db().await;
1860
1861 let old_snap = vec![NewBboSnapshotInput {
1862 symbol: "BTC-20260131-100000-C".to_string(),
1863 best_bid: dec!(4.0),
1864 best_ask: dec!(4.5),
1865 best_bid_size: Some(dec!(50)),
1866 best_ask_size: Some(dec!(60)),
1867 snapshot_ts: 100,
1868 }];
1869 let new_snap = vec![NewBboSnapshotInput {
1870 symbol: "BTC-20260131-100000-C".to_string(),
1871 best_bid: dec!(5.0),
1872 best_ask: dec!(5.5),
1873 best_bid_size: Some(dec!(70)),
1874 best_ask_size: Some(dec!(80)),
1875 snapshot_ts: 2000,
1876 }];
1877
1878 AnalyticsWriter::upsert_bbo_snapshots(&db, 100, &old_snap)
1879 .await
1880 .unwrap();
1881 AnalyticsWriter::upsert_bbo_snapshots(&db, 2000, &new_snap)
1882 .await
1883 .unwrap();
1884
1885 let deleted = db.delete_bbo_snapshots_older_than(1000).await.unwrap();
1887 assert_eq!(deleted, 1);
1888
1889 let remaining = db.load_bbo_snapshots_since(0).await.unwrap();
1890 assert_eq!(remaining.len(), 1);
1891 assert_eq!(remaining[0].snapshot_ts, 2000);
1892 }
1893
1894 #[tokio::test]
1895 async fn upsert_and_get_vol_surface_roundtrip() {
1896 let test_db = TestDb::new().await.unwrap();
1897 let db = test_db.diesel_db().await;
1898
1899 let interval = hypercall_types::HISTORICAL_THEO_INTERVAL_5M_MS;
1900 let surface = serde_json::json!({
1901 "model": "ssvi",
1902 "params": [0.1, 0.2, 0.3]
1903 });
1904
1905 let affected = AnalyticsWriter::upsert_vol_surface_snapshot(
1906 &db,
1907 interval,
1908 300_000,
1909 "BTC",
1910 surface.clone(),
1911 100,
1912 )
1913 .await
1914 .unwrap();
1915 assert_eq!(affected, 1);
1916
1917 let surface2 = serde_json::json!({"model": "sabr", "params": [0.4, 0.5]});
1919 AnalyticsWriter::upsert_vol_surface_snapshot(
1920 &db,
1921 interval,
1922 600_000,
1923 "BTC",
1924 surface2.clone(),
1925 100,
1926 )
1927 .await
1928 .unwrap();
1929
1930 let history = db
1931 .get_vol_surface_history("BTC", interval, 100)
1932 .await
1933 .unwrap();
1934 assert_eq!(history.len(), 2);
1935 assert_eq!(history[0].timestamp_ms, 300_000);
1937 assert_eq!(history[0].surface_json, surface);
1938 assert_eq!(history[0].underlying, "BTC");
1939 assert_eq!(history[1].timestamp_ms, 600_000);
1940 assert_eq!(history[1].surface_json, surface2);
1941 }
1942
1943 #[tokio::test]
1944 async fn mark_and_get_seen_settlement_payouts() {
1945 let test_db = TestDb::new().await.unwrap();
1946 let db = test_db.diesel_db().await;
1947
1948 let mut conn = test_db.handler.pool().get().unwrap();
1949 let wallet = test_wallet(70);
1950 let w_hex = wallet_hex(&wallet);
1951
1952 exec_sql(
1954 &mut conn,
1955 &format!(
1956 "INSERT INTO settlement_payouts (wallet, symbol, expiry_ts, position_size, settlement_price, payout_amount, ledger_applied)
1957 VALUES
1958 ({w_hex}, 'BTC-20260131-100000-C', 1738300800, 1000000, 100000, 5000, true),
1959 ({w_hex}, 'ETH-20260131-4000-P', 1738300800, 500000, 4000, 200, true)"
1960 ),
1961 );
1962
1963 #[derive(diesel::QueryableByName)]
1965 struct IdRow {
1966 #[diesel(sql_type = diesel::sql_types::BigInt)]
1967 id: i64,
1968 }
1969 let ids: Vec<IdRow> = load_sql(
1970 &mut conn,
1971 &format!("SELECT id FROM settlement_payouts WHERE wallet = {w_hex} ORDER BY id"),
1972 );
1973 let payout_ids: Vec<i64> = ids.iter().map(|r| r.id).collect();
1974 assert_eq!(payout_ids.len(), 2);
1975
1976 let seen_before = db
1978 .get_seen_settlement_payout_ids(&wallet, &payout_ids)
1979 .await
1980 .unwrap();
1981 assert!(seen_before.is_empty());
1982
1983 let marked = AnalyticsWriter::mark_settlement_payouts_seen(&db, &wallet, &[payout_ids[0]])
1985 .await
1986 .unwrap();
1987 assert_eq!(marked, 1);
1988
1989 let seen_after = db
1991 .get_seen_settlement_payout_ids(&wallet, &payout_ids)
1992 .await
1993 .unwrap();
1994 assert_eq!(seen_after.len(), 1);
1995 assert!(seen_after.contains(&payout_ids[0]));
1996 assert!(!seen_after.contains(&payout_ids[1]));
1997
1998 AnalyticsWriter::mark_settlement_payouts_seen(&db, &wallet, &payout_ids)
2000 .await
2001 .unwrap();
2002 let seen_all = db
2003 .get_seen_settlement_payout_ids(&wallet, &payout_ids)
2004 .await
2005 .unwrap();
2006 assert_eq!(seen_all.len(), 2);
2007 }
2008
2009 #[tokio::test]
2010 async fn instrument_exists_true_after_insert() {
2011 let test_db = TestDb::new().await.unwrap();
2012 let db = test_db.diesel_db().await;
2013
2014 let mut conn = test_db.handler.pool().get().unwrap();
2015
2016 assert!(!db.instrument_exists("BTC-20260131-100000-C").await.unwrap());
2018
2019 exec_sql(
2020 &mut conn,
2021 "INSERT INTO instruments (id, underlying, strike, expiry, option_type, status)
2022 VALUES ('BTC-20260131-100000-C', 'BTC', 100000, 1738300800, 'C', 'ACTIVE')",
2023 );
2024
2025 assert!(db.instrument_exists("BTC-20260131-100000-C").await.unwrap());
2027 assert!(!db.instrument_exists("ETH-20260131-4000-P").await.unwrap());
2029 }
2030
2031 #[tokio::test]
2032 async fn get_client_ids_by_order_ids_roundtrip() {
2033 let test_db = TestDb::new().await.unwrap();
2034 let db = test_db.diesel_db().await;
2035
2036 let mut conn = test_db.handler.pool().get().unwrap();
2037 let w1 = wallet_hex(&test_wallet(1));
2038
2039 exec_sql(
2040 &mut conn,
2041 &format!(
2042 "INSERT INTO order_infos (order_id, wallet_address, symbol, side, price, size, tif, is_perp, timestamp, status, filled_size, updated_at, client_id)
2043 VALUES
2044 (300, {w1}, 'BTC-20260131-100000-C', 'Buy', 5.0, 1000000, 'GTC', false, 1000, 'OPEN', 0, '2026-01-01 00:00:00', 'client-alpha'),
2045 (301, {w1}, 'BTC-20260131-100000-C', 'Sell', 6.0, 2000000, 'GTC', false, 2000, 'OPEN', 0, '2026-01-01 00:01:00', NULL),
2046 (302, {w1}, 'ETH-20260131-4000-P', 'Buy', 1.0, 500000, 'IOC', false, 3000, 'FILLED', 500000, '2026-01-01 00:02:00', 'client-beta')"
2047 ),
2048 );
2049
2050 let result = db
2051 .get_client_ids_by_order_ids(&[300, 301, 302, 999])
2052 .await
2053 .unwrap();
2054 assert_eq!(result.len(), 3); assert_eq!(result[&300], Some("client-alpha".to_string()));
2056 assert_eq!(result[&301], None);
2057 assert_eq!(result[&302], Some("client-beta".to_string()));
2058 assert!(!result.contains_key(&999));
2059 }
2060
2061 #[tokio::test]
2066 async fn get_theo_marks_at_timestamp_picks_latest_before() {
2067 let test_db = TestDb::new().await.unwrap();
2068 let db = test_db.diesel_db().await;
2069
2070 let interval = hypercall_types::HISTORICAL_THEO_INTERVAL_5M_MS;
2071 let symbol = "BTC-20260131-100000-C".to_string();
2072
2073 AnalyticsWriter::upsert_historical_theo_batch(
2075 &db,
2076 interval,
2077 1000,
2078 &[(symbol.clone(), dec!(5.0))],
2079 100,
2080 )
2081 .await
2082 .unwrap();
2083
2084 AnalyticsWriter::upsert_historical_theo_batch(
2085 &db,
2086 interval,
2087 2000,
2088 &[(symbol.clone(), dec!(7.5))],
2089 100,
2090 )
2091 .await
2092 .unwrap();
2093
2094 AnalyticsWriter::upsert_historical_theo_batch(
2095 &db,
2096 interval,
2097 3000,
2098 &[(symbol.clone(), dec!(9.0))],
2099 100,
2100 )
2101 .await
2102 .unwrap();
2103
2104 let marks = db
2106 .get_theo_marks_at_timestamp(&[symbol.clone()], 2500)
2107 .await
2108 .unwrap();
2109 assert_eq!(marks.len(), 1);
2110 assert_eq!(marks[&symbol], dec!(7.5));
2111
2112 let marks2 = db
2114 .get_theo_marks_at_timestamp(&[symbol.clone()], 3000)
2115 .await
2116 .unwrap();
2117 assert_eq!(marks2[&symbol], dec!(9.0));
2118
2119 let marks3 = db
2121 .get_theo_marks_at_timestamp(&[symbol.clone()], 500)
2122 .await
2123 .unwrap();
2124 assert!(marks3.is_empty());
2125 }
2126
2127 #[tokio::test]
2128 async fn get_theo_marks_at_timestamp_empty_symbols() {
2129 let test_db = TestDb::new().await.unwrap();
2130 let db = test_db.diesel_db().await;
2131
2132 let marks = db.get_theo_marks_at_timestamp(&[], 1000).await.unwrap();
2133 assert!(marks.is_empty());
2134 }
2135
2136 #[tokio::test]
2137 async fn get_deposit_withdraw_events_returns_ordered() {
2138 let test_db = TestDb::new().await.unwrap();
2139 let db = test_db.diesel_db().await;
2140
2141 let mut conn = test_db.handler.pool().get().unwrap();
2142 let wallet = test_wallet(80);
2143 let w_hex = wallet_hex(&wallet);
2144
2145 exec_sql(
2147 &mut conn,
2148 &format!(
2149 "INSERT INTO ledger_events (wallet, event_ts_ms, delta, event_type) VALUES \
2150 ({w_hex}, 1000, 500.0, 'deposit'), \
2151 ({w_hex}, 2000, -100.0, 'withdraw'), \
2152 ({w_hex}, 3000, 200.0, 'deposit'), \
2153 ({w_hex}, 2500, 50.0, 'fill_premium')"
2154 ),
2155 );
2156
2157 let events = db.get_deposit_withdraw_events(&wallet).await.unwrap();
2158 assert_eq!(events.len(), 3);
2160 assert_eq!(events[0].event_ts_ms, 1000);
2162 assert_eq!(events[0].delta, dec!(500.0));
2163 assert_eq!(events[1].event_ts_ms, 2000);
2164 assert_eq!(events[1].delta, dec!(-100.0));
2165 assert_eq!(events[2].event_ts_ms, 3000);
2166 assert_eq!(events[2].delta, dec!(200.0));
2167 }
2168
2169 #[tokio::test]
2170 async fn get_deposit_withdraw_events_empty() {
2171 let test_db = TestDb::new().await.unwrap();
2172 let db = test_db.diesel_db().await;
2173
2174 let wallet = test_wallet(81);
2175 let events = db.get_deposit_withdraw_events(&wallet).await.unwrap();
2176 assert!(events.is_empty());
2177 }
2178
2179 #[tokio::test]
2180 async fn get_settled_pnl_by_symbol_groups_correctly() {
2181 let test_db = TestDb::new().await.unwrap();
2182 let db = test_db.diesel_db().await;
2183
2184 let mut conn = test_db.handler.pool().get().unwrap();
2185 let wallet = test_wallet(82);
2186 let w_hex = wallet_hex(&wallet);
2187
2188 exec_sql(
2190 &mut conn,
2191 &format!(
2192 "INSERT INTO ledger_events (wallet, event_ts_ms, delta, event_type, reference_symbol) VALUES \
2193 ({w_hex}, 1000, 100.0, 'fill_premium', 'BTC-20260131-100000-C'), \
2194 ({w_hex}, 1500, -30.0, 'fill_premium', 'BTC-20260131-100000-C'), \
2195 ({w_hex}, 2000, 50.0, 'settlement_realized_pnl', 'ETH-20260131-4000-P'), \
2196 ({w_hex}, 2500, 25.0, 'fill_realized_pnl', 'ETH-20260131-4000-P'), \
2197 ({w_hex}, 3000, 200.0, 'deposit', NULL)"
2198 ),
2199 );
2200
2201 let pnl = db.get_settled_pnl_by_symbol(&wallet, 3000).await.unwrap();
2203 assert_eq!(pnl.len(), 2);
2204
2205 let btc_pnl = pnl.iter().find(|(s, _)| s == "BTC-20260131-100000-C");
2206 assert_eq!(btc_pnl.unwrap().1, dec!(70.0)); let eth_pnl = pnl.iter().find(|(s, _)| s == "ETH-20260131-4000-P");
2209 assert_eq!(eth_pnl.unwrap().1, dec!(75.0)); }
2211
2212 #[tokio::test]
2213 async fn get_settled_pnl_by_symbol_respects_timestamp_cutoff() {
2214 let test_db = TestDb::new().await.unwrap();
2215 let db = test_db.diesel_db().await;
2216
2217 let mut conn = test_db.handler.pool().get().unwrap();
2218 let wallet = test_wallet(83);
2219 let w_hex = wallet_hex(&wallet);
2220
2221 exec_sql(
2222 &mut conn,
2223 &format!(
2224 "INSERT INTO ledger_events (wallet, event_ts_ms, delta, event_type, reference_symbol) VALUES \
2225 ({w_hex}, 1000, 100.0, 'fill_premium', 'BTC-20260131-100000-C'), \
2226 ({w_hex}, 5000, 200.0, 'fill_premium', 'BTC-20260131-100000-C')"
2227 ),
2228 );
2229
2230 let pnl = db.get_settled_pnl_by_symbol(&wallet, 2000).await.unwrap();
2232 assert_eq!(pnl.len(), 1);
2233 assert_eq!(pnl[0].1, dec!(100.0));
2234 }
2235
2236 #[tokio::test]
2237 async fn get_fills_since_timestamp_filters_correctly() {
2238 let test_db = TestDb::new().await.unwrap();
2239 let db = test_db.diesel_db().await;
2240
2241 let mut conn = test_db.handler.pool().get().unwrap();
2242 let w1 = wallet_hex(&test_wallet(1));
2243 let w2 = wallet_hex(&test_wallet(2));
2244
2245 exec_sql(
2247 &mut conn,
2248 &format!(
2249 "INSERT INTO trades (trade_id, symbol, price, size, maker_address, taker_address, maker_fee, taker_fee, timestamp) \
2250 VALUES \
2251 (40, 'BTC-20260131-100000-C', 5.0, 1000000, {w1}, {w2}, 0.01, 0.02, 1000), \
2252 (41, 'BTC-20260131-100000-C', 6.0, 2000000, {w1}, {w2}, 0.01, 0.02, 2000), \
2253 (42, 'ETH-20260131-4000-P', 1.0, 500000, {w2}, {w1}, 0.005, 0.01, 3000)"
2254 ),
2255 );
2256
2257 exec_sql(
2260 &mut conn,
2261 &format!(
2262 "INSERT INTO fills (trade_id, wallet_address, symbol, price, size, fee, is_taker, timestamp, underlying_notional) \
2263 VALUES \
2264 (40, {w1}, 'BTC-20260131-100000-C', 5.0, 1000000, 0.01, false, 1000, 100.0), \
2265 (41, {w1}, 'BTC-20260131-100000-C', 6.0, 2000000, 0.01, false, 2000, 200.0), \
2266 (42, {w2}, 'ETH-20260131-4000-P', 1.0, 500000, 0.02, true, 3000, 300.0)"
2267 ),
2268 );
2269
2270 let fills = db.get_fills_since_timestamp(2000).await.unwrap();
2272 assert_eq!(fills.len(), 1);
2273 assert_eq!(fills[0].timestamp, 3000);
2275 assert_eq!(fills[0].symbol, "ETH-20260131-4000-P");
2276 assert_eq!(fills[0].underlying_notional, Some(dec!(300.0)));
2277
2278 let fills_empty = db.get_fills_since_timestamp(4000).await.unwrap();
2280 assert!(fills_empty.is_empty());
2281 }
2282}