Skip to main content

hypercall_db_diesel/
order_status_materialization.rs

1//! Materialized order state management.
2//!
3//! Keeps `order_infos.status` and `order_infos.filled_size` in sync with
4//! the latest `order_updates` row. Terminal statuses (CANCELED, FILLED,
5//! REJECTED) are never overwritten by earlier updates.
6
7use diesel::pg::PgConnection;
8use diesel::prelude::*;
9use rust_decimal::Decimal;
10
11use crate::models::NewOrderInfo;
12use crate::schema;
13
14const TERMINAL_ORDER_STATUSES: [&str; 3] = ["CANCELED", "FILLED", "REJECTED"];
15
16fn is_terminal_order_status(status: &str) -> bool {
17    TERMINAL_ORDER_STATUSES.contains(&status)
18}
19
20/// Upsert the materialized order_infos row with the latest status and
21/// filled_size. Skips the update if the existing row is already terminal
22/// or has a newer `last_materialized_order_update_id`.
23pub fn upsert_materialized_order_state(
24    conn: &mut PgConnection,
25    materialized_order_info: &NewOrderInfo,
26) -> QueryResult<usize> {
27    let inserted_rows = diesel::insert_into(schema::order_infos::table)
28        .values(materialized_order_info)
29        .on_conflict(schema::order_infos::order_id)
30        .do_nothing()
31        .execute(conn)?;
32
33    if inserted_rows > 0 {
34        return Ok(inserted_rows);
35    }
36
37    update_existing_materialized_order_state_if_newer(
38        conn,
39        materialized_order_info.order_id,
40        &materialized_order_info.status,
41        materialized_order_info.filled_size,
42        materialized_order_info.last_materialized_order_update_id,
43    )
44}
45
46pub fn update_existing_materialized_order_state_if_newer(
47    conn: &mut PgConnection,
48    order_id: i64,
49    status: &str,
50    filled_size: Decimal,
51    order_update_id: i32,
52) -> QueryResult<usize> {
53    if is_terminal_order_status(status) {
54        return diesel::update(
55            schema::order_infos::table
56                .filter(schema::order_infos::order_id.eq(order_id))
57                .filter(schema::order_infos::last_materialized_order_update_id.lt(order_update_id)),
58        )
59        .set((
60            schema::order_infos::status.eq(status),
61            schema::order_infos::filled_size.eq(filled_size),
62            schema::order_infos::updated_at.eq(diesel::dsl::now),
63            schema::order_infos::last_materialized_order_update_id.eq(order_update_id),
64        ))
65        .execute(conn);
66    }
67
68    diesel::update(
69        schema::order_infos::table
70            .filter(schema::order_infos::order_id.eq(order_id))
71            .filter(schema::order_infos::last_materialized_order_update_id.lt(order_update_id))
72            .filter(schema::order_infos::status.ne_all(TERMINAL_ORDER_STATUSES)),
73    )
74    .set((
75        schema::order_infos::status.eq(status),
76        schema::order_infos::filled_size.eq(filled_size),
77        schema::order_infos::updated_at.eq(diesel::dsl::now),
78        schema::order_infos::last_materialized_order_update_id.eq(order_update_id),
79    ))
80    .execute(conn)
81}
82
83#[cfg(test)]
84mod tests {
85    use super::*;
86    use crate::test_helpers::TestDb;
87    use hypercall_types::wallet_address::test_wallet;
88    use rust_decimal_macros::dec;
89
90    fn make_order_info(
91        order_id: i64,
92        status: &str,
93        filled_size: Decimal,
94        update_id: i32,
95    ) -> NewOrderInfo {
96        NewOrderInfo {
97            order_id,
98            wallet_address: test_wallet(1),
99            symbol: "BTC-20260115-100000-C".to_string(),
100            side: "Buy".to_string(),
101            price: dec!(5.0),
102            size: dec!(1000000),
103            tif: "GTC".to_string(),
104            client_id: None,
105            is_perp: false,
106            underlying: Some("BTC".to_string()),
107            reduce_only: None,
108            nonce: None,
109            signature: None,
110            mmp_enabled: false,
111            timestamp: 1700000000000,
112            status: status.to_string(),
113            filled_size,
114            last_materialized_order_update_id: update_id,
115        }
116    }
117
118    #[tokio::test]
119    async fn upsert_inserts_new_order() {
120        let test_db = TestDb::new().await.unwrap();
121        let mut conn = test_db.handler.pool().get().unwrap();
122        let info = make_order_info(1, "OPEN", dec!(0), 1);
123        let result = upsert_materialized_order_state(&mut conn, &info).unwrap();
124        assert_eq!(result, 1);
125    }
126
127    #[tokio::test]
128    async fn upsert_duplicate_does_not_overwrite() {
129        let test_db = TestDb::new().await.unwrap();
130        let mut conn = test_db.handler.pool().get().unwrap();
131        let info1 = make_order_info(2, "OPEN", dec!(0), 1);
132        upsert_materialized_order_state(&mut conn, &info1).unwrap();
133
134        // Second insert with same order_id should not overwrite (DO NOTHING)
135        let info2 = make_order_info(2, "FILLED", dec!(1000000), 2);
136        let result = upsert_materialized_order_state(&mut conn, &info2).unwrap();
137        // Returns 1 because it falls through to UPDATE path (newer update_id)
138        assert_eq!(result, 1);
139    }
140
141    #[tokio::test]
142    async fn update_newer_status_applies() {
143        let test_db = TestDb::new().await.unwrap();
144        let mut conn = test_db.handler.pool().get().unwrap();
145
146        let info = make_order_info(3, "OPEN", dec!(0), 1);
147        upsert_materialized_order_state(&mut conn, &info).unwrap();
148
149        // Update with newer update_id should apply
150        let result = update_existing_materialized_order_state_if_newer(
151            &mut conn,
152            3,
153            "PARTIALLY_FILLED",
154            dec!(500000),
155            2,
156        )
157        .unwrap();
158        assert_eq!(result, 1);
159    }
160
161    #[tokio::test]
162    async fn update_stale_status_rejected() {
163        let test_db = TestDb::new().await.unwrap();
164        let mut conn = test_db.handler.pool().get().unwrap();
165
166        let info = make_order_info(4, "OPEN", dec!(0), 5);
167        upsert_materialized_order_state(&mut conn, &info).unwrap();
168
169        // Update with older update_id should be rejected
170        let result = update_existing_materialized_order_state_if_newer(
171            &mut conn,
172            4,
173            "PARTIALLY_FILLED",
174            dec!(500000),
175            3,
176        )
177        .unwrap();
178        assert_eq!(result, 0);
179    }
180
181    #[tokio::test]
182    async fn terminal_status_cannot_be_reopened() {
183        let test_db = TestDb::new().await.unwrap();
184        let mut conn = test_db.handler.pool().get().unwrap();
185
186        let info = make_order_info(5, "CANCELED", dec!(0), 1);
187        upsert_materialized_order_state(&mut conn, &info).unwrap();
188
189        // Non-terminal update with newer id should NOT reopen a terminal order
190        let result =
191            update_existing_materialized_order_state_if_newer(&mut conn, 5, "OPEN", dec!(0), 2)
192                .unwrap();
193        assert_eq!(result, 0);
194    }
195
196    #[tokio::test]
197    async fn terminal_update_overwrites_terminal() {
198        let test_db = TestDb::new().await.unwrap();
199        let mut conn = test_db.handler.pool().get().unwrap();
200
201        let info = make_order_info(6, "FILLED", dec!(1000000), 1);
202        upsert_materialized_order_state(&mut conn, &info).unwrap();
203
204        // Terminal -> terminal with newer id DOES apply (e.g. FILLED -> CANCELED correction)
205        let result =
206            update_existing_materialized_order_state_if_newer(&mut conn, 6, "CANCELED", dec!(0), 2)
207                .unwrap();
208        assert_eq!(result, 1);
209    }
210}