pub struct UnifiedEngine {
pub(crate) ctx: EngineCtx,
margin_manager: MarginManager,
expiry_manager: ExpiryManager,
order_receiver: Receiver<UnifiedEngineRequest>,
market_receiver: Receiver<MarketRequest>,
pub(crate) rfq_receiver: Option<Receiver<RfqExecuteRequest>>,
pub(crate) deposit_receiver: Option<Receiver<DepositRequest>>,
pub(crate) option_deposit_receiver: Option<Receiver<OptionDepositRequest>>,
pub(crate) option_withdrawal_receiver: Option<Receiver<OptionWithdrawalRequest>>,
pub(crate) cash_withdrawal_receiver: Option<Receiver<CashWithdrawalRequest>>,
pub(crate) liquidation_bonus_receiver: Option<Receiver<LiquidationBonusRequest>>,
pub(crate) margin_mode_receiver: Option<Receiver<MarginModeUpdateRequest>>,
pub(crate) agent_auth_receiver: Option<Receiver<AgentAuthRequest>>,
pub(crate) nonce_check_receiver: Option<Receiver<NonceCheckRequest>>,
pub(crate) tier_update_receiver: Option<Receiver<TierUpdateRequest>>,
pub(crate) hypercore_equity_receiver: Option<Receiver<HypercoreEquityRequest>>,
pub(crate) pm_settlement_admin_receiver: Option<Receiver<PmSettlementAdminRequest>>,
pub(crate) quiesce_receiver: Option<Receiver<EngineQuiesceRequest>>,
pub(crate) trading_mode_receiver: Option<Receiver<HashMap<String, TradingModes>>>,
shutdown_receiver: Receiver<()>,
fee_service: FeeService,
snapshot_interval: Duration,
read_snapshot_interval: Duration,
post_startup_reconcile_delay: Duration,
last_snapshot: Instant,
wal_path: Option<PathBuf>,
sync_status: Arc<SyncStatus>,
journal_writer: Option<SharedEngineJournalWriter>,
journal_batch_sender: Option<JournalBatchSender>,
nats_publisher: Option<NatsPublisher>,
nats_balance_update_publisher: Option<NatsBalanceUpdatePublisher>,
known_request_ids: BoundedIdempotenceCache,
read_snapshot: Arc<ArcSwap<EngineSnapshot>>,
post_startup_reconciled: bool,
startup_replayed_events: Vec<(String, Vec<EngineMessage>)>,
replay_checkpoint: WalCheckpointMetadata,
runtime_quiesced: bool,
snapshot_loaded: bool,
startup_expired_order_cancels: Vec<StartupExpiredOrderCancel>,
external_vol_oracle: Option<SharedVolOracle>,
engine_iv_surfaces: Arc<RwLock<HashMap<String, VolatilitySurface>>>,
}
The unified engine that combines order matching and margin calculations.
Margin calculations for order acceptance are delegated to MarginService (SpanMarginService implementation). The engine is responsible for:
- Building hypothetical accounts that include open orders + proposed orders
- Calling MarginService to compute margin requirements
- Accepting/rejecting orders based on available collateral
Fields
ctx: EngineCtx
margin_manager: MarginManager
Margin checking logic (SPAN + Standard).
expiry_manager: ExpiryManager
Expiry and settlement management.
order_receiver: Receiver<UnifiedEngineRequest>
market_receiver: Receiver<MarketRequest>
rfq_receiver: Option<Receiver<RfqExecuteRequest>>
deposit_receiver: Option<Receiver<DepositRequest>>
option_deposit_receiver: Option<Receiver<OptionDepositRequest>>
option_withdrawal_receiver: Option<Receiver<OptionWithdrawalRequest>>
cash_withdrawal_receiver: Option<Receiver<CashWithdrawalRequest>>
liquidation_bonus_receiver: Option<Receiver<LiquidationBonusRequest>>
margin_mode_receiver: Option<Receiver<MarginModeUpdateRequest>>
agent_auth_receiver: Option<Receiver<AgentAuthRequest>>
nonce_check_receiver: Option<Receiver<NonceCheckRequest>>
tier_update_receiver: Option<Receiver<TierUpdateRequest>>
hypercore_equity_receiver: Option<Receiver<HypercoreEquityRequest>>
pm_settlement_admin_receiver: Option<Receiver<PmSettlementAdminRequest>>
quiesce_receiver: Option<Receiver<EngineQuiesceRequest>>
trading_mode_receiver: Option<Receiver<HashMap<String, TradingModes>>>
Live notify channel from catalog_manager::manager for
per-underlying TradingModes updates. When the catalog
manager rewrites any row in the instruments.trading_mode
column, it publishes the full current underlying → mode
map here. The main loop awaits changed() on it and calls
apply_underlying_trading_mode_update.
shutdown_receiver: Receiver<()>
fee_service: FeeService
snapshot_interval: Duration
read_snapshot_interval: Duration
post_startup_reconcile_delay: Duration
last_snapshot: Instant
wal_path: Option<PathBuf>
sync_status: Arc<SyncStatus>
Sync status for readiness tracking.
journal_writer: Option<SharedEngineJournalWriter>
Durable engine journal for restart-safe ACK and idempotency.
journal_batch_sender: Option<JournalBatchSender>
Batched journal sender for async PostgreSQL writes.
nats_publisher: Option<NatsPublisher>
NATS publisher for real-time command replication to standby instances.
nats_balance_update_publisher: Option<NatsBalanceUpdatePublisher>
NATS publisher for canonical balance update follower replication.
known_request_ids: BoundedIdempotenceCache
Known request_ids for fast idempotency checks (bounded to prevent memory leaks).
read_snapshot: Arc<ArcSwap<EngineSnapshot>>
ArcSwap snapshot for lock-free reads by API handlers.
post_startup_reconciled: bool
Whether post-startup reconciliation has been performed. After SIGKILL, the journal batcher needs a few seconds to materialize pending terminal order statuses into order_infos. This flag ensures we re-check for ghost orders once after catchup.
startup_replayed_events: Vec<(String, Vec<EngineMessage>)>
Runtime events produced during synchronous startup journal replay.
These must be projected before start() hydrates engine positions from
PortfolioService, otherwise recovered TickExpiry events can leave stale
portfolio positions that resurrect settled engine positions.
replay_checkpoint: WalCheckpointMetadata
WAL checkpoint metadata loaded at startup and used to bound replay windows.
runtime_quiesced: bool
When true, the engine has crossed an admin drain barrier. Live mutation sources remain parked until an explicit resume request.
snapshot_loaded: bool
Whether an engine state snapshot was successfully loaded during startup. When true, the base reconstruction window is skipped during replay.
startup_expired_order_cancels: Vec<StartupExpiredOrderCancel>
Orders on expired instruments discovered during replay that need DB cancellation.
external_vol_oracle: Option<SharedVolOracle>
External vol oracle reference for ingesting IV surfaces into the command stream. Stored separately from the margin_manager’s oracle so the runtime can read surfaces and feed them as IvUpdate commands.
engine_iv_surfaces: Arc<RwLock<HashMap<String, VolatilitySurface>>>
Shared IV surfaces backing the EngineVolOracle. Updated by IvUpdate commands, read by the SpanMarginService during margin checks.
Implementations
impl UnifiedEngine
fn validate_nonce_preview( &self, signer: &WalletAddress, nonce: Option<u64>, command_timestamp_ms: u64, preview_sets: &mut HashMap<WalletAddress, BoundedNonceSet>, ) -> Result<(), EngineError>
fn validate_and_advance_nonce( &mut self, signer: &WalletAddress, nonce: Option<u64>, command_timestamp_ms: u64, ) -> Result<(), EngineError>
Validate a nonce against the per-signer bounded nonce set (HL model).
The nonce must be within the time bounds of the command timestamp,
not already used, and greater than the smallest nonce in the set.
Skips validation when nonce is None (engine-internal commands).
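The three rules above can be sketched with an ordered set. This is a minimal illustration, not the engine's BoundedNonceSet: the NonceSet type, the NonceError variants, the window_ms parameter, and the assumption that nonces are millisecond timestamps are all hypothetical. The read-only validate method plays the role of a preview check, and validate_and_advance records the nonce on success.

```rust
use std::collections::BTreeSet;

#[derive(Debug, PartialEq, Eq)]
pub enum NonceError {
    OutOfTimeBounds,
    AlreadyUsed,
    BelowFloor,
}

/// Hypothetical stand-in for a per-signer bounded nonce set.
#[derive(Debug)]
pub struct NonceSet {
    pub used: BTreeSet<u64>,
    pub capacity: usize,
}

impl NonceSet {
    pub fn new(capacity: usize) -> Self {
        Self { used: BTreeSet::new(), capacity }
    }

    /// Read-only check; does not record the nonce.
    pub fn validate(
        &self,
        nonce: Option<u64>,
        command_timestamp_ms: u64,
        window_ms: u64, // assumed tolerance around the command timestamp
    ) -> Result<(), NonceError> {
        // Engine-internal commands carry no nonce and skip validation.
        let Some(nonce) = nonce else { return Ok(()); };
        if nonce.abs_diff(command_timestamp_ms) > window_ms {
            return Err(NonceError::OutOfTimeBounds);
        }
        if self.used.contains(&nonce) {
            return Err(NonceError::AlreadyUsed);
        }
        // Reject anything below the smallest nonce still tracked.
        if let Some(&smallest) = self.used.first() {
            if nonce < smallest {
                return Err(NonceError::BelowFloor);
            }
        }
        Ok(())
    }

    /// Validate, then record the nonce and evict the smallest entries
    /// so the set never grows past `capacity`.
    pub fn validate_and_advance(
        &mut self,
        nonce: Option<u64>,
        command_timestamp_ms: u64,
        window_ms: u64,
    ) -> Result<(), NonceError> {
        self.validate(nonce, command_timestamp_ms, window_ms)?;
        if let Some(nonce) = nonce {
            self.used.insert(nonce);
            while self.used.len() > self.capacity {
                self.used.pop_first();
            }
        }
        Ok(())
    }
}
```

Evicting the smallest entry raises the floor, which is what keeps memory bounded while still rejecting replays of old nonces.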
fn allocate_rsm_nonce(&mut self, signer: WalletAddress) -> u64
pub fn seed_rsm_signer_nonce_floor( &mut self, signer: WalletAddress, next_nonce: u64, )
fn validate_cash_withdrawal_margin( &self, wallet: WalletAddress, amount: Decimal, ) -> Result<(), EngineError>
fn validate_option_withdrawal_margin( &self, wallet: WalletAddress, symbol: &str, quantity: Decimal, ) -> Result<(), EngineError>
pub(super) fn apply_deposit_update_to_balance_ledger( &mut self, update_kind: &'static str, wallet: WalletAddress, amount: Decimal, timestamp_ms: u64, sequence: Option<u64>, source_event_hash: &FixedBytes<32>, output: &mut ApplyOutput, ) -> Result<(), EngineError>
pub(super) fn apply_balance_snapshot_update_to_balance_ledger( &mut self, update_kind: &'static str, wallet: WalletAddress, amount: Decimal, balance_after: Decimal, timestamp_ms: u64, sequence: Option<u64>, output: &mut ApplyOutput, ) -> Result<(), EngineError>
fn build_balance_update( &self, wallet: WalletAddress, delta: Decimal, balance_after: Decimal, reason: BalanceUpdateReason, reference_id: Option<String>, timestamp_ms: u64, ) -> BalanceUpdate
fn apply_balance_update( &mut self, update: BalanceUpdate, output: &mut ApplyOutput, ) -> Result<(), EngineError>
fn last_deposit_update( &self, wallet: &WalletAddress, ) -> Option<DepositUpdateWatermark>
pub(super) fn apply_option_deposit_update( &mut self, request_id: String, wallet: WalletAddress, symbol: String, quantity: Decimal, timestamp_ms: u64, ) -> Result<(), EngineError>
pub(super) fn apply_option_withdrawal_update( &mut self, request_id: String, wallet: WalletAddress, account: WalletAddress, signer: WalletAddress, rsm_signer: WalletAddress, symbol: String, quantity: Decimal, nonce: Option<u64>, action: Vec<u8>, timestamp_ms: u64, output: &mut ApplyOutput, ) -> Result<(), EngineError>
pub(super) fn apply_cash_withdrawal_update( &mut self, request_id: String, wallet: WalletAddress, destination: WalletAddress, signer: WalletAddress, rsm_signer: WalletAddress, amount: Decimal, amount_wei: u64, account: WalletAddress, nonce: Option<u64>, timestamp_ms: u64, output: &mut ApplyOutput, ) -> Result<Decimal, EngineError>
pub fn apply( &mut self, env: CommandEnvelope, ) -> Result<ApplyOutput, EngineError>
Apply a command to the engine state and return the resulting events.
This is the core state machine interface that enables:
- Reasoning about correctness as a pure state transition
- Observing “input command -> output events” per request
- Testing determinism and replay
Contract
- apply must NOT publish to the event bus, write to DB, or mutate external caches
- It only mutates the engine’s in-memory state and returns Vec<EngineMessage>
- The runtime is responsible for publishing events after apply returns
Event Ordering
Events are returned in the order they should be published. The current implementation preserves the same ordering as the direct-send approach.
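To illustrate why this contract makes replay testable, here is a toy state machine with the same shape: apply mutates only in-memory state and returns events in publish order. ToyEngine, Command, Event, ApplyError, and replay are hypothetical names for this sketch and do not mirror CommandEnvelope or ApplyOutput.

```rust
use std::collections::BTreeMap;

#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Command {
    Deposit { wallet: String, amount: u64 },
    Withdraw { wallet: String, amount: u64 },
}

#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Event {
    BalanceChanged { wallet: String, balance_after: u64 },
}

#[derive(Debug, PartialEq, Eq)]
pub enum ApplyError {
    InsufficientFunds,
}

#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub struct ToyEngine {
    pub balances: BTreeMap<String, u64>,
}

impl ToyEngine {
    /// No I/O happens here: the caller decides when to publish the events.
    pub fn apply(&mut self, cmd: Command) -> Result<Vec<Event>, ApplyError> {
        match cmd {
            Command::Deposit { wallet, amount } => {
                let bal = self.balances.entry(wallet.clone()).or_insert(0);
                *bal += amount;
                let balance_after = *bal;
                Ok(vec![Event::BalanceChanged { wallet, balance_after }])
            }
            Command::Withdraw { wallet, amount } => {
                // A rejected command leaves the state untouched.
                let bal = self
                    .balances
                    .get_mut(&wallet)
                    .filter(|b| **b >= amount)
                    .ok_or(ApplyError::InsufficientFunds)?;
                *bal -= amount;
                let balance_after = *bal;
                Ok(vec![Event::BalanceChanged { wallet, balance_after }])
            }
        }
    }
}

/// Feed the same command log into a fresh engine and collect the events.
pub fn replay(commands: &[Command]) -> (ToyEngine, Vec<Event>) {
    let mut engine = ToyEngine::default();
    let mut events = Vec::new();
    for cmd in commands {
        if let Ok(mut out) = engine.apply(cmd.clone()) {
            events.append(&mut out);
        }
    }
    (engine, events)
}
```

Because apply has no side effects outside the engine, replaying the same log twice must yield identical state and identical event sequences, which a test can assert directly.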
pub(super) fn account_for_fill( &mut self, fill: Fill, output: &mut ApplyOutput, ) -> EngineMessage
pub(super) fn account_for_events( &mut self, events: Vec<EngineMessage>, output: &mut ApplyOutput, ) -> Vec<EngineMessage>
pub(super) fn drain_orderbook_events(&mut self) -> Vec<EngineMessage>
pub(super) fn process_expiry_tick_collecting( &mut self, now_ms: u64, context: TickExpiryContext, output: &mut ApplyOutput, response_market_symbol: Option<&str>, ) -> Result<Option<MarketUpdateMessage>, EngineError>
Process expiry tick using runtime-provided context.
impl UnifiedEngine
pub(super) fn attach_fill_underlying_notional(&self, fill: &mut Fill)
impl UnifiedEngine
pub(super) async fn process_order_journaled( &mut self, request: UnifiedEngineRequest, )
Process an order with durable journaling.
This method provides:
- Idempotency: duplicate request_id returns cached response
- Restart-safe ACK: response only sent after DB commit
- Full event capture for replay
The flow is:
- Check journal for existing request_id (idempotency)
- If found: return cached response
- If new: process order, journal transition, then ACK
When batch_sender is set, journaling is async (push to channel, ACK immediately). When batch_sender is not set, falls back to synchronous journaling.
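The synchronous variant of this flow can be sketched as follows. It is a simplified model under stated assumptions: ToyJournal, Response, and process_journaled are hypothetical, the journal is an in-memory map rather than a database, and the ACK is modelled as a push onto a vector.

```rust
use std::collections::HashMap;

#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Response {
    pub request_id: String,
    pub status: String,
}

/// Hypothetical in-memory journal keyed by request_id.
#[derive(Default)]
pub struct ToyJournal {
    pub committed: HashMap<String, Response>,
}

impl ToyJournal {
    pub fn lookup(&self, request_id: &str) -> Option<&Response> {
        self.committed.get(request_id)
    }

    pub fn commit(&mut self, response: Response) {
        self.committed.insert(response.request_id.clone(), response);
    }
}

/// Check the journal, then either return the cached response or
/// process, commit, and only then ACK.
pub fn process_journaled(
    journal: &mut ToyJournal,
    acks: &mut Vec<Response>,
    request_id: &str,
    process: impl FnOnce() -> String,
) {
    // Duplicate request_id: answer from the journal without re-processing.
    if let Some(cached) = journal.lookup(request_id) {
        acks.push(cached.clone());
        return;
    }
    let response = Response {
        request_id: request_id.to_string(),
        status: process(),
    };
    // Commit first so a crash after the ACK can never lose the transition.
    journal.commit(response.clone());
    acks.push(response);
}
```

A retry with the same request_id receives the original response, and the order logic runs exactly once.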
async fn process_order_journaled_impl( &mut self, request: UnifiedEngineRequest, req_id: &str, req_uuid: Uuid, start: Instant, strategy: JournalStrategy, )
Process order with journaling using the specified strategy.
fn serialize_events_for_journal( &self, events: &[EngineMessage], ) -> Vec<EventPayload>
Serialize events for journal. Produces wire-format bytes (version byte + msgpack) along with topic, key, and l2_sequence. These bytes are stored directly in the DB, avoiding double deserialization.
pub(crate) async fn apply_portfolio_projection_events_sync( &self, events: &[EngineMessage], req_id: &str, ) -> Vec<PendingNotifications>
Apply captured fill events to PortfolioCache synchronously.
This guarantees that the next margin check sees post-fill portfolio state without waiting for event-bus round trips.
async fn send_portfolio_notifications( &self, pending_notifications: Vec<PendingNotifications>, )
Send portfolio notifications after the projection barrier has been released. This keeps WebSocket sends off the hot mutation path.
pub(crate) async fn apply_replayed_events_sync( &mut self, events: &[EngineMessage], req_id: &str, )
pub(crate) async fn apply_startup_replayed_events_sync( &mut self, events: &[EngineMessage], req_id: &str, )
pub(super) async fn process_rfq_journaled( &mut self, request: RfqExecuteRequest, fill_id: String, captured_events: Vec<EngineMessage>, captured_balance_updates: Vec<BalanceUpdate>, mmp_updates: Vec<MmpFillUpdate>, )
Process an RFQ execution with durable journaling.
Mirrors process_order_journaled but for RfqExecuteCommand. The
caller (handle_rfq_execute) has already produced the planned
Vec<EngineMessage> (per-leg OrderFilleds + summary RfqFilled)
via the pure plan_rfq_execution function — this method is
responsible for:
- Translating engine-emitted FillAccounting into journal fill side effects.
- Serializing the command + events as a journal entry tagged CommandType::RfqExecute.
- Pushing to the journal batcher (or sync writer) and waiting for the durable commit ACK.
- Applying the portfolio projection under barrier (uses apply_portfolio_projection_events_sync, the same function the orderbook path uses).
- Emitting the events to the WS bus AFTER the journal commit, so nothing visible to the client/UI happens before durability.
- Firing post-journal MMP updates (which are intentionally non-durable side effects).
- Sending the RfqExecuteResult::Success { fill_id } response.
On any journal failure this panics rather than returning, matching the orderbook journaling semantics — there is no safe way to continue running with mutated-but-unlogged state.
impl UnifiedEngine
pub(super) fn apply_market_action( &mut self, command: MarketActionCommand, timestamp: u64, output: &mut ApplyOutput, ) -> Result<(), EngineError>
fn manual_expiry_settled(effects: &[ExpiryEffect], symbol: &str) -> bool
fn create_market_in_memory( &mut self, symbol: String, strike: Decimal, is_call: bool, underlying_symbol: String, expiry: u32, output: &mut ApplyOutput, ) -> Result<(), String>
fn delete_market_in_memory( &mut self, symbol: &str, output: &mut ApplyOutput, ) -> Result<(), String>
pub fn create_market( &mut self, symbol: String, strike: Decimal, is_call: bool, underlying_symbol: String, expiry: u32, ) -> Result<String, String>
Create a new market (orderbook) for trading
pub fn delete_market(&mut self, symbol: String) -> Result<String, String>
Delete an existing market (orderbook)
pub fn apply_underlying_trading_mode_update( &mut self, update: &HashMap<String, TradingModes>, )
Apply a HashMap<underlying, TradingModes> update from the
catalog manager’s notify channel. For every instrument the
engine currently knows about, if its underlying is in the
update map, rewrite the in-memory instrument_trading_modes
entry to the new mode. Instruments whose underlying isn’t in
the map are left untouched.
This closes the “catalog flip drifts for 60s” gap: prior to
this hook, the engine’s instrument_trading_modes only got
populated on recovery + CreateMarket, so a live catalog rewrite
(e.g. GOLD flipped from orderbook to rfq) would not be
observable to plan_rfq_execution or the order admission path
until the engine next restarted. The catalog manager sends the
new map on a tokio::sync::watch channel whenever it updates
any row, and the engine main loop calls this helper.
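The per-instrument rewrite described above can be sketched like this. Mode, Instrument, and apply_mode_update are hypothetical simplifications: the real TradingModes type may carry more than a single variant, and the engine keeps its instrument list elsewhere.

```rust
use std::collections::HashMap;

/// Simplified, hypothetical stand-in for TradingModes.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Mode {
    Orderbook,
    Rfq,
}

pub struct Instrument {
    pub symbol: String,
    pub underlying: String,
}

/// Rewrite the mode of every known instrument whose underlying appears
/// in `update`; returns how many entries actually changed.
pub fn apply_mode_update(
    instruments: &[Instrument],
    instrument_modes: &mut HashMap<String, Mode>,
    update: &HashMap<String, Mode>,
) -> usize {
    let mut changed = 0;
    for inst in instruments {
        // Underlyings missing from the update map are left untouched.
        if let Some(&mode) = update.get(&inst.underlying) {
            let previous = instrument_modes.insert(inst.symbol.clone(), mode);
            if previous != Some(mode) {
                changed += 1;
            }
        }
    }
    changed
}
```

Since the catalog manager publishes the full current map, applying the same update twice is harmless: the second call changes nothing.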
impl UnifiedEngine
fn get_next_order_id(&mut self) -> u64
Get the next order ID from the global sequence counter
pub(super) fn validate_order( &self, order_info: &OrderInfo, wallet: &WalletAddress, ) -> Result<(), String>
Validate order: check expiry, orderbook existence, and account funds.
pub(super) fn check_order_restrictions( &self, order_info: &OrderInfo, wallet: &WalletAddress, ) -> Result<(), String>
Check order restrictions: pre-liquidation, margin, and tier.
pub(super) async fn allocate_and_ack( &mut self, request: &UnifiedEngineRequest, order_info: &OrderInfo, wallet: &WalletAddress, timestamp: u64, ) -> AllocatedOrder
Allocate order ID and send Acked response.
pub(super) fn allocate_order_output( &mut self, message: &OrderActionMessage, order_info: &OrderInfo, wallet: &WalletAddress, timestamp: u64, output: &mut ApplyOutput, ) -> u64
pub(super) async fn execute_matching( &mut self, order_id: u64, order_info: &OrderInfo, wallet: &WalletAddress, timestamp: u64, ) -> MatchingResult
Execute orderbook matching with MMP and self-trade handling.
pub(super) fn execute_matching_sync( &mut self, order_id: u64, order_info: &OrderInfo, wallet: &WalletAddress, timestamp: u64, output: &mut ApplyOutput, ) -> MatchingResult
fn fill_from_match_result( match_result: MatchResult, ) -> Result<Option<Fill>, SelfTradeMatch>
fn mmp_should_trigger( &mut self, wallet: &WalletAddress, underlying_symbol: &str, fill: &Fill, timestamp: u64, ) -> bool
fn record_fill_metrics(result: &MatchingResult)
pub(super) async fn finalize_order( &mut self, request: &UnifiedEngineRequest, order_id: u64, order_info: &OrderInfo, wallet: &WalletAddress, timestamp: u64, result: &MatchingResult, )
Finalize order: determine status, record metrics, emit events.
pub(super) fn finalize_order_output( &mut self, message: &OrderActionMessage, order_id: u64, order_info: &OrderInfo, wallet: &WalletAddress, timestamp: u64, result: &MatchingResult, output: &mut ApplyOutput, )
pub(super) fn flush_orderbook_events(&mut self, output: &mut ApplyOutput)
pub(super) fn handle_mmp_trigger_output( &mut self, wallet: &WalletAddress, underlying_symbol: &str, trigger_reason: String, timestamp: u64, output: &mut ApplyOutput, )
pub(super) fn handle_self_trade_prevention_output( &mut self, message: &OrderActionMessage, taker_order_id: u64, maker_order_id: u64, order_info: &OrderInfo, wallet: &WalletAddress, partial_fills: &[Fill], timestamp: u64, output: &mut ApplyOutput, )
pub(super) fn check_order_limits( &self, wallet: &WalletAddress, order_info: &OrderInfo, ) -> Result<(), String>
Check if order would exceed open order or position limits.
Enforces per-tier limits on:
- max_open_orders: Maximum number of active (open/acked/partial) orders
- max_open_positions: Maximum number of distinct instruments with positions
Returns Ok(()) if within limits, Err(reason) if limits exceeded.
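A rough sketch of such a check follows. TierLimits and check_limits are hypothetical, and the counting rules are assumptions: the incoming order counts as one more open order, and it only counts against the position limit when its symbol is not already held. The engine's actual accounting may differ.

```rust
use std::collections::BTreeSet;

/// Hypothetical per-tier limits.
#[derive(Debug, Clone, Copy)]
pub struct TierLimits {
    pub max_open_orders: usize,
    pub max_open_positions: usize,
}

pub fn check_limits(
    limits: TierLimits,
    open_orders: usize,
    position_symbols: &BTreeSet<String>,
    order_symbol: &str,
) -> Result<(), String> {
    // The new order itself would become one more active order.
    if open_orders + 1 > limits.max_open_orders {
        return Err(format!(
            "open order limit exceeded: {} of {}",
            open_orders, limits.max_open_orders
        ));
    }
    // Assumption: an order on an instrument already held opens no new position.
    let opens_new_position = !position_symbols.contains(order_symbol);
    if opens_new_position && position_symbols.len() + 1 > limits.max_open_positions {
        return Err(format!(
            "open position limit exceeded: {} of {}",
            position_symbols.len(),
            limits.max_open_positions
        ));
    }
    Ok(())
}
```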
pub(crate) fn check_pm_settlement_pool_gate_for_orders( &self, wallet: &WalletAddress, orders: &[OrderInfo], ) -> Result<(), String>
pub(crate) fn check_pm_settlement_pool_gate_for_order_groups( &self, order_groups: &[(&WalletAddress, &[OrderInfo])], ) -> Result<(), String>
fn pm_current_short_option_oi_usdc( &self, underlying: &str, ) -> Result<Decimal, String>
fn is_pm_settlement_pool_eligible_wallet( &self, wallet: &WalletAddress, ) -> Result<bool, String>
fn pm_risk_increasing_short_option_exposure_usdc( &self, wallet: &WalletAddress, orders: &[OrderInfo], ) -> Result<BTreeMap<String, Decimal>, String>
fn pm_resting_open_sell_quantity( &self, wallet: &WalletAddress, symbol: &str, ) -> Decimal
impl UnifiedEngine
pub(super) fn apply_order_action( &mut self, message: OrderActionMessage, timestamp: u64, output: &mut ApplyOutput, )
fn apply_create_order( &mut self, message: OrderActionMessage, timestamp: u64, output: &mut ApplyOutput, )
fn apply_perp_order( &mut self, message: OrderActionMessage, timestamp: u64, output: &mut ApplyOutput, )
fn apply_cancel_order( &mut self, message: OrderActionMessage, timestamp: u64, output: &mut ApplyOutput, )
fn apply_replace_order( &mut self, message: OrderActionMessage, timestamp: u64, output: &mut ApplyOutput, )
fn push_order_rejection( &mut self, message: OrderActionMessage, reason: String, timestamp: u64, output: &mut ApplyOutput, )
fn push_order_rejection_with_response( &mut self, message: OrderActionMessage, reason: String, timestamp: u64, output: &mut ApplyOutput, include_response: bool, )
pub(super) fn cancel_order_output( &mut self, message: OrderActionMessage, timestamp: u64, output: &mut ApplyOutput, include_response: bool, )
async fn handle_create_order( &mut self, request: UnifiedEngineRequest, timestamp: u64, )
Handle CREATE_ORDER action - orchestrates validation, matching, and finalization. Also used as phase 2 of ReplaceOrder (after the cancel phase).
pub(super) async fn process_order(&mut self, request: UnifiedEngineRequest)
Process a single order with SPAN calculations
fn lookup_symbol_by_order_id( &self, wallet: &WalletAddress, order_id: u64, ) -> Option<String>
Look up a symbol for an order_id using the in-process order index.
fn lookup_by_client_id( &self, wallet: &WalletAddress, client_id: &str, ) -> (Option<u64>, Option<String>)
Look up (order_id, symbol) by client_id using the in-process order index.
pub(super) async fn process_cancel_order( &mut self, request: UnifiedEngineRequest, timestamp: u64, )
Process a cancel order request
pub(super) async fn process_replace_order( &mut self, request: UnifiedEngineRequest, timestamp: u64, )
Process a replace order: atomically cancel an existing order, then create a new one.
The order_id of the order to cancel is in request.message.info.order_id.
The new order details (symbol, price, size, side, tif, client_id) are in request.message.info.
If the cancel fails, the entire operation is rejected and no new order is placed.
pub(super) async fn process_perp_order( &mut self, request: UnifiedEngineRequest, timestamp: u64, )
Process a perp order (margin validation only, execution happens async)
pub(super) async fn send_rejection( &self, request: UnifiedEngineRequest, reason: String, timestamp: u64, )
pub(super) async fn handle_mmp_trigger( &mut self, wallet: &WalletAddress, underlying_symbol: &str, trigger_reason: String, timestamp: u64, )
async fn mmp_cancel_order( &mut self, request: UnifiedEngineRequest, timestamp: u64, )
pub(super) async fn handle_self_trade_prevention( &mut self, request: &UnifiedEngineRequest, taker_order_id: u64, maker_order_id: u64, order_info: &OrderInfo, wallet: &WalletAddress, partial_fills: &[Fill], timestamp: u64, )
impl UnifiedEngine
pub(super) fn load_markets_from_db(&mut self)
pub(super) fn load_orderbook_snapshots(&mut self)
Load WAL checkpoint metadata and prepare replay bounds.
pub(super) fn check_restored_orderbooks_for_crosses(&self)
Check restored orderbooks for crosses and log warnings. Crossed orderbooks after restore indicate a bug in the matching engine or snapshot corruption - the matching engine should never allow bid >= ask.
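The cross condition itself is small enough to state in code. In this sketch TopOfBook, is_crossed, and crossed_symbols are hypothetical, and prices are modelled as integer ticks rather than the engine's Decimal.

```rust
/// Hypothetical top-of-book view; prices are integer ticks for simplicity.
pub struct TopOfBook {
    pub symbol: String,
    pub best_bid: Option<u64>,
    pub best_ask: Option<u64>,
}

/// A book is crossed when both sides exist and bid >= ask.
pub fn is_crossed(top: &TopOfBook) -> bool {
    matches!((top.best_bid, top.best_ask), (Some(bid), Some(ask)) if bid >= ask)
}

/// Collect the symbols a restore check would warn about.
pub fn crossed_symbols(books: &[TopOfBook]) -> Vec<&str> {
    books
        .iter()
        .filter(|b| is_crossed(b))
        .map(|b| b.symbol.as_str())
        .collect()
}
```

A one-sided book is never crossed, so empty or bid-only books pass the check.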
pub(super) fn replay_commands_from_journal(&mut self)
Replay commands from the journal to reconstruct orderbook state.
Recovery runs in two bounded windows:
- Base reconstruction: commands (0, checkpoint.last_command_id]
- Delta replay: commands (checkpoint.last_command_id, +inf)
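The two windows, and the chunked iteration that the chunk_size parameter of replay_command_window suggests, might look like this. Window, replay_windows, and chunks are hypothetical; skipping the base window when a snapshot was loaded follows the snapshot_loaded field description, while the chunking details are an assumption.

```rust
/// Half-open replay window (start_exclusive, end_inclusive]; `None` means unbounded.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Window {
    pub start_exclusive: i64,
    pub end_inclusive: Option<i64>,
}

pub fn replay_windows(checkpoint_last_command_id: i64, snapshot_loaded: bool) -> Vec<Window> {
    let mut windows = Vec::new();
    // Base reconstruction is only needed when no snapshot covers it.
    if !snapshot_loaded && checkpoint_last_command_id > 0 {
        windows.push(Window {
            start_exclusive: 0,
            end_inclusive: Some(checkpoint_last_command_id),
        });
    }
    // Delta replay always runs from the checkpoint onwards.
    windows.push(Window {
        start_exclusive: checkpoint_last_command_id,
        end_inclusive: None,
    });
    windows
}

/// Split a window into (lo, hi] chunks of at most `chunk_size` command ids.
pub fn chunks(window: &Window, highest_known_id: i64, chunk_size: i64) -> Vec<(i64, i64)> {
    assert!(chunk_size > 0, "chunk_size must be positive");
    let end = window
        .end_inclusive
        .unwrap_or(highest_known_id)
        .min(highest_known_id);
    let mut out = Vec::new();
    let mut lo = window.start_exclusive;
    while lo < end {
        let hi = (lo + chunk_size).min(end);
        out.push((lo, hi));
        lo = hi;
    }
    out
}
```

Using exclusive lower bounds and inclusive upper bounds means adjacent windows and chunks never replay the same command id twice.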
fn replay_command_window( &mut self, start_command_id: i64, end_command_id: Option<i64>, chunk_size: i64, replay_count: &mut usize, max_replayed_order_id: &mut u64, )
fn replay_command_row( &mut self, cmd: &ReplayCommand, replay_count: &mut usize, max_replayed_order_id: &mut u64, )
fn is_expiry_replay_command(command: &EngineCommand) -> bool
fn replay_expiry_command_row( &mut self, cmd: &ReplayCommand, command: EngineCommand, )
pub(super) fn apply_legacy_agent_auth_replay_command( &mut self, command: &EngineCommand, ) -> bool
fn replay_fill_events_by_command_for_window( &mut self, start_command_id: i64, end_command_id: i64, ) -> BTreeMap<i64, Vec<(Fill, FillAccounting)>>
pub(super) fn apply_fill_events_for_replay( &mut self, fill_events: &[(Fill, FillAccounting)], )
fn replay_order_update_events_for_window( &mut self, start_command_id: i64, end_command_id: i64, )
fn decode_fill_events(raw_rows: &[Vec<u8>]) -> Vec<(Fill, FillAccounting)>
fn decode_fill_event(raw: &[u8], idx: usize) -> (Fill, FillAccounting)
fn decode_order_update_events(raw_rows: &[Vec<u8>]) -> Vec<OrderUpdateMessage>
pub(super) fn run_post_startup_reconciliation(&mut self)
Post-startup ghost order reconciliation.
Runs ~5s after engine start, giving the journal batcher time to flush pending terminal statuses to order_infos. Queries for any orders in the orderbook that have a terminal status and removes them. This is the sole safeguard against ghost orders from the outbox pattern crash scenario (SIGKILL between journal fsync and outbox publish).
pub(super) fn process_order_for_replay( &mut self, msg: &OrderActionMessage, response: Option<&OrderUpdateMessage>, ) -> Result<(), String>
Process an order for replay purposes (no journaling, no response).
This is used during startup to replay commands from the journal after loading a snapshot. The commands are already persisted, so we just need to apply their state changes.
response_json is the original response from when this command was first
processed. It tells us the final order state (filled, open, partially_filled,
cancelled) so we can reconstruct the correct resting quantity without
re-running matching.
fn replay_create_order( &mut self, msg: &OrderActionMessage, response: Option<&OrderUpdateMessage>, ) -> Result<(), String>
Replay a create order command using the journal’s response to determine what resting state to restore.
Instead of re-running matching (which would require determinism of all inputs: timestamps, fees, oracle state, etc.), we read the response_json from the journal to determine the order’s final status and filled_size. This tells us exactly what resting quantity belongs in the orderbook:
- FILLED → don’t add (fully consumed)
- CANCELED / REJECTED → don’t add
- OPEN → add with full original quantity
- PARTIALLY_FILLED → add with (original_size - filled_size)
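The status table above maps directly onto a small function. FinalStatus and resting_quantity are hypothetical names, and sizes are integer units here rather than Decimal.

```rust
/// Hypothetical mirror of the final statuses recorded in the journal response.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum FinalStatus {
    Filled,
    Canceled,
    Rejected,
    Open,
    PartiallyFilled,
}

/// Quantity to restore into the book during replay, or `None` when
/// the order must not be re-added.
pub fn resting_quantity(status: FinalStatus, original_size: u64, filled_size: u64) -> Option<u64> {
    match status {
        FinalStatus::Filled | FinalStatus::Canceled | FinalStatus::Rejected => None,
        FinalStatus::Open => Some(original_size),
        FinalStatus::PartiallyFilled => {
            let remaining = original_size.saturating_sub(filled_size);
            // A zero remainder means there is nothing left to rest.
            (remaining > 0).then_some(remaining)
        }
    }
}
```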
fn replay_cancel_order( &mut self, msg: &OrderActionMessage, ) -> Result<(), String>
Replay a cancel order command.
fn cancel_order_for_replay_with_fallback( &mut self, order_id: u64, candidate_symbols: Vec<String>, ) -> Option<String>
Recovery-only cancel helper.
Tries likely symbols first, then scans every orderbook as a fallback. Replay is authoritative by order_id, so it is better to scrub a ghost from an unexpected book than to leave a stale resting order behind.
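A minimal sketch of the candidates-then-scan strategy, with each orderbook reduced to a set of resting order ids. The Books alias and cancel_with_fallback are hypothetical.

```rust
use std::collections::{BTreeMap, BTreeSet};

/// Hypothetical model: symbol -> set of resting order ids.
pub type Books = BTreeMap<String, BTreeSet<u64>>;

/// Returns the symbol the order was removed from, if it was found anywhere.
pub fn cancel_with_fallback(
    books: &mut Books,
    order_id: u64,
    candidate_symbols: &[String],
) -> Option<String> {
    // Cheap path: the symbols the caller considers likely.
    for symbol in candidate_symbols {
        if let Some(book) = books.get_mut(symbol) {
            if book.remove(&order_id) {
                return Some(symbol.clone());
            }
        }
    }
    // Fallback: scan every book so a ghost order cannot survive
    // just because it sits in an unexpected book.
    for (symbol, book) in books.iter_mut() {
        if book.remove(&order_id) {
            return Some(symbol.clone());
        }
    }
    None
}
```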
fn replay_replace_order( &mut self, msg: &OrderActionMessage, response: Option<&OrderUpdateMessage>, ) -> Result<(), String>
Replay a replace order: cancel the old order and create the new one.
For replay, msg.info.order_id is the cancel target (not patched by journal).
The new order’s assigned ID comes from response.order_id.
pub(super) fn apply_fills_for_replay(&mut self, fills: &[Fill])
Apply fill events from the journal to resting maker orders.
During replay_create_order, each command’s response_json only
reflects the order’s state at the time that command was processed.
A maker order whose response said “Open” may have been filled later
by a subsequent taker order. This method applies those fills to both
the orderbook and order_index, closing the gap.
fills should contain all OrderFilled events from engine_events
for commands in the replay window (after the snapshot L2 sequence).
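The gap this method closes can be shown with a reduced model in which resting state is a map from order id to remaining size. ReplayFill and apply_fills are hypothetical, and sizes are integer units.

```rust
use std::collections::HashMap;

/// Hypothetical reduced fill: only the maker side matters for this pass.
pub struct ReplayFill {
    pub maker_order_id: u64,
    pub size: u64,
}

/// Reduce resting maker quantities by later fills; drop orders that reach zero.
pub fn apply_fills(resting: &mut HashMap<u64, u64>, fills: &[ReplayFill]) {
    for fill in fills {
        let id = fill.maker_order_id;
        // Fills for orders that are not resting (already gone) are ignored.
        let Some(remaining) = resting.get_mut(&id) else { continue };
        *remaining = remaining.saturating_sub(fill.size);
        let exhausted = *remaining == 0;
        if exhausted {
            resting.remove(&id);
        }
    }
}
```

An order restored as fully open by the create pass ends up with the correct remaining size once every later fill against it has been applied.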
pub(super) fn flush_startup_expired_order_cancels(&mut self)
pub(super) fn apply_cancel_events_for_replay( &mut self, order_updates: &[OrderUpdateMessage], )
Apply side-effect cancels from OrderUpdate events during replay (Pass 3).
During live processing, some commands produce cancel side effects:
- MMP-triggered cancels: A taker fill triggers MMP, which cancels all MMP-enabled orders for that wallet+underlying as a side effect.
- Self-trade prevention: A taker’s order is cancelled to prevent self-trade, and the matching maker may also be cancelled.
These cancels are captured as OrderUpdate events in the taker’s journal entry but are NOT separate journal commands. Pass 1 only processes commands, so these side-effect cancels are lost during replay. This pass re-applies them.
impl UnifiedEngine
pub(crate) async fn handle_rfq_execute(&mut self, request: RfqExecuteRequest)
Handle an RFQ execution request within the engine’s single-threaded loop.
The flow:
- plan_rfq_execution validates the command and produces a list of EngineMessage events (per-leg OrderFilled + summary RfqFilled) along with post-commit MMP updates. No state mutation, no DB writes, no WS emissions yet.
- process_rfq_journaled runs the events through the same journal pipeline as process_order_journaled: serialize → batch send → wait for journal commit ACK → apply portfolio projection under barrier → emit events to WS → ACK the caller.
- Post-journal MMP updates fire after the projection barrier releases; they’re side effects that don’t need durability.
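The ordering in this flow (plan without mutation, journal, then project and emit) can be sketched as follows. Everything here is a hypothetical simplification: `Event`, `plan`, and `SketchEngine` are illustration names, and pushing to a `Vec` stands in for waiting on the journal commit ACK.

```rust
/// Hypothetical, simplified event type.
#[derive(Debug, Clone, PartialEq)]
enum Event {
    LegFilled { leg: usize, size: u64 },
    RfqFilled { legs: usize },
}

/// Step 1: validate and plan. Nothing is mutated here.
fn plan(leg_sizes: &[u64]) -> Result<Vec<Event>, String> {
    if leg_sizes.is_empty() || leg_sizes.iter().any(|&s| s == 0) {
        return Err("invalid RFQ legs".to_string());
    }
    let mut events: Vec<Event> = leg_sizes
        .iter()
        .enumerate()
        .map(|(leg, &size)| Event::LegFilled { leg, size })
        .collect();
    events.push(Event::RfqFilled { legs: leg_sizes.len() });
    Ok(events)
}

/// Hypothetical engine state: a journal, a projected total, and emitted events.
#[derive(Default)]
struct SketchEngine {
    journal: Vec<Vec<Event>>,
    filled_total: u64,
    emitted: Vec<Event>,
}

impl SketchEngine {
    /// Steps 2 and 3: journal first, then project, then emit.
    fn execute(&mut self, leg_sizes: &[u64]) -> Result<(), String> {
        let events = plan(leg_sizes)?;
        // Stands in for "serialize, batch send, wait for commit ACK".
        self.journal.push(events.clone());
        // Projection happens only after the journal step succeeded.
        for event in &events {
            if let Event::LegFilled { size, .. } = event {
                self.filled_total += *size;
            }
        }
        self.emitted.extend(events);
        Ok(())
    }
}
```

A rejected plan returns before the journal is touched, so a failed validation leaves no durable trace in this sketch.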
async fn lookup_cached_rfq_response(&self, request_id: &str) -> Option<String>
Look up a cached RFQ response by request_id. Returns
Some(fill_id) if the journal has already recorded this
request, None otherwise.
Two-step check:
- Fast path: in-memory known_request_ids cache. Skips DB entirely if the uuid isn’t in the cache.
- Slow path: query the journal writer for the full record and decode the cached fill_id from response_data.
Returns None on any decode/lookup failure rather than
panicking — a stale cache entry should fall through to the
normal execution path, not crash the engine.
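A minimal sketch of the two-step check, assuming a `HashSet` as the in-memory cache and a hypothetical `SketchJournal` map in place of the journal writer. The UTF-8 decode is a placeholder for the real response_data decoding.

```rust
use std::collections::{HashMap, HashSet};

/// Hypothetical journal store: request_id -> encoded response bytes.
struct SketchJournal {
    records: HashMap<String, Vec<u8>>,
}

/// Placeholder decoder: treats the bytes as UTF-8; any failure yields None.
fn decode_fill_id(bytes: &[u8]) -> Option<String> {
    std::str::from_utf8(bytes).ok().map(str::to_owned)
}

fn lookup_cached(
    known_request_ids: &HashSet<String>,
    journal: &SketchJournal,
    request_id: &str,
) -> Option<String> {
    // Fast path: skip the journal entirely for ids the cache has never seen.
    if !known_request_ids.contains(request_id) {
        return None;
    }
    // Slow path: fetch and decode; a missing or corrupt record becomes None.
    let bytes = journal.records.get(request_id)?;
    decode_fill_id(bytes)
}
```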
pub(super) fn encode_rfq_response_data(fill_id: &str) -> Vec<u8>
Serialise an RFQ response (fill_id) for journal storage. Wire format mirrors the orderbook command format: [version=1][rmp].
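To make the [version=1][rmp] layout concrete, here is a dependency-free sketch that prefixes a version byte to a hand-encoded MessagePack string. The assumption that the payload is a bare MessagePack string is mine; the real function may serialise a struct through an rmp-based serializer, and `encode_versioned` / `decode_versioned` are hypothetical names.

```rust
const VERSION: u8 = 1;

/// Encode `s` as a MessagePack string (fixstr / str 8 / str 16 / str 32).
/// Strings longer than u32::MAX bytes are out of scope for this sketch.
fn msgpack_str(s: &str, out: &mut Vec<u8>) {
    let len = s.len();
    if len < 32 {
        out.push(0xa0 | len as u8);
    } else if len <= u8::MAX as usize {
        out.push(0xd9);
        out.push(len as u8);
    } else if len <= u16::MAX as usize {
        out.push(0xda);
        out.extend_from_slice(&(len as u16).to_be_bytes());
    } else {
        out.push(0xdb);
        out.extend_from_slice(&(len as u32).to_be_bytes());
    }
    out.extend_from_slice(s.as_bytes());
}

/// Assumed layout: one version byte followed by the MessagePack payload.
fn encode_versioned(fill_id: &str) -> Vec<u8> {
    let mut out = vec![VERSION];
    msgpack_str(fill_id, &mut out);
    out
}

/// Decode the sketch format; handles only fixstr and str 8 payloads.
fn decode_versioned(bytes: &[u8]) -> Option<String> {
    let (&version, rest) = bytes.split_first()?;
    if version != VERSION {
        return None;
    }
    let (&tag, rest) = rest.split_first()?;
    let (len, body) = match tag {
        0xa0..=0xbf => ((tag & 0x1f) as usize, rest),
        0xd9 => {
            let (&n, body) = rest.split_first()?;
            (n as usize, body)
        }
        _ => return None,
    };
    if body.len() != len {
        return None;
    }
    String::from_utf8(body.to_vec()).ok()
}
```

Leading with a version byte lets a reader reject unknown layouts before attempting to parse the payload.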
pub(crate) fn plan_rfq_execution( &mut self, cmd: &RfqExecuteCommand, ) -> Result<RfqExecutionPlan, RfqExecuteResult>
Validate the command and build the planned event list. Pure: no
DB writes, no portfolio mutations, no WS emissions. Used by
handle_rfq_execute for the live path AND by replay paths that
need to re-derive events without re-emitting.
pub(crate) fn build_rfq_execution_plan_unchecked( &mut self, cmd: &RfqExecuteCommand, ) -> RfqExecutionPlan
Build RFQ fill events without re-running live acceptance checks.
This is used by standby NATS replay after the primary has already accepted and journaled the RFQ. Replay must not reject a command that the primary accepted, and it must not block on live-only validation.
impl UnifiedEngine
fn require_external_durable_mutation_uuid( command_type: &str, request_id: &str, ) -> Uuid
Fail before mutation when an external durable operation enters the engine without a UUID-backed journal identity.
fn parse_directive_domain_status( value: &str, ) -> Result<DirectiveDomainStatus, String>
fn parse_directive_delivery_status( value: &str, ) -> Result<DirectiveDeliveryStatus, String>
fn persisted_withdrawal_directive_status( &self, request_id: &str, ) -> Result<Option<(DirectiveDomainStatus, DirectiveDeliveryStatus)>, String>
async fn external_option_command_already_journaled( &self, request_id: &str, command_type: &str, ) -> bool
async fn journal_external_option_position_command( &self, env: &CommandEnvelope, request_id: &str, command_type_enum: CommandType, outbox_appends: Vec<DirectiveOutboxAppend>, )
async fn journal_external_balance_update_command( &self, env: &CommandEnvelope, request_id: &str, outbox_appends: Vec<DirectiveOutboxAppend>, balance_updates: Vec<BalanceUpdate>, )
fn pm_settlement_request_id(command: &EngineCommand) -> Option<Uuid>
fn preflight_pm_settlement_admin_command( &self, command: &EngineCommand, ) -> Result<(), String>
fn stamp_pm_settlement_admin_command( command: EngineCommand, timestamp_ms: u64, ) -> EngineCommand
async fn journal_pm_settlement_admin_command( &self, env: &CommandEnvelope, request_uuid: Uuid, )
async fn journal_agent_auth_command(&self, env: &CommandEnvelope)
fn agent_auth_journal_available(&self) -> bool
async fn journal_external_cash_withdrawal_command( &self, env: &CommandEnvelope, request_id: &str, outbox_appends: Vec<DirectiveOutboxAppend>, wallet: WalletAddress, amount: Decimal, balance_after: Decimal, timestamp_ms: u64, balance_updates: Vec<BalanceUpdate>, )
async fn prepare_tick_expiry_env( &self, now_ms: u64, ) -> Result<CommandEnvelope, EngineError>
fn replay_owned_expiry_command_payload( env: &CommandEnvelope, ) -> Option<(CommandType, Vec<u8>, &'static str)>
async fn journal_replay_owned_expiry_command( &self, env: &CommandEnvelope, balance_updates: &[BalanceUpdate], )
fn replay_owned_expiry_journal_entry( env: &CommandEnvelope, balance_updates: &[BalanceUpdate], request_uuid: DbUuid, commit_ack: Option<Sender<()>>, ) -> Option<(JournalEntry, &'static str)>
fn validate_tick_expiry_nats_payload( &self, now_ms: u64, context: &TickExpiryContext, ) -> Result<(), EngineError>
fn validate_manual_market_expiry_nats_payload( &self, command: &MarketActionCommand, ) -> Result<(), EngineError>
async fn prepare_manual_market_expiry_context( &self, message: &MarketActionMessage, ) -> Result<TickExpiryContext, EngineError>
async fn apply_expiry_effects_and_events( &mut self, output: &ApplyOutput, req_id: &str, ) -> Result<(), EngineError>
async fn publish_tick_expiry_balance_updates( &self, output: &ApplyOutput, context: &'static str, )
async fn apply_market_effects( &self, output: &ApplyOutput, ) -> Result<(), EngineError>
pub(crate) fn apply_pm_settlement_projection_effects_sync( &self, effects: &[PmSettlementProjectionEffect], _request_id: &str, ) -> Result<(), String>
pub(crate) fn pm_settlement_projection_writes_from_effects( effects: &[PmSettlementProjectionEffect], ) -> Result<Vec<PmSettlementProjectionWrite>, String>
fn pm_projection_version_i32(field: &str, value: u32) -> Result<i32, String>
pub(crate) fn apply_expiry_effect( &mut self, effect: &ExpiryEffect, ) -> Result<(), EngineError>
pub(crate) fn apply_standby_expiry_effect( &mut self, effect: &ExpiryEffect, ) -> Result<(), EngineError>
pub(crate) fn apply_replayed_expiry_effects( &mut self, effects: &[ExpiryEffect], ) -> Result<(), EngineError>
Apply TickExpiry runtime effects after deterministic command replay.
The replayed command remains the authority for in-memory balance state.
This recovery-only path only catches durable projections up to the WAL
command: it writes idempotent settlement evidence, persists idempotent
status/cancel projections, and then checks that replayed engine cash
matches the durable balance. It must never set balance_ledger from
durable accounting.
fn observe_standby_settlement_with_retry( handler: &DatabaseHandler, intent: &ExpirySettlementIntent, ) -> Result<SettlementResult, Error>
fn reconcile_settlement_balance( &mut self, intent: &ExpirySettlementIntent, outcome: SettlementResult, )
async fn flush_journal(&self)
Flush the journal batcher, ensuring all pending entries are committed to DB.
async fn hydrate_runtime_state_from_dependencies( &mut self, context: &'static str, hydrate_market_data: bool, )
pub(crate) async fn hydrate_standby_base_state(&mut self)
pub(crate) async fn hydrate_primary_base_state(&mut self)
pub(crate) async fn apply_startup_replayed_events(&mut self)
pub(super) fn persist_engine_state_snapshot(&self) -> Option<i64>
Persist engine state snapshot to disk for fast restart recovery.
Reads the current WAL checkpoint and serializes the engine’s in-memory state (orders, positions, balances) so the next startup can skip full replay from Postgres. Only runs when ENGINE_JOURNAL_WAL_PATH is set.
fn current_wal_checkpoint_command_id(&self) -> i64
async fn apply_runtime_balance_update( &mut self, env: CommandEnvelope, applied_tx: Option<Sender<Result<(), String>>>, wallet: WalletAddress, amount: Decimal, update_kind: &'static str, last_read_snapshot: &mut Instant, journal_request_id: String, outbox_appends: Vec<DirectiveOutboxAppend>, )
fn is_idempotent_empty_balance_update(update_kind: &str) -> bool
async fn handle_quiesce_request(&mut self, request: EngineQuiesceRequest)
async fn ingest_price_updates(&mut self)
Read spot prices from the external greeks cache and apply them as deterministic PriceUpdate commands to the engine state.
async fn ingest_price_updates_without_side_effects(&mut self)
async fn ingest_price_updates_with_side_effects( &mut self, emit_side_effects: bool, )
async fn ingest_iv_updates(&mut self)
Read IV surfaces from the external vol oracle and apply them as deterministic IvUpdate commands to the engine state.
Heavy work (surface rebuild, serialization) happens here, outside the engine’s critical path. apply() only does two map inserts.
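The split between heavy preparation and a cheap apply step might look like the sketch below. The types are hypothetical: vols are plain integers in basis points, and which two maps the real apply() writes to is an assumption made for illustration.

```rust
use std::collections::HashMap;

/// Hypothetical pre-built update: all expensive work is already done.
struct IvUpdate {
    underlying: String,
    surface: Vec<u32>,   // sorted vols in basis points
    serialized: Vec<u8>, // little-endian encoding of `surface`
}

/// Heavy step, run outside the critical path: sort and serialize.
fn prepare_iv_update(underlying: &str, mut vols_bps: Vec<u32>) -> IvUpdate {
    vols_bps.sort_unstable();
    let serialized: Vec<u8> = vols_bps.iter().flat_map(|v| v.to_le_bytes()).collect();
    IvUpdate {
        underlying: underlying.to_string(),
        surface: vols_bps,
        serialized,
    }
}

#[derive(Default)]
struct SketchState {
    surfaces: HashMap<String, Vec<u32>>,
    serialized: HashMap<String, Vec<u8>>,
}

impl SketchState {
    /// Cheap step on the critical path: two map inserts, nothing else.
    fn apply(&mut self, update: IvUpdate) {
        self.serialized
            .insert(update.underlying.clone(), update.serialized);
        self.surfaces.insert(update.underlying, update.surface);
    }
}
```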
async fn ingest_iv_updates_without_side_effects(&mut self)
async fn ingest_iv_updates_with_side_effects(&mut self, emit_side_effects: bool)
impl UnifiedEngine
pub fn init( config: Config, order_receiver: Receiver<UnifiedEngineRequest>, market_receiver: Receiver<MarketRequest>, event_sender: UnboundedSender<EngineMessage>, read_snapshot: Option<Arc<ArcSwap<EngineSnapshot>>>, shutdown_receiver: Receiver<()>, runtime_settings: EngineRuntimeSettings, database_path: Option<String>, db_auth: Option<DbAuthConfig>, allow_no_database: bool, skip_db_migrations: bool, ) -> Self
Initialize the unified engine with all necessary components
fn publish_snapshot(&self)
Publish a new read snapshot from the current orderbook state.
pub fn set_portfolio_service( &mut self, service: Arc<dyn PortfolioService + Send + Sync>, )
Set the portfolio service for executed state.
This is the canonical source of truth for positions + cash. Must be set for margin checks to use real executed state.
pub fn set_portfolio_cache(&mut self, cache: Arc<PortfolioCache>)
Set the portfolio cache for synchronous fill processing.
pub fn set_risk_account_builder(&mut self, builder: Arc<RiskAccountBuilder>)
Set the RiskAccountBuilder for building risk accounts.
This is the single source of truth for “how to build an Account for PM”. Must be set for margin calculations to work correctly.
pub fn set_standard_account_builder( &mut self, builder: Arc<StandardAccountBuilder>, )
Set the StandardAccountBuilder for building StandardAccounts.
Used for Standard margin mode accounts (Deribit-style linear margin).
pub fn set_liquidation_cache(&mut self, cache: Arc<LiquidationCache>)
Set the LiquidationCache for pre-liquidation order blocking.
When set, orders from accounts in pre-liquidation state will be checked to ensure they don’t increase risk. Risk-reducing orders are still allowed.
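As a rough illustration of that gate, the sketch below treats an order as risk-reducing when it does not grow the absolute position. This definition is an assumption for the example only; the engine's real check is presumably margin-based, and `admit_order` is a hypothetical name.

```rust
/// Admit an order for an account, given its signed position and the order's
/// signed size (positive = buy, negative = sell).
/// Assumption: "does not increase risk" means |position| does not grow.
fn admit_order(in_pre_liquidation: bool, position: i64, order_delta: i64) -> Result<(), String> {
    if !in_pre_liquidation {
        // Accounts outside pre-liquidation are not restricted by this check.
        return Ok(());
    }
    let after = position.saturating_add(order_delta);
    if after.unsigned_abs() <= position.unsigned_abs() {
        Ok(())
    } else {
        Err("order would increase risk while in pre-liquidation".to_string())
    }
}
```

Under this rule an order that flips a position and ends up larger in the opposite direction is rejected, even though its first part reduces exposure.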
pub fn set_db(&mut self, handler: DatabaseHandler)
Replace the engine’s DieselEventHandler.
Used during standby-to-active promotion: the engine starts with a readonly handler (or None), and on promote we swap in a read-write handler so the engine can persist fills, settlements, etc.
pub fn set_journal_writer(&mut self, writer: SharedEngineJournalWriter)
Set the durable engine journal writer. When set and journaling is enabled, commands are persisted before ACK. Also loads recent request_ids into the idempotency cache for fast lookups.
pub fn set_journal_batch_sender(&mut self, sender: JournalBatchSender)
Set the batched journal sender for async PostgreSQL writes. When set, the engine pushes journal entries to a channel instead of blocking on synchronous inserts.
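The channel-based hand-off can be sketched with the standard library's mpsc channel. This is an assumption-laden simplification: `SketchEntry` and `run_writer` are hypothetical, and collecting a batch into a `Vec` stands in for one multi-row insert.

```rust
use std::sync::mpsc::Receiver;

/// Hypothetical journal entry; the real one carries the serialized command.
struct SketchEntry {
    command_id: u64,
}

/// Writer side: drain the channel in batches of up to `max_batch` entries.
/// Returns the command ids grouped per batch once all senders are dropped.
fn run_writer(rx: Receiver<SketchEntry>, max_batch: usize) -> Vec<Vec<u64>> {
    let mut batches = Vec::new();
    // Block for the first entry of each batch, then take what is already queued.
    while let Ok(first) = rx.recv() {
        let mut batch = vec![first.command_id];
        while batch.len() < max_batch {
            match rx.try_recv() {
                Ok(entry) => batch.push(entry.command_id),
                Err(_) => break,
            }
        }
        // Stands in for a single batched database write.
        batches.push(batch);
    }
    batches
}
```

The engine side only calls `send` on the channel, which does not wait for the database, so write latency stays off the producer's path.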
pub fn set_nats_publisher(&mut self, publisher: NatsPublisher)
Set the NATS publisher for real-time command replication.
pub fn set_nats_balance_update_publisher( &mut self, publisher: NatsBalanceUpdatePublisher, )
pub(crate) async fn publish_to_nats(&self, env: &CommandEnvelope)
Publish a command envelope to NATS for standby replication. Call this after apply() succeeds for any command that should be replicated.
pub(crate) async fn publish_balance_updates_to_nats( &self, updates: &[BalanceUpdate], )
pub fn set_mark_price_oracles( &mut self, oracles: HashMap<String, Arc<HyperliquidMarkPriceOracle>>, )
Set the mark price oracles for spot/forward price lookups.
Maps underlying symbol (e.g., “BTC”, “ETH”) to oracle instance. This is the canonical source for spot prices in margin calculations.
pub fn set_mmp_cache(&mut self, cache: Arc<MmpCache>)
Set the MMP cache for market maker protection
pub fn set_tier_cache(&mut self, cache: Arc<TierCache>)
Set the Tier cache for position limit checks.
Also seeds wallet_margin_modes from the cache so the engine fails closed when a wallet’s mode is missing rather than consulting a default.
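Failing closed here means a missing entry is an error rather than a silent default. A minimal sketch, assuming a hypothetical `TierEntry` and `MarginMode` shape for the cache contents:

```rust
use std::collections::HashMap;

#[derive(Debug, Clone, Copy, PartialEq)]
enum MarginMode {
    Standard,
    Portfolio,
}

/// Hypothetical tier cache entry.
struct TierEntry {
    margin_mode: MarginMode,
}

/// Seed the per-wallet mode map from the tier cache contents.
fn seed_margin_modes(tiers: &HashMap<String, TierEntry>) -> HashMap<String, MarginMode> {
    tiers
        .iter()
        .map(|(wallet, entry)| (wallet.clone(), entry.margin_mode))
        .collect()
}

/// Fail closed: an unknown wallet is an error, never a default mode.
fn margin_mode_for(modes: &HashMap<String, MarginMode>, wallet: &str) -> Result<MarginMode, String> {
    modes
        .get(wallet)
        .copied()
        .ok_or_else(|| format!("no margin mode for wallet {wallet}"))
}
```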
pub fn set_ws_event_sender(&mut self, sender: UnboundedSender<EngineMessage>)
Set the direct WS event sender (low-latency WS delivery).
pub fn set_rfq_receiver(&mut self, receiver: Receiver<RfqExecuteRequest>)
Set the RFQ execution receiver channel.
pub fn set_deposit_receiver(&mut self, receiver: Receiver<DepositRequest>)
Set the deposit receiver channel (faucet/admin deposits).
pub fn set_option_deposit_receiver( &mut self, receiver: Receiver<OptionDepositRequest>, )
Set the option deposit receiver channel.
pub fn set_option_withdrawal_receiver( &mut self, receiver: Receiver<OptionWithdrawalRequest>, )
pub fn set_cash_withdrawal_receiver( &mut self, receiver: Receiver<CashWithdrawalRequest>, )
Set the cash withdrawal receiver channel.
pub fn set_liquidation_bonus_receiver( &mut self, receiver: Receiver<LiquidationBonusRequest>, )
Set the liquidation bonus receiver channel.
pub fn set_tier_update_receiver( &mut self, receiver: Receiver<TierUpdateRequest>, )
Set the tier update receiver channel.
pub fn set_hypercore_equity_receiver( &mut self, receiver: Receiver<HypercoreEquityRequest>, )
Set the HyperCore equity update receiver channel.
pub fn set_trading_mode_receiver( &mut self, receiver: Receiver<HashMap<String, TradingModes>>, )
Attach a tokio::sync::watch::Receiver that the engine main
loop will poll for per-underlying trading_mode updates. See
apply_underlying_trading_mode_update for the handler
semantics. Optional — call sites that don’t need live catalog
notifications can omit this and the engine behaves exactly as
before (modes only refresh on recovery / CreateMarket).
pub fn sync_status(&self) -> Arc<SyncStatus>
Get the sync status for readiness checks.
API endpoints should check this before serving requests that depend on engine state (e.g., order placement).
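A readiness gate of this kind might be wired up as in the sketch below. The internals of SyncStatus are not shown on this page, so `SketchSyncStatus` with a single atomic flag is purely an assumption, as is the `place_order` handler.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

/// Hypothetical stand-in for the engine's sync status.
#[derive(Default)]
struct SketchSyncStatus {
    ready: AtomicBool,
}

impl SketchSyncStatus {
    fn mark_ready(&self) {
        self.ready.store(true, Ordering::Release);
    }

    fn is_ready(&self) -> bool {
        self.ready.load(Ordering::Acquire)
    }
}

/// Create a handle that the engine and the API layer can both hold.
fn shared_status() -> Arc<SketchSyncStatus> {
    Arc::new(SketchSyncStatus::default())
}

/// Hypothetical endpoint: refuse engine-dependent work until the engine is ready.
fn place_order(status: &SketchSyncStatus, order: &str) -> Result<String, String> {
    if !status.is_ready() {
        return Err("engine not ready".to_string());
    }
    Ok(format!("accepted {order}"))
}
```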
impl UnifiedEngine
pub async fn get_settlement_price( &self, underlying: &str, expiry_ts: i64, ) -> Option<Decimal>
Get settlement price from oracle for a specific expiry.
pub fn set_reference_price(&mut self, underlying: String, price: Decimal)
Set reference price for testing.
Updates the canonical deps.reference_prices map used by margin checks
and settlement price lookups.
pub fn set_account_cash(&mut self, wallet: &WalletAddress, cash: f64)
Set test cash for an account.
This is a TEST-ONLY function for funding accounts in unit/integration tests. It seeds balance_ledger, which is authoritative for margin checks.
pub async fn expire_instrument( &mut self, symbol: &str, reference_price: Decimal, now_ms: u64, ) -> Result<(), String>
Expire a specific instrument and settle all positions.
pub fn transition_to_pending_settlement( &mut self, symbols: &[String], now_ms: u64, )
Transition instruments to EXPIRED_PENDING_PRICE status (test helper).
pub fn get_pending_settlement_instruments( &self, ) -> Vec<(String, i64, Vec<String>)>
Get all instruments pending settlement (test helper).
pub fn orderbooks_contains_key(&self, symbol: &str) -> bool
Check if orderbooks contains a key (for test utilities)
pub fn get_event_sender(&self) -> Option<UnboundedSender<EngineMessage>>
Get a clone of the event sender (for test utilities)
pub fn get_all_book_snapshots( &self, ) -> HashMap<String, (Vec<(Decimal, Decimal)>, Vec<(Decimal, Decimal)>)>
Get L2 snapshots of all orderbooks. Returns a map of symbol → (bids, asks) where each side is Vec<(price, size)>. Used for cache reconciliation after unclean restart (SIGKILL).
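To show the (bids, asks) shape, the sketch below aggregates resting orders into price levels for one book. It uses integer ticks instead of Decimal to stay dependency-free, and the ordering (bids high to low, asks low to high) is an assumption rather than something this page states.

```rust
use std::collections::BTreeMap;

#[derive(Clone, Copy)]
enum Side {
    Bid,
    Ask,
}

/// Hypothetical resting order with price and size in integer ticks.
struct RestingOrder {
    side: Side,
    price: i64,
    size: i64,
}

type Levels = Vec<(i64, i64)>;

/// Aggregate orders into L2 levels: (bids, asks), each as (price, size).
fn l2_snapshot(orders: &[RestingOrder]) -> (Levels, Levels) {
    let mut bids: BTreeMap<i64, i64> = BTreeMap::new();
    let mut asks: BTreeMap<i64, i64> = BTreeMap::new();
    for order in orders {
        let side = match order.side {
            Side::Bid => &mut bids,
            Side::Ask => &mut asks,
        };
        *side.entry(order.price).or_insert(0) += order.size;
    }
    // Best bid is the highest price, best ask is the lowest.
    (bids.into_iter().rev().collect(), asks.into_iter().collect())
}
```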
pub fn insert_orderbook(&mut self, symbol: String, orderbook: OrderBook)
Insert an orderbook directly (for test utilities)
pub fn track_expiry(&mut self, symbol: String, expiry: u64)
Track expiry for a symbol (for test utilities)
pub fn persist_test_orderbook(&self, symbol: &str, parsed_symbol: &ParsedSymbol)
Persist an orderbook to database if handler is available (for test utilities)
Trait Implementations§
impl DurableJournaling for UnifiedEngine
async fn process_order_journaled(&mut self, request: UnifiedEngineRequest)
impl OrderAdmission for UnifiedEngine
fn validate_order( &self, order_info: &OrderInfo, wallet: &WalletAddress, ) -> Result<(), String>
fn check_order_restrictions( &self, order_info: &OrderInfo, wallet: &WalletAddress, ) -> Result<(), String>
impl OrderExecution for UnifiedEngine
async fn allocate_and_ack( &mut self, request: &UnifiedEngineRequest, order_info: &OrderInfo, wallet: &WalletAddress, timestamp: u64, ) -> AllocatedOrder
async fn execute_matching( &mut self, order_id: u64, order_info: &OrderInfo, wallet: &WalletAddress, timestamp: u64, ) -> MatchingResult
async fn finalize_order( &mut self, request: &UnifiedEngineRequest, order_id: u64, order_info: &OrderInfo, wallet: &WalletAddress, timestamp: u64, result: &MatchingResult, )
impl ReplayRecovery for UnifiedEngine
fn load_markets_from_db(&mut self)
fn load_orderbook_snapshots(&mut self)
fn replay_commands_from_journal(&mut self)
fn check_restored_orderbooks_for_crosses(&self)
fn run_post_startup_reconciliation(&mut self)
Auto Trait Implementations§
impl Freeze for UnifiedEngine
impl !RefUnwindSafe for UnifiedEngine
impl Send for UnifiedEngine
impl Sync for UnifiedEngine
impl Unpin for UnifiedEngine
impl UnsafeUnpin for UnifiedEngine
impl !UnwindSafe for UnifiedEngine
Blanket Implementations§
impl<T> AggregateExpressionMethods for T
fn aggregate_distinct(self) -> Self::Output where Self: DistinctDsl
DISTINCT modifier for aggregate functions
fn aggregate_all(self) -> Self::Output where Self: AllDsl
ALL modifier for aggregate functions
fn aggregate_filter<P>(self, f: P) -> Self::Output where P: AsExpression<Bool>, Self: FilterDsl<<P as AsExpression<Bool>>::Expression>
fn aggregate_order<O>(self, o: O) -> Self::Output where Self: OrderAggregateDsl<O>
impl<T> BorrowMut<T> for T where T: ?Sized
fn borrow_mut(&mut self) -> &mut T
impl<T> Conv for T
impl<T> Downcast for T where T: Any
fn into_any(self: Box<T>) -> Box<dyn Any>
Converts Box<dyn Trait> (where Trait: Downcast) to Box<dyn Any>, which can then be downcast into Box<dyn ConcreteType> where ConcreteType implements Trait.
fn into_any_rc(self: Rc<T>) -> Rc<dyn Any>
Converts Rc<Trait> (where Trait: Downcast) to Rc<Any>, which can then be further downcast into Rc<ConcreteType> where ConcreteType implements Trait.
fn as_any(&self) -> &(dyn Any + 'static)
Converts &Trait (where Trait: Downcast) to &Any. This is needed since Rust cannot generate &Any’s vtable from &Trait’s.
fn as_any_mut(&mut self) -> &mut (dyn Any + 'static)
Converts &mut Trait (where Trait: Downcast) to &Any. This is needed since Rust cannot generate &mut Any’s vtable from &mut Trait’s.
impl<T> DowncastSend for T
impl<T> DowncastSync for T
impl<T> FmtForward for T
fn fmt_binary(self) -> FmtBinary<Self> where Self: Binary
Causes self to use its Binary implementation when Debug-formatted.
fn fmt_display(self) -> FmtDisplay<Self> where Self: Display
Causes self to use its Display implementation when Debug-formatted.
fn fmt_lower_exp(self) -> FmtLowerExp<Self> where Self: LowerExp
Causes self to use its LowerExp implementation when Debug-formatted.
fn fmt_lower_hex(self) -> FmtLowerHex<Self> where Self: LowerHex
Causes self to use its LowerHex implementation when Debug-formatted.
fn fmt_octal(self) -> FmtOctal<Self> where Self: Octal
Causes self to use its Octal implementation when Debug-formatted.
fn fmt_pointer(self) -> FmtPointer<Self> where Self: Pointer
Causes self to use its Pointer implementation when Debug-formatted.
fn fmt_upper_exp(self) -> FmtUpperExp<Self> where Self: UpperExp
Causes self to use its UpperExp implementation when Debug-formatted.
fn fmt_upper_hex(self) -> FmtUpperHex<Self> where Self: UpperHex
Causes self to use its UpperHex implementation when Debug-formatted.
fn fmt_list(self) -> FmtList<Self> where for<'a> &'a Self: IntoIterator
impl<T> FutureExt for T
fn with_context(self, otel_cx: Context) -> WithContext<Self>
fn with_current_context(self) -> WithContext<Self>
impl<T> Instrument for T
fn instrument(self, span: Span) -> Instrumented<Self>
fn in_current_span(self) -> Instrumented<Self>
impl<T> Instrument for T
fn instrument(self, span: Span) -> Instrumented<Self>
fn in_current_span(self) -> Instrumented<Self>
impl<T> IntoEither for T
fn into_either(self, into_left: bool) -> Either<Self, Self>
Converts self into a Left variant of Either<Self, Self> if into_left is true. Converts self into a Right variant of Either<Self, Self> otherwise.
fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
Converts self into a Left variant of Either<Self, Self> if into_left(&self) returns true. Converts self into a Right variant of Either<Self, Self> otherwise.
impl<T> IntoRequest<T> for T
fn into_request(self) -> Request<T>
Wrap the input message T in a tonic::Request
impl<T> IntoSql for T
impl<T> Pipe for T where T: ?Sized
fn pipe<R>(self, func: impl FnOnce(Self) -> R) -> R where Self: Sized
fn pipe_ref<'a, R>(&'a self, func: impl FnOnce(&'a Self) -> R) -> R where R: 'a
Borrows self and passes that borrow into the pipe function.
fn pipe_ref_mut<'a, R>(&'a mut self, func: impl FnOnce(&'a mut Self) -> R) -> R where R: 'a
Mutably borrows self and passes that borrow into the pipe function.
fn pipe_borrow<'a, B, R>(&'a self, func: impl FnOnce(&'a B) -> R) -> R
fn pipe_borrow_mut<'a, B, R>( &'a mut self, func: impl FnOnce(&'a mut B) -> R, ) -> R
fn pipe_as_ref<'a, U, R>(&'a self, func: impl FnOnce(&'a U) -> R) -> R
Borrows self, then passes self.as_ref() into the pipe function.
fn pipe_as_mut<'a, U, R>(&'a mut self, func: impl FnOnce(&'a mut U) -> R) -> R
Mutably borrows self, then passes self.as_mut() into the pipe function.
fn pipe_deref<'a, T, R>(&'a self, func: impl FnOnce(&'a T) -> R) -> R
Borrows self, then passes self.deref() into the pipe function.
impl<T> Pointable for T
impl<T> PolicyExt for T where T: ?Sized
impl<T, Conn> RunQueryDsl<Conn> for T
fn execute<'conn, 'query>( self, conn: &'conn mut Conn, ) -> <Conn as AsyncConnectionCore>::ExecuteFuture<'conn, 'query> where Conn: AsyncConnectionCore + Send, Self: ExecuteDsl<Conn> + 'query
fn load<'query, 'conn, U>( self, conn: &'conn mut Conn, ) -> AndThen<Self::LoadFuture<'conn>, TryCollect<Self::Stream<'conn>, Vec<U>>> where U: Send, Conn: AsyncConnectionCore, Self: LoadQuery<'query, Conn, U> + 'query
fn load_stream<'conn, 'query, U>( self, conn: &'conn mut Conn, ) -> Self::LoadFuture<'conn> where Conn: AsyncConnectionCore, U: 'conn, Self: LoadQuery<'query, Conn, U> + 'query
Executes the given query, returning a Stream with the returned rows.
fn get_result<'query, 'conn, U>( self, conn: &'conn mut Conn, ) -> AndThen<Self::LoadFuture<'conn>, LoadNext<Pin<Box<Self::Stream<'conn>>>>> where U: Send + 'conn, Conn: AsyncConnectionCore, Self: LoadQuery<'query, Conn, U> + 'query
fn get_results<'query, 'conn, U>( self, conn: &'conn mut Conn, ) -> AndThen<Self::LoadFuture<'conn>, TryCollect<Self::Stream<'conn>, Vec<U>>> where U: Send, Conn: AsyncConnectionCore, Self: LoadQuery<'query, Conn, U> + 'query
Runs the command, returning a Vec with the affected rows.
impl<T> Tap for T
§fn tap_borrow<B>(self, func: impl FnOnce(&B)) -> Self
fn tap_borrow<B>(self, func: impl FnOnce(&B)) -> Self
Immutable access to the Borrow<B> of a value.
fn tap_borrow_mut<B>(self, func: impl FnOnce(&mut B)) -> Self
Mutable access to the BorrowMut<B> of a value.
fn tap_ref<R>(self, func: impl FnOnce(&R)) -> Self
Immutable access to the AsRef<R> view of a value.
fn tap_ref_mut<R>(self, func: impl FnOnce(&mut R)) -> Self
Mutable access to the AsMut<R> view of a value.
fn tap_deref<T>(self, func: impl FnOnce(&T)) -> Self
Immutable access to the Deref::Target of a value.
fn tap_deref_mut<T>(self, func: impl FnOnce(&mut T)) -> Self
Mutable access to the Deref::Target of a value.
fn tap_dbg(self, func: impl FnOnce(&Self)) -> Self
Calls .tap() only in debug builds, and is erased in release builds.
fn tap_mut_dbg(self, func: impl FnOnce(&mut Self)) -> Self
Calls .tap_mut() only in debug builds, and is erased in release builds.
fn tap_borrow_dbg<B>(self, func: impl FnOnce(&B)) -> Self
Calls .tap_borrow() only in debug builds, and is erased in release builds.
fn tap_borrow_mut_dbg<B>(self, func: impl FnOnce(&mut B)) -> Self
Calls .tap_borrow_mut() only in debug builds, and is erased in release builds.
fn tap_ref_dbg<R>(self, func: impl FnOnce(&R)) -> Self
Calls .tap_ref() only in debug builds, and is erased in release builds.
fn tap_ref_mut_dbg<R>(self, func: impl FnOnce(&mut R)) -> Self
Calls .tap_ref_mut() only in debug builds, and is erased in release builds.
fn tap_deref_dbg<T>(self, func: impl FnOnce(&T)) -> Self
Calls .tap_deref() only in debug builds, and is erased in release builds.
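The Tap methods listed above all follow one pattern: run a closure on some view of the value, then hand the value back unchanged. The sketch below illustrates that pattern with a locally defined trait, TapSketch, which is a hypothetical stand-in written for this example and not the tap crate itself. The where bounds and the cfg!(debug_assertions) check are assumptions about how such methods could be written, and observed_len is an illustrative helper.

```rust
use std::borrow::Borrow;

/// Hypothetical local stand-in for part of a `Tap`-style trait.
/// Not the real crate; written only to illustrate the pattern.
trait TapSketch: Sized {
    /// Run `func` on the `Borrow<B>` view of `self`, then return `self`.
    fn tap_borrow<B: ?Sized>(self, func: impl FnOnce(&B)) -> Self
    where
        Self: Borrow<B>,
    {
        func(Borrow::<B>::borrow(&self));
        self
    }

    /// Run `func` on the `AsRef<R>` view of `self`, then return `self`.
    fn tap_ref<R: ?Sized>(self, func: impl FnOnce(&R)) -> Self
    where
        Self: AsRef<R>,
    {
        func(AsRef::<R>::as_ref(&self));
        self
    }

    /// Run `func` only when debug assertions are enabled (assumed
    /// mechanism for the "erased in release builds" behaviour).
    fn tap_dbg(self, func: impl FnOnce(&Self)) -> Self {
        if cfg!(debug_assertions) {
            func(&self);
        }
        self
    }
}

// Blanket implementation, mirroring `impl<T> Tap for T`.
impl<T> TapSketch for T {}

/// Passes a `String` through `tap_ref` and reports the length the
/// closure observed through the `AsRef<str>` view.
fn observed_len(input: String) -> (String, usize) {
    let mut seen = 0;
    let out = input.tap_ref(|s: &str| seen = s.len());
    (out, seen)
}

fn main() {
    let (value, len) = observed_len(String::from("engine"));
    println!("{value} has {len} bytes");

    // The closure's parameter type selects which `Borrow` view is used.
    let v = vec![3_i32, 1, 2].tap_borrow(|s: &[i32]| println!("slice: {s:?}"));
    println!("{v:?}");

    // Skipped entirely when compiled without debug assertions.
    let n = 5_i32.tap_dbg(|x| println!("debug-only peek: {x}"));
    println!("{n}");
}
```

Annotating the closure parameter (for example `|s: &str|`) is what lets the compiler pick the view type, since a type may implement Borrow or AsRef for several targets.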