1use anyhow::Result;
4use hypercall_types::{to_human_readable_decimal, utils::is_option_symbol};
5use rust_decimal::Decimal;
6use rust_decimal_macros::dec;
7use std::collections::HashMap;
8use std::sync::Arc;
9use tokio::sync::RwLock;
10use tracing::{debug, info, warn};
11
12use crate::messaging::EventBusTrait;
13use crate::portfolio::PortfolioService;
14use crate::shared::topics::TOPIC_FILLS;
15use hypercall_types::EngineMessage;
16
17const HOURLY_BUCKETS: usize = 24;
19const UNIX_MS_THRESHOLD: u64 = 10_000_000_000;
20const DAY_MS: u128 = 86_400_000;
21
22fn fill_notional_volume(
23 symbol: &str,
24 size: Decimal,
25 price: Decimal,
26 underlying_notional: Option<Decimal>,
27) -> Option<Decimal> {
28 if is_option_symbol(symbol) {
29 return underlying_notional;
30 }
31
32 Some(to_human_readable_decimal(symbol, size).abs() * price)
33}
34
35fn market_stats_cutoff_timestamp() -> i64 {
36 std::time::SystemTime::now()
37 .duration_since(std::time::UNIX_EPOCH)
38 .map(|d| d.as_millis().saturating_sub(DAY_MS) as i64)
39 .unwrap_or(0)
40}
41
42fn timestamp_to_seconds(timestamp: u64) -> u64 {
43 if timestamp >= UNIX_MS_THRESHOLD {
44 timestamp / 1000
45 } else {
46 timestamp
47 }
48}
49
50#[derive(Debug, Clone)]
52struct InstrumentStats {
53 volume_buckets: [Decimal; HOURLY_BUCKETS],
56 current_bucket_hour: u64,
58}
59
60impl InstrumentStats {
61 fn new() -> Self {
62 Self {
63 volume_buckets: [dec!(0); HOURLY_BUCKETS],
64 current_bucket_hour: 0,
65 }
66 }
67
68 fn timestamp_to_hour(timestamp: u64) -> u64 {
70 timestamp_to_seconds(timestamp) / 3600
71 }
72
73 fn add_volume(&mut self, timestamp: u64, notional_volume: Decimal) {
75 let current_hour = Self::timestamp_to_hour(timestamp);
76
77 if self.current_bucket_hour == 0 {
78 self.current_bucket_hour = current_hour;
80 self.volume_buckets[0] = notional_volume;
81 return;
82 }
83
84 let hours_elapsed = current_hour.saturating_sub(self.current_bucket_hour);
85
86 if hours_elapsed == 0 {
87 self.volume_buckets[0] += notional_volume;
89 } else if hours_elapsed < HOURLY_BUCKETS as u64 {
90 let shift = hours_elapsed as usize;
93 for i in (shift..HOURLY_BUCKETS).rev() {
94 self.volume_buckets[i] = self.volume_buckets[i - shift];
95 }
96 for i in 0..shift {
98 self.volume_buckets[i] = dec!(0);
99 }
100 self.volume_buckets[0] = notional_volume;
102 self.current_bucket_hour = current_hour;
103 } else {
104 self.volume_buckets = [dec!(0); HOURLY_BUCKETS];
106 self.volume_buckets[0] = notional_volume;
107 self.current_bucket_hour = current_hour;
108 }
109 }
110
111 fn get_volume_24h(&self) -> Decimal {
113 self.volume_buckets.iter().sum()
114 }
115
116 fn roll_to_current(&mut self, current_timestamp: u64) {
118 let current_hour = Self::timestamp_to_hour(current_timestamp);
119
120 if self.current_bucket_hour == 0 {
121 return; }
123
124 let hours_elapsed = current_hour.saturating_sub(self.current_bucket_hour);
125
126 if hours_elapsed > 0 && hours_elapsed < HOURLY_BUCKETS as u64 {
127 let shift = hours_elapsed as usize;
129 for i in (shift..HOURLY_BUCKETS).rev() {
130 self.volume_buckets[i] = self.volume_buckets[i - shift];
131 }
132 for i in 0..shift {
134 self.volume_buckets[i] = dec!(0);
135 }
136 self.current_bucket_hour = current_hour;
137 } else if hours_elapsed >= HOURLY_BUCKETS as u64 {
138 self.volume_buckets = [dec!(0); HOURLY_BUCKETS];
140 self.current_bucket_hour = current_hour;
141 }
142 }
143}
144
145pub struct MarketStatsCache {
147 stats: Arc<RwLock<HashMap<String, InstrumentStats>>>,
149 portfolio_service: Arc<dyn PortfolioService + Send + Sync>,
151}
152
153impl MarketStatsCache {
154 pub fn new(portfolio_service: Arc<dyn PortfolioService + Send + Sync>) -> Self {
159 Self {
160 stats: Arc::new(RwLock::new(HashMap::new())),
161 portfolio_service,
162 }
163 }
164
165 pub async fn initialize(self: Arc<Self>, event_bus: Arc<dyn EventBusTrait>) -> Result<()> {
167 let (tx, rx) = tokio::sync::broadcast::channel::<()>(1);
169 std::mem::forget(tx);
170 self.initialize_with_shutdown(event_bus, rx).await
171 }
172
173 pub async fn initialize_with_shutdown(
178 self: Arc<Self>,
179 event_bus: Arc<dyn EventBusTrait>,
180 shutdown_rx: tokio::sync::broadcast::Receiver<()>,
181 ) -> Result<()> {
182 info!("Initializing MarketStatsCache (no DB backfill)");
183
184 self.start_event_listeners_with_shutdown(event_bus, shutdown_rx)
186 .await?;
187
188 info!("MarketStatsCache initialized");
189 Ok(())
190 }
191
192 pub async fn initialize_with_backfill(
200 self: Arc<Self>,
201 diesel_db: &dyn hypercall_db::AnalyticsReader,
202 event_bus: Arc<dyn EventBusTrait>,
203 shutdown_rx: tokio::sync::broadcast::Receiver<()>,
204 ) -> Result<()> {
205 info!("Initializing MarketStatsCache with DB backfill");
206
207 let fills_loaded = self.backfill_from_db(diesel_db).await?;
209 info!(
210 "MarketStatsCache backfilled {} fills from last 24h",
211 fills_loaded
212 );
213
214 self.start_event_listeners_with_shutdown(event_bus, shutdown_rx)
220 .await?;
221
222 info!("MarketStatsCache initialized with DB backfill");
223 Ok(())
224 }
225
226 async fn backfill_from_db(
227 &self,
228 diesel_db: &dyn hypercall_db::AnalyticsReader,
229 ) -> Result<usize> {
230 let cutoff_timestamp = market_stats_cutoff_timestamp();
231
232 match diesel_db.get_fills_since_timestamp(cutoff_timestamp).await {
233 Ok(fill_rows) => {
234 let count = fill_rows.len();
235 let mut skipped_missing_notional = 0;
236 for row in fill_rows {
237 let Some(notional_volume) = fill_notional_volume(
238 &row.symbol,
239 row.size,
240 row.price,
241 row.underlying_notional,
242 ) else {
243 skipped_missing_notional += 1;
244 continue;
245 };
246
247 let mut stats = self.stats.write().await;
248 let instrument_stats =
249 stats.entry(row.symbol).or_insert_with(InstrumentStats::new);
250 instrument_stats.add_volume(row.timestamp as u64, notional_volume);
251 }
252
253 if skipped_missing_notional > 0 {
254 warn!(
255 "MarketStatsCache skipped {} historical option fills missing underlying_notional",
256 skipped_missing_notional
257 );
258 }
259
260 Ok(count)
261 }
262 Err(e) => {
263 warn!(
265 "MarketStatsCache DB backfill failed (will rely on event bus): {}",
266 e
267 );
268 Ok(0)
269 }
270 }
271 }
272
273 async fn start_event_listeners_with_shutdown(
275 self: Arc<Self>,
276 event_bus: Arc<dyn EventBusTrait>,
277 mut shutdown_rx: tokio::sync::broadcast::Receiver<()>,
278 ) -> Result<()> {
279 info!("Starting event listener for MarketStatsCache");
280
281 let topics = vec![TOPIC_FILLS.to_string()];
282 let mut receiver = event_bus
283 .subscribe(topics)
284 .await
285 .map_err(|e| anyhow::anyhow!("Failed to subscribe to fills topic: {}", e))?;
286
287 let cache = self.clone();
288 tokio::spawn(async move {
289 info!("MarketStatsCache event listener started");
290 loop {
291 tokio::select! {
292 _ = shutdown_rx.recv() => {
293 info!("MarketStatsCache event listener received shutdown signal");
294 break;
295 }
296 maybe_message = receiver.recv() => {
297 match maybe_message {
298 Some(EngineMessage::OrderFilled { fill, .. }) => {
299 cache.handle_fill(&fill).await;
300 }
301 Some(_) => {
302 }
304 None => {
305 break;
307 }
308 }
309 }
310 }
311 }
312 info!("MarketStatsCache event listener stopped");
313 });
314
315 Ok(())
316 }
317
318 async fn handle_fill(&self, fill: &hypercall_types::Fill) {
320 let symbol = &fill.symbol;
321
322 let Some(notional_volume) =
323 fill_notional_volume(symbol, fill.size, fill.price, fill.underlying_notional)
324 else {
325 warn!(
326 "MarketStatsCache skipped option fill {} for {} missing underlying_notional",
327 fill.trade_id, symbol
328 );
329 return;
330 };
331
332 debug!(
333 "MarketStatsCache: Recording fill for {}: size={}, price={}, notional={}",
334 symbol,
335 to_human_readable_decimal(symbol, fill.size),
336 fill.price,
337 notional_volume
338 );
339
340 let mut stats = self.stats.write().await;
341 let instrument_stats = stats
342 .entry(symbol.clone())
343 .or_insert_with(InstrumentStats::new);
344
345 instrument_stats.add_volume(fill.timestamp, notional_volume);
346 }
347
348 pub async fn get_volume_24h(&self, symbol: &str) -> Decimal {
350 let current_timestamp = std::time::SystemTime::now()
351 .duration_since(std::time::UNIX_EPOCH)
352 .map(|d| d.as_secs())
353 .unwrap_or(0);
354
355 let mut stats = self.stats.write().await;
356 if let Some(instrument_stats) = stats.get_mut(symbol) {
357 instrument_stats.roll_to_current(current_timestamp);
359 instrument_stats.get_volume_24h()
360 } else {
361 dec!(0)
362 }
363 }
364
365 pub async fn get_open_interest(&self, symbol: &str) -> Decimal {
370 let portfolios = self.portfolio_service.all_portfolios().await;
372
373 let mut total_abs_amount = dec!(0);
374
375 for portfolio in portfolios.values() {
376 if let Some(position) = portfolio.positions.get(symbol) {
377 total_abs_amount += position.amount.abs();
378 }
379 }
380
381 total_abs_amount / dec!(2)
383 }
384
385 pub async fn get_stats(&self, symbol: &str) -> (Decimal, Decimal) {
387 let volume = self.get_volume_24h(symbol).await;
388 let open_interest = self.get_open_interest(symbol).await;
389 (volume, open_interest)
390 }
391
392 pub async fn get_all_stats(&self) -> HashMap<String, (Decimal, Decimal)> {
394 let current_timestamp = std::time::SystemTime::now()
395 .duration_since(std::time::UNIX_EPOCH)
396 .map(|d| d.as_secs())
397 .unwrap_or(0);
398
399 let portfolios = self.portfolio_service.all_portfolios().await;
400 let mut result = HashMap::new();
401
402 let mut stats = self.stats.write().await;
404 for (symbol, instrument_stats) in stats.iter_mut() {
405 instrument_stats.roll_to_current(current_timestamp);
406 result.insert(symbol.clone(), (instrument_stats.get_volume_24h(), dec!(0)));
407 }
408 drop(stats);
409
410 for portfolio in portfolios.values() {
412 for (symbol, position) in &portfolio.positions {
413 let entry = result.entry(symbol.clone()).or_insert((dec!(0), dec!(0)));
414 entry.1 += position.amount.abs();
415 }
416 }
417
418 for (_, (_, open_interest)) in result.iter_mut() {
419 *open_interest /= dec!(2);
420 }
421
422 result
423 }
424}
425
426#[cfg(test)]
427mod tests {
428 use super::*;
429 use crate::portfolio::PortfolioServiceImpl;
430
431 fn create_stats() -> InstrumentStats {
432 InstrumentStats::new()
433 }
434
435 #[test]
436 fn test_add_volume_first_fill() {
437 let mut stats = create_stats();
438
439 stats.add_volume(1000, dec!(100));
440
441 assert_eq!(stats.get_volume_24h(), dec!(100));
442 assert_eq!(stats.volume_buckets[0], dec!(100));
443 }
444
445 #[test]
446 fn test_add_volume_same_hour() {
447 let mut stats = create_stats();
448
449 let base_timestamp = 3600; stats.add_volume(base_timestamp, dec!(100));
451 stats.add_volume(base_timestamp + 1000, dec!(50)); assert_eq!(stats.get_volume_24h(), dec!(150));
454 assert_eq!(stats.volume_buckets[0], dec!(150));
455 }
456
457 #[test]
458 fn test_add_volume_rollover() {
459 let mut stats = create_stats();
460
461 let hour_0 = 3600;
462 let hour_1 = hour_0 + 3600;
463 let hour_2 = hour_0 + 7200;
464
465 stats.add_volume(hour_0, dec!(100));
466 stats.add_volume(hour_1, dec!(200));
467 stats.add_volume(hour_2, dec!(300));
468
469 assert_eq!(stats.get_volume_24h(), dec!(600)); assert_eq!(stats.volume_buckets[0], dec!(300));
471 assert_eq!(stats.volume_buckets[1], dec!(200));
472 assert_eq!(stats.volume_buckets[2], dec!(100));
473 }
474
475 #[test]
476 fn test_volume_expires_after_24h() {
477 let mut stats = create_stats();
478
479 let hour_0 = 3600;
480 stats.add_volume(hour_0, dec!(100));
481
482 let after_24h = hour_0 + (24 * 3600) + 1;
484 stats.add_volume(after_24h, dec!(50));
485
486 assert_eq!(stats.get_volume_24h(), dec!(50));
488 }
489
490 #[test]
491 fn test_roll_to_current() {
492 let mut stats = create_stats();
493
494 let hour_0 = 3600;
495 stats.add_volume(hour_0, dec!(100));
496 stats.add_volume(hour_0 + 3600, dec!(200)); let hour_3 = hour_0 + 3 * 3600;
500 stats.roll_to_current(hour_3);
501
502 assert_eq!(stats.volume_buckets[0], dec!(0)); assert_eq!(stats.volume_buckets[1], dec!(0)); assert_eq!(stats.volume_buckets[2], dec!(200)); assert_eq!(stats.volume_buckets[3], dec!(100)); assert_eq!(stats.get_volume_24h(), dec!(300));
508 }
509
510 #[test]
511 fn test_roll_past_24h() {
512 let mut stats = create_stats();
513
514 let hour_0 = 3600;
515 stats.add_volume(hour_0, dec!(100));
516
517 let way_later = hour_0 + 25 * 3600;
519 stats.roll_to_current(way_later);
520
521 assert_eq!(stats.get_volume_24h(), dec!(0));
523 }
524
525 #[tokio::test]
526 async fn test_cache_get_volume() {
527 let portfolio_service = Arc::new(PortfolioServiceImpl::new());
528 let cache = Arc::new(MarketStatsCache::new(portfolio_service));
529
530 let vol = cache.get_volume_24h("BTC-20260130-100000-C").await;
532 assert_eq!(vol, dec!(0));
533 }
534
535 #[tokio::test]
536 async fn test_cache_get_open_interest() {
537 use hypercall_types::wallet_address::test_wallet;
538
539 let portfolio_service = Arc::new(PortfolioServiceImpl::new());
540
541 use hypercall_types::Fill;
543 use hypercall_types::Side;
544
545 let fill = Fill {
546 trade_id: 1,
547 taker_order_id: 100,
548 maker_order_id: 101,
549 symbol: "BTC-20260130-100000-C".to_string(),
550 price: dec!(1000),
551 size: dec!(10_000_000), taker_side: Side::Buy,
553 taker_wallet_address: test_wallet(1),
554 maker_wallet_address: test_wallet(2),
555 fee: dec!(0),
556 is_taker: true,
557 timestamp: 0,
558 builder_code_address: None,
559 builder_code_fee: None,
560 source: Default::default(),
561 taker_realized_pnl: None,
562 maker_realized_pnl: None,
563 underlying_notional: None,
564 };
565
566 portfolio_service
567 .apply_event(&EngineMessage::OrderFilled {
568 accounting: hypercall_engine::FillAccounting::from_fill(&fill),
569 fill,
570 })
571 .await
572 .unwrap();
573
574 let cache = Arc::new(MarketStatsCache::new(portfolio_service));
575
576 let oi = cache.get_open_interest("BTC-20260130-100000-C").await;
578 assert_eq!(oi, dec!(10));
579 }
580
581 #[tokio::test]
582 async fn test_handle_fill() {
583 use hypercall_types::wallet_address::test_wallet;
584
585 let portfolio_service = Arc::new(PortfolioServiceImpl::new());
586 let cache = Arc::new(MarketStatsCache::new(portfolio_service));
587
588 use hypercall_types::Fill;
589 use hypercall_types::Side;
590
591 let fill = Fill {
592 trade_id: 1,
593 taker_order_id: 100,
594 maker_order_id: 101,
595 symbol: "BTC-20260130-100000-C".to_string(),
596 price: dec!(1000),
597 size: dec!(10_000_000), taker_side: Side::Buy,
599 taker_wallet_address: test_wallet(1),
600 maker_wallet_address: test_wallet(2),
601 fee: dec!(0),
602 is_taker: true,
603 timestamp: std::time::SystemTime::now()
604 .duration_since(std::time::UNIX_EPOCH)
605 .unwrap()
606 .as_secs(),
607 builder_code_address: None,
608 builder_code_fee: None,
609 source: Default::default(),
610 taker_realized_pnl: None,
611 maker_realized_pnl: None,
612 underlying_notional: Some(dec!(950000)),
613 };
614
615 cache.handle_fill(&fill).await;
616
617 let vol = cache.get_volume_24h("BTC-20260130-100000-C").await;
618 assert_eq!(vol, dec!(950000));
619 }
620
621 #[tokio::test]
622 async fn test_handle_option_fill_missing_underlying_notional_skips_volume() {
623 use hypercall_types::wallet_address::test_wallet;
624
625 let portfolio_service = Arc::new(PortfolioServiceImpl::new());
626 let cache = Arc::new(MarketStatsCache::new(portfolio_service));
627
628 use hypercall_types::Fill;
629 use hypercall_types::Side;
630
631 let fill = Fill {
632 trade_id: 1,
633 taker_order_id: 100,
634 maker_order_id: 101,
635 symbol: "BTC-20260130-100000-C".to_string(),
636 price: dec!(1000),
637 size: dec!(10_000_000),
638 taker_side: Side::Buy,
639 taker_wallet_address: test_wallet(1),
640 maker_wallet_address: test_wallet(2),
641 fee: dec!(0),
642 is_taker: true,
643 timestamp: std::time::SystemTime::now()
644 .duration_since(std::time::UNIX_EPOCH)
645 .unwrap()
646 .as_secs(),
647 builder_code_address: None,
648 builder_code_fee: None,
649 source: Default::default(),
650 taker_realized_pnl: None,
651 maker_realized_pnl: None,
652 underlying_notional: None,
653 };
654
655 cache.handle_fill(&fill).await;
656
657 let vol = cache.get_volume_24h("BTC-20260130-100000-C").await;
658 assert_eq!(vol, dec!(0));
659 }
660}