1use std::path::PathBuf;
2use std::str::FromStr;
3use std::sync::Arc;
4use std::time::Duration;
5
6use axum::http::StatusCode;
7use rust_decimal::Decimal;
8use tokio::sync::{mpsc, RwLock};
9use tracing::error;
10
11use super::caches::options_summary::BboReferenceAskReader;
12use super::observability_boundary::{AuthFailureRecorder, BuildInfo, MetricsRenderer};
13use super::runtime_status::{
14 ReadinessGate, StandbyPromoter, StandbyReplayProgress, StartupProgressReader,
15};
16use super::trading_halt::TradingHaltState;
17use crate::boundary::engine::{
18 EngineJournalReader, EngineStateDigestProvider, TransactionRequestJournal,
19};
20use crate::boundary::market_inputs::{
21 GreeksCacheReader, InstrumentsCacheReader, MarketStatsCacheReader,
22};
23use crate::boundary::read_models::{
24 BalanceProvider, EngineBalanceSnapshotProvider, MmpCacheApi, PortfolioCacheApi, TierCacheApi,
25};
26use alloy::primitives::Address;
27use hypercall_runtime_api::{MarketRequest, TierUpdateRequest};
28use hypercall_types::WalletAddress;
29
/// Upper bound on how long a caller waits for the engine to respond.
///
/// In this file it bounds the wait for the tier update acknowledgement in
/// `submit_tier_update_command`.
pub(crate) const ENGINE_RESPONSE_TIMEOUT: Duration = Duration::from_secs(10);
/// Message carried by the `503 Service Unavailable` error returned when the
/// exchange pool liquidity check has no info URL or no exchange address, or
/// when the unavailable reader is in use.
const EXCHANGE_POOL_LIQUIDITY_NOT_CONFIGURED: &str =
    "exchange pool liquidity check is not configured";
/// Message carried by the `ApiError::internal_error` returned for every failure
/// of the HyperCore liquidity lookup (client build, request, non-success
/// status, decode, parse). The specific cause is only written to the error log.
const EXCHANGE_POOL_LIQUIDITY_VALIDATION_FAILED: &str =
    "failed to validate exchange pool liquidity";
35
/// Runtime configuration values, shared through `AppState::runtime_config` as
/// an `Arc<AppRuntimeConfig>`.
///
/// The meaning of the individual settings is not visible in this file; the
/// notes below are limited to what the declarations themselves show.
#[derive(Clone)]
pub struct AppRuntimeConfig {
    pub testnet_mode: bool,
    /// Optional URL template. NOTE(review): placeholder syntax is not shown
    /// here — confirm against the code that renders it.
    pub trade_explorer_url_template: Option<String>,
    pub wal_path: PathBuf,
    pub db_host: String,
    pub db_name: String,
    // Two separate chain ids are configured; NOTE(review): presumably one is
    // used for directives and one for signature domains — confirm.
    pub directive_chain_id: u64,
    pub signing_chain_id: u64,
    /// Kept as a raw string here, whereas `AppState::exchange_address` holds an
    /// optional parsed `Address`.
    pub exchange_contract_address: String,
    pub portfolio_margin_pool_enabled: bool,
    pub portfolio_margin_mode_allowlist: Vec<WalletAddress>,
    pub portfolio_margin_settlement_allowlist: Vec<WalletAddress>,
    /// Only present when the crate is built with the `rsm-state` feature.
    #[cfg(feature = "rsm-state")]
    pub rsm_environment: hypercall_db::ValidatorRsmEnvironment,
}
54
/// One fill record as returned by `DirectiveHydromancerFeed::wait_for_fills`.
///
/// All values except `time` are carried as strings; nothing in this file
/// parses them.
///
/// NOTE(review): `px` / `sz` look like price and size and `time` like a
/// timestamp, but units and encoding are not visible here — confirm against
/// the feed implementation.
#[derive(Debug, Clone)]
pub struct DirectiveHydromancerFill {
    pub coin: String,
    pub side: String,
    pub px: String,
    pub sz: String,
    pub time: u64,
}
63
/// Asynchronous source of [`DirectiveHydromancerFill`]s for wallet accounts.
///
/// Implementations must be `Send + Sync`; `AppState` stores the feed as an
/// optional `Arc<dyn DirectiveHydromancerFeed>`.
#[async_trait::async_trait]
pub trait DirectiveHydromancerFeed: Send + Sync {
    /// Adds `account` to the feed.
    ///
    /// NOTE(review): presumably an account must be added before
    /// `wait_for_fills` reports anything for it — confirm.
    async fn add_account(&self, account: WalletAddress);
    /// Returns fills for `account`, waiting according to `timeout`.
    ///
    /// NOTE(review): the return type has no error channel, so an expired
    /// timeout presumably yields an empty `Vec` — confirm against the
    /// implementations.
    async fn wait_for_fills(
        &self,
        account: &WalletAddress,
        timeout: Duration,
    ) -> Vec<DirectiveHydromancerFill>;
}
73
/// Reads the USDC balance of the exchange pool.
///
/// The log messages of the HyperCore-backed implementation in this file show
/// the value is used for withdrawal liquidity checks. Implementations must be
/// `Send + Sync`; `AppState` holds one as `Arc<dyn ExchangePoolLiquidityReader>`.
#[async_trait::async_trait]
pub trait ExchangePoolLiquidityReader: Send + Sync {
    /// Returns the pool's USDC balance as a [`Decimal`].
    ///
    /// # Errors
    ///
    /// Returns an `ApiError` when the balance cannot be obtained; the
    /// implementations in this file use `503 Service Unavailable` for a missing
    /// configuration and an internal error for lookup failures.
    async fn usdc_pool_balance(&self) -> Result<Decimal, crate::error::ApiError>;
}
78
79pub struct HypercoreExchangePoolLiquidityReader {
80 info_url: Option<String>,
81 exchange_address: Option<Address>,
82}
83
84impl HypercoreExchangePoolLiquidityReader {
85 pub fn new(info_url: Option<String>, exchange_address: Option<Address>) -> Self {
86 Self {
87 info_url,
88 exchange_address,
89 }
90 }
91}
92
93#[async_trait::async_trait]
94impl ExchangePoolLiquidityReader for HypercoreExchangePoolLiquidityReader {
95 async fn usdc_pool_balance(&self) -> Result<Decimal, crate::error::ApiError> {
96 let info_url = self.info_url.as_ref().ok_or_else(|| {
97 crate::error::ApiError::new(
98 StatusCode::SERVICE_UNAVAILABLE,
99 "service_unavailable",
100 EXCHANGE_POOL_LIQUIDITY_NOT_CONFIGURED,
101 )
102 })?;
103 let exchange_address = self.exchange_address.ok_or_else(|| {
104 crate::error::ApiError::new(
105 StatusCode::SERVICE_UNAVAILABLE,
106 "service_unavailable",
107 EXCHANGE_POOL_LIQUIDITY_NOT_CONFIGURED,
108 )
109 })?;
110
111 let client = reqwest::Client::builder()
112 .timeout(Duration::from_secs(5))
113 .build()
114 .map_err(|error| {
115 error!("Failed to build HyperCore info client: {}", error);
116 crate::error::ApiError::internal_error(EXCHANGE_POOL_LIQUIDITY_VALIDATION_FAILED)
117 })?;
118 let response = client
119 .post(info_url)
120 .json(&serde_json::json!({
121 "type": "clearinghouseState",
122 "user": exchange_address.to_string(),
123 }))
124 .send()
125 .await
126 .map_err(|error| {
127 error!(
128 "Failed to query Exchange withdrawable balance for withdrawal liquidity: {}",
129 error
130 );
131 crate::error::ApiError::internal_error(EXCHANGE_POOL_LIQUIDITY_VALIDATION_FAILED)
132 })?;
133 if !response.status().is_success() {
134 error!(
135 status = %response.status(),
136 "HyperCore info returned non-success status for Exchange pool liquidity check"
137 );
138 return Err(crate::error::ApiError::internal_error(
139 EXCHANGE_POOL_LIQUIDITY_VALIDATION_FAILED,
140 ));
141 }
142
143 let json = response
144 .json::<serde_json::Value>()
145 .await
146 .map_err(|error| {
147 error!(
148 "Failed to decode Exchange withdrawable balance for withdrawal liquidity: {}",
149 error
150 );
151 crate::error::ApiError::internal_error(EXCHANGE_POOL_LIQUIDITY_VALIDATION_FAILED)
152 })?;
153
154 parse_hypercore_withdrawable_balance(&json).map_err(|error| {
155 error!(
156 "Failed to parse Exchange USDC pool balance for withdrawal liquidity: {}",
157 error
158 );
159 crate::error::ApiError::internal_error(EXCHANGE_POOL_LIQUIDITY_VALIDATION_FAILED)
160 })
161 }
162}
163
164pub struct UnavailableExchangePoolLiquidityReader;
165
166#[async_trait::async_trait]
167impl ExchangePoolLiquidityReader for UnavailableExchangePoolLiquidityReader {
168 async fn usdc_pool_balance(&self) -> Result<Decimal, crate::error::ApiError> {
169 Err(crate::error::ApiError::new(
170 StatusCode::SERVICE_UNAVAILABLE,
171 "service_unavailable",
172 EXCHANGE_POOL_LIQUIDITY_NOT_CONFIGURED,
173 ))
174 }
175}
176
177pub(crate) fn parse_hypercore_withdrawable_balance(
178 json: &serde_json::Value,
179) -> anyhow::Result<Decimal> {
180 let withdrawable = json
181 .get("withdrawable")
182 .and_then(|withdrawable| withdrawable.as_str())
183 .ok_or_else(|| anyhow::anyhow!("clearinghouseState response missing withdrawable"))?;
184
185 Decimal::from_str(withdrawable)
186 .map_err(|error| anyhow::anyhow!("invalid withdrawable '{}': {}", withdrawable, error))
187}
188
189pub use hypercall_runtime_api::{ApiAsyncDb, ApiRsmStateDb, ApiSyncDb};
190
/// Shared application state.
///
/// The struct derives `Clone`; its fields are predominantly `Arc` handles and
/// channel senders. Subsystems that may be absent are declared as `Option`.
///
/// NOTE(review): presumably this is the state handed to the axum request
/// handlers — confirm against the router setup.
#[derive(Clone)]
pub struct AppState {
    pub db: Arc<dyn ApiAsyncDb>,
    // Request channels that are always present (bounded `mpsc::Sender`s).
    pub order_sender: mpsc::Sender<hypercall_runtime_api::UnifiedEngineRequest>,
    pub market_sender: mpsc::Sender<MarketRequest>,
    pub engine_quiesce_sender: mpsc::Sender<hypercall_runtime_api::EngineQuiesceRequest>,
    pub margin_mode_sender: mpsc::Sender<hypercall_runtime_api::MarginModeUpdateRequest>,
    pub agent_auth_sender: mpsc::Sender<hypercall_runtime_api::AgentAuthRequest>,
    pub agent_auth: Arc<dyn hypercall_runtime_api::AgentAuthProvider>,
    pub auth_failure_recorder: Arc<dyn AuthFailureRecorder>,
    pub metrics_renderer: Arc<dyn MetricsRenderer>,
    pub greeks_cache: Arc<dyn GreeksCacheReader>,
    pub portfolio_cache: Arc<dyn PortfolioCacheApi>,
    pub instruments_cache: Arc<dyn InstrumentsCacheReader>,
    pub market_stats_cache: Arc<dyn MarketStatsCacheReader>,
    pub markets_snapshot_cache: Arc<crate::caches::MarketsSnapshotCache>,
    pub instruments_snapshot_cache: Arc<crate::caches::InstrumentsSnapshotCache>,
    pub options_summary_snapshot_cache: Arc<crate::caches::OptionsSummarySnapshotCache>,
    /// The only unbounded sender in this struct.
    pub event_bus_sender: mpsc::UnboundedSender<hypercall_types::EngineMessage>,
    pub chain_auth: Arc<dyn crate::directives::onchain::ChainAuthReader>,
    pub exchange_address: Option<Address>,
    pub gas_provider_service: Arc<crate::gas_provider::GasProviderService>,
    pub transaction_request_journal: Option<Arc<dyn TransactionRequestJournal>>,
    pub mmp_cache: Arc<dyn MmpCacheApi>,
    /// Read by `submit_tier_update_command` for tier, margin mode and trading limits.
    pub tier_cache: Arc<dyn TierCacheApi>,
    pub risk_vol_oracle: hypercall_vol_oracle::SharedVolOracle,
    pub readiness: Arc<dyn ReadinessGate>,
    pub sync_db: Option<Arc<dyn ApiSyncDb>>,
    pub rate_limit_cache: Arc<crate::caches::rate_limit::RateLimitCache>,
    pub balance_provider: Arc<dyn BalanceProvider>,
    pub balance_snapshot_provider: Arc<dyn EngineBalanceSnapshotProvider>,
    // Request channels that may be absent.
    pub deposit_sender: Option<mpsc::Sender<hypercall_runtime_api::DepositRequest>>,
    pub option_deposit_sender: Option<mpsc::Sender<hypercall_runtime_api::OptionDepositRequest>>,
    pub option_withdrawal_sender:
        Option<mpsc::Sender<hypercall_runtime_api::OptionWithdrawalRequest>>,
    pub cash_withdrawal_sender: Option<mpsc::Sender<hypercall_runtime_api::CashWithdrawalRequest>>,
    /// `submit_tier_update_command` returns an error when this is `None`.
    pub tier_update_sender: Option<mpsc::Sender<TierUpdateRequest>>,
    pub pm_settlement_admin_sender:
        Option<mpsc::Sender<hypercall_runtime_api::PmSettlementAdminRequest>>,
    pub engine_journal_reader: Option<Arc<dyn EngineJournalReader>>,
    pub runtime_config: Arc<AppRuntimeConfig>,
    pub build_info: BuildInfo,
    pub collateral_registry: Arc<catalog_manager::CollateralRegistry>,
    /// NOTE(review): stored as a plain `String` and copied on every clone of
    /// the state — make sure it is never logged.
    pub admin_api_key: Option<String>,
    pub allow_unauthenticated_monitoring: bool,
    pub boot_id: String,
    pub server_started_at: chrono::DateTime<chrono::Utc>,
    pub trading_halt: Arc<RwLock<TradingHaltState>>,
    pub rsm_signer: Option<Arc<dyn hypercall_signer::RsmSigner>>,
    pub rsm_signer_address: Option<hypercall_types::WalletAddress>,
    pub hl_info_url: Option<String>,
    pub exchange_pool_liquidity_reader: Arc<dyn ExchangePoolLiquidityReader>,
    pub hydromancer_feed: Option<Arc<dyn DirectiveHydromancerFeed>>,
    pub quote_provider: Arc<dyn hypercall_runtime_api::QuoteProvider>,
    pub engine_state_digest_provider: Arc<dyn EngineStateDigestProvider>,
    pub order_snapshot: Arc<dyn hypercall_runtime_api::OrderSnapshotProvider>,
    pub competition_service: Arc<hypercall_competition::CompetitionService>,
    pub candle_source: Arc<dyn crate::candles::UnderlyingCandleSource>,
    pub underlying_to_candle_coin: Arc<std::collections::HashMap<String, String>>,
    pub bbo_snapshot_reader: Arc<dyn BboReferenceAskReader>,
    pub indicative_cache: Option<Arc<crate::rfq::indicative_quote_cache::IndicativeQuoteCache>>,
    pub rfq_manager: Option<Arc<crate::rfq::rfq_manager::RfqManager>>,
    pub username_service: Arc<crate::username_service::UsernameService>,
    pub push_service: Option<Arc<crate::push_service::PushNotificationService>>,
    pub standby_controller: Option<Arc<dyn StandbyPromoter>>,
    pub notification_service: Option<Arc<crate::notification_service::NotificationService>>,
    /// One-shot sender kept behind a mutex and an inner `Option`.
    /// NOTE(review): presumably taken out on first use so promotion can only
    /// be signalled once — confirm.
    pub standby_promote: Option<Arc<tokio::sync::Mutex<Option<tokio::sync::oneshot::Sender<()>>>>>,
    pub standby_progress: Option<Arc<dyn StandbyReplayProgress>>,
    pub startup_progress: Option<Arc<dyn StartupProgressReader>>,
    pub recovery_safety_report: crate::recovery_safety::SharedRecoverySafetyReport,
    pub drain_signal: Arc<tokio::sync::Notify>,
    pub is_draining: Arc<std::sync::atomic::AtomicBool>,
    pub shutdown: hypercall_runtime_api::Shutdown,
}
268
269pub async fn submit_tier_update_command(
270 state: &AppState,
271 wallet: hypercall_types::WalletAddress,
272) -> Result<(), String> {
273 let sender = state
274 .tier_update_sender
275 .as_ref()
276 .ok_or_else(|| "tier update engine channel is not configured".to_string())?;
277
278 let tier = state
279 .tier_cache
280 .get_tier(&wallet)
281 .await
282 .map(|tier| tier.tier)
283 .ok_or_else(|| format!("missing tier for tier update: wallet={wallet}"))?;
284 let margin_mode = state
285 .tier_cache
286 .get_margin_mode(&wallet)
287 .await
288 .map_err(|e| format!("failed to get margin mode for tier update: {}", e))?;
289 let trading_limits = state.tier_cache.get_trading_limits_async(&wallet).await;
290 let (tx, rx) = tokio::sync::oneshot::channel();
291
292 sender
293 .send(TierUpdateRequest {
294 wallet,
295 margin_mode,
296 tier,
297 trading_limits,
298 timestamp_ms: hypercall_types::utils::get_timestamp_millis(),
299 applied_tx: Some(tx),
300 })
301 .await
302 .map_err(|_| "tier update engine channel closed".to_string())?;
303
304 tokio::time::timeout(ENGINE_RESPONSE_TIMEOUT, rx)
305 .await
306 .map_err(|_| "tier update apply acknowledgement timed out".to_string())?
307 .map_err(|_| "tier update apply acknowledgement dropped".to_string())?
308}