1use anyhow::Result;
7use diesel::RunQueryDsl;
8
9use crate::database_handler::DatabaseHandler;
10
11#[derive(diesel::QueryableByName)]
15struct JournalCommandRow {
16 #[diesel(sql_type = diesel::sql_types::BigInt)]
17 command_id: i64,
18 #[diesel(sql_type = diesel::sql_types::Uuid)]
19 request_uuid: crate::engine_enums::DbUuid,
20 #[diesel(sql_type = crate::engine_enums::EngineCommandTypeSql)]
21 command_type: crate::engine_enums::CommandType,
22 #[diesel(sql_type = diesel::sql_types::Nullable<diesel::sql_types::Bytea>)]
23 command_data: Option<Vec<u8>>,
24 #[diesel(sql_type = diesel::sql_types::Nullable<diesel::sql_types::Bytea>)]
25 response_data: Option<Vec<u8>>,
26}
27
28#[derive(diesel::QueryableByName)]
29struct PortfolioEventRow {
30 #[diesel(sql_type = diesel::sql_types::BigInt)]
31 command_id: i64,
32 #[diesel(sql_type = crate::engine_enums::EngineEventTypeSql)]
33 event_type_enum: crate::engine_enums::EventType,
34 #[diesel(sql_type = diesel::sql_types::Binary)]
35 event_data: Vec<u8>,
36}
37
38impl hypercall_db::JournalReplayReader for DatabaseHandler {
39 fn get_next_engine_command_id_sync(&self) -> Result<i64> {
40 use diesel::sql_types::BigInt;
41
42 #[derive(diesel::QueryableByName)]
43 struct Row {
44 #[diesel(sql_type = BigInt)]
45 next_command_id: i64,
46 }
47
48 let mut conn = self.pool().get()?;
49 let row = diesel::sql_query(
50 "SELECT COALESCE(MAX(command_id), 0) + 1 AS next_command_id FROM engine_commands",
51 )
52 .get_result::<Row>(&mut conn)?;
53 Ok(row.next_command_id)
54 }
55
56 fn get_journal_command_id_bounds_sync(
57 &self,
58 ) -> Result<Option<hypercall_db::JournalCommandIdBounds>> {
59 #[derive(diesel::QueryableByName)]
60 struct Row {
61 #[diesel(sql_type = diesel::sql_types::Nullable<diesel::sql_types::BigInt>)]
62 min_command_id: Option<i64>,
63 #[diesel(sql_type = diesel::sql_types::Nullable<diesel::sql_types::BigInt>)]
64 max_command_id: Option<i64>,
65 }
66
67 let mut conn = self.pool().get()?;
68 let row = diesel::sql_query(
69 "SELECT MIN(command_id) AS min_command_id,
70 MAX(command_id) AS max_command_id
71 FROM engine_commands
72 WHERE command_type_enum IS NOT NULL",
73 )
74 .get_result::<Row>(&mut conn)?;
75
76 Ok(match (row.min_command_id, row.max_command_id) {
77 (Some(min_command_id), Some(max_command_id)) => {
78 Some(hypercall_db::JournalCommandIdBounds {
79 min_command_id,
80 max_command_id,
81 })
82 }
83 _ => None,
84 })
85 }
86
87 fn count_non_replayable_commands_in_range_sync(
88 &self,
89 start_command_id: i64,
90 end_command_id: i64,
91 ) -> Result<i64> {
92 use diesel::sql_types::BigInt;
93
94 #[derive(diesel::QueryableByName)]
95 struct Row {
96 #[diesel(sql_type = diesel::sql_types::BigInt)]
97 command_count: i64,
98 }
99
100 if end_command_id <= start_command_id {
101 return Ok(0);
102 }
103
104 let mut conn = self.pool().get()?;
105 let row = diesel::sql_query(
106 "SELECT COUNT(*) AS command_count
107 FROM engine_commands
108 WHERE command_id > $1
109 AND command_id <= $2
110 AND command_type_enum IS NULL",
111 )
112 .bind::<BigInt, _>(start_command_id)
113 .bind::<BigInt, _>(end_command_id)
114 .get_result::<Row>(&mut conn)?;
115
116 Ok(row.command_count)
117 }
118
119 fn get_commands_with_l2_after_seq_sync(
120 &self,
121 l2_seq: i64,
122 ) -> Result<Vec<hypercall_db::ReplayCommand>> {
123 use diesel::sql_types::BigInt;
124
125 let mut conn = self.pool().get()?;
126 let results = diesel::sql_query(
127 "WITH snapshot_boundary AS (
128 SELECT COALESCE(MAX(ee.command_id), 0) AS last_cmd
129 FROM engine_events ee
130 WHERE ee.event_type_enum = 'L2Update'
131 AND ee.l2_sequence <= $1
132 )
133 SELECT ec.command_id, ec.request_uuid,
134 ec.command_type_enum AS command_type,
135 ec.command_data, ec.response_data
136 FROM engine_commands ec, snapshot_boundary sb
137 WHERE ec.command_id > sb.last_cmd
138 AND ec.command_type_enum IS NOT NULL
139 ORDER BY ec.command_id ASC",
140 )
141 .bind::<BigInt, _>(l2_seq)
142 .load::<JournalCommandRow>(&mut conn)?;
143
144 let commands: Vec<hypercall_db::ReplayCommand> = results
145 .into_iter()
146 .map(|row| hypercall_db::ReplayCommand {
147 command_id: row.command_id,
148 request_id: row.request_uuid.0.to_string(),
149 command_type: row.command_type.as_str().to_string(),
150 command_data: row
151 .command_data
152 .expect("command_data is NULL during replay cutover"),
153 response_data: row.response_data,
154 })
155 .collect();
156
157 if !commands.is_empty() {
158 tracing::info!(
159 "Found {} commands with L2 events after seq {} for replay",
160 commands.len(),
161 l2_seq
162 );
163 }
164
165 Ok(commands)
166 }
167
168 fn get_replay_commands_after_command_id_sync(
169 &self,
170 after_command_id: i64,
171 up_to_command_id: Option<i64>,
172 limit: i64,
173 ) -> Result<Vec<hypercall_db::ReplayCommand>> {
174 use diesel::sql_types::BigInt;
175
176 if limit <= 0 {
177 anyhow::bail!("replay command query limit must be positive, got {}", limit);
178 }
179
180 let mut conn = self.pool().get()?;
181 let results = if let Some(upper_bound) = up_to_command_id {
182 diesel::sql_query(
183 "SELECT ec.command_id, ec.request_uuid,
184 ec.command_type_enum AS command_type,
185 ec.command_data, ec.response_data
186 FROM engine_commands ec
187 WHERE ec.command_id > $1
188 AND ec.command_id <= $2
189 AND ec.command_type_enum IS NOT NULL
190 ORDER BY ec.command_id ASC
191 LIMIT $3",
192 )
193 .bind::<BigInt, _>(after_command_id)
194 .bind::<BigInt, _>(upper_bound)
195 .bind::<BigInt, _>(limit)
196 .load::<JournalCommandRow>(&mut conn)?
197 } else {
198 diesel::sql_query(
199 "SELECT ec.command_id, ec.request_uuid,
200 ec.command_type_enum AS command_type,
201 ec.command_data, ec.response_data
202 FROM engine_commands ec
203 WHERE ec.command_id > $1
204 AND ec.command_type_enum IS NOT NULL
205 ORDER BY ec.command_id ASC
206 LIMIT $2",
207 )
208 .bind::<BigInt, _>(after_command_id)
209 .bind::<BigInt, _>(limit)
210 .load::<JournalCommandRow>(&mut conn)?
211 };
212
213 Ok(results
214 .into_iter()
215 .map(|row| hypercall_db::ReplayCommand {
216 command_id: row.command_id,
217 request_id: row.request_uuid.0.to_string(),
218 command_type: row.command_type.as_str().to_string(),
219 command_data: row
220 .command_data
221 .expect("command_data is NULL during replay cutover"),
222 response_data: row.response_data,
223 })
224 .collect())
225 }
226
227 fn get_fill_events_for_command_range_sync(
228 &self,
229 start_command_id: i64,
230 end_command_id: i64,
231 ) -> Result<Vec<Vec<u8>>> {
232 use diesel::sql_types::BigInt;
233
234 #[derive(diesel::QueryableByName)]
235 struct Row {
236 #[diesel(sql_type = diesel::sql_types::Binary)]
237 event_data: Vec<u8>,
238 }
239
240 let mut conn = self.pool().get()?;
241 let results = diesel::sql_query(
242 "SELECT ee.event_data
243 FROM engine_events ee
244 WHERE ee.event_type_enum = 'OrderFilled'
245 AND ee.command_id > $1
246 AND ee.command_id <= $2
247 ORDER BY ee.event_id ASC",
248 )
249 .bind::<BigInt, _>(start_command_id)
250 .bind::<BigInt, _>(end_command_id)
251 .load::<Row>(&mut conn)?;
252
253 Ok(results.into_iter().map(|row| row.event_data).collect())
254 }
255
256 fn get_portfolio_events_for_command_range_sync(
257 &self,
258 start_command_id: i64,
259 end_command_id: i64,
260 ) -> Result<Vec<hypercall_db::PortfolioReplayEvent>> {
261 use diesel::sql_types::BigInt;
262
263 let mut conn = self.pool().get()?;
264 let results = diesel::sql_query(
265 "SELECT ee.command_id, ee.event_type_enum, ee.event_data
266 FROM engine_events ee
267 WHERE ee.command_id >= $1
268 AND ee.command_id <= $2
269 AND ee.event_type_enum IN ('OrderFilled', 'PositionExpired')
270 ORDER BY ee.command_id ASC, ee.event_idx ASC",
271 )
272 .bind::<BigInt, _>(start_command_id)
273 .bind::<BigInt, _>(end_command_id)
274 .load::<PortfolioEventRow>(&mut conn)?;
275
276 Ok(results
277 .into_iter()
278 .map(|row| hypercall_db::PortfolioReplayEvent {
279 command_id: row.command_id,
280 event_type: row.event_type_enum.into(),
281 event_data: row.event_data,
282 })
283 .collect())
284 }
285
286 fn get_order_update_events_for_command_range_sync(
287 &self,
288 start_command_id: i64,
289 end_command_id: i64,
290 ) -> Result<Vec<Vec<u8>>> {
291 use diesel::sql_types::BigInt;
292
293 #[derive(diesel::QueryableByName)]
294 struct Row {
295 #[diesel(sql_type = diesel::sql_types::Binary)]
296 event_data: Vec<u8>,
297 }
298
299 let mut conn = self.pool().get()?;
300 let results = diesel::sql_query(
301 "SELECT ee.event_data
302 FROM engine_events ee
303 WHERE ee.event_type_enum = 'OrderUpdate'
304 AND ee.command_id > $1
305 AND ee.command_id <= $2
306 ORDER BY ee.event_id ASC",
307 )
308 .bind::<BigInt, _>(start_command_id)
309 .bind::<BigInt, _>(end_command_id)
310 .load::<Row>(&mut conn)?;
311
312 Ok(results.into_iter().map(|row| row.event_data).collect())
313 }
314
315 fn get_max_l2_seq_from_events_sync(&self) -> Result<i64> {
316 #[derive(diesel::QueryableByName)]
317 struct Row {
318 #[diesel(sql_type = diesel::sql_types::Nullable<diesel::sql_types::BigInt>)]
319 max_seq: Option<i64>,
320 }
321
322 let mut conn = self.pool().get()?;
323 let result = diesel::sql_query(
324 "SELECT MAX(l2_sequence) as max_seq
325 FROM engine_events
326 WHERE event_type_enum = 'L2Update'
327 AND l2_sequence IS NOT NULL",
328 )
329 .get_result::<Row>(&mut conn)?;
330
331 Ok(result.max_seq.unwrap_or(0))
332 }
333
334 fn get_fill_events_after_seq_sync(&self, l2_seq: i64) -> Result<Vec<Vec<u8>>> {
335 use diesel::sql_types::BigInt;
336
337 #[derive(diesel::QueryableByName)]
338 struct Row {
339 #[diesel(sql_type = diesel::sql_types::Binary)]
340 event_data: Vec<u8>,
341 }
342
343 let mut conn = self.pool().get()?;
344 let results = diesel::sql_query(
345 "WITH snapshot_boundary AS (
346 SELECT COALESCE(MAX(ee.command_id), 0) AS last_cmd
347 FROM engine_events ee
348 WHERE ee.event_type_enum = 'L2Update'
349 AND ee.l2_sequence <= $1
350 )
351 SELECT ee.event_data
352 FROM engine_events ee, snapshot_boundary sb
353 WHERE ee.event_type_enum = 'OrderFilled'
354 AND ee.command_id > sb.last_cmd
355 ORDER BY ee.event_id ASC",
356 )
357 .bind::<BigInt, _>(l2_seq)
358 .load::<Row>(&mut conn)?;
359
360 let events: Vec<Vec<u8>> = results.into_iter().map(|row| row.event_data).collect();
361
362 if !events.is_empty() {
363 tracing::info!(
364 "Found {} OrderFilled events after L2 seq {} for replay",
365 events.len(),
366 l2_seq
367 );
368 }
369
370 Ok(events)
371 }
372
373 fn get_order_update_events_after_seq_sync(&self, l2_seq: i64) -> Result<Vec<Vec<u8>>> {
374 use diesel::sql_types::BigInt;
375
376 #[derive(diesel::QueryableByName)]
377 struct Row {
378 #[diesel(sql_type = diesel::sql_types::Binary)]
379 event_data: Vec<u8>,
380 }
381
382 let mut conn = self.pool().get()?;
383 let results = diesel::sql_query(
384 "WITH snapshot_boundary AS (
385 SELECT COALESCE(MAX(ee.command_id), 0) AS last_cmd
386 FROM engine_events ee
387 WHERE ee.event_type_enum = 'L2Update'
388 AND ee.l2_sequence <= $1
389 )
390 SELECT ee.event_data
391 FROM engine_events ee, snapshot_boundary sb
392 WHERE ee.event_type_enum = 'OrderUpdate'
393 AND ee.command_id > sb.last_cmd
394 ORDER BY ee.event_id ASC",
395 )
396 .bind::<BigInt, _>(l2_seq)
397 .load::<Row>(&mut conn)?;
398
399 let events: Vec<Vec<u8>> = results.into_iter().map(|row| row.event_data).collect();
400
401 if !events.is_empty() {
402 tracing::info!(
403 "Found {} OrderUpdate events after L2 seq {} for replay Pass 3",
404 events.len(),
405 l2_seq
406 );
407 }
408
409 Ok(events)
410 }
411}
412
#[cfg(test)]
mod tests {
    use crate::test_helpers::TestDb;
    use hypercall_db::*;

    /// The next command id is 1 on a freshly created test database.
    #[tokio::test]
    async fn replay_next_command_id_empty_table() {
        let fixture = TestDb::new().await.unwrap();
        let reader = fixture.handler.as_ref();
        assert_eq!(reader.get_next_engine_command_id_sync().unwrap(), 1);
    }

    /// The maximum L2 sequence falls back to 0 on a freshly created test database.
    #[tokio::test]
    async fn replay_max_l2_seq_empty_table() {
        let fixture = TestDb::new().await.unwrap();
        let reader = fixture.handler.as_ref();
        assert_eq!(reader.get_max_l2_seq_from_events_sync().unwrap(), 0);
    }

    /// No command id bounds are reported on a freshly created test database.
    #[tokio::test]
    async fn replay_command_bounds_empty_table() {
        let fixture = TestDb::new().await.unwrap();
        let reader = fixture.handler.as_ref();
        let id_bounds = reader.get_journal_command_id_bounds_sync().unwrap();
        assert_eq!(id_bounds, None);
    }

    /// Asking for commands after L2 sequence 0 yields nothing.
    #[tokio::test]
    async fn replay_commands_with_l2_empty() {
        let fixture = TestDb::new().await.unwrap();
        let reader = fixture.handler.as_ref();
        let commands = reader.get_commands_with_l2_after_seq_sync(0).unwrap();
        assert!(commands.is_empty());
    }

    /// The fill-event range query over commands 0..=100 yields nothing.
    #[tokio::test]
    async fn replay_fill_events_empty_range() {
        let fixture = TestDb::new().await.unwrap();
        let reader = fixture.handler.as_ref();
        let fills = reader
            .get_fill_events_for_command_range_sync(0, 100)
            .unwrap();
        assert!(fills.is_empty());
    }

    /// The order-update range query over commands 0..=100 yields nothing.
    #[tokio::test]
    async fn replay_order_update_events_empty_range() {
        let fixture = TestDb::new().await.unwrap();
        let reader = fixture.handler.as_ref();
        let updates = reader
            .get_order_update_events_for_command_range_sync(0, 100)
            .unwrap();
        assert!(updates.is_empty());
    }

    /// The portfolio-event range query over commands 0..=100 yields nothing.
    #[tokio::test]
    async fn replay_portfolio_events_empty_range() {
        let fixture = TestDb::new().await.unwrap();
        let reader = fixture.handler.as_ref();
        let portfolio_events = reader
            .get_portfolio_events_for_command_range_sync(0, 100)
            .unwrap();
        assert!(portfolio_events.is_empty());
    }
}