1use std::path::{Path, PathBuf};
2
3use anyhow::{Context, Result};
4use diesel::prelude::*;
5use diesel::sql_types::{BigInt, SmallInt};
6use hypercall_db::ValidatorRsmStateReader;
7use hypercall_state_commitment::mmr::HypercallMmr;
8use hypercall_state_commitment::pipeline::{
9 BatchCommitment, CommitmentPipeline, PreparedBatchCommitment, StateDelta,
10};
11use hypercall_state_commitment::rocks_store::{RocksDbMmrStore, RocksDbStore};
12use hypercall_state_commitment::state_tree::new_jmt;
13
// Sub-directory names, each joined onto the store's `root_dir` before the
// corresponding RocksDB-backed store is opened.

/// Sub-directory holding the state tree store (opened with `open_store`).
const STATE_TREE_DIR: &str = "state-jmt";
/// Sub-directory holding the risk tree store (opened with `open_store`).
const RISK_TREE_DIR: &str = "risk-jmt";
/// Sub-directory holding the command MMR store (opened with `open_mmr_store`).
const COMMAND_MMR_DIR: &str = "command-mmr";
/// Sub-directory holding the obligation MMR store (opened with `open_mmr_store`).
const OBLIGATION_MMR_DIR: &str = "obligation-mmr";
/// Sub-directory holding the intent MMR store (opened with `open_mmr_store`).
const INTENT_MMR_DIR: &str = "intent-mmr";
19
/// Settings consumed by `ValidatorRsmCommitmentStore::open`.
#[derive(Debug, Clone)]
pub struct ValidatorRsmCommitmentStoreConfig {
    /// Root directory for the commitment stores. It is created (including
    /// missing parents) when the store is opened; the individual tree/MMR
    /// stores live in sub-directories of it.
    pub root_dir: PathBuf,
    /// Window handed to the commitment pipeline on construction and used by
    /// the runtime to compute the prune target as
    /// `committed version - prune_window`.
    pub prune_window: u64,
    /// When `false`, the runtime never prunes or compacts after a commit.
    pub pruning_enabled: bool,
}
26
/// Handle describing where the validator RSM commitment stores live on disk
/// and how they should be pruned.
///
/// The struct itself only holds the path and pruning settings; the methods in
/// this file open the underlying RocksDB stores on demand each time they are
/// called.
#[derive(Debug, Clone)]
pub struct ValidatorRsmCommitmentStore {
    // Directory under which every per-store sub-directory is created.
    root_dir: PathBuf,
    // Copied from the config; forwarded to the pipeline and used for pruning.
    prune_window: u64,
    // Copied from the config; gates pruning in the runtime.
    pruning_enabled: bool,
}
38
39impl ValidatorRsmCommitmentStore {
40 pub fn open(config: ValidatorRsmCommitmentStoreConfig) -> Result<Self> {
41 std::fs::create_dir_all(&config.root_dir).with_context(|| {
42 format!(
43 "failed to create validator RSM commitment root {}",
44 config.root_dir.display()
45 )
46 })?;
47 Ok(Self {
48 root_dir: config.root_dir,
49 prune_window: config.prune_window,
50 pruning_enabled: config.pruning_enabled,
51 })
52 }
53
54 pub fn open_pipeline(
55 &self,
56 version: u64,
57 ) -> Result<CommitmentPipeline<RocksDbStore, RocksDbMmrStore>> {
58 let state_store = open_store(self.root_dir.join(STATE_TREE_DIR))?;
59 let risk_store = open_store(self.root_dir.join(RISK_TREE_DIR))?;
60 let command_mmr = open_mmr_store(self.root_dir.join(COMMAND_MMR_DIR))?;
61 let obligation_mmr = open_mmr_store(self.root_dir.join(OBLIGATION_MMR_DIR))?;
62 let intent_mmr = open_mmr_store(self.root_dir.join(INTENT_MMR_DIR))?;
63 Ok(CommitmentPipeline::new_with_mmrs(
64 state_store,
65 risk_store,
66 command_mmr,
67 obligation_mmr,
68 intent_mmr,
69 self.prune_window,
70 )?
71 .with_version(version))
72 }
73
74 pub fn root_dir(&self) -> &Path {
75 &self.root_dir
76 }
77
78 fn ensure_empty_for_genesis(&self) -> Result<()> {
79 let state_store = open_store(self.root_dir.join(STATE_TREE_DIR))?;
80 let risk_store = open_store(self.root_dir.join(RISK_TREE_DIR))?;
81 let command_mmr = open_mmr_store(self.root_dir.join(COMMAND_MMR_DIR))?;
82 let obligation_mmr = open_mmr_store(self.root_dir.join(OBLIGATION_MMR_DIR))?;
83 let intent_mmr = open_mmr_store(self.root_dir.join(INTENT_MMR_DIR))?;
84
85 if !state_store.is_empty()? {
86 anyhow::bail!("validator RSM DB is empty but local state JMT store is not empty");
87 }
88 if !risk_store.is_empty()? {
89 anyhow::bail!("validator RSM DB is empty but local risk JMT store is not empty");
90 }
91 if !command_mmr.is_empty()? {
92 anyhow::bail!("validator RSM DB is empty but local command MMR store is not empty");
93 }
94 if !obligation_mmr.is_empty()? {
95 anyhow::bail!("validator RSM DB is empty but local obligation MMR store is not empty");
96 }
97 if !intent_mmr.is_empty()? {
98 anyhow::bail!("validator RSM DB is empty but local intent MMR store is not empty");
99 }
100 Ok(())
101 }
102
103 pub fn pruning_enabled(&self) -> bool {
104 self.pruning_enabled
105 }
106
107 fn local_next_version(&self) -> Result<Option<u64>> {
108 let state_store = open_store(self.root_dir.join(STATE_TREE_DIR))?;
109 let risk_store = open_store(self.root_dir.join(RISK_TREE_DIR))?;
110 let state_version = state_store.latest_root_version()?;
111 let risk_version = risk_store.latest_root_version()?;
112 match (state_version, risk_version) {
113 (Some(state), Some(risk)) if state != risk => {
114 anyhow::bail!(
115 "validator RSM local state/risk stores disagree on latest version: state={} risk={}",
116 state,
117 risk
118 );
119 }
120 (Some(version), _) | (_, Some(version)) => Ok(Some(version + 1)),
121 (None, None) => Ok(None),
122 }
123 }
124}
125
/// A commitment store paired with a pipeline that has been opened over it.
///
/// Batches are prepared and applied through the pipeline; the store handle is
/// consulted for the pruning settings after each applied batch.
pub struct ValidatorRsmStateCommitmentRuntime {
    // Path and pruning settings the pipeline was opened from.
    store: ValidatorRsmCommitmentStore,
    // Pipeline over the RocksDB-backed tree and MMR stores.
    pipeline: CommitmentPipeline<RocksDbStore, RocksDbMmrStore>,
}
135
136impl ValidatorRsmStateCommitmentRuntime {
137 pub fn open_from_db(
138 config: ValidatorRsmCommitmentStoreConfig,
139 pool: std::sync::Arc<hypercall_db_diesel::DbPool>,
140 environment: hypercall_db::ValidatorRsmEnvironment,
141 ) -> Result<Self> {
142 let handler = hypercall_db_diesel::DatabaseHandler::with_pool_no_migrations(pool.clone());
143 let current = handler
144 .get_validator_rsm_current_state_sync(environment)
145 .context("failed to read validator RSM current pointer")?;
146 let block_count = validator_rsm_block_count(&pool, environment)
147 .context("failed to count validator RSM blocks")?;
148 let root_summary_count = validator_rsm_root_summary_count(&pool, environment)
149 .context("failed to count validator RSM root summaries")?;
150 let latest_block = handler
151 .get_latest_rsm_block_sync(environment)
152 .context("failed to read latest validator RSM block")?;
153
154 match (current, latest_block) {
155 (Some(current), Some(latest)) => {
156 if latest.root_summary.version != current.current_version {
157 anyhow::bail!(
158 "validator RSM current pointer {} disagrees with latest block version {}",
159 current.current_version,
160 latest.root_summary.version
161 );
162 }
163 let store = ValidatorRsmCommitmentStore::open(config)?;
164 let db_next_version = current.current_version + 1;
165 let local_next_version = store.local_next_version()?.unwrap_or(db_next_version);
166 if local_next_version < db_next_version {
167 anyhow::bail!(
168 "validator RSM local commitment store is behind DB: local next version {} but DB next version is {}",
169 local_next_version,
170 db_next_version
171 );
172 }
173 let local_ahead_of_db = local_next_version > db_next_version;
174 verify_persisted_roots(
175 &store,
176 current.current_version,
177 &latest.root_summary,
178 local_ahead_of_db,
179 )?;
180 return Self::open_at_version(store, local_next_version);
181 }
182 (Some(current), None) => {
183 anyhow::bail!(
184 "validator RSM current pointer {} exists but accepted block is missing",
185 current.current_version
186 );
187 }
188 (None, Some(_)) => {
189 anyhow::bail!("validator RSM blocks exist but current pointer is missing");
190 }
191 (None, None) if block_count == 0 && root_summary_count == 0 => {
192 let store = ValidatorRsmCommitmentStore::open(config)?;
193 store.ensure_empty_for_genesis()?;
194 return Self::open_at_version(store, 0);
195 }
196 (None, None) => {
197 anyhow::bail!("validator RSM accepted state exists but current pointer is missing");
198 }
199 }
200 }
201
202 fn open_at_version(store: ValidatorRsmCommitmentStore, version: u64) -> Result<Self> {
203 let pipeline = store.open_pipeline(version)?;
204 Ok(Self { store, pipeline })
205 }
206
207 pub fn commit_rsm_batch(&mut self, delta: &StateDelta) -> Result<BatchCommitment> {
208 let prepared = self.prepare_rsm_batch(delta)?;
209 self.apply_prepared_rsm_batch(prepared)
210 }
211
212 pub fn prepare_rsm_batch(&self, delta: &StateDelta) -> Result<PreparedBatchCommitment> {
213 self.pipeline.prepare_commit(delta)
214 }
215
216 pub fn next_batch_version(&self) -> u64 {
217 self.pipeline.version()
218 }
219
220 pub fn apply_prepared_rsm_batch(
221 &mut self,
222 prepared: PreparedBatchCommitment,
223 ) -> Result<BatchCommitment> {
224 let commitment = self.pipeline.apply_prepared_commit(prepared)?;
225 if commitment.state_leaves_updated == 0 && commitment.version > 0 {
226 self.pipeline
227 .checkpoint_inherited_state_root(commitment.version)?;
228 }
229 if commitment.risk_leaves_updated == 0 && commitment.version > 0 {
230 self.pipeline
231 .checkpoint_inherited_risk_root(commitment.version)?;
232 }
233 if self.store.pruning_enabled() && commitment.version >= self.store.prune_window {
234 let prune_to = commitment.version - self.store.prune_window;
235 let (state_pruned, risk_pruned) = self.pipeline.prune_stores(prune_to)?;
236 if self.store.prune_window != 0
237 && commitment.version % self.store.prune_window == 0
238 && (state_pruned > 0 || risk_pruned > 0)
239 {
240 self.pipeline.compact_stores()?;
241 }
242 }
243 Ok(commitment)
244 }
245}
246
/// Single-column result row for the raw `COUNT(*)` queries in this file.
///
/// Despite the name it is used for both the block count and the root-summary
/// count; each query aliases its result column as `count`.
#[derive(QueryableByName)]
struct ValidatorRsmBlockCountRow {
    #[diesel(sql_type = BigInt)]
    count: i64,
}
252
253fn validator_rsm_block_count(
254 pool: &hypercall_db_diesel::DbPool,
255 environment: hypercall_db::ValidatorRsmEnvironment,
256) -> Result<i64> {
257 let mut conn = pool.get().context("failed to get diesel connection")?;
258 let row = diesel::sql_query(
259 "SELECT COUNT(*)::BIGINT AS count
260 FROM validator_rsm_blocks
261 WHERE environment = $1",
262 )
263 .bind::<SmallInt, _>(environment.as_i16())
264 .get_result::<ValidatorRsmBlockCountRow>(&mut conn)?;
265 Ok(row.count)
266}
267
268fn validator_rsm_root_summary_count(
269 pool: &hypercall_db_diesel::DbPool,
270 environment: hypercall_db::ValidatorRsmEnvironment,
271) -> Result<i64> {
272 let mut conn = pool.get().context("failed to get diesel connection")?;
273 let row = diesel::sql_query(
274 "SELECT COUNT(*)::BIGINT AS count
275 FROM validator_rsm_batch_roots
276 WHERE environment = $1",
277 )
278 .bind::<SmallInt, _>(environment.as_i16())
279 .get_result::<ValidatorRsmBlockCountRow>(&mut conn)?;
280 Ok(row.count)
281}
282
283fn verify_persisted_roots(
284 store: &ValidatorRsmCommitmentStore,
285 version: u64,
286 summary: &hypercall_db::ValidatorRsmRootSummary,
287 allow_mmr_ahead: bool,
288) -> Result<()> {
289 let state_store = open_store(store.root_dir.join(STATE_TREE_DIR))?;
290 let risk_store = open_store(store.root_dir.join(RISK_TREE_DIR))?;
291 let state_root = new_jmt(&state_store).get_root_hash_option(version)?;
292 let risk_root = new_jmt(&risk_store).get_root_hash_option(version)?;
293 match state_root {
294 Some(root) if root.0 != summary.state_root => {
295 anyhow::bail!(
296 "persisted state JMT root mismatch for version {}: db={} rocksdb={}",
297 version,
298 hex::encode(summary.state_root),
299 hex::encode(root.0)
300 );
301 }
302 Some(_) => {}
303 None if summary.state_root == [0u8; 32] => {}
304 None => anyhow::bail!("missing persisted state JMT root for version {version}"),
305 }
306 match risk_root {
307 Some(root) if root.0 != summary.risk_root => {
308 anyhow::bail!(
309 "persisted risk JMT root mismatch for version {}: db={} rocksdb={}",
310 version,
311 hex::encode(summary.risk_root),
312 hex::encode(root.0)
313 );
314 }
315 Some(_) => {}
316 None if summary.risk_root == [0u8; 32] => {}
317 None => anyhow::bail!("missing persisted risk JMT root for version {version}"),
318 }
319 if !allow_mmr_ahead {
320 verify_mmr_root(
321 "command",
322 open_mmr_store(store.root_dir.join(COMMAND_MMR_DIR))?,
323 summary.command_mmr_root,
324 )?;
325 verify_mmr_root(
326 "obligation",
327 open_mmr_store(store.root_dir.join(OBLIGATION_MMR_DIR))?,
328 summary.obligation_mmr_root,
329 )?;
330 verify_mmr_root(
331 "intent",
332 open_mmr_store(store.root_dir.join(INTENT_MMR_DIR))?,
333 summary.intent_mmr_root,
334 )?;
335 }
336 Ok(())
337}
338
339fn verify_mmr_root(name: &str, mmr_store: RocksDbMmrStore, expected: [u8; 32]) -> Result<()> {
340 let mmr = HypercallMmr::from_store(mmr_store)?;
341 let root = mmr.peaks_hash();
342 if root.0 != expected {
343 anyhow::bail!(
344 "persisted {} MMR root mismatch: db={} rocksdb={}",
345 name,
346 hex::encode(expected),
347 hex::encode(root.0)
348 );
349 }
350 Ok(())
351}
352
353fn open_store(path: PathBuf) -> Result<RocksDbStore> {
354 std::fs::create_dir_all(&path)
355 .with_context(|| format!("failed to create commitment store {}", path.display()))?;
356 RocksDbStore::open(&path)
357 .with_context(|| format!("failed to open commitment store {}", path.display()))
358}
359
360fn open_mmr_store(path: PathBuf) -> Result<RocksDbMmrStore> {
361 std::fs::create_dir_all(&path)
362 .with_context(|| format!("failed to create MMR commitment store {}", path.display()))?;
363 RocksDbMmrStore::open(&path)
364 .with_context(|| format!("failed to open MMR commitment store {}", path.display()))
365}
366
#[cfg(test)]
mod tests {
    use super::*;
    use hypercall_state_commitment::leaves::{leaf_to_bytes, AccountLeaf, RiskLeaf};
    use hypercall_state_commitment::pipeline::{AccountUpdate, RiskUpdate, StateDelta};
    use hypercall_state_commitment::state_tree::new_jmt;
    use hypercall_state_commitment::StateKey;

    /// Opens a commitment store under `<dir>/validator-rsm` with pruning
    /// disabled and a prune window of 100, as every store-based test here
    /// needs.
    fn open_test_store(dir: &tempfile::TempDir) -> ValidatorRsmCommitmentStore {
        ValidatorRsmCommitmentStore::open(ValidatorRsmCommitmentStoreConfig {
            root_dir: dir.path().join("validator-rsm"),
            prune_window: 100,
            pruning_enabled: false,
        })
        .unwrap()
    }

    /// Opening a pipeline positions it at the requested version and creates
    /// every per-store sub-directory.
    #[test]
    fn opens_rocksdb_backed_pipeline_at_version() {
        let tmp = tempfile::tempdir().unwrap();
        let store = open_test_store(&tmp);

        let pipeline = store.open_pipeline(7).unwrap();
        assert_eq!(pipeline.version(), 7);
        for subdir in [
            STATE_TREE_DIR,
            RISK_TREE_DIR,
            COMMAND_MMR_DIR,
            OBLIGATION_MMR_DIR,
            INTENT_MMR_DIR,
        ] {
            assert!(store.root_dir().join(subdir).exists());
        }
    }

    /// MMR roots written by one pipeline instance are visible to a pipeline
    /// reopened over the same directories.
    #[test]
    fn pipeline_reopens_with_persisted_mmr_roots() {
        let tmp = tempfile::tempdir().unwrap();
        let store = open_test_store(&tmp);

        // Each pipeline lives in its own scope so it is dropped before the
        // stores are reopened.
        let first = {
            let mut pipeline = store.open_pipeline(0).unwrap();
            let delta = StateDelta {
                commands: vec![[0x11; 32], [0x22; 32]],
                obligations: vec![[0x33; 32]],
                intents: vec![[0x44; 32]],
                ..Default::default()
            };
            pipeline.commit(&delta).unwrap()
        };

        let second = {
            let mut pipeline = store.open_pipeline(1).unwrap();
            assert_eq!(pipeline.version(), 1);
            pipeline.commit(&StateDelta::default()).unwrap()
        };

        assert_eq!(second.command_mmr_root, first.command_mmr_root);
        assert_eq!(second.obligation_mmr_root, first.obligation_mmr_root);
        assert_eq!(second.intent_mmr_root, first.intent_mmr_root);
    }

    /// The genesis emptiness check passes on fresh stores and fails once a
    /// command has been committed locally.
    #[test]
    fn empty_db_rejects_stale_local_commitment_store() {
        let tmp = tempfile::tempdir().unwrap();
        let store = open_test_store(&tmp);

        store.ensure_empty_for_genesis().unwrap();
        {
            let mut pipeline = store.open_pipeline(0).unwrap();
            let delta = StateDelta {
                commands: vec![[0x11; 32]],
                ..Default::default()
            };
            pipeline.commit(&delta).unwrap();
        }

        let error = store.ensure_empty_for_genesis().unwrap_err().to_string();
        assert!(
            error.contains("local command MMR store is not empty"),
            "{error}"
        );
    }

    /// An empty batch keeps the previous roots and still persists a root at
    /// its own version for both trees.
    #[test]
    fn runtime_checkpoints_inherited_roots_for_empty_delta() {
        let tmp = tempfile::tempdir().unwrap();
        let store = open_test_store(&tmp);

        let mut runtime =
            ValidatorRsmStateCommitmentRuntime::open_at_version(store.clone(), 0).unwrap();

        let populated_delta = StateDelta {
            accounts: vec![AccountUpdate {
                address: [0x11; 20],
                leaf: AccountLeaf {
                    cash: 1_000_000,
                    nonce: 1,
                    margin_mode: 0,
                    liquidation_state: 0,
                },
            }],
            risk: vec![RiskUpdate {
                address: [0x11; 20],
                leaf: RiskLeaf {
                    equity: 1_000_000,
                    initial_margin: 100_000,
                    maintenance_margin: 50_000,
                    scanning_risk: 75_000,
                    health_nonce: 1,
                },
            }],
            ..Default::default()
        };
        let first = runtime.commit_rsm_batch(&populated_delta).unwrap();
        let second = runtime.commit_rsm_batch(&StateDelta::default()).unwrap();

        assert_eq!(second.state_root, first.state_root);
        assert_eq!(second.risk_root, first.risk_root);

        // Release the runtime's stores before reopening them directly.
        drop(runtime);
        let state_store = open_store(store.root_dir().join(STATE_TREE_DIR)).unwrap();
        let risk_store = open_store(store.root_dir().join(RISK_TREE_DIR)).unwrap();
        let persisted_state = new_jmt(&state_store)
            .get_root_hash_option(second.version)
            .unwrap()
            .expect("empty batch should checkpoint inherited state root");
        let persisted_risk = new_jmt(&risk_store)
            .get_root_hash_option(second.version)
            .unwrap()
            .expect("empty risk batch should checkpoint inherited root");
        assert_eq!(persisted_state, second.state_root);
        assert_eq!(persisted_risk, second.risk_root);
    }

    /// A key written once at version 0 stays readable and provable at the
    /// latest version after pruning, unless value pruning is compiled in.
    #[test]
    fn dormant_value_survives_stale_node_prune() {
        let tmp = tempfile::tempdir().unwrap();
        let rocks = RocksDbStore::open(tmp.path()).unwrap();
        let tree = new_jmt(&rocks);
        let dormant_key = StateKey::account(&[0x11; 20]);
        let active_key = StateKey::account(&[0x22; 20]);
        let dormant_value = leaf_to_bytes(&AccountLeaf {
            cash: 42,
            nonce: 1,
            margin_mode: 0,
            liquidation_state: 0,
        });

        // Version 0: write the dormant key once.
        let (_, genesis_batch) = tree
            .put_value_set(vec![(dormant_key, Some(dormant_value.clone()))], 0)
            .unwrap();
        rocks.write_tree_update_batch(genesis_batch).unwrap();

        // Versions 1..=19: only the active key changes.
        for version in 1..20 {
            let active_value = leaf_to_bytes(&AccountLeaf {
                cash: version as i128,
                nonce: version,
                margin_mode: 0,
                liquidation_state: 0,
            });
            let (_, update_batch) = tree
                .put_value_set(vec![(active_key, Some(active_value))], version)
                .unwrap();
            rocks.write_tree_update_batch(update_batch).unwrap();
        }

        let (pruned_nodes, pruned_values) = rocks.prune(10).unwrap();
        assert!(pruned_nodes > 0);

        #[cfg(not(feature = "rsm-prune-values"))]
        {
            assert_eq!(pruned_values, 0, "value pruning requires checkpoints");

            let (value, proof) = tree.get_with_proof(dormant_key, 19).unwrap();
            assert_eq!(value, Some(dormant_value));
            let root = tree.get_root_hash(19).unwrap();
            assert!(proof.verify(root, dormant_key, value.as_ref()).is_ok());
        }

        #[cfg(feature = "rsm-prune-values")]
        assert!(
            pruned_values > 0,
            "experimental value pruning should delete old value rows"
        );
    }
}