1use crate::{DatabaseHandler, DieselDb};
2use anyhow::{anyhow, ensure, Result};
3use async_trait::async_trait;
4use diesel::sql_types::{Array, BigInt, Binary, Nullable, Text};
5use diesel::{Connection, ExpressionMethods, OptionalExtension, QueryDsl, RunQueryDsl};
6use diesel_async::AsyncConnection;
7use hypercall_transaction_submitter_core::{SubmissionStatus, SubmittedNonce, SubmitterId};
8use hypercall_transaction_submitter_db::{
9 AsyncTransactionSubmitterStore, PendingSubmissionRow, SubmissionAttemptRecord,
10 SubmissionDetailRow, SubmissionRecord, TransactionSubmitterReader, TransactionSubmitterStore,
11};
12
13fn submission_status_from_str(value: &str) -> Result<SubmissionStatus> {
14 match value {
15 "not_found" => Ok(SubmissionStatus::NotFound),
16 "pending" => Ok(SubmissionStatus::Pending),
17 "confirming" => Ok(SubmissionStatus::Confirming),
18 "confirmed" => Ok(SubmissionStatus::Confirmed),
19 "failed" => Ok(SubmissionStatus::Failed),
20 "created" => Ok(SubmissionStatus::Created),
21 "broadcasted" => Ok(SubmissionStatus::Broadcasted),
22 other => panic!("corrupt persisted transaction submitter status {other}"),
23 }
24}
25
26fn nonce_from_i64(value: i64) -> Result<SubmittedNonce> {
27 u64::try_from(value).map_err(|_| anyhow!("negative transaction submitter nonce {value}"))
28}
29
30fn nonce_to_i64(nonce: SubmittedNonce) -> Result<i64> {
31 i64::try_from(nonce).map_err(|_| anyhow!("submitter nonce does not fit i64: {nonce}"))
32}
33
34fn address_from_bytes(bytes: &[u8], label: &str) -> Result<SubmitterId> {
35 let arr: [u8; 20] = bytes.try_into().map_err(|_| {
36 anyhow!(
37 "invalid transaction submitter {} address length {}, expected 20",
38 label,
39 bytes.len()
40 )
41 })?;
42 Ok(SubmitterId::from(arr))
43}
44
45impl TransactionSubmitterStore for DatabaseHandler {
46 fn max_nonce_for_submitter_sync(&self, submitter: &SubmitterId) -> Result<Option<u64>> {
47 use crate::schema::transaction_submitter_submissions::dsl as submissions_dsl;
48
49 let mut conn = self.pool().get()?;
50 let max_nonce: Option<i64> = submissions_dsl::transaction_submitter_submissions
51 .filter(submissions_dsl::submitter_address.eq(submitter.as_slice()))
52 .filter(submissions_dsl::primary_tx_hash.is_not_null())
53 .select(diesel::dsl::max(submissions_dsl::nonce))
54 .first(&mut conn)?;
55
56 max_nonce.map(nonce_from_i64).transpose()
57 }
58
59 fn record_submission_sync(
60 &self,
61 record: &SubmissionRecord,
62 attempts: &[SubmissionAttemptRecord],
63 ) -> Result<()> {
64 #[derive(diesel::QueryableByName)]
65 struct SubmissionIdRow {
66 #[diesel(sql_type = BigInt)]
67 submission_id: i64,
68 }
69
70 let mut conn = self.pool().get()?;
71 let nonce = nonce_to_i64(record.nonce)?;
72 conn.transaction::<_, diesel::result::Error, _>(|conn| {
73 let row = diesel::sql_query(
74 r#"
75 INSERT INTO transaction_submitter_submissions (
76 submitter_address,
77 nonce,
78 status,
79 primary_tx_hash,
80 terminal_error
81 )
82 VALUES ($1, $2, $3, $4, $5)
83 ON CONFLICT (submitter_address, nonce) DO UPDATE
84 SET status = EXCLUDED.status,
85 primary_tx_hash = COALESCE(EXCLUDED.primary_tx_hash, transaction_submitter_submissions.primary_tx_hash),
86 terminal_error = EXCLUDED.terminal_error,
87 updated_at = CURRENT_TIMESTAMP
88 RETURNING submission_id
89 "#,
90 )
91 .bind::<Binary, _>(record.submitter.as_slice())
92 .bind::<BigInt, _>(nonce)
93 .bind::<Text, _>(record.status.as_str())
94 .bind::<Nullable<Text>, _>(record.primary_tx_hash.as_deref())
95 .bind::<Nullable<Text>, _>(record.terminal_error.as_deref())
96 .get_result::<SubmissionIdRow>(conn)?;
97
98 if !attempts.is_empty() {
99 use crate::schema::transaction_submitter_attempts::dsl as attempts_dsl;
100
101 let attempt_values = attempts
102 .iter()
103 .map(|attempt| {
104 (
105 attempts_dsl::submission_id.eq(row.submission_id),
106 attempts_dsl::tx_hash.eq(attempt.tx_hash.as_str()),
107 attempts_dsl::raw_tx.eq(attempt.raw_tx.as_slice()),
108 )
109 })
110 .collect::<Vec<_>>();
111
112 diesel::insert_into(attempts_dsl::transaction_submitter_attempts)
113 .values(&attempt_values)
114 .on_conflict(attempts_dsl::tx_hash)
115 .do_nothing()
116 .execute(conn)?;
117 }
118
119 Ok(())
120 })?;
121 Ok(())
122 }
123
124 fn update_submission_status_sync(
125 &self,
126 submitter: &SubmitterId,
127 nonce: SubmittedNonce,
128 status: SubmissionStatus,
129 primary_tx_hash: Option<&str>,
130 terminal_error: Option<&str>,
131 ) -> Result<()> {
132 use crate::schema::transaction_submitter_submissions::dsl as submissions_dsl;
133
134 let mut conn = self.pool().get()?;
135 let nonce_i64 = nonce_to_i64(nonce)?;
136 let target = submissions_dsl::transaction_submitter_submissions
137 .filter(submissions_dsl::submitter_address.eq(submitter.as_slice()))
138 .filter(submissions_dsl::nonce.eq(nonce_i64));
139
140 let updated = if let Some(primary_tx_hash) = primary_tx_hash {
141 diesel::update(target)
142 .set((
143 submissions_dsl::status.eq(status.as_str()),
144 submissions_dsl::primary_tx_hash.eq(Some(primary_tx_hash)),
145 submissions_dsl::terminal_error.eq(terminal_error),
146 submissions_dsl::updated_at.eq(diesel::dsl::now),
147 ))
148 .execute(&mut conn)?
149 } else {
150 diesel::update(target)
151 .set((
152 submissions_dsl::status.eq(status.as_str()),
153 submissions_dsl::terminal_error.eq(terminal_error),
154 submissions_dsl::updated_at.eq(diesel::dsl::now),
155 ))
156 .execute(&mut conn)?
157 };
158 ensure!(
159 updated == 1,
160 "expected one transaction_submitter_submissions row for submitter={} nonce={} while setting status={} primary_tx_hash={:?}, updated {}",
161 submitter,
162 nonce_i64,
163 status.as_str(),
164 primary_tx_hash,
165 updated
166 );
167
168 Ok(())
169 }
170
171 fn list_pending_submissions_sync(
172 &self,
173 after_submission_id: i64,
174 limit: i64,
175 ) -> Result<Vec<PendingSubmissionRow>> {
176 #[derive(diesel::QueryableByName)]
177 struct Row {
178 #[diesel(sql_type = BigInt)]
179 submission_id: i64,
180 #[diesel(sql_type = Binary)]
181 submitter_address: Vec<u8>,
182 #[diesel(sql_type = BigInt)]
183 nonce: i64,
184 #[diesel(sql_type = Array<Text>)]
185 tx_hashes: Vec<String>,
186 #[diesel(sql_type = Nullable<Text>)]
187 directive_id: Option<String>,
188 }
189
190 let mut conn = self.pool().get()?;
191 let rows = diesel::sql_query(
192 r#"
193 SELECT s.submission_id,
194 s.submitter_address,
195 s.nonce,
196 COALESCE(
197 array_agg(a.tx_hash ORDER BY a.attempt_id)
198 FILTER (WHERE a.tx_hash IS NOT NULL),
199 ARRAY[]::TEXT[]
200 ) AS tx_hashes,
201 MAX(d.directive_id) AS directive_id
202 FROM transaction_submitter_submissions s
203 LEFT JOIN transaction_submitter_attempts a
204 ON a.submission_id = s.submission_id
205 LEFT JOIN directive_outbox d
206 ON d.submitter_address = s.submitter_address
207 AND d.submitter_nonce = s.nonce
208 WHERE s.submission_id > $1
209 AND s.status IN ('broadcasted', 'pending', 'confirming')
210 GROUP BY s.submission_id, s.submitter_address, s.nonce
211 ORDER BY s.submission_id ASC
212 LIMIT $2
213 "#,
214 )
215 .bind::<BigInt, _>(after_submission_id)
216 .bind::<BigInt, _>(limit.max(1))
217 .get_results::<Row>(&mut conn)?;
218
219 rows.into_iter()
220 .map(|row| {
221 Ok(PendingSubmissionRow {
222 submission_id: row.submission_id,
223 submitter: address_from_bytes(&row.submitter_address, "submitter")?,
224 nonce: nonce_from_i64(row.nonce)?,
225 tx_hashes: row.tx_hashes,
226 directive_id: row.directive_id,
227 })
228 })
229 .collect()
230 }
231}
232
233#[async_trait]
234impl AsyncTransactionSubmitterStore for DieselDb {
235 async fn max_nonce_for_submitter(&self, submitter: &SubmitterId) -> Result<Option<u64>> {
236 use crate::schema::transaction_submitter_submissions::dsl as submissions_dsl;
237
238 let mut conn = self.get_conn().await?;
239 let query = submissions_dsl::transaction_submitter_submissions
240 .filter(submissions_dsl::submitter_address.eq(submitter.as_slice()))
241 .filter(submissions_dsl::primary_tx_hash.is_not_null())
242 .select(diesel::dsl::max(submissions_dsl::nonce));
243 let max_nonce: Option<i64> = diesel_async::RunQueryDsl::first(query, &mut conn).await?;
244
245 max_nonce.map(nonce_from_i64).transpose()
246 }
247
248 async fn record_submission(
249 &self,
250 record: &SubmissionRecord,
251 attempts: &[SubmissionAttemptRecord],
252 ) -> Result<()> {
253 #[derive(diesel::QueryableByName)]
254 struct SubmissionIdRow {
255 #[diesel(sql_type = BigInt)]
256 submission_id: i64,
257 }
258
259 let mut conn = self.get_conn().await?;
260 let nonce = nonce_to_i64(record.nonce)?;
261 conn.transaction::<_, diesel::result::Error, _>(async |conn| {
262 let query = diesel::sql_query(
263 r#"
264 INSERT INTO transaction_submitter_submissions (
265 submitter_address,
266 nonce,
267 status,
268 primary_tx_hash,
269 terminal_error
270 )
271 VALUES ($1, $2, $3, $4, $5)
272 ON CONFLICT (submitter_address, nonce) DO UPDATE
273 SET status = EXCLUDED.status,
274 primary_tx_hash = COALESCE(EXCLUDED.primary_tx_hash, transaction_submitter_submissions.primary_tx_hash),
275 terminal_error = EXCLUDED.terminal_error,
276 updated_at = CURRENT_TIMESTAMP
277 RETURNING submission_id
278 "#,
279 )
280 .bind::<Binary, _>(record.submitter.as_slice())
281 .bind::<BigInt, _>(nonce)
282 .bind::<Text, _>(record.status.as_str())
283 .bind::<Nullable<Text>, _>(record.primary_tx_hash.as_deref())
284 .bind::<Nullable<Text>, _>(record.terminal_error.as_deref());
285
286 let row =
287 diesel_async::RunQueryDsl::get_result::<SubmissionIdRow>(query, &mut *conn)
288 .await?;
289
290 if !attempts.is_empty() {
291 use crate::schema::transaction_submitter_attempts::dsl as attempts_dsl;
292
293 let attempt_values = attempts
294 .iter()
295 .map(|attempt| {
296 (
297 attempts_dsl::submission_id.eq(row.submission_id),
298 attempts_dsl::tx_hash.eq(attempt.tx_hash.as_str()),
299 attempts_dsl::raw_tx.eq(attempt.raw_tx.as_slice()),
300 )
301 })
302 .collect::<Vec<_>>();
303
304 let query = diesel::insert_into(attempts_dsl::transaction_submitter_attempts)
305 .values(&attempt_values)
306 .on_conflict(attempts_dsl::tx_hash)
307 .do_nothing();
308 diesel_async::RunQueryDsl::execute(query, &mut *conn).await?;
309 }
310
311 Ok(())
312 })
313 .await?;
314
315 Ok(())
316 }
317
318 async fn update_submission_status(
319 &self,
320 submitter: &SubmitterId,
321 nonce: SubmittedNonce,
322 status: SubmissionStatus,
323 primary_tx_hash: Option<&str>,
324 terminal_error: Option<&str>,
325 ) -> Result<()> {
326 use crate::schema::transaction_submitter_submissions::dsl as submissions_dsl;
327
328 let mut conn = self.get_conn().await?;
329 let nonce_i64 = nonce_to_i64(nonce)?;
330 let target = submissions_dsl::transaction_submitter_submissions
331 .filter(submissions_dsl::submitter_address.eq(submitter.as_slice()))
332 .filter(submissions_dsl::nonce.eq(nonce_i64));
333
334 let updated = if let Some(primary_tx_hash) = primary_tx_hash {
335 let query = diesel::update(target).set((
336 submissions_dsl::status.eq(status.as_str()),
337 submissions_dsl::primary_tx_hash.eq(Some(primary_tx_hash)),
338 submissions_dsl::terminal_error.eq(terminal_error),
339 submissions_dsl::updated_at.eq(diesel::dsl::now),
340 ));
341 diesel_async::RunQueryDsl::execute(query, &mut conn).await?
342 } else {
343 let query = diesel::update(target).set((
344 submissions_dsl::status.eq(status.as_str()),
345 submissions_dsl::terminal_error.eq(terminal_error),
346 submissions_dsl::updated_at.eq(diesel::dsl::now),
347 ));
348 diesel_async::RunQueryDsl::execute(query, &mut conn).await?
349 };
350 ensure!(
351 updated == 1,
352 "expected one transaction_submitter_submissions row for submitter={} nonce={} while setting status={} primary_tx_hash={:?}, updated {}",
353 submitter,
354 nonce_i64,
355 status.as_str(),
356 primary_tx_hash,
357 updated
358 );
359 Ok(())
360 }
361
362 async fn list_pending_submissions(
363 &self,
364 after_submission_id: i64,
365 limit: i64,
366 ) -> Result<Vec<PendingSubmissionRow>> {
367 #[derive(diesel::QueryableByName)]
368 struct Row {
369 #[diesel(sql_type = BigInt)]
370 submission_id: i64,
371 #[diesel(sql_type = Binary)]
372 submitter_address: Vec<u8>,
373 #[diesel(sql_type = BigInt)]
374 nonce: i64,
375 #[diesel(sql_type = Array<Text>)]
376 tx_hashes: Vec<String>,
377 #[diesel(sql_type = Nullable<Text>)]
378 directive_id: Option<String>,
379 }
380
381 let mut conn = self.get_conn().await?;
382 let query = diesel::sql_query(
383 r#"
384 SELECT s.submission_id,
385 s.submitter_address,
386 s.nonce,
387 COALESCE(
388 array_agg(a.tx_hash ORDER BY a.attempt_id)
389 FILTER (WHERE a.tx_hash IS NOT NULL),
390 ARRAY[]::TEXT[]
391 ) AS tx_hashes,
392 MAX(d.directive_id) AS directive_id
393 FROM transaction_submitter_submissions s
394 LEFT JOIN transaction_submitter_attempts a
395 ON a.submission_id = s.submission_id
396 LEFT JOIN directive_outbox d
397 ON d.submitter_address = s.submitter_address
398 AND d.submitter_nonce = s.nonce
399 WHERE s.submission_id > $1
400 AND s.status IN ('broadcasted', 'pending', 'confirming')
401 GROUP BY s.submission_id, s.submitter_address, s.nonce
402 ORDER BY s.submission_id ASC
403 LIMIT $2
404 "#,
405 )
406 .bind::<BigInt, _>(after_submission_id)
407 .bind::<BigInt, _>(limit.max(1));
408 let rows = diesel_async::RunQueryDsl::get_results::<Row>(query, &mut conn).await?;
409
410 rows.into_iter()
411 .map(|row| {
412 Ok(PendingSubmissionRow {
413 submission_id: row.submission_id,
414 submitter: address_from_bytes(&row.submitter_address, "submitter")?,
415 nonce: nonce_from_i64(row.nonce)?,
416 tx_hashes: row.tx_hashes,
417 directive_id: row.directive_id,
418 })
419 })
420 .collect()
421 }
422}
423
424#[async_trait]
425impl TransactionSubmitterReader for DieselDb {
426 async fn get_submission_by_nonce(
427 &self,
428 submitter: &SubmitterId,
429 nonce: SubmittedNonce,
430 ) -> Result<Option<SubmissionDetailRow>> {
431 #[derive(diesel::QueryableByName)]
432 struct Row {
433 #[diesel(sql_type = Binary)]
434 submitter_address: Vec<u8>,
435 #[diesel(sql_type = BigInt)]
436 nonce: i64,
437 #[diesel(sql_type = Text)]
438 status: String,
439 #[diesel(sql_type = Nullable<Text>)]
440 primary_tx_hash: Option<String>,
441 #[diesel(sql_type = Nullable<Text>)]
442 terminal_error: Option<String>,
443 #[diesel(sql_type = Array<Text>)]
444 tx_hashes: Vec<String>,
445 #[diesel(sql_type = Array<Binary>)]
446 raw_txs: Vec<Vec<u8>>,
447 }
448
449 let mut conn = self.get_conn().await?;
450 let query = diesel::sql_query(
451 r#"
452 SELECT s.submitter_address,
453 s.nonce,
454 s.status,
455 s.primary_tx_hash,
456 s.terminal_error,
457 COALESCE(
458 array_agg(a.tx_hash ORDER BY a.attempt_id)
459 FILTER (WHERE a.tx_hash IS NOT NULL),
460 ARRAY[]::TEXT[]
461 ) AS tx_hashes,
462 COALESCE(
463 array_agg(a.raw_tx ORDER BY a.attempt_id)
464 FILTER (WHERE a.raw_tx IS NOT NULL),
465 ARRAY[]::BYTEA[]
466 ) AS raw_txs
467 FROM transaction_submitter_submissions s
468 LEFT JOIN transaction_submitter_attempts a
469 ON a.submission_id = s.submission_id
470 WHERE s.submitter_address = $1
471 AND s.nonce = $2
472 GROUP BY s.submission_id, s.submitter_address, s.nonce, s.status, s.primary_tx_hash, s.terminal_error
473 "#,
474 )
475 .bind::<Binary, _>(submitter.as_slice())
476 .bind::<BigInt, _>(nonce_to_i64(nonce)?);
477
478 let row = diesel_async::RunQueryDsl::get_result::<Row>(query, &mut conn)
479 .await
480 .optional()?;
481
482 row.map(|row| {
483 Ok(SubmissionDetailRow {
484 submitter: address_from_bytes(&row.submitter_address, "submitter")?,
485 nonce: nonce_from_i64(row.nonce)?,
486 status: submission_status_from_str(&row.status)?,
487 primary_tx_hash: row.primary_tx_hash,
488 terminal_error: row.terminal_error,
489 attempts: row
490 .tx_hashes
491 .into_iter()
492 .zip(row.raw_txs)
493 .map(|(tx_hash, raw_tx)| SubmissionAttemptRecord { tx_hash, raw_tx })
494 .collect(),
495 })
496 })
497 .transpose()
498 }
499}
500
501#[cfg(test)]
502mod tests {
503 use super::*;
504 use crate::test_helpers::TestDb;
505 use alloy::primitives::Address;
506
507 fn submitter(byte: u8) -> SubmitterId {
508 SubmitterId::from(Address::repeat_byte(byte))
509 }
510
511 fn record(byte: u8, nonce: u64, tx_hash: &str) -> SubmissionRecord {
512 SubmissionRecord {
513 submitter: submitter(byte),
514 nonce: nonce,
515 status: SubmissionStatus::Pending,
516 primary_tx_hash: Some(tx_hash.to_string()),
517 terminal_error: None,
518 }
519 }
520
521 fn attempt(tx_hash: &str) -> SubmissionAttemptRecord {
522 SubmissionAttemptRecord {
523 tx_hash: tx_hash.to_string(),
524 raw_tx: format!("raw:{tx_hash}").into_bytes(),
525 }
526 }
527
528 #[tokio::test]
529 async fn sync_record_submission_persists_submission_and_attempts() {
530 let db = TestDb::new().await.expect("test db");
531 let record = record(
532 1,
533 7,
534 "0x1111111111111111111111111111111111111111111111111111111111111111",
535 );
536 let attempts = vec![
537 attempt("0xaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"),
538 attempt("0xbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb"),
539 ];
540
541 db.handler
542 .record_submission_sync(&record, &attempts)
543 .expect("record submission");
544 db.handler
545 .record_submission_sync(&record, &attempts)
546 .expect("record submission idempotently");
547
548 let diesel_db = db.diesel_db().await;
549 let detail = diesel_db
550 .get_submission_by_nonce(&record.submitter, record.nonce)
551 .await
552 .expect("submission detail")
553 .expect("submission exists");
554
555 assert_eq!(detail.submitter, record.submitter);
556 assert_eq!(detail.nonce, record.nonce);
557 assert_eq!(detail.status, SubmissionStatus::Pending);
558 assert_eq!(detail.attempts, attempts);
559 }
560
561 #[tokio::test]
562 async fn sync_max_nonce_for_submitter_returns_highest_nonce_for_address() {
563 let db = TestDb::new().await.expect("test db");
564 let submitter = submitter(1);
565
566 let empty = db
567 .handler
568 .max_nonce_for_submitter_sync(&submitter)
569 .expect("empty max nonce lookup");
570 assert_eq!(empty, None);
571
572 db.handler
573 .record_submission_sync(
574 &record(
575 1,
576 3,
577 "0x0303030303030303030303030303030303030303030303030303030303030303",
578 ),
579 &[],
580 )
581 .expect("record lower nonce");
582 db.handler
583 .record_submission_sync(
584 &record(
585 1,
586 11,
587 "0x1111111111111111111111111111111111111111111111111111111111111111",
588 ),
589 &[],
590 )
591 .expect("record higher nonce");
592 db.handler
593 .record_submission_sync(
594 &record(
595 9,
596 99,
597 "0x9999999999999999999999999999999999999999999999999999999999999999",
598 ),
599 &[],
600 )
601 .expect("record different submitter");
602
603 let max_nonce = db
604 .handler
605 .max_nonce_for_submitter_sync(&submitter)
606 .expect("max nonce lookup");
607 assert_eq!(max_nonce, Some(11));
608 }
609
610 #[tokio::test]
611 async fn sync_max_nonce_for_submitter_ignores_unbroadcast_rows() {
612 let db = TestDb::new().await.expect("test db");
613 let submitter = submitter(1);
614
615 db.handler
616 .record_submission_sync(
617 &record(
618 1,
619 3,
620 "0x0303030303030303030303030303030303030303030303030303030303030303",
621 ),
622 &[],
623 )
624 .expect("record broadcast nonce");
625 db.handler
626 .record_submission_sync(
627 &SubmissionRecord {
628 submitter,
629 nonce: 11,
630 status: SubmissionStatus::Created,
631 primary_tx_hash: None,
632 terminal_error: None,
633 },
634 &[],
635 )
636 .expect("record created nonce");
637 db.handler
638 .update_submission_status_sync(
639 &submitter,
640 11,
641 SubmissionStatus::Failed,
642 None,
643 Some("rpc rejected before broadcast"),
644 )
645 .expect("mark unbroadcast nonce failed");
646
647 let max_nonce = db
648 .handler
649 .max_nonce_for_submitter_sync(&submitter)
650 .expect("max nonce lookup");
651 assert_eq!(max_nonce, Some(3));
652 }
653
654 #[tokio::test]
655 async fn sync_update_submission_status_updates_terminal_state_without_clearing_hash() {
656 let db = TestDb::new().await.expect("test db");
657 let record = record(
658 2,
659 8,
660 "0x2222222222222222222222222222222222222222222222222222222222222222",
661 );
662 db.handler
663 .record_submission_sync(&record, &[])
664 .expect("record submission");
665
666 db.handler
667 .update_submission_status_sync(
668 &record.submitter,
669 record.nonce,
670 SubmissionStatus::Failed,
671 None,
672 Some("reverted"),
673 )
674 .expect("update submission status");
675
676 let diesel_db = db.diesel_db().await;
677 let detail = diesel_db
678 .get_submission_by_nonce(&record.submitter, record.nonce)
679 .await
680 .expect("submission detail")
681 .expect("submission exists");
682
683 assert_eq!(detail.status, SubmissionStatus::Failed);
684 assert_eq!(detail.primary_tx_hash, record.primary_tx_hash);
685 assert_eq!(detail.terminal_error.as_deref(), Some("reverted"));
686 }
687
688 #[tokio::test]
689 async fn sync_update_submission_status_errors_for_missing_nonce() {
690 let db = TestDb::new().await.expect("test db");
691 let submitter = submitter(2);
692
693 let err = db
694 .handler
695 .update_submission_status_sync(
696 &submitter,
697 404,
698 SubmissionStatus::Failed,
699 None,
700 Some("missing"),
701 )
702 .expect_err("missing submission update must fail");
703
704 assert!(
705 err.to_string()
706 .contains("expected one transaction_submitter_submissions row"),
707 "{err}"
708 );
709 }
710
711 #[tokio::test]
712 async fn sync_list_pending_submissions_returns_pending_rows_with_attempts() {
713 let db = TestDb::new().await.expect("test db");
714 let pending = record(
715 3,
716 9,
717 "0x3333333333333333333333333333333333333333333333333333333333333333",
718 );
719 let confirmed = record(
720 4,
721 10,
722 "0x4444444444444444444444444444444444444444444444444444444444444444",
723 );
724 let attempts = vec![attempt(
725 "0xcccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccc",
726 )];
727
728 db.handler
729 .record_submission_sync(&pending, &attempts)
730 .expect("record pending");
731 db.handler
732 .record_submission_sync(&confirmed, &[])
733 .expect("record confirmed seed");
734 db.handler
735 .update_submission_status_sync(
736 &confirmed.submitter,
737 confirmed.nonce,
738 SubmissionStatus::Confirmed,
739 confirmed.primary_tx_hash.as_deref(),
740 None,
741 )
742 .expect("mark confirmed");
743
744 let rows = db
745 .handler
746 .list_pending_submissions_sync(0, 10)
747 .expect("pending submissions");
748
749 assert_eq!(rows.len(), 1);
750 assert_eq!(rows[0].submitter, pending.submitter);
751 assert_eq!(rows[0].nonce, pending.nonce);
752 assert_eq!(rows[0].tx_hashes, vec![attempts[0].tx_hash.clone()]);
753 }
754
755 #[tokio::test]
756 async fn sync_list_pending_submissions_excludes_created_rows_without_broadcast_attempts() {
757 let db = TestDb::new().await.expect("test db");
758 let created = SubmissionRecord {
759 submitter: submitter(3),
760 nonce: 17,
761 status: SubmissionStatus::Created,
762 primary_tx_hash: None,
763 terminal_error: None,
764 };
765
766 db.handler
767 .record_submission_sync(&created, &[])
768 .expect("record created submission");
769
770 let rows = db
771 .handler
772 .list_pending_submissions_sync(0, 10)
773 .expect("pending submissions");
774
775 assert!(rows.is_empty());
776 }
777
778 #[tokio::test]
779 async fn async_record_submission_persists_submission_and_attempts() {
780 let db = TestDb::new().await.expect("test db");
781 let diesel_db = db.diesel_db().await;
782 let record = record(
783 5,
784 11,
785 "0x5555555555555555555555555555555555555555555555555555555555555555",
786 );
787 let attempts = vec![attempt(
788 "0xdddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddd",
789 )];
790
791 diesel_db
792 .record_submission(&record, &attempts)
793 .await
794 .expect("record submission");
795
796 let detail = diesel_db
797 .get_submission_by_nonce(&record.submitter, record.nonce)
798 .await
799 .expect("submission detail")
800 .expect("submission exists");
801
802 assert_eq!(detail.status, SubmissionStatus::Pending);
803 assert_eq!(detail.attempts, attempts);
804 }
805
806 #[tokio::test]
807 async fn async_max_nonce_for_submitter_returns_highest_nonce_for_address() {
808 let db = TestDb::new().await.expect("test db");
809 let diesel_db = db.diesel_db().await;
810 let submitter = submitter(5);
811
812 let empty = diesel_db
813 .max_nonce_for_submitter(&submitter)
814 .await
815 .expect("empty max nonce lookup");
816 assert_eq!(empty, None);
817
818 diesel_db
819 .record_submission(
820 &record(
821 5,
822 4,
823 "0x0404040404040404040404040404040404040404040404040404040404040404",
824 ),
825 &[],
826 )
827 .await
828 .expect("record lower nonce");
829 diesel_db
830 .record_submission(
831 &record(
832 5,
833 12,
834 "0x1212121212121212121212121212121212121212121212121212121212121212",
835 ),
836 &[],
837 )
838 .await
839 .expect("record higher nonce");
840 diesel_db
841 .record_submission(
842 &record(
843 10,
844 100,
845 "0x1010101010101010101010101010101010101010101010101010101010101010",
846 ),
847 &[],
848 )
849 .await
850 .expect("record different submitter");
851
852 let max_nonce = diesel_db
853 .max_nonce_for_submitter(&submitter)
854 .await
855 .expect("max nonce lookup");
856 assert_eq!(max_nonce, Some(12));
857 }
858
859 #[tokio::test]
860 async fn async_max_nonce_for_submitter_ignores_unbroadcast_rows() {
861 let db = TestDb::new().await.expect("test db");
862 let diesel_db = db.diesel_db().await;
863 let submitter = submitter(5);
864
865 diesel_db
866 .record_submission(
867 &record(
868 5,
869 4,
870 "0x0404040404040404040404040404040404040404040404040404040404040404",
871 ),
872 &[],
873 )
874 .await
875 .expect("record broadcast nonce");
876 diesel_db
877 .record_submission(
878 &SubmissionRecord {
879 submitter,
880 nonce: 12,
881 status: SubmissionStatus::Created,
882 primary_tx_hash: None,
883 terminal_error: None,
884 },
885 &[],
886 )
887 .await
888 .expect("record created nonce");
889 diesel_db
890 .update_submission_status(
891 &submitter,
892 12,
893 SubmissionStatus::Failed,
894 None,
895 Some("rpc rejected before broadcast"),
896 )
897 .await
898 .expect("mark unbroadcast nonce failed");
899
900 let max_nonce = diesel_db
901 .max_nonce_for_submitter(&submitter)
902 .await
903 .expect("max nonce lookup");
904 assert_eq!(max_nonce, Some(4));
905 }
906
907 #[tokio::test]
908 async fn async_update_submission_status_updates_terminal_state_without_clearing_hash() {
909 let db = TestDb::new().await.expect("test db");
910 let diesel_db = db.diesel_db().await;
911 let record = record(
912 6,
913 12,
914 "0x6666666666666666666666666666666666666666666666666666666666666666",
915 );
916 diesel_db
917 .record_submission(&record, &[])
918 .await
919 .expect("record submission");
920
921 diesel_db
922 .update_submission_status(
923 &record.submitter,
924 record.nonce,
925 SubmissionStatus::Failed,
926 None,
927 Some("timeout"),
928 )
929 .await
930 .expect("update submission");
931
932 let detail = diesel_db
933 .get_submission_by_nonce(&record.submitter, record.nonce)
934 .await
935 .expect("submission detail")
936 .expect("submission exists");
937
938 assert_eq!(detail.status, SubmissionStatus::Failed);
939 assert_eq!(detail.primary_tx_hash, record.primary_tx_hash);
940 assert_eq!(detail.terminal_error.as_deref(), Some("timeout"));
941 }
942
943 #[tokio::test]
944 async fn async_update_submission_status_errors_for_missing_nonce() {
945 let db = TestDb::new().await.expect("test db");
946 let diesel_db = db.diesel_db().await;
947 let submitter = submitter(6);
948
949 let err = diesel_db
950 .update_submission_status(
951 &submitter,
952 405,
953 SubmissionStatus::Failed,
954 None,
955 Some("missing"),
956 )
957 .await
958 .expect_err("missing submission update must fail");
959
960 assert!(
961 err.to_string()
962 .contains("expected one transaction_submitter_submissions row"),
963 "{err}"
964 );
965 }
966
967 #[tokio::test]
968 async fn async_list_pending_submissions_returns_pending_rows_with_attempts() {
969 let db = TestDb::new().await.expect("test db");
970 let diesel_db = db.diesel_db().await;
971 let pending = record(
972 7,
973 13,
974 "0x7777777777777777777777777777777777777777777777777777777777777777",
975 );
976 let attempts = vec![attempt(
977 "0xeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeee",
978 )];
979
980 diesel_db
981 .record_submission(&pending, &attempts)
982 .await
983 .expect("record pending");
984
985 let rows = diesel_db
986 .list_pending_submissions(0, 10)
987 .await
988 .expect("pending submissions");
989
990 assert_eq!(rows.len(), 1);
991 assert_eq!(rows[0].submitter, pending.submitter);
992 assert_eq!(rows[0].nonce, pending.nonce);
993 assert_eq!(rows[0].tx_hashes, vec![attempts[0].tx_hash.clone()]);
994 }
995
996 #[tokio::test]
997 async fn get_submission_by_nonce_returns_none_for_unknown_nonce() {
998 let db = TestDb::new().await.expect("test db");
999 let diesel_db = db.diesel_db().await;
1000
1001 let detail = diesel_db
1002 .get_submission_by_nonce(&submitter(8), 14)
1003 .await
1004 .expect("lookup unknown submission");
1005
1006 assert!(detail.is_none());
1007 }
1008}