1use crate::shared::order_types::ParsedSymbol;
2use anyhow::{Context, Result};
3use hypercall_db::MmpConfigRecord as MmpConfig;
4use hypercall_db::MmpConfigWriter;
5use hypercall_types::Fill;
6use hypercall_types::{to_contract_units_decimal, WalletAddress};
7use rust_decimal::prelude::ToPrimitive;
8use std::collections::{HashMap, VecDeque};
9use std::sync::Arc;
10use tokio::sync::RwLock;
11use tracing::{debug, info, warn};
12
/// One fill tracked inside an MMP rolling window.
#[derive(Debug, Clone)]
struct FillRecord {
    /// Fill time; compared against the eviction cutoff, so it must use the
    /// same clock and unit as the `current_time` passed to the window.
    timestamp: u64,
    /// Quantity added to the window's cumulative quantity.
    quantity: u64,
    /// Signed delta contribution of the fill.
    delta: f64,
    /// Vega contribution of the fill.
    vega: f64,
}

/// Rolling window of fills for one currency, with running totals and an
/// optional freeze deadline.
#[derive(Debug, Clone)]
struct CurrencyFillWindow {
    /// Fills currently inside the window; new fills are appended at the back
    /// and eviction only inspects the front.
    fills: VecDeque<FillRecord>,
    /// Sum of `quantity` over `fills` (saturating).
    cumulative_qty: u64,
    /// Sum of `delta` over `fills`.
    cumulative_delta: f64,
    /// Sum of `vega` over `fills`.
    cumulative_vega: f64,
    /// The window counts as frozen for every time strictly below this value.
    frozen_until: Option<u64>,
}

impl CurrencyFillWindow {
    /// Creates an empty, unfrozen window.
    fn new() -> Self {
        Self {
            fills: VecDeque::new(),
            cumulative_qty: 0,
            cumulative_delta: 0.0,
            cumulative_vega: 0.0,
            frozen_until: None,
        }
    }

    /// Records `record` in the window and reports whether any configured
    /// limit is now exceeded.
    ///
    /// The fill is added to the totals *before* the limits are checked and
    /// stays in the window even when a limit is breached. A limit of `None`
    /// disables that check. Limits are exceeded only by strictly greater
    /// values; delta and vega are compared by absolute value.
    ///
    /// A negative `qty_limit` is exceeded by every quantity. (It used to be
    /// cast to `u64`, which turned it into a huge limit that could never
    /// trigger.) This matches how negative delta/vega limits already behave.
    fn eval_fill(
        &mut self,
        record: FillRecord,
        qty_limit: Option<i64>,
        delta_limit: Option<f64>,
        vega_limit: Option<f64>,
    ) -> bool {
        // Saturate instead of overflowing: an absurdly large total must not
        // panic (debug) or wrap around to a small value (release).
        self.cumulative_qty = self.cumulative_qty.saturating_add(record.quantity);
        self.cumulative_delta += record.delta;
        self.cumulative_vega += record.vega;
        self.fills.push_back(record);

        // Compare in i128 so both the full u64 and the full i64 range are
        // represented exactly.
        let qty_breached = qty_limit
            .map_or(false, |limit| i128::from(self.cumulative_qty) > i128::from(limit));
        let delta_breached =
            delta_limit.map_or(false, |limit| self.cumulative_delta.abs() > limit);
        let vega_breached = vega_limit.map_or(false, |limit| self.cumulative_vega.abs() > limit);

        qty_breached || delta_breached || vega_breached
    }

    /// Drops fills from the front of the window whose timestamp is strictly
    /// older than `current_time - interval_ms` and removes their contribution
    /// from the running totals.
    ///
    /// Eviction stops at the first fill that is still inside the interval, so
    /// it relies on fills having been appended in timestamp order.
    fn evict_old_fills(&mut self, current_time: u64, interval_ms: u64) {
        let cutoff_time = current_time.saturating_sub(interval_ms);

        while self
            .fills
            .front()
            .map_or(false, |oldest| oldest.timestamp < cutoff_time)
        {
            if let Some(expired) = self.fills.pop_front() {
                self.cumulative_qty = self.cumulative_qty.saturating_sub(expired.quantity);
                self.cumulative_delta -= expired.delta;
                self.cumulative_vega -= expired.vega;
            }
        }

        // An empty window has exactly zero exposure. Re-zeroing here discards
        // floating-point rounding residue (and any saturation error in the
        // quantity) instead of carrying it into the next window.
        if self.fills.is_empty() {
            self.cumulative_qty = 0;
            self.cumulative_delta = 0.0;
            self.cumulative_vega = 0.0;
        }
    }

    /// Returns `true` while `current_time` is strictly before the freeze
    /// deadline; `false` when no freeze is set.
    fn is_frozen(&self, current_time: u64) -> bool {
        matches!(self.frozen_until, Some(deadline) if current_time < deadline)
    }

    /// Freezes the window until `current_time + frozen_duration_ms`.
    ///
    /// The deadline saturates at `u64::MAX` rather than overflowing.
    fn set_frozen(&mut self, current_time: u64, frozen_duration_ms: u64) {
        self.frozen_until = Some(current_time.saturating_add(frozen_duration_ms));
    }

    /// Clears all fills, zeroes the totals and lifts any freeze.
    fn reset(&mut self) {
        self.fills.clear();
        self.cumulative_qty = 0;
        self.cumulative_delta = 0.0;
        self.cumulative_vega = 0.0;
        self.frozen_until = None;
    }
}
121
122#[derive(Debug, Clone)]
124struct WalletMmpState {
125 currencies: HashMap<String, CurrencyFillWindow>,
126}
127
128impl WalletMmpState {
129 fn new() -> Self {
130 Self {
131 currencies: HashMap::new(),
132 }
133 }
134
135 fn get_or_create_currency(&mut self, currency: &str) -> &mut CurrencyFillWindow {
136 self.currencies
137 .entry(currency.to_string())
138 .or_insert_with(CurrencyFillWindow::new)
139 }
140}
141
142pub struct MmpCache {
144 configs: Arc<RwLock<HashMap<(WalletAddress, String), MmpConfig>>>,
146
147 state: Arc<RwLock<HashMap<WalletAddress, WalletMmpState>>>,
149
150 db: Arc<dyn MmpConfigWriter>,
152
153 greeks_cache: Arc<crate::read_cache::greeks::GreeksCache>,
155}
156
157impl MmpCache {
158 pub fn new(
160 db: Arc<dyn MmpConfigWriter>,
161 greeks_cache: Arc<crate::read_cache::greeks::GreeksCache>,
162 ) -> Result<Self> {
163 info!("Initializing MmpCache");
164
165 Ok(Self {
166 configs: Arc::new(RwLock::new(HashMap::new())),
167 state: Arc::new(RwLock::new(HashMap::new())),
168 db: db,
169 greeks_cache,
170 })
171 }
172
173 pub async fn load_configs_from_db(&self) -> Result<()> {
175 let configs_list = self
176 .db
177 .get_all_mmp_configs_sync()
178 .context("Failed to load MMP configs from database")?;
179
180 let mut configs = self.configs.write().await;
181 for config in configs_list {
182 let key = (config.wallet_address, config.currency.clone());
183 info!(
184 "Loaded MMP config: wallet={}, currency={}, enabled={}",
185 config.wallet_address, config.currency, config.enabled
186 );
187 configs.insert(key, config);
188 }
189
190 info!("Loaded {} MMP configs from database", configs.len());
191 Ok(())
192 }
193
194 pub async fn set_config(&self, config: MmpConfig) -> Result<()> {
196 self.db
198 .save_mmp_config_sync(&config)
199 .context("Failed to save MMP config to database")?;
200
201 let saved_config = self
203 .db
204 .get_mmp_config_sync(&config.wallet_address, &config.currency)?
205 .ok_or_else(|| anyhow::anyhow!("Config not found after save"))?;
206
207 let key = (config.wallet_address, config.currency.clone());
209 let mut configs = self.configs.write().await;
210 configs.insert(key, saved_config);
211
212 info!(
213 "MMP config updated: wallet={}, currency={}",
214 config.wallet_address, config.currency
215 );
216 Ok(())
217 }
218
219 pub async fn get_config(&self, wallet: &WalletAddress, currency: &str) -> Option<MmpConfig> {
221 let configs = self.configs.read().await;
222 configs.get(&(*wallet, currency.to_string())).cloned()
223 }
224
225 pub async fn is_mmp_enabled(&self, wallet: &WalletAddress, currency: &str) -> bool {
229 let configs = self.configs.read().await;
230 if let Some(config) = configs.get(&(*wallet, currency.to_string())) {
231 config.enabled
232 } else {
233 false
234 }
235 }
236
237 pub async fn get_configs_for_wallet(&self, wallet: &WalletAddress) -> Vec<MmpConfig> {
239 let configs = self.configs.read().await;
240 configs
241 .iter()
242 .filter(|(key, _)| &key.0 == wallet)
243 .map(|(_, config)| config.clone())
244 .collect()
245 }
246
247 pub async fn delete_config(&self, wallet: &WalletAddress, currency: &str) -> Result<()> {
249 self.db
251 .delete_mmp_config_sync(wallet, currency)
252 .context("Failed to delete MMP config from database")?;
253
254 let mut configs = self.configs.write().await;
256 let key = (*wallet, currency.to_string());
257 configs.remove(&key);
258
259 info!(
260 "MMP config deleted: wallet={}, currency={}",
261 wallet, currency
262 );
263 Ok(())
264 }
265
266 pub async fn reset_mmp(&self, wallet: &WalletAddress, currency: &str) {
268 let mut state = self.state.write().await;
269 if let Some(wallet_state) = state.get_mut(wallet) {
270 if let Some(currency_window) = wallet_state.currencies.get_mut(currency) {
271 currency_window.reset();
272 info!("MMP state reset: wallet={}, currency={}", wallet, currency);
273 }
274 }
275 }
276
277 pub async fn is_frozen(
279 &self,
280 wallet: &WalletAddress,
281 currency: &str,
282 current_time: u64,
283 ) -> bool {
284 let state = self.state.read().await;
285 if let Some(wallet_state) = state.get(wallet) {
286 if let Some(currency_window) = wallet_state.currencies.get(currency) {
287 return currency_window.is_frozen(current_time);
288 }
289 }
290 false
291 }
292
293 pub async fn process_fill(
296 &self,
297 wallet: &WalletAddress,
298 currency: &str,
299 fill: &Fill,
300 current_time: u64,
301 ) -> Result<(), String> {
302 if ParsedSymbol::from_symbol(&fill.symbol).is_err() {
304 return Ok(());
305 }
306
307 let config = match self.get_config(wallet, currency).await {
309 Some(cfg) if cfg.enabled => cfg,
310 _ => {
311 return Ok(());
313 }
314 };
315
316 let (delta, vega) = self
318 .calculate_fill_metrics(fill)
319 .await
320 .map_err(|e| format!("Failed to calculate fill metrics: {}", e))?;
321
322 let size_contract_units = to_contract_units_decimal(&fill.symbol, fill.size)
324 .to_u64()
325 .unwrap_or(0);
326 let record = FillRecord {
327 timestamp: fill.timestamp,
328 quantity: size_contract_units,
329 delta,
330 vega,
331 };
332
333 let mut state = self.state.write().await;
335 let wallet_state = state.entry(*wallet).or_insert_with(WalletMmpState::new);
336
337 let currency_window = wallet_state.get_or_create_currency(currency);
338
339 if currency_window.is_frozen(current_time) {
341 debug!(
342 "MMP: Fill rejected - wallet {} currency {} is frozen",
343 wallet, currency
344 );
345 return Err("MMP triggered - currently frozen".to_string());
346 }
347
348 currency_window.evict_old_fills(current_time, config.interval_ms as u64);
350
351 let breached = currency_window.eval_fill(
354 record,
355 config.qty_limit.and_then(|d| d.to_i64()),
356 config.delta_limit.and_then(|d| d.to_f64()),
357 config.vega_limit.and_then(|d| d.to_f64()),
358 );
359
360 if breached {
361 currency_window.set_frozen(current_time, config.frozen_time_ms as u64);
363
364 warn!(
365 "🚨 MMP TRIGGERED: Limit exceeded for wallet={}, currency={} (qty={}, delta={:.2}, vega={:.2})",
366 wallet, currency, currency_window.cumulative_qty, currency_window.cumulative_delta, currency_window.cumulative_vega
367 );
368
369 return Err("MMP limit exceeded".to_string());
370 }
371
372 debug!(
373 "MMP: Fill accepted - wallet={}, currency={}, qty={}, delta={:.2}, vega={:.2}",
374 wallet,
375 currency,
376 currency_window.cumulative_qty,
377 currency_window.cumulative_delta,
378 currency_window.cumulative_vega
379 );
380
381 Ok(())
382 }
383
384 pub fn start(&self) {
388 let (tx, rx) = tokio::sync::broadcast::channel::<()>(1);
391 std::mem::forget(tx);
392 self.start_with_shutdown(rx);
393 }
394
395 pub fn start_with_shutdown(
399 &self,
400 mut shutdown_rx: tokio::sync::broadcast::Receiver<()>,
401 ) -> tokio::task::JoinHandle<()> {
402 let state = Arc::clone(&self.state);
403 let configs = Arc::clone(&self.configs);
404
405 tokio::spawn(async move {
406 info!("MMP automatic eviction task started");
407 let mut interval = tokio::time::interval(tokio::time::Duration::from_millis(1000));
408
409 loop {
410 tokio::select! {
411 _ = shutdown_rx.recv() => {
412 info!("MMP eviction task received shutdown signal");
413 break;
414 }
415 _ = interval.tick() => {
416 let current_time = std::time::SystemTime::now()
417 .duration_since(std::time::UNIX_EPOCH)
418 .unwrap()
419 .as_millis() as u64;
420
421 let configs_snapshot = configs.read().await;
423 let mut wallet_currency_intervals: HashMap<(WalletAddress, String), u64> =
424 HashMap::new();
425
426 for ((wallet, currency), config) in configs_snapshot.iter() {
427 if config.enabled {
428 wallet_currency_intervals
429 .insert((*wallet, currency.clone()), config.interval_ms as u64);
430 }
431 }
432 drop(configs_snapshot);
433
434 let mut state_guard = state.write().await;
436
437 for ((wallet, currency), interval_ms) in wallet_currency_intervals.iter() {
438 if let Some(wallet_state) = state_guard.get_mut(wallet) {
439 if let Some(currency_window) = wallet_state.currencies.get_mut(currency) {
440 let before_count = currency_window.fills.len();
441 currency_window.evict_old_fills(current_time, *interval_ms);
442 let after_count = currency_window.fills.len();
443
444 if before_count > after_count {
445 debug!(
446 "MMP eviction: wallet={}, currency={}, evicted {} fills",
447 wallet,
448 currency,
449 before_count - after_count
450 );
451 }
452 }
453 }
454 }
455 }
456 }
457 }
458 info!("MMP eviction task stopped");
459 })
460 }
461
462 async fn calculate_fill_metrics(&self, fill: &Fill) -> Result<(f64, f64)> {
465 let greeks = self
467 .greeks_cache
468 .get_greeks(&fill.symbol)
469 .await
470 .context("Greeks not available for symbol")?;
471
472 let quantity_in_contracts = fill.size.to_f64().unwrap_or(0.0);
473 let side_multiplier: f64 = match fill.taker_side {
474 hypercall_types::Side::Buy => 1.0,
475 hypercall_types::Side::Sell => -1.0,
476 };
477
478 let delta = greeks.delta * quantity_in_contracts * side_multiplier;
480 let vega = greeks.vega * quantity_in_contracts * side_multiplier.abs();
481
482 Ok((delta, vega))
483 }
484}
485
486#[async_trait::async_trait]
487impl crate::shared::service::Service for MmpCache {
488 fn name(&self) -> &'static str {
489 "MmpCache"
490 }
491
492 fn owner(&self) -> crate::shared::service::ServiceOwner {
493 crate::shared::service::ServiceOwner::Engine
494 }
495
496 async fn run(
497 self: std::sync::Arc<Self>,
498 shutdown: crate::shared::ShutdownRx,
499 ) -> anyhow::Result<()> {
500 let handle = self.start_with_shutdown(shutdown);
501 handle
502 .await
503 .map_err(|e| anyhow::anyhow!("MmpCache task task failed: {:?}", e))
504 }
505}
506
#[cfg(test)]
mod tests {
    use super::*;

    /// Shorthand for building a `FillRecord`.
    fn fill(timestamp: u64, quantity: u64, delta: f64, vega: f64) -> FillRecord {
        FillRecord {
            timestamp,
            quantity,
            delta,
            vega,
        }
    }

    #[test]
    fn test_currency_fill_window_basic() {
        let mut window = CurrencyFillWindow::new();

        // With no limits configured a fill can never breach.
        let breached = window.eval_fill(fill(1000, 100, 50.0, 10.0), None, None, None);
        assert!(!breached);

        assert_eq!(window.cumulative_qty, 100);
        assert_eq!(window.cumulative_delta, 50.0);
        assert_eq!(window.cumulative_vega, 10.0);
    }

    #[test]
    fn test_evict_old_fills() {
        let mut window = CurrencyFillWindow::new();

        window.eval_fill(fill(1000, 100, 50.0, 10.0), None, None, None);
        window.eval_fill(fill(2000, 200, 100.0, 20.0), None, None, None);

        // Cutoff is 3500 - 1500 = 2000: the fill at 1000 is evicted, the fill
        // exactly at the cutoff stays.
        window.evict_old_fills(3500, 1500);

        assert_eq!(window.cumulative_qty, 200);
        assert_eq!(window.cumulative_delta, 100.0);
        assert_eq!(window.cumulative_vega, 20.0);
        assert_eq!(window.fills.len(), 1);
    }

    #[test]
    fn test_frozen_state() {
        let mut window = CurrencyFillWindow::new();

        // Not frozen until a freeze is set.
        assert!(!window.is_frozen(1000));

        // Frozen until 1000 + 5000 = 6000.
        window.set_frozen(1000, 5000);

        assert!(window.is_frozen(3000));
        assert!(!window.is_frozen(7000));
    }

    #[test]
    fn test_reset() {
        let mut window = CurrencyFillWindow::new();

        window.eval_fill(fill(1000, 100, 50.0, 10.0), None, None, None);
        window.set_frozen(1000, 5000);

        window.reset();

        assert_eq!(window.cumulative_qty, 0);
        assert_eq!(window.cumulative_delta, 0.0);
        assert_eq!(window.cumulative_vega, 0.0);
        assert!(window.fills.is_empty());
        assert!(!window.is_frozen(3000));
    }

    #[test]
    fn test_eval_fill_with_qty_limit() {
        let mut window = CurrencyFillWindow::new();
        let qty_limit = Some(200);

        // 100 <= 200: within the limit.
        let breached = window.eval_fill(fill(1000, 100, 50.0, 10.0), qty_limit, None, None);
        assert!(!breached);
        assert_eq!(window.cumulative_qty, 100);

        // 100 + 150 = 250 > 200: breached, and the fill is still counted.
        let breached = window.eval_fill(fill(2000, 150, 75.0, 15.0), qty_limit, None, None);
        assert!(breached);
        assert_eq!(window.cumulative_qty, 250);
    }

    #[test]
    fn test_eval_fill_with_delta_limit() {
        let mut window = CurrencyFillWindow::new();
        let delta_limit = Some(100.0);

        // 50 <= 100: within the limit.
        let breached = window.eval_fill(fill(1000, 100, 50.0, 10.0), None, delta_limit, None);
        assert!(!breached);

        // 50 + 60 = 110 > 100: breached.
        let breached = window.eval_fill(fill(2000, 100, 60.0, 10.0), None, delta_limit, None);
        assert!(breached);
        assert_eq!(window.cumulative_delta, 110.0);
    }

    #[test]
    fn test_eval_fill_with_vega_limit() {
        let mut window = CurrencyFillWindow::new();
        let vega_limit = Some(20.0);

        // 10 <= 20: within the limit.
        let breached = window.eval_fill(fill(1000, 100, 50.0, 10.0), None, None, vega_limit);
        assert!(!breached);

        // 10 + 15 = 25 > 20: breached.
        let breached = window.eval_fill(fill(2000, 100, 50.0, 15.0), None, None, vega_limit);
        assert!(breached);
        assert_eq!(window.cumulative_vega, 25.0);
    }
}