Skip to main content

hypercall_db_diesel/
transaction_submitter.rs

1use crate::{DatabaseHandler, DieselDb};
2use anyhow::{anyhow, ensure, Result};
3use async_trait::async_trait;
4use diesel::sql_types::{Array, BigInt, Binary, Nullable, Text};
5use diesel::{Connection, ExpressionMethods, OptionalExtension, QueryDsl, RunQueryDsl};
6use diesel_async::AsyncConnection;
7use hypercall_transaction_submitter_core::{SubmissionStatus, SubmittedNonce, SubmitterId};
8use hypercall_transaction_submitter_db::{
9    AsyncTransactionSubmitterStore, PendingSubmissionRow, SubmissionAttemptRecord,
10    SubmissionDetailRow, SubmissionRecord, TransactionSubmitterReader, TransactionSubmitterStore,
11};
12
13fn submission_status_from_str(value: &str) -> Result<SubmissionStatus> {
14    match value {
15        "not_found" => Ok(SubmissionStatus::NotFound),
16        "pending" => Ok(SubmissionStatus::Pending),
17        "confirming" => Ok(SubmissionStatus::Confirming),
18        "confirmed" => Ok(SubmissionStatus::Confirmed),
19        "failed" => Ok(SubmissionStatus::Failed),
20        "created" => Ok(SubmissionStatus::Created),
21        "broadcasted" => Ok(SubmissionStatus::Broadcasted),
22        other => panic!("corrupt persisted transaction submitter status {other}"),
23    }
24}
25
26fn nonce_from_i64(value: i64) -> Result<SubmittedNonce> {
27    u64::try_from(value).map_err(|_| anyhow!("negative transaction submitter nonce {value}"))
28}
29
30fn nonce_to_i64(nonce: SubmittedNonce) -> Result<i64> {
31    i64::try_from(nonce).map_err(|_| anyhow!("submitter nonce does not fit i64: {nonce}"))
32}
33
34fn address_from_bytes(bytes: &[u8], label: &str) -> Result<SubmitterId> {
35    let arr: [u8; 20] = bytes.try_into().map_err(|_| {
36        anyhow!(
37            "invalid transaction submitter {} address length {}, expected 20",
38            label,
39            bytes.len()
40        )
41    })?;
42    Ok(SubmitterId::from(arr))
43}
44
45impl TransactionSubmitterStore for DatabaseHandler {
46    fn max_nonce_for_submitter_sync(&self, submitter: &SubmitterId) -> Result<Option<u64>> {
47        use crate::schema::transaction_submitter_submissions::dsl as submissions_dsl;
48
49        let mut conn = self.pool().get()?;
50        let max_nonce: Option<i64> = submissions_dsl::transaction_submitter_submissions
51            .filter(submissions_dsl::submitter_address.eq(submitter.as_slice()))
52            .filter(submissions_dsl::primary_tx_hash.is_not_null())
53            .select(diesel::dsl::max(submissions_dsl::nonce))
54            .first(&mut conn)?;
55
56        max_nonce.map(nonce_from_i64).transpose()
57    }
58
59    fn record_submission_sync(
60        &self,
61        record: &SubmissionRecord,
62        attempts: &[SubmissionAttemptRecord],
63    ) -> Result<()> {
64        #[derive(diesel::QueryableByName)]
65        struct SubmissionIdRow {
66            #[diesel(sql_type = BigInt)]
67            submission_id: i64,
68        }
69
70        let mut conn = self.pool().get()?;
71        let nonce = nonce_to_i64(record.nonce)?;
72        conn.transaction::<_, diesel::result::Error, _>(|conn| {
73            let row = diesel::sql_query(
74                r#"
75                INSERT INTO transaction_submitter_submissions (
76                    submitter_address,
77                    nonce,
78                    status,
79                    primary_tx_hash,
80                    terminal_error
81                )
82                VALUES ($1, $2, $3, $4, $5)
83                ON CONFLICT (submitter_address, nonce) DO UPDATE
84                SET status = EXCLUDED.status,
85                    primary_tx_hash = COALESCE(EXCLUDED.primary_tx_hash, transaction_submitter_submissions.primary_tx_hash),
86                    terminal_error = EXCLUDED.terminal_error,
87                    updated_at = CURRENT_TIMESTAMP
88                RETURNING submission_id
89                "#,
90            )
91            .bind::<Binary, _>(record.submitter.as_slice())
92            .bind::<BigInt, _>(nonce)
93            .bind::<Text, _>(record.status.as_str())
94            .bind::<Nullable<Text>, _>(record.primary_tx_hash.as_deref())
95            .bind::<Nullable<Text>, _>(record.terminal_error.as_deref())
96            .get_result::<SubmissionIdRow>(conn)?;
97
98            if !attempts.is_empty() {
99                use crate::schema::transaction_submitter_attempts::dsl as attempts_dsl;
100
101                let attempt_values = attempts
102                    .iter()
103                    .map(|attempt| {
104                        (
105                            attempts_dsl::submission_id.eq(row.submission_id),
106                            attempts_dsl::tx_hash.eq(attempt.tx_hash.as_str()),
107                            attempts_dsl::raw_tx.eq(attempt.raw_tx.as_slice()),
108                        )
109                    })
110                    .collect::<Vec<_>>();
111
112                diesel::insert_into(attempts_dsl::transaction_submitter_attempts)
113                    .values(&attempt_values)
114                    .on_conflict(attempts_dsl::tx_hash)
115                    .do_nothing()
116                    .execute(conn)?;
117            }
118
119            Ok(())
120        })?;
121        Ok(())
122    }
123
124    fn update_submission_status_sync(
125        &self,
126        submitter: &SubmitterId,
127        nonce: SubmittedNonce,
128        status: SubmissionStatus,
129        primary_tx_hash: Option<&str>,
130        terminal_error: Option<&str>,
131    ) -> Result<()> {
132        use crate::schema::transaction_submitter_submissions::dsl as submissions_dsl;
133
134        let mut conn = self.pool().get()?;
135        let nonce_i64 = nonce_to_i64(nonce)?;
136        let target = submissions_dsl::transaction_submitter_submissions
137            .filter(submissions_dsl::submitter_address.eq(submitter.as_slice()))
138            .filter(submissions_dsl::nonce.eq(nonce_i64));
139
140        let updated = if let Some(primary_tx_hash) = primary_tx_hash {
141            diesel::update(target)
142                .set((
143                    submissions_dsl::status.eq(status.as_str()),
144                    submissions_dsl::primary_tx_hash.eq(Some(primary_tx_hash)),
145                    submissions_dsl::terminal_error.eq(terminal_error),
146                    submissions_dsl::updated_at.eq(diesel::dsl::now),
147                ))
148                .execute(&mut conn)?
149        } else {
150            diesel::update(target)
151                .set((
152                    submissions_dsl::status.eq(status.as_str()),
153                    submissions_dsl::terminal_error.eq(terminal_error),
154                    submissions_dsl::updated_at.eq(diesel::dsl::now),
155                ))
156                .execute(&mut conn)?
157        };
158        ensure!(
159            updated == 1,
160            "expected one transaction_submitter_submissions row for submitter={} nonce={} while setting status={} primary_tx_hash={:?}, updated {}",
161            submitter,
162            nonce_i64,
163            status.as_str(),
164            primary_tx_hash,
165            updated
166        );
167
168        Ok(())
169    }
170
171    fn list_pending_submissions_sync(
172        &self,
173        after_submission_id: i64,
174        limit: i64,
175    ) -> Result<Vec<PendingSubmissionRow>> {
176        #[derive(diesel::QueryableByName)]
177        struct Row {
178            #[diesel(sql_type = BigInt)]
179            submission_id: i64,
180            #[diesel(sql_type = Binary)]
181            submitter_address: Vec<u8>,
182            #[diesel(sql_type = BigInt)]
183            nonce: i64,
184            #[diesel(sql_type = Array<Text>)]
185            tx_hashes: Vec<String>,
186            #[diesel(sql_type = Nullable<Text>)]
187            directive_id: Option<String>,
188        }
189
190        let mut conn = self.pool().get()?;
191        let rows = diesel::sql_query(
192            r#"
193            SELECT s.submission_id,
194                   s.submitter_address,
195                   s.nonce,
196                   COALESCE(
197                       array_agg(a.tx_hash ORDER BY a.attempt_id)
198                           FILTER (WHERE a.tx_hash IS NOT NULL),
199                       ARRAY[]::TEXT[]
200                   ) AS tx_hashes,
201                   MAX(d.directive_id) AS directive_id
202            FROM transaction_submitter_submissions s
203            LEFT JOIN transaction_submitter_attempts a
204              ON a.submission_id = s.submission_id
205            LEFT JOIN directive_outbox d
206              ON d.submitter_address = s.submitter_address
207             AND d.submitter_nonce = s.nonce
208            WHERE s.submission_id > $1
209              AND s.status IN ('broadcasted', 'pending', 'confirming')
210            GROUP BY s.submission_id, s.submitter_address, s.nonce
211            ORDER BY s.submission_id ASC
212            LIMIT $2
213            "#,
214        )
215        .bind::<BigInt, _>(after_submission_id)
216        .bind::<BigInt, _>(limit.max(1))
217        .get_results::<Row>(&mut conn)?;
218
219        rows.into_iter()
220            .map(|row| {
221                Ok(PendingSubmissionRow {
222                    submission_id: row.submission_id,
223                    submitter: address_from_bytes(&row.submitter_address, "submitter")?,
224                    nonce: nonce_from_i64(row.nonce)?,
225                    tx_hashes: row.tx_hashes,
226                    directive_id: row.directive_id,
227                })
228            })
229            .collect()
230    }
231}
232
233#[async_trait]
234impl AsyncTransactionSubmitterStore for DieselDb {
235    async fn max_nonce_for_submitter(&self, submitter: &SubmitterId) -> Result<Option<u64>> {
236        use crate::schema::transaction_submitter_submissions::dsl as submissions_dsl;
237
238        let mut conn = self.get_conn().await?;
239        let query = submissions_dsl::transaction_submitter_submissions
240            .filter(submissions_dsl::submitter_address.eq(submitter.as_slice()))
241            .filter(submissions_dsl::primary_tx_hash.is_not_null())
242            .select(diesel::dsl::max(submissions_dsl::nonce));
243        let max_nonce: Option<i64> = diesel_async::RunQueryDsl::first(query, &mut conn).await?;
244
245        max_nonce.map(nonce_from_i64).transpose()
246    }
247
248    async fn record_submission(
249        &self,
250        record: &SubmissionRecord,
251        attempts: &[SubmissionAttemptRecord],
252    ) -> Result<()> {
253        #[derive(diesel::QueryableByName)]
254        struct SubmissionIdRow {
255            #[diesel(sql_type = BigInt)]
256            submission_id: i64,
257        }
258
259        let mut conn = self.get_conn().await?;
260        let nonce = nonce_to_i64(record.nonce)?;
261        conn.transaction::<_, diesel::result::Error, _>(async |conn| {
262                let query = diesel::sql_query(
263                    r#"
264                    INSERT INTO transaction_submitter_submissions (
265                        submitter_address,
266                        nonce,
267                        status,
268                        primary_tx_hash,
269                        terminal_error
270                    )
271                    VALUES ($1, $2, $3, $4, $5)
272                    ON CONFLICT (submitter_address, nonce) DO UPDATE
273                    SET status = EXCLUDED.status,
274                        primary_tx_hash = COALESCE(EXCLUDED.primary_tx_hash, transaction_submitter_submissions.primary_tx_hash),
275                        terminal_error = EXCLUDED.terminal_error,
276                        updated_at = CURRENT_TIMESTAMP
277                    RETURNING submission_id
278                    "#,
279                )
280                .bind::<Binary, _>(record.submitter.as_slice())
281                .bind::<BigInt, _>(nonce)
282                .bind::<Text, _>(record.status.as_str())
283                .bind::<Nullable<Text>, _>(record.primary_tx_hash.as_deref())
284                .bind::<Nullable<Text>, _>(record.terminal_error.as_deref());
285
286                let row =
287                    diesel_async::RunQueryDsl::get_result::<SubmissionIdRow>(query, &mut *conn)
288                        .await?;
289
290                if !attempts.is_empty() {
291                    use crate::schema::transaction_submitter_attempts::dsl as attempts_dsl;
292
293                    let attempt_values = attempts
294                        .iter()
295                        .map(|attempt| {
296                            (
297                                attempts_dsl::submission_id.eq(row.submission_id),
298                                attempts_dsl::tx_hash.eq(attempt.tx_hash.as_str()),
299                                attempts_dsl::raw_tx.eq(attempt.raw_tx.as_slice()),
300                            )
301                        })
302                        .collect::<Vec<_>>();
303
304                    let query = diesel::insert_into(attempts_dsl::transaction_submitter_attempts)
305                        .values(&attempt_values)
306                        .on_conflict(attempts_dsl::tx_hash)
307                        .do_nothing();
308                    diesel_async::RunQueryDsl::execute(query, &mut *conn).await?;
309                }
310
311                Ok(())
312        })
313        .await?;
314
315        Ok(())
316    }
317
318    async fn update_submission_status(
319        &self,
320        submitter: &SubmitterId,
321        nonce: SubmittedNonce,
322        status: SubmissionStatus,
323        primary_tx_hash: Option<&str>,
324        terminal_error: Option<&str>,
325    ) -> Result<()> {
326        use crate::schema::transaction_submitter_submissions::dsl as submissions_dsl;
327
328        let mut conn = self.get_conn().await?;
329        let nonce_i64 = nonce_to_i64(nonce)?;
330        let target = submissions_dsl::transaction_submitter_submissions
331            .filter(submissions_dsl::submitter_address.eq(submitter.as_slice()))
332            .filter(submissions_dsl::nonce.eq(nonce_i64));
333
334        let updated = if let Some(primary_tx_hash) = primary_tx_hash {
335            let query = diesel::update(target).set((
336                submissions_dsl::status.eq(status.as_str()),
337                submissions_dsl::primary_tx_hash.eq(Some(primary_tx_hash)),
338                submissions_dsl::terminal_error.eq(terminal_error),
339                submissions_dsl::updated_at.eq(diesel::dsl::now),
340            ));
341            diesel_async::RunQueryDsl::execute(query, &mut conn).await?
342        } else {
343            let query = diesel::update(target).set((
344                submissions_dsl::status.eq(status.as_str()),
345                submissions_dsl::terminal_error.eq(terminal_error),
346                submissions_dsl::updated_at.eq(diesel::dsl::now),
347            ));
348            diesel_async::RunQueryDsl::execute(query, &mut conn).await?
349        };
350        ensure!(
351            updated == 1,
352            "expected one transaction_submitter_submissions row for submitter={} nonce={} while setting status={} primary_tx_hash={:?}, updated {}",
353            submitter,
354            nonce_i64,
355            status.as_str(),
356            primary_tx_hash,
357            updated
358        );
359        Ok(())
360    }
361
362    async fn list_pending_submissions(
363        &self,
364        after_submission_id: i64,
365        limit: i64,
366    ) -> Result<Vec<PendingSubmissionRow>> {
367        #[derive(diesel::QueryableByName)]
368        struct Row {
369            #[diesel(sql_type = BigInt)]
370            submission_id: i64,
371            #[diesel(sql_type = Binary)]
372            submitter_address: Vec<u8>,
373            #[diesel(sql_type = BigInt)]
374            nonce: i64,
375            #[diesel(sql_type = Array<Text>)]
376            tx_hashes: Vec<String>,
377            #[diesel(sql_type = Nullable<Text>)]
378            directive_id: Option<String>,
379        }
380
381        let mut conn = self.get_conn().await?;
382        let query = diesel::sql_query(
383            r#"
384            SELECT s.submission_id,
385                   s.submitter_address,
386                   s.nonce,
387                   COALESCE(
388                       array_agg(a.tx_hash ORDER BY a.attempt_id)
389                           FILTER (WHERE a.tx_hash IS NOT NULL),
390                       ARRAY[]::TEXT[]
391                   ) AS tx_hashes,
392                   MAX(d.directive_id) AS directive_id
393            FROM transaction_submitter_submissions s
394            LEFT JOIN transaction_submitter_attempts a
395              ON a.submission_id = s.submission_id
396            LEFT JOIN directive_outbox d
397              ON d.submitter_address = s.submitter_address
398             AND d.submitter_nonce = s.nonce
399            WHERE s.submission_id > $1
400              AND s.status IN ('broadcasted', 'pending', 'confirming')
401            GROUP BY s.submission_id, s.submitter_address, s.nonce
402            ORDER BY s.submission_id ASC
403            LIMIT $2
404            "#,
405        )
406        .bind::<BigInt, _>(after_submission_id)
407        .bind::<BigInt, _>(limit.max(1));
408        let rows = diesel_async::RunQueryDsl::get_results::<Row>(query, &mut conn).await?;
409
410        rows.into_iter()
411            .map(|row| {
412                Ok(PendingSubmissionRow {
413                    submission_id: row.submission_id,
414                    submitter: address_from_bytes(&row.submitter_address, "submitter")?,
415                    nonce: nonce_from_i64(row.nonce)?,
416                    tx_hashes: row.tx_hashes,
417                    directive_id: row.directive_id,
418                })
419            })
420            .collect()
421    }
422}
423
424#[async_trait]
425impl TransactionSubmitterReader for DieselDb {
426    async fn get_submission_by_nonce(
427        &self,
428        submitter: &SubmitterId,
429        nonce: SubmittedNonce,
430    ) -> Result<Option<SubmissionDetailRow>> {
431        #[derive(diesel::QueryableByName)]
432        struct Row {
433            #[diesel(sql_type = Binary)]
434            submitter_address: Vec<u8>,
435            #[diesel(sql_type = BigInt)]
436            nonce: i64,
437            #[diesel(sql_type = Text)]
438            status: String,
439            #[diesel(sql_type = Nullable<Text>)]
440            primary_tx_hash: Option<String>,
441            #[diesel(sql_type = Nullable<Text>)]
442            terminal_error: Option<String>,
443            #[diesel(sql_type = Array<Text>)]
444            tx_hashes: Vec<String>,
445            #[diesel(sql_type = Array<Binary>)]
446            raw_txs: Vec<Vec<u8>>,
447        }
448
449        let mut conn = self.get_conn().await?;
450        let query = diesel::sql_query(
451            r#"
452            SELECT s.submitter_address,
453                   s.nonce,
454                   s.status,
455                   s.primary_tx_hash,
456                   s.terminal_error,
457                   COALESCE(
458                       array_agg(a.tx_hash ORDER BY a.attempt_id)
459                           FILTER (WHERE a.tx_hash IS NOT NULL),
460                       ARRAY[]::TEXT[]
461                   ) AS tx_hashes,
462                   COALESCE(
463                       array_agg(a.raw_tx ORDER BY a.attempt_id)
464                           FILTER (WHERE a.raw_tx IS NOT NULL),
465                       ARRAY[]::BYTEA[]
466                   ) AS raw_txs
467            FROM transaction_submitter_submissions s
468            LEFT JOIN transaction_submitter_attempts a
469              ON a.submission_id = s.submission_id
470            WHERE s.submitter_address = $1
471              AND s.nonce = $2
472            GROUP BY s.submission_id, s.submitter_address, s.nonce, s.status, s.primary_tx_hash, s.terminal_error
473            "#,
474        )
475        .bind::<Binary, _>(submitter.as_slice())
476        .bind::<BigInt, _>(nonce_to_i64(nonce)?);
477
478        let row = diesel_async::RunQueryDsl::get_result::<Row>(query, &mut conn)
479            .await
480            .optional()?;
481
482        row.map(|row| {
483            Ok(SubmissionDetailRow {
484                submitter: address_from_bytes(&row.submitter_address, "submitter")?,
485                nonce: nonce_from_i64(row.nonce)?,
486                status: submission_status_from_str(&row.status)?,
487                primary_tx_hash: row.primary_tx_hash,
488                terminal_error: row.terminal_error,
489                attempts: row
490                    .tx_hashes
491                    .into_iter()
492                    .zip(row.raw_txs)
493                    .map(|(tx_hash, raw_tx)| SubmissionAttemptRecord { tx_hash, raw_tx })
494                    .collect(),
495            })
496        })
497        .transpose()
498    }
499}
500
501#[cfg(test)]
502mod tests {
503    use super::*;
504    use crate::test_helpers::TestDb;
505    use alloy::primitives::Address;
506
507    fn submitter(byte: u8) -> SubmitterId {
508        SubmitterId::from(Address::repeat_byte(byte))
509    }
510
511    fn record(byte: u8, nonce: u64, tx_hash: &str) -> SubmissionRecord {
512        SubmissionRecord {
513            submitter: submitter(byte),
514            nonce: nonce,
515            status: SubmissionStatus::Pending,
516            primary_tx_hash: Some(tx_hash.to_string()),
517            terminal_error: None,
518        }
519    }
520
521    fn attempt(tx_hash: &str) -> SubmissionAttemptRecord {
522        SubmissionAttemptRecord {
523            tx_hash: tx_hash.to_string(),
524            raw_tx: format!("raw:{tx_hash}").into_bytes(),
525        }
526    }
527
528    #[tokio::test]
529    async fn sync_record_submission_persists_submission_and_attempts() {
530        let db = TestDb::new().await.expect("test db");
531        let record = record(
532            1,
533            7,
534            "0x1111111111111111111111111111111111111111111111111111111111111111",
535        );
536        let attempts = vec![
537            attempt("0xaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"),
538            attempt("0xbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb"),
539        ];
540
541        db.handler
542            .record_submission_sync(&record, &attempts)
543            .expect("record submission");
544        db.handler
545            .record_submission_sync(&record, &attempts)
546            .expect("record submission idempotently");
547
548        let diesel_db = db.diesel_db().await;
549        let detail = diesel_db
550            .get_submission_by_nonce(&record.submitter, record.nonce)
551            .await
552            .expect("submission detail")
553            .expect("submission exists");
554
555        assert_eq!(detail.submitter, record.submitter);
556        assert_eq!(detail.nonce, record.nonce);
557        assert_eq!(detail.status, SubmissionStatus::Pending);
558        assert_eq!(detail.attempts, attempts);
559    }
560
561    #[tokio::test]
562    async fn sync_max_nonce_for_submitter_returns_highest_nonce_for_address() {
563        let db = TestDb::new().await.expect("test db");
564        let submitter = submitter(1);
565
566        let empty = db
567            .handler
568            .max_nonce_for_submitter_sync(&submitter)
569            .expect("empty max nonce lookup");
570        assert_eq!(empty, None);
571
572        db.handler
573            .record_submission_sync(
574                &record(
575                    1,
576                    3,
577                    "0x0303030303030303030303030303030303030303030303030303030303030303",
578                ),
579                &[],
580            )
581            .expect("record lower nonce");
582        db.handler
583            .record_submission_sync(
584                &record(
585                    1,
586                    11,
587                    "0x1111111111111111111111111111111111111111111111111111111111111111",
588                ),
589                &[],
590            )
591            .expect("record higher nonce");
592        db.handler
593            .record_submission_sync(
594                &record(
595                    9,
596                    99,
597                    "0x9999999999999999999999999999999999999999999999999999999999999999",
598                ),
599                &[],
600            )
601            .expect("record different submitter");
602
603        let max_nonce = db
604            .handler
605            .max_nonce_for_submitter_sync(&submitter)
606            .expect("max nonce lookup");
607        assert_eq!(max_nonce, Some(11));
608    }
609
610    #[tokio::test]
611    async fn sync_max_nonce_for_submitter_ignores_unbroadcast_rows() {
612        let db = TestDb::new().await.expect("test db");
613        let submitter = submitter(1);
614
615        db.handler
616            .record_submission_sync(
617                &record(
618                    1,
619                    3,
620                    "0x0303030303030303030303030303030303030303030303030303030303030303",
621                ),
622                &[],
623            )
624            .expect("record broadcast nonce");
625        db.handler
626            .record_submission_sync(
627                &SubmissionRecord {
628                    submitter,
629                    nonce: 11,
630                    status: SubmissionStatus::Created,
631                    primary_tx_hash: None,
632                    terminal_error: None,
633                },
634                &[],
635            )
636            .expect("record created nonce");
637        db.handler
638            .update_submission_status_sync(
639                &submitter,
640                11,
641                SubmissionStatus::Failed,
642                None,
643                Some("rpc rejected before broadcast"),
644            )
645            .expect("mark unbroadcast nonce failed");
646
647        let max_nonce = db
648            .handler
649            .max_nonce_for_submitter_sync(&submitter)
650            .expect("max nonce lookup");
651        assert_eq!(max_nonce, Some(3));
652    }
653
654    #[tokio::test]
655    async fn sync_update_submission_status_updates_terminal_state_without_clearing_hash() {
656        let db = TestDb::new().await.expect("test db");
657        let record = record(
658            2,
659            8,
660            "0x2222222222222222222222222222222222222222222222222222222222222222",
661        );
662        db.handler
663            .record_submission_sync(&record, &[])
664            .expect("record submission");
665
666        db.handler
667            .update_submission_status_sync(
668                &record.submitter,
669                record.nonce,
670                SubmissionStatus::Failed,
671                None,
672                Some("reverted"),
673            )
674            .expect("update submission status");
675
676        let diesel_db = db.diesel_db().await;
677        let detail = diesel_db
678            .get_submission_by_nonce(&record.submitter, record.nonce)
679            .await
680            .expect("submission detail")
681            .expect("submission exists");
682
683        assert_eq!(detail.status, SubmissionStatus::Failed);
684        assert_eq!(detail.primary_tx_hash, record.primary_tx_hash);
685        assert_eq!(detail.terminal_error.as_deref(), Some("reverted"));
686    }
687
688    #[tokio::test]
689    async fn sync_update_submission_status_errors_for_missing_nonce() {
690        let db = TestDb::new().await.expect("test db");
691        let submitter = submitter(2);
692
693        let err = db
694            .handler
695            .update_submission_status_sync(
696                &submitter,
697                404,
698                SubmissionStatus::Failed,
699                None,
700                Some("missing"),
701            )
702            .expect_err("missing submission update must fail");
703
704        assert!(
705            err.to_string()
706                .contains("expected one transaction_submitter_submissions row"),
707            "{err}"
708        );
709    }
710
711    #[tokio::test]
712    async fn sync_list_pending_submissions_returns_pending_rows_with_attempts() {
713        let db = TestDb::new().await.expect("test db");
714        let pending = record(
715            3,
716            9,
717            "0x3333333333333333333333333333333333333333333333333333333333333333",
718        );
719        let confirmed = record(
720            4,
721            10,
722            "0x4444444444444444444444444444444444444444444444444444444444444444",
723        );
724        let attempts = vec![attempt(
725            "0xcccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccc",
726        )];
727
728        db.handler
729            .record_submission_sync(&pending, &attempts)
730            .expect("record pending");
731        db.handler
732            .record_submission_sync(&confirmed, &[])
733            .expect("record confirmed seed");
734        db.handler
735            .update_submission_status_sync(
736                &confirmed.submitter,
737                confirmed.nonce,
738                SubmissionStatus::Confirmed,
739                confirmed.primary_tx_hash.as_deref(),
740                None,
741            )
742            .expect("mark confirmed");
743
744        let rows = db
745            .handler
746            .list_pending_submissions_sync(0, 10)
747            .expect("pending submissions");
748
749        assert_eq!(rows.len(), 1);
750        assert_eq!(rows[0].submitter, pending.submitter);
751        assert_eq!(rows[0].nonce, pending.nonce);
752        assert_eq!(rows[0].tx_hashes, vec![attempts[0].tx_hash.clone()]);
753    }
754
755    #[tokio::test]
756    async fn sync_list_pending_submissions_excludes_created_rows_without_broadcast_attempts() {
757        let db = TestDb::new().await.expect("test db");
758        let created = SubmissionRecord {
759            submitter: submitter(3),
760            nonce: 17,
761            status: SubmissionStatus::Created,
762            primary_tx_hash: None,
763            terminal_error: None,
764        };
765
766        db.handler
767            .record_submission_sync(&created, &[])
768            .expect("record created submission");
769
770        let rows = db
771            .handler
772            .list_pending_submissions_sync(0, 10)
773            .expect("pending submissions");
774
775        assert!(rows.is_empty());
776    }
777
778    #[tokio::test]
779    async fn async_record_submission_persists_submission_and_attempts() {
780        let db = TestDb::new().await.expect("test db");
781        let diesel_db = db.diesel_db().await;
782        let record = record(
783            5,
784            11,
785            "0x5555555555555555555555555555555555555555555555555555555555555555",
786        );
787        let attempts = vec![attempt(
788            "0xdddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddd",
789        )];
790
791        diesel_db
792            .record_submission(&record, &attempts)
793            .await
794            .expect("record submission");
795
796        let detail = diesel_db
797            .get_submission_by_nonce(&record.submitter, record.nonce)
798            .await
799            .expect("submission detail")
800            .expect("submission exists");
801
802        assert_eq!(detail.status, SubmissionStatus::Pending);
803        assert_eq!(detail.attempts, attempts);
804    }
805
806    #[tokio::test]
807    async fn async_max_nonce_for_submitter_returns_highest_nonce_for_address() {
808        let db = TestDb::new().await.expect("test db");
809        let diesel_db = db.diesel_db().await;
810        let submitter = submitter(5);
811
812        let empty = diesel_db
813            .max_nonce_for_submitter(&submitter)
814            .await
815            .expect("empty max nonce lookup");
816        assert_eq!(empty, None);
817
818        diesel_db
819            .record_submission(
820                &record(
821                    5,
822                    4,
823                    "0x0404040404040404040404040404040404040404040404040404040404040404",
824                ),
825                &[],
826            )
827            .await
828            .expect("record lower nonce");
829        diesel_db
830            .record_submission(
831                &record(
832                    5,
833                    12,
834                    "0x1212121212121212121212121212121212121212121212121212121212121212",
835                ),
836                &[],
837            )
838            .await
839            .expect("record higher nonce");
840        diesel_db
841            .record_submission(
842                &record(
843                    10,
844                    100,
845                    "0x1010101010101010101010101010101010101010101010101010101010101010",
846                ),
847                &[],
848            )
849            .await
850            .expect("record different submitter");
851
852        let max_nonce = diesel_db
853            .max_nonce_for_submitter(&submitter)
854            .await
855            .expect("max nonce lookup");
856        assert_eq!(max_nonce, Some(12));
857    }
858
859    #[tokio::test]
860    async fn async_max_nonce_for_submitter_ignores_unbroadcast_rows() {
861        let db = TestDb::new().await.expect("test db");
862        let diesel_db = db.diesel_db().await;
863        let submitter = submitter(5);
864
865        diesel_db
866            .record_submission(
867                &record(
868                    5,
869                    4,
870                    "0x0404040404040404040404040404040404040404040404040404040404040404",
871                ),
872                &[],
873            )
874            .await
875            .expect("record broadcast nonce");
876        diesel_db
877            .record_submission(
878                &SubmissionRecord {
879                    submitter,
880                    nonce: 12,
881                    status: SubmissionStatus::Created,
882                    primary_tx_hash: None,
883                    terminal_error: None,
884                },
885                &[],
886            )
887            .await
888            .expect("record created nonce");
889        diesel_db
890            .update_submission_status(
891                &submitter,
892                12,
893                SubmissionStatus::Failed,
894                None,
895                Some("rpc rejected before broadcast"),
896            )
897            .await
898            .expect("mark unbroadcast nonce failed");
899
900        let max_nonce = diesel_db
901            .max_nonce_for_submitter(&submitter)
902            .await
903            .expect("max nonce lookup");
904        assert_eq!(max_nonce, Some(4));
905    }
906
907    #[tokio::test]
908    async fn async_update_submission_status_updates_terminal_state_without_clearing_hash() {
909        let db = TestDb::new().await.expect("test db");
910        let diesel_db = db.diesel_db().await;
911        let record = record(
912            6,
913            12,
914            "0x6666666666666666666666666666666666666666666666666666666666666666",
915        );
916        diesel_db
917            .record_submission(&record, &[])
918            .await
919            .expect("record submission");
920
921        diesel_db
922            .update_submission_status(
923                &record.submitter,
924                record.nonce,
925                SubmissionStatus::Failed,
926                None,
927                Some("timeout"),
928            )
929            .await
930            .expect("update submission");
931
932        let detail = diesel_db
933            .get_submission_by_nonce(&record.submitter, record.nonce)
934            .await
935            .expect("submission detail")
936            .expect("submission exists");
937
938        assert_eq!(detail.status, SubmissionStatus::Failed);
939        assert_eq!(detail.primary_tx_hash, record.primary_tx_hash);
940        assert_eq!(detail.terminal_error.as_deref(), Some("timeout"));
941    }
942
943    #[tokio::test]
944    async fn async_update_submission_status_errors_for_missing_nonce() {
945        let db = TestDb::new().await.expect("test db");
946        let diesel_db = db.diesel_db().await;
947        let submitter = submitter(6);
948
949        let err = diesel_db
950            .update_submission_status(
951                &submitter,
952                405,
953                SubmissionStatus::Failed,
954                None,
955                Some("missing"),
956            )
957            .await
958            .expect_err("missing submission update must fail");
959
960        assert!(
961            err.to_string()
962                .contains("expected one transaction_submitter_submissions row"),
963            "{err}"
964        );
965    }
966
967    #[tokio::test]
968    async fn async_list_pending_submissions_returns_pending_rows_with_attempts() {
969        let db = TestDb::new().await.expect("test db");
970        let diesel_db = db.diesel_db().await;
971        let pending = record(
972            7,
973            13,
974            "0x7777777777777777777777777777777777777777777777777777777777777777",
975        );
976        let attempts = vec![attempt(
977            "0xeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeee",
978        )];
979
980        diesel_db
981            .record_submission(&pending, &attempts)
982            .await
983            .expect("record pending");
984
985        let rows = diesel_db
986            .list_pending_submissions(0, 10)
987            .await
988            .expect("pending submissions");
989
990        assert_eq!(rows.len(), 1);
991        assert_eq!(rows[0].submitter, pending.submitter);
992        assert_eq!(rows[0].nonce, pending.nonce);
993        assert_eq!(rows[0].tx_hashes, vec![attempts[0].tx_hash.clone()]);
994    }
995
996    #[tokio::test]
997    async fn get_submission_by_nonce_returns_none_for_unknown_nonce() {
998        let db = TestDb::new().await.expect("test db");
999        let diesel_db = db.diesel_db().await;
1000
1001        let detail = diesel_db
1002            .get_submission_by_nonce(&submitter(8), 14)
1003            .await
1004            .expect("lookup unknown submission");
1005
1006        assert!(detail.is_none());
1007    }
1008}