1use anyhow::anyhow;
2
3use crate::{CompetitionError, Result};
4use hypercall_db::{
5 CompetitionFillInput, CompetitionFinalStatsInput, CompetitionRecord, CompetitionUpdateInput,
6 CompetitionUpsertInput, CompetitionWriteError, CompetitionWriter, ProfileFillRecord,
7 WalletUsernameRecord,
8};
9use hypercall_runtime_api::boundary::market_inputs::GreeksCacheReader;
10use hypercall_runtime_api::valuation::get_symbol_theoretical_price;
11use hypercall_types::ws_protocol::{
12 WsCompetitionFinalStats, WsCompetitionPnlStanding, WsCompetitionPnlSummary, WsCompetitionUpdate,
13};
14
15use crate::{competition_state, CompetitionState};
16use hypercall_types::{to_human_readable_decimal, WalletAddress};
17use rust_decimal::Decimal;
18use rust_decimal_macros::dec;
19use std::cmp::Ordering;
20use std::collections::{BTreeSet, HashMap, VecDeque};
21use std::str::FromStr;
22use std::sync::Arc;
23
24#[derive(Debug, Clone, Copy)]
25pub enum LeaderboardSortBy {
26 Pnl,
27 Volume,
28 Efficiency,
29}
30
31impl LeaderboardSortBy {
32 pub fn parse(input: &str) -> Option<Self> {
33 match input {
34 "pnl" => Some(Self::Pnl),
35 "volume" => Some(Self::Volume),
36 "efficiency" => Some(Self::Efficiency),
37 _ => None,
38 }
39 }
40}
41
42#[derive(Debug, Clone, Copy, PartialEq, Eq)]
43pub enum CompetitionWinCondition {
44 Pnl,
45 Volume,
46 Efficiency,
47}
48
49impl CompetitionWinCondition {
50 fn parse(input: &str) -> Option<Self> {
51 match input {
52 "pnl" => Some(Self::Pnl),
53 "volume" => Some(Self::Volume),
54 "efficiency" => Some(Self::Efficiency),
55 _ => None,
56 }
57 }
58
59 fn sort_by(self) -> LeaderboardSortBy {
60 match self {
61 Self::Pnl => LeaderboardSortBy::Pnl,
62 Self::Volume => LeaderboardSortBy::Volume,
63 Self::Efficiency => LeaderboardSortBy::Efficiency,
64 }
65 }
66}
67
68#[derive(Debug, Clone, Copy)]
69pub enum SortOrder {
70 Asc,
71 Desc,
72}
73
74impl SortOrder {
75 pub fn parse(input: &str) -> Option<Self> {
76 match input {
77 "asc" => Some(Self::Asc),
78 "desc" => Some(Self::Desc),
79 _ => None,
80 }
81 }
82}
83
84#[derive(Debug, Clone)]
85pub struct LeaderboardQuery {
86 pub competition_id: i64,
87 pub sort_by: LeaderboardSortBy,
88 pub sort_order: SortOrder,
89 pub limit: usize,
90 pub offset: usize,
91 pub wallet: Option<WalletAddress>,
92}
93
94#[derive(Debug, Clone)]
95pub struct LeaderboardEntry {
96 pub rank: usize,
97 pub wallet: WalletAddress,
98 pub username: String,
99 pub pnl: Decimal,
100 pub volume: Decimal,
101 pub efficiency: Decimal,
102 pub medal: Option<u8>,
103}
104
105#[derive(Debug, Clone)]
106pub struct LeaderboardResult {
107 pub rows: Vec<LeaderboardEntry>,
108 pub connected_user: Option<LeaderboardEntry>,
109}
110
111#[derive(Debug, Clone)]
112pub struct RealizedPnlBySymbol {
113 pub symbol: String,
114 pub realized_pnl: Decimal,
115 pub event_count: i64,
116}
117
118#[derive(Debug, Clone, Copy, Default)]
119pub struct PlatformMetricMedals {
120 pub pnl: Option<u8>,
121 pub volume: Option<u8>,
122 pub efficiency: Option<u8>,
123}
124
125#[derive(Debug, Clone)]
126pub struct CompetitionEngagementSnapshot {
127 pub wallet: WalletAddress,
128 pub competition_id: i64,
129 pub rank: i64,
130 pub pnl: Decimal,
131 pub volume: Decimal,
132 pub efficiency: Decimal,
133 pub next_rank: Option<i64>,
134 pub gap_metric_value: Option<Decimal>,
135}
136
137#[derive(Debug, Clone)]
138struct WalletStats {
139 realized: Decimal,
140 unrealized: Decimal,
141 fees: Decimal,
142 volume: Decimal,
143}
144
145impl WalletStats {
146 fn pnl(&self) -> Decimal {
147 self.realized + self.unrealized - self.fees
148 }
149
150 fn efficiency(&self) -> Decimal {
151 if self.volume == dec!(0) {
152 dec!(0)
153 } else {
154 self.pnl() / self.volume
155 }
156 }
157}
158
159#[derive(Debug, Clone)]
160struct Lot {
161 qty: Decimal,
162 price: Decimal,
163 opened_in_window: bool,
164}
165
166#[derive(Debug, Clone, Default)]
167struct SymbolLots {
168 long_lots: VecDeque<Lot>,
169 short_lots: VecDeque<Lot>,
170 realized: Decimal,
171 fees: Decimal,
172 volume: Decimal,
173}
174
175impl SymbolLots {
176 fn apply_fill(
177 &mut self,
178 side: &str,
179 price: Decimal,
180 size: Decimal,
181 fee: Decimal,
182 in_window: bool,
183 ) -> Result<()> {
184 if size <= dec!(0) {
185 return Err(CompetitionError::BadRequest(
186 "fill size must be positive".to_string(),
187 ));
188 }
189
190 if in_window {
191 self.fees += fee;
192 self.volume += price * size;
193 }
194
195 match side {
196 "Buy" => self.apply_buy(price, size, in_window),
197 "Sell" => self.apply_sell(price, size, in_window),
198 other => panic!(
199 "invariant violated: invalid side '{}' in competition_fill_events",
200 other
201 ),
202 }
203 }
204
205 fn apply_buy(&mut self, price: Decimal, size: Decimal, in_window: bool) -> Result<()> {
206 let mut remaining = size;
207
208 while remaining > dec!(0) {
209 let Some(front) = self.short_lots.front_mut() else {
210 break;
211 };
212
213 let matched = remaining.min(front.qty);
214 if in_window {
215 self.realized += (front.price - price) * matched;
216 }
217 front.qty -= matched;
218 remaining -= matched;
219
220 if front.qty == dec!(0) {
221 self.short_lots.pop_front();
222 }
223 }
224
225 if remaining > dec!(0) {
226 self.long_lots.push_back(Lot {
227 qty: remaining,
228 price,
229 opened_in_window: in_window,
230 });
231 }
232
233 Ok(())
234 }
235
236 fn apply_sell(&mut self, price: Decimal, size: Decimal, in_window: bool) -> Result<()> {
237 let mut remaining = size;
238
239 while remaining > dec!(0) {
240 let Some(front) = self.long_lots.front_mut() else {
241 break;
242 };
243
244 let matched = remaining.min(front.qty);
245 if in_window {
246 self.realized += (price - front.price) * matched;
247 }
248 front.qty -= matched;
249 remaining -= matched;
250
251 if front.qty == dec!(0) {
252 self.long_lots.pop_front();
253 }
254 }
255
256 if remaining > dec!(0) {
257 self.short_lots.push_back(Lot {
258 qty: remaining,
259 price,
260 opened_in_window: in_window,
261 });
262 }
263
264 Ok(())
265 }
266
267 fn has_in_window_lots(&self) -> bool {
268 self.long_lots.iter().any(|lot| lot.opened_in_window)
269 || self.short_lots.iter().any(|lot| lot.opened_in_window)
270 }
271
272 fn unrealized_with_mark(&self, mark: Decimal) -> Decimal {
273 let mut total = dec!(0);
274 for lot in &self.long_lots {
275 if lot.opened_in_window {
276 total += (mark - lot.price) * lot.qty;
277 }
278 }
279 for lot in &self.short_lots {
280 if lot.opened_in_window {
281 total += (lot.price - mark) * lot.qty;
282 }
283 }
284 total
285 }
286}
287
288#[derive(Clone)]
289pub struct CompetitionService {
290 db: Arc<dyn CompetitionWriter>,
291 greeks_cache: Arc<dyn GreeksCacheReader>,
292}
293
294impl CompetitionService {
295 pub fn new(db: Arc<dyn CompetitionWriter>, greeks_cache: Arc<dyn GreeksCacheReader>) -> Self {
296 Self { db, greeks_cache }
297 }
298
299 pub async fn list_competitions(
300 &self,
301 state: Option<CompetitionState>,
302 from_ts_ms: Option<i64>,
303 to_ts_ms: Option<i64>,
304 limit: usize,
305 offset: usize,
306 now_ts_ms: i64,
307 ) -> Result<Vec<CompetitionRecord>> {
308 let state_str: Option<&str> = state.map(|s| s.as_str());
309 Ok(self
310 .db
311 .list_competitions(
312 state_str,
313 from_ts_ms,
314 to_ts_ms,
315 now_ts_ms,
316 limit as i64,
317 offset as i64,
318 )
319 .await?)
320 }
321
322 pub async fn get_competition(&self, competition_id: i64) -> Result<CompetitionRecord> {
323 self.db
324 .get_competition_by_id(competition_id)
325 .await?
326 .ok_or_else(|| CompetitionError::NotFound(format!("competition {}", competition_id)))
327 }
328
329 pub async fn create_competition(
330 &self,
331 input: CompetitionUpsertInput,
332 ) -> Result<CompetitionRecord> {
333 Self::validate_competition_window(input.start_ts_ms, input.end_ts_ms)?;
334 Self::validate_win_conditions(&input.win_conditions, &input.primary_win_condition)?;
335
336 self.db
337 .create_competition(&input)
338 .await
339 .map_err(|e| match e {
340 CompetitionWriteError::OverlapViolation => CompetitionError::Conflict(
341 "competition window overlaps existing competition".to_string(),
342 ),
343 CompetitionWriteError::Internal(err) => {
344 CompetitionError::Internal(anyhow!("failed to create competition: {}", err))
345 }
346 other => CompetitionError::Internal(anyhow!("{}", other)),
347 })
348 }
349
350 pub async fn update_competition(
351 &self,
352 competition_id: i64,
353 input: CompetitionUpdateInput,
354 now_ts_ms: i64,
355 ) -> Result<CompetitionRecord> {
356 let current = self.get_competition(competition_id).await?;
357 if competition_state(¤t, now_ts_ms) == CompetitionState::Post {
358 return Err(CompetitionError::BadRequest(
359 "post competitions are immutable".to_string(),
360 ));
361 }
362
363 let name = input.name.unwrap_or(current.name);
364 let description = input.description.unwrap_or(current.description);
365 let rules_url = input.rules_url.unwrap_or(current.rules_url);
366 let rules_content = input.rules_content.unwrap_or(current.rules_content);
367 let win_conditions = input.win_conditions.unwrap_or(current.win_conditions);
368 let primary_win_condition = input
369 .primary_win_condition
370 .unwrap_or(current.primary_win_condition);
371 let start_ts_ms = input.start_ts_ms.unwrap_or(current.start_ts_ms);
372 let end_ts_ms = input.end_ts_ms.unwrap_or(current.end_ts_ms);
373 Self::validate_competition_window(start_ts_ms, end_ts_ms)?;
374 Self::validate_win_conditions(&win_conditions, &primary_win_condition)?;
375
376 self.db
377 .update_competition(
378 competition_id,
379 &name,
380 description.as_deref(),
381 rules_url.as_deref(),
382 rules_content.as_deref(),
383 &win_conditions,
384 &primary_win_condition,
385 start_ts_ms,
386 end_ts_ms,
387 )
388 .await
389 .map_err(|e| match e {
390 CompetitionWriteError::OverlapViolation => CompetitionError::Conflict(
391 "competition window overlaps existing competition".to_string(),
392 ),
393 CompetitionWriteError::NotFound(msg) => CompetitionError::NotFound(msg),
394 CompetitionWriteError::Internal(err) => {
395 CompetitionError::Internal(anyhow!("failed to update competition: {}", err))
396 }
397 other => CompetitionError::Internal(anyhow!("{}", other)),
398 })
399 }
400
401 pub async fn delete_competition(&self, competition_id: i64, now_ts_ms: i64) -> Result<()> {
402 let competition = self.get_competition(competition_id).await?;
403 if competition_state(&competition, now_ts_ms) != CompetitionState::Pre {
404 return Err(CompetitionError::BadRequest(
405 "only pre competitions can be deleted".to_string(),
406 ));
407 }
408
409 let result = self.db.delete_competition(competition_id).await?;
410 if result == 0 {
411 return Err(CompetitionError::NotFound(format!(
412 "competition {}",
413 competition_id
414 )));
415 }
416
417 Ok(())
418 }
419
420 pub async fn record_fill(&self, fill: &hypercall_types::Fill) -> Result<()> {
421 let taker_size_human = to_human_readable_decimal(&fill.symbol, fill.size);
422 let maker_side = match fill.taker_side {
423 hypercall_types::Side::Buy => "Sell",
424 hypercall_types::Side::Sell => "Buy",
425 };
426
427 self.db
428 .record_competition_fill(&CompetitionFillInput {
429 trade_id: fill.trade_id as i64,
430 wallet: fill.taker_wallet_address,
431 symbol: fill.symbol.clone(),
432 side: format!("{:?}", fill.taker_side),
433 price: fill.price,
434 size: taker_size_human,
435 fee: fill.fee,
436 timestamp_ms: fill.timestamp as i64,
437 })
438 .await?;
439
440 self.db
441 .record_competition_fill(&CompetitionFillInput {
442 trade_id: fill.trade_id as i64,
443 wallet: fill.maker_wallet_address,
444 symbol: fill.symbol.clone(),
445 side: maker_side.to_string(),
446 price: fill.price,
447 size: taker_size_human,
448 fee: dec!(0),
449 timestamp_ms: fill.timestamp as i64,
450 })
451 .await?;
452
453 Ok(())
454 }
455
456 pub async fn get_leaderboard(
457 &self,
458 query: LeaderboardQuery,
459 now_ts_ms: i64,
460 ) -> Result<LeaderboardResult> {
461 let competition = self.get_competition(query.competition_id).await?;
462 let state = competition_state(&competition, now_ts_ms);
463 let mut used_frozen_rows = false;
464 let cutoff_ts_ms = now_ts_ms.min(competition.end_ts_ms);
465
466 let mut rows = if state == CompetitionState::Pre {
467 Vec::new()
468 } else if state == CompetitionState::Post {
469 let finalized = self.get_finalized_leaderboard(query.competition_id).await?;
470 if finalized.is_empty() {
471 self.compute_live_leaderboard(
472 &competition,
473 query.sort_by,
474 query.sort_order,
475 true,
476 competition.end_ts_ms,
477 )
478 .await?
479 } else {
480 used_frozen_rows = true;
481 finalized
482 }
483 } else {
484 self.compute_live_leaderboard(
485 &competition,
486 query.sort_by,
487 query.sort_order,
488 false,
489 cutoff_ts_ms,
490 )
491 .await?
492 };
493
494 if used_frozen_rows {
495 Self::sort_entries(&mut rows, query.sort_by, query.sort_order);
496 for (idx, row) in rows.iter_mut().enumerate() {
497 row.rank = idx + 1;
498 row.medal = match row.rank {
499 1 => Some(1),
500 2 => Some(2),
501 3 => Some(3),
502 _ => None,
503 };
504 }
505 }
506
507 let connected_user = query
508 .wallet
509 .and_then(|wallet| rows.iter().find(|r| r.wallet == wallet).cloned());
510
511 rows = rows
512 .into_iter()
513 .skip(query.offset)
514 .take(query.limit)
515 .collect();
516
517 Ok(LeaderboardResult {
518 rows,
519 connected_user,
520 })
521 }
522
523 pub async fn get_realized_pnl_by_symbol(
524 &self,
525 wallet: WalletAddress,
526 competition_id: Option<i64>,
527 ) -> Result<Vec<RealizedPnlBySymbol>> {
528 let (window_start, window_end) = if let Some(comp_id) = competition_id {
529 let comp = self.get_competition(comp_id).await?;
530 (Some(comp.start_ts_ms), Some(comp.end_ts_ms))
531 } else {
532 (None, None)
533 };
534
535 let records = self
536 .db
537 .get_realized_pnl_by_symbol(&wallet, window_start, window_end)
538 .await?;
539
540 Ok(records
541 .into_iter()
542 .map(|r| RealizedPnlBySymbol {
543 symbol: r.symbol,
544 realized_pnl: r.realized_pnl,
545 event_count: r.event_count,
546 })
547 .collect())
548 }
549
550 pub async fn build_competition_update_for_wallet(
551 &self,
552 wallet: WalletAddress,
553 now_ts_ms: i64,
554 ) -> Result<Option<WsCompetitionUpdate>> {
555 let active = self.db.get_active_competition(now_ts_ms).await?;
556 let Some(competition) = active else {
557 return Ok(None);
558 };
559 let primary_sort = CompetitionWinCondition::parse(&competition.primary_win_condition)
560 .ok_or_else(|| {
561 anyhow!(
562 "invalid competition primary_win_condition {}",
563 competition.primary_win_condition
564 )
565 })?
566 .sort_by();
567
568 let result = self
569 .get_leaderboard(
570 LeaderboardQuery {
571 competition_id: competition.id,
572 sort_by: primary_sort,
573 sort_order: SortOrder::Desc,
574 limit: 1,
575 offset: 0,
576 wallet: Some(wallet),
577 },
578 now_ts_ms,
579 )
580 .await?;
581
582 let Some(me) = result.connected_user else {
583 return Ok(None);
584 };
585
586 Ok(Some(WsCompetitionUpdate {
587 wallet_address: wallet,
588 competition_id: competition.id,
589 rank: me.rank as i64,
590 pnl: me.pnl,
591 volume: me.volume,
592 efficiency: me.efficiency,
593 timestamp: now_ts_ms,
594 }))
595 }
596
597 pub async fn build_competition_engagement_snapshot_for_wallet(
598 &self,
599 wallet: WalletAddress,
600 now_ts_ms: i64,
601 ) -> Result<Option<CompetitionEngagementSnapshot>> {
602 let Some(competition) = self.db.get_active_competition(now_ts_ms).await? else {
603 return Ok(None);
604 };
605
606 let primary = CompetitionWinCondition::parse(&competition.primary_win_condition)
607 .ok_or_else(|| {
608 anyhow!(
609 "invalid competition primary_win_condition {}",
610 competition.primary_win_condition
611 )
612 })?;
613
614 let cutoff_ts_ms = now_ts_ms.min(competition.end_ts_ms);
615 let rows = self
616 .compute_live_leaderboard(
617 &competition,
618 primary.sort_by(),
619 SortOrder::Desc,
620 false,
621 cutoff_ts_ms,
622 )
623 .await?;
624 let Some((idx, me)) = rows
625 .iter()
626 .enumerate()
627 .find(|(_, row)| row.wallet == wallet)
628 else {
629 return Ok(None);
630 };
631
632 let gap_metric_value = if idx == 0 {
633 None
634 } else {
635 let ahead = &rows[idx - 1];
636 let my_metric = Self::metric_value(me, primary.sort_by());
637 let ahead_metric = Self::metric_value(ahead, primary.sort_by());
638 Some(ahead_metric - my_metric)
639 };
640
641 Ok(Some(CompetitionEngagementSnapshot {
642 wallet,
643 competition_id: competition.id,
644 rank: me.rank as i64,
645 pnl: me.pnl,
646 volume: me.volume,
647 efficiency: me.efficiency,
648 next_rank: if idx == 0 {
649 None
650 } else {
651 Some(rows[idx - 1].rank as i64)
652 },
653 gap_metric_value,
654 }))
655 }
656
657 pub async fn build_competition_pnl_summary_for_wallet(
658 &self,
659 wallet: WalletAddress,
660 now_ts_ms: i64,
661 ) -> Result<WsCompetitionPnlSummary> {
662 let ledger_stats = self
663 .db
664 .compute_ledger_profile_stats(&wallet, now_ts_ms)
665 .await?;
666 let lifetime_realized_pnl = ledger_stats.lifetime_realized_pnl;
667
668 let active_competition = match self.db.get_active_competition(now_ts_ms).await? {
669 Some(competition) => {
670 let primary_sort =
671 CompetitionWinCondition::parse(&competition.primary_win_condition)
672 .ok_or_else(|| {
673 anyhow!(
674 "invalid competition primary_win_condition {}",
675 competition.primary_win_condition
676 )
677 })?
678 .sort_by();
679
680 let result = self
681 .get_leaderboard(
682 LeaderboardQuery {
683 competition_id: competition.id,
684 sort_by: primary_sort,
685 sort_order: SortOrder::Desc,
686 limit: 1,
687 offset: 0,
688 wallet: Some(wallet),
689 },
690 now_ts_ms,
691 )
692 .await?;
693
694 Some(match result.connected_user {
695 Some(me) => WsCompetitionPnlStanding {
696 competition_id: competition.id,
697 competition_name: competition.name.clone(),
698 competition_state: competition_state(&competition, now_ts_ms)
699 .as_str()
700 .to_string(),
701 rank: Some(me.rank),
702 pnl: me.pnl,
703 volume: me.volume,
704 efficiency: me.efficiency,
705 medal: me.medal,
706 },
707 None => WsCompetitionPnlStanding {
708 competition_id: competition.id,
709 competition_name: competition.name.clone(),
710 competition_state: competition_state(&competition, now_ts_ms)
711 .as_str()
712 .to_string(),
713 rank: None,
714 pnl: dec!(0),
715 volume: dec!(0),
716 efficiency: dec!(0),
717 medal: None,
718 },
719 })
720 }
721 None => None,
722 };
723
724 Ok(WsCompetitionPnlSummary {
725 wallet_address: wallet,
726 lifetime_realized_pnl,
727 active_competition,
728 timestamp: now_ts_ms,
729 })
730 }
731
732 pub async fn finalize_ended_competitions(
733 &self,
734 now_ts_ms: i64,
735 ) -> Result<Vec<WsCompetitionFinalStats>> {
736 let candidates = self.db.get_competitions_to_finalize(now_ts_ms).await?;
737
738 let mut notifications = Vec::new();
739
740 for competition in candidates {
741 let rows = self
742 .compute_live_leaderboard(
743 &competition,
744 CompetitionWinCondition::parse(&competition.primary_win_condition)
745 .ok_or_else(|| {
746 anyhow!(
747 "invalid competition primary_win_condition {}",
748 competition.primary_win_condition
749 )
750 })?
751 .sort_by(),
752 SortOrder::Desc,
753 true,
754 competition.end_ts_ms,
755 )
756 .await?;
757
758 if rows.is_empty() {
759 continue;
760 }
761
762 let competition_id = competition.id;
763 let stats: Vec<CompetitionFinalStatsInput> = rows
764 .iter()
765 .map(|row| CompetitionFinalStatsInput {
766 competition_id,
767 wallet: row.wallet,
768 rank: row.rank as i32,
769 pnl: row.pnl,
770 volume: row.volume,
771 efficiency: row.efficiency,
772 medal: row.medal.map(i32::from),
773 })
774 .collect();
775
776 let inserted = self.db.finalize_competition(competition_id, &stats).await?;
777
778 if inserted > 0 {
779 for row in &rows {
780 notifications.push(WsCompetitionFinalStats {
781 wallet_address: row.wallet,
782 competition_id,
783 rank: row.rank as i64,
784 pnl: row.pnl,
785 volume: row.volume,
786 efficiency: row.efficiency,
787 medal: row.medal.map(i64::from),
788 timestamp: now_ts_ms,
789 });
790 }
791 }
792 }
793
794 Ok(notifications)
795 }
796
797 pub async fn get_display_username(&self, wallet: WalletAddress) -> Result<String> {
798 let username = self.db.get_display_username(&wallet).await?;
799 Ok(username.unwrap_or_else(|| truncate_wallet(wallet)))
800 }
801
802 pub async fn get_profile_image_url(&self, wallet: WalletAddress) -> Result<Option<String>> {
803 Ok(self.db.get_profile_image_url(&wallet).await?)
804 }
805
806 pub async fn set_profile_image_url(
807 &self,
808 wallet: WalletAddress,
809 profile_image_url: &str,
810 ) -> Result<Option<String>> {
811 Ok(self
812 .db
813 .set_profile_image_url(&wallet, profile_image_url)
814 .await?)
815 }
816
817 async fn get_display_usernames(
818 &self,
819 wallets: impl IntoIterator<Item = WalletAddress>,
820 ) -> Result<HashMap<WalletAddress, String>> {
821 let unique_wallets: Vec<WalletAddress> = wallets
822 .into_iter()
823 .collect::<BTreeSet<_>>()
824 .into_iter()
825 .collect();
826
827 let mut usernames = HashMap::with_capacity(unique_wallets.len());
828 for wallet in &unique_wallets {
829 usernames.insert(*wallet, truncate_wallet(*wallet));
830 }
831
832 if unique_wallets.is_empty() {
833 return Ok(usernames);
834 }
835
836 let wallet_strings: Vec<String> = unique_wallets
837 .iter()
838 .map(|w| format!("{:#x}", w.0).to_lowercase())
839 .collect();
840
841 let rows: Vec<WalletUsernameRecord> =
842 self.db.get_display_usernames_batch(&wallet_strings).await?;
843
844 for row in rows {
845 if let Ok(addr) = WalletAddress::from_str(&row.wallet_address) {
846 usernames.insert(addr, row.username);
847 }
848 }
849
850 Ok(usernames)
851 }
852
853 pub async fn get_profile_trade_history(
854 &self,
855 wallet: WalletAddress,
856 competition_id: Option<i64>,
857 from_ts_ms: Option<i64>,
858 to_ts_ms: Option<i64>,
859 symbol: Option<&str>,
860 limit: usize,
861 offset: usize,
862 ) -> Result<Vec<crate::Fill>> {
863 let (window_start, window_end) = if let Some(comp_id) = competition_id {
864 let comp = self.get_competition(comp_id).await?;
865 (Some(comp.start_ts_ms), Some(comp.end_ts_ms))
866 } else {
867 (None, None)
868 };
869
870 let rows: Vec<ProfileFillRecord> = self
871 .db
872 .get_profile_trade_history(
873 &wallet,
874 window_start,
875 window_end,
876 from_ts_ms,
877 to_ts_ms,
878 symbol,
879 limit as i64,
880 offset as i64,
881 )
882 .await?;
883
884 rows.into_iter()
885 .map(|r| {
886 let fill_id = r.fill_id.ok_or_else(|| {
887 anyhow!("fill row has NULL fill_id (trade_id={})", r.trade_id)
888 })?;
889 let created_at = r
890 .created_at
891 .ok_or_else(|| anyhow!("fill row has NULL created_at (fill_id={})", fill_id))?;
892 Ok(crate::Fill {
893 fill_id,
894 trade_id: r.trade_id,
895 wallet_address: r.wallet_address,
896 symbol: r.symbol,
897 price: r.price,
898 size: r.size,
899 fee: r.fee,
900 is_taker: r.is_taker,
901 timestamp: r.timestamp,
902 created_at,
903 builder_code_address: r.builder_code_address,
904 builder_code_fee: r.builder_code_fee,
905 realized_pnl: r.realized_pnl,
906 })
907 })
908 .collect()
909 }
910
911 pub async fn compute_ledger_profile_stats(
912 &self,
913 wallet: WalletAddress,
914 now_ts_ms: i64,
915 ) -> Result<(Decimal, Decimal, Decimal, Decimal)> {
916 let stats = self
917 .db
918 .compute_ledger_profile_stats(&wallet, now_ts_ms)
919 .await?;
920 Ok((
921 stats.deposits,
922 stats.withdrawals,
923 stats.lifetime_realized_pnl,
924 stats.pnl_24h,
925 ))
926 }
927
928 pub async fn get_account_first_seen_ts_ms(&self, wallet: WalletAddress) -> Result<Option<i64>> {
929 Ok(self.db.get_account_first_seen_ts_ms(&wallet).await?)
930 }
931
932 pub async fn get_platform_metric_medals(
933 &self,
934 wallet: WalletAddress,
935 ) -> Result<PlatformMetricMedals> {
936 let metrics = self.db.get_platform_wallet_metrics().await?;
937
938 let rows: Vec<(WalletAddress, Decimal, Decimal, Decimal)> = metrics
939 .into_iter()
940 .map(|m| {
941 let efficiency = if m.volume == dec!(0) {
942 dec!(0)
943 } else {
944 m.realized / m.volume
945 };
946 (m.wallet, m.realized, m.volume, efficiency)
947 })
948 .collect();
949
950 let pnl_rows: Vec<(WalletAddress, Decimal, Decimal)> =
951 rows.iter().map(|row| (row.0, row.1, row.2)).collect();
952 let volume_rows: Vec<(WalletAddress, Decimal, Decimal)> =
953 rows.iter().map(|row| (row.0, row.2, row.2)).collect();
954 let efficiency_rows: Vec<(WalletAddress, Decimal, Decimal)> =
955 rows.iter().map(|row| (row.0, row.3, row.2)).collect();
956
957 Ok(PlatformMetricMedals {
958 pnl: Self::wallet_medal_for_metric(&pnl_rows, wallet, false),
959 volume: Self::wallet_medal_for_metric(&volume_rows, wallet, false),
960 efficiency: Self::wallet_medal_for_metric(&efficiency_rows, wallet, true),
961 })
962 }
963
964 pub async fn get_active_competition(
965 &self,
966 now_ts_ms: i64,
967 ) -> Result<Option<CompetitionRecord>> {
968 Ok(self.db.get_active_competition(now_ts_ms).await?)
969 }
970
971 pub async fn get_latest_completed_competition(
972 &self,
973 now_ts_ms: i64,
974 ) -> Result<Option<CompetitionRecord>> {
975 Ok(self.db.get_latest_completed_competition(now_ts_ms).await?)
976 }
977
978 async fn get_finalized_leaderboard(
979 &self,
980 competition_id: i64,
981 ) -> Result<Vec<LeaderboardEntry>> {
982 let stats = self.db.get_finalized_stats(competition_id).await?;
983
984 let usernames = self
985 .get_display_usernames(stats.iter().map(|row| row.wallet))
986 .await?;
987
988 let mut entries = Vec::with_capacity(stats.len());
989 for row in stats {
990 entries.push(LeaderboardEntry {
991 rank: row.rank as usize,
992 wallet: row.wallet,
993 username: usernames
994 .get(&row.wallet)
995 .cloned()
996 .unwrap_or_else(|| truncate_wallet(row.wallet)),
997 pnl: row.pnl,
998 volume: row.volume,
999 efficiency: row.efficiency,
1000 medal: row.medal.and_then(|m| u8::try_from(m).ok()),
1001 });
1002 }
1003
1004 Ok(entries)
1005 }
1006
1007 async fn compute_live_leaderboard(
1008 &self,
1009 competition: &CompetitionRecord,
1010 sort_by: LeaderboardSortBy,
1011 sort_order: SortOrder,
1012 use_historical_theoretical_marks: bool,
1013 cutoff_ts_ms: i64,
1014 ) -> Result<Vec<LeaderboardEntry>> {
1015 let fills = self.db.get_competition_fills_before(cutoff_ts_ms).await?;
1016
1017 if fills.is_empty() {
1018 return Ok(Vec::new());
1019 }
1020
1021 let mut wallet_symbol_lots: HashMap<WalletAddress, HashMap<String, SymbolLots>> =
1022 HashMap::new();
1023
1024 for fill in fills {
1025 let in_window =
1026 fill.timestamp_ms >= competition.start_ts_ms && fill.timestamp_ms < cutoff_ts_ms;
1027 let symbol_map = wallet_symbol_lots.entry(fill.wallet).or_default();
1028 let lots = symbol_map.entry(fill.symbol.clone()).or_default();
1029 lots.apply_fill(&fill.side, fill.price, fill.size, fill.fee, in_window)?;
1030 }
1031
1032 let symbols_needing_marks: Vec<String> = wallet_symbol_lots
1033 .values()
1034 .flat_map(|m| {
1035 m.iter()
1036 .filter(|&(_symbol, lots)| lots.has_in_window_lots())
1037 .map(|(symbol, _lots)| symbol.clone())
1038 })
1039 .collect::<std::collections::BTreeSet<_>>()
1040 .into_iter()
1041 .collect();
1042
1043 let marks = if use_historical_theoretical_marks {
1044 self.get_historical_theoretical_marks(&symbols_needing_marks, cutoff_ts_ms)
1045 .await?
1046 } else {
1047 self.get_live_theoretical_marks(&symbols_needing_marks)
1048 .await?
1049 };
1050
1051 let usernames = self
1052 .get_display_usernames(wallet_symbol_lots.keys().copied())
1053 .await?;
1054
1055 let mut leaderboard: Vec<LeaderboardEntry> = Vec::new();
1056
1057 for (wallet, symbol_lots) in wallet_symbol_lots {
1058 let mut stats = WalletStats {
1059 realized: dec!(0),
1060 unrealized: dec!(0),
1061 fees: dec!(0),
1062 volume: dec!(0),
1063 };
1064
1065 for (symbol, lots) in symbol_lots {
1066 stats.realized += lots.realized;
1067 stats.fees += lots.fees;
1068 stats.volume += lots.volume;
1069
1070 if lots.has_in_window_lots() {
1071 let mark = marks.get(&symbol).ok_or_else(|| {
1072 CompetitionError::Unavailable(format!(
1073 "missing mark for symbol {} while computing leaderboard",
1074 symbol
1075 ))
1076 })?;
1077 stats.unrealized += lots.unrealized_with_mark(*mark);
1078 }
1079 }
1080
1081 if stats.realized == dec!(0)
1082 && stats.unrealized == dec!(0)
1083 && stats.fees == dec!(0)
1084 && stats.volume == dec!(0)
1085 {
1086 continue;
1087 }
1088
1089 leaderboard.push(LeaderboardEntry {
1090 rank: 0,
1091 wallet,
1092 username: usernames
1093 .get(&wallet)
1094 .cloned()
1095 .unwrap_or_else(|| truncate_wallet(wallet)),
1096 pnl: stats.pnl(),
1097 volume: stats.volume,
1098 efficiency: stats.efficiency(),
1099 medal: None,
1100 });
1101 }
1102
1103 Self::sort_entries(&mut leaderboard, sort_by, sort_order);
1104
1105 for (idx, row) in leaderboard.iter_mut().enumerate() {
1106 row.rank = idx + 1;
1107 row.medal = match row.rank {
1108 1 => Some(1),
1109 2 => Some(2),
1110 3 => Some(3),
1111 _ => None,
1112 };
1113 }
1114
1115 Ok(leaderboard)
1116 }
1117
1118 fn sort_entries(
1119 entries: &mut [LeaderboardEntry],
1120 sort_by: LeaderboardSortBy,
1121 sort_order: SortOrder,
1122 ) {
1123 entries.sort_by(|a, b| {
1124 let primary = match sort_by {
1125 LeaderboardSortBy::Pnl => a.pnl.partial_cmp(&b.pnl).unwrap_or(Ordering::Equal),
1126 LeaderboardSortBy::Volume => {
1127 a.volume.partial_cmp(&b.volume).unwrap_or(Ordering::Equal)
1128 }
1129 LeaderboardSortBy::Efficiency => a
1130 .efficiency
1131 .partial_cmp(&b.efficiency)
1132 .unwrap_or(Ordering::Equal),
1133 };
1134
1135 let primary = match sort_order {
1136 SortOrder::Asc => primary,
1137 SortOrder::Desc => primary.reverse(),
1138 };
1139
1140 if primary != Ordering::Equal {
1141 return primary;
1142 }
1143
1144 let secondary = b.volume.partial_cmp(&a.volume).unwrap_or(Ordering::Equal);
1145 if secondary != Ordering::Equal {
1146 return secondary;
1147 }
1148
1149 a.wallet.to_string().cmp(&b.wallet.to_string())
1150 });
1151 }
1152
1153 fn metric_value(entry: &LeaderboardEntry, sort_by: LeaderboardSortBy) -> Decimal {
1154 match sort_by {
1155 LeaderboardSortBy::Pnl => entry.pnl,
1156 LeaderboardSortBy::Volume => entry.volume,
1157 LeaderboardSortBy::Efficiency => entry.efficiency,
1158 }
1159 }
1160
1161 fn wallet_medal_for_metric(
1162 rows: &[(WalletAddress, Decimal, Decimal)],
1163 wallet: WalletAddress,
1164 require_positive_volume: bool,
1165 ) -> Option<u8> {
1166 let mut ordered = rows.to_vec();
1167 ordered.sort_by(|a, b| {
1168 let primary = b.1.partial_cmp(&a.1).unwrap_or(Ordering::Equal);
1169 if primary != Ordering::Equal {
1170 return primary;
1171 }
1172 let volume_tiebreak = b.2.partial_cmp(&a.2).unwrap_or(Ordering::Equal);
1173 if volume_tiebreak != Ordering::Equal {
1174 return volume_tiebreak;
1175 }
1176 a.0.to_string().cmp(&b.0.to_string())
1177 });
1178
1179 let mut eligible_rank = 0_usize;
1180 for row in &ordered {
1181 if require_positive_volume && row.2 == dec!(0) {
1182 continue;
1183 }
1184 eligible_rank += 1;
1185 if row.0 == wallet {
1186 return match eligible_rank {
1187 1 => Some(1),
1188 2 => Some(2),
1189 3 => Some(3),
1190 _ => None,
1191 };
1192 }
1193 }
1194 None
1195 }
1196
1197 async fn get_historical_theoretical_marks(
1198 &self,
1199 symbols: &[String],
1200 cutoff_ts_ms: i64,
1201 ) -> Result<HashMap<String, Decimal>> {
1202 if symbols.is_empty() {
1203 return Ok(HashMap::new());
1204 }
1205
1206 let records = self
1207 .db
1208 .get_historical_theo_marks(symbols, cutoff_ts_ms)
1209 .await?;
1210
1211 let mut map = HashMap::with_capacity(records.len());
1212 for record in records {
1213 map.insert(record.symbol, record.theoretical_price);
1214 }
1215
1216 let missing_symbols: Vec<String> = symbols
1217 .iter()
1218 .filter(|symbol| !map.contains_key(symbol.as_str()))
1219 .cloned()
1220 .collect();
1221
1222 if missing_symbols.is_empty() {
1223 return Ok(map);
1224 }
1225
1226 tracing::warn!(
1227 cutoff_ts_ms,
1228 missing_symbols = ?missing_symbols,
1229 "Falling back to live theoretical marks for symbols missing historical snapshots"
1230 );
1231
1232 let live_fallback_marks = self.get_live_theoretical_marks(&missing_symbols).await?;
1233 map.extend(live_fallback_marks);
1234
1235 Ok(map)
1236 }
1237
1238 async fn get_live_theoretical_marks(
1239 &self,
1240 symbols: &[String],
1241 ) -> Result<HashMap<String, Decimal>> {
1242 let mut map = HashMap::with_capacity(symbols.len());
1243
1244 for symbol in symbols {
1245 let mark_f64 = get_symbol_theoretical_price(self.greeks_cache.as_ref(), symbol)
1246 .await
1247 .map_err(|error| CompetitionError::Unavailable(error.to_string()))?;
1248 let mark = Decimal::from_f64_retain(mark_f64)
1249 .ok_or_else(|| anyhow!("invalid theoretical price decimal for {}", symbol))?;
1250 map.insert(symbol.clone(), mark);
1251 }
1252
1253 Ok(map)
1254 }
1255
1256 fn validate_competition_window(start_ts_ms: i64, end_ts_ms: i64) -> Result<()> {
1257 if start_ts_ms >= end_ts_ms {
1258 return Err(CompetitionError::BadRequest(
1259 "start_ts_ms must be less than end_ts_ms".to_string(),
1260 ));
1261 }
1262 Ok(())
1263 }
1264
1265 fn validate_win_conditions(
1266 win_conditions: &[String],
1267 primary_win_condition: &str,
1268 ) -> Result<()> {
1269 if win_conditions.is_empty() {
1270 return Err(CompetitionError::BadRequest(
1271 "win_conditions must be non-empty".to_string(),
1272 ));
1273 }
1274 if !win_conditions
1275 .iter()
1276 .all(|cond| CompetitionWinCondition::parse(cond).is_some())
1277 {
1278 return Err(CompetitionError::BadRequest(
1279 "win_conditions must contain only pnl|volume|efficiency".to_string(),
1280 ));
1281 }
1282 if CompetitionWinCondition::parse(primary_win_condition).is_none() {
1283 return Err(CompetitionError::BadRequest(
1284 "primary_win_condition must be one of pnl|volume|efficiency".to_string(),
1285 ));
1286 }
1287 if !win_conditions
1288 .iter()
1289 .any(|cond| cond == primary_win_condition)
1290 {
1291 return Err(CompetitionError::BadRequest(
1292 "primary_win_condition must be included in win_conditions".to_string(),
1293 ));
1294 }
1295 Ok(())
1296 }
1297}
1298
1299fn truncate_wallet(wallet: WalletAddress) -> String {
1300 let full = wallet.to_string();
1301 if full.len() <= 10 {
1302 full
1303 } else {
1304 format!("{}...{}", &full[..6], &full[full.len() - 4..])
1305 }
1306}
1307
1308#[cfg(test)]
1309mod tests {
1310 use super::*;
1311 use std::str::FromStr;
1312
1313 fn wallet(index: u8) -> WalletAddress {
1314 WalletAddress::from_str(&format!("0x{:040x}", index)).expect("valid wallet")
1315 }
1316
1317 #[test]
1318 fn competition_state_transitions_follow_bounds() {
1319 assert_eq!(
1320 CompetitionState::from_bounds(2000, 3000, 1999),
1321 CompetitionState::Pre
1322 );
1323 assert_eq!(
1324 CompetitionState::from_bounds(2000, 3000, 2000),
1325 CompetitionState::Active
1326 );
1327 assert_eq!(
1328 CompetitionState::from_bounds(2000, 3000, 2999),
1329 CompetitionState::Active
1330 );
1331 assert_eq!(
1332 CompetitionState::from_bounds(2000, 3000, 3000),
1333 CompetitionState::Post
1334 );
1335 }
1336
1337 #[test]
1338 fn lot_accounting_handles_long_close_and_reversal() {
1339 let mut lots = SymbolLots::default();
1340 lots.apply_fill("Buy", dec!(100), dec!(10), dec!(0), true)
1341 .expect("buy fill should apply");
1342 lots.apply_fill("Sell", dec!(120), dec!(4), dec!(0), true)
1343 .expect("partial close should apply");
1344 assert_eq!(lots.realized, dec!(80));
1345 assert_eq!(lots.unrealized_with_mark(dec!(110)), dec!(60));
1346
1347 lots.apply_fill("Sell", dec!(90), dec!(10), dec!(0), true)
1348 .expect("reversal should apply");
1349 assert_eq!(lots.realized, dec!(20));
1350 assert_eq!(lots.unrealized_with_mark(dec!(80)), dec!(40));
1351 }
1352
1353 #[test]
1354 fn sorting_uses_requested_metric_then_tiebreakers() {
1355 let mut entries = vec![
1356 LeaderboardEntry {
1357 rank: 0,
1358 wallet: wallet(1),
1359 username: "u1".to_string(),
1360 pnl: dec!(10),
1361 volume: dec!(100),
1362 efficiency: dec!(0.1),
1363 medal: None,
1364 },
1365 LeaderboardEntry {
1366 rank: 0,
1367 wallet: wallet(2),
1368 username: "u2".to_string(),
1369 pnl: dec!(10),
1370 volume: dec!(200),
1371 efficiency: dec!(0.05),
1372 medal: None,
1373 },
1374 LeaderboardEntry {
1375 rank: 0,
1376 wallet: wallet(3),
1377 username: "u3".to_string(),
1378 pnl: dec!(15),
1379 volume: dec!(150),
1380 efficiency: dec!(0.1),
1381 medal: None,
1382 },
1383 ];
1384
1385 CompetitionService::sort_entries(&mut entries, LeaderboardSortBy::Pnl, SortOrder::Desc);
1386 assert_eq!(entries[0].wallet, wallet(3));
1387 assert_eq!(entries[1].wallet, wallet(2));
1388 assert_eq!(entries[2].wallet, wallet(1));
1389 }
1390
1391 #[test]
1392 fn win_condition_validation_enforces_membership() {
1393 CompetitionService::validate_win_conditions(&["pnl".to_string()], "pnl")
1394 .expect("valid win condition");
1395 assert!(CompetitionService::validate_win_conditions(&[], "pnl").is_err());
1396 assert!(
1397 CompetitionService::validate_win_conditions(&["pnl".to_string()], "volume").is_err()
1398 );
1399 }
1400}