Skip to main content

hypercall_db_diesel/
orders.rs

1//! OrderReader + OrderWriter + fill ledger helper implementations for DatabaseHandler.
2//!
3//! Handles order info upserts, order action audit trail, status
4//! materialization, fill persistence with atomic ledger side effects,
5//! and batch cancellation for settlement and orphaned orders.
6
7use anyhow::Result;
8use diesel::pg::PgConnection;
9use diesel::prelude::*;
10use diesel::RunQueryDsl;
11use rust_decimal::Decimal;
12
13use hypercall_types::{utils::is_option_symbol, WalletAddress};
14
15use crate::database_handler::DatabaseHandler;
16
17impl hypercall_db::OrderReader for DatabaseHandler {
18    fn get_order_infos_sync(
19        &self,
20        wallet: Option<&WalletAddress>,
21    ) -> Result<Vec<hypercall_db::OrderInfoRecord>> {
22        use crate::schema;
23
24        let mut conn = self.pool().get()?;
25        let mut query = schema::order_infos::table.into_boxed();
26        if let Some(wallet) = wallet {
27            query = query.filter(schema::order_infos::wallet_address.eq(wallet));
28        }
29        let results = query
30            .order(schema::order_infos::timestamp.desc())
31            .load::<crate::models::OrderInfoRecord>(&mut conn)?;
32        Ok(results.into_iter().map(Into::into).collect())
33    }
34
35    fn get_client_ids_by_order_ids_sync(
36        &self,
37        order_ids: &[i64],
38    ) -> Result<std::collections::HashMap<i64, Option<String>>> {
39        use crate::schema;
40
41        if order_ids.is_empty() {
42            return Ok(std::collections::HashMap::new());
43        }
44        let mut conn = self.pool().get()?;
45        let results = schema::order_infos::table
46            .filter(schema::order_infos::order_id.eq_any(order_ids))
47            .select((
48                schema::order_infos::order_id,
49                schema::order_infos::client_id,
50            ))
51            .load::<(i64, Option<String>)>(&mut conn)?;
52        Ok(results.into_iter().collect())
53    }
54
55    fn get_terminal_order_ids_sync(&self, order_ids: &[i64]) -> Result<Vec<i64>> {
56        use diesel::sql_types::{Array, BigInt};
57
58        if order_ids.is_empty() {
59            return Ok(vec![]);
60        }
61
62        let mut conn = self.pool().get()?;
63
64        diesel::sql_query("SET statement_timeout = '120000'")
65            .execute(&mut conn)
66            .ok();
67
68        #[derive(diesel::QueryableByName)]
69        struct TerminalRow {
70            #[diesel(sql_type = BigInt)]
71            order_id: i64,
72        }
73
74        let results = diesel::sql_query(
75            "SELECT order_id
76             FROM order_infos
77             WHERE order_id = ANY($1)
78               AND status IN ('FILLED', 'CANCELED', 'REJECTED')",
79        )
80        .bind::<Array<BigInt>, _>(order_ids)
81        .load::<TerminalRow>(&mut conn)?;
82
83        diesel::sql_query("SET statement_timeout = '30000'")
84            .execute(&mut conn)
85            .ok();
86
87        let terminal: Vec<i64> = results.into_iter().map(|row| row.order_id).collect();
88
89        if !terminal.is_empty() {
90            tracing::info!(
91                "Post-startup reconciliation: found {} terminal orders in order_infos out of {} checked",
92                terminal.len(),
93                order_ids.len()
94            );
95        }
96
97        Ok(terminal)
98    }
99
100    fn get_max_order_id_sync(&self) -> Result<u64> {
101        use diesel::sql_types::BigInt;
102
103        #[derive(diesel::QueryableByName)]
104        struct MaxOrderId {
105            #[diesel(sql_type = BigInt)]
106            max_id: i64,
107        }
108
109        let mut conn = self.pool().get()?;
110        let result = diesel::sql_query(
111            "SELECT GREATEST(
112                COALESCE((SELECT MAX(order_id) FROM order_infos), 0),
113                COALESCE((SELECT MAX(order_id) FROM engine_commands WHERE order_id IS NOT NULL), 0)
114            ) as max_id",
115        )
116        .get_result::<MaxOrderId>(&mut conn);
117
118        match result {
119            Ok(row) => Ok(row.max_id as u64),
120            Err(_) => {
121                tracing::warn!(
122                    "engine_commands.order_id not available, falling back to order_infos only"
123                );
124                let mut fallback_conn = self.pool().get()?;
125                let row = diesel::sql_query(
126                    "SELECT COALESCE(MAX(order_id), 0) as max_id FROM order_infos",
127                )
128                .get_result::<MaxOrderId>(&mut fallback_conn)?;
129                Ok(row.max_id as u64)
130            }
131        }
132    }
133
134    fn get_max_trade_id_sync(&self) -> Result<u64> {
135        use diesel::sql_types::BigInt;
136
137        #[derive(diesel::QueryableByName)]
138        struct MaxTradeId {
139            #[diesel(sql_type = BigInt)]
140            max_id: i64,
141        }
142
143        let mut conn = self.pool().get()?;
144        let result = diesel::sql_query("SELECT COALESCE(MAX(trade_id), 0) as max_id FROM trades")
145            .get_result::<MaxTradeId>(&mut conn)?;
146        Ok(result.max_id as u64)
147    }
148}
149
150impl hypercall_db::OrderWriter for DatabaseHandler {
151    fn persist_order_info_sync(&self, info: &hypercall_db::PersistOrderInfo) -> Result<()> {
152        use crate::schema;
153        use diesel::upsert::excluded;
154
155        let new_order_info = crate::models::NewOrderInfo {
156            order_id: info.order_id,
157            wallet_address: info.wallet_address,
158            symbol: info.symbol.clone(),
159            side: format!("{:?}", info.side),
160            price: info.price,
161            size: info.size,
162            tif: format!("{:?}", info.tif),
163            client_id: info.client_id.clone(),
164            is_perp: info.is_perp,
165            underlying: info.underlying.clone(),
166            reduce_only: info.reduce_only,
167            nonce: info.nonce,
168            signature: info.signature.clone(),
169            mmp_enabled: info.mmp_enabled,
170            timestamp: info.timestamp,
171            status: "ACKED".to_string(),
172            filled_size: Decimal::ZERO,
173            last_materialized_order_update_id: 0,
174        };
175
176        let mut conn = self.pool().get()?;
177        diesel::insert_into(schema::order_infos::table)
178            .values(&new_order_info)
179            .on_conflict(schema::order_infos::order_id)
180            .do_update()
181            .set((
182                schema::order_infos::wallet_address
183                    .eq(excluded(schema::order_infos::wallet_address)),
184                schema::order_infos::symbol.eq(excluded(schema::order_infos::symbol)),
185                schema::order_infos::side.eq(excluded(schema::order_infos::side)),
186                schema::order_infos::price.eq(excluded(schema::order_infos::price)),
187                schema::order_infos::size.eq(excluded(schema::order_infos::size)),
188                schema::order_infos::tif.eq(excluded(schema::order_infos::tif)),
189                schema::order_infos::client_id.eq(excluded(schema::order_infos::client_id)),
190                schema::order_infos::is_perp.eq(excluded(schema::order_infos::is_perp)),
191                schema::order_infos::underlying.eq(excluded(schema::order_infos::underlying)),
192                schema::order_infos::reduce_only.eq(excluded(schema::order_infos::reduce_only)),
193                schema::order_infos::nonce.eq(excluded(schema::order_infos::nonce)),
194                schema::order_infos::signature.eq(excluded(schema::order_infos::signature)),
195                schema::order_infos::mmp_enabled.eq(excluded(schema::order_infos::mmp_enabled)),
196                schema::order_infos::timestamp.eq(excluded(schema::order_infos::timestamp)),
197            ))
198            .execute(&mut conn)?;
199        Ok(())
200    }
201
202    fn persist_order_action_sync(&self, action: &hypercall_db::PersistOrderAction) -> Result<()> {
203        use crate::schema;
204
205        let new_action = crate::models::NewOrderAction {
206            timestamp: action.timestamp,
207            wallet: action.wallet,
208            action: action.action.clone(),
209            symbol: action.symbol.clone(),
210            price: action.price,
211            size: action.size,
212            side: format!("{:?}", action.side),
213            tif: format!("{:?}", action.tif),
214            client_id: action.client_id.clone(),
215        };
216
217        let mut conn = self.pool().get()?;
218        diesel::insert_into(schema::order_actions::table)
219            .values(&new_action)
220            .execute(&mut conn)?;
221        Ok(())
222    }
223
224    fn persist_order_update_sync(
225        &self,
226        update: &hypercall_types::OrderUpdateMessage,
227    ) -> Result<()> {
228        use crate::models::{NewOrderInfo, NewOrderUpdate, NewRejectedOrder};
229        use crate::order_status_materialization::upsert_materialized_order_state;
230        use crate::schema;
231
232        let current_status = DatabaseHandler::order_update_status_to_db(update.status).to_string();
233
234        let new_order_update = NewOrderUpdate {
235            timestamp: update.timestamp as i64,
236            order_id: update.order_id.map(|id| id as i64),
237            status: current_status.clone(),
238            reason: update.reason.clone(),
239            filled_size: update.filled_size,
240            symbol: update.info.symbol.clone(),
241            price: update.info.price,
242            size: update.info.size,
243            side: format!("{:?}", update.info.side),
244            tif: format!("{:?}", update.info.tif),
245            client_id: update.info.client_id.clone(),
246        };
247
248        let mut conn = self.pool().get()?;
249
250        let order_update_id = diesel::insert_into(schema::order_updates::table)
251            .values(&new_order_update)
252            .returning(schema::order_updates::id)
253            .get_result::<Option<i32>>(&mut conn)?
254            .expect("order_updates.id should be populated after insert");
255
256        if let Some(order_id) = update.order_id {
257            let materialized_order_info = NewOrderInfo {
258                order_id: order_id as i64,
259                wallet_address: update.wallet_address,
260                symbol: update.info.symbol.clone(),
261                side: format!("{:?}", update.info.side),
262                price: update.info.price,
263                size: update.info.size,
264                tif: format!("{:?}", update.info.tif),
265                client_id: update.info.client_id.clone(),
266                is_perp: update.info.is_perp,
267                underlying: update.info.underlying.clone(),
268                reduce_only: update.info.reduce_only,
269                nonce: update.info.nonce.map(|n| n as i64),
270                signature: update.info.signature.clone(),
271                mmp_enabled: update.info.mmp_enabled,
272                timestamp: update.timestamp as i64,
273                status: current_status,
274                filled_size: update.filled_size,
275                last_materialized_order_update_id: order_update_id,
276            };
277
278            upsert_materialized_order_state(&mut conn, &materialized_order_info)?;
279        }
280
281        // Handle rejected orders
282        if let hypercall_types::OrderUpdateStatus::Rejected = update.status {
283            let new_rejected = NewRejectedOrder {
284                wallet_address: update.wallet_address,
285                symbol: update.info.symbol.clone(),
286                side: format!("{:?}", update.info.side),
287                price: update.info.price,
288                size: update.info.size,
289                reason: update
290                    .reason
291                    .clone()
292                    .unwrap_or_else(|| "Unknown".to_string()),
293                timestamp: update.timestamp as i64,
294            };
295
296            diesel::insert_into(schema::rejected_orders::table)
297                .values(&new_rejected)
298                .execute(&mut conn)?;
299        }
300
301        Ok(())
302    }
303
304    fn persist_fill_with_side_effects_sync(
305        &self,
306        fill: &hypercall_types::Fill,
307        side_effects: &hypercall_db::FillSideEffects,
308    ) -> Result<(bool, bool)> {
309        use diesel::sql_types::{BigInt, Binary, Bool, Numeric, Text};
310        use rust_decimal_macros::dec;
311
312        tracing::debug!(
313            "persist_fill_with_side_effects_sync: trade_id={}, symbol={}, price={}, size={}, taker={}, maker={}",
314            fill.trade_id, fill.symbol, fill.price, fill.size, fill.taker_wallet_address, fill.maker_wallet_address
315        );
316
317        assert_eq!(
318            side_effects.trade_id, fill.trade_id,
319            "CRITICAL: journal fill side effects trade_id mismatch for fill {}",
320            fill.trade_id
321        );
322        let underlying_notional =
323            validate_fill_underlying_notional(fill, side_effects.underlying_notional)?;
324
325        let taker_realized_pnl = fill
326            .taker_realized_pnl
327            .unwrap_or(side_effects.taker_ledger_delta);
328        let maker_realized_pnl = fill
329            .maker_realized_pnl
330            .unwrap_or(side_effects.maker_ledger_delta);
331
332        let mut conn = self.pool().get()?;
333
334        conn.transaction(|conn| {
335            diesel::sql_query(
336                "INSERT INTO trades (trade_id, symbol, price, size, maker_address, taker_address, maker_fee, taker_fee, timestamp)
337                 VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
338                 ON CONFLICT (trade_id) DO NOTHING",
339            )
340            .bind::<BigInt, _>(fill.trade_id as i64)
341            .bind::<Text, _>(&fill.symbol)
342            .bind::<Numeric, _>(fill.price)
343            .bind::<Numeric, _>(fill.size)
344            .bind::<Binary, _>(&fill.maker_wallet_address)
345            .bind::<Binary, _>(&fill.taker_wallet_address)
346            .bind::<Numeric, _>(dec!(0))
347            .bind::<Numeric, _>(fill.fee)
348            .bind::<BigInt, _>(fill.timestamp as i64)
349            .execute(conn)?;
350
351            // Taker fill
352            let taker_rows: usize = diesel::sql_query(
353                "INSERT INTO fills (trade_id, wallet_address, symbol, price, size, fee, is_taker, timestamp, builder_code_address, builder_code_fee, realized_pnl, underlying_notional)
354                 VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12)
355                 ON CONFLICT (trade_id, wallet_address, is_taker) DO NOTHING",
356            )
357            .bind::<BigInt, _>(fill.trade_id as i64)
358            .bind::<Binary, _>(&fill.taker_wallet_address)
359            .bind::<Text, _>(&fill.symbol)
360            .bind::<Numeric, _>(fill.price)
361            .bind::<Numeric, _>(fill.size)
362            .bind::<Numeric, _>(fill.fee)
363            .bind::<Bool, _>(true)
364            .bind::<BigInt, _>(fill.timestamp as i64)
365            .bind::<diesel::sql_types::Nullable<Binary>, _>(fill.builder_code_address.as_ref())
366            .bind::<diesel::sql_types::Nullable<Numeric>, _>(fill.builder_code_fee)
367            .bind::<diesel::sql_types::Nullable<Numeric>, _>(Some(taker_realized_pnl))
368            .bind::<diesel::sql_types::Nullable<Numeric>, _>(underlying_notional)
369            .execute(conn)?;
370            let taker_inserted = taker_rows > 0;
371
372            if taker_inserted {
373                DatabaseHandler::apply_fill_ledger_side_effects(
374                    conn,
375                    &fill.taker_wallet_address,
376                    fill,
377                    side_effects.taker_ledger_delta,
378                    side_effects.taker_premium_delta,
379                )?;
380            }
381
382            // Maker fill
383            let maker_rows: usize = diesel::sql_query(
384                "INSERT INTO fills (trade_id, wallet_address, symbol, price, size, fee, is_taker, timestamp, builder_code_address, builder_code_fee, realized_pnl, underlying_notional)
385                 VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12)
386                 ON CONFLICT (trade_id, wallet_address, is_taker) DO NOTHING",
387            )
388            .bind::<BigInt, _>(fill.trade_id as i64)
389            .bind::<Binary, _>(&fill.maker_wallet_address)
390            .bind::<Text, _>(&fill.symbol)
391            .bind::<Numeric, _>(fill.price)
392            .bind::<Numeric, _>(fill.size)
393            .bind::<Numeric, _>(dec!(0))
394            .bind::<Bool, _>(false)
395            .bind::<BigInt, _>(fill.timestamp as i64)
396            .bind::<diesel::sql_types::Nullable<Binary>, _>(Option::<WalletAddress>::None)
397            .bind::<diesel::sql_types::Nullable<Numeric>, _>(Option::<Decimal>::None)
398            .bind::<diesel::sql_types::Nullable<Numeric>, _>(Some(maker_realized_pnl))
399            .bind::<diesel::sql_types::Nullable<Numeric>, _>(underlying_notional)
400            .execute(conn)?;
401            let maker_inserted = maker_rows > 0;
402
403            if maker_inserted {
404                DatabaseHandler::apply_fill_ledger_side_effects(
405                    conn,
406                    &fill.maker_wallet_address,
407                    fill,
408                    side_effects.maker_ledger_delta,
409                    side_effects.maker_premium_delta,
410                )?;
411            }
412
413            Ok((taker_inserted, maker_inserted))
414        })
415    }
416
417    fn batch_cancel_orders_for_settlement_sync(
418        &self,
419        order_ids: &[i64],
420        timestamp_ms: i64,
421    ) -> Result<usize> {
422        use diesel::sql_types::{Array, BigInt};
423
424        if order_ids.is_empty() {
425            return Ok(0);
426        }
427
428        let mut conn = self.pool().get()?;
429        let updated = diesel::sql_query(
430            "UPDATE order_infos \
431             SET status = 'CANCELED', updated_at = now() \
432             WHERE order_id = ANY($1) \
433             AND status NOT IN ('CANCELED', 'FILLED', 'REJECTED')",
434        )
435        .bind::<Array<BigInt>, _>(order_ids)
436        .execute(&mut conn)?;
437
438        if updated != order_ids.len() {
439            tracing::warn!(
440                expected = order_ids.len(),
441                actual = updated,
442                timestamp_ms,
443                "Settlement cancel: order_infos update count mismatch \
444                 (some orders may already be terminal or missing from order_infos)"
445            );
446        }
447
448        Ok(updated)
449    }
450
451    fn cancel_orphaned_orders_by_symbols_sync(&self, symbols: &[String]) -> Result<usize> {
452        if symbols.is_empty() {
453            return Ok(0);
454        }
455
456        let mut conn = self.pool().get()?;
457        let mut updated = 0;
458        for chunk in symbols.chunks(32) {
459            updated += diesel::sql_query(
460                "UPDATE order_infos \
461                 SET status = 'CANCELED', updated_at = now() \
462                 WHERE symbol = ANY($1) \
463                 AND status IN ('ACKED', 'OPEN', 'PARTIALLY_FILLED')",
464            )
465            .bind::<diesel::sql_types::Array<diesel::sql_types::Text>, _>(chunk)
466            .execute(&mut conn)?;
467        }
468
469        Ok(updated)
470    }
471}
472
473fn validate_fill_underlying_notional(
474    fill: &hypercall_types::Fill,
475    side_effect_underlying_notional: Option<Decimal>,
476) -> Result<Option<Decimal>> {
477    if !is_option_symbol(&fill.symbol) {
478        return Ok(fill.underlying_notional.or(side_effect_underlying_notional));
479    }
480
481    let fill_notional = fill.underlying_notional.ok_or_else(|| {
482        anyhow::anyhow!(
483            "CRITICAL: option fill {} for trade {} is missing underlying_notional",
484            fill.symbol,
485            fill.trade_id
486        )
487    })?;
488    let side_effect_notional = side_effect_underlying_notional.ok_or_else(|| {
489        anyhow::anyhow!(
490            "CRITICAL: option fill side effects for trade {} are missing underlying_notional",
491            fill.trade_id
492        )
493    })?;
494    anyhow::ensure!(
495        fill_notional == side_effect_notional,
496        "CRITICAL: option fill underlying_notional mismatch for trade {}: fill={} side_effect={}",
497        fill.trade_id,
498        fill_notional,
499        side_effect_notional
500    );
501    Ok(Some(fill_notional))
502}
503
504// Fill ledger side-effects helper (private)
505impl DatabaseHandler {
506    pub(crate) fn apply_fill_ledger_side_effects(
507        conn: &mut PgConnection,
508        wallet: &WalletAddress,
509        fill: &hypercall_types::Fill,
510        realized_delta: Decimal,
511        premium_delta: Decimal,
512    ) -> Result<()> {
513        use diesel::sql_types::{BigInt, Binary, Text};
514
515        let _total_delta = realized_delta + premium_delta;
516
517        if realized_delta != Decimal::ZERO {
518            let inserted = diesel::sql_query(
519                "INSERT INTO ledger_events
520                 (wallet, event_ts_ms, delta, event_type, reference_trade_id, reference_symbol)
521                 VALUES ($1, $2, $3, $4, $5, $6)
522                 ON CONFLICT (wallet, reference_trade_id, event_type)
523                 WHERE reference_trade_id IS NOT NULL
524                 DO NOTHING",
525            )
526            .bind::<Binary, _>(wallet)
527            .bind::<BigInt, _>(fill.timestamp as i64)
528            .bind::<diesel::sql_types::Numeric, _>(realized_delta)
529            .bind::<Text, _>("fill_realized_pnl")
530            .bind::<BigInt, _>(fill.trade_id as i64)
531            .bind::<diesel::sql_types::Nullable<Text>, _>(Some(fill.symbol.as_str()))
532            .execute(conn)?;
533            assert_eq!(
534                inserted, 1,
535                "CRITICAL: missing fill_realized_pnl ledger event for trade {} wallet {}",
536                fill.trade_id, wallet
537            );
538        }
539
540        if premium_delta != Decimal::ZERO {
541            let inserted = diesel::sql_query(
542                "INSERT INTO ledger_events
543                 (wallet, event_ts_ms, delta, event_type, reference_trade_id, reference_symbol)
544                 VALUES ($1, $2, $3, $4, $5, $6)
545                 ON CONFLICT (wallet, reference_trade_id, event_type)
546                 WHERE reference_trade_id IS NOT NULL
547                 DO NOTHING",
548            )
549            .bind::<Binary, _>(wallet)
550            .bind::<BigInt, _>(fill.timestamp as i64)
551            .bind::<diesel::sql_types::Numeric, _>(premium_delta)
552            .bind::<Text, _>("fill_premium")
553            .bind::<BigInt, _>(fill.trade_id as i64)
554            .bind::<diesel::sql_types::Nullable<Text>, _>(Some(fill.symbol.as_str()))
555            .execute(conn)?;
556            assert_eq!(
557                inserted, 1,
558                "CRITICAL: missing fill_premium ledger event for trade {} wallet {}",
559                fill.trade_id, wallet
560            );
561        }
562
563        Ok(())
564    }
565}
566
567#[cfg(test)]
568mod tests {
569    use crate::test_helpers::TestDb;
570    use hypercall_db::*;
571    use hypercall_types::wallet_address::test_wallet;
572    use rust_decimal_macros::dec;
573
574    #[tokio::test]
575    async fn order_info_write_read_roundtrip() {
576        let test_db = TestDb::new().await.unwrap();
577        let db = test_db.handler.as_ref();
578        let wallet = test_wallet(10);
579
580        let info = PersistOrderInfo {
581            order_id: 42,
582            wallet_address: wallet,
583            symbol: "ETH-20260131-4000-C".to_string(),
584            side: hypercall_types::Side::Buy,
585            price: dec!(500),
586            size: dec!(1000000),
587            tif: hypercall_types::TimeInForce::GTC,
588            client_id: Some("test-client-1".to_string()),
589            is_perp: false,
590            underlying: Some("ETH".to_string()),
591            reduce_only: None,
592            nonce: Some(1),
593            signature: Some("0xabc".to_string()),
594            mmp_enabled: false,
595            timestamp: 1700000000000,
596        };
597
598        db.persist_order_info_sync(&info).unwrap();
599        let orders = db.get_order_infos_sync(Some(&wallet)).unwrap();
600        assert_eq!(orders.len(), 1);
601        assert_eq!(orders[0].order_id, 42);
602        assert_eq!(orders[0].symbol, "ETH-20260131-4000-C");
603        assert_eq!(orders[0].client_id, Some("test-client-1".to_string()));
604    }
605
606    #[tokio::test]
607    async fn order_info_upsert_idempotent() {
608        let test_db = TestDb::new().await.unwrap();
609        let db = test_db.handler.as_ref();
610        let wallet = test_wallet(11);
611
612        let info = PersistOrderInfo {
613            order_id: 99,
614            wallet_address: wallet,
615            symbol: "BTC-20260131-100000-C".to_string(),
616            side: hypercall_types::Side::Sell,
617            price: dec!(5000),
618            size: dec!(500000),
619            tif: hypercall_types::TimeInForce::IOC,
620            client_id: None,
621            is_perp: false,
622            underlying: Some("BTC".to_string()),
623            reduce_only: None,
624            nonce: None,
625            signature: None,
626            mmp_enabled: false,
627            timestamp: 1700000000000,
628        };
629
630        db.persist_order_info_sync(&info).unwrap();
631        db.persist_order_info_sync(&info).unwrap(); // second call should not fail
632        let orders = db.get_order_infos_sync(Some(&wallet)).unwrap();
633        assert_eq!(orders.len(), 1); // still one order
634    }
635
636    #[tokio::test]
637    async fn order_action_write() {
638        let test_db = TestDb::new().await.unwrap();
639        let db = test_db.handler.as_ref();
640        let wallet = test_wallet(12);
641
642        let action = PersistOrderAction {
643            timestamp: 1700000000000,
644            wallet,
645            action: "CreateOrder".to_string(),
646            symbol: "ETH-20260131-4000-C".to_string(),
647            price: dec!(500),
648            size: dec!(1000000),
649            side: hypercall_types::Side::Buy,
650            tif: hypercall_types::TimeInForce::GTC,
651            client_id: Some("test-action".to_string()),
652        };
653
654        // Should not error -- no read-back for actions (audit trail only)
655        db.persist_order_action_sync(&action).unwrap();
656    }
657
658    #[tokio::test]
659    async fn max_order_id_increases_with_inserts() {
660        let test_db = TestDb::new().await.unwrap();
661        let db = test_db.handler.as_ref();
662        let wallet = test_wallet(13);
663
664        let initial = db.get_max_order_id_sync().unwrap();
665
666        let info = PersistOrderInfo {
667            order_id: (initial + 100) as i64,
668            wallet_address: wallet,
669            symbol: "ETH-20260131-4000-C".to_string(),
670            side: hypercall_types::Side::Buy,
671            price: dec!(500),
672            size: dec!(1000000),
673            tif: hypercall_types::TimeInForce::GTC,
674            client_id: None,
675            is_perp: false,
676            underlying: None,
677            reduce_only: None,
678            nonce: None,
679            signature: None,
680            mmp_enabled: false,
681            timestamp: 1700000000000,
682        };
683        db.persist_order_info_sync(&info).unwrap();
684
685        let after = db.get_max_order_id_sync().unwrap();
686        assert!(after >= initial + 100);
687    }
688
689    #[tokio::test]
690    async fn client_id_lookup_roundtrip() {
691        let test_db = TestDb::new().await.unwrap();
692        let db = test_db.handler.as_ref();
693        let wallet = test_wallet(14);
694
695        for i in 1..=3i64 {
696            let info = PersistOrderInfo {
697                order_id: 1000 + i,
698                wallet_address: wallet,
699                symbol: "ETH-20260131-4000-C".to_string(),
700                side: hypercall_types::Side::Buy,
701                price: dec!(500),
702                size: dec!(1000000),
703                tif: hypercall_types::TimeInForce::GTC,
704                client_id: Some(format!("client-{}", i)),
705                is_perp: false,
706                underlying: None,
707                reduce_only: None,
708                nonce: None,
709                signature: None,
710                mmp_enabled: false,
711                timestamp: 1700000000000,
712            };
713            db.persist_order_info_sync(&info).unwrap();
714        }
715
716        let ids = db
717            .get_client_ids_by_order_ids_sync(&[1001, 1002, 1003, 9999])
718            .unwrap();
719        assert_eq!(ids.len(), 3); // 9999 not found
720        assert_eq!(ids[&1001], Some("client-1".to_string()));
721        assert_eq!(ids[&1002], Some("client-2".to_string()));
722    }
723
724    #[tokio::test]
725    async fn order_get_terminal_ids_empty() {
726        let test_db = TestDb::new().await.unwrap();
727        let db = test_db.handler.as_ref();
728        let terminal = db.get_terminal_order_ids_sync(&[1, 2, 3]).unwrap();
729        assert!(terminal.is_empty());
730    }
731
732    #[tokio::test]
733    async fn order_max_trade_id_empty() {
734        let test_db = TestDb::new().await.unwrap();
735        let db = test_db.handler.as_ref();
736        let max = db.get_max_trade_id_sync().unwrap();
737        assert_eq!(max, 0);
738    }
739}