Skip to main content

hypercall_api/upstash/
mod.rs

1mod codec;
2mod publisher;
3mod sources;
4
5pub use codec::{compress_snapshot_payload, version_upstash_key};
6pub use publisher::{system_time_to_millis, UpstashBatchPublisher, UpstashSnapshotSource};
7pub use sources::{MarqueeMoversSource, StandardSnapshotSource};
8
9use anyhow::Result;
10use std::sync::Arc;
11
12use crate::caches::{
13    CompetitionsSnapshotCache, InstrumentsSnapshotCache, MarketsSnapshotCache, MarqueeMoversCache,
14    OptionsSummarySnapshotCache, SparklinesSnapshotCache,
15};
16use hypercall_db::AnalyticsReader;
17
/// Outcome of [`init`]: the handles a caller gets back after Upstash setup.
///
/// `init` returns both fields as `None` when `UpstashBatchPublisher::from_env()`
/// yields `Ok(None)` (publishing disabled), and both as `Some` when the
/// publisher was built successfully.
pub struct UpstashInitResult {
    /// Client obtained from the batch publisher via `client()`; `None` when
    /// publishing is disabled.
    pub redis_client: Option<redis::Client>,
    /// Shared batch publisher with its snapshot sources attached; `None` when
    /// publishing is disabled.
    pub publisher: Option<Arc<UpstashBatchPublisher>>,
}
22
23pub async fn init(
24    markets_cache: &Arc<MarketsSnapshotCache>,
25    competitions_cache: &Arc<CompetitionsSnapshotCache>,
26    sparklines_cache: &Arc<SparklinesSnapshotCache>,
27    instruments_cache: &Arc<InstrumentsSnapshotCache>,
28    options_summary_cache: &Arc<OptionsSummarySnapshotCache>,
29    diesel_db: Arc<dyn AnalyticsReader>,
30) -> Result<UpstashInitResult> {
31    let env_configured = |name: &str| -> bool {
32        std::env::var(name)
33            .ok()
34            .map(|v| !v.trim().is_empty())
35            .unwrap_or(false)
36    };
37
38    let endpoint_configured = env_configured("MARKETS_SNAPSHOT_UPSTASH_ENDPOINT");
39    let port_configured = env_configured("MARKETS_SNAPSHOT_UPSTASH_PORT");
40    let password_configured = env_configured("MARKETS_SNAPSHOT_UPSTASH_PASSWORD");
41    let key_configured = env_configured("MARKETS_SNAPSHOT_UPSTASH_KEY");
42
43    metrics::counter!("ht_markets_upstash_init_total", "status" => "attempt").increment(1);
44    tracing::info!(
45        endpoint_configured,
46        port_configured,
47        password_configured,
48        key_configured,
49        "Evaluating Upstash publisher configuration"
50    );
51
52    let batch_publisher = match UpstashBatchPublisher::from_env() {
53        Ok(Some(bp)) => {
54            metrics::counter!("ht_markets_upstash_init_total", "status" => "initialized")
55                .increment(1);
56            bp
57        }
58        Ok(None) => {
59            metrics::counter!("ht_markets_upstash_init_total", "status" => "disabled").increment(1);
60            tracing::info!(
61                endpoint_configured,
62                port_configured,
63                password_configured,
64                key_configured,
65                "Upstash publishers disabled (env not configured)"
66            );
67            return Ok(UpstashInitResult {
68                redis_client: None,
69                publisher: None,
70            });
71        }
72        Err(error) => {
73            metrics::counter!("ht_markets_upstash_init_total", "status" => "error").increment(1);
74            tracing::error!(
75                endpoint_configured,
76                port_configured,
77                password_configured,
78                key_configured,
79                error = %error,
80                "Failed to initialize Upstash publishers"
81            );
82            return Err(error);
83        }
84    };
85
86    let redis_client = Some(batch_publisher.client());
87
88    let marquee_movers_cache = Arc::new(MarqueeMoversCache::new(
89        options_summary_cache.clone(),
90        diesel_db,
91    ));
92
93    let sources: Vec<Arc<dyn UpstashSnapshotSource>> = vec![
94        Arc::new(StandardSnapshotSource::new(
95            "markets",
96            "MARKETS_SNAPSHOT_UPSTASH_KEY",
97            "markets:snapshot:slim:latest",
98            "MARKETS_SNAPSHOT_UPSTASH_TTL_SECONDS",
99            60,
100            {
101                let c = markets_cache.clone();
102                Box::new(move || c.published_response_slim())
103            },
104        )?),
105        Arc::new(StandardSnapshotSource::new(
106            "competitions",
107            "COMPETITIONS_UPSTASH_KEY",
108            "competitions:snapshot:latest",
109            "COMPETITIONS_UPSTASH_TTL_SECONDS",
110            60,
111            {
112                let c = competitions_cache.clone();
113                Box::new(move || c.published_response())
114            },
115        )?),
116        Arc::new(StandardSnapshotSource::new(
117            "sparklines",
118            "SPARKLINES_UPSTASH_KEY",
119            "sparklines:2h:latest",
120            "SPARKLINES_UPSTASH_TTL_SECONDS",
121            300,
122            {
123                let c = sparklines_cache.clone();
124                Box::new(move || c.published_response())
125            },
126        )?),
127        Arc::new(StandardSnapshotSource::new(
128            "instruments",
129            "INSTRUMENTS_UPSTASH_KEY",
130            "instruments:snapshot:latest",
131            "INSTRUMENTS_UPSTASH_TTL_SECONDS",
132            300,
133            {
134                let c = instruments_cache.clone();
135                Box::new(move || c.published_response())
136            },
137        )?),
138        Arc::new(StandardSnapshotSource::new(
139            "options_summary",
140            "OPTIONS_SUMMARY_UPSTASH_KEY",
141            "options-summary:snapshot:latest",
142            "OPTIONS_SUMMARY_UPSTASH_TTL_SECONDS",
143            60,
144            {
145                let c = options_summary_cache.clone();
146                Box::new(move || c.published_response())
147            },
148        )?),
149        Arc::new(MarqueeMoversSource::new(marquee_movers_cache)?),
150    ];
151
152    let publisher = Arc::new(batch_publisher.with_sources(sources));
153
154    tracing::info!(
155        endpoint_configured,
156        port_configured,
157        password_configured,
158        key_configured,
159        "✓ UpstashBatchPublisher initialized (6 sources, 1s tick)"
160    );
161
162    Ok(UpstashInitResult {
163        redis_client,
164        publisher: Some(publisher),
165    })
166}