Skip to main content

hypercall/startup/
rfq.rs

//! Startup wiring for RFQ services.
2
3use std::sync::Arc;
4
5use tokio::sync::mpsc;
6use tracing::info;
7
8use crate::read_cache::instruments_registry::InstrumentsCache;
9use crate::rsm::unified_engine::NonceCheckRequest;
10use crate::shared::shutdown::Shutdown;
11use hypercall_api::rfq::handler_state::RfqHandlerState;
12use hypercall_api::rfq::qp_sessions::QpSessionManager;
13use hypercall_api::rfq::qp_ws_state::{QpWsState, RfqWebsocketPublisher};
14use hypercall_api::rfq::quote_provider_cache::QuoteProviderCache;
15use hypercall_runtime_api::AgentAuthProvider;
16
17pub struct RfqStartupResources {
18    pub rfq_manager: Arc<hypercall_api::rfq::rfq_manager::RfqManager>,
19    pub rfq_handler_state: RfqHandlerState,
20    pub qp_ws_state: QpWsState,
21}
22
23pub async fn build_rfq_resources(
24    shared_db: Arc<hypercall_db_diesel::DatabaseHandler>,
25    rfq_execute_tx: mpsc::Sender<hypercall_runtime_api::RfqExecuteRequest>,
26    shutdown: &Shutdown,
27    indicative_cache: Arc<hypercall_api::rfq::indicative_quote_cache::IndicativeQuoteCache>,
28    agent_auth: Arc<dyn AgentAuthProvider>,
29    signing_chain_id: u64,
30    instruments_cache: Arc<InstrumentsCache>,
31    nonce_check_sender: mpsc::Sender<NonceCheckRequest>,
32    websocket_publisher: Option<Arc<dyn RfqWebsocketPublisher>>,
33) -> anyhow::Result<RfqStartupResources> {
34    let qp_cache = Arc::new(QuoteProviderCache::new(shared_db.clone()));
35    qp_cache
36        .load_from_db()
37        .await
38        .map_err(|e| anyhow::anyhow!("Failed to load quote providers from DB: {}", e))?;
39
40    let qp_session_manager = Arc::new(QpSessionManager::new());
41    let mut rfq_manager_inner = hypercall_api::rfq::rfq_manager::RfqManager::new(
42        qp_cache.clone(),
43        qp_session_manager.clone(),
44        rfq_execute_tx,
45        hypercall_api::rfq::rfq_manager::RfqConfig::default(),
46    );
47    rfq_manager_inner.set_db(shared_db);
48    let rfq_manager = Arc::new(rfq_manager_inner);
49
50    rfq_manager.start_cleanup_task(shutdown.subscribe());
51    indicative_cache.start_ttl_eviction(shutdown.subscribe());
52
53    let rfq_handler_state = RfqHandlerState {
54        rfq_manager: rfq_manager.clone(),
55        qp_cache: qp_cache.clone(),
56        agent_auth,
57        signing_chain_id,
58    };
59    let qp_ws_state = QpWsState {
60        qp_cache,
61        indicative_cache,
62        instruments_cache,
63        session_manager: qp_session_manager,
64        rfq_manager: Some(rfq_manager.clone()),
65        nonce_check_sender,
66        pubsub: websocket_publisher,
67        signing_chain_id,
68        gateway_resume_token: std::env::var("WS_GATEWAY_INTERNAL_TOKEN")
69            .ok()
70            .map(|value| value.trim().to_string())
71            .filter(|value| !value.is_empty()),
72    };
73
74    info!("✓ RFQ system initialized");
75
76    Ok(RfqStartupResources {
77        rfq_manager,
78        rfq_handler_state,
79        qp_ws_state,
80    })
81}