Skip to main content

hypercall_transaction_submitter/
lib.rs

1pub mod exchange_encoder;
2mod tx_relayer;
3pub use exchange_encoder::{encode_execute_rsm_directive, encode_execute_user_directive};
4pub use tx_relayer::{
5    AwsKmsTxRelayer, DirectTxRelayer, MockTxRelayer, PendingTransactionStatus,
6    SubmittedTransaction, TxRelayerTrait,
7};
8use tx_relayer::{SignedAttemptRecorder, SignedTransactionAttempt};
9
10use alloy::primitives::{Address, Bytes};
11use async_trait::async_trait;
12use eyre::Result as EyreResult;
13use hypercall_db::AsyncDirectiveOutboxWriter;
14use hypercall_transaction_submitter_core::{
15    ContractCall, SubmissionStatus, SubmittedNonce, SubmitterId,
16};
17use hypercall_transaction_submitter_db::{
18    AsyncTransactionSubmitterStore, SubmissionAttemptRecord, SubmissionRecord,
19};
20use hypercall_types::{
21    topics::TOPIC_TRANSACTION_REQUESTS, EngineMessage, SignedDirectiveTx, TransactionRequest,
22    TransactionStatus, TransactionType, TransactionUpdate,
23};
24use serde::Deserialize;
25use std::str::FromStr;
26use std::sync::Arc;
27use std::time::{Duration, SystemTime, UNIX_EPOCH};
28use tokio::sync::{mpsc, Mutex};
29use tracing::{debug, error, info};
30
31#[derive(Debug, Clone, Copy, Deserialize, PartialEq, Eq)]
32#[serde(rename_all = "snake_case")]
33pub enum TransactionSubmitterMode {
34    Mock,
35    Direct,
36    AwsKms,
37}
38
39impl FromStr for TransactionSubmitterMode {
40    type Err = String;
41
42    fn from_str(value: &str) -> Result<Self, Self::Err> {
43        match value.trim().to_ascii_lowercase().as_str() {
44            "mock" => Ok(Self::Mock),
45            "direct" => Ok(Self::Direct),
46            "aws_kms" => Ok(Self::AwsKms),
47            other => Err(format!(
48                "expected one of: mock, direct, aws_kms; got {}",
49                other
50            )),
51        }
52    }
53}
54
55#[derive(Debug, Clone, Deserialize)]
56#[serde(default, deny_unknown_fields)]
57pub struct TransactionSubmitterConfig {
58    pub mode: TransactionSubmitterMode,
59    pub aws_kms_key_id: String,
60    pub max_gas_price: String,
61    pub rpc_url: String,
62}
63
64impl Default for TransactionSubmitterConfig {
65    fn default() -> Self {
66        Self {
67            mode: TransactionSubmitterMode::Mock,
68            aws_kms_key_id: String::new(),
69            max_gas_price: "500000000000".to_string(),
70            rpc_url: "https://rpc.hyperliquid-testnet.xyz/evm".to_string(),
71        }
72    }
73}
74
/// Secret material used by the transaction submitter.
///
/// `Debug` is implemented by hand instead of derived so that formatting this
/// value (for example inside a log line or an error chain) can never print the
/// private key itself; only its presence or absence is shown.
#[derive(Clone, Default)]
pub struct TransactionSubmitterSecrets {
    /// Signer private key, or `None` when it has not been provided.
    pub transaction_submitter_private_key: Option<String>,
}

impl std::fmt::Debug for TransactionSubmitterSecrets {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Keep the `Some`/`None` shape visible while hiding the key material.
        let redacted = self
            .transaction_submitter_private_key
            .as_ref()
            .map(|_| "<redacted>");
        f.debug_struct("TransactionSubmitterSecrets")
            .field("transaction_submitter_private_key", &redacted)
            .finish()
    }
}
79
80impl TransactionSubmitterSecrets {
81    pub fn require_transaction_submitter_private_key(&self) -> eyre::Result<&str> {
82        self.transaction_submitter_private_key
83            .as_deref()
84            .ok_or_else(|| {
85                eyre::eyre!("TRANSACTION_SUBMITTER_PRIVATE_KEY must be set in the environment")
86            })
87    }
88}
89
90#[async_trait]
91pub trait EventBusTrait: Send + Sync {
92    fn get_sender(&self) -> mpsc::UnboundedSender<EngineMessage>;
93
94    async fn subscribe(
95        &self,
96        topics: Vec<String>,
97    ) -> Result<mpsc::UnboundedReceiver<EngineMessage>, String>;
98}
99
100type DirectiveEncoder = fn(Address, Bytes, Bytes, Option<String>) -> EyreResult<ContractCall>;
101type DirectiveOutboxDb = Arc<dyn AsyncDirectiveOutboxWriter>;
102type SubmitterStoreDb = Arc<dyn AsyncTransactionSubmitterStore>;
103
104const PENDING_RECONCILIATION_RETRY_DELAY: Duration = Duration::from_secs(30);
105const PERSISTED_PENDING_SUBMISSION_BATCH_SIZE: i64 = 1000;
106
/// Failure categories produced while submitting a signed directive.
#[derive(Debug)]
enum RelayDirectiveError {
    /// The payload could not be built (invalid signature bytes or encoder
    /// failure). Reported as a terminal `Failed` status.
    Build(String),
    /// Submission failed with an error not classified as terminal. Reported as
    /// `Submitted` with the error attached so the outbox row stays retryable.
    Relay(String),
    /// Submission failed with an error that `is_terminal_relay_error`
    /// classified as terminal. Reported as a terminal `Failed` status.
    Terminal(String),
}
113
114struct SubmitterAttemptRecorder {
115    store: Option<SubmitterStoreDb>,
116}
117
118#[async_trait]
119impl SignedAttemptRecorder for SubmitterAttemptRecorder {
120    async fn record_signed_attempt(&self, attempt: SignedTransactionAttempt) -> EyreResult<()> {
121        let Some(store) = self.store.as_ref() else {
122            return Ok(());
123        };
124        let record = SubmissionRecord {
125            submitter: attempt.submitter,
126            nonce: attempt.nonce,
127            status: SubmissionStatus::Created,
128            primary_tx_hash: Some(attempt.tx_hash.clone()),
129            terminal_error: None,
130        };
131        let attempts = vec![SubmissionAttemptRecord {
132            tx_hash: attempt.tx_hash,
133            raw_tx: attempt.raw_tx,
134        }];
135        TransactionSubmitter::persist_submitter_submission_with_db(
136            Arc::clone(store),
137            record,
138            attempts,
139        )
140        .await
141    }
142}
143
144/// Delivers signed directive requests to chain and publishes delivery status.
145///
146/// Signer-specific crates own key custody and signer construction. This type
147/// owns the operational path around those signers: encode, submit, reconcile,
148/// publish status, and persist outbox delivery updates.
149pub struct TransactionSubmitter {
150    event_bus: Arc<dyn EventBusTrait>,
151    tx_relayer: Arc<dyn TxRelayerTrait>,
152    exchange_address: Address,
153    /// Optional direct DB handle for outbox delivery status updates.
154    ///
155    /// This keeps direct-submission deployments from relying only on the
156    /// in-process event bus to record terminal delivery state.
157    db: Option<DirectiveOutboxDb>,
158    /// Optional submitter-owned persistence for submissions and replacement
159    /// attempts.
160    submitter_store: Option<SubmitterStoreDb>,
161    /// Serializes nonce allocation and the first submit attempt for this
162    /// submitter instance.
163    ///
164    /// This mirrors the Sanguine submitter invariant: no later request may
165    /// observe or allocate around a nonce while an earlier request is still in
166    /// the undecided window between nonce selection and durable submitter state.
167    nonce_lock: Arc<Mutex<()>>,
168}
169
170impl TransactionSubmitter {
171    /// Creates TransactionSubmitter with direct RPC submission using the configured signer key.
172    pub async fn new_direct(
173        event_bus: Arc<dyn EventBusTrait>,
174        config: &TransactionSubmitterConfig,
175        exchange_contract_address: &str,
176        secrets: &TransactionSubmitterSecrets,
177    ) -> EyreResult<Self> {
178        let tx_relayer = DirectTxRelayer::new(config, secrets).await?;
179        let exchange_address = exchange_encoder::parse_exchange_address(exchange_contract_address)?;
180        Ok(Self {
181            event_bus,
182            tx_relayer: Arc::new(tx_relayer),
183            exchange_address,
184            db: None,
185            submitter_store: None,
186            nonce_lock: Arc::new(Mutex::new(())),
187        })
188    }
189
190    /// Creates TransactionSubmitter with direct RPC submission using AWS KMS.
191    pub async fn new_aws_kms(
192        event_bus: Arc<dyn EventBusTrait>,
193        config: &TransactionSubmitterConfig,
194        exchange_contract_address: &str,
195    ) -> EyreResult<Self> {
196        let tx_relayer = AwsKmsTxRelayer::new(config).await?;
197        let exchange_address = exchange_encoder::parse_exchange_address(exchange_contract_address)?;
198        Ok(Self {
199            event_bus,
200            tx_relayer: Arc::new(tx_relayer),
201            exchange_address,
202            db: None,
203            submitter_store: None,
204            nonce_lock: Arc::new(Mutex::new(())),
205        })
206    }
207
208    /// Creates TransactionSubmitter with no-op mock for local tests and disabled delivery.
209    pub fn new_mock(
210        event_bus: Arc<dyn EventBusTrait>,
211        exchange_contract_address: &str,
212    ) -> EyreResult<Self> {
213        Ok(Self {
214            event_bus,
215            tx_relayer: Arc::new(MockTxRelayer),
216            exchange_address: exchange_encoder::parse_exchange_address(exchange_contract_address)?,
217            db: None,
218            submitter_store: None,
219            nonce_lock: Arc::new(Mutex::new(())),
220        })
221    }
222
223    /// Attach a database handler for direct outbox persistence.
224    /// When set, outbox updates are written to Postgres directly,
225    /// independent of the event bus consumer.
226    pub fn with_db(mut self, db: DirectiveOutboxDb) -> Self {
227        self.db = Some(db);
228        self
229    }
230
231    pub fn with_submitter_store(mut self, submitter_store: SubmitterStoreDb) -> Self {
232        self.submitter_store = Some(submitter_store);
233        self
234    }
235
236    /// Start the transaction submission service (legacy, no shutdown support).
237    /// For graceful shutdown support, use `start_with_shutdown`.
238    pub fn start(self: Arc<Self>) -> Result<tokio::task::JoinHandle<()>, String> {
239        // Create a dummy shutdown channel that never fires.
240        // IMPORTANT: We must keep the sender alive, otherwise recv() returns Closed immediately.
241        let (tx, rx) = tokio::sync::broadcast::channel::<()>(1);
242        std::mem::forget(tx);
243        self.start_with_shutdown(rx)
244    }
245
246    /// Start the transaction submission service with shutdown support.
247    ///
248    /// Returns a `JoinHandle` for integration with `TaskGroup` for coordinated shutdown.
249    pub fn start_with_shutdown(
250        self: Arc<Self>,
251        mut shutdown_rx: tokio::sync::broadcast::Receiver<()>,
252    ) -> Result<tokio::task::JoinHandle<()>, String> {
253        info!("Starting Transaction Submitter service...");
254
255        // Subscribe to transaction requests
256        let event_bus = self.event_bus.clone();
257        let submitter = self.clone();
258        let handle = tokio::spawn(async move {
259            submitter.spawn_persisted_pending_submissions().await;
260
261            let mut tx_receiver = match event_bus
262                .subscribe(vec![TOPIC_TRANSACTION_REQUESTS.to_string()])
263                .await
264            {
265                Ok(rx) => rx,
266                Err(e) => {
267                    error!("Failed to subscribe to transaction requests: {}", e);
268                    return;
269                }
270            };
271
272            loop {
273                tokio::select! {
274                    _ = shutdown_rx.recv() => {
275                        info!("Transaction Submitter received shutdown signal");
276                        break;
277                    }
278                    maybe_message = tx_receiver.recv() => {
279                        match maybe_message {
280                            Some(EngineMessage::TransactionRequest(tx_req)) => {
281                                submitter.process_transaction_request(tx_req).await;
282                            }
283                            Some(_) => {} // Ignore other message types
284                            None => break, // Channel closed
285                        }
286                    }
287                }
288            }
289            info!("Transaction Submitter stopped");
290        });
291
292        info!("✓ Transaction Submitter service started");
293        Ok(handle)
294    }
295
296    async fn spawn_persisted_pending_submissions(&self) {
297        let Some(submitter_store) = self.submitter_store.as_ref() else {
298            return;
299        };
300
301        let mut after_submission_id = 0_i64;
302        loop {
303            let rows = match submitter_store
304                .list_pending_submissions(
305                    after_submission_id,
306                    PERSISTED_PENDING_SUBMISSION_BATCH_SIZE,
307                )
308                .await
309            {
310                Ok(rows) => rows,
311                Err(error) => {
312                    error!("Failed to load persisted pending submitter submissions: {error}");
313                    return;
314                }
315            };
316
317            if rows.is_empty() {
318                break;
319            }
320
321            for row in rows {
322                after_submission_id = row.submission_id;
323                if let Some(directive_id) = row.directive_id {
324                    self.spawn_pending_reconciliation(
325                        directive_id,
326                        row.submitter,
327                        row.nonce,
328                        row.tx_hashes,
329                    );
330                } else {
331                    self.spawn_pending_submitter_reconciliation(
332                        row.submitter,
333                        row.nonce,
334                        row.tx_hashes,
335                    );
336                }
337            }
338        }
339    }
340
341    /// Process a single transaction request
342    async fn process_transaction_request(&self, tx_req: TransactionRequest) {
343        info!("Processing transaction request: {}", tx_req.request_id);
344
345        // Check if request has expired
346        let current_time = timestamp_millis();
347        if current_time > tx_req.expires_at {
348            self.persist_outbox_update(
349                &tx_req.request_id,
350                TransactionStatus::Expired,
351                None,
352                Some("Transaction request expired".to_string()),
353            )
354            .await;
355            self.send_update(
356                &tx_req.request_id,
357                TransactionStatus::Expired,
358                None,
359                Some("Transaction request expired".to_string()),
360            )
361            .await;
362            return;
363        }
364
365        let (directive, encoder, log_label): (&SignedDirectiveTx, DirectiveEncoder, &'static str) =
366            match &tx_req.transaction_type {
367                TransactionType::UserDirective(user_directive) => {
368                    (user_directive, encode_execute_user_directive, "user")
369                }
370                TransactionType::RsmDirective(rsm_directive) => {
371                    (rsm_directive, encode_execute_rsm_directive, "RSM")
372                }
373            };
374
375        self.handle_signed_directive(
376            &tx_req.request_id,
377            &tx_req.account_contract,
378            directive,
379            encoder,
380            log_label,
381        )
382        .await;
383    }
384
385    /// Orchestrate signed directive submission with status updates.
386    async fn handle_signed_directive(
387        &self,
388        request_id: &str,
389        account_contract: &hypercall_types::WalletAddress,
390        directive: &SignedDirectiveTx,
391        encoder: DirectiveEncoder,
392        log_label: &str,
393    ) {
394        info!(
395            "Submitting {} directive: request_id={}, account={}",
396            log_label, request_id, account_contract
397        );
398
399        match self
400            .submit_signed_directive(request_id, directive, encoder, log_label)
401            .await
402        {
403            Ok(submission) => {
404                let tx_hash = Some(submission.hash.clone());
405                let status = if submission.confirmed {
406                    TransactionStatus::Confirmed
407                } else {
408                    TransactionStatus::Pending
409                };
410                self.persist_outbox_update(request_id, status, tx_hash.clone(), None)
411                    .await;
412                self.send_update(request_id, status, tx_hash, None).await;
413                if !submission.confirmed {
414                    self.spawn_pending_reconciliation(
415                        request_id.to_string(),
416                        submission.submitter,
417                        submission.nonce,
418                        submission.all_hashes,
419                    );
420                }
421                info!(
422                    "{} directive submitted: {} confirmed={}",
423                    log_label, request_id, submission.confirmed
424                );
425            }
426            Err(RelayDirectiveError::Build(error)) => {
427                error!(
428                    "{} directive relay payload build failed: {} - {}; emitting terminal failure",
429                    log_label, request_id, error
430                );
431                self.persist_outbox_update(
432                    request_id,
433                    TransactionStatus::Failed,
434                    None,
435                    Some(error.clone()),
436                )
437                .await;
438                self.send_update(request_id, TransactionStatus::Failed, None, Some(error))
439                    .await;
440            }
441            Err(RelayDirectiveError::Relay(error)) => {
442                error!(
443                    "{} directive relay submission failed: {} - {}; outbox row remains retryable",
444                    log_label, request_id, error
445                );
446                self.persist_outbox_update(
447                    request_id,
448                    TransactionStatus::Submitted,
449                    None,
450                    Some(error.clone()),
451                )
452                .await;
453                self.send_update(request_id, TransactionStatus::Submitted, None, Some(error))
454                    .await;
455            }
456            Err(RelayDirectiveError::Terminal(error)) => {
457                error!(
458                    "{} directive relay terminal failure: {} - {}; emitting terminal failure",
459                    log_label, request_id, error
460                );
461                self.persist_outbox_update(
462                    request_id,
463                    TransactionStatus::Failed,
464                    None,
465                    Some(error.clone()),
466                )
467                .await;
468                self.send_update(request_id, TransactionStatus::Failed, None, Some(error))
469                    .await;
470            }
471        }
472    }
473
474    /// Build a submitter-owned contract call from SignedDirectiveTx and submit it.
475    async fn submit_signed_directive(
476        &self,
477        request_id: &str,
478        signed_directive: &SignedDirectiveTx,
479        encoder: DirectiveEncoder,
480        log_label: &str,
481    ) -> Result<SubmittedTransaction, RelayDirectiveError> {
482        info!(
483            "Building {} directive transaction payload: request_id={}",
484            log_label, request_id
485        );
486
487        let directive = Bytes::from(signed_directive.directive.clone());
488        let signature = Bytes::from_str(&signed_directive.signature)
489            .map_err(|e| RelayDirectiveError::Build(format!("invalid signature bytes: {}", e)))?;
490        let contract_call = encoder(
491            self.exchange_address,
492            directive,
493            signature,
494            Some(request_id.to_string()),
495        )
496        .map_err(|e| RelayDirectiveError::Build(e.to_string()))?;
497
498        let calldata_hex = hex::encode(contract_call.data.as_ref());
499        error!(
500            request_id,
501            log_label,
502            exchange_address = %self.exchange_address,
503            tx_to = %contract_call.to,
504            tx_value = %contract_call.value,
505            calldata = %format!("0x{calldata_hex}"),
506            directive = %format!("0x{}", hex::encode(&signed_directive.directive)),
507            signature = %signed_directive.signature,
508            "prepared directive transaction payload"
509        );
510
511        self.submit_contract_call(request_id, contract_call)
512            .await
513            .map_err(|e| {
514                let error = e.to_string();
515                if is_terminal_relay_error(&error) {
516                    RelayDirectiveError::Terminal(error)
517                } else {
518                    RelayDirectiveError::Relay(error)
519                }
520            })
521    }
522
523    async fn submit_contract_call(
524        &self,
525        request_id: &str,
526        contract_call: ContractCall,
527    ) -> EyreResult<SubmittedTransaction> {
528        let _nonce_guard = self.nonce_lock.lock().await;
529        let submitter = self.tx_relayer.submitter_address();
530        let db_nonce_floor = match self.submitter_store.as_ref() {
531            Some(store) => store
532                .max_nonce_for_submitter(&submitter)
533                .await
534                .map_err(|error| eyre::eyre!("failed to load submitter nonce floor: {error}"))?
535                .map(|nonce| nonce.saturating_add(1)),
536            None => None,
537        };
538        let nonce = self.tx_relayer.select_next_nonce(db_nonce_floor).await?;
539        self.persist_created_submitter_submission(submitter, nonce)
540            .await?;
541        self.persist_directive_submitter_pointer(request_id, submitter, nonce)
542            .await?;
543
544        let recorder: Arc<dyn SignedAttemptRecorder> = Arc::new(SubmitterAttemptRecorder {
545            store: self.submitter_store.as_ref().map(Arc::clone),
546        });
547        let submission = match self
548            .tx_relayer
549            .send_transaction_with_nonce_recording_attempts(contract_call, nonce, recorder)
550            .await
551        {
552            Ok(submission) => submission,
553            Err(error) => {
554                if let Some(submitter_store) = self.submitter_store.as_ref() {
555                    Self::persist_submitter_status_with_db(
556                        Arc::clone(submitter_store),
557                        submitter,
558                        nonce,
559                        SubmissionStatus::Failed,
560                        None,
561                        Some(error.to_string()),
562                    )
563                    .await;
564                }
565                return Err(error);
566            }
567        };
568        let status = if submission.confirmed {
569            SubmissionStatus::Confirmed
570        } else {
571            SubmissionStatus::Broadcasted
572        };
573        if let Some(submitter_store) = self.submitter_store.as_ref() {
574            Self::persist_submitter_status_with_db(
575                Arc::clone(submitter_store),
576                submission.submitter,
577                submission.nonce,
578                status,
579                Some(submission.hash.clone()),
580                None,
581            )
582            .await;
583        }
584
585        info!("Transaction submitted: {:?}", submission);
586        Ok(submission)
587    }
588
589    /// Send transaction status update
590    async fn send_update(
591        &self,
592        request_id: &str,
593        status: TransactionStatus,
594        tx_hash: Option<String>,
595        error: Option<String>,
596    ) {
597        let update = TransactionUpdate {
598            request_id: request_id.to_string(),
599            status,
600            tx_hash,
601            error,
602            timestamp: timestamp_millis(),
603            gas_used: None,
604            gas_price: None,
605        };
606
607        debug!("Sending transaction update: {:?}", update);
608
609        if let Err(e) = self
610            .event_bus
611            .get_sender()
612            .send(EngineMessage::TransactionUpdate(update))
613        {
614            error!("Failed to send transaction update: {}", e);
615        }
616    }
617
618    /// Persist outbox delivery status directly through the DB writer interface.
619    async fn persist_outbox_update(
620        &self,
621        request_id: &str,
622        status: TransactionStatus,
623        tx_hash: Option<String>,
624        error_msg: Option<String>,
625    ) {
626        let Some(db) = self.db.as_ref() else {
627            return;
628        };
629        Self::persist_outbox_update_with_db(
630            Arc::clone(db),
631            request_id.to_string(),
632            status,
633            tx_hash,
634            error_msg,
635        )
636        .await;
637    }
638
639    async fn persist_outbox_update_with_db(
640        db: DirectiveOutboxDb,
641        request_id: String,
642        status: TransactionStatus,
643        tx_hash: Option<String>,
644        error_msg: Option<String>,
645    ) {
646        if let Err(e) = db
647            .persist_directive_transaction_update(
648                &request_id,
649                status,
650                tx_hash.as_deref(),
651                error_msg.as_deref(),
652            )
653            .await
654        {
655            error!("Failed to persist outbox update for {}: {}", request_id, e);
656        }
657    }
658
659    async fn persist_created_submitter_submission(
660        &self,
661        submitter: SubmitterId,
662        nonce: SubmittedNonce,
663    ) -> EyreResult<()> {
664        let Some(submitter_store) = self.submitter_store.as_ref() else {
665            return Ok(());
666        };
667        let record = SubmissionRecord {
668            submitter,
669            nonce,
670            status: SubmissionStatus::Created,
671            primary_tx_hash: None,
672            terminal_error: None,
673        };
674        Self::persist_submitter_submission_with_db(Arc::clone(submitter_store), record, Vec::new())
675            .await
676    }
677
678    async fn persist_submitter_submission_with_db(
679        submitter_store: SubmitterStoreDb,
680        record: SubmissionRecord,
681        attempts: Vec<SubmissionAttemptRecord>,
682    ) -> EyreResult<()> {
683        submitter_store
684            .record_submission(&record, &attempts)
685            .await
686            .map_err(|e| {
687                eyre::eyre!(
688                    "failed to persist submitter submission {}:{}: {}",
689                    record.submitter,
690                    record.nonce,
691                    e
692                )
693            })
694    }
695
696    async fn persist_submitter_status_with_db(
697        submitter_store: SubmitterStoreDb,
698        submitter: SubmitterId,
699        nonce: SubmittedNonce,
700        status: SubmissionStatus,
701        primary_tx_hash: Option<String>,
702        terminal_error: Option<String>,
703    ) {
704        if let Err(e) = submitter_store
705            .update_submission_status(
706                &submitter,
707                nonce,
708                status,
709                primary_tx_hash.as_deref(),
710                terminal_error.as_deref(),
711            )
712            .await
713        {
714            error!(
715                "Failed to update submitter submission {}:{}: {}",
716                submitter, nonce, e
717            );
718        }
719    }
720
721    async fn persist_directive_submitter_pointer(
722        &self,
723        request_id: &str,
724        submitter: SubmitterId,
725        nonce: SubmittedNonce,
726    ) -> EyreResult<()> {
727        let Some(db) = self.db.as_ref() else {
728            return Ok(());
729        };
730        Self::persist_directive_submitter_pointer_with_db(
731            Arc::clone(db),
732            request_id.to_string(),
733            submitter,
734            nonce,
735        )
736        .await
737    }
738
739    async fn persist_directive_submitter_pointer_with_db(
740        db: DirectiveOutboxDb,
741        request_id: String,
742        submitter: SubmitterId,
743        nonce: SubmittedNonce,
744    ) -> EyreResult<()> {
745        db.record_directive_submitter_submission(&request_id, &submitter, nonce)
746            .await
747            .map_err(|e| {
748                eyre::eyre!(
749                    "failed to persist directive submitter pointer for {}: {}",
750                    request_id,
751                    e
752                )
753            })
754    }
755
756    fn spawn_pending_reconciliation(
757        &self,
758        request_id: String,
759        submitter: SubmitterId,
760        nonce: SubmittedNonce,
761        tx_hashes: Vec<String>,
762    ) {
763        let sender = self.event_bus.get_sender();
764        let tx_relayer = Arc::clone(&self.tx_relayer);
765        let db = self.db.clone();
766        let submitter_store = self.submitter_store.clone();
767
768        tokio::spawn(async move {
769            let primary_tx_hash = tx_hashes
770                .last()
771                .cloned()
772                .unwrap_or_else(|| "unknown".to_string());
773            loop {
774                match tx_relayer.reconcile_pending_transactions(&tx_hashes).await {
775                    Ok(PendingTransactionStatus::Unsupported) => {}
776                    Ok(PendingTransactionStatus::Pending(reason)) => {
777                        send_update_via_sender(
778                            &sender,
779                            request_id.clone(),
780                            TransactionStatus::Pending,
781                            Some(primary_tx_hash.clone()),
782                            Some(reason),
783                        );
784                        tokio::time::sleep(PENDING_RECONCILIATION_RETRY_DELAY).await;
785                        continue;
786                    }
787                    Ok(PendingTransactionStatus::Confirmed(confirmed_tx_hash)) => {
788                        if let Some(submitter_store) = submitter_store.as_ref() {
789                            Self::persist_submitter_status_with_db(
790                                Arc::clone(submitter_store),
791                                submitter,
792                                nonce,
793                                SubmissionStatus::Confirmed,
794                                Some(confirmed_tx_hash.clone()),
795                                None,
796                            )
797                            .await;
798                        }
799                        if let Some(db) = db.as_ref() {
800                            Self::persist_outbox_update_with_db(
801                                Arc::clone(db),
802                                request_id.clone(),
803                                TransactionStatus::Confirmed,
804                                Some(confirmed_tx_hash.clone()),
805                                None,
806                            )
807                            .await;
808                        }
809                        send_update_via_sender(
810                            &sender,
811                            request_id.clone(),
812                            TransactionStatus::Confirmed,
813                            Some(confirmed_tx_hash),
814                            None,
815                        );
816                    }
817                    Ok(PendingTransactionStatus::Failed(error)) => {
818                        if let Some(submitter_store) = submitter_store.as_ref() {
819                            Self::persist_submitter_status_with_db(
820                                Arc::clone(submitter_store),
821                                submitter,
822                                nonce,
823                                SubmissionStatus::Failed,
824                                Some(primary_tx_hash.clone()),
825                                Some(error.clone()),
826                            )
827                            .await;
828                        }
829                        if let Some(db) = db.as_ref() {
830                            Self::persist_outbox_update_with_db(
831                                Arc::clone(db),
832                                request_id.clone(),
833                                TransactionStatus::Failed,
834                                Some(primary_tx_hash.clone()),
835                                Some(error.clone()),
836                            )
837                            .await;
838                        }
839                        send_update_via_sender(
840                            &sender,
841                            request_id.clone(),
842                            TransactionStatus::Failed,
843                            Some(primary_tx_hash.clone()),
844                            Some(error),
845                        );
846                    }
847                    Err(error) => {
848                        send_update_via_sender(
849                            &sender,
850                            request_id.clone(),
851                            TransactionStatus::Pending,
852                            Some(primary_tx_hash.clone()),
853                            Some(format!(
854                                "pending transaction reconciliation failed: {:?}",
855                                error
856                            )),
857                        );
858                        tokio::time::sleep(PENDING_RECONCILIATION_RETRY_DELAY).await;
859                        continue;
860                    }
861                }
862                break;
863            }
864        });
865    }
866
867    fn spawn_pending_submitter_reconciliation(
868        &self,
869        submitter: SubmitterId,
870        nonce: SubmittedNonce,
871        tx_hashes: Vec<String>,
872    ) {
873        let tx_relayer = Arc::clone(&self.tx_relayer);
874        let submitter_store = self.submitter_store.clone();
875
876        tokio::spawn(async move {
877            loop {
878                match tx_relayer.reconcile_pending_transactions(&tx_hashes).await {
879                    Ok(PendingTransactionStatus::Unsupported) => {}
880                    Ok(PendingTransactionStatus::Pending(reason)) => {
881                        tracing::debug!(
882                            %submitter,
883                            %nonce,
884                            %reason,
885                            "submitter submission still pending"
886                        );
887                        tokio::time::sleep(PENDING_RECONCILIATION_RETRY_DELAY).await;
888                        continue;
889                    }
890                    Ok(PendingTransactionStatus::Confirmed(confirmed_tx_hash)) => {
891                        if let Some(submitter_store) = submitter_store.as_ref() {
892                            Self::persist_submitter_status_with_db(
893                                Arc::clone(submitter_store),
894                                submitter,
895                                nonce,
896                                SubmissionStatus::Confirmed,
897                                Some(confirmed_tx_hash),
898                                None,
899                            )
900                            .await;
901                        }
902                    }
903                    Ok(PendingTransactionStatus::Failed(error)) => {
904                        if let Some(submitter_store) = submitter_store.as_ref() {
905                            Self::persist_submitter_status_with_db(
906                                Arc::clone(submitter_store),
907                                submitter,
908                                nonce,
909                                SubmissionStatus::Failed,
910                                None,
911                                Some(error),
912                            )
913                            .await;
914                        }
915                    }
916                    Err(error) => {
917                        tracing::warn!(
918                            %submitter,
919                            %nonce,
920                            ?error,
921                            "submitter pending reconciliation failed"
922                        );
923                        tokio::time::sleep(PENDING_RECONCILIATION_RETRY_DELAY).await;
924                        continue;
925                    }
926                }
927                break;
928            }
929        });
930    }
931}
932
933fn send_update_via_sender(
934    sender: &mpsc::UnboundedSender<EngineMessage>,
935    request_id: String,
936    status: TransactionStatus,
937    tx_hash: Option<String>,
938    error: Option<String>,
939) {
940    let update = TransactionUpdate {
941        request_id,
942        status,
943        tx_hash,
944        error,
945        timestamp: timestamp_millis(),
946        gas_used: None,
947        gas_price: None,
948    };
949
950    debug!("Sending transaction update: {:?}", update);
951
952    if let Err(send_error) = sender.send(EngineMessage::TransactionUpdate(update)) {
953        error!("Failed to send transaction update: {}", send_error);
954    }
955}
956
/// Returns `true` when `error` contains one of the known "transaction reverted"
/// markers for the direct or AWS KMS submitter.
///
/// Matching is a case-sensitive substring search.
fn is_terminal_relay_error(error: &str) -> bool {
    const REVERT_MARKERS: [&str; 2] = [
        "direct submitter transaction reverted",
        "aws_kms submitter transaction reverted",
    ];

    REVERT_MARKERS.iter().any(|&marker| error.contains(marker))
}
961
/// Returns the current wall-clock time as milliseconds since the Unix epoch.
///
/// # Panics
///
/// Panics if the system clock reports a time before the Unix epoch.
fn timestamp_millis() -> u64 {
    let since_epoch = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .expect("system clock must not be before Unix epoch");

    // `as_millis` yields a `u128`; clamp before narrowing so the cast can never
    // silently wrap.
    let clamped = since_epoch.as_millis().min(u128::from(u64::MAX));
    clamped as u64
}
968
969#[cfg(test)]
970mod tests {
971    use super::*;
972    use alloy::primitives::{keccak256, Address, TxHash, U256};
973    use alloy::sol_types::SolCall;
974    use async_trait::async_trait;
975    use hypercall_transaction_submitter_db::{
976        PendingSubmissionRow, SubmissionDetailRow, TransactionSubmitterReader,
977    };
978    use hypercall_types::topics::TOPIC_TRANSACTION_UPDATES;
979    use hypercall_types::TransactionType;
980    use hypercall_types::WalletAddress;
981    use std::collections::HashMap;
982    use std::str::FromStr;
983    use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
984    use tokio::sync::{Mutex, Notify};
985    use tokio::time::{timeout, Duration};
986
987    const ACCOUNT: &str = "0x1111111111111111111111111111111111111111";
988
989    enum RelayerBehavior {
990        Success,
991        Pending,
992        Fail,
993        Reverted,
994    }
995
996    #[derive(Clone)]
997    #[allow(dead_code)]
998    enum PendingReconciliationBehavior {
999        Unsupported,
1000        StillPending(&'static str),
1001        Confirmed,
1002        Failed(&'static str),
1003    }
1004
1005    type SharedRequests = Arc<Mutex<Vec<ContractCall>>>;
1006
1007    struct MockEventBus {
1008        sender: mpsc::UnboundedSender<EngineMessage>,
1009        receiver: Mutex<Option<mpsc::UnboundedReceiver<EngineMessage>>>,
1010        subscribers: Mutex<Vec<(Vec<String>, mpsc::UnboundedSender<EngineMessage>)>>,
1011    }
1012
1013    impl MockEventBus {
1014        fn new() -> Self {
1015            let (sender, receiver) = mpsc::unbounded_channel();
1016            Self {
1017                sender,
1018                receiver: Mutex::new(Some(receiver)),
1019                subscribers: Mutex::new(Vec::new()),
1020            }
1021        }
1022
1023        async fn start_processing(self: Arc<Self>) {
1024            let Some(mut receiver) = self.receiver.lock().await.take() else {
1025                return;
1026            };
1027            tokio::spawn(async move {
1028                while let Some(message) = receiver.recv().await {
1029                    let topic = match &message {
1030                        EngineMessage::TransactionRequest(_) => TOPIC_TRANSACTION_REQUESTS,
1031                        EngineMessage::TransactionUpdate(_) => TOPIC_TRANSACTION_UPDATES,
1032                        _ => continue,
1033                    };
1034                    let subscribers = self.subscribers.lock().await;
1035                    for (topics, sender) in subscribers.iter() {
1036                        if topics.iter().any(|candidate| candidate == topic) {
1037                            let _ = sender.send(message.clone());
1038                        }
1039                    }
1040                }
1041            });
1042        }
1043    }
1044
1045    #[async_trait]
1046    impl EventBusTrait for MockEventBus {
1047        fn get_sender(&self) -> mpsc::UnboundedSender<EngineMessage> {
1048            self.sender.clone()
1049        }
1050
1051        async fn subscribe(
1052            &self,
1053            topics: Vec<String>,
1054        ) -> Result<mpsc::UnboundedReceiver<EngineMessage>, String> {
1055            let (sender, receiver) = mpsc::unbounded_channel();
1056            self.subscribers.lock().await.push((topics, sender));
1057            Ok(receiver)
1058        }
1059    }
1060
1061    struct CapturingTxRelayer {
1062        behavior: RelayerBehavior,
1063        pending_reconciliation: PendingReconciliationBehavior,
1064        requests: SharedRequests,
1065    }
1066
1067    impl CapturingTxRelayer {
1068        fn with_shared_capture(
1069            behavior: RelayerBehavior,
1070            pending_reconciliation: PendingReconciliationBehavior,
1071            requests: SharedRequests,
1072        ) -> Self {
1073            Self {
1074                behavior,
1075                pending_reconciliation,
1076                requests,
1077            }
1078        }
1079    }
1080
1081    #[async_trait]
1082    impl TxRelayerTrait for CapturingTxRelayer {
1083        fn submitter_address(&self) -> SubmitterId {
1084            Address::repeat_byte(0x42)
1085        }
1086
1087        async fn send_transaction(
1088            &self,
1089            request: ContractCall,
1090            db_nonce_floor: Option<u64>,
1091        ) -> EyreResult<SubmittedTransaction> {
1092            self.requests.lock().await.push(request);
1093            let nonce = db_nonce_floor.unwrap_or(7);
1094
1095            match self.behavior {
1096                RelayerBehavior::Success => Ok(SubmittedTransaction::new(
1097                    self.submitter_address(),
1098                    nonce,
1099                    TxHash::from_str(
1100                        "0xaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa",
1101                    )
1102                    .expect("valid tx hash")
1103                    .to_string(),
1104                    true,
1105                )),
1106                RelayerBehavior::Pending => Ok(SubmittedTransaction::new(
1107                    self.submitter_address(),
1108                    nonce,
1109                    TxHash::from_str(
1110                        "0xbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb",
1111                    )
1112                    .expect("valid tx hash")
1113                    .to_string(),
1114                    false,
1115                )),
1116                RelayerBehavior::Fail => Err(eyre::eyre!("forced failure")),
1117                RelayerBehavior::Reverted => Err(eyre::eyre!(
1118                    "direct submitter transaction reverted: hash=0xdddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddd"
1119                )),
1120            }
1121        }
1122
1123        async fn reconcile_pending_transaction(
1124            &self,
1125            _tx_hash: &str,
1126        ) -> EyreResult<PendingTransactionStatus> {
1127            Ok(match self.pending_reconciliation {
1128                PendingReconciliationBehavior::Unsupported => PendingTransactionStatus::Unsupported,
1129                PendingReconciliationBehavior::StillPending(reason) => {
1130                    PendingTransactionStatus::Pending(reason.to_string())
1131                }
1132                PendingReconciliationBehavior::Confirmed => {
1133                    PendingTransactionStatus::Confirmed(_tx_hash.to_string())
1134                }
1135                PendingReconciliationBehavior::Failed(error) => {
1136                    PendingTransactionStatus::Failed(error.to_string())
1137                }
1138            })
1139        }
1140    }
1141
1142    async fn take_requests(shared_requests: &SharedRequests) -> Vec<ContractCall> {
1143        shared_requests.lock().await.clone()
1144    }
1145
1146    struct BlockingPendingRelayer {
1147        started: Arc<Notify>,
1148        release: Arc<Notify>,
1149        requests: SharedRequests,
1150    }
1151
1152    #[async_trait]
1153    impl TxRelayerTrait for BlockingPendingRelayer {
1154        fn submitter_address(&self) -> SubmitterId {
1155            Address::repeat_byte(0x43)
1156        }
1157
1158        async fn send_transaction(
1159            &self,
1160            request: ContractCall,
1161            db_nonce_floor: Option<u64>,
1162        ) -> EyreResult<SubmittedTransaction> {
1163            self.requests.lock().await.push(request);
1164            Ok(SubmittedTransaction::new(
1165                self.submitter_address(),
1166                db_nonce_floor.unwrap_or(8),
1167                TxHash::from_str(
1168                    "0xcccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccc",
1169                )
1170                .expect("valid tx hash")
1171                .to_string(),
1172                false,
1173            ))
1174        }
1175
1176        async fn reconcile_pending_transaction(
1177            &self,
1178            _tx_hash: &str,
1179        ) -> EyreResult<PendingTransactionStatus> {
1180            self.started.notify_one();
1181            self.release.notified().await;
1182            Ok(PendingTransactionStatus::Confirmed(_tx_hash.to_string()))
1183        }
1184    }
1185
1186    struct ReplacementPendingRelayer {
1187        requests: SharedRequests,
1188        expected_attempts: Vec<String>,
1189        confirmed_hash: String,
1190    }
1191
1192    #[async_trait]
1193    impl TxRelayerTrait for ReplacementPendingRelayer {
1194        fn submitter_address(&self) -> SubmitterId {
1195            Address::repeat_byte(0x44)
1196        }
1197
1198        async fn send_transaction(
1199            &self,
1200            request: ContractCall,
1201            db_nonce_floor: Option<u64>,
1202        ) -> EyreResult<SubmittedTransaction> {
1203            self.requests.lock().await.push(request);
1204            Ok(SubmittedTransaction::with_hashes(
1205                self.submitter_address(),
1206                db_nonce_floor.unwrap_or(9),
1207                self.expected_attempts
1208                    .last()
1209                    .expect("test attempts should not be empty")
1210                    .clone(),
1211                self.expected_attempts.clone(),
1212                false,
1213            ))
1214        }
1215
1216        async fn reconcile_pending_transactions(
1217            &self,
1218            tx_hashes: &[String],
1219        ) -> EyreResult<PendingTransactionStatus> {
1220            assert_eq!(tx_hashes, self.expected_attempts.as_slice());
1221            Ok(PendingTransactionStatus::Confirmed(
1222                self.confirmed_hash.clone(),
1223            ))
1224        }
1225    }
1226
1227    struct BlockingFirstSendRelayer {
1228        select_count: AtomicU64,
1229        send_count: AtomicU64,
1230        first_send_started: Notify,
1231        second_select_started: Notify,
1232        release_first_send: Notify,
1233    }
1234
1235    impl BlockingFirstSendRelayer {
1236        fn new() -> Self {
1237            Self {
1238                select_count: AtomicU64::new(0),
1239                send_count: AtomicU64::new(0),
1240                first_send_started: Notify::new(),
1241                second_select_started: Notify::new(),
1242                release_first_send: Notify::new(),
1243            }
1244        }
1245    }
1246
1247    struct RecorderOrderingRelayer {
1248        recorded_before_send: Arc<AtomicBool>,
1249    }
1250
1251    #[async_trait]
1252    impl TxRelayerTrait for RecorderOrderingRelayer {
1253        fn submitter_address(&self) -> SubmitterId {
1254            Address::repeat_byte(0x46)
1255        }
1256
1257        async fn send_transaction_with_nonce_recording_attempts(
1258            &self,
1259            _request: ContractCall,
1260            nonce: SubmittedNonce,
1261            recorder: Arc<dyn SignedAttemptRecorder>,
1262        ) -> EyreResult<SubmittedTransaction> {
1263            let tx_hash = TxHash::from([0x46; 32]).to_string();
1264            recorder
1265                .record_signed_attempt(SignedTransactionAttempt {
1266                    submitter: self.submitter_address(),
1267                    nonce,
1268                    tx_hash: tx_hash.clone(),
1269                    raw_tx: b"raw-signed-test-tx".to_vec(),
1270                })
1271                .await?;
1272            self.recorded_before_send.store(true, Ordering::SeqCst);
1273            Ok(SubmittedTransaction::new(
1274                self.submitter_address(),
1275                nonce,
1276                tx_hash,
1277                false,
1278            ))
1279        }
1280    }
1281
1282    #[derive(Default)]
1283    struct InMemorySubmitterStore {
1284        rows: Mutex<HashMap<(SubmitterId, SubmittedNonce), SubmissionDetailRow>>,
1285    }
1286
1287    #[async_trait]
1288    impl TransactionSubmitterReader for InMemorySubmitterStore {
1289        async fn get_submission_by_nonce(
1290            &self,
1291            submitter: &SubmitterId,
1292            nonce: SubmittedNonce,
1293        ) -> anyhow::Result<Option<SubmissionDetailRow>> {
1294            Ok(self.rows.lock().await.get(&(*submitter, nonce)).cloned())
1295        }
1296    }
1297
1298    #[async_trait]
1299    impl AsyncTransactionSubmitterStore for InMemorySubmitterStore {
1300        async fn max_nonce_for_submitter(
1301            &self,
1302            submitter: &SubmitterId,
1303        ) -> anyhow::Result<Option<u64>> {
1304            Ok(self
1305                .rows
1306                .lock()
1307                .await
1308                .iter()
1309                .filter(|((row_submitter, _), row)| {
1310                    row_submitter == submitter && row.primary_tx_hash.is_some()
1311                })
1312                .map(|((_, nonce), _)| *nonce)
1313                .max())
1314        }
1315
1316        async fn record_submission(
1317            &self,
1318            record: &SubmissionRecord,
1319            attempts: &[SubmissionAttemptRecord],
1320        ) -> anyhow::Result<()> {
1321            let mut rows = self.rows.lock().await;
1322            let row = rows
1323                .entry((record.submitter, record.nonce))
1324                .or_insert_with(|| SubmissionDetailRow {
1325                    submitter: record.submitter,
1326                    nonce: record.nonce,
1327                    status: record.status,
1328                    primary_tx_hash: record.primary_tx_hash.clone(),
1329                    terminal_error: record.terminal_error.clone(),
1330                    attempts: Vec::new(),
1331                });
1332            row.status = record.status;
1333            if record.primary_tx_hash.is_some() {
1334                row.primary_tx_hash = record.primary_tx_hash.clone();
1335            }
1336            row.terminal_error = record.terminal_error.clone();
1337            for attempt in attempts {
1338                if !row
1339                    .attempts
1340                    .iter()
1341                    .any(|existing| existing.tx_hash == attempt.tx_hash)
1342                {
1343                    row.attempts.push(attempt.clone());
1344                }
1345            }
1346            Ok(())
1347        }
1348
1349        async fn update_submission_status(
1350            &self,
1351            submitter: &SubmitterId,
1352            nonce: SubmittedNonce,
1353            status: SubmissionStatus,
1354            primary_tx_hash: Option<&str>,
1355            terminal_error: Option<&str>,
1356        ) -> anyhow::Result<()> {
1357            let mut rows = self.rows.lock().await;
1358            let row = rows
1359                .get_mut(&(*submitter, nonce))
1360                .ok_or_else(|| anyhow::anyhow!("missing submission"))?;
1361            row.status = status;
1362            if let Some(primary_tx_hash) = primary_tx_hash {
1363                row.primary_tx_hash = Some(primary_tx_hash.to_string());
1364            }
1365            row.terminal_error = terminal_error.map(str::to_string);
1366            Ok(())
1367        }
1368
1369        async fn list_pending_submissions(
1370            &self,
1371            _after_submission_id: i64,
1372            _limit: i64,
1373        ) -> anyhow::Result<Vec<PendingSubmissionRow>> {
1374            Ok(Vec::new())
1375        }
1376    }
1377
1378    #[async_trait]
1379    impl TxRelayerTrait for BlockingFirstSendRelayer {
1380        fn submitter_address(&self) -> SubmitterId {
1381            Address::repeat_byte(0x45)
1382        }
1383
1384        async fn select_next_nonce(
1385            &self,
1386            db_nonce_floor: Option<u64>,
1387        ) -> EyreResult<SubmittedNonce> {
1388            let select_count = self.select_count.fetch_add(1, Ordering::SeqCst) + 1;
1389            if select_count == 2 {
1390                self.second_select_started.notify_one();
1391            }
1392            Ok(db_nonce_floor.unwrap_or(21))
1393        }
1394
1395        async fn send_transaction_with_nonce(
1396            &self,
1397            _request: ContractCall,
1398            nonce: SubmittedNonce,
1399        ) -> EyreResult<SubmittedTransaction> {
1400            let send_count = self.send_count.fetch_add(1, Ordering::SeqCst) + 1;
1401            if send_count == 1 {
1402                self.first_send_started.notify_one();
1403                self.release_first_send.notified().await;
1404                return Err(eyre::eyre!("forced pre-broadcast failure"));
1405            }
1406
1407            Ok(SubmittedTransaction::new(
1408                self.submitter_address(),
1409                nonce,
1410                TxHash::from([0x45; 32]).to_string(),
1411                true,
1412            ))
1413        }
1414    }
1415
1416    fn rsm_directive_bytes() -> Vec<u8> {
1417        vec![0x11, 0x22, 0x33]
1418    }
1419
1420    fn build_rsm_tx_request(
1421        request_id: &str,
1422        signature: String,
1423        expires_at: u64,
1424    ) -> TransactionRequest {
1425        let account = WalletAddress::from_str(ACCOUNT).expect("valid account");
1426        TransactionRequest {
1427            request_id: request_id.to_string(),
1428            wallet_address: account,
1429            account_contract: account,
1430            transaction_type: TransactionType::RsmDirective(SignedDirectiveTx {
1431                directive: rsm_directive_bytes(),
1432                signature,
1433            }),
1434            timestamp: timestamp_millis(),
1435            expires_at,
1436        }
1437    }
1438
1439    fn contract_call_for_test(external_id: &str) -> ContractCall {
1440        ContractCall {
1441            to: test_exchange_address(),
1442            value: U256::ZERO,
1443            data: Bytes::from(vec![0x12, 0x34]),
1444            external_id: Some(external_id.to_string()),
1445        }
1446    }
1447
1448    async fn recv_tx_update(
1449        rx: &mut tokio::sync::mpsc::UnboundedReceiver<EngineMessage>,
1450    ) -> TransactionUpdate {
1451        let message = timeout(Duration::from_secs(1), rx.recv())
1452            .await
1453            .expect("should receive tx update within timeout")
1454            .expect("channel should remain open");
1455        match message {
1456            EngineMessage::TransactionUpdate(update) => update,
1457            _ => panic!("expected TransactionUpdate"),
1458        }
1459    }
1460
1461    fn valid_signature_hex() -> String {
1462        format!("0x{}", hex::encode([0xAB; 65]))
1463    }
1464
1465    fn valid_signature_bytes() -> Vec<u8> {
1466        vec![0xAB; 65]
1467    }
1468
1469    fn test_exchange_address() -> Address {
1470        Address::from_str("0x1d70Ff185F6C25E4d76163F16563D63d5b590Cbc")
1471            .expect("valid exchange address")
1472    }
1473
1474    #[tokio::test]
1475    async fn transaction_submitter_rsm_expired_emits_expired_only() {
1476        let event_bus = Arc::new(MockEventBus::new());
1477        event_bus.clone().start_processing().await;
1478        let mut updates_rx = event_bus
1479            .subscribe(vec![TOPIC_TRANSACTION_UPDATES.to_string()])
1480            .await
1481            .expect("subscribe should succeed");
1482
1483        let shared_requests: SharedRequests = Arc::new(Mutex::new(Vec::new()));
1484        let submitter = TransactionSubmitter {
1485            event_bus: event_bus.clone(),
1486            tx_relayer: Arc::new(CapturingTxRelayer::with_shared_capture(
1487                RelayerBehavior::Success,
1488                PendingReconciliationBehavior::Unsupported,
1489                shared_requests.clone(),
1490            )),
1491            exchange_address: test_exchange_address(),
1492            db: None,
1493            submitter_store: None,
1494            nonce_lock: Arc::new(Mutex::new(())),
1495        };
1496
1497        let now = timestamp_millis();
1498        let tx_req =
1499            build_rsm_tx_request("expired-rsm", valid_signature_hex(), now.saturating_sub(1));
1500        submitter.process_transaction_request(tx_req).await;
1501
1502        let first = recv_tx_update(&mut updates_rx).await;
1503        assert_eq!(first.request_id, "expired-rsm");
1504        assert_eq!(first.status, TransactionStatus::Expired);
1505        assert_eq!(
1506            first.error.as_deref(),
1507            Some("Transaction request expired"),
1508            "expired message should be explicit"
1509        );
1510        assert!(
1511            timeout(Duration::from_millis(100), updates_rx.recv())
1512                .await
1513                .is_err(),
1514            "expired path should emit exactly one update"
1515        );
1516
1517        let requests = take_requests(&shared_requests).await;
1518        assert!(
1519            requests.is_empty(),
1520            "expired requests must not be sent to relayer"
1521        );
1522    }
1523
1524    #[tokio::test]
1525    async fn transaction_submitter_rsm_success_emits_confirmed_after_relay() {
1526        let event_bus = Arc::new(MockEventBus::new());
1527        event_bus.clone().start_processing().await;
1528        let mut updates_rx = event_bus
1529            .subscribe(vec![TOPIC_TRANSACTION_UPDATES.to_string()])
1530            .await
1531            .expect("subscribe should succeed");
1532
1533        let shared_requests: SharedRequests = Arc::new(Mutex::new(Vec::new()));
1534        let submitter = TransactionSubmitter {
1535            event_bus: event_bus.clone(),
1536            tx_relayer: Arc::new(CapturingTxRelayer::with_shared_capture(
1537                RelayerBehavior::Success,
1538                PendingReconciliationBehavior::Unsupported,
1539                shared_requests.clone(),
1540            )),
1541            exchange_address: test_exchange_address(),
1542            db: None,
1543            submitter_store: None,
1544            nonce_lock: Arc::new(Mutex::new(())),
1545        };
1546
1547        let now = timestamp_millis();
1548        let expected_directive = rsm_directive_bytes();
1549        let expected_signature_bytes = valid_signature_bytes();
1550        let tx_req = build_rsm_tx_request("success-rsm", valid_signature_hex(), now + 60_000);
1551        submitter.process_transaction_request(tx_req).await;
1552
1553        let confirmed = recv_tx_update(&mut updates_rx).await;
1554        assert_eq!(confirmed.request_id, "success-rsm");
1555        assert_eq!(confirmed.status, TransactionStatus::Confirmed);
1556        assert_eq!(
1557            confirmed.tx_hash.as_deref(),
1558            Some("0xaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa")
1559        );
1560
1561        let requests = take_requests(&shared_requests).await;
1562        assert_eq!(
1563            requests.len(),
1564            1,
1565            "exactly one relay request should be sent"
1566        );
1567        assert_eq!(requests[0].external_id.as_deref(), Some("success-rsm"));
1568
1569        let to: Address = requests[0].to.into();
1570        assert_eq!(to, test_exchange_address());
1571
1572        let calldata: Bytes = requests[0].data.clone().into();
1573        let expected_selector = keccak256("executeRsmDirective(bytes,bytes)".as_bytes());
1574        assert_eq!(&calldata[..4], &expected_selector[..4]);
1575
1576        let decoded_call = super::exchange_encoder::Exchange::executeRsmDirectiveCall::abi_decode(
1577            calldata.as_ref(),
1578        )
1579        .expect("calldata should decode as executeRsmDirectiveCall");
1580        assert_eq!(decoded_call.directive, Bytes::from(expected_directive));
1581        assert_eq!(
1582            decoded_call.signature,
1583            Bytes::from(expected_signature_bytes)
1584        );
1585    }
1586
1587    #[tokio::test]
1588    async fn transaction_submitter_rsm_relayer_error_leaves_outbox_retryable() {
1589        let event_bus = Arc::new(MockEventBus::new());
1590        event_bus.clone().start_processing().await;
1591        let mut updates_rx = event_bus
1592            .subscribe(vec![TOPIC_TRANSACTION_UPDATES.to_string()])
1593            .await
1594            .expect("subscribe should succeed");
1595
1596        let shared_requests: SharedRequests = Arc::new(Mutex::new(Vec::new()));
1597        let submitter = TransactionSubmitter {
1598            event_bus: event_bus.clone(),
1599            tx_relayer: Arc::new(CapturingTxRelayer::with_shared_capture(
1600                RelayerBehavior::Fail,
1601                PendingReconciliationBehavior::Unsupported,
1602                shared_requests.clone(),
1603            )),
1604            exchange_address: test_exchange_address(),
1605            db: None,
1606            submitter_store: None,
1607            nonce_lock: Arc::new(Mutex::new(())),
1608        };
1609
1610        let now = timestamp_millis();
1611        let tx_req = build_rsm_tx_request("failed-rsm", valid_signature_hex(), now + 60_000);
1612        submitter.process_transaction_request(tx_req).await;
1613
1614        let retryable = recv_tx_update(&mut updates_rx).await;
1615        assert_eq!(retryable.request_id, "failed-rsm");
1616        assert_eq!(retryable.status, TransactionStatus::Submitted);
1617        assert!(retryable.tx_hash.is_none());
1618        assert!(
1619            retryable
1620                .error
1621                .as_deref()
1622                .unwrap_or_default()
1623                .contains("forced failure"),
1624            "relay transport error should be recorded on a retryable nonterminal update: {retryable:?}"
1625        );
1626
1627        let requests = take_requests(&shared_requests).await;
1628        assert_eq!(
1629            requests.len(),
1630            1,
1631            "relay request should have been attempted"
1632        );
1633    }
1634
1635    #[tokio::test]
1636    async fn transaction_submitter_mined_revert_emits_terminal_failed_update() {
1637        let event_bus = Arc::new(MockEventBus::new());
1638        event_bus.clone().start_processing().await;
1639        let mut updates_rx = event_bus
1640            .subscribe(vec![TOPIC_TRANSACTION_UPDATES.to_string()])
1641            .await
1642            .expect("subscribe should succeed");
1643
1644        let shared_requests: SharedRequests = Arc::new(Mutex::new(Vec::new()));
1645        let submitter = TransactionSubmitter {
1646            event_bus: event_bus.clone(),
1647            tx_relayer: Arc::new(CapturingTxRelayer::with_shared_capture(
1648                RelayerBehavior::Reverted,
1649                PendingReconciliationBehavior::Unsupported,
1650                shared_requests.clone(),
1651            )),
1652            exchange_address: test_exchange_address(),
1653            db: None,
1654            submitter_store: None,
1655            nonce_lock: Arc::new(Mutex::new(())),
1656        };
1657
1658        let now = timestamp_millis();
1659        let tx_req = build_rsm_tx_request("reverted-rsm", valid_signature_hex(), now + 60_000);
1660        submitter.process_transaction_request(tx_req).await;
1661
1662        let failed = recv_tx_update(&mut updates_rx).await;
1663        assert_eq!(failed.request_id, "reverted-rsm");
1664        assert_eq!(failed.status, TransactionStatus::Failed);
1665        assert!(failed.tx_hash.is_none());
1666        assert!(
1667            failed
1668                .error
1669                .as_deref()
1670                .unwrap_or_default()
1671                .contains("direct submitter transaction reverted"),
1672            "mined reverts must be terminal failures for manual reconciliation: {failed:?}"
1673        );
1674
1675        let requests = take_requests(&shared_requests).await;
1676        assert_eq!(
1677            requests.len(),
1678            1,
1679            "relay request should have been attempted"
1680        );
1681    }
1682
1683    #[tokio::test]
1684    async fn transaction_submitter_pending_relayer_emits_pending_after_relay() {
1685        let event_bus = Arc::new(MockEventBus::new());
1686        event_bus.clone().start_processing().await;
1687        let mut updates_rx = event_bus
1688            .subscribe(vec![TOPIC_TRANSACTION_UPDATES.to_string()])
1689            .await
1690            .expect("subscribe should succeed");
1691
1692        let shared_requests: SharedRequests = Arc::new(Mutex::new(Vec::new()));
1693        let submitter = TransactionSubmitter {
1694            event_bus: event_bus.clone(),
1695            tx_relayer: Arc::new(CapturingTxRelayer::with_shared_capture(
1696                RelayerBehavior::Pending,
1697                PendingReconciliationBehavior::Unsupported,
1698                shared_requests.clone(),
1699            )),
1700            exchange_address: test_exchange_address(),
1701            db: None,
1702            submitter_store: None,
1703            nonce_lock: Arc::new(Mutex::new(())),
1704        };
1705
1706        let now = timestamp_millis();
1707        let tx_req = build_rsm_tx_request("pending-rsm", valid_signature_hex(), now + 60_000);
1708        submitter.process_transaction_request(tx_req).await;
1709
1710        let pending = recv_tx_update(&mut updates_rx).await;
1711        assert_eq!(pending.request_id, "pending-rsm");
1712        assert_eq!(pending.status, TransactionStatus::Pending);
1713        assert_eq!(
1714            pending.tx_hash.as_deref(),
1715            Some("0xbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb")
1716        );
1717    }
1718
1719    #[tokio::test]
1720    async fn transaction_submitter_pending_relayer_with_reconciliation_timeout_stays_pending() {
1721        let event_bus = Arc::new(MockEventBus::new());
1722        event_bus.clone().start_processing().await;
1723        let mut updates_rx = event_bus
1724            .subscribe(vec![TOPIC_TRANSACTION_UPDATES.to_string()])
1725            .await
1726            .expect("subscribe should succeed");
1727
1728        let shared_requests: SharedRequests = Arc::new(Mutex::new(Vec::new()));
1729        let submitter = TransactionSubmitter {
1730            event_bus: event_bus.clone(),
1731            tx_relayer: Arc::new(CapturingTxRelayer::with_shared_capture(
1732                RelayerBehavior::Pending,
1733                PendingReconciliationBehavior::StillPending("timed out waiting for receipt"),
1734                shared_requests.clone(),
1735            )),
1736            exchange_address: test_exchange_address(),
1737            db: None,
1738            submitter_store: None,
1739            nonce_lock: Arc::new(Mutex::new(())),
1740        };
1741
1742        let now = timestamp_millis();
1743        let tx_req = build_rsm_tx_request(
1744            "pending-reconciled-rsm",
1745            valid_signature_hex(),
1746            now + 60_000,
1747        );
1748        submitter.process_transaction_request(tx_req).await;
1749
1750        let pending = recv_tx_update(&mut updates_rx).await;
1751        assert_eq!(pending.request_id, "pending-reconciled-rsm");
1752        assert_eq!(pending.status, TransactionStatus::Pending);
1753
1754        let still_pending = recv_tx_update(&mut updates_rx).await;
1755        assert_eq!(still_pending.request_id, "pending-reconciled-rsm");
1756        assert_eq!(still_pending.status, TransactionStatus::Pending);
1757        assert_eq!(
1758            still_pending.error.as_deref(),
1759            Some("timed out waiting for receipt")
1760        );
1761        assert_eq!(
1762            still_pending.tx_hash.as_deref(),
1763            Some("0xbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb")
1764        );
1765    }
1766
1767    #[tokio::test]
1768    async fn transaction_submitter_pending_reconciliation_does_not_block_hot_path() {
1769        let event_bus = Arc::new(MockEventBus::new());
1770        event_bus.clone().start_processing().await;
1771        let mut updates_rx = event_bus
1772            .subscribe(vec![TOPIC_TRANSACTION_UPDATES.to_string()])
1773            .await
1774            .expect("subscribe should succeed");
1775
1776        let shared_requests: SharedRequests = Arc::new(Mutex::new(Vec::new()));
1777        let started = Arc::new(Notify::new());
1778        let release = Arc::new(Notify::new());
1779        let submitter = TransactionSubmitter {
1780            event_bus: event_bus.clone(),
1781            tx_relayer: Arc::new(BlockingPendingRelayer {
1782                started: Arc::clone(&started),
1783                release: Arc::clone(&release),
1784                requests: shared_requests.clone(),
1785            }),
1786            exchange_address: test_exchange_address(),
1787            db: None,
1788            submitter_store: None,
1789            nonce_lock: Arc::new(Mutex::new(())),
1790        };
1791
1792        let now = timestamp_millis();
1793        let tx_req = build_rsm_tx_request("pending-hot-path", valid_signature_hex(), now + 60_000);
1794        let handle = tokio::spawn(async move {
1795            submitter.process_transaction_request(tx_req).await;
1796        });
1797
1798        let pending = recv_tx_update(&mut updates_rx).await;
1799        assert_eq!(pending.request_id, "pending-hot-path");
1800        assert_eq!(pending.status, TransactionStatus::Pending);
1801
1802        timeout(Duration::from_millis(200), started.notified())
1803            .await
1804            .expect("reconciliation task should start in background");
1805        timeout(Duration::from_millis(200), handle)
1806            .await
1807            .expect("hot path should return before reconciliation completes")
1808            .expect("transaction task should complete successfully");
1809
1810        assert!(
1811            timeout(Duration::from_millis(100), updates_rx.recv())
1812                .await
1813                .is_err(),
1814            "confirmed update should wait for reconciliation completion"
1815        );
1816
1817        release.notify_one();
1818
1819        let confirmed = recv_tx_update(&mut updates_rx).await;
1820        assert_eq!(confirmed.request_id, "pending-hot-path");
1821        assert_eq!(confirmed.status, TransactionStatus::Confirmed);
1822        assert_eq!(
1823            confirmed.tx_hash.as_deref(),
1824            Some("0xcccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccc")
1825        );
1826    }
1827
1828    #[tokio::test]
1829    async fn transaction_submitter_serializes_nonce_selection_through_first_submit() {
1830        let event_bus = Arc::new(MockEventBus::new());
1831        event_bus.clone().start_processing().await;
1832        let relayer = Arc::new(BlockingFirstSendRelayer::new());
1833        let submitter = Arc::new(TransactionSubmitter {
1834            event_bus,
1835            tx_relayer: relayer.clone(),
1836            exchange_address: test_exchange_address(),
1837            db: None,
1838            submitter_store: None,
1839            nonce_lock: Arc::new(Mutex::new(())),
1840        });
1841
1842        let first_submitter = submitter.clone();
1843        let first = tokio::spawn(async move {
1844            first_submitter
1845                .submit_contract_call("first", contract_call_for_test("first"))
1846                .await
1847        });
1848
1849        timeout(
1850            Duration::from_secs(1),
1851            relayer.first_send_started.notified(),
1852        )
1853        .await
1854        .expect("first submit should enter send path");
1855
1856        let second_submitter = submitter.clone();
1857        let second = tokio::spawn(async move {
1858            second_submitter
1859                .submit_contract_call("second", contract_call_for_test("second"))
1860                .await
1861        });
1862
1863        assert!(
1864            timeout(
1865                Duration::from_millis(100),
1866                relayer.second_select_started.notified()
1867            )
1868            .await
1869            .is_err(),
1870            "second submit must not allocate a nonce while first submit is undecided"
1871        );
1872
1873        relayer.release_first_send.notify_one();
1874
1875        let first_result = first.await.expect("first task should join");
1876        assert!(first_result.is_err());
1877
1878        let second_result = second.await.expect("second task should join");
1879        assert_eq!(
1880            second_result.expect("second submit should succeed").nonce,
1881            21
1882        );
1883        assert_eq!(relayer.select_count.load(Ordering::SeqCst), 2);
1884    }
1885
1886    #[tokio::test]
1887    async fn transaction_submitter_persists_signed_attempt_before_send() {
1888        let event_bus = Arc::new(MockEventBus::new());
1889        event_bus.clone().start_processing().await;
1890        let store = Arc::new(InMemorySubmitterStore::default());
1891        let recorded_before_send = Arc::new(AtomicBool::new(false));
1892        let submitter = TransactionSubmitter {
1893            event_bus,
1894            tx_relayer: Arc::new(RecorderOrderingRelayer {
1895                recorded_before_send: recorded_before_send.clone(),
1896            }),
1897            exchange_address: test_exchange_address(),
1898            db: None,
1899            submitter_store: Some(store.clone()),
1900            nonce_lock: Arc::new(Mutex::new(())),
1901        };
1902
1903        let submission = submitter
1904            .submit_contract_call(
1905                "record-before-send",
1906                contract_call_for_test("record-before-send"),
1907            )
1908            .await
1909            .expect("submission should stay pending");
1910
1911        assert!(
1912            recorded_before_send.load(Ordering::SeqCst),
1913            "relayer must not send until the recorder accepted the signed attempt"
1914        );
1915        let detail = store
1916            .get_submission_by_nonce(&Address::repeat_byte(0x46), submission.nonce)
1917            .await
1918            .expect("db read")
1919            .expect("submission exists");
1920        assert_eq!(detail.status, SubmissionStatus::Broadcasted);
1921        assert_eq!(
1922            detail.primary_tx_hash.as_deref(),
1923            Some(submission.hash.as_str())
1924        );
1925        assert_eq!(detail.attempts.len(), 1);
1926        assert_eq!(detail.attempts[0].raw_tx, b"raw-signed-test-tx");
1927    }
1928
1929    #[tokio::test]
1930    async fn transaction_submitter_reconciles_all_replacement_attempt_hashes() {
1931        let event_bus = Arc::new(MockEventBus::new());
1932        event_bus.clone().start_processing().await;
1933        let mut updates_rx = event_bus
1934            .subscribe(vec![TOPIC_TRANSACTION_UPDATES.to_string()])
1935            .await
1936            .expect("subscribe should succeed");
1937
1938        let attempts = vec![
1939            "0x1111111111111111111111111111111111111111111111111111111111111111".to_string(),
1940            "0x2222222222222222222222222222222222222222222222222222222222222222".to_string(),
1941        ];
1942        let shared_requests: SharedRequests = Arc::new(Mutex::new(Vec::new()));
1943        let submitter = TransactionSubmitter {
1944            event_bus: event_bus.clone(),
1945            tx_relayer: Arc::new(ReplacementPendingRelayer {
1946                requests: shared_requests.clone(),
1947                expected_attempts: attempts.clone(),
1948                confirmed_hash: attempts[0].clone(),
1949            }),
1950            exchange_address: test_exchange_address(),
1951            db: None,
1952            submitter_store: None,
1953            nonce_lock: Arc::new(Mutex::new(())),
1954        };
1955
1956        let now = timestamp_millis();
1957        let tx_req = build_rsm_tx_request(
1958            "replacement-attempts-rsm",
1959            valid_signature_hex(),
1960            now + 60_000,
1961        );
1962        submitter.process_transaction_request(tx_req).await;
1963
1964        let pending = recv_tx_update(&mut updates_rx).await;
1965        assert_eq!(pending.request_id, "replacement-attempts-rsm");
1966        assert_eq!(pending.status, TransactionStatus::Pending);
1967        assert_eq!(
1968            pending.tx_hash.as_deref(),
1969            Some(attempts[1].as_str()),
1970            "hot path should expose the newest replacement hash while reconciliation runs"
1971        );
1972
1973        let confirmed = recv_tx_update(&mut updates_rx).await;
1974        assert_eq!(confirmed.request_id, "replacement-attempts-rsm");
1975        assert_eq!(confirmed.status, TransactionStatus::Confirmed);
1976        assert_eq!(
1977            confirmed.tx_hash.as_deref(),
1978            Some(attempts[0].as_str()),
1979            "reconciliation must finalize with the attempt that actually mined"
1980        );
1981
1982        let requests = take_requests(&shared_requests).await;
1983        assert_eq!(requests.len(), 1);
1984    }
1985
1986    #[tokio::test]
1987    async fn transaction_submitter_rsm_invalid_signature_emits_terminal_failure() {
1988        let event_bus = Arc::new(MockEventBus::new());
1989        event_bus.clone().start_processing().await;
1990        let mut updates_rx = event_bus
1991            .subscribe(vec![TOPIC_TRANSACTION_UPDATES.to_string()])
1992            .await
1993            .expect("subscribe should succeed");
1994
1995        let shared_requests: SharedRequests = Arc::new(Mutex::new(Vec::new()));
1996        let submitter = TransactionSubmitter {
1997            event_bus: event_bus.clone(),
1998            tx_relayer: Arc::new(CapturingTxRelayer::with_shared_capture(
1999                RelayerBehavior::Success,
2000                PendingReconciliationBehavior::Unsupported,
2001                shared_requests.clone(),
2002            )),
2003            exchange_address: test_exchange_address(),
2004            db: None,
2005            submitter_store: None,
2006            nonce_lock: Arc::new(Mutex::new(())),
2007        };
2008
2009        let now = timestamp_millis();
2010        let tx_req = build_rsm_tx_request("invalid-sig-rsm", "0xzz".to_string(), now + 60_000);
2011        submitter.process_transaction_request(tx_req).await;
2012
2013        let failed = recv_tx_update(&mut updates_rx).await;
2014        assert_eq!(failed.request_id, "invalid-sig-rsm");
2015        assert_eq!(failed.status, TransactionStatus::Failed);
2016        assert!(
2017            failed
2018                .error
2019                .as_deref()
2020                .is_some_and(|error| error.contains("invalid signature bytes")),
2021            "signature parse errors must emit an explicit terminal failure"
2022        );
2023
2024        let requests = take_requests(&shared_requests).await;
2025        assert!(
2026            requests.is_empty(),
2027            "invalid signatures must not be sent to relayer"
2028        );
2029    }
2030}