Skip to main content

hypercall_db_diesel/
instruments.rs

//! `InstrumentReader` and `InstrumentWriter` implementations for `DatabaseHandler`.
//!
//! Handles instrument CRUD and derives on-chain option token addresses
//! via `current_option_token_deployment()` on insert.
5
6use anyhow::Result;
7use diesel::RunQueryDsl;
8use rust_decimal::Decimal;
9
10use hypercall_types::WalletAddress;
11
12use crate::database_handler::{current_option_token_deployment, DatabaseHandler};
13
14impl hypercall_db::InstrumentReader for DatabaseHandler {
15    fn get_all_instruments_sync(&self) -> Result<Vec<hypercall_db::InstrumentRecord>> {
16        use diesel::sql_types::{BigInt, Integer, Numeric, Text};
17
18        #[derive(diesel::QueryableByName)]
19        struct InstrumentRow {
20            #[diesel(sql_type = Integer)]
21            instrument_numeric_id: i32,
22            #[diesel(sql_type = Text)]
23            id: String,
24            #[diesel(sql_type = Text)]
25            underlying: String,
26            #[diesel(sql_type = Numeric)]
27            strike: Decimal,
28            #[diesel(sql_type = BigInt)]
29            expiry: i64,
30            #[diesel(sql_type = Text)]
31            option_type: String,
32            #[diesel(sql_type = diesel::sql_types::Nullable<diesel::sql_types::Bytea>)]
33            option_token_address: Option<WalletAddress>,
34            #[diesel(sql_type = Text)]
35            status: String,
36            #[diesel(sql_type = Text)]
37            trading_mode: String,
38        }
39
40        let mut conn = self.pool().get()?;
41        let rows = diesel::sql_query(
42            "SELECT \
43                instrument_numeric_id, \
44                id, \
45                underlying, \
46                strike, \
47                expiry::bigint AS expiry, \
48                option_type, \
49                option_token_address, \
50                status, \
51                trading_mode \
52             FROM instruments \
53             ORDER BY id ASC",
54        )
55        .load::<InstrumentRow>(&mut conn)?;
56
57        Ok(rows
58            .into_iter()
59            .map(|row| {
60                let option_type = row.option_type.parse().unwrap_or_else(|_| {
61                    panic!(
62                        "invalid option_type '{}' in instruments table",
63                        row.option_type
64                    )
65                });
66                let status =
67                    hypercall_types::api_models::InstrumentStatus::from_db_str(&row.status)
68                        .unwrap_or_else(|| {
69                            panic!("invalid status '{}' in instruments table", row.status)
70                        });
71                hypercall_db::InstrumentRecord {
72                    instrument_numeric_id: row.instrument_numeric_id,
73                    id: row.id,
74                    underlying: row.underlying,
75                    strike: row.strike,
76                    expiry: row.expiry,
77                    option_type,
78                    option_token_address: row.option_token_address,
79                    status,
80                    trading_mode: row.trading_mode,
81                }
82            })
83            .collect())
84    }
85
86    fn get_instruments_by_status_sync(
87        &self,
88        status: &str,
89    ) -> Result<Vec<hypercall_db::InstrumentRecord>> {
90        use diesel::sql_types::{BigInt, Integer, Numeric, Text};
91
92        #[derive(diesel::QueryableByName)]
93        struct InstrumentRow {
94            #[diesel(sql_type = Integer)]
95            instrument_numeric_id: i32,
96            #[diesel(sql_type = Text)]
97            id: String,
98            #[diesel(sql_type = Text)]
99            underlying: String,
100            #[diesel(sql_type = Numeric)]
101            strike: Decimal,
102            #[diesel(sql_type = BigInt)]
103            expiry: i64,
104            #[diesel(sql_type = Text)]
105            option_type: String,
106            #[diesel(sql_type = diesel::sql_types::Nullable<diesel::sql_types::Bytea>)]
107            option_token_address: Option<WalletAddress>,
108            #[diesel(sql_type = Text)]
109            status: String,
110            #[diesel(sql_type = Text)]
111            trading_mode: String,
112        }
113
114        let mut conn = self.pool().get()?;
115        let rows = diesel::sql_query(
116            "SELECT \
117                instrument_numeric_id, \
118                id, \
119                underlying, \
120                strike, \
121                expiry::bigint AS expiry, \
122                option_type, \
123                option_token_address, \
124                status, \
125                trading_mode \
126             FROM instruments \
127             WHERE status = $1 \
128             ORDER BY expiry ASC, id ASC",
129        )
130        .bind::<diesel::sql_types::Text, _>(status)
131        .load::<InstrumentRow>(&mut conn)?;
132
133        Ok(rows
134            .into_iter()
135            .map(|row| {
136                let option_type = row.option_type.parse().unwrap_or_else(|_| {
137                    panic!(
138                        "invalid option_type '{}' in instruments table",
139                        row.option_type
140                    )
141                });
142                let status =
143                    hypercall_types::api_models::InstrumentStatus::from_db_str(&row.status)
144                        .unwrap_or_else(|| {
145                            panic!("invalid status '{}' in instruments table", row.status)
146                        });
147                hypercall_db::InstrumentRecord {
148                    instrument_numeric_id: row.instrument_numeric_id,
149                    id: row.id,
150                    underlying: row.underlying,
151                    strike: row.strike,
152                    expiry: row.expiry,
153                    option_type,
154                    option_token_address: row.option_token_address,
155                    status,
156                    trading_mode: row.trading_mode,
157                }
158            })
159            .collect())
160    }
161
162    fn get_instrument_status_counts_sync(&self) -> Result<Vec<(String, i64)>> {
163        use diesel::sql_types::{BigInt, Text};
164
165        #[derive(diesel::QueryableByName)]
166        struct StatusCount {
167            #[diesel(sql_type = Text)]
168            status: String,
169            #[diesel(sql_type = BigInt)]
170            count: i64,
171        }
172
173        let mut conn = self.pool().get()?;
174        let rows = diesel::sql_query(
175            "SELECT status, COUNT(*)::bigint AS count \
176             FROM instruments \
177             GROUP BY status",
178        )
179        .load::<StatusCount>(&mut conn)?;
180
181        Ok(rows.into_iter().map(|r| (r.status, r.count)).collect())
182    }
183
184    fn get_markets_expiring_within_sync(&self, seconds: i64) -> Result<i64> {
185        use diesel::sql_types::BigInt;
186
187        let now = chrono::Utc::now();
188        let deadline = now + chrono::Duration::seconds(seconds);
189        let deadline_yyyymmdd = deadline.format("%Y%m%d").to_string().parse::<i64>()?;
190        let now_yyyymmdd = now.format("%Y%m%d").to_string().parse::<i64>()?;
191
192        #[derive(diesel::QueryableByName)]
193        struct CountResult {
194            #[diesel(sql_type = BigInt)]
195            count: i64,
196        }
197
198        let mut conn = self.pool().get()?;
199        let result = diesel::sql_query(
200            "SELECT COUNT(*)::bigint AS count \
201             FROM instruments \
202             WHERE expiry > $1 AND expiry <= $2 AND status = 'ACTIVE'",
203        )
204        .bind::<BigInt, _>(now_yyyymmdd)
205        .bind::<BigInt, _>(deadline_yyyymmdd)
206        .get_result::<CountResult>(&mut conn)?;
207
208        Ok(result.count)
209    }
210}
211
212impl hypercall_db::InstrumentWriter for DatabaseHandler {
213    fn save_market_and_instrument_sync(
214        &self,
215        underlying: &str,
216        expiry: i64,
217        instrument: &hypercall_db::InstrumentRecord,
218    ) -> Result<()> {
219        use crate::models::{Instrument, Market};
220        use diesel::prelude::*;
221
222        let market = Market {
223            underlying: underlying.to_string(),
224            expiry,
225        };
226        let diesel_instrument = Instrument {
227            instrument_numeric_id: instrument.instrument_numeric_id,
228            id: instrument.id.clone(),
229            underlying: instrument.underlying.clone(),
230            strike: instrument.strike,
231            expiry: instrument.expiry,
232            option_type: instrument.option_type.to_string().to_lowercase(),
233            option_token_address: instrument.option_token_address,
234            status: match instrument.status {
235                hypercall_types::api_models::InstrumentStatus::Active => "ACTIVE",
236                hypercall_types::api_models::InstrumentStatus::ExpiredPendingPrice => {
237                    "EXPIRED_PENDING_PRICE"
238                }
239                hypercall_types::api_models::InstrumentStatus::Settled => "SETTLED",
240            }
241            .to_string(),
242            trading_mode: instrument.trading_mode.clone(),
243        };
244
245        let instrument_strike = diesel_instrument.strike;
246        let instrument_expiry_yyyymmdd =
247            u64::try_from(diesel_instrument.expiry).map_err(|_| {
248                anyhow::anyhow!(
249                    "Invalid instrument expiry {}: cannot derive option token address from negative YYYYMMDD",
250                    diesel_instrument.expiry
251                )
252            })?;
253        let option_token_address = Some(hypercall_types::derive_option_token_address(
254            current_option_token_deployment()?,
255            &diesel_instrument.underlying,
256            instrument_expiry_yyyymmdd,
257            instrument_strike,
258            &diesel_instrument.option_type,
259        )?);
260
261        let mut conn = self.pool().get()?;
262        conn.transaction::<_, anyhow::Error, _>(|conn| {
263            diesel::sql_query(
264                "INSERT INTO markets (underlying, expiry) VALUES ($1, $2) ON CONFLICT (underlying, expiry) DO NOTHING"
265            )
266            .bind::<diesel::sql_types::Text, _>(&market.underlying)
267            .bind::<diesel::sql_types::BigInt, _>(market.expiry)
268            .execute(conn)?;
269
270            match diesel::sql_query(
271                "INSERT INTO instruments (id, underlying, strike, expiry, option_type, option_token_address, trading_mode) VALUES ($1, $2, $3, $4, $5, $6, $7) ON CONFLICT (id) DO NOTHING"
272            )
273            .bind::<diesel::sql_types::Text, _>(&diesel_instrument.id)
274            .bind::<diesel::sql_types::Text, _>(&diesel_instrument.underlying)
275            .bind::<diesel::sql_types::Numeric, _>(instrument_strike)
276            .bind::<diesel::sql_types::BigInt, _>(diesel_instrument.expiry)
277            .bind::<diesel::sql_types::Text, _>(&diesel_instrument.option_type)
278            .bind::<diesel::sql_types::Nullable<diesel::sql_types::Bytea>, _>(option_token_address)
279            .bind::<diesel::sql_types::Text, _>(&diesel_instrument.trading_mode)
280            .execute(conn) {
281                Ok(_rows) => {}
282                Err(err) => {
283                    DatabaseHandler::observe_diesel_option_token_violation(&err);
284                    return Err(err.into());
285                }
286            };
287
288            Ok(())
289        })
290    }
291
292    fn delete_market_and_instrument_sync(&self, symbol: &str) -> Result<()> {
293        use diesel::prelude::*;
294
295        #[derive(diesel::QueryableByName)]
296        struct MarketKey {
297            #[diesel(sql_type = diesel::sql_types::Text)]
298            underlying: String,
299            #[diesel(sql_type = diesel::sql_types::BigInt)]
300            expiry: i64,
301        }
302
303        let mut conn = self.pool().get()?;
304
305        conn.transaction::<_, anyhow::Error, _>(|conn| {
306            let key: MarketKey = diesel::sql_query(
307                "SELECT underlying, expiry
308                 FROM instruments
309                 WHERE id = $1",
310            )
311            .bind::<diesel::sql_types::Text, _>(symbol)
312            .get_result(conn)
313            .optional()?
314            .ok_or_else(|| {
315                anyhow::anyhow!("Market instrument does not exist for symbol: {}", symbol)
316            })?;
317
318            let deleted_instruments = diesel::sql_query(
319                "DELETE FROM instruments
320                 WHERE id = $1",
321            )
322            .bind::<diesel::sql_types::Text, _>(symbol)
323            .execute(conn)?;
324
325            if deleted_instruments != 1 {
326                return Err(anyhow::anyhow!(
327                    "Expected to delete exactly one instrument for symbol {}, deleted {}",
328                    symbol,
329                    deleted_instruments
330                ));
331            }
332
333            diesel::sql_query(
334                "DELETE FROM markets
335                 WHERE underlying = $1
336                   AND expiry = $2
337                   AND NOT EXISTS (
338                       SELECT 1
339                       FROM instruments
340                       WHERE underlying = $1
341                         AND expiry = $2
342                   )",
343            )
344            .bind::<diesel::sql_types::Text, _>(&key.underlying)
345            .bind::<diesel::sql_types::BigInt, _>(key.expiry)
346            .execute(conn)?;
347
348            Ok(())
349        })
350    }
351
352    fn update_instrument_status_sync(&self, symbols: &[String], status: &str) -> Result<usize> {
353        let mut conn = self.pool().get()?;
354
355        let updated = diesel::sql_query("UPDATE instruments SET status = $1 WHERE id = ANY($2)")
356            .bind::<diesel::sql_types::Text, _>(status)
357            .bind::<diesel::sql_types::Array<diesel::sql_types::Text>, _>(symbols)
358            .execute(&mut conn)?;
359
360        tracing::info!("Updated {} instruments to status '{}'", updated, status);
361
362        Ok(updated)
363    }
364
365    fn transition_active_instruments_to_expired_pending_sync(
366        &self,
367        symbols: &[String],
368    ) -> Result<usize> {
369        let mut conn = self.pool().get()?;
370
371        let updated = diesel::sql_query(
372            "UPDATE instruments
373             SET status = 'EXPIRED_PENDING_PRICE'
374             WHERE id = ANY($1)
375               AND status = 'ACTIVE'",
376        )
377        .bind::<diesel::sql_types::Array<diesel::sql_types::Text>, _>(symbols)
378        .execute(&mut conn)?;
379
380        tracing::info!(
381            "Transitioned {} ACTIVE instruments to EXPIRED_PENDING_PRICE",
382            updated
383        );
384
385        Ok(updated)
386    }
387}
388
#[cfg(test)]
mod tests {
    use crate::test_helpers::TestDb;
    use hypercall_db::*;
    use rust_decimal_macros::dec;

    /// Expiry (YYYYMMDD) shared by every fixture in this module.
    const EXPIRY: i64 = 20260131;

    /// Builds an `Active`, orderbook-mode instrument record with no token
    /// address set, expiring on [`EXPIRY`].
    fn active_instrument(
        instrument_numeric_id: i32,
        symbol: &str,
        underlying: &str,
        strike: rust_decimal::Decimal,
        option_type: hypercall_types::enums::OptionType,
    ) -> InstrumentRecord {
        InstrumentRecord {
            instrument_numeric_id,
            id: symbol.to_string(),
            underlying: underlying.to_string(),
            strike,
            expiry: EXPIRY,
            option_type,
            option_token_address: None,
            status: hypercall_types::api_models::InstrumentStatus::Active,
            trading_mode: "orderbook".to_string(),
        }
    }

    /// A saved instrument can be read back with its id and strike intact.
    #[tokio::test]
    async fn instrument_write_read_roundtrip() {
        let test_db = TestDb::new().await.unwrap();
        let db = test_db.handler.as_ref();

        let record = active_instrument(
            1,
            "BTC-20260131-100000-C",
            "BTC",
            dec!(100000),
            hypercall_types::enums::OptionType::Call,
        );
        db.save_market_and_instrument_sync("BTC", EXPIRY, &record)
            .unwrap();

        let loaded = db.get_all_instruments_sync().unwrap();
        assert_eq!(loaded.len(), 1);
        assert_eq!(loaded[0].id, "BTC-20260131-100000-C");
        assert_eq!(loaded[0].strike, dec!(100000));
    }

    /// Two freshly saved instruments are both counted under `ACTIVE`.
    #[tokio::test]
    async fn instrument_status_counts() {
        let test_db = TestDb::new().await.unwrap();
        let db = test_db.handler.as_ref();

        for (numeric_id, symbol, strike) in [
            (1, "BTC-20260131-100000-C", dec!(100000)),
            (2, "BTC-20260131-110000-C", dec!(110000)),
        ] {
            let record = active_instrument(
                numeric_id,
                symbol,
                "BTC",
                strike,
                hypercall_types::enums::OptionType::Call,
            );
            db.save_market_and_instrument_sync("BTC", EXPIRY, &record)
                .unwrap();
        }

        let counts = db.get_instrument_status_counts_sync().unwrap();
        let active_count = counts
            .iter()
            .find(|(status, _)| status == "ACTIVE")
            .map(|(_, count)| *count);
        assert_eq!(active_count, Some(2));
    }

    /// A status update is reflected by the by-status query.
    #[tokio::test]
    async fn instrument_update_status() {
        let test_db = TestDb::new().await.unwrap();
        let db = test_db.handler.as_ref();

        let record = active_instrument(
            1,
            "ETH-20260131-4000-P",
            "ETH",
            dec!(4000),
            hypercall_types::enums::OptionType::Put,
        );
        db.save_market_and_instrument_sync("ETH", EXPIRY, &record)
            .unwrap();

        let updated = db
            .update_instrument_status_sync(
                &["ETH-20260131-4000-P".to_string()],
                "EXPIRED_PENDING_PRICE",
            )
            .unwrap();
        assert_eq!(updated, 1);

        let loaded = db
            .get_instruments_by_status_sync("EXPIRED_PENDING_PRICE")
            .unwrap();
        assert_eq!(loaded.len(), 1);
        assert_eq!(loaded[0].id, "ETH-20260131-4000-P");
    }

    /// The ACTIVE -> EXPIRED_PENDING_PRICE transition skips instruments that
    /// have already been settled.
    #[tokio::test]
    async fn instrument_active_to_expired_pending_transition_does_not_downgrade_settled() {
        let test_db = TestDb::new().await.unwrap();
        let db = test_db.handler.as_ref();

        let active_symbol = "ETH-20260131-4000-C".to_string();
        let settled_symbol = "ETH-20260131-4100-C".to_string();
        for (numeric_id, symbol, strike) in [
            (1, active_symbol.as_str(), dec!(4000)),
            (2, settled_symbol.as_str(), dec!(4100)),
        ] {
            let record = active_instrument(
                numeric_id,
                symbol,
                "ETH",
                strike,
                hypercall_types::enums::OptionType::Call,
            );
            db.save_market_and_instrument_sync("ETH", EXPIRY, &record)
                .unwrap();
        }

        db.update_instrument_status_sync(std::slice::from_ref(&settled_symbol), "SETTLED")
            .unwrap();

        let updated = db
            .transition_active_instruments_to_expired_pending_sync(&[
                active_symbol.clone(),
                settled_symbol.clone(),
            ])
            .unwrap();
        assert_eq!(updated, 1);

        let pending = db
            .get_instruments_by_status_sync("EXPIRED_PENDING_PRICE")
            .unwrap();
        assert_eq!(pending.len(), 1);
        assert_eq!(pending[0].id, active_symbol);

        let settled = db.get_instruments_by_status_sync("SETTLED").unwrap();
        assert_eq!(settled.len(), 1);
        assert_eq!(settled[0].id, settled_symbol);
    }

    /// Deleting the only instrument leaves the table empty.
    #[tokio::test]
    async fn instrument_delete() {
        let test_db = TestDb::new().await.unwrap();
        let db = test_db.handler.as_ref();

        let record = active_instrument(
            1,
            "BTC-20260131-100000-C",
            "BTC",
            dec!(100000),
            hypercall_types::enums::OptionType::Call,
        );
        db.save_market_and_instrument_sync("BTC", EXPIRY, &record)
            .unwrap();
        db.delete_market_and_instrument_sync("BTC-20260131-100000-C")
            .unwrap();

        let loaded = db.get_all_instruments_sync().unwrap();
        assert!(loaded.is_empty());
    }
}