Skip to main content

hypercall_db_diesel/
competition.rs

1//! CompetitionReader + CompetitionWriter trait implementations for DieselDb.
2//!
3//! Manages competition CRUD, leaderboard data, profile image management, and
4//! competition finalization.
5
6use anyhow::{Context, Result};
7use chrono::{DateTime, Utc};
8use diesel::prelude::*;
9use diesel::sql_types::*;
10use diesel_async::{AsyncConnection, RunQueryDsl};
11use hypercall_types::WalletAddress;
12use rust_decimal::Decimal;
13
14use crate::diesel_db::DieselDb;
15use hypercall_db::{
16    CompetitionFillInput, CompetitionFillRecord, CompetitionFinalStatsInput,
17    CompetitionFinalStatsRecord, CompetitionReader, CompetitionRecord, CompetitionUpsertInput,
18    CompetitionWriteError, CompetitionWriter, PlatformWalletMetrics, ProfileFillRecord,
19    SymbolPnlRecord, TheoMarkRecord, WalletLedgerStats, WalletUsernameRecord,
20};
21
22// =============================================================================
23// Internal Diesel QueryableByName helper structs
24// =============================================================================
25
26#[derive(Debug, QueryableByName)]
27struct CompetitionRow {
28    #[diesel(sql_type = BigInt)]
29    pub id: i64,
30    #[diesel(sql_type = Text)]
31    pub name: String,
32    #[diesel(sql_type = Nullable<Text>)]
33    pub description: Option<String>,
34    #[diesel(sql_type = Nullable<Text>)]
35    pub rules_url: Option<String>,
36    #[diesel(sql_type = Nullable<Text>)]
37    pub rules_content: Option<String>,
38    #[diesel(sql_type = Array<Text>)]
39    pub win_conditions: Vec<String>,
40    #[diesel(sql_type = Text)]
41    pub primary_win_condition: String,
42    #[diesel(sql_type = BigInt)]
43    pub start_ts_ms: i64,
44    #[diesel(sql_type = BigInt)]
45    pub end_ts_ms: i64,
46    #[diesel(sql_type = Timestamptz)]
47    pub created_at: DateTime<Utc>,
48    #[diesel(sql_type = Timestamptz)]
49    pub updated_at: DateTime<Utc>,
50}
51
52impl From<CompetitionRow> for CompetitionRecord {
53    fn from(r: CompetitionRow) -> Self {
54        Self {
55            id: r.id,
56            name: r.name,
57            description: r.description,
58            rules_url: r.rules_url,
59            rules_content: r.rules_content,
60            win_conditions: r.win_conditions,
61            primary_win_condition: r.primary_win_condition,
62            start_ts_ms: r.start_ts_ms,
63            end_ts_ms: r.end_ts_ms,
64            created_at: r.created_at,
65            updated_at: r.updated_at,
66        }
67    }
68}
69
70#[derive(Debug, QueryableByName)]
71struct CompetitionFillRow {
72    #[diesel(sql_type = Bytea)]
73    pub wallet: WalletAddress,
74    #[diesel(sql_type = Text)]
75    pub symbol: String,
76    #[diesel(sql_type = Text)]
77    pub side: String,
78    #[diesel(sql_type = Numeric)]
79    pub price: Decimal,
80    #[diesel(sql_type = Numeric)]
81    pub size: Decimal,
82    #[diesel(sql_type = Numeric)]
83    pub fee: Decimal,
84    #[diesel(sql_type = BigInt)]
85    pub timestamp_ms: i64,
86}
87
88impl From<CompetitionFillRow> for CompetitionFillRecord {
89    fn from(r: CompetitionFillRow) -> Self {
90        Self {
91            wallet: r.wallet,
92            symbol: r.symbol,
93            side: r.side,
94            price: r.price,
95            size: r.size,
96            fee: r.fee,
97            timestamp_ms: r.timestamp_ms,
98        }
99    }
100}
101
102#[derive(Debug, QueryableByName)]
103struct FinalizedStatsRow {
104    #[diesel(sql_type = Integer)]
105    pub rank: i32,
106    #[diesel(sql_type = Bytea)]
107    pub wallet: WalletAddress,
108    #[diesel(sql_type = Numeric)]
109    pub pnl: Decimal,
110    #[diesel(sql_type = Numeric)]
111    pub volume: Decimal,
112    #[diesel(sql_type = Numeric)]
113    pub efficiency: Decimal,
114    #[diesel(sql_type = Nullable<Integer>)]
115    pub medal: Option<i32>,
116}
117
118impl From<FinalizedStatsRow> for CompetitionFinalStatsRecord {
119    fn from(r: FinalizedStatsRow) -> Self {
120        Self {
121            wallet: r.wallet,
122            rank: r.rank,
123            pnl: r.pnl,
124            volume: r.volume,
125            efficiency: r.efficiency,
126            medal: r.medal,
127        }
128    }
129}
130
131#[derive(Debug, QueryableByName)]
132struct DecimalRow {
133    #[diesel(sql_type = Nullable<Numeric>)]
134    pub value: Option<Decimal>,
135}
136
137#[derive(Debug, QueryableByName)]
138struct TsRow {
139    #[diesel(sql_type = Nullable<BigInt>)]
140    pub value: Option<i64>,
141}
142
143#[derive(Debug, QueryableByName)]
144struct SymbolPnlRow {
145    #[diesel(sql_type = Text)]
146    pub reference_symbol: String,
147    #[diesel(sql_type = Numeric)]
148    pub total_pnl: Decimal,
149    #[diesel(sql_type = BigInt)]
150    pub event_count: i64,
151}
152
153#[derive(Debug, QueryableByName)]
154struct ProfileFillDbRow {
155    #[diesel(sql_type = Nullable<BigInt>)]
156    pub fill_id: Option<i64>,
157    #[diesel(sql_type = BigInt)]
158    pub trade_id: i64,
159    #[diesel(sql_type = Bytea)]
160    pub wallet_address: WalletAddress,
161    #[diesel(sql_type = Text)]
162    pub symbol: String,
163    #[diesel(sql_type = Numeric)]
164    pub price: Decimal,
165    #[diesel(sql_type = Numeric)]
166    pub size: Decimal,
167    #[diesel(sql_type = Numeric)]
168    pub fee: Decimal,
169    #[diesel(sql_type = Bool)]
170    pub is_taker: bool,
171    #[diesel(sql_type = BigInt)]
172    pub timestamp: i64,
173    #[diesel(sql_type = Nullable<Timestamp>)]
174    pub created_at: Option<chrono::NaiveDateTime>,
175    #[diesel(sql_type = Nullable<Bytea>)]
176    pub builder_code_address: Option<WalletAddress>,
177    #[diesel(sql_type = Nullable<Numeric>)]
178    pub builder_code_fee: Option<Decimal>,
179    #[diesel(sql_type = Nullable<Numeric>)]
180    pub realized_pnl: Option<Decimal>,
181}
182
183impl From<ProfileFillDbRow> for ProfileFillRecord {
184    fn from(r: ProfileFillDbRow) -> Self {
185        Self {
186            fill_id: r.fill_id,
187            trade_id: r.trade_id,
188            wallet_address: r.wallet_address,
189            symbol: r.symbol,
190            price: r.price,
191            size: r.size,
192            fee: r.fee,
193            is_taker: r.is_taker,
194            timestamp: r.timestamp,
195            created_at: r.created_at.map(|dt| dt.and_utc()),
196            builder_code_address: r.builder_code_address,
197            builder_code_fee: r.builder_code_fee,
198            realized_pnl: r.realized_pnl,
199        }
200    }
201}
202
203#[derive(Debug, QueryableByName)]
204struct RealizedRow {
205    #[diesel(sql_type = Bytea)]
206    pub wallet: WalletAddress,
207    #[diesel(sql_type = Numeric)]
208    pub realized: Decimal,
209}
210
211#[derive(Debug, QueryableByName)]
212struct VolumeRow {
213    #[diesel(sql_type = Bytea)]
214    pub wallet: WalletAddress,
215    #[diesel(sql_type = Numeric)]
216    pub volume: Decimal,
217}
218
219#[derive(Debug, QueryableByName)]
220struct MarkRow {
221    #[diesel(sql_type = Text)]
222    pub symbol: String,
223    #[diesel(sql_type = Numeric)]
224    pub theoretical_price: Decimal,
225}
226
227#[derive(Debug, QueryableByName)]
228struct UserRow {
229    #[diesel(sql_type = Text)]
230    pub wallet_address: String,
231    #[diesel(sql_type = Text)]
232    pub username: String,
233}
234
235#[derive(Debug, QueryableByName)]
236struct UsernameRow {
237    #[diesel(sql_type = Text)]
238    pub username: String,
239}
240
241#[derive(Debug, QueryableByName)]
242struct ProfileImageRow {
243    #[diesel(sql_type = Nullable<Text>)]
244    pub profile_image_url: Option<String>,
245}
246
247#[derive(QueryableByName)]
248struct ApprovedUsernameRow {
249    #[diesel(sql_type = Nullable<Text>)]
250    #[allow(dead_code)]
251    pub approved_username: Option<String>,
252}
253
254#[derive(QueryableByName)]
255struct ExistsRow {
256    #[diesel(sql_type = Integer)]
257    #[allow(dead_code)]
258    pub one: i32,
259}
260
261// =============================================================================
262// Helper functions
263// =============================================================================
264
265fn is_overlap_violation(err: &diesel::result::Error) -> bool {
266    match err {
267        diesel::result::Error::DatabaseError(
268            diesel::result::DatabaseErrorKind::ExclusionViolation,
269            _,
270        ) => true,
271        diesel::result::Error::DatabaseError(diesel::result::DatabaseErrorKind::Unknown, info) => {
272            info.message().contains("exclusion")
273                || info
274                    .constraint_name()
275                    .map(|c| c.contains("excl"))
276                    .unwrap_or(false)
277        }
278        _ => false,
279    }
280}
281
282fn is_unique_violation(err: &diesel::result::Error) -> bool {
283    matches!(
284        err,
285        diesel::result::Error::DatabaseError(diesel::result::DatabaseErrorKind::UniqueViolation, _)
286    )
287}
288
289// =============================================================================
290// CompetitionReader
291// =============================================================================
292
293#[async_trait::async_trait]
294impl CompetitionReader for DieselDb {
295    async fn list_competitions(
296        &self,
297        state_filter: Option<&str>,
298        from_ts_ms: Option<i64>,
299        to_ts_ms: Option<i64>,
300        now_ts_ms: i64,
301        limit: i64,
302        offset: i64,
303    ) -> Result<Vec<CompetitionRecord>> {
304        let mut conn = self.get_conn().await?;
305        let state_owned: Option<String> = state_filter.map(|s| s.to_string());
306        let rows: Vec<CompetitionRow> = diesel::sql_query(
307            "SELECT id, name, description, rules_url, rules_content, win_conditions, primary_win_condition,
308                    start_ts_ms, end_ts_ms, created_at, updated_at
309             FROM competitions
310             WHERE ($1::bigint IS NULL OR end_ts_ms >= $1)
311               AND ($2::bigint IS NULL OR start_ts_ms <= $2)
312               AND (
313                    $3::text IS NULL
314                    OR ($3 = 'pre' AND $4 < start_ts_ms)
315                    OR ($3 = 'active' AND $4 >= start_ts_ms AND $4 < end_ts_ms)
316                    OR ($3 = 'post' AND $4 >= end_ts_ms)
317               )
318             ORDER BY start_ts_ms DESC
319             LIMIT $5 OFFSET $6",
320        )
321        .bind::<Nullable<BigInt>, _>(from_ts_ms)
322        .bind::<Nullable<BigInt>, _>(to_ts_ms)
323        .bind::<Nullable<Text>, _>(&state_owned)
324        .bind::<BigInt, _>(now_ts_ms)
325        .bind::<BigInt, _>(limit)
326        .bind::<BigInt, _>(offset)
327        .get_results(&mut conn)
328        .await
329        .context("failed to list competitions")?;
330
331        Ok(rows.into_iter().map(Into::into).collect())
332    }
333
334    async fn get_competition_by_id(
335        &self,
336        competition_id: i64,
337    ) -> Result<Option<CompetitionRecord>> {
338        let mut conn = self.get_conn().await?;
339        let rows: Vec<CompetitionRow> = diesel::sql_query(
340            "SELECT id, name, description, rules_url, rules_content, win_conditions, primary_win_condition,
341                    start_ts_ms, end_ts_ms, created_at, updated_at
342             FROM competitions
343             WHERE id = $1",
344        )
345        .bind::<BigInt, _>(competition_id)
346        .get_results(&mut conn)
347        .await
348        .context("failed to fetch competition")?;
349
350        Ok(rows.into_iter().next().map(Into::into))
351    }
352
353    async fn get_active_competition(&self, now_ts_ms: i64) -> Result<Option<CompetitionRecord>> {
354        let mut conn = self.get_conn().await?;
355        let rows: Vec<CompetitionRow> = diesel::sql_query(
356            "SELECT id, name, description, rules_url, rules_content, win_conditions, primary_win_condition,
357                    start_ts_ms, end_ts_ms, created_at, updated_at
358             FROM competitions
359             WHERE start_ts_ms <= $1 AND end_ts_ms > $1
360             ORDER BY start_ts_ms DESC
361             LIMIT 1",
362        )
363        .bind::<BigInt, _>(now_ts_ms)
364        .get_results(&mut conn)
365        .await
366        .context("failed to query active competition")?;
367
368        Ok(rows.into_iter().next().map(Into::into))
369    }
370
371    async fn get_latest_completed_competition(
372        &self,
373        now_ts_ms: i64,
374    ) -> Result<Option<CompetitionRecord>> {
375        let mut conn = self.get_conn().await?;
376        let rows: Vec<CompetitionRow> = diesel::sql_query(
377            "SELECT id, name, description, rules_url, rules_content, win_conditions, primary_win_condition,
378                    start_ts_ms, end_ts_ms, created_at, updated_at
379             FROM competitions
380             WHERE end_ts_ms <= $1
381             ORDER BY end_ts_ms DESC
382             LIMIT 1",
383        )
384        .bind::<BigInt, _>(now_ts_ms)
385        .get_results(&mut conn)
386        .await
387        .context("failed to query latest completed competition")?;
388
389        Ok(rows.into_iter().next().map(Into::into))
390    }
391
392    async fn get_competitions_to_finalize(&self, now_ts_ms: i64) -> Result<Vec<CompetitionRecord>> {
393        let mut conn = self.get_conn().await?;
394        let rows: Vec<CompetitionRow> = diesel::sql_query(
395            "SELECT c.id, c.name, c.description, c.rules_url, c.rules_content, c.win_conditions,
396                    c.primary_win_condition, c.start_ts_ms, c.end_ts_ms, c.created_at, c.updated_at
397             FROM competitions c
398             WHERE c.end_ts_ms <= $1
399               AND NOT EXISTS (
400                   SELECT 1
401                   FROM competition_final_stats fs
402                   WHERE fs.competition_id = c.id
403               )
404             ORDER BY c.end_ts_ms ASC",
405        )
406        .bind::<BigInt, _>(now_ts_ms)
407        .get_results(&mut conn)
408        .await
409        .context("failed to load finalization candidates")?;
410
411        Ok(rows.into_iter().map(Into::into).collect())
412    }
413
414    async fn get_finalized_stats(
415        &self,
416        competition_id: i64,
417    ) -> Result<Vec<CompetitionFinalStatsRecord>> {
418        let mut conn = self.get_conn().await?;
419        let rows: Vec<FinalizedStatsRow> = diesel::sql_query(
420            "SELECT cfs.rank, cfs.wallet, cfs.pnl, cfs.volume, cfs.efficiency, cfs.medal
421             FROM competition_final_stats cfs
422             LEFT JOIN user_tiers ut ON cfs.wallet = ut.wallet_address
423             WHERE cfs.competition_id = $1
424               AND (ut.wallet_address IS NULL OR ut.tier != 'market_maker')
425             ORDER BY cfs.rank ASC",
426        )
427        .bind::<BigInt, _>(competition_id)
428        .get_results(&mut conn)
429        .await
430        .context("failed to query finalized leaderboard")?;
431
432        Ok(rows.into_iter().map(Into::into).collect())
433    }
434
435    async fn get_competition_fills_before(
436        &self,
437        cutoff_ts_ms: i64,
438    ) -> Result<Vec<CompetitionFillRecord>> {
439        let mut conn = self.get_conn().await?;
440        let rows: Vec<CompetitionFillRow> = diesel::sql_query(
441            "SELECT cfe.wallet, cfe.symbol, cfe.side, cfe.price, cfe.size, cfe.fee, cfe.timestamp_ms
442             FROM competition_fill_events cfe
443             LEFT JOIN user_tiers ut ON cfe.wallet = ut.wallet_address
444             WHERE cfe.timestamp_ms < $1
445               AND (ut.wallet_address IS NULL OR ut.tier != 'market_maker')
446             ORDER BY cfe.timestamp_ms ASC, cfe.id ASC",
447        )
448        .bind::<BigInt, _>(cutoff_ts_ms)
449        .get_results(&mut conn)
450        .await
451        .context("failed to query competition fill events")?;
452
453        Ok(rows.into_iter().map(Into::into).collect())
454    }
455
456    async fn get_historical_theo_marks(
457        &self,
458        symbols: &[String],
459        cutoff_ts_ms: i64,
460    ) -> Result<Vec<TheoMarkRecord>> {
461        if symbols.is_empty() {
462            return Ok(Vec::new());
463        }
464
465        let mut conn = self.get_conn().await?;
466        let rows: Vec<MarkRow> = diesel::sql_query(
467            "SELECT DISTINCT ON (symbol) symbol, theoretical_price
468             FROM historical_theo_snapshots
469             WHERE symbol = ANY($1)
470               AND timestamp_ms <= $2
471             ORDER BY symbol, timestamp_ms DESC, interval_ms ASC",
472        )
473        .bind::<Array<Text>, _>(symbols)
474        .bind::<BigInt, _>(cutoff_ts_ms)
475        .get_results(&mut conn)
476        .await
477        .context("failed to load historical theoretical marks")?;
478
479        Ok(rows
480            .into_iter()
481            .map(|r| TheoMarkRecord {
482                symbol: r.symbol,
483                theoretical_price: r.theoretical_price,
484            })
485            .collect())
486    }
487
488    async fn get_display_usernames_batch(
489        &self,
490        wallet_strings: &[String],
491    ) -> Result<Vec<WalletUsernameRecord>> {
492        if wallet_strings.is_empty() {
493            return Ok(Vec::new());
494        }
495
496        let mut conn = self.get_conn().await?;
497        let rows: Vec<UserRow> = diesel::sql_query(
498            "SELECT wallet_address, username FROM usernames WHERE LOWER(wallet_address) = ANY($1)",
499        )
500        .bind::<Array<Text>, _>(wallet_strings)
501        .get_results(&mut conn)
502        .await
503        .context("failed to load display usernames")?;
504
505        Ok(rows
506            .into_iter()
507            .map(|r| WalletUsernameRecord {
508                wallet_address: r.wallet_address,
509                username: r.username,
510            })
511            .collect())
512    }
513
514    async fn get_display_username(&self, wallet: &WalletAddress) -> Result<Option<String>> {
515        let mut conn = self.get_conn().await?;
516        let rows: Vec<UsernameRow> = diesel::sql_query(
517            "SELECT username
518             FROM usernames
519             WHERE LOWER(wallet_address) = LOWER($1)",
520        )
521        .bind::<Text, _>(format!("{:#x}", wallet.0))
522        .get_results(&mut conn)
523        .await
524        .context("failed to load display username")?;
525
526        Ok(rows.into_iter().next().map(|r| r.username))
527    }
528
529    async fn get_profile_image_url(&self, wallet: &WalletAddress) -> Result<Option<String>> {
530        let mut conn = self.get_conn().await?;
531        let rows: Vec<ProfileImageRow> = diesel::sql_query(
532            "SELECT profile_image_url
533             FROM user_profiles
534             WHERE wallet_address = $1",
535        )
536        .bind::<Bytea, _>(*wallet)
537        .get_results(&mut conn)
538        .await
539        .context("failed to load profile image URL")?;
540
541        Ok(rows.into_iter().next().and_then(|r| r.profile_image_url))
542    }
543
544    async fn compute_ledger_profile_stats(
545        &self,
546        wallet: &WalletAddress,
547        now_ts_ms: i64,
548    ) -> Result<WalletLedgerStats> {
549        let day_ago = now_ts_ms - 24 * 60 * 60 * 1000;
550        let mut conn = self.get_conn().await?;
551
552        let deposits_row: Vec<DecimalRow> = diesel::sql_query(
553            "SELECT COALESCE(SUM(delta), 0) AS value
554             FROM ledger_events
555             WHERE wallet = $1
556               AND event_type = 'deposit'",
557        )
558        .bind::<Bytea, _>(*wallet)
559        .get_results(&mut conn)
560        .await
561        .context("failed to compute deposits")?;
562
563        let withdrawals_row: Vec<DecimalRow> = diesel::sql_query(
564            "SELECT COALESCE(SUM(ABS(delta)), 0) AS value
565             FROM ledger_events
566             WHERE wallet = $1
567               AND event_type = 'withdraw'",
568        )
569        .bind::<Bytea, _>(*wallet)
570        .get_results(&mut conn)
571        .await
572        .context("failed to compute withdrawals")?;
573
574        let lifetime_realized_row: Vec<DecimalRow> = diesel::sql_query(
575            "SELECT COALESCE(SUM(delta), 0) AS value
576             FROM ledger_events
577             WHERE wallet = $1
578               AND event_type IN ('fill_realized_pnl', 'settlement_realized_pnl')",
579        )
580        .bind::<Bytea, _>(*wallet)
581        .get_results(&mut conn)
582        .await
583        .context("failed to compute lifetime realized pnl")?;
584
585        let pnl_24h_row: Vec<DecimalRow> = diesel::sql_query(
586            "SELECT COALESCE(SUM(delta), 0) AS value
587             FROM ledger_events
588             WHERE wallet = $1
589               AND event_type IN ('fill_realized_pnl', 'settlement_realized_pnl')
590               AND event_ts_ms >= $2",
591        )
592        .bind::<Bytea, _>(*wallet)
593        .bind::<BigInt, _>(day_ago)
594        .get_results(&mut conn)
595        .await
596        .context("failed to compute 24h realized pnl")?;
597
598        let zero = Decimal::ZERO;
599        Ok(WalletLedgerStats {
600            deposits: deposits_row
601                .into_iter()
602                .next()
603                .and_then(|r| r.value)
604                .unwrap_or(zero),
605            withdrawals: withdrawals_row
606                .into_iter()
607                .next()
608                .and_then(|r| r.value)
609                .unwrap_or(zero),
610            lifetime_realized_pnl: lifetime_realized_row
611                .into_iter()
612                .next()
613                .and_then(|r| r.value)
614                .unwrap_or(zero),
615            pnl_24h: pnl_24h_row
616                .into_iter()
617                .next()
618                .and_then(|r| r.value)
619                .unwrap_or(zero),
620        })
621    }
622
623    async fn get_account_first_seen_ts_ms(&self, wallet: &WalletAddress) -> Result<Option<i64>> {
624        let mut conn = self.get_conn().await?;
625        let rows: Vec<TsRow> = diesel::sql_query(
626            "SELECT MIN(event_ts_ms) AS value
627             FROM ledger_events
628             WHERE wallet = $1",
629        )
630        .bind::<Bytea, _>(*wallet)
631        .get_results(&mut conn)
632        .await
633        .context("failed to query account first seen timestamp")?;
634
635        Ok(rows.into_iter().next().and_then(|r| r.value))
636    }
637
638    async fn get_realized_pnl_by_symbol(
639        &self,
640        wallet: &WalletAddress,
641        window_start: Option<i64>,
642        window_end: Option<i64>,
643    ) -> Result<Vec<SymbolPnlRecord>> {
644        let mut conn = self.get_conn().await?;
645        let rows: Vec<SymbolPnlRow> = diesel::sql_query(
646            "SELECT reference_symbol, SUM(delta) AS total_pnl, COUNT(*) AS event_count
647             FROM ledger_events
648             WHERE wallet = $1
649               AND event_type IN ('fill_realized_pnl', 'settlement_realized_pnl')
650               AND reference_symbol IS NOT NULL
651               AND ($2::bigint IS NULL OR event_ts_ms >= $2)
652               AND ($3::bigint IS NULL OR event_ts_ms < $3)
653             GROUP BY reference_symbol
654             ORDER BY ABS(SUM(delta)) DESC",
655        )
656        .bind::<Bytea, _>(*wallet)
657        .bind::<Nullable<BigInt>, _>(window_start)
658        .bind::<Nullable<BigInt>, _>(window_end)
659        .get_results(&mut conn)
660        .await
661        .context("failed to query realized pnl by symbol")?;
662
663        Ok(rows
664            .into_iter()
665            .map(|r| SymbolPnlRecord {
666                symbol: r.reference_symbol,
667                realized_pnl: r.total_pnl,
668                event_count: r.event_count,
669            })
670            .collect())
671    }
672
673    async fn get_profile_trade_history(
674        &self,
675        wallet: &WalletAddress,
676        window_start: Option<i64>,
677        window_end: Option<i64>,
678        from_ts_ms: Option<i64>,
679        to_ts_ms: Option<i64>,
680        symbol: Option<&str>,
681        limit: i64,
682        offset: i64,
683    ) -> Result<Vec<ProfileFillRecord>> {
684        let mut conn = self.get_conn().await?;
685        let symbol_owned: Option<String> = symbol.map(|s| s.to_string());
686        let rows: Vec<ProfileFillDbRow> = diesel::sql_query(
687            "SELECT fill_id, trade_id, wallet_address, symbol, price, size, fee, is_taker,
688                    timestamp, created_at, builder_code_address, builder_code_fee, realized_pnl
689             FROM fills
690             WHERE wallet_address = $1
691               AND ($2::bigint IS NULL OR timestamp >= $2)
692               AND ($3::bigint IS NULL OR timestamp < $3)
693               AND ($4::bigint IS NULL OR timestamp >= $4)
694               AND ($5::bigint IS NULL OR timestamp <= $5)
695               AND ($8::text IS NULL OR symbol = $8)
696             ORDER BY timestamp DESC
697             LIMIT $6 OFFSET $7",
698        )
699        .bind::<Bytea, _>(*wallet)
700        .bind::<Nullable<BigInt>, _>(window_start)
701        .bind::<Nullable<BigInt>, _>(window_end)
702        .bind::<Nullable<BigInt>, _>(from_ts_ms)
703        .bind::<Nullable<BigInt>, _>(to_ts_ms)
704        .bind::<BigInt, _>(limit)
705        .bind::<BigInt, _>(offset)
706        .bind::<Nullable<Text>, _>(&symbol_owned)
707        .get_results(&mut conn)
708        .await
709        .context("failed to query profile trade history")?;
710
711        Ok(rows.into_iter().map(Into::into).collect())
712    }
713
714    async fn get_platform_wallet_metrics(&self) -> Result<Vec<PlatformWalletMetrics>> {
715        let mut conn = self.get_conn().await?;
716
717        // ROUND(..., 8) keeps aggregates inside rust_decimal::Decimal's ~29-digit range.
718        let realized_rows: Vec<RealizedRow> = diesel::sql_query(
719            "SELECT wallet, ROUND(COALESCE(SUM(delta), 0), 8) AS realized
720             FROM ledger_events
721             WHERE event_type IN ('fill_realized_pnl', 'settlement_realized_pnl')
722             GROUP BY wallet",
723        )
724        .get_results(&mut conn)
725        .await
726        .context("failed to load platform realized pnl rows")?;
727
728        let volume_rows: Vec<VolumeRow> = diesel::sql_query(
729            "SELECT wallet, ROUND(COALESCE(SUM(price * size), 0), 8) AS volume
730             FROM competition_fill_events
731             GROUP BY wallet",
732        )
733        .get_results(&mut conn)
734        .await
735        .context("failed to load platform volume rows")?;
736
737        let mut by_wallet: std::collections::HashMap<WalletAddress, PlatformWalletMetrics> =
738            std::collections::HashMap::new();
739        for row in realized_rows {
740            by_wallet.insert(
741                row.wallet,
742                PlatformWalletMetrics {
743                    wallet: row.wallet,
744                    realized: row.realized,
745                    volume: Decimal::ZERO,
746                },
747            );
748        }
749        for row in volume_rows {
750            by_wallet
751                .entry(row.wallet)
752                .and_modify(|entry| entry.volume = row.volume)
753                .or_insert_with(|| PlatformWalletMetrics {
754                    wallet: row.wallet,
755                    realized: Decimal::ZERO,
756                    volume: row.volume,
757                });
758        }
759
760        Ok(by_wallet.into_values().collect())
761    }
762}
763
764// =============================================================================
765// CompetitionWriter
766// =============================================================================
767
768#[async_trait::async_trait]
769impl CompetitionWriter for DieselDb {
770    async fn create_competition(
771        &self,
772        input: &CompetitionUpsertInput,
773    ) -> std::result::Result<CompetitionRecord, CompetitionWriteError> {
774        let mut conn = self
775            .get_conn()
776            .await
777            .map_err(CompetitionWriteError::Internal)?;
778        let row: CompetitionRow = diesel::sql_query(
779            "INSERT INTO competitions
780             (name, description, rules_url, rules_content, win_conditions, primary_win_condition, start_ts_ms, end_ts_ms)
781             VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
782             RETURNING id, name, description, rules_url, rules_content, win_conditions, primary_win_condition,
783                       start_ts_ms, end_ts_ms, created_at, updated_at",
784        )
785        .bind::<Text, _>(&input.name)
786        .bind::<Nullable<Text>, _>(&input.description)
787        .bind::<Nullable<Text>, _>(&input.rules_url)
788        .bind::<Nullable<Text>, _>(&input.rules_content)
789        .bind::<Array<Text>, _>(&input.win_conditions)
790        .bind::<Text, _>(&input.primary_win_condition)
791        .bind::<BigInt, _>(input.start_ts_ms)
792        .bind::<BigInt, _>(input.end_ts_ms)
793        .get_result(&mut conn)
794        .await
795        .map_err(|e| {
796            if is_overlap_violation(&e) {
797                CompetitionWriteError::OverlapViolation
798            } else {
799                CompetitionWriteError::Internal(anyhow::anyhow!(
800                    "failed to create competition: {}",
801                    e
802                ))
803            }
804        })?;
805
806        Ok(row.into())
807    }
808
809    async fn update_competition(
810        &self,
811        competition_id: i64,
812        name: &str,
813        description: Option<&str>,
814        rules_url: Option<&str>,
815        rules_content: Option<&str>,
816        win_conditions: &[String],
817        primary_win_condition: &str,
818        start_ts_ms: i64,
819        end_ts_ms: i64,
820    ) -> std::result::Result<CompetitionRecord, CompetitionWriteError> {
821        let mut conn = self
822            .get_conn()
823            .await
824            .map_err(CompetitionWriteError::Internal)?;
825        let desc_owned: Option<String> = description.map(|s| s.to_string());
826        let rules_url_owned: Option<String> = rules_url.map(|s| s.to_string());
827        let rules_content_owned: Option<String> = rules_content.map(|s| s.to_string());
828        let rows: Vec<CompetitionRow> = diesel::sql_query(
829            "UPDATE competitions
830             SET name = $1,
831                 description = $2,
832                 rules_url = $3,
833                 rules_content = $4,
834                 win_conditions = $5,
835                 primary_win_condition = $6,
836                 start_ts_ms = $7,
837                 end_ts_ms = $8,
838                 updated_at = NOW()
839             WHERE id = $9
840             RETURNING id, name, description, rules_url, rules_content, win_conditions, primary_win_condition,
841                       start_ts_ms, end_ts_ms, created_at, updated_at",
842        )
843        .bind::<Text, _>(name)
844        .bind::<Nullable<Text>, _>(&desc_owned)
845        .bind::<Nullable<Text>, _>(&rules_url_owned)
846        .bind::<Nullable<Text>, _>(&rules_content_owned)
847        .bind::<Array<Text>, _>(win_conditions)
848        .bind::<Text, _>(primary_win_condition)
849        .bind::<BigInt, _>(start_ts_ms)
850        .bind::<BigInt, _>(end_ts_ms)
851        .bind::<BigInt, _>(competition_id)
852        .get_results(&mut conn)
853        .await
854        .map_err(|e| {
855            if is_overlap_violation(&e) {
856                CompetitionWriteError::OverlapViolation
857            } else {
858                CompetitionWriteError::Internal(anyhow::anyhow!(
859                    "failed to update competition: {}",
860                    e
861                ))
862            }
863        })?;
864
865        rows.into_iter().next().map(Into::into).ok_or_else(|| {
866            CompetitionWriteError::NotFound(format!("competition {}", competition_id))
867        })
868    }
869
870    async fn delete_competition(&self, competition_id: i64) -> Result<usize> {
871        let mut conn = self.get_conn().await?;
872        let result = diesel::sql_query("DELETE FROM competitions WHERE id = $1")
873            .bind::<BigInt, _>(competition_id)
874            .execute(&mut conn)
875            .await
876            .context("failed to delete competition")?;
877
878        Ok(result)
879    }
880
881    async fn record_competition_fill(&self, input: &CompetitionFillInput) -> Result<()> {
882        let mut conn = self.get_conn().await?;
883        diesel::sql_query(
884            "INSERT INTO competition_fill_events
885             (trade_id, wallet, symbol, side, price, size, fee, timestamp_ms)
886             VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
887             ON CONFLICT (trade_id, wallet) DO NOTHING",
888        )
889        .bind::<BigInt, _>(input.trade_id)
890        .bind::<Bytea, _>(input.wallet)
891        .bind::<Text, _>(&input.symbol)
892        .bind::<Text, _>(&input.side)
893        .bind::<Numeric, _>(input.price)
894        .bind::<Numeric, _>(input.size)
895        .bind::<Numeric, _>(input.fee)
896        .bind::<BigInt, _>(input.timestamp_ms)
897        .execute(&mut conn)
898        .await
899        .context("failed to insert competition fill event")?;
900
901        Ok(())
902    }
903
904    async fn finalize_competition(
905        &self,
906        competition_id: i64,
907        stats: &[CompetitionFinalStatsInput],
908    ) -> Result<usize> {
909        let mut conn = self.get_conn().await?;
910        let stats_owned: Vec<CompetitionFinalStatsInput> = stats.to_vec();
911
912        let inserted_count: usize = conn
913            .transaction(async |conn| {
914                diesel::sql_query("SELECT pg_advisory_xact_lock($1)")
915                    .bind::<BigInt, _>(competition_id)
916                    .execute(&mut *conn)
917                    .await?;
918
919                let already_finalized: Vec<ExistsRow> = diesel::sql_query(
920                    "SELECT 1 AS one FROM competition_final_stats WHERE competition_id = $1 LIMIT 1",
921                )
922                .bind::<BigInt, _>(competition_id)
923                .get_results(&mut *conn)
924                .await?;
925
926                if !already_finalized.is_empty() {
927                    return Ok(0);
928                }
929
930                let mut count = 0_usize;
931                for row in &stats_owned {
932                    let result = diesel::sql_query(
933                        "INSERT INTO competition_final_stats
934                         (competition_id, wallet, rank, pnl, volume, efficiency, medal, finalized_at)
935                         VALUES ($1, $2, $3, $4, $5, $6, $7, NOW())
936                         ON CONFLICT (competition_id, wallet) DO NOTHING",
937                    )
938                    .bind::<BigInt, _>(competition_id)
939                    .bind::<Bytea, _>(row.wallet)
940                    .bind::<Integer, _>(row.rank)
941                    .bind::<Numeric, _>(row.pnl)
942                    .bind::<Numeric, _>(row.volume)
943                    .bind::<Numeric, _>(row.efficiency)
944                    .bind::<Nullable<Integer>, _>(row.medal)
945                    .execute(&mut *conn)
946                    .await?;
947
948                    count += result;
949                }
950
951                Ok::<_, diesel::result::Error>(count)
952            })
953            .await
954            .context("failed to commit finalization tx")?;
955
956        Ok(inserted_count)
957    }
958
959    async fn set_profile_image_url(
960        &self,
961        wallet: &WalletAddress,
962        profile_image_url: &str,
963    ) -> Result<Option<String>> {
964        let mut conn = self.get_conn().await?;
965        let previous_rows: Vec<ProfileImageRow> = diesel::sql_query(
966            "WITH previous AS (
967                 SELECT profile_image_url
968                 FROM user_profiles
969                 WHERE wallet_address = $1
970             ),
971             upsert AS (
972                 INSERT INTO user_profiles (
973                     wallet_address,
974                     profile_image_url,
975                     profile_image_updated_at
976                 )
977                 VALUES ($1, $2, NOW())
978                 ON CONFLICT (wallet_address)
979                 DO UPDATE SET
980                     profile_image_url = EXCLUDED.profile_image_url,
981                     profile_image_updated_at = EXCLUDED.profile_image_updated_at,
982                     updated_at = NOW()
983                 RETURNING 1
984             )
985             SELECT profile_image_url FROM previous",
986        )
987        .bind::<Bytea, _>(*wallet)
988        .bind::<Text, _>(profile_image_url)
989        .get_results(&mut conn)
990        .await
991        .context("failed to set profile image URL")?;
992
993        Ok(previous_rows
994            .into_iter()
995            .next()
996            .and_then(|r| r.profile_image_url))
997    }
998}
999
1000#[cfg(test)]
1001mod tests {
1002    use crate::test_helpers::TestDb;
1003    use hypercall_db::*;
1004    use hypercall_types::wallet_address::test_wallet;
1005    use rust_decimal_macros::dec;
1006
1007    fn make_competition_input(name: &str, start: i64, end: i64) -> CompetitionUpsertInput {
1008        CompetitionUpsertInput {
1009            name: name.to_string(),
1010            description: Some("Test competition".to_string()),
1011            rules_url: None,
1012            rules_content: None,
1013            win_conditions: vec!["pnl".to_string(), "volume".to_string()],
1014            primary_win_condition: "pnl".to_string(),
1015            start_ts_ms: start,
1016            end_ts_ms: end,
1017        }
1018    }
1019
1020    #[tokio::test]
1021    async fn create_and_list_competitions() {
1022        let test_db = TestDb::new().await.unwrap();
1023        let db = test_db.diesel_db().await;
1024
1025        let input = make_competition_input("Season 1", 1_000_000, 2_000_000);
1026        let created = db.create_competition(&input).await.unwrap();
1027
1028        assert_eq!(created.name, "Season 1");
1029        assert_eq!(created.start_ts_ms, 1_000_000);
1030        assert_eq!(created.end_ts_ms, 2_000_000);
1031        assert_eq!(created.primary_win_condition, "pnl");
1032        assert_eq!(created.win_conditions, vec!["pnl", "volume"]);
1033        assert_eq!(created.description, Some("Test competition".to_string()));
1034
1035        // List all competitions
1036        let all = db
1037            .list_competitions(None, None, None, 1_500_000, 100, 0)
1038            .await
1039            .unwrap();
1040        assert_eq!(all.len(), 1);
1041        assert_eq!(all[0].id, created.id);
1042    }
1043
1044    #[tokio::test]
1045    async fn get_competition_by_id_after_create() {
1046        let test_db = TestDb::new().await.unwrap();
1047        let db = test_db.diesel_db().await;
1048
1049        let input = make_competition_input("Season 2", 3_000_000, 4_000_000);
1050        let created = db.create_competition(&input).await.unwrap();
1051
1052        let fetched = db.get_competition_by_id(created.id).await.unwrap().unwrap();
1053        assert_eq!(fetched.name, "Season 2");
1054        assert_eq!(fetched.id, created.id);
1055
1056        // Non-existent ID
1057        let missing = db.get_competition_by_id(99999).await.unwrap();
1058        assert!(missing.is_none());
1059    }
1060
1061    #[tokio::test]
1062    async fn delete_competition_removes_it() {
1063        let test_db = TestDb::new().await.unwrap();
1064        let db = test_db.diesel_db().await;
1065
1066        let input = make_competition_input("Ephemeral", 5_000_000, 6_000_000);
1067        let created = db.create_competition(&input).await.unwrap();
1068
1069        let deleted = db.delete_competition(created.id).await.unwrap();
1070        assert_eq!(deleted, 1);
1071
1072        let fetched = db.get_competition_by_id(created.id).await.unwrap();
1073        assert!(fetched.is_none());
1074
1075        // Deleting non-existent returns 0
1076        let deleted_again = db.delete_competition(created.id).await.unwrap();
1077        assert_eq!(deleted_again, 0);
1078    }
1079
1080    #[tokio::test]
1081    async fn finalize_competition_inserts_stats() {
1082        let test_db = TestDb::new().await.unwrap();
1083        let db = test_db.diesel_db().await;
1084
1085        let input = make_competition_input("Finals", 1_000, 2_000);
1086        let created = db.create_competition(&input).await.unwrap();
1087
1088        let stats = vec![
1089            CompetitionFinalStatsInput {
1090                competition_id: created.id,
1091                wallet: test_wallet(1),
1092                rank: 1,
1093                pnl: dec!(5000),
1094                volume: dec!(100000),
1095                efficiency: dec!(0.05),
1096                medal: Some(1),
1097            },
1098            CompetitionFinalStatsInput {
1099                competition_id: created.id,
1100                wallet: test_wallet(2),
1101                rank: 2,
1102                pnl: dec!(3000),
1103                volume: dec!(80000),
1104                efficiency: dec!(0.0375),
1105                medal: Some(2),
1106            },
1107        ];
1108
1109        let inserted = db.finalize_competition(created.id, &stats).await.unwrap();
1110        assert_eq!(inserted, 2);
1111
1112        // Get finalized stats
1113        let finalized = db.get_finalized_stats(created.id).await.unwrap();
1114        assert_eq!(finalized.len(), 2);
1115        assert_eq!(finalized[0].rank, 1);
1116        assert_eq!(finalized[0].wallet, test_wallet(1));
1117        assert_eq!(finalized[0].pnl, dec!(5000));
1118        assert_eq!(finalized[1].rank, 2);
1119        assert_eq!(finalized[1].wallet, test_wallet(2));
1120
1121        // Second finalize is idempotent (returns 0)
1122        let inserted_again = db.finalize_competition(created.id, &stats).await.unwrap();
1123        assert_eq!(inserted_again, 0);
1124    }
1125
1126    #[tokio::test]
1127    async fn get_active_competition_returns_current() {
1128        let test_db = TestDb::new().await.unwrap();
1129        let db = test_db.diesel_db().await;
1130
1131        // Create a competition that spans now_ts_ms = 1500
1132        let input = make_competition_input("Active One", 1000, 2000);
1133        let created = db.create_competition(&input).await.unwrap();
1134
1135        let active = db.get_active_competition(1500).await.unwrap();
1136        assert!(active.is_some());
1137        assert_eq!(active.unwrap().id, created.id);
1138
1139        // Before start: no active competition
1140        let before = db.get_active_competition(500).await.unwrap();
1141        assert!(before.is_none());
1142
1143        // After end: no active competition
1144        let after = db.get_active_competition(2500).await.unwrap();
1145        assert!(after.is_none());
1146    }
1147
1148    #[tokio::test]
1149    async fn get_competitions_to_finalize_finds_unfinalized() {
1150        let test_db = TestDb::new().await.unwrap();
1151        let db = test_db.diesel_db().await;
1152
1153        // Create a competition that has ended
1154        let input = make_competition_input("Ended", 1000, 2000);
1155        let created = db.create_competition(&input).await.unwrap();
1156
1157        // Should appear in to-finalize list at now=3000
1158        let to_finalize = db.get_competitions_to_finalize(3000).await.unwrap();
1159        assert_eq!(to_finalize.len(), 1);
1160        assert_eq!(to_finalize[0].id, created.id);
1161
1162        // After finalization, it should no longer appear
1163        let stats = vec![CompetitionFinalStatsInput {
1164            competition_id: created.id,
1165            wallet: test_wallet(1),
1166            rank: 1,
1167            pnl: dec!(1000),
1168            volume: dec!(50000),
1169            efficiency: dec!(0.02),
1170            medal: Some(1),
1171        }];
1172        db.finalize_competition(created.id, &stats).await.unwrap();
1173
1174        let to_finalize_after = db.get_competitions_to_finalize(3000).await.unwrap();
1175        assert!(to_finalize_after.is_empty());
1176    }
1177
1178    #[tokio::test]
1179    async fn record_competition_fill_is_idempotent() {
1180        let test_db = TestDb::new().await.unwrap();
1181        let db = test_db.diesel_db().await;
1182
1183        let fill_input = CompetitionFillInput {
1184            trade_id: 999,
1185            wallet: test_wallet(60),
1186            symbol: "BTC-20260131-100000-C".to_string(),
1187            side: "Buy".to_string(),
1188            price: dec!(5.0),
1189            size: dec!(1000000),
1190            fee: dec!(0.01),
1191            timestamp_ms: 1_500_000,
1192        };
1193
1194        // First insert
1195        db.record_competition_fill(&fill_input).await.unwrap();
1196
1197        // Duplicate insert should not fail (ON CONFLICT DO NOTHING)
1198        db.record_competition_fill(&fill_input).await.unwrap();
1199
1200        // Verify only one row via get_competition_fills_before
1201        let fills = db.get_competition_fills_before(2_000_000).await.unwrap();
1202        assert_eq!(fills.len(), 1);
1203        assert_eq!(fills[0].wallet, test_wallet(60));
1204        assert_eq!(fills[0].symbol, "BTC-20260131-100000-C");
1205        assert_eq!(fills[0].price, dec!(5.0));
1206    }
1207
1208    #[tokio::test]
1209    async fn set_profile_image_url_roundtrip() {
1210        let test_db = TestDb::new().await.unwrap();
1211        let db = test_db.diesel_db().await;
1212
1213        let wallet = test_wallet(70);
1214
1215        // First set: no previous URL
1216        let prev = db
1217            .set_profile_image_url(&wallet, "https://example.com/img1.png")
1218            .await
1219            .unwrap();
1220        assert!(prev.is_none());
1221
1222        // Second set: returns previous URL
1223        let prev2 = db
1224            .set_profile_image_url(&wallet, "https://example.com/img2.png")
1225            .await
1226            .unwrap();
1227        assert_eq!(prev2, Some("https://example.com/img1.png".to_string()));
1228
1229        // Read back via reader
1230        let url = db.get_profile_image_url(&wallet).await.unwrap();
1231        assert_eq!(url, Some("https://example.com/img2.png".to_string()));
1232    }
1233}