Skip to main content

hypercall/read_cache/
portfolio.rs

1use super::greeks::GreeksCache;
2use super::tier::TierCache;
3use hypercall_types::api_models::{
4    MarginSummary, Portfolio, PortfolioGreeksAggregate, Position, PositionGreeksLeg,
5    PositionWithMetrics, SpanMarginSummary,
6};
7use hypercall_types::portfolio_greeks::{
8    build_net_position_quantities, calculate_portfolio_greeks,
9};
10use hypercall_types::position_metrics::{enrich_position_metrics, PositionMarginMetrics};
11
12use crate::portfolio::canonical_perp_symbol;
13use crate::portfolio::{
14    HypercorePositionUpdate, PortfolioBalance, PortfolioChange, PortfolioService,
15    PortfolioServiceImpl,
16};
17use crate::rsm::margin_service::{ExtendedRiskGrid, ScenarioPnl, SpanMarginService};
18use crate::rsm::portfolio_margin::RiskAccountBuilder;
19use crate::rsm::MarginMode;
20use crate::shared::order_types::ParsedSymbol;
21use crate::snapshot::{
22    DbPortfolioSnapshotLoader, SnapshotLoader, Snapshotable, SyncState, SyncStatus,
23};
24use crate::standard_margin::{
25    PositionMarginContribution, StandardAccountBuilder, StandardMarginService,
26};
27use crate::types::MarginDetails;
28use anyhow::Result;
29use futures::future::join_all;
30use hypercall_db_diesel::DatabaseHandler;
31use hypercall_runtime_api::OrderSnapshotProvider;
32use hypercall_runtime_api::RuntimeOrderSummary;
33use hypercall_types::ws_protocol::PortfolioUpdate;
34use hypercall_types::{to_contract_units_decimal, to_human_readable_decimal, WalletAddress};
35use hypercall_types::{EngineMessage, PositionExpiredMessage};
36use rust_decimal::prelude::FromPrimitive;
37use rust_decimal::Decimal;
38use rust_decimal_macros::dec;
39use std::collections::{BTreeSet, HashMap};
40use std::str::FromStr;
41use std::sync::Arc;
42use tokio::sync::{mpsc, Mutex, OwnedMutexGuard, RwLock};
43
44/// Notification work collected under the projection barrier and dispatched
45/// after the barrier is released so that WebSocket sends don't block other
46/// state mutations.
47pub(crate) enum PendingNotifications {
48    Fill { fill: hypercall_types::Fill },
49    Changes(Vec<PortfolioChange>),
50}
51
/// Identifier of one subscriber channel; used as the key of the per-wallet
/// subscriber map held by `PortfolioCache`.
type SubscriberId = u64;
/// Stream name `"engine_command"`.
/// NOTE(review): presumably names the engine-command snapshot stream — confirm
/// against the code that consumes this constant.
pub const ENGINE_COMMAND_SNAPSHOT_STREAM: &str = "engine_command";
54
55#[derive(Debug, Clone)]
56pub struct WalletMarginSnapshot {
57    pub mode: MarginMode,
58    pub span_margin: SpanMarginSummary,
59    pub margin_summary: MarginSummary,
60    pub total_margin_used: Decimal,
61    pub available_balance: Decimal,
62    /// Per-symbol IM/MM contributions for Standard mode.
63    /// None for Portfolio (SPAN) mode because portfolio-level scenario risk
64    /// is not additively attributable per position.
65    pub standard_position_contributions: Option<HashMap<String, PositionMarginContribution>>,
66    /// Per-symbol current option marks for Standard mode.
67    /// None for Portfolio (SPAN) mode.
68    pub standard_option_marks: Option<HashMap<String, Decimal>>,
69}
70
71#[derive(Debug, Clone)]
72pub struct PmRiskGridData {
73    pub margin_details: MarginDetails,
74    pub position_details: MarginDetails,
75    pub scenario_pnls: Vec<ScenarioPnl>,
76    pub extended_grid: ExtendedRiskGrid,
77}
78
79pub struct PortfolioCache {
80    service: Arc<PortfolioServiceImpl>,
81    db: Arc<DatabaseHandler>,
82    settlement: Arc<dyn hypercall_db::SettlementWriter>,
83    last_processed_seq: Arc<RwLock<i64>>,
84    projection_barrier: Arc<Mutex<()>>,
85    subscribers: Arc<
86        RwLock<
87            HashMap<WalletAddress, HashMap<SubscriberId, mpsc::UnboundedSender<PortfolioUpdate>>>,
88        >,
89    >,
90    next_subscriber_id: Arc<RwLock<SubscriberId>>,
91    /// Optional SPAN margin service for real-time margin updates
92    span_margin_service: Arc<RwLock<Option<Arc<SpanMarginService>>>>,
93    /// Optional Greeks cache for spot price lookups
94    greeks_cache: Arc<RwLock<Option<Arc<GreeksCache>>>>,
95    /// Optional RiskAccountBuilder for computing open orders IM
96    risk_account_builder: Arc<RwLock<Option<Arc<RiskAccountBuilder>>>>,
97    /// Optional tier cache for margin mode lookups.
98    tier_cache: Arc<RwLock<Option<Arc<TierCache>>>>,
99    /// Optional order snapshot provider for open-order calculations.
100    order_snapshot: Arc<RwLock<Option<Arc<dyn OrderSnapshotProvider>>>>,
101    /// Optional standard margin service for Standard mode calculations.
102    standard_margin_service: Arc<RwLock<Option<Arc<StandardMarginService>>>>,
103    /// Optional standard account builder for Standard mode account assembly.
104    standard_account_builder: Arc<RwLock<Option<Arc<StandardAccountBuilder>>>>,
105    /// Portfolio sync status for readiness gating.
106    /// API/engine should not report "ready" until this is Ready.
107    sync_status: Arc<SyncStatus>,
108}
109
110impl PortfolioCache {
111    pub fn new(db: Arc<DatabaseHandler>) -> Self {
112        let settlement: Arc<dyn hypercall_db::SettlementWriter> = db.clone();
113        Self {
114            service: Arc::new(PortfolioServiceImpl::new()),
115            settlement,
116            db,
117            last_processed_seq: Arc::new(RwLock::new(0)),
118            projection_barrier: Arc::new(Mutex::new(())),
119            subscribers: Arc::new(RwLock::new(HashMap::new())),
120            next_subscriber_id: Arc::new(RwLock::new(0)),
121            span_margin_service: Arc::new(RwLock::new(None)),
122            greeks_cache: Arc::new(RwLock::new(None)),
123            risk_account_builder: Arc::new(RwLock::new(None)),
124            tier_cache: Arc::new(RwLock::new(None)),
125            order_snapshot: Arc::new(RwLock::new(None)),
126            standard_margin_service: Arc::new(RwLock::new(None)),
127            standard_account_builder: Arc::new(RwLock::new(None)),
128            sync_status: Arc::new(SyncStatus::new()),
129        }
130    }
131
132    /// Get the sync status for readiness checks.
133    ///
134    /// API endpoints and WebSocket handlers should check this before
135    /// serving requests that depend on portfolio state.
136    pub fn sync_status(&self) -> Arc<SyncStatus> {
137        self.sync_status.clone()
138    }
139
140    pub async fn lock_projection_barrier(&self) -> OwnedMutexGuard<()> {
141        self.projection_barrier.clone().lock_owned().await
142    }
143
144    /// Check if portfolio is ready to serve requests.
145    pub fn is_ready(&self) -> bool {
146        self.sync_status.is_ready()
147    }
148
149    /// Get the current sync state.
150    pub fn sync_state(&self) -> SyncState {
151        self.sync_status.state()
152    }
153
154    /// Mark portfolio as catching up (called after snapshot restore).
155    pub fn set_catching_up(&self) {
156        self.sync_status.set_catching_up();
157        tracing::info!("Portfolio sync status: CatchingUp");
158    }
159
160    /// Mark portfolio as ready (called after catchup replay completes).
161    pub fn set_ready(&self) {
162        self.sync_status.set_ready();
163        tracing::info!("Portfolio sync status: Ready");
164    }
165
166    /// Set margin-related dependencies for shared REST/WS margin calculations.
167    pub async fn set_margin_dependencies(
168        &self,
169        span_margin_service: Arc<SpanMarginService>,
170        greeks_cache: Arc<GreeksCache>,
171        risk_account_builder: Arc<RiskAccountBuilder>,
172        tier_cache: Arc<TierCache>,
173        order_snapshot: Arc<dyn OrderSnapshotProvider>,
174        standard_margin_service: Arc<StandardMarginService>,
175        standard_account_builder: Arc<StandardAccountBuilder>,
176    ) {
177        let mut svc = self.span_margin_service.write().await;
178        *svc = Some(span_margin_service);
179        drop(svc);
180
181        let mut cache = self.greeks_cache.write().await;
182        *cache = Some(greeks_cache);
183        drop(cache);
184
185        let mut builder = self.risk_account_builder.write().await;
186        *builder = Some(risk_account_builder);
187        drop(builder);
188
189        let mut tier = self.tier_cache.write().await;
190        *tier = Some(tier_cache);
191        drop(tier);
192
193        let mut orders = self.order_snapshot.write().await;
194        *orders = Some(order_snapshot);
195        drop(orders);
196
197        let mut standard_svc = self.standard_margin_service.write().await;
198        *standard_svc = Some(standard_margin_service);
199        drop(standard_svc);
200
201        let mut standard_builder = self.standard_account_builder.write().await;
202        *standard_builder = Some(standard_account_builder);
203    }
204
205    pub async fn compute_wallet_margin_snapshot(
206        &self,
207        wallet: &WalletAddress,
208    ) -> anyhow::Result<WalletMarginSnapshot> {
209        let margin_mode = self.margin_mode_for_wallet(wallet).await?;
210        self.compute_wallet_margin_snapshot_for_mode(wallet, margin_mode)
211            .await
212    }
213
214    async fn compute_wallet_margin_snapshot_for_mode(
215        &self,
216        wallet: &WalletAddress,
217        margin_mode: MarginMode,
218    ) -> anyhow::Result<WalletMarginSnapshot> {
219        match margin_mode {
220            MarginMode::Portfolio => self.compute_portfolio_mode_snapshot(wallet).await,
221            MarginMode::Standard => self.compute_standard_mode_snapshot(wallet).await,
222        }
223    }
224
225    async fn compute_portfolio_mode_snapshot(
226        &self,
227        wallet: &WalletAddress,
228    ) -> anyhow::Result<WalletMarginSnapshot> {
229        self.prune_applied_settlement_ghost_positions(wallet)
230            .await?;
231
232        let margin_svc = self.span_margin_service.read().await;
233        let risk_builder = self.risk_account_builder.read().await;
234
235        let (margin_service, risk_account_builder) =
236            match (margin_svc.as_ref(), risk_builder.as_ref()) {
237                (Some(m), Some(r)) => (m.clone(), r.clone()),
238                _ => {
239                    return Err(anyhow::anyhow!(
240                        "Portfolio margin dependencies are not fully configured"
241                    ));
242                }
243            };
244        drop(margin_svc);
245        drop(risk_builder);
246
247        let snapshot = risk_account_builder.build_snapshot(wallet).await?;
248        let market_state =
249            risk_account_builder.resolve_market_state(&snapshot, margin_service.config())?;
250        let position_snapshot = snapshot.without_open_orders();
251
252        let margin_details = margin_service
253            .compute_margin_from_snapshot(&snapshot, market_state.clone())
254            .map_err(|e| anyhow::anyhow!("SPAN margin failed for {}: {}", wallet, e))?;
255        let position_details = margin_service
256            .compute_margin_from_snapshot(&position_snapshot, market_state)
257            .map_err(|e| {
258                anyhow::anyhow!("Position-only SPAN margin failed for {}: {}", wallet, e)
259            })?;
260
261        let position_im = position_details.initial_margin_required;
262        let open_orders_im = (margin_details.initial_margin_required
263            - position_details.initial_margin_required)
264            .max(Decimal::ZERO);
265        let position_mm = position_details.maintenance_margin_required;
266        let total_margin_used = position_im + open_orders_im;
267        let available_balance = (margin_details.equity - total_margin_used).max(dec!(0));
268
269        let span_margin = SpanMarginSummary {
270            equity: margin_details.equity,
271            initial_margin_required: position_im,
272            maintenance_margin_required: position_mm,
273            open_orders_initial_margin: open_orders_im,
274            option_margin_required: position_details.scanning_risk,
275            scanning_risk: position_details.scanning_risk,
276            option_floor: position_details.option_floor,
277            gamma_overlay: position_details.gamma_overlay,
278            hypercore_margin_required: dec!(0),
279        };
280
281        let margin_summary = MarginSummary {
282            mode: MarginMode::Portfolio.as_str().to_string(),
283            equity: margin_details.equity,
284            position_im,
285            open_orders_im,
286            initial_margin: margin_details.equity - position_im - open_orders_im,
287            maintenance_margin: margin_details.equity - position_mm,
288            open_orders_premium_reserved: None,
289        };
290
291        Ok(WalletMarginSnapshot {
292            mode: MarginMode::Portfolio,
293            span_margin,
294            margin_summary,
295            total_margin_used,
296            available_balance,
297            standard_position_contributions: None,
298            standard_option_marks: None,
299        })
300    }
301
302    pub async fn compute_pm_risk_grid_data(
303        &self,
304        wallet: &WalletAddress,
305    ) -> anyhow::Result<PmRiskGridData> {
306        let margin_mode = self.margin_mode_for_wallet(wallet).await?;
307        if matches!(margin_mode, MarginMode::Standard) {
308            return Err(anyhow::anyhow!(
309                "compute_pm_risk_grid_data called for Standard-mode wallet {}",
310                wallet
311            ));
312        }
313
314        self.prune_applied_settlement_ghost_positions(wallet)
315            .await?;
316
317        let margin_svc = self.span_margin_service.read().await;
318        let risk_builder = self.risk_account_builder.read().await;
319
320        let (margin_service, risk_account_builder) =
321            match (margin_svc.as_ref(), risk_builder.as_ref()) {
322                (Some(m), Some(r)) => (m.clone(), r.clone()),
323                _ => {
324                    return Err(anyhow::anyhow!(
325                        "Portfolio margin dependencies are not fully configured"
326                    ));
327                }
328            };
329        drop(margin_svc);
330        drop(risk_builder);
331
332        let snapshot = risk_account_builder.build_snapshot(wallet).await?;
333        let market_state =
334            risk_account_builder.resolve_market_state(&snapshot, margin_service.config())?;
335        let position_snapshot = snapshot.without_open_orders();
336
337        let margin_details = margin_service
338            .compute_margin_from_snapshot(&snapshot, market_state.clone())
339            .map_err(|e| anyhow::anyhow!("SPAN margin failed: {}", e))?;
340        let position_details = margin_service
341            .compute_margin_from_snapshot(&position_snapshot, market_state.clone())
342            .map_err(|e| anyhow::anyhow!("Position-only SPAN margin failed: {}", e))?;
343        let scenario_pnls = margin_service
344            .compute_risk_grid_from_snapshot(&snapshot, market_state.clone())
345            .map_err(|e| anyhow::anyhow!("Risk grid failed: {}", e))?;
346        let extended_grid = margin_service
347            .compute_extended_risk_grid_from_snapshot(&snapshot, market_state)
348            .map_err(|e| anyhow::anyhow!("Extended risk grid failed: {}", e))?;
349
350        Ok(PmRiskGridData {
351            margin_details,
352            position_details,
353            scenario_pnls,
354            extended_grid,
355        })
356    }
357
358    async fn compute_standard_mode_snapshot(
359        &self,
360        wallet: &WalletAddress,
361    ) -> anyhow::Result<WalletMarginSnapshot> {
362        let standard_svc_guard = self.standard_margin_service.read().await;
363        let standard_builder_guard = self.standard_account_builder.read().await;
364        let order_snapshot_guard = self.order_snapshot.read().await;
365        let greeks_cache_guard = self.greeks_cache.read().await;
366
367        let (standard_margin_service, standard_account_builder, order_snapshot, greeks_cache) =
368            match (
369                standard_svc_guard.as_ref(),
370                standard_builder_guard.as_ref(),
371                order_snapshot_guard.as_ref(),
372                greeks_cache_guard.as_ref(),
373            ) {
374                (Some(svc), Some(builder), Some(snapshot), Some(greeks)) => (
375                    svc.clone(),
376                    builder.clone(),
377                    snapshot.clone(),
378                    greeks.clone(),
379                ),
380                _ => {
381                    return Err(anyhow::anyhow!(
382                        "Standard margin dependencies are not fully configured"
383                    ));
384                }
385            };
386        drop(standard_svc_guard);
387        drop(standard_builder_guard);
388        drop(order_snapshot_guard);
389        drop(greeks_cache_guard);
390
391        // Refresh UPNL before building the account — compute_standard_mode_snapshot
392        // is called independently of get_portfolio, so positions may have stale UPNL.
393        if let Err(e) = self.refresh_live_market_prices_for_wallet(wallet).await {
394            tracing::warn!(
395                "Standard margin snapshot: could not refresh prices for {}: {}",
396                wallet,
397                e
398            );
399        }
400
401        let standard_account = standard_account_builder.build(wallet).await?;
402        let standard_position_contributions =
403            standard_margin_service.compute_position_margin_breakdown(&standard_account);
404        let standard_option_marks: HashMap<String, Decimal> = standard_account
405            .option_positions
406            .iter()
407            .map(|p| (p.symbol.clone(), p.mark_price))
408            .collect();
409        let cash = standard_account.usdc_balance;
410        let base_result = standard_margin_service.compute_margin(&standard_account);
411
412        let mut hypothetical_account = standard_account.clone();
413        let open_sell_positions =
414            Self::snapshot_open_sell_positions(wallet, order_snapshot.as_ref())?;
415        let missing_open_sell_spot_underlyings: BTreeSet<String> = open_sell_positions
416            .iter()
417            .filter(|pos| pos.position.spot_price == dec!(0))
418            .map(|pos| pos.position.underlying.clone())
419            .collect();
420        let mut missing_open_sell_spot_prices = HashMap::new();
421        if !missing_open_sell_spot_underlyings.is_empty() {
422            let fetches =
423                missing_open_sell_spot_underlyings
424                    .into_iter()
425                    .map(|underlying| {
426                        let greeks_cache = greeks_cache.clone();
427                        async move {
428                            let spot = greeks_cache.get_spot_price(&underlying).await.ok_or_else(
429                                || {
430                                    anyhow::anyhow!(
431                                        "Missing spot price for {} in standard margin calculation",
432                                        underlying
433                                    )
434                                },
435                            )?;
436                            let spot_decimal = Decimal::from_f64(spot).ok_or_else(|| {
437                                anyhow::anyhow!("Invalid spot price conversion for {}", underlying)
438                            })?;
439                            Ok::<(String, Decimal), anyhow::Error>((underlying, spot_decimal))
440                        }
441                    });
442
443            for result in join_all(fetches).await {
444                let (underlying, spot_decimal) = result?;
445                missing_open_sell_spot_prices.insert(underlying, spot_decimal);
446            }
447        }
448
449        for mut pos in open_sell_positions {
450            if pos.position.spot_price == dec!(0) {
451                pos.position.spot_price = *missing_open_sell_spot_prices
452                    .get(&pos.position.underlying)
453                    .ok_or_else(|| {
454                        anyhow::anyhow!(
455                            "Missing prefetched spot price for {} in standard margin calculation",
456                            pos.position.underlying
457                        )
458                    })?;
459            }
460            hypothetical_account.option_positions.push(pos.position);
461        }
462
463        let hypothetical_result = standard_margin_service.compute_margin(&hypothetical_account);
464        let open_orders_im =
465            (hypothetical_result.position_im - base_result.position_im).max(dec!(0));
466        let premium_reserved = Self::snapshot_open_buy_premium(wallet, order_snapshot.as_ref())?;
467        let total_margin_used = base_result.position_im + open_orders_im;
468        let available_balance = (cash - total_margin_used - premium_reserved).max(dec!(0));
469
470        let span_margin = SpanMarginSummary {
471            equity: base_result.equity,
472            initial_margin_required: base_result.position_im,
473            maintenance_margin_required: base_result.position_mm,
474            open_orders_initial_margin: open_orders_im,
475            option_margin_required: base_result.position_im,
476            scanning_risk: dec!(0),
477            option_floor: dec!(0),
478            gamma_overlay: dec!(0),
479            hypercore_margin_required: dec!(0),
480        };
481
482        let margin_summary = MarginSummary {
483            mode: MarginMode::Standard.as_str().to_string(),
484            equity: base_result.equity,
485            position_im: base_result.position_im,
486            open_orders_im,
487            initial_margin: base_result.equity
488                - base_result.position_im
489                - open_orders_im
490                - premium_reserved,
491            maintenance_margin: base_result.equity - base_result.position_mm,
492            open_orders_premium_reserved: Some(premium_reserved),
493        };
494
495        Ok(WalletMarginSnapshot {
496            mode: MarginMode::Standard,
497            span_margin,
498            margin_summary,
499            total_margin_used,
500            available_balance,
501            standard_position_contributions: Some(standard_position_contributions),
502            standard_option_marks: Some(standard_option_marks),
503        })
504    }
505
506    fn snapshot_open_sell_positions(
507        wallet: &WalletAddress,
508        order_snapshot: &dyn OrderSnapshotProvider,
509    ) -> anyhow::Result<Vec<hypercall_engine::order_index::OpenSellPositionInfo>> {
510        let mut positions = Vec::new();
511        for order in order_snapshot.get_open_orders_for_wallet(wallet) {
512            if !matches!(order.side, hypercall_types::Side::Sell) {
513                continue;
514            }
515            let parsed = ParsedSymbol::from_symbol(&order.symbol).map_err(|e| {
516                anyhow::anyhow!(
517                    "Invalid option symbol '{}' in open sell order {} for {}: {}",
518                    order.symbol,
519                    order.order_id,
520                    wallet,
521                    e
522                )
523            })?;
524            if order.remaining_size <= dec!(0) {
525                continue;
526            }
527
528            let premium = order.price * order.remaining_size;
529            let expiry_date_str = format!("{}", parsed.expiry);
530            let expiry_code = expiry_date_str.parse::<u64>().map_err(|e| {
531                anyhow::anyhow!(
532                    "Invalid expiry '{}' in open sell order {} for {} ({}): {}",
533                    expiry_date_str,
534                    order.order_id,
535                    wallet,
536                    order.symbol,
537                    e
538                )
539            })?;
540            let expiry_ts =
541                hypercall_types::expiry_date_to_timestamp_checked(&parsed.underlying, expiry_code)
542                    .map_err(|e| {
543                    anyhow::anyhow!(
544                        "Invalid expiry timestamp in open sell order {} for {} ({}): {}",
545                        order.order_id,
546                        wallet,
547                        order.symbol,
548                        e
549                    )
550                })? as i64;
551
552            let position = hypercall_margin::OptionPosition {
553                symbol: order.symbol.clone(),
554                underlying: parsed.underlying.clone(),
555                expiry_ts,
556                strike: parsed.strike,
557                is_call: matches!(parsed.option_type, crate::types::OptionType::Call),
558                size: -order.remaining_size,
559                mark_price: order.price,
560                entry_price: order.price,
561                spot_price: dec!(0),
562            };
563
564            positions
565                .push(hypercall_engine::order_index::OpenSellPositionInfo { premium, position });
566        }
567
568        Ok(positions)
569    }
570
571    fn snapshot_open_buy_premium(
572        wallet: &WalletAddress,
573        order_snapshot: &dyn OrderSnapshotProvider,
574    ) -> anyhow::Result<Decimal> {
575        order_snapshot
576            .get_open_orders_for_wallet(wallet)
577            .into_iter()
578            .filter(|order: &RuntimeOrderSummary| matches!(order.side, hypercall_types::Side::Buy))
579            .filter(|order| order.remaining_size > dec!(0))
580            .try_fold(dec!(0), |acc, order| {
581                ParsedSymbol::from_symbol(&order.symbol).map_err(|e| {
582                    anyhow::anyhow!(
583                        "Invalid option symbol '{}' in open buy order {} for {}: {}",
584                        order.symbol,
585                        order.order_id,
586                        wallet,
587                        e
588                    )
589                })?;
590                Ok(acc + (order.price * order.remaining_size))
591            })
592    }
593
594    /// Compute and publish margin update for a wallet to all subscribers.
595    /// This is called after fills to provide real-time margin updates.
596    pub async fn publish_margin_update(&self, wallet: &WalletAddress) {
597        if let Err(error) = self.try_publish_margin_update(wallet).await {
598            tracing::error!("Failed to publish margin update for {}: {}", wallet, error);
599        }
600    }
601
602    async fn try_publish_margin_update(&self, wallet: &WalletAddress) -> anyhow::Result<()> {
603        let margin_mode = self.margin_mode_for_wallet(wallet).await?;
604
605        // Refresh market prices before computing the margin snapshot so equity
606        // and margin values reflect the current mark, not stale in-memory UPNL.
607        match margin_mode {
608            MarginMode::Portfolio => {
609                self.refresh_live_market_prices_for_wallet(wallet)
610                    .await
611                    .map_err(|error| {
612                        anyhow::anyhow!(
613                            "PM margin update unavailable due to strict repricing failure: {}",
614                            error
615                        )
616                    })?;
617            }
618            MarginMode::Standard => {
619                if let Err(error) = self.refresh_live_market_prices_for_wallet(wallet).await {
620                    tracing::warn!(
621                        "Skipping live repricing before standard margin update for {}: {}",
622                        wallet,
623                        error
624                    );
625                }
626            }
627        }
628
629        let snapshot = self
630            .compute_wallet_margin_snapshot_for_mode(wallet, margin_mode)
631            .await
632            .map_err(|error| {
633                anyhow::anyhow!(
634                    "margin snapshot calculation failed for {}: {}",
635                    wallet,
636                    error
637                )
638            })?;
639
640        // Notify subscribers
641        self.notify_subscribers(
642            wallet,
643            PortfolioUpdate::MarginUpdate {
644                span_margin: snapshot.span_margin,
645                total_margin_used: snapshot.total_margin_used,
646                available_balance: snapshot.available_balance,
647                timestamp: chrono::Utc::now().timestamp(),
648            },
649        )
650        .await;
651
652        Ok(())
653    }
654
655    async fn prune_applied_settlement_ghost_positions(
656        &self,
657        wallet: &WalletAddress,
658    ) -> anyhow::Result<usize> {
659        let option_symbols = match self.service.get_portfolio_balance(wallet).await {
660            Some(balance) => balance
661                .positions
662                .keys()
663                .filter(|symbol| ParsedSymbol::from_symbol(symbol).is_ok())
664                .cloned()
665                .collect::<Vec<_>>(),
666            None => return Ok(0),
667        };
668
669        if option_symbols.is_empty() {
670            return Ok(0);
671        }
672
673        let settlement = self.settlement.clone();
674        let wallet_owned = *wallet;
675        let settled_symbols = tokio::task::spawn_blocking(move || {
676            settlement.get_applied_settlement_symbols_sync(&wallet_owned, &option_symbols)
677        })
678        .await
679        .map_err(|e| anyhow::anyhow!("spawn_blocking join error: {}", e))?
680        .map_err(|error| {
681            anyhow::anyhow!(
682                "Failed to query applied settlements for {}: {}",
683                wallet,
684                error
685            )
686        })?;
687
688        for symbol in &settled_symbols {
689            tracing::warn!(
690                wallet = %wallet,
691                symbol,
692                "Removing settled ghost position from portfolio projection"
693            );
694            metrics::counter!("ht_portfolio_ghost_positions_repaired_total").increment(1);
695            self.service.remove_expired_position(wallet, symbol).await;
696        }
697
698        Ok(settled_symbols.len())
699    }
700
701    /// Compute and publish margin updates for all wallets with active portfolio subscribers.
702    ///
703    /// Returns the number of unique wallets attempted.
704    pub async fn publish_margin_updates_for_subscribers(&self) -> usize {
705        let wallets: Vec<WalletAddress> = {
706            let subscribers = self.subscribers.read().await;
707            subscribers.keys().cloned().collect()
708        };
709
710        for wallet in &wallets {
711            self.publish_margin_update(wallet).await;
712        }
713
714        wallets.len()
715    }
716
717    async fn has_portfolio_subscribers(&self, wallet: &WalletAddress) -> bool {
718        let subscribers = self.subscribers.read().await;
719        subscribers
720            .get(wallet)
721            .map(|account_subs| !account_subs.is_empty())
722            .unwrap_or(false)
723    }
724
725    async fn compute_wallet_live_greeks_snapshot(
726        &self,
727        wallet: &WalletAddress,
728    ) -> anyhow::Result<(Vec<PositionGreeksLeg>, Option<PortfolioGreeksAggregate>)> {
729        let greeks_cache = self
730            .greeks_cache
731            .read()
732            .await
733            .as_ref()
734            .cloned()
735            .ok_or_else(|| anyhow::anyhow!("GreeksCache not configured"))?;
736
737        let live_portfolio = self.service.get_portfolio(wallet).await;
738        let live_positions = live_portfolio
739            .positions
740            .into_iter()
741            .map(|p| (p.position.symbol, p.position.amount));
742        let net_quantities = build_net_position_quantities(live_positions, &[])
743            .map_err(|e| anyhow::anyhow!("Invalid position state for greeks update: {}", e))?;
744
745        if net_quantities.is_empty() {
746            return Ok((Vec::new(), None));
747        }
748
749        let mut contract_greeks = HashMap::with_capacity(net_quantities.len());
750        for symbol in net_quantities.keys() {
751            let greeks = greeks_cache.get_greeks(symbol).await.map_err(|e| {
752                anyhow::anyhow!(
753                    "Failed to fetch greeks for symbol {} while publishing WS update: {}",
754                    symbol,
755                    e
756                )
757            })?;
758            contract_greeks.insert(symbol.clone(), greeks);
759        }
760
761        let response = calculate_portfolio_greeks(*wallet, &net_quantities, &contract_greeks)
762            .map_err(|e| anyhow::anyhow!("Failed to compute portfolio greeks: {}", e))?;
763
764        Ok((response.per_leg, response.aggregate))
765    }
766
767    /// Compute and publish a portfolio Greeks update for a wallet.
768    pub async fn publish_greeks_update(&self, wallet: &WalletAddress) {
769        if !self.has_portfolio_subscribers(wallet).await {
770            return;
771        }
772
773        let (per_leg, aggregate) = match self.compute_wallet_live_greeks_snapshot(wallet).await {
774            Ok(snapshot) => snapshot,
775            Err(e) => {
776                tracing::debug!(
777                    "Skipping greeks update publish for {} due to calculation error: {}",
778                    wallet,
779                    e
780                );
781                return;
782            }
783        };
784
785        self.notify_subscribers(
786            wallet,
787            PortfolioUpdate::GreeksUpdate {
788                per_leg,
789                aggregate,
790                timestamp: chrono::Utc::now().timestamp(),
791            },
792        )
793        .await;
794    }
795
796    /// Compute and publish greeks updates for all wallets with active portfolio subscribers.
797    ///
798    /// Returns the number of unique wallets attempted.
799    pub async fn publish_greeks_updates_for_subscribers(&self) -> usize {
800        let wallets: Vec<WalletAddress> = {
801            let subscribers = self.subscribers.read().await;
802            subscribers.keys().cloned().collect()
803        };
804
805        for wallet in &wallets {
806            self.publish_greeks_update(wallet).await;
807        }
808
809        wallets.len()
810    }
811
812    /// Get the underlying PortfolioService for wiring into other components.
813    ///
814    /// This returns the same service instance that PortfolioCache uses,
815    /// allowing UnifiedEngine to read executed state from the canonical source.
816    pub fn get_service(&self) -> Arc<PortfolioServiceImpl> {
817        self.service.clone()
818    }
819
820    /// Per-position repricing: update each position's market price individually.
821    ///
822    /// Reprice all wallets that have positions. Called once at startup after all
823    /// dependencies (greeks cache, oracles) are wired, to fix stale UPNL=0 from
824    /// journal replay.
825    pub async fn reprice_all_wallets(&self) -> usize {
826        let wallets: Vec<WalletAddress> = {
827            let all = self.service.all_portfolios().await;
828            all.into_iter()
829                .filter(|(_, balance)| !balance.positions.is_empty())
830                .map(|(wallet, _)| wallet)
831                .collect()
832        };
833        let total = wallets.len();
834        let mut repriced = 0;
835        for wallet in &wallets {
836            match self.refresh_live_market_prices_for_wallet(wallet).await {
837                Ok(n) if n > 0 => {
838                    repriced += 1;
839                    tracing::debug!("Repriced {} symbols for wallet {}", n, wallet);
840                }
841                Ok(_) => {}
842                Err(e) => {
843                    tracing::debug!("Could not reprice wallet {}: {}", wallet, e);
844                }
845            }
846        }
847        tracing::info!(
848            "Startup repricing: {}/{} wallets with positions repriced",
849            repriced,
850            total
851        );
852        repriced
853    }
854
855    /// Unlike the old batch approach (`refresh_wallet_theoretical_option_upnl`), this
856    /// continues on individual failures so one instrument's IV failure doesn't poison
857    /// the entire wallet's UPNL.
858    async fn refresh_live_market_prices_for_wallet(
859        &self,
860        wallet: &WalletAddress,
861    ) -> anyhow::Result<usize> {
862        let _ = self
863            .prune_applied_settlement_ghost_positions(wallet)
864            .await?;
865
866        let portfolio_balance = match self.service.get_portfolio_balance(wallet).await {
867            Some(portfolio_balance) => portfolio_balance,
868            None => return Ok(0),
869        };
870        if portfolio_balance.positions.is_empty() {
871            return Ok(0);
872        }
873
874        let greeks_cache = match self.greeks_cache.read().await.as_ref().cloned() {
875            Some(greeks_cache) => greeks_cache,
876            None => {
877                return Err(anyhow::anyhow!(
878                    "Greeks cache unavailable for portfolio repricing"
879                ))
880            }
881        };
882
883        let wallet_for_errors = wallet.to_string();
884        let mut symbols: Vec<String> = portfolio_balance
885            .positions
886            .keys()
887            .filter(|s| portfolio_perp_underlying(s).is_none())
888            .cloned()
889            .collect();
890        symbols.sort();
891        let fetches = symbols.into_iter().map(|symbol| {
892            let greeks_cache = greeks_cache.clone();
893            let wallet_for_errors = wallet_for_errors.clone();
894            async move {
895                let market_price = if ParsedSymbol::from_symbol(&symbol).is_ok() {
896                    greeks_cache
897                        .get_theoretical_price(&symbol)
898                        .await
899                        .map_err(|error| {
900                            anyhow::anyhow!(
901                                "Portfolio repricing unavailable for {} on {}: {}",
902                                symbol,
903                                wallet_for_errors,
904                                error
905                            )
906                        })?
907                } else {
908                    return Err(anyhow::anyhow!(
909                        "Portfolio repricing unavailable for {} on {}: unsupported symbol",
910                        symbol,
911                        wallet_for_errors
912                    ));
913                };
914
915                let market_price_decimal =
916                    Decimal::from_f64_retain(market_price).ok_or_else(|| {
917                        anyhow::anyhow!(
918                            "Invalid repriced market price for {} on {}: {}",
919                            symbol,
920                            wallet_for_errors,
921                            market_price
922                        )
923                    })?;
924                Ok::<(String, Decimal), anyhow::Error>((symbol, market_price_decimal))
925            }
926        });
927
928        let mut market_prices = HashMap::new();
929        for result in join_all(fetches).await {
930            match result {
931                Ok((symbol, market_price_decimal)) => {
932                    market_prices.insert(symbol, market_price_decimal);
933                }
934                Err(e) => {
935                    tracing::warn!("refresh_live_market_prices: wallet={}, error={}", wallet, e);
936                    return Err(e);
937                }
938            }
939        }
940
941        if market_prices.is_empty() {
942            return Ok(0);
943        }
944
945        let updated_symbols = market_prices.len();
946        self.service.update_market_prices(market_prices).await;
947        Ok(updated_symbols)
948    }
949
950    async fn margin_mode_for_wallet(&self, wallet: &WalletAddress) -> anyhow::Result<MarginMode> {
951        let tier_cache = self
952            .tier_cache
953            .read()
954            .await
955            .as_ref()
956            .cloned()
957            .ok_or_else(|| anyhow::anyhow!("TierCache not configured"))?;
958        tier_cache.get_margin_mode(wallet).await
959    }
960
961    /// Publish repriced positions for a subscribed wallet.
962    pub async fn publish_position_updates(&self, wallet: &WalletAddress) {
963        if !self.has_portfolio_subscribers(wallet).await {
964            return;
965        }
966
967        let positions = match self.build_enriched_positions(wallet).await {
968            Ok(positions) => positions,
969            Err(error) => {
970                tracing::debug!(
971                    "Skipping position update publish for {} due to calculation error: {}",
972                    wallet,
973                    error
974                );
975                return;
976            }
977        };
978
979        let timestamp = chrono::Utc::now().timestamp();
980        for position in positions {
981            self.notify_subscribers(
982                wallet,
983                PortfolioUpdate::PositionUpdate {
984                    position,
985                    timestamp,
986                },
987            )
988            .await;
989        }
990    }
991
992    /// Publish repriced position updates for every wallet with active subscribers.
993    pub async fn publish_position_updates_for_subscribers(&self) -> usize {
994        let wallets: Vec<WalletAddress> = {
995            let subscribers = self.subscribers.read().await;
996            subscribers.keys().cloned().collect()
997        };
998
999        for wallet in &wallets {
1000            self.publish_position_updates(wallet).await;
1001        }
1002
1003        wallets.len()
1004    }
1005
1006    async fn build_enriched_positions(
1007        &self,
1008        wallet: &WalletAddress,
1009    ) -> anyhow::Result<Vec<PositionWithMetrics>> {
1010        // Use per-position repricing (graceful degradation) instead of batch
1011        if let Err(error) = self.refresh_live_market_prices_for_wallet(wallet).await {
1012            tracing::warn!(
1013                "Failed to refresh live market prices for {} while building positions: {}",
1014                wallet,
1015                error
1016            );
1017        }
1018        let mut portfolio = self.service.get_portfolio(wallet).await;
1019        let margin_snapshot = match self.compute_wallet_margin_snapshot(wallet).await {
1020            Ok(snapshot) => snapshot,
1021            Err(e) => {
1022                tracing::warn!(
1023                    "Margin snapshot unavailable for {} while building WS positions: {}",
1024                    wallet,
1025                    e
1026                );
1027                return Ok(portfolio.positions);
1028            }
1029        };
1030        let mode = margin_snapshot.mode;
1031        portfolio.margin_mode = mode.as_str().to_string();
1032        portfolio.margin_summary = Some(margin_snapshot.margin_summary);
1033        portfolio.span_margin = Some(margin_snapshot.span_margin);
1034        portfolio.available_balance = margin_snapshot.available_balance;
1035        portfolio.total_margin_used = margin_snapshot.total_margin_used;
1036        enrich_position_metrics(
1037            mode,
1038            margin_snapshot
1039                .standard_position_contributions
1040                .map(|contributions| {
1041                    contributions
1042                        .into_iter()
1043                        .map(|(symbol, contribution)| {
1044                            (
1045                                symbol,
1046                                PositionMarginMetrics {
1047                                    initial_margin: contribution.initial_margin,
1048                                    maintenance_margin: contribution.maintenance_margin,
1049                                },
1050                            )
1051                        })
1052                        .collect()
1053                }),
1054            margin_snapshot.standard_option_marks,
1055            &mut portfolio,
1056        )
1057        .map_err(|status| {
1058            anyhow::anyhow!(
1059                "Failed to enrich position metrics for {}: status={}",
1060                wallet,
1061                status
1062            )
1063        })?;
1064        Ok(portfolio.positions)
1065    }
1066
1067    async fn build_enriched_position_update(
1068        &self,
1069        wallet: &WalletAddress,
1070        symbol: &str,
1071    ) -> anyhow::Result<PositionWithMetrics> {
1072        let positions = self.build_enriched_positions(wallet).await?;
1073        if let Some(position) = positions.into_iter().find(|p| p.position.symbol == symbol) {
1074            return Ok(position);
1075        }
1076
1077        Ok(PositionWithMetrics {
1078            position: Position {
1079                wallet_address: *wallet,
1080                symbol: symbol.to_string(),
1081                amount: dec!(0),
1082                entry_price: dec!(0),
1083                margin_posted: dec!(0),
1084                realized_pnl: dec!(0),
1085                unrealized_pnl: dec!(0),
1086                updated_at: chrono::Utc::now(),
1087            },
1088            notional_value: dec!(0),
1089            maintenance_margin: dec!(0),
1090            liquidation_price: dec!(0),
1091            margin_ratio: dec!(0),
1092        })
1093    }
1094
1095    /// Initialize portfolios from the latest snapshot and return the next
1096    /// `engine_commands.command_id` that must be replayed into memory.
1097    pub async fn initialize(&self) -> Result<i64> {
1098        let loader = DbPortfolioSnapshotLoader::new(self.db.clone());
1099
1100        match loader.load_latest() {
1101            Ok(Some((snapshot_id, state))) => {
1102                if let Some(next_command_id) = state
1103                    .offsets
1104                    .get(ENGINE_COMMAND_SNAPSHOT_STREAM)
1105                    .and_then(|partitions| partitions.get(&0))
1106                    .copied()
1107                {
1108                    self.service
1109                        .clear_all()
1110                        .await
1111                        .map_err(|e| anyhow::anyhow!("Failed clearing portfolio state: {}", e))?;
1112                    for (wallet, balance) in state.states {
1113                        self.service.restore(&wallet, balance).await.map_err(|e| {
1114                            anyhow::anyhow!(
1115                                "Failed restoring portfolio state for {}: {}",
1116                                wallet,
1117                                e
1118                            )
1119                        })?;
1120                    }
1121
1122                    tracing::info!(
1123                        "Restored portfolio from snapshot id={} with journal replay boundary command_id={}",
1124                        snapshot_id,
1125                        next_command_id
1126                    );
1127                    self.sync_status.set_catching_up();
1128                    tracing::info!(
1129                        "Portfolio sync status: CatchingUp (snapshot restored, awaiting journal replay)"
1130                    );
1131                    Ok(next_command_id)
1132                } else {
1133                    tracing::warn!(
1134                        "Ignoring legacy portfolio snapshot id={} without {} boundary, rebuilding from journal",
1135                        snapshot_id,
1136                        ENGINE_COMMAND_SNAPSHOT_STREAM
1137                    );
1138                    self.service.clear_all().await.map_err(|e| {
1139                        anyhow::anyhow!("Failed clearing legacy portfolio state: {}", e)
1140                    })?;
1141                    self.sync_status.set_catching_up();
1142                    Ok(1)
1143                }
1144            }
1145            Ok(None) => {
1146                tracing::info!("No portfolio snapshots found, rebuilding from journal");
1147                self.service
1148                    .clear_all()
1149                    .await
1150                    .map_err(|e| anyhow::anyhow!("Failed clearing portfolio state: {}", e))?;
1151                self.sync_status.set_catching_up();
1152                tracing::info!(
1153                    "Portfolio sync status: CatchingUp (no snapshot, replaying journal from genesis)"
1154                );
1155                Ok(1)
1156            }
1157            Err(e) => {
1158                tracing::error!("Failed to restore portfolio from snapshot: {}", e);
1159                Err(anyhow::anyhow!("Snapshot restore failed: {}", e))
1160            }
1161        }
1162    }
1163
1164    /// Handle hypercore position updates from the HypercorePositionService.
1165    ///
1166    /// Routes to set_hypercore_position (for snapshots) or apply_hypercore_position_update (for diffs)
1167    /// based on the snapshot flag.
1168    pub async fn handle_hypercore_position_update(&self, update: HypercorePositionUpdate) {
1169        if update.snapshot {
1170            self.service.set_hypercore_position(&update).await;
1171        } else {
1172            self.service.apply_hypercore_position_update(&update).await;
1173        }
1174
1175        // Optionally notify WebSocket subscribers of position update
1176        let wallet_normalized = update.account.to_lowercase();
1177        let timestamp = chrono::Utc::now().timestamp();
1178        let position_symbol = canonical_perp_symbol(&update.coin);
1179
1180        if let Ok(wallet) = WalletAddress::from_str(&wallet_normalized) {
1181            if !self.has_portfolio_subscribers(&wallet).await {
1182                return;
1183            }
1184
1185            let position = match self
1186                .build_enriched_position_update(&wallet, &position_symbol)
1187                .await
1188            {
1189                Ok(position) => position,
1190                Err(e) => {
1191                    tracing::error!(
1192                        "Failed to build hypercore position update for {}/{}: {}",
1193                        wallet,
1194                        update.coin,
1195                        e
1196                    );
1197                    return;
1198                }
1199            };
1200            self.notify_subscribers(
1201                &wallet,
1202                PortfolioUpdate::PositionUpdate {
1203                    position,
1204                    timestamp,
1205                },
1206            )
1207            .await;
1208            self.publish_margin_update(&wallet).await;
1209            self.publish_greeks_update(&wallet).await;
1210        }
1211    }
1212
1213    /// Handle incoming engine messages to update portfolio state.
1214    ///
1215    /// Fill handling is memory-only. Durable `fills`, `trades`, and fill-driven
1216    /// ledger rows are owned by the journal persistence path.
1217    pub async fn handle_engine_message(&self, message: EngineMessage, seq: i64) {
1218        // Acquire barrier for state mutation only; release before notifications.
1219        let pending_notifications = {
1220            let _guard = self.lock_projection_barrier().await;
1221            self.handle_engine_message_under_barrier(message, seq).await
1222        };
1223        // Barrier is released here. Send notifications without holding it.
1224        self.send_pending_notifications(pending_notifications).await;
1225    }
1226
1227    pub(crate) async fn handle_engine_message_under_barrier(
1228        &self,
1229        message: EngineMessage,
1230        seq: i64,
1231    ) -> PendingNotifications {
1232        match &message {
1233            EngineMessage::OrderFilled { fill, .. } => {
1234                self.apply_fill_state_mutation(fill, seq).await;
1235                PendingNotifications::Fill { fill: fill.clone() }
1236            }
1237            EngineMessage::PositionExpired(ref expiry_msg) => {
1238                let change = self
1239                    .apply_position_expired_state_mutation(expiry_msg, Some(seq))
1240                    .await;
1241                PendingNotifications::Changes(vec![change])
1242            }
1243            _ => {
1244                let changes = match self.service.apply_event(&message).await {
1245                    Ok(c) => c,
1246                    Err(e) => {
1247                        panic!("CRITICAL: Portfolio event processing failed: {}", e);
1248                    }
1249                };
1250
1251                let mut last_seq = self.last_processed_seq.write().await;
1252                *last_seq = seq;
1253                drop(last_seq);
1254
1255                PendingNotifications::Changes(changes)
1256            }
1257        }
1258    }
1259
1260    pub(crate) async fn handle_order_filled_under_barrier(
1261        &self,
1262        fill: &hypercall_types::Fill,
1263        seq: i64,
1264    ) {
1265        self.apply_fill_state_mutation(fill, seq).await;
1266    }
1267
1268    pub(crate) async fn handle_position_expired_under_barrier(
1269        &self,
1270        expiry_msg: &PositionExpiredMessage,
1271        seq: Option<i64>,
1272    ) -> PendingNotifications {
1273        let change = self
1274            .apply_position_expired_state_mutation(expiry_msg, seq)
1275            .await;
1276        PendingNotifications::Changes(vec![change])
1277    }
1278
1279    pub(crate) async fn handle_replayed_position_expired_projection_under_barrier(
1280        &self,
1281        expiry_msg: &PositionExpiredMessage,
1282    ) -> PendingNotifications {
1283        self.service
1284            .remove_expired_position(&expiry_msg.wallet_address, &expiry_msg.symbol)
1285            .await;
1286
1287        tracing::info!(
1288            "PortfolioCache: Applied replayed expiry projection for {}/{}",
1289            expiry_msg.wallet_address,
1290            expiry_msg.symbol,
1291        );
1292
1293        let (position_change, total_margin_used) = self
1294            .get_position_and_margin_for_notification(
1295                &expiry_msg.wallet_address,
1296                &expiry_msg.symbol,
1297            )
1298            .await;
1299
1300        PendingNotifications::Changes(vec![PortfolioChange {
1301            wallet: expiry_msg.wallet_address,
1302            position_changes: vec![position_change],
1303            balance_change: None,
1304            total_margin_used,
1305        }])
1306    }
1307
1308    pub(crate) async fn handle_option_custody_delta(
1309        &self,
1310        wallet: WalletAddress,
1311        symbol: String,
1312        quantity_delta: Decimal,
1313        seq: Option<i64>,
1314    ) {
1315        let pending = {
1316            let _guard = self.lock_projection_barrier().await;
1317            self.handle_option_custody_delta_under_barrier(wallet, symbol, quantity_delta, seq)
1318                .await
1319        };
1320        self.send_pending_notifications(pending).await;
1321    }
1322
1323    pub(crate) async fn handle_option_custody_delta_under_barrier(
1324        &self,
1325        wallet: WalletAddress,
1326        symbol: String,
1327        quantity_delta: Decimal,
1328        seq: Option<i64>,
1329    ) -> PendingNotifications {
1330        let change = self
1331            .service
1332            .apply_option_custody_delta(&wallet, &symbol, quantity_delta)
1333            .await;
1334        if let Some(seq) = seq {
1335            let mut last_seq = self.last_processed_seq.write().await;
1336            *last_seq = seq;
1337        }
1338        PendingNotifications::Changes(vec![change])
1339    }
1340
1341    pub async fn replay_option_custody_delta(
1342        &self,
1343        wallet: WalletAddress,
1344        symbol: String,
1345        quantity_delta: Decimal,
1346        seq: i64,
1347    ) {
1348        let _guard = self.lock_projection_barrier().await;
1349        self.handle_option_custody_delta_under_barrier(wallet, symbol, quantity_delta, Some(seq))
1350            .await;
1351    }
1352
1353    /// Send fill notifications. Call AFTER releasing the
1354    /// projection barrier so these non-critical operations don't block other
1355    /// state mutations.
1356    pub(crate) async fn send_fill_notifications(&self, fill: &hypercall_types::Fill) {
1357        let timestamp = chrono::Utc::now().timestamp();
1358
1359        let (taker_pos, taker_margin) = self
1360            .get_position_and_margin_for_notification(&fill.taker_wallet_address, &fill.symbol)
1361            .await;
1362        let taker_change = PortfolioChange {
1363            wallet: fill.taker_wallet_address,
1364            position_changes: vec![taker_pos],
1365            balance_change: None,
1366            total_margin_used: taker_margin,
1367        };
1368        self.send_change_notifications(&taker_change, timestamp)
1369            .await;
1370
1371        let (maker_pos, maker_margin) = self
1372            .get_position_and_margin_for_notification(&fill.maker_wallet_address, &fill.symbol)
1373            .await;
1374        let maker_change = PortfolioChange {
1375            wallet: fill.maker_wallet_address,
1376            position_changes: vec![maker_pos],
1377            balance_change: None,
1378            total_margin_used: maker_margin,
1379        };
1380        self.send_change_notifications(&maker_change, timestamp)
1381            .await;
1382    }
1383
1384    pub async fn replay_journal_fill(&self, fill: &hypercall_types::Fill, seq: i64) {
1385        let _guard = self.lock_projection_barrier().await;
1386        self.apply_fill_state_mutation(fill, seq).await;
1387    }
1388
1389    /// Apply fill state mutations to portfolio memory. This is the barrier-protected
1390    /// part of fill processing: position updates and seq advancement.
1391    /// Does NOT send notifications -- callers should use
1392    /// `send_fill_notifications` after releasing the barrier.
1393    async fn apply_fill_state_mutation(&self, fill: &hypercall_types::Fill, seq: i64) {
1394        let taker_side_orderbook = fill.taker_side;
1395        let maker_side = match fill.taker_side {
1396            hypercall_types::Side::Buy => hypercall_types::Side::Sell,
1397            hypercall_types::Side::Sell => hypercall_types::Side::Buy,
1398        };
1399        let size_human = to_human_readable_decimal(&fill.symbol, fill.size);
1400
1401        self.service
1402            .apply_fill_to_memory_both_sides(
1403                &fill.taker_wallet_address,
1404                &fill.maker_wallet_address,
1405                &fill.symbol,
1406                &taker_side_orderbook,
1407                &maker_side,
1408                fill.price,
1409                size_human,
1410            )
1411            .await;
1412
1413        let mut last_seq = self.last_processed_seq.write().await;
1414        *last_seq = seq;
1415    }
1416
1417    async fn apply_position_expired_state_mutation(
1418        &self,
1419        expiry_msg: &PositionExpiredMessage,
1420        seq: Option<i64>,
1421    ) -> PortfolioChange {
1422        let settlement = self.settlement.clone();
1423        let expiry_msg_for_repair = expiry_msg.clone();
1424        let settlement_outcome = tokio::task::spawn_blocking(move || {
1425            settlement.try_apply_settlement_sync(
1426                &expiry_msg_for_repair.wallet_address,
1427                &expiry_msg_for_repair.symbol,
1428                expiry_msg_for_repair.position_size,
1429                expiry_msg_for_repair.settlement_price,
1430                expiry_msg_for_repair.settlement_value,
1431                expiry_msg_for_repair.margin_mode,
1432                expiry_msg_for_repair.timestamp as i64,
1433                expiry_msg_for_repair.settlement_entry_price,
1434                expiry_msg_for_repair.cost_basis,
1435                expiry_msg_for_repair.net_pnl,
1436            )
1437        })
1438        .await
1439        .unwrap_or_else(|error| {
1440            panic!(
1441                "CRITICAL: settlement repair task join failed for {}/{}: {}",
1442                expiry_msg.wallet_address, expiry_msg.symbol, error
1443            )
1444        })
1445        .unwrap_or_else(|error| {
1446            metrics::counter!("ht_settlement_apply_failures_total").increment(1);
1447            panic!(
1448                "CRITICAL: failed to confirm or repair settlement accounting for {}/{}: {}",
1449                expiry_msg.wallet_address, expiry_msg.symbol, error
1450            )
1451        });
1452
1453        if settlement_outcome.newly_persisted {
1454            tracing::warn!(
1455                wallet = %expiry_msg.wallet_address,
1456                symbol = %expiry_msg.symbol,
1457                "Repaired missing settlement payout before portfolio projection"
1458            );
1459        } else {
1460            let confirmed_applied = self
1461                .settlement
1462                .is_settlement_ledger_applied_sync(&expiry_msg.wallet_address, &expiry_msg.symbol)
1463                .unwrap_or_else(|error| {
1464                    panic!(
1465                        "CRITICAL: DB error checking repaired settlement state for {}/{}: {}. \
1466                         Panicking to allow replay.",
1467                        expiry_msg.wallet_address, expiry_msg.symbol, error
1468                    )
1469                });
1470            if !confirmed_applied {
1471                metrics::counter!("ht_settlement_apply_failures_total").increment(1);
1472                panic!(
1473                    "CRITICAL: settlement repair completed but ledger_applied is still false for {}/{}",
1474                    expiry_msg.wallet_address, expiry_msg.symbol
1475                )
1476            }
1477        }
1478
1479        self.service
1480            .remove_expired_position(&expiry_msg.wallet_address, &expiry_msg.symbol)
1481            .await;
1482
1483        tracing::info!(
1484            "PortfolioCache: Applied settled expiry projection for {}/{}",
1485            expiry_msg.wallet_address,
1486            expiry_msg.symbol,
1487        );
1488        metrics::counter!("ht_settlement_apply_success_total").increment(1);
1489
1490        if let Some(seq) = seq {
1491            let mut last_seq = self.last_processed_seq.write().await;
1492            *last_seq = seq;
1493        }
1494
1495        let (position_change, total_margin_used) = self
1496            .get_position_and_margin_for_notification(
1497                &expiry_msg.wallet_address,
1498                &expiry_msg.symbol,
1499            )
1500            .await;
1501
1502        PortfolioChange {
1503            wallet: expiry_msg.wallet_address,
1504            position_changes: vec![position_change],
1505            balance_change: None,
1506            total_margin_used,
1507        }
1508    }
1509
1510    /// Dispatch pending notification work that was collected under the barrier.
1511    pub(crate) async fn send_pending_notifications(&self, pending: PendingNotifications) {
1512        match pending {
1513            PendingNotifications::Fill { fill } => self.send_fill_notifications(&fill).await,
1514            PendingNotifications::Changes(changes) => {
1515                let timestamp = chrono::Utc::now().timestamp();
1516                for change in &changes {
1517                    self.send_change_notifications(change, timestamp).await;
1518                }
1519            }
1520        }
1521    }
1522
1523    /// Send portfolio update notifications from a PortfolioChange struct.
1524    ///
1525    /// This is the centralized notification method - all portfolio state changes
1526    /// should go through here to ensure consistent WebSocket updates.
1527    async fn send_change_notifications(&self, change: &PortfolioChange, timestamp: i64) {
1528        // Send position updates for each changed position
1529        for pos_change in &change.position_changes {
1530            // If margin worked for the fill, it must work for the notification.
1531            // Crash instead of silently dropping the PositionUpdate (causes WS inconsistency).
1532            let position = self
1533                .build_enriched_position_update(&change.wallet, &pos_change.symbol)
1534                .await
1535                .expect("build_enriched_position_update failed after successful fill — margin state is inconsistent");
1536            self.notify_subscribers(
1537                &change.wallet,
1538                PortfolioUpdate::PositionUpdate {
1539                    position,
1540                    timestamp,
1541                },
1542            )
1543            .await;
1544        }
1545
1546        // Send balance update
1547        self.notify_subscribers(
1548            &change.wallet,
1549            PortfolioUpdate::BalanceUpdate {
1550                total_margin_used: change.total_margin_used,
1551                timestamp,
1552            },
1553        )
1554        .await;
1555
1556        self.publish_greeks_update(&change.wallet).await;
1557    }
1558
1559    /// Get position state and margin for a wallet/symbol after a fill.
1560    ///
1561    /// Returns both values from a single portfolio read to avoid redundant lock acquisitions.
1562    /// Used to construct PortfolioChange for notification after apply_fill_to_memory.
1563    async fn get_position_and_margin_for_notification(
1564        &self,
1565        wallet: &WalletAddress,
1566        symbol: &str,
1567    ) -> (crate::portfolio::PositionChange, Decimal) {
1568        if let Some(balance) = self.service.get_portfolio_balance(wallet).await {
1569            let margin = balance.total_margin_used;
1570            if let Some(pos) = balance.positions.get(symbol) {
1571                return (
1572                    crate::portfolio::PositionChange {
1573                        symbol: symbol.to_string(),
1574                        amount: pos.amount,
1575                        entry_price: pos.entry_price,
1576                        margin_posted: pos.margin_posted,
1577                        realized_pnl: pos.realized_pnl,
1578                        unrealized_pnl: pos.unrealized_pnl,
1579                    },
1580                    margin,
1581                );
1582            }
1583            // Position closed but portfolio exists
1584            return (
1585                crate::portfolio::PositionChange {
1586                    symbol: symbol.to_string(),
1587                    amount: dec!(0),
1588                    entry_price: dec!(0),
1589                    margin_posted: dec!(0),
1590                    realized_pnl: dec!(0),
1591                    unrealized_pnl: dec!(0),
1592                },
1593                margin,
1594            );
1595        }
1596        // Portfolio doesn't exist
1597        (
1598            crate::portfolio::PositionChange {
1599                symbol: symbol.to_string(),
1600                amount: dec!(0),
1601                entry_price: dec!(0),
1602                margin_posted: dec!(0),
1603                realized_pnl: dec!(0),
1604                unrealized_pnl: dec!(0),
1605            },
1606            dec!(0),
1607        )
1608    }
1609
1610    /// Subscribe to portfolio updates for a specific account
1611    pub async fn subscribe(
1612        &self,
1613        account: WalletAddress,
1614    ) -> (SubscriberId, mpsc::UnboundedReceiver<PortfolioUpdate>) {
1615        let (tx, rx) = mpsc::unbounded_channel();
1616
1617        // Generate a new subscriber ID
1618        let mut next_id = self.next_subscriber_id.write().await;
1619        let subscriber_id = *next_id;
1620        *next_id += 1;
1621        drop(next_id);
1622
1623        let positions = match self.build_enriched_positions(&account).await {
1624            Ok(positions) => positions,
1625            Err(e) => {
1626                tracing::error!(
1627                    "Failed to build initial portfolio snapshot for {}: {}",
1628                    account,
1629                    e
1630                );
1631                return (subscriber_id, rx);
1632            }
1633        };
1634
1635        // Add subscriber to the map
1636        let mut subscribers = self.subscribers.write().await;
1637        subscribers
1638            .entry(account)
1639            .or_insert_with(HashMap::new)
1640            .insert(subscriber_id, tx.clone());
1641        drop(subscribers);
1642
1643        let _ = tx.send(PortfolioUpdate::Initial {
1644            positions,
1645            timestamp: chrono::Utc::now().timestamp(),
1646        });
1647
1648        // Trigger margin update so subscriber gets equity/margin immediately
1649        // (publish_margin_update is a no-op if margin services aren't configured)
1650        self.publish_margin_update(&account).await;
1651        self.publish_greeks_update(&account).await;
1652
1653        (subscriber_id, rx)
1654    }
1655
1656    /// Unsubscribe a specific subscriber
1657    pub async fn unsubscribe(&self, account: &WalletAddress, subscriber_id: SubscriberId) {
1658        let mut subscribers = self.subscribers.write().await;
1659        if let Some(account_subs) = subscribers.get_mut(account) {
1660            account_subs.remove(&subscriber_id);
1661            if account_subs.is_empty() {
1662                subscribers.remove(account);
1663            }
1664        }
1665    }
1666
1667    /// Return the number of active subscribers for a wallet.
1668    pub async fn subscriber_count_for_wallet(&self, account: &WalletAddress) -> usize {
1669        let subscribers = self.subscribers.read().await;
1670        subscribers
1671            .get(account)
1672            .map(|account_subs| account_subs.len())
1673            .unwrap_or(0)
1674    }
1675
1676    /// Notify subscribers of portfolio updates
1677    async fn notify_subscribers(&self, account: &WalletAddress, update: PortfolioUpdate) {
1678        let failed_subscribers: Vec<SubscriberId> = {
1679            let subscribers = self.subscribers.read().await;
1680            let Some(account_subs) = subscribers.get(account) else {
1681                return;
1682            };
1683
1684            account_subs
1685                .iter()
1686                .filter_map(|(subscriber_id, tx)| {
1687                    tx.send(update.clone()).err().map(|_| *subscriber_id)
1688                })
1689                .collect()
1690        };
1691
1692        if failed_subscribers.is_empty() {
1693            return;
1694        }
1695
1696        let mut subscribers = self.subscribers.write().await;
1697        if let Some(account_subs) = subscribers.get_mut(account) {
1698            for subscriber_id in failed_subscribers {
1699                account_subs.remove(&subscriber_id);
1700            }
1701
1702            if account_subs.is_empty() {
1703                subscribers.remove(account);
1704            }
1705        }
1706    }
1707
1708    pub async fn process_test_fill(
1709        &self,
1710        account: &WalletAddress,
1711        option_id: &str,
1712        side: &str,
1713        price: Decimal,
1714        volume: Decimal,
1715    ) {
1716        use hypercall_types::Fill;
1717        use hypercall_types::Side;
1718
1719        let side_enum = match side {
1720            "buy" => Side::Buy,
1721            "sell" => Side::Sell,
1722            _ => return,
1723        };
1724
1725        let size_contract_units = to_contract_units_decimal(option_id, volume);
1726
1727        let fill = Fill {
1728            trade_id: 0,
1729            taker_order_id: 0,
1730            maker_order_id: 0,
1731            symbol: option_id.to_string(),
1732            price,
1733            size: size_contract_units,
1734            taker_side: side_enum,
1735            taker_wallet_address: *account,
1736            maker_wallet_address: WalletAddress::from(alloy::primitives::Address::ZERO),
1737            fee: dec!(0),
1738            is_taker: true,
1739            timestamp: 0,
1740            builder_code_address: None,
1741            builder_code_fee: None,
1742            source: Default::default(),
1743            taker_realized_pnl: None,
1744            maker_realized_pnl: None,
1745            underlying_notional: None,
1746        };
1747
1748        self.service
1749            .apply_event(&EngineMessage::OrderFilled {
1750                accounting: hypercall_engine::FillAccounting::from_fill(&fill),
1751                fill,
1752            })
1753            .await
1754            .unwrap();
1755    }
1756
1757    /// Get portfolio for a specific account
1758    pub async fn get_portfolio(&self, account: &WalletAddress) -> Result<Portfolio> {
1759        if let Err(error) = self.refresh_live_market_prices_for_wallet(account).await {
1760            tracing::warn!(
1761                "Failed to refresh live market prices for {} before REST portfolio read: {}",
1762                account,
1763                error
1764            );
1765        }
1766        Ok(self.service.get_portfolio(account).await)
1767    }
1768
1769    pub async fn get_portfolio_fail_closed_pm(&self, account: &WalletAddress) -> Result<Portfolio> {
1770        self.refresh_live_market_prices_for_wallet(account)
1771            .await
1772            .map_err(|error| anyhow::anyhow!("PM portfolio read unavailable: {}", error))?;
1773        Ok(self.service.get_portfolio(account).await)
1774    }
1775
1776    pub async fn has_live_position_symbol(&self, wallet: &WalletAddress, symbol: &str) -> bool {
1777        self.service
1778            .get_portfolio_balance(wallet)
1779            .await
1780            .map(|balance| balance.positions.contains_key(symbol))
1781            .unwrap_or(false)
1782    }
1783
1784    /// Check if a portfolio exists for an account.
1785    pub async fn has_portfolio(&self, account: &WalletAddress) -> bool {
1786        self.service.get_portfolio_balance(account).await.is_some()
1787    }
1788
1789    /// Update market prices for unrealized PnL calculations
1790    pub async fn update_market_prices(&self, prices: HashMap<String, Decimal>) {
1791        // Delegate to service
1792        self.service.update_market_prices(prices).await;
1793    }
1794
1795    /// Get the last processed portfolio sequence marker.
1796    pub async fn get_last_processed_seq(&self) -> i64 {
1797        *self.last_processed_seq.read().await
1798    }
1799
1800    /// Capture current `(seq, portfolios)` for tests and diagnostics.
1801    pub async fn capture_snapshot_state(&self) -> (i64, HashMap<WalletAddress, PortfolioBalance>) {
1802        let _guard = self.lock_projection_barrier().await;
1803        let seq = *self.last_processed_seq.read().await;
1804        let portfolios = self.service.all_portfolios().await;
1805        (seq, portfolios)
1806    }
1807
1808    pub async fn capture_portfolios_under_barrier(
1809        &self,
1810        _barrier_proof: &OwnedMutexGuard<()>,
1811    ) -> HashMap<WalletAddress, PortfolioBalance> {
1812        self.service.all_portfolios().await
1813    }
1814
1815    /// Get all portfolios with margin info (for metrics collection).
1816    ///
1817    /// Returns a map of wallet address to portfolio summary including positions and margin.
1818    pub async fn get_all_portfolios(&self) -> HashMap<WalletAddress, PortfolioSummary> {
1819        let portfolios = self.service.all_portfolios().await;
1820        let mut result = HashMap::new();
1821
1822        for (wallet, balance) in portfolios {
1823            // Convert positions to summary format
1824            let positions: HashMap<String, PositionSummary> = balance
1825                .positions
1826                .iter()
1827                .map(|(symbol, pos)| {
1828                    (
1829                        symbol.clone(),
1830                        PositionSummary {
1831                            symbol: pos.symbol.clone(),
1832                            amount: pos.amount,
1833                            entry_price: pos.entry_price,
1834                            realized_pnl: pos.realized_pnl,
1835                            unrealized_pnl: pos.unrealized_pnl,
1836                        },
1837                    )
1838                })
1839                .collect();
1840
1841            result.insert(
1842                wallet,
1843                PortfolioSummary {
1844                    positions,
1845                    margin_info: None, // Margin is computed on-demand, not cached
1846                },
1847            );
1848        }
1849
1850        result
1851    }
1852}
1853
1854/// Summary of a portfolio for metrics collection.
1855#[derive(Debug, Clone)]
1856pub struct PortfolioSummary {
1857    pub positions: HashMap<String, PositionSummary>,
1858    pub margin_info: Option<MarginInfo>,
1859}
1860
1861/// Summary of a position for metrics collection.
1862#[derive(Debug, Clone)]
1863pub struct PositionSummary {
1864    pub symbol: String,
1865    pub amount: Decimal,
1866    pub entry_price: Decimal,
1867    pub realized_pnl: Decimal,
1868    pub unrealized_pnl: Decimal,
1869}
1870
1871/// Summary of margin info for metrics collection.
1872#[derive(Debug, Clone)]
1873pub struct MarginInfo {
1874    pub equity: Decimal,
1875    pub initial_margin: Decimal,
1876    pub maintenance_margin: Decimal,
1877}
1878
1879use crate::shared::order_types::perp_underlying as portfolio_perp_underlying;
1880
1881#[cfg(test)]
1882mod tests {
1883    use super::*;
1884    use crate::messaging::{EventBusTrait, MockEventBus};
1885    use crate::price_oracle::hyperliquid_oracle::{
1886        HyperliquidMarkPriceOracle, HyperliquidOracleConfig,
1887    };
1888    use crate::read_cache::tier::TierCache;
1889    use crate::rsm::engine_snapshot::{
1890        MockOrderSnapshotProvider, MockQuoteProvider, SnapshotOpenOrdersSource,
1891    };
1892    use crate::rsm::ledger::InMemoryLedger;
1893    use crate::rsm::margin_service::SpanMarginService;
1894    use crate::rsm::MarginMode;
1895    use crate::snapshot::{DbPortfolioSnapshotWriter, SnapshotWriter};
1896    use crate::standard_margin::StandardMarginService;
1897    use crate::types::{Config, Scenario, ScenarioType};
1898    use crate::vol_oracle::FixedTestRiskVolOracle;
1899    use hypercall_engine::FeeConfig;
1900    use hypercall_runtime_api::{OrderSnapshotProvider, QuoteProvider, SnapshotBookQuote};
1901    use hypercall_types::Fill;
1902    use hypercall_types::Side;
1903    use sqlx::postgres::{PgConnectOptions, PgPoolOptions};
1904    use std::collections::{HashMap, HashSet};
1905    use testcontainers::runners::AsyncRunner;
1906    use testcontainers::ContainerAsync;
1907    use testcontainers::ImageExt;
1908    use testcontainers_modules::postgres::Postgres;
1909    use tokio::time::{sleep, timeout, Duration};
1910    use uuid::Uuid;
1911
1912    use hypercall_types::wallet_address::test_wallet;
1913
1914    struct TestContext {
1915        cache: Arc<PortfolioCache>,
1916        diesel_db: Arc<hypercall_db_diesel::DieselDb>,
1917        sqlx_connect_options: PgConnectOptions,
1918        _container: Option<ContainerAsync<Postgres>>,
1919    }
1920
/// Clean up a PostgreSQL URL for tests and optionally point it at another database.
///
/// Query handling:
/// * everything after the first `?` is the query string;
/// * any further `?` is treated like `&` (tolerates URLs produced by naive
///   concatenation of parameters);
/// * empty pairs are dropped, and for duplicate keys only the first
///   occurrence is kept, preserving order.
///
/// Database handling (only when `database_name` is `Some`):
/// * the last path segment is replaced by `database_name`;
/// * if the URL has a scheme but no path at all (`postgres://host:5432`), the
///   name is appended as the path, so the host is never overwritten.
///
/// # Panics
///
/// Panics when `database_name` is given and the URL has neither a `://`
/// scheme separator nor any `/`, since there is then no database path to rewrite.
fn normalize_test_database_url(base_database_url: &str, database_name: Option<&str>) -> String {
    let (base, raw_query) = match base_database_url.split_once('?') {
        Some((base, query)) => (base, query),
        None => (base_database_url, ""),
    };

    // De-duplicate query keys, keeping the first value seen for each key.
    let mut seen_keys = HashSet::new();
    let query = raw_query
        .split(|c: char| c == '?' || c == '&')
        .filter(|pair| !pair.is_empty())
        .filter(|pair| seen_keys.insert(pair.split('=').next().unwrap_or_default()))
        .collect::<Vec<_>>()
        .join("&");

    let base = match database_name {
        None => base.to_string(),
        Some(database_name) => match base.find("://") {
            Some(scheme_end) => {
                // Only look for the path after the authority, so the slashes
                // of `://` are never mistaken for the path separator.
                let authority_start = scheme_end + "://".len();
                match base[authority_start..].rfind('/') {
                    Some(offset) => {
                        format!("{}/{database_name}", &base[..authority_start + offset])
                    }
                    // No path component: append the database name.
                    None => format!("{base}/{database_name}"),
                }
            }
            None => {
                let (prefix, _) = base
                    .rsplit_once('/')
                    .expect("DATABASE_URL must include a database path");
                format!("{prefix}/{database_name}")
            }
        },
    };

    if query.is_empty() {
        base
    } else {
        format!("{base}?{query}")
    }
}
1965
1966    async fn try_setup_test_cache_from_shared_database(
1967        base_database_url: &str,
1968    ) -> Result<TestContext, String> {
1969        let database_name = format!("portfolio_cache_test_{}", Uuid::now_v7().simple());
1970        let admin_database_url = normalize_test_database_url(base_database_url, None);
1971        let admin_pool = PgPoolOptions::new()
1972            .max_connections(1)
1973            .min_connections(1)
1974            .acquire_timeout(Duration::from_secs(30))
1975            .connect(&admin_database_url)
1976            .await
1977            .map_err(|error| {
1978                format!("failed to connect to shared PostgreSQL test database: {error}")
1979            })?;
1980        sqlx::query(&format!("CREATE DATABASE \"{}\"", database_name))
1981            .execute(&admin_pool)
1982            .await
1983            .map_err(|error| {
1984                format!("failed to create isolated PostgreSQL test database: {error}")
1985            })?;
1986        admin_pool.close().await;
1987
1988        let database_url = normalize_test_database_url(base_database_url, Some(&database_name));
1989        let sqlx_connect_options = PgConnectOptions::from_str(&database_url).map_err(|error| {
1990            format!("isolated PostgreSQL test database URL must parse: {error}")
1991        })?;
1992        let diesel_handler = Arc::new(
1993            DatabaseHandler::new(&database_url)
1994                .map_err(|error| format!("failed to initialize DieselEventHandler: {error:#}"))?,
1995        );
1996        let cache = Arc::new(PortfolioCache::new(diesel_handler));
1997        let _ = cache
1998            .initialize()
1999            .await
2000            .map_err(|error| format!("failed to initialize portfolio cache: {error}"))?;
2001
2002        let diesel_db = Arc::new(
2003            hypercall_db_diesel::DieselDb::new_no_tls(&database_url, 2)
2004                .await
2005                .map_err(|error| format!("failed to initialize DieselDb: {error:#}"))?,
2006        );
2007
2008        Ok(TestContext {
2009            cache,
2010            diesel_db,
2011            sqlx_connect_options,
2012            _container: None,
2013        })
2014    }
2015
2016    async fn setup_test_cache() -> TestContext {
2017        if let Ok(base_database_url) = std::env::var("TEST_DATABASE_URL") {
2018            match try_setup_test_cache_from_shared_database(&base_database_url).await {
2019                Ok(context) => return context,
2020                Err(error) => {
2021                    eprintln!(
2022                        "shared PostgreSQL test database unavailable for portfolio_cache tests: {error}; falling back to testcontainer"
2023                    );
2024                }
2025            }
2026        }
2027
2028        // Start PostgreSQL container with the new async API
2029        // Match deployed PostgreSQL 16 lanes.
2030        // testcontainers-modules defaults to postgres:11-alpine, where
2031        // `ALTER TYPE ... ADD VALUE` fails inside a transaction — PG12 lifted
2032        // that restriction, so pinning 16-alpine keeps migrations (including
2033        // the enum-extending ones in migrations/2026-04-11-000001 and
2034        // migrations/2026-04-12-000002) runnable under diesel's default
2035        // per-migration transaction.
2036        //
2037        // NOTE: `with_tag` returns `ContainerRequest<Postgres>` via the
2038        // `ImageExt` trait, which doesn't expose the Postgres-specific
2039        // builders (`with_db_name`, `with_user`, ...). Call it last.
2040        let postgres_image = Postgres::default()
2041            .with_db_name("test_db")
2042            .with_user("test_user")
2043            .with_password("test_password")
2044            .with_startup_timeout(Duration::from_secs(120))
2045            .with_tag("16-alpine");
2046
2047        let container = postgres_image
2048            .start()
2049            .await
2050            .expect("Failed to start PostgreSQL container");
2051        let port = container
2052            .get_host_port_ipv4(5432)
2053            .await
2054            .expect("Failed to get port");
2055
2056        let database_url = format!(
2057            "postgresql://test_user:test_password@127.0.0.1:{}/test_db",
2058            port
2059        );
2060
2061        // CI can expose the mapped port before Postgres is ready to accept
2062        // migrations. Poll the database directly instead of sleeping a fixed
2063        // amount of time.
2064        let mut last_ready_error = None;
2065        for _ in 0..60 {
2066            match PgPoolOptions::new()
2067                .max_connections(1)
2068                .min_connections(1)
2069                .acquire_timeout(Duration::from_secs(5))
2070                .connect(&database_url)
2071                .await
2072            {
2073                Ok(pool) => match sqlx::query("SELECT 1").execute(&pool).await {
2074                    Ok(_) => {
2075                        pool.close().await;
2076                        last_ready_error = None;
2077                        break;
2078                    }
2079                    Err(error) => {
2080                        last_ready_error = Some(format!("readiness query failed: {error}"));
2081                        pool.close().await;
2082                    }
2083                },
2084                Err(error) => {
2085                    last_ready_error = Some(format!("failed to connect to postgres: {error}"));
2086                }
2087            }
2088
2089            sleep(Duration::from_secs(1)).await;
2090        }
2091
2092        if let Some(error) = last_ready_error {
2093            panic!("PostgreSQL container failed readiness checks: {error}");
2094        }
2095
2096        // Create diesel handler which will run migrations. Even after the
2097        // readiness query succeeds, Postgres can still be finishing startup on
2098        // slower CI machines, so give migrations a short retry window.
2099        let mut diesel_handler = None;
2100        let mut last_handler_error = None;
2101        for _ in 0..10 {
2102            match DatabaseHandler::new(&database_url) {
2103                Ok(handler) => {
2104                    diesel_handler = Some(Arc::new(handler));
2105                    last_handler_error = None;
2106                    break;
2107                }
2108                Err(error) => {
2109                    last_handler_error =
2110                        Some(format!("failed to create diesel handler: {error:#}"));
2111                    sleep(Duration::from_secs(1)).await;
2112                }
2113            }
2114        }
2115
2116        let diesel_handler = diesel_handler.unwrap_or_else(|| {
2117            panic!(
2118                "PostgreSQL container never became migration-ready: {}",
2119                last_handler_error.unwrap_or_else(|| "unknown error".to_string())
2120            )
2121        });
2122
2123        let cache = Arc::new(PortfolioCache::new(diesel_handler));
2124
2125        // Initialize returns the next engine command_id to replay - we ignore it for these tests
2126        let _ = cache.initialize().await.unwrap();
2127
2128        let diesel_db = Arc::new(
2129            hypercall_db_diesel::DieselDb::new(&database_url, 2)
2130                .await
2131                .expect("DieselDb must initialize for test context"),
2132        );
2133
2134        TestContext {
2135            cache,
2136            diesel_db,
2137            sqlx_connect_options: PgConnectOptions::from_str(&database_url)
2138                .expect("Test container database URL must parse for sqlx"),
2139            _container: Some(container),
2140        }
2141    }
2142
2143    async fn attach_test_greeks_cache(context: &TestContext, symbol: &str) {
2144        let raw_event_bus = Arc::new(MockEventBus::new().expect("Failed to create mock event bus"));
2145        raw_event_bus.clone().start_processing().await;
2146        let event_bus: Arc<dyn EventBusTrait> = raw_event_bus.clone();
2147
2148        {
2149            use diesel::prelude::*;
2150            use diesel::sql_types::{BigInt, Numeric, Text};
2151
2152            let pool = context.cache.db.pool();
2153            let mut conn = pool.get().expect("Failed to get Diesel connection");
2154            diesel::sql_query(
2155                "INSERT INTO instruments (id, underlying, strike, expiry, option_type)
2156                 VALUES ($1, $2, $3, $4, $5)
2157                 ON CONFLICT (id) DO NOTHING",
2158            )
2159            .bind::<Text, _>(symbol)
2160            .bind::<Text, _>("BTC")
2161            .bind::<Numeric, _>(dec!(50000))
2162            .bind::<BigInt, _>(20261231_i64)
2163            .bind::<Text, _>("call")
2164            .execute(&mut conn)
2165            .expect("Failed to insert test instrument");
2166        }
2167
2168        let oracle_config = HyperliquidOracleConfig {
2169            api_url: "https://api.hyperliquid-testnet.xyz/info".to_string(),
2170            poll_interval_ms: 60_000,
2171            risk_free_rate: 0.05,
2172            symbol: "BTC".to_string(),
2173            twap_window_seconds: 300,
2174            oracle_writer: None,
2175            max_memory_samples: 100,
2176            ws_feed: None,
2177            price_notify: None,
2178            min_settlement_samples: 500,
2179        };
2180        let btc_oracle = Arc::new(
2181            HyperliquidMarkPriceOracle::new(oracle_config).expect("Failed to create BTC oracle"),
2182        );
2183        btc_oracle.set_spot_price_for_testing(50_000.0).await;
2184
2185        let mut oracles = HashMap::new();
2186        oracles.insert("BTC".to_string(), btc_oracle);
2187
2188        let quote_provider = Arc::new(MockQuoteProvider::new());
2189        quote_provider.set_quote(
2190            symbol,
2191            SnapshotBookQuote {
2192                best_bid: Some(10_000.0),
2193                best_bid_size: Some(5.0),
2194                best_ask: Some(11_000.0),
2195                best_ask_size: Some(5.0),
2196                mid: Some(10_500.0),
2197                bids: vec![(10_000.0, 5.0)],
2198                asks: vec![(11_000.0, 5.0)],
2199            },
2200        );
2201        let quote_provider_trait: Arc<dyn QuoteProvider> = quote_provider.clone();
2202        let (_shutdown_tx, shutdown_rx) = tokio::sync::broadcast::channel::<()>(1);
2203
2204        let greeks_cache = GreeksCache::new(
2205            context.diesel_db.as_ref(),
2206            event_bus,
2207            oracles,
2208            shutdown_rx,
2209            quote_provider_trait,
2210        )
2211        .await
2212        .expect("Failed to create GreeksCache");
2213        greeks_cache
2214            .set_vol_oracle(Arc::new(FixedTestRiskVolOracle::new(0.5)))
2215            .await;
2216
2217        let mut greeks_ready = false;
2218        let mut last_error = String::new();
2219        for _ in 0..20 {
2220            match greeks_cache.get_greeks(symbol).await {
2221                Ok(_) => {
2222                    greeks_ready = true;
2223                    break;
2224                }
2225                Err(e) => {
2226                    last_error = e.to_string();
2227                }
2228            }
2229            sleep(Duration::from_millis(50)).await;
2230        }
2231        assert!(
2232            greeks_ready,
2233            "expected test GreeksCache to have greeks for {}, last error: {}",
2234            symbol, last_error
2235        );
2236
2237        *context.cache.greeks_cache.write().await = Some(greeks_cache);
2238    }
2239
2240    /// Insert a trade record to satisfy the foreign key constraint on fills.
2241    async fn insert_trade_for_fill(context: &TestContext, fill: &hypercall_types::Fill) {
2242        use diesel::prelude::*;
2243        use diesel::sql_types::{BigInt, Binary, Numeric, Text};
2244        use rust_decimal_macros::dec;
2245
2246        let pool = context.cache.db.pool();
2247        let mut conn = pool.get().expect("Failed to get connection");
2248
2249        diesel::sql_query(
2250            "INSERT INTO trades (trade_id, symbol, price, size, maker_address, taker_address, maker_fee, taker_fee, timestamp)
2251             VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
2252             ON CONFLICT (trade_id) DO NOTHING"
2253        )
2254        .bind::<BigInt, _>(fill.trade_id as i64)
2255        .bind::<Text, _>(&fill.symbol)
2256        .bind::<Numeric, _>(fill.price)
2257        .bind::<Numeric, _>(fill.size)
2258        .bind::<Binary, _>(&fill.maker_wallet_address)
2259        .bind::<Binary, _>(&fill.taker_wallet_address)
2260        .bind::<Numeric, _>(dec!(0)) // maker_fee
2261        .bind::<Numeric, _>(fill.fee) // taker_fee
2262        .bind::<BigInt, _>(fill.timestamp as i64)
2263        .execute(&mut conn)
2264        .expect("Failed to insert trade");
2265    }
2266
2267    async fn insert_applied_settlement(
2268        context: &TestContext,
2269        wallet: WalletAddress,
2270        symbol: &str,
2271        position_size: Decimal,
2272        settlement_entry_price: Decimal,
2273        cost_basis: Decimal,
2274        net_pnl: Decimal,
2275    ) {
2276        let settlement_price = dec!(1000);
2277        let settlement_value = position_size * settlement_price;
2278        let outcome = context
2279            .cache
2280            .settlement
2281            .try_apply_settlement_sync(
2282                &wallet,
2283                symbol,
2284                position_size,
2285                settlement_price,
2286                settlement_value,
2287                MarginMode::Standard,
2288                2,
2289                Some(settlement_entry_price),
2290                Some(cost_basis),
2291                Some(net_pnl),
2292            )
2293            .expect("Failed to persist applied settlement");
2294        assert!(outcome.newly_persisted);
2295    }
2296
2297    async fn attach_test_pm_margin_dependencies(
2298        context: &TestContext,
2299        wallet: WalletAddress,
2300    ) -> Arc<TierCache> {
2301        attach_test_greeks_cache(context, "BTC-20261231-50000-C").await;
2302        let greeks_cache = context
2303            .cache
2304            .greeks_cache
2305            .read()
2306            .await
2307            .as_ref()
2308            .cloned()
2309            .expect("greeks cache should be configured");
2310
2311        let ledger: Arc<dyn crate::rsm::Ledger + Send + Sync> = Arc::new(InMemoryLedger::new());
2312        let portfolio_service: Arc<dyn crate::portfolio::PortfolioService + Send + Sync> =
2313            context.cache.get_service();
2314        let order_snapshot: Arc<dyn OrderSnapshotProvider> = Arc::new(MockOrderSnapshotProvider);
2315        let snapshot_open_orders = Arc::new(SnapshotOpenOrdersSource::new(order_snapshot.clone()));
2316        let risk_account_builder = Arc::new(crate::rsm::portfolio_margin::RiskAccountBuilder::new(
2317            ledger.clone(),
2318            portfolio_service.clone(),
2319            snapshot_open_orders,
2320            greeks_cache.clone(),
2321        ));
2322        let standard_margin_service = Arc::new(StandardMarginService::new());
2323        let standard_account_builder =
2324            Arc::new(crate::standard_margin::StandardAccountBuilder::new(
2325                ledger,
2326                portfolio_service,
2327                greeks_cache.clone(),
2328            ));
2329        let tier_cache = Arc::new(
2330            TierCache::new(context.cache.db.clone()).expect("tier cache should initialize"),
2331        );
2332        tier_cache
2333            .set_margin_mode(&wallet, MarginMode::Portfolio)
2334            .await
2335            .expect("wallet should be set to PM");
2336
2337        let span_margin_service = Arc::new(SpanMarginService::new_for_tests(Config {
2338            risk_free_rate: 0.05,
2339            base_volatility: 0.8,
2340            base_skew: 0.0,
2341            base_excess_kurtosis: 0.0,
2342            delta_threshold: 0.0001,
2343            strike_match_tolerance: 0.01,
2344            expiry_match_tolerance_years: 0.001,
2345            scenarios: vec![
2346                Scenario {
2347                    scenario_type: ScenarioType::SpotChange,
2348                    value: 0.15,
2349                },
2350                Scenario {
2351                    scenario_type: ScenarioType::SpotChange,
2352                    value: -0.15,
2353                },
2354            ],
2355            allow_standard_margin_shorts: false,
2356            fee_config: FeeConfig::default(),
2357        }));
2358
2359        context
2360            .cache
2361            .set_margin_dependencies(
2362                span_margin_service,
2363                greeks_cache,
2364                risk_account_builder,
2365                tier_cache.clone(),
2366                order_snapshot,
2367                standard_margin_service,
2368                standard_account_builder,
2369            )
2370            .await;
2371
2372        tier_cache
2373    }
2374
2375    #[tokio::test]
2376    async fn test_get_portfolio_fail_closed_pm_requires_repricing_inputs() {
2377        let context = setup_test_cache().await;
2378        let wallet = test_wallet(31);
2379
2380        context
2381            .cache
2382            .service
2383            .restore(
2384                &wallet,
2385                PortfolioBalance {
2386                    positions: HashMap::from([(
2387                        "BTC".to_string(),
2388                        crate::portfolio::PositionData {
2389                            symbol: "BTC".to_string(),
2390                            amount: dec!(1),
2391                            entry_price: dec!(48000),
2392                            margin_posted: dec!(0),
2393                            realized_pnl: dec!(0),
2394                            unrealized_pnl: dec!(0),
2395                        },
2396                    )]),
2397                    total_margin_used: dec!(0),
2398                },
2399            )
2400            .await
2401            .expect("restore should succeed");
2402
2403        let err = context
2404            .cache
2405            .get_portfolio_fail_closed_pm(&wallet)
2406            .await
2407            .expect_err("PM REST reads must fail closed without repricing inputs");
2408        assert!(
2409            err.to_string().contains("PM portfolio read unavailable"),
2410            "unexpected error: {}",
2411            err
2412        );
2413    }
2414
2415    #[tokio::test]
2416    async fn test_get_portfolio_fail_closed_pm_skips_perp_repricing() {
2417        let context = setup_test_cache().await;
2418        let wallet = test_wallet(32);
2419
2420        context
2421            .cache
2422            .service
2423            .restore(
2424                &wallet,
2425                PortfolioBalance {
2426                    positions: HashMap::from([(
2427                        "BTC-PERP".to_string(),
2428                        crate::portfolio::PositionData {
2429                            symbol: "BTC-PERP".to_string(),
2430                            amount: dec!(1),
2431                            entry_price: dec!(48000),
2432                            margin_posted: dec!(0),
2433                            realized_pnl: dec!(0),
2434                            unrealized_pnl: dec!(750),
2435                        },
2436                    )]),
2437                    total_margin_used: dec!(0),
2438                },
2439            )
2440            .await
2441            .expect("restore should succeed");
2442
2443        attach_test_greeks_cache(&context, "BTC-20260331-50000-C").await;
2444
2445        let portfolio = context
2446            .cache
2447            .get_portfolio_fail_closed_pm(&wallet)
2448            .await
2449            .expect("PM REST reads should succeed — perps are skipped during repricing");
2450        let btc_position = portfolio
2451            .positions
2452            .iter()
2453            .find(|position| position.position.symbol == "BTC-PERP")
2454            .expect("BTC-PERP position should be present");
2455        // HyperCore UPNL is authoritative — not repriced from spot
2456        assert_eq!(btc_position.position.unrealized_pnl, dec!(750));
2457    }
2458
2459    #[tokio::test]
2460    async fn test_get_portfolio_fail_closed_pm_repairs_applied_settlement_ghost_position() {
2461        let context = setup_test_cache().await;
2462        let wallet = test_wallet(33);
2463        let symbol = "BTC-20260403-34000-C";
2464
2465        context
2466            .cache
2467            .service
2468            .restore(
2469                &wallet,
2470                PortfolioBalance {
2471                    positions: HashMap::from([(
2472                        symbol.to_string(),
2473                        crate::portfolio::PositionData {
2474                            symbol: symbol.to_string(),
2475                            amount: dec!(1),
2476                            entry_price: dec!(1500),
2477                            margin_posted: dec!(0),
2478                            realized_pnl: dec!(0),
2479                            unrealized_pnl: dec!(0),
2480                        },
2481                    )]),
2482                    total_margin_used: dec!(0),
2483                },
2484            )
2485            .await
2486            .expect("restore should succeed");
2487        insert_applied_settlement(
2488            &context,
2489            wallet,
2490            symbol,
2491            dec!(1),
2492            dec!(0),
2493            dec!(0),
2494            dec!(1000),
2495        )
2496        .await;
2497
2498        let portfolio = context
2499            .cache
2500            .get_portfolio_fail_closed_pm(&wallet)
2501            .await
2502            .expect("PM portfolio should self-heal settled ghost positions");
2503
2504        assert!(
2505            portfolio
2506                .positions
2507                .iter()
2508                .all(|position| position.position.symbol != symbol),
2509            "settled ghost symbol should be removed from REST portfolio"
2510        );
2511        let internal = context
2512            .cache
2513            .service
2514            .get_portfolio_balance(&wallet)
2515            .await
2516            .expect("wallet should still exist");
2517        assert!(
2518            !internal.positions.contains_key(symbol),
2519            "settled ghost symbol should be removed from in-memory portfolio"
2520        );
2521    }
2522
2523    #[tokio::test]
2524    async fn test_get_portfolio_fail_closed_pm_keeps_fail_closed_without_settlement_proof() {
2525        let context = setup_test_cache().await;
2526        let wallet = test_wallet(34);
2527        let symbol = "BTC-20260403-34000-C";
2528
2529        context
2530            .cache
2531            .service
2532            .restore(
2533                &wallet,
2534                PortfolioBalance {
2535                    positions: HashMap::from([(
2536                        symbol.to_string(),
2537                        crate::portfolio::PositionData {
2538                            symbol: symbol.to_string(),
2539                            amount: dec!(1),
2540                            entry_price: dec!(1500),
2541                            margin_posted: dec!(0),
2542                            realized_pnl: dec!(0),
2543                            unrealized_pnl: dec!(0),
2544                        },
2545                    )]),
2546                    total_margin_used: dec!(0),
2547                },
2548            )
2549            .await
2550            .expect("restore should succeed");
2551
2552        let err = context
2553            .cache
2554            .get_portfolio_fail_closed_pm(&wallet)
2555            .await
2556            .expect_err("PM portfolio should stay fail-closed without settlement proof");
2557        assert!(
2558            err.to_string().contains("PM portfolio read unavailable"),
2559            "unexpected error: {}",
2560            err
2561        );
2562    }
2563
2564    #[tokio::test]
2565    async fn test_compute_wallet_margin_snapshot_repairs_applied_settlement_ghost_position() {
2566        let context = setup_test_cache().await;
2567        let wallet = test_wallet(35);
2568        let symbol = "BTC-20260403-34000-C";
2569
2570        context
2571            .cache
2572            .service
2573            .restore(
2574                &wallet,
2575                PortfolioBalance {
2576                    positions: HashMap::from([(
2577                        symbol.to_string(),
2578                        crate::portfolio::PositionData {
2579                            symbol: symbol.to_string(),
2580                            amount: dec!(1),
2581                            entry_price: dec!(1500),
2582                            margin_posted: dec!(0),
2583                            realized_pnl: dec!(0),
2584                            unrealized_pnl: dec!(0),
2585                        },
2586                    )]),
2587                    total_margin_used: dec!(0),
2588                },
2589            )
2590            .await
2591            .expect("restore should succeed");
2592        insert_applied_settlement(
2593            &context,
2594            wallet,
2595            symbol,
2596            dec!(1),
2597            dec!(0),
2598            dec!(0),
2599            dec!(1000),
2600        )
2601        .await;
2602        let _tier_cache = attach_test_pm_margin_dependencies(&context, wallet).await;
2603
2604        let snapshot = context
2605            .cache
2606            .compute_wallet_margin_snapshot(&wallet)
2607            .await
2608            .expect("PM margin snapshot should self-heal settled ghost positions");
2609
2610        assert_eq!(snapshot.mode, MarginMode::Portfolio);
2611        assert_eq!(snapshot.total_margin_used, dec!(0));
2612        assert_eq!(snapshot.available_balance, dec!(0));
2613        let internal = context
2614            .cache
2615            .service
2616            .get_portfolio_balance(&wallet)
2617            .await
2618            .expect("wallet should still exist");
2619        assert!(
2620            !internal.positions.contains_key(symbol),
2621            "settled ghost symbol should be removed before PM margin snapshot"
2622        );
2623    }
2624
2625    #[tokio::test]
2626    async fn test_portfolio_initialization() {
2627        let context = setup_test_cache().await;
2628        let cache = context.cache.clone();
2629
2630        // Initialize with empty database - replay starts from command_id 1
2631        let next_command_id = cache.initialize().await.unwrap();
2632        assert_eq!(next_command_id, 1);
2633
2634        // Should have no portfolios
2635        let portfolios = cache.service.all_portfolios().await;
2636        assert_eq!(portfolios.len(), 0);
2637    }
2638
2639    #[tokio::test(flavor = "multi_thread")]
2640    async fn test_snapshot_and_recovery() {
2641        let context = setup_test_cache().await;
2642        let cache = context.cache.clone();
2643
2644        // Create fills via engine messages
2645        let fill1 = Fill {
2646            trade_id: 1,
2647            taker_order_id: 100,
2648            maker_order_id: 101,
2649            symbol: "BTC-CALL-100000".to_string(),
2650            price: dec!(1000),
2651            size: to_contract_units_decimal("BTC-CALL-100000", dec!(10)),
2652            taker_side: Side::Buy,
2653            taker_wallet_address: test_wallet(1),
2654            maker_wallet_address: test_wallet(200),
2655            fee: dec!(0),
2656            is_taker: true,
2657            timestamp: 0,
2658            builder_code_address: None,
2659            builder_code_fee: None,
2660            source: Default::default(),
2661            taker_realized_pnl: None,
2662            maker_realized_pnl: None,
2663            underlying_notional: None,
2664        };
2665
2666        let fill2 = Fill {
2667            trade_id: 2,
2668            taker_order_id: 102,
2669            maker_order_id: 103,
2670            symbol: "ETH-PUT-3000".to_string(),
2671            price: dec!(50),
2672            size: to_contract_units_decimal("ETH-PUT-3000", dec!(5)),
2673            taker_side: Side::Sell,
2674            taker_wallet_address: test_wallet(2),
2675            maker_wallet_address: test_wallet(200),
2676            fee: dec!(0),
2677            is_taker: true,
2678            timestamp: 0,
2679            builder_code_address: None,
2680            builder_code_fee: None,
2681            source: Default::default(),
2682            taker_realized_pnl: None,
2683            maker_realized_pnl: None,
2684            underlying_notional: None,
2685        };
2686
2687        insert_trade_for_fill(&context, &fill1).await;
2688        cache
2689            .handle_engine_message(
2690                EngineMessage::OrderFilled {
2691                    accounting: hypercall_engine::FillAccounting::from_fill(&fill1),
2692                    fill: fill1,
2693                },
2694                1,
2695            )
2696            .await;
2697        insert_trade_for_fill(&context, &fill2).await;
2698        cache
2699            .handle_engine_message(
2700                EngineMessage::OrderFilled {
2701                    accounting: hypercall_engine::FillAccounting::from_fill(&fill2),
2702                    fill: fill2,
2703                },
2704                2,
2705            )
2706            .await;
2707
2708        // Verify seq was set correctly by handle_engine_message
2709        assert_eq!(
2710            cache.get_last_processed_seq().await,
2711            2,
2712            "Seq should be 2 after processing 2 fills"
2713        );
2714
2715        // Take snapshot using the standalone writer with barrier-aware capture
2716        let get_offsets = move || {
2717            let mut offsets = HashMap::new();
2718            offsets.insert(
2719                ENGINE_COMMAND_SNAPSHOT_STREAM.to_string(),
2720                HashMap::from([(0, 1)]),
2721            );
2722            Ok(offsets)
2723        };
2724        // Use barrier-aware capture for crash consistency
2725        let cache_for_capture = cache.clone();
2726        let capture_snapshot = move || {
2727            tokio::task::block_in_place(|| {
2728                tokio::runtime::Handle::current().block_on(async {
2729                    let (_seq, portfolios) = cache_for_capture.capture_snapshot_state().await;
2730                    let mut offsets = HashMap::new();
2731                    offsets.insert(
2732                        ENGINE_COMMAND_SNAPSHOT_STREAM.to_string(),
2733                        HashMap::from([(0, 1)]),
2734                    );
2735                    Ok((portfolios, offsets))
2736                })
2737            })
2738        };
2739
2740        let writer =
2741            DbPortfolioSnapshotWriter::new(cache.db.clone(), cache.get_service(), get_offsets)
2742                .with_capture_snapshot(capture_snapshot);
2743        writer.take_snapshot().unwrap();
2744
2745        // Create new cache instance with same DB
2746        let cache2 = Arc::new(PortfolioCache::new(cache.db.clone()));
2747        let next_command_id = cache2.initialize().await.unwrap();
2748        assert_eq!(next_command_id, 1);
2749
2750        // Verify portfolios were recovered
2751        let portfolio1 = cache2.get_portfolio(&test_wallet(1)).await.unwrap();
2752        assert_eq!(portfolio1.positions.len(), 1);
2753        assert!(cache2.has_portfolio(&test_wallet(1)).await);
2754
2755        let portfolio2 = cache2.get_portfolio(&test_wallet(2)).await.unwrap();
2756        assert_eq!(portfolio2.positions.len(), 1);
2757        assert!(cache2.has_portfolio(&test_wallet(2)).await);
2758    }
2759
2760    #[tokio::test(flavor = "multi_thread")]
2761    async fn test_replay_journal_fill_applies_multiple_fills_from_one_command() {
2762        let context = setup_test_cache().await;
2763        let cache = context.cache.clone();
2764        let symbol = "BTC-CALL-100000".to_string();
2765        let taker_wallet = test_wallet(1);
2766        let maker_wallet = test_wallet(200);
2767
2768        let fill1 = Fill {
2769            trade_id: 10,
2770            taker_order_id: 100,
2771            maker_order_id: 101,
2772            symbol: symbol.clone(),
2773            price: dec!(1000),
2774            size: to_contract_units_decimal(&symbol, dec!(0.1)),
2775            taker_side: Side::Buy,
2776            taker_wallet_address: taker_wallet,
2777            maker_wallet_address: maker_wallet,
2778            fee: dec!(0),
2779            is_taker: true,
2780            timestamp: 0,
2781            builder_code_address: None,
2782            builder_code_fee: None,
2783            source: Default::default(),
2784            taker_realized_pnl: None,
2785            maker_realized_pnl: None,
2786            underlying_notional: None,
2787        };
2788        let fill2 = Fill {
2789            trade_id: 11,
2790            taker_order_id: 100,
2791            maker_order_id: 102,
2792            symbol: symbol.clone(),
2793            price: dec!(1000),
2794            size: to_contract_units_decimal(&symbol, dec!(0.9)),
2795            taker_side: Side::Buy,
2796            taker_wallet_address: taker_wallet,
2797            maker_wallet_address: maker_wallet,
2798            fee: dec!(0),
2799            is_taker: true,
2800            timestamp: 0,
2801            builder_code_address: None,
2802            builder_code_fee: None,
2803            source: Default::default(),
2804            taker_realized_pnl: None,
2805            maker_realized_pnl: None,
2806            underlying_notional: None,
2807        };
2808
2809        insert_trade_for_fill(&context, &fill1).await;
2810        insert_trade_for_fill(&context, &fill2).await;
2811
2812        cache.replay_journal_fill(&fill1, 42).await;
2813        cache.replay_journal_fill(&fill2, 42).await;
2814
2815        let balance = cache
2816            .service
2817            .get_portfolio_balance(&taker_wallet)
2818            .await
2819            .expect("taker portfolio should exist after replay");
2820        assert_eq!(balance.positions.get(&symbol).unwrap().amount, dec!(1.0));
2821    }
2822
2823    #[tokio::test(flavor = "multi_thread")]
2824    async fn test_snapshot_recovery_repairs_applied_settlement_ghost_position_on_read() {
2825        let context = setup_test_cache().await;
2826        let wallet = test_wallet(36);
2827        let symbol = "BTC-20260403-34000-C";
2828
2829        context
2830            .cache
2831            .service
2832            .restore(
2833                &wallet,
2834                PortfolioBalance {
2835                    positions: HashMap::from([(
2836                        symbol.to_string(),
2837                        crate::portfolio::PositionData {
2838                            symbol: symbol.to_string(),
2839                            amount: dec!(1),
2840                            entry_price: dec!(1500),
2841                            margin_posted: dec!(0),
2842                            realized_pnl: dec!(0),
2843                            unrealized_pnl: dec!(0),
2844                        },
2845                    )]),
2846                    total_margin_used: dec!(0),
2847                },
2848            )
2849            .await
2850            .expect("restore should succeed");
2851        insert_applied_settlement(
2852            &context,
2853            wallet,
2854            symbol,
2855            dec!(1),
2856            dec!(0),
2857            dec!(0),
2858            dec!(1000),
2859        )
2860        .await;
2861
2862        let cache = context.cache.clone();
2863        let get_offsets = move || {
2864            let mut offsets = HashMap::new();
2865            offsets.insert(
2866                ENGINE_COMMAND_SNAPSHOT_STREAM.to_string(),
2867                HashMap::from([(0, 1)]),
2868            );
2869            Ok(offsets)
2870        };
2871        let cache_for_capture = cache.clone();
2872        let capture_snapshot = move || {
2873            tokio::task::block_in_place(|| {
2874                tokio::runtime::Handle::current().block_on(async {
2875                    let (_seq, portfolios) = cache_for_capture.capture_snapshot_state().await;
2876                    let mut offsets = HashMap::new();
2877                    offsets.insert(
2878                        ENGINE_COMMAND_SNAPSHOT_STREAM.to_string(),
2879                        HashMap::from([(0, 1)]),
2880                    );
2881                    Ok((portfolios, offsets))
2882                })
2883            })
2884        };
2885
2886        let writer =
2887            DbPortfolioSnapshotWriter::new(cache.db.clone(), cache.get_service(), get_offsets)
2888                .with_capture_snapshot(capture_snapshot);
2889        writer.take_snapshot().expect("snapshot should succeed");
2890
2891        let cache2 = Arc::new(PortfolioCache::new(cache.db.clone()));
2892        let next_command_id = cache2
2893            .initialize()
2894            .await
2895            .expect("initialize should succeed");
2896        assert_eq!(next_command_id, 1);
2897
2898        let portfolio = cache2
2899            .get_portfolio(&wallet)
2900            .await
2901            .expect("portfolio read should self-heal settled ghost position after restore");
2902        assert!(
2903            portfolio
2904                .positions
2905                .iter()
2906                .all(|position| position.position.symbol != symbol),
2907            "settled ghost symbol should be removed after snapshot recovery"
2908        );
2909    }
2910
2911    #[tokio::test]
2912    async fn test_portfolio_subscription() {
2913        let context = setup_test_cache().await;
2914        let cache = context.cache.clone();
2915
2916        // Subscribe to portfolio updates
2917        let (_subscriber_id, mut rx) = cache.subscribe(test_wallet(1)).await;
2918
2919        // Should receive initial snapshot
2920        let update = rx.recv().await.unwrap();
2921        match update {
2922            PortfolioUpdate::Initial { positions, .. } => {
2923                assert_eq!(positions.len(), 0);
2924            }
2925            _ => panic!("Expected Initial update"),
2926        }
2927
2928        // Process a fill via engine message
2929        use hypercall_types::Fill;
2930
2931        let fill = Fill {
2932            trade_id: 1,
2933            taker_order_id: 100,
2934            maker_order_id: 101,
2935            symbol: "BTC-CALL-100000".to_string(),
2936            price: dec!(1000),
2937            size: to_contract_units_decimal("BTC-CALL-100000", dec!(10)),
2938            taker_side: Side::Buy,
2939            taker_wallet_address: test_wallet(1),
2940            maker_wallet_address: test_wallet(200),
2941            fee: dec!(0),
2942            is_taker: true,
2943            timestamp: 0,
2944            builder_code_address: None,
2945            builder_code_fee: None,
2946            source: Default::default(),
2947            taker_realized_pnl: None,
2948            maker_realized_pnl: None,
2949            underlying_notional: None,
2950        };
2951
2952        insert_trade_for_fill(&context, &fill).await;
2953        cache
2954            .handle_engine_message(
2955                EngineMessage::OrderFilled {
2956                    accounting: hypercall_engine::FillAccounting::from_fill(&fill),
2957                    fill,
2958                },
2959                1,
2960            )
2961            .await;
2962
2963        // Should receive position update
2964        let update = rx.recv().await.unwrap();
2965        match update {
2966            PortfolioUpdate::PositionUpdate { position, .. } => {
2967                assert_eq!(position.position.symbol, "BTC-CALL-100000");
2968                assert_eq!(position.position.amount, dec!(10));
2969                assert_eq!(position.position.realized_pnl, dec!(0));
2970                assert_eq!(position.position.unrealized_pnl, dec!(0));
2971            }
2972            _ => panic!("Expected PositionUpdate"),
2973        }
2974
2975        // Should receive balance update
2976        let update = rx.recv().await.unwrap();
2977        match update {
2978            PortfolioUpdate::BalanceUpdate {
2979                total_margin_used, ..
2980            } => {
2981                // Margin used tracked from positions
2982                let _ = total_margin_used;
2983            }
2984            _ => panic!("Expected BalanceUpdate"),
2985        }
2986    }
2987
2988    #[tokio::test]
2989    async fn test_position_expired_notifies_subscribers_after_projection_update() {
2990        let context = setup_test_cache().await;
2991        let cache = context.cache.clone();
2992        let wallet = test_wallet(1);
2993        let symbol = "BTC-20261231-100000-C".to_string();
2994
2995        let (_subscriber_id, mut rx) = cache.subscribe(wallet).await;
2996        let initial = rx.recv().await.unwrap();
2997        assert!(matches!(initial, PortfolioUpdate::Initial { .. }));
2998
2999        let fill = Fill {
3000            trade_id: 1,
3001            taker_order_id: 100,
3002            maker_order_id: 101,
3003            symbol: symbol.clone(),
3004            price: dec!(1000),
3005            size: to_contract_units_decimal(&symbol, dec!(10)),
3006            taker_side: Side::Buy,
3007            taker_wallet_address: wallet,
3008            maker_wallet_address: test_wallet(200),
3009            fee: dec!(0),
3010            is_taker: true,
3011            timestamp: 0,
3012            builder_code_address: None,
3013            builder_code_fee: None,
3014            source: hypercall_types::FillSource::Orderbook,
3015            taker_realized_pnl: None,
3016            maker_realized_pnl: None,
3017            underlying_notional: None,
3018        };
3019
3020        insert_trade_for_fill(&context, &fill).await;
3021        cache
3022            .handle_engine_message(
3023                EngineMessage::OrderFilled {
3024                    accounting: hypercall_engine::FillAccounting::from_fill(&fill),
3025                    fill,
3026                },
3027                1,
3028            )
3029            .await;
3030
3031        let _ = rx.recv().await.unwrap();
3032        let _ = rx.recv().await.unwrap();
3033
3034        insert_applied_settlement(
3035            &context,
3036            wallet,
3037            &symbol,
3038            dec!(10),
3039            dec!(1000),
3040            dec!(10000),
3041            dec!(0),
3042        )
3043        .await;
3044        cache
3045            .handle_engine_message(
3046                EngineMessage::PositionExpired(PositionExpiredMessage {
3047                    wallet_address: wallet,
3048                    margin_mode: MarginMode::Standard,
3049                    symbol: symbol.clone(),
3050                    position_size: dec!(10),
3051                    settlement_price: dec!(1000),
3052                    settlement_value: dec!(10000),
3053                    settlement_entry_price: Some(dec!(1000)),
3054                    cost_basis: Some(dec!(10000)),
3055                    net_pnl: Some(dec!(0)),
3056                    timestamp: 2,
3057                }),
3058                2,
3059            )
3060            .await;
3061
3062        match rx.recv().await.unwrap() {
3063            PortfolioUpdate::PositionUpdate { position, .. } => {
3064                assert_eq!(position.position.symbol, symbol);
3065                assert_eq!(position.position.amount, dec!(0));
3066            }
3067            other => panic!("Expected PositionUpdate after expiry, got {:?}", other),
3068        }
3069
3070        match rx.recv().await.unwrap() {
3071            PortfolioUpdate::BalanceUpdate {
3072                total_margin_used, ..
3073            } => {
3074                assert_eq!(total_margin_used, dec!(0));
3075            }
3076            other => panic!("Expected BalanceUpdate after expiry, got {:?}", other),
3077        }
3078    }
3079
3080    #[tokio::test]
3081    async fn test_subscription_receives_greeks_snapshot_update() {
3082        let context = setup_test_cache().await;
3083        let cache = context.cache.clone();
3084        attach_test_greeks_cache(&context, "BTC-20261231-50000-C").await;
3085
3086        let (_subscriber_id, mut rx) = cache.subscribe(test_wallet(1)).await;
3087        let initial = timeout(Duration::from_secs(2), rx.recv())
3088            .await
3089            .expect("timed out waiting for initial update")
3090            .expect("channel closed");
3091        assert!(matches!(initial, PortfolioUpdate::Initial { .. }));
3092
3093        let greeks_update = timeout(Duration::from_secs(2), rx.recv())
3094            .await
3095            .expect("timed out waiting for greeks snapshot")
3096            .expect("channel closed");
3097        match greeks_update {
3098            PortfolioUpdate::GreeksUpdate {
3099                per_leg, aggregate, ..
3100            } => {
3101                assert!(per_leg.is_empty(), "expected empty per-leg greeks");
3102                assert!(aggregate.is_none(), "expected null aggregate greeks");
3103            }
3104            other => panic!("expected GreeksUpdate, got {:?}", other),
3105        }
3106    }
3107
3108    #[tokio::test]
3109    async fn test_fill_triggers_greeks_update() {
3110        let context = setup_test_cache().await;
3111        let cache = context.cache.clone();
3112        let symbol = "BTC-20261231-50000-C";
3113        attach_test_greeks_cache(&context, symbol).await;
3114
3115        let (_subscriber_id, mut rx) = cache.subscribe(test_wallet(1)).await;
3116        let _ = timeout(Duration::from_secs(2), rx.recv())
3117            .await
3118            .expect("timed out waiting for initial update")
3119            .expect("channel closed");
3120        let _ = timeout(Duration::from_secs(2), rx.recv())
3121            .await
3122            .expect("timed out waiting for greeks snapshot")
3123            .expect("channel closed");
3124
3125        use hypercall_types::Fill;
3126
3127        let fill = Fill {
3128            trade_id: 1001,
3129            taker_order_id: 100,
3130            maker_order_id: 101,
3131            symbol: symbol.to_string(),
3132            price: dec!(1000),
3133            size: to_contract_units_decimal(symbol, dec!(2)),
3134            taker_side: Side::Buy,
3135            taker_wallet_address: test_wallet(1),
3136            maker_wallet_address: test_wallet(200),
3137            fee: dec!(0),
3138            is_taker: true,
3139            timestamp: 0,
3140            builder_code_address: None,
3141            builder_code_fee: None,
3142            source: Default::default(),
3143            taker_realized_pnl: None,
3144            maker_realized_pnl: None,
3145            underlying_notional: None,
3146        };
3147
3148        insert_trade_for_fill(&context, &fill).await;
3149        cache
3150            .handle_engine_message(
3151                EngineMessage::OrderFilled {
3152                    accounting: hypercall_engine::FillAccounting::from_fill(&fill),
3153                    fill,
3154                },
3155                10,
3156            )
3157            .await;
3158
3159        let mut saw_position_update = false;
3160        let mut saw_greeks = false;
3161        for _ in 0..10 {
3162            let update = timeout(Duration::from_secs(5), rx.recv())
3163                .await
3164                .expect("timed out waiting for update")
3165                .expect("channel closed");
3166            match update {
3167                PortfolioUpdate::PositionUpdate { position, .. } => {
3168                    assert_eq!(position.position.symbol, symbol);
3169                    assert!(
3170                        position.position.unrealized_pnl != dec!(0),
3171                        "option UPNL should be repriced from theoretical mark"
3172                    );
3173                    saw_position_update = true;
3174                }
3175                PortfolioUpdate::GreeksUpdate {
3176                    per_leg, aggregate, ..
3177                } => {
3178                    assert!(!per_leg.is_empty(), "expected non-empty per-leg greeks");
3179                    assert_eq!(per_leg[0].symbol, symbol);
3180                    assert!(aggregate.is_some(), "expected aggregate greeks");
3181                    saw_greeks = true;
3182                    if saw_position_update {
3183                        break;
3184                    }
3185                }
3186                _ => {}
3187            }
3188        }
3189
3190        assert!(
3191            saw_position_update,
3192            "expected a PositionUpdate with theoretical repriced UPNL after fill"
3193        );
3194        assert!(saw_greeks, "expected a GreeksUpdate after fill");
3195    }
3196
3197    #[tokio::test]
3198    async fn test_get_portfolio_succeeds_when_theoretical_mark_missing() {
3199        let context = setup_test_cache().await;
3200        let cache = context.cache.clone();
3201        attach_test_greeks_cache(&context, "BTC-20261231-50000-C").await;
3202
3203        let symbol_missing_mark = "BTC-20261231-51000-C";
3204        let fill = Fill {
3205            trade_id: 2001,
3206            taker_order_id: 100,
3207            maker_order_id: 101,
3208            symbol: symbol_missing_mark.to_string(),
3209            price: dec!(1000),
3210            size: to_contract_units_decimal(symbol_missing_mark, dec!(1)),
3211            taker_side: Side::Buy,
3212            taker_wallet_address: test_wallet(1),
3213            maker_wallet_address: test_wallet(200),
3214            fee: dec!(0),
3215            is_taker: true,
3216            timestamp: 0,
3217            builder_code_address: None,
3218            builder_code_fee: None,
3219            source: Default::default(),
3220            taker_realized_pnl: None,
3221            maker_realized_pnl: None,
3222            underlying_notional: None,
3223        };
3224
3225        insert_trade_for_fill(&context, &fill).await;
3226        cache
3227            .handle_engine_message(
3228                EngineMessage::OrderFilled {
3229                    accounting: hypercall_engine::FillAccounting::from_fill(&fill),
3230                    fill,
3231                },
3232                11,
3233            )
3234            .await;
3235
3236        // Should succeed even when theoretical marks are missing (best-effort repricing)
3237        let portfolio = cache
3238            .get_portfolio(&test_wallet(1))
3239            .await
3240            .expect("portfolio should succeed even with missing theoretical marks");
3241        assert!(!portfolio.positions.is_empty());
3242    }
3243
3244    #[tokio::test]
3245    async fn test_get_portfolio_reprices_option_upnl_with_theoretical_mark() {
3246        let context = setup_test_cache().await;
3247        let cache = context.cache.clone();
3248        let symbol = "BTC-20261231-50000-C";
3249        attach_test_greeks_cache(&context, symbol).await;
3250
3251        let fill = Fill {
3252            trade_id: 2002,
3253            taker_order_id: 100,
3254            maker_order_id: 101,
3255            symbol: symbol.to_string(),
3256            price: dec!(1000),
3257            size: to_contract_units_decimal(symbol, dec!(1)),
3258            taker_side: Side::Buy,
3259            taker_wallet_address: test_wallet(1),
3260            maker_wallet_address: test_wallet(200),
3261            fee: dec!(0),
3262            is_taker: true,
3263            timestamp: 0,
3264            builder_code_address: None,
3265            builder_code_fee: None,
3266            source: Default::default(),
3267            taker_realized_pnl: None,
3268            maker_realized_pnl: None,
3269            underlying_notional: None,
3270        };
3271
3272        insert_trade_for_fill(&context, &fill).await;
3273        cache
3274            .handle_engine_message(
3275                EngineMessage::OrderFilled {
3276                    accounting: hypercall_engine::FillAccounting::from_fill(&fill),
3277                    fill,
3278                },
3279                12,
3280            )
3281            .await;
3282
3283        let portfolio = cache
3284            .get_portfolio(&test_wallet(1))
3285            .await
3286            .expect("portfolio fetch should succeed");
3287        let position = portfolio
3288            .positions
3289            .iter()
3290            .find(|p| p.position.symbol == symbol)
3291            .expect("position should exist");
3292        assert!(
3293            position.position.unrealized_pnl != dec!(0),
3294            "REST portfolio should expose theoretical repriced UPNL"
3295        );
3296    }
3297
3298    #[tokio::test]
3299    async fn test_multiple_subscribers() {
3300        let context = setup_test_cache().await;
3301        let cache = context.cache.clone();
3302
3303        // Multiple subscribers for same account
3304        let (_subscriber_id_1, mut rx1) = cache.subscribe(test_wallet(1)).await;
3305        let (_subscriber_id_2, mut rx2) = cache.subscribe(test_wallet(1)).await;
3306
3307        // Both should receive initial snapshot
3308        let _ = rx1.recv().await.unwrap();
3309        let _ = rx2.recv().await.unwrap();
3310
3311        // Process a fill via engine message
3312        use hypercall_types::Fill;
3313
3314        let fill = Fill {
3315            trade_id: 2,
3316            taker_order_id: 200,
3317            maker_order_id: 201,
3318            symbol: "BTC-CALL-100000".to_string(),
3319            price: dec!(1000),
3320            size: to_contract_units_decimal("BTC-CALL-100000", dec!(10)),
3321            taker_side: Side::Buy,
3322            taker_wallet_address: test_wallet(1),
3323            maker_wallet_address: test_wallet(200),
3324            fee: dec!(0),
3325            is_taker: true,
3326            timestamp: 0,
3327            builder_code_address: None,
3328            builder_code_fee: None,
3329            source: Default::default(),
3330            taker_realized_pnl: None,
3331            maker_realized_pnl: None,
3332            underlying_notional: None,
3333        };
3334
3335        insert_trade_for_fill(&context, &fill).await;
3336        cache
3337            .handle_engine_message(
3338                EngineMessage::OrderFilled {
3339                    accounting: hypercall_engine::FillAccounting::from_fill(&fill),
3340                    fill,
3341                },
3342                2,
3343            )
3344            .await;
3345
3346        // Both should receive updates
3347        let update1 = rx1.recv().await.unwrap();
3348        let update2 = rx2.recv().await.unwrap();
3349
3350        match (update1, update2) {
3351            (
3352                PortfolioUpdate::PositionUpdate { position: pos1, .. },
3353                PortfolioUpdate::PositionUpdate { position: pos2, .. },
3354            ) => {
3355                assert_eq!(pos1.position.realized_pnl, dec!(0));
3356                assert_eq!(pos1.position.unrealized_pnl, dec!(0));
3357                assert_eq!(pos2.position.realized_pnl, dec!(0));
3358                assert_eq!(pos2.position.unrealized_pnl, dec!(0));
3359            }
3360            _ => panic!("Expected both subscribers to receive PositionUpdate"),
3361        }
3362    }
3363
3364    #[tokio::test]
3365    async fn test_seq_updated_after_apply() {
3366        let context = setup_test_cache().await;
3367        let cache = context.cache.clone();
3368
3369        // Initially seq should be 0
3370        assert_eq!(cache.get_last_processed_seq().await, 0);
3371
3372        // Process a fill
3373        use hypercall_types::Fill;
3374
3375        let fill = Fill {
3376            trade_id: 42,
3377            taker_order_id: 100,
3378            maker_order_id: 101,
3379            symbol: "BTC-CALL-100000".to_string(),
3380            price: dec!(1000),
3381            size: to_contract_units_decimal("BTC-CALL-100000", dec!(10)),
3382            taker_side: Side::Buy,
3383            taker_wallet_address: test_wallet(1),
3384            maker_wallet_address: test_wallet(200),
3385            fee: dec!(0),
3386            is_taker: true,
3387            timestamp: 0,
3388            builder_code_address: None,
3389            builder_code_fee: None,
3390            source: Default::default(),
3391            taker_realized_pnl: None,
3392            maker_realized_pnl: None,
3393            underlying_notional: None,
3394        };
3395
3396        insert_trade_for_fill(&context, &fill).await;
3397        cache
3398            .handle_engine_message(
3399                EngineMessage::OrderFilled {
3400                    accounting: hypercall_engine::FillAccounting::from_fill(&fill),
3401                    fill,
3402                },
3403                42,
3404            )
3405            .await;
3406
3407        // Seq should now be 42
3408        assert_eq!(cache.get_last_processed_seq().await, 42);
3409
3410        // Portfolio should reflect the fill
3411        let portfolio = cache.get_portfolio(&test_wallet(1)).await.unwrap();
3412        assert_eq!(portfolio.positions.len(), 1);
3413        assert_eq!(portfolio.positions[0].position.amount, dec!(10));
3414    }
3415
3416    #[tokio::test]
3417    async fn test_capture_snapshot_state_is_consistent() {
3418        let context = setup_test_cache().await;
3419        let cache = context.cache.clone();
3420
3421        // Process two fills
3422        use hypercall_types::Fill;
3423
3424        let fill1 = Fill {
3425            trade_id: 1,
3426            taker_order_id: 100,
3427            maker_order_id: 101,
3428            symbol: "BTC-CALL-100000".to_string(),
3429            price: dec!(1000),
3430            size: to_contract_units_decimal("BTC-CALL-100000", dec!(10)),
3431            taker_side: Side::Buy,
3432            taker_wallet_address: test_wallet(1),
3433            maker_wallet_address: test_wallet(200),
3434            fee: dec!(0),
3435            is_taker: true,
3436            timestamp: 0,
3437            builder_code_address: None,
3438            builder_code_fee: None,
3439            source: Default::default(),
3440            taker_realized_pnl: None,
3441            maker_realized_pnl: None,
3442            underlying_notional: None,
3443        };
3444
3445        insert_trade_for_fill(&context, &fill1).await;
3446        cache
3447            .handle_engine_message(
3448                EngineMessage::OrderFilled {
3449                    accounting: hypercall_engine::FillAccounting::from_fill(&fill1),
3450                    fill: fill1,
3451                },
3452                1,
3453            )
3454            .await;
3455
3456        // Capture snapshot state
3457        let (seq, portfolios) = cache.capture_snapshot_state().await;
3458
3459        // The snapshot should be consistent: seq matches state
3460        assert_eq!(seq, 1);
3461        assert!(portfolios.contains_key(&test_wallet(1)));
3462        let balance = portfolios.get(&test_wallet(1)).unwrap();
3463        assert!(balance.positions.contains_key("BTC-CALL-100000"));
3464        assert_eq!(
3465            balance.positions.get("BTC-CALL-100000").unwrap().amount,
3466            dec!(10)
3467        );
3468    }
3469
/// A URL containing a second `?` must come back with the database name
/// replaced and only the first query string kept.
#[test]
fn test_normalize_test_database_url_repairs_duplicate_query_separator() {
    let malformed =
        "postgresql://user:pass@localhost:5432/test_db?sslmode=disable?sslmode=require";
    let expected = "postgresql://user:pass@localhost:5432/isolated_db?sslmode=disable";

    let normalized = normalize_test_database_url(malformed, Some("isolated_db"));

    assert_eq!(normalized, expected);
}
3482
/// Swapping the database name must leave the user-info (including the
/// percent-encoded password), host and port exactly as given.
#[test]
fn test_normalize_test_database_url_preserves_authority_verbatim() {
    let original =
        "postgresql://user:pa%40ss@shared-db.internal:5432/test_db?sslmode=require";
    let expected =
        "postgresql://user:pa%40ss@shared-db.internal:5432/isolated_db?sslmode=require";

    let normalized = normalize_test_database_url(original, Some("isolated_db"));

    assert_eq!(normalized, expected);
}
3495
3496    #[tokio::test]
3497    async fn test_publish_margin_updates_for_subscribers_counts_unique_wallets() {
3498        let context = setup_test_cache().await;
3499        let cache = context.cache.clone();
3500
3501        // Two subscribers on wallet A.
3502        let (_subscriber_id_a1, mut rx_a1) = cache.subscribe(test_wallet(1)).await;
3503        let (_subscriber_id_a2, mut rx_a2) = cache.subscribe(test_wallet(1)).await;
3504        // One subscriber on wallet B.
3505        let (_subscriber_id_b1, mut rx_b1) = cache.subscribe(test_wallet(2)).await;
3506
3507        // Drain Initial messages.
3508        let _ = rx_a1.recv().await.unwrap();
3509        let _ = rx_a2.recv().await.unwrap();
3510        let _ = rx_b1.recv().await.unwrap();
3511
3512        let attempted = cache.publish_margin_updates_for_subscribers().await;
3513        assert_eq!(attempted, 2, "should attempt unique wallets only");
3514    }
3515
3516    #[tokio::test]
3517    async fn test_publish_margin_updates_for_subscribers_empty() {
3518        let context = setup_test_cache().await;
3519        let cache = context.cache.clone();
3520
3521        let attempted = cache.publish_margin_updates_for_subscribers().await;
3522        assert_eq!(attempted, 0, "no subscribers should attempt zero wallets");
3523    }
3524
3525    #[tokio::test]
3526    async fn test_dropped_subscriber_is_pruned_on_notify() {
3527        let context = setup_test_cache().await;
3528        let cache = context.cache.clone();
3529        let wallet = test_wallet(1);
3530
3531        let (_subscriber_id, mut rx) = cache.subscribe(wallet).await;
3532        let _ = rx.recv().await.unwrap();
3533        drop(rx);
3534
3535        cache
3536            .notify_subscribers(
3537                &wallet,
3538                PortfolioUpdate::BalanceUpdate {
3539                    total_margin_used: dec!(0),
3540                    timestamp: chrono::Utc::now().timestamp(),
3541                },
3542            )
3543            .await;
3544
3545        let subscribers = cache.subscribers.read().await;
3546        assert!(
3547            subscribers.get(&wallet).is_none(),
3548            "dropped subscriber should be removed after send failure"
3549        );
3550    }
3551}