1use dashmap::DashMap;
2use hypercall_types::WalletAddress;
3use rust_decimal::Decimal;
4use std::sync::Arc;
5
6#[derive(Debug, Clone)]
10pub struct IndicativeQuoteEntry {
11 pub instrument: String,
12 pub bid_price: Decimal,
13 pub ask_price: Decimal,
14 pub max_bid_size: Decimal,
15 pub max_ask_size: Decimal,
16 pub updated_at: u64,
17}
18
19#[derive(Debug, Clone)]
21pub struct AggregatedIndicativeQuote {
22 pub instrument: String,
23 pub best_bid: Decimal,
24 pub best_ask: Decimal,
25 pub indicative_bid_size: Decimal,
26 pub indicative_ask_size: Decimal,
27 pub num_providers: u32,
28 pub updated_at: u64,
29}
30
31pub struct IndicativeQuoteCache {
34 quotes: DashMap<(WalletAddress, String), IndicativeQuoteEntry>,
36 aggregates: DashMap<String, AggregatedIndicativeQuote>,
38 ttl_ms: u64,
40}
41
42impl IndicativeQuoteCache {
43 pub fn new(ttl_ms: u64) -> Self {
44 Self {
45 quotes: DashMap::new(),
46 aggregates: DashMap::new(),
47 ttl_ms,
48 }
49 }
50
51 pub fn update(
60 &self,
61 qp_wallet: WalletAddress,
62 entries: Vec<IndicativeQuoteEntry>,
63 ) -> Vec<(String, Option<AggregatedIndicativeQuote>)> {
64 use std::collections::HashSet;
65
66 let incoming: HashSet<String> = entries.iter().map(|e| e.instrument.clone()).collect();
67 let mut affected_instruments: Vec<String> = Vec::new();
68
69 self.quotes.retain(|(wallet, instrument), _| {
71 if wallet == &qp_wallet && !incoming.contains(instrument) {
72 if !affected_instruments.contains(instrument) {
73 affected_instruments.push(instrument.clone());
74 }
75 false
76 } else {
77 true
78 }
79 });
80
81 for entry in entries {
83 let instrument = entry.instrument.clone();
84 self.quotes.insert((qp_wallet, instrument.clone()), entry);
85 if !affected_instruments.contains(&instrument) {
86 affected_instruments.push(instrument);
87 }
88 }
89
90 affected_instruments
91 .into_iter()
92 .map(|instrument| {
93 let aggregate = self.recompute_aggregate(&instrument);
94 (instrument, aggregate)
95 })
96 .collect()
97 }
98
99 pub fn evict_qp(
101 &self,
102 qp_wallet: &WalletAddress,
103 ) -> Vec<(String, Option<AggregatedIndicativeQuote>)> {
104 let mut affected_instruments = Vec::new();
105
106 self.quotes.retain(|(wallet, instrument), _| {
107 if wallet == qp_wallet {
108 if !affected_instruments.contains(instrument) {
109 affected_instruments.push(instrument.clone());
110 }
111 false
112 } else {
113 true
114 }
115 });
116
117 affected_instruments
118 .into_iter()
119 .map(|instrument| {
120 let aggregate = self.recompute_aggregate(&instrument);
121 (instrument, aggregate)
122 })
123 .collect()
124 }
125
126 pub fn get_aggregate(&self, instrument: &str) -> Option<AggregatedIndicativeQuote> {
128 self.aggregates.get(instrument).map(|r| r.clone())
129 }
130
131 pub fn get_all_aggregates(&self) -> Vec<AggregatedIndicativeQuote> {
133 self.aggregates.iter().map(|r| r.value().clone()).collect()
134 }
135
136 pub fn start_ttl_eviction(
138 self: &Arc<Self>,
139 mut shutdown: tokio::sync::broadcast::Receiver<()>,
140 ) {
141 let cache = Arc::clone(self);
142 tokio::spawn(async move {
143 let mut interval = tokio::time::interval(std::time::Duration::from_secs(1));
144 loop {
145 tokio::select! {
146 _ = shutdown.recv() => break,
147 _ = interval.tick() => {
148 cache.evict_stale();
149 }
150 }
151 }
152 });
153 }
154
155 fn evict_stale(&self) {
157 let now = get_timestamp_millis();
158 let mut affected_instruments = Vec::new();
159
160 self.quotes.retain(|(_, instrument), entry| {
161 if now.saturating_sub(entry.updated_at) > self.ttl_ms {
162 if !affected_instruments.contains(instrument) {
163 affected_instruments.push(instrument.clone());
164 }
165 false
166 } else {
167 true
168 }
169 });
170
171 for instrument in affected_instruments {
172 self.recompute_aggregate(&instrument);
173 }
174 }
175
176 fn recompute_aggregate(&self, instrument: &str) -> Option<AggregatedIndicativeQuote> {
178 let mut best_bid = Decimal::ZERO;
179 let mut best_ask = Decimal::MAX;
180 let mut bid_size_at_best = Decimal::ZERO;
181 let mut ask_size_at_best = Decimal::ZERO;
182 let mut num_providers = 0u32;
183 let mut latest_update = 0u64;
184 let mut has_entries = false;
185
186 for entry in self.quotes.iter() {
187 let ((_, inst), quote) = entry.pair();
188 if inst != instrument {
189 continue;
190 }
191 has_entries = true;
192 num_providers += 1;
193
194 if quote.updated_at > latest_update {
195 latest_update = quote.updated_at;
196 }
197
198 if quote.bid_price > best_bid {
199 best_bid = quote.bid_price;
200 bid_size_at_best = quote.max_bid_size;
201 } else if quote.bid_price == best_bid {
202 bid_size_at_best += quote.max_bid_size;
203 }
204
205 if quote.ask_price < best_ask {
206 best_ask = quote.ask_price;
207 ask_size_at_best = quote.max_ask_size;
208 } else if quote.ask_price == best_ask {
209 ask_size_at_best += quote.max_ask_size;
210 }
211 }
212
213 if !has_entries {
214 self.aggregates.remove(instrument);
215 return None;
216 }
217
218 let aggregate = AggregatedIndicativeQuote {
219 instrument: instrument.to_string(),
220 best_bid,
221 best_ask,
222 indicative_bid_size: bid_size_at_best,
223 indicative_ask_size: ask_size_at_best,
224 num_providers,
225 updated_at: latest_update,
226 };
227
228 self.aggregates
229 .insert(instrument.to_string(), aggregate.clone());
230 Some(aggregate)
231 }
232}
233
/// Returns the current wall-clock time as milliseconds since the Unix epoch.
///
/// Yields `0` when the system clock reads earlier than the epoch.
fn get_timestamp_millis() -> u64 {
    use std::time::{SystemTime, UNIX_EPOCH};

    match SystemTime::now().duration_since(UNIX_EPOCH) {
        // `as_millis` is a u128; the cast keeps the low 64 bits.
        Ok(since_epoch) => since_epoch.as_millis() as u64,
        Err(_) => 0,
    }
}
240
#[cfg(test)]
mod tests {
    use super::*;
    use rust_decimal_macros::dec;

    const ETH_3000: &str = "ETH-C-3000";
    const ETH_3100: &str = "ETH-C-3100";
    const BTC_100K: &str = "BTC-C-100000";

    /// Cache with the 10 s TTL most tests use.
    fn fresh_cache() -> IndicativeQuoteCache {
        IndicativeQuoteCache::new(10_000)
    }

    /// Wallet whose 20-byte address is all zeroes except the last byte.
    fn test_wallet(id: u8) -> WalletAddress {
        let mut raw = [0u8; 20];
        raw[19] = id;
        let address = alloy::primitives::Address::from(raw);
        WalletAddress::from(address)
    }

    /// Quote with the given prices and a fixed size of 10 on both sides.
    fn make_entry(instrument: &str, bid: Decimal, ask: Decimal, ts: u64) -> IndicativeQuoteEntry {
        let size = dec!(10);
        IndicativeQuoteEntry {
            instrument: instrument.to_string(),
            bid_price: bid,
            ask_price: ask,
            max_bid_size: size,
            max_ask_size: size,
            updated_at: ts,
        }
    }

    #[test]
    fn test_single_qp_update() {
        let cache = fresh_cache();
        let provider = test_wallet(1);
        let ts = get_timestamp_millis();

        cache.update(provider, vec![make_entry(ETH_3000, dec!(120), dec!(125), ts)]);

        let aggregate = cache.get_aggregate(ETH_3000).unwrap();
        assert_eq!(aggregate.best_bid, dec!(120));
        assert_eq!(aggregate.best_ask, dec!(125));
        assert_eq!(aggregate.num_providers, 1);
    }

    #[test]
    fn test_multiple_qps_aggregation() {
        let cache = fresh_cache();
        let ts = get_timestamp_millis();

        let first = vec![make_entry(ETH_3000, dec!(120), dec!(125), ts)];
        let second = vec![make_entry(ETH_3000, dec!(121), dec!(124), ts)];
        cache.update(test_wallet(1), first);
        cache.update(test_wallet(2), second);

        let aggregate = cache.get_aggregate(ETH_3000).unwrap();
        assert_eq!(aggregate.best_bid, dec!(121));
        assert_eq!(aggregate.best_ask, dec!(124));
        assert_eq!(aggregate.num_providers, 2);
        // Only the second provider sits at the best price on each side.
        assert_eq!(aggregate.indicative_bid_size, dec!(10));
        assert_eq!(aggregate.indicative_ask_size, dec!(10));
    }

    #[test]
    fn test_sizes_sum_at_same_price() {
        let cache = fresh_cache();
        let ts = get_timestamp_millis();

        for id in [1u8, 2u8] {
            cache.update(
                test_wallet(id),
                vec![make_entry(ETH_3000, dec!(120), dec!(125), ts)],
            );
        }

        let aggregate = cache.get_aggregate(ETH_3000).unwrap();
        assert_eq!(aggregate.best_bid, dec!(120));
        // Both providers tie, so their sizes add up.
        assert_eq!(aggregate.indicative_bid_size, dec!(20));
        assert_eq!(aggregate.indicative_ask_size, dec!(20));
    }

    #[test]
    fn test_qp_disconnect_evicts() {
        let cache = fresh_cache();
        let ts = get_timestamp_millis();

        cache.update(
            test_wallet(1),
            vec![make_entry(ETH_3000, dec!(120), dec!(125), ts)],
        );
        cache.update(
            test_wallet(2),
            vec![make_entry(ETH_3000, dec!(121), dec!(124), ts)],
        );

        let updates = cache.evict_qp(&test_wallet(2));

        let aggregate = cache.get_aggregate(ETH_3000).unwrap();
        assert_eq!(updates.len(), 1);
        assert!(updates[0].1.is_some());
        // The aggregate falls back to the remaining provider's prices.
        assert_eq!(aggregate.best_bid, dec!(120));
        assert_eq!(aggregate.best_ask, dec!(125));
        assert_eq!(aggregate.num_providers, 1);
    }

    #[test]
    fn test_evict_last_qp_removes_aggregate() {
        let cache = fresh_cache();
        let ts = get_timestamp_millis();
        let only_provider = test_wallet(1);

        cache.update(
            only_provider,
            vec![make_entry(ETH_3000, dec!(120), dec!(125), ts)],
        );

        let updates = cache.evict_qp(&test_wallet(1));

        assert_eq!(updates.len(), 1);
        assert!(updates[0].1.is_none());
        assert!(cache.get_aggregate(ETH_3000).is_none());
    }

    #[test]
    fn test_empty_snapshot_returns_removal_update() {
        let cache = fresh_cache();
        let provider = test_wallet(1);
        let ts = get_timestamp_millis();

        cache.update(provider, vec![make_entry(ETH_3000, dec!(120), dec!(125), ts)]);

        let updates = cache.update(provider, Vec::new());

        assert_eq!(updates.len(), 1);
        assert_eq!(updates[0].0, "ETH-C-3000");
        assert!(updates[0].1.is_none());
        assert!(cache.get_aggregate(ETH_3000).is_none());
    }

    #[test]
    fn test_ttl_eviction() {
        let cache = IndicativeQuoteCache::new(5_000);
        // Ten seconds old against a five second TTL.
        let old_ts = get_timestamp_millis() - 10_000;

        cache.update(
            test_wallet(1),
            vec![make_entry(ETH_3000, dec!(120), dec!(125), old_ts)],
        );

        // `update` itself does not apply the TTL.
        assert!(cache.get_aggregate(ETH_3000).is_some());

        cache.evict_stale();

        assert!(cache.get_aggregate(ETH_3000).is_none());
    }

    #[test]
    fn test_update_evicts_symbols_missing_from_new_snapshot() {
        let cache = fresh_cache();
        let ts = get_timestamp_millis();
        let qp1 = test_wallet(1);
        let qp2 = test_wallet(2);

        // QP1 starts out quoting both strikes.
        cache.update(
            qp1,
            vec![
                make_entry(ETH_3000, dec!(120), dec!(125), ts),
                make_entry(ETH_3100, dec!(90), dec!(95), ts),
            ],
        );
        assert!(cache.get_aggregate(ETH_3000).is_some());
        assert!(cache.get_aggregate(ETH_3100).is_some());

        // QP1's next snapshot omits the 3100 strike.
        cache.update(
            qp1,
            vec![make_entry(ETH_3000, dec!(121), dec!(126), ts + 1)],
        );
        assert!(cache.get_aggregate(ETH_3000).is_some());
        assert!(
            cache.get_aggregate(ETH_3100).is_none(),
            "ETH-C-3100 should be evicted when absent from new snapshot"
        );

        // QP2 picks up the 3100 strike, then QP1 sends another snapshot without it.
        cache.update(
            qp2,
            vec![make_entry(ETH_3100, dec!(88), dec!(93), ts + 2)],
        );
        cache.update(
            qp1,
            vec![make_entry(ETH_3000, dec!(122), dec!(127), ts + 3)],
        );
        assert!(
            cache.get_aggregate(ETH_3100).is_some(),
            "QP2's ETH-C-3100 entry must survive QP1's snapshot update"
        );
    }

    #[test]
    fn test_get_all_aggregates() {
        let cache = fresh_cache();
        let ts = get_timestamp_millis();

        let snapshot = vec![
            make_entry(ETH_3000, dec!(120), dec!(125), ts),
            make_entry(BTC_100K, dec!(5000), dec!(5100), ts),
        ];
        cache.update(test_wallet(1), snapshot);

        let all = cache.get_all_aggregates();
        assert_eq!(all.len(), 2);
    }
}