1use anyhow::{Context, Result};
7use chrono::{DateTime, Duration, Utc};
8use diesel::prelude::*;
9use diesel::sql_query;
10use diesel::sql_types::{BigInt, Nullable};
11use tracing::{info, warn};
12
13use crate::database_handler::DatabaseHandler;
14use crate::engine_enums::{CommandType, DbUuid, EventType};
15use crate::models::{OrderActionRecord, OrderUpdateRecord};
16use crate::schema::{
17 engine_command_digests, engine_commands, engine_events, notifications, order_actions,
18 order_updates,
19};
20use hypercall_db::{
21 ArchivalBoundary, ArchivedCommand, ArchivedEvent, ArchivedOrderAction, ArchivedOrderUpdate,
22 ArchiverReader, ArchiverWriter,
23};
24
25#[derive(Queryable, Selectable, Debug)]
31#[diesel(table_name = engine_events)]
32struct EventRow {
33 pub event_id: i64,
34 pub command_id: i64,
35 pub event_idx: i32,
36 pub event_data: Vec<u8>,
37 pub event_key: Option<String>,
38 pub l2_sequence: Option<i64>,
39 pub created_at: DateTime<Utc>,
40 pub published_at: Option<DateTime<Utc>>,
41 pub publish_attempts: i32,
42 pub last_publish_error: Option<String>,
43 pub last_publish_attempt_at: Option<DateTime<Utc>>,
44 pub event_type_enum: EventType,
45}
46
47impl From<EventRow> for ArchivedEvent {
48 fn from(r: EventRow) -> Self {
49 Self {
50 event_id: r.event_id,
51 command_id: r.command_id,
52 event_idx: r.event_idx,
53 event_data: r.event_data,
54 event_key: r.event_key,
55 l2_sequence: r.l2_sequence,
56 created_at: r.created_at,
57 published_at: r.published_at,
58 publish_attempts: r.publish_attempts,
59 last_publish_error: r.last_publish_error,
60 last_publish_attempt_at: r.last_publish_attempt_at,
61 event_type: r.event_type_enum.as_str().to_string(),
62 }
63 }
64}
65
66#[derive(Queryable, Selectable, QueryableByName, Debug)]
68#[diesel(table_name = engine_commands)]
69struct CommandRow {
70 pub command_id: i64,
71 pub received_ts_ms: i64,
72 pub command_data: Option<Vec<u8>>,
73 pub response_data: Option<Vec<u8>>,
74 pub order_id: Option<i64>,
75 pub request_uuid: DbUuid,
76 pub command_type_enum: Option<CommandType>,
77 pub created_at: DateTime<Utc>,
78}
79
80impl From<CommandRow> for ArchivedCommand {
81 fn from(r: CommandRow) -> Self {
82 Self {
83 command_id: r.command_id,
84 received_ts_ms: r.received_ts_ms,
85 command_data: r.command_data,
86 response_data: r.response_data,
87 order_id: r.order_id,
88 request_uuid: r.request_uuid.to_string(),
89 command_type: r.command_type_enum.map(|ct| ct.as_str().to_string()),
90 created_at: r.created_at,
91 }
92 }
93}
94
95impl From<OrderActionRecord> for ArchivedOrderAction {
96 fn from(r: OrderActionRecord) -> Self {
97 Self {
98 id: r.id,
99 timestamp: r.timestamp,
100 wallet: r.wallet.0.as_slice().to_vec(),
101 action: r.action,
102 symbol: r.symbol,
103 price: r.price,
104 size: r.size,
105 side: r.side,
106 tif: r.tif,
107 client_id: r.client_id,
108 created_at: r.created_at,
109 }
110 }
111}
112
113impl From<OrderUpdateRecord> for ArchivedOrderUpdate {
114 fn from(r: OrderUpdateRecord) -> Self {
115 Self {
116 id: r.id,
117 timestamp: r.timestamp,
118 order_id: r.order_id,
119 status: r.status,
120 reason: r.reason,
121 filled_size: r.filled_size,
122 symbol: r.symbol,
123 price: r.price,
124 size: r.size,
125 side: r.side,
126 tif: r.tif,
127 client_id: r.client_id,
128 created_at: r.created_at,
129 }
130 }
131}
132
133impl ArchiverReader for DatabaseHandler {
138 fn find_safe_boundary(&self, min_age_hours: i64) -> Result<ArchivalBoundary> {
139 let mut conn = self.pool().get().context("failed to get connection")?;
140 let age_cutoff = Utc::now() - Duration::hours(min_age_hours);
141
142 #[derive(diesel::QueryableByName, Debug)]
143 struct BoundaryRow {
144 #[diesel(sql_type = BigInt)]
145 last_command_id: i64,
146 }
147
148 let snapshot_boundary: i64 = sql_query(
149 "SELECT COALESCE(\
150 (SELECT last_command_id FROM engine_snapshot_boundary WHERE id = 1), \
151 0\
152 ) AS last_command_id",
153 )
154 .get_result::<BoundaryRow>(&mut conn)
155 .map(|r| r.last_command_id)
156 .unwrap_or(0);
157
158 if snapshot_boundary <= 0 {
159 warn!(
160 age_cutoff = %age_cutoff,
161 "engine journal archival disabled: no snapshot boundary persisted yet"
162 );
163 return Ok(ArchivalBoundary {
164 max_command_id: 0,
165 age_cutoff,
166 });
167 }
168
169 info!(
170 snapshot_boundary,
171 age_cutoff = %age_cutoff,
172 "archival boundary: engine_commands up to snapshot last_command_id"
173 );
174
175 Ok(ArchivalBoundary {
176 max_command_id: snapshot_boundary,
177 age_cutoff,
178 })
179 }
180
181 fn fetch_events_batch(
182 &self,
183 max_command_id: i64,
184 age_cutoff: &DateTime<Utc>,
185 batch_size: i64,
186 after_event_id: Option<i64>,
187 ) -> Result<Vec<ArchivedEvent>> {
188 let mut conn = self.pool().get().context("failed to get connection")?;
189
190 let mut query = engine_events::table
191 .into_boxed()
192 .filter(engine_events::command_id.le(max_command_id))
193 .filter(engine_events::created_at.lt(age_cutoff));
194 if let Some(after_event_id) = after_event_id {
195 query = query.filter(engine_events::event_id.gt(after_event_id));
196 }
197
198 let rows = query
199 .order(engine_events::event_id.asc())
200 .limit(batch_size)
201 .select(EventRow::as_select())
202 .load::<EventRow>(&mut conn)
203 .context("failed to fetch engine_events batch")?;
204
205 Ok(rows.into_iter().map(ArchivedEvent::from).collect())
206 }
207
208 fn fetch_orphan_commands_batch(
209 &self,
210 max_command_id: i64,
211 age_cutoff: &DateTime<Utc>,
212 batch_size: i64,
213 after_command_id: Option<i64>,
214 ) -> Result<Vec<ArchivedCommand>> {
215 let mut conn = self.pool().get().context("failed to get connection")?;
216
217 let rows = sql_query(
218 r#"
219 SELECT ec.command_id, ec.received_ts_ms, ec.command_data, ec.response_data,
220 ec.order_id, ec.request_uuid, ec.command_type_enum, ec.created_at
221 FROM engine_commands ec
222 WHERE NOT EXISTS (SELECT 1 FROM engine_events ee WHERE ee.command_id = ec.command_id)
223 AND ec.command_id <= $1
224 AND ec.created_at < $2
225 AND ($3 IS NULL OR ec.command_id > $3)
226 ORDER BY ec.command_id ASC
227 LIMIT $4
228 "#,
229 )
230 .bind::<BigInt, _>(max_command_id)
231 .bind::<diesel::sql_types::Timestamptz, _>(age_cutoff)
232 .bind::<Nullable<BigInt>, _>(after_command_id)
233 .bind::<BigInt, _>(batch_size)
234 .load::<CommandRow>(&mut conn)
235 .context("failed to fetch orphan engine_commands batch")?;
236
237 Ok(rows.into_iter().map(ArchivedCommand::from).collect())
238 }
239
240 fn fetch_order_actions_batch(
241 &self,
242 age_cutoff: &DateTime<Utc>,
243 batch_size: i64,
244 after_id: Option<i32>,
245 ) -> Result<Vec<ArchivedOrderAction>> {
246 let mut conn = self.pool().get().context("failed to get connection")?;
247 let cutoff_naive = age_cutoff.naive_utc();
248
249 let mut query = order_actions::table
250 .into_boxed()
251 .filter(order_actions::created_at.lt(cutoff_naive));
252 if let Some(after_id) = after_id {
253 query = query.filter(order_actions::id.gt(Some(after_id)));
254 }
255
256 let rows = query
257 .order(order_actions::id.asc())
258 .limit(batch_size)
259 .select(OrderActionRecord::as_select())
260 .load::<OrderActionRecord>(&mut conn)
261 .context("failed to fetch order_actions batch")?;
262
263 Ok(rows.into_iter().map(ArchivedOrderAction::from).collect())
264 }
265
266 fn fetch_order_updates_batch(
267 &self,
268 age_cutoff: &DateTime<Utc>,
269 batch_size: i64,
270 after_id: Option<i32>,
271 ) -> Result<Vec<ArchivedOrderUpdate>> {
272 let mut conn = self.pool().get().context("failed to get connection")?;
273 let cutoff_naive = age_cutoff.naive_utc();
274 let cutoff_ms = age_cutoff.timestamp_millis();
275
276 let mut query = order_updates::table.into_boxed().filter(
277 order_updates::created_at
278 .lt(cutoff_naive)
279 .or(order_updates::created_at
280 .is_null()
281 .and(order_updates::timestamp.lt(cutoff_ms))),
282 );
283 if let Some(after_id) = after_id {
284 query = query.filter(order_updates::id.gt(Some(after_id)));
285 }
286
287 let rows = query
288 .order(order_updates::id.asc())
289 .limit(batch_size)
290 .select(OrderUpdateRecord::as_select())
291 .load::<OrderUpdateRecord>(&mut conn)
292 .context("failed to fetch order_updates batch")?;
293
294 Ok(rows.into_iter().map(ArchivedOrderUpdate::from).collect())
295 }
296}
297
298impl ArchiverWriter for DatabaseHandler {
303 fn delete_events(&self, event_ids: &[i64]) -> Result<usize> {
304 let mut conn = self.pool().get().context("failed to get connection")?;
305
306 let deleted =
307 diesel::delete(engine_events::table.filter(engine_events::event_id.eq_any(event_ids)))
308 .execute(&mut conn)
309 .context("failed to delete engine_events")?;
310
311 Ok(deleted)
312 }
313
314 fn delete_commands(&self, command_ids: &[i64]) -> Result<usize> {
315 let mut conn = self.pool().get().context("failed to get connection")?;
316
317 let deleted = conn
318 .transaction::<_, diesel::result::Error, _>(|conn| {
319 diesel::delete(
321 engine_command_digests::table
322 .filter(engine_command_digests::command_id.eq_any(command_ids)),
323 )
324 .execute(conn)?;
325
326 diesel::delete(
327 engine_commands::table.filter(engine_commands::command_id.eq_any(command_ids)),
328 )
329 .execute(conn)
330 })
331 .context("failed to delete engine_commands")?;
332
333 Ok(deleted)
334 }
335
336 fn delete_order_actions(&self, ids: &[i32]) -> Result<usize> {
337 let mut conn = self.pool().get().context("failed to get connection")?;
338
339 let deleted = diesel::delete(order_actions::table.filter(order_actions::id.eq_any(ids)))
340 .execute(&mut conn)
341 .context("failed to delete order_actions")?;
342
343 Ok(deleted)
344 }
345
346 fn delete_order_updates(&self, ids: &[i32]) -> Result<usize> {
347 let mut conn = self.pool().get().context("failed to get connection")?;
348
349 let deleted = diesel::delete(order_updates::table.filter(order_updates::id.eq_any(ids)))
350 .execute(&mut conn)
351 .context("failed to delete order_updates")?;
352
353 Ok(deleted)
354 }
355
356 fn delete_notifications_before(
357 &self,
358 cutoff: &DateTime<Utc>,
359 batch_size: i64,
360 ) -> Result<usize> {
361 let mut conn = self.pool().get().context("failed to get connection")?;
362
363 let sql = r#"
364 DELETE FROM notifications
365 WHERE id IN (
366 SELECT id FROM notifications
367 WHERE created_at < $1
368 ORDER BY id ASC
369 LIMIT $2
370 )
371 "#;
372 let deleted = diesel::sql_query(sql)
373 .bind::<diesel::sql_types::Timestamptz, _>(cutoff)
374 .bind::<BigInt, _>(batch_size)
375 .execute(&mut conn)
376 .context("failed to delete notifications batch")?;
377
378 Ok(deleted)
379 }
380
381 fn delete_notifications_over_per_user_cap(
382 &self,
383 max_per_user: i64,
384 batch_size: i64,
385 ) -> Result<usize> {
386 let mut conn = self.pool().get().context("failed to get connection")?;
387
388 let sql = r#"
389 DELETE FROM notifications
390 WHERE id IN (
391 SELECT id FROM (
392 SELECT id, ROW_NUMBER() OVER (PARTITION BY wallet_address ORDER BY id DESC) AS rn
393 FROM notifications
394 ) sub
395 WHERE sub.rn > $1
396 LIMIT $2
397 )
398 "#;
399 let deleted = diesel::sql_query(sql)
400 .bind::<BigInt, _>(max_per_user)
401 .bind::<BigInt, _>(batch_size)
402 .execute(&mut conn)
403 .context("failed to delete notifications over per-user cap")?;
404
405 Ok(deleted)
406 }
407}
408
409#[allow(dead_code)]
410fn _unused_notifications_schema_guard() {
411 let _ = notifications::table;
412}
413
414#[cfg(test)]
415mod tests {
416 use crate::test_helpers::TestDb;
417 use chrono::{Duration, Utc};
418 use diesel::prelude::*;
419 use diesel::sql_query;
420 use diesel::sql_types::BigInt;
421 use hypercall_db::*;
422
423 fn seed_snapshot_boundary(test_db: &TestDb, last_command_id: i64) {
425 let mut conn = test_db.handler.pool().get().unwrap();
426 sql_query(
427 "INSERT INTO engine_snapshot_boundary (id, last_command_id, updated_at) \
428 VALUES (1, $1, NOW()) \
429 ON CONFLICT (id) DO UPDATE SET last_command_id = EXCLUDED.last_command_id, updated_at = NOW()",
430 )
431 .bind::<BigInt, _>(last_command_id)
432 .execute(&mut conn)
433 .unwrap();
434 }
435
436 fn seed_engine_command(test_db: &TestDb, command_id: i64, created_at_hours_ago: i64) {
438 let mut conn = test_db.handler.pool().get().unwrap();
439 let ts = Utc::now() - Duration::hours(created_at_hours_ago);
440 sql_query(
441 "INSERT INTO engine_commands (command_id, received_ts_ms, request_uuid, created_at, command_type_enum) \
442 VALUES ($1, $2, gen_random_uuid(), $3, 'CreateOrder')",
443 )
444 .bind::<BigInt, _>(command_id)
445 .bind::<BigInt, _>(command_id * 1000)
446 .bind::<diesel::sql_types::Timestamptz, _>(ts)
447 .execute(&mut conn)
448 .unwrap();
449 }
450
451 fn seed_engine_event(
455 test_db: &TestDb,
456 event_id: i64,
457 command_id: i64,
458 created_at_hours_ago: i64,
459 ) {
460 let mut conn = test_db.handler.pool().get().unwrap();
461 let ts = Utc::now() - Duration::hours(created_at_hours_ago);
462 sql_query(
463 "INSERT INTO engine_events (event_id, command_id, event_idx, event_data, created_at, publish_attempts, event_type_enum) \
464 VALUES ($1, $2, $1::int, E'\\\\x00', $3, 0, 'OrderAction')",
465 )
466 .bind::<BigInt, _>(event_id)
467 .bind::<BigInt, _>(command_id)
468 .bind::<diesel::sql_types::Timestamptz, _>(ts)
469 .execute(&mut conn)
470 .unwrap();
471 }
472
473 fn seed_command_digest(test_db: &TestDb, command_id: i64) {
475 let mut conn = test_db.handler.pool().get().unwrap();
476 sql_query(
477 "INSERT INTO engine_command_digests (command_id, duration_ms, created_at, pre_digest_data, post_digest_data) \
478 VALUES ($1, 10, NOW(), E'\\\\x00', E'\\\\x00')",
479 )
480 .bind::<BigInt, _>(command_id)
481 .execute(&mut conn)
482 .unwrap();
483 }
484
485 fn seed_order_action(test_db: &TestDb, created_at_hours_ago: i64) -> i32 {
487 let mut conn = test_db.handler.pool().get().unwrap();
488 let ts = (Utc::now() - Duration::hours(created_at_hours_ago)).naive_utc();
489 #[derive(diesel::QueryableByName)]
490 struct IdRow {
491 #[diesel(sql_type = diesel::sql_types::Integer)]
492 id: i32,
493 }
494 let row = sql_query(
495 "INSERT INTO order_actions (timestamp, wallet, action, symbol, price, size, side, tif, created_at) \
496 VALUES (1000, E'\\\\x0000000000000000000000000000000000000001', 'place', 'BTC-25DEC26-100000-C', 0.05, 1, 'buy', 'GTC', $1) \
497 RETURNING id",
498 )
499 .bind::<diesel::sql_types::Timestamp, _>(ts)
500 .get_result::<IdRow>(&mut conn)
501 .unwrap();
502 row.id
503 }
504
505 fn seed_order_update(test_db: &TestDb, created_at_hours_ago: i64) -> i32 {
507 let mut conn = test_db.handler.pool().get().unwrap();
508 let ts = (Utc::now() - Duration::hours(created_at_hours_ago)).naive_utc();
509 #[derive(diesel::QueryableByName)]
510 struct IdRow {
511 #[diesel(sql_type = diesel::sql_types::Integer)]
512 id: i32,
513 }
514 let row = sql_query(
515 "INSERT INTO order_updates (timestamp, status, filled_size, symbol, price, size, side, tif, created_at) \
516 VALUES (1000, 'FILLED', 1, 'BTC-25DEC26-100000-C', 0.05, 1, 'buy', 'GTC', $1) \
517 RETURNING id",
518 )
519 .bind::<diesel::sql_types::Timestamp, _>(ts)
520 .get_result::<IdRow>(&mut conn)
521 .unwrap();
522 row.id
523 }
524
525 fn seed_notification(test_db: &TestDb, wallet: &str, created_at_hours_ago: i64) -> i64 {
527 let mut conn = test_db.handler.pool().get().unwrap();
528 let ts = Utc::now() - Duration::hours(created_at_hours_ago);
529 #[derive(diesel::QueryableByName)]
530 struct IdRow {
531 #[diesel(sql_type = diesel::sql_types::BigInt)]
532 id: i64,
533 }
534 let row = sql_query(
535 "INSERT INTO notifications (wallet_address, notification_type, payload, created_at) \
536 VALUES ($1, 'order_filled', E'\\\\x00', $2) \
537 RETURNING id",
538 )
539 .bind::<diesel::sql_types::Text, _>(wallet)
540 .bind::<diesel::sql_types::Timestamptz, _>(ts)
541 .get_result::<IdRow>(&mut conn)
542 .unwrap();
543 row.id
544 }
545
546 #[tokio::test]
551 async fn find_safe_boundary_returns_snapshot_boundary() {
552 let test_db = TestDb::new().await.unwrap();
553 seed_snapshot_boundary(&test_db, 500);
554
555 let reader: &dyn ArchiverReader = test_db.handler.as_ref();
556 let boundary = reader.find_safe_boundary(24).unwrap();
557 assert_eq!(boundary.max_command_id, 500);
558 }
559
560 #[tokio::test]
561 async fn find_safe_boundary_no_snapshot_returns_zero() {
562 let test_db = TestDb::new().await.unwrap();
563 let reader: &dyn ArchiverReader = test_db.handler.as_ref();
564 let boundary = reader.find_safe_boundary(24).unwrap();
565 assert_eq!(boundary.max_command_id, 0);
566 }
567
568 #[tokio::test]
569 async fn fetch_events_batch_filters_by_command_id_and_age() {
570 let test_db = TestDb::new().await.unwrap();
571 seed_engine_command(&test_db, 1, 48);
573 seed_engine_event(&test_db, 1, 1, 48);
574 seed_engine_command(&test_db, 100, 48);
576 seed_engine_event(&test_db, 2, 100, 48);
577 seed_engine_command(&test_db, 3, 1);
579 seed_engine_event(&test_db, 3, 3, 1);
580
581 let reader: &dyn ArchiverReader = test_db.handler.as_ref();
582 let age_cutoff = Utc::now() - Duration::hours(24);
583 let batch = reader
584 .fetch_events_batch(50, &age_cutoff, 100, None)
585 .unwrap();
586 assert_eq!(batch.len(), 1);
588 assert_eq!(batch[0].event_id, 1);
589 assert_eq!(batch[0].command_id, 1);
590 }
591
592 #[tokio::test]
593 async fn fetch_events_batch_respects_after_event_id() {
594 let test_db = TestDb::new().await.unwrap();
595 seed_engine_command(&test_db, 1, 48);
596 seed_engine_event(&test_db, 10, 1, 48);
597 seed_engine_event(&test_db, 20, 1, 48);
598
599 let reader: &dyn ArchiverReader = test_db.handler.as_ref();
600 let age_cutoff = Utc::now() - Duration::hours(24);
601 let batch = reader
602 .fetch_events_batch(100, &age_cutoff, 100, Some(10))
603 .unwrap();
604 assert_eq!(batch.len(), 1);
605 assert_eq!(batch[0].event_id, 20);
606 }
607
608 #[tokio::test]
609 async fn delete_events_removes_matching_rows() {
610 let test_db = TestDb::new().await.unwrap();
611 seed_engine_command(&test_db, 1, 48);
612 seed_engine_event(&test_db, 10, 1, 48);
613 seed_engine_event(&test_db, 20, 1, 48);
614
615 let writer: &dyn ArchiverWriter = test_db.handler.as_ref();
616 let deleted = writer.delete_events(&[10]).unwrap();
617 assert_eq!(deleted, 1);
618
619 let reader: &dyn ArchiverReader = test_db.handler.as_ref();
621 let age_cutoff = Utc::now() - Duration::hours(1);
622 let remaining = reader
623 .fetch_events_batch(100, &age_cutoff, 100, None)
624 .unwrap();
625 assert_eq!(remaining.len(), 1);
626 assert_eq!(remaining[0].event_id, 20);
627 }
628
629 #[tokio::test]
630 async fn fetch_orphan_commands_batch_returns_commands_without_events() {
631 let test_db = TestDb::new().await.unwrap();
632 seed_engine_command(&test_db, 1, 48);
634 seed_engine_event(&test_db, 1, 1, 48);
635 seed_engine_command(&test_db, 2, 48);
637
638 let reader: &dyn ArchiverReader = test_db.handler.as_ref();
639 let age_cutoff = Utc::now() - Duration::hours(24);
640 let batch = reader
641 .fetch_orphan_commands_batch(100, &age_cutoff, 100, None)
642 .unwrap();
643 assert_eq!(batch.len(), 1);
644 assert_eq!(batch[0].command_id, 2);
645 }
646
647 #[tokio::test]
648 async fn delete_commands_cascades_to_digests() {
649 let test_db = TestDb::new().await.unwrap();
650 seed_engine_command(&test_db, 5, 48);
651 seed_command_digest(&test_db, 5);
652
653 let writer: &dyn ArchiverWriter = test_db.handler.as_ref();
654 let deleted = writer.delete_commands(&[5]).unwrap();
655 assert_eq!(deleted, 1);
656
657 let reader: &dyn ArchiverReader = test_db.handler.as_ref();
659 let age_cutoff = Utc::now() - Duration::hours(1);
660 let orphans = reader
661 .fetch_orphan_commands_batch(100, &age_cutoff, 100, None)
662 .unwrap();
663 assert!(orphans.is_empty());
664 }
665
666 #[tokio::test]
667 async fn fetch_order_actions_batch_filters_by_age() {
668 let test_db = TestDb::new().await.unwrap();
669 let _old_id = seed_order_action(&test_db, 48);
670 let _new_id = seed_order_action(&test_db, 1);
671
672 let reader: &dyn ArchiverReader = test_db.handler.as_ref();
673 let age_cutoff = Utc::now() - Duration::hours(24);
674 let batch = reader
675 .fetch_order_actions_batch(&age_cutoff, 100, None)
676 .unwrap();
677 assert_eq!(batch.len(), 1);
678 }
679
680 #[tokio::test]
681 async fn delete_order_actions_removes_rows() {
682 let test_db = TestDb::new().await.unwrap();
683 let id = seed_order_action(&test_db, 48);
684
685 let writer: &dyn ArchiverWriter = test_db.handler.as_ref();
686 let deleted = writer.delete_order_actions(&[id]).unwrap();
687 assert_eq!(deleted, 1);
688
689 let reader: &dyn ArchiverReader = test_db.handler.as_ref();
691 let age_cutoff = Utc::now() - Duration::hours(1);
692 let remaining = reader
693 .fetch_order_actions_batch(&age_cutoff, 100, None)
694 .unwrap();
695 assert!(remaining.is_empty());
696 }
697
698 #[tokio::test]
699 async fn fetch_order_updates_batch_filters_by_age() {
700 let test_db = TestDb::new().await.unwrap();
701 let _old_id = seed_order_update(&test_db, 48);
702 let _new_id = seed_order_update(&test_db, 1);
703
704 let reader: &dyn ArchiverReader = test_db.handler.as_ref();
705 let age_cutoff = Utc::now() - Duration::hours(24);
706 let batch = reader
707 .fetch_order_updates_batch(&age_cutoff, 100, None)
708 .unwrap();
709 assert_eq!(batch.len(), 1);
710 }
711
712 #[tokio::test]
713 async fn delete_order_updates_removes_rows() {
714 let test_db = TestDb::new().await.unwrap();
715 let id = seed_order_update(&test_db, 48);
716
717 let writer: &dyn ArchiverWriter = test_db.handler.as_ref();
718 let deleted = writer.delete_order_updates(&[id]).unwrap();
719 assert_eq!(deleted, 1);
720
721 let reader: &dyn ArchiverReader = test_db.handler.as_ref();
722 let age_cutoff = Utc::now() - Duration::hours(1);
723 let remaining = reader
724 .fetch_order_updates_batch(&age_cutoff, 100, None)
725 .unwrap();
726 assert!(remaining.is_empty());
727 }
728
729 #[tokio::test]
730 async fn delete_notifications_before_removes_old_rows() {
731 let test_db = TestDb::new().await.unwrap();
732 let _old = seed_notification(&test_db, "0xabc", 48);
733 let _new = seed_notification(&test_db, "0xabc", 1);
734
735 let writer: &dyn ArchiverWriter = test_db.handler.as_ref();
736 let cutoff = Utc::now() - Duration::hours(24);
737 let deleted = writer.delete_notifications_before(&cutoff, 100).unwrap();
738 assert_eq!(deleted, 1);
739 }
740
741 #[tokio::test]
742 async fn delete_notifications_over_per_user_cap_enforces_limit() {
743 let test_db = TestDb::new().await.unwrap();
744 for i in 0..5 {
746 seed_notification(&test_db, "0xwallet1", i);
747 }
748
749 let writer: &dyn ArchiverWriter = test_db.handler.as_ref();
750 let deleted = writer
752 .delete_notifications_over_per_user_cap(2, 100)
753 .unwrap();
754 assert_eq!(deleted, 3);
755 }
756}