Skip to main content

hypercall/runtime/
integrated.rs

1use anyhow::Context;
2use hypercall_types::WalletAddress;
3use metrics::gauge;
4use std::future::Future;
5use std::net::SocketAddr;
6use std::pin::Pin;
7use std::str::FromStr;
8use std::sync::atomic::{AtomicU64, Ordering};
9use std::sync::Arc;
10use std::time::{Duration, Instant};
11use tokio::signal;
12use tokio::sync::mpsc;
13use tracing::{debug, error, info, warn};
14
/// Base value for the synthetic deposit sequence numbers assigned to testnet
/// bootstrap deposits: the deposit at zero-based index `i` gets sequence
/// `TESTNET_BOOTSTRAP_DEPOSIT_SEQUENCE_BASE + i` (see
/// `testnet_bootstrap_deposit_sequence`).
// NOTE(review): presumably chosen far above any real deposit sequence so the
// synthetic and real ranges cannot collide — confirm against the deposit indexer.
#[cfg(feature = "testnet")]
const TESTNET_BOOTSTRAP_DEPOSIT_SEQUENCE_BASE: u64 = 9_000_000_000;
17
/// Maps a zero-based bootstrap deposit index to its synthetic deposit sequence
/// number by offsetting it from `TESTNET_BOOTSTRAP_DEPOSIT_SEQUENCE_BASE`.
///
/// # Panics
///
/// Panics if the resulting sequence number does not fit in a `u64`.
#[cfg(feature = "testnet")]
fn testnet_bootstrap_deposit_sequence(index: usize) -> u64 {
    let offset = index as u64;
    let Some(sequence) = TESTNET_BOOTSTRAP_DEPOSIT_SEQUENCE_BASE.checked_add(offset) else {
        panic!("testnet bootstrap deposit sequence overflow");
    };
    sequence
}
24
/// Builds the deterministic 32-byte source event hash for a synthetic testnet
/// bootstrap deposit.
///
/// Layout:
/// - bytes `0..16` hold the 16-byte ASCII tag `testnet_bootstra`;
/// - bytes `16..24` are zero;
/// - bytes `24..32` hold `sequence` in big-endian order.
#[cfg(feature = "testnet")]
fn testnet_bootstrap_source_event_hash(sequence: u64) -> alloy::primitives::FixedBytes<32> {
    const TAG: &[u8; 16] = b"testnet_bootstra";

    let mut hash = [0u8; 32];
    let (head, tail) = hash.split_at_mut(24);
    head[..TAG.len()].copy_from_slice(TAG);
    tail.copy_from_slice(&sequence.to_be_bytes());
    alloy::primitives::FixedBytes::from(hash)
}
32
/// Returns the deterministic request id for the bootstrap deposit at `index`.
///
/// The id is the hyphenated string form of the UUID whose 128-bit value is
/// `0x1707 << 112` plus `index`.
///
/// # Panics
///
/// Panics if adding `index` to the base overflows `u128`.
#[cfg(feature = "testnet")]
fn testnet_bootstrap_request_id(index: usize) -> String {
    const REQUEST_ID_BASE: u128 = 0x1707_0000_0000_0000_0000_0000_0000_0000;

    let Some(raw) = REQUEST_ID_BASE.checked_add(index as u128) else {
        panic!("testnet bootstrap request UUID overflow");
    };
    uuid::Uuid::from_u128(raw).to_string()
}
42
43fn parse_configured_wallet_set(
44    config_name: &str,
45    wallets: &[String],
46) -> anyhow::Result<std::collections::BTreeSet<WalletAddress>> {
47    wallets
48        .iter()
49        .map(|wallet| {
50            WalletAddress::from_str(wallet)
51                .map_err(|error| anyhow::anyhow!("Invalid {config_name} wallet {wallet}: {error}"))
52        })
53        .collect()
54}
55
/// Record a startup phase duration as both a log line and a Prometheus gauge.
///
/// Uses gauges (not histograms) since startup only happens once per process.
/// The gauge is `ht_startup_phase_seconds`, labelled with `phase`.
///
/// Forms:
/// - `startup_timing!(phase, elapsed)` logs `[startup-timing] <phase>: <elapsed:?>`;
/// - `startup_timing!(phase, elapsed, fmt, args...)` additionally appends the
///   formatted detail in parentheses.
///
/// `elapsed` and `phase` are each evaluated exactly once, in that order.
/// `phase` must be usable both as a `Display` log argument and as a `metrics`
/// label value (e.g. a `&'static str` literal). `elapsed` must provide
/// `as_secs_f64()` and `Debug` (e.g. `std::time::Duration`).
macro_rules! startup_timing {
    ($phase:expr, $elapsed:expr) => {{
        // Bind each argument once so an allocating or side-effecting phase
        // expression is not evaluated separately for the log and the gauge.
        let elapsed = $elapsed;
        let phase = $phase;
        info!("[startup-timing] {}: {:?}", phase, elapsed);
        gauge!("ht_startup_phase_seconds", "phase" => phase).set(elapsed.as_secs_f64());
    }};
    ($phase:expr, $elapsed:expr, $($arg:tt)+) => {{
        let elapsed = $elapsed;
        let phase = $phase;
        info!("[startup-timing] {}: {:?} ({})", phase, elapsed, format!($($arg)+));
        gauge!("ht_startup_phase_seconds", "phase" => phase).set(elapsed.as_secs_f64());
    }};
}
70
71use crate::observability::metrics_collector::{MetricsCollector, MetricsCollectorConfig};
72use crate::shared::option_token_address::ensure_option_token_derivation_supported;
73use crate::shared::service::{Service, ServiceRegistry};
74use crate::shared::shutdown::Shutdown;
75use crate::shared::task_group::TaskGroup;
76
77use crate::hypercore::HypercorePositionService;
78use crate::messaging::{ChannelEventBus, EventBusTrait};
79use crate::portfolio::PortfolioService;
80use crate::price_oracle::hydromancer_client::{HydromancerClient, HydromancerConfig};
81use crate::price_oracle::hyperliquid_oracle::{
82    HyperliquidMarkPriceOracle, HyperliquidOracleConfig,
83};
84use crate::price_oracle::hyperliquid_ws::HyperliquidWsFeed;
85use crate::read_cache::greeks::GreeksCache;
86use crate::read_cache::portfolio::PortfolioCache;
87use crate::readiness::{ReadinessRegistry, SyncStatusReadiness, VolOracleReadiness};
88use crate::rsm::margin_service::SpanMarginService;
89use crate::rsm::portfolio_margin::risk_account_builder::RiskAccountBuilder;
90use crate::rsm::unified_engine::{UnifiedEngineBuilder, UnifiedEngineRequest};
91use crate::runtime::tasks::bbo_snapshot::{BboSnapshotService, BboSnapshotTaskConfig};
92use crate::runtime::tasks::historical_pnl::{HistoricalPnlSnapshotTask, HistoricalPnlTaskConfig};
93use crate::runtime::tasks::historical_theo::{
94    HistoricalTheoSnapshotTask, HistoricalTheoTaskConfig,
95};
96use crate::runtime::tasks::index_price_publisher::{
97    IndexPricePublisher, IndexPriceUpdatePublisher,
98};
99use crate::snapshot::{
100    DbInstrumentsSnapshotLoader, DbInstrumentsSnapshotWriter, InstrumentsSnapshotTask,
101    PortfolioSnapshotTask, SnapshotTaskConfig,
102};
103use crate::standard_margin::{StandardAccountBuilder, StandardMarginService};
104use crate::startup::{oracles, services};
105use crate::types::Config;
106use crate::vol_oracle::VolOracleFactory;
107use axum::body::Body;
108use axum::extract::{Request, State};
109use axum::http::StatusCode;
110use axum::response::{IntoResponse, Response};
111use axum::routing::get;
112use axum::Router;
113use hypercall_api::candles::{
114    candle_ws_poll_interval_ms, CandleWsPublisher, HyperliquidCandleSource, UnderlyingCandleSource,
115};
116use hypercall_api::trading_halt::TradingHaltState;
117use hypercall_api::websocket::{
118    PubSubManager, WsCompetitionFinalStanding, WsCompetitionGapUpdate, WsCompetitionRankChange,
119    WsState,
120};
121use hypercall_api::{directives, handlers, middleware};
122use hypercall_competition::CompetitionService;
123use hypercall_db_diesel::DatabaseHandler;
124use hypercall_types::EngineMessage;
125use std::collections::{HashMap, HashSet};
126use tokio::task::JoinHandle;
127use tower::util::ServiceExt;
128
/// Shareable handle to the sending half of the standby-promotion `oneshot`
/// channel, created at startup only when standby mode is active.
///
/// A `oneshot::Sender` can send at most once, so it sits in an `Option` behind
/// an async mutex that clones of this `Arc` share. A holder can move the sender
/// out to fire the promotion.
// NOTE(review): `None` presumably means promotion was already requested — confirm
// against `handlers::health::standby_already_promoted`.
type PromoteSender = Arc<tokio::sync::Mutex<Option<tokio::sync::oneshot::Sender<()>>>>;
130
/// Lets the index price publisher (`crate::runtime::tasks::index_price_publisher`)
/// publish updates through the shared [`PubSubManager`].
impl IndexPriceUpdatePublisher for PubSubManager {
    fn publish_index_prices(&self, update: hypercall_types::ws_protocol::WsIndexPriceUpdate) {
        // `Type::method` path syntax resolves to the inherent method of the same
        // name (inherent items take precedence over trait items), so this
        // delegates instead of recursing into this trait method.
        PubSubManager::publish_index_prices(self, update);
    }
}
136
/// Cheaply cloneable tracker of which startup phase the process is in.
///
/// Every [`StartupProgress::mark`] or [`StartupProgress::heartbeat`] call bumps
/// a counter and stamps the wall-clock time. Readers such as the bootstrap
/// `/standby-ready` endpoint can therefore tell a slow-but-progressing startup
/// from a stuck one. Clones share the same underlying state.
#[derive(Clone)]
pub struct StartupProgress {
    /// Name of the current startup phase.
    phase: Arc<std::sync::Mutex<String>>,
    /// Number of progress events recorded so far (marks + heartbeats).
    counter: Arc<AtomicU64>,
    /// Unix time in milliseconds of the latest progress event; `0` means "never".
    last_progress_unix_ms: Arc<AtomicU64>,
}

/// Point-in-time copy of a [`StartupProgress`].
pub(crate) struct StartupProgressSnapshot {
    pub(crate) phase: String,
    pub(crate) counter: u64,
    /// `None` when no progress event with a valid timestamp has been recorded.
    pub(crate) last_progress_unix_ms: Option<u64>,
    /// Milliseconds since the last progress event. This saturates at zero if the
    /// wall clock moved backwards.
    pub(crate) last_progress_age_ms: Option<u64>,
}

impl StartupProgress {
    /// Creates a tracker already in `initial_phase` (counter starts at 1).
    fn new(initial_phase: impl Into<String>) -> Self {
        let progress = Self {
            phase: Arc::new(std::sync::Mutex::new(String::new())),
            counter: Arc::new(AtomicU64::new(0)),
            last_progress_unix_ms: Arc::new(AtomicU64::new(0)),
        };
        progress.mark(initial_phase);
        progress
    }

    /// Switches to `phase` and records a progress event.
    fn mark(&self, phase: impl Into<String>) {
        let phase = phase.into();
        *self.lock_phase() = phase;
        self.record_progress();
    }

    /// Records a progress event without changing the phase.
    fn heartbeat(&self) {
        self.record_progress();
    }

    /// Captures the current phase, counter, and progress timing.
    ///
    /// The fields are read independently, not as one atomic unit.
    pub(crate) fn snapshot(&self) -> StartupProgressSnapshot {
        let phase = self.lock_phase().clone();
        let counter = self.counter.load(Ordering::Relaxed);
        let last_progress_unix_ms = match self.last_progress_unix_ms.load(Ordering::Relaxed) {
            0 => None,
            ts => Some(ts),
        };
        let last_progress_age_ms =
            last_progress_unix_ms.map(|ts| current_unix_ms().saturating_sub(ts));

        StartupProgressSnapshot {
            phase,
            counter,
            last_progress_unix_ms,
            last_progress_age_ms,
        }
    }

    /// Locks the phase string, recovering the value if the mutex is poisoned.
    ///
    /// The guarded `String` is only ever replaced wholesale, so a panic in
    /// another holder cannot leave it half-written. Recovering keeps startup
    /// marks and the diagnostics endpoint from panicking in turn.
    fn lock_phase(&self) -> std::sync::MutexGuard<'_, String> {
        self.phase
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner)
    }

    /// Bumps the progress counter and stamps the current wall-clock time.
    fn record_progress(&self) {
        self.counter.fetch_add(1, Ordering::Relaxed);
        self.last_progress_unix_ms
            .store(current_unix_ms(), Ordering::Relaxed);
    }
}

/// Milliseconds since the Unix epoch according to the system wall clock.
///
/// Returns `0` instead of panicking if the clock reports a time before the
/// epoch. `0` is [`StartupProgress`]'s "no progress recorded" sentinel, so a
/// misconfigured clock degrades the diagnostics rather than crashing startup
/// or the `/standby-ready` handler. Saturates at `u64::MAX` instead of
/// truncating.
fn current_unix_ms() -> u64 {
    match std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH) {
        Ok(since_epoch) => u64::try_from(since_epoch.as_millis()).unwrap_or(u64::MAX),
        // A clock before 1970 is a host misconfiguration; report "never".
        Err(_) => 0,
    }
}
210
/// Shared state for the minimal bootstrap HTTP router.
///
/// That router is only started when standby mode is active. It serves requests
/// while the full API router is still being built.
#[derive(Clone)]
struct BootstrapHttpState {
    /// Checked by `/health` to report `shutting_down` with a 503.
    shutdown: Shutdown,
    /// Standby replay progress reported by `/standby-ready`, when present.
    standby_progress: Option<crate::nats::replay_loop::ReplayProgress>,
    /// Standby promotion trigger. `/standby-ready` consults it to report
    /// whether promotion has already happened.
    standby_promote: Option<PromoteSender>,
    /// Startup phase tracker whose fields are added to `/standby-ready` payloads.
    startup_progress: Option<StartupProgress>,
    /// The full API router once installed; `None` while still starting.
    /// Requests the bootstrap routes do not handle are forwarded to it.
    full_app: Arc<tokio::sync::RwLock<Option<Router>>>,
}
219
/// Handle to the running bootstrap HTTP server returned by `start_bootstrap_http_server`.
struct BootstrapHttpServer {
    /// The same router slot held by the bootstrap handlers' state.
    /// NOTE(review): presumably filled by the caller once the full API router is
    /// built — confirm at the write site.
    full_app: Arc<tokio::sync::RwLock<Option<Router>>>,
    /// Background task running `axum::serve`, which exits gracefully on shutdown.
    task: JoinHandle<()>,
}
224
225fn build_bootstrap_http_app(state: BootstrapHttpState) -> Router {
226    Router::new()
227        .route("/health", get(bootstrap_health))
228        .route("/standby-ready", get(bootstrap_standby_ready))
229        .fallback(bootstrap_fallback)
230        .with_state(state)
231}
232
233async fn bootstrap_health(State(state): State<BootstrapHttpState>) -> Response {
234    let response = hypercall_api::models::HealthResponse {
235        status: if state.shutdown.is_triggered() {
236            "shutting_down".to_string()
237        } else {
238            "ok".to_string()
239        },
240    };
241
242    if state.shutdown.is_triggered() {
243        (
244            StatusCode::SERVICE_UNAVAILABLE,
245            hypercall_api::sonic_json::SonicJson(response),
246        )
247            .into_response()
248    } else {
249        (
250            StatusCode::OK,
251            hypercall_api::sonic_json::SonicJson(response),
252        )
253            .into_response()
254    }
255}
256
257async fn bootstrap_standby_ready(State(state): State<BootstrapHttpState>) -> Response {
258    if state.full_app.read().await.is_none() {
259        let response = match state.standby_progress.as_ref() {
260            Some(progress) => {
261                let mut payload = handlers::health::standby_status_payload(progress, false);
262                payload["status"] = serde_json::json!("api_starting");
263                payload["promotable"] = serde_json::json!(false);
264                payload["api_router_ready"] = serde_json::json!(false);
265                add_startup_progress_payload(&mut payload, state.startup_progress.as_ref());
266                payload
267            }
268            None => {
269                let mut payload = serde_json::json!({
270                "status": "not_standby_mode",
271                "api_router_ready": false,
272                });
273                add_startup_progress_payload(&mut payload, state.startup_progress.as_ref());
274                payload
275            }
276        };
277        return (
278            StatusCode::SERVICE_UNAVAILABLE,
279            hypercall_api::sonic_json::SonicJson(response),
280        )
281            .into_response();
282    }
283
284    let already_promoted =
285        handlers::health::standby_already_promoted(state.standby_promote.as_ref());
286    handlers::health::standby_ready_response_with_startup(
287        state
288            .standby_progress
289            .as_ref()
290            .map(|progress| progress as &dyn hypercall_api::runtime_status::StandbyReplayProgress),
291        already_promoted,
292        state
293            .startup_progress
294            .as_ref()
295            .map(|progress| progress as &dyn hypercall_api::runtime_status::StartupProgressReader),
296    )
297}
298
299fn add_startup_progress_payload(
300    payload: &mut serde_json::Value,
301    progress: Option<&StartupProgress>,
302) {
303    let Some(progress) = progress else {
304        return;
305    };
306    let snapshot = progress.snapshot();
307    payload["startup_phase"] = serde_json::json!(snapshot.phase);
308    payload["startup_progress_counter"] = serde_json::json!(snapshot.counter);
309    payload["last_startup_progress_unix_ms"] = serde_json::json!(snapshot.last_progress_unix_ms);
310    payload["last_startup_progress_age_ms"] = serde_json::json!(snapshot.last_progress_age_ms);
311}
312
313async fn bootstrap_fallback(
314    State(state): State<BootstrapHttpState>,
315    request: Request<Body>,
316) -> Response {
317    let app = state.full_app.read().await.clone();
318    let Some(app) = app else {
319        return (
320            StatusCode::SERVICE_UNAVAILABLE,
321            hypercall_api::sonic_json::SonicJson(serde_json::json!({
322                "status": "starting",
323                "message": "API router is still starting"
324            })),
325        )
326            .into_response();
327    };
328
329    app.oneshot(request)
330        .await
331        .unwrap_or_else(|error| match error {})
332}
333
334fn start_bootstrap_http_server(
335    listener: tokio::net::TcpListener,
336    addr: SocketAddr,
337    shutdown: &Shutdown,
338    standby_progress: Option<crate::nats::replay_loop::ReplayProgress>,
339    standby_promote: Option<PromoteSender>,
340    startup_progress: Option<StartupProgress>,
341) -> BootstrapHttpServer {
342    let full_app = Arc::new(tokio::sync::RwLock::new(None));
343    let app = build_bootstrap_http_app(BootstrapHttpState {
344        shutdown: shutdown.clone(),
345        standby_progress,
346        standby_promote,
347        startup_progress,
348        full_app: full_app.clone(),
349    });
350    let mut http_shutdown_rx = shutdown.subscribe();
351
352    let task = tokio::spawn(async move {
353        tracing::info!("Bootstrap standby HTTP server listening on {}", addr);
354        let server = axum::serve(listener, app).with_graceful_shutdown(async move {
355            let _ = http_shutdown_rx.recv().await;
356            tracing::info!("Shutting down bootstrap standby HTTP server...");
357        });
358
359        if let Err(error) = server.await {
360            tracing::error!("Bootstrap standby HTTP server error: {}", error);
361        }
362    });
363
364    BootstrapHttpServer { full_app, task }
365}
366
367pub async fn run_integrated_server(
368    config: Config,
369    runtime: Arc<crate::backend_config::BackendRuntime>,
370) -> anyhow::Result<()> {
371    // Use IPv6 dual-stack binding (::) which accepts both IPv4 and IPv6 connections.
372    let port = std::env::var("PORT").unwrap_or_else(|_| "3000".to_string());
373    let addr: SocketAddr = format!("[::]:{}", port)
374        .parse()
375        .expect("valid socket address");
376    run_integrated_server_on(config, addr, runtime).await
377}
378
379/// Run the integrated server on a specific address.
380///
381/// This is a convenience wrapper that creates a new Shutdown instance.
382/// For testing or custom shutdown handling, use `run_integrated_server_on_with_shutdown`.
383pub async fn run_integrated_server_on(
384    config: Config,
385    addr: SocketAddr,
386    runtime: Arc<crate::backend_config::BackendRuntime>,
387) -> anyhow::Result<()> {
388    let shutdown = Shutdown::new();
389    run_integrated_server_on_with_shutdown(config, addr, shutdown, runtime).await
390}
391
392pub fn spawn_integrated_server_on(
393    config: Config,
394    addr: SocketAddr,
395    runtime: Arc<crate::backend_config::BackendRuntime>,
396) -> tokio::task::JoinHandle<anyhow::Result<()>> {
397    tokio::spawn(Box::pin(run_integrated_server_on(config, addr, runtime)))
398}
399
400/// Run the integrated server with an externally provided shutdown signal.
401///
402/// This allows tests and external callers to control when the server shuts down.
403pub async fn run_integrated_server_on_with_shutdown(
404    config: Config,
405    addr: SocketAddr,
406    shutdown: Shutdown,
407    runtime: Arc<crate::backend_config::BackendRuntime>,
408) -> anyhow::Result<()> {
409    let listener = tokio::net::TcpListener::bind(addr).await?;
410    boxed_integrated_server_with_listener_and_shutdown(config, listener, shutdown, runtime).await
411}
412
413/// Run the integrated server on a pre-bound listener.
414///
415/// Useful for tests that must reserve a port before startup to avoid bind races.
416pub async fn run_integrated_server_with_listener(
417    config: Config,
418    listener: tokio::net::TcpListener,
419) -> anyhow::Result<()> {
420    let runtime = Arc::new(crate::backend_config::BackendRuntime::from_legacy_env()?);
421    let shutdown = Shutdown::new();
422    boxed_integrated_server_with_listener_and_shutdown(config, listener, shutdown, runtime).await
423}
424
425fn boxed_integrated_server_with_listener_and_shutdown(
426    config: Config,
427    listener: tokio::net::TcpListener,
428    shutdown: Shutdown,
429    runtime: Arc<crate::backend_config::BackendRuntime>,
430) -> Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send>> {
431    Box::pin(run_integrated_server_with_listener_and_shutdown(
432        config, listener, shutdown, runtime,
433    ))
434}
435
/// Build the narrow [`hypercall_admin::AdminState`] from the full api
/// [`handlers::AppState`].
///
/// Every field except `qp_cache` is copied or cloned from `app_state`, with
/// these differences:
/// - `competition` comes from `app_state.competition_service`;
/// - `runtime_config` is a fresh `hypercall_admin::state::AdminRuntimeConfig`
///   holding only a subset of the api runtime config.
///
/// `qp_cache` is sourced from the RFQ handler state because `AppState` does not
/// own the quote-provider registry directly. This mapping previously lived as
/// `AdminState::from_app_state` in `hypercall-api`; it moved here when admin
/// became a sibling crate so neither crate depends on the other.
fn build_admin_state(
    app_state: &handlers::AppState,
    qp_cache: Arc<hypercall_runtime_api::QuoteProviderCache>,
) -> hypercall_admin::state::AdminState {
    hypercall_admin::state::AdminState {
        db: app_state.db.clone(),
        sync_db: app_state.sync_db.clone(),
        portfolio_cache: app_state.portfolio_cache.clone(),
        greeks_cache: app_state.greeks_cache.clone(),
        quote_provider: app_state.quote_provider.clone(),
        tier_cache: app_state.tier_cache.clone(),
        risk_vol_oracle: app_state.risk_vol_oracle.clone(),
        instruments_cache: app_state.instruments_cache.clone(),
        engine_state_digest_provider: app_state.engine_state_digest_provider.clone(),
        engine_journal_reader: app_state.engine_journal_reader.clone(),
        balance_snapshot_provider: app_state.balance_snapshot_provider.clone(),
        recovery_safety_report: app_state.recovery_safety_report.clone(),
        // Not on `AppState`; supplied by the caller from the RFQ handler state.
        qp_cache,
        // Field name differs between the two state types.
        competition: app_state.competition_service.clone(),
        order_sender: app_state.order_sender.clone(),
        engine_quiesce_sender: app_state.engine_quiesce_sender.clone(),
        tier_update_sender: app_state.tier_update_sender.clone(),
        pm_settlement_admin_sender: app_state.pm_settlement_admin_sender.clone(),
        agent_auth_sender: app_state.agent_auth_sender.clone(),
        rsm_signer: app_state.rsm_signer.clone(),
        trading_halt: app_state.trading_halt.clone(),
        standby_promote: app_state.standby_promote.clone(),
        standby_controller: app_state.standby_controller.clone(),
        drain_signal: app_state.drain_signal.clone(),
        is_draining: app_state.is_draining.clone(),
        build_info: app_state.build_info.clone(),
        // Narrowed copy: only these runtime-config fields are carried over to admin.
        runtime_config: Arc::new(hypercall_admin::state::AdminRuntimeConfig {
            wal_path: app_state.runtime_config.wal_path.clone(),
            db_host: app_state.runtime_config.db_host.clone(),
            db_name: app_state.runtime_config.db_name.clone(),
            testnet_mode: app_state.runtime_config.testnet_mode,
            portfolio_margin_pool_enabled: app_state.runtime_config.portfolio_margin_pool_enabled,
            portfolio_margin_settlement_allowlist: app_state
                .runtime_config
                .portfolio_margin_settlement_allowlist
                .clone(),
        }),
        server_started_at: app_state.server_started_at,
        boot_id: app_state.boot_id.clone(),
        admin_api_key: app_state.admin_api_key.clone(),
        allow_unauthenticated_monitoring: app_state.allow_unauthenticated_monitoring,
    }
}
491
492/// Run the integrated server on a pre-bound listener with an external shutdown signal.
493///
494/// This is primarily used by integration tests that need race-free port reservation
495/// and deterministic shutdown control.
496pub async fn run_integrated_server_with_listener_and_shutdown(
497    config: Config,
498    listener: tokio::net::TcpListener,
499    shutdown: Shutdown,
500    runtime: Arc<crate::backend_config::BackendRuntime>,
501) -> anyhow::Result<()> {
502    let addr = listener.local_addr()?;
503    let mut listener = Some(listener);
504
505    // Initialize Prometheus metrics early to avoid losing metrics emitted during startup
506    crate::observability::prometheus::init_prometheus();
507    // Start the upkeep task now that we're in a tokio runtime context
508    crate::observability::prometheus::start_upkeep_task();
509    tracing::info!(
510        "Starting integrated API server with unified engine on {}...",
511        addr
512    );
513
514    let startup_begin = Instant::now();
515    runtime.apply_process_env();
516
517    #[cfg(not(feature = "test-endpoints"))]
518    if runtime.config.modes.enable_test_endpoints {
519        tracing::warn!(
520            "modes.enable_test_endpoints=true in config but binary built without test-endpoints feature. \
521             Test endpoints, mock feeds, and mock oracles are disabled."
522        );
523    }
524
525    #[cfg(not(feature = "faucet"))]
526    if runtime.config.modes.testnet_mode {
527        tracing::info!(
528            "modes.testnet_mode=true in config but binary built without faucet feature. \
529             The /faucet route is not available."
530        );
531    }
532
533    ensure_option_token_derivation_supported()
534        .map_err(|err| anyhow::anyhow!("Option token derivation startup gate failed: {}", err))?;
535
536    // Install the per-underlying expiry time policy before any component
537    // derives expiry timestamps (engine recovery, caches, settlement, token
538    // address derivation all depend on it).
539    let expiry_times = runtime
540        .config
541        .catalog
542        .expiry_times()
543        .map_err(|err| anyhow::anyhow!("Invalid catalog expiry time configuration: {err:#}"))?;
544    hypercall_types::install_expiry_times(expiry_times)
545        .map_err(|err| anyhow::anyhow!("Expiry time policy install failed: {err}"))?;
546
547    let database_resources = crate::startup::database::build_database_resources(&runtime).await?;
548    let engine_database_url = database_resources.engine_database_url;
549    let db_auth = database_resources.db_auth;
550    let engine_db_auth = database_resources.engine_db_auth;
551    let db_pool = database_resources.db_pool;
552    let diesel_db = database_resources.diesel_db;
553    let shared_db = database_resources.shared_db;
554    let standby_mode_active = database_resources.standby_mode_active;
555    let skip_db_migrations = database_resources.skip_db_migrations;
556    let standby_progress = standby_mode_active.then(crate::nats::replay_loop::ReplayProgress::new);
557    let startup_progress =
558        standby_mode_active.then(|| StartupProgress::new("bootstrap_http_starting"));
559    let (standby_promote, mut standby_replay_promote_rx): (
560        Option<PromoteSender>,
561        Option<tokio::sync::oneshot::Receiver<()>>,
562    ) = if standby_mode_active {
563        let (promote_tx, promote_rx) = tokio::sync::oneshot::channel::<()>();
564        (
565            Some(Arc::new(tokio::sync::Mutex::new(Some(promote_tx)))),
566            Some(promote_rx),
567        )
568    } else {
569        (None, None)
570    };
571    let bootstrap_http = if standby_mode_active {
572        Some(start_bootstrap_http_server(
573            listener
574                .take()
575                .expect("standby bootstrap HTTP requires the bound listener"),
576            addr,
577            &shutdown,
578            standby_progress.clone(),
579            standby_promote.clone(),
580            startup_progress.clone(),
581        ))
582    } else {
583        None
584    };
585    if let Some(progress) = startup_progress.as_ref() {
586        progress.mark("bootstrap_http_ready");
587    }
588
589    // Create PubSub manager
590    let pubsub = Arc::new(PubSubManager::new());
591
592    // Create TaskGroup for tracking background tasks
593    let mut task_group = TaskGroup::new();
594    let mut service_registry = ServiceRegistry::new();
595    let post_promote_services: Arc<tokio::sync::Mutex<Vec<Arc<dyn Service>>>> =
596        Arc::new(tokio::sync::Mutex::new(Vec::new()));
597
598    // Create channel for order processing (from HTTP handlers to unified engine)
599    let (order_tx, order_rx) = mpsc::channel::<UnifiedEngineRequest>(100);
600
601    // Create in-process channel event bus.
602    let t = Instant::now();
603    let event_bus: Arc<dyn EventBusTrait> = Arc::new(
604        ChannelEventBus::new()
605            .map_err(|e| anyhow::anyhow!("Failed to create ChannelEventBus: {}", e))?,
606    );
607
608    startup_timing!("event_bus_create", t.elapsed());
609
610    // NOTE: Do NOT start event bus processing yet – start it after all caches
611    // have subscribed to avoid missing early events.
612
613    crate::startup::transaction_submitter::build_transaction_submitter_resources(
614        &runtime,
615        event_bus.clone(),
616        Some(diesel_db.clone()),
617        &shutdown,
618        &mut task_group,
619    )
620    .await?;
621
622    let sync_exchange_address =
623        alloy::primitives::Address::from_str(&runtime.config.contracts.exchange_contract_address)
624            .ok();
625
626    // Get sender for unified engine to send events
627    let engine_event_tx = event_bus.get_sender();
628
629    // Direct channel for WS event forwarding — low-latency path for WebSocket updates.
630    let (ws_direct_tx, ws_direct_rx_for_forwarder) = mpsc::unbounded_channel();
631
632    let rsm_signer_chain_id = if runtime.config.modes.testnet_mode {
633        hypercall_types::directives::HYPERCALL_TESTNET_CHAIN_ID
634    } else {
635        hypercall_types::directives::HYPERCALL_MAINNET_CHAIN_ID
636    };
637
638    // Initialize PortfolioCache with the shared handler
639    let portfolio_cache = Arc::new(PortfolioCache::new(shared_db.clone()));
640
641    // Initialize from latest snapshot (returns next engine command_id to replay)
642    let t = Instant::now();
643    let next_portfolio_command_id = portfolio_cache.initialize().await?;
644    startup_timing!("portfolio_snapshot_load", t.elapsed());
645
646    // NOTE: Readiness registry is built later, after all components are initialized.
647
648    // Initialize TierCache early so margin-mode dependent projections can be
649    // rebuilt before catchup replay.
650    let tier_cache_config = crate::read_cache::tier::TierCacheConfig {
651        max_open_orders_default: runtime.config.api.max_open_orders_default,
652        max_open_positions_default: runtime.config.api.max_open_positions_default,
653    };
654    let tier_cache = Arc::new(crate::read_cache::tier::TierCache::new_with_config(
655        shared_db.clone(),
656        tier_cache_config,
657    )?);
658    let t = Instant::now();
659    tier_cache.load_from_db().await?;
660    startup_timing!("tier_cache", t.elapsed());
661
662    // Initialize RateLimitCache for API rate limiting
663    let t = Instant::now();
664    let rate_limit_cache = Arc::new(
665        hypercall_api::caches::rate_limit::RateLimitCache::new(
666            tier_cache.clone(),
667            runtime.secrets.redis_url.as_deref(),
668        )
669        .await,
670    );
671    startup_timing!("rate_limit_cache", t.elapsed());
672
673    // Initialize LiquidationCache for pre-liquidation order blocking
674    let liquidation_cache = Arc::new(crate::liquidator::LiquidationCache::new());
675    if runtime.config.liquidation.enabled {
676        liquidation_cache
677            .load_from_db(diesel_db.as_ref())
678            .await
679            .map_err(|error| {
680                anyhow::anyhow!("Failed to load liquidation cache from DB: {}", error)
681            })?;
682        info!("LiquidationCache initialized successfully");
683    } else {
684        info!("LiquidationCache restore skipped because liquidation runtime is disabled");
685    }
686
687    let journal_resources = crate::startup::journal::build_journal_resources(
688        &runtime,
689        &db_auth,
690        &shutdown,
691        &mut task_group,
692    )?;
693    let engine_journal_writer = journal_resources.engine_journal_writer;
694    let journal_batch_sender = journal_resources.journal_batch_sender;
695
696    // Wire tier_cache to PortfolioService BEFORE catchup replay. Runtime balance
697    // reads are snapshot-backed via EngineSnapshot.balance_ledger.
698    {
699        let portfolio_svc = portfolio_cache.get_service();
700        if let Some(impl_svc) = portfolio_svc
701            .as_any()
702            .downcast_ref::<crate::portfolio::PortfolioServiceImpl>()
703        {
704            impl_svc.set_tier_cache(tier_cache.clone()).await;
705            info!("✓ Wired TierCache to PortfolioService for catchup replay");
706        } else {
707            return Err(anyhow::anyhow!(
708                "Could not wire TierCache to PortfolioService for catchup replay: service is not PortfolioServiceImpl"
709            ));
710        }
711    }
712
713    {
714        let t = Instant::now();
715        info!(
716            "Starting portfolio catchup replay from engine command_id={}",
717            next_portfolio_command_id
718        );
719
720        let mut after_command_id = next_portfolio_command_id - 1;
721        let mut total_events_replayed: u64 = 0;
722        let mut total_custody_commands_replayed: u64 = 0;
723        loop {
724            let replay_db: &dyn hypercall_db::JournalReplayReader = shared_db.as_ref();
725            let command_batch = replay_db
726                .get_replay_commands_after_command_id_sync(after_command_id, None, 1000)
727                .map_err(|e| anyhow::anyhow!("Failed to load portfolio replay commands: {}", e))?;
728
729            let Some(last_command) = command_batch.last() else {
730                break;
731            };
732            let end_command_id = last_command.command_id;
733
734            let replay_events = replay_db
735                .get_portfolio_events_for_command_range_sync(after_command_id + 1, end_command_id)
736                .map_err(|e| anyhow::anyhow!("Failed to load portfolio replay events: {}", e))?;
737
738            let mut replay_events_by_command = std::collections::BTreeMap::new();
739            for replay_event in replay_events {
740                replay_events_by_command
741                    .entry(replay_event.command_id)
742                    .or_insert_with(Vec::new)
743                    .push(replay_event);
744            }
745
746            for command in &command_batch {
747                if let Some(events) = replay_events_by_command.remove(&command.command_id) {
748                    for replay_event in events {
749                        match replay_event.event_type {
750                            hypercall_db::EventType::OrderFilled => {
751                                if replay_event.event_data.len() <= 1 {
752                                    panic!(
753                                        "CRITICAL_FAILURE: empty replay fill payload for command_id={}",
754                                        replay_event.command_id
755                                    );
756                                }
757                                let fill =
758                                    match hypercall_types::EngineMessage::deserialize_from_wire(
759                                        crate::shared::topics::TOPIC_FILLS,
760                                        &replay_event.event_data,
761                                    ) {
762                                        Ok(hypercall_types::EngineMessage::OrderFilled {
763                                            fill,
764                                            ..
765                                        }) => fill,
766                                        Ok(other) => {
767                                            panic!(
768                                                "CRITICAL_FAILURE: expected replay fill for command_id={}, got {}",
769                                                replay_event.command_id,
770                                                other.type_name()
771                                            )
772                                        }
773                                        Err(e) => {
774                                            panic!(
775                                                "CRITICAL_FAILURE: failed to deserialize replay fill for command_id={}: {}",
776                                                replay_event.command_id,
777                                                e
778                                            )
779                                        }
780                                    };
781                                portfolio_cache
782                                    .replay_journal_fill(&fill, replay_event.command_id)
783                                    .await;
784                            }
785                            hypercall_db::EventType::PositionExpired => {
786                                if replay_event.event_data.len() <= 1 {
787                                    panic!(
788                                        "CRITICAL_FAILURE: empty replay position expiry payload for command_id={}",
789                                        replay_event.command_id
790                                    );
791                                }
792                                let expiry = rmp_serde::from_slice::<
793                                    hypercall_types::PositionExpiredMessage,
794                                >(&replay_event.event_data[1..])
795                                .unwrap_or_else(|e| {
796                                    panic!(
797                                        "CRITICAL_FAILURE: failed to deserialize replay position expiry for command_id={}: {}",
798                                        replay_event.command_id, e
799                                    )
800                                });
801                                // TODO: Add a dedicated replay_journal_expiry method that skips
802                                // the settlement_payouts lookup, analogous to replay_journal_fill.
803                                // Current behavior is safe but adds startup latency.
804                                portfolio_cache
805                                    .handle_engine_message(
806                                        EngineMessage::PositionExpired(expiry),
807                                        replay_event.command_id,
808                                    )
809                                    .await;
810                            }
811                            other => {
812                                panic!(
813                                    "CRITICAL_FAILURE: unexpected portfolio replay event type {:?} for command_id={}",
814                                    other, replay_event.command_id
815                                )
816                            }
817                        }
818                        total_events_replayed += 1;
819                    }
820                }
821
822                let command_type = match command.command_type.as_str() {
823                    "OptionDepositUpdate" => Some(crate::nats::CommandType::OptionDepositUpdate),
824                    "OptionWithdrawalUpdate" => {
825                        Some(crate::nats::CommandType::OptionWithdrawalUpdate)
826                    }
827                    _ => None,
828                };
829                let Some(command_type) = command_type else {
830                    continue;
831                };
832                let decoded = crate::nats::deserialize::deserialize_command(
833                    command_type,
834                    crate::nats::LEGACY_COMMAND_WIRE_VERSION,
835                    &command.command_data,
836                )
837                .unwrap_or_else(|error| {
838                    panic!(
839                        "CRITICAL_FAILURE: failed to decode portfolio custody command {} (command_id={}): {}",
840                        command.command_type, command.command_id, error
841                    )
842                });
843                match decoded {
844                    crate::rsm::apply::EngineCommand::OptionDepositUpdate {
845                        wallet,
846                        symbol,
847                        quantity,
848                        ..
849                    } => {
850                        portfolio_cache
851                            .replay_option_custody_delta(
852                                wallet,
853                                symbol,
854                                quantity,
855                                command.command_id,
856                            )
857                            .await;
858                        total_custody_commands_replayed += 1;
859                    }
860                    crate::rsm::apply::EngineCommand::OptionWithdrawalUpdate {
861                        wallet,
862                        symbol,
863                        quantity,
864                        ..
865                    } => {
866                        portfolio_cache
867                            .replay_option_custody_delta(
868                                wallet,
869                                symbol,
870                                -quantity,
871                                command.command_id,
872                            )
873                            .await;
874                        total_custody_commands_replayed += 1;
875                    }
876                    other => {
877                        panic!(
878                            "CRITICAL_FAILURE: decoded unexpected portfolio custody command {} for command_id={}",
879                            other.command_type(),
880                            command.command_id
881                        );
882                    }
883                }
884            }
885
886            if let Some((&command_id, _)) = replay_events_by_command.iter().next() {
887                panic!(
888                    "CRITICAL_FAILURE: portfolio replay event found for command_id={} without matching replay command in batch",
889                    command_id
890                );
891            }
892
893            after_command_id = end_command_id;
894        }
895
896        startup_timing!(
897            "portfolio_catchup_replay",
898            t.elapsed(),
899            "{} events, {} custody commands",
900            total_events_replayed,
901            total_custody_commands_replayed
902        );
903        portfolio_cache.set_ready();
904    }
905    // Initialize instruments cache.
906    let t = Instant::now();
907    let instruments_cache =
908        Arc::new(crate::read_cache::instruments_registry::InstrumentsCache::new());
909
910    // Create instruments snapshot loader for snapshot-based initialization
911    let instruments_snapshot_loader = DbInstrumentsSnapshotLoader::new(shared_db.clone());
912    let bootstrap_db: Arc<dyn hypercall_db::BootstrapReader> = diesel_db.clone();
913    let instruments_offsets = instruments_cache
914        .clone()
915        .initialize_with_snapshot(
916            &bootstrap_db,
917            event_bus.clone(),
918            &instruments_snapshot_loader,
919            shutdown.subscribe(),
920        )
921        .await?;
922
923    let instruments_count = instruments_cache.len().await;
924    startup_timing!(
925        "instruments_snapshot_init",
926        t.elapsed(),
927        "{} instruments",
928        instruments_count
929    );
930
931    // Catchup replay for instruments cache if we restored from snapshot
932    if let Some(offsets) = instruments_offsets {
933        let t = Instant::now();
934        info!(
935            "Instruments snapshot restored with {} stream offsets - starting catchup replay",
936            offsets.len()
937        );
938
939        let instruments_cache_replay = instruments_cache.clone();
940        let handler: Box<dyn Fn(EngineMessage) + Send + Sync> = Box::new(move |event| {
941            // Only handle MarketUpdate events during replay
942            if let EngineMessage::MarketUpdate(market_update) = event {
943                let cache = instruments_cache_replay.clone();
944                tokio::task::block_in_place(|| {
945                    tokio::runtime::Handle::current().block_on(async {
946                        if let Err(e) = cache.handle_market_update(market_update).await {
947                            error!("Error replaying market update to instruments cache: {}", e);
948                        }
949                    });
950                });
951            }
952        });
953
954        let replay_result = event_bus.replay_from_offsets(offsets, handler).await;
955
956        match replay_result {
957            Ok(final_offsets) => {
958                // Advance seq tracking to the final replay offsets so subsequent
959                // snapshots record where live consumption should resume from.
960                if let Err(e) = instruments_cache
961                    .apply_snapshot_offsets(final_offsets)
962                    .await
963                {
964                    error!("Failed to apply replay offsets to instruments cache: {}", e);
965                }
966                let final_count = instruments_cache.len().await;
967                startup_timing!(
968                    "instruments_catchup_replay",
969                    t.elapsed(),
970                    "{} instruments",
971                    final_count
972                );
973                instruments_cache.sync_status().set_ready();
974            }
975            Err(e) => {
976                startup_timing!("instruments_catchup_replay", t.elapsed(), "FAILED: {}", e);
977                error!(
978                    "Instruments catchup replay failed: {} - marking as ready with potentially stale state",
979                    e
980                );
981                // Still mark as ready so service can start, but log the error
982                instruments_cache.sync_status().set_ready();
983            }
984        }
985    }
986
987    // Start instruments snapshot task for periodic snapshots
988    if !standby_mode_active {
989        let instruments_cache_for_snapshot = instruments_cache.clone();
990        let instruments_get_offsets = move || instruments_cache_for_snapshot.snapshot_offsets();
991        let instruments_snapshot_writer = Arc::new(DbInstrumentsSnapshotWriter::new(
992            shared_db.clone(),
993            instruments_cache.clone(),
994            instruments_get_offsets,
995        ));
996        let instruments_snapshot_task = Arc::new(InstrumentsSnapshotTask::new(
997            instruments_snapshot_writer,
998            SnapshotTaskConfig::default(),
999        ));
1000        service_registry.register(instruments_snapshot_task);
1001        info!("Instruments snapshot task registered (60s interval)");
1002    } else {
1003        info!("Instruments snapshot task deferred (standby mode)");
1004    }
1005
1006    let catalog_config = runtime.config.catalog.clone();
1007    info!(
1008        "Loaded inline catalog config version {} ({} underlyings)",
1009        catalog_config.version,
1010        catalog_config.underlyings.len()
1011    );
1012    let collateral_registry = Arc::new(
1013        catalog_manager::CollateralRegistry::from_catalog(&catalog_config)
1014            .context("Failed to build collateral registry from catalog config")?,
1015    );
1016
1017    // Resolve oracle markets from catalog (with optional env overrides).
1018    let oracle_markets = oracles::configured_oracle_markets(&catalog_config);
1019    let underlying_to_candle_coin: Arc<HashMap<String, String>> = Arc::new(
1020        oracle_markets
1021            .iter()
1022            .map(|market| (market.underlying.clone(), market.hyperliquid_coin.clone()))
1023            .collect(),
1024    );
1025    let candle_source: Arc<dyn UnderlyingCandleSource> = Arc::new(
1026        HyperliquidCandleSource::from_config(
1027            &runtime.config.pricing,
1028            runtime.config.modes.testnet_mode,
1029        )
1030        .map_err(|e| anyhow::anyhow!("Failed to initialize HyperliquidCandleSource: {}", e))?,
1031    );
1032
1033    // Build WsFeed symbol list from resolved oracle markets.
1034    let mut ws_symbols = Vec::new();
1035    let mut seen_symbols = HashSet::new();
1036    for market in &oracle_markets {
1037        if seen_symbols.insert(market.hyperliquid_coin.clone()) {
1038            ws_symbols.push(market.hyperliquid_coin.clone());
1039        }
1040    }
1041
1042    let ws_url = runtime.config.pricing.hyperliquid_ws_url.clone();
1043    let ws_feed = {
1044        #[cfg(feature = "test-endpoints")]
1045        {
1046            if runtime.config.modes.skip_external_oracle {
1047                let mut default_prices = HashMap::new();
1048                default_prices.insert("BTC".to_string(), 90_000.0);
1049                default_prices.insert("ETH".to_string(), 2_500.0);
1050                default_prices.insert("HYPE".to_string(), 42.0);
1051                default_prices.insert("km:US500".to_string(), 5_500.0);
1052                default_prices.insert("km:USOIL".to_string(), 65.0);
1053                oracles::apply_mock_oracle_price_overrides(&mut default_prices);
1054                let feed = Arc::new(HyperliquidWsFeed::new_mock(ws_symbols, &default_prices));
1055                let feed_clone = feed.clone();
1056                let mock_shutdown_rx = shutdown.subscribe();
1057                task_group.spawn("MockWsFeedRefresh", async move {
1058                    let mut interval = tokio::time::interval(Duration::from_secs(5));
1059                    let mut shutdown = mock_shutdown_rx;
1060                    loop {
1061                        tokio::select! {
1062                            _ = interval.tick() => feed_clone.refresh_mock_timestamps().await,
1063                            _ = shutdown.recv() => break,
1064                        }
1065                    }
1066                    Ok(())
1067                });
1068                info!("HyperliquidWsFeed mock with default prices (skip_external_oracle=true)");
1069                feed
1070            } else {
1071                let feed = Arc::new(HyperliquidWsFeed::new(ws_url.clone(), ws_symbols));
1072                let ws_feed_clone = feed.clone();
1073                let ws_shutdown_rx = shutdown.subscribe();
1074                task_group.spawn("HyperliquidWsFeed", async move {
1075                    ws_feed_clone.start(ws_shutdown_rx).await;
1076                    Ok(())
1077                });
1078                info!("HyperliquidWsFeed live mode enabled (test-endpoints build)");
1079                feed
1080            }
1081        }
1082        #[cfg(not(feature = "test-endpoints"))]
1083        {
1084            let feed = Arc::new(HyperliquidWsFeed::new(ws_url.clone(), ws_symbols));
1085            let ws_feed_clone = feed.clone();
1086            let ws_shutdown_rx = shutdown.subscribe();
1087            task_group.spawn("HyperliquidWsFeed", async move {
1088                ws_feed_clone.start(ws_shutdown_rx).await;
1089                Ok(())
1090            });
1091            info!("HyperliquidWsFeed started (url={})", ws_url);
1092            feed
1093        }
1094    };
1095
1096    let hydromancer_client =
1097        if runtime.config.hydromancer.enabled && !cfg!(feature = "test-endpoints") {
1098            let api_key = runtime
1099                .secrets
1100                .require_hydromancer_api_key()
1101                .expect("Hydromancer is enabled but HYDROMANCER_API_KEY is missing")
1102                .to_string();
1103            let config = HydromancerConfig {
1104                api_url: runtime.config.hydromancer.api_url.clone(),
1105                api_key,
1106            };
1107            info!("Hydromancer fallback oracle enabled");
1108            Some(Arc::new(HydromancerClient::new(config)))
1109        } else {
1110            info!("Hydromancer fallback oracle disabled");
1111            None
1112        };
1113
1114    // Initialize mark price oracles for each underlying.
1115    let t = Instant::now();
1116    let spot_price_notify = Arc::new(tokio::sync::Notify::new());
1117    let mut mark_price_oracles: HashMap<String, Arc<HyperliquidMarkPriceOracle>> = HashMap::new();
1118    for market in &oracle_markets {
1119        let t_oracle = Instant::now();
1120        let oracle_config = HyperliquidOracleConfig {
1121            symbol: market.hyperliquid_coin.clone(),
1122            oracle_writer: Some(shared_db.clone()),
1123            ws_feed: Some(ws_feed.clone()),
1124            price_notify: Some(spot_price_notify.clone()),
1125            ..Default::default()
1126        };
1127        let mut oracle = {
1128            #[cfg(feature = "test-endpoints")]
1129            {
1130                HyperliquidMarkPriceOracle::new(oracle_config).map_err(|e| {
1131                    anyhow::anyhow!(
1132                        "Failed to construct mark price oracle {}: {}",
1133                        market.underlying,
1134                        e
1135                    )
1136                })?
1137            }
1138            #[cfg(not(feature = "test-endpoints"))]
1139            {
1140                match HyperliquidMarkPriceOracle::new_with_init(oracle_config.clone()).await {
1141                    Ok(oracle) => oracle,
1142                    Err(err) => {
1143                        error!(
1144                            "Failed to initialize mark price oracle for {} (coin: {}): {}. Starting without initial price.",
1145                            market.underlying, market.hyperliquid_coin, err
1146                        );
1147                        HyperliquidMarkPriceOracle::new(oracle_config).map_err(|e| {
1148                            anyhow::anyhow!(
1149                                "Failed to construct mark price oracle {}: {}",
1150                                market.underlying,
1151                                e
1152                            )
1153                        })?
1154                    }
1155                }
1156            }
1157        };
1158        if let Some(ref hc) = hydromancer_client {
1159            oracle.set_hydromancer_client(hc.clone());
1160        }
1161        startup_timing!("oracle_init", t_oracle.elapsed(), "{}", market.underlying);
1162        gauge!("ht_startup_oracle_seconds", "underlying" => market.underlying.clone())
1163            .set(t_oracle.elapsed().as_secs_f64());
1164        let oracle = Arc::new(oracle);
1165        mark_price_oracles.insert(market.underlying.clone(), oracle);
1166    }
1167    startup_timing!("oracles_total", t.elapsed());
1168
1169    // Start polling tasks for each oracle.
1170    for market in &oracle_markets {
1171        let oracle_clone = mark_price_oracles
1172            .get(&market.underlying)
1173            .unwrap_or_else(|| {
1174                panic!(
1175                    "Mark price oracle missing for {} after initialization",
1176                    market.underlying
1177                )
1178            })
1179            .clone();
1180        let oracle_shutdown_rx = shutdown.subscribe();
1181        let task_name = format!("MarkPriceOracle-{}", market.underlying);
1182        let task_name_static = Box::leak(task_name.into_boxed_str());
1183        task_group.spawn(task_name_static, async move {
1184            let handle = oracle_clone.start_polling_with_shutdown(oracle_shutdown_rx);
1185            if let Err(e) = handle.await {
1186                error!("MarkPriceOracle task panicked: {:?}", e);
1187            }
1188            Ok(())
1189        });
1190        info!(
1191            "Initialized mark price oracle for {} (coin: {})",
1192            market.underlying, market.hyperliquid_coin
1193        );
1194    }
1195
1196    // Clone for CatalogManager before moving to engine builder
1197    let mark_price_oracles_for_catalog = mark_price_oracles.clone();
1198    info!(
1199        "Mark price oracles initialized for: {:?}",
1200        mark_price_oracles.keys().collect::<Vec<_>>()
1201    );
1202
1203    let read_snapshot = std::sync::Arc::new(arc_swap::ArcSwap::from_pointee(
1204        crate::rsm::engine_snapshot::EngineSnapshot::empty(),
1205    ));
1206    let snapshot_provider = std::sync::Arc::new(
1207        crate::rsm::engine_snapshot::SnapshotQuoteProvider::new(read_snapshot.clone()),
1208    );
1209    let quote_provider: std::sync::Arc<dyn hypercall_runtime_api::QuoteProvider> =
1210        snapshot_provider.clone();
1211    let engine_state_digest_provider: std::sync::Arc<
1212        dyn crate::rsm::engine_snapshot::EngineStateDigestProvider,
1213    > = snapshot_provider.clone();
1214    let balance_provider: std::sync::Arc<dyn crate::rsm::ledger::BalanceProvider> =
1215        snapshot_provider.clone();
1216    let order_snapshot: std::sync::Arc<dyn hypercall_runtime_api::OrderSnapshotProvider> =
1217        snapshot_provider.clone();
1218    let agent_auth: std::sync::Arc<dyn hypercall_runtime_api::AgentAuthProvider> =
1219        snapshot_provider.clone();
1220    // Initialize GreeksCache with quote provider for on-demand mid price reads
1221    let t = Instant::now();
1222    let greeks_cache = GreeksCache::new(
1223        diesel_db.as_ref(),
1224        event_bus.clone(),
1225        mark_price_oracles.clone(),
1226        shutdown.subscribe(),
1227        quote_provider.clone(),
1228    )
1229    .await
1230    .map_err(|e| anyhow::anyhow!("Failed to create GreeksCache: {}", e))?;
1231    startup_timing!("greeks_cache", t.elapsed());
1232    let competition_service = Arc::new(CompetitionService::new(
1233        diesel_db.clone(),
1234        greeks_cache.clone(),
1235    ));
1236
1237    // Initialize MmpCache with the shared handler and GreeksCache
1238    let t = Instant::now();
1239    let mmp_cache = Arc::new(crate::read_cache::mmp::MmpCache::new(
1240        shared_db.clone(),
1241        greeks_cache.clone(),
1242    )?);
1243    mmp_cache.load_configs_from_db().await?;
1244    startup_timing!("mmp_cache", t.elapsed());
1245    service_registry.register(mmp_cache.clone());
1246
1247    // TierCache is already initialized earlier (before catchup replay)
1248
1249    // WsEventForwarder reads from the direct channel for low-latency WebSocket updates.
1250    // The sender half (ws_direct_tx) is wired to the engine builder below.
1251    let ws_receiver = ws_direct_rx_for_forwarder;
1252
1253    // Startup catchup: reload TierCache to catch any tier changes that
1254    // occurred between initial load and engine start.
1255    // Version-based ordering ensures stale messages will be ignored.
1256    tier_cache.load_from_db().await?;
1257    debug!("TierCache startup catchup complete");
1258
1259    // Initialize push notification service (optional, requires VAPID_PRIVATE_KEY_PEM env var)
1260    let push_service: Option<Arc<hypercall_api::push_service::PushNotificationService>> =
1261        match std::env::var("VAPID_PRIVATE_KEY_PEM") {
1262            Ok(pem_b64) => {
1263                let pem_b64_clean: String =
1264                    pem_b64.chars().filter(|c| !c.is_whitespace()).collect();
1265                match base64::Engine::decode(
1266                    &base64::engine::general_purpose::STANDARD,
1267                    &pem_b64_clean,
1268                ) {
1269                    Ok(pem_bytes) => {
1270                        match hypercall_api::push_service::PushNotificationService::new(
1271                            diesel_db.clone(),
1272                            &pem_bytes,
1273                        ) {
1274                            Ok(svc) => {
1275                                tracing::info!("✓ PushNotificationService initialized");
1276                                Some(Arc::new(svc))
1277                            }
1278                            Err(e) => {
1279                                tracing::warn!("Push notification service init failed: {e}");
1280                                None
1281                            }
1282                        }
1283                    }
1284                    Err(e) => {
1285                        tracing::warn!("VAPID_PRIVATE_KEY_PEM base64 decode failed: {e}");
1286                        None
1287                    }
1288                }
1289            }
1290            Err(_) => {
1291                tracing::info!("VAPID_PRIVATE_KEY_PEM not set, push notifications disabled");
1292                None
1293            }
1294        };
1295
1296    // Initialize persisted notification feed service.
1297    //
1298    // Postgres is the source of truth — feed writes/reads work without Redis.
1299    // Redis, when available, caches a per-wallet `has_unread` bit for the
1300    // edge-served bell dot. The service always initializes; the Redis client
1301    // is optional and populated only when we get a valid `redis://` / `rediss://`
1302    // URL. REST-only URLs (`https://…`) aren't usable from the `redis` crate
1303    // and are ignored with a warning rather than silently disabling the feed.
1304    let notification_service: Arc<hypercall_api::notification_service::NotificationService> = {
1305        let redis_client_opt: Option<redis::Client> = match std::env::var("UPSTASH_REDIS_URL")
1306            .ok()
1307            .or_else(|| std::env::var("REDIS_URL").ok())
1308        {
1309            Some(url) if !url.trim().is_empty() => {
1310                let trimmed = url.trim();
1311                if trimmed.starts_with("redis://") || trimmed.starts_with("rediss://") {
1312                    match redis::Client::open(trimmed.to_string()) {
1313                        Ok(c) => {
1314                            tracing::info!("✓ NotificationService Redis cache initialized");
1315                            Some(c)
1316                        }
1317                        Err(e) => {
1318                            tracing::warn!(
1319                                error = %e,
1320                                "NotificationService: Redis client open failed; feed persistence stays on, bell cache disabled"
1321                            );
1322                            None
1323                        }
1324                    }
1325                } else {
1326                    tracing::warn!(
1327                        "NotificationService: UPSTASH_REDIS_URL / REDIS_URL must be a redis:// or rediss:// URI (got scheme mismatch); feed persistence stays on, bell cache disabled"
1328                    );
1329                    None
1330                }
1331            }
1332            _ => {
1333                tracing::info!(
1334                    "NotificationService: no Redis URL configured (UPSTASH_REDIS_URL / REDIS_URL), bell cache disabled; feed persistence still active"
1335                );
1336                None
1337            }
1338        };
1339        hypercall_api::notification_service::NotificationService::new(
1340            diesel_db.clone(),
1341            redis_client_opt,
1342            shutdown.subscribe(),
1343        )
1344    };
1345
1346    // Forward events to WebSocket PubSubManager, PortfolioCache, and Persistence
1347    let ws_forwarder_deps = hypercall_api::websocket::event_forwarder::WsEventForwarderDeps {
1348        pubsub: pubsub.clone(),
1349        portfolio_cache: portfolio_cache.clone(),
1350        instruments_cache: instruments_cache.clone(),
1351        quote_provider: quote_provider.clone(),
1352        greeks_cache: greeks_cache.clone(),
1353        tier_cache: tier_cache.clone(),
1354        push_service: push_service.clone(),
1355        db: Arc::new(
1356            crate::db_handler::DieselEventHandler::with_pool_no_migrations(shared_db.pool()),
1357        ),
1358        engine_event_tx: engine_event_tx.clone(),
1359        notification_service: notification_service.clone(),
1360        shutdown: shutdown.clone(),
1361    };
1362    task_group.spawn("WsEventForwarder", async move {
1363        hypercall_api::websocket::event_forwarder::run(ws_forwarder_deps, ws_receiver).await
1364    });
1365
1366    // Competition fill recording is an independent downstream consumer of the
1367    // engine fill stream, not a side effect of websocket broadcast. It subscribes
1368    // to the EventBus TOPIC_FILLS fan-out (the same generic mechanism the
1369    // HyperCore forwarder and other downstream services use); when the engine
1370    // runs out-of-process this receiver is fed by a NATS consumer instead.
1371    let competition_fill_rx = event_bus
1372        .subscribe(vec![hypercall_types::topics::TOPIC_FILLS.to_string()])
1373        .await
1374        .map_err(|e| anyhow::anyhow!("Failed to subscribe competition fill recorder: {}", e))?;
1375    let competition_fill_recorder =
1376        hypercall_competition::CompetitionFillRecorder::new(competition_service.clone());
1377    let competition_fill_shutdown = shutdown.subscribe();
1378    task_group.spawn("CompetitionFillRecorder", async move {
1379        competition_fill_recorder
1380            .run(competition_fill_rx, competition_fill_shutdown)
1381            .await;
1382        Ok(())
1383    });
1384
1385    // Get the portfolio service from the cache for wiring into the engine
1386    // This ensures UnifiedEngine reads executed state from the canonical source
1387    let portfolio_service = portfolio_cache.get_service();
1388
1389    // Initialize MarketStatsCache with DB backfill for last 24h
1390    let market_stats_cache = Arc::new(crate::read_cache::market_stats::MarketStatsCache::new(
1391        portfolio_service.clone(),
1392    ));
1393    market_stats_cache
1394        .clone()
1395        .initialize_with_backfill(diesel_db.as_ref(), event_bus.clone(), shutdown.subscribe())
1396        .await?;
1397    tracing::info!("✓ MarketStatsCache initialized");
1398
1399    let markets_snapshot_refresh_ms = runtime.config.api.markets_snapshot_refresh_ms;
1400    let markets_snapshot_cache = Arc::new(
1401        hypercall_api::caches::MarketsSnapshotCache::new(
1402            instruments_cache.clone(),
1403            greeks_cache.clone(),
1404            market_stats_cache.clone(),
1405        )
1406        .with_refresh_interval(Duration::from_millis(markets_snapshot_refresh_ms)),
1407    );
1408    markets_snapshot_cache.initialize().await;
1409    service_registry.register(markets_snapshot_cache.clone());
1410    tracing::info!(
1411        "✓ MarketsSnapshotCache initialized (refresh_ms={})",
1412        markets_snapshot_refresh_ms
1413    );
1414
1415    let instruments_snapshot_refresh_ms =
1416        services::parse_positive_refresh_ms_env("INSTRUMENTS_SNAPSHOT_REFRESH_MS", 10_000);
1417    let instruments_snapshot_cache = Arc::new(
1418        hypercall_api::caches::InstrumentsSnapshotCache::new(instruments_cache.clone())
1419            .with_refresh_interval(Duration::from_millis(instruments_snapshot_refresh_ms)),
1420    );
1421    instruments_snapshot_cache.initialize().await;
1422    service_registry.register(instruments_snapshot_cache.clone());
1423    tracing::info!(
1424        "✓ InstrumentsSnapshotCache initialized (refresh_ms={})",
1425        instruments_snapshot_refresh_ms
1426    );
1427
1428    // --- Competitions Snapshot Cache ---
1429    let competitions_snapshot_refresh_ms =
1430        services::parse_positive_refresh_ms_env("COMPETITIONS_SNAPSHOT_REFRESH_MS", 10_000);
1431    let competitions_snapshot_cache = Arc::new(
1432        hypercall_api::caches::CompetitionsSnapshotCache::new(competition_service.clone())
1433            .with_refresh_interval(Duration::from_millis(competitions_snapshot_refresh_ms)),
1434    );
1435    competitions_snapshot_cache.initialize().await;
1436    service_registry.register(competitions_snapshot_cache.clone());
1437    tracing::info!(
1438        "✓ CompetitionsSnapshotCache initialized (refresh_ms={})",
1439        competitions_snapshot_refresh_ms
1440    );
1441
1442    // --- Sparklines Snapshot Cache ---
1443    let sparklines_snapshot_refresh_ms =
1444        services::parse_positive_refresh_ms_env("SPARKLINES_SNAPSHOT_REFRESH_MS", 15_000);
1445    let sparklines_snapshot_cache = Arc::new(
1446        hypercall_api::caches::SparklinesSnapshotCache::from_config(
1447            &runtime.config.pricing,
1448            runtime.config.modes.testnet_mode,
1449        )
1450        .map_err(|e| anyhow::anyhow!("Failed to create SparklinesSnapshotCache: {}", e))?
1451        .with_refresh_interval(Duration::from_millis(sparklines_snapshot_refresh_ms)),
1452    );
1453    sparklines_snapshot_cache.initialize().await;
1454    service_registry.register(sparklines_snapshot_cache.clone());
1455    tracing::info!(
1456        "✓ SparklinesSnapshotCache initialized (refresh_ms={})",
1457        sparklines_snapshot_refresh_ms
1458    );
1459
1460    let mut hypercore_event_rx = event_bus
1461        .subscribe(vec![
1462            crate::shared::topics::TOPIC_HYPERCORE_POSITION_UPDATES.to_string(),
1463        ])
1464        .await
1465        .map_err(|e| anyhow::anyhow!("Failed to subscribe to HyperCore updates: {}", e))?;
1466    let hypercore_ws_tx = ws_direct_tx.clone();
1467    let mut hypercore_bridge_shutdown_rx = shutdown.subscribe();
1468    task_group.spawn("HypercoreEventForwarder", async move {
1469        loop {
1470            let event = tokio::select! {
1471                _ = hypercore_bridge_shutdown_rx.recv() => {
1472                    info!("HypercoreEventForwarder received shutdown signal");
1473                    break;
1474                }
1475                maybe_event = hypercore_event_rx.recv() => {
1476                    match maybe_event {
1477                        Some(event) => event,
1478                        None => break,
1479                    }
1480                }
1481            };
1482
1483            if let EngineMessage::HypercorePositionUpdate(_) = &event {
1484                if hypercore_ws_tx.send(event).is_err() {
1485                    warn!("HypercoreEventForwarder failed to forward HyperCore update");
1486                    break;
1487                }
1488            }
1489        }
1490
1491        Ok(())
1492    });
1493
1494    let (deposit_tx, deposit_rx) =
1495        tokio::sync::mpsc::channel::<crate::rsm::unified_engine::DepositRequest>(128);
1496
1497    // NOW start event bus processing – ALL subscriptions are complete
1498    // CRITICAL: This MUST happen after all subscribe() calls so no events are missed
1499    let t = Instant::now();
1500    event_bus.clone().start_processing().await;
1501    startup_timing!("event_bus_start_processing", t.elapsed());
1502
1503    let vol_oracle_output = VolOracleFactory::build(&catalog_config, &mut task_group, &shutdown)
1504        .await
1505        .map_err(|err| anyhow::anyhow!("Failed to build vol oracle router: {err:#}"))?;
1506    let risk_vol_oracle = vol_oracle_output.oracle;
1507
1508    // Wire vol oracle into GreeksCache so PM repricing can fall back to
1509    // surface-interpolated IV when orderbooks are empty (e.g. after fresh deploy).
1510    greeks_cache.set_vol_oracle(risk_vol_oracle.clone()).await;
1511
1512    // Feed platform spot prices into the Polygon vol oracle so it can
1513    // compute dynamic strike scales (platform_spot / etf_spot).
1514    {
1515        let gc = greeks_cache.clone();
1516        let spots = vol_oracle_output.polygon_platform_spots;
1517        let underlyings: Vec<String> = catalog_config.vol_oracles.routes.keys().cloned().collect();
1518        let mut feeder_shutdown = shutdown.subscribe();
1519        task_group.spawn("polygon_spot_feeder", async move {
1520            let mut interval = tokio::time::interval(std::time::Duration::from_secs(5));
1521            loop {
1522                tokio::select! {
1523                    _ = interval.tick() => {}
1524                    _ = feeder_shutdown.recv() => break,
1525                }
1526                let mut updates: Vec<(String, Option<f64>)> = Vec::new();
1527                for underlying in &underlyings {
1528                    updates.push((underlying.clone(), gc.get_spot_price(underlying).await));
1529                }
1530                let mut map = spots.write().expect("platform_spots poisoned");
1531                for (underlying, price) in updates {
1532                    if let Some(p) = price {
1533                        map.insert(underlying, p);
1534                    } else {
1535                        map.remove(&underlying);
1536                    }
1537                }
1538            }
1539            Ok(())
1540        });
1541    }
1542
1543    // Readiness checks are collected here but the registry is built after
1544    // the engine, so we can include the engine's sync_status. This ensures
1545    // the API rejects requests until post-startup reconciliation completes.
1546    let mut readiness_checks: Vec<Arc<dyn crate::readiness::Readiness>> = vec![
1547        Arc::new(SyncStatusReadiness::new(
1548            "portfolio",
1549            portfolio_cache.sync_status(),
1550        )),
1551        Arc::new(SyncStatusReadiness::new(
1552            "instruments",
1553            instruments_cache.sync_status(),
1554        )),
1555    ];
1556    if runtime.config.api.require_risk_vol_readiness {
1557        readiness_checks.push(Arc::new(VolOracleReadiness::new(
1558            "risk_vol_oracle",
1559            risk_vol_oracle.clone(),
1560        )));
1561    }
1562
1563    let _hypercore_position_service =
1564        if std::env::var("ENABLE_INTEGRATED_HYPERCORE_POSITION_SERVICE").is_ok() {
1565            let update_interval_secs = std::env::var("HYPERCORE_UPDATE_INTERVAL_SECS")
1566                .ok()
1567                .and_then(|raw| raw.parse::<u64>().ok())
1568                .unwrap_or(30);
1569            let service = Arc::new(HypercorePositionService::with_default_url(
1570                Duration::from_secs(update_interval_secs),
1571                event_bus.clone(),
1572                shared_db.clone(),
1573            ));
1574            let service_for_task = service.clone();
1575            let hypercore_shutdown = shutdown.subscribe();
1576            task_group.spawn("HypercorePositionService", async move {
1577                service_for_task
1578                    .run_with_shutdown(hypercore_shutdown)
1579                    .await
1580                    .map_err(|e| anyhow::anyhow!("HyperCore position service failed: {}", e))
1581            });
1582            tracing::info!(
1583                "✓ HyperCore position service started inside integrated server (interval={}s)",
1584                update_interval_secs
1585            );
1586            Some(service)
1587        } else {
1588            None
1589        };
1590
1591    // Create SpanMarginService for API use (stateless computation, separate from engine's instance)
1592    // This allows the API layer to compute SPAN margin without going through the engine
1593    let span_margin_service = Arc::new(SpanMarginService::new_with_vol_oracle(
1594        config.clone(),
1595        risk_vol_oracle.clone(),
1596    ));
1597    tracing::info!("✓ SpanMarginService initialized for API layer");
1598
1599    // Create RiskAccountBuilder for API risk calculations
1600    // Uses the engine snapshot balance ledger published by UnifiedEngine.
1601    let snapshot_open_orders = Arc::new(
1602        crate::rsm::engine_snapshot::SnapshotOpenOrdersSource::new(order_snapshot.clone()),
1603    );
1604    let risk_account_builder = Arc::new(RiskAccountBuilder::new_with_balance_provider(
1605        balance_provider.clone(),
1606        portfolio_service.clone(),
1607        snapshot_open_orders,
1608        greeks_cache.clone(),
1609    ));
1610    tracing::info!("✓ RiskAccountBuilder initialized for API layer");
1611
1612    // Create StandardMarginService for Deribit-style linear margin calculations
1613    let standard_margin_service = Arc::new(StandardMarginService::new());
1614    tracing::info!("✓ StandardMarginService initialized for API layer");
1615
1616    // Create StandardAccountBuilder for Standard margin mode accounts
1617    let standard_account_builder = Arc::new(StandardAccountBuilder::new_with_balance_provider(
1618        balance_provider.clone(),
1619        portfolio_service.clone(),
1620        greeks_cache.clone(),
1621    ));
1622    tracing::info!("✓ StandardAccountBuilder initialized for API layer");
1623
1624    // Wire margin service + risk account builder into PortfolioCache for real-time WS margin updates
1625    portfolio_cache
1626        .set_margin_dependencies(
1627            span_margin_service.clone(),
1628            greeks_cache.clone(),
1629            risk_account_builder.clone(),
1630            tier_cache.clone(),
1631            order_snapshot.clone(),
1632            standard_margin_service.clone(),
1633            standard_account_builder.clone(),
1634        )
1635        .await;
1636
1637    // After all dependencies are wired, reprice all positions loaded from snapshot/replay.
1638    // Positions replayed from journal have unrealized_pnl=0 (stale). Repricing here ensures
1639    // the first /portfolio GET returns correct equity instead of inflated values.
1640    {
1641        let t = Instant::now();
1642        let repriced = portfolio_cache.reprice_all_wallets().await;
1643        startup_timing!("portfolio_reprice_all", t.elapsed(), "{} wallets", repriced);
1644    }
1645
1646    // Start DB-writing snapshot tasks (deferred in standby — readonly pool)
1647    if !standby_mode_active {
1648        let historical_pnl_config =
1649            HistoricalPnlTaskConfig::from_config(&runtime.config.background_tasks.historical_pnl);
1650        let historical_pnl_task = HistoricalPnlSnapshotTask::new(
1651            portfolio_cache.clone(),
1652            diesel_db.clone(),
1653            historical_pnl_config.clone(),
1654        );
1655        service_registry.register(Arc::new(historical_pnl_task));
1656        info!(
1657            "Historical pnl snapshot task registered (max_periods={}, capture_every_5m_ms={}, capture_every_1h_ms={}, capture_every_1d_ms={})",
1658            historical_pnl_config.max_periods,
1659            historical_pnl_config.capture_every_5m_ms,
1660            historical_pnl_config.capture_every_1h_ms,
1661            historical_pnl_config.capture_every_1d_ms
1662        );
1663
1664        let historical_theo_config = HistoricalTheoTaskConfig::from_env();
1665        let historical_theo_task = HistoricalTheoSnapshotTask::new(
1666            instruments_cache.clone(),
1667            greeks_cache.clone(),
1668            diesel_db.clone(),
1669            historical_theo_config.clone(),
1670        );
1671        service_registry.register(Arc::new(historical_theo_task));
1672        info!(
1673            "Historical theo snapshot task registered (max_periods={}, capture_every_5m_ms={}, capture_every_1h_ms={}, capture_every_1d_ms={})",
1674            historical_theo_config.max_periods,
1675            historical_theo_config.capture_every_5m_ms,
1676            historical_theo_config.capture_every_1h_ms,
1677            historical_theo_config.capture_every_1d_ms
1678        );
1679
1680        let vol_surface_config =
1681            crate::runtime::tasks::vol_surface_snapshot::VolSurfaceSnapshotConfig::from_env();
1682        let vol_surface_task =
1683            crate::runtime::tasks::vol_surface_snapshot::VolSurfaceSnapshotTask::new(
1684                risk_vol_oracle.clone(),
1685                diesel_db.clone(),
1686                vol_surface_config.clone(),
1687            );
1688        service_registry.register(Arc::new(vol_surface_task));
1689        info!(
1690            "Vol surface snapshot task registered (max_periods={}, capture_every_5m_ms={}, capture_every_1h_ms={}, capture_every_1d_ms={})",
1691            vol_surface_config.max_periods,
1692            vol_surface_config.capture_every_5m_ms,
1693            vol_surface_config.capture_every_1h_ms,
1694            vol_surface_config.capture_every_1d_ms
1695        );
1696    } else {
1697        info!("Historical pnl/theo/vol-surface snapshot tasks deferred (standby mode)");
1698    }
1699
1700    // Start persisted BBO snapshot task for /options-summary 24h price_change lookup.
1701    let bbo_snapshot_config =
1702        BboSnapshotTaskConfig::from_config(&runtime.config.background_tasks.bbo_snapshot);
1703    let bbo_snapshot_service = Arc::new(BboSnapshotService::new(
1704        instruments_cache.clone(),
1705        quote_provider.clone(),
1706        diesel_db.clone(),
1707        bbo_snapshot_config.clone(),
1708    ));
1709    if !standby_mode_active {
1710        service_registry.register(bbo_snapshot_service.clone());
1711        info!(
1712            "BBO snapshot task registered (interval_secs={}, retention_days={})",
1713            bbo_snapshot_config.interval_secs, bbo_snapshot_config.retention_days
1714        );
1715    } else {
1716        info!("BBO snapshot task deferred (standby mode)");
1717    }
1718
1719    let options_summary_snapshot_refresh_ms = services::parse_positive_refresh_ms_env(
1720        "OPTIONS_SUMMARY_SNAPSHOT_REFRESH_MS",
1721        markets_snapshot_refresh_ms,
1722    );
1723    let indicative_cache =
1724        Arc::new(hypercall_api::rfq::indicative_quote_cache::IndicativeQuoteCache::new(60_000));
1725    let options_summary_snapshot_cache = Arc::new(
1726        hypercall_api::caches::OptionsSummarySnapshotCache::new(
1727            instruments_cache.clone(),
1728            greeks_cache.clone(),
1729            quote_provider.clone(),
1730            bbo_snapshot_service.clone(),
1731            Some(indicative_cache.clone()),
1732            diesel_db.clone(),
1733        )
1734        .with_refresh_interval(Duration::from_millis(options_summary_snapshot_refresh_ms)),
1735    );
1736    options_summary_snapshot_cache.initialize().await;
1737    service_registry.register(options_summary_snapshot_cache.clone());
1738    tracing::info!(
1739        "✓ OptionsSummarySnapshotCache initialized (refresh_ms={})",
1740        options_summary_snapshot_refresh_ms
1741    );
1742
1743    // --- Upstash Batch Publisher ---
1744    let upstash_init = hypercall_api::upstash::init(
1745        &markets_snapshot_cache,
1746        &competitions_snapshot_cache,
1747        &sparklines_snapshot_cache,
1748        &instruments_snapshot_cache,
1749        &options_summary_snapshot_cache,
1750        diesel_db.clone(),
1751    )
1752    .await?;
1753    if let Some(publisher) = upstash_init.publisher {
1754        service_registry.register(publisher);
1755    }
1756    let username_redis_client = upstash_init.redis_client;
1757
1758    // Periodically refresh WS portfolio margin for subscribed wallets so
1759    // idle accounts still receive market-driven updates.
1760    // Periodically refresh WS portfolio margin and greeks for subscribed wallets
1761    // so idle accounts still receive market-driven updates.
1762    let periodic_margin_cache = portfolio_cache.clone();
1763    let mut periodic_margin_shutdown_rx = shutdown.subscribe();
1764    task_group.spawn("PortfolioMarginWsRefresh", async move {
1765        let mut interval = tokio::time::interval(Duration::from_secs(5));
1766        interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
1767
1768        loop {
1769            tokio::select! {
1770                _ = periodic_margin_shutdown_rx.recv() => {
1771                    info!("PortfolioMarginWsRefresh received shutdown signal");
1772                    break;
1773                }
1774                _ = interval.tick() => {
1775                    let position_wallets = periodic_margin_cache
1776                        .publish_position_updates_for_subscribers()
1777                        .await;
1778                    let wallets = periodic_margin_cache
1779                        .publish_margin_updates_for_subscribers()
1780                        .await;
1781                    let greek_wallets = periodic_margin_cache
1782                        .publish_greeks_updates_for_subscribers()
1783                        .await;
1784                    tracing::debug!(
1785                        "PortfolioMarginWsRefresh published position updates for {} wallets, margin updates for {} wallets, and greeks updates for {} wallets",
1786                        position_wallets,
1787                        wallets,
1788                        greek_wallets
1789                    );
1790                }
1791            }
1792        }
1793        Ok(())
1794    });
1795
1796    // Periodic competition updates + finalization notifications for subscribed wallets.
1797    let competition_service_for_ws = competition_service.clone();
1798    let competition_pubsub = pubsub.clone();
1799    let mut competition_ws_shutdown_rx = shutdown.subscribe();
1800    task_group.spawn("CompetitionWsRefresh", async move {
1801        let mut interval = tokio::time::interval(Duration::from_secs(5));
1802        interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
1803        let mut last_rank_by_wallet: HashMap<WalletAddress, (i64, i64)> = HashMap::new();
1804        let mut last_gap_by_wallet: HashMap<WalletAddress, (i64, Option<i64>, Option<rust_decimal::Decimal>)> =
1805            HashMap::new();
1806
1807        loop {
1808            tokio::select! {
1809                _ = competition_ws_shutdown_rx.recv() => {
1810                    info!("CompetitionWsRefresh received shutdown signal");
1811                    break;
1812                }
1813                _ = interval.tick() => {
1814                    let now_ts_ms = chrono::Utc::now().timestamp_millis();
1815
1816                    match competition_service_for_ws.finalize_ended_competitions(now_ts_ms).await {
1817                        Ok(final_stats) => {
1818                            for stats in final_stats {
1819                                competition_pubsub
1820                                    .publish_competition_final_stats(stats.clone())
1821                                    .await;
1822                                competition_pubsub
1823                                    .publish_competition_final_standing(WsCompetitionFinalStanding {
1824                                        wallet_address: stats.wallet_address,
1825                                        competition_id: stats.competition_id,
1826                                        rank: stats.rank,
1827                                        pnl: stats.pnl,
1828                                        volume: stats.volume,
1829                                        efficiency: stats.efficiency,
1830                                        medal: stats.medal,
1831                                        timestamp: stats.timestamp,
1832                                    })
1833                                    .await;
1834                            }
1835                        }
1836                        Err(err) => {
1837                            error!("Competition finalization failed: {}", err);
1838                        }
1839                    }
1840
1841                    let wallets = competition_pubsub.subscribed_wallets("competition").await;
1842                    for wallet in wallets {
1843                        match competition_service_for_ws
1844                            .build_competition_update_for_wallet(wallet, now_ts_ms)
1845                            .await
1846                        {
1847                            Ok(Some(update)) => {
1848                                competition_pubsub.publish_competition_update(update).await;
1849                            }
1850                            Ok(None) => {}
1851                            Err(err) => {
1852                                error!(
1853                                    "Failed to build competition update for wallet {}: {}",
1854                                    wallet, err
1855                                );
1856                            }
1857                        }
1858                    }
1859
1860                    let engagement_wallets = competition_pubsub
1861                        .subscribed_wallets("competition_engagement")
1862                        .await;
1863                    for wallet in engagement_wallets.iter().copied() {
1864                        match competition_service_for_ws
1865                            .build_competition_engagement_snapshot_for_wallet(wallet, now_ts_ms)
1866                            .await
1867                        {
1868                            Ok(Some(snapshot)) => {
1869                                if let Some((prev_competition_id, prev_rank)) =
1870                                    last_rank_by_wallet.get(&wallet).copied()
1871                                {
1872                                    if prev_competition_id == snapshot.competition_id
1873                                        && prev_rank != snapshot.rank
1874                                    {
1875                                        competition_pubsub
1876                                            .publish_competition_rank_change(WsCompetitionRankChange {
1877                                                wallet_address: wallet,
1878                                                competition_id: snapshot.competition_id,
1879                                                from_rank: prev_rank,
1880                                                to_rank: snapshot.rank,
1881                                                delta_places: prev_rank - snapshot.rank,
1882                                                pnl: snapshot.pnl,
1883                                                timestamp: now_ts_ms,
1884                                            })
1885                                            .await;
1886                                    }
1887                                }
1888
1889                                let gap_state = (
1890                                    snapshot.competition_id,
1891                                    snapshot.next_rank,
1892                                    snapshot.gap_metric_value,
1893                                );
1894                                let should_publish_gap = last_gap_by_wallet
1895                                    .get(&wallet)
1896                                    .map(|prev| prev != &gap_state)
1897                                    .unwrap_or(true);
1898                                if should_publish_gap {
1899                                    competition_pubsub
1900                                        .publish_competition_gap_update(WsCompetitionGapUpdate {
1901                                            wallet_address: wallet,
1902                                            competition_id: snapshot.competition_id,
1903                                            rank: snapshot.rank,
1904                                            next_rank: snapshot.next_rank,
1905                                            gap_metric_value: snapshot.gap_metric_value,
1906                                            timestamp: now_ts_ms,
1907                                        })
1908                                        .await;
1909                                }
1910
1911                                last_rank_by_wallet
1912                                    .insert(wallet, (snapshot.competition_id, snapshot.rank));
1913                                last_gap_by_wallet.insert(wallet, gap_state);
1914                            }
1915                            Ok(None) => {
1916                                last_rank_by_wallet.remove(&wallet);
1917                                last_gap_by_wallet.remove(&wallet);
1918                            }
1919                            Err(err) => {
1920                                error!(
1921                                    "Failed to build competition engagement snapshot for wallet {}: {}",
1922                                    wallet, err
1923                                );
1924                            }
1925                        }
1926                    }
1927
1928                    let engagement_wallets_set =
1929                        engagement_wallets.iter().copied().collect::<std::collections::HashSet<_>>();
1930                    last_rank_by_wallet.retain(|wallet, _| engagement_wallets_set.contains(wallet));
1931                    last_gap_by_wallet.retain(|wallet, _| engagement_wallets_set.contains(wallet));
1932                }
1933            }
1934        }
1935        Ok(())
1936    });
1937
1938    // Ledger and tier_cache already wired to PortfolioService earlier (before catchup replay)
1939    tracing::info!(
1940        "✓ Wired SpanMarginService + RiskAccountBuilder into PortfolioCache for WS margin updates"
1941    );
1942
1943    // --- Standby mode setup ---
1944    // If STANDBY_MODE is set, the engine, journal batcher, and background write tasks
1945    // are deferred until POST /admin/promote is called. The HTTP server starts normally
1946    // so /standby-ready and /admin/promote are accessible.
1947    // Note: standby_mode_active is resolved near the top of this function (pool creation).
1948    let standby_mode = standby_mode_active;
1949
1950    let (standby_controller, _standby_promote_rx) = if standby_mode {
1951        let (controller, promote_rx) =
1952            crate::runtime::standby::StandbyController::new_standby(1000);
1953        info!("Standby mode enabled — warmup complete, engine deferred until POST /admin/promote");
1954        (Some(controller), Some(promote_rx))
1955    } else {
1956        (None, None)
1957    };
1958
1959    let rsm_signer_configured = runtime.config.rsm_signer.is_configured();
1960    let local_rsm_signer_configured = runtime.config.transaction_submitter.mode
1961        == hypercall_transaction_submitter::TransactionSubmitterMode::Direct;
1962    // Deposit crediting is not a separately toggled feature. If this process is
1963    // connected to chain, on-chain Exchange.Deposit events must become engine
1964    // credits. Deposits do not need the RSM signer because the user already
1965    // burned custody tokens in the Exchange contract.
1966    let onchain_deposits_enabled = true;
1967
1968    let rsm_signer_service: Option<Arc<dyn hypercall_signer::RsmSigner>> = if runtime
1969        .config
1970        .liquidation
1971        .enabled
1972        || rsm_signer_configured
1973        || local_rsm_signer_configured
1974    {
1975        if let Some(controller) = standby_controller.clone() {
1976            runtime
1977                .secrets
1978                .database_url
1979                .as_ref()
1980                .context("standby RSM signer requires DATABASE_URL for post-promote init")?;
1981            Some(
1982                Arc::new(crate::startup::rsm_signer::StandbyRsmSignerService::new(
1983                    controller,
1984                    runtime.config.clone(),
1985                    runtime.secrets.clone(),
1986                    db_auth.clone(),
1987                    rsm_signer_chain_id,
1988                )) as Arc<dyn hypercall_signer::RsmSigner>,
1989            )
1990        } else {
1991            Some(
1992                crate::startup::rsm_signer::build_rsm_signer_service(
1993                    &runtime.config,
1994                    &runtime.secrets,
1995                    diesel_db.clone(),
1996                    rsm_signer_chain_id,
1997                )
1998                .await
1999                .map_err(|error| anyhow::anyhow!("Failed to initialize RSM signer: {}", error))?,
2000            )
2001        }
2002    } else {
2003        None
2004    };
2005
2006    // (Journal + batcher construction moved up to before the runtime balance
2007    // ledger and portfolio-catchup initializations. `engine_journal_writer` and
2008    // `journal_batch_sender` are already in scope here.)
2009    //
2010    // Start portfolio snapshot task for periodic state persistence.
2011    // Capture uses the journal batcher flush barrier and the portfolio projection
2012    // barrier so snapshot state and replay boundary are aligned.
2013    let portfolio_service_for_snapshot = portfolio_cache.get_service();
2014    let snapshot_capture_portfolio_cache = portfolio_cache.clone();
2015    let snapshot_capture_diesel = shared_db.clone();
2016    let snapshot_capture_sender = journal_batch_sender.clone();
2017    let snapshot_task = Arc::new(
2018        PortfolioSnapshotTask::new(
2019            shared_db.clone(),
2020            shared_db.clone(),
2021            portfolio_service_for_snapshot,
2022            SnapshotTaskConfig::default(),
2023        )
2024        .with_capture_snapshot(Arc::new(move || {
2025            tokio::task::block_in_place(|| {
2026                tokio::runtime::Handle::current().block_on(async {
2027                    let _guard = snapshot_capture_portfolio_cache
2028                        .lock_projection_barrier()
2029                        .await;
2030                    if let Some(sender) = &snapshot_capture_sender {
2031                        let (ack_tx, ack_rx) = tokio::sync::oneshot::channel();
2032                        sender
2033                            .send(crate::journal::JournalMessage::Flush(ack_tx))
2034                            .await
2035                            .map_err(|e| {
2036                                anyhow::anyhow!("Failed to flush journal batcher: {}", e)
2037                            })?;
2038                        ack_rx.await.map_err(|_| {
2039                            anyhow::anyhow!("Journal batcher dropped snapshot flush ack")
2040                        })?;
2041                    }
2042
2043                    let replay: &dyn hypercall_db::JournalReplayReader =
2044                        snapshot_capture_diesel.as_ref();
2045                    let next_command_id = replay.get_next_engine_command_id_sync()?;
2046                    let states = snapshot_capture_portfolio_cache
2047                        .capture_portfolios_under_barrier(&_guard)
2048                        .await;
2049
2050                    let mut offsets = HashMap::new();
2051                    offsets.insert(
2052                        crate::read_cache::portfolio::ENGINE_COMMAND_SNAPSHOT_STREAM.to_string(),
2053                        HashMap::from([(0, next_command_id)]),
2054                    );
2055                    Ok((states, offsets))
2056                })
2057            })
2058        })),
2059    );
2060    if !standby_mode_active {
2061        service_registry.register(snapshot_task);
2062        info!("Portfolio snapshot task started (60s interval)");
2063    } else {
2064        info!("Portfolio snapshot task deferred (standby mode)");
2065    }
2066
2067    // Save a clone of oracles for Hydromancer startup recovery (before builder consumes them)
2068    let mark_price_oracles_for_recovery = mark_price_oracles.clone();
2069
2070    let engine_wal_path = hypercall_journal::checkpoint::wal_path_from_config(
2071        runtime.config.engine.persistence.journal.wal_path.as_ref(),
2072    );
2073    let engine_drain_marker_path = handlers::health::drain_marker_path(&engine_wal_path);
2074    let engine_wal_path_configured =
2075        hypercall_journal::checkpoint::wal_path_is_explicitly_configured(
2076            runtime.config.engine.persistence.journal.wal_path.as_ref(),
2077        );
2078    let engine_start_quiesced =
2079        engine_wal_path_configured && tokio::fs::metadata(&engine_drain_marker_path).await.is_ok();
2080    if engine_start_quiesced {
2081        info!(
2082            path = %engine_drain_marker_path.display(),
2083            "Persistent drain marker found; engine will start quiesced"
2084        );
2085    }
2086    let recovery_safety_report = hypercall_api::recovery_safety::shared_report();
2087    let portfolio_margin_mode_allowlist = parse_configured_wallet_set(
2088        "PORTFOLIO_MARGIN_MODE_ALLOWLIST",
2089        &runtime.config.engine.portfolio_margin_mode_allowlist,
2090    )?;
2091    let portfolio_margin_settlement_allowlist = parse_configured_wallet_set(
2092        "PORTFOLIO_MARGIN_SETTLEMENT_ALLOWLIST",
2093        &runtime.config.engine.portfolio_margin_settlement_allowlist,
2094    )?;
2095
    // Assemble the unified engine builder: DB connection/auth, runtime settings taken
    // from `runtime.config.engine` (intervals, WAL path, quiesce flag, recovery-safety
    // report, portfolio-margin pool settings) and the shared caches/oracles/builders.
    // NOTE(review): `with_*` call order is preserved as-is; whether the builder depends
    // on it is not visible from here.
    let mut builder = UnifiedEngineBuilder::new(config)
        .with_order_buffer_size(1000)
        .with_database(&engine_database_url)
        .with_db_auth(engine_db_auth)
        .with_runtime_settings(crate::rsm::unified_engine::EngineRuntimeSettings {
            snapshot_interval: Duration::from_secs(runtime.config.engine.snapshot_interval_secs),
            read_snapshot_interval: Duration::from_millis(
                runtime.config.engine.read_snapshot_interval_ms,
            ),
            post_startup_reconcile_delay: Duration::from_secs(
                runtime.config.engine.post_startup_reconcile_delay_secs,
            ),
            wal_path: runtime.config.engine.persistence.journal.wal_path.clone(),
            start_quiesced: engine_start_quiesced,
            recovery_safety_report: Some(recovery_safety_report.clone()),
            portfolio_margin_pool_enabled: runtime.config.engine.portfolio_margin_pool_enabled,
            portfolio_margin_settlement_allowlist: portfolio_margin_settlement_allowlist.clone(),
        })
        .with_mmp_cache(mmp_cache.clone())
        .with_tier_cache(tier_cache.clone())
        .with_portfolio_service(portfolio_service)
        .with_portfolio_cache(portfolio_cache.clone())
        .with_greeks_cache(greeks_cache.clone())
        .with_mark_price_oracles(mark_price_oracles)
        .with_risk_account_builder(risk_account_builder.clone())
        .with_vol_oracle(risk_vol_oracle.clone())
        .with_standard_account_builder(standard_account_builder.clone())
        .with_liquidation_cache(liquidation_cache.clone());

    // In standby mode, skip DDL in the engine's internal DatabaseHandler.
    // The readonly replica can serve reads (max order_id, trade_id, l2_seq)
    // but cannot run migrations. Migrations are also skipped whenever
    // `skip_db_migrations` is set.
    if standby_mode_active || skip_db_migrations {
        builder = builder.with_skip_db_migrations();
    }

    // Add journal writer to builder
    if let Some(ref writer) = engine_journal_writer {
        builder = builder.with_journal_writer(writer.clone());
    }

    // Add journal batch sender to builder
    if let Some(ref sender) = journal_batch_sender {
        builder = builder.with_journal_batch_sender(sender.clone());
    }

    // If journal is disabled (neither a writer nor a batch sender is configured),
    // use mock journal (engine requires one)
    if engine_journal_writer.is_none() && journal_batch_sender.is_none() {
        builder = builder.with_mock_journal();
    }
2146
2147    let default_standby_replay_start_seq = if standby_mode_active {
2148        let nats_config =
2149            crate::nats::NatsConfig::from_env().expect("NATS_URL required for standby mode");
2150        let latest_seq = nats_config
2151            .latest_stream_sequence()
2152            .await
2153            .with_context(|| "Failed to read NATS stream tail for standby replay start sequence")?;
2154        info!(
2155            latest_seq,
2156            "Captured NATS stream tail before standby startup recovery"
2157        );
2158        Some(latest_seq as i64)
2159    } else {
2160        None
2161    };
2162
2163    let mut balance_update_publisher_for_api = None;
2164    let mut balance_update_stream_required = false;
2165
2166    // Connect NATS publisher for real-time command replication (optional)
2167    if let Some(nats_config) = crate::nats::NatsConfig::from_env() {
2168        balance_update_stream_required = true;
2169        match crate::nats::NatsPublisher::connect(&nats_config).await {
2170            Ok(publisher) => {
2171                builder = builder.with_nats_publisher(publisher);
2172                info!("NATS publisher connected for real-time command replication");
2173            }
2174            Err(e) => {
2175                warn!(
2176                    "Failed to connect NATS publisher (replication disabled): {}",
2177                    e
2178                );
2179            }
2180        }
2181        match crate::nats::NatsBalanceUpdatePublisher::connect(&nats_config).await {
2182            Ok(publisher) => {
2183                balance_update_publisher_for_api = Some(publisher.clone());
2184                builder = builder.with_nats_balance_update_publisher(publisher);
2185                info!("NATS balance update publisher connected");
2186            }
2187            Err(e) => {
2188                warn!("Failed to connect NATS balance update publisher: {}", e);
2189            }
2190        }
2191    }
2192
2193    #[cfg(feature = "testnet")]
2194    let testnet_bootstrap_accounts = {
2195        tracing::warn!("testnet feature enabled, funding hardhat test accounts");
2196        let test_accounts = vec![
2197            (
2198                WalletAddress::from_str("0xf39Fd6e51aad88F6F4ce6aB8827279cffFb92266").unwrap(),
2199                100_000.0,
2200            ), // Hardhat account 0 - wallet1 in tests
2201            (
2202                WalletAddress::from_str("0x70997970C51812dc3A010C7d01b50e0d17dc79C8").unwrap(),
2203                100_000.0,
2204            ), // Hardhat account 1 - wallet2 in tests
2205            // Note: Hardhat account 2 (0x3C44...) is intentionally NOT funded - used as wallet3 (observer-only) in tests
2206            // Hardhat account 3 - used for margin rejection tests with limited $10k funding
2207            (
2208                WalletAddress::from_str("0x90F79bf6EB2c4f870365E785982E1f101E93b906").unwrap(),
2209                10_000.0,
2210            ), // Hardhat account 3 - $10k for margin rejection tests
2211        ];
2212
2213        test_accounts
2214    };
2215
    // Final builder wiring: the shared read snapshot and the direct WebSocket event sender.
    builder = builder.with_read_snapshot(read_snapshot.clone());

    builder = builder.with_ws_event_sender(ws_direct_tx.clone());

    // Build the engine. `build_with_info` hands back the engine together with the
    // sender handles used by the rest of startup to feed it (orders, market, margin
    // mode, agent auth, nonce checks, PM settlement admin, quiesce).
    let engine_result = builder.build_with_info(engine_event_tx.clone(), shutdown.tx());
    let mut unified_engine = engine_result.engine;
    let engine_order_tx = engine_result.order_tx;
    let engine_market_tx = engine_result.market_tx;
    let engine_margin_mode_tx = engine_result.margin_mode_tx;
    let engine_agent_auth_tx = engine_result.agent_auth_tx;
    let engine_nonce_check_tx = engine_result.nonce_check_tx;
    let engine_pm_settlement_admin_tx = engine_result.pm_settlement_admin_tx;
    let engine_quiesce_tx = engine_result.quiesce_tx;

    // Add engine sync_status to readiness gate so the API rejects orders
    // until post-startup ghost-order reconciliation completes (~5s after start).
    readiness_checks.push(Arc::new(SyncStatusReadiness::new(
        "engine",
        engine_result.sync_status,
    )));
    // The registry takes ownership of all collected readiness checks, engine included.
    let readiness = Arc::new(ReadinessRegistry::new(readiness_checks));
    info!(
        "Readiness registry initialized with {} checks (including engine)",
        readiness.reports().len()
    );
2241
2242    // Startup recovery: attempt Hydromancer fallback for EXPIRED_PENDING_PRICE instruments
2243    if hydromancer_client.is_some() {
2244        let recovery_pool = db_pool.clone();
2245        let recovery_oracles = mark_price_oracles_for_recovery.clone();
2246        task_group.spawn("HydromancerStartupRecovery", async move {
2247            match crate::startup::hydromancer::recover_stuck_settlements(
2248                &recovery_pool,
2249                &recovery_oracles,
2250            )
2251            .await
2252            {
2253                Ok(count) => {
2254                    if count > 0 {
2255                        info!(
2256                            "Hydromancer startup recovery: settled {} stuck instruments",
2257                            count
2258                        );
2259                    }
2260                }
2261                Err(e) => {
2262                    error!("Hydromancer startup recovery failed: {}", e);
2263                }
2264            }
2265            Ok(())
2266        });
2267    }
2268
    // Wire RFQ execute channel to engine before starting it
    let (rfq_execute_tx, rfq_execute_rx) =
        tokio::sync::mpsc::channel::<hypercall_runtime_api::RfqExecuteRequest>(128);
    unified_engine.set_rfq_receiver(rfq_execute_rx);

    // Wire deposit channel for faucet/admin deposits -> balance_ledger.
    // (Withdrawals are separate: failed withdrawals require manual reconciliation
    // and are not auto-refunded.)
    unified_engine.set_deposit_receiver(deposit_rx);

    // Testnet only: prepare (but do not send yet) one journaled bootstrap deposit per
    // test account. Sequence, source-event hash and journal request id are derived
    // deterministically from the account index, so a deposit whose hash the engine
    // already lists in `applied_deposit_source_event_hashes` is skipped. Each entry
    // carries a oneshot receiver that reports when the engine has applied the deposit.
    #[cfg(feature = "testnet")]
    let testnet_bootstrap_deposits = {
        let mut deposits = Vec::new();
        let now_ms = crate::shared::order_types::get_timestamp_millis();
        for (idx, (wallet, cash)) in testnet_bootstrap_accounts.iter().enumerate() {
            let sequence = testnet_bootstrap_deposit_sequence(idx);
            let source_event_hash = testnet_bootstrap_source_event_hash(sequence);
            // Already applied (e.g. restored on restart): do not fund twice.
            if unified_engine
                .ctx
                .applied_deposit_source_event_hashes
                .contains(&source_event_hash)
            {
                continue;
            }

            // Wallet has cash but no bootstrap hash: leave it alone rather than
            // stacking bootstrap funding on top of existing state.
            let current_cash = unified_engine.ctx.balance_ledger.balance(wallet);
            if current_cash != rust_decimal::Decimal::ZERO {
                warn!(
                    wallet = %wallet,
                    current_cash = %current_cash,
                    "Skipping testnet bootstrap funding for wallet with restored engine cash but no bootstrap source hash"
                );
                continue;
            }

            // `from_f64_retain` keeps the exact binary value; the configured amounts
            // are whole numbers, so they convert exactly.
            let amount = rust_decimal::Decimal::from_f64_retain(*cash)
                .expect("testnet bootstrap cash f64 -> Decimal conversion");
            let (applied_tx, applied_rx) = tokio::sync::oneshot::channel();
            deposits.push((
                crate::rsm::unified_engine::DepositRequest {
                    wallet: *wallet,
                    amount,
                    timestamp_ms: now_ms,
                    sequence: Some(sequence),
                    source_event_hash,
                    journal_request_id: testnet_bootstrap_request_id(idx),
                    outbox_appends: Vec::new(),
                    applied_tx: Some(applied_tx),
                },
                applied_rx,
            ));
        }
        deposits
    };

    // Option deposit/withdrawal and cash withdrawal requests each reach the engine
    // over their own bounded channel (capacity 128).
    let (option_deposit_tx, option_deposit_rx) =
        tokio::sync::mpsc::channel::<crate::rsm::unified_engine::OptionDepositRequest>(128);
    unified_engine.set_option_deposit_receiver(option_deposit_rx);

    let (option_withdrawal_tx, option_withdrawal_rx) =
        tokio::sync::mpsc::channel::<crate::rsm::unified_engine::OptionWithdrawalRequest>(128);
    unified_engine.set_option_withdrawal_receiver(option_withdrawal_rx);

    let (cash_withdrawal_tx, cash_withdrawal_rx) =
        tokio::sync::mpsc::channel::<crate::rsm::unified_engine::CashWithdrawalRequest>(128);
    unified_engine.set_cash_withdrawal_receiver(cash_withdrawal_rx);

    // Failed withdrawals, cash or option, are not auto-refunded in this phase.
    // Operators reconcile them manually until user-driven withdrawal proofs exist.
2337
2338    if !standby_mode {
2339        if let Some(rsm_signer) = rsm_signer_service.as_ref() {
2340            let signer_address = rsm_signer.signer_address();
2341            let durable_next_nonce = shared_db
2342                .get_rsm_signer_nonce(&signer_address)
2343                .await
2344                .with_context(|| {
2345                    format!("failed to load durable RSM signer nonce for {signer_address}")
2346                })?
2347                .map(|record| {
2348                    u64::try_from(record.next_nonce).with_context(|| {
2349                        format!(
2350                            "persisted next_nonce {} for signer {} is negative",
2351                            record.next_nonce, signer_address
2352                        )
2353                    })
2354                })
2355                .transpose()?
2356                .unwrap_or(0);
2357            unified_engine.seed_rsm_signer_nonce_floor(signer_address, durable_next_nonce);
2358            info!(
2359                signer = %signer_address,
2360                durable_next_nonce,
2361                "Seeded engine RSM signer nonce floor from durable signer tracker"
2362            );
2363        }
2364
2365        if let Some(rsm_signer) = rsm_signer_service.clone() {
2366            let sequencer = Arc::new(crate::directive_outbox::DirectiveDeliverySequencer::new(
2367                shared_db.clone(),
2368                rsm_signer,
2369                engine_event_tx.clone(),
2370                runtime.config.engine.persistence.outbox.poll_ms,
2371            ));
2372            let handle = sequencer.start_with_shutdown(shutdown.subscribe());
2373            task_group.spawn("DirectiveDeliverySequencer", async move {
2374                if let Err(error) = handle.await {
2375                    error!("DirectiveDeliverySequencer task panicked: {:?}", error);
2376                }
2377                Ok(())
2378            });
2379            tracing::info!("✓ Directive delivery sequencer started");
2380        }
2381    }
2382
    // Tier updates reach the engine over a bounded channel (capacity 128).
    let (tier_update_tx, tier_update_rx) =
        tokio::sync::mpsc::channel::<crate::rsm::unified_engine::TierUpdateRequest>(128);
    unified_engine.set_tier_update_receiver(tier_update_rx);

    // Wire HyperCore equity channel for PM margin (Hydromancer feed -> engine)
    let (hypercore_equity_tx, hypercore_equity_rx) =
        tokio::sync::mpsc::channel::<crate::rsm::unified_engine::HypercoreEquityRequest>(128);
    unified_engine.set_hypercore_equity_receiver(hypercore_equity_rx);

    // Wire liquidation bonus channel separately from deposits for semantic clarity.
    let (liquidation_bonus_tx, liquidation_bonus_rx) =
        tokio::sync::mpsc::channel::<crate::rsm::unified_engine::LiquidationBonusRequest>(128);
    unified_engine.set_liquidation_bonus_receiver(liquidation_bonus_rx);

    // Wire trading_mode notify channel. Initial value is an empty
    // map — the catalog manager publishes the real map on its first
    // rewrite tick. The engine main loop `changed()`s on the receiver
    // and applies updates via `apply_underlying_trading_mode_update`.
    // A watch channel only retains the latest map, so intermediate
    // values may be skipped.
    let (trading_mode_notify_tx, trading_mode_notify_rx) = tokio::sync::watch::channel::<
        std::collections::HashMap<String, hypercall_types::TradingModes>,
    >(std::collections::HashMap::new());
    unified_engine.set_trading_mode_receiver(trading_mode_notify_rx);
2405
2406    // --- NATS standby replication ---
2407    let standby_startup_heartbeat = if standby_mode {
2408        startup_progress.as_ref().map(|progress| {
2409            let progress = progress.clone();
2410            tokio::spawn(async move {
2411                let mut interval = tokio::time::interval(Duration::from_secs(15));
2412                loop {
2413                    interval.tick().await;
2414                    progress.heartbeat();
2415                }
2416            })
2417        })
2418    } else {
2419        None
2420    };
2421
2422    if standby_mode {
2423        let nats_config =
2424            crate::nats::NatsConfig::from_env().expect("NATS_URL required for standby mode");
2425
2426        if let Some(progress) = startup_progress.as_ref() {
2427            progress.mark("standby_hydrate_base_state");
2428        }
2429        unified_engine.hydrate_standby_base_state().await;
2430        if let Some(progress) = startup_progress.as_ref() {
2431            progress.mark("standby_hydrate_base_state_complete");
2432        }
2433
2434        let progress = standby_progress
2435            .clone()
2436            .context("standby replay progress missing while STANDBY_MODE is active")?;
2437        let promote_rx = standby_replay_promote_rx
2438            .take()
2439            .context("standby promote receiver missing while STANDBY_MODE is active")?;
2440        let (engine_tx, engine_rx) =
2441            tokio::sync::oneshot::channel::<crate::nats::standby_handler::StandbyEngineHandler>();
2442
2443        let handler = crate::nats::standby_handler::StandbyEngineHandler::new(unified_engine);
2444        let default_replay_start_seq = default_standby_replay_start_seq.ok_or_else(|| {
2445            anyhow::anyhow!("NATS stream tail was not captured for standby replay")
2446        })?;
2447        let replay_start_seq = match std::env::var("NATS_REPLAY_START_SEQ") {
2448            Ok(value) => value
2449                .parse::<i64>()
2450                .with_context(|| format!("NATS_REPLAY_START_SEQ must be an i64, got {value:?}"))?,
2451            Err(std::env::VarError::NotPresent) => default_replay_start_seq,
2452            Err(error) => {
2453                return Err(anyhow::anyhow!(
2454                    "Failed to read NATS_REPLAY_START_SEQ: {error}"
2455                ));
2456            }
2457        };
2458        if replay_start_seq < 0 {
2459            return Err(anyhow::anyhow!(
2460                "NATS_REPLAY_START_SEQ must be >= 0, got {replay_start_seq}"
2461            ));
2462        }
2463        info!(
2464            replay_start_seq,
2465            default_replay_start_seq, "Starting standby NATS replay"
2466        );
2467        info!("Starting in STANDBY mode — replaying from NATS, engine deferred");
2468        if let Some(progress) = startup_progress.as_ref() {
2469            progress.mark("standby_nats_replay_starting");
2470        }
2471
2472        task_group.spawn("StandbyReplayLoop", async move {
2473            let (bcast_tx, bcast_rx) = tokio::sync::broadcast::channel::<()>(1);
2474            tokio::spawn(async move {
2475                promote_rx.await.ok();
2476                bcast_tx.send(()).ok();
2477            });
2478            match crate::nats::replay_loop::run_replay_loop(
2479                &nats_config,
2480                replay_start_seq,
2481                handler,
2482                progress,
2483                bcast_rx,
2484            )
2485            .await
2486            {
2487                Ok(h) => {
2488                    engine_tx.send(h).ok();
2489                }
2490                Err(e) => {
2491                    error!("Standby replay loop error: {}", e);
2492                }
2493            }
2494            Ok(())
2495        });
2496
2497        let db_auth_for_promote = db_auth.clone();
2498        let post_promote_services_for_promote = post_promote_services.clone();
2499        let shutdown_for_promote = shutdown.clone();
2500        let shared_db_for_promote = shared_db.clone();
2501        let rsm_signer_for_promote = rsm_signer_service.clone();
2502        let engine_event_tx_for_promote = engine_event_tx.clone();
2503        let directive_outbox_poll_ms = runtime.config.engine.persistence.outbox.poll_ms;
2504        task_group.spawn("StandbyPromote", async move {
2505            match engine_rx.await {
2506                Ok(handler) => {
2507                    info!("Standby promoted — creating read-write database pools");
2508                    let mut engine = handler.into_engine();
2509
2510                    // Create a read-write DieselEventHandler from DATABASE_URL and
2511                    // swap it into the engine, replacing the readonly handler used
2512                    // during NATS replay.  This runs migrations on the primary.
2513                    // Failure here is fatal: starting the engine without write-capable
2514                    // persistence would silently drop fills/settlements.
2515                    match DatabaseHandler::new_with_auth(db_auth_for_promote.clone(), 3) {
2516                        Ok(rw_handler) => {
2517                            engine.set_db(rw_handler);
2518                            info!("Pool swap complete — engine now has read-write DB access");
2519                        }
2520                        Err(e) => {
2521                            panic!(
2522                                "CRITICAL_FAILURE: Failed to create read-write DatabaseHandler on promote: {}. \
2523                                 Cannot start engine without write-capable persistence.",
2524                                e
2525                            );
2526                        }
2527                    }
2528
2529                    if let Some(rsm_signer) = rsm_signer_for_promote.clone() {
2530                        if let Err(error) = rsm_signer.status().await {
2531                            panic!(
2532                                "CRITICAL_FAILURE: Failed to initialize RSM signer after standby promotion: {}",
2533                                error
2534                            );
2535                        }
2536
2537                        let signer_address = rsm_signer.signer_address();
2538                        let durable_next_nonce = shared_db_for_promote
2539                            .get_rsm_signer_nonce(&signer_address)
2540                            .await
2541                            .unwrap_or_else(|error| {
2542                                panic!(
2543                                    "CRITICAL_FAILURE: failed to load durable RSM signer nonce for {} after standby promotion: {}",
2544                                    signer_address, error
2545                                )
2546                            })
2547                            .map(|record| {
2548                                u64::try_from(record.next_nonce).unwrap_or_else(|_| {
2549                                    panic!(
2550                                        "CRITICAL_FAILURE: persisted next_nonce {} for signer {} is negative after standby promotion",
2551                                        record.next_nonce, signer_address
2552                                    )
2553                                })
2554                            })
2555                            .unwrap_or(0);
2556                        engine.seed_rsm_signer_nonce_floor(signer_address, durable_next_nonce);
2557                        info!(
2558                            signer = %signer_address,
2559                            durable_next_nonce,
2560                            "Seeded engine RSM signer nonce floor after standby promotion"
2561                        );
2562
2563                        let sequencer =
2564                            Arc::new(crate::directive_outbox::DirectiveDeliverySequencer::new(
2565                                shared_db_for_promote.clone(),
2566                                rsm_signer,
2567                                engine_event_tx_for_promote.clone(),
2568                                directive_outbox_poll_ms,
2569                            ));
2570                        let handle =
2571                            sequencer.start_with_shutdown(shutdown_for_promote.subscribe());
2572                        tokio::spawn(async move {
2573                            if let Err(error) = handle.await {
2574                                error!("DirectiveDeliverySequencer task panicked: {:?}", error);
2575                            }
2576                        });
2577                        info!("Directive delivery sequencer started after standby promotion");
2578                    }
2579
2580
2581                    let services = {
2582                        let mut services = post_promote_services_for_promote.lock().await;
2583                        std::mem::take(&mut *services)
2584                    };
2585                    for service in services {
2586                        let service_name = service.name();
2587                        let owner = service.owner();
2588                        let shutdown_rx = shutdown_for_promote.subscribe();
2589                        info!(
2590                            service = service_name,
2591                            owner = %owner,
2592                            "Starting deferred service after standby promotion"
2593                        );
2594                        tokio::spawn(async move {
2595                            if let Err(error) = service.run(shutdown_rx).await {
2596                                error!(
2597                                    service = service_name,
2598                                    owner = %owner,
2599                                    error = %error,
2600                                    "Deferred service exited with error"
2601                                );
2602                            }
2603                        });
2604                    }
2605
2606                    info!("Starting engine event loop after promote");
2607                    engine.start().await;
2608                }
2609                Err(_) => {
2610                    error!("Replay loop ended without returning engine");
2611                }
2612            }
2613            Ok(())
2614        });
2615
2616        if let Some(progress) = startup_progress.as_ref() {
2617            progress.mark("standby_replay_task_spawned");
2618        }
2619    } else {
2620        task_group.spawn("UnifiedEngine", async move {
2621            unified_engine.start().await;
2622            Ok(())
2623        });
2624
2625        #[cfg(feature = "testnet")]
2626        for (deposit, applied_rx) in testnet_bootstrap_deposits {
2627            let wallet = deposit.wallet;
2628            let amount = deposit.amount;
2629            deposit_tx.send(deposit).await.map_err(|error| {
2630                anyhow::anyhow!("failed to queue testnet bootstrap deposit: {error}")
2631            })?;
2632            match applied_rx.await {
2633                Ok(Ok(())) => {
2634                    info!(
2635                        wallet = %wallet,
2636                        amount = %amount,
2637                        "Applied journaled testnet bootstrap deposit"
2638                    );
2639                }
2640                Ok(Err(error)) => {
2641                    return Err(anyhow::anyhow!(
2642                        "testnet bootstrap deposit for {wallet} failed: {error}"
2643                    ));
2644                }
2645                Err(error) => {
2646                    return Err(anyhow::anyhow!(
2647                        "testnet bootstrap deposit ack for {wallet} dropped: {error}"
2648                    ));
2649                }
2650            }
2651        }
2652    }
2653
2654    // Create drain signal + flag for blue-green switchover
2655    let drain_signal = std::sync::Arc::new(tokio::sync::Notify::new());
2656    let is_draining =
2657        std::sync::Arc::new(std::sync::atomic::AtomicBool::new(engine_start_quiesced));
2658
2659    // Forward orders to unified engine (shutdown-aware, standby-aware, drain-aware)
2660    let mut order_rx = order_rx;
2661    let engine_order_tx_clone = engine_order_tx.clone();
2662    let mut order_forwarder_shutdown_rx = shutdown.subscribe();
2663    let standby_for_forwarder = standby_controller.clone();
2664    let is_draining_for_forwarder = is_draining.clone();
2665    task_group.spawn("OrderForwarder", async move {
2666        loop {
2667            tokio::select! {
2668                _ = order_forwarder_shutdown_rx.recv() => {
2669                    info!("OrderForwarder received shutdown signal");
2670                    break;
2671                }
2672                maybe_req = order_rx.recv() => {
2673                    match maybe_req {
2674                        Some(request) => {
2675                            // In standby/draining mode, reject orders immediately.
2676                            // The blue/green deploy script promotes before switching
2677                            // the Service, so write traffic shouldn't arrive in standby.
2678                            // Rejecting (instead of queuing) avoids hanging HTTP handlers.
2679                            if let Some(ref controller) = standby_for_forwarder {
2680                                if controller.is_standby().await {
2681                                    tracing::debug!("Rejecting order — server in standby mode");
2682                                    let _ = request.response_tx.send(hypercall_types::OrderUpdateMessage {
2683                                        order_id: None,
2684                                        info: request.message.info,
2685                                        status: hypercall_types::OrderUpdateStatus::Rejected,
2686                                        timestamp: std::time::SystemTime::now()
2687                                            .duration_since(std::time::UNIX_EPOCH)
2688                                            .unwrap_or_default()
2689                                            .as_millis() as u64,
2690                                        reason: Some("Server in standby mode, not accepting orders".to_string()),
2691                                        filled_size: rust_decimal::Decimal::ZERO,
2692                                        wallet_address: request.message.wallet,
2693                                        mmp_triggered: false,
2694                                        request_id: request.message.request_id,
2695                                    }).await;
2696                                    continue;
2697                                }
2698                            }
2699                            // Check drain flag before forwarding
2700                            if is_draining_for_forwarder.load(std::sync::atomic::Ordering::Relaxed) {
2701                                tracing::debug!("Rejecting order — server is draining");
2702                                let _ = request.response_tx.send(hypercall_types::OrderUpdateMessage {
2703                                    order_id: None,
2704                                    info: request.message.info,
2705                                    status: hypercall_types::OrderUpdateStatus::Rejected,
2706                                    timestamp: std::time::SystemTime::now()
2707                                        .duration_since(std::time::UNIX_EPOCH)
2708                                        .unwrap_or_default()
2709                                        .as_millis() as u64,
2710                                    reason: Some("Server is draining for blue/green deploy".to_string()),
2711                                    filled_size: rust_decimal::Decimal::ZERO,
2712                                    wallet_address: request.message.wallet,
2713                                    mmp_triggered: false,
2714                                    request_id: request.message.request_id,
2715                                }).await;
2716                                continue;
2717                            }
2718                            // Active mode (or no standby controller) — forward to engine
2719                            if engine_order_tx_clone.send(request).await.is_err() {
2720                                tracing::error!("Failed to send order to unified engine");
2721                                break;
2722                            }
2723                        }
2724                        None => break,
2725                    }
2726                }
2727            }
2728        }
2729        Ok(())
2730    });
2731
2732    // Transition standby controller to active after promote
2733    if let Some(drain_controller) = standby_controller.clone() {
2734        task_group.spawn("StandbyFinalize", async move {
2735            loop {
2736                if drain_controller.is_promoted().await {
2737                    break;
2738                }
2739                tokio::time::sleep(std::time::Duration::from_millis(50)).await;
2740            }
2741            // No orders to drain (OrderForwarder rejects in standby instead of queuing).
2742            // Just transition to active so is_standby() returns false.
2743            let _ = drain_controller.take_drain_batch().await;
2744            info!("Standby controller transitioned to active");
2745            Ok(())
2746        });
2747    }
2748
2749    // Start CatalogManager if enabled
2750    // CatalogManager continuously reconciles markets/instruments from policy file
2751    // Note: catalog_config was already loaded earlier for oracle setup
2752    if runtime.config.catalog_manager.enabled {
2753        let catalog_manager = crate::catalog_manager::CatalogManager::new(
2754            diesel_db.clone(),
2755            catalog_config,
2756            engine_market_tx.clone(),
2757            mark_price_oracles_for_catalog,
2758            runtime.config.catalog_manager.interval_secs,
2759        )
2760        .with_trading_mode_notify(trading_mode_notify_tx);
2761
2762        let catalog_manager = Arc::new(catalog_manager);
2763        if standby_mode {
2764            let deferred_service: Arc<dyn Service> = catalog_manager;
2765            post_promote_services.lock().await.push(deferred_service);
2766            info!("CatalogManager deferred until standby promotion");
2767        } else {
2768            service_registry.register(catalog_manager);
2769            info!(
2770                "✓ CatalogManager started with interval={}s (trading_mode notify channel wired)",
2771                runtime.config.catalog_manager.interval_secs
2772            );
2773        }
2774    } else {
2775        debug!("CatalogManager disabled");
2776    }
2777
2778    let trading_halt = Arc::new(tokio::sync::RwLock::new(TradingHaltState::from_config(
2779        runtime.config.api.global_trading_halted,
2780        &runtime.config.api.halted_markets,
2781    )));
2782    {
2783        let halt_state = trading_halt.read().await;
2784        if halt_state.global_halt.is_some() || !halt_state.halted_markets.is_empty() {
2785            tracing::warn!(
2786                global_halted = halt_state.global_halt.is_some(),
2787                halted_markets = halt_state.halted_markets.len(),
2788                "Trading halt loaded from startup configuration"
2789            );
2790        }
2791    }
2792
2793    let chain_auth: Arc<dyn directives::onchain::ChainAuthReader> = Arc::new(
2794        directives::onchain::AlloyChainAuthReader::new(
2795            &runtime.config.transaction_submitter.rpc_url,
2796            &runtime.config.contracts.exchange_contract_address,
2797        )
2798        .map_err(|e| anyhow::anyhow!("Failed to initialize chain auth reader: {}", e))?,
2799    );
2800    let gas_provider_service = Arc::new(if cfg!(feature = "test-endpoints") {
2801        hypercall_api::gas_provider::GasProviderService::new_mock()
2802    } else {
2803        match hypercall_api::gas_provider::GasProviderService::new(
2804            &runtime.config.transaction_submitter.rpc_url,
2805        )
2806        .await
2807        {
2808            Ok(service) => service,
2809            Err(error)
2810                if runtime.config.transaction_submitter.mode
2811                    == hypercall_transaction_submitter::TransactionSubmitterMode::Mock =>
2812            {
2813                let fallback_chain_id = if runtime.config.modes.testnet_mode {
2814                    hypercall_types::directives::HYPERCALL_TESTNET_CHAIN_ID
2815                } else {
2816                    hypercall_types::directives::HYPERCALL_MAINNET_CHAIN_ID
2817                };
2818                tracing::warn!(
2819                    chain_id = fallback_chain_id,
2820                    rpc_url = %runtime.config.transaction_submitter.rpc_url,
2821                    "Gas provider chain ID lookup failed in mock submitter mode, falling back to configured chain id: {}",
2822                    error
2823                );
2824                hypercall_api::gas_provider::GasProviderService::new_with_chain_id(
2825                    &runtime.config.transaction_submitter.rpc_url,
2826                    fallback_chain_id,
2827                )
2828                .map_err(|fallback_error| {
2829                    anyhow::anyhow!(
2830                        "Failed to initialize gas provider service fallback: {}",
2831                        fallback_error
2832                    )
2833                })?
2834            }
2835            Err(error) => {
2836                return Err(anyhow::anyhow!(
2837                    "Failed to initialize gas provider service: {}",
2838                    error
2839                ));
2840            }
2841        }
2842    });
2843
2844    let signing_chain_id = if runtime.config.modes.testnet_mode {
2845        hypercall_types::directives::HYPERCALL_TESTNET_CHAIN_ID
2846    } else {
2847        hypercall_types::directives::HYPERCALL_MAINNET_CHAIN_ID
2848    };
2849    let api_portfolio_margin_mode_allowlist = portfolio_margin_mode_allowlist
2850        .iter()
2851        .copied()
2852        .collect::<Vec<_>>();
2853    let api_portfolio_margin_settlement_allowlist = portfolio_margin_settlement_allowlist
2854        .iter()
2855        .copied()
2856        .collect::<Vec<_>>();
2857    let app_runtime_config = Arc::new(handlers::AppRuntimeConfig {
2858        testnet_mode: runtime.config.modes.testnet_mode,
2859        trade_explorer_url_template: runtime.config.api.trade_explorer_url_template.clone(),
2860        wal_path: hypercall_journal::checkpoint::wal_path_from_config(
2861            runtime.config.engine.persistence.journal.wal_path.as_ref(),
2862        ),
2863        db_host: crate::startup::database::extract_db_host(&runtime.secrets.database_url),
2864        db_name: crate::startup::database::extract_db_name(&runtime.secrets.database_url),
2865        directive_chain_id: signing_chain_id,
2866        signing_chain_id,
2867        exchange_contract_address: runtime.config.contracts.exchange_contract_address.clone(),
2868        portfolio_margin_pool_enabled: runtime.config.engine.portfolio_margin_pool_enabled,
2869        portfolio_margin_mode_allowlist: api_portfolio_margin_mode_allowlist,
2870        portfolio_margin_settlement_allowlist: api_portfolio_margin_settlement_allowlist,
2871        #[cfg(feature = "rsm-state")]
2872        rsm_environment: runtime
2873            .config
2874            .engine
2875            .persistence
2876            .journal
2877            .rsm_environment
2878            .into(),
2879    });
2880
2881    // Initialize username service (Postgres + optional Redis cache)
2882    let username_service = Arc::new(
2883        hypercall_api::username_service::UsernameService::new(
2884            diesel_db.clone(),
2885            username_redis_client,
2886        )
2887        .await,
2888    );
2889    tracing::info!("✓ UsernameService initialized");
2890
2891    let rfq_websocket_publisher: Arc<dyn hypercall_api::rfq::qp_ws_state::RfqWebsocketPublisher> =
2892        pubsub.clone();
2893    let rfq_resources = crate::startup::rfq::build_rfq_resources(
2894        shared_db.clone(),
2895        rfq_execute_tx,
2896        &shutdown,
2897        indicative_cache.clone(),
2898        agent_auth.clone(),
2899        signing_chain_id,
2900        instruments_cache.clone(),
2901        engine_nonce_check_tx.clone(),
2902        Some(rfq_websocket_publisher),
2903    )
2904    .await?;
2905    let rfq_manager = rfq_resources.rfq_manager.clone();
2906    let rfq_handler_state = rfq_resources.rfq_handler_state.clone();
2907    let qp_ws_state = rfq_resources.qp_ws_state;
2908
2909    // Create app state
2910    let app_state = handlers::AppState {
2911        db: diesel_db.clone(),
2912        order_sender: order_tx.clone(),
2913        market_sender: engine_market_tx.clone(),
2914        engine_quiesce_sender: engine_quiesce_tx.clone(),
2915        margin_mode_sender: engine_margin_mode_tx.clone(),
2916        agent_auth_sender: engine_agent_auth_tx.clone(),
2917        agent_auth: agent_auth.clone(),
2918        auth_failure_recorder: Arc::new(
2919            crate::observability::api_boundary::MetricsAuthFailureRecorder,
2920        ),
2921        metrics_renderer: Arc::new(crate::observability::api_boundary::PrometheusMetricsRenderer),
2922        greeks_cache: greeks_cache.clone(),
2923        portfolio_cache: portfolio_cache.clone(),
2924        instruments_cache: instruments_cache.clone(),
2925        market_stats_cache: market_stats_cache.clone(),
2926        markets_snapshot_cache: markets_snapshot_cache.clone(),
2927        instruments_snapshot_cache: instruments_snapshot_cache.clone(),
2928        options_summary_snapshot_cache: options_summary_snapshot_cache.clone(),
2929        event_bus_sender: engine_event_tx.clone(),
2930        chain_auth: chain_auth.clone(),
2931        exchange_address: alloy::primitives::Address::from_str(
2932            &runtime.config.contracts.exchange_contract_address,
2933        )
2934        .ok(),
2935        gas_provider_service: gas_provider_service.clone(),
2936        transaction_request_journal: Some(Arc::new(
2937            crate::api_boundary_impls::RuntimeTransactionRequestJournal::new(
2938                journal_batch_sender.clone(),
2939                engine_journal_writer.clone(),
2940            ),
2941        )),
2942        mmp_cache: mmp_cache.clone(),
2943        tier_cache: tier_cache.clone(),
2944        risk_vol_oracle: risk_vol_oracle.clone(),
2945        readiness: readiness.clone() as Arc<dyn hypercall_api::runtime_status::ReadinessGate>,
2946        balance_provider: Arc::new(crate::api_boundary_impls::RuntimeBalanceProvider::new(
2947            balance_provider.clone(),
2948        )),
2949        balance_snapshot_provider: Arc::new(
2950            crate::api_boundary_impls::RuntimeBalanceSnapshotProvider::new(
2951                snapshot_provider.clone(),
2952                balance_update_publisher_for_api.clone(),
2953                balance_update_stream_required,
2954            ),
2955        ),
2956        deposit_sender: Some(deposit_tx.clone()),
2957        option_deposit_sender: Some(option_deposit_tx.clone()),
2958        option_withdrawal_sender: Some(option_withdrawal_tx.clone()),
2959        cash_withdrawal_sender: Some(cash_withdrawal_tx.clone()),
2960        tier_update_sender: Some(tier_update_tx.clone()),
2961        pm_settlement_admin_sender: Some(engine_pm_settlement_admin_tx.clone()),
2962        sync_db: Some(shared_db.clone()),
2963        rate_limit_cache: rate_limit_cache.clone(),
2964        engine_journal_reader: engine_journal_writer.clone().map(|writer| {
2965            Arc::new(crate::api_boundary_impls::RuntimeEngineJournalReader::new(
2966                writer,
2967            )) as Arc<dyn hypercall_api::boundary::engine::EngineJournalReader>
2968        }),
2969        runtime_config: app_runtime_config,
2970        build_info: hypercall_api::observability_boundary::BuildInfo {
2971            version: crate::observability::CARGO_VERSION.to_string(),
2972            commit: crate::observability::GIT_COMMIT.to_string(),
2973            git_ref: crate::observability::GIT_REF.to_string(),
2974            build_time: crate::observability::BUILD_TIME.to_string(),
2975        },
2976        collateral_registry: collateral_registry.clone(),
2977        admin_api_key: runtime.secrets.admin_api_key.clone(),
2978        allow_unauthenticated_monitoring: runtime.config.environment.name == "development",
2979        boot_id: uuid::Uuid::now_v7().to_string(),
2980        server_started_at: chrono::Utc::now(),
2981        trading_halt: trading_halt.clone(),
2982        rsm_signer: rsm_signer_service.clone(),
2983        rsm_signer_address: if standby_mode {
2984            None
2985        } else {
2986            rsm_signer_service
2987                .as_ref()
2988                .map(|signer| signer.signer_address())
2989        },
2990        // Fill checks go to HyperCore (testnet), not mainnet pricing
2991        hl_info_url: Some(runtime.config.pricing.hypercore_info_url.clone()),
2992        exchange_pool_liquidity_reader: Arc::new(
2993            hypercall_api::state::HypercoreExchangePoolLiquidityReader::new(
2994                Some(runtime.config.pricing.hypercore_info_url.clone()),
2995                sync_exchange_address,
2996            ),
2997        ),
2998        hydromancer_feed: {
2999            if runtime.config.hydromancer.enabled {
3000                match runtime
3001                    .config
3002                    .hydromancer
3003                    .ws_api_key
3004                    .as_deref()
3005                    .or(runtime.secrets.hydromancer_ws_api_key.as_deref())
3006                    .or(runtime.secrets.hydromancer_api_key.as_deref())
3007                {
3008                    Some(api_key) => {
3009                        let api_key = api_key.trim();
3010                        let ws_base = runtime
3011                            .config
3012                            .hydromancer
3013                            .ws_url
3014                            .as_deref()
3015                            .unwrap_or("wss://api.hydromancer.xyz/ws");
3016                        let ws_url = format!("{}?token={}", ws_base, api_key);
3017                        // Position queries go to HyperCore (testnet), not mainnet pricing
3018                        let hl_info = runtime.config.pricing.hypercore_info_url.clone();
3019                        let managed_accounts = crate::startup::hydromancer::load_exchange_accounts(
3020                            &runtime.config.transaction_submitter.rpc_url,
3021                            &runtime.config.contracts.exchange_contract_address,
3022                        )
3023                        .await;
3024                        info!(
3025                            count = managed_accounts.len(),
3026                            "Hydromancer feed: loaded accounts from Exchange"
3027                        );
3028
3029                        let account_addresses: Vec<WalletAddress> =
3030                            managed_accounts.iter().map(|p| p.account).collect();
3031                        let account_to_manager: std::collections::HashMap<
3032                            WalletAddress,
3033                            WalletAddress,
3034                        > = managed_accounts
3035                            .iter()
3036                            .map(|p| (p.account, p.manager))
3037                            .collect();
3038
3039                        let hydromancer_api_url = runtime.config.hydromancer.api_url.clone();
3040                        let hydromancer_api_key = Some(api_key.to_string());
3041                        let feed = Arc::new(crate::hypercore::HydromancerFeedService::new(
3042                            ws_url,
3043                            hl_info,
3044                            hydromancer_api_url,
3045                            hydromancer_api_key,
3046                            account_addresses,
3047                        ));
3048                        let feed_clone = feed.clone();
3049                        let shutdown_rx = shutdown.subscribe();
3050                        tokio::spawn(async move {
3051                            feed_clone.start(shutdown_rx).await;
3052                        });
3053
3054                        // Bridge: perp positions from Hydromancer feed → PortfolioService for PM.
3055                        // Only uses PositionSnapshot (full reconciled state from REST on each
3056                        // connect/reconnect). Fills are handled by the feed's internal position
3057                        // tracking; the next snapshot will reflect them.
3058                        {
3059                            let mut position_rx = feed.subscribe();
3060                            let pc = portfolio_cache.clone();
3061                            let feed_ref = feed.clone();
3062                            let pubsub_for_bridge = pubsub.clone();
3063                            let initial_accounts: Vec<WalletAddress> =
3064                                managed_accounts.iter().map(|p| p.account).collect();
3065                            let a2m =
3066                                std::sync::Arc::new(tokio::sync::RwLock::new(account_to_manager));
3067                            let chain_auth_bridge: Arc<dyn directives::onchain::ChainAuthReader> =
3068                                chain_auth.clone();
3069                            let equity_tx = hypercore_equity_tx.clone();
3070                            tokio::spawn(async move {
3071                                info!(
3072                                    accounts = initial_accounts.len(),
3073                                    "PM bridge: task spawned, waiting 10s for feed reconciliation"
3074                                );
3075                                tokio::time::sleep(std::time::Duration::from_secs(10)).await;
3076                                info!("PM bridge: starting initial sync");
3077                                for acct in &initial_accounts {
3078                                    if let Some(state) = feed_ref.get_positions(acct).await {
3079                                        let manager = {
3080                                            let map = a2m.read().await;
3081                                            map.get(acct).copied()
3082                                        };
3083                                        if let Some(manager) = manager {
3084                                            for pos in &state.positions {
3085                                                let Some(entry_price) = pos.entry_price else {
3086                                                    warn!(
3087                                                        account = %acct,
3088                                                        coin = %pos.coin,
3089                                                        "PM bridge: skipping Hydromancer position with missing entry price"
3090                                                    );
3091                                                    continue;
3092                                                };
3093                                                let update =
3094                                                    crate::portfolio::HypercorePositionUpdate {
3095                                                        account: manager.to_string(),
3096                                                        coin: pos.coin.clone(),
3097                                                        size: pos.size,
3098                                                        entry_price,
3099                                                        unrealized_pnl: pos.unrealized_pnl,
3100                                                        timestamp: state.last_updated_ms,
3101                                                        snapshot: true,
3102                                                    };
3103                                                pc.handle_hypercore_position_update(update).await;
3104                                            }
3105                                            // Send HyperCore equity to engine for PM margin
3106                                            {
3107                                                let account_value =
3108                                                    rust_decimal::Decimal::from_f64_retain(
3109                                                        state.account_value,
3110                                                    )
3111                                                    .unwrap_or_default();
3112                                                let _ = equity_tx.send(
3113                                                    crate::rsm::unified_engine::HypercoreEquityRequest {
3114                                                        wallet: manager,
3115                                                        account_value,
3116                                                        timestamp_ms: state.last_updated_ms,
3117                                                    },
3118                                                ).await;
3119                                            }
3120                                            info!(
3121                                                account = %acct,
3122                                                positions = state.positions.len(),
3123                                                account_value = state.account_value,
3124                                                "PM bridge: initial sync"
3125                                            );
3126                                        }
3127                                    }
3128                                }
3129
3130                                info!("PM bridge: initial sync complete, entering event loop");
3131
3132                                // Only forward fills to WS that happened after bridge start
3133                                // to avoid replaying historical fills on reconnect
3134                                let bridge_start_ms =
3135                                    crate::shared::order_types::get_timestamp_millis();
3136
3137                                let mut prev_coins: std::collections::HashMap<
3138                                    WalletAddress,
3139                                    std::collections::HashSet<String>,
3140                                > = std::collections::HashMap::new();
3141
3142                                loop {
3143                                    match position_rx.recv().await {
3144                                        Ok(
3145                                            crate::hypercore::HydromancerEvent::PositionSnapshot {
3146                                                account,
3147                                                state,
3148                                            },
3149                                        ) => {
3150                                            let manager = {
3151                                                let map = a2m.read().await;
3152                                                map.get(&account).copied()
3153                                            };
3154                                            let manager = match manager {
3155                                                Some(m) => m,
3156                                                None => {
3157                                                    match chain_auth_bridge
3158                                                        .get_manager(account.inner())
3159                                                        .await
3160                                                    {
3161                                                        Ok(addr)
3162                                                            if addr
3163                                                                != alloy::primitives::Address::ZERO =>
3164                                                        {
3165                                                            let mgr = WalletAddress::from(addr);
3166                                                            a2m.write().await.insert(account, mgr);
3167                                                            mgr
3168                                                        }
3169                                                        _ => continue,
3170                                                    }
3171                                                }
3172                                            };
3173
3174                                            let mut current_coins =
3175                                                std::collections::HashSet::new();
3176                                            for pos in &state.positions {
3177                                                current_coins.insert(pos.coin.clone());
3178                                                let Some(entry_price) = pos.entry_price else {
3179                                                    warn!(
3180                                                        account = %account,
3181                                                        coin = %pos.coin,
3182                                                        "PM bridge: skipping Hydromancer position with missing entry price"
3183                                                    );
3184                                                    continue;
3185                                                };
3186                                                let update =
3187                                                    crate::portfolio::HypercorePositionUpdate {
3188                                                        account: manager.to_string(),
3189                                                        coin: pos.coin.clone(),
3190                                                        size: pos.size,
3191                                                        entry_price,
3192                                                        unrealized_pnl: pos.unrealized_pnl,
3193                                                        timestamp: state.last_updated_ms,
3194                                                        snapshot: true,
3195                                                    };
3196                                                pc.handle_hypercore_position_update(update).await;
3197                                            }
3198
3199                                            // Send HyperCore equity to engine for PM margin
3200                                            {
3201                                                let account_value =
3202                                                    rust_decimal::Decimal::from_f64_retain(
3203                                                        state.account_value,
3204                                                    )
3205                                                    .unwrap_or_default();
3206                                                let _ = equity_tx.send(
3207                                                    crate::rsm::unified_engine::HypercoreEquityRequest {
3208                                                        wallet: manager,
3209                                                        account_value,
3210                                                        timestamp_ms: state.last_updated_ms,
3211                                                    },
3212                                                ).await;
3213                                            }
3214
3215                                            // Clear positions that disappeared since last snapshot
3216                                            if let Some(old) = prev_coins.get(&account) {
3217                                                for coin in old.difference(&current_coins) {
3218                                                    let update =
3219                                                        crate::portfolio::HypercorePositionUpdate {
3220                                                            account: manager.to_string(),
3221                                                            coin: coin.clone(),
3222                                                            size: 0.0,
3223                                                            entry_price: 0.0,
3224                                                            unrealized_pnl: 0.0,
3225                                                            timestamp: state.last_updated_ms,
3226                                                            snapshot: true,
3227                                                        };
3228                                                    pc.handle_hypercore_position_update(update)
3229                                                        .await;
3230                                                }
3231                                            }
3232                                            prev_coins.insert(account, current_coins);
3233                                        }
3234                                        Ok(crate::hypercore::HydromancerEvent::Fill {
3235                                            account,
3236                                            fill,
3237                                        }) => {
3238                                            if fill.time < bridge_start_ms {
3239                                                continue;
3240                                            }
3241                                            let manager = {
3242                                                let map = a2m.read().await;
3243                                                map.get(&account).copied()
3244                                            };
3245                                            let manager = match manager {
3246                                                Some(m) => m,
3247                                                None => match chain_auth_bridge
3248                                                    .get_manager(account.inner())
3249                                                    .await
3250                                                {
3251                                                    Ok(addr)
3252                                                        if addr
3253                                                            != alloy::primitives::Address::ZERO =>
3254                                                    {
3255                                                        let mgr = WalletAddress::from(addr);
3256                                                        a2m.write().await.insert(account, mgr);
3257                                                        mgr
3258                                                    }
3259                                                    _ => continue,
3260                                                },
3261                                            };
3262                                            if let (Ok(size), Ok(price)) =
3263                                                (fill.sz.parse::<f64>(), fill.px.parse::<f64>())
3264                                            {
3265                                                if size == 0.0 || price == 0.0 {
3266                                                    continue;
3267                                                }
3268                                                let ws_fill =
3269                                                    hypercall_api::websocket::WsFillUpdate {
3270                                                        order_id: fill.oid.unwrap_or(0) as i64,
3271                                                        fill_id: fill.time as i64,
3272                                                        symbol: format!("{}-PERP", fill.coin),
3273                                                        side: if fill.side == "B" {
3274                                                            "Buy"
3275                                                        } else {
3276                                                            "Sell"
3277                                                        }
3278                                                        .to_string(),
3279                                                        price:
3280                                                            rust_decimal::Decimal::from_f64_retain(
3281                                                                price,
3282                                                            )
3283                                                            .unwrap_or_default(),
3284                                                        size:
3285                                                            rust_decimal::Decimal::from_f64_retain(
3286                                                                size,
3287                                                            )
3288                                                            .unwrap_or_default(),
3289                                                        timestamp: fill.time as i64,
3290                                                        wallet_address: manager,
3291                                                        fee: fill
3292                                                            .fee
3293                                                            .as_deref()
3294                                                            .and_then(|f| f.parse().ok())
3295                                                            .unwrap_or_default(),
3296                                                        trade_id: fill.time as i64,
3297                                                        is_taker: true,
3298                                                        builder_code_address: None,
3299                                                        builder_code_fee: None,
3300                                                        instrument_type: "perp".to_string(),
3301                                                    };
3302                                                pubsub_for_bridge.publish_fill(ws_fill);
3303
3304                                                if let Some(state) =
3305                                                    feed_ref.get_positions(&account).await
3306                                                {
3307                                                    if let Some(pos) = state
3308                                                        .positions
3309                                                        .iter()
3310                                                        .find(|p| p.coin == fill.coin)
3311                                                    {
3312                                                        let Some(entry_price) = pos.entry_price
3313                                                        else {
3314                                                            warn!(
3315                                                                account = %account,
3316                                                                coin = %pos.coin,
3317                                                                "PM bridge: skipping Hydromancer fill position update with missing entry price"
3318                                                            );
3319                                                            continue;
3320                                                        };
3321                                                        pc.handle_hypercore_position_update(
3322                                                            crate::portfolio::HypercorePositionUpdate {
3323                                                                account: manager.to_string(),
3324                                                                coin: pos.coin.clone(),
3325                                                                size: pos.size,
3326                                                                entry_price,
3327                                                                unrealized_pnl: pos.unrealized_pnl,
3328                                                                timestamp: state.last_updated_ms,
3329                                                                snapshot: false,
3330                                                            },
3331                                                        )
3332                                                        .await;
3333                                                    }
3334                                                    // Update HyperCore equity after fill
3335                                                    {
3336                                                        let account_value =
3337                                                            rust_decimal::Decimal::from_f64_retain(
3338                                                                state.account_value,
3339                                                            )
3340                                                            .unwrap_or_default();
3341                                                        let _ = equity_tx.send(
3342                                                            crate::rsm::unified_engine::HypercoreEquityRequest {
3343                                                                wallet: manager,
3344                                                                account_value,
3345                                                                timestamp_ms: state.last_updated_ms,
3346                                                            },
3347                                                        ).await;
3348                                                    }
3349                                                }
3350                                            }
3351                                        }
3352                                        Ok(crate::hypercore::HydromancerEvent::OrderUpdate {
3353                                            ..
3354                                        }) => {}
3355                                        Err(tokio::sync::broadcast::error::RecvError::Lagged(
3356                                            n,
3357                                        )) => {
3358                                            warn!(skipped = n, "Hydromancer→PM bridge lagged");
3359                                        }
3360                                        Err(_) => break,
3361                                    }
3362                                }
3363                            });
3364                            info!("✓ Hydromancer→PM position bridge started");
3365                        }
3366
3367                        info!("✓ Hydromancer feed service started");
3368                        Some(feed)
3369                    }
3370                    None => {
3371                        warn!("Hydromancer enabled but no API key configured");
3372                        None
3373                    }
3374                }
3375            } else {
3376                None
3377            }
3378        },
3379        quote_provider: quote_provider.clone(),
3380        engine_state_digest_provider: Arc::new(
3381            crate::api_boundary_impls::RuntimeEngineStateDigestProvider::new(
3382                engine_state_digest_provider.clone(),
3383            ),
3384        ),
3385        order_snapshot: order_snapshot.clone(),
3386        competition_service: competition_service.clone(),
3387        candle_source: candle_source.clone(),
3388        underlying_to_candle_coin: underlying_to_candle_coin.clone(),
3389        bbo_snapshot_reader: bbo_snapshot_service.clone(),
3390        indicative_cache: Some(indicative_cache.clone()),
3391        rfq_manager: Some(rfq_manager.clone()),
3392        username_service: username_service.clone(),
3393        push_service: push_service.clone(),
3394        standby_controller: standby_controller.clone().map(|controller| {
3395            Arc::new(controller) as Arc<dyn hypercall_api::runtime_status::StandbyPromoter>
3396        }),
3397        notification_service: Some(notification_service.clone()),
3398        standby_promote: standby_promote.clone(),
3399        standby_progress: standby_progress.clone().map(|progress| {
3400            Arc::new(progress) as Arc<dyn hypercall_api::runtime_status::StandbyReplayProgress>
3401        }),
3402        startup_progress: startup_progress.clone().map(|progress| {
3403            Arc::new(progress) as Arc<dyn hypercall_api::runtime_status::StartupProgressReader>
3404        }),
3405        recovery_safety_report: recovery_safety_report.clone(),
3406        drain_signal: drain_signal.clone(),
3407        is_draining: is_draining.clone(),
3408        shutdown: shutdown.clone(),
3409    };
3410
3411    if runtime.config.liquidation.enabled {
3412        let rsm_directive_publisher = crate::rsm_directive_publisher::RsmDirectivePublisher::new(
3413            engine_event_tx.clone(),
3414            journal_batch_sender.clone(),
3415        );
3416        let mut liquidation_executor_builder = crate::liquidator::LiquidationExecutor::new(
3417            liquidation_cache.clone(),
3418            portfolio_cache.clone(),
3419        )
3420        .with_event_sender(ws_direct_tx.clone())
3421        .with_rsm_directive_publisher(rsm_directive_publisher)
3422        .with_full_target_buffer_bps(runtime.config.liquidation.full_target_buffer_bps);
3423        if let Some(signer) = rsm_signer_service.clone() {
3424            liquidation_executor_builder = liquidation_executor_builder.with_rsm_signer(signer);
3425        }
3426        let liquidation_executor = Arc::new(liquidation_executor_builder);
3427
3428        let liquidation_watcher = Arc::new(
3429            crate::liquidator::LiquidationWatcher::new(
3430                liquidation_cache.clone(),
3431                portfolio_cache.get_service(),
3432                tier_cache.clone(),
3433                span_margin_service.clone(),
3434                standard_margin_service.clone(),
3435                crate::liquidator::LiquidationWatcherConfig::from_runtime_config(
3436                    &runtime.config.liquidation,
3437                ),
3438            )
3439            .with_risk_account_builder(risk_account_builder.clone())
3440            .with_standard_account_builder(standard_account_builder.clone())
3441            .with_quote_provider(quote_provider.clone())
3442            .with_greeks_cache(greeks_cache.clone())
3443            .with_order_snapshot(order_snapshot.clone())
3444            .with_order_sender(order_tx.clone())
3445            .with_event_sender(ws_direct_tx.clone())
3446            .with_executor(liquidation_executor),
3447        );
3448        let liquidation_observer = Arc::new(
3449            crate::liquidator::LiquidationChainObserver::new(
3450                liquidation_cache.clone(),
3451                diesel_db.clone(),
3452                liquidation_bonus_tx.clone(),
3453                Some(ws_direct_tx.clone()),
3454                event_bus.clone(),
3455                &runtime.config.transaction_submitter.rpc_url,
3456                &runtime.config.contracts.exchange_contract_address,
3457                crate::liquidator::LiquidationObserverConfig::from_runtime_config(
3458                    &runtime.config.liquidation,
3459                ),
3460            )
3461            .map_err(|error| {
3462                anyhow::anyhow!("Failed to initialize liquidation chain observer: {}", error)
3463            })?,
3464        );
3465
3466        if standby_mode {
3467            let watcher_service: Arc<dyn Service> = liquidation_watcher;
3468            let observer_service: Arc<dyn Service> = liquidation_observer;
3469            let mut services = post_promote_services.lock().await;
3470            services.push(watcher_service);
3471            services.push(observer_service);
3472            info!("Liquidation watcher and chain observer deferred until standby promotion");
3473        } else {
3474            service_registry.register(liquidation_watcher);
3475            service_registry.register(liquidation_observer);
3476            info!("✓ Liquidation watcher registered");
3477            info!("✓ Liquidation chain observer registered");
3478        }
3479    } else {
3480        info!("Liquidation runtime disabled by config");
3481    }
3482
3483    if onchain_deposits_enabled {
3484        gauge!("ht_hypercore_cash_ledger_observer_enabled").set(1.0);
3485        let cash_ledger_observer = Arc::new(
3486            crate::hypercore_cash_ledger_observer::HypercoreCashLedgerObserver::new(
3487                shared_db.clone(),
3488                deposit_tx.clone(),
3489                crate::hypercore_cash_ledger_observer::HypercoreCashLedgerObserverConfig::from_runtime_config(
3490                    &runtime.config.onchain_deposits,
3491                    runtime.config.pricing.hypercore_info_url.clone(),
3492                    hypercall_types::WalletAddress::from(
3493                        alloy::primitives::Address::from_str(
3494                            &runtime.config.contracts.exchange_contract_address,
3495                        )
3496                        .expect("exchange_contract_address already validated at startup"),
3497                    ),
3498                    hypercall_types::WalletAddress::from(
3499                        alloy::primitives::Address::from_str(
3500                            &runtime.config.contracts.core_deposit_wallet_address,
3501                        )
3502                        .expect("core_deposit_wallet_address already validated at startup"),
3503                    ),
3504                ),
3505                tier_cache.clone(),
3506            )
3507            .map_err(|error| {
3508                anyhow::anyhow!("Failed to initialize HyperCore cash ledger observer: {}", error)
3509            })?,
3510        );
3511
3512        if standby_mode {
3513            let observer_service: Arc<dyn Service> = cash_ledger_observer;
3514            let mut services = post_promote_services.lock().await;
3515            services.push(observer_service);
3516            info!("HyperCore cash ledger observer deferred until standby promotion");
3517        } else {
3518            service_registry.register(cash_ledger_observer);
3519            info!("✓ HyperCore cash ledger observer registered");
3520        }
3521
3522        gauge!("ht_rsm_deposit_credit_observer_enabled").set(1.0);
3523        let deposit_credit_observer = Arc::new(
3524            crate::rsm_deposit_credit_observer::RsmDepositCreditObserver::new(
3525                shared_db.clone(),
3526                option_deposit_tx.clone(),
3527                engine_pm_settlement_admin_tx.clone(),
3528                &runtime.config.transaction_submitter.rpc_url,
3529                &runtime.config.contracts.exchange_contract_address,
3530                &runtime.config.contracts.usdc_address,
3531                crate::rsm_deposit_credit_observer::RsmDepositCreditObserverConfig::from_runtime_config(
3532                    &runtime.config.onchain_deposits,
3533                ),
3534            )
3535            .map_err(|error| {
3536                anyhow::anyhow!("Failed to initialize RSM deposit credit observer: {}", error)
3537            })?,
3538        );
3539
3540        if standby_mode {
3541            let observer_service: Arc<dyn Service> = deposit_credit_observer;
3542            let mut services = post_promote_services.lock().await;
3543            services.push(observer_service);
3544            info!("RSM deposit credit observer deferred until standby promotion");
3545        } else {
3546            service_registry.register(deposit_credit_observer);
3547            info!("✓ RSM deposit credit observer registered");
3548        }
3549    } else {
3550        gauge!("ht_hypercore_cash_ledger_observer_enabled").set(0.0);
3551        gauge!("ht_rsm_deposit_credit_observer_enabled").set(0.0);
3552        info!(
3553            "HyperCore cash ledger and RSM deposit credit observers disabled because transaction submission is mocked"
3554        );
3555    }
3556
3557    // Create rate limit state for middleware
3558    let rate_limit_state = middleware::RateLimitState {
3559        rate_limiter: rate_limit_cache.clone(),
3560    };
3561
3562    // Create WebSocket state (after RFQ system so rfq_handler_state is available)
3563    let ws_state = WsState {
3564        pubsub: (*pubsub).clone(),
3565        indicative_cache: Some(indicative_cache.clone()),
3566        portfolio_cache: Some(portfolio_cache.clone()),
3567        competition_service: Some(competition_service.clone()),
3568        heartbeat_config: hypercall_api::websocket::ws_heartbeat_config(&runtime.config.api),
3569        rfq_handler_state: Some(rfq_handler_state.clone()),
3570        order_sender: Some(order_tx.clone()),
3571        trading_halt: Some(trading_halt.clone()),
3572        agent_auth: agent_auth.clone(),
3573        signing_chain_id,
3574    };
3575
3576    let candle_ws_poll_interval_ms = candle_ws_poll_interval_ms(&runtime.config.pricing);
3577    let candle_publisher = CandleWsPublisher::new(
3578        pubsub.clone(),
3579        candle_source.clone(),
3580        underlying_to_candle_coin.clone(),
3581        Duration::from_millis(candle_ws_poll_interval_ms),
3582    );
3583    service_registry.register(Arc::new(candle_publisher));
3584    tracing::info!(
3585        "✓ CandleWsPublisher registered (poll_interval_ms={})",
3586        candle_ws_poll_interval_ms
3587    );
3588
3589    let index_price_publisher =
3590        IndexPricePublisher::new(greeks_cache.clone(), pubsub.clone(), spot_price_notify);
3591    service_registry.register(Arc::new(index_price_publisher));
3592    tracing::info!("✓ IndexPricePublisher registered");
3593
3594    // Start MetricsCollector for business/trading metrics
3595    let metrics_collector = Arc::new(MetricsCollector::new(
3596        MetricsCollectorConfig::from_config(&runtime.config.observability.metrics),
3597        portfolio_cache.clone(),
3598        quote_provider.clone(),
3599        Some(order_snapshot.clone()),
3600        Some(tier_cache.clone()),
3601        portfolio_margin_settlement_allowlist,
3602        Some(greeks_cache.clone()),
3603        Some(market_stats_cache.clone()),
3604        Some(shared_db.clone()),
3605        Some(diesel_db.clone()),
3606        Some(diesel_db.clone()),
3607        Some(diesel_db.clone()),
3608        Some(risk_vol_oracle.clone()),
3609    ));
3610    service_registry.register(metrics_collector);
3611    tracing::info!("✓ MetricsCollector registered");
3612
3613    // Start all registered services
3614    let service_count = service_registry.len();
3615    service_registry.start_all(&shutdown, &mut task_group);
3616    info!("✓ ServiceRegistry started {} services", service_count);
3617
3618    // Setup graceful shutdown (Ctrl-C and SIGTERM listener)
3619    // This task listens for Ctrl-C or SIGTERM and also exits if shutdown is triggered elsewhere
3620    let shutdown_for_signal = shutdown.clone();
3621    let mut shutdown_rx_signal = shutdown.subscribe();
3622    task_group.spawn("SignalListener", async move {
3623        // Create a future that resolves on either SIGINT (Ctrl-C) or SIGTERM
3624        let ctrl_c = async {
3625            signal::ctrl_c()
3626                .await
3627                .expect("Failed to setup SIGINT handler");
3628            "SIGINT"
3629        };
3630
3631        #[cfg(unix)]
3632        let terminate = async {
3633            use tokio::signal::unix::{signal, SignalKind};
3634            signal(SignalKind::terminate())
3635                .expect("Failed to setup SIGTERM handler")
3636                .recv()
3637                .await;
3638            "SIGTERM"
3639        };
3640
3641        #[cfg(not(unix))]
3642        let terminate = std::future::pending::<&str>();
3643
3644        tokio::select! {
3645            sig = ctrl_c => {
3646                tracing::info!("Received {} (Ctrl-C), initiating graceful shutdown...", sig);
3647                shutdown_for_signal.trigger();
3648            }
3649            sig = terminate => {
3650                tracing::info!("Received {}, initiating graceful shutdown...", sig);
3651                shutdown_for_signal.trigger();
3652            }
3653            _ = shutdown_rx_signal.recv() => {
3654                tracing::info!("SignalListener received shutdown signal, exiting...");
3655            }
3656        }
3657        Ok(())
3658    });
3659
3660    // The complete admin surface lives in `hypercall_admin::admin_router` and is
3661    // wrapped here in the single admin auth layer.
3662    let admin_state = build_admin_state(&app_state, rfq_handler_state.qp_cache.clone());
3663    let admin_router = hypercall_admin::admin_router(admin_state.clone()).layer(
3664        axum::middleware::from_fn_with_state(
3665            admin_state,
3666            hypercall_admin::auth::monitoring_auth_middleware,
3667        ),
3668    );
3669
3670    // The live Swagger/Scalar spec must include the admin crate's paths (e.g. the
3671    // Health-tagged /recovery-safety endpoints), matching what api-spec-exporter
3672    // produces. api cannot merge admin_openapi() itself (api -> admin edge), so the
3673    // root composes the public spec here, the same way it composes the routers.
3674    let openapi_doc = hypercall_api::openapi::filter_hidden_tags({
3675        let mut doc = hypercall_api::openapi::internal_openapi();
3676        doc.merge(hypercall_admin::admin_openapi());
3677        doc
3678    });
3679
3680    let app = hypercall_api::routes::build_app(hypercall_api::routes::BuildAppParams {
3681        app_state,
3682        rate_limit_state,
3683        ws_state,
3684        qp_ws_state,
3685        rfq_handler_state,
3686        readiness: readiness.clone() as Arc<dyn hypercall_api::runtime_status::ReadinessGate>,
3687        standby_progress: standby_progress.clone().map(|progress| {
3688            Arc::new(progress) as Arc<dyn hypercall_api::runtime_status::StandbyReplayProgress>
3689        }),
3690        standby_promote: standby_promote.clone(),
3691        admin_router,
3692        openapi_doc,
3693    });
3694
3695    startup_timing!("total", startup_begin.elapsed());
3696    tracing::info!("Integrated API server listening on {}", addr);
3697
3698    if let Some(bootstrap_http) = bootstrap_http {
3699        if let Some(progress) = startup_progress.as_ref() {
3700            progress.mark("full_api_router_installed");
3701        }
3702        if let Some(task) = standby_startup_heartbeat {
3703            task.abort();
3704        }
3705        *bootstrap_http.full_app.write().await = Some(app);
3706        tracing::info!("Full API router installed behind bootstrap standby HTTP server");
3707        if let Err(error) = bootstrap_http.task.await {
3708            tracing::error!("Bootstrap standby HTTP server task failed: {}", error);
3709        }
3710    } else {
3711        let listener = listener.expect("non-standby HTTP server requires the bound listener");
3712        let mut http_shutdown_rx = shutdown.subscribe();
3713        let server = axum::serve(listener, app);
3714
3715        tokio::select! {
3716            result = server => {
3717                if let Err(e) = result {
3718                    tracing::error!("Server error: {}", e);
3719                }
3720            }
3721            _ = http_shutdown_rx.recv() => {
3722                tracing::info!("Shutting down HTTP server...");
3723            }
3724        }
3725    }
3726
3727    // Shutdown all background tasks and wait for them to complete
3728    // Note: shutdown may already be triggered (by Ctrl-C), but trigger is idempotent
3729    task_group
3730        .shutdown_and_join(&shutdown, Duration::from_secs(15))
3731        .await?;
3732
3733    tracing::info!("Server shutdown complete");
3734    Ok(())
3735}
3736
3737#[cfg(test)]
3738mod tests {
3739    use super::{build_bootstrap_http_app, BootstrapHttpState, StartupProgress};
3740    use crate::observability::command_trace::EngineStateDigest;
3741    use crate::shared::shutdown::Shutdown;
3742    use alloy::primitives::Address;
3743    use axum::body::Body;
3744    use axum::http::{Request, StatusCode};
3745    use axum::routing::get;
3746    use axum::Router;
3747    use hypercall_api::websocket::event_forwarder::ws_orderbook_update_from_engine_update;
3748    use hypercall_types::wallet_address::test_wallet;
3749    use hypercall_types::OrderbookUpdate;
3750    use hypercall_types::WalletAddress;
3751    use hypercall_types::{EngineMessage, SignedDirectiveTx, TransactionRequest, TransactionType};
3752    use rust_decimal_macros::dec;
3753    use std::str::FromStr;
3754    use std::sync::Arc;
3755    use tower::util::ServiceExt;
3756
3757    #[tokio::test]
3758    async fn bootstrap_http_serves_probe_routes_before_full_app() {
3759        let full_app = Arc::new(tokio::sync::RwLock::new(None));
3760        let standby_progress = crate::nats::replay_loop::ReplayProgress::new();
3761        let startup_progress = StartupProgress::new("test_starting");
3762        startup_progress.mark("test_hydrating");
3763        let (promote_tx, _promote_rx) = tokio::sync::oneshot::channel::<()>();
3764        let app = build_bootstrap_http_app(BootstrapHttpState {
3765            shutdown: Shutdown::new(),
3766            standby_progress: Some(standby_progress),
3767            standby_promote: Some(Arc::new(tokio::sync::Mutex::new(Some(promote_tx)))),
3768            startup_progress: Some(startup_progress.clone()),
3769            full_app: full_app.clone(),
3770        });
3771
3772        let health = app
3773            .clone()
3774            .oneshot(
3775                Request::builder()
3776                    .uri("/health")
3777                    .body(Body::empty())
3778                    .expect("valid health request"),
3779            )
3780            .await
3781            .expect("health response");
3782        assert_eq!(health.status(), StatusCode::OK);
3783
3784        let standby_ready = app
3785            .clone()
3786            .oneshot(
3787                Request::builder()
3788                    .uri("/standby-ready")
3789                    .body(Body::empty())
3790                    .expect("valid standby-ready request"),
3791            )
3792            .await
3793            .expect("standby-ready response");
3794        assert_eq!(standby_ready.status(), StatusCode::SERVICE_UNAVAILABLE);
3795        let standby_body = axum::body::to_bytes(standby_ready.into_body(), usize::MAX)
3796            .await
3797            .expect("standby-ready body");
3798        let standby_payload: serde_json::Value =
3799            serde_json::from_slice(&standby_body).expect("standby-ready json");
3800        assert_eq!(standby_payload["status"], "api_starting");
3801        assert_eq!(standby_payload["api_router_ready"], false);
3802        assert_eq!(standby_payload["promotable"], false);
3803        assert_eq!(standby_payload["startup_phase"], "test_hydrating");
3804        assert_eq!(standby_payload["startup_progress_counter"], 2);
3805        assert!(standby_payload["last_startup_progress_unix_ms"]
3806            .as_u64()
3807            .is_some());
3808        assert!(standby_payload["last_startup_progress_age_ms"]
3809            .as_u64()
3810            .is_some());
3811
3812        let fallback_while_starting = app
3813            .clone()
3814            .oneshot(
3815                Request::builder()
3816                    .uri("/delegated")
3817                    .body(Body::empty())
3818                    .expect("valid fallback request"),
3819            )
3820            .await
3821            .expect("fallback response");
3822        assert_eq!(
3823            fallback_while_starting.status(),
3824            StatusCode::SERVICE_UNAVAILABLE
3825        );
3826
3827        let caught_up_progress = crate::nats::replay_loop::ReplayProgress::new();
3828        caught_up_progress.test_update_catch_up_state(10, 10);
3829        let (caught_up_promote_tx, _caught_up_promote_rx) = tokio::sync::oneshot::channel::<()>();
3830        let caught_up_app = build_bootstrap_http_app(BootstrapHttpState {
3831            shutdown: Shutdown::new(),
3832            standby_progress: Some(caught_up_progress),
3833            standby_promote: Some(Arc::new(tokio::sync::Mutex::new(Some(
3834                caught_up_promote_tx,
3835            )))),
3836            startup_progress: Some(startup_progress),
3837            full_app: full_app.clone(),
3838        });
3839        let caught_up_before_router = caught_up_app
3840            .clone()
3841            .oneshot(
3842                Request::builder()
3843                    .uri("/standby-ready")
3844                    .body(Body::empty())
3845                    .expect("valid caught-up standby-ready request"),
3846            )
3847            .await
3848            .expect("caught-up standby-ready response");
3849        assert_eq!(
3850            caught_up_before_router.status(),
3851            StatusCode::SERVICE_UNAVAILABLE
3852        );
3853
3854        *full_app.write().await = Some(Router::new().route("/delegated", get(|| async { "ok" })));
3855        let caught_up_after_router = caught_up_app
3856            .oneshot(
3857                Request::builder()
3858                    .uri("/standby-ready")
3859                    .body(Body::empty())
3860                    .expect("valid caught-up standby-ready request after router install"),
3861            )
3862            .await
3863            .expect("caught-up standby-ready response after router install");
3864        assert_eq!(caught_up_after_router.status(), StatusCode::OK);
3865        let caught_up_body = axum::body::to_bytes(caught_up_after_router.into_body(), usize::MAX)
3866            .await
3867            .expect("caught-up standby-ready body");
3868        let caught_up_payload: serde_json::Value =
3869            serde_json::from_slice(&caught_up_body).expect("caught-up standby-ready json");
3870        assert_eq!(caught_up_payload["status"], "standby_ready");
3871        assert_eq!(caught_up_payload["startup_phase"], "test_hydrating");
3872        assert_eq!(caught_up_payload["startup_progress_counter"], 2);
3873
3874        let delegated = app
3875            .oneshot(
3876                Request::builder()
3877                    .uri("/delegated")
3878                    .body(Body::empty())
3879                    .expect("valid delegated request"),
3880            )
3881            .await
3882            .expect("delegated response");
3883        assert_eq!(delegated.status(), StatusCode::OK);
3884    }
3885
3886    #[test]
3887    fn test_ws_orderbook_update_converts_contract_units_to_human_contracts() {
3888        let orderbook_update_contract_units_raw = OrderbookUpdate {
3889            symbol: "BTC-20260331-100000-C".to_string(),
3890            bids: vec![(dec!(100), dec!(1_000_000)), (dec!(99), dec!(2_500_000))],
3891            asks: vec![(dec!(101), dec!(500_000)), (dec!(102), dec!(3_000_000))],
3892            timestamp: 1_750_000_000_000_u64,
3893        };
3894
3895        let ws_update = ws_orderbook_update_from_engine_update(
3896            &orderbook_update_contract_units_raw,
3897            Some(test_wallet(7)),
3898        );
3899
3900        assert_eq!(ws_update.bids[0].1, dec!(1));
3901        assert_eq!(ws_update.bids[1].1, dec!(2.5));
3902        assert_eq!(ws_update.asks[0].1, dec!(0.5));
3903        assert_eq!(ws_update.asks[1].1, dec!(3));
3904    }
3905
3906    #[test]
3907    fn test_disabled_wal_startup_provides_mock_journal_writer_for_directives() {
3908        let (engine_journal_writer, journal_batch_sender) = crate::journal::mock_journal_backends();
3909        assert!(
3910            journal_batch_sender.is_none(),
3911            "disabled WAL mode should not configure a batch sender"
3912        );
3913
3914        let journal_writer = engine_journal_writer
3915            .expect("disabled WAL mode should inject a mock writer for directive submits");
3916        let wallet = WalletAddress::from(
3917            Address::from_str("0x1111111111111111111111111111111111111111").expect("valid wallet"),
3918        );
3919        let request_id = uuid::Uuid::now_v7().to_string();
3920        let tx_request = TransactionRequest {
3921            request_id: request_id.clone(),
3922            wallet_address: wallet,
3923            account_contract: wallet,
3924            transaction_type: TransactionType::UserDirective(SignedDirectiveTx {
3925                directive: vec![1, 2, 3],
3926                signature: "0xabcdef".to_string(),
3927            }),
3928            timestamp: 123,
3929            expires_at: 456,
3930        };
3931        let result = journal_writer.append_transition(
3932            tx_request.timestamp,
3933            &hypercall_types::serialize_to_wire_bytes(&serde_json::json!({
3934                "account": wallet,
3935                "actionKey": "hc_update_api_wallet",
3936                "nonce": 42,
3937                "recoveredSigner": wallet,
3938            })),
3939            None,
3940            None,
3941            &EngineStateDigest::default(),
3942            &EngineStateDigest::default(),
3943            0,
3944            &[EngineMessage::TransactionRequest(tx_request)],
3945            hypercall_db_diesel::engine_enums::DbUuid(
3946                uuid::Uuid::parse_str(&request_id).expect("request id should be a UUID"),
3947            ),
3948            None,
3949        );
3950
3951        assert!(
3952            result.is_ok(),
3953            "disabled WAL startup writer should be usable for directive journaling"
3954        );
3955    }
3956}