1use super::greeks::GreeksCache;
2use super::tier::TierCache;
3use hypercall_types::api_models::{
4 MarginSummary, Portfolio, PortfolioGreeksAggregate, Position, PositionGreeksLeg,
5 PositionWithMetrics, SpanMarginSummary,
6};
7use hypercall_types::portfolio_greeks::{
8 build_net_position_quantities, calculate_portfolio_greeks,
9};
10use hypercall_types::position_metrics::{enrich_position_metrics, PositionMarginMetrics};
11
12use crate::portfolio::canonical_perp_symbol;
13use crate::portfolio::{
14 HypercorePositionUpdate, PortfolioBalance, PortfolioChange, PortfolioService,
15 PortfolioServiceImpl,
16};
17use crate::rsm::margin_service::{ExtendedRiskGrid, ScenarioPnl, SpanMarginService};
18use crate::rsm::portfolio_margin::RiskAccountBuilder;
19use crate::rsm::MarginMode;
20use crate::shared::order_types::ParsedSymbol;
21use crate::snapshot::{
22 DbPortfolioSnapshotLoader, SnapshotLoader, Snapshotable, SyncState, SyncStatus,
23};
24use crate::standard_margin::{
25 PositionMarginContribution, StandardAccountBuilder, StandardMarginService,
26};
27use crate::types::MarginDetails;
28use anyhow::Result;
29use futures::future::join_all;
30use hypercall_db_diesel::DatabaseHandler;
31use hypercall_runtime_api::OrderSnapshotProvider;
32use hypercall_runtime_api::RuntimeOrderSummary;
33use hypercall_types::ws_protocol::PortfolioUpdate;
34use hypercall_types::{to_contract_units_decimal, to_human_readable_decimal, WalletAddress};
35use hypercall_types::{EngineMessage, PositionExpiredMessage};
36use rust_decimal::prelude::FromPrimitive;
37use rust_decimal::Decimal;
38use rust_decimal_macros::dec;
39use std::collections::{BTreeSet, HashMap};
40use std::str::FromStr;
41use std::sync::Arc;
42use tokio::sync::{mpsc, Mutex, OwnedMutexGuard, RwLock};
43
44pub(crate) enum PendingNotifications {
48 Fill { fill: hypercall_types::Fill },
49 Changes(Vec<PortfolioChange>),
50}
51
/// Key type of the inner (per-wallet) subscriber map held by `PortfolioCache`.
type SubscriberId = u64;

/// Stream name used to look up the engine-command replay boundary in a
/// portfolio snapshot's `offsets` map.
pub const ENGINE_COMMAND_SNAPSHOT_STREAM: &str = "engine_command";
54
55#[derive(Debug, Clone)]
56pub struct WalletMarginSnapshot {
57 pub mode: MarginMode,
58 pub span_margin: SpanMarginSummary,
59 pub margin_summary: MarginSummary,
60 pub total_margin_used: Decimal,
61 pub available_balance: Decimal,
62 pub standard_position_contributions: Option<HashMap<String, PositionMarginContribution>>,
66 pub standard_option_marks: Option<HashMap<String, Decimal>>,
69}
70
71#[derive(Debug, Clone)]
72pub struct PmRiskGridData {
73 pub margin_details: MarginDetails,
74 pub position_details: MarginDetails,
75 pub scenario_pnls: Vec<ScenarioPnl>,
76 pub extended_grid: ExtendedRiskGrid,
77}
78
79pub struct PortfolioCache {
80 service: Arc<PortfolioServiceImpl>,
81 db: Arc<DatabaseHandler>,
82 settlement: Arc<dyn hypercall_db::SettlementWriter>,
83 last_processed_seq: Arc<RwLock<i64>>,
84 projection_barrier: Arc<Mutex<()>>,
85 subscribers: Arc<
86 RwLock<
87 HashMap<WalletAddress, HashMap<SubscriberId, mpsc::UnboundedSender<PortfolioUpdate>>>,
88 >,
89 >,
90 next_subscriber_id: Arc<RwLock<SubscriberId>>,
91 span_margin_service: Arc<RwLock<Option<Arc<SpanMarginService>>>>,
93 greeks_cache: Arc<RwLock<Option<Arc<GreeksCache>>>>,
95 risk_account_builder: Arc<RwLock<Option<Arc<RiskAccountBuilder>>>>,
97 tier_cache: Arc<RwLock<Option<Arc<TierCache>>>>,
99 order_snapshot: Arc<RwLock<Option<Arc<dyn OrderSnapshotProvider>>>>,
101 standard_margin_service: Arc<RwLock<Option<Arc<StandardMarginService>>>>,
103 standard_account_builder: Arc<RwLock<Option<Arc<StandardAccountBuilder>>>>,
105 sync_status: Arc<SyncStatus>,
108}
109
110impl PortfolioCache {
111 pub fn new(db: Arc<DatabaseHandler>) -> Self {
112 let settlement: Arc<dyn hypercall_db::SettlementWriter> = db.clone();
113 Self {
114 service: Arc::new(PortfolioServiceImpl::new()),
115 settlement,
116 db,
117 last_processed_seq: Arc::new(RwLock::new(0)),
118 projection_barrier: Arc::new(Mutex::new(())),
119 subscribers: Arc::new(RwLock::new(HashMap::new())),
120 next_subscriber_id: Arc::new(RwLock::new(0)),
121 span_margin_service: Arc::new(RwLock::new(None)),
122 greeks_cache: Arc::new(RwLock::new(None)),
123 risk_account_builder: Arc::new(RwLock::new(None)),
124 tier_cache: Arc::new(RwLock::new(None)),
125 order_snapshot: Arc::new(RwLock::new(None)),
126 standard_margin_service: Arc::new(RwLock::new(None)),
127 standard_account_builder: Arc::new(RwLock::new(None)),
128 sync_status: Arc::new(SyncStatus::new()),
129 }
130 }
131
132 pub fn sync_status(&self) -> Arc<SyncStatus> {
137 self.sync_status.clone()
138 }
139
140 pub async fn lock_projection_barrier(&self) -> OwnedMutexGuard<()> {
141 self.projection_barrier.clone().lock_owned().await
142 }
143
144 pub fn is_ready(&self) -> bool {
146 self.sync_status.is_ready()
147 }
148
149 pub fn sync_state(&self) -> SyncState {
151 self.sync_status.state()
152 }
153
154 pub fn set_catching_up(&self) {
156 self.sync_status.set_catching_up();
157 tracing::info!("Portfolio sync status: CatchingUp");
158 }
159
160 pub fn set_ready(&self) {
162 self.sync_status.set_ready();
163 tracing::info!("Portfolio sync status: Ready");
164 }
165
166 pub async fn set_margin_dependencies(
168 &self,
169 span_margin_service: Arc<SpanMarginService>,
170 greeks_cache: Arc<GreeksCache>,
171 risk_account_builder: Arc<RiskAccountBuilder>,
172 tier_cache: Arc<TierCache>,
173 order_snapshot: Arc<dyn OrderSnapshotProvider>,
174 standard_margin_service: Arc<StandardMarginService>,
175 standard_account_builder: Arc<StandardAccountBuilder>,
176 ) {
177 let mut svc = self.span_margin_service.write().await;
178 *svc = Some(span_margin_service);
179 drop(svc);
180
181 let mut cache = self.greeks_cache.write().await;
182 *cache = Some(greeks_cache);
183 drop(cache);
184
185 let mut builder = self.risk_account_builder.write().await;
186 *builder = Some(risk_account_builder);
187 drop(builder);
188
189 let mut tier = self.tier_cache.write().await;
190 *tier = Some(tier_cache);
191 drop(tier);
192
193 let mut orders = self.order_snapshot.write().await;
194 *orders = Some(order_snapshot);
195 drop(orders);
196
197 let mut standard_svc = self.standard_margin_service.write().await;
198 *standard_svc = Some(standard_margin_service);
199 drop(standard_svc);
200
201 let mut standard_builder = self.standard_account_builder.write().await;
202 *standard_builder = Some(standard_account_builder);
203 }
204
205 pub async fn compute_wallet_margin_snapshot(
206 &self,
207 wallet: &WalletAddress,
208 ) -> anyhow::Result<WalletMarginSnapshot> {
209 let margin_mode = self.margin_mode_for_wallet(wallet).await?;
210 self.compute_wallet_margin_snapshot_for_mode(wallet, margin_mode)
211 .await
212 }
213
214 async fn compute_wallet_margin_snapshot_for_mode(
215 &self,
216 wallet: &WalletAddress,
217 margin_mode: MarginMode,
218 ) -> anyhow::Result<WalletMarginSnapshot> {
219 match margin_mode {
220 MarginMode::Portfolio => self.compute_portfolio_mode_snapshot(wallet).await,
221 MarginMode::Standard => self.compute_standard_mode_snapshot(wallet).await,
222 }
223 }
224
225 async fn compute_portfolio_mode_snapshot(
226 &self,
227 wallet: &WalletAddress,
228 ) -> anyhow::Result<WalletMarginSnapshot> {
229 self.prune_applied_settlement_ghost_positions(wallet)
230 .await?;
231
232 let margin_svc = self.span_margin_service.read().await;
233 let risk_builder = self.risk_account_builder.read().await;
234
235 let (margin_service, risk_account_builder) =
236 match (margin_svc.as_ref(), risk_builder.as_ref()) {
237 (Some(m), Some(r)) => (m.clone(), r.clone()),
238 _ => {
239 return Err(anyhow::anyhow!(
240 "Portfolio margin dependencies are not fully configured"
241 ));
242 }
243 };
244 drop(margin_svc);
245 drop(risk_builder);
246
247 let snapshot = risk_account_builder.build_snapshot(wallet).await?;
248 let market_state =
249 risk_account_builder.resolve_market_state(&snapshot, margin_service.config())?;
250 let position_snapshot = snapshot.without_open_orders();
251
252 let margin_details = margin_service
253 .compute_margin_from_snapshot(&snapshot, market_state.clone())
254 .map_err(|e| anyhow::anyhow!("SPAN margin failed for {}: {}", wallet, e))?;
255 let position_details = margin_service
256 .compute_margin_from_snapshot(&position_snapshot, market_state)
257 .map_err(|e| {
258 anyhow::anyhow!("Position-only SPAN margin failed for {}: {}", wallet, e)
259 })?;
260
261 let position_im = position_details.initial_margin_required;
262 let open_orders_im = (margin_details.initial_margin_required
263 - position_details.initial_margin_required)
264 .max(Decimal::ZERO);
265 let position_mm = position_details.maintenance_margin_required;
266 let total_margin_used = position_im + open_orders_im;
267 let available_balance = (margin_details.equity - total_margin_used).max(dec!(0));
268
269 let span_margin = SpanMarginSummary {
270 equity: margin_details.equity,
271 initial_margin_required: position_im,
272 maintenance_margin_required: position_mm,
273 open_orders_initial_margin: open_orders_im,
274 option_margin_required: position_details.scanning_risk,
275 scanning_risk: position_details.scanning_risk,
276 option_floor: position_details.option_floor,
277 gamma_overlay: position_details.gamma_overlay,
278 hypercore_margin_required: dec!(0),
279 };
280
281 let margin_summary = MarginSummary {
282 mode: MarginMode::Portfolio.as_str().to_string(),
283 equity: margin_details.equity,
284 position_im,
285 open_orders_im,
286 initial_margin: margin_details.equity - position_im - open_orders_im,
287 maintenance_margin: margin_details.equity - position_mm,
288 open_orders_premium_reserved: None,
289 };
290
291 Ok(WalletMarginSnapshot {
292 mode: MarginMode::Portfolio,
293 span_margin,
294 margin_summary,
295 total_margin_used,
296 available_balance,
297 standard_position_contributions: None,
298 standard_option_marks: None,
299 })
300 }
301
302 pub async fn compute_pm_risk_grid_data(
303 &self,
304 wallet: &WalletAddress,
305 ) -> anyhow::Result<PmRiskGridData> {
306 let margin_mode = self.margin_mode_for_wallet(wallet).await?;
307 if matches!(margin_mode, MarginMode::Standard) {
308 return Err(anyhow::anyhow!(
309 "compute_pm_risk_grid_data called for Standard-mode wallet {}",
310 wallet
311 ));
312 }
313
314 self.prune_applied_settlement_ghost_positions(wallet)
315 .await?;
316
317 let margin_svc = self.span_margin_service.read().await;
318 let risk_builder = self.risk_account_builder.read().await;
319
320 let (margin_service, risk_account_builder) =
321 match (margin_svc.as_ref(), risk_builder.as_ref()) {
322 (Some(m), Some(r)) => (m.clone(), r.clone()),
323 _ => {
324 return Err(anyhow::anyhow!(
325 "Portfolio margin dependencies are not fully configured"
326 ));
327 }
328 };
329 drop(margin_svc);
330 drop(risk_builder);
331
332 let snapshot = risk_account_builder.build_snapshot(wallet).await?;
333 let market_state =
334 risk_account_builder.resolve_market_state(&snapshot, margin_service.config())?;
335 let position_snapshot = snapshot.without_open_orders();
336
337 let margin_details = margin_service
338 .compute_margin_from_snapshot(&snapshot, market_state.clone())
339 .map_err(|e| anyhow::anyhow!("SPAN margin failed: {}", e))?;
340 let position_details = margin_service
341 .compute_margin_from_snapshot(&position_snapshot, market_state.clone())
342 .map_err(|e| anyhow::anyhow!("Position-only SPAN margin failed: {}", e))?;
343 let scenario_pnls = margin_service
344 .compute_risk_grid_from_snapshot(&snapshot, market_state.clone())
345 .map_err(|e| anyhow::anyhow!("Risk grid failed: {}", e))?;
346 let extended_grid = margin_service
347 .compute_extended_risk_grid_from_snapshot(&snapshot, market_state)
348 .map_err(|e| anyhow::anyhow!("Extended risk grid failed: {}", e))?;
349
350 Ok(PmRiskGridData {
351 margin_details,
352 position_details,
353 scenario_pnls,
354 extended_grid,
355 })
356 }
357
358 async fn compute_standard_mode_snapshot(
359 &self,
360 wallet: &WalletAddress,
361 ) -> anyhow::Result<WalletMarginSnapshot> {
362 let standard_svc_guard = self.standard_margin_service.read().await;
363 let standard_builder_guard = self.standard_account_builder.read().await;
364 let order_snapshot_guard = self.order_snapshot.read().await;
365 let greeks_cache_guard = self.greeks_cache.read().await;
366
367 let (standard_margin_service, standard_account_builder, order_snapshot, greeks_cache) =
368 match (
369 standard_svc_guard.as_ref(),
370 standard_builder_guard.as_ref(),
371 order_snapshot_guard.as_ref(),
372 greeks_cache_guard.as_ref(),
373 ) {
374 (Some(svc), Some(builder), Some(snapshot), Some(greeks)) => (
375 svc.clone(),
376 builder.clone(),
377 snapshot.clone(),
378 greeks.clone(),
379 ),
380 _ => {
381 return Err(anyhow::anyhow!(
382 "Standard margin dependencies are not fully configured"
383 ));
384 }
385 };
386 drop(standard_svc_guard);
387 drop(standard_builder_guard);
388 drop(order_snapshot_guard);
389 drop(greeks_cache_guard);
390
391 if let Err(e) = self.refresh_live_market_prices_for_wallet(wallet).await {
394 tracing::warn!(
395 "Standard margin snapshot: could not refresh prices for {}: {}",
396 wallet,
397 e
398 );
399 }
400
401 let standard_account = standard_account_builder.build(wallet).await?;
402 let standard_position_contributions =
403 standard_margin_service.compute_position_margin_breakdown(&standard_account);
404 let standard_option_marks: HashMap<String, Decimal> = standard_account
405 .option_positions
406 .iter()
407 .map(|p| (p.symbol.clone(), p.mark_price))
408 .collect();
409 let cash = standard_account.usdc_balance;
410 let base_result = standard_margin_service.compute_margin(&standard_account);
411
412 let mut hypothetical_account = standard_account.clone();
413 let open_sell_positions =
414 Self::snapshot_open_sell_positions(wallet, order_snapshot.as_ref())?;
415 let missing_open_sell_spot_underlyings: BTreeSet<String> = open_sell_positions
416 .iter()
417 .filter(|pos| pos.position.spot_price == dec!(0))
418 .map(|pos| pos.position.underlying.clone())
419 .collect();
420 let mut missing_open_sell_spot_prices = HashMap::new();
421 if !missing_open_sell_spot_underlyings.is_empty() {
422 let fetches =
423 missing_open_sell_spot_underlyings
424 .into_iter()
425 .map(|underlying| {
426 let greeks_cache = greeks_cache.clone();
427 async move {
428 let spot = greeks_cache.get_spot_price(&underlying).await.ok_or_else(
429 || {
430 anyhow::anyhow!(
431 "Missing spot price for {} in standard margin calculation",
432 underlying
433 )
434 },
435 )?;
436 let spot_decimal = Decimal::from_f64(spot).ok_or_else(|| {
437 anyhow::anyhow!("Invalid spot price conversion for {}", underlying)
438 })?;
439 Ok::<(String, Decimal), anyhow::Error>((underlying, spot_decimal))
440 }
441 });
442
443 for result in join_all(fetches).await {
444 let (underlying, spot_decimal) = result?;
445 missing_open_sell_spot_prices.insert(underlying, spot_decimal);
446 }
447 }
448
449 for mut pos in open_sell_positions {
450 if pos.position.spot_price == dec!(0) {
451 pos.position.spot_price = *missing_open_sell_spot_prices
452 .get(&pos.position.underlying)
453 .ok_or_else(|| {
454 anyhow::anyhow!(
455 "Missing prefetched spot price for {} in standard margin calculation",
456 pos.position.underlying
457 )
458 })?;
459 }
460 hypothetical_account.option_positions.push(pos.position);
461 }
462
463 let hypothetical_result = standard_margin_service.compute_margin(&hypothetical_account);
464 let open_orders_im =
465 (hypothetical_result.position_im - base_result.position_im).max(dec!(0));
466 let premium_reserved = Self::snapshot_open_buy_premium(wallet, order_snapshot.as_ref())?;
467 let total_margin_used = base_result.position_im + open_orders_im;
468 let available_balance = (cash - total_margin_used - premium_reserved).max(dec!(0));
469
470 let span_margin = SpanMarginSummary {
471 equity: base_result.equity,
472 initial_margin_required: base_result.position_im,
473 maintenance_margin_required: base_result.position_mm,
474 open_orders_initial_margin: open_orders_im,
475 option_margin_required: base_result.position_im,
476 scanning_risk: dec!(0),
477 option_floor: dec!(0),
478 gamma_overlay: dec!(0),
479 hypercore_margin_required: dec!(0),
480 };
481
482 let margin_summary = MarginSummary {
483 mode: MarginMode::Standard.as_str().to_string(),
484 equity: base_result.equity,
485 position_im: base_result.position_im,
486 open_orders_im,
487 initial_margin: base_result.equity
488 - base_result.position_im
489 - open_orders_im
490 - premium_reserved,
491 maintenance_margin: base_result.equity - base_result.position_mm,
492 open_orders_premium_reserved: Some(premium_reserved),
493 };
494
495 Ok(WalletMarginSnapshot {
496 mode: MarginMode::Standard,
497 span_margin,
498 margin_summary,
499 total_margin_used,
500 available_balance,
501 standard_position_contributions: Some(standard_position_contributions),
502 standard_option_marks: Some(standard_option_marks),
503 })
504 }
505
506 fn snapshot_open_sell_positions(
507 wallet: &WalletAddress,
508 order_snapshot: &dyn OrderSnapshotProvider,
509 ) -> anyhow::Result<Vec<hypercall_engine::order_index::OpenSellPositionInfo>> {
510 let mut positions = Vec::new();
511 for order in order_snapshot.get_open_orders_for_wallet(wallet) {
512 if !matches!(order.side, hypercall_types::Side::Sell) {
513 continue;
514 }
515 let parsed = ParsedSymbol::from_symbol(&order.symbol).map_err(|e| {
516 anyhow::anyhow!(
517 "Invalid option symbol '{}' in open sell order {} for {}: {}",
518 order.symbol,
519 order.order_id,
520 wallet,
521 e
522 )
523 })?;
524 if order.remaining_size <= dec!(0) {
525 continue;
526 }
527
528 let premium = order.price * order.remaining_size;
529 let expiry_date_str = format!("{}", parsed.expiry);
530 let expiry_code = expiry_date_str.parse::<u64>().map_err(|e| {
531 anyhow::anyhow!(
532 "Invalid expiry '{}' in open sell order {} for {} ({}): {}",
533 expiry_date_str,
534 order.order_id,
535 wallet,
536 order.symbol,
537 e
538 )
539 })?;
540 let expiry_ts =
541 hypercall_types::expiry_date_to_timestamp_checked(&parsed.underlying, expiry_code)
542 .map_err(|e| {
543 anyhow::anyhow!(
544 "Invalid expiry timestamp in open sell order {} for {} ({}): {}",
545 order.order_id,
546 wallet,
547 order.symbol,
548 e
549 )
550 })? as i64;
551
552 let position = hypercall_margin::OptionPosition {
553 symbol: order.symbol.clone(),
554 underlying: parsed.underlying.clone(),
555 expiry_ts,
556 strike: parsed.strike,
557 is_call: matches!(parsed.option_type, crate::types::OptionType::Call),
558 size: -order.remaining_size,
559 mark_price: order.price,
560 entry_price: order.price,
561 spot_price: dec!(0),
562 };
563
564 positions
565 .push(hypercall_engine::order_index::OpenSellPositionInfo { premium, position });
566 }
567
568 Ok(positions)
569 }
570
571 fn snapshot_open_buy_premium(
572 wallet: &WalletAddress,
573 order_snapshot: &dyn OrderSnapshotProvider,
574 ) -> anyhow::Result<Decimal> {
575 order_snapshot
576 .get_open_orders_for_wallet(wallet)
577 .into_iter()
578 .filter(|order: &RuntimeOrderSummary| matches!(order.side, hypercall_types::Side::Buy))
579 .filter(|order| order.remaining_size > dec!(0))
580 .try_fold(dec!(0), |acc, order| {
581 ParsedSymbol::from_symbol(&order.symbol).map_err(|e| {
582 anyhow::anyhow!(
583 "Invalid option symbol '{}' in open buy order {} for {}: {}",
584 order.symbol,
585 order.order_id,
586 wallet,
587 e
588 )
589 })?;
590 Ok(acc + (order.price * order.remaining_size))
591 })
592 }
593
594 pub async fn publish_margin_update(&self, wallet: &WalletAddress) {
597 if let Err(error) = self.try_publish_margin_update(wallet).await {
598 tracing::error!("Failed to publish margin update for {}: {}", wallet, error);
599 }
600 }
601
602 async fn try_publish_margin_update(&self, wallet: &WalletAddress) -> anyhow::Result<()> {
603 let margin_mode = self.margin_mode_for_wallet(wallet).await?;
604
605 match margin_mode {
608 MarginMode::Portfolio => {
609 self.refresh_live_market_prices_for_wallet(wallet)
610 .await
611 .map_err(|error| {
612 anyhow::anyhow!(
613 "PM margin update unavailable due to strict repricing failure: {}",
614 error
615 )
616 })?;
617 }
618 MarginMode::Standard => {
619 if let Err(error) = self.refresh_live_market_prices_for_wallet(wallet).await {
620 tracing::warn!(
621 "Skipping live repricing before standard margin update for {}: {}",
622 wallet,
623 error
624 );
625 }
626 }
627 }
628
629 let snapshot = self
630 .compute_wallet_margin_snapshot_for_mode(wallet, margin_mode)
631 .await
632 .map_err(|error| {
633 anyhow::anyhow!(
634 "margin snapshot calculation failed for {}: {}",
635 wallet,
636 error
637 )
638 })?;
639
640 self.notify_subscribers(
642 wallet,
643 PortfolioUpdate::MarginUpdate {
644 span_margin: snapshot.span_margin,
645 total_margin_used: snapshot.total_margin_used,
646 available_balance: snapshot.available_balance,
647 timestamp: chrono::Utc::now().timestamp(),
648 },
649 )
650 .await;
651
652 Ok(())
653 }
654
655 async fn prune_applied_settlement_ghost_positions(
656 &self,
657 wallet: &WalletAddress,
658 ) -> anyhow::Result<usize> {
659 let option_symbols = match self.service.get_portfolio_balance(wallet).await {
660 Some(balance) => balance
661 .positions
662 .keys()
663 .filter(|symbol| ParsedSymbol::from_symbol(symbol).is_ok())
664 .cloned()
665 .collect::<Vec<_>>(),
666 None => return Ok(0),
667 };
668
669 if option_symbols.is_empty() {
670 return Ok(0);
671 }
672
673 let settlement = self.settlement.clone();
674 let wallet_owned = *wallet;
675 let settled_symbols = tokio::task::spawn_blocking(move || {
676 settlement.get_applied_settlement_symbols_sync(&wallet_owned, &option_symbols)
677 })
678 .await
679 .map_err(|e| anyhow::anyhow!("spawn_blocking join error: {}", e))?
680 .map_err(|error| {
681 anyhow::anyhow!(
682 "Failed to query applied settlements for {}: {}",
683 wallet,
684 error
685 )
686 })?;
687
688 for symbol in &settled_symbols {
689 tracing::warn!(
690 wallet = %wallet,
691 symbol,
692 "Removing settled ghost position from portfolio projection"
693 );
694 metrics::counter!("ht_portfolio_ghost_positions_repaired_total").increment(1);
695 self.service.remove_expired_position(wallet, symbol).await;
696 }
697
698 Ok(settled_symbols.len())
699 }
700
701 pub async fn publish_margin_updates_for_subscribers(&self) -> usize {
705 let wallets: Vec<WalletAddress> = {
706 let subscribers = self.subscribers.read().await;
707 subscribers.keys().cloned().collect()
708 };
709
710 for wallet in &wallets {
711 self.publish_margin_update(wallet).await;
712 }
713
714 wallets.len()
715 }
716
717 async fn has_portfolio_subscribers(&self, wallet: &WalletAddress) -> bool {
718 let subscribers = self.subscribers.read().await;
719 subscribers
720 .get(wallet)
721 .map(|account_subs| !account_subs.is_empty())
722 .unwrap_or(false)
723 }
724
725 async fn compute_wallet_live_greeks_snapshot(
726 &self,
727 wallet: &WalletAddress,
728 ) -> anyhow::Result<(Vec<PositionGreeksLeg>, Option<PortfolioGreeksAggregate>)> {
729 let greeks_cache = self
730 .greeks_cache
731 .read()
732 .await
733 .as_ref()
734 .cloned()
735 .ok_or_else(|| anyhow::anyhow!("GreeksCache not configured"))?;
736
737 let live_portfolio = self.service.get_portfolio(wallet).await;
738 let live_positions = live_portfolio
739 .positions
740 .into_iter()
741 .map(|p| (p.position.symbol, p.position.amount));
742 let net_quantities = build_net_position_quantities(live_positions, &[])
743 .map_err(|e| anyhow::anyhow!("Invalid position state for greeks update: {}", e))?;
744
745 if net_quantities.is_empty() {
746 return Ok((Vec::new(), None));
747 }
748
749 let mut contract_greeks = HashMap::with_capacity(net_quantities.len());
750 for symbol in net_quantities.keys() {
751 let greeks = greeks_cache.get_greeks(symbol).await.map_err(|e| {
752 anyhow::anyhow!(
753 "Failed to fetch greeks for symbol {} while publishing WS update: {}",
754 symbol,
755 e
756 )
757 })?;
758 contract_greeks.insert(symbol.clone(), greeks);
759 }
760
761 let response = calculate_portfolio_greeks(*wallet, &net_quantities, &contract_greeks)
762 .map_err(|e| anyhow::anyhow!("Failed to compute portfolio greeks: {}", e))?;
763
764 Ok((response.per_leg, response.aggregate))
765 }
766
767 pub async fn publish_greeks_update(&self, wallet: &WalletAddress) {
769 if !self.has_portfolio_subscribers(wallet).await {
770 return;
771 }
772
773 let (per_leg, aggregate) = match self.compute_wallet_live_greeks_snapshot(wallet).await {
774 Ok(snapshot) => snapshot,
775 Err(e) => {
776 tracing::debug!(
777 "Skipping greeks update publish for {} due to calculation error: {}",
778 wallet,
779 e
780 );
781 return;
782 }
783 };
784
785 self.notify_subscribers(
786 wallet,
787 PortfolioUpdate::GreeksUpdate {
788 per_leg,
789 aggregate,
790 timestamp: chrono::Utc::now().timestamp(),
791 },
792 )
793 .await;
794 }
795
796 pub async fn publish_greeks_updates_for_subscribers(&self) -> usize {
800 let wallets: Vec<WalletAddress> = {
801 let subscribers = self.subscribers.read().await;
802 subscribers.keys().cloned().collect()
803 };
804
805 for wallet in &wallets {
806 self.publish_greeks_update(wallet).await;
807 }
808
809 wallets.len()
810 }
811
812 pub fn get_service(&self) -> Arc<PortfolioServiceImpl> {
817 self.service.clone()
818 }
819
820 pub async fn reprice_all_wallets(&self) -> usize {
826 let wallets: Vec<WalletAddress> = {
827 let all = self.service.all_portfolios().await;
828 all.into_iter()
829 .filter(|(_, balance)| !balance.positions.is_empty())
830 .map(|(wallet, _)| wallet)
831 .collect()
832 };
833 let total = wallets.len();
834 let mut repriced = 0;
835 for wallet in &wallets {
836 match self.refresh_live_market_prices_for_wallet(wallet).await {
837 Ok(n) if n > 0 => {
838 repriced += 1;
839 tracing::debug!("Repriced {} symbols for wallet {}", n, wallet);
840 }
841 Ok(_) => {}
842 Err(e) => {
843 tracing::debug!("Could not reprice wallet {}: {}", wallet, e);
844 }
845 }
846 }
847 tracing::info!(
848 "Startup repricing: {}/{} wallets with positions repriced",
849 repriced,
850 total
851 );
852 repriced
853 }
854
855 async fn refresh_live_market_prices_for_wallet(
859 &self,
860 wallet: &WalletAddress,
861 ) -> anyhow::Result<usize> {
862 let _ = self
863 .prune_applied_settlement_ghost_positions(wallet)
864 .await?;
865
866 let portfolio_balance = match self.service.get_portfolio_balance(wallet).await {
867 Some(portfolio_balance) => portfolio_balance,
868 None => return Ok(0),
869 };
870 if portfolio_balance.positions.is_empty() {
871 return Ok(0);
872 }
873
874 let greeks_cache = match self.greeks_cache.read().await.as_ref().cloned() {
875 Some(greeks_cache) => greeks_cache,
876 None => {
877 return Err(anyhow::anyhow!(
878 "Greeks cache unavailable for portfolio repricing"
879 ))
880 }
881 };
882
883 let wallet_for_errors = wallet.to_string();
884 let mut symbols: Vec<String> = portfolio_balance
885 .positions
886 .keys()
887 .filter(|s| portfolio_perp_underlying(s).is_none())
888 .cloned()
889 .collect();
890 symbols.sort();
891 let fetches = symbols.into_iter().map(|symbol| {
892 let greeks_cache = greeks_cache.clone();
893 let wallet_for_errors = wallet_for_errors.clone();
894 async move {
895 let market_price = if ParsedSymbol::from_symbol(&symbol).is_ok() {
896 greeks_cache
897 .get_theoretical_price(&symbol)
898 .await
899 .map_err(|error| {
900 anyhow::anyhow!(
901 "Portfolio repricing unavailable for {} on {}: {}",
902 symbol,
903 wallet_for_errors,
904 error
905 )
906 })?
907 } else {
908 return Err(anyhow::anyhow!(
909 "Portfolio repricing unavailable for {} on {}: unsupported symbol",
910 symbol,
911 wallet_for_errors
912 ));
913 };
914
915 let market_price_decimal =
916 Decimal::from_f64_retain(market_price).ok_or_else(|| {
917 anyhow::anyhow!(
918 "Invalid repriced market price for {} on {}: {}",
919 symbol,
920 wallet_for_errors,
921 market_price
922 )
923 })?;
924 Ok::<(String, Decimal), anyhow::Error>((symbol, market_price_decimal))
925 }
926 });
927
928 let mut market_prices = HashMap::new();
929 for result in join_all(fetches).await {
930 match result {
931 Ok((symbol, market_price_decimal)) => {
932 market_prices.insert(symbol, market_price_decimal);
933 }
934 Err(e) => {
935 tracing::warn!("refresh_live_market_prices: wallet={}, error={}", wallet, e);
936 return Err(e);
937 }
938 }
939 }
940
941 if market_prices.is_empty() {
942 return Ok(0);
943 }
944
945 let updated_symbols = market_prices.len();
946 self.service.update_market_prices(market_prices).await;
947 Ok(updated_symbols)
948 }
949
950 async fn margin_mode_for_wallet(&self, wallet: &WalletAddress) -> anyhow::Result<MarginMode> {
951 let tier_cache = self
952 .tier_cache
953 .read()
954 .await
955 .as_ref()
956 .cloned()
957 .ok_or_else(|| anyhow::anyhow!("TierCache not configured"))?;
958 tier_cache.get_margin_mode(wallet).await
959 }
960
961 pub async fn publish_position_updates(&self, wallet: &WalletAddress) {
963 if !self.has_portfolio_subscribers(wallet).await {
964 return;
965 }
966
967 let positions = match self.build_enriched_positions(wallet).await {
968 Ok(positions) => positions,
969 Err(error) => {
970 tracing::debug!(
971 "Skipping position update publish for {} due to calculation error: {}",
972 wallet,
973 error
974 );
975 return;
976 }
977 };
978
979 let timestamp = chrono::Utc::now().timestamp();
980 for position in positions {
981 self.notify_subscribers(
982 wallet,
983 PortfolioUpdate::PositionUpdate {
984 position,
985 timestamp,
986 },
987 )
988 .await;
989 }
990 }
991
992 pub async fn publish_position_updates_for_subscribers(&self) -> usize {
994 let wallets: Vec<WalletAddress> = {
995 let subscribers = self.subscribers.read().await;
996 subscribers.keys().cloned().collect()
997 };
998
999 for wallet in &wallets {
1000 self.publish_position_updates(wallet).await;
1001 }
1002
1003 wallets.len()
1004 }
1005
1006 async fn build_enriched_positions(
1007 &self,
1008 wallet: &WalletAddress,
1009 ) -> anyhow::Result<Vec<PositionWithMetrics>> {
1010 if let Err(error) = self.refresh_live_market_prices_for_wallet(wallet).await {
1012 tracing::warn!(
1013 "Failed to refresh live market prices for {} while building positions: {}",
1014 wallet,
1015 error
1016 );
1017 }
1018 let mut portfolio = self.service.get_portfolio(wallet).await;
1019 let margin_snapshot = match self.compute_wallet_margin_snapshot(wallet).await {
1020 Ok(snapshot) => snapshot,
1021 Err(e) => {
1022 tracing::warn!(
1023 "Margin snapshot unavailable for {} while building WS positions: {}",
1024 wallet,
1025 e
1026 );
1027 return Ok(portfolio.positions);
1028 }
1029 };
1030 let mode = margin_snapshot.mode;
1031 portfolio.margin_mode = mode.as_str().to_string();
1032 portfolio.margin_summary = Some(margin_snapshot.margin_summary);
1033 portfolio.span_margin = Some(margin_snapshot.span_margin);
1034 portfolio.available_balance = margin_snapshot.available_balance;
1035 portfolio.total_margin_used = margin_snapshot.total_margin_used;
1036 enrich_position_metrics(
1037 mode,
1038 margin_snapshot
1039 .standard_position_contributions
1040 .map(|contributions| {
1041 contributions
1042 .into_iter()
1043 .map(|(symbol, contribution)| {
1044 (
1045 symbol,
1046 PositionMarginMetrics {
1047 initial_margin: contribution.initial_margin,
1048 maintenance_margin: contribution.maintenance_margin,
1049 },
1050 )
1051 })
1052 .collect()
1053 }),
1054 margin_snapshot.standard_option_marks,
1055 &mut portfolio,
1056 )
1057 .map_err(|status| {
1058 anyhow::anyhow!(
1059 "Failed to enrich position metrics for {}: status={}",
1060 wallet,
1061 status
1062 )
1063 })?;
1064 Ok(portfolio.positions)
1065 }
1066
1067 async fn build_enriched_position_update(
1068 &self,
1069 wallet: &WalletAddress,
1070 symbol: &str,
1071 ) -> anyhow::Result<PositionWithMetrics> {
1072 let positions = self.build_enriched_positions(wallet).await?;
1073 if let Some(position) = positions.into_iter().find(|p| p.position.symbol == symbol) {
1074 return Ok(position);
1075 }
1076
1077 Ok(PositionWithMetrics {
1078 position: Position {
1079 wallet_address: *wallet,
1080 symbol: symbol.to_string(),
1081 amount: dec!(0),
1082 entry_price: dec!(0),
1083 margin_posted: dec!(0),
1084 realized_pnl: dec!(0),
1085 unrealized_pnl: dec!(0),
1086 updated_at: chrono::Utc::now(),
1087 },
1088 notional_value: dec!(0),
1089 maintenance_margin: dec!(0),
1090 liquidation_price: dec!(0),
1091 margin_ratio: dec!(0),
1092 })
1093 }
1094
1095 pub async fn initialize(&self) -> Result<i64> {
1098 let loader = DbPortfolioSnapshotLoader::new(self.db.clone());
1099
1100 match loader.load_latest() {
1101 Ok(Some((snapshot_id, state))) => {
1102 if let Some(next_command_id) = state
1103 .offsets
1104 .get(ENGINE_COMMAND_SNAPSHOT_STREAM)
1105 .and_then(|partitions| partitions.get(&0))
1106 .copied()
1107 {
1108 self.service
1109 .clear_all()
1110 .await
1111 .map_err(|e| anyhow::anyhow!("Failed clearing portfolio state: {}", e))?;
1112 for (wallet, balance) in state.states {
1113 self.service.restore(&wallet, balance).await.map_err(|e| {
1114 anyhow::anyhow!(
1115 "Failed restoring portfolio state for {}: {}",
1116 wallet,
1117 e
1118 )
1119 })?;
1120 }
1121
1122 tracing::info!(
1123 "Restored portfolio from snapshot id={} with journal replay boundary command_id={}",
1124 snapshot_id,
1125 next_command_id
1126 );
1127 self.sync_status.set_catching_up();
1128 tracing::info!(
1129 "Portfolio sync status: CatchingUp (snapshot restored, awaiting journal replay)"
1130 );
1131 Ok(next_command_id)
1132 } else {
1133 tracing::warn!(
1134 "Ignoring legacy portfolio snapshot id={} without {} boundary, rebuilding from journal",
1135 snapshot_id,
1136 ENGINE_COMMAND_SNAPSHOT_STREAM
1137 );
1138 self.service.clear_all().await.map_err(|e| {
1139 anyhow::anyhow!("Failed clearing legacy portfolio state: {}", e)
1140 })?;
1141 self.sync_status.set_catching_up();
1142 Ok(1)
1143 }
1144 }
1145 Ok(None) => {
1146 tracing::info!("No portfolio snapshots found, rebuilding from journal");
1147 self.service
1148 .clear_all()
1149 .await
1150 .map_err(|e| anyhow::anyhow!("Failed clearing portfolio state: {}", e))?;
1151 self.sync_status.set_catching_up();
1152 tracing::info!(
1153 "Portfolio sync status: CatchingUp (no snapshot, replaying journal from genesis)"
1154 );
1155 Ok(1)
1156 }
1157 Err(e) => {
1158 tracing::error!("Failed to restore portfolio from snapshot: {}", e);
1159 Err(anyhow::anyhow!("Snapshot restore failed: {}", e))
1160 }
1161 }
1162 }
1163
1164 pub async fn handle_hypercore_position_update(&self, update: HypercorePositionUpdate) {
1169 if update.snapshot {
1170 self.service.set_hypercore_position(&update).await;
1171 } else {
1172 self.service.apply_hypercore_position_update(&update).await;
1173 }
1174
1175 let wallet_normalized = update.account.to_lowercase();
1177 let timestamp = chrono::Utc::now().timestamp();
1178 let position_symbol = canonical_perp_symbol(&update.coin);
1179
1180 if let Ok(wallet) = WalletAddress::from_str(&wallet_normalized) {
1181 if !self.has_portfolio_subscribers(&wallet).await {
1182 return;
1183 }
1184
1185 let position = match self
1186 .build_enriched_position_update(&wallet, &position_symbol)
1187 .await
1188 {
1189 Ok(position) => position,
1190 Err(e) => {
1191 tracing::error!(
1192 "Failed to build hypercore position update for {}/{}: {}",
1193 wallet,
1194 update.coin,
1195 e
1196 );
1197 return;
1198 }
1199 };
1200 self.notify_subscribers(
1201 &wallet,
1202 PortfolioUpdate::PositionUpdate {
1203 position,
1204 timestamp,
1205 },
1206 )
1207 .await;
1208 self.publish_margin_update(&wallet).await;
1209 self.publish_greeks_update(&wallet).await;
1210 }
1211 }
1212
1213 pub async fn handle_engine_message(&self, message: EngineMessage, seq: i64) {
1218 let pending_notifications = {
1220 let _guard = self.lock_projection_barrier().await;
1221 self.handle_engine_message_under_barrier(message, seq).await
1222 };
1223 self.send_pending_notifications(pending_notifications).await;
1225 }
1226
1227 pub(crate) async fn handle_engine_message_under_barrier(
1228 &self,
1229 message: EngineMessage,
1230 seq: i64,
1231 ) -> PendingNotifications {
1232 match &message {
1233 EngineMessage::OrderFilled { fill, .. } => {
1234 self.apply_fill_state_mutation(fill, seq).await;
1235 PendingNotifications::Fill { fill: fill.clone() }
1236 }
1237 EngineMessage::PositionExpired(ref expiry_msg) => {
1238 let change = self
1239 .apply_position_expired_state_mutation(expiry_msg, Some(seq))
1240 .await;
1241 PendingNotifications::Changes(vec![change])
1242 }
1243 _ => {
1244 let changes = match self.service.apply_event(&message).await {
1245 Ok(c) => c,
1246 Err(e) => {
1247 panic!("CRITICAL: Portfolio event processing failed: {}", e);
1248 }
1249 };
1250
1251 let mut last_seq = self.last_processed_seq.write().await;
1252 *last_seq = seq;
1253 drop(last_seq);
1254
1255 PendingNotifications::Changes(changes)
1256 }
1257 }
1258 }
1259
1260 pub(crate) async fn handle_order_filled_under_barrier(
1261 &self,
1262 fill: &hypercall_types::Fill,
1263 seq: i64,
1264 ) {
1265 self.apply_fill_state_mutation(fill, seq).await;
1266 }
1267
1268 pub(crate) async fn handle_position_expired_under_barrier(
1269 &self,
1270 expiry_msg: &PositionExpiredMessage,
1271 seq: Option<i64>,
1272 ) -> PendingNotifications {
1273 let change = self
1274 .apply_position_expired_state_mutation(expiry_msg, seq)
1275 .await;
1276 PendingNotifications::Changes(vec![change])
1277 }
1278
1279 pub(crate) async fn handle_replayed_position_expired_projection_under_barrier(
1280 &self,
1281 expiry_msg: &PositionExpiredMessage,
1282 ) -> PendingNotifications {
1283 self.service
1284 .remove_expired_position(&expiry_msg.wallet_address, &expiry_msg.symbol)
1285 .await;
1286
1287 tracing::info!(
1288 "PortfolioCache: Applied replayed expiry projection for {}/{}",
1289 expiry_msg.wallet_address,
1290 expiry_msg.symbol,
1291 );
1292
1293 let (position_change, total_margin_used) = self
1294 .get_position_and_margin_for_notification(
1295 &expiry_msg.wallet_address,
1296 &expiry_msg.symbol,
1297 )
1298 .await;
1299
1300 PendingNotifications::Changes(vec![PortfolioChange {
1301 wallet: expiry_msg.wallet_address,
1302 position_changes: vec![position_change],
1303 balance_change: None,
1304 total_margin_used,
1305 }])
1306 }
1307
1308 pub(crate) async fn handle_option_custody_delta(
1309 &self,
1310 wallet: WalletAddress,
1311 symbol: String,
1312 quantity_delta: Decimal,
1313 seq: Option<i64>,
1314 ) {
1315 let pending = {
1316 let _guard = self.lock_projection_barrier().await;
1317 self.handle_option_custody_delta_under_barrier(wallet, symbol, quantity_delta, seq)
1318 .await
1319 };
1320 self.send_pending_notifications(pending).await;
1321 }
1322
1323 pub(crate) async fn handle_option_custody_delta_under_barrier(
1324 &self,
1325 wallet: WalletAddress,
1326 symbol: String,
1327 quantity_delta: Decimal,
1328 seq: Option<i64>,
1329 ) -> PendingNotifications {
1330 let change = self
1331 .service
1332 .apply_option_custody_delta(&wallet, &symbol, quantity_delta)
1333 .await;
1334 if let Some(seq) = seq {
1335 let mut last_seq = self.last_processed_seq.write().await;
1336 *last_seq = seq;
1337 }
1338 PendingNotifications::Changes(vec![change])
1339 }
1340
1341 pub async fn replay_option_custody_delta(
1342 &self,
1343 wallet: WalletAddress,
1344 symbol: String,
1345 quantity_delta: Decimal,
1346 seq: i64,
1347 ) {
1348 let _guard = self.lock_projection_barrier().await;
1349 self.handle_option_custody_delta_under_barrier(wallet, symbol, quantity_delta, Some(seq))
1350 .await;
1351 }
1352
1353 pub(crate) async fn send_fill_notifications(&self, fill: &hypercall_types::Fill) {
1357 let timestamp = chrono::Utc::now().timestamp();
1358
1359 let (taker_pos, taker_margin) = self
1360 .get_position_and_margin_for_notification(&fill.taker_wallet_address, &fill.symbol)
1361 .await;
1362 let taker_change = PortfolioChange {
1363 wallet: fill.taker_wallet_address,
1364 position_changes: vec![taker_pos],
1365 balance_change: None,
1366 total_margin_used: taker_margin,
1367 };
1368 self.send_change_notifications(&taker_change, timestamp)
1369 .await;
1370
1371 let (maker_pos, maker_margin) = self
1372 .get_position_and_margin_for_notification(&fill.maker_wallet_address, &fill.symbol)
1373 .await;
1374 let maker_change = PortfolioChange {
1375 wallet: fill.maker_wallet_address,
1376 position_changes: vec![maker_pos],
1377 balance_change: None,
1378 total_margin_used: maker_margin,
1379 };
1380 self.send_change_notifications(&maker_change, timestamp)
1381 .await;
1382 }
1383
1384 pub async fn replay_journal_fill(&self, fill: &hypercall_types::Fill, seq: i64) {
1385 let _guard = self.lock_projection_barrier().await;
1386 self.apply_fill_state_mutation(fill, seq).await;
1387 }
1388
1389 async fn apply_fill_state_mutation(&self, fill: &hypercall_types::Fill, seq: i64) {
1394 let taker_side_orderbook = fill.taker_side;
1395 let maker_side = match fill.taker_side {
1396 hypercall_types::Side::Buy => hypercall_types::Side::Sell,
1397 hypercall_types::Side::Sell => hypercall_types::Side::Buy,
1398 };
1399 let size_human = to_human_readable_decimal(&fill.symbol, fill.size);
1400
1401 self.service
1402 .apply_fill_to_memory_both_sides(
1403 &fill.taker_wallet_address,
1404 &fill.maker_wallet_address,
1405 &fill.symbol,
1406 &taker_side_orderbook,
1407 &maker_side,
1408 fill.price,
1409 size_human,
1410 )
1411 .await;
1412
1413 let mut last_seq = self.last_processed_seq.write().await;
1414 *last_seq = seq;
1415 }
1416
1417 async fn apply_position_expired_state_mutation(
1418 &self,
1419 expiry_msg: &PositionExpiredMessage,
1420 seq: Option<i64>,
1421 ) -> PortfolioChange {
1422 let settlement = self.settlement.clone();
1423 let expiry_msg_for_repair = expiry_msg.clone();
1424 let settlement_outcome = tokio::task::spawn_blocking(move || {
1425 settlement.try_apply_settlement_sync(
1426 &expiry_msg_for_repair.wallet_address,
1427 &expiry_msg_for_repair.symbol,
1428 expiry_msg_for_repair.position_size,
1429 expiry_msg_for_repair.settlement_price,
1430 expiry_msg_for_repair.settlement_value,
1431 expiry_msg_for_repair.margin_mode,
1432 expiry_msg_for_repair.timestamp as i64,
1433 expiry_msg_for_repair.settlement_entry_price,
1434 expiry_msg_for_repair.cost_basis,
1435 expiry_msg_for_repair.net_pnl,
1436 )
1437 })
1438 .await
1439 .unwrap_or_else(|error| {
1440 panic!(
1441 "CRITICAL: settlement repair task join failed for {}/{}: {}",
1442 expiry_msg.wallet_address, expiry_msg.symbol, error
1443 )
1444 })
1445 .unwrap_or_else(|error| {
1446 metrics::counter!("ht_settlement_apply_failures_total").increment(1);
1447 panic!(
1448 "CRITICAL: failed to confirm or repair settlement accounting for {}/{}: {}",
1449 expiry_msg.wallet_address, expiry_msg.symbol, error
1450 )
1451 });
1452
1453 if settlement_outcome.newly_persisted {
1454 tracing::warn!(
1455 wallet = %expiry_msg.wallet_address,
1456 symbol = %expiry_msg.symbol,
1457 "Repaired missing settlement payout before portfolio projection"
1458 );
1459 } else {
1460 let confirmed_applied = self
1461 .settlement
1462 .is_settlement_ledger_applied_sync(&expiry_msg.wallet_address, &expiry_msg.symbol)
1463 .unwrap_or_else(|error| {
1464 panic!(
1465 "CRITICAL: DB error checking repaired settlement state for {}/{}: {}. \
1466 Panicking to allow replay.",
1467 expiry_msg.wallet_address, expiry_msg.symbol, error
1468 )
1469 });
1470 if !confirmed_applied {
1471 metrics::counter!("ht_settlement_apply_failures_total").increment(1);
1472 panic!(
1473 "CRITICAL: settlement repair completed but ledger_applied is still false for {}/{}",
1474 expiry_msg.wallet_address, expiry_msg.symbol
1475 )
1476 }
1477 }
1478
1479 self.service
1480 .remove_expired_position(&expiry_msg.wallet_address, &expiry_msg.symbol)
1481 .await;
1482
1483 tracing::info!(
1484 "PortfolioCache: Applied settled expiry projection for {}/{}",
1485 expiry_msg.wallet_address,
1486 expiry_msg.symbol,
1487 );
1488 metrics::counter!("ht_settlement_apply_success_total").increment(1);
1489
1490 if let Some(seq) = seq {
1491 let mut last_seq = self.last_processed_seq.write().await;
1492 *last_seq = seq;
1493 }
1494
1495 let (position_change, total_margin_used) = self
1496 .get_position_and_margin_for_notification(
1497 &expiry_msg.wallet_address,
1498 &expiry_msg.symbol,
1499 )
1500 .await;
1501
1502 PortfolioChange {
1503 wallet: expiry_msg.wallet_address,
1504 position_changes: vec![position_change],
1505 balance_change: None,
1506 total_margin_used,
1507 }
1508 }
1509
1510 pub(crate) async fn send_pending_notifications(&self, pending: PendingNotifications) {
1512 match pending {
1513 PendingNotifications::Fill { fill } => self.send_fill_notifications(&fill).await,
1514 PendingNotifications::Changes(changes) => {
1515 let timestamp = chrono::Utc::now().timestamp();
1516 for change in &changes {
1517 self.send_change_notifications(change, timestamp).await;
1518 }
1519 }
1520 }
1521 }
1522
1523 async fn send_change_notifications(&self, change: &PortfolioChange, timestamp: i64) {
1528 for pos_change in &change.position_changes {
1530 let position = self
1533 .build_enriched_position_update(&change.wallet, &pos_change.symbol)
1534 .await
1535 .expect("build_enriched_position_update failed after successful fill — margin state is inconsistent");
1536 self.notify_subscribers(
1537 &change.wallet,
1538 PortfolioUpdate::PositionUpdate {
1539 position,
1540 timestamp,
1541 },
1542 )
1543 .await;
1544 }
1545
1546 self.notify_subscribers(
1548 &change.wallet,
1549 PortfolioUpdate::BalanceUpdate {
1550 total_margin_used: change.total_margin_used,
1551 timestamp,
1552 },
1553 )
1554 .await;
1555
1556 self.publish_greeks_update(&change.wallet).await;
1557 }
1558
1559 async fn get_position_and_margin_for_notification(
1564 &self,
1565 wallet: &WalletAddress,
1566 symbol: &str,
1567 ) -> (crate::portfolio::PositionChange, Decimal) {
1568 if let Some(balance) = self.service.get_portfolio_balance(wallet).await {
1569 let margin = balance.total_margin_used;
1570 if let Some(pos) = balance.positions.get(symbol) {
1571 return (
1572 crate::portfolio::PositionChange {
1573 symbol: symbol.to_string(),
1574 amount: pos.amount,
1575 entry_price: pos.entry_price,
1576 margin_posted: pos.margin_posted,
1577 realized_pnl: pos.realized_pnl,
1578 unrealized_pnl: pos.unrealized_pnl,
1579 },
1580 margin,
1581 );
1582 }
1583 return (
1585 crate::portfolio::PositionChange {
1586 symbol: symbol.to_string(),
1587 amount: dec!(0),
1588 entry_price: dec!(0),
1589 margin_posted: dec!(0),
1590 realized_pnl: dec!(0),
1591 unrealized_pnl: dec!(0),
1592 },
1593 margin,
1594 );
1595 }
1596 (
1598 crate::portfolio::PositionChange {
1599 symbol: symbol.to_string(),
1600 amount: dec!(0),
1601 entry_price: dec!(0),
1602 margin_posted: dec!(0),
1603 realized_pnl: dec!(0),
1604 unrealized_pnl: dec!(0),
1605 },
1606 dec!(0),
1607 )
1608 }
1609
1610 pub async fn subscribe(
1612 &self,
1613 account: WalletAddress,
1614 ) -> (SubscriberId, mpsc::UnboundedReceiver<PortfolioUpdate>) {
1615 let (tx, rx) = mpsc::unbounded_channel();
1616
1617 let mut next_id = self.next_subscriber_id.write().await;
1619 let subscriber_id = *next_id;
1620 *next_id += 1;
1621 drop(next_id);
1622
1623 let positions = match self.build_enriched_positions(&account).await {
1624 Ok(positions) => positions,
1625 Err(e) => {
1626 tracing::error!(
1627 "Failed to build initial portfolio snapshot for {}: {}",
1628 account,
1629 e
1630 );
1631 return (subscriber_id, rx);
1632 }
1633 };
1634
1635 let mut subscribers = self.subscribers.write().await;
1637 subscribers
1638 .entry(account)
1639 .or_insert_with(HashMap::new)
1640 .insert(subscriber_id, tx.clone());
1641 drop(subscribers);
1642
1643 let _ = tx.send(PortfolioUpdate::Initial {
1644 positions,
1645 timestamp: chrono::Utc::now().timestamp(),
1646 });
1647
1648 self.publish_margin_update(&account).await;
1651 self.publish_greeks_update(&account).await;
1652
1653 (subscriber_id, rx)
1654 }
1655
1656 pub async fn unsubscribe(&self, account: &WalletAddress, subscriber_id: SubscriberId) {
1658 let mut subscribers = self.subscribers.write().await;
1659 if let Some(account_subs) = subscribers.get_mut(account) {
1660 account_subs.remove(&subscriber_id);
1661 if account_subs.is_empty() {
1662 subscribers.remove(account);
1663 }
1664 }
1665 }
1666
1667 pub async fn subscriber_count_for_wallet(&self, account: &WalletAddress) -> usize {
1669 let subscribers = self.subscribers.read().await;
1670 subscribers
1671 .get(account)
1672 .map(|account_subs| account_subs.len())
1673 .unwrap_or(0)
1674 }
1675
1676 async fn notify_subscribers(&self, account: &WalletAddress, update: PortfolioUpdate) {
1678 let failed_subscribers: Vec<SubscriberId> = {
1679 let subscribers = self.subscribers.read().await;
1680 let Some(account_subs) = subscribers.get(account) else {
1681 return;
1682 };
1683
1684 account_subs
1685 .iter()
1686 .filter_map(|(subscriber_id, tx)| {
1687 tx.send(update.clone()).err().map(|_| *subscriber_id)
1688 })
1689 .collect()
1690 };
1691
1692 if failed_subscribers.is_empty() {
1693 return;
1694 }
1695
1696 let mut subscribers = self.subscribers.write().await;
1697 if let Some(account_subs) = subscribers.get_mut(account) {
1698 for subscriber_id in failed_subscribers {
1699 account_subs.remove(&subscriber_id);
1700 }
1701
1702 if account_subs.is_empty() {
1703 subscribers.remove(account);
1704 }
1705 }
1706 }
1707
1708 pub async fn process_test_fill(
1709 &self,
1710 account: &WalletAddress,
1711 option_id: &str,
1712 side: &str,
1713 price: Decimal,
1714 volume: Decimal,
1715 ) {
1716 use hypercall_types::Fill;
1717 use hypercall_types::Side;
1718
1719 let side_enum = match side {
1720 "buy" => Side::Buy,
1721 "sell" => Side::Sell,
1722 _ => return,
1723 };
1724
1725 let size_contract_units = to_contract_units_decimal(option_id, volume);
1726
1727 let fill = Fill {
1728 trade_id: 0,
1729 taker_order_id: 0,
1730 maker_order_id: 0,
1731 symbol: option_id.to_string(),
1732 price,
1733 size: size_contract_units,
1734 taker_side: side_enum,
1735 taker_wallet_address: *account,
1736 maker_wallet_address: WalletAddress::from(alloy::primitives::Address::ZERO),
1737 fee: dec!(0),
1738 is_taker: true,
1739 timestamp: 0,
1740 builder_code_address: None,
1741 builder_code_fee: None,
1742 source: Default::default(),
1743 taker_realized_pnl: None,
1744 maker_realized_pnl: None,
1745 underlying_notional: None,
1746 };
1747
1748 self.service
1749 .apply_event(&EngineMessage::OrderFilled {
1750 accounting: hypercall_engine::FillAccounting::from_fill(&fill),
1751 fill,
1752 })
1753 .await
1754 .unwrap();
1755 }
1756
1757 pub async fn get_portfolio(&self, account: &WalletAddress) -> Result<Portfolio> {
1759 if let Err(error) = self.refresh_live_market_prices_for_wallet(account).await {
1760 tracing::warn!(
1761 "Failed to refresh live market prices for {} before REST portfolio read: {}",
1762 account,
1763 error
1764 );
1765 }
1766 Ok(self.service.get_portfolio(account).await)
1767 }
1768
1769 pub async fn get_portfolio_fail_closed_pm(&self, account: &WalletAddress) -> Result<Portfolio> {
1770 self.refresh_live_market_prices_for_wallet(account)
1771 .await
1772 .map_err(|error| anyhow::anyhow!("PM portfolio read unavailable: {}", error))?;
1773 Ok(self.service.get_portfolio(account).await)
1774 }
1775
1776 pub async fn has_live_position_symbol(&self, wallet: &WalletAddress, symbol: &str) -> bool {
1777 self.service
1778 .get_portfolio_balance(wallet)
1779 .await
1780 .map(|balance| balance.positions.contains_key(symbol))
1781 .unwrap_or(false)
1782 }
1783
1784 pub async fn has_portfolio(&self, account: &WalletAddress) -> bool {
1786 self.service.get_portfolio_balance(account).await.is_some()
1787 }
1788
1789 pub async fn update_market_prices(&self, prices: HashMap<String, Decimal>) {
1791 self.service.update_market_prices(prices).await;
1793 }
1794
1795 pub async fn get_last_processed_seq(&self) -> i64 {
1797 *self.last_processed_seq.read().await
1798 }
1799
1800 pub async fn capture_snapshot_state(&self) -> (i64, HashMap<WalletAddress, PortfolioBalance>) {
1802 let _guard = self.lock_projection_barrier().await;
1803 let seq = *self.last_processed_seq.read().await;
1804 let portfolios = self.service.all_portfolios().await;
1805 (seq, portfolios)
1806 }
1807
1808 pub async fn capture_portfolios_under_barrier(
1809 &self,
1810 _barrier_proof: &OwnedMutexGuard<()>,
1811 ) -> HashMap<WalletAddress, PortfolioBalance> {
1812 self.service.all_portfolios().await
1813 }
1814
1815 pub async fn get_all_portfolios(&self) -> HashMap<WalletAddress, PortfolioSummary> {
1819 let portfolios = self.service.all_portfolios().await;
1820 let mut result = HashMap::new();
1821
1822 for (wallet, balance) in portfolios {
1823 let positions: HashMap<String, PositionSummary> = balance
1825 .positions
1826 .iter()
1827 .map(|(symbol, pos)| {
1828 (
1829 symbol.clone(),
1830 PositionSummary {
1831 symbol: pos.symbol.clone(),
1832 amount: pos.amount,
1833 entry_price: pos.entry_price,
1834 realized_pnl: pos.realized_pnl,
1835 unrealized_pnl: pos.unrealized_pnl,
1836 },
1837 )
1838 })
1839 .collect();
1840
1841 result.insert(
1842 wallet,
1843 PortfolioSummary {
1844 positions,
1845 margin_info: None, },
1847 );
1848 }
1849
1850 result
1851 }
1852}
1853
1854#[derive(Debug, Clone)]
1856pub struct PortfolioSummary {
1857 pub positions: HashMap<String, PositionSummary>,
1858 pub margin_info: Option<MarginInfo>,
1859}
1860
1861#[derive(Debug, Clone)]
1863pub struct PositionSummary {
1864 pub symbol: String,
1865 pub amount: Decimal,
1866 pub entry_price: Decimal,
1867 pub realized_pnl: Decimal,
1868 pub unrealized_pnl: Decimal,
1869}
1870
1871#[derive(Debug, Clone)]
1873pub struct MarginInfo {
1874 pub equity: Decimal,
1875 pub initial_margin: Decimal,
1876 pub maintenance_margin: Decimal,
1877}
1878
1879use crate::shared::order_types::perp_underlying as portfolio_perp_underlying;
1880
1881#[cfg(test)]
1882mod tests {
1883 use super::*;
1884 use crate::messaging::{EventBusTrait, MockEventBus};
1885 use crate::price_oracle::hyperliquid_oracle::{
1886 HyperliquidMarkPriceOracle, HyperliquidOracleConfig,
1887 };
1888 use crate::read_cache::tier::TierCache;
1889 use crate::rsm::engine_snapshot::{
1890 MockOrderSnapshotProvider, MockQuoteProvider, SnapshotOpenOrdersSource,
1891 };
1892 use crate::rsm::ledger::InMemoryLedger;
1893 use crate::rsm::margin_service::SpanMarginService;
1894 use crate::rsm::MarginMode;
1895 use crate::snapshot::{DbPortfolioSnapshotWriter, SnapshotWriter};
1896 use crate::standard_margin::StandardMarginService;
1897 use crate::types::{Config, Scenario, ScenarioType};
1898 use crate::vol_oracle::FixedTestRiskVolOracle;
1899 use hypercall_engine::FeeConfig;
1900 use hypercall_runtime_api::{OrderSnapshotProvider, QuoteProvider, SnapshotBookQuote};
1901 use hypercall_types::Fill;
1902 use hypercall_types::Side;
1903 use sqlx::postgres::{PgConnectOptions, PgPoolOptions};
1904 use std::collections::{HashMap, HashSet};
1905 use testcontainers::runners::AsyncRunner;
1906 use testcontainers::ContainerAsync;
1907 use testcontainers::ImageExt;
1908 use testcontainers_modules::postgres::Postgres;
1909 use tokio::time::{sleep, timeout, Duration};
1910 use uuid::Uuid;
1911
1912 use hypercall_types::wallet_address::test_wallet;
1913
1914 struct TestContext {
1915 cache: Arc<PortfolioCache>,
1916 diesel_db: Arc<hypercall_db_diesel::DieselDb>,
1917 sqlx_connect_options: PgConnectOptions,
1918 _container: Option<ContainerAsync<Postgres>>,
1919 }
1920
/// Normalizes a PostgreSQL URL for test use and optionally swaps its database name.
///
/// Query handling: everything after the first `?` is the query; further `?` characters
/// are treated like `&`, empty pairs are dropped and only the first pair for each key is
/// kept. An empty resulting query is omitted together with its `?`.
///
/// When `database_name` is given, the text after the last `/` of the part before the
/// query is replaced by it.
///
/// # Panics
/// Panics when `database_name` is given and the part before the query contains no `/`.
fn normalize_test_database_url(base_database_url: &str, database_name: Option<&str>) -> String {
    let mut pieces = base_database_url.splitn(2, '?');
    let base = pieces
        .next()
        .expect("DATABASE_URL must contain a URL base component");

    // Collect the surviving query pairs in their original order.
    let mut query_pairs: Vec<&str> = Vec::new();
    if let Some(raw_query) = pieces.next() {
        let mut seen_keys: HashSet<&str> = HashSet::new();
        for pair in raw_query.split(|c: char| c == '?' || c == '&') {
            if pair.is_empty() {
                continue;
            }
            let key = pair.split('=').next().unwrap_or_default();
            if seen_keys.insert(key) {
                query_pairs.push(pair);
            }
        }
    }
    let query = query_pairs.join("&");

    let url_base = match database_name {
        None => base.to_string(),
        Some(database_name) => {
            let (base_prefix, _) = base
                .rsplit_once('/')
                .expect("DATABASE_URL must include a database path");
            format!("{base_prefix}/{database_name}")
        }
    };

    if query.is_empty() {
        url_base
    } else {
        format!("{url_base}?{query}")
    }
}
1965
1966 async fn try_setup_test_cache_from_shared_database(
1967 base_database_url: &str,
1968 ) -> Result<TestContext, String> {
1969 let database_name = format!("portfolio_cache_test_{}", Uuid::now_v7().simple());
1970 let admin_database_url = normalize_test_database_url(base_database_url, None);
1971 let admin_pool = PgPoolOptions::new()
1972 .max_connections(1)
1973 .min_connections(1)
1974 .acquire_timeout(Duration::from_secs(30))
1975 .connect(&admin_database_url)
1976 .await
1977 .map_err(|error| {
1978 format!("failed to connect to shared PostgreSQL test database: {error}")
1979 })?;
1980 sqlx::query(&format!("CREATE DATABASE \"{}\"", database_name))
1981 .execute(&admin_pool)
1982 .await
1983 .map_err(|error| {
1984 format!("failed to create isolated PostgreSQL test database: {error}")
1985 })?;
1986 admin_pool.close().await;
1987
1988 let database_url = normalize_test_database_url(base_database_url, Some(&database_name));
1989 let sqlx_connect_options = PgConnectOptions::from_str(&database_url).map_err(|error| {
1990 format!("isolated PostgreSQL test database URL must parse: {error}")
1991 })?;
1992 let diesel_handler = Arc::new(
1993 DatabaseHandler::new(&database_url)
1994 .map_err(|error| format!("failed to initialize DieselEventHandler: {error:#}"))?,
1995 );
1996 let cache = Arc::new(PortfolioCache::new(diesel_handler));
1997 let _ = cache
1998 .initialize()
1999 .await
2000 .map_err(|error| format!("failed to initialize portfolio cache: {error}"))?;
2001
2002 let diesel_db = Arc::new(
2003 hypercall_db_diesel::DieselDb::new_no_tls(&database_url, 2)
2004 .await
2005 .map_err(|error| format!("failed to initialize DieselDb: {error:#}"))?,
2006 );
2007
2008 Ok(TestContext {
2009 cache,
2010 diesel_db,
2011 sqlx_connect_options,
2012 _container: None,
2013 })
2014 }
2015
2016 async fn setup_test_cache() -> TestContext {
2017 if let Ok(base_database_url) = std::env::var("TEST_DATABASE_URL") {
2018 match try_setup_test_cache_from_shared_database(&base_database_url).await {
2019 Ok(context) => return context,
2020 Err(error) => {
2021 eprintln!(
2022 "shared PostgreSQL test database unavailable for portfolio_cache tests: {error}; falling back to testcontainer"
2023 );
2024 }
2025 }
2026 }
2027
2028 let postgres_image = Postgres::default()
2041 .with_db_name("test_db")
2042 .with_user("test_user")
2043 .with_password("test_password")
2044 .with_startup_timeout(Duration::from_secs(120))
2045 .with_tag("16-alpine");
2046
2047 let container = postgres_image
2048 .start()
2049 .await
2050 .expect("Failed to start PostgreSQL container");
2051 let port = container
2052 .get_host_port_ipv4(5432)
2053 .await
2054 .expect("Failed to get port");
2055
2056 let database_url = format!(
2057 "postgresql://test_user:test_password@127.0.0.1:{}/test_db",
2058 port
2059 );
2060
2061 let mut last_ready_error = None;
2065 for _ in 0..60 {
2066 match PgPoolOptions::new()
2067 .max_connections(1)
2068 .min_connections(1)
2069 .acquire_timeout(Duration::from_secs(5))
2070 .connect(&database_url)
2071 .await
2072 {
2073 Ok(pool) => match sqlx::query("SELECT 1").execute(&pool).await {
2074 Ok(_) => {
2075 pool.close().await;
2076 last_ready_error = None;
2077 break;
2078 }
2079 Err(error) => {
2080 last_ready_error = Some(format!("readiness query failed: {error}"));
2081 pool.close().await;
2082 }
2083 },
2084 Err(error) => {
2085 last_ready_error = Some(format!("failed to connect to postgres: {error}"));
2086 }
2087 }
2088
2089 sleep(Duration::from_secs(1)).await;
2090 }
2091
2092 if let Some(error) = last_ready_error {
2093 panic!("PostgreSQL container failed readiness checks: {error}");
2094 }
2095
2096 let mut diesel_handler = None;
2100 let mut last_handler_error = None;
2101 for _ in 0..10 {
2102 match DatabaseHandler::new(&database_url) {
2103 Ok(handler) => {
2104 diesel_handler = Some(Arc::new(handler));
2105 last_handler_error = None;
2106 break;
2107 }
2108 Err(error) => {
2109 last_handler_error =
2110 Some(format!("failed to create diesel handler: {error:#}"));
2111 sleep(Duration::from_secs(1)).await;
2112 }
2113 }
2114 }
2115
2116 let diesel_handler = diesel_handler.unwrap_or_else(|| {
2117 panic!(
2118 "PostgreSQL container never became migration-ready: {}",
2119 last_handler_error.unwrap_or_else(|| "unknown error".to_string())
2120 )
2121 });
2122
2123 let cache = Arc::new(PortfolioCache::new(diesel_handler));
2124
2125 let _ = cache.initialize().await.unwrap();
2127
2128 let diesel_db = Arc::new(
2129 hypercall_db_diesel::DieselDb::new(&database_url, 2)
2130 .await
2131 .expect("DieselDb must initialize for test context"),
2132 );
2133
2134 TestContext {
2135 cache,
2136 diesel_db,
2137 sqlx_connect_options: PgConnectOptions::from_str(&database_url)
2138 .expect("Test container database URL must parse for sqlx"),
2139 _container: Some(container),
2140 }
2141 }
2142
2143 async fn attach_test_greeks_cache(context: &TestContext, symbol: &str) {
2144 let raw_event_bus = Arc::new(MockEventBus::new().expect("Failed to create mock event bus"));
2145 raw_event_bus.clone().start_processing().await;
2146 let event_bus: Arc<dyn EventBusTrait> = raw_event_bus.clone();
2147
2148 {
2149 use diesel::prelude::*;
2150 use diesel::sql_types::{BigInt, Numeric, Text};
2151
2152 let pool = context.cache.db.pool();
2153 let mut conn = pool.get().expect("Failed to get Diesel connection");
2154 diesel::sql_query(
2155 "INSERT INTO instruments (id, underlying, strike, expiry, option_type)
2156 VALUES ($1, $2, $3, $4, $5)
2157 ON CONFLICT (id) DO NOTHING",
2158 )
2159 .bind::<Text, _>(symbol)
2160 .bind::<Text, _>("BTC")
2161 .bind::<Numeric, _>(dec!(50000))
2162 .bind::<BigInt, _>(20261231_i64)
2163 .bind::<Text, _>("call")
2164 .execute(&mut conn)
2165 .expect("Failed to insert test instrument");
2166 }
2167
2168 let oracle_config = HyperliquidOracleConfig {
2169 api_url: "https://api.hyperliquid-testnet.xyz/info".to_string(),
2170 poll_interval_ms: 60_000,
2171 risk_free_rate: 0.05,
2172 symbol: "BTC".to_string(),
2173 twap_window_seconds: 300,
2174 oracle_writer: None,
2175 max_memory_samples: 100,
2176 ws_feed: None,
2177 price_notify: None,
2178 min_settlement_samples: 500,
2179 };
2180 let btc_oracle = Arc::new(
2181 HyperliquidMarkPriceOracle::new(oracle_config).expect("Failed to create BTC oracle"),
2182 );
2183 btc_oracle.set_spot_price_for_testing(50_000.0).await;
2184
2185 let mut oracles = HashMap::new();
2186 oracles.insert("BTC".to_string(), btc_oracle);
2187
2188 let quote_provider = Arc::new(MockQuoteProvider::new());
2189 quote_provider.set_quote(
2190 symbol,
2191 SnapshotBookQuote {
2192 best_bid: Some(10_000.0),
2193 best_bid_size: Some(5.0),
2194 best_ask: Some(11_000.0),
2195 best_ask_size: Some(5.0),
2196 mid: Some(10_500.0),
2197 bids: vec![(10_000.0, 5.0)],
2198 asks: vec![(11_000.0, 5.0)],
2199 },
2200 );
2201 let quote_provider_trait: Arc<dyn QuoteProvider> = quote_provider.clone();
2202 let (_shutdown_tx, shutdown_rx) = tokio::sync::broadcast::channel::<()>(1);
2203
2204 let greeks_cache = GreeksCache::new(
2205 context.diesel_db.as_ref(),
2206 event_bus,
2207 oracles,
2208 shutdown_rx,
2209 quote_provider_trait,
2210 )
2211 .await
2212 .expect("Failed to create GreeksCache");
2213 greeks_cache
2214 .set_vol_oracle(Arc::new(FixedTestRiskVolOracle::new(0.5)))
2215 .await;
2216
2217 let mut greeks_ready = false;
2218 let mut last_error = String::new();
2219 for _ in 0..20 {
2220 match greeks_cache.get_greeks(symbol).await {
2221 Ok(_) => {
2222 greeks_ready = true;
2223 break;
2224 }
2225 Err(e) => {
2226 last_error = e.to_string();
2227 }
2228 }
2229 sleep(Duration::from_millis(50)).await;
2230 }
2231 assert!(
2232 greeks_ready,
2233 "expected test GreeksCache to have greeks for {}, last error: {}",
2234 symbol, last_error
2235 );
2236
2237 *context.cache.greeks_cache.write().await = Some(greeks_cache);
2238 }
2239
2240 async fn insert_trade_for_fill(context: &TestContext, fill: &hypercall_types::Fill) {
2242 use diesel::prelude::*;
2243 use diesel::sql_types::{BigInt, Binary, Numeric, Text};
2244 use rust_decimal_macros::dec;
2245
2246 let pool = context.cache.db.pool();
2247 let mut conn = pool.get().expect("Failed to get connection");
2248
2249 diesel::sql_query(
2250 "INSERT INTO trades (trade_id, symbol, price, size, maker_address, taker_address, maker_fee, taker_fee, timestamp)
2251 VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
2252 ON CONFLICT (trade_id) DO NOTHING"
2253 )
2254 .bind::<BigInt, _>(fill.trade_id as i64)
2255 .bind::<Text, _>(&fill.symbol)
2256 .bind::<Numeric, _>(fill.price)
2257 .bind::<Numeric, _>(fill.size)
2258 .bind::<Binary, _>(&fill.maker_wallet_address)
2259 .bind::<Binary, _>(&fill.taker_wallet_address)
2260 .bind::<Numeric, _>(dec!(0)) .bind::<Numeric, _>(fill.fee) .bind::<BigInt, _>(fill.timestamp as i64)
2263 .execute(&mut conn)
2264 .expect("Failed to insert trade");
2265 }
2266
2267 async fn insert_applied_settlement(
2268 context: &TestContext,
2269 wallet: WalletAddress,
2270 symbol: &str,
2271 position_size: Decimal,
2272 settlement_entry_price: Decimal,
2273 cost_basis: Decimal,
2274 net_pnl: Decimal,
2275 ) {
2276 let settlement_price = dec!(1000);
2277 let settlement_value = position_size * settlement_price;
2278 let outcome = context
2279 .cache
2280 .settlement
2281 .try_apply_settlement_sync(
2282 &wallet,
2283 symbol,
2284 position_size,
2285 settlement_price,
2286 settlement_value,
2287 MarginMode::Standard,
2288 2,
2289 Some(settlement_entry_price),
2290 Some(cost_basis),
2291 Some(net_pnl),
2292 )
2293 .expect("Failed to persist applied settlement");
2294 assert!(outcome.newly_persisted);
2295 }
2296
2297 async fn attach_test_pm_margin_dependencies(
2298 context: &TestContext,
2299 wallet: WalletAddress,
2300 ) -> Arc<TierCache> {
2301 attach_test_greeks_cache(context, "BTC-20261231-50000-C").await;
2302 let greeks_cache = context
2303 .cache
2304 .greeks_cache
2305 .read()
2306 .await
2307 .as_ref()
2308 .cloned()
2309 .expect("greeks cache should be configured");
2310
2311 let ledger: Arc<dyn crate::rsm::Ledger + Send + Sync> = Arc::new(InMemoryLedger::new());
2312 let portfolio_service: Arc<dyn crate::portfolio::PortfolioService + Send + Sync> =
2313 context.cache.get_service();
2314 let order_snapshot: Arc<dyn OrderSnapshotProvider> = Arc::new(MockOrderSnapshotProvider);
2315 let snapshot_open_orders = Arc::new(SnapshotOpenOrdersSource::new(order_snapshot.clone()));
2316 let risk_account_builder = Arc::new(crate::rsm::portfolio_margin::RiskAccountBuilder::new(
2317 ledger.clone(),
2318 portfolio_service.clone(),
2319 snapshot_open_orders,
2320 greeks_cache.clone(),
2321 ));
2322 let standard_margin_service = Arc::new(StandardMarginService::new());
2323 let standard_account_builder =
2324 Arc::new(crate::standard_margin::StandardAccountBuilder::new(
2325 ledger,
2326 portfolio_service,
2327 greeks_cache.clone(),
2328 ));
2329 let tier_cache = Arc::new(
2330 TierCache::new(context.cache.db.clone()).expect("tier cache should initialize"),
2331 );
2332 tier_cache
2333 .set_margin_mode(&wallet, MarginMode::Portfolio)
2334 .await
2335 .expect("wallet should be set to PM");
2336
2337 let span_margin_service = Arc::new(SpanMarginService::new_for_tests(Config {
2338 risk_free_rate: 0.05,
2339 base_volatility: 0.8,
2340 base_skew: 0.0,
2341 base_excess_kurtosis: 0.0,
2342 delta_threshold: 0.0001,
2343 strike_match_tolerance: 0.01,
2344 expiry_match_tolerance_years: 0.001,
2345 scenarios: vec![
2346 Scenario {
2347 scenario_type: ScenarioType::SpotChange,
2348 value: 0.15,
2349 },
2350 Scenario {
2351 scenario_type: ScenarioType::SpotChange,
2352 value: -0.15,
2353 },
2354 ],
2355 allow_standard_margin_shorts: false,
2356 fee_config: FeeConfig::default(),
2357 }));
2358
2359 context
2360 .cache
2361 .set_margin_dependencies(
2362 span_margin_service,
2363 greeks_cache,
2364 risk_account_builder,
2365 tier_cache.clone(),
2366 order_snapshot,
2367 standard_margin_service,
2368 standard_account_builder,
2369 )
2370 .await;
2371
2372 tier_cache
2373 }
2374
2375 #[tokio::test]
2376 async fn test_get_portfolio_fail_closed_pm_requires_repricing_inputs() {
2377 let context = setup_test_cache().await;
2378 let wallet = test_wallet(31);
2379
2380 context
2381 .cache
2382 .service
2383 .restore(
2384 &wallet,
2385 PortfolioBalance {
2386 positions: HashMap::from([(
2387 "BTC".to_string(),
2388 crate::portfolio::PositionData {
2389 symbol: "BTC".to_string(),
2390 amount: dec!(1),
2391 entry_price: dec!(48000),
2392 margin_posted: dec!(0),
2393 realized_pnl: dec!(0),
2394 unrealized_pnl: dec!(0),
2395 },
2396 )]),
2397 total_margin_used: dec!(0),
2398 },
2399 )
2400 .await
2401 .expect("restore should succeed");
2402
2403 let err = context
2404 .cache
2405 .get_portfolio_fail_closed_pm(&wallet)
2406 .await
2407 .expect_err("PM REST reads must fail closed without repricing inputs");
2408 assert!(
2409 err.to_string().contains("PM portfolio read unavailable"),
2410 "unexpected error: {}",
2411 err
2412 );
2413 }
2414
2415 #[tokio::test]
2416 async fn test_get_portfolio_fail_closed_pm_skips_perp_repricing() {
2417 let context = setup_test_cache().await;
2418 let wallet = test_wallet(32);
2419
2420 context
2421 .cache
2422 .service
2423 .restore(
2424 &wallet,
2425 PortfolioBalance {
2426 positions: HashMap::from([(
2427 "BTC-PERP".to_string(),
2428 crate::portfolio::PositionData {
2429 symbol: "BTC-PERP".to_string(),
2430 amount: dec!(1),
2431 entry_price: dec!(48000),
2432 margin_posted: dec!(0),
2433 realized_pnl: dec!(0),
2434 unrealized_pnl: dec!(750),
2435 },
2436 )]),
2437 total_margin_used: dec!(0),
2438 },
2439 )
2440 .await
2441 .expect("restore should succeed");
2442
2443 attach_test_greeks_cache(&context, "BTC-20260331-50000-C").await;
2444
2445 let portfolio = context
2446 .cache
2447 .get_portfolio_fail_closed_pm(&wallet)
2448 .await
2449 .expect("PM REST reads should succeed — perps are skipped during repricing");
2450 let btc_position = portfolio
2451 .positions
2452 .iter()
2453 .find(|position| position.position.symbol == "BTC-PERP")
2454 .expect("BTC-PERP position should be present");
2455 assert_eq!(btc_position.position.unrealized_pnl, dec!(750));
2457 }
2458
2459 #[tokio::test]
2460 async fn test_get_portfolio_fail_closed_pm_repairs_applied_settlement_ghost_position() {
2461 let context = setup_test_cache().await;
2462 let wallet = test_wallet(33);
2463 let symbol = "BTC-20260403-34000-C";
2464
2465 context
2466 .cache
2467 .service
2468 .restore(
2469 &wallet,
2470 PortfolioBalance {
2471 positions: HashMap::from([(
2472 symbol.to_string(),
2473 crate::portfolio::PositionData {
2474 symbol: symbol.to_string(),
2475 amount: dec!(1),
2476 entry_price: dec!(1500),
2477 margin_posted: dec!(0),
2478 realized_pnl: dec!(0),
2479 unrealized_pnl: dec!(0),
2480 },
2481 )]),
2482 total_margin_used: dec!(0),
2483 },
2484 )
2485 .await
2486 .expect("restore should succeed");
2487 insert_applied_settlement(
2488 &context,
2489 wallet,
2490 symbol,
2491 dec!(1),
2492 dec!(0),
2493 dec!(0),
2494 dec!(1000),
2495 )
2496 .await;
2497
2498 let portfolio = context
2499 .cache
2500 .get_portfolio_fail_closed_pm(&wallet)
2501 .await
2502 .expect("PM portfolio should self-heal settled ghost positions");
2503
2504 assert!(
2505 portfolio
2506 .positions
2507 .iter()
2508 .all(|position| position.position.symbol != symbol),
2509 "settled ghost symbol should be removed from REST portfolio"
2510 );
2511 let internal = context
2512 .cache
2513 .service
2514 .get_portfolio_balance(&wallet)
2515 .await
2516 .expect("wallet should still exist");
2517 assert!(
2518 !internal.positions.contains_key(symbol),
2519 "settled ghost symbol should be removed from in-memory portfolio"
2520 );
2521 }
2522
2523 #[tokio::test]
2524 async fn test_get_portfolio_fail_closed_pm_keeps_fail_closed_without_settlement_proof() {
2525 let context = setup_test_cache().await;
2526 let wallet = test_wallet(34);
2527 let symbol = "BTC-20260403-34000-C";
2528
2529 context
2530 .cache
2531 .service
2532 .restore(
2533 &wallet,
2534 PortfolioBalance {
2535 positions: HashMap::from([(
2536 symbol.to_string(),
2537 crate::portfolio::PositionData {
2538 symbol: symbol.to_string(),
2539 amount: dec!(1),
2540 entry_price: dec!(1500),
2541 margin_posted: dec!(0),
2542 realized_pnl: dec!(0),
2543 unrealized_pnl: dec!(0),
2544 },
2545 )]),
2546 total_margin_used: dec!(0),
2547 },
2548 )
2549 .await
2550 .expect("restore should succeed");
2551
2552 let err = context
2553 .cache
2554 .get_portfolio_fail_closed_pm(&wallet)
2555 .await
2556 .expect_err("PM portfolio should stay fail-closed without settlement proof");
2557 assert!(
2558 err.to_string().contains("PM portfolio read unavailable"),
2559 "unexpected error: {}",
2560 err
2561 );
2562 }
2563
2564 #[tokio::test]
2565 async fn test_compute_wallet_margin_snapshot_repairs_applied_settlement_ghost_position() {
2566 let context = setup_test_cache().await;
2567 let wallet = test_wallet(35);
2568 let symbol = "BTC-20260403-34000-C";
2569
2570 context
2571 .cache
2572 .service
2573 .restore(
2574 &wallet,
2575 PortfolioBalance {
2576 positions: HashMap::from([(
2577 symbol.to_string(),
2578 crate::portfolio::PositionData {
2579 symbol: symbol.to_string(),
2580 amount: dec!(1),
2581 entry_price: dec!(1500),
2582 margin_posted: dec!(0),
2583 realized_pnl: dec!(0),
2584 unrealized_pnl: dec!(0),
2585 },
2586 )]),
2587 total_margin_used: dec!(0),
2588 },
2589 )
2590 .await
2591 .expect("restore should succeed");
2592 insert_applied_settlement(
2593 &context,
2594 wallet,
2595 symbol,
2596 dec!(1),
2597 dec!(0),
2598 dec!(0),
2599 dec!(1000),
2600 )
2601 .await;
2602 let _tier_cache = attach_test_pm_margin_dependencies(&context, wallet).await;
2603
2604 let snapshot = context
2605 .cache
2606 .compute_wallet_margin_snapshot(&wallet)
2607 .await
2608 .expect("PM margin snapshot should self-heal settled ghost positions");
2609
2610 assert_eq!(snapshot.mode, MarginMode::Portfolio);
2611 assert_eq!(snapshot.total_margin_used, dec!(0));
2612 assert_eq!(snapshot.available_balance, dec!(0));
2613 let internal = context
2614 .cache
2615 .service
2616 .get_portfolio_balance(&wallet)
2617 .await
2618 .expect("wallet should still exist");
2619 assert!(
2620 !internal.positions.contains_key(symbol),
2621 "settled ghost symbol should be removed before PM margin snapshot"
2622 );
2623 }
2624
2625 #[tokio::test]
2626 async fn test_portfolio_initialization() {
2627 let context = setup_test_cache().await;
2628 let cache = context.cache.clone();
2629
2630 let next_command_id = cache.initialize().await.unwrap();
2632 assert_eq!(next_command_id, 1);
2633
2634 let portfolios = cache.service.all_portfolios().await;
2636 assert_eq!(portfolios.len(), 0);
2637 }
2638
2639 #[tokio::test(flavor = "multi_thread")]
2640 async fn test_snapshot_and_recovery() {
2641 let context = setup_test_cache().await;
2642 let cache = context.cache.clone();
2643
2644 let fill1 = Fill {
2646 trade_id: 1,
2647 taker_order_id: 100,
2648 maker_order_id: 101,
2649 symbol: "BTC-CALL-100000".to_string(),
2650 price: dec!(1000),
2651 size: to_contract_units_decimal("BTC-CALL-100000", dec!(10)),
2652 taker_side: Side::Buy,
2653 taker_wallet_address: test_wallet(1),
2654 maker_wallet_address: test_wallet(200),
2655 fee: dec!(0),
2656 is_taker: true,
2657 timestamp: 0,
2658 builder_code_address: None,
2659 builder_code_fee: None,
2660 source: Default::default(),
2661 taker_realized_pnl: None,
2662 maker_realized_pnl: None,
2663 underlying_notional: None,
2664 };
2665
2666 let fill2 = Fill {
2667 trade_id: 2,
2668 taker_order_id: 102,
2669 maker_order_id: 103,
2670 symbol: "ETH-PUT-3000".to_string(),
2671 price: dec!(50),
2672 size: to_contract_units_decimal("ETH-PUT-3000", dec!(5)),
2673 taker_side: Side::Sell,
2674 taker_wallet_address: test_wallet(2),
2675 maker_wallet_address: test_wallet(200),
2676 fee: dec!(0),
2677 is_taker: true,
2678 timestamp: 0,
2679 builder_code_address: None,
2680 builder_code_fee: None,
2681 source: Default::default(),
2682 taker_realized_pnl: None,
2683 maker_realized_pnl: None,
2684 underlying_notional: None,
2685 };
2686
2687 insert_trade_for_fill(&context, &fill1).await;
2688 cache
2689 .handle_engine_message(
2690 EngineMessage::OrderFilled {
2691 accounting: hypercall_engine::FillAccounting::from_fill(&fill1),
2692 fill: fill1,
2693 },
2694 1,
2695 )
2696 .await;
2697 insert_trade_for_fill(&context, &fill2).await;
2698 cache
2699 .handle_engine_message(
2700 EngineMessage::OrderFilled {
2701 accounting: hypercall_engine::FillAccounting::from_fill(&fill2),
2702 fill: fill2,
2703 },
2704 2,
2705 )
2706 .await;
2707
2708 assert_eq!(
2710 cache.get_last_processed_seq().await,
2711 2,
2712 "Seq should be 2 after processing 2 fills"
2713 );
2714
2715 let get_offsets = move || {
2717 let mut offsets = HashMap::new();
2718 offsets.insert(
2719 ENGINE_COMMAND_SNAPSHOT_STREAM.to_string(),
2720 HashMap::from([(0, 1)]),
2721 );
2722 Ok(offsets)
2723 };
2724 let cache_for_capture = cache.clone();
2726 let capture_snapshot = move || {
2727 tokio::task::block_in_place(|| {
2728 tokio::runtime::Handle::current().block_on(async {
2729 let (_seq, portfolios) = cache_for_capture.capture_snapshot_state().await;
2730 let mut offsets = HashMap::new();
2731 offsets.insert(
2732 ENGINE_COMMAND_SNAPSHOT_STREAM.to_string(),
2733 HashMap::from([(0, 1)]),
2734 );
2735 Ok((portfolios, offsets))
2736 })
2737 })
2738 };
2739
2740 let writer =
2741 DbPortfolioSnapshotWriter::new(cache.db.clone(), cache.get_service(), get_offsets)
2742 .with_capture_snapshot(capture_snapshot);
2743 writer.take_snapshot().unwrap();
2744
2745 let cache2 = Arc::new(PortfolioCache::new(cache.db.clone()));
2747 let next_command_id = cache2.initialize().await.unwrap();
2748 assert_eq!(next_command_id, 1);
2749
2750 let portfolio1 = cache2.get_portfolio(&test_wallet(1)).await.unwrap();
2752 assert_eq!(portfolio1.positions.len(), 1);
2753 assert!(cache2.has_portfolio(&test_wallet(1)).await);
2754
2755 let portfolio2 = cache2.get_portfolio(&test_wallet(2)).await.unwrap();
2756 assert_eq!(portfolio2.positions.len(), 1);
2757 assert!(cache2.has_portfolio(&test_wallet(2)).await);
2758 }
2759
2760 #[tokio::test(flavor = "multi_thread")]
2761 async fn test_replay_journal_fill_applies_multiple_fills_from_one_command() {
2762 let context = setup_test_cache().await;
2763 let cache = context.cache.clone();
2764 let symbol = "BTC-CALL-100000".to_string();
2765 let taker_wallet = test_wallet(1);
2766 let maker_wallet = test_wallet(200);
2767
2768 let fill1 = Fill {
2769 trade_id: 10,
2770 taker_order_id: 100,
2771 maker_order_id: 101,
2772 symbol: symbol.clone(),
2773 price: dec!(1000),
2774 size: to_contract_units_decimal(&symbol, dec!(0.1)),
2775 taker_side: Side::Buy,
2776 taker_wallet_address: taker_wallet,
2777 maker_wallet_address: maker_wallet,
2778 fee: dec!(0),
2779 is_taker: true,
2780 timestamp: 0,
2781 builder_code_address: None,
2782 builder_code_fee: None,
2783 source: Default::default(),
2784 taker_realized_pnl: None,
2785 maker_realized_pnl: None,
2786 underlying_notional: None,
2787 };
2788 let fill2 = Fill {
2789 trade_id: 11,
2790 taker_order_id: 100,
2791 maker_order_id: 102,
2792 symbol: symbol.clone(),
2793 price: dec!(1000),
2794 size: to_contract_units_decimal(&symbol, dec!(0.9)),
2795 taker_side: Side::Buy,
2796 taker_wallet_address: taker_wallet,
2797 maker_wallet_address: maker_wallet,
2798 fee: dec!(0),
2799 is_taker: true,
2800 timestamp: 0,
2801 builder_code_address: None,
2802 builder_code_fee: None,
2803 source: Default::default(),
2804 taker_realized_pnl: None,
2805 maker_realized_pnl: None,
2806 underlying_notional: None,
2807 };
2808
2809 insert_trade_for_fill(&context, &fill1).await;
2810 insert_trade_for_fill(&context, &fill2).await;
2811
2812 cache.replay_journal_fill(&fill1, 42).await;
2813 cache.replay_journal_fill(&fill2, 42).await;
2814
2815 let balance = cache
2816 .service
2817 .get_portfolio_balance(&taker_wallet)
2818 .await
2819 .expect("taker portfolio should exist after replay");
2820 assert_eq!(balance.positions.get(&symbol).unwrap().amount, dec!(1.0));
2821 }
2822
2823 #[tokio::test(flavor = "multi_thread")]
2824 async fn test_snapshot_recovery_repairs_applied_settlement_ghost_position_on_read() {
2825 let context = setup_test_cache().await;
2826 let wallet = test_wallet(36);
2827 let symbol = "BTC-20260403-34000-C";
2828
2829 context
2830 .cache
2831 .service
2832 .restore(
2833 &wallet,
2834 PortfolioBalance {
2835 positions: HashMap::from([(
2836 symbol.to_string(),
2837 crate::portfolio::PositionData {
2838 symbol: symbol.to_string(),
2839 amount: dec!(1),
2840 entry_price: dec!(1500),
2841 margin_posted: dec!(0),
2842 realized_pnl: dec!(0),
2843 unrealized_pnl: dec!(0),
2844 },
2845 )]),
2846 total_margin_used: dec!(0),
2847 },
2848 )
2849 .await
2850 .expect("restore should succeed");
2851 insert_applied_settlement(
2852 &context,
2853 wallet,
2854 symbol,
2855 dec!(1),
2856 dec!(0),
2857 dec!(0),
2858 dec!(1000),
2859 )
2860 .await;
2861
2862 let cache = context.cache.clone();
2863 let get_offsets = move || {
2864 let mut offsets = HashMap::new();
2865 offsets.insert(
2866 ENGINE_COMMAND_SNAPSHOT_STREAM.to_string(),
2867 HashMap::from([(0, 1)]),
2868 );
2869 Ok(offsets)
2870 };
2871 let cache_for_capture = cache.clone();
2872 let capture_snapshot = move || {
2873 tokio::task::block_in_place(|| {
2874 tokio::runtime::Handle::current().block_on(async {
2875 let (_seq, portfolios) = cache_for_capture.capture_snapshot_state().await;
2876 let mut offsets = HashMap::new();
2877 offsets.insert(
2878 ENGINE_COMMAND_SNAPSHOT_STREAM.to_string(),
2879 HashMap::from([(0, 1)]),
2880 );
2881 Ok((portfolios, offsets))
2882 })
2883 })
2884 };
2885
2886 let writer =
2887 DbPortfolioSnapshotWriter::new(cache.db.clone(), cache.get_service(), get_offsets)
2888 .with_capture_snapshot(capture_snapshot);
2889 writer.take_snapshot().expect("snapshot should succeed");
2890
2891 let cache2 = Arc::new(PortfolioCache::new(cache.db.clone()));
2892 let next_command_id = cache2
2893 .initialize()
2894 .await
2895 .expect("initialize should succeed");
2896 assert_eq!(next_command_id, 1);
2897
2898 let portfolio = cache2
2899 .get_portfolio(&wallet)
2900 .await
2901 .expect("portfolio read should self-heal settled ghost position after restore");
2902 assert!(
2903 portfolio
2904 .positions
2905 .iter()
2906 .all(|position| position.position.symbol != symbol),
2907 "settled ghost symbol should be removed after snapshot recovery"
2908 );
2909 }
2910
2911 #[tokio::test]
2912 async fn test_portfolio_subscription() {
2913 let context = setup_test_cache().await;
2914 let cache = context.cache.clone();
2915
2916 let (_subscriber_id, mut rx) = cache.subscribe(test_wallet(1)).await;
2918
2919 let update = rx.recv().await.unwrap();
2921 match update {
2922 PortfolioUpdate::Initial { positions, .. } => {
2923 assert_eq!(positions.len(), 0);
2924 }
2925 _ => panic!("Expected Initial update"),
2926 }
2927
2928 use hypercall_types::Fill;
2930
2931 let fill = Fill {
2932 trade_id: 1,
2933 taker_order_id: 100,
2934 maker_order_id: 101,
2935 symbol: "BTC-CALL-100000".to_string(),
2936 price: dec!(1000),
2937 size: to_contract_units_decimal("BTC-CALL-100000", dec!(10)),
2938 taker_side: Side::Buy,
2939 taker_wallet_address: test_wallet(1),
2940 maker_wallet_address: test_wallet(200),
2941 fee: dec!(0),
2942 is_taker: true,
2943 timestamp: 0,
2944 builder_code_address: None,
2945 builder_code_fee: None,
2946 source: Default::default(),
2947 taker_realized_pnl: None,
2948 maker_realized_pnl: None,
2949 underlying_notional: None,
2950 };
2951
2952 insert_trade_for_fill(&context, &fill).await;
2953 cache
2954 .handle_engine_message(
2955 EngineMessage::OrderFilled {
2956 accounting: hypercall_engine::FillAccounting::from_fill(&fill),
2957 fill,
2958 },
2959 1,
2960 )
2961 .await;
2962
2963 let update = rx.recv().await.unwrap();
2965 match update {
2966 PortfolioUpdate::PositionUpdate { position, .. } => {
2967 assert_eq!(position.position.symbol, "BTC-CALL-100000");
2968 assert_eq!(position.position.amount, dec!(10));
2969 assert_eq!(position.position.realized_pnl, dec!(0));
2970 assert_eq!(position.position.unrealized_pnl, dec!(0));
2971 }
2972 _ => panic!("Expected PositionUpdate"),
2973 }
2974
2975 let update = rx.recv().await.unwrap();
2977 match update {
2978 PortfolioUpdate::BalanceUpdate {
2979 total_margin_used, ..
2980 } => {
2981 let _ = total_margin_used;
2983 }
2984 _ => panic!("Expected BalanceUpdate"),
2985 }
2986 }
2987
2988 #[tokio::test]
2989 async fn test_position_expired_notifies_subscribers_after_projection_update() {
2990 let context = setup_test_cache().await;
2991 let cache = context.cache.clone();
2992 let wallet = test_wallet(1);
2993 let symbol = "BTC-20261231-100000-C".to_string();
2994
2995 let (_subscriber_id, mut rx) = cache.subscribe(wallet).await;
2996 let initial = rx.recv().await.unwrap();
2997 assert!(matches!(initial, PortfolioUpdate::Initial { .. }));
2998
2999 let fill = Fill {
3000 trade_id: 1,
3001 taker_order_id: 100,
3002 maker_order_id: 101,
3003 symbol: symbol.clone(),
3004 price: dec!(1000),
3005 size: to_contract_units_decimal(&symbol, dec!(10)),
3006 taker_side: Side::Buy,
3007 taker_wallet_address: wallet,
3008 maker_wallet_address: test_wallet(200),
3009 fee: dec!(0),
3010 is_taker: true,
3011 timestamp: 0,
3012 builder_code_address: None,
3013 builder_code_fee: None,
3014 source: hypercall_types::FillSource::Orderbook,
3015 taker_realized_pnl: None,
3016 maker_realized_pnl: None,
3017 underlying_notional: None,
3018 };
3019
3020 insert_trade_for_fill(&context, &fill).await;
3021 cache
3022 .handle_engine_message(
3023 EngineMessage::OrderFilled {
3024 accounting: hypercall_engine::FillAccounting::from_fill(&fill),
3025 fill,
3026 },
3027 1,
3028 )
3029 .await;
3030
3031 let _ = rx.recv().await.unwrap();
3032 let _ = rx.recv().await.unwrap();
3033
3034 insert_applied_settlement(
3035 &context,
3036 wallet,
3037 &symbol,
3038 dec!(10),
3039 dec!(1000),
3040 dec!(10000),
3041 dec!(0),
3042 )
3043 .await;
3044 cache
3045 .handle_engine_message(
3046 EngineMessage::PositionExpired(PositionExpiredMessage {
3047 wallet_address: wallet,
3048 margin_mode: MarginMode::Standard,
3049 symbol: symbol.clone(),
3050 position_size: dec!(10),
3051 settlement_price: dec!(1000),
3052 settlement_value: dec!(10000),
3053 settlement_entry_price: Some(dec!(1000)),
3054 cost_basis: Some(dec!(10000)),
3055 net_pnl: Some(dec!(0)),
3056 timestamp: 2,
3057 }),
3058 2,
3059 )
3060 .await;
3061
3062 match rx.recv().await.unwrap() {
3063 PortfolioUpdate::PositionUpdate { position, .. } => {
3064 assert_eq!(position.position.symbol, symbol);
3065 assert_eq!(position.position.amount, dec!(0));
3066 }
3067 other => panic!("Expected PositionUpdate after expiry, got {:?}", other),
3068 }
3069
3070 match rx.recv().await.unwrap() {
3071 PortfolioUpdate::BalanceUpdate {
3072 total_margin_used, ..
3073 } => {
3074 assert_eq!(total_margin_used, dec!(0));
3075 }
3076 other => panic!("Expected BalanceUpdate after expiry, got {:?}", other),
3077 }
3078 }
3079
3080 #[tokio::test]
3081 async fn test_subscription_receives_greeks_snapshot_update() {
3082 let context = setup_test_cache().await;
3083 let cache = context.cache.clone();
3084 attach_test_greeks_cache(&context, "BTC-20261231-50000-C").await;
3085
3086 let (_subscriber_id, mut rx) = cache.subscribe(test_wallet(1)).await;
3087 let initial = timeout(Duration::from_secs(2), rx.recv())
3088 .await
3089 .expect("timed out waiting for initial update")
3090 .expect("channel closed");
3091 assert!(matches!(initial, PortfolioUpdate::Initial { .. }));
3092
3093 let greeks_update = timeout(Duration::from_secs(2), rx.recv())
3094 .await
3095 .expect("timed out waiting for greeks snapshot")
3096 .expect("channel closed");
3097 match greeks_update {
3098 PortfolioUpdate::GreeksUpdate {
3099 per_leg, aggregate, ..
3100 } => {
3101 assert!(per_leg.is_empty(), "expected empty per-leg greeks");
3102 assert!(aggregate.is_none(), "expected null aggregate greeks");
3103 }
3104 other => panic!("expected GreeksUpdate, got {:?}", other),
3105 }
3106 }
3107
3108 #[tokio::test]
3109 async fn test_fill_triggers_greeks_update() {
3110 let context = setup_test_cache().await;
3111 let cache = context.cache.clone();
3112 let symbol = "BTC-20261231-50000-C";
3113 attach_test_greeks_cache(&context, symbol).await;
3114
3115 let (_subscriber_id, mut rx) = cache.subscribe(test_wallet(1)).await;
3116 let _ = timeout(Duration::from_secs(2), rx.recv())
3117 .await
3118 .expect("timed out waiting for initial update")
3119 .expect("channel closed");
3120 let _ = timeout(Duration::from_secs(2), rx.recv())
3121 .await
3122 .expect("timed out waiting for greeks snapshot")
3123 .expect("channel closed");
3124
3125 use hypercall_types::Fill;
3126
3127 let fill = Fill {
3128 trade_id: 1001,
3129 taker_order_id: 100,
3130 maker_order_id: 101,
3131 symbol: symbol.to_string(),
3132 price: dec!(1000),
3133 size: to_contract_units_decimal(symbol, dec!(2)),
3134 taker_side: Side::Buy,
3135 taker_wallet_address: test_wallet(1),
3136 maker_wallet_address: test_wallet(200),
3137 fee: dec!(0),
3138 is_taker: true,
3139 timestamp: 0,
3140 builder_code_address: None,
3141 builder_code_fee: None,
3142 source: Default::default(),
3143 taker_realized_pnl: None,
3144 maker_realized_pnl: None,
3145 underlying_notional: None,
3146 };
3147
3148 insert_trade_for_fill(&context, &fill).await;
3149 cache
3150 .handle_engine_message(
3151 EngineMessage::OrderFilled {
3152 accounting: hypercall_engine::FillAccounting::from_fill(&fill),
3153 fill,
3154 },
3155 10,
3156 )
3157 .await;
3158
3159 let mut saw_position_update = false;
3160 let mut saw_greeks = false;
3161 for _ in 0..10 {
3162 let update = timeout(Duration::from_secs(5), rx.recv())
3163 .await
3164 .expect("timed out waiting for update")
3165 .expect("channel closed");
3166 match update {
3167 PortfolioUpdate::PositionUpdate { position, .. } => {
3168 assert_eq!(position.position.symbol, symbol);
3169 assert!(
3170 position.position.unrealized_pnl != dec!(0),
3171 "option UPNL should be repriced from theoretical mark"
3172 );
3173 saw_position_update = true;
3174 }
3175 PortfolioUpdate::GreeksUpdate {
3176 per_leg, aggregate, ..
3177 } => {
3178 assert!(!per_leg.is_empty(), "expected non-empty per-leg greeks");
3179 assert_eq!(per_leg[0].symbol, symbol);
3180 assert!(aggregate.is_some(), "expected aggregate greeks");
3181 saw_greeks = true;
3182 if saw_position_update {
3183 break;
3184 }
3185 }
3186 _ => {}
3187 }
3188 }
3189
3190 assert!(
3191 saw_position_update,
3192 "expected a PositionUpdate with theoretical repriced UPNL after fill"
3193 );
3194 assert!(saw_greeks, "expected a GreeksUpdate after fill");
3195 }
3196
3197 #[tokio::test]
3198 async fn test_get_portfolio_succeeds_when_theoretical_mark_missing() {
3199 let context = setup_test_cache().await;
3200 let cache = context.cache.clone();
3201 attach_test_greeks_cache(&context, "BTC-20261231-50000-C").await;
3202
3203 let symbol_missing_mark = "BTC-20261231-51000-C";
3204 let fill = Fill {
3205 trade_id: 2001,
3206 taker_order_id: 100,
3207 maker_order_id: 101,
3208 symbol: symbol_missing_mark.to_string(),
3209 price: dec!(1000),
3210 size: to_contract_units_decimal(symbol_missing_mark, dec!(1)),
3211 taker_side: Side::Buy,
3212 taker_wallet_address: test_wallet(1),
3213 maker_wallet_address: test_wallet(200),
3214 fee: dec!(0),
3215 is_taker: true,
3216 timestamp: 0,
3217 builder_code_address: None,
3218 builder_code_fee: None,
3219 source: Default::default(),
3220 taker_realized_pnl: None,
3221 maker_realized_pnl: None,
3222 underlying_notional: None,
3223 };
3224
3225 insert_trade_for_fill(&context, &fill).await;
3226 cache
3227 .handle_engine_message(
3228 EngineMessage::OrderFilled {
3229 accounting: hypercall_engine::FillAccounting::from_fill(&fill),
3230 fill,
3231 },
3232 11,
3233 )
3234 .await;
3235
3236 let portfolio = cache
3238 .get_portfolio(&test_wallet(1))
3239 .await
3240 .expect("portfolio should succeed even with missing theoretical marks");
3241 assert!(!portfolio.positions.is_empty());
3242 }
3243
3244 #[tokio::test]
3245 async fn test_get_portfolio_reprices_option_upnl_with_theoretical_mark() {
3246 let context = setup_test_cache().await;
3247 let cache = context.cache.clone();
3248 let symbol = "BTC-20261231-50000-C";
3249 attach_test_greeks_cache(&context, symbol).await;
3250
3251 let fill = Fill {
3252 trade_id: 2002,
3253 taker_order_id: 100,
3254 maker_order_id: 101,
3255 symbol: symbol.to_string(),
3256 price: dec!(1000),
3257 size: to_contract_units_decimal(symbol, dec!(1)),
3258 taker_side: Side::Buy,
3259 taker_wallet_address: test_wallet(1),
3260 maker_wallet_address: test_wallet(200),
3261 fee: dec!(0),
3262 is_taker: true,
3263 timestamp: 0,
3264 builder_code_address: None,
3265 builder_code_fee: None,
3266 source: Default::default(),
3267 taker_realized_pnl: None,
3268 maker_realized_pnl: None,
3269 underlying_notional: None,
3270 };
3271
3272 insert_trade_for_fill(&context, &fill).await;
3273 cache
3274 .handle_engine_message(
3275 EngineMessage::OrderFilled {
3276 accounting: hypercall_engine::FillAccounting::from_fill(&fill),
3277 fill,
3278 },
3279 12,
3280 )
3281 .await;
3282
3283 let portfolio = cache
3284 .get_portfolio(&test_wallet(1))
3285 .await
3286 .expect("portfolio fetch should succeed");
3287 let position = portfolio
3288 .positions
3289 .iter()
3290 .find(|p| p.position.symbol == symbol)
3291 .expect("position should exist");
3292 assert!(
3293 position.position.unrealized_pnl != dec!(0),
3294 "REST portfolio should expose theoretical repriced UPNL"
3295 );
3296 }
3297
3298 #[tokio::test]
3299 async fn test_multiple_subscribers() {
3300 let context = setup_test_cache().await;
3301 let cache = context.cache.clone();
3302
3303 let (_subscriber_id_1, mut rx1) = cache.subscribe(test_wallet(1)).await;
3305 let (_subscriber_id_2, mut rx2) = cache.subscribe(test_wallet(1)).await;
3306
3307 let _ = rx1.recv().await.unwrap();
3309 let _ = rx2.recv().await.unwrap();
3310
3311 use hypercall_types::Fill;
3313
3314 let fill = Fill {
3315 trade_id: 2,
3316 taker_order_id: 200,
3317 maker_order_id: 201,
3318 symbol: "BTC-CALL-100000".to_string(),
3319 price: dec!(1000),
3320 size: to_contract_units_decimal("BTC-CALL-100000", dec!(10)),
3321 taker_side: Side::Buy,
3322 taker_wallet_address: test_wallet(1),
3323 maker_wallet_address: test_wallet(200),
3324 fee: dec!(0),
3325 is_taker: true,
3326 timestamp: 0,
3327 builder_code_address: None,
3328 builder_code_fee: None,
3329 source: Default::default(),
3330 taker_realized_pnl: None,
3331 maker_realized_pnl: None,
3332 underlying_notional: None,
3333 };
3334
3335 insert_trade_for_fill(&context, &fill).await;
3336 cache
3337 .handle_engine_message(
3338 EngineMessage::OrderFilled {
3339 accounting: hypercall_engine::FillAccounting::from_fill(&fill),
3340 fill,
3341 },
3342 2,
3343 )
3344 .await;
3345
3346 let update1 = rx1.recv().await.unwrap();
3348 let update2 = rx2.recv().await.unwrap();
3349
3350 match (update1, update2) {
3351 (
3352 PortfolioUpdate::PositionUpdate { position: pos1, .. },
3353 PortfolioUpdate::PositionUpdate { position: pos2, .. },
3354 ) => {
3355 assert_eq!(pos1.position.realized_pnl, dec!(0));
3356 assert_eq!(pos1.position.unrealized_pnl, dec!(0));
3357 assert_eq!(pos2.position.realized_pnl, dec!(0));
3358 assert_eq!(pos2.position.unrealized_pnl, dec!(0));
3359 }
3360 _ => panic!("Expected both subscribers to receive PositionUpdate"),
3361 }
3362 }
3363
3364 #[tokio::test]
3365 async fn test_seq_updated_after_apply() {
3366 let context = setup_test_cache().await;
3367 let cache = context.cache.clone();
3368
3369 assert_eq!(cache.get_last_processed_seq().await, 0);
3371
3372 use hypercall_types::Fill;
3374
3375 let fill = Fill {
3376 trade_id: 42,
3377 taker_order_id: 100,
3378 maker_order_id: 101,
3379 symbol: "BTC-CALL-100000".to_string(),
3380 price: dec!(1000),
3381 size: to_contract_units_decimal("BTC-CALL-100000", dec!(10)),
3382 taker_side: Side::Buy,
3383 taker_wallet_address: test_wallet(1),
3384 maker_wallet_address: test_wallet(200),
3385 fee: dec!(0),
3386 is_taker: true,
3387 timestamp: 0,
3388 builder_code_address: None,
3389 builder_code_fee: None,
3390 source: Default::default(),
3391 taker_realized_pnl: None,
3392 maker_realized_pnl: None,
3393 underlying_notional: None,
3394 };
3395
3396 insert_trade_for_fill(&context, &fill).await;
3397 cache
3398 .handle_engine_message(
3399 EngineMessage::OrderFilled {
3400 accounting: hypercall_engine::FillAccounting::from_fill(&fill),
3401 fill,
3402 },
3403 42,
3404 )
3405 .await;
3406
3407 assert_eq!(cache.get_last_processed_seq().await, 42);
3409
3410 let portfolio = cache.get_portfolio(&test_wallet(1)).await.unwrap();
3412 assert_eq!(portfolio.positions.len(), 1);
3413 assert_eq!(portfolio.positions[0].position.amount, dec!(10));
3414 }
3415
3416 #[tokio::test]
3417 async fn test_capture_snapshot_state_is_consistent() {
3418 let context = setup_test_cache().await;
3419 let cache = context.cache.clone();
3420
3421 use hypercall_types::Fill;
3423
3424 let fill1 = Fill {
3425 trade_id: 1,
3426 taker_order_id: 100,
3427 maker_order_id: 101,
3428 symbol: "BTC-CALL-100000".to_string(),
3429 price: dec!(1000),
3430 size: to_contract_units_decimal("BTC-CALL-100000", dec!(10)),
3431 taker_side: Side::Buy,
3432 taker_wallet_address: test_wallet(1),
3433 maker_wallet_address: test_wallet(200),
3434 fee: dec!(0),
3435 is_taker: true,
3436 timestamp: 0,
3437 builder_code_address: None,
3438 builder_code_fee: None,
3439 source: Default::default(),
3440 taker_realized_pnl: None,
3441 maker_realized_pnl: None,
3442 underlying_notional: None,
3443 };
3444
3445 insert_trade_for_fill(&context, &fill1).await;
3446 cache
3447 .handle_engine_message(
3448 EngineMessage::OrderFilled {
3449 accounting: hypercall_engine::FillAccounting::from_fill(&fill1),
3450 fill: fill1,
3451 },
3452 1,
3453 )
3454 .await;
3455
3456 let (seq, portfolios) = cache.capture_snapshot_state().await;
3458
3459 assert_eq!(seq, 1);
3461 assert!(portfolios.contains_key(&test_wallet(1)));
3462 let balance = portfolios.get(&test_wallet(1)).unwrap();
3463 assert!(balance.positions.contains_key("BTC-CALL-100000"));
3464 assert_eq!(
3465 balance.positions.get("BTC-CALL-100000").unwrap().amount,
3466 dec!(10)
3467 );
3468 }
3469
3470 #[test]
3471 fn test_normalize_test_database_url_repairs_duplicate_query_separator() {
3472 let normalized = normalize_test_database_url(
3473 "postgresql://user:pass@localhost:5432/test_db?sslmode=disable?sslmode=require",
3474 Some("isolated_db"),
3475 );
3476
3477 assert_eq!(
3478 normalized,
3479 "postgresql://user:pass@localhost:5432/isolated_db?sslmode=disable"
3480 );
3481 }
3482
3483 #[test]
3484 fn test_normalize_test_database_url_preserves_authority_verbatim() {
3485 let normalized = normalize_test_database_url(
3486 "postgresql://user:pa%40ss@shared-db.internal:5432/test_db?sslmode=require",
3487 Some("isolated_db"),
3488 );
3489
3490 assert_eq!(
3491 normalized,
3492 "postgresql://user:pa%40ss@shared-db.internal:5432/isolated_db?sslmode=require"
3493 );
3494 }
3495
3496 #[tokio::test]
3497 async fn test_publish_margin_updates_for_subscribers_counts_unique_wallets() {
3498 let context = setup_test_cache().await;
3499 let cache = context.cache.clone();
3500
3501 let (_subscriber_id_a1, mut rx_a1) = cache.subscribe(test_wallet(1)).await;
3503 let (_subscriber_id_a2, mut rx_a2) = cache.subscribe(test_wallet(1)).await;
3504 let (_subscriber_id_b1, mut rx_b1) = cache.subscribe(test_wallet(2)).await;
3506
3507 let _ = rx_a1.recv().await.unwrap();
3509 let _ = rx_a2.recv().await.unwrap();
3510 let _ = rx_b1.recv().await.unwrap();
3511
3512 let attempted = cache.publish_margin_updates_for_subscribers().await;
3513 assert_eq!(attempted, 2, "should attempt unique wallets only");
3514 }
3515
3516 #[tokio::test]
3517 async fn test_publish_margin_updates_for_subscribers_empty() {
3518 let context = setup_test_cache().await;
3519 let cache = context.cache.clone();
3520
3521 let attempted = cache.publish_margin_updates_for_subscribers().await;
3522 assert_eq!(attempted, 0, "no subscribers should attempt zero wallets");
3523 }
3524
3525 #[tokio::test]
3526 async fn test_dropped_subscriber_is_pruned_on_notify() {
3527 let context = setup_test_cache().await;
3528 let cache = context.cache.clone();
3529 let wallet = test_wallet(1);
3530
3531 let (_subscriber_id, mut rx) = cache.subscribe(wallet).await;
3532 let _ = rx.recv().await.unwrap();
3533 drop(rx);
3534
3535 cache
3536 .notify_subscribers(
3537 &wallet,
3538 PortfolioUpdate::BalanceUpdate {
3539 total_margin_used: dec!(0),
3540 timestamp: chrono::Utc::now().timestamp(),
3541 },
3542 )
3543 .await;
3544
3545 let subscribers = cache.subscribers.read().await;
3546 assert!(
3547 subscribers.get(&wallet).is_none(),
3548 "dropped subscriber should be removed after send failure"
3549 );
3550 }
3551}