1use crate::portfolio::HypercorePositionUpdate;
2use crate::portfolio::{
3 canonical_perp_symbol, PortfolioBalance, PortfolioChange, PortfolioError, PortfolioService,
4 PositionChange, PositionData,
5};
6use crate::read_cache::tier::TierCache;
7use crate::rsm::ledger::Ledger;
8use crate::rsm::MarginMode;
9use crate::shared::order_types::ParsedSymbol;
10use crate::snapshot::error::SnapshotError;
11use crate::snapshot::traits::Snapshotable;
12use anyhow::Result;
13use async_trait::async_trait;
14use hypercall_engine::{
15 calculate_fill_accounting as calculate_engine_fill_accounting, fill_premium_delta,
16 EnginePosition, FillAccountingContext, FillAccountingPosition, FillCashSettlement,
17};
18use hypercall_types::api_models::{Portfolio, Position, PositionWithMetrics};
19use hypercall_types::EngineMessage;
20use hypercall_types::FillAccounting;
21use hypercall_types::Side;
22use hypercall_types::{to_human_readable_decimal, WalletAddress};
23use rust_decimal::Decimal;
24use rust_decimal_macros::dec;
25use std::collections::HashMap;
26use std::str::FromStr;
27use std::sync::Arc;
28use tokio::sync::RwLock;
29use tracing::{debug, error, info, warn};
30
/// In-memory implementation of the portfolio service.
///
/// All state lives behind async read/write locks so the service can be shared
/// between tasks through `&self`.
pub struct PortfolioServiceImpl {
    /// Per-wallet position book together with the wallet's total margin used.
    portfolios: Arc<RwLock<HashMap<WalletAddress, PortfolioBalance>>>,
    /// Optional ledger handle; `None` until `set_ledger` is called.
    ledger: Arc<RwLock<Option<Arc<dyn Ledger + Send + Sync>>>>,
    /// Optional tier cache used to look up a wallet's margin mode; `None` until
    /// `set_tier_cache` is called.
    tier_cache: Arc<RwLock<Option<Arc<TierCache>>>>,
    /// Timestamp of the last accepted hypercore update per (wallet, canonical perp
    /// symbol); used to drop updates older than the one already applied.
    perp_position_timestamps: Arc<RwLock<HashMap<(WalletAddress, String), u64>>>,
}
52
53fn calculate_position_metrics(position: Position) -> PositionWithMetrics {
54 let notional_value = position.amount * position.entry_price;
55 let margin_ratio = if notional_value.abs() > dec!(0) {
56 position.margin_posted / notional_value.abs()
57 } else {
58 dec!(0)
59 };
60
61 PositionWithMetrics {
62 position,
63 notional_value,
64 maintenance_margin: dec!(0),
65 liquidation_price: dec!(0),
66 margin_ratio,
67 }
68}
69
70impl PortfolioServiceImpl {
71 pub fn new() -> Self {
73 Self {
74 portfolios: Arc::new(RwLock::new(HashMap::new())),
75 ledger: Arc::new(RwLock::new(None)),
76 tier_cache: Arc::new(RwLock::new(None)),
77 perp_position_timestamps: Arc::new(RwLock::new(HashMap::new())),
78 }
79 }
80
81 pub async fn set_ledger(&self, ledger: Arc<dyn Ledger + Send + Sync>) {
83 let mut l = self.ledger.write().await;
84 *l = Some(ledger);
85 }
86
87 pub async fn set_tier_cache(&self, tier_cache: Arc<TierCache>) {
92 let mut tc = self.tier_cache.write().await;
93 *tc = Some(tier_cache);
94 }
95
96 async fn apply_option_premium_if_standard(
104 &self,
105 wallet: &WalletAddress,
106 symbol: &str,
107 side: &Side,
108 price: Decimal,
109 size: Decimal,
110 ) -> Result<(), PortfolioError> {
111 if ParsedSymbol::from_symbol(symbol).is_err() {
112 return Ok(());
113 }
114
115 let ledger = self.ledger.read().await.clone();
116 let Some(ledger) = ledger else {
117 debug!(
118 "No ledger configured, skipping premium settlement lookup for {}",
119 wallet
120 );
121 return Ok(());
122 };
123
124 if !self
125 .wallet_uses_standard_option_premium(wallet, symbol)
126 .await?
127 {
128 debug!(
129 "Skipping premium settlement for {} - Portfolio mode",
130 wallet
131 );
132 return Ok(());
133 }
134
135 let premium_delta = fill_premium_delta(*side, price, size);
136
137 ledger
138 .apply_premium(wallet, premium_delta)
139 .await
140 .map_err(|e| {
141 error!(
142 "CRITICAL: Failed to apply option premium {} to ledger for wallet {}: {:?}",
143 premium_delta, wallet, e
144 );
145 PortfolioError::LedgerError(format!(
146 "Failed to apply option premium {} for {}: {:?}",
147 premium_delta, wallet, e
148 ))
149 })?;
150 info!(
151 "Applied option premium {} to ledger for wallet {} (symbol: {}, side: {:?}, price: {}, size: {})",
152 premium_delta, wallet, symbol, side, price, size
153 );
154
155 Ok(())
156 }
157
158 pub async fn reset_from_snapshots(
162 &self,
163 snapshots: HashMap<WalletAddress, PortfolioBalance>,
164 ) -> Result<()> {
165 let mut portfolios = self.portfolios.write().await;
166 *portfolios = snapshots;
167 Ok(())
168 }
169
170 pub async fn all_portfolios(&self) -> HashMap<WalletAddress, PortfolioBalance> {
174 let portfolios = self.portfolios.read().await;
175 portfolios.clone()
176 }
177
178 pub async fn get_portfolio_balance(&self, account: &WalletAddress) -> Option<PortfolioBalance> {
183 let portfolios = self.portfolios.read().await;
184 portfolios.get(account).cloned()
185 }
186
187 pub async fn update_market_prices(&self, prices: HashMap<String, Decimal>) {
191 let mut portfolios = self.portfolios.write().await;
192
193 for (wallet, portfolio) in portfolios.iter_mut() {
194 for (symbol, position) in portfolio.positions.iter_mut() {
195 if let Some(&market_price) = prices.get(symbol) {
196 let new_unrealized_pnl =
197 (market_price - position.entry_price) * position.amount;
198 debug!(
199 wallet = %wallet,
200 symbol,
201 amount = %position.amount,
202 entry_price = %position.entry_price,
203 market_price = %market_price,
204 previous_unrealized_pnl = %position.unrealized_pnl,
205 new_unrealized_pnl = %new_unrealized_pnl,
206 "Applied repriced market price to portfolio position"
207 );
208 position.unrealized_pnl = new_unrealized_pnl;
209 }
210 }
211 }
212 }
213
214 #[allow(dead_code)] async fn handle_fill(
226 &self,
227 wallet: &WalletAddress,
228 symbol: &str,
229 side: Side,
230 price: Decimal,
231 quantity: Decimal,
232 ) -> Decimal {
233 static TEST_TRADE_ID: std::sync::atomic::AtomicU64 =
234 std::sync::atomic::AtomicU64::new(1_000_000_000);
235 let trade_id = TEST_TRADE_ID.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
236
237 let mut portfolios = self.portfolios.write().await;
238 Self::apply_fill_locked(
239 &mut portfolios,
240 wallet,
241 trade_id,
242 symbol,
243 side,
244 price,
245 quantity,
246 )
247 }
248
249 fn update_position_for_fill(
272 portfolio: &mut PortfolioBalance,
273 symbol: &str,
274 side: &Side,
275 price: Decimal,
276 quantity: Decimal,
277 ) -> Decimal {
278 let position = portfolio
279 .positions
280 .entry(symbol.to_string())
281 .or_insert_with(|| PositionData {
282 symbol: symbol.to_string(),
283 amount: dec!(0),
284 entry_price: dec!(0),
285 margin_posted: dec!(0),
286 realized_pnl: dec!(0),
287 unrealized_pnl: dec!(0),
288 });
289
290 let signed_quantity = match side {
291 Side::Buy => quantity,
292 Side::Sell => -quantity,
293 };
294 let engine_position = EnginePosition {
295 quantity: position.amount,
296 entry_price: position.entry_price,
297 };
298 let transition =
299 EnginePosition::fill_transition(Some(&engine_position), signed_quantity, price);
300
301 if transition.realized_pnl != dec!(0) {
303 position.realized_pnl += transition.realized_pnl;
304 }
305 position.amount = transition.quantity;
306 position.entry_price = transition.entry_price;
307
308 if position.amount == dec!(0) {
311 position.unrealized_pnl = dec!(0);
312 }
313
314 if position.amount.abs() < dec!(0.00000001) {
316 portfolio.positions.remove(symbol);
317 }
318
319 transition.realized_pnl
320 }
321
322 fn apply_fill_locked(
329 portfolios: &mut HashMap<WalletAddress, PortfolioBalance>,
330 wallet: &WalletAddress,
331 _trade_id: u64,
332 symbol: &str,
333 side: Side,
334 price: Decimal,
335 quantity: Decimal,
336 ) -> Decimal {
337 let portfolio = portfolios
338 .entry(*wallet)
339 .or_insert_with(|| PortfolioBalance {
340 positions: HashMap::new(),
341 total_margin_used: dec!(0),
342 });
343
344 Self::update_position_for_fill(portfolio, symbol, &side, price, quantity)
349 }
350
351 async fn apply_realized_pnl_to_ledger(
360 &self,
361 wallet: &WalletAddress,
362 realized_pnl: Decimal,
363 ) -> Result<(), PortfolioError> {
364 if realized_pnl == dec!(0) {
365 return Ok(());
366 }
367
368 let ledger = self.ledger.read().await;
369 if let Some(ref ledger) = *ledger {
370 ledger.apply_pnl(wallet, realized_pnl).await.map_err(|e| {
371 error!(
372 "CRITICAL: Failed to apply realized PnL {} to ledger for wallet {}: {:?}",
373 realized_pnl, wallet, e
374 );
375 PortfolioError::LedgerError(format!(
376 "Failed to apply realized PnL {} for {}: {:?}",
377 realized_pnl, wallet, e
378 ))
379 })?;
380
381 debug!(
382 "Applied realized PnL {} to ledger for wallet {}",
383 realized_pnl, wallet
384 );
385 }
386 Ok(())
387 }
388
389 pub async fn calculate_fill_accounting(
404 &self,
405 fill: &hypercall_types::Fill,
406 ) -> Result<FillAccounting, PortfolioError> {
407 let taker_uses_premium = self
408 .wallet_uses_standard_option_premium(&fill.taker_wallet_address, &fill.symbol)
409 .await?;
410 let maker_uses_premium = self
411 .wallet_uses_standard_option_premium(&fill.maker_wallet_address, &fill.symbol)
412 .await?;
413 let portfolios = self.portfolios.read().await;
414 Ok(Self::calculate_fill_accounting_against_portfolios(
415 &portfolios,
416 fill,
417 taker_uses_premium,
418 maker_uses_premium,
419 ))
420 }
421
422 fn calculate_fill_accounting_against_portfolios(
423 portfolios: &HashMap<WalletAddress, PortfolioBalance>,
424 fill: &hypercall_types::Fill,
425 taker_uses_premium: bool,
426 maker_uses_premium: bool,
427 ) -> FillAccounting {
428 calculate_engine_fill_accounting(
429 fill,
430 FillAccountingContext {
431 taker_position: Self::fill_accounting_position_for_wallet(
432 portfolios,
433 &fill.taker_wallet_address,
434 &fill.symbol,
435 ),
436 maker_position: Self::fill_accounting_position_for_wallet(
437 portfolios,
438 &fill.maker_wallet_address,
439 &fill.symbol,
440 ),
441 taker_cash_settlement: Self::cash_settlement_for_fill(taker_uses_premium),
442 maker_cash_settlement: Self::cash_settlement_for_fill(maker_uses_premium),
443 },
444 )
445 }
446
447 fn fill_accounting_position_for_wallet(
448 portfolios: &HashMap<WalletAddress, PortfolioBalance>,
449 wallet: &WalletAddress,
450 symbol: &str,
451 ) -> Option<FillAccountingPosition> {
452 portfolios
453 .get(wallet)
454 .and_then(|portfolio| portfolio.positions.get(symbol))
455 .map(|position| FillAccountingPosition {
456 quantity: position.amount,
457 entry_price: position.entry_price,
458 })
459 }
460
461 fn cash_settlement_for_fill(uses_premium: bool) -> FillCashSettlement {
462 if uses_premium {
463 FillCashSettlement::OptionPremium
464 } else {
465 FillCashSettlement::RealizedPnl
466 }
467 }
468
469 async fn wallet_uses_standard_option_premium(
471 &self,
472 wallet: &WalletAddress,
473 symbol: &str,
474 ) -> Result<bool, PortfolioError> {
475 if ParsedSymbol::from_symbol(symbol).is_err() {
476 return Ok(false);
477 }
478
479 let tc = self.tier_cache.read().await;
480 let margin_mode = match tc.as_ref() {
481 Some(tier_cache) => tier_cache.get_margin_mode(wallet).await.map_err(|error| {
482 PortfolioError::InternalError(format!(
483 "failed to load margin mode for premium settlement wallet {}: {}",
484 wallet, error
485 ))
486 })?,
487 None => {
488 return Err(PortfolioError::InternalError(format!(
489 "TierCache not configured for option premium settlement wallet {}",
490 wallet
491 )));
492 }
493 };
494 Ok(margin_mode == MarginMode::Standard)
495 }
496
497 fn get_position_change_locked(
502 portfolios: &HashMap<WalletAddress, PortfolioBalance>,
503 wallet: &WalletAddress,
504 symbol: &str,
505 ) -> PositionChange {
506 if let Some(portfolio) = portfolios.get(wallet) {
507 if let Some(pos) = portfolio.positions.get(symbol) {
508 return PositionChange {
509 symbol: symbol.to_string(),
510 amount: pos.amount,
511 entry_price: pos.entry_price,
512 margin_posted: pos.margin_posted,
513 realized_pnl: pos.realized_pnl,
514 unrealized_pnl: pos.unrealized_pnl,
515 };
516 }
517 }
518 PositionChange {
520 symbol: symbol.to_string(),
521 amount: dec!(0),
522 entry_price: dec!(0),
523 margin_posted: dec!(0),
524 realized_pnl: dec!(0),
525 unrealized_pnl: dec!(0),
526 }
527 }
528
529 pub async fn apply_fill_to_memory(
534 &self,
535 wallet: &WalletAddress,
536 symbol: &str,
537 side: &Side,
538 price: Decimal,
539 quantity: Decimal,
540 ) {
541 let mut portfolios = self.portfolios.write().await;
542
543 let portfolio = portfolios
544 .entry(*wallet)
545 .or_insert_with(|| PortfolioBalance {
546 positions: HashMap::new(),
547 total_margin_used: dec!(0),
548 });
549
550 Self::update_position_for_fill(portfolio, symbol, side, price, quantity);
552
553 debug!(
554 "Applied fill to memory: wallet={}, symbol={}, side={:?}, price={}, qty={}",
555 wallet, symbol, side, price, quantity
556 );
557 }
558
559 pub async fn apply_fill_to_memory_both_sides(
561 &self,
562 taker_wallet: &WalletAddress,
563 maker_wallet: &WalletAddress,
564 symbol: &str,
565 taker_side: &Side,
566 maker_side: &Side,
567 price: Decimal,
568 quantity: Decimal,
569 ) {
570 let mut portfolios = self.portfolios.write().await;
571 let portfolio = portfolios
572 .entry(*taker_wallet)
573 .or_insert_with(|| PortfolioBalance {
574 positions: HashMap::new(),
575 total_margin_used: dec!(0),
576 });
577 Self::update_position_for_fill(portfolio, symbol, taker_side, price, quantity);
578
579 let portfolio = portfolios
580 .entry(*maker_wallet)
581 .or_insert_with(|| PortfolioBalance {
582 positions: HashMap::new(),
583 total_margin_used: dec!(0),
584 });
585 Self::update_position_for_fill(portfolio, symbol, maker_side, price, quantity);
586 }
587
588 pub async fn apply_option_custody_delta(
589 &self,
590 wallet: &WalletAddress,
591 symbol: &str,
592 quantity_delta: Decimal,
593 ) -> PortfolioChange {
594 let mut portfolios = self.portfolios.write().await;
595 let total_margin_used = {
596 let portfolio = portfolios.entry(*wallet).or_default();
597 let position = portfolio
598 .positions
599 .entry(symbol.to_string())
600 .or_insert_with(|| PositionData {
601 symbol: symbol.to_string(),
602 amount: dec!(0),
603 entry_price: dec!(0),
604 margin_posted: dec!(0),
605 realized_pnl: dec!(0),
606 unrealized_pnl: dec!(0),
607 });
608
609 let old_amount = position.amount;
610 let old_entry = position.entry_price;
611 let new_amount = old_amount + quantity_delta;
612
613 if new_amount == dec!(0) {
614 portfolio.positions.remove(symbol);
615 } else {
616 position.amount = new_amount;
617 position.entry_price = if old_amount > dec!(0) {
618 old_entry
619 } else if new_amount < dec!(0) {
620 old_entry
621 } else {
622 dec!(0)
623 };
624 }
625 portfolio.total_margin_used
626 };
627
628 let position_change = Self::get_position_change_locked(&portfolios, wallet, symbol);
629 PortfolioChange {
630 wallet: *wallet,
631 position_changes: vec![position_change],
632 balance_change: None,
633 total_margin_used,
634 }
635 }
636}
637
638impl Default for PortfolioServiceImpl {
639 fn default() -> Self {
640 Self::new()
641 }
642}
643
644#[async_trait]
645impl PortfolioService for PortfolioServiceImpl {
646 async fn get_portfolio(&self, account: &WalletAddress) -> Portfolio {
647 let portfolios = self.portfolios.read().await;
648
649 if let Some(cached) = portfolios.get(account) {
650 let positions: Vec<PositionWithMetrics> = cached
652 .positions
653 .values()
654 .map(|pos| {
655 let position = Position {
656 wallet_address: *account,
657 symbol: pos.symbol.clone(),
658 amount: pos.amount,
659 entry_price: pos.entry_price,
660 margin_posted: pos.margin_posted,
661 realized_pnl: pos.realized_pnl,
662 unrealized_pnl: pos.unrealized_pnl,
663 updated_at: chrono::Utc::now(),
664 };
665 calculate_position_metrics(position)
666 })
667 .collect();
668
669 Portfolio {
670 wallet_address: *account,
671 positions,
672 total_margin_used: cached.total_margin_used,
673 available_balance: dec!(0), span_margin: None, margin_mode: "standard".to_string(), margin_summary: None, }
678 } else {
679 Portfolio {
681 wallet_address: *account,
682 positions: Vec::new(),
683 total_margin_used: dec!(0),
684 available_balance: dec!(0),
685 span_margin: None,
686 margin_mode: "standard".to_string(), margin_summary: None, }
689 }
690 }
691
692 async fn get_portfolio_balance(&self, account: &WalletAddress) -> Option<PortfolioBalance> {
693 PortfolioServiceImpl::get_portfolio_balance(self, account).await
695 }
696
697 async fn all_portfolios(&self) -> HashMap<WalletAddress, PortfolioBalance> {
698 PortfolioServiceImpl::all_portfolios(self).await
700 }
701
702 async fn apply_event(
710 &self,
711 event: &EngineMessage,
712 ) -> Result<Vec<PortfolioChange>, PortfolioError> {
713 match event {
714 EngineMessage::OrderFilled { fill, .. } => {
715 let size_human = to_human_readable_decimal(&fill.symbol, fill.size);
716
717 let maker_side = match fill.taker_side {
719 Side::Buy => Side::Sell,
720 Side::Sell => Side::Buy,
721 };
722
723 let (
729 taker_realized_pnl,
730 maker_realized_pnl,
731 taker_change,
732 maker_change,
733 taker_margin,
734 maker_margin,
735 ) = {
736 let mut portfolios = self.portfolios.write().await;
737
738 let taker_pnl = Self::apply_fill_locked(
739 &mut portfolios,
740 &fill.taker_wallet_address,
741 fill.trade_id,
742 &fill.symbol,
743 fill.taker_side,
744 fill.price,
745 size_human,
746 );
747
748 let maker_pnl = Self::apply_fill_locked(
749 &mut portfolios,
750 &fill.maker_wallet_address,
751 fill.trade_id,
752 &fill.symbol,
753 maker_side,
754 fill.price,
755 size_human,
756 );
757
758 let taker_pos = Self::get_position_change_locked(
760 &portfolios,
761 &fill.taker_wallet_address,
762 &fill.symbol,
763 );
764 let maker_pos = Self::get_position_change_locked(
765 &portfolios,
766 &fill.maker_wallet_address,
767 &fill.symbol,
768 );
769
770 let taker_margin = portfolios
772 .get(&fill.taker_wallet_address)
773 .map(|b| b.total_margin_used)
774 .unwrap_or(dec!(0));
775 let maker_margin = portfolios
776 .get(&fill.maker_wallet_address)
777 .map(|b| b.total_margin_used)
778 .unwrap_or(dec!(0));
779
780 (
781 taker_pnl,
782 maker_pnl,
783 taker_pos,
784 maker_pos,
785 taker_margin,
786 maker_margin,
787 )
788 };
789 self.apply_realized_pnl_to_ledger(&fill.taker_wallet_address, taker_realized_pnl)
794 .await?;
795
796 self.apply_option_premium_if_standard(
799 &fill.taker_wallet_address,
800 &fill.symbol,
801 &fill.taker_side,
802 fill.price,
803 size_human,
804 )
805 .await?;
806
807 self.apply_realized_pnl_to_ledger(&fill.maker_wallet_address, maker_realized_pnl)
810 .await?;
811
812 self.apply_option_premium_if_standard(
815 &fill.maker_wallet_address,
816 &fill.symbol,
817 &maker_side,
818 fill.price,
819 size_human,
820 )
821 .await?;
822
823 let mut changes = Vec::with_capacity(2);
825
826 changes.push(PortfolioChange {
827 wallet: fill.taker_wallet_address,
828 position_changes: vec![taker_change],
829 balance_change: None,
830 total_margin_used: taker_margin,
831 });
832
833 changes.push(PortfolioChange {
834 wallet: fill.maker_wallet_address,
835 position_changes: vec![maker_change],
836 balance_change: None,
837 total_margin_used: maker_margin,
838 });
839
840 return Ok(changes);
841 }
842 EngineMessage::PositionExpired(expiry_msg) => {
843 let wallet = &expiry_msg.wallet_address;
844 let symbol = &expiry_msg.symbol;
845
846 info!(
847 "PortfolioService: Removing expired position for wallet={}, symbol={}",
848 wallet, symbol
849 );
850
851 {
852 let mut portfolios = self.portfolios.write().await;
853
854 if let Some(portfolio) = portfolios.get_mut(wallet) {
855 if portfolio.positions.remove(symbol).is_some() {
856 info!(
857 "PortfolioService: Removed expired position {} for wallet {}",
858 symbol, wallet
859 );
860 } else {
861 warn!(
862 "PortfolioService: Position {} not found for wallet {} during expiry",
863 symbol, wallet
864 );
865 }
866 } else {
867 warn!(
868 "PortfolioService: Portfolio not found for wallet {} during expiry",
869 wallet
870 );
871 }
872 }
873 }
874 _ => {
875 }
877 }
878 Ok(vec![])
879 }
880
881 async fn apply_hypercore_position_update(&self, update: &HypercorePositionUpdate) {
882 self.set_hypercore_position(update).await;
884 }
885
886 async fn set_hypercore_position(&self, update: &HypercorePositionUpdate) {
887 let account_normalized = update.account.to_lowercase();
888
889 let wallet_address = match WalletAddress::from_str(&account_normalized) {
891 Ok(addr) => addr,
892 Err(e) => {
893 tracing::warn!(
894 "Skipping hypercore position update: invalid account address '{}' for coin {}: {}",
895 update.account,
896 update.coin,
897 e
898 );
899 return;
900 }
901 };
902
903 let position_key = canonical_perp_symbol(&update.coin);
904
905 {
909 let mut ts_map = self.perp_position_timestamps.write().await;
910 let ts_key = (wallet_address, position_key.clone());
911 if let Some(&prev_ts) = ts_map.get(&ts_key) {
912 if update.timestamp < prev_ts {
913 debug!(
914 "Dropping stale perp update for {}/{}: ts {} < prev {}",
915 update.account, position_key, update.timestamp, prev_ts
916 );
917 return;
918 }
919 }
920 ts_map.insert(ts_key, update.timestamp);
921 }
922
923 let mut portfolios = self.portfolios.write().await;
924
925 let portfolio = portfolios
926 .entry(wallet_address)
927 .or_insert_with(|| PortfolioBalance {
928 positions: HashMap::new(),
929 total_margin_used: dec!(0),
930 });
931 let raw_position_key = update.coin.trim().to_ascii_uppercase();
932
933 if raw_position_key != position_key {
934 portfolio.positions.remove(&raw_position_key);
935 }
936 if update.coin != raw_position_key && update.coin != position_key {
937 portfolio.positions.remove(&update.coin);
938 }
939
940 if update.size == 0.0 {
941 portfolio.positions.remove(&position_key);
943 debug!(
944 "Removed hypercore position {} for account {}",
945 position_key, update.account
946 );
947 } else {
948 portfolio.positions.insert(
950 position_key.clone(),
951 PositionData {
952 symbol: position_key.clone(),
953 amount: Decimal::from_f64_retain(update.size).unwrap_or(Decimal::ZERO),
954 entry_price: Decimal::from_f64_retain(update.entry_price)
955 .unwrap_or(Decimal::ZERO),
956 margin_posted: dec!(0), realized_pnl: dec!(0), unrealized_pnl: Decimal::from_f64_retain(update.unrealized_pnl)
959 .unwrap_or(Decimal::ZERO),
960 },
961 );
962 debug!(
963 "Set hypercore position {} for account {}: size={}, entry_price={}, unrealized_pnl={}",
964 position_key, update.account, update.size, update.entry_price, update.unrealized_pnl
965 );
966 }
967 }
968
969 fn as_any(&self) -> &dyn std::any::Any {
970 self
971 }
972
973 async fn calculate_fill_accounting(
974 &self,
975 fill: &hypercall_types::Fill,
976 ) -> Result<FillAccounting, PortfolioError> {
977 PortfolioServiceImpl::calculate_fill_accounting(self, fill).await
979 }
980
981 async fn apply_fill_to_memory(
982 &self,
983 wallet: &WalletAddress,
984 symbol: &str,
985 side: &Side,
986 price: Decimal,
987 quantity: Decimal,
988 ) {
989 PortfolioServiceImpl::apply_fill_to_memory(self, wallet, symbol, side, price, quantity)
991 .await
992 }
993
994 async fn remove_expired_position(&self, wallet: &WalletAddress, symbol: &str) {
995 let mut portfolios = self.portfolios.write().await;
996
997 if let Some(portfolio) = portfolios.get_mut(wallet) {
998 if portfolio.positions.remove(symbol).is_some() {
999 info!(
1000 "PortfolioService: Removed expired position {} for wallet {} (replay cleanup, no ledger credit)",
1001 symbol, wallet
1002 );
1003 } else {
1004 debug!(
1005 "PortfolioService: Position {} already removed for wallet {} (replay cleanup)",
1006 symbol, wallet
1007 );
1008 }
1009 } else {
1010 debug!(
1011 "PortfolioService: Portfolio not found for wallet {} during replay cleanup",
1012 wallet
1013 );
1014 }
1015 }
1016}
1017
1018#[async_trait]
1020impl Snapshotable for PortfolioServiceImpl {
1021 type Key = WalletAddress;
1022 type State = PortfolioBalance;
1023
1024 async fn list_all(&self) -> Result<HashMap<Self::Key, Self::State>, SnapshotError> {
1025 Ok(self.all_portfolios().await)
1026 }
1027
1028 async fn restore(&self, key: &Self::Key, state: Self::State) -> Result<(), SnapshotError> {
1029 let mut portfolios = self.portfolios.write().await;
1030 portfolios.insert(*key, state);
1031 Ok(())
1032 }
1033
1034 async fn clear_all(&self) -> Result<(), SnapshotError> {
1035 let mut portfolios = self.portfolios.write().await;
1036 portfolios.clear();
1037 Ok(())
1038 }
1039}
1040
1041#[cfg(test)]
1042mod tests {
1043 use super::*;
1044 use anyhow::Result;
1045 use hypercall_types::Fill;
1046 use hypercall_types::MarginMode as TypeMarginMode;
1047 use hypercall_types::WalletAddress;
1048 use hypercall_types::CONTRACT_UNIT_MULTIPLIER;
1049
1050 use hypercall_types::wallet_address::test_wallet;
1051 use std::sync::atomic::{AtomicU64, Ordering};
1052
    /// Shared counter, starting at 1, that `create_fill` draws trade ids from.
    static NEXT_TRADE_ID: AtomicU64 = AtomicU64::new(1);
1055
1056 fn create_service() -> PortfolioServiceImpl {
1057 PortfolioServiceImpl::new()
1058 }
1059
    /// Stateless tier-database stand-in used to construct a `TierCache` in tests.
    struct NoopTierDb;
1061
    /// Read side of the stub: every wallet is in standard margin mode and no tier
    /// data exists.
    impl hypercall_db::TierReader for NoopTierDb {
        /// Always reports `Standard`.
        fn get_margin_mode_sync(&self, _wallet: &WalletAddress) -> Result<TypeMarginMode> {
            Ok(TypeMarginMode::Standard)
        }

        /// Always reports that no margin mode is stored.
        fn get_existing_margin_mode_sync(
            &self,
            _wallet: &WalletAddress,
        ) -> Result<Option<TypeMarginMode>> {
            Ok(None)
        }

        /// Always reports that no tier defaults exist.
        fn get_tier_defaults_sync(
            &self,
            _tier_name: &str,
        ) -> Result<Option<hypercall_db::TierDefaultsRecord>> {
            Ok(None)
        }

        /// Always reports that the wallet has no tier record.
        fn get_user_tier_sync(
            &self,
            _wallet: &WalletAddress,
        ) -> Result<Option<hypercall_db::UserTierRecord>> {
            Ok(None)
        }

        /// Always returns an empty list.
        fn get_all_user_tiers_sync(&self) -> Result<Vec<hypercall_db::UserTierRecord>> {
            Ok(Vec::new())
        }
    }
1092
    /// Write side of the stub: every write is accepted and discarded.
    impl hypercall_db::TierWriter for NoopTierDb {
        /// Accepts the update without storing it.
        fn save_user_tier_sync(&self, _update: &hypercall_db::UserTierUpdate) -> Result<()> {
            Ok(())
        }

        /// Accepts the mode without storing it; always returns `1`.
        fn set_margin_mode_sync(
            &self,
            _wallet: &WalletAddress,
            _margin_mode: TypeMarginMode,
        ) -> Result<i64> {
            Ok(1)
        }

        /// Accepts the mode without storing it; always returns `Some(1)`.
        fn insert_margin_mode_if_missing_sync(
            &self,
            _wallet: &WalletAddress,
            _margin_mode: TypeMarginMode,
        ) -> Result<Option<i64>> {
            Ok(Some(1))
        }

        /// Accepts the delete without doing anything.
        fn delete_user_tier_sync(&self, _wallet: &WalletAddress) -> Result<()> {
            Ok(())
        }
    }
1118
1119 async fn attach_tier_cache(
1120 service: &PortfolioServiceImpl,
1121 modes: &[(WalletAddress, TypeMarginMode)],
1122 ) {
1123 let tier_cache = Arc::new(TierCache::new(Arc::new(NoopTierDb)).unwrap());
1124 for (wallet, mode) in modes {
1125 tier_cache.set_margin_mode_in_memory(wallet, *mode).await;
1126 }
1127 service.set_tier_cache(tier_cache).await;
1128 }
1129
1130 fn fill_from_event(event: &EngineMessage) -> Fill {
1131 match event {
1132 EngineMessage::OrderFilled { fill, .. } => fill.clone(),
1133 _ => panic!("expected OrderFilled event"),
1134 }
1135 }
1136
1137 #[tokio::test]
1138 async fn option_custody_delta_updates_read_model_without_cash_effects() {
1139 let service = create_service();
1140 let wallet = test_wallet(231);
1141 let symbol = "BTC-20261231-100000-C";
1142
1143 service
1144 .apply_option_custody_delta(&wallet, symbol, dec!(2))
1145 .await;
1146 let balance = service
1147 .get_portfolio_balance(&wallet)
1148 .await
1149 .expect("portfolio should exist after option deposit");
1150 let position = balance
1151 .positions
1152 .get(symbol)
1153 .expect("position should exist");
1154 assert_eq!(position.amount, dec!(2));
1155 assert_eq!(position.entry_price, dec!(0));
1156
1157 service
1158 .apply_option_custody_delta(&wallet, symbol, dec!(-1))
1159 .await;
1160 let balance = service
1161 .get_portfolio_balance(&wallet)
1162 .await
1163 .expect("portfolio should still exist after partial withdrawal");
1164 assert_eq!(balance.positions[symbol].amount, dec!(1));
1165
1166 service
1167 .apply_option_custody_delta(&wallet, symbol, dec!(-1))
1168 .await;
1169 let balance = service
1170 .get_portfolio_balance(&wallet)
1171 .await
1172 .expect("portfolio should remain for empty account");
1173 assert!(
1174 !balance.positions.contains_key(symbol),
1175 "withdrawal to zero should remove the read-model position"
1176 );
1177 }
1178
1179 fn create_fill(
1181 wallet: WalletAddress,
1182 symbol: &str,
1183 side: Side,
1184 price: f64,
1185 quantity: f64,
1186 ) -> EngineMessage {
1187 let trade_id = NEXT_TRADE_ID.fetch_add(1, Ordering::Relaxed);
1188 create_fill_with_trade_id(trade_id, wallet, symbol, side, price, quantity)
1189 }
1190
1191 fn create_fill_with_trade_id(
1193 trade_id: u64,
1194 wallet: WalletAddress,
1195 symbol: &str,
1196 side: Side,
1197 price: f64,
1198 quantity: f64,
1199 ) -> EngineMessage {
1200 let price_dec = Decimal::from_f64_retain(price).unwrap_or(dec!(0));
1201 let size_dec =
1202 Decimal::from_f64_retain(quantity * CONTRACT_UNIT_MULTIPLIER).unwrap_or(dec!(0));
1203 let fill = Fill {
1204 trade_id,
1205 taker_order_id: 0,
1206 maker_order_id: 0,
1207 symbol: symbol.to_string(),
1208 price: price_dec,
1209 size: size_dec,
1210 taker_side: side,
1211 taker_wallet_address: wallet,
1212 maker_wallet_address: test_wallet(200),
1213 fee: dec!(0),
1214 is_taker: true,
1215 timestamp: 0,
1216 builder_code_address: None,
1217 builder_code_fee: None,
1218 source: Default::default(),
1219 taker_realized_pnl: None,
1220 maker_realized_pnl: None,
1221 underlying_notional: None,
1222 };
1223 EngineMessage::OrderFilled {
1224 accounting: hypercall_engine::FillAccounting::from_fill(&fill),
1225 fill,
1226 }
1227 }
1228
1229 #[tokio::test]
1230 async fn calculate_fill_accounting_standard_buy_uses_premium_only() {
1231 let service = create_service();
1232 let taker = test_wallet(1);
1233 let maker = test_wallet(200);
1234 attach_tier_cache(
1235 &service,
1236 &[
1237 (taker, TypeMarginMode::Standard),
1238 (maker, TypeMarginMode::Standard),
1239 ],
1240 )
1241 .await;
1242 let fill = fill_from_event(&create_fill_with_trade_id(
1243 1001,
1244 taker,
1245 "BTC-20261231-100000-C",
1246 Side::Buy,
1247 100.0,
1248 2.0,
1249 ));
1250
1251 let accounting = service.calculate_fill_accounting(&fill).await.unwrap();
1252
1253 assert_eq!(accounting.taker_premium_delta(), dec!(-200));
1254 assert_eq!(accounting.maker_premium_delta(), dec!(200));
1255 assert_eq!(accounting.taker_ledger_residual_delta(), Decimal::ZERO);
1256 assert_eq!(accounting.maker_ledger_residual_delta(), Decimal::ZERO);
1257 assert_eq!(accounting.taker_net_cash_delta(), dec!(-200));
1258 assert_eq!(accounting.maker_net_cash_delta(), dec!(200));
1259 }
1260
1261 #[tokio::test]
1262 async fn calculate_fill_accounting_standard_sell_uses_premium_only() {
1263 let service = create_service();
1264 let taker = test_wallet(2);
1265 let maker = test_wallet(200);
1266 attach_tier_cache(
1267 &service,
1268 &[
1269 (taker, TypeMarginMode::Standard),
1270 (maker, TypeMarginMode::Standard),
1271 ],
1272 )
1273 .await;
1274 let fill = fill_from_event(&create_fill_with_trade_id(
1275 1002,
1276 taker,
1277 "BTC-20261231-100000-C",
1278 Side::Sell,
1279 100.0,
1280 2.0,
1281 ));
1282
1283 let accounting = service.calculate_fill_accounting(&fill).await.unwrap();
1284
1285 assert_eq!(accounting.taker_premium_delta(), dec!(200));
1286 assert_eq!(accounting.maker_premium_delta(), dec!(-200));
1287 assert_eq!(accounting.taker_ledger_residual_delta(), Decimal::ZERO);
1288 assert_eq!(accounting.maker_ledger_residual_delta(), Decimal::ZERO);
1289 assert_eq!(accounting.taker_net_cash_delta(), dec!(200));
1290 assert_eq!(accounting.maker_net_cash_delta(), dec!(-200));
1291 }
1292
1293 #[tokio::test]
1294 async fn calculate_fill_accounting_standard_close_preserves_reported_pnl() {
1295 let service = create_service();
1296 let taker = test_wallet(5);
1297 let maker = test_wallet(200);
1298 attach_tier_cache(
1299 &service,
1300 &[
1301 (taker, TypeMarginMode::Standard),
1302 (maker, TypeMarginMode::Standard),
1303 ],
1304 )
1305 .await;
1306 service
1307 .apply_fill_to_memory(
1308 &taker,
1309 "BTC-20261231-100000-C",
1310 &Side::Buy,
1311 dec!(100),
1312 dec!(2),
1313 )
1314 .await;
1315 let fill = fill_from_event(&create_fill_with_trade_id(
1316 1005,
1317 taker,
1318 "BTC-20261231-100000-C",
1319 Side::Sell,
1320 70.0,
1321 1.0,
1322 ));
1323
1324 let accounting = service.calculate_fill_accounting(&fill).await.unwrap();
1325
1326 assert_eq!(accounting.taker_realized_pnl, dec!(-30));
1327 assert_eq!(accounting.taker_premium_delta(), dec!(70));
1328 assert_eq!(accounting.taker_ledger_residual_delta(), Decimal::ZERO);
1329 assert_eq!(accounting.taker_net_cash_delta(), dec!(70));
1330 }
1331
1332 #[tokio::test]
1333 async fn calculate_fill_accounting_portfolio_mode_carries_realized_pnl() {
1334 let service = create_service();
1335 let taker = test_wallet(3);
1336 let maker = test_wallet(200);
1337 attach_tier_cache(
1338 &service,
1339 &[
1340 (taker, TypeMarginMode::Portfolio),
1341 (maker, TypeMarginMode::Portfolio),
1342 ],
1343 )
1344 .await;
1345 service
1346 .apply_fill_to_memory(
1347 &taker,
1348 "BTC-20261231-100000-C",
1349 &Side::Buy,
1350 dec!(100),
1351 dec!(2),
1352 )
1353 .await;
1354 let fill = fill_from_event(&create_fill_with_trade_id(
1355 1003,
1356 taker,
1357 "BTC-20261231-100000-C",
1358 Side::Sell,
1359 70.0,
1360 1.0,
1361 ));
1362
1363 let accounting = service.calculate_fill_accounting(&fill).await.unwrap();
1364
1365 assert_eq!(accounting.taker_premium_delta(), Decimal::ZERO);
1366 assert_eq!(accounting.taker_realized_pnl, dec!(-30));
1367 assert_eq!(accounting.taker_ledger_residual_delta(), dec!(-30));
1368 assert_eq!(accounting.taker_net_cash_delta(), dec!(-30));
1369 assert_eq!(accounting.maker_net_cash_delta(), Decimal::ZERO);
1370 }
1371
1372 #[tokio::test]
1373 async fn calculate_fill_accounting_without_position_has_zero_realized_pnl() {
1374 let service = create_service();
1375 let fill = fill_from_event(&create_fill_with_trade_id(
1376 1004,
1377 test_wallet(4),
1378 "BTC-PERP",
1379 Side::Sell,
1380 70.0,
1381 1.0,
1382 ));
1383
1384 let accounting = service.calculate_fill_accounting(&fill).await.unwrap();
1385
1386 assert_eq!(accounting, hypercall_types::FillAccounting::zero(1004));
1387 }
1388
1389 #[tokio::test]
1390 async fn test_handle_buy_fill() {
1391 let service = create_service();
1392
1393 let event = create_fill(test_wallet(1), "BTC-CALL-100000", Side::Buy, 1000.0, 10.0);
1394 service.apply_event(&event).await.unwrap();
1395
1396 let portfolio = service.get_portfolio(&test_wallet(1)).await;
1397 assert_eq!(portfolio.wallet_address, test_wallet(1));
1398 assert_eq!(portfolio.positions.len(), 1);
1399 let position = &portfolio.positions[0].position;
1400 assert_eq!(position.symbol, "BTC-CALL-100000");
1401 assert_eq!(position.amount, dec!(10));
1402 assert_eq!(position.entry_price, dec!(1000));
1403 }
1404
1405 #[tokio::test]
1406 async fn test_handle_sell_fill() {
1407 let service = create_service();
1408
1409 let buy_event = create_fill(test_wallet(1), "BTC-CALL-100000", Side::Buy, 1000.0, 10.0);
1411 service.apply_event(&buy_event).await.unwrap();
1412
1413 let sell_event = create_fill(test_wallet(1), "BTC-CALL-100000", Side::Sell, 1100.0, 5.0);
1415 service.apply_event(&sell_event).await.unwrap();
1416
1417 let portfolio = service.get_portfolio(&test_wallet(1)).await;
1418 assert_eq!(portfolio.positions.len(), 1);
1419 let position = &portfolio.positions[0].position;
1420 assert_eq!(position.amount, dec!(5)); }
1422
1423 #[tokio::test]
1424 async fn test_handle_full_position_close() {
1425 let service = create_service();
1426
1427 let buy_event = create_fill(test_wallet(1), "BTC-CALL-100000", Side::Buy, 1000.0, 10.0);
1429 service.apply_event(&buy_event).await.unwrap();
1430
1431 let sell_event = create_fill(test_wallet(1), "BTC-CALL-100000", Side::Sell, 1100.0, 10.0);
1433 service.apply_event(&sell_event).await.unwrap();
1434
1435 let portfolio = service.get_portfolio(&test_wallet(1)).await;
1437 assert_eq!(portfolio.positions.len(), 0);
1438 assert_eq!(portfolio.total_margin_used, dec!(0));
1439 }
1440
1441 #[tokio::test]
1442 async fn test_handle_short_selling() {
1443 let service = create_service();
1444
1445 let event = create_fill(test_wallet(1), "BTC-CALL-100000", Side::Sell, 1000.0, 10.0);
1447 service.apply_event(&event).await.unwrap();
1448
1449 let portfolio = service.get_portfolio(&test_wallet(1)).await;
1451 assert_eq!(portfolio.positions.len(), 1);
1452 let position = &portfolio.positions[0].position;
1453 assert_eq!(position.amount, dec!(-10)); }
1458
1459 #[tokio::test]
1460 async fn test_buy_to_close_short_partial() {
1461 let service = create_service();
1462
1463 let short_event = create_fill(test_wallet(1), "BTC-CALL-100000", Side::Sell, 1000.0, 10.0);
1465 service.apply_event(&short_event).await.unwrap();
1466
1467 let buy_event = create_fill(test_wallet(1), "BTC-CALL-100000", Side::Buy, 900.0, 5.0);
1469 service.apply_event(&buy_event).await.unwrap();
1470
1471 let portfolios = service.all_portfolios().await;
1473 let wallet_portfolio = portfolios.get(&test_wallet(1)).unwrap();
1474 let position = wallet_portfolio.positions.get("BTC-CALL-100000").unwrap();
1475
1476 assert_eq!(position.amount, dec!(-5));
1478
1479 assert_eq!(position.realized_pnl, dec!(500));
1481
1482 }
1484
1485 #[tokio::test]
1486 async fn test_extend_short_position() {
1487 let service = create_service();
1488
1489 let short_event = create_fill(test_wallet(1), "BTC-CALL-100000", Side::Sell, 1000.0, 10.0);
1491 service.apply_event(&short_event).await.unwrap();
1492
1493 let portfolio = service.get_portfolio(&test_wallet(1)).await;
1494 let position = &portfolio.positions[0].position;
1495 assert_eq!(position.amount, dec!(-10));
1496 assert_eq!(position.entry_price, dec!(1000));
1497
1498 let extend_event = create_fill(test_wallet(1), "BTC-CALL-100000", Side::Sell, 1100.0, 5.0);
1500 service.apply_event(&extend_event).await.unwrap();
1501
1502 let portfolio = service.get_portfolio(&test_wallet(1)).await;
1504 let position = &portfolio.positions[0].position;
1505
1506 assert_eq!(position.amount, dec!(-15));
1508
1509 let expected_entry = (dec!(1000) * dec!(10) + dec!(1100) * dec!(5)) / dec!(15);
1511 assert!((position.entry_price - expected_entry).abs() < dec!(0.01));
1512
1513 }
1515
1516 #[tokio::test]
1517 async fn test_buy_to_close_short_full() {
1518 let service = create_service();
1519
1520 let short_event = create_fill(test_wallet(1), "BTC-CALL-100000", Side::Sell, 1000.0, 10.0);
1522 service.apply_event(&short_event).await.unwrap();
1523
1524 let buy_event = create_fill(test_wallet(1), "BTC-CALL-100000", Side::Buy, 1100.0, 10.0);
1526 service.apply_event(&buy_event).await.unwrap();
1527
1528 let portfolio = service.get_portfolio(&test_wallet(1)).await;
1530
1531 assert_eq!(portfolio.positions.len(), 0);
1533 }
1534
1535 #[tokio::test]
1536 async fn test_sell_to_close_long_with_profit() {
1537 let service = create_service();
1538
1539 let buy_event = create_fill(test_wallet(1), "BTC-CALL-100000", Side::Buy, 1000.0, 10.0);
1541 service.apply_event(&buy_event).await.unwrap();
1542
1543 let sell_event = create_fill(test_wallet(1), "BTC-CALL-100000", Side::Sell, 1200.0, 5.0);
1545 service.apply_event(&sell_event).await.unwrap();
1546
1547 let portfolios = service.all_portfolios().await;
1549 let wallet_portfolio = portfolios.get(&test_wallet(1)).unwrap();
1550 let position = wallet_portfolio.positions.get("BTC-CALL-100000").unwrap();
1551
1552 assert_eq!(position.amount, dec!(5));
1554
1555 assert_eq!(position.realized_pnl, dec!(1000));
1557 }
1558
1559 #[tokio::test]
1560 async fn test_sell_to_close_long_with_loss() {
1561 let service = create_service();
1562
1563 let buy_event = create_fill(test_wallet(1), "BTC-CALL-100000", Side::Buy, 1000.0, 10.0);
1565 service.apply_event(&buy_event).await.unwrap();
1566
1567 let sell_event = create_fill(test_wallet(1), "BTC-CALL-100000", Side::Sell, 800.0, 10.0);
1569 service.apply_event(&sell_event).await.unwrap();
1570
1571 let portfolio = service.get_portfolio(&test_wallet(1)).await;
1573 assert_eq!(portfolio.positions.len(), 0);
1574 }
1575
1576 #[tokio::test]
1577 async fn test_multiple_fills_cumulative_realized_pnl() {
1578 let service = create_service();
1579
1580 let buy_event = create_fill(test_wallet(1), "BTC-CALL-100000", Side::Buy, 1000.0, 10.0);
1582 service.apply_event(&buy_event).await.unwrap();
1583
1584 let sell1_event = create_fill(test_wallet(1), "BTC-CALL-100000", Side::Sell, 1100.0, 5.0);
1586 service.apply_event(&sell1_event).await.unwrap();
1587
1588 {
1589 let portfolios = service.all_portfolios().await;
1590 let wallet_portfolio = portfolios.get(&test_wallet(1)).unwrap();
1591 let position = wallet_portfolio.positions.get("BTC-CALL-100000").unwrap();
1592 assert_eq!(position.realized_pnl, dec!(500));
1593 }
1594
1595 let sell2_event = create_fill(test_wallet(1), "BTC-CALL-100000", Side::Sell, 1200.0, 3.0);
1597 service.apply_event(&sell2_event).await.unwrap();
1598
1599 {
1600 let portfolios = service.all_portfolios().await;
1601 let wallet_portfolio = portfolios.get(&test_wallet(1)).unwrap();
1602 let position = wallet_portfolio.positions.get("BTC-CALL-100000").unwrap();
1603
1604 assert_eq!(position.realized_pnl, dec!(1100));
1606 assert_eq!(position.amount, dec!(2)); }
1608 }
1609
1610 #[tokio::test]
1611 async fn test_multiple_fills_weighted_average() {
1612 let service = create_service();
1613
1614 let buy1_event = create_fill(test_wallet(1), "BTC-CALL-100000", Side::Buy, 1000.0, 10.0);
1616 service.apply_event(&buy1_event).await.unwrap();
1617
1618 let buy2_event = create_fill(test_wallet(1), "BTC-CALL-100000", Side::Buy, 1100.0, 10.0);
1619 service.apply_event(&buy2_event).await.unwrap();
1620
1621 let portfolio = service.get_portfolio(&test_wallet(1)).await;
1623 let position = &portfolio.positions[0].position;
1624 assert_eq!(position.amount, dec!(20));
1625 assert_eq!(position.entry_price, dec!(1050)); }
1627
1628 #[tokio::test]
1629 async fn test_order_filled_message_quantity_is_converted_from_contract_units() {
1630 let service = create_service();
1631
1632 let fill = Fill {
1634 trade_id: 1,
1635 taker_order_id: 100,
1636 maker_order_id: 101,
1637 symbol: "BTC-20260131-100000-C".to_string(),
1638 price: dec!(2000), size: dec!(10_000_000), taker_side: Side::Buy,
1641 taker_wallet_address: test_wallet(1),
1642 maker_wallet_address: test_wallet(2),
1643 fee: dec!(1),
1644 is_taker: true,
1645 timestamp: chrono::Utc::now().timestamp() as u64,
1646 builder_code_address: None,
1647 builder_code_fee: None,
1648 source: Default::default(),
1649 taker_realized_pnl: None,
1650 maker_realized_pnl: None,
1651 underlying_notional: None,
1652 };
1653
1654 service
1656 .apply_event(&EngineMessage::OrderFilled {
1657 accounting: hypercall_engine::FillAccounting::from_fill(&fill),
1658 fill,
1659 })
1660 .await
1661 .unwrap();
1662
1663 let portfolio = service.get_portfolio(&test_wallet(1)).await;
1665
1666 assert_eq!(portfolio.positions.len(), 1);
1668 let position = &portfolio.positions[0].position;
1669 assert_eq!(position.amount, dec!(10), "Position should be 10 contracts");
1670 }
1671
1672 #[tokio::test]
1673 async fn test_flip_long_to_short() {
1674 let service = create_service();
1675
1676 service
1678 .handle_fill(
1679 &test_wallet(1),
1680 "BTC-CALL-100000",
1681 Side::Buy,
1682 dec!(1000),
1683 dec!(10),
1684 )
1685 .await;
1686
1687 let portfolio = service.get_portfolio(&test_wallet(1)).await;
1688 assert_eq!(portfolio.positions.len(), 1);
1689 assert_eq!(portfolio.positions[0].position.amount, dec!(10));
1690
1691 service
1693 .handle_fill(
1694 &test_wallet(1),
1695 "BTC-CALL-100000",
1696 Side::Sell,
1697 dec!(1100),
1698 dec!(15),
1699 )
1700 .await;
1701
1702 let portfolio = service.get_portfolio(&test_wallet(1)).await;
1704 assert_eq!(portfolio.positions.len(), 1);
1705
1706 let position = &portfolio.positions[0].position;
1707 assert_eq!(position.amount, dec!(-5), "Should have 5 short contracts");
1708 assert_eq!(
1709 position.entry_price,
1710 dec!(1100),
1711 "Entry price should be the flip price"
1712 );
1713
1714 }
1716
1717 #[tokio::test]
1718 async fn test_flip_short_to_long() {
1719 let service = create_service();
1720
1721 service
1723 .handle_fill(
1724 &test_wallet(1),
1725 "BTC-CALL-100000",
1726 Side::Sell,
1727 dec!(1000),
1728 dec!(10),
1729 )
1730 .await;
1731
1732 let portfolio = service.get_portfolio(&test_wallet(1)).await;
1733 assert_eq!(portfolio.positions.len(), 1);
1734 assert_eq!(portfolio.positions[0].position.amount, dec!(-10));
1735
1736 service
1738 .handle_fill(
1739 &test_wallet(1),
1740 "BTC-CALL-100000",
1741 Side::Buy,
1742 dec!(900),
1743 dec!(15),
1744 )
1745 .await;
1746
1747 let portfolio = service.get_portfolio(&test_wallet(1)).await;
1749 assert_eq!(portfolio.positions.len(), 1);
1750
1751 let position = &portfolio.positions[0].position;
1752 assert_eq!(position.amount, dec!(5), "Should have 5 long contracts");
1753 assert_eq!(
1754 position.entry_price,
1755 dec!(900),
1756 "Entry price should be the flip price"
1757 );
1758
1759 }
1761
1762 #[tokio::test]
1767 async fn test_ledger_not_touched_on_open() {
1768 use crate::rsm::ledger::InMemoryLedger;
1769
1770 let service = PortfolioServiceImpl::new();
1771 let ledger = Arc::new(InMemoryLedger::new());
1772 service.set_ledger(ledger.clone()).await;
1773
1774 ledger
1776 .set_balance(&test_wallet(1), dec!(10000))
1777 .await
1778 .unwrap();
1779
1780 let event = create_fill(test_wallet(1), "BTC-CALL-100000", Side::Buy, 1000.0, 10.0);
1782 service.apply_event(&event).await.unwrap();
1783
1784 let balance = ledger.get_balance(&test_wallet(1)).await.unwrap();
1786 assert_eq!(
1787 balance,
1788 dec!(10000),
1789 "Opening position should not change ledger balance"
1790 );
1791 }
1792
1793 #[tokio::test]
1794 async fn test_ledger_increases_on_profitable_close() {
1795 use crate::rsm::ledger::InMemoryLedger;
1796
1797 let service = PortfolioServiceImpl::new();
1798 let ledger = Arc::new(InMemoryLedger::new());
1799 service.set_ledger(ledger.clone()).await;
1800
1801 ledger
1803 .set_balance(&test_wallet(1), dec!(10000))
1804 .await
1805 .unwrap();
1806
1807 let buy_event = create_fill(test_wallet(1), "BTC-CALL-100000", Side::Buy, 1000.0, 10.0);
1809 service.apply_event(&buy_event).await.unwrap();
1810
1811 let balance_after_buy = ledger.get_balance(&test_wallet(1)).await.unwrap();
1813 assert_eq!(
1814 balance_after_buy,
1815 dec!(10000),
1816 "Balance should be unchanged after opening"
1817 );
1818
1819 let sell_event = create_fill(test_wallet(1), "BTC-CALL-100000", Side::Sell, 1500.0, 10.0);
1821 service.apply_event(&sell_event).await.unwrap();
1822
1823 let balance_after_sell = ledger.get_balance(&test_wallet(1)).await.unwrap();
1825 assert_eq!(
1826 balance_after_sell,
1827 dec!(15000),
1828 "Balance should increase by realized profit (5000)"
1829 );
1830 }
1831
1832 #[tokio::test]
1833 async fn test_ledger_decreases_on_losing_close() {
1834 use crate::rsm::ledger::InMemoryLedger;
1835
1836 let service = PortfolioServiceImpl::new();
1837 let ledger = Arc::new(InMemoryLedger::new());
1838 service.set_ledger(ledger.clone()).await;
1839
1840 ledger
1842 .set_balance(&test_wallet(1), dec!(10000))
1843 .await
1844 .unwrap();
1845
1846 let buy_event = create_fill(test_wallet(1), "BTC-CALL-100000", Side::Buy, 1000.0, 10.0);
1848 service.apply_event(&buy_event).await.unwrap();
1849
1850 let sell_event = create_fill(test_wallet(1), "BTC-CALL-100000", Side::Sell, 800.0, 10.0);
1852 service.apply_event(&sell_event).await.unwrap();
1853
1854 let balance_after_sell = ledger.get_balance(&test_wallet(1)).await.unwrap();
1856 assert_eq!(
1857 balance_after_sell,
1858 dec!(8000),
1859 "Balance should decrease by realized loss (2000)"
1860 );
1861 }
1862
1863 #[tokio::test]
1864 async fn test_ledger_on_short_close() {
1865 use crate::rsm::ledger::InMemoryLedger;
1866
1867 let service = PortfolioServiceImpl::new();
1868 let ledger = Arc::new(InMemoryLedger::new());
1869 service.set_ledger(ledger.clone()).await;
1870
1871 ledger
1873 .set_balance(&test_wallet(1), dec!(10000))
1874 .await
1875 .unwrap();
1876
1877 let sell_event = create_fill(test_wallet(1), "BTC-CALL-100000", Side::Sell, 1000.0, 10.0);
1879 service.apply_event(&sell_event).await.unwrap();
1880
1881 let balance_after_short = ledger.get_balance(&test_wallet(1)).await.unwrap();
1883 assert_eq!(
1884 balance_after_short,
1885 dec!(10000),
1886 "Balance should be unchanged after opening short"
1887 );
1888
1889 let buy_event = create_fill(test_wallet(1), "BTC-CALL-100000", Side::Buy, 900.0, 10.0);
1891 service.apply_event(&buy_event).await.unwrap();
1892
1893 let balance_after_close = ledger.get_balance(&test_wallet(1)).await.unwrap();
1895 assert_eq!(
1896 balance_after_close,
1897 dec!(11000),
1898 "Balance should increase by profit from closing short (1000)"
1899 );
1900 }
1901
1902 #[tokio::test]
1903 async fn test_partial_close_applies_partial_pnl() {
1904 use crate::rsm::ledger::InMemoryLedger;
1905
1906 let service = PortfolioServiceImpl::new();
1907 let ledger = Arc::new(InMemoryLedger::new());
1908 service.set_ledger(ledger.clone()).await;
1909
1910 ledger
1912 .set_balance(&test_wallet(1), dec!(10000))
1913 .await
1914 .unwrap();
1915
1916 let buy_event = create_fill(test_wallet(1), "BTC-CALL-100000", Side::Buy, 1000.0, 10.0);
1918 service.apply_event(&buy_event).await.unwrap();
1919
1920 let sell_event = create_fill(test_wallet(1), "BTC-CALL-100000", Side::Sell, 1200.0, 5.0);
1922 service.apply_event(&sell_event).await.unwrap();
1923
1924 let balance = ledger.get_balance(&test_wallet(1)).await.unwrap();
1926 assert_eq!(
1927 balance,
1928 dec!(11000),
1929 "Balance should increase by partial profit (1000)"
1930 );
1931
1932 let portfolio = service.get_portfolio(&test_wallet(1)).await;
1934 assert_eq!(portfolio.positions.len(), 1);
1935 assert_eq!(
1936 portfolio.positions[0].position.amount,
1937 dec!(5),
1938 "Should have 5 remaining contracts"
1939 );
1940 }
1941
1942 #[tokio::test]
1953 async fn test_multiple_fills_accumulate() {
1954 let service = create_service();
1955
1956 let fill1 = create_fill_with_trade_id(
1958 1,
1959 test_wallet(1),
1960 "BTC-CALL-100000",
1961 Side::Buy,
1962 1000.0,
1963 10.0,
1964 );
1965 let fill2 = create_fill_with_trade_id(
1966 2,
1967 test_wallet(1),
1968 "BTC-CALL-100000",
1969 Side::Buy,
1970 1000.0,
1971 10.0,
1972 );
1973
1974 service.apply_event(&fill1).await.unwrap();
1975 service.apply_event(&fill2).await.unwrap();
1976
1977 let portfolio = service.get_portfolio(&test_wallet(1)).await;
1979 assert_eq!(portfolio.positions.len(), 1);
1980 assert_eq!(
1981 portfolio.positions[0].position.amount,
1982 dec!(20),
1983 "Should have 20 contracts (both fills applied)"
1984 );
1985 }
1986
1987 use hypercall_types::PositionExpiredMessage;
1992
1993 fn create_position_expired(
1994 wallet: WalletAddress,
1995 symbol: &str,
1996 position_size: Decimal,
1997 settlement_price: Decimal,
1998 settlement_value: Decimal,
1999 ) -> EngineMessage {
2000 EngineMessage::PositionExpired(PositionExpiredMessage {
2001 wallet_address: wallet,
2002 margin_mode: crate::rsm::MarginMode::Standard,
2003 symbol: symbol.to_string(),
2004 position_size,
2005 settlement_price,
2006 settlement_value,
2007 settlement_entry_price: None,
2008 cost_basis: None,
2009 net_pnl: None,
2010 timestamp: 12345,
2011 })
2012 }
2013
2014 #[tokio::test]
2015 async fn test_position_expiry_removes_position() {
2016 use crate::rsm::ledger::InMemoryLedger;
2017
2018 let service = create_service();
2019 let ledger = Arc::new(InMemoryLedger::new());
2020 service.set_ledger(ledger.clone()).await;
2021
2022 let wallet = test_wallet(1);
2023 let symbol = "BTC-CALL-100000";
2024
2025 ledger.set_balance(&wallet, dec!(10000)).await.unwrap();
2027
2028 let fill = create_fill(wallet, symbol, Side::Buy, 500.0, 10.0);
2030 service.apply_event(&fill).await.unwrap();
2031 let balance_after_fill = ledger.get_balance(&wallet).await.unwrap();
2032
2033 let portfolio = service.get_portfolio(&wallet).await;
2035 assert_eq!(portfolio.positions.len(), 1);
2036 assert_eq!(portfolio.positions[0].position.symbol, symbol);
2037 assert_eq!(portfolio.positions[0].position.amount, dec!(10));
2038
2039 let expiry = create_position_expired(
2041 wallet,
2042 symbol,
2043 dec!(10), dec!(5000), dec!(50000), );
2047 service.apply_event(&expiry).await.unwrap();
2048
2049 let portfolio_after = service.get_portfolio(&wallet).await;
2051 assert_eq!(
2052 portfolio_after.positions.len(),
2053 0,
2054 "Position should be removed after expiry"
2055 );
2056
2057 let balance = ledger.get_balance(&wallet).await.unwrap();
2059 assert_eq!(
2060 balance, balance_after_fill,
2061 "PortfolioService expiry projection must not apply settlement cash"
2062 );
2063 }
2064
2065 #[tokio::test]
2066 async fn test_position_expiry_nonexistent_position_is_noop() {
2067 let service = create_service();
2068 let wallet = test_wallet(1);
2069 let symbol = "BTC-CALL-100000";
2070
2071 let expiry = create_position_expired(wallet, symbol, dec!(10), dec!(5000), dec!(50000));
2073
2074 service.apply_event(&expiry).await.unwrap();
2076
2077 let portfolio = service.get_portfolio(&wallet).await;
2079 assert!(portfolio.positions.is_empty());
2080 }
2081
2082 #[tokio::test]
2083 async fn test_position_expiry_zero_settlement_no_ledger_call() {
2084 let service = create_service();
2085 let wallet = test_wallet(1);
2086 let symbol = "BTC-CALL-100000";
2087
2088 let fill = create_fill(wallet, symbol, Side::Buy, 500.0, 10.0);
2090 service.apply_event(&fill).await.unwrap();
2091
2092 let expiry = create_position_expired(
2094 wallet,
2095 symbol,
2096 dec!(10),
2097 dec!(0), dec!(0), );
2100 service.apply_event(&expiry).await.unwrap();
2101
2102 let portfolio = service.get_portfolio(&wallet).await;
2104 assert!(portfolio.positions.is_empty());
2105 }
2106
2107 #[tokio::test]
2108 async fn test_position_expiry_replay_idempotency() {
2109 use crate::rsm::ledger::InMemoryLedger;
2110
2111 let service = create_service();
2112 let ledger = Arc::new(InMemoryLedger::new());
2113 service.set_ledger(ledger.clone()).await;
2114
2115 let wallet = test_wallet(1);
2116 let symbol = "BTC-CALL-100000";
2117
2118 ledger.set_balance(&wallet, dec!(10000)).await.unwrap();
2120
2121 let fill = create_fill(wallet, symbol, Side::Buy, 500.0, 10.0);
2123 service.apply_event(&fill).await.unwrap();
2124 let balance_after_fill = ledger.get_balance(&wallet).await.unwrap();
2125
2126 let expiry = create_position_expired(
2129 wallet,
2130 symbol,
2131 dec!(10), dec!(5000), dec!(50000), );
2135 service.apply_event(&expiry).await.unwrap();
2136
2137 let balance_after_first = ledger.get_balance(&wallet).await.unwrap();
2139 assert_eq!(
2140 balance_after_first, balance_after_fill,
2141 "PortfolioService expiry projection must not apply settlement cash"
2142 );
2143
2144 service.apply_event(&expiry).await.unwrap();
2147
2148 let balance_after_replay = ledger.get_balance(&wallet).await.unwrap();
2150 assert_eq!(
2151 balance_after_replay, balance_after_fill,
2152 "Replay should not change ledger balance"
2153 );
2154
2155 let portfolio = service.get_portfolio(&wallet).await;
2157 assert!(portfolio.positions.is_empty());
2158 }
2159}