Skip to main content

hypercall_db_diesel/
archiver.rs

1//! Diesel implementation of ArchiverReader and ArchiverWriter traits.
2//!
3//! Moves all SQL from `tools/db-archiver/src/db.rs` into the shared
4//! persistence layer so the archiver binary becomes a thin orchestrator.
5
6use anyhow::{Context, Result};
7use chrono::{DateTime, Duration, Utc};
8use diesel::prelude::*;
9use diesel::sql_query;
10use diesel::sql_types::{BigInt, Nullable};
11use tracing::{info, warn};
12
13use crate::database_handler::DatabaseHandler;
14use crate::engine_enums::{CommandType, DbUuid, EventType};
15use crate::models::{OrderActionRecord, OrderUpdateRecord};
16use crate::schema::{
17    engine_command_digests, engine_commands, engine_events, notifications, order_actions,
18    order_updates,
19};
20use hypercall_db::{
21    ArchivalBoundary, ArchivedCommand, ArchivedEvent, ArchivedOrderAction, ArchivedOrderUpdate,
22    ArchiverReader, ArchiverWriter,
23};
24
25// ---------------------------------------------------------------------------
26// Private Diesel row types (same pattern as analytics.rs)
27// ---------------------------------------------------------------------------
28
29/// Typed struct matching engine_events columns.
30#[derive(Queryable, Selectable, Debug)]
31#[diesel(table_name = engine_events)]
32struct EventRow {
33    pub event_id: i64,
34    pub command_id: i64,
35    pub event_idx: i32,
36    pub event_data: Vec<u8>,
37    pub event_key: Option<String>,
38    pub l2_sequence: Option<i64>,
39    pub created_at: DateTime<Utc>,
40    pub published_at: Option<DateTime<Utc>>,
41    pub publish_attempts: i32,
42    pub last_publish_error: Option<String>,
43    pub last_publish_attempt_at: Option<DateTime<Utc>>,
44    pub event_type_enum: EventType,
45}
46
47impl From<EventRow> for ArchivedEvent {
48    fn from(r: EventRow) -> Self {
49        Self {
50            event_id: r.event_id,
51            command_id: r.command_id,
52            event_idx: r.event_idx,
53            event_data: r.event_data,
54            event_key: r.event_key,
55            l2_sequence: r.l2_sequence,
56            created_at: r.created_at,
57            published_at: r.published_at,
58            publish_attempts: r.publish_attempts,
59            last_publish_error: r.last_publish_error,
60            last_publish_attempt_at: r.last_publish_attempt_at,
61            event_type: r.event_type_enum.as_str().to_string(),
62        }
63    }
64}
65
66/// Typed struct matching engine_commands columns.
67#[derive(Queryable, Selectable, QueryableByName, Debug)]
68#[diesel(table_name = engine_commands)]
69struct CommandRow {
70    pub command_id: i64,
71    pub received_ts_ms: i64,
72    pub command_data: Option<Vec<u8>>,
73    pub response_data: Option<Vec<u8>>,
74    pub order_id: Option<i64>,
75    pub request_uuid: DbUuid,
76    pub command_type_enum: Option<CommandType>,
77    pub created_at: DateTime<Utc>,
78}
79
80impl From<CommandRow> for ArchivedCommand {
81    fn from(r: CommandRow) -> Self {
82        Self {
83            command_id: r.command_id,
84            received_ts_ms: r.received_ts_ms,
85            command_data: r.command_data,
86            response_data: r.response_data,
87            order_id: r.order_id,
88            request_uuid: r.request_uuid.to_string(),
89            command_type: r.command_type_enum.map(|ct| ct.as_str().to_string()),
90            created_at: r.created_at,
91        }
92    }
93}
94
95impl From<OrderActionRecord> for ArchivedOrderAction {
96    fn from(r: OrderActionRecord) -> Self {
97        Self {
98            id: r.id,
99            timestamp: r.timestamp,
100            wallet: r.wallet.0.as_slice().to_vec(),
101            action: r.action,
102            symbol: r.symbol,
103            price: r.price,
104            size: r.size,
105            side: r.side,
106            tif: r.tif,
107            client_id: r.client_id,
108            created_at: r.created_at,
109        }
110    }
111}
112
113impl From<OrderUpdateRecord> for ArchivedOrderUpdate {
114    fn from(r: OrderUpdateRecord) -> Self {
115        Self {
116            id: r.id,
117            timestamp: r.timestamp,
118            order_id: r.order_id,
119            status: r.status,
120            reason: r.reason,
121            filled_size: r.filled_size,
122            symbol: r.symbol,
123            price: r.price,
124            size: r.size,
125            side: r.side,
126            tif: r.tif,
127            client_id: r.client_id,
128            created_at: r.created_at,
129        }
130    }
131}
132
133// ---------------------------------------------------------------------------
134// ArchiverReader
135// ---------------------------------------------------------------------------
136
137impl ArchiverReader for DatabaseHandler {
138    fn find_safe_boundary(&self, min_age_hours: i64) -> Result<ArchivalBoundary> {
139        let mut conn = self.pool().get().context("failed to get connection")?;
140        let age_cutoff = Utc::now() - Duration::hours(min_age_hours);
141
142        #[derive(diesel::QueryableByName, Debug)]
143        struct BoundaryRow {
144            #[diesel(sql_type = BigInt)]
145            last_command_id: i64,
146        }
147
148        let snapshot_boundary: i64 = sql_query(
149            "SELECT COALESCE(\
150                 (SELECT last_command_id FROM engine_snapshot_boundary WHERE id = 1), \
151                 0\
152             ) AS last_command_id",
153        )
154        .get_result::<BoundaryRow>(&mut conn)
155        .map(|r| r.last_command_id)
156        .unwrap_or(0);
157
158        if snapshot_boundary <= 0 {
159            warn!(
160                age_cutoff = %age_cutoff,
161                "engine journal archival disabled: no snapshot boundary persisted yet"
162            );
163            return Ok(ArchivalBoundary {
164                max_command_id: 0,
165                age_cutoff,
166            });
167        }
168
169        info!(
170            snapshot_boundary,
171            age_cutoff = %age_cutoff,
172            "archival boundary: engine_commands up to snapshot last_command_id"
173        );
174
175        Ok(ArchivalBoundary {
176            max_command_id: snapshot_boundary,
177            age_cutoff,
178        })
179    }
180
181    fn fetch_events_batch(
182        &self,
183        max_command_id: i64,
184        age_cutoff: &DateTime<Utc>,
185        batch_size: i64,
186        after_event_id: Option<i64>,
187    ) -> Result<Vec<ArchivedEvent>> {
188        let mut conn = self.pool().get().context("failed to get connection")?;
189
190        let mut query = engine_events::table
191            .into_boxed()
192            .filter(engine_events::command_id.le(max_command_id))
193            .filter(engine_events::created_at.lt(age_cutoff));
194        if let Some(after_event_id) = after_event_id {
195            query = query.filter(engine_events::event_id.gt(after_event_id));
196        }
197
198        let rows = query
199            .order(engine_events::event_id.asc())
200            .limit(batch_size)
201            .select(EventRow::as_select())
202            .load::<EventRow>(&mut conn)
203            .context("failed to fetch engine_events batch")?;
204
205        Ok(rows.into_iter().map(ArchivedEvent::from).collect())
206    }
207
208    fn fetch_orphan_commands_batch(
209        &self,
210        max_command_id: i64,
211        age_cutoff: &DateTime<Utc>,
212        batch_size: i64,
213        after_command_id: Option<i64>,
214    ) -> Result<Vec<ArchivedCommand>> {
215        let mut conn = self.pool().get().context("failed to get connection")?;
216
217        let rows = sql_query(
218            r#"
219            SELECT ec.command_id, ec.received_ts_ms, ec.command_data, ec.response_data,
220                   ec.order_id, ec.request_uuid, ec.command_type_enum, ec.created_at
221            FROM engine_commands ec
222            WHERE NOT EXISTS (SELECT 1 FROM engine_events ee WHERE ee.command_id = ec.command_id)
223              AND ec.command_id <= $1
224              AND ec.created_at < $2
225              AND ($3 IS NULL OR ec.command_id > $3)
226            ORDER BY ec.command_id ASC
227            LIMIT $4
228            "#,
229        )
230        .bind::<BigInt, _>(max_command_id)
231        .bind::<diesel::sql_types::Timestamptz, _>(age_cutoff)
232        .bind::<Nullable<BigInt>, _>(after_command_id)
233        .bind::<BigInt, _>(batch_size)
234        .load::<CommandRow>(&mut conn)
235        .context("failed to fetch orphan engine_commands batch")?;
236
237        Ok(rows.into_iter().map(ArchivedCommand::from).collect())
238    }
239
240    fn fetch_order_actions_batch(
241        &self,
242        age_cutoff: &DateTime<Utc>,
243        batch_size: i64,
244        after_id: Option<i32>,
245    ) -> Result<Vec<ArchivedOrderAction>> {
246        let mut conn = self.pool().get().context("failed to get connection")?;
247        let cutoff_naive = age_cutoff.naive_utc();
248
249        let mut query = order_actions::table
250            .into_boxed()
251            .filter(order_actions::created_at.lt(cutoff_naive));
252        if let Some(after_id) = after_id {
253            query = query.filter(order_actions::id.gt(Some(after_id)));
254        }
255
256        let rows = query
257            .order(order_actions::id.asc())
258            .limit(batch_size)
259            .select(OrderActionRecord::as_select())
260            .load::<OrderActionRecord>(&mut conn)
261            .context("failed to fetch order_actions batch")?;
262
263        Ok(rows.into_iter().map(ArchivedOrderAction::from).collect())
264    }
265
266    fn fetch_order_updates_batch(
267        &self,
268        age_cutoff: &DateTime<Utc>,
269        batch_size: i64,
270        after_id: Option<i32>,
271    ) -> Result<Vec<ArchivedOrderUpdate>> {
272        let mut conn = self.pool().get().context("failed to get connection")?;
273        let cutoff_naive = age_cutoff.naive_utc();
274        let cutoff_ms = age_cutoff.timestamp_millis();
275
276        let mut query = order_updates::table.into_boxed().filter(
277            order_updates::created_at
278                .lt(cutoff_naive)
279                .or(order_updates::created_at
280                    .is_null()
281                    .and(order_updates::timestamp.lt(cutoff_ms))),
282        );
283        if let Some(after_id) = after_id {
284            query = query.filter(order_updates::id.gt(Some(after_id)));
285        }
286
287        let rows = query
288            .order(order_updates::id.asc())
289            .limit(batch_size)
290            .select(OrderUpdateRecord::as_select())
291            .load::<OrderUpdateRecord>(&mut conn)
292            .context("failed to fetch order_updates batch")?;
293
294        Ok(rows.into_iter().map(ArchivedOrderUpdate::from).collect())
295    }
296}
297
298// ---------------------------------------------------------------------------
299// ArchiverWriter
300// ---------------------------------------------------------------------------
301
302impl ArchiverWriter for DatabaseHandler {
303    fn delete_events(&self, event_ids: &[i64]) -> Result<usize> {
304        let mut conn = self.pool().get().context("failed to get connection")?;
305
306        let deleted =
307            diesel::delete(engine_events::table.filter(engine_events::event_id.eq_any(event_ids)))
308                .execute(&mut conn)
309                .context("failed to delete engine_events")?;
310
311        Ok(deleted)
312    }
313
314    fn delete_commands(&self, command_ids: &[i64]) -> Result<usize> {
315        let mut conn = self.pool().get().context("failed to get connection")?;
316
317        let deleted = conn
318            .transaction::<_, diesel::result::Error, _>(|conn| {
319                // Delete digests first (FK constraint)
320                diesel::delete(
321                    engine_command_digests::table
322                        .filter(engine_command_digests::command_id.eq_any(command_ids)),
323                )
324                .execute(conn)?;
325
326                diesel::delete(
327                    engine_commands::table.filter(engine_commands::command_id.eq_any(command_ids)),
328                )
329                .execute(conn)
330            })
331            .context("failed to delete engine_commands")?;
332
333        Ok(deleted)
334    }
335
336    fn delete_order_actions(&self, ids: &[i32]) -> Result<usize> {
337        let mut conn = self.pool().get().context("failed to get connection")?;
338
339        let deleted = diesel::delete(order_actions::table.filter(order_actions::id.eq_any(ids)))
340            .execute(&mut conn)
341            .context("failed to delete order_actions")?;
342
343        Ok(deleted)
344    }
345
346    fn delete_order_updates(&self, ids: &[i32]) -> Result<usize> {
347        let mut conn = self.pool().get().context("failed to get connection")?;
348
349        let deleted = diesel::delete(order_updates::table.filter(order_updates::id.eq_any(ids)))
350            .execute(&mut conn)
351            .context("failed to delete order_updates")?;
352
353        Ok(deleted)
354    }
355
356    fn delete_notifications_before(
357        &self,
358        cutoff: &DateTime<Utc>,
359        batch_size: i64,
360    ) -> Result<usize> {
361        let mut conn = self.pool().get().context("failed to get connection")?;
362
363        let sql = r#"
364            DELETE FROM notifications
365            WHERE id IN (
366                SELECT id FROM notifications
367                WHERE created_at < $1
368                ORDER BY id ASC
369                LIMIT $2
370            )
371        "#;
372        let deleted = diesel::sql_query(sql)
373            .bind::<diesel::sql_types::Timestamptz, _>(cutoff)
374            .bind::<BigInt, _>(batch_size)
375            .execute(&mut conn)
376            .context("failed to delete notifications batch")?;
377
378        Ok(deleted)
379    }
380
381    fn delete_notifications_over_per_user_cap(
382        &self,
383        max_per_user: i64,
384        batch_size: i64,
385    ) -> Result<usize> {
386        let mut conn = self.pool().get().context("failed to get connection")?;
387
388        let sql = r#"
389            DELETE FROM notifications
390            WHERE id IN (
391                SELECT id FROM (
392                    SELECT id, ROW_NUMBER() OVER (PARTITION BY wallet_address ORDER BY id DESC) AS rn
393                    FROM notifications
394                ) sub
395                WHERE sub.rn > $1
396                LIMIT $2
397            )
398        "#;
399        let deleted = diesel::sql_query(sql)
400            .bind::<BigInt, _>(max_per_user)
401            .bind::<BigInt, _>(batch_size)
402            .execute(&mut conn)
403            .context("failed to delete notifications over per-user cap")?;
404
405        Ok(deleted)
406    }
407}
408
409#[allow(dead_code)]
410fn _unused_notifications_schema_guard() {
411    let _ = notifications::table;
412}
413
414#[cfg(test)]
415mod tests {
416    use crate::test_helpers::TestDb;
417    use chrono::{Duration, Utc};
418    use diesel::prelude::*;
419    use diesel::sql_query;
420    use diesel::sql_types::BigInt;
421    use hypercall_db::*;
422
423    /// Helper: upsert an engine_snapshot_boundary row.
424    fn seed_snapshot_boundary(test_db: &TestDb, last_command_id: i64) {
425        let mut conn = test_db.handler.pool().get().unwrap();
426        sql_query(
427            "INSERT INTO engine_snapshot_boundary (id, last_command_id, updated_at) \
428             VALUES (1, $1, NOW()) \
429             ON CONFLICT (id) DO UPDATE SET last_command_id = EXCLUDED.last_command_id, updated_at = NOW()",
430        )
431        .bind::<BigInt, _>(last_command_id)
432        .execute(&mut conn)
433        .unwrap();
434    }
435
436    /// Helper: insert an engine_command row.
437    fn seed_engine_command(test_db: &TestDb, command_id: i64, created_at_hours_ago: i64) {
438        let mut conn = test_db.handler.pool().get().unwrap();
439        let ts = Utc::now() - Duration::hours(created_at_hours_ago);
440        sql_query(
441            "INSERT INTO engine_commands (command_id, received_ts_ms, request_uuid, created_at, command_type_enum) \
442             VALUES ($1, $2, gen_random_uuid(), $3, 'CreateOrder')",
443        )
444        .bind::<BigInt, _>(command_id)
445        .bind::<BigInt, _>(command_id * 1000)
446        .bind::<diesel::sql_types::Timestamptz, _>(ts)
447        .execute(&mut conn)
448        .unwrap();
449    }
450
451    /// Helper: insert an engine_event row.
452    /// Uses `event_id` as the `event_idx` to avoid unique constraint violations
453    /// when inserting multiple events for the same command_id.
454    fn seed_engine_event(
455        test_db: &TestDb,
456        event_id: i64,
457        command_id: i64,
458        created_at_hours_ago: i64,
459    ) {
460        let mut conn = test_db.handler.pool().get().unwrap();
461        let ts = Utc::now() - Duration::hours(created_at_hours_ago);
462        sql_query(
463            "INSERT INTO engine_events (event_id, command_id, event_idx, event_data, created_at, publish_attempts, event_type_enum) \
464             VALUES ($1, $2, $1::int, E'\\\\x00', $3, 0, 'OrderAction')",
465        )
466        .bind::<BigInt, _>(event_id)
467        .bind::<BigInt, _>(command_id)
468        .bind::<diesel::sql_types::Timestamptz, _>(ts)
469        .execute(&mut conn)
470        .unwrap();
471    }
472
473    /// Helper: insert an engine_command_digest row.
474    fn seed_command_digest(test_db: &TestDb, command_id: i64) {
475        let mut conn = test_db.handler.pool().get().unwrap();
476        sql_query(
477            "INSERT INTO engine_command_digests (command_id, duration_ms, created_at, pre_digest_data, post_digest_data) \
478             VALUES ($1, 10, NOW(), E'\\\\x00', E'\\\\x00')",
479        )
480        .bind::<BigInt, _>(command_id)
481        .execute(&mut conn)
482        .unwrap();
483    }
484
485    /// Helper: insert an order_actions row.
486    fn seed_order_action(test_db: &TestDb, created_at_hours_ago: i64) -> i32 {
487        let mut conn = test_db.handler.pool().get().unwrap();
488        let ts = (Utc::now() - Duration::hours(created_at_hours_ago)).naive_utc();
489        #[derive(diesel::QueryableByName)]
490        struct IdRow {
491            #[diesel(sql_type = diesel::sql_types::Integer)]
492            id: i32,
493        }
494        let row = sql_query(
495            "INSERT INTO order_actions (timestamp, wallet, action, symbol, price, size, side, tif, created_at) \
496             VALUES (1000, E'\\\\x0000000000000000000000000000000000000001', 'place', 'BTC-25DEC26-100000-C', 0.05, 1, 'buy', 'GTC', $1) \
497             RETURNING id",
498        )
499        .bind::<diesel::sql_types::Timestamp, _>(ts)
500        .get_result::<IdRow>(&mut conn)
501        .unwrap();
502        row.id
503    }
504
505    /// Helper: insert an order_updates row.
506    fn seed_order_update(test_db: &TestDb, created_at_hours_ago: i64) -> i32 {
507        let mut conn = test_db.handler.pool().get().unwrap();
508        let ts = (Utc::now() - Duration::hours(created_at_hours_ago)).naive_utc();
509        #[derive(diesel::QueryableByName)]
510        struct IdRow {
511            #[diesel(sql_type = diesel::sql_types::Integer)]
512            id: i32,
513        }
514        let row = sql_query(
515            "INSERT INTO order_updates (timestamp, status, filled_size, symbol, price, size, side, tif, created_at) \
516             VALUES (1000, 'FILLED', 1, 'BTC-25DEC26-100000-C', 0.05, 1, 'buy', 'GTC', $1) \
517             RETURNING id",
518        )
519        .bind::<diesel::sql_types::Timestamp, _>(ts)
520        .get_result::<IdRow>(&mut conn)
521        .unwrap();
522        row.id
523    }
524
525    /// Helper: insert a notifications row.
526    fn seed_notification(test_db: &TestDb, wallet: &str, created_at_hours_ago: i64) -> i64 {
527        let mut conn = test_db.handler.pool().get().unwrap();
528        let ts = Utc::now() - Duration::hours(created_at_hours_ago);
529        #[derive(diesel::QueryableByName)]
530        struct IdRow {
531            #[diesel(sql_type = diesel::sql_types::BigInt)]
532            id: i64,
533        }
534        let row = sql_query(
535            "INSERT INTO notifications (wallet_address, notification_type, payload, created_at) \
536             VALUES ($1, 'order_filled', E'\\\\x00', $2) \
537             RETURNING id",
538        )
539        .bind::<diesel::sql_types::Text, _>(wallet)
540        .bind::<diesel::sql_types::Timestamptz, _>(ts)
541        .get_result::<IdRow>(&mut conn)
542        .unwrap();
543        row.id
544    }
545
546    // =========================================================================
547    // ArchiverReader tests
548    // =========================================================================
549
550    #[tokio::test]
551    async fn find_safe_boundary_returns_snapshot_boundary() {
552        let test_db = TestDb::new().await.unwrap();
553        seed_snapshot_boundary(&test_db, 500);
554
555        let reader: &dyn ArchiverReader = test_db.handler.as_ref();
556        let boundary = reader.find_safe_boundary(24).unwrap();
557        assert_eq!(boundary.max_command_id, 500);
558    }
559
560    #[tokio::test]
561    async fn find_safe_boundary_no_snapshot_returns_zero() {
562        let test_db = TestDb::new().await.unwrap();
563        let reader: &dyn ArchiverReader = test_db.handler.as_ref();
564        let boundary = reader.find_safe_boundary(24).unwrap();
565        assert_eq!(boundary.max_command_id, 0);
566    }
567
568    #[tokio::test]
569    async fn fetch_events_batch_filters_by_command_id_and_age() {
570        let test_db = TestDb::new().await.unwrap();
571        // Command 1, old enough (48h ago)
572        seed_engine_command(&test_db, 1, 48);
573        seed_engine_event(&test_db, 1, 1, 48);
574        // Command 2, old enough but command_id too high
575        seed_engine_command(&test_db, 100, 48);
576        seed_engine_event(&test_db, 2, 100, 48);
577        // Command 3, recent (not old enough)
578        seed_engine_command(&test_db, 3, 1);
579        seed_engine_event(&test_db, 3, 3, 1);
580
581        let reader: &dyn ArchiverReader = test_db.handler.as_ref();
582        let age_cutoff = Utc::now() - Duration::hours(24);
583        let batch = reader
584            .fetch_events_batch(50, &age_cutoff, 100, None)
585            .unwrap();
586        // Should only get event_id=1 (command_id=1 <= 50, and 48h > 24h cutoff)
587        assert_eq!(batch.len(), 1);
588        assert_eq!(batch[0].event_id, 1);
589        assert_eq!(batch[0].command_id, 1);
590    }
591
592    #[tokio::test]
593    async fn fetch_events_batch_respects_after_event_id() {
594        let test_db = TestDb::new().await.unwrap();
595        seed_engine_command(&test_db, 1, 48);
596        seed_engine_event(&test_db, 10, 1, 48);
597        seed_engine_event(&test_db, 20, 1, 48);
598
599        let reader: &dyn ArchiverReader = test_db.handler.as_ref();
600        let age_cutoff = Utc::now() - Duration::hours(24);
601        let batch = reader
602            .fetch_events_batch(100, &age_cutoff, 100, Some(10))
603            .unwrap();
604        assert_eq!(batch.len(), 1);
605        assert_eq!(batch[0].event_id, 20);
606    }
607
608    #[tokio::test]
609    async fn delete_events_removes_matching_rows() {
610        let test_db = TestDb::new().await.unwrap();
611        seed_engine_command(&test_db, 1, 48);
612        seed_engine_event(&test_db, 10, 1, 48);
613        seed_engine_event(&test_db, 20, 1, 48);
614
615        let writer: &dyn ArchiverWriter = test_db.handler.as_ref();
616        let deleted = writer.delete_events(&[10]).unwrap();
617        assert_eq!(deleted, 1);
618
619        // Verify only event 20 remains
620        let reader: &dyn ArchiverReader = test_db.handler.as_ref();
621        let age_cutoff = Utc::now() - Duration::hours(1);
622        let remaining = reader
623            .fetch_events_batch(100, &age_cutoff, 100, None)
624            .unwrap();
625        assert_eq!(remaining.len(), 1);
626        assert_eq!(remaining[0].event_id, 20);
627    }
628
629    #[tokio::test]
630    async fn fetch_orphan_commands_batch_returns_commands_without_events() {
631        let test_db = TestDb::new().await.unwrap();
632        // Command with events (not orphan)
633        seed_engine_command(&test_db, 1, 48);
634        seed_engine_event(&test_db, 1, 1, 48);
635        // Command without events (orphan)
636        seed_engine_command(&test_db, 2, 48);
637
638        let reader: &dyn ArchiverReader = test_db.handler.as_ref();
639        let age_cutoff = Utc::now() - Duration::hours(24);
640        let batch = reader
641            .fetch_orphan_commands_batch(100, &age_cutoff, 100, None)
642            .unwrap();
643        assert_eq!(batch.len(), 1);
644        assert_eq!(batch[0].command_id, 2);
645    }
646
647    #[tokio::test]
648    async fn delete_commands_cascades_to_digests() {
649        let test_db = TestDb::new().await.unwrap();
650        seed_engine_command(&test_db, 5, 48);
651        seed_command_digest(&test_db, 5);
652
653        let writer: &dyn ArchiverWriter = test_db.handler.as_ref();
654        let deleted = writer.delete_commands(&[5]).unwrap();
655        assert_eq!(deleted, 1);
656
657        // Verify command is gone
658        let reader: &dyn ArchiverReader = test_db.handler.as_ref();
659        let age_cutoff = Utc::now() - Duration::hours(1);
660        let orphans = reader
661            .fetch_orphan_commands_batch(100, &age_cutoff, 100, None)
662            .unwrap();
663        assert!(orphans.is_empty());
664    }
665
666    #[tokio::test]
667    async fn fetch_order_actions_batch_filters_by_age() {
668        let test_db = TestDb::new().await.unwrap();
669        let _old_id = seed_order_action(&test_db, 48);
670        let _new_id = seed_order_action(&test_db, 1);
671
672        let reader: &dyn ArchiverReader = test_db.handler.as_ref();
673        let age_cutoff = Utc::now() - Duration::hours(24);
674        let batch = reader
675            .fetch_order_actions_batch(&age_cutoff, 100, None)
676            .unwrap();
677        assert_eq!(batch.len(), 1);
678    }
679
680    #[tokio::test]
681    async fn delete_order_actions_removes_rows() {
682        let test_db = TestDb::new().await.unwrap();
683        let id = seed_order_action(&test_db, 48);
684
685        let writer: &dyn ArchiverWriter = test_db.handler.as_ref();
686        let deleted = writer.delete_order_actions(&[id]).unwrap();
687        assert_eq!(deleted, 1);
688
689        // Verify gone
690        let reader: &dyn ArchiverReader = test_db.handler.as_ref();
691        let age_cutoff = Utc::now() - Duration::hours(1);
692        let remaining = reader
693            .fetch_order_actions_batch(&age_cutoff, 100, None)
694            .unwrap();
695        assert!(remaining.is_empty());
696    }
697
698    #[tokio::test]
699    async fn fetch_order_updates_batch_filters_by_age() {
700        let test_db = TestDb::new().await.unwrap();
701        let _old_id = seed_order_update(&test_db, 48);
702        let _new_id = seed_order_update(&test_db, 1);
703
704        let reader: &dyn ArchiverReader = test_db.handler.as_ref();
705        let age_cutoff = Utc::now() - Duration::hours(24);
706        let batch = reader
707            .fetch_order_updates_batch(&age_cutoff, 100, None)
708            .unwrap();
709        assert_eq!(batch.len(), 1);
710    }
711
712    #[tokio::test]
713    async fn delete_order_updates_removes_rows() {
714        let test_db = TestDb::new().await.unwrap();
715        let id = seed_order_update(&test_db, 48);
716
717        let writer: &dyn ArchiverWriter = test_db.handler.as_ref();
718        let deleted = writer.delete_order_updates(&[id]).unwrap();
719        assert_eq!(deleted, 1);
720
721        let reader: &dyn ArchiverReader = test_db.handler.as_ref();
722        let age_cutoff = Utc::now() - Duration::hours(1);
723        let remaining = reader
724            .fetch_order_updates_batch(&age_cutoff, 100, None)
725            .unwrap();
726        assert!(remaining.is_empty());
727    }
728
729    #[tokio::test]
730    async fn delete_notifications_before_removes_old_rows() {
731        let test_db = TestDb::new().await.unwrap();
732        let _old = seed_notification(&test_db, "0xabc", 48);
733        let _new = seed_notification(&test_db, "0xabc", 1);
734
735        let writer: &dyn ArchiverWriter = test_db.handler.as_ref();
736        let cutoff = Utc::now() - Duration::hours(24);
737        let deleted = writer.delete_notifications_before(&cutoff, 100).unwrap();
738        assert_eq!(deleted, 1);
739    }
740
741    #[tokio::test]
742    async fn delete_notifications_over_per_user_cap_enforces_limit() {
743        let test_db = TestDb::new().await.unwrap();
744        // Insert 5 notifications for same wallet
745        for i in 0..5 {
746            seed_notification(&test_db, "0xwallet1", i);
747        }
748
749        let writer: &dyn ArchiverWriter = test_db.handler.as_ref();
750        // Cap at 2 per user: should delete 3
751        let deleted = writer
752            .delete_notifications_over_per_user_cap(2, 100)
753            .unwrap();
754        assert_eq!(deleted, 3);
755    }
756}