1use anyhow::{Context, Result};
7use diesel::prelude::*;
8use diesel::sql_types::*;
9use diesel_async::RunQueryDsl;
10use rust_decimal::Decimal;
11use tracing::debug;
12
13use crate::diesel_db::DieselDb;
14
15#[derive(QueryableByName, Debug)]
18struct AdvisoryLockResult {
19 #[diesel(sql_type = Bool)]
20 pub pg_try_advisory_lock: bool,
21}
22
23#[derive(QueryableByName, Debug)]
24struct AdvisoryUnlockResult {
25 #[diesel(sql_type = Bool)]
26 pub pg_advisory_unlock: bool,
27}
28
29#[derive(QueryableByName, Debug)]
30struct MarketCatalogStateRow {
31 #[diesel(sql_type = Text)]
32 pub underlying: String,
33 #[diesel(sql_type = BigInt)]
34 pub expiry: i64,
35 #[diesel(sql_type = Numeric)]
36 pub ref_price_at_listing: Decimal,
37 #[diesel(sql_type = BigInt)]
38 pub listed_at: i64,
39 #[diesel(sql_type = Integer)]
40 pub listing_policy_version: i32,
41 #[diesel(sql_type = Nullable<BigInt>)]
42 pub last_extension_at: Option<i64>,
43 #[diesel(sql_type = Nullable<Numeric>)]
44 pub last_extension_ref_price: Option<Decimal>,
45}
46
47#[derive(QueryableByName, Debug)]
48struct InstrumentRow {
49 #[diesel(sql_type = Text)]
50 pub id: String,
51 #[diesel(sql_type = Text)]
52 pub underlying: String,
53 #[diesel(sql_type = Numeric)]
54 pub strike: Decimal,
55 #[diesel(sql_type = BigInt)]
56 pub expiry: i64,
57 #[diesel(sql_type = Text)]
58 pub option_type: String,
59}
60
61#[derive(QueryableByName, Debug)]
62struct StrikeRow {
63 #[diesel(sql_type = Numeric)]
64 pub strike: Decimal,
65}
66
67#[derive(QueryableByName, Debug)]
68struct MarketRow {
69 #[diesel(sql_type = Text)]
70 pub underlying: String,
71 #[diesel(sql_type = BigInt)]
72 pub expiry: i64,
73}
74
75#[derive(QueryableByName, Debug)]
76struct CountRow {
77 #[diesel(sql_type = Integer)]
78 pub count: i32,
79}
80
81#[async_trait::async_trait]
86impl hypercall_db::CatalogReader for DieselDb {
87 async fn get_market_catalog_state(
88 &self,
89 underlying: &str,
90 expiry: i64,
91 ) -> Result<Option<hypercall_db::MarketCatalogState>> {
92 let mut conn = self.get_conn().await?;
93 let rows = diesel::sql_query(
94 r#"
95 SELECT underlying, expiry, ref_price_at_listing, listed_at,
96 listing_policy_version, last_extension_at, last_extension_ref_price
97 FROM market_catalog_state
98 WHERE underlying = $1 AND expiry = $2
99 "#,
100 )
101 .bind::<Text, _>(underlying)
102 .bind::<BigInt, _>(expiry)
103 .get_results::<MarketCatalogStateRow>(&mut conn)
104 .await
105 .context("Failed to query market_catalog_state")?;
106
107 Ok(rows
108 .into_iter()
109 .next()
110 .map(|row| hypercall_db::MarketCatalogState {
111 underlying: row.underlying,
112 expiry: row.expiry,
113 ref_price_at_listing: row.ref_price_at_listing,
114 listed_at: row.listed_at,
115 listing_policy_version: row.listing_policy_version,
116 last_extension_at: row.last_extension_at,
117 last_extension_ref_price: row.last_extension_ref_price,
118 }))
119 }
120
121 async fn get_instruments_for_settlement(
122 &self,
123 underlying: &str,
124 expiry: i64,
125 ) -> Result<Vec<hypercall_db::CatalogInstrument>> {
126 let mut conn = self.get_conn().await?;
127 let rows = diesel::sql_query(
128 r#"
129 SELECT id, underlying, strike, expiry, option_type
130 FROM instruments
131 WHERE underlying = $1 AND expiry = $2
132 "#,
133 )
134 .bind::<Text, _>(underlying)
135 .bind::<BigInt, _>(expiry)
136 .get_results::<InstrumentRow>(&mut conn)
137 .await
138 .context("Failed to query instruments for settlement")?;
139
140 Ok(rows
141 .into_iter()
142 .map(|row| hypercall_db::CatalogInstrument {
143 id: row.id,
144 underlying: row.underlying,
145 strike: row.strike,
146 expiry: row.expiry,
147 option_type: row.option_type,
148 })
149 .collect())
150 }
151
152 async fn get_distinct_strikes(&self, underlying: &str, expiry: i64) -> Result<Vec<Decimal>> {
153 let mut conn = self.get_conn().await?;
154 let rows = diesel::sql_query(
155 r#"
156 SELECT DISTINCT strike FROM instruments
157 WHERE underlying = $1 AND expiry = $2
158 ORDER BY strike
159 "#,
160 )
161 .bind::<Text, _>(underlying)
162 .bind::<BigInt, _>(expiry)
163 .get_results::<StrikeRow>(&mut conn)
164 .await
165 .context("Failed to query distinct strikes")?;
166
167 Ok(rows.into_iter().map(|row| row.strike).collect())
168 }
169
170 async fn get_markets_for_underlying(&self, underlying: &str) -> Result<Vec<(String, i64)>> {
171 let mut conn = self.get_conn().await?;
172 let rows = diesel::sql_query(
173 r#"
174 SELECT underlying, expiry FROM markets
175 WHERE underlying = $1
176 ORDER BY expiry
177 "#,
178 )
179 .bind::<Text, _>(underlying)
180 .get_results::<MarketRow>(&mut conn)
181 .await
182 .context("Failed to query markets for underlying")?;
183
184 Ok(rows
185 .into_iter()
186 .map(|row| (row.underlying, row.expiry))
187 .collect())
188 }
189
190 async fn get_instruments_count(&self, underlying: &str, expiry: i64) -> Result<i32> {
191 let mut conn = self.get_conn().await?;
192 let row = diesel::sql_query(
193 r#"
194 SELECT COUNT(*)::int as count FROM instruments
195 WHERE underlying = $1 AND expiry = $2
196 "#,
197 )
198 .bind::<Text, _>(underlying)
199 .bind::<BigInt, _>(expiry)
200 .get_result::<CountRow>(&mut conn)
201 .await
202 .context("Failed to count instruments")?;
203
204 Ok(row.count)
205 }
206
207 async fn market_exists(&self, underlying: &str, expiry: i64) -> Result<bool> {
208 let mut conn = self.get_conn().await?;
209 let rows = diesel::sql_query(
210 r#"
211 SELECT 1 as one FROM markets WHERE underlying = $1 AND expiry = $2
212 "#,
213 )
214 .bind::<Text, _>(underlying)
215 .bind::<BigInt, _>(expiry)
216 .get_results::<ExistsRow>(&mut conn)
217 .await
218 .context("Failed to check market existence")?;
219
220 Ok(!rows.is_empty())
221 }
222
223 async fn instrument_exists(&self, symbol: &str) -> Result<bool> {
224 let mut conn = self.get_conn().await?;
225 let rows = diesel::sql_query(
226 r#"
227 SELECT 1 as one FROM instruments WHERE id = $1
228 "#,
229 )
230 .bind::<Text, _>(symbol)
231 .get_results::<ExistsRow>(&mut conn)
232 .await
233 .context("Failed to check instrument existence")?;
234
235 Ok(!rows.is_empty())
236 }
237}
238
239#[async_trait::async_trait]
244impl hypercall_db::CatalogWriter for DieselDb {
245 async fn upsert_market_catalog_state(
246 &self,
247 state: &hypercall_db::MarketCatalogState,
248 ) -> Result<()> {
249 let mut conn = self.get_conn().await?;
250 diesel::sql_query(
251 r#"
252 INSERT INTO market_catalog_state
253 (underlying, expiry, ref_price_at_listing, listed_at, listing_policy_version,
254 last_extension_at, last_extension_ref_price)
255 VALUES ($1, $2, $3, $4, $5, $6, $7)
256 ON CONFLICT (underlying, expiry)
257 DO UPDATE SET
258 last_extension_at = EXCLUDED.last_extension_at,
259 last_extension_ref_price = EXCLUDED.last_extension_ref_price
260 "#,
261 )
262 .bind::<Text, _>(&state.underlying)
263 .bind::<BigInt, _>(state.expiry)
264 .bind::<Numeric, _>(state.ref_price_at_listing)
265 .bind::<BigInt, _>(state.listed_at)
266 .bind::<Integer, _>(state.listing_policy_version)
267 .bind::<Nullable<BigInt>, _>(state.last_extension_at)
268 .bind::<Nullable<Numeric>, _>(state.last_extension_ref_price)
269 .execute(&mut conn)
270 .await
271 .context("Failed to upsert market_catalog_state")?;
272
273 Ok(())
274 }
275
276 async fn update_trading_mode_for_underlying(
277 &self,
278 underlying: &str,
279 trading_mode: &str,
280 ) -> Result<()> {
281 let mut conn = self.get_conn().await?;
282 diesel::sql_query(
283 "UPDATE instruments SET trading_mode = $1 WHERE underlying = $2 AND trading_mode != $1",
284 )
285 .bind::<Text, _>(trading_mode)
286 .bind::<Text, _>(underlying)
287 .execute(&mut conn)
288 .await
289 .context("Failed to update trading_mode")?;
290
291 Ok(())
292 }
293}
294
295#[derive(Debug, Clone)]
301pub struct CatalogListingState {
302 pub underlying: String,
303 pub expiry: i64,
304 pub ref_price_at_listing: Decimal,
305 pub listed_at: i64,
306 pub listing_policy_version: i32,
307 pub last_extension_at: Option<i64>,
308 pub last_extension_ref_price: Option<Decimal>,
309}
310
311#[derive(QueryableByName, Debug)]
312struct CatalogListingStateRow {
313 #[diesel(sql_type = Text)]
314 pub underlying: String,
315 #[diesel(sql_type = BigInt)]
316 pub expiry: i64,
317 #[diesel(sql_type = Numeric)]
318 pub ref_price_at_listing: Decimal,
319 #[diesel(sql_type = BigInt)]
320 pub listed_at: i64,
321 #[diesel(sql_type = Integer)]
322 pub listing_policy_version: i32,
323 #[diesel(sql_type = Nullable<BigInt>)]
324 pub last_extension_at: Option<i64>,
325 #[diesel(sql_type = Nullable<Numeric>)]
326 pub last_extension_ref_price: Option<Decimal>,
327}
328
329#[derive(QueryableByName, Debug)]
330#[allow(dead_code)]
331struct ExistsRow {
332 #[diesel(sql_type = Integer)]
333 pub one: i32,
334}
335
336impl DieselDb {
337 pub async fn get_catalog_listing_state(
341 &self,
342 underlying: &str,
343 expiry: i64,
344 ) -> Result<Option<CatalogListingState>> {
345 let mut conn = self.get_conn().await?;
346 let rows = diesel::sql_query(
347 r#"
348 SELECT underlying, expiry, ref_price_at_listing, listed_at,
349 listing_policy_version, last_extension_at, last_extension_ref_price
350 FROM market_catalog_state
351 WHERE underlying = $1 AND expiry = $2
352 "#,
353 )
354 .bind::<Text, _>(underlying)
355 .bind::<BigInt, _>(expiry)
356 .get_results::<CatalogListingStateRow>(&mut conn)
357 .await
358 .context("Failed to query market_catalog_state")?;
359
360 Ok(rows.into_iter().next().map(|row| CatalogListingState {
361 underlying: row.underlying,
362 expiry: row.expiry,
363 ref_price_at_listing: row.ref_price_at_listing,
364 listed_at: row.listed_at,
365 listing_policy_version: row.listing_policy_version,
366 last_extension_at: row.last_extension_at,
367 last_extension_ref_price: row.last_extension_ref_price,
368 }))
369 }
370
371 pub async fn insert_catalog_listing_state(
373 &self,
374 underlying: &str,
375 expiry: i64,
376 ref_price: f64,
377 policy_version: i32,
378 ) -> Result<()> {
379 let now_ms = chrono::Utc::now().timestamp_millis();
380 let ref_price_dec = Decimal::from_f64_retain(ref_price)
381 .ok_or_else(|| anyhow::anyhow!("Invalid ref price: {}", ref_price))?;
382
383 let mut conn = self.get_conn().await?;
384 diesel::sql_query(
385 r#"
386 INSERT INTO market_catalog_state
387 (underlying, expiry, ref_price_at_listing, listed_at, listing_policy_version)
388 VALUES ($1, $2, $3, $4, $5)
389 ON CONFLICT (underlying, expiry) DO NOTHING
390 "#,
391 )
392 .bind::<Text, _>(underlying)
393 .bind::<BigInt, _>(expiry)
394 .bind::<Numeric, _>(ref_price_dec)
395 .bind::<BigInt, _>(now_ms)
396 .bind::<Integer, _>(policy_version)
397 .execute(&mut conn)
398 .await
399 .context("Failed to insert market_catalog_state")?;
400
401 tracing::info!(
402 "Inserted catalog state for {}-{} with ref_price={}",
403 underlying,
404 expiry,
405 ref_price
406 );
407
408 Ok(())
409 }
410
411 pub async fn update_extension_state(
413 &self,
414 underlying: &str,
415 expiry: i64,
416 ref_price: f64,
417 ) -> Result<()> {
418 let now_ms = chrono::Utc::now().timestamp_millis();
419 let ref_price_dec = Decimal::from_f64_retain(ref_price)
420 .ok_or_else(|| anyhow::anyhow!("Invalid ref price: {}", ref_price))?;
421
422 let mut conn = self.get_conn().await?;
423 let updated = diesel::sql_query(
424 r#"
425 UPDATE market_catalog_state
426 SET last_extension_at = $3, last_extension_ref_price = $4
427 WHERE underlying = $1 AND expiry = $2
428 "#,
429 )
430 .bind::<Text, _>(underlying)
431 .bind::<BigInt, _>(expiry)
432 .bind::<BigInt, _>(now_ms)
433 .bind::<Numeric, _>(ref_price_dec)
434 .execute(&mut conn)
435 .await
436 .context("Failed to update extension state")?;
437
438 if updated == 0 {
439 anyhow::bail!(
440 "No market_catalog_state row for {}-{}: cannot update extension",
441 underlying,
442 expiry
443 );
444 }
445
446 debug!(
447 "Updated extension state for {}-{} with ref_price={}",
448 underlying, expiry, ref_price
449 );
450
451 Ok(())
452 }
453
454 pub async fn get_existing_strikes(&self, underlying: &str, expiry: i64) -> Result<Vec<f64>> {
456 let mut conn = self.get_conn().await?;
457 let rows = diesel::sql_query(
458 r#"
459 SELECT DISTINCT strike FROM instruments
460 WHERE underlying = $1 AND expiry = $2
461 ORDER BY strike
462 "#,
463 )
464 .bind::<Text, _>(underlying)
465 .bind::<BigInt, _>(expiry)
466 .get_results::<StrikeRow>(&mut conn)
467 .await
468 .context("Failed to query strikes")?;
469
470 rows.into_iter()
471 .map(|row| {
472 row.strike
473 .to_string()
474 .parse::<f64>()
475 .map_err(|e| anyhow::anyhow!("invalid strike value {}: {}", row.strike, e))
476 })
477 .collect::<Result<Vec<f64>>>()
478 }
479
480 pub async fn update_trading_mode_count(
483 &self,
484 underlying: &str,
485 mode_str: &str,
486 ) -> Result<usize> {
487 let mut conn = self.get_conn().await?;
488 let result = diesel::sql_query(
489 "UPDATE instruments SET trading_mode = $1 WHERE underlying = $2 AND trading_mode != $1",
490 )
491 .bind::<Text, _>(mode_str)
492 .bind::<Text, _>(underlying)
493 .execute(&mut conn)
494 .await
495 .context("Failed to update trading_mode")?;
496
497 Ok(result)
498 }
499
500 pub async fn try_acquire_advisory_lock_on_conn(
505 &self,
506 conn: &mut diesel_async::AsyncPgConnection,
507 key: i64,
508 ) -> Result<bool> {
509 let result = diesel::sql_query("SELECT pg_try_advisory_lock($1)")
510 .bind::<BigInt, _>(key)
511 .get_result::<AdvisoryLockResult>(conn)
512 .await
513 .context("Failed to try advisory lock")?;
514
515 if result.pg_try_advisory_lock {
516 debug!("Acquired advisory lock {}", key);
517 } else {
518 debug!("Advisory lock {} held by another session", key);
519 }
520
521 Ok(result.pg_try_advisory_lock)
522 }
523
524 pub async fn release_advisory_lock_on_conn(
526 &self,
527 conn: &mut diesel_async::AsyncPgConnection,
528 key: i64,
529 ) -> Result<()> {
530 let result = diesel::sql_query("SELECT pg_advisory_unlock($1)")
531 .bind::<BigInt, _>(key)
532 .get_result::<AdvisoryUnlockResult>(conn)
533 .await
534 .context("Failed to release advisory lock")?;
535
536 if result.pg_advisory_unlock {
537 debug!("Released advisory lock {}", key);
538 } else {
539 debug!("Advisory lock {} was not held on this connection", key);
540 }
541
542 Ok(())
543 }
544}
545
#[cfg(test)]
mod tests {
    use crate::test_helpers::TestDb;
    use hypercall_db::*;
    use hypercall_types::api_models::InstrumentStatus;
    use hypercall_types::OptionType;
    use rust_decimal_macros::dec;

    /// Baseline fixture shared by the tests: the active BTC 100000 call
    /// expiring 20260131 in `orderbook` mode. Variants are built from it with
    /// struct-update syntax so each test spells out only what differs.
    fn btc_call_100k() -> InstrumentRecord {
        InstrumentRecord {
            instrument_numeric_id: 0,
            id: "BTC-20260131-100000-C".to_string(),
            underlying: "BTC".to_string(),
            strike: dec!(100000),
            expiry: 20260131,
            option_type: OptionType::Call,
            option_token_address: None,
            status: InstrumentStatus::Active,
            trading_mode: "orderbook".to_string(),
        }
    }

    #[tokio::test]
    async fn get_instruments_for_settlement_returns_matching() {
        let test_db = TestDb::new().await.unwrap();
        let db = test_db.diesel_db().await;
        let handler = test_db.handler.as_ref();

        let call = btc_call_100k();
        handler
            .save_market_and_instrument_sync("BTC", 20260131, &call)
            .unwrap();

        let put = InstrumentRecord {
            id: "BTC-20260131-90000-P".to_string(),
            strike: dec!(90000),
            option_type: OptionType::Put,
            ..btc_call_100k()
        };
        handler
            .save_market_and_instrument_sync("BTC", 20260131, &put)
            .unwrap();

        // Same expiry, different underlying: must not show up in the BTC result.
        let eth = InstrumentRecord {
            id: "ETH-20260131-4000-C".to_string(),
            underlying: "ETH".to_string(),
            strike: dec!(4000),
            ..btc_call_100k()
        };
        handler
            .save_market_and_instrument_sync("ETH", 20260131, &eth)
            .unwrap();

        let instruments = db
            .get_instruments_for_settlement("BTC", 20260131)
            .await
            .unwrap();
        assert_eq!(instruments.len(), 2);
        assert!(instruments.iter().all(|i| i.underlying == "BTC"));
        assert!(instruments.iter().all(|i| i.expiry == 20260131));
    }

    #[tokio::test]
    async fn get_distinct_strikes_returns_sorted_unique() {
        let test_db = TestDb::new().await.unwrap();
        let db = test_db.diesel_db().await;
        let handler = test_db.handler.as_ref();

        // 100000 appears twice (call and put) to exercise DISTINCT.
        for (strike_val, opt_type, opt_str) in [
            (dec!(100000), OptionType::Call, "C"),
            (dec!(100000), OptionType::Put, "P"),
            (dec!(90000), OptionType::Put, "P"),
            (dec!(110000), OptionType::Call, "C"),
        ] {
            let inst = InstrumentRecord {
                id: format!("BTC-20260131-{}-{}", strike_val, opt_str),
                strike: strike_val,
                option_type: opt_type,
                ..btc_call_100k()
            };
            handler
                .save_market_and_instrument_sync("BTC", 20260131, &inst)
                .unwrap();
        }

        let strikes = db.get_distinct_strikes("BTC", 20260131).await.unwrap();
        assert_eq!(strikes.len(), 3);
        assert_eq!(strikes[0], dec!(90000));
        assert_eq!(strikes[1], dec!(100000));
        assert_eq!(strikes[2], dec!(110000));
    }

    #[tokio::test]
    async fn get_markets_for_underlying_returns_ordered() {
        let test_db = TestDb::new().await.unwrap();
        let db = test_db.diesel_db().await;
        let handler = test_db.handler.as_ref();

        let inst1 = btc_call_100k();
        handler
            .save_market_and_instrument_sync("BTC", 20260131, &inst1)
            .unwrap();

        let inst2 = InstrumentRecord {
            id: "BTC-20260228-100000-C".to_string(),
            expiry: 20260228,
            ..btc_call_100k()
        };
        handler
            .save_market_and_instrument_sync("BTC", 20260228, &inst2)
            .unwrap();

        let markets = db.get_markets_for_underlying("BTC").await.unwrap();
        assert_eq!(markets.len(), 2);
        assert_eq!(markets[0].1, 20260131);
        assert_eq!(markets[1].1, 20260228);
        assert!(markets.iter().all(|(u, _)| u == "BTC"));
    }

    #[tokio::test]
    async fn get_instruments_count_correct() {
        let test_db = TestDb::new().await.unwrap();
        let db = test_db.diesel_db().await;
        let handler = test_db.handler.as_ref();

        // Nothing saved yet.
        let count = db.get_instruments_count("BTC", 20260131).await.unwrap();
        assert_eq!(count, 0);

        for (strike, opt) in [(dec!(100000), "C"), (dec!(90000), "P")] {
            let inst = InstrumentRecord {
                id: format!("BTC-20260131-{}-{}", strike, opt),
                strike,
                option_type: if opt == "C" {
                    OptionType::Call
                } else {
                    OptionType::Put
                },
                ..btc_call_100k()
            };
            handler
                .save_market_and_instrument_sync("BTC", 20260131, &inst)
                .unwrap();
        }

        let count = db.get_instruments_count("BTC", 20260131).await.unwrap();
        assert_eq!(count, 2);

        // A different underlying must not be counted.
        let count_eth = db.get_instruments_count("ETH", 20260131).await.unwrap();
        assert_eq!(count_eth, 0);
    }

    #[tokio::test]
    async fn update_trading_mode_changes_instruments() {
        let test_db = TestDb::new().await.unwrap();
        let db = test_db.diesel_db().await;
        let handler = test_db.handler.as_ref();

        let inst = btc_call_100k();
        handler
            .save_market_and_instrument_sync("BTC", 20260131, &inst)
            .unwrap();

        db.update_trading_mode_count("BTC", "rfq").await.unwrap();

        // Read the column back through the synchronous pool to verify the write.
        let mut conn = test_db.handler.pool().get().unwrap();
        #[derive(diesel::QueryableByName)]
        struct ModeRow {
            #[diesel(sql_type = diesel::sql_types::Text)]
            trading_mode: String,
        }
        let rows: Vec<ModeRow> = diesel::RunQueryDsl::load(
            diesel::sql_query(
                "SELECT trading_mode FROM instruments WHERE id = 'BTC-20260131-100000-C'",
            ),
            &mut conn,
        )
        .unwrap();
        assert_eq!(rows[0].trading_mode, "rfq");
    }

    #[tokio::test]
    async fn catalog_listing_state_roundtrip() {
        let test_db = TestDb::new().await.unwrap();
        let db = test_db.diesel_db().await;
        let handler = test_db.handler.as_ref();

        let inst = btc_call_100k();
        handler
            .save_market_and_instrument_sync("BTC", 20260131, &inst)
            .unwrap();

        db.insert_catalog_listing_state("BTC", 20260131, 100000.0, 1)
            .await
            .unwrap();

        let state = db.get_catalog_listing_state("BTC", 20260131).await.unwrap();
        assert!(state.is_some());
        let state = state.unwrap();
        assert_eq!(state.underlying, "BTC");
        assert_eq!(state.expiry, 20260131);
        assert_eq!(state.listing_policy_version, 1);
        assert!(state.last_extension_at.is_none());
    }

    #[tokio::test]
    async fn update_extension_state_works() {
        let test_db = TestDb::new().await.unwrap();
        let db = test_db.diesel_db().await;
        let handler = test_db.handler.as_ref();

        let inst = btc_call_100k();
        handler
            .save_market_and_instrument_sync("BTC", 20260131, &inst)
            .unwrap();

        db.insert_catalog_listing_state("BTC", 20260131, 100000.0, 1)
            .await
            .unwrap();

        db.update_extension_state("BTC", 20260131, 105000.0)
            .await
            .unwrap();

        let state = db
            .get_catalog_listing_state("BTC", 20260131)
            .await
            .unwrap()
            .unwrap();
        assert!(state.last_extension_at.is_some());
        assert!(state.last_extension_ref_price.is_some());
    }

    #[tokio::test]
    async fn market_exists_checks() {
        let test_db = TestDb::new().await.unwrap();
        let db = test_db.diesel_db().await;
        let handler = test_db.handler.as_ref();

        // No market before anything is saved.
        assert!(!db.market_exists("BTC", 20260131).await.unwrap());

        let inst = btc_call_100k();
        handler
            .save_market_and_instrument_sync("BTC", 20260131, &inst)
            .unwrap();

        assert!(db.market_exists("BTC", 20260131).await.unwrap());
        assert!(!db.market_exists("ETH", 20260131).await.unwrap());
    }
}