1use anyhow::{anyhow, Result};
2use arc_swap::ArcSwap;
3use axum::body::Bytes;
4use rust_decimal::Decimal;
5use rust_decimal_macros::dec;
6use serde::Serialize;
7use std::collections::{BTreeMap, HashMap};
8use std::sync::Arc;
9use std::time::{Duration, Instant};
10use tracing::error;
11
12use crate::boundary::market_inputs::GreeksCacheReader;
13use crate::boundary::market_inputs::InstrumentsCacheReader;
14use crate::boundary::market_inputs::MarketStatsCacheReader;
15use crate::models::{Instrument, MarketInfo};
16
17const DEFAULT_REFRESH_INTERVAL_MS: u64 = 1000;
18const MARKETS_SNAPSHOT_ENVELOPE_SCHEMA_VERSION: u32 = 1;
19
20#[derive(Clone)]
21struct MarketsSnapshotData {
22 response_bytes: Bytes,
23 response_bytes_slim: Bytes,
24 response_bytes_slim_upstash: Bytes,
25 built_at: std::time::SystemTime,
26}
27
28pub struct MarketsSnapshotCache {
29 instruments_cache: Arc<dyn InstrumentsCacheReader>,
30 greeks_cache: Arc<dyn GreeksCacheReader>,
31 market_stats_cache: Arc<dyn MarketStatsCacheReader>,
32 snapshot: ArcSwap<MarketsSnapshotData>,
33 refresh_interval: Duration,
34}
35
36impl MarketsSnapshotCache {
37 pub fn new(
38 instruments_cache: Arc<dyn InstrumentsCacheReader>,
39 greeks_cache: Arc<dyn GreeksCacheReader>,
40 market_stats_cache: Arc<dyn MarketStatsCacheReader>,
41 ) -> Self {
42 let failure_payload = Bytes::from_static(br#"{"success":false,"data":[]}"#);
43
44 Self {
45 instruments_cache,
46 greeks_cache,
47 market_stats_cache,
48 snapshot: ArcSwap::from_pointee(MarketsSnapshotData {
49 response_bytes: failure_payload.clone(),
50 response_bytes_slim: failure_payload,
51 response_bytes_slim_upstash: Bytes::from_static(
52 br#"{"schema_version":1,"built_at_ms":0,"payload":{"success":false,"data":[]}}"#,
53 ),
54 built_at: std::time::SystemTime::UNIX_EPOCH,
55 }),
56 refresh_interval: Duration::from_millis(DEFAULT_REFRESH_INTERVAL_MS),
57 }
58 }
59
60 pub fn with_refresh_interval(mut self, refresh_interval: Duration) -> Self {
61 self.refresh_interval = refresh_interval;
62 self
63 }
64
65 pub async fn refresh_once(&self) -> Result<()> {
66 let cache_start = Instant::now();
67 let instruments = self.instruments_cache.get_all().await;
68 let all_stats = self.market_stats_cache.get_all_stats().await;
69 let all_ivs = self.greeks_cache.get_all_iv_snapshot().await;
70 let all_spot_prices = self.greeks_cache.get_all_spot_prices_snapshot().await;
71 let all_prev_day_prices = self.greeks_cache.get_all_prev_day_prices_snapshot().await;
72 metrics::histogram!("ht_markets_cache_snapshot_seconds")
73 .record(cache_start.elapsed().as_secs_f64());
74
75 let build_start = Instant::now();
76 let market_infos = build_market_infos(
77 instruments,
78 &all_stats,
79 &all_ivs,
80 &all_spot_prices,
81 &all_prev_day_prices,
82 )?;
83 metrics::histogram!("ht_markets_group_build_seconds")
84 .record(build_start.elapsed().as_secs_f64());
85
86 let serialize_start = Instant::now();
87 let built_at = std::time::SystemTime::now();
88 let response = MarketsResponseRef {
89 success: true,
90 data: &market_infos,
91 };
92 let response_bytes = Bytes::from(
93 serde_json::to_vec(&response)
94 .map_err(|e| anyhow!("Failed to serialize /markets snapshot: {}", e))?,
95 );
96
97 let slim_data: Vec<MarketInfoSlimRef> =
98 market_infos.iter().map(MarketInfoSlimRef::from).collect();
99 let slim_response = MarketsSlimResponseRef {
100 success: true,
101 data: slim_data,
102 };
103 let response_bytes_slim = Bytes::from(
104 serde_json::to_vec(&slim_response)
105 .map_err(|e| anyhow!("Failed to serialize slim /markets snapshot: {}", e))?,
106 );
107 let response_bytes_slim_upstash = Bytes::from(
108 serde_json::to_vec(&PublishedMarketsSnapshotRef {
109 schema_version: MARKETS_SNAPSHOT_ENVELOPE_SCHEMA_VERSION,
110 built_at_ms: system_time_to_millis(built_at)?,
111 payload: &slim_response,
112 })
113 .map_err(|e| {
114 anyhow!(
115 "Failed to serialize published slim /markets snapshot: {}",
116 e
117 )
118 })?,
119 );
120 metrics::histogram!("ht_markets_serialize_seconds")
121 .record(serialize_start.elapsed().as_secs_f64());
122
123 self.snapshot.store(Arc::new(MarketsSnapshotData {
124 response_bytes,
125 response_bytes_slim,
126 response_bytes_slim_upstash,
127 built_at,
128 }));
129
130 metrics::counter!("ht_markets_snapshot_refresh_total", "status" => "success").increment(1);
131 Ok(())
132 }
133
134 pub async fn initialize(&self) {
135 if let Err(e) = self.refresh_once().await {
136 metrics::counter!("ht_markets_snapshot_refresh_total", "status" => "error")
137 .increment(1);
138 error!(error = %e, "Initial /markets snapshot build failed; serving cold-start payload");
139 }
140 }
141
142 pub fn start_with_shutdown(
143 self: Arc<Self>,
144 mut shutdown_rx: tokio::sync::broadcast::Receiver<()>,
145 ) -> tokio::task::JoinHandle<()> {
146 tokio::spawn(async move {
147 let mut interval = tokio::time::interval(self.refresh_interval);
148 loop {
149 tokio::select! {
150 _ = shutdown_rx.recv() => {
151 break;
152 }
153 _ = interval.tick() => {
154 if let Err(e) = self.refresh_once().await {
155 metrics::counter!("ht_markets_snapshot_refresh_total", "status" => "error").increment(1);
156 error!(error = %e, "Failed to refresh /markets snapshot; keeping last-good");
157 }
158 }
159 }
160 }
161 })
162 }
163
164 pub fn response_bytes(&self) -> Bytes {
165 self.snapshot.load().response_bytes.clone()
166 }
167
168 pub fn response(&self) -> (Bytes, std::time::SystemTime) {
170 let snap = self.snapshot.load();
171 (snap.response_bytes.clone(), snap.built_at)
172 }
173
174 pub fn response_slim(&self) -> (Bytes, std::time::SystemTime) {
176 let snap = self.snapshot.load();
177 (snap.response_bytes_slim.clone(), snap.built_at)
178 }
179
180 pub fn published_response_slim(&self) -> (Bytes, std::time::SystemTime) {
182 let snap = self.snapshot.load();
183 (snap.response_bytes_slim_upstash.clone(), snap.built_at)
184 }
185}
186
187#[derive(Serialize)]
188struct MarketsResponseRef<'a> {
189 success: bool,
190 data: &'a [MarketInfo],
191}
192
193#[derive(Serialize)]
195pub struct MarketInfoSlimRef<'a> {
196 pub underlying: &'a str,
197 pub expiry: u64,
198 pub index_price: &'a Decimal,
199 pub atm_vol: Option<&'a Decimal>,
200 pub total_volume_24h: &'a Decimal,
201 pub total_open_interest: &'a Decimal,
202 #[serde(skip_serializing_if = "Option::is_none")]
203 pub prev_day_px: Option<&'a Decimal>,
204}
205
206impl<'a> From<&'a MarketInfo> for MarketInfoSlimRef<'a> {
207 fn from(m: &'a MarketInfo) -> Self {
208 Self {
209 underlying: &m.underlying,
210 expiry: m.expiry,
211 index_price: &m.index_price,
212 atm_vol: m.atm_vol.as_ref(),
213 total_volume_24h: &m.total_volume_24h,
214 total_open_interest: &m.total_open_interest,
215 prev_day_px: m.prev_day_price.as_ref(),
216 }
217 }
218}
219
220#[derive(Serialize)]
221pub struct MarketsSlimResponseRef<'a> {
222 pub success: bool,
223 pub data: Vec<MarketInfoSlimRef<'a>>,
224}
225
226#[derive(Serialize)]
227pub struct PublishedMarketsSnapshotRef<'a> {
228 pub schema_version: u32,
229 pub built_at_ms: u64,
230 pub payload: &'a MarketsSlimResponseRef<'a>,
231}
232
233fn system_time_to_millis(value: std::time::SystemTime) -> Result<u64> {
234 Ok(value
235 .duration_since(std::time::SystemTime::UNIX_EPOCH)
236 .map_err(|e| anyhow!("Invalid snapshot build time: {}", e))?
237 .as_millis() as u64)
238}
239
240pub fn build_market_infos(
241 instruments: Vec<Instrument>,
242 all_stats: &HashMap<String, (Decimal, Decimal)>,
243 all_ivs: &HashMap<String, f64>,
244 all_spot_prices: &HashMap<String, f64>,
245 all_prev_day_prices: &HashMap<String, f64>,
246) -> Result<Vec<MarketInfo>> {
247 let mut active_instruments: Vec<Instrument> = instruments
248 .into_iter()
249 .filter(|instrument| instrument.status.is_active())
250 .collect();
251
252 active_instruments.sort_by(|a, b| {
253 (
254 a.underlying.as_str(),
255 a.expiry,
256 a.strike,
257 a.option_type.as_str(),
258 a.id.as_str(),
259 )
260 .cmp(&(
261 b.underlying.as_str(),
262 b.expiry,
263 b.strike,
264 b.option_type.as_str(),
265 b.id.as_str(),
266 ))
267 });
268
269 let mut grouped: BTreeMap<(String, u64), Vec<Instrument>> = BTreeMap::new();
270
271 for mut instrument in active_instruments {
272 if let Some((volume, open_interest)) = all_stats.get(&instrument.id) {
273 instrument.volume_24h = *volume;
274 instrument.open_interest = *open_interest;
275 } else {
276 instrument.volume_24h = dec!(0);
277 instrument.open_interest = dec!(0);
278 }
279
280 instrument.mark_iv = all_ivs
281 .get(&instrument.id)
282 .and_then(|iv| Decimal::from_f64_retain(*iv));
283
284 let key = (instrument.underlying.clone(), instrument.expiry);
285 grouped.entry(key).or_default().push(instrument);
286 }
287
288 let mut market_infos = Vec::with_capacity(grouped.len());
289
290 for ((underlying, expiry), market_instruments) in grouped {
291 let spot_price_f64 = *all_spot_prices
292 .get(&underlying)
293 .ok_or_else(|| anyhow!("Missing spot price for {}", underlying))?;
294
295 if !spot_price_f64.is_finite() || spot_price_f64 <= 0.0 {
296 return Err(anyhow!(
297 "Invalid spot price {} for {}",
298 spot_price_f64,
299 underlying
300 ));
301 }
302
303 let spot_price = Decimal::from_f64_retain(spot_price_f64)
304 .ok_or_else(|| anyhow!("Failed to convert spot price for {}", underlying))?;
305
306 let total_volume_24h: Decimal = market_instruments.iter().map(|i| i.volume_24h).sum();
307 let total_open_interest: Decimal = market_instruments.iter().map(|i| i.open_interest).sum();
308
309 let mut atm_vols = Vec::new();
310 for instrument in &market_instruments {
311 let moneyness = instrument.strike / spot_price;
312 if moneyness > dec!(0.95) && moneyness < dec!(1.05) {
313 if let Some(mark_iv) = instrument.mark_iv {
314 atm_vols.push(mark_iv);
315 }
316 }
317 }
318
319 let atm_vol = if atm_vols.is_empty() {
320 None
321 } else {
322 Some(atm_vols.iter().sum::<Decimal>() / Decimal::from(atm_vols.len()))
323 };
324
325 let prev_day_price = all_prev_day_prices
326 .get(&underlying)
327 .and_then(|v| Decimal::from_f64_retain(*v));
328
329 market_infos.push(MarketInfo {
330 underlying,
331 expiry,
332 index_price: spot_price,
333 atm_vol,
334 instruments: market_instruments,
335 total_volume_24h,
336 total_open_interest,
337 prev_day_price,
338 });
339 }
340
341 Ok(market_infos)
342}
343
344#[cfg(test)]
345mod tests {
346 use super::{
347 build_market_infos, system_time_to_millis, MarketInfoSlimRef, MarketsSlimResponseRef,
348 PublishedMarketsSnapshotRef, MARKETS_SNAPSHOT_ENVELOPE_SCHEMA_VERSION,
349 };
350 use crate::models::{Instrument, InstrumentStatus};
351 use chrono::Utc;
352 use hypercall_types::TradingModes;
353 use rust_decimal_macros::dec;
354 use std::collections::HashMap;
355
356 fn instrument(
357 symbol: &str,
358 underlying: &str,
359 strike: rust_decimal::Decimal,
360 expiry: u64,
361 ) -> Instrument {
362 Instrument {
363 instrument_id: 1,
364 id: symbol.to_string(),
365 underlying: underlying.to_string(),
366 strike,
367 expiry,
368 option_type: "call".to_string(),
369 option_token_address: None,
370 mark_iv: None,
371 volume_24h: dec!(0),
372 open_interest: dec!(0),
373 updated_at: Utc::now(),
374 status: InstrumentStatus::Active,
375 trading_mode: TradingModes::ORDERBOOK,
376 }
377 }
378
379 #[test]
380 fn build_market_infos_requires_spot_price() {
381 let instruments = vec![instrument("BTC-1", "BTC", dec!(100000), 1_772_668_800)];
382 let all_stats = HashMap::new();
383 let all_ivs = HashMap::new();
384 let all_spot_prices = HashMap::new();
385
386 let err = build_market_infos(
387 instruments,
388 &all_stats,
389 &all_ivs,
390 &all_spot_prices,
391 &HashMap::new(),
392 )
393 .expect_err("build must fail when spot is missing");
394 assert!(err.to_string().contains("Missing spot price"));
395 }
396
397 #[test]
398 fn build_market_infos_groups_and_aggregates() {
399 let expiry = 1_772_668_800;
400 let instruments = vec![
401 instrument("BTC-1", "BTC", dec!(100000), expiry),
402 instrument("BTC-2", "BTC", dec!(110000), expiry),
403 ];
404
405 let mut all_stats = HashMap::new();
406 all_stats.insert("BTC-1".to_string(), (dec!(10), dec!(2)));
407 all_stats.insert("BTC-2".to_string(), (dec!(5), dec!(3)));
408
409 let mut all_ivs = HashMap::new();
410 all_ivs.insert("BTC-1".to_string(), 0.75);
411 all_ivs.insert("BTC-2".to_string(), 0.80);
412
413 let mut all_spot_prices = HashMap::new();
414 all_spot_prices.insert("BTC".to_string(), 105000.0);
415
416 let result = build_market_infos(
417 instruments,
418 &all_stats,
419 &all_ivs,
420 &all_spot_prices,
421 &HashMap::new(),
422 )
423 .expect("build should succeed");
424
425 assert_eq!(result.len(), 1);
426 assert_eq!(result[0].underlying, "BTC");
427 assert_eq!(result[0].total_volume_24h, dec!(15));
428 assert_eq!(result[0].total_open_interest, dec!(5));
429 assert_eq!(result[0].instruments.len(), 2);
430 }
431
432 #[test]
433 fn slim_response_omits_instruments() {
434 let expiry = 1_772_668_800;
435 let instruments = vec![
436 instrument("BTC-1", "BTC", dec!(100000), expiry),
437 instrument("BTC-2", "BTC", dec!(110000), expiry),
438 ];
439
440 let mut all_stats = HashMap::new();
441 all_stats.insert("BTC-1".to_string(), (dec!(10), dec!(2)));
442 all_stats.insert("BTC-2".to_string(), (dec!(5), dec!(3)));
443
444 let all_ivs = HashMap::new();
445
446 let mut all_spot_prices = HashMap::new();
447 all_spot_prices.insert("BTC".to_string(), 105000.0);
448
449 let market_infos = build_market_infos(
450 instruments,
451 &all_stats,
452 &all_ivs,
453 &all_spot_prices,
454 &HashMap::new(),
455 )
456 .expect("build should succeed");
457
458 let slim: Vec<MarketInfoSlimRef> =
459 market_infos.iter().map(MarketInfoSlimRef::from).collect();
460
461 let json = serde_json::to_value(&slim).expect("serialization must succeed");
462 let market = &json[0];
463 assert_eq!(market["underlying"], "BTC");
464 assert!(
465 market.get("instruments").is_none(),
466 "slim response must not contain instruments"
467 );
468 }
469
470 #[test]
471 fn slim_response_preserves_aggregates() {
472 let expiry = 1_772_668_800;
473 let instruments = vec![
474 instrument("BTC-1", "BTC", dec!(100000), expiry),
475 instrument("BTC-2", "BTC", dec!(110000), expiry),
476 ];
477
478 let mut all_stats = HashMap::new();
479 all_stats.insert("BTC-1".to_string(), (dec!(10), dec!(2)));
480 all_stats.insert("BTC-2".to_string(), (dec!(5), dec!(3)));
481
482 let mut all_ivs = HashMap::new();
483 all_ivs.insert("BTC-1".to_string(), 0.75);
484 all_ivs.insert("BTC-2".to_string(), 0.80);
485
486 let mut all_spot_prices = HashMap::new();
487 all_spot_prices.insert("BTC".to_string(), 105000.0);
488
489 let market_infos = build_market_infos(
490 instruments,
491 &all_stats,
492 &all_ivs,
493 &all_spot_prices,
494 &HashMap::new(),
495 )
496 .expect("build should succeed");
497
498 let slim: Vec<MarketInfoSlimRef> =
499 market_infos.iter().map(MarketInfoSlimRef::from).collect();
500
501 assert_eq!(slim.len(), 1);
502 assert_eq!(slim[0].underlying, "BTC");
503 assert_eq!(*slim[0].total_volume_24h, dec!(15));
504 assert_eq!(*slim[0].total_open_interest, dec!(5));
505 assert_eq!(*slim[0].index_price, market_infos[0].index_price);
506 assert_eq!(slim[0].atm_vol, market_infos[0].atm_vol.as_ref());
507 }
508
509 #[test]
510 fn slim_response_wraps_in_markets_response() {
511 let expiry = 1_772_668_800;
512 let instruments = vec![instrument("BTC-1", "BTC", dec!(100000), expiry)];
513
514 let mut all_stats = HashMap::new();
515 all_stats.insert("BTC-1".to_string(), (dec!(10), dec!(2)));
516 let all_ivs = HashMap::new();
517 let mut all_spot_prices = HashMap::new();
518 all_spot_prices.insert("BTC".to_string(), 105000.0);
519
520 let market_infos = build_market_infos(
521 instruments,
522 &all_stats,
523 &all_ivs,
524 &all_spot_prices,
525 &HashMap::new(),
526 )
527 .expect("build should succeed");
528
529 let slim_data: Vec<MarketInfoSlimRef> =
530 market_infos.iter().map(MarketInfoSlimRef::from).collect();
531 let response = MarketsSlimResponseRef {
532 success: true,
533 data: slim_data,
534 };
535
536 let json = serde_json::to_value(&response).expect("serialization must succeed");
537 assert_eq!(json["success"], true);
538 let data = json["data"].as_array().expect("data must be array");
539 assert_eq!(data.len(), 1);
540 assert!(data[0].get("instruments").is_none());
541 assert!(data[0].get("underlying").is_some());
542 assert!(data[0].get("index_price").is_some());
543 }
544
545 #[test]
546 fn published_slim_response_wraps_payload_with_metadata() {
547 let expiry = 1_772_668_800;
548 let instruments = vec![instrument("BTC-1", "BTC", dec!(100000), expiry)];
549
550 let mut all_stats = HashMap::new();
551 all_stats.insert("BTC-1".to_string(), (dec!(10), dec!(2)));
552 let all_ivs = HashMap::new();
553 let mut all_spot_prices = HashMap::new();
554 all_spot_prices.insert("BTC".to_string(), 105000.0);
555
556 let market_infos = build_market_infos(
557 instruments,
558 &all_stats,
559 &all_ivs,
560 &all_spot_prices,
561 &HashMap::new(),
562 )
563 .expect("build should succeed");
564 let slim_data: Vec<MarketInfoSlimRef> =
565 market_infos.iter().map(MarketInfoSlimRef::from).collect();
566 let slim_response = MarketsSlimResponseRef {
567 success: true,
568 data: slim_data,
569 };
570 let built_at = std::time::SystemTime::UNIX_EPOCH + std::time::Duration::from_secs(123);
571
572 let envelope = PublishedMarketsSnapshotRef {
573 schema_version: MARKETS_SNAPSHOT_ENVELOPE_SCHEMA_VERSION,
574 built_at_ms: system_time_to_millis(built_at).expect("millis conversion should work"),
575 payload: &slim_response,
576 };
577
578 let json = serde_json::to_value(&envelope).expect("serialization must succeed");
579 assert_eq!(json["schema_version"], 1);
580 assert_eq!(json["built_at_ms"], 123000);
581 assert_eq!(json["payload"]["success"], true);
582 assert!(json["payload"]["data"].is_array());
583 assert!(json["payload"]["data"][0].get("instruments").is_none());
584 }
585
586 #[test]
587 fn slim_serialization_is_smaller_than_full() {
588 let expiry = 1_772_668_800;
589 let instruments = vec![
590 instrument("BTC-1", "BTC", dec!(100000), expiry),
591 instrument("BTC-2", "BTC", dec!(110000), expiry),
592 instrument("ETH-1", "ETH", dec!(3000), expiry),
593 ];
594
595 let mut all_stats = HashMap::new();
596 all_stats.insert("BTC-1".to_string(), (dec!(10), dec!(2)));
597 all_stats.insert("BTC-2".to_string(), (dec!(5), dec!(3)));
598 all_stats.insert("ETH-1".to_string(), (dec!(20), dec!(5)));
599
600 let all_ivs = HashMap::new();
601 let mut all_spot_prices = HashMap::new();
602 all_spot_prices.insert("BTC".to_string(), 105000.0);
603 all_spot_prices.insert("ETH".to_string(), 3100.0);
604
605 let market_infos = build_market_infos(
606 instruments,
607 &all_stats,
608 &all_ivs,
609 &all_spot_prices,
610 &HashMap::new(),
611 )
612 .expect("build should succeed");
613
614 let full_bytes = serde_json::to_vec(&market_infos).unwrap();
615
616 let slim: Vec<MarketInfoSlimRef> =
617 market_infos.iter().map(MarketInfoSlimRef::from).collect();
618 let slim_bytes = serde_json::to_vec(&slim).unwrap();
619
620 assert!(
621 slim_bytes.len() < full_bytes.len(),
622 "slim ({} bytes) must be smaller than full ({} bytes)",
623 slim_bytes.len(),
624 full_bytes.len()
625 );
626 }
627}