1pub mod exchange_encoder;
2mod tx_relayer;
3pub use exchange_encoder::{encode_execute_rsm_directive, encode_execute_user_directive};
4pub use tx_relayer::{
5 AwsKmsTxRelayer, DirectTxRelayer, MockTxRelayer, PendingTransactionStatus,
6 SubmittedTransaction, TxRelayerTrait,
7};
8use tx_relayer::{SignedAttemptRecorder, SignedTransactionAttempt};
9
10use alloy::primitives::{Address, Bytes};
11use async_trait::async_trait;
12use eyre::Result as EyreResult;
13use hypercall_db::AsyncDirectiveOutboxWriter;
14use hypercall_transaction_submitter_core::{
15 ContractCall, SubmissionStatus, SubmittedNonce, SubmitterId,
16};
17use hypercall_transaction_submitter_db::{
18 AsyncTransactionSubmitterStore, SubmissionAttemptRecord, SubmissionRecord,
19};
20use hypercall_types::{
21 topics::TOPIC_TRANSACTION_REQUESTS, EngineMessage, SignedDirectiveTx, TransactionRequest,
22 TransactionStatus, TransactionType, TransactionUpdate,
23};
24use serde::Deserialize;
25use std::str::FromStr;
26use std::sync::Arc;
27use std::time::{Duration, SystemTime, UNIX_EPOCH};
28use tokio::sync::{mpsc, Mutex};
29use tracing::{debug, error, info};
30
/// Selects which relayer backend the transaction submitter runs with.
///
/// Deserialized from `snake_case` strings (`mock`, `direct`, `aws_kms`), the
/// same spellings accepted by the `FromStr` implementation.
#[derive(Debug, Clone, Copy, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum TransactionSubmitterMode {
    /// Mock backend; this is the mode used by `TransactionSubmitterConfig::default()`.
    Mock,
    /// Direct backend. NOTE(review): presumably paired with
    /// `TransactionSubmitter::new_direct` by the caller — confirm at the call site.
    Direct,
    /// AWS KMS backend. NOTE(review): presumably paired with
    /// `TransactionSubmitter::new_aws_kms` by the caller — confirm at the call site.
    AwsKms,
}
38
39impl FromStr for TransactionSubmitterMode {
40 type Err = String;
41
42 fn from_str(value: &str) -> Result<Self, Self::Err> {
43 match value.trim().to_ascii_lowercase().as_str() {
44 "mock" => Ok(Self::Mock),
45 "direct" => Ok(Self::Direct),
46 "aws_kms" => Ok(Self::AwsKms),
47 other => Err(format!(
48 "expected one of: mock, direct, aws_kms; got {}",
49 other
50 )),
51 }
52 }
53}
54
/// Deserializable configuration for the transaction submitter.
///
/// `#[serde(default)]` fills every missing field from the `Default`
/// implementation, and `deny_unknown_fields` rejects input keys that do not
/// correspond to a field.
#[derive(Debug, Clone, Deserialize)]
#[serde(default, deny_unknown_fields)]
pub struct TransactionSubmitterConfig {
    /// Which relayer backend to use.
    pub mode: TransactionSubmitterMode,
    /// AWS KMS key identifier; empty by default.
    /// NOTE(review): presumably only consulted in `aws_kms` mode — confirm in the relayer.
    pub aws_kms_key_id: String,
    /// Gas price limit kept as a decimal string (default `500000000000`).
    /// NOTE(review): units are not visible in this file (presumably wei) — confirm.
    pub max_gas_price: String,
    /// RPC endpoint URL handed to the relayers through this config.
    pub rpc_url: String,
}
63
64impl Default for TransactionSubmitterConfig {
65 fn default() -> Self {
66 Self {
67 mode: TransactionSubmitterMode::Mock,
68 aws_kms_key_id: String::new(),
69 max_gas_price: "500000000000".to_string(),
70 rpc_url: "https://rpc.hyperliquid-testnet.xyz/evm".to_string(),
71 }
72 }
73}
74
/// Secret material for the transaction submitter, kept apart from the
/// deserializable configuration.
#[derive(Debug, Clone, Default)]
pub struct TransactionSubmitterSecrets {
    /// Private key of the submitter account; `None` when it was not provided.
    /// The accessor's error message names the `TRANSACTION_SUBMITTER_PRIVATE_KEY`
    /// environment variable as its expected source.
    pub transaction_submitter_private_key: Option<String>,
}
79
80impl TransactionSubmitterSecrets {
81 pub fn require_transaction_submitter_private_key(&self) -> eyre::Result<&str> {
82 self.transaction_submitter_private_key
83 .as_deref()
84 .ok_or_else(|| {
85 eyre::eyre!("TRANSACTION_SUBMITTER_PRIVATE_KEY must be set in the environment")
86 })
87 }
88}
89
/// Minimal event-bus abstraction the submitter depends on.
///
/// The submitter publishes `EngineMessage::TransactionUpdate` values through
/// the sender and consumes transaction requests through a topic subscription.
#[async_trait]
pub trait EventBusTrait: Send + Sync {
    /// Returns a sender used to publish messages onto the bus.
    fn get_sender(&self) -> mpsc::UnboundedSender<EngineMessage>;

    /// Subscribes to `topics` and returns the receiving end of the subscription.
    ///
    /// # Errors
    /// Returns a human-readable message when the subscription cannot be created.
    async fn subscribe(
        &self,
        topics: Vec<String>,
    ) -> Result<mpsc::UnboundedReceiver<EngineMessage>, String>;
}
99
/// Signature shared by the directive encoders chosen in
/// `process_transaction_request`. It is invoked with the exchange address, the
/// directive bytes, the signature bytes and an optional request id, and yields
/// the `ContractCall` to submit.
type DirectiveEncoder = fn(Address, Bytes, Bytes, Option<String>) -> EyreResult<ContractCall>;
/// Shared handle to the directive outbox writer.
type DirectiveOutboxDb = Arc<dyn AsyncDirectiveOutboxWriter>;
/// Shared handle to the submitter persistence store.
type SubmitterStoreDb = Arc<dyn AsyncTransactionSubmitterStore>;

/// Pause between reconciliation polls while a transaction is still pending or
/// the poll itself failed.
const PENDING_RECONCILIATION_RETRY_DELAY: Duration = Duration::from_secs(30);
/// Page size used when loading persisted pending submissions.
const PERSISTED_PENDING_SUBMISSION_BATCH_SIZE: i64 = 1000;
106
/// Failure categories produced by `submit_signed_directive`.
#[derive(Debug)]
enum RelayDirectiveError {
    /// The signature could not be parsed or the encoder failed; reported as
    /// `TransactionStatus::Failed`.
    Build(String),
    /// Submission failed with an error that is not classified as terminal;
    /// reported as `TransactionStatus::Submitted` so the outbox row stays retryable.
    Relay(String),
    /// Submission failed with an error matched by `is_terminal_relay_error`;
    /// reported as `TransactionStatus::Failed`.
    Terminal(String),
}
113
/// `SignedAttemptRecorder` that writes signed attempts to the submitter store.
struct SubmitterAttemptRecorder {
    /// Target store; when `None`, recording is a no-op that reports success.
    store: Option<SubmitterStoreDb>,
}
117
118#[async_trait]
119impl SignedAttemptRecorder for SubmitterAttemptRecorder {
120 async fn record_signed_attempt(&self, attempt: SignedTransactionAttempt) -> EyreResult<()> {
121 let Some(store) = self.store.as_ref() else {
122 return Ok(());
123 };
124 let record = SubmissionRecord {
125 submitter: attempt.submitter,
126 nonce: attempt.nonce,
127 status: SubmissionStatus::Created,
128 primary_tx_hash: Some(attempt.tx_hash.clone()),
129 terminal_error: None,
130 };
131 let attempts = vec![SubmissionAttemptRecord {
132 tx_hash: attempt.tx_hash,
133 raw_tx: attempt.raw_tx,
134 }];
135 TransactionSubmitter::persist_submitter_submission_with_db(
136 Arc::clone(store),
137 record,
138 attempts,
139 )
140 .await
141 }
142}
143
/// Service that consumes transaction requests from the event bus, relays them
/// as contract calls and publishes status updates.
pub struct TransactionSubmitter {
    /// Source of transaction requests and sink for transaction updates.
    event_bus: Arc<dyn EventBusTrait>,
    /// Backend that selects nonces, sends transactions and reconciles pending ones.
    tx_relayer: Arc<dyn TxRelayerTrait>,
    /// Exchange contract address passed to the directive encoders.
    exchange_address: Address,
    /// Optional directive outbox writer; outbox persistence is skipped when `None`.
    db: Option<DirectiveOutboxDb>,
    /// Optional submitter store; submission persistence is skipped when `None`.
    submitter_store: Option<SubmitterStoreDb>,
    /// Held for the whole of `submit_contract_call`, so nonce selection,
    /// persistence and sending never overlap between two submissions.
    nonce_lock: Arc<Mutex<()>>,
}
169
170impl TransactionSubmitter {
171 pub async fn new_direct(
173 event_bus: Arc<dyn EventBusTrait>,
174 config: &TransactionSubmitterConfig,
175 exchange_contract_address: &str,
176 secrets: &TransactionSubmitterSecrets,
177 ) -> EyreResult<Self> {
178 let tx_relayer = DirectTxRelayer::new(config, secrets).await?;
179 let exchange_address = exchange_encoder::parse_exchange_address(exchange_contract_address)?;
180 Ok(Self {
181 event_bus,
182 tx_relayer: Arc::new(tx_relayer),
183 exchange_address,
184 db: None,
185 submitter_store: None,
186 nonce_lock: Arc::new(Mutex::new(())),
187 })
188 }
189
190 pub async fn new_aws_kms(
192 event_bus: Arc<dyn EventBusTrait>,
193 config: &TransactionSubmitterConfig,
194 exchange_contract_address: &str,
195 ) -> EyreResult<Self> {
196 let tx_relayer = AwsKmsTxRelayer::new(config).await?;
197 let exchange_address = exchange_encoder::parse_exchange_address(exchange_contract_address)?;
198 Ok(Self {
199 event_bus,
200 tx_relayer: Arc::new(tx_relayer),
201 exchange_address,
202 db: None,
203 submitter_store: None,
204 nonce_lock: Arc::new(Mutex::new(())),
205 })
206 }
207
208 pub fn new_mock(
210 event_bus: Arc<dyn EventBusTrait>,
211 exchange_contract_address: &str,
212 ) -> EyreResult<Self> {
213 Ok(Self {
214 event_bus,
215 tx_relayer: Arc::new(MockTxRelayer),
216 exchange_address: exchange_encoder::parse_exchange_address(exchange_contract_address)?,
217 db: None,
218 submitter_store: None,
219 nonce_lock: Arc::new(Mutex::new(())),
220 })
221 }
222
223 pub fn with_db(mut self, db: DirectiveOutboxDb) -> Self {
227 self.db = Some(db);
228 self
229 }
230
231 pub fn with_submitter_store(mut self, submitter_store: SubmitterStoreDb) -> Self {
232 self.submitter_store = Some(submitter_store);
233 self
234 }
235
236 pub fn start(self: Arc<Self>) -> Result<tokio::task::JoinHandle<()>, String> {
239 let (tx, rx) = tokio::sync::broadcast::channel::<()>(1);
242 std::mem::forget(tx);
243 self.start_with_shutdown(rx)
244 }
245
246 pub fn start_with_shutdown(
250 self: Arc<Self>,
251 mut shutdown_rx: tokio::sync::broadcast::Receiver<()>,
252 ) -> Result<tokio::task::JoinHandle<()>, String> {
253 info!("Starting Transaction Submitter service...");
254
255 let event_bus = self.event_bus.clone();
257 let submitter = self.clone();
258 let handle = tokio::spawn(async move {
259 submitter.spawn_persisted_pending_submissions().await;
260
261 let mut tx_receiver = match event_bus
262 .subscribe(vec![TOPIC_TRANSACTION_REQUESTS.to_string()])
263 .await
264 {
265 Ok(rx) => rx,
266 Err(e) => {
267 error!("Failed to subscribe to transaction requests: {}", e);
268 return;
269 }
270 };
271
272 loop {
273 tokio::select! {
274 _ = shutdown_rx.recv() => {
275 info!("Transaction Submitter received shutdown signal");
276 break;
277 }
278 maybe_message = tx_receiver.recv() => {
279 match maybe_message {
280 Some(EngineMessage::TransactionRequest(tx_req)) => {
281 submitter.process_transaction_request(tx_req).await;
282 }
283 Some(_) => {} None => break, }
286 }
287 }
288 }
289 info!("Transaction Submitter stopped");
290 });
291
292 info!("✓ Transaction Submitter service started");
293 Ok(handle)
294 }
295
296 async fn spawn_persisted_pending_submissions(&self) {
297 let Some(submitter_store) = self.submitter_store.as_ref() else {
298 return;
299 };
300
301 let mut after_submission_id = 0_i64;
302 loop {
303 let rows = match submitter_store
304 .list_pending_submissions(
305 after_submission_id,
306 PERSISTED_PENDING_SUBMISSION_BATCH_SIZE,
307 )
308 .await
309 {
310 Ok(rows) => rows,
311 Err(error) => {
312 error!("Failed to load persisted pending submitter submissions: {error}");
313 return;
314 }
315 };
316
317 if rows.is_empty() {
318 break;
319 }
320
321 for row in rows {
322 after_submission_id = row.submission_id;
323 if let Some(directive_id) = row.directive_id {
324 self.spawn_pending_reconciliation(
325 directive_id,
326 row.submitter,
327 row.nonce,
328 row.tx_hashes,
329 );
330 } else {
331 self.spawn_pending_submitter_reconciliation(
332 row.submitter,
333 row.nonce,
334 row.tx_hashes,
335 );
336 }
337 }
338 }
339 }
340
341 async fn process_transaction_request(&self, tx_req: TransactionRequest) {
343 info!("Processing transaction request: {}", tx_req.request_id);
344
345 let current_time = timestamp_millis();
347 if current_time > tx_req.expires_at {
348 self.persist_outbox_update(
349 &tx_req.request_id,
350 TransactionStatus::Expired,
351 None,
352 Some("Transaction request expired".to_string()),
353 )
354 .await;
355 self.send_update(
356 &tx_req.request_id,
357 TransactionStatus::Expired,
358 None,
359 Some("Transaction request expired".to_string()),
360 )
361 .await;
362 return;
363 }
364
365 let (directive, encoder, log_label): (&SignedDirectiveTx, DirectiveEncoder, &'static str) =
366 match &tx_req.transaction_type {
367 TransactionType::UserDirective(user_directive) => {
368 (user_directive, encode_execute_user_directive, "user")
369 }
370 TransactionType::RsmDirective(rsm_directive) => {
371 (rsm_directive, encode_execute_rsm_directive, "RSM")
372 }
373 };
374
375 self.handle_signed_directive(
376 &tx_req.request_id,
377 &tx_req.account_contract,
378 directive,
379 encoder,
380 log_label,
381 )
382 .await;
383 }
384
385 async fn handle_signed_directive(
387 &self,
388 request_id: &str,
389 account_contract: &hypercall_types::WalletAddress,
390 directive: &SignedDirectiveTx,
391 encoder: DirectiveEncoder,
392 log_label: &str,
393 ) {
394 info!(
395 "Submitting {} directive: request_id={}, account={}",
396 log_label, request_id, account_contract
397 );
398
399 match self
400 .submit_signed_directive(request_id, directive, encoder, log_label)
401 .await
402 {
403 Ok(submission) => {
404 let tx_hash = Some(submission.hash.clone());
405 let status = if submission.confirmed {
406 TransactionStatus::Confirmed
407 } else {
408 TransactionStatus::Pending
409 };
410 self.persist_outbox_update(request_id, status, tx_hash.clone(), None)
411 .await;
412 self.send_update(request_id, status, tx_hash, None).await;
413 if !submission.confirmed {
414 self.spawn_pending_reconciliation(
415 request_id.to_string(),
416 submission.submitter,
417 submission.nonce,
418 submission.all_hashes,
419 );
420 }
421 info!(
422 "{} directive submitted: {} confirmed={}",
423 log_label, request_id, submission.confirmed
424 );
425 }
426 Err(RelayDirectiveError::Build(error)) => {
427 error!(
428 "{} directive relay payload build failed: {} - {}; emitting terminal failure",
429 log_label, request_id, error
430 );
431 self.persist_outbox_update(
432 request_id,
433 TransactionStatus::Failed,
434 None,
435 Some(error.clone()),
436 )
437 .await;
438 self.send_update(request_id, TransactionStatus::Failed, None, Some(error))
439 .await;
440 }
441 Err(RelayDirectiveError::Relay(error)) => {
442 error!(
443 "{} directive relay submission failed: {} - {}; outbox row remains retryable",
444 log_label, request_id, error
445 );
446 self.persist_outbox_update(
447 request_id,
448 TransactionStatus::Submitted,
449 None,
450 Some(error.clone()),
451 )
452 .await;
453 self.send_update(request_id, TransactionStatus::Submitted, None, Some(error))
454 .await;
455 }
456 Err(RelayDirectiveError::Terminal(error)) => {
457 error!(
458 "{} directive relay terminal failure: {} - {}; emitting terminal failure",
459 log_label, request_id, error
460 );
461 self.persist_outbox_update(
462 request_id,
463 TransactionStatus::Failed,
464 None,
465 Some(error.clone()),
466 )
467 .await;
468 self.send_update(request_id, TransactionStatus::Failed, None, Some(error))
469 .await;
470 }
471 }
472 }
473
474 async fn submit_signed_directive(
476 &self,
477 request_id: &str,
478 signed_directive: &SignedDirectiveTx,
479 encoder: DirectiveEncoder,
480 log_label: &str,
481 ) -> Result<SubmittedTransaction, RelayDirectiveError> {
482 info!(
483 "Building {} directive transaction payload: request_id={}",
484 log_label, request_id
485 );
486
487 let directive = Bytes::from(signed_directive.directive.clone());
488 let signature = Bytes::from_str(&signed_directive.signature)
489 .map_err(|e| RelayDirectiveError::Build(format!("invalid signature bytes: {}", e)))?;
490 let contract_call = encoder(
491 self.exchange_address,
492 directive,
493 signature,
494 Some(request_id.to_string()),
495 )
496 .map_err(|e| RelayDirectiveError::Build(e.to_string()))?;
497
498 let calldata_hex = hex::encode(contract_call.data.as_ref());
499 error!(
500 request_id,
501 log_label,
502 exchange_address = %self.exchange_address,
503 tx_to = %contract_call.to,
504 tx_value = %contract_call.value,
505 calldata = %format!("0x{calldata_hex}"),
506 directive = %format!("0x{}", hex::encode(&signed_directive.directive)),
507 signature = %signed_directive.signature,
508 "prepared directive transaction payload"
509 );
510
511 self.submit_contract_call(request_id, contract_call)
512 .await
513 .map_err(|e| {
514 let error = e.to_string();
515 if is_terminal_relay_error(&error) {
516 RelayDirectiveError::Terminal(error)
517 } else {
518 RelayDirectiveError::Relay(error)
519 }
520 })
521 }
522
523 async fn submit_contract_call(
524 &self,
525 request_id: &str,
526 contract_call: ContractCall,
527 ) -> EyreResult<SubmittedTransaction> {
528 let _nonce_guard = self.nonce_lock.lock().await;
529 let submitter = self.tx_relayer.submitter_address();
530 let db_nonce_floor = match self.submitter_store.as_ref() {
531 Some(store) => store
532 .max_nonce_for_submitter(&submitter)
533 .await
534 .map_err(|error| eyre::eyre!("failed to load submitter nonce floor: {error}"))?
535 .map(|nonce| nonce.saturating_add(1)),
536 None => None,
537 };
538 let nonce = self.tx_relayer.select_next_nonce(db_nonce_floor).await?;
539 self.persist_created_submitter_submission(submitter, nonce)
540 .await?;
541 self.persist_directive_submitter_pointer(request_id, submitter, nonce)
542 .await?;
543
544 let recorder: Arc<dyn SignedAttemptRecorder> = Arc::new(SubmitterAttemptRecorder {
545 store: self.submitter_store.as_ref().map(Arc::clone),
546 });
547 let submission = match self
548 .tx_relayer
549 .send_transaction_with_nonce_recording_attempts(contract_call, nonce, recorder)
550 .await
551 {
552 Ok(submission) => submission,
553 Err(error) => {
554 if let Some(submitter_store) = self.submitter_store.as_ref() {
555 Self::persist_submitter_status_with_db(
556 Arc::clone(submitter_store),
557 submitter,
558 nonce,
559 SubmissionStatus::Failed,
560 None,
561 Some(error.to_string()),
562 )
563 .await;
564 }
565 return Err(error);
566 }
567 };
568 let status = if submission.confirmed {
569 SubmissionStatus::Confirmed
570 } else {
571 SubmissionStatus::Broadcasted
572 };
573 if let Some(submitter_store) = self.submitter_store.as_ref() {
574 Self::persist_submitter_status_with_db(
575 Arc::clone(submitter_store),
576 submission.submitter,
577 submission.nonce,
578 status,
579 Some(submission.hash.clone()),
580 None,
581 )
582 .await;
583 }
584
585 info!("Transaction submitted: {:?}", submission);
586 Ok(submission)
587 }
588
589 async fn send_update(
591 &self,
592 request_id: &str,
593 status: TransactionStatus,
594 tx_hash: Option<String>,
595 error: Option<String>,
596 ) {
597 let update = TransactionUpdate {
598 request_id: request_id.to_string(),
599 status,
600 tx_hash,
601 error,
602 timestamp: timestamp_millis(),
603 gas_used: None,
604 gas_price: None,
605 };
606
607 debug!("Sending transaction update: {:?}", update);
608
609 if let Err(e) = self
610 .event_bus
611 .get_sender()
612 .send(EngineMessage::TransactionUpdate(update))
613 {
614 error!("Failed to send transaction update: {}", e);
615 }
616 }
617
618 async fn persist_outbox_update(
620 &self,
621 request_id: &str,
622 status: TransactionStatus,
623 tx_hash: Option<String>,
624 error_msg: Option<String>,
625 ) {
626 let Some(db) = self.db.as_ref() else {
627 return;
628 };
629 Self::persist_outbox_update_with_db(
630 Arc::clone(db),
631 request_id.to_string(),
632 status,
633 tx_hash,
634 error_msg,
635 )
636 .await;
637 }
638
639 async fn persist_outbox_update_with_db(
640 db: DirectiveOutboxDb,
641 request_id: String,
642 status: TransactionStatus,
643 tx_hash: Option<String>,
644 error_msg: Option<String>,
645 ) {
646 if let Err(e) = db
647 .persist_directive_transaction_update(
648 &request_id,
649 status,
650 tx_hash.as_deref(),
651 error_msg.as_deref(),
652 )
653 .await
654 {
655 error!("Failed to persist outbox update for {}: {}", request_id, e);
656 }
657 }
658
659 async fn persist_created_submitter_submission(
660 &self,
661 submitter: SubmitterId,
662 nonce: SubmittedNonce,
663 ) -> EyreResult<()> {
664 let Some(submitter_store) = self.submitter_store.as_ref() else {
665 return Ok(());
666 };
667 let record = SubmissionRecord {
668 submitter,
669 nonce,
670 status: SubmissionStatus::Created,
671 primary_tx_hash: None,
672 terminal_error: None,
673 };
674 Self::persist_submitter_submission_with_db(Arc::clone(submitter_store), record, Vec::new())
675 .await
676 }
677
678 async fn persist_submitter_submission_with_db(
679 submitter_store: SubmitterStoreDb,
680 record: SubmissionRecord,
681 attempts: Vec<SubmissionAttemptRecord>,
682 ) -> EyreResult<()> {
683 submitter_store
684 .record_submission(&record, &attempts)
685 .await
686 .map_err(|e| {
687 eyre::eyre!(
688 "failed to persist submitter submission {}:{}: {}",
689 record.submitter,
690 record.nonce,
691 e
692 )
693 })
694 }
695
696 async fn persist_submitter_status_with_db(
697 submitter_store: SubmitterStoreDb,
698 submitter: SubmitterId,
699 nonce: SubmittedNonce,
700 status: SubmissionStatus,
701 primary_tx_hash: Option<String>,
702 terminal_error: Option<String>,
703 ) {
704 if let Err(e) = submitter_store
705 .update_submission_status(
706 &submitter,
707 nonce,
708 status,
709 primary_tx_hash.as_deref(),
710 terminal_error.as_deref(),
711 )
712 .await
713 {
714 error!(
715 "Failed to update submitter submission {}:{}: {}",
716 submitter, nonce, e
717 );
718 }
719 }
720
721 async fn persist_directive_submitter_pointer(
722 &self,
723 request_id: &str,
724 submitter: SubmitterId,
725 nonce: SubmittedNonce,
726 ) -> EyreResult<()> {
727 let Some(db) = self.db.as_ref() else {
728 return Ok(());
729 };
730 Self::persist_directive_submitter_pointer_with_db(
731 Arc::clone(db),
732 request_id.to_string(),
733 submitter,
734 nonce,
735 )
736 .await
737 }
738
739 async fn persist_directive_submitter_pointer_with_db(
740 db: DirectiveOutboxDb,
741 request_id: String,
742 submitter: SubmitterId,
743 nonce: SubmittedNonce,
744 ) -> EyreResult<()> {
745 db.record_directive_submitter_submission(&request_id, &submitter, nonce)
746 .await
747 .map_err(|e| {
748 eyre::eyre!(
749 "failed to persist directive submitter pointer for {}: {}",
750 request_id,
751 e
752 )
753 })
754 }
755
756 fn spawn_pending_reconciliation(
757 &self,
758 request_id: String,
759 submitter: SubmitterId,
760 nonce: SubmittedNonce,
761 tx_hashes: Vec<String>,
762 ) {
763 let sender = self.event_bus.get_sender();
764 let tx_relayer = Arc::clone(&self.tx_relayer);
765 let db = self.db.clone();
766 let submitter_store = self.submitter_store.clone();
767
768 tokio::spawn(async move {
769 let primary_tx_hash = tx_hashes
770 .last()
771 .cloned()
772 .unwrap_or_else(|| "unknown".to_string());
773 loop {
774 match tx_relayer.reconcile_pending_transactions(&tx_hashes).await {
775 Ok(PendingTransactionStatus::Unsupported) => {}
776 Ok(PendingTransactionStatus::Pending(reason)) => {
777 send_update_via_sender(
778 &sender,
779 request_id.clone(),
780 TransactionStatus::Pending,
781 Some(primary_tx_hash.clone()),
782 Some(reason),
783 );
784 tokio::time::sleep(PENDING_RECONCILIATION_RETRY_DELAY).await;
785 continue;
786 }
787 Ok(PendingTransactionStatus::Confirmed(confirmed_tx_hash)) => {
788 if let Some(submitter_store) = submitter_store.as_ref() {
789 Self::persist_submitter_status_with_db(
790 Arc::clone(submitter_store),
791 submitter,
792 nonce,
793 SubmissionStatus::Confirmed,
794 Some(confirmed_tx_hash.clone()),
795 None,
796 )
797 .await;
798 }
799 if let Some(db) = db.as_ref() {
800 Self::persist_outbox_update_with_db(
801 Arc::clone(db),
802 request_id.clone(),
803 TransactionStatus::Confirmed,
804 Some(confirmed_tx_hash.clone()),
805 None,
806 )
807 .await;
808 }
809 send_update_via_sender(
810 &sender,
811 request_id.clone(),
812 TransactionStatus::Confirmed,
813 Some(confirmed_tx_hash),
814 None,
815 );
816 }
817 Ok(PendingTransactionStatus::Failed(error)) => {
818 if let Some(submitter_store) = submitter_store.as_ref() {
819 Self::persist_submitter_status_with_db(
820 Arc::clone(submitter_store),
821 submitter,
822 nonce,
823 SubmissionStatus::Failed,
824 Some(primary_tx_hash.clone()),
825 Some(error.clone()),
826 )
827 .await;
828 }
829 if let Some(db) = db.as_ref() {
830 Self::persist_outbox_update_with_db(
831 Arc::clone(db),
832 request_id.clone(),
833 TransactionStatus::Failed,
834 Some(primary_tx_hash.clone()),
835 Some(error.clone()),
836 )
837 .await;
838 }
839 send_update_via_sender(
840 &sender,
841 request_id.clone(),
842 TransactionStatus::Failed,
843 Some(primary_tx_hash.clone()),
844 Some(error),
845 );
846 }
847 Err(error) => {
848 send_update_via_sender(
849 &sender,
850 request_id.clone(),
851 TransactionStatus::Pending,
852 Some(primary_tx_hash.clone()),
853 Some(format!(
854 "pending transaction reconciliation failed: {:?}",
855 error
856 )),
857 );
858 tokio::time::sleep(PENDING_RECONCILIATION_RETRY_DELAY).await;
859 continue;
860 }
861 }
862 break;
863 }
864 });
865 }
866
867 fn spawn_pending_submitter_reconciliation(
868 &self,
869 submitter: SubmitterId,
870 nonce: SubmittedNonce,
871 tx_hashes: Vec<String>,
872 ) {
873 let tx_relayer = Arc::clone(&self.tx_relayer);
874 let submitter_store = self.submitter_store.clone();
875
876 tokio::spawn(async move {
877 loop {
878 match tx_relayer.reconcile_pending_transactions(&tx_hashes).await {
879 Ok(PendingTransactionStatus::Unsupported) => {}
880 Ok(PendingTransactionStatus::Pending(reason)) => {
881 tracing::debug!(
882 %submitter,
883 %nonce,
884 %reason,
885 "submitter submission still pending"
886 );
887 tokio::time::sleep(PENDING_RECONCILIATION_RETRY_DELAY).await;
888 continue;
889 }
890 Ok(PendingTransactionStatus::Confirmed(confirmed_tx_hash)) => {
891 if let Some(submitter_store) = submitter_store.as_ref() {
892 Self::persist_submitter_status_with_db(
893 Arc::clone(submitter_store),
894 submitter,
895 nonce,
896 SubmissionStatus::Confirmed,
897 Some(confirmed_tx_hash),
898 None,
899 )
900 .await;
901 }
902 }
903 Ok(PendingTransactionStatus::Failed(error)) => {
904 if let Some(submitter_store) = submitter_store.as_ref() {
905 Self::persist_submitter_status_with_db(
906 Arc::clone(submitter_store),
907 submitter,
908 nonce,
909 SubmissionStatus::Failed,
910 None,
911 Some(error),
912 )
913 .await;
914 }
915 }
916 Err(error) => {
917 tracing::warn!(
918 %submitter,
919 %nonce,
920 ?error,
921 "submitter pending reconciliation failed"
922 );
923 tokio::time::sleep(PENDING_RECONCILIATION_RETRY_DELAY).await;
924 continue;
925 }
926 }
927 break;
928 }
929 });
930 }
931}
932
933fn send_update_via_sender(
934 sender: &mpsc::UnboundedSender<EngineMessage>,
935 request_id: String,
936 status: TransactionStatus,
937 tx_hash: Option<String>,
938 error: Option<String>,
939) {
940 let update = TransactionUpdate {
941 request_id,
942 status,
943 tx_hash,
944 error,
945 timestamp: timestamp_millis(),
946 gas_used: None,
947 gas_price: None,
948 };
949
950 debug!("Sending transaction update: {:?}", update);
951
952 if let Err(send_error) = sender.send(EngineMessage::TransactionUpdate(update)) {
953 error!("Failed to send transaction update: {}", send_error);
954 }
955}
956
/// Returns `true` when the relay error text reports a reverted transaction
/// from the direct or the AWS KMS submitter. The match is a case-sensitive
/// substring test.
fn is_terminal_relay_error(error: &str) -> bool {
    const TERMINAL_MARKERS: [&str; 2] = [
        "direct submitter transaction reverted",
        "aws_kms submitter transaction reverted",
    ];

    TERMINAL_MARKERS
        .iter()
        .any(|marker| error.contains(*marker))
}
961
/// Returns the current wall-clock time as milliseconds since the Unix epoch.
///
/// The `u128` millisecond count is converted with an explicit saturating
/// conversion instead of a truncating `as` cast, so an out-of-range value
/// clamps to `u64::MAX` rather than silently wrapping.
///
/// # Panics
/// Panics if the system clock reports a time before the Unix epoch.
fn timestamp_millis() -> u64 {
    let since_epoch = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .expect("system clock must not be before Unix epoch");
    u64::try_from(since_epoch.as_millis()).unwrap_or(u64::MAX)
}
968
969#[cfg(test)]
970mod tests {
971 use super::*;
972 use alloy::primitives::{keccak256, Address, TxHash, U256};
973 use alloy::sol_types::SolCall;
974 use async_trait::async_trait;
975 use hypercall_transaction_submitter_db::{
976 PendingSubmissionRow, SubmissionDetailRow, TransactionSubmitterReader,
977 };
978 use hypercall_types::topics::TOPIC_TRANSACTION_UPDATES;
979 use hypercall_types::TransactionType;
980 use hypercall_types::WalletAddress;
981 use std::collections::HashMap;
982 use std::str::FromStr;
983 use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
984 use tokio::sync::{Mutex, Notify};
985 use tokio::time::{timeout, Duration};
986
    /// Fixed 20-byte hex address used as the account in test requests.
    const ACCOUNT: &str = "0x1111111111111111111111111111111111111111";
988
    /// Scripted outcome of `CapturingTxRelayer::send_transaction`.
    enum RelayerBehavior {
        /// Returns a confirmed submission.
        Success,
        /// Returns an unconfirmed submission.
        Pending,
        /// Returns a generic "forced failure" error.
        Fail,
        /// Returns an error whose text matches the direct-submitter revert marker.
        Reverted,
    }
995
    /// Scripted outcome of `CapturingTxRelayer::reconcile_pending_transaction`.
    #[derive(Clone)]
    // NOTE(review): presumably some variants are not constructed by every test,
    // hence the lint suppression — confirm against the tests below.
    #[allow(dead_code)]
    enum PendingReconciliationBehavior {
        /// Reports `PendingTransactionStatus::Unsupported`.
        Unsupported,
        /// Reports `PendingTransactionStatus::Pending` with the given reason.
        StillPending(&'static str),
        /// Reports `PendingTransactionStatus::Confirmed` with the queried hash.
        Confirmed,
        /// Reports `PendingTransactionStatus::Failed` with the given error text.
        Failed(&'static str),
    }
1004
    /// Shared, lock-protected list of contract calls captured by the test relayers.
    type SharedRequests = Arc<Mutex<Vec<ContractCall>>>;
1006
1007 struct MockEventBus {
1008 sender: mpsc::UnboundedSender<EngineMessage>,
1009 receiver: Mutex<Option<mpsc::UnboundedReceiver<EngineMessage>>>,
1010 subscribers: Mutex<Vec<(Vec<String>, mpsc::UnboundedSender<EngineMessage>)>>,
1011 }
1012
1013 impl MockEventBus {
1014 fn new() -> Self {
1015 let (sender, receiver) = mpsc::unbounded_channel();
1016 Self {
1017 sender,
1018 receiver: Mutex::new(Some(receiver)),
1019 subscribers: Mutex::new(Vec::new()),
1020 }
1021 }
1022
1023 async fn start_processing(self: Arc<Self>) {
1024 let Some(mut receiver) = self.receiver.lock().await.take() else {
1025 return;
1026 };
1027 tokio::spawn(async move {
1028 while let Some(message) = receiver.recv().await {
1029 let topic = match &message {
1030 EngineMessage::TransactionRequest(_) => TOPIC_TRANSACTION_REQUESTS,
1031 EngineMessage::TransactionUpdate(_) => TOPIC_TRANSACTION_UPDATES,
1032 _ => continue,
1033 };
1034 let subscribers = self.subscribers.lock().await;
1035 for (topics, sender) in subscribers.iter() {
1036 if topics.iter().any(|candidate| candidate == topic) {
1037 let _ = sender.send(message.clone());
1038 }
1039 }
1040 }
1041 });
1042 }
1043 }
1044
1045 #[async_trait]
1046 impl EventBusTrait for MockEventBus {
1047 fn get_sender(&self) -> mpsc::UnboundedSender<EngineMessage> {
1048 self.sender.clone()
1049 }
1050
1051 async fn subscribe(
1052 &self,
1053 topics: Vec<String>,
1054 ) -> Result<mpsc::UnboundedReceiver<EngineMessage>, String> {
1055 let (sender, receiver) = mpsc::unbounded_channel();
1056 self.subscribers.lock().await.push((topics, sender));
1057 Ok(receiver)
1058 }
1059 }
1060
1061 struct CapturingTxRelayer {
1062 behavior: RelayerBehavior,
1063 pending_reconciliation: PendingReconciliationBehavior,
1064 requests: SharedRequests,
1065 }
1066
1067 impl CapturingTxRelayer {
1068 fn with_shared_capture(
1069 behavior: RelayerBehavior,
1070 pending_reconciliation: PendingReconciliationBehavior,
1071 requests: SharedRequests,
1072 ) -> Self {
1073 Self {
1074 behavior,
1075 pending_reconciliation,
1076 requests,
1077 }
1078 }
1079 }
1080
1081 #[async_trait]
1082 impl TxRelayerTrait for CapturingTxRelayer {
1083 fn submitter_address(&self) -> SubmitterId {
1084 Address::repeat_byte(0x42)
1085 }
1086
1087 async fn send_transaction(
1088 &self,
1089 request: ContractCall,
1090 db_nonce_floor: Option<u64>,
1091 ) -> EyreResult<SubmittedTransaction> {
1092 self.requests.lock().await.push(request);
1093 let nonce = db_nonce_floor.unwrap_or(7);
1094
1095 match self.behavior {
1096 RelayerBehavior::Success => Ok(SubmittedTransaction::new(
1097 self.submitter_address(),
1098 nonce,
1099 TxHash::from_str(
1100 "0xaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa",
1101 )
1102 .expect("valid tx hash")
1103 .to_string(),
1104 true,
1105 )),
1106 RelayerBehavior::Pending => Ok(SubmittedTransaction::new(
1107 self.submitter_address(),
1108 nonce,
1109 TxHash::from_str(
1110 "0xbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb",
1111 )
1112 .expect("valid tx hash")
1113 .to_string(),
1114 false,
1115 )),
1116 RelayerBehavior::Fail => Err(eyre::eyre!("forced failure")),
1117 RelayerBehavior::Reverted => Err(eyre::eyre!(
1118 "direct submitter transaction reverted: hash=0xdddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddd"
1119 )),
1120 }
1121 }
1122
1123 async fn reconcile_pending_transaction(
1124 &self,
1125 _tx_hash: &str,
1126 ) -> EyreResult<PendingTransactionStatus> {
1127 Ok(match self.pending_reconciliation {
1128 PendingReconciliationBehavior::Unsupported => PendingTransactionStatus::Unsupported,
1129 PendingReconciliationBehavior::StillPending(reason) => {
1130 PendingTransactionStatus::Pending(reason.to_string())
1131 }
1132 PendingReconciliationBehavior::Confirmed => {
1133 PendingTransactionStatus::Confirmed(_tx_hash.to_string())
1134 }
1135 PendingReconciliationBehavior::Failed(error) => {
1136 PendingTransactionStatus::Failed(error.to_string())
1137 }
1138 })
1139 }
1140 }
1141
1142 async fn take_requests(shared_requests: &SharedRequests) -> Vec<ContractCall> {
1143 shared_requests.lock().await.clone()
1144 }
1145
    /// Test relayer whose reconciliation blocks until the test releases it.
    struct BlockingPendingRelayer {
        /// Notified each time reconciliation is entered.
        started: Arc<Notify>,
        /// Reconciliation waits on this before reporting `Confirmed`.
        release: Arc<Notify>,
        /// Shared log of the contract calls that were sent.
        requests: SharedRequests,
    }
1151
1152 #[async_trait]
1153 impl TxRelayerTrait for BlockingPendingRelayer {
1154 fn submitter_address(&self) -> SubmitterId {
1155 Address::repeat_byte(0x43)
1156 }
1157
1158 async fn send_transaction(
1159 &self,
1160 request: ContractCall,
1161 db_nonce_floor: Option<u64>,
1162 ) -> EyreResult<SubmittedTransaction> {
1163 self.requests.lock().await.push(request);
1164 Ok(SubmittedTransaction::new(
1165 self.submitter_address(),
1166 db_nonce_floor.unwrap_or(8),
1167 TxHash::from_str(
1168 "0xcccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccc",
1169 )
1170 .expect("valid tx hash")
1171 .to_string(),
1172 false,
1173 ))
1174 }
1175
1176 async fn reconcile_pending_transaction(
1177 &self,
1178 _tx_hash: &str,
1179 ) -> EyreResult<PendingTransactionStatus> {
1180 self.started.notify_one();
1181 self.release.notified().await;
1182 Ok(PendingTransactionStatus::Confirmed(_tx_hash.to_string()))
1183 }
1184 }
1185
    /// Test relayer that reports several attempt hashes for one submission and
    /// confirms a preset hash on reconciliation.
    struct ReplacementPendingRelayer {
        /// Shared log of the contract calls that were sent.
        requests: SharedRequests,
        /// Attempt hashes returned from `send_transaction` and expected back,
        /// in the same order, by `reconcile_pending_transactions`.
        expected_attempts: Vec<String>,
        /// Hash reported as confirmed by `reconcile_pending_transactions`.
        confirmed_hash: String,
    }
1191
1192 #[async_trait]
1193 impl TxRelayerTrait for ReplacementPendingRelayer {
1194 fn submitter_address(&self) -> SubmitterId {
1195 Address::repeat_byte(0x44)
1196 }
1197
1198 async fn send_transaction(
1199 &self,
1200 request: ContractCall,
1201 db_nonce_floor: Option<u64>,
1202 ) -> EyreResult<SubmittedTransaction> {
1203 self.requests.lock().await.push(request);
1204 Ok(SubmittedTransaction::with_hashes(
1205 self.submitter_address(),
1206 db_nonce_floor.unwrap_or(9),
1207 self.expected_attempts
1208 .last()
1209 .expect("test attempts should not be empty")
1210 .clone(),
1211 self.expected_attempts.clone(),
1212 false,
1213 ))
1214 }
1215
1216 async fn reconcile_pending_transactions(
1217 &self,
1218 tx_hashes: &[String],
1219 ) -> EyreResult<PendingTransactionStatus> {
1220 assert_eq!(tx_hashes, self.expected_attempts.as_slice());
1221 Ok(PendingTransactionStatus::Confirmed(
1222 self.confirmed_hash.clone(),
1223 ))
1224 }
1225 }
1226
/// Test relayer whose first send parks until released and then fails,
/// while counting nonce selections and sends.
struct BlockingFirstSendRelayer {
    /// Number of `select_next_nonce` calls observed so far.
    select_count: AtomicU64,
    /// Number of `send_transaction_with_nonce` calls observed so far.
    send_count: AtomicU64,
    /// Notified when the first send begins.
    first_send_started: Notify,
    /// Notified when the second nonce selection happens.
    second_select_started: Notify,
    /// Awaited by the first send before it returns its forced error.
    release_first_send: Notify,
}
1234
1235 impl BlockingFirstSendRelayer {
1236 fn new() -> Self {
1237 Self {
1238 select_count: AtomicU64::new(0),
1239 send_count: AtomicU64::new(0),
1240 first_send_started: Notify::new(),
1241 second_select_started: Notify::new(),
1242 release_first_send: Notify::new(),
1243 }
1244 }
1245 }
1246
/// Test relayer that hands a signed attempt to the recorder before
/// reporting the submission.
struct RecorderOrderingRelayer {
    /// Set to `true` once the recorder has accepted the signed attempt.
    recorded_before_send: Arc<AtomicBool>,
}
1250
1251 #[async_trait]
1252 impl TxRelayerTrait for RecorderOrderingRelayer {
1253 fn submitter_address(&self) -> SubmitterId {
1254 Address::repeat_byte(0x46)
1255 }
1256
1257 async fn send_transaction_with_nonce_recording_attempts(
1258 &self,
1259 _request: ContractCall,
1260 nonce: SubmittedNonce,
1261 recorder: Arc<dyn SignedAttemptRecorder>,
1262 ) -> EyreResult<SubmittedTransaction> {
1263 let tx_hash = TxHash::from([0x46; 32]).to_string();
1264 recorder
1265 .record_signed_attempt(SignedTransactionAttempt {
1266 submitter: self.submitter_address(),
1267 nonce,
1268 tx_hash: tx_hash.clone(),
1269 raw_tx: b"raw-signed-test-tx".to_vec(),
1270 })
1271 .await?;
1272 self.recorded_before_send.store(true, Ordering::SeqCst);
1273 Ok(SubmittedTransaction::new(
1274 self.submitter_address(),
1275 nonce,
1276 tx_hash,
1277 false,
1278 ))
1279 }
1280 }
1281
/// In-memory stand-in for the submitter store, used by tests.
#[derive(Default)]
struct InMemorySubmitterStore {
    /// Submission rows keyed by `(submitter, nonce)`.
    rows: Mutex<HashMap<(SubmitterId, SubmittedNonce), SubmissionDetailRow>>,
}
1286
1287 #[async_trait]
1288 impl TransactionSubmitterReader for InMemorySubmitterStore {
1289 async fn get_submission_by_nonce(
1290 &self,
1291 submitter: &SubmitterId,
1292 nonce: SubmittedNonce,
1293 ) -> anyhow::Result<Option<SubmissionDetailRow>> {
1294 Ok(self.rows.lock().await.get(&(*submitter, nonce)).cloned())
1295 }
1296 }
1297
1298 #[async_trait]
1299 impl AsyncTransactionSubmitterStore for InMemorySubmitterStore {
1300 async fn max_nonce_for_submitter(
1301 &self,
1302 submitter: &SubmitterId,
1303 ) -> anyhow::Result<Option<u64>> {
1304 Ok(self
1305 .rows
1306 .lock()
1307 .await
1308 .iter()
1309 .filter(|((row_submitter, _), row)| {
1310 row_submitter == submitter && row.primary_tx_hash.is_some()
1311 })
1312 .map(|((_, nonce), _)| *nonce)
1313 .max())
1314 }
1315
1316 async fn record_submission(
1317 &self,
1318 record: &SubmissionRecord,
1319 attempts: &[SubmissionAttemptRecord],
1320 ) -> anyhow::Result<()> {
1321 let mut rows = self.rows.lock().await;
1322 let row = rows
1323 .entry((record.submitter, record.nonce))
1324 .or_insert_with(|| SubmissionDetailRow {
1325 submitter: record.submitter,
1326 nonce: record.nonce,
1327 status: record.status,
1328 primary_tx_hash: record.primary_tx_hash.clone(),
1329 terminal_error: record.terminal_error.clone(),
1330 attempts: Vec::new(),
1331 });
1332 row.status = record.status;
1333 if record.primary_tx_hash.is_some() {
1334 row.primary_tx_hash = record.primary_tx_hash.clone();
1335 }
1336 row.terminal_error = record.terminal_error.clone();
1337 for attempt in attempts {
1338 if !row
1339 .attempts
1340 .iter()
1341 .any(|existing| existing.tx_hash == attempt.tx_hash)
1342 {
1343 row.attempts.push(attempt.clone());
1344 }
1345 }
1346 Ok(())
1347 }
1348
1349 async fn update_submission_status(
1350 &self,
1351 submitter: &SubmitterId,
1352 nonce: SubmittedNonce,
1353 status: SubmissionStatus,
1354 primary_tx_hash: Option<&str>,
1355 terminal_error: Option<&str>,
1356 ) -> anyhow::Result<()> {
1357 let mut rows = self.rows.lock().await;
1358 let row = rows
1359 .get_mut(&(*submitter, nonce))
1360 .ok_or_else(|| anyhow::anyhow!("missing submission"))?;
1361 row.status = status;
1362 if let Some(primary_tx_hash) = primary_tx_hash {
1363 row.primary_tx_hash = Some(primary_tx_hash.to_string());
1364 }
1365 row.terminal_error = terminal_error.map(str::to_string);
1366 Ok(())
1367 }
1368
1369 async fn list_pending_submissions(
1370 &self,
1371 _after_submission_id: i64,
1372 _limit: i64,
1373 ) -> anyhow::Result<Vec<PendingSubmissionRow>> {
1374 Ok(Vec::new())
1375 }
1376 }
1377
1378 #[async_trait]
1379 impl TxRelayerTrait for BlockingFirstSendRelayer {
1380 fn submitter_address(&self) -> SubmitterId {
1381 Address::repeat_byte(0x45)
1382 }
1383
1384 async fn select_next_nonce(
1385 &self,
1386 db_nonce_floor: Option<u64>,
1387 ) -> EyreResult<SubmittedNonce> {
1388 let select_count = self.select_count.fetch_add(1, Ordering::SeqCst) + 1;
1389 if select_count == 2 {
1390 self.second_select_started.notify_one();
1391 }
1392 Ok(db_nonce_floor.unwrap_or(21))
1393 }
1394
1395 async fn send_transaction_with_nonce(
1396 &self,
1397 _request: ContractCall,
1398 nonce: SubmittedNonce,
1399 ) -> EyreResult<SubmittedTransaction> {
1400 let send_count = self.send_count.fetch_add(1, Ordering::SeqCst) + 1;
1401 if send_count == 1 {
1402 self.first_send_started.notify_one();
1403 self.release_first_send.notified().await;
1404 return Err(eyre::eyre!("forced pre-broadcast failure"));
1405 }
1406
1407 Ok(SubmittedTransaction::new(
1408 self.submitter_address(),
1409 nonce,
1410 TxHash::from([0x45; 32]).to_string(),
1411 true,
1412 ))
1413 }
1414 }
1415
/// Fixed three-byte directive payload used by the RSM request fixtures.
fn rsm_directive_bytes() -> Vec<u8> {
    const DIRECTIVE: [u8; 3] = [0x11, 0x22, 0x33];
    DIRECTIVE.to_vec()
}
1419
1420 fn build_rsm_tx_request(
1421 request_id: &str,
1422 signature: String,
1423 expires_at: u64,
1424 ) -> TransactionRequest {
1425 let account = WalletAddress::from_str(ACCOUNT).expect("valid account");
1426 TransactionRequest {
1427 request_id: request_id.to_string(),
1428 wallet_address: account,
1429 account_contract: account,
1430 transaction_type: TransactionType::RsmDirective(SignedDirectiveTx {
1431 directive: rsm_directive_bytes(),
1432 signature,
1433 }),
1434 timestamp: timestamp_millis(),
1435 expires_at,
1436 }
1437 }
1438
1439 fn contract_call_for_test(external_id: &str) -> ContractCall {
1440 ContractCall {
1441 to: test_exchange_address(),
1442 value: U256::ZERO,
1443 data: Bytes::from(vec![0x12, 0x34]),
1444 external_id: Some(external_id.to_string()),
1445 }
1446 }
1447
1448 async fn recv_tx_update(
1449 rx: &mut tokio::sync::mpsc::UnboundedReceiver<EngineMessage>,
1450 ) -> TransactionUpdate {
1451 let message = timeout(Duration::from_secs(1), rx.recv())
1452 .await
1453 .expect("should receive tx update within timeout")
1454 .expect("channel should remain open");
1455 match message {
1456 EngineMessage::TransactionUpdate(update) => update,
1457 _ => panic!("expected TransactionUpdate"),
1458 }
1459 }
1460
1461 fn valid_signature_hex() -> String {
1462 format!("0x{}", hex::encode([0xAB; 65]))
1463 }
1464
/// Raw signature fixture: 65 bytes, each `0xAB`.
fn valid_signature_bytes() -> Vec<u8> {
    const SIGNATURE: [u8; 65] = [0xAB; 65];
    SIGNATURE.to_vec()
}
1468
1469 fn test_exchange_address() -> Address {
1470 Address::from_str("0x1d70Ff185F6C25E4d76163F16563D63d5b590Cbc")
1471 .expect("valid exchange address")
1472 }
1473
1474 #[tokio::test]
1475 async fn transaction_submitter_rsm_expired_emits_expired_only() {
1476 let event_bus = Arc::new(MockEventBus::new());
1477 event_bus.clone().start_processing().await;
1478 let mut updates_rx = event_bus
1479 .subscribe(vec![TOPIC_TRANSACTION_UPDATES.to_string()])
1480 .await
1481 .expect("subscribe should succeed");
1482
1483 let shared_requests: SharedRequests = Arc::new(Mutex::new(Vec::new()));
1484 let submitter = TransactionSubmitter {
1485 event_bus: event_bus.clone(),
1486 tx_relayer: Arc::new(CapturingTxRelayer::with_shared_capture(
1487 RelayerBehavior::Success,
1488 PendingReconciliationBehavior::Unsupported,
1489 shared_requests.clone(),
1490 )),
1491 exchange_address: test_exchange_address(),
1492 db: None,
1493 submitter_store: None,
1494 nonce_lock: Arc::new(Mutex::new(())),
1495 };
1496
1497 let now = timestamp_millis();
1498 let tx_req =
1499 build_rsm_tx_request("expired-rsm", valid_signature_hex(), now.saturating_sub(1));
1500 submitter.process_transaction_request(tx_req).await;
1501
1502 let first = recv_tx_update(&mut updates_rx).await;
1503 assert_eq!(first.request_id, "expired-rsm");
1504 assert_eq!(first.status, TransactionStatus::Expired);
1505 assert_eq!(
1506 first.error.as_deref(),
1507 Some("Transaction request expired"),
1508 "expired message should be explicit"
1509 );
1510 assert!(
1511 timeout(Duration::from_millis(100), updates_rx.recv())
1512 .await
1513 .is_err(),
1514 "expired path should emit exactly one update"
1515 );
1516
1517 let requests = take_requests(&shared_requests).await;
1518 assert!(
1519 requests.is_empty(),
1520 "expired requests must not be sent to relayer"
1521 );
1522 }
1523
1524 #[tokio::test]
1525 async fn transaction_submitter_rsm_success_emits_confirmed_after_relay() {
1526 let event_bus = Arc::new(MockEventBus::new());
1527 event_bus.clone().start_processing().await;
1528 let mut updates_rx = event_bus
1529 .subscribe(vec![TOPIC_TRANSACTION_UPDATES.to_string()])
1530 .await
1531 .expect("subscribe should succeed");
1532
1533 let shared_requests: SharedRequests = Arc::new(Mutex::new(Vec::new()));
1534 let submitter = TransactionSubmitter {
1535 event_bus: event_bus.clone(),
1536 tx_relayer: Arc::new(CapturingTxRelayer::with_shared_capture(
1537 RelayerBehavior::Success,
1538 PendingReconciliationBehavior::Unsupported,
1539 shared_requests.clone(),
1540 )),
1541 exchange_address: test_exchange_address(),
1542 db: None,
1543 submitter_store: None,
1544 nonce_lock: Arc::new(Mutex::new(())),
1545 };
1546
1547 let now = timestamp_millis();
1548 let expected_directive = rsm_directive_bytes();
1549 let expected_signature_bytes = valid_signature_bytes();
1550 let tx_req = build_rsm_tx_request("success-rsm", valid_signature_hex(), now + 60_000);
1551 submitter.process_transaction_request(tx_req).await;
1552
1553 let confirmed = recv_tx_update(&mut updates_rx).await;
1554 assert_eq!(confirmed.request_id, "success-rsm");
1555 assert_eq!(confirmed.status, TransactionStatus::Confirmed);
1556 assert_eq!(
1557 confirmed.tx_hash.as_deref(),
1558 Some("0xaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa")
1559 );
1560
1561 let requests = take_requests(&shared_requests).await;
1562 assert_eq!(
1563 requests.len(),
1564 1,
1565 "exactly one relay request should be sent"
1566 );
1567 assert_eq!(requests[0].external_id.as_deref(), Some("success-rsm"));
1568
1569 let to: Address = requests[0].to.into();
1570 assert_eq!(to, test_exchange_address());
1571
1572 let calldata: Bytes = requests[0].data.clone().into();
1573 let expected_selector = keccak256("executeRsmDirective(bytes,bytes)".as_bytes());
1574 assert_eq!(&calldata[..4], &expected_selector[..4]);
1575
1576 let decoded_call = super::exchange_encoder::Exchange::executeRsmDirectiveCall::abi_decode(
1577 calldata.as_ref(),
1578 )
1579 .expect("calldata should decode as executeRsmDirectiveCall");
1580 assert_eq!(decoded_call.directive, Bytes::from(expected_directive));
1581 assert_eq!(
1582 decoded_call.signature,
1583 Bytes::from(expected_signature_bytes)
1584 );
1585 }
1586
1587 #[tokio::test]
1588 async fn transaction_submitter_rsm_relayer_error_leaves_outbox_retryable() {
1589 let event_bus = Arc::new(MockEventBus::new());
1590 event_bus.clone().start_processing().await;
1591 let mut updates_rx = event_bus
1592 .subscribe(vec![TOPIC_TRANSACTION_UPDATES.to_string()])
1593 .await
1594 .expect("subscribe should succeed");
1595
1596 let shared_requests: SharedRequests = Arc::new(Mutex::new(Vec::new()));
1597 let submitter = TransactionSubmitter {
1598 event_bus: event_bus.clone(),
1599 tx_relayer: Arc::new(CapturingTxRelayer::with_shared_capture(
1600 RelayerBehavior::Fail,
1601 PendingReconciliationBehavior::Unsupported,
1602 shared_requests.clone(),
1603 )),
1604 exchange_address: test_exchange_address(),
1605 db: None,
1606 submitter_store: None,
1607 nonce_lock: Arc::new(Mutex::new(())),
1608 };
1609
1610 let now = timestamp_millis();
1611 let tx_req = build_rsm_tx_request("failed-rsm", valid_signature_hex(), now + 60_000);
1612 submitter.process_transaction_request(tx_req).await;
1613
1614 let retryable = recv_tx_update(&mut updates_rx).await;
1615 assert_eq!(retryable.request_id, "failed-rsm");
1616 assert_eq!(retryable.status, TransactionStatus::Submitted);
1617 assert!(retryable.tx_hash.is_none());
1618 assert!(
1619 retryable
1620 .error
1621 .as_deref()
1622 .unwrap_or_default()
1623 .contains("forced failure"),
1624 "relay transport error should be recorded on a retryable nonterminal update: {retryable:?}"
1625 );
1626
1627 let requests = take_requests(&shared_requests).await;
1628 assert_eq!(
1629 requests.len(),
1630 1,
1631 "relay request should have been attempted"
1632 );
1633 }
1634
1635 #[tokio::test]
1636 async fn transaction_submitter_mined_revert_emits_terminal_failed_update() {
1637 let event_bus = Arc::new(MockEventBus::new());
1638 event_bus.clone().start_processing().await;
1639 let mut updates_rx = event_bus
1640 .subscribe(vec![TOPIC_TRANSACTION_UPDATES.to_string()])
1641 .await
1642 .expect("subscribe should succeed");
1643
1644 let shared_requests: SharedRequests = Arc::new(Mutex::new(Vec::new()));
1645 let submitter = TransactionSubmitter {
1646 event_bus: event_bus.clone(),
1647 tx_relayer: Arc::new(CapturingTxRelayer::with_shared_capture(
1648 RelayerBehavior::Reverted,
1649 PendingReconciliationBehavior::Unsupported,
1650 shared_requests.clone(),
1651 )),
1652 exchange_address: test_exchange_address(),
1653 db: None,
1654 submitter_store: None,
1655 nonce_lock: Arc::new(Mutex::new(())),
1656 };
1657
1658 let now = timestamp_millis();
1659 let tx_req = build_rsm_tx_request("reverted-rsm", valid_signature_hex(), now + 60_000);
1660 submitter.process_transaction_request(tx_req).await;
1661
1662 let failed = recv_tx_update(&mut updates_rx).await;
1663 assert_eq!(failed.request_id, "reverted-rsm");
1664 assert_eq!(failed.status, TransactionStatus::Failed);
1665 assert!(failed.tx_hash.is_none());
1666 assert!(
1667 failed
1668 .error
1669 .as_deref()
1670 .unwrap_or_default()
1671 .contains("direct submitter transaction reverted"),
1672 "mined reverts must be terminal failures for manual reconciliation: {failed:?}"
1673 );
1674
1675 let requests = take_requests(&shared_requests).await;
1676 assert_eq!(
1677 requests.len(),
1678 1,
1679 "relay request should have been attempted"
1680 );
1681 }
1682
1683 #[tokio::test]
1684 async fn transaction_submitter_pending_relayer_emits_pending_after_relay() {
1685 let event_bus = Arc::new(MockEventBus::new());
1686 event_bus.clone().start_processing().await;
1687 let mut updates_rx = event_bus
1688 .subscribe(vec![TOPIC_TRANSACTION_UPDATES.to_string()])
1689 .await
1690 .expect("subscribe should succeed");
1691
1692 let shared_requests: SharedRequests = Arc::new(Mutex::new(Vec::new()));
1693 let submitter = TransactionSubmitter {
1694 event_bus: event_bus.clone(),
1695 tx_relayer: Arc::new(CapturingTxRelayer::with_shared_capture(
1696 RelayerBehavior::Pending,
1697 PendingReconciliationBehavior::Unsupported,
1698 shared_requests.clone(),
1699 )),
1700 exchange_address: test_exchange_address(),
1701 db: None,
1702 submitter_store: None,
1703 nonce_lock: Arc::new(Mutex::new(())),
1704 };
1705
1706 let now = timestamp_millis();
1707 let tx_req = build_rsm_tx_request("pending-rsm", valid_signature_hex(), now + 60_000);
1708 submitter.process_transaction_request(tx_req).await;
1709
1710 let pending = recv_tx_update(&mut updates_rx).await;
1711 assert_eq!(pending.request_id, "pending-rsm");
1712 assert_eq!(pending.status, TransactionStatus::Pending);
1713 assert_eq!(
1714 pending.tx_hash.as_deref(),
1715 Some("0xbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb")
1716 );
1717 }
1718
1719 #[tokio::test]
1720 async fn transaction_submitter_pending_relayer_with_reconciliation_timeout_stays_pending() {
1721 let event_bus = Arc::new(MockEventBus::new());
1722 event_bus.clone().start_processing().await;
1723 let mut updates_rx = event_bus
1724 .subscribe(vec![TOPIC_TRANSACTION_UPDATES.to_string()])
1725 .await
1726 .expect("subscribe should succeed");
1727
1728 let shared_requests: SharedRequests = Arc::new(Mutex::new(Vec::new()));
1729 let submitter = TransactionSubmitter {
1730 event_bus: event_bus.clone(),
1731 tx_relayer: Arc::new(CapturingTxRelayer::with_shared_capture(
1732 RelayerBehavior::Pending,
1733 PendingReconciliationBehavior::StillPending("timed out waiting for receipt"),
1734 shared_requests.clone(),
1735 )),
1736 exchange_address: test_exchange_address(),
1737 db: None,
1738 submitter_store: None,
1739 nonce_lock: Arc::new(Mutex::new(())),
1740 };
1741
1742 let now = timestamp_millis();
1743 let tx_req = build_rsm_tx_request(
1744 "pending-reconciled-rsm",
1745 valid_signature_hex(),
1746 now + 60_000,
1747 );
1748 submitter.process_transaction_request(tx_req).await;
1749
1750 let pending = recv_tx_update(&mut updates_rx).await;
1751 assert_eq!(pending.request_id, "pending-reconciled-rsm");
1752 assert_eq!(pending.status, TransactionStatus::Pending);
1753
1754 let still_pending = recv_tx_update(&mut updates_rx).await;
1755 assert_eq!(still_pending.request_id, "pending-reconciled-rsm");
1756 assert_eq!(still_pending.status, TransactionStatus::Pending);
1757 assert_eq!(
1758 still_pending.error.as_deref(),
1759 Some("timed out waiting for receipt")
1760 );
1761 assert_eq!(
1762 still_pending.tx_hash.as_deref(),
1763 Some("0xbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb")
1764 );
1765 }
1766
/// Checks that `process_transaction_request` returns while reconciliation is
/// still blocked, and that the `Confirmed` update only appears once the
/// relayer's reconciliation is released.
#[tokio::test]
async fn transaction_submitter_pending_reconciliation_does_not_block_hot_path() {
    let event_bus = Arc::new(MockEventBus::new());
    event_bus.clone().start_processing().await;
    let mut updates_rx = event_bus
        .subscribe(vec![TOPIC_TRANSACTION_UPDATES.to_string()])
        .await
        .expect("subscribe should succeed");

    let shared_requests: SharedRequests = Arc::new(Mutex::new(Vec::new()));
    // `started` is signalled by the relayer when reconciliation begins;
    // `release` is what the relayer waits on before confirming.
    let started = Arc::new(Notify::new());
    let release = Arc::new(Notify::new());
    let submitter = TransactionSubmitter {
        event_bus: event_bus.clone(),
        tx_relayer: Arc::new(BlockingPendingRelayer {
            started: Arc::clone(&started),
            release: Arc::clone(&release),
            requests: shared_requests.clone(),
        }),
        exchange_address: test_exchange_address(),
        db: None,
        submitter_store: None,
        nonce_lock: Arc::new(Mutex::new(())),
    };

    let now = timestamp_millis();
    let tx_req = build_rsm_tx_request("pending-hot-path", valid_signature_hex(), now + 60_000);
    // Run the request on its own task so its completion can be awaited with
    // a timeout below.
    let handle = tokio::spawn(async move {
        submitter.process_transaction_request(tx_req).await;
    });

    let pending = recv_tx_update(&mut updates_rx).await;
    assert_eq!(pending.request_id, "pending-hot-path");
    assert_eq!(pending.status, TransactionStatus::Pending);

    // Reconciliation must have started, and the request task must finish,
    // even though `release` has not been signalled yet.
    timeout(Duration::from_millis(200), started.notified())
        .await
        .expect("reconciliation task should start in background");
    timeout(Duration::from_millis(200), handle)
        .await
        .expect("hot path should return before reconciliation completes")
        .expect("transaction task should complete successfully");

    // While reconciliation is still blocked, no further update may arrive.
    assert!(
        timeout(Duration::from_millis(100), updates_rx.recv())
            .await
            .is_err(),
        "confirmed update should wait for reconciliation completion"
    );

    // Unblock the relayer's reconciliation.
    release.notify_one();

    let confirmed = recv_tx_update(&mut updates_rx).await;
    assert_eq!(confirmed.request_id, "pending-hot-path");
    assert_eq!(confirmed.status, TransactionStatus::Confirmed);
    assert_eq!(
        confirmed.tx_hash.as_deref(),
        Some("0xcccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccc")
    );
}
1827
/// Checks that a second `submit_contract_call` does not select a nonce while
/// the first one is still inside its send, and that after the first send
/// fails the second submit succeeds with nonce `21`.
#[tokio::test]
async fn transaction_submitter_serializes_nonce_selection_through_first_submit() {
    let event_bus = Arc::new(MockEventBus::new());
    event_bus.clone().start_processing().await;
    let relayer = Arc::new(BlockingFirstSendRelayer::new());
    let submitter = Arc::new(TransactionSubmitter {
        event_bus,
        tx_relayer: relayer.clone(),
        exchange_address: test_exchange_address(),
        db: None,
        submitter_store: None,
        nonce_lock: Arc::new(Mutex::new(())),
    });

    let first_submitter = submitter.clone();
    let first = tokio::spawn(async move {
        first_submitter
            .submit_contract_call("first", contract_call_for_test("first"))
            .await
    });

    // Wait until the first submit is parked inside the relayer's send.
    timeout(
        Duration::from_secs(1),
        relayer.first_send_started.notified(),
    )
    .await
    .expect("first submit should enter send path");

    let second_submitter = submitter.clone();
    let second = tokio::spawn(async move {
        second_submitter
            .submit_contract_call("second", contract_call_for_test("second"))
            .await
    });

    // The relayer signals `second_select_started` on its second nonce
    // selection; that must not happen while the first send is still parked.
    assert!(
        timeout(
            Duration::from_millis(100),
            relayer.second_select_started.notified()
        )
        .await
        .is_err(),
        "second submit must not allocate a nonce while first submit is undecided"
    );

    // Let the first send finish; the relayer makes it return an error.
    relayer.release_first_send.notify_one();

    let first_result = first.await.expect("first task should join");
    assert!(first_result.is_err());

    let second_result = second.await.expect("second task should join");
    assert_eq!(
        second_result.expect("second submit should succeed").nonce,
        21
    );
    assert_eq!(relayer.select_count.load(Ordering::SeqCst), 2);
}
1885
1886 #[tokio::test]
1887 async fn transaction_submitter_persists_signed_attempt_before_send() {
1888 let event_bus = Arc::new(MockEventBus::new());
1889 event_bus.clone().start_processing().await;
1890 let store = Arc::new(InMemorySubmitterStore::default());
1891 let recorded_before_send = Arc::new(AtomicBool::new(false));
1892 let submitter = TransactionSubmitter {
1893 event_bus,
1894 tx_relayer: Arc::new(RecorderOrderingRelayer {
1895 recorded_before_send: recorded_before_send.clone(),
1896 }),
1897 exchange_address: test_exchange_address(),
1898 db: None,
1899 submitter_store: Some(store.clone()),
1900 nonce_lock: Arc::new(Mutex::new(())),
1901 };
1902
1903 let submission = submitter
1904 .submit_contract_call(
1905 "record-before-send",
1906 contract_call_for_test("record-before-send"),
1907 )
1908 .await
1909 .expect("submission should stay pending");
1910
1911 assert!(
1912 recorded_before_send.load(Ordering::SeqCst),
1913 "relayer must not send until the recorder accepted the signed attempt"
1914 );
1915 let detail = store
1916 .get_submission_by_nonce(&Address::repeat_byte(0x46), submission.nonce)
1917 .await
1918 .expect("db read")
1919 .expect("submission exists");
1920 assert_eq!(detail.status, SubmissionStatus::Broadcasted);
1921 assert_eq!(
1922 detail.primary_tx_hash.as_deref(),
1923 Some(submission.hash.as_str())
1924 );
1925 assert_eq!(detail.attempts.len(), 1);
1926 assert_eq!(detail.attempts[0].raw_tx, b"raw-signed-test-tx");
1927 }
1928
1929 #[tokio::test]
1930 async fn transaction_submitter_reconciles_all_replacement_attempt_hashes() {
1931 let event_bus = Arc::new(MockEventBus::new());
1932 event_bus.clone().start_processing().await;
1933 let mut updates_rx = event_bus
1934 .subscribe(vec![TOPIC_TRANSACTION_UPDATES.to_string()])
1935 .await
1936 .expect("subscribe should succeed");
1937
1938 let attempts = vec![
1939 "0x1111111111111111111111111111111111111111111111111111111111111111".to_string(),
1940 "0x2222222222222222222222222222222222222222222222222222222222222222".to_string(),
1941 ];
1942 let shared_requests: SharedRequests = Arc::new(Mutex::new(Vec::new()));
1943 let submitter = TransactionSubmitter {
1944 event_bus: event_bus.clone(),
1945 tx_relayer: Arc::new(ReplacementPendingRelayer {
1946 requests: shared_requests.clone(),
1947 expected_attempts: attempts.clone(),
1948 confirmed_hash: attempts[0].clone(),
1949 }),
1950 exchange_address: test_exchange_address(),
1951 db: None,
1952 submitter_store: None,
1953 nonce_lock: Arc::new(Mutex::new(())),
1954 };
1955
1956 let now = timestamp_millis();
1957 let tx_req = build_rsm_tx_request(
1958 "replacement-attempts-rsm",
1959 valid_signature_hex(),
1960 now + 60_000,
1961 );
1962 submitter.process_transaction_request(tx_req).await;
1963
1964 let pending = recv_tx_update(&mut updates_rx).await;
1965 assert_eq!(pending.request_id, "replacement-attempts-rsm");
1966 assert_eq!(pending.status, TransactionStatus::Pending);
1967 assert_eq!(
1968 pending.tx_hash.as_deref(),
1969 Some(attempts[1].as_str()),
1970 "hot path should expose the newest replacement hash while reconciliation runs"
1971 );
1972
1973 let confirmed = recv_tx_update(&mut updates_rx).await;
1974 assert_eq!(confirmed.request_id, "replacement-attempts-rsm");
1975 assert_eq!(confirmed.status, TransactionStatus::Confirmed);
1976 assert_eq!(
1977 confirmed.tx_hash.as_deref(),
1978 Some(attempts[0].as_str()),
1979 "reconciliation must finalize with the attempt that actually mined"
1980 );
1981
1982 let requests = take_requests(&shared_requests).await;
1983 assert_eq!(requests.len(), 1);
1984 }
1985
1986 #[tokio::test]
1987 async fn transaction_submitter_rsm_invalid_signature_emits_terminal_failure() {
1988 let event_bus = Arc::new(MockEventBus::new());
1989 event_bus.clone().start_processing().await;
1990 let mut updates_rx = event_bus
1991 .subscribe(vec![TOPIC_TRANSACTION_UPDATES.to_string()])
1992 .await
1993 .expect("subscribe should succeed");
1994
1995 let shared_requests: SharedRequests = Arc::new(Mutex::new(Vec::new()));
1996 let submitter = TransactionSubmitter {
1997 event_bus: event_bus.clone(),
1998 tx_relayer: Arc::new(CapturingTxRelayer::with_shared_capture(
1999 RelayerBehavior::Success,
2000 PendingReconciliationBehavior::Unsupported,
2001 shared_requests.clone(),
2002 )),
2003 exchange_address: test_exchange_address(),
2004 db: None,
2005 submitter_store: None,
2006 nonce_lock: Arc::new(Mutex::new(())),
2007 };
2008
2009 let now = timestamp_millis();
2010 let tx_req = build_rsm_tx_request("invalid-sig-rsm", "0xzz".to_string(), now + 60_000);
2011 submitter.process_transaction_request(tx_req).await;
2012
2013 let failed = recv_tx_update(&mut updates_rx).await;
2014 assert_eq!(failed.request_id, "invalid-sig-rsm");
2015 assert_eq!(failed.status, TransactionStatus::Failed);
2016 assert!(
2017 failed
2018 .error
2019 .as_deref()
2020 .is_some_and(|error| error.contains("invalid signature bytes")),
2021 "signature parse errors must emit an explicit terminal failure"
2022 );
2023
2024 let requests = take_requests(&shared_requests).await;
2025 assert!(
2026 requests.is_empty(),
2027 "invalid signatures must not be sent to relayer"
2028 );
2029 }
2030}