Skip to main content

hypercall_api/
state.rs

1use std::path::PathBuf;
2use std::str::FromStr;
3use std::sync::Arc;
4use std::time::Duration;
5
6use axum::http::StatusCode;
7use rust_decimal::Decimal;
8use tokio::sync::{mpsc, RwLock};
9use tracing::error;
10
11use super::caches::options_summary::BboReferenceAskReader;
12use super::observability_boundary::{AuthFailureRecorder, BuildInfo, MetricsRenderer};
13use super::runtime_status::{
14    ReadinessGate, StandbyPromoter, StandbyReplayProgress, StartupProgressReader,
15};
16use super::trading_halt::TradingHaltState;
17use crate::boundary::engine::{
18    EngineJournalReader, EngineStateDigestProvider, TransactionRequestJournal,
19};
20use crate::boundary::market_inputs::{
21    GreeksCacheReader, InstrumentsCacheReader, MarketStatsCacheReader,
22};
23use crate::boundary::read_models::{
24    BalanceProvider, EngineBalanceSnapshotProvider, MmpCacheApi, PortfolioCacheApi, TierCacheApi,
25};
26use alloy::primitives::Address;
27use hypercall_runtime_api::{MarketRequest, TierUpdateRequest};
28use hypercall_types::WalletAddress;
29
/// Maximum time to wait for an acknowledgement on an engine response channel
/// (10 seconds). In this file it bounds the wait in `submit_tier_update_command`.
30pub(crate) const ENGINE_RESPONSE_TIMEOUT: Duration = Duration::from_secs(10);
/// Message for the `503 service_unavailable` error returned when the pool
/// liquidity check has no info URL or no exchange address to work with.
31const EXCHANGE_POOL_LIQUIDITY_NOT_CONFIGURED: &str =
32    "exchange pool liquidity check is not configured";
/// Message for the internal error returned when the pool liquidity query
/// fails (client build, request, HTTP status, decoding, or parsing).
33const EXCHANGE_POOL_LIQUIDITY_VALIDATION_FAILED: &str =
34    "failed to validate exchange pool liquidity";
35
/// Runtime configuration values for the API process.
///
/// `AppState` carries this struct in its `runtime_config` field as an
/// `Arc<AppRuntimeConfig>`.
36#[derive(Clone)]
37pub struct AppRuntimeConfig {
38    pub testnet_mode: bool,
39    pub trade_explorer_url_template: Option<String>,
40    pub wal_path: PathBuf,
41    pub db_host: String,
42    pub db_name: String,
43    pub directive_chain_id: u64,
44    /// Chain ID used for EIP-712 option order signing domain.
45    /// Testnet = 998, mainnet = 999.
46    pub signing_chain_id: u64,
    // NOTE(review): kept as a plain `String` here, while `AppState::exchange_address`
    // is an `Option<Address>` — confirm whether both describe the same contract.
47    pub exchange_contract_address: String,
48    pub portfolio_margin_pool_enabled: bool,
    // NOTE(review): the two allowlists below are presumably the wallets permitted to
    // use portfolio margin mode / settlement; the enforcement is not in this file.
49    pub portfolio_margin_mode_allowlist: Vec<WalletAddress>,
50    pub portfolio_margin_settlement_allowlist: Vec<WalletAddress>,
    // This field exists only when the `rsm-state` feature is enabled.
51    #[cfg(feature = "rsm-state")]
52    pub rsm_environment: hypercall_db::ValidatorRsmEnvironment,
53}
54
/// One fill record as returned by `DirectiveHydromancerFeed::wait_for_fills`.
///
/// All fields except `time` are carried as strings; this file does not parse
/// or validate them.
55#[derive(Debug, Clone)]
56pub struct DirectiveHydromancerFill {
57    pub coin: String,
58    pub side: String,
    // NOTE(review): `px` / `sz` are presumably price and size in the feed's
    // string encoding — confirm against the feed implementation.
59    pub px: String,
60    pub sz: String,
    // NOTE(review): presumably a timestamp; the unit is not visible here.
61    pub time: u64,
62}
63
/// Async interface to a feed that reports fills per account.
///
/// `AppState` holds an optional implementation as
/// `Option<Arc<dyn DirectiveHydromancerFeed>>`; the `Send + Sync` bound is what
/// allows the trait object to be shared that way.
64#[async_trait::async_trait]
65pub trait DirectiveHydromancerFeed: Send + Sync {
    /// Adds `account` to the feed.
    ///
    /// NOTE(review): presumably this starts tracking fills for the account so
    /// that `wait_for_fills` can later report them — confirm with implementors.
66    async fn add_account(&self, account: WalletAddress);
    /// Returns fills for `account`, given a `timeout` for the wait.
    ///
    /// NOTE(review): the signature has no error channel, so a timeout is
    /// presumably reported as an empty vector — confirm with implementors.
67    async fn wait_for_fills(
68        &self,
69        account: &WalletAddress,
70        timeout: Duration,
71    ) -> Vec<DirectiveHydromancerFill>;
72}
73
/// Async source of the Exchange's USDC pool balance.
///
/// This file provides two implementations:
/// `HypercoreExchangePoolLiquidityReader`, which queries a HyperCore info
/// endpoint, and `UnavailableExchangePoolLiquidityReader`, which always fails.
/// `AppState` stores the chosen one as `Arc<dyn ExchangePoolLiquidityReader>`.
74#[async_trait::async_trait]
75pub trait ExchangePoolLiquidityReader: Send + Sync {
    /// Returns the current USDC pool balance, or an `ApiError` when the
    /// balance cannot be obtained.
76    async fn usdc_pool_balance(&self) -> Result<Decimal, crate::error::ApiError>;
77}
78
79pub struct HypercoreExchangePoolLiquidityReader {
80    info_url: Option<String>,
81    exchange_address: Option<Address>,
82}
83
84impl HypercoreExchangePoolLiquidityReader {
85    pub fn new(info_url: Option<String>, exchange_address: Option<Address>) -> Self {
86        Self {
87            info_url,
88            exchange_address,
89        }
90    }
91}
92
93#[async_trait::async_trait]
94impl ExchangePoolLiquidityReader for HypercoreExchangePoolLiquidityReader {
95    async fn usdc_pool_balance(&self) -> Result<Decimal, crate::error::ApiError> {
96        let info_url = self.info_url.as_ref().ok_or_else(|| {
97            crate::error::ApiError::new(
98                StatusCode::SERVICE_UNAVAILABLE,
99                "service_unavailable",
100                EXCHANGE_POOL_LIQUIDITY_NOT_CONFIGURED,
101            )
102        })?;
103        let exchange_address = self.exchange_address.ok_or_else(|| {
104            crate::error::ApiError::new(
105                StatusCode::SERVICE_UNAVAILABLE,
106                "service_unavailable",
107                EXCHANGE_POOL_LIQUIDITY_NOT_CONFIGURED,
108            )
109        })?;
110
111        let client = reqwest::Client::builder()
112            .timeout(Duration::from_secs(5))
113            .build()
114            .map_err(|error| {
115                error!("Failed to build HyperCore info client: {}", error);
116                crate::error::ApiError::internal_error(EXCHANGE_POOL_LIQUIDITY_VALIDATION_FAILED)
117            })?;
118        let response = client
119            .post(info_url)
120            .json(&serde_json::json!({
121                "type": "clearinghouseState",
122                "user": exchange_address.to_string(),
123            }))
124            .send()
125            .await
126            .map_err(|error| {
127                error!(
128                    "Failed to query Exchange withdrawable balance for withdrawal liquidity: {}",
129                    error
130                );
131                crate::error::ApiError::internal_error(EXCHANGE_POOL_LIQUIDITY_VALIDATION_FAILED)
132            })?;
133        if !response.status().is_success() {
134            error!(
135                status = %response.status(),
136                "HyperCore info returned non-success status for Exchange pool liquidity check"
137            );
138            return Err(crate::error::ApiError::internal_error(
139                EXCHANGE_POOL_LIQUIDITY_VALIDATION_FAILED,
140            ));
141        }
142
143        let json = response
144            .json::<serde_json::Value>()
145            .await
146            .map_err(|error| {
147                error!(
148                    "Failed to decode Exchange withdrawable balance for withdrawal liquidity: {}",
149                    error
150                );
151                crate::error::ApiError::internal_error(EXCHANGE_POOL_LIQUIDITY_VALIDATION_FAILED)
152            })?;
153
154        parse_hypercore_withdrawable_balance(&json).map_err(|error| {
155            error!(
156                "Failed to parse Exchange USDC pool balance for withdrawal liquidity: {}",
157                error
158            );
159            crate::error::ApiError::internal_error(EXCHANGE_POOL_LIQUIDITY_VALIDATION_FAILED)
160        })
161    }
162}
163
164pub struct UnavailableExchangePoolLiquidityReader;
165
166#[async_trait::async_trait]
167impl ExchangePoolLiquidityReader for UnavailableExchangePoolLiquidityReader {
168    async fn usdc_pool_balance(&self) -> Result<Decimal, crate::error::ApiError> {
169        Err(crate::error::ApiError::new(
170            StatusCode::SERVICE_UNAVAILABLE,
171            "service_unavailable",
172            EXCHANGE_POOL_LIQUIDITY_NOT_CONFIGURED,
173        ))
174    }
175}
176
177pub(crate) fn parse_hypercore_withdrawable_balance(
178    json: &serde_json::Value,
179) -> anyhow::Result<Decimal> {
180    let withdrawable = json
181        .get("withdrawable")
182        .and_then(|withdrawable| withdrawable.as_str())
183        .ok_or_else(|| anyhow::anyhow!("clearinghouseState response missing withdrawable"))?;
184
185    Decimal::from_str(withdrawable)
186        .map_err(|error| anyhow::anyhow!("invalid withdrawable '{}': {}", withdrawable, error))
187}
188
189pub use hypercall_runtime_api::{ApiAsyncDb, ApiRsmStateDb, ApiSyncDb};
190
/// Shared application state for the API layer.
///
/// The struct derives `Clone`. Most fields are `Arc` handles or channel
/// senders; the rest are plain owned values (`String`, `Option<String>`,
/// `bool`, timestamps) that are copied along with it.
///
/// Field order is left as declared: Rust drops struct fields in declaration
/// order, so reordering could change when channels and handles are released.
191#[derive(Clone)]
192pub struct AppState {
193    pub db: Arc<dyn ApiAsyncDb>,
    // Request channels that are always present.
    // NOTE(review): presumably consumed by the engine runtime; the receiving
    // ends are not in this file.
194    pub order_sender: mpsc::Sender<hypercall_runtime_api::UnifiedEngineRequest>,
195    pub market_sender: mpsc::Sender<MarketRequest>,
196    pub engine_quiesce_sender: mpsc::Sender<hypercall_runtime_api::EngineQuiesceRequest>,
197    pub margin_mode_sender: mpsc::Sender<hypercall_runtime_api::MarginModeUpdateRequest>,
198    pub agent_auth_sender: mpsc::Sender<hypercall_runtime_api::AgentAuthRequest>,
199    pub agent_auth: Arc<dyn hypercall_runtime_api::AgentAuthProvider>,
200    pub auth_failure_recorder: Arc<dyn AuthFailureRecorder>,
201    pub metrics_renderer: Arc<dyn MetricsRenderer>,
202    pub greeks_cache: Arc<dyn GreeksCacheReader>,
203    pub portfolio_cache: Arc<dyn PortfolioCacheApi>,
204    pub instruments_cache: Arc<dyn InstrumentsCacheReader>,
205    pub market_stats_cache: Arc<dyn MarketStatsCacheReader>,
206    pub markets_snapshot_cache: Arc<crate::caches::MarketsSnapshotCache>,
207    pub instruments_snapshot_cache: Arc<crate::caches::InstrumentsSnapshotCache>,
208    pub options_summary_snapshot_cache: Arc<crate::caches::OptionsSummarySnapshotCache>,
209    pub event_bus_sender: mpsc::UnboundedSender<hypercall_types::EngineMessage>,
210    pub chain_auth: Arc<dyn crate::directives::onchain::ChainAuthReader>,
211    pub exchange_address: Option<Address>,
212    pub gas_provider_service: Arc<crate::gas_provider::GasProviderService>,
213    pub transaction_request_journal: Option<Arc<dyn TransactionRequestJournal>>,
214    pub mmp_cache: Arc<dyn MmpCacheApi>,
    // Read by `submit_tier_update_command` for a wallet's tier, margin mode
    // and trading limits.
215    pub tier_cache: Arc<dyn TierCacheApi>,
216    pub risk_vol_oracle: hypercall_vol_oracle::SharedVolOracle,
217    pub readiness: Arc<dyn ReadinessGate>,
218    pub sync_db: Option<Arc<dyn ApiSyncDb>>,
219    pub rate_limit_cache: Arc<crate::caches::rate_limit::RateLimitCache>,
220    pub balance_provider: Arc<dyn BalanceProvider>,
221    pub balance_snapshot_provider: Arc<dyn EngineBalanceSnapshotProvider>,
    // Optional request channels. `None` means the channel is not configured;
    // `submit_tier_update_command` shows the handling for `tier_update_sender`
    // (it returns an error instead of sending).
222    pub deposit_sender: Option<mpsc::Sender<hypercall_runtime_api::DepositRequest>>,
223    pub option_deposit_sender: Option<mpsc::Sender<hypercall_runtime_api::OptionDepositRequest>>,
224    pub option_withdrawal_sender:
225        Option<mpsc::Sender<hypercall_runtime_api::OptionWithdrawalRequest>>,
226    pub cash_withdrawal_sender: Option<mpsc::Sender<hypercall_runtime_api::CashWithdrawalRequest>>,
227    pub tier_update_sender: Option<mpsc::Sender<TierUpdateRequest>>,
228    pub pm_settlement_admin_sender:
229        Option<mpsc::Sender<hypercall_runtime_api::PmSettlementAdminRequest>>,
230    pub engine_journal_reader: Option<Arc<dyn EngineJournalReader>>,
231    pub runtime_config: Arc<AppRuntimeConfig>,
232    pub build_info: BuildInfo,
233    pub collateral_registry: Arc<catalog_manager::CollateralRegistry>,
234    pub admin_api_key: Option<String>,
235    /// Allow /monitoring/* without an admin key. Only the development
236    /// environment sets this; everywhere else the monitoring middleware
237    /// fails closed when `admin_api_key` is absent.
238    pub allow_unauthenticated_monitoring: bool,
239    pub boot_id: String,
240    pub server_started_at: chrono::DateTime<chrono::Utc>,
241    pub trading_halt: Arc<RwLock<TradingHaltState>>,
242    pub rsm_signer: Option<Arc<dyn hypercall_signer::RsmSigner>>,
243    pub rsm_signer_address: Option<hypercall_types::WalletAddress>,
244    pub hl_info_url: Option<String>,
    // Source of the Exchange USDC pool balance; see `ExchangePoolLiquidityReader`.
245    pub exchange_pool_liquidity_reader: Arc<dyn ExchangePoolLiquidityReader>,
246    pub hydromancer_feed: Option<Arc<dyn DirectiveHydromancerFeed>>,
247    pub quote_provider: Arc<dyn hypercall_runtime_api::QuoteProvider>,
248    pub engine_state_digest_provider: Arc<dyn EngineStateDigestProvider>,
249    pub order_snapshot: Arc<dyn hypercall_runtime_api::OrderSnapshotProvider>,
250    pub competition_service: Arc<hypercall_competition::CompetitionService>,
251    pub candle_source: Arc<dyn crate::candles::UnderlyingCandleSource>,
252    pub underlying_to_candle_coin: Arc<std::collections::HashMap<String, String>>,
253    pub bbo_snapshot_reader: Arc<dyn BboReferenceAskReader>,
254    pub indicative_cache: Option<Arc<crate::rfq::indicative_quote_cache::IndicativeQuoteCache>>,
255    pub rfq_manager: Option<Arc<crate::rfq::rfq_manager::RfqManager>>,
256    pub username_service: Arc<crate::username_service::UsernameService>,
257    pub push_service: Option<Arc<crate::push_service::PushNotificationService>>,
258    pub standby_controller: Option<Arc<dyn StandbyPromoter>>,
259    pub notification_service: Option<Arc<crate::notification_service::NotificationService>>,
    // A oneshot sender behind a mutex and an inner `Option`.
    // NOTE(review): presumably so the promotion signal can be taken out and
    // fired at most once — confirm against the code that consumes it.
260    pub standby_promote: Option<Arc<tokio::sync::Mutex<Option<tokio::sync::oneshot::Sender<()>>>>>,
261    pub standby_progress: Option<Arc<dyn StandbyReplayProgress>>,
262    pub startup_progress: Option<Arc<dyn StartupProgressReader>>,
263    pub recovery_safety_report: crate::recovery_safety::SharedRecoverySafetyReport,
    // NOTE(review): `drain_signal` / `is_draining` presumably coordinate
    // connection draining before shutdown; their users are not in this file.
264    pub drain_signal: Arc<tokio::sync::Notify>,
265    pub is_draining: Arc<std::sync::atomic::AtomicBool>,
266    pub shutdown: hypercall_runtime_api::Shutdown,
267}
268
269pub async fn submit_tier_update_command(
270    state: &AppState,
271    wallet: hypercall_types::WalletAddress,
272) -> Result<(), String> {
273    let sender = state
274        .tier_update_sender
275        .as_ref()
276        .ok_or_else(|| "tier update engine channel is not configured".to_string())?;
277
278    let tier = state
279        .tier_cache
280        .get_tier(&wallet)
281        .await
282        .map(|tier| tier.tier)
283        .ok_or_else(|| format!("missing tier for tier update: wallet={wallet}"))?;
284    let margin_mode = state
285        .tier_cache
286        .get_margin_mode(&wallet)
287        .await
288        .map_err(|e| format!("failed to get margin mode for tier update: {}", e))?;
289    let trading_limits = state.tier_cache.get_trading_limits_async(&wallet).await;
290    let (tx, rx) = tokio::sync::oneshot::channel();
291
292    sender
293        .send(TierUpdateRequest {
294            wallet,
295            margin_mode,
296            tier,
297            trading_limits,
298            timestamp_ms: hypercall_types::utils::get_timestamp_millis(),
299            applied_tx: Some(tx),
300        })
301        .await
302        .map_err(|_| "tier update engine channel closed".to_string())?;
303
304    tokio::time::timeout(ENGINE_RESPONSE_TIMEOUT, rx)
305        .await
306        .map_err(|_| "tier update apply acknowledgement timed out".to_string())?
307        .map_err(|_| "tier update apply acknowledgement dropped".to_string())?
308}