Skip to main content

hypercall/read_cache/
instruments_registry.rs

1use anyhow::Result;
2use async_trait::async_trait;
3use rust_decimal::Decimal;
4use rust_decimal_macros::dec;
5use serde::{Deserialize, Serialize};
6use std::collections::HashMap;
7use std::sync::atomic::{AtomicBool, Ordering};
8use std::sync::Arc;
9use std::time::Duration;
10use tokio::sync::RwLock;
11use tracing::{debug, error, info, warn};
12
13use crate::messaging::EventBusTrait;
14use crate::shared::option_token_address::derive_option_token_address;
15use crate::shared::topics::TOPIC_MARKET_UPDATES;
16use crate::snapshot::{SnapshotError, Snapshotable, SyncStatus};
17use chrono::{DateTime, Utc};
18use hypercall_types::api_models::{Instrument, InstrumentStatus};
19use hypercall_types::{
20    EngineMessage, Market, MarketUpdateMessage, MarketUpdateStatus, OptionType as MessageOptionType,
21};
22use hypercall_types::{TradingModes, WalletAddress};
23
/// Default interval, in seconds, between periodic reloads of the cache from
/// the database (60 seconds).
const DEFAULT_REFRESH_INTERVAL_SECS: u64 = 60;
26
/// Serializable instrument state for snapshots.
///
/// A flattened mirror of [`Instrument`] in which the timestamp, trading mode
/// and status are stored as primitive/string values. Use
/// [`InstrumentSnapshotState::into_instrument`] to convert back.
///
/// NOTE(review): the `serialization` module decodes older layouts that are
/// prefixes of this one, so the field order appears to be part of the
/// persisted format; new fields should presumably be appended at the end.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct InstrumentSnapshotState {
    /// Numeric instrument id.
    pub instrument_id: i32,
    /// Instrument symbol (used as the cache key).
    pub id: String,
    /// Underlying asset name.
    pub underlying: String,
    /// Strike price.
    pub strike: Decimal,
    /// Expiry as a Unix timestamp (compared against the current UTC time on restore).
    pub expiry: u64,
    /// Option type string; this file produces `"call"` or `"put"`.
    pub option_type: String,
    /// Option token address, absent in the oldest snapshot layout.
    #[serde(default)]
    pub option_token_address: Option<WalletAddress>,
    /// Mark implied volatility, if known.
    pub mark_iv: Option<Decimal>,
    /// 24h traded volume.
    pub volume_24h: Decimal,
    /// Open interest.
    pub open_interest: Decimal,
    pub updated_at: i64, // Unix timestamp for serialization
    /// Trading mode in its database string form.
    #[serde(default)]
    pub trading_mode: String,
    /// Instrument status in its database string form; defaults to `"ACTIVE"`.
    #[serde(default = "default_active_status")]
    pub status: String,
}
47
/// Default value for the snapshot `status` field: the string `"ACTIVE"`.
///
/// Used both as the serde default and when upgrading older snapshot layouts
/// that carry no status.
fn default_active_status() -> String {
    String::from("ACTIVE")
}
51
52impl From<&Instrument> for InstrumentSnapshotState {
53    fn from(inst: &Instrument) -> Self {
54        Self {
55            instrument_id: inst.instrument_id,
56            id: inst.id.clone(),
57            underlying: inst.underlying.clone(),
58            strike: inst.strike,
59            expiry: inst.expiry,
60            option_type: inst.option_type.clone(),
61            option_token_address: inst.option_token_address,
62            mark_iv: inst.mark_iv,
63            volume_24h: inst.volume_24h,
64            open_interest: inst.open_interest,
65            updated_at: inst.updated_at.timestamp(),
66            trading_mode: inst.trading_mode.as_db_str(),
67            status: inst.status.as_db_str().to_string(),
68        }
69    }
70}
71
72impl InstrumentSnapshotState {
73    pub fn into_instrument(self) -> Instrument {
74        let state = self;
75        let mut status = InstrumentStatus::from_db_str(&state.status).unwrap_or_else(|| {
76            panic!(
77                "CRITICAL_FAILURE: Unknown instrument status '{}' for {} in snapshot. Persisted data is corrupt.",
78                state.status, state.id
79            )
80        });
81        // If the stored status claims Active but the expiry has already passed,
82        // override to ExpiredPendingPrice. This prevents stale snapshots (or old
83        // snapshots without a status field) from briefly serving expired instruments
84        // until the engine or periodic refresh corrects the state.
85        if status.is_active() && is_expiry_passed(state.expiry) {
86            status = InstrumentStatus::ExpiredPendingPrice;
87        }
88        Instrument {
89            instrument_id: state.instrument_id,
90            id: state.id,
91            underlying: state.underlying,
92            strike: state.strike,
93            expiry: state.expiry,
94            option_type: state.option_type,
95            option_token_address: state.option_token_address,
96            mark_iv: state.mark_iv,
97            volume_24h: state.volume_24h,
98            open_interest: state.open_interest,
99            updated_at: DateTime::from_timestamp(state.updated_at, 0).unwrap_or_else(Utc::now),
100            status,
101            trading_mode: TradingModes::from_db_str(&state.trading_mode),
102        }
103    }
104}
105
106/// Binary serialization helpers for instrument snapshots.
107pub mod serialization {
108    use super::{default_active_status, InstrumentSnapshotState, SnapshotError};
109    use rust_decimal::Decimal;
110    use serde::Deserialize;
111
112    pub fn serialize(state: &InstrumentSnapshotState) -> Result<Vec<u8>, SnapshotError> {
113        bincode::serialize(state).map_err(|e| SnapshotError::Serialization(e.to_string()))
114    }
115
116    pub fn deserialize(data: &[u8]) -> Result<InstrumentSnapshotState, SnapshotError> {
117        use tracing::warn;
118
119        // Helper: bincode::deserialize already validates ALL bytes are consumed
120        // (returns Err if trailing bytes remain), so truncated V4 data cannot
121        // silently succeed as V3. We additionally log on every fallback so
122        // schema drift is visible in metrics.
123
124        // V4 (current): all fields including status
125        if let Ok(state) = bincode::deserialize::<InstrumentSnapshotState>(data) {
126            return Ok(state);
127        }
128
129        // V3: has trading_mode but no status
130        #[derive(Deserialize)]
131        struct V3 {
132            instrument_id: i32,
133            id: String,
134            underlying: String,
135            strike: Decimal,
136            expiry: u64,
137            option_type: String,
138            option_token_address: Option<hypercall_types::WalletAddress>,
139            mark_iv: Option<Decimal>,
140            volume_24h: Decimal,
141            open_interest: Decimal,
142            updated_at: i64,
143            trading_mode: String,
144        }
145        if let Ok(v) = bincode::deserialize::<V3>(data) {
146            warn!(
147                instrument_id = v.instrument_id,
148                id = %v.id,
149                "Deserialized instrument snapshot from V3 format (missing status field)"
150            );
151            return Ok(InstrumentSnapshotState {
152                instrument_id: v.instrument_id,
153                id: v.id,
154                underlying: v.underlying,
155                strike: v.strike,
156                expiry: v.expiry,
157                option_type: v.option_type,
158                option_token_address: v.option_token_address,
159                mark_iv: v.mark_iv,
160                volume_24h: v.volume_24h,
161                open_interest: v.open_interest,
162                updated_at: v.updated_at,
163                trading_mode: v.trading_mode,
164                status: default_active_status(),
165            });
166        }
167
168        // V2: has option_token_address but no trading_mode or status
169        #[derive(Deserialize)]
170        struct V2 {
171            instrument_id: i32,
172            id: String,
173            underlying: String,
174            strike: Decimal,
175            expiry: u64,
176            option_type: String,
177            option_token_address: Option<hypercall_types::WalletAddress>,
178            mark_iv: Option<Decimal>,
179            volume_24h: Decimal,
180            open_interest: Decimal,
181            updated_at: i64,
182        }
183        if let Ok(v) = bincode::deserialize::<V2>(data) {
184            warn!(
185                instrument_id = v.instrument_id,
186                id = %v.id,
187                "Deserialized instrument snapshot from V2 format (missing trading_mode + status)"
188            );
189            return Ok(InstrumentSnapshotState {
190                instrument_id: v.instrument_id,
191                id: v.id,
192                underlying: v.underlying,
193                strike: v.strike,
194                expiry: v.expiry,
195                option_type: v.option_type,
196                option_token_address: v.option_token_address,
197                mark_iv: v.mark_iv,
198                volume_24h: v.volume_24h,
199                open_interest: v.open_interest,
200                updated_at: v.updated_at,
201                trading_mode: hypercall_types::TradingModes::ORDERBOOK.as_db_str(),
202                status: default_active_status(),
203            });
204        }
205
206        // V1 (oldest): no option_token_address, no trading_mode, no status
207        #[derive(Deserialize)]
208        struct V1 {
209            instrument_id: i32,
210            id: String,
211            underlying: String,
212            strike: Decimal,
213            expiry: u64,
214            option_type: String,
215            mark_iv: Option<Decimal>,
216            volume_24h: Decimal,
217            open_interest: Decimal,
218            updated_at: i64,
219        }
220        let v = bincode::deserialize::<V1>(data).map_err(|e| {
221            SnapshotError::Serialization(format!(
222                "Failed to deserialize instrument snapshot (tried v4/v3/v2/v1, {} bytes): {}",
223                data.len(),
224                e
225            ))
226        })?;
227        warn!(
228            instrument_id = v.instrument_id,
229            id = %v.id,
230            "Deserialized instrument snapshot from V1 format (missing option_token_address + trading_mode + status)"
231        );
232        Ok(InstrumentSnapshotState {
233            instrument_id: v.instrument_id,
234            id: v.id,
235            underlying: v.underlying,
236            strike: v.strike,
237            expiry: v.expiry,
238            option_type: v.option_type,
239            option_token_address: None,
240            mark_iv: v.mark_iv,
241            volume_24h: v.volume_24h,
242            open_interest: v.open_interest,
243            updated_at: v.updated_at,
244            trading_mode: hypercall_types::TradingModes::ORDERBOOK.as_db_str(),
245            status: default_active_status(),
246        })
247    }
248}
249
250/// Check whether an expiry Unix timestamp has already passed.
251fn is_expiry_passed(expiry_ts: u64) -> bool {
252    let now = Utc::now().timestamp() as u64;
253    expiry_ts <= now
254}
255
/// Cache for fast instrument lookups
/// Provides O(1) access to instruments by symbol, underlying, expiry, and instrument id
///
/// The primary map and each secondary index live behind their own async
/// `RwLock`; secondary indexes store symbols that key into the primary map.
pub struct InstrumentsCache {
    /// Primary storage: symbol -> Instrument
    instruments: Arc<RwLock<HashMap<String, Instrument>>>,

    /// Index: underlying -> Vec<symbol>
    by_underlying: Arc<RwLock<HashMap<String, Vec<String>>>>,

    /// Index: (underlying, expiry) -> Vec<symbol>
    /// (expiry is the Unix-timestamp form stored on the instrument)
    by_underlying_expiry: Arc<RwLock<HashMap<(String, u64), Vec<String>>>>,

    /// Index: instrument id -> symbol
    by_instrument_id: Arc<RwLock<HashMap<i32, String>>>,

    /// Last processed market update sequence number
    /// (only ever moved forward by `update_seq`; reset by `apply_snapshot_offsets`)
    last_market_update_seq: Arc<RwLock<i64>>,

    /// Sequence captured during last list_all() call for atomic snapshot consistency.
    /// This ensures snapshot_offsets() returns the same seq that was captured with the state.
    last_snapshot_seq: Arc<RwLock<i64>>,

    /// Sync status for readiness tracking
    sync_status: Arc<SyncStatus>,

    /// Flag to track if we've received at least one update (for readiness)
    /// (set to `true` by every initialization path once data has been loaded)
    has_received_update: AtomicBool,
}
284
285impl Default for InstrumentsCache {
286    fn default() -> Self {
287        Self::new()
288    }
289}
290
291impl InstrumentsCache {
292    /// Create a new empty instruments cache
293    pub fn new() -> Self {
294        Self {
295            instruments: Arc::new(RwLock::new(HashMap::new())),
296            by_underlying: Arc::new(RwLock::new(HashMap::new())),
297            by_underlying_expiry: Arc::new(RwLock::new(HashMap::new())),
298            by_instrument_id: Arc::new(RwLock::new(HashMap::new())),
299            last_market_update_seq: Arc::new(RwLock::new(0)),
300            last_snapshot_seq: Arc::new(RwLock::new(0)),
301            sync_status: Arc::new(SyncStatus::new()),
302            has_received_update: AtomicBool::new(false),
303        }
304    }
305
306    /// Get the sync status for readiness checks.
307    pub fn sync_status(&self) -> Arc<SyncStatus> {
308        self.sync_status.clone()
309    }
310
311    /// Check if the cache is ready to serve requests.
312    pub fn is_ready(&self) -> bool {
313        self.sync_status.is_ready()
314    }
315
316    /// Get snapshot offsets for the market updates topic.
317    /// Returns the last processed sequence number as an offset.
318    pub fn snapshot_offsets(&self) -> Result<HashMap<String, HashMap<i32, i64>>, SnapshotError> {
319        // Use the snapshot seq captured during list_all() for consistency
320        let seq = tokio::task::block_in_place(|| {
321            tokio::runtime::Handle::current()
322                .block_on(async { *self.last_snapshot_seq.read().await })
323        });
324
325        let mut offsets = HashMap::new();
326        let mut partition_offsets = HashMap::new();
327        // Store next offset to process (seq + 1)
328        partition_offsets.insert(0, seq + 1);
329        offsets.insert(TOPIC_MARKET_UPDATES.to_string(), partition_offsets);
330        Ok(offsets)
331    }
332
333    /// Apply snapshot offsets after restore.
334    pub async fn apply_snapshot_offsets(
335        &self,
336        offsets: HashMap<String, HashMap<i32, i64>>,
337    ) -> Result<(), SnapshotError> {
338        if let Some(topic_offsets) = offsets.get(TOPIC_MARKET_UPDATES) {
339            if let Some(&offset) = topic_offsets.get(&0) {
340                // offset is next_offset_to_apply, so last processed is offset - 1
341                let mut seq = self.last_market_update_seq.write().await;
342                *seq = offset.saturating_sub(1);
343            }
344        }
345        Ok(())
346    }
347
348    /// Update the last processed sequence number.
349    pub async fn update_seq(&self, seq: i64) {
350        let mut last_seq = self.last_market_update_seq.write().await;
351        if seq > *last_seq {
352            *last_seq = seq;
353        }
354    }
355
356    /// Initialize the cache by loading instruments from DB and starting event listener (legacy, no shutdown support).
357    /// For graceful shutdown support, use `initialize_with_shutdown`.
358    pub async fn initialize(
359        self: Arc<Self>,
360        diesel_db: &Arc<dyn hypercall_db::BootstrapReader>,
361        event_bus: Arc<dyn EventBusTrait>,
362    ) -> Result<()> {
363        let (tx, rx) = tokio::sync::broadcast::channel::<()>(1);
364        std::mem::forget(tx);
365        self.initialize_with_shutdown(diesel_db, event_bus, rx)
366            .await
367    }
368
369    pub async fn initialize_with_shutdown(
370        self: Arc<Self>,
371        diesel_db: &Arc<dyn hypercall_db::BootstrapReader>,
372        _event_bus: Arc<dyn EventBusTrait>,
373        shutdown_rx: tokio::sync::broadcast::Receiver<()>,
374    ) -> Result<()> {
375        self.load_instruments_from_db(diesel_db.as_ref()).await?;
376
377        self.sync_status.set_ready();
378        self.has_received_update.store(true, Ordering::SeqCst);
379
380        self.clone()
381            .start_periodic_refresh(diesel_db.clone(), shutdown_rx)
382            .await;
383        info!("InstrumentsCache initialized with periodic DB refresh");
384        Ok(())
385    }
386
387    async fn start_periodic_refresh(
388        self: Arc<Self>,
389        diesel_db: Arc<dyn hypercall_db::BootstrapReader>,
390        mut shutdown_rx: tokio::sync::broadcast::Receiver<()>,
391    ) {
392        let refresh_interval = DEFAULT_REFRESH_INTERVAL_SECS;
393        info!(
394            "Starting InstrumentsCache periodic refresh (interval={}s)",
395            refresh_interval
396        );
397
398        let cache = self.clone();
399        let handle = tokio::spawn(async move {
400            let mut interval = tokio::time::interval(Duration::from_secs(refresh_interval));
401            interval.tick().await;
402            info!("InstrumentsCache periodic refresh task started");
403
404            loop {
405                tokio::select! {
406                    result = shutdown_rx.recv() => {
407                        info!(?result, "InstrumentsCache periodic refresh stopping");
408                        break;
409                    }
410                    _ = interval.tick() => {
411                        let count_before = cache.len().await;
412                        match cache.reload_from_db(diesel_db.as_ref()).await {
413                            Ok(()) => {
414                                let count_after = cache.len().await;
415                                info!(count_before, count_after, "InstrumentsCache periodic refresh completed");
416                            }
417                            Err(e) => {
418                                error!("InstrumentsCache periodic refresh failed: {}", e);
419                            }
420                        }
421                    }
422                }
423            }
424        });
425
426        tokio::spawn(async move {
427            match handle.await {
428                Ok(()) => info!("InstrumentsCache periodic refresh task exited normally"),
429                Err(e) => error!("InstrumentsCache periodic refresh task died: {}", e),
430            }
431        });
432    }
433
434    /// Initialize the cache with snapshot support.
435    ///
436    /// Flow:
437    /// 1. Try to load from snapshot (if available)
438    /// 2. If snapshot exists: restore state, catchup from offsets, mark as ready
439    /// 3. If no snapshot: load from DB as fallback, mark as ready
440    /// 4. Start event listener for live updates
441    pub async fn initialize_with_snapshot<L>(
442        self: Arc<Self>,
443        diesel_db: &Arc<dyn hypercall_db::BootstrapReader>,
444        event_bus: Arc<dyn EventBusTrait>,
445        loader: &L,
446        shutdown_rx: tokio::sync::broadcast::Receiver<()>,
447    ) -> Result<Option<HashMap<String, HashMap<i32, i64>>>>
448    where
449        L: crate::snapshot::SnapshotLoader<Key = String, State = InstrumentSnapshotState>,
450    {
451        use crate::snapshot::bootstrap_from_snapshot;
452
453        let offsets = match bootstrap_from_snapshot(loader, self.as_ref()).await {
454            Ok(Some((snapshot_id, offsets))) => {
455                let instruments_count = self.len().await;
456                if instruments_count == 0 {
457                    warn!(
458                        snapshot_id,
459                        "Instruments snapshot restored 0 entries - falling back to database"
460                    );
461                    self.load_instruments_from_db(diesel_db.as_ref()).await?;
462                    self.has_received_update.store(true, Ordering::SeqCst);
463                    None
464                } else {
465                    self.apply_snapshot_offsets(offsets.clone()).await?;
466                    self.has_received_update.store(true, Ordering::SeqCst);
467                    info!(
468                        snapshot_id,
469                        instruments_count, "InstrumentsCache restored from snapshot"
470                    );
471                    Some(offsets)
472                }
473            }
474            Ok(None) => {
475                info!("No instruments snapshot found, loading from database");
476                self.load_instruments_from_db(diesel_db.as_ref()).await?;
477                self.has_received_update.store(true, Ordering::SeqCst);
478                None
479            }
480            Err(e) => {
481                let error_msg = e.to_string();
482                warn!(error = %error_msg, "Failed to restore instruments snapshot, falling back to database");
483                self.load_instruments_from_db(diesel_db.as_ref()).await?;
484                self.has_received_update.store(true, Ordering::SeqCst);
485                None
486            }
487        };
488
489        if offsets.is_none() {
490            self.sync_status.set_ready();
491        }
492
493        self.clone()
494            .start_event_listeners_with_shutdown(event_bus, shutdown_rx.resubscribe())
495            .await?;
496
497        self.clone()
498            .start_periodic_refresh(diesel_db.clone(), shutdown_rx)
499            .await;
500
501        let watchdog_cache = self.clone();
502        let watchdog_db = diesel_db.clone();
503        tokio::spawn(async move {
504            for delay_secs in [30u64, 60] {
505                tokio::time::sleep(Duration::from_secs(delay_secs)).await;
506                let count = watchdog_cache.len().await;
507                if count > 0 {
508                    info!(
509                        count,
510                        delay_secs, "InstrumentsCache watchdog: cache is populated, exiting"
511                    );
512                    return;
513                }
514                warn!(
515                    delay_secs,
516                    "InstrumentsCache watchdog: cache still empty, reloading from DB"
517                );
518                match watchdog_cache.reload_from_db(watchdog_db.as_ref()).await {
519                    Ok(()) => {
520                        let new_count = watchdog_cache.len().await;
521                        info!(
522                            new_count,
523                            delay_secs, "InstrumentsCache watchdog: DB reload complete"
524                        );
525                        if new_count > 0 {
526                            return;
527                        }
528                    }
529                    Err(e) => {
530                        error!(error = %e, delay_secs, "InstrumentsCache watchdog: DB reload failed");
531                    }
532                }
533            }
534            warn!("InstrumentsCache watchdog: cache still empty after all retries");
535        });
536
537        Ok(offsets)
538    }
539
540    async fn load_instruments_from_db(
541        &self,
542        diesel_db: &dyn hypercall_db::BootstrapReader,
543    ) -> Result<()> {
544        use hypercall_types::expiry_date_to_timestamp;
545
546        let db_count: i64 = diesel_db.get_instrument_count().await.unwrap_or(-1);
547
548        if db_count < 0 {
549            error!("Failed to count instruments in database - table may not exist");
550        }
551
552        let records = diesel_db.get_all_instruments().await?;
553
554        // Convert from InstrumentRecord to API Instrument type, then build fresh indexes.
555        // We fully rebuild off-lock and only swap after success, so transient DB
556        // failures cannot poison the live cache with an empty state.
557        let mut instruments: Vec<Instrument> = records
558            .into_iter()
559            .map(|r| {
560                let expiry_ts = expiry_date_to_timestamp(&r.underlying, r.expiry as u64);
561                let option_type_str = match r.option_type {
562                    hypercall_types::OptionType::Call => "call".to_string(),
563                    hypercall_types::OptionType::Put => "put".to_string(),
564                };
565                Instrument {
566                    instrument_id: r.instrument_numeric_id,
567                    id: r.id,
568                    underlying: r.underlying,
569                    strike: r.strike,
570                    expiry: expiry_ts as u64,
571                    option_type: option_type_str,
572                    option_token_address: r.option_token_address,
573                    mark_iv: None,
574                    volume_24h: Decimal::ZERO,
575                    open_interest: Decimal::ZERO,
576                    updated_at: Utc::now(),
577                    status: r.status,
578                    trading_mode: TradingModes::from_db_str(&r.trading_mode),
579                }
580            })
581            .collect();
582
583        // Override status for instruments whose expiry has passed but whose DB
584        // status hasn't been updated yet (startup race with the engine).
585        for inst in &mut instruments {
586            if inst.status.is_active() && is_expiry_passed(inst.expiry) {
587                inst.status = InstrumentStatus::ExpiredPendingPrice;
588            }
589        }
590        let mut rebuilt_instruments_map = HashMap::with_capacity(instruments.len());
591        let mut rebuilt_by_underlying_map: HashMap<String, Vec<String>> = HashMap::new();
592        let mut rebuilt_by_expiry_map: HashMap<(String, u64), Vec<String>> = HashMap::new();
593        let mut rebuilt_by_id_map = HashMap::with_capacity(instruments.len());
594
595        for instrument in instruments {
596            let symbol = instrument.id.clone();
597            let underlying = instrument.underlying.clone();
598            let expiry = instrument.expiry;
599            let instrument_id = instrument.instrument_id;
600
601            rebuilt_instruments_map.insert(symbol.clone(), instrument);
602            rebuilt_by_underlying_map
603                .entry(underlying.clone())
604                .or_default()
605                .push(symbol.clone());
606            rebuilt_by_expiry_map
607                .entry((underlying, expiry))
608                .or_default()
609                .push(symbol.clone());
610            rebuilt_by_id_map.insert(instrument_id, symbol);
611        }
612
613        let mut instruments_map = self.instruments.write().await;
614        let mut by_underlying_map = self.by_underlying.write().await;
615        let mut by_expiry_map = self.by_underlying_expiry.write().await;
616        let mut by_id_map = self.by_instrument_id.write().await;
617
618        *instruments_map = rebuilt_instruments_map;
619        *by_underlying_map = rebuilt_by_underlying_map;
620        *by_expiry_map = rebuilt_by_expiry_map;
621        *by_id_map = rebuilt_by_id_map;
622
623        let total_instruments = instruments_map.len();
624        let total_underlyings = by_underlying_map.len();
625        let underlyings: Vec<String> = by_underlying_map.keys().cloned().collect();
626
627        info!(
628            db_count,
629            total_instruments,
630            total_underlyings,
631            ?underlyings,
632            "InstrumentsCache loaded from database"
633        );
634
635        if total_instruments == 0 && db_count > 0 {
636            warn!(
637                db_count,
638                "Database has instruments but none loaded - possible conversion error"
639            );
640        }
641
642        Ok(())
643    }
644
645    /// Start event listeners for market updates with shutdown support.
646    /// Uses `subscribe_with_offsets` to track offsets for snapshot consistency.
647    async fn start_event_listeners_with_shutdown(
648        self: Arc<Self>,
649        event_bus: Arc<dyn EventBusTrait>,
650        mut shutdown_rx: tokio::sync::broadcast::Receiver<()>,
651    ) -> Result<()> {
652        info!("Starting event listener for InstrumentsCache");
653
654        let topics = vec![TOPIC_MARKET_UPDATES.to_string()];
655        let mut receiver = event_bus
656            .subscribe_with_offsets(topics)
657            .await
658            .map_err(|e| anyhow::anyhow!("Failed to subscribe to event topics: {}", e))?;
659
660        // Spawn task to listen for market updates
661        tokio::spawn(async move {
662            loop {
663                tokio::select! {
664                    _ = shutdown_rx.recv() => {
665                        info!("InstrumentsCache event listener received shutdown signal");
666                        break;
667                    }
668                    maybe_envelope = receiver.recv() => {
669                        match maybe_envelope {
670                            Some(envelope) => {
671                                let offset = envelope.offset;
672
673                                match envelope.message {
674                                    EngineMessage::MarketUpdate(market_update) => {
675                                        if let Err(e) = self.handle_market_update(market_update).await {
676                                            error!("Error handling market update: {}", e);
677                                        } else if let Some(off) = offset {
678                                            // Only advance seq after successful processing
679                                            // so failed messages are retried on restart.
680                                            self.update_seq(off).await;
681                                        }
682                                    }
683                                    other => {
684                                        debug!("InstrumentsCache ignoring non-MarketUpdate message: {:?}", std::mem::discriminant(&other));
685                                    }
686                                }
687                            }
688                            None => break, // Channel closed
689                        }
690                    }
691                }
692            }
693            info!("InstrumentsCache event listener stopped");
694        });
695
696        info!("Event listener started for InstrumentsCache");
697        Ok(())
698    }
699
700    /// Handle market update messages (create, delete, expire)
701    /// Public to allow catchup replay from saved offsets.
702    pub async fn handle_market_update(&self, message: MarketUpdateMessage) -> Result<()> {
703        match message.status {
704            MarketUpdateStatus::MarketCreated | MarketUpdateStatus::MarketAlreadyExists => {
705                self.add_instrument(&message.market).await?;
706                info!("Added instrument to cache: {}", message.market.symbol);
707            }
708            MarketUpdateStatus::MarketDeleted
709            | MarketUpdateStatus::MarketExpired
710            | MarketUpdateStatus::MarketPendingSettlement => {
711                self.remove_instrument(&message.market.symbol).await?;
712                info!("Removed instrument from cache: {}", message.market.symbol);
713            }
714            MarketUpdateStatus::MarketCreationFailed | MarketUpdateStatus::MarketDeletionFailed => {
715                // Market lifecycle update failed, cache remains unchanged.
716                debug!(
717                    "Market lifecycle update failed for {}: not mutating cache",
718                    message.market.symbol
719                );
720            }
721        }
722        Ok(())
723    }
724
725    /// Add a new instrument to the cache
726    async fn add_instrument(&self, market: &Market) -> Result<()> {
727        use hypercall_types::expiry_date_to_timestamp;
728
729        let symbol = market.symbol.clone();
730        let underlying = market.underlying.clone();
731        // market.expiry is YYYYMMDD code from CatalogManager, convert to Unix timestamp
732        let expiry = expiry_date_to_timestamp(&underlying, market.expiry) as u64;
733
734        // Generate instrument_id (temporary until persisted IDs are plumbed through)
735        let instruments_map = self.instruments.read().await;
736        let max_id = instruments_map
737            .values()
738            .map(|i| i.instrument_id)
739            .max()
740            .unwrap_or(0);
741        drop(instruments_map);
742
743        let option_type = match market.option_type {
744            MessageOptionType::Call => "call".to_string(),
745            MessageOptionType::Put => "put".to_string(),
746        };
747        let option_token_address = Some(derive_option_token_address(
748            &underlying,
749            market.expiry,
750            market.strike,
751            &option_type,
752        )?);
753
754        let instrument = Instrument {
755            instrument_id: max_id + 1,
756            id: symbol.clone(),
757            underlying: underlying.clone(),
758            strike: market.strike,
759            expiry,
760            option_type,
761            option_token_address,
762            mark_iv: None,
763            volume_24h: dec!(0),
764            open_interest: dec!(0),
765            updated_at: Utc::now(),
766            status: InstrumentStatus::Active,
767            trading_mode: TradingModes::ORDERBOOK,
768        };
769        let instrument_id = instrument.instrument_id;
770
771        // Insert into primary storage
772        let mut instruments_map = self.instruments.write().await;
773        instruments_map.insert(symbol.clone(), instrument);
774        drop(instruments_map);
775
776        // Update by_underlying index
777        let mut by_underlying_map = self.by_underlying.write().await;
778        by_underlying_map
779            .entry(underlying.clone())
780            .or_insert_with(Vec::new)
781            .push(symbol.clone());
782        drop(by_underlying_map);
783
784        // Update by_underlying_expiry index
785        let mut by_expiry_map = self.by_underlying_expiry.write().await;
786        by_expiry_map
787            .entry((underlying, expiry))
788            .or_insert_with(Vec::new)
789            .push(symbol.clone());
790        drop(by_expiry_map);
791
792        let mut by_id_map = self.by_instrument_id.write().await;
793        by_id_map.insert(instrument_id, symbol.clone());
794
795        Ok(())
796    }
797
798    /// Remove an instrument from the cache
799    async fn remove_instrument(&self, symbol: &str) -> Result<()> {
800        // Get instrument details before removing
801        let instruments_map = self.instruments.read().await;
802        let instrument = match instruments_map.get(symbol) {
803            Some(i) => i.clone(),
804            None => return Ok(()), // Already removed
805        };
806        drop(instruments_map);
807
808        let underlying = instrument.underlying.clone();
809        let expiry = instrument.expiry;
810        let instrument_id = instrument.instrument_id;
811
812        // Remove from primary storage
813        let mut instruments_map = self.instruments.write().await;
814        instruments_map.remove(symbol);
815        drop(instruments_map);
816
817        // Remove from by_underlying index
818        let mut by_underlying_map = self.by_underlying.write().await;
819        if let Some(symbols) = by_underlying_map.get_mut(&underlying) {
820            symbols.retain(|s| s != symbol);
821            if symbols.is_empty() {
822                by_underlying_map.remove(&underlying);
823            }
824        }
825        drop(by_underlying_map);
826
827        // Remove from by_underlying_expiry index
828        let mut by_expiry_map = self.by_underlying_expiry.write().await;
829        if let Some(symbols) = by_expiry_map.get_mut(&(underlying.clone(), expiry)) {
830            symbols.retain(|s| s != symbol);
831            if symbols.is_empty() {
832                by_expiry_map.remove(&(underlying, expiry));
833            }
834        }
835        drop(by_expiry_map);
836
837        let mut by_id_map = self.by_instrument_id.write().await;
838        by_id_map.remove(&instrument_id);
839
840        Ok(())
841    }
842
843    /// Get instrument by symbol
844    pub async fn get_by_symbol(&self, symbol: &str) -> Option<Instrument> {
845        let instruments = self.instruments.read().await;
846        instruments.get(symbol).cloned()
847    }
848
849    /// Synchronously check whether an instrument's trading mode includes RFQ.
850    /// Returns false for unknown instruments so callers default to rejecting.
851    pub fn allows_rfq(&self, symbol: &str) -> bool {
852        self.instruments
853            .try_read()
854            .map(|guard| {
855                guard
856                    .get(symbol)
857                    .map(|i| i.trading_mode.allows_rfq())
858                    .unwrap_or(false)
859            })
860            .unwrap_or(false)
861    }
862
863    /// Get all instruments for a given underlying
864    pub async fn get_by_underlying(&self, underlying: &str) -> Vec<Instrument> {
865        let by_underlying = self.by_underlying.read().await;
866        let available: Vec<&String> = by_underlying.keys().collect();
867        let cache_total = self.instruments.read().await.len();
868
869        let symbols = match by_underlying.get(underlying) {
870            Some(s) => s.clone(),
871            None => {
872                warn!(
873                    underlying,
874                    ?available,
875                    cache_total,
876                    "No instruments found for underlying"
877                );
878                return Vec::new();
879            }
880        };
881        drop(by_underlying);
882
883        let instruments = self.instruments.read().await;
884        let result: Vec<Instrument> = symbols
885            .iter()
886            .filter_map(|symbol| instruments.get(symbol).cloned())
887            .collect();
888
889        result
890    }
891
892    /// Get all instruments for a given underlying and expiry
893    pub async fn get_by_underlying_and_expiry(
894        &self,
895        underlying: &str,
896        expiry: u64,
897    ) -> Vec<Instrument> {
898        let by_expiry = self.by_underlying_expiry.read().await;
899        let symbols = match by_expiry.get(&(underlying.to_string(), expiry)) {
900            Some(s) => s.clone(),
901            None => return Vec::new(),
902        };
903        drop(by_expiry);
904
905        let instruments = self.instruments.read().await;
906        symbols
907            .iter()
908            .filter_map(|symbol| instruments.get(symbol).cloned())
909            .collect()
910    }
911
912    /// Get instrument by instrument id
913    pub async fn get_by_instrument_id(&self, instrument_id: i32) -> Option<Instrument> {
914        let by_id = self.by_instrument_id.read().await;
915        let symbol = match by_id.get(&instrument_id) {
916            Some(symbol) => symbol.clone(),
917            None => return None,
918        };
919        drop(by_id);
920
921        let instruments = self.instruments.read().await;
922        instruments.get(&symbol).cloned()
923    }
924
925    /// Get all instruments (for debugging/admin)
926    pub async fn get_all(&self) -> Vec<Instrument> {
927        let instruments = self.instruments.read().await;
928        instruments.values().cloned().collect()
929    }
930
931    /// Get count of cached instruments
932    pub async fn len(&self) -> usize {
933        let instruments = self.instruments.read().await;
934        instruments.len()
935    }
936
937    /// Check if cache is empty
938    pub async fn is_empty(&self) -> bool {
939        let instruments = self.instruments.read().await;
940        instruments.is_empty()
941    }
942
943    pub async fn reload_from_db(
944        &self,
945        diesel_db: &dyn hypercall_db::BootstrapReader,
946    ) -> Result<()> {
947        self.load_instruments_from_db(diesel_db).await
948    }
949
950    /// Add an instrument directly to the cache (used during snapshot restore).
951    async fn add_instrument_direct(&self, instrument: Instrument) -> Result<()> {
952        let symbol = instrument.id.clone();
953        let underlying = instrument.underlying.clone();
954        let expiry = instrument.expiry;
955        let instrument_id = instrument.instrument_id;
956
957        // Insert into primary storage
958        let mut instruments_map = self.instruments.write().await;
959        instruments_map.insert(symbol.clone(), instrument);
960        drop(instruments_map);
961
962        // Update by_underlying index
963        let mut by_underlying_map = self.by_underlying.write().await;
964        by_underlying_map
965            .entry(underlying.clone())
966            .or_insert_with(Vec::new)
967            .push(symbol.clone());
968        drop(by_underlying_map);
969
970        // Update by_underlying_expiry index
971        let mut by_expiry_map = self.by_underlying_expiry.write().await;
972        by_expiry_map
973            .entry((underlying, expiry))
974            .or_insert_with(Vec::new)
975            .push(symbol.clone());
976        drop(by_expiry_map);
977
978        let mut by_id_map = self.by_instrument_id.write().await;
979        by_id_map.insert(instrument_id, symbol);
980
981        Ok(())
982    }
983}
984
985#[async_trait]
986impl Snapshotable for InstrumentsCache {
987    type Key = String;
988    type State = InstrumentSnapshotState;
989
990    /// Returns a cloned snapshot of all instrument entries.
991    ///
992    /// CRITICAL: We must capture the state AND the sequence atomically.
993    /// This ensures snapshot_offsets() returns the same seq that was captured with the state.
994    async fn list_all(&self) -> Result<HashMap<Self::Key, Self::State>, SnapshotError> {
995        // Acquire BOTH locks to ensure atomic snapshot
996        let instruments = self.instruments.read().await;
997        let seq = *self.last_market_update_seq.read().await;
998
999        // Store the captured seq for snapshot_offsets() to use
1000        {
1001            let mut snapshot_seq = self.last_snapshot_seq.write().await;
1002            *snapshot_seq = seq;
1003        }
1004
1005        let mut states = HashMap::with_capacity(instruments.len());
1006        for (symbol, instrument) in instruments.iter() {
1007            states.insert(symbol.clone(), InstrumentSnapshotState::from(instrument));
1008        }
1009
1010        Ok(states)
1011    }
1012
1013    async fn restore(&self, key: &Self::Key, state: Self::State) -> Result<(), SnapshotError> {
1014        let instrument: Instrument = state.into_instrument();
1015
1016        // Verify key matches
1017        if &instrument.id != key {
1018            return Err(SnapshotError::Serialization(format!(
1019                "Key mismatch: expected {}, got {}",
1020                key, instrument.id
1021            )));
1022        }
1023
1024        self.add_instrument_direct(instrument)
1025            .await
1026            .map_err(|e| SnapshotError::DbError(e.to_string()))
1027    }
1028
1029    async fn clear_all(&self) -> Result<(), SnapshotError> {
1030        self.instruments.write().await.clear();
1031        self.by_underlying.write().await.clear();
1032        self.by_underlying_expiry.write().await.clear();
1033        self.by_instrument_id.write().await.clear();
1034        *self.last_market_update_seq.write().await = 0;
1035        self.has_received_update.store(false, Ordering::SeqCst);
1036        Ok(())
1037    }
1038}
1039
#[cfg(test)]
mod tests {
    use super::*;

    /// Build the single BTC call option used to seed the cache.
    fn seed_instrument() -> Instrument {
        Instrument {
            instrument_id: 1,
            id: "BTC-20270101-100000-C".to_string(),
            underlying: "BTC".to_string(),
            strike: dec!(100000),
            expiry: 1798761600,
            option_type: "call".to_string(),
            option_token_address: None,
            mark_iv: None,
            volume_24h: dec!(0),
            open_interest: dec!(0),
            updated_at: Utc::now(),
            status: InstrumentStatus::Active,
            trading_mode: TradingModes::ORDERBOOK,
        }
    }

    /// A reload that fails at the DB layer must leave the existing cache
    /// contents in place.
    #[tokio::test]
    async fn reload_from_db_keeps_existing_cache_when_db_load_fails() {
        let cache = InstrumentsCache::new();
        cache
            .add_instrument_direct(seed_instrument())
            .await
            .expect("seed instrument should be inserted");

        let size_before_reload = cache.len().await;
        assert_eq!(size_before_reload, 1, "expected seeded cache size");

        // Invalid endpoint forces DB load failure. On failure, cache must remain
        // unchanged instead of being cleared.
        let unreachable_db = hypercall_db_diesel::DieselDb::new_no_tls(
            "postgresql://invalid:invalid@127.0.0.1:1/invalid",
            1,
        )
        .await
        .expect("lazy pool should be constructed");

        let outcome = cache.reload_from_db(&unreachable_db).await;
        assert!(
            outcome.is_err(),
            "reload should fail against an invalid DB endpoint"
        );

        let size_after_reload = cache.len().await;
        assert_eq!(
            size_after_reload, size_before_reload,
            "failed reload must not clear previously cached instruments"
        );
    }
}