hypercall/journal/engine_journal_batcher/
rsm_block_persistence.rs1use super::{rsm_batch_roots_from_wal, EngineJournalBatcher, WalRecordWithOffset};
2use anyhow::{anyhow, Context, Result};
3use tracing::warn;
4
5#[derive(Debug, Clone, Copy, PartialEq, Eq)]
6pub(super) enum RsmBlockPersistenceMode {
7 Live,
8 Replay,
9}
10
11pub(super) struct RsmBlockPersistenceInput<'a> {
12 pub(super) environment: hypercall_db::ValidatorRsmEnvironment,
13 pub(super) records: &'a [WalRecordWithOffset],
14 pub(super) mode: RsmBlockPersistenceMode,
15}
16
17impl EngineJournalBatcher {
18 pub(super) fn rsm_block_persistence_batch(
19 input: RsmBlockPersistenceInput<'_>,
20 ) -> Result<hypercall_db::EngineJournalRsmBlockBatch> {
21 let mut blocks = Vec::new();
22 let mode = match input.mode {
23 RsmBlockPersistenceMode::Live => hypercall_db::EngineJournalRsmPersistenceMode::Live,
24 RsmBlockPersistenceMode::Replay => {
25 hypercall_db::EngineJournalRsmPersistenceMode::Replay
26 }
27 };
28
29 for (index, record) in input.records.iter().enumerate() {
30 let Some(header) = record.block_header.as_ref() else {
31 continue;
32 };
33 if header.command_count == 0 {
34 continue;
35 }
36
37 let physical_records: Vec<WalRecordWithOffset> = input
38 .records
39 .iter()
40 .skip(index)
41 .take_while(|candidate| candidate.end_offset == record.end_offset)
42 .cloned()
43 .collect();
44 let roots = match rsm_batch_roots_from_wal(&physical_records) {
45 Some(roots) => roots,
46 None if input.mode == RsmBlockPersistenceMode::Replay => {
47 warn!(
48 environment = %input.environment,
49 height = header.batch_seq,
50 "Skipping legacy RSM WAL block without engine state digest during replay"
51 );
52 continue;
53 }
54 None => anyhow::bail!(
55 "live RSM block {} is missing engine state digest",
56 header.batch_seq
57 ),
58 };
59 if roots.batch_root != header.batch_root {
60 anyhow::bail!(
61 "RSM batch root mismatch for block {}: WAL header={} computed={}",
62 header.batch_seq,
63 hex::encode(header.batch_root),
64 hex::encode(roots.batch_root)
65 );
66 }
67
68 let block_hash = header.block_hash();
69 let command_range_end = header
70 .first_seq
71 .checked_add(header.command_count - 1)
72 .ok_or_else(|| anyhow!("RSM command range overflow"))?;
73 let root_summary = hypercall_db::NewValidatorRsmRootSummary {
74 environment: input.environment,
75 version: header.batch_seq,
76 batch_seq: header.batch_seq,
77 state_root: roots.state_root,
78 risk_root: roots.risk_root,
79 command_mmr_root: roots.command_mmr_root,
80 obligation_mmr_root: roots.obligation_mmr_root,
81 intent_mmr_root: roots.intent_mmr_root,
82 batch_root: roots.batch_root,
83 command_range_start: header.first_seq,
84 command_range_end,
85 command_count: header.command_count,
86 schema_version: 1,
87 accepted_block_hash: block_hash,
88 };
89
90 let signer = (header.signer != [0u8; 20]).then_some(header.signer);
91 let signature = (!header.signature.is_empty()).then_some(header.signature.clone());
92 let block_header = hypercall_db::NewRsmBlockHeader {
93 environment: input.environment,
94 height: header.batch_seq,
95 hash: block_hash,
96 parent_hash: header.prev_block_hash,
97 commands_hash: header.commands_hash,
98 batch_root: header.batch_root,
99 command_count: header.command_count,
100 first_command_seq: header.first_seq,
101 last_command_seq: command_range_end,
102 signer,
103 signature,
104 };
105
106 let commands =
107 rsm_block_commands_from_wal(header.batch_seq, header.first_seq, &physical_records)?;
108 blocks.push(hypercall_db::EngineJournalRsmBlockInput {
109 root_summary,
110 header: block_header,
111 commands,
112 });
113 }
114
115 Ok(hypercall_db::EngineJournalRsmBlockBatch { mode, blocks })
116 }
117}
118
119fn rsm_block_commands_from_wal(
120 height: u64,
121 first_command_seq: u64,
122 records: &[WalRecordWithOffset],
123) -> Result<Vec<hypercall_db::EngineJournalRsmBlockCommandInput>> {
124 let engine_records: Vec<&WalRecordWithOffset> = records
125 .iter()
126 .filter(|record| record.record.command_type_enum.is_some())
127 .collect();
128 if engine_records.is_empty() {
129 return Ok(vec![]);
130 }
131
132 engine_records
133 .into_iter()
134 .enumerate()
135 .map(|(index, record)| {
136 let command_type = record.record.command_type_enum.ok_or_else(|| {
137 anyhow!(
138 "missing command type for RSM block {} command {}",
139 height,
140 index
141 )
142 })?;
143 let command_index = u64::try_from(index).context("RSM block command index overflow")?;
144 let rsm_command_seq = first_command_seq
145 .checked_add(command_index)
146 .ok_or_else(|| anyhow!("RSM command sequence overflow"))?;
147
148 Ok(hypercall_db::EngineJournalRsmBlockCommandInput {
149 rsm_command_seq,
150 command_index,
151 request_uuid: record.record.request_uuid.0,
152 command_type: command_type.as_str().to_string(),
153 command_data: record.record.command_data.clone(),
154 command_identity_hash: record.record.command_identity_hash,
155 })
156 })
157 .collect()
158}