Skip to main content

hypercall/runtime/tasks/
index_price_publisher.rs

1use std::sync::Arc;
2use std::time::Duration;
3
4use chrono::Utc;
5use rust_decimal::prelude::FromPrimitive;
6use rust_decimal::Decimal;
7use tokio::sync::{broadcast, Notify};
8use tracing::debug;
9
10use crate::read_cache::greeks::GreeksCache;
11use hypercall_types::ws_protocol::{IndexPriceEntry, WsIndexPriceUpdate};
12
/// How long the publish loop waits after a price notification before it
/// publishes.
///
/// The pause lets several oracle updates that land within a few milliseconds
/// of each other (for example BTC and ETH) go out as one batched update.
const DEBOUNCE: Duration = Duration::from_millis(50);
16
17/// Publishes a fully materialized index-price websocket update.
18///
19/// Called inline from the runtime debounce loop, so implementations should stay
20/// cheap and non-blocking. The call is fire-and-forget from the task's point of
21/// view, and implementors must be shareable across runtime tasks.
22pub trait IndexPriceUpdatePublisher: Send + Sync {
23    fn publish_index_prices(&self, update: WsIndexPriceUpdate);
24}
25
26pub struct IndexPricePublisher {
27    greeks_cache: Arc<GreeksCache>,
28    pubsub: Arc<dyn IndexPriceUpdatePublisher>,
29    price_notify: Arc<Notify>,
30}
31
32impl IndexPricePublisher {
33    pub fn new(
34        greeks_cache: Arc<GreeksCache>,
35        pubsub: Arc<dyn IndexPriceUpdatePublisher>,
36        price_notify: Arc<Notify>,
37    ) -> Self {
38        Self {
39            greeks_cache,
40            pubsub,
41            price_notify,
42        }
43    }
44
45    pub async fn run_with_shutdown(self, mut shutdown_rx: broadcast::Receiver<()>) {
46        loop {
47            tokio::select! {
48                _ = shutdown_rx.recv() => {
49                    debug!("IndexPricePublisher received shutdown signal");
50                    break;
51                }
52                _ = self.price_notify.notified() => {
53                    // Debounce: wait briefly to batch concurrent oracle updates
54                    tokio::time::sleep(DEBOUNCE).await;
55                    self.publish_once().await;
56                }
57            }
58        }
59    }
60}
61
62#[async_trait::async_trait]
63impl crate::shared::service::Service for IndexPricePublisher {
64    fn name(&self) -> &'static str {
65        "IndexPricePublisher"
66    }
67
68    fn owner(&self) -> crate::shared::service::ServiceOwner {
69        crate::shared::service::ServiceOwner::Api
70    }
71
72    async fn run(self: Arc<Self>, mut shutdown: crate::shared::ShutdownRx) -> anyhow::Result<()> {
73        loop {
74            tokio::select! {
75                _ = shutdown.recv() => {
76                    debug!("IndexPricePublisher received shutdown signal");
77                    break;
78                }
79                _ = self.price_notify.notified() => {
80                    tokio::time::sleep(DEBOUNCE).await;
81                    self.publish_once().await;
82                }
83            }
84        }
85        Ok(())
86    }
87}
88
89impl IndexPricePublisher {
90    async fn publish_once(&self) {
91        let spot_prices = self.greeks_cache.get_all_spot_prices_snapshot().await;
92        if spot_prices.is_empty() {
93            return;
94        }
95
96        let mut prices: Vec<IndexPriceEntry> = spot_prices
97            .into_iter()
98            .filter_map(|(underlying, price)| {
99                Decimal::from_f64(price).map(|p| IndexPriceEntry {
100                    underlying,
101                    price: p,
102                })
103            })
104            .collect();
105        prices.sort_by(|a, b| a.underlying.cmp(&b.underlying));
106
107        let update = WsIndexPriceUpdate {
108            prices,
109            timestamp: Utc::now().timestamp_millis(),
110        };
111
112        self.pubsub.publish_index_prices(update);
113    }
114}