hypercall_api/caches/
competitions.rs1use anyhow::{anyhow, Result};
2use arc_swap::ArcSwap;
3use axum::body::Bytes;
4use chrono::Utc;
5use serde::Serialize;
6use std::sync::Arc;
7use std::time::{Duration, Instant};
8use tracing::error;
9
10use crate::models::CompetitionData;
11use hypercall_competition::{competition_state, CompetitionService};
12
/// Refresh period, in milliseconds, used by `CompetitionsSnapshotCache::new` until it is
/// overridden through `with_refresh_interval`.
const DEFAULT_REFRESH_INTERVAL_MS: u64 = 10_000;

/// Value written to the `schema_version` field of every published snapshot envelope.
const COMPETITIONS_SNAPSHOT_ENVELOPE_SCHEMA_VERSION: u32 = 1;

/// Count passed to `CompetitionService::list_competitions` on each refresh.
// NOTE(review): presumably the maximum number of rows returned — confirm against the service.
const MAX_COMPETITIONS: usize = 200;
16
17#[derive(Clone)]
18struct CompetitionsSnapshotData {
19 response_bytes_upstash: Bytes,
20 built_at: std::time::SystemTime,
21}
22
23pub struct CompetitionsSnapshotCache {
24 competition_service: Arc<CompetitionService>,
25 snapshot: ArcSwap<CompetitionsSnapshotData>,
26 refresh_interval: Duration,
27}
28
29#[derive(Serialize)]
30struct PublishedCompetitionsSnapshot<'a> {
31 schema_version: u32,
32 built_at_ms: u64,
33 payload: &'a CompetitionsSnapshotPayload,
34}
35
36#[derive(Serialize)]
37struct CompetitionsSnapshotPayload {
38 success: bool,
39 data: Vec<CompetitionData>,
40}
41
42impl CompetitionsSnapshotCache {
43 pub fn new(competition_service: Arc<CompetitionService>) -> Self {
44 let empty_payload = Bytes::from_static(
45 br#"{"schema_version":1,"built_at_ms":0,"payload":{"success":false,"data":[]}}"#,
46 );
47
48 Self {
49 competition_service,
50 snapshot: ArcSwap::from_pointee(CompetitionsSnapshotData {
51 response_bytes_upstash: empty_payload,
52 built_at: std::time::SystemTime::UNIX_EPOCH,
53 }),
54 refresh_interval: Duration::from_millis(DEFAULT_REFRESH_INTERVAL_MS),
55 }
56 }
57
58 pub fn with_refresh_interval(mut self, refresh_interval: Duration) -> Self {
59 self.refresh_interval = refresh_interval;
60 self
61 }
62
63 pub async fn refresh_once(&self) -> Result<()> {
64 let start = Instant::now();
65 let now_ts_ms = Utc::now().timestamp_millis();
66
67 let rows = self
68 .competition_service
69 .list_competitions(None, None, None, MAX_COMPETITIONS, 0, now_ts_ms)
70 .await
71 .map_err(|e| anyhow!("Failed to fetch competitions: {}", e))?;
72
73 let data: Vec<CompetitionData> = rows
74 .into_iter()
75 .map(|row| {
76 let state = competition_state(&row, now_ts_ms).as_str().to_string();
77 CompetitionData {
78 id: row.id,
79 name: row.name,
80 description: row.description,
81 rules_url: row.rules_url,
82 rules_content: row.rules_content,
83 win_conditions: row.win_conditions,
84 primary_win_condition: row.primary_win_condition,
85 start_ts_ms: row.start_ts_ms,
86 end_ts_ms: row.end_ts_ms,
87 state,
88 created_at: row.created_at,
89 updated_at: row.updated_at,
90 }
91 })
92 .collect();
93
94 let built_at = std::time::SystemTime::now();
95 let payload = CompetitionsSnapshotPayload {
96 success: true,
97 data,
98 };
99 let envelope = PublishedCompetitionsSnapshot {
100 schema_version: COMPETITIONS_SNAPSHOT_ENVELOPE_SCHEMA_VERSION,
101 built_at_ms: system_time_to_millis(built_at)?,
102 payload: &payload,
103 };
104 let response_bytes_upstash = Bytes::from(
105 serde_json::to_vec(&envelope)
106 .map_err(|e| anyhow!("Failed to serialize competitions snapshot: {}", e))?,
107 );
108
109 self.snapshot.store(Arc::new(CompetitionsSnapshotData {
110 response_bytes_upstash,
111 built_at,
112 }));
113
114 metrics::counter!("ht_competitions_snapshot_refresh_total", "status" => "success")
115 .increment(1);
116 metrics::histogram!("ht_competitions_snapshot_refresh_seconds")
117 .record(start.elapsed().as_secs_f64());
118 Ok(())
119 }
120
121 pub async fn initialize(&self) {
122 if let Err(e) = self.refresh_once().await {
123 metrics::counter!("ht_competitions_snapshot_refresh_total", "status" => "error")
124 .increment(1);
125 error!(error = %e, "Initial competitions snapshot build failed; serving cold-start payload");
126 }
127 }
128
129 pub fn start_with_shutdown(
130 self: Arc<Self>,
131 mut shutdown_rx: tokio::sync::broadcast::Receiver<()>,
132 ) -> tokio::task::JoinHandle<()> {
133 tokio::spawn(async move {
134 let mut interval = tokio::time::interval(self.refresh_interval);
135 loop {
136 tokio::select! {
137 _ = shutdown_rx.recv() => {
138 break;
139 }
140 _ = interval.tick() => {
141 if let Err(e) = self.refresh_once().await {
142 metrics::counter!("ht_competitions_snapshot_refresh_total", "status" => "error").increment(1);
143 error!(error = %e, "Failed to refresh competitions snapshot; keeping last-good");
144 }
145 }
146 }
147 }
148 })
149 }
150
151 pub fn published_response(&self) -> (Bytes, std::time::SystemTime) {
153 let snap = self.snapshot.load();
154 (snap.response_bytes_upstash.clone(), snap.built_at)
155 }
156}
157
158fn system_time_to_millis(value: std::time::SystemTime) -> Result<u64> {
159 Ok(value
160 .duration_since(std::time::SystemTime::UNIX_EPOCH)
161 .map_err(|e| anyhow!("Invalid snapshot build time: {}", e))?
162 .as_millis() as u64)
163}