1use async_trait::async_trait;
8use hypercall_types::{BalanceUpdate, WalletAddress};
9use rust_decimal::Decimal;
10use serde::{Deserialize, Serialize};
11use std::collections::HashMap;
12use std::fmt;
13use std::ops::{Deref, DerefMut};
14use std::sync::Arc;
15
16#[derive(Debug, Clone)]
18pub enum LedgerError {
19 Db(String),
21 InvalidOperation(String),
23}
24
25impl fmt::Display for LedgerError {
26 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
27 match self {
28 LedgerError::Db(msg) => write!(f, "Ledger DB error: {}", msg),
29 LedgerError::InvalidOperation(msg) => write!(f, "Invalid ledger operation: {}", msg),
30 }
31 }
32}
33
34impl std::error::Error for LedgerError {}
35
36#[async_trait]
47pub trait Ledger: Send + Sync {
48 async fn get_balance(&self, wallet: &WalletAddress) -> Result<Decimal, LedgerError>;
52
53 async fn apply_pnl(
61 &self,
62 wallet: &WalletAddress,
63 realized_pnl: Decimal,
64 ) -> Result<(), LedgerError>;
65
66 async fn set_balance(
70 &self,
71 wallet: &WalletAddress,
72 balance: Decimal,
73 ) -> Result<(), LedgerError>;
74
75 async fn apply_premium(
86 &self,
87 wallet: &WalletAddress,
88 premium_delta: Decimal,
89 ) -> Result<(), LedgerError> {
90 self.apply_pnl(wallet, premium_delta).await
92 }
93}
94
95#[async_trait]
97pub trait BalanceProvider: Send + Sync {
98 async fn get_balance(&self, wallet: &WalletAddress) -> Result<Decimal, LedgerError>;
99}
100
101pub struct LedgerBalanceProvider {
102 ledger: Arc<dyn Ledger + Send + Sync>,
103}
104
105impl LedgerBalanceProvider {
106 pub fn new(ledger: Arc<dyn Ledger + Send + Sync>) -> Self {
107 Self { ledger }
108 }
109}
110
111#[async_trait]
112impl BalanceProvider for LedgerBalanceProvider {
113 async fn get_balance(&self, wallet: &WalletAddress) -> Result<Decimal, LedgerError> {
114 self.ledger.get_balance(wallet).await
115 }
116}
117
118#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
120pub struct BalanceLedger {
121 balances: HashMap<WalletAddress, Decimal>,
122 last_balance_update_seq: u64,
123 last_applied_balance_update: Option<BalanceUpdate>,
124}
125
126impl BalanceLedger {
127 pub fn new() -> Self {
128 Self::default()
129 }
130
131 pub fn from_map(balances: HashMap<WalletAddress, Decimal>) -> Self {
132 Self::from_map_with_sequence(balances, 0)
133 }
134
135 pub fn from_map_with_sequence(
136 balances: HashMap<WalletAddress, Decimal>,
137 last_balance_update_seq: u64,
138 ) -> Self {
139 let mut ledger = Self::new();
140 for (wallet, balance) in balances {
141 ledger.set(wallet, balance);
142 }
143 ledger.last_balance_update_seq = last_balance_update_seq;
144 ledger
145 }
146
147 pub fn balance(&self, wallet: &WalletAddress) -> Decimal {
148 self.balances.get(wallet).copied().unwrap_or(Decimal::ZERO)
149 }
150
151 pub fn balances(&self) -> &HashMap<WalletAddress, Decimal> {
152 &self.balances
153 }
154
155 pub fn last_balance_update_seq(&self) -> u64 {
156 self.last_balance_update_seq
157 }
158
159 pub fn next_balance_update_seq(&self) -> u64 {
160 self.last_balance_update_seq
161 .checked_add(1)
162 .expect("STATE_CORRUPTION: balance_update_seq overflow")
163 }
164
165 pub fn apply_balance_update(&mut self, update: &BalanceUpdate) -> Result<bool, LedgerError> {
166 if update.balance_update_seq == self.last_balance_update_seq {
167 match self.last_applied_balance_update.as_ref() {
168 Some(last_update) if last_update == update => return Ok(false),
169 Some(_) => {
170 return Err(LedgerError::InvalidOperation(format!(
171 "balance update seq {} duplicate payload mismatch",
172 update.balance_update_seq
173 )));
174 }
175 None => return Ok(false),
176 }
177 }
178
179 if update.balance_update_seq < self.last_balance_update_seq {
180 return Err(LedgerError::InvalidOperation(format!(
181 "balance update seq {} is older than last applied seq {}",
182 update.balance_update_seq, self.last_balance_update_seq
183 )));
184 }
185
186 let expected_seq = self.next_balance_update_seq();
187 if update.balance_update_seq != expected_seq {
188 return Err(LedgerError::InvalidOperation(format!(
189 "balance update seq gap: expected {}, got {}",
190 expected_seq, update.balance_update_seq
191 )));
192 }
193
194 let current = self.balance(&update.wallet);
195 let expected_balance_after = current + update.delta;
196 if expected_balance_after != update.balance_after {
197 return Err(LedgerError::InvalidOperation(format!(
198 "balance update seq {} for {} has invalid balance_after: current {} + delta {} = {}, got {}",
199 update.balance_update_seq,
200 update.wallet,
201 current,
202 update.delta,
203 expected_balance_after,
204 update.balance_after
205 )));
206 }
207
208 self.set(update.wallet, update.balance_after);
209 self.last_balance_update_seq = update.balance_update_seq;
210 self.last_applied_balance_update = Some(update.clone());
211 Ok(true)
212 }
213
214 pub fn set(&mut self, wallet: WalletAddress, balance: Decimal) {
215 if balance == Decimal::ZERO {
216 self.balances.remove(&wallet);
217 } else {
218 self.balances.insert(wallet, balance);
219 }
220 }
221
222 pub fn insert(&mut self, wallet: WalletAddress, balance: Decimal) -> Option<Decimal> {
223 let previous = self.balances.get(&wallet).copied();
224 self.set(wallet, balance);
225 previous
226 }
227
228 pub fn add_delta(&mut self, wallet: WalletAddress, delta: Decimal) {
229 if delta == Decimal::ZERO {
230 return;
231 }
232
233 let next = self.balance(&wallet) + delta;
234 self.set(wallet, next);
235 }
236
237 pub fn remove_zero(&mut self, wallet: &WalletAddress) {
238 if self.balance(wallet) == Decimal::ZERO {
239 self.balances.remove(wallet);
240 }
241 }
242
243 pub fn snapshot_map(&self) -> HashMap<WalletAddress, Decimal> {
244 self.balances.clone()
245 }
246
247 pub fn sorted_nonzero_entries(&self) -> Vec<(WalletAddress, Decimal)> {
248 let mut entries: Vec<_> = self
249 .balances
250 .iter()
251 .filter_map(|(wallet, balance)| {
252 if *balance == Decimal::ZERO {
253 None
254 } else {
255 Some((*wallet, *balance))
256 }
257 })
258 .collect();
259 entries.sort_by_key(|(wallet, _)| *wallet);
260 entries
261 }
262
263 pub fn nonzero_wallets(&self) -> Vec<WalletAddress> {
264 self.sorted_nonzero_entries()
265 .into_iter()
266 .map(|(wallet, _)| wallet)
267 .collect()
268 }
269}
270
271impl Deref for BalanceLedger {
272 type Target = HashMap<WalletAddress, Decimal>;
273
274 fn deref(&self) -> &Self::Target {
275 &self.balances
276 }
277}
278
279impl DerefMut for BalanceLedger {
280 fn deref_mut(&mut self) -> &mut Self::Target {
281 &mut self.balances
282 }
283}
284
285#[derive(Default)]
288pub struct InMemoryLedger {
289 balances: std::sync::Arc<tokio::sync::RwLock<BalanceLedger>>,
290}
291
292impl InMemoryLedger {
293 pub fn new() -> Self {
294 Self::default()
295 }
296
297 pub async fn nonzero_wallets(&self) -> Vec<WalletAddress> {
298 let balances = self.balances.read().await;
299 balances.nonzero_wallets()
300 }
301}
302
303#[async_trait]
304impl Ledger for InMemoryLedger {
305 async fn get_balance(&self, wallet: &WalletAddress) -> Result<Decimal, LedgerError> {
306 let balances = self.balances.read().await;
307 Ok(balances.balance(wallet))
308 }
309
310 async fn apply_pnl(
311 &self,
312 wallet: &WalletAddress,
313 realized_pnl: Decimal,
314 ) -> Result<(), LedgerError> {
315 let mut balances = self.balances.write().await;
316 balances.add_delta(*wallet, realized_pnl);
317 Ok(())
318 }
319
320 async fn set_balance(
321 &self,
322 wallet: &WalletAddress,
323 balance: Decimal,
324 ) -> Result<(), LedgerError> {
325 let mut balances = self.balances.write().await;
326 balances.set(*wallet, balance);
327 Ok(())
328 }
329}
330
331#[cfg(any(test, feature = "test-utils"))]
333pub struct MockBalanceProvider;
334
335#[cfg(any(test, feature = "test-utils"))]
336#[async_trait]
337impl BalanceProvider for MockBalanceProvider {
338 async fn get_balance(&self, _wallet: &WalletAddress) -> Result<Decimal, LedgerError> {
339 Ok(Decimal::ZERO)
340 }
341}
342
343#[cfg(test)]
344mod tests {
345 use super::*;
346 use hypercall_types::{wallet_address::test_wallet, BalanceUpdateReason};
347 use rust_decimal_macros::dec;
348
349 fn balance_update(
350 ledger: &BalanceLedger,
351 wallet: WalletAddress,
352 delta: Decimal,
353 ) -> BalanceUpdate {
354 BalanceUpdate {
355 balance_update_seq: ledger.next_balance_update_seq(),
356 wallet,
357 delta,
358 balance_after: ledger.balance(&wallet) + delta,
359 reason: BalanceUpdateReason::Deposit,
360 reference_id: Some("test-reference".to_string()),
361 source_command_id: Some(1),
362 timestamp_ms: 1234,
363 }
364 }
365
366 fn sequenced_balance_update(
367 seq: u64,
368 wallet: WalletAddress,
369 delta: Decimal,
370 balance_after: Decimal,
371 ) -> BalanceUpdate {
372 BalanceUpdate {
373 balance_update_seq: seq,
374 wallet,
375 delta,
376 balance_after,
377 reason: BalanceUpdateReason::Deposit,
378 reference_id: Some(format!("test-reference-{seq}")),
379 source_command_id: Some(seq as i64),
380 timestamp_ms: 1234 + seq,
381 }
382 }
383
384 #[tokio::test]
385 async fn test_in_memory_ledger_get_balance_nonexistent() {
386 let ledger = InMemoryLedger::new();
387 let balance = ledger.get_balance(&test_wallet(1)).await.unwrap();
388 assert_eq!(balance, dec!(0));
389 }
390
391 #[tokio::test]
392 async fn test_in_memory_ledger_set_and_get_balance() {
393 let ledger = InMemoryLedger::new();
394 ledger
395 .set_balance(&test_wallet(1), dec!(10000))
396 .await
397 .unwrap();
398 let balance = ledger.get_balance(&test_wallet(1)).await.unwrap();
399 assert_eq!(balance, dec!(10000));
400 }
401
402 #[tokio::test]
403 async fn test_in_memory_ledger_nonzero_wallets() {
404 let ledger = InMemoryLedger::new();
405 ledger.set_balance(&test_wallet(1), dec!(25)).await.unwrap();
406 ledger
407 .set_balance(&test_wallet(2), rust_decimal::Decimal::ZERO)
408 .await
409 .unwrap();
410 ledger.set_balance(&test_wallet(3), dec!(-5)).await.unwrap();
411
412 let mut wallets = ledger.nonzero_wallets().await;
413 wallets.sort_unstable();
414
415 assert_eq!(wallets, vec![test_wallet(1), test_wallet(3)]);
416 }
417
418 #[test]
419 fn apply_balance_update_validates_and_applies_delta() {
420 let wallet = test_wallet(5);
421 let mut ledger = BalanceLedger::new();
422 ledger.set(wallet, dec!(100));
423
424 let update = balance_update(&ledger, wallet, dec!(25));
425 assert!(ledger.apply_balance_update(&update).unwrap());
426
427 assert_eq!(ledger.balance(&wallet), dec!(125));
428 assert_eq!(ledger.last_balance_update_seq(), update.balance_update_seq);
429 }
430
431 #[test]
432 fn apply_balance_update_removes_zero_balance() {
433 let wallet = test_wallet(6);
434 let mut ledger = BalanceLedger::new();
435 ledger.set(wallet, dec!(100));
436
437 let update = balance_update(&ledger, wallet, dec!(-100));
438 ledger.apply_balance_update(&update).unwrap();
439
440 assert_eq!(ledger.balance(&wallet), Decimal::ZERO);
441 assert!(!ledger.snapshot_map().contains_key(&wallet));
442 }
443
444 #[test]
445 fn apply_balance_update_rejects_invalid_balance_after() {
446 let wallet = test_wallet(7);
447 let mut ledger = BalanceLedger::new();
448 ledger.set(wallet, dec!(100));
449
450 let mut update = balance_update(&ledger, wallet, dec!(25));
451 update.balance_after = dec!(126);
452
453 let error = ledger.apply_balance_update(&update).unwrap_err();
454 assert!(
455 error.to_string().contains("has invalid balance_after"),
456 "unexpected error: {error}"
457 );
458 assert_eq!(ledger.balance(&wallet), dec!(100));
459 }
460
461 #[test]
462 fn apply_balance_update_rejects_sequence_gap() {
463 let wallet = test_wallet(8);
464 let mut ledger = BalanceLedger::new();
465
466 let mut update = balance_update(&ledger, wallet, dec!(10));
467 update.balance_update_seq += 1;
468
469 let error = ledger.apply_balance_update(&update).unwrap_err();
470 assert!(
471 error.to_string().contains("balance update seq gap"),
472 "unexpected error: {error}"
473 );
474 }
475
476 #[test]
477 fn apply_balance_update_accepts_exact_duplicate() {
478 let wallet = test_wallet(9);
479 let mut ledger = BalanceLedger::new();
480
481 let update = balance_update(&ledger, wallet, dec!(10));
482 assert!(ledger.apply_balance_update(&update).unwrap());
483 assert!(!ledger.apply_balance_update(&update).unwrap());
484 assert_eq!(ledger.balance(&wallet), dec!(10));
485 }
486
487 #[test]
488 fn apply_balance_update_rejects_duplicate_payload_mismatch() {
489 let wallet = test_wallet(10);
490 let mut ledger = BalanceLedger::new();
491
492 let update = balance_update(&ledger, wallet, dec!(10));
493 ledger.apply_balance_update(&update).unwrap();
494
495 let mut duplicate = update.clone();
496 duplicate.reference_id = Some("different".to_string());
497
498 let error = ledger.apply_balance_update(&duplicate).unwrap_err();
499 assert!(
500 error.to_string().contains("duplicate payload mismatch"),
501 "unexpected error: {error}"
502 );
503 }
504
505 #[test]
506 fn follower_replay_from_snapshot_converges_to_engine_ledger() {
507 let wallet_a = test_wallet(11);
508 let wallet_b = test_wallet(12);
509 let mut snapshot_balances = HashMap::new();
510 snapshot_balances.insert(wallet_a, dec!(100));
511 snapshot_balances.insert(wallet_b, dec!(50));
512 let mut follower = BalanceLedger::from_map_with_sequence(snapshot_balances, 10);
513
514 let updates = [
515 sequenced_balance_update(11, wallet_a, dec!(25), dec!(125)),
516 sequenced_balance_update(12, wallet_b, dec!(-50), dec!(0)),
517 sequenced_balance_update(13, wallet_a, dec!(-5), dec!(120)),
518 ];
519 for update in &updates {
520 assert!(follower.apply_balance_update(update).unwrap());
521 }
522
523 let mut expected = BalanceLedger::new();
524 expected.set(wallet_a, dec!(120));
525 expected.last_balance_update_seq = 13;
526 expected.last_applied_balance_update = Some(updates[2].clone());
527 assert_eq!(follower, expected);
528 assert!(!follower.snapshot_map().contains_key(&wallet_b));
529 }
530
531 #[test]
532 fn follower_replay_duplicate_after_snapshot_is_noop() {
533 let wallet = test_wallet(13);
534 let mut snapshot_balances = HashMap::new();
535 snapshot_balances.insert(wallet, dec!(100));
536 let mut follower = BalanceLedger::from_map_with_sequence(snapshot_balances, 2);
537 let update = sequenced_balance_update(3, wallet, dec!(10), dec!(110));
538
539 assert!(follower.apply_balance_update(&update).unwrap());
540 assert!(!follower.apply_balance_update(&update).unwrap());
541 assert_eq!(follower.balance(&wallet), dec!(110));
542 assert_eq!(follower.last_balance_update_seq(), 3);
543 }
544
545 #[test]
546 fn follower_replay_duplicate_snapshot_boundary_without_payload_is_noop() {
547 let wallet = test_wallet(15);
548 let mut snapshot_balances = HashMap::new();
549 snapshot_balances.insert(wallet, dec!(100));
550 let mut follower = BalanceLedger::from_map_with_sequence(snapshot_balances, 3);
551 let update = sequenced_balance_update(3, wallet, dec!(10), dec!(110));
552
553 assert!(!follower.apply_balance_update(&update).unwrap());
554 assert_eq!(follower.balance(&wallet), dec!(100));
555 assert_eq!(follower.last_balance_update_seq(), 3);
556 }
557
558 #[test]
559 fn follower_replay_gap_after_snapshot_fails_closed() {
560 let wallet = test_wallet(14);
561 let mut snapshot_balances = HashMap::new();
562 snapshot_balances.insert(wallet, dec!(100));
563 let mut follower = BalanceLedger::from_map_with_sequence(snapshot_balances, 7);
564 let update = sequenced_balance_update(9, wallet, dec!(10), dec!(110));
565
566 let error = follower.apply_balance_update(&update).unwrap_err();
567 assert!(
568 error.to_string().contains("seq gap"),
569 "unexpected error: {error}"
570 );
571 assert_eq!(follower.balance(&wallet), dec!(100));
572 assert_eq!(follower.last_balance_update_seq(), 7);
573 }
574
575 #[tokio::test]
576 async fn test_in_memory_ledger_apply_pnl_positive() {
577 let ledger = InMemoryLedger::new();
578 ledger
579 .set_balance(&test_wallet(1), dec!(10000))
580 .await
581 .unwrap();
582 ledger.apply_pnl(&test_wallet(1), dec!(500)).await.unwrap();
583 let balance = ledger.get_balance(&test_wallet(1)).await.unwrap();
584 assert_eq!(balance, dec!(10500));
585 }
586
587 #[tokio::test]
588 async fn test_in_memory_ledger_apply_pnl_negative() {
589 let ledger = InMemoryLedger::new();
590 ledger
591 .set_balance(&test_wallet(1), dec!(10000))
592 .await
593 .unwrap();
594 ledger.apply_pnl(&test_wallet(1), dec!(-300)).await.unwrap();
595 let balance = ledger.get_balance(&test_wallet(1)).await.unwrap();
596 assert_eq!(balance, dec!(9700));
597 }
598
599 #[tokio::test]
600 async fn test_in_memory_ledger_apply_pnl_creates_account() {
601 let ledger = InMemoryLedger::new();
602 ledger.apply_pnl(&test_wallet(1), dec!(500)).await.unwrap();
604 let balance = ledger.get_balance(&test_wallet(1)).await.unwrap();
605 assert_eq!(balance, dec!(500));
606 }
607}