Skip to main content

hypercall/read_cache/
market_stats.rs

1//! Market statistics cache for 24h volume and open interest.
2
3use anyhow::Result;
4use hypercall_types::{to_human_readable_decimal, utils::is_option_symbol};
5use rust_decimal::Decimal;
6use rust_decimal_macros::dec;
7use std::collections::HashMap;
8use std::sync::Arc;
9use tokio::sync::RwLock;
10use tracing::{debug, info, warn};
11
12use crate::messaging::EventBusTrait;
13use crate::portfolio::PortfolioService;
14use crate::shared::topics::TOPIC_FILLS;
15use hypercall_types::EngineMessage;
16
/// Number of hourly buckets for 24h rolling volume
const HOURLY_BUCKETS: usize = 24;
/// Timestamps at or above this value are treated as Unix milliseconds and
/// divided by 1000; smaller values are treated as already being in seconds.
const UNIX_MS_THRESHOLD: u64 = 10_000_000_000;
/// Length of the backfill window in milliseconds (24 * 60 * 60 * 1000).
const DAY_MS: u128 = 86_400_000;
21
22fn fill_notional_volume(
23    symbol: &str,
24    size: Decimal,
25    price: Decimal,
26    underlying_notional: Option<Decimal>,
27) -> Option<Decimal> {
28    if is_option_symbol(symbol) {
29        return underlying_notional;
30    }
31
32    Some(to_human_readable_decimal(symbol, size).abs() * price)
33}
34
35fn market_stats_cutoff_timestamp() -> i64 {
36    std::time::SystemTime::now()
37        .duration_since(std::time::UNIX_EPOCH)
38        .map(|d| d.as_millis().saturating_sub(DAY_MS) as i64)
39        .unwrap_or(0)
40}
41
42fn timestamp_to_seconds(timestamp: u64) -> u64 {
43    if timestamp >= UNIX_MS_THRESHOLD {
44        timestamp / 1000
45    } else {
46        timestamp
47    }
48}
49
/// Statistics for a single instrument
///
/// Holds a rolling 24h volume window split into hourly buckets.
#[derive(Debug, Clone)]
struct InstrumentStats {
    /// Hourly volume buckets for rolling 24h calculation
    /// Index 0 is the current hour, index 1 is 1 hour ago, etc.
    volume_buckets: [Decimal; HOURLY_BUCKETS],
    /// Hour index of bucket 0, i.e. the Unix timestamp in seconds divided by
    /// 3600. The value 0 doubles as the "no fill recorded yet" marker.
    current_bucket_hour: u64,
}
59
60impl InstrumentStats {
61    fn new() -> Self {
62        Self {
63            volume_buckets: [dec!(0); HOURLY_BUCKETS],
64            current_bucket_hour: 0,
65        }
66    }
67
68    /// Get the current hour from a Unix timestamp
69    fn timestamp_to_hour(timestamp: u64) -> u64 {
70        timestamp_to_seconds(timestamp) / 3600
71    }
72
73    /// Add volume to the appropriate bucket, rolling over if needed
74    fn add_volume(&mut self, timestamp: u64, notional_volume: Decimal) {
75        let current_hour = Self::timestamp_to_hour(timestamp);
76
77        if self.current_bucket_hour == 0 {
78            // First fill - initialize
79            self.current_bucket_hour = current_hour;
80            self.volume_buckets[0] = notional_volume;
81            return;
82        }
83
84        let hours_elapsed = current_hour.saturating_sub(self.current_bucket_hour);
85
86        if hours_elapsed == 0 {
87            // Same hour - add to current bucket
88            self.volume_buckets[0] += notional_volume;
89        } else if hours_elapsed < HOURLY_BUCKETS as u64 {
90            // Roll buckets forward
91            // Shift existing buckets to the right by hours_elapsed positions
92            let shift = hours_elapsed as usize;
93            for i in (shift..HOURLY_BUCKETS).rev() {
94                self.volume_buckets[i] = self.volume_buckets[i - shift];
95            }
96            // Zero out the newly exposed buckets
97            for i in 0..shift {
98                self.volume_buckets[i] = dec!(0);
99            }
100            // Add new volume to current bucket
101            self.volume_buckets[0] = notional_volume;
102            self.current_bucket_hour = current_hour;
103        } else {
104            // More than 24 hours elapsed - reset all buckets
105            self.volume_buckets = [dec!(0); HOURLY_BUCKETS];
106            self.volume_buckets[0] = notional_volume;
107            self.current_bucket_hour = current_hour;
108        }
109    }
110
111    /// Get the total 24h volume
112    fn get_volume_24h(&self) -> Decimal {
113        self.volume_buckets.iter().sum()
114    }
115
116    /// Roll buckets to current time (for accurate reads even without fills)
117    fn roll_to_current(&mut self, current_timestamp: u64) {
118        let current_hour = Self::timestamp_to_hour(current_timestamp);
119
120        if self.current_bucket_hour == 0 {
121            return; // No data yet
122        }
123
124        let hours_elapsed = current_hour.saturating_sub(self.current_bucket_hour);
125
126        if hours_elapsed > 0 && hours_elapsed < HOURLY_BUCKETS as u64 {
127            // Roll buckets forward
128            let shift = hours_elapsed as usize;
129            for i in (shift..HOURLY_BUCKETS).rev() {
130                self.volume_buckets[i] = self.volume_buckets[i - shift];
131            }
132            // Zero out the newly exposed buckets
133            for i in 0..shift {
134                self.volume_buckets[i] = dec!(0);
135            }
136            self.current_bucket_hour = current_hour;
137        } else if hours_elapsed >= HOURLY_BUCKETS as u64 {
138            // More than 24 hours elapsed - reset all buckets
139            self.volume_buckets = [dec!(0); HOURLY_BUCKETS];
140            self.current_bucket_hour = current_hour;
141        }
142    }
143}
144
/// Cache for market statistics (volume_24h and open_interest)
///
/// Volume is accumulated in memory from fills, keyed by symbol. Open interest
/// is not stored; it is computed from the portfolio service on each request.
pub struct MarketStatsCache {
    /// Per-symbol volume statistics
    stats: Arc<RwLock<HashMap<String, InstrumentStats>>>,
    /// Reference to portfolio service for open interest calculation
    portfolio_service: Arc<dyn PortfolioService + Send + Sync>,
}
152
153impl MarketStatsCache {
154    /// Create a new MarketStatsCache.
155    ///
156    /// # Arguments
157    /// * `portfolio_service` - The portfolio service to derive open interest from
158    pub fn new(portfolio_service: Arc<dyn PortfolioService + Send + Sync>) -> Self {
159        Self {
160            stats: Arc::new(RwLock::new(HashMap::new())),
161            portfolio_service,
162        }
163    }
164
165    /// Initialize the cache and start listening to events (legacy, no shutdown support).
166    pub async fn initialize(self: Arc<Self>, event_bus: Arc<dyn EventBusTrait>) -> Result<()> {
167        // Create a dummy shutdown channel that never fires.
168        let (tx, rx) = tokio::sync::broadcast::channel::<()>(1);
169        std::mem::forget(tx);
170        self.initialize_with_shutdown(event_bus, rx).await
171    }
172
173    /// Initialize the cache with shutdown support.
174    ///
175    /// This method starts the event listener for fills. For better startup performance,
176    /// use `initialize_with_backfill` which loads last 24h from DB first.
177    pub async fn initialize_with_shutdown(
178        self: Arc<Self>,
179        event_bus: Arc<dyn EventBusTrait>,
180        shutdown_rx: tokio::sync::broadcast::Receiver<()>,
181    ) -> Result<()> {
182        info!("Initializing MarketStatsCache (no DB backfill)");
183
184        // Start event listener for fills
185        self.start_event_listeners_with_shutdown(event_bus, shutdown_rx)
186            .await?;
187
188        info!("MarketStatsCache initialized");
189        Ok(())
190    }
191
192    /// Initialize with DB backfill for the last 24 hours of fills.
193    ///
194    /// This is the recommended initialization method (Phase 3c):
195    /// 1. Load last 24h of fills from database
196    /// 2. Then subscribe to event bus for real-time updates
197    ///
198    /// This avoids replaying all historical fills.
199    pub async fn initialize_with_backfill(
200        self: Arc<Self>,
201        diesel_db: &dyn hypercall_db::AnalyticsReader,
202        event_bus: Arc<dyn EventBusTrait>,
203        shutdown_rx: tokio::sync::broadcast::Receiver<()>,
204    ) -> Result<()> {
205        info!("Initializing MarketStatsCache with DB backfill");
206
207        // 1. Load last 24h of fills from database
208        let fills_loaded = self.backfill_from_db(diesel_db).await?;
209        info!(
210            "MarketStatsCache backfilled {} fills from last 24h",
211            fills_loaded
212        );
213
214        // 2. Subscribe to event bus for real-time updates
215        // TODO: There is a small data gap between the DB backfill query completing and
216        // the event subscription starting. Fills in that window are missed.
217        // For 24h volume stats this is negligible (ms-level gap). A proper fix would
218        // subscribe first (buffering events), backfill, then replay the buffer.
219        self.start_event_listeners_with_shutdown(event_bus, shutdown_rx)
220            .await?;
221
222        info!("MarketStatsCache initialized with DB backfill");
223        Ok(())
224    }
225
226    async fn backfill_from_db(
227        &self,
228        diesel_db: &dyn hypercall_db::AnalyticsReader,
229    ) -> Result<usize> {
230        let cutoff_timestamp = market_stats_cutoff_timestamp();
231
232        match diesel_db.get_fills_since_timestamp(cutoff_timestamp).await {
233            Ok(fill_rows) => {
234                let count = fill_rows.len();
235                let mut skipped_missing_notional = 0;
236                for row in fill_rows {
237                    let Some(notional_volume) = fill_notional_volume(
238                        &row.symbol,
239                        row.size,
240                        row.price,
241                        row.underlying_notional,
242                    ) else {
243                        skipped_missing_notional += 1;
244                        continue;
245                    };
246
247                    let mut stats = self.stats.write().await;
248                    let instrument_stats =
249                        stats.entry(row.symbol).or_insert_with(InstrumentStats::new);
250                    instrument_stats.add_volume(row.timestamp as u64, notional_volume);
251                }
252
253                if skipped_missing_notional > 0 {
254                    warn!(
255                        "MarketStatsCache skipped {} historical option fills missing underlying_notional",
256                        skipped_missing_notional
257                    );
258                }
259
260                Ok(count)
261            }
262            Err(e) => {
263                // Log warning but don't fail - DB might not have fills table in some environments
264                warn!(
265                    "MarketStatsCache DB backfill failed (will rely on event bus): {}",
266                    e
267                );
268                Ok(0)
269            }
270        }
271    }
272
273    /// Start event listeners for fill events.
274    async fn start_event_listeners_with_shutdown(
275        self: Arc<Self>,
276        event_bus: Arc<dyn EventBusTrait>,
277        mut shutdown_rx: tokio::sync::broadcast::Receiver<()>,
278    ) -> Result<()> {
279        info!("Starting event listener for MarketStatsCache");
280
281        let topics = vec![TOPIC_FILLS.to_string()];
282        let mut receiver = event_bus
283            .subscribe(topics)
284            .await
285            .map_err(|e| anyhow::anyhow!("Failed to subscribe to fills topic: {}", e))?;
286
287        let cache = self.clone();
288        tokio::spawn(async move {
289            info!("MarketStatsCache event listener started");
290            loop {
291                tokio::select! {
292                    _ = shutdown_rx.recv() => {
293                        info!("MarketStatsCache event listener received shutdown signal");
294                        break;
295                    }
296                    maybe_message = receiver.recv() => {
297                        match maybe_message {
298                            Some(EngineMessage::OrderFilled { fill, .. }) => {
299                                cache.handle_fill(&fill).await;
300                            }
301                            Some(_) => {
302                                // Ignore other message types
303                            }
304                            None => {
305                                // Channel closed
306                                break;
307                            }
308                        }
309                    }
310                }
311            }
312            info!("MarketStatsCache event listener stopped");
313        });
314
315        Ok(())
316    }
317
318    /// Handle a fill event to update volume statistics.
319    async fn handle_fill(&self, fill: &hypercall_types::Fill) {
320        let symbol = &fill.symbol;
321
322        let Some(notional_volume) =
323            fill_notional_volume(symbol, fill.size, fill.price, fill.underlying_notional)
324        else {
325            warn!(
326                "MarketStatsCache skipped option fill {} for {} missing underlying_notional",
327                fill.trade_id, symbol
328            );
329            return;
330        };
331
332        debug!(
333            "MarketStatsCache: Recording fill for {}: size={}, price={}, notional={}",
334            symbol,
335            to_human_readable_decimal(symbol, fill.size),
336            fill.price,
337            notional_volume
338        );
339
340        let mut stats = self.stats.write().await;
341        let instrument_stats = stats
342            .entry(symbol.clone())
343            .or_insert_with(InstrumentStats::new);
344
345        instrument_stats.add_volume(fill.timestamp, notional_volume);
346    }
347
348    /// Get the 24-hour volume for a symbol.
349    pub async fn get_volume_24h(&self, symbol: &str) -> Decimal {
350        let current_timestamp = std::time::SystemTime::now()
351            .duration_since(std::time::UNIX_EPOCH)
352            .map(|d| d.as_secs())
353            .unwrap_or(0);
354
355        let mut stats = self.stats.write().await;
356        if let Some(instrument_stats) = stats.get_mut(symbol) {
357            // Roll to current time before reading
358            instrument_stats.roll_to_current(current_timestamp);
359            instrument_stats.get_volume_24h()
360        } else {
361            dec!(0)
362        }
363    }
364
365    /// Get the open interest for a symbol.
366    ///
367    /// Open interest is calculated as SUM(ABS(amount)) / 2 across all accounts.
368    /// Division by 2 because each position is double-counted (taker + maker sides).
369    pub async fn get_open_interest(&self, symbol: &str) -> Decimal {
370        // Get all portfolios from the portfolio service
371        let portfolios = self.portfolio_service.all_portfolios().await;
372
373        let mut total_abs_amount = dec!(0);
374
375        for portfolio in portfolios.values() {
376            if let Some(position) = portfolio.positions.get(symbol) {
377                total_abs_amount += position.amount.abs();
378            }
379        }
380
381        // Divide by 2 because each trade creates two positions (taker + maker)
382        total_abs_amount / dec!(2)
383    }
384
385    /// Get both volume_24h and open_interest for a symbol.
386    pub async fn get_stats(&self, symbol: &str) -> (Decimal, Decimal) {
387        let volume = self.get_volume_24h(symbol).await;
388        let open_interest = self.get_open_interest(symbol).await;
389        (volume, open_interest)
390    }
391
392    /// Get stats for all tracked symbols.
393    pub async fn get_all_stats(&self) -> HashMap<String, (Decimal, Decimal)> {
394        let current_timestamp = std::time::SystemTime::now()
395            .duration_since(std::time::UNIX_EPOCH)
396            .map(|d| d.as_secs())
397            .unwrap_or(0);
398
399        let portfolios = self.portfolio_service.all_portfolios().await;
400        let mut result = HashMap::new();
401
402        // Roll and snapshot all volume buckets in one lock acquisition.
403        let mut stats = self.stats.write().await;
404        for (symbol, instrument_stats) in stats.iter_mut() {
405            instrument_stats.roll_to_current(current_timestamp);
406            result.insert(symbol.clone(), (instrument_stats.get_volume_24h(), dec!(0)));
407        }
408        drop(stats);
409
410        // Aggregate open interest in a single pass through all positions.
411        for portfolio in portfolios.values() {
412            for (symbol, position) in &portfolio.positions {
413                let entry = result.entry(symbol.clone()).or_insert((dec!(0), dec!(0)));
414                entry.1 += position.amount.abs();
415            }
416        }
417
418        for (_, (_, open_interest)) in result.iter_mut() {
419            *open_interest /= dec!(2);
420        }
421
422        result
423    }
424}
425
#[cfg(test)]
mod tests {
    use super::*;
    use crate::portfolio::PortfolioServiceImpl;
    use hypercall_types::wallet_address::test_wallet;
    use hypercall_types::{Fill, Side};

    /// Option symbol shared by the cache-level tests.
    const OPTION_SYMBOL: &str = "BTC-20260130-100000-C";
    /// Seconds per hour; the bucket tests use second-resolution timestamps.
    const HOUR: u64 = 3600;

    /// Current wall-clock time in Unix seconds.
    fn unix_now_secs() -> u64 {
        std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap()
            .as_secs()
    }

    /// Build the taker-buy option fill used by the cache tests; only the
    /// timestamp and the underlying notional vary between tests.
    fn option_fill(timestamp: u64, underlying_notional: Option<Decimal>) -> Fill {
        Fill {
            trade_id: 1,
            taker_order_id: 100,
            maker_order_id: 101,
            symbol: OPTION_SYMBOL.to_string(),
            price: dec!(1000),
            size: dec!(10_000_000), // 10 contracts in contract units
            taker_side: Side::Buy,
            taker_wallet_address: test_wallet(1),
            maker_wallet_address: test_wallet(2),
            fee: dec!(0),
            is_taker: true,
            timestamp,
            builder_code_address: None,
            builder_code_fee: None,
            source: Default::default(),
            taker_realized_pnl: None,
            maker_realized_pnl: None,
            underlying_notional,
        }
    }

    /// Cache backed by a fresh, empty portfolio service.
    fn empty_cache() -> Arc<MarketStatsCache> {
        Arc::new(MarketStatsCache::new(Arc::new(PortfolioServiceImpl::new())))
    }

    #[test]
    fn test_add_volume_first_fill() {
        let mut stats = InstrumentStats::new();

        stats.add_volume(1000, dec!(100));

        assert_eq!(stats.get_volume_24h(), dec!(100));
        assert_eq!(stats.volume_buckets[0], dec!(100));
    }

    #[test]
    fn test_add_volume_same_hour() {
        let mut stats = InstrumentStats::new();

        stats.add_volume(HOUR, dec!(100));
        stats.add_volume(HOUR + 1000, dec!(50)); // Still inside the same hour

        assert_eq!(stats.get_volume_24h(), dec!(150));
        assert_eq!(stats.volume_buckets[0], dec!(150));
    }

    #[test]
    fn test_add_volume_rollover() {
        let mut stats = InstrumentStats::new();

        // One fill in each of three consecutive hours.
        for (timestamp, volume) in [
            (HOUR, dec!(100)),
            (2 * HOUR, dec!(200)),
            (3 * HOUR, dec!(300)),
        ]
        .iter()
        {
            stats.add_volume(*timestamp, *volume);
        }

        assert_eq!(stats.get_volume_24h(), dec!(600)); // 100 + 200 + 300
        assert_eq!(stats.volume_buckets[0], dec!(300));
        assert_eq!(stats.volume_buckets[1], dec!(200));
        assert_eq!(stats.volume_buckets[2], dec!(100));
    }

    #[test]
    fn test_volume_expires_after_24h() {
        let mut stats = InstrumentStats::new();
        stats.add_volume(HOUR, dec!(100));

        // 24 hours and one second after the first fill.
        stats.add_volume(HOUR + 24 * HOUR + 1, dec!(50));

        // Only the new fill is left in the window.
        assert_eq!(stats.get_volume_24h(), dec!(50));
    }

    #[test]
    fn test_roll_to_current() {
        let mut stats = InstrumentStats::new();
        stats.add_volume(HOUR, dec!(100));
        stats.add_volume(2 * HOUR, dec!(200)); // One hour after the first fill

        // Roll three hours past the first fill without adding volume.
        stats.roll_to_current(HOUR + 3 * HOUR);

        // Buckets 0 and 1 saw no fills; the two fills are now 2 and 3 hours old.
        let expected = [dec!(0), dec!(0), dec!(200), dec!(100)];
        for (age, volume) in expected.iter().enumerate() {
            assert_eq!(stats.volume_buckets[age], *volume);
        }
        assert_eq!(stats.get_volume_24h(), dec!(300));
    }

    #[test]
    fn test_roll_past_24h() {
        let mut stats = InstrumentStats::new();
        stats.add_volume(HOUR, dec!(100));

        // Roll 25 hours past the only fill.
        stats.roll_to_current(HOUR + 25 * HOUR);

        // Nothing is left inside the window.
        assert_eq!(stats.get_volume_24h(), dec!(0));
    }

    #[tokio::test]
    async fn test_cache_get_volume() {
        let cache = empty_cache();

        // An untracked symbol reports zero volume.
        assert_eq!(cache.get_volume_24h(OPTION_SYMBOL).await, dec!(0));
    }

    #[tokio::test]
    async fn test_cache_get_open_interest() {
        let portfolio_service = Arc::new(PortfolioServiceImpl::new());

        // Apply a fill to create positions. Cash is supplied separately by balance providers.
        let fill = option_fill(0, None);
        portfolio_service
            .apply_event(&EngineMessage::OrderFilled {
                accounting: hypercall_engine::FillAccounting::from_fill(&fill),
                fill,
            })
            .await
            .unwrap();

        let cache = Arc::new(MarketStatsCache::new(portfolio_service));

        // Open interest should be (|10| + |-10|) / 2 = 10
        let open_interest = cache.get_open_interest(OPTION_SYMBOL).await;
        assert_eq!(open_interest, dec!(10));
    }

    #[tokio::test]
    async fn test_handle_fill() {
        let cache = empty_cache();
        let fill = option_fill(unix_now_secs(), Some(dec!(950000)));

        cache.handle_fill(&fill).await;

        assert_eq!(cache.get_volume_24h(OPTION_SYMBOL).await, dec!(950000));
    }

    #[tokio::test]
    async fn test_handle_option_fill_missing_underlying_notional_skips_volume() {
        let cache = empty_cache();
        let fill = option_fill(unix_now_secs(), None);

        cache.handle_fill(&fill).await;

        assert_eq!(cache.get_volume_24h(OPTION_SYMBOL).await, dec!(0));
    }
}