Skip to main content

hypercall_db_diesel/
nonces.rs

1//! NonceReader + NonceWriter trait implementations for DieselDb.
2//!
3//! Manages RSM signer nonces and request slots with advisory locks
4//! for exactly-once transaction sequencing.
5
6use diesel::prelude::*;
7use diesel_async::{AsyncConnection, RunQueryDsl};
8
9use crate::diesel_db::DieselDb;
10use crate::models::NewRsmSignerNonce;
11use crate::schema;
12use hypercall_types::WalletAddress;
13
14// =============================================================================
15// NonceReader (async, diesel-async)
16// =============================================================================
17
18#[async_trait::async_trait]
19impl hypercall_db::NonceReader for DieselDb {
20    async fn get_rsm_signer_nonce(
21        &self,
22        signer: &WalletAddress,
23    ) -> anyhow::Result<Option<hypercall_db::RsmSignerNonceRecord>> {
24        use schema::rsm_signer_nonces;
25        let mut conn = self.get_conn().await?;
26        let result = rsm_signer_nonces::table
27            .filter(rsm_signer_nonces::signer_address.eq(signer))
28            .first::<crate::models::RsmSignerNonceRecord>(&mut conn)
29            .await
30            .optional()?;
31        Ok(result.map(Into::into))
32    }
33}
34
35// =============================================================================
36// NonceWriter (async, diesel-async)
37// =============================================================================
38
39#[async_trait::async_trait]
40impl hypercall_db::NonceWriter for DieselDb {
41    async fn save_rsm_signer_nonce(
42        &self,
43        record: &hypercall_db::RsmSignerNonceRecord,
44    ) -> anyhow::Result<()> {
45        use schema::rsm_signer_nonces;
46        let diesel_nonce = NewRsmSignerNonce {
47            signer_address: record.signer_address,
48            next_nonce: record.next_nonce,
49            last_synced_nonce: record.last_synced_nonce,
50        };
51        let mut conn = self.get_conn().await?;
52        diesel::insert_into(rsm_signer_nonces::table)
53            .values(&diesel_nonce)
54            .on_conflict(rsm_signer_nonces::signer_address)
55            .do_update()
56            .set(&diesel_nonce)
57            .execute(&mut conn)
58            .await?;
59        Ok(())
60    }
61
62    #[cfg(feature = "test-utils")]
63    async fn reserve_next_rsm_signer_nonce(
64        &self,
65        signer: &WalletAddress,
66        initial_nonce: u64,
67    ) -> anyhow::Result<u64> {
68        use diesel::sql_types::BigInt;
69
70        #[derive(QueryableByName)]
71        struct ReservedNonce {
72            #[diesel(sql_type = BigInt)]
73            reserved_nonce: i64,
74        }
75
76        let initial_nonce_i64 =
77            i64::try_from(initial_nonce).map_err(|_| anyhow::anyhow!("initial nonce overflow"))?;
78        let mut conn = self.get_conn().await?;
79        conn.transaction(async |conn| {
80                let signer = *signer;
81                // Ensure the nonce row exists (separate statement so it's visible below)
82                diesel::sql_query(
83                    "INSERT INTO rsm_signer_nonces (signer_address, next_nonce, last_synced_nonce, created_at, updated_at) \
84                     VALUES ($1, $2, $2, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP) \
85                     ON CONFLICT (signer_address) DO NOTHING",
86                )
87                .bind::<diesel::sql_types::Binary, _>(&signer)
88                .bind::<BigInt, _>(initial_nonce_i64)
89                .execute(&mut *conn)
90                .await?;
91
92                // Atomically reserve the next nonce
93                let reserved: ReservedNonce = diesel::sql_query(
94                    "UPDATE rsm_signer_nonces SET next_nonce = next_nonce + 1, updated_at = CURRENT_TIMESTAMP \
95                     WHERE signer_address = $1 \
96                     RETURNING (next_nonce - 1) AS reserved_nonce",
97                )
98                .bind::<diesel::sql_types::Binary, _>(&signer)
99                .get_result(&mut *conn)
100                .await?;
101                u64::try_from(reserved.reserved_nonce)
102                    .map_err(|_| anyhow::anyhow!("negative reserved nonce"))
103        })
104        .await
105    }
106
107    async fn claim_rsm_signer_request(
108        &self,
109        request_id: &str,
110        signer: &WalletAddress,
111        account: &WalletAddress,
112        action: &[u8],
113        initial_nonce: u64,
114    ) -> anyhow::Result<hypercall_db::RsmSignerRequestClaim> {
115        use diesel::sql_types::{BigInt, Text};
116        use hypercall_db::RsmSignerRequestClaim;
117
118        #[derive(QueryableByName)]
119        struct RsmRow {
120            #[diesel(sql_type = Text)]
121            request_id: String,
122            #[diesel(sql_type = diesel::sql_types::Binary)]
123            signer_address: WalletAddress,
124            #[diesel(sql_type = diesel::sql_types::Binary)]
125            account_address: WalletAddress,
126            #[diesel(sql_type = diesel::sql_types::Binary)]
127            action: Vec<u8>,
128            #[diesel(sql_type = BigInt)]
129            nonce: i64,
130            #[diesel(sql_type = Text)]
131            status: String,
132            #[diesel(sql_type = diesel::sql_types::Nullable<diesel::sql_types::Binary>)]
133            directive: Option<Vec<u8>>,
134            #[diesel(sql_type = diesel::sql_types::Nullable<Text>)]
135            signature: Option<String>,
136            #[diesel(sql_type = diesel::sql_types::Nullable<diesel::sql_types::Timestamp>)]
137            created_at: Option<chrono::NaiveDateTime>,
138            #[diesel(sql_type = diesel::sql_types::Nullable<diesel::sql_types::Timestamp>)]
139            updated_at: Option<chrono::NaiveDateTime>,
140        }
141
142        fn row_to_domain_record(row: RsmRow) -> hypercall_db::RsmSignerRequestRecord {
143            hypercall_db::RsmSignerRequestRecord {
144                request_id: row.request_id,
145                signer_address: row.signer_address,
146                account_address: row.account_address,
147                action: row.action,
148                nonce: row.nonce,
149                status: row.status,
150                directive: row.directive,
151                signature: row.signature,
152                created_at: row.created_at,
153                updated_at: row.updated_at,
154            }
155        }
156
157        #[derive(QueryableByName)]
158        struct ReservedNonce {
159            #[diesel(sql_type = BigInt)]
160            reserved_nonce: i64,
161        }
162
163        let mut conn = self.get_conn().await?;
164        conn.transaction(async |conn| {
165                let request_id = request_id.to_string();
166                let signer_address = *signer;
167                let account = *account;
168                let action = action.to_vec();
169
170                diesel::sql_query("SELECT pg_advisory_xact_lock(hashtext($1))")
171                    .bind::<Text, _>(&request_id)
172                    .execute(&mut *conn)
173                    .await?;
174
175                let existing = diesel::sql_query(
176                    "SELECT request_id, signer_address, account_address, action, nonce, status, directive, signature, created_at, updated_at FROM rsm_signer_requests WHERE request_id = $1 FOR UPDATE",
177                )
178                .bind::<Text, _>(&request_id)
179                .get_result::<RsmRow>(&mut *conn)
180                .await
181                .optional()?;
182
183                if let Some(row) = existing {
184                    if row.signer_address != signer_address || row.account_address != account || row.action != action {
185                        return Ok(RsmSignerRequestClaim::Conflict {
186                            message: format!("request_id {} reused with different params", request_id),
187                        });
188                    }
189                    let nonce = u64::try_from(row.nonce).map_err(|_| anyhow::anyhow!("negative nonce"))?;
190                    return match row.status.as_str() {
191                        "pending" => Ok(RsmSignerRequestClaim::Pending { nonce }),
192                        "completed" => Ok(RsmSignerRequestClaim::Completed(row_to_domain_record(row))),
193                        other => panic!("STATE_CORRUPTION: RSM request {} has status {}", request_id, other),
194                    };
195                }
196
197                let initial_nonce_i64 = i64::try_from(initial_nonce).map_err(|_| anyhow::anyhow!("initial nonce overflow"))?;
198                // Ensure the nonce row exists (separate statement so it's visible below)
199                diesel::sql_query(
200                    "INSERT INTO rsm_signer_nonces (signer_address, next_nonce, last_synced_nonce, created_at, updated_at) \
201                     VALUES ($1, $2, $2, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP) \
202                     ON CONFLICT (signer_address) DO NOTHING",
203                )
204                .bind::<diesel::sql_types::Binary, _>(&signer_address)
205                .bind::<BigInt, _>(initial_nonce_i64)
206                .execute(&mut *conn)
207                .await?;
208
209                // Atomically reserve the next nonce
210                let reserved: ReservedNonce = diesel::sql_query(
211                    "UPDATE rsm_signer_nonces SET next_nonce = next_nonce + 1, updated_at = CURRENT_TIMESTAMP \
212                     WHERE signer_address = $1 \
213                     RETURNING (next_nonce - 1) AS reserved_nonce",
214                )
215                .bind::<diesel::sql_types::Binary, _>(&signer_address)
216                .get_result(&mut *conn)
217                .await?;
218
219                let reserved_nonce = u64::try_from(reserved.reserved_nonce).map_err(|_| anyhow::anyhow!("negative reserved nonce"))?;
220                let reserved_nonce_i64 = reserved.reserved_nonce;
221
222                diesel::sql_query(
223                    "INSERT INTO rsm_signer_requests (request_id, signer_address, account_address, action, nonce, status, created_at, updated_at) VALUES ($1, $2, $3, $4, $5, 'pending', CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)",
224                )
225                .bind::<Text, _>(&request_id)
226                .bind::<diesel::sql_types::Binary, _>(&signer_address)
227                .bind::<diesel::sql_types::Binary, _>(&account)
228                .bind::<diesel::sql_types::Binary, _>(&action)
229                .bind::<BigInt, _>(reserved_nonce_i64)
230                .execute(&mut *conn)
231                .await?;
232
233                Ok(RsmSignerRequestClaim::Pending { nonce: reserved_nonce })
234        })
235        .await
236    }
237
238    async fn complete_rsm_signer_request(
239        &self,
240        request_id: &str,
241        directive: &[u8],
242        signature: &str,
243    ) -> anyhow::Result<hypercall_db::RsmSignerRequestRecord> {
244        use diesel::sql_types::Text;
245
246        #[derive(QueryableByName)]
247        struct RsmRow {
248            #[diesel(sql_type = Text)]
249            request_id: String,
250            #[diesel(sql_type = diesel::sql_types::Binary)]
251            signer_address: WalletAddress,
252            #[diesel(sql_type = diesel::sql_types::Binary)]
253            account_address: WalletAddress,
254            #[diesel(sql_type = diesel::sql_types::Binary)]
255            action: Vec<u8>,
256            #[diesel(sql_type = diesel::sql_types::BigInt)]
257            nonce: i64,
258            #[diesel(sql_type = Text)]
259            status: String,
260            #[diesel(sql_type = diesel::sql_types::Nullable<diesel::sql_types::Binary>)]
261            directive: Option<Vec<u8>>,
262            #[diesel(sql_type = diesel::sql_types::Nullable<Text>)]
263            signature: Option<String>,
264            #[diesel(sql_type = diesel::sql_types::Nullable<diesel::sql_types::Timestamp>)]
265            created_at: Option<chrono::NaiveDateTime>,
266            #[diesel(sql_type = diesel::sql_types::Nullable<diesel::sql_types::Timestamp>)]
267            updated_at: Option<chrono::NaiveDateTime>,
268        }
269
270        fn to_record(r: RsmRow) -> hypercall_db::RsmSignerRequestRecord {
271            hypercall_db::RsmSignerRequestRecord {
272                request_id: r.request_id,
273                signer_address: r.signer_address,
274                account_address: r.account_address,
275                action: r.action,
276                nonce: r.nonce,
277                status: r.status,
278                directive: r.directive,
279                signature: r.signature,
280                created_at: r.created_at,
281                updated_at: r.updated_at,
282            }
283        }
284
285        let request_id_owned = request_id.to_string();
286        let directive_owned = directive.to_vec();
287        let signature_owned = signature.to_string();
288
289        let mut conn = self.get_conn().await?;
290        conn.transaction(async |conn| {
291                let row = diesel::sql_query(
292                    "SELECT request_id, signer_address, account_address, action, nonce, status, directive, signature, created_at, updated_at FROM rsm_signer_requests WHERE request_id = $1 FOR UPDATE",
293                )
294                .bind::<Text, _>(&request_id_owned)
295                .get_result::<RsmRow>(&mut *conn)
296                .await
297                .optional()?
298                .ok_or_else(|| anyhow::anyhow!("missing RSM signer request {}", request_id_owned))?;
299
300                match row.status.as_str() {
301                    "pending" => {}
302                    "completed" => {
303                        if row.directive.as_deref() == Some(&directive_owned) && row.signature.as_deref() == Some(&signature_owned) {
304                            return Ok(to_record(row));
305                        }
306                        panic!("STATE_CORRUPTION: completed RSM request {} completed twice with different data", request_id_owned);
307                    }
308                    other => panic!("STATE_CORRUPTION: RSM request {} has status {}", request_id_owned, other),
309                }
310
311                let updated = diesel::sql_query(
312                    "UPDATE rsm_signer_requests SET status = 'completed', directive = $2, signature = $3, updated_at = CURRENT_TIMESTAMP WHERE request_id = $1 RETURNING request_id, signer_address, account_address, action, nonce, status, directive, signature, created_at, updated_at",
313                )
314                .bind::<Text, _>(&request_id_owned)
315                .bind::<diesel::sql_types::Binary, _>(&directive_owned)
316                .bind::<Text, _>(&signature_owned)
317                .get_result::<RsmRow>(&mut *conn)
318                .await?;
319
320                Ok(to_record(updated))
321        })
322        .await
323    }
324}
325
326#[cfg(test)]
327mod tests {
328    use crate::test_helpers::TestDb;
329    use hypercall_db::*;
330    use hypercall_types::wallet_address::test_wallet;
331
332    #[tokio::test]
333    async fn nonce_empty_db_returns_none() {
334        let test_db = TestDb::new().await.unwrap();
335        let db = test_db.diesel_db().await;
336        let nonce_reader: &dyn NonceReader = &db;
337        let result = nonce_reader
338            .get_rsm_signer_nonce(&test_wallet(50))
339            .await
340            .unwrap();
341        assert!(result.is_none());
342    }
343
344    #[tokio::test]
345    async fn nonce_save_and_read_roundtrip() {
346        let test_db = TestDb::new().await.unwrap();
347        let db = test_db.diesel_db().await;
348        let signer = test_wallet(51);
349
350        let record = RsmSignerNonceRecord {
351            signer_address: signer,
352            next_nonce: 42,
353            last_synced_nonce: Some(40),
354            created_at: None,
355            updated_at: None,
356        };
357
358        let nonce_writer: &dyn NonceWriter = &db;
359        nonce_writer.save_rsm_signer_nonce(&record).await.unwrap();
360
361        let nonce_reader: &dyn NonceReader = &db;
362        let loaded = nonce_reader
363            .get_rsm_signer_nonce(&signer)
364            .await
365            .unwrap()
366            .expect("nonce should exist after save");
367        assert_eq!(loaded.signer_address, signer);
368        assert_eq!(loaded.next_nonce, 42);
369        assert_eq!(loaded.last_synced_nonce, Some(40));
370    }
371
372    #[tokio::test]
373    async fn claim_rsm_signer_request_new() {
374        let test_db = TestDb::new().await.unwrap();
375        let db = test_db.diesel_db().await;
376        let signer = test_wallet(52);
377        let account = test_wallet(53);
378        let action = b"place_order_action_bytes";
379
380        let nonce_writer: &dyn NonceWriter = &db;
381        let result = nonce_writer
382            .claim_rsm_signer_request("req-001", &signer, &account, action, 100)
383            .await
384            .unwrap();
385
386        match result {
387            RsmSignerRequestClaim::Pending { nonce } => {
388                // Initial nonce is 100, so the first reserved should be 100
389                assert_eq!(nonce, 100);
390            }
391            other => panic!("Expected Pending, got {:?}", other),
392        }
393    }
394
395    #[tokio::test]
396    async fn claim_rsm_signer_request_duplicate_is_idempotent() {
397        let test_db = TestDb::new().await.unwrap();
398        let db = test_db.diesel_db().await;
399        let signer = test_wallet(54);
400        let account = test_wallet(55);
401        let action = b"some_action";
402
403        let nonce_writer: &dyn NonceWriter = &db;
404
405        // First claim
406        let result1 = nonce_writer
407            .claim_rsm_signer_request("req-dup", &signer, &account, action, 0)
408            .await
409            .unwrap();
410        let nonce1 = match result1 {
411            RsmSignerRequestClaim::Pending { nonce } => nonce,
412            other => panic!("Expected Pending, got {:?}", other),
413        };
414
415        // Second claim with same params - should return same nonce
416        let result2 = nonce_writer
417            .claim_rsm_signer_request("req-dup", &signer, &account, action, 0)
418            .await
419            .unwrap();
420        let nonce2 = match result2 {
421            RsmSignerRequestClaim::Pending { nonce } => nonce,
422            other => panic!("Expected Pending, got {:?}", other),
423        };
424
425        assert_eq!(nonce1, nonce2);
426    }
427
428    #[tokio::test]
429    async fn complete_rsm_signer_request_roundtrip() {
430        let test_db = TestDb::new().await.unwrap();
431        let db = test_db.diesel_db().await;
432        let signer = test_wallet(56);
433        let account = test_wallet(57);
434        let action = b"action_data";
435        let directive = b"directive_bytes";
436        let signature = "0xdeadbeef";
437
438        let nonce_writer: &dyn NonceWriter = &db;
439
440        // Claim the request
441        let claim = nonce_writer
442            .claim_rsm_signer_request("req-complete", &signer, &account, action, 5)
443            .await
444            .unwrap();
445        assert!(matches!(claim, RsmSignerRequestClaim::Pending { .. }));
446
447        // Complete the request
448        let record = nonce_writer
449            .complete_rsm_signer_request("req-complete", directive, signature)
450            .await
451            .unwrap();
452
453        assert_eq!(record.request_id, "req-complete");
454        assert_eq!(record.signer_address, signer);
455        assert_eq!(record.account_address, account);
456        assert_eq!(record.action, action);
457        assert_eq!(record.status, "completed");
458        assert_eq!(record.directive.as_deref(), Some(directive.as_slice()));
459        assert_eq!(record.signature.as_deref(), Some(signature));
460    }
461
462    #[cfg(feature = "test-utils")]
463    #[tokio::test]
464    async fn reserve_next_rsm_signer_nonce_increments() {
465        let test_db = TestDb::new().await.unwrap();
466        let db = test_db.diesel_db().await;
467        let signer = test_wallet(58);
468
469        let nonce_writer: &dyn NonceWriter = &db;
470
471        // First reservation with initial_nonce=10 should return 10
472        let n1 = nonce_writer
473            .reserve_next_rsm_signer_nonce(&signer, 10)
474            .await
475            .unwrap();
476        assert_eq!(n1, 10);
477
478        // Second reservation should return 11 (incremented)
479        let n2 = nonce_writer
480            .reserve_next_rsm_signer_nonce(&signer, 10)
481            .await
482            .unwrap();
483        assert_eq!(n2, 11);
484
485        // Third should return 12
486        let n3 = nonce_writer
487            .reserve_next_rsm_signer_nonce(&signer, 10)
488            .await
489            .unwrap();
490        assert_eq!(n3, 12);
491    }
492}