1use diesel::prelude::*;
7use diesel_async::{AsyncConnection, RunQueryDsl};
8
9use crate::diesel_db::DieselDb;
10use crate::models::NewRsmSignerNonce;
11use crate::schema;
12use hypercall_types::WalletAddress;
13
14#[async_trait::async_trait]
19impl hypercall_db::NonceReader for DieselDb {
20 async fn get_rsm_signer_nonce(
21 &self,
22 signer: &WalletAddress,
23 ) -> anyhow::Result<Option<hypercall_db::RsmSignerNonceRecord>> {
24 use schema::rsm_signer_nonces;
25 let mut conn = self.get_conn().await?;
26 let result = rsm_signer_nonces::table
27 .filter(rsm_signer_nonces::signer_address.eq(signer))
28 .first::<crate::models::RsmSignerNonceRecord>(&mut conn)
29 .await
30 .optional()?;
31 Ok(result.map(Into::into))
32 }
33}
34
35#[async_trait::async_trait]
40impl hypercall_db::NonceWriter for DieselDb {
41 async fn save_rsm_signer_nonce(
42 &self,
43 record: &hypercall_db::RsmSignerNonceRecord,
44 ) -> anyhow::Result<()> {
45 use schema::rsm_signer_nonces;
46 let diesel_nonce = NewRsmSignerNonce {
47 signer_address: record.signer_address,
48 next_nonce: record.next_nonce,
49 last_synced_nonce: record.last_synced_nonce,
50 };
51 let mut conn = self.get_conn().await?;
52 diesel::insert_into(rsm_signer_nonces::table)
53 .values(&diesel_nonce)
54 .on_conflict(rsm_signer_nonces::signer_address)
55 .do_update()
56 .set(&diesel_nonce)
57 .execute(&mut conn)
58 .await?;
59 Ok(())
60 }
61
62 #[cfg(feature = "test-utils")]
63 async fn reserve_next_rsm_signer_nonce(
64 &self,
65 signer: &WalletAddress,
66 initial_nonce: u64,
67 ) -> anyhow::Result<u64> {
68 use diesel::sql_types::BigInt;
69
70 #[derive(QueryableByName)]
71 struct ReservedNonce {
72 #[diesel(sql_type = BigInt)]
73 reserved_nonce: i64,
74 }
75
76 let initial_nonce_i64 =
77 i64::try_from(initial_nonce).map_err(|_| anyhow::anyhow!("initial nonce overflow"))?;
78 let mut conn = self.get_conn().await?;
79 conn.transaction(async |conn| {
80 let signer = *signer;
81 diesel::sql_query(
83 "INSERT INTO rsm_signer_nonces (signer_address, next_nonce, last_synced_nonce, created_at, updated_at) \
84 VALUES ($1, $2, $2, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP) \
85 ON CONFLICT (signer_address) DO NOTHING",
86 )
87 .bind::<diesel::sql_types::Binary, _>(&signer)
88 .bind::<BigInt, _>(initial_nonce_i64)
89 .execute(&mut *conn)
90 .await?;
91
92 let reserved: ReservedNonce = diesel::sql_query(
94 "UPDATE rsm_signer_nonces SET next_nonce = next_nonce + 1, updated_at = CURRENT_TIMESTAMP \
95 WHERE signer_address = $1 \
96 RETURNING (next_nonce - 1) AS reserved_nonce",
97 )
98 .bind::<diesel::sql_types::Binary, _>(&signer)
99 .get_result(&mut *conn)
100 .await?;
101 u64::try_from(reserved.reserved_nonce)
102 .map_err(|_| anyhow::anyhow!("negative reserved nonce"))
103 })
104 .await
105 }
106
107 async fn claim_rsm_signer_request(
108 &self,
109 request_id: &str,
110 signer: &WalletAddress,
111 account: &WalletAddress,
112 action: &[u8],
113 initial_nonce: u64,
114 ) -> anyhow::Result<hypercall_db::RsmSignerRequestClaim> {
115 use diesel::sql_types::{BigInt, Text};
116 use hypercall_db::RsmSignerRequestClaim;
117
118 #[derive(QueryableByName)]
119 struct RsmRow {
120 #[diesel(sql_type = Text)]
121 request_id: String,
122 #[diesel(sql_type = diesel::sql_types::Binary)]
123 signer_address: WalletAddress,
124 #[diesel(sql_type = diesel::sql_types::Binary)]
125 account_address: WalletAddress,
126 #[diesel(sql_type = diesel::sql_types::Binary)]
127 action: Vec<u8>,
128 #[diesel(sql_type = BigInt)]
129 nonce: i64,
130 #[diesel(sql_type = Text)]
131 status: String,
132 #[diesel(sql_type = diesel::sql_types::Nullable<diesel::sql_types::Binary>)]
133 directive: Option<Vec<u8>>,
134 #[diesel(sql_type = diesel::sql_types::Nullable<Text>)]
135 signature: Option<String>,
136 #[diesel(sql_type = diesel::sql_types::Nullable<diesel::sql_types::Timestamp>)]
137 created_at: Option<chrono::NaiveDateTime>,
138 #[diesel(sql_type = diesel::sql_types::Nullable<diesel::sql_types::Timestamp>)]
139 updated_at: Option<chrono::NaiveDateTime>,
140 }
141
142 fn row_to_domain_record(row: RsmRow) -> hypercall_db::RsmSignerRequestRecord {
143 hypercall_db::RsmSignerRequestRecord {
144 request_id: row.request_id,
145 signer_address: row.signer_address,
146 account_address: row.account_address,
147 action: row.action,
148 nonce: row.nonce,
149 status: row.status,
150 directive: row.directive,
151 signature: row.signature,
152 created_at: row.created_at,
153 updated_at: row.updated_at,
154 }
155 }
156
157 #[derive(QueryableByName)]
158 struct ReservedNonce {
159 #[diesel(sql_type = BigInt)]
160 reserved_nonce: i64,
161 }
162
163 let mut conn = self.get_conn().await?;
164 conn.transaction(async |conn| {
165 let request_id = request_id.to_string();
166 let signer_address = *signer;
167 let account = *account;
168 let action = action.to_vec();
169
170 diesel::sql_query("SELECT pg_advisory_xact_lock(hashtext($1))")
171 .bind::<Text, _>(&request_id)
172 .execute(&mut *conn)
173 .await?;
174
175 let existing = diesel::sql_query(
176 "SELECT request_id, signer_address, account_address, action, nonce, status, directive, signature, created_at, updated_at FROM rsm_signer_requests WHERE request_id = $1 FOR UPDATE",
177 )
178 .bind::<Text, _>(&request_id)
179 .get_result::<RsmRow>(&mut *conn)
180 .await
181 .optional()?;
182
183 if let Some(row) = existing {
184 if row.signer_address != signer_address || row.account_address != account || row.action != action {
185 return Ok(RsmSignerRequestClaim::Conflict {
186 message: format!("request_id {} reused with different params", request_id),
187 });
188 }
189 let nonce = u64::try_from(row.nonce).map_err(|_| anyhow::anyhow!("negative nonce"))?;
190 return match row.status.as_str() {
191 "pending" => Ok(RsmSignerRequestClaim::Pending { nonce }),
192 "completed" => Ok(RsmSignerRequestClaim::Completed(row_to_domain_record(row))),
193 other => panic!("STATE_CORRUPTION: RSM request {} has status {}", request_id, other),
194 };
195 }
196
197 let initial_nonce_i64 = i64::try_from(initial_nonce).map_err(|_| anyhow::anyhow!("initial nonce overflow"))?;
198 diesel::sql_query(
200 "INSERT INTO rsm_signer_nonces (signer_address, next_nonce, last_synced_nonce, created_at, updated_at) \
201 VALUES ($1, $2, $2, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP) \
202 ON CONFLICT (signer_address) DO NOTHING",
203 )
204 .bind::<diesel::sql_types::Binary, _>(&signer_address)
205 .bind::<BigInt, _>(initial_nonce_i64)
206 .execute(&mut *conn)
207 .await?;
208
209 let reserved: ReservedNonce = diesel::sql_query(
211 "UPDATE rsm_signer_nonces SET next_nonce = next_nonce + 1, updated_at = CURRENT_TIMESTAMP \
212 WHERE signer_address = $1 \
213 RETURNING (next_nonce - 1) AS reserved_nonce",
214 )
215 .bind::<diesel::sql_types::Binary, _>(&signer_address)
216 .get_result(&mut *conn)
217 .await?;
218
219 let reserved_nonce = u64::try_from(reserved.reserved_nonce).map_err(|_| anyhow::anyhow!("negative reserved nonce"))?;
220 let reserved_nonce_i64 = reserved.reserved_nonce;
221
222 diesel::sql_query(
223 "INSERT INTO rsm_signer_requests (request_id, signer_address, account_address, action, nonce, status, created_at, updated_at) VALUES ($1, $2, $3, $4, $5, 'pending', CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)",
224 )
225 .bind::<Text, _>(&request_id)
226 .bind::<diesel::sql_types::Binary, _>(&signer_address)
227 .bind::<diesel::sql_types::Binary, _>(&account)
228 .bind::<diesel::sql_types::Binary, _>(&action)
229 .bind::<BigInt, _>(reserved_nonce_i64)
230 .execute(&mut *conn)
231 .await?;
232
233 Ok(RsmSignerRequestClaim::Pending { nonce: reserved_nonce })
234 })
235 .await
236 }
237
238 async fn complete_rsm_signer_request(
239 &self,
240 request_id: &str,
241 directive: &[u8],
242 signature: &str,
243 ) -> anyhow::Result<hypercall_db::RsmSignerRequestRecord> {
244 use diesel::sql_types::Text;
245
246 #[derive(QueryableByName)]
247 struct RsmRow {
248 #[diesel(sql_type = Text)]
249 request_id: String,
250 #[diesel(sql_type = diesel::sql_types::Binary)]
251 signer_address: WalletAddress,
252 #[diesel(sql_type = diesel::sql_types::Binary)]
253 account_address: WalletAddress,
254 #[diesel(sql_type = diesel::sql_types::Binary)]
255 action: Vec<u8>,
256 #[diesel(sql_type = diesel::sql_types::BigInt)]
257 nonce: i64,
258 #[diesel(sql_type = Text)]
259 status: String,
260 #[diesel(sql_type = diesel::sql_types::Nullable<diesel::sql_types::Binary>)]
261 directive: Option<Vec<u8>>,
262 #[diesel(sql_type = diesel::sql_types::Nullable<Text>)]
263 signature: Option<String>,
264 #[diesel(sql_type = diesel::sql_types::Nullable<diesel::sql_types::Timestamp>)]
265 created_at: Option<chrono::NaiveDateTime>,
266 #[diesel(sql_type = diesel::sql_types::Nullable<diesel::sql_types::Timestamp>)]
267 updated_at: Option<chrono::NaiveDateTime>,
268 }
269
270 fn to_record(r: RsmRow) -> hypercall_db::RsmSignerRequestRecord {
271 hypercall_db::RsmSignerRequestRecord {
272 request_id: r.request_id,
273 signer_address: r.signer_address,
274 account_address: r.account_address,
275 action: r.action,
276 nonce: r.nonce,
277 status: r.status,
278 directive: r.directive,
279 signature: r.signature,
280 created_at: r.created_at,
281 updated_at: r.updated_at,
282 }
283 }
284
285 let request_id_owned = request_id.to_string();
286 let directive_owned = directive.to_vec();
287 let signature_owned = signature.to_string();
288
289 let mut conn = self.get_conn().await?;
290 conn.transaction(async |conn| {
291 let row = diesel::sql_query(
292 "SELECT request_id, signer_address, account_address, action, nonce, status, directive, signature, created_at, updated_at FROM rsm_signer_requests WHERE request_id = $1 FOR UPDATE",
293 )
294 .bind::<Text, _>(&request_id_owned)
295 .get_result::<RsmRow>(&mut *conn)
296 .await
297 .optional()?
298 .ok_or_else(|| anyhow::anyhow!("missing RSM signer request {}", request_id_owned))?;
299
300 match row.status.as_str() {
301 "pending" => {}
302 "completed" => {
303 if row.directive.as_deref() == Some(&directive_owned) && row.signature.as_deref() == Some(&signature_owned) {
304 return Ok(to_record(row));
305 }
306 panic!("STATE_CORRUPTION: completed RSM request {} completed twice with different data", request_id_owned);
307 }
308 other => panic!("STATE_CORRUPTION: RSM request {} has status {}", request_id_owned, other),
309 }
310
311 let updated = diesel::sql_query(
312 "UPDATE rsm_signer_requests SET status = 'completed', directive = $2, signature = $3, updated_at = CURRENT_TIMESTAMP WHERE request_id = $1 RETURNING request_id, signer_address, account_address, action, nonce, status, directive, signature, created_at, updated_at",
313 )
314 .bind::<Text, _>(&request_id_owned)
315 .bind::<diesel::sql_types::Binary, _>(&directive_owned)
316 .bind::<Text, _>(&signature_owned)
317 .get_result::<RsmRow>(&mut *conn)
318 .await?;
319
320 Ok(to_record(updated))
321 })
322 .await
323 }
324}
325
326#[cfg(test)]
327mod tests {
328 use crate::test_helpers::TestDb;
329 use hypercall_db::*;
330 use hypercall_types::wallet_address::test_wallet;
331
332 #[tokio::test]
333 async fn nonce_empty_db_returns_none() {
334 let test_db = TestDb::new().await.unwrap();
335 let db = test_db.diesel_db().await;
336 let nonce_reader: &dyn NonceReader = &db;
337 let result = nonce_reader
338 .get_rsm_signer_nonce(&test_wallet(50))
339 .await
340 .unwrap();
341 assert!(result.is_none());
342 }
343
344 #[tokio::test]
345 async fn nonce_save_and_read_roundtrip() {
346 let test_db = TestDb::new().await.unwrap();
347 let db = test_db.diesel_db().await;
348 let signer = test_wallet(51);
349
350 let record = RsmSignerNonceRecord {
351 signer_address: signer,
352 next_nonce: 42,
353 last_synced_nonce: Some(40),
354 created_at: None,
355 updated_at: None,
356 };
357
358 let nonce_writer: &dyn NonceWriter = &db;
359 nonce_writer.save_rsm_signer_nonce(&record).await.unwrap();
360
361 let nonce_reader: &dyn NonceReader = &db;
362 let loaded = nonce_reader
363 .get_rsm_signer_nonce(&signer)
364 .await
365 .unwrap()
366 .expect("nonce should exist after save");
367 assert_eq!(loaded.signer_address, signer);
368 assert_eq!(loaded.next_nonce, 42);
369 assert_eq!(loaded.last_synced_nonce, Some(40));
370 }
371
372 #[tokio::test]
373 async fn claim_rsm_signer_request_new() {
374 let test_db = TestDb::new().await.unwrap();
375 let db = test_db.diesel_db().await;
376 let signer = test_wallet(52);
377 let account = test_wallet(53);
378 let action = b"place_order_action_bytes";
379
380 let nonce_writer: &dyn NonceWriter = &db;
381 let result = nonce_writer
382 .claim_rsm_signer_request("req-001", &signer, &account, action, 100)
383 .await
384 .unwrap();
385
386 match result {
387 RsmSignerRequestClaim::Pending { nonce } => {
388 assert_eq!(nonce, 100);
390 }
391 other => panic!("Expected Pending, got {:?}", other),
392 }
393 }
394
395 #[tokio::test]
396 async fn claim_rsm_signer_request_duplicate_is_idempotent() {
397 let test_db = TestDb::new().await.unwrap();
398 let db = test_db.diesel_db().await;
399 let signer = test_wallet(54);
400 let account = test_wallet(55);
401 let action = b"some_action";
402
403 let nonce_writer: &dyn NonceWriter = &db;
404
405 let result1 = nonce_writer
407 .claim_rsm_signer_request("req-dup", &signer, &account, action, 0)
408 .await
409 .unwrap();
410 let nonce1 = match result1 {
411 RsmSignerRequestClaim::Pending { nonce } => nonce,
412 other => panic!("Expected Pending, got {:?}", other),
413 };
414
415 let result2 = nonce_writer
417 .claim_rsm_signer_request("req-dup", &signer, &account, action, 0)
418 .await
419 .unwrap();
420 let nonce2 = match result2 {
421 RsmSignerRequestClaim::Pending { nonce } => nonce,
422 other => panic!("Expected Pending, got {:?}", other),
423 };
424
425 assert_eq!(nonce1, nonce2);
426 }
427
428 #[tokio::test]
429 async fn complete_rsm_signer_request_roundtrip() {
430 let test_db = TestDb::new().await.unwrap();
431 let db = test_db.diesel_db().await;
432 let signer = test_wallet(56);
433 let account = test_wallet(57);
434 let action = b"action_data";
435 let directive = b"directive_bytes";
436 let signature = "0xdeadbeef";
437
438 let nonce_writer: &dyn NonceWriter = &db;
439
440 let claim = nonce_writer
442 .claim_rsm_signer_request("req-complete", &signer, &account, action, 5)
443 .await
444 .unwrap();
445 assert!(matches!(claim, RsmSignerRequestClaim::Pending { .. }));
446
447 let record = nonce_writer
449 .complete_rsm_signer_request("req-complete", directive, signature)
450 .await
451 .unwrap();
452
453 assert_eq!(record.request_id, "req-complete");
454 assert_eq!(record.signer_address, signer);
455 assert_eq!(record.account_address, account);
456 assert_eq!(record.action, action);
457 assert_eq!(record.status, "completed");
458 assert_eq!(record.directive.as_deref(), Some(directive.as_slice()));
459 assert_eq!(record.signature.as_deref(), Some(signature));
460 }
461
462 #[cfg(feature = "test-utils")]
463 #[tokio::test]
464 async fn reserve_next_rsm_signer_nonce_increments() {
465 let test_db = TestDb::new().await.unwrap();
466 let db = test_db.diesel_db().await;
467 let signer = test_wallet(58);
468
469 let nonce_writer: &dyn NonceWriter = &db;
470
471 let n1 = nonce_writer
473 .reserve_next_rsm_signer_nonce(&signer, 10)
474 .await
475 .unwrap();
476 assert_eq!(n1, 10);
477
478 let n2 = nonce_writer
480 .reserve_next_rsm_signer_nonce(&signer, 10)
481 .await
482 .unwrap();
483 assert_eq!(n2, 11);
484
485 let n3 = nonce_writer
487 .reserve_next_rsm_signer_nonce(&signer, 10)
488 .await
489 .unwrap();
490 assert_eq!(n3, 12);
491 }
492}