Skip to main content

hypercall/rsm/unified_engine/
builder.rs

1//! UnifiedEngine builder and construction wiring.
2
3use super::*;
4
/// Builder for creating the unified engine.
///
/// Collects configuration and optional dependencies through the `with_*`
/// methods. `build` / `build_with_info` then construct the `UnifiedEngine`
/// and install whatever was supplied.
pub struct UnifiedEngineBuilder {
    /// Engine configuration; cloned into `UnifiedEngine::init` at build time.
    config: Config,
    /// Capacity of the bounded order-request channel created at build time.
    order_buffer_size: usize,
    /// Database location forwarded to `UnifiedEngine::init`; `None` by default.
    database_path: Option<String>,
    /// Database auth configuration forwarded to `UnifiedEngine::init`.
    db_auth: Option<hypercall_db_diesel::DbAuthConfig>,
    // When true and no vol oracle is configured, the builder installs the
    // test span-margin service instead of the production one.
    allow_no_database: bool,  // For tests only
    skip_db_migrations: bool, // Standby mode: connect but don't run DDL
    /// Cache used for market maker protection.
    mmp_cache: Option<Arc<crate::read_cache::mmp::MmpCache>>,
    /// Cache used for position limit checks.
    tier_cache: Option<Arc<crate::read_cache::tier::TierCache>>,
    /// Service providing executed state (positions + cash).
    portfolio_service: Option<Arc<dyn PortfolioService + Send + Sync>>,
    /// Cache for synchronous fill processing in the engine hot path.
    portfolio_cache: Option<Arc<crate::read_cache::portfolio::PortfolioCache>>,
    /// Cache used for spot price lookups in margin calculations.
    greeks_cache: Option<Arc<GreeksCache>>,
    /// Mark price oracles keyed by underlying symbol (e.g. "BTC", "ETH").
    mark_price_oracles: HashMap<String, Arc<HyperliquidMarkPriceOracle>>,
    /// Builder for portfolio-margin risk accounts.
    risk_account_builder:
        Option<Arc<crate::rsm::portfolio_margin::risk_account_builder::RiskAccountBuilder>>,
    /// Builder for Standard margin mode accounts.
    standard_account_builder: Option<Arc<StandardAccountBuilder>>,
    /// Cache used for pre-liquidation order blocking.
    liquidation_cache: Option<Arc<crate::liquidator::LiquidationCache>>,
    /// Volatility oracle used to construct the engine's margin manager.
    risk_vol_oracle: Option<crate::vol_oracle::SharedVolOracle>,
    /// `(wallet, cash)` pairs applied to the engine via `set_account_cash`.
    #[cfg(any(test, feature = "test-utils"))]
    test_funded_accounts: Vec<(WalletAddress, f64)>,
    /// Direct WebSocket event sender (low-latency WS delivery).
    ws_event_sender: Option<mpsc::UnboundedSender<EngineMessage>>,
    /// Durable engine journal writer. Building panics unless this or
    /// `journal_batch_sender` is set.
    journal_writer: Option<crate::journal::SharedEngineJournalWriter>,
    /// Batched journal sender for async PostgreSQL writes.
    journal_batch_sender: Option<crate::journal::JournalBatchSender>,
    /// NATS publisher installed on the engine when present.
    nats_publisher: Option<crate::nats::NatsPublisher>,
    /// NATS balance-update publisher installed on the engine when present.
    nats_balance_update_publisher: Option<crate::nats::NatsBalanceUpdatePublisher>,
    /// ArcSwap snapshot handle for lock-free API reads, forwarded to `init`.
    read_snapshot: Option<Arc<ArcSwap<EngineSnapshot>>>,
    /// Runtime settings; the type's default is used when unset.
    runtime_settings: Option<EngineRuntimeSettings>,
    /// Balance ledger copied into the engine context at build time.
    startup_balance_ledger: Option<crate::rsm::ledger::BalanceLedger>,
}
35
/// Result of building the engine with additional info.
///
/// Returned by `UnifiedEngineBuilder::build_with_info`. Each `*_tx` field is
/// the sending half of a channel whose receiving half was handed to `engine`.
pub struct UnifiedEngineBuilderResult {
    /// The constructed engine with all builder-supplied dependencies applied.
    pub engine: UnifiedEngine,
    /// Sender for order requests (capacity set by the builder's buffer size).
    pub order_tx: mpsc::Sender<UnifiedEngineRequest>,
    /// Sender for market requests.
    pub market_tx: mpsc::Sender<MarketRequest>,
    /// Sender for margin-mode update requests.
    pub margin_mode_tx: mpsc::Sender<MarginModeUpdateRequest>,
    /// Sender for agent authorization requests.
    pub agent_auth_tx: mpsc::Sender<AgentAuthRequest>,
    /// Sender for nonce check requests.
    pub nonce_check_tx: mpsc::Sender<NonceCheckRequest>,
    /// Sender for PM settlement admin requests.
    pub pm_settlement_admin_tx: mpsc::Sender<hypercall_runtime_api::PmSettlementAdminRequest>,
    /// Sender for engine quiesce requests.
    pub quiesce_tx: mpsc::Sender<EngineQuiesceRequest>,
    /// Value of the engine's `l2_update_seq` counter read at the end of the build.
    pub last_l2_seq: i64,
    /// Handle returned by `engine.sync_status()` at build time.
    pub sync_status: Arc<SyncStatus>,
}
49
50impl UnifiedEngineBuilder {
51    pub fn new(config: Config) -> Self {
52        Self {
53            config,
54            order_buffer_size: DEFAULT_ORDER_BUFFER_SIZE,
55            database_path: None, // Default to no database unless explicitly set
56            db_auth: None,
57            allow_no_database: false, // By default, database is required
58            skip_db_migrations: false,
59            mmp_cache: None,
60            tier_cache: None,
61            portfolio_service: None,
62            portfolio_cache: None,
63            greeks_cache: None,
64            mark_price_oracles: HashMap::new(),
65            risk_account_builder: None,
66            standard_account_builder: None,
67            liquidation_cache: None,
68            risk_vol_oracle: None,
69            #[cfg(any(test, feature = "test-utils"))]
70            test_funded_accounts: Vec::new(),
71            ws_event_sender: None,
72            journal_writer: None,
73            journal_batch_sender: None,
74            nats_publisher: None,
75            nats_balance_update_publisher: None,
76            read_snapshot: None,
77            runtime_settings: None,
78            startup_balance_ledger: None,
79        }
80    }
81
82    pub fn with_runtime_settings(mut self, settings: EngineRuntimeSettings) -> Self {
83        self.runtime_settings = Some(settings);
84        self
85    }
86
87    /// Seed the engine-owned balance ledger at startup.
88    ///
89    /// Runtime readers consume balances through EngineSnapshot publication, not
90    /// through this builder input after construction.
91    pub fn with_startup_balance_ledger(
92        mut self,
93        balance_ledger: crate::rsm::ledger::BalanceLedger,
94    ) -> Self {
95        self.startup_balance_ledger = Some(balance_ledger);
96        self
97    }
98
99    /// Set the ArcSwap snapshot handle for lock-free API reads.
100    /// If not set, the engine creates its own (tests).
101    pub fn with_read_snapshot(mut self, snapshot: Arc<ArcSwap<EngineSnapshot>>) -> Self {
102        self.read_snapshot = Some(snapshot);
103        self
104    }
105
106    /// Set the durable engine journal writer for restart-safe ACK.
107    pub fn with_journal_writer(
108        mut self,
109        writer: crate::journal::SharedEngineJournalWriter,
110    ) -> Self {
111        self.journal_writer = Some(writer);
112        self
113    }
114
115    /// Set the batched journal sender for async PostgreSQL writes.
116    pub fn with_journal_batch_sender(mut self, sender: crate::journal::JournalBatchSender) -> Self {
117        self.journal_batch_sender = Some(sender);
118        self
119    }
120
121    pub fn with_nats_publisher(mut self, publisher: crate::nats::NatsPublisher) -> Self {
122        self.nats_publisher = Some(publisher);
123        self
124    }
125
126    pub fn with_nats_balance_update_publisher(
127        mut self,
128        publisher: crate::nats::NatsBalanceUpdatePublisher,
129    ) -> Self {
130        self.nats_balance_update_publisher = Some(publisher);
131        self
132    }
133
134    pub fn with_order_buffer_size(mut self, size: usize) -> Self {
135        self.order_buffer_size = size;
136        self
137    }
138
139    pub fn with_vol_oracle(mut self, oracle: crate::vol_oracle::SharedVolOracle) -> Self {
140        self.risk_vol_oracle = Some(oracle);
141        self
142    }
143
144    pub fn with_risk_vol_oracle(self, oracle: crate::vol_oracle::SharedVolOracle) -> Self {
145        self.with_vol_oracle(oracle)
146    }
147
148    pub fn with_database(mut self, path: &str) -> Self {
149        self.database_path = Some(path.to_string());
150        self
151    }
152
153    pub fn with_db_auth(mut self, auth: hypercall_db_diesel::DbAuthConfig) -> Self {
154        self.db_auth = Some(auth);
155        self
156    }
157
158    pub fn without_database(mut self) -> Self {
159        self.database_path = None;
160        self
161    }
162
163    /// Connect to the database but skip DDL (migrations).
164    ///
165    /// Used in standby mode: the engine can read max IDs from a readonly
166    /// replica but cannot run migrations. On promote, a read-write
167    /// DieselEventHandler is swapped in via `set_diesel_handler`.
168    pub fn with_skip_db_migrations(mut self) -> Self {
169        self.skip_db_migrations = true;
170        self
171    }
172
173    /// Allow running without database (for tests only - DO NOT USE IN PRODUCTION)
174    #[cfg(any(test, feature = "test-utils"))]
175    pub fn allow_no_database_for_tests(mut self) -> Self {
176        self.allow_no_database = true;
177        self
178    }
179
180    /// Use an in-memory journal for testing.
181    /// Provides real idempotency checks and event capture without requiring a database.
182    /// This is the preferred way to test journaling logic.
183    pub fn with_mock_journal(mut self) -> Self {
184        let mock = Arc::new(crate::journal::InMemoryJournalWriter::new());
185        self.journal_writer = Some(mock);
186        self.allow_no_database = true;
187        self
188    }
189
190    /// Set the MmpCache for market maker protection
191    pub fn with_mmp_cache(mut self, cache: Arc<crate::read_cache::mmp::MmpCache>) -> Self {
192        self.mmp_cache = Some(cache);
193        self
194    }
195
196    /// Set the TierCache for position limit checks
197    pub fn with_tier_cache(mut self, cache: Arc<crate::read_cache::tier::TierCache>) -> Self {
198        self.tier_cache = Some(cache);
199        self
200    }
201
202    /// Set the PortfolioService for executed state (positions + cash).
203    ///
204    /// This is the canonical source of truth. Margin checks will read
205    /// executed positions from this service instead of the stale self.accounts map.
206    pub fn with_portfolio_service(
207        mut self,
208        service: Arc<dyn PortfolioService + Send + Sync>,
209    ) -> Self {
210        self.portfolio_service = Some(service);
211        self
212    }
213
214    /// Set the portfolio cache for synchronous fill processing in the engine hot path.
215    pub fn with_portfolio_cache(
216        mut self,
217        cache: Arc<crate::read_cache::portfolio::PortfolioCache>,
218    ) -> Self {
219        self.portfolio_cache = Some(cache);
220        self
221    }
222
223    /// Set the GreeksCache for spot price lookups in margin calculations.
224    /// This replaces the legacy reference_prices mechanism for live trading.
225    pub fn with_greeks_cache(mut self, cache: Arc<GreeksCache>) -> Self {
226        self.greeks_cache = Some(cache);
227        self
228    }
229
230    /// Set the mark price oracles for spot/forward price lookups.
231    ///
232    /// Maps underlying symbol (e.g., "BTC", "ETH") to oracle instance.
233    /// This is the canonical source for spot prices in margin calculations.
234    pub fn with_mark_price_oracles(
235        mut self,
236        oracles: HashMap<String, Arc<HyperliquidMarkPriceOracle>>,
237    ) -> Self {
238        self.mark_price_oracles = oracles;
239        self
240    }
241
242    /// Set the RiskAccountBuilder for building risk accounts.
243    ///
244    /// This is the single source of truth for "how to build an Account for PM".
245    /// Required for margin calculations to work correctly.
246    pub fn with_risk_account_builder(
247        mut self,
248        builder: Arc<crate::rsm::portfolio_margin::risk_account_builder::RiskAccountBuilder>,
249    ) -> Self {
250        self.risk_account_builder = Some(builder);
251        self
252    }
253
254    /// Set the StandardAccountBuilder for building StandardAccounts.
255    ///
256    /// Used for Standard margin mode accounts (Deribit-style linear margin).
257    /// If not set, Standard mode margin checks will use a fallback.
258    pub fn with_standard_account_builder(mut self, builder: Arc<StandardAccountBuilder>) -> Self {
259        self.standard_account_builder = Some(builder);
260        self
261    }
262
263    /// Set the LiquidationCache for pre-liquidation order blocking.
264    ///
265    /// When set, orders from accounts in pre-liquidation state will be checked
266    /// to ensure they don't increase risk. Risk-reducing orders are still allowed.
267    pub fn with_liquidation_cache(
268        mut self,
269        cache: Arc<crate::liquidator::LiquidationCache>,
270    ) -> Self {
271        self.liquidation_cache = Some(cache);
272        self
273    }
274
275    /// Set the direct WS event sender (low-latency WS delivery).
276    pub fn with_ws_event_sender(mut self, sender: mpsc::UnboundedSender<EngineMessage>) -> Self {
277        self.ws_event_sender = Some(sender);
278        self
279    }
280
281    /// Pre-fund test accounts (for integration tests only - DO NOT USE IN PRODUCTION)
282    /// Adds multiple accounts with the specified cash balance
283    #[cfg(any(test, feature = "test-utils"))]
284    pub fn with_funded_test_accounts(mut self, accounts: Vec<(WalletAddress, f64)>) -> Self {
285        self.test_funded_accounts = accounts;
286        self
287    }
288
289    /// Build the engine.
290    ///
291    /// The event_sender is used for direct event publishing (OrderUpdate, TradeMessage, etc.).
292    /// L2Update events are published via the journal → outbox path.
293    ///
294    /// The event_sender is ALWAYS used because:
295    /// - OrderUpdate events (cancels, fills) must be delivered directly to users via WebSocket
296    /// - Expiry-triggered cancels happen outside the journaled order path
297    /// - The outbox only handles L2Update events (public market data)
298    pub fn build(
299        self,
300        event_sender: mpsc::UnboundedSender<EngineMessage>,
301        shutdown_sender: broadcast::Sender<()>,
302    ) -> (
303        UnifiedEngine,
304        mpsc::Sender<UnifiedEngineRequest>,
305        mpsc::Sender<MarketRequest>,
306    ) {
307        // Validate journaling is configured (mandatory)
308        if self.journal_writer.is_none() && self.journal_batch_sender.is_none() {
309            panic!(
310                "Journaling is mandatory: either journal_writer or journal_batch_sender must be configured. \
311                Use .with_mock_journal() for tests."
312            );
313        }
314
315        let (order_tx, order_rx) = mpsc::channel(self.order_buffer_size);
316        let (market_tx, market_rx) = mpsc::channel(MARKET_REQUEST_BUFFER_SIZE);
317        let (_quiesce_tx, quiesce_rx) = mpsc::channel(8);
318        let shutdown_rx = shutdown_sender.subscribe();
319
320        let mut engine = UnifiedEngine::init(
321            self.config.clone(),
322            order_rx,
323            market_rx,
324            event_sender,
325            self.read_snapshot.clone(),
326            shutdown_rx,
327            self.runtime_settings.clone().unwrap_or_default(),
328            self.database_path.clone(),
329            self.db_auth.clone(),
330            self.allow_no_database,
331            self.skip_db_migrations,
332        );
333        engine.quiesce_receiver = Some(quiesce_rx);
334        self.apply_common_config(&mut engine);
335
336        (engine, order_tx, market_tx)
337    }
338
339    /// Build the engine with additional startup info.
340    pub fn build_with_info(
341        self,
342        event_sender: mpsc::UnboundedSender<EngineMessage>,
343        shutdown_sender: broadcast::Sender<()>,
344    ) -> UnifiedEngineBuilderResult {
345        // Validate journaling is configured (mandatory)
346        if self.journal_writer.is_none() && self.journal_batch_sender.is_none() {
347            panic!(
348                "Journaling is mandatory: either journal_writer or journal_batch_sender must be configured. \
349                Use .with_mock_journal() for tests."
350            );
351        }
352
353        let (order_tx, order_rx) = mpsc::channel(self.order_buffer_size);
354        let (market_tx, market_rx) = mpsc::channel(MARKET_REQUEST_BUFFER_SIZE);
355        let (margin_mode_tx, margin_mode_rx) = mpsc::channel(100);
356        let (agent_auth_tx, agent_auth_rx) = mpsc::channel(100);
357        let (nonce_check_tx, nonce_check_rx) = mpsc::channel(100);
358        let (quiesce_tx, quiesce_rx) = mpsc::channel(8);
359        let (pm_settlement_admin_tx, pm_settlement_admin_rx) = mpsc::channel(64);
360        let shutdown_rx = shutdown_sender.subscribe();
361
362        let mut engine = UnifiedEngine::init(
363            self.config.clone(),
364            order_rx,
365            market_rx,
366            event_sender,
367            self.read_snapshot.clone(),
368            shutdown_rx,
369            self.runtime_settings.clone().unwrap_or_default(),
370            self.database_path.clone(),
371            self.db_auth.clone(),
372            self.allow_no_database,
373            self.skip_db_migrations,
374        );
375        engine.margin_mode_receiver = Some(margin_mode_rx);
376        engine.agent_auth_receiver = Some(agent_auth_rx);
377        engine.nonce_check_receiver = Some(nonce_check_rx);
378        engine.pm_settlement_admin_receiver = Some(pm_settlement_admin_rx);
379        engine.quiesce_receiver = Some(quiesce_rx);
380
381        self.apply_common_config(&mut engine);
382
383        let last_l2_seq = engine.ctx.l2_update_seq.load(Ordering::SeqCst);
384        let sync_status = engine.sync_status();
385
386        UnifiedEngineBuilderResult {
387            engine,
388            order_tx,
389            market_tx,
390            margin_mode_tx,
391            agent_auth_tx,
392            nonce_check_tx,
393            pm_settlement_admin_tx,
394            quiesce_tx,
395            last_l2_seq,
396            sync_status,
397        }
398    }
399
400    fn apply_common_config(&self, engine: &mut UnifiedEngine) {
401        if let Some(oracle) = self.risk_vol_oracle.clone() {
402            // Only wrap in EngineVolOracle (fail-closed, command-only IV) when
403            // the oracle supports surface snapshots. Test oracles that only
404            // implement get_iv() are used directly.
405            let has_snapshots = oracle.supports_surface_snapshots();
406            let margin_oracle: crate::vol_oracle::SharedVolOracle = if has_snapshots {
407                engine.external_vol_oracle = Some(oracle.clone());
408                std::sync::Arc::new(crate::rsm::engine_vol_oracle::EngineVolOracle::new(
409                    engine.engine_iv_surfaces.clone(),
410                ))
411            } else {
412                oracle
413            };
414            engine.margin_manager = crate::rsm::margin_manager::MarginManager::new_with_vol_oracle(
415                &self.config,
416                Some(margin_oracle),
417            );
418        } else if self.allow_no_database {
419            // Test builders that opt out of database wiring still need a deterministic
420            // risk-vol source so order-admission tests don't hit the production fail-closed path.
421            engine.margin_manager.span_margin_service =
422                crate::rsm::margin_service::SpanMarginService::new_for_tests(self.config.clone());
423        }
424
425        if let Some(cache) = self.mmp_cache.clone() {
426            engine.set_mmp_cache(cache);
427        }
428
429        if let Some(cache) = self.tier_cache.clone() {
430            engine.set_tier_cache(cache);
431        }
432
433        if let Some(service) = self.portfolio_service.clone() {
434            engine.set_portfolio_service(service);
435        }
436
437        if let Some(cache) = self.portfolio_cache.clone() {
438            engine.set_portfolio_cache(cache);
439        }
440
441        if let Some(balance_ledger) = &self.startup_balance_ledger {
442            engine.ctx.balance_ledger = balance_ledger.clone();
443            tracing::info!(
444                balance_wallet_count = engine.ctx.balance_ledger.len(),
445                "Installed startup balance_ledger into engine context"
446            );
447        }
448
449        if let Some(builder) = self.risk_account_builder.clone() {
450            engine.set_risk_account_builder(builder);
451        }
452
453        if let Some(builder) = self.standard_account_builder.clone() {
454            engine.set_standard_account_builder(builder);
455        }
456
457        if let Some(cache) = self.liquidation_cache.clone() {
458            engine.set_liquidation_cache(cache);
459        }
460
461        if let Some(cache) = self.greeks_cache.clone() {
462            engine.ctx.deps.greeks_cache = Some(cache);
463        }
464
465        if !self.mark_price_oracles.is_empty() {
466            engine.set_mark_price_oracles(self.mark_price_oracles.clone());
467            engine
468                .expiry_manager
469                .register_settlements_for_existing_instruments(
470                    &engine.ctx.deps,
471                    &engine.ctx.orderbooks,
472                );
473        }
474
475        #[cfg(any(test, feature = "test-utils"))]
476        for (wallet, cash) in &self.test_funded_accounts {
477            engine.set_account_cash(wallet, *cash);
478        }
479
480        if let Some(ws_tx) = self.ws_event_sender.clone() {
481            engine.set_ws_event_sender(ws_tx);
482        }
483
484        if let Some(writer) = self.journal_writer.clone() {
485            engine.set_journal_writer(writer);
486        }
487
488        if let Some(sender) = self.journal_batch_sender.clone() {
489            engine.set_journal_batch_sender(sender);
490        }
491
492        if let Some(publisher) = self.nats_publisher.clone() {
493            engine.set_nats_publisher(publisher);
494        }
495        if let Some(publisher) = self.nats_balance_update_publisher.clone() {
496            engine.set_nats_balance_update_publisher(publisher);
497        }
498    }
499}