Skip to main content

hypercall/portfolio/
portfolio_service_impl.rs

1use crate::portfolio::HypercorePositionUpdate;
2use crate::portfolio::{
3    canonical_perp_symbol, PortfolioBalance, PortfolioChange, PortfolioError, PortfolioService,
4    PositionChange, PositionData,
5};
6use crate::read_cache::tier::TierCache;
7use crate::rsm::ledger::Ledger;
8use crate::rsm::MarginMode;
9use crate::shared::order_types::ParsedSymbol;
10use crate::snapshot::error::SnapshotError;
11use crate::snapshot::traits::Snapshotable;
12use anyhow::Result;
13use async_trait::async_trait;
14use hypercall_engine::{
15    calculate_fill_accounting as calculate_engine_fill_accounting, fill_premium_delta,
16    EnginePosition, FillAccountingContext, FillAccountingPosition, FillCashSettlement,
17};
18use hypercall_types::api_models::{Portfolio, Position, PositionWithMetrics};
19use hypercall_types::EngineMessage;
20use hypercall_types::FillAccounting;
21use hypercall_types::Side;
22use hypercall_types::{to_human_readable_decimal, WalletAddress};
23use rust_decimal::Decimal;
24use rust_decimal_macros::dec;
25use std::collections::HashMap;
26use std::str::FromStr;
27use std::sync::Arc;
28use tokio::sync::RwLock;
29use tracing::{debug, error, info, warn};
30
31/// Implementation of PortfolioService.
32///
33/// Manages in-memory portfolio state with pure state mutations.
34/// Tests can still attach a Ledger to exercise legacy PortfolioService APIs,
35/// but production balance reads are owned by EngineSnapshot.balance_ledger.
36///
37/// Fill persistence is journal-owned. This service only applies the in-memory
38/// position projection for already-authoritative fill events.
39pub struct PortfolioServiceImpl {
40    /// In-memory portfolio state keyed by account address.
41    portfolios: Arc<RwLock<HashMap<WalletAddress, PortfolioBalance>>>,
42    /// Optional legacy ledger used by direct PortfolioService tests.
43    /// Production cache paths do not attach a DB-backed ledger here.
44    ledger: Arc<RwLock<Option<Arc<dyn Ledger + Send + Sync>>>>,
45    /// Optional tier cache for checking margin mode.
46    /// Required for premium settlement in Standard margin mode.
47    tier_cache: Arc<RwLock<Option<Arc<TierCache>>>>,
48    /// Last-seen timestamp per (wallet, symbol) for perp position updates.
49    /// Prevents stale REST-polled updates from overwriting fresher WS-fed data.
50    perp_position_timestamps: Arc<RwLock<HashMap<(WalletAddress, String), u64>>>,
51}
52
53fn calculate_position_metrics(position: Position) -> PositionWithMetrics {
54    let notional_value = position.amount * position.entry_price;
55    let margin_ratio = if notional_value.abs() > dec!(0) {
56        position.margin_posted / notional_value.abs()
57    } else {
58        dec!(0)
59    };
60
61    PositionWithMetrics {
62        position,
63        notional_value,
64        maintenance_margin: dec!(0),
65        liquidation_price: dec!(0),
66        margin_ratio,
67    }
68}
69
70impl PortfolioServiceImpl {
71    /// Create a new portfolio service.
72    pub fn new() -> Self {
73        Self {
74            portfolios: Arc::new(RwLock::new(HashMap::new())),
75            ledger: Arc::new(RwLock::new(None)),
76            tier_cache: Arc::new(RwLock::new(None)),
77            perp_position_timestamps: Arc::new(RwLock::new(HashMap::new())),
78        }
79    }
80
81    /// Set the legacy ledger used by direct PortfolioService tests.
82    pub async fn set_ledger(&self, ledger: Arc<dyn Ledger + Send + Sync>) {
83        let mut l = self.ledger.write().await;
84        *l = Some(ledger);
85    }
86
87    /// Set the tier cache for checking margin mode.
88    ///
89    /// When set, option fills will apply premium settlement for Standard margin mode.
90    /// Premium is debited on buy, credited on sell - directly affecting USDC balance.
91    pub async fn set_tier_cache(&self, tier_cache: Arc<TierCache>) {
92        let mut tc = self.tier_cache.write().await;
93        *tc = Some(tier_cache);
94    }
95
96    /// Apply option premium settlement for Standard margin mode accounts.
97    ///
98    /// In Standard margin mode, option premium is settled in USDC at fill time:
99    /// - Buy option: USDC balance decreases by premium (debit)
100    /// - Sell option: USDC balance increases by premium (credit)
101    ///
102    /// In Portfolio margin mode, this does nothing (premium is financed via margin).
103    async fn apply_option_premium_if_standard(
104        &self,
105        wallet: &WalletAddress,
106        symbol: &str,
107        side: &Side,
108        price: Decimal,
109        size: Decimal,
110    ) -> Result<(), PortfolioError> {
111        if ParsedSymbol::from_symbol(symbol).is_err() {
112            return Ok(());
113        }
114
115        let ledger = self.ledger.read().await.clone();
116        let Some(ledger) = ledger else {
117            debug!(
118                "No ledger configured, skipping premium settlement lookup for {}",
119                wallet
120            );
121            return Ok(());
122        };
123
124        if !self
125            .wallet_uses_standard_option_premium(wallet, symbol)
126            .await?
127        {
128            debug!(
129                "Skipping premium settlement for {} - Portfolio mode",
130                wallet
131            );
132            return Ok(());
133        }
134
135        let premium_delta = fill_premium_delta(*side, price, size);
136
137        ledger
138            .apply_premium(wallet, premium_delta)
139            .await
140            .map_err(|e| {
141                error!(
142                    "CRITICAL: Failed to apply option premium {} to ledger for wallet {}: {:?}",
143                    premium_delta, wallet, e
144                );
145                PortfolioError::LedgerError(format!(
146                    "Failed to apply option premium {} for {}: {:?}",
147                    premium_delta, wallet, e
148                ))
149            })?;
150        info!(
151            "Applied option premium {} to ledger for wallet {} (symbol: {}, side: {:?}, price: {}, size: {})",
152            premium_delta, wallet, symbol, side, price, size
153        );
154
155        Ok(())
156    }
157
158    /// Reset portfolio state from snapshots (for initialization from DB).
159    ///
160    /// Replaces all in-memory state with the provided snapshots.
161    pub async fn reset_from_snapshots(
162        &self,
163        snapshots: HashMap<WalletAddress, PortfolioBalance>,
164    ) -> Result<()> {
165        let mut portfolios = self.portfolios.write().await;
166        *portfolios = snapshots;
167        Ok(())
168    }
169
170    /// Get all portfolios as a snapshot (for persistence).
171    ///
172    /// Returns a clone of all in-memory portfolio state.
173    pub async fn all_portfolios(&self) -> HashMap<WalletAddress, PortfolioBalance> {
174        let portfolios = self.portfolios.read().await;
175        portfolios.clone()
176    }
177
178    /// Get the internal PortfolioBalance for a specific account.
179    ///
180    /// Returns None if account doesn't exist.
181    /// This is for risk/margin calculations - use get_portfolio() for API responses.
182    pub async fn get_portfolio_balance(&self, account: &WalletAddress) -> Option<PortfolioBalance> {
183        let portfolios = self.portfolios.read().await;
184        portfolios.get(account).cloned()
185    }
186
187    /// Update market prices for unrealized PnL calculations.
188    ///
189    /// Not an event - ephemeral mark-to-market updates.
190    pub async fn update_market_prices(&self, prices: HashMap<String, Decimal>) {
191        let mut portfolios = self.portfolios.write().await;
192
193        for (wallet, portfolio) in portfolios.iter_mut() {
194            for (symbol, position) in portfolio.positions.iter_mut() {
195                if let Some(&market_price) = prices.get(symbol) {
196                    let new_unrealized_pnl =
197                        (market_price - position.entry_price) * position.amount;
198                    debug!(
199                        wallet = %wallet,
200                        symbol,
201                        amount = %position.amount,
202                        entry_price = %position.entry_price,
203                        market_price = %market_price,
204                        previous_unrealized_pnl = %position.unrealized_pnl,
205                        new_unrealized_pnl = %new_unrealized_pnl,
206                        "Applied repriced market price to portfolio position"
207                    );
208                    position.unrealized_pnl = new_unrealized_pnl;
209                }
210            }
211        }
212    }
213
214    /// Handle a fill event for a specific wallet (acquires lock internally).
215    ///
216    /// This is for backward compatibility with tests. For atomic taker+maker fills,
217    /// use `apply_fill_locked` directly under a single lock.
218    ///
219    /// Note: Uses trade_id=0 which means idempotency is disabled for test calls.
220    /// Each call will be processed (not skipped).
221    ///
222    /// # Returns
223    /// The realized PnL from closing positions in this fill.
224    #[allow(dead_code)] // Used by tests
225    async fn handle_fill(
226        &self,
227        wallet: &WalletAddress,
228        symbol: &str,
229        side: Side,
230        price: Decimal,
231        quantity: Decimal,
232    ) -> Decimal {
233        static TEST_TRADE_ID: std::sync::atomic::AtomicU64 =
234            std::sync::atomic::AtomicU64::new(1_000_000_000);
235        let trade_id = TEST_TRADE_ID.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
236
237        let mut portfolios = self.portfolios.write().await;
238        Self::apply_fill_locked(
239            &mut portfolios,
240            wallet,
241            trade_id,
242            symbol,
243            side,
244            price,
245            quantity,
246        )
247    }
248
249    /// Apply a fill to a wallet's portfolio without acquiring the lock.
250    ///
251    /// The caller must hold `self.portfolios.write()` and pass the mutable reference.
252    /// This allows atomic application of taker+maker fills under a single lock.
253    ///
254    /// # Idempotency
255    /// If `trade_id` has already been applied to this wallet, the fill is skipped.
256    /// This makes replay safe - applying the same fill twice is a no-op.
257    /// Update position for a fill and return the realized PnL.
258    ///
259    /// This is a pure in-memory operation used by both `apply_fill_locked`
260    /// (for event processing) and `apply_fill_to_memory` (for memory-only updates).
261    ///
262    /// # Arguments
263    /// * `portfolio` - The portfolio to update
264    /// * `symbol` - Trading symbol
265    /// * `side` - Buy or Sell
266    /// * `price` - Fill price
267    /// * `quantity` - Fill quantity (positive)
268    ///
269    /// # Returns
270    /// The realized PnL from closing positions in this fill.
271    fn update_position_for_fill(
272        portfolio: &mut PortfolioBalance,
273        symbol: &str,
274        side: &Side,
275        price: Decimal,
276        quantity: Decimal,
277    ) -> Decimal {
278        let position = portfolio
279            .positions
280            .entry(symbol.to_string())
281            .or_insert_with(|| PositionData {
282                symbol: symbol.to_string(),
283                amount: dec!(0),
284                entry_price: dec!(0),
285                margin_posted: dec!(0),
286                realized_pnl: dec!(0),
287                unrealized_pnl: dec!(0),
288            });
289
290        let signed_quantity = match side {
291            Side::Buy => quantity,
292            Side::Sell => -quantity,
293        };
294        let engine_position = EnginePosition {
295            quantity: position.amount,
296            entry_price: position.entry_price,
297        };
298        let transition =
299            EnginePosition::fill_transition(Some(&engine_position), signed_quantity, price);
300
301        // Track realized PnL
302        if transition.realized_pnl != dec!(0) {
303            position.realized_pnl += transition.realized_pnl;
304        }
305        position.amount = transition.quantity;
306        position.entry_price = transition.entry_price;
307
308        // UPNL is mark-to-market and must be refreshed from theoretical marks.
309        // Do not use fill price as a mark proxy.
310        if position.amount == dec!(0) {
311            position.unrealized_pnl = dec!(0);
312        }
313
314        // Clean up zero positions (using small threshold for Decimal)
315        if position.amount.abs() < dec!(0.00000001) {
316            portfolio.positions.remove(symbol);
317        }
318
319        transition.realized_pnl
320    }
321
322    ///
323    /// # Returns
324    /// The realized PnL from closing positions in this fill.
325    /// - Positive: profit from closing
326    /// - Negative: loss from closing
327    /// - Zero: no position was closed (pure open/add), OR fill was skipped (idempotent)
328    fn apply_fill_locked(
329        portfolios: &mut HashMap<WalletAddress, PortfolioBalance>,
330        wallet: &WalletAddress,
331        _trade_id: u64,
332        symbol: &str,
333        side: Side,
334        price: Decimal,
335        quantity: Decimal,
336    ) -> Decimal {
337        let portfolio = portfolios
338            .entry(*wallet)
339            .or_insert_with(|| PortfolioBalance {
340                positions: HashMap::new(),
341                total_margin_used: dec!(0),
342            });
343
344        // NOTE: Fill idempotency is handled at the DB level via
345        // ON CONFLICT DO NOTHING in apply_fill_with_ledger_sync.
346        // This method is only called when DB insert succeeds.
347
348        Self::update_position_for_fill(portfolio, symbol, &side, price, quantity)
349    }
350
351    /// Apply realized PnL to the legacy test ledger if configured.
352    ///
353    /// This is called after handle_fill when there's a non-zero realized PnL.
354    ///
355    /// # Errors
356    /// Returns `PortfolioError::LedgerError` if the ledger update fails.
357    /// Production fill persistence is journal-owned; runtime collateral comes
358    /// from EngineSnapshot.balance_ledger.
359    async fn apply_realized_pnl_to_ledger(
360        &self,
361        wallet: &WalletAddress,
362        realized_pnl: Decimal,
363    ) -> Result<(), PortfolioError> {
364        if realized_pnl == dec!(0) {
365            return Ok(());
366        }
367
368        let ledger = self.ledger.read().await;
369        if let Some(ref ledger) = *ledger {
370            ledger.apply_pnl(wallet, realized_pnl).await.map_err(|e| {
371                error!(
372                    "CRITICAL: Failed to apply realized PnL {} to ledger for wallet {}: {:?}",
373                    realized_pnl, wallet, e
374                );
375                PortfolioError::LedgerError(format!(
376                    "Failed to apply realized PnL {} for {}: {:?}",
377                    realized_pnl, wallet, e
378                ))
379            })?;
380
381            debug!(
382                "Applied realized PnL {} to ledger for wallet {}",
383                realized_pnl, wallet
384            );
385        }
386        Ok(())
387    }
388
389    // ===== Crash-Safe Fill Processing Methods =====
390
391    /// Calculate the accounting delta that would result from applying a fill.
392    ///
393    /// This is a pure calculation with no side effects.
394    /// Used to determine what to write to the ledger in the atomic DB transaction.
395    ///
396    /// IMPORTANT: For Standard margin mode on OPTIONS, we use premium settlement only.
397    /// The premium flow (buy = debit, sell = credit) already captures the P&L:
398    /// - Buy at $1000 → debit $1000
399    /// - Sell at $700 → credit $700
400    /// - Net = -$300 (loss is implicit in premium flow)
401    ///
402    /// For Portfolio margin or perps, we use realized PnL instead.
403    pub async fn calculate_fill_accounting(
404        &self,
405        fill: &hypercall_types::Fill,
406    ) -> Result<FillAccounting, PortfolioError> {
407        let taker_uses_premium = self
408            .wallet_uses_standard_option_premium(&fill.taker_wallet_address, &fill.symbol)
409            .await?;
410        let maker_uses_premium = self
411            .wallet_uses_standard_option_premium(&fill.maker_wallet_address, &fill.symbol)
412            .await?;
413        let portfolios = self.portfolios.read().await;
414        Ok(Self::calculate_fill_accounting_against_portfolios(
415            &portfolios,
416            fill,
417            taker_uses_premium,
418            maker_uses_premium,
419        ))
420    }
421
422    fn calculate_fill_accounting_against_portfolios(
423        portfolios: &HashMap<WalletAddress, PortfolioBalance>,
424        fill: &hypercall_types::Fill,
425        taker_uses_premium: bool,
426        maker_uses_premium: bool,
427    ) -> FillAccounting {
428        calculate_engine_fill_accounting(
429            fill,
430            FillAccountingContext {
431                taker_position: Self::fill_accounting_position_for_wallet(
432                    portfolios,
433                    &fill.taker_wallet_address,
434                    &fill.symbol,
435                ),
436                maker_position: Self::fill_accounting_position_for_wallet(
437                    portfolios,
438                    &fill.maker_wallet_address,
439                    &fill.symbol,
440                ),
441                taker_cash_settlement: Self::cash_settlement_for_fill(taker_uses_premium),
442                maker_cash_settlement: Self::cash_settlement_for_fill(maker_uses_premium),
443            },
444        )
445    }
446
447    fn fill_accounting_position_for_wallet(
448        portfolios: &HashMap<WalletAddress, PortfolioBalance>,
449        wallet: &WalletAddress,
450        symbol: &str,
451    ) -> Option<FillAccountingPosition> {
452        portfolios
453            .get(wallet)
454            .and_then(|portfolio| portfolio.positions.get(symbol))
455            .map(|position| FillAccountingPosition {
456                quantity: position.amount,
457                entry_price: position.entry_price,
458            })
459    }
460
461    fn cash_settlement_for_fill(uses_premium: bool) -> FillCashSettlement {
462        if uses_premium {
463            FillCashSettlement::OptionPremium
464        } else {
465            FillCashSettlement::RealizedPnl
466        }
467    }
468
469    /// Returns whether this fill should settle option premium through the premium ledger.
470    async fn wallet_uses_standard_option_premium(
471        &self,
472        wallet: &WalletAddress,
473        symbol: &str,
474    ) -> Result<bool, PortfolioError> {
475        if ParsedSymbol::from_symbol(symbol).is_err() {
476            return Ok(false);
477        }
478
479        let tc = self.tier_cache.read().await;
480        let margin_mode = match tc.as_ref() {
481            Some(tier_cache) => tier_cache.get_margin_mode(wallet).await.map_err(|error| {
482                PortfolioError::InternalError(format!(
483                    "failed to load margin mode for premium settlement wallet {}: {}",
484                    wallet, error
485                ))
486            })?,
487            None => {
488                return Err(PortfolioError::InternalError(format!(
489                    "TierCache not configured for option premium settlement wallet {}",
490                    wallet
491                )));
492            }
493        };
494        Ok(margin_mode == MarginMode::Standard)
495    }
496
497    /// Get position change for notification, under lock.
498    ///
499    /// Returns a PositionChange struct capturing the current state of the position.
500    /// If position doesn't exist (closed), returns zeroed values.
501    fn get_position_change_locked(
502        portfolios: &HashMap<WalletAddress, PortfolioBalance>,
503        wallet: &WalletAddress,
504        symbol: &str,
505    ) -> PositionChange {
506        if let Some(portfolio) = portfolios.get(wallet) {
507            if let Some(pos) = portfolio.positions.get(symbol) {
508                return PositionChange {
509                    symbol: symbol.to_string(),
510                    amount: pos.amount,
511                    entry_price: pos.entry_price,
512                    margin_posted: pos.margin_posted,
513                    realized_pnl: pos.realized_pnl,
514                    unrealized_pnl: pos.unrealized_pnl,
515                };
516            }
517        }
518        // Position closed or doesn't exist
519        PositionChange {
520            symbol: symbol.to_string(),
521            amount: dec!(0),
522            entry_price: dec!(0),
523            margin_posted: dec!(0),
524            realized_pnl: dec!(0),
525            unrealized_pnl: dec!(0),
526        }
527    }
528
529    /// Apply a fill's position changes to in-memory state only.
530    ///
531    /// This does NOT update the durable ledger. Live fills are journal-authoritative,
532    /// and restart replay also uses this path for position-only reconstruction.
533    pub async fn apply_fill_to_memory(
534        &self,
535        wallet: &WalletAddress,
536        symbol: &str,
537        side: &Side,
538        price: Decimal,
539        quantity: Decimal,
540    ) {
541        let mut portfolios = self.portfolios.write().await;
542
543        let portfolio = portfolios
544            .entry(*wallet)
545            .or_insert_with(|| PortfolioBalance {
546                positions: HashMap::new(),
547                total_margin_used: dec!(0),
548            });
549
550        // Use shared helper for position update logic
551        Self::update_position_for_fill(portfolio, symbol, side, price, quantity);
552
553        debug!(
554            "Applied fill to memory: wallet={}, symbol={}, side={:?}, price={}, qty={}",
555            wallet, symbol, side, price, quantity
556        );
557    }
558
559    /// Apply a fill's position changes for both taker and maker atomically under a single write lock.
560    pub async fn apply_fill_to_memory_both_sides(
561        &self,
562        taker_wallet: &WalletAddress,
563        maker_wallet: &WalletAddress,
564        symbol: &str,
565        taker_side: &Side,
566        maker_side: &Side,
567        price: Decimal,
568        quantity: Decimal,
569    ) {
570        let mut portfolios = self.portfolios.write().await;
571        let portfolio = portfolios
572            .entry(*taker_wallet)
573            .or_insert_with(|| PortfolioBalance {
574                positions: HashMap::new(),
575                total_margin_used: dec!(0),
576            });
577        Self::update_position_for_fill(portfolio, symbol, taker_side, price, quantity);
578
579        let portfolio = portfolios
580            .entry(*maker_wallet)
581            .or_insert_with(|| PortfolioBalance {
582                positions: HashMap::new(),
583                total_margin_used: dec!(0),
584            });
585        Self::update_position_for_fill(portfolio, symbol, maker_side, price, quantity);
586    }
587
588    pub async fn apply_option_custody_delta(
589        &self,
590        wallet: &WalletAddress,
591        symbol: &str,
592        quantity_delta: Decimal,
593    ) -> PortfolioChange {
594        let mut portfolios = self.portfolios.write().await;
595        let total_margin_used = {
596            let portfolio = portfolios.entry(*wallet).or_default();
597            let position = portfolio
598                .positions
599                .entry(symbol.to_string())
600                .or_insert_with(|| PositionData {
601                    symbol: symbol.to_string(),
602                    amount: dec!(0),
603                    entry_price: dec!(0),
604                    margin_posted: dec!(0),
605                    realized_pnl: dec!(0),
606                    unrealized_pnl: dec!(0),
607                });
608
609            let old_amount = position.amount;
610            let old_entry = position.entry_price;
611            let new_amount = old_amount + quantity_delta;
612
613            if new_amount == dec!(0) {
614                portfolio.positions.remove(symbol);
615            } else {
616                position.amount = new_amount;
617                position.entry_price = if old_amount > dec!(0) {
618                    old_entry
619                } else if new_amount < dec!(0) {
620                    old_entry
621                } else {
622                    dec!(0)
623                };
624            }
625            portfolio.total_margin_used
626        };
627
628        let position_change = Self::get_position_change_locked(&portfolios, wallet, symbol);
629        PortfolioChange {
630            wallet: *wallet,
631            position_changes: vec![position_change],
632            balance_change: None,
633            total_margin_used,
634        }
635    }
636}
637
638impl Default for PortfolioServiceImpl {
639    fn default() -> Self {
640        Self::new()
641    }
642}
643
644#[async_trait]
645impl PortfolioService for PortfolioServiceImpl {
646    async fn get_portfolio(&self, account: &WalletAddress) -> Portfolio {
647        let portfolios = self.portfolios.read().await;
648
649        if let Some(cached) = portfolios.get(account) {
650            // Convert cached data to API Portfolio format
651            let positions: Vec<PositionWithMetrics> = cached
652                .positions
653                .values()
654                .map(|pos| {
655                    let position = Position {
656                        wallet_address: *account,
657                        symbol: pos.symbol.clone(),
658                        amount: pos.amount,
659                        entry_price: pos.entry_price,
660                        margin_posted: pos.margin_posted,
661                        realized_pnl: pos.realized_pnl,
662                        unrealized_pnl: pos.unrealized_pnl,
663                        updated_at: chrono::Utc::now(),
664                    };
665                    calculate_position_metrics(position)
666                })
667                .collect();
668
669            Portfolio {
670                wallet_address: *account,
671                positions,
672                total_margin_used: cached.total_margin_used,
673                available_balance: dec!(0), // Computed by handler using equity - IM
674                span_margin: None,          // Filled in by HTTP handler from engine
675                margin_mode: "standard".to_string(), // Default, filled by HTTP handler
676                margin_summary: None,       // Filled by HTTP handler
677            }
678        } else {
679            // Return empty portfolio (lazy account creation)
680            Portfolio {
681                wallet_address: *account,
682                positions: Vec::new(),
683                total_margin_used: dec!(0),
684                available_balance: dec!(0),
685                span_margin: None,
686                margin_mode: "standard".to_string(), // Default, filled by HTTP handler
687                margin_summary: None,                // Filled by HTTP handler
688            }
689        }
690    }
691
692    async fn get_portfolio_balance(&self, account: &WalletAddress) -> Option<PortfolioBalance> {
693        // Delegate to the inherent method
694        PortfolioServiceImpl::get_portfolio_balance(self, account).await
695    }
696
697    async fn all_portfolios(&self) -> HashMap<WalletAddress, PortfolioBalance> {
698        // Delegate to the inherent method
699        PortfolioServiceImpl::all_portfolios(self).await
700    }
701
702    // WARNING: The OrderFilled branch below uses apply_realized_pnl_to_ledger()
703    // and apply_option_premium_if_standard(), both of which write to InMemoryLedger
704    // ONLY (no DB persistence). This is safe because the live fill path in
705    // PortfolioCache uses apply_fill_state_mutation() instead (which syncs memory
706    // AFTER the journal writes to DB). Do NOT use apply_event(OrderFilled) for
707    // real fills (e.g., catchup replay) without adding DB persistence first.
708    // See: CALL-500 post-mortem — same class of bug (in-memory-only mutation).
709    async fn apply_event(
710        &self,
711        event: &EngineMessage,
712    ) -> Result<Vec<PortfolioChange>, PortfolioError> {
713        match event {
714            EngineMessage::OrderFilled { fill, .. } => {
715                let size_human = to_human_readable_decimal(&fill.symbol, fill.size);
716
717                // Compute maker side
718                let maker_side = match fill.taker_side {
719                    Side::Buy => Side::Sell,
720                    Side::Sell => Side::Buy,
721                };
722
723                // === ATOMIC TAKER + MAKER PORTFOLIO UPDATE ===
724                // Apply both fills under a single lock to ensure readers never see
725                // a state where taker is updated but maker is not.
726                // Idempotency: if trade_id already applied to a wallet, that side is skipped.
727                // Also capture position state for notifications.
728                let (
729                    taker_realized_pnl,
730                    maker_realized_pnl,
731                    taker_change,
732                    maker_change,
733                    taker_margin,
734                    maker_margin,
735                ) = {
736                    let mut portfolios = self.portfolios.write().await;
737
738                    let taker_pnl = Self::apply_fill_locked(
739                        &mut portfolios,
740                        &fill.taker_wallet_address,
741                        fill.trade_id,
742                        &fill.symbol,
743                        fill.taker_side,
744                        fill.price,
745                        size_human,
746                    );
747
748                    let maker_pnl = Self::apply_fill_locked(
749                        &mut portfolios,
750                        &fill.maker_wallet_address,
751                        fill.trade_id,
752                        &fill.symbol,
753                        maker_side,
754                        fill.price,
755                        size_human,
756                    );
757
758                    // Capture position state for notifications
759                    let taker_pos = Self::get_position_change_locked(
760                        &portfolios,
761                        &fill.taker_wallet_address,
762                        &fill.symbol,
763                    );
764                    let maker_pos = Self::get_position_change_locked(
765                        &portfolios,
766                        &fill.maker_wallet_address,
767                        &fill.symbol,
768                    );
769
770                    // Capture margin used
771                    let taker_margin = portfolios
772                        .get(&fill.taker_wallet_address)
773                        .map(|b| b.total_margin_used)
774                        .unwrap_or(dec!(0));
775                    let maker_margin = portfolios
776                        .get(&fill.maker_wallet_address)
777                        .map(|b| b.total_margin_used)
778                        .unwrap_or(dec!(0));
779
780                    (
781                        taker_pnl,
782                        maker_pnl,
783                        taker_pos,
784                        maker_pos,
785                        taker_margin,
786                        maker_margin,
787                    )
788                };
789                // Lock released here
790
791                // Apply taker's realized PnL to ledger (only on position closes)
792                // CRITICAL: Ledger failure halts processing
793                self.apply_realized_pnl_to_ledger(&fill.taker_wallet_address, taker_realized_pnl)
794                    .await?;
795
796                // Apply option premium for Standard margin mode (taker)
797                // In Standard mode: Buy debits USDC, Sell credits USDC
798                self.apply_option_premium_if_standard(
799                    &fill.taker_wallet_address,
800                    &fill.symbol,
801                    &fill.taker_side,
802                    fill.price,
803                    size_human,
804                )
805                .await?;
806
807                // Apply maker's realized PnL to ledger (only on position closes)
808                // CRITICAL: Ledger failure halts processing
809                self.apply_realized_pnl_to_ledger(&fill.maker_wallet_address, maker_realized_pnl)
810                    .await?;
811
812                // Apply option premium for Standard margin mode (maker)
813                // In Standard mode: Buy debits USDC, Sell credits USDC
814                self.apply_option_premium_if_standard(
815                    &fill.maker_wallet_address,
816                    &fill.symbol,
817                    &maker_side,
818                    fill.price,
819                    size_human,
820                )
821                .await?;
822
823                // Return changes for both taker and maker
824                let mut changes = Vec::with_capacity(2);
825
826                changes.push(PortfolioChange {
827                    wallet: fill.taker_wallet_address,
828                    position_changes: vec![taker_change],
829                    balance_change: None,
830                    total_margin_used: taker_margin,
831                });
832
833                changes.push(PortfolioChange {
834                    wallet: fill.maker_wallet_address,
835                    position_changes: vec![maker_change],
836                    balance_change: None,
837                    total_margin_used: maker_margin,
838                });
839
840                return Ok(changes);
841            }
842            EngineMessage::PositionExpired(expiry_msg) => {
843                let wallet = &expiry_msg.wallet_address;
844                let symbol = &expiry_msg.symbol;
845
846                info!(
847                    "PortfolioService: Removing expired position for wallet={}, symbol={}",
848                    wallet, symbol
849                );
850
851                {
852                    let mut portfolios = self.portfolios.write().await;
853
854                    if let Some(portfolio) = portfolios.get_mut(wallet) {
855                        if portfolio.positions.remove(symbol).is_some() {
856                            info!(
857                                "PortfolioService: Removed expired position {} for wallet {}",
858                                symbol, wallet
859                            );
860                        } else {
861                            warn!(
862                                "PortfolioService: Position {} not found for wallet {} during expiry",
863                                symbol, wallet
864                            );
865                        }
866                    } else {
867                        warn!(
868                            "PortfolioService: Portfolio not found for wallet {} during expiry",
869                            wallet
870                        );
871                    }
872                }
873            }
874            _ => {
875                // Other event types not handled yet (OrderInfo, L2Snapshot, etc.)
876            }
877        }
878        Ok(vec![])
879    }
880
881    async fn apply_hypercore_position_update(&self, update: &HypercorePositionUpdate) {
882        // For incremental updates, use same logic as set_hypercore_position
883        self.set_hypercore_position(update).await;
884    }
885
886    async fn set_hypercore_position(&self, update: &HypercorePositionUpdate) {
887        let account_normalized = update.account.to_lowercase();
888
889        // Parse wallet address before acquiring lock - fail gracefully on malformed input
890        let wallet_address = match WalletAddress::from_str(&account_normalized) {
891            Ok(addr) => addr,
892            Err(e) => {
893                tracing::warn!(
894                    "Skipping hypercore position update: invalid account address '{}' for coin {}: {}",
895                    update.account,
896                    update.coin,
897                    e
898                );
899                return;
900            }
901        };
902
903        let position_key = canonical_perp_symbol(&update.coin);
904
905        // Reject stale updates: if we already have a newer timestamp for this
906        // (wallet, symbol) pair, skip. This prevents REST-polled fallback data
907        // from overwriting fresher WS-fed Hydromancer data.
908        {
909            let mut ts_map = self.perp_position_timestamps.write().await;
910            let ts_key = (wallet_address, position_key.clone());
911            if let Some(&prev_ts) = ts_map.get(&ts_key) {
912                if update.timestamp < prev_ts {
913                    debug!(
914                        "Dropping stale perp update for {}/{}: ts {} < prev {}",
915                        update.account, position_key, update.timestamp, prev_ts
916                    );
917                    return;
918                }
919            }
920            ts_map.insert(ts_key, update.timestamp);
921        }
922
923        let mut portfolios = self.portfolios.write().await;
924
925        let portfolio = portfolios
926            .entry(wallet_address)
927            .or_insert_with(|| PortfolioBalance {
928                positions: HashMap::new(),
929                total_margin_used: dec!(0),
930            });
931        let raw_position_key = update.coin.trim().to_ascii_uppercase();
932
933        if raw_position_key != position_key {
934            portfolio.positions.remove(&raw_position_key);
935        }
936        if update.coin != raw_position_key && update.coin != position_key {
937            portfolio.positions.remove(&update.coin);
938        }
939
940        if update.size == 0.0 {
941            // Position closed - remove from map
942            portfolio.positions.remove(&position_key);
943            debug!(
944                "Removed hypercore position {} for account {}",
945                position_key, update.account
946            );
947        } else {
948            // Set/update position - convert f64 to Decimal
949            portfolio.positions.insert(
950                position_key.clone(),
951                PositionData {
952                    symbol: position_key.clone(),
953                    amount: Decimal::from_f64_retain(update.size).unwrap_or(Decimal::ZERO),
954                    entry_price: Decimal::from_f64_retain(update.entry_price)
955                        .unwrap_or(Decimal::ZERO),
956                    margin_posted: dec!(0), // Hypercore positions have their own margin
957                    realized_pnl: dec!(0),  // Not tracked for hypercore positions
958                    unrealized_pnl: Decimal::from_f64_retain(update.unrealized_pnl)
959                        .unwrap_or(Decimal::ZERO),
960                },
961            );
962            debug!(
963                "Set hypercore position {} for account {}: size={}, entry_price={}, unrealized_pnl={}",
964                position_key, update.account, update.size, update.entry_price, update.unrealized_pnl
965            );
966        }
967    }
968
969    fn as_any(&self) -> &dyn std::any::Any {
970        self
971    }
972
973    async fn calculate_fill_accounting(
974        &self,
975        fill: &hypercall_types::Fill,
976    ) -> Result<FillAccounting, PortfolioError> {
977        // Delegate to the inherent method
978        PortfolioServiceImpl::calculate_fill_accounting(self, fill).await
979    }
980
981    async fn apply_fill_to_memory(
982        &self,
983        wallet: &WalletAddress,
984        symbol: &str,
985        side: &Side,
986        price: Decimal,
987        quantity: Decimal,
988    ) {
989        // Delegate to the inherent method
990        PortfolioServiceImpl::apply_fill_to_memory(self, wallet, symbol, side, price, quantity)
991            .await
992    }
993
994    async fn remove_expired_position(&self, wallet: &WalletAddress, symbol: &str) {
995        let mut portfolios = self.portfolios.write().await;
996
997        if let Some(portfolio) = portfolios.get_mut(wallet) {
998            if portfolio.positions.remove(symbol).is_some() {
999                info!(
1000                    "PortfolioService: Removed expired position {} for wallet {} (replay cleanup, no ledger credit)",
1001                    symbol, wallet
1002                );
1003            } else {
1004                debug!(
1005                    "PortfolioService: Position {} already removed for wallet {} (replay cleanup)",
1006                    symbol, wallet
1007                );
1008            }
1009        } else {
1010            debug!(
1011                "PortfolioService: Portfolio not found for wallet {} during replay cleanup",
1012                wallet
1013            );
1014        }
1015    }
1016}
1017
1018// Implement Snapshotable for use with generic snapshot infrastructure
1019#[async_trait]
1020impl Snapshotable for PortfolioServiceImpl {
1021    type Key = WalletAddress;
1022    type State = PortfolioBalance;
1023
1024    async fn list_all(&self) -> Result<HashMap<Self::Key, Self::State>, SnapshotError> {
1025        Ok(self.all_portfolios().await)
1026    }
1027
1028    async fn restore(&self, key: &Self::Key, state: Self::State) -> Result<(), SnapshotError> {
1029        let mut portfolios = self.portfolios.write().await;
1030        portfolios.insert(*key, state);
1031        Ok(())
1032    }
1033
1034    async fn clear_all(&self) -> Result<(), SnapshotError> {
1035        let mut portfolios = self.portfolios.write().await;
1036        portfolios.clear();
1037        Ok(())
1038    }
1039}
1040
1041#[cfg(test)]
1042mod tests {
1043    use super::*;
1044    use anyhow::Result;
1045    use hypercall_types::Fill;
1046    use hypercall_types::MarginMode as TypeMarginMode;
1047    use hypercall_types::WalletAddress;
1048    use hypercall_types::CONTRACT_UNIT_MULTIPLIER;
1049
1050    use hypercall_types::wallet_address::test_wallet;
1051    use std::sync::atomic::{AtomicU64, Ordering};
1052
1053    /// Auto-incrementing trade ID for test isolation
1054    static NEXT_TRADE_ID: AtomicU64 = AtomicU64::new(1);
1055
1056    fn create_service() -> PortfolioServiceImpl {
1057        PortfolioServiceImpl::new()
1058    }
1059
1060    struct NoopTierDb;
1061
1062    impl hypercall_db::TierReader for NoopTierDb {
1063        fn get_margin_mode_sync(&self, _wallet: &WalletAddress) -> Result<TypeMarginMode> {
1064            Ok(TypeMarginMode::Standard)
1065        }
1066
1067        fn get_existing_margin_mode_sync(
1068            &self,
1069            _wallet: &WalletAddress,
1070        ) -> Result<Option<TypeMarginMode>> {
1071            Ok(None)
1072        }
1073
1074        fn get_tier_defaults_sync(
1075            &self,
1076            _tier_name: &str,
1077        ) -> Result<Option<hypercall_db::TierDefaultsRecord>> {
1078            Ok(None)
1079        }
1080
1081        fn get_user_tier_sync(
1082            &self,
1083            _wallet: &WalletAddress,
1084        ) -> Result<Option<hypercall_db::UserTierRecord>> {
1085            Ok(None)
1086        }
1087
1088        fn get_all_user_tiers_sync(&self) -> Result<Vec<hypercall_db::UserTierRecord>> {
1089            Ok(Vec::new())
1090        }
1091    }
1092
1093    impl hypercall_db::TierWriter for NoopTierDb {
1094        fn save_user_tier_sync(&self, _update: &hypercall_db::UserTierUpdate) -> Result<()> {
1095            Ok(())
1096        }
1097
1098        fn set_margin_mode_sync(
1099            &self,
1100            _wallet: &WalletAddress,
1101            _margin_mode: TypeMarginMode,
1102        ) -> Result<i64> {
1103            Ok(1)
1104        }
1105
1106        fn insert_margin_mode_if_missing_sync(
1107            &self,
1108            _wallet: &WalletAddress,
1109            _margin_mode: TypeMarginMode,
1110        ) -> Result<Option<i64>> {
1111            Ok(Some(1))
1112        }
1113
1114        fn delete_user_tier_sync(&self, _wallet: &WalletAddress) -> Result<()> {
1115            Ok(())
1116        }
1117    }
1118
1119    async fn attach_tier_cache(
1120        service: &PortfolioServiceImpl,
1121        modes: &[(WalletAddress, TypeMarginMode)],
1122    ) {
1123        let tier_cache = Arc::new(TierCache::new(Arc::new(NoopTierDb)).unwrap());
1124        for (wallet, mode) in modes {
1125            tier_cache.set_margin_mode_in_memory(wallet, *mode).await;
1126        }
1127        service.set_tier_cache(tier_cache).await;
1128    }
1129
1130    fn fill_from_event(event: &EngineMessage) -> Fill {
1131        match event {
1132            EngineMessage::OrderFilled { fill, .. } => fill.clone(),
1133            _ => panic!("expected OrderFilled event"),
1134        }
1135    }
1136
1137    #[tokio::test]
1138    async fn option_custody_delta_updates_read_model_without_cash_effects() {
1139        let service = create_service();
1140        let wallet = test_wallet(231);
1141        let symbol = "BTC-20261231-100000-C";
1142
1143        service
1144            .apply_option_custody_delta(&wallet, symbol, dec!(2))
1145            .await;
1146        let balance = service
1147            .get_portfolio_balance(&wallet)
1148            .await
1149            .expect("portfolio should exist after option deposit");
1150        let position = balance
1151            .positions
1152            .get(symbol)
1153            .expect("position should exist");
1154        assert_eq!(position.amount, dec!(2));
1155        assert_eq!(position.entry_price, dec!(0));
1156
1157        service
1158            .apply_option_custody_delta(&wallet, symbol, dec!(-1))
1159            .await;
1160        let balance = service
1161            .get_portfolio_balance(&wallet)
1162            .await
1163            .expect("portfolio should still exist after partial withdrawal");
1164        assert_eq!(balance.positions[symbol].amount, dec!(1));
1165
1166        service
1167            .apply_option_custody_delta(&wallet, symbol, dec!(-1))
1168            .await;
1169        let balance = service
1170            .get_portfolio_balance(&wallet)
1171            .await
1172            .expect("portfolio should remain for empty account");
1173        assert!(
1174            !balance.positions.contains_key(symbol),
1175            "withdrawal to zero should remove the read-model position"
1176        );
1177    }
1178
1179    /// Create a fill with an auto-incremented trade_id (for unique fills)
1180    fn create_fill(
1181        wallet: WalletAddress,
1182        symbol: &str,
1183        side: Side,
1184        price: f64,
1185        quantity: f64,
1186    ) -> EngineMessage {
1187        let trade_id = NEXT_TRADE_ID.fetch_add(1, Ordering::Relaxed);
1188        create_fill_with_trade_id(trade_id, wallet, symbol, side, price, quantity)
1189    }
1190
1191    /// Create a fill with a specific trade_id (for idempotency testing)
1192    fn create_fill_with_trade_id(
1193        trade_id: u64,
1194        wallet: WalletAddress,
1195        symbol: &str,
1196        side: Side,
1197        price: f64,
1198        quantity: f64,
1199    ) -> EngineMessage {
1200        let price_dec = Decimal::from_f64_retain(price).unwrap_or(dec!(0));
1201        let size_dec =
1202            Decimal::from_f64_retain(quantity * CONTRACT_UNIT_MULTIPLIER).unwrap_or(dec!(0));
1203        let fill = Fill {
1204            trade_id,
1205            taker_order_id: 0,
1206            maker_order_id: 0,
1207            symbol: symbol.to_string(),
1208            price: price_dec,
1209            size: size_dec,
1210            taker_side: side,
1211            taker_wallet_address: wallet,
1212            maker_wallet_address: test_wallet(200),
1213            fee: dec!(0),
1214            is_taker: true,
1215            timestamp: 0,
1216            builder_code_address: None,
1217            builder_code_fee: None,
1218            source: Default::default(),
1219            taker_realized_pnl: None,
1220            maker_realized_pnl: None,
1221            underlying_notional: None,
1222        };
1223        EngineMessage::OrderFilled {
1224            accounting: hypercall_engine::FillAccounting::from_fill(&fill),
1225            fill,
1226        }
1227    }
1228
1229    #[tokio::test]
1230    async fn calculate_fill_accounting_standard_buy_uses_premium_only() {
1231        let service = create_service();
1232        let taker = test_wallet(1);
1233        let maker = test_wallet(200);
1234        attach_tier_cache(
1235            &service,
1236            &[
1237                (taker, TypeMarginMode::Standard),
1238                (maker, TypeMarginMode::Standard),
1239            ],
1240        )
1241        .await;
1242        let fill = fill_from_event(&create_fill_with_trade_id(
1243            1001,
1244            taker,
1245            "BTC-20261231-100000-C",
1246            Side::Buy,
1247            100.0,
1248            2.0,
1249        ));
1250
1251        let accounting = service.calculate_fill_accounting(&fill).await.unwrap();
1252
1253        assert_eq!(accounting.taker_premium_delta(), dec!(-200));
1254        assert_eq!(accounting.maker_premium_delta(), dec!(200));
1255        assert_eq!(accounting.taker_ledger_residual_delta(), Decimal::ZERO);
1256        assert_eq!(accounting.maker_ledger_residual_delta(), Decimal::ZERO);
1257        assert_eq!(accounting.taker_net_cash_delta(), dec!(-200));
1258        assert_eq!(accounting.maker_net_cash_delta(), dec!(200));
1259    }
1260
1261    #[tokio::test]
1262    async fn calculate_fill_accounting_standard_sell_uses_premium_only() {
1263        let service = create_service();
1264        let taker = test_wallet(2);
1265        let maker = test_wallet(200);
1266        attach_tier_cache(
1267            &service,
1268            &[
1269                (taker, TypeMarginMode::Standard),
1270                (maker, TypeMarginMode::Standard),
1271            ],
1272        )
1273        .await;
1274        let fill = fill_from_event(&create_fill_with_trade_id(
1275            1002,
1276            taker,
1277            "BTC-20261231-100000-C",
1278            Side::Sell,
1279            100.0,
1280            2.0,
1281        ));
1282
1283        let accounting = service.calculate_fill_accounting(&fill).await.unwrap();
1284
1285        assert_eq!(accounting.taker_premium_delta(), dec!(200));
1286        assert_eq!(accounting.maker_premium_delta(), dec!(-200));
1287        assert_eq!(accounting.taker_ledger_residual_delta(), Decimal::ZERO);
1288        assert_eq!(accounting.maker_ledger_residual_delta(), Decimal::ZERO);
1289        assert_eq!(accounting.taker_net_cash_delta(), dec!(200));
1290        assert_eq!(accounting.maker_net_cash_delta(), dec!(-200));
1291    }
1292
1293    #[tokio::test]
1294    async fn calculate_fill_accounting_standard_close_preserves_reported_pnl() {
1295        let service = create_service();
1296        let taker = test_wallet(5);
1297        let maker = test_wallet(200);
1298        attach_tier_cache(
1299            &service,
1300            &[
1301                (taker, TypeMarginMode::Standard),
1302                (maker, TypeMarginMode::Standard),
1303            ],
1304        )
1305        .await;
1306        service
1307            .apply_fill_to_memory(
1308                &taker,
1309                "BTC-20261231-100000-C",
1310                &Side::Buy,
1311                dec!(100),
1312                dec!(2),
1313            )
1314            .await;
1315        let fill = fill_from_event(&create_fill_with_trade_id(
1316            1005,
1317            taker,
1318            "BTC-20261231-100000-C",
1319            Side::Sell,
1320            70.0,
1321            1.0,
1322        ));
1323
1324        let accounting = service.calculate_fill_accounting(&fill).await.unwrap();
1325
1326        assert_eq!(accounting.taker_realized_pnl, dec!(-30));
1327        assert_eq!(accounting.taker_premium_delta(), dec!(70));
1328        assert_eq!(accounting.taker_ledger_residual_delta(), Decimal::ZERO);
1329        assert_eq!(accounting.taker_net_cash_delta(), dec!(70));
1330    }
1331
1332    #[tokio::test]
1333    async fn calculate_fill_accounting_portfolio_mode_carries_realized_pnl() {
1334        let service = create_service();
1335        let taker = test_wallet(3);
1336        let maker = test_wallet(200);
1337        attach_tier_cache(
1338            &service,
1339            &[
1340                (taker, TypeMarginMode::Portfolio),
1341                (maker, TypeMarginMode::Portfolio),
1342            ],
1343        )
1344        .await;
1345        service
1346            .apply_fill_to_memory(
1347                &taker,
1348                "BTC-20261231-100000-C",
1349                &Side::Buy,
1350                dec!(100),
1351                dec!(2),
1352            )
1353            .await;
1354        let fill = fill_from_event(&create_fill_with_trade_id(
1355            1003,
1356            taker,
1357            "BTC-20261231-100000-C",
1358            Side::Sell,
1359            70.0,
1360            1.0,
1361        ));
1362
1363        let accounting = service.calculate_fill_accounting(&fill).await.unwrap();
1364
1365        assert_eq!(accounting.taker_premium_delta(), Decimal::ZERO);
1366        assert_eq!(accounting.taker_realized_pnl, dec!(-30));
1367        assert_eq!(accounting.taker_ledger_residual_delta(), dec!(-30));
1368        assert_eq!(accounting.taker_net_cash_delta(), dec!(-30));
1369        assert_eq!(accounting.maker_net_cash_delta(), Decimal::ZERO);
1370    }
1371
1372    #[tokio::test]
1373    async fn calculate_fill_accounting_without_position_has_zero_realized_pnl() {
1374        let service = create_service();
1375        let fill = fill_from_event(&create_fill_with_trade_id(
1376            1004,
1377            test_wallet(4),
1378            "BTC-PERP",
1379            Side::Sell,
1380            70.0,
1381            1.0,
1382        ));
1383
1384        let accounting = service.calculate_fill_accounting(&fill).await.unwrap();
1385
1386        assert_eq!(accounting, hypercall_types::FillAccounting::zero(1004));
1387    }
1388
1389    #[tokio::test]
1390    async fn test_handle_buy_fill() {
1391        let service = create_service();
1392
1393        let event = create_fill(test_wallet(1), "BTC-CALL-100000", Side::Buy, 1000.0, 10.0);
1394        service.apply_event(&event).await.unwrap();
1395
1396        let portfolio = service.get_portfolio(&test_wallet(1)).await;
1397        assert_eq!(portfolio.wallet_address, test_wallet(1));
1398        assert_eq!(portfolio.positions.len(), 1);
1399        let position = &portfolio.positions[0].position;
1400        assert_eq!(position.symbol, "BTC-CALL-100000");
1401        assert_eq!(position.amount, dec!(10));
1402        assert_eq!(position.entry_price, dec!(1000));
1403    }
1404
1405    #[tokio::test]
1406    async fn test_handle_sell_fill() {
1407        let service = create_service();
1408
1409        // First buy some
1410        let buy_event = create_fill(test_wallet(1), "BTC-CALL-100000", Side::Buy, 1000.0, 10.0);
1411        service.apply_event(&buy_event).await.unwrap();
1412
1413        // Then sell half
1414        let sell_event = create_fill(test_wallet(1), "BTC-CALL-100000", Side::Sell, 1100.0, 5.0);
1415        service.apply_event(&sell_event).await.unwrap();
1416
1417        let portfolio = service.get_portfolio(&test_wallet(1)).await;
1418        assert_eq!(portfolio.positions.len(), 1);
1419        let position = &portfolio.positions[0].position;
1420        assert_eq!(position.amount, dec!(5)); // 10 - 5
1421    }
1422
1423    #[tokio::test]
1424    async fn test_handle_full_position_close() {
1425        let service = create_service();
1426
1427        // Buy position
1428        let buy_event = create_fill(test_wallet(1), "BTC-CALL-100000", Side::Buy, 1000.0, 10.0);
1429        service.apply_event(&buy_event).await.unwrap();
1430
1431        // Sell entire position
1432        let sell_event = create_fill(test_wallet(1), "BTC-CALL-100000", Side::Sell, 1100.0, 10.0);
1433        service.apply_event(&sell_event).await.unwrap();
1434
1435        // Check position was closed
1436        let portfolio = service.get_portfolio(&test_wallet(1)).await;
1437        assert_eq!(portfolio.positions.len(), 0);
1438        assert_eq!(portfolio.total_margin_used, dec!(0));
1439    }
1440
1441    #[tokio::test]
1442    async fn test_handle_short_selling() {
1443        let service = create_service();
1444
1445        // Short sell (sell without existing position)
1446        let event = create_fill(test_wallet(1), "BTC-CALL-100000", Side::Sell, 1000.0, 10.0);
1447        service.apply_event(&event).await.unwrap();
1448
1449        // Check short position was created
1450        let portfolio = service.get_portfolio(&test_wallet(1)).await;
1451        assert_eq!(portfolio.positions.len(), 1);
1452        let position = &portfolio.positions[0].position;
1453        assert_eq!(position.amount, dec!(-10)); // Negative for short
1454
1455        // NOTE: margin_posted and total_margin_used are deprecated (always 0.0)
1456        // Real margin comes from SPAN via SpanMarginSummary
1457    }
1458
1459    #[tokio::test]
1460    async fn test_buy_to_close_short_partial() {
1461        let service = create_service();
1462
1463        // Short sell 10 contracts at 1000
1464        let short_event = create_fill(test_wallet(1), "BTC-CALL-100000", Side::Sell, 1000.0, 10.0);
1465        service.apply_event(&short_event).await.unwrap();
1466
1467        // Buy 5 contracts at 900 to close half the short (profit)
1468        let buy_event = create_fill(test_wallet(1), "BTC-CALL-100000", Side::Buy, 900.0, 5.0);
1469        service.apply_event(&buy_event).await.unwrap();
1470
1471        // Check internal state directly for realized PnL
1472        let portfolios = service.all_portfolios().await;
1473        let wallet_portfolio = portfolios.get(&test_wallet(1)).unwrap();
1474        let position = wallet_portfolio.positions.get("BTC-CALL-100000").unwrap();
1475
1476        // Should have -5 contracts remaining (closed 5 of the 10 short)
1477        assert_eq!(position.amount, dec!(-5));
1478
1479        // Realized PnL should be (1000 - 900) * 5 = 500 (profit on closing short at lower price)
1480        assert_eq!(position.realized_pnl, dec!(500));
1481
1482        // NOTE: margin_posted is deprecated - real margin comes from SPAN
1483    }
1484
1485    #[tokio::test]
1486    async fn test_extend_short_position() {
1487        let service = create_service();
1488
1489        // Short sell 10 contracts at 1000
1490        let short_event = create_fill(test_wallet(1), "BTC-CALL-100000", Side::Sell, 1000.0, 10.0);
1491        service.apply_event(&short_event).await.unwrap();
1492
1493        let portfolio = service.get_portfolio(&test_wallet(1)).await;
1494        let position = &portfolio.positions[0].position;
1495        assert_eq!(position.amount, dec!(-10));
1496        assert_eq!(position.entry_price, dec!(1000));
1497
1498        // Sell 5 more contracts at 1100 (extending the short)
1499        let extend_event = create_fill(test_wallet(1), "BTC-CALL-100000", Side::Sell, 1100.0, 5.0);
1500        service.apply_event(&extend_event).await.unwrap();
1501
1502        // Check position
1503        let portfolio = service.get_portfolio(&test_wallet(1)).await;
1504        let position = &portfolio.positions[0].position;
1505
1506        // Should have -15 contracts total
1507        assert_eq!(position.amount, dec!(-15));
1508
1509        // Entry price should be weighted average: (1000*10 + 1100*5) / 15 = 1033.33...
1510        let expected_entry = (dec!(1000) * dec!(10) + dec!(1100) * dec!(5)) / dec!(15);
1511        assert!((position.entry_price - expected_entry).abs() < dec!(0.01));
1512
1513        // NOTE: margin_posted is deprecated - real margin comes from SPAN
1514    }
1515
1516    #[tokio::test]
1517    async fn test_buy_to_close_short_full() {
1518        let service = create_service();
1519
1520        // Short sell 10 contracts at 1000
1521        let short_event = create_fill(test_wallet(1), "BTC-CALL-100000", Side::Sell, 1000.0, 10.0);
1522        service.apply_event(&short_event).await.unwrap();
1523
1524        // Buy 10 contracts at 1100 to close the entire short (loss)
1525        let buy_event = create_fill(test_wallet(1), "BTC-CALL-100000", Side::Buy, 1100.0, 10.0);
1526        service.apply_event(&buy_event).await.unwrap();
1527
1528        // Check portfolio
1529        let portfolio = service.get_portfolio(&test_wallet(1)).await;
1530
1531        // Position should be closed
1532        assert_eq!(portfolio.positions.len(), 0);
1533    }
1534
1535    #[tokio::test]
1536    async fn test_sell_to_close_long_with_profit() {
1537        let service = create_service();
1538
1539        // Buy 10 contracts at 1000
1540        let buy_event = create_fill(test_wallet(1), "BTC-CALL-100000", Side::Buy, 1000.0, 10.0);
1541        service.apply_event(&buy_event).await.unwrap();
1542
1543        // Sell 5 contracts at 1200 (profit)
1544        let sell_event = create_fill(test_wallet(1), "BTC-CALL-100000", Side::Sell, 1200.0, 5.0);
1545        service.apply_event(&sell_event).await.unwrap();
1546
1547        // Check internal state directly for realized PnL
1548        let portfolios = service.all_portfolios().await;
1549        let wallet_portfolio = portfolios.get(&test_wallet(1)).unwrap();
1550        let position = wallet_portfolio.positions.get("BTC-CALL-100000").unwrap();
1551
1552        // Should have 5 contracts remaining
1553        assert_eq!(position.amount, dec!(5));
1554
1555        // Realized PnL should be (1200 - 1000) * 5 = 1000 (profit)
1556        assert_eq!(position.realized_pnl, dec!(1000));
1557    }
1558
1559    #[tokio::test]
1560    async fn test_sell_to_close_long_with_loss() {
1561        let service = create_service();
1562
1563        // Buy 10 contracts at 1000
1564        let buy_event = create_fill(test_wallet(1), "BTC-CALL-100000", Side::Buy, 1000.0, 10.0);
1565        service.apply_event(&buy_event).await.unwrap();
1566
1567        // Sell 10 contracts at 800 (loss)
1568        let sell_event = create_fill(test_wallet(1), "BTC-CALL-100000", Side::Sell, 800.0, 10.0);
1569        service.apply_event(&sell_event).await.unwrap();
1570
1571        // Check portfolio - position should be closed
1572        let portfolio = service.get_portfolio(&test_wallet(1)).await;
1573        assert_eq!(portfolio.positions.len(), 0);
1574    }
1575
1576    #[tokio::test]
1577    async fn test_multiple_fills_cumulative_realized_pnl() {
1578        let service = create_service();
1579
1580        // Buy 10 contracts at 1000
1581        let buy_event = create_fill(test_wallet(1), "BTC-CALL-100000", Side::Buy, 1000.0, 10.0);
1582        service.apply_event(&buy_event).await.unwrap();
1583
1584        // Sell 5 contracts at 1100 (profit of 500)
1585        let sell1_event = create_fill(test_wallet(1), "BTC-CALL-100000", Side::Sell, 1100.0, 5.0);
1586        service.apply_event(&sell1_event).await.unwrap();
1587
1588        {
1589            let portfolios = service.all_portfolios().await;
1590            let wallet_portfolio = portfolios.get(&test_wallet(1)).unwrap();
1591            let position = wallet_portfolio.positions.get("BTC-CALL-100000").unwrap();
1592            assert_eq!(position.realized_pnl, dec!(500));
1593        }
1594
1595        // Sell 3 more contracts at 1200 (profit of 600)
1596        let sell2_event = create_fill(test_wallet(1), "BTC-CALL-100000", Side::Sell, 1200.0, 3.0);
1597        service.apply_event(&sell2_event).await.unwrap();
1598
1599        {
1600            let portfolios = service.all_portfolios().await;
1601            let wallet_portfolio = portfolios.get(&test_wallet(1)).unwrap();
1602            let position = wallet_portfolio.positions.get("BTC-CALL-100000").unwrap();
1603
1604            // Cumulative realized PnL should be 500 + 600 = 1100
1605            assert_eq!(position.realized_pnl, dec!(1100));
1606            assert_eq!(position.amount, dec!(2)); // 10 - 5 - 3 = 2 remaining
1607        }
1608    }
1609
1610    #[tokio::test]
1611    async fn test_multiple_fills_weighted_average() {
1612        let service = create_service();
1613
1614        // Multiple buys at different prices
1615        let buy1_event = create_fill(test_wallet(1), "BTC-CALL-100000", Side::Buy, 1000.0, 10.0);
1616        service.apply_event(&buy1_event).await.unwrap();
1617
1618        let buy2_event = create_fill(test_wallet(1), "BTC-CALL-100000", Side::Buy, 1100.0, 10.0);
1619        service.apply_event(&buy2_event).await.unwrap();
1620
1621        // Check weighted average price
1622        let portfolio = service.get_portfolio(&test_wallet(1)).await;
1623        let position = &portfolio.positions[0].position;
1624        assert_eq!(position.amount, dec!(20));
1625        assert_eq!(position.entry_price, dec!(1050)); // (1000*10 + 1100*10) / 20
1626    }
1627
1628    #[tokio::test]
1629    async fn test_order_filled_message_quantity_is_converted_from_contract_units() {
1630        let service = create_service();
1631
1632        // Create a fill message with contract units (like real production)
1633        let fill = Fill {
1634            trade_id: 1,
1635            taker_order_id: 100,
1636            maker_order_id: 101,
1637            symbol: "BTC-20260131-100000-C".to_string(),
1638            price: dec!(2000),      // $2000 per contract
1639            size: dec!(10_000_000), // 10 contracts in CONTRACT UNITS
1640            taker_side: Side::Buy,
1641            taker_wallet_address: test_wallet(1),
1642            maker_wallet_address: test_wallet(2),
1643            fee: dec!(1),
1644            is_taker: true,
1645            timestamp: chrono::Utc::now().timestamp() as u64,
1646            builder_code_address: None,
1647            builder_code_fee: None,
1648            source: Default::default(),
1649            taker_realized_pnl: None,
1650            maker_realized_pnl: None,
1651            underlying_notional: None,
1652        };
1653
1654        // Process through apply_event
1655        service
1656            .apply_event(&EngineMessage::OrderFilled {
1657                accounting: hypercall_engine::FillAccounting::from_fill(&fill),
1658                fill,
1659            })
1660            .await
1661            .unwrap();
1662
1663        // Verify taker portfolio
1664        let portfolio = service.get_portfolio(&test_wallet(1)).await;
1665
1666        // Verify position quantity
1667        assert_eq!(portfolio.positions.len(), 1);
1668        let position = &portfolio.positions[0].position;
1669        assert_eq!(position.amount, dec!(10), "Position should be 10 contracts");
1670    }
1671
1672    #[tokio::test]
1673    async fn test_flip_long_to_short() {
1674        let service = create_service();
1675
1676        // Buy 10 contracts (long position)
1677        service
1678            .handle_fill(
1679                &test_wallet(1),
1680                "BTC-CALL-100000",
1681                Side::Buy,
1682                dec!(1000),
1683                dec!(10),
1684            )
1685            .await;
1686
1687        let portfolio = service.get_portfolio(&test_wallet(1)).await;
1688        assert_eq!(portfolio.positions.len(), 1);
1689        assert_eq!(portfolio.positions[0].position.amount, dec!(10));
1690
1691        // Sell 15 contracts (close 10 long, open 5 short)
1692        service
1693            .handle_fill(
1694                &test_wallet(1),
1695                "BTC-CALL-100000",
1696                Side::Sell,
1697                dec!(1100),
1698                dec!(15),
1699            )
1700            .await;
1701
1702        // Check position after flip
1703        let portfolio = service.get_portfolio(&test_wallet(1)).await;
1704        assert_eq!(portfolio.positions.len(), 1);
1705
1706        let position = &portfolio.positions[0].position;
1707        assert_eq!(position.amount, dec!(-5), "Should have 5 short contracts");
1708        assert_eq!(
1709            position.entry_price,
1710            dec!(1100),
1711            "Entry price should be the flip price"
1712        );
1713
1714        // NOTE: margin_posted is deprecated - real margin comes from SPAN
1715    }
1716
1717    #[tokio::test]
1718    async fn test_flip_short_to_long() {
1719        let service = create_service();
1720
1721        // Sell 10 contracts (short position)
1722        service
1723            .handle_fill(
1724                &test_wallet(1),
1725                "BTC-CALL-100000",
1726                Side::Sell,
1727                dec!(1000),
1728                dec!(10),
1729            )
1730            .await;
1731
1732        let portfolio = service.get_portfolio(&test_wallet(1)).await;
1733        assert_eq!(portfolio.positions.len(), 1);
1734        assert_eq!(portfolio.positions[0].position.amount, dec!(-10));
1735
1736        // Buy 15 contracts (close 10 short, open 5 long)
1737        service
1738            .handle_fill(
1739                &test_wallet(1),
1740                "BTC-CALL-100000",
1741                Side::Buy,
1742                dec!(900),
1743                dec!(15),
1744            )
1745            .await;
1746
1747        // Check position after flip
1748        let portfolio = service.get_portfolio(&test_wallet(1)).await;
1749        assert_eq!(portfolio.positions.len(), 1);
1750
1751        let position = &portfolio.positions[0].position;
1752        assert_eq!(position.amount, dec!(5), "Should have 5 long contracts");
1753        assert_eq!(
1754            position.entry_price,
1755            dec!(900),
1756            "Entry price should be the flip price"
1757        );
1758
1759        // NOTE: margin_posted is deprecated - real margin comes from SPAN
1760    }
1761
1762    // ============================================================================
1763    // Ledger integration tests
1764    // ============================================================================
1765
1766    #[tokio::test]
1767    async fn test_ledger_not_touched_on_open() {
1768        use crate::rsm::ledger::InMemoryLedger;
1769
1770        let service = PortfolioServiceImpl::new();
1771        let ledger = Arc::new(InMemoryLedger::new());
1772        service.set_ledger(ledger.clone()).await;
1773
1774        // Set initial ledger balance
1775        ledger
1776            .set_balance(&test_wallet(1), dec!(10000))
1777            .await
1778            .unwrap();
1779
1780        // Open a position (buy call)
1781        let event = create_fill(test_wallet(1), "BTC-CALL-100000", Side::Buy, 1000.0, 10.0);
1782        service.apply_event(&event).await.unwrap();
1783
1784        // Ledger balance should NOT change (opening position doesn't touch ledger)
1785        let balance = ledger.get_balance(&test_wallet(1)).await.unwrap();
1786        assert_eq!(
1787            balance,
1788            dec!(10000),
1789            "Opening position should not change ledger balance"
1790        );
1791    }
1792
1793    #[tokio::test]
1794    async fn test_ledger_increases_on_profitable_close() {
1795        use crate::rsm::ledger::InMemoryLedger;
1796
1797        let service = PortfolioServiceImpl::new();
1798        let ledger = Arc::new(InMemoryLedger::new());
1799        service.set_ledger(ledger.clone()).await;
1800
1801        // Set initial ledger balance
1802        ledger
1803            .set_balance(&test_wallet(1), dec!(10000))
1804            .await
1805            .unwrap();
1806
1807        // Buy 10 contracts at 1000
1808        let buy_event = create_fill(test_wallet(1), "BTC-CALL-100000", Side::Buy, 1000.0, 10.0);
1809        service.apply_event(&buy_event).await.unwrap();
1810
1811        // Balance unchanged after buy
1812        let balance_after_buy = ledger.get_balance(&test_wallet(1)).await.unwrap();
1813        assert_eq!(
1814            balance_after_buy,
1815            dec!(10000),
1816            "Balance should be unchanged after opening"
1817        );
1818
1819        // Sell 10 contracts at 1500 (profit of 500 per contract = 5000 total)
1820        let sell_event = create_fill(test_wallet(1), "BTC-CALL-100000", Side::Sell, 1500.0, 10.0);
1821        service.apply_event(&sell_event).await.unwrap();
1822
1823        // Ledger balance should increase by realized profit
1824        let balance_after_sell = ledger.get_balance(&test_wallet(1)).await.unwrap();
1825        assert_eq!(
1826            balance_after_sell,
1827            dec!(15000),
1828            "Balance should increase by realized profit (5000)"
1829        );
1830    }
1831
1832    #[tokio::test]
1833    async fn test_ledger_decreases_on_losing_close() {
1834        use crate::rsm::ledger::InMemoryLedger;
1835
1836        let service = PortfolioServiceImpl::new();
1837        let ledger = Arc::new(InMemoryLedger::new());
1838        service.set_ledger(ledger.clone()).await;
1839
1840        // Set initial ledger balance
1841        ledger
1842            .set_balance(&test_wallet(1), dec!(10000))
1843            .await
1844            .unwrap();
1845
1846        // Buy 10 contracts at 1000
1847        let buy_event = create_fill(test_wallet(1), "BTC-CALL-100000", Side::Buy, 1000.0, 10.0);
1848        service.apply_event(&buy_event).await.unwrap();
1849
1850        // Sell 10 contracts at 800 (loss of 200 per contract = 2000 total)
1851        let sell_event = create_fill(test_wallet(1), "BTC-CALL-100000", Side::Sell, 800.0, 10.0);
1852        service.apply_event(&sell_event).await.unwrap();
1853
1854        // Ledger balance should decrease by realized loss
1855        let balance_after_sell = ledger.get_balance(&test_wallet(1)).await.unwrap();
1856        assert_eq!(
1857            balance_after_sell,
1858            dec!(8000),
1859            "Balance should decrease by realized loss (2000)"
1860        );
1861    }
1862
1863    #[tokio::test]
1864    async fn test_ledger_on_short_close() {
1865        use crate::rsm::ledger::InMemoryLedger;
1866
1867        let service = PortfolioServiceImpl::new();
1868        let ledger = Arc::new(InMemoryLedger::new());
1869        service.set_ledger(ledger.clone()).await;
1870
1871        // Set initial ledger balance
1872        ledger
1873            .set_balance(&test_wallet(1), dec!(10000))
1874            .await
1875            .unwrap();
1876
1877        // Short sell 10 contracts at 1000
1878        let sell_event = create_fill(test_wallet(1), "BTC-CALL-100000", Side::Sell, 1000.0, 10.0);
1879        service.apply_event(&sell_event).await.unwrap();
1880
1881        // Balance unchanged after short (opening)
1882        let balance_after_short = ledger.get_balance(&test_wallet(1)).await.unwrap();
1883        assert_eq!(
1884            balance_after_short,
1885            dec!(10000),
1886            "Balance should be unchanged after opening short"
1887        );
1888
1889        // Buy to close at 900 (profit of 100 per contract = 1000 total)
1890        let buy_event = create_fill(test_wallet(1), "BTC-CALL-100000", Side::Buy, 900.0, 10.0);
1891        service.apply_event(&buy_event).await.unwrap();
1892
1893        // Ledger balance should increase by realized profit from closing short
1894        let balance_after_close = ledger.get_balance(&test_wallet(1)).await.unwrap();
1895        assert_eq!(
1896            balance_after_close,
1897            dec!(11000),
1898            "Balance should increase by profit from closing short (1000)"
1899        );
1900    }
1901
1902    #[tokio::test]
1903    async fn test_partial_close_applies_partial_pnl() {
1904        use crate::rsm::ledger::InMemoryLedger;
1905
1906        let service = PortfolioServiceImpl::new();
1907        let ledger = Arc::new(InMemoryLedger::new());
1908        service.set_ledger(ledger.clone()).await;
1909
1910        // Set initial ledger balance
1911        ledger
1912            .set_balance(&test_wallet(1), dec!(10000))
1913            .await
1914            .unwrap();
1915
1916        // Buy 10 contracts at 1000
1917        let buy_event = create_fill(test_wallet(1), "BTC-CALL-100000", Side::Buy, 1000.0, 10.0);
1918        service.apply_event(&buy_event).await.unwrap();
1919
1920        // Sell 5 contracts at 1200 (profit of 200 per contract = 1000 total)
1921        let sell_event = create_fill(test_wallet(1), "BTC-CALL-100000", Side::Sell, 1200.0, 5.0);
1922        service.apply_event(&sell_event).await.unwrap();
1923
1924        // Ledger balance should increase by partial realized profit
1925        let balance = ledger.get_balance(&test_wallet(1)).await.unwrap();
1926        assert_eq!(
1927            balance,
1928            dec!(11000),
1929            "Balance should increase by partial profit (1000)"
1930        );
1931
1932        // Verify position is partially closed
1933        let portfolio = service.get_portfolio(&test_wallet(1)).await;
1934        assert_eq!(portfolio.positions.len(), 1);
1935        assert_eq!(
1936            portfolio.positions[0].position.amount,
1937            dec!(5),
1938            "Should have 5 remaining contracts"
1939        );
1940    }
1941
1942    // ============================================================================
1943    // Idempotency tests
1944    // ============================================================================
1945    // NOTE: Fill idempotency is now handled at the DB level via
1946    // ON CONFLICT (trade_id, wallet_address, is_taker) DO NOTHING in
1947    // apply_fill_with_ledger_sync. See portfolio_cache_tests for DB-level
1948    // idempotency tests. The in-memory apply_fill* methods no longer
1949    // deduplicate - they assume the caller has already checked DB state.
1950    // ============================================================================
1951
1952    #[tokio::test]
1953    async fn test_multiple_fills_accumulate() {
1954        let service = create_service();
1955
1956        // Apply two different fills with different trade_ids
1957        let fill1 = create_fill_with_trade_id(
1958            1,
1959            test_wallet(1),
1960            "BTC-CALL-100000",
1961            Side::Buy,
1962            1000.0,
1963            10.0,
1964        );
1965        let fill2 = create_fill_with_trade_id(
1966            2,
1967            test_wallet(1),
1968            "BTC-CALL-100000",
1969            Side::Buy,
1970            1000.0,
1971            10.0,
1972        );
1973
1974        service.apply_event(&fill1).await.unwrap();
1975        service.apply_event(&fill2).await.unwrap();
1976
1977        // Both should be applied
1978        let portfolio = service.get_portfolio(&test_wallet(1)).await;
1979        assert_eq!(portfolio.positions.len(), 1);
1980        assert_eq!(
1981            portfolio.positions[0].position.amount,
1982            dec!(20),
1983            "Should have 20 contracts (both fills applied)"
1984        );
1985    }
1986
1987    // ============================================================================
1988    // Position Expiration Tests
1989    // ============================================================================
1990
1991    use hypercall_types::PositionExpiredMessage;
1992
1993    fn create_position_expired(
1994        wallet: WalletAddress,
1995        symbol: &str,
1996        position_size: Decimal,
1997        settlement_price: Decimal,
1998        settlement_value: Decimal,
1999    ) -> EngineMessage {
2000        EngineMessage::PositionExpired(PositionExpiredMessage {
2001            wallet_address: wallet,
2002            margin_mode: crate::rsm::MarginMode::Standard,
2003            symbol: symbol.to_string(),
2004            position_size,
2005            settlement_price,
2006            settlement_value,
2007            settlement_entry_price: None,
2008            cost_basis: None,
2009            net_pnl: None,
2010            timestamp: 12345,
2011        })
2012    }
2013
2014    #[tokio::test]
2015    async fn test_position_expiry_removes_position() {
2016        use crate::rsm::ledger::InMemoryLedger;
2017
2018        let service = create_service();
2019        let ledger = Arc::new(InMemoryLedger::new());
2020        service.set_ledger(ledger.clone()).await;
2021
2022        let wallet = test_wallet(1);
2023        let symbol = "BTC-CALL-100000";
2024
2025        // Set initial ledger balance
2026        ledger.set_balance(&wallet, dec!(10000)).await.unwrap();
2027
2028        // Open a position via fill
2029        let fill = create_fill(wallet, symbol, Side::Buy, 500.0, 10.0);
2030        service.apply_event(&fill).await.unwrap();
2031        let balance_after_fill = ledger.get_balance(&wallet).await.unwrap();
2032
2033        // Verify position exists
2034        let portfolio = service.get_portfolio(&wallet).await;
2035        assert_eq!(portfolio.positions.len(), 1);
2036        assert_eq!(portfolio.positions[0].position.symbol, symbol);
2037        assert_eq!(portfolio.positions[0].position.amount, dec!(10));
2038
2039        // Apply position expiration
2040        let expiry = create_position_expired(
2041            wallet,
2042            symbol,
2043            dec!(10),    // position_size
2044            dec!(5000),  // settlement_price (intrinsic value)
2045            dec!(50000), // settlement_value (5000 * 10)
2046        );
2047        service.apply_event(&expiry).await.unwrap();
2048
2049        // Position should be removed
2050        let portfolio_after = service.get_portfolio(&wallet).await;
2051        assert_eq!(
2052            portfolio_after.positions.len(),
2053            0,
2054            "Position should be removed after expiry"
2055        );
2056
2057        // Settlement accounting is DB-owned; PortfolioService only removes the position.
2058        let balance = ledger.get_balance(&wallet).await.unwrap();
2059        assert_eq!(
2060            balance, balance_after_fill,
2061            "PortfolioService expiry projection must not apply settlement cash"
2062        );
2063    }
2064
2065    #[tokio::test]
2066    async fn test_position_expiry_nonexistent_position_is_noop() {
2067        let service = create_service();
2068        let wallet = test_wallet(1);
2069        let symbol = "BTC-CALL-100000";
2070
2071        // Don't create any position first - just expire a non-existent one
2072        let expiry = create_position_expired(wallet, symbol, dec!(10), dec!(5000), dec!(50000));
2073
2074        // This should NOT error - it's a no-op for non-existent positions
2075        service.apply_event(&expiry).await.unwrap();
2076
2077        // Portfolio should remain empty
2078        let portfolio = service.get_portfolio(&wallet).await;
2079        assert!(portfolio.positions.is_empty());
2080    }
2081
2082    #[tokio::test]
2083    async fn test_position_expiry_zero_settlement_no_ledger_call() {
2084        let service = create_service();
2085        let wallet = test_wallet(1);
2086        let symbol = "BTC-CALL-100000";
2087
2088        // Create a position
2089        let fill = create_fill(wallet, symbol, Side::Buy, 500.0, 10.0);
2090        service.apply_event(&fill).await.unwrap();
2091
2092        // Expire with zero settlement (OTM option)
2093        let expiry = create_position_expired(
2094            wallet,
2095            symbol,
2096            dec!(10),
2097            dec!(0), // OTM - no intrinsic value
2098            dec!(0), // Zero settlement
2099        );
2100        service.apply_event(&expiry).await.unwrap();
2101
2102        // Position removed even with zero settlement
2103        let portfolio = service.get_portfolio(&wallet).await;
2104        assert!(portfolio.positions.is_empty());
2105    }
2106
2107    #[tokio::test]
2108    async fn test_position_expiry_replay_idempotency() {
2109        use crate::rsm::ledger::InMemoryLedger;
2110
2111        let service = create_service();
2112        let ledger = Arc::new(InMemoryLedger::new());
2113        service.set_ledger(ledger.clone()).await;
2114
2115        let wallet = test_wallet(1);
2116        let symbol = "BTC-CALL-100000";
2117
2118        // Set initial ledger balance
2119        ledger.set_balance(&wallet, dec!(10000)).await.unwrap();
2120
2121        // Open a position via fill
2122        let fill = create_fill(wallet, symbol, Side::Buy, 500.0, 10.0);
2123        service.apply_event(&fill).await.unwrap();
2124        let balance_after_fill = ledger.get_balance(&wallet).await.unwrap();
2125
2126        // Apply position expiration first time. PortfolioService projection
2127        // removes positions only; settlement accounting is DB-owned.
2128        let expiry = create_position_expired(
2129            wallet,
2130            symbol,
2131            dec!(10),    // position_size
2132            dec!(5000),  // settlement_price
2133            dec!(50000), // settlement_value
2134        );
2135        service.apply_event(&expiry).await.unwrap();
2136
2137        // Verify first application worked
2138        let balance_after_first = ledger.get_balance(&wallet).await.unwrap();
2139        assert_eq!(
2140            balance_after_first, balance_after_fill,
2141            "PortfolioService expiry projection must not apply settlement cash"
2142        );
2143
2144        // Apply the SAME expiration event again (simulating event replay)
2145        // This should be a no-op because position no longer exists
2146        service.apply_event(&expiry).await.unwrap();
2147
2148        // Balance should NOT change - idempotency via position_removed check
2149        let balance_after_replay = ledger.get_balance(&wallet).await.unwrap();
2150        assert_eq!(
2151            balance_after_replay, balance_after_fill,
2152            "Replay should not change ledger balance"
2153        );
2154
2155        // Position should still be removed
2156        let portfolio = service.get_portfolio(&wallet).await;
2157        assert!(portfolio.positions.is_empty());
2158    }
2159}