Skip to main content

hypercall/startup/
transaction_submitter.rs

1use std::sync::Arc;
2use std::time::Instant;
3
4use metrics::gauge;
5use tokio::sync::mpsc;
6use tracing::{error, info};
7
8use crate::backend_config::BackendRuntime;
9use crate::messaging::EventBusTrait;
10use crate::shared::shutdown::Shutdown;
11use crate::shared::task_group::TaskGroup;
12use hypercall_db_diesel::DieselDb;
13use hypercall_transaction_submitter::{TransactionSubmitter, TransactionSubmitterMode};
14
15struct TransactionSubmitterEventBus {
16    inner: Arc<dyn EventBusTrait>,
17}
18
19impl TransactionSubmitterEventBus {
20    fn new(inner: Arc<dyn EventBusTrait>) -> Self {
21        Self { inner }
22    }
23}
24
25#[async_trait::async_trait]
26impl hypercall_transaction_submitter::EventBusTrait for TransactionSubmitterEventBus {
27    fn get_sender(&self) -> mpsc::UnboundedSender<hypercall_types::EngineMessage> {
28        self.inner.get_sender()
29    }
30
31    async fn subscribe(
32        &self,
33        topics: Vec<String>,
34    ) -> Result<mpsc::UnboundedReceiver<hypercall_types::EngineMessage>, String> {
35        self.inner.subscribe(topics).await
36    }
37}
38
39pub struct TransactionSubmitterResources {}
40
41pub async fn build_transaction_submitter_resources(
42    runtime: &BackendRuntime,
43    event_bus: Arc<dyn EventBusTrait>,
44    tx_submitter_db: Option<Arc<DieselDb>>,
45    shutdown: &Shutdown,
46    task_group: &mut TaskGroup,
47) -> anyhow::Result<TransactionSubmitterResources> {
48    let t = Instant::now();
49    let transaction_submitter_mode = runtime.config.transaction_submitter.mode;
50    let transaction_submitter_event_bus: Arc<dyn hypercall_transaction_submitter::EventBusTrait> =
51        Arc::new(TransactionSubmitterEventBus::new(event_bus));
52    let transaction_submitter = build_transaction_submitter(
53        runtime,
54        transaction_submitter_event_bus,
55        tx_submitter_db,
56        transaction_submitter_mode,
57    )
58    .await?;
59    record_startup_timing("transaction_submitter", t.elapsed());
60
61    start_transaction_submitter(transaction_submitter.clone(), shutdown, task_group);
62
63    Ok(TransactionSubmitterResources {})
64}
65
66async fn build_transaction_submitter(
67    runtime: &BackendRuntime,
68    event_bus: Arc<dyn hypercall_transaction_submitter::EventBusTrait>,
69    tx_submitter_db: Option<Arc<DieselDb>>,
70    transaction_submitter_mode: TransactionSubmitterMode,
71) -> anyhow::Result<Arc<TransactionSubmitter>> {
72    let secrets = runtime.secrets.transaction_submitter_secrets();
73
74    #[cfg(any(debug_assertions, test))]
75    {
76        match transaction_submitter_mode {
77            TransactionSubmitterMode::Mock => {
78                info!("Using mock TransactionSubmitter");
79                let mut submitter = TransactionSubmitter::new_mock(
80                    event_bus,
81                    &runtime.config.contracts.exchange_contract_address,
82                )
83                .map_err(|e| {
84                    anyhow::anyhow!("Failed to initialize mock TransactionSubmitter: {}", e)
85                })?;
86                if let Some(db) = tx_submitter_db.clone() {
87                    submitter = submitter.with_db(db.clone()).with_submitter_store(db);
88                }
89                Ok(Arc::new(submitter))
90            }
91            TransactionSubmitterMode::Direct => {
92                let mut submitter = TransactionSubmitter::new_direct(
93                    event_bus,
94                    &runtime.config.transaction_submitter,
95                    &runtime.config.contracts.exchange_contract_address,
96                    &secrets,
97                )
98                .await
99                .map_err(|e| {
100                    error!("Failed to initialize direct TransactionSubmitter: {}", e);
101                    anyhow::anyhow!("Failed to initialize direct TransactionSubmitter: {}", e)
102                })?;
103                if let Some(db) = tx_submitter_db.clone() {
104                    submitter = submitter.with_db(db.clone()).with_submitter_store(db);
105                }
106                Ok(Arc::new(submitter))
107            }
108            TransactionSubmitterMode::AwsKms => {
109                let mut submitter = TransactionSubmitter::new_aws_kms(
110                    event_bus,
111                    &runtime.config.transaction_submitter,
112                    &runtime.config.contracts.exchange_contract_address,
113                )
114                .await
115                .map_err(|e| {
116                    error!("Failed to initialize AWS KMS TransactionSubmitter: {}", e);
117                    anyhow::anyhow!("Failed to initialize AWS KMS TransactionSubmitter: {}", e)
118                })?;
119                if let Some(db) = tx_submitter_db.clone() {
120                    submitter = submitter.with_db(db.clone()).with_submitter_store(db);
121                }
122                Ok(Arc::new(submitter))
123            }
124        }
125    }
126
127    #[cfg(not(any(debug_assertions, test)))]
128    {
129        match transaction_submitter_mode {
130            TransactionSubmitterMode::Mock => {
131                info!("Using mock TransactionSubmitter (mock mode configured)");
132                let mut submitter = TransactionSubmitter::new_mock(
133                    event_bus,
134                    &runtime.config.contracts.exchange_contract_address,
135                )
136                .map_err(|e| {
137                    anyhow::anyhow!("Failed to initialize mock TransactionSubmitter: {}", e)
138                })?;
139                if let Some(db) = tx_submitter_db.clone() {
140                    submitter = submitter.with_db(db.clone()).with_submitter_store(db);
141                }
142                Ok(Arc::new(submitter))
143            }
144            TransactionSubmitterMode::Direct => {
145                let mut submitter = TransactionSubmitter::new_direct(
146                    event_bus,
147                    &runtime.config.transaction_submitter,
148                    &runtime.config.contracts.exchange_contract_address,
149                    &secrets,
150                )
151                .await
152                .map_err(|e| {
153                    error!("Failed to initialize direct TransactionSubmitter: {}", e);
154                    anyhow::anyhow!("Failed to initialize direct TransactionSubmitter: {}", e)
155                })?;
156                if let Some(db) = tx_submitter_db.clone() {
157                    submitter = submitter.with_db(db.clone()).with_submitter_store(db);
158                }
159                Ok(Arc::new(submitter))
160            }
161            TransactionSubmitterMode::AwsKms => {
162                let mut submitter = TransactionSubmitter::new_aws_kms(
163                    event_bus,
164                    &runtime.config.transaction_submitter,
165                    &runtime.config.contracts.exchange_contract_address,
166                )
167                .await
168                .map_err(|e| {
169                    error!("Failed to initialize AWS KMS TransactionSubmitter: {}", e);
170                    anyhow::anyhow!("Failed to initialize AWS KMS TransactionSubmitter: {}", e)
171                })?;
172                if let Some(db) = tx_submitter_db.clone() {
173                    submitter = submitter.with_db(db.clone()).with_submitter_store(db);
174                }
175                Ok(Arc::new(submitter))
176            }
177        }
178    }
179}
180
181fn start_transaction_submitter(
182    transaction_submitter: Arc<TransactionSubmitter>,
183    shutdown: &Shutdown,
184    task_group: &mut TaskGroup,
185) {
186    match transaction_submitter.start_with_shutdown(shutdown.subscribe()) {
187        Ok(handle) => {
188            task_group.spawn("TransactionSubmitter", async move {
189                if let Err(e) = handle.await {
190                    error!("TransactionSubmitter task panicked: {:?}", e);
191                }
192                Ok(())
193            });
194            info!("✓ Transaction submitter service started");
195        }
196        Err(e) => {
197            error!("Failed to start transaction submitter: {}", e);
198        }
199    }
200}
201
202fn record_startup_timing(phase: &'static str, elapsed: std::time::Duration) {
203    info!("[startup-timing] {}: {:?}", phase, elapsed);
204    gauge!("ht_startup_phase_seconds", "phase" => phase).set(elapsed.as_secs_f64());
205}
206
207#[cfg(test)]
208mod tests {
209    use super::*;
210    use crate::backend_config::{BackendConfig, BackendSecrets};
211    use crate::messaging::ChannelEventBus;
212    use std::time::Duration;
213
214    #[tokio::test]
215    async fn mock_transaction_submitter_starts_without_sync_relayer() {
216        let mut config = BackendConfig::default();
217        config.transaction_submitter.mode = TransactionSubmitterMode::Mock;
218        config.contracts.exchange_contract_address =
219            "0x0000000000000000000000000000000000000001".to_string();
220        let runtime = BackendRuntime {
221            config,
222            secrets: BackendSecrets::default(),
223        };
224        let event_bus = Arc::new(ChannelEventBus::new().expect("event bus should construct"));
225        let shutdown = Shutdown::new();
226        let mut task_group = TaskGroup::new();
227
228        let _resources = build_transaction_submitter_resources(
229            &runtime,
230            event_bus,
231            None,
232            &shutdown,
233            &mut task_group,
234        )
235        .await
236        .expect("mock transaction submitter should initialize");
237
238        assert_eq!(task_group.len(), 1);
239
240        task_group
241            .shutdown_and_join(&shutdown, Duration::from_secs(1))
242            .await
243            .expect("mock transaction submitter task should stop on shutdown");
244    }
245}