1use anyhow::Result;
7use diesel::prelude::*;
8use diesel::RunQueryDsl;
9use rust_decimal::Decimal;
10
11use hypercall_types::WalletAddress;
12
13use crate::database_handler::DatabaseHandler;
14
15impl hypercall_db::RfqReader for DatabaseHandler {
16 fn get_all_quote_providers_sync(&self) -> Result<Vec<hypercall_db::QuoteProviderRecord>> {
17 use crate::schema::quote_providers::dsl;
18
19 let mut conn = self.pool().get()?;
20 let results = dsl::quote_providers.load::<crate::models::QuoteProviderRecord>(&mut conn)?;
21 Ok(results.into_iter().map(Into::into).collect())
22 }
23}
24
25impl hypercall_db::RfqWriter for DatabaseHandler {
26 fn upsert_quote_provider_sync(&self, qp: &hypercall_db::QuoteProviderRecord) -> Result<()> {
27 use crate::schema::quote_providers::dsl;
28
29 let new_qp = crate::models::NewQuoteProvider {
30 wallet_address: qp.wallet_address,
31 tier: qp.tier.clone(),
32 status: qp.status.clone(),
33 allowed_underlyings: qp.allowed_underlyings.clone(),
34 max_notional_per_quote: qp.max_notional_per_quote,
35 max_open_notional: qp.max_open_notional,
36 };
37
38 let mut conn = self.pool().get()?;
39 diesel::insert_into(dsl::quote_providers)
40 .values(&new_qp)
41 .on_conflict(dsl::wallet_address)
42 .do_update()
43 .set((
44 dsl::tier.eq(&new_qp.tier),
45 dsl::status.eq(&new_qp.status),
46 dsl::allowed_underlyings.eq(&new_qp.allowed_underlyings),
47 dsl::max_notional_per_quote.eq(&new_qp.max_notional_per_quote),
48 dsl::max_open_notional.eq(&new_qp.max_open_notional),
49 dsl::updated_at.eq(diesel::dsl::now),
50 ))
51 .execute(&mut conn)?;
52 Ok(())
53 }
54
55 fn update_quote_provider_status_sync(
56 &self,
57 wallet: &WalletAddress,
58 status: &str,
59 ) -> Result<()> {
60 use crate::schema::quote_providers::dsl;
61
62 let mut conn = self.pool().get()?;
63 let rows = diesel::update(dsl::quote_providers.filter(dsl::wallet_address.eq(wallet)))
64 .set((dsl::status.eq(status), dsl::updated_at.eq(diesel::dsl::now)))
65 .execute(&mut conn)?;
66 if rows == 0 {
67 anyhow::bail!("Quote provider {} not found in DB", wallet);
68 }
69 Ok(())
70 }
71
72 fn persist_rfq_record_sync(
73 &self,
74 rfq_id: &uuid::Uuid,
75 taker_wallet: &WalletAddress,
76 underlying: &str,
77 status: &str,
78 taker_signature: &str,
79 taker_nonce: u64,
80 legs_hash: &[u8; 32],
81 legs: &[(String, String, Decimal)],
82 expires_at_ms: u64,
83 ) -> Result<()> {
84 let mut conn = self.pool().get()?;
85 conn.transaction(|conn| {
86 let header_rows: usize = diesel::sql_query(
87 "INSERT INTO rfq_records (rfq_id, taker_wallet, underlying, status, taker_signature, taker_nonce, legs_hash, expires_at)
88 VALUES ($1::uuid, $2, $3, $4, $5, $6, $7, to_timestamp($8::double precision / 1000))
89 ON CONFLICT (rfq_id) DO NOTHING",
90 )
91 .bind::<diesel::sql_types::Text, _>(rfq_id.to_string())
92 .bind::<diesel::sql_types::Binary, _>(taker_wallet.as_bytes())
93 .bind::<diesel::sql_types::Text, _>(underlying)
94 .bind::<diesel::sql_types::Text, _>(status)
95 .bind::<diesel::sql_types::Text, _>(taker_signature)
96 .bind::<diesel::sql_types::BigInt, _>(taker_nonce as i64)
97 .bind::<diesel::sql_types::Binary, _>(legs_hash.as_slice())
98 .bind::<diesel::sql_types::Double, _>(expires_at_ms as f64)
99 .execute(conn)?;
100
101 if header_rows == 0 {
102 return Ok(());
103 }
104
105 for (i, (instrument, side, size)) in legs.iter().enumerate() {
106 diesel::sql_query(
107 "INSERT INTO rfq_legs (rfq_id, instrument, side, size, leg_index)
108 VALUES ($1::uuid, $2, $3, $4, $5)
109 ON CONFLICT (rfq_id, leg_index) DO NOTHING",
110 )
111 .bind::<diesel::sql_types::Text, _>(rfq_id.to_string())
112 .bind::<diesel::sql_types::Text, _>(instrument.as_str())
113 .bind::<diesel::sql_types::Text, _>(side.as_str())
114 .bind::<diesel::sql_types::Numeric, _>(*size)
115 .bind::<diesel::sql_types::SmallInt, _>(i as i16)
116 .execute(conn)?;
117 }
118
119 Ok(())
120 })
121 }
122
123 fn persist_rfq_quote_sync(
124 &self,
125 quote_id: &uuid::Uuid,
126 rfq_id: &uuid::Uuid,
127 qp_wallet: &WalletAddress,
128 net_premium: Decimal,
129 valid_for_ms: u64,
130 qp_signature: &str,
131 qp_nonce: u64,
132 legs: &[(String, String, Decimal, Decimal)],
133 expires_at_ms: u64,
134 ) -> Result<()> {
135 let mut conn = self.pool().get()?;
136 conn.transaction(|conn| {
137 diesel::sql_query(
138 "INSERT INTO rfq_quotes (quote_id, rfq_id, qp_wallet, net_premium, valid_for_ms, qp_signature, qp_nonce, expires_at)
139 VALUES ($1::uuid, $2::uuid, $3, $4, $5, $6, $7, to_timestamp($8::double precision / 1000))
140 ON CONFLICT (quote_id) DO NOTHING",
141 )
142 .bind::<diesel::sql_types::Text, _>(quote_id.to_string())
143 .bind::<diesel::sql_types::Text, _>(rfq_id.to_string())
144 .bind::<diesel::sql_types::Binary, _>(qp_wallet.as_bytes())
145 .bind::<diesel::sql_types::Numeric, _>(net_premium)
146 .bind::<diesel::sql_types::BigInt, _>(valid_for_ms as i64)
147 .bind::<diesel::sql_types::Text, _>(qp_signature)
148 .bind::<diesel::sql_types::BigInt, _>(qp_nonce as i64)
149 .bind::<diesel::sql_types::Double, _>(expires_at_ms as f64)
150 .execute(conn)?;
151
152 for (i, (instrument, side, price, size)) in legs.iter().enumerate() {
153 diesel::sql_query(
154 "INSERT INTO rfq_quote_legs (quote_id, instrument, side, price, size, leg_index)
155 VALUES ($1::uuid, $2, $3, $4, $5, $6)
156 ON CONFLICT (quote_id, leg_index) DO NOTHING",
157 )
158 .bind::<diesel::sql_types::Text, _>(quote_id.to_string())
159 .bind::<diesel::sql_types::Text, _>(instrument.as_str())
160 .bind::<diesel::sql_types::Text, _>(side.as_str())
161 .bind::<diesel::sql_types::Numeric, _>(*price)
162 .bind::<diesel::sql_types::Numeric, _>(*size)
163 .bind::<diesel::sql_types::SmallInt, _>(i as i16)
164 .execute(conn)?;
165 }
166
167 Ok(())
168 })
169 }
170
171 fn update_rfq_status_sync(&self, rfq_id: &uuid::Uuid, status: &str) -> Result<()> {
172 let mut conn = self.pool().get()?;
173 diesel::sql_query("UPDATE rfq_records SET status = $1 WHERE rfq_id = $2::uuid")
174 .bind::<diesel::sql_types::Text, _>(status)
175 .bind::<diesel::sql_types::Text, _>(rfq_id.to_string())
176 .execute(&mut conn)?;
177 Ok(())
178 }
179}
180
181#[cfg(test)]
182mod tests {
183 use crate::test_helpers::TestDb;
184 use hypercall_db::*;
185 use hypercall_types::wallet_address::test_wallet;
186 use rust_decimal_macros::dec;
187
188 #[tokio::test]
189 async fn rfq_quote_providers_empty() {
190 let test_db = TestDb::new().await.unwrap();
191 let db = test_db.handler.as_ref();
192 let providers = db.get_all_quote_providers_sync().unwrap();
193 assert!(providers.is_empty());
194 }
195
196 #[tokio::test]
197 async fn rfq_upsert_quote_provider_roundtrip() {
198 let test_db = TestDb::new().await.unwrap();
199 let db = test_db.handler.as_ref();
200 let wallet = test_wallet(30);
201
202 let qp = QuoteProviderRecord {
203 wallet_address: wallet,
204 tier: "qp2".to_string(),
205 status: "active".to_string(),
206 allowed_underlyings: Some(vec!["BTC".to_string(), "ETH".to_string()]),
207 max_notional_per_quote: dec!(1000000),
208 max_open_notional: dec!(5000000),
209 created_at: chrono::Utc::now(),
210 updated_at: chrono::Utc::now(),
211 };
212
213 db.upsert_quote_provider_sync(&qp).unwrap();
214 let loaded = db.get_all_quote_providers_sync().unwrap();
215 assert_eq!(loaded.len(), 1);
216 assert_eq!(loaded[0].wallet_address, wallet);
217 assert_eq!(loaded[0].tier, "qp2");
218 assert_eq!(loaded[0].status, "active");
219 }
220
221 #[tokio::test]
222 async fn rfq_update_quote_provider_status() {
223 let test_db = TestDb::new().await.unwrap();
224 let db = test_db.handler.as_ref();
225 let wallet = test_wallet(31);
226
227 let qp = QuoteProviderRecord {
228 wallet_address: wallet,
229 tier: "qp1".to_string(),
230 status: "active".to_string(),
231 allowed_underlyings: None,
232 max_notional_per_quote: dec!(500000),
233 max_open_notional: dec!(2000000),
234 created_at: chrono::Utc::now(),
235 updated_at: chrono::Utc::now(),
236 };
237
238 db.upsert_quote_provider_sync(&qp).unwrap();
239 db.update_quote_provider_status_sync(&wallet, "suspended")
240 .unwrap();
241
242 let loaded = db.get_all_quote_providers_sync().unwrap();
243 assert_eq!(loaded[0].status, "suspended");
244 }
245
246 #[tokio::test]
247 async fn rfq_persist_record_roundtrip() {
248 let test_db = TestDb::new().await.unwrap();
249 let db = test_db.handler.as_ref();
250 let wallet = test_wallet(32);
251 let rfq_id = uuid::Uuid::new_v4();
252
253 db.persist_rfq_record_sync(
254 &rfq_id,
255 &wallet,
256 "BTC",
257 "created",
258 "0xsig",
259 1,
260 &[0u8; 32],
261 &[
262 (
263 "BTC-20260131-100000-C".to_string(),
264 "buy".to_string(),
265 dec!(1),
266 ),
267 (
268 "BTC-20260131-110000-C".to_string(),
269 "sell".to_string(),
270 dec!(1),
271 ),
272 ],
273 1700000000000,
274 )
275 .unwrap();
276
277 db.persist_rfq_record_sync(
279 &rfq_id,
280 &wallet,
281 "BTC",
282 "created",
283 "0xsig",
284 1,
285 &[0u8; 32],
286 &[(
287 "BTC-20260131-100000-C".to_string(),
288 "buy".to_string(),
289 dec!(1),
290 )],
291 1700000000000,
292 )
293 .unwrap();
294 }
295
296 #[tokio::test]
297 async fn rfq_persist_quote_roundtrip() {
298 let test_db = TestDb::new().await.unwrap();
299 let db = test_db.handler.as_ref();
300 let wallet = test_wallet(33);
301 let rfq_id = uuid::Uuid::new_v4();
302 let quote_id = uuid::Uuid::new_v4();
303
304 db.persist_rfq_record_sync(
306 &rfq_id,
307 &wallet,
308 "BTC",
309 "created",
310 "0xsig",
311 1,
312 &[0u8; 32],
313 &[(
314 "BTC-20260131-100000-C".to_string(),
315 "buy".to_string(),
316 dec!(1),
317 )],
318 1700000000000,
319 )
320 .unwrap();
321
322 let qp_wallet = test_wallet(34);
323 db.persist_rfq_quote_sync(
324 "e_id,
325 &rfq_id,
326 &qp_wallet,
327 dec!(500),
328 5000,
329 "0xqpsig",
330 1,
331 &[(
332 "BTC-20260131-100000-C".to_string(),
333 "sell".to_string(),
334 dec!(500),
335 dec!(1),
336 )],
337 1700000000000,
338 )
339 .unwrap();
340 }
341
342 #[tokio::test]
343 async fn rfq_update_status() {
344 let test_db = TestDb::new().await.unwrap();
345 let db = test_db.handler.as_ref();
346 let wallet = test_wallet(35);
347 let rfq_id = uuid::Uuid::new_v4();
348
349 db.persist_rfq_record_sync(
350 &rfq_id,
351 &wallet,
352 "ETH",
353 "created",
354 "0xsig",
355 1,
356 &[0u8; 32],
357 &[(
358 "ETH-20260131-4000-C".to_string(),
359 "buy".to_string(),
360 dec!(10),
361 )],
362 1700000000000,
363 )
364 .unwrap();
365
366 db.update_rfq_status_sync(&rfq_id, "executed").unwrap();
367 }
368}