1use std::sync::Arc;
4
5use tokio::sync::mpsc;
6use tracing::info;
7
8use crate::read_cache::instruments_registry::InstrumentsCache;
9use crate::rsm::unified_engine::NonceCheckRequest;
10use crate::shared::shutdown::Shutdown;
11use hypercall_api::rfq::handler_state::RfqHandlerState;
12use hypercall_api::rfq::qp_sessions::QpSessionManager;
13use hypercall_api::rfq::qp_ws_state::{QpWsState, RfqWebsocketPublisher};
14use hypercall_api::rfq::quote_provider_cache::QuoteProviderCache;
15use hypercall_runtime_api::AgentAuthProvider;
16
17pub struct RfqStartupResources {
18 pub rfq_manager: Arc<hypercall_api::rfq::rfq_manager::RfqManager>,
19 pub rfq_handler_state: RfqHandlerState,
20 pub qp_ws_state: QpWsState,
21}
22
23pub async fn build_rfq_resources(
24 shared_db: Arc<hypercall_db_diesel::DatabaseHandler>,
25 rfq_execute_tx: mpsc::Sender<hypercall_runtime_api::RfqExecuteRequest>,
26 shutdown: &Shutdown,
27 indicative_cache: Arc<hypercall_api::rfq::indicative_quote_cache::IndicativeQuoteCache>,
28 agent_auth: Arc<dyn AgentAuthProvider>,
29 signing_chain_id: u64,
30 instruments_cache: Arc<InstrumentsCache>,
31 nonce_check_sender: mpsc::Sender<NonceCheckRequest>,
32 websocket_publisher: Option<Arc<dyn RfqWebsocketPublisher>>,
33) -> anyhow::Result<RfqStartupResources> {
34 let qp_cache = Arc::new(QuoteProviderCache::new(shared_db.clone()));
35 qp_cache
36 .load_from_db()
37 .await
38 .map_err(|e| anyhow::anyhow!("Failed to load quote providers from DB: {}", e))?;
39
40 let qp_session_manager = Arc::new(QpSessionManager::new());
41 let mut rfq_manager_inner = hypercall_api::rfq::rfq_manager::RfqManager::new(
42 qp_cache.clone(),
43 qp_session_manager.clone(),
44 rfq_execute_tx,
45 hypercall_api::rfq::rfq_manager::RfqConfig::default(),
46 );
47 rfq_manager_inner.set_db(shared_db);
48 let rfq_manager = Arc::new(rfq_manager_inner);
49
50 rfq_manager.start_cleanup_task(shutdown.subscribe());
51 indicative_cache.start_ttl_eviction(shutdown.subscribe());
52
53 let rfq_handler_state = RfqHandlerState {
54 rfq_manager: rfq_manager.clone(),
55 qp_cache: qp_cache.clone(),
56 agent_auth,
57 signing_chain_id,
58 };
59 let qp_ws_state = QpWsState {
60 qp_cache,
61 indicative_cache,
62 instruments_cache,
63 session_manager: qp_session_manager,
64 rfq_manager: Some(rfq_manager.clone()),
65 nonce_check_sender,
66 pubsub: websocket_publisher,
67 signing_chain_id,
68 gateway_resume_token: std::env::var("WS_GATEWAY_INTERNAL_TOKEN")
69 .ok()
70 .map(|value| value.trim().to_string())
71 .filter(|value| !value.is_empty()),
72 };
73
74 info!("✓ RFQ system initialized");
75
76 Ok(RfqStartupResources {
77 rfq_manager,
78 rfq_handler_state,
79 qp_ws_state,
80 })
81}