Skip to main content

hypercall_api/rfq/
indicative_quote_cache.rs

1use dashmap::DashMap;
2use hypercall_types::WalletAddress;
3use rust_decimal::Decimal;
4use std::sync::Arc;
5
6/// Entry for a single QP's indicative quote on a single instrument.
7/// Prices are the only source of truth — implied vol is derived
8/// downstream by a canonical pricer when needed.
9#[derive(Debug, Clone)]
10pub struct IndicativeQuoteEntry {
11    pub instrument: String,
12    pub bid_price: Decimal,
13    pub ask_price: Decimal,
14    pub max_bid_size: Decimal,
15    pub max_ask_size: Decimal,
16    pub updated_at: u64,
17}
18
19/// Aggregated indicative quote across all QPs for a single instrument.
20#[derive(Debug, Clone)]
21pub struct AggregatedIndicativeQuote {
22    pub instrument: String,
23    pub best_bid: Decimal,
24    pub best_ask: Decimal,
25    pub indicative_bid_size: Decimal,
26    pub indicative_ask_size: Decimal,
27    pub num_providers: u32,
28    pub updated_at: u64,
29}
30
31/// In-memory cache of indicative quotes from all QPs, with TTL eviction
32/// and aggregation into best-bid/best-ask per instrument.
33pub struct IndicativeQuoteCache {
34    /// Per-QP per-instrument quotes: (qp_wallet, instrument) -> entry
35    quotes: DashMap<(WalletAddress, String), IndicativeQuoteEntry>,
36    /// Aggregated best quotes per instrument
37    aggregates: DashMap<String, AggregatedIndicativeQuote>,
38    /// TTL in milliseconds for stale quote eviction
39    ttl_ms: u64,
40}
41
42impl IndicativeQuoteCache {
43    pub fn new(ttl_ms: u64) -> Self {
44        Self {
45            quotes: DashMap::new(),
46            aggregates: DashMap::new(),
47            ttl_ms,
48        }
49    }
50
51    /// Update quotes from a QP with a full snapshot. Upserts every
52    /// instrument in `entries` and evicts any previously-seen instrument
53    /// for this QP that is NOT present in the new batch, since `rfq-mm`
54    /// streams periodic snapshots and silently omits instruments when it
55    /// can no longer price them. Without the eviction, the aggregate
56    /// would keep serving stale prices until TTL eviction fires much
57    /// later, so `/options-summary` could show an old quote long after
58    /// the QP stopped supporting that instrument.
59    pub fn update(
60        &self,
61        qp_wallet: WalletAddress,
62        entries: Vec<IndicativeQuoteEntry>,
63    ) -> Vec<(String, Option<AggregatedIndicativeQuote>)> {
64        use std::collections::HashSet;
65
66        let incoming: HashSet<String> = entries.iter().map(|e| e.instrument.clone()).collect();
67        let mut affected_instruments: Vec<String> = Vec::new();
68
69        // Evict this QP's previous entries that are absent from the new snapshot.
70        self.quotes.retain(|(wallet, instrument), _| {
71            if wallet == &qp_wallet && !incoming.contains(instrument) {
72                if !affected_instruments.contains(instrument) {
73                    affected_instruments.push(instrument.clone());
74                }
75                false
76            } else {
77                true
78            }
79        });
80
81        // Upsert the new snapshot.
82        for entry in entries {
83            let instrument = entry.instrument.clone();
84            self.quotes.insert((qp_wallet, instrument.clone()), entry);
85            if !affected_instruments.contains(&instrument) {
86                affected_instruments.push(instrument);
87            }
88        }
89
90        affected_instruments
91            .into_iter()
92            .map(|instrument| {
93                let aggregate = self.recompute_aggregate(&instrument);
94                (instrument, aggregate)
95            })
96            .collect()
97    }
98
99    /// Evict all quotes from a disconnected QP.
100    pub fn evict_qp(
101        &self,
102        qp_wallet: &WalletAddress,
103    ) -> Vec<(String, Option<AggregatedIndicativeQuote>)> {
104        let mut affected_instruments = Vec::new();
105
106        self.quotes.retain(|(wallet, instrument), _| {
107            if wallet == qp_wallet {
108                if !affected_instruments.contains(instrument) {
109                    affected_instruments.push(instrument.clone());
110                }
111                false
112            } else {
113                true
114            }
115        });
116
117        affected_instruments
118            .into_iter()
119            .map(|instrument| {
120                let aggregate = self.recompute_aggregate(&instrument);
121                (instrument, aggregate)
122            })
123            .collect()
124    }
125
126    /// Get the aggregated quote for an instrument.
127    pub fn get_aggregate(&self, instrument: &str) -> Option<AggregatedIndicativeQuote> {
128        self.aggregates.get(instrument).map(|r| r.clone())
129    }
130
131    /// Get all aggregated quotes.
132    pub fn get_all_aggregates(&self) -> Vec<AggregatedIndicativeQuote> {
133        self.aggregates.iter().map(|r| r.value().clone()).collect()
134    }
135
136    /// Start a background task that evicts stale quotes.
137    pub fn start_ttl_eviction(
138        self: &Arc<Self>,
139        mut shutdown: tokio::sync::broadcast::Receiver<()>,
140    ) {
141        let cache = Arc::clone(self);
142        tokio::spawn(async move {
143            let mut interval = tokio::time::interval(std::time::Duration::from_secs(1));
144            loop {
145                tokio::select! {
146                    _ = shutdown.recv() => break,
147                    _ = interval.tick() => {
148                        cache.evict_stale();
149                    }
150                }
151            }
152        });
153    }
154
155    /// Evict entries older than TTL.
156    fn evict_stale(&self) {
157        let now = get_timestamp_millis();
158        let mut affected_instruments = Vec::new();
159
160        self.quotes.retain(|(_, instrument), entry| {
161            if now.saturating_sub(entry.updated_at) > self.ttl_ms {
162                if !affected_instruments.contains(instrument) {
163                    affected_instruments.push(instrument.clone());
164                }
165                false
166            } else {
167                true
168            }
169        });
170
171        for instrument in affected_instruments {
172            self.recompute_aggregate(&instrument);
173        }
174    }
175
176    /// Recompute the aggregate for a single instrument from all QP entries.
177    fn recompute_aggregate(&self, instrument: &str) -> Option<AggregatedIndicativeQuote> {
178        let mut best_bid = Decimal::ZERO;
179        let mut best_ask = Decimal::MAX;
180        let mut bid_size_at_best = Decimal::ZERO;
181        let mut ask_size_at_best = Decimal::ZERO;
182        let mut num_providers = 0u32;
183        let mut latest_update = 0u64;
184        let mut has_entries = false;
185
186        for entry in self.quotes.iter() {
187            let ((_, inst), quote) = entry.pair();
188            if inst != instrument {
189                continue;
190            }
191            has_entries = true;
192            num_providers += 1;
193
194            if quote.updated_at > latest_update {
195                latest_update = quote.updated_at;
196            }
197
198            if quote.bid_price > best_bid {
199                best_bid = quote.bid_price;
200                bid_size_at_best = quote.max_bid_size;
201            } else if quote.bid_price == best_bid {
202                bid_size_at_best += quote.max_bid_size;
203            }
204
205            if quote.ask_price < best_ask {
206                best_ask = quote.ask_price;
207                ask_size_at_best = quote.max_ask_size;
208            } else if quote.ask_price == best_ask {
209                ask_size_at_best += quote.max_ask_size;
210            }
211        }
212
213        if !has_entries {
214            self.aggregates.remove(instrument);
215            return None;
216        }
217
218        let aggregate = AggregatedIndicativeQuote {
219            instrument: instrument.to_string(),
220            best_bid,
221            best_ask,
222            indicative_bid_size: bid_size_at_best,
223            indicative_ask_size: ask_size_at_best,
224            num_providers,
225            updated_at: latest_update,
226        };
227
228        self.aggregates
229            .insert(instrument.to_string(), aggregate.clone());
230        Some(aggregate)
231    }
232}
233
/// Current wall-clock time as milliseconds since the Unix epoch.
///
/// Returns 0 if the system clock reports a time before the epoch.
fn get_timestamp_millis() -> u64 {
    use std::time::{SystemTime, UNIX_EPOCH};

    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_millis() as u64,
        Err(_) => 0,
    }
}
240
#[cfg(test)]
mod tests {
    use super::*;
    use rust_decimal_macros::dec;

    /// Instrument used by most tests.
    const ETH_3000: &str = "ETH-C-3000";
    /// Second instrument, used by the snapshot-eviction test.
    const ETH_3100: &str = "ETH-C-3100";

    /// Build a deterministic wallet whose last address byte is `id`.
    fn test_wallet(id: u8) -> WalletAddress {
        let mut raw = [0u8; 20];
        raw[19] = id;
        let address = alloy::primitives::Address::from(raw);
        WalletAddress::from(address)
    }

    /// Build an entry with the given prices and timestamp and a size of 10 on
    /// both sides.
    fn make_entry(instrument: &str, bid: Decimal, ask: Decimal, ts: u64) -> IndicativeQuoteEntry {
        let size = dec!(10);
        IndicativeQuoteEntry {
            instrument: instrument.to_string(),
            bid_price: bid,
            ask_price: ask,
            max_bid_size: size,
            max_ask_size: size,
            updated_at: ts,
        }
    }

    /// Send a one-instrument snapshot on behalf of the wallet with id `wallet_id`.
    fn push_one(
        cache: &IndicativeQuoteCache,
        wallet_id: u8,
        instrument: &str,
        bid: Decimal,
        ask: Decimal,
        ts: u64,
    ) -> Vec<(String, Option<AggregatedIndicativeQuote>)> {
        cache.update(
            test_wallet(wallet_id),
            vec![make_entry(instrument, bid, ask, ts)],
        )
    }

    #[test]
    fn test_single_qp_update() {
        let cache = IndicativeQuoteCache::new(10_000);
        let ts = get_timestamp_millis();

        push_one(&cache, 1, ETH_3000, dec!(120), dec!(125), ts);

        let agg = cache.get_aggregate(ETH_3000).unwrap();
        assert_eq!(agg.best_bid, dec!(120));
        assert_eq!(agg.best_ask, dec!(125));
        assert_eq!(agg.num_providers, 1);
    }

    #[test]
    fn test_multiple_qps_aggregation() {
        let cache = IndicativeQuoteCache::new(10_000);
        let ts = get_timestamp_millis();

        // QP1 quotes 120 / 125; QP2 improves both sides with 121 / 124.
        push_one(&cache, 1, ETH_3000, dec!(120), dec!(125), ts);
        push_one(&cache, 2, ETH_3000, dec!(121), dec!(124), ts);

        let agg = cache.get_aggregate(ETH_3000).unwrap();
        assert_eq!(agg.best_bid, dec!(121));
        assert_eq!(agg.best_ask, dec!(124));
        assert_eq!(agg.num_providers, 2);
        // Only QP2 sits at the best price on either side.
        assert_eq!(agg.indicative_bid_size, dec!(10));
        assert_eq!(agg.indicative_ask_size, dec!(10));
    }

    #[test]
    fn test_sizes_sum_at_same_price() {
        let cache = IndicativeQuoteCache::new(10_000);
        let ts = get_timestamp_millis();

        // Two QPs with identical prices.
        for wallet_id in [1u8, 2u8] {
            push_one(&cache, wallet_id, ETH_3000, dec!(120), dec!(125), ts);
        }

        let agg = cache.get_aggregate(ETH_3000).unwrap();
        assert_eq!(agg.best_bid, dec!(120));
        // Sizes add up: 10 + 10.
        assert_eq!(agg.indicative_bid_size, dec!(20));
        assert_eq!(agg.indicative_ask_size, dec!(20));
    }

    #[test]
    fn test_qp_disconnect_evicts() {
        let cache = IndicativeQuoteCache::new(10_000);
        let ts = get_timestamp_millis();

        push_one(&cache, 1, ETH_3000, dec!(120), dec!(125), ts);
        push_one(&cache, 2, ETH_3000, dec!(121), dec!(124), ts);

        // QP2 disconnects.
        let updates = cache.evict_qp(&test_wallet(2));

        let agg = cache.get_aggregate(ETH_3000).unwrap();
        assert_eq!(updates.len(), 1);
        assert!(updates[0].1.is_some());
        // The aggregate falls back to QP1's quote.
        assert_eq!(agg.best_bid, dec!(120));
        assert_eq!(agg.best_ask, dec!(125));
        assert_eq!(agg.num_providers, 1);
    }

    #[test]
    fn test_evict_last_qp_removes_aggregate() {
        let cache = IndicativeQuoteCache::new(10_000);
        let ts = get_timestamp_millis();

        push_one(&cache, 1, ETH_3000, dec!(120), dec!(125), ts);

        let updates = cache.evict_qp(&test_wallet(1));

        assert_eq!(updates.len(), 1);
        assert!(updates[0].1.is_none());
        assert!(cache.get_aggregate(ETH_3000).is_none());
    }

    #[test]
    fn test_empty_snapshot_returns_removal_update() {
        let cache = IndicativeQuoteCache::new(10_000);
        let ts = get_timestamp_millis();

        push_one(&cache, 1, ETH_3000, dec!(120), dec!(125), ts);

        // The same QP now sends an empty snapshot.
        let updates = cache.update(test_wallet(1), vec![]);

        assert_eq!(updates.len(), 1);
        assert_eq!(updates[0].0, "ETH-C-3000");
        assert!(updates[0].1.is_none());
        assert!(cache.get_aggregate(ETH_3000).is_none());
    }

    #[test]
    fn test_ttl_eviction() {
        let cache = IndicativeQuoteCache::new(5_000);
        // Stamped 10s in the past, beyond the 5s TTL.
        let stale_ts = get_timestamp_millis() - 10_000;

        push_one(&cache, 1, ETH_3000, dec!(120), dec!(125), stale_ts);

        // The entry is stale but still present until a sweep runs.
        assert!(cache.get_aggregate(ETH_3000).is_some());

        cache.evict_stale();

        assert!(cache.get_aggregate(ETH_3000).is_none());
    }

    #[test]
    fn test_update_evicts_symbols_missing_from_new_snapshot() {
        let cache = IndicativeQuoteCache::new(10_000);
        let ts = get_timestamp_millis();

        // First snapshot: QP1 quotes two instruments.
        let first_snapshot = vec![
            make_entry(ETH_3000, dec!(120), dec!(125), ts),
            make_entry(ETH_3100, dec!(90), dec!(95), ts),
        ];
        cache.update(test_wallet(1), first_snapshot);
        assert!(cache.get_aggregate(ETH_3000).is_some());
        assert!(cache.get_aggregate(ETH_3100).is_some());

        // Second snapshot omits ETH-C-3100 (e.g. QP can no longer price it).
        // The old entry must be evicted so the aggregate disappears.
        push_one(&cache, 1, ETH_3000, dec!(121), dec!(126), ts + 1);
        assert!(cache.get_aggregate(ETH_3000).is_some());
        assert!(
            cache.get_aggregate(ETH_3100).is_none(),
            "ETH-C-3100 should be evicted when absent from new snapshot"
        );

        // Other QPs' quotes for the evicted instrument must NOT be touched.
        push_one(&cache, 2, ETH_3100, dec!(88), dec!(93), ts + 2);
        // QP1 sends another snapshot missing ETH-C-3100 — QP2's entry remains.
        push_one(&cache, 1, ETH_3000, dec!(122), dec!(127), ts + 3);
        assert!(
            cache.get_aggregate(ETH_3100).is_some(),
            "QP2's ETH-C-3100 entry must survive QP1's snapshot update"
        );
    }

    #[test]
    fn test_get_all_aggregates() {
        let cache = IndicativeQuoteCache::new(10_000);
        let ts = get_timestamp_millis();

        let snapshot = vec![
            make_entry(ETH_3000, dec!(120), dec!(125), ts),
            make_entry("BTC-C-100000", dec!(5000), dec!(5100), ts),
        ];
        cache.update(test_wallet(1), snapshot);

        let all = cache.get_all_aggregates();
        assert_eq!(all.len(), 2);
    }
}