hypercall/startup/
transaction_submitter.rs1use std::sync::Arc;
2use std::time::Instant;
3
4use metrics::gauge;
5use tokio::sync::mpsc;
6use tracing::{error, info};
7
8use crate::backend_config::BackendRuntime;
9use crate::messaging::EventBusTrait;
10use crate::shared::shutdown::Shutdown;
11use crate::shared::task_group::TaskGroup;
12use hypercall_db_diesel::DieselDb;
13use hypercall_transaction_submitter::{TransactionSubmitter, TransactionSubmitterMode};
14
15struct TransactionSubmitterEventBus {
16 inner: Arc<dyn EventBusTrait>,
17}
18
19impl TransactionSubmitterEventBus {
20 fn new(inner: Arc<dyn EventBusTrait>) -> Self {
21 Self { inner }
22 }
23}
24
25#[async_trait::async_trait]
26impl hypercall_transaction_submitter::EventBusTrait for TransactionSubmitterEventBus {
27 fn get_sender(&self) -> mpsc::UnboundedSender<hypercall_types::EngineMessage> {
28 self.inner.get_sender()
29 }
30
31 async fn subscribe(
32 &self,
33 topics: Vec<String>,
34 ) -> Result<mpsc::UnboundedReceiver<hypercall_types::EngineMessage>, String> {
35 self.inner.subscribe(topics).await
36 }
37}
38
/// Handle returned once the transaction submitter has been built and its task
/// has been handed to the task group.
///
/// It currently carries no data.
#[derive(Debug, Default)]
pub struct TransactionSubmitterResources {}
40
41pub async fn build_transaction_submitter_resources(
42 runtime: &BackendRuntime,
43 event_bus: Arc<dyn EventBusTrait>,
44 tx_submitter_db: Option<Arc<DieselDb>>,
45 shutdown: &Shutdown,
46 task_group: &mut TaskGroup,
47) -> anyhow::Result<TransactionSubmitterResources> {
48 let t = Instant::now();
49 let transaction_submitter_mode = runtime.config.transaction_submitter.mode;
50 let transaction_submitter_event_bus: Arc<dyn hypercall_transaction_submitter::EventBusTrait> =
51 Arc::new(TransactionSubmitterEventBus::new(event_bus));
52 let transaction_submitter = build_transaction_submitter(
53 runtime,
54 transaction_submitter_event_bus,
55 tx_submitter_db,
56 transaction_submitter_mode,
57 )
58 .await?;
59 record_startup_timing("transaction_submitter", t.elapsed());
60
61 start_transaction_submitter(transaction_submitter.clone(), shutdown, task_group);
62
63 Ok(TransactionSubmitterResources {})
64}
65
66async fn build_transaction_submitter(
67 runtime: &BackendRuntime,
68 event_bus: Arc<dyn hypercall_transaction_submitter::EventBusTrait>,
69 tx_submitter_db: Option<Arc<DieselDb>>,
70 transaction_submitter_mode: TransactionSubmitterMode,
71) -> anyhow::Result<Arc<TransactionSubmitter>> {
72 let secrets = runtime.secrets.transaction_submitter_secrets();
73
74 #[cfg(any(debug_assertions, test))]
75 {
76 match transaction_submitter_mode {
77 TransactionSubmitterMode::Mock => {
78 info!("Using mock TransactionSubmitter");
79 let mut submitter = TransactionSubmitter::new_mock(
80 event_bus,
81 &runtime.config.contracts.exchange_contract_address,
82 )
83 .map_err(|e| {
84 anyhow::anyhow!("Failed to initialize mock TransactionSubmitter: {}", e)
85 })?;
86 if let Some(db) = tx_submitter_db.clone() {
87 submitter = submitter.with_db(db.clone()).with_submitter_store(db);
88 }
89 Ok(Arc::new(submitter))
90 }
91 TransactionSubmitterMode::Direct => {
92 let mut submitter = TransactionSubmitter::new_direct(
93 event_bus,
94 &runtime.config.transaction_submitter,
95 &runtime.config.contracts.exchange_contract_address,
96 &secrets,
97 )
98 .await
99 .map_err(|e| {
100 error!("Failed to initialize direct TransactionSubmitter: {}", e);
101 anyhow::anyhow!("Failed to initialize direct TransactionSubmitter: {}", e)
102 })?;
103 if let Some(db) = tx_submitter_db.clone() {
104 submitter = submitter.with_db(db.clone()).with_submitter_store(db);
105 }
106 Ok(Arc::new(submitter))
107 }
108 TransactionSubmitterMode::AwsKms => {
109 let mut submitter = TransactionSubmitter::new_aws_kms(
110 event_bus,
111 &runtime.config.transaction_submitter,
112 &runtime.config.contracts.exchange_contract_address,
113 )
114 .await
115 .map_err(|e| {
116 error!("Failed to initialize AWS KMS TransactionSubmitter: {}", e);
117 anyhow::anyhow!("Failed to initialize AWS KMS TransactionSubmitter: {}", e)
118 })?;
119 if let Some(db) = tx_submitter_db.clone() {
120 submitter = submitter.with_db(db.clone()).with_submitter_store(db);
121 }
122 Ok(Arc::new(submitter))
123 }
124 }
125 }
126
127 #[cfg(not(any(debug_assertions, test)))]
128 {
129 match transaction_submitter_mode {
130 TransactionSubmitterMode::Mock => {
131 info!("Using mock TransactionSubmitter (mock mode configured)");
132 let mut submitter = TransactionSubmitter::new_mock(
133 event_bus,
134 &runtime.config.contracts.exchange_contract_address,
135 )
136 .map_err(|e| {
137 anyhow::anyhow!("Failed to initialize mock TransactionSubmitter: {}", e)
138 })?;
139 if let Some(db) = tx_submitter_db.clone() {
140 submitter = submitter.with_db(db.clone()).with_submitter_store(db);
141 }
142 Ok(Arc::new(submitter))
143 }
144 TransactionSubmitterMode::Direct => {
145 let mut submitter = TransactionSubmitter::new_direct(
146 event_bus,
147 &runtime.config.transaction_submitter,
148 &runtime.config.contracts.exchange_contract_address,
149 &secrets,
150 )
151 .await
152 .map_err(|e| {
153 error!("Failed to initialize direct TransactionSubmitter: {}", e);
154 anyhow::anyhow!("Failed to initialize direct TransactionSubmitter: {}", e)
155 })?;
156 if let Some(db) = tx_submitter_db.clone() {
157 submitter = submitter.with_db(db.clone()).with_submitter_store(db);
158 }
159 Ok(Arc::new(submitter))
160 }
161 TransactionSubmitterMode::AwsKms => {
162 let mut submitter = TransactionSubmitter::new_aws_kms(
163 event_bus,
164 &runtime.config.transaction_submitter,
165 &runtime.config.contracts.exchange_contract_address,
166 )
167 .await
168 .map_err(|e| {
169 error!("Failed to initialize AWS KMS TransactionSubmitter: {}", e);
170 anyhow::anyhow!("Failed to initialize AWS KMS TransactionSubmitter: {}", e)
171 })?;
172 if let Some(db) = tx_submitter_db.clone() {
173 submitter = submitter.with_db(db.clone()).with_submitter_store(db);
174 }
175 Ok(Arc::new(submitter))
176 }
177 }
178 }
179}
180
181fn start_transaction_submitter(
182 transaction_submitter: Arc<TransactionSubmitter>,
183 shutdown: &Shutdown,
184 task_group: &mut TaskGroup,
185) {
186 match transaction_submitter.start_with_shutdown(shutdown.subscribe()) {
187 Ok(handle) => {
188 task_group.spawn("TransactionSubmitter", async move {
189 if let Err(e) = handle.await {
190 error!("TransactionSubmitter task panicked: {:?}", e);
191 }
192 Ok(())
193 });
194 info!("✓ Transaction submitter service started");
195 }
196 Err(e) => {
197 error!("Failed to start transaction submitter: {}", e);
198 }
199 }
200}
201
202fn record_startup_timing(phase: &'static str, elapsed: std::time::Duration) {
203 info!("[startup-timing] {}: {:?}", phase, elapsed);
204 gauge!("ht_startup_phase_seconds", "phase" => phase).set(elapsed.as_secs_f64());
205}
206
#[cfg(test)]
mod tests {
    use super::*;
    use crate::backend_config::{BackendConfig, BackendSecrets};
    use crate::messaging::ChannelEventBus;
    use std::time::Duration;

    /// Default runtime switched to mock submitter mode with a fixed exchange
    /// contract address.
    fn mock_mode_runtime() -> BackendRuntime {
        let mut config = BackendConfig::default();
        config.transaction_submitter.mode = TransactionSubmitterMode::Mock;
        config.contracts.exchange_contract_address =
            "0x0000000000000000000000000000000000000001".to_string();
        BackendRuntime {
            config,
            secrets: BackendSecrets::default(),
        }
    }

    /// In mock mode with no database, building the resources registers exactly
    /// one task, and that task stops when shutdown is requested.
    #[tokio::test]
    async fn mock_transaction_submitter_starts_without_sync_relayer() {
        let runtime = mock_mode_runtime();
        let bus = ChannelEventBus::new().expect("event bus should construct");
        let event_bus = Arc::new(bus);
        let shutdown = Shutdown::new();
        let mut task_group = TaskGroup::new();

        let built = build_transaction_submitter_resources(
            &runtime,
            event_bus,
            None,
            &shutdown,
            &mut task_group,
        )
        .await;
        let _resources = built.expect("mock transaction submitter should initialize");

        assert_eq!(task_group.len(), 1);

        let joined = task_group
            .shutdown_and_join(&shutdown, Duration::from_secs(1))
            .await;
        joined.expect("mock transaction submitter task should stop on shutdown");
    }
}
245}