1use diesel::pg::PgConnection;
8use diesel::prelude::*;
9use rust_decimal::Decimal;
10
11use crate::models::NewOrderInfo;
12use crate::schema;
13
14const TERMINAL_ORDER_STATUSES: [&str; 3] = ["CANCELED", "FILLED", "REJECTED"];
15
16fn is_terminal_order_status(status: &str) -> bool {
17 TERMINAL_ORDER_STATUSES.contains(&status)
18}
19
20pub fn upsert_materialized_order_state(
24 conn: &mut PgConnection,
25 materialized_order_info: &NewOrderInfo,
26) -> QueryResult<usize> {
27 let inserted_rows = diesel::insert_into(schema::order_infos::table)
28 .values(materialized_order_info)
29 .on_conflict(schema::order_infos::order_id)
30 .do_nothing()
31 .execute(conn)?;
32
33 if inserted_rows > 0 {
34 return Ok(inserted_rows);
35 }
36
37 update_existing_materialized_order_state_if_newer(
38 conn,
39 materialized_order_info.order_id,
40 &materialized_order_info.status,
41 materialized_order_info.filled_size,
42 materialized_order_info.last_materialized_order_update_id,
43 )
44}
45
46pub fn update_existing_materialized_order_state_if_newer(
47 conn: &mut PgConnection,
48 order_id: i64,
49 status: &str,
50 filled_size: Decimal,
51 order_update_id: i32,
52) -> QueryResult<usize> {
53 if is_terminal_order_status(status) {
54 return diesel::update(
55 schema::order_infos::table
56 .filter(schema::order_infos::order_id.eq(order_id))
57 .filter(schema::order_infos::last_materialized_order_update_id.lt(order_update_id)),
58 )
59 .set((
60 schema::order_infos::status.eq(status),
61 schema::order_infos::filled_size.eq(filled_size),
62 schema::order_infos::updated_at.eq(diesel::dsl::now),
63 schema::order_infos::last_materialized_order_update_id.eq(order_update_id),
64 ))
65 .execute(conn);
66 }
67
68 diesel::update(
69 schema::order_infos::table
70 .filter(schema::order_infos::order_id.eq(order_id))
71 .filter(schema::order_infos::last_materialized_order_update_id.lt(order_update_id))
72 .filter(schema::order_infos::status.ne_all(TERMINAL_ORDER_STATUSES)),
73 )
74 .set((
75 schema::order_infos::status.eq(status),
76 schema::order_infos::filled_size.eq(filled_size),
77 schema::order_infos::updated_at.eq(diesel::dsl::now),
78 schema::order_infos::last_materialized_order_update_id.eq(order_update_id),
79 ))
80 .execute(conn)
81}
82
83#[cfg(test)]
84mod tests {
85 use super::*;
86 use crate::test_helpers::TestDb;
87 use hypercall_types::wallet_address::test_wallet;
88 use rust_decimal_macros::dec;
89
90 fn make_order_info(
91 order_id: i64,
92 status: &str,
93 filled_size: Decimal,
94 update_id: i32,
95 ) -> NewOrderInfo {
96 NewOrderInfo {
97 order_id,
98 wallet_address: test_wallet(1),
99 symbol: "BTC-20260115-100000-C".to_string(),
100 side: "Buy".to_string(),
101 price: dec!(5.0),
102 size: dec!(1000000),
103 tif: "GTC".to_string(),
104 client_id: None,
105 is_perp: false,
106 underlying: Some("BTC".to_string()),
107 reduce_only: None,
108 nonce: None,
109 signature: None,
110 mmp_enabled: false,
111 timestamp: 1700000000000,
112 status: status.to_string(),
113 filled_size,
114 last_materialized_order_update_id: update_id,
115 }
116 }
117
118 #[tokio::test]
119 async fn upsert_inserts_new_order() {
120 let test_db = TestDb::new().await.unwrap();
121 let mut conn = test_db.handler.pool().get().unwrap();
122 let info = make_order_info(1, "OPEN", dec!(0), 1);
123 let result = upsert_materialized_order_state(&mut conn, &info).unwrap();
124 assert_eq!(result, 1);
125 }
126
127 #[tokio::test]
128 async fn upsert_duplicate_does_not_overwrite() {
129 let test_db = TestDb::new().await.unwrap();
130 let mut conn = test_db.handler.pool().get().unwrap();
131 let info1 = make_order_info(2, "OPEN", dec!(0), 1);
132 upsert_materialized_order_state(&mut conn, &info1).unwrap();
133
134 let info2 = make_order_info(2, "FILLED", dec!(1000000), 2);
136 let result = upsert_materialized_order_state(&mut conn, &info2).unwrap();
137 assert_eq!(result, 1);
139 }
140
141 #[tokio::test]
142 async fn update_newer_status_applies() {
143 let test_db = TestDb::new().await.unwrap();
144 let mut conn = test_db.handler.pool().get().unwrap();
145
146 let info = make_order_info(3, "OPEN", dec!(0), 1);
147 upsert_materialized_order_state(&mut conn, &info).unwrap();
148
149 let result = update_existing_materialized_order_state_if_newer(
151 &mut conn,
152 3,
153 "PARTIALLY_FILLED",
154 dec!(500000),
155 2,
156 )
157 .unwrap();
158 assert_eq!(result, 1);
159 }
160
161 #[tokio::test]
162 async fn update_stale_status_rejected() {
163 let test_db = TestDb::new().await.unwrap();
164 let mut conn = test_db.handler.pool().get().unwrap();
165
166 let info = make_order_info(4, "OPEN", dec!(0), 5);
167 upsert_materialized_order_state(&mut conn, &info).unwrap();
168
169 let result = update_existing_materialized_order_state_if_newer(
171 &mut conn,
172 4,
173 "PARTIALLY_FILLED",
174 dec!(500000),
175 3,
176 )
177 .unwrap();
178 assert_eq!(result, 0);
179 }
180
181 #[tokio::test]
182 async fn terminal_status_cannot_be_reopened() {
183 let test_db = TestDb::new().await.unwrap();
184 let mut conn = test_db.handler.pool().get().unwrap();
185
186 let info = make_order_info(5, "CANCELED", dec!(0), 1);
187 upsert_materialized_order_state(&mut conn, &info).unwrap();
188
189 let result =
191 update_existing_materialized_order_state_if_newer(&mut conn, 5, "OPEN", dec!(0), 2)
192 .unwrap();
193 assert_eq!(result, 0);
194 }
195
196 #[tokio::test]
197 async fn terminal_update_overwrites_terminal() {
198 let test_db = TestDb::new().await.unwrap();
199 let mut conn = test_db.handler.pool().get().unwrap();
200
201 let info = make_order_info(6, "FILLED", dec!(1000000), 1);
202 upsert_materialized_order_state(&mut conn, &info).unwrap();
203
204 let result =
206 update_existing_materialized_order_state_if_newer(&mut conn, 6, "CANCELED", dec!(0), 2)
207 .unwrap();
208 assert_eq!(result, 1);
209 }
210}