// hypercall/startup/journal.rs

1use crate::backend_config::BackendRuntime;
2use crate::db_handler::DbAuthConfig;
3use crate::journal::{
4    EngineJournalBatcher, EngineJournalWriter, JournalBatchSender, JournalBatcherConfig,
5    SharedEngineJournalWriter,
6};
7use crate::shared::shutdown::Shutdown;
8use crate::shared::task_group::TaskGroup;
9use anyhow::Context;
10#[cfg(feature = "rsm-state")]
11use std::str::FromStr;
12use std::sync::Arc;
13use tracing::info;
14
15pub struct JournalResources {
16    pub engine_journal_writer: Option<SharedEngineJournalWriter>,
17    pub journal_batch_sender: Option<JournalBatchSender>,
18}
19
20pub fn build_journal_resources(
21    runtime: &BackendRuntime,
22    db_auth: &DbAuthConfig,
23    shutdown: &Shutdown,
24    task_group: &mut TaskGroup,
25) -> anyhow::Result<JournalResources> {
26    // Journal + batcher enable durable ACK and async PostgreSQL replication.
27    // Enabled when ENGINE_JOURNAL_ENABLED is set (all deployments set this).
28    // When not set (tests, local dev without WAL), the engine falls back to mock journal below.
29    //
30    // Construction happens before replay because `EngineJournalBatcher::new`
31    // runs WAL recovery synchronously, writing any unreplicated WAL tail into
32    // Postgres (`engine_commands`, `engine_events`, `fills`, audit tables).
33    // Engine cash recovery is snapshot + journal replay-bound.
34    let journal_enabled = std::env::var("ENGINE_JOURNAL_ENABLED")
35        .map(|v| v == "true" || v == "1")
36        .unwrap_or(false);
37
38    let (engine_journal_writer, journal_batch_sender) = if journal_enabled {
39        // Create journal writer, always use the read-write URL so the journal is
40        // ready to persist entries as soon as orders arrive after promote. During
41        // standby the journal is idle, so having an RW pool here is harmless.
42        let journal_writer = match EngineJournalWriter::from_auth_config(
43            db_auth,
44            runtime.config.database.pool.journal_max,
45            &runtime.config.engine.persistence.journal,
46        ) {
47            Ok(writer) => {
48                info!("✓ EngineJournalWriter enabled - durable ACK + idempotency active");
49                Some(Arc::new(writer) as SharedEngineJournalWriter)
50            }
51            Err(error) => {
52                return Err(anyhow::anyhow!(
53                    "Failed to create EngineJournalWriter: {}",
54                    error
55                ));
56            }
57        };
58
59        // Create journal batcher, constructor runs WAL recovery inline. Like the
60        // journal writer above, the batcher always uses the read-write URL so
61        // replication inserts succeed after promote.
62        let batcher_config =
63            JournalBatcherConfig::from_config(&runtime.config.engine.persistence.journal);
64        let batcher_shutdown_rx = shutdown.subscribe();
65        #[cfg(feature = "rsm-state")]
66        let block_signer = runtime
67            .secrets
68            .transaction_submitter_private_key
69            .as_deref()
70            .map(alloy::signers::local::PrivateKeySigner::from_str)
71            .transpose()
72            .map_err(|error| {
73                anyhow::anyhow!(
74                    "Failed to parse TRANSACTION_SUBMITTER_PRIVATE_KEY for block signing: {}",
75                    error
76                )
77            })?;
78        #[cfg(not(feature = "rsm-state"))]
79        let block_signer = None;
80        #[cfg(feature = "rsm-state")]
81        if block_signer.is_some() {
82            info!("Block signing enabled (transaction submitter key loaded)");
83        }
84        let journal_rw_pool = hypercall_db_diesel::build_db_pool(db_auth, 2, 30_000, 10_000)
85            .context("Failed to create RW pool for journal batcher")?;
86        let (batcher, batch_sender) = EngineJournalBatcher::new(
87            journal_rw_pool,
88            batcher_config.clone(),
89            batcher_shutdown_rx,
90            block_signer,
91        );
92
93        task_group.spawn("EngineJournalBatcher", async move {
94            batcher.run().await;
95            Ok(())
96        });
97
98        info!(
99            "✓ EngineJournalBatcher enabled (wal_first=true, group_commit=true, batch_size={})",
100            batcher_config.max_batch_size,
101        );
102
103        (journal_writer, Some(batch_sender))
104    } else {
105        info!("Journal disabled (ENGINE_JOURNAL_ENABLED not set)");
106        crate::journal::mock_journal_backends()
107    };
108
109    Ok(JournalResources {
110        engine_journal_writer,
111        journal_batch_sender,
112    })
113}
114
#[cfg(test)]
mod tests {
    use super::*;
    use serial_test::serial;
    use std::ffi::OsString;

    /// Snapshot of one environment variable that is restored when dropped.
    ///
    /// Restoring in `Drop` puts the variable back even if an `unwrap`/`assert!` in the test
    /// panics, so later `#[serial]` tests never observe the modified value. `var_os` is used
    /// so a non-UTF-8 prior value is preserved rather than dropped.
    struct EnvVarGuard {
        key: &'static str,
        previous: Option<OsString>,
    }

    impl EnvVarGuard {
        /// Records the current value of `key` (if any), then removes it from the environment.
        fn remove(key: &'static str) -> Self {
            let previous = std::env::var_os(key);
            std::env::remove_var(key);
            Self { key, previous }
        }
    }

    impl Drop for EnvVarGuard {
        fn drop(&mut self) {
            match self.previous.take() {
                Some(value) => std::env::set_var(self.key, value),
                None => std::env::remove_var(self.key),
            }
        }
    }

    #[test]
    #[serial]
    fn disabled_journal_uses_mock_writer_without_batch_sender() {
        // Held until the end of the test; restores the variable on success or panic.
        let _env = EnvVarGuard::remove("ENGINE_JOURNAL_ENABLED");

        let runtime = BackendRuntime {
            config: crate::backend_config::BackendConfig::default(),
            secrets: crate::backend_config::BackendSecrets::default(),
        };
        let db_auth = DbAuthConfig::password("postgres://user:pass@localhost/hypercall_test");
        let shutdown = Shutdown::new();
        let mut task_group = TaskGroup::new();

        let resources =
            build_journal_resources(&runtime, &db_auth, &shutdown, &mut task_group).unwrap();

        // Disabled path: mock writer present, no batcher sender, nothing spawned.
        assert!(resources.engine_journal_writer.is_some());
        assert!(resources.journal_batch_sender.is_none());
        assert!(task_group.is_empty());
    }
}