1use anyhow::Result;
7use diesel::RunQueryDsl;
8use rust_decimal::Decimal;
9
10use hypercall_types::WalletAddress;
11
12use crate::database_handler::{current_option_token_deployment, DatabaseHandler};
13
14impl hypercall_db::InstrumentReader for DatabaseHandler {
15 fn get_all_instruments_sync(&self) -> Result<Vec<hypercall_db::InstrumentRecord>> {
16 use diesel::sql_types::{BigInt, Integer, Numeric, Text};
17
18 #[derive(diesel::QueryableByName)]
19 struct InstrumentRow {
20 #[diesel(sql_type = Integer)]
21 instrument_numeric_id: i32,
22 #[diesel(sql_type = Text)]
23 id: String,
24 #[diesel(sql_type = Text)]
25 underlying: String,
26 #[diesel(sql_type = Numeric)]
27 strike: Decimal,
28 #[diesel(sql_type = BigInt)]
29 expiry: i64,
30 #[diesel(sql_type = Text)]
31 option_type: String,
32 #[diesel(sql_type = diesel::sql_types::Nullable<diesel::sql_types::Bytea>)]
33 option_token_address: Option<WalletAddress>,
34 #[diesel(sql_type = Text)]
35 status: String,
36 #[diesel(sql_type = Text)]
37 trading_mode: String,
38 }
39
40 let mut conn = self.pool().get()?;
41 let rows = diesel::sql_query(
42 "SELECT \
43 instrument_numeric_id, \
44 id, \
45 underlying, \
46 strike, \
47 expiry::bigint AS expiry, \
48 option_type, \
49 option_token_address, \
50 status, \
51 trading_mode \
52 FROM instruments \
53 ORDER BY id ASC",
54 )
55 .load::<InstrumentRow>(&mut conn)?;
56
57 Ok(rows
58 .into_iter()
59 .map(|row| {
60 let option_type = row.option_type.parse().unwrap_or_else(|_| {
61 panic!(
62 "invalid option_type '{}' in instruments table",
63 row.option_type
64 )
65 });
66 let status =
67 hypercall_types::api_models::InstrumentStatus::from_db_str(&row.status)
68 .unwrap_or_else(|| {
69 panic!("invalid status '{}' in instruments table", row.status)
70 });
71 hypercall_db::InstrumentRecord {
72 instrument_numeric_id: row.instrument_numeric_id,
73 id: row.id,
74 underlying: row.underlying,
75 strike: row.strike,
76 expiry: row.expiry,
77 option_type,
78 option_token_address: row.option_token_address,
79 status,
80 trading_mode: row.trading_mode,
81 }
82 })
83 .collect())
84 }
85
86 fn get_instruments_by_status_sync(
87 &self,
88 status: &str,
89 ) -> Result<Vec<hypercall_db::InstrumentRecord>> {
90 use diesel::sql_types::{BigInt, Integer, Numeric, Text};
91
92 #[derive(diesel::QueryableByName)]
93 struct InstrumentRow {
94 #[diesel(sql_type = Integer)]
95 instrument_numeric_id: i32,
96 #[diesel(sql_type = Text)]
97 id: String,
98 #[diesel(sql_type = Text)]
99 underlying: String,
100 #[diesel(sql_type = Numeric)]
101 strike: Decimal,
102 #[diesel(sql_type = BigInt)]
103 expiry: i64,
104 #[diesel(sql_type = Text)]
105 option_type: String,
106 #[diesel(sql_type = diesel::sql_types::Nullable<diesel::sql_types::Bytea>)]
107 option_token_address: Option<WalletAddress>,
108 #[diesel(sql_type = Text)]
109 status: String,
110 #[diesel(sql_type = Text)]
111 trading_mode: String,
112 }
113
114 let mut conn = self.pool().get()?;
115 let rows = diesel::sql_query(
116 "SELECT \
117 instrument_numeric_id, \
118 id, \
119 underlying, \
120 strike, \
121 expiry::bigint AS expiry, \
122 option_type, \
123 option_token_address, \
124 status, \
125 trading_mode \
126 FROM instruments \
127 WHERE status = $1 \
128 ORDER BY expiry ASC, id ASC",
129 )
130 .bind::<diesel::sql_types::Text, _>(status)
131 .load::<InstrumentRow>(&mut conn)?;
132
133 Ok(rows
134 .into_iter()
135 .map(|row| {
136 let option_type = row.option_type.parse().unwrap_or_else(|_| {
137 panic!(
138 "invalid option_type '{}' in instruments table",
139 row.option_type
140 )
141 });
142 let status =
143 hypercall_types::api_models::InstrumentStatus::from_db_str(&row.status)
144 .unwrap_or_else(|| {
145 panic!("invalid status '{}' in instruments table", row.status)
146 });
147 hypercall_db::InstrumentRecord {
148 instrument_numeric_id: row.instrument_numeric_id,
149 id: row.id,
150 underlying: row.underlying,
151 strike: row.strike,
152 expiry: row.expiry,
153 option_type,
154 option_token_address: row.option_token_address,
155 status,
156 trading_mode: row.trading_mode,
157 }
158 })
159 .collect())
160 }
161
162 fn get_instrument_status_counts_sync(&self) -> Result<Vec<(String, i64)>> {
163 use diesel::sql_types::{BigInt, Text};
164
165 #[derive(diesel::QueryableByName)]
166 struct StatusCount {
167 #[diesel(sql_type = Text)]
168 status: String,
169 #[diesel(sql_type = BigInt)]
170 count: i64,
171 }
172
173 let mut conn = self.pool().get()?;
174 let rows = diesel::sql_query(
175 "SELECT status, COUNT(*)::bigint AS count \
176 FROM instruments \
177 GROUP BY status",
178 )
179 .load::<StatusCount>(&mut conn)?;
180
181 Ok(rows.into_iter().map(|r| (r.status, r.count)).collect())
182 }
183
184 fn get_markets_expiring_within_sync(&self, seconds: i64) -> Result<i64> {
185 use diesel::sql_types::BigInt;
186
187 let now = chrono::Utc::now();
188 let deadline = now + chrono::Duration::seconds(seconds);
189 let deadline_yyyymmdd = deadline.format("%Y%m%d").to_string().parse::<i64>()?;
190 let now_yyyymmdd = now.format("%Y%m%d").to_string().parse::<i64>()?;
191
192 #[derive(diesel::QueryableByName)]
193 struct CountResult {
194 #[diesel(sql_type = BigInt)]
195 count: i64,
196 }
197
198 let mut conn = self.pool().get()?;
199 let result = diesel::sql_query(
200 "SELECT COUNT(*)::bigint AS count \
201 FROM instruments \
202 WHERE expiry > $1 AND expiry <= $2 AND status = 'ACTIVE'",
203 )
204 .bind::<BigInt, _>(now_yyyymmdd)
205 .bind::<BigInt, _>(deadline_yyyymmdd)
206 .get_result::<CountResult>(&mut conn)?;
207
208 Ok(result.count)
209 }
210}
211
212impl hypercall_db::InstrumentWriter for DatabaseHandler {
213 fn save_market_and_instrument_sync(
214 &self,
215 underlying: &str,
216 expiry: i64,
217 instrument: &hypercall_db::InstrumentRecord,
218 ) -> Result<()> {
219 use crate::models::{Instrument, Market};
220 use diesel::prelude::*;
221
222 let market = Market {
223 underlying: underlying.to_string(),
224 expiry,
225 };
226 let diesel_instrument = Instrument {
227 instrument_numeric_id: instrument.instrument_numeric_id,
228 id: instrument.id.clone(),
229 underlying: instrument.underlying.clone(),
230 strike: instrument.strike,
231 expiry: instrument.expiry,
232 option_type: instrument.option_type.to_string().to_lowercase(),
233 option_token_address: instrument.option_token_address,
234 status: match instrument.status {
235 hypercall_types::api_models::InstrumentStatus::Active => "ACTIVE",
236 hypercall_types::api_models::InstrumentStatus::ExpiredPendingPrice => {
237 "EXPIRED_PENDING_PRICE"
238 }
239 hypercall_types::api_models::InstrumentStatus::Settled => "SETTLED",
240 }
241 .to_string(),
242 trading_mode: instrument.trading_mode.clone(),
243 };
244
245 let instrument_strike = diesel_instrument.strike;
246 let instrument_expiry_yyyymmdd =
247 u64::try_from(diesel_instrument.expiry).map_err(|_| {
248 anyhow::anyhow!(
249 "Invalid instrument expiry {}: cannot derive option token address from negative YYYYMMDD",
250 diesel_instrument.expiry
251 )
252 })?;
253 let option_token_address = Some(hypercall_types::derive_option_token_address(
254 current_option_token_deployment()?,
255 &diesel_instrument.underlying,
256 instrument_expiry_yyyymmdd,
257 instrument_strike,
258 &diesel_instrument.option_type,
259 )?);
260
261 let mut conn = self.pool().get()?;
262 conn.transaction::<_, anyhow::Error, _>(|conn| {
263 diesel::sql_query(
264 "INSERT INTO markets (underlying, expiry) VALUES ($1, $2) ON CONFLICT (underlying, expiry) DO NOTHING"
265 )
266 .bind::<diesel::sql_types::Text, _>(&market.underlying)
267 .bind::<diesel::sql_types::BigInt, _>(market.expiry)
268 .execute(conn)?;
269
270 match diesel::sql_query(
271 "INSERT INTO instruments (id, underlying, strike, expiry, option_type, option_token_address, trading_mode) VALUES ($1, $2, $3, $4, $5, $6, $7) ON CONFLICT (id) DO NOTHING"
272 )
273 .bind::<diesel::sql_types::Text, _>(&diesel_instrument.id)
274 .bind::<diesel::sql_types::Text, _>(&diesel_instrument.underlying)
275 .bind::<diesel::sql_types::Numeric, _>(instrument_strike)
276 .bind::<diesel::sql_types::BigInt, _>(diesel_instrument.expiry)
277 .bind::<diesel::sql_types::Text, _>(&diesel_instrument.option_type)
278 .bind::<diesel::sql_types::Nullable<diesel::sql_types::Bytea>, _>(option_token_address)
279 .bind::<diesel::sql_types::Text, _>(&diesel_instrument.trading_mode)
280 .execute(conn) {
281 Ok(_rows) => {}
282 Err(err) => {
283 DatabaseHandler::observe_diesel_option_token_violation(&err);
284 return Err(err.into());
285 }
286 };
287
288 Ok(())
289 })
290 }
291
292 fn delete_market_and_instrument_sync(&self, symbol: &str) -> Result<()> {
293 use diesel::prelude::*;
294
295 #[derive(diesel::QueryableByName)]
296 struct MarketKey {
297 #[diesel(sql_type = diesel::sql_types::Text)]
298 underlying: String,
299 #[diesel(sql_type = diesel::sql_types::BigInt)]
300 expiry: i64,
301 }
302
303 let mut conn = self.pool().get()?;
304
305 conn.transaction::<_, anyhow::Error, _>(|conn| {
306 let key: MarketKey = diesel::sql_query(
307 "SELECT underlying, expiry
308 FROM instruments
309 WHERE id = $1",
310 )
311 .bind::<diesel::sql_types::Text, _>(symbol)
312 .get_result(conn)
313 .optional()?
314 .ok_or_else(|| {
315 anyhow::anyhow!("Market instrument does not exist for symbol: {}", symbol)
316 })?;
317
318 let deleted_instruments = diesel::sql_query(
319 "DELETE FROM instruments
320 WHERE id = $1",
321 )
322 .bind::<diesel::sql_types::Text, _>(symbol)
323 .execute(conn)?;
324
325 if deleted_instruments != 1 {
326 return Err(anyhow::anyhow!(
327 "Expected to delete exactly one instrument for symbol {}, deleted {}",
328 symbol,
329 deleted_instruments
330 ));
331 }
332
333 diesel::sql_query(
334 "DELETE FROM markets
335 WHERE underlying = $1
336 AND expiry = $2
337 AND NOT EXISTS (
338 SELECT 1
339 FROM instruments
340 WHERE underlying = $1
341 AND expiry = $2
342 )",
343 )
344 .bind::<diesel::sql_types::Text, _>(&key.underlying)
345 .bind::<diesel::sql_types::BigInt, _>(key.expiry)
346 .execute(conn)?;
347
348 Ok(())
349 })
350 }
351
352 fn update_instrument_status_sync(&self, symbols: &[String], status: &str) -> Result<usize> {
353 let mut conn = self.pool().get()?;
354
355 let updated = diesel::sql_query("UPDATE instruments SET status = $1 WHERE id = ANY($2)")
356 .bind::<diesel::sql_types::Text, _>(status)
357 .bind::<diesel::sql_types::Array<diesel::sql_types::Text>, _>(symbols)
358 .execute(&mut conn)?;
359
360 tracing::info!("Updated {} instruments to status '{}'", updated, status);
361
362 Ok(updated)
363 }
364
365 fn transition_active_instruments_to_expired_pending_sync(
366 &self,
367 symbols: &[String],
368 ) -> Result<usize> {
369 let mut conn = self.pool().get()?;
370
371 let updated = diesel::sql_query(
372 "UPDATE instruments
373 SET status = 'EXPIRED_PENDING_PRICE'
374 WHERE id = ANY($1)
375 AND status = 'ACTIVE'",
376 )
377 .bind::<diesel::sql_types::Array<diesel::sql_types::Text>, _>(symbols)
378 .execute(&mut conn)?;
379
380 tracing::info!(
381 "Transitioned {} ACTIVE instruments to EXPIRED_PENDING_PRICE",
382 updated
383 );
384
385 Ok(updated)
386 }
387}
388
389#[cfg(test)]
390mod tests {
391 use crate::test_helpers::TestDb;
392 use hypercall_db::*;
393 use rust_decimal_macros::dec;
394
395 #[tokio::test]
396 async fn instrument_write_read_roundtrip() {
397 let test_db = TestDb::new().await.unwrap();
398 let db = test_db.handler.as_ref();
399
400 let instrument = InstrumentRecord {
401 instrument_numeric_id: 1,
402 id: "BTC-20260131-100000-C".to_string(),
403 underlying: "BTC".to_string(),
404 strike: dec!(100000),
405 expiry: 20260131,
406 option_type: hypercall_types::enums::OptionType::Call,
407 option_token_address: None,
408 status: hypercall_types::api_models::InstrumentStatus::Active,
409 trading_mode: "orderbook".to_string(),
410 };
411
412 db.save_market_and_instrument_sync("BTC", 20260131, &instrument)
413 .unwrap();
414
415 let loaded = db.get_all_instruments_sync().unwrap();
416 assert_eq!(loaded.len(), 1);
417 assert_eq!(loaded[0].id, "BTC-20260131-100000-C");
418 assert_eq!(loaded[0].strike, dec!(100000));
419 }
420
421 #[tokio::test]
422 async fn instrument_status_counts() {
423 let test_db = TestDb::new().await.unwrap();
424 let db = test_db.handler.as_ref();
425
426 for (i, symbol) in ["BTC-20260131-100000-C", "BTC-20260131-110000-C"]
427 .iter()
428 .enumerate()
429 {
430 let instrument = InstrumentRecord {
431 instrument_numeric_id: (i + 1) as i32,
432 id: symbol.to_string(),
433 underlying: "BTC".to_string(),
434 strike: dec!(100000) + rust_decimal::Decimal::from(i as i64) * dec!(10000),
435 expiry: 20260131,
436 option_type: hypercall_types::enums::OptionType::Call,
437 option_token_address: None,
438 status: hypercall_types::api_models::InstrumentStatus::Active,
439 trading_mode: "orderbook".to_string(),
440 };
441 db.save_market_and_instrument_sync("BTC", 20260131, &instrument)
442 .unwrap();
443 }
444
445 let counts = db.get_instrument_status_counts_sync().unwrap();
446 let active_count = counts.iter().find(|(s, _)| s == "ACTIVE").map(|(_, c)| *c);
447 assert_eq!(active_count, Some(2));
448 }
449
450 #[tokio::test]
451 async fn instrument_update_status() {
452 let test_db = TestDb::new().await.unwrap();
453 let db = test_db.handler.as_ref();
454
455 let instrument = InstrumentRecord {
456 instrument_numeric_id: 1,
457 id: "ETH-20260131-4000-P".to_string(),
458 underlying: "ETH".to_string(),
459 strike: dec!(4000),
460 expiry: 20260131,
461 option_type: hypercall_types::enums::OptionType::Put,
462 option_token_address: None,
463 status: hypercall_types::api_models::InstrumentStatus::Active,
464 trading_mode: "orderbook".to_string(),
465 };
466
467 db.save_market_and_instrument_sync("ETH", 20260131, &instrument)
468 .unwrap();
469
470 let updated = db
471 .update_instrument_status_sync(
472 &["ETH-20260131-4000-P".to_string()],
473 "EXPIRED_PENDING_PRICE",
474 )
475 .unwrap();
476 assert_eq!(updated, 1);
477
478 let loaded = db
479 .get_instruments_by_status_sync("EXPIRED_PENDING_PRICE")
480 .unwrap();
481 assert_eq!(loaded.len(), 1);
482 assert_eq!(loaded[0].id, "ETH-20260131-4000-P");
483 }
484
485 #[tokio::test]
486 async fn instrument_active_to_expired_pending_transition_does_not_downgrade_settled() {
487 let test_db = TestDb::new().await.unwrap();
488 let db = test_db.handler.as_ref();
489
490 let active_symbol = "ETH-20260131-4000-C".to_string();
491 let settled_symbol = "ETH-20260131-4100-C".to_string();
492 for (instrument_numeric_id, symbol, strike) in [
493 (1, active_symbol.as_str(), dec!(4000)),
494 (2, settled_symbol.as_str(), dec!(4100)),
495 ] {
496 let instrument = InstrumentRecord {
497 instrument_numeric_id,
498 id: symbol.to_string(),
499 underlying: "ETH".to_string(),
500 strike,
501 expiry: 20260131,
502 option_type: hypercall_types::enums::OptionType::Call,
503 option_token_address: None,
504 status: hypercall_types::api_models::InstrumentStatus::Active,
505 trading_mode: "orderbook".to_string(),
506 };
507 db.save_market_and_instrument_sync("ETH", 20260131, &instrument)
508 .unwrap();
509 }
510
511 db.update_instrument_status_sync(std::slice::from_ref(&settled_symbol), "SETTLED")
512 .unwrap();
513
514 let updated = db
515 .transition_active_instruments_to_expired_pending_sync(&[
516 active_symbol.clone(),
517 settled_symbol.clone(),
518 ])
519 .unwrap();
520 assert_eq!(updated, 1);
521
522 let pending = db
523 .get_instruments_by_status_sync("EXPIRED_PENDING_PRICE")
524 .unwrap();
525 assert_eq!(pending.len(), 1);
526 assert_eq!(pending[0].id, active_symbol);
527
528 let settled = db.get_instruments_by_status_sync("SETTLED").unwrap();
529 assert_eq!(settled.len(), 1);
530 assert_eq!(settled[0].id, settled_symbol);
531 }
532
533 #[tokio::test]
534 async fn instrument_delete() {
535 let test_db = TestDb::new().await.unwrap();
536 let db = test_db.handler.as_ref();
537
538 let instrument = InstrumentRecord {
539 instrument_numeric_id: 1,
540 id: "BTC-20260131-100000-C".to_string(),
541 underlying: "BTC".to_string(),
542 strike: dec!(100000),
543 expiry: 20260131,
544 option_type: hypercall_types::enums::OptionType::Call,
545 option_token_address: None,
546 status: hypercall_types::api_models::InstrumentStatus::Active,
547 trading_mode: "orderbook".to_string(),
548 };
549
550 db.save_market_and_instrument_sync("BTC", 20260131, &instrument)
551 .unwrap();
552 db.delete_market_and_instrument_sync("BTC-20260131-100000-C")
553 .unwrap();
554
555 let loaded = db.get_all_instruments_sync().unwrap();
556 assert!(loaded.is_empty());
557 }
558}