hypercall_api/upstash/
mod.rs1mod codec;
2mod publisher;
3mod sources;
4
5pub use codec::{compress_snapshot_payload, version_upstash_key};
6pub use publisher::{system_time_to_millis, UpstashBatchPublisher, UpstashSnapshotSource};
7pub use sources::{MarqueeMoversSource, StandardSnapshotSource};
8
9use anyhow::Result;
10use std::sync::Arc;
11
12use crate::caches::{
13 CompetitionsSnapshotCache, InstrumentsSnapshotCache, MarketsSnapshotCache, MarqueeMoversCache,
14 OptionsSummarySnapshotCache, SparklinesSnapshotCache,
15};
16use hypercall_db::AnalyticsReader;
17
18pub struct UpstashInitResult {
19 pub redis_client: Option<redis::Client>,
20 pub publisher: Option<Arc<UpstashBatchPublisher>>,
21}
22
23pub async fn init(
24 markets_cache: &Arc<MarketsSnapshotCache>,
25 competitions_cache: &Arc<CompetitionsSnapshotCache>,
26 sparklines_cache: &Arc<SparklinesSnapshotCache>,
27 instruments_cache: &Arc<InstrumentsSnapshotCache>,
28 options_summary_cache: &Arc<OptionsSummarySnapshotCache>,
29 diesel_db: Arc<dyn AnalyticsReader>,
30) -> Result<UpstashInitResult> {
31 let env_configured = |name: &str| -> bool {
32 std::env::var(name)
33 .ok()
34 .map(|v| !v.trim().is_empty())
35 .unwrap_or(false)
36 };
37
38 let endpoint_configured = env_configured("MARKETS_SNAPSHOT_UPSTASH_ENDPOINT");
39 let port_configured = env_configured("MARKETS_SNAPSHOT_UPSTASH_PORT");
40 let password_configured = env_configured("MARKETS_SNAPSHOT_UPSTASH_PASSWORD");
41 let key_configured = env_configured("MARKETS_SNAPSHOT_UPSTASH_KEY");
42
43 metrics::counter!("ht_markets_upstash_init_total", "status" => "attempt").increment(1);
44 tracing::info!(
45 endpoint_configured,
46 port_configured,
47 password_configured,
48 key_configured,
49 "Evaluating Upstash publisher configuration"
50 );
51
52 let batch_publisher = match UpstashBatchPublisher::from_env() {
53 Ok(Some(bp)) => {
54 metrics::counter!("ht_markets_upstash_init_total", "status" => "initialized")
55 .increment(1);
56 bp
57 }
58 Ok(None) => {
59 metrics::counter!("ht_markets_upstash_init_total", "status" => "disabled").increment(1);
60 tracing::info!(
61 endpoint_configured,
62 port_configured,
63 password_configured,
64 key_configured,
65 "Upstash publishers disabled (env not configured)"
66 );
67 return Ok(UpstashInitResult {
68 redis_client: None,
69 publisher: None,
70 });
71 }
72 Err(error) => {
73 metrics::counter!("ht_markets_upstash_init_total", "status" => "error").increment(1);
74 tracing::error!(
75 endpoint_configured,
76 port_configured,
77 password_configured,
78 key_configured,
79 error = %error,
80 "Failed to initialize Upstash publishers"
81 );
82 return Err(error);
83 }
84 };
85
86 let redis_client = Some(batch_publisher.client());
87
88 let marquee_movers_cache = Arc::new(MarqueeMoversCache::new(
89 options_summary_cache.clone(),
90 diesel_db,
91 ));
92
93 let sources: Vec<Arc<dyn UpstashSnapshotSource>> = vec![
94 Arc::new(StandardSnapshotSource::new(
95 "markets",
96 "MARKETS_SNAPSHOT_UPSTASH_KEY",
97 "markets:snapshot:slim:latest",
98 "MARKETS_SNAPSHOT_UPSTASH_TTL_SECONDS",
99 60,
100 {
101 let c = markets_cache.clone();
102 Box::new(move || c.published_response_slim())
103 },
104 )?),
105 Arc::new(StandardSnapshotSource::new(
106 "competitions",
107 "COMPETITIONS_UPSTASH_KEY",
108 "competitions:snapshot:latest",
109 "COMPETITIONS_UPSTASH_TTL_SECONDS",
110 60,
111 {
112 let c = competitions_cache.clone();
113 Box::new(move || c.published_response())
114 },
115 )?),
116 Arc::new(StandardSnapshotSource::new(
117 "sparklines",
118 "SPARKLINES_UPSTASH_KEY",
119 "sparklines:2h:latest",
120 "SPARKLINES_UPSTASH_TTL_SECONDS",
121 300,
122 {
123 let c = sparklines_cache.clone();
124 Box::new(move || c.published_response())
125 },
126 )?),
127 Arc::new(StandardSnapshotSource::new(
128 "instruments",
129 "INSTRUMENTS_UPSTASH_KEY",
130 "instruments:snapshot:latest",
131 "INSTRUMENTS_UPSTASH_TTL_SECONDS",
132 300,
133 {
134 let c = instruments_cache.clone();
135 Box::new(move || c.published_response())
136 },
137 )?),
138 Arc::new(StandardSnapshotSource::new(
139 "options_summary",
140 "OPTIONS_SUMMARY_UPSTASH_KEY",
141 "options-summary:snapshot:latest",
142 "OPTIONS_SUMMARY_UPSTASH_TTL_SECONDS",
143 60,
144 {
145 let c = options_summary_cache.clone();
146 Box::new(move || c.published_response())
147 },
148 )?),
149 Arc::new(MarqueeMoversSource::new(marquee_movers_cache)?),
150 ];
151
152 let publisher = Arc::new(batch_publisher.with_sources(sources));
153
154 tracing::info!(
155 endpoint_configured,
156 port_configured,
157 password_configured,
158 key_configured,
159 "✓ UpstashBatchPublisher initialized (6 sources, 1s tick)"
160 );
161
162 Ok(UpstashInitResult {
163 redis_client,
164 publisher: Some(publisher),
165 })
166}