Skip to main content

hypercall_state_commitment/
rocks_store.rs

1use std::path::Path;
2
3use ckb_merkle_mountain_range::{
4    Error as MmrError, MMRStoreReadOps, MMRStoreWriteOps, Result as MmrResult,
5};
6use jmt::storage::{LeafNode, Node, NodeBatch, NodeKey, TreeReader, TreeUpdateBatch, TreeWriter};
7use jmt::{KeyHash, OwnedValue, Version};
8use rocksdb::{ColumnFamilyDescriptor, Options, WriteBatch, DB};
9
10use crate::mmr::{
11    prepare_append_many_with_store, MmrHash, MmrMetadataStore, PrepareMmrStore, PreparedMmrAppend,
12    SupportedMmrStore,
13};
14
15const CF_NODES: &str = "jmt_nodes";
16const CF_VALUES: &str = "jmt_values";
17const CF_STALE: &str = "jmt_stale";
18const CF_ROOT_META: &str = "jmt_root_meta";
19const CF_MMR_NODES: &str = "mmr_nodes";
20const CF_MMR_META: &str = "mmr_meta";
21const LATEST_ROOT_VERSION_KEY: &[u8] = b"latest_root_version";
22const LATEST_ROOT_KEY_KEY: &[u8] = b"latest_root_key";
23const MMR_SIZE_KEY: &[u8] = b"size";
24
25/// RocksDB-backed JMT store with bounded memory usage.
26///
27/// Three column families:
28/// - `jmt_nodes`: `borsh(NodeKey) -> borsh(Node)`
29/// - `jmt_values`: `key_hash(32) || version(8 BE) -> raw value bytes` (empty = tombstone)
30/// - `jmt_stale`: `stale_since_version(8 BE) || borsh(NodeKey) -> ()`
31///
32/// The values CF key layout enables `seek_for_prev` to efficiently find the
33/// latest value at or before a given version for a key hash.
34pub struct RocksDbStore {
35    db: DB,
36}
37
38/// RocksDB-backed MMR store.
39///
40/// This is separate from `RocksDbStore` because the runtime owns three logical
41/// MMRs with independent append positions: command, obligation, and intent.
42pub struct RocksDbMmrStore {
43    db: DB,
44}
45
46impl RocksDbStore {
47    pub fn open(path: impl AsRef<Path>) -> anyhow::Result<Self> {
48        let mut db_opts = Options::default();
49        db_opts.create_if_missing(true);
50        db_opts.create_missing_column_families(true);
51
52        let mut node_opts = Options::default();
53        node_opts.set_compression_type(rocksdb::DBCompressionType::Lz4);
54
55        let mut value_opts = Options::default();
56        value_opts.set_compression_type(rocksdb::DBCompressionType::Lz4);
57
58        let stale_opts = Options::default();
59        let root_meta_opts = Options::default();
60
61        let cfs = vec![
62            ColumnFamilyDescriptor::new(CF_NODES, node_opts),
63            ColumnFamilyDescriptor::new(CF_VALUES, value_opts),
64            ColumnFamilyDescriptor::new(CF_STALE, stale_opts),
65            ColumnFamilyDescriptor::new(CF_ROOT_META, root_meta_opts),
66        ];
67
68        let db = DB::open_cf_descriptors(&db_opts, path, cfs)?;
69        Ok(Self { db })
70    }
71
72    /// Write a full TreeUpdateBatch atomically (nodes + values + stale indices).
73    pub fn write_tree_update_batch(&self, update: TreeUpdateBatch) -> anyhow::Result<()> {
74        let cf_nodes = self.db.cf_handle(CF_NODES).expect("missing jmt_nodes CF");
75        let cf_values = self.db.cf_handle(CF_VALUES).expect("missing jmt_values CF");
76        let cf_stale = self.db.cf_handle(CF_STALE).expect("missing jmt_stale CF");
77        let cf_root_meta = self
78            .db
79            .cf_handle(CF_ROOT_META)
80            .expect("missing jmt_root_meta CF");
81
82        let mut batch = WriteBatch::default();
83
84        for (node_key, node) in update.node_batch.nodes() {
85            let key = borsh::to_vec(node_key)?;
86            let val = borsh::to_vec(node)?;
87            batch.put_cf(cf_nodes, &key, &val);
88            if node_key.nibble_path().is_empty() {
89                batch.put_cf(
90                    cf_root_meta,
91                    LATEST_ROOT_VERSION_KEY,
92                    node_key.version().to_be_bytes(),
93                );
94                batch.put_cf(cf_root_meta, LATEST_ROOT_KEY_KEY, &key);
95            }
96        }
97
98        for ((version, key_hash), value) in update.node_batch.values() {
99            let key = encode_value_key(key_hash, *version);
100            match value {
101                Some(v) => batch.put_cf(cf_values, key, v),
102                None => batch.put_cf(cf_values, key, []),
103            }
104        }
105
106        for stale in &update.stale_node_index_batch {
107            let key = encode_stale_key(stale.stale_since_version, &stale.node_key)?;
108            batch.put_cf(cf_stale, &key, []);
109        }
110
111        self.db.write(batch)?;
112        Ok(())
113    }
114
115    /// Prune stale nodes up to (inclusive) the given version.
116    ///
117    /// Value pruning is disabled by default until checkpoint compaction is
118    /// implemented. A dormant key can have its latest live value at an old
119    /// version, so deleting values by version alone can make the latest root
120    /// impossible to prove. The non-default `prune-values` feature keeps the
121    /// old experimental behavior available for research and benchmarks.
122    /// Returns (nodes_pruned, values_pruned).
123    pub fn prune(&self, up_to_version: Version) -> anyhow::Result<(usize, usize)> {
124        let nodes_pruned = self.prune_stale_nodes(up_to_version)?;
125        #[cfg(feature = "prune-values")]
126        let values_pruned = self.prune_values(up_to_version)?;
127        #[cfg(not(feature = "prune-values"))]
128        let values_pruned = 0;
129        Ok((nodes_pruned, values_pruned))
130    }
131
132    fn prune_stale_nodes(&self, up_to_version: Version) -> anyhow::Result<usize> {
133        let cf_nodes = self.db.cf_handle(CF_NODES).expect("missing jmt_nodes CF");
134        let cf_stale = self.db.cf_handle(CF_STALE).expect("missing jmt_stale CF");
135
136        let mut batch = WriteBatch::default();
137        let mut pruned = 0usize;
138
139        let mut iter = self.db.raw_iterator_cf(cf_stale);
140        iter.seek_to_first();
141
142        while iter.valid() {
143            let key = match iter.key() {
144                Some(k) => k,
145                None => break,
146            };
147
148            if key.len() < 8 {
149                iter.next();
150                continue;
151            }
152            let stale_version = u64::from_be_bytes(key[..8].try_into().unwrap());
153            if stale_version > up_to_version {
154                break;
155            }
156
157            let node_key: NodeKey = borsh::from_slice(&key[8..])?;
158            let node_key_bytes = borsh::to_vec(&node_key)?;
159            batch.delete_cf(cf_nodes, &node_key_bytes);
160            batch.delete_cf(cf_stale, key);
161            pruned += 1;
162
163            iter.next();
164        }
165
166        if pruned > 0 {
167            self.db.write(batch)?;
168        }
169        Ok(pruned)
170    }
171
172    #[cfg(feature = "prune-values")]
173    fn prune_values(&self, up_to_version: Version) -> anyhow::Result<usize> {
174        let cf_values = self.db.cf_handle(CF_VALUES).expect("missing jmt_values CF");
175
176        let mut batch = WriteBatch::default();
177        let mut pruned = 0usize;
178
179        let mut iter = self.db.raw_iterator_cf(cf_values);
180        iter.seek_to_first();
181
182        while iter.valid() {
183            let key = match iter.key() {
184                Some(k) => k,
185                None => break,
186            };
187
188            if key.len() < 40 {
189                iter.next();
190                continue;
191            }
192
193            let version = u64::from_be_bytes(key[32..40].try_into().unwrap());
194            if version <= up_to_version {
195                batch.delete_cf(cf_values, key);
196                pruned += 1;
197            }
198
199            iter.next();
200        }
201
202        if pruned > 0 {
203            self.db.write(batch)?;
204        }
205        Ok(pruned)
206    }
207
208    pub fn num_nodes(&self) -> anyhow::Result<usize> {
209        self.count_cf(CF_NODES)
210    }
211
212    pub fn num_values(&self) -> anyhow::Result<usize> {
213        self.count_cf(CF_VALUES)
214    }
215
216    pub fn is_empty(&self) -> anyhow::Result<bool> {
217        Ok(self.count_cf(CF_NODES)? == 0
218            && self.count_cf(CF_VALUES)? == 0
219            && self.count_cf(CF_STALE)? == 0
220            && self.count_cf(CF_ROOT_META)? == 0)
221    }
222
223    /// Copy a JMT root node to a later version without changing tree contents.
224    ///
225    /// JMT itself only writes a root node when a tree has updates. Runtime
226    /// batches can still inherit an unchanged root, so checkpointing the root
227    /// at the accepted version keeps exact-version startup verification cheap.
228    pub fn copy_latest_root_to_version(&self, to_version: Version) -> anyhow::Result<bool> {
229        let cf = self.db.cf_handle(CF_NODES).expect("missing jmt_nodes CF");
230        let cf_root_meta = self
231            .db
232            .cf_handle(CF_ROOT_META)
233            .expect("missing jmt_root_meta CF");
234
235        let Some((latest_version, mut to_key)) = self.latest_root_metadata()? else {
236            return Ok(false);
237        };
238        if latest_version >= to_version {
239            return Ok(false);
240        }
241        let Some(node) = self.db.get_cf(cf, &to_key)? else {
242            anyhow::bail!(
243                "latest JMT root metadata points to missing node at version {}",
244                latest_version
245            );
246        };
247        anyhow::ensure!(
248            to_key.len() >= 8,
249            "encoded JMT root node key shorter than version prefix"
250        );
251        to_key[..8].copy_from_slice(&to_version.to_le_bytes());
252        let mut batch = WriteBatch::default();
253        batch.put_cf(cf, &to_key, node);
254        batch.put_cf(
255            cf_root_meta,
256            LATEST_ROOT_VERSION_KEY,
257            to_version.to_be_bytes(),
258        );
259        batch.put_cf(cf_root_meta, LATEST_ROOT_KEY_KEY, &to_key);
260        self.db.write(batch)?;
261        Ok(true)
262    }
263
264    pub fn latest_root_version(&self) -> anyhow::Result<Option<Version>> {
265        Ok(self.latest_root_metadata()?.map(|(version, _)| version))
266    }
267
268    fn latest_root_metadata(&self) -> anyhow::Result<Option<(Version, Vec<u8>)>> {
269        let cf_root_meta = self
270            .db
271            .cf_handle(CF_ROOT_META)
272            .expect("missing jmt_root_meta CF");
273        let Some(version_bytes) = self.db.get_cf(cf_root_meta, LATEST_ROOT_VERSION_KEY)? else {
274            return Ok(None);
275        };
276        anyhow::ensure!(
277            version_bytes.len() == 8,
278            "invalid latest JMT root version metadata length {}",
279            version_bytes.len()
280        );
281        let Some(root_key) = self.db.get_cf(cf_root_meta, LATEST_ROOT_KEY_KEY)? else {
282            anyhow::bail!("latest JMT root key metadata is missing");
283        };
284        Ok(Some((
285            u64::from_be_bytes(version_bytes.as_slice().try_into().unwrap()),
286            root_key,
287        )))
288    }
289
290    /// Compact all commitment-store column families.
291    ///
292    /// This rewrites RocksDB's LSM files after pruning or large batch runs so
293    /// deleted stale-node keys stop occupying disk. It does not change logical
294    /// JMT contents or versioned values.
295    pub fn compact_all(&self) -> anyhow::Result<()> {
296        for cf_name in [CF_NODES, CF_VALUES, CF_STALE, CF_ROOT_META] {
297            let cf = self
298                .db
299                .cf_handle(cf_name)
300                .ok_or_else(|| anyhow::anyhow!("missing CF {cf_name}"))?;
301            self.db.compact_range_cf(cf, None::<&[u8]>, None::<&[u8]>);
302        }
303        Ok(())
304    }
305
306    fn count_cf(&self, cf_name: &str) -> anyhow::Result<usize> {
307        let cf = self
308            .db
309            .cf_handle(cf_name)
310            .ok_or_else(|| anyhow::anyhow!("missing CF {cf_name}"))?;
311        let mut count = 0usize;
312        let mut iter = self.db.raw_iterator_cf(cf);
313        iter.seek_to_first();
314        while iter.valid() {
315            count += 1;
316            iter.next();
317        }
318        Ok(count)
319    }
320}
321
322impl RocksDbMmrStore {
323    pub fn open(path: impl AsRef<Path>) -> anyhow::Result<Self> {
324        let mut db_opts = Options::default();
325        db_opts.create_if_missing(true);
326        db_opts.create_missing_column_families(true);
327
328        let mut node_opts = Options::default();
329        node_opts.set_compression_type(rocksdb::DBCompressionType::Lz4);
330
331        let meta_opts = Options::default();
332
333        let cfs = vec![
334            ColumnFamilyDescriptor::new(CF_MMR_NODES, node_opts),
335            ColumnFamilyDescriptor::new(CF_MMR_META, meta_opts),
336        ];
337
338        let db = DB::open_cf_descriptors(&db_opts, path, cfs)?;
339        Ok(Self { db })
340    }
341
342    pub fn num_nodes(&self) -> anyhow::Result<usize> {
343        let cf = self
344            .db
345            .cf_handle(CF_MMR_NODES)
346            .ok_or_else(|| anyhow::anyhow!("missing CF {CF_MMR_NODES}"))?;
347        let mut count = 0usize;
348        let mut iter = self.db.raw_iterator_cf(cf);
349        iter.seek_to_first();
350        while iter.valid() {
351            count += 1;
352            iter.next();
353        }
354        Ok(count)
355    }
356
357    pub fn is_empty(&self) -> anyhow::Result<bool> {
358        Ok(self.num_nodes()? == 0 && self.load_mmr_size()? == 0)
359    }
360
361    /// Compact all MMR column families.
362    pub fn compact_all(&self) -> anyhow::Result<()> {
363        for cf_name in [CF_MMR_NODES, CF_MMR_META] {
364            let cf = self
365                .db
366                .cf_handle(cf_name)
367                .ok_or_else(|| anyhow::anyhow!("missing CF {cf_name}"))?;
368            self.db.compact_range_cf(cf, None::<&[u8]>, None::<&[u8]>);
369        }
370        Ok(())
371    }
372}
373
374impl MmrMetadataStore for RocksDbMmrStore {
375    fn load_mmr_size(&self) -> anyhow::Result<u64> {
376        let cf = self.db.cf_handle(CF_MMR_META).expect("missing mmr_meta CF");
377        match self.db.get_cf(cf, MMR_SIZE_KEY)? {
378            Some(bytes) => {
379                if bytes.len() != 8 {
380                    anyhow::bail!("invalid persisted MMR size length {}", bytes.len());
381                }
382                Ok(u64::from_be_bytes(bytes.as_slice().try_into().unwrap()))
383            }
384            None => Ok(0),
385        }
386    }
387}
388
389impl SupportedMmrStore for RocksDbMmrStore {}
390
391impl PrepareMmrStore for RocksDbMmrStore {
392    fn prepare_append_many_from(
393        size: u64,
394        store: &Self,
395        data: &[[u8; 32]],
396    ) -> anyhow::Result<PreparedMmrAppend> {
397        prepare_append_many_with_store(size, store, data)
398    }
399}
400
401impl ckb_merkle_mountain_range::MMRStoreReadOps<MmrHash>
402    for &crate::mmr::StagedMmrStore<'_, RocksDbMmrStore>
403{
404    fn get_elem(&self, pos: u64) -> ckb_merkle_mountain_range::Result<Option<MmrHash>> {
405        for (start, elems) in self.writes.borrow().iter().rev() {
406            if pos >= *start {
407                let index = (pos - *start) as usize;
408                if let Some(elem) = elems.get(index) {
409                    return Ok(Some(elem.clone()));
410                }
411            }
412        }
413        self.base.get_elem(pos)
414    }
415}
416
417impl MMRStoreReadOps<MmrHash> for &RocksDbMmrStore {
418    fn get_elem(&self, pos: u64) -> MmrResult<Option<MmrHash>> {
419        let cf = self
420            .db
421            .cf_handle(CF_MMR_NODES)
422            .expect("missing mmr_nodes CF");
423        match self
424            .db
425            .get_cf(cf, pos.to_be_bytes())
426            .map_err(|error| MmrError::StoreError(error.to_string()))?
427        {
428            Some(bytes) => {
429                if bytes.len() != 32 {
430                    return Err(MmrError::StoreError(format!(
431                        "invalid persisted MMR node length {}",
432                        bytes.len()
433                    )));
434                }
435                Ok(Some(MmrHash(bytes.as_slice().try_into().unwrap())))
436            }
437            None => Ok(None),
438        }
439    }
440}
441
442impl MMRStoreWriteOps<MmrHash> for &RocksDbMmrStore {
443    fn append(&mut self, pos: u64, elems: Vec<MmrHash>) -> MmrResult<()> {
444        let cf_nodes = self
445            .db
446            .cf_handle(CF_MMR_NODES)
447            .expect("missing mmr_nodes CF");
448        let cf_meta = self.db.cf_handle(CF_MMR_META).expect("missing mmr_meta CF");
449        let next_size = pos + elems.len() as u64;
450        let mut batch = WriteBatch::default();
451        for (i, elem) in elems.into_iter().enumerate() {
452            let p = pos + i as u64;
453            if self
454                .db
455                .get_cf(cf_nodes, p.to_be_bytes())
456                .map_err(|error| MmrError::StoreError(error.to_string()))?
457                .is_some()
458            {
459                return Err(MmrError::InconsistentStore);
460            }
461            batch.put_cf(cf_nodes, p.to_be_bytes(), elem.0);
462        }
463        batch.put_cf(cf_meta, MMR_SIZE_KEY, next_size.to_be_bytes());
464        self.db
465            .write(batch)
466            .map_err(|error| MmrError::StoreError(error.to_string()))?;
467        Ok(())
468    }
469}
470
471impl TreeReader for RocksDbStore {
472    fn get_node_option(&self, node_key: &NodeKey) -> anyhow::Result<Option<Node>> {
473        let cf = self.db.cf_handle(CF_NODES).expect("missing jmt_nodes CF");
474        let key = borsh::to_vec(node_key)?;
475        match self.db.get_cf(cf, &key)? {
476            Some(bytes) => Ok(Some(borsh::from_slice(&bytes)?)),
477            None => Ok(None),
478        }
479    }
480
481    fn get_value_option(
482        &self,
483        max_version: Version,
484        key_hash: KeyHash,
485    ) -> anyhow::Result<Option<OwnedValue>> {
486        let cf = self.db.cf_handle(CF_VALUES).expect("missing jmt_values CF");
487
488        let seek_key = encode_value_key(&key_hash, max_version);
489        let mut iter = self.db.raw_iterator_cf(cf);
490        iter.seek_for_prev(seek_key);
491
492        if !iter.valid() {
493            return Ok(None);
494        }
495
496        let key = match iter.key() {
497            Some(k) if k.len() >= 40 => k,
498            _ => return Ok(None),
499        };
500
501        if key[..32] != key_hash.0 {
502            return Ok(None);
503        }
504
505        match iter.value() {
506            Some([]) => Ok(None),
507            Some(v) => Ok(Some(v.to_vec())),
508            None => Ok(None),
509        }
510    }
511
512    fn get_rightmost_leaf(&self) -> anyhow::Result<Option<(NodeKey, LeafNode)>> {
513        let cf = self.db.cf_handle(CF_NODES).expect("missing jmt_nodes CF");
514        let mut best: Option<(NodeKey, LeafNode)> = None;
515
516        let mut iter = self.db.raw_iterator_cf(cf);
517        iter.seek_to_first();
518
519        while iter.valid() {
520            if let (Some(key_bytes), Some(val_bytes)) = (iter.key(), iter.value()) {
521                let node: Node = borsh::from_slice(val_bytes)?;
522                if let Node::Leaf(leaf) = node {
523                    let node_key: NodeKey = borsh::from_slice(key_bytes)?;
524                    let dominated = match &best {
525                        None => true,
526                        Some((_, best_leaf)) => leaf.key_hash() > best_leaf.key_hash(),
527                    };
528                    if dominated {
529                        best = Some((node_key, leaf));
530                    }
531                }
532            }
533            iter.next();
534        }
535
536        Ok(best)
537    }
538}
539
540impl TreeWriter for RocksDbStore {
541    fn write_node_batch(&self, node_batch: &NodeBatch) -> anyhow::Result<()> {
542        let cf_nodes = self.db.cf_handle(CF_NODES).expect("missing jmt_nodes CF");
543        let cf_values = self.db.cf_handle(CF_VALUES).expect("missing jmt_values CF");
544        let cf_root_meta = self
545            .db
546            .cf_handle(CF_ROOT_META)
547            .expect("missing jmt_root_meta CF");
548
549        let mut batch = WriteBatch::default();
550
551        for (node_key, node) in node_batch.nodes() {
552            let key = borsh::to_vec(node_key)?;
553            let val = borsh::to_vec(node)?;
554            batch.put_cf(cf_nodes, &key, &val);
555            if node_key.nibble_path().is_empty() {
556                batch.put_cf(
557                    cf_root_meta,
558                    LATEST_ROOT_VERSION_KEY,
559                    node_key.version().to_be_bytes(),
560                );
561                batch.put_cf(cf_root_meta, LATEST_ROOT_KEY_KEY, &key);
562            }
563        }
564
565        for ((version, key_hash), value) in node_batch.values() {
566            let key = encode_value_key(key_hash, *version);
567            match value {
568                Some(v) => batch.put_cf(cf_values, key, v),
569                None => batch.put_cf(cf_values, key, []),
570            }
571        }
572
573        self.db.write(batch)?;
574        Ok(())
575    }
576}
577
578/// Encode a values CF key: `key_hash (32 bytes) || version (8 bytes big-endian)`.
579fn encode_value_key(key_hash: &KeyHash, version: Version) -> [u8; 40] {
580    let mut key = [0u8; 40];
581    key[..32].copy_from_slice(&key_hash.0);
582    key[32..].copy_from_slice(&version.to_be_bytes());
583    key
584}
585
586/// Encode a stale CF key: `stale_since_version (8 bytes BE) || borsh(node_key)`.
587fn encode_stale_key(stale_since_version: Version, node_key: &NodeKey) -> anyhow::Result<Vec<u8>> {
588    let node_key_bytes = borsh::to_vec(node_key)?;
589    let mut key = Vec::with_capacity(8 + node_key_bytes.len());
590    key.extend_from_slice(&stale_since_version.to_be_bytes());
591    key.extend_from_slice(&node_key_bytes);
592    Ok(key)
593}
594
595#[cfg(all(test, feature = "rocksdb"))]
596mod tests {
597    use super::*;
598    use crate::keys::StateKey;
599    use crate::leaves::*;
600    use crate::mmr::HypercallMmr;
601    use crate::state_tree::new_jmt;
602    use tempfile::TempDir;
603
604    fn open_temp_store() -> (RocksDbStore, TempDir) {
605        let dir = TempDir::new().unwrap();
606        let store = RocksDbStore::open(dir.path()).unwrap();
607        (store, dir)
608    }
609
610    #[test]
611    fn mmr_reopens_with_persisted_size_and_root() {
612        let dir = TempDir::new().unwrap();
613        let root_after_two = {
614            let store = RocksDbMmrStore::open(dir.path()).unwrap();
615            let mut mmr = HypercallMmr::from_store(store).unwrap();
616            mmr.append([0x11; 32]).unwrap();
617            mmr.append([0x22; 32]).unwrap();
618            assert_eq!(mmr.size(), 3);
619            mmr.peaks_hash()
620        };
621
622        let root_after_three = {
623            let store = RocksDbMmrStore::open(dir.path()).unwrap();
624            let mut mmr = HypercallMmr::from_store(store).unwrap();
625            assert_eq!(mmr.size(), 3);
626            assert_eq!(mmr.peaks_hash(), root_after_two);
627            mmr.append([0x33; 32]).unwrap();
628            assert_eq!(mmr.size(), 4);
629            mmr.peaks_hash()
630        };
631
632        let store = RocksDbMmrStore::open(dir.path()).unwrap();
633        let mmr = HypercallMmr::from_store(store).unwrap();
634        assert_eq!(mmr.size(), 4);
635        assert_eq!(mmr.peaks_hash(), root_after_three);
636    }
637
638    #[test]
639    fn basic_insert_and_read() {
640        let (store, _dir) = open_temp_store();
641        let tree = new_jmt(&store);
642
643        let key = StateKey::account(&[0x42; 20]);
644        let value = leaf_to_bytes(&AccountLeaf {
645            cash: 1000,
646            nonce: 0,
647            margin_mode: 0,
648            liquidation_state: 0,
649        });
650
651        let (root, batch) = tree
652            .put_value_set(vec![(key, Some(value.clone()))], 0)
653            .unwrap();
654        store.write_tree_update_batch(batch).unwrap();
655
656        let (retrieved, proof) = tree.get_with_proof(key, 0).unwrap();
657        assert_eq!(retrieved, Some(value));
658        assert!(proof.verify(root, key, retrieved.as_ref()).is_ok());
659    }
660
661    #[test]
662    fn copy_latest_root_to_version_returns_false_without_root_metadata() {
663        let (store, _dir) = open_temp_store();
664
665        assert!(!store.copy_latest_root_to_version(7).unwrap());
666        assert!(new_jmt(&store).get_root_hash_option(7).unwrap().is_none());
667    }
668
669    #[test]
670    fn copy_latest_root_to_version_checkpoints_unchanged_root() {
671        let (store, _dir) = open_temp_store();
672        let tree = new_jmt(&store);
673        let key = StateKey::account(&[0x42; 20]);
674        let value = leaf_to_bytes(&AccountLeaf {
675            cash: 1000,
676            nonce: 0,
677            margin_mode: 0,
678            liquidation_state: 0,
679        });
680        let (root, batch) = tree
681            .put_value_set(vec![(key, Some(value.clone()))], 0)
682            .unwrap();
683        store.write_tree_update_batch(batch).unwrap();
684
685        assert!(store.copy_latest_root_to_version(7).unwrap());
686        assert_eq!(tree.get_root_hash_option(7).unwrap(), Some(root));
687        let (retrieved, proof) = tree.get_with_proof(key, 7).unwrap();
688        assert_eq!(retrieved, Some(value));
689        assert!(proof.verify(root, key, retrieved.as_ref()).is_ok());
690    }
691
692    #[test]
693    fn copy_latest_root_to_version_returns_false_for_current_or_older_version() {
694        let (store, _dir) = open_temp_store();
695        let tree = new_jmt(&store);
696        let key = StateKey::account(&[0x11; 20]);
697        let value = leaf_to_bytes(&AccountLeaf {
698            cash: 1,
699            nonce: 0,
700            margin_mode: 0,
701            liquidation_state: 0,
702        });
703        let (_root, batch) = tree.put_value_set(vec![(key, Some(value))], 3).unwrap();
704        store.write_tree_update_batch(batch).unwrap();
705
706        assert!(!store.copy_latest_root_to_version(3).unwrap());
707        assert!(!store.copy_latest_root_to_version(2).unwrap());
708    }
709
710    #[test]
711    fn copy_latest_root_to_version_fails_when_metadata_points_to_missing_node() {
712        let (store, _dir) = open_temp_store();
713        let cf_root_meta = store
714            .db
715            .cf_handle(CF_ROOT_META)
716            .expect("missing root meta CF");
717        let mut missing_root_key = vec![0u8; 16];
718        missing_root_key[..8].copy_from_slice(&3u64.to_le_bytes());
719        let mut batch = WriteBatch::default();
720        batch.put_cf(cf_root_meta, LATEST_ROOT_VERSION_KEY, 3u64.to_be_bytes());
721        batch.put_cf(cf_root_meta, LATEST_ROOT_KEY_KEY, missing_root_key);
722        store.db.write(batch).unwrap();
723
724        let error = store
725            .copy_latest_root_to_version(7)
726            .unwrap_err()
727            .to_string();
728        assert!(
729            error.contains("latest JMT root metadata points to missing node"),
730            "{error}"
731        );
732    }
733
734    #[test]
735    fn versioned_reads() {
736        let (store, _dir) = open_temp_store();
737        let tree = new_jmt(&store);
738        let key = StateKey::account(&[0x01; 20]);
739
740        for v in 0..10u64 {
741            let val = leaf_to_bytes(&AccountLeaf {
742                cash: v as i128 * 100,
743                nonce: v,
744                margin_mode: 0,
745                liquidation_state: 0,
746            });
747            let (_, batch) = tree.put_value_set(vec![(key, Some(val))], v).unwrap();
748            store.write_tree_update_batch(batch).unwrap();
749        }
750
751        for v in 0..10u64 {
752            let (val, _) = tree.get_with_proof(key, v).unwrap();
753            let decoded: AccountLeaf = leaf_from_bytes(&val.unwrap()).unwrap();
754            assert_eq!(decoded.cash, v as i128 * 100);
755        }
756    }
757
758    #[test]
759    fn prune_stale_nodes() {
760        let (store, _dir) = open_temp_store();
761        let tree = new_jmt(&store);
762        let key = StateKey::account(&[0x01; 20]);
763
764        for v in 0..5u64 {
765            let val = leaf_to_bytes(&AccountLeaf {
766                cash: v as i128,
767                nonce: v,
768                margin_mode: 0,
769                liquidation_state: 0,
770            });
771            let (_, batch) = tree.put_value_set(vec![(key, Some(val))], v).unwrap();
772            store.write_tree_update_batch(batch).unwrap();
773        }
774
775        let nodes_before = store.num_nodes().unwrap();
776        let (nodes_pruned, _) = store.prune(3).unwrap();
777        assert!(nodes_pruned > 0, "should have pruned stale nodes");
778        assert!(store.num_nodes().unwrap() < nodes_before);
779
780        let (val, _) = tree.get_with_proof(key, 4).unwrap();
781        let decoded: AccountLeaf = leaf_from_bytes(&val.unwrap()).unwrap();
782        assert_eq!(decoded.cash, 4);
783    }
784
785    #[test]
786    fn delete_and_tombstone() {
787        let (store, _dir) = open_temp_store();
788        let tree = new_jmt(&store);
789        let key = StateKey::account(&[0x01; 20]);
790
791        let val = leaf_to_bytes(&AccountLeaf {
792            cash: 42,
793            nonce: 0,
794            margin_mode: 0,
795            liquidation_state: 0,
796        });
797        let (_, batch) = tree.put_value_set(vec![(key, Some(val))], 0).unwrap();
798        store.write_tree_update_batch(batch).unwrap();
799
800        let (_, batch) = tree.put_value_set(vec![(key, None)], 1).unwrap();
801        store.write_tree_update_batch(batch).unwrap();
802
803        let (val_v1, _) = tree.get_with_proof(key, 1).unwrap();
804        assert!(val_v1.is_none(), "deleted key must return None");
805
806        let (val_v0, _) = tree.get_with_proof(key, 0).unwrap();
807        assert!(val_v0.is_some(), "v0 should still have value");
808    }
809
810    #[test]
811    fn batch_1000_accounts() {
812        let (store, _dir) = open_temp_store();
813        let tree = new_jmt(&store);
814
815        let entries: Vec<_> = (0..1000u32)
816            .map(|i| {
817                let mut addr = [0u8; 20];
818                addr[0..4].copy_from_slice(&i.to_le_bytes());
819                let key = StateKey::account(&addr);
820                let val = leaf_to_bytes(&AccountLeaf {
821                    cash: i as i128 * 100,
822                    nonce: 0,
823                    margin_mode: 0,
824                    liquidation_state: 0,
825                });
826                (key, Some(val))
827            })
828            .collect();
829
830        let (root, batch) = tree.put_value_set(entries, 0).unwrap();
831        store.write_tree_update_batch(batch).unwrap();
832
833        assert_ne!(root.0, [0u8; 32]);
834
835        for i in [0u32, 500, 999] {
836            let mut addr = [0u8; 20];
837            addr[0..4].copy_from_slice(&i.to_le_bytes());
838            let key = StateKey::account(&addr);
839            let (val, proof) = tree.get_with_proof(key, 0).unwrap();
840            assert!(val.is_some());
841            assert!(proof.verify(root, key, val.as_ref()).is_ok());
842        }
843    }
844
845    #[test]
846    fn survives_reopen() {
847        let dir = TempDir::new().unwrap();
848        let root;
849        let key = StateKey::account(&[0xAA; 20]);
850        let value = leaf_to_bytes(&AccountLeaf {
851            cash: 999,
852            nonce: 1,
853            margin_mode: 0,
854            liquidation_state: 0,
855        });
856
857        {
858            let store = RocksDbStore::open(dir.path()).unwrap();
859            let tree = new_jmt(&store);
860            let (r, batch) = tree
861                .put_value_set(vec![(key, Some(value.clone()))], 0)
862                .unwrap();
863            store.write_tree_update_batch(batch).unwrap();
864            root = r;
865        }
866
867        {
868            let store = RocksDbStore::open(dir.path()).unwrap();
869            let tree = new_jmt(&store);
870            let (val, proof) = tree.get_with_proof(key, 0).unwrap();
871            assert_eq!(val, Some(value));
872            assert!(proof.verify(root, key, val.as_ref()).is_ok());
873        }
874    }
875
876    #[test]
877    fn mixed_state_types() {
878        let (store, _dir) = open_temp_store();
879        let tree = new_jmt(&store);
880        let addr = [0x42u8; 20];
881
882        let entries: Vec<(KeyHash, Option<Vec<u8>>)> = vec![
883            (
884                StateKey::account(&addr),
885                Some(leaf_to_bytes(&AccountLeaf {
886                    cash: 50000,
887                    nonce: 0,
888                    margin_mode: 1,
889                    liquidation_state: 0,
890                })),
891            ),
892            (
893                StateKey::option_position(&addr, "BTC-20261231-100000-C"),
894                Some(leaf_to_bytes(&OptionPositionLeaf {
895                    quantity: 100_000_000,
896                    entry_price: 500_00,
897                })),
898            ),
899            (
900                StateKey::perp_position(&addr, "BTC"),
901                Some(leaf_to_bytes(&PerpPositionLeaf {
902                    size: -1_000_000,
903                    entry_price: 95000_00000000,
904                })),
905            ),
906            (
907                StateKey::global(),
908                Some(leaf_to_bytes(&GlobalLeaf {
909                    next_order_id: 1000,
910                    next_trade_id: 500,
911                    command_chain_root: [0xBB; 32],
912                    command_chain_seq: 42,
913                })),
914            ),
915        ];
916
917        let (root, batch) = tree.put_value_set(entries.clone(), 0).unwrap();
918        store.write_tree_update_batch(batch).unwrap();
919
920        for (key, expected) in &entries {
921            let (val, proof) = tree.get_with_proof(*key, 0).unwrap();
922            assert_eq!(&val, expected);
923            assert!(proof.verify(root, *key, val.as_deref()).is_ok());
924        }
925    }
926}