Skip to main content

hypercall/rsm/
commitment_store.rs

1use std::path::{Path, PathBuf};
2
3use anyhow::{Context, Result};
4use diesel::prelude::*;
5use diesel::sql_types::{BigInt, SmallInt};
6use hypercall_db::ValidatorRsmStateReader;
7use hypercall_state_commitment::mmr::HypercallMmr;
8use hypercall_state_commitment::pipeline::{
9    BatchCommitment, CommitmentPipeline, PreparedBatchCommitment, StateDelta,
10};
11use hypercall_state_commitment::rocks_store::{RocksDbMmrStore, RocksDbStore};
12use hypercall_state_commitment::state_tree::new_jmt;
13
14const STATE_TREE_DIR: &str = "state-jmt";
15const RISK_TREE_DIR: &str = "risk-jmt";
16const COMMAND_MMR_DIR: &str = "command-mmr";
17const OBLIGATION_MMR_DIR: &str = "obligation-mmr";
18const INTENT_MMR_DIR: &str = "intent-mmr";
19
20#[derive(Debug, Clone)]
21pub struct ValidatorRsmCommitmentStoreConfig {
22    pub root_dir: PathBuf,
23    pub prune_window: u64,
24    pub pruning_enabled: bool,
25}
26
27/// Opens the local commitment-tree stores used to compute validator RSM roots.
28///
29/// This is intentionally separate from Postgres root-summary storage. RocksDB
30/// stores JMT node/value data for fast incremental root computation and proof
31/// generation. Postgres stores accepted root summaries and the current pointer.
32#[derive(Debug, Clone)]
33pub struct ValidatorRsmCommitmentStore {
34    root_dir: PathBuf,
35    prune_window: u64,
36    pruning_enabled: bool,
37}
38
39impl ValidatorRsmCommitmentStore {
40    pub fn open(config: ValidatorRsmCommitmentStoreConfig) -> Result<Self> {
41        std::fs::create_dir_all(&config.root_dir).with_context(|| {
42            format!(
43                "failed to create validator RSM commitment root {}",
44                config.root_dir.display()
45            )
46        })?;
47        Ok(Self {
48            root_dir: config.root_dir,
49            prune_window: config.prune_window,
50            pruning_enabled: config.pruning_enabled,
51        })
52    }
53
54    pub fn open_pipeline(
55        &self,
56        version: u64,
57    ) -> Result<CommitmentPipeline<RocksDbStore, RocksDbMmrStore>> {
58        let state_store = open_store(self.root_dir.join(STATE_TREE_DIR))?;
59        let risk_store = open_store(self.root_dir.join(RISK_TREE_DIR))?;
60        let command_mmr = open_mmr_store(self.root_dir.join(COMMAND_MMR_DIR))?;
61        let obligation_mmr = open_mmr_store(self.root_dir.join(OBLIGATION_MMR_DIR))?;
62        let intent_mmr = open_mmr_store(self.root_dir.join(INTENT_MMR_DIR))?;
63        Ok(CommitmentPipeline::new_with_mmrs(
64            state_store,
65            risk_store,
66            command_mmr,
67            obligation_mmr,
68            intent_mmr,
69            self.prune_window,
70        )?
71        .with_version(version))
72    }
73
74    pub fn root_dir(&self) -> &Path {
75        &self.root_dir
76    }
77
78    fn ensure_empty_for_genesis(&self) -> Result<()> {
79        let state_store = open_store(self.root_dir.join(STATE_TREE_DIR))?;
80        let risk_store = open_store(self.root_dir.join(RISK_TREE_DIR))?;
81        let command_mmr = open_mmr_store(self.root_dir.join(COMMAND_MMR_DIR))?;
82        let obligation_mmr = open_mmr_store(self.root_dir.join(OBLIGATION_MMR_DIR))?;
83        let intent_mmr = open_mmr_store(self.root_dir.join(INTENT_MMR_DIR))?;
84
85        if !state_store.is_empty()? {
86            anyhow::bail!("validator RSM DB is empty but local state JMT store is not empty");
87        }
88        if !risk_store.is_empty()? {
89            anyhow::bail!("validator RSM DB is empty but local risk JMT store is not empty");
90        }
91        if !command_mmr.is_empty()? {
92            anyhow::bail!("validator RSM DB is empty but local command MMR store is not empty");
93        }
94        if !obligation_mmr.is_empty()? {
95            anyhow::bail!("validator RSM DB is empty but local obligation MMR store is not empty");
96        }
97        if !intent_mmr.is_empty()? {
98            anyhow::bail!("validator RSM DB is empty but local intent MMR store is not empty");
99        }
100        Ok(())
101    }
102
103    pub fn pruning_enabled(&self) -> bool {
104        self.pruning_enabled
105    }
106
107    fn local_next_version(&self) -> Result<Option<u64>> {
108        let state_store = open_store(self.root_dir.join(STATE_TREE_DIR))?;
109        let risk_store = open_store(self.root_dir.join(RISK_TREE_DIR))?;
110        let state_version = state_store.latest_root_version()?;
111        let risk_version = risk_store.latest_root_version()?;
112        match (state_version, risk_version) {
113            (Some(state), Some(risk)) if state != risk => {
114                anyhow::bail!(
115                    "validator RSM local state/risk stores disagree on latest version: state={} risk={}",
116                    state,
117                    risk
118                );
119            }
120            (Some(version), _) | (_, Some(version)) => Ok(Some(version + 1)),
121            (None, None) => Ok(None),
122        }
123    }
124}
125
126/// Runtime-owned durable state commitment pipeline for validator RSM blocks.
127///
128/// This owns the RocksDB-backed JMT stores and keeps block production on the
129/// durable commitment path. Postgres remains the accepted-block index and
130/// current pointer. RocksDB remains the source for incremental JMT roots.
131pub struct ValidatorRsmStateCommitmentRuntime {
132    store: ValidatorRsmCommitmentStore,
133    pipeline: CommitmentPipeline<RocksDbStore, RocksDbMmrStore>,
134}
135
136impl ValidatorRsmStateCommitmentRuntime {
137    pub fn open_from_db(
138        config: ValidatorRsmCommitmentStoreConfig,
139        pool: std::sync::Arc<hypercall_db_diesel::DbPool>,
140        environment: hypercall_db::ValidatorRsmEnvironment,
141    ) -> Result<Self> {
142        let handler = hypercall_db_diesel::DatabaseHandler::with_pool_no_migrations(pool.clone());
143        let current = handler
144            .get_validator_rsm_current_state_sync(environment)
145            .context("failed to read validator RSM current pointer")?;
146        let block_count = validator_rsm_block_count(&pool, environment)
147            .context("failed to count validator RSM blocks")?;
148        let root_summary_count = validator_rsm_root_summary_count(&pool, environment)
149            .context("failed to count validator RSM root summaries")?;
150        let latest_block = handler
151            .get_latest_rsm_block_sync(environment)
152            .context("failed to read latest validator RSM block")?;
153
154        match (current, latest_block) {
155            (Some(current), Some(latest)) => {
156                if latest.root_summary.version != current.current_version {
157                    anyhow::bail!(
158                        "validator RSM current pointer {} disagrees with latest block version {}",
159                        current.current_version,
160                        latest.root_summary.version
161                    );
162                }
163                let store = ValidatorRsmCommitmentStore::open(config)?;
164                let db_next_version = current.current_version + 1;
165                let local_next_version = store.local_next_version()?.unwrap_or(db_next_version);
166                if local_next_version < db_next_version {
167                    anyhow::bail!(
168                        "validator RSM local commitment store is behind DB: local next version {} but DB next version is {}",
169                        local_next_version,
170                        db_next_version
171                    );
172                }
173                let local_ahead_of_db = local_next_version > db_next_version;
174                verify_persisted_roots(
175                    &store,
176                    current.current_version,
177                    &latest.root_summary,
178                    local_ahead_of_db,
179                )?;
180                return Self::open_at_version(store, local_next_version);
181            }
182            (Some(current), None) => {
183                anyhow::bail!(
184                    "validator RSM current pointer {} exists but accepted block is missing",
185                    current.current_version
186                );
187            }
188            (None, Some(_)) => {
189                anyhow::bail!("validator RSM blocks exist but current pointer is missing");
190            }
191            (None, None) if block_count == 0 && root_summary_count == 0 => {
192                let store = ValidatorRsmCommitmentStore::open(config)?;
193                store.ensure_empty_for_genesis()?;
194                return Self::open_at_version(store, 0);
195            }
196            (None, None) => {
197                anyhow::bail!("validator RSM accepted state exists but current pointer is missing");
198            }
199        }
200    }
201
202    fn open_at_version(store: ValidatorRsmCommitmentStore, version: u64) -> Result<Self> {
203        let pipeline = store.open_pipeline(version)?;
204        Ok(Self { store, pipeline })
205    }
206
207    pub fn commit_rsm_batch(&mut self, delta: &StateDelta) -> Result<BatchCommitment> {
208        let prepared = self.prepare_rsm_batch(delta)?;
209        self.apply_prepared_rsm_batch(prepared)
210    }
211
212    pub fn prepare_rsm_batch(&self, delta: &StateDelta) -> Result<PreparedBatchCommitment> {
213        self.pipeline.prepare_commit(delta)
214    }
215
216    pub fn next_batch_version(&self) -> u64 {
217        self.pipeline.version()
218    }
219
220    pub fn apply_prepared_rsm_batch(
221        &mut self,
222        prepared: PreparedBatchCommitment,
223    ) -> Result<BatchCommitment> {
224        let commitment = self.pipeline.apply_prepared_commit(prepared)?;
225        if commitment.state_leaves_updated == 0 && commitment.version > 0 {
226            self.pipeline
227                .checkpoint_inherited_state_root(commitment.version)?;
228        }
229        if commitment.risk_leaves_updated == 0 && commitment.version > 0 {
230            self.pipeline
231                .checkpoint_inherited_risk_root(commitment.version)?;
232        }
233        if self.store.pruning_enabled() && commitment.version >= self.store.prune_window {
234            let prune_to = commitment.version - self.store.prune_window;
235            let (state_pruned, risk_pruned) = self.pipeline.prune_stores(prune_to)?;
236            if self.store.prune_window != 0
237                && commitment.version % self.store.prune_window == 0
238                && (state_pruned > 0 || risk_pruned > 0)
239            {
240                self.pipeline.compact_stores()?;
241            }
242        }
243        Ok(commitment)
244    }
245}
246
247#[derive(QueryableByName)]
248struct ValidatorRsmBlockCountRow {
249    #[diesel(sql_type = BigInt)]
250    count: i64,
251}
252
253fn validator_rsm_block_count(
254    pool: &hypercall_db_diesel::DbPool,
255    environment: hypercall_db::ValidatorRsmEnvironment,
256) -> Result<i64> {
257    let mut conn = pool.get().context("failed to get diesel connection")?;
258    let row = diesel::sql_query(
259        "SELECT COUNT(*)::BIGINT AS count
260         FROM validator_rsm_blocks
261         WHERE environment = $1",
262    )
263    .bind::<SmallInt, _>(environment.as_i16())
264    .get_result::<ValidatorRsmBlockCountRow>(&mut conn)?;
265    Ok(row.count)
266}
267
268fn validator_rsm_root_summary_count(
269    pool: &hypercall_db_diesel::DbPool,
270    environment: hypercall_db::ValidatorRsmEnvironment,
271) -> Result<i64> {
272    let mut conn = pool.get().context("failed to get diesel connection")?;
273    let row = diesel::sql_query(
274        "SELECT COUNT(*)::BIGINT AS count
275         FROM validator_rsm_batch_roots
276         WHERE environment = $1",
277    )
278    .bind::<SmallInt, _>(environment.as_i16())
279    .get_result::<ValidatorRsmBlockCountRow>(&mut conn)?;
280    Ok(row.count)
281}
282
283fn verify_persisted_roots(
284    store: &ValidatorRsmCommitmentStore,
285    version: u64,
286    summary: &hypercall_db::ValidatorRsmRootSummary,
287    allow_mmr_ahead: bool,
288) -> Result<()> {
289    let state_store = open_store(store.root_dir.join(STATE_TREE_DIR))?;
290    let risk_store = open_store(store.root_dir.join(RISK_TREE_DIR))?;
291    let state_root = new_jmt(&state_store).get_root_hash_option(version)?;
292    let risk_root = new_jmt(&risk_store).get_root_hash_option(version)?;
293    match state_root {
294        Some(root) if root.0 != summary.state_root => {
295            anyhow::bail!(
296                "persisted state JMT root mismatch for version {}: db={} rocksdb={}",
297                version,
298                hex::encode(summary.state_root),
299                hex::encode(root.0)
300            );
301        }
302        Some(_) => {}
303        None if summary.state_root == [0u8; 32] => {}
304        None => anyhow::bail!("missing persisted state JMT root for version {version}"),
305    }
306    match risk_root {
307        Some(root) if root.0 != summary.risk_root => {
308            anyhow::bail!(
309                "persisted risk JMT root mismatch for version {}: db={} rocksdb={}",
310                version,
311                hex::encode(summary.risk_root),
312                hex::encode(root.0)
313            );
314        }
315        Some(_) => {}
316        None if summary.risk_root == [0u8; 32] => {}
317        None => anyhow::bail!("missing persisted risk JMT root for version {version}"),
318    }
319    if !allow_mmr_ahead {
320        verify_mmr_root(
321            "command",
322            open_mmr_store(store.root_dir.join(COMMAND_MMR_DIR))?,
323            summary.command_mmr_root,
324        )?;
325        verify_mmr_root(
326            "obligation",
327            open_mmr_store(store.root_dir.join(OBLIGATION_MMR_DIR))?,
328            summary.obligation_mmr_root,
329        )?;
330        verify_mmr_root(
331            "intent",
332            open_mmr_store(store.root_dir.join(INTENT_MMR_DIR))?,
333            summary.intent_mmr_root,
334        )?;
335    }
336    Ok(())
337}
338
339fn verify_mmr_root(name: &str, mmr_store: RocksDbMmrStore, expected: [u8; 32]) -> Result<()> {
340    let mmr = HypercallMmr::from_store(mmr_store)?;
341    let root = mmr.peaks_hash();
342    if root.0 != expected {
343        anyhow::bail!(
344            "persisted {} MMR root mismatch: db={} rocksdb={}",
345            name,
346            hex::encode(expected),
347            hex::encode(root.0)
348        );
349    }
350    Ok(())
351}
352
353fn open_store(path: PathBuf) -> Result<RocksDbStore> {
354    std::fs::create_dir_all(&path)
355        .with_context(|| format!("failed to create commitment store {}", path.display()))?;
356    RocksDbStore::open(&path)
357        .with_context(|| format!("failed to open commitment store {}", path.display()))
358}
359
360fn open_mmr_store(path: PathBuf) -> Result<RocksDbMmrStore> {
361    std::fs::create_dir_all(&path)
362        .with_context(|| format!("failed to create MMR commitment store {}", path.display()))?;
363    RocksDbMmrStore::open(&path)
364        .with_context(|| format!("failed to open MMR commitment store {}", path.display()))
365}
366
367#[cfg(test)]
368mod tests {
369    use super::*;
370    use hypercall_state_commitment::leaves::{leaf_to_bytes, AccountLeaf, RiskLeaf};
371    use hypercall_state_commitment::pipeline::{AccountUpdate, RiskUpdate, StateDelta};
372    use hypercall_state_commitment::state_tree::new_jmt;
373    use hypercall_state_commitment::StateKey;
374
375    #[test]
376    fn opens_rocksdb_backed_pipeline_at_version() {
377        let dir = tempfile::tempdir().unwrap();
378        let store = ValidatorRsmCommitmentStore::open(ValidatorRsmCommitmentStoreConfig {
379            root_dir: dir.path().join("validator-rsm"),
380            prune_window: 100,
381            pruning_enabled: false,
382        })
383        .unwrap();
384
385        let pipeline = store.open_pipeline(7).unwrap();
386        assert_eq!(pipeline.version(), 7);
387        assert!(store.root_dir().join(STATE_TREE_DIR).exists());
388        assert!(store.root_dir().join(RISK_TREE_DIR).exists());
389        assert!(store.root_dir().join(COMMAND_MMR_DIR).exists());
390        assert!(store.root_dir().join(OBLIGATION_MMR_DIR).exists());
391        assert!(store.root_dir().join(INTENT_MMR_DIR).exists());
392    }
393
394    #[test]
395    fn pipeline_reopens_with_persisted_mmr_roots() {
396        let dir = tempfile::tempdir().unwrap();
397        let store = ValidatorRsmCommitmentStore::open(ValidatorRsmCommitmentStoreConfig {
398            root_dir: dir.path().join("validator-rsm"),
399            prune_window: 100,
400            pruning_enabled: false,
401        })
402        .unwrap();
403
404        let first = {
405            let mut pipeline = store.open_pipeline(0).unwrap();
406            pipeline
407                .commit(&StateDelta {
408                    commands: vec![[0x11; 32], [0x22; 32]],
409                    obligations: vec![[0x33; 32]],
410                    intents: vec![[0x44; 32]],
411                    ..Default::default()
412                })
413                .unwrap()
414        };
415
416        let second = {
417            let mut pipeline = store.open_pipeline(1).unwrap();
418            assert_eq!(pipeline.version(), 1);
419            pipeline.commit(&StateDelta::default()).unwrap()
420        };
421
422        assert_eq!(second.command_mmr_root, first.command_mmr_root);
423        assert_eq!(second.obligation_mmr_root, first.obligation_mmr_root);
424        assert_eq!(second.intent_mmr_root, first.intent_mmr_root);
425    }
426
427    #[test]
428    fn empty_db_rejects_stale_local_commitment_store() {
429        let dir = tempfile::tempdir().unwrap();
430        let store = ValidatorRsmCommitmentStore::open(ValidatorRsmCommitmentStoreConfig {
431            root_dir: dir.path().join("validator-rsm"),
432            prune_window: 100,
433            pruning_enabled: false,
434        })
435        .unwrap();
436
437        store.ensure_empty_for_genesis().unwrap();
438        {
439            let mut pipeline = store.open_pipeline(0).unwrap();
440            pipeline
441                .commit(&StateDelta {
442                    commands: vec![[0x11; 32]],
443                    ..Default::default()
444                })
445                .unwrap();
446        }
447
448        let error = store.ensure_empty_for_genesis().unwrap_err().to_string();
449        assert!(
450            error.contains("local command MMR store is not empty"),
451            "{error}"
452        );
453    }
454
455    #[test]
456    fn runtime_checkpoints_inherited_roots_for_empty_delta() {
457        let dir = tempfile::tempdir().unwrap();
458        let store = ValidatorRsmCommitmentStore::open(ValidatorRsmCommitmentStoreConfig {
459            root_dir: dir.path().join("validator-rsm"),
460            prune_window: 100,
461            pruning_enabled: false,
462        })
463        .unwrap();
464
465        let mut runtime =
466            ValidatorRsmStateCommitmentRuntime::open_at_version(store.clone(), 0).unwrap();
467        let first = runtime
468            .commit_rsm_batch(&StateDelta {
469                accounts: vec![AccountUpdate {
470                    address: [0x11; 20],
471                    leaf: AccountLeaf {
472                        cash: 1_000_000,
473                        nonce: 1,
474                        margin_mode: 0,
475                        liquidation_state: 0,
476                    },
477                }],
478                risk: vec![RiskUpdate {
479                    address: [0x11; 20],
480                    leaf: RiskLeaf {
481                        equity: 1_000_000,
482                        initial_margin: 100_000,
483                        maintenance_margin: 50_000,
484                        scanning_risk: 75_000,
485                        health_nonce: 1,
486                    },
487                }],
488                ..Default::default()
489            })
490            .unwrap();
491        let second = runtime.commit_rsm_batch(&StateDelta::default()).unwrap();
492
493        assert_eq!(second.state_root, first.state_root);
494        assert_eq!(second.risk_root, first.risk_root);
495        drop(runtime);
496        let state_store = open_store(store.root_dir().join(STATE_TREE_DIR)).unwrap();
497        let risk_store = open_store(store.root_dir().join(RISK_TREE_DIR)).unwrap();
498        let persisted_state = new_jmt(&state_store)
499            .get_root_hash_option(second.version)
500            .unwrap()
501            .expect("empty batch should checkpoint inherited state root");
502        let persisted = new_jmt(&risk_store)
503            .get_root_hash_option(second.version)
504            .unwrap()
505            .expect("empty risk batch should checkpoint inherited root");
506        assert_eq!(persisted_state, second.state_root);
507        assert_eq!(persisted, second.risk_root);
508    }
509
510    #[test]
511    fn dormant_value_survives_stale_node_prune() {
512        let dir = tempfile::tempdir().unwrap();
513        let rocks = RocksDbStore::open(dir.path()).unwrap();
514        let tree = new_jmt(&rocks);
515        let dormant_key = StateKey::account(&[0x11; 20]);
516        let active_key = StateKey::account(&[0x22; 20]);
517        let dormant_value = leaf_to_bytes(&AccountLeaf {
518            cash: 42,
519            nonce: 1,
520            margin_mode: 0,
521            liquidation_state: 0,
522        });
523
524        let (_, batch) = tree
525            .put_value_set(vec![(dormant_key, Some(dormant_value.clone()))], 0)
526            .unwrap();
527        rocks.write_tree_update_batch(batch).unwrap();
528
529        for version in 1..20 {
530            let value = leaf_to_bytes(&AccountLeaf {
531                cash: version as i128,
532                nonce: version,
533                margin_mode: 0,
534                liquidation_state: 0,
535            });
536            let (_, batch) = tree
537                .put_value_set(vec![(active_key, Some(value))], version)
538                .unwrap();
539            rocks.write_tree_update_batch(batch).unwrap();
540        }
541
542        let (pruned_nodes, pruned_values) = rocks.prune(10).unwrap();
543        assert!(pruned_nodes > 0);
544
545        #[cfg(not(feature = "rsm-prune-values"))]
546        {
547            assert_eq!(pruned_values, 0, "value pruning requires checkpoints");
548
549            let (value, proof) = tree.get_with_proof(dormant_key, 19).unwrap();
550            assert_eq!(value, Some(dormant_value));
551            let root = tree.get_root_hash(19).unwrap();
552            assert!(proof.verify(root, dormant_key, value.as_ref()).is_ok());
553        }
554
555        #[cfg(feature = "rsm-prune-values")]
556        assert!(
557            pruned_values > 0,
558            "experimental value pruning should delete old value rows"
559        );
560    }
561}