Skip to main content

hypercall_transaction_submitter/
tx_relayer.rs

1use alloy::{
2    eips::eip2718::Encodable2718,
3    network::{EthereumWallet, TransactionBuilder},
4    primitives::{Address, TxHash},
5    providers::{DynProvider, Provider},
6    rpc::types::{TransactionInput, TransactionRequest as AlloyTransactionRequest},
7};
8#[cfg(feature = "aws")]
9use alloy::{providers::ProviderBuilder, signers::Signer as AlloySigner};
10use async_trait::async_trait;
11use ethers::{
12    middleware::SignerMiddleware,
13    providers::{Http, Middleware, Provider as EthersProvider},
14    signers::{LocalWallet, Signer},
15    types::{
16        transaction::{eip1559::Eip1559TransactionRequest, eip2718::TypedTransaction},
17        Address as EthersAddress, BlockId, BlockNumber, Bytes as EthersBytes, H256,
18        U256 as EthersU256,
19    },
20};
21use eyre::Result as EyreResult;
22use hypercall_transaction_submitter_core::{ContractCall, SubmittedNonce, SubmitterId};
23use std::{
24    str::FromStr,
25    sync::{
26        atomic::{AtomicU64, Ordering},
27        Arc,
28    },
29    time::Duration,
30};
31use tokio::time::{sleep, timeout, Instant};
32
33const DIRECT_SUBMITTER_SEND_TIMEOUT: Duration = Duration::from_secs(10);
34const DIRECT_SUBMITTER_RECEIPT_TIMEOUT: Duration = Duration::from_secs(10);
35const DIRECT_SUBMITTER_PENDING_RECONCILIATION_TIMEOUT: Duration = Duration::from_secs(120);
36const DIRECT_SUBMITTER_PENDING_RECONCILIATION_POLL_INTERVAL: Duration = Duration::from_secs(5);
37const DIRECT_SUBMITTER_REPLACEMENT_ATTEMPTS: usize = 6;
38const DIRECT_SUBMITTER_REPLACEMENT_GAS_MULTIPLIER: u128 = 2;
39const DIRECT_SUBMITTER_GAS_LIMIT_BUFFER_NUMERATOR: u64 = 12;
40const DIRECT_SUBMITTER_GAS_LIMIT_BUFFER_DENOMINATOR: u64 = 10;
41const DIRECT_SUBMITTER_FALLBACK_GAS_LIMIT: u64 = 250_000;
42
43#[derive(Clone, Debug, PartialEq, Eq)]
44pub enum PendingTransactionStatus {
45    Unsupported,
46    Pending(String),
47    Confirmed(String),
48    Failed(String),
49}
50
51#[derive(Clone, Debug, PartialEq, Eq)]
52pub struct SubmittedTransaction {
53    pub submitter: SubmitterId,
54    pub nonce: SubmittedNonce,
55    pub hash: String,
56    pub all_hashes: Vec<String>,
57    pub confirmed: bool,
58}
59
60#[derive(Clone, Debug, PartialEq, Eq)]
61pub struct SignedTransactionAttempt {
62    pub submitter: SubmitterId,
63    pub nonce: SubmittedNonce,
64    pub tx_hash: String,
65    pub raw_tx: Vec<u8>,
66}
67
68#[async_trait]
69pub trait SignedAttemptRecorder: Send + Sync {
70    async fn record_signed_attempt(&self, attempt: SignedTransactionAttempt) -> EyreResult<()>;
71}
72
73impl SubmittedTransaction {
74    pub fn new(
75        submitter: SubmitterId,
76        nonce: SubmittedNonce,
77        hash: String,
78        confirmed: bool,
79    ) -> Self {
80        Self {
81            submitter,
82            nonce,
83            all_hashes: vec![hash.clone()],
84            hash,
85            confirmed,
86        }
87    }
88
89    pub fn with_hashes(
90        submitter: SubmitterId,
91        nonce: SubmittedNonce,
92        hash: String,
93        all_hashes: Vec<String>,
94        confirmed: bool,
95    ) -> Self {
96        let all_hashes = if all_hashes.is_empty() {
97            vec![hash.clone()]
98        } else {
99            all_hashes
100        };
101        Self {
102            submitter,
103            nonce,
104            hash,
105            all_hashes,
106            confirmed,
107        }
108    }
109}
110
111#[async_trait]
112pub trait TxRelayerTrait: Send + Sync {
113    fn submitter_address(&self) -> SubmitterId;
114
115    async fn select_next_nonce(&self, db_nonce_floor: Option<u64>) -> EyreResult<SubmittedNonce> {
116        Ok(db_nonce_floor.unwrap_or(0))
117    }
118
119    async fn send_transaction_with_nonce(
120        &self,
121        request: ContractCall,
122        nonce: SubmittedNonce,
123    ) -> EyreResult<SubmittedTransaction> {
124        self.send_transaction(request, Some(nonce)).await
125    }
126
127    async fn send_transaction_with_nonce_recording_attempts(
128        &self,
129        request: ContractCall,
130        nonce: SubmittedNonce,
131        recorder: Arc<dyn SignedAttemptRecorder>,
132    ) -> EyreResult<SubmittedTransaction> {
133        let submission = self.send_transaction_with_nonce(request, nonce).await?;
134        for tx_hash in &submission.all_hashes {
135            recorder
136                .record_signed_attempt(SignedTransactionAttempt {
137                    submitter: submission.submitter,
138                    nonce: submission.nonce,
139                    tx_hash: tx_hash.clone(),
140                    raw_tx: Vec::new(),
141                })
142                .await?;
143        }
144        Ok(submission)
145    }
146
147    async fn send_transaction(
148        &self,
149        request: ContractCall,
150        db_nonce_floor: Option<u64>,
151    ) -> EyreResult<SubmittedTransaction> {
152        let nonce = self.select_next_nonce(db_nonce_floor).await?;
153        self.send_transaction_with_nonce(request, nonce).await
154    }
155
156    async fn reconcile_pending_transaction(
157        &self,
158        _tx_hash: &str,
159    ) -> EyreResult<PendingTransactionStatus> {
160        Ok(PendingTransactionStatus::Unsupported)
161    }
162
163    async fn reconcile_pending_transactions(
164        &self,
165        tx_hashes: &[String],
166    ) -> EyreResult<PendingTransactionStatus> {
167        let mut pending_reason = None;
168        for tx_hash in tx_hashes {
169            match self.reconcile_pending_transaction(tx_hash).await? {
170                PendingTransactionStatus::Unsupported => {}
171                PendingTransactionStatus::Pending(reason) => pending_reason = Some(reason),
172                terminal @ PendingTransactionStatus::Confirmed(_) => return Ok(terminal),
173                terminal @ PendingTransactionStatus::Failed(_) => return Ok(terminal),
174            }
175        }
176        if let Some(reason) = pending_reason {
177            return Ok(PendingTransactionStatus::Pending(reason));
178        }
179        Ok(PendingTransactionStatus::Pending(format!(
180            "transaction attempts remained pending past reconciliation timeout: hashes={tx_hashes:?}"
181        )))
182    }
183}
184
185pub struct DirectTxRelayer {
186    client: Arc<SignerMiddleware<EthersProvider<Http>, LocalWallet>>,
187    max_gas_price: EthersU256,
188}
189
190pub struct AwsKmsTxRelayer {
191    provider: DynProvider,
192    wallet: EthereumWallet,
193    signer_address: Address,
194    chain_id: u64,
195    max_gas_price: u128,
196}
197
198impl DirectTxRelayer {
199    pub async fn new(
200        config: &crate::TransactionSubmitterConfig,
201        secrets: &crate::TransactionSubmitterSecrets,
202    ) -> EyreResult<Self> {
203        let provider = EthersProvider::<Http>::try_from(config.rpc_url.as_str())?
204            .interval(Duration::from_millis(100));
205        let chain_id = provider.get_chainid().await?.as_u64();
206        let wallet = secrets
207            .require_transaction_submitter_private_key()
208            .map_err(|err| {
209                eyre::eyre!(
210                    "Transaction submitter private key must be available for direct submitter mode: {err}"
211                )
212            })?
213            .parse::<LocalWallet>()?
214            .with_chain_id(chain_id);
215
216        Ok(Self {
217            client: Arc::new(SignerMiddleware::new(provider, wallet)),
218            max_gas_price: EthersU256::from_dec_str(&config.max_gas_price)?,
219        })
220    }
221
222    async fn apply_direct_gas_limit(
223        &self,
224        tx: &mut TypedTransaction,
225        nonce: SubmittedNonce,
226    ) -> EyreResult<()> {
227        let gas_limit = match self.client.estimate_gas(tx, None).await {
228            Ok(estimated_gas) => buffer_ethers_gas_limit(estimated_gas)?,
229            Err(error) => {
230                tracing::warn!(
231                    %error,
232                    sender = %self.client.address(),
233                    evm_nonce = nonce,
234                    fallback_gas_limit = DIRECT_SUBMITTER_FALLBACK_GAS_LIMIT,
235                    "direct submitter failed to estimate gas, using fallback gas limit"
236                );
237                EthersU256::from(DIRECT_SUBMITTER_FALLBACK_GAS_LIMIT)
238            }
239        };
240        tx.set_gas(gas_limit);
241        Ok(())
242    }
243}
244
245impl AwsKmsTxRelayer {
246    #[cfg(feature = "aws")]
247    pub async fn new(config: &crate::TransactionSubmitterConfig) -> EyreResult<Self> {
248        let signer = hypercall_signer_aws::build_aws_kms_evm_transaction_signer(
249            &config.aws_kms_key_id,
250            &config.rpc_url,
251        )
252        .await?;
253        let signer_address = signer.address();
254        let wallet = EthereumWallet::new(signer);
255        let provider = ProviderBuilder::new()
256            .connect_http(config.rpc_url.parse()?)
257            .erased();
258        let chain_id = provider.get_chain_id().await?;
259
260        Ok(Self {
261            provider,
262            wallet,
263            signer_address,
264            chain_id,
265            max_gas_price: config.max_gas_price.parse::<u128>()?,
266        })
267    }
268
269    #[cfg(not(feature = "aws"))]
270    pub async fn new(_config: &crate::TransactionSubmitterConfig) -> EyreResult<Self> {
271        Err(eyre::eyre!(
272            "AWS KMS transaction submitter requires building hypercall with the aws feature"
273        ))
274    }
275
276    async fn find_confirmed_or_reverted_receipt(
277        &self,
278        nonce: SubmittedNonce,
279        tx_hashes: &[String],
280    ) -> EyreResult<Option<SubmittedTransaction>> {
281        for tx_hash in tx_hashes {
282            let hash = TxHash::from_str(tx_hash).map_err(|error| {
283                eyre::eyre!(
284                    "invalid aws_kms submitter replacement transaction hash {tx_hash}: {error}"
285                )
286            })?;
287            let Some(receipt) =
288                self.provider
289                    .get_transaction_receipt(hash)
290                    .await
291                    .map_err(|error| {
292                        eyre::eyre!(
293                            "aws_kms submitter failed while checking replacement receipt {tx_hash}: {error}"
294                        )
295                    })?
296            else {
297                continue;
298            };
299
300            if receipt.status() {
301                return Ok(Some(SubmittedTransaction::with_hashes(
302                    self.signer_address,
303                    nonce,
304                    tx_hash.clone(),
305                    tx_hashes.to_vec(),
306                    true,
307                )));
308            }
309            return Err(eyre::eyre!(
310                "aws_kms submitter transaction reverted: hash={}",
311                receipt.transaction_hash
312            ));
313        }
314        Ok(None)
315    }
316}
317
318#[async_trait]
319impl TxRelayerTrait for DirectTxRelayer {
320    fn submitter_address(&self) -> SubmitterId {
321        Address::from(*self.client.address().as_fixed_bytes())
322    }
323
324    async fn select_next_nonce(&self, db_nonce_floor: Option<u64>) -> EyreResult<SubmittedNonce> {
325        let chain_nonce = self
326            .client
327            .get_transaction_count(
328                self.client.address(),
329                Some(BlockId::Number(BlockNumber::Pending)),
330            )
331            .await
332            .map_err(|error| eyre::eyre!("direct submitter failed to load nonce: {error}"))?
333            .as_u64();
334        Ok(select_submitter_nonce(chain_nonce, db_nonce_floor))
335    }
336
337    async fn send_transaction_with_nonce(
338        &self,
339        request: ContractCall,
340        nonce: SubmittedNonce,
341    ) -> EyreResult<SubmittedTransaction> {
342        let submitter = self.submitter_address();
343        let mut tx = direct_contract_call_to_ethers_transaction(
344            request,
345            self.client.address(),
346            self.max_gas_price,
347            nonce,
348            self.client.signer().chain_id(),
349        );
350        self.apply_direct_gas_limit(&mut tx, nonce).await?;
351        let signature = self
352            .client
353            .signer()
354            .sign_transaction(&tx)
355            .await
356            .map_err(|error| eyre::eyre!("direct submitter failed to sign tx: {error}"))?;
357        let transaction_hash = format!("{:#066x}", tx.hash(&signature));
358        let raw_tx = tx.rlp_signed(&signature);
359
360        let pending = timeout(
361            DIRECT_SUBMITTER_SEND_TIMEOUT,
362            self.client.provider().send_raw_transaction(raw_tx.clone()),
363        )
364        .await
365        .map_err(|_| eyre::eyre!("direct submitter send_transaction timed out after signing"))?
366        .map_err(|error| eyre::eyre!("direct submitter send_transaction failed: {error}"))?;
367        let receipt = timeout(DIRECT_SUBMITTER_RECEIPT_TIMEOUT, pending)
368            .await
369            .map_err(|_| {
370                SubmittedTransaction::new(submitter, nonce, transaction_hash.clone(), false)
371            });
372        let receipt = match receipt {
373            Ok(receipt) => receipt.map_err(|error| {
374                eyre::eyre!("direct submitter failed while awaiting receipt: {error}")
375            })?,
376            Err(pending_submission) => return Ok(pending_submission),
377        };
378        let Some(receipt) = receipt else {
379            tracing::warn!(
380                tx_hash = %transaction_hash,
381                "direct submitter receipt not available, treating as pending"
382            );
383            return Ok(SubmittedTransaction::new(
384                submitter,
385                nonce,
386                transaction_hash,
387                false,
388            ));
389        };
390        if receipt.status != Some(1u64.into()) {
391            return Err(eyre::eyre!(
392                "direct submitter transaction reverted: hash={}",
393                receipt.transaction_hash
394            ));
395        }
396        Ok(SubmittedTransaction::new(
397            submitter,
398            nonce,
399            transaction_hash,
400            true,
401        ))
402    }
403
404    async fn send_transaction_with_nonce_recording_attempts(
405        &self,
406        request: ContractCall,
407        nonce: SubmittedNonce,
408        recorder: Arc<dyn SignedAttemptRecorder>,
409    ) -> EyreResult<SubmittedTransaction> {
410        let submitter = self.submitter_address();
411        let mut tx = direct_contract_call_to_ethers_transaction(
412            request,
413            self.client.address(),
414            self.max_gas_price,
415            nonce,
416            self.client.signer().chain_id(),
417        );
418        self.apply_direct_gas_limit(&mut tx, nonce).await?;
419        let signature = self
420            .client
421            .signer()
422            .sign_transaction(&tx)
423            .await
424            .map_err(|error| eyre::eyre!("direct submitter failed to sign tx: {error}"))?;
425        let transaction_hash = format!("{:#066x}", tx.hash(&signature));
426        let raw_tx = tx.rlp_signed(&signature);
427        recorder
428            .record_signed_attempt(SignedTransactionAttempt {
429                submitter,
430                nonce,
431                tx_hash: transaction_hash.clone(),
432                raw_tx: raw_tx.to_vec(),
433            })
434            .await?;
435
436        let pending = timeout(
437            DIRECT_SUBMITTER_SEND_TIMEOUT,
438            self.client.provider().send_raw_transaction(raw_tx),
439        )
440        .await;
441        let pending = match pending {
442            Ok(Ok(pending)) => pending,
443            Ok(Err(error)) => {
444                tracing::error!(
445                    tx_hash = %transaction_hash,
446                    %error,
447                    "direct submitter send_raw_transaction failed after signed tx was persisted; keeping attempt pending for reconciliation"
448                );
449                return Ok(SubmittedTransaction::new(
450                    submitter,
451                    nonce,
452                    transaction_hash,
453                    false,
454                ));
455            }
456            Err(_) => {
457                tracing::error!(
458                    tx_hash = %transaction_hash,
459                    "direct submitter send_raw_transaction timed out after signed tx was persisted; keeping attempt pending for reconciliation"
460                );
461                return Ok(SubmittedTransaction::new(
462                    submitter,
463                    nonce,
464                    transaction_hash,
465                    false,
466                ));
467            }
468        };
469        let receipt = timeout(DIRECT_SUBMITTER_RECEIPT_TIMEOUT, pending)
470            .await
471            .map_err(|_| {
472                SubmittedTransaction::new(submitter, nonce, transaction_hash.clone(), false)
473            });
474        let receipt = match receipt {
475            Ok(receipt) => receipt.map_err(|error| {
476                eyre::eyre!("direct submitter failed while awaiting receipt: {error}")
477            })?,
478            Err(pending_submission) => return Ok(pending_submission),
479        };
480        let Some(receipt) = receipt else {
481            tracing::warn!(
482                tx_hash = %transaction_hash,
483                "direct submitter receipt not available, treating as pending"
484            );
485            return Ok(SubmittedTransaction::new(
486                submitter,
487                nonce,
488                transaction_hash,
489                false,
490            ));
491        };
492        if receipt.status != Some(1u64.into()) {
493            return Err(eyre::eyre!(
494                "direct submitter transaction reverted: hash={}",
495                receipt.transaction_hash
496            ));
497        }
498        Ok(SubmittedTransaction::new(
499            submitter,
500            nonce,
501            transaction_hash,
502            true,
503        ))
504    }
505
506    async fn send_transaction(
507        &self,
508        request: ContractCall,
509        db_nonce_floor: Option<u64>,
510    ) -> EyreResult<SubmittedTransaction> {
511        let nonce = self.select_next_nonce(db_nonce_floor).await?;
512        self.send_transaction_with_nonce(request, nonce).await
513    }
514
515    async fn reconcile_pending_transaction(
516        &self,
517        tx_hash: &str,
518    ) -> EyreResult<PendingTransactionStatus> {
519        let hash = H256::from_str(tx_hash).map_err(|error| {
520            eyre::eyre!("invalid direct submitter transaction hash {tx_hash}: {error}")
521        })?;
522        let deadline = Instant::now() + DIRECT_SUBMITTER_PENDING_RECONCILIATION_TIMEOUT;
523
524        while Instant::now() < deadline {
525            let maybe_receipt = self
526                .client
527                .get_transaction_receipt(hash)
528                .await
529                .map_err(|error| {
530                    eyre::eyre!(
531                        "direct submitter failed while reconciling pending receipt {tx_hash}: {error}"
532                    )
533                })?;
534
535            if let Some(receipt) = maybe_receipt {
536                if receipt.status == Some(1u64.into()) {
537                    return Ok(PendingTransactionStatus::Confirmed(tx_hash.to_string()));
538                }
539                return Ok(PendingTransactionStatus::Failed(format!(
540                    "direct submitter transaction reverted during reconciliation: hash={}",
541                    receipt.transaction_hash
542                )));
543            }
544
545            sleep(DIRECT_SUBMITTER_PENDING_RECONCILIATION_POLL_INTERVAL).await;
546        }
547
548        Ok(PendingTransactionStatus::Pending(format!(
549            "direct submitter transaction remained pending past reconciliation timeout: hash={tx_hash}"
550        )))
551    }
552}
553
554#[async_trait]
555impl TxRelayerTrait for AwsKmsTxRelayer {
556    fn submitter_address(&self) -> SubmitterId {
557        self.signer_address
558    }
559
560    async fn select_next_nonce(&self, db_nonce_floor: Option<u64>) -> EyreResult<SubmittedNonce> {
561        let chain_nonce = self
562            .provider
563            .get_transaction_count(self.signer_address)
564            .pending()
565            .await
566            .map_err(|error| eyre::eyre!("aws_kms submitter failed to load nonce: {error}"))?;
567        Ok(select_submitter_nonce(chain_nonce, db_nonce_floor))
568    }
569
570    async fn send_transaction_with_nonce(
571        &self,
572        request: ContractCall,
573        nonce: SubmittedNonce,
574    ) -> EyreResult<SubmittedTransaction> {
575        let mut tx =
576            contract_call_to_alloy_transaction(request, self.signer_address, self.max_gas_price);
577        tx.chain_id = Some(self.chain_id);
578        tx.nonce = Some(nonce);
579        tx.gas = Some(match self.provider.estimate_gas(tx.clone()).await {
580            Ok(estimated_gas) => buffer_gas_limit(estimated_gas),
581            Err(error) => {
582                tracing::warn!(
583                    %error,
584                    sender = %self.signer_address,
585                    evm_nonce = nonce,
586                    fallback_gas_limit = DIRECT_SUBMITTER_FALLBACK_GAS_LIMIT,
587                    "aws_kms submitter failed to estimate gas, using fallback gas limit"
588                );
589                DIRECT_SUBMITTER_FALLBACK_GAS_LIMIT
590            }
591        });
592
593        let mut gas_price = self.max_gas_price;
594        let mut submitted_hashes = Vec::new();
595        for attempt in 0..=DIRECT_SUBMITTER_REPLACEMENT_ATTEMPTS {
596            tx.max_fee_per_gas = Some(gas_price);
597            tx.max_priority_fee_per_gas = Some(gas_price);
598
599            let signed_tx = tx
600                .clone()
601                .build(&self.wallet)
602                .await
603                .map_err(|error| eyre::eyre!("aws_kms submitter failed to sign tx: {error}"))?;
604            let transaction_hash = format!("{:#066x}", signed_tx.tx_hash());
605            let mut raw_tx = Vec::with_capacity(signed_tx.encode_2718_len());
606            signed_tx.encode_2718(&mut raw_tx);
607            submitted_hashes.push(transaction_hash.clone());
608
609            let pending_result = timeout(
610                DIRECT_SUBMITTER_SEND_TIMEOUT,
611                self.provider.send_raw_transaction(&raw_tx),
612            )
613            .await;
614            let pending = match pending_result {
615                Ok(Ok(pending)) => pending,
616                Ok(Err(error)) => {
617                    if let Some(submission) = self
618                        .find_confirmed_or_reverted_receipt(nonce, &submitted_hashes)
619                        .await?
620                    {
621                        return Ok(submission);
622                    }
623                    tracing::error!(
624                        tx_hash = %transaction_hash,
625                        %error,
626                        "aws_kms submitter send_raw_transaction failed after signing; keeping attempt pending for reconciliation"
627                    );
628                    return Ok(SubmittedTransaction::new(
629                        self.signer_address,
630                        nonce,
631                        transaction_hash,
632                        false,
633                    ));
634                }
635                Err(_) => {
636                    if let Some(submission) = self
637                        .find_confirmed_or_reverted_receipt(nonce, &submitted_hashes)
638                        .await?
639                    {
640                        return Ok(submission);
641                    }
642                    tracing::warn!(
643                        tx_hash = %transaction_hash,
644                        evm_nonce = nonce,
645                        attempt,
646                        "aws_kms submitter send_raw_transaction timed out after signing; keeping hash for reconciliation and replacement"
647                    );
648                    if attempt == DIRECT_SUBMITTER_REPLACEMENT_ATTEMPTS {
649                        break;
650                    }
651                    gas_price =
652                        gas_price.saturating_mul(DIRECT_SUBMITTER_REPLACEMENT_GAS_MULTIPLIER);
653                    continue;
654                }
655            };
656
657            let receipt = timeout(DIRECT_SUBMITTER_RECEIPT_TIMEOUT, pending.get_receipt()).await;
658            let receipt = match receipt {
659                Ok(receipt) => receipt.map_err(|error| {
660                    eyre::eyre!("aws_kms submitter failed while awaiting receipt: {error}")
661                })?,
662                Err(_) => {
663                    if let Some(submission) = self
664                        .find_confirmed_or_reverted_receipt(nonce, &submitted_hashes)
665                        .await?
666                    {
667                        return Ok(submission);
668                    }
669                    if attempt == DIRECT_SUBMITTER_REPLACEMENT_ATTEMPTS {
670                        break;
671                    }
672                    let next_gas_price =
673                        gas_price.saturating_mul(DIRECT_SUBMITTER_REPLACEMENT_GAS_MULTIPLIER);
674                    tracing::warn!(
675                        tx_hash = %transaction_hash,
676                        evm_nonce = nonce,
677                        attempt,
678                        gas_price,
679                        next_gas_price,
680                        "aws_kms submitter receipt not available, replacing transaction with bumped gas"
681                    );
682                    gas_price = next_gas_price;
683                    continue;
684                }
685            };
686
687            if !receipt.status() {
688                return Err(eyre::eyre!(
689                    "aws_kms submitter transaction reverted: hash={}",
690                    receipt.transaction_hash
691                ));
692            }
693            return Ok(SubmittedTransaction::new(
694                self.signer_address,
695                nonce,
696                transaction_hash,
697                true,
698            ));
699        }
700
701        let deadline = Instant::now() + DIRECT_SUBMITTER_PENDING_RECONCILIATION_TIMEOUT;
702        while Instant::now() < deadline {
703            if let Some(submission) = self
704                .find_confirmed_or_reverted_receipt(nonce, &submitted_hashes)
705                .await?
706            {
707                return Ok(submission);
708            }
709
710            let chain_nonce = self
711                .provider
712                .get_transaction_count(self.signer_address)
713                .latest()
714                .await
715                .map_err(|error| {
716                    eyre::eyre!(
717                        "aws_kms submitter failed to load latest nonce while monitoring replacements: {error}"
718                    )
719                })?;
720            if chain_nonce > nonce {
721                tracing::warn!(
722                    evm_nonce = nonce,
723                    chain_nonce,
724                    hashes = ?submitted_hashes,
725                    "aws_kms submitter nonce advanced while monitoring replacement receipts"
726                );
727            }
728
729            sleep(DIRECT_SUBMITTER_PENDING_RECONCILIATION_POLL_INTERVAL).await;
730        }
731
732        let Some(primary_hash) = submitted_hashes.last().cloned() else {
733            return Err(eyre::eyre!(
734                "aws_kms submitter did not produce a transaction hash before timeout: nonce={nonce}"
735            ));
736        };
737        tracing::warn!(
738            evm_nonce = nonce,
739            hashes = ?submitted_hashes,
740            "aws_kms submitter replacement transactions are still pending; persisting attempts for reconciliation"
741        );
742        Ok(SubmittedTransaction::with_hashes(
743            self.signer_address,
744            nonce,
745            primary_hash,
746            submitted_hashes,
747            false,
748        ))
749    }
750
751    async fn send_transaction_with_nonce_recording_attempts(
752        &self,
753        request: ContractCall,
754        nonce: SubmittedNonce,
755        recorder: Arc<dyn SignedAttemptRecorder>,
756    ) -> EyreResult<SubmittedTransaction> {
757        let mut tx =
758            contract_call_to_alloy_transaction(request, self.signer_address, self.max_gas_price);
759        tx.chain_id = Some(self.chain_id);
760        tx.nonce = Some(nonce);
761        tx.gas = Some(match self.provider.estimate_gas(tx.clone()).await {
762            Ok(estimated_gas) => buffer_gas_limit(estimated_gas),
763            Err(error) => {
764                tracing::warn!(
765                    %error,
766                    sender = %self.signer_address,
767                    evm_nonce = nonce,
768                    fallback_gas_limit = DIRECT_SUBMITTER_FALLBACK_GAS_LIMIT,
769                    "aws_kms submitter failed to estimate gas, using fallback gas limit"
770                );
771                DIRECT_SUBMITTER_FALLBACK_GAS_LIMIT
772            }
773        });
774
775        let mut gas_price = self.max_gas_price;
776        let mut submitted_hashes = Vec::new();
777        for attempt in 0..=DIRECT_SUBMITTER_REPLACEMENT_ATTEMPTS {
778            tx.max_fee_per_gas = Some(gas_price);
779            tx.max_priority_fee_per_gas = Some(gas_price);
780
781            let signed_tx = tx
782                .clone()
783                .build(&self.wallet)
784                .await
785                .map_err(|error| eyre::eyre!("aws_kms submitter failed to sign tx: {error}"))?;
786            let transaction_hash = format!("{:#066x}", signed_tx.tx_hash());
787            let mut raw_tx = Vec::with_capacity(signed_tx.encode_2718_len());
788            signed_tx.encode_2718(&mut raw_tx);
789            recorder
790                .record_signed_attempt(SignedTransactionAttempt {
791                    submitter: self.signer_address,
792                    nonce,
793                    tx_hash: transaction_hash.clone(),
794                    raw_tx: raw_tx.clone(),
795                })
796                .await?;
797            submitted_hashes.push(transaction_hash.clone());
798
799            let pending_result = timeout(
800                DIRECT_SUBMITTER_SEND_TIMEOUT,
801                self.provider.send_raw_transaction(&raw_tx),
802            )
803            .await;
804            let pending = match pending_result {
805                Ok(Ok(pending)) => pending,
806                Ok(Err(error)) => {
807                    if let Some(submission) = self
808                        .find_confirmed_or_reverted_receipt(nonce, &submitted_hashes)
809                        .await?
810                    {
811                        return Ok(submission);
812                    }
813                    return Err(eyre::eyre!(
814                        "aws_kms submitter send_raw_transaction failed: {error}"
815                    ));
816                }
817                Err(_) => {
818                    if let Some(submission) = self
819                        .find_confirmed_or_reverted_receipt(nonce, &submitted_hashes)
820                        .await?
821                    {
822                        return Ok(submission);
823                    }
824                    tracing::warn!(
825                        tx_hash = %transaction_hash,
826                        evm_nonce = nonce,
827                        attempt,
828                        "aws_kms submitter send_raw_transaction timed out after signing; keeping hash for reconciliation and replacement"
829                    );
830                    if attempt == DIRECT_SUBMITTER_REPLACEMENT_ATTEMPTS {
831                        break;
832                    }
833                    gas_price =
834                        gas_price.saturating_mul(DIRECT_SUBMITTER_REPLACEMENT_GAS_MULTIPLIER);
835                    continue;
836                }
837            };
838
839            let receipt = timeout(DIRECT_SUBMITTER_RECEIPT_TIMEOUT, pending.get_receipt()).await;
840            let receipt = match receipt {
841                Ok(receipt) => receipt.map_err(|error| {
842                    eyre::eyre!("aws_kms submitter failed while awaiting receipt: {error}")
843                })?,
844                Err(_) => {
845                    if let Some(submission) = self
846                        .find_confirmed_or_reverted_receipt(nonce, &submitted_hashes)
847                        .await?
848                    {
849                        return Ok(submission);
850                    }
851                    if attempt == DIRECT_SUBMITTER_REPLACEMENT_ATTEMPTS {
852                        break;
853                    }
854                    let next_gas_price =
855                        gas_price.saturating_mul(DIRECT_SUBMITTER_REPLACEMENT_GAS_MULTIPLIER);
856                    tracing::warn!(
857                        tx_hash = %transaction_hash,
858                        evm_nonce = nonce,
859                        attempt,
860                        gas_price,
861                        next_gas_price,
862                        "aws_kms submitter receipt not available, replacing transaction with bumped gas"
863                    );
864                    gas_price = next_gas_price;
865                    continue;
866                }
867            };
868
869            if !receipt.status() {
870                return Err(eyre::eyre!(
871                    "aws_kms submitter transaction reverted: hash={}",
872                    receipt.transaction_hash
873                ));
874            }
875            return Ok(SubmittedTransaction::new(
876                self.signer_address,
877                nonce,
878                transaction_hash,
879                true,
880            ));
881        }
882
883        let deadline = Instant::now() + DIRECT_SUBMITTER_PENDING_RECONCILIATION_TIMEOUT;
884        while Instant::now() < deadline {
885            if let Some(submission) = self
886                .find_confirmed_or_reverted_receipt(nonce, &submitted_hashes)
887                .await?
888            {
889                return Ok(submission);
890            }
891
892            let chain_nonce = self
893                .provider
894                .get_transaction_count(self.signer_address)
895                .latest()
896                .await
897                .map_err(|error| {
898                    eyre::eyre!(
899                        "aws_kms submitter failed to load latest nonce while monitoring replacements: {error}"
900                    )
901                })?;
902            if chain_nonce > nonce {
903                tracing::warn!(
904                    evm_nonce = nonce,
905                    chain_nonce,
906                    hashes = ?submitted_hashes,
907                    "aws_kms submitter nonce advanced while monitoring replacement receipts"
908                );
909            }
910
911            sleep(DIRECT_SUBMITTER_PENDING_RECONCILIATION_POLL_INTERVAL).await;
912        }
913
914        let Some(primary_hash) = submitted_hashes.last().cloned() else {
915            return Err(eyre::eyre!(
916                "aws_kms submitter did not produce a transaction hash before timeout: nonce={nonce}"
917            ));
918        };
919        tracing::warn!(
920            evm_nonce = nonce,
921            hashes = ?submitted_hashes,
922            "aws_kms submitter replacement transactions are still pending; persisting attempts for reconciliation"
923        );
924        Ok(SubmittedTransaction::with_hashes(
925            self.signer_address,
926            nonce,
927            primary_hash,
928            submitted_hashes,
929            false,
930        ))
931    }
932
933    async fn send_transaction(
934        &self,
935        request: ContractCall,
936        db_nonce_floor: Option<u64>,
937    ) -> EyreResult<SubmittedTransaction> {
938        let nonce = self.select_next_nonce(db_nonce_floor).await?;
939        self.send_transaction_with_nonce(request, nonce).await
940    }
941
942    async fn reconcile_pending_transaction(
943        &self,
944        tx_hash: &str,
945    ) -> EyreResult<PendingTransactionStatus> {
946        let hash = TxHash::from_str(tx_hash).map_err(|error| {
947            eyre::eyre!("invalid aws_kms submitter transaction hash {tx_hash}: {error}")
948        })?;
949        let deadline = Instant::now() + DIRECT_SUBMITTER_PENDING_RECONCILIATION_TIMEOUT;
950
951        while Instant::now() < deadline {
952            let maybe_receipt =
953                self.provider
954                    .get_transaction_receipt(hash)
955                    .await
956                    .map_err(|error| {
957                        eyre::eyre!(
958                            "aws_kms submitter failed while reconciling pending receipt {tx_hash}: {error}"
959                        )
960                    })?;
961
962            if let Some(receipt) = maybe_receipt {
963                if receipt.status() {
964                    return Ok(PendingTransactionStatus::Confirmed(tx_hash.to_string()));
965                }
966                return Ok(PendingTransactionStatus::Failed(format!(
967                    "aws_kms submitter transaction reverted during reconciliation: hash={}",
968                    receipt.transaction_hash
969                )));
970            }
971
972            sleep(DIRECT_SUBMITTER_PENDING_RECONCILIATION_POLL_INTERVAL).await;
973        }
974
975        Ok(PendingTransactionStatus::Pending(format!(
976            "aws_kms submitter transaction remained pending past reconciliation timeout: hash={tx_hash}"
977        )))
978    }
979
980    async fn reconcile_pending_transactions(
981        &self,
982        tx_hashes: &[String],
983    ) -> EyreResult<PendingTransactionStatus> {
984        let deadline = Instant::now() + DIRECT_SUBMITTER_PENDING_RECONCILIATION_TIMEOUT;
985        while Instant::now() < deadline {
986            if let Some(submission) = self
987                .find_confirmed_or_reverted_receipt(0, tx_hashes)
988                .await?
989            {
990                return if submission.confirmed {
991                    Ok(PendingTransactionStatus::Confirmed(submission.hash))
992                } else {
993                    Ok(PendingTransactionStatus::Pending(format!(
994                        "aws_kms submitter replacement attempts still pending: hashes={tx_hashes:?}"
995                    )))
996                };
997            }
998            sleep(DIRECT_SUBMITTER_PENDING_RECONCILIATION_POLL_INTERVAL).await;
999        }
1000
1001        Ok(PendingTransactionStatus::Pending(format!(
1002            "aws_kms submitter replacement attempts remained pending past reconciliation timeout: hashes={tx_hashes:?}"
1003        )))
1004    }
1005}
1006
1007fn contract_call_to_alloy_transaction(
1008    request: ContractCall,
1009    from: Address,
1010    max_gas_price: u128,
1011) -> AlloyTransactionRequest {
1012    AlloyTransactionRequest::default()
1013        .from(from)
1014        .to(request.to)
1015        .value(request.value)
1016        .input(TransactionInput::new(request.data))
1017        .max_fee_per_gas(max_gas_price)
1018        .max_priority_fee_per_gas(max_gas_price)
1019}
1020
1021fn direct_contract_call_to_ethers_transaction(
1022    request: ContractCall,
1023    from: EthersAddress,
1024    max_gas_price: EthersU256,
1025    nonce: SubmittedNonce,
1026    chain_id: u64,
1027) -> TypedTransaction {
1028    let to = EthersAddress::from_slice(request.to.as_slice());
1029    let data = EthersBytes::from(request.data.to_vec());
1030    let value = EthersU256::from_big_endian(&request.value.to_be_bytes::<32>());
1031
1032    let mut tx = TypedTransaction::Eip1559(Eip1559TransactionRequest::new());
1033    tx.set_from(from);
1034    tx.set_to(to);
1035    tx.set_data(data);
1036    tx.set_value(value);
1037    if let TypedTransaction::Eip1559(inner) = &mut tx {
1038        inner.max_fee_per_gas = Some(max_gas_price);
1039        inner.max_priority_fee_per_gas = Some(max_gas_price);
1040    }
1041    tx.set_nonce(EthersU256::from(nonce));
1042    tx.set_chain_id(chain_id);
1043    tx
1044}
1045
1046fn select_submitter_nonce(chain_nonce: u64, db_nonce_floor: Option<u64>) -> u64 {
1047    db_nonce_floor.map_or(chain_nonce, |db_nonce| chain_nonce.max(db_nonce))
1048}
1049
1050fn buffer_gas_limit(estimated_gas: u64) -> u64 {
1051    estimated_gas
1052        .saturating_mul(DIRECT_SUBMITTER_GAS_LIMIT_BUFFER_NUMERATOR)
1053        .div_ceil(DIRECT_SUBMITTER_GAS_LIMIT_BUFFER_DENOMINATOR)
1054        .max(DIRECT_SUBMITTER_FALLBACK_GAS_LIMIT)
1055}
1056
1057fn buffer_ethers_gas_limit(estimated_gas: EthersU256) -> EyreResult<EthersU256> {
1058    if estimated_gas > EthersU256::from(u64::MAX) {
1059        return Err(eyre::eyre!(
1060            "direct submitter estimated gas {estimated_gas} exceeds u64 range"
1061        ));
1062    }
1063    Ok(EthersU256::from(buffer_gas_limit(estimated_gas.as_u64())))
1064}
1065
1066pub struct MockTxRelayer;
1067
1068static MOCK_SUBMITTER_NONCE: AtomicU64 = AtomicU64::new(1);
1069
1070#[async_trait]
1071impl TxRelayerTrait for MockTxRelayer {
1072    fn submitter_address(&self) -> SubmitterId {
1073        Address::ZERO
1074    }
1075
1076    async fn select_next_nonce(&self, db_nonce_floor: Option<u64>) -> EyreResult<SubmittedNonce> {
1077        let next = MOCK_SUBMITTER_NONCE.fetch_add(1, Ordering::Relaxed);
1078        Ok(select_submitter_nonce(next, db_nonce_floor))
1079    }
1080
1081    async fn send_transaction_with_nonce(
1082        &self,
1083        _request: ContractCall,
1084        nonce: SubmittedNonce,
1085    ) -> EyreResult<SubmittedTransaction> {
1086        let mut hash = [0_u8; 32];
1087        hash[24..].copy_from_slice(&nonce.to_be_bytes());
1088        let hash = TxHash::from(hash).to_string();
1089        Ok(SubmittedTransaction::new(Address::ZERO, nonce, hash, true))
1090    }
1091
1092    async fn send_transaction(
1093        &self,
1094        request: ContractCall,
1095        db_nonce_floor: Option<u64>,
1096    ) -> EyreResult<SubmittedTransaction> {
1097        let nonce = self.select_next_nonce(db_nonce_floor).await?;
1098        self.send_transaction_with_nonce(request, nonce).await
1099    }
1100}
1101
1102#[cfg(test)]
1103mod tests {
1104    use super::*;
1105    use alloy::primitives::{address, bytes, U256 as AlloyU256};
1106
1107    fn contract_call() -> ContractCall {
1108        ContractCall {
1109            to: address!("1111111111111111111111111111111111111111"),
1110            value: AlloyU256::ZERO,
1111            data: bytes!("12345678"),
1112            external_id: Some("outbox-1".to_string()),
1113        }
1114    }
1115
1116    #[test]
1117    fn aws_kms_contract_call_conversion_preserves_sender_fee_and_calldata() {
1118        let from = address!("2222222222222222222222222222222222222222");
1119        let tx = contract_call_to_alloy_transaction(contract_call(), from, 1_000_000_000);
1120
1121        assert_eq!(tx.chain_id, None);
1122        assert_eq!(tx.from, Some(from));
1123        assert_eq!(
1124            tx.to,
1125            Some(alloy::primitives::TxKind::Call(address!(
1126                "1111111111111111111111111111111111111111"
1127            )))
1128        );
1129        assert_eq!(tx.value, Some(AlloyU256::ZERO));
1130        assert_eq!(tx.max_fee_per_gas, Some(1_000_000_000));
1131        assert_eq!(tx.max_priority_fee_per_gas, Some(1_000_000_000));
1132        assert_eq!(tx.gas, None);
1133        assert_eq!(tx.input.input().unwrap(), &bytes!("12345678"));
1134    }
1135
1136    #[test]
1137    fn direct_contract_call_conversion_preserves_sender_fee_nonce_and_calldata() {
1138        let from = EthersAddress::from_slice(&[0x22; 20]);
1139        let tx = direct_contract_call_to_ethers_transaction(
1140            contract_call(),
1141            from,
1142            EthersU256::from(1_000_000_000_u64),
1143            42,
1144            31337,
1145        );
1146
1147        let TypedTransaction::Eip1559(inner) = tx else {
1148            panic!("direct submitter must build EIP-1559 transactions");
1149        };
1150        assert_eq!(inner.from, Some(from));
1151        assert_eq!(
1152            inner.to,
1153            Some(ethers::types::NameOrAddress::Address(
1154                EthersAddress::from_slice(
1155                    address!("1111111111111111111111111111111111111111").as_slice()
1156                )
1157            ))
1158        );
1159        assert_eq!(inner.value, Some(EthersU256::zero()));
1160        assert_eq!(
1161            inner.max_fee_per_gas,
1162            Some(EthersU256::from(1_000_000_000_u64))
1163        );
1164        assert_eq!(
1165            inner.max_priority_fee_per_gas,
1166            Some(EthersU256::from(1_000_000_000_u64))
1167        );
1168        assert_eq!(inner.nonce, Some(EthersU256::from(42_u64)));
1169        assert_eq!(inner.chain_id, Some(31337_u64.into()));
1170        assert_eq!(inner.gas, None);
1171        assert_eq!(
1172            inner.data,
1173            Some(EthersBytes::from(vec![0x12, 0x34, 0x56, 0x78]))
1174        );
1175    }
1176
1177    #[test]
1178    fn select_submitter_nonce_uses_max_of_chain_and_db_floor() {
1179        assert_eq!(select_submitter_nonce(7, None), 7);
1180        assert_eq!(select_submitter_nonce(7, Some(3)), 7);
1181        assert_eq!(select_submitter_nonce(7, Some(9)), 9);
1182    }
1183
1184    #[test]
1185    fn aws_kms_gas_limit_buffer_rounds_up() {
1186        assert_eq!(
1187            buffer_gas_limit(21_000),
1188            DIRECT_SUBMITTER_FALLBACK_GAS_LIMIT
1189        );
1190        assert_eq!(
1191            buffer_gas_limit(21_001),
1192            DIRECT_SUBMITTER_FALLBACK_GAS_LIMIT
1193        );
1194        assert_eq!(buffer_gas_limit(250_000), 300_000);
1195    }
1196
1197    #[test]
1198    fn aws_kms_gas_limit_buffer_floors_unusable_estimates() {
1199        assert_eq!(buffer_gas_limit(0), DIRECT_SUBMITTER_FALLBACK_GAS_LIMIT);
1200        assert_eq!(buffer_gas_limit(1), DIRECT_SUBMITTER_FALLBACK_GAS_LIMIT);
1201    }
1202
1203    #[test]
1204    fn direct_gas_limit_buffer_converts_ethers_estimates() {
1205        assert_eq!(
1206            buffer_ethers_gas_limit(EthersU256::from(21_000_u64)).unwrap(),
1207            EthersU256::from(DIRECT_SUBMITTER_FALLBACK_GAS_LIMIT)
1208        );
1209        assert_eq!(
1210            buffer_ethers_gas_limit(EthersU256::from(250_000_u64)).unwrap(),
1211            EthersU256::from(300_000_u64)
1212        );
1213        assert!(buffer_ethers_gas_limit(EthersU256::from(u64::MAX) + EthersU256::one()).is_err());
1214    }
1215}