Skip to main content

hypercall_api/upstash/
sources.rs

1use anyhow::Result;
2use async_trait::async_trait;
3use axum::body::Bytes;
4use std::sync::{Arc, Mutex};
5use std::time::{Duration, Instant, SystemTime};
6
7use super::codec::{compress_snapshot_payload, version_upstash_key};
8use super::publisher::{system_time_to_millis, UpstashSnapshotSource};
9use crate::caches::MarqueeMoversCache;
10
11// ---------------------------------------------------------------------------
12// Helpers
13// ---------------------------------------------------------------------------
14
15fn parse_key_env(env_name: &str, default: &str) -> Result<String> {
16    let raw = match std::env::var(env_name) {
17        Ok(v) if v.trim().is_empty() => {
18            return Err(anyhow::anyhow!(
19                "{env_name} is set but empty; either provide a valid key or unset it"
20            ));
21        }
22        Ok(v) => v,
23        Err(_) => default.to_string(),
24    };
25    version_upstash_key(&raw)
26        .map_err(|e| anyhow::anyhow!("{env_name} must resolve to a non-empty value: {e}"))
27}
28
29fn parse_ttl_env(env_name: &str, default: u64) -> u64 {
30    match std::env::var(env_name).ok().map(|v| v.trim().to_string()) {
31        Some(v) if !v.is_empty() => match v.parse::<u64>() {
32            Ok(n) if n > 0 => n,
33            Ok(_) => {
34                tracing::warn!("{env_name}=0 is invalid, using default {default}");
35                default
36            }
37            Err(e) => {
38                tracing::warn!(
39                    "{env_name}={v:?} is not a valid u64 ({e}), using default {default}"
40                );
41                default
42            }
43        },
44        _ => default,
45    }
46}
47
48// ===========================================================================
49// StandardSnapshotSource — one struct for all sync snapshot caches
50// ===========================================================================
51
52pub struct StandardSnapshotSource {
53    name: &'static str,
54    redis_key: String,
55    ttl_seconds: u64,
56    last_built_at_ms: Mutex<Option<u64>>,
57    get_snapshot: Box<dyn Fn() -> (Bytes, SystemTime) + Send + Sync>,
58}
59
60impl StandardSnapshotSource {
61    pub fn new(
62        name: &'static str,
63        key_env: &str,
64        key_default: &str,
65        ttl_env: &str,
66        ttl_default: u64,
67        get_snapshot: Box<dyn Fn() -> (Bytes, SystemTime) + Send + Sync>,
68    ) -> Result<Self> {
69        Ok(Self {
70            name,
71            redis_key: parse_key_env(key_env, key_default)?,
72            ttl_seconds: parse_ttl_env(ttl_env, ttl_default),
73            last_built_at_ms: Mutex::new(None),
74            get_snapshot,
75        })
76    }
77}
78
79#[async_trait]
80impl UpstashSnapshotSource for StandardSnapshotSource {
81    fn name(&self) -> &'static str {
82        self.name
83    }
84    fn key(&self) -> &str {
85        &self.redis_key
86    }
87    fn ttl_seconds(&self) -> u64 {
88        self.ttl_seconds
89    }
90
91    async fn next_payload(&self) -> Result<Option<(Vec<u8>, u64)>> {
92        let (payload, built_at) = (self.get_snapshot)();
93        let built_at_ms = system_time_to_millis(built_at)?;
94        {
95            let last = self.last_built_at_ms.lock().unwrap();
96            if *last == Some(built_at_ms) {
97                return Ok(None);
98            }
99        }
100        let compressed = compress_snapshot_payload(payload.as_ref())?;
101        Ok(Some((compressed, built_at_ms)))
102    }
103
104    fn on_success(&self, built_at_ms: u64, elapsed: Duration) {
105        {
106            let mut last = self.last_built_at_ms.lock().unwrap();
107            *last = Some(built_at_ms);
108        }
109        let n = self.name;
110        metrics::counter!(format!("ht_{n}_upstash_publish_total"), "status" => "success")
111            .increment(1);
112        metrics::histogram!(format!("ht_{n}_upstash_publish_seconds"))
113            .record(elapsed.as_secs_f64());
114        metrics::gauge!(format!("ht_{n}_upstash_last_success_timestamp"))
115            .set(built_at_ms as f64 / 1000.0);
116    }
117
118    fn on_skip(&self) {
119        let n = self.name;
120        metrics::counter!(format!("ht_{n}_upstash_publish_total"), "status" => "skipped_unchanged")
121            .increment(1);
122    }
123
124    fn on_error(&self) {
125        let n = self.name;
126        metrics::counter!(format!("ht_{n}_upstash_publish_total"), "status" => "error")
127            .increment(1);
128    }
129}
130
131// ===========================================================================
132// MarqueeMoversSource — async refresh + rate limiting
133// ===========================================================================
134
135const MARQUEE_MOVERS_INTERVAL: Duration = Duration::from_secs(15);
136
137pub struct MarqueeMoversSource {
138    cache: Arc<MarqueeMoversCache>,
139    redis_key: String,
140    ttl_seconds: u64,
141    last_built_at_ms: Mutex<Option<u64>>,
142    last_refresh: Mutex<Option<Instant>>,
143}
144
145impl MarqueeMoversSource {
146    pub fn new(cache: Arc<MarqueeMoversCache>) -> Result<Self> {
147        Ok(Self {
148            cache,
149            redis_key: parse_key_env("MARQUEE_MOVERS_UPSTASH_KEY", "marquee:movers:latest")?,
150            ttl_seconds: parse_ttl_env("MARQUEE_MOVERS_UPSTASH_TTL_SECONDS", 60),
151            last_built_at_ms: Mutex::new(None),
152            last_refresh: Mutex::new(None),
153        })
154    }
155}
156
157#[async_trait]
158impl UpstashSnapshotSource for MarqueeMoversSource {
159    fn name(&self) -> &'static str {
160        "marquee_movers"
161    }
162    fn key(&self) -> &str {
163        &self.redis_key
164    }
165    fn ttl_seconds(&self) -> u64 {
166        self.ttl_seconds
167    }
168
169    async fn next_payload(&self) -> Result<Option<(Vec<u8>, u64)>> {
170        {
171            let mut last = self.last_refresh.lock().unwrap();
172            if let Some(t) = *last {
173                if t.elapsed() < MARQUEE_MOVERS_INTERVAL {
174                    return Ok(None);
175                }
176            }
177            *last = Some(Instant::now());
178        }
179
180        self.cache.refresh().await;
181
182        let (payload, built_at) = self.cache.published_response().await;
183        let built_at_ms = system_time_to_millis(built_at)?;
184        {
185            let last = self.last_built_at_ms.lock().unwrap();
186            if *last == Some(built_at_ms) {
187                return Ok(None);
188            }
189        }
190        let compressed = compress_snapshot_payload(payload.as_ref())?;
191        Ok(Some((compressed, built_at_ms)))
192    }
193
194    fn on_success(&self, built_at_ms: u64, elapsed: Duration) {
195        {
196            let mut last = self.last_built_at_ms.lock().unwrap();
197            *last = Some(built_at_ms);
198        }
199        metrics::counter!("ht_marquee_movers_upstash_publish_total", "status" => "success")
200            .increment(1);
201        metrics::histogram!("ht_marquee_movers_upstash_publish_seconds")
202            .record(elapsed.as_secs_f64());
203        metrics::gauge!("ht_marquee_movers_upstash_last_success_timestamp")
204            .set(built_at_ms as f64 / 1000.0);
205    }
206
207    fn on_skip(&self) {
208        metrics::counter!("ht_marquee_movers_upstash_publish_total", "status" => "skipped_unchanged")
209            .increment(1);
210    }
211
212    fn on_error(&self) {
213        metrics::counter!("ht_marquee_movers_upstash_publish_total", "status" => "error")
214            .increment(1);
215    }
216
217    fn min_interval(&self) -> Option<Duration> {
218        Some(MARQUEE_MOVERS_INTERVAL)
219    }
220}