1use std::path::Path;
2
3use ckb_merkle_mountain_range::{
4 Error as MmrError, MMRStoreReadOps, MMRStoreWriteOps, Result as MmrResult,
5};
6use jmt::storage::{LeafNode, Node, NodeBatch, NodeKey, TreeReader, TreeUpdateBatch, TreeWriter};
7use jmt::{KeyHash, OwnedValue, Version};
8use rocksdb::{ColumnFamilyDescriptor, Options, WriteBatch, DB};
9
10use crate::mmr::{
11 prepare_append_many_with_store, MmrHash, MmrMetadataStore, PrepareMmrStore, PreparedMmrAppend,
12 SupportedMmrStore,
13};
14
/// Column family mapping borsh-encoded `NodeKey`s to borsh-encoded JMT `Node`s.
const CF_NODES: &str = "jmt_nodes";
/// Column family mapping `key_hash (32 bytes) || version (u64 big-endian)` to value bytes.
/// An empty value is written for a deleted key.
const CF_VALUES: &str = "jmt_values";
/// Column family indexing stale nodes as `stale_since_version (u64 big-endian) || borsh NodeKey`,
/// always with an empty value.
const CF_STALE: &str = "jmt_stale";
/// Column family holding the latest-root metadata entries of the JMT store.
const CF_ROOT_META: &str = "jmt_root_meta";
/// Column family mapping an MMR position (u64 big-endian) to its 32-byte node hash.
const CF_MMR_NODES: &str = "mmr_nodes";
/// Column family holding MMR metadata (currently only the persisted size).
const CF_MMR_META: &str = "mmr_meta";
/// Key in `CF_ROOT_META` storing the version of the latest root as a big-endian `u64`.
const LATEST_ROOT_VERSION_KEY: &[u8] = b"latest_root_version";
/// Key in `CF_ROOT_META` storing the encoded node key of the latest root.
const LATEST_ROOT_KEY_KEY: &[u8] = b"latest_root_key";
/// Key in `CF_MMR_META` storing the MMR size as a big-endian `u64`.
const MMR_SIZE_KEY: &[u8] = b"size";
24
/// RocksDB-backed storage for a Jellyfish Merkle Tree.
///
/// Tree nodes, versioned values, the stale-node index and the latest-root metadata
/// each live in their own column family of a single database.
pub struct RocksDbStore {
    /// Underlying RocksDB handle, opened by `RocksDbStore::open`.
    db: DB,
}
37
/// RocksDB-backed storage for Merkle Mountain Range nodes and the persisted MMR size.
pub struct RocksDbMmrStore {
    /// Underlying RocksDB handle, opened by `RocksDbMmrStore::open`.
    db: DB,
}
45
46impl RocksDbStore {
47 pub fn open(path: impl AsRef<Path>) -> anyhow::Result<Self> {
48 let mut db_opts = Options::default();
49 db_opts.create_if_missing(true);
50 db_opts.create_missing_column_families(true);
51
52 let mut node_opts = Options::default();
53 node_opts.set_compression_type(rocksdb::DBCompressionType::Lz4);
54
55 let mut value_opts = Options::default();
56 value_opts.set_compression_type(rocksdb::DBCompressionType::Lz4);
57
58 let stale_opts = Options::default();
59 let root_meta_opts = Options::default();
60
61 let cfs = vec![
62 ColumnFamilyDescriptor::new(CF_NODES, node_opts),
63 ColumnFamilyDescriptor::new(CF_VALUES, value_opts),
64 ColumnFamilyDescriptor::new(CF_STALE, stale_opts),
65 ColumnFamilyDescriptor::new(CF_ROOT_META, root_meta_opts),
66 ];
67
68 let db = DB::open_cf_descriptors(&db_opts, path, cfs)?;
69 Ok(Self { db })
70 }
71
72 pub fn write_tree_update_batch(&self, update: TreeUpdateBatch) -> anyhow::Result<()> {
74 let cf_nodes = self.db.cf_handle(CF_NODES).expect("missing jmt_nodes CF");
75 let cf_values = self.db.cf_handle(CF_VALUES).expect("missing jmt_values CF");
76 let cf_stale = self.db.cf_handle(CF_STALE).expect("missing jmt_stale CF");
77 let cf_root_meta = self
78 .db
79 .cf_handle(CF_ROOT_META)
80 .expect("missing jmt_root_meta CF");
81
82 let mut batch = WriteBatch::default();
83
84 for (node_key, node) in update.node_batch.nodes() {
85 let key = borsh::to_vec(node_key)?;
86 let val = borsh::to_vec(node)?;
87 batch.put_cf(cf_nodes, &key, &val);
88 if node_key.nibble_path().is_empty() {
89 batch.put_cf(
90 cf_root_meta,
91 LATEST_ROOT_VERSION_KEY,
92 node_key.version().to_be_bytes(),
93 );
94 batch.put_cf(cf_root_meta, LATEST_ROOT_KEY_KEY, &key);
95 }
96 }
97
98 for ((version, key_hash), value) in update.node_batch.values() {
99 let key = encode_value_key(key_hash, *version);
100 match value {
101 Some(v) => batch.put_cf(cf_values, key, v),
102 None => batch.put_cf(cf_values, key, []),
103 }
104 }
105
106 for stale in &update.stale_node_index_batch {
107 let key = encode_stale_key(stale.stale_since_version, &stale.node_key)?;
108 batch.put_cf(cf_stale, &key, []);
109 }
110
111 self.db.write(batch)?;
112 Ok(())
113 }
114
    /// Prunes data that became obsolete at or before `up_to_version`.
    ///
    /// Stale tree nodes are always removed via `prune_stale_nodes`. Values are removed
    /// via `prune_values` only when the crate is built with the `prune-values` feature;
    /// without it the second element of the result is always `0`.
    ///
    /// Returns `(nodes_pruned, values_pruned)`.
    pub fn prune(&self, up_to_version: Version) -> anyhow::Result<(usize, usize)> {
        let nodes_pruned = self.prune_stale_nodes(up_to_version)?;
        // Exactly one of the two `values_pruned` bindings below survives compilation.
        #[cfg(feature = "prune-values")]
        let values_pruned = self.prune_values(up_to_version)?;
        #[cfg(not(feature = "prune-values"))]
        let values_pruned = 0;
        Ok((nodes_pruned, values_pruned))
    }
131
132 fn prune_stale_nodes(&self, up_to_version: Version) -> anyhow::Result<usize> {
133 let cf_nodes = self.db.cf_handle(CF_NODES).expect("missing jmt_nodes CF");
134 let cf_stale = self.db.cf_handle(CF_STALE).expect("missing jmt_stale CF");
135
136 let mut batch = WriteBatch::default();
137 let mut pruned = 0usize;
138
139 let mut iter = self.db.raw_iterator_cf(cf_stale);
140 iter.seek_to_first();
141
142 while iter.valid() {
143 let key = match iter.key() {
144 Some(k) => k,
145 None => break,
146 };
147
148 if key.len() < 8 {
149 iter.next();
150 continue;
151 }
152 let stale_version = u64::from_be_bytes(key[..8].try_into().unwrap());
153 if stale_version > up_to_version {
154 break;
155 }
156
157 let node_key: NodeKey = borsh::from_slice(&key[8..])?;
158 let node_key_bytes = borsh::to_vec(&node_key)?;
159 batch.delete_cf(cf_nodes, &node_key_bytes);
160 batch.delete_cf(cf_stale, key);
161 pruned += 1;
162
163 iter.next();
164 }
165
166 if pruned > 0 {
167 self.db.write(batch)?;
168 }
169 Ok(pruned)
170 }
171
172 #[cfg(feature = "prune-values")]
173 fn prune_values(&self, up_to_version: Version) -> anyhow::Result<usize> {
174 let cf_values = self.db.cf_handle(CF_VALUES).expect("missing jmt_values CF");
175
176 let mut batch = WriteBatch::default();
177 let mut pruned = 0usize;
178
179 let mut iter = self.db.raw_iterator_cf(cf_values);
180 iter.seek_to_first();
181
182 while iter.valid() {
183 let key = match iter.key() {
184 Some(k) => k,
185 None => break,
186 };
187
188 if key.len() < 40 {
189 iter.next();
190 continue;
191 }
192
193 let version = u64::from_be_bytes(key[32..40].try_into().unwrap());
194 if version <= up_to_version {
195 batch.delete_cf(cf_values, key);
196 pruned += 1;
197 }
198
199 iter.next();
200 }
201
202 if pruned > 0 {
203 self.db.write(batch)?;
204 }
205 Ok(pruned)
206 }
207
    /// Returns the number of entries in the `jmt_nodes` column family.
    ///
    /// Delegates to `count_cf`, which iterates over every entry.
    pub fn num_nodes(&self) -> anyhow::Result<usize> {
        self.count_cf(CF_NODES)
    }
211
    /// Returns the number of entries in the `jmt_values` column family.
    ///
    /// Delegates to `count_cf`, which iterates over every entry.
    pub fn num_values(&self) -> anyhow::Result<usize> {
        self.count_cf(CF_VALUES)
    }
215
216 pub fn is_empty(&self) -> anyhow::Result<bool> {
217 Ok(self.count_cf(CF_NODES)? == 0
218 && self.count_cf(CF_VALUES)? == 0
219 && self.count_cf(CF_STALE)? == 0
220 && self.count_cf(CF_ROOT_META)? == 0)
221 }
222
223 pub fn copy_latest_root_to_version(&self, to_version: Version) -> anyhow::Result<bool> {
229 let cf = self.db.cf_handle(CF_NODES).expect("missing jmt_nodes CF");
230 let cf_root_meta = self
231 .db
232 .cf_handle(CF_ROOT_META)
233 .expect("missing jmt_root_meta CF");
234
235 let Some((latest_version, mut to_key)) = self.latest_root_metadata()? else {
236 return Ok(false);
237 };
238 if latest_version >= to_version {
239 return Ok(false);
240 }
241 let Some(node) = self.db.get_cf(cf, &to_key)? else {
242 anyhow::bail!(
243 "latest JMT root metadata points to missing node at version {}",
244 latest_version
245 );
246 };
247 anyhow::ensure!(
248 to_key.len() >= 8,
249 "encoded JMT root node key shorter than version prefix"
250 );
251 to_key[..8].copy_from_slice(&to_version.to_le_bytes());
252 let mut batch = WriteBatch::default();
253 batch.put_cf(cf, &to_key, node);
254 batch.put_cf(
255 cf_root_meta,
256 LATEST_ROOT_VERSION_KEY,
257 to_version.to_be_bytes(),
258 );
259 batch.put_cf(cf_root_meta, LATEST_ROOT_KEY_KEY, &to_key);
260 self.db.write(batch)?;
261 Ok(true)
262 }
263
264 pub fn latest_root_version(&self) -> anyhow::Result<Option<Version>> {
265 Ok(self.latest_root_metadata()?.map(|(version, _)| version))
266 }
267
268 fn latest_root_metadata(&self) -> anyhow::Result<Option<(Version, Vec<u8>)>> {
269 let cf_root_meta = self
270 .db
271 .cf_handle(CF_ROOT_META)
272 .expect("missing jmt_root_meta CF");
273 let Some(version_bytes) = self.db.get_cf(cf_root_meta, LATEST_ROOT_VERSION_KEY)? else {
274 return Ok(None);
275 };
276 anyhow::ensure!(
277 version_bytes.len() == 8,
278 "invalid latest JMT root version metadata length {}",
279 version_bytes.len()
280 );
281 let Some(root_key) = self.db.get_cf(cf_root_meta, LATEST_ROOT_KEY_KEY)? else {
282 anyhow::bail!("latest JMT root key metadata is missing");
283 };
284 Ok(Some((
285 u64::from_be_bytes(version_bytes.as_slice().try_into().unwrap()),
286 root_key,
287 )))
288 }
289
290 pub fn compact_all(&self) -> anyhow::Result<()> {
296 for cf_name in [CF_NODES, CF_VALUES, CF_STALE, CF_ROOT_META] {
297 let cf = self
298 .db
299 .cf_handle(cf_name)
300 .ok_or_else(|| anyhow::anyhow!("missing CF {cf_name}"))?;
301 self.db.compact_range_cf(cf, None::<&[u8]>, None::<&[u8]>);
302 }
303 Ok(())
304 }
305
306 fn count_cf(&self, cf_name: &str) -> anyhow::Result<usize> {
307 let cf = self
308 .db
309 .cf_handle(cf_name)
310 .ok_or_else(|| anyhow::anyhow!("missing CF {cf_name}"))?;
311 let mut count = 0usize;
312 let mut iter = self.db.raw_iterator_cf(cf);
313 iter.seek_to_first();
314 while iter.valid() {
315 count += 1;
316 iter.next();
317 }
318 Ok(count)
319 }
320}
321
322impl RocksDbMmrStore {
323 pub fn open(path: impl AsRef<Path>) -> anyhow::Result<Self> {
324 let mut db_opts = Options::default();
325 db_opts.create_if_missing(true);
326 db_opts.create_missing_column_families(true);
327
328 let mut node_opts = Options::default();
329 node_opts.set_compression_type(rocksdb::DBCompressionType::Lz4);
330
331 let meta_opts = Options::default();
332
333 let cfs = vec![
334 ColumnFamilyDescriptor::new(CF_MMR_NODES, node_opts),
335 ColumnFamilyDescriptor::new(CF_MMR_META, meta_opts),
336 ];
337
338 let db = DB::open_cf_descriptors(&db_opts, path, cfs)?;
339 Ok(Self { db })
340 }
341
342 pub fn num_nodes(&self) -> anyhow::Result<usize> {
343 let cf = self
344 .db
345 .cf_handle(CF_MMR_NODES)
346 .ok_or_else(|| anyhow::anyhow!("missing CF {CF_MMR_NODES}"))?;
347 let mut count = 0usize;
348 let mut iter = self.db.raw_iterator_cf(cf);
349 iter.seek_to_first();
350 while iter.valid() {
351 count += 1;
352 iter.next();
353 }
354 Ok(count)
355 }
356
357 pub fn is_empty(&self) -> anyhow::Result<bool> {
358 Ok(self.num_nodes()? == 0 && self.load_mmr_size()? == 0)
359 }
360
361 pub fn compact_all(&self) -> anyhow::Result<()> {
363 for cf_name in [CF_MMR_NODES, CF_MMR_META] {
364 let cf = self
365 .db
366 .cf_handle(cf_name)
367 .ok_or_else(|| anyhow::anyhow!("missing CF {cf_name}"))?;
368 self.db.compact_range_cf(cf, None::<&[u8]>, None::<&[u8]>);
369 }
370 Ok(())
371 }
372}
373
374impl MmrMetadataStore for RocksDbMmrStore {
375 fn load_mmr_size(&self) -> anyhow::Result<u64> {
376 let cf = self.db.cf_handle(CF_MMR_META).expect("missing mmr_meta CF");
377 match self.db.get_cf(cf, MMR_SIZE_KEY)? {
378 Some(bytes) => {
379 if bytes.len() != 8 {
380 anyhow::bail!("invalid persisted MMR size length {}", bytes.len());
381 }
382 Ok(u64::from_be_bytes(bytes.as_slice().try_into().unwrap()))
383 }
384 None => Ok(0),
385 }
386 }
387}
388
// Empty impl: declares `RocksDbMmrStore` as a `SupportedMmrStore` without defining any items.
impl SupportedMmrStore for RocksDbMmrStore {}
390
impl PrepareMmrStore for RocksDbMmrStore {
    /// Prepares an append of `data` on top of an MMR of the given `size` backed by
    /// `store`, by delegating to `prepare_append_many_with_store`.
    fn prepare_append_many_from(
        size: u64,
        store: &Self,
        data: &[[u8; 32]],
    ) -> anyhow::Result<PreparedMmrAppend> {
        prepare_append_many_with_store(size, store, data)
    }
}
400
401impl ckb_merkle_mountain_range::MMRStoreReadOps<MmrHash>
402 for &crate::mmr::StagedMmrStore<'_, RocksDbMmrStore>
403{
404 fn get_elem(&self, pos: u64) -> ckb_merkle_mountain_range::Result<Option<MmrHash>> {
405 for (start, elems) in self.writes.borrow().iter().rev() {
406 if pos >= *start {
407 let index = (pos - *start) as usize;
408 if let Some(elem) = elems.get(index) {
409 return Ok(Some(elem.clone()));
410 }
411 }
412 }
413 self.base.get_elem(pos)
414 }
415}
416
417impl MMRStoreReadOps<MmrHash> for &RocksDbMmrStore {
418 fn get_elem(&self, pos: u64) -> MmrResult<Option<MmrHash>> {
419 let cf = self
420 .db
421 .cf_handle(CF_MMR_NODES)
422 .expect("missing mmr_nodes CF");
423 match self
424 .db
425 .get_cf(cf, pos.to_be_bytes())
426 .map_err(|error| MmrError::StoreError(error.to_string()))?
427 {
428 Some(bytes) => {
429 if bytes.len() != 32 {
430 return Err(MmrError::StoreError(format!(
431 "invalid persisted MMR node length {}",
432 bytes.len()
433 )));
434 }
435 Ok(Some(MmrHash(bytes.as_slice().try_into().unwrap())))
436 }
437 None => Ok(None),
438 }
439 }
440}
441
442impl MMRStoreWriteOps<MmrHash> for &RocksDbMmrStore {
443 fn append(&mut self, pos: u64, elems: Vec<MmrHash>) -> MmrResult<()> {
444 let cf_nodes = self
445 .db
446 .cf_handle(CF_MMR_NODES)
447 .expect("missing mmr_nodes CF");
448 let cf_meta = self.db.cf_handle(CF_MMR_META).expect("missing mmr_meta CF");
449 let next_size = pos + elems.len() as u64;
450 let mut batch = WriteBatch::default();
451 for (i, elem) in elems.into_iter().enumerate() {
452 let p = pos + i as u64;
453 if self
454 .db
455 .get_cf(cf_nodes, p.to_be_bytes())
456 .map_err(|error| MmrError::StoreError(error.to_string()))?
457 .is_some()
458 {
459 return Err(MmrError::InconsistentStore);
460 }
461 batch.put_cf(cf_nodes, p.to_be_bytes(), elem.0);
462 }
463 batch.put_cf(cf_meta, MMR_SIZE_KEY, next_size.to_be_bytes());
464 self.db
465 .write(batch)
466 .map_err(|error| MmrError::StoreError(error.to_string()))?;
467 Ok(())
468 }
469}
470
471impl TreeReader for RocksDbStore {
472 fn get_node_option(&self, node_key: &NodeKey) -> anyhow::Result<Option<Node>> {
473 let cf = self.db.cf_handle(CF_NODES).expect("missing jmt_nodes CF");
474 let key = borsh::to_vec(node_key)?;
475 match self.db.get_cf(cf, &key)? {
476 Some(bytes) => Ok(Some(borsh::from_slice(&bytes)?)),
477 None => Ok(None),
478 }
479 }
480
481 fn get_value_option(
482 &self,
483 max_version: Version,
484 key_hash: KeyHash,
485 ) -> anyhow::Result<Option<OwnedValue>> {
486 let cf = self.db.cf_handle(CF_VALUES).expect("missing jmt_values CF");
487
488 let seek_key = encode_value_key(&key_hash, max_version);
489 let mut iter = self.db.raw_iterator_cf(cf);
490 iter.seek_for_prev(seek_key);
491
492 if !iter.valid() {
493 return Ok(None);
494 }
495
496 let key = match iter.key() {
497 Some(k) if k.len() >= 40 => k,
498 _ => return Ok(None),
499 };
500
501 if key[..32] != key_hash.0 {
502 return Ok(None);
503 }
504
505 match iter.value() {
506 Some([]) => Ok(None),
507 Some(v) => Ok(Some(v.to_vec())),
508 None => Ok(None),
509 }
510 }
511
512 fn get_rightmost_leaf(&self) -> anyhow::Result<Option<(NodeKey, LeafNode)>> {
513 let cf = self.db.cf_handle(CF_NODES).expect("missing jmt_nodes CF");
514 let mut best: Option<(NodeKey, LeafNode)> = None;
515
516 let mut iter = self.db.raw_iterator_cf(cf);
517 iter.seek_to_first();
518
519 while iter.valid() {
520 if let (Some(key_bytes), Some(val_bytes)) = (iter.key(), iter.value()) {
521 let node: Node = borsh::from_slice(val_bytes)?;
522 if let Node::Leaf(leaf) = node {
523 let node_key: NodeKey = borsh::from_slice(key_bytes)?;
524 let dominated = match &best {
525 None => true,
526 Some((_, best_leaf)) => leaf.key_hash() > best_leaf.key_hash(),
527 };
528 if dominated {
529 best = Some((node_key, leaf));
530 }
531 }
532 }
533 iter.next();
534 }
535
536 Ok(best)
537 }
538}
539
540impl TreeWriter for RocksDbStore {
541 fn write_node_batch(&self, node_batch: &NodeBatch) -> anyhow::Result<()> {
542 let cf_nodes = self.db.cf_handle(CF_NODES).expect("missing jmt_nodes CF");
543 let cf_values = self.db.cf_handle(CF_VALUES).expect("missing jmt_values CF");
544 let cf_root_meta = self
545 .db
546 .cf_handle(CF_ROOT_META)
547 .expect("missing jmt_root_meta CF");
548
549 let mut batch = WriteBatch::default();
550
551 for (node_key, node) in node_batch.nodes() {
552 let key = borsh::to_vec(node_key)?;
553 let val = borsh::to_vec(node)?;
554 batch.put_cf(cf_nodes, &key, &val);
555 if node_key.nibble_path().is_empty() {
556 batch.put_cf(
557 cf_root_meta,
558 LATEST_ROOT_VERSION_KEY,
559 node_key.version().to_be_bytes(),
560 );
561 batch.put_cf(cf_root_meta, LATEST_ROOT_KEY_KEY, &key);
562 }
563 }
564
565 for ((version, key_hash), value) in node_batch.values() {
566 let key = encode_value_key(key_hash, *version);
567 match value {
568 Some(v) => batch.put_cf(cf_values, key, v),
569 None => batch.put_cf(cf_values, key, []),
570 }
571 }
572
573 self.db.write(batch)?;
574 Ok(())
575 }
576}
577
578fn encode_value_key(key_hash: &KeyHash, version: Version) -> [u8; 40] {
580 let mut key = [0u8; 40];
581 key[..32].copy_from_slice(&key_hash.0);
582 key[32..].copy_from_slice(&version.to_be_bytes());
583 key
584}
585
586fn encode_stale_key(stale_since_version: Version, node_key: &NodeKey) -> anyhow::Result<Vec<u8>> {
588 let node_key_bytes = borsh::to_vec(node_key)?;
589 let mut key = Vec::with_capacity(8 + node_key_bytes.len());
590 key.extend_from_slice(&stale_since_version.to_be_bytes());
591 key.extend_from_slice(&node_key_bytes);
592 Ok(key)
593}
594
#[cfg(all(test, feature = "rocksdb"))]
mod tests {
    use super::*;
    use crate::keys::StateKey;
    use crate::leaves::*;
    use crate::mmr::HypercallMmr;
    use crate::state_tree::new_jmt;
    use tempfile::TempDir;

    /// Opens a JMT store in a fresh temporary directory. The returned `TempDir` must
    /// be kept alive for as long as the store is used.
    fn open_temp_store() -> (RocksDbStore, TempDir) {
        let dir = TempDir::new().unwrap();
        let store = RocksDbStore::open(dir.path()).unwrap();
        (store, dir)
    }

    /// Serialized `AccountLeaf` with the given cash and nonce; the remaining fields
    /// are zero.
    fn account_value(cash: i128, nonce: u64) -> Vec<u8> {
        leaf_to_bytes(&AccountLeaf {
            cash,
            nonce,
            margin_mode: 0,
            liquidation_state: 0,
        })
    }

    /// Account key whose 20-byte address starts with `index` in little-endian form.
    fn indexed_account_key(index: u32) -> KeyHash {
        let mut addr = [0u8; 20];
        addr[0..4].copy_from_slice(&index.to_le_bytes());
        StateKey::account(&addr)
    }

    #[test]
    fn mmr_reopens_with_persisted_size_and_root() {
        let dir = TempDir::new().unwrap();
        // Each scope below drops its MMR (and store) before the next reopen.
        let reopen =
            || HypercallMmr::from_store(RocksDbMmrStore::open(dir.path()).unwrap()).unwrap();

        let root_after_two = {
            let mut mmr = reopen();
            mmr.append([0x11; 32]).unwrap();
            mmr.append([0x22; 32]).unwrap();
            assert_eq!(mmr.size(), 3);
            mmr.peaks_hash()
        };

        let root_after_three = {
            let mut mmr = reopen();
            assert_eq!(mmr.size(), 3);
            assert_eq!(mmr.peaks_hash(), root_after_two);
            mmr.append([0x33; 32]).unwrap();
            assert_eq!(mmr.size(), 4);
            mmr.peaks_hash()
        };

        let mmr = reopen();
        assert_eq!(mmr.size(), 4);
        assert_eq!(mmr.peaks_hash(), root_after_three);
    }

    #[test]
    fn basic_insert_and_read() {
        let (store, _dir) = open_temp_store();
        let tree = new_jmt(&store);

        let key = StateKey::account(&[0x42; 20]);
        let value = account_value(1000, 0);

        let (root, batch) = tree
            .put_value_set(vec![(key, Some(value.clone()))], 0)
            .unwrap();
        store.write_tree_update_batch(batch).unwrap();

        let (retrieved, proof) = tree.get_with_proof(key, 0).unwrap();
        assert_eq!(retrieved, Some(value));
        assert!(proof.verify(root, key, retrieved.as_ref()).is_ok());
    }

    #[test]
    fn copy_latest_root_to_version_returns_false_without_root_metadata() {
        let (store, _dir) = open_temp_store();

        let copied = store.copy_latest_root_to_version(7).unwrap();
        assert!(!copied);
        assert!(new_jmt(&store).get_root_hash_option(7).unwrap().is_none());
    }

    #[test]
    fn copy_latest_root_to_version_checkpoints_unchanged_root() {
        let (store, _dir) = open_temp_store();
        let tree = new_jmt(&store);
        let key = StateKey::account(&[0x42; 20]);
        let value = account_value(1000, 0);

        let (root, batch) = tree
            .put_value_set(vec![(key, Some(value.clone()))], 0)
            .unwrap();
        store.write_tree_update_batch(batch).unwrap();

        assert!(store.copy_latest_root_to_version(7).unwrap());
        assert_eq!(tree.get_root_hash_option(7).unwrap(), Some(root));

        let (retrieved, proof) = tree.get_with_proof(key, 7).unwrap();
        assert_eq!(retrieved, Some(value));
        assert!(proof.verify(root, key, retrieved.as_ref()).is_ok());
    }

    #[test]
    fn copy_latest_root_to_version_returns_false_for_current_or_older_version() {
        let (store, _dir) = open_temp_store();
        let tree = new_jmt(&store);
        let key = StateKey::account(&[0x11; 20]);

        let (_root, batch) = tree
            .put_value_set(vec![(key, Some(account_value(1, 0)))], 3)
            .unwrap();
        store.write_tree_update_batch(batch).unwrap();

        assert!(!store.copy_latest_root_to_version(3).unwrap());
        assert!(!store.copy_latest_root_to_version(2).unwrap());
    }

    #[test]
    fn copy_latest_root_to_version_fails_when_metadata_points_to_missing_node() {
        let (store, _dir) = open_temp_store();
        let root_meta_cf = store
            .db
            .cf_handle(CF_ROOT_META)
            .expect("missing root meta CF");

        // Metadata that references a root key under which no node was ever stored.
        let mut dangling_root_key = vec![0u8; 16];
        dangling_root_key[..8].copy_from_slice(&3u64.to_le_bytes());
        let mut batch = WriteBatch::default();
        batch.put_cf(root_meta_cf, LATEST_ROOT_VERSION_KEY, 3u64.to_be_bytes());
        batch.put_cf(root_meta_cf, LATEST_ROOT_KEY_KEY, dangling_root_key);
        store.db.write(batch).unwrap();

        let message = store
            .copy_latest_root_to_version(7)
            .unwrap_err()
            .to_string();
        assert!(
            message.contains("latest JMT root metadata points to missing node"),
            "{message}"
        );
    }

    #[test]
    fn versioned_reads() {
        let (store, _dir) = open_temp_store();
        let tree = new_jmt(&store);
        let key = StateKey::account(&[0x01; 20]);

        for version in 0..10u64 {
            let value = account_value(version as i128 * 100, version);
            let (_, batch) = tree
                .put_value_set(vec![(key, Some(value))], version)
                .unwrap();
            store.write_tree_update_batch(batch).unwrap();
        }

        for version in 0..10u64 {
            let (stored, _) = tree.get_with_proof(key, version).unwrap();
            let decoded: AccountLeaf = leaf_from_bytes(&stored.unwrap()).unwrap();
            assert_eq!(decoded.cash, version as i128 * 100);
        }
    }

    #[test]
    fn prune_stale_nodes() {
        let (store, _dir) = open_temp_store();
        let tree = new_jmt(&store);
        let key = StateKey::account(&[0x01; 20]);

        for version in 0..5u64 {
            let value = account_value(version as i128, version);
            let (_, batch) = tree
                .put_value_set(vec![(key, Some(value))], version)
                .unwrap();
            store.write_tree_update_batch(batch).unwrap();
        }

        let nodes_before = store.num_nodes().unwrap();
        let (nodes_pruned, _) = store.prune(3).unwrap();
        assert!(nodes_pruned > 0, "should have pruned stale nodes");
        assert!(store.num_nodes().unwrap() < nodes_before);

        let (stored, _) = tree.get_with_proof(key, 4).unwrap();
        let decoded: AccountLeaf = leaf_from_bytes(&stored.unwrap()).unwrap();
        assert_eq!(decoded.cash, 4);
    }

    #[test]
    fn delete_and_tombstone() {
        let (store, _dir) = open_temp_store();
        let tree = new_jmt(&store);
        let key = StateKey::account(&[0x01; 20]);

        let (_, insert_batch) = tree
            .put_value_set(vec![(key, Some(account_value(42, 0)))], 0)
            .unwrap();
        store.write_tree_update_batch(insert_batch).unwrap();

        let (_, delete_batch) = tree.put_value_set(vec![(key, None)], 1).unwrap();
        store.write_tree_update_batch(delete_batch).unwrap();

        let (val_v1, _) = tree.get_with_proof(key, 1).unwrap();
        assert!(val_v1.is_none(), "deleted key must return None");

        let (val_v0, _) = tree.get_with_proof(key, 0).unwrap();
        assert!(val_v0.is_some(), "v0 should still have value");
    }

    #[test]
    fn batch_1000_accounts() {
        let (store, _dir) = open_temp_store();
        let tree = new_jmt(&store);

        let entries: Vec<_> = (0..1000u32)
            .map(|i| (indexed_account_key(i), Some(account_value(i as i128 * 100, 0))))
            .collect();

        let (root, batch) = tree.put_value_set(entries, 0).unwrap();
        store.write_tree_update_batch(batch).unwrap();

        assert_ne!(root.0, [0u8; 32]);

        for i in [0u32, 500, 999] {
            let key = indexed_account_key(i);
            let (stored, proof) = tree.get_with_proof(key, 0).unwrap();
            assert!(stored.is_some());
            assert!(proof.verify(root, key, stored.as_ref()).is_ok());
        }
    }

    #[test]
    fn survives_reopen() {
        let dir = TempDir::new().unwrap();
        let key = StateKey::account(&[0xAA; 20]);
        let value = account_value(999, 1);

        // First session: write the value, then drop the store.
        let root = {
            let store = RocksDbStore::open(dir.path()).unwrap();
            let tree = new_jmt(&store);
            let (root, batch) = tree
                .put_value_set(vec![(key, Some(value.clone()))], 0)
                .unwrap();
            store.write_tree_update_batch(batch).unwrap();
            root
        };

        // Second session: the value and its proof must still be readable.
        {
            let store = RocksDbStore::open(dir.path()).unwrap();
            let tree = new_jmt(&store);
            let (stored, proof) = tree.get_with_proof(key, 0).unwrap();
            assert_eq!(stored, Some(value));
            assert!(proof.verify(root, key, stored.as_ref()).is_ok());
        }
    }

    #[test]
    fn mixed_state_types() {
        let (store, _dir) = open_temp_store();
        let tree = new_jmt(&store);
        let addr = [0x42u8; 20];

        let account = leaf_to_bytes(&AccountLeaf {
            cash: 50000,
            nonce: 0,
            margin_mode: 1,
            liquidation_state: 0,
        });
        let option_position = leaf_to_bytes(&OptionPositionLeaf {
            quantity: 100_000_000,
            entry_price: 500_00,
        });
        let perp_position = leaf_to_bytes(&PerpPositionLeaf {
            size: -1_000_000,
            entry_price: 95000_00000000,
        });
        let global = leaf_to_bytes(&GlobalLeaf {
            next_order_id: 1000,
            next_trade_id: 500,
            command_chain_root: [0xBB; 32],
            command_chain_seq: 42,
        });

        let entries: Vec<(KeyHash, Option<Vec<u8>>)> = vec![
            (StateKey::account(&addr), Some(account)),
            (
                StateKey::option_position(&addr, "BTC-20261231-100000-C"),
                Some(option_position),
            ),
            (StateKey::perp_position(&addr, "BTC"), Some(perp_position)),
            (StateKey::global(), Some(global)),
        ];

        let (root, batch) = tree.put_value_set(entries.clone(), 0).unwrap();
        store.write_tree_update_batch(batch).unwrap();

        for (key, expected) in &entries {
            let (stored, proof) = tree.get_with_proof(*key, 0).unwrap();
            assert_eq!(&stored, expected);
            assert!(proof.verify(root, *key, stored.as_deref()).is_ok());
        }
    }
}