1use anyhow::Context;
2use hypercall_types::WalletAddress;
3use metrics::gauge;
4use std::future::Future;
5use std::net::SocketAddr;
6use std::pin::Pin;
7use std::str::FromStr;
8use std::sync::atomic::{AtomicU64, Ordering};
9use std::sync::Arc;
10use std::time::{Duration, Instant};
11use tokio::signal;
12use tokio::sync::mpsc;
13use tracing::{debug, error, info, warn};
14
/// Base value added to a bootstrap deposit index to form its sequence number.
#[cfg(feature = "testnet")]
const TESTNET_BOOTSTRAP_DEPOSIT_SEQUENCE_BASE: u64 = 9_000_000_000;

/// Returns the deposit sequence for the testnet bootstrap entry at `index`.
///
/// # Panics
///
/// Panics if adding `index` to the base overflows `u64`.
#[cfg(feature = "testnet")]
fn testnet_bootstrap_deposit_sequence(index: usize) -> u64 {
    let offset = index as u64;
    match TESTNET_BOOTSTRAP_DEPOSIT_SEQUENCE_BASE.checked_add(offset) {
        Some(sequence) => sequence,
        None => panic!("testnet bootstrap deposit sequence overflow"),
    }
}
24
/// Builds the 32-byte source event hash for a testnet bootstrap deposit.
///
/// Layout: bytes `0..16` hold the ASCII tag `testnet_bootstra`, bytes `16..24`
/// stay zero, and bytes `24..32` hold `sequence` in big-endian order.
#[cfg(feature = "testnet")]
fn testnet_bootstrap_source_event_hash(sequence: u64) -> alloy::primitives::FixedBytes<32> {
    const TAG: &[u8; 16] = b"testnet_bootstra";

    let mut raw = [0u8; 32];
    let (tag_part, tail) = raw.split_at_mut(TAG.len());
    tag_part.copy_from_slice(TAG);
    // `tail` covers bytes 16..32, so its last eight bytes are bytes 24..32.
    tail[8..].copy_from_slice(&sequence.to_be_bytes());
    alloy::primitives::FixedBytes::from(raw)
}
32
/// Returns the request id for the testnet bootstrap entry at `index`, rendered
/// as a UUID string built from a fixed 128-bit base plus `index`.
///
/// # Panics
///
/// Panics if adding `index` to the base overflows `u128`.
#[cfg(feature = "testnet")]
fn testnet_bootstrap_request_id(index: usize) -> String {
    const REQUEST_ID_BASE: u128 = 0x1707_0000_0000_0000_0000_0000_0000_0000;

    let raw_id = match REQUEST_ID_BASE.checked_add(index as u128) {
        Some(value) => value,
        None => panic!("testnet bootstrap request UUID overflow"),
    };
    uuid::Uuid::from_u128(raw_id).to_string()
}
42
43fn parse_configured_wallet_set(
44 config_name: &str,
45 wallets: &[String],
46) -> anyhow::Result<std::collections::BTreeSet<WalletAddress>> {
47 wallets
48 .iter()
49 .map(|wallet| {
50 WalletAddress::from_str(wallet)
51 .map_err(|error| anyhow::anyhow!("Invalid {config_name} wallet {wallet}: {error}"))
52 })
53 .collect()
54}
55
/// Records how long a startup phase took.
///
/// Both arms write an info-level log line prefixed with `[startup-timing]` and
/// set the `ht_startup_phase_seconds` gauge, labelled with the phase, to the
/// elapsed time in seconds.
///
/// * `startup_timing!(phase, elapsed)` logs the phase and the elapsed value.
/// * `startup_timing!(phase, elapsed, fmt, args...)` additionally appends the
///   formatted detail message in parentheses to the log line.
///
/// `$elapsed` is evaluated once per invocation; `$phase` is expanded twice
/// (once for the log line, once for the gauge label).
macro_rules! startup_timing {
    // Phase label and elapsed time only.
    ($phase:expr, $elapsed:expr) => {{
        // Bind once so the log line and the gauge see the same value.
        let elapsed = $elapsed;
        info!("[startup-timing] {}: {:?}", $phase, elapsed);
        gauge!("ht_startup_phase_seconds", "phase" => $phase).set(elapsed.as_secs_f64());
    }};
    // Phase label, elapsed time, and a `format!`-style detail message.
    ($phase:expr, $elapsed:expr, $($arg:tt)+) => {{
        // Bind once so the log line and the gauge see the same value.
        let elapsed = $elapsed;
        info!("[startup-timing] {}: {:?} ({})", $phase, elapsed, format!($($arg)+));
        gauge!("ht_startup_phase_seconds", "phase" => $phase).set(elapsed.as_secs_f64());
    }};
}
70
71use crate::observability::metrics_collector::{MetricsCollector, MetricsCollectorConfig};
72use crate::shared::option_token_address::ensure_option_token_derivation_supported;
73use crate::shared::service::{Service, ServiceRegistry};
74use crate::shared::shutdown::Shutdown;
75use crate::shared::task_group::TaskGroup;
76
77use crate::hypercore::HypercorePositionService;
78use crate::messaging::{ChannelEventBus, EventBusTrait};
79use crate::portfolio::PortfolioService;
80use crate::price_oracle::hydromancer_client::{HydromancerClient, HydromancerConfig};
81use crate::price_oracle::hyperliquid_oracle::{
82 HyperliquidMarkPriceOracle, HyperliquidOracleConfig,
83};
84use crate::price_oracle::hyperliquid_ws::HyperliquidWsFeed;
85use crate::read_cache::greeks::GreeksCache;
86use crate::read_cache::portfolio::PortfolioCache;
87use crate::readiness::{ReadinessRegistry, SyncStatusReadiness, VolOracleReadiness};
88use crate::rsm::margin_service::SpanMarginService;
89use crate::rsm::portfolio_margin::risk_account_builder::RiskAccountBuilder;
90use crate::rsm::unified_engine::{UnifiedEngineBuilder, UnifiedEngineRequest};
91use crate::runtime::tasks::bbo_snapshot::{BboSnapshotService, BboSnapshotTaskConfig};
92use crate::runtime::tasks::historical_pnl::{HistoricalPnlSnapshotTask, HistoricalPnlTaskConfig};
93use crate::runtime::tasks::historical_theo::{
94 HistoricalTheoSnapshotTask, HistoricalTheoTaskConfig,
95};
96use crate::runtime::tasks::index_price_publisher::{
97 IndexPricePublisher, IndexPriceUpdatePublisher,
98};
99use crate::snapshot::{
100 DbInstrumentsSnapshotLoader, DbInstrumentsSnapshotWriter, InstrumentsSnapshotTask,
101 PortfolioSnapshotTask, SnapshotTaskConfig,
102};
103use crate::standard_margin::{StandardAccountBuilder, StandardMarginService};
104use crate::startup::{oracles, services};
105use crate::types::Config;
106use crate::vol_oracle::VolOracleFactory;
107use axum::body::Body;
108use axum::extract::{Request, State};
109use axum::http::StatusCode;
110use axum::response::{IntoResponse, Response};
111use axum::routing::get;
112use axum::Router;
113use hypercall_api::candles::{
114 candle_ws_poll_interval_ms, CandleWsPublisher, HyperliquidCandleSource, UnderlyingCandleSource,
115};
116use hypercall_api::trading_halt::TradingHaltState;
117use hypercall_api::websocket::{
118 PubSubManager, WsCompetitionFinalStanding, WsCompetitionGapUpdate, WsCompetitionRankChange,
119 WsState,
120};
121use hypercall_api::{directives, handlers, middleware};
122use hypercall_competition::CompetitionService;
123use hypercall_db_diesel::DatabaseHandler;
124use hypercall_types::EngineMessage;
125use std::collections::{HashMap, HashSet};
126use tokio::task::JoinHandle;
127use tower::util::ServiceExt;
128
/// Shared, async-lockable slot holding an optional one-shot sender of `()`.
///
/// Used in this file for the standby promote signal.
/// NOTE(review): the inner `Option` presumably lets the sender be taken out
/// exactly once when promotion is requested — confirm against the promote handler.
type PromoteSender = Arc<tokio::sync::Mutex<Option<tokio::sync::oneshot::Sender<()>>>>;
130
// Adapter so a `PubSubManager` can be used wherever an
// `IndexPriceUpdatePublisher` is expected.
impl IndexPriceUpdatePublisher for PubSubManager {
    /// Forwards the index price update to `PubSubManager::publish_index_prices`.
    fn publish_index_prices(&self, update: hypercall_types::ws_protocol::WsIndexPriceUpdate) {
        // NOTE(review): this path call presumably resolves to an inherent
        // `PubSubManager::publish_index_prices` defined outside this file; if no
        // such inherent method exists it would resolve back to this trait
        // method and recurse — confirm.
        PubSubManager::publish_index_prices(self, update);
    }
}
136
137#[derive(Clone)]
138pub struct StartupProgress {
139 phase: Arc<std::sync::Mutex<String>>,
140 counter: Arc<AtomicU64>,
141 last_progress_unix_ms: Arc<AtomicU64>,
142}
143
144pub(crate) struct StartupProgressSnapshot {
145 pub(crate) phase: String,
146 pub(crate) counter: u64,
147 pub(crate) last_progress_unix_ms: Option<u64>,
148 pub(crate) last_progress_age_ms: Option<u64>,
149}
150
151impl StartupProgress {
152 fn new(initial_phase: impl Into<String>) -> Self {
153 let progress = Self {
154 phase: Arc::new(std::sync::Mutex::new(String::new())),
155 counter: Arc::new(AtomicU64::new(0)),
156 last_progress_unix_ms: Arc::new(AtomicU64::new(0)),
157 };
158 progress.mark(initial_phase);
159 progress
160 }
161
162 fn mark(&self, phase: impl Into<String>) {
163 let phase = phase.into();
164 *self
165 .phase
166 .lock()
167 .expect("startup progress phase mutex poisoned") = phase;
168 self.record_progress();
169 }
170
171 fn heartbeat(&self) {
172 self.record_progress();
173 }
174
175 pub(crate) fn snapshot(&self) -> StartupProgressSnapshot {
176 let phase = self
177 .phase
178 .lock()
179 .expect("startup progress phase mutex poisoned")
180 .clone();
181 let counter = self.counter.load(Ordering::Relaxed);
182 let last_progress_unix_ms = match self.last_progress_unix_ms.load(Ordering::Relaxed) {
183 0 => None,
184 ts => Some(ts),
185 };
186 let last_progress_age_ms =
187 last_progress_unix_ms.map(|ts| current_unix_ms().saturating_sub(ts));
188
189 StartupProgressSnapshot {
190 phase,
191 counter,
192 last_progress_unix_ms,
193 last_progress_age_ms,
194 }
195 }
196
197 fn record_progress(&self) {
198 self.counter.fetch_add(1, Ordering::Relaxed);
199 self.last_progress_unix_ms
200 .store(current_unix_ms(), Ordering::Relaxed);
201 }
202}
203
/// Returns the current wall-clock time as milliseconds since the Unix epoch.
///
/// # Panics
///
/// Panics if the system clock reports a time before the Unix epoch.
fn current_unix_ms() -> u64 {
    let now = std::time::SystemTime::now();
    let since_epoch = now
        .duration_since(std::time::UNIX_EPOCH)
        .expect("system clock before UNIX_EPOCH");
    since_epoch.as_millis() as u64
}
210
/// Shared state handed to the bootstrap HTTP routes (`/health`,
/// `/standby-ready`, and the fallback handler).
#[derive(Clone)]
struct BootstrapHttpState {
    /// Shutdown handle; `/health` reports `shutting_down` once it is triggered.
    shutdown: Shutdown,
    /// Standby replay progress; when absent, `/standby-ready` reports
    /// `not_standby_mode` while the full router is not yet installed.
    standby_progress: Option<crate::nats::replay_loop::ReplayProgress>,
    /// Promote sender slot, consulted by `/standby-ready` to decide whether
    /// promotion has already happened.
    standby_promote: Option<PromoteSender>,
    /// Startup progress tracker whose snapshot is added to `/standby-ready` payloads.
    startup_progress: Option<StartupProgress>,
    /// Full API router; `None` until it is installed, during which time the
    /// fallback handler answers 503 instead of forwarding requests.
    full_app: Arc<tokio::sync::RwLock<Option<Router>>>,
}
219
/// Handle returned by `start_bootstrap_http_server`.
struct BootstrapHttpServer {
    /// Slot shared with the bootstrap routes; it starts out as `None` and the
    /// fallback handler forwards requests to the router stored here once set.
    full_app: Arc<tokio::sync::RwLock<Option<Router>>>,
    /// Join handle of the spawned task that runs the bootstrap HTTP server.
    task: JoinHandle<()>,
}
224
225fn build_bootstrap_http_app(state: BootstrapHttpState) -> Router {
226 Router::new()
227 .route("/health", get(bootstrap_health))
228 .route("/standby-ready", get(bootstrap_standby_ready))
229 .fallback(bootstrap_fallback)
230 .with_state(state)
231}
232
233async fn bootstrap_health(State(state): State<BootstrapHttpState>) -> Response {
234 let response = hypercall_api::models::HealthResponse {
235 status: if state.shutdown.is_triggered() {
236 "shutting_down".to_string()
237 } else {
238 "ok".to_string()
239 },
240 };
241
242 if state.shutdown.is_triggered() {
243 (
244 StatusCode::SERVICE_UNAVAILABLE,
245 hypercall_api::sonic_json::SonicJson(response),
246 )
247 .into_response()
248 } else {
249 (
250 StatusCode::OK,
251 hypercall_api::sonic_json::SonicJson(response),
252 )
253 .into_response()
254 }
255}
256
257async fn bootstrap_standby_ready(State(state): State<BootstrapHttpState>) -> Response {
258 if state.full_app.read().await.is_none() {
259 let response = match state.standby_progress.as_ref() {
260 Some(progress) => {
261 let mut payload = handlers::health::standby_status_payload(progress, false);
262 payload["status"] = serde_json::json!("api_starting");
263 payload["promotable"] = serde_json::json!(false);
264 payload["api_router_ready"] = serde_json::json!(false);
265 add_startup_progress_payload(&mut payload, state.startup_progress.as_ref());
266 payload
267 }
268 None => {
269 let mut payload = serde_json::json!({
270 "status": "not_standby_mode",
271 "api_router_ready": false,
272 });
273 add_startup_progress_payload(&mut payload, state.startup_progress.as_ref());
274 payload
275 }
276 };
277 return (
278 StatusCode::SERVICE_UNAVAILABLE,
279 hypercall_api::sonic_json::SonicJson(response),
280 )
281 .into_response();
282 }
283
284 let already_promoted =
285 handlers::health::standby_already_promoted(state.standby_promote.as_ref());
286 handlers::health::standby_ready_response_with_startup(
287 state
288 .standby_progress
289 .as_ref()
290 .map(|progress| progress as &dyn hypercall_api::runtime_status::StandbyReplayProgress),
291 already_promoted,
292 state
293 .startup_progress
294 .as_ref()
295 .map(|progress| progress as &dyn hypercall_api::runtime_status::StartupProgressReader),
296 )
297}
298
299fn add_startup_progress_payload(
300 payload: &mut serde_json::Value,
301 progress: Option<&StartupProgress>,
302) {
303 let Some(progress) = progress else {
304 return;
305 };
306 let snapshot = progress.snapshot();
307 payload["startup_phase"] = serde_json::json!(snapshot.phase);
308 payload["startup_progress_counter"] = serde_json::json!(snapshot.counter);
309 payload["last_startup_progress_unix_ms"] = serde_json::json!(snapshot.last_progress_unix_ms);
310 payload["last_startup_progress_age_ms"] = serde_json::json!(snapshot.last_progress_age_ms);
311}
312
313async fn bootstrap_fallback(
314 State(state): State<BootstrapHttpState>,
315 request: Request<Body>,
316) -> Response {
317 let app = state.full_app.read().await.clone();
318 let Some(app) = app else {
319 return (
320 StatusCode::SERVICE_UNAVAILABLE,
321 hypercall_api::sonic_json::SonicJson(serde_json::json!({
322 "status": "starting",
323 "message": "API router is still starting"
324 })),
325 )
326 .into_response();
327 };
328
329 app.oneshot(request)
330 .await
331 .unwrap_or_else(|error| match error {})
332}
333
334fn start_bootstrap_http_server(
335 listener: tokio::net::TcpListener,
336 addr: SocketAddr,
337 shutdown: &Shutdown,
338 standby_progress: Option<crate::nats::replay_loop::ReplayProgress>,
339 standby_promote: Option<PromoteSender>,
340 startup_progress: Option<StartupProgress>,
341) -> BootstrapHttpServer {
342 let full_app = Arc::new(tokio::sync::RwLock::new(None));
343 let app = build_bootstrap_http_app(BootstrapHttpState {
344 shutdown: shutdown.clone(),
345 standby_progress,
346 standby_promote,
347 startup_progress,
348 full_app: full_app.clone(),
349 });
350 let mut http_shutdown_rx = shutdown.subscribe();
351
352 let task = tokio::spawn(async move {
353 tracing::info!("Bootstrap standby HTTP server listening on {}", addr);
354 let server = axum::serve(listener, app).with_graceful_shutdown(async move {
355 let _ = http_shutdown_rx.recv().await;
356 tracing::info!("Shutting down bootstrap standby HTTP server...");
357 });
358
359 if let Err(error) = server.await {
360 tracing::error!("Bootstrap standby HTTP server error: {}", error);
361 }
362 });
363
364 BootstrapHttpServer { full_app, task }
365}
366
367pub async fn run_integrated_server(
368 config: Config,
369 runtime: Arc<crate::backend_config::BackendRuntime>,
370) -> anyhow::Result<()> {
371 let port = std::env::var("PORT").unwrap_or_else(|_| "3000".to_string());
373 let addr: SocketAddr = format!("[::]:{}", port)
374 .parse()
375 .expect("valid socket address");
376 run_integrated_server_on(config, addr, runtime).await
377}
378
379pub async fn run_integrated_server_on(
384 config: Config,
385 addr: SocketAddr,
386 runtime: Arc<crate::backend_config::BackendRuntime>,
387) -> anyhow::Result<()> {
388 let shutdown = Shutdown::new();
389 run_integrated_server_on_with_shutdown(config, addr, shutdown, runtime).await
390}
391
392pub fn spawn_integrated_server_on(
393 config: Config,
394 addr: SocketAddr,
395 runtime: Arc<crate::backend_config::BackendRuntime>,
396) -> tokio::task::JoinHandle<anyhow::Result<()>> {
397 tokio::spawn(Box::pin(run_integrated_server_on(config, addr, runtime)))
398}
399
400pub async fn run_integrated_server_on_with_shutdown(
404 config: Config,
405 addr: SocketAddr,
406 shutdown: Shutdown,
407 runtime: Arc<crate::backend_config::BackendRuntime>,
408) -> anyhow::Result<()> {
409 let listener = tokio::net::TcpListener::bind(addr).await?;
410 boxed_integrated_server_with_listener_and_shutdown(config, listener, shutdown, runtime).await
411}
412
413pub async fn run_integrated_server_with_listener(
417 config: Config,
418 listener: tokio::net::TcpListener,
419) -> anyhow::Result<()> {
420 let runtime = Arc::new(crate::backend_config::BackendRuntime::from_legacy_env()?);
421 let shutdown = Shutdown::new();
422 boxed_integrated_server_with_listener_and_shutdown(config, listener, shutdown, runtime).await
423}
424
425fn boxed_integrated_server_with_listener_and_shutdown(
426 config: Config,
427 listener: tokio::net::TcpListener,
428 shutdown: Shutdown,
429 runtime: Arc<crate::backend_config::BackendRuntime>,
430) -> Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send>> {
431 Box::pin(run_integrated_server_with_listener_and_shutdown(
432 config, listener, shutdown, runtime,
433 ))
434}
435
436fn build_admin_state(
444 app_state: &handlers::AppState,
445 qp_cache: Arc<hypercall_runtime_api::QuoteProviderCache>,
446) -> hypercall_admin::state::AdminState {
447 hypercall_admin::state::AdminState {
448 db: app_state.db.clone(),
449 sync_db: app_state.sync_db.clone(),
450 portfolio_cache: app_state.portfolio_cache.clone(),
451 greeks_cache: app_state.greeks_cache.clone(),
452 quote_provider: app_state.quote_provider.clone(),
453 tier_cache: app_state.tier_cache.clone(),
454 risk_vol_oracle: app_state.risk_vol_oracle.clone(),
455 instruments_cache: app_state.instruments_cache.clone(),
456 engine_state_digest_provider: app_state.engine_state_digest_provider.clone(),
457 engine_journal_reader: app_state.engine_journal_reader.clone(),
458 balance_snapshot_provider: app_state.balance_snapshot_provider.clone(),
459 recovery_safety_report: app_state.recovery_safety_report.clone(),
460 qp_cache,
461 competition: app_state.competition_service.clone(),
462 order_sender: app_state.order_sender.clone(),
463 engine_quiesce_sender: app_state.engine_quiesce_sender.clone(),
464 tier_update_sender: app_state.tier_update_sender.clone(),
465 pm_settlement_admin_sender: app_state.pm_settlement_admin_sender.clone(),
466 agent_auth_sender: app_state.agent_auth_sender.clone(),
467 rsm_signer: app_state.rsm_signer.clone(),
468 trading_halt: app_state.trading_halt.clone(),
469 standby_promote: app_state.standby_promote.clone(),
470 standby_controller: app_state.standby_controller.clone(),
471 drain_signal: app_state.drain_signal.clone(),
472 is_draining: app_state.is_draining.clone(),
473 build_info: app_state.build_info.clone(),
474 runtime_config: Arc::new(hypercall_admin::state::AdminRuntimeConfig {
475 wal_path: app_state.runtime_config.wal_path.clone(),
476 db_host: app_state.runtime_config.db_host.clone(),
477 db_name: app_state.runtime_config.db_name.clone(),
478 testnet_mode: app_state.runtime_config.testnet_mode,
479 portfolio_margin_pool_enabled: app_state.runtime_config.portfolio_margin_pool_enabled,
480 portfolio_margin_settlement_allowlist: app_state
481 .runtime_config
482 .portfolio_margin_settlement_allowlist
483 .clone(),
484 }),
485 server_started_at: app_state.server_started_at,
486 boot_id: app_state.boot_id.clone(),
487 admin_api_key: app_state.admin_api_key.clone(),
488 allow_unauthenticated_monitoring: app_state.allow_unauthenticated_monitoring,
489 }
490}
491
492pub async fn run_integrated_server_with_listener_and_shutdown(
497 config: Config,
498 listener: tokio::net::TcpListener,
499 shutdown: Shutdown,
500 runtime: Arc<crate::backend_config::BackendRuntime>,
501) -> anyhow::Result<()> {
502 let addr = listener.local_addr()?;
503 let mut listener = Some(listener);
504
505 crate::observability::prometheus::init_prometheus();
507 crate::observability::prometheus::start_upkeep_task();
509 tracing::info!(
510 "Starting integrated API server with unified engine on {}...",
511 addr
512 );
513
514 let startup_begin = Instant::now();
515 runtime.apply_process_env();
516
517 #[cfg(not(feature = "test-endpoints"))]
518 if runtime.config.modes.enable_test_endpoints {
519 tracing::warn!(
520 "modes.enable_test_endpoints=true in config but binary built without test-endpoints feature. \
521 Test endpoints, mock feeds, and mock oracles are disabled."
522 );
523 }
524
525 #[cfg(not(feature = "faucet"))]
526 if runtime.config.modes.testnet_mode {
527 tracing::info!(
528 "modes.testnet_mode=true in config but binary built without faucet feature. \
529 The /faucet route is not available."
530 );
531 }
532
533 ensure_option_token_derivation_supported()
534 .map_err(|err| anyhow::anyhow!("Option token derivation startup gate failed: {}", err))?;
535
536 let expiry_times = runtime
540 .config
541 .catalog
542 .expiry_times()
543 .map_err(|err| anyhow::anyhow!("Invalid catalog expiry time configuration: {err:#}"))?;
544 hypercall_types::install_expiry_times(expiry_times)
545 .map_err(|err| anyhow::anyhow!("Expiry time policy install failed: {err}"))?;
546
547 let database_resources = crate::startup::database::build_database_resources(&runtime).await?;
548 let engine_database_url = database_resources.engine_database_url;
549 let db_auth = database_resources.db_auth;
550 let engine_db_auth = database_resources.engine_db_auth;
551 let db_pool = database_resources.db_pool;
552 let diesel_db = database_resources.diesel_db;
553 let shared_db = database_resources.shared_db;
554 let standby_mode_active = database_resources.standby_mode_active;
555 let skip_db_migrations = database_resources.skip_db_migrations;
556 let standby_progress = standby_mode_active.then(crate::nats::replay_loop::ReplayProgress::new);
557 let startup_progress =
558 standby_mode_active.then(|| StartupProgress::new("bootstrap_http_starting"));
559 let (standby_promote, mut standby_replay_promote_rx): (
560 Option<PromoteSender>,
561 Option<tokio::sync::oneshot::Receiver<()>>,
562 ) = if standby_mode_active {
563 let (promote_tx, promote_rx) = tokio::sync::oneshot::channel::<()>();
564 (
565 Some(Arc::new(tokio::sync::Mutex::new(Some(promote_tx)))),
566 Some(promote_rx),
567 )
568 } else {
569 (None, None)
570 };
571 let bootstrap_http = if standby_mode_active {
572 Some(start_bootstrap_http_server(
573 listener
574 .take()
575 .expect("standby bootstrap HTTP requires the bound listener"),
576 addr,
577 &shutdown,
578 standby_progress.clone(),
579 standby_promote.clone(),
580 startup_progress.clone(),
581 ))
582 } else {
583 None
584 };
585 if let Some(progress) = startup_progress.as_ref() {
586 progress.mark("bootstrap_http_ready");
587 }
588
589 let pubsub = Arc::new(PubSubManager::new());
591
592 let mut task_group = TaskGroup::new();
594 let mut service_registry = ServiceRegistry::new();
595 let post_promote_services: Arc<tokio::sync::Mutex<Vec<Arc<dyn Service>>>> =
596 Arc::new(tokio::sync::Mutex::new(Vec::new()));
597
598 let (order_tx, order_rx) = mpsc::channel::<UnifiedEngineRequest>(100);
600
601 let t = Instant::now();
603 let event_bus: Arc<dyn EventBusTrait> = Arc::new(
604 ChannelEventBus::new()
605 .map_err(|e| anyhow::anyhow!("Failed to create ChannelEventBus: {}", e))?,
606 );
607
608 startup_timing!("event_bus_create", t.elapsed());
609
610 crate::startup::transaction_submitter::build_transaction_submitter_resources(
614 &runtime,
615 event_bus.clone(),
616 Some(diesel_db.clone()),
617 &shutdown,
618 &mut task_group,
619 )
620 .await?;
621
622 let sync_exchange_address =
623 alloy::primitives::Address::from_str(&runtime.config.contracts.exchange_contract_address)
624 .ok();
625
626 let engine_event_tx = event_bus.get_sender();
628
629 let (ws_direct_tx, ws_direct_rx_for_forwarder) = mpsc::unbounded_channel();
631
632 let rsm_signer_chain_id = if runtime.config.modes.testnet_mode {
633 hypercall_types::directives::HYPERCALL_TESTNET_CHAIN_ID
634 } else {
635 hypercall_types::directives::HYPERCALL_MAINNET_CHAIN_ID
636 };
637
638 let portfolio_cache = Arc::new(PortfolioCache::new(shared_db.clone()));
640
641 let t = Instant::now();
643 let next_portfolio_command_id = portfolio_cache.initialize().await?;
644 startup_timing!("portfolio_snapshot_load", t.elapsed());
645
646 let tier_cache_config = crate::read_cache::tier::TierCacheConfig {
651 max_open_orders_default: runtime.config.api.max_open_orders_default,
652 max_open_positions_default: runtime.config.api.max_open_positions_default,
653 };
654 let tier_cache = Arc::new(crate::read_cache::tier::TierCache::new_with_config(
655 shared_db.clone(),
656 tier_cache_config,
657 )?);
658 let t = Instant::now();
659 tier_cache.load_from_db().await?;
660 startup_timing!("tier_cache", t.elapsed());
661
662 let t = Instant::now();
664 let rate_limit_cache = Arc::new(
665 hypercall_api::caches::rate_limit::RateLimitCache::new(
666 tier_cache.clone(),
667 runtime.secrets.redis_url.as_deref(),
668 )
669 .await,
670 );
671 startup_timing!("rate_limit_cache", t.elapsed());
672
673 let liquidation_cache = Arc::new(crate::liquidator::LiquidationCache::new());
675 if runtime.config.liquidation.enabled {
676 liquidation_cache
677 .load_from_db(diesel_db.as_ref())
678 .await
679 .map_err(|error| {
680 anyhow::anyhow!("Failed to load liquidation cache from DB: {}", error)
681 })?;
682 info!("LiquidationCache initialized successfully");
683 } else {
684 info!("LiquidationCache restore skipped because liquidation runtime is disabled");
685 }
686
687 let journal_resources = crate::startup::journal::build_journal_resources(
688 &runtime,
689 &db_auth,
690 &shutdown,
691 &mut task_group,
692 )?;
693 let engine_journal_writer = journal_resources.engine_journal_writer;
694 let journal_batch_sender = journal_resources.journal_batch_sender;
695
696 {
699 let portfolio_svc = portfolio_cache.get_service();
700 if let Some(impl_svc) = portfolio_svc
701 .as_any()
702 .downcast_ref::<crate::portfolio::PortfolioServiceImpl>()
703 {
704 impl_svc.set_tier_cache(tier_cache.clone()).await;
705 info!("✓ Wired TierCache to PortfolioService for catchup replay");
706 } else {
707 return Err(anyhow::anyhow!(
708 "Could not wire TierCache to PortfolioService for catchup replay: service is not PortfolioServiceImpl"
709 ));
710 }
711 }
712
713 {
714 let t = Instant::now();
715 info!(
716 "Starting portfolio catchup replay from engine command_id={}",
717 next_portfolio_command_id
718 );
719
720 let mut after_command_id = next_portfolio_command_id - 1;
721 let mut total_events_replayed: u64 = 0;
722 let mut total_custody_commands_replayed: u64 = 0;
723 loop {
724 let replay_db: &dyn hypercall_db::JournalReplayReader = shared_db.as_ref();
725 let command_batch = replay_db
726 .get_replay_commands_after_command_id_sync(after_command_id, None, 1000)
727 .map_err(|e| anyhow::anyhow!("Failed to load portfolio replay commands: {}", e))?;
728
729 let Some(last_command) = command_batch.last() else {
730 break;
731 };
732 let end_command_id = last_command.command_id;
733
734 let replay_events = replay_db
735 .get_portfolio_events_for_command_range_sync(after_command_id + 1, end_command_id)
736 .map_err(|e| anyhow::anyhow!("Failed to load portfolio replay events: {}", e))?;
737
738 let mut replay_events_by_command = std::collections::BTreeMap::new();
739 for replay_event in replay_events {
740 replay_events_by_command
741 .entry(replay_event.command_id)
742 .or_insert_with(Vec::new)
743 .push(replay_event);
744 }
745
746 for command in &command_batch {
747 if let Some(events) = replay_events_by_command.remove(&command.command_id) {
748 for replay_event in events {
749 match replay_event.event_type {
750 hypercall_db::EventType::OrderFilled => {
751 if replay_event.event_data.len() <= 1 {
752 panic!(
753 "CRITICAL_FAILURE: empty replay fill payload for command_id={}",
754 replay_event.command_id
755 );
756 }
757 let fill =
758 match hypercall_types::EngineMessage::deserialize_from_wire(
759 crate::shared::topics::TOPIC_FILLS,
760 &replay_event.event_data,
761 ) {
762 Ok(hypercall_types::EngineMessage::OrderFilled {
763 fill,
764 ..
765 }) => fill,
766 Ok(other) => {
767 panic!(
768 "CRITICAL_FAILURE: expected replay fill for command_id={}, got {}",
769 replay_event.command_id,
770 other.type_name()
771 )
772 }
773 Err(e) => {
774 panic!(
775 "CRITICAL_FAILURE: failed to deserialize replay fill for command_id={}: {}",
776 replay_event.command_id,
777 e
778 )
779 }
780 };
781 portfolio_cache
782 .replay_journal_fill(&fill, replay_event.command_id)
783 .await;
784 }
785 hypercall_db::EventType::PositionExpired => {
786 if replay_event.event_data.len() <= 1 {
787 panic!(
788 "CRITICAL_FAILURE: empty replay position expiry payload for command_id={}",
789 replay_event.command_id
790 );
791 }
792 let expiry = rmp_serde::from_slice::<
793 hypercall_types::PositionExpiredMessage,
794 >(&replay_event.event_data[1..])
795 .unwrap_or_else(|e| {
796 panic!(
797 "CRITICAL_FAILURE: failed to deserialize replay position expiry for command_id={}: {}",
798 replay_event.command_id, e
799 )
800 });
801 portfolio_cache
805 .handle_engine_message(
806 EngineMessage::PositionExpired(expiry),
807 replay_event.command_id,
808 )
809 .await;
810 }
811 other => {
812 panic!(
813 "CRITICAL_FAILURE: unexpected portfolio replay event type {:?} for command_id={}",
814 other, replay_event.command_id
815 )
816 }
817 }
818 total_events_replayed += 1;
819 }
820 }
821
822 let command_type = match command.command_type.as_str() {
823 "OptionDepositUpdate" => Some(crate::nats::CommandType::OptionDepositUpdate),
824 "OptionWithdrawalUpdate" => {
825 Some(crate::nats::CommandType::OptionWithdrawalUpdate)
826 }
827 _ => None,
828 };
829 let Some(command_type) = command_type else {
830 continue;
831 };
832 let decoded = crate::nats::deserialize::deserialize_command(
833 command_type,
834 crate::nats::LEGACY_COMMAND_WIRE_VERSION,
835 &command.command_data,
836 )
837 .unwrap_or_else(|error| {
838 panic!(
839 "CRITICAL_FAILURE: failed to decode portfolio custody command {} (command_id={}): {}",
840 command.command_type, command.command_id, error
841 )
842 });
843 match decoded {
844 crate::rsm::apply::EngineCommand::OptionDepositUpdate {
845 wallet,
846 symbol,
847 quantity,
848 ..
849 } => {
850 portfolio_cache
851 .replay_option_custody_delta(
852 wallet,
853 symbol,
854 quantity,
855 command.command_id,
856 )
857 .await;
858 total_custody_commands_replayed += 1;
859 }
860 crate::rsm::apply::EngineCommand::OptionWithdrawalUpdate {
861 wallet,
862 symbol,
863 quantity,
864 ..
865 } => {
866 portfolio_cache
867 .replay_option_custody_delta(
868 wallet,
869 symbol,
870 -quantity,
871 command.command_id,
872 )
873 .await;
874 total_custody_commands_replayed += 1;
875 }
876 other => {
877 panic!(
878 "CRITICAL_FAILURE: decoded unexpected portfolio custody command {} for command_id={}",
879 other.command_type(),
880 command.command_id
881 );
882 }
883 }
884 }
885
886 if let Some((&command_id, _)) = replay_events_by_command.iter().next() {
887 panic!(
888 "CRITICAL_FAILURE: portfolio replay event found for command_id={} without matching replay command in batch",
889 command_id
890 );
891 }
892
893 after_command_id = end_command_id;
894 }
895
896 startup_timing!(
897 "portfolio_catchup_replay",
898 t.elapsed(),
899 "{} events, {} custody commands",
900 total_events_replayed,
901 total_custody_commands_replayed
902 );
903 portfolio_cache.set_ready();
904 }
905 let t = Instant::now();
907 let instruments_cache =
908 Arc::new(crate::read_cache::instruments_registry::InstrumentsCache::new());
909
910 let instruments_snapshot_loader = DbInstrumentsSnapshotLoader::new(shared_db.clone());
912 let bootstrap_db: Arc<dyn hypercall_db::BootstrapReader> = diesel_db.clone();
913 let instruments_offsets = instruments_cache
914 .clone()
915 .initialize_with_snapshot(
916 &bootstrap_db,
917 event_bus.clone(),
918 &instruments_snapshot_loader,
919 shutdown.subscribe(),
920 )
921 .await?;
922
923 let instruments_count = instruments_cache.len().await;
924 startup_timing!(
925 "instruments_snapshot_init",
926 t.elapsed(),
927 "{} instruments",
928 instruments_count
929 );
930
931 if let Some(offsets) = instruments_offsets {
933 let t = Instant::now();
934 info!(
935 "Instruments snapshot restored with {} stream offsets - starting catchup replay",
936 offsets.len()
937 );
938
939 let instruments_cache_replay = instruments_cache.clone();
940 let handler: Box<dyn Fn(EngineMessage) + Send + Sync> = Box::new(move |event| {
941 if let EngineMessage::MarketUpdate(market_update) = event {
943 let cache = instruments_cache_replay.clone();
944 tokio::task::block_in_place(|| {
945 tokio::runtime::Handle::current().block_on(async {
946 if let Err(e) = cache.handle_market_update(market_update).await {
947 error!("Error replaying market update to instruments cache: {}", e);
948 }
949 });
950 });
951 }
952 });
953
954 let replay_result = event_bus.replay_from_offsets(offsets, handler).await;
955
956 match replay_result {
957 Ok(final_offsets) => {
958 if let Err(e) = instruments_cache
961 .apply_snapshot_offsets(final_offsets)
962 .await
963 {
964 error!("Failed to apply replay offsets to instruments cache: {}", e);
965 }
966 let final_count = instruments_cache.len().await;
967 startup_timing!(
968 "instruments_catchup_replay",
969 t.elapsed(),
970 "{} instruments",
971 final_count
972 );
973 instruments_cache.sync_status().set_ready();
974 }
975 Err(e) => {
976 startup_timing!("instruments_catchup_replay", t.elapsed(), "FAILED: {}", e);
977 error!(
978 "Instruments catchup replay failed: {} - marking as ready with potentially stale state",
979 e
980 );
981 instruments_cache.sync_status().set_ready();
983 }
984 }
985 }
986
987 if !standby_mode_active {
989 let instruments_cache_for_snapshot = instruments_cache.clone();
990 let instruments_get_offsets = move || instruments_cache_for_snapshot.snapshot_offsets();
991 let instruments_snapshot_writer = Arc::new(DbInstrumentsSnapshotWriter::new(
992 shared_db.clone(),
993 instruments_cache.clone(),
994 instruments_get_offsets,
995 ));
996 let instruments_snapshot_task = Arc::new(InstrumentsSnapshotTask::new(
997 instruments_snapshot_writer,
998 SnapshotTaskConfig::default(),
999 ));
1000 service_registry.register(instruments_snapshot_task);
1001 info!("Instruments snapshot task registered (60s interval)");
1002 } else {
1003 info!("Instruments snapshot task deferred (standby mode)");
1004 }
1005
1006 let catalog_config = runtime.config.catalog.clone();
1007 info!(
1008 "Loaded inline catalog config version {} ({} underlyings)",
1009 catalog_config.version,
1010 catalog_config.underlyings.len()
1011 );
1012 let collateral_registry = Arc::new(
1013 catalog_manager::CollateralRegistry::from_catalog(&catalog_config)
1014 .context("Failed to build collateral registry from catalog config")?,
1015 );
1016
1017 let oracle_markets = oracles::configured_oracle_markets(&catalog_config);
1019 let underlying_to_candle_coin: Arc<HashMap<String, String>> = Arc::new(
1020 oracle_markets
1021 .iter()
1022 .map(|market| (market.underlying.clone(), market.hyperliquid_coin.clone()))
1023 .collect(),
1024 );
1025 let candle_source: Arc<dyn UnderlyingCandleSource> = Arc::new(
1026 HyperliquidCandleSource::from_config(
1027 &runtime.config.pricing,
1028 runtime.config.modes.testnet_mode,
1029 )
1030 .map_err(|e| anyhow::anyhow!("Failed to initialize HyperliquidCandleSource: {}", e))?,
1031 );
1032
1033 let mut ws_symbols = Vec::new();
1035 let mut seen_symbols = HashSet::new();
1036 for market in &oracle_markets {
1037 if seen_symbols.insert(market.hyperliquid_coin.clone()) {
1038 ws_symbols.push(market.hyperliquid_coin.clone());
1039 }
1040 }
1041
1042 let ws_url = runtime.config.pricing.hyperliquid_ws_url.clone();
1043 let ws_feed = {
1044 #[cfg(feature = "test-endpoints")]
1045 {
1046 if runtime.config.modes.skip_external_oracle {
1047 let mut default_prices = HashMap::new();
1048 default_prices.insert("BTC".to_string(), 90_000.0);
1049 default_prices.insert("ETH".to_string(), 2_500.0);
1050 default_prices.insert("HYPE".to_string(), 42.0);
1051 default_prices.insert("km:US500".to_string(), 5_500.0);
1052 default_prices.insert("km:USOIL".to_string(), 65.0);
1053 oracles::apply_mock_oracle_price_overrides(&mut default_prices);
1054 let feed = Arc::new(HyperliquidWsFeed::new_mock(ws_symbols, &default_prices));
1055 let feed_clone = feed.clone();
1056 let mock_shutdown_rx = shutdown.subscribe();
1057 task_group.spawn("MockWsFeedRefresh", async move {
1058 let mut interval = tokio::time::interval(Duration::from_secs(5));
1059 let mut shutdown = mock_shutdown_rx;
1060 loop {
1061 tokio::select! {
1062 _ = interval.tick() => feed_clone.refresh_mock_timestamps().await,
1063 _ = shutdown.recv() => break,
1064 }
1065 }
1066 Ok(())
1067 });
1068 info!("HyperliquidWsFeed mock with default prices (skip_external_oracle=true)");
1069 feed
1070 } else {
1071 let feed = Arc::new(HyperliquidWsFeed::new(ws_url.clone(), ws_symbols));
1072 let ws_feed_clone = feed.clone();
1073 let ws_shutdown_rx = shutdown.subscribe();
1074 task_group.spawn("HyperliquidWsFeed", async move {
1075 ws_feed_clone.start(ws_shutdown_rx).await;
1076 Ok(())
1077 });
1078 info!("HyperliquidWsFeed live mode enabled (test-endpoints build)");
1079 feed
1080 }
1081 }
1082 #[cfg(not(feature = "test-endpoints"))]
1083 {
1084 let feed = Arc::new(HyperliquidWsFeed::new(ws_url.clone(), ws_symbols));
1085 let ws_feed_clone = feed.clone();
1086 let ws_shutdown_rx = shutdown.subscribe();
1087 task_group.spawn("HyperliquidWsFeed", async move {
1088 ws_feed_clone.start(ws_shutdown_rx).await;
1089 Ok(())
1090 });
1091 info!("HyperliquidWsFeed started (url={})", ws_url);
1092 feed
1093 }
1094 };
1095
// Optional Hydromancer client (logged as the "fallback oracle"): built only
// when it is enabled in config and the `test-endpoints` feature is off;
// otherwise this is `None`.
1096 let hydromancer_client =
1097 if runtime.config.hydromancer.enabled && !cfg!(feature = "test-endpoints") {
// NOTE(review): a missing API key panics here via `expect` instead of being
// returned as a startup error like the surrounding `?`-based failures.
1098 let api_key = runtime
1099 .secrets
1100 .require_hydromancer_api_key()
1101 .expect("Hydromancer is enabled but HYDROMANCER_API_KEY is missing")
1102 .to_string();
1103 let config = HydromancerConfig {
1104 api_url: runtime.config.hydromancer.api_url.clone(),
1105 api_key,
1106 };
1107 info!("Hydromancer fallback oracle enabled");
1108 Some(Arc::new(HydromancerClient::new(config)))
1109 } else {
1110 info!("Hydromancer fallback oracle disabled");
1111 None
1112 };
1113
1114 let t = Instant::now();
1116 let spot_price_notify = Arc::new(tokio::sync::Notify::new());
1117 let mut mark_price_oracles: HashMap<String, Arc<HyperliquidMarkPriceOracle>> = HashMap::new();
1118 for market in &oracle_markets {
1119 let t_oracle = Instant::now();
1120 let oracle_config = HyperliquidOracleConfig {
1121 symbol: market.hyperliquid_coin.clone(),
1122 oracle_writer: Some(shared_db.clone()),
1123 ws_feed: Some(ws_feed.clone()),
1124 price_notify: Some(spot_price_notify.clone()),
1125 ..Default::default()
1126 };
1127 let mut oracle = {
1128 #[cfg(feature = "test-endpoints")]
1129 {
1130 HyperliquidMarkPriceOracle::new(oracle_config).map_err(|e| {
1131 anyhow::anyhow!(
1132 "Failed to construct mark price oracle {}: {}",
1133 market.underlying,
1134 e
1135 )
1136 })?
1137 }
1138 #[cfg(not(feature = "test-endpoints"))]
1139 {
1140 match HyperliquidMarkPriceOracle::new_with_init(oracle_config.clone()).await {
1141 Ok(oracle) => oracle,
1142 Err(err) => {
1143 error!(
1144 "Failed to initialize mark price oracle for {} (coin: {}): {}. Starting without initial price.",
1145 market.underlying, market.hyperliquid_coin, err
1146 );
1147 HyperliquidMarkPriceOracle::new(oracle_config).map_err(|e| {
1148 anyhow::anyhow!(
1149 "Failed to construct mark price oracle {}: {}",
1150 market.underlying,
1151 e
1152 )
1153 })?
1154 }
1155 }
1156 }
1157 };
1158 if let Some(ref hc) = hydromancer_client {
1159 oracle.set_hydromancer_client(hc.clone());
1160 }
1161 startup_timing!("oracle_init", t_oracle.elapsed(), "{}", market.underlying);
1162 gauge!("ht_startup_oracle_seconds", "underlying" => market.underlying.clone())
1163 .set(t_oracle.elapsed().as_secs_f64());
1164 let oracle = Arc::new(oracle);
1165 mark_price_oracles.insert(market.underlying.clone(), oracle);
1166 }
1167 startup_timing!("oracles_total", t.elapsed());
1168
// Spawn one polling task per configured market, using the oracle that was
// stored in `mark_price_oracles` under that market's underlying.
1169 for market in &oracle_markets {
// A missing entry is treated as a broken invariant and panics.
1171 let oracle_clone = mark_price_oracles
1172 .get(&market.underlying)
1173 .unwrap_or_else(|| {
1174 panic!(
1175 "Mark price oracle missing for {} after initialization",
1176 market.underlying
1177 )
1178 })
1179 .clone();
1180 let oracle_shutdown_rx = shutdown.subscribe();
// The task name is built at runtime and then leaked to obtain a `'static`
// string; one small allocation per market is never freed.
1181 let task_name = format!("MarkPriceOracle-{}", market.underlying);
1182 let task_name_static = Box::leak(task_name.into_boxed_str());
1183 task_group.spawn(task_name_static, async move {
// Awaiting the polling handle only logs a failure; the task itself still
// returns `Ok(())` to the task group.
1184 let handle = oracle_clone.start_polling_with_shutdown(oracle_shutdown_rx);
1185 if let Err(e) = handle.await {
1186 error!("MarkPriceOracle task panicked: {:?}", e);
1187 }
1188 Ok(())
1189 });
1190 info!(
1191 "Initialized mark price oracle for {} (coin: {})",
1192 market.underlying, market.hyperliquid_coin
1193 );
1194 }
1195
1196 let mark_price_oracles_for_catalog = mark_price_oracles.clone();
1198 info!(
1199 "Mark price oracles initialized for: {:?}",
1200 mark_price_oracles.keys().collect::<Vec<_>>()
1201 );
1202
1203 let read_snapshot = std::sync::Arc::new(arc_swap::ArcSwap::from_pointee(
1204 crate::rsm::engine_snapshot::EngineSnapshot::empty(),
1205 ));
1206 let snapshot_provider = std::sync::Arc::new(
1207 crate::rsm::engine_snapshot::SnapshotQuoteProvider::new(read_snapshot.clone()),
1208 );
1209 let quote_provider: std::sync::Arc<dyn hypercall_runtime_api::QuoteProvider> =
1210 snapshot_provider.clone();
1211 let engine_state_digest_provider: std::sync::Arc<
1212 dyn crate::rsm::engine_snapshot::EngineStateDigestProvider,
1213 > = snapshot_provider.clone();
1214 let balance_provider: std::sync::Arc<dyn crate::rsm::ledger::BalanceProvider> =
1215 snapshot_provider.clone();
1216 let order_snapshot: std::sync::Arc<dyn hypercall_runtime_api::OrderSnapshotProvider> =
1217 snapshot_provider.clone();
1218 let agent_auth: std::sync::Arc<dyn hypercall_runtime_api::AgentAuthProvider> =
1219 snapshot_provider.clone();
1220 let t = Instant::now();
1222 let greeks_cache = GreeksCache::new(
1223 diesel_db.as_ref(),
1224 event_bus.clone(),
1225 mark_price_oracles.clone(),
1226 shutdown.subscribe(),
1227 quote_provider.clone(),
1228 )
1229 .await
1230 .map_err(|e| anyhow::anyhow!("Failed to create GreeksCache: {}", e))?;
1231 startup_timing!("greeks_cache", t.elapsed());
1232 let competition_service = Arc::new(CompetitionService::new(
1233 diesel_db.clone(),
1234 greeks_cache.clone(),
1235 ));
1236
1237 let t = Instant::now();
1239 let mmp_cache = Arc::new(crate::read_cache::mmp::MmpCache::new(
1240 shared_db.clone(),
1241 greeks_cache.clone(),
1242 )?);
1243 mmp_cache.load_configs_from_db().await?;
1244 startup_timing!("mmp_cache", t.elapsed());
1245 service_registry.register(mmp_cache.clone());
1246
1247 let ws_receiver = ws_direct_rx_for_forwarder;
1252
1253 tier_cache.load_from_db().await?;
1257 debug!("TierCache startup catchup complete");
1258
1259 let push_service: Option<Arc<hypercall_api::push_service::PushNotificationService>> =
1261 match std::env::var("VAPID_PRIVATE_KEY_PEM") {
1262 Ok(pem_b64) => {
1263 let pem_b64_clean: String =
1264 pem_b64.chars().filter(|c| !c.is_whitespace()).collect();
1265 match base64::Engine::decode(
1266 &base64::engine::general_purpose::STANDARD,
1267 &pem_b64_clean,
1268 ) {
1269 Ok(pem_bytes) => {
1270 match hypercall_api::push_service::PushNotificationService::new(
1271 diesel_db.clone(),
1272 &pem_bytes,
1273 ) {
1274 Ok(svc) => {
1275 tracing::info!("✓ PushNotificationService initialized");
1276 Some(Arc::new(svc))
1277 }
1278 Err(e) => {
1279 tracing::warn!("Push notification service init failed: {e}");
1280 None
1281 }
1282 }
1283 }
1284 Err(e) => {
1285 tracing::warn!("VAPID_PRIVATE_KEY_PEM base64 decode failed: {e}");
1286 None
1287 }
1288 }
1289 }
1290 Err(_) => {
1291 tracing::info!("VAPID_PRIVATE_KEY_PEM not set, push notifications disabled");
1292 None
1293 }
1294 };
1295
1296 let notification_service: Arc<hypercall_api::notification_service::NotificationService> = {
1305 let redis_client_opt: Option<redis::Client> = match std::env::var("UPSTASH_REDIS_URL")
1306 .ok()
1307 .or_else(|| std::env::var("REDIS_URL").ok())
1308 {
1309 Some(url) if !url.trim().is_empty() => {
1310 let trimmed = url.trim();
1311 if trimmed.starts_with("redis://") || trimmed.starts_with("rediss://") {
1312 match redis::Client::open(trimmed.to_string()) {
1313 Ok(c) => {
1314 tracing::info!("✓ NotificationService Redis cache initialized");
1315 Some(c)
1316 }
1317 Err(e) => {
1318 tracing::warn!(
1319 error = %e,
1320 "NotificationService: Redis client open failed; feed persistence stays on, bell cache disabled"
1321 );
1322 None
1323 }
1324 }
1325 } else {
1326 tracing::warn!(
1327 "NotificationService: UPSTASH_REDIS_URL / REDIS_URL must be a redis:// or rediss:// URI (got scheme mismatch); feed persistence stays on, bell cache disabled"
1328 );
1329 None
1330 }
1331 }
1332 _ => {
1333 tracing::info!(
1334 "NotificationService: no Redis URL configured (UPSTASH_REDIS_URL / REDIS_URL), bell cache disabled; feed persistence still active"
1335 );
1336 None
1337 }
1338 };
1339 hypercall_api::notification_service::NotificationService::new(
1340 diesel_db.clone(),
1341 redis_client_opt,
1342 shutdown.subscribe(),
1343 )
1344 };
1345
1346 let ws_forwarder_deps = hypercall_api::websocket::event_forwarder::WsEventForwarderDeps {
1348 pubsub: pubsub.clone(),
1349 portfolio_cache: portfolio_cache.clone(),
1350 instruments_cache: instruments_cache.clone(),
1351 quote_provider: quote_provider.clone(),
1352 greeks_cache: greeks_cache.clone(),
1353 tier_cache: tier_cache.clone(),
1354 push_service: push_service.clone(),
1355 db: Arc::new(
1356 crate::db_handler::DieselEventHandler::with_pool_no_migrations(shared_db.pool()),
1357 ),
1358 engine_event_tx: engine_event_tx.clone(),
1359 notification_service: notification_service.clone(),
1360 shutdown: shutdown.clone(),
1361 };
1362 task_group.spawn("WsEventForwarder", async move {
1363 hypercall_api::websocket::event_forwarder::run(ws_forwarder_deps, ws_receiver).await
1364 });
1365
1366 let competition_fill_rx = event_bus
1372 .subscribe(vec![hypercall_types::topics::TOPIC_FILLS.to_string()])
1373 .await
1374 .map_err(|e| anyhow::anyhow!("Failed to subscribe competition fill recorder: {}", e))?;
1375 let competition_fill_recorder =
1376 hypercall_competition::CompetitionFillRecorder::new(competition_service.clone());
1377 let competition_fill_shutdown = shutdown.subscribe();
1378 task_group.spawn("CompetitionFillRecorder", async move {
1379 competition_fill_recorder
1380 .run(competition_fill_rx, competition_fill_shutdown)
1381 .await;
1382 Ok(())
1383 });
1384
1385 let portfolio_service = portfolio_cache.get_service();
1388
1389 let market_stats_cache = Arc::new(crate::read_cache::market_stats::MarketStatsCache::new(
1391 portfolio_service.clone(),
1392 ));
1393 market_stats_cache
1394 .clone()
1395 .initialize_with_backfill(diesel_db.as_ref(), event_bus.clone(), shutdown.subscribe())
1396 .await?;
1397 tracing::info!("✓ MarketStatsCache initialized");
1398
1399 let markets_snapshot_refresh_ms = runtime.config.api.markets_snapshot_refresh_ms;
1400 let markets_snapshot_cache = Arc::new(
1401 hypercall_api::caches::MarketsSnapshotCache::new(
1402 instruments_cache.clone(),
1403 greeks_cache.clone(),
1404 market_stats_cache.clone(),
1405 )
1406 .with_refresh_interval(Duration::from_millis(markets_snapshot_refresh_ms)),
1407 );
1408 markets_snapshot_cache.initialize().await;
1409 service_registry.register(markets_snapshot_cache.clone());
1410 tracing::info!(
1411 "✓ MarketsSnapshotCache initialized (refresh_ms={})",
1412 markets_snapshot_refresh_ms
1413 );
1414
1415 let instruments_snapshot_refresh_ms =
1416 services::parse_positive_refresh_ms_env("INSTRUMENTS_SNAPSHOT_REFRESH_MS", 10_000);
1417 let instruments_snapshot_cache = Arc::new(
1418 hypercall_api::caches::InstrumentsSnapshotCache::new(instruments_cache.clone())
1419 .with_refresh_interval(Duration::from_millis(instruments_snapshot_refresh_ms)),
1420 );
1421 instruments_snapshot_cache.initialize().await;
1422 service_registry.register(instruments_snapshot_cache.clone());
1423 tracing::info!(
1424 "✓ InstrumentsSnapshotCache initialized (refresh_ms={})",
1425 instruments_snapshot_refresh_ms
1426 );
1427
1428 let competitions_snapshot_refresh_ms =
1430 services::parse_positive_refresh_ms_env("COMPETITIONS_SNAPSHOT_REFRESH_MS", 10_000);
1431 let competitions_snapshot_cache = Arc::new(
1432 hypercall_api::caches::CompetitionsSnapshotCache::new(competition_service.clone())
1433 .with_refresh_interval(Duration::from_millis(competitions_snapshot_refresh_ms)),
1434 );
1435 competitions_snapshot_cache.initialize().await;
1436 service_registry.register(competitions_snapshot_cache.clone());
1437 tracing::info!(
1438 "✓ CompetitionsSnapshotCache initialized (refresh_ms={})",
1439 competitions_snapshot_refresh_ms
1440 );
1441
1442 let sparklines_snapshot_refresh_ms =
1444 services::parse_positive_refresh_ms_env("SPARKLINES_SNAPSHOT_REFRESH_MS", 15_000);
1445 let sparklines_snapshot_cache = Arc::new(
1446 hypercall_api::caches::SparklinesSnapshotCache::from_config(
1447 &runtime.config.pricing,
1448 runtime.config.modes.testnet_mode,
1449 )
1450 .map_err(|e| anyhow::anyhow!("Failed to create SparklinesSnapshotCache: {}", e))?
1451 .with_refresh_interval(Duration::from_millis(sparklines_snapshot_refresh_ms)),
1452 );
1453 sparklines_snapshot_cache.initialize().await;
1454 service_registry.register(sparklines_snapshot_cache.clone());
1455 tracing::info!(
1456 "✓ SparklinesSnapshotCache initialized (refresh_ms={})",
1457 sparklines_snapshot_refresh_ms
1458 );
1459
1460 let mut hypercore_event_rx = event_bus
1461 .subscribe(vec![
1462 crate::shared::topics::TOPIC_HYPERCORE_POSITION_UPDATES.to_string(),
1463 ])
1464 .await
1465 .map_err(|e| anyhow::anyhow!("Failed to subscribe to HyperCore updates: {}", e))?;
1466 let hypercore_ws_tx = ws_direct_tx.clone();
1467 let mut hypercore_bridge_shutdown_rx = shutdown.subscribe();
1468 task_group.spawn("HypercoreEventForwarder", async move {
1469 loop {
1470 let event = tokio::select! {
1471 _ = hypercore_bridge_shutdown_rx.recv() => {
1472 info!("HypercoreEventForwarder received shutdown signal");
1473 break;
1474 }
1475 maybe_event = hypercore_event_rx.recv() => {
1476 match maybe_event {
1477 Some(event) => event,
1478 None => break,
1479 }
1480 }
1481 };
1482
1483 if let EngineMessage::HypercorePositionUpdate(_) = &event {
1484 if hypercore_ws_tx.send(event).is_err() {
1485 warn!("HypercoreEventForwarder failed to forward HyperCore update");
1486 break;
1487 }
1488 }
1489 }
1490
1491 Ok(())
1492 });
1493
1494 let (deposit_tx, deposit_rx) =
1495 tokio::sync::mpsc::channel::<crate::rsm::unified_engine::DepositRequest>(128);
1496
1497 let t = Instant::now();
1500 event_bus.clone().start_processing().await;
1501 startup_timing!("event_bus_start_processing", t.elapsed());
1502
1503 let vol_oracle_output = VolOracleFactory::build(&catalog_config, &mut task_group, &shutdown)
1504 .await
1505 .map_err(|err| anyhow::anyhow!("Failed to build vol oracle router: {err:#}"))?;
1506 let risk_vol_oracle = vol_oracle_output.oracle;
1507
1508 greeks_cache.set_vol_oracle(risk_vol_oracle.clone()).await;
1511
1512 {
1515 let gc = greeks_cache.clone();
1516 let spots = vol_oracle_output.polygon_platform_spots;
1517 let underlyings: Vec<String> = catalog_config.vol_oracles.routes.keys().cloned().collect();
1518 let mut feeder_shutdown = shutdown.subscribe();
1519 task_group.spawn("polygon_spot_feeder", async move {
1520 let mut interval = tokio::time::interval(std::time::Duration::from_secs(5));
1521 loop {
1522 tokio::select! {
1523 _ = interval.tick() => {}
1524 _ = feeder_shutdown.recv() => break,
1525 }
1526 let mut updates: Vec<(String, Option<f64>)> = Vec::new();
1527 for underlying in &underlyings {
1528 updates.push((underlying.clone(), gc.get_spot_price(underlying).await));
1529 }
1530 let mut map = spots.write().expect("platform_spots poisoned");
1531 for (underlying, price) in updates {
1532 if let Some(p) = price {
1533 map.insert(underlying, p);
1534 } else {
1535 map.remove(&underlying);
1536 }
1537 }
1538 }
1539 Ok(())
1540 });
1541 }
1542
1543 let mut readiness_checks: Vec<Arc<dyn crate::readiness::Readiness>> = vec![
1547 Arc::new(SyncStatusReadiness::new(
1548 "portfolio",
1549 portfolio_cache.sync_status(),
1550 )),
1551 Arc::new(SyncStatusReadiness::new(
1552 "instruments",
1553 instruments_cache.sync_status(),
1554 )),
1555 ];
1556 if runtime.config.api.require_risk_vol_readiness {
1557 readiness_checks.push(Arc::new(VolOracleReadiness::new(
1558 "risk_vol_oracle",
1559 risk_vol_oracle.clone(),
1560 )));
1561 }
1562
// Optionally run the HyperCore position service inside this server process.
// NOTE(review): the switch only checks that the variable is present and valid
// unicode, so any value (including "0" or "false") enables the service.
1563 let _hypercore_position_service =
1564 if std::env::var("ENABLE_INTEGRATED_HYPERCORE_POSITION_SERVICE").is_ok() {
// Update interval in seconds; an unset or unparsable value falls back to 30.
1565 let update_interval_secs = std::env::var("HYPERCORE_UPDATE_INTERVAL_SECS")
1566 .ok()
1567 .and_then(|raw| raw.parse::<u64>().ok())
1568 .unwrap_or(30);
1569 let service = Arc::new(HypercorePositionService::with_default_url(
1570 Duration::from_secs(update_interval_secs),
1571 event_bus.clone(),
1572 shared_db.clone(),
1573 ));
1574 let service_for_task = service.clone();
1575 let hypercore_shutdown = shutdown.subscribe();
// The service runs until shutdown; a failure is returned to the task group
// as an error.
1576 task_group.spawn("HypercorePositionService", async move {
1577 service_for_task
1578 .run_with_shutdown(hypercore_shutdown)
1579 .await
1580 .map_err(|e| anyhow::anyhow!("HyperCore position service failed: {}", e))
1581 });
1582 tracing::info!(
1583 "✓ HyperCore position service started inside integrated server (interval={}s)",
1584 update_interval_secs
1585 );
1586 Some(service)
1587 } else {
1588 None
1589 };
1590
1591 let span_margin_service = Arc::new(SpanMarginService::new_with_vol_oracle(
1594 config.clone(),
1595 risk_vol_oracle.clone(),
1596 ));
1597 tracing::info!("✓ SpanMarginService initialized for API layer");
1598
1599 let snapshot_open_orders = Arc::new(
1602 crate::rsm::engine_snapshot::SnapshotOpenOrdersSource::new(order_snapshot.clone()),
1603 );
1604 let risk_account_builder = Arc::new(RiskAccountBuilder::new_with_balance_provider(
1605 balance_provider.clone(),
1606 portfolio_service.clone(),
1607 snapshot_open_orders,
1608 greeks_cache.clone(),
1609 ));
1610 tracing::info!("✓ RiskAccountBuilder initialized for API layer");
1611
1612 let standard_margin_service = Arc::new(StandardMarginService::new());
1614 tracing::info!("✓ StandardMarginService initialized for API layer");
1615
1616 let standard_account_builder = Arc::new(StandardAccountBuilder::new_with_balance_provider(
1618 balance_provider.clone(),
1619 portfolio_service.clone(),
1620 greeks_cache.clone(),
1621 ));
1622 tracing::info!("✓ StandardAccountBuilder initialized for API layer");
1623
1624 portfolio_cache
1626 .set_margin_dependencies(
1627 span_margin_service.clone(),
1628 greeks_cache.clone(),
1629 risk_account_builder.clone(),
1630 tier_cache.clone(),
1631 order_snapshot.clone(),
1632 standard_margin_service.clone(),
1633 standard_account_builder.clone(),
1634 )
1635 .await;
1636
1637 {
1641 let t = Instant::now();
1642 let repriced = portfolio_cache.reprice_all_wallets().await;
1643 startup_timing!("portfolio_reprice_all", t.elapsed(), "{} wallets", repriced);
1644 }
1645
1646 if !standby_mode_active {
1648 let historical_pnl_config =
1649 HistoricalPnlTaskConfig::from_config(&runtime.config.background_tasks.historical_pnl);
1650 let historical_pnl_task = HistoricalPnlSnapshotTask::new(
1651 portfolio_cache.clone(),
1652 diesel_db.clone(),
1653 historical_pnl_config.clone(),
1654 );
1655 service_registry.register(Arc::new(historical_pnl_task));
1656 info!(
1657 "Historical pnl snapshot task registered (max_periods={}, capture_every_5m_ms={}, capture_every_1h_ms={}, capture_every_1d_ms={})",
1658 historical_pnl_config.max_periods,
1659 historical_pnl_config.capture_every_5m_ms,
1660 historical_pnl_config.capture_every_1h_ms,
1661 historical_pnl_config.capture_every_1d_ms
1662 );
1663
1664 let historical_theo_config = HistoricalTheoTaskConfig::from_env();
1665 let historical_theo_task = HistoricalTheoSnapshotTask::new(
1666 instruments_cache.clone(),
1667 greeks_cache.clone(),
1668 diesel_db.clone(),
1669 historical_theo_config.clone(),
1670 );
1671 service_registry.register(Arc::new(historical_theo_task));
1672 info!(
1673 "Historical theo snapshot task registered (max_periods={}, capture_every_5m_ms={}, capture_every_1h_ms={}, capture_every_1d_ms={})",
1674 historical_theo_config.max_periods,
1675 historical_theo_config.capture_every_5m_ms,
1676 historical_theo_config.capture_every_1h_ms,
1677 historical_theo_config.capture_every_1d_ms
1678 );
1679
1680 let vol_surface_config =
1681 crate::runtime::tasks::vol_surface_snapshot::VolSurfaceSnapshotConfig::from_env();
1682 let vol_surface_task =
1683 crate::runtime::tasks::vol_surface_snapshot::VolSurfaceSnapshotTask::new(
1684 risk_vol_oracle.clone(),
1685 diesel_db.clone(),
1686 vol_surface_config.clone(),
1687 );
1688 service_registry.register(Arc::new(vol_surface_task));
1689 info!(
1690 "Vol surface snapshot task registered (max_periods={}, capture_every_5m_ms={}, capture_every_1h_ms={}, capture_every_1d_ms={})",
1691 vol_surface_config.max_periods,
1692 vol_surface_config.capture_every_5m_ms,
1693 vol_surface_config.capture_every_1h_ms,
1694 vol_surface_config.capture_every_1d_ms
1695 );
1696 } else {
1697 info!("Historical pnl/theo/vol-surface snapshot tasks deferred (standby mode)");
1698 }
1699
1700 let bbo_snapshot_config =
1702 BboSnapshotTaskConfig::from_config(&runtime.config.background_tasks.bbo_snapshot);
1703 let bbo_snapshot_service = Arc::new(BboSnapshotService::new(
1704 instruments_cache.clone(),
1705 quote_provider.clone(),
1706 diesel_db.clone(),
1707 bbo_snapshot_config.clone(),
1708 ));
1709 if !standby_mode_active {
1710 service_registry.register(bbo_snapshot_service.clone());
1711 info!(
1712 "BBO snapshot task registered (interval_secs={}, retention_days={})",
1713 bbo_snapshot_config.interval_secs, bbo_snapshot_config.retention_days
1714 );
1715 } else {
1716 info!("BBO snapshot task deferred (standby mode)");
1717 }
1718
1719 let options_summary_snapshot_refresh_ms = services::parse_positive_refresh_ms_env(
1720 "OPTIONS_SUMMARY_SNAPSHOT_REFRESH_MS",
1721 markets_snapshot_refresh_ms,
1722 );
1723 let indicative_cache =
1724 Arc::new(hypercall_api::rfq::indicative_quote_cache::IndicativeQuoteCache::new(60_000));
1725 let options_summary_snapshot_cache = Arc::new(
1726 hypercall_api::caches::OptionsSummarySnapshotCache::new(
1727 instruments_cache.clone(),
1728 greeks_cache.clone(),
1729 quote_provider.clone(),
1730 bbo_snapshot_service.clone(),
1731 Some(indicative_cache.clone()),
1732 diesel_db.clone(),
1733 )
1734 .with_refresh_interval(Duration::from_millis(options_summary_snapshot_refresh_ms)),
1735 );
1736 options_summary_snapshot_cache.initialize().await;
1737 service_registry.register(options_summary_snapshot_cache.clone());
1738 tracing::info!(
1739 "✓ OptionsSummarySnapshotCache initialized (refresh_ms={})",
1740 options_summary_snapshot_refresh_ms
1741 );
1742
1743 let upstash_init = hypercall_api::upstash::init(
1745 &markets_snapshot_cache,
1746 &competitions_snapshot_cache,
1747 &sparklines_snapshot_cache,
1748 &instruments_snapshot_cache,
1749 &options_summary_snapshot_cache,
1750 diesel_db.clone(),
1751 )
1752 .await?;
1753 if let Some(publisher) = upstash_init.publisher {
1754 service_registry.register(publisher);
1755 }
1756 let username_redis_client = upstash_init.redis_client;
1757
1758 let periodic_margin_cache = portfolio_cache.clone();
1763 let mut periodic_margin_shutdown_rx = shutdown.subscribe();
1764 task_group.spawn("PortfolioMarginWsRefresh", async move {
1765 let mut interval = tokio::time::interval(Duration::from_secs(5));
1766 interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
1767
1768 loop {
1769 tokio::select! {
1770 _ = periodic_margin_shutdown_rx.recv() => {
1771 info!("PortfolioMarginWsRefresh received shutdown signal");
1772 break;
1773 }
1774 _ = interval.tick() => {
1775 let position_wallets = periodic_margin_cache
1776 .publish_position_updates_for_subscribers()
1777 .await;
1778 let wallets = periodic_margin_cache
1779 .publish_margin_updates_for_subscribers()
1780 .await;
1781 let greek_wallets = periodic_margin_cache
1782 .publish_greeks_updates_for_subscribers()
1783 .await;
1784 tracing::debug!(
1785 "PortfolioMarginWsRefresh published position updates for {} wallets, margin updates for {} wallets, and greeks updates for {} wallets",
1786 position_wallets,
1787 wallets,
1788 greek_wallets
1789 );
1790 }
1791 }
1792 }
1793 Ok(())
1794 });
1795
1796 let competition_service_for_ws = competition_service.clone();
1798 let competition_pubsub = pubsub.clone();
1799 let mut competition_ws_shutdown_rx = shutdown.subscribe();
1800 task_group.spawn("CompetitionWsRefresh", async move {
1801 let mut interval = tokio::time::interval(Duration::from_secs(5));
1802 interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
1803 let mut last_rank_by_wallet: HashMap<WalletAddress, (i64, i64)> = HashMap::new();
1804 let mut last_gap_by_wallet: HashMap<WalletAddress, (i64, Option<i64>, Option<rust_decimal::Decimal>)> =
1805 HashMap::new();
1806
1807 loop {
1808 tokio::select! {
1809 _ = competition_ws_shutdown_rx.recv() => {
1810 info!("CompetitionWsRefresh received shutdown signal");
1811 break;
1812 }
1813 _ = interval.tick() => {
1814 let now_ts_ms = chrono::Utc::now().timestamp_millis();
1815
1816 match competition_service_for_ws.finalize_ended_competitions(now_ts_ms).await {
1817 Ok(final_stats) => {
1818 for stats in final_stats {
1819 competition_pubsub
1820 .publish_competition_final_stats(stats.clone())
1821 .await;
1822 competition_pubsub
1823 .publish_competition_final_standing(WsCompetitionFinalStanding {
1824 wallet_address: stats.wallet_address,
1825 competition_id: stats.competition_id,
1826 rank: stats.rank,
1827 pnl: stats.pnl,
1828 volume: stats.volume,
1829 efficiency: stats.efficiency,
1830 medal: stats.medal,
1831 timestamp: stats.timestamp,
1832 })
1833 .await;
1834 }
1835 }
1836 Err(err) => {
1837 error!("Competition finalization failed: {}", err);
1838 }
1839 }
1840
1841 let wallets = competition_pubsub.subscribed_wallets("competition").await;
1842 for wallet in wallets {
1843 match competition_service_for_ws
1844 .build_competition_update_for_wallet(wallet, now_ts_ms)
1845 .await
1846 {
1847 Ok(Some(update)) => {
1848 competition_pubsub.publish_competition_update(update).await;
1849 }
1850 Ok(None) => {}
1851 Err(err) => {
1852 error!(
1853 "Failed to build competition update for wallet {}: {}",
1854 wallet, err
1855 );
1856 }
1857 }
1858 }
1859
1860 let engagement_wallets = competition_pubsub
1861 .subscribed_wallets("competition_engagement")
1862 .await;
1863 for wallet in engagement_wallets.iter().copied() {
1864 match competition_service_for_ws
1865 .build_competition_engagement_snapshot_for_wallet(wallet, now_ts_ms)
1866 .await
1867 {
1868 Ok(Some(snapshot)) => {
1869 if let Some((prev_competition_id, prev_rank)) =
1870 last_rank_by_wallet.get(&wallet).copied()
1871 {
1872 if prev_competition_id == snapshot.competition_id
1873 && prev_rank != snapshot.rank
1874 {
1875 competition_pubsub
1876 .publish_competition_rank_change(WsCompetitionRankChange {
1877 wallet_address: wallet,
1878 competition_id: snapshot.competition_id,
1879 from_rank: prev_rank,
1880 to_rank: snapshot.rank,
1881 delta_places: prev_rank - snapshot.rank,
1882 pnl: snapshot.pnl,
1883 timestamp: now_ts_ms,
1884 })
1885 .await;
1886 }
1887 }
1888
1889 let gap_state = (
1890 snapshot.competition_id,
1891 snapshot.next_rank,
1892 snapshot.gap_metric_value,
1893 );
1894 let should_publish_gap = last_gap_by_wallet
1895 .get(&wallet)
1896 .map(|prev| prev != &gap_state)
1897 .unwrap_or(true);
1898 if should_publish_gap {
1899 competition_pubsub
1900 .publish_competition_gap_update(WsCompetitionGapUpdate {
1901 wallet_address: wallet,
1902 competition_id: snapshot.competition_id,
1903 rank: snapshot.rank,
1904 next_rank: snapshot.next_rank,
1905 gap_metric_value: snapshot.gap_metric_value,
1906 timestamp: now_ts_ms,
1907 })
1908 .await;
1909 }
1910
1911 last_rank_by_wallet
1912 .insert(wallet, (snapshot.competition_id, snapshot.rank));
1913 last_gap_by_wallet.insert(wallet, gap_state);
1914 }
1915 Ok(None) => {
1916 last_rank_by_wallet.remove(&wallet);
1917 last_gap_by_wallet.remove(&wallet);
1918 }
1919 Err(err) => {
1920 error!(
1921 "Failed to build competition engagement snapshot for wallet {}: {}",
1922 wallet, err
1923 );
1924 }
1925 }
1926 }
1927
1928 let engagement_wallets_set =
1929 engagement_wallets.iter().copied().collect::<std::collections::HashSet<_>>();
1930 last_rank_by_wallet.retain(|wallet, _| engagement_wallets_set.contains(wallet));
1931 last_gap_by_wallet.retain(|wallet, _| engagement_wallets_set.contains(wallet));
1932 }
1933 }
1934 }
1935 Ok(())
1936 });
1937
1938 tracing::info!(
1940 "✓ Wired SpanMarginService + RiskAccountBuilder into PortfolioCache for WS margin updates"
1941 );
1942
1943 let standby_mode = standby_mode_active;
1949
1950 let (standby_controller, _standby_promote_rx) = if standby_mode {
1951 let (controller, promote_rx) =
1952 crate::runtime::standby::StandbyController::new_standby(1000);
1953 info!("Standby mode enabled — warmup complete, engine deferred until POST /admin/promote");
1954 (Some(controller), Some(promote_rx))
1955 } else {
1956 (None, None)
1957 };
1958
1959 let rsm_signer_configured = runtime.config.rsm_signer.is_configured();
1960 let local_rsm_signer_configured = runtime.config.transaction_submitter.mode
1961 == hypercall_transaction_submitter::TransactionSubmitterMode::Direct;
1962 let onchain_deposits_enabled = true;
1967
// Optional RSM signer. One is created when liquidation is enabled, an RSM
// signer is configured, or the transaction submitter runs in Direct mode;
// otherwise this is `None`.
1968 let rsm_signer_service: Option<Arc<dyn hypercall_signer::RsmSigner>> = if runtime
1969 .config
1970 .liquidation
1971 .enabled
1972 || rsm_signer_configured
1973 || local_rsm_signer_configured
1974 {
// Standby node: wrap the controller in a `StandbyRsmSignerService` rather
// than building the signer now.
1975 if let Some(controller) = standby_controller.clone() {
// Validation only: the database URL value is discarded, but its absence
// aborts startup with an error.
1976 runtime
1977 .secrets
1978 .database_url
1979 .as_ref()
1980 .context("standby RSM signer requires DATABASE_URL for post-promote init")?;
1981 Some(
1982 Arc::new(crate::startup::rsm_signer::StandbyRsmSignerService::new(
1983 controller,
1984 runtime.config.clone(),
1985 runtime.secrets.clone(),
1986 db_auth.clone(),
1987 rsm_signer_chain_id,
1988 )) as Arc<dyn hypercall_signer::RsmSigner>,
1989 )
1990 } else {
// Non-standby node: build the signer immediately; a failure aborts startup.
1991 Some(
1992 crate::startup::rsm_signer::build_rsm_signer_service(
1993 &runtime.config,
1994 &runtime.secrets,
1995 diesel_db.clone(),
1996 rsm_signer_chain_id,
1997 )
1998 .await
1999 .map_err(|error| anyhow::anyhow!("Failed to initialize RSM signer: {}", error))?,
2000 )
2001 }
2002 } else {
2003 None
2004 };
2005
// Portfolio snapshot task with a custom capture callback. The task is built
// unconditionally but only registered when the node is not in standby mode.
2006 let portfolio_service_for_snapshot = portfolio_cache.get_service();
2014 let snapshot_capture_portfolio_cache = portfolio_cache.clone();
2015 let snapshot_capture_diesel = shared_db.clone();
2016 let snapshot_capture_sender = journal_batch_sender.clone();
2017 let snapshot_task = Arc::new(
2018 PortfolioSnapshotTask::new(
2019 shared_db.clone(),
2020 shared_db.clone(),
2021 portfolio_service_for_snapshot,
2022 SnapshotTaskConfig::default(),
2023 )
// The capture callback is a synchronous closure that drives async work to
// completion on the current runtime handle.
// NOTE(review): `tokio::task::block_in_place` is documented to require the
// multi-threaded runtime — confirm this closure is only invoked from one.
2024 .with_capture_snapshot(Arc::new(move || {
2025 tokio::task::block_in_place(|| {
2026 tokio::runtime::Handle::current().block_on(async {
// Step 1: take the projection barrier; the guard is held until the end of
// this async block.
2027 let _guard = snapshot_capture_portfolio_cache
2028 .lock_projection_barrier()
2029 .await;
// Step 2: when a journal batch sender exists, request a flush and wait for
// its acknowledgement before reading any state.
2030 if let Some(sender) = &snapshot_capture_sender {
2031 let (ack_tx, ack_rx) = tokio::sync::oneshot::channel();
2032 sender
2033 .send(crate::journal::JournalMessage::Flush(ack_tx))
2034 .await
2035 .map_err(|e| {
2036 anyhow::anyhow!("Failed to flush journal batcher: {}", e)
2037 })?;
2038 ack_rx.await.map_err(|_| {
2039 anyhow::anyhow!("Journal batcher dropped snapshot flush ack")
2040 })?;
2041 }
2042
// Step 3: read the next engine command id, then capture the portfolios
// while still holding the barrier guard.
2043 let replay: &dyn hypercall_db::JournalReplayReader =
2044 snapshot_capture_diesel.as_ref();
2045 let next_command_id = replay.get_next_engine_command_id_sync()?;
2046 let states = snapshot_capture_portfolio_cache
2047 .capture_portfolios_under_barrier(&_guard)
2048 .await;
2049
// Step 4: report a single offset entry — key 0 of the engine command
// snapshot stream mapped to the next command id.
2050 let mut offsets = HashMap::new();
2051 offsets.insert(
2052 crate::read_cache::portfolio::ENGINE_COMMAND_SNAPSHOT_STREAM.to_string(),
2053 HashMap::from([(0, next_command_id)]),
2054 );
2055 Ok((states, offsets))
2056 })
2057 })
2058 })),
2059 );
2060 if !standby_mode_active {
2061 service_registry.register(snapshot_task);
2062 info!("Portfolio snapshot task started (60s interval)");
2063 } else {
2064 info!("Portfolio snapshot task deferred (standby mode)");
2065 }
2066
2067 let mark_price_oracles_for_recovery = mark_price_oracles.clone();
2069
2070 let engine_wal_path = hypercall_journal::checkpoint::wal_path_from_config(
2071 runtime.config.engine.persistence.journal.wal_path.as_ref(),
2072 );
2073 let engine_drain_marker_path = handlers::health::drain_marker_path(&engine_wal_path);
2074 let engine_wal_path_configured =
2075 hypercall_journal::checkpoint::wal_path_is_explicitly_configured(
2076 runtime.config.engine.persistence.journal.wal_path.as_ref(),
2077 );
2078 let engine_start_quiesced =
2079 engine_wal_path_configured && tokio::fs::metadata(&engine_drain_marker_path).await.is_ok();
2080 if engine_start_quiesced {
2081 info!(
2082 path = %engine_drain_marker_path.display(),
2083 "Persistent drain marker found; engine will start quiesced"
2084 );
2085 }
2086 let recovery_safety_report = hypercall_api::recovery_safety::shared_report();
2087 let portfolio_margin_mode_allowlist = parse_configured_wallet_set(
2088 "PORTFOLIO_MARGIN_MODE_ALLOWLIST",
2089 &runtime.config.engine.portfolio_margin_mode_allowlist,
2090 )?;
2091 let portfolio_margin_settlement_allowlist = parse_configured_wallet_set(
2092 "PORTFOLIO_MARGIN_SETTLEMENT_ALLOWLIST",
2093 &runtime.config.engine.portfolio_margin_settlement_allowlist,
2094 )?;
2095
2096 let mut builder = UnifiedEngineBuilder::new(config)
2097 .with_order_buffer_size(1000)
2098 .with_database(&engine_database_url)
2099 .with_db_auth(engine_db_auth)
2100 .with_runtime_settings(crate::rsm::unified_engine::EngineRuntimeSettings {
2101 snapshot_interval: Duration::from_secs(runtime.config.engine.snapshot_interval_secs),
2102 read_snapshot_interval: Duration::from_millis(
2103 runtime.config.engine.read_snapshot_interval_ms,
2104 ),
2105 post_startup_reconcile_delay: Duration::from_secs(
2106 runtime.config.engine.post_startup_reconcile_delay_secs,
2107 ),
2108 wal_path: runtime.config.engine.persistence.journal.wal_path.clone(),
2109 start_quiesced: engine_start_quiesced,
2110 recovery_safety_report: Some(recovery_safety_report.clone()),
2111 portfolio_margin_pool_enabled: runtime.config.engine.portfolio_margin_pool_enabled,
2112 portfolio_margin_settlement_allowlist: portfolio_margin_settlement_allowlist.clone(),
2113 })
2114 .with_mmp_cache(mmp_cache.clone())
2115 .with_tier_cache(tier_cache.clone())
2116 .with_portfolio_service(portfolio_service)
2117 .with_portfolio_cache(portfolio_cache.clone())
2118 .with_greeks_cache(greeks_cache.clone())
2119 .with_mark_price_oracles(mark_price_oracles)
2120 .with_risk_account_builder(risk_account_builder.clone())
2121 .with_vol_oracle(risk_vol_oracle.clone())
2122 .with_standard_account_builder(standard_account_builder.clone())
2123 .with_liquidation_cache(liquidation_cache.clone());
2124
2125 if standby_mode_active || skip_db_migrations {
2129 builder = builder.with_skip_db_migrations();
2130 }
2131
2132 if let Some(ref writer) = engine_journal_writer {
2134 builder = builder.with_journal_writer(writer.clone());
2135 }
2136
2137 if let Some(ref sender) = journal_batch_sender {
2139 builder = builder.with_journal_batch_sender(sender.clone());
2140 }
2141
2142 if engine_journal_writer.is_none() && journal_batch_sender.is_none() {
2144 builder = builder.with_mock_journal();
2145 }
2146
2147 let default_standby_replay_start_seq = if standby_mode_active {
2148 let nats_config =
2149 crate::nats::NatsConfig::from_env().expect("NATS_URL required for standby mode");
2150 let latest_seq = nats_config
2151 .latest_stream_sequence()
2152 .await
2153 .with_context(|| "Failed to read NATS stream tail for standby replay start sequence")?;
2154 info!(
2155 latest_seq,
2156 "Captured NATS stream tail before standby startup recovery"
2157 );
2158 Some(latest_seq as i64)
2159 } else {
2160 None
2161 };
2162
2163 let mut balance_update_publisher_for_api = None;
2164 let mut balance_update_stream_required = false;
2165
2166 if let Some(nats_config) = crate::nats::NatsConfig::from_env() {
2168 balance_update_stream_required = true;
2169 match crate::nats::NatsPublisher::connect(&nats_config).await {
2170 Ok(publisher) => {
2171 builder = builder.with_nats_publisher(publisher);
2172 info!("NATS publisher connected for real-time command replication");
2173 }
2174 Err(e) => {
2175 warn!(
2176 "Failed to connect NATS publisher (replication disabled): {}",
2177 e
2178 );
2179 }
2180 }
2181 match crate::nats::NatsBalanceUpdatePublisher::connect(&nats_config).await {
2182 Ok(publisher) => {
2183 balance_update_publisher_for_api = Some(publisher.clone());
2184 builder = builder.with_nats_balance_update_publisher(publisher);
2185 info!("NATS balance update publisher connected");
2186 }
2187 Err(e) => {
2188 warn!("Failed to connect NATS balance update publisher: {}", e);
2189 }
2190 }
2191 }
2192
2193 #[cfg(feature = "testnet")]
2194 let testnet_bootstrap_accounts = {
2195 tracing::warn!("testnet feature enabled, funding hardhat test accounts");
2196 let test_accounts = vec![
2197 (
2198 WalletAddress::from_str("0xf39Fd6e51aad88F6F4ce6aB8827279cffFb92266").unwrap(),
2199 100_000.0,
2200 ), (
2202 WalletAddress::from_str("0x70997970C51812dc3A010C7d01b50e0d17dc79C8").unwrap(),
2203 100_000.0,
2204 ), (
2208 WalletAddress::from_str("0x90F79bf6EB2c4f870365E785982E1f101E93b906").unwrap(),
2209 10_000.0,
2210 ), ];
2212
2213 test_accounts
2214 };
2215
2216 builder = builder.with_read_snapshot(read_snapshot.clone());
2217
2218 builder = builder.with_ws_event_sender(ws_direct_tx.clone());
2219
2220 let engine_result = builder.build_with_info(engine_event_tx.clone(), shutdown.tx());
2221 let mut unified_engine = engine_result.engine;
2222 let engine_order_tx = engine_result.order_tx;
2223 let engine_market_tx = engine_result.market_tx;
2224 let engine_margin_mode_tx = engine_result.margin_mode_tx;
2225 let engine_agent_auth_tx = engine_result.agent_auth_tx;
2226 let engine_nonce_check_tx = engine_result.nonce_check_tx;
2227 let engine_pm_settlement_admin_tx = engine_result.pm_settlement_admin_tx;
2228 let engine_quiesce_tx = engine_result.quiesce_tx;
2229
2230 readiness_checks.push(Arc::new(SyncStatusReadiness::new(
2233 "engine",
2234 engine_result.sync_status,
2235 )));
2236 let readiness = Arc::new(ReadinessRegistry::new(readiness_checks));
2237 info!(
2238 "Readiness registry initialized with {} checks (including engine)",
2239 readiness.reports().len()
2240 );
2241
2242 if hydromancer_client.is_some() {
2244 let recovery_pool = db_pool.clone();
2245 let recovery_oracles = mark_price_oracles_for_recovery.clone();
2246 task_group.spawn("HydromancerStartupRecovery", async move {
2247 match crate::startup::hydromancer::recover_stuck_settlements(
2248 &recovery_pool,
2249 &recovery_oracles,
2250 )
2251 .await
2252 {
2253 Ok(count) => {
2254 if count > 0 {
2255 info!(
2256 "Hydromancer startup recovery: settled {} stuck instruments",
2257 count
2258 );
2259 }
2260 }
2261 Err(e) => {
2262 error!("Hydromancer startup recovery failed: {}", e);
2263 }
2264 }
2265 Ok(())
2266 });
2267 }
2268
2269 let (rfq_execute_tx, rfq_execute_rx) =
2271 tokio::sync::mpsc::channel::<hypercall_runtime_api::RfqExecuteRequest>(128);
2272 unified_engine.set_rfq_receiver(rfq_execute_rx);
2273
2274 unified_engine.set_deposit_receiver(deposit_rx);
2277
2278 #[cfg(feature = "testnet")]
2279 let testnet_bootstrap_deposits = {
2280 let mut deposits = Vec::new();
2281 let now_ms = crate::shared::order_types::get_timestamp_millis();
2282 for (idx, (wallet, cash)) in testnet_bootstrap_accounts.iter().enumerate() {
2283 let sequence = testnet_bootstrap_deposit_sequence(idx);
2284 let source_event_hash = testnet_bootstrap_source_event_hash(sequence);
2285 if unified_engine
2286 .ctx
2287 .applied_deposit_source_event_hashes
2288 .contains(&source_event_hash)
2289 {
2290 continue;
2291 }
2292
2293 let current_cash = unified_engine.ctx.balance_ledger.balance(wallet);
2294 if current_cash != rust_decimal::Decimal::ZERO {
2295 warn!(
2296 wallet = %wallet,
2297 current_cash = %current_cash,
2298 "Skipping testnet bootstrap funding for wallet with restored engine cash but no bootstrap source hash"
2299 );
2300 continue;
2301 }
2302
2303 let amount = rust_decimal::Decimal::from_f64_retain(*cash)
2304 .expect("testnet bootstrap cash f64 -> Decimal conversion");
2305 let (applied_tx, applied_rx) = tokio::sync::oneshot::channel();
2306 deposits.push((
2307 crate::rsm::unified_engine::DepositRequest {
2308 wallet: *wallet,
2309 amount,
2310 timestamp_ms: now_ms,
2311 sequence: Some(sequence),
2312 source_event_hash,
2313 journal_request_id: testnet_bootstrap_request_id(idx),
2314 outbox_appends: Vec::new(),
2315 applied_tx: Some(applied_tx),
2316 },
2317 applied_rx,
2318 ));
2319 }
2320 deposits
2321 };
2322
2323 let (option_deposit_tx, option_deposit_rx) =
2324 tokio::sync::mpsc::channel::<crate::rsm::unified_engine::OptionDepositRequest>(128);
2325 unified_engine.set_option_deposit_receiver(option_deposit_rx);
2326
2327 let (option_withdrawal_tx, option_withdrawal_rx) =
2328 tokio::sync::mpsc::channel::<crate::rsm::unified_engine::OptionWithdrawalRequest>(128);
2329 unified_engine.set_option_withdrawal_receiver(option_withdrawal_rx);
2330
2331 let (cash_withdrawal_tx, cash_withdrawal_rx) =
2332 tokio::sync::mpsc::channel::<crate::rsm::unified_engine::CashWithdrawalRequest>(128);
2333 unified_engine.set_cash_withdrawal_receiver(cash_withdrawal_rx);
2334
2335 if !standby_mode {
2339 if let Some(rsm_signer) = rsm_signer_service.as_ref() {
2340 let signer_address = rsm_signer.signer_address();
2341 let durable_next_nonce = shared_db
2342 .get_rsm_signer_nonce(&signer_address)
2343 .await
2344 .with_context(|| {
2345 format!("failed to load durable RSM signer nonce for {signer_address}")
2346 })?
2347 .map(|record| {
2348 u64::try_from(record.next_nonce).with_context(|| {
2349 format!(
2350 "persisted next_nonce {} for signer {} is negative",
2351 record.next_nonce, signer_address
2352 )
2353 })
2354 })
2355 .transpose()?
2356 .unwrap_or(0);
2357 unified_engine.seed_rsm_signer_nonce_floor(signer_address, durable_next_nonce);
2358 info!(
2359 signer = %signer_address,
2360 durable_next_nonce,
2361 "Seeded engine RSM signer nonce floor from durable signer tracker"
2362 );
2363 }
2364
2365 if let Some(rsm_signer) = rsm_signer_service.clone() {
2366 let sequencer = Arc::new(crate::directive_outbox::DirectiveDeliverySequencer::new(
2367 shared_db.clone(),
2368 rsm_signer,
2369 engine_event_tx.clone(),
2370 runtime.config.engine.persistence.outbox.poll_ms,
2371 ));
2372 let handle = sequencer.start_with_shutdown(shutdown.subscribe());
2373 task_group.spawn("DirectiveDeliverySequencer", async move {
2374 if let Err(error) = handle.await {
2375 error!("DirectiveDeliverySequencer task panicked: {:?}", error);
2376 }
2377 Ok(())
2378 });
2379 tracing::info!("✓ Directive delivery sequencer started");
2380 }
2381 }
2382
2383 let (tier_update_tx, tier_update_rx) =
2384 tokio::sync::mpsc::channel::<crate::rsm::unified_engine::TierUpdateRequest>(128);
2385 unified_engine.set_tier_update_receiver(tier_update_rx);
2386
2387 let (hypercore_equity_tx, hypercore_equity_rx) =
2389 tokio::sync::mpsc::channel::<crate::rsm::unified_engine::HypercoreEquityRequest>(128);
2390 unified_engine.set_hypercore_equity_receiver(hypercore_equity_rx);
2391
2392 let (liquidation_bonus_tx, liquidation_bonus_rx) =
2394 tokio::sync::mpsc::channel::<crate::rsm::unified_engine::LiquidationBonusRequest>(128);
2395 unified_engine.set_liquidation_bonus_receiver(liquidation_bonus_rx);
2396
2397 let (trading_mode_notify_tx, trading_mode_notify_rx) = tokio::sync::watch::channel::<
2402 std::collections::HashMap<String, hypercall_types::TradingModes>,
2403 >(std::collections::HashMap::new());
2404 unified_engine.set_trading_mode_receiver(trading_mode_notify_rx);
2405
2406 let standby_startup_heartbeat = if standby_mode {
2408 startup_progress.as_ref().map(|progress| {
2409 let progress = progress.clone();
2410 tokio::spawn(async move {
2411 let mut interval = tokio::time::interval(Duration::from_secs(15));
2412 loop {
2413 interval.tick().await;
2414 progress.heartbeat();
2415 }
2416 })
2417 })
2418 } else {
2419 None
2420 };
2421
2422 if standby_mode {
2423 let nats_config =
2424 crate::nats::NatsConfig::from_env().expect("NATS_URL required for standby mode");
2425
2426 if let Some(progress) = startup_progress.as_ref() {
2427 progress.mark("standby_hydrate_base_state");
2428 }
2429 unified_engine.hydrate_standby_base_state().await;
2430 if let Some(progress) = startup_progress.as_ref() {
2431 progress.mark("standby_hydrate_base_state_complete");
2432 }
2433
2434 let progress = standby_progress
2435 .clone()
2436 .context("standby replay progress missing while STANDBY_MODE is active")?;
2437 let promote_rx = standby_replay_promote_rx
2438 .take()
2439 .context("standby promote receiver missing while STANDBY_MODE is active")?;
2440 let (engine_tx, engine_rx) =
2441 tokio::sync::oneshot::channel::<crate::nats::standby_handler::StandbyEngineHandler>();
2442
2443 let handler = crate::nats::standby_handler::StandbyEngineHandler::new(unified_engine);
2444 let default_replay_start_seq = default_standby_replay_start_seq.ok_or_else(|| {
2445 anyhow::anyhow!("NATS stream tail was not captured for standby replay")
2446 })?;
2447 let replay_start_seq = match std::env::var("NATS_REPLAY_START_SEQ") {
2448 Ok(value) => value
2449 .parse::<i64>()
2450 .with_context(|| format!("NATS_REPLAY_START_SEQ must be an i64, got {value:?}"))?,
2451 Err(std::env::VarError::NotPresent) => default_replay_start_seq,
2452 Err(error) => {
2453 return Err(anyhow::anyhow!(
2454 "Failed to read NATS_REPLAY_START_SEQ: {error}"
2455 ));
2456 }
2457 };
2458 if replay_start_seq < 0 {
2459 return Err(anyhow::anyhow!(
2460 "NATS_REPLAY_START_SEQ must be >= 0, got {replay_start_seq}"
2461 ));
2462 }
2463 info!(
2464 replay_start_seq,
2465 default_replay_start_seq, "Starting standby NATS replay"
2466 );
2467 info!("Starting in STANDBY mode — replaying from NATS, engine deferred");
2468 if let Some(progress) = startup_progress.as_ref() {
2469 progress.mark("standby_nats_replay_starting");
2470 }
2471
2472 task_group.spawn("StandbyReplayLoop", async move {
2473 let (bcast_tx, bcast_rx) = tokio::sync::broadcast::channel::<()>(1);
2474 tokio::spawn(async move {
2475 promote_rx.await.ok();
2476 bcast_tx.send(()).ok();
2477 });
2478 match crate::nats::replay_loop::run_replay_loop(
2479 &nats_config,
2480 replay_start_seq,
2481 handler,
2482 progress,
2483 bcast_rx,
2484 )
2485 .await
2486 {
2487 Ok(h) => {
2488 engine_tx.send(h).ok();
2489 }
2490 Err(e) => {
2491 error!("Standby replay loop error: {}", e);
2492 }
2493 }
2494 Ok(())
2495 });
2496
2497 let db_auth_for_promote = db_auth.clone();
2498 let post_promote_services_for_promote = post_promote_services.clone();
2499 let shutdown_for_promote = shutdown.clone();
2500 let shared_db_for_promote = shared_db.clone();
2501 let rsm_signer_for_promote = rsm_signer_service.clone();
2502 let engine_event_tx_for_promote = engine_event_tx.clone();
2503 let directive_outbox_poll_ms = runtime.config.engine.persistence.outbox.poll_ms;
2504 task_group.spawn("StandbyPromote", async move {
2505 match engine_rx.await {
2506 Ok(handler) => {
2507 info!("Standby promoted — creating read-write database pools");
2508 let mut engine = handler.into_engine();
2509
2510 match DatabaseHandler::new_with_auth(db_auth_for_promote.clone(), 3) {
2516 Ok(rw_handler) => {
2517 engine.set_db(rw_handler);
2518 info!("Pool swap complete — engine now has read-write DB access");
2519 }
2520 Err(e) => {
2521 panic!(
2522 "CRITICAL_FAILURE: Failed to create read-write DatabaseHandler on promote: {}. \
2523 Cannot start engine without write-capable persistence.",
2524 e
2525 );
2526 }
2527 }
2528
2529 if let Some(rsm_signer) = rsm_signer_for_promote.clone() {
2530 if let Err(error) = rsm_signer.status().await {
2531 panic!(
2532 "CRITICAL_FAILURE: Failed to initialize RSM signer after standby promotion: {}",
2533 error
2534 );
2535 }
2536
2537 let signer_address = rsm_signer.signer_address();
2538 let durable_next_nonce = shared_db_for_promote
2539 .get_rsm_signer_nonce(&signer_address)
2540 .await
2541 .unwrap_or_else(|error| {
2542 panic!(
2543 "CRITICAL_FAILURE: failed to load durable RSM signer nonce for {} after standby promotion: {}",
2544 signer_address, error
2545 )
2546 })
2547 .map(|record| {
2548 u64::try_from(record.next_nonce).unwrap_or_else(|_| {
2549 panic!(
2550 "CRITICAL_FAILURE: persisted next_nonce {} for signer {} is negative after standby promotion",
2551 record.next_nonce, signer_address
2552 )
2553 })
2554 })
2555 .unwrap_or(0);
2556 engine.seed_rsm_signer_nonce_floor(signer_address, durable_next_nonce);
2557 info!(
2558 signer = %signer_address,
2559 durable_next_nonce,
2560 "Seeded engine RSM signer nonce floor after standby promotion"
2561 );
2562
2563 let sequencer =
2564 Arc::new(crate::directive_outbox::DirectiveDeliverySequencer::new(
2565 shared_db_for_promote.clone(),
2566 rsm_signer,
2567 engine_event_tx_for_promote.clone(),
2568 directive_outbox_poll_ms,
2569 ));
2570 let handle =
2571 sequencer.start_with_shutdown(shutdown_for_promote.subscribe());
2572 tokio::spawn(async move {
2573 if let Err(error) = handle.await {
2574 error!("DirectiveDeliverySequencer task panicked: {:?}", error);
2575 }
2576 });
2577 info!("Directive delivery sequencer started after standby promotion");
2578 }
2579
2580
2581 let services = {
2582 let mut services = post_promote_services_for_promote.lock().await;
2583 std::mem::take(&mut *services)
2584 };
2585 for service in services {
2586 let service_name = service.name();
2587 let owner = service.owner();
2588 let shutdown_rx = shutdown_for_promote.subscribe();
2589 info!(
2590 service = service_name,
2591 owner = %owner,
2592 "Starting deferred service after standby promotion"
2593 );
2594 tokio::spawn(async move {
2595 if let Err(error) = service.run(shutdown_rx).await {
2596 error!(
2597 service = service_name,
2598 owner = %owner,
2599 error = %error,
2600 "Deferred service exited with error"
2601 );
2602 }
2603 });
2604 }
2605
2606 info!("Starting engine event loop after promote");
2607 engine.start().await;
2608 }
2609 Err(_) => {
2610 error!("Replay loop ended without returning engine");
2611 }
2612 }
2613 Ok(())
2614 });
2615
2616 if let Some(progress) = startup_progress.as_ref() {
2617 progress.mark("standby_replay_task_spawned");
2618 }
2619 } else {
2620 task_group.spawn("UnifiedEngine", async move {
2621 unified_engine.start().await;
2622 Ok(())
2623 });
2624
2625 #[cfg(feature = "testnet")]
2626 for (deposit, applied_rx) in testnet_bootstrap_deposits {
2627 let wallet = deposit.wallet;
2628 let amount = deposit.amount;
2629 deposit_tx.send(deposit).await.map_err(|error| {
2630 anyhow::anyhow!("failed to queue testnet bootstrap deposit: {error}")
2631 })?;
2632 match applied_rx.await {
2633 Ok(Ok(())) => {
2634 info!(
2635 wallet = %wallet,
2636 amount = %amount,
2637 "Applied journaled testnet bootstrap deposit"
2638 );
2639 }
2640 Ok(Err(error)) => {
2641 return Err(anyhow::anyhow!(
2642 "testnet bootstrap deposit for {wallet} failed: {error}"
2643 ));
2644 }
2645 Err(error) => {
2646 return Err(anyhow::anyhow!(
2647 "testnet bootstrap deposit ack for {wallet} dropped: {error}"
2648 ));
2649 }
2650 }
2651 }
2652 }
2653
2654 let drain_signal = std::sync::Arc::new(tokio::sync::Notify::new());
2656 let is_draining =
2657 std::sync::Arc::new(std::sync::atomic::AtomicBool::new(engine_start_quiesced));
2658
2659 let mut order_rx = order_rx;
2661 let engine_order_tx_clone = engine_order_tx.clone();
2662 let mut order_forwarder_shutdown_rx = shutdown.subscribe();
2663 let standby_for_forwarder = standby_controller.clone();
2664 let is_draining_for_forwarder = is_draining.clone();
2665 task_group.spawn("OrderForwarder", async move {
2666 loop {
2667 tokio::select! {
2668 _ = order_forwarder_shutdown_rx.recv() => {
2669 info!("OrderForwarder received shutdown signal");
2670 break;
2671 }
2672 maybe_req = order_rx.recv() => {
2673 match maybe_req {
2674 Some(request) => {
2675 if let Some(ref controller) = standby_for_forwarder {
2680 if controller.is_standby().await {
2681 tracing::debug!("Rejecting order — server in standby mode");
2682 let _ = request.response_tx.send(hypercall_types::OrderUpdateMessage {
2683 order_id: None,
2684 info: request.message.info,
2685 status: hypercall_types::OrderUpdateStatus::Rejected,
2686 timestamp: std::time::SystemTime::now()
2687 .duration_since(std::time::UNIX_EPOCH)
2688 .unwrap_or_default()
2689 .as_millis() as u64,
2690 reason: Some("Server in standby mode, not accepting orders".to_string()),
2691 filled_size: rust_decimal::Decimal::ZERO,
2692 wallet_address: request.message.wallet,
2693 mmp_triggered: false,
2694 request_id: request.message.request_id,
2695 }).await;
2696 continue;
2697 }
2698 }
2699 if is_draining_for_forwarder.load(std::sync::atomic::Ordering::Relaxed) {
2701 tracing::debug!("Rejecting order — server is draining");
2702 let _ = request.response_tx.send(hypercall_types::OrderUpdateMessage {
2703 order_id: None,
2704 info: request.message.info,
2705 status: hypercall_types::OrderUpdateStatus::Rejected,
2706 timestamp: std::time::SystemTime::now()
2707 .duration_since(std::time::UNIX_EPOCH)
2708 .unwrap_or_default()
2709 .as_millis() as u64,
2710 reason: Some("Server is draining for blue/green deploy".to_string()),
2711 filled_size: rust_decimal::Decimal::ZERO,
2712 wallet_address: request.message.wallet,
2713 mmp_triggered: false,
2714 request_id: request.message.request_id,
2715 }).await;
2716 continue;
2717 }
2718 if engine_order_tx_clone.send(request).await.is_err() {
2720 tracing::error!("Failed to send order to unified engine");
2721 break;
2722 }
2723 }
2724 None => break,
2725 }
2726 }
2727 }
2728 }
2729 Ok(())
2730 });
2731
2732 if let Some(drain_controller) = standby_controller.clone() {
2734 task_group.spawn("StandbyFinalize", async move {
2735 loop {
2736 if drain_controller.is_promoted().await {
2737 break;
2738 }
2739 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
2740 }
2741 let _ = drain_controller.take_drain_batch().await;
2744 info!("Standby controller transitioned to active");
2745 Ok(())
2746 });
2747 }
2748
2749 if runtime.config.catalog_manager.enabled {
2753 let catalog_manager = crate::catalog_manager::CatalogManager::new(
2754 diesel_db.clone(),
2755 catalog_config,
2756 engine_market_tx.clone(),
2757 mark_price_oracles_for_catalog,
2758 runtime.config.catalog_manager.interval_secs,
2759 )
2760 .with_trading_mode_notify(trading_mode_notify_tx);
2761
2762 let catalog_manager = Arc::new(catalog_manager);
2763 if standby_mode {
2764 let deferred_service: Arc<dyn Service> = catalog_manager;
2765 post_promote_services.lock().await.push(deferred_service);
2766 info!("CatalogManager deferred until standby promotion");
2767 } else {
2768 service_registry.register(catalog_manager);
2769 info!(
2770 "✓ CatalogManager started with interval={}s (trading_mode notify channel wired)",
2771 runtime.config.catalog_manager.interval_secs
2772 );
2773 }
2774 } else {
2775 debug!("CatalogManager disabled");
2776 }
2777
2778 let trading_halt = Arc::new(tokio::sync::RwLock::new(TradingHaltState::from_config(
2779 runtime.config.api.global_trading_halted,
2780 &runtime.config.api.halted_markets,
2781 )));
2782 {
2783 let halt_state = trading_halt.read().await;
2784 if halt_state.global_halt.is_some() || !halt_state.halted_markets.is_empty() {
2785 tracing::warn!(
2786 global_halted = halt_state.global_halt.is_some(),
2787 halted_markets = halt_state.halted_markets.len(),
2788 "Trading halt loaded from startup configuration"
2789 );
2790 }
2791 }
2792
2793 let chain_auth: Arc<dyn directives::onchain::ChainAuthReader> = Arc::new(
2794 directives::onchain::AlloyChainAuthReader::new(
2795 &runtime.config.transaction_submitter.rpc_url,
2796 &runtime.config.contracts.exchange_contract_address,
2797 )
2798 .map_err(|e| anyhow::anyhow!("Failed to initialize chain auth reader: {}", e))?,
2799 );
2800 let gas_provider_service = Arc::new(if cfg!(feature = "test-endpoints") {
2801 hypercall_api::gas_provider::GasProviderService::new_mock()
2802 } else {
2803 match hypercall_api::gas_provider::GasProviderService::new(
2804 &runtime.config.transaction_submitter.rpc_url,
2805 )
2806 .await
2807 {
2808 Ok(service) => service,
2809 Err(error)
2810 if runtime.config.transaction_submitter.mode
2811 == hypercall_transaction_submitter::TransactionSubmitterMode::Mock =>
2812 {
2813 let fallback_chain_id = if runtime.config.modes.testnet_mode {
2814 hypercall_types::directives::HYPERCALL_TESTNET_CHAIN_ID
2815 } else {
2816 hypercall_types::directives::HYPERCALL_MAINNET_CHAIN_ID
2817 };
2818 tracing::warn!(
2819 chain_id = fallback_chain_id,
2820 rpc_url = %runtime.config.transaction_submitter.rpc_url,
2821 "Gas provider chain ID lookup failed in mock submitter mode, falling back to configured chain id: {}",
2822 error
2823 );
2824 hypercall_api::gas_provider::GasProviderService::new_with_chain_id(
2825 &runtime.config.transaction_submitter.rpc_url,
2826 fallback_chain_id,
2827 )
2828 .map_err(|fallback_error| {
2829 anyhow::anyhow!(
2830 "Failed to initialize gas provider service fallback: {}",
2831 fallback_error
2832 )
2833 })?
2834 }
2835 Err(error) => {
2836 return Err(anyhow::anyhow!(
2837 "Failed to initialize gas provider service: {}",
2838 error
2839 ));
2840 }
2841 }
2842 });
2843
2844 let signing_chain_id = if runtime.config.modes.testnet_mode {
2845 hypercall_types::directives::HYPERCALL_TESTNET_CHAIN_ID
2846 } else {
2847 hypercall_types::directives::HYPERCALL_MAINNET_CHAIN_ID
2848 };
2849 let api_portfolio_margin_mode_allowlist = portfolio_margin_mode_allowlist
2850 .iter()
2851 .copied()
2852 .collect::<Vec<_>>();
2853 let api_portfolio_margin_settlement_allowlist = portfolio_margin_settlement_allowlist
2854 .iter()
2855 .copied()
2856 .collect::<Vec<_>>();
2857 let app_runtime_config = Arc::new(handlers::AppRuntimeConfig {
2858 testnet_mode: runtime.config.modes.testnet_mode,
2859 trade_explorer_url_template: runtime.config.api.trade_explorer_url_template.clone(),
2860 wal_path: hypercall_journal::checkpoint::wal_path_from_config(
2861 runtime.config.engine.persistence.journal.wal_path.as_ref(),
2862 ),
2863 db_host: crate::startup::database::extract_db_host(&runtime.secrets.database_url),
2864 db_name: crate::startup::database::extract_db_name(&runtime.secrets.database_url),
2865 directive_chain_id: signing_chain_id,
2866 signing_chain_id,
2867 exchange_contract_address: runtime.config.contracts.exchange_contract_address.clone(),
2868 portfolio_margin_pool_enabled: runtime.config.engine.portfolio_margin_pool_enabled,
2869 portfolio_margin_mode_allowlist: api_portfolio_margin_mode_allowlist,
2870 portfolio_margin_settlement_allowlist: api_portfolio_margin_settlement_allowlist,
2871 #[cfg(feature = "rsm-state")]
2872 rsm_environment: runtime
2873 .config
2874 .engine
2875 .persistence
2876 .journal
2877 .rsm_environment
2878 .into(),
2879 });
2880
2881 let username_service = Arc::new(
2883 hypercall_api::username_service::UsernameService::new(
2884 diesel_db.clone(),
2885 username_redis_client,
2886 )
2887 .await,
2888 );
2889 tracing::info!("✓ UsernameService initialized");
2890
2891 let rfq_websocket_publisher: Arc<dyn hypercall_api::rfq::qp_ws_state::RfqWebsocketPublisher> =
2892 pubsub.clone();
2893 let rfq_resources = crate::startup::rfq::build_rfq_resources(
2894 shared_db.clone(),
2895 rfq_execute_tx,
2896 &shutdown,
2897 indicative_cache.clone(),
2898 agent_auth.clone(),
2899 signing_chain_id,
2900 instruments_cache.clone(),
2901 engine_nonce_check_tx.clone(),
2902 Some(rfq_websocket_publisher),
2903 )
2904 .await?;
2905 let rfq_manager = rfq_resources.rfq_manager.clone();
2906 let rfq_handler_state = rfq_resources.rfq_handler_state.clone();
2907 let qp_ws_state = rfq_resources.qp_ws_state;
2908
2909 let app_state = handlers::AppState {
2911 db: diesel_db.clone(),
2912 order_sender: order_tx.clone(),
2913 market_sender: engine_market_tx.clone(),
2914 engine_quiesce_sender: engine_quiesce_tx.clone(),
2915 margin_mode_sender: engine_margin_mode_tx.clone(),
2916 agent_auth_sender: engine_agent_auth_tx.clone(),
2917 agent_auth: agent_auth.clone(),
2918 auth_failure_recorder: Arc::new(
2919 crate::observability::api_boundary::MetricsAuthFailureRecorder,
2920 ),
2921 metrics_renderer: Arc::new(crate::observability::api_boundary::PrometheusMetricsRenderer),
2922 greeks_cache: greeks_cache.clone(),
2923 portfolio_cache: portfolio_cache.clone(),
2924 instruments_cache: instruments_cache.clone(),
2925 market_stats_cache: market_stats_cache.clone(),
2926 markets_snapshot_cache: markets_snapshot_cache.clone(),
2927 instruments_snapshot_cache: instruments_snapshot_cache.clone(),
2928 options_summary_snapshot_cache: options_summary_snapshot_cache.clone(),
2929 event_bus_sender: engine_event_tx.clone(),
2930 chain_auth: chain_auth.clone(),
2931 exchange_address: alloy::primitives::Address::from_str(
2932 &runtime.config.contracts.exchange_contract_address,
2933 )
2934 .ok(),
2935 gas_provider_service: gas_provider_service.clone(),
2936 transaction_request_journal: Some(Arc::new(
2937 crate::api_boundary_impls::RuntimeTransactionRequestJournal::new(
2938 journal_batch_sender.clone(),
2939 engine_journal_writer.clone(),
2940 ),
2941 )),
2942 mmp_cache: mmp_cache.clone(),
2943 tier_cache: tier_cache.clone(),
2944 risk_vol_oracle: risk_vol_oracle.clone(),
2945 readiness: readiness.clone() as Arc<dyn hypercall_api::runtime_status::ReadinessGate>,
2946 balance_provider: Arc::new(crate::api_boundary_impls::RuntimeBalanceProvider::new(
2947 balance_provider.clone(),
2948 )),
2949 balance_snapshot_provider: Arc::new(
2950 crate::api_boundary_impls::RuntimeBalanceSnapshotProvider::new(
2951 snapshot_provider.clone(),
2952 balance_update_publisher_for_api.clone(),
2953 balance_update_stream_required,
2954 ),
2955 ),
2956 deposit_sender: Some(deposit_tx.clone()),
2957 option_deposit_sender: Some(option_deposit_tx.clone()),
2958 option_withdrawal_sender: Some(option_withdrawal_tx.clone()),
2959 cash_withdrawal_sender: Some(cash_withdrawal_tx.clone()),
2960 tier_update_sender: Some(tier_update_tx.clone()),
2961 pm_settlement_admin_sender: Some(engine_pm_settlement_admin_tx.clone()),
2962 sync_db: Some(shared_db.clone()),
2963 rate_limit_cache: rate_limit_cache.clone(),
2964 engine_journal_reader: engine_journal_writer.clone().map(|writer| {
2965 Arc::new(crate::api_boundary_impls::RuntimeEngineJournalReader::new(
2966 writer,
2967 )) as Arc<dyn hypercall_api::boundary::engine::EngineJournalReader>
2968 }),
2969 runtime_config: app_runtime_config,
2970 build_info: hypercall_api::observability_boundary::BuildInfo {
2971 version: crate::observability::CARGO_VERSION.to_string(),
2972 commit: crate::observability::GIT_COMMIT.to_string(),
2973 git_ref: crate::observability::GIT_REF.to_string(),
2974 build_time: crate::observability::BUILD_TIME.to_string(),
2975 },
2976 collateral_registry: collateral_registry.clone(),
2977 admin_api_key: runtime.secrets.admin_api_key.clone(),
2978 allow_unauthenticated_monitoring: runtime.config.environment.name == "development",
2979 boot_id: uuid::Uuid::now_v7().to_string(),
2980 server_started_at: chrono::Utc::now(),
2981 trading_halt: trading_halt.clone(),
2982 rsm_signer: rsm_signer_service.clone(),
2983 rsm_signer_address: if standby_mode {
2984 None
2985 } else {
2986 rsm_signer_service
2987 .as_ref()
2988 .map(|signer| signer.signer_address())
2989 },
2990 hl_info_url: Some(runtime.config.pricing.hypercore_info_url.clone()),
2992 exchange_pool_liquidity_reader: Arc::new(
2993 hypercall_api::state::HypercoreExchangePoolLiquidityReader::new(
2994 Some(runtime.config.pricing.hypercore_info_url.clone()),
2995 sync_exchange_address,
2996 ),
2997 ),
2998 hydromancer_feed: {
2999 if runtime.config.hydromancer.enabled {
3000 match runtime
3001 .config
3002 .hydromancer
3003 .ws_api_key
3004 .as_deref()
3005 .or(runtime.secrets.hydromancer_ws_api_key.as_deref())
3006 .or(runtime.secrets.hydromancer_api_key.as_deref())
3007 {
3008 Some(api_key) => {
3009 let api_key = api_key.trim();
3010 let ws_base = runtime
3011 .config
3012 .hydromancer
3013 .ws_url
3014 .as_deref()
3015 .unwrap_or("wss://api.hydromancer.xyz/ws");
3016 let ws_url = format!("{}?token={}", ws_base, api_key);
3017 let hl_info = runtime.config.pricing.hypercore_info_url.clone();
3019 let managed_accounts = crate::startup::hydromancer::load_exchange_accounts(
3020 &runtime.config.transaction_submitter.rpc_url,
3021 &runtime.config.contracts.exchange_contract_address,
3022 )
3023 .await;
3024 info!(
3025 count = managed_accounts.len(),
3026 "Hydromancer feed: loaded accounts from Exchange"
3027 );
3028
3029 let account_addresses: Vec<WalletAddress> =
3030 managed_accounts.iter().map(|p| p.account).collect();
3031 let account_to_manager: std::collections::HashMap<
3032 WalletAddress,
3033 WalletAddress,
3034 > = managed_accounts
3035 .iter()
3036 .map(|p| (p.account, p.manager))
3037 .collect();
3038
3039 let hydromancer_api_url = runtime.config.hydromancer.api_url.clone();
3040 let hydromancer_api_key = Some(api_key.to_string());
3041 let feed = Arc::new(crate::hypercore::HydromancerFeedService::new(
3042 ws_url,
3043 hl_info,
3044 hydromancer_api_url,
3045 hydromancer_api_key,
3046 account_addresses,
3047 ));
3048 let feed_clone = feed.clone();
3049 let shutdown_rx = shutdown.subscribe();
3050 tokio::spawn(async move {
3051 feed_clone.start(shutdown_rx).await;
3052 });
3053
3054 {
3059 let mut position_rx = feed.subscribe();
3060 let pc = portfolio_cache.clone();
3061 let feed_ref = feed.clone();
3062 let pubsub_for_bridge = pubsub.clone();
3063 let initial_accounts: Vec<WalletAddress> =
3064 managed_accounts.iter().map(|p| p.account).collect();
3065 let a2m =
3066 std::sync::Arc::new(tokio::sync::RwLock::new(account_to_manager));
3067 let chain_auth_bridge: Arc<dyn directives::onchain::ChainAuthReader> =
3068 chain_auth.clone();
3069 let equity_tx = hypercore_equity_tx.clone();
3070 tokio::spawn(async move {
3071 info!(
3072 accounts = initial_accounts.len(),
3073 "PM bridge: task spawned, waiting 10s for feed reconciliation"
3074 );
3075 tokio::time::sleep(std::time::Duration::from_secs(10)).await;
3076 info!("PM bridge: starting initial sync");
3077 for acct in &initial_accounts {
3078 if let Some(state) = feed_ref.get_positions(acct).await {
3079 let manager = {
3080 let map = a2m.read().await;
3081 map.get(acct).copied()
3082 };
3083 if let Some(manager) = manager {
3084 for pos in &state.positions {
3085 let Some(entry_price) = pos.entry_price else {
3086 warn!(
3087 account = %acct,
3088 coin = %pos.coin,
3089 "PM bridge: skipping Hydromancer position with missing entry price"
3090 );
3091 continue;
3092 };
3093 let update =
3094 crate::portfolio::HypercorePositionUpdate {
3095 account: manager.to_string(),
3096 coin: pos.coin.clone(),
3097 size: pos.size,
3098 entry_price,
3099 unrealized_pnl: pos.unrealized_pnl,
3100 timestamp: state.last_updated_ms,
3101 snapshot: true,
3102 };
3103 pc.handle_hypercore_position_update(update).await;
3104 }
3105 {
3107 let account_value =
3108 rust_decimal::Decimal::from_f64_retain(
3109 state.account_value,
3110 )
3111 .unwrap_or_default();
3112 let _ = equity_tx.send(
3113 crate::rsm::unified_engine::HypercoreEquityRequest {
3114 wallet: manager,
3115 account_value,
3116 timestamp_ms: state.last_updated_ms,
3117 },
3118 ).await;
3119 }
3120 info!(
3121 account = %acct,
3122 positions = state.positions.len(),
3123 account_value = state.account_value,
3124 "PM bridge: initial sync"
3125 );
3126 }
3127 }
3128 }
3129
3130 info!("PM bridge: initial sync complete, entering event loop");
3131
3132 let bridge_start_ms =
3135 crate::shared::order_types::get_timestamp_millis();
3136
3137 let mut prev_coins: std::collections::HashMap<
3138 WalletAddress,
3139 std::collections::HashSet<String>,
3140 > = std::collections::HashMap::new();
3141
3142 loop {
3143 match position_rx.recv().await {
3144 Ok(
3145 crate::hypercore::HydromancerEvent::PositionSnapshot {
3146 account,
3147 state,
3148 },
3149 ) => {
3150 let manager = {
3151 let map = a2m.read().await;
3152 map.get(&account).copied()
3153 };
3154 let manager = match manager {
3155 Some(m) => m,
3156 None => {
3157 match chain_auth_bridge
3158 .get_manager(account.inner())
3159 .await
3160 {
3161 Ok(addr)
3162 if addr
3163 != alloy::primitives::Address::ZERO =>
3164 {
3165 let mgr = WalletAddress::from(addr);
3166 a2m.write().await.insert(account, mgr);
3167 mgr
3168 }
3169 _ => continue,
3170 }
3171 }
3172 };
3173
3174 let mut current_coins =
3175 std::collections::HashSet::new();
3176 for pos in &state.positions {
3177 current_coins.insert(pos.coin.clone());
3178 let Some(entry_price) = pos.entry_price else {
3179 warn!(
3180 account = %account,
3181 coin = %pos.coin,
3182 "PM bridge: skipping Hydromancer position with missing entry price"
3183 );
3184 continue;
3185 };
3186 let update =
3187 crate::portfolio::HypercorePositionUpdate {
3188 account: manager.to_string(),
3189 coin: pos.coin.clone(),
3190 size: pos.size,
3191 entry_price,
3192 unrealized_pnl: pos.unrealized_pnl,
3193 timestamp: state.last_updated_ms,
3194 snapshot: true,
3195 };
3196 pc.handle_hypercore_position_update(update).await;
3197 }
3198
3199 {
3201 let account_value =
3202 rust_decimal::Decimal::from_f64_retain(
3203 state.account_value,
3204 )
3205 .unwrap_or_default();
3206 let _ = equity_tx.send(
3207 crate::rsm::unified_engine::HypercoreEquityRequest {
3208 wallet: manager,
3209 account_value,
3210 timestamp_ms: state.last_updated_ms,
3211 },
3212 ).await;
3213 }
3214
3215 if let Some(old) = prev_coins.get(&account) {
3217 for coin in old.difference(¤t_coins) {
3218 let update =
3219 crate::portfolio::HypercorePositionUpdate {
3220 account: manager.to_string(),
3221 coin: coin.clone(),
3222 size: 0.0,
3223 entry_price: 0.0,
3224 unrealized_pnl: 0.0,
3225 timestamp: state.last_updated_ms,
3226 snapshot: true,
3227 };
3228 pc.handle_hypercore_position_update(update)
3229 .await;
3230 }
3231 }
3232 prev_coins.insert(account, current_coins);
3233 }
3234 Ok(crate::hypercore::HydromancerEvent::Fill {
3235 account,
3236 fill,
3237 }) => {
3238 if fill.time < bridge_start_ms {
3239 continue;
3240 }
3241 let manager = {
3242 let map = a2m.read().await;
3243 map.get(&account).copied()
3244 };
3245 let manager = match manager {
3246 Some(m) => m,
3247 None => match chain_auth_bridge
3248 .get_manager(account.inner())
3249 .await
3250 {
3251 Ok(addr)
3252 if addr
3253 != alloy::primitives::Address::ZERO =>
3254 {
3255 let mgr = WalletAddress::from(addr);
3256 a2m.write().await.insert(account, mgr);
3257 mgr
3258 }
3259 _ => continue,
3260 },
3261 };
3262 if let (Ok(size), Ok(price)) =
3263 (fill.sz.parse::<f64>(), fill.px.parse::<f64>())
3264 {
3265 if size == 0.0 || price == 0.0 {
3266 continue;
3267 }
3268 let ws_fill =
3269 hypercall_api::websocket::WsFillUpdate {
3270 order_id: fill.oid.unwrap_or(0) as i64,
3271 fill_id: fill.time as i64,
3272 symbol: format!("{}-PERP", fill.coin),
3273 side: if fill.side == "B" {
3274 "Buy"
3275 } else {
3276 "Sell"
3277 }
3278 .to_string(),
3279 price:
3280 rust_decimal::Decimal::from_f64_retain(
3281 price,
3282 )
3283 .unwrap_or_default(),
3284 size:
3285 rust_decimal::Decimal::from_f64_retain(
3286 size,
3287 )
3288 .unwrap_or_default(),
3289 timestamp: fill.time as i64,
3290 wallet_address: manager,
3291 fee: fill
3292 .fee
3293 .as_deref()
3294 .and_then(|f| f.parse().ok())
3295 .unwrap_or_default(),
3296 trade_id: fill.time as i64,
3297 is_taker: true,
3298 builder_code_address: None,
3299 builder_code_fee: None,
3300 instrument_type: "perp".to_string(),
3301 };
3302 pubsub_for_bridge.publish_fill(ws_fill);
3303
3304 if let Some(state) =
3305 feed_ref.get_positions(&account).await
3306 {
3307 if let Some(pos) = state
3308 .positions
3309 .iter()
3310 .find(|p| p.coin == fill.coin)
3311 {
3312 let Some(entry_price) = pos.entry_price
3313 else {
3314 warn!(
3315 account = %account,
3316 coin = %pos.coin,
3317 "PM bridge: skipping Hydromancer fill position update with missing entry price"
3318 );
3319 continue;
3320 };
3321 pc.handle_hypercore_position_update(
3322 crate::portfolio::HypercorePositionUpdate {
3323 account: manager.to_string(),
3324 coin: pos.coin.clone(),
3325 size: pos.size,
3326 entry_price,
3327 unrealized_pnl: pos.unrealized_pnl,
3328 timestamp: state.last_updated_ms,
3329 snapshot: false,
3330 },
3331 )
3332 .await;
3333 }
3334 {
3336 let account_value =
3337 rust_decimal::Decimal::from_f64_retain(
3338 state.account_value,
3339 )
3340 .unwrap_or_default();
3341 let _ = equity_tx.send(
3342 crate::rsm::unified_engine::HypercoreEquityRequest {
3343 wallet: manager,
3344 account_value,
3345 timestamp_ms: state.last_updated_ms,
3346 },
3347 ).await;
3348 }
3349 }
3350 }
3351 }
3352 Ok(crate::hypercore::HydromancerEvent::OrderUpdate {
3353 ..
3354 }) => {}
3355 Err(tokio::sync::broadcast::error::RecvError::Lagged(
3356 n,
3357 )) => {
3358 warn!(skipped = n, "Hydromancer→PM bridge lagged");
3359 }
3360 Err(_) => break,
3361 }
3362 }
3363 });
3364 info!("✓ Hydromancer→PM position bridge started");
3365 }
3366
3367 info!("✓ Hydromancer feed service started");
3368 Some(feed)
3369 }
3370 None => {
3371 warn!("Hydromancer enabled but no API key configured");
3372 None
3373 }
3374 }
3375 } else {
3376 None
3377 }
3378 },
3379 quote_provider: quote_provider.clone(),
3380 engine_state_digest_provider: Arc::new(
3381 crate::api_boundary_impls::RuntimeEngineStateDigestProvider::new(
3382 engine_state_digest_provider.clone(),
3383 ),
3384 ),
3385 order_snapshot: order_snapshot.clone(),
3386 competition_service: competition_service.clone(),
3387 candle_source: candle_source.clone(),
3388 underlying_to_candle_coin: underlying_to_candle_coin.clone(),
3389 bbo_snapshot_reader: bbo_snapshot_service.clone(),
3390 indicative_cache: Some(indicative_cache.clone()),
3391 rfq_manager: Some(rfq_manager.clone()),
3392 username_service: username_service.clone(),
3393 push_service: push_service.clone(),
3394 standby_controller: standby_controller.clone().map(|controller| {
3395 Arc::new(controller) as Arc<dyn hypercall_api::runtime_status::StandbyPromoter>
3396 }),
3397 notification_service: Some(notification_service.clone()),
3398 standby_promote: standby_promote.clone(),
3399 standby_progress: standby_progress.clone().map(|progress| {
3400 Arc::new(progress) as Arc<dyn hypercall_api::runtime_status::StandbyReplayProgress>
3401 }),
3402 startup_progress: startup_progress.clone().map(|progress| {
3403 Arc::new(progress) as Arc<dyn hypercall_api::runtime_status::StartupProgressReader>
3404 }),
3405 recovery_safety_report: recovery_safety_report.clone(),
3406 drain_signal: drain_signal.clone(),
3407 is_draining: is_draining.clone(),
3408 shutdown: shutdown.clone(),
3409 };
3410
3411 if runtime.config.liquidation.enabled {
3412 let rsm_directive_publisher = crate::rsm_directive_publisher::RsmDirectivePublisher::new(
3413 engine_event_tx.clone(),
3414 journal_batch_sender.clone(),
3415 );
3416 let mut liquidation_executor_builder = crate::liquidator::LiquidationExecutor::new(
3417 liquidation_cache.clone(),
3418 portfolio_cache.clone(),
3419 )
3420 .with_event_sender(ws_direct_tx.clone())
3421 .with_rsm_directive_publisher(rsm_directive_publisher)
3422 .with_full_target_buffer_bps(runtime.config.liquidation.full_target_buffer_bps);
3423 if let Some(signer) = rsm_signer_service.clone() {
3424 liquidation_executor_builder = liquidation_executor_builder.with_rsm_signer(signer);
3425 }
3426 let liquidation_executor = Arc::new(liquidation_executor_builder);
3427
3428 let liquidation_watcher = Arc::new(
3429 crate::liquidator::LiquidationWatcher::new(
3430 liquidation_cache.clone(),
3431 portfolio_cache.get_service(),
3432 tier_cache.clone(),
3433 span_margin_service.clone(),
3434 standard_margin_service.clone(),
3435 crate::liquidator::LiquidationWatcherConfig::from_runtime_config(
3436 &runtime.config.liquidation,
3437 ),
3438 )
3439 .with_risk_account_builder(risk_account_builder.clone())
3440 .with_standard_account_builder(standard_account_builder.clone())
3441 .with_quote_provider(quote_provider.clone())
3442 .with_greeks_cache(greeks_cache.clone())
3443 .with_order_snapshot(order_snapshot.clone())
3444 .with_order_sender(order_tx.clone())
3445 .with_event_sender(ws_direct_tx.clone())
3446 .with_executor(liquidation_executor),
3447 );
3448 let liquidation_observer = Arc::new(
3449 crate::liquidator::LiquidationChainObserver::new(
3450 liquidation_cache.clone(),
3451 diesel_db.clone(),
3452 liquidation_bonus_tx.clone(),
3453 Some(ws_direct_tx.clone()),
3454 event_bus.clone(),
3455 &runtime.config.transaction_submitter.rpc_url,
3456 &runtime.config.contracts.exchange_contract_address,
3457 crate::liquidator::LiquidationObserverConfig::from_runtime_config(
3458 &runtime.config.liquidation,
3459 ),
3460 )
3461 .map_err(|error| {
3462 anyhow::anyhow!("Failed to initialize liquidation chain observer: {}", error)
3463 })?,
3464 );
3465
3466 if standby_mode {
3467 let watcher_service: Arc<dyn Service> = liquidation_watcher;
3468 let observer_service: Arc<dyn Service> = liquidation_observer;
3469 let mut services = post_promote_services.lock().await;
3470 services.push(watcher_service);
3471 services.push(observer_service);
3472 info!("Liquidation watcher and chain observer deferred until standby promotion");
3473 } else {
3474 service_registry.register(liquidation_watcher);
3475 service_registry.register(liquidation_observer);
3476 info!("✓ Liquidation watcher registered");
3477 info!("✓ Liquidation chain observer registered");
3478 }
3479 } else {
3480 info!("Liquidation runtime disabled by config");
3481 }
3482
3483 if onchain_deposits_enabled {
3484 gauge!("ht_hypercore_cash_ledger_observer_enabled").set(1.0);
3485 let cash_ledger_observer = Arc::new(
3486 crate::hypercore_cash_ledger_observer::HypercoreCashLedgerObserver::new(
3487 shared_db.clone(),
3488 deposit_tx.clone(),
3489 crate::hypercore_cash_ledger_observer::HypercoreCashLedgerObserverConfig::from_runtime_config(
3490 &runtime.config.onchain_deposits,
3491 runtime.config.pricing.hypercore_info_url.clone(),
3492 hypercall_types::WalletAddress::from(
3493 alloy::primitives::Address::from_str(
3494 &runtime.config.contracts.exchange_contract_address,
3495 )
3496 .expect("exchange_contract_address already validated at startup"),
3497 ),
3498 hypercall_types::WalletAddress::from(
3499 alloy::primitives::Address::from_str(
3500 &runtime.config.contracts.core_deposit_wallet_address,
3501 )
3502 .expect("core_deposit_wallet_address already validated at startup"),
3503 ),
3504 ),
3505 tier_cache.clone(),
3506 )
3507 .map_err(|error| {
3508 anyhow::anyhow!("Failed to initialize HyperCore cash ledger observer: {}", error)
3509 })?,
3510 );
3511
3512 if standby_mode {
3513 let observer_service: Arc<dyn Service> = cash_ledger_observer;
3514 let mut services = post_promote_services.lock().await;
3515 services.push(observer_service);
3516 info!("HyperCore cash ledger observer deferred until standby promotion");
3517 } else {
3518 service_registry.register(cash_ledger_observer);
3519 info!("✓ HyperCore cash ledger observer registered");
3520 }
3521
3522 gauge!("ht_rsm_deposit_credit_observer_enabled").set(1.0);
3523 let deposit_credit_observer = Arc::new(
3524 crate::rsm_deposit_credit_observer::RsmDepositCreditObserver::new(
3525 shared_db.clone(),
3526 option_deposit_tx.clone(),
3527 engine_pm_settlement_admin_tx.clone(),
3528 &runtime.config.transaction_submitter.rpc_url,
3529 &runtime.config.contracts.exchange_contract_address,
3530 &runtime.config.contracts.usdc_address,
3531 crate::rsm_deposit_credit_observer::RsmDepositCreditObserverConfig::from_runtime_config(
3532 &runtime.config.onchain_deposits,
3533 ),
3534 )
3535 .map_err(|error| {
3536 anyhow::anyhow!("Failed to initialize RSM deposit credit observer: {}", error)
3537 })?,
3538 );
3539
3540 if standby_mode {
3541 let observer_service: Arc<dyn Service> = deposit_credit_observer;
3542 let mut services = post_promote_services.lock().await;
3543 services.push(observer_service);
3544 info!("RSM deposit credit observer deferred until standby promotion");
3545 } else {
3546 service_registry.register(deposit_credit_observer);
3547 info!("✓ RSM deposit credit observer registered");
3548 }
3549 } else {
3550 gauge!("ht_hypercore_cash_ledger_observer_enabled").set(0.0);
3551 gauge!("ht_rsm_deposit_credit_observer_enabled").set(0.0);
3552 info!(
3553 "HyperCore cash ledger and RSM deposit credit observers disabled because transaction submission is mocked"
3554 );
3555 }
3556
3557 let rate_limit_state = middleware::RateLimitState {
3559 rate_limiter: rate_limit_cache.clone(),
3560 };
3561
3562 let ws_state = WsState {
3564 pubsub: (*pubsub).clone(),
3565 indicative_cache: Some(indicative_cache.clone()),
3566 portfolio_cache: Some(portfolio_cache.clone()),
3567 competition_service: Some(competition_service.clone()),
3568 heartbeat_config: hypercall_api::websocket::ws_heartbeat_config(&runtime.config.api),
3569 rfq_handler_state: Some(rfq_handler_state.clone()),
3570 order_sender: Some(order_tx.clone()),
3571 trading_halt: Some(trading_halt.clone()),
3572 agent_auth: agent_auth.clone(),
3573 signing_chain_id,
3574 };
3575
3576 let candle_ws_poll_interval_ms = candle_ws_poll_interval_ms(&runtime.config.pricing);
3577 let candle_publisher = CandleWsPublisher::new(
3578 pubsub.clone(),
3579 candle_source.clone(),
3580 underlying_to_candle_coin.clone(),
3581 Duration::from_millis(candle_ws_poll_interval_ms),
3582 );
3583 service_registry.register(Arc::new(candle_publisher));
3584 tracing::info!(
3585 "✓ CandleWsPublisher registered (poll_interval_ms={})",
3586 candle_ws_poll_interval_ms
3587 );
3588
3589 let index_price_publisher =
3590 IndexPricePublisher::new(greeks_cache.clone(), pubsub.clone(), spot_price_notify);
3591 service_registry.register(Arc::new(index_price_publisher));
3592 tracing::info!("✓ IndexPricePublisher registered");
3593
3594 let metrics_collector = Arc::new(MetricsCollector::new(
3596 MetricsCollectorConfig::from_config(&runtime.config.observability.metrics),
3597 portfolio_cache.clone(),
3598 quote_provider.clone(),
3599 Some(order_snapshot.clone()),
3600 Some(tier_cache.clone()),
3601 portfolio_margin_settlement_allowlist,
3602 Some(greeks_cache.clone()),
3603 Some(market_stats_cache.clone()),
3604 Some(shared_db.clone()),
3605 Some(diesel_db.clone()),
3606 Some(diesel_db.clone()),
3607 Some(diesel_db.clone()),
3608 Some(risk_vol_oracle.clone()),
3609 ));
3610 service_registry.register(metrics_collector);
3611 tracing::info!("✓ MetricsCollector registered");
3612
3613 let service_count = service_registry.len();
3615 service_registry.start_all(&shutdown, &mut task_group);
3616 info!("✓ ServiceRegistry started {} services", service_count);
3617
3618 let shutdown_for_signal = shutdown.clone();
3621 let mut shutdown_rx_signal = shutdown.subscribe();
3622 task_group.spawn("SignalListener", async move {
3623 let ctrl_c = async {
3625 signal::ctrl_c()
3626 .await
3627 .expect("Failed to setup SIGINT handler");
3628 "SIGINT"
3629 };
3630
3631 #[cfg(unix)]
3632 let terminate = async {
3633 use tokio::signal::unix::{signal, SignalKind};
3634 signal(SignalKind::terminate())
3635 .expect("Failed to setup SIGTERM handler")
3636 .recv()
3637 .await;
3638 "SIGTERM"
3639 };
3640
3641 #[cfg(not(unix))]
3642 let terminate = std::future::pending::<&str>();
3643
3644 tokio::select! {
3645 sig = ctrl_c => {
3646 tracing::info!("Received {} (Ctrl-C), initiating graceful shutdown...", sig);
3647 shutdown_for_signal.trigger();
3648 }
3649 sig = terminate => {
3650 tracing::info!("Received {}, initiating graceful shutdown...", sig);
3651 shutdown_for_signal.trigger();
3652 }
3653 _ = shutdown_rx_signal.recv() => {
3654 tracing::info!("SignalListener received shutdown signal, exiting...");
3655 }
3656 }
3657 Ok(())
3658 });
3659
3660 let admin_state = build_admin_state(&app_state, rfq_handler_state.qp_cache.clone());
3663 let admin_router = hypercall_admin::admin_router(admin_state.clone()).layer(
3664 axum::middleware::from_fn_with_state(
3665 admin_state,
3666 hypercall_admin::auth::monitoring_auth_middleware,
3667 ),
3668 );
3669
3670 let openapi_doc = hypercall_api::openapi::filter_hidden_tags({
3675 let mut doc = hypercall_api::openapi::internal_openapi();
3676 doc.merge(hypercall_admin::admin_openapi());
3677 doc
3678 });
3679
3680 let app = hypercall_api::routes::build_app(hypercall_api::routes::BuildAppParams {
3681 app_state,
3682 rate_limit_state,
3683 ws_state,
3684 qp_ws_state,
3685 rfq_handler_state,
3686 readiness: readiness.clone() as Arc<dyn hypercall_api::runtime_status::ReadinessGate>,
3687 standby_progress: standby_progress.clone().map(|progress| {
3688 Arc::new(progress) as Arc<dyn hypercall_api::runtime_status::StandbyReplayProgress>
3689 }),
3690 standby_promote: standby_promote.clone(),
3691 admin_router,
3692 openapi_doc,
3693 });
3694
3695 startup_timing!("total", startup_begin.elapsed());
3696 tracing::info!("Integrated API server listening on {}", addr);
3697
3698 if let Some(bootstrap_http) = bootstrap_http {
3699 if let Some(progress) = startup_progress.as_ref() {
3700 progress.mark("full_api_router_installed");
3701 }
3702 if let Some(task) = standby_startup_heartbeat {
3703 task.abort();
3704 }
3705 *bootstrap_http.full_app.write().await = Some(app);
3706 tracing::info!("Full API router installed behind bootstrap standby HTTP server");
3707 if let Err(error) = bootstrap_http.task.await {
3708 tracing::error!("Bootstrap standby HTTP server task failed: {}", error);
3709 }
3710 } else {
3711 let listener = listener.expect("non-standby HTTP server requires the bound listener");
3712 let mut http_shutdown_rx = shutdown.subscribe();
3713 let server = axum::serve(listener, app);
3714
3715 tokio::select! {
3716 result = server => {
3717 if let Err(e) = result {
3718 tracing::error!("Server error: {}", e);
3719 }
3720 }
3721 _ = http_shutdown_rx.recv() => {
3722 tracing::info!("Shutting down HTTP server...");
3723 }
3724 }
3725 }
3726
3727 task_group
3730 .shutdown_and_join(&shutdown, Duration::from_secs(15))
3731 .await?;
3732
3733 tracing::info!("Server shutdown complete");
3734 Ok(())
3735}
3736
#[cfg(test)]
mod tests {
    use super::{build_bootstrap_http_app, BootstrapHttpState, StartupProgress};
    use crate::observability::command_trace::EngineStateDigest;
    use crate::shared::shutdown::Shutdown;
    use alloy::primitives::Address;
    use axum::body::Body;
    use axum::http::{Request, Response, StatusCode};
    use axum::routing::get;
    use axum::Router;
    use hypercall_api::websocket::event_forwarder::ws_orderbook_update_from_engine_update;
    use hypercall_types::wallet_address::test_wallet;
    use hypercall_types::{
        EngineMessage, OrderbookUpdate, SignedDirectiveTx, TransactionRequest, TransactionType,
        WalletAddress,
    };
    use rust_decimal_macros::dec;
    use std::str::FromStr;
    use std::sync::Arc;
    use tower::util::ServiceExt;

    /// Builds a request for `uri` carrying an empty body.
    ///
    /// `context` is the panic message used if the request cannot be built.
    fn empty_request(uri: &str, context: &str) -> Request<Body> {
        Request::builder()
            .uri(uri)
            .body(Body::empty())
            .expect(context)
    }

    /// Reads the whole body of `response` and parses it as JSON.
    ///
    /// `body_context` is the panic message for a failed body read and
    /// `json_context` the panic message for a failed JSON parse.
    async fn json_body(
        response: Response<Body>,
        body_context: &str,
        json_context: &str,
    ) -> serde_json::Value {
        let bytes = axum::body::to_bytes(response.into_body(), usize::MAX)
            .await
            .expect(body_context);
        serde_json::from_slice(&bytes).expect(json_context)
    }

    /// Walks the bootstrap HTTP app through its lifecycle around the full router install.
    ///
    /// Before the full router is stored in the shared slot the test expects:
    /// `/health` -> 200, `/standby-ready` -> 503 with an `api_starting` payload that
    /// reports the startup phase and progress counter, and any other route -> 503.
    /// After a router is stored, `/standby-ready` on the caught-up app -> 200 with
    /// `standby_ready`, and `/delegated` on the first app -> 200.
    #[tokio::test]
    async fn bootstrap_http_serves_probe_routes_before_full_app() {
        // Shared slot that later receives the "full" router; starts out empty.
        let router_slot = Arc::new(tokio::sync::RwLock::new(None));
        let replay_progress = crate::nats::replay_loop::ReplayProgress::new();
        let progress = StartupProgress::new("test_starting");
        progress.mark("test_hydrating");
        let (promote_tx, _promote_rx) = tokio::sync::oneshot::channel::<()>();
        let starting_app = build_bootstrap_http_app(BootstrapHttpState {
            shutdown: Shutdown::new(),
            standby_progress: Some(replay_progress),
            standby_promote: Some(Arc::new(tokio::sync::Mutex::new(Some(promote_tx)))),
            startup_progress: Some(progress.clone()),
            full_app: router_slot.clone(),
        });

        let health_response = starting_app
            .clone()
            .oneshot(empty_request("/health", "valid health request"))
            .await
            .expect("health response");
        assert_eq!(health_response.status(), StatusCode::OK);

        let starting_response = starting_app
            .clone()
            .oneshot(empty_request(
                "/standby-ready",
                "valid standby-ready request",
            ))
            .await
            .expect("standby-ready response");
        assert_eq!(starting_response.status(), StatusCode::SERVICE_UNAVAILABLE);
        let starting_payload =
            json_body(starting_response, "standby-ready body", "standby-ready json").await;
        assert_eq!(starting_payload["status"], "api_starting");
        assert_eq!(starting_payload["api_router_ready"], false);
        assert_eq!(starting_payload["promotable"], false);
        assert_eq!(starting_payload["startup_phase"], "test_hydrating");
        assert_eq!(starting_payload["startup_progress_counter"], 2);
        // Both progress timestamps must be present as unsigned integers.
        for key in [
            "last_startup_progress_unix_ms",
            "last_startup_progress_age_ms",
        ]
        .iter()
        {
            assert!(starting_payload[*key].as_u64().is_some());
        }

        // A route the bootstrap app does not own is refused while no full router exists.
        let fallback_response = starting_app
            .clone()
            .oneshot(empty_request("/delegated", "valid fallback request"))
            .await
            .expect("fallback response");
        assert_eq!(
            fallback_response.status(),
            StatusCode::SERVICE_UNAVAILABLE
        );

        // Second app sharing the same router slot, with replay progress set via the
        // test hook (presumably "caught up" when both arguments match — see ReplayProgress).
        let caught_up_replay = crate::nats::replay_loop::ReplayProgress::new();
        caught_up_replay.test_update_catch_up_state(10, 10);
        let (caught_up_tx, _caught_up_rx) = tokio::sync::oneshot::channel::<()>();
        let caught_up_app = build_bootstrap_http_app(BootstrapHttpState {
            shutdown: Shutdown::new(),
            standby_progress: Some(caught_up_replay),
            standby_promote: Some(Arc::new(tokio::sync::Mutex::new(Some(caught_up_tx)))),
            startup_progress: Some(progress),
            full_app: router_slot.clone(),
        });
        let before_install = caught_up_app
            .clone()
            .oneshot(empty_request(
                "/standby-ready",
                "valid caught-up standby-ready request",
            ))
            .await
            .expect("caught-up standby-ready response");
        assert_eq!(before_install.status(), StatusCode::SERVICE_UNAVAILABLE);

        // Install the full router into the shared slot.
        let full_router = Router::new().route("/delegated", get(|| async { "ok" }));
        *router_slot.write().await = Some(full_router);

        let after_install = caught_up_app
            .oneshot(empty_request(
                "/standby-ready",
                "valid caught-up standby-ready request after router install",
            ))
            .await
            .expect("caught-up standby-ready response after router install");
        assert_eq!(after_install.status(), StatusCode::OK);
        let ready_payload = json_body(
            after_install,
            "caught-up standby-ready body",
            "caught-up standby-ready json",
        )
        .await;
        assert_eq!(ready_payload["status"], "standby_ready");
        assert_eq!(ready_payload["startup_phase"], "test_hydrating");
        assert_eq!(ready_payload["startup_progress_counter"], 2);

        // The first app now serves the route that previously returned 503.
        let delegated_response = starting_app
            .oneshot(empty_request("/delegated", "valid delegated request"))
            .await
            .expect("delegated response");
        assert_eq!(delegated_response.status(), StatusCode::OK);
    }

    /// Checks the sizes produced by `ws_orderbook_update_from_engine_update`:
    /// engine sizes of 1_000_000 / 2_500_000 / 500_000 / 3_000_000 are expected to
    /// come out as 1 / 2.5 / 0.5 / 3 on the WebSocket update.
    #[test]
    fn test_ws_orderbook_update_converts_contract_units_to_human_contracts() {
        let engine_update = OrderbookUpdate {
            symbol: String::from("BTC-20260331-100000-C"),
            bids: vec![(dec!(100), dec!(1_000_000)), (dec!(99), dec!(2_500_000))],
            asks: vec![(dec!(101), dec!(500_000)), (dec!(102), dec!(3_000_000))],
            timestamp: 1_750_000_000_000_u64,
        };

        let ws_update =
            ws_orderbook_update_from_engine_update(&engine_update, Some(test_wallet(7)));

        // Indexing (rather than zipping) keeps the panic on a missing level.
        for (level, expected_size) in [dec!(1), dec!(2.5)].iter().enumerate() {
            assert_eq!(ws_update.bids[level].1, *expected_size);
        }
        for (level, expected_size) in [dec!(0.5), dec!(3)].iter().enumerate() {
            assert_eq!(ws_update.asks[level].1, *expected_size);
        }
    }

    /// Checks that `mock_journal_backends` yields no batch sender but does yield a
    /// journal writer, and that the writer accepts a transition carrying a
    /// user-directive transaction request.
    #[test]
    fn test_disabled_wal_startup_provides_mock_journal_writer_for_directives() {
        let (maybe_writer, maybe_batch_sender) = crate::journal::mock_journal_backends();
        assert!(
            maybe_batch_sender.is_none(),
            "disabled WAL mode should not configure a batch sender"
        );
        let writer = maybe_writer
            .expect("disabled WAL mode should inject a mock writer for directive submits");

        let parsed_address =
            Address::from_str("0x1111111111111111111111111111111111111111").expect("valid wallet");
        let wallet = WalletAddress::from(parsed_address);
        let request_id = uuid::Uuid::now_v7().to_string();

        let directive_tx = SignedDirectiveTx {
            directive: vec![1, 2, 3],
            signature: "0xabcdef".to_string(),
        };
        let tx_request = TransactionRequest {
            request_id: request_id.clone(),
            wallet_address: wallet,
            account_contract: wallet,
            transaction_type: TransactionType::UserDirective(directive_tx),
            timestamp: 123,
            expires_at: 456,
        };

        // Read the timestamp before the request is moved into the message list.
        let transition_timestamp = tx_request.timestamp;
        let command_json = serde_json::json!({
            "account": wallet,
            "actionKey": "hc_update_api_wallet",
            "nonce": 42,
            "recoveredSigner": wallet,
        });
        let command_bytes = hypercall_types::serialize_to_wire_bytes(&command_json);
        let digest_before = EngineStateDigest::default();
        let digest_after = EngineStateDigest::default();
        let messages = [EngineMessage::TransactionRequest(tx_request)];
        let journal_request_id = hypercall_db_diesel::engine_enums::DbUuid(
            uuid::Uuid::parse_str(&request_id).expect("request id should be a UUID"),
        );

        let outcome = writer.append_transition(
            transition_timestamp,
            &command_bytes,
            None,
            None,
            &digest_before,
            &digest_after,
            0,
            &messages,
            journal_request_id,
            None,
        );

        assert!(
            outcome.is_ok(),
            "disabled WAL startup writer should be usable for directive journaling"
        );
    }
}