Skip to main content

hypercall/journal/engine_journal_batcher/
rsm_block_persistence.rs

1use super::{rsm_batch_roots_from_wal, EngineJournalBatcher, WalRecordWithOffset};
2use anyhow::{anyhow, Context, Result};
3use tracing::warn;
4
5#[derive(Debug, Clone, Copy, PartialEq, Eq)]
6pub(super) enum RsmBlockPersistenceMode {
7    Live,
8    Replay,
9}
10
11pub(super) struct RsmBlockPersistenceInput<'a> {
12    pub(super) environment: hypercall_db::ValidatorRsmEnvironment,
13    pub(super) records: &'a [WalRecordWithOffset],
14    pub(super) mode: RsmBlockPersistenceMode,
15}
16
17impl EngineJournalBatcher {
18    pub(super) fn rsm_block_persistence_batch(
19        input: RsmBlockPersistenceInput<'_>,
20    ) -> Result<hypercall_db::EngineJournalRsmBlockBatch> {
21        let mut blocks = Vec::new();
22        let mode = match input.mode {
23            RsmBlockPersistenceMode::Live => hypercall_db::EngineJournalRsmPersistenceMode::Live,
24            RsmBlockPersistenceMode::Replay => {
25                hypercall_db::EngineJournalRsmPersistenceMode::Replay
26            }
27        };
28
29        for (index, record) in input.records.iter().enumerate() {
30            let Some(header) = record.block_header.as_ref() else {
31                continue;
32            };
33            if header.command_count == 0 {
34                continue;
35            }
36
37            let physical_records: Vec<WalRecordWithOffset> = input
38                .records
39                .iter()
40                .skip(index)
41                .take_while(|candidate| candidate.end_offset == record.end_offset)
42                .cloned()
43                .collect();
44            let roots = match rsm_batch_roots_from_wal(&physical_records) {
45                Some(roots) => roots,
46                None if input.mode == RsmBlockPersistenceMode::Replay => {
47                    warn!(
48                        environment = %input.environment,
49                        height = header.batch_seq,
50                        "Skipping legacy RSM WAL block without engine state digest during replay"
51                    );
52                    continue;
53                }
54                None => anyhow::bail!(
55                    "live RSM block {} is missing engine state digest",
56                    header.batch_seq
57                ),
58            };
59            if roots.batch_root != header.batch_root {
60                anyhow::bail!(
61                    "RSM batch root mismatch for block {}: WAL header={} computed={}",
62                    header.batch_seq,
63                    hex::encode(header.batch_root),
64                    hex::encode(roots.batch_root)
65                );
66            }
67
68            let block_hash = header.block_hash();
69            let command_range_end = header
70                .first_seq
71                .checked_add(header.command_count - 1)
72                .ok_or_else(|| anyhow!("RSM command range overflow"))?;
73            let root_summary = hypercall_db::NewValidatorRsmRootSummary {
74                environment: input.environment,
75                version: header.batch_seq,
76                batch_seq: header.batch_seq,
77                state_root: roots.state_root,
78                risk_root: roots.risk_root,
79                command_mmr_root: roots.command_mmr_root,
80                obligation_mmr_root: roots.obligation_mmr_root,
81                intent_mmr_root: roots.intent_mmr_root,
82                batch_root: roots.batch_root,
83                command_range_start: header.first_seq,
84                command_range_end,
85                command_count: header.command_count,
86                schema_version: 1,
87                accepted_block_hash: block_hash,
88            };
89
90            let signer = (header.signer != [0u8; 20]).then_some(header.signer);
91            let signature = (!header.signature.is_empty()).then_some(header.signature.clone());
92            let block_header = hypercall_db::NewRsmBlockHeader {
93                environment: input.environment,
94                height: header.batch_seq,
95                hash: block_hash,
96                parent_hash: header.prev_block_hash,
97                commands_hash: header.commands_hash,
98                batch_root: header.batch_root,
99                command_count: header.command_count,
100                first_command_seq: header.first_seq,
101                last_command_seq: command_range_end,
102                signer,
103                signature,
104            };
105
106            let commands =
107                rsm_block_commands_from_wal(header.batch_seq, header.first_seq, &physical_records)?;
108            blocks.push(hypercall_db::EngineJournalRsmBlockInput {
109                root_summary,
110                header: block_header,
111                commands,
112            });
113        }
114
115        Ok(hypercall_db::EngineJournalRsmBlockBatch { mode, blocks })
116    }
117}
118
119fn rsm_block_commands_from_wal(
120    height: u64,
121    first_command_seq: u64,
122    records: &[WalRecordWithOffset],
123) -> Result<Vec<hypercall_db::EngineJournalRsmBlockCommandInput>> {
124    let engine_records: Vec<&WalRecordWithOffset> = records
125        .iter()
126        .filter(|record| record.record.command_type_enum.is_some())
127        .collect();
128    if engine_records.is_empty() {
129        return Ok(vec![]);
130    }
131
132    engine_records
133        .into_iter()
134        .enumerate()
135        .map(|(index, record)| {
136            let command_type = record.record.command_type_enum.ok_or_else(|| {
137                anyhow!(
138                    "missing command type for RSM block {} command {}",
139                    height,
140                    index
141                )
142            })?;
143            let command_index = u64::try_from(index).context("RSM block command index overflow")?;
144            let rsm_command_seq = first_command_seq
145                .checked_add(command_index)
146                .ok_or_else(|| anyhow!("RSM command sequence overflow"))?;
147
148            Ok(hypercall_db::EngineJournalRsmBlockCommandInput {
149                rsm_command_seq,
150                command_index,
151                request_uuid: record.record.request_uuid.0,
152                command_type: command_type.as_str().to_string(),
153                command_data: record.record.command_data.clone(),
154                command_identity_hash: record.record.command_identity_hash,
155            })
156        })
157        .collect()
158}