Skip to main content

hypercall/catalog_manager/
manager.rs

1//! CatalogManager - Main reconciliation loop.
2//!
3//! Runs at a configurable interval and reconciles desired markets/instruments
4//! against actual database state. Uses advisory locks for distributed safety.
5
6use anyhow::{Context, Result};
7use hypercall_db::CatalogReader;
8use hypercall_db_diesel::DieselDb;
9use metrics::{counter, gauge};
10use std::collections::HashMap;
11use std::sync::Arc;
12use std::time::Duration;
13use tokio::sync::mpsc;
14use tokio::time::{interval, Instant};
15use tracing::{debug, error, info, warn};
16
17use crate::price_oracle::hyperliquid_oracle::HyperliquidMarkPriceOracle;
18use crate::rsm::unified_engine::MarketRequest;
19use crate::shared::traits::MarkPriceOracle;
20use catalog_manager::{
21    format_symbol, generate_expiry_schedule_at_date, generate_strike_set_at_time,
22    plan_extension_at_time, timestamp_to_code, CatalogConfig, ExpiryInfo, ExtensionRequest,
23};
24use hypercall_types::MarketActionMessage;
25
26/// `watch`-channel payload published to the engine whenever the catalog
27/// manager rewrites any `instruments.trading_mode` row. Keyed on
28/// underlying — the engine calls
29/// `UnifiedEngine::apply_underlying_trading_mode_update` with this map
30/// to refresh its in-memory `instrument_trading_modes` cache.
31pub type TradingModeUpdate = std::collections::HashMap<String, hypercall_types::TradingModes>;
32
33/// CatalogManager handles continuous market/instrument reconciliation
34pub struct CatalogManager {
35    /// Async Diesel database connection pool
36    diesel_db: Arc<DieselDb>,
37    /// Catalog configuration
38    config: CatalogConfig,
39    /// Channel to send market creation requests (uses MarketRequest with response channel)
40    market_sender: mpsc::Sender<MarketRequest>,
41    /// Mark price oracles for spot prices
42    mark_price_oracles: HashMap<String, Arc<HyperliquidMarkPriceOracle>>,
43    /// Reconciliation interval
44    interval_secs: u64,
45    /// Live notify channel to the engine main loop. Every time
46    /// `sync_trading_modes_to_db` rewrites any row we publish the full
47    /// current `underlying -> TradingModes` map here. `watch` semantics
48    /// mean the engine only observes the latest value, which is what
49    /// we want -- we never care about superseded updates.
50    trading_mode_notify: Option<tokio::sync::watch::Sender<TradingModeUpdate>>,
51}
52
53impl CatalogManager {
54    /// Create a new CatalogManager
55    pub fn new(
56        diesel_db: Arc<DieselDb>,
57        config: CatalogConfig,
58        market_sender: mpsc::Sender<MarketRequest>,
59        mark_price_oracles: HashMap<String, Arc<HyperliquidMarkPriceOracle>>,
60        interval_secs: u64,
61    ) -> Self {
62        Self {
63            diesel_db,
64            config,
65            market_sender,
66            mark_price_oracles,
67            interval_secs,
68            trading_mode_notify: None,
69        }
70    }
71
72    /// Attach a `tokio::sync::watch::Sender` that this catalog manager
73    /// will publish `underlying → trading_mode` updates on whenever a
74    /// periodic sync rewrites any row. Optional — call sites that don't
75    /// need live notifications can omit this and the catalog manager
76    /// behaves exactly as before (warn-only drift).
77    pub fn with_trading_mode_notify(
78        mut self,
79        sender: tokio::sync::watch::Sender<TradingModeUpdate>,
80    ) -> Self {
81        self.trading_mode_notify = Some(sender);
82        self
83    }
84
85    /// Execute one reconciliation tick
86    async fn reconcile_tick(&self) -> Result<()> {
87        let start = Instant::now();
88        counter!("catalog_manager_ticks_total").increment(1);
89
90        // Use transaction-scoped advisory lock: acquire on a dedicated connection,
91        // hold it during reconciliation, then let it auto-release when dropped.
92        // We use try_advisory_lock (session-level) on a connection we keep alive
93        // during the entire reconciliation to prevent other instances from running.
94        //
95        // IMPORTANT: This connection is held for the duration of reconciliation.
96        // All reconciliation queries use separate pool connections (DieselDb methods
97        // each checkout their own). Ensure the pool has at least 2 async connections.
98        let mut lock_conn = self
99            .diesel_db
100            .get_conn()
101            .await
102            .context("Failed to acquire DB connection for advisory lock")?;
103
104        let acquired = self
105            .diesel_db
106            .try_acquire_advisory_lock_on_conn(
107                &mut lock_conn,
108                super::CATALOG_MANAGER_ADVISORY_LOCK_ID,
109            )
110            .await?;
111        if !acquired {
112            debug!("Another CatalogManager instance holds the lock, skipping this tick");
113            return Ok(());
114        }
115
116        let result = self.do_reconciliation().await;
117
118        if let Err(e) = self
119            .diesel_db
120            .release_advisory_lock_on_conn(&mut lock_conn, super::CATALOG_MANAGER_ADVISORY_LOCK_ID)
121            .await
122        {
123            warn!("Failed to release advisory lock: {}", e);
124        }
125
126        let elapsed = start.elapsed();
127        info!("CatalogManager tick completed in {:?}", elapsed);
128
129        // Only update success timestamp on successful reconciliation
130        if result.is_ok() {
131            gauge!("catalog_manager_last_success_timestamp_seconds")
132                .set(chrono::Utc::now().timestamp() as f64);
133        }
134
135        result
136    }
137
138    /// Perform the actual reconciliation work
139    async fn do_reconciliation(&self) -> Result<()> {
140        let expiry_times = self.config.expiry_times()?;
141        let today = chrono::Utc::now().date_naive();
142
143        // Process each underlying with its own expiry schedule: the dates
144        // are usually shared policy, with per-underlying overrides for bespoke
145        // listings like SPCX.
146        for (underlying, underlying_config) in &self.config.underlyings {
147            info!("Processing underlying: {}", underlying);
148
149            let schedule_config = underlying_config
150                .schedule
151                .as_ref()
152                .unwrap_or(&self.config.expiry.schedule);
153            let schedule = generate_expiry_schedule_at_date(
154                today,
155                schedule_config.daily_count,
156                schedule_config.weekly_count,
157                schedule_config.monthly_count,
158                schedule_config.weekdays_only,
159                expiry_times.for_underlying(underlying),
160            )?;
161
162            info!(
163                "Target expiry schedule for {}: {} expiries",
164                underlying,
165                schedule.expiries.len()
166            );
167
168            // Get spot price from oracle
169            let spot_price = match self.get_spot_price(underlying).await {
170                Some(price) => price,
171                None => {
172                    warn!("No spot price for {}, skipping", underlying);
173                    continue;
174                }
175            };
176
177            info!("{} spot price: {}", underlying, spot_price);
178
179            let max_expiry_code = underlying_config.max_expiry_code;
180
181            for expiry_info in &schedule.expiries {
182                if let Some(cap) = max_expiry_code {
183                    if expiry_info.code > cap {
184                        continue;
185                    }
186                }
187                if let Err(e) = self
188                    .process_expiry(underlying, expiry_info, spot_price)
189                    .await
190                {
191                    error!(
192                        "Failed to process {}-{}: {:?}",
193                        underlying, expiry_info.code, e
194                    );
195                    counter!("catalog_manager_errors_total").increment(1);
196                }
197            }
198        }
199
200        Ok(())
201    }
202
203    /// Process a single expiry for an underlying
204    async fn process_expiry(
205        &self,
206        underlying: &str,
207        expiry_info: &ExpiryInfo,
208        spot_price: f64,
209    ) -> Result<()> {
210        let expiry_timestamp = expiry_info.timestamp;
211        // DB stores expiry as YYYYMMDD code, not Unix timestamp
212        let expiry_code = expiry_info.code as i64;
213
214        // Check if market exists (using YYYYMMDD code for DB query)
215        let market_exists = self
216            .diesel_db
217            .market_exists(underlying, expiry_code)
218            .await?;
219
220        // Get or create catalog state (using YYYYMMDD code for DB query)
221        let catalog_state = self
222            .diesel_db
223            .get_catalog_listing_state(underlying, expiry_code)
224            .await?;
225
226        let (ref_price, strike_reference_timestamp_secs) = if let Some(state) = &catalog_state {
227            // Use stored ref price for baseline grid stability
228            match state.ref_price_at_listing.to_string().parse::<f64>() {
229                Ok(price) if price > 0.0 => (price, state.listed_at.div_euclid(1000)),
230                Ok(price) => {
231                    anyhow::bail!(
232                        "Invalid stored ref_price {} for {}/{}",
233                        price,
234                        underlying,
235                        expiry_info.code
236                    );
237                }
238                Err(e) => {
239                    anyhow::bail!(
240                        "Failed to parse stored ref_price '{}' for {}/{}: {}",
241                        state.ref_price_at_listing,
242                        underlying,
243                        expiry_info.code,
244                        e
245                    );
246                }
247            }
248        } else {
249            // New market, use current spot as ref price
250            (spot_price, chrono::Utc::now().timestamp())
251        };
252
253        // Generate strike set based on ref price (for baseline stability)
254        let strike_set = generate_strike_set_at_time(
255            underlying,
256            ref_price,
257            expiry_timestamp,
258            &self.config.strike_selection,
259            strike_reference_timestamp_secs,
260        )?;
261
262        debug!(
263            "Generated {} strikes for {}-{} (ref_price={}, policy={:?})",
264            strike_set.strikes.len(),
265            underlying,
266            expiry_info.code,
267            ref_price,
268            strike_set.policy
269        );
270
271        // Create market if it doesn't exist
272        if !market_exists {
273            info!("Creating market {}-{}", underlying, expiry_info.code);
274
275            // Get first strike safely
276            let first_strike = strike_set.strikes.first().copied().ok_or_else(|| {
277                anyhow::anyhow!(
278                    "No strikes generated for {}-{}",
279                    underlying,
280                    expiry_info.code
281                )
282            })?;
283
284            // Create market via engine channel
285            self.create_market(underlying, expiry_timestamp, first_strike, true)
286                .await
287                .context("Failed to create market")?;
288
289            counter!("catalog_manager_markets_created_total").increment(1);
290
291            // Insert catalog state (using YYYYMMDD code)
292            self.diesel_db
293                .insert_catalog_listing_state(
294                    underlying,
295                    expiry_code,
296                    spot_price,
297                    self.config.version as i32,
298                )
299                .await?;
300        }
301
302        // Get existing instruments for this market (using YYYYMMDD code)
303        let existing_strikes = self
304            .diesel_db
305            .get_existing_strikes(underlying, expiry_code)
306            .await?;
307
308        // Build the union of baseline grid strikes and already-existing
309        // strikes (which includes prior extension strikes). This ensures
310        // that if an extension strike had only one side created, the
311        // missing call/put is retried here rather than falling through
312        // the cracks: the extension planner filters by strike value, so
313        // a partially-present extension strike is excluded from
314        // plan.new_strikes.
315        let mut all_strikes = strike_set.strikes.clone();
316        for &es in &existing_strikes {
317            if !all_strikes.iter().any(|&s| (s - es).abs() < 1e-6) {
318                all_strikes.push(es);
319            }
320        }
321        all_strikes.sort_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal));
322
323        let mut instruments_created = 0;
324        let mut instruments_failed = 0;
325        for &strike in &all_strikes {
326            for is_call in [true, false] {
327                let symbol = format_symbol(underlying, expiry_info.code, strike, is_call);
328
329                if !self.diesel_db.instrument_exists(&symbol).await? {
330                    match self
331                        .create_market(underlying, expiry_timestamp, strike, is_call)
332                        .await
333                    {
334                        Ok(()) => {
335                            instruments_created += 1;
336                        }
337                        Err(e) => {
338                            warn!("Failed to create instrument {}: {}; continuing", symbol, e);
339                            instruments_failed += 1;
340                        }
341                    }
342                }
343            }
344        }
345
346        if instruments_created > 0 || instruments_failed > 0 {
347            info!(
348                "Created {} instruments for {}-{} ({} failed)",
349                instruments_created, underlying, expiry_info.code, instruments_failed
350            );
351            counter!("catalog_manager_instruments_created_total").increment(instruments_created);
352            if instruments_failed > 0 {
353                counter!("catalog_manager_instrument_creation_failures_total")
354                    .increment(instruments_failed);
355            }
356        }
357
358        // Check for extension if policy is enabled
359        if self.config.extension_policy.enabled {
360            if let Some(state) = &catalog_state {
361                // Check cooldown
362                let now_ms = chrono::Utc::now().timestamp_millis();
363                let cooldown_ms = (self.config.extension_policy.cooldown_secs * 1000) as i64;
364                let last_ext = state.last_extension_at.unwrap_or(0);
365
366                if now_ms - last_ext >= cooldown_ms {
367                    // Plan extension
368                    let plan = plan_extension_at_time(
369                        ExtensionRequest {
370                            underlying,
371                            current_spot: spot_price,
372                            ref_price_at_listing: ref_price,
373                            expiry_timestamp,
374                            existing_strikes: &existing_strikes,
375                            reference_timestamp_secs: strike_reference_timestamp_secs,
376                        },
377                        &self.config.strike_selection,
378                        &self.config.extension_policy,
379                    )?;
380
381                    if plan.needs_extension {
382                        info!(
383                            "Extension needed for {}-{}: {:?}",
384                            underlying, expiry_info.code, plan.reason
385                        );
386
387                        let mut ext_created = 0;
388                        for &strike in &plan.new_strikes {
389                            for is_call in [true, false] {
390                                let symbol =
391                                    format_symbol(underlying, expiry_info.code, strike, is_call);
392
393                                if !self.diesel_db.instrument_exists(&symbol).await? {
394                                    match self
395                                        .create_market(
396                                            underlying,
397                                            expiry_timestamp,
398                                            strike,
399                                            is_call,
400                                        )
401                                        .await
402                                    {
403                                        Ok(()) => {
404                                            ext_created += 1;
405                                        }
406                                        Err(e) => {
407                                            warn!(
408                                                "Failed to create extension instrument {}: {}; continuing",
409                                                symbol, e
410                                            );
411                                        }
412                                    }
413                                }
414                            }
415                        }
416
417                        if ext_created > 0 {
418                            info!(
419                                "Extension created {} instruments for {}-{}",
420                                ext_created, underlying, expiry_info.code
421                            );
422                            counter!("catalog_manager_extensions_total").increment(1);
423                            counter!("catalog_manager_instruments_created_total")
424                                .increment(ext_created);
425
426                            // Update extension state (using YYYYMMDD code)
427                            self.diesel_db
428                                .update_extension_state(underlying, expiry_code, spot_price)
429                                .await?;
430                        }
431                    }
432                } else {
433                    debug!(
434                        "Extension cooldown active for {}-{} ({} ms remaining)",
435                        underlying,
436                        expiry_info.code,
437                        cooldown_ms - (now_ms - last_ext)
438                    );
439                }
440            }
441        }
442
443        // Sync `trading_mode` from catalog config to DB for every
444        // underlying, including the default case. Gating on `!= default`
445        // would skip the rollback scenario where an underlying flips
446        // from `rfq` / `orderbook|rfq` back to plain `orderbook`: the
447        // catalog config would carry the new value but `instruments.
448        // trading_mode` would remain stale, and execution gating
449        // (`allows_rfq` / `allows_orderbook`) would keep the old mode.
450        // The `UPDATE ... WHERE trading_mode != $1` clause already
451        // makes the write a no-op on rows that already match, so
452        // running this unconditionally has no extra write cost.
453        //
454        // KNOWN LIMITATION: the engine's in-memory
455        // `instrument_trading_modes` map is only populated on
456        // recovery + CreateMarket paths and is NOT refreshed from
457        // this DB update. After a catalog mode change here, the
458        // running engine keeps applying the old mode until restart,
459        // so admission for orders / RFQs against the mutated
460        // instruments can diverge from the new catalog config. When
461        // we detect that a row was actually changed, emit a loud
462        // WARN so operators know a restart is needed to pick up the
463        // new mode. TODO(CALL-RFQ-MODE-LIVE): plumb a Notify from
464        // here into the engine main loop to refresh
465        // `instrument_trading_modes` from DB on change.
466        let mut any_row_changed = false;
467        for (underlying, config) in &self.config.underlyings {
468            let mode_str = config.trading_mode.as_db_str();
469            match self
470                .diesel_db
471                .update_trading_mode_count(underlying, &mode_str)
472                .await
473            {
474                Ok(rows) => {
475                    if rows > 0 {
476                        any_row_changed = true;
477                        tracing::info!(
478                            underlying,
479                            trading_mode = %mode_str,
480                            rows_changed = rows,
481                            "trading_mode changed in DB; publishing live update to engine via notify channel"
482                        );
483                    }
484                }
485                Err(e) => {
486                    tracing::warn!(
487                        underlying,
488                        trading_mode = %mode_str,
489                        "Failed to sync trading_mode: {}",
490                        e
491                    );
492                }
493            }
494        }
495
496        // If anything actually changed, publish the full current
497        // underlying -> TradingModes map on the watch channel. We always
498        // send the full map (not a delta) so the engine has no state
499        // machine to reconcile -- it just replaces its view with what
500        // the catalog says.
501        if any_row_changed {
502            if let Some(sender) = &self.trading_mode_notify {
503                let current: TradingModeUpdate = self
504                    .config
505                    .underlyings
506                    .iter()
507                    .map(|(u, c)| (u.clone(), c.trading_mode))
508                    .collect();
509                // send_replace is the right watch primitive -- we want
510                // the latest value, and we don't care if no receivers
511                // are currently observing (it'll still be stored).
512                let _ = sender.send(current);
513            }
514        }
515
516        Ok(())
517    }
518
519    /// Get spot price from oracle
520    async fn get_spot_price(&self, underlying: &str) -> Option<f64> {
521        let oracle = self.mark_price_oracles.get(underlying)?;
522        oracle.get_spot_price().await
523    }
524
525    /// Create a market/instrument via engine channel
526    async fn create_market(
527        &self,
528        underlying: &str,
529        expiry: i64,
530        strike: f64,
531        is_call: bool,
532    ) -> Result<()> {
533        use hypercall_types::{Market, MarketAction, OptionType};
534        use rust_decimal::Decimal;
535
536        let expiry_code = timestamp_to_code(expiry)
537            .ok_or_else(|| anyhow::anyhow!("Invalid expiry timestamp: {}", expiry))?;
538        let symbol = format_symbol(underlying, expiry_code, strike, is_call);
539
540        let option_type = if is_call {
541            OptionType::Call
542        } else {
543            OptionType::Put
544        };
545
546        let strike_decimal = Decimal::from_f64_retain(strike)
547            .ok_or_else(|| anyhow::anyhow!("Invalid strike: {}", strike))?;
548
549        let market = Market {
550            symbol: symbol.clone(),
551            underlying: underlying.to_string(),
552            expiry: expiry_code as u64, // Send YYYYMMDD code, not Unix timestamp
553            strike: strike_decimal,
554            option_type,
555        };
556
557        let message = MarketActionMessage {
558            market,
559            action: MarketAction::CreateMarket,
560            timestamp: chrono::Utc::now().timestamp_millis() as u64,
561        };
562
563        // Create a one-shot channel for the response
564        let (response_tx, mut response_rx) = mpsc::channel(1);
565
566        let request = MarketRequest {
567            message,
568            response_tx,
569        };
570
571        self.market_sender
572            .send(request)
573            .await
574            .map_err(|e| anyhow::anyhow!("Failed to send market request: {}", e))?;
575
576        // Wait for response (with timeout)
577        match tokio::time::timeout(Duration::from_secs(10), response_rx.recv()).await {
578            Ok(Some(response)) => {
579                use hypercall_types::MarketUpdateStatus;
580                match response.status {
581                    MarketUpdateStatus::MarketCreated => {
582                        // Success - instrument created
583                    }
584                    MarketUpdateStatus::MarketAlreadyExists => {
585                        // Idempotent - market already exists, this is fine
586                    }
587                    MarketUpdateStatus::MarketCreationFailed => {
588                        let error_msg = if let Some(ref reason) = response.reason {
589                            format!("Market creation failed for {}: {}", symbol, reason)
590                        } else {
591                            format!("Market creation failed for {} (no reason provided)", symbol)
592                        };
593                        return Err(anyhow::anyhow!(error_msg));
594                    }
595                    status => {
596                        return Err(anyhow::anyhow!(
597                            "Unexpected market creation status {:?} for {}",
598                            status,
599                            symbol
600                        ));
601                    }
602                }
603            }
604            Ok(None) => {
605                return Err(anyhow::anyhow!(
606                    "Market response channel closed for {}",
607                    symbol
608                ));
609            }
610            Err(_) => {
611                return Err(anyhow::anyhow!(
612                    "Timeout waiting for market creation response for {}",
613                    symbol
614                ));
615            }
616        }
617
618        Ok(())
619    }
620}
621
622#[async_trait::async_trait]
623impl crate::shared::service::Service for CatalogManager {
624    fn name(&self) -> &'static str {
625        "CatalogManager"
626    }
627
628    fn owner(&self) -> crate::shared::service::ServiceOwner {
629        crate::shared::service::ServiceOwner::Engine
630    }
631
632    async fn run(
633        self: std::sync::Arc<Self>,
634        mut shutdown: crate::shared::ShutdownRx,
635    ) -> anyhow::Result<()> {
636        info!(
637            "Starting CatalogManager with interval={}s, version={}",
638            self.interval_secs, self.config.version
639        );
640
641        let mut tick_interval = interval(Duration::from_secs(self.interval_secs));
642
643        if let Err(e) = self.reconcile_tick().await {
644            error!("Initial reconciliation failed: {}", e);
645            counter!("catalog_manager_errors_total").increment(1);
646        }
647
648        loop {
649            tokio::select! {
650                _ = tick_interval.tick() => {
651                    if let Err(e) = self.reconcile_tick().await {
652                        error!("Reconciliation tick failed: {}", e);
653                        counter!("catalog_manager_errors_total").increment(1);
654                    }
655                }
656                _ = shutdown.recv() => {
657                    info!("CatalogManager received shutdown signal");
658                    break;
659                }
660            }
661        }
662        Ok(())
663    }
664}
665
666#[cfg(test)]
667mod tests {
668    use super::*;
669
670    // =========================================================================
671    // format_symbol tests
672    // =========================================================================
673
674    #[test]
675    fn test_format_symbol_basic_call() {
676        assert_eq!(
677            format_symbol("BTC", 20260110, 95000.0, true),
678            "BTC-20260110-95000-C"
679        );
680    }
681
682    #[test]
683    fn test_format_symbol_basic_put() {
684        assert_eq!(
685            format_symbol("ETH", 20260227, 4000.0, false),
686            "ETH-20260227-4000-P"
687        );
688    }
689
690    #[test]
691    fn test_format_symbol_with_decimal_strike() {
692        // Strike with one decimal place
693        assert_eq!(
694            format_symbol("BTC", 20260315, 97500.5, true),
695            "BTC-20260315-97500.5-C"
696        );
697
698        // Strike with two decimal places
699        assert_eq!(
700            format_symbol("ETH", 20260401, 3456.25, false),
701            "ETH-20260401-3456.25-P"
702        );
703    }
704
705    #[test]
706    fn test_format_symbol_rounds_integer_strikes() {
707        // Strike that should round cleanly
708        assert_eq!(
709            format_symbol("BTC", 20260110, 100000.0, true),
710            "BTC-20260110-100000-C"
711        );
712
713        // Strike with tiny floating point error should still format as integer
714        assert_eq!(
715            format_symbol("BTC", 20260110, 100000.0000001, true),
716            "BTC-20260110-100000-C"
717        );
718    }
719
720    #[test]
721    fn test_format_symbol_trailing_zeros_stripped() {
722        // 3500.50 should become 3500.5, not 3500.50
723        assert_eq!(
724            format_symbol("ETH", 20260110, 3500.50, true),
725            "ETH-20260110-3500.5-C"
726        );
727    }
728
729    #[test]
730    fn test_format_symbol_small_strikes() {
731        // Small strike values (ETH-like)
732        assert_eq!(
733            format_symbol("ETH", 20260110, 3000.0, true),
734            "ETH-20260110-3000-C"
735        );
736        assert_eq!(
737            format_symbol("ETH", 20260110, 3250.0, false),
738            "ETH-20260110-3250-P"
739        );
740    }
741
742    #[test]
743    fn test_format_symbol_large_strikes() {
744        // Large strike values (BTC-like)
745        assert_eq!(
746            format_symbol("BTC", 20260110, 150000.0, true),
747            "BTC-20260110-150000-C"
748        );
749        assert_eq!(
750            format_symbol("BTC", 20260110, 200000.0, false),
751            "BTC-20260110-200000-P"
752        );
753    }
754
755    // =========================================================================
756    // timestamp_to_code tests
757    // =========================================================================
758
759    #[test]
760    fn test_timestamp_to_code_basic() {
761        // 2026-01-10 00:00:00 UTC
762        let code = timestamp_to_code(1768003200);
763        assert_eq!(code, Some(20260110));
764    }
765
766    #[test]
767    fn test_timestamp_to_code_with_time() {
768        // 2026-01-31 08:00:00 UTC - typical 8am expiry
769        let code = timestamp_to_code(1769846400);
770        assert_eq!(code, Some(20260131));
771    }
772
773    #[test]
774    fn test_timestamp_to_code_end_of_day() {
775        // 2026-02-28 23:59:59 UTC
776        let code = timestamp_to_code(1772323199);
777        assert_eq!(code, Some(20260228));
778    }
779
780    #[test]
781    fn test_timestamp_to_code_leap_year() {
782        // 2028-02-29 (leap year) 08:00:00 UTC
783        let code = timestamp_to_code(1835424000);
784        assert_eq!(code, Some(20280229));
785    }
786
787    #[test]
788    fn test_timestamp_to_code_year_boundaries() {
789        // Dec 31, 2025 23:59:59 UTC
790        let code_dec = timestamp_to_code(1767225599);
791        assert_eq!(code_dec, Some(20251231));
792
793        // Jan 1, 2026 00:00:00 UTC
794        let code_jan = timestamp_to_code(1767225600);
795        assert_eq!(code_jan, Some(20260101));
796    }
797
798    #[test]
799    fn test_timestamp_to_code_various_months() {
800        // Various months to ensure correct padding
801        // 2026-03-15
802        assert_eq!(timestamp_to_code(1773532800), Some(20260315));
803        // 2026-12-01
804        assert_eq!(timestamp_to_code(1796083200), Some(20261201));
805    }
806
807    // =========================================================================
808    // CatalogManager construction tests
809    // =========================================================================
810
811    fn create_test_config() -> CatalogConfig {
812        use catalog_manager::*;
813
814        let mut underlyings = HashMap::new();
815        underlyings.insert(
816            "BTC".to_string(),
817            UnderlyingConfig {
818                vol_source: "deribit".to_string(),
819                hl_symbol: None,
820                trading_mode: hypercall_types::TradingModes::ORDERBOOK,
821                max_expiry_code: None,
822                expiry_time_utc: None,
823                schedule: None,
824            },
825        );
826        underlyings.insert(
827            "ETH".to_string(),
828            UnderlyingConfig {
829                vol_source: "deribit".to_string(),
830                hl_symbol: None,
831                trading_mode: hypercall_types::TradingModes::ORDERBOOK,
832                max_expiry_code: None,
833                expiry_time_utc: None,
834                schedule: None,
835            },
836        );
837
838        CatalogConfig {
839            version: 1,
840            expiry: ExpiryConfig {
841                expiry_time_utc: "08:00".to_string(),
842                schedule: ExpiryScheduleConfig {
843                    daily_count: 2,
844                    weekly_count: 4,
845                    monthly_count: 3,
846                    weekdays_only: false,
847                },
848            },
849            underlyings,
850            collateral: HashMap::from([
851                (
852                    "BTC_PERP".to_string(),
853                    HyperliquidAssetConfig::Perp(PerpCollateralConfig {
854                        asset_id: 0,
855                        underlying: "BTC".to_string(),
856                    }),
857                ),
858                (
859                    "ETH_PERP".to_string(),
860                    HyperliquidAssetConfig::Perp(PerpCollateralConfig {
861                        asset_id: 1,
862                        underlying: "ETH".to_string(),
863                    }),
864                ),
865                (
866                    "USDC".to_string(),
867                    HyperliquidAssetConfig::Stablecoin(StablecoinCollateralConfig { token_id: 0 }),
868                ),
869            ]),
870            strike_selection: StrikeSelectionConfig {
871                deribit_table_assets: vec![
872                    "BTC".to_string(),
873                    "ETH".to_string(),
874                    "SOL".to_string(),
875                    "AVAX".to_string(),
876                    "XRP".to_string(),
877                    "TRX".to_string(),
878                ],
879                deribit_region_steps: DeribitRegionStepsConfig {
880                    atm: 3,
881                    outer: 4,
882                    wings: 3,
883                },
884                occ_fallback_side_count: 8,
885            },
886            extension_policy: ExtensionPolicyConfig {
887                enabled: true,
888                ensure_min_strikes_per_side: 3,
889                ensure_atm_within_pct: 0.05,
890                cooldown_secs: 3600,
891                max_total_strikes_per_expiry: 30,
892                min_spot_move_pct: 0.05,
893            },
894            observability: ObservabilityConfig::default(),
895            vol_oracles: VolOracleCatalogConfig {
896                providers: HashMap::from([(
897                    "fixed_test".to_string(),
898                    VolOracleProviderConfig::Fixed(FixedVolOracleProviderConfig { iv: 0.5 }),
899                )]),
900                routes: HashMap::from([
901                    ("BTC".to_string(), vec!["fixed_test".to_string()]),
902                    ("ETH".to_string(), vec!["fixed_test".to_string()]),
903                ]),
904            },
905        }
906    }
907
908    #[test]
909    fn test_config_expiry_times() {
910        let config = create_test_config();
911        let times = config.expiry_times().unwrap();
912        assert_eq!(
913            times.for_underlying("BTC"),
914            hypercall_types::ExpiryTime { hour: 8, minute: 0 }
915        );
916    }
917
918    #[test]
919    fn test_config_underlyings() {
920        let config = create_test_config();
921
922        assert!(config.underlyings.contains_key("BTC"));
923        assert!(config.underlyings.contains_key("ETH"));
924
925        let btc = config.underlyings.get("BTC").unwrap();
926        assert_eq!(btc.vol_source, "deribit");
927
928        let eth = config.underlyings.get("ETH").unwrap();
929        assert_eq!(eth.trading_mode, hypercall_types::TradingModes::ORDERBOOK);
930    }
931
932    #[test]
933    fn test_config_extension_policy() {
934        let config = create_test_config();
935
936        assert!(config.extension_policy.enabled);
937        assert_eq!(config.extension_policy.ensure_min_strikes_per_side, 3);
938        assert!((config.extension_policy.ensure_atm_within_pct - 0.05).abs() < 1e-6);
939        assert_eq!(config.extension_policy.cooldown_secs, 3600);
940    }
941
942    // =========================================================================
943    // Integration with other modules
944    // =========================================================================
945
946    #[test]
947    fn test_expiry_schedule_generation_integration() {
948        let config = create_test_config();
949        let times = config.expiry_times().unwrap();
950
951        let schedule = generate_expiry_schedule_at_date(
952            chrono::Utc::now().date_naive(),
953            config.expiry.schedule.daily_count,
954            config.expiry.schedule.weekly_count,
955            config.expiry.schedule.monthly_count,
956            config.expiry.schedule.weekdays_only,
957            times.for_underlying("BTC"),
958        )
959        .unwrap();
960
961        // Should have daily + weekly + monthly expiries
962        let min_expiries = config.expiry.schedule.daily_count + config.expiry.schedule.weekly_count;
963        assert!(schedule.expiries.len() >= min_expiries);
964
965        // All expiries should be in the future
966        let now = chrono::Utc::now().timestamp();
967        for expiry in &schedule.expiries {
968            assert!(expiry.timestamp > now);
969        }
970    }
971
972    #[test]
973    fn test_strike_generation_integration() {
974        use catalog_manager::generate_strike_set;
975
976        let config = create_test_config();
977        // Generate strikes for BTC at $100,000 spot
978        let strike_set = generate_strike_set(
979            "BTC",
980            100000.0,
981            chrono::Utc::now().timestamp() + 30 * 86400,
982            &config.strike_selection,
983        )
984        .unwrap();
985
986        // Strikes should be in order
987        for window in strike_set.strikes.windows(2) {
988            assert!(window[0] < window[1]);
989        }
990
991        // ATM strike should exist
992        let has_atm = strike_set
993            .strikes
994            .iter()
995            .any(|&s| (s - 100000.0).abs() < 1e-6);
996        assert!(has_atm);
997    }
998
999    #[test]
1000    fn test_strike_set_for_eth() {
1001        use catalog_manager::generate_strike_set;
1002
1003        let config = create_test_config();
1004        // Generate strikes for ETH at $3,500 spot
1005        let strike_set = generate_strike_set(
1006            "ETH",
1007            3500.0,
1008            chrono::Utc::now().timestamp() + 30 * 86400,
1009            &config.strike_selection,
1010        )
1011        .unwrap();
1012
1013        // Monthly ETH table uses $50/$100/$200 intervals, so every strike
1014        // remains aligned to $50.
1015        for &strike in &strike_set.strikes {
1016            let remainder = strike % 50.0;
1017            assert!(
1018                remainder.abs() < 1e-6 || (50.0 - remainder).abs() < 1e-6,
1019                "Strike {} not aligned to tick 50",
1020                strike
1021            );
1022        }
1023    }
1024
1025    // =========================================================================
1026    // Symbol parsing round-trip tests
1027    // =========================================================================
1028
1029    #[test]
1030    fn test_format_symbol_roundtrip() {
1031        // Test that we can format and parse back consistently
1032        let test_cases = [
1033            ("BTC", 20260131, 95000.0, true),
1034            ("ETH", 20260228, 3500.0, false),
1035            ("BTC", 20260315, 105000.5, true),
1036            ("ETH", 20260401, 3250.0, false),
1037        ];
1038
1039        for (underlying, expiry_code, strike, is_call) in test_cases {
1040            let symbol = format_symbol(underlying, expiry_code, strike, is_call);
1041
1042            // Verify symbol contains all components
1043            assert!(symbol.starts_with(underlying));
1044            assert!(symbol.contains(&expiry_code.to_string()));
1045            assert!(symbol.ends_with(if is_call { "-C" } else { "-P" }));
1046        }
1047    }
1048
1049    #[test]
1050    fn test_timestamp_code_consistency() {
1051        // Test various timestamps produce valid YYYYMMDD codes
1052        let timestamps = [
1053            1769846400i64, // 2026-01-31 08:00:00 UTC
1054            1772265600,    // 2026-02-28 08:00:00 UTC
1055            1774857600,    // 2026-03-27 08:00:00 UTC
1056            1777536000,    // 2026-04-24 08:00:00 UTC
1057        ];
1058
1059        for ts in timestamps {
1060            let code = timestamp_to_code(ts).expect("valid timestamp");
1061
1062            // Verify code is valid YYYYMMDD
1063            let year = code / 10000;
1064            let month = (code / 100) % 100;
1065            let day = code % 100;
1066
1067            assert!(
1068                (2025..=2030).contains(&year),
1069                "Year {} out of expected range",
1070                year
1071            );
1072            assert!((1..=12).contains(&month), "Month {} invalid", month);
1073            assert!((1..=31).contains(&day), "Day {} invalid", day);
1074        }
1075    }
1076
1077    // =========================================================================
1078    // Edge case tests
1079    // =========================================================================
1080
1081    #[test]
1082    fn test_format_symbol_very_small_decimal() {
1083        // Very small decimal that should not affect output
1084        let symbol = format_symbol("BTC", 20260110, 100000.00000001, true);
1085        assert_eq!(symbol, "BTC-20260110-100000-C");
1086    }
1087
1088    #[test]
1089    fn test_format_symbol_negative_strike_handled() {
1090        // This shouldn't happen in practice, but test robustness
1091        let symbol = format_symbol("BTC", 20260110, -100.0, true);
1092        assert!(symbol.contains("-100"));
1093    }
1094
1095    #[test]
1096    fn test_timestamp_to_code_handles_edge_timestamps() {
1097        // Unix epoch start (1970-01-01 00:00:00 UTC)
1098        let code = timestamp_to_code(0);
1099        assert_eq!(code, Some(19700101));
1100
1101        // Timestamp 1 second after epoch
1102        let code = timestamp_to_code(1);
1103        assert_eq!(code, Some(19700101));
1104
1105        // Just before epoch (1969-12-31 23:59:59) - negative timestamps are valid
1106        let code = timestamp_to_code(-1);
1107        assert_eq!(code, Some(19691231));
1108
1109        // Far-future timestamp that overflows - should return None
1110        // chrono's from_timestamp returns None for values outside valid range
1111        let code = timestamp_to_code(i64::MAX);
1112        assert!(code.is_none(), "Should return None for overflow timestamp");
1113    }
1114}