1use anyhow::{Context, Result};
7use chrono::{DateTime, Utc};
8use diesel::prelude::*;
9use diesel::sql_types::*;
10use diesel_async::{AsyncConnection, RunQueryDsl};
11use hypercall_types::WalletAddress;
12use rust_decimal::Decimal;
13
14use crate::diesel_db::DieselDb;
15use hypercall_db::{
16 CompetitionFillInput, CompetitionFillRecord, CompetitionFinalStatsInput,
17 CompetitionFinalStatsRecord, CompetitionReader, CompetitionRecord, CompetitionUpsertInput,
18 CompetitionWriteError, CompetitionWriter, PlatformWalletMetrics, ProfileFillRecord,
19 SymbolPnlRecord, TheoMarkRecord, WalletLedgerStats, WalletUsernameRecord,
20};
21
22#[derive(Debug, QueryableByName)]
27struct CompetitionRow {
28 #[diesel(sql_type = BigInt)]
29 pub id: i64,
30 #[diesel(sql_type = Text)]
31 pub name: String,
32 #[diesel(sql_type = Nullable<Text>)]
33 pub description: Option<String>,
34 #[diesel(sql_type = Nullable<Text>)]
35 pub rules_url: Option<String>,
36 #[diesel(sql_type = Nullable<Text>)]
37 pub rules_content: Option<String>,
38 #[diesel(sql_type = Array<Text>)]
39 pub win_conditions: Vec<String>,
40 #[diesel(sql_type = Text)]
41 pub primary_win_condition: String,
42 #[diesel(sql_type = BigInt)]
43 pub start_ts_ms: i64,
44 #[diesel(sql_type = BigInt)]
45 pub end_ts_ms: i64,
46 #[diesel(sql_type = Timestamptz)]
47 pub created_at: DateTime<Utc>,
48 #[diesel(sql_type = Timestamptz)]
49 pub updated_at: DateTime<Utc>,
50}
51
52impl From<CompetitionRow> for CompetitionRecord {
53 fn from(r: CompetitionRow) -> Self {
54 Self {
55 id: r.id,
56 name: r.name,
57 description: r.description,
58 rules_url: r.rules_url,
59 rules_content: r.rules_content,
60 win_conditions: r.win_conditions,
61 primary_win_condition: r.primary_win_condition,
62 start_ts_ms: r.start_ts_ms,
63 end_ts_ms: r.end_ts_ms,
64 created_at: r.created_at,
65 updated_at: r.updated_at,
66 }
67 }
68}
69
70#[derive(Debug, QueryableByName)]
71struct CompetitionFillRow {
72 #[diesel(sql_type = Bytea)]
73 pub wallet: WalletAddress,
74 #[diesel(sql_type = Text)]
75 pub symbol: String,
76 #[diesel(sql_type = Text)]
77 pub side: String,
78 #[diesel(sql_type = Numeric)]
79 pub price: Decimal,
80 #[diesel(sql_type = Numeric)]
81 pub size: Decimal,
82 #[diesel(sql_type = Numeric)]
83 pub fee: Decimal,
84 #[diesel(sql_type = BigInt)]
85 pub timestamp_ms: i64,
86}
87
88impl From<CompetitionFillRow> for CompetitionFillRecord {
89 fn from(r: CompetitionFillRow) -> Self {
90 Self {
91 wallet: r.wallet,
92 symbol: r.symbol,
93 side: r.side,
94 price: r.price,
95 size: r.size,
96 fee: r.fee,
97 timestamp_ms: r.timestamp_ms,
98 }
99 }
100}
101
102#[derive(Debug, QueryableByName)]
103struct FinalizedStatsRow {
104 #[diesel(sql_type = Integer)]
105 pub rank: i32,
106 #[diesel(sql_type = Bytea)]
107 pub wallet: WalletAddress,
108 #[diesel(sql_type = Numeric)]
109 pub pnl: Decimal,
110 #[diesel(sql_type = Numeric)]
111 pub volume: Decimal,
112 #[diesel(sql_type = Numeric)]
113 pub efficiency: Decimal,
114 #[diesel(sql_type = Nullable<Integer>)]
115 pub medal: Option<i32>,
116}
117
118impl From<FinalizedStatsRow> for CompetitionFinalStatsRecord {
119 fn from(r: FinalizedStatsRow) -> Self {
120 Self {
121 wallet: r.wallet,
122 rank: r.rank,
123 pnl: r.pnl,
124 volume: r.volume,
125 efficiency: r.efficiency,
126 medal: r.medal,
127 }
128 }
129}
130
131#[derive(Debug, QueryableByName)]
132struct DecimalRow {
133 #[diesel(sql_type = Nullable<Numeric>)]
134 pub value: Option<Decimal>,
135}
136
137#[derive(Debug, QueryableByName)]
138struct TsRow {
139 #[diesel(sql_type = Nullable<BigInt>)]
140 pub value: Option<i64>,
141}
142
143#[derive(Debug, QueryableByName)]
144struct SymbolPnlRow {
145 #[diesel(sql_type = Text)]
146 pub reference_symbol: String,
147 #[diesel(sql_type = Numeric)]
148 pub total_pnl: Decimal,
149 #[diesel(sql_type = BigInt)]
150 pub event_count: i64,
151}
152
153#[derive(Debug, QueryableByName)]
154struct ProfileFillDbRow {
155 #[diesel(sql_type = Nullable<BigInt>)]
156 pub fill_id: Option<i64>,
157 #[diesel(sql_type = BigInt)]
158 pub trade_id: i64,
159 #[diesel(sql_type = Bytea)]
160 pub wallet_address: WalletAddress,
161 #[diesel(sql_type = Text)]
162 pub symbol: String,
163 #[diesel(sql_type = Numeric)]
164 pub price: Decimal,
165 #[diesel(sql_type = Numeric)]
166 pub size: Decimal,
167 #[diesel(sql_type = Numeric)]
168 pub fee: Decimal,
169 #[diesel(sql_type = Bool)]
170 pub is_taker: bool,
171 #[diesel(sql_type = BigInt)]
172 pub timestamp: i64,
173 #[diesel(sql_type = Nullable<Timestamp>)]
174 pub created_at: Option<chrono::NaiveDateTime>,
175 #[diesel(sql_type = Nullable<Bytea>)]
176 pub builder_code_address: Option<WalletAddress>,
177 #[diesel(sql_type = Nullable<Numeric>)]
178 pub builder_code_fee: Option<Decimal>,
179 #[diesel(sql_type = Nullable<Numeric>)]
180 pub realized_pnl: Option<Decimal>,
181}
182
183impl From<ProfileFillDbRow> for ProfileFillRecord {
184 fn from(r: ProfileFillDbRow) -> Self {
185 Self {
186 fill_id: r.fill_id,
187 trade_id: r.trade_id,
188 wallet_address: r.wallet_address,
189 symbol: r.symbol,
190 price: r.price,
191 size: r.size,
192 fee: r.fee,
193 is_taker: r.is_taker,
194 timestamp: r.timestamp,
195 created_at: r.created_at.map(|dt| dt.and_utc()),
196 builder_code_address: r.builder_code_address,
197 builder_code_fee: r.builder_code_fee,
198 realized_pnl: r.realized_pnl,
199 }
200 }
201}
202
203#[derive(Debug, QueryableByName)]
204struct RealizedRow {
205 #[diesel(sql_type = Bytea)]
206 pub wallet: WalletAddress,
207 #[diesel(sql_type = Numeric)]
208 pub realized: Decimal,
209}
210
211#[derive(Debug, QueryableByName)]
212struct VolumeRow {
213 #[diesel(sql_type = Bytea)]
214 pub wallet: WalletAddress,
215 #[diesel(sql_type = Numeric)]
216 pub volume: Decimal,
217}
218
219#[derive(Debug, QueryableByName)]
220struct MarkRow {
221 #[diesel(sql_type = Text)]
222 pub symbol: String,
223 #[diesel(sql_type = Numeric)]
224 pub theoretical_price: Decimal,
225}
226
227#[derive(Debug, QueryableByName)]
228struct UserRow {
229 #[diesel(sql_type = Text)]
230 pub wallet_address: String,
231 #[diesel(sql_type = Text)]
232 pub username: String,
233}
234
235#[derive(Debug, QueryableByName)]
236struct UsernameRow {
237 #[diesel(sql_type = Text)]
238 pub username: String,
239}
240
241#[derive(Debug, QueryableByName)]
242struct ProfileImageRow {
243 #[diesel(sql_type = Nullable<Text>)]
244 pub profile_image_url: Option<String>,
245}
246
247#[derive(QueryableByName)]
248struct ApprovedUsernameRow {
249 #[diesel(sql_type = Nullable<Text>)]
250 #[allow(dead_code)]
251 pub approved_username: Option<String>,
252}
253
254#[derive(QueryableByName)]
255struct ExistsRow {
256 #[diesel(sql_type = Integer)]
257 #[allow(dead_code)]
258 pub one: i32,
259}
260
261fn is_overlap_violation(err: &diesel::result::Error) -> bool {
266 match err {
267 diesel::result::Error::DatabaseError(
268 diesel::result::DatabaseErrorKind::ExclusionViolation,
269 _,
270 ) => true,
271 diesel::result::Error::DatabaseError(diesel::result::DatabaseErrorKind::Unknown, info) => {
272 info.message().contains("exclusion")
273 || info
274 .constraint_name()
275 .map(|c| c.contains("excl"))
276 .unwrap_or(false)
277 }
278 _ => false,
279 }
280}
281
282fn is_unique_violation(err: &diesel::result::Error) -> bool {
283 matches!(
284 err,
285 diesel::result::Error::DatabaseError(diesel::result::DatabaseErrorKind::UniqueViolation, _)
286 )
287}
288
289#[async_trait::async_trait]
294impl CompetitionReader for DieselDb {
295 async fn list_competitions(
296 &self,
297 state_filter: Option<&str>,
298 from_ts_ms: Option<i64>,
299 to_ts_ms: Option<i64>,
300 now_ts_ms: i64,
301 limit: i64,
302 offset: i64,
303 ) -> Result<Vec<CompetitionRecord>> {
304 let mut conn = self.get_conn().await?;
305 let state_owned: Option<String> = state_filter.map(|s| s.to_string());
306 let rows: Vec<CompetitionRow> = diesel::sql_query(
307 "SELECT id, name, description, rules_url, rules_content, win_conditions, primary_win_condition,
308 start_ts_ms, end_ts_ms, created_at, updated_at
309 FROM competitions
310 WHERE ($1::bigint IS NULL OR end_ts_ms >= $1)
311 AND ($2::bigint IS NULL OR start_ts_ms <= $2)
312 AND (
313 $3::text IS NULL
314 OR ($3 = 'pre' AND $4 < start_ts_ms)
315 OR ($3 = 'active' AND $4 >= start_ts_ms AND $4 < end_ts_ms)
316 OR ($3 = 'post' AND $4 >= end_ts_ms)
317 )
318 ORDER BY start_ts_ms DESC
319 LIMIT $5 OFFSET $6",
320 )
321 .bind::<Nullable<BigInt>, _>(from_ts_ms)
322 .bind::<Nullable<BigInt>, _>(to_ts_ms)
323 .bind::<Nullable<Text>, _>(&state_owned)
324 .bind::<BigInt, _>(now_ts_ms)
325 .bind::<BigInt, _>(limit)
326 .bind::<BigInt, _>(offset)
327 .get_results(&mut conn)
328 .await
329 .context("failed to list competitions")?;
330
331 Ok(rows.into_iter().map(Into::into).collect())
332 }
333
334 async fn get_competition_by_id(
335 &self,
336 competition_id: i64,
337 ) -> Result<Option<CompetitionRecord>> {
338 let mut conn = self.get_conn().await?;
339 let rows: Vec<CompetitionRow> = diesel::sql_query(
340 "SELECT id, name, description, rules_url, rules_content, win_conditions, primary_win_condition,
341 start_ts_ms, end_ts_ms, created_at, updated_at
342 FROM competitions
343 WHERE id = $1",
344 )
345 .bind::<BigInt, _>(competition_id)
346 .get_results(&mut conn)
347 .await
348 .context("failed to fetch competition")?;
349
350 Ok(rows.into_iter().next().map(Into::into))
351 }
352
353 async fn get_active_competition(&self, now_ts_ms: i64) -> Result<Option<CompetitionRecord>> {
354 let mut conn = self.get_conn().await?;
355 let rows: Vec<CompetitionRow> = diesel::sql_query(
356 "SELECT id, name, description, rules_url, rules_content, win_conditions, primary_win_condition,
357 start_ts_ms, end_ts_ms, created_at, updated_at
358 FROM competitions
359 WHERE start_ts_ms <= $1 AND end_ts_ms > $1
360 ORDER BY start_ts_ms DESC
361 LIMIT 1",
362 )
363 .bind::<BigInt, _>(now_ts_ms)
364 .get_results(&mut conn)
365 .await
366 .context("failed to query active competition")?;
367
368 Ok(rows.into_iter().next().map(Into::into))
369 }
370
371 async fn get_latest_completed_competition(
372 &self,
373 now_ts_ms: i64,
374 ) -> Result<Option<CompetitionRecord>> {
375 let mut conn = self.get_conn().await?;
376 let rows: Vec<CompetitionRow> = diesel::sql_query(
377 "SELECT id, name, description, rules_url, rules_content, win_conditions, primary_win_condition,
378 start_ts_ms, end_ts_ms, created_at, updated_at
379 FROM competitions
380 WHERE end_ts_ms <= $1
381 ORDER BY end_ts_ms DESC
382 LIMIT 1",
383 )
384 .bind::<BigInt, _>(now_ts_ms)
385 .get_results(&mut conn)
386 .await
387 .context("failed to query latest completed competition")?;
388
389 Ok(rows.into_iter().next().map(Into::into))
390 }
391
392 async fn get_competitions_to_finalize(&self, now_ts_ms: i64) -> Result<Vec<CompetitionRecord>> {
393 let mut conn = self.get_conn().await?;
394 let rows: Vec<CompetitionRow> = diesel::sql_query(
395 "SELECT c.id, c.name, c.description, c.rules_url, c.rules_content, c.win_conditions,
396 c.primary_win_condition, c.start_ts_ms, c.end_ts_ms, c.created_at, c.updated_at
397 FROM competitions c
398 WHERE c.end_ts_ms <= $1
399 AND NOT EXISTS (
400 SELECT 1
401 FROM competition_final_stats fs
402 WHERE fs.competition_id = c.id
403 )
404 ORDER BY c.end_ts_ms ASC",
405 )
406 .bind::<BigInt, _>(now_ts_ms)
407 .get_results(&mut conn)
408 .await
409 .context("failed to load finalization candidates")?;
410
411 Ok(rows.into_iter().map(Into::into).collect())
412 }
413
414 async fn get_finalized_stats(
415 &self,
416 competition_id: i64,
417 ) -> Result<Vec<CompetitionFinalStatsRecord>> {
418 let mut conn = self.get_conn().await?;
419 let rows: Vec<FinalizedStatsRow> = diesel::sql_query(
420 "SELECT cfs.rank, cfs.wallet, cfs.pnl, cfs.volume, cfs.efficiency, cfs.medal
421 FROM competition_final_stats cfs
422 LEFT JOIN user_tiers ut ON cfs.wallet = ut.wallet_address
423 WHERE cfs.competition_id = $1
424 AND (ut.wallet_address IS NULL OR ut.tier != 'market_maker')
425 ORDER BY cfs.rank ASC",
426 )
427 .bind::<BigInt, _>(competition_id)
428 .get_results(&mut conn)
429 .await
430 .context("failed to query finalized leaderboard")?;
431
432 Ok(rows.into_iter().map(Into::into).collect())
433 }
434
435 async fn get_competition_fills_before(
436 &self,
437 cutoff_ts_ms: i64,
438 ) -> Result<Vec<CompetitionFillRecord>> {
439 let mut conn = self.get_conn().await?;
440 let rows: Vec<CompetitionFillRow> = diesel::sql_query(
441 "SELECT cfe.wallet, cfe.symbol, cfe.side, cfe.price, cfe.size, cfe.fee, cfe.timestamp_ms
442 FROM competition_fill_events cfe
443 LEFT JOIN user_tiers ut ON cfe.wallet = ut.wallet_address
444 WHERE cfe.timestamp_ms < $1
445 AND (ut.wallet_address IS NULL OR ut.tier != 'market_maker')
446 ORDER BY cfe.timestamp_ms ASC, cfe.id ASC",
447 )
448 .bind::<BigInt, _>(cutoff_ts_ms)
449 .get_results(&mut conn)
450 .await
451 .context("failed to query competition fill events")?;
452
453 Ok(rows.into_iter().map(Into::into).collect())
454 }
455
456 async fn get_historical_theo_marks(
457 &self,
458 symbols: &[String],
459 cutoff_ts_ms: i64,
460 ) -> Result<Vec<TheoMarkRecord>> {
461 if symbols.is_empty() {
462 return Ok(Vec::new());
463 }
464
465 let mut conn = self.get_conn().await?;
466 let rows: Vec<MarkRow> = diesel::sql_query(
467 "SELECT DISTINCT ON (symbol) symbol, theoretical_price
468 FROM historical_theo_snapshots
469 WHERE symbol = ANY($1)
470 AND timestamp_ms <= $2
471 ORDER BY symbol, timestamp_ms DESC, interval_ms ASC",
472 )
473 .bind::<Array<Text>, _>(symbols)
474 .bind::<BigInt, _>(cutoff_ts_ms)
475 .get_results(&mut conn)
476 .await
477 .context("failed to load historical theoretical marks")?;
478
479 Ok(rows
480 .into_iter()
481 .map(|r| TheoMarkRecord {
482 symbol: r.symbol,
483 theoretical_price: r.theoretical_price,
484 })
485 .collect())
486 }
487
488 async fn get_display_usernames_batch(
489 &self,
490 wallet_strings: &[String],
491 ) -> Result<Vec<WalletUsernameRecord>> {
492 if wallet_strings.is_empty() {
493 return Ok(Vec::new());
494 }
495
496 let mut conn = self.get_conn().await?;
497 let rows: Vec<UserRow> = diesel::sql_query(
498 "SELECT wallet_address, username FROM usernames WHERE LOWER(wallet_address) = ANY($1)",
499 )
500 .bind::<Array<Text>, _>(wallet_strings)
501 .get_results(&mut conn)
502 .await
503 .context("failed to load display usernames")?;
504
505 Ok(rows
506 .into_iter()
507 .map(|r| WalletUsernameRecord {
508 wallet_address: r.wallet_address,
509 username: r.username,
510 })
511 .collect())
512 }
513
514 async fn get_display_username(&self, wallet: &WalletAddress) -> Result<Option<String>> {
515 let mut conn = self.get_conn().await?;
516 let rows: Vec<UsernameRow> = diesel::sql_query(
517 "SELECT username
518 FROM usernames
519 WHERE LOWER(wallet_address) = LOWER($1)",
520 )
521 .bind::<Text, _>(format!("{:#x}", wallet.0))
522 .get_results(&mut conn)
523 .await
524 .context("failed to load display username")?;
525
526 Ok(rows.into_iter().next().map(|r| r.username))
527 }
528
529 async fn get_profile_image_url(&self, wallet: &WalletAddress) -> Result<Option<String>> {
530 let mut conn = self.get_conn().await?;
531 let rows: Vec<ProfileImageRow> = diesel::sql_query(
532 "SELECT profile_image_url
533 FROM user_profiles
534 WHERE wallet_address = $1",
535 )
536 .bind::<Bytea, _>(*wallet)
537 .get_results(&mut conn)
538 .await
539 .context("failed to load profile image URL")?;
540
541 Ok(rows.into_iter().next().and_then(|r| r.profile_image_url))
542 }
543
544 async fn compute_ledger_profile_stats(
545 &self,
546 wallet: &WalletAddress,
547 now_ts_ms: i64,
548 ) -> Result<WalletLedgerStats> {
549 let day_ago = now_ts_ms - 24 * 60 * 60 * 1000;
550 let mut conn = self.get_conn().await?;
551
552 let deposits_row: Vec<DecimalRow> = diesel::sql_query(
553 "SELECT COALESCE(SUM(delta), 0) AS value
554 FROM ledger_events
555 WHERE wallet = $1
556 AND event_type = 'deposit'",
557 )
558 .bind::<Bytea, _>(*wallet)
559 .get_results(&mut conn)
560 .await
561 .context("failed to compute deposits")?;
562
563 let withdrawals_row: Vec<DecimalRow> = diesel::sql_query(
564 "SELECT COALESCE(SUM(ABS(delta)), 0) AS value
565 FROM ledger_events
566 WHERE wallet = $1
567 AND event_type = 'withdraw'",
568 )
569 .bind::<Bytea, _>(*wallet)
570 .get_results(&mut conn)
571 .await
572 .context("failed to compute withdrawals")?;
573
574 let lifetime_realized_row: Vec<DecimalRow> = diesel::sql_query(
575 "SELECT COALESCE(SUM(delta), 0) AS value
576 FROM ledger_events
577 WHERE wallet = $1
578 AND event_type IN ('fill_realized_pnl', 'settlement_realized_pnl')",
579 )
580 .bind::<Bytea, _>(*wallet)
581 .get_results(&mut conn)
582 .await
583 .context("failed to compute lifetime realized pnl")?;
584
585 let pnl_24h_row: Vec<DecimalRow> = diesel::sql_query(
586 "SELECT COALESCE(SUM(delta), 0) AS value
587 FROM ledger_events
588 WHERE wallet = $1
589 AND event_type IN ('fill_realized_pnl', 'settlement_realized_pnl')
590 AND event_ts_ms >= $2",
591 )
592 .bind::<Bytea, _>(*wallet)
593 .bind::<BigInt, _>(day_ago)
594 .get_results(&mut conn)
595 .await
596 .context("failed to compute 24h realized pnl")?;
597
598 let zero = Decimal::ZERO;
599 Ok(WalletLedgerStats {
600 deposits: deposits_row
601 .into_iter()
602 .next()
603 .and_then(|r| r.value)
604 .unwrap_or(zero),
605 withdrawals: withdrawals_row
606 .into_iter()
607 .next()
608 .and_then(|r| r.value)
609 .unwrap_or(zero),
610 lifetime_realized_pnl: lifetime_realized_row
611 .into_iter()
612 .next()
613 .and_then(|r| r.value)
614 .unwrap_or(zero),
615 pnl_24h: pnl_24h_row
616 .into_iter()
617 .next()
618 .and_then(|r| r.value)
619 .unwrap_or(zero),
620 })
621 }
622
623 async fn get_account_first_seen_ts_ms(&self, wallet: &WalletAddress) -> Result<Option<i64>> {
624 let mut conn = self.get_conn().await?;
625 let rows: Vec<TsRow> = diesel::sql_query(
626 "SELECT MIN(event_ts_ms) AS value
627 FROM ledger_events
628 WHERE wallet = $1",
629 )
630 .bind::<Bytea, _>(*wallet)
631 .get_results(&mut conn)
632 .await
633 .context("failed to query account first seen timestamp")?;
634
635 Ok(rows.into_iter().next().and_then(|r| r.value))
636 }
637
638 async fn get_realized_pnl_by_symbol(
639 &self,
640 wallet: &WalletAddress,
641 window_start: Option<i64>,
642 window_end: Option<i64>,
643 ) -> Result<Vec<SymbolPnlRecord>> {
644 let mut conn = self.get_conn().await?;
645 let rows: Vec<SymbolPnlRow> = diesel::sql_query(
646 "SELECT reference_symbol, SUM(delta) AS total_pnl, COUNT(*) AS event_count
647 FROM ledger_events
648 WHERE wallet = $1
649 AND event_type IN ('fill_realized_pnl', 'settlement_realized_pnl')
650 AND reference_symbol IS NOT NULL
651 AND ($2::bigint IS NULL OR event_ts_ms >= $2)
652 AND ($3::bigint IS NULL OR event_ts_ms < $3)
653 GROUP BY reference_symbol
654 ORDER BY ABS(SUM(delta)) DESC",
655 )
656 .bind::<Bytea, _>(*wallet)
657 .bind::<Nullable<BigInt>, _>(window_start)
658 .bind::<Nullable<BigInt>, _>(window_end)
659 .get_results(&mut conn)
660 .await
661 .context("failed to query realized pnl by symbol")?;
662
663 Ok(rows
664 .into_iter()
665 .map(|r| SymbolPnlRecord {
666 symbol: r.reference_symbol,
667 realized_pnl: r.total_pnl,
668 event_count: r.event_count,
669 })
670 .collect())
671 }
672
673 async fn get_profile_trade_history(
674 &self,
675 wallet: &WalletAddress,
676 window_start: Option<i64>,
677 window_end: Option<i64>,
678 from_ts_ms: Option<i64>,
679 to_ts_ms: Option<i64>,
680 symbol: Option<&str>,
681 limit: i64,
682 offset: i64,
683 ) -> Result<Vec<ProfileFillRecord>> {
684 let mut conn = self.get_conn().await?;
685 let symbol_owned: Option<String> = symbol.map(|s| s.to_string());
686 let rows: Vec<ProfileFillDbRow> = diesel::sql_query(
687 "SELECT fill_id, trade_id, wallet_address, symbol, price, size, fee, is_taker,
688 timestamp, created_at, builder_code_address, builder_code_fee, realized_pnl
689 FROM fills
690 WHERE wallet_address = $1
691 AND ($2::bigint IS NULL OR timestamp >= $2)
692 AND ($3::bigint IS NULL OR timestamp < $3)
693 AND ($4::bigint IS NULL OR timestamp >= $4)
694 AND ($5::bigint IS NULL OR timestamp <= $5)
695 AND ($8::text IS NULL OR symbol = $8)
696 ORDER BY timestamp DESC
697 LIMIT $6 OFFSET $7",
698 )
699 .bind::<Bytea, _>(*wallet)
700 .bind::<Nullable<BigInt>, _>(window_start)
701 .bind::<Nullable<BigInt>, _>(window_end)
702 .bind::<Nullable<BigInt>, _>(from_ts_ms)
703 .bind::<Nullable<BigInt>, _>(to_ts_ms)
704 .bind::<BigInt, _>(limit)
705 .bind::<BigInt, _>(offset)
706 .bind::<Nullable<Text>, _>(&symbol_owned)
707 .get_results(&mut conn)
708 .await
709 .context("failed to query profile trade history")?;
710
711 Ok(rows.into_iter().map(Into::into).collect())
712 }
713
714 async fn get_platform_wallet_metrics(&self) -> Result<Vec<PlatformWalletMetrics>> {
715 let mut conn = self.get_conn().await?;
716
717 let realized_rows: Vec<RealizedRow> = diesel::sql_query(
719 "SELECT wallet, ROUND(COALESCE(SUM(delta), 0), 8) AS realized
720 FROM ledger_events
721 WHERE event_type IN ('fill_realized_pnl', 'settlement_realized_pnl')
722 GROUP BY wallet",
723 )
724 .get_results(&mut conn)
725 .await
726 .context("failed to load platform realized pnl rows")?;
727
728 let volume_rows: Vec<VolumeRow> = diesel::sql_query(
729 "SELECT wallet, ROUND(COALESCE(SUM(price * size), 0), 8) AS volume
730 FROM competition_fill_events
731 GROUP BY wallet",
732 )
733 .get_results(&mut conn)
734 .await
735 .context("failed to load platform volume rows")?;
736
737 let mut by_wallet: std::collections::HashMap<WalletAddress, PlatformWalletMetrics> =
738 std::collections::HashMap::new();
739 for row in realized_rows {
740 by_wallet.insert(
741 row.wallet,
742 PlatformWalletMetrics {
743 wallet: row.wallet,
744 realized: row.realized,
745 volume: Decimal::ZERO,
746 },
747 );
748 }
749 for row in volume_rows {
750 by_wallet
751 .entry(row.wallet)
752 .and_modify(|entry| entry.volume = row.volume)
753 .or_insert_with(|| PlatformWalletMetrics {
754 wallet: row.wallet,
755 realized: Decimal::ZERO,
756 volume: row.volume,
757 });
758 }
759
760 Ok(by_wallet.into_values().collect())
761 }
762}
763
764#[async_trait::async_trait]
769impl CompetitionWriter for DieselDb {
770 async fn create_competition(
771 &self,
772 input: &CompetitionUpsertInput,
773 ) -> std::result::Result<CompetitionRecord, CompetitionWriteError> {
774 let mut conn = self
775 .get_conn()
776 .await
777 .map_err(CompetitionWriteError::Internal)?;
778 let row: CompetitionRow = diesel::sql_query(
779 "INSERT INTO competitions
780 (name, description, rules_url, rules_content, win_conditions, primary_win_condition, start_ts_ms, end_ts_ms)
781 VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
782 RETURNING id, name, description, rules_url, rules_content, win_conditions, primary_win_condition,
783 start_ts_ms, end_ts_ms, created_at, updated_at",
784 )
785 .bind::<Text, _>(&input.name)
786 .bind::<Nullable<Text>, _>(&input.description)
787 .bind::<Nullable<Text>, _>(&input.rules_url)
788 .bind::<Nullable<Text>, _>(&input.rules_content)
789 .bind::<Array<Text>, _>(&input.win_conditions)
790 .bind::<Text, _>(&input.primary_win_condition)
791 .bind::<BigInt, _>(input.start_ts_ms)
792 .bind::<BigInt, _>(input.end_ts_ms)
793 .get_result(&mut conn)
794 .await
795 .map_err(|e| {
796 if is_overlap_violation(&e) {
797 CompetitionWriteError::OverlapViolation
798 } else {
799 CompetitionWriteError::Internal(anyhow::anyhow!(
800 "failed to create competition: {}",
801 e
802 ))
803 }
804 })?;
805
806 Ok(row.into())
807 }
808
809 async fn update_competition(
810 &self,
811 competition_id: i64,
812 name: &str,
813 description: Option<&str>,
814 rules_url: Option<&str>,
815 rules_content: Option<&str>,
816 win_conditions: &[String],
817 primary_win_condition: &str,
818 start_ts_ms: i64,
819 end_ts_ms: i64,
820 ) -> std::result::Result<CompetitionRecord, CompetitionWriteError> {
821 let mut conn = self
822 .get_conn()
823 .await
824 .map_err(CompetitionWriteError::Internal)?;
825 let desc_owned: Option<String> = description.map(|s| s.to_string());
826 let rules_url_owned: Option<String> = rules_url.map(|s| s.to_string());
827 let rules_content_owned: Option<String> = rules_content.map(|s| s.to_string());
828 let rows: Vec<CompetitionRow> = diesel::sql_query(
829 "UPDATE competitions
830 SET name = $1,
831 description = $2,
832 rules_url = $3,
833 rules_content = $4,
834 win_conditions = $5,
835 primary_win_condition = $6,
836 start_ts_ms = $7,
837 end_ts_ms = $8,
838 updated_at = NOW()
839 WHERE id = $9
840 RETURNING id, name, description, rules_url, rules_content, win_conditions, primary_win_condition,
841 start_ts_ms, end_ts_ms, created_at, updated_at",
842 )
843 .bind::<Text, _>(name)
844 .bind::<Nullable<Text>, _>(&desc_owned)
845 .bind::<Nullable<Text>, _>(&rules_url_owned)
846 .bind::<Nullable<Text>, _>(&rules_content_owned)
847 .bind::<Array<Text>, _>(win_conditions)
848 .bind::<Text, _>(primary_win_condition)
849 .bind::<BigInt, _>(start_ts_ms)
850 .bind::<BigInt, _>(end_ts_ms)
851 .bind::<BigInt, _>(competition_id)
852 .get_results(&mut conn)
853 .await
854 .map_err(|e| {
855 if is_overlap_violation(&e) {
856 CompetitionWriteError::OverlapViolation
857 } else {
858 CompetitionWriteError::Internal(anyhow::anyhow!(
859 "failed to update competition: {}",
860 e
861 ))
862 }
863 })?;
864
865 rows.into_iter().next().map(Into::into).ok_or_else(|| {
866 CompetitionWriteError::NotFound(format!("competition {}", competition_id))
867 })
868 }
869
870 async fn delete_competition(&self, competition_id: i64) -> Result<usize> {
871 let mut conn = self.get_conn().await?;
872 let result = diesel::sql_query("DELETE FROM competitions WHERE id = $1")
873 .bind::<BigInt, _>(competition_id)
874 .execute(&mut conn)
875 .await
876 .context("failed to delete competition")?;
877
878 Ok(result)
879 }
880
881 async fn record_competition_fill(&self, input: &CompetitionFillInput) -> Result<()> {
882 let mut conn = self.get_conn().await?;
883 diesel::sql_query(
884 "INSERT INTO competition_fill_events
885 (trade_id, wallet, symbol, side, price, size, fee, timestamp_ms)
886 VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
887 ON CONFLICT (trade_id, wallet) DO NOTHING",
888 )
889 .bind::<BigInt, _>(input.trade_id)
890 .bind::<Bytea, _>(input.wallet)
891 .bind::<Text, _>(&input.symbol)
892 .bind::<Text, _>(&input.side)
893 .bind::<Numeric, _>(input.price)
894 .bind::<Numeric, _>(input.size)
895 .bind::<Numeric, _>(input.fee)
896 .bind::<BigInt, _>(input.timestamp_ms)
897 .execute(&mut conn)
898 .await
899 .context("failed to insert competition fill event")?;
900
901 Ok(())
902 }
903
904 async fn finalize_competition(
905 &self,
906 competition_id: i64,
907 stats: &[CompetitionFinalStatsInput],
908 ) -> Result<usize> {
909 let mut conn = self.get_conn().await?;
910 let stats_owned: Vec<CompetitionFinalStatsInput> = stats.to_vec();
911
912 let inserted_count: usize = conn
913 .transaction(async |conn| {
914 diesel::sql_query("SELECT pg_advisory_xact_lock($1)")
915 .bind::<BigInt, _>(competition_id)
916 .execute(&mut *conn)
917 .await?;
918
919 let already_finalized: Vec<ExistsRow> = diesel::sql_query(
920 "SELECT 1 AS one FROM competition_final_stats WHERE competition_id = $1 LIMIT 1",
921 )
922 .bind::<BigInt, _>(competition_id)
923 .get_results(&mut *conn)
924 .await?;
925
926 if !already_finalized.is_empty() {
927 return Ok(0);
928 }
929
930 let mut count = 0_usize;
931 for row in &stats_owned {
932 let result = diesel::sql_query(
933 "INSERT INTO competition_final_stats
934 (competition_id, wallet, rank, pnl, volume, efficiency, medal, finalized_at)
935 VALUES ($1, $2, $3, $4, $5, $6, $7, NOW())
936 ON CONFLICT (competition_id, wallet) DO NOTHING",
937 )
938 .bind::<BigInt, _>(competition_id)
939 .bind::<Bytea, _>(row.wallet)
940 .bind::<Integer, _>(row.rank)
941 .bind::<Numeric, _>(row.pnl)
942 .bind::<Numeric, _>(row.volume)
943 .bind::<Numeric, _>(row.efficiency)
944 .bind::<Nullable<Integer>, _>(row.medal)
945 .execute(&mut *conn)
946 .await?;
947
948 count += result;
949 }
950
951 Ok::<_, diesel::result::Error>(count)
952 })
953 .await
954 .context("failed to commit finalization tx")?;
955
956 Ok(inserted_count)
957 }
958
959 async fn set_profile_image_url(
960 &self,
961 wallet: &WalletAddress,
962 profile_image_url: &str,
963 ) -> Result<Option<String>> {
964 let mut conn = self.get_conn().await?;
965 let previous_rows: Vec<ProfileImageRow> = diesel::sql_query(
966 "WITH previous AS (
967 SELECT profile_image_url
968 FROM user_profiles
969 WHERE wallet_address = $1
970 ),
971 upsert AS (
972 INSERT INTO user_profiles (
973 wallet_address,
974 profile_image_url,
975 profile_image_updated_at
976 )
977 VALUES ($1, $2, NOW())
978 ON CONFLICT (wallet_address)
979 DO UPDATE SET
980 profile_image_url = EXCLUDED.profile_image_url,
981 profile_image_updated_at = EXCLUDED.profile_image_updated_at,
982 updated_at = NOW()
983 RETURNING 1
984 )
985 SELECT profile_image_url FROM previous",
986 )
987 .bind::<Bytea, _>(*wallet)
988 .bind::<Text, _>(profile_image_url)
989 .get_results(&mut conn)
990 .await
991 .context("failed to set profile image URL")?;
992
993 Ok(previous_rows
994 .into_iter()
995 .next()
996 .and_then(|r| r.profile_image_url))
997 }
998}
999
1000#[cfg(test)]
1001mod tests {
1002 use crate::test_helpers::TestDb;
1003 use hypercall_db::*;
1004 use hypercall_types::wallet_address::test_wallet;
1005 use rust_decimal_macros::dec;
1006
1007 fn make_competition_input(name: &str, start: i64, end: i64) -> CompetitionUpsertInput {
1008 CompetitionUpsertInput {
1009 name: name.to_string(),
1010 description: Some("Test competition".to_string()),
1011 rules_url: None,
1012 rules_content: None,
1013 win_conditions: vec!["pnl".to_string(), "volume".to_string()],
1014 primary_win_condition: "pnl".to_string(),
1015 start_ts_ms: start,
1016 end_ts_ms: end,
1017 }
1018 }
1019
1020 #[tokio::test]
1021 async fn create_and_list_competitions() {
1022 let test_db = TestDb::new().await.unwrap();
1023 let db = test_db.diesel_db().await;
1024
1025 let input = make_competition_input("Season 1", 1_000_000, 2_000_000);
1026 let created = db.create_competition(&input).await.unwrap();
1027
1028 assert_eq!(created.name, "Season 1");
1029 assert_eq!(created.start_ts_ms, 1_000_000);
1030 assert_eq!(created.end_ts_ms, 2_000_000);
1031 assert_eq!(created.primary_win_condition, "pnl");
1032 assert_eq!(created.win_conditions, vec!["pnl", "volume"]);
1033 assert_eq!(created.description, Some("Test competition".to_string()));
1034
1035 let all = db
1037 .list_competitions(None, None, None, 1_500_000, 100, 0)
1038 .await
1039 .unwrap();
1040 assert_eq!(all.len(), 1);
1041 assert_eq!(all[0].id, created.id);
1042 }
1043
1044 #[tokio::test]
1045 async fn get_competition_by_id_after_create() {
1046 let test_db = TestDb::new().await.unwrap();
1047 let db = test_db.diesel_db().await;
1048
1049 let input = make_competition_input("Season 2", 3_000_000, 4_000_000);
1050 let created = db.create_competition(&input).await.unwrap();
1051
1052 let fetched = db.get_competition_by_id(created.id).await.unwrap().unwrap();
1053 assert_eq!(fetched.name, "Season 2");
1054 assert_eq!(fetched.id, created.id);
1055
1056 let missing = db.get_competition_by_id(99999).await.unwrap();
1058 assert!(missing.is_none());
1059 }
1060
1061 #[tokio::test]
1062 async fn delete_competition_removes_it() {
1063 let test_db = TestDb::new().await.unwrap();
1064 let db = test_db.diesel_db().await;
1065
1066 let input = make_competition_input("Ephemeral", 5_000_000, 6_000_000);
1067 let created = db.create_competition(&input).await.unwrap();
1068
1069 let deleted = db.delete_competition(created.id).await.unwrap();
1070 assert_eq!(deleted, 1);
1071
1072 let fetched = db.get_competition_by_id(created.id).await.unwrap();
1073 assert!(fetched.is_none());
1074
1075 let deleted_again = db.delete_competition(created.id).await.unwrap();
1077 assert_eq!(deleted_again, 0);
1078 }
1079
1080 #[tokio::test]
1081 async fn finalize_competition_inserts_stats() {
1082 let test_db = TestDb::new().await.unwrap();
1083 let db = test_db.diesel_db().await;
1084
1085 let input = make_competition_input("Finals", 1_000, 2_000);
1086 let created = db.create_competition(&input).await.unwrap();
1087
1088 let stats = vec![
1089 CompetitionFinalStatsInput {
1090 competition_id: created.id,
1091 wallet: test_wallet(1),
1092 rank: 1,
1093 pnl: dec!(5000),
1094 volume: dec!(100000),
1095 efficiency: dec!(0.05),
1096 medal: Some(1),
1097 },
1098 CompetitionFinalStatsInput {
1099 competition_id: created.id,
1100 wallet: test_wallet(2),
1101 rank: 2,
1102 pnl: dec!(3000),
1103 volume: dec!(80000),
1104 efficiency: dec!(0.0375),
1105 medal: Some(2),
1106 },
1107 ];
1108
1109 let inserted = db.finalize_competition(created.id, &stats).await.unwrap();
1110 assert_eq!(inserted, 2);
1111
1112 let finalized = db.get_finalized_stats(created.id).await.unwrap();
1114 assert_eq!(finalized.len(), 2);
1115 assert_eq!(finalized[0].rank, 1);
1116 assert_eq!(finalized[0].wallet, test_wallet(1));
1117 assert_eq!(finalized[0].pnl, dec!(5000));
1118 assert_eq!(finalized[1].rank, 2);
1119 assert_eq!(finalized[1].wallet, test_wallet(2));
1120
1121 let inserted_again = db.finalize_competition(created.id, &stats).await.unwrap();
1123 assert_eq!(inserted_again, 0);
1124 }
1125
1126 #[tokio::test]
1127 async fn get_active_competition_returns_current() {
1128 let test_db = TestDb::new().await.unwrap();
1129 let db = test_db.diesel_db().await;
1130
1131 let input = make_competition_input("Active One", 1000, 2000);
1133 let created = db.create_competition(&input).await.unwrap();
1134
1135 let active = db.get_active_competition(1500).await.unwrap();
1136 assert!(active.is_some());
1137 assert_eq!(active.unwrap().id, created.id);
1138
1139 let before = db.get_active_competition(500).await.unwrap();
1141 assert!(before.is_none());
1142
1143 let after = db.get_active_competition(2500).await.unwrap();
1145 assert!(after.is_none());
1146 }
1147
1148 #[tokio::test]
1149 async fn get_competitions_to_finalize_finds_unfinalized() {
1150 let test_db = TestDb::new().await.unwrap();
1151 let db = test_db.diesel_db().await;
1152
1153 let input = make_competition_input("Ended", 1000, 2000);
1155 let created = db.create_competition(&input).await.unwrap();
1156
1157 let to_finalize = db.get_competitions_to_finalize(3000).await.unwrap();
1159 assert_eq!(to_finalize.len(), 1);
1160 assert_eq!(to_finalize[0].id, created.id);
1161
1162 let stats = vec![CompetitionFinalStatsInput {
1164 competition_id: created.id,
1165 wallet: test_wallet(1),
1166 rank: 1,
1167 pnl: dec!(1000),
1168 volume: dec!(50000),
1169 efficiency: dec!(0.02),
1170 medal: Some(1),
1171 }];
1172 db.finalize_competition(created.id, &stats).await.unwrap();
1173
1174 let to_finalize_after = db.get_competitions_to_finalize(3000).await.unwrap();
1175 assert!(to_finalize_after.is_empty());
1176 }
1177
1178 #[tokio::test]
1179 async fn record_competition_fill_is_idempotent() {
1180 let test_db = TestDb::new().await.unwrap();
1181 let db = test_db.diesel_db().await;
1182
1183 let fill_input = CompetitionFillInput {
1184 trade_id: 999,
1185 wallet: test_wallet(60),
1186 symbol: "BTC-20260131-100000-C".to_string(),
1187 side: "Buy".to_string(),
1188 price: dec!(5.0),
1189 size: dec!(1000000),
1190 fee: dec!(0.01),
1191 timestamp_ms: 1_500_000,
1192 };
1193
1194 db.record_competition_fill(&fill_input).await.unwrap();
1196
1197 db.record_competition_fill(&fill_input).await.unwrap();
1199
1200 let fills = db.get_competition_fills_before(2_000_000).await.unwrap();
1202 assert_eq!(fills.len(), 1);
1203 assert_eq!(fills[0].wallet, test_wallet(60));
1204 assert_eq!(fills[0].symbol, "BTC-20260131-100000-C");
1205 assert_eq!(fills[0].price, dec!(5.0));
1206 }
1207
1208 #[tokio::test]
1209 async fn set_profile_image_url_roundtrip() {
1210 let test_db = TestDb::new().await.unwrap();
1211 let db = test_db.diesel_db().await;
1212
1213 let wallet = test_wallet(70);
1214
1215 let prev = db
1217 .set_profile_image_url(&wallet, "https://example.com/img1.png")
1218 .await
1219 .unwrap();
1220 assert!(prev.is_none());
1221
1222 let prev2 = db
1224 .set_profile_image_url(&wallet, "https://example.com/img2.png")
1225 .await
1226 .unwrap();
1227 assert_eq!(prev2, Some("https://example.com/img1.png".to_string()));
1228
1229 let url = db.get_profile_image_url(&wallet).await.unwrap();
1231 assert_eq!(url, Some("https://example.com/img2.png".to_string()));
1232 }
1233}