Skip to main content

hypercall_runtime_api/
quote_provider_cache.rs

1use anyhow::Result;
2use hypercall_db::RfqWriter;
3use hypercall_types::{QpStatus, QpTier, WalletAddress};
4use rust_decimal::Decimal;
5use std::collections::HashMap;
6use std::sync::Arc;
7use tokio::sync::RwLock;
8use tracing::info;
9
10/// Configuration for a registered quote provider.
11#[derive(Debug, Clone)]
12pub struct QuoteProviderConfig {
13    pub wallet: WalletAddress,
14    pub tier: QpTier,
15    pub status: QpStatus,
16    pub allowed_underlyings: Option<Vec<String>>,
17    pub max_notional_per_quote: Decimal,
18    pub max_open_notional: Decimal,
19}
20
21impl QuoteProviderConfig {
22    pub fn is_active(&self) -> bool {
23        self.status == QpStatus::Active
24    }
25
26    pub fn can_quote_underlying(&self, underlying: &str) -> bool {
27        match &self.allowed_underlyings {
28            None => true, // None = all underlyings allowed
29            Some(underlyings) => underlyings.iter().any(|u| u == underlying),
30        }
31    }
32}
33
34impl TryFrom<hypercall_db::QuoteProviderRecord> for QuoteProviderConfig {
35    type Error = String;
36
37    fn try_from(record: hypercall_db::QuoteProviderRecord) -> std::result::Result<Self, String> {
38        let tier = match record.tier.as_str() {
39            "qp1" => QpTier::Qp1,
40            "qp2" => QpTier::Qp2,
41            "qp3_internal" => QpTier::Qp3Internal,
42            other => return Err(format!("Unknown QP tier: {}", other)),
43        };
44        let status = match record.status.as_str() {
45            "active" => QpStatus::Active,
46            "suspended" => QpStatus::Suspended,
47            other => return Err(format!("Unknown QP status: {}", other)),
48        };
49        Ok(QuoteProviderConfig {
50            wallet: record.wallet_address,
51            tier,
52            status,
53            allowed_underlyings: record.allowed_underlyings,
54            max_notional_per_quote: record.max_notional_per_quote,
55            max_open_notional: record.max_open_notional,
56        })
57    }
58}
59
60pub struct QuoteProviderCache {
61    providers: Arc<RwLock<HashMap<WalletAddress, QuoteProviderConfig>>>,
62    db: Arc<dyn RfqWriter>,
63}
64
65impl QuoteProviderCache {
66    pub fn new(db: Arc<dyn RfqWriter>) -> Self {
67        Self {
68            providers: Arc::new(RwLock::new(HashMap::new())),
69            db,
70        }
71    }
72
73    /// Load all quote providers from database at startup.
74    pub async fn load_from_db(&self) -> Result<()> {
75        let records = self.db.get_all_quote_providers_sync()?;
76        let mut cache = self.providers.write().await;
77        for record in records {
78            match QuoteProviderConfig::try_from(record) {
79                Ok(config) => {
80                    cache.insert(config.wallet, config);
81                }
82                Err(e) => {
83                    tracing::warn!("Skipping invalid quote provider: {}", e);
84                }
85            }
86        }
87        info!("Loaded {} quote providers from database", cache.len());
88        Ok(())
89    }
90
91    /// Check if a wallet is a registered and active QP.
92    ///
93    /// Takes the read lock asynchronously (rather than `try_read()`) so
94    /// concurrent admin writes (register/suspend) cannot transiently map
95    /// a valid active QP to "inactive" and reject legitimate QP websocket
96    /// connections or firm-quote responses. This is called on the QP
97    /// websocket auth path and on every inbound RFQ response, so silent
98    /// false negatives here look like flaky QP behavior to operators.
99    pub async fn is_active(&self, wallet: &WalletAddress) -> bool {
100        let cache = self.providers.read().await;
101        cache.get(wallet).map(|c| c.is_active()).unwrap_or(false)
102    }
103
104    /// Get all active QPs that can quote a given underlying.
105    ///
106    /// Takes the read lock asynchronously rather than using `try_read`,
107    /// which would silently return an empty vec under concurrent admin
108    /// writes (register/suspend). A false-empty result here causes
109    /// `submit_rfq` to fan out to zero QPs and eventually age into
110    /// `no_quotes` even though eligible providers exist.
111    pub async fn get_active_for_underlying(&self, underlying: &str) -> Vec<QuoteProviderConfig> {
112        let cache = self.providers.read().await;
113        cache
114            .values()
115            .filter(|c| c.is_active() && c.can_quote_underlying(underlying))
116            .cloned()
117            .collect()
118    }
119
120    /// Register or update a quote provider (admin operation).
121    pub async fn register(&self, config: QuoteProviderConfig) -> Result<()> {
122        let record = hypercall_db::QuoteProviderRecord {
123            wallet_address: config.wallet,
124            tier: config.tier.as_str().to_string(),
125            status: match config.status {
126                QpStatus::Active => "active".to_string(),
127                QpStatus::Suspended => "suspended".to_string(),
128            },
129            allowed_underlyings: config.allowed_underlyings.clone(),
130            max_notional_per_quote: config.max_notional_per_quote,
131            max_open_notional: config.max_open_notional,
132            created_at: chrono::Utc::now(),
133            updated_at: chrono::Utc::now(),
134        };
135
136        self.db.upsert_quote_provider_sync(&record)?;
137
138        let mut cache = self.providers.write().await;
139        cache.insert(config.wallet, config);
140        Ok(())
141    }
142
143    /// Update QP status (active/suspended).
144    pub async fn update_status(&self, wallet: &WalletAddress, status: QpStatus) -> Result<()> {
145        let status_str = match status {
146            QpStatus::Active => "active",
147            QpStatus::Suspended => "suspended",
148        };
149
150        self.db
151            .update_quote_provider_status_sync(wallet, status_str)?;
152
153        let mut cache = self.providers.write().await;
154        if let Some(config) = cache.get_mut(wallet) {
155            config.status = status;
156        }
157        Ok(())
158    }
159
160    /// Get all registered QPs (for admin listing).
161    pub async fn get_all(&self) -> Vec<QuoteProviderConfig> {
162        let cache = self.providers.read().await;
163        cache.values().cloned().collect()
164    }
165}
166
167#[cfg(test)]
168mod tests {
169    use super::*;
170    use rust_decimal_macros::dec;
171
172    fn test_wallet(id: u8) -> WalletAddress {
173        let mut bytes = [0u8; 20];
174        bytes[19] = id;
175        WalletAddress::from(alloy::primitives::Address::from(bytes))
176    }
177
178    fn make_config(id: u8, tier: QpTier, active: bool) -> QuoteProviderConfig {
179        QuoteProviderConfig {
180            wallet: test_wallet(id),
181            tier,
182            status: if active {
183                QpStatus::Active
184            } else {
185                QpStatus::Suspended
186            },
187            allowed_underlyings: None,
188            max_notional_per_quote: dec!(1000000),
189            max_open_notional: dec!(5000000),
190        }
191    }
192
193    fn make_config_with_underlyings(id: u8, underlyings: Vec<&str>) -> QuoteProviderConfig {
194        QuoteProviderConfig {
195            wallet: test_wallet(id),
196            tier: QpTier::Qp2,
197            status: QpStatus::Active,
198            allowed_underlyings: Some(underlyings.into_iter().map(String::from).collect()),
199            max_notional_per_quote: dec!(1000000),
200            max_open_notional: dec!(5000000),
201        }
202    }
203
204    #[test]
205    fn test_can_quote_underlying_all() {
206        let config = make_config(1, QpTier::Qp2, true);
207        assert!(config.can_quote_underlying("ETH"));
208        assert!(config.can_quote_underlying("BTC"));
209    }
210
211    #[test]
212    fn test_can_quote_underlying_restricted() {
213        let config = make_config_with_underlyings(1, vec!["ETH"]);
214        assert!(config.can_quote_underlying("ETH"));
215        assert!(!config.can_quote_underlying("BTC"));
216    }
217
218    #[test]
219    fn test_suspended_not_active() {
220        let config = make_config(1, QpTier::Qp1, false);
221        assert!(!config.is_active());
222    }
223
224    #[test]
225    fn test_try_from_record() {
226        let record = hypercall_db::QuoteProviderRecord {
227            wallet_address: test_wallet(1),
228            tier: "qp2".to_string(),
229            status: "active".to_string(),
230            allowed_underlyings: Some(vec!["ETH".to_string()]),
231            max_notional_per_quote: dec!(1000000),
232            max_open_notional: dec!(5000000),
233            created_at: chrono::Utc::now(),
234            updated_at: chrono::Utc::now(),
235        };
236
237        let config = QuoteProviderConfig::try_from(record).unwrap();
238        assert_eq!(config.tier, QpTier::Qp2);
239        assert_eq!(config.status, QpStatus::Active);
240        assert!(config.can_quote_underlying("ETH"));
241        assert!(!config.can_quote_underlying("BTC"));
242    }
243
244    #[test]
245    fn test_try_from_record_invalid_tier() {
246        let record = hypercall_db::QuoteProviderRecord {
247            wallet_address: test_wallet(1),
248            tier: "invalid".to_string(),
249            status: "active".to_string(),
250            allowed_underlyings: None,
251            max_notional_per_quote: dec!(1000000),
252            max_open_notional: dec!(5000000),
253            created_at: chrono::Utc::now(),
254            updated_at: chrono::Utc::now(),
255        };
256
257        assert!(QuoteProviderConfig::try_from(record).is_err());
258    }
259}