Skip to main content

hypercall_competition/
service.rs

1use anyhow::anyhow;
2
3use crate::{CompetitionError, Result};
4use hypercall_db::{
5    CompetitionFillInput, CompetitionFinalStatsInput, CompetitionRecord, CompetitionUpdateInput,
6    CompetitionUpsertInput, CompetitionWriteError, CompetitionWriter, ProfileFillRecord,
7    WalletUsernameRecord,
8};
9use hypercall_runtime_api::boundary::market_inputs::GreeksCacheReader;
10use hypercall_runtime_api::valuation::get_symbol_theoretical_price;
11use hypercall_types::ws_protocol::{
12    WsCompetitionFinalStats, WsCompetitionPnlStanding, WsCompetitionPnlSummary, WsCompetitionUpdate,
13};
14
15use crate::{competition_state, CompetitionState};
16use hypercall_types::{to_human_readable_decimal, WalletAddress};
17use rust_decimal::Decimal;
18use rust_decimal_macros::dec;
19use std::cmp::Ordering;
20use std::collections::{BTreeSet, HashMap, VecDeque};
21use std::str::FromStr;
22use std::sync::Arc;
23
24#[derive(Debug, Clone, Copy)]
25pub enum LeaderboardSortBy {
26    Pnl,
27    Volume,
28    Efficiency,
29}
30
31impl LeaderboardSortBy {
32    pub fn parse(input: &str) -> Option<Self> {
33        match input {
34            "pnl" => Some(Self::Pnl),
35            "volume" => Some(Self::Volume),
36            "efficiency" => Some(Self::Efficiency),
37            _ => None,
38        }
39    }
40}
41
42#[derive(Debug, Clone, Copy, PartialEq, Eq)]
43pub enum CompetitionWinCondition {
44    Pnl,
45    Volume,
46    Efficiency,
47}
48
49impl CompetitionWinCondition {
50    fn parse(input: &str) -> Option<Self> {
51        match input {
52            "pnl" => Some(Self::Pnl),
53            "volume" => Some(Self::Volume),
54            "efficiency" => Some(Self::Efficiency),
55            _ => None,
56        }
57    }
58
59    fn sort_by(self) -> LeaderboardSortBy {
60        match self {
61            Self::Pnl => LeaderboardSortBy::Pnl,
62            Self::Volume => LeaderboardSortBy::Volume,
63            Self::Efficiency => LeaderboardSortBy::Efficiency,
64        }
65    }
66}
67
68#[derive(Debug, Clone, Copy)]
69pub enum SortOrder {
70    Asc,
71    Desc,
72}
73
74impl SortOrder {
75    pub fn parse(input: &str) -> Option<Self> {
76        match input {
77            "asc" => Some(Self::Asc),
78            "desc" => Some(Self::Desc),
79            _ => None,
80        }
81    }
82}
83
84#[derive(Debug, Clone)]
85pub struct LeaderboardQuery {
86    pub competition_id: i64,
87    pub sort_by: LeaderboardSortBy,
88    pub sort_order: SortOrder,
89    pub limit: usize,
90    pub offset: usize,
91    pub wallet: Option<WalletAddress>,
92}
93
94#[derive(Debug, Clone)]
95pub struct LeaderboardEntry {
96    pub rank: usize,
97    pub wallet: WalletAddress,
98    pub username: String,
99    pub pnl: Decimal,
100    pub volume: Decimal,
101    pub efficiency: Decimal,
102    pub medal: Option<u8>,
103}
104
105#[derive(Debug, Clone)]
106pub struct LeaderboardResult {
107    pub rows: Vec<LeaderboardEntry>,
108    pub connected_user: Option<LeaderboardEntry>,
109}
110
111#[derive(Debug, Clone)]
112pub struct RealizedPnlBySymbol {
113    pub symbol: String,
114    pub realized_pnl: Decimal,
115    pub event_count: i64,
116}
117
118#[derive(Debug, Clone, Copy, Default)]
119pub struct PlatformMetricMedals {
120    pub pnl: Option<u8>,
121    pub volume: Option<u8>,
122    pub efficiency: Option<u8>,
123}
124
125#[derive(Debug, Clone)]
126pub struct CompetitionEngagementSnapshot {
127    pub wallet: WalletAddress,
128    pub competition_id: i64,
129    pub rank: i64,
130    pub pnl: Decimal,
131    pub volume: Decimal,
132    pub efficiency: Decimal,
133    pub next_rank: Option<i64>,
134    pub gap_metric_value: Option<Decimal>,
135}
136
137#[derive(Debug, Clone)]
138struct WalletStats {
139    realized: Decimal,
140    unrealized: Decimal,
141    fees: Decimal,
142    volume: Decimal,
143}
144
145impl WalletStats {
146    fn pnl(&self) -> Decimal {
147        self.realized + self.unrealized - self.fees
148    }
149
150    fn efficiency(&self) -> Decimal {
151        if self.volume == dec!(0) {
152            dec!(0)
153        } else {
154            self.pnl() / self.volume
155        }
156    }
157}
158
159#[derive(Debug, Clone)]
160struct Lot {
161    qty: Decimal,
162    price: Decimal,
163    opened_in_window: bool,
164}
165
166#[derive(Debug, Clone, Default)]
167struct SymbolLots {
168    long_lots: VecDeque<Lot>,
169    short_lots: VecDeque<Lot>,
170    realized: Decimal,
171    fees: Decimal,
172    volume: Decimal,
173}
174
175impl SymbolLots {
176    fn apply_fill(
177        &mut self,
178        side: &str,
179        price: Decimal,
180        size: Decimal,
181        fee: Decimal,
182        in_window: bool,
183    ) -> Result<()> {
184        if size <= dec!(0) {
185            return Err(CompetitionError::BadRequest(
186                "fill size must be positive".to_string(),
187            ));
188        }
189
190        if in_window {
191            self.fees += fee;
192            self.volume += price * size;
193        }
194
195        match side {
196            "Buy" => self.apply_buy(price, size, in_window),
197            "Sell" => self.apply_sell(price, size, in_window),
198            other => panic!(
199                "invariant violated: invalid side '{}' in competition_fill_events",
200                other
201            ),
202        }
203    }
204
205    fn apply_buy(&mut self, price: Decimal, size: Decimal, in_window: bool) -> Result<()> {
206        let mut remaining = size;
207
208        while remaining > dec!(0) {
209            let Some(front) = self.short_lots.front_mut() else {
210                break;
211            };
212
213            let matched = remaining.min(front.qty);
214            if in_window {
215                self.realized += (front.price - price) * matched;
216            }
217            front.qty -= matched;
218            remaining -= matched;
219
220            if front.qty == dec!(0) {
221                self.short_lots.pop_front();
222            }
223        }
224
225        if remaining > dec!(0) {
226            self.long_lots.push_back(Lot {
227                qty: remaining,
228                price,
229                opened_in_window: in_window,
230            });
231        }
232
233        Ok(())
234    }
235
236    fn apply_sell(&mut self, price: Decimal, size: Decimal, in_window: bool) -> Result<()> {
237        let mut remaining = size;
238
239        while remaining > dec!(0) {
240            let Some(front) = self.long_lots.front_mut() else {
241                break;
242            };
243
244            let matched = remaining.min(front.qty);
245            if in_window {
246                self.realized += (price - front.price) * matched;
247            }
248            front.qty -= matched;
249            remaining -= matched;
250
251            if front.qty == dec!(0) {
252                self.long_lots.pop_front();
253            }
254        }
255
256        if remaining > dec!(0) {
257            self.short_lots.push_back(Lot {
258                qty: remaining,
259                price,
260                opened_in_window: in_window,
261            });
262        }
263
264        Ok(())
265    }
266
267    fn has_in_window_lots(&self) -> bool {
268        self.long_lots.iter().any(|lot| lot.opened_in_window)
269            || self.short_lots.iter().any(|lot| lot.opened_in_window)
270    }
271
272    fn unrealized_with_mark(&self, mark: Decimal) -> Decimal {
273        let mut total = dec!(0);
274        for lot in &self.long_lots {
275            if lot.opened_in_window {
276                total += (mark - lot.price) * lot.qty;
277            }
278        }
279        for lot in &self.short_lots {
280            if lot.opened_in_window {
281                total += (lot.price - mark) * lot.qty;
282            }
283        }
284        total
285    }
286}
287
288#[derive(Clone)]
289pub struct CompetitionService {
290    db: Arc<dyn CompetitionWriter>,
291    greeks_cache: Arc<dyn GreeksCacheReader>,
292}
293
294impl CompetitionService {
295    pub fn new(db: Arc<dyn CompetitionWriter>, greeks_cache: Arc<dyn GreeksCacheReader>) -> Self {
296        Self { db, greeks_cache }
297    }
298
299    pub async fn list_competitions(
300        &self,
301        state: Option<CompetitionState>,
302        from_ts_ms: Option<i64>,
303        to_ts_ms: Option<i64>,
304        limit: usize,
305        offset: usize,
306        now_ts_ms: i64,
307    ) -> Result<Vec<CompetitionRecord>> {
308        let state_str: Option<&str> = state.map(|s| s.as_str());
309        Ok(self
310            .db
311            .list_competitions(
312                state_str,
313                from_ts_ms,
314                to_ts_ms,
315                now_ts_ms,
316                limit as i64,
317                offset as i64,
318            )
319            .await?)
320    }
321
322    pub async fn get_competition(&self, competition_id: i64) -> Result<CompetitionRecord> {
323        self.db
324            .get_competition_by_id(competition_id)
325            .await?
326            .ok_or_else(|| CompetitionError::NotFound(format!("competition {}", competition_id)))
327    }
328
329    pub async fn create_competition(
330        &self,
331        input: CompetitionUpsertInput,
332    ) -> Result<CompetitionRecord> {
333        Self::validate_competition_window(input.start_ts_ms, input.end_ts_ms)?;
334        Self::validate_win_conditions(&input.win_conditions, &input.primary_win_condition)?;
335
336        self.db
337            .create_competition(&input)
338            .await
339            .map_err(|e| match e {
340                CompetitionWriteError::OverlapViolation => CompetitionError::Conflict(
341                    "competition window overlaps existing competition".to_string(),
342                ),
343                CompetitionWriteError::Internal(err) => {
344                    CompetitionError::Internal(anyhow!("failed to create competition: {}", err))
345                }
346                other => CompetitionError::Internal(anyhow!("{}", other)),
347            })
348    }
349
350    pub async fn update_competition(
351        &self,
352        competition_id: i64,
353        input: CompetitionUpdateInput,
354        now_ts_ms: i64,
355    ) -> Result<CompetitionRecord> {
356        let current = self.get_competition(competition_id).await?;
357        if competition_state(&current, now_ts_ms) == CompetitionState::Post {
358            return Err(CompetitionError::BadRequest(
359                "post competitions are immutable".to_string(),
360            ));
361        }
362
363        let name = input.name.unwrap_or(current.name);
364        let description = input.description.unwrap_or(current.description);
365        let rules_url = input.rules_url.unwrap_or(current.rules_url);
366        let rules_content = input.rules_content.unwrap_or(current.rules_content);
367        let win_conditions = input.win_conditions.unwrap_or(current.win_conditions);
368        let primary_win_condition = input
369            .primary_win_condition
370            .unwrap_or(current.primary_win_condition);
371        let start_ts_ms = input.start_ts_ms.unwrap_or(current.start_ts_ms);
372        let end_ts_ms = input.end_ts_ms.unwrap_or(current.end_ts_ms);
373        Self::validate_competition_window(start_ts_ms, end_ts_ms)?;
374        Self::validate_win_conditions(&win_conditions, &primary_win_condition)?;
375
376        self.db
377            .update_competition(
378                competition_id,
379                &name,
380                description.as_deref(),
381                rules_url.as_deref(),
382                rules_content.as_deref(),
383                &win_conditions,
384                &primary_win_condition,
385                start_ts_ms,
386                end_ts_ms,
387            )
388            .await
389            .map_err(|e| match e {
390                CompetitionWriteError::OverlapViolation => CompetitionError::Conflict(
391                    "competition window overlaps existing competition".to_string(),
392                ),
393                CompetitionWriteError::NotFound(msg) => CompetitionError::NotFound(msg),
394                CompetitionWriteError::Internal(err) => {
395                    CompetitionError::Internal(anyhow!("failed to update competition: {}", err))
396                }
397                other => CompetitionError::Internal(anyhow!("{}", other)),
398            })
399    }
400
401    pub async fn delete_competition(&self, competition_id: i64, now_ts_ms: i64) -> Result<()> {
402        let competition = self.get_competition(competition_id).await?;
403        if competition_state(&competition, now_ts_ms) != CompetitionState::Pre {
404            return Err(CompetitionError::BadRequest(
405                "only pre competitions can be deleted".to_string(),
406            ));
407        }
408
409        let result = self.db.delete_competition(competition_id).await?;
410        if result == 0 {
411            return Err(CompetitionError::NotFound(format!(
412                "competition {}",
413                competition_id
414            )));
415        }
416
417        Ok(())
418    }
419
420    pub async fn record_fill(&self, fill: &hypercall_types::Fill) -> Result<()> {
421        let taker_size_human = to_human_readable_decimal(&fill.symbol, fill.size);
422        let maker_side = match fill.taker_side {
423            hypercall_types::Side::Buy => "Sell",
424            hypercall_types::Side::Sell => "Buy",
425        };
426
427        self.db
428            .record_competition_fill(&CompetitionFillInput {
429                trade_id: fill.trade_id as i64,
430                wallet: fill.taker_wallet_address,
431                symbol: fill.symbol.clone(),
432                side: format!("{:?}", fill.taker_side),
433                price: fill.price,
434                size: taker_size_human,
435                fee: fill.fee,
436                timestamp_ms: fill.timestamp as i64,
437            })
438            .await?;
439
440        self.db
441            .record_competition_fill(&CompetitionFillInput {
442                trade_id: fill.trade_id as i64,
443                wallet: fill.maker_wallet_address,
444                symbol: fill.symbol.clone(),
445                side: maker_side.to_string(),
446                price: fill.price,
447                size: taker_size_human,
448                fee: dec!(0),
449                timestamp_ms: fill.timestamp as i64,
450            })
451            .await?;
452
453        Ok(())
454    }
455
456    pub async fn get_leaderboard(
457        &self,
458        query: LeaderboardQuery,
459        now_ts_ms: i64,
460    ) -> Result<LeaderboardResult> {
461        let competition = self.get_competition(query.competition_id).await?;
462        let state = competition_state(&competition, now_ts_ms);
463        let mut used_frozen_rows = false;
464        let cutoff_ts_ms = now_ts_ms.min(competition.end_ts_ms);
465
466        let mut rows = if state == CompetitionState::Pre {
467            Vec::new()
468        } else if state == CompetitionState::Post {
469            let finalized = self.get_finalized_leaderboard(query.competition_id).await?;
470            if finalized.is_empty() {
471                self.compute_live_leaderboard(
472                    &competition,
473                    query.sort_by,
474                    query.sort_order,
475                    true,
476                    competition.end_ts_ms,
477                )
478                .await?
479            } else {
480                used_frozen_rows = true;
481                finalized
482            }
483        } else {
484            self.compute_live_leaderboard(
485                &competition,
486                query.sort_by,
487                query.sort_order,
488                false,
489                cutoff_ts_ms,
490            )
491            .await?
492        };
493
494        if used_frozen_rows {
495            Self::sort_entries(&mut rows, query.sort_by, query.sort_order);
496            for (idx, row) in rows.iter_mut().enumerate() {
497                row.rank = idx + 1;
498                row.medal = match row.rank {
499                    1 => Some(1),
500                    2 => Some(2),
501                    3 => Some(3),
502                    _ => None,
503                };
504            }
505        }
506
507        let connected_user = query
508            .wallet
509            .and_then(|wallet| rows.iter().find(|r| r.wallet == wallet).cloned());
510
511        rows = rows
512            .into_iter()
513            .skip(query.offset)
514            .take(query.limit)
515            .collect();
516
517        Ok(LeaderboardResult {
518            rows,
519            connected_user,
520        })
521    }
522
523    pub async fn get_realized_pnl_by_symbol(
524        &self,
525        wallet: WalletAddress,
526        competition_id: Option<i64>,
527    ) -> Result<Vec<RealizedPnlBySymbol>> {
528        let (window_start, window_end) = if let Some(comp_id) = competition_id {
529            let comp = self.get_competition(comp_id).await?;
530            (Some(comp.start_ts_ms), Some(comp.end_ts_ms))
531        } else {
532            (None, None)
533        };
534
535        let records = self
536            .db
537            .get_realized_pnl_by_symbol(&wallet, window_start, window_end)
538            .await?;
539
540        Ok(records
541            .into_iter()
542            .map(|r| RealizedPnlBySymbol {
543                symbol: r.symbol,
544                realized_pnl: r.realized_pnl,
545                event_count: r.event_count,
546            })
547            .collect())
548    }
549
550    pub async fn build_competition_update_for_wallet(
551        &self,
552        wallet: WalletAddress,
553        now_ts_ms: i64,
554    ) -> Result<Option<WsCompetitionUpdate>> {
555        let active = self.db.get_active_competition(now_ts_ms).await?;
556        let Some(competition) = active else {
557            return Ok(None);
558        };
559        let primary_sort = CompetitionWinCondition::parse(&competition.primary_win_condition)
560            .ok_or_else(|| {
561                anyhow!(
562                    "invalid competition primary_win_condition {}",
563                    competition.primary_win_condition
564                )
565            })?
566            .sort_by();
567
568        let result = self
569            .get_leaderboard(
570                LeaderboardQuery {
571                    competition_id: competition.id,
572                    sort_by: primary_sort,
573                    sort_order: SortOrder::Desc,
574                    limit: 1,
575                    offset: 0,
576                    wallet: Some(wallet),
577                },
578                now_ts_ms,
579            )
580            .await?;
581
582        let Some(me) = result.connected_user else {
583            return Ok(None);
584        };
585
586        Ok(Some(WsCompetitionUpdate {
587            wallet_address: wallet,
588            competition_id: competition.id,
589            rank: me.rank as i64,
590            pnl: me.pnl,
591            volume: me.volume,
592            efficiency: me.efficiency,
593            timestamp: now_ts_ms,
594        }))
595    }
596
597    pub async fn build_competition_engagement_snapshot_for_wallet(
598        &self,
599        wallet: WalletAddress,
600        now_ts_ms: i64,
601    ) -> Result<Option<CompetitionEngagementSnapshot>> {
602        let Some(competition) = self.db.get_active_competition(now_ts_ms).await? else {
603            return Ok(None);
604        };
605
606        let primary = CompetitionWinCondition::parse(&competition.primary_win_condition)
607            .ok_or_else(|| {
608                anyhow!(
609                    "invalid competition primary_win_condition {}",
610                    competition.primary_win_condition
611                )
612            })?;
613
614        let cutoff_ts_ms = now_ts_ms.min(competition.end_ts_ms);
615        let rows = self
616            .compute_live_leaderboard(
617                &competition,
618                primary.sort_by(),
619                SortOrder::Desc,
620                false,
621                cutoff_ts_ms,
622            )
623            .await?;
624        let Some((idx, me)) = rows
625            .iter()
626            .enumerate()
627            .find(|(_, row)| row.wallet == wallet)
628        else {
629            return Ok(None);
630        };
631
632        let gap_metric_value = if idx == 0 {
633            None
634        } else {
635            let ahead = &rows[idx - 1];
636            let my_metric = Self::metric_value(me, primary.sort_by());
637            let ahead_metric = Self::metric_value(ahead, primary.sort_by());
638            Some(ahead_metric - my_metric)
639        };
640
641        Ok(Some(CompetitionEngagementSnapshot {
642            wallet,
643            competition_id: competition.id,
644            rank: me.rank as i64,
645            pnl: me.pnl,
646            volume: me.volume,
647            efficiency: me.efficiency,
648            next_rank: if idx == 0 {
649                None
650            } else {
651                Some(rows[idx - 1].rank as i64)
652            },
653            gap_metric_value,
654        }))
655    }
656
657    pub async fn build_competition_pnl_summary_for_wallet(
658        &self,
659        wallet: WalletAddress,
660        now_ts_ms: i64,
661    ) -> Result<WsCompetitionPnlSummary> {
662        let ledger_stats = self
663            .db
664            .compute_ledger_profile_stats(&wallet, now_ts_ms)
665            .await?;
666        let lifetime_realized_pnl = ledger_stats.lifetime_realized_pnl;
667
668        let active_competition = match self.db.get_active_competition(now_ts_ms).await? {
669            Some(competition) => {
670                let primary_sort =
671                    CompetitionWinCondition::parse(&competition.primary_win_condition)
672                        .ok_or_else(|| {
673                            anyhow!(
674                                "invalid competition primary_win_condition {}",
675                                competition.primary_win_condition
676                            )
677                        })?
678                        .sort_by();
679
680                let result = self
681                    .get_leaderboard(
682                        LeaderboardQuery {
683                            competition_id: competition.id,
684                            sort_by: primary_sort,
685                            sort_order: SortOrder::Desc,
686                            limit: 1,
687                            offset: 0,
688                            wallet: Some(wallet),
689                        },
690                        now_ts_ms,
691                    )
692                    .await?;
693
694                Some(match result.connected_user {
695                    Some(me) => WsCompetitionPnlStanding {
696                        competition_id: competition.id,
697                        competition_name: competition.name.clone(),
698                        competition_state: competition_state(&competition, now_ts_ms)
699                            .as_str()
700                            .to_string(),
701                        rank: Some(me.rank),
702                        pnl: me.pnl,
703                        volume: me.volume,
704                        efficiency: me.efficiency,
705                        medal: me.medal,
706                    },
707                    None => WsCompetitionPnlStanding {
708                        competition_id: competition.id,
709                        competition_name: competition.name.clone(),
710                        competition_state: competition_state(&competition, now_ts_ms)
711                            .as_str()
712                            .to_string(),
713                        rank: None,
714                        pnl: dec!(0),
715                        volume: dec!(0),
716                        efficiency: dec!(0),
717                        medal: None,
718                    },
719                })
720            }
721            None => None,
722        };
723
724        Ok(WsCompetitionPnlSummary {
725            wallet_address: wallet,
726            lifetime_realized_pnl,
727            active_competition,
728            timestamp: now_ts_ms,
729        })
730    }
731
732    pub async fn finalize_ended_competitions(
733        &self,
734        now_ts_ms: i64,
735    ) -> Result<Vec<WsCompetitionFinalStats>> {
736        let candidates = self.db.get_competitions_to_finalize(now_ts_ms).await?;
737
738        let mut notifications = Vec::new();
739
740        for competition in candidates {
741            let rows = self
742                .compute_live_leaderboard(
743                    &competition,
744                    CompetitionWinCondition::parse(&competition.primary_win_condition)
745                        .ok_or_else(|| {
746                            anyhow!(
747                                "invalid competition primary_win_condition {}",
748                                competition.primary_win_condition
749                            )
750                        })?
751                        .sort_by(),
752                    SortOrder::Desc,
753                    true,
754                    competition.end_ts_ms,
755                )
756                .await?;
757
758            if rows.is_empty() {
759                continue;
760            }
761
762            let competition_id = competition.id;
763            let stats: Vec<CompetitionFinalStatsInput> = rows
764                .iter()
765                .map(|row| CompetitionFinalStatsInput {
766                    competition_id,
767                    wallet: row.wallet,
768                    rank: row.rank as i32,
769                    pnl: row.pnl,
770                    volume: row.volume,
771                    efficiency: row.efficiency,
772                    medal: row.medal.map(i32::from),
773                })
774                .collect();
775
776            let inserted = self.db.finalize_competition(competition_id, &stats).await?;
777
778            if inserted > 0 {
779                for row in &rows {
780                    notifications.push(WsCompetitionFinalStats {
781                        wallet_address: row.wallet,
782                        competition_id,
783                        rank: row.rank as i64,
784                        pnl: row.pnl,
785                        volume: row.volume,
786                        efficiency: row.efficiency,
787                        medal: row.medal.map(i64::from),
788                        timestamp: now_ts_ms,
789                    });
790                }
791            }
792        }
793
794        Ok(notifications)
795    }
796
797    pub async fn get_display_username(&self, wallet: WalletAddress) -> Result<String> {
798        let username = self.db.get_display_username(&wallet).await?;
799        Ok(username.unwrap_or_else(|| truncate_wallet(wallet)))
800    }
801
802    pub async fn get_profile_image_url(&self, wallet: WalletAddress) -> Result<Option<String>> {
803        Ok(self.db.get_profile_image_url(&wallet).await?)
804    }
805
806    pub async fn set_profile_image_url(
807        &self,
808        wallet: WalletAddress,
809        profile_image_url: &str,
810    ) -> Result<Option<String>> {
811        Ok(self
812            .db
813            .set_profile_image_url(&wallet, profile_image_url)
814            .await?)
815    }
816
817    async fn get_display_usernames(
818        &self,
819        wallets: impl IntoIterator<Item = WalletAddress>,
820    ) -> Result<HashMap<WalletAddress, String>> {
821        let unique_wallets: Vec<WalletAddress> = wallets
822            .into_iter()
823            .collect::<BTreeSet<_>>()
824            .into_iter()
825            .collect();
826
827        let mut usernames = HashMap::with_capacity(unique_wallets.len());
828        for wallet in &unique_wallets {
829            usernames.insert(*wallet, truncate_wallet(*wallet));
830        }
831
832        if unique_wallets.is_empty() {
833            return Ok(usernames);
834        }
835
836        let wallet_strings: Vec<String> = unique_wallets
837            .iter()
838            .map(|w| format!("{:#x}", w.0).to_lowercase())
839            .collect();
840
841        let rows: Vec<WalletUsernameRecord> =
842            self.db.get_display_usernames_batch(&wallet_strings).await?;
843
844        for row in rows {
845            if let Ok(addr) = WalletAddress::from_str(&row.wallet_address) {
846                usernames.insert(addr, row.username);
847            }
848        }
849
850        Ok(usernames)
851    }
852
853    pub async fn get_profile_trade_history(
854        &self,
855        wallet: WalletAddress,
856        competition_id: Option<i64>,
857        from_ts_ms: Option<i64>,
858        to_ts_ms: Option<i64>,
859        symbol: Option<&str>,
860        limit: usize,
861        offset: usize,
862    ) -> Result<Vec<crate::Fill>> {
863        let (window_start, window_end) = if let Some(comp_id) = competition_id {
864            let comp = self.get_competition(comp_id).await?;
865            (Some(comp.start_ts_ms), Some(comp.end_ts_ms))
866        } else {
867            (None, None)
868        };
869
870        let rows: Vec<ProfileFillRecord> = self
871            .db
872            .get_profile_trade_history(
873                &wallet,
874                window_start,
875                window_end,
876                from_ts_ms,
877                to_ts_ms,
878                symbol,
879                limit as i64,
880                offset as i64,
881            )
882            .await?;
883
884        rows.into_iter()
885            .map(|r| {
886                let fill_id = r.fill_id.ok_or_else(|| {
887                    anyhow!("fill row has NULL fill_id (trade_id={})", r.trade_id)
888                })?;
889                let created_at = r
890                    .created_at
891                    .ok_or_else(|| anyhow!("fill row has NULL created_at (fill_id={})", fill_id))?;
892                Ok(crate::Fill {
893                    fill_id,
894                    trade_id: r.trade_id,
895                    wallet_address: r.wallet_address,
896                    symbol: r.symbol,
897                    price: r.price,
898                    size: r.size,
899                    fee: r.fee,
900                    is_taker: r.is_taker,
901                    timestamp: r.timestamp,
902                    created_at,
903                    builder_code_address: r.builder_code_address,
904                    builder_code_fee: r.builder_code_fee,
905                    realized_pnl: r.realized_pnl,
906                })
907            })
908            .collect()
909    }
910
911    pub async fn compute_ledger_profile_stats(
912        &self,
913        wallet: WalletAddress,
914        now_ts_ms: i64,
915    ) -> Result<(Decimal, Decimal, Decimal, Decimal)> {
916        let stats = self
917            .db
918            .compute_ledger_profile_stats(&wallet, now_ts_ms)
919            .await?;
920        Ok((
921            stats.deposits,
922            stats.withdrawals,
923            stats.lifetime_realized_pnl,
924            stats.pnl_24h,
925        ))
926    }
927
928    pub async fn get_account_first_seen_ts_ms(&self, wallet: WalletAddress) -> Result<Option<i64>> {
929        Ok(self.db.get_account_first_seen_ts_ms(&wallet).await?)
930    }
931
932    pub async fn get_platform_metric_medals(
933        &self,
934        wallet: WalletAddress,
935    ) -> Result<PlatformMetricMedals> {
936        let metrics = self.db.get_platform_wallet_metrics().await?;
937
938        let rows: Vec<(WalletAddress, Decimal, Decimal, Decimal)> = metrics
939            .into_iter()
940            .map(|m| {
941                let efficiency = if m.volume == dec!(0) {
942                    dec!(0)
943                } else {
944                    m.realized / m.volume
945                };
946                (m.wallet, m.realized, m.volume, efficiency)
947            })
948            .collect();
949
950        let pnl_rows: Vec<(WalletAddress, Decimal, Decimal)> =
951            rows.iter().map(|row| (row.0, row.1, row.2)).collect();
952        let volume_rows: Vec<(WalletAddress, Decimal, Decimal)> =
953            rows.iter().map(|row| (row.0, row.2, row.2)).collect();
954        let efficiency_rows: Vec<(WalletAddress, Decimal, Decimal)> =
955            rows.iter().map(|row| (row.0, row.3, row.2)).collect();
956
957        Ok(PlatformMetricMedals {
958            pnl: Self::wallet_medal_for_metric(&pnl_rows, wallet, false),
959            volume: Self::wallet_medal_for_metric(&volume_rows, wallet, false),
960            efficiency: Self::wallet_medal_for_metric(&efficiency_rows, wallet, true),
961        })
962    }
963
964    pub async fn get_active_competition(
965        &self,
966        now_ts_ms: i64,
967    ) -> Result<Option<CompetitionRecord>> {
968        Ok(self.db.get_active_competition(now_ts_ms).await?)
969    }
970
971    pub async fn get_latest_completed_competition(
972        &self,
973        now_ts_ms: i64,
974    ) -> Result<Option<CompetitionRecord>> {
975        Ok(self.db.get_latest_completed_competition(now_ts_ms).await?)
976    }
977
978    async fn get_finalized_leaderboard(
979        &self,
980        competition_id: i64,
981    ) -> Result<Vec<LeaderboardEntry>> {
982        let stats = self.db.get_finalized_stats(competition_id).await?;
983
984        let usernames = self
985            .get_display_usernames(stats.iter().map(|row| row.wallet))
986            .await?;
987
988        let mut entries = Vec::with_capacity(stats.len());
989        for row in stats {
990            entries.push(LeaderboardEntry {
991                rank: row.rank as usize,
992                wallet: row.wallet,
993                username: usernames
994                    .get(&row.wallet)
995                    .cloned()
996                    .unwrap_or_else(|| truncate_wallet(row.wallet)),
997                pnl: row.pnl,
998                volume: row.volume,
999                efficiency: row.efficiency,
1000                medal: row.medal.and_then(|m| u8::try_from(m).ok()),
1001            });
1002        }
1003
1004        Ok(entries)
1005    }
1006
1007    async fn compute_live_leaderboard(
1008        &self,
1009        competition: &CompetitionRecord,
1010        sort_by: LeaderboardSortBy,
1011        sort_order: SortOrder,
1012        use_historical_theoretical_marks: bool,
1013        cutoff_ts_ms: i64,
1014    ) -> Result<Vec<LeaderboardEntry>> {
1015        let fills = self.db.get_competition_fills_before(cutoff_ts_ms).await?;
1016
1017        if fills.is_empty() {
1018            return Ok(Vec::new());
1019        }
1020
1021        let mut wallet_symbol_lots: HashMap<WalletAddress, HashMap<String, SymbolLots>> =
1022            HashMap::new();
1023
1024        for fill in fills {
1025            let in_window =
1026                fill.timestamp_ms >= competition.start_ts_ms && fill.timestamp_ms < cutoff_ts_ms;
1027            let symbol_map = wallet_symbol_lots.entry(fill.wallet).or_default();
1028            let lots = symbol_map.entry(fill.symbol.clone()).or_default();
1029            lots.apply_fill(&fill.side, fill.price, fill.size, fill.fee, in_window)?;
1030        }
1031
1032        let symbols_needing_marks: Vec<String> = wallet_symbol_lots
1033            .values()
1034            .flat_map(|m| {
1035                m.iter()
1036                    .filter(|&(_symbol, lots)| lots.has_in_window_lots())
1037                    .map(|(symbol, _lots)| symbol.clone())
1038            })
1039            .collect::<std::collections::BTreeSet<_>>()
1040            .into_iter()
1041            .collect();
1042
1043        let marks = if use_historical_theoretical_marks {
1044            self.get_historical_theoretical_marks(&symbols_needing_marks, cutoff_ts_ms)
1045                .await?
1046        } else {
1047            self.get_live_theoretical_marks(&symbols_needing_marks)
1048                .await?
1049        };
1050
1051        let usernames = self
1052            .get_display_usernames(wallet_symbol_lots.keys().copied())
1053            .await?;
1054
1055        let mut leaderboard: Vec<LeaderboardEntry> = Vec::new();
1056
1057        for (wallet, symbol_lots) in wallet_symbol_lots {
1058            let mut stats = WalletStats {
1059                realized: dec!(0),
1060                unrealized: dec!(0),
1061                fees: dec!(0),
1062                volume: dec!(0),
1063            };
1064
1065            for (symbol, lots) in symbol_lots {
1066                stats.realized += lots.realized;
1067                stats.fees += lots.fees;
1068                stats.volume += lots.volume;
1069
1070                if lots.has_in_window_lots() {
1071                    let mark = marks.get(&symbol).ok_or_else(|| {
1072                        CompetitionError::Unavailable(format!(
1073                            "missing mark for symbol {} while computing leaderboard",
1074                            symbol
1075                        ))
1076                    })?;
1077                    stats.unrealized += lots.unrealized_with_mark(*mark);
1078                }
1079            }
1080
1081            if stats.realized == dec!(0)
1082                && stats.unrealized == dec!(0)
1083                && stats.fees == dec!(0)
1084                && stats.volume == dec!(0)
1085            {
1086                continue;
1087            }
1088
1089            leaderboard.push(LeaderboardEntry {
1090                rank: 0,
1091                wallet,
1092                username: usernames
1093                    .get(&wallet)
1094                    .cloned()
1095                    .unwrap_or_else(|| truncate_wallet(wallet)),
1096                pnl: stats.pnl(),
1097                volume: stats.volume,
1098                efficiency: stats.efficiency(),
1099                medal: None,
1100            });
1101        }
1102
1103        Self::sort_entries(&mut leaderboard, sort_by, sort_order);
1104
1105        for (idx, row) in leaderboard.iter_mut().enumerate() {
1106            row.rank = idx + 1;
1107            row.medal = match row.rank {
1108                1 => Some(1),
1109                2 => Some(2),
1110                3 => Some(3),
1111                _ => None,
1112            };
1113        }
1114
1115        Ok(leaderboard)
1116    }
1117
1118    fn sort_entries(
1119        entries: &mut [LeaderboardEntry],
1120        sort_by: LeaderboardSortBy,
1121        sort_order: SortOrder,
1122    ) {
1123        entries.sort_by(|a, b| {
1124            let primary = match sort_by {
1125                LeaderboardSortBy::Pnl => a.pnl.partial_cmp(&b.pnl).unwrap_or(Ordering::Equal),
1126                LeaderboardSortBy::Volume => {
1127                    a.volume.partial_cmp(&b.volume).unwrap_or(Ordering::Equal)
1128                }
1129                LeaderboardSortBy::Efficiency => a
1130                    .efficiency
1131                    .partial_cmp(&b.efficiency)
1132                    .unwrap_or(Ordering::Equal),
1133            };
1134
1135            let primary = match sort_order {
1136                SortOrder::Asc => primary,
1137                SortOrder::Desc => primary.reverse(),
1138            };
1139
1140            if primary != Ordering::Equal {
1141                return primary;
1142            }
1143
1144            let secondary = b.volume.partial_cmp(&a.volume).unwrap_or(Ordering::Equal);
1145            if secondary != Ordering::Equal {
1146                return secondary;
1147            }
1148
1149            a.wallet.to_string().cmp(&b.wallet.to_string())
1150        });
1151    }
1152
1153    fn metric_value(entry: &LeaderboardEntry, sort_by: LeaderboardSortBy) -> Decimal {
1154        match sort_by {
1155            LeaderboardSortBy::Pnl => entry.pnl,
1156            LeaderboardSortBy::Volume => entry.volume,
1157            LeaderboardSortBy::Efficiency => entry.efficiency,
1158        }
1159    }
1160
1161    fn wallet_medal_for_metric(
1162        rows: &[(WalletAddress, Decimal, Decimal)],
1163        wallet: WalletAddress,
1164        require_positive_volume: bool,
1165    ) -> Option<u8> {
1166        let mut ordered = rows.to_vec();
1167        ordered.sort_by(|a, b| {
1168            let primary = b.1.partial_cmp(&a.1).unwrap_or(Ordering::Equal);
1169            if primary != Ordering::Equal {
1170                return primary;
1171            }
1172            let volume_tiebreak = b.2.partial_cmp(&a.2).unwrap_or(Ordering::Equal);
1173            if volume_tiebreak != Ordering::Equal {
1174                return volume_tiebreak;
1175            }
1176            a.0.to_string().cmp(&b.0.to_string())
1177        });
1178
1179        let mut eligible_rank = 0_usize;
1180        for row in &ordered {
1181            if require_positive_volume && row.2 == dec!(0) {
1182                continue;
1183            }
1184            eligible_rank += 1;
1185            if row.0 == wallet {
1186                return match eligible_rank {
1187                    1 => Some(1),
1188                    2 => Some(2),
1189                    3 => Some(3),
1190                    _ => None,
1191                };
1192            }
1193        }
1194        None
1195    }
1196
1197    async fn get_historical_theoretical_marks(
1198        &self,
1199        symbols: &[String],
1200        cutoff_ts_ms: i64,
1201    ) -> Result<HashMap<String, Decimal>> {
1202        if symbols.is_empty() {
1203            return Ok(HashMap::new());
1204        }
1205
1206        let records = self
1207            .db
1208            .get_historical_theo_marks(symbols, cutoff_ts_ms)
1209            .await?;
1210
1211        let mut map = HashMap::with_capacity(records.len());
1212        for record in records {
1213            map.insert(record.symbol, record.theoretical_price);
1214        }
1215
1216        let missing_symbols: Vec<String> = symbols
1217            .iter()
1218            .filter(|symbol| !map.contains_key(symbol.as_str()))
1219            .cloned()
1220            .collect();
1221
1222        if missing_symbols.is_empty() {
1223            return Ok(map);
1224        }
1225
1226        tracing::warn!(
1227            cutoff_ts_ms,
1228            missing_symbols = ?missing_symbols,
1229            "Falling back to live theoretical marks for symbols missing historical snapshots"
1230        );
1231
1232        let live_fallback_marks = self.get_live_theoretical_marks(&missing_symbols).await?;
1233        map.extend(live_fallback_marks);
1234
1235        Ok(map)
1236    }
1237
1238    async fn get_live_theoretical_marks(
1239        &self,
1240        symbols: &[String],
1241    ) -> Result<HashMap<String, Decimal>> {
1242        let mut map = HashMap::with_capacity(symbols.len());
1243
1244        for symbol in symbols {
1245            let mark_f64 = get_symbol_theoretical_price(self.greeks_cache.as_ref(), symbol)
1246                .await
1247                .map_err(|error| CompetitionError::Unavailable(error.to_string()))?;
1248            let mark = Decimal::from_f64_retain(mark_f64)
1249                .ok_or_else(|| anyhow!("invalid theoretical price decimal for {}", symbol))?;
1250            map.insert(symbol.clone(), mark);
1251        }
1252
1253        Ok(map)
1254    }
1255
1256    fn validate_competition_window(start_ts_ms: i64, end_ts_ms: i64) -> Result<()> {
1257        if start_ts_ms >= end_ts_ms {
1258            return Err(CompetitionError::BadRequest(
1259                "start_ts_ms must be less than end_ts_ms".to_string(),
1260            ));
1261        }
1262        Ok(())
1263    }
1264
1265    fn validate_win_conditions(
1266        win_conditions: &[String],
1267        primary_win_condition: &str,
1268    ) -> Result<()> {
1269        if win_conditions.is_empty() {
1270            return Err(CompetitionError::BadRequest(
1271                "win_conditions must be non-empty".to_string(),
1272            ));
1273        }
1274        if !win_conditions
1275            .iter()
1276            .all(|cond| CompetitionWinCondition::parse(cond).is_some())
1277        {
1278            return Err(CompetitionError::BadRequest(
1279                "win_conditions must contain only pnl|volume|efficiency".to_string(),
1280            ));
1281        }
1282        if CompetitionWinCondition::parse(primary_win_condition).is_none() {
1283            return Err(CompetitionError::BadRequest(
1284                "primary_win_condition must be one of pnl|volume|efficiency".to_string(),
1285            ));
1286        }
1287        if !win_conditions
1288            .iter()
1289            .any(|cond| cond == primary_win_condition)
1290        {
1291            return Err(CompetitionError::BadRequest(
1292                "primary_win_condition must be included in win_conditions".to_string(),
1293            ));
1294        }
1295        Ok(())
1296    }
1297}
1298
1299fn truncate_wallet(wallet: WalletAddress) -> String {
1300    let full = wallet.to_string();
1301    if full.len() <= 10 {
1302        full
1303    } else {
1304        format!("{}...{}", &full[..6], &full[full.len() - 4..])
1305    }
1306}
1307
1308#[cfg(test)]
1309mod tests {
1310    use super::*;
1311    use std::str::FromStr;
1312
1313    fn wallet(index: u8) -> WalletAddress {
1314        WalletAddress::from_str(&format!("0x{:040x}", index)).expect("valid wallet")
1315    }
1316
1317    #[test]
1318    fn competition_state_transitions_follow_bounds() {
1319        assert_eq!(
1320            CompetitionState::from_bounds(2000, 3000, 1999),
1321            CompetitionState::Pre
1322        );
1323        assert_eq!(
1324            CompetitionState::from_bounds(2000, 3000, 2000),
1325            CompetitionState::Active
1326        );
1327        assert_eq!(
1328            CompetitionState::from_bounds(2000, 3000, 2999),
1329            CompetitionState::Active
1330        );
1331        assert_eq!(
1332            CompetitionState::from_bounds(2000, 3000, 3000),
1333            CompetitionState::Post
1334        );
1335    }
1336
1337    #[test]
1338    fn lot_accounting_handles_long_close_and_reversal() {
1339        let mut lots = SymbolLots::default();
1340        lots.apply_fill("Buy", dec!(100), dec!(10), dec!(0), true)
1341            .expect("buy fill should apply");
1342        lots.apply_fill("Sell", dec!(120), dec!(4), dec!(0), true)
1343            .expect("partial close should apply");
1344        assert_eq!(lots.realized, dec!(80));
1345        assert_eq!(lots.unrealized_with_mark(dec!(110)), dec!(60));
1346
1347        lots.apply_fill("Sell", dec!(90), dec!(10), dec!(0), true)
1348            .expect("reversal should apply");
1349        assert_eq!(lots.realized, dec!(20));
1350        assert_eq!(lots.unrealized_with_mark(dec!(80)), dec!(40));
1351    }
1352
1353    #[test]
1354    fn sorting_uses_requested_metric_then_tiebreakers() {
1355        let mut entries = vec![
1356            LeaderboardEntry {
1357                rank: 0,
1358                wallet: wallet(1),
1359                username: "u1".to_string(),
1360                pnl: dec!(10),
1361                volume: dec!(100),
1362                efficiency: dec!(0.1),
1363                medal: None,
1364            },
1365            LeaderboardEntry {
1366                rank: 0,
1367                wallet: wallet(2),
1368                username: "u2".to_string(),
1369                pnl: dec!(10),
1370                volume: dec!(200),
1371                efficiency: dec!(0.05),
1372                medal: None,
1373            },
1374            LeaderboardEntry {
1375                rank: 0,
1376                wallet: wallet(3),
1377                username: "u3".to_string(),
1378                pnl: dec!(15),
1379                volume: dec!(150),
1380                efficiency: dec!(0.1),
1381                medal: None,
1382            },
1383        ];
1384
1385        CompetitionService::sort_entries(&mut entries, LeaderboardSortBy::Pnl, SortOrder::Desc);
1386        assert_eq!(entries[0].wallet, wallet(3));
1387        assert_eq!(entries[1].wallet, wallet(2));
1388        assert_eq!(entries[2].wallet, wallet(1));
1389    }
1390
1391    #[test]
1392    fn win_condition_validation_enforces_membership() {
1393        CompetitionService::validate_win_conditions(&["pnl".to_string()], "pnl")
1394            .expect("valid win condition");
1395        assert!(CompetitionService::validate_win_conditions(&[], "pnl").is_err());
1396        assert!(
1397            CompetitionService::validate_win_conditions(&["pnl".to_string()], "volume").is_err()
1398        );
1399    }
1400}