Skip to main content

hypercall_db_diesel/
catalog.rs

1//! CatalogReader + CatalogWriter trait implementations for DieselDb.
2//!
3//! Implements catalog management queries: advisory locks, market catalog state,
4//! instrument queries, and trading mode updates.
5
6use anyhow::{Context, Result};
7use diesel::prelude::*;
8use diesel::sql_types::*;
9use diesel_async::RunQueryDsl;
10use rust_decimal::Decimal;
11use tracing::debug;
12
13use crate::diesel_db::DieselDb;
14
15// ---- Private QueryableByName structs for raw SQL results ----
16
17#[derive(QueryableByName, Debug)]
18struct AdvisoryLockResult {
19    #[diesel(sql_type = Bool)]
20    pub pg_try_advisory_lock: bool,
21}
22
23#[derive(QueryableByName, Debug)]
24struct AdvisoryUnlockResult {
25    #[diesel(sql_type = Bool)]
26    pub pg_advisory_unlock: bool,
27}
28
29#[derive(QueryableByName, Debug)]
30struct MarketCatalogStateRow {
31    #[diesel(sql_type = Text)]
32    pub underlying: String,
33    #[diesel(sql_type = BigInt)]
34    pub expiry: i64,
35    #[diesel(sql_type = Numeric)]
36    pub ref_price_at_listing: Decimal,
37    #[diesel(sql_type = BigInt)]
38    pub listed_at: i64,
39    #[diesel(sql_type = Integer)]
40    pub listing_policy_version: i32,
41    #[diesel(sql_type = Nullable<BigInt>)]
42    pub last_extension_at: Option<i64>,
43    #[diesel(sql_type = Nullable<Numeric>)]
44    pub last_extension_ref_price: Option<Decimal>,
45}
46
47#[derive(QueryableByName, Debug)]
48struct InstrumentRow {
49    #[diesel(sql_type = Text)]
50    pub id: String,
51    #[diesel(sql_type = Text)]
52    pub underlying: String,
53    #[diesel(sql_type = Numeric)]
54    pub strike: Decimal,
55    #[diesel(sql_type = BigInt)]
56    pub expiry: i64,
57    #[diesel(sql_type = Text)]
58    pub option_type: String,
59}
60
61#[derive(QueryableByName, Debug)]
62struct StrikeRow {
63    #[diesel(sql_type = Numeric)]
64    pub strike: Decimal,
65}
66
67#[derive(QueryableByName, Debug)]
68struct MarketRow {
69    #[diesel(sql_type = Text)]
70    pub underlying: String,
71    #[diesel(sql_type = BigInt)]
72    pub expiry: i64,
73}
74
75#[derive(QueryableByName, Debug)]
76struct CountRow {
77    #[diesel(sql_type = Integer)]
78    pub count: i32,
79}
80
81// =============================================================================
82// CatalogReader
83// =============================================================================
84
85#[async_trait::async_trait]
86impl hypercall_db::CatalogReader for DieselDb {
87    async fn get_market_catalog_state(
88        &self,
89        underlying: &str,
90        expiry: i64,
91    ) -> Result<Option<hypercall_db::MarketCatalogState>> {
92        let mut conn = self.get_conn().await?;
93        let rows = diesel::sql_query(
94            r#"
95            SELECT underlying, expiry, ref_price_at_listing, listed_at,
96                   listing_policy_version, last_extension_at, last_extension_ref_price
97            FROM market_catalog_state
98            WHERE underlying = $1 AND expiry = $2
99            "#,
100        )
101        .bind::<Text, _>(underlying)
102        .bind::<BigInt, _>(expiry)
103        .get_results::<MarketCatalogStateRow>(&mut conn)
104        .await
105        .context("Failed to query market_catalog_state")?;
106
107        Ok(rows
108            .into_iter()
109            .next()
110            .map(|row| hypercall_db::MarketCatalogState {
111                underlying: row.underlying,
112                expiry: row.expiry,
113                ref_price_at_listing: row.ref_price_at_listing,
114                listed_at: row.listed_at,
115                listing_policy_version: row.listing_policy_version,
116                last_extension_at: row.last_extension_at,
117                last_extension_ref_price: row.last_extension_ref_price,
118            }))
119    }
120
121    async fn get_instruments_for_settlement(
122        &self,
123        underlying: &str,
124        expiry: i64,
125    ) -> Result<Vec<hypercall_db::CatalogInstrument>> {
126        let mut conn = self.get_conn().await?;
127        let rows = diesel::sql_query(
128            r#"
129            SELECT id, underlying, strike, expiry, option_type
130            FROM instruments
131            WHERE underlying = $1 AND expiry = $2
132            "#,
133        )
134        .bind::<Text, _>(underlying)
135        .bind::<BigInt, _>(expiry)
136        .get_results::<InstrumentRow>(&mut conn)
137        .await
138        .context("Failed to query instruments for settlement")?;
139
140        Ok(rows
141            .into_iter()
142            .map(|row| hypercall_db::CatalogInstrument {
143                id: row.id,
144                underlying: row.underlying,
145                strike: row.strike,
146                expiry: row.expiry,
147                option_type: row.option_type,
148            })
149            .collect())
150    }
151
152    async fn get_distinct_strikes(&self, underlying: &str, expiry: i64) -> Result<Vec<Decimal>> {
153        let mut conn = self.get_conn().await?;
154        let rows = diesel::sql_query(
155            r#"
156            SELECT DISTINCT strike FROM instruments
157            WHERE underlying = $1 AND expiry = $2
158            ORDER BY strike
159            "#,
160        )
161        .bind::<Text, _>(underlying)
162        .bind::<BigInt, _>(expiry)
163        .get_results::<StrikeRow>(&mut conn)
164        .await
165        .context("Failed to query distinct strikes")?;
166
167        Ok(rows.into_iter().map(|row| row.strike).collect())
168    }
169
170    async fn get_markets_for_underlying(&self, underlying: &str) -> Result<Vec<(String, i64)>> {
171        let mut conn = self.get_conn().await?;
172        let rows = diesel::sql_query(
173            r#"
174            SELECT underlying, expiry FROM markets
175            WHERE underlying = $1
176            ORDER BY expiry
177            "#,
178        )
179        .bind::<Text, _>(underlying)
180        .get_results::<MarketRow>(&mut conn)
181        .await
182        .context("Failed to query markets for underlying")?;
183
184        Ok(rows
185            .into_iter()
186            .map(|row| (row.underlying, row.expiry))
187            .collect())
188    }
189
190    async fn get_instruments_count(&self, underlying: &str, expiry: i64) -> Result<i32> {
191        let mut conn = self.get_conn().await?;
192        let row = diesel::sql_query(
193            r#"
194            SELECT COUNT(*)::int as count FROM instruments
195            WHERE underlying = $1 AND expiry = $2
196            "#,
197        )
198        .bind::<Text, _>(underlying)
199        .bind::<BigInt, _>(expiry)
200        .get_result::<CountRow>(&mut conn)
201        .await
202        .context("Failed to count instruments")?;
203
204        Ok(row.count)
205    }
206
207    async fn market_exists(&self, underlying: &str, expiry: i64) -> Result<bool> {
208        let mut conn = self.get_conn().await?;
209        let rows = diesel::sql_query(
210            r#"
211            SELECT 1 as one FROM markets WHERE underlying = $1 AND expiry = $2
212            "#,
213        )
214        .bind::<Text, _>(underlying)
215        .bind::<BigInt, _>(expiry)
216        .get_results::<ExistsRow>(&mut conn)
217        .await
218        .context("Failed to check market existence")?;
219
220        Ok(!rows.is_empty())
221    }
222
223    async fn instrument_exists(&self, symbol: &str) -> Result<bool> {
224        let mut conn = self.get_conn().await?;
225        let rows = diesel::sql_query(
226            r#"
227            SELECT 1 as one FROM instruments WHERE id = $1
228            "#,
229        )
230        .bind::<Text, _>(symbol)
231        .get_results::<ExistsRow>(&mut conn)
232        .await
233        .context("Failed to check instrument existence")?;
234
235        Ok(!rows.is_empty())
236    }
237}
238
239// =============================================================================
240// CatalogWriter
241// =============================================================================
242
243#[async_trait::async_trait]
244impl hypercall_db::CatalogWriter for DieselDb {
245    async fn upsert_market_catalog_state(
246        &self,
247        state: &hypercall_db::MarketCatalogState,
248    ) -> Result<()> {
249        let mut conn = self.get_conn().await?;
250        diesel::sql_query(
251            r#"
252            INSERT INTO market_catalog_state
253                (underlying, expiry, ref_price_at_listing, listed_at, listing_policy_version,
254                 last_extension_at, last_extension_ref_price)
255            VALUES ($1, $2, $3, $4, $5, $6, $7)
256            ON CONFLICT (underlying, expiry)
257            DO UPDATE SET
258                last_extension_at = EXCLUDED.last_extension_at,
259                last_extension_ref_price = EXCLUDED.last_extension_ref_price
260            "#,
261        )
262        .bind::<Text, _>(&state.underlying)
263        .bind::<BigInt, _>(state.expiry)
264        .bind::<Numeric, _>(state.ref_price_at_listing)
265        .bind::<BigInt, _>(state.listed_at)
266        .bind::<Integer, _>(state.listing_policy_version)
267        .bind::<Nullable<BigInt>, _>(state.last_extension_at)
268        .bind::<Nullable<Numeric>, _>(state.last_extension_ref_price)
269        .execute(&mut conn)
270        .await
271        .context("Failed to upsert market_catalog_state")?;
272
273        Ok(())
274    }
275
276    async fn update_trading_mode_for_underlying(
277        &self,
278        underlying: &str,
279        trading_mode: &str,
280    ) -> Result<()> {
281        let mut conn = self.get_conn().await?;
282        diesel::sql_query(
283            "UPDATE instruments SET trading_mode = $1 WHERE underlying = $2 AND trading_mode != $1",
284        )
285        .bind::<Text, _>(trading_mode)
286        .bind::<Text, _>(underlying)
287        .execute(&mut conn)
288        .await
289        .context("Failed to update trading_mode")?;
290
291        Ok(())
292    }
293}
294
295// =============================================================================
296// Additional DieselDb methods for CatalogManager (not part of trait interface)
297// =============================================================================
298
299/// Market catalog state record (listing-oriented, used by CatalogManager internally).
300#[derive(Debug, Clone)]
301pub struct CatalogListingState {
302    pub underlying: String,
303    pub expiry: i64,
304    pub ref_price_at_listing: Decimal,
305    pub listed_at: i64,
306    pub listing_policy_version: i32,
307    pub last_extension_at: Option<i64>,
308    pub last_extension_ref_price: Option<Decimal>,
309}
310
311#[derive(QueryableByName, Debug)]
312struct CatalogListingStateRow {
313    #[diesel(sql_type = Text)]
314    pub underlying: String,
315    #[diesel(sql_type = BigInt)]
316    pub expiry: i64,
317    #[diesel(sql_type = Numeric)]
318    pub ref_price_at_listing: Decimal,
319    #[diesel(sql_type = BigInt)]
320    pub listed_at: i64,
321    #[diesel(sql_type = Integer)]
322    pub listing_policy_version: i32,
323    #[diesel(sql_type = Nullable<BigInt>)]
324    pub last_extension_at: Option<i64>,
325    #[diesel(sql_type = Nullable<Numeric>)]
326    pub last_extension_ref_price: Option<Decimal>,
327}
328
329#[derive(QueryableByName, Debug)]
330#[allow(dead_code)]
331struct ExistsRow {
332    #[diesel(sql_type = Integer)]
333    pub one: i32,
334}
335
336impl DieselDb {
337    /// Get catalog listing state for a market (underlying + expiry).
338    /// This is the listing-oriented state used by CatalogManager for
339    /// reference prices and extension tracking.
340    pub async fn get_catalog_listing_state(
341        &self,
342        underlying: &str,
343        expiry: i64,
344    ) -> Result<Option<CatalogListingState>> {
345        let mut conn = self.get_conn().await?;
346        let rows = diesel::sql_query(
347            r#"
348            SELECT underlying, expiry, ref_price_at_listing, listed_at,
349                   listing_policy_version, last_extension_at, last_extension_ref_price
350            FROM market_catalog_state
351            WHERE underlying = $1 AND expiry = $2
352            "#,
353        )
354        .bind::<Text, _>(underlying)
355        .bind::<BigInt, _>(expiry)
356        .get_results::<CatalogListingStateRow>(&mut conn)
357        .await
358        .context("Failed to query market_catalog_state")?;
359
360        Ok(rows.into_iter().next().map(|row| CatalogListingState {
361            underlying: row.underlying,
362            expiry: row.expiry,
363            ref_price_at_listing: row.ref_price_at_listing,
364            listed_at: row.listed_at,
365            listing_policy_version: row.listing_policy_version,
366            last_extension_at: row.last_extension_at,
367            last_extension_ref_price: row.last_extension_ref_price,
368        }))
369    }
370
371    /// Insert initial catalog state for a new market.
372    pub async fn insert_catalog_listing_state(
373        &self,
374        underlying: &str,
375        expiry: i64,
376        ref_price: f64,
377        policy_version: i32,
378    ) -> Result<()> {
379        let now_ms = chrono::Utc::now().timestamp_millis();
380        let ref_price_dec = Decimal::from_f64_retain(ref_price)
381            .ok_or_else(|| anyhow::anyhow!("Invalid ref price: {}", ref_price))?;
382
383        let mut conn = self.get_conn().await?;
384        diesel::sql_query(
385            r#"
386            INSERT INTO market_catalog_state
387                (underlying, expiry, ref_price_at_listing, listed_at, listing_policy_version)
388            VALUES ($1, $2, $3, $4, $5)
389            ON CONFLICT (underlying, expiry) DO NOTHING
390            "#,
391        )
392        .bind::<Text, _>(underlying)
393        .bind::<BigInt, _>(expiry)
394        .bind::<Numeric, _>(ref_price_dec)
395        .bind::<BigInt, _>(now_ms)
396        .bind::<Integer, _>(policy_version)
397        .execute(&mut conn)
398        .await
399        .context("Failed to insert market_catalog_state")?;
400
401        tracing::info!(
402            "Inserted catalog state for {}-{} with ref_price={}",
403            underlying,
404            expiry,
405            ref_price
406        );
407
408        Ok(())
409    }
410
411    /// Update extension timestamp and ref price for a market.
412    pub async fn update_extension_state(
413        &self,
414        underlying: &str,
415        expiry: i64,
416        ref_price: f64,
417    ) -> Result<()> {
418        let now_ms = chrono::Utc::now().timestamp_millis();
419        let ref_price_dec = Decimal::from_f64_retain(ref_price)
420            .ok_or_else(|| anyhow::anyhow!("Invalid ref price: {}", ref_price))?;
421
422        let mut conn = self.get_conn().await?;
423        let updated = diesel::sql_query(
424            r#"
425            UPDATE market_catalog_state
426            SET last_extension_at = $3, last_extension_ref_price = $4
427            WHERE underlying = $1 AND expiry = $2
428            "#,
429        )
430        .bind::<Text, _>(underlying)
431        .bind::<BigInt, _>(expiry)
432        .bind::<BigInt, _>(now_ms)
433        .bind::<Numeric, _>(ref_price_dec)
434        .execute(&mut conn)
435        .await
436        .context("Failed to update extension state")?;
437
438        if updated == 0 {
439            anyhow::bail!(
440                "No market_catalog_state row for {}-{}: cannot update extension",
441                underlying,
442                expiry
443            );
444        }
445
446        debug!(
447            "Updated extension state for {}-{} with ref_price={}",
448            underlying, expiry, ref_price
449        );
450
451        Ok(())
452    }
453
454    /// Get all existing strikes for a market as f64 values.
455    pub async fn get_existing_strikes(&self, underlying: &str, expiry: i64) -> Result<Vec<f64>> {
456        let mut conn = self.get_conn().await?;
457        let rows = diesel::sql_query(
458            r#"
459            SELECT DISTINCT strike FROM instruments
460            WHERE underlying = $1 AND expiry = $2
461            ORDER BY strike
462            "#,
463        )
464        .bind::<Text, _>(underlying)
465        .bind::<BigInt, _>(expiry)
466        .get_results::<StrikeRow>(&mut conn)
467        .await
468        .context("Failed to query strikes")?;
469
470        rows.into_iter()
471            .map(|row| {
472                row.strike
473                    .to_string()
474                    .parse::<f64>()
475                    .map_err(|e| anyhow::anyhow!("invalid strike value {}: {}", row.strike, e))
476            })
477            .collect::<Result<Vec<f64>>>()
478    }
479
480    /// Update trading_mode for instruments of a given underlying.
481    /// Returns the number of rows actually changed.
482    pub async fn update_trading_mode_count(
483        &self,
484        underlying: &str,
485        mode_str: &str,
486    ) -> Result<usize> {
487        let mut conn = self.get_conn().await?;
488        let result = diesel::sql_query(
489            "UPDATE instruments SET trading_mode = $1 WHERE underlying = $2 AND trading_mode != $1",
490        )
491        .bind::<Text, _>(mode_str)
492        .bind::<Text, _>(underlying)
493        .execute(&mut conn)
494        .await
495        .context("Failed to update trading_mode")?;
496
497        Ok(result)
498    }
499
500    /// Try to acquire the advisory lock for CatalogManager.
501    /// Returns true if lock was acquired, false if another instance holds it.
502    /// Unlike the trait method, this operates on a specific connection that is
503    /// passed in, preserving session-scoped lock semantics.
504    pub async fn try_acquire_advisory_lock_on_conn(
505        &self,
506        conn: &mut diesel_async::AsyncPgConnection,
507        key: i64,
508    ) -> Result<bool> {
509        let result = diesel::sql_query("SELECT pg_try_advisory_lock($1)")
510            .bind::<BigInt, _>(key)
511            .get_result::<AdvisoryLockResult>(conn)
512            .await
513            .context("Failed to try advisory lock")?;
514
515        if result.pg_try_advisory_lock {
516            debug!("Acquired advisory lock {}", key);
517        } else {
518            debug!("Advisory lock {} held by another session", key);
519        }
520
521        Ok(result.pg_try_advisory_lock)
522    }
523
524    /// Release the advisory lock on a specific connection.
525    pub async fn release_advisory_lock_on_conn(
526        &self,
527        conn: &mut diesel_async::AsyncPgConnection,
528        key: i64,
529    ) -> Result<()> {
530        let result = diesel::sql_query("SELECT pg_advisory_unlock($1)")
531            .bind::<BigInt, _>(key)
532            .get_result::<AdvisoryUnlockResult>(conn)
533            .await
534            .context("Failed to release advisory lock")?;
535
536        if result.pg_advisory_unlock {
537            debug!("Released advisory lock {}", key);
538        } else {
539            debug!("Advisory lock {} was not held on this connection", key);
540        }
541
542        Ok(())
543    }
544}
545
#[cfg(test)]
mod tests {
    use crate::test_helpers::TestDb;
    use hypercall_db::*;
    use hypercall_types::api_models::InstrumentStatus;
    use hypercall_types::OptionType;
    use rust_decimal_macros::dec;

    /// Baseline fixture shared by the tests: the active BTC 100000 call
    /// expiring 20260131 in orderbook mode. Variations are built from it
    /// with struct-update syntax.
    fn btc_call() -> InstrumentRecord {
        InstrumentRecord {
            instrument_numeric_id: 0,
            id: "BTC-20260131-100000-C".to_string(),
            underlying: "BTC".to_string(),
            strike: dec!(100000),
            expiry: 20260131,
            option_type: OptionType::Call,
            option_token_address: None,
            status: InstrumentStatus::Active,
            trading_mode: "orderbook".to_string(),
        }
    }

    /// Only instruments of the requested underlying/expiry are returned.
    #[tokio::test]
    async fn get_instruments_for_settlement_returns_matching() {
        let test_db = TestDb::new().await.unwrap();
        let db = test_db.diesel_db().await;
        let handler = test_db.handler.as_ref();

        let call = btc_call();
        handler
            .save_market_and_instrument_sync("BTC", 20260131, &call)
            .unwrap();

        let put = InstrumentRecord {
            id: "BTC-20260131-90000-P".to_string(),
            strike: dec!(90000),
            option_type: OptionType::Put,
            ..btc_call()
        };
        handler
            .save_market_and_instrument_sync("BTC", 20260131, &put)
            .unwrap();

        // Different underlying
        let eth = InstrumentRecord {
            id: "ETH-20260131-4000-C".to_string(),
            underlying: "ETH".to_string(),
            strike: dec!(4000),
            ..btc_call()
        };
        handler
            .save_market_and_instrument_sync("ETH", 20260131, &eth)
            .unwrap();

        let instruments = db
            .get_instruments_for_settlement("BTC", 20260131)
            .await
            .unwrap();
        assert_eq!(instruments.len(), 2);
        assert!(instruments.iter().all(|i| i.underlying == "BTC"));
        assert!(instruments.iter().all(|i| i.expiry == 20260131));
    }

    /// Duplicate strikes collapse and results come back ascending.
    #[tokio::test]
    async fn get_distinct_strikes_returns_sorted_unique() {
        let test_db = TestDb::new().await.unwrap();
        let db = test_db.diesel_db().await;
        let handler = test_db.handler.as_ref();

        let fixtures = [
            (dec!(100000), OptionType::Call, "C"),
            (dec!(100000), OptionType::Put, "P"),
            (dec!(90000), OptionType::Put, "P"),
            (dec!(110000), OptionType::Call, "C"),
        ];
        for (strike_val, opt_type, opt_str) in fixtures {
            let inst = InstrumentRecord {
                id: format!("BTC-20260131-{}-{}", strike_val, opt_str),
                strike: strike_val,
                option_type: opt_type,
                ..btc_call()
            };
            handler
                .save_market_and_instrument_sync("BTC", 20260131, &inst)
                .unwrap();
        }

        let strikes = db.get_distinct_strikes("BTC", 20260131).await.unwrap();
        assert_eq!(strikes.len(), 3);
        assert_eq!(strikes[0], dec!(90000));
        assert_eq!(strikes[1], dec!(100000));
        assert_eq!(strikes[2], dec!(110000));
    }

    /// Markets of one underlying are listed by ascending expiry.
    #[tokio::test]
    async fn get_markets_for_underlying_returns_ordered() {
        let test_db = TestDb::new().await.unwrap();
        let db = test_db.diesel_db().await;
        let handler = test_db.handler.as_ref();

        let inst1 = btc_call();
        handler
            .save_market_and_instrument_sync("BTC", 20260131, &inst1)
            .unwrap();

        let inst2 = InstrumentRecord {
            id: "BTC-20260228-100000-C".to_string(),
            expiry: 20260228,
            ..btc_call()
        };
        handler
            .save_market_and_instrument_sync("BTC", 20260228, &inst2)
            .unwrap();

        let markets = db.get_markets_for_underlying("BTC").await.unwrap();
        assert_eq!(markets.len(), 2);
        assert_eq!(markets[0].1, 20260131);
        assert_eq!(markets[1].1, 20260228);
        assert!(markets.iter().all(|(u, _)| u == "BTC"));
    }

    /// The count is zero before inserts, tracks inserts, and is per-underlying.
    #[tokio::test]
    async fn get_instruments_count_correct() {
        let test_db = TestDb::new().await.unwrap();
        let db = test_db.diesel_db().await;
        let handler = test_db.handler.as_ref();

        let count = db.get_instruments_count("BTC", 20260131).await.unwrap();
        assert_eq!(count, 0);

        for (strike, opt) in [(dec!(100000), "C"), (dec!(90000), "P")] {
            let option_type = match opt {
                "C" => OptionType::Call,
                _ => OptionType::Put,
            };
            let inst = InstrumentRecord {
                id: format!("BTC-20260131-{}-{}", strike, opt),
                strike,
                option_type,
                ..btc_call()
            };
            handler
                .save_market_and_instrument_sync("BTC", 20260131, &inst)
                .unwrap();
        }

        let count = db.get_instruments_count("BTC", 20260131).await.unwrap();
        assert_eq!(count, 2);

        let count_eth = db.get_instruments_count("ETH", 20260131).await.unwrap();
        assert_eq!(count_eth, 0);
    }

    /// Switching the mode is visible when reading the row back directly.
    #[tokio::test]
    async fn update_trading_mode_changes_instruments() {
        #[derive(diesel::QueryableByName)]
        struct ModeRow {
            #[diesel(sql_type = diesel::sql_types::Text)]
            trading_mode: String,
        }

        let test_db = TestDb::new().await.unwrap();
        let db = test_db.diesel_db().await;
        let handler = test_db.handler.as_ref();

        let inst = btc_call();
        handler
            .save_market_and_instrument_sync("BTC", 20260131, &inst)
            .unwrap();

        db.update_trading_mode_count("BTC", "rfq").await.unwrap();

        // Read back through the handler's own pool with the synchronous
        // diesel API, independently of the code under test.
        let mut conn = test_db.handler.pool().get().unwrap();
        let query = diesel::sql_query(
            "SELECT trading_mode FROM instruments WHERE id = 'BTC-20260131-100000-C'",
        );
        let rows: Vec<ModeRow> = diesel::RunQueryDsl::load(query, &mut conn).unwrap();
        assert_eq!(rows[0].trading_mode, "rfq");
    }

    /// A freshly inserted listing state can be read back with no extension set.
    #[tokio::test]
    async fn catalog_listing_state_roundtrip() {
        let test_db = TestDb::new().await.unwrap();
        let db = test_db.diesel_db().await;
        let handler = test_db.handler.as_ref();

        let inst = btc_call();
        handler
            .save_market_and_instrument_sync("BTC", 20260131, &inst)
            .unwrap();

        db.insert_catalog_listing_state("BTC", 20260131, 100000.0, 1)
            .await
            .unwrap();

        let state = db.get_catalog_listing_state("BTC", 20260131).await.unwrap();
        assert!(state.is_some());
        let state = state.unwrap();
        assert_eq!(state.underlying, "BTC");
        assert_eq!(state.expiry, 20260131);
        assert_eq!(state.listing_policy_version, 1);
        assert!(state.last_extension_at.is_none());
    }

    /// Updating the extension state populates both extension columns.
    #[tokio::test]
    async fn update_extension_state_works() {
        let test_db = TestDb::new().await.unwrap();
        let db = test_db.diesel_db().await;
        let handler = test_db.handler.as_ref();

        let inst = btc_call();
        handler
            .save_market_and_instrument_sync("BTC", 20260131, &inst)
            .unwrap();

        db.insert_catalog_listing_state("BTC", 20260131, 100000.0, 1)
            .await
            .unwrap();

        db.update_extension_state("BTC", 20260131, 105000.0)
            .await
            .unwrap();

        let state = db
            .get_catalog_listing_state("BTC", 20260131)
            .await
            .unwrap()
            .unwrap();
        assert!(state.last_extension_at.is_some());
        assert!(state.last_extension_ref_price.is_some());
    }

    /// Existence flips to true only for the market that was saved.
    #[tokio::test]
    async fn market_exists_checks() {
        let test_db = TestDb::new().await.unwrap();
        let db = test_db.diesel_db().await;
        let handler = test_db.handler.as_ref();

        assert!(!db.market_exists("BTC", 20260131).await.unwrap());

        let inst = btc_call();
        handler
            .save_market_and_instrument_sync("BTC", 20260131, &inst)
            .unwrap();

        assert!(db.market_exists("BTC", 20260131).await.unwrap());
        assert!(!db.market_exists("ETH", 20260131).await.unwrap());
    }
}