Skip to main content

hypercall/journal/
engine_journal_batcher.rs

1//! WAL-first journal batcher for durable ACKs in the engine hot path.
2//!
3//! The engine pushes `JournalEntry` values into a bounded channel.
4//! This task drains entries, appends them to a local WAL, calls `fdatasync`,
5//! and only then allows ACK/event emission via `commit_ack`.
6//!
7//! PostgreSQL replication runs asynchronously after WAL durability so
8//! client-visible latency is decoupled from network/database I/O.
9
10use crate::observability::command_trace::EngineStateDigest;
11#[cfg(feature = "rsm-state")]
12use crate::rsm::commitment_store::{
13    ValidatorRsmCommitmentStoreConfig, ValidatorRsmStateCommitmentRuntime,
14};
15#[cfg(feature = "rsm-state")]
16use crate::rsm::engine_snapshot::EngineStateDigest as RsmEngineStateDigest;
17use hypercall_journal::checkpoint::{
18    checkpoint_path_for as checkpoint_path_for_wal, read_checkpoint, write_checkpoint,
19    WalCheckpointMetadata,
20};
21#[cfg(feature = "rsm-state")]
22mod rsm_block_persistence;
23#[cfg(feature = "rsm-state")]
24use hypercall_blocks::SigningAuthority;
25#[cfg(feature = "rsm-state")]
26use hypercall_blocks::{BlockLogEntry, BlockLogHeader};
27use hypercall_db::EngineJournalBatchWriter;
28#[cfg(feature = "rsm-state")]
29use hypercall_db::ValidatorRsmStateReader;
30use hypercall_journal::frame::{write_frame, WAL_CRC};
31#[cfg(feature = "rsm-state")]
32use hypercall_state_commitment::{
33    leaves::GlobalLeaf,
34    pipeline::{PreparedBatchCommitment, StateDelta},
35};
36use metrics::{counter, gauge, histogram};
37#[cfg(feature = "rsm-state")]
38use rsm_block_persistence::{RsmBlockPersistenceInput, RsmBlockPersistenceMode};
39use rust_decimal::Decimal;
40use serde::{Deserialize, Serialize};
41use std::fs::{create_dir_all, File, OpenOptions};
42use std::io::{BufWriter, Read, Seek, SeekFrom, Write};
43use std::path::{Path, PathBuf};
44use std::sync::Arc;
45use std::time::{Duration, Instant};
46use tokio::sync::{mpsc, oneshot};
47use tracing::{debug, error, info, info_span, warn, Instrument};
48
49#[cfg(target_os = "linux")]
50use std::os::unix::io::AsRawFd;
51
/// Default channel capacity.
///
/// Used as the `channel_capacity` value in `JournalBatcherConfig::default()`.
pub const DEFAULT_CHANNEL_CAPACITY: usize = 3000;

/// Fixed 250 ms delay.
///
/// NOTE(review): by its name this is the pause between replication retry
/// attempts; the use site is outside this view — confirm.
const REPLICATION_RETRY_DELAY: Duration = Duration::from_millis(250);

/// Pre-allocate WAL file in 64 MiB chunks to avoid extent allocation stalls.
///
/// Only compiled on Linux targets.
#[cfg(target_os = "linux")]
const WAL_PREALLOC_CHUNK: u64 = 64 * 1024 * 1024;
60
/// Messages sent from the batcher task to the dedicated WAL writer thread.
///
/// Delivered over the bounded `std::sync::mpsc` channel created in
/// `EngineJournalBatcher::new`.
enum WalWriterMessage {
    /// A batch of journal entries for the writer thread to append.
    WriteBatch {
        /// Entries to append to the WAL.
        entries: Vec<JournalEntry>,
        /// One-shot ack senders accompanying the batch.
        /// NOTE(review): presumably one slot per entry (the entry's
        /// `commit_ack`, `None` when the entry has no waiter) — confirm
        /// against the code that builds this message.
        ack_senders: Vec<Option<oneshot::Sender<()>>>,
        /// If Some, fired after fsync completes (used by Flush callers).
        completion: Option<oneshot::Sender<()>>,
    },
    /// Barrier: fires completion after all prior WriteBatches have been fsynced.
    /// Used by Flush when the batcher's local batch is empty but prior writes
    /// may still be in-flight on this thread.
    FlushBarrier {
        /// Fired once the barrier has been reached.
        completion: oneshot::Sender<()>,
    },
    /// Shutdown request for the writer thread.
    /// NOTE(review): the handler is outside this view — confirm whether
    /// queued batches are drained before the thread exits.
    Shutdown,
}
77
/// Pre-serialized event payload ready for DB storage.
/// Contains wire-format bytes (version byte + msgpack) so no further serialization is needed.
///
/// Derives `Serialize`/`Deserialize`; do not reorder or rename fields without
/// checking how the serialized form is consumed.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EventPayload {
    /// Topic the event belongs to.
    pub event_topic: String,
    /// Optional key associated with the event.
    /// NOTE(review): presumably a partitioning/routing key — confirm.
    pub event_key: Option<String>,
    /// Already-encoded event body.
    pub event_data: Vec<u8>, // version byte + inner msgpack
    /// Optional L2 sequence number attached to the event.
    pub l2_sequence: Option<i64>,
    /// Event type as the database enum.
    pub event_type_enum: hypercall_db_diesel::engine_enums::EventType,
}
88
/// Journal-only accounting deltas for a single OrderFilled event.
///
/// These are computed on the hot path from the pre-fill portfolio state and
/// persisted in WAL so the async journal replicator can materialize durable
/// ledger effects without re-running business logic against current state.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct JournalFillSideEffect {
    /// Index of the event this side effect belongs to.
    /// NOTE(review): presumably an index into the owning entry's `events`
    /// list — confirm against the producer.
    pub event_idx: i32,
    /// Trade identifier copied from the fill accounting.
    pub trade_id: u64,
    /// Taker-side ledger residual delta.
    pub taker_ledger_delta: Decimal,
    /// Maker-side ledger residual delta.
    pub maker_ledger_delta: Decimal,
    /// Taker-side premium delta.
    pub taker_premium_delta: Decimal,
    /// Maker-side premium delta.
    pub maker_premium_delta: Decimal,
    /// Optional underlying notional. `#[serde(default)]` makes deserialization
    /// fall back to `None` when the field is absent from the encoded record.
    #[serde(default)]
    pub underlying_notional: Option<Decimal>,
}
105
106impl JournalFillSideEffect {
107    pub fn from_fill_accounting(
108        event_idx: i32,
109        accounting: &hypercall_types::FillAccounting,
110    ) -> Self {
111        accounting.assert_cash_decomposition();
112        Self {
113            event_idx,
114            trade_id: accounting.trade_id,
115            taker_ledger_delta: accounting.taker_ledger_residual_delta(),
116            maker_ledger_delta: accounting.maker_ledger_residual_delta(),
117            taker_premium_delta: accounting.taker_premium_delta(),
118            maker_premium_delta: accounting.maker_premium_delta(),
119            underlying_notional: None,
120        }
121    }
122}
123
/// Journal-only side effect for a CashWithdrawalUpdate command.
///
/// Persisted in WAL so the async journal replicator can materialize durable
/// DB balance decrements without re-running business logic.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct JournalCashWithdrawalSideEffect {
    /// Wallet the withdrawal applies to.
    pub wallet: hypercall_types::WalletAddress,
    /// Identifier of the withdrawal request.
    pub request_id: String,
    /// Withdrawal amount.
    pub amount: Decimal,
    /// Balance recorded alongside the withdrawal.
    /// NOTE(review): presumably the wallet balance after `amount` has been
    /// deducted — confirm against the producer.
    pub balance_after: Decimal,
    /// Timestamp in milliseconds.
    /// NOTE(review): epoch and clock source are not visible here — confirm.
    pub timestamp_ms: u64,
}
136
/// A journal entry to be batched and durably appended.
///
/// Not `Clone`: the entry owns the one-shot `commit_ack` sender.
#[derive(Debug)]
pub struct JournalEntry {
    /// Receive timestamp in milliseconds. With the `rsm-state` feature, the
    /// last entry's value in a batch becomes the block timestamp.
    pub received_ts_ms: u64,
    /// Wire-format bytes: [version: u8][msgpack payload]
    pub command_data: Vec<u8>,
    /// Wire-format bytes: [version: u8][msgpack payload]
    pub response_data: Option<Vec<u8>>,
    /// Extracted order_id for indexed column
    pub order_id: Option<i64>,
    /// Engine state digest captured before the command.
    pub pre_digest: EngineStateDigest,
    /// Engine state digest captured after the command.
    pub post_digest: EngineStateDigest,
    /// Command processing duration in milliseconds.
    pub duration_ms: u64,
    /// Pre-serialized events produced by the command.
    pub events: Vec<EventPayload>,
    /// Directive outbox rows to append for this command.
    pub outbox_appends: Vec<crate::directive_outbox::DirectiveOutboxAppend>,
    /// Accounting deltas for the fills produced by this command.
    pub fill_side_effects: Vec<JournalFillSideEffect>,
    /// Cash-withdrawal side effect, when the command produced one.
    pub cash_withdrawal_side_effect: Option<JournalCashWithdrawalSideEffect>,
    /// Balance updates carried with the entry.
    pub balance_updates: Vec<hypercall_types::BalanceUpdate>,
    /// Monotonic creation instant of the entry.
    /// NOTE(review): presumably used for queue-latency metrics — confirm.
    pub created_at: Instant,
    /// Signalled after the entry is durably appended to WAL + fdatasync.
    pub commit_ack: Option<oneshot::Sender<()>>,
    /// UUID of the originating request.
    pub request_uuid: hypercall_db_diesel::engine_enums::DbUuid,
    /// None for non-engine commands (e.g., TransactionRequests passthrough).
    pub command_type_enum: Option<hypercall_db_diesel::engine_enums::CommandType>,
    /// Command identity hash for validator RSM batch metadata.
    #[cfg(feature = "rsm-state")]
    pub command_identity_hash: [u8; 32],
    /// Full restart-sensitive engine state digest for RSM block roots.
    #[cfg(feature = "rsm-state")]
    pub rsm_state_digest: Option<RsmEngineStateDigest>,
}
168
/// Configuration for the journal batcher
///
/// Build it with `Default::default()` or `JournalBatcherConfig::from_config`.
#[derive(Debug, Clone)]
pub struct JournalBatcherConfig {
    /// Channel capacity (backpressure threshold).
    /// `EngineJournalBatcher::new` uses this value for the entry channel, the
    /// replication channel, and the batcher → WAL-writer channel alike.
    pub channel_capacity: usize,
    /// Maximum batch size for WAL append and replication chunks.
    /// Also passed to startup WAL recovery as its batch size.
    pub max_batch_size: usize,
    /// Maximum wait time before flushing a partial batch (ms)
    pub flush_interval_ms: u64,
    /// Whether to persist digests
    pub persist_digests: bool,
    /// WAL path used for durable local staging.
    pub wal_path: PathBuf,
    /// Environment partition for validator RSM block metadata.
    #[cfg(feature = "rsm-state")]
    pub rsm_environment: hypercall_db::ValidatorRsmEnvironment,
    /// Settings for the validator RSM commitment store (root directory,
    /// prune window, and whether pruning is enabled).
    #[cfg(feature = "rsm-state")]
    pub rsm_commitment_store: ValidatorRsmCommitmentStoreConfig,
}
188
189impl Default for JournalBatcherConfig {
190    fn default() -> Self {
191        Self {
192            channel_capacity: DEFAULT_CHANNEL_CAPACITY,
193            max_batch_size: 100,
194            flush_interval_ms: 10,
195            persist_digests: true,
196            wal_path: PathBuf::from(hypercall_journal::checkpoint::DEFAULT_WAL_PATH),
197            #[cfg(feature = "rsm-state")]
198            rsm_environment: hypercall_db::ValidatorRsmEnvironment::Development,
199            #[cfg(feature = "rsm-state")]
200            rsm_commitment_store: ValidatorRsmCommitmentStoreConfig {
201                root_dir: PathBuf::from("target/validator-rsm-state"),
202                prune_window: 10_000,
203                pruning_enabled: false,
204            },
205        }
206    }
207}
208
209impl JournalBatcherConfig {
210    pub fn from_config(config: &crate::backend_config::JournalRuntimeConfig) -> Self {
211        Self {
212            channel_capacity: config.batch_channel_capacity,
213            max_batch_size: config.batch_size,
214            flush_interval_ms: config.flush_interval_ms,
215            persist_digests: config.digests_enabled,
216            wal_path: hypercall_journal::checkpoint::wal_path_from_config(config.wal_path.as_ref()),
217            #[cfg(feature = "rsm-state")]
218            rsm_environment: config.rsm_environment.into(),
219            #[cfg(feature = "rsm-state")]
220            rsm_commitment_store: ValidatorRsmCommitmentStoreConfig {
221                root_dir: config.rsm_state.state_store_path.clone(),
222                prune_window: config.rsm_state.prune_window_versions,
223                pruning_enabled: config.rsm_state.pruning_enabled,
224            },
225        }
226    }
227}
228
/// Messages sent to the journal batcher.
///
/// Producers send these through a `JournalBatchSender`.
pub enum JournalMessage {
    /// A journal entry to be batched and appended to WAL.
    Entry(JournalEntry),
    /// Flush all pending entries to WAL and signal completion.
    /// Used before orderbook snapshots to ensure snapshot is consistent with journal durability.
    /// The carried one-shot sender is the completion signal.
    Flush(oneshot::Sender<()>),
}
237
/// Sender half for pushing journal entries
///
/// A bounded tokio `mpsc` sender of `JournalMessage`; the matching receiver
/// is owned by `EngineJournalBatcher`, and `EngineJournalBatcher::new`
/// returns this half to the caller.
pub type JournalBatchSender = mpsc::Sender<JournalMessage>;
240
/// Messages sent over the replication channel created in
/// `EngineJournalBatcher::new`.
enum ReplicationMessage {
    /// WAL records (each with its end offset) to replicate.
    Entries(Vec<WalRecordWithOffset>),
    /// Shutdown request for the replication consumer.
    /// NOTE(review): the consumer is outside this view — confirm drain behavior.
    Shutdown,
    /// Barrier: replicate all pending entries, then fire the ack.
    /// Used by Flush to ensure the checkpoint is up-to-date before snapshot writes.
    FlushBarrier(oneshot::Sender<()>),
}
248
/// Small `Copy` snapshot of replication progress.
///
/// NOTE(review): the producers and consumers of this type are outside this
/// view; the field meanings below are inferred from their names — confirm.
#[derive(Debug, Clone, Copy)]
struct ReplicationProgress {
    /// Presumably the id of the last command replicated.
    last_command_id: i64,
    /// Presumably the highest L2 sequence number replicated.
    max_l2_seq: i64,
}
254
/// RSM block-production state moved into the WAL writer thread when it is
/// spawned (only with the `rsm-state` feature).
#[cfg(feature = "rsm-state")]
struct RsmWalWriterState {
    /// Validator RSM state-commitment runtime used to prepare and apply batches.
    commitments: ValidatorRsmStateCommitmentRuntime,
    /// Sequence number assigned to the next block; incremented by one per block.
    next_batch_seq: u64,
    /// Hash of the most recently produced block (all zeros at genesis).
    prev_block_hash: [u8; 32],
    /// Running command sequence; advanced by each block's `command_count`.
    global_command_seq: u64,
    /// Key used to sign blocks; `None` means blocks carry an empty signature.
    block_signer: Option<alloy::signers::local::PrivateKeySigner>,
}
263
/// Result of `RsmWalWriterState::prepare_block`: a block that has been built
/// and signed but whose state commitment has not been applied yet.
#[cfg(feature = "rsm-state")]
struct PreparedWalBlock {
    /// Block header together with its commitment roots.
    block: WalBlockRecordWithRoots,
    /// Prepared commitment to hand to `apply_prepared_rsm_batch` later.
    prepared_commitment: PreparedBatchCommitment,
}
269
#[cfg(feature = "rsm-state")]
impl RsmWalWriterState {
    /// Builds and signs the next block for `entries` without mutating the
    /// block cursor or applying the state commitment.
    ///
    /// Returns `None` when `engine_command_identity_hashes` yields no hashes
    /// for the batch. The block timestamp is the `received_ts_ms` of the last
    /// entry (0 for an empty slice), and the signer address is all zeros when
    /// no block signer is configured.
    ///
    /// # Panics
    ///
    /// Panics with a `CRITICAL_FAILURE` message if the state delta cannot be
    /// materialized or the commitment batch cannot be prepared.
    fn prepare_block(&mut self, entries: &[JournalEntry]) -> Option<PreparedWalBlock> {
        let identity_hashes = engine_command_identity_hashes(entries);
        if identity_hashes.is_empty() {
            return None;
        }

        let timestamp = entries.last().map_or(0, |entry| entry.received_ts_ms);
        let signer = self
            .block_signer
            .as_ref()
            .map_or([0u8; 20], |key| *key.address().as_ref());

        let delta = match rsm_state_delta_from_entries(
            entries,
            &identity_hashes,
            self.global_command_seq,
        ) {
            Ok(delta) => delta,
            Err(error) => panic!(
                "CRITICAL_FAILURE: failed to materialize validator RSM state delta: {}",
                error
            ),
        };

        let prepared_commitment = match self.commitments.prepare_rsm_batch(&delta) {
            Ok(prepared) => prepared,
            Err(error) => panic!(
                "CRITICAL_FAILURE: failed to prepare validator RSM state batch: {}",
                error
            ),
        };
        let roots: RsmBatchRoots = prepared_commitment.commitment.clone().into();

        let mut header = WalBlockRecord::unsigned_from_identity_hashes(
            self.next_batch_seq,
            self.prev_block_hash,
            &identity_hashes,
            roots.batch_root,
            self.global_command_seq,
            timestamp,
            signer,
        );
        // The signature is computed over the unsigned header, then attached.
        let signature = self.sign_block(&header);
        header.signature = signature;

        Some(PreparedWalBlock {
            block: WalBlockRecordWithRoots { header, roots },
            prepared_commitment,
        })
    }

    /// Applies a previously prepared commitment, advances the block cursor,
    /// and hands back the block with its roots.
    ///
    /// # Panics
    ///
    /// Panics with a `CRITICAL_FAILURE` message if the commitment runtime
    /// rejects the prepared batch.
    fn apply_prepared_block(&mut self, prepared: PreparedWalBlock) -> WalBlockRecordWithRoots {
        let PreparedWalBlock {
            block,
            prepared_commitment,
        } = prepared;

        if let Err(error) = self
            .commitments
            .apply_prepared_rsm_batch(prepared_commitment)
        {
            panic!(
                "CRITICAL_FAILURE: failed to apply validator RSM state batch after WAL fsync: {}",
                error
            );
        }

        self.advance_block_state(&block.header);
        block
    }

    /// Signs the block hash of `unsigned` with the configured block signer.
    ///
    /// Returns 65 bytes laid out as big-endian `r` (32), big-endian `s` (32),
    /// then `v` cast to one byte. Returns an empty vector when no signer is
    /// configured, or when signing fails (the failure is logged).
    fn sign_block(&self, unsigned: &WalBlockRecord) -> Vec<u8> {
        use alloy::primitives::B256;
        use alloy::signers::SignerSync;

        let signer = match self.block_signer.as_ref() {
            Some(signer) => signer,
            None => return Vec::new(),
        };

        let digest = B256::from(unsigned.block_hash());
        let sig = match signer.sign_hash_sync(&digest) {
            Ok(sig) => sig,
            Err(e) => {
                tracing::error!("Failed to sign block {}: {}", unsigned.batch_seq, e);
                return Vec::new();
            }
        };

        let mut encoded = Vec::with_capacity(65);
        encoded.extend_from_slice(&sig.r().to_be_bytes::<32>());
        encoded.extend_from_slice(&sig.s().to_be_bytes::<32>());
        encoded.push(sig.v() as u8);
        encoded
    }

    /// Moves the cursor past `block`: records its hash as the previous block
    /// hash, bumps the batch sequence by one, advances the global command
    /// sequence by the block's command count, and increments the
    /// `ht_blocks_produced_total` counter.
    fn advance_block_state(&mut self, block: &WalBlockRecord) {
        let hash = block.block_hash();
        let signed = !block.signature.is_empty();
        tracing::debug!(
            batch_seq = block.batch_seq,
            command_count = block.command_count,
            first_seq = block.first_seq,
            block_hash = hex::encode(hash),
            signed,
            "Block produced"
        );

        self.prev_block_hash = hash;
        self.next_batch_seq += 1;
        self.global_command_seq += block.command_count;

        counter!("ht_blocks_produced_total").increment(1);
    }
}
378
/// The batcher that runs in the background and appends to local WAL first,
/// then replicates to PostgreSQL asynchronously.
///
/// Constructed by `EngineJournalBatcher::new`, which also runs WAL recovery
/// and spawns the dedicated `wal-writer` OS thread.
pub struct EngineJournalBatcher {
    /// Database connection pool.
    pool: crate::db_handler::DbPool,
    /// Receiving end of the channel fed by `JournalBatchSender`.
    receiver: mpsc::Receiver<JournalMessage>,
    /// Batcher configuration supplied at construction.
    config: JournalBatcherConfig,
    /// Broadcast receiver for the process-wide shutdown signal.
    shutdown: tokio::sync::broadcast::Receiver<()>,
    /// Sender half of the bounded channel to the dedicated WAL writer thread.
    wal_writer_tx: std::sync::mpsc::SyncSender<WalWriterMessage>,
    /// Join handle for the WAL writer OS thread (taken during shutdown).
    wal_writer_handle: Option<std::thread::JoinHandle<()>>,
    /// Sender half of the replication channel; a clone is also given to the
    /// WAL writer thread at construction.
    replication_tx: mpsc::Sender<ReplicationMessage>,
    /// Receiver half of the replication channel, stored as `Some` at
    /// construction.
    /// NOTE(review): presumably taken once by whatever starts the replication
    /// consumer — confirm against the run loop.
    replication_rx: Option<mpsc::Receiver<ReplicationMessage>>,
    /// Environment partition for persisted RSM roots and block headers.
    #[cfg(feature = "rsm-state")]
    rsm_environment: hypercall_db::ValidatorRsmEnvironment,
}
396
397impl EngineJournalBatcher {
    /// Create a new batcher and return the sender for pushing entries.
    ///
    /// Startup sequence, in order:
    /// 1. open the WAL state for `config.wal_path`;
    /// 2. (`rsm-state`) open the validator RSM commitment runtime from the DB;
    /// 3. run WAL recovery synchronously and, if it truncated the file, fix up
    ///    the write position;
    /// 4. create the replication channel and the batcher → writer channel;
    /// 5. (`rsm-state`) optionally derive an ephemeral block-signing key from
    ///    `cold_signer`, then load the RSM block cursor;
    /// 6. spawn the `wal-writer` OS thread, which takes ownership of the WAL
    ///    state (and the RSM writer state).
    ///
    /// Without the `rsm-state` feature `cold_signer` is ignored.
    ///
    /// # Panics
    ///
    /// Panics with a `CRITICAL_FAILURE` message if the WAL state cannot be
    /// opened, the RSM commitment store cannot be initialized, or the writer
    /// thread cannot be spawned. The recovery and cursor-loading helpers it
    /// calls panic on their own failures as well.
    pub fn new(
        pool: crate::db_handler::DbPool,
        config: JournalBatcherConfig,
        shutdown: tokio::sync::broadcast::Receiver<()>,
        cold_signer: Option<alloy::signers::local::PrivateKeySigner>,
    ) -> (Self, JournalBatchSender) {
        // Public entry channel: `tx` is returned to the caller, `rx` is kept.
        let (tx, rx) = mpsc::channel(config.channel_capacity);

        // Create WalState directly — no Arc/Mutex needed, the writer thread owns it exclusively.
        let mut wal_state = WalState::open(config.wal_path.clone()).unwrap_or_else(|e| {
            panic!(
                "CRITICAL_FAILURE: Failed to initialize WAL state: {}. Restart required.",
                e
            )
        });

        #[cfg(feature = "rsm-state")]
        let rsm_environment = config.rsm_environment;
        #[cfg(feature = "rsm-state")]
        let mut commitments = ValidatorRsmStateCommitmentRuntime::open_from_db(
            config.rsm_commitment_store.clone(),
            Arc::new(pool.clone()),
            rsm_environment,
        )
        .unwrap_or_else(|error| {
            panic!(
                "CRITICAL_FAILURE: failed to initialize validator RSM state store: {}",
                error
            )
        });

        // Run WAL recovery synchronously before spawning the writer thread.
        // If recovery truncated torn bytes, fix up the write position so the
        // writer thread appends at the correct offset.
        if let Some(truncated_len) = Self::recover_wal_sync(
            &pool,
            &wal_state.wal_path,
            &wal_state.checkpoint_path,
            config.persist_digests,
            config.max_batch_size,
            #[cfg(feature = "rsm-state")]
            config.rsm_environment,
            #[cfg(feature = "rsm-state")]
            Some(&mut commitments),
        ) {
            // Both the append offset and the preallocation watermark are reset
            // to the truncated length.
            wal_state.next_offset = truncated_len;
            wal_state.preallocated_end = truncated_len;
        }

        let (replication_tx, replication_rx) = mpsc::channel(config.channel_capacity);

        // Bounded sync channel for batcher → writer thread communication.
        let (wal_writer_tx, wal_writer_rx) = std::sync::mpsc::sync_channel(config.channel_capacity);

        // Set up hot key delegation if cold signer is provided.
        #[cfg(feature = "rsm-state")]
        let mut block_signer = None;
        #[cfg(feature = "rsm-state")]
        if let Some(cold_signer) = cold_signer {
            use alloy::primitives::B256;
            use alloy::signers::SignerSync;

            // A fresh random hot key is generated on every construction.
            let hot_signer = alloy::signers::local::PrivateKeySigner::random();
            let hot_address = hot_signer.address();
            let cold_address = cold_signer.address();

            // `unwrap` panics if the system clock reads earlier than the Unix epoch.
            let now_ms = std::time::SystemTime::now()
                .duration_since(std::time::UNIX_EPOCH)
                .unwrap()
                .as_millis() as u64;
            let valid_until = now_ms + 24 * 60 * 60 * 1000; // 24 hours

            let mut authority = SigningAuthority {
                delegator: *cold_address.as_ref(),
                delegate: *hot_address.as_ref(),
                valid_from: now_ms,
                valid_until,
                signature: Vec::new(),
            };

            // The cold key signs the delegation hash computed while the
            // signature field is still empty.
            let delegation_hash = B256::from(authority.delegation_hash());
            match cold_signer.sign_hash_sync(&delegation_hash) {
                Ok(sig) => {
                    // 65-byte layout: big-endian r (32) || big-endian s (32) || v (1).
                    let mut sig_bytes = [0u8; 65];
                    sig_bytes[..32].copy_from_slice(&sig.r().to_be_bytes::<32>());
                    sig_bytes[32..64].copy_from_slice(&sig.s().to_be_bytes::<32>());
                    sig_bytes[64] = sig.v() as u8;
                    // NOTE(review): `authority` is not read again in this
                    // function after this assignment, so the signed delegation
                    // is dropped at the end of the block — confirm whether it
                    // should be persisted or published somewhere.
                    authority.signature = sig_bytes.to_vec();

                    info!(
                        cold_key = %cold_address,
                        hot_key = %hot_address,
                        valid_until_h = (valid_until - now_ms) / 3_600_000,
                        "Block signing: cold key delegated to ephemeral hot key (24h)"
                    );

                    block_signer = Some(hot_signer);
                }
                Err(e) => {
                    // `block_signer` stays `None`, so blocks get an empty signature.
                    error!(
                        "Failed to sign delegation with cold key: {}. Blocks will be unsigned.",
                        e
                    );
                }
            }
        }
        // Without the feature the parameter is otherwise unused.
        #[cfg(not(feature = "rsm-state"))]
        let _ = cold_signer;
        // The cursor is loaded after WAL recovery has run above.
        // NOTE(review): presumably so blocks persisted during recovery are
        // reflected in the cursor — confirm before reordering.
        #[cfg(feature = "rsm-state")]
        let (next_batch_seq, prev_block_hash, global_command_seq) =
            Self::load_rsm_block_cursor(&pool, rsm_environment);

        #[cfg(feature = "rsm-state")]
        let rsm_writer_state = RsmWalWriterState {
            commitments,
            next_batch_seq,
            prev_block_hash,
            global_command_seq,
            block_signer,
        };

        // Spawn dedicated WAL writer OS thread. It owns WalState and the RSM
        // commitment runtime so RocksDB mutation happens after WAL fsync.
        let replication_tx_for_thread = replication_tx.clone();
        let wal_writer_handle = std::thread::Builder::new()
            .name("wal-writer".to_string())
            .spawn(move || {
                wal_writer_thread_main(
                    wal_state,
                    wal_writer_rx,
                    replication_tx_for_thread,
                    #[cfg(feature = "rsm-state")]
                    rsm_writer_state,
                );
            })
            .expect("CRITICAL_FAILURE: failed to spawn wal-writer thread");

        let batcher = Self {
            pool,
            receiver: rx,
            config,
            shutdown,
            wal_writer_tx,
            wal_writer_handle: Some(wal_writer_handle),
            replication_tx,
            replication_rx: Some(replication_rx),
            #[cfg(feature = "rsm-state")]
            rsm_environment,
        };
        (batcher, tx)
    }
550
551    #[cfg(feature = "rsm-state")]
552    fn load_rsm_block_cursor(
553        pool: &crate::db_handler::DbPool,
554        rsm_environment: hypercall_db::ValidatorRsmEnvironment,
555    ) -> (u64, [u8; 32], u64) {
556        let handler =
557            hypercall_db_diesel::DatabaseHandler::with_pool_no_migrations(Arc::new(pool.clone()));
558        match handler.get_latest_rsm_block_sync(rsm_environment) {
559            Ok(Some(block)) => {
560                let next_batch_seq = block.block.height + 1;
561                let prev_block_hash = block.block.hash;
562                let global_command_seq = block.block.last_command_seq + 1;
563                info!(
564                    environment = %rsm_environment,
565                    next_batch_seq = next_batch_seq,
566                    global_command_seq = global_command_seq,
567                    "Loaded validator RSM block cursor"
568                );
569                (next_batch_seq, prev_block_hash, global_command_seq)
570            }
571            Ok(None) => {
572                info!(
573                    environment = %rsm_environment,
574                    "No validator RSM block cursor found; starting at genesis"
575                );
576                (0, [0u8; 32], 0)
577            }
578            Err(error) => {
579                panic!(
580                    "CRITICAL_FAILURE: failed to load validator RSM block cursor: {}",
581                    error
582                );
583            }
584        }
585    }
586
587    /// Run streaming WAL recovery. Returns `Some(truncated_len)` if the WAL
588    /// was truncated to remove torn bytes, so the caller can fix up WalState.
589    fn recover_wal_sync(
590        pool: &crate::db_handler::DbPool,
591        wal_path: &Path,
592        checkpoint_path: &Path,
593        persist_digests: bool,
594        max_batch_size: usize,
595        #[cfg(feature = "rsm-state")] rsm_environment: hypercall_db::ValidatorRsmEnvironment,
596        #[cfg(feature = "rsm-state")] mut rsm_commitments: Option<
597            &mut ValidatorRsmStateCommitmentRuntime,
598        >,
599    ) -> Option<u64> {
600        // Stream WAL recovery in bounded-memory chunks instead of loading the
601        // entire unreplicated tail into a Vec.  This prevents OOM when the
602        // unreplicated tail is much larger than available RAM.
603
604        if !wal_path.exists() {
605            return None;
606        }
607
608        let checkpoint = read_checkpoint(checkpoint_path).unwrap_or_else(|e| {
609            panic!(
610                "CRITICAL_FAILURE: Failed to read WAL checkpoint at startup: {}. Restart required.",
611                e
612            )
613        });
614
615        // Open read-write so we can truncate torn bytes at EOF.
616        let mut file = OpenOptions::new()
617            .read(true)
618            .write(true)
619            .open(wal_path)
620            .unwrap_or_else(|e| {
621                panic!(
622                    "CRITICAL_FAILURE: Failed to open WAL {} for replay: {}. Restart required.",
623                    wal_path.display(),
624                    e
625                )
626            });
627
628        let file_len = file
629            .metadata()
630            .unwrap_or_else(|e| {
631                panic!(
632                    "CRITICAL_FAILURE: Failed to stat WAL {}: {}",
633                    wal_path.display(),
634                    e
635                )
636            })
637            .len();
638
639        if checkpoint.wal_offset > file_len {
640            panic!(
641                "CRITICAL_FAILURE: WAL checkpoint {} exceeds file length {} for {}",
642                checkpoint.wal_offset,
643                file_len,
644                wal_path.display()
645            );
646        }
647
648        if checkpoint.wal_offset == file_len {
649            // Nothing to recover -- checkpoint is at end of WAL.
650            return None;
651        }
652
653        file.seek(SeekFrom::Start(checkpoint.wal_offset))
654            .unwrap_or_else(|e| panic!("CRITICAL_FAILURE: Failed to seek WAL replay start: {}", e));
655
656        info!(
657            wal_path = %wal_path.display(),
658            checkpoint_path = %checkpoint_path.display(),
659            checkpoint_offset = checkpoint.wal_offset,
660            file_len = file_len,
661            "Streaming WAL recovery to PostgreSQL before engine startup"
662        );
663
664        let batch_size = max_batch_size.max(1);
665        let mut offset = checkpoint.wal_offset;
666        // Tracks the offset after the last fully-parsed record so we can
667        // truncate torn bytes that would prevent future records from being read.
668        let mut last_good_offset = checkpoint.wal_offset;
669        let mut total_recovered: u64 = 0;
670        let mut hit_torn_record = false;
671
672        // Outer loop: read and process one batch at a time.
673        loop {
674            let mut batch: Vec<WalRecordWithOffset> = Vec::with_capacity(batch_size);
675
676            // Inner loop: fill one batch of up to `batch_size` commands.
677            for _ in 0..batch_size {
678                if batch.len() >= batch_size {
679                    break;
680                }
681
682                let record_start_offset = offset;
683
684                // Read 4-byte length prefix.
685                let mut len_buf = [0u8; 4];
686                match file.read_exact(&mut len_buf) {
687                    Ok(()) => {}
688                    Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => {
689                        hit_torn_record = true;
690                        break;
691                    }
692                    Err(e) => {
693                        panic!(
694                            "CRITICAL_FAILURE: Failed reading WAL length prefix at offset {}: {}",
695                            offset, e
696                        );
697                    }
698                }
699
700                let payload_len = u32::from_le_bytes(len_buf) as usize;
701                if payload_len == 0 {
702                    panic!(
703                        "CRITICAL_FAILURE: WAL record length was zero at offset {} in {}. Data corruption detected.",
704                        offset,
705                        wal_path.display()
706                    );
707                }
708                offset += 4;
709
710                // Guard: a corrupt length prefix could claim gigabytes and OOM
711                // the process before read_exact even gets a chance to fail.
712                let remaining = file_len.saturating_sub(offset);
713                if payload_len as u64 > remaining {
714                    warn!(
715                        "WAL declared payload_len={} but only {} bytes remain at offset {} in {}, \
716                         truncating tail and replaying prior durable records only",
717                        payload_len,
718                        remaining,
719                        offset,
720                        wal_path.display(),
721                    );
722                    hit_torn_record = true;
723                    break;
724                }
725
726                // Read 4-byte CRC.
727                let mut crc_buf = [0u8; 4];
728                match file.read_exact(&mut crc_buf) {
729                    Ok(()) => {}
730                    Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => {
731                        warn!(
732                            "WAL ended with incomplete CRC header at offset {} in {} ({}), \
733                             truncating tail and replaying prior durable records only",
734                            offset,
735                            wal_path.display(),
736                            e
737                        );
738                        hit_torn_record = true;
739                        break;
740                    }
741                    Err(e) => {
742                        panic!(
743                            "CRITICAL_FAILURE: Failed reading WAL CRC prefix at offset {}: {}",
744                            offset, e
745                        );
746                    }
747                }
748                offset += 4;
749
750                let expected_crc = u32::from_le_bytes(crc_buf);
751                let mut payload = vec![0u8; payload_len];
752
753                // Read payload.
754                match file.read_exact(&mut payload) {
755                    Ok(()) => {}
756                    Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => {
757                        warn!(
758                            "WAL ended with incomplete payload at offset {} in {} ({}), \
759                             truncating tail and replaying prior durable records only",
760                            offset,
761                            wal_path.display(),
762                            e
763                        );
764                        hit_torn_record = true;
765                        break;
766                    }
767                    Err(e) => {
768                        panic!(
769                            "CRITICAL_FAILURE: Failed reading WAL payload at offset {}: {}",
770                            offset, e
771                        );
772                    }
773                }
774                offset += payload_len as u64;
775
776                let actual_crc = WAL_CRC.checksum(&payload);
777                if actual_crc != expected_crc {
778                    panic!(
779                        "CRITICAL_FAILURE: WAL CRC mismatch at end_offset {} in {}: expected {}, got {}. Data corruption detected.",
780                        offset,
781                        wal_path.display(),
782                        expected_crc,
783                        actual_crc
784                    );
785                }
786
787                let mut decoded_records = Vec::new();
788                #[cfg(feature = "rsm-state")]
789                if let Ok(block_entry) = rmp_serde::from_slice::<WalDurableBlockEntry>(&payload) {
790                    let header = block_entry.header;
791                    let roots = block_entry.roots;
792                    for (index, cmd) in block_entry.commands.into_iter().enumerate() {
793                        decoded_records.push(WalRecordWithOffset {
794                            record: cmd,
795                            end_offset: offset,
796                            block_header: (index == 0).then_some(header.clone()),
797                            block_roots: (index == 0).then_some(roots),
798                        });
799                    }
800                } else if rmp_serde::from_slice::<WalBlockEntry>(&payload).is_ok() {
801                    panic!(
802                        "CRITICAL_FAILURE: legacy rootless RSM WAL block at end_offset {} is unsupported after durable root cutover",
803                        offset
804                    );
805                } else {
806                    match rmp_serde::from_slice::<WalJournalRecord>(&payload) {
807                        Ok(record) => {
808                            decoded_records.push(WalRecordWithOffset {
809                                record,
810                                end_offset: offset,
811                                block_header: None,
812                                block_roots: None,
813                            });
814                        }
815                        Err(_) => {
816                            if rmp_serde::from_slice::<WalBlockRecord>(&payload).is_ok() {
817                                tracing::debug!(
818                                    end_offset = offset,
819                                    "Skipping legacy WAL block record during recovery"
820                                );
821                                last_good_offset = offset;
822                                continue;
823                            }
824
825                            panic!(
826                                "CRITICAL_FAILURE: Failed to deserialize WAL payload at end_offset {}",
827                                offset
828                            );
829                        }
830                    }
831                }
832                #[cfg(not(feature = "rsm-state"))]
833                match rmp_serde::from_slice::<WalJournalRecord>(&payload) {
834                    Ok(record) => {
835                        decoded_records.push(WalRecordWithOffset {
836                            record,
837                            end_offset: offset,
838                        });
839                    }
840                    Err(_) => {
841                        panic!(
842                            "CRITICAL_FAILURE: Failed to deserialize WAL payload at end_offset {}",
843                            offset
844                        );
845                    }
846                }
847
848                if should_defer_replay_record(batch.len(), decoded_records.len(), batch_size) {
849                    file.seek(SeekFrom::Start(record_start_offset))
850                        .unwrap_or_else(|e| {
851                            panic!(
852                                "CRITICAL_FAILURE: Failed to rewind WAL replay batch boundary: {}",
853                                e
854                            )
855                        });
856                    offset = record_start_offset;
857                    break;
858                }
859
860                if decoded_records.len() > batch_size {
861                    warn!(
862                        commands = decoded_records.len(),
863                        batch_size = batch_size,
864                        end_offset = offset,
865                        "WAL replay physical record exceeds command batch size; replaying it atomically"
866                    );
867                }
868
869                for record in decoded_records {
870                    batch.push(WalRecordWithOffset {
871                        record: record.record,
872                        end_offset: record.end_offset,
873                        #[cfg(feature = "rsm-state")]
874                        block_header: record.block_header,
875                        #[cfg(feature = "rsm-state")]
876                        block_roots: record.block_roots,
877                    });
878                }
879
880                last_good_offset = offset;
881            }
882
883            // If no records were read in this iteration, we are done.
884            if batch.is_empty() {
885                break;
886            }
887
888            let batch_len = batch.len() as u64;
889
890            #[cfg(feature = "rsm-state")]
891            if let Some(commitments) = rsm_commitments.as_deref_mut() {
892                apply_recovered_rsm_commitment_blocks(&mut batch, commitments).unwrap_or_else(
893                    |error| {
894                        panic!(
895                            "CRITICAL_FAILURE: WAL replay failed to rebuild local validator RSM state before root persistence: {}",
896                            error
897                        )
898                    },
899                );
900            }
901
902            // Insert this chunk into PostgreSQL.
903            let db_entries: Vec<JournalEntry> =
904                batch.iter().map(WalRecordWithOffset::to_db_entry).collect();
905            let (_inserted_count, mut inserted_commands) = match Self::insert_batch(
906                pool,
907                &db_entries,
908                persist_digests,
909                #[cfg(feature = "rsm-state")]
910                Some(
911                    Self::rsm_block_persistence_batch(RsmBlockPersistenceInput {
912                        environment: rsm_environment,
913                        records: &batch,
914                        mode: RsmBlockPersistenceMode::Replay,
915                    })
916                    .unwrap_or_else(|error| {
917                        panic!(
918                            "CRITICAL_FAILURE: WAL replay failed to prepare RSM block persistence: {}",
919                            error
920                        )
921                    }),
922                ),
923            ) {
924                Ok(result) => result,
925                Err(e) => {
926                    panic!(
927                        "CRITICAL_FAILURE: WAL replay insert failed: {}. Startup cannot continue.",
928                        e
929                    );
930                }
931            };
932
933            if inserted_commands.is_empty() {
934                let request_ids: Vec<hypercall_db_diesel::engine_enums::DbUuid> = batch
935                    .iter()
936                    .map(|record| record.record.request_uuid)
937                    .collect();
938                inserted_commands =
939                    Self::lookup_command_ids_by_request_ids(pool, &request_ids).unwrap_or_else(
940                        |e| {
941                            panic!(
942                                "CRITICAL_FAILURE: WAL replay command_id lookup failed: {}. Startup cannot continue.",
943                                e
944                            )
945                        },
946                    );
947                if inserted_commands.is_empty() {
948                    panic!(
949                        "CRITICAL_FAILURE: WAL replay insert and command_id lookup returned empty for non-empty chunk."
950                    );
951                }
952            }
953
954            let progress = ReplicationProgress {
955                last_command_id: inserted_commands
956                    .iter()
957                    .map(|(_, command_id)| *command_id)
958                    .max()
959                    .expect("WAL replay command IDs unexpectedly empty"),
960                max_l2_seq: batch
961                    .iter()
962                    .flat_map(|record| record.record.events.iter().filter_map(|e| e.l2_sequence))
963                    .max()
964                    .unwrap_or(0),
965            };
966
967            let last_offset = batch
968                .last()
969                .expect("WAL replay chunk unexpectedly empty")
970                .end_offset;
971
972            // Advance checkpoint after each chunk so crash-recovery resumes here.
973            if let Err(e) = Self::advance_checkpoint(checkpoint_path, last_offset, progress) {
974                panic!(
975                    "CRITICAL_FAILURE: Failed to advance WAL checkpoint after replay: {}",
976                    e
977                );
978            }
979
980            total_recovered += batch_len;
981
982            // Drop the batch to free memory before reading the next chunk.
983            drop(batch);
984
985            debug!(
986                total_recovered = total_recovered,
987                current_offset = offset,
988                file_len = file_len,
989                "WAL streaming recovery progress"
990            );
991        }
992
993        // Truncate torn bytes so the writer thread appends cleanly after the
994        // last complete record.  Without this, the partial bytes sit between
995        // the checkpoint and the next write, and the next restart would re-parse
996        // (and skip) them before any new records could be reached.
997        let truncated = if hit_torn_record && last_good_offset < file_len {
998            warn!(
999                "Truncating WAL {} from {} to {} bytes to remove torn tail",
1000                wal_path.display(),
1001                file_len,
1002                last_good_offset
1003            );
1004            file.set_len(last_good_offset).unwrap_or_else(|e| {
1005                panic!(
1006                    "CRITICAL_FAILURE: Failed to truncate WAL {} to {}: {}",
1007                    wal_path.display(),
1008                    last_good_offset,
1009                    e
1010                )
1011            });
1012            Some(last_good_offset)
1013        } else {
1014            None
1015        };
1016
1017        if total_recovered > 0 {
1018            info!(
1019                total_recovered = total_recovered,
1020                "WAL streaming recovery complete"
1021            );
1022        }
1023
1024        counter!("ht_journal_wal_replay_entries_total").increment(total_recovered);
1025        truncated
1026    }
1027
1028    /// Run the batcher loop with group commit.
1029    ///
1030    /// Entries are flushed immediately on arrival. Natural batching occurs when
1031    /// new entries arrive while the writer thread is performing fsync — they
1032    /// accumulate in the channel and get drained in the next iteration.
1033    pub async fn run(mut self) {
1034        info!(
1035            "Starting WAL-first journal batcher (group_commit=true, batch_size={}, digests={})",
1036            self.config.max_batch_size, self.config.persist_digests
1037        );
1038
1039        let flush_interval = Duration::from_millis(self.config.flush_interval_ms);
1040        let wal_path = self.config.wal_path.clone();
1041        let checkpoint_path = checkpoint_path_for_wal(&wal_path);
1042
1043        let replication_rx = self
1044            .replication_rx
1045            .take()
1046            .expect("Replication receiver missing; batcher initialized incorrectly");
1047
1048        let replication_handle = tokio::spawn(Self::replication_loop(
1049            self.pool.clone(),
1050            self.config.persist_digests,
1051            self.config.max_batch_size,
1052            flush_interval,
1053            checkpoint_path,
1054            wal_path,
1055            replication_rx,
1056            #[cfg(feature = "rsm-state")]
1057            self.rsm_environment,
1058        ));
1059
1060        let mut batch: Vec<JournalEntry> = Vec::with_capacity(self.config.max_batch_size);
1061
1062        loop {
1063            tokio::select! {
1064                biased;
1065
1066                _ = self.shutdown.recv() => {
1067                    info!("Journal batcher received shutdown, flushing remaining {} entries", batch.len());
1068                    if !batch.is_empty() {
1069                        self.flush_batch(&mut batch, None).await;
1070                    }
1071
1072                    // CRITICAL: Drain all remaining entries from receiver to WAL.
1073                    let mut drained_count = 0;
1074                    while let Ok(msg) = self.receiver.try_recv() {
1075                        match msg {
1076                            JournalMessage::Entry(entry) => {
1077                                drained_count += 1;
1078                                batch.push(entry);
1079                                if batch.len() >= self.config.max_batch_size {
1080                                    self.flush_batch(&mut batch, None).await;
1081                                }
1082                            }
1083                            JournalMessage::Flush(ack) => {
1084                                let (complete_tx, complete_rx) = oneshot::channel();
1085                                if !batch.is_empty() {
1086                                    self.flush_batch(&mut batch, Some(complete_tx)).await;
1087                                } else {
1088                                    self.send_flush_barrier(complete_tx).await;
1089                                }
1090                                let _ = complete_rx.await;
1091                                self.await_replication_flush().await;
1092                                let _ = ack.send(());
1093                            }
1094                        }
1095                    }
1096
1097                    if !batch.is_empty() {
1098                        // Final flush with completion to ensure durability before shutdown.
1099                        let (complete_tx, complete_rx) = oneshot::channel();
1100                        self.flush_batch(&mut batch, Some(complete_tx)).await;
1101                        let _ = complete_rx.await;
1102                    }
1103
1104                    if drained_count > 0 {
1105                        info!("Drained {} additional entries to WAL during shutdown", drained_count);
1106                    }
1107
1108                    // Shutdown WAL writer thread.
1109                    let _ = self.wal_writer_tx.send(WalWriterMessage::Shutdown);
1110                    if let Some(handle) = self.wal_writer_handle.take() {
1111                        handle.join().expect("CRITICAL_FAILURE: wal-writer thread panicked");
1112                    }
1113
1114                    if let Err(e) = self.replication_tx.send(ReplicationMessage::Shutdown).await {
1115                        warn!("Failed to signal replication shutdown: {}", e);
1116                    }
1117
1118                    match replication_handle.await {
1119                        Ok(()) => {}
1120                        Err(e) => {
1121                            panic!(
1122                                "CRITICAL_FAILURE: Journal replication task panicked during shutdown: {}",
1123                                e
1124                            );
1125                        }
1126                    }
1127
1128                    break;
1129                }
1130
1131                msg = self.receiver.recv() => {
1132                    match msg {
1133                        Some(JournalMessage::Entry(e)) => {
1134                            // Track queue latency (time from engine push to batcher receive)
1135                            let queue_latency = e.created_at.elapsed();
1136                            histogram!("ht_journal_queue_latency_seconds")
1137                                .record(queue_latency.as_secs_f64());
1138
1139                            batch.push(e);
1140
1141                            // Group commit: drain all immediately available entries.
1142                            // Entries that arrived while the previous fsync was in flight
1143                            // get batched together, amortizing the cost of the next fsync.
1144                            while batch.len() < self.config.max_batch_size {
1145                                match self.receiver.try_recv() {
1146                                    Ok(JournalMessage::Entry(e2)) => {
1147                                        let queue_latency = e2.created_at.elapsed();
1148                                        histogram!("ht_journal_queue_latency_seconds")
1149                                            .record(queue_latency.as_secs_f64());
1150                                        batch.push(e2);
1151                                    }
1152                                    Ok(JournalMessage::Flush(ack)) => {
1153                                        // Flush arrived during drain: flush current batch
1154                                        // and wait for durability before acking.
1155                                        let (complete_tx, complete_rx) = oneshot::channel();
1156                                        if !batch.is_empty() {
1157                                            self.flush_batch(&mut batch, Some(complete_tx)).await;
1158                                        } else {
1159                                            self.send_flush_barrier(complete_tx).await;
1160                                        }
1161                                        let _ = complete_rx.await;
1162                                        self.await_replication_flush().await;
1163                                        let _ = ack.send(());
1164                                    }
1165                                    Err(_) => break,
1166                                }
1167                            }
1168
1169                            // Immediate flush — no timer delay.
1170                            if !batch.is_empty() {
1171                                self.flush_batch(&mut batch, None).await;
1172                            }
1173                        }
1174                        Some(JournalMessage::Flush(ack)) => {
1175                            // Always send through the writer thread to serialize
1176                            // behind any in-flight WriteBatch fsyncs.
1177                            let (complete_tx, complete_rx) = oneshot::channel();
1178                            if !batch.is_empty() {
1179                                self.flush_batch(&mut batch, Some(complete_tx)).await;
1180                            } else {
1181                                self.send_flush_barrier(complete_tx).await;
1182                            }
1183                            let _ = complete_rx.await;
1184                            // Wait for replication to complete so the checkpoint
1185                            // is current before any snapshot write.
1186                            self.await_replication_flush().await;
1187                            let _ = ack.send(());
1188                        }
1189                        None => {
1190                            info!("Journal batcher channel closed, flushing remaining {} entries", batch.len());
1191                            if !batch.is_empty() {
1192                                self.flush_batch(&mut batch, None).await;
1193                            }
1194                            // Shutdown WAL writer thread.
1195                            let _ = self.wal_writer_tx.send(WalWriterMessage::Shutdown);
1196                            if let Some(handle) = self.wal_writer_handle.take() {
1197                                handle.join().expect("CRITICAL_FAILURE: wal-writer thread panicked");
1198                            }
1199                            let _ = self.replication_tx.send(ReplicationMessage::Shutdown).await;
1200                            match replication_handle.await {
1201                                Ok(()) => {}
1202                                Err(e) => {
1203                                    panic!(
1204                                        "CRITICAL_FAILURE: Journal replication task panicked while closing channel: {}",
1205                                        e
1206                                    );
1207                                }
1208                            }
1209                            break;
1210                        }
1211                    }
1212                }
1213            }
1214        }
1215
1216        info!("Journal batcher stopped");
1217    }
1218
1219    /// Enqueue a batch for the WAL writer thread.
1220    ///
1221    /// Extracts ack senders from the entries and sends a `WalWriterMessage::WriteBatch`
1222    /// to the writer thread. The writer thread handles write + fsync + ack firing.
1223    ///
1224    /// If `completion` is Some, the writer thread fires it after fsync. Callers
1225    /// who need to block until fsync (e.g. Flush) should await the receiver side.
1226    async fn flush_batch(
1227        &mut self,
1228        batch: &mut Vec<JournalEntry>,
1229        completion: Option<oneshot::Sender<()>>,
1230    ) {
1231        if batch.is_empty() {
1232            return;
1233        }
1234
1235        let batch_size = batch.len();
1236        let mut entries = std::mem::take(batch);
1237
1238        histogram!("ht_journal_batch_size").record(batch_size as f64);
1239
1240        let ack_senders: Vec<Option<oneshot::Sender<()>>> =
1241            entries.iter_mut().map(|e| e.commit_ack.take()).collect();
1242
1243        let msg = WalWriterMessage::WriteBatch {
1244            entries,
1245            ack_senders,
1246            completion,
1247        };
1248
1249        // Send to writer thread via bounded sync channel.
1250        // Use spawn_blocking because SyncSender::send blocks if full (backpressure).
1251        let tx = self.wal_writer_tx.clone();
1252        tokio::task::spawn_blocking(move || {
1253            tx.send(msg)
1254                .expect("CRITICAL_FAILURE: wal-writer thread dead. Restart required.");
1255        })
1256        .await
1257        .expect("CRITICAL_FAILURE: spawn_blocking failed sending to wal-writer");
1258    }
1259
1260    /// Send a flush barrier to the WAL writer thread.
1261    ///
1262    /// The barrier is processed in FIFO order behind any in-flight WriteBatch
1263    /// messages, ensuring the completion fires only after all prior fsyncs.
1264    async fn send_flush_barrier(&self, completion: oneshot::Sender<()>) {
1265        let tx = self.wal_writer_tx.clone();
1266        tokio::task::spawn_blocking(move || {
1267            tx.send(WalWriterMessage::FlushBarrier { completion })
1268                .expect("CRITICAL_FAILURE: wal-writer thread dead. Restart required.");
1269        })
1270        .await
1271        .expect("CRITICAL_FAILURE: spawn_blocking failed sending flush barrier");
1272    }
1273
1274    /// Send a flush barrier through the replication channel and wait for all
1275    /// pending WAL entries to be replicated to Postgres.
1276    ///
1277    /// After this returns, the WAL checkpoint is guaranteed to reflect the state
1278    /// of all entries written to the WAL up to this point. This prevents
1279    /// snapshot-checkpoint lag: without this barrier, `persist_engine_state_snapshot`
1280    /// could read a stale checkpoint and tag the snapshot with a `last_command_id`
1281    /// that trails the live engine state, causing double-application of fills
1282    /// during the next recovery.
1283    async fn await_replication_flush(&self) {
1284        let (tx, rx) = oneshot::channel();
1285        if let Err(e) = self
1286            .replication_tx
1287            .send(ReplicationMessage::FlushBarrier(tx))
1288            .await
1289        {
1290            warn!(
1291                "Failed to send replication flush barrier (channel closed during shutdown): {}",
1292                e
1293            );
1294            return;
1295        }
1296        match rx.await {
1297            Ok(()) => {
1298                debug!("Replication flush barrier completed — checkpoint is up-to-date");
1299            }
1300            Err(_) => {
1301                warn!("Replication flush barrier ack dropped (replication task exited)");
1302            }
1303        }
1304    }
1305
1306    /// Replication loop: drains WAL records from the writer thread and replicates
1307    /// to PostgreSQL.
1308    ///
1309    /// NOTE: This loop does NOT listen to the broadcast shutdown signal directly.
1310    /// Shutdown is driven by `ReplicationMessage::Shutdown` sent by the batcher
1311    /// AFTER the WAL writer thread has been joined. This prevents a race where
1312    /// the broadcast fires early, the replication loop exits, and the WAL writer
1313    /// thread panics on `replication_tx.blocking_send()` to a closed channel.
1314    async fn replication_loop(
1315        pool: crate::db_handler::DbPool,
1316        persist_digests: bool,
1317        max_batch_size: usize,
1318        flush_interval: Duration,
1319        checkpoint_path: PathBuf,
1320        wal_path: PathBuf,
1321        mut receiver: mpsc::Receiver<ReplicationMessage>,
1322        #[cfg(feature = "rsm-state")] rsm_environment: hypercall_db::ValidatorRsmEnvironment,
1323    ) {
1324        info!("Journal replication loop started");
1325
1326        let mut pending: Vec<WalRecordWithOffset> = Vec::new();
1327        let mut last_flush = Instant::now();
1328        let mut last_punched_offset: u64 =
1329            Self::read_checkpoint_or_panic(&checkpoint_path).wal_offset;
1330        Self::emit_wal_checkpoint_metrics(&wal_path, last_punched_offset);
1331
1332        loop {
1333            tokio::select! {
1334                biased;
1335
1336                msg = receiver.recv() => {
1337                    match msg {
1338                        Some(ReplicationMessage::Entries(entries)) => {
1339                            pending.extend(entries);
1340                            gauge!("ht_journal_replication_backlog").set(pending.len() as f64);
1341                            if pending.len() >= max_batch_size {
1342                                if let Err(e) = Self::replicate_pending(
1343                                    &pool,
1344                                    persist_digests,
1345                                    max_batch_size,
1346                                    &checkpoint_path,
1347                                    &mut pending,
1348                                    #[cfg(feature = "rsm-state")]
1349                                    rsm_environment,
1350                                ).await {
1351                                    // pending is intentionally not drained; entries will be retried on next flush
1352                                    error!(
1353                                        pending = pending.len(),
1354                                        "Replication batch failed (WAL tail will grow): {}",
1355                                        e
1356                                    );
1357                                    counter!("ht_journal_replication_batch_failures_total").increment(1);
1358                                } else {
1359                                    last_flush = Instant::now();
1360                                    let cp = Self::read_checkpoint_or_panic(&checkpoint_path);
1361                                    Self::emit_wal_checkpoint_metrics(&wal_path, cp.wal_offset);
1362                                    if cp.wal_offset > last_punched_offset {
1363                                        Self::punch_wal_hole(&wal_path, last_punched_offset, cp.wal_offset);
1364                                        last_punched_offset = cp.wal_offset;
1365                                    }
1366                                }
1367                            }
1368                        }
1369                        Some(ReplicationMessage::FlushBarrier(ack)) => {
1370                            // Replicate ALL pending entries before acking. This
1371                            // guarantees the checkpoint reflects everything the
1372                            // WAL writer has sent us so far. Unlike the normal
1373                            // replication path, we MUST succeed here — a stale
1374                            // checkpoint would cause snapshot-checkpoint lag.
1375                            if !pending.is_empty() {
1376                                if let Err(e) = Self::replicate_pending(
1377                                    &pool,
1378                                    persist_digests,
1379                                    max_batch_size,
1380                                    &checkpoint_path,
1381                                    &mut pending,
1382                                    #[cfg(feature = "rsm-state")]
1383                                    rsm_environment,
1384                                ).await {
1385                                    error!(
1386                                        "CRITICAL_FAILURE: replication flush barrier failed: {}. \
1387                                         Cannot write snapshot with stale checkpoint. Aborting process.",
1388                                        e
1389                                    );
1390                                    std::process::abort();
1391                                }
1392                                last_flush = Instant::now();
1393                                let cp = Self::read_checkpoint_or_panic(&checkpoint_path);
1394                                Self::emit_wal_checkpoint_metrics(&wal_path, cp.wal_offset);
1395                                if cp.wal_offset > last_punched_offset {
1396                                    Self::punch_wal_hole(&wal_path, last_punched_offset, cp.wal_offset);
1397                                    last_punched_offset = cp.wal_offset;
1398                                }
1399                            }
1400                            let _ = ack.send(());
1401                        }
1402                        Some(ReplicationMessage::Shutdown) => {
1403                            info!("Journal replication received shutdown with {} pending entries", pending.len());
1404                            Self::drain_replication_channel(&mut receiver, &mut pending);
1405                            if let Err(e) = Self::replicate_pending(
1406                                &pool,
1407                                persist_digests,
1408                                max_batch_size,
1409                                &checkpoint_path,
1410                                &mut pending,
1411                                #[cfg(feature = "rsm-state")]
1412                                rsm_environment,
1413                            ).await {
1414                                error!("Replication failed during shutdown (data safe in WAL): {}", e);
1415                                counter!("ht_journal_replication_batch_failures_total").increment(1);
1416                            }
1417                            break;
1418                        }
1419                        None => {
1420                            info!("Replication channel closed with {} pending entries", pending.len());
1421                            if let Err(e) = Self::replicate_pending(
1422                                &pool,
1423                                persist_digests,
1424                                max_batch_size,
1425                                &checkpoint_path,
1426                                &mut pending,
1427                                #[cfg(feature = "rsm-state")]
1428                                rsm_environment,
1429                            ).await {
1430                                error!("Replication failed on channel close (data safe in WAL): {}", e);
1431                                counter!("ht_journal_replication_batch_failures_total").increment(1);
1432                            }
1433                            break;
1434                        }
1435                    }
1436                }
1437
1438                _ = tokio::time::sleep(flush_interval) => {
1439                    if !pending.is_empty() && last_flush.elapsed() >= flush_interval {
1440                        if let Err(e) = Self::replicate_pending(
1441                            &pool,
1442                            persist_digests,
1443                            max_batch_size,
1444                            &checkpoint_path,
1445                            &mut pending,
1446                            #[cfg(feature = "rsm-state")]
1447                            rsm_environment,
1448                        ).await {
1449                            // pending is intentionally not drained; entries will be retried on next flush
1450                            error!(
1451                                pending = pending.len(),
1452                                "Replication flush failed (WAL tail will grow): {}",
1453                                e
1454                            );
1455                            counter!("ht_journal_replication_batch_failures_total").increment(1);
1456                        } else {
1457                            last_flush = Instant::now();
1458                        }
1459                        let cp = Self::read_checkpoint_or_panic(&checkpoint_path);
1460                        Self::emit_wal_checkpoint_metrics(&wal_path, cp.wal_offset);
1461                        if cp.wal_offset > last_punched_offset {
1462                            Self::punch_wal_hole(&wal_path, last_punched_offset, cp.wal_offset);
1463                            last_punched_offset = cp.wal_offset;
1464                        }
1465                    }
1466                }
1467            }
1468        }
1469
1470        info!("Journal replication loop stopped");
1471    }
1472
1473    fn read_checkpoint_or_panic(path: &Path) -> WalCheckpointMetadata {
1474        read_checkpoint(path).unwrap_or_else(|e| {
1475            panic!(
1476                "CRITICAL_FAILURE: failed to read/parse checkpoint {}: {}",
1477                path.display(),
1478                e
1479            )
1480        })
1481    }
1482
1483    fn drain_replication_channel(
1484        receiver: &mut mpsc::Receiver<ReplicationMessage>,
1485        pending: &mut Vec<WalRecordWithOffset>,
1486    ) {
1487        while let Ok(msg) = receiver.try_recv() {
1488            match msg {
1489                ReplicationMessage::Entries(entries) => pending.extend(entries),
1490                ReplicationMessage::FlushBarrier(ack) => {
1491                    // During drain, ack immediately — all pending entries will
1492                    // be replicated by the caller after drain completes.
1493                    let _ = ack.send(());
1494                }
1495                ReplicationMessage::Shutdown => break,
1496            }
1497        }
1498    }
1499
1500    /// Punch a hole in the WAL file for the replicated byte range [from, to).
1501    /// Releases disk blocks without changing the file size (sparse file).
1502    /// Non-fatal: logs a warning if the filesystem doesn't support it.
1503    fn punch_wal_hole(wal_path: &Path, from: u64, to: u64) {
1504        if to <= from {
1505            return;
1506        }
1507
1508        #[cfg(target_os = "linux")]
1509        {
1510            use std::os::unix::io::AsRawFd;
1511            let file = match OpenOptions::new().write(true).open(wal_path) {
1512                Ok(f) => f,
1513                Err(e) => {
1514                    warn!("Failed to open WAL for hole punch: {}", e);
1515                    return;
1516                }
1517            };
1518            let ret = unsafe {
1519                libc::fallocate(
1520                    file.as_raw_fd(),
1521                    libc::FALLOC_FL_PUNCH_HOLE | libc::FALLOC_FL_KEEP_SIZE,
1522                    from as i64,
1523                    (to - from) as i64,
1524                )
1525            };
1526            if ret == 0 {
1527                let reclaimed_mb = (to - from) / (1024 * 1024);
1528                counter!("ht_wal_hole_punch_bytes").increment(to - from);
1529                debug!(
1530                    "WAL hole punch: reclaimed ~{}MB (offset {}..{})",
1531                    reclaimed_mb, from, to
1532                );
1533            } else {
1534                let errno = std::io::Error::last_os_error();
1535                warn!(
1536                    "WAL hole punch failed (non-fatal): {} (offset {}..{})",
1537                    errno, from, to
1538                );
1539            }
1540        }
1541
1542        #[cfg(not(target_os = "linux"))]
1543        {
1544            let _ = (wal_path, from, to);
1545        }
1546    }
1547
1548    fn advance_checkpoint(
1549        checkpoint_path: &Path,
1550        last_offset: u64,
1551        progress: ReplicationProgress,
1552    ) -> Result<(), String> {
1553        let current = read_checkpoint(checkpoint_path)?;
1554        let next = WalCheckpointMetadata {
1555            wal_offset: last_offset,
1556            last_command_id: current.last_command_id.max(progress.last_command_id),
1557            last_l2_seq: current.last_l2_seq.max(progress.max_l2_seq),
1558        };
1559        write_checkpoint(checkpoint_path, next)
1560    }
1561
1562    /// Emit Prometheus gauges for WAL checkpoint offset and unreplicated tail.
1563    fn emit_wal_checkpoint_metrics(wal_path: &Path, checkpoint_offset: u64) {
1564        gauge!("ht_wal_checkpoint_offset_bytes").set(checkpoint_offset as f64);
1565        match std::fs::metadata(wal_path) {
1566            Ok(meta) => {
1567                let unreplicated = meta.len().saturating_sub(checkpoint_offset);
1568                gauge!("ht_wal_unreplicated_bytes").set(unreplicated as f64);
1569            }
1570            Err(e) => {
1571                warn!("Failed to stat WAL for checkpoint metrics: {}", e);
1572            }
1573        }
1574    }
1575
1576    async fn replicate_pending(
1577        pool: &crate::db_handler::DbPool,
1578        persist_digests: bool,
1579        max_batch_size: usize,
1580        checkpoint_path: &Path,
1581        pending: &mut Vec<WalRecordWithOffset>,
1582        #[cfg(feature = "rsm-state")] rsm_environment: hypercall_db::ValidatorRsmEnvironment,
1583    ) -> Result<(), String> {
1584        while !pending.is_empty() {
1585            let chunk_len = next_replication_chunk_len(pending, max_batch_size);
1586            let chunk: Vec<WalRecordWithOffset> = pending.iter().take(chunk_len).cloned().collect();
1587
1588            match Self::replicate_chunk(
1589                pool,
1590                persist_digests,
1591                &chunk,
1592                #[cfg(feature = "rsm-state")]
1593                rsm_environment,
1594            )
1595            .await
1596            {
1597                Ok(progress) => {
1598                    let last_offset = chunk
1599                        .last()
1600                        .expect("replication chunk unexpectedly empty")
1601                        .end_offset;
1602                    Self::advance_checkpoint(checkpoint_path, last_offset, progress)?;
1603                    pending.drain(..chunk_len);
1604                    gauge!("ht_journal_replication_backlog").set(pending.len() as f64);
1605                    counter!("ht_journal_replication_chunks_total").increment(1);
1606                }
1607                Err(e) => {
1608                    counter!("ht_journal_replication_errors_total").increment(1);
1609                    warn!(
1610                        "Journal replication chunk failed, batch remains pending for next flush cycle (pending={}): {}",
1611                        pending.len(),
1612                        e
1613                    );
1614                    tokio::time::sleep(REPLICATION_RETRY_DELAY).await;
1615                    return Err(e);
1616                }
1617            }
1618        }
1619
1620        Ok(())
1621    }
1622
    /// Inserts one chunk of WAL records into PostgreSQL and reports how far
    /// replication progressed.
    ///
    /// Returns a `ReplicationProgress` holding the largest command ID
    /// associated with the chunk and the largest `l2_sequence` found in the
    /// chunk's events (0 when no event carries one). An empty `records` slice
    /// yields a zeroed progress value without touching the database.
    ///
    /// # Errors
    ///
    /// Returns a `String` error when a blocking task cannot be joined, when
    /// the insert or the command-ID lookup fails, or when neither the insert
    /// nor the lookup yields any command IDs.
    async fn replicate_chunk(
        pool: &crate::db_handler::DbPool,
        persist_digests: bool,
        records: &[WalRecordWithOffset],
        #[cfg(feature = "rsm-state")] rsm_environment: hypercall_db::ValidatorRsmEnvironment,
    ) -> Result<ReplicationProgress, String> {
        if records.is_empty() {
            return Ok(ReplicationProgress {
                last_command_id: 0,
                max_l2_seq: 0,
            });
        }

        let db_entries: Vec<JournalEntry> = records
            .iter()
            .map(WalRecordWithOffset::to_db_entry)
            .collect();

        // The blocking closure must own everything it touches, hence the
        // cloned pool handle and (with `rsm-state`) the owned record copy.
        let pool_for_insert = pool.clone();
        #[cfg(feature = "rsm-state")]
        let block_records = records.to_vec();
        let insert_start = Instant::now();
        let insert_result = tokio::task::spawn_blocking(move || {
            Self::insert_batch(
                &pool_for_insert,
                &db_entries,
                persist_digests,
                #[cfg(feature = "rsm-state")]
                Some(Self::rsm_block_persistence_batch(
                    RsmBlockPersistenceInput {
                        environment: rsm_environment,
                        records: &block_records,
                        mode: RsmBlockPersistenceMode::Live,
                    },
                )?),
            )
        })
        .await
        .map_err(|e| format!("spawn_blocking failed during replication insert: {}", e))?;

        // Recorded once the blocking task has been joined, whether or not the
        // insert itself succeeded.
        histogram!("ht_journal_postgres_insert_seconds")
            .record(insert_start.elapsed().as_secs_f64());

        let (_inserted_count, mut inserted_commands) =
            insert_result.map_err(|e| format!("replication insert failed: {}", e))?;

        if inserted_commands.is_empty() {
            // Insert may have succeeded on a previous attempt, so rehydrate command IDs.
            let request_ids: Vec<hypercall_db_diesel::engine_enums::DbUuid> =
                records.iter().map(|r| r.record.request_uuid).collect();
            let pool_for_lookup = pool.clone();
            let lookup_result = tokio::task::spawn_blocking(move || {
                Self::lookup_command_ids_by_request_ids(&pool_for_lookup, &request_ids)
            })
            .await
            .map_err(|e| format!("spawn_blocking failed during command_id lookup: {}", e))?;

            inserted_commands =
                lookup_result.map_err(|e| format!("command_id lookup failed: {}", e))?;

            // Still nothing: the chunk cannot be tied to any command ID, so
            // report failure instead of inventing progress.
            if inserted_commands.is_empty() {
                return Err(
                    "replication insert returned no command IDs and lookup by request_id returned empty"
                        .to_string(),
                );
            }
        }

        // Non-empty is guaranteed by the early return above.
        let last_command_id = inserted_commands
            .iter()
            .map(|(_, command_id)| *command_id)
            .max()
            .expect("inserted_commands unexpectedly empty after rehydration");

        // Events without an `l2_sequence` are skipped; 0 when none has one.
        let max_l2_seq = records
            .iter()
            .flat_map(|record| record.record.events.iter().filter_map(|e| e.l2_sequence))
            .max()
            .unwrap_or(0);

        Ok(ReplicationProgress {
            last_command_id,
            max_l2_seq,
        })
    }
1708
1709    /// Insert a batch of entries into PostgreSQL (blocking).
1710    /// Returns (inserted_count, [(request_uuid, command_id)]) for downstream event publish accounting.
1711    fn insert_batch(
1712        pool: &crate::db_handler::DbPool,
1713        entries: &[JournalEntry],
1714        persist_digests: bool,
1715        #[cfg(feature = "rsm-state")] rsm_blocks: Option<hypercall_db::EngineJournalRsmBlockBatch>,
1716    ) -> anyhow::Result<(usize, Vec<(hypercall_db_diesel::engine_enums::DbUuid, i64)>)> {
1717        let db_entries: Vec<_> = entries.iter().map(engine_journal_entry_insert).collect();
1718        let handler =
1719            hypercall_db_diesel::DatabaseHandler::with_pool_no_migrations(Arc::new(pool.clone()));
1720        #[cfg(feature = "rsm-state")]
1721        let rsm_blocks = rsm_blocks.as_ref();
1722        #[cfg(not(feature = "rsm-state"))]
1723        let rsm_blocks = None;
1724        let result =
1725            handler.insert_engine_journal_batch_sync(&db_entries, persist_digests, rsm_blocks)?;
1726        Ok((
1727            result.inserted_count,
1728            result
1729                .inserted_commands
1730                .into_iter()
1731                .map(|(request_uuid, command_id)| {
1732                    (
1733                        hypercall_db_diesel::engine_enums::DbUuid(request_uuid),
1734                        command_id,
1735                    )
1736                })
1737                .collect(),
1738        ))
1739    }
1740
1741    fn lookup_command_ids_by_request_ids(
1742        pool: &crate::db_handler::DbPool,
1743        request_ids: &[hypercall_db_diesel::engine_enums::DbUuid],
1744    ) -> anyhow::Result<Vec<(hypercall_db_diesel::engine_enums::DbUuid, i64)>> {
1745        let request_ids: Vec<_> = request_ids.iter().map(|id| id.0).collect();
1746        let handler =
1747            hypercall_db_diesel::DatabaseHandler::with_pool_no_migrations(Arc::new(pool.clone()));
1748        Ok(handler
1749            .lookup_engine_journal_command_ids_sync(&request_ids)?
1750            .into_iter()
1751            .map(|(request_uuid, command_id)| {
1752                (
1753                    hypercall_db_diesel::engine_enums::DbUuid(request_uuid),
1754                    command_id,
1755                )
1756            })
1757            .collect())
1758    }
1759
1760    /// Start the batcher as a background task
1761    pub fn start(self) -> tokio::task::JoinHandle<()> {
1762        tokio::spawn(
1763            async move {
1764                self.run().await;
1765            }
1766            .instrument(info_span!("journal_batcher")),
1767        )
1768    }
1769}
1770
1771fn engine_journal_entry_insert(entry: &JournalEntry) -> hypercall_db::EngineJournalEntryInsert {
1772    let pre_digest_json = serde_json::to_value(&entry.pre_digest)
1773        .expect("Failed to serialize pre_digest for journal insert");
1774    let post_digest_json = serde_json::to_value(&entry.post_digest)
1775        .expect("Failed to serialize post_digest for journal insert");
1776
1777    hypercall_db::EngineJournalEntryInsert {
1778        request_uuid: entry.request_uuid.0,
1779        received_ts_ms: entry.received_ts_ms,
1780        command_data: entry.command_data.clone(),
1781        response_data: entry.response_data.clone(),
1782        order_id: entry.order_id,
1783        command_type: entry
1784            .command_type_enum
1785            .map(|command| command.as_str().to_string()),
1786        duration_ms: entry.duration_ms,
1787        pre_digest_data: hypercall_types::serialize_to_wire_bytes(&pre_digest_json),
1788        post_digest_data: hypercall_types::serialize_to_wire_bytes(&post_digest_json),
1789        events: entry
1790            .events
1791            .iter()
1792            .map(|event| hypercall_db::EngineJournalEventInsert {
1793                event_topic: event.event_topic.clone(),
1794                event_key: event.event_key.clone(),
1795                event_data: event.event_data.clone(),
1796                l2_sequence: event.l2_sequence,
1797                event_type: event.event_type_enum.into(),
1798            })
1799            .collect(),
1800        outbox_appends: entry.outbox_appends.clone(),
1801        fill_side_effects: entry
1802            .fill_side_effects
1803            .iter()
1804            .map(|side_effect| hypercall_db::EngineJournalFillSideEffect {
1805                event_idx: side_effect.event_idx,
1806                trade_id: side_effect.trade_id,
1807                taker_ledger_delta: side_effect.taker_ledger_delta,
1808                maker_ledger_delta: side_effect.maker_ledger_delta,
1809                taker_premium_delta: side_effect.taker_premium_delta,
1810                maker_premium_delta: side_effect.maker_premium_delta,
1811                underlying_notional: side_effect.underlying_notional,
1812            })
1813            .collect(),
1814        cash_withdrawal_side_effect: entry.cash_withdrawal_side_effect.as_ref().map(
1815            |side_effect| hypercall_db::EngineJournalCashWithdrawalSideEffect {
1816                wallet: side_effect.wallet,
1817                request_id: side_effect.request_id.clone(),
1818                amount: side_effect.amount,
1819                balance_after: side_effect.balance_after,
1820                timestamp_ms: side_effect.timestamp_ms,
1821            },
1822        ),
1823        balance_updates: entry
1824            .balance_updates
1825            .iter()
1826            .cloned()
1827            .map(|update| hypercall_db::EngineJournalBalanceUpdate { update })
1828            .collect(),
1829    }
1830}
1831
/// Collects, in input order, the `command_identity_hash` of every entry whose
/// command type is present and accepted by `rsm_state_supported_command`.
#[cfg(feature = "rsm-state")]
fn engine_command_identity_hashes(entries: &[JournalEntry]) -> Vec<[u8; 32]> {
    entries
        .iter()
        .filter(|entry| {
            entry
                .command_type_enum
                .is_some_and(rsm_state_supported_command)
        })
        .map(|entry| entry.command_identity_hash)
        .collect()
}
1844
/// The six 32-byte roots recorded for one RSM batch.
///
/// Built from a `BatchCommitment` via the `From` impl in this file. The type
/// derives `Serialize`/`Deserialize`, so field order may be part of the stored
/// format. NOTE(review): confirm before reordering fields.
#[cfg(feature = "rsm-state")]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
struct RsmBatchRoots {
    /// Copied from `BatchCommitment::state_root`.
    state_root: [u8; 32],
    /// Copied from `BatchCommitment::risk_root`.
    risk_root: [u8; 32],
    /// Copied from `BatchCommitment::command_mmr_root`.
    command_mmr_root: [u8; 32],
    /// Copied from `BatchCommitment::obligation_mmr_root`.
    obligation_mmr_root: [u8; 32],
    /// Copied from `BatchCommitment::intent_mmr_root`.
    intent_mmr_root: [u8; 32],
    /// Copied from `BatchCommitment::batch_root`.
    batch_root: [u8; 32],
}
1855
#[cfg(feature = "rsm-state")]
impl From<hypercall_state_commitment::pipeline::BatchCommitment> for RsmBatchRoots {
    /// Extracts the raw 32-byte roots from a batch commitment.
    ///
    /// The five wrapped roots are unwrapped through their `.0` field;
    /// `batch_root` is taken as-is.
    fn from(commitment: hypercall_state_commitment::pipeline::BatchCommitment) -> Self {
        let state_root = commitment.state_root.0;
        let risk_root = commitment.risk_root.0;
        let command_mmr_root = commitment.command_mmr_root.0;
        let obligation_mmr_root = commitment.obligation_mmr_root.0;
        let intent_mmr_root = commitment.intent_mmr_root.0;
        let batch_root = commitment.batch_root;

        Self {
            state_root,
            risk_root,
            command_mmr_root,
            obligation_mmr_root,
            intent_mmr_root,
            batch_root,
        }
    }
}
1869
/// Returns the roots attached to the first record in `records` that carries
/// `block_roots`, or `None` when no record has any.
#[cfg(feature = "rsm-state")]
fn rsm_batch_roots_from_wal(records: &[WalRecordWithOffset]) -> Option<RsmBatchRoots> {
    for record in records {
        if let Some(roots) = record.block_roots {
            return Some(roots);
        }
    }
    None
}
1874
/// Replays recovered WAL blocks into the local RSM commitment runtime,
/// verifying each block's roots before applying it.
///
/// Records are scanned in order. A record without a `block_header` is skipped
/// on its own. A record with a header starts a block made of the consecutive
/// records sharing its `end_offset`. Blocks whose `batch_seq` is below the
/// runtime's next batch version are skipped.
///
/// When the WAL carries no `block_roots` for a block, the locally computed
/// roots are stored on the block's first record after the header's
/// `batch_root` has been verified.
///
/// # Errors
///
/// Fails on a batch version gap, on a mismatch between the header's
/// `command_count` and the number of supported-command identity hashes, on a
/// root mismatch, or when building, preparing or applying the batch fails.
#[cfg(feature = "rsm-state")]
fn apply_recovered_rsm_commitment_blocks(
    records: &mut [WalRecordWithOffset],
    commitments: &mut ValidatorRsmStateCommitmentRuntime,
) -> anyhow::Result<()> {
    let mut index = 0;
    while index < records.len() {
        let Some(header) = records[index].block_header.as_ref() else {
            index += 1;
            continue;
        };

        // A block spans the consecutive records sharing this `end_offset`.
        let end_offset = records[index].end_offset;
        let block_end = records[index..]
            .iter()
            .position(|candidate| candidate.end_offset != end_offset)
            .map(|relative| index + relative)
            .unwrap_or(records.len());
        let block_records = &records[index..block_end];
        let expected_roots = rsm_batch_roots_from_wal(block_records);

        let next_version = commitments.next_batch_version();
        // Block version is below what the runtime expects next: skip it.
        if header.batch_seq < next_version {
            index = block_end;
            continue;
        }
        // A block ahead of the expected version means a gap; refuse to continue.
        if header.batch_seq > next_version {
            anyhow::bail!(
                "recovered RSM block version gap: WAL block {} but local commitment store next version is {}",
                header.batch_seq,
                next_version
            );
        }

        let entries: Vec<JournalEntry> = block_records
            .iter()
            .map(WalRecordWithOffset::to_db_entry)
            .collect();
        let identity_hashes = engine_command_identity_hashes(&entries);
        if identity_hashes.len() as u64 != header.command_count {
            anyhow::bail!(
                "recovered RSM block {} command count mismatch: header={} identities={}",
                header.batch_seq,
                header.command_count,
                identity_hashes.len()
            );
        }

        let delta = rsm_state_delta_from_entries(&entries, &identity_hashes, header.first_seq)?;
        let prepared = commitments.prepare_rsm_batch(&delta)?;
        let computed_roots: RsmBatchRoots = prepared.commitment.clone().into();
        match expected_roots {
            // WAL carried a full root set: every root must match.
            Some(expected_roots) if computed_roots != expected_roots => {
                anyhow::bail!(
                    "recovered RSM block {} root mismatch: WAL batch_root={} local batch_root={}",
                    header.batch_seq,
                    hex::encode(expected_roots.batch_root),
                    hex::encode(computed_roots.batch_root)
                );
            }
            Some(_) => {}
            // No stored roots: only the header's batch root can be checked.
            None if computed_roots.batch_root != header.batch_root => {
                anyhow::bail!(
                    "recovered legacy RSM block {} root mismatch: WAL header batch_root={} local batch_root={}",
                    header.batch_seq,
                    hex::encode(header.batch_root),
                    hex::encode(computed_roots.batch_root)
                );
            }
            None => {
                // Header root verified: attach the computed roots to the
                // block's first record.
                records[index].block_roots = Some(computed_roots);
            }
        }
        // Applied only after the roots were verified above.
        commitments.apply_prepared_rsm_batch(prepared)?;

        index = block_end;
    }

    Ok(())
}
1955
/// Builds the `StateDelta` for a block of journal entries.
///
/// Only entries whose command type is present and accepted by
/// `rsm_state_supported_command` contribute command types and events. The
/// resulting delta's `global` leaf is set from `identity_hashes` and
/// `first_seq`.
///
/// # Errors
///
/// Propagates any error from `rsm_state_delta_from_parts`.
#[cfg(feature = "rsm-state")]
fn rsm_state_delta_from_entries(
    entries: &[JournalEntry],
    identity_hashes: &[[u8; 32]],
    first_seq: u64,
) -> anyhow::Result<StateDelta> {
    /// True for entries that carry a supported command type.
    fn is_supported(entry: &&JournalEntry) -> bool {
        entry
            .command_type_enum
            .is_some_and(rsm_state_supported_command)
    }

    let command_types = entries
        .iter()
        .filter(is_supported)
        .filter_map(|entry| entry.command_type_enum);
    let events = entries
        .iter()
        .filter(is_supported)
        .flat_map(|entry| entry.events.iter());

    let mut delta = rsm_state_delta_from_parts(command_types, events, identity_hashes, first_seq)?;
    let global = rsm_global_leaf(identity_hashes, first_seq);
    delta.global = Some(global);
    Ok(delta)
}
1981
/// Assembles a `StateDelta` from command types, events and identity hashes.
///
/// Every identity hash is appended as a command and as an intent; every
/// event contributes its payload hash as an obligation. `first_seq` is
/// accepted but not used here.
///
/// # Errors
///
/// Fails on the first command type rejected by `rsm_state_supported_command`.
#[cfg(feature = "rsm-state")]
fn rsm_state_delta_from_parts<'a>(
    command_types: impl IntoIterator<Item = hypercall_db_diesel::engine_enums::CommandType>,
    events: impl IntoIterator<Item = &'a EventPayload>,
    identity_hashes: &[[u8; 32]],
    first_seq: u64,
) -> anyhow::Result<StateDelta> {
    let first_unsupported = command_types
        .into_iter()
        .find(|command_type| !rsm_state_supported_command(*command_type));
    if let Some(unsupported) = first_unsupported {
        anyhow::bail!(
            "unsupported validator RSM state materializer command type: {}",
            unsupported.as_str()
        );
    }

    // Intentionally unused by this function.
    let _ = first_seq;

    let with_commands = identity_hashes
        .iter()
        .fold(StateDelta::default(), |delta, hash| {
            append_command_delta(delta, *hash)
        });
    let with_obligations = events.into_iter().fold(with_commands, |delta, event| {
        append_obligation_delta(delta, rsm_event_payload_hash(event))
    });
    let complete = identity_hashes
        .iter()
        .fold(with_obligations, |delta, hash| {
            append_intent_delta(delta, *hash)
        });

    Ok(complete)
}
2010
/// Returns `delta` with `command_hash` pushed onto its `commands` list.
#[cfg(feature = "rsm-state")]
fn append_command_delta(delta: StateDelta, command_hash: [u8; 32]) -> StateDelta {
    let mut updated = delta;
    updated.commands.push(command_hash);
    updated
}
2016
/// Returns `delta` with `obligation_hash` pushed onto its `obligations` list.
#[cfg(feature = "rsm-state")]
fn append_obligation_delta(delta: StateDelta, obligation_hash: [u8; 32]) -> StateDelta {
    let mut updated = delta;
    updated.obligations.push(obligation_hash);
    updated
}
2022
/// Returns `delta` with `intent_hash` pushed onto its `intents` list.
#[cfg(feature = "rsm-state")]
fn append_intent_delta(delta: StateDelta, intent_hash: [u8; 32]) -> StateDelta {
    let mut updated = delta;
    updated.intents.push(intent_hash);
    updated
}
2028
/// Reports whether the RSM state materializer handles `command_type`.
///
/// First materializer slice: `TickSnapshot` only. It advances the command log
/// commitment but does not mutate order, position, ledger, market, oracle,
/// nonce, or risk state. Every state-mutating command fails closed until its
/// leaf materializer lands.
#[cfg(feature = "rsm-state")]
fn rsm_state_supported_command(
    command_type: hypercall_db_diesel::engine_enums::CommandType,
) -> bool {
    use hypercall_db_diesel::engine_enums::CommandType;

    matches!(command_type, CommandType::TickSnapshot)
}
2042
/// Builds the global leaf for a batch.
///
/// `command_chain_root` is the commands hash over `identity_hashes`, and
/// `command_chain_seq` is `first_seq` plus the number of hashes. Both ID
/// counters are set to 0.
#[cfg(feature = "rsm-state")]
fn rsm_global_leaf(identity_hashes: &[[u8; 32]], first_seq: u64) -> GlobalLeaf {
    let command_chain_root = hypercall_blocks::compute_commands_hash(identity_hashes);
    let command_count = identity_hashes.len() as u64;

    GlobalLeaf {
        next_order_id: 0,
        next_trade_id: 0,
        command_chain_root,
        command_chain_seq: first_seq + command_count,
    }
}
2052
/// Hashes an event payload for the RSM obligation root.
///
/// The event is encoded with `rmp_serde::to_vec_named` and hashed under the
/// `hypercall:rsm:event-payload:v1` domain.
///
/// # Panics
///
/// Panics if the event cannot be serialized.
#[cfg(feature = "rsm-state")]
fn rsm_event_payload_hash(event: &EventPayload) -> [u8; 32] {
    const DOMAIN: &[u8] = b"hypercall:rsm:event-payload:v1";

    let encoded = rmp_serde::to_vec_named(event)
        .expect("EventPayload serialization should not fail for RSM obligation root");
    rsm_hash_with_domain(DOMAIN, &encoded)
}
2059
/// Computes a domain-separated Keccak-256 digest.
///
/// The hasher is fed, in order: `domain`, the payload length as a big-endian
/// `u64`, and `payload` itself.
#[cfg(feature = "rsm-state")]
fn rsm_hash_with_domain(domain: &[u8], payload: &[u8]) -> [u8; 32] {
    use sha3::{Digest, Keccak256};

    let length_prefix = (payload.len() as u64).to_be_bytes();
    let mut keccak = Keccak256::new();
    for part in [domain, &length_prefix[..], payload] {
        keccak.update(part);
    }
    keccak.finalize().into()
}
2070
/// Serializable form of a journal entry.
///
/// `from_entry` copies every field from a `JournalEntry`. Fields marked
/// `#[serde(default)]` fall back to their `Default` value when missing from
/// the serialized input.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct WalJournalRecord {
    request_uuid: hypercall_db_diesel::engine_enums::DbUuid,
    received_ts_ms: u64,
    command_data: Vec<u8>,
    response_data: Option<Vec<u8>>,
    order_id: Option<i64>,
    pre_digest: EngineStateDigest,
    post_digest: EngineStateDigest,
    duration_ms: u64,
    events: Vec<EventPayload>,
    outbox_appends: Vec<crate::directive_outbox::DirectiveOutboxAppend>,
    fill_side_effects: Vec<JournalFillSideEffect>,
    // Defaults to `None` when missing from the serialized record.
    #[serde(default)]
    cash_withdrawal_side_effect: Option<JournalCashWithdrawalSideEffect>,
    // Defaults to an empty list when missing from the serialized record.
    #[serde(default)]
    balance_updates: Vec<hypercall_types::BalanceUpdate>,
    command_type_enum: Option<hypercall_db_diesel::engine_enums::CommandType>,
    // Only present with `rsm-state`; defaults to all zeroes when missing.
    #[cfg(feature = "rsm-state")]
    #[serde(default)]
    command_identity_hash: [u8; 32],
    // Only present with `rsm-state`; defaults to `None` when missing.
    #[cfg(feature = "rsm-state")]
    #[serde(default)]
    rsm_state_digest: Option<RsmEngineStateDigest>,
}
2096
/// Block header type used in this file: an alias for `BlockLogHeader`.
#[cfg(feature = "rsm-state")]
type WalBlockRecord = BlockLogHeader;
/// Block log entry specialised to `WalJournalRecord`.
#[cfg(feature = "rsm-state")]
type WalBlockEntry = BlockLogEntry<WalJournalRecord>;
2101
/// A block header paired with the batch roots recorded for that block.
#[cfg(feature = "rsm-state")]
#[derive(Debug, Clone, Serialize, Deserialize)]
struct WalBlockRecordWithRoots {
    header: WalBlockRecord,
    roots: RsmBatchRoots,
}
2108
/// A block header, its batch roots, and a list of journal records under the
/// `commands` field.
#[cfg(feature = "rsm-state")]
#[derive(Debug, Clone, Serialize, Deserialize)]
struct WalDurableBlockEntry {
    header: WalBlockRecord,
    roots: RsmBatchRoots,
    commands: Vec<WalJournalRecord>,
}
2116
2117impl WalJournalRecord {
2118    fn from_entry(entry: &JournalEntry) -> Self {
2119        Self {
2120            request_uuid: entry.request_uuid,
2121            received_ts_ms: entry.received_ts_ms,
2122            command_data: entry.command_data.clone(),
2123            response_data: entry.response_data.clone(),
2124            order_id: entry.order_id,
2125            pre_digest: entry.pre_digest.clone(),
2126            post_digest: entry.post_digest.clone(),
2127            duration_ms: entry.duration_ms,
2128            events: entry.events.clone(),
2129            outbox_appends: entry.outbox_appends.clone(),
2130            fill_side_effects: entry.fill_side_effects.clone(),
2131            cash_withdrawal_side_effect: entry.cash_withdrawal_side_effect.clone(),
2132            balance_updates: entry.balance_updates.clone(),
2133            command_type_enum: entry.command_type_enum,
2134            #[cfg(feature = "rsm-state")]
2135            command_identity_hash: entry.command_identity_hash,
2136            #[cfg(feature = "rsm-state")]
2137            rsm_state_digest: entry.rsm_state_digest.clone(),
2138        }
2139    }
2140}
2141
/// A journal record together with the WAL position it ends at.
#[derive(Debug, Clone)]
struct WalRecordWithOffset {
    record: WalJournalRecord,
    /// Records sharing an `end_offset` came from the same physical WAL record
    /// (see `next_replication_chunk_len`); also used as the checkpoint offset
    /// once the record has been replicated.
    end_offset: u64,
    /// Block header, when this record carries one (`rsm-state` only).
    #[cfg(feature = "rsm-state")]
    block_header: Option<WalBlockRecord>,
    /// Batch roots, when known for this record's block (`rsm-state` only).
    #[cfg(feature = "rsm-state")]
    block_roots: Option<RsmBatchRoots>,
}
2151
2152impl WalRecordWithOffset {
2153    fn to_db_entry(&self) -> JournalEntry {
2154        JournalEntry {
2155            request_uuid: self.record.request_uuid,
2156            received_ts_ms: self.record.received_ts_ms,
2157            command_data: self.record.command_data.clone(),
2158            response_data: self.record.response_data.clone(),
2159            order_id: self.record.order_id,
2160            pre_digest: self.record.pre_digest.clone(),
2161            post_digest: self.record.post_digest.clone(),
2162            duration_ms: self.record.duration_ms,
2163            events: self.record.events.clone(),
2164            outbox_appends: self.record.outbox_appends.clone(),
2165            fill_side_effects: self.record.fill_side_effects.clone(),
2166            cash_withdrawal_side_effect: self.record.cash_withdrawal_side_effect.clone(),
2167            balance_updates: self.record.balance_updates.clone(),
2168            created_at: Instant::now(),
2169            commit_ack: None,
2170            command_type_enum: self.record.command_type_enum,
2171            #[cfg(feature = "rsm-state")]
2172            command_identity_hash: self.record.command_identity_hash,
2173            #[cfg(feature = "rsm-state")]
2174            rsm_state_digest: self.record.rsm_state_digest.clone(),
2175        }
2176    }
2177}
2178
2179/// Returns the number of records for the next replication chunk.
2180///
2181/// `pending` is the ordered WAL tail and `max_batch_size` is the target command
2182/// cap. The returned length may exceed that cap only to keep records with the
2183/// same `end_offset` together, since they came from the same physical WAL
2184/// record and checkpoint advancement must stay atomic at that boundary.
2185fn next_replication_chunk_len(pending: &[WalRecordWithOffset], max_batch_size: usize) -> usize {
2186    if pending.is_empty() {
2187        return 0;
2188    }
2189
2190    let mut chunk_len = max_batch_size.max(1).min(pending.len());
2191    while chunk_len < pending.len()
2192        && pending[chunk_len - 1].end_offset == pending[chunk_len].end_offset
2193    {
2194        chunk_len += 1;
2195    }
2196    chunk_len
2197}
2198
/// Decides whether a freshly decoded WAL record must wait for the next replay
/// batch.
///
/// Returns `true` when the current batch already holds at least one command and
/// adding `decoded_record_commands` would push it past the cap
/// (`max_batch_size`, treated as at least 1). An empty batch never defers, so a
/// single record larger than the cap is still accepted on its own.
fn should_defer_replay_record(
    current_batch_commands: usize,
    decoded_record_commands: usize,
    max_batch_size: usize,
) -> bool {
    let cap = max_batch_size.max(1);

    // Saturate instead of adding directly: a wrapped sum (release builds)
    // would compare as small and wrongly admit the record, and debug builds
    // would panic on the overflow.
    current_batch_commands > 0
        && current_batch_commands.saturating_add(decoded_record_commands) > cap
}
2207
/// Handle to the on-disk WAL file together with the bookkeeping used when
/// appending frames and pre-allocating space.
struct WalState {
    /// Path of the WAL file; used in error messages and to locate the parent
    /// directory for disk-space metrics.
    wal_path: PathBuf,
    /// Path of the checkpoint file, derived from `wal_path`.
    checkpoint_path: PathBuf,
    /// WAL file handle, opened in create + append + read mode.
    file: File,
    /// Running WAL byte offset. Initialized to the file length at open, passed
    /// mutably to `write_frame` on every append, and read back afterwards as
    /// the end offset of the frame just written.
    next_offset: u64,
    /// End offset of the last fallocate'd region. Pre-allocation avoids
    /// extent allocation stalls during sequential WAL writes on ext4/xfs.
    preallocated_end: u64,
}
2217
2218impl WalState {
2219    /// Open a WAL at a specific path. Preferred for tests to avoid env var races.
2220    #[cfg(test)]
2221    fn open_at(wal_path: PathBuf) -> Result<Self, String> {
2222        if let Some(parent) = wal_path.parent() {
2223            if !parent.as_os_str().is_empty() {
2224                create_dir_all(parent).map_err(|e| {
2225                    format!("failed to create WAL directory {}: {}", parent.display(), e)
2226                })?;
2227            }
2228        }
2229
2230        let checkpoint_path = checkpoint_path_for_wal(&wal_path);
2231
2232        let file = OpenOptions::new()
2233            .create(true)
2234            .append(true)
2235            .read(true)
2236            .open(&wal_path)
2237            .map_err(|e| format!("failed to open WAL file {}: {}", wal_path.display(), e))?;
2238
2239        let next_offset = file
2240            .metadata()
2241            .map_err(|e| format!("failed to stat WAL file {}: {}", wal_path.display(), e))?
2242            .len();
2243
2244        Ok(Self {
2245            wal_path,
2246            checkpoint_path,
2247            file,
2248            next_offset,
2249            preallocated_end: next_offset,
2250        })
2251    }
2252
2253    fn open(wal_path: PathBuf) -> Result<Self, String> {
2254        if let Some(parent) = wal_path.parent() {
2255            if !parent.as_os_str().is_empty() {
2256                create_dir_all(parent).map_err(|e| {
2257                    format!("failed to create WAL directory {}: {}", parent.display(), e)
2258                })?;
2259
2260                Self::emit_disk_space_metrics(parent);
2261            }
2262        }
2263
2264        let checkpoint_path = checkpoint_path_for_wal(&wal_path);
2265
2266        #[cfg(unix)]
2267        let file = {
2268            use std::os::unix::fs::OpenOptionsExt;
2269            OpenOptions::new()
2270                .create(true)
2271                .append(true)
2272                .read(true)
2273                .mode(0o600)
2274                .open(&wal_path)
2275                .map_err(|e| format!("failed to open WAL file {}: {}", wal_path.display(), e))?
2276        };
2277        #[cfg(not(unix))]
2278        let file = OpenOptions::new()
2279            .create(true)
2280            .append(true)
2281            .read(true)
2282            .open(&wal_path)
2283            .map_err(|e| format!("failed to open WAL file {}: {}", wal_path.display(), e))?;
2284
2285        let next_offset = file
2286            .metadata()
2287            .map_err(|e| format!("failed to stat WAL file {}: {}", wal_path.display(), e))?
2288            .len();
2289
2290        // Punch holes for already-replicated WAL prefix on startup.
2291        // Reclaims disk from data written before hole punching was added.
2292        let cp = read_checkpoint(&checkpoint_path).unwrap_or_else(|e| {
2293            panic!(
2294                "CRITICAL_FAILURE: failed to read/parse checkpoint {}: {}",
2295                checkpoint_path.display(),
2296                e
2297            )
2298        });
2299        if cp.wal_offset > 0 {
2300            info!(
2301                "WAL startup: punching replicated prefix (0..{} bytes, file size={} bytes)",
2302                cp.wal_offset, next_offset
2303            );
2304            EngineJournalBatcher::punch_wal_hole(&wal_path, 0, cp.wal_offset);
2305
2306            if let Some(parent) = wal_path.parent() {
2307                Self::emit_disk_space_metrics(parent);
2308            }
2309        }
2310
2311        Ok(Self {
2312            wal_path,
2313            checkpoint_path,
2314            file,
2315            next_offset,
2316            preallocated_end: next_offset,
2317        })
2318    }
2319
2320    /// Emit Prometheus gauges for WAL volume disk space.
2321    fn emit_disk_space_metrics(path: &Path) {
2322        #[cfg(unix)]
2323        {
2324            use std::ffi::CString;
2325            let c_path = match CString::new(path.to_string_lossy().as_bytes()) {
2326                Ok(p) => p,
2327                Err(_) => return,
2328            };
2329            unsafe {
2330                let mut stat: libc::statvfs = std::mem::zeroed();
2331                if libc::statvfs(c_path.as_ptr(), &mut stat) == 0 {
2332                    let total_bytes = stat.f_frsize as u64 * stat.f_blocks as u64;
2333                    let avail_bytes = stat.f_frsize as u64 * stat.f_bavail as u64;
2334                    let used_bytes =
2335                        total_bytes.saturating_sub(stat.f_frsize as u64 * stat.f_bfree as u64);
2336                    let pct = if total_bytes > 0 {
2337                        (used_bytes as f64 / total_bytes as f64) * 100.0
2338                    } else {
2339                        0.0
2340                    };
2341
2342                    gauge!("ht_wal_volume_total_bytes").set(total_bytes as f64);
2343                    gauge!("ht_wal_volume_used_bytes").set(used_bytes as f64);
2344                    gauge!("ht_wal_volume_avail_bytes").set(avail_bytes as f64);
2345                    gauge!("ht_wal_volume_usage_pct").set(pct);
2346                }
2347            }
2348        }
2349    }
2350
2351    /// Pre-allocate disk space for the WAL file in 64 MiB chunks using fallocate(2).
2352    ///
2353    /// Uses `FALLOC_FL_KEEP_SIZE` to avoid inflating the file size (replay reads
2354    /// up to the actual EOF). Non-fatal on failure (some filesystems like tmpfs
2355    /// don't support fallocate). No-op on non-Linux.
2356    fn ensure_preallocated(&mut self) -> u64 {
2357        if self.next_offset < self.preallocated_end {
2358            return 0;
2359        }
2360
2361        #[cfg(target_os = "linux")]
2362        {
2363            let offset = self.preallocated_end as i64;
2364            let len = WAL_PREALLOC_CHUNK as i64;
2365            let ret = unsafe {
2366                libc::fallocate(
2367                    self.file.as_raw_fd(),
2368                    libc::FALLOC_FL_KEEP_SIZE,
2369                    offset,
2370                    len,
2371                )
2372            };
2373            if ret == 0 {
2374                self.preallocated_end += WAL_PREALLOC_CHUNK;
2375                WAL_PREALLOC_CHUNK
2376            } else {
2377                let errno = std::io::Error::last_os_error();
2378                warn!(
2379                    "fallocate failed (non-fatal, continuing without preallocation): {}",
2380                    errno
2381                );
2382                // Disk might be full — log space info for diagnosis
2383                if let Some(parent) = self.wal_path.parent() {
2384                    Self::emit_disk_space_metrics(parent);
2385                }
2386                0
2387            }
2388        }
2389
2390        #[cfg(not(target_os = "linux"))]
2391        {
2392            0
2393        }
2394    }
2395
2396    /// Serialize entries and write them to page cache via BufWriter.
2397    ///
2398    /// Does NOT call fsync — the caller (wal_writer_thread_main) is responsible
2399    /// for durability via io_uring FSYNC or sync_data().
2400    #[cfg_attr(feature = "rsm-state", allow(dead_code))]
2401    fn append_batch_no_sync(
2402        &mut self,
2403        entries: &[JournalEntry],
2404    ) -> Result<Vec<WalRecordWithOffset>, String> {
2405        if entries.is_empty() {
2406            return Ok(Vec::new());
2407        }
2408
2409        // Ensure we have pre-allocated space to avoid extent allocation stalls.
2410        self.ensure_preallocated();
2411
2412        let mut out = Vec::with_capacity(entries.len());
2413
2414        let mut writer = BufWriter::new(&self.file);
2415
2416        for entry in entries {
2417            let record = WalJournalRecord::from_entry(entry);
2418            let payload = rmp_serde::to_vec_named(&record).map_err(|e| {
2419                format!(
2420                    "failed to serialize WAL record for {}: {}",
2421                    entry.request_uuid, e
2422                )
2423            })?;
2424
2425            write_frame(
2426                &mut writer,
2427                &payload,
2428                &mut self.next_offset,
2429                &format!("record for {}", entry.request_uuid),
2430            )?;
2431
2432            out.push(WalRecordWithOffset {
2433                record,
2434                end_offset: self.next_offset,
2435                #[cfg(feature = "rsm-state")]
2436                block_header: None,
2437                #[cfg(feature = "rsm-state")]
2438                block_roots: None,
2439            });
2440        }
2441
2442        writer
2443            .flush()
2444            .map_err(|e| format!("failed to flush WAL BufWriter: {}", e))?;
2445
2446        Ok(out)
2447    }
2448
2449    /// Append a self-contained validator RSM batch record.
2450    #[cfg(feature = "rsm-state")]
2451    fn append_block<T: Serialize>(&mut self, block: &T) -> Result<u64, String> {
2452        self.ensure_preallocated();
2453
2454        let payload = rmp_serde::to_vec_named(block)
2455            .map_err(|e| format!("failed to serialize WAL block: {}", e))?;
2456
2457        let mut writer = BufWriter::new(&self.file);
2458        write_frame(&mut writer, &payload, &mut self.next_offset, "block")?;
2459        writer
2460            .flush()
2461            .map_err(|e| format!("failed to flush WAL BufWriter: {}", e))?;
2462
2463        Ok(self.next_offset)
2464    }
2465
2466    /// Convenience method for tests: write + sync in one call.
2467    #[cfg(test)]
2468    fn append_batch(
2469        &mut self,
2470        entries: &[JournalEntry],
2471    ) -> Result<Vec<WalRecordWithOffset>, String> {
2472        let result = self.append_batch_no_sync(entries)?;
2473        self.file.sync_data().map_err(|e| {
2474            format!(
2475                "test sync_data failed for WAL {}: {}",
2476                self.wal_path.display(),
2477                e
2478            )
2479        })?;
2480        Ok(result)
2481    }
2482
    /// Test helper: reads every WAL record located after the checkpointed
    /// offset.
    ///
    /// Each frame is laid out as a 4-byte little-endian payload length, a
    /// 4-byte little-endian CRC, and then the payload. Reading starts at the
    /// checkpoint's `wal_offset` and continues to the end of the file.
    ///
    /// Returns an empty vector when the WAL file does not exist. A truncated
    /// frame at the tail ends the scan and the records decoded so far are
    /// returned.
    ///
    /// # Errors
    ///
    /// Returns a message when the checkpoint cannot be read, the WAL cannot be
    /// opened, stat'ed, or seeked, the checkpoint offset lies beyond the end of
    /// the file, a read fails for a reason other than EOF, or a payload matches
    /// none of the known record types.
    ///
    /// # Panics
    ///
    /// Panics with a `CRITICAL_FAILURE` message on a zero payload length, on a
    /// CRC mismatch, or (with `rsm-state`) on a legacy rootless block entry.
    #[cfg(test)]
    fn read_unreplicated_records(
        wal_path: &Path,
        checkpoint_path: &Path,
    ) -> Result<Vec<WalRecordWithOffset>, String> {
        if !wal_path.exists() {
            return Ok(Vec::new());
        }

        let checkpoint = read_checkpoint(checkpoint_path)?;
        let mut file = OpenOptions::new().read(true).open(wal_path).map_err(|e| {
            format!(
                "failed to open WAL {} for replay: {}",
                wal_path.display(),
                e
            )
        })?;

        let file_len = file
            .metadata()
            .map_err(|e| format!("failed to stat WAL {}: {}", wal_path.display(), e))?
            .len();

        // A checkpoint pointing past EOF cannot be replayed from.
        if checkpoint.wal_offset > file_len {
            return Err(format!(
                "WAL checkpoint {} exceeds file length {} for {}",
                checkpoint.wal_offset,
                file_len,
                wal_path.display()
            ));
        }

        file.seek(SeekFrom::Start(checkpoint.wal_offset))
            .map_err(|e| format!("failed to seek WAL replay start: {}", e))?;

        // `offset` mirrors the file cursor; after a full frame has been read it
        // is that frame's end offset.
        let mut offset = checkpoint.wal_offset;
        let mut out = Vec::new();

        loop {
            // Length prefix. EOF here (including a partial prefix) ends the
            // scan without a warning.
            let mut len_buf = [0u8; 4];
            match file.read_exact(&mut len_buf) {
                Ok(()) => {}
                Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => break,
                Err(e) => {
                    return Err(format!("failed reading WAL length prefix: {}", e));
                }
            }

            let payload_len = u32::from_le_bytes(len_buf) as usize;
            if payload_len == 0 {
                panic!(
                    "CRITICAL_FAILURE: WAL record length was zero at offset {} in {}. Data corruption detected.",
                    offset,
                    wal_path.display()
                );
            }
            offset += 4;

            // Guard against corrupt length prefix that could OOM the process.
            // `remaining` still includes the 4-byte CRC field, so this bound is
            // loose by 4 bytes; a payload that is short by that margin is
            // caught by the `read_exact` on the payload below.
            let remaining = file_len.saturating_sub(offset);
            if payload_len as u64 > remaining {
                warn!(
                    "WAL declared payload_len={} but only {} bytes remain at offset {} in {}, \
                     truncating tail and replaying prior durable records only",
                    payload_len,
                    remaining,
                    offset,
                    wal_path.display(),
                );
                break;
            }

            // CRC field.
            let mut crc_buf = [0u8; 4];
            match file.read_exact(&mut crc_buf) {
                Ok(()) => {}
                Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => {
                    warn!(
                        "WAL ended with incomplete CRC header at offset {} in {} ({}), \
                         truncating tail and replaying prior durable records only",
                        offset,
                        wal_path.display(),
                        e
                    );
                    break;
                }
                Err(e) => return Err(format!("failed reading WAL crc prefix: {}", e)),
            }
            offset += 4;

            let expected_crc = u32::from_le_bytes(crc_buf);
            let mut payload = vec![0u8; payload_len];

            // Payload bytes.
            match file.read_exact(&mut payload) {
                Ok(()) => {}
                Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => {
                    warn!(
                        "WAL ended with incomplete payload at offset {} in {} ({}), \
                         truncating tail and replaying prior durable records only",
                        offset,
                        wal_path.display(),
                        e
                    );
                    break;
                }
                Err(e) => return Err(format!("failed reading WAL payload: {}", e)),
            }
            offset += payload_len as u64;

            // A complete frame whose checksum does not match is treated as
            // corruption rather than as a torn tail.
            let actual_crc = WAL_CRC.checksum(&payload);
            if actual_crc != expected_crc {
                panic!(
                    "CRITICAL_FAILURE: WAL CRC mismatch at end_offset {} in {}: expected {}, got {}. Data corruption detected.",
                    offset,
                    wal_path.display(),
                    expected_crc,
                    actual_crc
                );
            }

            // Decode attempts run in a fixed order; the first type that
            // deserializes successfully wins.
            //
            // 1. Durable block entry: expand into one record per command, all
            //    sharing this frame's end offset. Header and roots are attached
            //    to the first command only.
            #[cfg(feature = "rsm-state")]
            if let Ok(block_entry) = rmp_serde::from_slice::<WalDurableBlockEntry>(&payload) {
                let header = block_entry.header;
                let roots = block_entry.roots;
                for (index, cmd) in block_entry.commands.into_iter().enumerate() {
                    out.push(WalRecordWithOffset {
                        record: cmd,
                        end_offset: offset,
                        block_header: (index == 0).then_some(header.clone()),
                        block_roots: (index == 0).then_some(roots),
                    });
                }
                continue;
            }

            // 2. Legacy block entry without roots: rejected outright.
            #[cfg(feature = "rsm-state")]
            if rmp_serde::from_slice::<WalBlockEntry>(&payload).is_ok() {
                panic!(
                    "CRITICAL_FAILURE: legacy rootless RSM WAL block at end_offset {} is unsupported after durable root cutover",
                    offset
                );
            }

            // 3. Standalone journal record.
            if let Ok(record) = rmp_serde::from_slice::<WalJournalRecord>(&payload) {
                out.push(WalRecordWithOffset {
                    record,
                    end_offset: offset,
                    #[cfg(feature = "rsm-state")]
                    block_header: None,
                    #[cfg(feature = "rsm-state")]
                    block_roots: None,
                });
                continue;
            }

            // 4. Bare block record: recognized but contributes no output.
            #[cfg(feature = "rsm-state")]
            if rmp_serde::from_slice::<WalBlockRecord>(&payload).is_ok() {
                continue;
            }

            return Err(format!(
                "failed to deserialize WAL payload at end_offset {}",
                offset
            ));
        }

        Ok(out)
    }
2650}
2651
2652/// Submit `IORING_OP_FSYNC` with `DATASYNC` flag via io_uring.
2653///
2654/// Only the fsync uses io_uring — writes go through regular BufWriter to page cache.
2655/// io_uring FSYNC flushes page cache to storage without blocking the thread on a
2656/// syscall (the kernel does the work asynchronously and we wait on the CQE).
2657#[cfg(target_os = "linux")]
2658fn wal_fsync_io_uring(ring: &mut io_uring::IoUring, file: &File) -> Result<(), String> {
2659    let fd = io_uring::types::Fd(file.as_raw_fd());
2660    let fsync_e = io_uring::opcode::Fsync::new(fd)
2661        .flags(io_uring::types::FsyncFlags::DATASYNC)
2662        .build()
2663        .user_data(0x42);
2664
2665    unsafe {
2666        ring.submission()
2667            .push(&fsync_e)
2668            .map_err(|e| format!("io_uring submission push failed: {:?}", e))?;
2669    }
2670
2671    ring.submit_and_wait(1)
2672        .map_err(|e| format!("io_uring submit_and_wait failed: {}", e))?;
2673
2674    let mut cq = ring.completion();
2675    cq.sync();
2676    let cqe = cq
2677        .next()
2678        .ok_or_else(|| "io_uring completion queue empty after submit_and_wait".to_string())?;
2679
2680    if cqe.result() < 0 {
2681        return Err(format!(
2682            "io_uring FSYNC failed with errno {}",
2683            -cqe.result()
2684        ));
2685    }
2686
2687    Ok(())
2688}
2689
/// Dedicated WAL writer OS thread.
///
/// Owns `WalState` exclusively (no Arc/Mutex). Receives batches from the batcher
/// task, writes to page cache, performs fsync (io_uring on Linux, sync_data elsewhere),
/// fires durability acks, and enqueues records for async replication.
///
/// The loop runs until a `Shutdown` message arrives or the channel's sending
/// side is gone. Per `WriteBatch` the order is: append, fsync, (with
/// `rsm-state`) apply the prepared block, update metrics, send acks, enqueue
/// for replication, then signal the optional completion.
///
/// # Panics
///
/// Panics with a `CRITICAL_FAILURE` message when the append fails or the
/// blocking `sync_data` fails. Aborts the process when the replication channel
/// is closed.
fn wal_writer_thread_main(
    mut wal_state: WalState,
    rx: std::sync::mpsc::Receiver<WalWriterMessage>,
    replication_tx: mpsc::Sender<ReplicationMessage>,
    #[cfg(feature = "rsm-state")] mut rsm_state: RsmWalWriterState,
) {
    // Try to create io_uring ring. Falls back to sync_data() if unavailable
    // (e.g. seccomp-restricted containers, older kernels).
    #[cfg(target_os = "linux")]
    let mut ring: Option<io_uring::IoUring> = match io_uring::IoUring::new(8) {
        Ok(r) => {
            info!("WAL writer using io_uring for DATASYNC");
            Some(r)
        }
        Err(e) => {
            warn!(
                "io_uring unavailable ({}), WAL writer falling back to blocking fdatasync",
                e
            );
            None
        }
    };

    /// Perform fdatasync on the WAL file using the best available method.
    ///
    /// Tries io_uring first when a ring is present; on any io_uring error the
    /// ring is dropped for the rest of the thread's life and the blocking
    /// `sync_data` path is used instead. A `sync_data` failure panics.
    // NOTE(review): the fallback is also taken when the io_uring FSYNC itself
    // completed with an error (negative CQE result), not only when the ring
    // machinery failed. On Linux a failed fsync may clear the error state of
    // the affected pages, so the retry via sync_data() could succeed without
    // the data being durable — confirm whether such write-back errors should be
    // fatal here instead of retried.
    #[cfg(target_os = "linux")]
    fn do_fsync(ring: &mut Option<io_uring::IoUring>, file: &File, wal_path: &Path) {
        if let Some(ref mut r) = ring {
            if let Err(e) = wal_fsync_io_uring(r, file) {
                // io_uring FSYNC failed at runtime. Disable and fall back to
                // blocking fdatasync rather than crashing the engine.
                warn!(
                    "io_uring FSYNC failed ({}), disabling io_uring and falling back to fdatasync",
                    e
                );
                counter!("ht_journal_wal_io_uring_fallbacks").increment(1);
                *ring = None;
                // Fall through to sync_data below
            } else {
                return;
            }
        }
        if let Err(e) = file.sync_data() {
            // Publish disk-space gauges before panicking to aid diagnosis.
            if let Some(parent) = wal_path.parent() {
                WalState::emit_disk_space_metrics(parent);
            }
            panic!(
                "CRITICAL_FAILURE: fdatasync failed for WAL {}: {}. Engine must stop.",
                wal_path.display(),
                e
            );
        }
    }

    /// Perform fdatasync on the WAL file via blocking `sync_data`; a failure
    /// panics after publishing disk-space gauges.
    #[cfg(not(target_os = "linux"))]
    fn do_fsync(file: &File, wal_path: &Path) {
        if let Err(e) = file.sync_data() {
            if let Some(parent) = wal_path.parent() {
                WalState::emit_disk_space_metrics(parent);
            }
            panic!(
                "CRITICAL_FAILURE: fdatasync failed for WAL {}: {}. Engine must stop.",
                wal_path.display(),
                e
            );
        }
    }

    loop {
        match rx.recv() {
            Ok(WalWriterMessage::WriteBatch {
                entries,
                ack_senders,
                completion,
            }) => {
                let wal_start = Instant::now();
                let entry_count = entries.len();

                // With `rsm-state`, the block is prepared before anything is
                // written and applied only after the fsync below.
                #[cfg(feature = "rsm-state")]
                let prepared_block = rsm_state.prepare_block(&entries);

                // When a block was prepared, the whole batch goes into the WAL
                // as one block frame and every resulting record shares that
                // frame's end offset; otherwise each entry gets its own frame.
                #[cfg(feature = "rsm-state")]
                let append_result = if let Some(prepared) = prepared_block.as_ref() {
                    let commands: Vec<WalJournalRecord> =
                        entries.iter().map(WalJournalRecord::from_entry).collect();
                    let block_entry = WalDurableBlockEntry {
                        header: prepared.block.header.clone(),
                        roots: prepared.block.roots,
                        commands,
                    };
                    wal_state.append_block(&block_entry).map(|end_offset| {
                        let header = block_entry.header.clone();
                        let roots = block_entry.roots;
                        block_entry
                            .commands
                            .into_iter()
                            .enumerate()
                            .map(|(index, cmd)| WalRecordWithOffset {
                                record: cmd,
                                end_offset,
                                // Header and roots ride on the first record only.
                                block_header: (index == 0).then_some(header.clone()),
                                block_roots: (index == 0).then_some(roots),
                            })
                            .collect::<Vec<_>>()
                    })
                } else {
                    wal_state.append_batch_no_sync(&entries)
                };

                #[cfg(not(feature = "rsm-state"))]
                let append_result = wal_state.append_batch_no_sync(&entries);

                match append_result {
                    Ok(wal_records) => {
                        // Rebound as mutable so the applied header/roots can be
                        // patched in after the fsync.
                        #[cfg(feature = "rsm-state")]
                        let mut wal_records = wal_records;

                        #[cfg(target_os = "linux")]
                        do_fsync(&mut ring, &wal_state.file, &wal_state.wal_path);
                        #[cfg(not(target_os = "linux"))]
                        do_fsync(&wal_state.file, &wal_state.wal_path);

                        // Apply the prepared block now that it is on disk, and
                        // replace the header/roots on the record that carries
                        // them with the applied block's values.
                        #[cfg(feature = "rsm-state")]
                        if let Some(prepared) = prepared_block {
                            let applied_block = rsm_state.apply_prepared_block(prepared);
                            for record in wal_records.iter_mut() {
                                if record.block_header.is_some() {
                                    record.block_header = Some(applied_block.header.clone());
                                    record.block_roots = Some(applied_block.roots);
                                    break;
                                }
                            }
                        }

                        // The histogram covers append + fsync (+ block apply).
                        histogram!("ht_journal_wal_append_seconds")
                            .record(wal_start.elapsed().as_secs_f64());
                        counter!("ht_journal_wal_batches_total").increment(1);
                        counter!("ht_journal_wal_entries_total").increment(entry_count as u64);
                        gauge!("ht_wal_file_size_bytes").set(wal_state.next_offset as f64);

                        // Update volume disk space gauges on every write
                        if let Some(parent) = wal_state.wal_path.parent() {
                            WalState::emit_disk_space_metrics(parent);
                        }

                        // Durability boundary reached: entries are on disk. Fire acks.
                        // A send error (receiver already dropped) is ignored.
                        for tx in ack_senders.into_iter().flatten() {
                            let _ = tx.send(());
                        }

                        // Enqueue for async PostgreSQL replication.
                        if let Err(e) =
                            replication_tx.blocking_send(ReplicationMessage::Entries(wal_records))
                        {
                            // The WAL writer is shut down before the replication
                            // channel during graceful shutdown, so if we reach
                            // here the replication task has crashed unexpectedly.
                            error!(
                                "CRITICAL_FAILURE: Replication channel closed unexpectedly \
                                 (replication task likely crashed): {}. Aborting process.",
                                e
                            );
                            std::process::abort();
                        }

                        // Signal completion if requested (Flush callers await this).
                        if let Some(tx) = completion {
                            let _ = tx.send(());
                        }
                    }
                    Err(e) => {
                        // Publish disk-space gauges before panicking to aid diagnosis.
                        if let Some(parent) = wal_state.wal_path.parent() {
                            WalState::emit_disk_space_metrics(parent);
                        }
                        panic!(
                            "CRITICAL_FAILURE: WAL append failed: {}. WAL size={} bytes. Engine must stop.",
                            e, wal_state.next_offset
                        );
                    }
                }
            }
            Ok(WalWriterMessage::FlushBarrier { completion }) => {
                // Barrier processed in FIFO order — all prior WriteBatches
                // have already been fsynced by the time we get here.
                let _ = completion.send(());
            }
            // An explicit shutdown and a closed channel are handled the same way.
            Ok(WalWriterMessage::Shutdown) | Err(_) => {
                info!("WAL writer thread shutting down");
                break;
            }
        }
    }
}
2887
/// Shared sender that can be cloned and used from the engine.
///
/// This is an `Arc`-wrapped [`JournalBatchSender`], so cloning the alias clones
/// the `Arc` handle rather than the sender itself.
pub type SharedJournalBatchSender = Arc<JournalBatchSender>;
2890
2891#[cfg(test)]
2892mod tests {
2893    use super::*;
2894    use diesel::RunQueryDsl;
2895
2896    #[test]
2897    fn test_config_defaults() {
2898        let config = JournalBatcherConfig::default();
2899        assert_eq!(config.channel_capacity, DEFAULT_CHANNEL_CAPACITY);
2900        assert_eq!(config.max_batch_size, 100);
2901        assert_eq!(config.flush_interval_ms, 10);
2902    }
2903
2904    #[test]
2905    fn engine_journal_insert_projection_preserves_transactional_side_effect_inputs() {
2906        let request_uuid = hypercall_db_diesel::engine_enums::DbUuid(uuid::Uuid::new_v4());
2907        let wallet = hypercall_types::WalletAddress::from([1u8; 20]);
2908        let account = hypercall_types::WalletAddress::from([2u8; 20]);
2909        let signer = hypercall_types::WalletAddress::from([3u8; 20]);
2910        let destination = hypercall_types::WalletAddress::from([4u8; 20]);
2911
2912        let outbox_append = hypercall_db::DirectiveOutboxAppend::needs_rsm_signature(
2913            "directive-1".to_string(),
2914            hypercall_types::directives::ActionKey::SystemWithdrawToken,
2915            wallet,
2916            account,
2917            signer,
2918            17,
2919            "idem-1".to_string(),
2920            vec![10, 11, 12],
2921            1234,
2922            Some(5678),
2923        );
2924        let fill_accounting = hypercall_types::FillAccounting {
2925            trade_id: 42,
2926            taker_realized_pnl: Decimal::new(-125, 2),
2927            maker_realized_pnl: Decimal::new(125, 2),
2928            taker_premium_delta: Decimal::new(-7, 2),
2929            maker_premium_delta: Decimal::new(7, 2),
2930            taker_net_cash_delta: Decimal::new(-132, 2),
2931            maker_net_cash_delta: Decimal::new(132, 2),
2932        };
2933        let fill_side_effect = JournalFillSideEffect::from_fill_accounting(0, &fill_accounting);
2934        let cash_withdrawal_side_effect = JournalCashWithdrawalSideEffect {
2935            wallet: destination,
2936            request_id: "cash-withdrawal-1".to_string(),
2937            amount: Decimal::new(2500, 2),
2938            balance_after: Decimal::new(7500, 2),
2939            timestamp_ms: 9999,
2940        };
2941        let balance_update = hypercall_types::BalanceUpdate {
2942            balance_update_seq: 7,
2943            wallet,
2944            delta: Decimal::new(2500, 2),
2945            balance_after: Decimal::new(7500, 2),
2946            reason: hypercall_types::BalanceUpdateReason::Deposit,
2947            reference_id: Some("deposit-1".to_string()),
2948            source_command_id: None,
2949            timestamp_ms: 9999,
2950        };
2951        let entry = JournalEntry {
2952            request_uuid,
2953            received_ts_ms: 1000,
2954            command_data: vec![1, 2, 3],
2955            response_data: Some(vec![4, 5]),
2956            order_id: Some(9),
2957            pre_digest: EngineStateDigest {
2958                next_order_id: 10,
2959                next_trade_id: 20,
2960                l2_seq: 30,
2961                symbols: Default::default(),
2962            },
2963            post_digest: EngineStateDigest {
2964                next_order_id: 11,
2965                next_trade_id: 21,
2966                l2_seq: 31,
2967                symbols: Default::default(),
2968            },
2969            duration_ms: 6,
2970            events: vec![EventPayload {
2971                event_topic: "order-updates".to_string(),
2972                event_key: Some("order-9".to_string()),
2973                event_data: vec![0, 1, 2],
2974                l2_sequence: Some(44),
2975                event_type_enum: hypercall_db_diesel::engine_enums::EventType::OrderUpdate,
2976            }],
2977            outbox_appends: vec![outbox_append.clone()],
2978            fill_side_effects: vec![fill_side_effect.clone()],
2979            cash_withdrawal_side_effect: Some(cash_withdrawal_side_effect.clone()),
2980            balance_updates: vec![balance_update.clone()],
2981            created_at: Instant::now(),
2982            commit_ack: None,
2983            command_type_enum: Some(hypercall_db_diesel::engine_enums::CommandType::CreateOrder),
2984            #[cfg(feature = "rsm-state")]
2985            command_identity_hash: [5u8; 32],
2986            #[cfg(feature = "rsm-state")]
2987            rsm_state_digest: Some(crate::rsm::engine_snapshot::EngineStateDigest::empty()),
2988        };
2989
2990        let projected = engine_journal_entry_insert(&entry);
2991
2992        assert_eq!(projected.request_uuid, request_uuid.0);
2993        assert_eq!(projected.received_ts_ms, entry.received_ts_ms);
2994        assert_eq!(projected.command_data, entry.command_data);
2995        assert_eq!(projected.response_data, entry.response_data);
2996        assert_eq!(projected.order_id, entry.order_id);
2997        assert_eq!(projected.command_type.as_deref(), Some("CreateOrder"));
2998        assert_eq!(projected.duration_ms, entry.duration_ms);
2999        assert!(!projected.pre_digest_data.is_empty());
3000        assert!(!projected.post_digest_data.is_empty());
3001        assert_ne!(projected.pre_digest_data, projected.post_digest_data);
3002
3003        assert_eq!(projected.events.len(), 1);
3004        assert_eq!(projected.events[0].event_topic, "order-updates");
3005        assert_eq!(projected.events[0].event_key.as_deref(), Some("order-9"));
3006        assert_eq!(projected.events[0].event_data, vec![0, 1, 2]);
3007        assert_eq!(projected.events[0].l2_sequence, Some(44));
3008        assert_eq!(
3009            projected.events[0].event_type,
3010            hypercall_db::EventType::OrderUpdate
3011        );
3012
3013        assert_eq!(projected.outbox_appends.len(), 1);
3014        assert_eq!(
3015            projected.outbox_appends[0].directive_id,
3016            outbox_append.directive_id
3017        );
3018        assert_eq!(projected.outbox_appends[0].nonce, outbox_append.nonce);
3019        assert_eq!(
3020            projected.outbox_appends[0].action_key,
3021            hypercall_types::directives::ActionKey::SystemWithdrawToken
3022        );
3023
3024        assert_eq!(projected.fill_side_effects.len(), 1);
3025        assert_eq!(
3026            projected.fill_side_effects[0].event_idx,
3027            fill_side_effect.event_idx
3028        );
3029        assert_eq!(
3030            projected.fill_side_effects[0].trade_id,
3031            fill_side_effect.trade_id
3032        );
3033        assert_eq!(
3034            projected.fill_side_effects[0].taker_ledger_delta,
3035            fill_side_effect.taker_ledger_delta
3036        );
3037        assert_eq!(
3038            projected.fill_side_effects[0].maker_premium_delta,
3039            fill_side_effect.maker_premium_delta
3040        );
3041
3042        let projected_cash = projected
3043            .cash_withdrawal_side_effect
3044            .expect("cash withdrawal side effect should be projected");
3045        assert_eq!(projected_cash.wallet, cash_withdrawal_side_effect.wallet);
3046        assert_eq!(
3047            projected_cash.request_id,
3048            cash_withdrawal_side_effect.request_id
3049        );
3050        assert_eq!(projected_cash.amount, cash_withdrawal_side_effect.amount);
3051        assert_eq!(
3052            projected_cash.balance_after,
3053            cash_withdrawal_side_effect.balance_after
3054        );
3055        assert_eq!(
3056            projected_cash.timestamp_ms,
3057            cash_withdrawal_side_effect.timestamp_ms
3058        );
3059
3060        assert_eq!(projected.balance_updates.len(), 1);
3061        assert_eq!(projected.balance_updates[0].update, balance_update);
3062    }
3063
3064    #[test]
3065    fn wal_roundtrip_single_record() {
3066        let dir = tempfile::tempdir().expect("create tempdir");
3067        let wal_path = dir.path().join("journal.wal");
3068
3069        let mut wal = WalState::open_at(wal_path).expect("open wal");
3070        let request_uuid = hypercall_db_diesel::engine_enums::DbUuid(uuid::Uuid::new_v4());
3071        let balance_update = hypercall_types::BalanceUpdate {
3072            balance_update_seq: 3,
3073            wallet: hypercall_types::WalletAddress::from([9u8; 20]),
3074            delta: Decimal::new(-125, 2),
3075            balance_after: Decimal::new(875, 2),
3076            reason: hypercall_types::BalanceUpdateReason::Withdrawal,
3077            reference_id: Some("withdrawal-1".to_string()),
3078            source_command_id: None,
3079            timestamp_ms: 321,
3080        };
3081
3082        let entry = JournalEntry {
3083            request_uuid,
3084            received_ts_ms: 123,
3085            command_data: vec![1, 2, 3],
3086            response_data: Some(vec![4, 5, 6]),
3087            order_id: Some(42),
3088            pre_digest: EngineStateDigest::default(),
3089            post_digest: EngineStateDigest::default(),
3090            duration_ms: 7,
3091            events: vec![EventPayload {
3092                event_type_enum: hypercall_db_diesel::engine_enums::EventType::OrderUpdate,
3093                event_topic: "order-updates".to_string(),
3094                event_key: Some("k".to_string()),
3095                event_data: vec![1, 2, 3],
3096                l2_sequence: Some(1),
3097            }],
3098            outbox_appends: vec![],
3099            fill_side_effects: vec![],
3100            cash_withdrawal_side_effect: None,
3101            balance_updates: vec![balance_update.clone()],
3102            created_at: Instant::now(),
3103            commit_ack: None,
3104            command_type_enum: Some(hypercall_db_diesel::engine_enums::CommandType::CreateOrder),
3105            #[cfg(feature = "rsm-state")]
3106            command_identity_hash: [0u8; 32],
3107            #[cfg(feature = "rsm-state")]
3108            rsm_state_digest: Some(crate::rsm::engine_snapshot::EngineStateDigest::empty()),
3109        };
3110
3111        let offsets = wal.append_batch(&[entry]).expect("append wal batch");
3112        assert_eq!(offsets.len(), 1);
3113
3114        write_checkpoint(
3115            &wal.checkpoint_path,
3116            WalCheckpointMetadata {
3117                wal_offset: 0,
3118                last_command_id: 0,
3119                last_l2_seq: 0,
3120            },
3121        )
3122        .expect("write checkpoint");
3123        let replayed = WalState::read_unreplicated_records(&wal.wal_path, &wal.checkpoint_path)
3124            .expect("replay");
3125        assert_eq!(replayed.len(), 1);
3126        assert_eq!(replayed[0].record.request_uuid, request_uuid);
3127        assert_eq!(
3128            replayed[0].record.balance_updates,
3129            vec![balance_update.clone()]
3130        );
3131        assert_eq!(replayed[0].to_db_entry().balance_updates[0], balance_update);
3132    }
3133
3134    #[test]
3135    fn wal_checkpoint_prevents_double_replay() {
3136        let dir = tempfile::tempdir().expect("create tempdir");
3137        let wal_path = dir.path().join("journal.wal");
3138
3139        let mut wal = WalState::open_at(wal_path).expect("open wal");
3140
3141        // Write 3 records
3142        let entries: Vec<JournalEntry> = (0..3)
3143            .map(|i| JournalEntry {
3144                request_uuid: hypercall_db_diesel::engine_enums::DbUuid(uuid::Uuid::new_v4()),
3145                received_ts_ms: 100 + i,
3146                command_data: vec![i as u8],
3147                response_data: None,
3148                order_id: Some(i as i64),
3149                pre_digest: EngineStateDigest::default(),
3150                post_digest: EngineStateDigest::default(),
3151                duration_ms: 1,
3152                events: vec![],
3153                outbox_appends: vec![],
3154                fill_side_effects: vec![],
3155                cash_withdrawal_side_effect: None,
3156                balance_updates: Vec::new(),
3157                created_at: Instant::now(),
3158                commit_ack: None,
3159                command_type_enum: Some(
3160                    hypercall_db_diesel::engine_enums::CommandType::CreateOrder,
3161                ),
3162                #[cfg(feature = "rsm-state")]
3163                command_identity_hash: [0u8; 32],
3164                #[cfg(feature = "rsm-state")]
3165                rsm_state_digest: Some(crate::rsm::engine_snapshot::EngineStateDigest::empty()),
3166            })
3167            .collect();
3168
3169        let offsets = wal.append_batch(&entries).expect("append batch");
3170        assert_eq!(offsets.len(), 3);
3171
3172        // No checkpoint: all 3 records should be returned
3173        let replayed = WalState::read_unreplicated_records(&wal.wal_path, &wal.checkpoint_path)
3174            .expect("replay all");
3175        assert_eq!(replayed.len(), 3);
3176
3177        // Write checkpoint at end of last record
3178        let last_offset = offsets.last().unwrap().end_offset;
3179        write_checkpoint(
3180            &wal.checkpoint_path,
3181            WalCheckpointMetadata {
3182                wal_offset: last_offset,
3183                last_command_id: 0,
3184                last_l2_seq: 0,
3185            },
3186        )
3187        .expect("write checkpoint");
3188
3189        // After checkpoint: 0 records should be returned (all already replicated)
3190        let replayed = WalState::read_unreplicated_records(&wal.wal_path, &wal.checkpoint_path)
3191            .expect("replay after checkpoint");
3192        assert_eq!(replayed.len(), 0, "Checkpoint should prevent double replay");
3193
3194        // Partial checkpoint: checkpoint at end of record 1
3195        let partial_offset = offsets[0].end_offset;
3196        write_checkpoint(
3197            &wal.checkpoint_path,
3198            WalCheckpointMetadata {
3199                wal_offset: partial_offset,
3200                last_command_id: 0,
3201                last_l2_seq: 0,
3202            },
3203        )
3204        .expect("write partial checkpoint");
3205
3206        let replayed = WalState::read_unreplicated_records(&wal.wal_path, &wal.checkpoint_path)
3207            .expect("replay after partial checkpoint");
3208        assert_eq!(
3209            replayed.len(),
3210            2,
3211            "Partial checkpoint should replay only unreplicated records"
3212        );
3213    }
3214
3215    #[test]
3216    #[should_panic(expected = "CRITICAL_FAILURE: WAL record length was zero")]
3217    fn wal_replay_panics_on_zero_length_record() {
3218        let dir = tempfile::tempdir().expect("create tempdir");
3219        let wal_path = dir.path().join("journal.wal");
3220        let checkpoint_path = checkpoint_path_for_wal(&wal_path);
3221
3222        std::fs::write(&wal_path, 0u32.to_le_bytes()).expect("write malformed wal");
3223        WalState::read_unreplicated_records(&wal_path, &checkpoint_path)
3224            .expect("zero-length record must panic before returning");
3225    }
3226
3227    #[test]
3228    #[should_panic(expected = "CRITICAL_FAILURE: WAL CRC mismatch")]
3229    fn wal_replay_panics_on_crc_mismatch() {
3230        let dir = tempfile::tempdir().expect("create tempdir");
3231        let wal_path = dir.path().join("journal.wal");
3232        let checkpoint_path = checkpoint_path_for_wal(&wal_path);
3233
3234        let mut bytes = Vec::new();
3235        bytes.extend_from_slice(&1u32.to_le_bytes());
3236        bytes.extend_from_slice(&0u32.to_le_bytes());
3237        bytes.push(0xAB);
3238        std::fs::write(&wal_path, bytes).expect("write malformed wal");
3239
3240        WalState::read_unreplicated_records(&wal_path, &checkpoint_path)
3241            .expect("crc mismatch must panic before returning");
3242    }
3243
3244    #[test]
3245    fn wal_preallocation_extends_on_demand() {
3246        let dir = tempfile::tempdir().expect("create tempdir");
3247        let wal_path = dir.path().join("journal.wal");
3248
3249        let mut wal = WalState::open_at(wal_path).expect("open wal");
3250
3251        // Initially preallocated_end == next_offset == 0
3252        assert_eq!(wal.preallocated_end, 0);
3253
3254        // ensure_preallocated should attempt to extend (may be no-op on non-Linux/tmpfs)
3255        let allocated = wal.ensure_preallocated();
3256
3257        #[cfg(target_os = "linux")]
3258        {
3259            // On Linux with a real filesystem, fallocate should succeed.
3260            // On tmpfs (CI), it may fail gracefully and return 0.
3261            // Either way, no panic.
3262            let _ = allocated;
3263        }
3264
3265        #[cfg(not(target_os = "linux"))]
3266        {
3267            assert_eq!(allocated, 0, "Non-Linux should no-op");
3268        }
3269    }
3270
    /// End-to-end check of the WAL writer thread: spawn it, submit one batch,
    /// wait for the completion and ACK signals, confirm the batch was forwarded
    /// on the replication channel, then send `Shutdown` and join the thread.
    ///
    /// The test is compiled only when the `rsm-state` feature is disabled, so
    /// the `#[cfg(feature = "rsm-state")]` items inside the body are never
    /// compiled as part of this test.
    #[cfg(not(feature = "rsm-state"))]
    #[test]
    fn wal_writer_thread_shutdown() {
        let dir = tempfile::tempdir().expect("create tempdir");
        let wal_path = dir.path().join("journal.wal");

        let wal_state = WalState::open_at(wal_path).expect("open wal");
        // Bounded std channel feeds the writer thread; the tokio channel carries
        // what the writer forwards for replication.
        let (wal_writer_tx, wal_writer_rx) = std::sync::mpsc::sync_channel(16);
        let (replication_tx, mut replication_rx) = mpsc::channel(16);

        let handle = std::thread::Builder::new()
            .name("test-wal-writer".to_string())
            .spawn(move || {
                wal_writer_thread_main(
                    wal_state,
                    wal_writer_rx,
                    replication_tx,
                    #[cfg(feature = "rsm-state")]
                    test_rsm_writer_state(),
                );
            })
            .expect("spawn test wal-writer thread");

        // Build the single entry that makes up the batch.
        let entry = JournalEntry {
            request_uuid: hypercall_db_diesel::engine_enums::DbUuid(uuid::Uuid::new_v4()),
            received_ts_ms: 1,
            command_data: vec![1],
            response_data: None,
            order_id: None,
            pre_digest: EngineStateDigest::default(),
            post_digest: EngineStateDigest::default(),
            duration_ms: 0,
            events: vec![],
            outbox_appends: vec![],
            fill_side_effects: vec![],
            cash_withdrawal_side_effect: None,
            balance_updates: Vec::new(),
            created_at: Instant::now(),
            commit_ack: None,
            command_type_enum: Some(hypercall_db_diesel::engine_enums::CommandType::CreateOrder),
            #[cfg(feature = "rsm-state")]
            command_identity_hash: [0u8; 32],
            #[cfg(feature = "rsm-state")]
            rsm_state_digest: Some(crate::rsm::engine_snapshot::EngineStateDigest::empty()),
        };

        // One oneshot for the per-entry ACK, one for whole-batch completion.
        let (ack_tx, ack_rx) = oneshot::channel();
        let (complete_tx, complete_rx) = oneshot::channel();

        wal_writer_tx
            .send(WalWriterMessage::WriteBatch {
                entries: vec![entry],
                ack_senders: vec![Some(ack_tx)],
                completion: Some(complete_tx),
            })
            .expect("send to wal-writer");

        // Block until the writer reports the batch as complete.
        complete_rx.blocking_recv().expect("completion signal");
        // The per-entry ACK must have been sent as well.
        ack_rx.blocking_recv().expect("ack signal");

        // The writer must have forwarded exactly one record for replication.
        match replication_rx.blocking_recv() {
            Some(ReplicationMessage::Entries(records)) => {
                assert_eq!(records.len(), 1);
            }
            // The failure message only reports whether any message arrived
            // (`is_some()`), not which one.
            other => panic!("Expected Entries, got {:?}", other.is_some()),
        }

        // Ask the writer to stop and wait for the thread to exit cleanly.
        wal_writer_tx
            .send(WalWriterMessage::Shutdown)
            .expect("send shutdown");
        handle.join().expect("wal-writer thread join");
    }
3348
3349    // ---------------------------------------------------------------
3350    // WAL streaming recovery tests
3351    //
3352    // These exercise the WAL binary format (4-byte LE length, 4-byte LE CRC,
3353    // msgpack payload) via `read_unreplicated_records` which uses identical
3354    // parsing logic to `recover_wal_sync`.  The streaming variant additionally
3355    // batches records and writes them to PostgreSQL, but the I/O parsing is
3356    // shared.
3357    // ---------------------------------------------------------------
3358
3359    /// Helper: build a valid WAL record byte sequence for a single JournalEntry.
3360    fn wal_bytes_for_entry(entry: &JournalEntry) -> Vec<u8> {
3361        let record = WalJournalRecord::from_entry(entry);
3362        let payload = rmp_serde::to_vec_named(&record).expect("serialize wal record");
3363        let payload_len = payload.len() as u32;
3364        let crc = WAL_CRC.checksum(&payload);
3365
3366        let mut buf = Vec::with_capacity(8 + payload.len());
3367        buf.extend_from_slice(&payload_len.to_le_bytes());
3368        buf.extend_from_slice(&crc.to_le_bytes());
3369        buf.extend_from_slice(&payload);
3370        buf
3371    }
3372
3373    /// Helper: build a minimal JournalEntry with a given index for differentiation.
3374    fn make_test_entry(index: u8) -> JournalEntry {
3375        JournalEntry {
3376            request_uuid: hypercall_db_diesel::engine_enums::DbUuid(uuid::Uuid::new_v4()),
3377            received_ts_ms: 1000 + index as u64,
3378            command_data: vec![index],
3379            response_data: None,
3380            order_id: Some(index as i64),
3381            pre_digest: EngineStateDigest::default(),
3382            post_digest: EngineStateDigest::default(),
3383            duration_ms: 1,
3384            events: vec![],
3385            outbox_appends: vec![],
3386            fill_side_effects: vec![],
3387            cash_withdrawal_side_effect: None,
3388            balance_updates: Vec::new(),
3389            created_at: Instant::now(),
3390            commit_ack: None,
3391            command_type_enum: Some(hypercall_db_diesel::engine_enums::CommandType::CreateOrder),
3392            #[cfg(feature = "rsm-state")]
3393            command_identity_hash: [0u8; 32],
3394            #[cfg(feature = "rsm-state")]
3395            rsm_state_digest: Some(crate::rsm::engine_snapshot::EngineStateDigest::empty()),
3396        }
3397    }
3398
3399    #[cfg(feature = "rsm-state")]
3400    fn make_test_event(index: u8) -> EventPayload {
3401        EventPayload {
3402            event_topic: "test.topic".to_string(),
3403            event_key: Some(format!("key-{index}")),
3404            event_data: vec![index, index.wrapping_add(1)],
3405            l2_sequence: Some(index as i64),
3406            event_type_enum: hypercall_db_diesel::engine_enums::EventType::OrderUpdate,
3407        }
3408    }
3409
3410    #[cfg(feature = "rsm-state")]
3411    fn rsm_test_roots(entries: &[JournalEntry], identity_hashes: &[[u8; 32]]) -> RsmBatchRoots {
3412        let dir = tempfile::tempdir().expect("create tempdir");
3413        let store = crate::rsm::commitment_store::ValidatorRsmCommitmentStore::open(
3414            ValidatorRsmCommitmentStoreConfig {
3415                root_dir: dir.path().join("state"),
3416                prune_window: 100,
3417                pruning_enabled: false,
3418            },
3419        )
3420        .expect("open commitment store");
3421        let mut runtime = store.open_pipeline(0).expect("open pipeline");
3422        let delta = rsm_state_delta_from_entries(entries, identity_hashes, 0)
3423            .expect("materialize test delta");
3424        runtime.commit(&delta).expect("commit test delta").into()
3425    }
3426
3427    #[cfg(feature = "rsm-state")]
3428    fn make_test_rsm_entry(index: u8) -> JournalEntry {
3429        let mut entry = make_test_entry(index);
3430        entry.command_type_enum =
3431            Some(hypercall_db_diesel::engine_enums::CommandType::TickSnapshot);
3432        entry
3433    }
3434
3435    #[cfg(feature = "rsm-state")]
3436    fn make_test_rsm_block_records(
3437        batch_seq: u64,
3438        prev_block_hash: [u8; 32],
3439        first_seq: u64,
3440        entries: &[JournalEntry],
3441    ) -> Vec<WalRecordWithOffset> {
3442        let identity_hashes = engine_command_identity_hashes(entries);
3443        let roots = rsm_test_roots(entries, &identity_hashes);
3444        let header = WalBlockRecord::unsigned_from_identity_hashes(
3445            batch_seq,
3446            prev_block_hash,
3447            &identity_hashes,
3448            roots.batch_root,
3449            first_seq,
3450            1_700_000_000_000 + batch_seq,
3451            [0u8; 20],
3452        );
3453
3454        entries
3455            .iter()
3456            .enumerate()
3457            .map(|(index, entry)| WalRecordWithOffset {
3458                record: WalJournalRecord::from_entry(entry),
3459                end_offset: 100 + batch_seq,
3460                block_header: (index == 0).then_some(header.clone()),
3461                block_roots: (index == 0).then_some(roots),
3462            })
3463            .collect()
3464    }
3465
3466    #[test]
3467    fn wal_recovery_empty_wal_file() {
3468        // An empty WAL file (0 bytes) should recover zero records.
3469        let dir = tempfile::tempdir().expect("create tempdir");
3470        let wal_path = dir.path().join("journal.wal");
3471        let checkpoint_path = checkpoint_path_for_wal(&wal_path);
3472
3473        // Create empty WAL file.
3474        std::fs::write(&wal_path, b"").expect("write empty wal");
3475
3476        // No checkpoint file => defaults to offset 0.
3477        let records = WalState::read_unreplicated_records(&wal_path, &checkpoint_path)
3478            .expect("should recover empty wal");
3479        assert!(records.is_empty(), "empty WAL should yield zero records");
3480    }
3481
3482    #[test]
3483    fn replication_chunk_extends_to_keep_physical_wal_block_atomic() {
3484        let pending: Vec<WalRecordWithOffset> = [(0, 100), (1, 100), (2, 100), (3, 200), (4, 200)]
3485            .into_iter()
3486            .map(|(index, end_offset)| WalRecordWithOffset {
3487                record: WalJournalRecord::from_entry(&make_test_entry(index)),
3488                end_offset,
3489                #[cfg(feature = "rsm-state")]
3490                block_header: None,
3491                #[cfg(feature = "rsm-state")]
3492                block_roots: None,
3493            })
3494            .collect();
3495
3496        assert_eq!(next_replication_chunk_len(&pending, 2), 3);
3497        assert_eq!(next_replication_chunk_len(&pending[3..], 1), 2);
3498    }
3499
3500    #[test]
3501    fn wal_replay_defers_next_physical_record_to_cap_command_batch() {
3502        assert!(!should_defer_replay_record(0, 100, 10));
3503        assert!(!should_defer_replay_record(5, 5, 10));
3504        assert!(should_defer_replay_record(5, 6, 10));
3505        assert!(should_defer_replay_record(1, 1, 0));
3506    }
3507
3508    #[cfg(feature = "rsm-state")]
3509    #[test]
3510    fn block_commitment_excludes_non_engine_journal_entries() {
3511        let mut engine_entry = make_test_rsm_entry(1);
3512        engine_entry.command_identity_hash = [1u8; 32];
3513        engine_entry.events.push(make_test_event(1));
3514
3515        let mut passthrough_entry = make_test_entry(2);
3516        passthrough_entry.command_type_enum = None;
3517        passthrough_entry.command_identity_hash = [0u8; 32];
3518        passthrough_entry.events.push(make_test_event(2));
3519
3520        let hashes = engine_command_identity_hashes(&[engine_entry, passthrough_entry]);
3521
3522        assert_eq!(hashes, vec![[1u8; 32]]);
3523        assert_ne!(
3524            hypercall_blocks::compute_commands_hash(&hashes),
3525            hypercall_blocks::compute_commands_hash(&[[1u8; 32], [0u8; 32]])
3526        );
3527        let mut engine_only_entry = make_test_rsm_entry(1);
3528        engine_only_entry.command_identity_hash = [1u8; 32];
3529        engine_only_entry.events.push(make_test_event(1));
3530
3531        let mut mixed_engine_entry = make_test_rsm_entry(1);
3532        mixed_engine_entry.command_identity_hash = [1u8; 32];
3533        mixed_engine_entry.events.push(make_test_event(1));
3534
3535        let mut mixed_passthrough_entry = make_test_entry(2);
3536        mixed_passthrough_entry.command_type_enum = None;
3537        mixed_passthrough_entry.command_identity_hash = [0u8; 32];
3538        mixed_passthrough_entry.events.push(make_test_event(2));
3539
3540        assert_eq!(
3541            rsm_test_roots(&[engine_only_entry], &hashes).obligation_mmr_root,
3542            rsm_test_roots(&[mixed_engine_entry, mixed_passthrough_entry], &hashes)
3543                .obligation_mmr_root
3544        );
3545    }
3546
3547    #[cfg(feature = "rsm-state")]
3548    #[test]
3549    fn block_commitment_excludes_unsupported_rsm_commands() {
3550        let mut supported_entry = make_test_rsm_entry(1);
3551        supported_entry.command_identity_hash = [1u8; 32];
3552        supported_entry.events.push(make_test_event(1));
3553
3554        let mut agent_auth_entry = make_test_rsm_entry(2);
3555        agent_auth_entry.command_type_enum =
3556            Some(hypercall_db_diesel::engine_enums::CommandType::ApproveAgent);
3557        agent_auth_entry.command_identity_hash = [2u8; 32];
3558
3559        let hashes = engine_command_identity_hashes(&[supported_entry, agent_auth_entry]);
3560
3561        assert_eq!(hashes, vec![[1u8; 32]]);
3562    }
3563
3564    #[cfg(feature = "rsm-state")]
3565    #[test]
3566    fn rsm_batch_roots_are_deterministic_and_nonzero() {
3567        let mut entry = make_test_rsm_entry(1);
3568        entry.command_identity_hash = [9u8; 32];
3569        entry.events.push(make_test_event(1));
3570
3571        let roots = rsm_test_roots(&[entry], &[[9u8; 32]]);
3572        let mut same_entry = make_test_rsm_entry(1);
3573        same_entry.command_identity_hash = [9u8; 32];
3574        same_entry.events.push(make_test_event(1));
3575
3576        assert_ne!(roots.state_root, [0u8; 32]);
3577        assert_ne!(roots.command_mmr_root, [0u8; 32]);
3578        assert_ne!(roots.obligation_mmr_root, [0u8; 32]);
3579        assert_ne!(roots.intent_mmr_root, [0u8; 32]);
3580        assert_ne!(roots.batch_root, [0u8; 32]);
3581        assert_eq!(
3582            roots.batch_root,
3583            rsm_test_roots(&[same_entry], &[[9u8; 32]]).batch_root
3584        );
3585    }
3586
    /// Checks that each root reacts to its own input. Against a baseline
    /// one-entry batch, the state root changes when the third delta argument is
    /// 7 instead of 0, the intent MMR root changes when the identity-hash list
    /// differs, and the obligation MMR root changes when the event payload
    /// differs.
    #[cfg(feature = "rsm-state")]
    #[test]
    fn rsm_batch_roots_change_with_state_intents_and_obligations() {
        // Baseline: one TickSnapshot entry with one event, identity hash [9; 32].
        let mut entry = make_test_rsm_entry(1);
        entry.command_identity_hash = [9u8; 32];
        entry.events.push(make_test_event(1));
        let base = rsm_test_roots(&[entry], &[[9u8; 32]]);

        // Same entry contents, but the delta is materialized with 7 as the third
        // argument (the `rsm_test_roots` helper always passes 0), then committed
        // into its own fresh store.
        // NOTE(review): the meaning of that third argument is not visible here.
        let mut changed_state = make_test_rsm_entry(1);
        changed_state.command_identity_hash = [9u8; 32];
        changed_state.events.push(make_test_event(1));
        assert_ne!(
            base.state_root,
            rsm_state_delta_from_entries(&[changed_state], &[[9u8; 32]], 7)
                .and_then(|delta| {
                    let dir = tempfile::tempdir().unwrap();
                    let store = crate::rsm::commitment_store::ValidatorRsmCommitmentStore::open(
                        ValidatorRsmCommitmentStoreConfig {
                            root_dir: dir.path().join("state"),
                            prune_window: 100,
                            pruning_enabled: false,
                        },
                    )?;
                    Ok(store.open_pipeline(0)?.commit(&delta)?.state_root.0)
                })
                .unwrap()
        );

        // Identical entry, but the identity-hash list passed alongside it is
        // [8; 32] instead of [9; 32].
        let mut same_entry = make_test_rsm_entry(1);
        same_entry.command_identity_hash = [9u8; 32];
        same_entry.events.push(make_test_event(1));
        assert_ne!(
            base.intent_mmr_root,
            rsm_test_roots(&[same_entry], &[[8u8; 32]]).intent_mmr_root
        );

        // Identical entry except for one extra byte appended to the event data.
        let mut changed_obligation = make_test_rsm_entry(1);
        changed_obligation.command_identity_hash = [9u8; 32];
        changed_obligation.events.push(make_test_event(1));
        changed_obligation.events[0].event_data.push(42);
        assert_ne!(
            base.obligation_mmr_root,
            rsm_test_roots(&[changed_obligation], &[[9u8; 32]]).obligation_mmr_root
        );
    }
3632
3633    #[cfg(feature = "rsm-state")]
3634    #[test]
3635    fn rsm_batch_roots_from_wal_tolerates_legacy_records_without_state_digest() {
3636        let mut entry = make_test_entry(1);
3637        entry.command_identity_hash = [1u8; 32];
3638        entry.rsm_state_digest = None;
3639        let record = WalRecordWithOffset {
3640            record: WalJournalRecord::from_entry(&entry),
3641            end_offset: 100,
3642            block_header: None,
3643            block_roots: None,
3644        };
3645        assert!(rsm_batch_roots_from_wal(&[record]).is_none());
3646    }
3647
3648    #[test]
3649    fn wal_recovery_nonexistent_wal_file() {
3650        // If the WAL file doesn't exist, recovery is a no-op.
3651        let dir = tempfile::tempdir().expect("create tempdir");
3652        let wal_path = dir.path().join("nonexistent.wal");
3653        let checkpoint_path = checkpoint_path_for_wal(&wal_path);
3654
3655        let records = WalState::read_unreplicated_records(&wal_path, &checkpoint_path)
3656            .expect("missing WAL should be no-op");
3657        assert!(records.is_empty());
3658    }
3659
#[test]
fn wal_recovery_small_wal_few_records() {
    // Append five entries through `WalState`, read them back from a zero
    // checkpoint, and check that the interesting fields survive the roundtrip.
    let tmp = tempfile::tempdir().expect("create tempdir");
    let mut wal = WalState::open_at(tmp.path().join("journal.wal")).expect("open wal");

    let entries: Vec<JournalEntry> = (0..5).map(make_test_entry).collect();
    let expected_uuids: Vec<_> = entries.iter().map(|entry| entry.request_uuid).collect();

    wal.append_batch(&entries).expect("append batch");

    // A checkpoint at offset 0 marks the whole WAL as unreplicated.
    let zero_checkpoint = WalCheckpointMetadata {
        wal_offset: 0,
        last_command_id: 0,
        last_l2_seq: 0,
    };
    write_checkpoint(&wal.checkpoint_path, zero_checkpoint).expect("write checkpoint");

    let recovered = WalState::read_unreplicated_records(&wal.wal_path, &wal.checkpoint_path)
        .expect("read records");
    assert_eq!(recovered.len(), 5);

    for (i, (rec, expected_uuid)) in recovered.iter().zip(&expected_uuids).enumerate() {
        assert_eq!(
            rec.record.request_uuid, *expected_uuid,
            "request_uuid mismatch at index {}",
            i
        );
        assert_eq!(rec.record.command_data, vec![i as u8]);
        assert_eq!(rec.record.received_ts_ms, 1000 + i as u64);
        assert_eq!(rec.record.order_id, Some(i as i64));
    }

    // Each record must end strictly after its predecessor.
    for pair in recovered.windows(2) {
        assert!(
            pair[1].end_offset > pair[0].end_offset,
            "offsets must increase monotonically"
        );
    }
}
3708
#[test]
fn wal_recovery_resumes_from_checkpoint() {
    // Five records are written; the checkpoint is placed at the end of the
    // second one, so recovery must hand back exactly the last three.
    let tmp = tempfile::tempdir().expect("create tempdir");
    let mut wal = WalState::open_at(tmp.path().join("journal.wal")).expect("open wal");

    let entries: Vec<JournalEntry> = (0..5).map(make_test_entry).collect();
    let expected_uuids: Vec<_> = entries.iter().map(|entry| entry.request_uuid).collect();

    let offsets = wal.append_batch(&entries).expect("append batch");
    assert_eq!(offsets.len(), 5);

    // End of record index 1 == two records already replicated.
    let replicated_up_to = WalCheckpointMetadata {
        wal_offset: offsets[1].end_offset,
        last_command_id: 0,
        last_l2_seq: 0,
    };
    write_checkpoint(&wal.checkpoint_path, replicated_up_to).expect("write checkpoint");

    let recovered = WalState::read_unreplicated_records(&wal.wal_path, &wal.checkpoint_path)
        .expect("read records after checkpoint");
    assert_eq!(recovered.len(), 3, "should recover only records 2,3,4");

    // Recovered record `i` corresponds to original entry `i + 2`.
    for (i, (rec, expected_uuid)) in recovered.iter().zip(&expected_uuids[2..]).enumerate() {
        assert_eq!(
            rec.record.request_uuid, *expected_uuid,
            "request_uuid mismatch at recovered index {}",
            i
        );
    }
}
3750
#[test]
fn wal_recovery_truncated_after_length_prefix() {
    // Crash scenario: the WAL tail holds nothing but a 4-byte length prefix
    // (no CRC, no payload). The complete record in front of it must still be
    // returned, ending exactly where the intact bytes end.
    let tmp = tempfile::tempdir().expect("create tempdir");
    let wal_path = tmp.path().join("journal.wal");
    let checkpoint_path = checkpoint_path_for_wal(&wal_path);

    // One intact record ...
    let mut torn_wal = wal_bytes_for_entry(&make_test_entry(0));
    let intact_len = torn_wal.len();
    // ... followed by a dangling length prefix.
    torn_wal.extend_from_slice(&42u32.to_le_bytes());

    std::fs::write(&wal_path, &torn_wal).expect("write wal with truncated tail");

    // No checkpoint file exists, so the read starts at offset 0.
    let recovered = WalState::read_unreplicated_records(&wal_path, &checkpoint_path)
        .expect("should recover despite truncated tail");

    assert_eq!(recovered.len(), 1, "should recover the one complete record");
    assert_eq!(recovered[0].end_offset, intact_len as u64);
}
3776
#[test]
fn wal_recovery_truncated_after_crc() {
    // Crash scenario: length and CRC of a third record made it to disk but
    // its payload is cut short. The two complete records must be returned.
    let tmp = tempfile::tempdir().expect("create tempdir");
    let wal_path = tmp.path().join("journal.wal");
    let checkpoint_path = checkpoint_path_for_wal(&wal_path);

    // Two intact records back to back.
    let mut torn_wal: Vec<u8> = (0..2)
        .map(make_test_entry)
        .flat_map(|entry| wal_bytes_for_entry(&entry))
        .collect();

    // Third record: declares a 100-byte payload but supplies a single byte.
    torn_wal.extend_from_slice(&100u32.to_le_bytes()); // declared length
    torn_wal.extend_from_slice(&0u32.to_le_bytes()); // CRC value is irrelevant here
    torn_wal.push(0xFF); // 1 of the 100 promised payload bytes

    std::fs::write(&wal_path, &torn_wal).expect("write wal with truncated payload");

    let recovered = WalState::read_unreplicated_records(&wal_path, &checkpoint_path)
        .expect("should recover despite truncated payload");

    assert_eq!(
        recovered.len(),
        2,
        "should recover both complete records before the truncation"
    );
}
3808
#[test]
fn wal_recovery_truncated_mid_length_prefix() {
    // Crash scenario: only half (2 of 4 bytes) of the next record's length
    // prefix reached disk. Recovery must still return the complete record.
    let tmp = tempfile::tempdir().expect("create tempdir");
    let wal_path = tmp.path().join("journal.wal");
    let checkpoint_path = checkpoint_path_for_wal(&wal_path);

    let mut torn_wal = wal_bytes_for_entry(&make_test_entry(0));
    // Two stray bytes: an incomplete length prefix.
    torn_wal.extend_from_slice(&[0x01, 0x00]);

    std::fs::write(&wal_path, &torn_wal).expect("write wal with partial length prefix");

    let recovered = WalState::read_unreplicated_records(&wal_path, &checkpoint_path)
        .expect("should recover despite partial length prefix");

    assert_eq!(recovered.len(), 1, "should recover the one complete record");
}
3829
#[test]
fn wal_recovery_corrupt_giant_payload_len() {
    // A corrupted length prefix declaring a 2 GB payload is followed by only
    // 16 bytes of data. Recovery must return the preceding complete record
    // instead of trying to honour the declared length.
    let tmp = tempfile::tempdir().expect("create tempdir");
    let wal_path = tmp.path().join("journal.wal");
    let checkpoint_path = checkpoint_path_for_wal(&wal_path);

    // Start from one well-formed record.
    let mut corrupt_wal = wal_bytes_for_entry(&make_test_entry(0));
    let intact_len = corrupt_wal.len();

    // Bogus frame: 2 GB declared, 16 junk bytes actually present.
    corrupt_wal.extend_from_slice(&0x8000_0000u32.to_le_bytes()); // 2 GB
    corrupt_wal.extend_from_slice(&0u32.to_le_bytes()); // CRC placeholder
    corrupt_wal.extend_from_slice(&[0xAB; 16]); // 16 bytes of junk

    std::fs::write(&wal_path, &corrupt_wal).expect("write wal with corrupt length");

    let recovered = WalState::read_unreplicated_records(&wal_path, &checkpoint_path)
        .expect("should recover despite corrupt length prefix");

    assert_eq!(
        recovered.len(),
        1,
        "should recover the one complete record before the corrupt length"
    );
    assert_eq!(recovered[0].end_offset, intact_len as u64);
}
3861
#[test]
fn wal_recovery_payload_len_exactly_exceeds_remaining() {
    // Boundary case for the payload_len guard: the trailing frame declares a
    // payload that is exactly ONE byte longer than what is left in the file.
    //
    // The trailing frame is laid out as length prefix (u32 LE), CRC (u32 LE),
    // then payload, the same layout the other truncation tests in this module
    // write by hand. Previously this test wrote only the length prefix, which
    // duplicated `wal_recovery_truncated_after_length_prefix` and never
    // exercised the off-by-one boundary its name promises.
    const CLAIMED_PAYLOAD_LEN: u32 = 200;

    let dir = tempfile::tempdir().expect("create tempdir");
    let wal_path = dir.path().join("journal.wal");
    let checkpoint_path = checkpoint_path_for_wal(&wal_path);

    // One complete record that recovery must keep.
    let entry = make_test_entry(0);
    let mut wal_data = wal_bytes_for_entry(&entry);
    let complete_len = wal_data.len();

    // Trailing frame: declares CLAIMED_PAYLOAD_LEN bytes but supplies one fewer.
    wal_data.extend_from_slice(&CLAIMED_PAYLOAD_LEN.to_le_bytes());
    wal_data.extend_from_slice(&0u32.to_le_bytes()); // CRC placeholder
    wal_data.extend_from_slice(&[0xCD; CLAIMED_PAYLOAD_LEN as usize - 1]);

    std::fs::write(&wal_path, &wal_data).expect("write wal");

    let records = WalState::read_unreplicated_records(&wal_path, &checkpoint_path)
        .expect("should recover despite oversized payload_len");
    assert_eq!(records.len(), 1);
    // The recovered record must end where the intact bytes end, i.e. nothing
    // from the short frame is counted.
    assert_eq!(records[0].end_offset, complete_len as u64);
}
3884
#[test]
fn wal_recovery_manual_bytes_match_wal_state_writer() {
    // Guard for the truncation tests: the bytes produced by the test helper
    // `wal_bytes_for_entry` must be identical to what `WalState::append_batch`
    // puts on disk for the same entry.
    let tmp = tempfile::tempdir().expect("create tempdir");
    let wal_path = tmp.path().join("journal.wal");

    let entry = make_test_entry(42);
    let helper_bytes = wal_bytes_for_entry(&entry);

    // Same entry, written by the real writer.
    let on_disk_bytes = {
        let mut wal = WalState::open_at(wal_path.clone()).expect("open wal");
        wal.append_batch(&[entry]).expect("append batch");
        std::fs::read(&wal_path).expect("read wal file")
    };

    assert_eq!(
        helper_bytes, on_disk_bytes,
        "Manual WAL byte construction must match WalState writer output"
    );
}
3905
3906    // ---------------------------------------------------------------
3907    // WAL block entry serialization tests
3908    // ---------------------------------------------------------------
3909
#[cfg(feature = "rsm-state")]
#[test]
fn test_wal_block_entry_roundtrip() {
    // Encode a `WalBlockEntry` (header + three commands) with MessagePack,
    // decode it again, and compare every header field and the per-command
    // fields checked below.
    let original_commands: Vec<WalJournalRecord> = (0..3)
        .map(make_test_entry)
        .map(|entry| WalJournalRecord::from_entry(&entry))
        .collect();

    let original_header = WalBlockRecord {
        batch_seq: 0,
        prev_block_hash: [0u8; 32],
        commands_hash: [1u8; 32],
        batch_root: [0u8; 32],
        command_count: 3,
        first_seq: 0,
        timestamp: 1700000000000,
        signer: [0u8; 20],
        signature: vec![0u8; 65],
    };

    let encoded = rmp_serde::to_vec_named(&WalBlockEntry {
        header: original_header.clone(),
        commands: original_commands.clone(),
    })
    .expect("serialize WalBlockEntry");
    let decoded: WalBlockEntry =
        rmp_serde::from_slice(&encoded).expect("deserialize WalBlockEntry");

    // Header: field-by-field comparison.
    let decoded_header = &decoded.header;
    assert_eq!(decoded_header.batch_seq, original_header.batch_seq);
    assert_eq!(decoded_header.prev_block_hash, original_header.prev_block_hash);
    assert_eq!(decoded_header.commands_hash, original_header.commands_hash);
    assert_eq!(decoded_header.batch_root, original_header.batch_root);
    assert_eq!(decoded_header.command_count, original_header.command_count);
    assert_eq!(decoded_header.first_seq, original_header.first_seq);
    assert_eq!(decoded_header.timestamp, original_header.timestamp);
    assert_eq!(decoded_header.signer, original_header.signer);
    assert_eq!(decoded_header.signature, original_header.signature);

    // Commands: same count, same contents in the same order.
    assert_eq!(decoded.commands.len(), 3);
    for (decoded_cmd, original_cmd) in decoded.commands.iter().zip(&original_commands) {
        assert_eq!(decoded_cmd.command_data, original_cmd.command_data);
        assert_eq!(decoded_cmd.received_ts_ms, original_cmd.received_ts_ms);
        assert_eq!(decoded_cmd.order_id, original_cmd.order_id);
        assert_eq!(decoded_cmd.request_uuid, original_cmd.request_uuid);
    }
}
3963
#[cfg(feature = "rsm-state")]
#[test]
fn test_wal_block_entry_backward_compat() {
    // Bytes of a plain (old-format) `WalJournalRecord` must be rejected when
    // decoded as a `WalBlockEntry`, yet still decode as a `WalJournalRecord`.
    let original = WalJournalRecord::from_entry(&make_test_entry(42));
    let encoded = rmp_serde::to_vec_named(&original).expect("serialize WalJournalRecord");

    // The old record has neither a 'header' nor a 'commands' field.
    assert!(
        rmp_serde::from_slice::<WalBlockEntry>(&encoded).is_err(),
        "Old-format WalJournalRecord must not deserialize as WalBlockEntry"
    );

    // Decoding as the old type keeps working.
    let decoded: WalJournalRecord =
        rmp_serde::from_slice(&encoded).expect("deserialize WalJournalRecord");
    assert_eq!(decoded.request_uuid, original.request_uuid);
    assert_eq!(decoded.command_data, original.command_data);
    assert_eq!(decoded.received_ts_ms, original.received_ts_ms);
    assert_eq!(decoded.order_id, original.order_id);
}
3989
#[cfg(feature = "rsm-state")]
#[test]
#[should_panic(expected = "legacy rootless RSM WAL block")]
fn wal_recovery_rejects_legacy_rootless_rsm_block() {
    // Frame a `WalBlockEntry` (header + commands only) into a WAL file and
    // read it back; the read is expected to panic with the message named in
    // `should_panic` above.
    let tmp = tempfile::tempdir().expect("create tempdir");
    let wal_path = tmp.path().join("journal.wal");
    let checkpoint_path = checkpoint_path_for_wal(&wal_path);

    let legacy_header = WalBlockRecord {
        batch_seq: 0,
        prev_block_hash: [0u8; 32],
        commands_hash: [1u8; 32],
        batch_root: [2u8; 32],
        command_count: 1,
        first_seq: 0,
        timestamp: 1_700_000_000_000,
        signer: [0u8; 20],
        signature: vec![0u8; 65],
    };
    let legacy_block = WalBlockEntry {
        header: legacy_header,
        commands: vec![WalJournalRecord::from_entry(&make_test_entry(42))],
    };
    let payload = rmp_serde::to_vec_named(&legacy_block).expect("serialize legacy block");

    // Write exactly one frame; the file handle is closed when the scope ends.
    {
        let mut file = File::create(&wal_path).expect("create wal");
        let mut offset = 0;
        write_frame(&mut file, &payload, &mut offset, "legacy block").expect("write frame");
    }

    let _ = WalState::read_unreplicated_records(&wal_path, &checkpoint_path);
}
4020
#[cfg(feature = "rsm-state")]
#[test]
fn test_wal_block_entry_with_empty_commands() {
    // A block that carries zero commands must survive a MessagePack
    // encode/decode cycle.
    let empty_block = WalBlockEntry {
        header: WalBlockRecord {
            batch_seq: 0,
            prev_block_hash: [0u8; 32],
            commands_hash: [0u8; 32],
            batch_root: [0u8; 32],
            command_count: 0,
            first_seq: 0,
            timestamp: 1700000000000,
            signer: [0u8; 20],
            signature: vec![0u8; 65],
        },
        commands: Vec::new(),
    };

    let encoded = rmp_serde::to_vec_named(&empty_block).expect("serialize empty block");
    let decoded: WalBlockEntry =
        rmp_serde::from_slice(&encoded).expect("deserialize empty block");

    let decoded_header = &decoded.header;
    assert_eq!(decoded_header.batch_seq, 0);
    assert_eq!(decoded_header.command_count, 0);
    assert_eq!(decoded_header.timestamp, 1700000000000);
    assert!(
        decoded.commands.is_empty(),
        "Empty block must have no commands"
    );
}
4054
4055    // ---------------------------------------------------------------
4056    // Integration test: streaming WAL recovery with large unreplicated
4057    // tail does not OOM and correctly replicates all records to Postgres.
4058    //
4059    // Regression test for CALL-654 / PR #850.
4060    // ---------------------------------------------------------------
4061
4062    /// Set up a Postgres test database and return the diesel pool.
4063    ///
4064    /// Checks `TEST_DATABASE_URL` first to reuse a pre-started Postgres service
4065    /// (CI). Falls back to a testcontainer for local development.
4066    async fn setup_test_pool() -> (
4067        hypercall_db_diesel::DbPool,
4068        Option<testcontainers::ContainerAsync<testcontainers_modules::postgres::Postgres>>,
4069        crate::test_contracts::TestnetContractEnvGuard,
4070    ) {
4071        use crate::test_contracts::test_database_url_with_database;
4072        use testcontainers::runners::AsyncRunner;
4073        use testcontainers::ImageExt;
4074        use testcontainers_modules::postgres::Postgres;
4075
4076        let env_guard = crate::test_contracts::TestnetContractEnvGuard::apply();
4077
4078        let (database_url, container) = if let Ok(url) = std::env::var("TEST_DATABASE_URL") {
4079            let db_name = format!("test_wal_recovery_{}", uuid::Uuid::new_v4().simple());
4080            let admin_pool = sqlx::postgres::PgPoolOptions::new()
4081                .max_connections(1)
4082                .connect(&url)
4083                .await
4084                .expect("connect to TEST_DATABASE_URL");
4085            sqlx::query(&format!(r#"CREATE DATABASE "{}""#, db_name))
4086                .execute(&admin_pool)
4087                .await
4088                .expect("create isolated test database");
4089            admin_pool.close().await;
4090            let db_url = test_database_url_with_database(&url, &db_name);
4091            (db_url, None)
4092        } else {
4093            let container = Postgres::default()
4094                .with_db_name("test_wal_recovery")
4095                .with_user("test_user")
4096                .with_password("test_password")
4097                .with_tag("16-alpine")
4098                .start()
4099                .await
4100                .expect("start postgres container");
4101
4102            let port = container
4103                .get_host_port_ipv4(5432)
4104                .await
4105                .expect("get postgres port");
4106
4107            let url = format!(
4108                "postgresql://test_user:test_password@127.0.0.1:{}/test_wal_recovery",
4109                port
4110            );
4111
4112            // Wait for Postgres to be ready.
4113            tokio::time::sleep(tokio::time::Duration::from_secs(2)).await;
4114
4115            (url, Some(container))
4116        };
4117
4118        let _handler = crate::db_handler::DieselEventHandler::new(&database_url)
4119            .expect("create diesel handler and run migrations");
4120
4121        let auth = hypercall_db_diesel::DbAuthConfig::password(&database_url);
4122        let pool = hypercall_db_diesel::build_db_pool(&auth, 5, 30_000, 10_000)
4123            .expect("build diesel pool");
4124
4125        (pool, container, env_guard)
4126    }
4127
4128    #[tokio::test]
4129    #[serial_test::serial(testnet_env)]
4130    async fn recover_wal_sync_large_unreplicated_tail() {
4131        // Regression test for CALL-654: streaming WAL recovery must process
4132        // a large unreplicated tail in bounded-memory batches without OOM,
4133        // replicate every record to PostgreSQL, and advance the checkpoint
4134        // to the end of the WAL.
4135
4136        let (pool, _container, _env_guard) = setup_test_pool().await;
4137
4138        let dir = tempfile::tempdir().expect("create tempdir");
4139        let wal_path = dir.path().join("journal.wal");
4140        let checkpoint_path = checkpoint_path_for_wal(&wal_path);
4141
4142        // Write 1000 WAL entries to simulate a large unreplicated tail.
4143        let num_entries: usize = 1000;
4144        let mut wal = WalState::open_at(wal_path.clone()).expect("open wal");
4145
4146        let entries: Vec<JournalEntry> = (0..num_entries)
4147            .map(|i| JournalEntry {
4148                request_uuid: hypercall_db_diesel::engine_enums::DbUuid(uuid::Uuid::new_v4()),
4149                received_ts_ms: 1000 + i as u64,
4150                command_data: vec![(i % 256) as u8; 64],
4151                response_data: Some(vec![(i % 256) as u8; 32]),
4152                order_id: Some(i as i64),
4153                pre_digest: EngineStateDigest::default(),
4154                post_digest: EngineStateDigest::default(),
4155                duration_ms: 1,
4156                events: vec![],
4157                outbox_appends: vec![],
4158                fill_side_effects: vec![],
4159                cash_withdrawal_side_effect: None,
4160                balance_updates: Vec::new(),
4161                created_at: Instant::now(),
4162                commit_ack: None,
4163                command_type_enum: Some(
4164                    hypercall_db_diesel::engine_enums::CommandType::CreateOrder,
4165                ),
4166                #[cfg(feature = "rsm-state")]
4167                command_identity_hash: [0u8; 32],
4168                #[cfg(feature = "rsm-state")]
4169                rsm_state_digest: Some(crate::rsm::engine_snapshot::EngineStateDigest::empty()),
4170            })
4171            .collect();
4172
4173        let request_uuids: Vec<hypercall_db_diesel::engine_enums::DbUuid> =
4174            entries.iter().map(|e| e.request_uuid).collect();
4175
4176        let offsets = wal.append_batch(&entries).expect("append WAL entries");
4177        assert_eq!(offsets.len(), num_entries);
4178
4179        let wal_end_offset = offsets.last().unwrap().end_offset;
4180        drop(wal);
4181
4182        // Checkpoint at 0 => entire WAL is unreplicated.
4183        write_checkpoint(
4184            &checkpoint_path,
4185            WalCheckpointMetadata {
4186                wal_offset: 0,
4187                last_command_id: 0,
4188                last_l2_seq: 0,
4189            },
4190        )
4191        .expect("write initial checkpoint");
4192
4193        // Use a small batch size (10) to force many streaming iterations.
4194        let small_batch_size: usize = 10;
4195
4196        // Run streaming WAL recovery.
4197        let truncated = EngineJournalBatcher::recover_wal_sync(
4198            &pool,
4199            &wal_path,
4200            &checkpoint_path,
4201            false, // persist_digests
4202            small_batch_size,
4203            #[cfg(feature = "rsm-state")]
4204            hypercall_db::ValidatorRsmEnvironment::Development,
4205            #[cfg(feature = "rsm-state")]
4206            None,
4207        );
4208
4209        // No torn bytes, so no truncation should occur.
4210        assert!(
4211            truncated.is_none(),
4212            "WAL should not be truncated when all records are complete"
4213        );
4214
4215        // Verify checkpoint advanced to end of WAL.
4216        let checkpoint = read_checkpoint(&checkpoint_path).expect("read checkpoint");
4217        assert_eq!(
4218            checkpoint.wal_offset, wal_end_offset,
4219            "Checkpoint must advance to end of WAL after full recovery"
4220        );
4221        assert!(
4222            checkpoint.last_command_id > 0,
4223            "Checkpoint must record a positive last_command_id"
4224        );
4225
4226        // Verify all 1000 records were inserted into engine_commands.
4227        let mut conn = pool.get().expect("get db connection");
4228        let inserted_count: i64 =
4229            diesel::dsl::sql::<diesel::sql_types::BigInt>("SELECT COUNT(*) FROM engine_commands")
4230                .get_result(&mut conn)
4231                .expect("count engine_commands");
4232
4233        assert_eq!(
4234            inserted_count, num_entries as i64,
4235            "All {} WAL entries must be replicated to engine_commands",
4236            num_entries
4237        );
4238
4239        // Verify request_uuids match by spot-checking first, middle, and last.
4240        for &idx in &[0, num_entries / 2, num_entries - 1] {
4241            let uuid_val = request_uuids[idx].0;
4242            let exists: bool = diesel::dsl::sql::<diesel::sql_types::Bool>(&format!(
4243                "SELECT EXISTS(SELECT 1 FROM engine_commands WHERE request_uuid = '{}')",
4244                uuid_val
4245            ))
4246            .get_result(&mut conn)
4247            .expect("check request_uuid exists");
4248
4249            assert!(
4250                exists,
4251                "request_uuid for entry {} must exist in engine_commands",
4252                idx
4253            );
4254        }
4255    }
4256
4257    #[tokio::test]
4258    #[serial_test::serial(testnet_env)]
4259    async fn recover_wal_sync_checkpoint_advances_per_batch() {
4260        // Verify that the checkpoint advances after each batch, not just at
4261        // the end. This ensures crash-recovery mid-replay resumes correctly.
4262
4263        let (pool, _container, _env_guard) = setup_test_pool().await;
4264
4265        let dir = tempfile::tempdir().expect("create tempdir");
4266        let wal_path = dir.path().join("journal.wal");
4267        let checkpoint_path = checkpoint_path_for_wal(&wal_path);
4268
4269        // Write 50 entries, use batch_size=10 so we get 5 batches.
4270        let num_entries: usize = 50;
4271        let mut wal = WalState::open_at(wal_path.clone()).expect("open wal");
4272
4273        let entries: Vec<JournalEntry> = (0..num_entries)
4274            .map(|i| JournalEntry {
4275                request_uuid: hypercall_db_diesel::engine_enums::DbUuid(uuid::Uuid::new_v4()),
4276                received_ts_ms: 2000 + i as u64,
4277                command_data: vec![(i % 256) as u8; 16],
4278                response_data: None,
4279                order_id: Some(i as i64),
4280                pre_digest: EngineStateDigest::default(),
4281                post_digest: EngineStateDigest::default(),
4282                duration_ms: 1,
4283                events: vec![],
4284                outbox_appends: vec![],
4285                fill_side_effects: vec![],
4286                cash_withdrawal_side_effect: None,
4287                balance_updates: Vec::new(),
4288                created_at: Instant::now(),
4289                commit_ack: None,
4290                command_type_enum: Some(
4291                    hypercall_db_diesel::engine_enums::CommandType::CreateOrder,
4292                ),
4293                #[cfg(feature = "rsm-state")]
4294                command_identity_hash: [0u8; 32],
4295                #[cfg(feature = "rsm-state")]
4296                rsm_state_digest: Some(crate::rsm::engine_snapshot::EngineStateDigest::empty()),
4297            })
4298            .collect();
4299
4300        let offsets = wal.append_batch(&entries).expect("append WAL entries");
4301        let wal_end_offset = offsets.last().unwrap().end_offset;
4302        drop(wal);
4303
4304        // No checkpoint file => defaults to offset 0.
4305        EngineJournalBatcher::recover_wal_sync(
4306            &pool,
4307            &wal_path,
4308            &checkpoint_path,
4309            false,
4310            10,
4311            #[cfg(feature = "rsm-state")]
4312            hypercall_db::ValidatorRsmEnvironment::Development,
4313            #[cfg(feature = "rsm-state")]
4314            None,
4315        );
4316
4317        // After recovery, checkpoint must be at the very end.
4318        let checkpoint = read_checkpoint(&checkpoint_path).expect("read checkpoint");
4319        assert_eq!(
4320            checkpoint.wal_offset, wal_end_offset,
4321            "Checkpoint must be at end of WAL after complete recovery"
4322        );
4323
4324        // Now simulate a partial replay: reset checkpoint to midpoint,
4325        // then recover again. Idempotent inserts mean no duplicates.
4326        let midpoint_offset = offsets[24].end_offset;
4327        write_checkpoint(
4328            &checkpoint_path,
4329            WalCheckpointMetadata {
4330                wal_offset: midpoint_offset,
4331                last_command_id: checkpoint.last_command_id,
4332                last_l2_seq: checkpoint.last_l2_seq,
4333            },
4334        )
4335        .expect("write midpoint checkpoint");
4336
4337        EngineJournalBatcher::recover_wal_sync(
4338            &pool,
4339            &wal_path,
4340            &checkpoint_path,
4341            false,
4342            10,
4343            #[cfg(feature = "rsm-state")]
4344            hypercall_db::ValidatorRsmEnvironment::Development,
4345            #[cfg(feature = "rsm-state")]
4346            None,
4347        );
4348
4349        // Checkpoint should again be at end.
4350        let checkpoint2 =
4351            read_checkpoint(&checkpoint_path).expect("read checkpoint after re-recovery");
4352        assert_eq!(checkpoint2.wal_offset, wal_end_offset);
4353
4354        // Count should still be 50 (idempotent via ON CONFLICT DO NOTHING).
4355        let mut conn = pool.get().expect("get db connection");
4356        let count: i64 =
4357            diesel::dsl::sql::<diesel::sql_types::BigInt>("SELECT COUNT(*) FROM engine_commands")
4358                .get_result(&mut conn)
4359                .expect("count engine_commands");
4360
4361        assert_eq!(
4362            count, num_entries as i64,
4363            "Re-recovery must not create duplicate rows"
4364        );
4365    }
4366
4367    #[tokio::test]
4368    #[serial_test::serial(testnet_env)]
4369    async fn recover_wal_sync_duplicate_request_id_persists_digest_once() {
4370        let (pool, _container, _env_guard) = setup_test_pool().await;
4371
4372        let dir = tempfile::tempdir().expect("create tempdir");
4373        let wal_path = dir.path().join("journal.wal");
4374        let checkpoint_path = checkpoint_path_for_wal(&wal_path);
4375        let duplicate_request_uuid =
4376            hypercall_db_diesel::engine_enums::DbUuid(uuid::Uuid::new_v4());
4377        let mut wal = WalState::open_at(wal_path.clone()).expect("open wal");
4378
4379        let entries: Vec<JournalEntry> = (0..2)
4380            .map(|i| JournalEntry {
4381                request_uuid: duplicate_request_uuid,
4382                received_ts_ms: 3000 + i as u64,
4383                command_data: vec![i as u8; 16],
4384                response_data: None,
4385                order_id: None,
4386                pre_digest: EngineStateDigest::default(),
4387                post_digest: EngineStateDigest::default(),
4388                duration_ms: 1,
4389                events: vec![EventPayload {
4390                    event_topic: "test.recovery".to_string(),
4391                    event_key: Some(format!("duplicate-event-{i}")),
4392                    event_data: vec![1, i as u8],
4393                    l2_sequence: Some(i as i64),
4394                    event_type_enum: hypercall_db_diesel::engine_enums::EventType::OrderbookUpdated,
4395                }],
4396                outbox_appends: vec![],
4397                balance_updates: vec![],
4398                fill_side_effects: vec![],
4399                cash_withdrawal_side_effect: None,
4400                created_at: Instant::now(),
4401                commit_ack: None,
4402                command_type_enum: Some(
4403                    hypercall_db_diesel::engine_enums::CommandType::DepositUpdate,
4404                ),
4405                #[cfg(feature = "rsm-state")]
4406                command_identity_hash: [0u8; 32],
4407                #[cfg(feature = "rsm-state")]
4408                rsm_state_digest: Some(crate::rsm::engine_snapshot::EngineStateDigest::empty()),
4409            })
4410            .collect();
4411
4412        let offsets = wal.append_batch(&entries).expect("append WAL entries");
4413        let wal_end_offset = offsets.last().unwrap().end_offset;
4414        drop(wal);
4415
4416        EngineJournalBatcher::recover_wal_sync(
4417            &pool,
4418            &wal_path,
4419            &checkpoint_path,
4420            true,
4421            10,
4422            #[cfg(feature = "rsm-state")]
4423            hypercall_db::ValidatorRsmEnvironment::Development,
4424            #[cfg(feature = "rsm-state")]
4425            None,
4426        );
4427
4428        let checkpoint = read_checkpoint(&checkpoint_path).expect("read checkpoint");
4429        assert_eq!(
4430            checkpoint.wal_offset, wal_end_offset,
4431            "duplicate request UUIDs must not block checkpoint advancement"
4432        );
4433
4434        let mut conn = pool.get().expect("get db connection");
4435        let command_count: i64 =
4436            diesel::dsl::sql::<diesel::sql_types::BigInt>("SELECT COUNT(*) FROM engine_commands")
4437                .get_result(&mut conn)
4438                .expect("count engine_commands");
4439        assert_eq!(
4440            command_count, 1,
4441            "duplicate request UUIDs materialize one engine command"
4442        );
4443
4444        let event_count: i64 =
4445            diesel::dsl::sql::<diesel::sql_types::BigInt>("SELECT COUNT(*) FROM engine_events")
4446                .get_result(&mut conn)
4447                .expect("count engine_events");
4448        assert_eq!(
4449            event_count, 1,
4450            "duplicate request UUIDs materialize one command event"
4451        );
4452
4453        let digest_count: i64 = diesel::dsl::sql::<diesel::sql_types::BigInt>(
4454            "SELECT COUNT(*) FROM engine_command_digests",
4455        )
4456        .get_result(&mut conn)
4457        .expect("count engine_command_digests");
4458        assert_eq!(
4459            digest_count, 1,
4460            "duplicate request UUIDs materialize one command digest"
4461        );
4462    }
4463
4464    #[cfg(feature = "rsm-state")]
4465    #[tokio::test]
4466    #[serial_test::serial(testnet_env)]
4467    async fn rsm_wal_recovery_rebuilds_local_commitment_store_before_db_roots() {
4468        let (pool, _container, _env_guard) = setup_test_pool().await;
4469        let env = hypercall_db::ValidatorRsmEnvironment::Development;
4470        let dir = tempfile::tempdir().expect("create tempdir");
4471        let config = ValidatorRsmCommitmentStoreConfig {
4472            root_dir: dir.path().join("state"),
4473            prune_window: 100,
4474            pruning_enabled: false,
4475        };
4476
4477        let mut runtime = ValidatorRsmStateCommitmentRuntime::open_from_db(
4478            config.clone(),
4479            Arc::new(pool.clone()),
4480            env,
4481        )
4482        .expect("open empty commitment runtime");
4483
4484        let mut entry = make_test_rsm_entry(1);
4485        entry.command_identity_hash = [7u8; 32];
4486        let entries = [entry];
4487        let mut block = make_test_rsm_block_records(0, [0u8; 32], 0, &entries);
4488
4489        apply_recovered_rsm_commitment_blocks(&mut block, &mut runtime)
4490            .expect("replay should apply local commitment roots before DB persistence");
4491        EngineJournalBatcher::insert_batch(
4492            &pool,
4493            &entries,
4494            false,
4495            Some(
4496                EngineJournalBatcher::rsm_block_persistence_batch(RsmBlockPersistenceInput {
4497                    environment: env,
4498                    records: &block,
4499                    mode: RsmBlockPersistenceMode::Replay,
4500                })
4501                .expect("prepare RSM replay persistence"),
4502            ),
4503        )
4504        .expect("persist replayed roots");
4505
4506        drop(runtime);
4507        ValidatorRsmStateCommitmentRuntime::open_from_db(config, Arc::new(pool.clone()), env)
4508            .expect("strict DB/local root verification should pass after replay");
4509    }
4510
4511    #[cfg(feature = "rsm-state")]
4512    #[tokio::test]
4513    #[serial_test::serial(testnet_env)]
4514    async fn rsm_replay_does_not_rewind_current_pointer() {
4515        let (pool, _container, _env_guard) = setup_test_pool().await;
4516        let env = hypercall_db::ValidatorRsmEnvironment::Development;
4517
4518        let mut entry1 = make_test_rsm_entry(1);
4519        entry1.command_identity_hash = [1u8; 32];
4520        let mut entry2 = make_test_rsm_entry(2);
4521        entry2.command_identity_hash = [2u8; 32];
4522
4523        let entries = [entry1, entry2];
4524
4525        let block1 = make_test_rsm_block_records(1, [0u8; 32], 0, &entries[0..1]);
4526        EngineJournalBatcher::insert_batch(
4527            &pool,
4528            &entries[0..1],
4529            false,
4530            Some(
4531                EngineJournalBatcher::rsm_block_persistence_batch(RsmBlockPersistenceInput {
4532                    environment: env,
4533                    records: &block1,
4534                    mode: RsmBlockPersistenceMode::Live,
4535                })
4536                .expect("prepare RSM live block 1 persistence"),
4537            ),
4538        )
4539        .expect("persist live block 1");
4540        let block1_hash = block1[0]
4541            .block_header
4542            .as_ref()
4543            .expect("block 1 header")
4544            .block_hash();
4545
4546        let block2 = make_test_rsm_block_records(2, block1_hash, 1, &entries[1..2]);
4547        EngineJournalBatcher::insert_batch(
4548            &pool,
4549            &entries[1..2],
4550            false,
4551            Some(
4552                EngineJournalBatcher::rsm_block_persistence_batch(RsmBlockPersistenceInput {
4553                    environment: env,
4554                    records: &block2,
4555                    mode: RsmBlockPersistenceMode::Live,
4556                })
4557                .expect("prepare RSM live block 2 persistence"),
4558            ),
4559        )
4560        .expect("persist live block 2");
4561
4562        let handler =
4563            hypercall_db_diesel::DatabaseHandler::with_pool_no_migrations(Arc::new(pool.clone()));
4564        assert_eq!(
4565            handler
4566                .get_validator_rsm_current_state_sync(env)
4567                .expect("read current")
4568                .expect("current exists")
4569                .current_version,
4570            2
4571        );
4572
4573        EngineJournalBatcher::insert_batch(
4574            &pool,
4575            &entries[0..1],
4576            false,
4577            Some(
4578                EngineJournalBatcher::rsm_block_persistence_batch(RsmBlockPersistenceInput {
4579                    environment: env,
4580                    records: &block1,
4581                    mode: RsmBlockPersistenceMode::Replay,
4582                })
4583                .expect("prepare RSM replay block persistence"),
4584            ),
4585        )
4586        .expect("replay old block without rewinding current");
4587
4588        assert_eq!(
4589            handler
4590                .get_validator_rsm_current_state_sync(env)
4591                .expect("read current after replay")
4592                .expect("current exists")
4593                .current_version,
4594            2
4595        );
4596    }
4597}