1use std::sync::Arc;
2use std::time::Duration;
3
4use tokio::sync::broadcast;
5use tracing::{error, info};
6
7use crate::shared::order_types::get_timestamp_millis;
8pub use hypercall_db::{
9 DirectiveDeliveryStatus, DirectiveDomainStatus, DirectiveOutboxAppend, DirectiveOutboxKind,
10 DirectiveOutboxRow,
11};
12use hypercall_db::{DirectiveOutboxReader, DirectiveOutboxWriter};
13use hypercall_signer::{RsmSigner, RsmSignerError, SignedDirective};
14use hypercall_types::directives::ActionKey;
15use hypercall_types::TransactionStatus;
16use hypercall_types::{
17 EngineMessage, SignedDirectiveTx, TransactionRequest, TransactionType, TransactionUpdate,
18};
19
20pub struct DirectiveDeliverySequencer {
21 db: Arc<hypercall_db_diesel::DatabaseHandler>,
22 rsm_signer: Arc<dyn RsmSigner>,
23 transaction_request_sender: tokio::sync::mpsc::UnboundedSender<EngineMessage>,
24 poll_interval: Duration,
25}
26
27impl DirectiveDeliverySequencer {
28 pub fn new(
29 db: Arc<hypercall_db_diesel::DatabaseHandler>,
30 rsm_signer: Arc<dyn RsmSigner>,
31 transaction_request_sender: tokio::sync::mpsc::UnboundedSender<EngineMessage>,
32 poll_ms: u64,
33 ) -> Self {
34 Self {
35 db,
36 rsm_signer,
37 transaction_request_sender,
38 poll_interval: Duration::from_millis(poll_ms.max(1)),
39 }
40 }
41
42 pub fn start_with_shutdown(
43 self: Arc<Self>,
44 mut shutdown_rx: broadcast::Receiver<()>,
45 ) -> tokio::task::JoinHandle<()> {
46 tokio::spawn(async move {
47 info!("Starting directive delivery sequencer");
48 loop {
49 tokio::select! {
50 _ = shutdown_rx.recv() => {
51 info!("Directive delivery sequencer received shutdown signal");
52 break;
53 }
54 _ = tokio::time::sleep(self.poll_interval) => {
55 if let Err(error) = self.drain_once().await {
56 error!(%error, "directive delivery sequencer drain failed");
57 }
58 }
59 }
60 }
61 info!("Directive delivery sequencer stopped");
62 })
63 }
64
65 async fn drain_once(&self) -> anyhow::Result<()> {
66 let Some(row) = self.db.claim_next_directive_outbox_item_sync()? else {
67 return Ok(());
68 };
69
70 if row.kind != DirectiveOutboxKind::NeedsRsmSignature.as_str() {
71 let error = format!("unsupported directive outbox kind {}", row.kind);
72 self.dead_letter_directive(&row, &error, DirectiveDeadLetterReason::UnsupportedKind)?;
73 return Ok(());
74 }
75
76 let signed = match self
81 .rsm_signer
82 .sign_preallocated(&row.directive_id, &row.account, &row.payload, row.nonce)
83 .await
84 {
85 Ok(signed) => signed,
86 Err(error) => {
87 if is_permanent_signing_error(&error) {
88 self.dead_letter_directive(
89 &row,
90 &error.to_string(),
91 DirectiveDeadLetterReason::from_signer_error(&error),
92 )?;
93 } else {
94 self.db.mark_directive_outbox_delivery_failed_sync(
95 row.outbox_seq,
96 &error.to_string(),
97 )?;
98 }
99 return Err(anyhow::anyhow!(error.to_string()));
100 }
101 };
102
103 match self.rsm_signer.is_nonce_used(signed.nonce).await {
104 Ok(true) => {
105 let error = nonce_consumed_manual_reconciliation_error(signed.nonce);
106 tracing::warn!(
107 directive_id = %row.directive_id,
108 rsm_nonce = signed.nonce,
109 "RSM nonce already appears consumed on-chain before directive submission, stopping retries for manual reconciliation"
110 );
111 metrics::counter!(
112 "ht_directive_outbox_manual_reconciliation_required_total",
113 "action_key" => row.action_key.as_str(),
114 "reason_class" => "nonce_consumed",
115 )
116 .increment(1);
117 emit_withdrawal_manual_reconciliation_signal(&row.directive_id, row.action_key);
118 self.db
119 .mark_directive_outbox_manual_reconciliation_sync(row.outbox_seq, &error)?;
120 return Ok(());
121 }
122 Ok(false) => {}
123 Err(error) => {
124 tracing::warn!(
125 directive_id = %row.directive_id,
126 %error,
127 "failed to check RSM nonce on-chain, proceeding with submission"
128 );
129 }
130 }
131
132 let tx_request = build_rsm_transaction_request(&row, signed);
133 if self
134 .transaction_request_sender
135 .send(EngineMessage::TransactionRequest(tx_request))
136 .is_err()
137 {
138 self.db.mark_directive_outbox_delivery_failed_sync(
139 row.outbox_seq,
140 "transaction request sender closed",
141 )?;
142 return Err(anyhow::anyhow!("transaction request sender closed"));
143 }
144 Ok(())
145 }
146
147 fn dead_letter_directive(
148 &self,
149 row: &DirectiveOutboxRow,
150 error: &str,
151 reason: DirectiveDeadLetterReason,
152 ) -> anyhow::Result<()> {
153 metrics::counter!(
154 "ht_directive_outbox_dead_lettered_total",
155 "action_key" => row.action_key.as_str(),
156 "reason_class" => reason.as_str(),
157 )
158 .increment(1);
159 emit_withdrawal_manual_reconciliation_signal(&row.directive_id, row.action_key);
160 self.db
161 .mark_directive_outbox_dead_lettered_sync(row.outbox_seq, error)?;
162 let Some(update) = dead_lettered_withdrawal_update(row, error, get_timestamp_millis())
163 else {
164 return Ok(());
165 };
166 if self
167 .transaction_request_sender
168 .send(EngineMessage::TransactionUpdate(update))
169 .is_err()
170 {
171 return Err(anyhow::anyhow!(
172 "transaction update sender closed while dead-lettering directive {}",
173 row.directive_id
174 ));
175 }
176 Ok(())
177 }
178}
179
180fn is_permanent_signing_error(error: &RsmSignerError) -> bool {
181 matches!(
182 error,
183 RsmSignerError::InvalidAction(_)
184 | RsmSignerError::UnsupportedAction(_)
185 | RsmSignerError::IdempotencyConflict(_)
186 )
187}
188
189#[derive(Debug, Clone, Copy, PartialEq, Eq)]
190enum DirectiveDeadLetterReason {
191 UnsupportedKind,
192 InvalidAction,
193 UnsupportedAction,
194 IdempotencyConflict,
195}
196
197impl DirectiveDeadLetterReason {
198 fn from_signer_error(error: &RsmSignerError) -> Self {
199 match error {
200 RsmSignerError::InvalidAction(_) => Self::InvalidAction,
201 RsmSignerError::UnsupportedAction(_) => Self::UnsupportedAction,
202 RsmSignerError::IdempotencyConflict(_) => Self::IdempotencyConflict,
203 RsmSignerError::SigningFailed(_) | RsmSignerError::PersistenceFailed(_) => {
204 panic!("non-permanent signer error cannot be dead-lettered: {error}")
205 }
206 }
207 }
208
209 fn as_str(self) -> &'static str {
210 match self {
211 Self::UnsupportedKind => "unsupported_kind",
212 Self::InvalidAction => "invalid_action",
213 Self::UnsupportedAction => "unsupported_action",
214 Self::IdempotencyConflict => "idempotency_conflict",
215 }
216 }
217}
218
219fn nonce_consumed_manual_reconciliation_error(nonce: u64) -> String {
220 format!(
221 "RSM nonce {nonce} is already used before directive submission; on-chain outcome is unknown and requires manual reconciliation"
222 )
223}
224
225fn emit_withdrawal_manual_reconciliation_signal(directive_id: &str, action_key: ActionKey) {
226 if !matches!(
227 action_key,
228 ActionKey::SystemWithdrawToken | ActionKey::SystemCreditOption
229 ) {
230 return;
231 }
232
233 metrics::counter!(
234 "ht_withdrawal_manual_reconciliation_required_total",
235 "action_key" => action_key.as_str(),
236 )
237 .increment(1);
238 tracing::warn!(
239 directive_id = %directive_id,
240 action_key = ?action_key,
241 "Withdrawal directive requires manual reconciliation; automatic refunds are disabled"
242 );
243}
244
245fn dead_lettered_withdrawal_update(
246 row: &DirectiveOutboxRow,
247 error: &str,
248 timestamp: u64,
249) -> Option<TransactionUpdate> {
250 if !matches!(
251 row.action_key,
252 ActionKey::SystemWithdrawToken | ActionKey::SystemCreditOption
253 ) {
254 return None;
255 }
256
257 Some(TransactionUpdate {
258 request_id: row.directive_id.clone(),
259 status: TransactionStatus::Failed,
260 tx_hash: None,
261 error: Some(error.to_string()),
262 timestamp,
263 gas_used: None,
264 gas_price: None,
265 })
266}
267
268fn build_rsm_transaction_request(
269 row: &DirectiveOutboxRow,
270 signed: SignedDirective,
271) -> TransactionRequest {
272 let timestamp = get_timestamp_millis();
273 TransactionRequest {
274 request_id: row.directive_id.clone(),
275 wallet_address: row.account,
276 account_contract: row.account,
277 transaction_type: TransactionType::RsmDirective(SignedDirectiveTx {
278 directive: signed.directive,
279 signature: signed.signature,
280 }),
281 timestamp,
282 expires_at: timestamp.saturating_add(300_000),
287 }
288}
289
290#[cfg(test)]
291mod tests {
292 use super::*;
293 use hypercall_types::wallet_address::test_wallet;
294
295 #[test]
296 fn only_permanent_signing_errors_dead_letter_outbox_rows() {
297 assert!(is_permanent_signing_error(&RsmSignerError::InvalidAction(
298 "malformed action".to_string()
299 )));
300 assert!(is_permanent_signing_error(
301 &RsmSignerError::UnsupportedAction("unknown action".to_string())
302 ));
303 assert!(is_permanent_signing_error(
304 &RsmSignerError::IdempotencyConflict("different payload".to_string())
305 ));
306 assert!(!is_permanent_signing_error(&RsmSignerError::SigningFailed(
307 "relayer unavailable".to_string()
308 )));
309 assert!(!is_permanent_signing_error(
310 &RsmSignerError::PersistenceFailed("db unavailable".to_string())
311 ));
312 }
313
314 #[test]
315 fn dead_letter_reason_uses_signer_error_variants() {
316 assert_eq!(
317 DirectiveDeadLetterReason::from_signer_error(&RsmSignerError::InvalidAction(
318 "malformed action".to_string()
319 ))
320 .as_str(),
321 "invalid_action"
322 );
323 assert_eq!(
324 DirectiveDeadLetterReason::from_signer_error(&RsmSignerError::UnsupportedAction(
325 "unknown action".to_string()
326 ))
327 .as_str(),
328 "unsupported_action"
329 );
330 assert_eq!(
331 DirectiveDeadLetterReason::from_signer_error(&RsmSignerError::IdempotencyConflict(
332 "different payload".to_string()
333 ))
334 .as_str(),
335 "idempotency_conflict"
336 );
337 }
338
339 fn test_outbox_row(action_key: ActionKey) -> DirectiveOutboxRow {
340 DirectiveOutboxRow {
341 outbox_seq: 1,
342 directive_id: "directive-1".to_string(),
343 kind: DirectiveOutboxKind::NeedsRsmSignature.as_str().to_string(),
344 action_key,
345 account: test_wallet(1),
346 signer: test_wallet(2),
347 nonce: 7,
348 payload: vec![1, 2, 3],
349 expires_at_ms: None,
350 }
351 }
352
353 #[test]
354 fn dead_lettered_withdrawals_emit_failed_transaction_update() {
355 let cash = dead_lettered_withdrawal_update(
356 &test_outbox_row(ActionKey::SystemWithdrawToken),
357 "bad directive",
358 123,
359 )
360 .expect("cash withdrawal should emit an update");
361 assert_eq!(cash.request_id, "directive-1");
362 assert_eq!(cash.status, TransactionStatus::Failed);
363 assert_eq!(cash.error.as_deref(), Some("bad directive"));
364 assert_eq!(cash.timestamp, 123);
365
366 let option = dead_lettered_withdrawal_update(
367 &test_outbox_row(ActionKey::SystemCreditOption),
368 "bad option directive",
369 456,
370 )
371 .expect("option withdrawal should emit an update");
372 assert_eq!(option.status, TransactionStatus::Failed);
373 assert_eq!(option.error.as_deref(), Some("bad option directive"));
374 assert_eq!(option.timestamp, 456);
375 }
376
377 #[test]
378 fn dead_lettered_non_withdrawals_do_not_emit_transaction_update() {
379 assert!(dead_lettered_withdrawal_update(
380 &test_outbox_row(ActionKey::RsmHlLimitOrder),
381 "bad order",
382 123,
383 )
384 .is_none());
385 }
386
387 #[test]
388 fn consumed_nonce_error_requires_manual_reconciliation() {
389 let error = nonce_consumed_manual_reconciliation_error(7);
390 assert!(error.contains("already used"));
391 assert!(error.contains("manual reconciliation"));
392 }
393
394 #[test]
395 fn rsm_transaction_request_uses_fresh_delivery_deadline() {
396 let mut row = test_outbox_row(ActionKey::SystemWithdrawToken);
397 row.nonce = 10;
398 row.expires_at_ms = Some(1);
399 let signed = SignedDirective {
400 request_id: row.directive_id.clone(),
401 account: row.account,
402 nonce: row.nonce,
403 directive: vec![1, 2, 3],
404 signature: "0x1234".to_string(),
405 };
406
407 let tx_request = build_rsm_transaction_request(&row, signed);
408
409 assert!(
410 tx_request.expires_at > tx_request.timestamp,
411 "delivery request should have a fresh in-process deadline"
412 );
413 assert_ne!(
414 tx_request.expires_at,
415 row.expires_at_ms.unwrap(),
416 "domain request expiry must not terminal-expire accepted chain delivery"
417 );
418 }
419}