Skip to main content

hypercall/read_cache/
mmp.rs

1use crate::shared::order_types::ParsedSymbol;
2use anyhow::{Context, Result};
3use hypercall_db::MmpConfigRecord as MmpConfig;
4use hypercall_db::MmpConfigWriter;
5use hypercall_types::Fill;
6use hypercall_types::{to_contract_units_decimal, WalletAddress};
7use rust_decimal::prelude::ToPrimitive;
8use std::collections::{HashMap, VecDeque};
9use std::sync::Arc;
10use tokio::sync::RwLock;
11use tracing::{debug, info, warn};
12
/// One fill as tracked by the MMP window, together with its pre-computed risk metrics.
#[derive(Debug, Clone)]
struct FillRecord {
    /// Fill time. Compared directly against `current_time - interval_ms` during
    /// eviction, so it must use the same unit as the times handed to the window
    /// (presumably milliseconds, given the `_ms` parameter names — confirm with callers).
    timestamp: u64,
    /// Size of the fill; summed into the window's cumulative quantity.
    quantity: u64,
    /// Signed delta contribution of the fill.
    delta: f64,
    /// Vega contribution of the fill.
    vega: f64,
}

/// Rolling fill window for one currency of one wallet.
///
/// The `cumulative_*` fields are running totals over `fills`; they are updated
/// incrementally on insert and eviction rather than recomputed.
#[derive(Debug, Clone)]
struct CurrencyFillWindow {
    /// Fills currently inside the window, oldest at the front.
    fills: VecDeque<FillRecord>,
    /// Sum of `quantity` over `fills` (saturating at `u64::MAX`).
    cumulative_qty: u64,
    /// Sum of `delta` over `fills`.
    cumulative_delta: f64,
    /// Sum of `vega` over `fills`.
    cumulative_vega: f64,
    /// Timestamp at which this currency becomes unfrozen; `None` when not frozen.
    frozen_until: Option<u64>,
}

impl CurrencyFillWindow {
    /// Creates an empty, unfrozen window.
    fn new() -> Self {
        Self {
            fills: VecDeque::new(),
            cumulative_qty: 0,
            cumulative_delta: 0.0,
            cumulative_vega: 0.0,
            frozen_until: None,
        }
    }

    /// Adds `record` to the window, then checks the updated totals against the limits.
    ///
    /// The fill is ALWAYS recorded first, even when it causes a breach.
    ///
    /// * `qty_limit` - breach when the cumulative quantity is strictly greater.
    ///   A negative limit is treated as always breached (an unsigned quantity is
    ///   always above it) instead of being reinterpreted as a huge unsigned value.
    /// * `delta_limit` / `vega_limit` - breach when the absolute cumulative value
    ///   is strictly greater.
    ///
    /// Returns `true` if any supplied limit is breached after adding, `false` otherwise.
    fn eval_fill(
        &mut self,
        record: FillRecord,
        qty_limit: Option<i64>,
        delta_limit: Option<f64>,
        vega_limit: Option<f64>,
    ) -> bool {
        // Saturate rather than overflow: a wrapped total would look small and
        // silently bypass the quantity limit.
        self.cumulative_qty = self.cumulative_qty.saturating_add(record.quantity);
        self.cumulative_delta += record.delta;
        self.cumulative_vega += record.vega;
        self.fills.push_back(record);

        let qty_breached = match qty_limit {
            Some(limit) if limit < 0 => true,
            // Non-negative here, so the cast is lossless.
            Some(limit) => self.cumulative_qty > limit as u64,
            None => false,
        };
        let delta_breached =
            delta_limit.map_or(false, |limit| self.cumulative_delta.abs() > limit);
        let vega_breached = vega_limit.map_or(false, |limit| self.cumulative_vega.abs() > limit);

        qty_breached || delta_breached || vega_breached
    }

    /// Removes fills whose timestamp is strictly older than `current_time - interval_ms`
    /// and subtracts their contribution from the running totals.
    ///
    /// A fill exactly at the cutoff is kept. Eviction only inspects the front of the
    /// queue, so it stops at the first fill that is still inside the window.
    fn evict_old_fills(&mut self, current_time: u64, interval_ms: u64) {
        let cutoff_time = current_time.saturating_sub(interval_ms);

        while self
            .fills
            .front()
            .map_or(false, |oldest| oldest.timestamp < cutoff_time)
        {
            if let Some(expired) = self.fills.pop_front() {
                self.cumulative_qty = self.cumulative_qty.saturating_sub(expired.quantity);
                self.cumulative_delta -= expired.delta;
                self.cumulative_vega -= expired.vega;
            }
        }

        // With nothing left in the window the totals are zero by definition; snapping
        // them clears floating-point residue left by repeated add/subtract cycles.
        if self.fills.is_empty() {
            self.cumulative_qty = 0;
            self.cumulative_delta = 0.0;
            self.cumulative_vega = 0.0;
        }
    }

    /// Returns `true` while `current_time` is strictly before the stored unfreeze time.
    fn is_frozen(&self, current_time: u64) -> bool {
        self.frozen_until
            .map_or(false, |until| current_time < until)
    }

    /// Freezes the window until `current_time + frozen_duration_ms` (saturating, so an
    /// oversized duration freezes until reset instead of overflowing).
    fn set_frozen(&mut self, current_time: u64, frozen_duration_ms: u64) {
        self.frozen_until = Some(current_time.saturating_add(frozen_duration_ms));
    }

    /// Clears all fills, zeroes the running totals and unfreezes the window.
    fn reset(&mut self) {
        self.fills.clear();
        self.cumulative_qty = 0;
        self.cumulative_delta = 0.0;
        self.cumulative_vega = 0.0;
        self.frozen_until = None;
    }
}
121
122/// Aggregate state for one wallet across all currencies
123#[derive(Debug, Clone)]
124struct WalletMmpState {
125    currencies: HashMap<String, CurrencyFillWindow>,
126}
127
128impl WalletMmpState {
129    fn new() -> Self {
130        Self {
131            currencies: HashMap::new(),
132        }
133    }
134
135    fn get_or_create_currency(&mut self, currency: &str) -> &mut CurrencyFillWindow {
136        self.currencies
137            .entry(currency.to_string())
138            .or_insert_with(CurrencyFillWindow::new)
139    }
140}
141
/// Main MMP cache for tracking fills and checking limits
///
/// Configuration and per-wallet fill state each live behind an async `RwLock`
/// wrapped in an `Arc`, which lets the background eviction task hold its own
/// handles to both maps.
pub struct MmpCache {
    /// MMP configurations indexed by (wallet_address, currency)
    configs: Arc<RwLock<HashMap<(WalletAddress, String), MmpConfig>>>,

    /// Per-wallet state tracking (one rolling fill window per currency)
    state: Arc<RwLock<HashMap<WalletAddress, WalletMmpState>>>,

    /// Database handler for persistence of MMP configs
    db: Arc<dyn MmpConfigWriter>,

    /// Greeks cache for calculating delta and vega of incoming fills
    greeks_cache: Arc<crate::read_cache::greeks::GreeksCache>,
}
156
157impl MmpCache {
158    /// Create a new MMP cache
159    pub fn new(
160        db: Arc<dyn MmpConfigWriter>,
161        greeks_cache: Arc<crate::read_cache::greeks::GreeksCache>,
162    ) -> Result<Self> {
163        info!("Initializing MmpCache");
164
165        Ok(Self {
166            configs: Arc::new(RwLock::new(HashMap::new())),
167            state: Arc::new(RwLock::new(HashMap::new())),
168            db: db,
169            greeks_cache,
170        })
171    }
172
173    /// Load all MMP configs from database
174    pub async fn load_configs_from_db(&self) -> Result<()> {
175        let configs_list = self
176            .db
177            .get_all_mmp_configs_sync()
178            .context("Failed to load MMP configs from database")?;
179
180        let mut configs = self.configs.write().await;
181        for config in configs_list {
182            let key = (config.wallet_address, config.currency.clone());
183            info!(
184                "Loaded MMP config: wallet={}, currency={}, enabled={}",
185                config.wallet_address, config.currency, config.enabled
186            );
187            configs.insert(key, config);
188        }
189
190        info!("Loaded {} MMP configs from database", configs.len());
191        Ok(())
192    }
193
194    /// Set or update MMP config
195    pub async fn set_config(&self, config: MmpConfig) -> Result<()> {
196        // Save to database
197        self.db
198            .save_mmp_config_sync(&config)
199            .context("Failed to save MMP config to database")?;
200
201        // Load the saved config to get timestamps
202        let saved_config = self
203            .db
204            .get_mmp_config_sync(&config.wallet_address, &config.currency)?
205            .ok_or_else(|| anyhow::anyhow!("Config not found after save"))?;
206
207        // Update in-memory cache
208        let key = (config.wallet_address, config.currency.clone());
209        let mut configs = self.configs.write().await;
210        configs.insert(key, saved_config);
211
212        info!(
213            "MMP config updated: wallet={}, currency={}",
214            config.wallet_address, config.currency
215        );
216        Ok(())
217    }
218
219    /// Get MMP config for wallet and currency
220    pub async fn get_config(&self, wallet: &WalletAddress, currency: &str) -> Option<MmpConfig> {
221        let configs = self.configs.read().await;
222        configs.get(&(*wallet, currency.to_string())).cloned()
223    }
224
225    /// Fast path: Check if MMP is enabled for wallet and currency
226    /// Returns true if MMP config exists and is enabled, false otherwise
227    /// This is faster than calling process_fill() for wallets without MMP
228    pub async fn is_mmp_enabled(&self, wallet: &WalletAddress, currency: &str) -> bool {
229        let configs = self.configs.read().await;
230        if let Some(config) = configs.get(&(*wallet, currency.to_string())) {
231            config.enabled
232        } else {
233            false
234        }
235    }
236
237    /// Get all MMP configs for a wallet
238    pub async fn get_configs_for_wallet(&self, wallet: &WalletAddress) -> Vec<MmpConfig> {
239        let configs = self.configs.read().await;
240        configs
241            .iter()
242            .filter(|(key, _)| &key.0 == wallet)
243            .map(|(_, config)| config.clone())
244            .collect()
245    }
246
247    /// Delete (disable) MMP config
248    pub async fn delete_config(&self, wallet: &WalletAddress, currency: &str) -> Result<()> {
249        // Update database
250        self.db
251            .delete_mmp_config_sync(wallet, currency)
252            .context("Failed to delete MMP config from database")?;
253
254        // Remove from in-memory cache
255        let mut configs = self.configs.write().await;
256        let key = (*wallet, currency.to_string());
257        configs.remove(&key);
258
259        info!(
260            "MMP config deleted: wallet={}, currency={}",
261            wallet, currency
262        );
263        Ok(())
264    }
265
266    /// Reset MMP state for a specific currency (clear fills and unfreeze)
267    pub async fn reset_mmp(&self, wallet: &WalletAddress, currency: &str) {
268        let mut state = self.state.write().await;
269        if let Some(wallet_state) = state.get_mut(wallet) {
270            if let Some(currency_window) = wallet_state.currencies.get_mut(currency) {
271                currency_window.reset();
272                info!("MMP state reset: wallet={}, currency={}", wallet, currency);
273            }
274        }
275    }
276
277    /// Check if wallet+currency is currently frozen
278    pub async fn is_frozen(
279        &self,
280        wallet: &WalletAddress,
281        currency: &str,
282        current_time: u64,
283    ) -> bool {
284        let state = self.state.read().await;
285        if let Some(wallet_state) = state.get(wallet) {
286            if let Some(currency_window) = wallet_state.currencies.get(currency) {
287                return currency_window.is_frozen(current_time);
288            }
289        }
290        false
291    }
292
293    /// Process a fill and check if MMP limits are triggered
294    /// Returns Ok(()) if limits not exceeded, Err with reason if triggered
295    pub async fn process_fill(
296        &self,
297        wallet: &WalletAddress,
298        currency: &str,
299        fill: &Fill,
300        current_time: u64,
301    ) -> Result<(), String> {
302        // Skip perps - MMP only applies to options
303        if ParsedSymbol::from_symbol(&fill.symbol).is_err() {
304            return Ok(());
305        }
306
307        // Get config for this wallet+currency
308        let config = match self.get_config(wallet, currency).await {
309            Some(cfg) if cfg.enabled => cfg,
310            _ => {
311                // No config or disabled, allow fill
312                return Ok(());
313            }
314        };
315
316        // Calculate fill metrics (delta and vega)
317        let (delta, vega) = self
318            .calculate_fill_metrics(fill)
319            .await
320            .map_err(|e| format!("Failed to calculate fill metrics: {}", e))?;
321
322        // Convert Decimal size to u64 contract units for MMP tracking
323        let size_contract_units = to_contract_units_decimal(&fill.symbol, fill.size)
324            .to_u64()
325            .unwrap_or(0);
326        let record = FillRecord {
327            timestamp: fill.timestamp,
328            quantity: size_contract_units,
329            delta,
330            vega,
331        };
332
333        // Get or create wallet state
334        let mut state = self.state.write().await;
335        let wallet_state = state.entry(*wallet).or_insert_with(WalletMmpState::new);
336
337        let currency_window = wallet_state.get_or_create_currency(currency);
338
339        // Check if frozen
340        if currency_window.is_frozen(current_time) {
341            debug!(
342                "MMP: Fill rejected - wallet {} currency {} is frozen",
343                wallet, currency
344            );
345            return Err("MMP triggered - currently frozen".to_string());
346        }
347
348        // Evict old fills from rolling window
349        currency_window.evict_old_fills(current_time, config.interval_ms as u64);
350
351        // Evaluate if adding this fill would breach limits
352        // Convert Decimal limits to i64/f64 for internal MMP logic
353        let breached = currency_window.eval_fill(
354            record,
355            config.qty_limit.and_then(|d| d.to_i64()),
356            config.delta_limit.and_then(|d| d.to_f64()),
357            config.vega_limit.and_then(|d| d.to_f64()),
358        );
359
360        if breached {
361            // Freeze this currency
362            currency_window.set_frozen(current_time, config.frozen_time_ms as u64);
363
364            warn!(
365                "🚨 MMP TRIGGERED: Limit exceeded for wallet={}, currency={} (qty={}, delta={:.2}, vega={:.2})",
366                wallet, currency, currency_window.cumulative_qty, currency_window.cumulative_delta, currency_window.cumulative_vega
367            );
368
369            return Err("MMP limit exceeded".to_string());
370        }
371
372        debug!(
373            "MMP: Fill accepted - wallet={}, currency={}, qty={}, delta={:.2}, vega={:.2}",
374            wallet,
375            currency,
376            currency_window.cumulative_qty,
377            currency_window.cumulative_delta,
378            currency_window.cumulative_vega
379        );
380
381        Ok(())
382    }
383
384    /// Start the automatic eviction task (legacy, no shutdown support).
385    /// This spawns a background task that periodically evicts old fills from all currency windows.
386    /// For graceful shutdown support, use `start_with_shutdown` instead.
387    pub fn start(&self) {
388        // Create a dummy shutdown channel that never fires.
389        // IMPORTANT: We must keep the sender alive, otherwise recv() returns Closed immediately.
390        let (tx, rx) = tokio::sync::broadcast::channel::<()>(1);
391        std::mem::forget(tx);
392        self.start_with_shutdown(rx);
393    }
394
395    /// Start the automatic eviction task with shutdown support.
396    /// This spawns a background task that periodically evicts old fills from all currency windows.
397    /// The task will exit gracefully when the shutdown signal is received.
398    pub fn start_with_shutdown(
399        &self,
400        mut shutdown_rx: tokio::sync::broadcast::Receiver<()>,
401    ) -> tokio::task::JoinHandle<()> {
402        let state = Arc::clone(&self.state);
403        let configs = Arc::clone(&self.configs);
404
405        tokio::spawn(async move {
406            info!("MMP automatic eviction task started");
407            let mut interval = tokio::time::interval(tokio::time::Duration::from_millis(1000));
408
409            loop {
410                tokio::select! {
411                    _ = shutdown_rx.recv() => {
412                        info!("MMP eviction task received shutdown signal");
413                        break;
414                    }
415                    _ = interval.tick() => {
416                        let current_time = std::time::SystemTime::now()
417                            .duration_since(std::time::UNIX_EPOCH)
418                            .unwrap()
419                            .as_millis() as u64;
420
421                        // Get read lock on configs to find all active intervals
422                        let configs_snapshot = configs.read().await;
423                        let mut wallet_currency_intervals: HashMap<(WalletAddress, String), u64> =
424                            HashMap::new();
425
426                        for ((wallet, currency), config) in configs_snapshot.iter() {
427                            if config.enabled {
428                                wallet_currency_intervals
429                                    .insert((*wallet, currency.clone()), config.interval_ms as u64);
430                            }
431                        }
432                        drop(configs_snapshot);
433
434                        // Get write lock on state to evict old fills
435                        let mut state_guard = state.write().await;
436
437                        for ((wallet, currency), interval_ms) in wallet_currency_intervals.iter() {
438                            if let Some(wallet_state) = state_guard.get_mut(wallet) {
439                                if let Some(currency_window) = wallet_state.currencies.get_mut(currency) {
440                                    let before_count = currency_window.fills.len();
441                                    currency_window.evict_old_fills(current_time, *interval_ms);
442                                    let after_count = currency_window.fills.len();
443
444                                    if before_count > after_count {
445                                        debug!(
446                                            "MMP eviction: wallet={}, currency={}, evicted {} fills",
447                                            wallet,
448                                            currency,
449                                            before_count - after_count
450                                        );
451                                    }
452                                }
453                            }
454                        }
455                    }
456                }
457            }
458            info!("MMP eviction task stopped");
459        })
460    }
461
462    /// Calculate delta and vega for a fill using the Greeks cache
463    /// Returns error if Greeks data is not available for the symbol
464    async fn calculate_fill_metrics(&self, fill: &Fill) -> Result<(f64, f64)> {
465        // Get Greeks from cache
466        let greeks = self
467            .greeks_cache
468            .get_greeks(&fill.symbol)
469            .await
470            .context("Greeks not available for symbol")?;
471
472        let quantity_in_contracts = fill.size.to_f64().unwrap_or(0.0);
473        let side_multiplier: f64 = match fill.taker_side {
474            hypercall_types::Side::Buy => 1.0,
475            hypercall_types::Side::Sell => -1.0,
476        };
477
478        // Apply side multiplier to delta and vega
479        let delta = greeks.delta * quantity_in_contracts * side_multiplier;
480        let vega = greeks.vega * quantity_in_contracts * side_multiplier.abs();
481
482        Ok((delta, vega))
483    }
484}
485
486#[async_trait::async_trait]
487impl crate::shared::service::Service for MmpCache {
488    fn name(&self) -> &'static str {
489        "MmpCache"
490    }
491
492    fn owner(&self) -> crate::shared::service::ServiceOwner {
493        crate::shared::service::ServiceOwner::Engine
494    }
495
496    async fn run(
497        self: std::sync::Arc<Self>,
498        shutdown: crate::shared::ShutdownRx,
499    ) -> anyhow::Result<()> {
500        let handle = self.start_with_shutdown(shutdown);
501        handle
502            .await
503            .map_err(|e| anyhow::anyhow!("MmpCache task task failed: {:?}", e))
504    }
505}
506
#[cfg(test)]
mod tests {
    use super::*;

    /// Shorthand for building a `FillRecord`.
    fn fill(timestamp: u64, quantity: u64, delta: f64, vega: f64) -> FillRecord {
        FillRecord {
            timestamp,
            quantity,
            delta,
            vega,
        }
    }

    #[test]
    fn test_currency_fill_window_basic() {
        let mut window = CurrencyFillWindow::new();

        // With no limits configured the fill is recorded and never breaches.
        assert!(!window.eval_fill(fill(1000, 100, 50.0, 10.0), None, None, None));

        assert_eq!(window.cumulative_qty, 100);
        assert_eq!(window.cumulative_delta, 50.0);
        assert_eq!(window.cumulative_vega, 10.0);
    }

    #[test]
    fn test_evict_old_fills() {
        let mut window = CurrencyFillWindow::new();
        window.eval_fill(fill(1000, 100, 50.0, 10.0), None, None, None);
        window.eval_fill(fill(2000, 200, 100.0, 20.0), None, None, None);

        // Cutoff is 3500 - 1500 = 2000; only fills strictly older than that go.
        window.evict_old_fills(3500, 1500);

        // The fill at 1000 is gone, the fill at 2000 remains.
        assert_eq!(window.fills.len(), 1);
        assert_eq!(window.cumulative_qty, 200);
        assert_eq!(window.cumulative_delta, 100.0);
        assert_eq!(window.cumulative_vega, 20.0);
    }

    #[test]
    fn test_frozen_state() {
        let mut window = CurrencyFillWindow::new();

        // A fresh window is not frozen.
        assert!(!window.is_frozen(1000));

        // Freeze for 5000ms starting at 1000, i.e. until 6000.
        window.set_frozen(1000, 5000);

        assert!(window.is_frozen(3000));
        assert!(!window.is_frozen(7000));
    }

    #[test]
    fn test_reset() {
        let mut window = CurrencyFillWindow::new();
        window.eval_fill(fill(1000, 100, 50.0, 10.0), None, None, None);
        window.set_frozen(1000, 5000);

        window.reset();

        assert!(window.fills.is_empty());
        assert_eq!(window.cumulative_qty, 0);
        assert_eq!(window.cumulative_delta, 0.0);
        assert_eq!(window.cumulative_vega, 0.0);
        assert!(!window.is_frozen(3000));
    }

    #[test]
    fn test_eval_fill_with_qty_limit() {
        let mut window = CurrencyFillWindow::new();
        let qty_limit = Some(200);

        // 100 <= 200: within the limit.
        assert!(!window.eval_fill(fill(1000, 100, 50.0, 10.0), qty_limit, None, None));
        assert_eq!(window.cumulative_qty, 100);

        // 100 + 150 = 250 > 200: breach, but the fill is still recorded
        // (add first, then check).
        assert!(window.eval_fill(fill(2000, 150, 75.0, 15.0), qty_limit, None, None));
        assert_eq!(window.cumulative_qty, 250);
    }

    #[test]
    fn test_eval_fill_with_delta_limit() {
        let mut window = CurrencyFillWindow::new();
        let delta_limit = Some(100.0);

        // 50 <= 100: within the limit.
        assert!(!window.eval_fill(fill(1000, 100, 50.0, 10.0), None, delta_limit, None));

        // 50 + 60 = 110 > 100: breach, and the fill is still recorded.
        assert!(window.eval_fill(fill(2000, 100, 60.0, 10.0), None, delta_limit, None));
        assert_eq!(window.cumulative_delta, 110.0);
    }

    #[test]
    fn test_eval_fill_with_vega_limit() {
        let mut window = CurrencyFillWindow::new();
        let vega_limit = Some(20.0);

        // 10 <= 20: within the limit.
        assert!(!window.eval_fill(fill(1000, 100, 50.0, 10.0), None, None, vega_limit));

        // 10 + 15 = 25 > 20: breach, and the fill is still recorded.
        assert!(window.eval_fill(fill(2000, 100, 50.0, 15.0), None, None, vega_limit));
        assert_eq!(window.cumulative_vega, 25.0);
    }
}
716}