Skip to main content

hypercall_db_diesel/
replay.rs

1//! JournalReplayReader implementation for DatabaseHandler.
2//!
3//! Provides paginated replay queries for engine startup: fetches
4//! commands/events after a snapshot boundary for state reconstruction.
5
6use anyhow::Result;
7use diesel::RunQueryDsl;
8
9use crate::database_handler::DatabaseHandler;
10
11// Module-level Row structs for journal replay queries.
12// Diesel's QueryableByName derive requires these at module scope when
13// referencing custom SQL types from other modules.
14#[derive(diesel::QueryableByName)]
15struct JournalCommandRow {
16    #[diesel(sql_type = diesel::sql_types::BigInt)]
17    command_id: i64,
18    #[diesel(sql_type = diesel::sql_types::Uuid)]
19    request_uuid: crate::engine_enums::DbUuid,
20    #[diesel(sql_type = crate::engine_enums::EngineCommandTypeSql)]
21    command_type: crate::engine_enums::CommandType,
22    #[diesel(sql_type = diesel::sql_types::Nullable<diesel::sql_types::Bytea>)]
23    command_data: Option<Vec<u8>>,
24    #[diesel(sql_type = diesel::sql_types::Nullable<diesel::sql_types::Bytea>)]
25    response_data: Option<Vec<u8>>,
26}
27
28#[derive(diesel::QueryableByName)]
29struct PortfolioEventRow {
30    #[diesel(sql_type = diesel::sql_types::BigInt)]
31    command_id: i64,
32    #[diesel(sql_type = crate::engine_enums::EngineEventTypeSql)]
33    event_type_enum: crate::engine_enums::EventType,
34    #[diesel(sql_type = diesel::sql_types::Binary)]
35    event_data: Vec<u8>,
36}
37
38impl hypercall_db::JournalReplayReader for DatabaseHandler {
39    fn get_next_engine_command_id_sync(&self) -> Result<i64> {
40        use diesel::sql_types::BigInt;
41
42        #[derive(diesel::QueryableByName)]
43        struct Row {
44            #[diesel(sql_type = BigInt)]
45            next_command_id: i64,
46        }
47
48        let mut conn = self.pool().get()?;
49        let row = diesel::sql_query(
50            "SELECT COALESCE(MAX(command_id), 0) + 1 AS next_command_id FROM engine_commands",
51        )
52        .get_result::<Row>(&mut conn)?;
53        Ok(row.next_command_id)
54    }
55
56    fn get_journal_command_id_bounds_sync(
57        &self,
58    ) -> Result<Option<hypercall_db::JournalCommandIdBounds>> {
59        #[derive(diesel::QueryableByName)]
60        struct Row {
61            #[diesel(sql_type = diesel::sql_types::Nullable<diesel::sql_types::BigInt>)]
62            min_command_id: Option<i64>,
63            #[diesel(sql_type = diesel::sql_types::Nullable<diesel::sql_types::BigInt>)]
64            max_command_id: Option<i64>,
65        }
66
67        let mut conn = self.pool().get()?;
68        let row = diesel::sql_query(
69            "SELECT MIN(command_id) AS min_command_id,
70                    MAX(command_id) AS max_command_id
71             FROM engine_commands
72             WHERE command_type_enum IS NOT NULL",
73        )
74        .get_result::<Row>(&mut conn)?;
75
76        Ok(match (row.min_command_id, row.max_command_id) {
77            (Some(min_command_id), Some(max_command_id)) => {
78                Some(hypercall_db::JournalCommandIdBounds {
79                    min_command_id,
80                    max_command_id,
81                })
82            }
83            _ => None,
84        })
85    }
86
87    fn count_non_replayable_commands_in_range_sync(
88        &self,
89        start_command_id: i64,
90        end_command_id: i64,
91    ) -> Result<i64> {
92        use diesel::sql_types::BigInt;
93
94        #[derive(diesel::QueryableByName)]
95        struct Row {
96            #[diesel(sql_type = diesel::sql_types::BigInt)]
97            command_count: i64,
98        }
99
100        if end_command_id <= start_command_id {
101            return Ok(0);
102        }
103
104        let mut conn = self.pool().get()?;
105        let row = diesel::sql_query(
106            "SELECT COUNT(*) AS command_count
107             FROM engine_commands
108             WHERE command_id > $1
109               AND command_id <= $2
110               AND command_type_enum IS NULL",
111        )
112        .bind::<BigInt, _>(start_command_id)
113        .bind::<BigInt, _>(end_command_id)
114        .get_result::<Row>(&mut conn)?;
115
116        Ok(row.command_count)
117    }
118
119    fn get_commands_with_l2_after_seq_sync(
120        &self,
121        l2_seq: i64,
122    ) -> Result<Vec<hypercall_db::ReplayCommand>> {
123        use diesel::sql_types::BigInt;
124
125        let mut conn = self.pool().get()?;
126        let results = diesel::sql_query(
127            "WITH snapshot_boundary AS (
128                SELECT COALESCE(MAX(ee.command_id), 0) AS last_cmd
129                FROM engine_events ee
130                WHERE ee.event_type_enum = 'L2Update'
131                AND ee.l2_sequence <= $1
132             )
133             SELECT ec.command_id, ec.request_uuid,
134                    ec.command_type_enum AS command_type,
135                    ec.command_data, ec.response_data
136             FROM engine_commands ec, snapshot_boundary sb
137             WHERE ec.command_id > sb.last_cmd
138               AND ec.command_type_enum IS NOT NULL
139             ORDER BY ec.command_id ASC",
140        )
141        .bind::<BigInt, _>(l2_seq)
142        .load::<JournalCommandRow>(&mut conn)?;
143
144        let commands: Vec<hypercall_db::ReplayCommand> = results
145            .into_iter()
146            .map(|row| hypercall_db::ReplayCommand {
147                command_id: row.command_id,
148                request_id: row.request_uuid.0.to_string(),
149                command_type: row.command_type.as_str().to_string(),
150                command_data: row
151                    .command_data
152                    .expect("command_data is NULL during replay cutover"),
153                response_data: row.response_data,
154            })
155            .collect();
156
157        if !commands.is_empty() {
158            tracing::info!(
159                "Found {} commands with L2 events after seq {} for replay",
160                commands.len(),
161                l2_seq
162            );
163        }
164
165        Ok(commands)
166    }
167
168    fn get_replay_commands_after_command_id_sync(
169        &self,
170        after_command_id: i64,
171        up_to_command_id: Option<i64>,
172        limit: i64,
173    ) -> Result<Vec<hypercall_db::ReplayCommand>> {
174        use diesel::sql_types::BigInt;
175
176        if limit <= 0 {
177            anyhow::bail!("replay command query limit must be positive, got {}", limit);
178        }
179
180        let mut conn = self.pool().get()?;
181        let results = if let Some(upper_bound) = up_to_command_id {
182            diesel::sql_query(
183                "SELECT ec.command_id, ec.request_uuid,
184                        ec.command_type_enum AS command_type,
185                        ec.command_data, ec.response_data
186                 FROM engine_commands ec
187                 WHERE ec.command_id > $1
188                   AND ec.command_id <= $2
189                   AND ec.command_type_enum IS NOT NULL
190                 ORDER BY ec.command_id ASC
191                 LIMIT $3",
192            )
193            .bind::<BigInt, _>(after_command_id)
194            .bind::<BigInt, _>(upper_bound)
195            .bind::<BigInt, _>(limit)
196            .load::<JournalCommandRow>(&mut conn)?
197        } else {
198            diesel::sql_query(
199                "SELECT ec.command_id, ec.request_uuid,
200                        ec.command_type_enum AS command_type,
201                        ec.command_data, ec.response_data
202                 FROM engine_commands ec
203                 WHERE ec.command_id > $1
204                   AND ec.command_type_enum IS NOT NULL
205                 ORDER BY ec.command_id ASC
206                 LIMIT $2",
207            )
208            .bind::<BigInt, _>(after_command_id)
209            .bind::<BigInt, _>(limit)
210            .load::<JournalCommandRow>(&mut conn)?
211        };
212
213        Ok(results
214            .into_iter()
215            .map(|row| hypercall_db::ReplayCommand {
216                command_id: row.command_id,
217                request_id: row.request_uuid.0.to_string(),
218                command_type: row.command_type.as_str().to_string(),
219                command_data: row
220                    .command_data
221                    .expect("command_data is NULL during replay cutover"),
222                response_data: row.response_data,
223            })
224            .collect())
225    }
226
227    fn get_fill_events_for_command_range_sync(
228        &self,
229        start_command_id: i64,
230        end_command_id: i64,
231    ) -> Result<Vec<Vec<u8>>> {
232        use diesel::sql_types::BigInt;
233
234        #[derive(diesel::QueryableByName)]
235        struct Row {
236            #[diesel(sql_type = diesel::sql_types::Binary)]
237            event_data: Vec<u8>,
238        }
239
240        let mut conn = self.pool().get()?;
241        let results = diesel::sql_query(
242            "SELECT ee.event_data
243             FROM engine_events ee
244             WHERE ee.event_type_enum = 'OrderFilled'
245               AND ee.command_id > $1
246               AND ee.command_id <= $2
247             ORDER BY ee.event_id ASC",
248        )
249        .bind::<BigInt, _>(start_command_id)
250        .bind::<BigInt, _>(end_command_id)
251        .load::<Row>(&mut conn)?;
252
253        Ok(results.into_iter().map(|row| row.event_data).collect())
254    }
255
256    fn get_portfolio_events_for_command_range_sync(
257        &self,
258        start_command_id: i64,
259        end_command_id: i64,
260    ) -> Result<Vec<hypercall_db::PortfolioReplayEvent>> {
261        use diesel::sql_types::BigInt;
262
263        let mut conn = self.pool().get()?;
264        let results = diesel::sql_query(
265            "SELECT ee.command_id, ee.event_type_enum, ee.event_data
266             FROM engine_events ee
267             WHERE ee.command_id >= $1
268               AND ee.command_id <= $2
269               AND ee.event_type_enum IN ('OrderFilled', 'PositionExpired')
270             ORDER BY ee.command_id ASC, ee.event_idx ASC",
271        )
272        .bind::<BigInt, _>(start_command_id)
273        .bind::<BigInt, _>(end_command_id)
274        .load::<PortfolioEventRow>(&mut conn)?;
275
276        Ok(results
277            .into_iter()
278            .map(|row| hypercall_db::PortfolioReplayEvent {
279                command_id: row.command_id,
280                event_type: row.event_type_enum.into(),
281                event_data: row.event_data,
282            })
283            .collect())
284    }
285
286    fn get_order_update_events_for_command_range_sync(
287        &self,
288        start_command_id: i64,
289        end_command_id: i64,
290    ) -> Result<Vec<Vec<u8>>> {
291        use diesel::sql_types::BigInt;
292
293        #[derive(diesel::QueryableByName)]
294        struct Row {
295            #[diesel(sql_type = diesel::sql_types::Binary)]
296            event_data: Vec<u8>,
297        }
298
299        let mut conn = self.pool().get()?;
300        let results = diesel::sql_query(
301            "SELECT ee.event_data
302             FROM engine_events ee
303             WHERE ee.event_type_enum = 'OrderUpdate'
304               AND ee.command_id > $1
305               AND ee.command_id <= $2
306             ORDER BY ee.event_id ASC",
307        )
308        .bind::<BigInt, _>(start_command_id)
309        .bind::<BigInt, _>(end_command_id)
310        .load::<Row>(&mut conn)?;
311
312        Ok(results.into_iter().map(|row| row.event_data).collect())
313    }
314
315    fn get_max_l2_seq_from_events_sync(&self) -> Result<i64> {
316        #[derive(diesel::QueryableByName)]
317        struct Row {
318            #[diesel(sql_type = diesel::sql_types::Nullable<diesel::sql_types::BigInt>)]
319            max_seq: Option<i64>,
320        }
321
322        let mut conn = self.pool().get()?;
323        let result = diesel::sql_query(
324            "SELECT MAX(l2_sequence) as max_seq
325             FROM engine_events
326             WHERE event_type_enum = 'L2Update'
327             AND l2_sequence IS NOT NULL",
328        )
329        .get_result::<Row>(&mut conn)?;
330
331        Ok(result.max_seq.unwrap_or(0))
332    }
333
334    fn get_fill_events_after_seq_sync(&self, l2_seq: i64) -> Result<Vec<Vec<u8>>> {
335        use diesel::sql_types::BigInt;
336
337        #[derive(diesel::QueryableByName)]
338        struct Row {
339            #[diesel(sql_type = diesel::sql_types::Binary)]
340            event_data: Vec<u8>,
341        }
342
343        let mut conn = self.pool().get()?;
344        let results = diesel::sql_query(
345            "WITH snapshot_boundary AS (
346                SELECT COALESCE(MAX(ee.command_id), 0) AS last_cmd
347                FROM engine_events ee
348                WHERE ee.event_type_enum = 'L2Update'
349                AND ee.l2_sequence <= $1
350             )
351             SELECT ee.event_data
352             FROM engine_events ee, snapshot_boundary sb
353             WHERE ee.event_type_enum = 'OrderFilled'
354             AND ee.command_id > sb.last_cmd
355             ORDER BY ee.event_id ASC",
356        )
357        .bind::<BigInt, _>(l2_seq)
358        .load::<Row>(&mut conn)?;
359
360        let events: Vec<Vec<u8>> = results.into_iter().map(|row| row.event_data).collect();
361
362        if !events.is_empty() {
363            tracing::info!(
364                "Found {} OrderFilled events after L2 seq {} for replay",
365                events.len(),
366                l2_seq
367            );
368        }
369
370        Ok(events)
371    }
372
373    fn get_order_update_events_after_seq_sync(&self, l2_seq: i64) -> Result<Vec<Vec<u8>>> {
374        use diesel::sql_types::BigInt;
375
376        #[derive(diesel::QueryableByName)]
377        struct Row {
378            #[diesel(sql_type = diesel::sql_types::Binary)]
379            event_data: Vec<u8>,
380        }
381
382        let mut conn = self.pool().get()?;
383        let results = diesel::sql_query(
384            "WITH snapshot_boundary AS (
385                SELECT COALESCE(MAX(ee.command_id), 0) AS last_cmd
386                FROM engine_events ee
387                WHERE ee.event_type_enum = 'L2Update'
388                AND ee.l2_sequence <= $1
389             )
390             SELECT ee.event_data
391             FROM engine_events ee, snapshot_boundary sb
392             WHERE ee.event_type_enum = 'OrderUpdate'
393             AND ee.command_id > sb.last_cmd
394             ORDER BY ee.event_id ASC",
395        )
396        .bind::<BigInt, _>(l2_seq)
397        .load::<Row>(&mut conn)?;
398
399        let events: Vec<Vec<u8>> = results.into_iter().map(|row| row.event_data).collect();
400
401        if !events.is_empty() {
402            tracing::info!(
403                "Found {} OrderUpdate events after L2 seq {} for replay Pass 3",
404                events.len(),
405                l2_seq
406            );
407        }
408
409        Ok(events)
410    }
411}
412
413#[cfg(test)]
414mod tests {
415    use crate::test_helpers::TestDb;
416    use hypercall_db::*;
417
418    #[tokio::test]
419    async fn replay_next_command_id_empty_table() {
420        let test_db = TestDb::new().await.unwrap();
421        let db = test_db.handler.as_ref();
422        let next_id = db.get_next_engine_command_id_sync().unwrap();
423        assert_eq!(next_id, 1);
424    }
425
426    #[tokio::test]
427    async fn replay_max_l2_seq_empty_table() {
428        let test_db = TestDb::new().await.unwrap();
429        let db = test_db.handler.as_ref();
430        let max_seq = db.get_max_l2_seq_from_events_sync().unwrap();
431        assert_eq!(max_seq, 0);
432    }
433
434    #[tokio::test]
435    async fn replay_command_bounds_empty_table() {
436        let test_db = TestDb::new().await.unwrap();
437        let db = test_db.handler.as_ref();
438        let bounds = db.get_journal_command_id_bounds_sync().unwrap();
439        assert_eq!(bounds, None);
440    }
441
442    #[tokio::test]
443    async fn replay_commands_with_l2_empty() {
444        let test_db = TestDb::new().await.unwrap();
445        let db = test_db.handler.as_ref();
446        let cmds = db.get_commands_with_l2_after_seq_sync(0).unwrap();
447        assert!(cmds.is_empty());
448    }
449
450    #[tokio::test]
451    async fn replay_fill_events_empty_range() {
452        let test_db = TestDb::new().await.unwrap();
453        let db = test_db.handler.as_ref();
454        let events = db.get_fill_events_for_command_range_sync(0, 100).unwrap();
455        assert!(events.is_empty());
456    }
457
458    #[tokio::test]
459    async fn replay_order_update_events_empty_range() {
460        let test_db = TestDb::new().await.unwrap();
461        let db = test_db.handler.as_ref();
462        let events = db
463            .get_order_update_events_for_command_range_sync(0, 100)
464            .unwrap();
465        assert!(events.is_empty());
466    }
467
468    #[tokio::test]
469    async fn replay_portfolio_events_empty_range() {
470        let test_db = TestDb::new().await.unwrap();
471        let db = test_db.handler.as_ref();
472        let events = db
473            .get_portfolio_events_for_command_range_sync(0, 100)
474            .unwrap();
475        assert!(events.is_empty());
476    }
477}