hypercall/startup/
journal.rs1use crate::backend_config::BackendRuntime;
2use crate::db_handler::DbAuthConfig;
3use crate::journal::{
4 EngineJournalBatcher, EngineJournalWriter, JournalBatchSender, JournalBatcherConfig,
5 SharedEngineJournalWriter,
6};
7use crate::shared::shutdown::Shutdown;
8use crate::shared::task_group::TaskGroup;
9use anyhow::Context;
10#[cfg(feature = "rsm-state")]
11use std::str::FromStr;
12use std::sync::Arc;
13use tracing::info;
14
15pub struct JournalResources {
16 pub engine_journal_writer: Option<SharedEngineJournalWriter>,
17 pub journal_batch_sender: Option<JournalBatchSender>,
18}
19
20pub fn build_journal_resources(
21 runtime: &BackendRuntime,
22 db_auth: &DbAuthConfig,
23 shutdown: &Shutdown,
24 task_group: &mut TaskGroup,
25) -> anyhow::Result<JournalResources> {
26 let journal_enabled = std::env::var("ENGINE_JOURNAL_ENABLED")
35 .map(|v| v == "true" || v == "1")
36 .unwrap_or(false);
37
38 let (engine_journal_writer, journal_batch_sender) = if journal_enabled {
39 let journal_writer = match EngineJournalWriter::from_auth_config(
43 db_auth,
44 runtime.config.database.pool.journal_max,
45 &runtime.config.engine.persistence.journal,
46 ) {
47 Ok(writer) => {
48 info!("✓ EngineJournalWriter enabled - durable ACK + idempotency active");
49 Some(Arc::new(writer) as SharedEngineJournalWriter)
50 }
51 Err(error) => {
52 return Err(anyhow::anyhow!(
53 "Failed to create EngineJournalWriter: {}",
54 error
55 ));
56 }
57 };
58
59 let batcher_config =
63 JournalBatcherConfig::from_config(&runtime.config.engine.persistence.journal);
64 let batcher_shutdown_rx = shutdown.subscribe();
65 #[cfg(feature = "rsm-state")]
66 let block_signer = runtime
67 .secrets
68 .transaction_submitter_private_key
69 .as_deref()
70 .map(alloy::signers::local::PrivateKeySigner::from_str)
71 .transpose()
72 .map_err(|error| {
73 anyhow::anyhow!(
74 "Failed to parse TRANSACTION_SUBMITTER_PRIVATE_KEY for block signing: {}",
75 error
76 )
77 })?;
78 #[cfg(not(feature = "rsm-state"))]
79 let block_signer = None;
80 #[cfg(feature = "rsm-state")]
81 if block_signer.is_some() {
82 info!("Block signing enabled (transaction submitter key loaded)");
83 }
84 let journal_rw_pool = hypercall_db_diesel::build_db_pool(db_auth, 2, 30_000, 10_000)
85 .context("Failed to create RW pool for journal batcher")?;
86 let (batcher, batch_sender) = EngineJournalBatcher::new(
87 journal_rw_pool,
88 batcher_config.clone(),
89 batcher_shutdown_rx,
90 block_signer,
91 );
92
93 task_group.spawn("EngineJournalBatcher", async move {
94 batcher.run().await;
95 Ok(())
96 });
97
98 info!(
99 "✓ EngineJournalBatcher enabled (wal_first=true, group_commit=true, batch_size={})",
100 batcher_config.max_batch_size,
101 );
102
103 (journal_writer, Some(batch_sender))
104 } else {
105 info!("Journal disabled (ENGINE_JOURNAL_ENABLED not set)");
106 crate::journal::mock_journal_backends()
107 };
108
109 Ok(JournalResources {
110 engine_journal_writer,
111 journal_batch_sender,
112 })
113}
114
115#[cfg(test)]
116mod tests {
117 use super::*;
118 use serial_test::serial;
119
120 #[test]
121 #[serial]
122 fn disabled_journal_uses_mock_writer_without_batch_sender() {
123 let previous = std::env::var("ENGINE_JOURNAL_ENABLED").ok();
124 std::env::remove_var("ENGINE_JOURNAL_ENABLED");
125
126 let runtime = BackendRuntime {
127 config: crate::backend_config::BackendConfig::default(),
128 secrets: crate::backend_config::BackendSecrets::default(),
129 };
130 let db_auth = DbAuthConfig::password("postgres://user:pass@localhost/hypercall_test");
131 let shutdown = Shutdown::new();
132 let mut task_group = TaskGroup::new();
133
134 let resources =
135 build_journal_resources(&runtime, &db_auth, &shutdown, &mut task_group).unwrap();
136
137 assert!(resources.engine_journal_writer.is_some());
138 assert!(resources.journal_batch_sender.is_none());
139 assert!(task_group.is_empty());
140
141 match previous {
142 Some(value) => std::env::set_var("ENGINE_JOURNAL_ENABLED", value),
143 None => std::env::remove_var("ENGINE_JOURNAL_ENABLED"),
144 }
145 }
146}