Skip to main content

hypercall_api/caches/
competitions.rs

1use anyhow::{anyhow, Result};
2use arc_swap::ArcSwap;
3use axum::body::Bytes;
4use chrono::Utc;
5use serde::Serialize;
6use std::sync::Arc;
7use std::time::{Duration, Instant};
8use tracing::error;
9
10use crate::models::CompetitionData;
11use hypercall_competition::{competition_state, CompetitionService};
12
13const DEFAULT_REFRESH_INTERVAL_MS: u64 = 10_000;
14const COMPETITIONS_SNAPSHOT_ENVELOPE_SCHEMA_VERSION: u32 = 1;
15const MAX_COMPETITIONS: usize = 200;
16
17#[derive(Clone)]
18struct CompetitionsSnapshotData {
19    response_bytes_upstash: Bytes,
20    built_at: std::time::SystemTime,
21}
22
23pub struct CompetitionsSnapshotCache {
24    competition_service: Arc<CompetitionService>,
25    snapshot: ArcSwap<CompetitionsSnapshotData>,
26    refresh_interval: Duration,
27}
28
29#[derive(Serialize)]
30struct PublishedCompetitionsSnapshot<'a> {
31    schema_version: u32,
32    built_at_ms: u64,
33    payload: &'a CompetitionsSnapshotPayload,
34}
35
36#[derive(Serialize)]
37struct CompetitionsSnapshotPayload {
38    success: bool,
39    data: Vec<CompetitionData>,
40}
41
42impl CompetitionsSnapshotCache {
43    pub fn new(competition_service: Arc<CompetitionService>) -> Self {
44        let empty_payload = Bytes::from_static(
45            br#"{"schema_version":1,"built_at_ms":0,"payload":{"success":false,"data":[]}}"#,
46        );
47
48        Self {
49            competition_service,
50            snapshot: ArcSwap::from_pointee(CompetitionsSnapshotData {
51                response_bytes_upstash: empty_payload,
52                built_at: std::time::SystemTime::UNIX_EPOCH,
53            }),
54            refresh_interval: Duration::from_millis(DEFAULT_REFRESH_INTERVAL_MS),
55        }
56    }
57
58    pub fn with_refresh_interval(mut self, refresh_interval: Duration) -> Self {
59        self.refresh_interval = refresh_interval;
60        self
61    }
62
63    pub async fn refresh_once(&self) -> Result<()> {
64        let start = Instant::now();
65        let now_ts_ms = Utc::now().timestamp_millis();
66
67        let rows = self
68            .competition_service
69            .list_competitions(None, None, None, MAX_COMPETITIONS, 0, now_ts_ms)
70            .await
71            .map_err(|e| anyhow!("Failed to fetch competitions: {}", e))?;
72
73        let data: Vec<CompetitionData> = rows
74            .into_iter()
75            .map(|row| {
76                let state = competition_state(&row, now_ts_ms).as_str().to_string();
77                CompetitionData {
78                    id: row.id,
79                    name: row.name,
80                    description: row.description,
81                    rules_url: row.rules_url,
82                    rules_content: row.rules_content,
83                    win_conditions: row.win_conditions,
84                    primary_win_condition: row.primary_win_condition,
85                    start_ts_ms: row.start_ts_ms,
86                    end_ts_ms: row.end_ts_ms,
87                    state,
88                    created_at: row.created_at,
89                    updated_at: row.updated_at,
90                }
91            })
92            .collect();
93
94        let built_at = std::time::SystemTime::now();
95        let payload = CompetitionsSnapshotPayload {
96            success: true,
97            data,
98        };
99        let envelope = PublishedCompetitionsSnapshot {
100            schema_version: COMPETITIONS_SNAPSHOT_ENVELOPE_SCHEMA_VERSION,
101            built_at_ms: system_time_to_millis(built_at)?,
102            payload: &payload,
103        };
104        let response_bytes_upstash = Bytes::from(
105            serde_json::to_vec(&envelope)
106                .map_err(|e| anyhow!("Failed to serialize competitions snapshot: {}", e))?,
107        );
108
109        self.snapshot.store(Arc::new(CompetitionsSnapshotData {
110            response_bytes_upstash,
111            built_at,
112        }));
113
114        metrics::counter!("ht_competitions_snapshot_refresh_total", "status" => "success")
115            .increment(1);
116        metrics::histogram!("ht_competitions_snapshot_refresh_seconds")
117            .record(start.elapsed().as_secs_f64());
118        Ok(())
119    }
120
121    pub async fn initialize(&self) {
122        if let Err(e) = self.refresh_once().await {
123            metrics::counter!("ht_competitions_snapshot_refresh_total", "status" => "error")
124                .increment(1);
125            error!(error = %e, "Initial competitions snapshot build failed; serving cold-start payload");
126        }
127    }
128
129    pub fn start_with_shutdown(
130        self: Arc<Self>,
131        mut shutdown_rx: tokio::sync::broadcast::Receiver<()>,
132    ) -> tokio::task::JoinHandle<()> {
133        tokio::spawn(async move {
134            let mut interval = tokio::time::interval(self.refresh_interval);
135            loop {
136                tokio::select! {
137                    _ = shutdown_rx.recv() => {
138                        break;
139                    }
140                    _ = interval.tick() => {
141                        if let Err(e) = self.refresh_once().await {
142                            metrics::counter!("ht_competitions_snapshot_refresh_total", "status" => "error").increment(1);
143                            error!(error = %e, "Failed to refresh competitions snapshot; keeping last-good");
144                        }
145                    }
146                }
147            }
148        })
149    }
150
151    /// Returns the pre-serialized Upstash envelope and the build time.
152    pub fn published_response(&self) -> (Bytes, std::time::SystemTime) {
153        let snap = self.snapshot.load();
154        (snap.response_bytes_upstash.clone(), snap.built_at)
155    }
156}
157
158fn system_time_to_millis(value: std::time::SystemTime) -> Result<u64> {
159    Ok(value
160        .duration_since(std::time::SystemTime::UNIX_EPOCH)
161        .map_err(|e| anyhow!("Invalid snapshot build time: {}", e))?
162        .as_millis() as u64)
163}