1use anyhow::Result;
8use diesel::pg::PgConnection;
9use diesel::prelude::*;
10use diesel::RunQueryDsl;
11use rust_decimal::Decimal;
12
13use hypercall_types::{utils::is_option_symbol, WalletAddress};
14
15use crate::database_handler::DatabaseHandler;
16
17impl hypercall_db::OrderReader for DatabaseHandler {
18 fn get_order_infos_sync(
19 &self,
20 wallet: Option<&WalletAddress>,
21 ) -> Result<Vec<hypercall_db::OrderInfoRecord>> {
22 use crate::schema;
23
24 let mut conn = self.pool().get()?;
25 let mut query = schema::order_infos::table.into_boxed();
26 if let Some(wallet) = wallet {
27 query = query.filter(schema::order_infos::wallet_address.eq(wallet));
28 }
29 let results = query
30 .order(schema::order_infos::timestamp.desc())
31 .load::<crate::models::OrderInfoRecord>(&mut conn)?;
32 Ok(results.into_iter().map(Into::into).collect())
33 }
34
35 fn get_client_ids_by_order_ids_sync(
36 &self,
37 order_ids: &[i64],
38 ) -> Result<std::collections::HashMap<i64, Option<String>>> {
39 use crate::schema;
40
41 if order_ids.is_empty() {
42 return Ok(std::collections::HashMap::new());
43 }
44 let mut conn = self.pool().get()?;
45 let results = schema::order_infos::table
46 .filter(schema::order_infos::order_id.eq_any(order_ids))
47 .select((
48 schema::order_infos::order_id,
49 schema::order_infos::client_id,
50 ))
51 .load::<(i64, Option<String>)>(&mut conn)?;
52 Ok(results.into_iter().collect())
53 }
54
55 fn get_terminal_order_ids_sync(&self, order_ids: &[i64]) -> Result<Vec<i64>> {
56 use diesel::sql_types::{Array, BigInt};
57
58 if order_ids.is_empty() {
59 return Ok(vec![]);
60 }
61
62 let mut conn = self.pool().get()?;
63
64 diesel::sql_query("SET statement_timeout = '120000'")
65 .execute(&mut conn)
66 .ok();
67
68 #[derive(diesel::QueryableByName)]
69 struct TerminalRow {
70 #[diesel(sql_type = BigInt)]
71 order_id: i64,
72 }
73
74 let results = diesel::sql_query(
75 "SELECT order_id
76 FROM order_infos
77 WHERE order_id = ANY($1)
78 AND status IN ('FILLED', 'CANCELED', 'REJECTED')",
79 )
80 .bind::<Array<BigInt>, _>(order_ids)
81 .load::<TerminalRow>(&mut conn)?;
82
83 diesel::sql_query("SET statement_timeout = '30000'")
84 .execute(&mut conn)
85 .ok();
86
87 let terminal: Vec<i64> = results.into_iter().map(|row| row.order_id).collect();
88
89 if !terminal.is_empty() {
90 tracing::info!(
91 "Post-startup reconciliation: found {} terminal orders in order_infos out of {} checked",
92 terminal.len(),
93 order_ids.len()
94 );
95 }
96
97 Ok(terminal)
98 }
99
100 fn get_max_order_id_sync(&self) -> Result<u64> {
101 use diesel::sql_types::BigInt;
102
103 #[derive(diesel::QueryableByName)]
104 struct MaxOrderId {
105 #[diesel(sql_type = BigInt)]
106 max_id: i64,
107 }
108
109 let mut conn = self.pool().get()?;
110 let result = diesel::sql_query(
111 "SELECT GREATEST(
112 COALESCE((SELECT MAX(order_id) FROM order_infos), 0),
113 COALESCE((SELECT MAX(order_id) FROM engine_commands WHERE order_id IS NOT NULL), 0)
114 ) as max_id",
115 )
116 .get_result::<MaxOrderId>(&mut conn);
117
118 match result {
119 Ok(row) => Ok(row.max_id as u64),
120 Err(_) => {
121 tracing::warn!(
122 "engine_commands.order_id not available, falling back to order_infos only"
123 );
124 let mut fallback_conn = self.pool().get()?;
125 let row = diesel::sql_query(
126 "SELECT COALESCE(MAX(order_id), 0) as max_id FROM order_infos",
127 )
128 .get_result::<MaxOrderId>(&mut fallback_conn)?;
129 Ok(row.max_id as u64)
130 }
131 }
132 }
133
134 fn get_max_trade_id_sync(&self) -> Result<u64> {
135 use diesel::sql_types::BigInt;
136
137 #[derive(diesel::QueryableByName)]
138 struct MaxTradeId {
139 #[diesel(sql_type = BigInt)]
140 max_id: i64,
141 }
142
143 let mut conn = self.pool().get()?;
144 let result = diesel::sql_query("SELECT COALESCE(MAX(trade_id), 0) as max_id FROM trades")
145 .get_result::<MaxTradeId>(&mut conn)?;
146 Ok(result.max_id as u64)
147 }
148}
149
150impl hypercall_db::OrderWriter for DatabaseHandler {
151 fn persist_order_info_sync(&self, info: &hypercall_db::PersistOrderInfo) -> Result<()> {
152 use crate::schema;
153 use diesel::upsert::excluded;
154
155 let new_order_info = crate::models::NewOrderInfo {
156 order_id: info.order_id,
157 wallet_address: info.wallet_address,
158 symbol: info.symbol.clone(),
159 side: format!("{:?}", info.side),
160 price: info.price,
161 size: info.size,
162 tif: format!("{:?}", info.tif),
163 client_id: info.client_id.clone(),
164 is_perp: info.is_perp,
165 underlying: info.underlying.clone(),
166 reduce_only: info.reduce_only,
167 nonce: info.nonce,
168 signature: info.signature.clone(),
169 mmp_enabled: info.mmp_enabled,
170 timestamp: info.timestamp,
171 status: "ACKED".to_string(),
172 filled_size: Decimal::ZERO,
173 last_materialized_order_update_id: 0,
174 };
175
176 let mut conn = self.pool().get()?;
177 diesel::insert_into(schema::order_infos::table)
178 .values(&new_order_info)
179 .on_conflict(schema::order_infos::order_id)
180 .do_update()
181 .set((
182 schema::order_infos::wallet_address
183 .eq(excluded(schema::order_infos::wallet_address)),
184 schema::order_infos::symbol.eq(excluded(schema::order_infos::symbol)),
185 schema::order_infos::side.eq(excluded(schema::order_infos::side)),
186 schema::order_infos::price.eq(excluded(schema::order_infos::price)),
187 schema::order_infos::size.eq(excluded(schema::order_infos::size)),
188 schema::order_infos::tif.eq(excluded(schema::order_infos::tif)),
189 schema::order_infos::client_id.eq(excluded(schema::order_infos::client_id)),
190 schema::order_infos::is_perp.eq(excluded(schema::order_infos::is_perp)),
191 schema::order_infos::underlying.eq(excluded(schema::order_infos::underlying)),
192 schema::order_infos::reduce_only.eq(excluded(schema::order_infos::reduce_only)),
193 schema::order_infos::nonce.eq(excluded(schema::order_infos::nonce)),
194 schema::order_infos::signature.eq(excluded(schema::order_infos::signature)),
195 schema::order_infos::mmp_enabled.eq(excluded(schema::order_infos::mmp_enabled)),
196 schema::order_infos::timestamp.eq(excluded(schema::order_infos::timestamp)),
197 ))
198 .execute(&mut conn)?;
199 Ok(())
200 }
201
202 fn persist_order_action_sync(&self, action: &hypercall_db::PersistOrderAction) -> Result<()> {
203 use crate::schema;
204
205 let new_action = crate::models::NewOrderAction {
206 timestamp: action.timestamp,
207 wallet: action.wallet,
208 action: action.action.clone(),
209 symbol: action.symbol.clone(),
210 price: action.price,
211 size: action.size,
212 side: format!("{:?}", action.side),
213 tif: format!("{:?}", action.tif),
214 client_id: action.client_id.clone(),
215 };
216
217 let mut conn = self.pool().get()?;
218 diesel::insert_into(schema::order_actions::table)
219 .values(&new_action)
220 .execute(&mut conn)?;
221 Ok(())
222 }
223
224 fn persist_order_update_sync(
225 &self,
226 update: &hypercall_types::OrderUpdateMessage,
227 ) -> Result<()> {
228 use crate::models::{NewOrderInfo, NewOrderUpdate, NewRejectedOrder};
229 use crate::order_status_materialization::upsert_materialized_order_state;
230 use crate::schema;
231
232 let current_status = DatabaseHandler::order_update_status_to_db(update.status).to_string();
233
234 let new_order_update = NewOrderUpdate {
235 timestamp: update.timestamp as i64,
236 order_id: update.order_id.map(|id| id as i64),
237 status: current_status.clone(),
238 reason: update.reason.clone(),
239 filled_size: update.filled_size,
240 symbol: update.info.symbol.clone(),
241 price: update.info.price,
242 size: update.info.size,
243 side: format!("{:?}", update.info.side),
244 tif: format!("{:?}", update.info.tif),
245 client_id: update.info.client_id.clone(),
246 };
247
248 let mut conn = self.pool().get()?;
249
250 let order_update_id = diesel::insert_into(schema::order_updates::table)
251 .values(&new_order_update)
252 .returning(schema::order_updates::id)
253 .get_result::<Option<i32>>(&mut conn)?
254 .expect("order_updates.id should be populated after insert");
255
256 if let Some(order_id) = update.order_id {
257 let materialized_order_info = NewOrderInfo {
258 order_id: order_id as i64,
259 wallet_address: update.wallet_address,
260 symbol: update.info.symbol.clone(),
261 side: format!("{:?}", update.info.side),
262 price: update.info.price,
263 size: update.info.size,
264 tif: format!("{:?}", update.info.tif),
265 client_id: update.info.client_id.clone(),
266 is_perp: update.info.is_perp,
267 underlying: update.info.underlying.clone(),
268 reduce_only: update.info.reduce_only,
269 nonce: update.info.nonce.map(|n| n as i64),
270 signature: update.info.signature.clone(),
271 mmp_enabled: update.info.mmp_enabled,
272 timestamp: update.timestamp as i64,
273 status: current_status,
274 filled_size: update.filled_size,
275 last_materialized_order_update_id: order_update_id,
276 };
277
278 upsert_materialized_order_state(&mut conn, &materialized_order_info)?;
279 }
280
281 if let hypercall_types::OrderUpdateStatus::Rejected = update.status {
283 let new_rejected = NewRejectedOrder {
284 wallet_address: update.wallet_address,
285 symbol: update.info.symbol.clone(),
286 side: format!("{:?}", update.info.side),
287 price: update.info.price,
288 size: update.info.size,
289 reason: update
290 .reason
291 .clone()
292 .unwrap_or_else(|| "Unknown".to_string()),
293 timestamp: update.timestamp as i64,
294 };
295
296 diesel::insert_into(schema::rejected_orders::table)
297 .values(&new_rejected)
298 .execute(&mut conn)?;
299 }
300
301 Ok(())
302 }
303
304 fn persist_fill_with_side_effects_sync(
305 &self,
306 fill: &hypercall_types::Fill,
307 side_effects: &hypercall_db::FillSideEffects,
308 ) -> Result<(bool, bool)> {
309 use diesel::sql_types::{BigInt, Binary, Bool, Numeric, Text};
310 use rust_decimal_macros::dec;
311
312 tracing::debug!(
313 "persist_fill_with_side_effects_sync: trade_id={}, symbol={}, price={}, size={}, taker={}, maker={}",
314 fill.trade_id, fill.symbol, fill.price, fill.size, fill.taker_wallet_address, fill.maker_wallet_address
315 );
316
317 assert_eq!(
318 side_effects.trade_id, fill.trade_id,
319 "CRITICAL: journal fill side effects trade_id mismatch for fill {}",
320 fill.trade_id
321 );
322 let underlying_notional =
323 validate_fill_underlying_notional(fill, side_effects.underlying_notional)?;
324
325 let taker_realized_pnl = fill
326 .taker_realized_pnl
327 .unwrap_or(side_effects.taker_ledger_delta);
328 let maker_realized_pnl = fill
329 .maker_realized_pnl
330 .unwrap_or(side_effects.maker_ledger_delta);
331
332 let mut conn = self.pool().get()?;
333
334 conn.transaction(|conn| {
335 diesel::sql_query(
336 "INSERT INTO trades (trade_id, symbol, price, size, maker_address, taker_address, maker_fee, taker_fee, timestamp)
337 VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
338 ON CONFLICT (trade_id) DO NOTHING",
339 )
340 .bind::<BigInt, _>(fill.trade_id as i64)
341 .bind::<Text, _>(&fill.symbol)
342 .bind::<Numeric, _>(fill.price)
343 .bind::<Numeric, _>(fill.size)
344 .bind::<Binary, _>(&fill.maker_wallet_address)
345 .bind::<Binary, _>(&fill.taker_wallet_address)
346 .bind::<Numeric, _>(dec!(0))
347 .bind::<Numeric, _>(fill.fee)
348 .bind::<BigInt, _>(fill.timestamp as i64)
349 .execute(conn)?;
350
351 let taker_rows: usize = diesel::sql_query(
353 "INSERT INTO fills (trade_id, wallet_address, symbol, price, size, fee, is_taker, timestamp, builder_code_address, builder_code_fee, realized_pnl, underlying_notional)
354 VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12)
355 ON CONFLICT (trade_id, wallet_address, is_taker) DO NOTHING",
356 )
357 .bind::<BigInt, _>(fill.trade_id as i64)
358 .bind::<Binary, _>(&fill.taker_wallet_address)
359 .bind::<Text, _>(&fill.symbol)
360 .bind::<Numeric, _>(fill.price)
361 .bind::<Numeric, _>(fill.size)
362 .bind::<Numeric, _>(fill.fee)
363 .bind::<Bool, _>(true)
364 .bind::<BigInt, _>(fill.timestamp as i64)
365 .bind::<diesel::sql_types::Nullable<Binary>, _>(fill.builder_code_address.as_ref())
366 .bind::<diesel::sql_types::Nullable<Numeric>, _>(fill.builder_code_fee)
367 .bind::<diesel::sql_types::Nullable<Numeric>, _>(Some(taker_realized_pnl))
368 .bind::<diesel::sql_types::Nullable<Numeric>, _>(underlying_notional)
369 .execute(conn)?;
370 let taker_inserted = taker_rows > 0;
371
372 if taker_inserted {
373 DatabaseHandler::apply_fill_ledger_side_effects(
374 conn,
375 &fill.taker_wallet_address,
376 fill,
377 side_effects.taker_ledger_delta,
378 side_effects.taker_premium_delta,
379 )?;
380 }
381
382 let maker_rows: usize = diesel::sql_query(
384 "INSERT INTO fills (trade_id, wallet_address, symbol, price, size, fee, is_taker, timestamp, builder_code_address, builder_code_fee, realized_pnl, underlying_notional)
385 VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12)
386 ON CONFLICT (trade_id, wallet_address, is_taker) DO NOTHING",
387 )
388 .bind::<BigInt, _>(fill.trade_id as i64)
389 .bind::<Binary, _>(&fill.maker_wallet_address)
390 .bind::<Text, _>(&fill.symbol)
391 .bind::<Numeric, _>(fill.price)
392 .bind::<Numeric, _>(fill.size)
393 .bind::<Numeric, _>(dec!(0))
394 .bind::<Bool, _>(false)
395 .bind::<BigInt, _>(fill.timestamp as i64)
396 .bind::<diesel::sql_types::Nullable<Binary>, _>(Option::<WalletAddress>::None)
397 .bind::<diesel::sql_types::Nullable<Numeric>, _>(Option::<Decimal>::None)
398 .bind::<diesel::sql_types::Nullable<Numeric>, _>(Some(maker_realized_pnl))
399 .bind::<diesel::sql_types::Nullable<Numeric>, _>(underlying_notional)
400 .execute(conn)?;
401 let maker_inserted = maker_rows > 0;
402
403 if maker_inserted {
404 DatabaseHandler::apply_fill_ledger_side_effects(
405 conn,
406 &fill.maker_wallet_address,
407 fill,
408 side_effects.maker_ledger_delta,
409 side_effects.maker_premium_delta,
410 )?;
411 }
412
413 Ok((taker_inserted, maker_inserted))
414 })
415 }
416
417 fn batch_cancel_orders_for_settlement_sync(
418 &self,
419 order_ids: &[i64],
420 timestamp_ms: i64,
421 ) -> Result<usize> {
422 use diesel::sql_types::{Array, BigInt};
423
424 if order_ids.is_empty() {
425 return Ok(0);
426 }
427
428 let mut conn = self.pool().get()?;
429 let updated = diesel::sql_query(
430 "UPDATE order_infos \
431 SET status = 'CANCELED', updated_at = now() \
432 WHERE order_id = ANY($1) \
433 AND status NOT IN ('CANCELED', 'FILLED', 'REJECTED')",
434 )
435 .bind::<Array<BigInt>, _>(order_ids)
436 .execute(&mut conn)?;
437
438 if updated != order_ids.len() {
439 tracing::warn!(
440 expected = order_ids.len(),
441 actual = updated,
442 timestamp_ms,
443 "Settlement cancel: order_infos update count mismatch \
444 (some orders may already be terminal or missing from order_infos)"
445 );
446 }
447
448 Ok(updated)
449 }
450
451 fn cancel_orphaned_orders_by_symbols_sync(&self, symbols: &[String]) -> Result<usize> {
452 if symbols.is_empty() {
453 return Ok(0);
454 }
455
456 let mut conn = self.pool().get()?;
457 let mut updated = 0;
458 for chunk in symbols.chunks(32) {
459 updated += diesel::sql_query(
460 "UPDATE order_infos \
461 SET status = 'CANCELED', updated_at = now() \
462 WHERE symbol = ANY($1) \
463 AND status IN ('ACKED', 'OPEN', 'PARTIALLY_FILLED')",
464 )
465 .bind::<diesel::sql_types::Array<diesel::sql_types::Text>, _>(chunk)
466 .execute(&mut conn)?;
467 }
468
469 Ok(updated)
470 }
471}
472
473fn validate_fill_underlying_notional(
474 fill: &hypercall_types::Fill,
475 side_effect_underlying_notional: Option<Decimal>,
476) -> Result<Option<Decimal>> {
477 if !is_option_symbol(&fill.symbol) {
478 return Ok(fill.underlying_notional.or(side_effect_underlying_notional));
479 }
480
481 let fill_notional = fill.underlying_notional.ok_or_else(|| {
482 anyhow::anyhow!(
483 "CRITICAL: option fill {} for trade {} is missing underlying_notional",
484 fill.symbol,
485 fill.trade_id
486 )
487 })?;
488 let side_effect_notional = side_effect_underlying_notional.ok_or_else(|| {
489 anyhow::anyhow!(
490 "CRITICAL: option fill side effects for trade {} are missing underlying_notional",
491 fill.trade_id
492 )
493 })?;
494 anyhow::ensure!(
495 fill_notional == side_effect_notional,
496 "CRITICAL: option fill underlying_notional mismatch for trade {}: fill={} side_effect={}",
497 fill.trade_id,
498 fill_notional,
499 side_effect_notional
500 );
501 Ok(Some(fill_notional))
502}
503
504impl DatabaseHandler {
506 pub(crate) fn apply_fill_ledger_side_effects(
507 conn: &mut PgConnection,
508 wallet: &WalletAddress,
509 fill: &hypercall_types::Fill,
510 realized_delta: Decimal,
511 premium_delta: Decimal,
512 ) -> Result<()> {
513 use diesel::sql_types::{BigInt, Binary, Text};
514
515 let _total_delta = realized_delta + premium_delta;
516
517 if realized_delta != Decimal::ZERO {
518 let inserted = diesel::sql_query(
519 "INSERT INTO ledger_events
520 (wallet, event_ts_ms, delta, event_type, reference_trade_id, reference_symbol)
521 VALUES ($1, $2, $3, $4, $5, $6)
522 ON CONFLICT (wallet, reference_trade_id, event_type)
523 WHERE reference_trade_id IS NOT NULL
524 DO NOTHING",
525 )
526 .bind::<Binary, _>(wallet)
527 .bind::<BigInt, _>(fill.timestamp as i64)
528 .bind::<diesel::sql_types::Numeric, _>(realized_delta)
529 .bind::<Text, _>("fill_realized_pnl")
530 .bind::<BigInt, _>(fill.trade_id as i64)
531 .bind::<diesel::sql_types::Nullable<Text>, _>(Some(fill.symbol.as_str()))
532 .execute(conn)?;
533 assert_eq!(
534 inserted, 1,
535 "CRITICAL: missing fill_realized_pnl ledger event for trade {} wallet {}",
536 fill.trade_id, wallet
537 );
538 }
539
540 if premium_delta != Decimal::ZERO {
541 let inserted = diesel::sql_query(
542 "INSERT INTO ledger_events
543 (wallet, event_ts_ms, delta, event_type, reference_trade_id, reference_symbol)
544 VALUES ($1, $2, $3, $4, $5, $6)
545 ON CONFLICT (wallet, reference_trade_id, event_type)
546 WHERE reference_trade_id IS NOT NULL
547 DO NOTHING",
548 )
549 .bind::<Binary, _>(wallet)
550 .bind::<BigInt, _>(fill.timestamp as i64)
551 .bind::<diesel::sql_types::Numeric, _>(premium_delta)
552 .bind::<Text, _>("fill_premium")
553 .bind::<BigInt, _>(fill.trade_id as i64)
554 .bind::<diesel::sql_types::Nullable<Text>, _>(Some(fill.symbol.as_str()))
555 .execute(conn)?;
556 assert_eq!(
557 inserted, 1,
558 "CRITICAL: missing fill_premium ledger event for trade {} wallet {}",
559 fill.trade_id, wallet
560 );
561 }
562
563 Ok(())
564 }
565}
566
#[cfg(test)]
mod tests {
    use crate::test_helpers::TestDb;
    use hypercall_db::*;
    use hypercall_types::wallet_address::test_wallet;
    use rust_decimal_macros::dec;

    /// An order written with `persist_order_info_sync` can be read back for
    /// the same wallet with its key fields intact.
    #[tokio::test]
    async fn order_info_write_read_roundtrip() {
        let test_db = TestDb::new().await.unwrap();
        let db = test_db.handler.as_ref();
        let wallet = test_wallet(10);

        let info = PersistOrderInfo {
            order_id: 42,
            wallet_address: wallet,
            symbol: "ETH-20260131-4000-C".to_string(),
            side: hypercall_types::Side::Buy,
            price: dec!(500),
            size: dec!(1000000),
            tif: hypercall_types::TimeInForce::GTC,
            client_id: Some("test-client-1".to_string()),
            is_perp: false,
            underlying: Some("ETH".to_string()),
            reduce_only: None,
            nonce: Some(1),
            signature: Some("0xabc".to_string()),
            mmp_enabled: false,
            timestamp: 1700000000000,
        };

        db.persist_order_info_sync(&info).unwrap();
        let orders = db.get_order_infos_sync(Some(&wallet)).unwrap();
        assert_eq!(orders.len(), 1);
        assert_eq!(orders[0].order_id, 42);
        assert_eq!(orders[0].symbol, "ETH-20260131-4000-C");
        assert_eq!(orders[0].client_id, Some("test-client-1".to_string()));
    }

    /// Persisting the same order twice leaves a single row for that wallet.
    #[tokio::test]
    async fn order_info_upsert_idempotent() {
        let test_db = TestDb::new().await.unwrap();
        let db = test_db.handler.as_ref();
        let wallet = test_wallet(11);

        let info = PersistOrderInfo {
            order_id: 99,
            wallet_address: wallet,
            symbol: "BTC-20260131-100000-C".to_string(),
            side: hypercall_types::Side::Sell,
            price: dec!(5000),
            size: dec!(500000),
            tif: hypercall_types::TimeInForce::IOC,
            client_id: None,
            is_perp: false,
            underlying: Some("BTC".to_string()),
            reduce_only: None,
            nonce: None,
            signature: None,
            mmp_enabled: false,
            timestamp: 1700000000000,
        };

        db.persist_order_info_sync(&info).unwrap();
        // The second write must hit the ON CONFLICT path instead of failing.
        db.persist_order_info_sync(&info).unwrap();

        let orders = db.get_order_infos_sync(Some(&wallet)).unwrap();
        assert_eq!(orders.len(), 1);
        assert_eq!(orders[0].order_id, 99);
    }

    /// Writing an order action succeeds. There is no reader for
    /// `order_actions` in this module, so only the write itself is checked.
    #[tokio::test]
    async fn order_action_write() {
        let test_db = TestDb::new().await.unwrap();
        let db = test_db.handler.as_ref();
        let wallet = test_wallet(12);

        let action = PersistOrderAction {
            timestamp: 1700000000000,
            wallet,
            action: "CreateOrder".to_string(),
            symbol: "ETH-20260131-4000-C".to_string(),
            price: dec!(500),
            size: dec!(1000000),
            side: hypercall_types::Side::Buy,
            tif: hypercall_types::TimeInForce::GTC,
            client_id: Some("test-action".to_string()),
        };

        db.persist_order_action_sync(&action).unwrap();
    }

    /// Inserting an order with a larger id raises the reported max order id.
    #[tokio::test]
    async fn max_order_id_increases_with_inserts() {
        let test_db = TestDb::new().await.unwrap();
        let db = test_db.handler.as_ref();
        let wallet = test_wallet(13);

        let initial = db.get_max_order_id_sync().unwrap();

        let info = PersistOrderInfo {
            order_id: (initial + 100) as i64,
            wallet_address: wallet,
            symbol: "ETH-20260131-4000-C".to_string(),
            side: hypercall_types::Side::Buy,
            price: dec!(500),
            size: dec!(1000000),
            tif: hypercall_types::TimeInForce::GTC,
            client_id: None,
            is_perp: false,
            underlying: None,
            reduce_only: None,
            nonce: None,
            signature: None,
            mmp_enabled: false,
            timestamp: 1700000000000,
        };
        db.persist_order_info_sync(&info).unwrap();

        let after = db.get_max_order_id_sync().unwrap();
        assert!(after >= initial + 100);
    }

    /// Client ids are returned for every known order id, and unknown ids are
    /// left out of the map.
    #[tokio::test]
    async fn client_id_lookup_roundtrip() {
        let test_db = TestDb::new().await.unwrap();
        let db = test_db.handler.as_ref();
        let wallet = test_wallet(14);

        for i in 1..=3i64 {
            let info = PersistOrderInfo {
                order_id: 1000 + i,
                wallet_address: wallet,
                symbol: "ETH-20260131-4000-C".to_string(),
                side: hypercall_types::Side::Buy,
                price: dec!(500),
                size: dec!(1000000),
                tif: hypercall_types::TimeInForce::GTC,
                client_id: Some(format!("client-{}", i)),
                is_perp: false,
                underlying: None,
                reduce_only: None,
                nonce: None,
                signature: None,
                mmp_enabled: false,
                timestamp: 1700000000000,
            };
            db.persist_order_info_sync(&info).unwrap();
        }

        let ids = db
            .get_client_ids_by_order_ids_sync(&[1001, 1002, 1003, 9999])
            .unwrap();
        // 9999 was never inserted, so only the three known ids come back.
        assert_eq!(ids.len(), 3);
        assert_eq!(ids[&1001], Some("client-1".to_string()));
        assert_eq!(ids[&1002], Some("client-2".to_string()));
        assert_eq!(ids[&1003], Some("client-3".to_string()));
        assert!(!ids.contains_key(&9999));
    }

    /// With no matching rows, and with no ids at all, no terminal orders are
    /// reported.
    #[tokio::test]
    async fn order_get_terminal_ids_empty() {
        let test_db = TestDb::new().await.unwrap();
        let db = test_db.handler.as_ref();

        let terminal = db.get_terminal_order_ids_sync(&[1, 2, 3]).unwrap();
        assert!(terminal.is_empty());

        let none_requested = db.get_terminal_order_ids_sync(&[]).unwrap();
        assert!(none_requested.is_empty());
    }

    /// An empty `trades` table reports a max trade id of zero.
    #[tokio::test]
    async fn order_max_trade_id_empty() {
        let test_db = TestDb::new().await.unwrap();
        let db = test_db.handler.as_ref();
        let max = db.get_max_trade_id_sync().unwrap();
        assert_eq!(max, 0);
    }
}