1use crate::portfolio::PortfolioService;
9use crate::price_oracle::hyperliquid_oracle::HyperliquidMarkPriceOracle;
10use crate::read_cache::greeks::GreeksCache;
11use crate::read_cache::portfolio::PortfolioCache;
12use crate::rsm::engine_state_snapshot::EngineStateSnapshot;
13use crate::rsm::ledger::BalanceLedger;
14use crate::shared::order_types::ParsedSymbol;
15use crate::standard_margin::StandardAccountBuilder;
16use crate::types::Config;
17use hypercall_db_diesel::DatabaseHandler;
18use hypercall_engine::order_index::EngineOrderIndex;
19use hypercall_engine::OrderBook;
20use hypercall_types::TradingModes;
21use hypercall_types::WalletAddress;
22use rust_decimal::prelude::ToPrimitive;
23use rust_decimal::Decimal;
24use serde::{Deserialize, Serialize};
25use std::collections::{BTreeSet, HashMap};
26use std::sync::atomic::{AtomicI64, Ordering};
27use std::sync::Arc;
28use tracing::{info, warn};
29
30pub use hypercall_engine::mmp::{EngineMmpState, MmpFillRecord};
32pub use hypercall_engine::position::EnginePosition;
33
34#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
35pub struct DepositUpdateWatermark {
36 #[serde(default)]
37 pub sequence: Option<u64>,
38 pub timestamp_ms: u64,
39 pub balance_after: Decimal,
40 #[serde(default)]
41 pub source: BalanceLedgerMutationSource,
42}
43
44#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
45pub enum BalanceLedgerMutationSource {
46 #[default]
47 DepositUpdate,
48 CashWithdrawal,
49}
50
51pub struct EngineDeps {
56 pub mmp_cache: Option<Arc<crate::read_cache::mmp::MmpCache>>,
58
59 pub tier_cache: Option<Arc<crate::read_cache::tier::TierCache>>,
61
62 pub portfolio_service: Option<Arc<dyn PortfolioService + Send + Sync>>,
64
65 pub portfolio_cache: Option<Arc<PortfolioCache>>,
67
68 pub risk_account_builder:
70 Option<Arc<crate::rsm::portfolio_margin::risk_account_builder::RiskAccountBuilder>>,
71
72 pub standard_account_builder: Option<Arc<StandardAccountBuilder>>,
74
75 pub greeks_cache: Option<Arc<GreeksCache>>,
77
78 pub mark_price_oracles: HashMap<String, Arc<HyperliquidMarkPriceOracle>>,
80
81 pub reference_prices: HashMap<String, f64>,
83
84 pub liquidation_cache: Option<Arc<crate::liquidator::LiquidationCache>>,
86
87 pub config: Config,
89
90 pub portfolio_margin_pool_enabled: bool,
92
93 pub portfolio_margin_settlement_allowlist: BTreeSet<WalletAddress>,
95
96 pub event_sender: tokio::sync::mpsc::UnboundedSender<hypercall_types::EngineMessage>,
99
100 pub ws_event_sender: Option<tokio::sync::mpsc::UnboundedSender<hypercall_types::EngineMessage>>,
104
105 pub margin_timestamp_s: i64,
109
110 pub liquidation_states: HashMap<WalletAddress, hypercall_types::LiquidationStateType>,
112
113 pub wallet_margin_modes: HashMap<WalletAddress, crate::rsm::margin_mode::MarginMode>,
115
116 pub mmp_enabled: HashMap<(WalletAddress, String), bool>,
118
119 pub wallet_trading_limits: HashMap<WalletAddress, hypercall_types::api_models::TradingLimits>,
122
123 pub default_trading_limits: hypercall_types::api_models::TradingLimits,
125
126 pub wallet_tiers: HashMap<WalletAddress, String>,
129
130 pub perp_positions: HashMap<(String, String), crate::hypercore::PerpPosition>,
133
134 pub hypercore_account_equity: HashMap<WalletAddress, Decimal>,
139
140 pub hypercore_equity_timestamps: HashMap<WalletAddress, u64>,
144}
145
146impl EngineDeps {
147 pub fn emit_event(&self, event: &hypercall_types::EngineMessage) {
152 let _ = self.event_sender.send(event.clone());
153 if let Some(ref ws_tx) = self.ws_event_sender {
154 let _ = ws_tx.send(event.clone());
155 }
156 }
157}
158
159pub fn classify_rejection_reason(reason: &str) -> &'static str {
163 hypercall_engine::admission::classify_rejection_reason(reason)
164}
165
166pub fn engine_positions_to_portfolio_balance(
171 positions: &HashMap<(WalletAddress, String), EnginePosition>,
172 wallet: &WalletAddress,
173 reference_prices: &HashMap<String, f64>,
174) -> crate::portfolio::PortfolioBalance {
175 use crate::portfolio::{PortfolioBalance, PositionData};
176 use rust_decimal_macros::dec;
177
178 let mut pos_map = HashMap::new();
179 for ((w, symbol), pos) in positions {
180 if w != wallet {
181 continue;
182 }
183 let upnl = if symbol.ends_with("-PERP") {
184 let underlying = symbol.split('-').next().unwrap_or(symbol);
185 if let Some(&spot) = reference_prices.get(underlying) {
186 let spot_dec = rust_decimal::Decimal::try_from(spot).unwrap_or(dec!(0));
187 (spot_dec - pos.entry_price) * pos.quantity
188 } else {
189 dec!(0)
190 }
191 } else {
192 dec!(0)
193 };
194 pos_map.insert(
195 symbol.clone(),
196 PositionData {
197 symbol: symbol.clone(),
198 amount: pos.quantity,
199 entry_price: pos.entry_price,
200 margin_posted: dec!(0),
201 realized_pnl: dec!(0),
202 unrealized_pnl: upnl,
203 },
204 );
205 }
206 PortfolioBalance {
207 positions: pos_map,
208 total_margin_used: dec!(0),
209 }
210}
211
212pub fn apply_fill_to_positions(
217 positions: &mut HashMap<(WalletAddress, String), EnginePosition>,
218 wallet: WalletAddress,
219 symbol: String,
220 signed_qty: rust_decimal::Decimal,
221 fill_price: rust_decimal::Decimal,
222) {
223 use rust_decimal_macros::dec;
224
225 let key = (wallet, symbol);
226 let pos = positions.entry(key.clone()).or_insert(EnginePosition {
227 quantity: dec!(0),
228 entry_price: dec!(0),
229 });
230
231 pos.apply_fill(signed_qty, fill_price);
232
233 if pos.quantity == dec!(0) {
234 positions.remove(&key);
235 }
236}
237
238pub fn apply_option_deposit_to_positions(
246 positions: &mut HashMap<(WalletAddress, String), EnginePosition>,
247 wallet: WalletAddress,
248 symbol: String,
249 quantity: rust_decimal::Decimal,
250) {
251 use rust_decimal_macros::dec;
252
253 assert!(
254 quantity > dec!(0),
255 "RUNTIME_INVARIANT: option deposit quantity must be positive"
256 );
257
258 let key = (wallet, symbol);
259 let Some(pos) = positions.get_mut(&key) else {
260 positions.insert(
261 key,
262 EnginePosition {
263 quantity,
264 entry_price: dec!(0),
265 },
266 );
267 return;
268 };
269
270 let old_qty = pos.quantity;
271 let old_entry = pos.entry_price;
272 let new_qty = old_qty + quantity;
273
274 if new_qty == dec!(0) {
275 positions.remove(&key);
276 } else {
277 pos.quantity = new_qty;
278 pos.entry_price = if old_qty > dec!(0) {
279 old_entry
280 } else if new_qty < dec!(0) {
281 old_entry
282 } else {
283 dec!(0)
284 };
285 }
286}
287
#[cfg(test)]
mod tests {
    use super::*;
    use alloy::primitives::Address;
    use rust_decimal_macros::dec;

    /// Builds a deterministic wallet whose address bytes all equal `byte`.
    fn wallet(byte: u8) -> WalletAddress {
        Address::repeat_byte(byte).into()
    }

    /// Depositing into an existing long position must grow the quantity while
    /// leaving the recorded entry price untouched.
    #[test]
    fn option_deposit_preserves_existing_long_cost_basis() {
        let owner = wallet(1);
        let symbol = String::from("BTC-20260130-100000-C");
        let key = (owner, symbol.clone());

        let mut positions = HashMap::new();
        positions.insert(
            key.clone(),
            EnginePosition {
                quantity: dec!(1),
                entry_price: dec!(100),
            },
        );

        apply_option_deposit_to_positions(&mut positions, owner, symbol, dec!(1));

        let position = positions.get(&key).expect("position should remain open");
        assert_eq!(position.quantity, dec!(2));
        assert_eq!(position.entry_price, dec!(100));
    }
}
319
320pub fn apply_option_withdrawal_to_positions(
325 positions: &mut HashMap<(WalletAddress, String), EnginePosition>,
326 wallet: WalletAddress,
327 symbol: String,
328 quantity: rust_decimal::Decimal,
329) -> Result<(), String> {
330 use rust_decimal_macros::dec;
331
332 if quantity <= dec!(0) {
333 return Err("option withdrawal quantity must be positive".to_string());
334 }
335
336 let key = (wallet, symbol);
337 let Some(pos) = positions.get_mut(&key) else {
338 return Err("insufficient option balance for withdrawal".to_string());
339 };
340 if pos.quantity < quantity {
341 return Err(format!(
342 "insufficient option balance for withdrawal: have {}, need {}",
343 pos.quantity, quantity
344 ));
345 }
346
347 pos.quantity -= quantity;
348 if pos.quantity == dec!(0) {
349 positions.remove(&key);
350 }
351 Ok(())
352}
353
354pub struct EngineCtx {
359 pub orderbooks: HashMap<String, OrderBook>,
362
363 pub next_order_id: u64,
365
366 pub next_trade_id: u64,
368
369 pub l2_update_seq: Arc<AtomicI64>,
371
372 pub expired_instruments: HashMap<String, bool>,
375
376 pub spot_prices: HashMap<String, rust_decimal::Decimal>,
381
382 pub iv_surfaces: HashMap<String, crate::vol_oracle::vol_surface_cache::VolatilitySurface>,
385
386 pub iv_source_timestamps: HashMap<String, i64>,
389
390 pub instrument_trading_modes: HashMap<String, TradingModes>,
393
394 pub order_index: EngineOrderIndex,
396
397 pub engine_positions: HashMap<(WalletAddress, String), EnginePosition>,
403
404 pub balance_ledger: BalanceLedger,
409
410 pub deposit_update_watermarks: HashMap<WalletAddress, DepositUpdateWatermark>,
414
415 pub applied_deposit_source_event_hashes: BTreeSet<alloy::primitives::FixedBytes<32>>,
434
435 pub pm_settlement_state: crate::rsm::portfolio_margin::settlement_state::PmSettlementState,
440
441 pub mmp_state: HashMap<(WalletAddress, String), EngineMmpState>,
446
447 pub agent_authorizations: HashMap<WalletAddress, HashMap<WalletAddress, Option<u64>>>,
452
453 pub nonce_sets: HashMap<WalletAddress, hypercall_engine::BoundedNonceSet>,
460
461 pub rsm_signer_nonces: HashMap<WalletAddress, u64>,
465
466 pub deps: EngineDeps,
468
469 pub db: Option<DatabaseHandler>,
471}
472
473impl EngineCtx {
474 fn symbol_is_expired_by_wall_clock(symbol: &str) -> Option<bool> {
475 let parsed = match ParsedSymbol::from_symbol(symbol) {
476 Ok(parsed) => parsed,
477 Err(error) => {
478 warn!(
479 "Snapshot expired instrument {} has invalid symbol format: {}",
480 symbol, error
481 );
482 return None;
483 }
484 };
485 let expiry_ts =
486 hypercall_types::expiry_date_to_timestamp(&parsed.underlying, parsed.expiry);
487 if expiry_ts == 0 {
488 warn!(
489 "Snapshot expired instrument {} has invalid expiry code {}",
490 symbol, parsed.expiry
491 );
492 return None;
493 }
494 let expiry_ts = u64::try_from(expiry_ts).unwrap_or_else(|_| {
495 panic!(
496 "STATE_CORRUPTION: expiry timestamp {} for {} is negative",
497 expiry_ts, symbol
498 )
499 });
500 let now = crate::shared::clock::unix_now_secs();
501 Some(now >= expiry_ts)
502 }
503
504 pub fn restore_from_snapshot(&mut self, snapshot: &EngineStateSnapshot) {
510 self.next_order_id = snapshot.next_order_id;
512 self.next_trade_id = snapshot.next_trade_id;
513 self.l2_update_seq
514 .store(snapshot.last_l2_seq, Ordering::SeqCst);
515
516 let snapshot_symbols: BTreeSet<String> = snapshot.orderbooks.keys().cloned().collect();
518 let engine_symbols: BTreeSet<String> = self.orderbooks.keys().cloned().collect();
519 let snapshot_only: Vec<String> = snapshot_symbols
520 .difference(&engine_symbols)
521 .cloned()
522 .collect();
523 let snapshot_only_with_orders: Vec<String> = snapshot_only
524 .iter()
525 .filter_map(|symbol| {
526 snapshot
527 .orderbooks
528 .get(symbol)
529 .filter(|entries| !entries.is_empty())
530 .map(|_| symbol.clone())
531 })
532 .collect();
533 if !snapshot_only_with_orders.is_empty() {
534 let orphaned_order_count: usize = snapshot_only_with_orders
535 .iter()
536 .filter_map(|s| snapshot.orderbooks.get(s).map(|e| e.len()))
537 .sum();
538 warn!(
539 "Dropping {} orphaned orders across {} expired/unknown symbols from snapshot: {:?}",
540 orphaned_order_count,
541 snapshot_only_with_orders.len(),
542 snapshot_only_with_orders
543 );
544 }
545 if !snapshot_only.is_empty() {
546 warn!(
547 "Snapshot contains symbols not present in current engine orderbooks; skipping restore for {:?}",
548 snapshot_only
549 );
550 }
551
552 let mut total_orders = 0usize;
553 for (symbol, orderbook) in &mut self.orderbooks {
554 let orders: Vec<_> = snapshot
555 .orderbooks
556 .get(symbol)
557 .map(|entries| {
558 entries
559 .iter()
560 .map(|e| {
561 hypercall_engine::OrderRecord {
562 order_id: e.order_id,
563 price: e.price,
564 quantity: e.quantity,
565 side: e.side,
566 wallet: e.wallet,
567 timestamp: e.timestamp,
568 client_id: e.client_id.clone(),
569 mmp_enabled: e.mmp_enabled,
570 original_size: e.original_size.unwrap_or(e.quantity),
572 }
573 })
574 .collect()
575 })
576 .unwrap_or_default();
577 total_orders += orders.len();
578 orderbook.restore_from_orders(orders);
579 }
580
581 self.order_index.rebuild_from_orderbooks(&self.orderbooks);
583
584 let db_recovered_expired = self.expired_instruments.clone();
588 let mut restored_expired = HashMap::new();
589 let mut dropped_stale_expired = 0usize;
590 for (symbol, expired) in &snapshot.expired_instruments {
591 if !*expired {
592 continue;
593 }
594 if db_recovered_expired.get(symbol) == Some(&true) {
595 restored_expired.insert(symbol.clone(), true);
596 continue;
597 }
598 match Self::symbol_is_expired_by_wall_clock(symbol) {
599 Some(true) | None => {
600 restored_expired.insert(symbol.clone(), true);
601 }
602 Some(false) => {
603 dropped_stale_expired += 1;
604 }
605 }
606 }
607 if dropped_stale_expired > 0 {
608 warn!(
609 "Dropped {} stale expired instrument flag(s) from engine state snapshot",
610 dropped_stale_expired
611 );
612 }
613 self.expired_instruments = restored_expired;
614
615 self.spot_prices = snapshot.spot_prices.clone();
619 self.deps.reference_prices = snapshot
620 .spot_prices
621 .iter()
622 .filter_map(|(underlying, price)| {
623 price
624 .to_f64()
625 .map(|price| (underlying.clone(), price))
626 .or_else(|| {
627 warn!(
628 underlying = %underlying,
629 price = %price,
630 "Skipping restored spot price that cannot be represented as f64"
631 );
632 None
633 })
634 })
635 .collect();
636 self.iv_surfaces = snapshot
637 .iv_surfaces
638 .clone()
639 .into_iter()
640 .map(|(underlying, surface)| (underlying, surface.into_surface()))
641 .collect();
642 self.iv_source_timestamps = snapshot.iv_source_timestamps.clone();
643 if snapshot.instrument_trading_modes.is_empty() && !self.instrument_trading_modes.is_empty()
644 {
645 warn!(
646 seeded_modes = self.instrument_trading_modes.len(),
647 "Preserving DB-seeded instrument trading modes because snapshot has none"
648 );
649 } else {
650 self.instrument_trading_modes = snapshot.instrument_trading_modes.clone();
651 }
652
653 self.engine_positions = snapshot.engine_positions.clone();
655 self.mmp_state = snapshot.mmp_state.clone();
656 self.deps.liquidation_states = snapshot.liquidation_states.clone();
657 if snapshot.wallet_margin_modes.is_empty() && !self.deps.wallet_margin_modes.is_empty() {
658 warn!(
659 seeded_modes = self.deps.wallet_margin_modes.len(),
660 "Preserving tier-cache seeded wallet margin modes because snapshot has none"
661 );
662 } else {
663 self.deps.wallet_margin_modes = snapshot.wallet_margin_modes.clone();
664 }
665 self.deps.mmp_enabled = snapshot.mmp_enabled.clone();
666 if snapshot.wallet_trading_limits.is_empty()
667 && snapshot.wallet_tiers.is_empty()
668 && !self.deps.wallet_trading_limits.is_empty()
669 {
670 warn!(
671 seeded_limits = self.deps.wallet_trading_limits.len(),
672 seeded_tiers = self.deps.wallet_tiers.len(),
673 "Preserving tier-cache seeded trading limits because snapshot has none"
674 );
675 } else {
676 self.deps.wallet_trading_limits = snapshot.wallet_trading_limits.clone();
677 self.deps.default_trading_limits = snapshot.default_trading_limits;
678 self.deps.wallet_tiers = snapshot.wallet_tiers.clone();
679 }
680
681 self.balance_ledger = BalanceLedger::from_map_with_sequence(
684 snapshot.balance_ledger.clone(),
685 snapshot.last_balance_update_seq,
686 );
687 info!(
688 balance_wallet_count = self.balance_ledger.len(),
689 snapshot_last_command_id = snapshot.last_command_id,
690 snapshot_last_l2_seq = snapshot.last_l2_seq,
691 "Restored balance_ledger from engine state snapshot"
692 );
693
694 self.deposit_update_watermarks = snapshot.deposit_update_watermarks.clone();
696 self.applied_deposit_source_event_hashes =
697 snapshot.applied_deposit_source_event_hashes.clone();
698 self.pm_settlement_state = snapshot.pm_settlement_state.clone();
699
700 self.agent_authorizations = snapshot
702 .agent_authorizations
703 .iter()
704 .map(|(wallet, agents)| {
705 let map: HashMap<WalletAddress, Option<u64>> = agents
706 .iter()
707 .map(|(agent, expires)| (*agent, *expires))
708 .collect();
709 (*wallet, map)
710 })
711 .collect();
712 self.nonce_sets.clear();
714 for (signer, nonces_vec) in &snapshot.nonce_sets {
715 self.nonce_sets.insert(
716 *signer,
717 hypercall_engine::BoundedNonceSet::from_vec(
718 nonces_vec.clone(),
719 hypercall_engine::nonce::DEFAULT_NONCE_SET_CAPACITY,
720 ),
721 );
722 }
723 for (wallet, watermark) in &snapshot.nonce_watermarks {
725 self.nonce_sets.entry(*wallet).or_insert_with(|| {
726 let mut set = hypercall_engine::BoundedNonceSet::new(
727 hypercall_engine::nonce::DEFAULT_NONCE_SET_CAPACITY,
728 );
729 set.insert(*watermark);
730 set
731 });
732 }
733 self.rsm_signer_nonces = snapshot.rsm_signer_nonces.clone();
734
735 self.deps.hypercore_account_equity = snapshot.hypercore_account_equity.clone();
737 self.deps.hypercore_equity_timestamps = snapshot.hypercore_equity_timestamps.clone();
738 self.deps.perp_positions = snapshot.perp_positions.clone();
739
740 info!(
741 "Restored engine state from snapshot: next_order_id={}, next_trade_id={}, \
742 l2_seq={}, orderbooks={}, orders={}, expired={}, hypercore_equity_wallets={}",
743 self.next_order_id,
744 self.next_trade_id,
745 snapshot.last_l2_seq,
746 snapshot.orderbooks.len(),
747 total_orders,
748 self.expired_instruments.len(),
749 self.deps.hypercore_account_equity.len(),
750 );
751 }
752}