Skip to main content

hypercall_db_diesel/
directive_outbox.rs

//! DirectiveOutboxReader + DirectiveOutboxWriter implementations for DatabaseHandler, plus the async reader/writer implementations for DieselDb.
2
3use crate::engine_enums::{DirectiveActionKeyDb, DirectiveActionKeySql};
4use crate::{DatabaseHandler, DieselDb};
5use alloy::primitives::Address;
6use anyhow::Result;
7use async_trait::async_trait;
8use diesel::sql_types::{BigInt, Binary, Nullable, Text};
9use diesel::OptionalExtension;
10use diesel::RunQueryDsl;
11use diesel::{ExpressionMethods, QueryDsl};
12use hypercall_types::{TransactionStatus, WalletAddress};
13
14fn optional_nonnegative_i64_to_u64(value: Option<i64>, field: &str) -> Result<Option<u64>> {
15    value
16        .map(|v| u64::try_from(v).map_err(|_| anyhow::anyhow!("negative directive {field} {}", v)))
17        .transpose()
18}
19
20fn wallet_from_bytes(bytes: &[u8], label: &str) -> Result<WalletAddress> {
21    let arr: [u8; 20] = bytes.try_into().map_err(|_| {
22        anyhow::anyhow!(
23            "invalid directive {} address length {}, expected 20",
24            label,
25            bytes.len()
26        )
27    })?;
28    Ok(WalletAddress::from(arr))
29}
30
31fn address_from_bytes(bytes: &[u8], label: &str) -> Result<Address> {
32    let arr: [u8; 20] = bytes.try_into().map_err(|_| {
33        anyhow::anyhow!(
34            "invalid directive {} address length {}, expected 20",
35            label,
36            bytes.len()
37        )
38    })?;
39    Ok(Address::from(arr))
40}
41
/// SQL used by both the async and the sync delivery-metrics readers in this file.
///
/// Only `directive_outbox` rows with no `tx_hash` and a `delivery_status` of
/// `pending` or `broadcasted` are considered, grouped by
/// `(action_key, delivery_status)`. Each group reports:
/// - `pending_count`: number of rows in the group,
/// - `retrying_count`: rows with at least one delivery attempt or a recorded
///   delivery error,
/// - `oldest_created_age_seconds`: largest age in seconds since `created_ts_ms`,
/// - `oldest_attempt_age_seconds`: largest age in seconds since `last_attempt_at`,
///   falling back to the creation time when `last_attempt_at` is NULL.
///
/// Both ages are floored at 0 via `GREATEST`.
const DIRECTIVE_OUTBOX_DELIVERY_METRICS_SQL: &str = r#"
SELECT
    action_key::text AS action_key,
    delivery_status,
    COUNT(*)::BIGINT AS pending_count,
    SUM(
        CASE
            WHEN delivery_attempts > 0 OR last_delivery_error IS NOT NULL THEN 1
            ELSE 0
        END
    )::BIGINT AS retrying_count,
    GREATEST(
        MAX(EXTRACT(EPOCH FROM (CURRENT_TIMESTAMP - to_timestamp(created_ts_ms / 1000.0))))::BIGINT,
        0
    ) AS oldest_created_age_seconds,
    GREATEST(
        MAX(EXTRACT(EPOCH FROM (
            CURRENT_TIMESTAMP - COALESCE(last_attempt_at, to_timestamp(created_ts_ms / 1000.0))
        )))::BIGINT,
        0
    ) AS oldest_attempt_age_seconds
FROM directive_outbox
WHERE tx_hash IS NULL
  AND delivery_status IN ('pending', 'broadcasted')
GROUP BY action_key, delivery_status
"#;
68
69#[async_trait]
70impl hypercall_db::AsyncDirectiveOutboxReader for DieselDb {
71    async fn directive_outbox_exists(&self, directive_id: &str) -> Result<bool> {
72        #[derive(diesel::QueryableByName)]
73        struct ExistsRow {
74            #[diesel(sql_type = diesel::sql_types::Bool)]
75            exists: bool,
76        }
77
78        let mut conn = self.get_conn().await?;
79        let query = diesel::sql_query(
80            "SELECT EXISTS (
81                 SELECT 1 FROM directive_outbox WHERE directive_id = $1
82             ) AS exists",
83        )
84        .bind::<Text, _>(directive_id);
85        let row = diesel_async::RunQueryDsl::get_result::<ExistsRow>(query, &mut conn).await?;
86        Ok(row.exists)
87    }
88
89    async fn get_directive_status(
90        &self,
91        directive_id: &str,
92    ) -> Result<Option<hypercall_db::DirectiveStatusRow>> {
93        #[derive(diesel::QueryableByName)]
94        struct StatusRow {
95            #[diesel(sql_type = Text)]
96            directive_id: String,
97            #[diesel(sql_type = Text)]
98            action_key: String,
99            #[diesel(sql_type = Text)]
100            domain_status: String,
101            #[diesel(sql_type = Text)]
102            delivery_status: String,
103            #[diesel(sql_type = Nullable<Binary>)]
104            submitter_address: Option<Vec<u8>>,
105            #[diesel(sql_type = Nullable<BigInt>)]
106            submitter_nonce: Option<i64>,
107            #[diesel(sql_type = Nullable<Text>)]
108            tx_hash: Option<String>,
109            #[diesel(sql_type = Nullable<BigInt>)]
110            created_ts_ms: Option<i64>,
111        }
112
113        let mut conn = self.get_conn().await?;
114        let query = diesel::sql_query(
115            r#"
116            SELECT directive_id, action_key::text AS action_key, domain_status, delivery_status,
117                   submitter_address, submitter_nonce, tx_hash, created_ts_ms
118            FROM directive_outbox
119            WHERE directive_id = $1
120            "#,
121        )
122        .bind::<Text, _>(directive_id);
123        let row = diesel_async::RunQueryDsl::get_result::<StatusRow>(query, &mut conn)
124            .await
125            .optional()?;
126
127        row.map(|r| {
128            Ok(hypercall_db::DirectiveStatusRow {
129                directive_id: r.directive_id,
130                action_key: r.action_key,
131                domain_status: r.domain_status,
132                delivery_status: r.delivery_status,
133                submitter_address: r
134                    .submitter_address
135                    .map(|bytes| address_from_bytes(&bytes, "submitter"))
136                    .transpose()?,
137                submitter_nonce: optional_nonnegative_i64_to_u64(
138                    r.submitter_nonce,
139                    "submitter_nonce",
140                )?,
141                tx_hash: r.tx_hash,
142                created_ts_ms: r.created_ts_ms,
143            })
144        })
145        .transpose()
146    }
147
148    async fn get_withdrawal_history(
149        &self,
150        wallet: &WalletAddress,
151        limit: i64,
152    ) -> Result<Vec<hypercall_db::DirectiveStatusRow>> {
153        #[derive(diesel::QueryableByName)]
154        struct StatusRow {
155            #[diesel(sql_type = Text)]
156            directive_id: String,
157            #[diesel(sql_type = Text)]
158            action_key: String,
159            #[diesel(sql_type = Text)]
160            domain_status: String,
161            #[diesel(sql_type = Text)]
162            delivery_status: String,
163            #[diesel(sql_type = Nullable<Binary>)]
164            submitter_address: Option<Vec<u8>>,
165            #[diesel(sql_type = Nullable<BigInt>)]
166            submitter_nonce: Option<i64>,
167            #[diesel(sql_type = Nullable<Text>)]
168            tx_hash: Option<String>,
169            #[diesel(sql_type = Nullable<BigInt>)]
170            created_ts_ms: Option<i64>,
171        }
172
173        let mut conn = self.get_conn().await?;
174        let limit = limit.max(1);
175        let query = diesel::sql_query(
176            r#"
177            SELECT directive_id, action_key::text AS action_key, domain_status, delivery_status,
178                   submitter_address, submitter_nonce, tx_hash, created_ts_ms
179            FROM directive_outbox d
180            JOIN engine_commands ec ON ec.command_id = d.command_id
181            WHERE d.action_key::text IN ('system_withdraw_token', 'system_credit_option')
182              AND (d.wallet_address = $1 OR d.account_address = $1)
183              AND ec.command_type_enum IN ('CashWithdrawalUpdate', 'OptionWithdrawalUpdate')
184            ORDER BY d.created_ts_ms DESC
185            LIMIT $2
186            "#,
187        )
188        .bind::<Binary, _>(wallet.as_bytes())
189        .bind::<BigInt, _>(limit);
190        let rows = diesel_async::RunQueryDsl::get_results::<StatusRow>(query, &mut conn).await?;
191
192        rows.into_iter()
193            .map(|r| {
194                Ok(hypercall_db::DirectiveStatusRow {
195                    directive_id: r.directive_id,
196                    action_key: r.action_key,
197                    domain_status: r.domain_status,
198                    delivery_status: r.delivery_status,
199                    submitter_address: r
200                        .submitter_address
201                        .map(|bytes| address_from_bytes(&bytes, "submitter"))
202                        .transpose()?,
203                    submitter_nonce: optional_nonnegative_i64_to_u64(
204                        r.submitter_nonce,
205                        "submitter_nonce",
206                    )?,
207                    tx_hash: r.tx_hash,
208                    created_ts_ms: r.created_ts_ms,
209                })
210            })
211            .collect()
212    }
213
214    async fn list_directive_outbox_delivery_metrics(
215        &self,
216    ) -> Result<Vec<hypercall_db::DirectiveOutboxDeliveryMetricsRow>> {
217        #[derive(diesel::QueryableByName)]
218        struct MetricsRow {
219            #[diesel(sql_type = Text)]
220            action_key: String,
221            #[diesel(sql_type = Text)]
222            delivery_status: String,
223            #[diesel(sql_type = BigInt)]
224            pending_count: i64,
225            #[diesel(sql_type = BigInt)]
226            retrying_count: i64,
227            #[diesel(sql_type = BigInt)]
228            oldest_created_age_seconds: i64,
229            #[diesel(sql_type = BigInt)]
230            oldest_attempt_age_seconds: i64,
231        }
232
233        let mut conn = self.get_conn().await?;
234        let query = diesel::sql_query(DIRECTIVE_OUTBOX_DELIVERY_METRICS_SQL);
235        let rows = diesel_async::RunQueryDsl::get_results::<MetricsRow>(query, &mut conn).await?;
236
237        Ok(rows
238            .into_iter()
239            .map(|row| hypercall_db::DirectiveOutboxDeliveryMetricsRow {
240                action_key: row.action_key,
241                delivery_status: row.delivery_status,
242                pending_count: row.pending_count,
243                retrying_count: row.retrying_count,
244                oldest_created_age_seconds: row.oldest_created_age_seconds,
245                oldest_attempt_age_seconds: row.oldest_attempt_age_seconds,
246            })
247            .collect())
248    }
249
250    async fn list_recent_directive_outbox_rows(
251        &self,
252        limit: i64,
253        offset: i64,
254    ) -> Result<Vec<hypercall_db::DirectiveOutboxRecentRow>> {
255        #[derive(diesel::QueryableByName)]
256        struct Row {
257            #[diesel(sql_type = BigInt)]
258            outbox_seq: i64,
259            #[diesel(sql_type = Text)]
260            directive_id: String,
261            #[diesel(sql_type = Text)]
262            kind: String,
263            #[diesel(sql_type = Text)]
264            action_key: String,
265            #[diesel(sql_type = Binary)]
266            wallet_address: Vec<u8>,
267            #[diesel(sql_type = Binary)]
268            account_address: Vec<u8>,
269            #[diesel(sql_type = Binary)]
270            signer_address: Vec<u8>,
271            #[diesel(sql_type = BigInt)]
272            directive_nonce: i64,
273            #[diesel(sql_type = Text)]
274            domain_status: String,
275            #[diesel(sql_type = Text)]
276            delivery_status: String,
277            #[diesel(sql_type = Nullable<Binary>)]
278            submitter_address: Option<Vec<u8>>,
279            #[diesel(sql_type = Nullable<BigInt>)]
280            submitter_nonce: Option<i64>,
281            #[diesel(sql_type = Nullable<Text>)]
282            tx_hash: Option<String>,
283            #[diesel(sql_type = BigInt)]
284            delivery_attempts: i64,
285            #[diesel(sql_type = Nullable<Text>)]
286            last_delivery_error: Option<String>,
287            #[diesel(sql_type = BigInt)]
288            created_ts_ms: i64,
289            #[diesel(sql_type = Nullable<BigInt>)]
290            last_attempt_ts_ms: Option<i64>,
291            #[diesel(sql_type = Nullable<BigInt>)]
292            expires_at_ms: Option<i64>,
293        }
294
295        let mut conn = self.get_conn().await?;
296        let limit = limit.clamp(1, 500);
297        let offset = offset.max(0);
298        let query = diesel::sql_query(
299            r#"
300            SELECT outbox_seq,
301                   directive_id,
302                   kind,
303                   action_key::text AS action_key,
304                   wallet_address,
305                   account_address,
306                   signer_address,
307                   directive_nonce,
308                   domain_status,
309                   delivery_status,
310                   submitter_address,
311                   submitter_nonce,
312                   tx_hash,
313                   delivery_attempts::BIGINT AS delivery_attempts,
314                   last_delivery_error,
315                   created_ts_ms,
316                   (EXTRACT(EPOCH FROM last_attempt_at) * 1000)::BIGINT AS last_attempt_ts_ms,
317                   expires_at_ms
318            FROM directive_outbox
319            ORDER BY created_ts_ms DESC, outbox_seq DESC
320            LIMIT $1
321            OFFSET $2
322            "#,
323        )
324        .bind::<BigInt, _>(limit)
325        .bind::<BigInt, _>(offset);
326        let rows = diesel_async::RunQueryDsl::get_results::<Row>(query, &mut conn).await?;
327
328        rows.into_iter()
329            .map(|row| {
330                Ok(hypercall_db::DirectiveOutboxRecentRow {
331                    outbox_seq: row.outbox_seq,
332                    directive_id: row.directive_id,
333                    kind: row.kind,
334                    action_key: row.action_key,
335                    wallet_address: wallet_from_bytes(&row.wallet_address, "wallet")?,
336                    account_address: wallet_from_bytes(&row.account_address, "account")?,
337                    signer_address: wallet_from_bytes(&row.signer_address, "signer")?,
338                    directive_nonce: row.directive_nonce,
339                    domain_status: row.domain_status,
340                    delivery_status: row.delivery_status,
341                    submitter_address: row
342                        .submitter_address
343                        .map(|bytes| address_from_bytes(&bytes, "submitter"))
344                        .transpose()?,
345                    submitter_nonce: optional_nonnegative_i64_to_u64(
346                        row.submitter_nonce,
347                        "submitter_nonce",
348                    )?,
349                    tx_hash: row.tx_hash,
350                    delivery_attempts: row.delivery_attempts,
351                    last_delivery_error: row.last_delivery_error,
352                    created_ts_ms: row.created_ts_ms,
353                    last_attempt_ts_ms: row.last_attempt_ts_ms,
354                    expires_at_ms: row.expires_at_ms,
355                })
356            })
357            .collect()
358    }
359}
360
361#[async_trait]
362impl hypercall_db::AsyncDirectiveOutboxWriter for DieselDb {
363    async fn record_directive_submitter_submission(
364        &self,
365        request_id: &str,
366        submitter_address: &Address,
367        submitter_nonce: u64,
368    ) -> Result<()> {
369        let submitter_nonce = i64::try_from(submitter_nonce).map_err(|_| {
370            anyhow::anyhow!("submitter nonce {submitter_nonce} does not fit in BIGINT")
371        })?;
372        use crate::schema::directive_outbox::dsl as outbox_dsl;
373
374        let mut conn = self.get_conn().await?;
375        let query = diesel::update(
376            outbox_dsl::directive_outbox.filter(outbox_dsl::directive_id.eq(request_id)),
377        )
378        .set((
379            outbox_dsl::submitter_address.eq(Some(submitter_address.as_slice())),
380            outbox_dsl::submitter_nonce.eq(Some(submitter_nonce)),
381            outbox_dsl::updated_at.eq(diesel::dsl::now),
382        ));
383        let updated = diesel_async::RunQueryDsl::execute(query, &mut conn).await?;
384        anyhow::ensure!(
385            updated == 1,
386            "expected exactly one directive_outbox row for directive_id={request_id}, updated {updated}"
387        );
388        Ok(())
389    }
390
391    async fn persist_directive_transaction_update(
392        &self,
393        request_id: &str,
394        status: TransactionStatus,
395        tx_hash: Option<&str>,
396        error: Option<&str>,
397    ) -> Result<()> {
398        let (domain_status, delivery_status): (Option<&str>, Option<&str>) = match status {
399            TransactionStatus::Submitted => (None, Some("pending")),
400            TransactionStatus::Pending => (None, Some("pending")),
401            TransactionStatus::Confirmed => (Some("completed"), Some("finalized")),
402            TransactionStatus::Failed => (Some("failed"), Some("reverted")),
403            TransactionStatus::Expired => (Some("failed"), Some("expired")),
404        };
405
406        let mut conn = self.get_conn().await?;
407        if let Some(domain_status) = domain_status {
408            #[derive(diesel::QueryableByName)]
409            struct ActionKeyRow {
410                #[diesel(sql_type = DirectiveActionKeySql)]
411                action_key: DirectiveActionKeyDb,
412            }
413
414            let query = diesel::sql_query(
415                r#"
416                WITH updated_outbox AS (
417                    UPDATE directive_outbox
418                    SET domain_status = $2,
419                        delivery_status = $3,
420                        tx_hash = COALESCE($4, tx_hash),
421                        last_delivery_error = $5,
422                        updated_at = CURRENT_TIMESTAMP
423                    WHERE directive_id = $1
424                      AND delivery_status NOT IN ('finalized', 'reverted', 'expired', 'dead_lettered')
425                    RETURNING directive_id, action_key
426                )
427                SELECT action_key FROM updated_outbox
428                "#,
429            )
430            .bind::<Text, _>(request_id)
431            .bind::<Text, _>(domain_status)
432            .bind::<Text, _>(
433                delivery_status.expect("terminal update must have delivery_status"),
434            )
435            .bind::<Nullable<Text>, _>(tx_hash)
436            .bind::<Nullable<Text>, _>(error);
437
438            let updated_row: Option<ActionKeyRow> =
439                diesel_async::RunQueryDsl::get_result(query, &mut conn)
440                    .await
441                    .optional()?;
442
443            if matches!(
444                status,
445                TransactionStatus::Failed | TransactionStatus::Expired
446            ) {
447                if let Some(row) = updated_row {
448                    let action_key: hypercall_types::directives::ActionKey = row.action_key.into();
449                    if matches!(
450                        action_key,
451                        hypercall_types::directives::ActionKey::SystemWithdrawToken
452                            | hypercall_types::directives::ActionKey::SystemCreditOption
453                    ) {
454                        metrics::counter!(
455                            "ht_withdrawal_manual_reconciliation_required_total",
456                            "action_key" => action_key.as_str(),
457                        )
458                        .increment(1);
459                        tracing::warn!(
460                            directive_id = %request_id,
461                            action_key = ?action_key,
462                            "Withdrawal directive reached terminal delivery status; automatic refunds are disabled and operator reconciliation is required"
463                        );
464                    }
465                }
466            }
467        } else if let Some(delivery_status) = delivery_status {
468            let query = diesel::sql_query(
469                r#"
470                UPDATE directive_outbox
471                SET delivery_status = $2,
472                    tx_hash = COALESCE($3, tx_hash),
473                    last_delivery_error = $4,
474                    updated_at = CURRENT_TIMESTAMP
475                WHERE directive_id = $1
476                  AND delivery_status NOT IN ('finalized', 'reverted', 'expired', 'dead_lettered')
477                "#,
478            )
479            .bind::<Text, _>(request_id)
480            .bind::<Text, _>(delivery_status)
481            .bind::<Nullable<Text>, _>(tx_hash)
482            .bind::<Nullable<Text>, _>(error);
483            diesel_async::RunQueryDsl::execute(query, &mut conn).await?;
484        }
485        Ok(())
486    }
487}
488
489impl hypercall_db::DirectiveOutboxReader for DatabaseHandler {
490    fn claim_next_directive_outbox_item_sync(
491        &self,
492    ) -> Result<Option<hypercall_db::DirectiveOutboxRow>> {
493        #[derive(diesel::QueryableByName)]
494        struct Row {
495            #[diesel(sql_type = BigInt)]
496            outbox_seq: i64,
497            #[diesel(sql_type = Text)]
498            directive_id: String,
499            #[diesel(sql_type = Text)]
500            kind: String,
501            #[diesel(sql_type = DirectiveActionKeySql)]
502            action_key: DirectiveActionKeyDb,
503            #[diesel(sql_type = Binary)]
504            account_address: Vec<u8>,
505            #[diesel(sql_type = Binary)]
506            signer_address: Vec<u8>,
507            #[diesel(sql_type = BigInt)]
508            directive_nonce: i64,
509            #[diesel(sql_type = Binary)]
510            payload: Vec<u8>,
511            #[diesel(sql_type = Nullable<BigInt>)]
512            expires_at_ms: Option<i64>,
513        }
514
515        let mut conn = self.pool().get()?;
516        let row = diesel::sql_query(
517            r#"
518            UPDATE directive_outbox
519            SET delivery_attempts = delivery_attempts + 1,
520                delivery_status = 'broadcasted',
521                last_attempt_at = CURRENT_TIMESTAMP,
522                updated_at = CURRENT_TIMESTAMP
523            WHERE outbox_seq = (
524                SELECT outbox_seq
525                FROM directive_outbox
526                WHERE tx_hash IS NULL
527                  AND (
528                      (
529                          delivery_status = 'pending'
530                          AND (
531                              last_attempt_at IS NULL
532                              OR last_attempt_at <= CURRENT_TIMESTAMP - INTERVAL '30 seconds'
533                          )
534                      )
535                      OR (
536                          delivery_status = 'broadcasted'
537                          AND last_attempt_at <= CURRENT_TIMESTAMP - INTERVAL '5 minutes'
538                      )
539                  )
540                ORDER BY outbox_seq ASC
541                LIMIT 1
542                FOR UPDATE SKIP LOCKED
543            )
544            RETURNING outbox_seq, directive_id, kind, action_key, account_address,
545                      signer_address, directive_nonce, payload, expires_at_ms
546            "#,
547        )
548        .get_result::<Row>(&mut conn)
549        .optional()?;
550
551        row.map(|row| {
552            let nonce = u64::try_from(row.directive_nonce)
553                .map_err(|_| anyhow::anyhow!("negative directive nonce {}", row.directive_nonce))?;
554            Ok(hypercall_db::DirectiveOutboxRow {
555                outbox_seq: row.outbox_seq,
556                directive_id: row.directive_id,
557                kind: row.kind,
558                action_key: row.action_key.into(),
559                account: wallet_from_bytes(&row.account_address, "account")?,
560                signer: wallet_from_bytes(&row.signer_address, "signer")?,
561                nonce,
562                payload: row.payload,
563                expires_at_ms: optional_nonnegative_i64_to_u64(row.expires_at_ms, "expires_at_ms")?,
564            })
565        })
566        .transpose()
567    }
568
569    fn get_directive_status_sync(
570        &self,
571        directive_id: &str,
572    ) -> Result<Option<hypercall_db::DirectiveStatusRow>> {
573        #[derive(diesel::QueryableByName)]
574        struct StatusRow {
575            #[diesel(sql_type = Text)]
576            directive_id: String,
577            #[diesel(sql_type = Text)]
578            action_key: String,
579            #[diesel(sql_type = Text)]
580            domain_status: String,
581            #[diesel(sql_type = Text)]
582            delivery_status: String,
583            #[diesel(sql_type = Nullable<Binary>)]
584            submitter_address: Option<Vec<u8>>,
585            #[diesel(sql_type = Nullable<BigInt>)]
586            submitter_nonce: Option<i64>,
587            #[diesel(sql_type = Nullable<Text>)]
588            tx_hash: Option<String>,
589            #[diesel(sql_type = Nullable<BigInt>)]
590            created_ts_ms: Option<i64>,
591        }
592
593        let mut conn = self.pool().get()?;
594        let row = diesel::sql_query(
595            r#"
596            SELECT directive_id, action_key::text AS action_key, domain_status, delivery_status,
597                   submitter_address, submitter_nonce, tx_hash, created_ts_ms
598            FROM directive_outbox
599            WHERE directive_id = $1
600            "#,
601        )
602        .bind::<Text, _>(directive_id)
603        .get_result::<StatusRow>(&mut conn)
604        .optional()?;
605
606        row.map(|r| {
607            Ok(hypercall_db::DirectiveStatusRow {
608                directive_id: r.directive_id,
609                action_key: r.action_key,
610                domain_status: r.domain_status,
611                delivery_status: r.delivery_status,
612                submitter_address: r
613                    .submitter_address
614                    .map(|bytes| address_from_bytes(&bytes, "submitter"))
615                    .transpose()?,
616                submitter_nonce: optional_nonnegative_i64_to_u64(
617                    r.submitter_nonce,
618                    "submitter_nonce",
619                )?,
620                tx_hash: r.tx_hash,
621                created_ts_ms: r.created_ts_ms,
622            })
623        })
624        .transpose()
625    }
626
627    fn get_withdrawal_history_sync(
628        &self,
629        wallet: &WalletAddress,
630        limit: i64,
631    ) -> Result<Vec<hypercall_db::DirectiveStatusRow>> {
632        #[derive(diesel::QueryableByName)]
633        struct StatusRow {
634            #[diesel(sql_type = Text)]
635            directive_id: String,
636            #[diesel(sql_type = Text)]
637            action_key: String,
638            #[diesel(sql_type = Text)]
639            domain_status: String,
640            #[diesel(sql_type = Text)]
641            delivery_status: String,
642            #[diesel(sql_type = Nullable<Binary>)]
643            submitter_address: Option<Vec<u8>>,
644            #[diesel(sql_type = Nullable<BigInt>)]
645            submitter_nonce: Option<i64>,
646            #[diesel(sql_type = Nullable<Text>)]
647            tx_hash: Option<String>,
648            #[diesel(sql_type = Nullable<BigInt>)]
649            created_ts_ms: Option<i64>,
650        }
651
652        let mut conn = self.pool().get()?;
653        let limit = limit.max(1);
654        let rows = diesel::sql_query(
655            r#"
656            SELECT directive_id, action_key::text AS action_key, domain_status, delivery_status,
657                   submitter_address, submitter_nonce, tx_hash, created_ts_ms
658            FROM directive_outbox d
659            JOIN engine_commands ec ON ec.command_id = d.command_id
660            WHERE d.action_key::text IN ('system_withdraw_token', 'system_credit_option')
661              AND (d.wallet_address = $1 OR d.account_address = $1)
662              AND ec.command_type_enum IN ('CashWithdrawalUpdate', 'OptionWithdrawalUpdate')
663            ORDER BY d.created_ts_ms DESC
664            LIMIT $2
665            "#,
666        )
667        .bind::<Binary, _>(wallet.as_bytes())
668        .bind::<BigInt, _>(limit)
669        .get_results::<StatusRow>(&mut conn)?;
670
671        rows.into_iter()
672            .map(|r| {
673                Ok(hypercall_db::DirectiveStatusRow {
674                    directive_id: r.directive_id,
675                    action_key: r.action_key,
676                    domain_status: r.domain_status,
677                    delivery_status: r.delivery_status,
678                    submitter_address: r
679                        .submitter_address
680                        .map(|bytes| address_from_bytes(&bytes, "submitter"))
681                        .transpose()?,
682                    submitter_nonce: optional_nonnegative_i64_to_u64(
683                        r.submitter_nonce,
684                        "submitter_nonce",
685                    )?,
686                    tx_hash: r.tx_hash,
687                    created_ts_ms: r.created_ts_ms,
688                })
689            })
690            .collect()
691    }
692
693    fn list_directive_outbox_delivery_metrics_sync(
694        &self,
695    ) -> Result<Vec<hypercall_db::DirectiveOutboxDeliveryMetricsRow>> {
696        #[derive(diesel::QueryableByName)]
697        struct MetricsRow {
698            #[diesel(sql_type = Text)]
699            action_key: String,
700            #[diesel(sql_type = Text)]
701            delivery_status: String,
702            #[diesel(sql_type = BigInt)]
703            pending_count: i64,
704            #[diesel(sql_type = BigInt)]
705            retrying_count: i64,
706            #[diesel(sql_type = BigInt)]
707            oldest_created_age_seconds: i64,
708            #[diesel(sql_type = BigInt)]
709            oldest_attempt_age_seconds: i64,
710        }
711
712        let mut conn = self.pool().get()?;
713        let rows = diesel::sql_query(DIRECTIVE_OUTBOX_DELIVERY_METRICS_SQL)
714            .get_results::<MetricsRow>(&mut conn)?;
715
716        Ok(rows
717            .into_iter()
718            .map(|row| hypercall_db::DirectiveOutboxDeliveryMetricsRow {
719                action_key: row.action_key,
720                delivery_status: row.delivery_status,
721                pending_count: row.pending_count,
722                retrying_count: row.retrying_count,
723                oldest_created_age_seconds: row.oldest_created_age_seconds,
724                oldest_attempt_age_seconds: row.oldest_attempt_age_seconds,
725            })
726            .collect())
727    }
728
729    fn list_recent_directive_outbox_rows_sync(
730        &self,
731        limit: i64,
732        offset: i64,
733    ) -> Result<Vec<hypercall_db::DirectiveOutboxRecentRow>> {
734        #[derive(diesel::QueryableByName)]
735        struct Row {
736            #[diesel(sql_type = BigInt)]
737            outbox_seq: i64,
738            #[diesel(sql_type = Text)]
739            directive_id: String,
740            #[diesel(sql_type = Text)]
741            kind: String,
742            #[diesel(sql_type = Text)]
743            action_key: String,
744            #[diesel(sql_type = Binary)]
745            wallet_address: Vec<u8>,
746            #[diesel(sql_type = Binary)]
747            account_address: Vec<u8>,
748            #[diesel(sql_type = Binary)]
749            signer_address: Vec<u8>,
750            #[diesel(sql_type = BigInt)]
751            directive_nonce: i64,
752            #[diesel(sql_type = Text)]
753            domain_status: String,
754            #[diesel(sql_type = Text)]
755            delivery_status: String,
756            #[diesel(sql_type = Nullable<Binary>)]
757            submitter_address: Option<Vec<u8>>,
758            #[diesel(sql_type = Nullable<BigInt>)]
759            submitter_nonce: Option<i64>,
760            #[diesel(sql_type = Nullable<Text>)]
761            tx_hash: Option<String>,
762            #[diesel(sql_type = BigInt)]
763            delivery_attempts: i64,
764            #[diesel(sql_type = Nullable<Text>)]
765            last_delivery_error: Option<String>,
766            #[diesel(sql_type = BigInt)]
767            created_ts_ms: i64,
768            #[diesel(sql_type = Nullable<BigInt>)]
769            last_attempt_ts_ms: Option<i64>,
770            #[diesel(sql_type = Nullable<BigInt>)]
771            expires_at_ms: Option<i64>,
772        }
773
774        let mut conn = self.pool().get()?;
775        let rows = diesel::sql_query(
776            r#"
777            SELECT outbox_seq,
778                   directive_id,
779                   kind,
780                   action_key::text AS action_key,
781                   wallet_address,
782                   account_address,
783                   signer_address,
784                   directive_nonce,
785                   domain_status,
786                   delivery_status,
787                   submitter_address,
788                   submitter_nonce,
789                   tx_hash,
790                   delivery_attempts::BIGINT AS delivery_attempts,
791                   last_delivery_error,
792                   created_ts_ms,
793                   (EXTRACT(EPOCH FROM last_attempt_at) * 1000)::BIGINT AS last_attempt_ts_ms,
794                   expires_at_ms
795            FROM directive_outbox
796            ORDER BY created_ts_ms DESC, outbox_seq DESC
797            LIMIT $1
798            OFFSET $2
799            "#,
800        )
801        .bind::<BigInt, _>(limit.clamp(1, 500))
802        .bind::<BigInt, _>(offset.max(0))
803        .get_results::<Row>(&mut conn)?;
804
805        rows.into_iter()
806            .map(|row| {
807                Ok(hypercall_db::DirectiveOutboxRecentRow {
808                    outbox_seq: row.outbox_seq,
809                    directive_id: row.directive_id,
810                    kind: row.kind,
811                    action_key: row.action_key,
812                    wallet_address: wallet_from_bytes(&row.wallet_address, "wallet")?,
813                    account_address: wallet_from_bytes(&row.account_address, "account")?,
814                    signer_address: wallet_from_bytes(&row.signer_address, "signer")?,
815                    directive_nonce: row.directive_nonce,
816                    domain_status: row.domain_status,
817                    delivery_status: row.delivery_status,
818                    submitter_address: row
819                        .submitter_address
820                        .map(|bytes| address_from_bytes(&bytes, "submitter"))
821                        .transpose()?,
822                    submitter_nonce: optional_nonnegative_i64_to_u64(
823                        row.submitter_nonce,
824                        "submitter_nonce",
825                    )?,
826                    tx_hash: row.tx_hash,
827                    delivery_attempts: row.delivery_attempts,
828                    last_delivery_error: row.last_delivery_error,
829                    created_ts_ms: row.created_ts_ms,
830                    last_attempt_ts_ms: row.last_attempt_ts_ms,
831                    expires_at_ms: row.expires_at_ms,
832                })
833            })
834            .collect()
835    }
836}
837
838#[cfg(test)]
839mod tests {
840    use super::*;
841    use crate::test_helpers::TestDb;
842    use alloy::primitives::Address;
843    use diesel::sql_types::{BigInt, Binary, Nullable, Numeric, Text};
844    use diesel::RunQueryDsl;
845    use hypercall_db::{
846        AsyncDirectiveOutboxReader, AsyncDirectiveOutboxWriter, DirectiveOutboxReader,
847        DirectiveOutboxWriter,
848    };
849    use rust_decimal::Decimal;
850
851    fn wallet(byte: u8) -> WalletAddress {
852        WalletAddress::from(Address::repeat_byte(byte))
853    }
854
855    fn address(byte: u8) -> Address {
856        Address::repeat_byte(byte)
857    }
858
859    #[test]
860    fn negative_directive_expiry_is_rejected() {
861        let error = optional_nonnegative_i64_to_u64(Some(-1), "expires_at_ms")
862            .expect_err("negative expiry must not wrap to u64");
863        assert!(error
864            .to_string()
865            .contains("negative directive expires_at_ms -1"));
866        assert_eq!(
867            optional_nonnegative_i64_to_u64(Some(42), "expires_at_ms").unwrap(),
868            Some(42)
869        );
870        assert_eq!(
871            optional_nonnegative_i64_to_u64(None, "expires_at_ms").unwrap(),
872            None
873        );
874    }
875
876    fn insert_withdrawal_outbox_row(
877        handler: &DatabaseHandler,
878        directive_id: &str,
879        wallet_address: WalletAddress,
880        account_address: WalletAddress,
881        created_ts_ms: i64,
882    ) {
883        #[derive(diesel::QueryableByName)]
884        struct CommandIdRow {
885            #[diesel(sql_type = BigInt)]
886            command_id: i64,
887        }
888
889        let mut conn = handler.pool().get().expect("db connection");
890        let request_uuid = uuid::Uuid::new_v4().to_string();
891        let command_id = diesel::sql_query(
892            r#"
893            INSERT INTO engine_commands (
894                received_ts_ms, request_uuid, command_type_enum, command_data
895            )
896            VALUES ($1, $2::uuid, 'CashWithdrawalUpdate', $3)
897            RETURNING command_id
898            "#,
899        )
900        .bind::<BigInt, _>(created_ts_ms)
901        .bind::<Text, _>(request_uuid)
902        .bind::<Binary, _>(b"test-command".as_slice())
903        .get_result::<CommandIdRow>(&mut conn)
904        .expect("insert engine command")
905        .command_id;
906
907        diesel::sql_query(
908            r#"
909            INSERT INTO directive_outbox (
910                directive_id, command_id, kind, action_key, wallet_address,
911                account_address, signer_address, directive_nonce, idempotency_key,
912                payload_hash, payload, domain_status, delivery_status, created_ts_ms
913            )
914            VALUES (
915                $1, $2, 'needs_rsm_signature', 'system_withdraw_token',
916                $3, $4, $5, $6, $7, $8, $9, 'pending_chain_effect',
917                'pending', $10
918            )
919            "#,
920        )
921        .bind::<Text, _>(directive_id)
922        .bind::<BigInt, _>(command_id)
923        .bind::<Binary, _>(wallet_address.as_bytes())
924        .bind::<Binary, _>(account_address.as_bytes())
925        .bind::<Binary, _>(wallet(9).as_bytes())
926        .bind::<BigInt, _>(created_ts_ms)
927        .bind::<Text, _>(format!("idempotency-{directive_id}"))
928        .bind::<Binary, _>(&[0_u8; 32][..])
929        .bind::<Binary, _>(b"payload".as_slice())
930        .bind::<BigInt, _>(created_ts_ms)
931        .execute(&mut conn)
932        .expect("insert directive outbox row");
933    }
934
935    #[tokio::test]
936    async fn withdrawal_history_filters_wallet_before_limit() {
937        let db = TestDb::new().await.expect("test db");
938        let target_wallet = wallet(1);
939        let target_account = wallet(2);
940        let other_wallet = wallet(3);
941
942        insert_withdrawal_outbox_row(&db.handler, "other-newest", other_wallet, wallet(4), 300);
943        insert_withdrawal_outbox_row(&db.handler, "other-next", other_wallet, wallet(5), 200);
944        insert_withdrawal_outbox_row(
945            &db.handler,
946            "target-oldest",
947            target_wallet,
948            target_account,
949            100,
950        );
951
952        let rows = db
953            .handler
954            .get_withdrawal_history_sync(&target_wallet, 1)
955            .expect("withdrawal history");
956
957        assert_eq!(rows.len(), 1);
958        assert_eq!(rows[0].directive_id, "target-oldest");
959    }
960
961    #[tokio::test]
962    async fn directive_outbox_delivery_metrics_include_retrying_withdrawals() {
963        let db = TestDb::new().await.expect("test db");
964        let now_ms = std::time::SystemTime::now()
965            .duration_since(std::time::UNIX_EPOCH)
966            .expect("system clock after epoch")
967            .as_millis() as i64;
968        let created_ts_ms = now_ms - 120_000;
969
970        insert_withdrawal_outbox_row(
971            &db.handler,
972            "retrying-withdrawal",
973            wallet(1),
974            wallet(2),
975            created_ts_ms,
976        );
977
978        {
979            let mut conn = db.handler.pool().get().expect("db connection");
980            diesel::sql_query(
981                r#"
982                UPDATE directive_outbox
983                SET delivery_attempts = 3,
984                    last_delivery_error = 'test delivery failure',
985                    last_attempt_at = CURRENT_TIMESTAMP - INTERVAL '90 seconds'
986                WHERE directive_id = 'retrying-withdrawal'
987                "#,
988            )
989            .execute(&mut conn)
990            .expect("mark directive retrying");
991        }
992
993        let rows = db
994            .handler
995            .list_directive_outbox_delivery_metrics_sync()
996            .expect("directive outbox metrics");
997        let row = rows
998            .iter()
999            .find(|row| {
1000                row.action_key == "system_withdraw_token" && row.delivery_status == "pending"
1001            })
1002            .expect("pending withdrawal metrics row");
1003
1004        assert_eq!(row.pending_count, 1);
1005        assert_eq!(row.retrying_count, 1);
1006        assert!(row.oldest_created_age_seconds >= 100);
1007        assert!(row.oldest_attempt_age_seconds >= 80);
1008    }
1009
1010    #[tokio::test]
1011    async fn directive_outbox_records_submitter_pointer_without_attempts() {
1012        let db = TestDb::new().await.expect("test db");
1013        let directive_id = "submitter-pointer";
1014        let submitter_address = address(9);
1015        let submitter_nonce = 42;
1016
1017        insert_withdrawal_outbox_row(&db.handler, directive_id, wallet(1), wallet(2), 100);
1018        db.handler
1019            .record_directive_submitter_submission_sync(
1020                directive_id,
1021                &submitter_address,
1022                submitter_nonce,
1023            )
1024            .expect("record submitter pointer");
1025
1026        let row = db
1027            .handler
1028            .get_directive_status_sync(directive_id)
1029            .expect("directive status")
1030            .expect("directive row");
1031
1032        assert_eq!(row.submitter_address, Some(submitter_address));
1033        assert_eq!(row.submitter_nonce, Some(submitter_nonce));
1034    }
1035
1036    #[tokio::test]
1037    async fn directive_outbox_submitter_pointer_errors_for_missing_directive() {
1038        let db = TestDb::new().await.expect("test db");
1039        let submitter_address = address(9);
1040
1041        let err = db
1042            .handler
1043            .record_directive_submitter_submission_sync("missing-directive", &submitter_address, 42)
1044            .expect_err("missing directive pointer update must fail");
1045
1046        assert!(
1047            err.to_string()
1048                .contains("expected exactly one directive_outbox row"),
1049            "{err}"
1050        );
1051    }
1052
1053    #[tokio::test]
1054    async fn async_directive_outbox_records_submitter_pointer_without_attempts() {
1055        let db = TestDb::new().await.expect("test db");
1056        let diesel_db = db.diesel_db().await;
1057        let directive_id = "async-submitter-pointer";
1058        let submitter_address = address(8);
1059        let submitter_nonce = 43;
1060
1061        insert_withdrawal_outbox_row(&db.handler, directive_id, wallet(1), wallet(2), 100);
1062        diesel_db
1063            .record_directive_submitter_submission(
1064                directive_id,
1065                &submitter_address,
1066                submitter_nonce,
1067            )
1068            .await
1069            .expect("record submitter pointer");
1070
1071        let row = diesel_db
1072            .get_directive_status(directive_id)
1073            .await
1074            .expect("directive status")
1075            .expect("directive row");
1076
1077        assert_eq!(row.submitter_address, Some(submitter_address));
1078        assert_eq!(row.submitter_nonce, Some(submitter_nonce));
1079    }
1080
1081    #[tokio::test]
1082    async fn async_directive_outbox_submitter_pointer_errors_for_missing_directive() {
1083        let db = TestDb::new().await.expect("test db");
1084        let diesel_db = db.diesel_db().await;
1085        let submitter_address = address(8);
1086
1087        let err = diesel_db
1088            .record_directive_submitter_submission(
1089                "missing-async-directive",
1090                &submitter_address,
1091                43,
1092            )
1093            .await
1094            .expect_err("missing directive pointer update must fail");
1095
1096        assert!(
1097            err.to_string()
1098                .contains("expected exactly one directive_outbox row"),
1099            "{err}"
1100        );
1101    }
1102
1103    #[tokio::test]
1104    async fn async_persist_directive_transaction_update_confirms_delivery() {
1105        let db = TestDb::new().await.expect("test db");
1106        let diesel_db = db.diesel_db().await;
1107        let directive_id = "async-confirmed-delivery";
1108        let tx_hash = "0xffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff";
1109
1110        insert_withdrawal_outbox_row(&db.handler, directive_id, wallet(1), wallet(2), 100);
1111        diesel_db
1112            .persist_directive_transaction_update(
1113                directive_id,
1114                TransactionStatus::Confirmed,
1115                Some(tx_hash),
1116                None,
1117            )
1118            .await
1119            .expect("persist confirmed delivery");
1120
1121        let row = diesel_db
1122            .get_directive_status(directive_id)
1123            .await
1124            .expect("directive status")
1125            .expect("directive row");
1126
1127        assert_eq!(row.domain_status, "completed");
1128        assert_eq!(row.delivery_status, "finalized");
1129        assert_eq!(row.tx_hash.as_deref(), Some(tx_hash));
1130    }
1131
1132    #[tokio::test]
1133    async fn recent_directive_outbox_rows_are_ordered_paginated_and_decoded_sync() {
1134        let db = TestDb::new().await.expect("test db");
1135
1136        insert_withdrawal_outbox_row(&db.handler, "oldest", wallet(1), wallet(2), 100);
1137        insert_withdrawal_outbox_row(&db.handler, "middle", wallet(3), wallet(4), 200);
1138        insert_withdrawal_outbox_row(&db.handler, "newest", wallet(5), wallet(6), 300);
1139
1140        {
1141            let mut conn = db.handler.pool().get().expect("db connection");
1142            diesel::sql_query(
1143                r#"
1144                UPDATE directive_outbox
1145                SET delivery_attempts = 7,
1146                    last_delivery_error = 'rpc timeout',
1147                    last_attempt_at = to_timestamp(1),
1148                    expires_at_ms = 12345,
1149                    tx_hash = '0xabc'
1150                WHERE directive_id = 'newest'
1151                "#,
1152            )
1153            .execute(&mut conn)
1154            .expect("mark newest directive delivery metadata");
1155        }
1156
1157        let rows = db
1158            .handler
1159            .list_recent_directive_outbox_rows_sync(2, 0)
1160            .expect("recent directive outbox rows");
1161
1162        assert_eq!(
1163            rows.iter()
1164                .map(|row| row.directive_id.as_str())
1165                .collect::<Vec<_>>(),
1166            vec!["newest", "middle"]
1167        );
1168        assert_eq!(rows[0].wallet_address, wallet(5));
1169        assert_eq!(rows[0].account_address, wallet(6));
1170        assert_eq!(rows[0].signer_address, wallet(9));
1171        assert_eq!(rows[0].delivery_attempts, 7);
1172        assert_eq!(rows[0].last_delivery_error.as_deref(), Some("rpc timeout"));
1173        assert_eq!(rows[0].last_attempt_ts_ms, Some(1000));
1174        assert_eq!(rows[0].expires_at_ms, Some(12345));
1175        assert_eq!(rows[0].tx_hash.as_deref(), Some("0xabc"));
1176
1177        let offset_rows = db
1178            .handler
1179            .list_recent_directive_outbox_rows_sync(1, 1)
1180            .expect("offset directive outbox rows");
1181        assert_eq!(offset_rows[0].directive_id, "middle");
1182    }
1183
1184    #[tokio::test]
1185    async fn recent_directive_outbox_rows_are_ordered_paginated_and_decoded_async() {
1186        let db = TestDb::new().await.expect("test db");
1187        let diesel_db = db.diesel_db().await;
1188
1189        insert_withdrawal_outbox_row(&db.handler, "oldest", wallet(1), wallet(2), 100);
1190        insert_withdrawal_outbox_row(&db.handler, "middle", wallet(3), wallet(4), 200);
1191        insert_withdrawal_outbox_row(&db.handler, "newest", wallet(5), wallet(6), 300);
1192
1193        let rows = diesel_db
1194            .list_recent_directive_outbox_rows(2, 1)
1195            .await
1196            .expect("recent directive outbox rows");
1197
1198        assert_eq!(
1199            rows.iter()
1200                .map(|row| row.directive_id.as_str())
1201                .collect::<Vec<_>>(),
1202            vec!["middle", "oldest"]
1203        );
1204        assert_eq!(rows[0].wallet_address, wallet(3));
1205        assert_eq!(rows[0].account_address, wallet(4));
1206        assert_eq!(rows[0].delivery_attempts, 0);
1207    }
1208
1209    #[tokio::test]
1210    async fn expired_withdrawal_update_requires_manual_reconciliation_without_refund() {
1211        #[derive(diesel::QueryableByName)]
1212        struct StatusRow {
1213            #[diesel(sql_type = Text)]
1214            domain_status: String,
1215            #[diesel(sql_type = Text)]
1216            delivery_status: String,
1217        }
1218
1219        #[derive(diesel::QueryableByName)]
1220        struct CashLedgerRow {
1221            #[diesel(sql_type = Text)]
1222            status: String,
1223            #[diesel(sql_type = Nullable<Text>)]
1224            error: Option<String>,
1225        }
1226
1227        #[derive(diesel::QueryableByName)]
1228        struct CountRow {
1229            #[diesel(sql_type = BigInt)]
1230            count: i64,
1231        }
1232
1233        let db = TestDb::new().await.expect("test db");
1234        let target_wallet = wallet(1);
1235        let target_account = wallet(2);
1236        let directive_id = "expired-withdrawal";
1237        let error = "directive expired before delivery";
1238
1239        insert_withdrawal_outbox_row(
1240            &db.handler,
1241            directive_id,
1242            target_wallet,
1243            target_account,
1244            100,
1245        );
1246
1247        {
1248            let mut conn = db.handler.pool().get().expect("db connection");
1249            diesel::sql_query(
1250                r#"
1251                INSERT INTO hypercore_cash_ledger_events (
1252                    wallet, event_hash, event_time_ms, delta_type, amount_usdc, status
1253                )
1254                VALUES ($1, $2, $3, 'withdraw', $4, 'submitted')
1255                "#,
1256            )
1257            .bind::<Binary, _>(target_wallet.as_bytes())
1258            .bind::<Text, _>(directive_id)
1259            .bind::<BigInt, _>(100_i64)
1260            .bind::<Numeric, _>(Decimal::new(1250, 2))
1261            .execute(&mut conn)
1262            .expect("insert cash withdrawal ledger event");
1263        }
1264
1265        db.handler
1266            .persist_directive_transaction_update_sync(
1267                directive_id,
1268                TransactionStatus::Expired,
1269                None,
1270                Some(error),
1271            )
1272            .expect("persist expired update");
1273
1274        let mut conn = db.handler.pool().get().expect("db connection");
1275        let status = diesel::sql_query(
1276            "SELECT domain_status, delivery_status FROM directive_outbox WHERE directive_id = $1",
1277        )
1278        .bind::<Text, _>(directive_id)
1279        .get_result::<StatusRow>(&mut conn)
1280        .expect("directive status");
1281        assert_eq!(status.domain_status, "failed");
1282        assert_eq!(status.delivery_status, "expired");
1283
1284        let cash_ledger = diesel::sql_query(
1285            "SELECT status, error FROM hypercore_cash_ledger_events WHERE event_hash = $1",
1286        )
1287        .bind::<Text, _>(directive_id)
1288        .get_result::<CashLedgerRow>(&mut conn)
1289        .expect("cash ledger row");
1290        assert_eq!(cash_ledger.status, "submitted");
1291        assert_eq!(cash_ledger.error, None);
1292
1293        let wallet_balance_rows = diesel::sql_query(
1294            "SELECT COUNT(*)::bigint AS count FROM account_balances WHERE account_address = $1",
1295        )
1296        .bind::<Binary, _>(target_wallet.as_bytes())
1297        .get_result::<CountRow>(&mut conn)
1298        .expect("wallet account balance count");
1299        assert_eq!(wallet_balance_rows.count, 0);
1300
1301        let account_balance_rows = diesel::sql_query(
1302            "SELECT COUNT(*)::bigint AS count FROM account_balances WHERE account_address = $1",
1303        )
1304        .bind::<Binary, _>(target_account.as_bytes())
1305        .get_result::<CountRow>(&mut conn)
1306        .expect("account account balance count");
1307        assert_eq!(account_balance_rows.count, 0);
1308
1309        let ledger_events = diesel::sql_query(
1310            "SELECT COUNT(*)::bigint AS count FROM ledger_events WHERE wallet = $1 AND event_type = 'withdraw_failed' AND reference_symbol = $2",
1311        )
1312        .bind::<Binary, _>(target_wallet.as_bytes())
1313        .bind::<Text, _>(directive_id)
1314        .get_result::<CountRow>(&mut conn)
1315        .expect("withdraw_failed ledger event count");
1316        assert_eq!(ledger_events.count, 0);
1317    }
1318
1319    #[tokio::test]
1320    async fn manual_reconciliation_status_does_not_refund_submitted_withdrawal() {
1321        #[derive(diesel::QueryableByName)]
1322        struct StatusRow {
1323            #[diesel(sql_type = Text)]
1324            domain_status: String,
1325            #[diesel(sql_type = Text)]
1326            delivery_status: String,
1327            #[diesel(sql_type = Nullable<Text>)]
1328            last_delivery_error: Option<String>,
1329        }
1330
1331        #[derive(diesel::QueryableByName)]
1332        struct CashLedgerRow {
1333            #[diesel(sql_type = Text)]
1334            status: String,
1335        }
1336
1337        #[derive(diesel::QueryableByName)]
1338        struct CountRow {
1339            #[diesel(sql_type = BigInt)]
1340            count: i64,
1341        }
1342
1343        #[derive(diesel::QueryableByName)]
1344        struct OutboxSeqRow {
1345            #[diesel(sql_type = BigInt)]
1346            outbox_seq: i64,
1347        }
1348
1349        let db = TestDb::new().await.expect("test db");
1350        let target_wallet = wallet(1);
1351        let target_account = wallet(2);
1352        let directive_id = "manual-reconciliation-withdrawal";
1353        let error = "RSM nonce 7 is already used before directive submission; on-chain outcome is unknown and requires manual reconciliation";
1354
1355        insert_withdrawal_outbox_row(
1356            &db.handler,
1357            directive_id,
1358            target_wallet,
1359            target_account,
1360            100,
1361        );
1362
1363        {
1364            let mut conn = db.handler.pool().get().expect("db connection");
1365            diesel::sql_query(
1366                r#"
1367                INSERT INTO hypercore_cash_ledger_events (
1368                    wallet, event_hash, event_time_ms, delta_type, amount_usdc, status
1369                )
1370                VALUES ($1, $2, $3, 'withdraw', $4, 'submitted')
1371                "#,
1372            )
1373            .bind::<Binary, _>(target_wallet.as_bytes())
1374            .bind::<Text, _>(directive_id)
1375            .bind::<BigInt, _>(100_i64)
1376            .bind::<Numeric, _>(Decimal::new(1250, 2))
1377            .execute(&mut conn)
1378            .expect("insert cash withdrawal ledger event");
1379        }
1380
1381        let outbox_seq = {
1382            let mut conn = db.handler.pool().get().expect("db connection");
1383            diesel::sql_query("SELECT outbox_seq FROM directive_outbox WHERE directive_id = $1")
1384                .bind::<Text, _>(directive_id)
1385                .get_result::<OutboxSeqRow>(&mut conn)
1386                .expect("outbox seq")
1387                .outbox_seq
1388        };
1389
1390        db.handler
1391            .mark_directive_outbox_manual_reconciliation_sync(outbox_seq, error)
1392            .expect("mark manual reconciliation");
1393
1394        let mut conn = db.handler.pool().get().expect("db connection");
1395        let status = diesel::sql_query(
1396            "SELECT domain_status, delivery_status, last_delivery_error FROM directive_outbox WHERE directive_id = $1",
1397        )
1398        .bind::<Text, _>(directive_id)
1399        .get_result::<StatusRow>(&mut conn)
1400        .expect("directive status");
1401        assert_eq!(status.domain_status, "pending_chain_effect");
1402        assert_eq!(status.delivery_status, "dead_lettered");
1403        assert_eq!(status.last_delivery_error.as_deref(), Some(error));
1404
1405        let cash_ledger = diesel::sql_query(
1406            "SELECT status FROM hypercore_cash_ledger_events WHERE event_hash = $1",
1407        )
1408        .bind::<Text, _>(directive_id)
1409        .get_result::<CashLedgerRow>(&mut conn)
1410        .expect("cash ledger row");
1411        assert_eq!(cash_ledger.status, "submitted");
1412
1413        let balance_rows = diesel::sql_query(
1414            "SELECT COUNT(*)::bigint AS count FROM account_balances WHERE account_address = $1",
1415        )
1416        .bind::<Binary, _>(target_wallet.as_bytes())
1417        .get_result::<CountRow>(&mut conn)
1418        .expect("account balance count");
1419        assert_eq!(balance_rows.count, 0);
1420
1421        let ledger_events = diesel::sql_query(
1422            "SELECT COUNT(*)::bigint AS count FROM ledger_events WHERE wallet = $1 AND event_type = 'withdraw_failed' AND reference_symbol = $2",
1423        )
1424        .bind::<Binary, _>(target_wallet.as_bytes())
1425        .bind::<Text, _>(directive_id)
1426        .get_result::<CountRow>(&mut conn)
1427        .expect("withdraw_failed ledger event count");
1428        assert_eq!(ledger_events.count, 0);
1429    }
1430
1431    #[tokio::test]
1432    async fn failed_status_requires_manual_reconciliation_without_refund() {
1433        #[derive(diesel::QueryableByName)]
1434        struct CashLedgerRow {
1435            #[diesel(sql_type = Text)]
1436            status: String,
1437        }
1438
1439        #[derive(diesel::QueryableByName)]
1440        struct CountRow {
1441            #[diesel(sql_type = BigInt)]
1442            count: i64,
1443        }
1444
1445        let db = TestDb::new().await.expect("test db");
1446        let target_wallet = wallet(1);
1447        let target_account = wallet(2);
1448        let directive_id = "failed-manual-reconciliation-withdrawal";
1449
1450        insert_withdrawal_outbox_row(
1451            &db.handler,
1452            directive_id,
1453            target_wallet,
1454            target_account,
1455            100,
1456        );
1457
1458        {
1459            let mut conn = db.handler.pool().get().expect("db connection");
1460            diesel::sql_query(
1461                r#"
1462                INSERT INTO hypercore_cash_ledger_events (
1463                    wallet, event_hash, event_time_ms, delta_type, amount_usdc, status
1464                )
1465                VALUES ($1, $2, $3, 'withdraw', $4, 'submitted')
1466                "#,
1467            )
1468            .bind::<Binary, _>(target_wallet.as_bytes())
1469            .bind::<Text, _>(directive_id)
1470            .bind::<BigInt, _>(100_i64)
1471            .bind::<Numeric, _>(Decimal::new(1250, 2))
1472            .execute(&mut conn)
1473            .expect("insert cash withdrawal ledger event");
1474        }
1475
1476        db.handler
1477            .persist_directive_transaction_update_sync(
1478                directive_id,
1479                TransactionStatus::Failed,
1480                None,
1481                Some("failed without proven no-chain-effect"),
1482            )
1483            .expect("persist failed update");
1484
1485        let mut conn = db.handler.pool().get().expect("db connection");
1486        let cash_ledger = diesel::sql_query(
1487            "SELECT status FROM hypercore_cash_ledger_events WHERE event_hash = $1",
1488        )
1489        .bind::<Text, _>(directive_id)
1490        .get_result::<CashLedgerRow>(&mut conn)
1491        .expect("cash ledger row");
1492        assert_eq!(cash_ledger.status, "submitted");
1493
1494        let balance_rows = diesel::sql_query(
1495            "SELECT COUNT(*)::bigint AS count FROM account_balances WHERE account_address = $1",
1496        )
1497        .bind::<Binary, _>(target_wallet.as_bytes())
1498        .get_result::<CountRow>(&mut conn)
1499        .expect("account balance count");
1500        assert_eq!(balance_rows.count, 0);
1501
1502        let ledger_events = diesel::sql_query(
1503            "SELECT COUNT(*)::bigint AS count FROM ledger_events WHERE wallet = $1 AND event_type = 'withdraw_failed' AND reference_symbol = $2",
1504        )
1505        .bind::<Binary, _>(target_wallet.as_bytes())
1506        .bind::<Text, _>(directive_id)
1507        .get_result::<CountRow>(&mut conn)
1508        .expect("withdraw_failed ledger event count");
1509        assert_eq!(ledger_events.count, 0);
1510    }
1511}
1512
1513impl hypercall_db::DirectiveOutboxWriter for DatabaseHandler {
1514    fn mark_directive_outbox_delivery_failed_sync(
1515        &self,
1516        outbox_seq: i64,
1517        error: &str,
1518    ) -> Result<()> {
1519        let mut conn = self.pool().get()?;
1520        diesel::sql_query(
1521            r#"
1522            UPDATE directive_outbox
1523            SET delivery_status = 'pending',
1524                last_delivery_error = $2,
1525                updated_at = CURRENT_TIMESTAMP
1526            WHERE outbox_seq = $1
1527              AND delivery_status NOT IN ('finalized', 'reverted', 'expired', 'dead_lettered')
1528            "#,
1529        )
1530        .bind::<BigInt, _>(outbox_seq)
1531        .bind::<Text, _>(error)
1532        .execute(&mut conn)?;
1533        metrics::counter!("ht_directive_outbox_delivery_failures_total").increment(1);
1534        Ok(())
1535    }
1536
1537    fn mark_directive_outbox_dead_lettered_sync(&self, outbox_seq: i64, error: &str) -> Result<()> {
1538        let mut conn = self.pool().get()?;
1539        diesel::sql_query(
1540            r#"
1541            UPDATE directive_outbox
1542            SET delivery_status = 'dead_lettered',
1543                domain_status = 'failed',
1544                last_delivery_error = $2,
1545                updated_at = CURRENT_TIMESTAMP
1546            WHERE outbox_seq = $1
1547              AND delivery_status NOT IN ('finalized', 'reverted', 'expired', 'dead_lettered')
1548            "#,
1549        )
1550        .bind::<BigInt, _>(outbox_seq)
1551        .bind::<Text, _>(error)
1552        .execute(&mut conn)?;
1553        Ok(())
1554    }
1555
1556    fn mark_directive_outbox_manual_reconciliation_sync(
1557        &self,
1558        outbox_seq: i64,
1559        error: &str,
1560    ) -> Result<()> {
1561        let mut conn = self.pool().get()?;
1562        diesel::sql_query(
1563            r#"
1564            UPDATE directive_outbox
1565            SET delivery_status = 'dead_lettered',
1566                last_delivery_error = $2,
1567                updated_at = CURRENT_TIMESTAMP
1568            WHERE outbox_seq = $1
1569              AND delivery_status NOT IN ('finalized', 'reverted', 'expired', 'dead_lettered')
1570            "#,
1571        )
1572        .bind::<BigInt, _>(outbox_seq)
1573        .bind::<Text, _>(error)
1574        .execute(&mut conn)?;
1575        Ok(())
1576    }
1577
1578    fn record_directive_submitter_submission_sync(
1579        &self,
1580        request_id: &str,
1581        submitter_address: &Address,
1582        submitter_nonce: u64,
1583    ) -> Result<()> {
1584        let submitter_nonce = i64::try_from(submitter_nonce).map_err(|_| {
1585            anyhow::anyhow!("submitter nonce {submitter_nonce} does not fit in BIGINT")
1586        })?;
1587        use crate::schema::directive_outbox::dsl as outbox_dsl;
1588
1589        let mut conn = self.pool().get()?;
1590        let updated = diesel::update(
1591            outbox_dsl::directive_outbox.filter(outbox_dsl::directive_id.eq(request_id)),
1592        )
1593        .set((
1594            outbox_dsl::submitter_address.eq(Some(submitter_address.as_slice())),
1595            outbox_dsl::submitter_nonce.eq(Some(submitter_nonce)),
1596            outbox_dsl::updated_at.eq(diesel::dsl::now),
1597        ))
1598        .execute(&mut conn)?;
1599        anyhow::ensure!(
1600            updated == 1,
1601            "expected exactly one directive_outbox row for directive_id={request_id}, updated {updated}"
1602        );
1603        Ok(())
1604    }
1605
1606    fn persist_directive_transaction_update_sync(
1607        &self,
1608        request_id: &str,
1609        status: TransactionStatus,
1610        tx_hash: Option<&str>,
1611        error: Option<&str>,
1612    ) -> Result<()> {
1613        let (domain_status, delivery_status): (Option<&str>, Option<&str>) = match status {
1614            TransactionStatus::Submitted => (None, Some("pending")),
1615            TransactionStatus::Pending => (None, Some("pending")),
1616            TransactionStatus::Confirmed => (Some("completed"), Some("finalized")),
1617            TransactionStatus::Failed => (Some("failed"), Some("reverted")),
1618            TransactionStatus::Expired => (Some("failed"), Some("expired")),
1619        };
1620
1621        let mut conn = self.pool().get()?;
1622        if let Some(domain_status) = domain_status {
1623            #[derive(diesel::QueryableByName)]
1624            struct ActionKeyRow {
1625                #[diesel(sql_type = DirectiveActionKeySql)]
1626                action_key: DirectiveActionKeyDb,
1627            }
1628
1629            let updated_row: Option<ActionKeyRow> = diesel::sql_query(
1630                r#"
1631                WITH updated_outbox AS (
1632                    UPDATE directive_outbox
1633                    SET domain_status = $2,
1634                        delivery_status = $3,
1635                        tx_hash = COALESCE($4, tx_hash),
1636                        last_delivery_error = $5,
1637                        updated_at = CURRENT_TIMESTAMP
1638                    WHERE directive_id = $1
1639                      AND delivery_status NOT IN ('finalized', 'reverted', 'expired', 'dead_lettered')
1640                    RETURNING directive_id, action_key
1641                )
1642                SELECT action_key FROM updated_outbox
1643                "#,
1644            )
1645            .bind::<Text, _>(request_id)
1646            .bind::<Text, _>(domain_status)
1647            .bind::<Text, _>(
1648                delivery_status.expect("terminal update must have delivery_status"),
1649            )
1650            .bind::<Nullable<Text>, _>(tx_hash)
1651            .bind::<Nullable<Text>, _>(error)
1652            .get_result(&mut conn)
1653            .optional()?;
1654
1655            if matches!(
1656                status,
1657                TransactionStatus::Failed | TransactionStatus::Expired
1658            ) {
1659                if let Some(row) = updated_row {
1660                    let action_key: hypercall_types::directives::ActionKey = row.action_key.into();
1661                    if matches!(
1662                        action_key,
1663                        hypercall_types::directives::ActionKey::SystemWithdrawToken
1664                            | hypercall_types::directives::ActionKey::SystemCreditOption
1665                    ) {
1666                        metrics::counter!(
1667                            "ht_withdrawal_manual_reconciliation_required_total",
1668                            "action_key" => action_key.as_str(),
1669                        )
1670                        .increment(1);
1671                        tracing::warn!(
1672                            directive_id = %request_id,
1673                            action_key = ?action_key,
1674                            "Withdrawal directive reached terminal delivery status; automatic refunds are disabled and operator reconciliation is required"
1675                        );
1676                    }
1677                }
1678            }
1679        } else if let Some(delivery_status) = delivery_status {
1680            diesel::sql_query(
1681                r#"
1682                UPDATE directive_outbox
1683                SET delivery_status = $2,
1684                    tx_hash = COALESCE($3, tx_hash),
1685                    last_delivery_error = $4,
1686                    updated_at = CURRENT_TIMESTAMP
1687                WHERE directive_id = $1
1688                  AND delivery_status NOT IN ('finalized', 'reverted', 'expired', 'dead_lettered')
1689                "#,
1690            )
1691            .bind::<Text, _>(request_id)
1692            .bind::<Text, _>(delivery_status)
1693            .bind::<Nullable<Text>, _>(tx_hash)
1694            .bind::<Nullable<Text>, _>(error)
1695            .execute(&mut conn)?;
1696        }
1697        Ok(())
1698    }
1699}