Skip to main content

hypercall/
directive_outbox.rs

1use std::sync::Arc;
2use std::time::Duration;
3
4use tokio::sync::broadcast;
5use tracing::{error, info};
6
7use crate::shared::order_types::get_timestamp_millis;
8pub use hypercall_db::{
9    DirectiveDeliveryStatus, DirectiveDomainStatus, DirectiveOutboxAppend, DirectiveOutboxKind,
10    DirectiveOutboxRow,
11};
12use hypercall_db::{DirectiveOutboxReader, DirectiveOutboxWriter};
13use hypercall_signer::{RsmSigner, RsmSignerError, SignedDirective};
14use hypercall_types::directives::ActionKey;
15use hypercall_types::TransactionStatus;
16use hypercall_types::{
17    EngineMessage, SignedDirectiveTx, TransactionRequest, TransactionType, TransactionUpdate,
18};
19
20pub struct DirectiveDeliverySequencer {
21    db: Arc<hypercall_db_diesel::DatabaseHandler>,
22    rsm_signer: Arc<dyn RsmSigner>,
23    transaction_request_sender: tokio::sync::mpsc::UnboundedSender<EngineMessage>,
24    poll_interval: Duration,
25}
26
27impl DirectiveDeliverySequencer {
28    pub fn new(
29        db: Arc<hypercall_db_diesel::DatabaseHandler>,
30        rsm_signer: Arc<dyn RsmSigner>,
31        transaction_request_sender: tokio::sync::mpsc::UnboundedSender<EngineMessage>,
32        poll_ms: u64,
33    ) -> Self {
34        Self {
35            db,
36            rsm_signer,
37            transaction_request_sender,
38            poll_interval: Duration::from_millis(poll_ms.max(1)),
39        }
40    }
41
42    pub fn start_with_shutdown(
43        self: Arc<Self>,
44        mut shutdown_rx: broadcast::Receiver<()>,
45    ) -> tokio::task::JoinHandle<()> {
46        tokio::spawn(async move {
47            info!("Starting directive delivery sequencer");
48            loop {
49                tokio::select! {
50                    _ = shutdown_rx.recv() => {
51                        info!("Directive delivery sequencer received shutdown signal");
52                        break;
53                    }
54                    _ = tokio::time::sleep(self.poll_interval) => {
55                        if let Err(error) = self.drain_once().await {
56                            error!(%error, "directive delivery sequencer drain failed");
57                        }
58                    }
59                }
60            }
61            info!("Directive delivery sequencer stopped");
62        })
63    }
64
65    async fn drain_once(&self) -> anyhow::Result<()> {
66        let Some(row) = self.db.claim_next_directive_outbox_item_sync()? else {
67            return Ok(());
68        };
69
70        if row.kind != DirectiveOutboxKind::NeedsRsmSignature.as_str() {
71            let error = format!("unsupported directive outbox kind {}", row.kind);
72            self.dead_letter_directive(&row, &error, DirectiveDeadLetterReason::UnsupportedKind)?;
73            return Ok(());
74        }
75
76        // Use sign_preallocated with the nonce persisted in the outbox row.
77        // The outbox append already allocated and persisted the nonce during
78        // journaling, so calling sign() would allocate a second nonce and
79        // desynchronize durable nonce state from directive metadata.
80        let signed = match self
81            .rsm_signer
82            .sign_preallocated(&row.directive_id, &row.account, &row.payload, row.nonce)
83            .await
84        {
85            Ok(signed) => signed,
86            Err(error) => {
87                if is_permanent_signing_error(&error) {
88                    self.dead_letter_directive(
89                        &row,
90                        &error.to_string(),
91                        DirectiveDeadLetterReason::from_signer_error(&error),
92                    )?;
93                } else {
94                    self.db.mark_directive_outbox_delivery_failed_sync(
95                        row.outbox_seq,
96                        &error.to_string(),
97                    )?;
98                }
99                return Err(anyhow::anyhow!(error.to_string()));
100            }
101        };
102
103        match self.rsm_signer.is_nonce_used(signed.nonce).await {
104            Ok(true) => {
105                let error = nonce_consumed_manual_reconciliation_error(signed.nonce);
106                tracing::warn!(
107                    directive_id = %row.directive_id,
108                    rsm_nonce = signed.nonce,
109                    "RSM nonce already appears consumed on-chain before directive submission, stopping retries for manual reconciliation"
110                );
111                metrics::counter!(
112                    "ht_directive_outbox_manual_reconciliation_required_total",
113                    "action_key" => row.action_key.as_str(),
114                    "reason_class" => "nonce_consumed",
115                )
116                .increment(1);
117                emit_withdrawal_manual_reconciliation_signal(&row.directive_id, row.action_key);
118                self.db
119                    .mark_directive_outbox_manual_reconciliation_sync(row.outbox_seq, &error)?;
120                return Ok(());
121            }
122            Ok(false) => {}
123            Err(error) => {
124                tracing::warn!(
125                    directive_id = %row.directive_id,
126                    %error,
127                    "failed to check RSM nonce on-chain, proceeding with submission"
128                );
129            }
130        }
131
132        let tx_request = build_rsm_transaction_request(&row, signed);
133        if self
134            .transaction_request_sender
135            .send(EngineMessage::TransactionRequest(tx_request))
136            .is_err()
137        {
138            self.db.mark_directive_outbox_delivery_failed_sync(
139                row.outbox_seq,
140                "transaction request sender closed",
141            )?;
142            return Err(anyhow::anyhow!("transaction request sender closed"));
143        }
144        Ok(())
145    }
146
147    fn dead_letter_directive(
148        &self,
149        row: &DirectiveOutboxRow,
150        error: &str,
151        reason: DirectiveDeadLetterReason,
152    ) -> anyhow::Result<()> {
153        metrics::counter!(
154            "ht_directive_outbox_dead_lettered_total",
155            "action_key" => row.action_key.as_str(),
156            "reason_class" => reason.as_str(),
157        )
158        .increment(1);
159        emit_withdrawal_manual_reconciliation_signal(&row.directive_id, row.action_key);
160        self.db
161            .mark_directive_outbox_dead_lettered_sync(row.outbox_seq, error)?;
162        let Some(update) = dead_lettered_withdrawal_update(row, error, get_timestamp_millis())
163        else {
164            return Ok(());
165        };
166        if self
167            .transaction_request_sender
168            .send(EngineMessage::TransactionUpdate(update))
169            .is_err()
170        {
171            return Err(anyhow::anyhow!(
172                "transaction update sender closed while dead-lettering directive {}",
173                row.directive_id
174            ));
175        }
176        Ok(())
177    }
178}
179
180fn is_permanent_signing_error(error: &RsmSignerError) -> bool {
181    matches!(
182        error,
183        RsmSignerError::InvalidAction(_)
184            | RsmSignerError::UnsupportedAction(_)
185            | RsmSignerError::IdempotencyConflict(_)
186    )
187}
188
189#[derive(Debug, Clone, Copy, PartialEq, Eq)]
190enum DirectiveDeadLetterReason {
191    UnsupportedKind,
192    InvalidAction,
193    UnsupportedAction,
194    IdempotencyConflict,
195}
196
197impl DirectiveDeadLetterReason {
198    fn from_signer_error(error: &RsmSignerError) -> Self {
199        match error {
200            RsmSignerError::InvalidAction(_) => Self::InvalidAction,
201            RsmSignerError::UnsupportedAction(_) => Self::UnsupportedAction,
202            RsmSignerError::IdempotencyConflict(_) => Self::IdempotencyConflict,
203            RsmSignerError::SigningFailed(_) | RsmSignerError::PersistenceFailed(_) => {
204                panic!("non-permanent signer error cannot be dead-lettered: {error}")
205            }
206        }
207    }
208
209    fn as_str(self) -> &'static str {
210        match self {
211            Self::UnsupportedKind => "unsupported_kind",
212            Self::InvalidAction => "invalid_action",
213            Self::UnsupportedAction => "unsupported_action",
214            Self::IdempotencyConflict => "idempotency_conflict",
215        }
216    }
217}
218
219fn nonce_consumed_manual_reconciliation_error(nonce: u64) -> String {
220    format!(
221        "RSM nonce {nonce} is already used before directive submission; on-chain outcome is unknown and requires manual reconciliation"
222    )
223}
224
225fn emit_withdrawal_manual_reconciliation_signal(directive_id: &str, action_key: ActionKey) {
226    if !matches!(
227        action_key,
228        ActionKey::SystemWithdrawToken | ActionKey::SystemCreditOption
229    ) {
230        return;
231    }
232
233    metrics::counter!(
234        "ht_withdrawal_manual_reconciliation_required_total",
235        "action_key" => action_key.as_str(),
236    )
237    .increment(1);
238    tracing::warn!(
239        directive_id = %directive_id,
240        action_key = ?action_key,
241        "Withdrawal directive requires manual reconciliation; automatic refunds are disabled"
242    );
243}
244
245fn dead_lettered_withdrawal_update(
246    row: &DirectiveOutboxRow,
247    error: &str,
248    timestamp: u64,
249) -> Option<TransactionUpdate> {
250    if !matches!(
251        row.action_key,
252        ActionKey::SystemWithdrawToken | ActionKey::SystemCreditOption
253    ) {
254        return None;
255    }
256
257    Some(TransactionUpdate {
258        request_id: row.directive_id.clone(),
259        status: TransactionStatus::Failed,
260        tx_hash: None,
261        error: Some(error.to_string()),
262        timestamp,
263        gas_used: None,
264        gas_price: None,
265    })
266}
267
268fn build_rsm_transaction_request(
269    row: &DirectiveOutboxRow,
270    signed: SignedDirective,
271) -> TransactionRequest {
272    let timestamp = get_timestamp_millis();
273    TransactionRequest {
274        request_id: row.directive_id.clone(),
275        wallet_address: row.account,
276        account_contract: row.account,
277        transaction_type: TransactionType::RsmDirective(SignedDirectiveTx {
278            directive: signed.directive,
279            signature: signed.signature,
280        }),
281        timestamp,
282        // Outbox rows represent domain-accepted work that still needs a chain
283        // effect. The original request expiry must not turn transient delivery
284        // failures into terminal failures, so each submit attempt gets only a
285        // fresh in-process guard.
286        expires_at: timestamp.saturating_add(300_000),
287    }
288}
289
290#[cfg(test)]
291mod tests {
292    use super::*;
293    use hypercall_types::wallet_address::test_wallet;
294
295    #[test]
296    fn only_permanent_signing_errors_dead_letter_outbox_rows() {
297        assert!(is_permanent_signing_error(&RsmSignerError::InvalidAction(
298            "malformed action".to_string()
299        )));
300        assert!(is_permanent_signing_error(
301            &RsmSignerError::UnsupportedAction("unknown action".to_string())
302        ));
303        assert!(is_permanent_signing_error(
304            &RsmSignerError::IdempotencyConflict("different payload".to_string())
305        ));
306        assert!(!is_permanent_signing_error(&RsmSignerError::SigningFailed(
307            "relayer unavailable".to_string()
308        )));
309        assert!(!is_permanent_signing_error(
310            &RsmSignerError::PersistenceFailed("db unavailable".to_string())
311        ));
312    }
313
314    #[test]
315    fn dead_letter_reason_uses_signer_error_variants() {
316        assert_eq!(
317            DirectiveDeadLetterReason::from_signer_error(&RsmSignerError::InvalidAction(
318                "malformed action".to_string()
319            ))
320            .as_str(),
321            "invalid_action"
322        );
323        assert_eq!(
324            DirectiveDeadLetterReason::from_signer_error(&RsmSignerError::UnsupportedAction(
325                "unknown action".to_string()
326            ))
327            .as_str(),
328            "unsupported_action"
329        );
330        assert_eq!(
331            DirectiveDeadLetterReason::from_signer_error(&RsmSignerError::IdempotencyConflict(
332                "different payload".to_string()
333            ))
334            .as_str(),
335            "idempotency_conflict"
336        );
337    }
338
339    fn test_outbox_row(action_key: ActionKey) -> DirectiveOutboxRow {
340        DirectiveOutboxRow {
341            outbox_seq: 1,
342            directive_id: "directive-1".to_string(),
343            kind: DirectiveOutboxKind::NeedsRsmSignature.as_str().to_string(),
344            action_key,
345            account: test_wallet(1),
346            signer: test_wallet(2),
347            nonce: 7,
348            payload: vec![1, 2, 3],
349            expires_at_ms: None,
350        }
351    }
352
353    #[test]
354    fn dead_lettered_withdrawals_emit_failed_transaction_update() {
355        let cash = dead_lettered_withdrawal_update(
356            &test_outbox_row(ActionKey::SystemWithdrawToken),
357            "bad directive",
358            123,
359        )
360        .expect("cash withdrawal should emit an update");
361        assert_eq!(cash.request_id, "directive-1");
362        assert_eq!(cash.status, TransactionStatus::Failed);
363        assert_eq!(cash.error.as_deref(), Some("bad directive"));
364        assert_eq!(cash.timestamp, 123);
365
366        let option = dead_lettered_withdrawal_update(
367            &test_outbox_row(ActionKey::SystemCreditOption),
368            "bad option directive",
369            456,
370        )
371        .expect("option withdrawal should emit an update");
372        assert_eq!(option.status, TransactionStatus::Failed);
373        assert_eq!(option.error.as_deref(), Some("bad option directive"));
374        assert_eq!(option.timestamp, 456);
375    }
376
377    #[test]
378    fn dead_lettered_non_withdrawals_do_not_emit_transaction_update() {
379        assert!(dead_lettered_withdrawal_update(
380            &test_outbox_row(ActionKey::RsmHlLimitOrder),
381            "bad order",
382            123,
383        )
384        .is_none());
385    }
386
387    #[test]
388    fn consumed_nonce_error_requires_manual_reconciliation() {
389        let error = nonce_consumed_manual_reconciliation_error(7);
390        assert!(error.contains("already used"));
391        assert!(error.contains("manual reconciliation"));
392    }
393
394    #[test]
395    fn rsm_transaction_request_uses_fresh_delivery_deadline() {
396        let mut row = test_outbox_row(ActionKey::SystemWithdrawToken);
397        row.nonce = 10;
398        row.expires_at_ms = Some(1);
399        let signed = SignedDirective {
400            request_id: row.directive_id.clone(),
401            account: row.account,
402            nonce: row.nonce,
403            directive: vec![1, 2, 3],
404            signature: "0x1234".to_string(),
405        };
406
407        let tx_request = build_rsm_transaction_request(&row, signed);
408
409        assert!(
410            tx_request.expires_at > tx_request.timestamp,
411            "delivery request should have a fresh in-process deadline"
412        );
413        assert_ne!(
414            tx_request.expires_at,
415            row.expires_at_ms.unwrap(),
416            "domain request expiry must not terminal-expire accepted chain delivery"
417        );
418    }
419}