1use crate::observability::command_trace::EngineStateDigest;
11#[cfg(feature = "rsm-state")]
12use crate::rsm::commitment_store::{
13 ValidatorRsmCommitmentStoreConfig, ValidatorRsmStateCommitmentRuntime,
14};
15#[cfg(feature = "rsm-state")]
16use crate::rsm::engine_snapshot::EngineStateDigest as RsmEngineStateDigest;
17use hypercall_journal::checkpoint::{
18 checkpoint_path_for as checkpoint_path_for_wal, read_checkpoint, write_checkpoint,
19 WalCheckpointMetadata,
20};
21#[cfg(feature = "rsm-state")]
22mod rsm_block_persistence;
23#[cfg(feature = "rsm-state")]
24use hypercall_blocks::SigningAuthority;
25#[cfg(feature = "rsm-state")]
26use hypercall_blocks::{BlockLogEntry, BlockLogHeader};
27use hypercall_db::EngineJournalBatchWriter;
28#[cfg(feature = "rsm-state")]
29use hypercall_db::ValidatorRsmStateReader;
30use hypercall_journal::frame::{write_frame, WAL_CRC};
31#[cfg(feature = "rsm-state")]
32use hypercall_state_commitment::{
33 leaves::GlobalLeaf,
34 pipeline::{PreparedBatchCommitment, StateDelta},
35};
36use metrics::{counter, gauge, histogram};
37#[cfg(feature = "rsm-state")]
38use rsm_block_persistence::{RsmBlockPersistenceInput, RsmBlockPersistenceMode};
39use rust_decimal::Decimal;
40use serde::{Deserialize, Serialize};
41use std::fs::{create_dir_all, File, OpenOptions};
42use std::io::{BufWriter, Read, Seek, SeekFrom, Write};
43use std::path::{Path, PathBuf};
44use std::sync::Arc;
45use std::time::{Duration, Instant};
46use tokio::sync::{mpsc, oneshot};
47use tracing::{debug, error, info, info_span, warn, Instrument};
48
49#[cfg(target_os = "linux")]
50use std::os::unix::io::AsRawFd;
51
52pub const DEFAULT_CHANNEL_CAPACITY: usize = 3000;
54
55const REPLICATION_RETRY_DELAY: Duration = Duration::from_millis(250);
56
57#[cfg(target_os = "linux")]
59const WAL_PREALLOC_CHUNK: u64 = 64 * 1024 * 1024;
60
61enum WalWriterMessage {
63 WriteBatch {
64 entries: Vec<JournalEntry>,
65 ack_senders: Vec<Option<oneshot::Sender<()>>>,
66 completion: Option<oneshot::Sender<()>>,
68 },
69 FlushBarrier {
73 completion: oneshot::Sender<()>,
74 },
75 Shutdown,
76}
77
78#[derive(Debug, Clone, Serialize, Deserialize)]
81pub struct EventPayload {
82 pub event_topic: String,
83 pub event_key: Option<String>,
84 pub event_data: Vec<u8>, pub l2_sequence: Option<i64>,
86 pub event_type_enum: hypercall_db_diesel::engine_enums::EventType,
87}
88
89#[derive(Debug, Clone, Serialize, Deserialize)]
95pub struct JournalFillSideEffect {
96 pub event_idx: i32,
97 pub trade_id: u64,
98 pub taker_ledger_delta: Decimal,
99 pub maker_ledger_delta: Decimal,
100 pub taker_premium_delta: Decimal,
101 pub maker_premium_delta: Decimal,
102 #[serde(default)]
103 pub underlying_notional: Option<Decimal>,
104}
105
106impl JournalFillSideEffect {
107 pub fn from_fill_accounting(
108 event_idx: i32,
109 accounting: &hypercall_types::FillAccounting,
110 ) -> Self {
111 accounting.assert_cash_decomposition();
112 Self {
113 event_idx,
114 trade_id: accounting.trade_id,
115 taker_ledger_delta: accounting.taker_ledger_residual_delta(),
116 maker_ledger_delta: accounting.maker_ledger_residual_delta(),
117 taker_premium_delta: accounting.taker_premium_delta(),
118 maker_premium_delta: accounting.maker_premium_delta(),
119 underlying_notional: None,
120 }
121 }
122}
123
124#[derive(Debug, Clone, Serialize, Deserialize)]
129pub struct JournalCashWithdrawalSideEffect {
130 pub wallet: hypercall_types::WalletAddress,
131 pub request_id: String,
132 pub amount: Decimal,
133 pub balance_after: Decimal,
134 pub timestamp_ms: u64,
135}
136
137#[derive(Debug)]
139pub struct JournalEntry {
140 pub received_ts_ms: u64,
141 pub command_data: Vec<u8>,
143 pub response_data: Option<Vec<u8>>,
145 pub order_id: Option<i64>,
147 pub pre_digest: EngineStateDigest,
148 pub post_digest: EngineStateDigest,
149 pub duration_ms: u64,
150 pub events: Vec<EventPayload>,
151 pub outbox_appends: Vec<crate::directive_outbox::DirectiveOutboxAppend>,
152 pub fill_side_effects: Vec<JournalFillSideEffect>,
153 pub cash_withdrawal_side_effect: Option<JournalCashWithdrawalSideEffect>,
154 pub balance_updates: Vec<hypercall_types::BalanceUpdate>,
155 pub created_at: Instant,
156 pub commit_ack: Option<oneshot::Sender<()>>,
158 pub request_uuid: hypercall_db_diesel::engine_enums::DbUuid,
159 pub command_type_enum: Option<hypercall_db_diesel::engine_enums::CommandType>,
161 #[cfg(feature = "rsm-state")]
163 pub command_identity_hash: [u8; 32],
164 #[cfg(feature = "rsm-state")]
166 pub rsm_state_digest: Option<RsmEngineStateDigest>,
167}
168
169#[derive(Debug, Clone)]
171pub struct JournalBatcherConfig {
172 pub channel_capacity: usize,
174 pub max_batch_size: usize,
176 pub flush_interval_ms: u64,
178 pub persist_digests: bool,
180 pub wal_path: PathBuf,
182 #[cfg(feature = "rsm-state")]
184 pub rsm_environment: hypercall_db::ValidatorRsmEnvironment,
185 #[cfg(feature = "rsm-state")]
186 pub rsm_commitment_store: ValidatorRsmCommitmentStoreConfig,
187}
188
189impl Default for JournalBatcherConfig {
190 fn default() -> Self {
191 Self {
192 channel_capacity: DEFAULT_CHANNEL_CAPACITY,
193 max_batch_size: 100,
194 flush_interval_ms: 10,
195 persist_digests: true,
196 wal_path: PathBuf::from(hypercall_journal::checkpoint::DEFAULT_WAL_PATH),
197 #[cfg(feature = "rsm-state")]
198 rsm_environment: hypercall_db::ValidatorRsmEnvironment::Development,
199 #[cfg(feature = "rsm-state")]
200 rsm_commitment_store: ValidatorRsmCommitmentStoreConfig {
201 root_dir: PathBuf::from("target/validator-rsm-state"),
202 prune_window: 10_000,
203 pruning_enabled: false,
204 },
205 }
206 }
207}
208
209impl JournalBatcherConfig {
210 pub fn from_config(config: &crate::backend_config::JournalRuntimeConfig) -> Self {
211 Self {
212 channel_capacity: config.batch_channel_capacity,
213 max_batch_size: config.batch_size,
214 flush_interval_ms: config.flush_interval_ms,
215 persist_digests: config.digests_enabled,
216 wal_path: hypercall_journal::checkpoint::wal_path_from_config(config.wal_path.as_ref()),
217 #[cfg(feature = "rsm-state")]
218 rsm_environment: config.rsm_environment.into(),
219 #[cfg(feature = "rsm-state")]
220 rsm_commitment_store: ValidatorRsmCommitmentStoreConfig {
221 root_dir: config.rsm_state.state_store_path.clone(),
222 prune_window: config.rsm_state.prune_window_versions,
223 pruning_enabled: config.rsm_state.pruning_enabled,
224 },
225 }
226 }
227}
228
229pub enum JournalMessage {
231 Entry(JournalEntry),
233 Flush(oneshot::Sender<()>),
236}
237
238pub type JournalBatchSender = mpsc::Sender<JournalMessage>;
240
241enum ReplicationMessage {
242 Entries(Vec<WalRecordWithOffset>),
243 Shutdown,
244 FlushBarrier(oneshot::Sender<()>),
247}
248
249#[derive(Debug, Clone, Copy)]
250struct ReplicationProgress {
251 last_command_id: i64,
252 max_l2_seq: i64,
253}
254
255#[cfg(feature = "rsm-state")]
256struct RsmWalWriterState {
257 commitments: ValidatorRsmStateCommitmentRuntime,
258 next_batch_seq: u64,
259 prev_block_hash: [u8; 32],
260 global_command_seq: u64,
261 block_signer: Option<alloy::signers::local::PrivateKeySigner>,
262}
263
264#[cfg(feature = "rsm-state")]
265struct PreparedWalBlock {
266 block: WalBlockRecordWithRoots,
267 prepared_commitment: PreparedBatchCommitment,
268}
269
270#[cfg(feature = "rsm-state")]
271impl RsmWalWriterState {
272 fn prepare_block(&mut self, entries: &[JournalEntry]) -> Option<PreparedWalBlock> {
273 let identity_hashes = engine_command_identity_hashes(entries);
274 if identity_hashes.is_empty() {
275 return None;
276 }
277 let timestamp = entries.last().map(|e| e.received_ts_ms).unwrap_or(0);
278 let signer = self
279 .block_signer
280 .as_ref()
281 .map(|signer| *signer.address().as_ref())
282 .unwrap_or([0u8; 20]);
283 let delta =
284 rsm_state_delta_from_entries(entries, &identity_hashes, self.global_command_seq)
285 .unwrap_or_else(|error| {
286 panic!(
287 "CRITICAL_FAILURE: failed to materialize validator RSM state delta: {}",
288 error
289 )
290 });
291 let prepared_commitment =
292 self.commitments
293 .prepare_rsm_batch(&delta)
294 .unwrap_or_else(|error| {
295 panic!(
296 "CRITICAL_FAILURE: failed to prepare validator RSM state batch: {}",
297 error
298 )
299 });
300 let roots: RsmBatchRoots = prepared_commitment.commitment.clone().into();
301
302 let mut block = WalBlockRecord::unsigned_from_identity_hashes(
303 self.next_batch_seq,
304 self.prev_block_hash,
305 &identity_hashes,
306 roots.batch_root,
307 self.global_command_seq,
308 timestamp,
309 signer,
310 );
311 block.signature = self.sign_block(&block);
312
313 Some(PreparedWalBlock {
314 block: WalBlockRecordWithRoots {
315 header: block,
316 roots,
317 },
318 prepared_commitment,
319 })
320 }
321
322 fn apply_prepared_block(&mut self, prepared: PreparedWalBlock) -> WalBlockRecordWithRoots {
323 self.commitments
324 .apply_prepared_rsm_batch(prepared.prepared_commitment)
325 .unwrap_or_else(|error| {
326 panic!(
327 "CRITICAL_FAILURE: failed to apply validator RSM state batch after WAL fsync: {}",
328 error
329 )
330 });
331 self.advance_block_state(&prepared.block.header);
332 prepared.block
333 }
334
335 fn sign_block(&self, unsigned: &WalBlockRecord) -> Vec<u8> {
336 let Some(ref signer) = self.block_signer else {
337 return Vec::new();
338 };
339
340 use alloy::primitives::B256;
341 use alloy::signers::SignerSync;
342
343 let hash = B256::from(unsigned.block_hash());
344 match signer.sign_hash_sync(&hash) {
345 Ok(sig) => {
346 let mut sig_bytes = [0u8; 65];
347 sig_bytes[..32].copy_from_slice(&sig.r().to_be_bytes::<32>());
348 sig_bytes[32..64].copy_from_slice(&sig.s().to_be_bytes::<32>());
349 sig_bytes[64] = sig.v() as u8;
350 sig_bytes.to_vec()
351 }
352 Err(e) => {
353 tracing::error!("Failed to sign block {}: {}", unsigned.batch_seq, e);
354 Vec::new()
355 }
356 }
357 }
358
359 fn advance_block_state(&mut self, block: &WalBlockRecord) {
360 let block_hash = block.block_hash();
361 let signed = !block.signature.is_empty();
362 tracing::debug!(
363 batch_seq = block.batch_seq,
364 command_count = block.command_count,
365 first_seq = block.first_seq,
366 block_hash = hex::encode(block_hash),
367 signed,
368 "Block produced"
369 );
370
371 self.prev_block_hash = block_hash;
372 self.next_batch_seq += 1;
373 self.global_command_seq += block.command_count;
374
375 counter!("ht_blocks_produced_total").increment(1);
376 }
377}
378
379pub struct EngineJournalBatcher {
382 pool: crate::db_handler::DbPool,
383 receiver: mpsc::Receiver<JournalMessage>,
384 config: JournalBatcherConfig,
385 shutdown: tokio::sync::broadcast::Receiver<()>,
386 wal_writer_tx: std::sync::mpsc::SyncSender<WalWriterMessage>,
388 wal_writer_handle: Option<std::thread::JoinHandle<()>>,
390 replication_tx: mpsc::Sender<ReplicationMessage>,
391 replication_rx: Option<mpsc::Receiver<ReplicationMessage>>,
392 #[cfg(feature = "rsm-state")]
394 rsm_environment: hypercall_db::ValidatorRsmEnvironment,
395}
396
397impl EngineJournalBatcher {
398 pub fn new(
400 pool: crate::db_handler::DbPool,
401 config: JournalBatcherConfig,
402 shutdown: tokio::sync::broadcast::Receiver<()>,
403 cold_signer: Option<alloy::signers::local::PrivateKeySigner>,
404 ) -> (Self, JournalBatchSender) {
405 let (tx, rx) = mpsc::channel(config.channel_capacity);
406
407 let mut wal_state = WalState::open(config.wal_path.clone()).unwrap_or_else(|e| {
409 panic!(
410 "CRITICAL_FAILURE: Failed to initialize WAL state: {}. Restart required.",
411 e
412 )
413 });
414
415 #[cfg(feature = "rsm-state")]
416 let rsm_environment = config.rsm_environment;
417 #[cfg(feature = "rsm-state")]
418 let mut commitments = ValidatorRsmStateCommitmentRuntime::open_from_db(
419 config.rsm_commitment_store.clone(),
420 Arc::new(pool.clone()),
421 rsm_environment,
422 )
423 .unwrap_or_else(|error| {
424 panic!(
425 "CRITICAL_FAILURE: failed to initialize validator RSM state store: {}",
426 error
427 )
428 });
429
430 if let Some(truncated_len) = Self::recover_wal_sync(
434 &pool,
435 &wal_state.wal_path,
436 &wal_state.checkpoint_path,
437 config.persist_digests,
438 config.max_batch_size,
439 #[cfg(feature = "rsm-state")]
440 config.rsm_environment,
441 #[cfg(feature = "rsm-state")]
442 Some(&mut commitments),
443 ) {
444 wal_state.next_offset = truncated_len;
445 wal_state.preallocated_end = truncated_len;
446 }
447
448 let (replication_tx, replication_rx) = mpsc::channel(config.channel_capacity);
449
450 let (wal_writer_tx, wal_writer_rx) = std::sync::mpsc::sync_channel(config.channel_capacity);
452
453 #[cfg(feature = "rsm-state")]
455 let mut block_signer = None;
456 #[cfg(feature = "rsm-state")]
457 if let Some(cold_signer) = cold_signer {
458 use alloy::primitives::B256;
459 use alloy::signers::SignerSync;
460
461 let hot_signer = alloy::signers::local::PrivateKeySigner::random();
462 let hot_address = hot_signer.address();
463 let cold_address = cold_signer.address();
464
465 let now_ms = std::time::SystemTime::now()
466 .duration_since(std::time::UNIX_EPOCH)
467 .unwrap()
468 .as_millis() as u64;
469 let valid_until = now_ms + 24 * 60 * 60 * 1000; let mut authority = SigningAuthority {
472 delegator: *cold_address.as_ref(),
473 delegate: *hot_address.as_ref(),
474 valid_from: now_ms,
475 valid_until,
476 signature: Vec::new(),
477 };
478
479 let delegation_hash = B256::from(authority.delegation_hash());
480 match cold_signer.sign_hash_sync(&delegation_hash) {
481 Ok(sig) => {
482 let mut sig_bytes = [0u8; 65];
483 sig_bytes[..32].copy_from_slice(&sig.r().to_be_bytes::<32>());
484 sig_bytes[32..64].copy_from_slice(&sig.s().to_be_bytes::<32>());
485 sig_bytes[64] = sig.v() as u8;
486 authority.signature = sig_bytes.to_vec();
487
488 info!(
489 cold_key = %cold_address,
490 hot_key = %hot_address,
491 valid_until_h = (valid_until - now_ms) / 3_600_000,
492 "Block signing: cold key delegated to ephemeral hot key (24h)"
493 );
494
495 block_signer = Some(hot_signer);
496 }
497 Err(e) => {
498 error!(
499 "Failed to sign delegation with cold key: {}. Blocks will be unsigned.",
500 e
501 );
502 }
503 }
504 }
505 #[cfg(not(feature = "rsm-state"))]
506 let _ = cold_signer;
507 #[cfg(feature = "rsm-state")]
508 let (next_batch_seq, prev_block_hash, global_command_seq) =
509 Self::load_rsm_block_cursor(&pool, rsm_environment);
510
511 #[cfg(feature = "rsm-state")]
512 let rsm_writer_state = RsmWalWriterState {
513 commitments,
514 next_batch_seq,
515 prev_block_hash,
516 global_command_seq,
517 block_signer,
518 };
519
520 let replication_tx_for_thread = replication_tx.clone();
523 let wal_writer_handle = std::thread::Builder::new()
524 .name("wal-writer".to_string())
525 .spawn(move || {
526 wal_writer_thread_main(
527 wal_state,
528 wal_writer_rx,
529 replication_tx_for_thread,
530 #[cfg(feature = "rsm-state")]
531 rsm_writer_state,
532 );
533 })
534 .expect("CRITICAL_FAILURE: failed to spawn wal-writer thread");
535
536 let batcher = Self {
537 pool,
538 receiver: rx,
539 config,
540 shutdown,
541 wal_writer_tx,
542 wal_writer_handle: Some(wal_writer_handle),
543 replication_tx,
544 replication_rx: Some(replication_rx),
545 #[cfg(feature = "rsm-state")]
546 rsm_environment,
547 };
548 (batcher, tx)
549 }
550
551 #[cfg(feature = "rsm-state")]
552 fn load_rsm_block_cursor(
553 pool: &crate::db_handler::DbPool,
554 rsm_environment: hypercall_db::ValidatorRsmEnvironment,
555 ) -> (u64, [u8; 32], u64) {
556 let handler =
557 hypercall_db_diesel::DatabaseHandler::with_pool_no_migrations(Arc::new(pool.clone()));
558 match handler.get_latest_rsm_block_sync(rsm_environment) {
559 Ok(Some(block)) => {
560 let next_batch_seq = block.block.height + 1;
561 let prev_block_hash = block.block.hash;
562 let global_command_seq = block.block.last_command_seq + 1;
563 info!(
564 environment = %rsm_environment,
565 next_batch_seq = next_batch_seq,
566 global_command_seq = global_command_seq,
567 "Loaded validator RSM block cursor"
568 );
569 (next_batch_seq, prev_block_hash, global_command_seq)
570 }
571 Ok(None) => {
572 info!(
573 environment = %rsm_environment,
574 "No validator RSM block cursor found; starting at genesis"
575 );
576 (0, [0u8; 32], 0)
577 }
578 Err(error) => {
579 panic!(
580 "CRITICAL_FAILURE: failed to load validator RSM block cursor: {}",
581 error
582 );
583 }
584 }
585 }
586
587 fn recover_wal_sync(
590 pool: &crate::db_handler::DbPool,
591 wal_path: &Path,
592 checkpoint_path: &Path,
593 persist_digests: bool,
594 max_batch_size: usize,
595 #[cfg(feature = "rsm-state")] rsm_environment: hypercall_db::ValidatorRsmEnvironment,
596 #[cfg(feature = "rsm-state")] mut rsm_commitments: Option<
597 &mut ValidatorRsmStateCommitmentRuntime,
598 >,
599 ) -> Option<u64> {
600 if !wal_path.exists() {
605 return None;
606 }
607
608 let checkpoint = read_checkpoint(checkpoint_path).unwrap_or_else(|e| {
609 panic!(
610 "CRITICAL_FAILURE: Failed to read WAL checkpoint at startup: {}. Restart required.",
611 e
612 )
613 });
614
615 let mut file = OpenOptions::new()
617 .read(true)
618 .write(true)
619 .open(wal_path)
620 .unwrap_or_else(|e| {
621 panic!(
622 "CRITICAL_FAILURE: Failed to open WAL {} for replay: {}. Restart required.",
623 wal_path.display(),
624 e
625 )
626 });
627
628 let file_len = file
629 .metadata()
630 .unwrap_or_else(|e| {
631 panic!(
632 "CRITICAL_FAILURE: Failed to stat WAL {}: {}",
633 wal_path.display(),
634 e
635 )
636 })
637 .len();
638
639 if checkpoint.wal_offset > file_len {
640 panic!(
641 "CRITICAL_FAILURE: WAL checkpoint {} exceeds file length {} for {}",
642 checkpoint.wal_offset,
643 file_len,
644 wal_path.display()
645 );
646 }
647
648 if checkpoint.wal_offset == file_len {
649 return None;
651 }
652
653 file.seek(SeekFrom::Start(checkpoint.wal_offset))
654 .unwrap_or_else(|e| panic!("CRITICAL_FAILURE: Failed to seek WAL replay start: {}", e));
655
656 info!(
657 wal_path = %wal_path.display(),
658 checkpoint_path = %checkpoint_path.display(),
659 checkpoint_offset = checkpoint.wal_offset,
660 file_len = file_len,
661 "Streaming WAL recovery to PostgreSQL before engine startup"
662 );
663
664 let batch_size = max_batch_size.max(1);
665 let mut offset = checkpoint.wal_offset;
666 let mut last_good_offset = checkpoint.wal_offset;
669 let mut total_recovered: u64 = 0;
670 let mut hit_torn_record = false;
671
672 loop {
674 let mut batch: Vec<WalRecordWithOffset> = Vec::with_capacity(batch_size);
675
676 for _ in 0..batch_size {
678 if batch.len() >= batch_size {
679 break;
680 }
681
682 let record_start_offset = offset;
683
684 let mut len_buf = [0u8; 4];
686 match file.read_exact(&mut len_buf) {
687 Ok(()) => {}
688 Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => {
689 hit_torn_record = true;
690 break;
691 }
692 Err(e) => {
693 panic!(
694 "CRITICAL_FAILURE: Failed reading WAL length prefix at offset {}: {}",
695 offset, e
696 );
697 }
698 }
699
700 let payload_len = u32::from_le_bytes(len_buf) as usize;
701 if payload_len == 0 {
702 panic!(
703 "CRITICAL_FAILURE: WAL record length was zero at offset {} in {}. Data corruption detected.",
704 offset,
705 wal_path.display()
706 );
707 }
708 offset += 4;
709
710 let remaining = file_len.saturating_sub(offset);
713 if payload_len as u64 > remaining {
714 warn!(
715 "WAL declared payload_len={} but only {} bytes remain at offset {} in {}, \
716 truncating tail and replaying prior durable records only",
717 payload_len,
718 remaining,
719 offset,
720 wal_path.display(),
721 );
722 hit_torn_record = true;
723 break;
724 }
725
726 let mut crc_buf = [0u8; 4];
728 match file.read_exact(&mut crc_buf) {
729 Ok(()) => {}
730 Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => {
731 warn!(
732 "WAL ended with incomplete CRC header at offset {} in {} ({}), \
733 truncating tail and replaying prior durable records only",
734 offset,
735 wal_path.display(),
736 e
737 );
738 hit_torn_record = true;
739 break;
740 }
741 Err(e) => {
742 panic!(
743 "CRITICAL_FAILURE: Failed reading WAL CRC prefix at offset {}: {}",
744 offset, e
745 );
746 }
747 }
748 offset += 4;
749
750 let expected_crc = u32::from_le_bytes(crc_buf);
751 let mut payload = vec![0u8; payload_len];
752
753 match file.read_exact(&mut payload) {
755 Ok(()) => {}
756 Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => {
757 warn!(
758 "WAL ended with incomplete payload at offset {} in {} ({}), \
759 truncating tail and replaying prior durable records only",
760 offset,
761 wal_path.display(),
762 e
763 );
764 hit_torn_record = true;
765 break;
766 }
767 Err(e) => {
768 panic!(
769 "CRITICAL_FAILURE: Failed reading WAL payload at offset {}: {}",
770 offset, e
771 );
772 }
773 }
774 offset += payload_len as u64;
775
776 let actual_crc = WAL_CRC.checksum(&payload);
777 if actual_crc != expected_crc {
778 panic!(
779 "CRITICAL_FAILURE: WAL CRC mismatch at end_offset {} in {}: expected {}, got {}. Data corruption detected.",
780 offset,
781 wal_path.display(),
782 expected_crc,
783 actual_crc
784 );
785 }
786
787 let mut decoded_records = Vec::new();
788 #[cfg(feature = "rsm-state")]
789 if let Ok(block_entry) = rmp_serde::from_slice::<WalDurableBlockEntry>(&payload) {
790 let header = block_entry.header;
791 let roots = block_entry.roots;
792 for (index, cmd) in block_entry.commands.into_iter().enumerate() {
793 decoded_records.push(WalRecordWithOffset {
794 record: cmd,
795 end_offset: offset,
796 block_header: (index == 0).then_some(header.clone()),
797 block_roots: (index == 0).then_some(roots),
798 });
799 }
800 } else if rmp_serde::from_slice::<WalBlockEntry>(&payload).is_ok() {
801 panic!(
802 "CRITICAL_FAILURE: legacy rootless RSM WAL block at end_offset {} is unsupported after durable root cutover",
803 offset
804 );
805 } else {
806 match rmp_serde::from_slice::<WalJournalRecord>(&payload) {
807 Ok(record) => {
808 decoded_records.push(WalRecordWithOffset {
809 record,
810 end_offset: offset,
811 block_header: None,
812 block_roots: None,
813 });
814 }
815 Err(_) => {
816 if rmp_serde::from_slice::<WalBlockRecord>(&payload).is_ok() {
817 tracing::debug!(
818 end_offset = offset,
819 "Skipping legacy WAL block record during recovery"
820 );
821 last_good_offset = offset;
822 continue;
823 }
824
825 panic!(
826 "CRITICAL_FAILURE: Failed to deserialize WAL payload at end_offset {}",
827 offset
828 );
829 }
830 }
831 }
832 #[cfg(not(feature = "rsm-state"))]
833 match rmp_serde::from_slice::<WalJournalRecord>(&payload) {
834 Ok(record) => {
835 decoded_records.push(WalRecordWithOffset {
836 record,
837 end_offset: offset,
838 });
839 }
840 Err(_) => {
841 panic!(
842 "CRITICAL_FAILURE: Failed to deserialize WAL payload at end_offset {}",
843 offset
844 );
845 }
846 }
847
848 if should_defer_replay_record(batch.len(), decoded_records.len(), batch_size) {
849 file.seek(SeekFrom::Start(record_start_offset))
850 .unwrap_or_else(|e| {
851 panic!(
852 "CRITICAL_FAILURE: Failed to rewind WAL replay batch boundary: {}",
853 e
854 )
855 });
856 offset = record_start_offset;
857 break;
858 }
859
860 if decoded_records.len() > batch_size {
861 warn!(
862 commands = decoded_records.len(),
863 batch_size = batch_size,
864 end_offset = offset,
865 "WAL replay physical record exceeds command batch size; replaying it atomically"
866 );
867 }
868
869 for record in decoded_records {
870 batch.push(WalRecordWithOffset {
871 record: record.record,
872 end_offset: record.end_offset,
873 #[cfg(feature = "rsm-state")]
874 block_header: record.block_header,
875 #[cfg(feature = "rsm-state")]
876 block_roots: record.block_roots,
877 });
878 }
879
880 last_good_offset = offset;
881 }
882
883 if batch.is_empty() {
885 break;
886 }
887
888 let batch_len = batch.len() as u64;
889
890 #[cfg(feature = "rsm-state")]
891 if let Some(commitments) = rsm_commitments.as_deref_mut() {
892 apply_recovered_rsm_commitment_blocks(&mut batch, commitments).unwrap_or_else(
893 |error| {
894 panic!(
895 "CRITICAL_FAILURE: WAL replay failed to rebuild local validator RSM state before root persistence: {}",
896 error
897 )
898 },
899 );
900 }
901
902 let db_entries: Vec<JournalEntry> =
904 batch.iter().map(WalRecordWithOffset::to_db_entry).collect();
905 let (_inserted_count, mut inserted_commands) = match Self::insert_batch(
906 pool,
907 &db_entries,
908 persist_digests,
909 #[cfg(feature = "rsm-state")]
910 Some(
911 Self::rsm_block_persistence_batch(RsmBlockPersistenceInput {
912 environment: rsm_environment,
913 records: &batch,
914 mode: RsmBlockPersistenceMode::Replay,
915 })
916 .unwrap_or_else(|error| {
917 panic!(
918 "CRITICAL_FAILURE: WAL replay failed to prepare RSM block persistence: {}",
919 error
920 )
921 }),
922 ),
923 ) {
924 Ok(result) => result,
925 Err(e) => {
926 panic!(
927 "CRITICAL_FAILURE: WAL replay insert failed: {}. Startup cannot continue.",
928 e
929 );
930 }
931 };
932
933 if inserted_commands.is_empty() {
934 let request_ids: Vec<hypercall_db_diesel::engine_enums::DbUuid> = batch
935 .iter()
936 .map(|record| record.record.request_uuid)
937 .collect();
938 inserted_commands =
939 Self::lookup_command_ids_by_request_ids(pool, &request_ids).unwrap_or_else(
940 |e| {
941 panic!(
942 "CRITICAL_FAILURE: WAL replay command_id lookup failed: {}. Startup cannot continue.",
943 e
944 )
945 },
946 );
947 if inserted_commands.is_empty() {
948 panic!(
949 "CRITICAL_FAILURE: WAL replay insert and command_id lookup returned empty for non-empty chunk."
950 );
951 }
952 }
953
954 let progress = ReplicationProgress {
955 last_command_id: inserted_commands
956 .iter()
957 .map(|(_, command_id)| *command_id)
958 .max()
959 .expect("WAL replay command IDs unexpectedly empty"),
960 max_l2_seq: batch
961 .iter()
962 .flat_map(|record| record.record.events.iter().filter_map(|e| e.l2_sequence))
963 .max()
964 .unwrap_or(0),
965 };
966
967 let last_offset = batch
968 .last()
969 .expect("WAL replay chunk unexpectedly empty")
970 .end_offset;
971
972 if let Err(e) = Self::advance_checkpoint(checkpoint_path, last_offset, progress) {
974 panic!(
975 "CRITICAL_FAILURE: Failed to advance WAL checkpoint after replay: {}",
976 e
977 );
978 }
979
980 total_recovered += batch_len;
981
982 drop(batch);
984
985 debug!(
986 total_recovered = total_recovered,
987 current_offset = offset,
988 file_len = file_len,
989 "WAL streaming recovery progress"
990 );
991 }
992
993 let truncated = if hit_torn_record && last_good_offset < file_len {
998 warn!(
999 "Truncating WAL {} from {} to {} bytes to remove torn tail",
1000 wal_path.display(),
1001 file_len,
1002 last_good_offset
1003 );
1004 file.set_len(last_good_offset).unwrap_or_else(|e| {
1005 panic!(
1006 "CRITICAL_FAILURE: Failed to truncate WAL {} to {}: {}",
1007 wal_path.display(),
1008 last_good_offset,
1009 e
1010 )
1011 });
1012 Some(last_good_offset)
1013 } else {
1014 None
1015 };
1016
1017 if total_recovered > 0 {
1018 info!(
1019 total_recovered = total_recovered,
1020 "WAL streaming recovery complete"
1021 );
1022 }
1023
1024 counter!("ht_journal_wal_replay_entries_total").increment(total_recovered);
1025 truncated
1026 }
1027
1028 pub async fn run(mut self) {
1034 info!(
1035 "Starting WAL-first journal batcher (group_commit=true, batch_size={}, digests={})",
1036 self.config.max_batch_size, self.config.persist_digests
1037 );
1038
1039 let flush_interval = Duration::from_millis(self.config.flush_interval_ms);
1040 let wal_path = self.config.wal_path.clone();
1041 let checkpoint_path = checkpoint_path_for_wal(&wal_path);
1042
1043 let replication_rx = self
1044 .replication_rx
1045 .take()
1046 .expect("Replication receiver missing; batcher initialized incorrectly");
1047
1048 let replication_handle = tokio::spawn(Self::replication_loop(
1049 self.pool.clone(),
1050 self.config.persist_digests,
1051 self.config.max_batch_size,
1052 flush_interval,
1053 checkpoint_path,
1054 wal_path,
1055 replication_rx,
1056 #[cfg(feature = "rsm-state")]
1057 self.rsm_environment,
1058 ));
1059
1060 let mut batch: Vec<JournalEntry> = Vec::with_capacity(self.config.max_batch_size);
1061
1062 loop {
1063 tokio::select! {
1064 biased;
1065
1066 _ = self.shutdown.recv() => {
1067 info!("Journal batcher received shutdown, flushing remaining {} entries", batch.len());
1068 if !batch.is_empty() {
1069 self.flush_batch(&mut batch, None).await;
1070 }
1071
1072 let mut drained_count = 0;
1074 while let Ok(msg) = self.receiver.try_recv() {
1075 match msg {
1076 JournalMessage::Entry(entry) => {
1077 drained_count += 1;
1078 batch.push(entry);
1079 if batch.len() >= self.config.max_batch_size {
1080 self.flush_batch(&mut batch, None).await;
1081 }
1082 }
1083 JournalMessage::Flush(ack) => {
1084 let (complete_tx, complete_rx) = oneshot::channel();
1085 if !batch.is_empty() {
1086 self.flush_batch(&mut batch, Some(complete_tx)).await;
1087 } else {
1088 self.send_flush_barrier(complete_tx).await;
1089 }
1090 let _ = complete_rx.await;
1091 self.await_replication_flush().await;
1092 let _ = ack.send(());
1093 }
1094 }
1095 }
1096
1097 if !batch.is_empty() {
1098 let (complete_tx, complete_rx) = oneshot::channel();
1100 self.flush_batch(&mut batch, Some(complete_tx)).await;
1101 let _ = complete_rx.await;
1102 }
1103
1104 if drained_count > 0 {
1105 info!("Drained {} additional entries to WAL during shutdown", drained_count);
1106 }
1107
1108 let _ = self.wal_writer_tx.send(WalWriterMessage::Shutdown);
1110 if let Some(handle) = self.wal_writer_handle.take() {
1111 handle.join().expect("CRITICAL_FAILURE: wal-writer thread panicked");
1112 }
1113
1114 if let Err(e) = self.replication_tx.send(ReplicationMessage::Shutdown).await {
1115 warn!("Failed to signal replication shutdown: {}", e);
1116 }
1117
1118 match replication_handle.await {
1119 Ok(()) => {}
1120 Err(e) => {
1121 panic!(
1122 "CRITICAL_FAILURE: Journal replication task panicked during shutdown: {}",
1123 e
1124 );
1125 }
1126 }
1127
1128 break;
1129 }
1130
1131 msg = self.receiver.recv() => {
1132 match msg {
1133 Some(JournalMessage::Entry(e)) => {
1134 let queue_latency = e.created_at.elapsed();
1136 histogram!("ht_journal_queue_latency_seconds")
1137 .record(queue_latency.as_secs_f64());
1138
1139 batch.push(e);
1140
1141 while batch.len() < self.config.max_batch_size {
1145 match self.receiver.try_recv() {
1146 Ok(JournalMessage::Entry(e2)) => {
1147 let queue_latency = e2.created_at.elapsed();
1148 histogram!("ht_journal_queue_latency_seconds")
1149 .record(queue_latency.as_secs_f64());
1150 batch.push(e2);
1151 }
1152 Ok(JournalMessage::Flush(ack)) => {
1153 let (complete_tx, complete_rx) = oneshot::channel();
1156 if !batch.is_empty() {
1157 self.flush_batch(&mut batch, Some(complete_tx)).await;
1158 } else {
1159 self.send_flush_barrier(complete_tx).await;
1160 }
1161 let _ = complete_rx.await;
1162 self.await_replication_flush().await;
1163 let _ = ack.send(());
1164 }
1165 Err(_) => break,
1166 }
1167 }
1168
1169 if !batch.is_empty() {
1171 self.flush_batch(&mut batch, None).await;
1172 }
1173 }
1174 Some(JournalMessage::Flush(ack)) => {
1175 let (complete_tx, complete_rx) = oneshot::channel();
1178 if !batch.is_empty() {
1179 self.flush_batch(&mut batch, Some(complete_tx)).await;
1180 } else {
1181 self.send_flush_barrier(complete_tx).await;
1182 }
1183 let _ = complete_rx.await;
1184 self.await_replication_flush().await;
1187 let _ = ack.send(());
1188 }
1189 None => {
1190 info!("Journal batcher channel closed, flushing remaining {} entries", batch.len());
1191 if !batch.is_empty() {
1192 self.flush_batch(&mut batch, None).await;
1193 }
1194 let _ = self.wal_writer_tx.send(WalWriterMessage::Shutdown);
1196 if let Some(handle) = self.wal_writer_handle.take() {
1197 handle.join().expect("CRITICAL_FAILURE: wal-writer thread panicked");
1198 }
1199 let _ = self.replication_tx.send(ReplicationMessage::Shutdown).await;
1200 match replication_handle.await {
1201 Ok(()) => {}
1202 Err(e) => {
1203 panic!(
1204 "CRITICAL_FAILURE: Journal replication task panicked while closing channel: {}",
1205 e
1206 );
1207 }
1208 }
1209 break;
1210 }
1211 }
1212 }
1213 }
1214 }
1215
1216 info!("Journal batcher stopped");
1217 }
1218
1219 async fn flush_batch(
1227 &mut self,
1228 batch: &mut Vec<JournalEntry>,
1229 completion: Option<oneshot::Sender<()>>,
1230 ) {
1231 if batch.is_empty() {
1232 return;
1233 }
1234
1235 let batch_size = batch.len();
1236 let mut entries = std::mem::take(batch);
1237
1238 histogram!("ht_journal_batch_size").record(batch_size as f64);
1239
1240 let ack_senders: Vec<Option<oneshot::Sender<()>>> =
1241 entries.iter_mut().map(|e| e.commit_ack.take()).collect();
1242
1243 let msg = WalWriterMessage::WriteBatch {
1244 entries,
1245 ack_senders,
1246 completion,
1247 };
1248
1249 let tx = self.wal_writer_tx.clone();
1252 tokio::task::spawn_blocking(move || {
1253 tx.send(msg)
1254 .expect("CRITICAL_FAILURE: wal-writer thread dead. Restart required.");
1255 })
1256 .await
1257 .expect("CRITICAL_FAILURE: spawn_blocking failed sending to wal-writer");
1258 }
1259
1260 async fn send_flush_barrier(&self, completion: oneshot::Sender<()>) {
1265 let tx = self.wal_writer_tx.clone();
1266 tokio::task::spawn_blocking(move || {
1267 tx.send(WalWriterMessage::FlushBarrier { completion })
1268 .expect("CRITICAL_FAILURE: wal-writer thread dead. Restart required.");
1269 })
1270 .await
1271 .expect("CRITICAL_FAILURE: spawn_blocking failed sending flush barrier");
1272 }
1273
1274 async fn await_replication_flush(&self) {
1284 let (tx, rx) = oneshot::channel();
1285 if let Err(e) = self
1286 .replication_tx
1287 .send(ReplicationMessage::FlushBarrier(tx))
1288 .await
1289 {
1290 warn!(
1291 "Failed to send replication flush barrier (channel closed during shutdown): {}",
1292 e
1293 );
1294 return;
1295 }
1296 match rx.await {
1297 Ok(()) => {
1298 debug!("Replication flush barrier completed — checkpoint is up-to-date");
1299 }
1300 Err(_) => {
1301 warn!("Replication flush barrier ack dropped (replication task exited)");
1302 }
1303 }
1304 }
1305
1306 async fn replication_loop(
1315 pool: crate::db_handler::DbPool,
1316 persist_digests: bool,
1317 max_batch_size: usize,
1318 flush_interval: Duration,
1319 checkpoint_path: PathBuf,
1320 wal_path: PathBuf,
1321 mut receiver: mpsc::Receiver<ReplicationMessage>,
1322 #[cfg(feature = "rsm-state")] rsm_environment: hypercall_db::ValidatorRsmEnvironment,
1323 ) {
1324 info!("Journal replication loop started");
1325
1326 let mut pending: Vec<WalRecordWithOffset> = Vec::new();
1327 let mut last_flush = Instant::now();
1328 let mut last_punched_offset: u64 =
1329 Self::read_checkpoint_or_panic(&checkpoint_path).wal_offset;
1330 Self::emit_wal_checkpoint_metrics(&wal_path, last_punched_offset);
1331
1332 loop {
1333 tokio::select! {
1334 biased;
1335
1336 msg = receiver.recv() => {
1337 match msg {
1338 Some(ReplicationMessage::Entries(entries)) => {
1339 pending.extend(entries);
1340 gauge!("ht_journal_replication_backlog").set(pending.len() as f64);
1341 if pending.len() >= max_batch_size {
1342 if let Err(e) = Self::replicate_pending(
1343 &pool,
1344 persist_digests,
1345 max_batch_size,
1346 &checkpoint_path,
1347 &mut pending,
1348 #[cfg(feature = "rsm-state")]
1349 rsm_environment,
1350 ).await {
1351 error!(
1353 pending = pending.len(),
1354 "Replication batch failed (WAL tail will grow): {}",
1355 e
1356 );
1357 counter!("ht_journal_replication_batch_failures_total").increment(1);
1358 } else {
1359 last_flush = Instant::now();
1360 let cp = Self::read_checkpoint_or_panic(&checkpoint_path);
1361 Self::emit_wal_checkpoint_metrics(&wal_path, cp.wal_offset);
1362 if cp.wal_offset > last_punched_offset {
1363 Self::punch_wal_hole(&wal_path, last_punched_offset, cp.wal_offset);
1364 last_punched_offset = cp.wal_offset;
1365 }
1366 }
1367 }
1368 }
1369 Some(ReplicationMessage::FlushBarrier(ack)) => {
1370 if !pending.is_empty() {
1376 if let Err(e) = Self::replicate_pending(
1377 &pool,
1378 persist_digests,
1379 max_batch_size,
1380 &checkpoint_path,
1381 &mut pending,
1382 #[cfg(feature = "rsm-state")]
1383 rsm_environment,
1384 ).await {
1385 error!(
1386 "CRITICAL_FAILURE: replication flush barrier failed: {}. \
1387 Cannot write snapshot with stale checkpoint. Aborting process.",
1388 e
1389 );
1390 std::process::abort();
1391 }
1392 last_flush = Instant::now();
1393 let cp = Self::read_checkpoint_or_panic(&checkpoint_path);
1394 Self::emit_wal_checkpoint_metrics(&wal_path, cp.wal_offset);
1395 if cp.wal_offset > last_punched_offset {
1396 Self::punch_wal_hole(&wal_path, last_punched_offset, cp.wal_offset);
1397 last_punched_offset = cp.wal_offset;
1398 }
1399 }
1400 let _ = ack.send(());
1401 }
1402 Some(ReplicationMessage::Shutdown) => {
1403 info!("Journal replication received shutdown with {} pending entries", pending.len());
1404 Self::drain_replication_channel(&mut receiver, &mut pending);
1405 if let Err(e) = Self::replicate_pending(
1406 &pool,
1407 persist_digests,
1408 max_batch_size,
1409 &checkpoint_path,
1410 &mut pending,
1411 #[cfg(feature = "rsm-state")]
1412 rsm_environment,
1413 ).await {
1414 error!("Replication failed during shutdown (data safe in WAL): {}", e);
1415 counter!("ht_journal_replication_batch_failures_total").increment(1);
1416 }
1417 break;
1418 }
1419 None => {
1420 info!("Replication channel closed with {} pending entries", pending.len());
1421 if let Err(e) = Self::replicate_pending(
1422 &pool,
1423 persist_digests,
1424 max_batch_size,
1425 &checkpoint_path,
1426 &mut pending,
1427 #[cfg(feature = "rsm-state")]
1428 rsm_environment,
1429 ).await {
1430 error!("Replication failed on channel close (data safe in WAL): {}", e);
1431 counter!("ht_journal_replication_batch_failures_total").increment(1);
1432 }
1433 break;
1434 }
1435 }
1436 }
1437
1438 _ = tokio::time::sleep(flush_interval) => {
1439 if !pending.is_empty() && last_flush.elapsed() >= flush_interval {
1440 if let Err(e) = Self::replicate_pending(
1441 &pool,
1442 persist_digests,
1443 max_batch_size,
1444 &checkpoint_path,
1445 &mut pending,
1446 #[cfg(feature = "rsm-state")]
1447 rsm_environment,
1448 ).await {
1449 error!(
1451 pending = pending.len(),
1452 "Replication flush failed (WAL tail will grow): {}",
1453 e
1454 );
1455 counter!("ht_journal_replication_batch_failures_total").increment(1);
1456 } else {
1457 last_flush = Instant::now();
1458 }
1459 let cp = Self::read_checkpoint_or_panic(&checkpoint_path);
1460 Self::emit_wal_checkpoint_metrics(&wal_path, cp.wal_offset);
1461 if cp.wal_offset > last_punched_offset {
1462 Self::punch_wal_hole(&wal_path, last_punched_offset, cp.wal_offset);
1463 last_punched_offset = cp.wal_offset;
1464 }
1465 }
1466 }
1467 }
1468 }
1469
1470 info!("Journal replication loop stopped");
1471 }
1472
1473 fn read_checkpoint_or_panic(path: &Path) -> WalCheckpointMetadata {
1474 read_checkpoint(path).unwrap_or_else(|e| {
1475 panic!(
1476 "CRITICAL_FAILURE: failed to read/parse checkpoint {}: {}",
1477 path.display(),
1478 e
1479 )
1480 })
1481 }
1482
1483 fn drain_replication_channel(
1484 receiver: &mut mpsc::Receiver<ReplicationMessage>,
1485 pending: &mut Vec<WalRecordWithOffset>,
1486 ) {
1487 while let Ok(msg) = receiver.try_recv() {
1488 match msg {
1489 ReplicationMessage::Entries(entries) => pending.extend(entries),
1490 ReplicationMessage::FlushBarrier(ack) => {
1491 let _ = ack.send(());
1494 }
1495 ReplicationMessage::Shutdown => break,
1496 }
1497 }
1498 }
1499
1500 fn punch_wal_hole(wal_path: &Path, from: u64, to: u64) {
1504 if to <= from {
1505 return;
1506 }
1507
1508 #[cfg(target_os = "linux")]
1509 {
1510 use std::os::unix::io::AsRawFd;
1511 let file = match OpenOptions::new().write(true).open(wal_path) {
1512 Ok(f) => f,
1513 Err(e) => {
1514 warn!("Failed to open WAL for hole punch: {}", e);
1515 return;
1516 }
1517 };
1518 let ret = unsafe {
1519 libc::fallocate(
1520 file.as_raw_fd(),
1521 libc::FALLOC_FL_PUNCH_HOLE | libc::FALLOC_FL_KEEP_SIZE,
1522 from as i64,
1523 (to - from) as i64,
1524 )
1525 };
1526 if ret == 0 {
1527 let reclaimed_mb = (to - from) / (1024 * 1024);
1528 counter!("ht_wal_hole_punch_bytes").increment(to - from);
1529 debug!(
1530 "WAL hole punch: reclaimed ~{}MB (offset {}..{})",
1531 reclaimed_mb, from, to
1532 );
1533 } else {
1534 let errno = std::io::Error::last_os_error();
1535 warn!(
1536 "WAL hole punch failed (non-fatal): {} (offset {}..{})",
1537 errno, from, to
1538 );
1539 }
1540 }
1541
1542 #[cfg(not(target_os = "linux"))]
1543 {
1544 let _ = (wal_path, from, to);
1545 }
1546 }
1547
1548 fn advance_checkpoint(
1549 checkpoint_path: &Path,
1550 last_offset: u64,
1551 progress: ReplicationProgress,
1552 ) -> Result<(), String> {
1553 let current = read_checkpoint(checkpoint_path)?;
1554 let next = WalCheckpointMetadata {
1555 wal_offset: last_offset,
1556 last_command_id: current.last_command_id.max(progress.last_command_id),
1557 last_l2_seq: current.last_l2_seq.max(progress.max_l2_seq),
1558 };
1559 write_checkpoint(checkpoint_path, next)
1560 }
1561
1562 fn emit_wal_checkpoint_metrics(wal_path: &Path, checkpoint_offset: u64) {
1564 gauge!("ht_wal_checkpoint_offset_bytes").set(checkpoint_offset as f64);
1565 match std::fs::metadata(wal_path) {
1566 Ok(meta) => {
1567 let unreplicated = meta.len().saturating_sub(checkpoint_offset);
1568 gauge!("ht_wal_unreplicated_bytes").set(unreplicated as f64);
1569 }
1570 Err(e) => {
1571 warn!("Failed to stat WAL for checkpoint metrics: {}", e);
1572 }
1573 }
1574 }
1575
1576 async fn replicate_pending(
1577 pool: &crate::db_handler::DbPool,
1578 persist_digests: bool,
1579 max_batch_size: usize,
1580 checkpoint_path: &Path,
1581 pending: &mut Vec<WalRecordWithOffset>,
1582 #[cfg(feature = "rsm-state")] rsm_environment: hypercall_db::ValidatorRsmEnvironment,
1583 ) -> Result<(), String> {
1584 while !pending.is_empty() {
1585 let chunk_len = next_replication_chunk_len(pending, max_batch_size);
1586 let chunk: Vec<WalRecordWithOffset> = pending.iter().take(chunk_len).cloned().collect();
1587
1588 match Self::replicate_chunk(
1589 pool,
1590 persist_digests,
1591 &chunk,
1592 #[cfg(feature = "rsm-state")]
1593 rsm_environment,
1594 )
1595 .await
1596 {
1597 Ok(progress) => {
1598 let last_offset = chunk
1599 .last()
1600 .expect("replication chunk unexpectedly empty")
1601 .end_offset;
1602 Self::advance_checkpoint(checkpoint_path, last_offset, progress)?;
1603 pending.drain(..chunk_len);
1604 gauge!("ht_journal_replication_backlog").set(pending.len() as f64);
1605 counter!("ht_journal_replication_chunks_total").increment(1);
1606 }
1607 Err(e) => {
1608 counter!("ht_journal_replication_errors_total").increment(1);
1609 warn!(
1610 "Journal replication chunk failed, batch remains pending for next flush cycle (pending={}): {}",
1611 pending.len(),
1612 e
1613 );
1614 tokio::time::sleep(REPLICATION_RETRY_DELAY).await;
1615 return Err(e);
1616 }
1617 }
1618 }
1619
1620 Ok(())
1621 }
1622
1623 async fn replicate_chunk(
1624 pool: &crate::db_handler::DbPool,
1625 persist_digests: bool,
1626 records: &[WalRecordWithOffset],
1627 #[cfg(feature = "rsm-state")] rsm_environment: hypercall_db::ValidatorRsmEnvironment,
1628 ) -> Result<ReplicationProgress, String> {
1629 if records.is_empty() {
1630 return Ok(ReplicationProgress {
1631 last_command_id: 0,
1632 max_l2_seq: 0,
1633 });
1634 }
1635
1636 let db_entries: Vec<JournalEntry> = records
1637 .iter()
1638 .map(WalRecordWithOffset::to_db_entry)
1639 .collect();
1640
1641 let pool_for_insert = pool.clone();
1642 #[cfg(feature = "rsm-state")]
1643 let block_records = records.to_vec();
1644 let insert_start = Instant::now();
1645 let insert_result = tokio::task::spawn_blocking(move || {
1646 Self::insert_batch(
1647 &pool_for_insert,
1648 &db_entries,
1649 persist_digests,
1650 #[cfg(feature = "rsm-state")]
1651 Some(Self::rsm_block_persistence_batch(
1652 RsmBlockPersistenceInput {
1653 environment: rsm_environment,
1654 records: &block_records,
1655 mode: RsmBlockPersistenceMode::Live,
1656 },
1657 )?),
1658 )
1659 })
1660 .await
1661 .map_err(|e| format!("spawn_blocking failed during replication insert: {}", e))?;
1662
1663 histogram!("ht_journal_postgres_insert_seconds")
1664 .record(insert_start.elapsed().as_secs_f64());
1665
1666 let (_inserted_count, mut inserted_commands) =
1667 insert_result.map_err(|e| format!("replication insert failed: {}", e))?;
1668
1669 if inserted_commands.is_empty() {
1670 let request_ids: Vec<hypercall_db_diesel::engine_enums::DbUuid> =
1672 records.iter().map(|r| r.record.request_uuid).collect();
1673 let pool_for_lookup = pool.clone();
1674 let lookup_result = tokio::task::spawn_blocking(move || {
1675 Self::lookup_command_ids_by_request_ids(&pool_for_lookup, &request_ids)
1676 })
1677 .await
1678 .map_err(|e| format!("spawn_blocking failed during command_id lookup: {}", e))?;
1679
1680 inserted_commands =
1681 lookup_result.map_err(|e| format!("command_id lookup failed: {}", e))?;
1682
1683 if inserted_commands.is_empty() {
1684 return Err(
1685 "replication insert returned no command IDs and lookup by request_id returned empty"
1686 .to_string(),
1687 );
1688 }
1689 }
1690
1691 let last_command_id = inserted_commands
1692 .iter()
1693 .map(|(_, command_id)| *command_id)
1694 .max()
1695 .expect("inserted_commands unexpectedly empty after rehydration");
1696
1697 let max_l2_seq = records
1698 .iter()
1699 .flat_map(|record| record.record.events.iter().filter_map(|e| e.l2_sequence))
1700 .max()
1701 .unwrap_or(0);
1702
1703 Ok(ReplicationProgress {
1704 last_command_id,
1705 max_l2_seq,
1706 })
1707 }
1708
1709 fn insert_batch(
1712 pool: &crate::db_handler::DbPool,
1713 entries: &[JournalEntry],
1714 persist_digests: bool,
1715 #[cfg(feature = "rsm-state")] rsm_blocks: Option<hypercall_db::EngineJournalRsmBlockBatch>,
1716 ) -> anyhow::Result<(usize, Vec<(hypercall_db_diesel::engine_enums::DbUuid, i64)>)> {
1717 let db_entries: Vec<_> = entries.iter().map(engine_journal_entry_insert).collect();
1718 let handler =
1719 hypercall_db_diesel::DatabaseHandler::with_pool_no_migrations(Arc::new(pool.clone()));
1720 #[cfg(feature = "rsm-state")]
1721 let rsm_blocks = rsm_blocks.as_ref();
1722 #[cfg(not(feature = "rsm-state"))]
1723 let rsm_blocks = None;
1724 let result =
1725 handler.insert_engine_journal_batch_sync(&db_entries, persist_digests, rsm_blocks)?;
1726 Ok((
1727 result.inserted_count,
1728 result
1729 .inserted_commands
1730 .into_iter()
1731 .map(|(request_uuid, command_id)| {
1732 (
1733 hypercall_db_diesel::engine_enums::DbUuid(request_uuid),
1734 command_id,
1735 )
1736 })
1737 .collect(),
1738 ))
1739 }
1740
1741 fn lookup_command_ids_by_request_ids(
1742 pool: &crate::db_handler::DbPool,
1743 request_ids: &[hypercall_db_diesel::engine_enums::DbUuid],
1744 ) -> anyhow::Result<Vec<(hypercall_db_diesel::engine_enums::DbUuid, i64)>> {
1745 let request_ids: Vec<_> = request_ids.iter().map(|id| id.0).collect();
1746 let handler =
1747 hypercall_db_diesel::DatabaseHandler::with_pool_no_migrations(Arc::new(pool.clone()));
1748 Ok(handler
1749 .lookup_engine_journal_command_ids_sync(&request_ids)?
1750 .into_iter()
1751 .map(|(request_uuid, command_id)| {
1752 (
1753 hypercall_db_diesel::engine_enums::DbUuid(request_uuid),
1754 command_id,
1755 )
1756 })
1757 .collect())
1758 }
1759
1760 pub fn start(self) -> tokio::task::JoinHandle<()> {
1762 tokio::spawn(
1763 async move {
1764 self.run().await;
1765 }
1766 .instrument(info_span!("journal_batcher")),
1767 )
1768 }
1769}
1770
1771fn engine_journal_entry_insert(entry: &JournalEntry) -> hypercall_db::EngineJournalEntryInsert {
1772 let pre_digest_json = serde_json::to_value(&entry.pre_digest)
1773 .expect("Failed to serialize pre_digest for journal insert");
1774 let post_digest_json = serde_json::to_value(&entry.post_digest)
1775 .expect("Failed to serialize post_digest for journal insert");
1776
1777 hypercall_db::EngineJournalEntryInsert {
1778 request_uuid: entry.request_uuid.0,
1779 received_ts_ms: entry.received_ts_ms,
1780 command_data: entry.command_data.clone(),
1781 response_data: entry.response_data.clone(),
1782 order_id: entry.order_id,
1783 command_type: entry
1784 .command_type_enum
1785 .map(|command| command.as_str().to_string()),
1786 duration_ms: entry.duration_ms,
1787 pre_digest_data: hypercall_types::serialize_to_wire_bytes(&pre_digest_json),
1788 post_digest_data: hypercall_types::serialize_to_wire_bytes(&post_digest_json),
1789 events: entry
1790 .events
1791 .iter()
1792 .map(|event| hypercall_db::EngineJournalEventInsert {
1793 event_topic: event.event_topic.clone(),
1794 event_key: event.event_key.clone(),
1795 event_data: event.event_data.clone(),
1796 l2_sequence: event.l2_sequence,
1797 event_type: event.event_type_enum.into(),
1798 })
1799 .collect(),
1800 outbox_appends: entry.outbox_appends.clone(),
1801 fill_side_effects: entry
1802 .fill_side_effects
1803 .iter()
1804 .map(|side_effect| hypercall_db::EngineJournalFillSideEffect {
1805 event_idx: side_effect.event_idx,
1806 trade_id: side_effect.trade_id,
1807 taker_ledger_delta: side_effect.taker_ledger_delta,
1808 maker_ledger_delta: side_effect.maker_ledger_delta,
1809 taker_premium_delta: side_effect.taker_premium_delta,
1810 maker_premium_delta: side_effect.maker_premium_delta,
1811 underlying_notional: side_effect.underlying_notional,
1812 })
1813 .collect(),
1814 cash_withdrawal_side_effect: entry.cash_withdrawal_side_effect.as_ref().map(
1815 |side_effect| hypercall_db::EngineJournalCashWithdrawalSideEffect {
1816 wallet: side_effect.wallet,
1817 request_id: side_effect.request_id.clone(),
1818 amount: side_effect.amount,
1819 balance_after: side_effect.balance_after,
1820 timestamp_ms: side_effect.timestamp_ms,
1821 },
1822 ),
1823 balance_updates: entry
1824 .balance_updates
1825 .iter()
1826 .cloned()
1827 .map(|update| hypercall_db::EngineJournalBalanceUpdate { update })
1828 .collect(),
1829 }
1830}
1831
#[cfg(feature = "rsm-state")]
/// Collects `command_identity_hash` for each entry whose command type is RSM-supported,
/// preserving entry order. Entries without a command type are skipped.
fn engine_command_identity_hashes(entries: &[JournalEntry]) -> Vec<[u8; 32]> {
    let mut hashes = Vec::new();
    for entry in entries {
        if entry
            .command_type_enum
            .is_some_and(rsm_state_supported_command)
        {
            hashes.push(entry.command_identity_hash);
        }
    }
    hashes
}
1844
#[cfg(feature = "rsm-state")]
/// The set of commitment roots produced for one RSM batch.
///
/// Built from a `BatchCommitment` and compared as a whole (`PartialEq`) against roots
/// recorded in the WAL during recovery. NOTE(review): it derives `Serialize`/`Deserialize`;
/// if it is ever encoded positionally (e.g. MessagePack arrays), field order is part of the
/// on-disk format — do not reorder fields without confirming the encoding.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
struct RsmBatchRoots {
    /// Copied from `BatchCommitment::state_root`.
    state_root: [u8; 32],
    /// Copied from `BatchCommitment::risk_root`.
    risk_root: [u8; 32],
    /// Copied from `BatchCommitment::command_mmr_root`.
    command_mmr_root: [u8; 32],
    /// Copied from `BatchCommitment::obligation_mmr_root`.
    obligation_mmr_root: [u8; 32],
    /// Copied from `BatchCommitment::intent_mmr_root`.
    intent_mmr_root: [u8; 32],
    /// Copied from `BatchCommitment::batch_root`; also checked against legacy block headers.
    batch_root: [u8; 32],
}
1855
#[cfg(feature = "rsm-state")]
impl From<hypercall_state_commitment::pipeline::BatchCommitment> for RsmBatchRoots {
    /// Extracts the six 32-byte roots from a batch commitment, unwrapping the newtype roots.
    fn from(commitment: hypercall_state_commitment::pipeline::BatchCommitment) -> Self {
        let hypercall_state_commitment::pipeline::BatchCommitment {
            state_root,
            risk_root,
            command_mmr_root,
            obligation_mmr_root,
            intent_mmr_root,
            batch_root,
            ..
        } = &commitment;
        Self {
            state_root: state_root.0,
            risk_root: risk_root.0,
            command_mmr_root: command_mmr_root.0,
            obligation_mmr_root: obligation_mmr_root.0,
            intent_mmr_root: intent_mmr_root.0,
            batch_root: *batch_root,
        }
    }
}
1869
#[cfg(feature = "rsm-state")]
/// Returns the first `block_roots` recorded among `records`, or `None` if none carry roots.
fn rsm_batch_roots_from_wal(records: &[WalRecordWithOffset]) -> Option<RsmBatchRoots> {
    for record in records {
        if let Some(roots) = record.block_roots {
            return Some(roots);
        }
    }
    None
}
1874
#[cfg(feature = "rsm-state")]
/// Re-applies RSM blocks found in recovered WAL records to the local commitment runtime.
///
/// Walks `records` in order. A record carrying a `block_header` starts a block; the block
/// spans that record plus every following record with the same `end_offset`. For each block:
/// - a `batch_seq` below `commitments.next_batch_version()` is skipped (presumably already
///   applied locally);
/// - a `batch_seq` above the next version is treated as a gap and fails;
/// - the count of RSM-supported command identity hashes must equal `header.command_count`;
/// - the state delta is rebuilt from the block's entries and prepared, and the resulting
///   roots are checked against the WAL: all roots when the WAL recorded them, otherwise only
///   the header's `batch_root` ("legacy" blocks);
/// - for legacy blocks the computed roots are back-filled into the header record's
///   `block_roots`; the prepared batch is then applied.
///
/// Records without a header that are not inside a block are skipped.
///
/// # Errors
/// Version gaps, command-count or root mismatches, and any error from building, preparing
/// or applying the batch.
fn apply_recovered_rsm_commitment_blocks(
    records: &mut [WalRecordWithOffset],
    commitments: &mut ValidatorRsmStateCommitmentRuntime,
) -> anyhow::Result<()> {
    let mut index = 0;
    while index < records.len() {
        let Some(header) = records[index].block_header.as_ref() else {
            index += 1;
            continue;
        };

        // The block is this record plus the following records that share its end_offset.
        let end_offset = records[index].end_offset;
        let block_end = records[index..]
            .iter()
            .position(|candidate| candidate.end_offset != end_offset)
            .map(|relative| index + relative)
            .unwrap_or(records.len());
        let block_records = &records[index..block_end];
        let expected_roots = rsm_batch_roots_from_wal(block_records);

        let next_version = commitments.next_batch_version();
        if header.batch_seq < next_version {
            index = block_end;
            continue;
        }
        if header.batch_seq > next_version {
            anyhow::bail!(
                "recovered RSM block version gap: WAL block {} but local commitment store next version is {}",
                header.batch_seq,
                next_version
            );
        }

        let entries: Vec<JournalEntry> = block_records
            .iter()
            .map(WalRecordWithOffset::to_db_entry)
            .collect();
        let identity_hashes = engine_command_identity_hashes(&entries);
        if identity_hashes.len() as u64 != header.command_count {
            anyhow::bail!(
                "recovered RSM block {} command count mismatch: header={} identities={}",
                header.batch_seq,
                header.command_count,
                identity_hashes.len()
            );
        }

        let delta = rsm_state_delta_from_entries(&entries, &identity_hashes, header.first_seq)?;
        let prepared = commitments.prepare_rsm_batch(&delta)?;
        let computed_roots: RsmBatchRoots = prepared.commitment.clone().into();
        match expected_roots {
            // WAL recorded full roots: every root must match.
            Some(expected_roots) if computed_roots != expected_roots => {
                anyhow::bail!(
                    "recovered RSM block {} root mismatch: WAL batch_root={} local batch_root={}",
                    header.batch_seq,
                    hex::encode(expected_roots.batch_root),
                    hex::encode(computed_roots.batch_root)
                );
            }
            Some(_) => {}
            // Legacy block: only the header's batch_root is available to check against.
            None if computed_roots.batch_root != header.batch_root => {
                anyhow::bail!(
                    "recovered legacy RSM block {} root mismatch: WAL header batch_root={} local batch_root={}",
                    header.batch_seq,
                    hex::encode(header.batch_root),
                    hex::encode(computed_roots.batch_root)
                );
            }
            // Legacy block verified: back-fill the full roots onto the header record.
            None => {
                records[index].block_roots = Some(computed_roots);
            }
        }
        commitments.apply_prepared_rsm_batch(prepared)?;

        index = block_end;
    }

    Ok(())
}
1955
#[cfg(feature = "rsm-state")]
/// Builds the RSM state delta for a block of journal entries.
///
/// Only entries with an RSM-supported command type contribute: their command types and
/// events feed `rsm_state_delta_from_parts`, and the delta's `global` leaf is then set from
/// `identity_hashes` and `first_seq`.
fn rsm_state_delta_from_entries(
    entries: &[JournalEntry],
    identity_hashes: &[[u8; 32]],
    first_seq: u64,
) -> anyhow::Result<StateDelta> {
    let supported: Vec<&JournalEntry> = entries
        .iter()
        .filter(|entry| {
            entry
                .command_type_enum
                .is_some_and(rsm_state_supported_command)
        })
        .collect();

    let command_types = supported
        .iter()
        .filter_map(|entry| entry.command_type_enum);
    let events = supported.iter().flat_map(|entry| entry.events.iter());

    let mut delta =
        rsm_state_delta_from_parts(command_types, events, identity_hashes, first_seq)?;
    delta.global = Some(rsm_global_leaf(identity_hashes, first_seq));
    Ok(delta)
}
1981
#[cfg(feature = "rsm-state")]
/// Assembles a `StateDelta` from command types, events and command identity hashes.
///
/// Every command type is validated first. Then each identity hash is appended as a command,
/// each event's domain-separated hash as an obligation, and each identity hash again as an
/// intent, all in input order. `first_seq` is currently unused here.
///
/// # Errors
/// If any command type is not supported by the RSM state materializer.
fn rsm_state_delta_from_parts<'a>(
    command_types: impl IntoIterator<Item = hypercall_db_diesel::engine_enums::CommandType>,
    events: impl IntoIterator<Item = &'a EventPayload>,
    identity_hashes: &[[u8; 32]],
    first_seq: u64,
) -> anyhow::Result<StateDelta> {
    if let Some(unsupported) = command_types
        .into_iter()
        .find(|command_type| !rsm_state_supported_command(*command_type))
    {
        anyhow::bail!(
            "unsupported validator RSM state materializer command type: {}",
            unsupported.as_str()
        );
    }
    let _ = first_seq;

    let with_commands = identity_hashes
        .iter()
        .copied()
        .fold(StateDelta::default(), append_command_delta);
    let with_obligations = events
        .into_iter()
        .map(rsm_event_payload_hash)
        .fold(with_commands, append_obligation_delta);
    let with_intents = identity_hashes
        .iter()
        .copied()
        .fold(with_obligations, append_intent_delta);
    Ok(with_intents)
}
2010
#[cfg(feature = "rsm-state")]
/// Returns `delta` with `command_hash` appended to its command list.
fn append_command_delta(delta: StateDelta, command_hash: [u8; 32]) -> StateDelta {
    let mut updated = delta;
    updated.commands.push(command_hash);
    updated
}
2016
#[cfg(feature = "rsm-state")]
/// Returns `delta` with `obligation_hash` appended to its obligation list.
fn append_obligation_delta(delta: StateDelta, obligation_hash: [u8; 32]) -> StateDelta {
    let mut updated = delta;
    updated.obligations.push(obligation_hash);
    updated
}
2022
#[cfg(feature = "rsm-state")]
/// Returns `delta` with `intent_hash` appended to its intent list.
fn append_intent_delta(delta: StateDelta, intent_hash: [u8; 32]) -> StateDelta {
    let mut updated = delta;
    updated.intents.push(intent_hash);
    updated
}
2028
#[cfg(feature = "rsm-state")]
/// Whether the validator RSM state materializer handles `command_type`.
/// Currently only `TickSnapshot` is supported.
fn rsm_state_supported_command(
    command_type: hypercall_db_diesel::engine_enums::CommandType,
) -> bool {
    use hypercall_db_diesel::engine_enums::CommandType;

    matches!(command_type, CommandType::TickSnapshot)
}
2042
#[cfg(feature = "rsm-state")]
/// Builds the global leaf for a block: the command-chain hash over `identity_hashes` and the
/// chain sequence after the block (`first_seq` plus the number of commands). Order and trade
/// ID counters are left at 0.
fn rsm_global_leaf(identity_hashes: &[[u8; 32]], first_seq: u64) -> GlobalLeaf {
    let command_chain_root = hypercall_blocks::compute_commands_hash(identity_hashes);
    let command_chain_seq = first_seq + identity_hashes.len() as u64;
    GlobalLeaf {
        next_order_id: 0,
        next_trade_id: 0,
        command_chain_root,
        command_chain_seq,
    }
}
2052
#[cfg(feature = "rsm-state")]
/// Hashes an event for the RSM obligation root: MessagePack (named fields) encoding,
/// then a domain-separated Keccak-256.
///
/// # Panics
/// If the event cannot be MessagePack-encoded.
fn rsm_event_payload_hash(event: &EventPayload) -> [u8; 32] {
    const EVENT_PAYLOAD_DOMAIN: &[u8] = b"hypercall:rsm:event-payload:v1";

    let encoded = rmp_serde::to_vec_named(event)
        .expect("EventPayload serialization should not fail for RSM obligation root");
    rsm_hash_with_domain(EVENT_PAYLOAD_DOMAIN, &encoded)
}
2059
#[cfg(feature = "rsm-state")]
/// Keccak-256 over `domain || be_u64(payload.len()) || payload`.
///
/// The big-endian length prefix keeps the domain/payload boundary unambiguous.
fn rsm_hash_with_domain(domain: &[u8], payload: &[u8]) -> [u8; 32] {
    use sha3::{Digest, Keccak256};

    let length_prefix = (payload.len() as u64).to_be_bytes();
    let mut hasher = Keccak256::new();
    for part in [domain, &length_prefix[..], payload] {
        hasher.update(part);
    }
    hasher.finalize().into()
}
2070
/// Serializable copy of a `JournalEntry` as persisted in the WAL.
///
/// Mirrors `JournalEntry` minus its runtime-only fields (`created_at`, `commit_ack`).
/// Fields marked `#[serde(default)]` decode to their `Default` value when absent, which
/// presumably keeps older WAL records readable — confirm against the WAL format history.
/// NOTE(review): if the WAL codec is positional (e.g. MessagePack arrays rather than maps),
/// field order is part of the on-disk format; do not reorder fields.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct WalJournalRecord {
    /// Request identifier; also used to look up command IDs after an idempotent re-insert.
    request_uuid: hypercall_db_diesel::engine_enums::DbUuid,
    received_ts_ms: u64,
    /// Opaque encoded command bytes.
    command_data: Vec<u8>,
    /// Opaque encoded response bytes, if any.
    response_data: Option<Vec<u8>>,
    order_id: Option<i64>,
    /// Engine state digest before the command was applied.
    pre_digest: EngineStateDigest,
    /// Engine state digest after the command was applied.
    post_digest: EngineStateDigest,
    duration_ms: u64,
    /// Events emitted by the command; their `l2_sequence` values feed the checkpoint.
    events: Vec<EventPayload>,
    outbox_appends: Vec<crate::directive_outbox::DirectiveOutboxAppend>,
    fill_side_effects: Vec<JournalFillSideEffect>,
    #[serde(default)]
    cash_withdrawal_side_effect: Option<JournalCashWithdrawalSideEffect>,
    #[serde(default)]
    balance_updates: Vec<hypercall_types::BalanceUpdate>,
    command_type_enum: Option<hypercall_db_diesel::engine_enums::CommandType>,
    /// Per-command identity hash used for the RSM command/intent MMRs.
    #[cfg(feature = "rsm-state")]
    #[serde(default)]
    command_identity_hash: [u8; 32],
    #[cfg(feature = "rsm-state")]
    #[serde(default)]
    rsm_state_digest: Option<RsmEngineStateDigest>,
}
2096
#[cfg(feature = "rsm-state")]
/// Block header as stored alongside WAL records.
type WalBlockRecord = BlockLogHeader;
#[cfg(feature = "rsm-state")]
/// A block-log entry whose commands are WAL journal records.
type WalBlockEntry = BlockLogEntry<WalJournalRecord>;
2101
#[cfg(feature = "rsm-state")]
/// A block header paired with the full set of batch roots computed for it.
/// Presumably the WAL encoding for block headers that carry roots — confirm with the writer.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct WalBlockRecordWithRoots {
    header: WalBlockRecord,
    roots: RsmBatchRoots,
}
2108
#[cfg(feature = "rsm-state")]
/// A complete block as one WAL unit: header, batch roots and the block's command records.
/// NOTE(review): how and when this framing is written is not visible here — confirm.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct WalDurableBlockEntry {
    header: WalBlockRecord,
    roots: RsmBatchRoots,
    commands: Vec<WalJournalRecord>,
}
2116
impl WalJournalRecord {
    /// Captures every persisted field of `entry`, cloning owned data.
    ///
    /// The runtime-only `created_at` timestamp and `commit_ack` sender are not captured.
    fn from_entry(entry: &JournalEntry) -> Self {
        Self {
            request_uuid: entry.request_uuid,
            received_ts_ms: entry.received_ts_ms,
            command_data: entry.command_data.clone(),
            response_data: entry.response_data.clone(),
            order_id: entry.order_id,
            pre_digest: entry.pre_digest.clone(),
            post_digest: entry.post_digest.clone(),
            duration_ms: entry.duration_ms,
            events: entry.events.clone(),
            outbox_appends: entry.outbox_appends.clone(),
            fill_side_effects: entry.fill_side_effects.clone(),
            cash_withdrawal_side_effect: entry.cash_withdrawal_side_effect.clone(),
            balance_updates: entry.balance_updates.clone(),
            command_type_enum: entry.command_type_enum,
            #[cfg(feature = "rsm-state")]
            command_identity_hash: entry.command_identity_hash,
            #[cfg(feature = "rsm-state")]
            rsm_state_digest: entry.rsm_state_digest.clone(),
        }
    }
}
2141
/// A WAL journal record paired with its position in the WAL.
#[derive(Debug, Clone)]
struct WalRecordWithOffset {
    record: WalJournalRecord,
    /// WAL byte offset used as this record's replication boundary (presumably the end of the
    /// frame containing it). Replication advances the checkpoint to the last `end_offset` of a
    /// chunk, and records sharing an `end_offset` (e.g. one block's commands) are never split
    /// across chunks.
    end_offset: u64,
    /// Block header, when this record starts an RSM block (presumably only the first record
    /// of a block carries it — confirm with the WAL reader).
    #[cfg(feature = "rsm-state")]
    block_header: Option<WalBlockRecord>,
    /// Batch roots read from the WAL for the block, or back-filled during recovery.
    #[cfg(feature = "rsm-state")]
    block_roots: Option<RsmBatchRoots>,
}
2151
impl WalRecordWithOffset {
    /// Rebuilds a `JournalEntry` from the persisted record for database insertion.
    ///
    /// Runtime-only fields are synthesized: `created_at` is "now" and `commit_ack` is `None`,
    /// so no submitter is acknowledged by the replicated copy.
    fn to_db_entry(&self) -> JournalEntry {
        JournalEntry {
            request_uuid: self.record.request_uuid,
            received_ts_ms: self.record.received_ts_ms,
            command_data: self.record.command_data.clone(),
            response_data: self.record.response_data.clone(),
            order_id: self.record.order_id,
            pre_digest: self.record.pre_digest.clone(),
            post_digest: self.record.post_digest.clone(),
            duration_ms: self.record.duration_ms,
            events: self.record.events.clone(),
            outbox_appends: self.record.outbox_appends.clone(),
            fill_side_effects: self.record.fill_side_effects.clone(),
            cash_withdrawal_side_effect: self.record.cash_withdrawal_side_effect.clone(),
            balance_updates: self.record.balance_updates.clone(),
            created_at: Instant::now(),
            commit_ack: None,
            command_type_enum: self.record.command_type_enum,
            #[cfg(feature = "rsm-state")]
            command_identity_hash: self.record.command_identity_hash,
            #[cfg(feature = "rsm-state")]
            rsm_state_digest: self.record.rsm_state_digest.clone(),
        }
    }
}
2178
2179fn next_replication_chunk_len(pending: &[WalRecordWithOffset], max_batch_size: usize) -> usize {
2186 if pending.is_empty() {
2187 return 0;
2188 }
2189
2190 let mut chunk_len = max_batch_size.max(1).min(pending.len());
2191 while chunk_len < pending.len()
2192 && pending[chunk_len - 1].end_offset == pending[chunk_len].end_offset
2193 {
2194 chunk_len += 1;
2195 }
2196 chunk_len
2197}
2198
/// Whether a decoded replay record should start a new batch instead of joining the current one.
///
/// A non-empty batch is closed when adding `decoded_record_commands` would exceed
/// `max_batch_size` (treated as at least 1). An empty batch always accepts the record, even
/// an oversized one, so replay can always make progress.
fn should_defer_replay_record(
    current_batch_commands: usize,
    decoded_record_commands: usize,
    max_batch_size: usize,
) -> bool {
    if current_batch_commands == 0 {
        return false;
    }
    let limit = max_batch_size.max(1);
    current_batch_commands + decoded_record_commands > limit
}
2207
/// Writer-side state for the append-only WAL file.
struct WalState {
    /// Path of the WAL file.
    wal_path: PathBuf,
    /// Replication checkpoint path derived from `wal_path` via `checkpoint_path_for_wal`.
    checkpoint_path: PathBuf,
    /// Handle to the WAL, opened for append + read.
    file: File,
    /// Logical write position; initialised to the file length when opened.
    next_offset: u64,
    /// End of the region already reserved (on Linux via `fallocate(FALLOC_FL_KEEP_SIZE)`);
    /// initialised to `next_offset`, i.e. nothing reserved yet.
    preallocated_end: u64,
}
2217
2218impl WalState {
2219 #[cfg(test)]
2221 fn open_at(wal_path: PathBuf) -> Result<Self, String> {
2222 if let Some(parent) = wal_path.parent() {
2223 if !parent.as_os_str().is_empty() {
2224 create_dir_all(parent).map_err(|e| {
2225 format!("failed to create WAL directory {}: {}", parent.display(), e)
2226 })?;
2227 }
2228 }
2229
2230 let checkpoint_path = checkpoint_path_for_wal(&wal_path);
2231
2232 let file = OpenOptions::new()
2233 .create(true)
2234 .append(true)
2235 .read(true)
2236 .open(&wal_path)
2237 .map_err(|e| format!("failed to open WAL file {}: {}", wal_path.display(), e))?;
2238
2239 let next_offset = file
2240 .metadata()
2241 .map_err(|e| format!("failed to stat WAL file {}: {}", wal_path.display(), e))?
2242 .len();
2243
2244 Ok(Self {
2245 wal_path,
2246 checkpoint_path,
2247 file,
2248 next_offset,
2249 preallocated_end: next_offset,
2250 })
2251 }
2252
2253 fn open(wal_path: PathBuf) -> Result<Self, String> {
2254 if let Some(parent) = wal_path.parent() {
2255 if !parent.as_os_str().is_empty() {
2256 create_dir_all(parent).map_err(|e| {
2257 format!("failed to create WAL directory {}: {}", parent.display(), e)
2258 })?;
2259
2260 Self::emit_disk_space_metrics(parent);
2261 }
2262 }
2263
2264 let checkpoint_path = checkpoint_path_for_wal(&wal_path);
2265
2266 #[cfg(unix)]
2267 let file = {
2268 use std::os::unix::fs::OpenOptionsExt;
2269 OpenOptions::new()
2270 .create(true)
2271 .append(true)
2272 .read(true)
2273 .mode(0o600)
2274 .open(&wal_path)
2275 .map_err(|e| format!("failed to open WAL file {}: {}", wal_path.display(), e))?
2276 };
2277 #[cfg(not(unix))]
2278 let file = OpenOptions::new()
2279 .create(true)
2280 .append(true)
2281 .read(true)
2282 .open(&wal_path)
2283 .map_err(|e| format!("failed to open WAL file {}: {}", wal_path.display(), e))?;
2284
2285 let next_offset = file
2286 .metadata()
2287 .map_err(|e| format!("failed to stat WAL file {}: {}", wal_path.display(), e))?
2288 .len();
2289
2290 let cp = read_checkpoint(&checkpoint_path).unwrap_or_else(|e| {
2293 panic!(
2294 "CRITICAL_FAILURE: failed to read/parse checkpoint {}: {}",
2295 checkpoint_path.display(),
2296 e
2297 )
2298 });
2299 if cp.wal_offset > 0 {
2300 info!(
2301 "WAL startup: punching replicated prefix (0..{} bytes, file size={} bytes)",
2302 cp.wal_offset, next_offset
2303 );
2304 EngineJournalBatcher::punch_wal_hole(&wal_path, 0, cp.wal_offset);
2305
2306 if let Some(parent) = wal_path.parent() {
2307 Self::emit_disk_space_metrics(parent);
2308 }
2309 }
2310
2311 Ok(Self {
2312 wal_path,
2313 checkpoint_path,
2314 file,
2315 next_offset,
2316 preallocated_end: next_offset,
2317 })
2318 }
2319
2320 fn emit_disk_space_metrics(path: &Path) {
2322 #[cfg(unix)]
2323 {
2324 use std::ffi::CString;
2325 let c_path = match CString::new(path.to_string_lossy().as_bytes()) {
2326 Ok(p) => p,
2327 Err(_) => return,
2328 };
2329 unsafe {
2330 let mut stat: libc::statvfs = std::mem::zeroed();
2331 if libc::statvfs(c_path.as_ptr(), &mut stat) == 0 {
2332 let total_bytes = stat.f_frsize as u64 * stat.f_blocks as u64;
2333 let avail_bytes = stat.f_frsize as u64 * stat.f_bavail as u64;
2334 let used_bytes =
2335 total_bytes.saturating_sub(stat.f_frsize as u64 * stat.f_bfree as u64);
2336 let pct = if total_bytes > 0 {
2337 (used_bytes as f64 / total_bytes as f64) * 100.0
2338 } else {
2339 0.0
2340 };
2341
2342 gauge!("ht_wal_volume_total_bytes").set(total_bytes as f64);
2343 gauge!("ht_wal_volume_used_bytes").set(used_bytes as f64);
2344 gauge!("ht_wal_volume_avail_bytes").set(avail_bytes as f64);
2345 gauge!("ht_wal_volume_usage_pct").set(pct);
2346 }
2347 }
2348 }
2349 }
2350
2351 fn ensure_preallocated(&mut self) -> u64 {
2357 if self.next_offset < self.preallocated_end {
2358 return 0;
2359 }
2360
2361 #[cfg(target_os = "linux")]
2362 {
2363 let offset = self.preallocated_end as i64;
2364 let len = WAL_PREALLOC_CHUNK as i64;
2365 let ret = unsafe {
2366 libc::fallocate(
2367 self.file.as_raw_fd(),
2368 libc::FALLOC_FL_KEEP_SIZE,
2369 offset,
2370 len,
2371 )
2372 };
2373 if ret == 0 {
2374 self.preallocated_end += WAL_PREALLOC_CHUNK;
2375 WAL_PREALLOC_CHUNK
2376 } else {
2377 let errno = std::io::Error::last_os_error();
2378 warn!(
2379 "fallocate failed (non-fatal, continuing without preallocation): {}",
2380 errno
2381 );
2382 if let Some(parent) = self.wal_path.parent() {
2384 Self::emit_disk_space_metrics(parent);
2385 }
2386 0
2387 }
2388 }
2389
2390 #[cfg(not(target_os = "linux"))]
2391 {
2392 0
2393 }
2394 }
2395
2396 #[cfg_attr(feature = "rsm-state", allow(dead_code))]
2401 fn append_batch_no_sync(
2402 &mut self,
2403 entries: &[JournalEntry],
2404 ) -> Result<Vec<WalRecordWithOffset>, String> {
2405 if entries.is_empty() {
2406 return Ok(Vec::new());
2407 }
2408
2409 self.ensure_preallocated();
2411
2412 let mut out = Vec::with_capacity(entries.len());
2413
2414 let mut writer = BufWriter::new(&self.file);
2415
2416 for entry in entries {
2417 let record = WalJournalRecord::from_entry(entry);
2418 let payload = rmp_serde::to_vec_named(&record).map_err(|e| {
2419 format!(
2420 "failed to serialize WAL record for {}: {}",
2421 entry.request_uuid, e
2422 )
2423 })?;
2424
2425 write_frame(
2426 &mut writer,
2427 &payload,
2428 &mut self.next_offset,
2429 &format!("record for {}", entry.request_uuid),
2430 )?;
2431
2432 out.push(WalRecordWithOffset {
2433 record,
2434 end_offset: self.next_offset,
2435 #[cfg(feature = "rsm-state")]
2436 block_header: None,
2437 #[cfg(feature = "rsm-state")]
2438 block_roots: None,
2439 });
2440 }
2441
2442 writer
2443 .flush()
2444 .map_err(|e| format!("failed to flush WAL BufWriter: {}", e))?;
2445
2446 Ok(out)
2447 }
2448
2449 #[cfg(feature = "rsm-state")]
2451 fn append_block<T: Serialize>(&mut self, block: &T) -> Result<u64, String> {
2452 self.ensure_preallocated();
2453
2454 let payload = rmp_serde::to_vec_named(block)
2455 .map_err(|e| format!("failed to serialize WAL block: {}", e))?;
2456
2457 let mut writer = BufWriter::new(&self.file);
2458 write_frame(&mut writer, &payload, &mut self.next_offset, "block")?;
2459 writer
2460 .flush()
2461 .map_err(|e| format!("failed to flush WAL BufWriter: {}", e))?;
2462
2463 Ok(self.next_offset)
2464 }
2465
2466 #[cfg(test)]
2468 fn append_batch(
2469 &mut self,
2470 entries: &[JournalEntry],
2471 ) -> Result<Vec<WalRecordWithOffset>, String> {
2472 let result = self.append_batch_no_sync(entries)?;
2473 self.file.sync_data().map_err(|e| {
2474 format!(
2475 "test sync_data failed for WAL {}: {}",
2476 self.wal_path.display(),
2477 e
2478 )
2479 })?;
2480 Ok(result)
2481 }
2482
    #[cfg(test)]
    /// Test-only replay: decodes every WAL frame that follows the checkpointed offset.
    ///
    /// Each frame is `[u32 LE payload length][u32 LE CRC][payload]`. A missing WAL file
    /// yields an empty vector.
    ///
    /// A torn tail ends replay and keeps the records decoded so far:
    /// - a declared payload longer than the remaining bytes, a short CRC header, or a
    ///   short payload is logged as a warning;
    /// - a truncated length prefix ends replay silently.
    ///
    /// # Errors
    /// Returns `Err` if:
    /// - the checkpoint cannot be read;
    /// - the WAL cannot be opened, stat'ed or seeked;
    /// - the checkpoint offset lies beyond the end of the file;
    /// - a non-EOF read error occurs;
    /// - a CRC-valid payload matches none of the known record shapes.
    ///
    /// # Panics
    /// Panics with `CRITICAL_FAILURE` on a zero length prefix, on a CRC mismatch, or
    /// (with `rsm-state`) on a legacy rootless `WalBlockEntry`.
    fn read_unreplicated_records(
        wal_path: &Path,
        checkpoint_path: &Path,
    ) -> Result<Vec<WalRecordWithOffset>, String> {
        if !wal_path.exists() {
            return Ok(Vec::new());
        }

        let checkpoint = read_checkpoint(checkpoint_path)?;
        let mut file = OpenOptions::new().read(true).open(wal_path).map_err(|e| {
            format!(
                "failed to open WAL {} for replay: {}",
                wal_path.display(),
                e
            )
        })?;

        let file_len = file
            .metadata()
            .map_err(|e| format!("failed to stat WAL {}: {}", wal_path.display(), e))?
            .len();

        if checkpoint.wal_offset > file_len {
            return Err(format!(
                "WAL checkpoint {} exceeds file length {} for {}",
                checkpoint.wal_offset,
                file_len,
                wal_path.display()
            ));
        }

        // Everything before the checkpoint offset has already been replicated.
        file.seek(SeekFrom::Start(checkpoint.wal_offset))
            .map_err(|e| format!("failed to seek WAL replay start: {}", e))?;

        // `offset` tracks the absolute file position of the next unread byte.
        let mut offset = checkpoint.wal_offset;
        let mut out = Vec::new();

        loop {
            let mut len_buf = [0u8; 4];
            match file.read_exact(&mut len_buf) {
                Ok(()) => {}
                // Clean end of file, or a length prefix cut short: stop without a warning.
                Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => break,
                Err(e) => {
                    return Err(format!("failed reading WAL length prefix: {}", e));
                }
            }

            let payload_len = u32::from_le_bytes(len_buf) as usize;
            if payload_len == 0 {
                panic!(
                    "CRITICAL_FAILURE: WAL record length was zero at offset {} in {}. Data corruption detected.",
                    offset,
                    wal_path.display()
                );
            }
            offset += 4;

            // Cheap guard against a torn or garbage length. Only the payload is compared
            // (the 4 CRC bytes are not included); an overshoot that slips past this
            // check is caught by the EOF branches below.
            let remaining = file_len.saturating_sub(offset);
            if payload_len as u64 > remaining {
                warn!(
                    "WAL declared payload_len={} but only {} bytes remain at offset {} in {}, \
                     truncating tail and replaying prior durable records only",
                    payload_len,
                    remaining,
                    offset,
                    wal_path.display(),
                );
                break;
            }

            let mut crc_buf = [0u8; 4];
            match file.read_exact(&mut crc_buf) {
                Ok(()) => {}
                Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => {
                    warn!(
                        "WAL ended with incomplete CRC header at offset {} in {} ({}), \
                         truncating tail and replaying prior durable records only",
                        offset,
                        wal_path.display(),
                        e
                    );
                    break;
                }
                Err(e) => return Err(format!("failed reading WAL crc prefix: {}", e)),
            }
            offset += 4;

            let expected_crc = u32::from_le_bytes(crc_buf);
            let mut payload = vec![0u8; payload_len];

            match file.read_exact(&mut payload) {
                Ok(()) => {}
                Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => {
                    warn!(
                        "WAL ended with incomplete payload at offset {} in {} ({}), \
                         truncating tail and replaying prior durable records only",
                        offset,
                        wal_path.display(),
                        e
                    );
                    break;
                }
                Err(e) => return Err(format!("failed reading WAL payload: {}", e)),
            }
            // From here on, `offset` is the end offset of the current frame.
            offset += payload_len as u64;

            let actual_crc = WAL_CRC.checksum(&payload);
            if actual_crc != expected_crc {
                panic!(
                    "CRITICAL_FAILURE: WAL CRC mismatch at end_offset {} in {}: expected {}, got {}. Data corruption detected.",
                    offset,
                    wal_path.display(),
                    expected_crc,
                    actual_crc
                );
            }

            // The order of the decode attempts below matters. Each shape is tried in
            // turn:
            //   1. durable block (header + roots + commands);
            //   2. legacy rootless block, which is fatal;
            //   3. plain journal record;
            //   4. bare block record, which is skipped.

            // Every command in a durable block shares the block's end offset. Only the
            // first one carries the header and roots.
            #[cfg(feature = "rsm-state")]
            if let Ok(block_entry) = rmp_serde::from_slice::<WalDurableBlockEntry>(&payload) {
                let header = block_entry.header;
                let roots = block_entry.roots;
                for (index, cmd) in block_entry.commands.into_iter().enumerate() {
                    out.push(WalRecordWithOffset {
                        record: cmd,
                        end_offset: offset,
                        block_header: (index == 0).then_some(header.clone()),
                        block_roots: (index == 0).then_some(roots),
                    });
                }
                continue;
            }

            #[cfg(feature = "rsm-state")]
            if rmp_serde::from_slice::<WalBlockEntry>(&payload).is_ok() {
                panic!(
                    "CRITICAL_FAILURE: legacy rootless RSM WAL block at end_offset {} is unsupported after durable root cutover",
                    offset
                );
            }

            if let Ok(record) = rmp_serde::from_slice::<WalJournalRecord>(&payload) {
                out.push(WalRecordWithOffset {
                    record,
                    end_offset: offset,
                    #[cfg(feature = "rsm-state")]
                    block_header: None,
                    #[cfg(feature = "rsm-state")]
                    block_roots: None,
                });
                continue;
            }

            // A standalone block record yields no replayable command.
            #[cfg(feature = "rsm-state")]
            if rmp_serde::from_slice::<WalBlockRecord>(&payload).is_ok() {
                continue;
            }

            return Err(format!(
                "failed to deserialize WAL payload at end_offset {}",
                offset
            ));
        }

        Ok(out)
    }
2650}
2651
2652#[cfg(target_os = "linux")]
2658fn wal_fsync_io_uring(ring: &mut io_uring::IoUring, file: &File) -> Result<(), String> {
2659 let fd = io_uring::types::Fd(file.as_raw_fd());
2660 let fsync_e = io_uring::opcode::Fsync::new(fd)
2661 .flags(io_uring::types::FsyncFlags::DATASYNC)
2662 .build()
2663 .user_data(0x42);
2664
2665 unsafe {
2666 ring.submission()
2667 .push(&fsync_e)
2668 .map_err(|e| format!("io_uring submission push failed: {:?}", e))?;
2669 }
2670
2671 ring.submit_and_wait(1)
2672 .map_err(|e| format!("io_uring submit_and_wait failed: {}", e))?;
2673
2674 let mut cq = ring.completion();
2675 cq.sync();
2676 let cqe = cq
2677 .next()
2678 .ok_or_else(|| "io_uring completion queue empty after submit_and_wait".to_string())?;
2679
2680 if cqe.result() < 0 {
2681 return Err(format!(
2682 "io_uring FSYNC failed with errno {}",
2683 -cqe.result()
2684 ));
2685 }
2686
2687 Ok(())
2688}
2689
2690fn wal_writer_thread_main(
2696 mut wal_state: WalState,
2697 rx: std::sync::mpsc::Receiver<WalWriterMessage>,
2698 replication_tx: mpsc::Sender<ReplicationMessage>,
2699 #[cfg(feature = "rsm-state")] mut rsm_state: RsmWalWriterState,
2700) {
2701 #[cfg(target_os = "linux")]
2704 let mut ring: Option<io_uring::IoUring> = match io_uring::IoUring::new(8) {
2705 Ok(r) => {
2706 info!("WAL writer using io_uring for DATASYNC");
2707 Some(r)
2708 }
2709 Err(e) => {
2710 warn!(
2711 "io_uring unavailable ({}), WAL writer falling back to blocking fdatasync",
2712 e
2713 );
2714 None
2715 }
2716 };
2717
2718 #[cfg(target_os = "linux")]
2720 fn do_fsync(ring: &mut Option<io_uring::IoUring>, file: &File, wal_path: &Path) {
2721 if let Some(ref mut r) = ring {
2722 if let Err(e) = wal_fsync_io_uring(r, file) {
2723 warn!(
2726 "io_uring FSYNC failed ({}), disabling io_uring and falling back to fdatasync",
2727 e
2728 );
2729 counter!("ht_journal_wal_io_uring_fallbacks").increment(1);
2730 *ring = None;
2731 } else {
2733 return;
2734 }
2735 }
2736 if let Err(e) = file.sync_data() {
2737 if let Some(parent) = wal_path.parent() {
2738 WalState::emit_disk_space_metrics(parent);
2739 }
2740 panic!(
2741 "CRITICAL_FAILURE: fdatasync failed for WAL {}: {}. Engine must stop.",
2742 wal_path.display(),
2743 e
2744 );
2745 }
2746 }
2747
2748 #[cfg(not(target_os = "linux"))]
2749 fn do_fsync(file: &File, wal_path: &Path) {
2750 if let Err(e) = file.sync_data() {
2751 if let Some(parent) = wal_path.parent() {
2752 WalState::emit_disk_space_metrics(parent);
2753 }
2754 panic!(
2755 "CRITICAL_FAILURE: fdatasync failed for WAL {}: {}. Engine must stop.",
2756 wal_path.display(),
2757 e
2758 );
2759 }
2760 }
2761
2762 loop {
2763 match rx.recv() {
2764 Ok(WalWriterMessage::WriteBatch {
2765 entries,
2766 ack_senders,
2767 completion,
2768 }) => {
2769 let wal_start = Instant::now();
2770 let entry_count = entries.len();
2771
2772 #[cfg(feature = "rsm-state")]
2773 let prepared_block = rsm_state.prepare_block(&entries);
2774
2775 #[cfg(feature = "rsm-state")]
2776 let append_result = if let Some(prepared) = prepared_block.as_ref() {
2777 let commands: Vec<WalJournalRecord> =
2778 entries.iter().map(WalJournalRecord::from_entry).collect();
2779 let block_entry = WalDurableBlockEntry {
2780 header: prepared.block.header.clone(),
2781 roots: prepared.block.roots,
2782 commands,
2783 };
2784 wal_state.append_block(&block_entry).map(|end_offset| {
2785 let header = block_entry.header.clone();
2786 let roots = block_entry.roots;
2787 block_entry
2788 .commands
2789 .into_iter()
2790 .enumerate()
2791 .map(|(index, cmd)| WalRecordWithOffset {
2792 record: cmd,
2793 end_offset,
2794 block_header: (index == 0).then_some(header.clone()),
2795 block_roots: (index == 0).then_some(roots),
2796 })
2797 .collect::<Vec<_>>()
2798 })
2799 } else {
2800 wal_state.append_batch_no_sync(&entries)
2801 };
2802
2803 #[cfg(not(feature = "rsm-state"))]
2804 let append_result = wal_state.append_batch_no_sync(&entries);
2805
2806 match append_result {
2807 Ok(wal_records) => {
2808 #[cfg(feature = "rsm-state")]
2809 let mut wal_records = wal_records;
2810
2811 #[cfg(target_os = "linux")]
2812 do_fsync(&mut ring, &wal_state.file, &wal_state.wal_path);
2813 #[cfg(not(target_os = "linux"))]
2814 do_fsync(&wal_state.file, &wal_state.wal_path);
2815
2816 #[cfg(feature = "rsm-state")]
2817 if let Some(prepared) = prepared_block {
2818 let applied_block = rsm_state.apply_prepared_block(prepared);
2819 for record in wal_records.iter_mut() {
2820 if record.block_header.is_some() {
2821 record.block_header = Some(applied_block.header.clone());
2822 record.block_roots = Some(applied_block.roots);
2823 break;
2824 }
2825 }
2826 }
2827
2828 histogram!("ht_journal_wal_append_seconds")
2829 .record(wal_start.elapsed().as_secs_f64());
2830 counter!("ht_journal_wal_batches_total").increment(1);
2831 counter!("ht_journal_wal_entries_total").increment(entry_count as u64);
2832 gauge!("ht_wal_file_size_bytes").set(wal_state.next_offset as f64);
2833
2834 if let Some(parent) = wal_state.wal_path.parent() {
2836 WalState::emit_disk_space_metrics(parent);
2837 }
2838
2839 for tx in ack_senders.into_iter().flatten() {
2841 let _ = tx.send(());
2842 }
2843
2844 if let Err(e) =
2846 replication_tx.blocking_send(ReplicationMessage::Entries(wal_records))
2847 {
2848 error!(
2852 "CRITICAL_FAILURE: Replication channel closed unexpectedly \
2853 (replication task likely crashed): {}. Aborting process.",
2854 e
2855 );
2856 std::process::abort();
2857 }
2858
2859 if let Some(tx) = completion {
2861 let _ = tx.send(());
2862 }
2863 }
2864 Err(e) => {
2865 if let Some(parent) = wal_state.wal_path.parent() {
2866 WalState::emit_disk_space_metrics(parent);
2867 }
2868 panic!(
2869 "CRITICAL_FAILURE: WAL append failed: {}. WAL size={} bytes. Engine must stop.",
2870 e, wal_state.next_offset
2871 );
2872 }
2873 }
2874 }
2875 Ok(WalWriterMessage::FlushBarrier { completion }) => {
2876 let _ = completion.send(());
2879 }
2880 Ok(WalWriterMessage::Shutdown) | Err(_) => {
2881 info!("WAL writer thread shutting down");
2882 break;
2883 }
2884 }
2885 }
2886}
2887
/// Reference-counted (`Arc`) handle to a `JournalBatchSender`, so that a single sender
/// can be held by several owners.
pub type SharedJournalBatchSender = Arc<JournalBatchSender>;
2890
2891#[cfg(test)]
2892mod tests {
2893 use super::*;
2894 use diesel::RunQueryDsl;
2895
2896 #[test]
2897 fn test_config_defaults() {
2898 let config = JournalBatcherConfig::default();
2899 assert_eq!(config.channel_capacity, DEFAULT_CHANNEL_CAPACITY);
2900 assert_eq!(config.max_batch_size, 100);
2901 assert_eq!(config.flush_interval_ms, 10);
2902 }
2903
    /// Builds a `JournalEntry` with every side-effect input populated: event, outbox
    /// append, fill accounting, cash withdrawal and balance update. It then checks that
    /// `engine_journal_entry_insert` carries each one through to the DB insert
    /// projection.
    #[test]
    fn engine_journal_insert_projection_preserves_transactional_side_effect_inputs() {
        let request_uuid = hypercall_db_diesel::engine_enums::DbUuid(uuid::Uuid::new_v4());
        let wallet = hypercall_types::WalletAddress::from([1u8; 20]);
        let account = hypercall_types::WalletAddress::from([2u8; 20]);
        let signer = hypercall_types::WalletAddress::from([3u8; 20]);
        let destination = hypercall_types::WalletAddress::from([4u8; 20]);

        let outbox_append = hypercall_db::DirectiveOutboxAppend::needs_rsm_signature(
            "directive-1".to_string(),
            hypercall_types::directives::ActionKey::SystemWithdrawToken,
            wallet,
            account,
            signer,
            17,
            "idem-1".to_string(),
            vec![10, 11, 12],
            1234,
            Some(5678),
        );
        // Taker and maker values are mirror images, so a swapped projection would be
        // detected.
        let fill_accounting = hypercall_types::FillAccounting {
            trade_id: 42,
            taker_realized_pnl: Decimal::new(-125, 2),
            maker_realized_pnl: Decimal::new(125, 2),
            taker_premium_delta: Decimal::new(-7, 2),
            maker_premium_delta: Decimal::new(7, 2),
            taker_net_cash_delta: Decimal::new(-132, 2),
            maker_net_cash_delta: Decimal::new(132, 2),
        };
        let fill_side_effect = JournalFillSideEffect::from_fill_accounting(0, &fill_accounting);
        let cash_withdrawal_side_effect = JournalCashWithdrawalSideEffect {
            wallet: destination,
            request_id: "cash-withdrawal-1".to_string(),
            amount: Decimal::new(2500, 2),
            balance_after: Decimal::new(7500, 2),
            timestamp_ms: 9999,
        };
        let balance_update = hypercall_types::BalanceUpdate {
            balance_update_seq: 7,
            wallet,
            delta: Decimal::new(2500, 2),
            balance_after: Decimal::new(7500, 2),
            reason: hypercall_types::BalanceUpdateReason::Deposit,
            reference_id: Some("deposit-1".to_string()),
            source_command_id: None,
            timestamp_ms: 9999,
        };
        // Pre and post digests differ on every counter, so the serialized digests must
        // differ as well.
        let entry = JournalEntry {
            request_uuid,
            received_ts_ms: 1000,
            command_data: vec![1, 2, 3],
            response_data: Some(vec![4, 5]),
            order_id: Some(9),
            pre_digest: EngineStateDigest {
                next_order_id: 10,
                next_trade_id: 20,
                l2_seq: 30,
                symbols: Default::default(),
            },
            post_digest: EngineStateDigest {
                next_order_id: 11,
                next_trade_id: 21,
                l2_seq: 31,
                symbols: Default::default(),
            },
            duration_ms: 6,
            events: vec![EventPayload {
                event_topic: "order-updates".to_string(),
                event_key: Some("order-9".to_string()),
                event_data: vec![0, 1, 2],
                l2_sequence: Some(44),
                event_type_enum: hypercall_db_diesel::engine_enums::EventType::OrderUpdate,
            }],
            outbox_appends: vec![outbox_append.clone()],
            fill_side_effects: vec![fill_side_effect.clone()],
            cash_withdrawal_side_effect: Some(cash_withdrawal_side_effect.clone()),
            balance_updates: vec![balance_update.clone()],
            created_at: Instant::now(),
            commit_ack: None,
            command_type_enum: Some(hypercall_db_diesel::engine_enums::CommandType::CreateOrder),
            #[cfg(feature = "rsm-state")]
            command_identity_hash: [5u8; 32],
            #[cfg(feature = "rsm-state")]
            rsm_state_digest: Some(crate::rsm::engine_snapshot::EngineStateDigest::empty()),
        };

        let projected = engine_journal_entry_insert(&entry);

        // Core command fields.
        assert_eq!(projected.request_uuid, request_uuid.0);
        assert_eq!(projected.received_ts_ms, entry.received_ts_ms);
        assert_eq!(projected.command_data, entry.command_data);
        assert_eq!(projected.response_data, entry.response_data);
        assert_eq!(projected.order_id, entry.order_id);
        assert_eq!(projected.command_type.as_deref(), Some("CreateOrder"));
        assert_eq!(projected.duration_ms, entry.duration_ms);
        assert!(!projected.pre_digest_data.is_empty());
        assert!(!projected.post_digest_data.is_empty());
        assert_ne!(projected.pre_digest_data, projected.post_digest_data);

        // Events.
        assert_eq!(projected.events.len(), 1);
        assert_eq!(projected.events[0].event_topic, "order-updates");
        assert_eq!(projected.events[0].event_key.as_deref(), Some("order-9"));
        assert_eq!(projected.events[0].event_data, vec![0, 1, 2]);
        assert_eq!(projected.events[0].l2_sequence, Some(44));
        assert_eq!(
            projected.events[0].event_type,
            hypercall_db::EventType::OrderUpdate
        );

        // Outbox appends.
        assert_eq!(projected.outbox_appends.len(), 1);
        assert_eq!(
            projected.outbox_appends[0].directive_id,
            outbox_append.directive_id
        );
        assert_eq!(projected.outbox_appends[0].nonce, outbox_append.nonce);
        assert_eq!(
            projected.outbox_appends[0].action_key,
            hypercall_types::directives::ActionKey::SystemWithdrawToken
        );

        // Fill side effects.
        assert_eq!(projected.fill_side_effects.len(), 1);
        assert_eq!(
            projected.fill_side_effects[0].event_idx,
            fill_side_effect.event_idx
        );
        assert_eq!(
            projected.fill_side_effects[0].trade_id,
            fill_side_effect.trade_id
        );
        assert_eq!(
            projected.fill_side_effects[0].taker_ledger_delta,
            fill_side_effect.taker_ledger_delta
        );
        assert_eq!(
            projected.fill_side_effects[0].maker_premium_delta,
            fill_side_effect.maker_premium_delta
        );

        // Cash withdrawal side effect.
        let projected_cash = projected
            .cash_withdrawal_side_effect
            .expect("cash withdrawal side effect should be projected");
        assert_eq!(projected_cash.wallet, cash_withdrawal_side_effect.wallet);
        assert_eq!(
            projected_cash.request_id,
            cash_withdrawal_side_effect.request_id
        );
        assert_eq!(projected_cash.amount, cash_withdrawal_side_effect.amount);
        assert_eq!(
            projected_cash.balance_after,
            cash_withdrawal_side_effect.balance_after
        );
        assert_eq!(
            projected_cash.timestamp_ms,
            cash_withdrawal_side_effect.timestamp_ms
        );

        // Balance updates.
        assert_eq!(projected.balance_updates.len(), 1);
        assert_eq!(projected.balance_updates[0].update, balance_update);
    }
3063
    /// Appends one entry carrying a balance update to a fresh WAL and resets the
    /// checkpoint to offset 0. Replay must then return that entry with the same
    /// request UUID and balance updates, both on the record and through `to_db_entry()`.
    #[test]
    fn wal_roundtrip_single_record() {
        let dir = tempfile::tempdir().expect("create tempdir");
        let wal_path = dir.path().join("journal.wal");

        let mut wal = WalState::open_at(wal_path).expect("open wal");
        let request_uuid = hypercall_db_diesel::engine_enums::DbUuid(uuid::Uuid::new_v4());
        let balance_update = hypercall_types::BalanceUpdate {
            balance_update_seq: 3,
            wallet: hypercall_types::WalletAddress::from([9u8; 20]),
            delta: Decimal::new(-125, 2),
            balance_after: Decimal::new(875, 2),
            reason: hypercall_types::BalanceUpdateReason::Withdrawal,
            reference_id: Some("withdrawal-1".to_string()),
            source_command_id: None,
            timestamp_ms: 321,
        };

        let entry = JournalEntry {
            request_uuid,
            received_ts_ms: 123,
            command_data: vec![1, 2, 3],
            response_data: Some(vec![4, 5, 6]),
            order_id: Some(42),
            pre_digest: EngineStateDigest::default(),
            post_digest: EngineStateDigest::default(),
            duration_ms: 7,
            events: vec![EventPayload {
                event_type_enum: hypercall_db_diesel::engine_enums::EventType::OrderUpdate,
                event_topic: "order-updates".to_string(),
                event_key: Some("k".to_string()),
                event_data: vec![1, 2, 3],
                l2_sequence: Some(1),
            }],
            outbox_appends: vec![],
            fill_side_effects: vec![],
            cash_withdrawal_side_effect: None,
            balance_updates: vec![balance_update.clone()],
            created_at: Instant::now(),
            commit_ack: None,
            command_type_enum: Some(hypercall_db_diesel::engine_enums::CommandType::CreateOrder),
            #[cfg(feature = "rsm-state")]
            command_identity_hash: [0u8; 32],
            #[cfg(feature = "rsm-state")]
            rsm_state_digest: Some(crate::rsm::engine_snapshot::EngineStateDigest::empty()),
        };

        let offsets = wal.append_batch(&[entry]).expect("append wal batch");
        assert_eq!(offsets.len(), 1);

        // A checkpoint at offset 0 means nothing has been replicated yet.
        write_checkpoint(
            &wal.checkpoint_path,
            WalCheckpointMetadata {
                wal_offset: 0,
                last_command_id: 0,
                last_l2_seq: 0,
            },
        )
        .expect("write checkpoint");
        let replayed = WalState::read_unreplicated_records(&wal.wal_path, &wal.checkpoint_path)
            .expect("replay");
        assert_eq!(replayed.len(), 1);
        assert_eq!(replayed[0].record.request_uuid, request_uuid);
        assert_eq!(
            replayed[0].record.balance_updates,
            vec![balance_update.clone()]
        );
        assert_eq!(replayed[0].to_db_entry().balance_updates[0], balance_update);
    }
3133
    /// Appends three entries and checks that the checkpoint offset controls replay:
    /// - with no checkpoint written yet, all three are replayed;
    /// - a checkpoint at the last record's end offset replays none;
    /// - a checkpoint at the first record's end offset replays the remaining two.
    #[test]
    fn wal_checkpoint_prevents_double_replay() {
        let dir = tempfile::tempdir().expect("create tempdir");
        let wal_path = dir.path().join("journal.wal");

        let mut wal = WalState::open_at(wal_path).expect("open wal");

        let entries: Vec<JournalEntry> = (0..3)
            .map(|i| JournalEntry {
                request_uuid: hypercall_db_diesel::engine_enums::DbUuid(uuid::Uuid::new_v4()),
                received_ts_ms: 100 + i,
                command_data: vec![i as u8],
                response_data: None,
                order_id: Some(i as i64),
                pre_digest: EngineStateDigest::default(),
                post_digest: EngineStateDigest::default(),
                duration_ms: 1,
                events: vec![],
                outbox_appends: vec![],
                fill_side_effects: vec![],
                cash_withdrawal_side_effect: None,
                balance_updates: Vec::new(),
                created_at: Instant::now(),
                commit_ack: None,
                command_type_enum: Some(
                    hypercall_db_diesel::engine_enums::CommandType::CreateOrder,
                ),
                #[cfg(feature = "rsm-state")]
                command_identity_hash: [0u8; 32],
                #[cfg(feature = "rsm-state")]
                rsm_state_digest: Some(crate::rsm::engine_snapshot::EngineStateDigest::empty()),
            })
            .collect();

        let offsets = wal.append_batch(&entries).expect("append batch");
        assert_eq!(offsets.len(), 3);

        // No checkpoint file has been written yet: every record is unreplicated.
        let replayed = WalState::read_unreplicated_records(&wal.wal_path, &wal.checkpoint_path)
            .expect("replay all");
        assert_eq!(replayed.len(), 3);

        // Checkpoint at the end of the last record: nothing is left to replay.
        let last_offset = offsets.last().unwrap().end_offset;
        write_checkpoint(
            &wal.checkpoint_path,
            WalCheckpointMetadata {
                wal_offset: last_offset,
                last_command_id: 0,
                last_l2_seq: 0,
            },
        )
        .expect("write checkpoint");

        let replayed = WalState::read_unreplicated_records(&wal.wal_path, &wal.checkpoint_path)
            .expect("replay after checkpoint");
        assert_eq!(replayed.len(), 0, "Checkpoint should prevent double replay");

        // Checkpoint at the end of the first record: only records 2 and 3 remain.
        let partial_offset = offsets[0].end_offset;
        write_checkpoint(
            &wal.checkpoint_path,
            WalCheckpointMetadata {
                wal_offset: partial_offset,
                last_command_id: 0,
                last_l2_seq: 0,
            },
        )
        .expect("write partial checkpoint");

        let replayed = WalState::read_unreplicated_records(&wal.wal_path, &wal.checkpoint_path)
            .expect("replay after partial checkpoint");
        assert_eq!(
            replayed.len(),
            2,
            "Partial checkpoint should replay only unreplicated records"
        );
    }
3214
3215 #[test]
3216 #[should_panic(expected = "CRITICAL_FAILURE: WAL record length was zero")]
3217 fn wal_replay_panics_on_zero_length_record() {
3218 let dir = tempfile::tempdir().expect("create tempdir");
3219 let wal_path = dir.path().join("journal.wal");
3220 let checkpoint_path = checkpoint_path_for_wal(&wal_path);
3221
3222 std::fs::write(&wal_path, 0u32.to_le_bytes()).expect("write malformed wal");
3223 WalState::read_unreplicated_records(&wal_path, &checkpoint_path)
3224 .expect("zero-length record must panic before returning");
3225 }
3226
3227 #[test]
3228 #[should_panic(expected = "CRITICAL_FAILURE: WAL CRC mismatch")]
3229 fn wal_replay_panics_on_crc_mismatch() {
3230 let dir = tempfile::tempdir().expect("create tempdir");
3231 let wal_path = dir.path().join("journal.wal");
3232 let checkpoint_path = checkpoint_path_for_wal(&wal_path);
3233
3234 let mut bytes = Vec::new();
3235 bytes.extend_from_slice(&1u32.to_le_bytes());
3236 bytes.extend_from_slice(&0u32.to_le_bytes());
3237 bytes.push(0xAB);
3238 std::fs::write(&wal_path, bytes).expect("write malformed wal");
3239
3240 WalState::read_unreplicated_records(&wal_path, &checkpoint_path)
3241 .expect("crc mismatch must panic before returning");
3242 }
3243
3244 #[test]
3245 fn wal_preallocation_extends_on_demand() {
3246 let dir = tempfile::tempdir().expect("create tempdir");
3247 let wal_path = dir.path().join("journal.wal");
3248
3249 let mut wal = WalState::open_at(wal_path).expect("open wal");
3250
3251 assert_eq!(wal.preallocated_end, 0);
3253
3254 let allocated = wal.ensure_preallocated();
3256
3257 #[cfg(target_os = "linux")]
3258 {
3259 let _ = allocated;
3263 }
3264
3265 #[cfg(not(target_os = "linux"))]
3266 {
3267 assert_eq!(allocated, 0, "Non-Linux should no-op");
3268 }
3269 }
3270
    /// Runs `wal_writer_thread_main` on a real thread and sends it one batch. The
    /// test checks that the completion signal, the per-entry ack and a single-record
    /// replication message all arrive, and that `Shutdown` then ends the thread.
    #[cfg(not(feature = "rsm-state"))]
    #[test]
    fn wal_writer_thread_shutdown() {
        let dir = tempfile::tempdir().expect("create tempdir");
        let wal_path = dir.path().join("journal.wal");

        let wal_state = WalState::open_at(wal_path).expect("open wal");
        let (wal_writer_tx, wal_writer_rx) = std::sync::mpsc::sync_channel(16);
        let (replication_tx, mut replication_rx) = mpsc::channel(16);

        let handle = std::thread::Builder::new()
            .name("test-wal-writer".to_string())
            .spawn(move || {
                wal_writer_thread_main(
                    wal_state,
                    wal_writer_rx,
                    replication_tx,
                    // Never compiled: this test is gated to builds without `rsm-state`.
                    #[cfg(feature = "rsm-state")]
                    test_rsm_writer_state(),
                );
            })
            .expect("spawn test wal-writer thread");

        let entry = JournalEntry {
            request_uuid: hypercall_db_diesel::engine_enums::DbUuid(uuid::Uuid::new_v4()),
            received_ts_ms: 1,
            command_data: vec![1],
            response_data: None,
            order_id: None,
            pre_digest: EngineStateDigest::default(),
            post_digest: EngineStateDigest::default(),
            duration_ms: 0,
            events: vec![],
            outbox_appends: vec![],
            fill_side_effects: vec![],
            cash_withdrawal_side_effect: None,
            balance_updates: Vec::new(),
            created_at: Instant::now(),
            commit_ack: None,
            command_type_enum: Some(hypercall_db_diesel::engine_enums::CommandType::CreateOrder),
            #[cfg(feature = "rsm-state")]
            command_identity_hash: [0u8; 32],
            #[cfg(feature = "rsm-state")]
            rsm_state_digest: Some(crate::rsm::engine_snapshot::EngineStateDigest::empty()),
        };

        let (ack_tx, ack_rx) = oneshot::channel();
        let (complete_tx, complete_rx) = oneshot::channel();

        wal_writer_tx
            .send(WalWriterMessage::WriteBatch {
                entries: vec![entry],
                ack_senders: vec![Some(ack_tx)],
                completion: Some(complete_tx),
            })
            .expect("send to wal-writer");

        // The writer fires the acks before `completion`, so once completion is seen
        // the ack is already available.
        complete_rx.blocking_recv().expect("completion signal");
        ack_rx.blocking_recv().expect("ack signal");

        match replication_rx.blocking_recv() {
            Some(ReplicationMessage::Entries(records)) => {
                assert_eq!(records.len(), 1);
            }
            other => panic!("Expected Entries, got {:?}", other.is_some()),
        }

        wal_writer_tx
            .send(WalWriterMessage::Shutdown)
            .expect("send shutdown");
        handle.join().expect("wal-writer thread join");
    }
3348
3349 fn wal_bytes_for_entry(entry: &JournalEntry) -> Vec<u8> {
3361 let record = WalJournalRecord::from_entry(entry);
3362 let payload = rmp_serde::to_vec_named(&record).expect("serialize wal record");
3363 let payload_len = payload.len() as u32;
3364 let crc = WAL_CRC.checksum(&payload);
3365
3366 let mut buf = Vec::with_capacity(8 + payload.len());
3367 buf.extend_from_slice(&payload_len.to_le_bytes());
3368 buf.extend_from_slice(&crc.to_le_bytes());
3369 buf.extend_from_slice(&payload);
3370 buf
3371 }
3372
3373 fn make_test_entry(index: u8) -> JournalEntry {
3375 JournalEntry {
3376 request_uuid: hypercall_db_diesel::engine_enums::DbUuid(uuid::Uuid::new_v4()),
3377 received_ts_ms: 1000 + index as u64,
3378 command_data: vec![index],
3379 response_data: None,
3380 order_id: Some(index as i64),
3381 pre_digest: EngineStateDigest::default(),
3382 post_digest: EngineStateDigest::default(),
3383 duration_ms: 1,
3384 events: vec![],
3385 outbox_appends: vec![],
3386 fill_side_effects: vec![],
3387 cash_withdrawal_side_effect: None,
3388 balance_updates: Vec::new(),
3389 created_at: Instant::now(),
3390 commit_ack: None,
3391 command_type_enum: Some(hypercall_db_diesel::engine_enums::CommandType::CreateOrder),
3392 #[cfg(feature = "rsm-state")]
3393 command_identity_hash: [0u8; 32],
3394 #[cfg(feature = "rsm-state")]
3395 rsm_state_digest: Some(crate::rsm::engine_snapshot::EngineStateDigest::empty()),
3396 }
3397 }
3398
/// Builds an `OrderUpdate` event on topic `test.topic` whose key, payload bytes and
/// L2 sequence are all derived from `index`.
#[cfg(feature = "rsm-state")]
fn make_test_event(index: u8) -> EventPayload {
    let following = index.wrapping_add(1);
    let key = format!("key-{}", index);

    EventPayload {
        event_topic: String::from("test.topic"),
        event_key: Some(key),
        event_data: vec![index, following],
        l2_sequence: Some(i64::from(index)),
        event_type_enum: hypercall_db_diesel::engine_enums::EventType::OrderUpdate,
    }
}
3409
/// Commits `entries` as one batch into a fresh commitment store in a temporary
/// directory (pruning disabled, prune window 100) and returns the committed roots
/// converted into [`RsmBatchRoots`].
///
/// `0` is used both for `open_pipeline` and as the third argument of
/// `rsm_state_delta_from_entries`.
///
/// # Panics
/// Panics if opening the store or pipeline, building the delta, or committing fails.
#[cfg(feature = "rsm-state")]
fn rsm_test_roots(entries: &[JournalEntry], identity_hashes: &[[u8; 32]]) -> RsmBatchRoots {
    let scratch = tempfile::tempdir().expect("create tempdir");
    let store_config = ValidatorRsmCommitmentStoreConfig {
        root_dir: scratch.path().join("state"),
        prune_window: 100,
        pruning_enabled: false,
    };
    let store = crate::rsm::commitment_store::ValidatorRsmCommitmentStore::open(store_config)
        .expect("open commitment store");
    let mut pipeline = store.open_pipeline(0).expect("open pipeline");
    let delta = rsm_state_delta_from_entries(entries, identity_hashes, 0)
        .expect("materialize test delta");
    pipeline.commit(&delta).expect("commit test delta").into()
}
3426
/// Same as `make_test_entry(index)`, but tagged as a `TickSnapshot` command.
#[cfg(feature = "rsm-state")]
fn make_test_rsm_entry(index: u8) -> JournalEntry {
    let tick_snapshot = hypercall_db_diesel::engine_enums::CommandType::TickSnapshot;
    let mut rsm_entry = make_test_entry(index);
    rsm_entry.command_type_enum = Some(tick_snapshot);
    rsm_entry
}
3434
/// Builds the WAL records of one RSM block made of `entries`.
///
/// Every record shares `end_offset = 100 + batch_seq` (one physical frame). Only the
/// first record carries the block header and the committed roots. The header is unsigned,
/// built from the entries' engine identity hashes and the `batch_root` committed by
/// `rsm_test_roots`, with timestamp `1_700_000_000_000 + batch_seq` and a zero signer.
#[cfg(feature = "rsm-state")]
fn make_test_rsm_block_records(
    batch_seq: u64,
    prev_block_hash: [u8; 32],
    first_seq: u64,
    entries: &[JournalEntry],
) -> Vec<WalRecordWithOffset> {
    let identity_hashes = engine_command_identity_hashes(entries);
    let roots = rsm_test_roots(entries, &identity_hashes);
    let timestamp = 1_700_000_000_000 + batch_seq;
    let header = WalBlockRecord::unsigned_from_identity_hashes(
        batch_seq,
        prev_block_hash,
        &identity_hashes,
        roots.batch_root,
        first_seq,
        timestamp,
        [0u8; 20],
    );

    let mut records = Vec::with_capacity(entries.len());
    for (position, entry) in entries.iter().enumerate() {
        let is_first = position == 0;
        records.push(WalRecordWithOffset {
            record: WalJournalRecord::from_entry(entry),
            end_offset: 100 + batch_seq,
            block_header: is_first.then(|| header.clone()),
            block_roots: is_first.then_some(roots),
        });
    }
    records
}
3465
/// A zero-length WAL file is valid input and recovers to no records.
#[test]
fn wal_recovery_empty_wal_file() {
    let tmp = tempfile::tempdir().expect("create tempdir");
    let wal_path = tmp.path().join("journal.wal");
    let checkpoint_path = checkpoint_path_for_wal(&wal_path);
    std::fs::write(&wal_path, b"").expect("write empty wal");

    let recovered = WalState::read_unreplicated_records(&wal_path, &checkpoint_path)
        .expect("should recover empty wal");
    assert!(recovered.is_empty(), "empty WAL should yield zero records");
}
3481
/// Records sharing an `end_offset` form one physical WAL frame and must never be split
/// across replication chunks. A requested chunk of 2 over a 3-record frame grows to 3,
/// and a requested chunk of 1 over a 2-record frame grows to 2.
#[test]
fn replication_chunk_extends_to_keep_physical_wal_block_atomic() {
    let layout = [(0, 100), (1, 100), (2, 100), (3, 200), (4, 200)];
    let mut pending: Vec<WalRecordWithOffset> = Vec::with_capacity(layout.len());
    for (index, end_offset) in layout {
        pending.push(WalRecordWithOffset {
            record: WalJournalRecord::from_entry(&make_test_entry(index)),
            end_offset,
            #[cfg(feature = "rsm-state")]
            block_header: None,
            #[cfg(feature = "rsm-state")]
            block_roots: None,
        });
    }

    assert_eq!(next_replication_chunk_len(&pending, 2), 3);
    assert_eq!(next_replication_chunk_len(&pending[3..], 1), 2);
}
3499
/// Table of `should_defer_replay_record` inputs and whether the next physical record
/// must be deferred.
#[test]
fn wal_replay_defers_next_physical_record_to_cap_command_batch() {
    let cases = [
        (0, 100, 10, false),
        (5, 5, 10, false),
        (5, 6, 10, true),
        (1, 1, 0, true),
    ];
    for (first, second, third, expect_defer) in cases {
        assert_eq!(should_defer_replay_record(first, second, third), expect_defer);
    }
}
3507
/// Passthrough entries (no `command_type_enum`) are excluded from block commitments.
/// They contribute no identity hash, so the commands hash differs from one that
/// includes a zero hash, and they leave the obligation MMR root unchanged.
#[cfg(feature = "rsm-state")]
#[test]
fn block_commitment_excludes_non_engine_journal_entries() {
    let engine_entry = || {
        let mut entry = make_test_rsm_entry(1);
        entry.command_identity_hash = [1u8; 32];
        entry.events.push(make_test_event(1));
        entry
    };
    let passthrough_entry = || {
        let mut entry = make_test_entry(2);
        entry.command_type_enum = None;
        entry.command_identity_hash = [0u8; 32];
        entry.events.push(make_test_event(2));
        entry
    };

    let hashes = engine_command_identity_hashes(&[engine_entry(), passthrough_entry()]);
    assert_eq!(hashes, vec![[1u8; 32]]);

    let engine_only_hash = hypercall_blocks::compute_commands_hash(&hashes);
    let with_zero_hash = hypercall_blocks::compute_commands_hash(&[[1u8; 32], [0u8; 32]]);
    assert_ne!(engine_only_hash, with_zero_hash);

    let engine_only = [engine_entry()];
    let mixed = [engine_entry(), passthrough_entry()];
    assert_eq!(
        rsm_test_roots(&engine_only, &hashes).obligation_mmr_root,
        rsm_test_roots(&mixed, &hashes).obligation_mmr_root
    );
}
3546
/// An RSM-tagged command of an unsupported type (`ApproveAgent`) is dropped from the
/// identity-hash list; only the supported entry's hash remains.
#[cfg(feature = "rsm-state")]
#[test]
fn block_commitment_excludes_unsupported_rsm_commands() {
    let supported = {
        let mut e = make_test_rsm_entry(1);
        e.command_identity_hash = [1u8; 32];
        e.events.push(make_test_event(1));
        e
    };
    let approve_agent = {
        let mut e = make_test_rsm_entry(2);
        e.command_type_enum = Some(hypercall_db_diesel::engine_enums::CommandType::ApproveAgent);
        e.command_identity_hash = [2u8; 32];
        e
    };

    assert_eq!(
        engine_command_identity_hashes(&[supported, approve_agent]),
        vec![[1u8; 32]]
    );
}
3563
/// A single-entry batch yields non-zero state, command, obligation, intent and batch
/// roots. Committing an identical batch again reproduces the same `batch_root`.
#[cfg(feature = "rsm-state")]
#[test]
fn rsm_batch_roots_are_deterministic_and_nonzero() {
    let build_entry = || {
        let mut entry = make_test_rsm_entry(1);
        entry.command_identity_hash = [9u8; 32];
        entry.events.push(make_test_event(1));
        entry
    };
    let identity = [[9u8; 32]];
    let zero = [0u8; 32];

    let roots = rsm_test_roots(&[build_entry()], &identity);
    let replay_entry = build_entry();

    assert_ne!(roots.state_root, zero);
    assert_ne!(roots.command_mmr_root, zero);
    assert_ne!(roots.obligation_mmr_root, zero);
    assert_ne!(roots.intent_mmr_root, zero);
    assert_ne!(roots.batch_root, zero);

    let replayed = rsm_test_roots(&[replay_entry], &identity);
    assert_eq!(roots.batch_root, replayed.batch_root);
}
3586
/// Each component root reacts to its own input:
/// * `state_root` changes when the same entry is materialized with third argument `7`
///   instead of the `0` used by `rsm_test_roots` (NOTE(review): presumably a
///   sequence/height offset; confirm against `rsm_state_delta_from_entries`);
/// * `intent_mmr_root` changes when a different identity hash (`[8; 32]`) is supplied;
/// * `obligation_mmr_root` changes when an event's payload bytes change.
#[cfg(feature = "rsm-state")]
#[test]
fn rsm_batch_roots_change_with_state_intents_and_obligations() {
    let fresh_entry = || {
        let mut entry = make_test_rsm_entry(1);
        entry.command_identity_hash = [9u8; 32];
        entry.events.push(make_test_event(1));
        entry
    };
    let base = rsm_test_roots(&[fresh_entry()], &[[9u8; 32]]);

    let shifted_state_root = rsm_state_delta_from_entries(&[fresh_entry()], &[[9u8; 32]], 7)
        .and_then(|delta| {
            let dir = tempfile::tempdir().unwrap();
            let store = crate::rsm::commitment_store::ValidatorRsmCommitmentStore::open(
                ValidatorRsmCommitmentStoreConfig {
                    root_dir: dir.path().join("state"),
                    prune_window: 100,
                    pruning_enabled: false,
                },
            )?;
            Ok(store.open_pipeline(0)?.commit(&delta)?.state_root.0)
        })
        .unwrap();
    assert_ne!(base.state_root, shifted_state_root);

    let other_intent = rsm_test_roots(&[fresh_entry()], &[[8u8; 32]]);
    assert_ne!(base.intent_mmr_root, other_intent.intent_mmr_root);

    let mut changed_obligation = fresh_entry();
    changed_obligation.events[0].event_data.push(42);
    let other_obligation = rsm_test_roots(&[changed_obligation], &[[9u8; 32]]);
    assert_ne!(base.obligation_mmr_root, other_obligation.obligation_mmr_root);
}
3632
/// A WAL record whose `rsm_state_digest` is `None` (legacy format) yields no batch roots
/// (`None`) instead of failing.
#[cfg(feature = "rsm-state")]
#[test]
fn rsm_batch_roots_from_wal_tolerates_legacy_records_without_state_digest() {
    let mut legacy = make_test_entry(1);
    legacy.command_identity_hash = [1u8; 32];
    legacy.rsm_state_digest = None;

    let records = [WalRecordWithOffset {
        record: WalJournalRecord::from_entry(&legacy),
        end_offset: 100,
        block_header: None,
        block_roots: None,
    }];
    assert!(rsm_batch_roots_from_wal(&records).is_none());
}
3647
/// Recovery against a WAL path that does not exist is a no-op that returns no records.
#[test]
fn wal_recovery_nonexistent_wal_file() {
    let tmp = tempfile::tempdir().expect("create tempdir");
    let missing_wal = tmp.path().join("nonexistent.wal");
    let checkpoint_path = checkpoint_path_for_wal(&missing_wal);

    let recovered = WalState::read_unreplicated_records(&missing_wal, &checkpoint_path)
        .expect("missing WAL should be no-op");
    assert!(recovered.is_empty());
}
3659
/// Appends five entries through `WalState`, resets the checkpoint to offset 0, and
/// checks that recovery returns all five records in order. Each record's fields must
/// survive the round trip, and `end_offset` must strictly increase.
#[test]
fn wal_recovery_small_wal_few_records() {
    let dir = tempfile::tempdir().expect("create tempdir");
    let wal_path = dir.path().join("journal.wal");

    let mut wal = WalState::open_at(wal_path).expect("open wal");

    let entries: Vec<JournalEntry> = (0..5).map(make_test_entry).collect();
    let request_uuids: Vec<_> = entries.iter().map(|e| e.request_uuid).collect();

    wal.append_batch(&entries).expect("append batch");

    write_checkpoint(
        &wal.checkpoint_path,
        WalCheckpointMetadata {
            wal_offset: 0,
            last_command_id: 0,
            last_l2_seq: 0,
        },
    )
    .expect("write checkpoint");

    let records = WalState::read_unreplicated_records(&wal.wal_path, &wal.checkpoint_path)
        .expect("read records");
    assert_eq!(records.len(), 5);

    // Pair each recovered record with the entry it came from instead of indexing by hand.
    for (i, (record, expected_uuid)) in records.iter().zip(&request_uuids).enumerate() {
        assert_eq!(
            record.record.request_uuid, *expected_uuid,
            "request_uuid mismatch at index {}",
            i
        );
        assert_eq!(record.record.command_data, vec![i as u8]);
        assert_eq!(record.record.received_ts_ms, 1000 + i as u64);
        assert_eq!(record.record.order_id, Some(i as i64));
    }

    // Compare adjacent offsets with `windows(2)` rather than `records[i - 1]` indexing.
    for pair in records.windows(2) {
        assert!(
            pair[1].end_offset > pair[0].end_offset,
            "offsets must increase monotonically"
        );
    }
}
3708
/// With the checkpoint placed at the end of the second record, recovery returns only
/// the remaining three records (entries 2, 3, 4), in order.
#[test]
fn wal_recovery_resumes_from_checkpoint() {
    let tmp = tempfile::tempdir().expect("create tempdir");
    let wal_path = tmp.path().join("journal.wal");

    let mut wal = WalState::open_at(wal_path).expect("open wal");

    let entries: Vec<JournalEntry> = (0..5).map(make_test_entry).collect();
    let request_uuids: Vec<_> = entries.iter().map(|e| e.request_uuid).collect();

    let appended = wal.append_batch(&entries).expect("append batch");
    assert_eq!(appended.len(), 5);

    let resume_at = WalCheckpointMetadata {
        wal_offset: appended[1].end_offset,
        last_command_id: 0,
        last_l2_seq: 0,
    };
    write_checkpoint(&wal.checkpoint_path, resume_at).expect("write checkpoint");

    let recovered = WalState::read_unreplicated_records(&wal.wal_path, &wal.checkpoint_path)
        .expect("read records after checkpoint");
    assert_eq!(recovered.len(), 3, "should recover only records 2,3,4");

    let expected_tail = request_uuids.iter().skip(2);
    for (i, (record, expected)) in recovered.iter().zip(expected_tail).enumerate() {
        assert_eq!(
            record.record.request_uuid, *expected,
            "request_uuid mismatch at recovered index {}",
            i
        );
    }
}
3750
/// One complete frame followed by a dangling 4-byte length prefix (declaring a 42-byte
/// payload that was never written). Recovery keeps the complete record and stops at
/// its end.
#[test]
fn wal_recovery_truncated_after_length_prefix() {
    let tmp = tempfile::tempdir().expect("create tempdir");
    let wal_path = tmp.path().join("journal.wal");
    let checkpoint_path = checkpoint_path_for_wal(&wal_path);

    let mut wal_data = wal_bytes_for_entry(&make_test_entry(0));
    let complete_record_len = wal_data.len();
    // Length prefix of a frame whose CRC and payload never reached disk.
    wal_data.extend(42u32.to_le_bytes());

    std::fs::write(&wal_path, &wal_data).expect("write wal with truncated tail");

    let recovered = WalState::read_unreplicated_records(&wal_path, &checkpoint_path)
        .expect("should recover despite truncated tail");
    assert_eq!(recovered.len(), 1, "should recover the one complete record");
    assert_eq!(recovered[0].end_offset, complete_record_len as u64);
}
3776
/// Two complete frames followed by a torn third frame: its 8-byte header is written
/// (`payload_len` = 100, CRC = 0), but only one payload byte is. Recovery must return
/// exactly the two complete records and stop at the end of the second.
#[test]
fn wal_recovery_truncated_after_crc() {
    let dir = tempfile::tempdir().expect("create tempdir");
    let wal_path = dir.path().join("journal.wal");
    let checkpoint_path = checkpoint_path_for_wal(&wal_path);

    let entries: Vec<JournalEntry> = (0..2).map(make_test_entry).collect();
    let mut wal_data = Vec::new();
    for entry in &entries {
        wal_data.extend(wal_bytes_for_entry(entry));
    }
    let complete_len = wal_data.len();

    // Torn third frame, one field per statement.
    wal_data.extend_from_slice(&100u32.to_le_bytes()); // payload_len: 100 bytes declared
    wal_data.extend_from_slice(&0u32.to_le_bytes()); // placeholder crc
    wal_data.push(0xFF); // the only payload byte that made it to disk
    std::fs::write(&wal_path, &wal_data).expect("write wal with truncated payload");

    let records = WalState::read_unreplicated_records(&wal_path, &checkpoint_path)
        .expect("should recover despite truncated payload");
    assert_eq!(
        records.len(),
        2,
        "should recover both complete records before the truncation"
    );
    // The scan must stop exactly at the last complete frame, not inside the torn one.
    assert_eq!(records[1].end_offset, complete_len as u64);
}
3808
/// Only two of the four length-prefix bytes of the next frame reached disk. Recovery
/// ignores the partial prefix and keeps the complete record before it.
#[test]
fn wal_recovery_truncated_mid_length_prefix() {
    let tmp = tempfile::tempdir().expect("create tempdir");
    let wal_path = tmp.path().join("journal.wal");
    let checkpoint_path = checkpoint_path_for_wal(&wal_path);

    let mut wal_data = wal_bytes_for_entry(&make_test_entry(0));
    // Half of a 4-byte little-endian length prefix.
    wal_data.extend([0x01u8, 0x00]);

    std::fs::write(&wal_path, &wal_data).expect("write wal with partial length prefix");

    let recovered = WalState::read_unreplicated_records(&wal_path, &checkpoint_path)
        .expect("should recover despite partial length prefix");
    assert_eq!(recovered.len(), 1, "should recover the one complete record");
}
3829
/// A trailing frame header that declares a 2 GiB payload (`0x8000_0000`) but is followed
/// by only 16 junk bytes must not abort recovery. The one complete record before it is
/// returned, and the scan stops at that record's end.
#[test]
fn wal_recovery_corrupt_giant_payload_len() {
    let dir = tempfile::tempdir().expect("create tempdir");
    let wal_path = dir.path().join("journal.wal");
    let checkpoint_path = checkpoint_path_for_wal(&wal_path);

    let entry = make_test_entry(0);
    let mut wal_data = wal_bytes_for_entry(&entry);
    let complete_len = wal_data.len();

    // Corrupt tail, one field per statement.
    wal_data.extend_from_slice(&0x8000_0000u32.to_le_bytes()); // payload_len: 2 GiB
    wal_data.extend_from_slice(&0u32.to_le_bytes()); // placeholder crc
    wal_data.extend_from_slice(&[0xAB; 16]); // a few junk payload bytes
    std::fs::write(&wal_path, &wal_data).expect("write wal with corrupt length");

    let records = WalState::read_unreplicated_records(&wal_path, &checkpoint_path)
        .expect("should recover despite corrupt length prefix");
    assert_eq!(
        records.len(),
        1,
        "should recover the one complete record before the corrupt length"
    );
    assert_eq!(records[0].end_offset, complete_len as u64);
}
3861
/// Boundary case for torn writes. The trailing frame's 8-byte header is intact, but its
/// payload is exactly one byte shorter than the declared `payload_len`. Recovery must
/// treat the frame as a torn tail, keep the complete record before it, and stop at that
/// record's end.
#[test]
fn wal_recovery_payload_len_exactly_exceeds_remaining() {
    let dir = tempfile::tempdir().expect("create tempdir");
    let wal_path = dir.path().join("journal.wal");
    let checkpoint_path = checkpoint_path_for_wal(&wal_path);

    let entry = make_test_entry(0);
    let mut wal_data = wal_bytes_for_entry(&entry);
    let complete_len = wal_data.len();

    // Declare a 200-byte payload but write only 199 bytes of it, so the declared length
    // exceeds the bytes remaining in the file by exactly one. (A bare length prefix with
    // no header is already covered by `wal_recovery_truncated_after_length_prefix`.)
    const DECLARED_LEN: u32 = 200;
    wal_data.extend_from_slice(&DECLARED_LEN.to_le_bytes()); // payload_len
    wal_data.extend_from_slice(&0u32.to_le_bytes()); // placeholder crc
    wal_data.extend(vec![0xCDu8; DECLARED_LEN as usize - 1]); // one byte short

    std::fs::write(&wal_path, &wal_data).expect("write wal");

    let records = WalState::read_unreplicated_records(&wal_path, &checkpoint_path)
        .expect("should recover despite oversized payload_len");
    assert_eq!(records.len(), 1);
    assert_eq!(records[0].end_offset, complete_len as u64);
}
3884
/// Guards the hand-rolled framing in `wal_bytes_for_entry`: for the same entry it must
/// produce exactly the bytes that `WalState::append_batch` writes to a fresh WAL file.
#[test]
fn wal_recovery_manual_bytes_match_wal_state_writer() {
    let tmp = tempfile::tempdir().expect("create tempdir");
    let wal_path = tmp.path().join("journal.wal");

    let entry = make_test_entry(42);
    let expected_bytes = wal_bytes_for_entry(&entry);

    let mut writer = WalState::open_at(wal_path.clone()).expect("open wal");
    writer.append_batch(&[entry]).expect("append batch");
    let written_bytes = std::fs::read(&wal_path).expect("read wal file");

    assert_eq!(
        expected_bytes, written_bytes,
        "Manual WAL byte construction must match WalState writer output"
    );
}
3905
/// A `WalBlockEntry` survives a named-field MessagePack round trip. Every header field,
/// and the identifying fields of each of its three commands, come back unchanged.
#[cfg(feature = "rsm-state")]
#[test]
fn test_wal_block_entry_roundtrip() {
    let commands: Vec<WalJournalRecord> = (0..3)
        .map(make_test_entry)
        .map(|entry| WalJournalRecord::from_entry(&entry))
        .collect();

    let header = WalBlockRecord {
        batch_seq: 0,
        prev_block_hash: [0u8; 32],
        commands_hash: [1u8; 32],
        batch_root: [0u8; 32],
        command_count: 3,
        first_seq: 0,
        timestamp: 1_700_000_000_000,
        signer: [0u8; 20],
        signature: vec![0u8; 65],
    };
    let original = WalBlockEntry {
        header: header.clone(),
        commands: commands.clone(),
    };

    let encoded = rmp_serde::to_vec_named(&original).expect("serialize WalBlockEntry");
    let decoded: WalBlockEntry =
        rmp_serde::from_slice(&encoded).expect("deserialize WalBlockEntry");

    let got = &decoded.header;
    assert_eq!(got.batch_seq, header.batch_seq);
    assert_eq!(got.prev_block_hash, header.prev_block_hash);
    assert_eq!(got.commands_hash, header.commands_hash);
    assert_eq!(got.batch_root, header.batch_root);
    assert_eq!(got.command_count, header.command_count);
    assert_eq!(got.first_seq, header.first_seq);
    assert_eq!(got.timestamp, header.timestamp);
    assert_eq!(got.signer, header.signer);
    assert_eq!(got.signature, header.signature);

    assert_eq!(decoded.commands.len(), 3);
    for (got_cmd, want_cmd) in decoded.commands.iter().zip(&commands) {
        assert_eq!(got_cmd.command_data, want_cmd.command_data);
        assert_eq!(got_cmd.received_ts_ms, want_cmd.received_ts_ms);
        assert_eq!(got_cmd.order_id, want_cmd.order_id);
        assert_eq!(got_cmd.request_uuid, want_cmd.request_uuid);
    }
}
3963
/// A plain `WalJournalRecord` payload (old-format WAL frame) must be rejected when
/// decoded as a `WalBlockEntry`, and must still decode correctly as a `WalJournalRecord`.
#[cfg(feature = "rsm-state")]
#[test]
fn test_wal_block_entry_backward_compat() {
    let record = WalJournalRecord::from_entry(&make_test_entry(42));
    let bytes = rmp_serde::to_vec_named(&record).expect("serialize WalJournalRecord");

    let as_block = rmp_serde::from_slice::<WalBlockEntry>(&bytes);
    assert!(
        as_block.is_err(),
        "Old-format WalJournalRecord must not deserialize as WalBlockEntry"
    );

    let decoded: WalJournalRecord =
        rmp_serde::from_slice(&bytes).expect("deserialize WalJournalRecord");
    assert_eq!(decoded.request_uuid, record.request_uuid);
    assert_eq!(decoded.command_data, record.command_data);
    assert_eq!(decoded.received_ts_ms, record.received_ts_ms);
    assert_eq!(decoded.order_id, record.order_id);
}
3989
/// A WAL frame containing a `WalBlockEntry` in the legacy, root-less layout must make
/// recovery panic with "legacy rootless RSM WAL block" rather than be accepted.
#[cfg(feature = "rsm-state")]
#[test]
#[should_panic(expected = "legacy rootless RSM WAL block")]
fn wal_recovery_rejects_legacy_rootless_rsm_block() {
    let tmp = tempfile::tempdir().expect("create tempdir");
    let wal_path = tmp.path().join("journal.wal");
    let checkpoint_path = checkpoint_path_for_wal(&wal_path);

    let legacy_header = WalBlockRecord {
        batch_seq: 0,
        prev_block_hash: [0u8; 32],
        commands_hash: [1u8; 32],
        batch_root: [2u8; 32],
        command_count: 1,
        first_seq: 0,
        timestamp: 1_700_000_000_000,
        signer: [0u8; 20],
        signature: vec![0u8; 65],
    };
    let legacy_block = WalBlockEntry {
        header: legacy_header,
        commands: vec![WalJournalRecord::from_entry(&make_test_entry(42))],
    };
    let payload = rmp_serde::to_vec_named(&legacy_block).expect("serialize legacy block");

    // Scope the handle so the file is closed before recovery reads it.
    {
        let mut file = File::create(&wal_path).expect("create wal");
        let mut offset = 0;
        write_frame(&mut file, &payload, &mut offset, "legacy block").expect("write frame");
    }

    let _ = WalState::read_unreplicated_records(&wal_path, &checkpoint_path);
}
4020
/// A `WalBlockEntry` with zero commands round-trips through named-field MessagePack with
/// its header intact and an empty command list.
#[cfg(feature = "rsm-state")]
#[test]
fn test_wal_block_entry_with_empty_commands() {
    let empty_block = WalBlockEntry {
        header: WalBlockRecord {
            batch_seq: 0,
            prev_block_hash: [0u8; 32],
            commands_hash: [0u8; 32],
            batch_root: [0u8; 32],
            command_count: 0,
            first_seq: 0,
            timestamp: 1_700_000_000_000,
            signer: [0u8; 20],
            signature: vec![0u8; 65],
        },
        commands: Vec::new(),
    };

    let encoded = rmp_serde::to_vec_named(&empty_block).expect("serialize empty block");
    let decoded: WalBlockEntry =
        rmp_serde::from_slice(&encoded).expect("deserialize empty block");

    assert_eq!(decoded.header.batch_seq, 0);
    assert_eq!(decoded.header.command_count, 0);
    assert_eq!(decoded.header.timestamp, 1_700_000_000_000);
    assert!(
        decoded.commands.is_empty(),
        "Empty block must have no commands"
    );
}
4054
/// Provisions a migrated Postgres database for the WAL-recovery integration tests.
///
/// Returns `(pool, container, env_guard)`:
/// * `pool`: a Diesel pool connected to the provisioned database.
/// * `container`: `Some` when `TEST_DATABASE_URL` is unset and a local
///   `postgres:16-alpine` testcontainer was started; `None` when an external server is
///   used. Callers bind it (e.g. `_container`) so it stays alive for the whole test.
/// * `env_guard`: the `TestnetContractEnvGuard` applied before any database work.
///   NOTE(review): presumably restores the environment on drop; confirm.
///
/// NOTE(review): with `TEST_DATABASE_URL` set, every call creates a new
/// `test_wal_recovery_<uuid>` database that is never dropped here; verify that
/// something else cleans these up.
async fn setup_test_pool() -> (
    hypercall_db_diesel::DbPool,
    Option<testcontainers::ContainerAsync<testcontainers_modules::postgres::Postgres>>,
    crate::test_contracts::TestnetContractEnvGuard,
) {
    use crate::test_contracts::test_database_url_with_database;
    use testcontainers::runners::AsyncRunner;
    use testcontainers::ImageExt;
    use testcontainers_modules::postgres::Postgres;

    // Applied first, ahead of any database setup below.
    let env_guard = crate::test_contracts::TestnetContractEnvGuard::apply();

    let (database_url, container) = if let Ok(url) = std::env::var("TEST_DATABASE_URL") {
        // External server: isolate this test in its own uniquely named database.
        let db_name = format!("test_wal_recovery_{}", uuid::Uuid::new_v4().simple());
        // One-connection admin pool, used only to issue CREATE DATABASE and then closed.
        let admin_pool = sqlx::postgres::PgPoolOptions::new()
            .max_connections(1)
            .connect(&url)
            .await
            .expect("connect to TEST_DATABASE_URL");
        // `db_name` is built from a UUID above, so interpolating it into the quoted
        // identifier cannot inject SQL.
        sqlx::query(&format!(r#"CREATE DATABASE "{}""#, db_name))
            .execute(&admin_pool)
            .await
            .expect("create isolated test database");
        admin_pool.close().await;
        let db_url = test_database_url_with_database(&url, &db_name);
        (db_url, None)
    } else {
        // No external server: start a throwaway Postgres 16 container.
        let container = Postgres::default()
            .with_db_name("test_wal_recovery")
            .with_user("test_user")
            .with_password("test_password")
            .with_tag("16-alpine")
            .start()
            .await
            .expect("start postgres container");

        let port = container
            .get_host_port_ipv4(5432)
            .await
            .expect("get postgres port");

        let url = format!(
            "postgresql://test_user:test_password@127.0.0.1:{}/test_wal_recovery",
            port
        );

        // Fixed grace period after container start-up.
        // NOTE(review): the testcontainers postgres module may already wait for
        // readiness; confirm before shortening or removing this sleep.
        tokio::time::sleep(tokio::time::Duration::from_secs(2)).await;

        (url, Some(container))
    };

    // Built for its side effect; per the expect message, constructing it runs migrations.
    let _handler = crate::db_handler::DieselEventHandler::new(&database_url)
        .expect("create diesel handler and run migrations");

    let auth = hypercall_db_diesel::DbAuthConfig::password(&database_url);
    // NOTE(review): positional pool settings (5, 30_000, 10_000), presumably max size
    // and millisecond timeouts; confirm against `build_db_pool`.
    let pool = hypercall_db_diesel::build_db_pool(&auth, 5, 30_000, 10_000)
        .expect("build diesel pool");

    (pool, container, env_guard)
}
4127
4128 #[tokio::test]
4129 #[serial_test::serial(testnet_env)]
4130 async fn recover_wal_sync_large_unreplicated_tail() {
4131 let (pool, _container, _env_guard) = setup_test_pool().await;
4137
4138 let dir = tempfile::tempdir().expect("create tempdir");
4139 let wal_path = dir.path().join("journal.wal");
4140 let checkpoint_path = checkpoint_path_for_wal(&wal_path);
4141
4142 let num_entries: usize = 1000;
4144 let mut wal = WalState::open_at(wal_path.clone()).expect("open wal");
4145
4146 let entries: Vec<JournalEntry> = (0..num_entries)
4147 .map(|i| JournalEntry {
4148 request_uuid: hypercall_db_diesel::engine_enums::DbUuid(uuid::Uuid::new_v4()),
4149 received_ts_ms: 1000 + i as u64,
4150 command_data: vec![(i % 256) as u8; 64],
4151 response_data: Some(vec![(i % 256) as u8; 32]),
4152 order_id: Some(i as i64),
4153 pre_digest: EngineStateDigest::default(),
4154 post_digest: EngineStateDigest::default(),
4155 duration_ms: 1,
4156 events: vec![],
4157 outbox_appends: vec![],
4158 fill_side_effects: vec![],
4159 cash_withdrawal_side_effect: None,
4160 balance_updates: Vec::new(),
4161 created_at: Instant::now(),
4162 commit_ack: None,
4163 command_type_enum: Some(
4164 hypercall_db_diesel::engine_enums::CommandType::CreateOrder,
4165 ),
4166 #[cfg(feature = "rsm-state")]
4167 command_identity_hash: [0u8; 32],
4168 #[cfg(feature = "rsm-state")]
4169 rsm_state_digest: Some(crate::rsm::engine_snapshot::EngineStateDigest::empty()),
4170 })
4171 .collect();
4172
4173 let request_uuids: Vec<hypercall_db_diesel::engine_enums::DbUuid> =
4174 entries.iter().map(|e| e.request_uuid).collect();
4175
4176 let offsets = wal.append_batch(&entries).expect("append WAL entries");
4177 assert_eq!(offsets.len(), num_entries);
4178
4179 let wal_end_offset = offsets.last().unwrap().end_offset;
4180 drop(wal);
4181
4182 write_checkpoint(
4184 &checkpoint_path,
4185 WalCheckpointMetadata {
4186 wal_offset: 0,
4187 last_command_id: 0,
4188 last_l2_seq: 0,
4189 },
4190 )
4191 .expect("write initial checkpoint");
4192
4193 let small_batch_size: usize = 10;
4195
4196 let truncated = EngineJournalBatcher::recover_wal_sync(
4198 &pool,
4199 &wal_path,
4200 &checkpoint_path,
4201 false, small_batch_size,
4203 #[cfg(feature = "rsm-state")]
4204 hypercall_db::ValidatorRsmEnvironment::Development,
4205 #[cfg(feature = "rsm-state")]
4206 None,
4207 );
4208
4209 assert!(
4211 truncated.is_none(),
4212 "WAL should not be truncated when all records are complete"
4213 );
4214
4215 let checkpoint = read_checkpoint(&checkpoint_path).expect("read checkpoint");
4217 assert_eq!(
4218 checkpoint.wal_offset, wal_end_offset,
4219 "Checkpoint must advance to end of WAL after full recovery"
4220 );
4221 assert!(
4222 checkpoint.last_command_id > 0,
4223 "Checkpoint must record a positive last_command_id"
4224 );
4225
4226 let mut conn = pool.get().expect("get db connection");
4228 let inserted_count: i64 =
4229 diesel::dsl::sql::<diesel::sql_types::BigInt>("SELECT COUNT(*) FROM engine_commands")
4230 .get_result(&mut conn)
4231 .expect("count engine_commands");
4232
4233 assert_eq!(
4234 inserted_count, num_entries as i64,
4235 "All {} WAL entries must be replicated to engine_commands",
4236 num_entries
4237 );
4238
4239 for &idx in &[0, num_entries / 2, num_entries - 1] {
4241 let uuid_val = request_uuids[idx].0;
4242 let exists: bool = diesel::dsl::sql::<diesel::sql_types::Bool>(&format!(
4243 "SELECT EXISTS(SELECT 1 FROM engine_commands WHERE request_uuid = '{}')",
4244 uuid_val
4245 ))
4246 .get_result(&mut conn)
4247 .expect("check request_uuid exists");
4248
4249 assert!(
4250 exists,
4251 "request_uuid for entry {} must exist in engine_commands",
4252 idx
4253 );
4254 }
4255 }
4256
4257 #[tokio::test]
4258 #[serial_test::serial(testnet_env)]
4259 async fn recover_wal_sync_checkpoint_advances_per_batch() {
4260 let (pool, _container, _env_guard) = setup_test_pool().await;
4264
4265 let dir = tempfile::tempdir().expect("create tempdir");
4266 let wal_path = dir.path().join("journal.wal");
4267 let checkpoint_path = checkpoint_path_for_wal(&wal_path);
4268
4269 let num_entries: usize = 50;
4271 let mut wal = WalState::open_at(wal_path.clone()).expect("open wal");
4272
4273 let entries: Vec<JournalEntry> = (0..num_entries)
4274 .map(|i| JournalEntry {
4275 request_uuid: hypercall_db_diesel::engine_enums::DbUuid(uuid::Uuid::new_v4()),
4276 received_ts_ms: 2000 + i as u64,
4277 command_data: vec![(i % 256) as u8; 16],
4278 response_data: None,
4279 order_id: Some(i as i64),
4280 pre_digest: EngineStateDigest::default(),
4281 post_digest: EngineStateDigest::default(),
4282 duration_ms: 1,
4283 events: vec![],
4284 outbox_appends: vec![],
4285 fill_side_effects: vec![],
4286 cash_withdrawal_side_effect: None,
4287 balance_updates: Vec::new(),
4288 created_at: Instant::now(),
4289 commit_ack: None,
4290 command_type_enum: Some(
4291 hypercall_db_diesel::engine_enums::CommandType::CreateOrder,
4292 ),
4293 #[cfg(feature = "rsm-state")]
4294 command_identity_hash: [0u8; 32],
4295 #[cfg(feature = "rsm-state")]
4296 rsm_state_digest: Some(crate::rsm::engine_snapshot::EngineStateDigest::empty()),
4297 })
4298 .collect();
4299
4300 let offsets = wal.append_batch(&entries).expect("append WAL entries");
4301 let wal_end_offset = offsets.last().unwrap().end_offset;
4302 drop(wal);
4303
4304 EngineJournalBatcher::recover_wal_sync(
4306 &pool,
4307 &wal_path,
4308 &checkpoint_path,
4309 false,
4310 10,
4311 #[cfg(feature = "rsm-state")]
4312 hypercall_db::ValidatorRsmEnvironment::Development,
4313 #[cfg(feature = "rsm-state")]
4314 None,
4315 );
4316
4317 let checkpoint = read_checkpoint(&checkpoint_path).expect("read checkpoint");
4319 assert_eq!(
4320 checkpoint.wal_offset, wal_end_offset,
4321 "Checkpoint must be at end of WAL after complete recovery"
4322 );
4323
4324 let midpoint_offset = offsets[24].end_offset;
4327 write_checkpoint(
4328 &checkpoint_path,
4329 WalCheckpointMetadata {
4330 wal_offset: midpoint_offset,
4331 last_command_id: checkpoint.last_command_id,
4332 last_l2_seq: checkpoint.last_l2_seq,
4333 },
4334 )
4335 .expect("write midpoint checkpoint");
4336
4337 EngineJournalBatcher::recover_wal_sync(
4338 &pool,
4339 &wal_path,
4340 &checkpoint_path,
4341 false,
4342 10,
4343 #[cfg(feature = "rsm-state")]
4344 hypercall_db::ValidatorRsmEnvironment::Development,
4345 #[cfg(feature = "rsm-state")]
4346 None,
4347 );
4348
4349 let checkpoint2 =
4351 read_checkpoint(&checkpoint_path).expect("read checkpoint after re-recovery");
4352 assert_eq!(checkpoint2.wal_offset, wal_end_offset);
4353
4354 let mut conn = pool.get().expect("get db connection");
4356 let count: i64 =
4357 diesel::dsl::sql::<diesel::sql_types::BigInt>("SELECT COUNT(*) FROM engine_commands")
4358 .get_result(&mut conn)
4359 .expect("count engine_commands");
4360
4361 assert_eq!(
4362 count, num_entries as i64,
4363 "Re-recovery must not create duplicate rows"
4364 );
4365 }
4366
4367 #[tokio::test]
4368 #[serial_test::serial(testnet_env)]
4369 async fn recover_wal_sync_duplicate_request_id_persists_digest_once() {
4370 let (pool, _container, _env_guard) = setup_test_pool().await;
4371
4372 let dir = tempfile::tempdir().expect("create tempdir");
4373 let wal_path = dir.path().join("journal.wal");
4374 let checkpoint_path = checkpoint_path_for_wal(&wal_path);
4375 let duplicate_request_uuid =
4376 hypercall_db_diesel::engine_enums::DbUuid(uuid::Uuid::new_v4());
4377 let mut wal = WalState::open_at(wal_path.clone()).expect("open wal");
4378
4379 let entries: Vec<JournalEntry> = (0..2)
4380 .map(|i| JournalEntry {
4381 request_uuid: duplicate_request_uuid,
4382 received_ts_ms: 3000 + i as u64,
4383 command_data: vec![i as u8; 16],
4384 response_data: None,
4385 order_id: None,
4386 pre_digest: EngineStateDigest::default(),
4387 post_digest: EngineStateDigest::default(),
4388 duration_ms: 1,
4389 events: vec![EventPayload {
4390 event_topic: "test.recovery".to_string(),
4391 event_key: Some(format!("duplicate-event-{i}")),
4392 event_data: vec![1, i as u8],
4393 l2_sequence: Some(i as i64),
4394 event_type_enum: hypercall_db_diesel::engine_enums::EventType::OrderbookUpdated,
4395 }],
4396 outbox_appends: vec![],
4397 balance_updates: vec![],
4398 fill_side_effects: vec![],
4399 cash_withdrawal_side_effect: None,
4400 created_at: Instant::now(),
4401 commit_ack: None,
4402 command_type_enum: Some(
4403 hypercall_db_diesel::engine_enums::CommandType::DepositUpdate,
4404 ),
4405 #[cfg(feature = "rsm-state")]
4406 command_identity_hash: [0u8; 32],
4407 #[cfg(feature = "rsm-state")]
4408 rsm_state_digest: Some(crate::rsm::engine_snapshot::EngineStateDigest::empty()),
4409 })
4410 .collect();
4411
4412 let offsets = wal.append_batch(&entries).expect("append WAL entries");
4413 let wal_end_offset = offsets.last().unwrap().end_offset;
4414 drop(wal);
4415
4416 EngineJournalBatcher::recover_wal_sync(
4417 &pool,
4418 &wal_path,
4419 &checkpoint_path,
4420 true,
4421 10,
4422 #[cfg(feature = "rsm-state")]
4423 hypercall_db::ValidatorRsmEnvironment::Development,
4424 #[cfg(feature = "rsm-state")]
4425 None,
4426 );
4427
4428 let checkpoint = read_checkpoint(&checkpoint_path).expect("read checkpoint");
4429 assert_eq!(
4430 checkpoint.wal_offset, wal_end_offset,
4431 "duplicate request UUIDs must not block checkpoint advancement"
4432 );
4433
4434 let mut conn = pool.get().expect("get db connection");
4435 let command_count: i64 =
4436 diesel::dsl::sql::<diesel::sql_types::BigInt>("SELECT COUNT(*) FROM engine_commands")
4437 .get_result(&mut conn)
4438 .expect("count engine_commands");
4439 assert_eq!(
4440 command_count, 1,
4441 "duplicate request UUIDs materialize one engine command"
4442 );
4443
4444 let event_count: i64 =
4445 diesel::dsl::sql::<diesel::sql_types::BigInt>("SELECT COUNT(*) FROM engine_events")
4446 .get_result(&mut conn)
4447 .expect("count engine_events");
4448 assert_eq!(
4449 event_count, 1,
4450 "duplicate request UUIDs materialize one command event"
4451 );
4452
4453 let digest_count: i64 = diesel::dsl::sql::<diesel::sql_types::BigInt>(
4454 "SELECT COUNT(*) FROM engine_command_digests",
4455 )
4456 .get_result(&mut conn)
4457 .expect("count engine_command_digests");
4458 assert_eq!(
4459 digest_count, 1,
4460 "duplicate request UUIDs materialize one command digest"
4461 );
4462 }
4463
4464 #[cfg(feature = "rsm-state")]
4465 #[tokio::test]
4466 #[serial_test::serial(testnet_env)]
4467 async fn rsm_wal_recovery_rebuilds_local_commitment_store_before_db_roots() {
4468 let (pool, _container, _env_guard) = setup_test_pool().await;
4469 let env = hypercall_db::ValidatorRsmEnvironment::Development;
4470 let dir = tempfile::tempdir().expect("create tempdir");
4471 let config = ValidatorRsmCommitmentStoreConfig {
4472 root_dir: dir.path().join("state"),
4473 prune_window: 100,
4474 pruning_enabled: false,
4475 };
4476
4477 let mut runtime = ValidatorRsmStateCommitmentRuntime::open_from_db(
4478 config.clone(),
4479 Arc::new(pool.clone()),
4480 env,
4481 )
4482 .expect("open empty commitment runtime");
4483
4484 let mut entry = make_test_rsm_entry(1);
4485 entry.command_identity_hash = [7u8; 32];
4486 let entries = [entry];
4487 let mut block = make_test_rsm_block_records(0, [0u8; 32], 0, &entries);
4488
4489 apply_recovered_rsm_commitment_blocks(&mut block, &mut runtime)
4490 .expect("replay should apply local commitment roots before DB persistence");
4491 EngineJournalBatcher::insert_batch(
4492 &pool,
4493 &entries,
4494 false,
4495 Some(
4496 EngineJournalBatcher::rsm_block_persistence_batch(RsmBlockPersistenceInput {
4497 environment: env,
4498 records: &block,
4499 mode: RsmBlockPersistenceMode::Replay,
4500 })
4501 .expect("prepare RSM replay persistence"),
4502 ),
4503 )
4504 .expect("persist replayed roots");
4505
4506 drop(runtime);
4507 ValidatorRsmStateCommitmentRuntime::open_from_db(config, Arc::new(pool.clone()), env)
4508 .expect("strict DB/local root verification should pass after replay");
4509 }
4510
4511 #[cfg(feature = "rsm-state")]
4512 #[tokio::test]
4513 #[serial_test::serial(testnet_env)]
4514 async fn rsm_replay_does_not_rewind_current_pointer() {
4515 let (pool, _container, _env_guard) = setup_test_pool().await;
4516 let env = hypercall_db::ValidatorRsmEnvironment::Development;
4517
4518 let mut entry1 = make_test_rsm_entry(1);
4519 entry1.command_identity_hash = [1u8; 32];
4520 let mut entry2 = make_test_rsm_entry(2);
4521 entry2.command_identity_hash = [2u8; 32];
4522
4523 let entries = [entry1, entry2];
4524
4525 let block1 = make_test_rsm_block_records(1, [0u8; 32], 0, &entries[0..1]);
4526 EngineJournalBatcher::insert_batch(
4527 &pool,
4528 &entries[0..1],
4529 false,
4530 Some(
4531 EngineJournalBatcher::rsm_block_persistence_batch(RsmBlockPersistenceInput {
4532 environment: env,
4533 records: &block1,
4534 mode: RsmBlockPersistenceMode::Live,
4535 })
4536 .expect("prepare RSM live block 1 persistence"),
4537 ),
4538 )
4539 .expect("persist live block 1");
4540 let block1_hash = block1[0]
4541 .block_header
4542 .as_ref()
4543 .expect("block 1 header")
4544 .block_hash();
4545
4546 let block2 = make_test_rsm_block_records(2, block1_hash, 1, &entries[1..2]);
4547 EngineJournalBatcher::insert_batch(
4548 &pool,
4549 &entries[1..2],
4550 false,
4551 Some(
4552 EngineJournalBatcher::rsm_block_persistence_batch(RsmBlockPersistenceInput {
4553 environment: env,
4554 records: &block2,
4555 mode: RsmBlockPersistenceMode::Live,
4556 })
4557 .expect("prepare RSM live block 2 persistence"),
4558 ),
4559 )
4560 .expect("persist live block 2");
4561
4562 let handler =
4563 hypercall_db_diesel::DatabaseHandler::with_pool_no_migrations(Arc::new(pool.clone()));
4564 assert_eq!(
4565 handler
4566 .get_validator_rsm_current_state_sync(env)
4567 .expect("read current")
4568 .expect("current exists")
4569 .current_version,
4570 2
4571 );
4572
4573 EngineJournalBatcher::insert_batch(
4574 &pool,
4575 &entries[0..1],
4576 false,
4577 Some(
4578 EngineJournalBatcher::rsm_block_persistence_batch(RsmBlockPersistenceInput {
4579 environment: env,
4580 records: &block1,
4581 mode: RsmBlockPersistenceMode::Replay,
4582 })
4583 .expect("prepare RSM replay block persistence"),
4584 ),
4585 )
4586 .expect("replay old block without rewinding current");
4587
4588 assert_eq!(
4589 handler
4590 .get_validator_rsm_current_state_sync(env)
4591 .expect("read current after replay")
4592 .expect("current exists")
4593 .current_version,
4594 2
4595 );
4596 }
4597}