1use alloy::{
2 eips::eip2718::Encodable2718,
3 network::{EthereumWallet, TransactionBuilder},
4 primitives::{Address, TxHash},
5 providers::{DynProvider, Provider},
6 rpc::types::{TransactionInput, TransactionRequest as AlloyTransactionRequest},
7};
8#[cfg(feature = "aws")]
9use alloy::{providers::ProviderBuilder, signers::Signer as AlloySigner};
10use async_trait::async_trait;
11use ethers::{
12 middleware::SignerMiddleware,
13 providers::{Http, Middleware, Provider as EthersProvider},
14 signers::{LocalWallet, Signer},
15 types::{
16 transaction::{eip1559::Eip1559TransactionRequest, eip2718::TypedTransaction},
17 Address as EthersAddress, BlockId, BlockNumber, Bytes as EthersBytes, H256,
18 U256 as EthersU256,
19 },
20};
21use eyre::Result as EyreResult;
22use hypercall_transaction_submitter_core::{ContractCall, SubmittedNonce, SubmitterId};
23use std::{
24 str::FromStr,
25 sync::{
26 atomic::{AtomicU64, Ordering},
27 Arc,
28 },
29 time::Duration,
30};
31use tokio::time::{sleep, timeout, Instant};
32
33const DIRECT_SUBMITTER_SEND_TIMEOUT: Duration = Duration::from_secs(10);
34const DIRECT_SUBMITTER_RECEIPT_TIMEOUT: Duration = Duration::from_secs(10);
35const DIRECT_SUBMITTER_PENDING_RECONCILIATION_TIMEOUT: Duration = Duration::from_secs(120);
36const DIRECT_SUBMITTER_PENDING_RECONCILIATION_POLL_INTERVAL: Duration = Duration::from_secs(5);
37const DIRECT_SUBMITTER_REPLACEMENT_ATTEMPTS: usize = 6;
38const DIRECT_SUBMITTER_REPLACEMENT_GAS_MULTIPLIER: u128 = 2;
39const DIRECT_SUBMITTER_GAS_LIMIT_BUFFER_NUMERATOR: u64 = 12;
40const DIRECT_SUBMITTER_GAS_LIMIT_BUFFER_DENOMINATOR: u64 = 10;
41const DIRECT_SUBMITTER_FALLBACK_GAS_LIMIT: u64 = 250_000;
42
43#[derive(Clone, Debug, PartialEq, Eq)]
44pub enum PendingTransactionStatus {
45 Unsupported,
46 Pending(String),
47 Confirmed(String),
48 Failed(String),
49}
50
51#[derive(Clone, Debug, PartialEq, Eq)]
52pub struct SubmittedTransaction {
53 pub submitter: SubmitterId,
54 pub nonce: SubmittedNonce,
55 pub hash: String,
56 pub all_hashes: Vec<String>,
57 pub confirmed: bool,
58}
59
60#[derive(Clone, Debug, PartialEq, Eq)]
61pub struct SignedTransactionAttempt {
62 pub submitter: SubmitterId,
63 pub nonce: SubmittedNonce,
64 pub tx_hash: String,
65 pub raw_tx: Vec<u8>,
66}
67
68#[async_trait]
69pub trait SignedAttemptRecorder: Send + Sync {
70 async fn record_signed_attempt(&self, attempt: SignedTransactionAttempt) -> EyreResult<()>;
71}
72
73impl SubmittedTransaction {
74 pub fn new(
75 submitter: SubmitterId,
76 nonce: SubmittedNonce,
77 hash: String,
78 confirmed: bool,
79 ) -> Self {
80 Self {
81 submitter,
82 nonce,
83 all_hashes: vec![hash.clone()],
84 hash,
85 confirmed,
86 }
87 }
88
89 pub fn with_hashes(
90 submitter: SubmitterId,
91 nonce: SubmittedNonce,
92 hash: String,
93 all_hashes: Vec<String>,
94 confirmed: bool,
95 ) -> Self {
96 let all_hashes = if all_hashes.is_empty() {
97 vec![hash.clone()]
98 } else {
99 all_hashes
100 };
101 Self {
102 submitter,
103 nonce,
104 hash,
105 all_hashes,
106 confirmed,
107 }
108 }
109}
110
111#[async_trait]
112pub trait TxRelayerTrait: Send + Sync {
113 fn submitter_address(&self) -> SubmitterId;
114
115 async fn select_next_nonce(&self, db_nonce_floor: Option<u64>) -> EyreResult<SubmittedNonce> {
116 Ok(db_nonce_floor.unwrap_or(0))
117 }
118
119 async fn send_transaction_with_nonce(
120 &self,
121 request: ContractCall,
122 nonce: SubmittedNonce,
123 ) -> EyreResult<SubmittedTransaction> {
124 self.send_transaction(request, Some(nonce)).await
125 }
126
127 async fn send_transaction_with_nonce_recording_attempts(
128 &self,
129 request: ContractCall,
130 nonce: SubmittedNonce,
131 recorder: Arc<dyn SignedAttemptRecorder>,
132 ) -> EyreResult<SubmittedTransaction> {
133 let submission = self.send_transaction_with_nonce(request, nonce).await?;
134 for tx_hash in &submission.all_hashes {
135 recorder
136 .record_signed_attempt(SignedTransactionAttempt {
137 submitter: submission.submitter,
138 nonce: submission.nonce,
139 tx_hash: tx_hash.clone(),
140 raw_tx: Vec::new(),
141 })
142 .await?;
143 }
144 Ok(submission)
145 }
146
147 async fn send_transaction(
148 &self,
149 request: ContractCall,
150 db_nonce_floor: Option<u64>,
151 ) -> EyreResult<SubmittedTransaction> {
152 let nonce = self.select_next_nonce(db_nonce_floor).await?;
153 self.send_transaction_with_nonce(request, nonce).await
154 }
155
156 async fn reconcile_pending_transaction(
157 &self,
158 _tx_hash: &str,
159 ) -> EyreResult<PendingTransactionStatus> {
160 Ok(PendingTransactionStatus::Unsupported)
161 }
162
163 async fn reconcile_pending_transactions(
164 &self,
165 tx_hashes: &[String],
166 ) -> EyreResult<PendingTransactionStatus> {
167 let mut pending_reason = None;
168 for tx_hash in tx_hashes {
169 match self.reconcile_pending_transaction(tx_hash).await? {
170 PendingTransactionStatus::Unsupported => {}
171 PendingTransactionStatus::Pending(reason) => pending_reason = Some(reason),
172 terminal @ PendingTransactionStatus::Confirmed(_) => return Ok(terminal),
173 terminal @ PendingTransactionStatus::Failed(_) => return Ok(terminal),
174 }
175 }
176 if let Some(reason) = pending_reason {
177 return Ok(PendingTransactionStatus::Pending(reason));
178 }
179 Ok(PendingTransactionStatus::Pending(format!(
180 "transaction attempts remained pending past reconciliation timeout: hashes={tx_hashes:?}"
181 )))
182 }
183}
184
185pub struct DirectTxRelayer {
186 client: Arc<SignerMiddleware<EthersProvider<Http>, LocalWallet>>,
187 max_gas_price: EthersU256,
188}
189
190pub struct AwsKmsTxRelayer {
191 provider: DynProvider,
192 wallet: EthereumWallet,
193 signer_address: Address,
194 chain_id: u64,
195 max_gas_price: u128,
196}
197
198impl DirectTxRelayer {
199 pub async fn new(
200 config: &crate::TransactionSubmitterConfig,
201 secrets: &crate::TransactionSubmitterSecrets,
202 ) -> EyreResult<Self> {
203 let provider = EthersProvider::<Http>::try_from(config.rpc_url.as_str())?
204 .interval(Duration::from_millis(100));
205 let chain_id = provider.get_chainid().await?.as_u64();
206 let wallet = secrets
207 .require_transaction_submitter_private_key()
208 .map_err(|err| {
209 eyre::eyre!(
210 "Transaction submitter private key must be available for direct submitter mode: {err}"
211 )
212 })?
213 .parse::<LocalWallet>()?
214 .with_chain_id(chain_id);
215
216 Ok(Self {
217 client: Arc::new(SignerMiddleware::new(provider, wallet)),
218 max_gas_price: EthersU256::from_dec_str(&config.max_gas_price)?,
219 })
220 }
221
222 async fn apply_direct_gas_limit(
223 &self,
224 tx: &mut TypedTransaction,
225 nonce: SubmittedNonce,
226 ) -> EyreResult<()> {
227 let gas_limit = match self.client.estimate_gas(tx, None).await {
228 Ok(estimated_gas) => buffer_ethers_gas_limit(estimated_gas)?,
229 Err(error) => {
230 tracing::warn!(
231 %error,
232 sender = %self.client.address(),
233 evm_nonce = nonce,
234 fallback_gas_limit = DIRECT_SUBMITTER_FALLBACK_GAS_LIMIT,
235 "direct submitter failed to estimate gas, using fallback gas limit"
236 );
237 EthersU256::from(DIRECT_SUBMITTER_FALLBACK_GAS_LIMIT)
238 }
239 };
240 tx.set_gas(gas_limit);
241 Ok(())
242 }
243}
244
245impl AwsKmsTxRelayer {
246 #[cfg(feature = "aws")]
247 pub async fn new(config: &crate::TransactionSubmitterConfig) -> EyreResult<Self> {
248 let signer = hypercall_signer_aws::build_aws_kms_evm_transaction_signer(
249 &config.aws_kms_key_id,
250 &config.rpc_url,
251 )
252 .await?;
253 let signer_address = signer.address();
254 let wallet = EthereumWallet::new(signer);
255 let provider = ProviderBuilder::new()
256 .connect_http(config.rpc_url.parse()?)
257 .erased();
258 let chain_id = provider.get_chain_id().await?;
259
260 Ok(Self {
261 provider,
262 wallet,
263 signer_address,
264 chain_id,
265 max_gas_price: config.max_gas_price.parse::<u128>()?,
266 })
267 }
268
269 #[cfg(not(feature = "aws"))]
270 pub async fn new(_config: &crate::TransactionSubmitterConfig) -> EyreResult<Self> {
271 Err(eyre::eyre!(
272 "AWS KMS transaction submitter requires building hypercall with the aws feature"
273 ))
274 }
275
276 async fn find_confirmed_or_reverted_receipt(
277 &self,
278 nonce: SubmittedNonce,
279 tx_hashes: &[String],
280 ) -> EyreResult<Option<SubmittedTransaction>> {
281 for tx_hash in tx_hashes {
282 let hash = TxHash::from_str(tx_hash).map_err(|error| {
283 eyre::eyre!(
284 "invalid aws_kms submitter replacement transaction hash {tx_hash}: {error}"
285 )
286 })?;
287 let Some(receipt) =
288 self.provider
289 .get_transaction_receipt(hash)
290 .await
291 .map_err(|error| {
292 eyre::eyre!(
293 "aws_kms submitter failed while checking replacement receipt {tx_hash}: {error}"
294 )
295 })?
296 else {
297 continue;
298 };
299
300 if receipt.status() {
301 return Ok(Some(SubmittedTransaction::with_hashes(
302 self.signer_address,
303 nonce,
304 tx_hash.clone(),
305 tx_hashes.to_vec(),
306 true,
307 )));
308 }
309 return Err(eyre::eyre!(
310 "aws_kms submitter transaction reverted: hash={}",
311 receipt.transaction_hash
312 ));
313 }
314 Ok(None)
315 }
316}
317
318#[async_trait]
319impl TxRelayerTrait for DirectTxRelayer {
320 fn submitter_address(&self) -> SubmitterId {
321 Address::from(*self.client.address().as_fixed_bytes())
322 }
323
324 async fn select_next_nonce(&self, db_nonce_floor: Option<u64>) -> EyreResult<SubmittedNonce> {
325 let chain_nonce = self
326 .client
327 .get_transaction_count(
328 self.client.address(),
329 Some(BlockId::Number(BlockNumber::Pending)),
330 )
331 .await
332 .map_err(|error| eyre::eyre!("direct submitter failed to load nonce: {error}"))?
333 .as_u64();
334 Ok(select_submitter_nonce(chain_nonce, db_nonce_floor))
335 }
336
337 async fn send_transaction_with_nonce(
338 &self,
339 request: ContractCall,
340 nonce: SubmittedNonce,
341 ) -> EyreResult<SubmittedTransaction> {
342 let submitter = self.submitter_address();
343 let mut tx = direct_contract_call_to_ethers_transaction(
344 request,
345 self.client.address(),
346 self.max_gas_price,
347 nonce,
348 self.client.signer().chain_id(),
349 );
350 self.apply_direct_gas_limit(&mut tx, nonce).await?;
351 let signature = self
352 .client
353 .signer()
354 .sign_transaction(&tx)
355 .await
356 .map_err(|error| eyre::eyre!("direct submitter failed to sign tx: {error}"))?;
357 let transaction_hash = format!("{:#066x}", tx.hash(&signature));
358 let raw_tx = tx.rlp_signed(&signature);
359
360 let pending = timeout(
361 DIRECT_SUBMITTER_SEND_TIMEOUT,
362 self.client.provider().send_raw_transaction(raw_tx.clone()),
363 )
364 .await
365 .map_err(|_| eyre::eyre!("direct submitter send_transaction timed out after signing"))?
366 .map_err(|error| eyre::eyre!("direct submitter send_transaction failed: {error}"))?;
367 let receipt = timeout(DIRECT_SUBMITTER_RECEIPT_TIMEOUT, pending)
368 .await
369 .map_err(|_| {
370 SubmittedTransaction::new(submitter, nonce, transaction_hash.clone(), false)
371 });
372 let receipt = match receipt {
373 Ok(receipt) => receipt.map_err(|error| {
374 eyre::eyre!("direct submitter failed while awaiting receipt: {error}")
375 })?,
376 Err(pending_submission) => return Ok(pending_submission),
377 };
378 let Some(receipt) = receipt else {
379 tracing::warn!(
380 tx_hash = %transaction_hash,
381 "direct submitter receipt not available, treating as pending"
382 );
383 return Ok(SubmittedTransaction::new(
384 submitter,
385 nonce,
386 transaction_hash,
387 false,
388 ));
389 };
390 if receipt.status != Some(1u64.into()) {
391 return Err(eyre::eyre!(
392 "direct submitter transaction reverted: hash={}",
393 receipt.transaction_hash
394 ));
395 }
396 Ok(SubmittedTransaction::new(
397 submitter,
398 nonce,
399 transaction_hash,
400 true,
401 ))
402 }
403
404 async fn send_transaction_with_nonce_recording_attempts(
405 &self,
406 request: ContractCall,
407 nonce: SubmittedNonce,
408 recorder: Arc<dyn SignedAttemptRecorder>,
409 ) -> EyreResult<SubmittedTransaction> {
410 let submitter = self.submitter_address();
411 let mut tx = direct_contract_call_to_ethers_transaction(
412 request,
413 self.client.address(),
414 self.max_gas_price,
415 nonce,
416 self.client.signer().chain_id(),
417 );
418 self.apply_direct_gas_limit(&mut tx, nonce).await?;
419 let signature = self
420 .client
421 .signer()
422 .sign_transaction(&tx)
423 .await
424 .map_err(|error| eyre::eyre!("direct submitter failed to sign tx: {error}"))?;
425 let transaction_hash = format!("{:#066x}", tx.hash(&signature));
426 let raw_tx = tx.rlp_signed(&signature);
427 recorder
428 .record_signed_attempt(SignedTransactionAttempt {
429 submitter,
430 nonce,
431 tx_hash: transaction_hash.clone(),
432 raw_tx: raw_tx.to_vec(),
433 })
434 .await?;
435
436 let pending = timeout(
437 DIRECT_SUBMITTER_SEND_TIMEOUT,
438 self.client.provider().send_raw_transaction(raw_tx),
439 )
440 .await;
441 let pending = match pending {
442 Ok(Ok(pending)) => pending,
443 Ok(Err(error)) => {
444 tracing::error!(
445 tx_hash = %transaction_hash,
446 %error,
447 "direct submitter send_raw_transaction failed after signed tx was persisted; keeping attempt pending for reconciliation"
448 );
449 return Ok(SubmittedTransaction::new(
450 submitter,
451 nonce,
452 transaction_hash,
453 false,
454 ));
455 }
456 Err(_) => {
457 tracing::error!(
458 tx_hash = %transaction_hash,
459 "direct submitter send_raw_transaction timed out after signed tx was persisted; keeping attempt pending for reconciliation"
460 );
461 return Ok(SubmittedTransaction::new(
462 submitter,
463 nonce,
464 transaction_hash,
465 false,
466 ));
467 }
468 };
469 let receipt = timeout(DIRECT_SUBMITTER_RECEIPT_TIMEOUT, pending)
470 .await
471 .map_err(|_| {
472 SubmittedTransaction::new(submitter, nonce, transaction_hash.clone(), false)
473 });
474 let receipt = match receipt {
475 Ok(receipt) => receipt.map_err(|error| {
476 eyre::eyre!("direct submitter failed while awaiting receipt: {error}")
477 })?,
478 Err(pending_submission) => return Ok(pending_submission),
479 };
480 let Some(receipt) = receipt else {
481 tracing::warn!(
482 tx_hash = %transaction_hash,
483 "direct submitter receipt not available, treating as pending"
484 );
485 return Ok(SubmittedTransaction::new(
486 submitter,
487 nonce,
488 transaction_hash,
489 false,
490 ));
491 };
492 if receipt.status != Some(1u64.into()) {
493 return Err(eyre::eyre!(
494 "direct submitter transaction reverted: hash={}",
495 receipt.transaction_hash
496 ));
497 }
498 Ok(SubmittedTransaction::new(
499 submitter,
500 nonce,
501 transaction_hash,
502 true,
503 ))
504 }
505
506 async fn send_transaction(
507 &self,
508 request: ContractCall,
509 db_nonce_floor: Option<u64>,
510 ) -> EyreResult<SubmittedTransaction> {
511 let nonce = self.select_next_nonce(db_nonce_floor).await?;
512 self.send_transaction_with_nonce(request, nonce).await
513 }
514
515 async fn reconcile_pending_transaction(
516 &self,
517 tx_hash: &str,
518 ) -> EyreResult<PendingTransactionStatus> {
519 let hash = H256::from_str(tx_hash).map_err(|error| {
520 eyre::eyre!("invalid direct submitter transaction hash {tx_hash}: {error}")
521 })?;
522 let deadline = Instant::now() + DIRECT_SUBMITTER_PENDING_RECONCILIATION_TIMEOUT;
523
524 while Instant::now() < deadline {
525 let maybe_receipt = self
526 .client
527 .get_transaction_receipt(hash)
528 .await
529 .map_err(|error| {
530 eyre::eyre!(
531 "direct submitter failed while reconciling pending receipt {tx_hash}: {error}"
532 )
533 })?;
534
535 if let Some(receipt) = maybe_receipt {
536 if receipt.status == Some(1u64.into()) {
537 return Ok(PendingTransactionStatus::Confirmed(tx_hash.to_string()));
538 }
539 return Ok(PendingTransactionStatus::Failed(format!(
540 "direct submitter transaction reverted during reconciliation: hash={}",
541 receipt.transaction_hash
542 )));
543 }
544
545 sleep(DIRECT_SUBMITTER_PENDING_RECONCILIATION_POLL_INTERVAL).await;
546 }
547
548 Ok(PendingTransactionStatus::Pending(format!(
549 "direct submitter transaction remained pending past reconciliation timeout: hash={tx_hash}"
550 )))
551 }
552}
553
554#[async_trait]
555impl TxRelayerTrait for AwsKmsTxRelayer {
556 fn submitter_address(&self) -> SubmitterId {
557 self.signer_address
558 }
559
560 async fn select_next_nonce(&self, db_nonce_floor: Option<u64>) -> EyreResult<SubmittedNonce> {
561 let chain_nonce = self
562 .provider
563 .get_transaction_count(self.signer_address)
564 .pending()
565 .await
566 .map_err(|error| eyre::eyre!("aws_kms submitter failed to load nonce: {error}"))?;
567 Ok(select_submitter_nonce(chain_nonce, db_nonce_floor))
568 }
569
570 async fn send_transaction_with_nonce(
571 &self,
572 request: ContractCall,
573 nonce: SubmittedNonce,
574 ) -> EyreResult<SubmittedTransaction> {
575 let mut tx =
576 contract_call_to_alloy_transaction(request, self.signer_address, self.max_gas_price);
577 tx.chain_id = Some(self.chain_id);
578 tx.nonce = Some(nonce);
579 tx.gas = Some(match self.provider.estimate_gas(tx.clone()).await {
580 Ok(estimated_gas) => buffer_gas_limit(estimated_gas),
581 Err(error) => {
582 tracing::warn!(
583 %error,
584 sender = %self.signer_address,
585 evm_nonce = nonce,
586 fallback_gas_limit = DIRECT_SUBMITTER_FALLBACK_GAS_LIMIT,
587 "aws_kms submitter failed to estimate gas, using fallback gas limit"
588 );
589 DIRECT_SUBMITTER_FALLBACK_GAS_LIMIT
590 }
591 });
592
593 let mut gas_price = self.max_gas_price;
594 let mut submitted_hashes = Vec::new();
595 for attempt in 0..=DIRECT_SUBMITTER_REPLACEMENT_ATTEMPTS {
596 tx.max_fee_per_gas = Some(gas_price);
597 tx.max_priority_fee_per_gas = Some(gas_price);
598
599 let signed_tx = tx
600 .clone()
601 .build(&self.wallet)
602 .await
603 .map_err(|error| eyre::eyre!("aws_kms submitter failed to sign tx: {error}"))?;
604 let transaction_hash = format!("{:#066x}", signed_tx.tx_hash());
605 let mut raw_tx = Vec::with_capacity(signed_tx.encode_2718_len());
606 signed_tx.encode_2718(&mut raw_tx);
607 submitted_hashes.push(transaction_hash.clone());
608
609 let pending_result = timeout(
610 DIRECT_SUBMITTER_SEND_TIMEOUT,
611 self.provider.send_raw_transaction(&raw_tx),
612 )
613 .await;
614 let pending = match pending_result {
615 Ok(Ok(pending)) => pending,
616 Ok(Err(error)) => {
617 if let Some(submission) = self
618 .find_confirmed_or_reverted_receipt(nonce, &submitted_hashes)
619 .await?
620 {
621 return Ok(submission);
622 }
623 tracing::error!(
624 tx_hash = %transaction_hash,
625 %error,
626 "aws_kms submitter send_raw_transaction failed after signing; keeping attempt pending for reconciliation"
627 );
628 return Ok(SubmittedTransaction::new(
629 self.signer_address,
630 nonce,
631 transaction_hash,
632 false,
633 ));
634 }
635 Err(_) => {
636 if let Some(submission) = self
637 .find_confirmed_or_reverted_receipt(nonce, &submitted_hashes)
638 .await?
639 {
640 return Ok(submission);
641 }
642 tracing::warn!(
643 tx_hash = %transaction_hash,
644 evm_nonce = nonce,
645 attempt,
646 "aws_kms submitter send_raw_transaction timed out after signing; keeping hash for reconciliation and replacement"
647 );
648 if attempt == DIRECT_SUBMITTER_REPLACEMENT_ATTEMPTS {
649 break;
650 }
651 gas_price =
652 gas_price.saturating_mul(DIRECT_SUBMITTER_REPLACEMENT_GAS_MULTIPLIER);
653 continue;
654 }
655 };
656
657 let receipt = timeout(DIRECT_SUBMITTER_RECEIPT_TIMEOUT, pending.get_receipt()).await;
658 let receipt = match receipt {
659 Ok(receipt) => receipt.map_err(|error| {
660 eyre::eyre!("aws_kms submitter failed while awaiting receipt: {error}")
661 })?,
662 Err(_) => {
663 if let Some(submission) = self
664 .find_confirmed_or_reverted_receipt(nonce, &submitted_hashes)
665 .await?
666 {
667 return Ok(submission);
668 }
669 if attempt == DIRECT_SUBMITTER_REPLACEMENT_ATTEMPTS {
670 break;
671 }
672 let next_gas_price =
673 gas_price.saturating_mul(DIRECT_SUBMITTER_REPLACEMENT_GAS_MULTIPLIER);
674 tracing::warn!(
675 tx_hash = %transaction_hash,
676 evm_nonce = nonce,
677 attempt,
678 gas_price,
679 next_gas_price,
680 "aws_kms submitter receipt not available, replacing transaction with bumped gas"
681 );
682 gas_price = next_gas_price;
683 continue;
684 }
685 };
686
687 if !receipt.status() {
688 return Err(eyre::eyre!(
689 "aws_kms submitter transaction reverted: hash={}",
690 receipt.transaction_hash
691 ));
692 }
693 return Ok(SubmittedTransaction::new(
694 self.signer_address,
695 nonce,
696 transaction_hash,
697 true,
698 ));
699 }
700
701 let deadline = Instant::now() + DIRECT_SUBMITTER_PENDING_RECONCILIATION_TIMEOUT;
702 while Instant::now() < deadline {
703 if let Some(submission) = self
704 .find_confirmed_or_reverted_receipt(nonce, &submitted_hashes)
705 .await?
706 {
707 return Ok(submission);
708 }
709
710 let chain_nonce = self
711 .provider
712 .get_transaction_count(self.signer_address)
713 .latest()
714 .await
715 .map_err(|error| {
716 eyre::eyre!(
717 "aws_kms submitter failed to load latest nonce while monitoring replacements: {error}"
718 )
719 })?;
720 if chain_nonce > nonce {
721 tracing::warn!(
722 evm_nonce = nonce,
723 chain_nonce,
724 hashes = ?submitted_hashes,
725 "aws_kms submitter nonce advanced while monitoring replacement receipts"
726 );
727 }
728
729 sleep(DIRECT_SUBMITTER_PENDING_RECONCILIATION_POLL_INTERVAL).await;
730 }
731
732 let Some(primary_hash) = submitted_hashes.last().cloned() else {
733 return Err(eyre::eyre!(
734 "aws_kms submitter did not produce a transaction hash before timeout: nonce={nonce}"
735 ));
736 };
737 tracing::warn!(
738 evm_nonce = nonce,
739 hashes = ?submitted_hashes,
740 "aws_kms submitter replacement transactions are still pending; persisting attempts for reconciliation"
741 );
742 Ok(SubmittedTransaction::with_hashes(
743 self.signer_address,
744 nonce,
745 primary_hash,
746 submitted_hashes,
747 false,
748 ))
749 }
750
751 async fn send_transaction_with_nonce_recording_attempts(
752 &self,
753 request: ContractCall,
754 nonce: SubmittedNonce,
755 recorder: Arc<dyn SignedAttemptRecorder>,
756 ) -> EyreResult<SubmittedTransaction> {
757 let mut tx =
758 contract_call_to_alloy_transaction(request, self.signer_address, self.max_gas_price);
759 tx.chain_id = Some(self.chain_id);
760 tx.nonce = Some(nonce);
761 tx.gas = Some(match self.provider.estimate_gas(tx.clone()).await {
762 Ok(estimated_gas) => buffer_gas_limit(estimated_gas),
763 Err(error) => {
764 tracing::warn!(
765 %error,
766 sender = %self.signer_address,
767 evm_nonce = nonce,
768 fallback_gas_limit = DIRECT_SUBMITTER_FALLBACK_GAS_LIMIT,
769 "aws_kms submitter failed to estimate gas, using fallback gas limit"
770 );
771 DIRECT_SUBMITTER_FALLBACK_GAS_LIMIT
772 }
773 });
774
775 let mut gas_price = self.max_gas_price;
776 let mut submitted_hashes = Vec::new();
777 for attempt in 0..=DIRECT_SUBMITTER_REPLACEMENT_ATTEMPTS {
778 tx.max_fee_per_gas = Some(gas_price);
779 tx.max_priority_fee_per_gas = Some(gas_price);
780
781 let signed_tx = tx
782 .clone()
783 .build(&self.wallet)
784 .await
785 .map_err(|error| eyre::eyre!("aws_kms submitter failed to sign tx: {error}"))?;
786 let transaction_hash = format!("{:#066x}", signed_tx.tx_hash());
787 let mut raw_tx = Vec::with_capacity(signed_tx.encode_2718_len());
788 signed_tx.encode_2718(&mut raw_tx);
789 recorder
790 .record_signed_attempt(SignedTransactionAttempt {
791 submitter: self.signer_address,
792 nonce,
793 tx_hash: transaction_hash.clone(),
794 raw_tx: raw_tx.clone(),
795 })
796 .await?;
797 submitted_hashes.push(transaction_hash.clone());
798
799 let pending_result = timeout(
800 DIRECT_SUBMITTER_SEND_TIMEOUT,
801 self.provider.send_raw_transaction(&raw_tx),
802 )
803 .await;
804 let pending = match pending_result {
805 Ok(Ok(pending)) => pending,
806 Ok(Err(error)) => {
807 if let Some(submission) = self
808 .find_confirmed_or_reverted_receipt(nonce, &submitted_hashes)
809 .await?
810 {
811 return Ok(submission);
812 }
813 return Err(eyre::eyre!(
814 "aws_kms submitter send_raw_transaction failed: {error}"
815 ));
816 }
817 Err(_) => {
818 if let Some(submission) = self
819 .find_confirmed_or_reverted_receipt(nonce, &submitted_hashes)
820 .await?
821 {
822 return Ok(submission);
823 }
824 tracing::warn!(
825 tx_hash = %transaction_hash,
826 evm_nonce = nonce,
827 attempt,
828 "aws_kms submitter send_raw_transaction timed out after signing; keeping hash for reconciliation and replacement"
829 );
830 if attempt == DIRECT_SUBMITTER_REPLACEMENT_ATTEMPTS {
831 break;
832 }
833 gas_price =
834 gas_price.saturating_mul(DIRECT_SUBMITTER_REPLACEMENT_GAS_MULTIPLIER);
835 continue;
836 }
837 };
838
839 let receipt = timeout(DIRECT_SUBMITTER_RECEIPT_TIMEOUT, pending.get_receipt()).await;
840 let receipt = match receipt {
841 Ok(receipt) => receipt.map_err(|error| {
842 eyre::eyre!("aws_kms submitter failed while awaiting receipt: {error}")
843 })?,
844 Err(_) => {
845 if let Some(submission) = self
846 .find_confirmed_or_reverted_receipt(nonce, &submitted_hashes)
847 .await?
848 {
849 return Ok(submission);
850 }
851 if attempt == DIRECT_SUBMITTER_REPLACEMENT_ATTEMPTS {
852 break;
853 }
854 let next_gas_price =
855 gas_price.saturating_mul(DIRECT_SUBMITTER_REPLACEMENT_GAS_MULTIPLIER);
856 tracing::warn!(
857 tx_hash = %transaction_hash,
858 evm_nonce = nonce,
859 attempt,
860 gas_price,
861 next_gas_price,
862 "aws_kms submitter receipt not available, replacing transaction with bumped gas"
863 );
864 gas_price = next_gas_price;
865 continue;
866 }
867 };
868
869 if !receipt.status() {
870 return Err(eyre::eyre!(
871 "aws_kms submitter transaction reverted: hash={}",
872 receipt.transaction_hash
873 ));
874 }
875 return Ok(SubmittedTransaction::new(
876 self.signer_address,
877 nonce,
878 transaction_hash,
879 true,
880 ));
881 }
882
883 let deadline = Instant::now() + DIRECT_SUBMITTER_PENDING_RECONCILIATION_TIMEOUT;
884 while Instant::now() < deadline {
885 if let Some(submission) = self
886 .find_confirmed_or_reverted_receipt(nonce, &submitted_hashes)
887 .await?
888 {
889 return Ok(submission);
890 }
891
892 let chain_nonce = self
893 .provider
894 .get_transaction_count(self.signer_address)
895 .latest()
896 .await
897 .map_err(|error| {
898 eyre::eyre!(
899 "aws_kms submitter failed to load latest nonce while monitoring replacements: {error}"
900 )
901 })?;
902 if chain_nonce > nonce {
903 tracing::warn!(
904 evm_nonce = nonce,
905 chain_nonce,
906 hashes = ?submitted_hashes,
907 "aws_kms submitter nonce advanced while monitoring replacement receipts"
908 );
909 }
910
911 sleep(DIRECT_SUBMITTER_PENDING_RECONCILIATION_POLL_INTERVAL).await;
912 }
913
914 let Some(primary_hash) = submitted_hashes.last().cloned() else {
915 return Err(eyre::eyre!(
916 "aws_kms submitter did not produce a transaction hash before timeout: nonce={nonce}"
917 ));
918 };
919 tracing::warn!(
920 evm_nonce = nonce,
921 hashes = ?submitted_hashes,
922 "aws_kms submitter replacement transactions are still pending; persisting attempts for reconciliation"
923 );
924 Ok(SubmittedTransaction::with_hashes(
925 self.signer_address,
926 nonce,
927 primary_hash,
928 submitted_hashes,
929 false,
930 ))
931 }
932
933 async fn send_transaction(
934 &self,
935 request: ContractCall,
936 db_nonce_floor: Option<u64>,
937 ) -> EyreResult<SubmittedTransaction> {
938 let nonce = self.select_next_nonce(db_nonce_floor).await?;
939 self.send_transaction_with_nonce(request, nonce).await
940 }
941
942 async fn reconcile_pending_transaction(
943 &self,
944 tx_hash: &str,
945 ) -> EyreResult<PendingTransactionStatus> {
946 let hash = TxHash::from_str(tx_hash).map_err(|error| {
947 eyre::eyre!("invalid aws_kms submitter transaction hash {tx_hash}: {error}")
948 })?;
949 let deadline = Instant::now() + DIRECT_SUBMITTER_PENDING_RECONCILIATION_TIMEOUT;
950
951 while Instant::now() < deadline {
952 let maybe_receipt =
953 self.provider
954 .get_transaction_receipt(hash)
955 .await
956 .map_err(|error| {
957 eyre::eyre!(
958 "aws_kms submitter failed while reconciling pending receipt {tx_hash}: {error}"
959 )
960 })?;
961
962 if let Some(receipt) = maybe_receipt {
963 if receipt.status() {
964 return Ok(PendingTransactionStatus::Confirmed(tx_hash.to_string()));
965 }
966 return Ok(PendingTransactionStatus::Failed(format!(
967 "aws_kms submitter transaction reverted during reconciliation: hash={}",
968 receipt.transaction_hash
969 )));
970 }
971
972 sleep(DIRECT_SUBMITTER_PENDING_RECONCILIATION_POLL_INTERVAL).await;
973 }
974
975 Ok(PendingTransactionStatus::Pending(format!(
976 "aws_kms submitter transaction remained pending past reconciliation timeout: hash={tx_hash}"
977 )))
978 }
979
980 async fn reconcile_pending_transactions(
981 &self,
982 tx_hashes: &[String],
983 ) -> EyreResult<PendingTransactionStatus> {
984 let deadline = Instant::now() + DIRECT_SUBMITTER_PENDING_RECONCILIATION_TIMEOUT;
985 while Instant::now() < deadline {
986 if let Some(submission) = self
987 .find_confirmed_or_reverted_receipt(0, tx_hashes)
988 .await?
989 {
990 return if submission.confirmed {
991 Ok(PendingTransactionStatus::Confirmed(submission.hash))
992 } else {
993 Ok(PendingTransactionStatus::Pending(format!(
994 "aws_kms submitter replacement attempts still pending: hashes={tx_hashes:?}"
995 )))
996 };
997 }
998 sleep(DIRECT_SUBMITTER_PENDING_RECONCILIATION_POLL_INTERVAL).await;
999 }
1000
1001 Ok(PendingTransactionStatus::Pending(format!(
1002 "aws_kms submitter replacement attempts remained pending past reconciliation timeout: hashes={tx_hashes:?}"
1003 )))
1004 }
1005}
1006
1007fn contract_call_to_alloy_transaction(
1008 request: ContractCall,
1009 from: Address,
1010 max_gas_price: u128,
1011) -> AlloyTransactionRequest {
1012 AlloyTransactionRequest::default()
1013 .from(from)
1014 .to(request.to)
1015 .value(request.value)
1016 .input(TransactionInput::new(request.data))
1017 .max_fee_per_gas(max_gas_price)
1018 .max_priority_fee_per_gas(max_gas_price)
1019}
1020
1021fn direct_contract_call_to_ethers_transaction(
1022 request: ContractCall,
1023 from: EthersAddress,
1024 max_gas_price: EthersU256,
1025 nonce: SubmittedNonce,
1026 chain_id: u64,
1027) -> TypedTransaction {
1028 let to = EthersAddress::from_slice(request.to.as_slice());
1029 let data = EthersBytes::from(request.data.to_vec());
1030 let value = EthersU256::from_big_endian(&request.value.to_be_bytes::<32>());
1031
1032 let mut tx = TypedTransaction::Eip1559(Eip1559TransactionRequest::new());
1033 tx.set_from(from);
1034 tx.set_to(to);
1035 tx.set_data(data);
1036 tx.set_value(value);
1037 if let TypedTransaction::Eip1559(inner) = &mut tx {
1038 inner.max_fee_per_gas = Some(max_gas_price);
1039 inner.max_priority_fee_per_gas = Some(max_gas_price);
1040 }
1041 tx.set_nonce(EthersU256::from(nonce));
1042 tx.set_chain_id(chain_id);
1043 tx
1044}
1045
1046fn select_submitter_nonce(chain_nonce: u64, db_nonce_floor: Option<u64>) -> u64 {
1047 db_nonce_floor.map_or(chain_nonce, |db_nonce| chain_nonce.max(db_nonce))
1048}
1049
1050fn buffer_gas_limit(estimated_gas: u64) -> u64 {
1051 estimated_gas
1052 .saturating_mul(DIRECT_SUBMITTER_GAS_LIMIT_BUFFER_NUMERATOR)
1053 .div_ceil(DIRECT_SUBMITTER_GAS_LIMIT_BUFFER_DENOMINATOR)
1054 .max(DIRECT_SUBMITTER_FALLBACK_GAS_LIMIT)
1055}
1056
1057fn buffer_ethers_gas_limit(estimated_gas: EthersU256) -> EyreResult<EthersU256> {
1058 if estimated_gas > EthersU256::from(u64::MAX) {
1059 return Err(eyre::eyre!(
1060 "direct submitter estimated gas {estimated_gas} exceeds u64 range"
1061 ));
1062 }
1063 Ok(EthersU256::from(buffer_gas_limit(estimated_gas.as_u64())))
1064}
1065
1066pub struct MockTxRelayer;
1067
1068static MOCK_SUBMITTER_NONCE: AtomicU64 = AtomicU64::new(1);
1069
1070#[async_trait]
1071impl TxRelayerTrait for MockTxRelayer {
1072 fn submitter_address(&self) -> SubmitterId {
1073 Address::ZERO
1074 }
1075
1076 async fn select_next_nonce(&self, db_nonce_floor: Option<u64>) -> EyreResult<SubmittedNonce> {
1077 let next = MOCK_SUBMITTER_NONCE.fetch_add(1, Ordering::Relaxed);
1078 Ok(select_submitter_nonce(next, db_nonce_floor))
1079 }
1080
1081 async fn send_transaction_with_nonce(
1082 &self,
1083 _request: ContractCall,
1084 nonce: SubmittedNonce,
1085 ) -> EyreResult<SubmittedTransaction> {
1086 let mut hash = [0_u8; 32];
1087 hash[24..].copy_from_slice(&nonce.to_be_bytes());
1088 let hash = TxHash::from(hash).to_string();
1089 Ok(SubmittedTransaction::new(Address::ZERO, nonce, hash, true))
1090 }
1091
1092 async fn send_transaction(
1093 &self,
1094 request: ContractCall,
1095 db_nonce_floor: Option<u64>,
1096 ) -> EyreResult<SubmittedTransaction> {
1097 let nonce = self.select_next_nonce(db_nonce_floor).await?;
1098 self.send_transaction_with_nonce(request, nonce).await
1099 }
1100}
1101
1102#[cfg(test)]
1103mod tests {
1104 use super::*;
1105 use alloy::primitives::{address, bytes, U256 as AlloyU256};
1106
1107 fn contract_call() -> ContractCall {
1108 ContractCall {
1109 to: address!("1111111111111111111111111111111111111111"),
1110 value: AlloyU256::ZERO,
1111 data: bytes!("12345678"),
1112 external_id: Some("outbox-1".to_string()),
1113 }
1114 }
1115
1116 #[test]
1117 fn aws_kms_contract_call_conversion_preserves_sender_fee_and_calldata() {
1118 let from = address!("2222222222222222222222222222222222222222");
1119 let tx = contract_call_to_alloy_transaction(contract_call(), from, 1_000_000_000);
1120
1121 assert_eq!(tx.chain_id, None);
1122 assert_eq!(tx.from, Some(from));
1123 assert_eq!(
1124 tx.to,
1125 Some(alloy::primitives::TxKind::Call(address!(
1126 "1111111111111111111111111111111111111111"
1127 )))
1128 );
1129 assert_eq!(tx.value, Some(AlloyU256::ZERO));
1130 assert_eq!(tx.max_fee_per_gas, Some(1_000_000_000));
1131 assert_eq!(tx.max_priority_fee_per_gas, Some(1_000_000_000));
1132 assert_eq!(tx.gas, None);
1133 assert_eq!(tx.input.input().unwrap(), &bytes!("12345678"));
1134 }
1135
1136 #[test]
1137 fn direct_contract_call_conversion_preserves_sender_fee_nonce_and_calldata() {
1138 let from = EthersAddress::from_slice(&[0x22; 20]);
1139 let tx = direct_contract_call_to_ethers_transaction(
1140 contract_call(),
1141 from,
1142 EthersU256::from(1_000_000_000_u64),
1143 42,
1144 31337,
1145 );
1146
1147 let TypedTransaction::Eip1559(inner) = tx else {
1148 panic!("direct submitter must build EIP-1559 transactions");
1149 };
1150 assert_eq!(inner.from, Some(from));
1151 assert_eq!(
1152 inner.to,
1153 Some(ethers::types::NameOrAddress::Address(
1154 EthersAddress::from_slice(
1155 address!("1111111111111111111111111111111111111111").as_slice()
1156 )
1157 ))
1158 );
1159 assert_eq!(inner.value, Some(EthersU256::zero()));
1160 assert_eq!(
1161 inner.max_fee_per_gas,
1162 Some(EthersU256::from(1_000_000_000_u64))
1163 );
1164 assert_eq!(
1165 inner.max_priority_fee_per_gas,
1166 Some(EthersU256::from(1_000_000_000_u64))
1167 );
1168 assert_eq!(inner.nonce, Some(EthersU256::from(42_u64)));
1169 assert_eq!(inner.chain_id, Some(31337_u64.into()));
1170 assert_eq!(inner.gas, None);
1171 assert_eq!(
1172 inner.data,
1173 Some(EthersBytes::from(vec![0x12, 0x34, 0x56, 0x78]))
1174 );
1175 }
1176
1177 #[test]
1178 fn select_submitter_nonce_uses_max_of_chain_and_db_floor() {
1179 assert_eq!(select_submitter_nonce(7, None), 7);
1180 assert_eq!(select_submitter_nonce(7, Some(3)), 7);
1181 assert_eq!(select_submitter_nonce(7, Some(9)), 9);
1182 }
1183
1184 #[test]
1185 fn aws_kms_gas_limit_buffer_rounds_up() {
1186 assert_eq!(
1187 buffer_gas_limit(21_000),
1188 DIRECT_SUBMITTER_FALLBACK_GAS_LIMIT
1189 );
1190 assert_eq!(
1191 buffer_gas_limit(21_001),
1192 DIRECT_SUBMITTER_FALLBACK_GAS_LIMIT
1193 );
1194 assert_eq!(buffer_gas_limit(250_000), 300_000);
1195 }
1196
1197 #[test]
1198 fn aws_kms_gas_limit_buffer_floors_unusable_estimates() {
1199 assert_eq!(buffer_gas_limit(0), DIRECT_SUBMITTER_FALLBACK_GAS_LIMIT);
1200 assert_eq!(buffer_gas_limit(1), DIRECT_SUBMITTER_FALLBACK_GAS_LIMIT);
1201 }
1202
1203 #[test]
1204 fn direct_gas_limit_buffer_converts_ethers_estimates() {
1205 assert_eq!(
1206 buffer_ethers_gas_limit(EthersU256::from(21_000_u64)).unwrap(),
1207 EthersU256::from(DIRECT_SUBMITTER_FALLBACK_GAS_LIMIT)
1208 );
1209 assert_eq!(
1210 buffer_ethers_gas_limit(EthersU256::from(250_000_u64)).unwrap(),
1211 EthersU256::from(300_000_u64)
1212 );
1213 assert!(buffer_ethers_gas_limit(EthersU256::from(u64::MAX) + EthersU256::one()).is_err());
1214 }
1215}