1use crate::rfq::quote_provider_cache::QuoteProviderCache;
2use dashmap::DashMap;
3use hypercall_db::RfqWriter;
4use hypercall_runtime_api::{
5 RfqExecuteCommand, RfqExecuteLeg, RfqExecuteRequest, RfqExecuteResult,
6};
7use hypercall_types::utils::get_timestamp_millis as current_time_ms;
8use hypercall_types::{RfqStatus, Side, WalletAddress, RFQ_SELF_TRADE_REJECTION_REASON};
9use hypercall_ws_protocol::{QpRfqLeg, QpServerMessage as QpOutboundMessage};
10use metrics::{counter, histogram};
11use rust_decimal::Decimal;
12use std::collections::HashSet;
13use std::sync::Arc;
14use std::time::Duration;
15use tokio::sync::mpsc;
16use tracing::{info, warn};
17use uuid::Uuid;
18
19pub trait QpSessionRegistry: Send + Sync {
20 fn send_rfq_to_eligible(&self, eligible_wallets: &[WalletAddress], message: QpOutboundMessage);
21
22 fn get_connected_wallets(&self) -> Vec<WalletAddress>;
23}
24
/// Decimal scale (number of fractional digits) applied when a tick count is
/// turned into a price increment, i.e. one tick equals `10^-4`.
const OPTION_PREMIUM_TICK_SCALE: u32 = 4;
28
/// Returns `true` when `reason` begins with the literal prefix `"nonce "`
/// (lower-case, including the trailing space).
fn is_nonce_rejection_reason(reason: &str) -> bool {
    reason.strip_prefix("nonce ").is_some()
}
32
/// Tunable parameters of the RFQ manager.
#[derive(Debug, Clone)]
pub struct RfqConfig {
    /// Window, in milliseconds from creation, during which quotes for a
    /// regular RFQ are accepted.
    pub response_deadline_ms: u64,
    /// Window, in milliseconds from creation, during which quotes for an RPI
    /// auction are accepted.
    pub rpi_auction_ms: u64,
    /// Minimum improvement expressed in ticks; converted to a price by
    /// `min_improvement_tick`.
    pub min_improvement_ticks: u32,
    /// Whether the taker's limit is included in the request sent to QPs.
    pub reveal_limit_to_qps: bool,
    /// Largest `valid_for_ms` a QP quote may carry before it is rejected.
    pub max_valid_for_ms: u64,
    /// Lifetime of an RFQ in milliseconds; added to the creation time to
    /// obtain `expires_at`.
    pub rfq_lifetime_ms: u64,
    /// Maximum number of legs a regular RFQ may contain.
    pub max_legs: usize,
}
44
45impl Default for RfqConfig {
46 fn default() -> Self {
47 Self {
48 response_deadline_ms: 10_000,
49 rpi_auction_ms: 2_000,
50 min_improvement_ticks: 1,
51 reveal_limit_to_qps: true,
52 max_valid_for_ms: 30_000,
53 rfq_lifetime_ms: 60_000,
54 max_legs: 10,
55 }
56 }
57}
58
59impl RfqConfig {
60 pub fn min_improvement_tick(&self) -> Decimal {
61 Decimal::new(
62 i64::from(self.min_improvement_ticks),
63 OPTION_PREMIUM_TICK_SCALE,
64 )
65 }
66}
67
68#[derive(Debug, Clone)]
70pub struct RfqLeg {
71 pub instrument: String,
72 pub side: Side,
73 pub size: Decimal,
74}
75
/// How an auto-accept limit is compared against a quote's net premium.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum AutoAcceptLimitMode {
    /// Chosen for all-buy packages: the net premium must be at least `-limit`.
    DebitCap,
    /// Chosen for all-sell packages: the net premium must be at least `limit`.
    CreditFloor,
}
81
82fn auto_accept_limit_mode(legs: &[RfqLeg]) -> Result<AutoAcceptLimitMode, String> {
83 let first_side = legs
84 .first()
85 .ok_or_else(|| "RFQ must have at least one leg".to_string())?
86 .side;
87
88 if legs.iter().any(|leg| leg.side != first_side) {
89 return Err(
90 "Auto-execute RFQ requires all legs on the same side; mixed-side packages require explicit quote acceptance"
91 .to_string(),
92 );
93 }
94
95 Ok(match first_side {
96 Side::Buy => AutoAcceptLimitMode::DebitCap,
97 Side::Sell => AutoAcceptLimitMode::CreditFloor,
98 })
99}
100
101fn quote_satisfies_auto_accept_limit(
102 legs: &[RfqLeg],
103 quote_net_premium: Decimal,
104 limit: Decimal,
105 _min_tick: Decimal,
106) -> Result<bool, String> {
107 if limit <= Decimal::ZERO {
108 return Err("auto_accept_limit must be positive".to_string());
109 }
110
111 match auto_accept_limit_mode(legs)? {
112 AutoAcceptLimitMode::DebitCap => Ok(quote_net_premium >= -limit),
116 AutoAcceptLimitMode::CreditFloor => Ok(quote_net_premium >= limit),
120 }
121}
122
123pub fn quote_qualifies_for_rpi(
124 side: Side,
125 limit_price: Decimal,
126 best_executable_book_price: Option<Decimal>,
127 quote_price: Decimal,
128 min_tick: Decimal,
129) -> bool {
130 if limit_price <= Decimal::ZERO || quote_price <= Decimal::ZERO || min_tick <= Decimal::ZERO {
131 return false;
132 }
133
134 let satisfies_limit = match side {
135 Side::Buy => quote_price <= limit_price,
136 Side::Sell => quote_price >= limit_price,
137 };
138 if !satisfies_limit {
139 return false;
140 }
141
142 match (side, best_executable_book_price) {
143 (Side::Buy, Some(best_ask)) => quote_price <= best_ask - min_tick,
144 (Side::Sell, Some(best_bid)) => quote_price >= best_bid + min_tick,
145 (_, None) => true,
146 }
147}
148
149fn quote_satisfies_rpi_auction(
150 side: Side,
151 limit_price: Decimal,
152 reference_price: Option<Decimal>,
153 quote_price: Decimal,
154 min_tick: Decimal,
155) -> bool {
156 quote_qualifies_for_rpi(side, limit_price, reference_price, quote_price, min_tick)
157}
158
159fn rpi_quote_rejection(
160 side: Side,
161 limit_price: Decimal,
162 reference_price: Option<Decimal>,
163 quote_price: Decimal,
164 min_tick: Decimal,
165) -> (&'static str, String) {
166 if !quote_qualifies_for_rpi(side, limit_price, None, quote_price, min_tick) {
167 return (
168 "outside_limit",
169 format!(
170 "quote price {} does not satisfy limit {}",
171 quote_price, limit_price
172 ),
173 );
174 }
175
176 if let Some(reference_price) = reference_price {
177 return (
178 "no_price_improvement",
179 format!(
180 "quote price {} does not improve reference {} by tick {}",
181 quote_price, reference_price, min_tick
182 ),
183 );
184 }
185
186 (
187 "outside_limit",
188 format!(
189 "quote price {} does not satisfy limit {}",
190 quote_price, limit_price
191 ),
192 )
193}
194
195fn validate_rpi_quote_shape(requested_legs: &[RfqLeg], quote: &QpQuote) -> Result<(), String> {
196 if quote.legs.len() != requested_legs.len() {
197 return Err("RPI quote must cover the full requested leg set".to_string());
198 }
199
200 for (requested, quoted) in requested_legs.iter().zip("e.legs) {
201 if quoted.instrument != requested.instrument {
202 return Err(format!(
203 "RPI quote instrument {} does not match requested {}",
204 quoted.instrument, requested.instrument
205 ));
206 }
207 if quoted.side != requested.side {
208 return Err(format!(
209 "RPI quote side {:?} does not match requested {:?}",
210 quoted.side, requested.side
211 ));
212 }
213 if quoted.size != requested.size {
214 return Err(format!(
215 "RPI quote size {} does not fully cover requested size {}",
216 quoted.size, requested.size
217 ));
218 }
219 }
220
221 Ok(())
222}
223
224fn sorted_rpi_candidate_quote_ids(record: &RfqRecord) -> Result<Vec<Uuid>, String> {
225 if record.rpi_auction.is_none() {
226 return Err("RFQ is not an RPI auction".to_string());
227 }
228 let side = record
229 .legs
230 .first()
231 .ok_or_else(|| "RPI RFQ missing leg".to_string())?
232 .side;
233
234 let mut quotes = record.quotes.clone();
235 quotes.sort_by(|a, b| {
236 let a_price = a.legs.first().map(|leg| leg.price).unwrap_or(Decimal::ZERO);
237 let b_price = b.legs.first().map(|leg| leg.price).unwrap_or(Decimal::ZERO);
238 let price_order = match side {
239 Side::Buy => a_price.cmp(&b_price),
240 Side::Sell => b_price.cmp(&a_price),
241 };
242 price_order.then_with(|| a.received_at.cmp(&b.received_at))
243 });
244
245 Ok(quotes.into_iter().map(|q| q.quote_id).collect())
246}
247
248fn should_close_rpi_auction(record: &RfqRecord, now_ms: u64) -> bool {
249 let all_qps_responded = !record.eligible_qps.is_empty()
250 && record
251 .eligible_qps
252 .iter()
253 .all(|wallet| record.responded_qps.contains(wallet));
254 let no_qps = record.eligible_qps.is_empty();
255 let timed_out = now_ms > record.deadline_at;
256
257 no_qps || all_qps_responded || timed_out || record.status.is_terminal()
258}
259
260#[derive(Debug, Clone)]
262pub struct QpQuote {
263 pub quote_id: Uuid,
264 pub qp_wallet: WalletAddress,
265 pub legs: Vec<QuoteLeg>,
266 pub net_premium: Decimal,
267 pub valid_for_ms: u64,
268 pub qp_signature: String,
269 pub qp_nonce: u64,
270 pub received_at: u64,
271}
272
273#[derive(Debug, Clone)]
274pub struct QuoteLeg {
275 pub instrument: String,
276 pub side: Side,
277 pub price: Decimal,
278 pub size: Decimal,
279}
280
281#[derive(Debug, Clone)]
282pub struct RpiAuctionContext {
283 pub limit_price: Decimal,
284 pub reference_price: Option<Decimal>,
285 pub min_tick: Decimal,
286}
287
288#[derive(Debug, Clone)]
289pub struct SubmitRpiAuction {
290 pub rfq_id: Uuid,
291 pub taker_wallet: WalletAddress,
292 pub taker_signer: WalletAddress,
293 pub builder_code_address: Option<WalletAddress>,
294 pub legs: Vec<RfqLeg>,
295 pub legs_hash: [u8; 32],
296 pub taker_signature: String,
297 pub taker_nonce: u64,
298 pub limit_price: Decimal,
299 pub reference_price: Option<Decimal>,
300 pub min_tick: Decimal,
301}
302
303#[derive(Debug, Clone)]
305pub struct RfqRecord {
306 pub rfq_id: Uuid,
307 pub taker_wallet: WalletAddress,
308 pub taker_signer: WalletAddress,
309 pub builder_code_address: Option<WalletAddress>,
310 pub underlying: String,
311 pub legs: Vec<RfqLeg>,
312 pub legs_hash: [u8; 32],
313 pub taker_signature: String,
314 pub taker_nonce: u64,
315 pub status: RfqStatus,
316 pub quotes: Vec<QpQuote>,
317 pub created_at: u64,
318 pub deadline_at: u64,
319 pub expires_at: u64,
320 pub auto_accept_limit: Option<Decimal>,
324 pub accepted_quote_id: Option<Uuid>,
328 pub execution_quote_id: Option<Uuid>,
329 pub execution_request_id: Option<String>,
330 pub execution_fill_id: Option<String>,
331 pub execution_timestamp_ms: Option<u64>,
332 pub eligible_qps: Vec<WalletAddress>,
333 pub responded_qps: HashSet<WalletAddress>,
334 pub rpi_auction: Option<RpiAuctionContext>,
335}
336
337impl RfqRecord {
338 fn execution_facts_for_quote(
339 &mut self,
340 quote_id: Uuid,
341 timestamp_ms: u64,
342 ) -> (String, String, u64) {
343 if self.execution_quote_id == Some(quote_id) {
344 if let (Some(request_id), Some(fill_id), Some(stored_timestamp_ms)) = (
345 &self.execution_request_id,
346 &self.execution_fill_id,
347 self.execution_timestamp_ms,
348 ) {
349 return (request_id.clone(), fill_id.clone(), stored_timestamp_ms);
350 }
351 }
352
353 let request_id = deterministic_rfq_execution_uuid("request", self.rfq_id, quote_id);
354 let fill_id = deterministic_rfq_execution_uuid("fill", self.rfq_id, quote_id);
355 self.execution_quote_id = Some(quote_id);
356 self.execution_request_id = Some(request_id.clone());
357 self.execution_fill_id = Some(fill_id.clone());
358 self.execution_timestamp_ms = Some(timestamp_ms);
359 (request_id, fill_id, timestamp_ms)
360 }
361
362 fn clear_execution_selection(&mut self) {
363 self.accepted_quote_id = None;
364 self.execution_quote_id = None;
365 self.execution_request_id = None;
366 self.execution_fill_id = None;
367 self.execution_timestamp_ms = None;
368 }
369
370 fn revert_auto_accept_to_quotes_received(&mut self) {
371 self.status = RfqStatus::QuotesReceived;
372 self.clear_execution_selection();
373 self.auto_accept_limit = None;
374 }
375}
376
377fn deterministic_rfq_execution_uuid(purpose: &str, rfq_id: Uuid, quote_id: Uuid) -> String {
378 use sha3::{Digest, Sha3_256};
379
380 let mut hasher = Sha3_256::new();
381 hasher.update(b"rfq-execution:");
382 hasher.update(purpose.as_bytes());
383 hasher.update(b":");
384 hasher.update(rfq_id.as_bytes());
385 hasher.update(b":");
386 hasher.update(quote_id.as_bytes());
387 let digest = hasher.finalize();
388
389 let mut bytes = [0u8; 16];
390 bytes.copy_from_slice(&digest[..16]);
391 bytes[6] = (bytes[6] & 0x0F) | 0x80;
392 bytes[8] = (bytes[8] & 0x3F) | 0x80;
393 Uuid::from_bytes(bytes).to_string()
394}
395
396#[derive(Debug)]
399pub enum HandleQpResponseResult {
400 QuoteStored(RfqRecord),
402 AutoExecuted {
406 record: RfqRecord,
407 accepted_quote_id: Uuid,
408 },
409 AutoExecuteSkipped {
413 record: RfqRecord,
414 quote_premium: Decimal,
415 limit: Decimal,
416 },
417 RpiQuoteStored(RfqRecord),
420 RpiQuoteRejected { record: RfqRecord, reason: String },
423}
424
425pub struct RfqManager {
427 active_rfqs: DashMap<Uuid, RfqRecord>,
428 qp_cache: Arc<QuoteProviderCache>,
429 qp_sessions: Arc<dyn QpSessionRegistry>,
430 rfq_sender: mpsc::Sender<RfqExecuteRequest>,
431 db: Option<Arc<dyn RfqWriter>>,
432 config: RfqConfig,
433}
434
435impl RfqManager {
436 pub fn new(
437 qp_cache: Arc<QuoteProviderCache>,
438 qp_sessions: Arc<dyn QpSessionRegistry>,
439 rfq_sender: mpsc::Sender<RfqExecuteRequest>,
440 config: RfqConfig,
441 ) -> Self {
442 Self {
443 active_rfqs: DashMap::new(),
444 qp_cache,
445 qp_sessions,
446 rfq_sender,
447 db: None,
448 config,
449 }
450 }
451
452 pub fn min_improvement_tick(&self) -> Decimal {
453 self.config.min_improvement_tick()
454 }
455
456 fn persist_status(&self, rfq_id: &Uuid, status: RfqStatus) {
462 if let Some(ref handler) = self.db {
463 if let Err(e) = handler.update_rfq_status_sync(rfq_id, status.as_str()) {
464 warn!(
465 "Failed to persist RFQ {} status {}: {}",
466 rfq_id,
467 status.as_str(),
468 e
469 );
470 }
471 }
472 }
473
474 pub fn set_db(&mut self, handler: Arc<dyn RfqWriter>) {
476 self.db = Some(handler);
477 }
478
479 pub async fn submit_rfq(
481 &self,
482 rfq_id: Uuid,
483 taker_wallet: WalletAddress,
484 taker_signer: WalletAddress,
485 legs: Vec<RfqLeg>,
486 legs_hash: [u8; 32],
487 taker_signature: String,
488 taker_nonce: u64,
489 auto_accept_limit: Option<Decimal>,
490 ) -> Result<RfqRecord, String> {
491 if legs.is_empty() || legs.len() > self.config.max_legs {
493 return Err(format!(
494 "Invalid number of legs: {} (must be 1-{})",
495 legs.len(),
496 self.config.max_legs
497 ));
498 }
499
500 let underlying = legs[0]
502 .instrument
503 .split('-')
504 .next()
505 .ok_or("Invalid instrument format")?
506 .to_string();
507
508 for leg in &legs[1..] {
509 let leg_underlying = leg
510 .instrument
511 .split('-')
512 .next()
513 .ok_or("Invalid instrument format")?;
514 if leg_underlying != underlying {
515 return Err("All legs must share the same underlying".to_string());
516 }
517 }
518
519 if let Some(limit) = auto_accept_limit {
520 if limit <= Decimal::ZERO {
521 return Err("auto_accept_limit must be positive".to_string());
522 }
523 auto_accept_limit_mode(&legs)?;
524 }
525
526 let now = current_time_ms();
527 let response_deadline_ms = self.config.response_deadline_ms;
528 let record = RfqRecord {
529 rfq_id,
530 taker_wallet,
531 taker_signer,
532 builder_code_address: None,
533 underlying: underlying.clone(),
534 legs,
535 legs_hash,
536 taker_signature,
537 taker_nonce,
538 status: RfqStatus::Created,
539 quotes: Vec::new(),
540 created_at: now,
541 deadline_at: now + response_deadline_ms,
542 expires_at: now + self.config.rfq_lifetime_ms,
543 auto_accept_limit,
544 accepted_quote_id: None,
545 execution_quote_id: None,
546 execution_request_id: None,
547 execution_fill_id: None,
548 execution_timestamp_ms: None,
549 eligible_qps: Vec::new(),
550 responded_qps: HashSet::new(),
551 rpi_auction: None,
552 };
553
554 use dashmap::mapref::entry::Entry;
562 match self.active_rfqs.entry(rfq_id) {
563 Entry::Occupied(_) => {
564 return Err(format!("RFQ {} already exists", rfq_id));
565 }
566 Entry::Vacant(slot) => {
567 slot.insert(record.clone());
568 }
569 }
570
571 if let Some(ref handler) = self.db {
573 let db_legs: Vec<(String, String, Decimal)> = record
574 .legs
575 .iter()
576 .map(|l| {
577 let side = match l.side {
578 Side::Buy => "buy".to_string(),
579 Side::Sell => "sell".to_string(),
580 };
581 (l.instrument.clone(), side, l.size)
582 })
583 .collect();
584 if let Err(e) = handler.persist_rfq_record_sync(
585 &rfq_id,
586 &taker_wallet,
587 &underlying,
588 "created",
589 &record.taker_signature,
590 record.taker_nonce,
591 &legs_hash,
592 &db_legs,
593 record.expires_at,
594 ) {
595 info!("Failed to persist RFQ record to DB (non-fatal): {}", e);
596 }
597 }
598
599 let eligible_qps = self.qp_cache.get_active_for_underlying(&underlying).await;
601 let eligible_wallets: Vec<WalletAddress> = eligible_qps
602 .iter()
603 .filter(|_qp| {
604 true })
608 .map(|qp| qp.wallet)
609 .collect();
610
611 if let Some(mut entry) = self.active_rfqs.get_mut(&rfq_id) {
612 entry.eligible_qps = eligible_wallets.clone();
613 }
614
615 if !eligible_wallets.is_empty() {
616 let auto_execute = record.auto_accept_limit.is_some();
617 let revealed_limit = record
618 .auto_accept_limit
619 .and_then(|limit| self.config.reveal_limit_to_qps.then(|| limit.to_string()));
620 let rfq_msg = QpOutboundMessage::RfqRequest {
621 rfq_id: rfq_id.to_string(),
622 legs: record
623 .legs
624 .iter()
625 .map(|l| QpRfqLeg {
626 instrument: l.instrument.clone(),
627 side: format!("{:?}", l.side).to_lowercase(),
628 size: l.size.to_string(),
629 })
630 .collect(),
631 taker_wallet: taker_wallet.as_hex(),
632 request_timestamp: now,
633 response_deadline_ms,
634 auto_accept_limit: revealed_limit.clone(),
635 auto_execute,
636 taker_limit_price: revealed_limit,
637 reference_price: None,
638 min_improvement_tick: None,
639 auction_deadline_ms: None,
640 requires_price_improvement: false,
641 };
642
643 self.qp_sessions
644 .send_rfq_to_eligible(&eligible_wallets, rfq_msg);
645 }
646
647 if let Some(mut entry) = self.active_rfqs.get_mut(&rfq_id) {
649 entry.status = RfqStatus::SentToQps;
650 }
651 self.persist_status(&rfq_id, RfqStatus::SentToQps);
652
653 let record = self
654 .active_rfqs
655 .get(&rfq_id)
656 .map(|r| r.clone())
657 .ok_or("RFQ not found after creation")?;
658
659 counter!(
660 "ht_rfq_submit_total",
661 "underlying" => underlying.clone(),
662 "result" => "ok"
663 )
664 .increment(1);
665
666 info!(
667 "RFQ submitted: rfq_id={}, taker={}, underlying={}, legs={}, eligible_qps={}",
668 rfq_id,
669 taker_wallet.as_hex(),
670 underlying,
671 record.legs.len(),
672 eligible_wallets.len(),
673 );
674
675 Ok(record)
676 }
677
678 pub async fn submit_rpi_auction(&self, request: SubmitRpiAuction) -> Result<RfqRecord, String> {
679 if request.legs.len() != 1 {
680 return Err("RPI auction requires exactly one leg".to_string());
681 }
682 if request.limit_price <= Decimal::ZERO {
683 return Err("RPI limit price must be positive".to_string());
684 }
685 if request
686 .reference_price
687 .is_some_and(|reference_price| reference_price <= Decimal::ZERO)
688 {
689 return Err("RPI reference price must be positive when present".to_string());
690 }
691 if request.min_tick <= Decimal::ZERO {
692 return Err("RPI min tick must be positive".to_string());
693 }
694
695 let underlying = request.legs[0]
696 .instrument
697 .split('-')
698 .next()
699 .ok_or("Invalid instrument format")?
700 .to_string();
701
702 let now = current_time_ms();
703 let auction_ms = self.config.rpi_auction_ms;
704 let record = RfqRecord {
705 rfq_id: request.rfq_id,
706 taker_wallet: request.taker_wallet,
707 taker_signer: request.taker_signer,
708 builder_code_address: request.builder_code_address,
709 underlying: underlying.clone(),
710 legs: request.legs,
711 legs_hash: request.legs_hash,
712 taker_signature: request.taker_signature,
713 taker_nonce: request.taker_nonce,
714 status: RfqStatus::Created,
715 quotes: Vec::new(),
716 created_at: now,
717 deadline_at: now + auction_ms,
718 expires_at: now + self.config.rfq_lifetime_ms,
719 auto_accept_limit: None,
720 accepted_quote_id: None,
721 execution_quote_id: None,
722 execution_request_id: None,
723 execution_fill_id: None,
724 execution_timestamp_ms: None,
725 eligible_qps: Vec::new(),
726 responded_qps: HashSet::new(),
727 rpi_auction: Some(RpiAuctionContext {
728 limit_price: request.limit_price,
729 reference_price: request.reference_price,
730 min_tick: request.min_tick,
731 }),
732 };
733
734 use dashmap::mapref::entry::Entry;
735 match self.active_rfqs.entry(record.rfq_id) {
736 Entry::Occupied(_) => {
737 return Err(format!("RFQ {} already exists", record.rfq_id));
738 }
739 Entry::Vacant(slot) => {
740 slot.insert(record.clone());
741 }
742 }
743
744 if let Some(ref handler) = self.db {
745 let db_legs: Vec<(String, String, Decimal)> = record
746 .legs
747 .iter()
748 .map(|l| {
749 let side = match l.side {
750 Side::Buy => "buy".to_string(),
751 Side::Sell => "sell".to_string(),
752 };
753 (l.instrument.clone(), side, l.size)
754 })
755 .collect();
756 if let Err(e) = handler.persist_rfq_record_sync(
757 &record.rfq_id,
758 &record.taker_wallet,
759 &underlying,
760 "created",
761 &record.taker_signature,
762 record.taker_nonce,
763 &record.legs_hash,
764 &db_legs,
765 record.expires_at,
766 ) {
767 info!("Failed to persist RPI RFQ record to DB (non-fatal): {}", e);
768 }
769 }
770
771 let eligible_qps = self.qp_cache.get_active_for_underlying(&underlying).await;
772 let connected_wallets: HashSet<WalletAddress> = self
773 .qp_sessions
774 .get_connected_wallets()
775 .into_iter()
776 .collect();
777 let eligible_wallets: Vec<WalletAddress> = eligible_qps
778 .iter()
779 .map(|qp| qp.wallet)
780 .filter(|wallet| connected_wallets.contains(wallet))
781 .collect();
782
783 if let Some(mut entry) = self.active_rfqs.get_mut(&record.rfq_id) {
784 entry.eligible_qps = eligible_wallets.clone();
785 }
786
787 if !eligible_wallets.is_empty() {
788 let rpi = record
789 .rpi_auction
790 .as_ref()
791 .ok_or("RPI context missing after record creation")?;
792 let rfq_msg = QpOutboundMessage::RfqRequest {
793 rfq_id: record.rfq_id.to_string(),
794 legs: record
795 .legs
796 .iter()
797 .map(|l| QpRfqLeg {
798 instrument: l.instrument.clone(),
799 side: format!("{:?}", l.side).to_lowercase(),
800 size: l.size.to_string(),
801 })
802 .collect(),
803 taker_wallet: record.taker_wallet.as_hex(),
804 request_timestamp: now,
805 response_deadline_ms: auction_ms,
806 auto_accept_limit: None,
807 auto_execute: true,
808 taker_limit_price: self
809 .config
810 .reveal_limit_to_qps
811 .then(|| rpi.limit_price.to_string()),
812 reference_price: rpi.reference_price.map(|price| price.to_string()),
813 min_improvement_tick: rpi.reference_price.map(|_| rpi.min_tick.to_string()),
814 auction_deadline_ms: Some(auction_ms),
815 requires_price_improvement: rpi.reference_price.is_some(),
816 };
817 self.qp_sessions
818 .send_rfq_to_eligible(&eligible_wallets, rfq_msg);
819 }
820
821 if let Some(mut entry) = self.active_rfqs.get_mut(&record.rfq_id) {
822 entry.status = RfqStatus::SentToQps;
823 }
824 self.persist_status(&record.rfq_id, RfqStatus::SentToQps);
825
826 counter!(
827 "ht_rpi_auction_total",
828 "outcome" => "started",
829 "underlying" => underlying.clone()
830 )
831 .increment(1);
832
833 Ok(self
834 .active_rfqs
835 .get(&record.rfq_id)
836 .map(|r| r.clone())
837 .ok_or("RPI RFQ not found after creation")?)
838 }
839
840 pub fn handle_qp_response(
850 &self,
851 rfq_id: Uuid,
852 quote: QpQuote,
853 ) -> Result<HandleQpResponseResult, String> {
854 let mut entry = self.active_rfqs.get_mut(&rfq_id).ok_or("RFQ not found")?;
855
856 if entry.status != RfqStatus::SentToQps && entry.status != RfqStatus::QuotesReceived {
858 return Err(format!(
859 "RFQ in invalid state for quotes: {:?}",
860 entry.status
861 ));
862 }
863
864 let now = current_time_ms();
865 if now > entry.deadline_at {
866 return Err("RFQ response deadline has passed".to_string());
867 }
868
869 if quote.valid_for_ms > self.config.max_valid_for_ms {
871 return Err(format!(
872 "valid_for_ms {} exceeds max {}",
873 quote.valid_for_ms, self.config.max_valid_for_ms
874 ));
875 }
876
877 if entry.taker_wallet == quote.qp_wallet {
878 entry.responded_qps.insert(quote.qp_wallet);
879 let snapshot = entry.clone();
880 if entry.rpi_auction.is_some() {
881 return Ok(HandleQpResponseResult::RpiQuoteRejected {
882 record: snapshot,
883 reason: RFQ_SELF_TRADE_REJECTION_REASON.to_string(),
884 });
885 }
886 return Err(RFQ_SELF_TRADE_REJECTION_REASON.to_string());
887 }
888
889 let rpi_auction = entry.rpi_auction.clone();
890 if let Some(rpi) = rpi_auction.as_ref() {
891 entry.responded_qps.insert(quote.qp_wallet);
892 if let Err(reason) = validate_rpi_quote_shape(&entry.legs, "e) {
893 let snapshot = entry.clone();
894 return Ok(HandleQpResponseResult::RpiQuoteRejected {
895 record: snapshot,
896 reason,
897 });
898 }
899 let Some(first_quote_leg) = quote.legs.first() else {
900 let snapshot = entry.clone();
901 return Ok(HandleQpResponseResult::RpiQuoteRejected {
902 record: snapshot,
903 reason: "RPI quote missing quote leg".to_string(),
904 });
905 };
906 if !quote_satisfies_rpi_auction(
907 first_quote_leg.side,
908 rpi.limit_price,
909 rpi.reference_price,
910 first_quote_leg.price,
911 rpi.min_tick,
912 ) {
913 let snapshot = entry.clone();
914 let (reason_label, reason) = rpi_quote_rejection(
915 first_quote_leg.side,
916 rpi.limit_price,
917 rpi.reference_price,
918 first_quote_leg.price,
919 rpi.min_tick,
920 );
921 counter!("ht_rpi_quote_rejected_total", "reason" => reason_label).increment(1);
922 return Ok(HandleQpResponseResult::RpiQuoteRejected {
923 record: snapshot,
924 reason,
925 });
926 }
927 } else {
928 entry.responded_qps.insert(quote.qp_wallet);
929 }
930
931 if let Some(ref handler) = self.db {
933 let db_legs: Vec<(String, String, Decimal, Decimal)> = quote
934 .legs
935 .iter()
936 .map(|l| {
937 let side = match l.side {
938 Side::Buy => "buy".to_string(),
939 Side::Sell => "sell".to_string(),
940 };
941 (l.instrument.clone(), side, l.price, l.size)
942 })
943 .collect();
944 let expires_at_ms = quote.received_at + quote.valid_for_ms;
945 if let Err(e) = handler.persist_rfq_quote_sync(
946 "e.quote_id,
947 &rfq_id,
948 "e.qp_wallet,
949 quote.net_premium,
950 quote.valid_for_ms,
951 "e.qp_signature,
952 quote.qp_nonce,
953 &db_legs,
954 expires_at_ms,
955 ) {
956 info!("Failed to persist RFQ quote to DB (non-fatal): {}", e);
957 }
958 }
959
960 let quote_id = quote.quote_id;
962 let quote_net_premium = quote.net_premium;
963 entry.quotes.push(quote);
964 let transitioned_to_quotes_received = entry.status == RfqStatus::SentToQps;
965 if transitioned_to_quotes_received {
966 entry.status = RfqStatus::QuotesReceived;
967 }
968
969 if rpi_auction.is_some() {
970 let snapshot = entry.clone();
971 drop(entry);
972 if transitioned_to_quotes_received {
973 self.persist_status(&rfq_id, RfqStatus::QuotesReceived);
974 }
975 return Ok(HandleQpResponseResult::RpiQuoteStored(snapshot));
976 }
977
978 let auto_accept_limit = entry.auto_accept_limit;
982 let auto_execute = if let Some(limit) = auto_accept_limit {
983 let within_limit = match quote_satisfies_auto_accept_limit(
984 &entry.legs,
985 quote_net_premium,
986 limit,
987 self.config.min_improvement_tick(),
988 ) {
989 Ok(within_limit) => within_limit,
990 Err(reason) => {
991 warn!(
992 "RFQ {} auto-execute SKIPPED: {} net_premium={} limit={}",
993 rfq_id, reason, quote_net_premium, limit
994 );
995 false
996 }
997 };
998
999 if within_limit
1000 && (entry.status == RfqStatus::QuotesReceived
1001 || entry.status == RfqStatus::SentToQps)
1002 {
1003 entry.status = RfqStatus::Accepted;
1004 entry.accepted_quote_id = Some(quote_id);
1005 true
1006 } else {
1007 info!(
1008 "RFQ {} auto-execute SKIPPED: net_premium={} limit={} status={:?}",
1009 rfq_id, quote_net_premium, limit, entry.status,
1010 );
1011 false
1012 }
1013 } else {
1014 false
1015 };
1016
1017 let snapshot = entry.clone();
1018 drop(entry);
1019
1020 if auto_execute {
1021 self.persist_status(&rfq_id, RfqStatus::Accepted);
1022 counter!("ht_rfq_auto_execute_total", "result" => "triggered").increment(1);
1023 info!(
1024 "RFQ {} auto-executing: quote {} net_premium={} within limit",
1025 rfq_id, quote_id, quote_net_premium
1026 );
1027 Ok(HandleQpResponseResult::AutoExecuted {
1028 record: snapshot,
1029 accepted_quote_id: quote_id,
1030 })
1031 } else {
1032 if transitioned_to_quotes_received {
1033 self.persist_status(&rfq_id, RfqStatus::QuotesReceived);
1034 }
1035 if let Some(limit) = auto_accept_limit {
1036 Ok(HandleQpResponseResult::AutoExecuteSkipped {
1037 record: snapshot,
1038 quote_premium: quote_net_premium.abs(),
1039 limit,
1040 })
1041 } else {
1042 Ok(HandleQpResponseResult::QuoteStored(snapshot))
1043 }
1044 }
1045 }
1046
1047 pub fn record_qp_invalid_response(&self, rfq_id: Uuid, qp_wallet: WalletAddress) {
1048 if let Some(mut entry) = self.active_rfqs.get_mut(&rfq_id) {
1049 if entry.rpi_auction.is_some() {
1050 entry.responded_qps.insert(qp_wallet);
1051 }
1052 }
1053 }
1054
1055 pub async fn accept_quote(
1059 &self,
1060 rfq_id: Uuid,
1061 quote_id: Uuid,
1062 taker_wallet: WalletAddress,
1063 taker_signature: String,
1064 taker_signer: WalletAddress,
1065 taker_nonce: u64,
1066 ) -> Result<RfqExecuteResult, String> {
1067 let (cmd, was_accepted) = {
1069 let mut entry = self.active_rfqs.get_mut(&rfq_id).ok_or("RFQ not found")?;
1070
1071 if entry.status != RfqStatus::QuotesReceived {
1073 return Err(format!(
1074 "RFQ in invalid state for acceptance: {:?}",
1075 entry.status
1076 ));
1077 }
1078
1079 if entry.taker_wallet != taker_wallet {
1080 return Err("Taker wallet mismatch".to_string());
1081 }
1082
1083 let quote = entry
1085 .quotes
1086 .iter()
1087 .find(|q| q.quote_id == quote_id)
1088 .ok_or("Quote not found")?
1089 .clone();
1090
1091 let now = current_time_ms();
1093 let quote_expires_at = quote.received_at + quote.valid_for_ms;
1094 if now > quote_expires_at {
1095 return Err("Quote has expired".to_string());
1096 }
1097
1098 entry.status = RfqStatus::Accepted;
1100 entry.accepted_quote_id = Some(quote_id);
1101 let (request_id, fill_id, execution_timestamp_ms) =
1102 entry.execution_facts_for_quote(quote_id, now);
1103
1104 let legs: Vec<RfqExecuteLeg> = quote
1106 .legs
1107 .iter()
1108 .map(|l| RfqExecuteLeg {
1109 instrument: l.instrument.clone(),
1110 taker_side: l.side,
1111 price: l.price,
1112 size: l.size,
1113 })
1114 .collect();
1115
1116 let cmd = RfqExecuteCommand {
1117 request_id,
1118 fill_id,
1119 rfq_id: rfq_id.to_string(),
1120 quote_id: quote_id.to_string(),
1121 taker_wallet,
1122 qp_wallet: quote.qp_wallet,
1123 builder_code_address: entry.builder_code_address,
1124 timestamp_ms: execution_timestamp_ms,
1125 legs,
1126 net_premium: quote.net_premium,
1127 taker_signature,
1128 qp_signature: quote.qp_signature.clone(),
1129 taker_nonce: Some(entry.taker_nonce),
1130 taker_accept_nonce: Some(taker_nonce),
1131 taker_submit_signer: Some(entry.taker_signer),
1132 taker_accept_signer: Some(taker_signer),
1133 };
1134
1135 (cmd, true)
1136 };
1137
1138 if !was_accepted {
1139 return Err("Failed to accept quote".to_string());
1140 }
1141 self.persist_status(&rfq_id, RfqStatus::Accepted);
1142
1143 let (response_tx, response_rx) = tokio::sync::oneshot::channel();
1153 let request = RfqExecuteRequest {
1154 command: cmd,
1155 response_tx,
1156 };
1157
1158 let revert_to_quotes_received = |manager: &Self| {
1159 if let Some(mut entry) = manager.active_rfqs.get_mut(&rfq_id) {
1160 entry.status = RfqStatus::QuotesReceived;
1161 entry.accepted_quote_id = None;
1162 }
1163 manager.persist_status(&rfq_id, RfqStatus::QuotesReceived);
1164 };
1165
1166 if self.rfq_sender.send(request).await.is_err() {
1167 revert_to_quotes_received(self);
1168 return Err("Engine channel closed".to_string());
1169 }
1170
1171 let result = match response_rx.await {
1172 Ok(r) => r,
1173 Err(_) => {
1174 revert_to_quotes_received(self);
1175 return Err("Engine response channel closed".to_string());
1176 }
1177 };
1178
1179 if let RfqExecuteResult::Failed { reason } = &result {
1180 if is_nonce_rejection_reason(reason) {
1181 revert_to_quotes_received(self);
1182 return Err(format!("RFQ execution rejected: {}", reason));
1183 }
1184 }
1185
1186 let terminal_status = match &result {
1188 RfqExecuteResult::Success { .. } => RfqStatus::Executed,
1189 RfqExecuteResult::Failed { .. } => RfqStatus::Failed,
1190 };
1191 if let Some(mut entry) = self.active_rfqs.get_mut(&rfq_id) {
1192 entry.status = terminal_status;
1193 }
1194 self.persist_status(&rfq_id, terminal_status);
1195
1196 Ok(result)
1197 }
1198
1199 pub async fn auto_accept_quote(
1204 &self,
1205 rfq_id: Uuid,
1206 quote_id: Uuid,
1207 taker_signature: String,
1208 ) -> Result<RfqExecuteResult, String> {
1209 let cmd = {
1211 let mut entry = self.active_rfqs.get_mut(&rfq_id).ok_or("RFQ not found")?;
1212 if entry.status != RfqStatus::Accepted {
1213 return Err(format!(
1214 "RFQ in unexpected state for auto-accept execution: {:?}",
1215 entry.status
1216 ));
1217 }
1218
1219 let quote = entry
1220 .quotes
1221 .iter()
1222 .find(|q| q.quote_id == quote_id)
1223 .ok_or("Quote not found for auto-accept")?
1224 .clone();
1225
1226 let now = current_time_ms();
1229 let quote_expires_at = quote.received_at + quote.valid_for_ms;
1230 if now > quote_expires_at {
1231 entry.revert_auto_accept_to_quotes_received();
1234 drop(entry);
1235 self.persist_status(&rfq_id, RfqStatus::QuotesReceived);
1236 return Err("Quote has expired".to_string());
1237 }
1238
1239 let legs: Vec<RfqExecuteLeg> = quote
1240 .legs
1241 .iter()
1242 .map(|l| RfqExecuteLeg {
1243 instrument: l.instrument.clone(),
1244 taker_side: l.side,
1245 price: l.price,
1246 size: l.size,
1247 })
1248 .collect();
1249 let (request_id, fill_id, execution_timestamp_ms) =
1250 entry.execution_facts_for_quote(quote_id, now);
1251
1252 RfqExecuteCommand {
1253 request_id,
1254 fill_id,
1255 rfq_id: rfq_id.to_string(),
1256 quote_id: quote_id.to_string(),
1257 taker_wallet: entry.taker_wallet,
1258 qp_wallet: quote.qp_wallet,
1259 builder_code_address: entry.builder_code_address,
1260 timestamp_ms: execution_timestamp_ms,
1261 legs,
1262 net_premium: quote.net_premium,
1263 taker_signature,
1264 qp_signature: quote.qp_signature.clone(),
1265 taker_nonce: Some(entry.taker_nonce),
1266 taker_accept_nonce: None,
1267 taker_submit_signer: Some(entry.taker_signer),
1268 taker_accept_signer: None,
1269 }
1270 }; let (response_tx, response_rx) = tokio::sync::oneshot::channel();
1274 let request = RfqExecuteRequest {
1275 command: cmd,
1276 response_tx,
1277 };
1278
1279 let revert_to_quotes_received = |manager: &Self| {
1280 if let Some(mut entry) = manager.active_rfqs.get_mut(&rfq_id) {
1281 entry.revert_auto_accept_to_quotes_received();
1282 }
1283 manager.persist_status(&rfq_id, RfqStatus::QuotesReceived);
1284 };
1285
1286 if self.rfq_sender.send(request).await.is_err() {
1287 revert_to_quotes_received(self);
1288 return Err("Engine channel closed".to_string());
1289 }
1290
1291 let result = match response_rx.await {
1292 Ok(r) => r,
1293 Err(_) => {
1294 revert_to_quotes_received(self);
1295 return Err("Engine response channel closed".to_string());
1296 }
1297 };
1298
1299 if let RfqExecuteResult::Failed { reason } = &result {
1300 if is_nonce_rejection_reason(reason) {
1301 revert_to_quotes_received(self);
1302 return Err(format!("RFQ execution rejected: {}", reason));
1303 }
1304 }
1305
1306 let terminal_status = match &result {
1308 RfqExecuteResult::Success { .. } => RfqStatus::Executed,
1309 RfqExecuteResult::Failed { .. } => RfqStatus::Failed,
1310 };
1311 if let Some(mut entry) = self.active_rfqs.get_mut(&rfq_id) {
1312 entry.status = terminal_status;
1313 }
1314 self.persist_status(&rfq_id, terminal_status);
1315
1316 Ok(result)
1317 }
1318
1319 pub fn record_qp_decline(&self, rfq_id: Uuid, qp_wallet: WalletAddress) {
1320 if let Some(mut entry) = self.active_rfqs.get_mut(&rfq_id) {
1321 entry.responded_qps.insert(qp_wallet);
1322 }
1323 }
1324
1325 pub async fn wait_for_rpi_auction_close(&self, rfq_id: Uuid) -> Result<RfqRecord, String> {
1326 loop {
1327 let record = self
1328 .active_rfqs
1329 .get(&rfq_id)
1330 .map(|r| r.clone())
1331 .ok_or_else(|| "RPI RFQ not found".to_string())?;
1332 if record.rpi_auction.is_none() {
1333 return Err("RFQ is not an RPI auction".to_string());
1334 }
1335
1336 if should_close_rpi_auction(&record, current_time_ms()) {
1337 histogram!(
1338 "ht_rpi_auction_duration_ms",
1339 "outcome" => if record.quotes.is_empty() { "fallback_to_book" } else { "candidate_quotes" }
1340 )
1341 .record(current_time_ms().saturating_sub(record.created_at) as f64);
1342 return Ok(record);
1343 }
1344
1345 tokio::time::sleep(Duration::from_millis(10)).await;
1346 }
1347 }
1348
1349 pub fn rpi_candidate_quote_ids(&self, rfq_id: &Uuid) -> Result<Vec<Uuid>, String> {
1350 let record = self
1351 .active_rfqs
1352 .get(rfq_id)
1353 .map(|r| r.clone())
1354 .ok_or_else(|| "RPI RFQ not found".to_string())?;
1355 sorted_rpi_candidate_quote_ids(&record)
1356 }
1357
1358 pub async fn execute_rpi_quote(
1359 &self,
1360 rfq_id: Uuid,
1361 quote_id: Uuid,
1362 taker_signature: String,
1363 ) -> Result<RfqExecuteResult, String> {
1364 let cmd = {
1365 let mut entry = self.active_rfqs.get_mut(&rfq_id).ok_or("RFQ not found")?;
1366 if entry.rpi_auction.is_none() {
1367 return Err("RFQ is not an RPI auction".to_string());
1368 }
1369 if entry.status != RfqStatus::SentToQps && entry.status != RfqStatus::QuotesReceived {
1370 return Err(format!(
1371 "RFQ in invalid state for RPI execution: {:?}",
1372 entry.status
1373 ));
1374 }
1375
1376 let quote = entry
1377 .quotes
1378 .iter()
1379 .find(|q| q.quote_id == quote_id)
1380 .ok_or("Quote not found")?
1381 .clone();
1382
1383 let now = current_time_ms();
1384 let quote_expires_at = quote.received_at + quote.valid_for_ms;
1385 if now > quote_expires_at {
1386 return Err("Quote has expired".to_string());
1387 }
1388
1389 entry.status = RfqStatus::Accepted;
1390 entry.accepted_quote_id = Some(quote_id);
1391 let (request_id, fill_id, execution_timestamp_ms) =
1392 entry.execution_facts_for_quote(quote_id, now);
1393
1394 let legs: Vec<RfqExecuteLeg> = quote
1395 .legs
1396 .iter()
1397 .map(|l| RfqExecuteLeg {
1398 instrument: l.instrument.clone(),
1399 taker_side: l.side,
1400 price: l.price,
1401 size: l.size,
1402 })
1403 .collect();
1404
1405 RfqExecuteCommand {
1406 request_id,
1407 fill_id,
1408 rfq_id: rfq_id.to_string(),
1409 quote_id: quote_id.to_string(),
1410 taker_wallet: entry.taker_wallet,
1411 qp_wallet: quote.qp_wallet,
1412 builder_code_address: entry.builder_code_address,
1413 timestamp_ms: execution_timestamp_ms,
1414 legs,
1415 net_premium: quote.net_premium,
1416 taker_signature,
1417 qp_signature: quote.qp_signature.clone(),
1418 taker_nonce: Some(entry.taker_nonce),
1419 taker_accept_nonce: None,
1420 taker_submit_signer: Some(entry.taker_signer),
1421 taker_accept_signer: None,
1422 }
1423 };
1424 self.persist_status(&rfq_id, RfqStatus::Accepted);
1425
1426 let (response_tx, response_rx) = tokio::sync::oneshot::channel();
1427 let request = RfqExecuteRequest {
1428 command: cmd,
1429 response_tx,
1430 };
1431
1432 let revert_to_quotes_received = |manager: &Self| {
1433 if let Some(mut entry) = manager.active_rfqs.get_mut(&rfq_id) {
1434 entry.status = RfqStatus::QuotesReceived;
1435 entry.clear_execution_selection();
1436 }
1437 manager.persist_status(&rfq_id, RfqStatus::QuotesReceived);
1438 };
1439
1440 if self.rfq_sender.send(request).await.is_err() {
1441 revert_to_quotes_received(self);
1442 return Err("Engine channel closed".to_string());
1443 }
1444
1445 let result = match response_rx.await {
1446 Ok(r) => r,
1447 Err(_) => {
1448 revert_to_quotes_received(self);
1449 return Err("Engine response channel closed".to_string());
1450 }
1451 };
1452
1453 match &result {
1454 RfqExecuteResult::Success { .. } => {
1455 if let Some(mut entry) = self.active_rfqs.get_mut(&rfq_id) {
1456 entry.status = RfqStatus::Executed;
1457 }
1458 self.persist_status(&rfq_id, RfqStatus::Executed);
1459 counter!("ht_rpi_auction_total", "outcome" => "filled").increment(1);
1460 }
1461 RfqExecuteResult::Failed { .. } => {
1462 revert_to_quotes_received(self);
1463 }
1464 }
1465
1466 Ok(result)
1467 }
1468
1469 pub fn mark_rpi_fallback_to_book(&self, rfq_id: &Uuid) {
1470 if let Some(mut entry) = self.active_rfqs.get_mut(rfq_id) {
1471 entry.status = if entry.quotes.is_empty() {
1472 RfqStatus::NoQuotes
1473 } else {
1474 RfqStatus::Failed
1475 };
1476 }
1477 let status = self
1478 .active_rfqs
1479 .get(rfq_id)
1480 .map(|entry| entry.status)
1481 .unwrap_or(RfqStatus::Failed);
1482 self.persist_status(rfq_id, status);
1483 counter!("ht_rpi_auction_total", "outcome" => "fallback_to_book").increment(1);
1484 }
1485
1486 pub fn get_rfq(&self, rfq_id: &Uuid) -> Option<RfqRecord> {
1488 self.active_rfqs.get(rfq_id).map(|r| r.clone())
1489 }
1490
1491 pub fn get_history(&self, wallet: &WalletAddress) -> Vec<RfqRecord> {
1493 self.active_rfqs
1494 .iter()
1495 .filter(|r| r.taker_wallet == *wallet)
1496 .map(|r| r.clone())
1497 .collect()
1498 }
1499
1500 pub fn start_cleanup_task(
1502 self: &Arc<Self>,
1503 mut shutdown: tokio::sync::broadcast::Receiver<()>,
1504 ) {
1505 let manager = Arc::clone(self);
1506 tokio::spawn(async move {
1507 let mut interval = tokio::time::interval(std::time::Duration::from_secs(1));
1508 loop {
1509 tokio::select! {
1510 _ = shutdown.recv() => break,
1511 _ = interval.tick() => {
1512 manager.cleanup_expired();
1513 }
1514 }
1515 }
1516 });
1517 }
1518
1519 fn cleanup_expired(&self) {
1521 let now = current_time_ms();
1522
1523 let transitions = std::sync::Mutex::new(Vec::<(Uuid, RfqStatus)>::new());
1527 self.active_rfqs.alter_all(|rfq_id, mut record| {
1528 let new_status = match record.status {
1529 RfqStatus::SentToQps if now > record.deadline_at => {
1530 if record.quotes.is_empty() {
1531 Some(RfqStatus::NoQuotes)
1532 } else {
1533 Some(RfqStatus::QuotesReceived)
1534 }
1535 }
1536 RfqStatus::QuotesReceived if now > record.expires_at => Some(RfqStatus::Expired),
1537 _ => None,
1538 };
1539 if let Some(status) = new_status {
1540 record.status = status;
1541 if let Ok(mut t) = transitions.lock() {
1542 t.push((*rfq_id, status));
1543 }
1544 }
1545 record
1546 });
1547
1548 if let Ok(t) = transitions.into_inner() {
1549 for (rfq_id, status) in t {
1550 self.persist_status(&rfq_id, status);
1551 }
1552 }
1553
1554 let eviction_threshold = now.saturating_sub(5 * 60 * 1000);
1556 self.active_rfqs.retain(|_, record| {
1557 !(record.status.is_terminal() && record.created_at < eviction_threshold)
1558 });
1559 }
1560}
1561
/// Test-only manager that holds just the in-memory RFQ map and the RFQ
/// configuration, with no engine channel or persistence.
#[cfg(test)]
struct TestRfqManager {
    active_rfqs: Arc<DashMap<Uuid, RfqRecord>>,
    config: RfqConfig,
}

#[cfg(test)]
impl TestRfqManager {
    /// Creates a manager with an empty RFQ map and the default config.
    fn new() -> Self {
        Self {
            active_rfqs: Arc::new(DashMap::new()),
            config: RfqConfig::default(),
        }
    }

    /// Inserts (or replaces) a record, keyed by its `rfq_id`.
    fn insert_rfq(&self, record: RfqRecord) {
        self.active_rfqs.insert(record.rfq_id, record);
    }

    /// Returns a cloned snapshot of the record, if present.
    fn get_rfq(&self, rfq_id: &Uuid) -> Option<RfqRecord> {
        self.active_rfqs.get(rfq_id).map(|r| r.clone())
    }

    /// Processes a quote-provider response for `rfq_id`.
    ///
    /// Validation order: RFQ exists, RFQ is in `SentToQps` or
    /// `QuotesReceived`, the response deadline has not passed,
    /// `valid_for_ms` is within the configured maximum, and the quote does
    /// not come from the taker's own wallet. A self-trade quote is recorded
    /// as a response and rejected: as `RpiQuoteRejected` for RPI auctions,
    /// as `Err` otherwise.
    ///
    /// For RPI auctions the quote shape and price are validated; a failing
    /// quote yields `RpiQuoteRejected`, a passing one is stored and
    /// `RpiQuoteStored` is returned. For other RFQs the quote is stored and,
    /// when an auto-accept limit is set, either accepted (`AutoExecuted`) or
    /// skipped (`AutoExecuteSkipped`); without a limit the result is
    /// `QuoteStored`.
    fn handle_qp_response(
        &self,
        rfq_id: Uuid,
        quote: QpQuote,
    ) -> Result<HandleQpResponseResult, String> {
        let mut entry = self.active_rfqs.get_mut(&rfq_id).ok_or("RFQ not found")?;

        if entry.status != RfqStatus::SentToQps && entry.status != RfqStatus::QuotesReceived {
            return Err(format!(
                "RFQ in invalid state for quotes: {:?}",
                entry.status
            ));
        }

        let now = current_time_ms();
        if now > entry.deadline_at {
            return Err("RFQ response deadline has passed".to_string());
        }

        if quote.valid_for_ms > self.config.max_valid_for_ms {
            return Err(format!(
                "valid_for_ms {} exceeds max {}",
                quote.valid_for_ms, self.config.max_valid_for_ms
            ));
        }

        if entry.taker_wallet == quote.qp_wallet {
            // Self-trade: count the response but never store the quote.
            entry.responded_qps.insert(quote.qp_wallet);
            let snapshot = entry.clone();
            if entry.rpi_auction.is_some() {
                return Ok(HandleQpResponseResult::RpiQuoteRejected {
                    record: snapshot,
                    reason: RFQ_SELF_TRADE_REJECTION_REASON.to_string(),
                });
            }
            return Err(RFQ_SELF_TRADE_REJECTION_REASON.to_string());
        }

        // From here on the quote provider counts as having responded,
        // whether or not the quote ends up being stored.
        entry.responded_qps.insert(quote.qp_wallet);

        let rpi_auction = entry.rpi_auction.clone();
        if let Some(rpi) = rpi_auction.as_ref() {
            if let Err(reason) = validate_rpi_quote_shape(&entry.legs, &quote) {
                let snapshot = entry.clone();
                return Ok(HandleQpResponseResult::RpiQuoteRejected {
                    record: snapshot,
                    reason,
                });
            }
            let first_quote_leg = quote
                .legs
                .first()
                .ok_or_else(|| "RPI quote missing quote leg".to_string())?;
            if !quote_satisfies_rpi_auction(
                first_quote_leg.side,
                rpi.limit_price,
                rpi.reference_price,
                first_quote_leg.price,
                rpi.min_tick,
            ) {
                let snapshot = entry.clone();
                let (_, reason) = rpi_quote_rejection(
                    first_quote_leg.side,
                    rpi.limit_price,
                    rpi.reference_price,
                    first_quote_leg.price,
                    rpi.min_tick,
                );
                return Ok(HandleQpResponseResult::RpiQuoteRejected {
                    record: snapshot,
                    reason,
                });
            }
        }

        let quote_id = quote.quote_id;
        let quote_net_premium = quote.net_premium;
        entry.quotes.push(quote);
        let transitioned_to_quotes_received = entry.status == RfqStatus::SentToQps;
        if transitioned_to_quotes_received {
            entry.status = RfqStatus::QuotesReceived;
        }

        if rpi_auction.is_some() {
            let snapshot = entry.clone();
            drop(entry);
            return Ok(HandleQpResponseResult::RpiQuoteStored(snapshot));
        }

        let auto_accept_limit = entry.auto_accept_limit;
        let auto_execute = if let Some(limit) = auto_accept_limit {
            // An error from the limit check is treated as "not within limit".
            let within_limit = quote_satisfies_auto_accept_limit(
                &entry.legs,
                quote_net_premium,
                limit,
                self.config.min_improvement_tick(),
            )
            .unwrap_or(false);

            if within_limit
                && (entry.status == RfqStatus::QuotesReceived
                    || entry.status == RfqStatus::SentToQps)
            {
                entry.status = RfqStatus::Accepted;
                entry.accepted_quote_id = Some(quote_id);
                true
            } else {
                false
            }
        } else {
            false
        };

        let snapshot = entry.clone();
        drop(entry);

        if auto_execute {
            Ok(HandleQpResponseResult::AutoExecuted {
                record: snapshot,
                accepted_quote_id: quote_id,
            })
        } else if let Some(limit) = auto_accept_limit {
            Ok(HandleQpResponseResult::AutoExecuteSkipped {
                record: snapshot,
                quote_premium: quote_net_premium.abs(),
                limit,
            })
        } else {
            Ok(HandleQpResponseResult::QuoteStored(snapshot))
        }
    }
}
1723
1724#[cfg(test)]
1725mod tests {
1726 use super::*;
1727 use rust_decimal_macros::dec;
1728
1729 fn test_wallet(id: u8) -> WalletAddress {
1730 let mut bytes = [0u8; 20];
1731 bytes[19] = id;
1732 WalletAddress::from(alloy::primitives::Address::from(bytes))
1733 }
1734
1735 fn make_test_rfq(
1737 rfq_id: Uuid,
1738 taker_wallet: WalletAddress,
1739 auto_accept_limit: Option<Decimal>,
1740 ) -> RfqRecord {
1741 make_test_rfq_with_side(rfq_id, taker_wallet, auto_accept_limit, Side::Buy)
1742 }
1743
1744 fn make_test_rfq_with_side(
1745 rfq_id: Uuid,
1746 taker_wallet: WalletAddress,
1747 auto_accept_limit: Option<Decimal>,
1748 side: Side,
1749 ) -> RfqRecord {
1750 let now = current_time_ms();
1751 RfqRecord {
1752 rfq_id,
1753 taker_wallet,
1754 taker_signer: taker_wallet,
1755 builder_code_address: None,
1756 underlying: "ETH".to_string(),
1757 legs: vec![RfqLeg {
1758 instrument: "ETH-20260401-3000-C".to_string(),
1759 side,
1760 size: dec!(5),
1761 }],
1762 legs_hash: [0u8; 32],
1763 taker_signature: "0xtest_sig".to_string(),
1764 taker_nonce: 1,
1765 status: RfqStatus::SentToQps,
1766 quotes: Vec::new(),
1767 created_at: now,
1768 deadline_at: now + 30_000,
1769 expires_at: now + 60_000,
1770 auto_accept_limit,
1771 accepted_quote_id: None,
1772 execution_quote_id: None,
1773 execution_request_id: None,
1774 execution_fill_id: None,
1775 execution_timestamp_ms: None,
1776 eligible_qps: Vec::new(),
1777 responded_qps: HashSet::new(),
1778 rpi_auction: None,
1779 }
1780 }
1781
1782 fn make_test_quote(qp_wallet: WalletAddress, net_premium: Decimal) -> QpQuote {
1784 make_test_quote_with_leg(qp_wallet, net_premium, Side::Buy, dec!(150), dec!(5))
1785 }
1786
1787 fn make_test_quote_with_leg(
1788 qp_wallet: WalletAddress,
1789 net_premium: Decimal,
1790 side: Side,
1791 price: Decimal,
1792 size: Decimal,
1793 ) -> QpQuote {
1794 QpQuote {
1795 quote_id: Uuid::now_v7(),
1796 qp_wallet,
1797 legs: vec![QuoteLeg {
1798 instrument: "ETH-20260401-3000-C".to_string(),
1799 side,
1800 price,
1801 size,
1802 }],
1803 net_premium,
1804 valid_for_ms: 25_000,
1805 qp_signature: "0xqp_sig".to_string(),
1806 qp_nonce: 1,
1807 received_at: current_time_ms(),
1808 }
1809 }
1810
1811 fn make_test_rpi_rfq(rfq_id: Uuid, taker_wallet: WalletAddress, side: Side) -> RfqRecord {
1812 let mut record = make_test_rfq_with_side(rfq_id, taker_wallet, None, side);
1813 record.rpi_auction = Some(RpiAuctionContext {
1814 limit_price: dec!(100),
1815 reference_price: Some(dec!(100)),
1816 min_tick: dec!(0.0001),
1817 });
1818 record
1819 }
1820
1821 fn make_test_rpi_rfq_without_book_reference(
1822 rfq_id: Uuid,
1823 taker_wallet: WalletAddress,
1824 side: Side,
1825 ) -> RfqRecord {
1826 let mut record = make_test_rpi_rfq(rfq_id, taker_wallet, side);
1827 record
1828 .rpi_auction
1829 .as_mut()
1830 .expect("test RPI context should exist")
1831 .reference_price = None;
1832 record
1833 }
1834
/// Pins every field of `RfqConfig::default()`.
#[test]
fn test_rfq_config_default() {
    let RfqConfig {
        response_deadline_ms,
        rpi_auction_ms,
        min_improvement_ticks,
        reveal_limit_to_qps,
        max_valid_for_ms,
        rfq_lifetime_ms,
        max_legs,
    } = RfqConfig::default();

    assert_eq!(response_deadline_ms, 10_000);
    assert_eq!(rpi_auction_ms, 2_000);
    assert_eq!(min_improvement_ticks, 1);
    assert!(reveal_limit_to_qps);
    assert_eq!(max_valid_for_ms, 30_000);
    assert_eq!(rfq_lifetime_ms, 60_000);
    assert_eq!(max_legs, 10);
}
1846
/// A freshly built record in `Created` state reports a non-terminal status.
#[test]
fn test_rfq_record_lifecycle() {
    let rfq_id = Uuid::now_v7();
    let taker = test_wallet(1);
    let signer = test_wallet(1);
    let only_leg = RfqLeg {
        instrument: "ETH-20260401-3000-C".to_string(),
        side: Side::Buy,
        size: dec!(5),
    };
    let created_at = current_time_ms();
    let deadline_at = current_time_ms() + 5000;
    let expires_at = current_time_ms() + 60000;

    let record = RfqRecord {
        rfq_id,
        taker_wallet: taker,
        taker_signer: signer,
        builder_code_address: None,
        underlying: "ETH".to_string(),
        legs: vec![only_leg],
        legs_hash: [0u8; 32],
        taker_signature: "0x123".to_string(),
        taker_nonce: 1,
        status: RfqStatus::Created,
        quotes: vec![],
        created_at,
        deadline_at,
        expires_at,
        auto_accept_limit: None,
        accepted_quote_id: None,
        execution_quote_id: None,
        execution_request_id: None,
        execution_fill_id: None,
        execution_timestamp_ms: None,
        eligible_qps: vec![],
        responded_qps: HashSet::new(),
        rpi_auction: None,
    };

    assert_eq!(record.status, RfqStatus::Created);
    assert!(!record.status.is_terminal());
}
1882
/// Execution facts are reused when the same quote is retried and are
/// regenerated when a different quote is selected.
#[test]
fn test_rfq_execution_facts_stable_for_quote_retries() {
    let rfq_id = Uuid::now_v7();
    let quote_id = Uuid::now_v7();
    let other_quote_id = Uuid::now_v7();
    let mut record = make_test_rfq(rfq_id, test_wallet(1), None);

    let first = record.execution_facts_for_quote(quote_id, 100);
    let retry = record.execution_facts_for_quote(quote_id, 200);
    assert_eq!(
        first, retry,
        "same accepted quote must reuse execution facts across retries"
    );

    let other_quote = record.execution_facts_for_quote(other_quote_id, 300);
    let (first_request_id, first_fill_id, _) = &first;
    let (other_request_id, other_fill_id, other_timestamp_ms) = &other_quote;
    assert_ne!(
        first_request_id, other_request_id,
        "different accepted quotes must not share a request_id"
    );
    assert_ne!(
        first_fill_id, other_fill_id,
        "different accepted quotes must not share a fill_id"
    );
    assert_eq!(*other_timestamp_ms, 300);
}
1908
/// Reverting an auto-accept puts the record back in `QuotesReceived` and
/// clears the limit, the accepted quote, and all execution facts.
#[test]
fn test_auto_accept_rollback_clears_execution_selection() {
    let rfq_id = Uuid::now_v7();
    let quote_id = Uuid::now_v7();
    let mut rfq = make_test_rfq(rfq_id, test_wallet(1), Some(dec!(5.0)));

    // Simulate an accepted quote with execution facts already derived.
    rfq.status = RfqStatus::Accepted;
    rfq.accepted_quote_id = Some(quote_id);
    let _ = rfq.execution_facts_for_quote(quote_id, 100);

    rfq.revert_auto_accept_to_quotes_received();

    assert_eq!(rfq.status, RfqStatus::QuotesReceived);
    assert_eq!(rfq.auto_accept_limit, None);
    assert_eq!(rfq.accepted_quote_id, None);
    assert_eq!(rfq.execution_quote_id, None);
    assert_eq!(rfq.execution_request_id, None);
    assert_eq!(rfq.execution_fill_id, None);
    assert_eq!(rfq.execution_timestamp_ms, None);
}
1929
/// Pins which statuses `is_terminal` treats as terminal.
#[test]
fn test_rfq_status_terminal() {
    let non_terminal = [
        RfqStatus::Created,
        RfqStatus::SentToQps,
        RfqStatus::QuotesReceived,
        RfqStatus::Accepted,
    ];
    for status in non_terminal {
        assert!(!status.is_terminal(), "{:?} must not be terminal", status);
    }

    let terminal = [
        RfqStatus::NoQuotes,
        RfqStatus::Expired,
        RfqStatus::Executed,
        RfqStatus::Failed,
    ];
    for status in terminal {
        assert!(status.is_terminal(), "{:?} must be terminal", status);
    }
}
1941
/// A buy RFQ with auto-accept limit 5.0 receiving a quote with net
/// premium -3.0 is accepted, and the quote is stored on the record.
#[test]
fn test_auto_execute_rfq() {
    let manager = TestRfqManager::new();

    let rfq_id = Uuid::now_v7();
    let (taker, qp) = (test_wallet(1), test_wallet(2));

    manager.insert_rfq(make_test_rfq(rfq_id, taker, Some(dec!(5.0))));

    let quote = make_test_quote(qp, dec!(-3.0));
    let quote_id = quote.quote_id;

    match manager.handle_qp_response(rfq_id, quote).unwrap() {
        HandleQpResponseResult::AutoExecuted {
            record,
            accepted_quote_id,
        } => {
            assert_eq!(accepted_quote_id, quote_id);
            assert_eq!(record.status, RfqStatus::Accepted);
        }
        HandleQpResponseResult::QuoteStored(_) => {
            panic!("Expected AutoExecuted, got QuoteStored");
        }
        HandleQpResponseResult::AutoExecuteSkipped { .. } => {
            panic!("Expected AutoExecuted, got AutoExecuteSkipped");
        }
        HandleQpResponseResult::RpiQuoteStored(_)
        | HandleQpResponseResult::RpiQuoteRejected { .. } => {
            panic!("Expected AutoExecuted, got RPI result");
        }
    }

    // The stored record reflects the acceptance and keeps the quote.
    let stored = manager.get_rfq(&rfq_id).unwrap();
    assert_eq!(stored.status, RfqStatus::Accepted);
    assert_eq!(stored.quotes.len(), 1);
    assert_eq!(stored.quotes[0].quote_id, quote_id);
}
1992
/// Without an auto-accept limit, a quote from the taker's own wallet is
/// rejected with the self-trade reason, is not stored, and still counts
/// the wallet as having responded.
#[test]
fn test_manual_rfq_rejects_self_trade_quote() {
    let manager = TestRfqManager::new();

    let rfq_id = Uuid::now_v7();
    let taker = test_wallet(1);

    manager.insert_rfq(make_test_rfq(rfq_id, taker, None));

    let self_quote = make_test_quote(taker, dec!(-3.0));
    let rejection = manager
        .handle_qp_response(rfq_id, self_quote)
        .unwrap_err();
    assert_eq!(rejection, RFQ_SELF_TRADE_REJECTION_REASON.to_string());

    let stored = manager.get_rfq(&rfq_id).unwrap();
    assert_eq!(stored.status, RfqStatus::SentToQps);
    assert_eq!(stored.quotes.len(), 0);
    assert!(stored.responded_qps.contains(&taker));
}
2013
/// With an auto-accept limit set, a quote from the taker's own wallet is
/// still rejected: nothing is accepted or stored, and the wallet counts
/// as having responded.
#[test]
fn test_auto_execute_rfq_rejects_self_trade_quote() {
    let manager = TestRfqManager::new();

    let rfq_id = Uuid::now_v7();
    let taker = test_wallet(1);

    manager.insert_rfq(make_test_rfq(rfq_id, taker, Some(dec!(5.0))));

    let self_quote = make_test_quote(taker, dec!(-3.0));
    let rejection = manager
        .handle_qp_response(rfq_id, self_quote)
        .unwrap_err();
    assert_eq!(rejection, RFQ_SELF_TRADE_REJECTION_REASON.to_string());

    let stored = manager.get_rfq(&rfq_id).unwrap();
    assert_eq!(stored.status, RfqStatus::SentToQps);
    assert_eq!(stored.accepted_quote_id, None);
    assert_eq!(stored.quotes.len(), 0);
    assert!(stored.responded_qps.contains(&taker));
}
2035
/// A buy RFQ with auto-accept limit 2.0 receiving a quote with net
/// premium -3.0 stores the quote but skips auto-execution.
#[test]
fn test_auto_execute_rfq_outside_limit() {
    let manager = TestRfqManager::new();

    let rfq_id = Uuid::now_v7();
    let (taker, qp) = (test_wallet(1), test_wallet(2));

    manager.insert_rfq(make_test_rfq(rfq_id, taker, Some(dec!(2.0))));

    let quote = make_test_quote(qp, dec!(-3.0));

    match manager.handle_qp_response(rfq_id, quote).unwrap() {
        HandleQpResponseResult::AutoExecuteSkipped {
            record,
            quote_premium,
            limit,
        } => {
            assert_eq!(record.status, RfqStatus::QuotesReceived);
            assert_eq!(record.quotes.len(), 1);
            assert_eq!(quote_premium, dec!(3.0));
            assert_eq!(limit, dec!(2.0));
        }
        HandleQpResponseResult::QuoteStored(_) => {
            panic!("Expected AutoExecuteSkipped, got QuoteStored");
        }
        HandleQpResponseResult::AutoExecuted { .. } => {
            panic!("Expected AutoExecuteSkipped, got AutoExecuted");
        }
        HandleQpResponseResult::RpiQuoteStored(_)
        | HandleQpResponseResult::RpiQuoteRejected { .. } => {
            panic!("Expected AutoExecuteSkipped, got RPI result");
        }
    }

    let stored = manager.get_rfq(&rfq_id).unwrap();
    assert_eq!(stored.status, RfqStatus::QuotesReceived);
}
2081
/// Once the first quote has been auto-accepted, a second quote for the
/// same RFQ is rejected with an invalid-state error.
#[test]
fn test_auto_execute_prevents_double_fill() {
    let manager = TestRfqManager::new();

    let rfq_id = Uuid::now_v7();
    let taker = test_wallet(1);
    let first_qp = test_wallet(2);
    let second_qp = test_wallet(3);

    manager.insert_rfq(make_test_rfq(rfq_id, taker, Some(dec!(5.0))));

    let first_outcome = manager
        .handle_qp_response(rfq_id, make_test_quote(first_qp, dec!(-3.0)))
        .unwrap();
    assert!(
        matches!(first_outcome, HandleQpResponseResult::AutoExecuted { .. }),
        "First quote should auto-execute"
    );

    let second_outcome =
        manager.handle_qp_response(rfq_id, make_test_quote(second_qp, dec!(-2.0)));
    assert!(
        second_outcome.is_err(),
        "Second quote should be rejected because RFQ is already Accepted"
    );
    assert!(
        second_outcome
            .unwrap_err()
            .contains("invalid state for quotes"),
        "Error should indicate invalid state"
    );
}
2115
/// A sell RFQ with auto-accept limit 1.9553 receiving a quote with net
/// premium 1.0 skips auto-execution.
#[test]
fn test_auto_execute_sell_quote_below_limit_is_skipped() {
    let manager = TestRfqManager::new();

    let rfq_id = Uuid::now_v7();
    let (taker, qp) = (test_wallet(1), test_wallet(2));

    manager.insert_rfq(make_test_rfq_with_side(
        rfq_id,
        taker,
        Some(dec!(1.9553)),
        Side::Sell,
    ));

    let quote = make_test_quote(qp, dec!(1.0));

    match manager.handle_qp_response(rfq_id, quote).unwrap() {
        HandleQpResponseResult::AutoExecuteSkipped {
            record,
            quote_premium,
            limit,
        } => {
            assert_eq!(record.status, RfqStatus::QuotesReceived);
            assert_eq!(quote_premium, dec!(1.0));
            assert_eq!(limit, dec!(1.9553));
        }
        HandleQpResponseResult::QuoteStored(_) => {
            panic!("Expected AutoExecuteSkipped, got QuoteStored");
        }
        HandleQpResponseResult::AutoExecuted { .. } => {
            panic!("Sell quote below limit must not auto-execute");
        }
        HandleQpResponseResult::RpiQuoteStored(_)
        | HandleQpResponseResult::RpiQuoteRejected { .. } => {
            panic!("Expected AutoExecuteSkipped, got RPI result");
        }
    }
}
2152
/// A sell RFQ with auto-accept limit 1.9553 receiving a quote with net
/// premium 1.96 is auto-accepted.
#[test]
fn test_auto_execute_sell_quote_at_or_above_limit_executes() {
    let manager = TestRfqManager::new();

    let rfq_id = Uuid::now_v7();
    let (taker, qp) = (test_wallet(1), test_wallet(2));

    manager.insert_rfq(make_test_rfq_with_side(
        rfq_id,
        taker,
        Some(dec!(1.9553)),
        Side::Sell,
    ));

    let quote = make_test_quote(qp, dec!(1.96));
    let quote_id = quote.quote_id;

    match manager.handle_qp_response(rfq_id, quote).unwrap() {
        HandleQpResponseResult::AutoExecuted {
            record,
            accepted_quote_id,
        } => {
            assert_eq!(accepted_quote_id, quote_id);
            assert_eq!(record.status, RfqStatus::Accepted);
        }
        HandleQpResponseResult::QuoteStored(_) => {
            panic!("Expected AutoExecuted, got QuoteStored");
        }
        HandleQpResponseResult::AutoExecuteSkipped { .. } => {
            panic!("Expected AutoExecuted, got AutoExecuteSkipped");
        }
        HandleQpResponseResult::RpiQuoteStored(_)
        | HandleQpResponseResult::RpiQuoteRejected { .. } => {
            panic!("Expected AutoExecuted, got RPI result");
        }
    }
}
2188
/// The auto-accept limit check returns an error for a package that mixes
/// buy and sell legs.
#[test]
fn test_auto_execute_mixed_side_limit_is_rejected() {
    let buy_call = RfqLeg {
        instrument: "ETH-20260401-3000-C".to_string(),
        side: Side::Buy,
        size: dec!(1),
    };
    let sell_put = RfqLeg {
        instrument: "ETH-20260401-3000-P".to_string(),
        side: Side::Sell,
        size: dec!(1),
    };
    let legs = vec![buy_call, sell_put];

    let outcome = quote_satisfies_auto_accept_limit(&legs, dec!(1), dec!(1), dec!(0.0001));
    let err = outcome.expect_err("mixed-side auto-execute RFQ should be rejected");

    assert!(err.contains("mixed-side packages require explicit quote acceptance"));
}
2209
/// With no reference price, a buy quote at the limit (100) qualifies and
/// a quote one tick above it (100.0001) does not.
#[test]
fn test_rpi_buy_accepts_limit_without_book_reference() {
    let limit = dec!(100);
    let tick = dec!(0.0001);

    let at_limit = quote_qualifies_for_rpi(Side::Buy, limit, None, dec!(100), tick);
    assert!(at_limit);

    let above_limit = quote_qualifies_for_rpi(Side::Buy, limit, None, dec!(100.0001), tick);
    assert!(!above_limit);
}
2227
/// For a buy with limit 105 and reference 100, a quote at 99.9999
/// qualifies while a quote at 104.9999 does not.
#[test]
fn test_rpi_price_improvement_buy_beats_executable_book() {
    let limit = dec!(105);
    let reference = Some(dec!(100));
    let tick = dec!(0.0001);

    let improving = quote_qualifies_for_rpi(Side::Buy, limit, reference, dec!(99.9999), tick);
    assert!(improving);

    let not_improving =
        quote_qualifies_for_rpi(Side::Buy, limit, reference, dec!(104.9999), tick);
    assert!(!not_improving);
}
2245
/// For a sell with limit 95 and reference 100, a quote at 100.0001
/// qualifies while a quote at 95.0001 does not.
#[test]
fn test_rpi_price_improvement_sell_beats_executable_book() {
    let limit = dec!(95);
    let reference = Some(dec!(100));
    let tick = dec!(0.0001);

    let improving = quote_qualifies_for_rpi(Side::Sell, limit, reference, dec!(100.0001), tick);
    assert!(improving);

    let not_improving =
        quote_qualifies_for_rpi(Side::Sell, limit, reference, dec!(95.0001), tick);
    assert!(!not_improving);
}
2263
/// For a buy with limit 100 and reference 99, a quote at 100.0001 is
/// reported as an `outside_limit` rejection.
#[test]
fn test_rpi_rejection_prefers_limit_reason_over_reference_reason() {
    let limit = dec!(100);
    let reference = Some(dec!(99));
    let quoted_price = dec!(100.0001);

    let (label, reason) =
        rpi_quote_rejection(Side::Buy, limit, reference, quoted_price, dec!(0.0001));

    assert_eq!(label, "outside_limit");
    assert!(reason.contains("does not satisfy limit"));
}
2277
/// For a buy with limit 105 and reference 100, a quote at exactly 100 is
/// reported as a `no_price_improvement` rejection.
#[test]
fn test_rpi_rejection_reports_missing_price_improvement() {
    let limit = dec!(105);
    let reference = Some(dec!(100));
    let quoted_price = dec!(100);

    let (label, reason) =
        rpi_quote_rejection(Side::Buy, limit, reference, quoted_price, dec!(0.0001));

    assert_eq!(label, "no_price_improvement");
    assert!(reason.contains("does not improve reference"));
}
2291
/// An RPI buy quote priced at the reference (100) is rejected, is not
/// stored, and still counts the quote provider as having responded.
#[test]
fn test_rpi_quote_rejects_price_without_improvement() {
    let manager = TestRfqManager::new();
    let rfq_id = Uuid::now_v7();
    let (taker, qp) = (test_wallet(1), test_wallet(2));

    manager.insert_rfq(make_test_rpi_rfq(rfq_id, taker, Side::Buy));

    let quote = make_test_quote_with_leg(qp, dec!(-500), Side::Buy, dec!(100), dec!(5));

    match manager.handle_qp_response(rfq_id, quote).unwrap() {
        HandleQpResponseResult::RpiQuoteRejected { record, reason } => {
            assert!(reason.contains("does not improve reference"));
            assert_eq!(record.quotes.len(), 0);
            assert!(record.responded_qps.contains(&qp));
        }
        other => panic!("Expected RpiQuoteRejected, got {:?}", other),
    }
}
2313
/// With no reference price, an RPI buy quote priced at the limit (100) is
/// stored and moves the RFQ to `QuotesReceived`.
#[test]
fn test_rpi_quote_stores_limit_price_when_no_book_reference() {
    let manager = TestRfqManager::new();
    let rfq_id = Uuid::now_v7();
    let (taker, qp) = (test_wallet(1), test_wallet(2));

    let rfq = make_test_rpi_rfq_without_book_reference(rfq_id, taker, Side::Buy);
    manager.insert_rfq(rfq);

    let quote = make_test_quote_with_leg(qp, dec!(-500), Side::Buy, dec!(100), dec!(5));
    let quote_id = quote.quote_id;

    match manager.handle_qp_response(rfq_id, quote).unwrap() {
        HandleQpResponseResult::RpiQuoteStored(record) => {
            assert_eq!(record.status, RfqStatus::QuotesReceived);
            assert_eq!(record.quotes.len(), 1);
            assert_eq!(record.quotes[0].quote_id, quote_id);
            assert!(record.responded_qps.contains(&qp));
        }
        other => panic!("Expected RpiQuoteStored, got {:?}", other),
    }
}
2341
/// With no reference price, an RPI buy quote priced above the limit
/// (100.0001 against 100) is rejected and not stored.
#[test]
fn test_rpi_quote_rejects_price_outside_limit_without_book_reference() {
    let manager = TestRfqManager::new();
    let rfq_id = Uuid::now_v7();
    let (taker, qp) = (test_wallet(1), test_wallet(2));

    let rfq = make_test_rpi_rfq_without_book_reference(rfq_id, taker, Side::Buy);
    manager.insert_rfq(rfq);

    let quote =
        make_test_quote_with_leg(qp, dec!(-500.0005), Side::Buy, dec!(100.0001), dec!(5));

    match manager.handle_qp_response(rfq_id, quote).unwrap() {
        HandleQpResponseResult::RpiQuoteRejected { record, reason } => {
            assert!(reason.contains("does not satisfy limit"));
            assert_eq!(record.quotes.len(), 0);
            assert!(record.responded_qps.contains(&qp));
        }
        other => panic!("Expected RpiQuoteRejected, got {:?}", other),
    }
}
2368
/// Pins the close boundary of `should_close_rpi_auction`: with `deadline_at`
/// set to 1_000 the auction stays open at 999 and at 1_000 itself, and only
/// closes from 1_001 onwards.
#[test]
fn test_rpi_auction_close_boundary_allows_deadline_millisecond() {
    let mut record = make_test_rpi_rfq(Uuid::now_v7(), test_wallet(1), Side::Buy);
    record.eligible_qps = vec![test_wallet(2)];
    record.deadline_at = 1_000;

    // Probe the same record at different "now" timestamps.
    let closes_at = |now_ms| should_close_rpi_auction(&record, now_ms);

    assert!(!closes_at(999));
    // The deadline millisecond itself is still inside the auction window.
    assert!(!closes_at(1_000));
    assert!(closes_at(1_001));
}
2381
/// A quote whose QP wallet equals the RFQ taker must be rejected with
/// `RFQ_SELF_TRADE_REJECTION_REASON`. The RFQ stays in `SentToQps` with no
/// stored quotes, yet the taker wallet is counted among the responders.
#[test]
fn test_rpi_quote_rejects_self_trade_and_counts_response() {
    let manager = TestRfqManager::new();
    let rfq_id = Uuid::now_v7();
    let taker = test_wallet(1);

    let rfq = make_test_rpi_rfq(rfq_id, taker, Side::Buy);
    manager.insert_rfq(rfq);

    // The taker's own wallet is used as the quoting party.
    let self_quote =
        make_test_quote_with_leg(taker, dec!(-499.9995), Side::Buy, dec!(99.9999), dec!(5));
    let outcome = manager.handle_qp_response(rfq_id, self_quote).unwrap();

    let (record, reason) = match outcome {
        HandleQpResponseResult::RpiQuoteRejected { record, reason } => (record, reason),
        unexpected => panic!("Expected RpiQuoteRejected, got {:?}", unexpected),
    };

    assert_eq!(reason, RFQ_SELF_TRADE_REJECTION_REASON);
    assert_eq!(record.status, RfqStatus::SentToQps);
    assert_eq!(record.quotes.len(), 0);
    assert!(record.responded_qps.contains(&taker));
}
2404
/// A quote carrying leg size 4 must be rejected with a
/// "does not fully cover requested size" reason (the fixture RFQ presumably
/// requests a larger size; see `make_test_rpi_rfq`). Nothing is stored, but
/// the QP is recorded as having responded.
#[test]
fn test_rpi_quote_rejects_partial_size() {
    let manager = TestRfqManager::new();
    let rfq_id = Uuid::now_v7();
    let (taker, qp) = (test_wallet(1), test_wallet(2));

    let rfq = make_test_rpi_rfq(rfq_id, taker, Side::Buy);
    manager.insert_rfq(rfq);

    let undersized_quote =
        make_test_quote_with_leg(qp, dec!(-499.9995), Side::Buy, dec!(99.9999), dec!(4));
    let outcome = manager.handle_qp_response(rfq_id, undersized_quote).unwrap();

    let (record, reason) = match outcome {
        HandleQpResponseResult::RpiQuoteRejected { record, reason } => (record, reason),
        unexpected => panic!("Expected RpiQuoteRejected, got {:?}", unexpected),
    };

    assert!(reason.contains("does not fully cover requested size"));
    assert_eq!(record.quotes.len(), 0);
    assert!(record.responded_qps.contains(&qp));
}
2427
/// A size-5 quote priced at 99.9999 against the standard RPI fixture must be
/// accepted as a candidate: the handler returns `RpiQuoteStored`, the RFQ moves
/// to `QuotesReceived`, the quote is the only one stored, and the QP is marked
/// as responded.
#[test]
fn test_rpi_quote_stores_full_size_price_improved_candidate() {
    let manager = TestRfqManager::new();
    let rfq_id = Uuid::now_v7();
    let (taker, qp) = (test_wallet(1), test_wallet(2));

    let rfq = make_test_rpi_rfq(rfq_id, taker, Side::Buy);
    manager.insert_rfq(rfq);

    let candidate =
        make_test_quote_with_leg(qp, dec!(-499.9995), Side::Buy, dec!(99.9999), dec!(5));
    // Capture the id before the quote is moved into the handler.
    let expected_quote_id = candidate.quote_id;
    let outcome = manager.handle_qp_response(rfq_id, candidate).unwrap();

    let record = match outcome {
        HandleQpResponseResult::RpiQuoteStored(stored) => stored,
        unexpected => panic!("Expected RpiQuoteStored, got {:?}", unexpected),
    };

    assert_eq!(record.status, RfqStatus::QuotesReceived);
    assert_eq!(record.quotes.len(), 1);
    assert_eq!(record.quotes[0].quote_id, expected_quote_id);
    assert!(record.responded_qps.contains(&qp));
}
2452
/// Checks the ordering produced by `sorted_rpi_candidate_quote_ids` for a Buy
/// RFQ: the two quotes at 99.80 come before the one at 99.90, and between the
/// two equally priced quotes the one with the smaller `received_at` comes
/// first.
#[test]
fn test_rpi_candidate_sort_buy_best_price_then_arrival() {
    let rfq_id = Uuid::now_v7();
    let taker = test_wallet(1);
    let mut record = make_test_rpi_rfq(rfq_id, taker, Side::Buy);

    let older_best = Uuid::now_v7();
    let newer_best = Uuid::now_v7();
    let worse = Uuid::now_v7();

    // All three quotes share instrument, side, size, validity, signature and
    // nonce; only id, wallet, price, net premium and arrival time vary.
    let single_leg_quote = |quote_id, qp_wallet, price, net_premium, received_at| QpQuote {
        quote_id,
        qp_wallet,
        legs: vec![QuoteLeg {
            instrument: "ETH-20260401-3000-C".to_string(),
            side: Side::Buy,
            price,
            size: dec!(5),
        }],
        net_premium,
        valid_for_ms: 25_000,
        qp_signature: "0xqp_sig".to_string(),
        qp_nonce: 1,
        received_at,
    };

    // Deliberately inserted in the reverse of the expected ranking.
    record.quotes = vec![
        single_leg_quote(worse, test_wallet(2), dec!(99.90), dec!(-499.50), 3),
        single_leg_quote(newer_best, test_wallet(3), dec!(99.80), dec!(-499), 2),
        single_leg_quote(older_best, test_wallet(4), dec!(99.80), dec!(-499), 1),
    ];

    let ranked = sorted_rpi_candidate_quote_ids(&record).unwrap();

    assert_eq!(ranked, vec![older_best, newer_best, worse]);
}
2513}