Skip to main content

hypercall/rsm/
ledger.rs

1//! Balance ledger types.
2//!
3//! `BalanceLedger` is the engine-owned runtime source of truth for account
4//! balances. Runtime cash movement must be reduced through `BalanceUpdate`
5//! effects so replay, snapshots, and followers share one ordered contract.
6
7use async_trait::async_trait;
8use hypercall_types::{BalanceUpdate, WalletAddress};
9use rust_decimal::Decimal;
10use serde::{Deserialize, Serialize};
11use std::collections::HashMap;
12use std::fmt;
13use std::ops::{Deref, DerefMut};
14use std::sync::Arc;
15
16/// Error type for ledger operations.
17#[derive(Debug, Clone)]
18pub enum LedgerError {
19    /// Database error
20    Db(String),
21    /// Invalid operation
22    InvalidOperation(String),
23}
24
25impl fmt::Display for LedgerError {
26    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
27        match self {
28            LedgerError::Db(msg) => write!(f, "Ledger DB error: {}", msg),
29            LedgerError::InvalidOperation(msg) => write!(f, "Invalid ledger operation: {}", msg),
30        }
31    }
32}
33
34impl std::error::Error for LedgerError {}
35
36/// Durable account balance persistence interface.
37///
38/// The engine-owned `BalanceLedger` is authoritative for runtime reads. This
39/// trait exists for DB persistence adapters and tests that exercise durable
40/// account balance writes:
41/// - Position closes (realized PnL)
42/// - Explicit deposits/withdrawals
43/// - Premium settlement for Standard margin mode
44///
45/// Opening positions does NOT touch the ledger in Portfolio mode.
46#[async_trait]
47pub trait Ledger: Send + Sync {
48    /// Get the current collateral balance for an account.
49    ///
50    /// Returns 0 if the account doesn't exist (lazy account creation).
51    async fn get_balance(&self, wallet: &WalletAddress) -> Result<Decimal, LedgerError>;
52
53    /// Apply realized PnL to an account's collateral balance.
54    ///
55    /// This is called when positions are closed:
56    /// - Positive realized_pnl: balance increases (profitable close)
57    /// - Negative realized_pnl: balance decreases (losing close)
58    ///
59    /// Creates the account if it doesn't exist.
60    async fn apply_pnl(
61        &self,
62        wallet: &WalletAddress,
63        realized_pnl: Decimal,
64    ) -> Result<(), LedgerError>;
65
66    /// Set the balance directly (for deposits, withdrawals, and tests).
67    ///
68    /// Creates the account if it doesn't exist.
69    async fn set_balance(
70        &self,
71        wallet: &WalletAddress,
72        balance: Decimal,
73    ) -> Result<(), LedgerError>;
74
75    /// Apply option premium settlement to an account's balance.
76    ///
77    /// This is called for Standard margin mode accounts on option fills:
78    /// - Negative premium_delta: balance decreases (buying options)
79    /// - Positive premium_delta: balance increases (selling options)
80    ///
81    /// Creates the account if it doesn't exist.
82    ///
83    /// Note: For Portfolio margin mode, this is NOT called - premium is
84    /// financed via margin instead of directly debiting the balance.
85    async fn apply_premium(
86        &self,
87        wallet: &WalletAddress,
88        premium_delta: Decimal,
89    ) -> Result<(), LedgerError> {
90        // Default implementation delegates to apply_pnl since they have the same semantics
91        self.apply_pnl(wallet, premium_delta).await
92    }
93}
94
95/// Read-only source for runtime account balances.
96#[async_trait]
97pub trait BalanceProvider: Send + Sync {
98    async fn get_balance(&self, wallet: &WalletAddress) -> Result<Decimal, LedgerError>;
99}
100
101pub struct LedgerBalanceProvider {
102    ledger: Arc<dyn Ledger + Send + Sync>,
103}
104
105impl LedgerBalanceProvider {
106    pub fn new(ledger: Arc<dyn Ledger + Send + Sync>) -> Self {
107        Self { ledger }
108    }
109}
110
111#[async_trait]
112impl BalanceProvider for LedgerBalanceProvider {
113    async fn get_balance(&self, wallet: &WalletAddress) -> Result<Decimal, LedgerError> {
114        self.ledger.get_balance(wallet).await
115    }
116}
117
118/// Engine-owned runtime balance ledger.
119#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
120pub struct BalanceLedger {
121    balances: HashMap<WalletAddress, Decimal>,
122    last_balance_update_seq: u64,
123    last_applied_balance_update: Option<BalanceUpdate>,
124}
125
126impl BalanceLedger {
127    pub fn new() -> Self {
128        Self::default()
129    }
130
131    pub fn from_map(balances: HashMap<WalletAddress, Decimal>) -> Self {
132        Self::from_map_with_sequence(balances, 0)
133    }
134
135    pub fn from_map_with_sequence(
136        balances: HashMap<WalletAddress, Decimal>,
137        last_balance_update_seq: u64,
138    ) -> Self {
139        let mut ledger = Self::new();
140        for (wallet, balance) in balances {
141            ledger.set(wallet, balance);
142        }
143        ledger.last_balance_update_seq = last_balance_update_seq;
144        ledger
145    }
146
147    pub fn balance(&self, wallet: &WalletAddress) -> Decimal {
148        self.balances.get(wallet).copied().unwrap_or(Decimal::ZERO)
149    }
150
151    pub fn balances(&self) -> &HashMap<WalletAddress, Decimal> {
152        &self.balances
153    }
154
155    pub fn last_balance_update_seq(&self) -> u64 {
156        self.last_balance_update_seq
157    }
158
159    pub fn next_balance_update_seq(&self) -> u64 {
160        self.last_balance_update_seq
161            .checked_add(1)
162            .expect("STATE_CORRUPTION: balance_update_seq overflow")
163    }
164
165    pub fn apply_balance_update(&mut self, update: &BalanceUpdate) -> Result<bool, LedgerError> {
166        if update.balance_update_seq == self.last_balance_update_seq {
167            match self.last_applied_balance_update.as_ref() {
168                Some(last_update) if last_update == update => return Ok(false),
169                Some(_) => {
170                    return Err(LedgerError::InvalidOperation(format!(
171                        "balance update seq {} duplicate payload mismatch",
172                        update.balance_update_seq
173                    )));
174                }
175                None => return Ok(false),
176            }
177        }
178
179        if update.balance_update_seq < self.last_balance_update_seq {
180            return Err(LedgerError::InvalidOperation(format!(
181                "balance update seq {} is older than last applied seq {}",
182                update.balance_update_seq, self.last_balance_update_seq
183            )));
184        }
185
186        let expected_seq = self.next_balance_update_seq();
187        if update.balance_update_seq != expected_seq {
188            return Err(LedgerError::InvalidOperation(format!(
189                "balance update seq gap: expected {}, got {}",
190                expected_seq, update.balance_update_seq
191            )));
192        }
193
194        let current = self.balance(&update.wallet);
195        let expected_balance_after = current + update.delta;
196        if expected_balance_after != update.balance_after {
197            return Err(LedgerError::InvalidOperation(format!(
198                "balance update seq {} for {} has invalid balance_after: current {} + delta {} = {}, got {}",
199                update.balance_update_seq,
200                update.wallet,
201                current,
202                update.delta,
203                expected_balance_after,
204                update.balance_after
205            )));
206        }
207
208        self.set(update.wallet, update.balance_after);
209        self.last_balance_update_seq = update.balance_update_seq;
210        self.last_applied_balance_update = Some(update.clone());
211        Ok(true)
212    }
213
214    pub fn set(&mut self, wallet: WalletAddress, balance: Decimal) {
215        if balance == Decimal::ZERO {
216            self.balances.remove(&wallet);
217        } else {
218            self.balances.insert(wallet, balance);
219        }
220    }
221
222    pub fn insert(&mut self, wallet: WalletAddress, balance: Decimal) -> Option<Decimal> {
223        let previous = self.balances.get(&wallet).copied();
224        self.set(wallet, balance);
225        previous
226    }
227
228    pub fn add_delta(&mut self, wallet: WalletAddress, delta: Decimal) {
229        if delta == Decimal::ZERO {
230            return;
231        }
232
233        let next = self.balance(&wallet) + delta;
234        self.set(wallet, next);
235    }
236
237    pub fn remove_zero(&mut self, wallet: &WalletAddress) {
238        if self.balance(wallet) == Decimal::ZERO {
239            self.balances.remove(wallet);
240        }
241    }
242
243    pub fn snapshot_map(&self) -> HashMap<WalletAddress, Decimal> {
244        self.balances.clone()
245    }
246
247    pub fn sorted_nonzero_entries(&self) -> Vec<(WalletAddress, Decimal)> {
248        let mut entries: Vec<_> = self
249            .balances
250            .iter()
251            .filter_map(|(wallet, balance)| {
252                if *balance == Decimal::ZERO {
253                    None
254                } else {
255                    Some((*wallet, *balance))
256                }
257            })
258            .collect();
259        entries.sort_by_key(|(wallet, _)| *wallet);
260        entries
261    }
262
263    pub fn nonzero_wallets(&self) -> Vec<WalletAddress> {
264        self.sorted_nonzero_entries()
265            .into_iter()
266            .map(|(wallet, _)| wallet)
267            .collect()
268    }
269}
270
271impl Deref for BalanceLedger {
272    type Target = HashMap<WalletAddress, Decimal>;
273
274    fn deref(&self) -> &Self::Target {
275        &self.balances
276    }
277}
278
279impl DerefMut for BalanceLedger {
280    fn deref_mut(&mut self) -> &mut Self::Target {
281        &mut self.balances
282    }
283}
284
285/// In-memory test double for code that still exercises the async durable ledger
286/// trait.
287#[derive(Default)]
288pub struct InMemoryLedger {
289    balances: std::sync::Arc<tokio::sync::RwLock<BalanceLedger>>,
290}
291
292impl InMemoryLedger {
293    pub fn new() -> Self {
294        Self::default()
295    }
296
297    pub async fn nonzero_wallets(&self) -> Vec<WalletAddress> {
298        let balances = self.balances.read().await;
299        balances.nonzero_wallets()
300    }
301}
302
303#[async_trait]
304impl Ledger for InMemoryLedger {
305    async fn get_balance(&self, wallet: &WalletAddress) -> Result<Decimal, LedgerError> {
306        let balances = self.balances.read().await;
307        Ok(balances.balance(wallet))
308    }
309
310    async fn apply_pnl(
311        &self,
312        wallet: &WalletAddress,
313        realized_pnl: Decimal,
314    ) -> Result<(), LedgerError> {
315        let mut balances = self.balances.write().await;
316        balances.add_delta(*wallet, realized_pnl);
317        Ok(())
318    }
319
320    async fn set_balance(
321        &self,
322        wallet: &WalletAddress,
323        balance: Decimal,
324    ) -> Result<(), LedgerError> {
325        let mut balances = self.balances.write().await;
326        balances.set(*wallet, balance);
327        Ok(())
328    }
329}
330
331/// Mock balance provider for tests -- always returns zero balance.
332#[cfg(any(test, feature = "test-utils"))]
333pub struct MockBalanceProvider;
334
335#[cfg(any(test, feature = "test-utils"))]
336#[async_trait]
337impl BalanceProvider for MockBalanceProvider {
338    async fn get_balance(&self, _wallet: &WalletAddress) -> Result<Decimal, LedgerError> {
339        Ok(Decimal::ZERO)
340    }
341}
342
343#[cfg(test)]
344mod tests {
345    use super::*;
346    use hypercall_types::{wallet_address::test_wallet, BalanceUpdateReason};
347    use rust_decimal_macros::dec;
348
349    fn balance_update(
350        ledger: &BalanceLedger,
351        wallet: WalletAddress,
352        delta: Decimal,
353    ) -> BalanceUpdate {
354        BalanceUpdate {
355            balance_update_seq: ledger.next_balance_update_seq(),
356            wallet,
357            delta,
358            balance_after: ledger.balance(&wallet) + delta,
359            reason: BalanceUpdateReason::Deposit,
360            reference_id: Some("test-reference".to_string()),
361            source_command_id: Some(1),
362            timestamp_ms: 1234,
363        }
364    }
365
366    fn sequenced_balance_update(
367        seq: u64,
368        wallet: WalletAddress,
369        delta: Decimal,
370        balance_after: Decimal,
371    ) -> BalanceUpdate {
372        BalanceUpdate {
373            balance_update_seq: seq,
374            wallet,
375            delta,
376            balance_after,
377            reason: BalanceUpdateReason::Deposit,
378            reference_id: Some(format!("test-reference-{seq}")),
379            source_command_id: Some(seq as i64),
380            timestamp_ms: 1234 + seq,
381        }
382    }
383
384    #[tokio::test]
385    async fn test_in_memory_ledger_get_balance_nonexistent() {
386        let ledger = InMemoryLedger::new();
387        let balance = ledger.get_balance(&test_wallet(1)).await.unwrap();
388        assert_eq!(balance, dec!(0));
389    }
390
391    #[tokio::test]
392    async fn test_in_memory_ledger_set_and_get_balance() {
393        let ledger = InMemoryLedger::new();
394        ledger
395            .set_balance(&test_wallet(1), dec!(10000))
396            .await
397            .unwrap();
398        let balance = ledger.get_balance(&test_wallet(1)).await.unwrap();
399        assert_eq!(balance, dec!(10000));
400    }
401
402    #[tokio::test]
403    async fn test_in_memory_ledger_nonzero_wallets() {
404        let ledger = InMemoryLedger::new();
405        ledger.set_balance(&test_wallet(1), dec!(25)).await.unwrap();
406        ledger
407            .set_balance(&test_wallet(2), rust_decimal::Decimal::ZERO)
408            .await
409            .unwrap();
410        ledger.set_balance(&test_wallet(3), dec!(-5)).await.unwrap();
411
412        let mut wallets = ledger.nonzero_wallets().await;
413        wallets.sort_unstable();
414
415        assert_eq!(wallets, vec![test_wallet(1), test_wallet(3)]);
416    }
417
418    #[test]
419    fn apply_balance_update_validates_and_applies_delta() {
420        let wallet = test_wallet(5);
421        let mut ledger = BalanceLedger::new();
422        ledger.set(wallet, dec!(100));
423
424        let update = balance_update(&ledger, wallet, dec!(25));
425        assert!(ledger.apply_balance_update(&update).unwrap());
426
427        assert_eq!(ledger.balance(&wallet), dec!(125));
428        assert_eq!(ledger.last_balance_update_seq(), update.balance_update_seq);
429    }
430
431    #[test]
432    fn apply_balance_update_removes_zero_balance() {
433        let wallet = test_wallet(6);
434        let mut ledger = BalanceLedger::new();
435        ledger.set(wallet, dec!(100));
436
437        let update = balance_update(&ledger, wallet, dec!(-100));
438        ledger.apply_balance_update(&update).unwrap();
439
440        assert_eq!(ledger.balance(&wallet), Decimal::ZERO);
441        assert!(!ledger.snapshot_map().contains_key(&wallet));
442    }
443
444    #[test]
445    fn apply_balance_update_rejects_invalid_balance_after() {
446        let wallet = test_wallet(7);
447        let mut ledger = BalanceLedger::new();
448        ledger.set(wallet, dec!(100));
449
450        let mut update = balance_update(&ledger, wallet, dec!(25));
451        update.balance_after = dec!(126);
452
453        let error = ledger.apply_balance_update(&update).unwrap_err();
454        assert!(
455            error.to_string().contains("has invalid balance_after"),
456            "unexpected error: {error}"
457        );
458        assert_eq!(ledger.balance(&wallet), dec!(100));
459    }
460
461    #[test]
462    fn apply_balance_update_rejects_sequence_gap() {
463        let wallet = test_wallet(8);
464        let mut ledger = BalanceLedger::new();
465
466        let mut update = balance_update(&ledger, wallet, dec!(10));
467        update.balance_update_seq += 1;
468
469        let error = ledger.apply_balance_update(&update).unwrap_err();
470        assert!(
471            error.to_string().contains("balance update seq gap"),
472            "unexpected error: {error}"
473        );
474    }
475
476    #[test]
477    fn apply_balance_update_accepts_exact_duplicate() {
478        let wallet = test_wallet(9);
479        let mut ledger = BalanceLedger::new();
480
481        let update = balance_update(&ledger, wallet, dec!(10));
482        assert!(ledger.apply_balance_update(&update).unwrap());
483        assert!(!ledger.apply_balance_update(&update).unwrap());
484        assert_eq!(ledger.balance(&wallet), dec!(10));
485    }
486
487    #[test]
488    fn apply_balance_update_rejects_duplicate_payload_mismatch() {
489        let wallet = test_wallet(10);
490        let mut ledger = BalanceLedger::new();
491
492        let update = balance_update(&ledger, wallet, dec!(10));
493        ledger.apply_balance_update(&update).unwrap();
494
495        let mut duplicate = update.clone();
496        duplicate.reference_id = Some("different".to_string());
497
498        let error = ledger.apply_balance_update(&duplicate).unwrap_err();
499        assert!(
500            error.to_string().contains("duplicate payload mismatch"),
501            "unexpected error: {error}"
502        );
503    }
504
505    #[test]
506    fn follower_replay_from_snapshot_converges_to_engine_ledger() {
507        let wallet_a = test_wallet(11);
508        let wallet_b = test_wallet(12);
509        let mut snapshot_balances = HashMap::new();
510        snapshot_balances.insert(wallet_a, dec!(100));
511        snapshot_balances.insert(wallet_b, dec!(50));
512        let mut follower = BalanceLedger::from_map_with_sequence(snapshot_balances, 10);
513
514        let updates = [
515            sequenced_balance_update(11, wallet_a, dec!(25), dec!(125)),
516            sequenced_balance_update(12, wallet_b, dec!(-50), dec!(0)),
517            sequenced_balance_update(13, wallet_a, dec!(-5), dec!(120)),
518        ];
519        for update in &updates {
520            assert!(follower.apply_balance_update(update).unwrap());
521        }
522
523        let mut expected = BalanceLedger::new();
524        expected.set(wallet_a, dec!(120));
525        expected.last_balance_update_seq = 13;
526        expected.last_applied_balance_update = Some(updates[2].clone());
527        assert_eq!(follower, expected);
528        assert!(!follower.snapshot_map().contains_key(&wallet_b));
529    }
530
531    #[test]
532    fn follower_replay_duplicate_after_snapshot_is_noop() {
533        let wallet = test_wallet(13);
534        let mut snapshot_balances = HashMap::new();
535        snapshot_balances.insert(wallet, dec!(100));
536        let mut follower = BalanceLedger::from_map_with_sequence(snapshot_balances, 2);
537        let update = sequenced_balance_update(3, wallet, dec!(10), dec!(110));
538
539        assert!(follower.apply_balance_update(&update).unwrap());
540        assert!(!follower.apply_balance_update(&update).unwrap());
541        assert_eq!(follower.balance(&wallet), dec!(110));
542        assert_eq!(follower.last_balance_update_seq(), 3);
543    }
544
545    #[test]
546    fn follower_replay_duplicate_snapshot_boundary_without_payload_is_noop() {
547        let wallet = test_wallet(15);
548        let mut snapshot_balances = HashMap::new();
549        snapshot_balances.insert(wallet, dec!(100));
550        let mut follower = BalanceLedger::from_map_with_sequence(snapshot_balances, 3);
551        let update = sequenced_balance_update(3, wallet, dec!(10), dec!(110));
552
553        assert!(!follower.apply_balance_update(&update).unwrap());
554        assert_eq!(follower.balance(&wallet), dec!(100));
555        assert_eq!(follower.last_balance_update_seq(), 3);
556    }
557
558    #[test]
559    fn follower_replay_gap_after_snapshot_fails_closed() {
560        let wallet = test_wallet(14);
561        let mut snapshot_balances = HashMap::new();
562        snapshot_balances.insert(wallet, dec!(100));
563        let mut follower = BalanceLedger::from_map_with_sequence(snapshot_balances, 7);
564        let update = sequenced_balance_update(9, wallet, dec!(10), dec!(110));
565
566        let error = follower.apply_balance_update(&update).unwrap_err();
567        assert!(
568            error.to_string().contains("seq gap"),
569            "unexpected error: {error}"
570        );
571        assert_eq!(follower.balance(&wallet), dec!(100));
572        assert_eq!(follower.last_balance_update_seq(), 7);
573    }
574
575    #[tokio::test]
576    async fn test_in_memory_ledger_apply_pnl_positive() {
577        let ledger = InMemoryLedger::new();
578        ledger
579            .set_balance(&test_wallet(1), dec!(10000))
580            .await
581            .unwrap();
582        ledger.apply_pnl(&test_wallet(1), dec!(500)).await.unwrap();
583        let balance = ledger.get_balance(&test_wallet(1)).await.unwrap();
584        assert_eq!(balance, dec!(10500));
585    }
586
587    #[tokio::test]
588    async fn test_in_memory_ledger_apply_pnl_negative() {
589        let ledger = InMemoryLedger::new();
590        ledger
591            .set_balance(&test_wallet(1), dec!(10000))
592            .await
593            .unwrap();
594        ledger.apply_pnl(&test_wallet(1), dec!(-300)).await.unwrap();
595        let balance = ledger.get_balance(&test_wallet(1)).await.unwrap();
596        assert_eq!(balance, dec!(9700));
597    }
598
599    #[tokio::test]
600    async fn test_in_memory_ledger_apply_pnl_creates_account() {
601        let ledger = InMemoryLedger::new();
602        // Apply PnL to non-existent account
603        ledger.apply_pnl(&test_wallet(1), dec!(500)).await.unwrap();
604        let balance = ledger.get_balance(&test_wallet(1)).await.unwrap();
605        assert_eq!(balance, dec!(500));
606    }
607}