Skip to main content

hypercall_db_diesel/
rfq.rs

1//! RfqReader + RfqWriter implementations for DatabaseHandler.
2//!
3//! Persists quote provider registrations, RFQ records with legs,
4//! firm quotes, and status transitions.
5
6use anyhow::Result;
7use diesel::prelude::*;
8use diesel::RunQueryDsl;
9use rust_decimal::Decimal;
10
11use hypercall_types::WalletAddress;
12
13use crate::database_handler::DatabaseHandler;
14
15impl hypercall_db::RfqReader for DatabaseHandler {
16    fn get_all_quote_providers_sync(&self) -> Result<Vec<hypercall_db::QuoteProviderRecord>> {
17        use crate::schema::quote_providers::dsl;
18
19        let mut conn = self.pool().get()?;
20        let results = dsl::quote_providers.load::<crate::models::QuoteProviderRecord>(&mut conn)?;
21        Ok(results.into_iter().map(Into::into).collect())
22    }
23}
24
25impl hypercall_db::RfqWriter for DatabaseHandler {
26    fn upsert_quote_provider_sync(&self, qp: &hypercall_db::QuoteProviderRecord) -> Result<()> {
27        use crate::schema::quote_providers::dsl;
28
29        let new_qp = crate::models::NewQuoteProvider {
30            wallet_address: qp.wallet_address,
31            tier: qp.tier.clone(),
32            status: qp.status.clone(),
33            allowed_underlyings: qp.allowed_underlyings.clone(),
34            max_notional_per_quote: qp.max_notional_per_quote,
35            max_open_notional: qp.max_open_notional,
36        };
37
38        let mut conn = self.pool().get()?;
39        diesel::insert_into(dsl::quote_providers)
40            .values(&new_qp)
41            .on_conflict(dsl::wallet_address)
42            .do_update()
43            .set((
44                dsl::tier.eq(&new_qp.tier),
45                dsl::status.eq(&new_qp.status),
46                dsl::allowed_underlyings.eq(&new_qp.allowed_underlyings),
47                dsl::max_notional_per_quote.eq(&new_qp.max_notional_per_quote),
48                dsl::max_open_notional.eq(&new_qp.max_open_notional),
49                dsl::updated_at.eq(diesel::dsl::now),
50            ))
51            .execute(&mut conn)?;
52        Ok(())
53    }
54
55    fn update_quote_provider_status_sync(
56        &self,
57        wallet: &WalletAddress,
58        status: &str,
59    ) -> Result<()> {
60        use crate::schema::quote_providers::dsl;
61
62        let mut conn = self.pool().get()?;
63        let rows = diesel::update(dsl::quote_providers.filter(dsl::wallet_address.eq(wallet)))
64            .set((dsl::status.eq(status), dsl::updated_at.eq(diesel::dsl::now)))
65            .execute(&mut conn)?;
66        if rows == 0 {
67            anyhow::bail!("Quote provider {} not found in DB", wallet);
68        }
69        Ok(())
70    }
71
72    fn persist_rfq_record_sync(
73        &self,
74        rfq_id: &uuid::Uuid,
75        taker_wallet: &WalletAddress,
76        underlying: &str,
77        status: &str,
78        taker_signature: &str,
79        taker_nonce: u64,
80        legs_hash: &[u8; 32],
81        legs: &[(String, String, Decimal)],
82        expires_at_ms: u64,
83    ) -> Result<()> {
84        let mut conn = self.pool().get()?;
85        conn.transaction(|conn| {
86            let header_rows: usize = diesel::sql_query(
87                "INSERT INTO rfq_records (rfq_id, taker_wallet, underlying, status, taker_signature, taker_nonce, legs_hash, expires_at)
88                 VALUES ($1::uuid, $2, $3, $4, $5, $6, $7, to_timestamp($8::double precision / 1000))
89                 ON CONFLICT (rfq_id) DO NOTHING",
90            )
91            .bind::<diesel::sql_types::Text, _>(rfq_id.to_string())
92            .bind::<diesel::sql_types::Binary, _>(taker_wallet.as_bytes())
93            .bind::<diesel::sql_types::Text, _>(underlying)
94            .bind::<diesel::sql_types::Text, _>(status)
95            .bind::<diesel::sql_types::Text, _>(taker_signature)
96            .bind::<diesel::sql_types::BigInt, _>(taker_nonce as i64)
97            .bind::<diesel::sql_types::Binary, _>(legs_hash.as_slice())
98            .bind::<diesel::sql_types::Double, _>(expires_at_ms as f64)
99            .execute(conn)?;
100
101            if header_rows == 0 {
102                return Ok(());
103            }
104
105            for (i, (instrument, side, size)) in legs.iter().enumerate() {
106                diesel::sql_query(
107                    "INSERT INTO rfq_legs (rfq_id, instrument, side, size, leg_index)
108                     VALUES ($1::uuid, $2, $3, $4, $5)
109                     ON CONFLICT (rfq_id, leg_index) DO NOTHING",
110                )
111                .bind::<diesel::sql_types::Text, _>(rfq_id.to_string())
112                .bind::<diesel::sql_types::Text, _>(instrument.as_str())
113                .bind::<diesel::sql_types::Text, _>(side.as_str())
114                .bind::<diesel::sql_types::Numeric, _>(*size)
115                .bind::<diesel::sql_types::SmallInt, _>(i as i16)
116                .execute(conn)?;
117            }
118
119            Ok(())
120        })
121    }
122
123    fn persist_rfq_quote_sync(
124        &self,
125        quote_id: &uuid::Uuid,
126        rfq_id: &uuid::Uuid,
127        qp_wallet: &WalletAddress,
128        net_premium: Decimal,
129        valid_for_ms: u64,
130        qp_signature: &str,
131        qp_nonce: u64,
132        legs: &[(String, String, Decimal, Decimal)],
133        expires_at_ms: u64,
134    ) -> Result<()> {
135        let mut conn = self.pool().get()?;
136        conn.transaction(|conn| {
137            diesel::sql_query(
138                "INSERT INTO rfq_quotes (quote_id, rfq_id, qp_wallet, net_premium, valid_for_ms, qp_signature, qp_nonce, expires_at)
139                 VALUES ($1::uuid, $2::uuid, $3, $4, $5, $6, $7, to_timestamp($8::double precision / 1000))
140                 ON CONFLICT (quote_id) DO NOTHING",
141            )
142            .bind::<diesel::sql_types::Text, _>(quote_id.to_string())
143            .bind::<diesel::sql_types::Text, _>(rfq_id.to_string())
144            .bind::<diesel::sql_types::Binary, _>(qp_wallet.as_bytes())
145            .bind::<diesel::sql_types::Numeric, _>(net_premium)
146            .bind::<diesel::sql_types::BigInt, _>(valid_for_ms as i64)
147            .bind::<diesel::sql_types::Text, _>(qp_signature)
148            .bind::<diesel::sql_types::BigInt, _>(qp_nonce as i64)
149            .bind::<diesel::sql_types::Double, _>(expires_at_ms as f64)
150            .execute(conn)?;
151
152            for (i, (instrument, side, price, size)) in legs.iter().enumerate() {
153                diesel::sql_query(
154                    "INSERT INTO rfq_quote_legs (quote_id, instrument, side, price, size, leg_index)
155                     VALUES ($1::uuid, $2, $3, $4, $5, $6)
156                     ON CONFLICT (quote_id, leg_index) DO NOTHING",
157                )
158                .bind::<diesel::sql_types::Text, _>(quote_id.to_string())
159                .bind::<diesel::sql_types::Text, _>(instrument.as_str())
160                .bind::<diesel::sql_types::Text, _>(side.as_str())
161                .bind::<diesel::sql_types::Numeric, _>(*price)
162                .bind::<diesel::sql_types::Numeric, _>(*size)
163                .bind::<diesel::sql_types::SmallInt, _>(i as i16)
164                .execute(conn)?;
165            }
166
167            Ok(())
168        })
169    }
170
171    fn update_rfq_status_sync(&self, rfq_id: &uuid::Uuid, status: &str) -> Result<()> {
172        let mut conn = self.pool().get()?;
173        diesel::sql_query("UPDATE rfq_records SET status = $1 WHERE rfq_id = $2::uuid")
174            .bind::<diesel::sql_types::Text, _>(status)
175            .bind::<diesel::sql_types::Text, _>(rfq_id.to_string())
176            .execute(&mut conn)?;
177        Ok(())
178    }
179}
180
181#[cfg(test)]
182mod tests {
183    use crate::test_helpers::TestDb;
184    use hypercall_db::*;
185    use hypercall_types::wallet_address::test_wallet;
186    use rust_decimal_macros::dec;
187
188    #[tokio::test]
189    async fn rfq_quote_providers_empty() {
190        let test_db = TestDb::new().await.unwrap();
191        let db = test_db.handler.as_ref();
192        let providers = db.get_all_quote_providers_sync().unwrap();
193        assert!(providers.is_empty());
194    }
195
196    #[tokio::test]
197    async fn rfq_upsert_quote_provider_roundtrip() {
198        let test_db = TestDb::new().await.unwrap();
199        let db = test_db.handler.as_ref();
200        let wallet = test_wallet(30);
201
202        let qp = QuoteProviderRecord {
203            wallet_address: wallet,
204            tier: "qp2".to_string(),
205            status: "active".to_string(),
206            allowed_underlyings: Some(vec!["BTC".to_string(), "ETH".to_string()]),
207            max_notional_per_quote: dec!(1000000),
208            max_open_notional: dec!(5000000),
209            created_at: chrono::Utc::now(),
210            updated_at: chrono::Utc::now(),
211        };
212
213        db.upsert_quote_provider_sync(&qp).unwrap();
214        let loaded = db.get_all_quote_providers_sync().unwrap();
215        assert_eq!(loaded.len(), 1);
216        assert_eq!(loaded[0].wallet_address, wallet);
217        assert_eq!(loaded[0].tier, "qp2");
218        assert_eq!(loaded[0].status, "active");
219    }
220
221    #[tokio::test]
222    async fn rfq_update_quote_provider_status() {
223        let test_db = TestDb::new().await.unwrap();
224        let db = test_db.handler.as_ref();
225        let wallet = test_wallet(31);
226
227        let qp = QuoteProviderRecord {
228            wallet_address: wallet,
229            tier: "qp1".to_string(),
230            status: "active".to_string(),
231            allowed_underlyings: None,
232            max_notional_per_quote: dec!(500000),
233            max_open_notional: dec!(2000000),
234            created_at: chrono::Utc::now(),
235            updated_at: chrono::Utc::now(),
236        };
237
238        db.upsert_quote_provider_sync(&qp).unwrap();
239        db.update_quote_provider_status_sync(&wallet, "suspended")
240            .unwrap();
241
242        let loaded = db.get_all_quote_providers_sync().unwrap();
243        assert_eq!(loaded[0].status, "suspended");
244    }
245
246    #[tokio::test]
247    async fn rfq_persist_record_roundtrip() {
248        let test_db = TestDb::new().await.unwrap();
249        let db = test_db.handler.as_ref();
250        let wallet = test_wallet(32);
251        let rfq_id = uuid::Uuid::new_v4();
252
253        db.persist_rfq_record_sync(
254            &rfq_id,
255            &wallet,
256            "BTC",
257            "created",
258            "0xsig",
259            1,
260            &[0u8; 32],
261            &[
262                (
263                    "BTC-20260131-100000-C".to_string(),
264                    "buy".to_string(),
265                    dec!(1),
266                ),
267                (
268                    "BTC-20260131-110000-C".to_string(),
269                    "sell".to_string(),
270                    dec!(1),
271                ),
272            ],
273            1700000000000,
274        )
275        .unwrap();
276
277        // Idempotent replay
278        db.persist_rfq_record_sync(
279            &rfq_id,
280            &wallet,
281            "BTC",
282            "created",
283            "0xsig",
284            1,
285            &[0u8; 32],
286            &[(
287                "BTC-20260131-100000-C".to_string(),
288                "buy".to_string(),
289                dec!(1),
290            )],
291            1700000000000,
292        )
293        .unwrap();
294    }
295
296    #[tokio::test]
297    async fn rfq_persist_quote_roundtrip() {
298        let test_db = TestDb::new().await.unwrap();
299        let db = test_db.handler.as_ref();
300        let wallet = test_wallet(33);
301        let rfq_id = uuid::Uuid::new_v4();
302        let quote_id = uuid::Uuid::new_v4();
303
304        // Need RFQ record first
305        db.persist_rfq_record_sync(
306            &rfq_id,
307            &wallet,
308            "BTC",
309            "created",
310            "0xsig",
311            1,
312            &[0u8; 32],
313            &[(
314                "BTC-20260131-100000-C".to_string(),
315                "buy".to_string(),
316                dec!(1),
317            )],
318            1700000000000,
319        )
320        .unwrap();
321
322        let qp_wallet = test_wallet(34);
323        db.persist_rfq_quote_sync(
324            &quote_id,
325            &rfq_id,
326            &qp_wallet,
327            dec!(500),
328            5000,
329            "0xqpsig",
330            1,
331            &[(
332                "BTC-20260131-100000-C".to_string(),
333                "sell".to_string(),
334                dec!(500),
335                dec!(1),
336            )],
337            1700000000000,
338        )
339        .unwrap();
340    }
341
342    #[tokio::test]
343    async fn rfq_update_status() {
344        let test_db = TestDb::new().await.unwrap();
345        let db = test_db.handler.as_ref();
346        let wallet = test_wallet(35);
347        let rfq_id = uuid::Uuid::new_v4();
348
349        db.persist_rfq_record_sync(
350            &rfq_id,
351            &wallet,
352            "ETH",
353            "created",
354            "0xsig",
355            1,
356            &[0u8; 32],
357            &[(
358                "ETH-20260131-4000-C".to_string(),
359                "buy".to_string(),
360                dec!(10),
361            )],
362            1700000000000,
363        )
364        .unwrap();
365
366        db.update_rfq_status_sync(&rfq_id, "executed").unwrap();
367    }
368}