1use super::*;
4
5pub struct UnifiedEngineBuilder {
7 config: Config,
8 order_buffer_size: usize,
9 database_path: Option<String>,
10 db_auth: Option<hypercall_db_diesel::DbAuthConfig>,
11 allow_no_database: bool, skip_db_migrations: bool, mmp_cache: Option<Arc<crate::read_cache::mmp::MmpCache>>,
14 tier_cache: Option<Arc<crate::read_cache::tier::TierCache>>,
15 portfolio_service: Option<Arc<dyn PortfolioService + Send + Sync>>,
16 portfolio_cache: Option<Arc<crate::read_cache::portfolio::PortfolioCache>>,
17 greeks_cache: Option<Arc<GreeksCache>>,
18 mark_price_oracles: HashMap<String, Arc<HyperliquidMarkPriceOracle>>,
19 risk_account_builder:
20 Option<Arc<crate::rsm::portfolio_margin::risk_account_builder::RiskAccountBuilder>>,
21 standard_account_builder: Option<Arc<StandardAccountBuilder>>,
22 liquidation_cache: Option<Arc<crate::liquidator::LiquidationCache>>,
23 risk_vol_oracle: Option<crate::vol_oracle::SharedVolOracle>,
24 #[cfg(any(test, feature = "test-utils"))]
25 test_funded_accounts: Vec<(WalletAddress, f64)>,
26 ws_event_sender: Option<mpsc::UnboundedSender<EngineMessage>>,
27 journal_writer: Option<crate::journal::SharedEngineJournalWriter>,
28 journal_batch_sender: Option<crate::journal::JournalBatchSender>,
29 nats_publisher: Option<crate::nats::NatsPublisher>,
30 nats_balance_update_publisher: Option<crate::nats::NatsBalanceUpdatePublisher>,
31 read_snapshot: Option<Arc<ArcSwap<EngineSnapshot>>>,
32 runtime_settings: Option<EngineRuntimeSettings>,
33 startup_balance_ledger: Option<crate::rsm::ledger::BalanceLedger>,
34}
35
36pub struct UnifiedEngineBuilderResult {
38 pub engine: UnifiedEngine,
39 pub order_tx: mpsc::Sender<UnifiedEngineRequest>,
40 pub market_tx: mpsc::Sender<MarketRequest>,
41 pub margin_mode_tx: mpsc::Sender<MarginModeUpdateRequest>,
42 pub agent_auth_tx: mpsc::Sender<AgentAuthRequest>,
43 pub nonce_check_tx: mpsc::Sender<NonceCheckRequest>,
44 pub pm_settlement_admin_tx: mpsc::Sender<hypercall_runtime_api::PmSettlementAdminRequest>,
45 pub quiesce_tx: mpsc::Sender<EngineQuiesceRequest>,
46 pub last_l2_seq: i64,
47 pub sync_status: Arc<SyncStatus>,
48}
49
50impl UnifiedEngineBuilder {
51 pub fn new(config: Config) -> Self {
52 Self {
53 config,
54 order_buffer_size: DEFAULT_ORDER_BUFFER_SIZE,
55 database_path: None, db_auth: None,
57 allow_no_database: false, skip_db_migrations: false,
59 mmp_cache: None,
60 tier_cache: None,
61 portfolio_service: None,
62 portfolio_cache: None,
63 greeks_cache: None,
64 mark_price_oracles: HashMap::new(),
65 risk_account_builder: None,
66 standard_account_builder: None,
67 liquidation_cache: None,
68 risk_vol_oracle: None,
69 #[cfg(any(test, feature = "test-utils"))]
70 test_funded_accounts: Vec::new(),
71 ws_event_sender: None,
72 journal_writer: None,
73 journal_batch_sender: None,
74 nats_publisher: None,
75 nats_balance_update_publisher: None,
76 read_snapshot: None,
77 runtime_settings: None,
78 startup_balance_ledger: None,
79 }
80 }
81
82 pub fn with_runtime_settings(mut self, settings: EngineRuntimeSettings) -> Self {
83 self.runtime_settings = Some(settings);
84 self
85 }
86
87 pub fn with_startup_balance_ledger(
92 mut self,
93 balance_ledger: crate::rsm::ledger::BalanceLedger,
94 ) -> Self {
95 self.startup_balance_ledger = Some(balance_ledger);
96 self
97 }
98
99 pub fn with_read_snapshot(mut self, snapshot: Arc<ArcSwap<EngineSnapshot>>) -> Self {
102 self.read_snapshot = Some(snapshot);
103 self
104 }
105
106 pub fn with_journal_writer(
108 mut self,
109 writer: crate::journal::SharedEngineJournalWriter,
110 ) -> Self {
111 self.journal_writer = Some(writer);
112 self
113 }
114
115 pub fn with_journal_batch_sender(mut self, sender: crate::journal::JournalBatchSender) -> Self {
117 self.journal_batch_sender = Some(sender);
118 self
119 }
120
121 pub fn with_nats_publisher(mut self, publisher: crate::nats::NatsPublisher) -> Self {
122 self.nats_publisher = Some(publisher);
123 self
124 }
125
126 pub fn with_nats_balance_update_publisher(
127 mut self,
128 publisher: crate::nats::NatsBalanceUpdatePublisher,
129 ) -> Self {
130 self.nats_balance_update_publisher = Some(publisher);
131 self
132 }
133
134 pub fn with_order_buffer_size(mut self, size: usize) -> Self {
135 self.order_buffer_size = size;
136 self
137 }
138
139 pub fn with_vol_oracle(mut self, oracle: crate::vol_oracle::SharedVolOracle) -> Self {
140 self.risk_vol_oracle = Some(oracle);
141 self
142 }
143
144 pub fn with_risk_vol_oracle(self, oracle: crate::vol_oracle::SharedVolOracle) -> Self {
145 self.with_vol_oracle(oracle)
146 }
147
148 pub fn with_database(mut self, path: &str) -> Self {
149 self.database_path = Some(path.to_string());
150 self
151 }
152
153 pub fn with_db_auth(mut self, auth: hypercall_db_diesel::DbAuthConfig) -> Self {
154 self.db_auth = Some(auth);
155 self
156 }
157
158 pub fn without_database(mut self) -> Self {
159 self.database_path = None;
160 self
161 }
162
163 pub fn with_skip_db_migrations(mut self) -> Self {
169 self.skip_db_migrations = true;
170 self
171 }
172
173 #[cfg(any(test, feature = "test-utils"))]
175 pub fn allow_no_database_for_tests(mut self) -> Self {
176 self.allow_no_database = true;
177 self
178 }
179
180 pub fn with_mock_journal(mut self) -> Self {
184 let mock = Arc::new(crate::journal::InMemoryJournalWriter::new());
185 self.journal_writer = Some(mock);
186 self.allow_no_database = true;
187 self
188 }
189
190 pub fn with_mmp_cache(mut self, cache: Arc<crate::read_cache::mmp::MmpCache>) -> Self {
192 self.mmp_cache = Some(cache);
193 self
194 }
195
196 pub fn with_tier_cache(mut self, cache: Arc<crate::read_cache::tier::TierCache>) -> Self {
198 self.tier_cache = Some(cache);
199 self
200 }
201
202 pub fn with_portfolio_service(
207 mut self,
208 service: Arc<dyn PortfolioService + Send + Sync>,
209 ) -> Self {
210 self.portfolio_service = Some(service);
211 self
212 }
213
214 pub fn with_portfolio_cache(
216 mut self,
217 cache: Arc<crate::read_cache::portfolio::PortfolioCache>,
218 ) -> Self {
219 self.portfolio_cache = Some(cache);
220 self
221 }
222
223 pub fn with_greeks_cache(mut self, cache: Arc<GreeksCache>) -> Self {
226 self.greeks_cache = Some(cache);
227 self
228 }
229
230 pub fn with_mark_price_oracles(
235 mut self,
236 oracles: HashMap<String, Arc<HyperliquidMarkPriceOracle>>,
237 ) -> Self {
238 self.mark_price_oracles = oracles;
239 self
240 }
241
242 pub fn with_risk_account_builder(
247 mut self,
248 builder: Arc<crate::rsm::portfolio_margin::risk_account_builder::RiskAccountBuilder>,
249 ) -> Self {
250 self.risk_account_builder = Some(builder);
251 self
252 }
253
254 pub fn with_standard_account_builder(mut self, builder: Arc<StandardAccountBuilder>) -> Self {
259 self.standard_account_builder = Some(builder);
260 self
261 }
262
263 pub fn with_liquidation_cache(
268 mut self,
269 cache: Arc<crate::liquidator::LiquidationCache>,
270 ) -> Self {
271 self.liquidation_cache = Some(cache);
272 self
273 }
274
275 pub fn with_ws_event_sender(mut self, sender: mpsc::UnboundedSender<EngineMessage>) -> Self {
277 self.ws_event_sender = Some(sender);
278 self
279 }
280
281 #[cfg(any(test, feature = "test-utils"))]
284 pub fn with_funded_test_accounts(mut self, accounts: Vec<(WalletAddress, f64)>) -> Self {
285 self.test_funded_accounts = accounts;
286 self
287 }
288
289 pub fn build(
299 self,
300 event_sender: mpsc::UnboundedSender<EngineMessage>,
301 shutdown_sender: broadcast::Sender<()>,
302 ) -> (
303 UnifiedEngine,
304 mpsc::Sender<UnifiedEngineRequest>,
305 mpsc::Sender<MarketRequest>,
306 ) {
307 if self.journal_writer.is_none() && self.journal_batch_sender.is_none() {
309 panic!(
310 "Journaling is mandatory: either journal_writer or journal_batch_sender must be configured. \
311 Use .with_mock_journal() for tests."
312 );
313 }
314
315 let (order_tx, order_rx) = mpsc::channel(self.order_buffer_size);
316 let (market_tx, market_rx) = mpsc::channel(MARKET_REQUEST_BUFFER_SIZE);
317 let (_quiesce_tx, quiesce_rx) = mpsc::channel(8);
318 let shutdown_rx = shutdown_sender.subscribe();
319
320 let mut engine = UnifiedEngine::init(
321 self.config.clone(),
322 order_rx,
323 market_rx,
324 event_sender,
325 self.read_snapshot.clone(),
326 shutdown_rx,
327 self.runtime_settings.clone().unwrap_or_default(),
328 self.database_path.clone(),
329 self.db_auth.clone(),
330 self.allow_no_database,
331 self.skip_db_migrations,
332 );
333 engine.quiesce_receiver = Some(quiesce_rx);
334 self.apply_common_config(&mut engine);
335
336 (engine, order_tx, market_tx)
337 }
338
339 pub fn build_with_info(
341 self,
342 event_sender: mpsc::UnboundedSender<EngineMessage>,
343 shutdown_sender: broadcast::Sender<()>,
344 ) -> UnifiedEngineBuilderResult {
345 if self.journal_writer.is_none() && self.journal_batch_sender.is_none() {
347 panic!(
348 "Journaling is mandatory: either journal_writer or journal_batch_sender must be configured. \
349 Use .with_mock_journal() for tests."
350 );
351 }
352
353 let (order_tx, order_rx) = mpsc::channel(self.order_buffer_size);
354 let (market_tx, market_rx) = mpsc::channel(MARKET_REQUEST_BUFFER_SIZE);
355 let (margin_mode_tx, margin_mode_rx) = mpsc::channel(100);
356 let (agent_auth_tx, agent_auth_rx) = mpsc::channel(100);
357 let (nonce_check_tx, nonce_check_rx) = mpsc::channel(100);
358 let (quiesce_tx, quiesce_rx) = mpsc::channel(8);
359 let (pm_settlement_admin_tx, pm_settlement_admin_rx) = mpsc::channel(64);
360 let shutdown_rx = shutdown_sender.subscribe();
361
362 let mut engine = UnifiedEngine::init(
363 self.config.clone(),
364 order_rx,
365 market_rx,
366 event_sender,
367 self.read_snapshot.clone(),
368 shutdown_rx,
369 self.runtime_settings.clone().unwrap_or_default(),
370 self.database_path.clone(),
371 self.db_auth.clone(),
372 self.allow_no_database,
373 self.skip_db_migrations,
374 );
375 engine.margin_mode_receiver = Some(margin_mode_rx);
376 engine.agent_auth_receiver = Some(agent_auth_rx);
377 engine.nonce_check_receiver = Some(nonce_check_rx);
378 engine.pm_settlement_admin_receiver = Some(pm_settlement_admin_rx);
379 engine.quiesce_receiver = Some(quiesce_rx);
380
381 self.apply_common_config(&mut engine);
382
383 let last_l2_seq = engine.ctx.l2_update_seq.load(Ordering::SeqCst);
384 let sync_status = engine.sync_status();
385
386 UnifiedEngineBuilderResult {
387 engine,
388 order_tx,
389 market_tx,
390 margin_mode_tx,
391 agent_auth_tx,
392 nonce_check_tx,
393 pm_settlement_admin_tx,
394 quiesce_tx,
395 last_l2_seq,
396 sync_status,
397 }
398 }
399
400 fn apply_common_config(&self, engine: &mut UnifiedEngine) {
401 if let Some(oracle) = self.risk_vol_oracle.clone() {
402 let has_snapshots = oracle.supports_surface_snapshots();
406 let margin_oracle: crate::vol_oracle::SharedVolOracle = if has_snapshots {
407 engine.external_vol_oracle = Some(oracle.clone());
408 std::sync::Arc::new(crate::rsm::engine_vol_oracle::EngineVolOracle::new(
409 engine.engine_iv_surfaces.clone(),
410 ))
411 } else {
412 oracle
413 };
414 engine.margin_manager = crate::rsm::margin_manager::MarginManager::new_with_vol_oracle(
415 &self.config,
416 Some(margin_oracle),
417 );
418 } else if self.allow_no_database {
419 engine.margin_manager.span_margin_service =
422 crate::rsm::margin_service::SpanMarginService::new_for_tests(self.config.clone());
423 }
424
425 if let Some(cache) = self.mmp_cache.clone() {
426 engine.set_mmp_cache(cache);
427 }
428
429 if let Some(cache) = self.tier_cache.clone() {
430 engine.set_tier_cache(cache);
431 }
432
433 if let Some(service) = self.portfolio_service.clone() {
434 engine.set_portfolio_service(service);
435 }
436
437 if let Some(cache) = self.portfolio_cache.clone() {
438 engine.set_portfolio_cache(cache);
439 }
440
441 if let Some(balance_ledger) = &self.startup_balance_ledger {
442 engine.ctx.balance_ledger = balance_ledger.clone();
443 tracing::info!(
444 balance_wallet_count = engine.ctx.balance_ledger.len(),
445 "Installed startup balance_ledger into engine context"
446 );
447 }
448
449 if let Some(builder) = self.risk_account_builder.clone() {
450 engine.set_risk_account_builder(builder);
451 }
452
453 if let Some(builder) = self.standard_account_builder.clone() {
454 engine.set_standard_account_builder(builder);
455 }
456
457 if let Some(cache) = self.liquidation_cache.clone() {
458 engine.set_liquidation_cache(cache);
459 }
460
461 if let Some(cache) = self.greeks_cache.clone() {
462 engine.ctx.deps.greeks_cache = Some(cache);
463 }
464
465 if !self.mark_price_oracles.is_empty() {
466 engine.set_mark_price_oracles(self.mark_price_oracles.clone());
467 engine
468 .expiry_manager
469 .register_settlements_for_existing_instruments(
470 &engine.ctx.deps,
471 &engine.ctx.orderbooks,
472 );
473 }
474
475 #[cfg(any(test, feature = "test-utils"))]
476 for (wallet, cash) in &self.test_funded_accounts {
477 engine.set_account_cash(wallet, *cash);
478 }
479
480 if let Some(ws_tx) = self.ws_event_sender.clone() {
481 engine.set_ws_event_sender(ws_tx);
482 }
483
484 if let Some(writer) = self.journal_writer.clone() {
485 engine.set_journal_writer(writer);
486 }
487
488 if let Some(sender) = self.journal_batch_sender.clone() {
489 engine.set_journal_batch_sender(sender);
490 }
491
492 if let Some(publisher) = self.nats_publisher.clone() {
493 engine.set_nats_publisher(publisher);
494 }
495 if let Some(publisher) = self.nats_balance_update_publisher.clone() {
496 engine.set_nats_balance_update_publisher(publisher);
497 }
498 }
499}