1use crate::engine_enums::{DirectiveActionKeyDb, DirectiveActionKeySql};
4use crate::{DatabaseHandler, DieselDb};
5use alloy::primitives::Address;
6use anyhow::Result;
7use async_trait::async_trait;
8use diesel::sql_types::{BigInt, Binary, Nullable, Text};
9use diesel::OptionalExtension;
10use diesel::RunQueryDsl;
11use diesel::{ExpressionMethods, QueryDsl};
12use hypercall_types::{TransactionStatus, WalletAddress};
13
14fn optional_nonnegative_i64_to_u64(value: Option<i64>, field: &str) -> Result<Option<u64>> {
15 value
16 .map(|v| u64::try_from(v).map_err(|_| anyhow::anyhow!("negative directive {field} {}", v)))
17 .transpose()
18}
19
20fn wallet_from_bytes(bytes: &[u8], label: &str) -> Result<WalletAddress> {
21 let arr: [u8; 20] = bytes.try_into().map_err(|_| {
22 anyhow::anyhow!(
23 "invalid directive {} address length {}, expected 20",
24 label,
25 bytes.len()
26 )
27 })?;
28 Ok(WalletAddress::from(arr))
29}
30
31fn address_from_bytes(bytes: &[u8], label: &str) -> Result<Address> {
32 let arr: [u8; 20] = bytes.try_into().map_err(|_| {
33 anyhow::anyhow!(
34 "invalid directive {} address length {}, expected 20",
35 label,
36 bytes.len()
37 )
38 })?;
39 Ok(Address::from(arr))
40}
41
/// Aggregation query behind the directive outbox delivery metrics.
///
/// Only rows without a `tx_hash` whose `delivery_status` is `pending` or
/// `broadcasted` are considered, grouped by `(action_key, delivery_status)`.
/// For each group the query returns:
///
/// * `pending_count` – number of rows in the group;
/// * `retrying_count` – rows with at least one delivery attempt or a recorded
///   delivery error;
/// * `oldest_created_age_seconds` – largest age since `created_ts_ms`
///   (milliseconds since the epoch), floored at 0;
/// * `oldest_attempt_age_seconds` – largest age since `last_attempt_at`,
///   falling back to the creation time for rows never attempted, floored at 0.
///
/// The query takes no bind parameters.
const DIRECTIVE_OUTBOX_DELIVERY_METRICS_SQL: &str = r#"
SELECT
    action_key::text AS action_key,
    delivery_status,
    COUNT(*)::BIGINT AS pending_count,
    SUM(
        CASE
            WHEN delivery_attempts > 0 OR last_delivery_error IS NOT NULL THEN 1
            ELSE 0
        END
    )::BIGINT AS retrying_count,
    GREATEST(
        MAX(EXTRACT(EPOCH FROM (CURRENT_TIMESTAMP - to_timestamp(created_ts_ms / 1000.0))))::BIGINT,
        0
    ) AS oldest_created_age_seconds,
    GREATEST(
        MAX(EXTRACT(EPOCH FROM (
            CURRENT_TIMESTAMP - COALESCE(last_attempt_at, to_timestamp(created_ts_ms / 1000.0))
        )))::BIGINT,
        0
    ) AS oldest_attempt_age_seconds
FROM directive_outbox
WHERE tx_hash IS NULL
  AND delivery_status IN ('pending', 'broadcasted')
GROUP BY action_key, delivery_status
"#;
68
69#[async_trait]
70impl hypercall_db::AsyncDirectiveOutboxReader for DieselDb {
71 async fn directive_outbox_exists(&self, directive_id: &str) -> Result<bool> {
72 #[derive(diesel::QueryableByName)]
73 struct ExistsRow {
74 #[diesel(sql_type = diesel::sql_types::Bool)]
75 exists: bool,
76 }
77
78 let mut conn = self.get_conn().await?;
79 let query = diesel::sql_query(
80 "SELECT EXISTS (
81 SELECT 1 FROM directive_outbox WHERE directive_id = $1
82 ) AS exists",
83 )
84 .bind::<Text, _>(directive_id);
85 let row = diesel_async::RunQueryDsl::get_result::<ExistsRow>(query, &mut conn).await?;
86 Ok(row.exists)
87 }
88
89 async fn get_directive_status(
90 &self,
91 directive_id: &str,
92 ) -> Result<Option<hypercall_db::DirectiveStatusRow>> {
93 #[derive(diesel::QueryableByName)]
94 struct StatusRow {
95 #[diesel(sql_type = Text)]
96 directive_id: String,
97 #[diesel(sql_type = Text)]
98 action_key: String,
99 #[diesel(sql_type = Text)]
100 domain_status: String,
101 #[diesel(sql_type = Text)]
102 delivery_status: String,
103 #[diesel(sql_type = Nullable<Binary>)]
104 submitter_address: Option<Vec<u8>>,
105 #[diesel(sql_type = Nullable<BigInt>)]
106 submitter_nonce: Option<i64>,
107 #[diesel(sql_type = Nullable<Text>)]
108 tx_hash: Option<String>,
109 #[diesel(sql_type = Nullable<BigInt>)]
110 created_ts_ms: Option<i64>,
111 }
112
113 let mut conn = self.get_conn().await?;
114 let query = diesel::sql_query(
115 r#"
116 SELECT directive_id, action_key::text AS action_key, domain_status, delivery_status,
117 submitter_address, submitter_nonce, tx_hash, created_ts_ms
118 FROM directive_outbox
119 WHERE directive_id = $1
120 "#,
121 )
122 .bind::<Text, _>(directive_id);
123 let row = diesel_async::RunQueryDsl::get_result::<StatusRow>(query, &mut conn)
124 .await
125 .optional()?;
126
127 row.map(|r| {
128 Ok(hypercall_db::DirectiveStatusRow {
129 directive_id: r.directive_id,
130 action_key: r.action_key,
131 domain_status: r.domain_status,
132 delivery_status: r.delivery_status,
133 submitter_address: r
134 .submitter_address
135 .map(|bytes| address_from_bytes(&bytes, "submitter"))
136 .transpose()?,
137 submitter_nonce: optional_nonnegative_i64_to_u64(
138 r.submitter_nonce,
139 "submitter_nonce",
140 )?,
141 tx_hash: r.tx_hash,
142 created_ts_ms: r.created_ts_ms,
143 })
144 })
145 .transpose()
146 }
147
148 async fn get_withdrawal_history(
149 &self,
150 wallet: &WalletAddress,
151 limit: i64,
152 ) -> Result<Vec<hypercall_db::DirectiveStatusRow>> {
153 #[derive(diesel::QueryableByName)]
154 struct StatusRow {
155 #[diesel(sql_type = Text)]
156 directive_id: String,
157 #[diesel(sql_type = Text)]
158 action_key: String,
159 #[diesel(sql_type = Text)]
160 domain_status: String,
161 #[diesel(sql_type = Text)]
162 delivery_status: String,
163 #[diesel(sql_type = Nullable<Binary>)]
164 submitter_address: Option<Vec<u8>>,
165 #[diesel(sql_type = Nullable<BigInt>)]
166 submitter_nonce: Option<i64>,
167 #[diesel(sql_type = Nullable<Text>)]
168 tx_hash: Option<String>,
169 #[diesel(sql_type = Nullable<BigInt>)]
170 created_ts_ms: Option<i64>,
171 }
172
173 let mut conn = self.get_conn().await?;
174 let limit = limit.max(1);
175 let query = diesel::sql_query(
176 r#"
177 SELECT directive_id, action_key::text AS action_key, domain_status, delivery_status,
178 submitter_address, submitter_nonce, tx_hash, created_ts_ms
179 FROM directive_outbox d
180 JOIN engine_commands ec ON ec.command_id = d.command_id
181 WHERE d.action_key::text IN ('system_withdraw_token', 'system_credit_option')
182 AND (d.wallet_address = $1 OR d.account_address = $1)
183 AND ec.command_type_enum IN ('CashWithdrawalUpdate', 'OptionWithdrawalUpdate')
184 ORDER BY d.created_ts_ms DESC
185 LIMIT $2
186 "#,
187 )
188 .bind::<Binary, _>(wallet.as_bytes())
189 .bind::<BigInt, _>(limit);
190 let rows = diesel_async::RunQueryDsl::get_results::<StatusRow>(query, &mut conn).await?;
191
192 rows.into_iter()
193 .map(|r| {
194 Ok(hypercall_db::DirectiveStatusRow {
195 directive_id: r.directive_id,
196 action_key: r.action_key,
197 domain_status: r.domain_status,
198 delivery_status: r.delivery_status,
199 submitter_address: r
200 .submitter_address
201 .map(|bytes| address_from_bytes(&bytes, "submitter"))
202 .transpose()?,
203 submitter_nonce: optional_nonnegative_i64_to_u64(
204 r.submitter_nonce,
205 "submitter_nonce",
206 )?,
207 tx_hash: r.tx_hash,
208 created_ts_ms: r.created_ts_ms,
209 })
210 })
211 .collect()
212 }
213
214 async fn list_directive_outbox_delivery_metrics(
215 &self,
216 ) -> Result<Vec<hypercall_db::DirectiveOutboxDeliveryMetricsRow>> {
217 #[derive(diesel::QueryableByName)]
218 struct MetricsRow {
219 #[diesel(sql_type = Text)]
220 action_key: String,
221 #[diesel(sql_type = Text)]
222 delivery_status: String,
223 #[diesel(sql_type = BigInt)]
224 pending_count: i64,
225 #[diesel(sql_type = BigInt)]
226 retrying_count: i64,
227 #[diesel(sql_type = BigInt)]
228 oldest_created_age_seconds: i64,
229 #[diesel(sql_type = BigInt)]
230 oldest_attempt_age_seconds: i64,
231 }
232
233 let mut conn = self.get_conn().await?;
234 let query = diesel::sql_query(DIRECTIVE_OUTBOX_DELIVERY_METRICS_SQL);
235 let rows = diesel_async::RunQueryDsl::get_results::<MetricsRow>(query, &mut conn).await?;
236
237 Ok(rows
238 .into_iter()
239 .map(|row| hypercall_db::DirectiveOutboxDeliveryMetricsRow {
240 action_key: row.action_key,
241 delivery_status: row.delivery_status,
242 pending_count: row.pending_count,
243 retrying_count: row.retrying_count,
244 oldest_created_age_seconds: row.oldest_created_age_seconds,
245 oldest_attempt_age_seconds: row.oldest_attempt_age_seconds,
246 })
247 .collect())
248 }
249
250 async fn list_recent_directive_outbox_rows(
251 &self,
252 limit: i64,
253 offset: i64,
254 ) -> Result<Vec<hypercall_db::DirectiveOutboxRecentRow>> {
255 #[derive(diesel::QueryableByName)]
256 struct Row {
257 #[diesel(sql_type = BigInt)]
258 outbox_seq: i64,
259 #[diesel(sql_type = Text)]
260 directive_id: String,
261 #[diesel(sql_type = Text)]
262 kind: String,
263 #[diesel(sql_type = Text)]
264 action_key: String,
265 #[diesel(sql_type = Binary)]
266 wallet_address: Vec<u8>,
267 #[diesel(sql_type = Binary)]
268 account_address: Vec<u8>,
269 #[diesel(sql_type = Binary)]
270 signer_address: Vec<u8>,
271 #[diesel(sql_type = BigInt)]
272 directive_nonce: i64,
273 #[diesel(sql_type = Text)]
274 domain_status: String,
275 #[diesel(sql_type = Text)]
276 delivery_status: String,
277 #[diesel(sql_type = Nullable<Binary>)]
278 submitter_address: Option<Vec<u8>>,
279 #[diesel(sql_type = Nullable<BigInt>)]
280 submitter_nonce: Option<i64>,
281 #[diesel(sql_type = Nullable<Text>)]
282 tx_hash: Option<String>,
283 #[diesel(sql_type = BigInt)]
284 delivery_attempts: i64,
285 #[diesel(sql_type = Nullable<Text>)]
286 last_delivery_error: Option<String>,
287 #[diesel(sql_type = BigInt)]
288 created_ts_ms: i64,
289 #[diesel(sql_type = Nullable<BigInt>)]
290 last_attempt_ts_ms: Option<i64>,
291 #[diesel(sql_type = Nullable<BigInt>)]
292 expires_at_ms: Option<i64>,
293 }
294
295 let mut conn = self.get_conn().await?;
296 let limit = limit.clamp(1, 500);
297 let offset = offset.max(0);
298 let query = diesel::sql_query(
299 r#"
300 SELECT outbox_seq,
301 directive_id,
302 kind,
303 action_key::text AS action_key,
304 wallet_address,
305 account_address,
306 signer_address,
307 directive_nonce,
308 domain_status,
309 delivery_status,
310 submitter_address,
311 submitter_nonce,
312 tx_hash,
313 delivery_attempts::BIGINT AS delivery_attempts,
314 last_delivery_error,
315 created_ts_ms,
316 (EXTRACT(EPOCH FROM last_attempt_at) * 1000)::BIGINT AS last_attempt_ts_ms,
317 expires_at_ms
318 FROM directive_outbox
319 ORDER BY created_ts_ms DESC, outbox_seq DESC
320 LIMIT $1
321 OFFSET $2
322 "#,
323 )
324 .bind::<BigInt, _>(limit)
325 .bind::<BigInt, _>(offset);
326 let rows = diesel_async::RunQueryDsl::get_results::<Row>(query, &mut conn).await?;
327
328 rows.into_iter()
329 .map(|row| {
330 Ok(hypercall_db::DirectiveOutboxRecentRow {
331 outbox_seq: row.outbox_seq,
332 directive_id: row.directive_id,
333 kind: row.kind,
334 action_key: row.action_key,
335 wallet_address: wallet_from_bytes(&row.wallet_address, "wallet")?,
336 account_address: wallet_from_bytes(&row.account_address, "account")?,
337 signer_address: wallet_from_bytes(&row.signer_address, "signer")?,
338 directive_nonce: row.directive_nonce,
339 domain_status: row.domain_status,
340 delivery_status: row.delivery_status,
341 submitter_address: row
342 .submitter_address
343 .map(|bytes| address_from_bytes(&bytes, "submitter"))
344 .transpose()?,
345 submitter_nonce: optional_nonnegative_i64_to_u64(
346 row.submitter_nonce,
347 "submitter_nonce",
348 )?,
349 tx_hash: row.tx_hash,
350 delivery_attempts: row.delivery_attempts,
351 last_delivery_error: row.last_delivery_error,
352 created_ts_ms: row.created_ts_ms,
353 last_attempt_ts_ms: row.last_attempt_ts_ms,
354 expires_at_ms: row.expires_at_ms,
355 })
356 })
357 .collect()
358 }
359}
360
361#[async_trait]
362impl hypercall_db::AsyncDirectiveOutboxWriter for DieselDb {
363 async fn record_directive_submitter_submission(
364 &self,
365 request_id: &str,
366 submitter_address: &Address,
367 submitter_nonce: u64,
368 ) -> Result<()> {
369 let submitter_nonce = i64::try_from(submitter_nonce).map_err(|_| {
370 anyhow::anyhow!("submitter nonce {submitter_nonce} does not fit in BIGINT")
371 })?;
372 use crate::schema::directive_outbox::dsl as outbox_dsl;
373
374 let mut conn = self.get_conn().await?;
375 let query = diesel::update(
376 outbox_dsl::directive_outbox.filter(outbox_dsl::directive_id.eq(request_id)),
377 )
378 .set((
379 outbox_dsl::submitter_address.eq(Some(submitter_address.as_slice())),
380 outbox_dsl::submitter_nonce.eq(Some(submitter_nonce)),
381 outbox_dsl::updated_at.eq(diesel::dsl::now),
382 ));
383 let updated = diesel_async::RunQueryDsl::execute(query, &mut conn).await?;
384 anyhow::ensure!(
385 updated == 1,
386 "expected exactly one directive_outbox row for directive_id={request_id}, updated {updated}"
387 );
388 Ok(())
389 }
390
391 async fn persist_directive_transaction_update(
392 &self,
393 request_id: &str,
394 status: TransactionStatus,
395 tx_hash: Option<&str>,
396 error: Option<&str>,
397 ) -> Result<()> {
398 let (domain_status, delivery_status): (Option<&str>, Option<&str>) = match status {
399 TransactionStatus::Submitted => (None, Some("pending")),
400 TransactionStatus::Pending => (None, Some("pending")),
401 TransactionStatus::Confirmed => (Some("completed"), Some("finalized")),
402 TransactionStatus::Failed => (Some("failed"), Some("reverted")),
403 TransactionStatus::Expired => (Some("failed"), Some("expired")),
404 };
405
406 let mut conn = self.get_conn().await?;
407 if let Some(domain_status) = domain_status {
408 #[derive(diesel::QueryableByName)]
409 struct ActionKeyRow {
410 #[diesel(sql_type = DirectiveActionKeySql)]
411 action_key: DirectiveActionKeyDb,
412 }
413
414 let query = diesel::sql_query(
415 r#"
416 WITH updated_outbox AS (
417 UPDATE directive_outbox
418 SET domain_status = $2,
419 delivery_status = $3,
420 tx_hash = COALESCE($4, tx_hash),
421 last_delivery_error = $5,
422 updated_at = CURRENT_TIMESTAMP
423 WHERE directive_id = $1
424 AND delivery_status NOT IN ('finalized', 'reverted', 'expired', 'dead_lettered')
425 RETURNING directive_id, action_key
426 )
427 SELECT action_key FROM updated_outbox
428 "#,
429 )
430 .bind::<Text, _>(request_id)
431 .bind::<Text, _>(domain_status)
432 .bind::<Text, _>(
433 delivery_status.expect("terminal update must have delivery_status"),
434 )
435 .bind::<Nullable<Text>, _>(tx_hash)
436 .bind::<Nullable<Text>, _>(error);
437
438 let updated_row: Option<ActionKeyRow> =
439 diesel_async::RunQueryDsl::get_result(query, &mut conn)
440 .await
441 .optional()?;
442
443 if matches!(
444 status,
445 TransactionStatus::Failed | TransactionStatus::Expired
446 ) {
447 if let Some(row) = updated_row {
448 let action_key: hypercall_types::directives::ActionKey = row.action_key.into();
449 if matches!(
450 action_key,
451 hypercall_types::directives::ActionKey::SystemWithdrawToken
452 | hypercall_types::directives::ActionKey::SystemCreditOption
453 ) {
454 metrics::counter!(
455 "ht_withdrawal_manual_reconciliation_required_total",
456 "action_key" => action_key.as_str(),
457 )
458 .increment(1);
459 tracing::warn!(
460 directive_id = %request_id,
461 action_key = ?action_key,
462 "Withdrawal directive reached terminal delivery status; automatic refunds are disabled and operator reconciliation is required"
463 );
464 }
465 }
466 }
467 } else if let Some(delivery_status) = delivery_status {
468 let query = diesel::sql_query(
469 r#"
470 UPDATE directive_outbox
471 SET delivery_status = $2,
472 tx_hash = COALESCE($3, tx_hash),
473 last_delivery_error = $4,
474 updated_at = CURRENT_TIMESTAMP
475 WHERE directive_id = $1
476 AND delivery_status NOT IN ('finalized', 'reverted', 'expired', 'dead_lettered')
477 "#,
478 )
479 .bind::<Text, _>(request_id)
480 .bind::<Text, _>(delivery_status)
481 .bind::<Nullable<Text>, _>(tx_hash)
482 .bind::<Nullable<Text>, _>(error);
483 diesel_async::RunQueryDsl::execute(query, &mut conn).await?;
484 }
485 Ok(())
486 }
487}
488
489impl hypercall_db::DirectiveOutboxReader for DatabaseHandler {
490 fn claim_next_directive_outbox_item_sync(
491 &self,
492 ) -> Result<Option<hypercall_db::DirectiveOutboxRow>> {
493 #[derive(diesel::QueryableByName)]
494 struct Row {
495 #[diesel(sql_type = BigInt)]
496 outbox_seq: i64,
497 #[diesel(sql_type = Text)]
498 directive_id: String,
499 #[diesel(sql_type = Text)]
500 kind: String,
501 #[diesel(sql_type = DirectiveActionKeySql)]
502 action_key: DirectiveActionKeyDb,
503 #[diesel(sql_type = Binary)]
504 account_address: Vec<u8>,
505 #[diesel(sql_type = Binary)]
506 signer_address: Vec<u8>,
507 #[diesel(sql_type = BigInt)]
508 directive_nonce: i64,
509 #[diesel(sql_type = Binary)]
510 payload: Vec<u8>,
511 #[diesel(sql_type = Nullable<BigInt>)]
512 expires_at_ms: Option<i64>,
513 }
514
515 let mut conn = self.pool().get()?;
516 let row = diesel::sql_query(
517 r#"
518 UPDATE directive_outbox
519 SET delivery_attempts = delivery_attempts + 1,
520 delivery_status = 'broadcasted',
521 last_attempt_at = CURRENT_TIMESTAMP,
522 updated_at = CURRENT_TIMESTAMP
523 WHERE outbox_seq = (
524 SELECT outbox_seq
525 FROM directive_outbox
526 WHERE tx_hash IS NULL
527 AND (
528 (
529 delivery_status = 'pending'
530 AND (
531 last_attempt_at IS NULL
532 OR last_attempt_at <= CURRENT_TIMESTAMP - INTERVAL '30 seconds'
533 )
534 )
535 OR (
536 delivery_status = 'broadcasted'
537 AND last_attempt_at <= CURRENT_TIMESTAMP - INTERVAL '5 minutes'
538 )
539 )
540 ORDER BY outbox_seq ASC
541 LIMIT 1
542 FOR UPDATE SKIP LOCKED
543 )
544 RETURNING outbox_seq, directive_id, kind, action_key, account_address,
545 signer_address, directive_nonce, payload, expires_at_ms
546 "#,
547 )
548 .get_result::<Row>(&mut conn)
549 .optional()?;
550
551 row.map(|row| {
552 let nonce = u64::try_from(row.directive_nonce)
553 .map_err(|_| anyhow::anyhow!("negative directive nonce {}", row.directive_nonce))?;
554 Ok(hypercall_db::DirectiveOutboxRow {
555 outbox_seq: row.outbox_seq,
556 directive_id: row.directive_id,
557 kind: row.kind,
558 action_key: row.action_key.into(),
559 account: wallet_from_bytes(&row.account_address, "account")?,
560 signer: wallet_from_bytes(&row.signer_address, "signer")?,
561 nonce,
562 payload: row.payload,
563 expires_at_ms: optional_nonnegative_i64_to_u64(row.expires_at_ms, "expires_at_ms")?,
564 })
565 })
566 .transpose()
567 }
568
569 fn get_directive_status_sync(
570 &self,
571 directive_id: &str,
572 ) -> Result<Option<hypercall_db::DirectiveStatusRow>> {
573 #[derive(diesel::QueryableByName)]
574 struct StatusRow {
575 #[diesel(sql_type = Text)]
576 directive_id: String,
577 #[diesel(sql_type = Text)]
578 action_key: String,
579 #[diesel(sql_type = Text)]
580 domain_status: String,
581 #[diesel(sql_type = Text)]
582 delivery_status: String,
583 #[diesel(sql_type = Nullable<Binary>)]
584 submitter_address: Option<Vec<u8>>,
585 #[diesel(sql_type = Nullable<BigInt>)]
586 submitter_nonce: Option<i64>,
587 #[diesel(sql_type = Nullable<Text>)]
588 tx_hash: Option<String>,
589 #[diesel(sql_type = Nullable<BigInt>)]
590 created_ts_ms: Option<i64>,
591 }
592
593 let mut conn = self.pool().get()?;
594 let row = diesel::sql_query(
595 r#"
596 SELECT directive_id, action_key::text AS action_key, domain_status, delivery_status,
597 submitter_address, submitter_nonce, tx_hash, created_ts_ms
598 FROM directive_outbox
599 WHERE directive_id = $1
600 "#,
601 )
602 .bind::<Text, _>(directive_id)
603 .get_result::<StatusRow>(&mut conn)
604 .optional()?;
605
606 row.map(|r| {
607 Ok(hypercall_db::DirectiveStatusRow {
608 directive_id: r.directive_id,
609 action_key: r.action_key,
610 domain_status: r.domain_status,
611 delivery_status: r.delivery_status,
612 submitter_address: r
613 .submitter_address
614 .map(|bytes| address_from_bytes(&bytes, "submitter"))
615 .transpose()?,
616 submitter_nonce: optional_nonnegative_i64_to_u64(
617 r.submitter_nonce,
618 "submitter_nonce",
619 )?,
620 tx_hash: r.tx_hash,
621 created_ts_ms: r.created_ts_ms,
622 })
623 })
624 .transpose()
625 }
626
627 fn get_withdrawal_history_sync(
628 &self,
629 wallet: &WalletAddress,
630 limit: i64,
631 ) -> Result<Vec<hypercall_db::DirectiveStatusRow>> {
632 #[derive(diesel::QueryableByName)]
633 struct StatusRow {
634 #[diesel(sql_type = Text)]
635 directive_id: String,
636 #[diesel(sql_type = Text)]
637 action_key: String,
638 #[diesel(sql_type = Text)]
639 domain_status: String,
640 #[diesel(sql_type = Text)]
641 delivery_status: String,
642 #[diesel(sql_type = Nullable<Binary>)]
643 submitter_address: Option<Vec<u8>>,
644 #[diesel(sql_type = Nullable<BigInt>)]
645 submitter_nonce: Option<i64>,
646 #[diesel(sql_type = Nullable<Text>)]
647 tx_hash: Option<String>,
648 #[diesel(sql_type = Nullable<BigInt>)]
649 created_ts_ms: Option<i64>,
650 }
651
652 let mut conn = self.pool().get()?;
653 let limit = limit.max(1);
654 let rows = diesel::sql_query(
655 r#"
656 SELECT directive_id, action_key::text AS action_key, domain_status, delivery_status,
657 submitter_address, submitter_nonce, tx_hash, created_ts_ms
658 FROM directive_outbox d
659 JOIN engine_commands ec ON ec.command_id = d.command_id
660 WHERE d.action_key::text IN ('system_withdraw_token', 'system_credit_option')
661 AND (d.wallet_address = $1 OR d.account_address = $1)
662 AND ec.command_type_enum IN ('CashWithdrawalUpdate', 'OptionWithdrawalUpdate')
663 ORDER BY d.created_ts_ms DESC
664 LIMIT $2
665 "#,
666 )
667 .bind::<Binary, _>(wallet.as_bytes())
668 .bind::<BigInt, _>(limit)
669 .get_results::<StatusRow>(&mut conn)?;
670
671 rows.into_iter()
672 .map(|r| {
673 Ok(hypercall_db::DirectiveStatusRow {
674 directive_id: r.directive_id,
675 action_key: r.action_key,
676 domain_status: r.domain_status,
677 delivery_status: r.delivery_status,
678 submitter_address: r
679 .submitter_address
680 .map(|bytes| address_from_bytes(&bytes, "submitter"))
681 .transpose()?,
682 submitter_nonce: optional_nonnegative_i64_to_u64(
683 r.submitter_nonce,
684 "submitter_nonce",
685 )?,
686 tx_hash: r.tx_hash,
687 created_ts_ms: r.created_ts_ms,
688 })
689 })
690 .collect()
691 }
692
693 fn list_directive_outbox_delivery_metrics_sync(
694 &self,
695 ) -> Result<Vec<hypercall_db::DirectiveOutboxDeliveryMetricsRow>> {
696 #[derive(diesel::QueryableByName)]
697 struct MetricsRow {
698 #[diesel(sql_type = Text)]
699 action_key: String,
700 #[diesel(sql_type = Text)]
701 delivery_status: String,
702 #[diesel(sql_type = BigInt)]
703 pending_count: i64,
704 #[diesel(sql_type = BigInt)]
705 retrying_count: i64,
706 #[diesel(sql_type = BigInt)]
707 oldest_created_age_seconds: i64,
708 #[diesel(sql_type = BigInt)]
709 oldest_attempt_age_seconds: i64,
710 }
711
712 let mut conn = self.pool().get()?;
713 let rows = diesel::sql_query(DIRECTIVE_OUTBOX_DELIVERY_METRICS_SQL)
714 .get_results::<MetricsRow>(&mut conn)?;
715
716 Ok(rows
717 .into_iter()
718 .map(|row| hypercall_db::DirectiveOutboxDeliveryMetricsRow {
719 action_key: row.action_key,
720 delivery_status: row.delivery_status,
721 pending_count: row.pending_count,
722 retrying_count: row.retrying_count,
723 oldest_created_age_seconds: row.oldest_created_age_seconds,
724 oldest_attempt_age_seconds: row.oldest_attempt_age_seconds,
725 })
726 .collect())
727 }
728
729 fn list_recent_directive_outbox_rows_sync(
730 &self,
731 limit: i64,
732 offset: i64,
733 ) -> Result<Vec<hypercall_db::DirectiveOutboxRecentRow>> {
734 #[derive(diesel::QueryableByName)]
735 struct Row {
736 #[diesel(sql_type = BigInt)]
737 outbox_seq: i64,
738 #[diesel(sql_type = Text)]
739 directive_id: String,
740 #[diesel(sql_type = Text)]
741 kind: String,
742 #[diesel(sql_type = Text)]
743 action_key: String,
744 #[diesel(sql_type = Binary)]
745 wallet_address: Vec<u8>,
746 #[diesel(sql_type = Binary)]
747 account_address: Vec<u8>,
748 #[diesel(sql_type = Binary)]
749 signer_address: Vec<u8>,
750 #[diesel(sql_type = BigInt)]
751 directive_nonce: i64,
752 #[diesel(sql_type = Text)]
753 domain_status: String,
754 #[diesel(sql_type = Text)]
755 delivery_status: String,
756 #[diesel(sql_type = Nullable<Binary>)]
757 submitter_address: Option<Vec<u8>>,
758 #[diesel(sql_type = Nullable<BigInt>)]
759 submitter_nonce: Option<i64>,
760 #[diesel(sql_type = Nullable<Text>)]
761 tx_hash: Option<String>,
762 #[diesel(sql_type = BigInt)]
763 delivery_attempts: i64,
764 #[diesel(sql_type = Nullable<Text>)]
765 last_delivery_error: Option<String>,
766 #[diesel(sql_type = BigInt)]
767 created_ts_ms: i64,
768 #[diesel(sql_type = Nullable<BigInt>)]
769 last_attempt_ts_ms: Option<i64>,
770 #[diesel(sql_type = Nullable<BigInt>)]
771 expires_at_ms: Option<i64>,
772 }
773
774 let mut conn = self.pool().get()?;
775 let rows = diesel::sql_query(
776 r#"
777 SELECT outbox_seq,
778 directive_id,
779 kind,
780 action_key::text AS action_key,
781 wallet_address,
782 account_address,
783 signer_address,
784 directive_nonce,
785 domain_status,
786 delivery_status,
787 submitter_address,
788 submitter_nonce,
789 tx_hash,
790 delivery_attempts::BIGINT AS delivery_attempts,
791 last_delivery_error,
792 created_ts_ms,
793 (EXTRACT(EPOCH FROM last_attempt_at) * 1000)::BIGINT AS last_attempt_ts_ms,
794 expires_at_ms
795 FROM directive_outbox
796 ORDER BY created_ts_ms DESC, outbox_seq DESC
797 LIMIT $1
798 OFFSET $2
799 "#,
800 )
801 .bind::<BigInt, _>(limit.clamp(1, 500))
802 .bind::<BigInt, _>(offset.max(0))
803 .get_results::<Row>(&mut conn)?;
804
805 rows.into_iter()
806 .map(|row| {
807 Ok(hypercall_db::DirectiveOutboxRecentRow {
808 outbox_seq: row.outbox_seq,
809 directive_id: row.directive_id,
810 kind: row.kind,
811 action_key: row.action_key,
812 wallet_address: wallet_from_bytes(&row.wallet_address, "wallet")?,
813 account_address: wallet_from_bytes(&row.account_address, "account")?,
814 signer_address: wallet_from_bytes(&row.signer_address, "signer")?,
815 directive_nonce: row.directive_nonce,
816 domain_status: row.domain_status,
817 delivery_status: row.delivery_status,
818 submitter_address: row
819 .submitter_address
820 .map(|bytes| address_from_bytes(&bytes, "submitter"))
821 .transpose()?,
822 submitter_nonce: optional_nonnegative_i64_to_u64(
823 row.submitter_nonce,
824 "submitter_nonce",
825 )?,
826 tx_hash: row.tx_hash,
827 delivery_attempts: row.delivery_attempts,
828 last_delivery_error: row.last_delivery_error,
829 created_ts_ms: row.created_ts_ms,
830 last_attempt_ts_ms: row.last_attempt_ts_ms,
831 expires_at_ms: row.expires_at_ms,
832 })
833 })
834 .collect()
835 }
836}
837
838#[cfg(test)]
839mod tests {
840 use super::*;
841 use crate::test_helpers::TestDb;
842 use alloy::primitives::Address;
843 use diesel::sql_types::{BigInt, Binary, Nullable, Numeric, Text};
844 use diesel::RunQueryDsl;
845 use hypercall_db::{
846 AsyncDirectiveOutboxReader, AsyncDirectiveOutboxWriter, DirectiveOutboxReader,
847 DirectiveOutboxWriter,
848 };
849 use rust_decimal::Decimal;
850
851 fn wallet(byte: u8) -> WalletAddress {
852 WalletAddress::from(Address::repeat_byte(byte))
853 }
854
855 fn address(byte: u8) -> Address {
856 Address::repeat_byte(byte)
857 }
858
859 #[test]
860 fn negative_directive_expiry_is_rejected() {
861 let error = optional_nonnegative_i64_to_u64(Some(-1), "expires_at_ms")
862 .expect_err("negative expiry must not wrap to u64");
863 assert!(error
864 .to_string()
865 .contains("negative directive expires_at_ms -1"));
866 assert_eq!(
867 optional_nonnegative_i64_to_u64(Some(42), "expires_at_ms").unwrap(),
868 Some(42)
869 );
870 assert_eq!(
871 optional_nonnegative_i64_to_u64(None, "expires_at_ms").unwrap(),
872 None
873 );
874 }
875
876 fn insert_withdrawal_outbox_row(
877 handler: &DatabaseHandler,
878 directive_id: &str,
879 wallet_address: WalletAddress,
880 account_address: WalletAddress,
881 created_ts_ms: i64,
882 ) {
883 #[derive(diesel::QueryableByName)]
884 struct CommandIdRow {
885 #[diesel(sql_type = BigInt)]
886 command_id: i64,
887 }
888
889 let mut conn = handler.pool().get().expect("db connection");
890 let request_uuid = uuid::Uuid::new_v4().to_string();
891 let command_id = diesel::sql_query(
892 r#"
893 INSERT INTO engine_commands (
894 received_ts_ms, request_uuid, command_type_enum, command_data
895 )
896 VALUES ($1, $2::uuid, 'CashWithdrawalUpdate', $3)
897 RETURNING command_id
898 "#,
899 )
900 .bind::<BigInt, _>(created_ts_ms)
901 .bind::<Text, _>(request_uuid)
902 .bind::<Binary, _>(b"test-command".as_slice())
903 .get_result::<CommandIdRow>(&mut conn)
904 .expect("insert engine command")
905 .command_id;
906
907 diesel::sql_query(
908 r#"
909 INSERT INTO directive_outbox (
910 directive_id, command_id, kind, action_key, wallet_address,
911 account_address, signer_address, directive_nonce, idempotency_key,
912 payload_hash, payload, domain_status, delivery_status, created_ts_ms
913 )
914 VALUES (
915 $1, $2, 'needs_rsm_signature', 'system_withdraw_token',
916 $3, $4, $5, $6, $7, $8, $9, 'pending_chain_effect',
917 'pending', $10
918 )
919 "#,
920 )
921 .bind::<Text, _>(directive_id)
922 .bind::<BigInt, _>(command_id)
923 .bind::<Binary, _>(wallet_address.as_bytes())
924 .bind::<Binary, _>(account_address.as_bytes())
925 .bind::<Binary, _>(wallet(9).as_bytes())
926 .bind::<BigInt, _>(created_ts_ms)
927 .bind::<Text, _>(format!("idempotency-{directive_id}"))
928 .bind::<Binary, _>(&[0_u8; 32][..])
929 .bind::<Binary, _>(b"payload".as_slice())
930 .bind::<BigInt, _>(created_ts_ms)
931 .execute(&mut conn)
932 .expect("insert directive outbox row");
933 }
934
935 #[tokio::test]
936 async fn withdrawal_history_filters_wallet_before_limit() {
937 let db = TestDb::new().await.expect("test db");
938 let target_wallet = wallet(1);
939 let target_account = wallet(2);
940 let other_wallet = wallet(3);
941
942 insert_withdrawal_outbox_row(&db.handler, "other-newest", other_wallet, wallet(4), 300);
943 insert_withdrawal_outbox_row(&db.handler, "other-next", other_wallet, wallet(5), 200);
944 insert_withdrawal_outbox_row(
945 &db.handler,
946 "target-oldest",
947 target_wallet,
948 target_account,
949 100,
950 );
951
952 let rows = db
953 .handler
954 .get_withdrawal_history_sync(&target_wallet, 1)
955 .expect("withdrawal history");
956
957 assert_eq!(rows.len(), 1);
958 assert_eq!(rows[0].directive_id, "target-oldest");
959 }
960
961 #[tokio::test]
962 async fn directive_outbox_delivery_metrics_include_retrying_withdrawals() {
963 let db = TestDb::new().await.expect("test db");
964 let now_ms = std::time::SystemTime::now()
965 .duration_since(std::time::UNIX_EPOCH)
966 .expect("system clock after epoch")
967 .as_millis() as i64;
968 let created_ts_ms = now_ms - 120_000;
969
970 insert_withdrawal_outbox_row(
971 &db.handler,
972 "retrying-withdrawal",
973 wallet(1),
974 wallet(2),
975 created_ts_ms,
976 );
977
978 {
979 let mut conn = db.handler.pool().get().expect("db connection");
980 diesel::sql_query(
981 r#"
982 UPDATE directive_outbox
983 SET delivery_attempts = 3,
984 last_delivery_error = 'test delivery failure',
985 last_attempt_at = CURRENT_TIMESTAMP - INTERVAL '90 seconds'
986 WHERE directive_id = 'retrying-withdrawal'
987 "#,
988 )
989 .execute(&mut conn)
990 .expect("mark directive retrying");
991 }
992
993 let rows = db
994 .handler
995 .list_directive_outbox_delivery_metrics_sync()
996 .expect("directive outbox metrics");
997 let row = rows
998 .iter()
999 .find(|row| {
1000 row.action_key == "system_withdraw_token" && row.delivery_status == "pending"
1001 })
1002 .expect("pending withdrawal metrics row");
1003
1004 assert_eq!(row.pending_count, 1);
1005 assert_eq!(row.retrying_count, 1);
1006 assert!(row.oldest_created_age_seconds >= 100);
1007 assert!(row.oldest_attempt_age_seconds >= 80);
1008 }
1009
1010 #[tokio::test]
1011 async fn directive_outbox_records_submitter_pointer_without_attempts() {
1012 let db = TestDb::new().await.expect("test db");
1013 let directive_id = "submitter-pointer";
1014 let submitter_address = address(9);
1015 let submitter_nonce = 42;
1016
1017 insert_withdrawal_outbox_row(&db.handler, directive_id, wallet(1), wallet(2), 100);
1018 db.handler
1019 .record_directive_submitter_submission_sync(
1020 directive_id,
1021 &submitter_address,
1022 submitter_nonce,
1023 )
1024 .expect("record submitter pointer");
1025
1026 let row = db
1027 .handler
1028 .get_directive_status_sync(directive_id)
1029 .expect("directive status")
1030 .expect("directive row");
1031
1032 assert_eq!(row.submitter_address, Some(submitter_address));
1033 assert_eq!(row.submitter_nonce, Some(submitter_nonce));
1034 }
1035
1036 #[tokio::test]
1037 async fn directive_outbox_submitter_pointer_errors_for_missing_directive() {
1038 let db = TestDb::new().await.expect("test db");
1039 let submitter_address = address(9);
1040
1041 let err = db
1042 .handler
1043 .record_directive_submitter_submission_sync("missing-directive", &submitter_address, 42)
1044 .expect_err("missing directive pointer update must fail");
1045
1046 assert!(
1047 err.to_string()
1048 .contains("expected exactly one directive_outbox row"),
1049 "{err}"
1050 );
1051 }
1052
1053 #[tokio::test]
1054 async fn async_directive_outbox_records_submitter_pointer_without_attempts() {
1055 let db = TestDb::new().await.expect("test db");
1056 let diesel_db = db.diesel_db().await;
1057 let directive_id = "async-submitter-pointer";
1058 let submitter_address = address(8);
1059 let submitter_nonce = 43;
1060
1061 insert_withdrawal_outbox_row(&db.handler, directive_id, wallet(1), wallet(2), 100);
1062 diesel_db
1063 .record_directive_submitter_submission(
1064 directive_id,
1065 &submitter_address,
1066 submitter_nonce,
1067 )
1068 .await
1069 .expect("record submitter pointer");
1070
1071 let row = diesel_db
1072 .get_directive_status(directive_id)
1073 .await
1074 .expect("directive status")
1075 .expect("directive row");
1076
1077 assert_eq!(row.submitter_address, Some(submitter_address));
1078 assert_eq!(row.submitter_nonce, Some(submitter_nonce));
1079 }
1080
1081 #[tokio::test]
1082 async fn async_directive_outbox_submitter_pointer_errors_for_missing_directive() {
1083 let db = TestDb::new().await.expect("test db");
1084 let diesel_db = db.diesel_db().await;
1085 let submitter_address = address(8);
1086
1087 let err = diesel_db
1088 .record_directive_submitter_submission(
1089 "missing-async-directive",
1090 &submitter_address,
1091 43,
1092 )
1093 .await
1094 .expect_err("missing directive pointer update must fail");
1095
1096 assert!(
1097 err.to_string()
1098 .contains("expected exactly one directive_outbox row"),
1099 "{err}"
1100 );
1101 }
1102
1103 #[tokio::test]
1104 async fn async_persist_directive_transaction_update_confirms_delivery() {
1105 let db = TestDb::new().await.expect("test db");
1106 let diesel_db = db.diesel_db().await;
1107 let directive_id = "async-confirmed-delivery";
1108 let tx_hash = "0xffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff";
1109
1110 insert_withdrawal_outbox_row(&db.handler, directive_id, wallet(1), wallet(2), 100);
1111 diesel_db
1112 .persist_directive_transaction_update(
1113 directive_id,
1114 TransactionStatus::Confirmed,
1115 Some(tx_hash),
1116 None,
1117 )
1118 .await
1119 .expect("persist confirmed delivery");
1120
1121 let row = diesel_db
1122 .get_directive_status(directive_id)
1123 .await
1124 .expect("directive status")
1125 .expect("directive row");
1126
1127 assert_eq!(row.domain_status, "completed");
1128 assert_eq!(row.delivery_status, "finalized");
1129 assert_eq!(row.tx_hash.as_deref(), Some(tx_hash));
1130 }
1131
1132 #[tokio::test]
1133 async fn recent_directive_outbox_rows_are_ordered_paginated_and_decoded_sync() {
1134 let db = TestDb::new().await.expect("test db");
1135
1136 insert_withdrawal_outbox_row(&db.handler, "oldest", wallet(1), wallet(2), 100);
1137 insert_withdrawal_outbox_row(&db.handler, "middle", wallet(3), wallet(4), 200);
1138 insert_withdrawal_outbox_row(&db.handler, "newest", wallet(5), wallet(6), 300);
1139
1140 {
1141 let mut conn = db.handler.pool().get().expect("db connection");
1142 diesel::sql_query(
1143 r#"
1144 UPDATE directive_outbox
1145 SET delivery_attempts = 7,
1146 last_delivery_error = 'rpc timeout',
1147 last_attempt_at = to_timestamp(1),
1148 expires_at_ms = 12345,
1149 tx_hash = '0xabc'
1150 WHERE directive_id = 'newest'
1151 "#,
1152 )
1153 .execute(&mut conn)
1154 .expect("mark newest directive delivery metadata");
1155 }
1156
1157 let rows = db
1158 .handler
1159 .list_recent_directive_outbox_rows_sync(2, 0)
1160 .expect("recent directive outbox rows");
1161
1162 assert_eq!(
1163 rows.iter()
1164 .map(|row| row.directive_id.as_str())
1165 .collect::<Vec<_>>(),
1166 vec!["newest", "middle"]
1167 );
1168 assert_eq!(rows[0].wallet_address, wallet(5));
1169 assert_eq!(rows[0].account_address, wallet(6));
1170 assert_eq!(rows[0].signer_address, wallet(9));
1171 assert_eq!(rows[0].delivery_attempts, 7);
1172 assert_eq!(rows[0].last_delivery_error.as_deref(), Some("rpc timeout"));
1173 assert_eq!(rows[0].last_attempt_ts_ms, Some(1000));
1174 assert_eq!(rows[0].expires_at_ms, Some(12345));
1175 assert_eq!(rows[0].tx_hash.as_deref(), Some("0xabc"));
1176
1177 let offset_rows = db
1178 .handler
1179 .list_recent_directive_outbox_rows_sync(1, 1)
1180 .expect("offset directive outbox rows");
1181 assert_eq!(offset_rows[0].directive_id, "middle");
1182 }
1183
1184 #[tokio::test]
1185 async fn recent_directive_outbox_rows_are_ordered_paginated_and_decoded_async() {
1186 let db = TestDb::new().await.expect("test db");
1187 let diesel_db = db.diesel_db().await;
1188
1189 insert_withdrawal_outbox_row(&db.handler, "oldest", wallet(1), wallet(2), 100);
1190 insert_withdrawal_outbox_row(&db.handler, "middle", wallet(3), wallet(4), 200);
1191 insert_withdrawal_outbox_row(&db.handler, "newest", wallet(5), wallet(6), 300);
1192
1193 let rows = diesel_db
1194 .list_recent_directive_outbox_rows(2, 1)
1195 .await
1196 .expect("recent directive outbox rows");
1197
1198 assert_eq!(
1199 rows.iter()
1200 .map(|row| row.directive_id.as_str())
1201 .collect::<Vec<_>>(),
1202 vec!["middle", "oldest"]
1203 );
1204 assert_eq!(rows[0].wallet_address, wallet(3));
1205 assert_eq!(rows[0].account_address, wallet(4));
1206 assert_eq!(rows[0].delivery_attempts, 0);
1207 }
1208
1209 #[tokio::test]
1210 async fn expired_withdrawal_update_requires_manual_reconciliation_without_refund() {
1211 #[derive(diesel::QueryableByName)]
1212 struct StatusRow {
1213 #[diesel(sql_type = Text)]
1214 domain_status: String,
1215 #[diesel(sql_type = Text)]
1216 delivery_status: String,
1217 }
1218
1219 #[derive(diesel::QueryableByName)]
1220 struct CashLedgerRow {
1221 #[diesel(sql_type = Text)]
1222 status: String,
1223 #[diesel(sql_type = Nullable<Text>)]
1224 error: Option<String>,
1225 }
1226
1227 #[derive(diesel::QueryableByName)]
1228 struct CountRow {
1229 #[diesel(sql_type = BigInt)]
1230 count: i64,
1231 }
1232
1233 let db = TestDb::new().await.expect("test db");
1234 let target_wallet = wallet(1);
1235 let target_account = wallet(2);
1236 let directive_id = "expired-withdrawal";
1237 let error = "directive expired before delivery";
1238
1239 insert_withdrawal_outbox_row(
1240 &db.handler,
1241 directive_id,
1242 target_wallet,
1243 target_account,
1244 100,
1245 );
1246
1247 {
1248 let mut conn = db.handler.pool().get().expect("db connection");
1249 diesel::sql_query(
1250 r#"
1251 INSERT INTO hypercore_cash_ledger_events (
1252 wallet, event_hash, event_time_ms, delta_type, amount_usdc, status
1253 )
1254 VALUES ($1, $2, $3, 'withdraw', $4, 'submitted')
1255 "#,
1256 )
1257 .bind::<Binary, _>(target_wallet.as_bytes())
1258 .bind::<Text, _>(directive_id)
1259 .bind::<BigInt, _>(100_i64)
1260 .bind::<Numeric, _>(Decimal::new(1250, 2))
1261 .execute(&mut conn)
1262 .expect("insert cash withdrawal ledger event");
1263 }
1264
1265 db.handler
1266 .persist_directive_transaction_update_sync(
1267 directive_id,
1268 TransactionStatus::Expired,
1269 None,
1270 Some(error),
1271 )
1272 .expect("persist expired update");
1273
1274 let mut conn = db.handler.pool().get().expect("db connection");
1275 let status = diesel::sql_query(
1276 "SELECT domain_status, delivery_status FROM directive_outbox WHERE directive_id = $1",
1277 )
1278 .bind::<Text, _>(directive_id)
1279 .get_result::<StatusRow>(&mut conn)
1280 .expect("directive status");
1281 assert_eq!(status.domain_status, "failed");
1282 assert_eq!(status.delivery_status, "expired");
1283
1284 let cash_ledger = diesel::sql_query(
1285 "SELECT status, error FROM hypercore_cash_ledger_events WHERE event_hash = $1",
1286 )
1287 .bind::<Text, _>(directive_id)
1288 .get_result::<CashLedgerRow>(&mut conn)
1289 .expect("cash ledger row");
1290 assert_eq!(cash_ledger.status, "submitted");
1291 assert_eq!(cash_ledger.error, None);
1292
1293 let wallet_balance_rows = diesel::sql_query(
1294 "SELECT COUNT(*)::bigint AS count FROM account_balances WHERE account_address = $1",
1295 )
1296 .bind::<Binary, _>(target_wallet.as_bytes())
1297 .get_result::<CountRow>(&mut conn)
1298 .expect("wallet account balance count");
1299 assert_eq!(wallet_balance_rows.count, 0);
1300
1301 let account_balance_rows = diesel::sql_query(
1302 "SELECT COUNT(*)::bigint AS count FROM account_balances WHERE account_address = $1",
1303 )
1304 .bind::<Binary, _>(target_account.as_bytes())
1305 .get_result::<CountRow>(&mut conn)
1306 .expect("account account balance count");
1307 assert_eq!(account_balance_rows.count, 0);
1308
1309 let ledger_events = diesel::sql_query(
1310 "SELECT COUNT(*)::bigint AS count FROM ledger_events WHERE wallet = $1 AND event_type = 'withdraw_failed' AND reference_symbol = $2",
1311 )
1312 .bind::<Binary, _>(target_wallet.as_bytes())
1313 .bind::<Text, _>(directive_id)
1314 .get_result::<CountRow>(&mut conn)
1315 .expect("withdraw_failed ledger event count");
1316 assert_eq!(ledger_events.count, 0);
1317 }
1318
1319 #[tokio::test]
1320 async fn manual_reconciliation_status_does_not_refund_submitted_withdrawal() {
1321 #[derive(diesel::QueryableByName)]
1322 struct StatusRow {
1323 #[diesel(sql_type = Text)]
1324 domain_status: String,
1325 #[diesel(sql_type = Text)]
1326 delivery_status: String,
1327 #[diesel(sql_type = Nullable<Text>)]
1328 last_delivery_error: Option<String>,
1329 }
1330
1331 #[derive(diesel::QueryableByName)]
1332 struct CashLedgerRow {
1333 #[diesel(sql_type = Text)]
1334 status: String,
1335 }
1336
1337 #[derive(diesel::QueryableByName)]
1338 struct CountRow {
1339 #[diesel(sql_type = BigInt)]
1340 count: i64,
1341 }
1342
1343 #[derive(diesel::QueryableByName)]
1344 struct OutboxSeqRow {
1345 #[diesel(sql_type = BigInt)]
1346 outbox_seq: i64,
1347 }
1348
1349 let db = TestDb::new().await.expect("test db");
1350 let target_wallet = wallet(1);
1351 let target_account = wallet(2);
1352 let directive_id = "manual-reconciliation-withdrawal";
1353 let error = "RSM nonce 7 is already used before directive submission; on-chain outcome is unknown and requires manual reconciliation";
1354
1355 insert_withdrawal_outbox_row(
1356 &db.handler,
1357 directive_id,
1358 target_wallet,
1359 target_account,
1360 100,
1361 );
1362
1363 {
1364 let mut conn = db.handler.pool().get().expect("db connection");
1365 diesel::sql_query(
1366 r#"
1367 INSERT INTO hypercore_cash_ledger_events (
1368 wallet, event_hash, event_time_ms, delta_type, amount_usdc, status
1369 )
1370 VALUES ($1, $2, $3, 'withdraw', $4, 'submitted')
1371 "#,
1372 )
1373 .bind::<Binary, _>(target_wallet.as_bytes())
1374 .bind::<Text, _>(directive_id)
1375 .bind::<BigInt, _>(100_i64)
1376 .bind::<Numeric, _>(Decimal::new(1250, 2))
1377 .execute(&mut conn)
1378 .expect("insert cash withdrawal ledger event");
1379 }
1380
1381 let outbox_seq = {
1382 let mut conn = db.handler.pool().get().expect("db connection");
1383 diesel::sql_query("SELECT outbox_seq FROM directive_outbox WHERE directive_id = $1")
1384 .bind::<Text, _>(directive_id)
1385 .get_result::<OutboxSeqRow>(&mut conn)
1386 .expect("outbox seq")
1387 .outbox_seq
1388 };
1389
1390 db.handler
1391 .mark_directive_outbox_manual_reconciliation_sync(outbox_seq, error)
1392 .expect("mark manual reconciliation");
1393
1394 let mut conn = db.handler.pool().get().expect("db connection");
1395 let status = diesel::sql_query(
1396 "SELECT domain_status, delivery_status, last_delivery_error FROM directive_outbox WHERE directive_id = $1",
1397 )
1398 .bind::<Text, _>(directive_id)
1399 .get_result::<StatusRow>(&mut conn)
1400 .expect("directive status");
1401 assert_eq!(status.domain_status, "pending_chain_effect");
1402 assert_eq!(status.delivery_status, "dead_lettered");
1403 assert_eq!(status.last_delivery_error.as_deref(), Some(error));
1404
1405 let cash_ledger = diesel::sql_query(
1406 "SELECT status FROM hypercore_cash_ledger_events WHERE event_hash = $1",
1407 )
1408 .bind::<Text, _>(directive_id)
1409 .get_result::<CashLedgerRow>(&mut conn)
1410 .expect("cash ledger row");
1411 assert_eq!(cash_ledger.status, "submitted");
1412
1413 let balance_rows = diesel::sql_query(
1414 "SELECT COUNT(*)::bigint AS count FROM account_balances WHERE account_address = $1",
1415 )
1416 .bind::<Binary, _>(target_wallet.as_bytes())
1417 .get_result::<CountRow>(&mut conn)
1418 .expect("account balance count");
1419 assert_eq!(balance_rows.count, 0);
1420
1421 let ledger_events = diesel::sql_query(
1422 "SELECT COUNT(*)::bigint AS count FROM ledger_events WHERE wallet = $1 AND event_type = 'withdraw_failed' AND reference_symbol = $2",
1423 )
1424 .bind::<Binary, _>(target_wallet.as_bytes())
1425 .bind::<Text, _>(directive_id)
1426 .get_result::<CountRow>(&mut conn)
1427 .expect("withdraw_failed ledger event count");
1428 assert_eq!(ledger_events.count, 0);
1429 }
1430
1431 #[tokio::test]
1432 async fn failed_status_requires_manual_reconciliation_without_refund() {
1433 #[derive(diesel::QueryableByName)]
1434 struct CashLedgerRow {
1435 #[diesel(sql_type = Text)]
1436 status: String,
1437 }
1438
1439 #[derive(diesel::QueryableByName)]
1440 struct CountRow {
1441 #[diesel(sql_type = BigInt)]
1442 count: i64,
1443 }
1444
1445 let db = TestDb::new().await.expect("test db");
1446 let target_wallet = wallet(1);
1447 let target_account = wallet(2);
1448 let directive_id = "failed-manual-reconciliation-withdrawal";
1449
1450 insert_withdrawal_outbox_row(
1451 &db.handler,
1452 directive_id,
1453 target_wallet,
1454 target_account,
1455 100,
1456 );
1457
1458 {
1459 let mut conn = db.handler.pool().get().expect("db connection");
1460 diesel::sql_query(
1461 r#"
1462 INSERT INTO hypercore_cash_ledger_events (
1463 wallet, event_hash, event_time_ms, delta_type, amount_usdc, status
1464 )
1465 VALUES ($1, $2, $3, 'withdraw', $4, 'submitted')
1466 "#,
1467 )
1468 .bind::<Binary, _>(target_wallet.as_bytes())
1469 .bind::<Text, _>(directive_id)
1470 .bind::<BigInt, _>(100_i64)
1471 .bind::<Numeric, _>(Decimal::new(1250, 2))
1472 .execute(&mut conn)
1473 .expect("insert cash withdrawal ledger event");
1474 }
1475
1476 db.handler
1477 .persist_directive_transaction_update_sync(
1478 directive_id,
1479 TransactionStatus::Failed,
1480 None,
1481 Some("failed without proven no-chain-effect"),
1482 )
1483 .expect("persist failed update");
1484
1485 let mut conn = db.handler.pool().get().expect("db connection");
1486 let cash_ledger = diesel::sql_query(
1487 "SELECT status FROM hypercore_cash_ledger_events WHERE event_hash = $1",
1488 )
1489 .bind::<Text, _>(directive_id)
1490 .get_result::<CashLedgerRow>(&mut conn)
1491 .expect("cash ledger row");
1492 assert_eq!(cash_ledger.status, "submitted");
1493
1494 let balance_rows = diesel::sql_query(
1495 "SELECT COUNT(*)::bigint AS count FROM account_balances WHERE account_address = $1",
1496 )
1497 .bind::<Binary, _>(target_wallet.as_bytes())
1498 .get_result::<CountRow>(&mut conn)
1499 .expect("account balance count");
1500 assert_eq!(balance_rows.count, 0);
1501
1502 let ledger_events = diesel::sql_query(
1503 "SELECT COUNT(*)::bigint AS count FROM ledger_events WHERE wallet = $1 AND event_type = 'withdraw_failed' AND reference_symbol = $2",
1504 )
1505 .bind::<Binary, _>(target_wallet.as_bytes())
1506 .bind::<Text, _>(directive_id)
1507 .get_result::<CountRow>(&mut conn)
1508 .expect("withdraw_failed ledger event count");
1509 assert_eq!(ledger_events.count, 0);
1510 }
1511}
1512
1513impl hypercall_db::DirectiveOutboxWriter for DatabaseHandler {
1514 fn mark_directive_outbox_delivery_failed_sync(
1515 &self,
1516 outbox_seq: i64,
1517 error: &str,
1518 ) -> Result<()> {
1519 let mut conn = self.pool().get()?;
1520 diesel::sql_query(
1521 r#"
1522 UPDATE directive_outbox
1523 SET delivery_status = 'pending',
1524 last_delivery_error = $2,
1525 updated_at = CURRENT_TIMESTAMP
1526 WHERE outbox_seq = $1
1527 AND delivery_status NOT IN ('finalized', 'reverted', 'expired', 'dead_lettered')
1528 "#,
1529 )
1530 .bind::<BigInt, _>(outbox_seq)
1531 .bind::<Text, _>(error)
1532 .execute(&mut conn)?;
1533 metrics::counter!("ht_directive_outbox_delivery_failures_total").increment(1);
1534 Ok(())
1535 }
1536
1537 fn mark_directive_outbox_dead_lettered_sync(&self, outbox_seq: i64, error: &str) -> Result<()> {
1538 let mut conn = self.pool().get()?;
1539 diesel::sql_query(
1540 r#"
1541 UPDATE directive_outbox
1542 SET delivery_status = 'dead_lettered',
1543 domain_status = 'failed',
1544 last_delivery_error = $2,
1545 updated_at = CURRENT_TIMESTAMP
1546 WHERE outbox_seq = $1
1547 AND delivery_status NOT IN ('finalized', 'reverted', 'expired', 'dead_lettered')
1548 "#,
1549 )
1550 .bind::<BigInt, _>(outbox_seq)
1551 .bind::<Text, _>(error)
1552 .execute(&mut conn)?;
1553 Ok(())
1554 }
1555
1556 fn mark_directive_outbox_manual_reconciliation_sync(
1557 &self,
1558 outbox_seq: i64,
1559 error: &str,
1560 ) -> Result<()> {
1561 let mut conn = self.pool().get()?;
1562 diesel::sql_query(
1563 r#"
1564 UPDATE directive_outbox
1565 SET delivery_status = 'dead_lettered',
1566 last_delivery_error = $2,
1567 updated_at = CURRENT_TIMESTAMP
1568 WHERE outbox_seq = $1
1569 AND delivery_status NOT IN ('finalized', 'reverted', 'expired', 'dead_lettered')
1570 "#,
1571 )
1572 .bind::<BigInt, _>(outbox_seq)
1573 .bind::<Text, _>(error)
1574 .execute(&mut conn)?;
1575 Ok(())
1576 }
1577
1578 fn record_directive_submitter_submission_sync(
1579 &self,
1580 request_id: &str,
1581 submitter_address: &Address,
1582 submitter_nonce: u64,
1583 ) -> Result<()> {
1584 let submitter_nonce = i64::try_from(submitter_nonce).map_err(|_| {
1585 anyhow::anyhow!("submitter nonce {submitter_nonce} does not fit in BIGINT")
1586 })?;
1587 use crate::schema::directive_outbox::dsl as outbox_dsl;
1588
1589 let mut conn = self.pool().get()?;
1590 let updated = diesel::update(
1591 outbox_dsl::directive_outbox.filter(outbox_dsl::directive_id.eq(request_id)),
1592 )
1593 .set((
1594 outbox_dsl::submitter_address.eq(Some(submitter_address.as_slice())),
1595 outbox_dsl::submitter_nonce.eq(Some(submitter_nonce)),
1596 outbox_dsl::updated_at.eq(diesel::dsl::now),
1597 ))
1598 .execute(&mut conn)?;
1599 anyhow::ensure!(
1600 updated == 1,
1601 "expected exactly one directive_outbox row for directive_id={request_id}, updated {updated}"
1602 );
1603 Ok(())
1604 }
1605
1606 fn persist_directive_transaction_update_sync(
1607 &self,
1608 request_id: &str,
1609 status: TransactionStatus,
1610 tx_hash: Option<&str>,
1611 error: Option<&str>,
1612 ) -> Result<()> {
1613 let (domain_status, delivery_status): (Option<&str>, Option<&str>) = match status {
1614 TransactionStatus::Submitted => (None, Some("pending")),
1615 TransactionStatus::Pending => (None, Some("pending")),
1616 TransactionStatus::Confirmed => (Some("completed"), Some("finalized")),
1617 TransactionStatus::Failed => (Some("failed"), Some("reverted")),
1618 TransactionStatus::Expired => (Some("failed"), Some("expired")),
1619 };
1620
1621 let mut conn = self.pool().get()?;
1622 if let Some(domain_status) = domain_status {
1623 #[derive(diesel::QueryableByName)]
1624 struct ActionKeyRow {
1625 #[diesel(sql_type = DirectiveActionKeySql)]
1626 action_key: DirectiveActionKeyDb,
1627 }
1628
1629 let updated_row: Option<ActionKeyRow> = diesel::sql_query(
1630 r#"
1631 WITH updated_outbox AS (
1632 UPDATE directive_outbox
1633 SET domain_status = $2,
1634 delivery_status = $3,
1635 tx_hash = COALESCE($4, tx_hash),
1636 last_delivery_error = $5,
1637 updated_at = CURRENT_TIMESTAMP
1638 WHERE directive_id = $1
1639 AND delivery_status NOT IN ('finalized', 'reverted', 'expired', 'dead_lettered')
1640 RETURNING directive_id, action_key
1641 )
1642 SELECT action_key FROM updated_outbox
1643 "#,
1644 )
1645 .bind::<Text, _>(request_id)
1646 .bind::<Text, _>(domain_status)
1647 .bind::<Text, _>(
1648 delivery_status.expect("terminal update must have delivery_status"),
1649 )
1650 .bind::<Nullable<Text>, _>(tx_hash)
1651 .bind::<Nullable<Text>, _>(error)
1652 .get_result(&mut conn)
1653 .optional()?;
1654
1655 if matches!(
1656 status,
1657 TransactionStatus::Failed | TransactionStatus::Expired
1658 ) {
1659 if let Some(row) = updated_row {
1660 let action_key: hypercall_types::directives::ActionKey = row.action_key.into();
1661 if matches!(
1662 action_key,
1663 hypercall_types::directives::ActionKey::SystemWithdrawToken
1664 | hypercall_types::directives::ActionKey::SystemCreditOption
1665 ) {
1666 metrics::counter!(
1667 "ht_withdrawal_manual_reconciliation_required_total",
1668 "action_key" => action_key.as_str(),
1669 )
1670 .increment(1);
1671 tracing::warn!(
1672 directive_id = %request_id,
1673 action_key = ?action_key,
1674 "Withdrawal directive reached terminal delivery status; automatic refunds are disabled and operator reconciliation is required"
1675 );
1676 }
1677 }
1678 }
1679 } else if let Some(delivery_status) = delivery_status {
1680 diesel::sql_query(
1681 r#"
1682 UPDATE directive_outbox
1683 SET delivery_status = $2,
1684 tx_hash = COALESCE($3, tx_hash),
1685 last_delivery_error = $4,
1686 updated_at = CURRENT_TIMESTAMP
1687 WHERE directive_id = $1
1688 AND delivery_status NOT IN ('finalized', 'reverted', 'expired', 'dead_lettered')
1689 "#,
1690 )
1691 .bind::<Text, _>(request_id)
1692 .bind::<Text, _>(delivery_status)
1693 .bind::<Nullable<Text>, _>(tx_hash)
1694 .bind::<Nullable<Text>, _>(error)
1695 .execute(&mut conn)?;
1696 }
1697 Ok(())
1698 }
1699}