Skip to main content

hypercall_api/caches/
markets.rs

1use anyhow::{anyhow, Result};
2use arc_swap::ArcSwap;
3use axum::body::Bytes;
4use rust_decimal::Decimal;
5use rust_decimal_macros::dec;
6use serde::Serialize;
7use std::collections::{BTreeMap, HashMap};
8use std::sync::Arc;
9use std::time::{Duration, Instant};
10use tracing::error;
11
12use crate::boundary::market_inputs::GreeksCacheReader;
13use crate::boundary::market_inputs::InstrumentsCacheReader;
14use crate::boundary::market_inputs::MarketStatsCacheReader;
15use crate::models::{Instrument, MarketInfo};
16
/// Default period between background snapshot rebuilds, in milliseconds.
/// Used by `MarketsSnapshotCache::new`; override with `with_refresh_interval`.
const DEFAULT_REFRESH_INTERVAL_MS: u64 = 1000;
/// Value written to the `schema_version` field of the published snapshot envelope.
const MARKETS_SNAPSHOT_ENVELOPE_SCHEMA_VERSION: u32 = 1;
19
/// One immutable build of the markets snapshot, already serialized to JSON bytes.
#[derive(Clone)]
struct MarketsSnapshotData {
    /// Serialized full response: `success` flag plus every market with its instruments.
    response_bytes: Bytes,
    /// Serialized slim response: same markets, but built from `MarketInfoSlimRef`.
    response_bytes_slim: Bytes,
    /// Slim response wrapped in the `schema_version` / `built_at_ms` / `payload` envelope.
    response_bytes_slim_upstash: Bytes,
    /// Wall-clock time this snapshot was built (`UNIX_EPOCH` for the cold-start placeholder).
    built_at: std::time::SystemTime,
}
27
/// Cache of pre-serialized markets responses, rebuilt from the upstream caches on demand
/// or on a fixed interval.
pub struct MarketsSnapshotCache {
    /// Source of the full instrument list (`get_all`).
    instruments_cache: Arc<dyn InstrumentsCacheReader>,
    /// Source of the IV, spot-price and previous-day-price snapshots.
    greeks_cache: Arc<dyn GreeksCacheReader>,
    /// Source of the per-instrument stats snapshot (`get_all_stats`).
    market_stats_cache: Arc<dyn MarketStatsCacheReader>,
    /// Latest snapshot; replaced wholesale via `ArcSwap::store` on each successful refresh.
    snapshot: ArcSwap<MarketsSnapshotData>,
    /// Period used by the background refresh loop.
    refresh_interval: Duration,
}
35
36impl MarketsSnapshotCache {
37    pub fn new(
38        instruments_cache: Arc<dyn InstrumentsCacheReader>,
39        greeks_cache: Arc<dyn GreeksCacheReader>,
40        market_stats_cache: Arc<dyn MarketStatsCacheReader>,
41    ) -> Self {
42        let failure_payload = Bytes::from_static(br#"{"success":false,"data":[]}"#);
43
44        Self {
45            instruments_cache,
46            greeks_cache,
47            market_stats_cache,
48            snapshot: ArcSwap::from_pointee(MarketsSnapshotData {
49                response_bytes: failure_payload.clone(),
50                response_bytes_slim: failure_payload,
51                response_bytes_slim_upstash: Bytes::from_static(
52                    br#"{"schema_version":1,"built_at_ms":0,"payload":{"success":false,"data":[]}}"#,
53                ),
54                built_at: std::time::SystemTime::UNIX_EPOCH,
55            }),
56            refresh_interval: Duration::from_millis(DEFAULT_REFRESH_INTERVAL_MS),
57        }
58    }
59
60    pub fn with_refresh_interval(mut self, refresh_interval: Duration) -> Self {
61        self.refresh_interval = refresh_interval;
62        self
63    }
64
65    pub async fn refresh_once(&self) -> Result<()> {
66        let cache_start = Instant::now();
67        let instruments = self.instruments_cache.get_all().await;
68        let all_stats = self.market_stats_cache.get_all_stats().await;
69        let all_ivs = self.greeks_cache.get_all_iv_snapshot().await;
70        let all_spot_prices = self.greeks_cache.get_all_spot_prices_snapshot().await;
71        let all_prev_day_prices = self.greeks_cache.get_all_prev_day_prices_snapshot().await;
72        metrics::histogram!("ht_markets_cache_snapshot_seconds")
73            .record(cache_start.elapsed().as_secs_f64());
74
75        let build_start = Instant::now();
76        let market_infos = build_market_infos(
77            instruments,
78            &all_stats,
79            &all_ivs,
80            &all_spot_prices,
81            &all_prev_day_prices,
82        )?;
83        metrics::histogram!("ht_markets_group_build_seconds")
84            .record(build_start.elapsed().as_secs_f64());
85
86        let serialize_start = Instant::now();
87        let built_at = std::time::SystemTime::now();
88        let response = MarketsResponseRef {
89            success: true,
90            data: &market_infos,
91        };
92        let response_bytes = Bytes::from(
93            serde_json::to_vec(&response)
94                .map_err(|e| anyhow!("Failed to serialize /markets snapshot: {}", e))?,
95        );
96
97        let slim_data: Vec<MarketInfoSlimRef> =
98            market_infos.iter().map(MarketInfoSlimRef::from).collect();
99        let slim_response = MarketsSlimResponseRef {
100            success: true,
101            data: slim_data,
102        };
103        let response_bytes_slim = Bytes::from(
104            serde_json::to_vec(&slim_response)
105                .map_err(|e| anyhow!("Failed to serialize slim /markets snapshot: {}", e))?,
106        );
107        let response_bytes_slim_upstash = Bytes::from(
108            serde_json::to_vec(&PublishedMarketsSnapshotRef {
109                schema_version: MARKETS_SNAPSHOT_ENVELOPE_SCHEMA_VERSION,
110                built_at_ms: system_time_to_millis(built_at)?,
111                payload: &slim_response,
112            })
113            .map_err(|e| {
114                anyhow!(
115                    "Failed to serialize published slim /markets snapshot: {}",
116                    e
117                )
118            })?,
119        );
120        metrics::histogram!("ht_markets_serialize_seconds")
121            .record(serialize_start.elapsed().as_secs_f64());
122
123        self.snapshot.store(Arc::new(MarketsSnapshotData {
124            response_bytes,
125            response_bytes_slim,
126            response_bytes_slim_upstash,
127            built_at,
128        }));
129
130        metrics::counter!("ht_markets_snapshot_refresh_total", "status" => "success").increment(1);
131        Ok(())
132    }
133
134    pub async fn initialize(&self) {
135        if let Err(e) = self.refresh_once().await {
136            metrics::counter!("ht_markets_snapshot_refresh_total", "status" => "error")
137                .increment(1);
138            error!(error = %e, "Initial /markets snapshot build failed; serving cold-start payload");
139        }
140    }
141
142    pub fn start_with_shutdown(
143        self: Arc<Self>,
144        mut shutdown_rx: tokio::sync::broadcast::Receiver<()>,
145    ) -> tokio::task::JoinHandle<()> {
146        tokio::spawn(async move {
147            let mut interval = tokio::time::interval(self.refresh_interval);
148            loop {
149                tokio::select! {
150                    _ = shutdown_rx.recv() => {
151                        break;
152                    }
153                    _ = interval.tick() => {
154                        if let Err(e) = self.refresh_once().await {
155                            metrics::counter!("ht_markets_snapshot_refresh_total", "status" => "error").increment(1);
156                            error!(error = %e, "Failed to refresh /markets snapshot; keeping last-good");
157                        }
158                    }
159                }
160            }
161        })
162    }
163
164    pub fn response_bytes(&self) -> Bytes {
165        self.snapshot.load().response_bytes.clone()
166    }
167
168    /// Returns pre-serialized response bytes and the time the snapshot was built.
169    pub fn response(&self) -> (Bytes, std::time::SystemTime) {
170        let snap = self.snapshot.load();
171        (snap.response_bytes.clone(), snap.built_at)
172    }
173
174    /// Returns pre-serialized slim response bytes (without instruments) and the build time.
175    pub fn response_slim(&self) -> (Bytes, std::time::SystemTime) {
176        let snap = self.snapshot.load();
177        (snap.response_bytes_slim.clone(), snap.built_at)
178    }
179
180    /// Returns the pre-serialized slim snapshot payload wrapped with build metadata for Upstash.
181    pub fn published_response_slim(&self) -> (Bytes, std::time::SystemTime) {
182        let snap = self.snapshot.load();
183        (snap.response_bytes_slim_upstash.clone(), snap.built_at)
184    }
185}
186
/// Borrowed envelope for the full markets response: a `success` flag plus the market list.
#[derive(Serialize)]
struct MarketsResponseRef<'a> {
    /// Set to `true` for snapshots produced by `refresh_once`.
    success: bool,
    /// Markets to serialize, borrowed so the built list is not cloned.
    data: &'a [MarketInfo],
}
192
/// Slim, borrowed view of a `MarketInfo` for serialization; omits the instruments array.
#[derive(Serialize)]
pub struct MarketInfoSlimRef<'a> {
    /// Underlying symbol of the market.
    pub underlying: &'a str,
    /// Expiry of the market, copied from `MarketInfo::expiry`.
    pub expiry: u64,
    /// Borrowed from `MarketInfo::index_price`.
    pub index_price: &'a Decimal,
    /// Borrowed from `MarketInfo::atm_vol`; serialized as `null` when absent.
    pub atm_vol: Option<&'a Decimal>,
    /// Borrowed from `MarketInfo::total_volume_24h`.
    pub total_volume_24h: &'a Decimal,
    /// Borrowed from `MarketInfo::total_open_interest`.
    pub total_open_interest: &'a Decimal,
    /// Borrowed from `MarketInfo::prev_day_price`; the key is left out of the JSON when absent.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub prev_day_px: Option<&'a Decimal>,
}
205
206impl<'a> From<&'a MarketInfo> for MarketInfoSlimRef<'a> {
207    fn from(m: &'a MarketInfo) -> Self {
208        Self {
209            underlying: &m.underlying,
210            expiry: m.expiry,
211            index_price: &m.index_price,
212            atm_vol: m.atm_vol.as_ref(),
213            total_volume_24h: &m.total_volume_24h,
214            total_open_interest: &m.total_open_interest,
215            prev_day_px: m.prev_day_price.as_ref(),
216        }
217    }
218}
219
/// Envelope for the slim markets response: a `success` flag plus the slim market views.
#[derive(Serialize)]
pub struct MarketsSlimResponseRef<'a> {
    /// Whether the payload carries a real snapshot.
    pub success: bool,
    /// One slim view per market.
    pub data: Vec<MarketInfoSlimRef<'a>>,
}
225
/// Published envelope that wraps a slim markets response with build metadata.
#[derive(Serialize)]
pub struct PublishedMarketsSnapshotRef<'a> {
    /// Version of this envelope's layout.
    pub schema_version: u32,
    /// Snapshot build time in milliseconds since the Unix epoch.
    pub built_at_ms: u64,
    /// The slim response being published.
    pub payload: &'a MarketsSlimResponseRef<'a>,
}
232
233fn system_time_to_millis(value: std::time::SystemTime) -> Result<u64> {
234    Ok(value
235        .duration_since(std::time::SystemTime::UNIX_EPOCH)
236        .map_err(|e| anyhow!("Invalid snapshot build time: {}", e))?
237        .as_millis() as u64)
238}
239
240pub fn build_market_infos(
241    instruments: Vec<Instrument>,
242    all_stats: &HashMap<String, (Decimal, Decimal)>,
243    all_ivs: &HashMap<String, f64>,
244    all_spot_prices: &HashMap<String, f64>,
245    all_prev_day_prices: &HashMap<String, f64>,
246) -> Result<Vec<MarketInfo>> {
247    let mut active_instruments: Vec<Instrument> = instruments
248        .into_iter()
249        .filter(|instrument| instrument.status.is_active())
250        .collect();
251
252    active_instruments.sort_by(|a, b| {
253        (
254            a.underlying.as_str(),
255            a.expiry,
256            a.strike,
257            a.option_type.as_str(),
258            a.id.as_str(),
259        )
260            .cmp(&(
261                b.underlying.as_str(),
262                b.expiry,
263                b.strike,
264                b.option_type.as_str(),
265                b.id.as_str(),
266            ))
267    });
268
269    let mut grouped: BTreeMap<(String, u64), Vec<Instrument>> = BTreeMap::new();
270
271    for mut instrument in active_instruments {
272        if let Some((volume, open_interest)) = all_stats.get(&instrument.id) {
273            instrument.volume_24h = *volume;
274            instrument.open_interest = *open_interest;
275        } else {
276            instrument.volume_24h = dec!(0);
277            instrument.open_interest = dec!(0);
278        }
279
280        instrument.mark_iv = all_ivs
281            .get(&instrument.id)
282            .and_then(|iv| Decimal::from_f64_retain(*iv));
283
284        let key = (instrument.underlying.clone(), instrument.expiry);
285        grouped.entry(key).or_default().push(instrument);
286    }
287
288    let mut market_infos = Vec::with_capacity(grouped.len());
289
290    for ((underlying, expiry), market_instruments) in grouped {
291        let spot_price_f64 = *all_spot_prices
292            .get(&underlying)
293            .ok_or_else(|| anyhow!("Missing spot price for {}", underlying))?;
294
295        if !spot_price_f64.is_finite() || spot_price_f64 <= 0.0 {
296            return Err(anyhow!(
297                "Invalid spot price {} for {}",
298                spot_price_f64,
299                underlying
300            ));
301        }
302
303        let spot_price = Decimal::from_f64_retain(spot_price_f64)
304            .ok_or_else(|| anyhow!("Failed to convert spot price for {}", underlying))?;
305
306        let total_volume_24h: Decimal = market_instruments.iter().map(|i| i.volume_24h).sum();
307        let total_open_interest: Decimal = market_instruments.iter().map(|i| i.open_interest).sum();
308
309        let mut atm_vols = Vec::new();
310        for instrument in &market_instruments {
311            let moneyness = instrument.strike / spot_price;
312            if moneyness > dec!(0.95) && moneyness < dec!(1.05) {
313                if let Some(mark_iv) = instrument.mark_iv {
314                    atm_vols.push(mark_iv);
315                }
316            }
317        }
318
319        let atm_vol = if atm_vols.is_empty() {
320            None
321        } else {
322            Some(atm_vols.iter().sum::<Decimal>() / Decimal::from(atm_vols.len()))
323        };
324
325        let prev_day_price = all_prev_day_prices
326            .get(&underlying)
327            .and_then(|v| Decimal::from_f64_retain(*v));
328
329        market_infos.push(MarketInfo {
330            underlying,
331            expiry,
332            index_price: spot_price,
333            atm_vol,
334            instruments: market_instruments,
335            total_volume_24h,
336            total_open_interest,
337            prev_day_price,
338        });
339    }
340
341    Ok(market_infos)
342}
343
#[cfg(test)]
mod tests {
    use super::{
        build_market_infos, system_time_to_millis, MarketInfoSlimRef, MarketsSlimResponseRef,
        PublishedMarketsSnapshotRef, MARKETS_SNAPSHOT_ENVELOPE_SCHEMA_VERSION,
    };
    use crate::models::{Instrument, InstrumentStatus, MarketInfo};
    use chrono::Utc;
    use hypercall_types::TradingModes;
    use rust_decimal::Decimal;
    use rust_decimal_macros::dec;
    use std::collections::HashMap;

    /// Expiry shared by every fixture instrument.
    const EXPIRY: u64 = 1_772_668_800;

    /// Builds an active call instrument with zeroed stats and no mark IV.
    fn instrument(symbol: &str, underlying: &str, strike: Decimal, expiry: u64) -> Instrument {
        Instrument {
            instrument_id: 1,
            id: symbol.to_string(),
            underlying: underlying.to_string(),
            strike,
            expiry,
            option_type: "call".to_string(),
            option_token_address: None,
            mark_iv: None,
            volume_24h: dec!(0),
            open_interest: dec!(0),
            updated_at: Utc::now(),
            status: InstrumentStatus::Active,
            trading_mode: TradingModes::ORDERBOOK,
        }
    }

    /// Builds a stats map from `(instrument id, volume, open interest)` rows.
    fn stats_of(rows: &[(&str, Decimal, Decimal)]) -> HashMap<String, (Decimal, Decimal)> {
        rows.iter()
            .map(|&(id, volume, open_interest)| (id.to_string(), (volume, open_interest)))
            .collect()
    }

    /// Builds a string-keyed float map from `(key, value)` rows.
    fn floats_of(rows: &[(&str, f64)]) -> HashMap<String, f64> {
        rows.iter()
            .map(|&(key, value)| (key.to_string(), value))
            .collect()
    }

    /// The single BTC instrument used by the one-market fixtures.
    fn one_btc_instrument() -> Vec<Instrument> {
        vec![instrument("BTC-1", "BTC", dec!(100000), EXPIRY)]
    }

    /// The two BTC instruments used by the aggregate fixtures.
    fn two_btc_instruments() -> Vec<Instrument> {
        vec![
            instrument("BTC-1", "BTC", dec!(100000), EXPIRY),
            instrument("BTC-2", "BTC", dec!(110000), EXPIRY),
        ]
    }

    /// Stats matching `one_btc_instrument`.
    fn one_btc_stats() -> HashMap<String, (Decimal, Decimal)> {
        stats_of(&[("BTC-1", dec!(10), dec!(2))])
    }

    /// Stats matching `two_btc_instruments`.
    fn two_btc_stats() -> HashMap<String, (Decimal, Decimal)> {
        stats_of(&[("BTC-1", dec!(10), dec!(2)), ("BTC-2", dec!(5), dec!(3))])
    }

    /// Mark IVs matching `two_btc_instruments`.
    fn two_btc_ivs() -> HashMap<String, f64> {
        floats_of(&[("BTC-1", 0.75), ("BTC-2", 0.80)])
    }

    /// Spot map containing only BTC.
    fn btc_spot() -> HashMap<String, f64> {
        floats_of(&[("BTC", 105000.0)])
    }

    /// Runs `build_market_infos` with no previous-day prices and expects success.
    fn build_ok(
        instruments: Vec<Instrument>,
        all_stats: &HashMap<String, (Decimal, Decimal)>,
        all_ivs: &HashMap<String, f64>,
        all_spot_prices: &HashMap<String, f64>,
    ) -> Vec<MarketInfo> {
        build_market_infos(
            instruments,
            all_stats,
            all_ivs,
            all_spot_prices,
            &HashMap::new(),
        )
        .expect("build should succeed")
    }

    /// Maps every market to its slim view.
    fn slim_of(market_infos: &[MarketInfo]) -> Vec<MarketInfoSlimRef<'_>> {
        market_infos.iter().map(MarketInfoSlimRef::from).collect()
    }

    #[test]
    fn build_market_infos_requires_spot_price() {
        let no_stats = HashMap::new();
        let no_ivs = HashMap::new();
        let no_spots = HashMap::new();

        let err = build_market_infos(
            one_btc_instrument(),
            &no_stats,
            &no_ivs,
            &no_spots,
            &HashMap::new(),
        )
        .expect_err("build must fail when spot is missing");
        assert!(err.to_string().contains("Missing spot price"));
    }

    #[test]
    fn build_market_infos_groups_and_aggregates() {
        let result = build_ok(
            two_btc_instruments(),
            &two_btc_stats(),
            &two_btc_ivs(),
            &btc_spot(),
        );

        assert_eq!(result.len(), 1);
        assert_eq!(result[0].underlying, "BTC");
        assert_eq!(result[0].total_volume_24h, dec!(15));
        assert_eq!(result[0].total_open_interest, dec!(5));
        assert_eq!(result[0].instruments.len(), 2);
    }

    #[test]
    fn slim_response_omits_instruments() {
        let market_infos = build_ok(
            two_btc_instruments(),
            &two_btc_stats(),
            &HashMap::new(),
            &btc_spot(),
        );
        let slim = slim_of(&market_infos);

        let json = serde_json::to_value(&slim).expect("serialization must succeed");
        let market = &json[0];
        assert_eq!(market["underlying"], "BTC");
        assert!(
            market.get("instruments").is_none(),
            "slim response must not contain instruments"
        );
    }

    #[test]
    fn slim_response_preserves_aggregates() {
        let market_infos = build_ok(
            two_btc_instruments(),
            &two_btc_stats(),
            &two_btc_ivs(),
            &btc_spot(),
        );
        let slim = slim_of(&market_infos);

        assert_eq!(slim.len(), 1);
        assert_eq!(slim[0].underlying, "BTC");
        assert_eq!(*slim[0].total_volume_24h, dec!(15));
        assert_eq!(*slim[0].total_open_interest, dec!(5));
        assert_eq!(*slim[0].index_price, market_infos[0].index_price);
        assert_eq!(slim[0].atm_vol, market_infos[0].atm_vol.as_ref());
    }

    #[test]
    fn slim_response_wraps_in_markets_response() {
        let market_infos = build_ok(
            one_btc_instrument(),
            &one_btc_stats(),
            &HashMap::new(),
            &btc_spot(),
        );
        let response = MarketsSlimResponseRef {
            success: true,
            data: slim_of(&market_infos),
        };

        let json = serde_json::to_value(&response).expect("serialization must succeed");
        assert_eq!(json["success"], true);
        let data = json["data"].as_array().expect("data must be array");
        assert_eq!(data.len(), 1);
        assert!(data[0].get("instruments").is_none());
        assert!(data[0].get("underlying").is_some());
        assert!(data[0].get("index_price").is_some());
    }

    #[test]
    fn published_slim_response_wraps_payload_with_metadata() {
        let market_infos = build_ok(
            one_btc_instrument(),
            &one_btc_stats(),
            &HashMap::new(),
            &btc_spot(),
        );
        let slim_response = MarketsSlimResponseRef {
            success: true,
            data: slim_of(&market_infos),
        };
        let built_at = std::time::SystemTime::UNIX_EPOCH + std::time::Duration::from_secs(123);

        let envelope = PublishedMarketsSnapshotRef {
            schema_version: MARKETS_SNAPSHOT_ENVELOPE_SCHEMA_VERSION,
            built_at_ms: system_time_to_millis(built_at).expect("millis conversion should work"),
            payload: &slim_response,
        };

        let json = serde_json::to_value(&envelope).expect("serialization must succeed");
        assert_eq!(json["schema_version"], 1);
        assert_eq!(json["built_at_ms"], 123000);
        assert_eq!(json["payload"]["success"], true);
        assert!(json["payload"]["data"].is_array());
        assert!(json["payload"]["data"][0].get("instruments").is_none());
    }

    #[test]
    fn slim_serialization_is_smaller_than_full() {
        let mut instruments = two_btc_instruments();
        instruments.push(instrument("ETH-1", "ETH", dec!(3000), EXPIRY));

        let all_stats = stats_of(&[
            ("BTC-1", dec!(10), dec!(2)),
            ("BTC-2", dec!(5), dec!(3)),
            ("ETH-1", dec!(20), dec!(5)),
        ]);
        let all_spot_prices = floats_of(&[("BTC", 105000.0), ("ETH", 3100.0)]);

        let market_infos = build_ok(instruments, &all_stats, &HashMap::new(), &all_spot_prices);

        let full_bytes = serde_json::to_vec(&market_infos).unwrap();
        let slim_bytes = serde_json::to_vec(&slim_of(&market_infos)).unwrap();

        assert!(
            slim_bytes.len() < full_bytes.len(),
            "slim ({} bytes) must be smaller than full ({} bytes)",
            slim_bytes.len(),
            full_bytes.len()
        );
    }
}