hypercall_api/upstash/
sources.rs1use anyhow::Result;
2use async_trait::async_trait;
3use axum::body::Bytes;
4use std::sync::{Arc, Mutex};
5use std::time::{Duration, Instant, SystemTime};
6
7use super::codec::{compress_snapshot_payload, version_upstash_key};
8use super::publisher::{system_time_to_millis, UpstashSnapshotSource};
9use crate::caches::MarqueeMoversCache;
10
11fn parse_key_env(env_name: &str, default: &str) -> Result<String> {
16 let raw = match std::env::var(env_name) {
17 Ok(v) if v.trim().is_empty() => {
18 return Err(anyhow::anyhow!(
19 "{env_name} is set but empty; either provide a valid key or unset it"
20 ));
21 }
22 Ok(v) => v,
23 Err(_) => default.to_string(),
24 };
25 version_upstash_key(&raw)
26 .map_err(|e| anyhow::anyhow!("{env_name} must resolve to a non-empty value: {e}"))
27}
28
29fn parse_ttl_env(env_name: &str, default: u64) -> u64 {
30 match std::env::var(env_name).ok().map(|v| v.trim().to_string()) {
31 Some(v) if !v.is_empty() => match v.parse::<u64>() {
32 Ok(n) if n > 0 => n,
33 Ok(_) => {
34 tracing::warn!("{env_name}=0 is invalid, using default {default}");
35 default
36 }
37 Err(e) => {
38 tracing::warn!(
39 "{env_name}={v:?} is not a valid u64 ({e}), using default {default}"
40 );
41 default
42 }
43 },
44 _ => default,
45 }
46}
47
48pub struct StandardSnapshotSource {
53 name: &'static str,
54 redis_key: String,
55 ttl_seconds: u64,
56 last_built_at_ms: Mutex<Option<u64>>,
57 get_snapshot: Box<dyn Fn() -> (Bytes, SystemTime) + Send + Sync>,
58}
59
60impl StandardSnapshotSource {
61 pub fn new(
62 name: &'static str,
63 key_env: &str,
64 key_default: &str,
65 ttl_env: &str,
66 ttl_default: u64,
67 get_snapshot: Box<dyn Fn() -> (Bytes, SystemTime) + Send + Sync>,
68 ) -> Result<Self> {
69 Ok(Self {
70 name,
71 redis_key: parse_key_env(key_env, key_default)?,
72 ttl_seconds: parse_ttl_env(ttl_env, ttl_default),
73 last_built_at_ms: Mutex::new(None),
74 get_snapshot,
75 })
76 }
77}
78
79#[async_trait]
80impl UpstashSnapshotSource for StandardSnapshotSource {
81 fn name(&self) -> &'static str {
82 self.name
83 }
84 fn key(&self) -> &str {
85 &self.redis_key
86 }
87 fn ttl_seconds(&self) -> u64 {
88 self.ttl_seconds
89 }
90
91 async fn next_payload(&self) -> Result<Option<(Vec<u8>, u64)>> {
92 let (payload, built_at) = (self.get_snapshot)();
93 let built_at_ms = system_time_to_millis(built_at)?;
94 {
95 let last = self.last_built_at_ms.lock().unwrap();
96 if *last == Some(built_at_ms) {
97 return Ok(None);
98 }
99 }
100 let compressed = compress_snapshot_payload(payload.as_ref())?;
101 Ok(Some((compressed, built_at_ms)))
102 }
103
104 fn on_success(&self, built_at_ms: u64, elapsed: Duration) {
105 {
106 let mut last = self.last_built_at_ms.lock().unwrap();
107 *last = Some(built_at_ms);
108 }
109 let n = self.name;
110 metrics::counter!(format!("ht_{n}_upstash_publish_total"), "status" => "success")
111 .increment(1);
112 metrics::histogram!(format!("ht_{n}_upstash_publish_seconds"))
113 .record(elapsed.as_secs_f64());
114 metrics::gauge!(format!("ht_{n}_upstash_last_success_timestamp"))
115 .set(built_at_ms as f64 / 1000.0);
116 }
117
118 fn on_skip(&self) {
119 let n = self.name;
120 metrics::counter!(format!("ht_{n}_upstash_publish_total"), "status" => "skipped_unchanged")
121 .increment(1);
122 }
123
124 fn on_error(&self) {
125 let n = self.name;
126 metrics::counter!(format!("ht_{n}_upstash_publish_total"), "status" => "error")
127 .increment(1);
128 }
129}
130
131const MARQUEE_MOVERS_INTERVAL: Duration = Duration::from_secs(15);
136
137pub struct MarqueeMoversSource {
138 cache: Arc<MarqueeMoversCache>,
139 redis_key: String,
140 ttl_seconds: u64,
141 last_built_at_ms: Mutex<Option<u64>>,
142 last_refresh: Mutex<Option<Instant>>,
143}
144
145impl MarqueeMoversSource {
146 pub fn new(cache: Arc<MarqueeMoversCache>) -> Result<Self> {
147 Ok(Self {
148 cache,
149 redis_key: parse_key_env("MARQUEE_MOVERS_UPSTASH_KEY", "marquee:movers:latest")?,
150 ttl_seconds: parse_ttl_env("MARQUEE_MOVERS_UPSTASH_TTL_SECONDS", 60),
151 last_built_at_ms: Mutex::new(None),
152 last_refresh: Mutex::new(None),
153 })
154 }
155}
156
157#[async_trait]
158impl UpstashSnapshotSource for MarqueeMoversSource {
159 fn name(&self) -> &'static str {
160 "marquee_movers"
161 }
162 fn key(&self) -> &str {
163 &self.redis_key
164 }
165 fn ttl_seconds(&self) -> u64 {
166 self.ttl_seconds
167 }
168
169 async fn next_payload(&self) -> Result<Option<(Vec<u8>, u64)>> {
170 {
171 let mut last = self.last_refresh.lock().unwrap();
172 if let Some(t) = *last {
173 if t.elapsed() < MARQUEE_MOVERS_INTERVAL {
174 return Ok(None);
175 }
176 }
177 *last = Some(Instant::now());
178 }
179
180 self.cache.refresh().await;
181
182 let (payload, built_at) = self.cache.published_response().await;
183 let built_at_ms = system_time_to_millis(built_at)?;
184 {
185 let last = self.last_built_at_ms.lock().unwrap();
186 if *last == Some(built_at_ms) {
187 return Ok(None);
188 }
189 }
190 let compressed = compress_snapshot_payload(payload.as_ref())?;
191 Ok(Some((compressed, built_at_ms)))
192 }
193
194 fn on_success(&self, built_at_ms: u64, elapsed: Duration) {
195 {
196 let mut last = self.last_built_at_ms.lock().unwrap();
197 *last = Some(built_at_ms);
198 }
199 metrics::counter!("ht_marquee_movers_upstash_publish_total", "status" => "success")
200 .increment(1);
201 metrics::histogram!("ht_marquee_movers_upstash_publish_seconds")
202 .record(elapsed.as_secs_f64());
203 metrics::gauge!("ht_marquee_movers_upstash_last_success_timestamp")
204 .set(built_at_ms as f64 / 1000.0);
205 }
206
207 fn on_skip(&self) {
208 metrics::counter!("ht_marquee_movers_upstash_publish_total", "status" => "skipped_unchanged")
209 .increment(1);
210 }
211
212 fn on_error(&self) {
213 metrics::counter!("ht_marquee_movers_upstash_publish_total", "status" => "error")
214 .increment(1);
215 }
216
217 fn min_interval(&self) -> Option<Duration> {
218 Some(MARQUEE_MOVERS_INTERVAL)
219 }
220}