1use anyhow::Result;
2use async_trait::async_trait;
3use rust_decimal::Decimal;
4use rust_decimal_macros::dec;
5use serde::{Deserialize, Serialize};
6use std::collections::HashMap;
7use std::sync::atomic::{AtomicBool, Ordering};
8use std::sync::Arc;
9use std::time::Duration;
10use tokio::sync::RwLock;
11use tracing::{debug, error, info, warn};
12
13use crate::messaging::EventBusTrait;
14use crate::shared::option_token_address::derive_option_token_address;
15use crate::shared::topics::TOPIC_MARKET_UPDATES;
16use crate::snapshot::{SnapshotError, Snapshotable, SyncStatus};
17use chrono::{DateTime, Utc};
18use hypercall_types::api_models::{Instrument, InstrumentStatus};
19use hypercall_types::{
20 EngineMessage, Market, MarketUpdateMessage, MarketUpdateStatus, OptionType as MessageOptionType,
21};
22use hypercall_types::{TradingModes, WalletAddress};
23
/// Number of seconds between periodic reloads of the instruments cache from
/// the database (used by `InstrumentsCache::start_periodic_refresh`).
const DEFAULT_REFRESH_INTERVAL_SECS: u64 = 60;
26
/// Serializable snapshot form of an [`Instrument`] as held by [`InstrumentsCache`].
///
/// NOTE(review): the `serialization` module encodes this struct with bincode and
/// keeps hand-written V1-V3 layouts for older data, so field order is presumably
/// part of the persisted format - confirm before reordering or inserting fields.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct InstrumentSnapshotState {
    /// Numeric instrument identifier.
    pub instrument_id: i32,
    /// Instrument symbol; `Snapshotable::restore` checks it against the snapshot key.
    pub id: String,
    /// Underlying asset name.
    pub underlying: String,
    /// Strike price.
    pub strike: Decimal,
    /// Expiry as a Unix timestamp in seconds (compared against `Utc::now().timestamp()`).
    pub expiry: u64,
    /// Option type string; the cache itself writes `"call"` or `"put"`.
    pub option_type: String,
    /// Option token address; absent in the oldest (V1) snapshot layout.
    #[serde(default)]
    pub option_token_address: Option<WalletAddress>,
    /// Mark implied volatility, when known.
    pub mark_iv: Option<Decimal>,
    /// 24-hour volume.
    pub volume_24h: Decimal,
    /// Open interest.
    pub open_interest: Decimal,
    /// Last update time as Unix seconds (`Instrument::updated_at.timestamp()`).
    pub updated_at: i64,
    /// Trading mode in its database string form (`TradingModes::as_db_str`).
    #[serde(default)]
    pub trading_mode: String,
    /// Instrument status in its database string form; defaults to `"ACTIVE"`.
    #[serde(default = "default_active_status")]
    pub status: String,
}
47
/// Status string given to snapshot entries that carry no persisted status.
fn default_active_status() -> String {
    String::from("ACTIVE")
}
51
52impl From<&Instrument> for InstrumentSnapshotState {
53 fn from(inst: &Instrument) -> Self {
54 Self {
55 instrument_id: inst.instrument_id,
56 id: inst.id.clone(),
57 underlying: inst.underlying.clone(),
58 strike: inst.strike,
59 expiry: inst.expiry,
60 option_type: inst.option_type.clone(),
61 option_token_address: inst.option_token_address,
62 mark_iv: inst.mark_iv,
63 volume_24h: inst.volume_24h,
64 open_interest: inst.open_interest,
65 updated_at: inst.updated_at.timestamp(),
66 trading_mode: inst.trading_mode.as_db_str(),
67 status: inst.status.as_db_str().to_string(),
68 }
69 }
70}
71
72impl InstrumentSnapshotState {
73 pub fn into_instrument(self) -> Instrument {
74 let state = self;
75 let mut status = InstrumentStatus::from_db_str(&state.status).unwrap_or_else(|| {
76 panic!(
77 "CRITICAL_FAILURE: Unknown instrument status '{}' for {} in snapshot. Persisted data is corrupt.",
78 state.status, state.id
79 )
80 });
81 if status.is_active() && is_expiry_passed(state.expiry) {
86 status = InstrumentStatus::ExpiredPendingPrice;
87 }
88 Instrument {
89 instrument_id: state.instrument_id,
90 id: state.id,
91 underlying: state.underlying,
92 strike: state.strike,
93 expiry: state.expiry,
94 option_type: state.option_type,
95 option_token_address: state.option_token_address,
96 mark_iv: state.mark_iv,
97 volume_24h: state.volume_24h,
98 open_interest: state.open_interest,
99 updated_at: DateTime::from_timestamp(state.updated_at, 0).unwrap_or_else(Utc::now),
100 status,
101 trading_mode: TradingModes::from_db_str(&state.trading_mode),
102 }
103 }
104}
105
106pub mod serialization {
108 use super::{default_active_status, InstrumentSnapshotState, SnapshotError};
109 use rust_decimal::Decimal;
110 use serde::Deserialize;
111
112 pub fn serialize(state: &InstrumentSnapshotState) -> Result<Vec<u8>, SnapshotError> {
113 bincode::serialize(state).map_err(|e| SnapshotError::Serialization(e.to_string()))
114 }
115
    /// Decodes an instrument snapshot entry, accepting the current layout and
    /// three older ones.
    ///
    /// Layouts are tried newest first: the current `InstrumentSnapshotState`,
    /// then `V3` (no `status`), `V2` (no `trading_mode` or `status`) and `V1`
    /// (additionally no `option_token_address`). Fields missing from an older
    /// layout are filled with `"ACTIVE"` status, the `ORDERBOOK` trading mode
    /// and no token address. Every legacy decode logs a warning.
    ///
    /// # Errors
    ///
    /// Returns `SnapshotError::Serialization` when no layout matches; the
    /// message carries the input length and the error from the final (V1)
    /// attempt only.
    pub fn deserialize(data: &[u8]) -> Result<InstrumentSnapshotState, SnapshotError> {
        use tracing::warn;

        // Current layout first.
        if let Ok(state) = bincode::deserialize::<InstrumentSnapshotState>(data) {
            return Ok(state);
        }

        // V3: everything except `status`.
        #[derive(Deserialize)]
        struct V3 {
            instrument_id: i32,
            id: String,
            underlying: String,
            strike: Decimal,
            expiry: u64,
            option_type: String,
            option_token_address: Option<hypercall_types::WalletAddress>,
            mark_iv: Option<Decimal>,
            volume_24h: Decimal,
            open_interest: Decimal,
            updated_at: i64,
            trading_mode: String,
        }
        if let Ok(v) = bincode::deserialize::<V3>(data) {
            warn!(
                instrument_id = v.instrument_id,
                id = %v.id,
                "Deserialized instrument snapshot from V3 format (missing status field)"
            );
            return Ok(InstrumentSnapshotState {
                instrument_id: v.instrument_id,
                id: v.id,
                underlying: v.underlying,
                strike: v.strike,
                expiry: v.expiry,
                option_type: v.option_type,
                option_token_address: v.option_token_address,
                mark_iv: v.mark_iv,
                volume_24h: v.volume_24h,
                open_interest: v.open_interest,
                updated_at: v.updated_at,
                trading_mode: v.trading_mode,
                status: default_active_status(),
            });
        }

        // V2: no `trading_mode` and no `status`.
        #[derive(Deserialize)]
        struct V2 {
            instrument_id: i32,
            id: String,
            underlying: String,
            strike: Decimal,
            expiry: u64,
            option_type: String,
            option_token_address: Option<hypercall_types::WalletAddress>,
            mark_iv: Option<Decimal>,
            volume_24h: Decimal,
            open_interest: Decimal,
            updated_at: i64,
        }
        if let Ok(v) = bincode::deserialize::<V2>(data) {
            warn!(
                instrument_id = v.instrument_id,
                id = %v.id,
                "Deserialized instrument snapshot from V2 format (missing trading_mode + status)"
            );
            return Ok(InstrumentSnapshotState {
                instrument_id: v.instrument_id,
                id: v.id,
                underlying: v.underlying,
                strike: v.strike,
                expiry: v.expiry,
                option_type: v.option_type,
                option_token_address: v.option_token_address,
                mark_iv: v.mark_iv,
                volume_24h: v.volume_24h,
                open_interest: v.open_interest,
                updated_at: v.updated_at,
                // Legacy entries predate trading modes; they are restored as ORDERBOOK.
                trading_mode: hypercall_types::TradingModes::ORDERBOOK.as_db_str(),
                status: default_active_status(),
            });
        }

        // V1: oldest layout, additionally without `option_token_address`.
        #[derive(Deserialize)]
        struct V1 {
            instrument_id: i32,
            id: String,
            underlying: String,
            strike: Decimal,
            expiry: u64,
            option_type: String,
            mark_iv: Option<Decimal>,
            volume_24h: Decimal,
            open_interest: Decimal,
            updated_at: i64,
        }
        // Last attempt: its failure is the error reported to the caller.
        let v = bincode::deserialize::<V1>(data).map_err(|e| {
            SnapshotError::Serialization(format!(
                "Failed to deserialize instrument snapshot (tried v4/v3/v2/v1, {} bytes): {}",
                data.len(),
                e
            ))
        })?;
        warn!(
            instrument_id = v.instrument_id,
            id = %v.id,
            "Deserialized instrument snapshot from V1 format (missing option_token_address + trading_mode + status)"
        );
        Ok(InstrumentSnapshotState {
            instrument_id: v.instrument_id,
            id: v.id,
            underlying: v.underlying,
            strike: v.strike,
            expiry: v.expiry,
            option_type: v.option_type,
            option_token_address: None,
            mark_iv: v.mark_iv,
            volume_24h: v.volume_24h,
            open_interest: v.open_interest,
            updated_at: v.updated_at,
            trading_mode: hypercall_types::TradingModes::ORDERBOOK.as_db_str(),
            status: default_active_status(),
        })
    }
248}
249
250fn is_expiry_passed(expiry_ts: u64) -> bool {
252 let now = Utc::now().timestamp() as u64;
253 expiry_ts <= now
254}
255
/// In-memory cache of instruments with secondary lookup indexes.
///
/// The primary map and each index sit behind their own `RwLock`, so code that
/// needs more than one of them must take care about acquisition order.
pub struct InstrumentsCache {
    /// Primary store: instrument symbol (`Instrument::id`) -> instrument.
    instruments: Arc<RwLock<HashMap<String, Instrument>>>,

    /// Index: underlying -> symbols of its instruments.
    by_underlying: Arc<RwLock<HashMap<String, Vec<String>>>>,

    /// Index: (underlying, expiry timestamp) -> symbols of matching instruments.
    by_underlying_expiry: Arc<RwLock<HashMap<(String, u64), Vec<String>>>>,

    /// Index: numeric instrument id -> symbol.
    by_instrument_id: Arc<RwLock<HashMap<i32, String>>>,

    /// Highest market-update offset recorded through `update_seq` (or restored
    /// from snapshot offsets).
    last_market_update_seq: Arc<RwLock<i64>>,

    /// Value of `last_market_update_seq` captured by the most recent
    /// `Snapshotable::list_all` call; reported by `snapshot_offsets`.
    last_snapshot_seq: Arc<RwLock<i64>>,

    /// Readiness flag shared with callers through `sync_status()`.
    sync_status: Arc<SyncStatus>,

    /// Set once an initialization path has populated the cache and cleared by
    /// `clear_all`; it is not read anywhere in this file.
    has_received_update: AtomicBool,
}
284
285impl Default for InstrumentsCache {
286 fn default() -> Self {
287 Self::new()
288 }
289}
290
291impl InstrumentsCache {
292 pub fn new() -> Self {
294 Self {
295 instruments: Arc::new(RwLock::new(HashMap::new())),
296 by_underlying: Arc::new(RwLock::new(HashMap::new())),
297 by_underlying_expiry: Arc::new(RwLock::new(HashMap::new())),
298 by_instrument_id: Arc::new(RwLock::new(HashMap::new())),
299 last_market_update_seq: Arc::new(RwLock::new(0)),
300 last_snapshot_seq: Arc::new(RwLock::new(0)),
301 sync_status: Arc::new(SyncStatus::new()),
302 has_received_update: AtomicBool::new(false),
303 }
304 }
305
306 pub fn sync_status(&self) -> Arc<SyncStatus> {
308 self.sync_status.clone()
309 }
310
311 pub fn is_ready(&self) -> bool {
313 self.sync_status.is_ready()
314 }
315
316 pub fn snapshot_offsets(&self) -> Result<HashMap<String, HashMap<i32, i64>>, SnapshotError> {
319 let seq = tokio::task::block_in_place(|| {
321 tokio::runtime::Handle::current()
322 .block_on(async { *self.last_snapshot_seq.read().await })
323 });
324
325 let mut offsets = HashMap::new();
326 let mut partition_offsets = HashMap::new();
327 partition_offsets.insert(0, seq + 1);
329 offsets.insert(TOPIC_MARKET_UPDATES.to_string(), partition_offsets);
330 Ok(offsets)
331 }
332
333 pub async fn apply_snapshot_offsets(
335 &self,
336 offsets: HashMap<String, HashMap<i32, i64>>,
337 ) -> Result<(), SnapshotError> {
338 if let Some(topic_offsets) = offsets.get(TOPIC_MARKET_UPDATES) {
339 if let Some(&offset) = topic_offsets.get(&0) {
340 let mut seq = self.last_market_update_seq.write().await;
342 *seq = offset.saturating_sub(1);
343 }
344 }
345 Ok(())
346 }
347
348 pub async fn update_seq(&self, seq: i64) {
350 let mut last_seq = self.last_market_update_seq.write().await;
351 if seq > *last_seq {
352 *last_seq = seq;
353 }
354 }
355
356 pub async fn initialize(
359 self: Arc<Self>,
360 diesel_db: &Arc<dyn hypercall_db::BootstrapReader>,
361 event_bus: Arc<dyn EventBusTrait>,
362 ) -> Result<()> {
363 let (tx, rx) = tokio::sync::broadcast::channel::<()>(1);
364 std::mem::forget(tx);
365 self.initialize_with_shutdown(diesel_db, event_bus, rx)
366 .await
367 }
368
369 pub async fn initialize_with_shutdown(
370 self: Arc<Self>,
371 diesel_db: &Arc<dyn hypercall_db::BootstrapReader>,
372 _event_bus: Arc<dyn EventBusTrait>,
373 shutdown_rx: tokio::sync::broadcast::Receiver<()>,
374 ) -> Result<()> {
375 self.load_instruments_from_db(diesel_db.as_ref()).await?;
376
377 self.sync_status.set_ready();
378 self.has_received_update.store(true, Ordering::SeqCst);
379
380 self.clone()
381 .start_periodic_refresh(diesel_db.clone(), shutdown_rx)
382 .await;
383 info!("InstrumentsCache initialized with periodic DB refresh");
384 Ok(())
385 }
386
387 async fn start_periodic_refresh(
388 self: Arc<Self>,
389 diesel_db: Arc<dyn hypercall_db::BootstrapReader>,
390 mut shutdown_rx: tokio::sync::broadcast::Receiver<()>,
391 ) {
392 let refresh_interval = DEFAULT_REFRESH_INTERVAL_SECS;
393 info!(
394 "Starting InstrumentsCache periodic refresh (interval={}s)",
395 refresh_interval
396 );
397
398 let cache = self.clone();
399 let handle = tokio::spawn(async move {
400 let mut interval = tokio::time::interval(Duration::from_secs(refresh_interval));
401 interval.tick().await;
402 info!("InstrumentsCache periodic refresh task started");
403
404 loop {
405 tokio::select! {
406 result = shutdown_rx.recv() => {
407 info!(?result, "InstrumentsCache periodic refresh stopping");
408 break;
409 }
410 _ = interval.tick() => {
411 let count_before = cache.len().await;
412 match cache.reload_from_db(diesel_db.as_ref()).await {
413 Ok(()) => {
414 let count_after = cache.len().await;
415 info!(count_before, count_after, "InstrumentsCache periodic refresh completed");
416 }
417 Err(e) => {
418 error!("InstrumentsCache periodic refresh failed: {}", e);
419 }
420 }
421 }
422 }
423 }
424 });
425
426 tokio::spawn(async move {
427 match handle.await {
428 Ok(()) => info!("InstrumentsCache periodic refresh task exited normally"),
429 Err(e) => error!("InstrumentsCache periodic refresh task died: {}", e),
430 }
431 });
432 }
433
434 pub async fn initialize_with_snapshot<L>(
442 self: Arc<Self>,
443 diesel_db: &Arc<dyn hypercall_db::BootstrapReader>,
444 event_bus: Arc<dyn EventBusTrait>,
445 loader: &L,
446 shutdown_rx: tokio::sync::broadcast::Receiver<()>,
447 ) -> Result<Option<HashMap<String, HashMap<i32, i64>>>>
448 where
449 L: crate::snapshot::SnapshotLoader<Key = String, State = InstrumentSnapshotState>,
450 {
451 use crate::snapshot::bootstrap_from_snapshot;
452
453 let offsets = match bootstrap_from_snapshot(loader, self.as_ref()).await {
454 Ok(Some((snapshot_id, offsets))) => {
455 let instruments_count = self.len().await;
456 if instruments_count == 0 {
457 warn!(
458 snapshot_id,
459 "Instruments snapshot restored 0 entries - falling back to database"
460 );
461 self.load_instruments_from_db(diesel_db.as_ref()).await?;
462 self.has_received_update.store(true, Ordering::SeqCst);
463 None
464 } else {
465 self.apply_snapshot_offsets(offsets.clone()).await?;
466 self.has_received_update.store(true, Ordering::SeqCst);
467 info!(
468 snapshot_id,
469 instruments_count, "InstrumentsCache restored from snapshot"
470 );
471 Some(offsets)
472 }
473 }
474 Ok(None) => {
475 info!("No instruments snapshot found, loading from database");
476 self.load_instruments_from_db(diesel_db.as_ref()).await?;
477 self.has_received_update.store(true, Ordering::SeqCst);
478 None
479 }
480 Err(e) => {
481 let error_msg = e.to_string();
482 warn!(error = %error_msg, "Failed to restore instruments snapshot, falling back to database");
483 self.load_instruments_from_db(diesel_db.as_ref()).await?;
484 self.has_received_update.store(true, Ordering::SeqCst);
485 None
486 }
487 };
488
489 if offsets.is_none() {
490 self.sync_status.set_ready();
491 }
492
493 self.clone()
494 .start_event_listeners_with_shutdown(event_bus, shutdown_rx.resubscribe())
495 .await?;
496
497 self.clone()
498 .start_periodic_refresh(diesel_db.clone(), shutdown_rx)
499 .await;
500
501 let watchdog_cache = self.clone();
502 let watchdog_db = diesel_db.clone();
503 tokio::spawn(async move {
504 for delay_secs in [30u64, 60] {
505 tokio::time::sleep(Duration::from_secs(delay_secs)).await;
506 let count = watchdog_cache.len().await;
507 if count > 0 {
508 info!(
509 count,
510 delay_secs, "InstrumentsCache watchdog: cache is populated, exiting"
511 );
512 return;
513 }
514 warn!(
515 delay_secs,
516 "InstrumentsCache watchdog: cache still empty, reloading from DB"
517 );
518 match watchdog_cache.reload_from_db(watchdog_db.as_ref()).await {
519 Ok(()) => {
520 let new_count = watchdog_cache.len().await;
521 info!(
522 new_count,
523 delay_secs, "InstrumentsCache watchdog: DB reload complete"
524 );
525 if new_count > 0 {
526 return;
527 }
528 }
529 Err(e) => {
530 error!(error = %e, delay_secs, "InstrumentsCache watchdog: DB reload failed");
531 }
532 }
533 }
534 warn!("InstrumentsCache watchdog: cache still empty after all retries");
535 });
536
537 Ok(offsets)
538 }
539
540 async fn load_instruments_from_db(
541 &self,
542 diesel_db: &dyn hypercall_db::BootstrapReader,
543 ) -> Result<()> {
544 use hypercall_types::expiry_date_to_timestamp;
545
546 let db_count: i64 = diesel_db.get_instrument_count().await.unwrap_or(-1);
547
548 if db_count < 0 {
549 error!("Failed to count instruments in database - table may not exist");
550 }
551
552 let records = diesel_db.get_all_instruments().await?;
553
554 let mut instruments: Vec<Instrument> = records
558 .into_iter()
559 .map(|r| {
560 let expiry_ts = expiry_date_to_timestamp(&r.underlying, r.expiry as u64);
561 let option_type_str = match r.option_type {
562 hypercall_types::OptionType::Call => "call".to_string(),
563 hypercall_types::OptionType::Put => "put".to_string(),
564 };
565 Instrument {
566 instrument_id: r.instrument_numeric_id,
567 id: r.id,
568 underlying: r.underlying,
569 strike: r.strike,
570 expiry: expiry_ts as u64,
571 option_type: option_type_str,
572 option_token_address: r.option_token_address,
573 mark_iv: None,
574 volume_24h: Decimal::ZERO,
575 open_interest: Decimal::ZERO,
576 updated_at: Utc::now(),
577 status: r.status,
578 trading_mode: TradingModes::from_db_str(&r.trading_mode),
579 }
580 })
581 .collect();
582
583 for inst in &mut instruments {
586 if inst.status.is_active() && is_expiry_passed(inst.expiry) {
587 inst.status = InstrumentStatus::ExpiredPendingPrice;
588 }
589 }
590 let mut rebuilt_instruments_map = HashMap::with_capacity(instruments.len());
591 let mut rebuilt_by_underlying_map: HashMap<String, Vec<String>> = HashMap::new();
592 let mut rebuilt_by_expiry_map: HashMap<(String, u64), Vec<String>> = HashMap::new();
593 let mut rebuilt_by_id_map = HashMap::with_capacity(instruments.len());
594
595 for instrument in instruments {
596 let symbol = instrument.id.clone();
597 let underlying = instrument.underlying.clone();
598 let expiry = instrument.expiry;
599 let instrument_id = instrument.instrument_id;
600
601 rebuilt_instruments_map.insert(symbol.clone(), instrument);
602 rebuilt_by_underlying_map
603 .entry(underlying.clone())
604 .or_default()
605 .push(symbol.clone());
606 rebuilt_by_expiry_map
607 .entry((underlying, expiry))
608 .or_default()
609 .push(symbol.clone());
610 rebuilt_by_id_map.insert(instrument_id, symbol);
611 }
612
613 let mut instruments_map = self.instruments.write().await;
614 let mut by_underlying_map = self.by_underlying.write().await;
615 let mut by_expiry_map = self.by_underlying_expiry.write().await;
616 let mut by_id_map = self.by_instrument_id.write().await;
617
618 *instruments_map = rebuilt_instruments_map;
619 *by_underlying_map = rebuilt_by_underlying_map;
620 *by_expiry_map = rebuilt_by_expiry_map;
621 *by_id_map = rebuilt_by_id_map;
622
623 let total_instruments = instruments_map.len();
624 let total_underlyings = by_underlying_map.len();
625 let underlyings: Vec<String> = by_underlying_map.keys().cloned().collect();
626
627 info!(
628 db_count,
629 total_instruments,
630 total_underlyings,
631 ?underlyings,
632 "InstrumentsCache loaded from database"
633 );
634
635 if total_instruments == 0 && db_count > 0 {
636 warn!(
637 db_count,
638 "Database has instruments but none loaded - possible conversion error"
639 );
640 }
641
642 Ok(())
643 }
644
645 async fn start_event_listeners_with_shutdown(
648 self: Arc<Self>,
649 event_bus: Arc<dyn EventBusTrait>,
650 mut shutdown_rx: tokio::sync::broadcast::Receiver<()>,
651 ) -> Result<()> {
652 info!("Starting event listener for InstrumentsCache");
653
654 let topics = vec![TOPIC_MARKET_UPDATES.to_string()];
655 let mut receiver = event_bus
656 .subscribe_with_offsets(topics)
657 .await
658 .map_err(|e| anyhow::anyhow!("Failed to subscribe to event topics: {}", e))?;
659
660 tokio::spawn(async move {
662 loop {
663 tokio::select! {
664 _ = shutdown_rx.recv() => {
665 info!("InstrumentsCache event listener received shutdown signal");
666 break;
667 }
668 maybe_envelope = receiver.recv() => {
669 match maybe_envelope {
670 Some(envelope) => {
671 let offset = envelope.offset;
672
673 match envelope.message {
674 EngineMessage::MarketUpdate(market_update) => {
675 if let Err(e) = self.handle_market_update(market_update).await {
676 error!("Error handling market update: {}", e);
677 } else if let Some(off) = offset {
678 self.update_seq(off).await;
681 }
682 }
683 other => {
684 debug!("InstrumentsCache ignoring non-MarketUpdate message: {:?}", std::mem::discriminant(&other));
685 }
686 }
687 }
688 None => break, }
690 }
691 }
692 }
693 info!("InstrumentsCache event listener stopped");
694 });
695
696 info!("Event listener started for InstrumentsCache");
697 Ok(())
698 }
699
700 pub async fn handle_market_update(&self, message: MarketUpdateMessage) -> Result<()> {
703 match message.status {
704 MarketUpdateStatus::MarketCreated | MarketUpdateStatus::MarketAlreadyExists => {
705 self.add_instrument(&message.market).await?;
706 info!("Added instrument to cache: {}", message.market.symbol);
707 }
708 MarketUpdateStatus::MarketDeleted
709 | MarketUpdateStatus::MarketExpired
710 | MarketUpdateStatus::MarketPendingSettlement => {
711 self.remove_instrument(&message.market.symbol).await?;
712 info!("Removed instrument from cache: {}", message.market.symbol);
713 }
714 MarketUpdateStatus::MarketCreationFailed | MarketUpdateStatus::MarketDeletionFailed => {
715 debug!(
717 "Market lifecycle update failed for {}: not mutating cache",
718 message.market.symbol
719 );
720 }
721 }
722 Ok(())
723 }
724
725 async fn add_instrument(&self, market: &Market) -> Result<()> {
727 use hypercall_types::expiry_date_to_timestamp;
728
729 let symbol = market.symbol.clone();
730 let underlying = market.underlying.clone();
731 let expiry = expiry_date_to_timestamp(&underlying, market.expiry) as u64;
733
734 let instruments_map = self.instruments.read().await;
736 let max_id = instruments_map
737 .values()
738 .map(|i| i.instrument_id)
739 .max()
740 .unwrap_or(0);
741 drop(instruments_map);
742
743 let option_type = match market.option_type {
744 MessageOptionType::Call => "call".to_string(),
745 MessageOptionType::Put => "put".to_string(),
746 };
747 let option_token_address = Some(derive_option_token_address(
748 &underlying,
749 market.expiry,
750 market.strike,
751 &option_type,
752 )?);
753
754 let instrument = Instrument {
755 instrument_id: max_id + 1,
756 id: symbol.clone(),
757 underlying: underlying.clone(),
758 strike: market.strike,
759 expiry,
760 option_type,
761 option_token_address,
762 mark_iv: None,
763 volume_24h: dec!(0),
764 open_interest: dec!(0),
765 updated_at: Utc::now(),
766 status: InstrumentStatus::Active,
767 trading_mode: TradingModes::ORDERBOOK,
768 };
769 let instrument_id = instrument.instrument_id;
770
771 let mut instruments_map = self.instruments.write().await;
773 instruments_map.insert(symbol.clone(), instrument);
774 drop(instruments_map);
775
776 let mut by_underlying_map = self.by_underlying.write().await;
778 by_underlying_map
779 .entry(underlying.clone())
780 .or_insert_with(Vec::new)
781 .push(symbol.clone());
782 drop(by_underlying_map);
783
784 let mut by_expiry_map = self.by_underlying_expiry.write().await;
786 by_expiry_map
787 .entry((underlying, expiry))
788 .or_insert_with(Vec::new)
789 .push(symbol.clone());
790 drop(by_expiry_map);
791
792 let mut by_id_map = self.by_instrument_id.write().await;
793 by_id_map.insert(instrument_id, symbol.clone());
794
795 Ok(())
796 }
797
798 async fn remove_instrument(&self, symbol: &str) -> Result<()> {
800 let instruments_map = self.instruments.read().await;
802 let instrument = match instruments_map.get(symbol) {
803 Some(i) => i.clone(),
804 None => return Ok(()), };
806 drop(instruments_map);
807
808 let underlying = instrument.underlying.clone();
809 let expiry = instrument.expiry;
810 let instrument_id = instrument.instrument_id;
811
812 let mut instruments_map = self.instruments.write().await;
814 instruments_map.remove(symbol);
815 drop(instruments_map);
816
817 let mut by_underlying_map = self.by_underlying.write().await;
819 if let Some(symbols) = by_underlying_map.get_mut(&underlying) {
820 symbols.retain(|s| s != symbol);
821 if symbols.is_empty() {
822 by_underlying_map.remove(&underlying);
823 }
824 }
825 drop(by_underlying_map);
826
827 let mut by_expiry_map = self.by_underlying_expiry.write().await;
829 if let Some(symbols) = by_expiry_map.get_mut(&(underlying.clone(), expiry)) {
830 symbols.retain(|s| s != symbol);
831 if symbols.is_empty() {
832 by_expiry_map.remove(&(underlying, expiry));
833 }
834 }
835 drop(by_expiry_map);
836
837 let mut by_id_map = self.by_instrument_id.write().await;
838 by_id_map.remove(&instrument_id);
839
840 Ok(())
841 }
842
843 pub async fn get_by_symbol(&self, symbol: &str) -> Option<Instrument> {
845 let instruments = self.instruments.read().await;
846 instruments.get(symbol).cloned()
847 }
848
849 pub fn allows_rfq(&self, symbol: &str) -> bool {
852 self.instruments
853 .try_read()
854 .map(|guard| {
855 guard
856 .get(symbol)
857 .map(|i| i.trading_mode.allows_rfq())
858 .unwrap_or(false)
859 })
860 .unwrap_or(false)
861 }
862
863 pub async fn get_by_underlying(&self, underlying: &str) -> Vec<Instrument> {
865 let by_underlying = self.by_underlying.read().await;
866 let available: Vec<&String> = by_underlying.keys().collect();
867 let cache_total = self.instruments.read().await.len();
868
869 let symbols = match by_underlying.get(underlying) {
870 Some(s) => s.clone(),
871 None => {
872 warn!(
873 underlying,
874 ?available,
875 cache_total,
876 "No instruments found for underlying"
877 );
878 return Vec::new();
879 }
880 };
881 drop(by_underlying);
882
883 let instruments = self.instruments.read().await;
884 let result: Vec<Instrument> = symbols
885 .iter()
886 .filter_map(|symbol| instruments.get(symbol).cloned())
887 .collect();
888
889 result
890 }
891
892 pub async fn get_by_underlying_and_expiry(
894 &self,
895 underlying: &str,
896 expiry: u64,
897 ) -> Vec<Instrument> {
898 let by_expiry = self.by_underlying_expiry.read().await;
899 let symbols = match by_expiry.get(&(underlying.to_string(), expiry)) {
900 Some(s) => s.clone(),
901 None => return Vec::new(),
902 };
903 drop(by_expiry);
904
905 let instruments = self.instruments.read().await;
906 symbols
907 .iter()
908 .filter_map(|symbol| instruments.get(symbol).cloned())
909 .collect()
910 }
911
912 pub async fn get_by_instrument_id(&self, instrument_id: i32) -> Option<Instrument> {
914 let by_id = self.by_instrument_id.read().await;
915 let symbol = match by_id.get(&instrument_id) {
916 Some(symbol) => symbol.clone(),
917 None => return None,
918 };
919 drop(by_id);
920
921 let instruments = self.instruments.read().await;
922 instruments.get(&symbol).cloned()
923 }
924
925 pub async fn get_all(&self) -> Vec<Instrument> {
927 let instruments = self.instruments.read().await;
928 instruments.values().cloned().collect()
929 }
930
931 pub async fn len(&self) -> usize {
933 let instruments = self.instruments.read().await;
934 instruments.len()
935 }
936
937 pub async fn is_empty(&self) -> bool {
939 let instruments = self.instruments.read().await;
940 instruments.is_empty()
941 }
942
    /// Public entry point for replacing the cache contents with the
    /// database's current instruments; delegates to `load_instruments_from_db`.
    ///
    /// # Errors
    ///
    /// Returns the error from the underlying load.
    pub async fn reload_from_db(
        &self,
        diesel_db: &dyn hypercall_db::BootstrapReader,
    ) -> Result<()> {
        self.load_instruments_from_db(diesel_db).await
    }
949
950 async fn add_instrument_direct(&self, instrument: Instrument) -> Result<()> {
952 let symbol = instrument.id.clone();
953 let underlying = instrument.underlying.clone();
954 let expiry = instrument.expiry;
955 let instrument_id = instrument.instrument_id;
956
957 let mut instruments_map = self.instruments.write().await;
959 instruments_map.insert(symbol.clone(), instrument);
960 drop(instruments_map);
961
962 let mut by_underlying_map = self.by_underlying.write().await;
964 by_underlying_map
965 .entry(underlying.clone())
966 .or_insert_with(Vec::new)
967 .push(symbol.clone());
968 drop(by_underlying_map);
969
970 let mut by_expiry_map = self.by_underlying_expiry.write().await;
972 by_expiry_map
973 .entry((underlying, expiry))
974 .or_insert_with(Vec::new)
975 .push(symbol.clone());
976 drop(by_expiry_map);
977
978 let mut by_id_map = self.by_instrument_id.write().await;
979 by_id_map.insert(instrument_id, symbol);
980
981 Ok(())
982 }
983}
984
985#[async_trait]
986impl Snapshotable for InstrumentsCache {
987 type Key = String;
988 type State = InstrumentSnapshotState;
989
990 async fn list_all(&self) -> Result<HashMap<Self::Key, Self::State>, SnapshotError> {
995 let instruments = self.instruments.read().await;
997 let seq = *self.last_market_update_seq.read().await;
998
999 {
1001 let mut snapshot_seq = self.last_snapshot_seq.write().await;
1002 *snapshot_seq = seq;
1003 }
1004
1005 let mut states = HashMap::with_capacity(instruments.len());
1006 for (symbol, instrument) in instruments.iter() {
1007 states.insert(symbol.clone(), InstrumentSnapshotState::from(instrument));
1008 }
1009
1010 Ok(states)
1011 }
1012
1013 async fn restore(&self, key: &Self::Key, state: Self::State) -> Result<(), SnapshotError> {
1014 let instrument: Instrument = state.into_instrument();
1015
1016 if &instrument.id != key {
1018 return Err(SnapshotError::Serialization(format!(
1019 "Key mismatch: expected {}, got {}",
1020 key, instrument.id
1021 )));
1022 }
1023
1024 self.add_instrument_direct(instrument)
1025 .await
1026 .map_err(|e| SnapshotError::DbError(e.to_string()))
1027 }
1028
1029 async fn clear_all(&self) -> Result<(), SnapshotError> {
1030 self.instruments.write().await.clear();
1031 self.by_underlying.write().await.clear();
1032 self.by_underlying_expiry.write().await.clear();
1033 self.by_instrument_id.write().await.clear();
1034 *self.last_market_update_seq.write().await = 0;
1035 self.has_received_update.store(false, Ordering::SeqCst);
1036 Ok(())
1037 }
1038}
1039
#[cfg(test)]
mod tests {
    use super::*;

    /// A failed database reload must leave previously cached instruments in
    /// place rather than clearing the cache.
    #[tokio::test]
    async fn reload_from_db_keeps_existing_cache_when_db_load_fails() {
        let cache = InstrumentsCache::new();

        // Seed the cache with a single instrument.
        cache
            .add_instrument_direct(Instrument {
                instrument_id: 1,
                id: "BTC-20270101-100000-C".to_string(),
                underlying: "BTC".to_string(),
                strike: dec!(100000),
                expiry: 1798761600,
                option_type: "call".to_string(),
                option_token_address: None,
                mark_iv: None,
                volume_24h: dec!(0),
                open_interest: dec!(0),
                updated_at: Utc::now(),
                status: InstrumentStatus::Active,
                trading_mode: TradingModes::ORDERBOOK,
            })
            .await
            .expect("seed instrument should be inserted");

        let before = cache.len().await;
        assert_eq!(before, 1, "expected seeded cache size");

        // NOTE(review): presumably nothing listens on 127.0.0.1:1, so the pool
        // is built lazily and queries against it fail - confirm.
        let bad_db = hypercall_db_diesel::DieselDb::new_no_tls(
            "postgresql://invalid:invalid@127.0.0.1:1/invalid",
            1,
        )
        .await
        .expect("lazy pool should be constructed");

        let reload_result = cache.reload_from_db(&bad_db).await;
        assert!(
            reload_result.is_err(),
            "reload should fail against an invalid DB endpoint"
        );

        // The seeded instrument must still be there after the failed reload.
        let after = cache.len().await;
        assert_eq!(
            after, before,
            "failed reload must not clear previously cached instruments"
        );
    }
}