hypercall/runtime/tasks/
index_price_publisher.rs1use std::sync::Arc;
2use std::time::Duration;
3
4use chrono::Utc;
5use rust_decimal::prelude::FromPrimitive;
6use rust_decimal::Decimal;
7use tokio::sync::{broadcast, Notify};
8use tracing::debug;
9
10use crate::read_cache::greeks::GreeksCache;
11use hypercall_types::ws_protocol::{IndexPriceEntry, WsIndexPriceUpdate};
12
13const DEBOUNCE: Duration = Duration::from_millis(50);
16
17pub trait IndexPriceUpdatePublisher: Send + Sync {
23 fn publish_index_prices(&self, update: WsIndexPriceUpdate);
24}
25
26pub struct IndexPricePublisher {
27 greeks_cache: Arc<GreeksCache>,
28 pubsub: Arc<dyn IndexPriceUpdatePublisher>,
29 price_notify: Arc<Notify>,
30}
31
32impl IndexPricePublisher {
33 pub fn new(
34 greeks_cache: Arc<GreeksCache>,
35 pubsub: Arc<dyn IndexPriceUpdatePublisher>,
36 price_notify: Arc<Notify>,
37 ) -> Self {
38 Self {
39 greeks_cache,
40 pubsub,
41 price_notify,
42 }
43 }
44
45 pub async fn run_with_shutdown(self, mut shutdown_rx: broadcast::Receiver<()>) {
46 loop {
47 tokio::select! {
48 _ = shutdown_rx.recv() => {
49 debug!("IndexPricePublisher received shutdown signal");
50 break;
51 }
52 _ = self.price_notify.notified() => {
53 tokio::time::sleep(DEBOUNCE).await;
55 self.publish_once().await;
56 }
57 }
58 }
59 }
60}
61
62#[async_trait::async_trait]
63impl crate::shared::service::Service for IndexPricePublisher {
64 fn name(&self) -> &'static str {
65 "IndexPricePublisher"
66 }
67
68 fn owner(&self) -> crate::shared::service::ServiceOwner {
69 crate::shared::service::ServiceOwner::Api
70 }
71
72 async fn run(self: Arc<Self>, mut shutdown: crate::shared::ShutdownRx) -> anyhow::Result<()> {
73 loop {
74 tokio::select! {
75 _ = shutdown.recv() => {
76 debug!("IndexPricePublisher received shutdown signal");
77 break;
78 }
79 _ = self.price_notify.notified() => {
80 tokio::time::sleep(DEBOUNCE).await;
81 self.publish_once().await;
82 }
83 }
84 }
85 Ok(())
86 }
87}
88
89impl IndexPricePublisher {
90 async fn publish_once(&self) {
91 let spot_prices = self.greeks_cache.get_all_spot_prices_snapshot().await;
92 if spot_prices.is_empty() {
93 return;
94 }
95
96 let mut prices: Vec<IndexPriceEntry> = spot_prices
97 .into_iter()
98 .filter_map(|(underlying, price)| {
99 Decimal::from_f64(price).map(|p| IndexPriceEntry {
100 underlying,
101 price: p,
102 })
103 })
104 .collect();
105 prices.sort_by(|a, b| a.underlying.cmp(&b.underlying));
106
107 let update = WsIndexPriceUpdate {
108 prices,
109 timestamp: Utc::now().timestamp_millis(),
110 };
111
112 self.pubsub.publish_index_prices(update);
113 }
114}