Skip to main content

hypercall_api/rfq/
rfq_manager.rs

1use crate::rfq::quote_provider_cache::QuoteProviderCache;
2use dashmap::DashMap;
3use hypercall_db::RfqWriter;
4use hypercall_runtime_api::{
5    RfqExecuteCommand, RfqExecuteLeg, RfqExecuteRequest, RfqExecuteResult,
6};
7use hypercall_types::utils::get_timestamp_millis as current_time_ms;
8use hypercall_types::{RfqStatus, Side, WalletAddress, RFQ_SELF_TRADE_REJECTION_REASON};
9use hypercall_ws_protocol::{QpRfqLeg, QpServerMessage as QpOutboundMessage};
10use metrics::{counter, histogram};
11use rust_decimal::Decimal;
12use std::collections::HashSet;
13use std::sync::Arc;
14use std::time::Duration;
15use tokio::sync::mpsc;
16use tracing::{info, warn};
17use uuid::Uuid;
18
/// Abstraction over the set of quote-provider (QP) sessions that the RFQ
/// manager talks to. Implementations must be shareable across threads
/// (`Send + Sync`).
pub trait QpSessionRegistry: Send + Sync {
    /// Delivers `message` to the QPs identified by `eligible_wallets`.
    /// NOTE(review): how wallets without a live session are handled is up to
    /// the implementation — confirm against the concrete registry.
    fn send_rfq_to_eligible(&self, eligible_wallets: &[WalletAddress], message: QpOutboundMessage);

    /// Returns the wallets the registry considers connected; the RPI
    /// submission path uses this to filter fan-out recipients.
    fn get_connected_wallets(&self) -> Vec<WalletAddress>;
}
24
/// Testnet option premium tick used for the first RFQ-RPI cutover.
/// Public instrument metadata currently exposes `tick_size = 0.0001`.
///
/// This is the decimal scale (number of fractional digits) handed to
/// `Decimal::new` by `RfqConfig::min_improvement_tick`, so one tick is `10^-4`.
const OPTION_PREMIUM_TICK_SCALE: u32 = 4;
28
/// Reports whether a rejection reason starts with the literal prefix
/// `"nonce "` (lowercase, including the trailing space).
fn is_nonce_rejection_reason(reason: &str) -> bool {
    reason.strip_prefix("nonce ").is_some()
}
32
/// Configuration for the RFQ system.
#[derive(Debug, Clone)]
pub struct RfqConfig {
    /// Added to the creation time to obtain a standard RFQ's `deadline_at`,
    /// and forwarded to QPs as the response deadline.
    pub response_deadline_ms: u64,
    /// Same role as `response_deadline_ms`, but for RPI auctions.
    pub rpi_auction_ms: u64,
    /// Minimum price improvement expressed as a count of premium ticks; see
    /// `min_improvement_tick` for the `Decimal` form.
    pub min_improvement_ticks: u32,
    /// When `true`, the taker's limit is included in the outbound RFQ message.
    pub reveal_limit_to_qps: bool,
    /// Upper bound on a quote's `valid_for_ms`; larger values are rejected.
    pub max_valid_for_ms: u64,
    /// Added to the creation time to obtain an RFQ's `expires_at`.
    pub rfq_lifetime_ms: u64,
    /// Maximum number of legs accepted by `submit_rfq`.
    pub max_legs: usize,
}
44
/// Default RFQ settings: 10 s response deadline, 2 s RPI auction window,
/// one-tick minimum improvement, limits revealed to QPs, 30 s maximum quote
/// validity, 60 s RFQ lifetime and at most 10 legs.
impl Default for RfqConfig {
    fn default() -> Self {
        Self {
            response_deadline_ms: 10_000,
            rpi_auction_ms: 2_000,
            min_improvement_ticks: 1,
            reveal_limit_to_qps: true,
            max_valid_for_ms: 30_000,
            rfq_lifetime_ms: 60_000,
            max_legs: 10,
        }
    }
}
58
59impl RfqConfig {
60    pub fn min_improvement_tick(&self) -> Decimal {
61        Decimal::new(
62            i64::from(self.min_improvement_ticks),
63            OPTION_PREMIUM_TICK_SCALE,
64        )
65    }
66}
67
/// A single leg in an RFQ.
#[derive(Debug, Clone)]
pub struct RfqLeg {
    /// Instrument name. The text before the first `-` is treated as the
    /// underlying by the submission paths.
    pub instrument: String,
    /// Direction of this leg from the taker's point of view.
    pub side: Side,
    /// Requested size of the leg.
    pub size: Decimal,
}
75
/// How an `auto_accept_limit` is interpreted, derived from the (uniform)
/// side of the RFQ legs.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum AutoAcceptLimitMode {
    /// All legs are buys: the limit is the maximum debit the taker accepts.
    DebitCap,
    /// All legs are sells: the limit is the minimum credit the taker accepts.
    CreditFloor,
}
81
82fn auto_accept_limit_mode(legs: &[RfqLeg]) -> Result<AutoAcceptLimitMode, String> {
83    let first_side = legs
84        .first()
85        .ok_or_else(|| "RFQ must have at least one leg".to_string())?
86        .side;
87
88    if legs.iter().any(|leg| leg.side != first_side) {
89        return Err(
90            "Auto-execute RFQ requires all legs on the same side; mixed-side packages require explicit quote acceptance"
91                .to_string(),
92        );
93    }
94
95    Ok(match first_side {
96        Side::Buy => AutoAcceptLimitMode::DebitCap,
97        Side::Sell => AutoAcceptLimitMode::CreditFloor,
98    })
99}
100
101fn quote_satisfies_auto_accept_limit(
102    legs: &[RfqLeg],
103    quote_net_premium: Decimal,
104    limit: Decimal,
105    _min_tick: Decimal,
106) -> Result<bool, String> {
107    if limit <= Decimal::ZERO {
108        return Err("auto_accept_limit must be positive".to_string());
109    }
110
111    match auto_accept_limit_mode(legs)? {
112        // Taker buys encode as negative premium. The limit is the maximum
113        // debit they authorized. Standard auto-execute RFQ accepts quotes at
114        // the exact signed limit; RPI price improvement is checked separately.
115        AutoAcceptLimitMode::DebitCap => Ok(quote_net_premium >= -limit),
116        // Taker sells encode as positive premium. The limit is the minimum
117        // credit they authorized. Standard auto-execute RFQ accepts quotes at
118        // the exact signed limit; RPI price improvement is checked separately.
119        AutoAcceptLimitMode::CreditFloor => Ok(quote_net_premium >= limit),
120    }
121}
122
123pub fn quote_qualifies_for_rpi(
124    side: Side,
125    limit_price: Decimal,
126    best_executable_book_price: Option<Decimal>,
127    quote_price: Decimal,
128    min_tick: Decimal,
129) -> bool {
130    if limit_price <= Decimal::ZERO || quote_price <= Decimal::ZERO || min_tick <= Decimal::ZERO {
131        return false;
132    }
133
134    let satisfies_limit = match side {
135        Side::Buy => quote_price <= limit_price,
136        Side::Sell => quote_price >= limit_price,
137    };
138    if !satisfies_limit {
139        return false;
140    }
141
142    match (side, best_executable_book_price) {
143        (Side::Buy, Some(best_ask)) => quote_price <= best_ask - min_tick,
144        (Side::Sell, Some(best_bid)) => quote_price >= best_bid + min_tick,
145        (_, None) => true,
146    }
147}
148
/// Auction-side alias for `quote_qualifies_for_rpi`: forwards all arguments
/// unchanged, passing `reference_price` as the book price to improve upon.
fn quote_satisfies_rpi_auction(
    side: Side,
    limit_price: Decimal,
    reference_price: Option<Decimal>,
    quote_price: Decimal,
    min_tick: Decimal,
) -> bool {
    quote_qualifies_for_rpi(side, limit_price, reference_price, quote_price, min_tick)
}
158
159fn rpi_quote_rejection(
160    side: Side,
161    limit_price: Decimal,
162    reference_price: Option<Decimal>,
163    quote_price: Decimal,
164    min_tick: Decimal,
165) -> (&'static str, String) {
166    if !quote_qualifies_for_rpi(side, limit_price, None, quote_price, min_tick) {
167        return (
168            "outside_limit",
169            format!(
170                "quote price {} does not satisfy limit {}",
171                quote_price, limit_price
172            ),
173        );
174    }
175
176    if let Some(reference_price) = reference_price {
177        return (
178            "no_price_improvement",
179            format!(
180                "quote price {} does not improve reference {} by tick {}",
181                quote_price, reference_price, min_tick
182            ),
183        );
184    }
185
186    (
187        "outside_limit",
188        format!(
189            "quote price {} does not satisfy limit {}",
190            quote_price, limit_price
191        ),
192    )
193}
194
195fn validate_rpi_quote_shape(requested_legs: &[RfqLeg], quote: &QpQuote) -> Result<(), String> {
196    if quote.legs.len() != requested_legs.len() {
197        return Err("RPI quote must cover the full requested leg set".to_string());
198    }
199
200    for (requested, quoted) in requested_legs.iter().zip(&quote.legs) {
201        if quoted.instrument != requested.instrument {
202            return Err(format!(
203                "RPI quote instrument {} does not match requested {}",
204                quoted.instrument, requested.instrument
205            ));
206        }
207        if quoted.side != requested.side {
208            return Err(format!(
209                "RPI quote side {:?} does not match requested {:?}",
210                quoted.side, requested.side
211            ));
212        }
213        if quoted.size != requested.size {
214            return Err(format!(
215                "RPI quote size {} does not fully cover requested size {}",
216                quoted.size, requested.size
217            ));
218        }
219    }
220
221    Ok(())
222}
223
224fn sorted_rpi_candidate_quote_ids(record: &RfqRecord) -> Result<Vec<Uuid>, String> {
225    if record.rpi_auction.is_none() {
226        return Err("RFQ is not an RPI auction".to_string());
227    }
228    let side = record
229        .legs
230        .first()
231        .ok_or_else(|| "RPI RFQ missing leg".to_string())?
232        .side;
233
234    let mut quotes = record.quotes.clone();
235    quotes.sort_by(|a, b| {
236        let a_price = a.legs.first().map(|leg| leg.price).unwrap_or(Decimal::ZERO);
237        let b_price = b.legs.first().map(|leg| leg.price).unwrap_or(Decimal::ZERO);
238        let price_order = match side {
239            Side::Buy => a_price.cmp(&b_price),
240            Side::Sell => b_price.cmp(&a_price),
241        };
242        price_order.then_with(|| a.received_at.cmp(&b.received_at))
243    });
244
245    Ok(quotes.into_iter().map(|q| q.quote_id).collect())
246}
247
248fn should_close_rpi_auction(record: &RfqRecord, now_ms: u64) -> bool {
249    let all_qps_responded = !record.eligible_qps.is_empty()
250        && record
251            .eligible_qps
252            .iter()
253            .all(|wallet| record.responded_qps.contains(wallet));
254    let no_qps = record.eligible_qps.is_empty();
255    let timed_out = now_ms > record.deadline_at;
256
257    no_qps || all_qps_responded || timed_out || record.status.is_terminal()
258}
259
/// A firm quote from a QP.
#[derive(Debug, Clone)]
pub struct QpQuote {
    /// Identifier of this quote; used to select the accepted/executed quote.
    pub quote_id: Uuid,
    /// Wallet of the quoting QP. A quote whose wallet equals the taker's is
    /// rejected as a self-trade.
    pub qp_wallet: WalletAddress,
    /// Per-leg prices and sizes offered by the QP.
    pub legs: Vec<QuoteLeg>,
    /// Signed net premium of the package (taker buys negative, taker sells
    /// positive), compared against the taker's auto-accept limit.
    pub net_premium: Decimal,
    /// Validity requested by the QP; must not exceed `RfqConfig::max_valid_for_ms`.
    pub valid_for_ms: u64,
    /// QP's signature over the quote. NOTE(review): format and verification
    /// happen outside this section — confirm.
    pub qp_signature: String,
    /// QP-supplied nonce.
    pub qp_nonce: u64,
    /// Receipt timestamp; used as the tie-breaker when ranking RPI quotes.
    /// Presumably milliseconds like the other timestamps — verify at the caller.
    pub received_at: u64,
}
272
/// One priced leg inside a `QpQuote`.
#[derive(Debug, Clone)]
pub struct QuoteLeg {
    /// Instrument being quoted; must equal the requested leg's instrument for RPI.
    pub instrument: String,
    /// Side of the leg; must equal the requested leg's side for RPI.
    pub side: Side,
    /// Quoted price for this leg.
    pub price: Decimal,
    /// Quoted size; must equal the requested size for RPI.
    pub size: Decimal,
}
280
/// Pricing parameters attached to an RFQ that runs as an RPI auction.
#[derive(Debug, Clone)]
pub struct RpiAuctionContext {
    /// Taker's limit price; quotes must be at or better than it.
    pub limit_price: Decimal,
    /// Optional price the quote must improve on by at least `min_tick`.
    pub reference_price: Option<Decimal>,
    /// Minimum required improvement over `reference_price`.
    pub min_tick: Decimal,
}
287
/// Input bundle for `RfqManager::submit_rpi_auction`.
#[derive(Debug, Clone)]
pub struct SubmitRpiAuction {
    /// Caller-chosen RFQ identifier; must not already be active.
    pub rfq_id: Uuid,
    /// Wallet of the taker requesting the auction.
    pub taker_wallet: WalletAddress,
    /// Signer associated with the taker's request.
    pub taker_signer: WalletAddress,
    /// Optional builder-code address copied onto the RFQ record.
    pub builder_code_address: Option<WalletAddress>,
    /// Requested legs; an RPI auction requires exactly one.
    pub legs: Vec<RfqLeg>,
    /// Hash of the legs as supplied by the caller.
    pub legs_hash: [u8; 32],
    /// Taker's signature as supplied by the caller.
    pub taker_signature: String,
    /// Taker's nonce as supplied by the caller.
    pub taker_nonce: u64,
    /// Taker's limit price; must be strictly positive.
    pub limit_price: Decimal,
    /// Optional reference price; must be strictly positive when present.
    pub reference_price: Option<Decimal>,
    /// Minimum improvement tick; must be strictly positive.
    pub min_tick: Decimal,
}
302
/// In-memory RFQ record tracking the full lifecycle.
#[derive(Debug, Clone)]
pub struct RfqRecord {
    /// Caller-supplied identifier; also the key in the manager's active map.
    pub rfq_id: Uuid,
    /// Wallet of the taker who requested the quote.
    pub taker_wallet: WalletAddress,
    /// Signer associated with the taker's request.
    pub taker_signer: WalletAddress,
    /// Builder-code address; only the RPI submission path populates it.
    pub builder_code_address: Option<WalletAddress>,
    /// Text before the first `-` of the first leg's instrument.
    pub underlying: String,
    /// Requested legs.
    pub legs: Vec<RfqLeg>,
    /// Hash of the legs as supplied at submission.
    pub legs_hash: [u8; 32],
    /// Taker's signature as supplied at submission.
    pub taker_signature: String,
    /// Taker's nonce as supplied at submission.
    pub taker_nonce: u64,
    /// Current lifecycle status.
    pub status: RfqStatus,
    /// Quotes collected so far.
    pub quotes: Vec<QpQuote>,
    /// Creation time in milliseconds.
    pub created_at: u64,
    /// `created_at` plus the response deadline (or RPI auction window);
    /// quotes arriving after this instant are rejected.
    pub deadline_at: u64,
    /// `created_at` plus the configured RFQ lifetime.
    pub expires_at: u64,
    /// When set, the first quote satisfying the taker's directional limit
    /// triggers immediate auto-execution without a separate accept RTT.
    /// Buy RFQs use this as a max debit. Sell RFQs use it as a min credit.
    pub auto_accept_limit: Option<Decimal>,
    /// The quote that won the RFQ (set on accept or auto-accept).
    /// Used to reliably identify the winning quote for late-arriving
    /// QPs, instead of relying on `quotes.last()`.
    pub accepted_quote_id: Option<Uuid>,
    /// Quote the cached execution identifiers below were derived for.
    pub execution_quote_id: Option<Uuid>,
    /// Cached deterministic execution request id for `execution_quote_id`.
    pub execution_request_id: Option<String>,
    /// Cached deterministic fill id for `execution_quote_id`.
    pub execution_fill_id: Option<String>,
    /// Timestamp recorded when the execution identifiers were first derived.
    pub execution_timestamp_ms: Option<u64>,
    /// Wallets of the QPs this RFQ was fanned out to.
    pub eligible_qps: Vec<WalletAddress>,
    /// Wallets of QPs that have already responded.
    pub responded_qps: HashSet<WalletAddress>,
    /// Present only when this RFQ runs as an RPI auction.
    pub rpi_auction: Option<RpiAuctionContext>,
}
336
337impl RfqRecord {
338    fn execution_facts_for_quote(
339        &mut self,
340        quote_id: Uuid,
341        timestamp_ms: u64,
342    ) -> (String, String, u64) {
343        if self.execution_quote_id == Some(quote_id) {
344            if let (Some(request_id), Some(fill_id), Some(stored_timestamp_ms)) = (
345                &self.execution_request_id,
346                &self.execution_fill_id,
347                self.execution_timestamp_ms,
348            ) {
349                return (request_id.clone(), fill_id.clone(), stored_timestamp_ms);
350            }
351        }
352
353        let request_id = deterministic_rfq_execution_uuid("request", self.rfq_id, quote_id);
354        let fill_id = deterministic_rfq_execution_uuid("fill", self.rfq_id, quote_id);
355        self.execution_quote_id = Some(quote_id);
356        self.execution_request_id = Some(request_id.clone());
357        self.execution_fill_id = Some(fill_id.clone());
358        self.execution_timestamp_ms = Some(timestamp_ms);
359        (request_id, fill_id, timestamp_ms)
360    }
361
362    fn clear_execution_selection(&mut self) {
363        self.accepted_quote_id = None;
364        self.execution_quote_id = None;
365        self.execution_request_id = None;
366        self.execution_fill_id = None;
367        self.execution_timestamp_ms = None;
368    }
369
370    fn revert_auto_accept_to_quotes_received(&mut self) {
371        self.status = RfqStatus::QuotesReceived;
372        self.clear_execution_selection();
373        self.auto_accept_limit = None;
374    }
375}
376
377fn deterministic_rfq_execution_uuid(purpose: &str, rfq_id: Uuid, quote_id: Uuid) -> String {
378    use sha3::{Digest, Sha3_256};
379
380    let mut hasher = Sha3_256::new();
381    hasher.update(b"rfq-execution:");
382    hasher.update(purpose.as_bytes());
383    hasher.update(b":");
384    hasher.update(rfq_id.as_bytes());
385    hasher.update(b":");
386    hasher.update(quote_id.as_bytes());
387    let digest = hasher.finalize();
388
389    let mut bytes = [0u8; 16];
390    bytes.copy_from_slice(&digest[..16]);
391    bytes[6] = (bytes[6] & 0x0F) | 0x80;
392    bytes[8] = (bytes[8] & 0x3F) | 0x80;
393    Uuid::from_bytes(bytes).to_string()
394}
395
/// Result of `handle_qp_response` distinguishing a simple quote store
/// from an auto-execution trigger.
///
/// Every variant carries a snapshot (`RfqRecord` clone) of the RFQ taken
/// while handling the response.
#[derive(Debug)]
pub enum HandleQpResponseResult {
    /// Quote stored normally; taker must explicitly accept.
    QuoteStored(RfqRecord),
    /// The quote was within the taker's pre-authorized limit and the RFQ
    /// has been atomically transitioned to Accepted. The caller should
    /// dispatch the execution command to the engine.
    AutoExecuted {
        /// Snapshot of the RFQ after the transition.
        record: RfqRecord,
        /// Identifier of the quote that triggered auto-execution.
        accepted_quote_id: Uuid,
    },
    /// The taker had auto_accept_limit set but the quote exceeded it.
    /// The caller should notify the taker so they can fall back immediately
    /// instead of waiting for the timeout.
    AutoExecuteSkipped {
        /// Snapshot of the RFQ.
        record: RfqRecord,
        /// Net premium of the quote that missed the limit.
        quote_premium: Decimal,
        /// The taker's auto-accept limit the quote was compared against.
        limit: Decimal,
    },
    /// RPI quote was accepted as an auction candidate. Final execution waits
    /// until the auction closes so the best price wins.
    RpiQuoteStored(RfqRecord),
    /// QP responded, but the quote did not satisfy the mandatory price
    /// improvement rule, or was otherwise not a full-size RPI candidate.
    RpiQuoteRejected { record: RfqRecord, reason: String },
}
424
/// Manages RFQ lifecycle: creation, QP fan-out, quote collection, acceptance.
pub struct RfqManager {
    /// Live RFQs keyed by `rfq_id`.
    active_rfqs: DashMap<Uuid, RfqRecord>,
    /// Source of the QPs active for a given underlying.
    qp_cache: Arc<QuoteProviderCache>,
    /// Outbound channel to connected QP sessions.
    qp_sessions: Arc<dyn QpSessionRegistry>,
    /// Channel for execution requests. NOTE(review): consumer is outside
    /// this section — presumably the matching engine; confirm.
    rfq_sender: mpsc::Sender<RfqExecuteRequest>,
    /// Optional persistence writer; `None` until `set_db` is called.
    db: Option<Arc<dyn RfqWriter>>,
    /// Timing and sizing limits for RFQs.
    config: RfqConfig,
}
434
435impl RfqManager {
    /// Creates a manager with no active RFQs and no persistence writer
    /// (`db` starts as `None`; attach one with `set_db`).
    pub fn new(
        qp_cache: Arc<QuoteProviderCache>,
        qp_sessions: Arc<dyn QpSessionRegistry>,
        rfq_sender: mpsc::Sender<RfqExecuteRequest>,
        config: RfqConfig,
    ) -> Self {
        Self {
            active_rfqs: DashMap::new(),
            qp_cache,
            qp_sessions,
            rfq_sender,
            db: None,
            config,
        }
    }
451
    /// Minimum price-improvement tick from this manager's configuration;
    /// delegates to `RfqConfig::min_improvement_tick`.
    pub fn min_improvement_tick(&self) -> Decimal {
        self.config.min_improvement_tick()
    }
455
456    /// Persist an RFQ status change to the `rfq_records` table. Non-fatal on
457    /// failure — logs a warning and keeps going so in-memory state
458    /// transitions stay responsive even if the DB hiccups. Callers MUST NOT
459    /// hold a DashMap lock (e.g. `get_mut` guard or `alter_all` closure) while
460    /// invoking this; drop the entry first.
461    fn persist_status(&self, rfq_id: &Uuid, status: RfqStatus) {
462        if let Some(ref handler) = self.db {
463            if let Err(e) = handler.update_rfq_status_sync(rfq_id, status.as_str()) {
464                warn!(
465                    "Failed to persist RFQ {} status {}: {}",
466                    rfq_id,
467                    status.as_str(),
468                    e
469                );
470            }
471        }
472    }
473
    /// Set the persistence writer for RFQ records and quotes.
    ///
    /// Replaces any previously configured writer. Until this is called the
    /// manager skips all persistence.
    pub fn set_db(&mut self, handler: Arc<dyn RfqWriter>) {
        self.db = Some(handler);
    }
478
479    /// Submit a new RFQ from a taker.
480    pub async fn submit_rfq(
481        &self,
482        rfq_id: Uuid,
483        taker_wallet: WalletAddress,
484        taker_signer: WalletAddress,
485        legs: Vec<RfqLeg>,
486        legs_hash: [u8; 32],
487        taker_signature: String,
488        taker_nonce: u64,
489        auto_accept_limit: Option<Decimal>,
490    ) -> Result<RfqRecord, String> {
491        // Validate leg count
492        if legs.is_empty() || legs.len() > self.config.max_legs {
493            return Err(format!(
494                "Invalid number of legs: {} (must be 1-{})",
495                legs.len(),
496                self.config.max_legs
497            ));
498        }
499
500        // Extract underlying (all legs must share the same one)
501        let underlying = legs[0]
502            .instrument
503            .split('-')
504            .next()
505            .ok_or("Invalid instrument format")?
506            .to_string();
507
508        for leg in &legs[1..] {
509            let leg_underlying = leg
510                .instrument
511                .split('-')
512                .next()
513                .ok_or("Invalid instrument format")?;
514            if leg_underlying != underlying {
515                return Err("All legs must share the same underlying".to_string());
516            }
517        }
518
519        if let Some(limit) = auto_accept_limit {
520            if limit <= Decimal::ZERO {
521                return Err("auto_accept_limit must be positive".to_string());
522            }
523            auto_accept_limit_mode(&legs)?;
524        }
525
526        let now = current_time_ms();
527        let response_deadline_ms = self.config.response_deadline_ms;
528        let record = RfqRecord {
529            rfq_id,
530            taker_wallet,
531            taker_signer,
532            builder_code_address: None,
533            underlying: underlying.clone(),
534            legs,
535            legs_hash,
536            taker_signature,
537            taker_nonce,
538            status: RfqStatus::Created,
539            quotes: Vec::new(),
540            created_at: now,
541            deadline_at: now + response_deadline_ms,
542            expires_at: now + self.config.rfq_lifetime_ms,
543            auto_accept_limit,
544            accepted_quote_id: None,
545            execution_quote_id: None,
546            execution_request_id: None,
547            execution_fill_id: None,
548            execution_timestamp_ms: None,
549            eligible_qps: Vec::new(),
550            responded_qps: HashSet::new(),
551            rpi_auction: None,
552        };
553
554        // Insert atomically; reject if the caller reused an rfq_id. The
555        // DB header row is persisted with `ON CONFLICT (rfq_id) DO
556        // NOTHING` (see `persist_rfq_record_sync`), so overwriting the
557        // in-memory entry while the durable header stayed bound to the
558        // original RFQ would splice new quotes/execution onto stale
559        // persisted metadata and break taker flow consistency. Fail
560        // fast before any mutation.
561        use dashmap::mapref::entry::Entry;
562        match self.active_rfqs.entry(rfq_id) {
563            Entry::Occupied(_) => {
564                return Err(format!("RFQ {} already exists", rfq_id));
565            }
566            Entry::Vacant(slot) => {
567                slot.insert(record.clone());
568            }
569        }
570
571        // Persist RFQ record + legs to DB
572        if let Some(ref handler) = self.db {
573            let db_legs: Vec<(String, String, Decimal)> = record
574                .legs
575                .iter()
576                .map(|l| {
577                    let side = match l.side {
578                        Side::Buy => "buy".to_string(),
579                        Side::Sell => "sell".to_string(),
580                    };
581                    (l.instrument.clone(), side, l.size)
582                })
583                .collect();
584            if let Err(e) = handler.persist_rfq_record_sync(
585                &rfq_id,
586                &taker_wallet,
587                &underlying,
588                "created",
589                &record.taker_signature,
590                record.taker_nonce,
591                &legs_hash,
592                &db_legs,
593                record.expires_at,
594            ) {
595                info!("Failed to persist RFQ record to DB (non-fatal): {}", e);
596            }
597        }
598
599        // Fan out to eligible QPs
600        let eligible_qps = self.qp_cache.get_active_for_underlying(&underlying).await;
601        let eligible_wallets: Vec<WalletAddress> = eligible_qps
602            .iter()
603            .filter(|_qp| {
604                // Filter out MMP-frozen QPs
605                // Note: is_frozen is async but we need sync here. Use try approach.
606                true // MMP check is done async in the engine execution path
607            })
608            .map(|qp| qp.wallet)
609            .collect();
610
611        if let Some(mut entry) = self.active_rfqs.get_mut(&rfq_id) {
612            entry.eligible_qps = eligible_wallets.clone();
613        }
614
615        if !eligible_wallets.is_empty() {
616            let auto_execute = record.auto_accept_limit.is_some();
617            let revealed_limit = record
618                .auto_accept_limit
619                .and_then(|limit| self.config.reveal_limit_to_qps.then(|| limit.to_string()));
620            let rfq_msg = QpOutboundMessage::RfqRequest {
621                rfq_id: rfq_id.to_string(),
622                legs: record
623                    .legs
624                    .iter()
625                    .map(|l| QpRfqLeg {
626                        instrument: l.instrument.clone(),
627                        side: format!("{:?}", l.side).to_lowercase(),
628                        size: l.size.to_string(),
629                    })
630                    .collect(),
631                taker_wallet: taker_wallet.as_hex(),
632                request_timestamp: now,
633                response_deadline_ms,
634                auto_accept_limit: revealed_limit.clone(),
635                auto_execute,
636                taker_limit_price: revealed_limit,
637                reference_price: None,
638                min_improvement_tick: None,
639                auction_deadline_ms: None,
640                requires_price_improvement: false,
641            };
642
643            self.qp_sessions
644                .send_rfq_to_eligible(&eligible_wallets, rfq_msg);
645        }
646
647        // Transition to SENT_TO_QPS
648        if let Some(mut entry) = self.active_rfqs.get_mut(&rfq_id) {
649            entry.status = RfqStatus::SentToQps;
650        }
651        self.persist_status(&rfq_id, RfqStatus::SentToQps);
652
653        let record = self
654            .active_rfqs
655            .get(&rfq_id)
656            .map(|r| r.clone())
657            .ok_or("RFQ not found after creation")?;
658
659        counter!(
660            "ht_rfq_submit_total",
661            "underlying" => underlying.clone(),
662            "result" => "ok"
663        )
664        .increment(1);
665
666        info!(
667            "RFQ submitted: rfq_id={}, taker={}, underlying={}, legs={}, eligible_qps={}",
668            rfq_id,
669            taker_wallet.as_hex(),
670            underlying,
671            record.legs.len(),
672            eligible_wallets.len(),
673        );
674
675        Ok(record)
676    }
677
678    pub async fn submit_rpi_auction(&self, request: SubmitRpiAuction) -> Result<RfqRecord, String> {
679        if request.legs.len() != 1 {
680            return Err("RPI auction requires exactly one leg".to_string());
681        }
682        if request.limit_price <= Decimal::ZERO {
683            return Err("RPI limit price must be positive".to_string());
684        }
685        if request
686            .reference_price
687            .is_some_and(|reference_price| reference_price <= Decimal::ZERO)
688        {
689            return Err("RPI reference price must be positive when present".to_string());
690        }
691        if request.min_tick <= Decimal::ZERO {
692            return Err("RPI min tick must be positive".to_string());
693        }
694
695        let underlying = request.legs[0]
696            .instrument
697            .split('-')
698            .next()
699            .ok_or("Invalid instrument format")?
700            .to_string();
701
702        let now = current_time_ms();
703        let auction_ms = self.config.rpi_auction_ms;
704        let record = RfqRecord {
705            rfq_id: request.rfq_id,
706            taker_wallet: request.taker_wallet,
707            taker_signer: request.taker_signer,
708            builder_code_address: request.builder_code_address,
709            underlying: underlying.clone(),
710            legs: request.legs,
711            legs_hash: request.legs_hash,
712            taker_signature: request.taker_signature,
713            taker_nonce: request.taker_nonce,
714            status: RfqStatus::Created,
715            quotes: Vec::new(),
716            created_at: now,
717            deadline_at: now + auction_ms,
718            expires_at: now + self.config.rfq_lifetime_ms,
719            auto_accept_limit: None,
720            accepted_quote_id: None,
721            execution_quote_id: None,
722            execution_request_id: None,
723            execution_fill_id: None,
724            execution_timestamp_ms: None,
725            eligible_qps: Vec::new(),
726            responded_qps: HashSet::new(),
727            rpi_auction: Some(RpiAuctionContext {
728                limit_price: request.limit_price,
729                reference_price: request.reference_price,
730                min_tick: request.min_tick,
731            }),
732        };
733
734        use dashmap::mapref::entry::Entry;
735        match self.active_rfqs.entry(record.rfq_id) {
736            Entry::Occupied(_) => {
737                return Err(format!("RFQ {} already exists", record.rfq_id));
738            }
739            Entry::Vacant(slot) => {
740                slot.insert(record.clone());
741            }
742        }
743
744        if let Some(ref handler) = self.db {
745            let db_legs: Vec<(String, String, Decimal)> = record
746                .legs
747                .iter()
748                .map(|l| {
749                    let side = match l.side {
750                        Side::Buy => "buy".to_string(),
751                        Side::Sell => "sell".to_string(),
752                    };
753                    (l.instrument.clone(), side, l.size)
754                })
755                .collect();
756            if let Err(e) = handler.persist_rfq_record_sync(
757                &record.rfq_id,
758                &record.taker_wallet,
759                &underlying,
760                "created",
761                &record.taker_signature,
762                record.taker_nonce,
763                &record.legs_hash,
764                &db_legs,
765                record.expires_at,
766            ) {
767                info!("Failed to persist RPI RFQ record to DB (non-fatal): {}", e);
768            }
769        }
770
771        let eligible_qps = self.qp_cache.get_active_for_underlying(&underlying).await;
772        let connected_wallets: HashSet<WalletAddress> = self
773            .qp_sessions
774            .get_connected_wallets()
775            .into_iter()
776            .collect();
777        let eligible_wallets: Vec<WalletAddress> = eligible_qps
778            .iter()
779            .map(|qp| qp.wallet)
780            .filter(|wallet| connected_wallets.contains(wallet))
781            .collect();
782
783        if let Some(mut entry) = self.active_rfqs.get_mut(&record.rfq_id) {
784            entry.eligible_qps = eligible_wallets.clone();
785        }
786
787        if !eligible_wallets.is_empty() {
788            let rpi = record
789                .rpi_auction
790                .as_ref()
791                .ok_or("RPI context missing after record creation")?;
792            let rfq_msg = QpOutboundMessage::RfqRequest {
793                rfq_id: record.rfq_id.to_string(),
794                legs: record
795                    .legs
796                    .iter()
797                    .map(|l| QpRfqLeg {
798                        instrument: l.instrument.clone(),
799                        side: format!("{:?}", l.side).to_lowercase(),
800                        size: l.size.to_string(),
801                    })
802                    .collect(),
803                taker_wallet: record.taker_wallet.as_hex(),
804                request_timestamp: now,
805                response_deadline_ms: auction_ms,
806                auto_accept_limit: None,
807                auto_execute: true,
808                taker_limit_price: self
809                    .config
810                    .reveal_limit_to_qps
811                    .then(|| rpi.limit_price.to_string()),
812                reference_price: rpi.reference_price.map(|price| price.to_string()),
813                min_improvement_tick: rpi.reference_price.map(|_| rpi.min_tick.to_string()),
814                auction_deadline_ms: Some(auction_ms),
815                requires_price_improvement: rpi.reference_price.is_some(),
816            };
817            self.qp_sessions
818                .send_rfq_to_eligible(&eligible_wallets, rfq_msg);
819        }
820
821        if let Some(mut entry) = self.active_rfqs.get_mut(&record.rfq_id) {
822            entry.status = RfqStatus::SentToQps;
823        }
824        self.persist_status(&record.rfq_id, RfqStatus::SentToQps);
825
826        counter!(
827            "ht_rpi_auction_total",
828            "outcome" => "started",
829            "underlying" => underlying.clone()
830        )
831        .increment(1);
832
833        Ok(self
834            .active_rfqs
835            .get(&record.rfq_id)
836            .map(|r| r.clone())
837            .ok_or("RPI RFQ not found after creation")?)
838    }
839
840    /// Handle a firm quote response from a QP.
841    ///
842    /// No margin check here. Quotes are short-lived and non-binding until
843    /// accepted, so margin state can change between quote time and acceptance
844    /// (new fills, deposits, other RFQs landing). The authoritative SPAN
845    /// margin check on both taker and QP runs in `plan_rfq_execution` at
846    /// accept time, which cannot be skipped. Adding a check here would add
847    /// engine load on every incoming quote for a result that's immediately
848    /// stale.
849    pub fn handle_qp_response(
850        &self,
851        rfq_id: Uuid,
852        quote: QpQuote,
853    ) -> Result<HandleQpResponseResult, String> {
854        let mut entry = self.active_rfqs.get_mut(&rfq_id).ok_or("RFQ not found")?;
855
856        // Validate state
857        if entry.status != RfqStatus::SentToQps && entry.status != RfqStatus::QuotesReceived {
858            return Err(format!(
859                "RFQ in invalid state for quotes: {:?}",
860                entry.status
861            ));
862        }
863
864        let now = current_time_ms();
865        if now > entry.deadline_at {
866            return Err("RFQ response deadline has passed".to_string());
867        }
868
869        // Validate valid_for_ms
870        if quote.valid_for_ms > self.config.max_valid_for_ms {
871            return Err(format!(
872                "valid_for_ms {} exceeds max {}",
873                quote.valid_for_ms, self.config.max_valid_for_ms
874            ));
875        }
876
877        if entry.taker_wallet == quote.qp_wallet {
878            entry.responded_qps.insert(quote.qp_wallet);
879            let snapshot = entry.clone();
880            if entry.rpi_auction.is_some() {
881                return Ok(HandleQpResponseResult::RpiQuoteRejected {
882                    record: snapshot,
883                    reason: RFQ_SELF_TRADE_REJECTION_REASON.to_string(),
884                });
885            }
886            return Err(RFQ_SELF_TRADE_REJECTION_REASON.to_string());
887        }
888
889        let rpi_auction = entry.rpi_auction.clone();
890        if let Some(rpi) = rpi_auction.as_ref() {
891            entry.responded_qps.insert(quote.qp_wallet);
892            if let Err(reason) = validate_rpi_quote_shape(&entry.legs, &quote) {
893                let snapshot = entry.clone();
894                return Ok(HandleQpResponseResult::RpiQuoteRejected {
895                    record: snapshot,
896                    reason,
897                });
898            }
899            let Some(first_quote_leg) = quote.legs.first() else {
900                let snapshot = entry.clone();
901                return Ok(HandleQpResponseResult::RpiQuoteRejected {
902                    record: snapshot,
903                    reason: "RPI quote missing quote leg".to_string(),
904                });
905            };
906            if !quote_satisfies_rpi_auction(
907                first_quote_leg.side,
908                rpi.limit_price,
909                rpi.reference_price,
910                first_quote_leg.price,
911                rpi.min_tick,
912            ) {
913                let snapshot = entry.clone();
914                let (reason_label, reason) = rpi_quote_rejection(
915                    first_quote_leg.side,
916                    rpi.limit_price,
917                    rpi.reference_price,
918                    first_quote_leg.price,
919                    rpi.min_tick,
920                );
921                counter!("ht_rpi_quote_rejected_total", "reason" => reason_label).increment(1);
922                return Ok(HandleQpResponseResult::RpiQuoteRejected {
923                    record: snapshot,
924                    reason,
925                });
926            }
927        } else {
928            entry.responded_qps.insert(quote.qp_wallet);
929        }
930
931        // Persist quote to DB
932        if let Some(ref handler) = self.db {
933            let db_legs: Vec<(String, String, Decimal, Decimal)> = quote
934                .legs
935                .iter()
936                .map(|l| {
937                    let side = match l.side {
938                        Side::Buy => "buy".to_string(),
939                        Side::Sell => "sell".to_string(),
940                    };
941                    (l.instrument.clone(), side, l.price, l.size)
942                })
943                .collect();
944            let expires_at_ms = quote.received_at + quote.valid_for_ms;
945            if let Err(e) = handler.persist_rfq_quote_sync(
946                &quote.quote_id,
947                &rfq_id,
948                &quote.qp_wallet,
949                quote.net_premium,
950                quote.valid_for_ms,
951                &quote.qp_signature,
952                quote.qp_nonce,
953                &db_legs,
954                expires_at_ms,
955            ) {
956                info!("Failed to persist RFQ quote to DB (non-fatal): {}", e);
957            }
958        }
959
960        // Store quote and transition; snapshot while holding the lock.
961        let quote_id = quote.quote_id;
962        let quote_net_premium = quote.net_premium;
963        entry.quotes.push(quote);
964        let transitioned_to_quotes_received = entry.status == RfqStatus::SentToQps;
965        if transitioned_to_quotes_received {
966            entry.status = RfqStatus::QuotesReceived;
967        }
968
969        if rpi_auction.is_some() {
970            let snapshot = entry.clone();
971            drop(entry);
972            if transitioned_to_quotes_received {
973                self.persist_status(&rfq_id, RfqStatus::QuotesReceived);
974            }
975            return Ok(HandleQpResponseResult::RpiQuoteStored(snapshot));
976        }
977
978        // Check auto-accept directionally. Buy RFQs authorize a max debit,
979        // while sell RFQs authorize a min credit. Atomically transition to
980        // Accepted under the DashMap lock so only one QP can win.
981        let auto_accept_limit = entry.auto_accept_limit;
982        let auto_execute = if let Some(limit) = auto_accept_limit {
983            let within_limit = match quote_satisfies_auto_accept_limit(
984                &entry.legs,
985                quote_net_premium,
986                limit,
987                self.config.min_improvement_tick(),
988            ) {
989                Ok(within_limit) => within_limit,
990                Err(reason) => {
991                    warn!(
992                        "RFQ {} auto-execute SKIPPED: {} net_premium={} limit={}",
993                        rfq_id, reason, quote_net_premium, limit
994                    );
995                    false
996                }
997            };
998
999            if within_limit
1000                && (entry.status == RfqStatus::QuotesReceived
1001                    || entry.status == RfqStatus::SentToQps)
1002            {
1003                entry.status = RfqStatus::Accepted;
1004                entry.accepted_quote_id = Some(quote_id);
1005                true
1006            } else {
1007                info!(
1008                    "RFQ {} auto-execute SKIPPED: net_premium={} limit={} status={:?}",
1009                    rfq_id, quote_net_premium, limit, entry.status,
1010                );
1011                false
1012            }
1013        } else {
1014            false
1015        };
1016
1017        let snapshot = entry.clone();
1018        drop(entry);
1019
1020        if auto_execute {
1021            self.persist_status(&rfq_id, RfqStatus::Accepted);
1022            counter!("ht_rfq_auto_execute_total", "result" => "triggered").increment(1);
1023            info!(
1024                "RFQ {} auto-executing: quote {} net_premium={} within limit",
1025                rfq_id, quote_id, quote_net_premium
1026            );
1027            Ok(HandleQpResponseResult::AutoExecuted {
1028                record: snapshot,
1029                accepted_quote_id: quote_id,
1030            })
1031        } else {
1032            if transitioned_to_quotes_received {
1033                self.persist_status(&rfq_id, RfqStatus::QuotesReceived);
1034            }
1035            if let Some(limit) = auto_accept_limit {
1036                Ok(HandleQpResponseResult::AutoExecuteSkipped {
1037                    record: snapshot,
1038                    quote_premium: quote_net_premium.abs(),
1039                    limit,
1040                })
1041            } else {
1042                Ok(HandleQpResponseResult::QuoteStored(snapshot))
1043            }
1044        }
1045    }
1046
1047    pub fn record_qp_invalid_response(&self, rfq_id: Uuid, qp_wallet: WalletAddress) {
1048        if let Some(mut entry) = self.active_rfqs.get_mut(&rfq_id) {
1049            if entry.rpi_auction.is_some() {
1050                entry.responded_qps.insert(qp_wallet);
1051            }
1052        }
1053    }
1054
1055    /// Accept a quote. Uses get_mut() for atomic state transition (prevents double-accept).
1056    /// DashMap::get_mut() holds an exclusive shard lock for the key, so the read-check-write
1057    /// within the block is atomic with respect to other accesses to the same key.
1058    pub async fn accept_quote(
1059        &self,
1060        rfq_id: Uuid,
1061        quote_id: Uuid,
1062        taker_wallet: WalletAddress,
1063        taker_signature: String,
1064        taker_signer: WalletAddress,
1065        taker_nonce: u64,
1066    ) -> Result<RfqExecuteResult, String> {
1067        // Atomic check-and-transition using DashMap::entry
1068        let (cmd, was_accepted) = {
1069            let mut entry = self.active_rfqs.get_mut(&rfq_id).ok_or("RFQ not found")?;
1070
1071            // Validate state atomically
1072            if entry.status != RfqStatus::QuotesReceived {
1073                return Err(format!(
1074                    "RFQ in invalid state for acceptance: {:?}",
1075                    entry.status
1076                ));
1077            }
1078
1079            if entry.taker_wallet != taker_wallet {
1080                return Err("Taker wallet mismatch".to_string());
1081            }
1082
1083            // Find the quote
1084            let quote = entry
1085                .quotes
1086                .iter()
1087                .find(|q| q.quote_id == quote_id)
1088                .ok_or("Quote not found")?
1089                .clone();
1090
1091            // Check quote validity
1092            let now = current_time_ms();
1093            let quote_expires_at = quote.received_at + quote.valid_for_ms;
1094            if now > quote_expires_at {
1095                return Err("Quote has expired".to_string());
1096            }
1097
1098            // Atomically transition to ACCEPTED (prevents double-accept)
1099            entry.status = RfqStatus::Accepted;
1100            entry.accepted_quote_id = Some(quote_id);
1101            let (request_id, fill_id, execution_timestamp_ms) =
1102                entry.execution_facts_for_quote(quote_id, now);
1103
1104            // Build execution command
1105            let legs: Vec<RfqExecuteLeg> = quote
1106                .legs
1107                .iter()
1108                .map(|l| RfqExecuteLeg {
1109                    instrument: l.instrument.clone(),
1110                    taker_side: l.side,
1111                    price: l.price,
1112                    size: l.size,
1113                })
1114                .collect();
1115
1116            let cmd = RfqExecuteCommand {
1117                request_id,
1118                fill_id,
1119                rfq_id: rfq_id.to_string(),
1120                quote_id: quote_id.to_string(),
1121                taker_wallet,
1122                qp_wallet: quote.qp_wallet,
1123                builder_code_address: entry.builder_code_address,
1124                timestamp_ms: execution_timestamp_ms,
1125                legs,
1126                net_premium: quote.net_premium,
1127                taker_signature,
1128                qp_signature: quote.qp_signature.clone(),
1129                taker_nonce: Some(entry.taker_nonce),
1130                taker_accept_nonce: Some(taker_nonce),
1131                taker_submit_signer: Some(entry.taker_signer),
1132                taker_accept_signer: Some(taker_signer),
1133            };
1134
1135            (cmd, true)
1136        };
1137
1138        if !was_accepted {
1139            return Err("Failed to accept quote".to_string());
1140        }
1141        self.persist_status(&rfq_id, RfqStatus::Accepted);
1142
1143        // Dispatch to engine. If the engine channel is closed or the
1144        // response channel is dropped (e.g. engine restart mid-request),
1145        // the in-memory and durable RFQ state are still `Accepted` from
1146        // the atomic transition above. Revert to `QuotesReceived` on any
1147        // failure so the caller can retry instead of leaving the RFQ
1148        // permanently stuck in `Accepted` (which the state check at the
1149        // top of this function would then forever reject as "invalid
1150        // state for acceptance", and which `cleanup_expired` does NOT
1151        // transition out of).
1152        let (response_tx, response_rx) = tokio::sync::oneshot::channel();
1153        let request = RfqExecuteRequest {
1154            command: cmd,
1155            response_tx,
1156        };
1157
1158        let revert_to_quotes_received = |manager: &Self| {
1159            if let Some(mut entry) = manager.active_rfqs.get_mut(&rfq_id) {
1160                entry.status = RfqStatus::QuotesReceived;
1161                entry.accepted_quote_id = None;
1162            }
1163            manager.persist_status(&rfq_id, RfqStatus::QuotesReceived);
1164        };
1165
1166        if self.rfq_sender.send(request).await.is_err() {
1167            revert_to_quotes_received(self);
1168            return Err("Engine channel closed".to_string());
1169        }
1170
1171        let result = match response_rx.await {
1172            Ok(r) => r,
1173            Err(_) => {
1174                revert_to_quotes_received(self);
1175                return Err("Engine response channel closed".to_string());
1176            }
1177        };
1178
1179        if let RfqExecuteResult::Failed { reason } = &result {
1180            if is_nonce_rejection_reason(reason) {
1181                revert_to_quotes_received(self);
1182                return Err(format!("RFQ execution rejected: {}", reason));
1183            }
1184        }
1185
1186        // Update status based on result
1187        let terminal_status = match &result {
1188            RfqExecuteResult::Success { .. } => RfqStatus::Executed,
1189            RfqExecuteResult::Failed { .. } => RfqStatus::Failed,
1190        };
1191        if let Some(mut entry) = self.active_rfqs.get_mut(&rfq_id) {
1192            entry.status = terminal_status;
1193        }
1194        self.persist_status(&rfq_id, terminal_status);
1195
1196        Ok(result)
1197    }
1198
1199    /// Execute an auto-accepted RFQ quote. Called from a spawned task after
1200    /// `handle_qp_response` returns `AutoExecuted`. The taker already
1201    /// authorized execution via the `SubmitAutoExecuteRfq` EIP-712
1202    /// signature, so no separate accept signature is needed.
1203    pub async fn auto_accept_quote(
1204        &self,
1205        rfq_id: Uuid,
1206        quote_id: Uuid,
1207        taker_signature: String,
1208    ) -> Result<RfqExecuteResult, String> {
1209        // Build the execution command from the already-Accepted record.
1210        let cmd = {
1211            let mut entry = self.active_rfqs.get_mut(&rfq_id).ok_or("RFQ not found")?;
1212            if entry.status != RfqStatus::Accepted {
1213                return Err(format!(
1214                    "RFQ in unexpected state for auto-accept execution: {:?}",
1215                    entry.status
1216                ));
1217            }
1218
1219            let quote = entry
1220                .quotes
1221                .iter()
1222                .find(|q| q.quote_id == quote_id)
1223                .ok_or("Quote not found for auto-accept")?
1224                .clone();
1225
1226            // Reject if the quote has expired between the instant it was
1227            // stored (handle_qp_response) and when we dispatch execution.
1228            let now = current_time_ms();
1229            let quote_expires_at = quote.received_at + quote.valid_for_ms;
1230            if now > quote_expires_at {
1231                // Revert to QuotesReceived so subsequent quotes can still
1232                // be accepted or the taker can manually pick one.
1233                entry.revert_auto_accept_to_quotes_received();
1234                drop(entry);
1235                self.persist_status(&rfq_id, RfqStatus::QuotesReceived);
1236                return Err("Quote has expired".to_string());
1237            }
1238
1239            let legs: Vec<RfqExecuteLeg> = quote
1240                .legs
1241                .iter()
1242                .map(|l| RfqExecuteLeg {
1243                    instrument: l.instrument.clone(),
1244                    taker_side: l.side,
1245                    price: l.price,
1246                    size: l.size,
1247                })
1248                .collect();
1249            let (request_id, fill_id, execution_timestamp_ms) =
1250                entry.execution_facts_for_quote(quote_id, now);
1251
1252            RfqExecuteCommand {
1253                request_id,
1254                fill_id,
1255                rfq_id: rfq_id.to_string(),
1256                quote_id: quote_id.to_string(),
1257                taker_wallet: entry.taker_wallet,
1258                qp_wallet: quote.qp_wallet,
1259                builder_code_address: entry.builder_code_address,
1260                timestamp_ms: execution_timestamp_ms,
1261                legs,
1262                net_premium: quote.net_premium,
1263                taker_signature,
1264                qp_signature: quote.qp_signature.clone(),
1265                taker_nonce: Some(entry.taker_nonce),
1266                taker_accept_nonce: None,
1267                taker_submit_signer: Some(entry.taker_signer),
1268                taker_accept_signer: None,
1269            }
1270        }; // DashMap read guard dropped here
1271
1272        // Dispatch to engine
1273        let (response_tx, response_rx) = tokio::sync::oneshot::channel();
1274        let request = RfqExecuteRequest {
1275            command: cmd,
1276            response_tx,
1277        };
1278
1279        let revert_to_quotes_received = |manager: &Self| {
1280            if let Some(mut entry) = manager.active_rfqs.get_mut(&rfq_id) {
1281                entry.revert_auto_accept_to_quotes_received();
1282            }
1283            manager.persist_status(&rfq_id, RfqStatus::QuotesReceived);
1284        };
1285
1286        if self.rfq_sender.send(request).await.is_err() {
1287            revert_to_quotes_received(self);
1288            return Err("Engine channel closed".to_string());
1289        }
1290
1291        let result = match response_rx.await {
1292            Ok(r) => r,
1293            Err(_) => {
1294                revert_to_quotes_received(self);
1295                return Err("Engine response channel closed".to_string());
1296            }
1297        };
1298
1299        if let RfqExecuteResult::Failed { reason } = &result {
1300            if is_nonce_rejection_reason(reason) {
1301                revert_to_quotes_received(self);
1302                return Err(format!("RFQ execution rejected: {}", reason));
1303            }
1304        }
1305
1306        // Update status based on result
1307        let terminal_status = match &result {
1308            RfqExecuteResult::Success { .. } => RfqStatus::Executed,
1309            RfqExecuteResult::Failed { .. } => RfqStatus::Failed,
1310        };
1311        if let Some(mut entry) = self.active_rfqs.get_mut(&rfq_id) {
1312            entry.status = terminal_status;
1313        }
1314        self.persist_status(&rfq_id, terminal_status);
1315
1316        Ok(result)
1317    }
1318
1319    pub fn record_qp_decline(&self, rfq_id: Uuid, qp_wallet: WalletAddress) {
1320        if let Some(mut entry) = self.active_rfqs.get_mut(&rfq_id) {
1321            entry.responded_qps.insert(qp_wallet);
1322        }
1323    }
1324
1325    pub async fn wait_for_rpi_auction_close(&self, rfq_id: Uuid) -> Result<RfqRecord, String> {
1326        loop {
1327            let record = self
1328                .active_rfqs
1329                .get(&rfq_id)
1330                .map(|r| r.clone())
1331                .ok_or_else(|| "RPI RFQ not found".to_string())?;
1332            if record.rpi_auction.is_none() {
1333                return Err("RFQ is not an RPI auction".to_string());
1334            }
1335
1336            if should_close_rpi_auction(&record, current_time_ms()) {
1337                histogram!(
1338                    "ht_rpi_auction_duration_ms",
1339                    "outcome" => if record.quotes.is_empty() { "fallback_to_book" } else { "candidate_quotes" }
1340                )
1341                .record(current_time_ms().saturating_sub(record.created_at) as f64);
1342                return Ok(record);
1343            }
1344
1345            tokio::time::sleep(Duration::from_millis(10)).await;
1346        }
1347    }
1348
1349    pub fn rpi_candidate_quote_ids(&self, rfq_id: &Uuid) -> Result<Vec<Uuid>, String> {
1350        let record = self
1351            .active_rfqs
1352            .get(rfq_id)
1353            .map(|r| r.clone())
1354            .ok_or_else(|| "RPI RFQ not found".to_string())?;
1355        sorted_rpi_candidate_quote_ids(&record)
1356    }
1357
1358    pub async fn execute_rpi_quote(
1359        &self,
1360        rfq_id: Uuid,
1361        quote_id: Uuid,
1362        taker_signature: String,
1363    ) -> Result<RfqExecuteResult, String> {
1364        let cmd = {
1365            let mut entry = self.active_rfqs.get_mut(&rfq_id).ok_or("RFQ not found")?;
1366            if entry.rpi_auction.is_none() {
1367                return Err("RFQ is not an RPI auction".to_string());
1368            }
1369            if entry.status != RfqStatus::SentToQps && entry.status != RfqStatus::QuotesReceived {
1370                return Err(format!(
1371                    "RFQ in invalid state for RPI execution: {:?}",
1372                    entry.status
1373                ));
1374            }
1375
1376            let quote = entry
1377                .quotes
1378                .iter()
1379                .find(|q| q.quote_id == quote_id)
1380                .ok_or("Quote not found")?
1381                .clone();
1382
1383            let now = current_time_ms();
1384            let quote_expires_at = quote.received_at + quote.valid_for_ms;
1385            if now > quote_expires_at {
1386                return Err("Quote has expired".to_string());
1387            }
1388
1389            entry.status = RfqStatus::Accepted;
1390            entry.accepted_quote_id = Some(quote_id);
1391            let (request_id, fill_id, execution_timestamp_ms) =
1392                entry.execution_facts_for_quote(quote_id, now);
1393
1394            let legs: Vec<RfqExecuteLeg> = quote
1395                .legs
1396                .iter()
1397                .map(|l| RfqExecuteLeg {
1398                    instrument: l.instrument.clone(),
1399                    taker_side: l.side,
1400                    price: l.price,
1401                    size: l.size,
1402                })
1403                .collect();
1404
1405            RfqExecuteCommand {
1406                request_id,
1407                fill_id,
1408                rfq_id: rfq_id.to_string(),
1409                quote_id: quote_id.to_string(),
1410                taker_wallet: entry.taker_wallet,
1411                qp_wallet: quote.qp_wallet,
1412                builder_code_address: entry.builder_code_address,
1413                timestamp_ms: execution_timestamp_ms,
1414                legs,
1415                net_premium: quote.net_premium,
1416                taker_signature,
1417                qp_signature: quote.qp_signature.clone(),
1418                taker_nonce: Some(entry.taker_nonce),
1419                taker_accept_nonce: None,
1420                taker_submit_signer: Some(entry.taker_signer),
1421                taker_accept_signer: None,
1422            }
1423        };
1424        self.persist_status(&rfq_id, RfqStatus::Accepted);
1425
1426        let (response_tx, response_rx) = tokio::sync::oneshot::channel();
1427        let request = RfqExecuteRequest {
1428            command: cmd,
1429            response_tx,
1430        };
1431
1432        let revert_to_quotes_received = |manager: &Self| {
1433            if let Some(mut entry) = manager.active_rfqs.get_mut(&rfq_id) {
1434                entry.status = RfqStatus::QuotesReceived;
1435                entry.clear_execution_selection();
1436            }
1437            manager.persist_status(&rfq_id, RfqStatus::QuotesReceived);
1438        };
1439
1440        if self.rfq_sender.send(request).await.is_err() {
1441            revert_to_quotes_received(self);
1442            return Err("Engine channel closed".to_string());
1443        }
1444
1445        let result = match response_rx.await {
1446            Ok(r) => r,
1447            Err(_) => {
1448                revert_to_quotes_received(self);
1449                return Err("Engine response channel closed".to_string());
1450            }
1451        };
1452
1453        match &result {
1454            RfqExecuteResult::Success { .. } => {
1455                if let Some(mut entry) = self.active_rfqs.get_mut(&rfq_id) {
1456                    entry.status = RfqStatus::Executed;
1457                }
1458                self.persist_status(&rfq_id, RfqStatus::Executed);
1459                counter!("ht_rpi_auction_total", "outcome" => "filled").increment(1);
1460            }
1461            RfqExecuteResult::Failed { .. } => {
1462                revert_to_quotes_received(self);
1463            }
1464        }
1465
1466        Ok(result)
1467    }
1468
1469    pub fn mark_rpi_fallback_to_book(&self, rfq_id: &Uuid) {
1470        if let Some(mut entry) = self.active_rfqs.get_mut(rfq_id) {
1471            entry.status = if entry.quotes.is_empty() {
1472                RfqStatus::NoQuotes
1473            } else {
1474                RfqStatus::Failed
1475            };
1476        }
1477        let status = self
1478            .active_rfqs
1479            .get(rfq_id)
1480            .map(|entry| entry.status)
1481            .unwrap_or(RfqStatus::Failed);
1482        self.persist_status(rfq_id, status);
1483        counter!("ht_rpi_auction_total", "outcome" => "fallback_to_book").increment(1);
1484    }
1485
1486    /// Get an RFQ record by ID.
1487    pub fn get_rfq(&self, rfq_id: &Uuid) -> Option<RfqRecord> {
1488        self.active_rfqs.get(rfq_id).map(|r| r.clone())
1489    }
1490
1491    /// Get RFQ history for a wallet.
1492    pub fn get_history(&self, wallet: &WalletAddress) -> Vec<RfqRecord> {
1493        self.active_rfqs
1494            .iter()
1495            .filter(|r| r.taker_wallet == *wallet)
1496            .map(|r| r.clone())
1497            .collect()
1498    }
1499
1500    /// Start a background cleanup task that expires stale RFQs.
1501    pub fn start_cleanup_task(
1502        self: &Arc<Self>,
1503        mut shutdown: tokio::sync::broadcast::Receiver<()>,
1504    ) {
1505        let manager = Arc::clone(self);
1506        tokio::spawn(async move {
1507            let mut interval = tokio::time::interval(std::time::Duration::from_secs(1));
1508            loop {
1509                tokio::select! {
1510                    _ = shutdown.recv() => break,
1511                    _ = interval.tick() => {
1512                        manager.cleanup_expired();
1513                    }
1514                }
1515            }
1516        });
1517    }
1518
1519    /// Expire stale RFQs and evict old terminal records.
1520    fn cleanup_expired(&self) {
1521        let now = current_time_ms();
1522
1523        // Collect transitions inside the lock, then persist them to DB after
1524        // the `alter_all` closure returns so we don't hold the DashMap shard
1525        // lock across a sync DB call.
1526        let transitions = std::sync::Mutex::new(Vec::<(Uuid, RfqStatus)>::new());
1527        self.active_rfqs.alter_all(|rfq_id, mut record| {
1528            let new_status = match record.status {
1529                RfqStatus::SentToQps if now > record.deadline_at => {
1530                    if record.quotes.is_empty() {
1531                        Some(RfqStatus::NoQuotes)
1532                    } else {
1533                        Some(RfqStatus::QuotesReceived)
1534                    }
1535                }
1536                RfqStatus::QuotesReceived if now > record.expires_at => Some(RfqStatus::Expired),
1537                _ => None,
1538            };
1539            if let Some(status) = new_status {
1540                record.status = status;
1541                if let Ok(mut t) = transitions.lock() {
1542                    t.push((*rfq_id, status));
1543                }
1544            }
1545            record
1546        });
1547
1548        if let Ok(t) = transitions.into_inner() {
1549            for (rfq_id, status) in t {
1550                self.persist_status(&rfq_id, status);
1551            }
1552        }
1553
1554        // Evict terminal RFQs older than 5 minutes
1555        let eviction_threshold = now.saturating_sub(5 * 60 * 1000);
1556        self.active_rfqs.retain(|_, record| {
1557            !(record.status.is_terminal() && record.created_at < eviction_threshold)
1558        });
1559    }
1560}
1561
/// Test-only stand-in for `RfqManager` used by unit tests.
///
/// It holds only the in-memory RFQ map and the config, so tests can insert
/// and inspect records directly without a real database or quote provider
/// cache.
#[cfg(test)]
struct TestRfqManager {
    // RFQ records keyed by their `rfq_id`.
    active_rfqs: Arc<DashMap<Uuid, RfqRecord>>,
    // Limits consulted when handling quotes (e.g. `max_valid_for_ms`).
    config: RfqConfig,
}
1570
1571#[cfg(test)]
1572impl TestRfqManager {
1573    fn new() -> Self {
1574        Self {
1575            active_rfqs: Arc::new(DashMap::new()),
1576            config: RfqConfig::default(),
1577        }
1578    }
1579
1580    fn insert_rfq(&self, record: RfqRecord) {
1581        self.active_rfqs.insert(record.rfq_id, record);
1582    }
1583
1584    fn get_rfq(&self, rfq_id: &Uuid) -> Option<RfqRecord> {
1585        self.active_rfqs.get(rfq_id).map(|r| r.clone())
1586    }
1587
1588    /// Mirrors `RfqManager::handle_qp_response` logic for testing without
1589    /// requiring the full manager (which needs DB, QP cache, etc.).
1590    fn handle_qp_response(
1591        &self,
1592        rfq_id: Uuid,
1593        quote: QpQuote,
1594    ) -> Result<HandleQpResponseResult, String> {
1595        let mut entry = self.active_rfqs.get_mut(&rfq_id).ok_or("RFQ not found")?;
1596
1597        if entry.status != RfqStatus::SentToQps && entry.status != RfqStatus::QuotesReceived {
1598            return Err(format!(
1599                "RFQ in invalid state for quotes: {:?}",
1600                entry.status
1601            ));
1602        }
1603
1604        let now = current_time_ms();
1605        if now > entry.deadline_at {
1606            return Err("RFQ response deadline has passed".to_string());
1607        }
1608
1609        if quote.valid_for_ms > self.config.max_valid_for_ms {
1610            return Err(format!(
1611                "valid_for_ms {} exceeds max {}",
1612                quote.valid_for_ms, self.config.max_valid_for_ms
1613            ));
1614        }
1615
1616        if entry.taker_wallet == quote.qp_wallet {
1617            entry.responded_qps.insert(quote.qp_wallet);
1618            let snapshot = entry.clone();
1619            if entry.rpi_auction.is_some() {
1620                return Ok(HandleQpResponseResult::RpiQuoteRejected {
1621                    record: snapshot,
1622                    reason: RFQ_SELF_TRADE_REJECTION_REASON.to_string(),
1623                });
1624            }
1625            return Err(RFQ_SELF_TRADE_REJECTION_REASON.to_string());
1626        }
1627
1628        let rpi_auction = entry.rpi_auction.clone();
1629        if let Some(rpi) = rpi_auction.as_ref() {
1630            entry.responded_qps.insert(quote.qp_wallet);
1631            if let Err(reason) = validate_rpi_quote_shape(&entry.legs, &quote) {
1632                let snapshot = entry.clone();
1633                return Ok(HandleQpResponseResult::RpiQuoteRejected {
1634                    record: snapshot,
1635                    reason,
1636                });
1637            }
1638            let first_quote_leg = quote
1639                .legs
1640                .first()
1641                .ok_or_else(|| "RPI quote missing quote leg".to_string())?;
1642            if !quote_satisfies_rpi_auction(
1643                first_quote_leg.side,
1644                rpi.limit_price,
1645                rpi.reference_price,
1646                first_quote_leg.price,
1647                rpi.min_tick,
1648            ) {
1649                let snapshot = entry.clone();
1650                let (_, reason) = rpi_quote_rejection(
1651                    first_quote_leg.side,
1652                    rpi.limit_price,
1653                    rpi.reference_price,
1654                    first_quote_leg.price,
1655                    rpi.min_tick,
1656                );
1657                return Ok(HandleQpResponseResult::RpiQuoteRejected {
1658                    record: snapshot,
1659                    reason,
1660                });
1661            }
1662        } else {
1663            entry.responded_qps.insert(quote.qp_wallet);
1664        }
1665
1666        let quote_id = quote.quote_id;
1667        let quote_net_premium = quote.net_premium;
1668        entry.quotes.push(quote);
1669        let transitioned_to_quotes_received = entry.status == RfqStatus::SentToQps;
1670        if transitioned_to_quotes_received {
1671            entry.status = RfqStatus::QuotesReceived;
1672        }
1673
1674        if rpi_auction.is_some() {
1675            let snapshot = entry.clone();
1676            drop(entry);
1677            return Ok(HandleQpResponseResult::RpiQuoteStored(snapshot));
1678        }
1679
1680        let auto_accept_limit = entry.auto_accept_limit;
1681        let auto_execute = if let Some(limit) = auto_accept_limit {
1682            let within_limit = quote_satisfies_auto_accept_limit(
1683                &entry.legs,
1684                quote_net_premium,
1685                limit,
1686                self.config.min_improvement_tick(),
1687            )
1688            .unwrap_or(false);
1689
1690            if within_limit
1691                && (entry.status == RfqStatus::QuotesReceived
1692                    || entry.status == RfqStatus::SentToQps)
1693            {
1694                entry.status = RfqStatus::Accepted;
1695                entry.accepted_quote_id = Some(quote_id);
1696                true
1697            } else {
1698                false
1699            }
1700        } else {
1701            false
1702        };
1703
1704        let snapshot = entry.clone();
1705        drop(entry);
1706
1707        if auto_execute {
1708            Ok(HandleQpResponseResult::AutoExecuted {
1709                record: snapshot,
1710                accepted_quote_id: quote_id,
1711            })
1712        } else if let Some(limit) = auto_accept_limit {
1713            Ok(HandleQpResponseResult::AutoExecuteSkipped {
1714                record: snapshot,
1715                quote_premium: quote_net_premium.abs(),
1716                limit,
1717            })
1718        } else {
1719            Ok(HandleQpResponseResult::QuoteStored(snapshot))
1720        }
1721    }
1722}
1723
#[cfg(test)]
mod tests {
    //! Unit tests for RFQ quote handling: fixture builders, auto-accept
    //! behaviour, self-trade rejection and the RPI auction checks.

    use super::*;
    use rust_decimal_macros::dec;

    /// Call-option instrument shared by the single-leg fixtures in this module.
    const CALL_INSTRUMENT: &str = "ETH-20260401-3000-C";

    /// Builds a deterministic wallet: a 20-byte address that is all zeroes
    /// except for its final byte, which carries `id`.
    fn test_wallet(id: u8) -> WalletAddress {
        let mut address_bytes = [0u8; 20];
        if let Some(last_byte) = address_bytes.last_mut() {
            *last_byte = id;
        }
        WalletAddress::from(alloy::primitives::Address::from(address_bytes))
    }

    /// Builds a buy-side test `RfqRecord` in `SentToQps` state (ready to
    /// receive quotes).
    fn make_test_rfq(
        rfq_id: Uuid,
        taker_wallet: WalletAddress,
        auto_accept_limit: Option<Decimal>,
    ) -> RfqRecord {
        let default_side = Side::Buy;
        make_test_rfq_with_side(rfq_id, taker_wallet, auto_accept_limit, default_side)
    }

    /// Builds a single-leg test `RfqRecord` in `SentToQps` state for the given
    /// taker `side`. The response deadline is 30s and the expiry 60s after the
    /// creation timestamp; no quotes, responses or RPI context are attached.
    fn make_test_rfq_with_side(
        rfq_id: Uuid,
        taker_wallet: WalletAddress,
        auto_accept_limit: Option<Decimal>,
        side: Side,
    ) -> RfqRecord {
        let created_at = current_time_ms();
        let requested_leg = RfqLeg {
            instrument: CALL_INSTRUMENT.to_string(),
            side,
            size: dec!(5),
        };

        RfqRecord {
            // Identity and request payload.
            rfq_id,
            taker_wallet,
            taker_signer: taker_wallet,
            builder_code_address: None,
            underlying: "ETH".to_string(),
            legs: vec![requested_leg],
            legs_hash: [0u8; 32],
            taker_signature: "0xtest_sig".to_string(),
            taker_nonce: 1,
            // Lifecycle state and timing.
            status: RfqStatus::SentToQps,
            created_at,
            deadline_at: created_at + 30_000,
            expires_at: created_at + 60_000,
            // Quote bookkeeping.
            quotes: Vec::new(),
            eligible_qps: Vec::new(),
            responded_qps: HashSet::new(),
            // Acceptance / execution selection.
            auto_accept_limit,
            accepted_quote_id: None,
            execution_quote_id: None,
            execution_request_id: None,
            execution_fill_id: None,
            execution_timestamp_ms: None,
            rpi_auction: None,
        }
    }

    /// Builds a test `QpQuote` with the given `net_premium` and a default
    /// buy leg priced at 150 for size 5.
    fn make_test_quote(qp_wallet: WalletAddress, net_premium: Decimal) -> QpQuote {
        let (leg_side, leg_price, leg_size) = (Side::Buy, dec!(150), dec!(5));
        make_test_quote_with_leg(qp_wallet, net_premium, leg_side, leg_price, leg_size)
    }

    /// Builds a single-leg test `QpQuote` with a fresh quote id, stamped with
    /// the current time as its arrival time.
    fn make_test_quote_with_leg(
        qp_wallet: WalletAddress,
        net_premium: Decimal,
        side: Side,
        price: Decimal,
        size: Decimal,
    ) -> QpQuote {
        let quote_id = Uuid::now_v7();
        let quoted_leg = QuoteLeg {
            instrument: CALL_INSTRUMENT.to_string(),
            side,
            price,
            size,
        };

        QpQuote {
            quote_id,
            qp_wallet,
            legs: vec![quoted_leg],
            net_premium,
            valid_for_ms: 25_000,
            qp_signature: "0xqp_sig".to_string(),
            qp_nonce: 1,
            received_at: current_time_ms(),
        }
    }

    /// Builds a test RFQ carrying an RPI auction context with limit price 100,
    /// reference price 100 and a minimum tick of 0.0001.
    fn make_test_rpi_rfq(rfq_id: Uuid, taker_wallet: WalletAddress, side: Side) -> RfqRecord {
        let auction_context = RpiAuctionContext {
            limit_price: dec!(100),
            reference_price: Some(dec!(100)),
            min_tick: dec!(0.0001),
        };

        let mut rfq = make_test_rfq_with_side(rfq_id, taker_wallet, None, side);
        rfq.rpi_auction = Some(auction_context);
        rfq
    }

    /// Same as `make_test_rpi_rfq`, but with the RPI reference price cleared.
    fn make_test_rpi_rfq_without_book_reference(
        rfq_id: Uuid,
        taker_wallet: WalletAddress,
        side: Side,
    ) -> RfqRecord {
        let mut rfq = make_test_rpi_rfq(rfq_id, taker_wallet, side);
        let auction_context = rfq
            .rpi_auction
            .as_mut()
            .expect("test RPI context should exist");
        auction_context.reference_price = None;
        rfq
    }

    /// `RfqConfig::default()` exposes the expected default for every field.
    #[test]
    fn test_rfq_config_default() {
        let RfqConfig {
            response_deadline_ms,
            rpi_auction_ms,
            min_improvement_ticks,
            reveal_limit_to_qps,
            max_valid_for_ms,
            rfq_lifetime_ms,
            max_legs,
        } = RfqConfig::default();

        assert_eq!(response_deadline_ms, 10_000);
        assert_eq!(rpi_auction_ms, 2_000);
        assert_eq!(min_improvement_ticks, 1);
        assert!(reveal_limit_to_qps);
        assert_eq!(max_valid_for_ms, 30_000);
        assert_eq!(rfq_lifetime_ms, 60_000);
        assert_eq!(max_legs, 10);
    }

    /// A freshly built record in `Created` state is not terminal.
    #[test]
    fn test_rfq_record_lifecycle() {
        let requested_leg = RfqLeg {
            instrument: CALL_INSTRUMENT.to_string(),
            side: Side::Buy,
            size: dec!(5),
        };

        let fresh = RfqRecord {
            rfq_id: Uuid::now_v7(),
            taker_wallet: test_wallet(1),
            taker_signer: test_wallet(1),
            builder_code_address: None,
            underlying: "ETH".to_string(),
            legs: vec![requested_leg],
            legs_hash: [0u8; 32],
            taker_signature: "0x123".to_string(),
            taker_nonce: 1,
            status: RfqStatus::Created,
            created_at: current_time_ms(),
            deadline_at: current_time_ms() + 5000,
            expires_at: current_time_ms() + 60000,
            quotes: Vec::new(),
            eligible_qps: Vec::new(),
            responded_qps: HashSet::new(),
            auto_accept_limit: None,
            accepted_quote_id: None,
            execution_quote_id: None,
            execution_request_id: None,
            execution_fill_id: None,
            execution_timestamp_ms: None,
            rpi_auction: None,
        };

        assert_eq!(fresh.status, RfqStatus::Created);
        assert!(!fresh.status.is_terminal());
    }

    /// Execution facts are reused when the same quote is retried and are
    /// regenerated when a different quote is selected.
    #[test]
    fn test_rfq_execution_facts_stable_for_quote_retries() {
        let rfq_id = Uuid::now_v7();
        let accepted_id = Uuid::now_v7();
        let replacement_id = Uuid::now_v7();
        let mut rfq = make_test_rfq(rfq_id, test_wallet(1), None);

        let initial_facts = rfq.execution_facts_for_quote(accepted_id, 100);
        let retried_facts = rfq.execution_facts_for_quote(accepted_id, 200);
        assert_eq!(
            initial_facts, retried_facts,
            "same accepted quote must reuse execution facts across retries"
        );

        let replacement_facts = rfq.execution_facts_for_quote(replacement_id, 300);
        assert_ne!(
            initial_facts.0, replacement_facts.0,
            "different accepted quotes must not share a request_id"
        );
        assert_ne!(
            initial_facts.1, replacement_facts.1,
            "different accepted quotes must not share a fill_id"
        );
        assert_eq!(replacement_facts.2, 300);
    }

    /// Reverting an auto-accept returns the record to `QuotesReceived` and
    /// clears the limit, the accepted quote and every execution field.
    #[test]
    fn test_auto_accept_rollback_clears_execution_selection() {
        let rfq_id = Uuid::now_v7();
        let accepted_id = Uuid::now_v7();
        let mut rfq = make_test_rfq(rfq_id, test_wallet(1), Some(dec!(5.0)));

        // Put the record into the post-auto-accept shape before rolling back.
        rfq.status = RfqStatus::Accepted;
        rfq.accepted_quote_id = Some(accepted_id);
        let _ = rfq.execution_facts_for_quote(accepted_id, 100);

        rfq.revert_auto_accept_to_quotes_received();

        assert_eq!(rfq.status, RfqStatus::QuotesReceived);
        assert!(rfq.auto_accept_limit.is_none());
        assert!(rfq.accepted_quote_id.is_none());
        assert!(rfq.execution_quote_id.is_none());
        assert!(rfq.execution_request_id.is_none());
        assert!(rfq.execution_fill_id.is_none());
        assert!(rfq.execution_timestamp_ms.is_none());
    }

    /// Only `NoQuotes`, `Expired`, `Executed` and `Failed` report as terminal.
    #[test]
    fn test_rfq_status_terminal() {
        let in_flight = [
            RfqStatus::Created,
            RfqStatus::SentToQps,
            RfqStatus::QuotesReceived,
            RfqStatus::Accepted,
        ];
        for status in in_flight {
            assert!(!status.is_terminal());
        }

        let finished = [
            RfqStatus::NoQuotes,
            RfqStatus::Expired,
            RfqStatus::Executed,
            RfqStatus::Failed,
        ];
        for status in finished {
            assert!(status.is_terminal());
        }
    }

    /// A quote inside the auto-accept limit moves the RFQ to `Accepted` and is
    /// reported as `AutoExecuted`.
    #[test]
    fn test_auto_execute_rfq() {
        let manager = TestRfqManager::new();

        let rfq_id = Uuid::now_v7();
        let taker = test_wallet(1);
        let qp = test_wallet(2);

        // RFQ with auto_accept_limit = 5.0.
        manager.insert_rfq(make_test_rfq(rfq_id, taker, Some(dec!(5.0))));

        // Taker buy quotes encode as negative premium, so a 3.0 debit sits
        // inside a 5.0 max debit limit.
        let incoming = make_test_quote(qp, dec!(-3.0));
        let expected_quote_id = incoming.quote_id;

        let outcome = manager.handle_qp_response(rfq_id, incoming).unwrap();

        match outcome {
            HandleQpResponseResult::AutoExecuted {
                record,
                accepted_quote_id,
            } => {
                assert_eq!(accepted_quote_id, expected_quote_id);
                assert_eq!(record.status, RfqStatus::Accepted);
            }
            HandleQpResponseResult::QuoteStored(_) => {
                panic!("Expected AutoExecuted, got QuoteStored")
            }
            HandleQpResponseResult::AutoExecuteSkipped { .. } => {
                panic!("Expected AutoExecuted, got AutoExecuteSkipped")
            }
            HandleQpResponseResult::RpiQuoteStored(_)
            | HandleQpResponseResult::RpiQuoteRejected { .. } => {
                panic!("Expected AutoExecuted, got RPI result")
            }
        }

        // The manager's stored copy must be Accepted as well.
        let persisted = manager.get_rfq(&rfq_id).unwrap();
        assert_eq!(persisted.status, RfqStatus::Accepted);

        // The server would next call auto_accept_quote to dispatch the
        // execute command; check the record carries what that dispatch needs.
        assert_eq!(persisted.quotes.len(), 1);
        assert_eq!(persisted.quotes[0].quote_id, expected_quote_id);
    }

    /// On a manual RFQ, a quote from the taker's own wallet is an error, is
    /// not stored, but still counts as a response.
    #[test]
    fn test_manual_rfq_rejects_self_trade_quote() {
        let manager = TestRfqManager::new();

        let rfq_id = Uuid::now_v7();
        let taker = test_wallet(1);

        manager.insert_rfq(make_test_rfq(rfq_id, taker, None));

        let rejection = manager
            .handle_qp_response(rfq_id, make_test_quote(taker, dec!(-3.0)))
            .unwrap_err();
        assert_eq!(rejection, RFQ_SELF_TRADE_REJECTION_REASON);

        let persisted = manager.get_rfq(&rfq_id).unwrap();
        assert_eq!(persisted.status, RfqStatus::SentToQps);
        assert!(persisted.quotes.is_empty());
        assert!(persisted.responded_qps.contains(&taker));
    }

    /// On an auto-execute RFQ, a self-trade quote is an error and must not
    /// select an accepted quote, even though it is within the limit.
    #[test]
    fn test_auto_execute_rfq_rejects_self_trade_quote() {
        let manager = TestRfqManager::new();

        let rfq_id = Uuid::now_v7();
        let taker = test_wallet(1);

        manager.insert_rfq(make_test_rfq(rfq_id, taker, Some(dec!(5.0))));

        let rejection = manager
            .handle_qp_response(rfq_id, make_test_quote(taker, dec!(-3.0)))
            .unwrap_err();
        assert_eq!(rejection, RFQ_SELF_TRADE_REJECTION_REASON);

        let persisted = manager.get_rfq(&rfq_id).unwrap();
        assert_eq!(persisted.status, RfqStatus::SentToQps);
        assert!(persisted.accepted_quote_id.is_none());
        assert!(persisted.quotes.is_empty());
        assert!(persisted.responded_qps.contains(&taker));
    }

    /// A buy quote whose debit exceeds the limit is stored but reported as
    /// `AutoExecuteSkipped`, leaving the RFQ in `QuotesReceived`.
    #[test]
    fn test_auto_execute_rfq_outside_limit() {
        let manager = TestRfqManager::new();

        let rfq_id = Uuid::now_v7();
        let taker = test_wallet(1);
        let qp = test_wallet(2);

        // RFQ with auto_accept_limit = 2.0.
        manager.insert_rfq(make_test_rfq(rfq_id, taker, Some(dec!(2.0))));

        // A 3.0 debit is outside a 2.0 max debit limit.
        let outcome = manager
            .handle_qp_response(rfq_id, make_test_quote(qp, dec!(-3.0)))
            .unwrap();

        match outcome {
            HandleQpResponseResult::AutoExecuteSkipped {
                record,
                quote_premium,
                limit,
            } => {
                assert_eq!(record.status, RfqStatus::QuotesReceived);
                assert_eq!(record.quotes.len(), 1);
                assert_eq!(quote_premium, dec!(3.0));
                assert_eq!(limit, dec!(2.0));
            }
            HandleQpResponseResult::QuoteStored(_) => {
                panic!("Expected AutoExecuteSkipped, got QuoteStored")
            }
            HandleQpResponseResult::AutoExecuted { .. } => {
                panic!("Expected AutoExecuteSkipped, got AutoExecuted")
            }
            HandleQpResponseResult::RpiQuoteStored(_)
            | HandleQpResponseResult::RpiQuoteRejected { .. } => {
                panic!("Expected AutoExecuteSkipped, got RPI result")
            }
        }

        // The stored RFQ must be QuotesReceived, not Accepted.
        let persisted = manager.get_rfq(&rfq_id).unwrap();
        assert_eq!(persisted.status, RfqStatus::QuotesReceived);
    }

    /// Once one quote has auto-executed, any further quote for the same RFQ is
    /// rejected with an invalid-state error.
    #[test]
    fn test_auto_execute_prevents_double_fill() {
        let manager = TestRfqManager::new();

        let rfq_id = Uuid::now_v7();
        let taker = test_wallet(1);
        let first_qp = test_wallet(2);
        let second_qp = test_wallet(3);

        // RFQ with auto_accept_limit = 5.0.
        manager.insert_rfq(make_test_rfq(rfq_id, taker, Some(dec!(5.0))));

        // First QP: 3.0 debit within the limit -> AutoExecuted.
        let first_outcome = manager
            .handle_qp_response(rfq_id, make_test_quote(first_qp, dec!(-3.0)))
            .unwrap();
        assert!(
            matches!(first_outcome, HandleQpResponseResult::AutoExecuted { .. }),
            "First quote should auto-execute"
        );

        // Second QP: the RFQ is already Accepted, so the quote must bounce.
        let second_outcome =
            manager.handle_qp_response(rfq_id, make_test_quote(second_qp, dec!(-2.0)));
        assert!(
            second_outcome.is_err(),
            "Second quote should be rejected because RFQ is already Accepted"
        );
        assert!(
            second_outcome
                .unwrap_err()
                .contains("invalid state for quotes"),
            "Error should indicate invalid state"
        );
    }

    /// On a sell RFQ, a quote premium below the limit is stored but does not
    /// auto-execute.
    #[test]
    fn test_auto_execute_sell_quote_below_limit_is_skipped() {
        let manager = TestRfqManager::new();

        let rfq_id = Uuid::now_v7();
        let taker = test_wallet(1);
        let qp = test_wallet(2);

        manager.insert_rfq(make_test_rfq_with_side(
            rfq_id,
            taker,
            Some(dec!(1.9553)),
            Side::Sell,
        ));

        let outcome = manager
            .handle_qp_response(rfq_id, make_test_quote(qp, dec!(1.0)))
            .unwrap();

        match outcome {
            HandleQpResponseResult::AutoExecuteSkipped {
                record,
                quote_premium,
                limit,
            } => {
                assert_eq!(record.status, RfqStatus::QuotesReceived);
                assert_eq!(quote_premium, dec!(1.0));
                assert_eq!(limit, dec!(1.9553));
            }
            HandleQpResponseResult::QuoteStored(_) => {
                panic!("Expected AutoExecuteSkipped, got QuoteStored")
            }
            HandleQpResponseResult::AutoExecuted { .. } => {
                panic!("Sell quote below limit must not auto-execute")
            }
            HandleQpResponseResult::RpiQuoteStored(_)
            | HandleQpResponseResult::RpiQuoteRejected { .. } => {
                panic!("Expected AutoExecuteSkipped, got RPI result")
            }
        }
    }

    /// On a sell RFQ, a quote premium at or above the limit auto-executes.
    #[test]
    fn test_auto_execute_sell_quote_at_or_above_limit_executes() {
        let manager = TestRfqManager::new();

        let rfq_id = Uuid::now_v7();
        let taker = test_wallet(1);
        let qp = test_wallet(2);

        manager.insert_rfq(make_test_rfq_with_side(
            rfq_id,
            taker,
            Some(dec!(1.9553)),
            Side::Sell,
        ));

        let incoming = make_test_quote(qp, dec!(1.96));
        let expected_quote_id = incoming.quote_id;
        let outcome = manager.handle_qp_response(rfq_id, incoming).unwrap();

        match outcome {
            HandleQpResponseResult::AutoExecuted {
                record,
                accepted_quote_id,
            } => {
                assert_eq!(accepted_quote_id, expected_quote_id);
                assert_eq!(record.status, RfqStatus::Accepted);
            }
            HandleQpResponseResult::QuoteStored(_) => {
                panic!("Expected AutoExecuted, got QuoteStored")
            }
            HandleQpResponseResult::AutoExecuteSkipped { .. } => {
                panic!("Expected AutoExecuted, got AutoExecuteSkipped")
            }
            HandleQpResponseResult::RpiQuoteStored(_)
            | HandleQpResponseResult::RpiQuoteRejected { .. } => {
                panic!("Expected AutoExecuted, got RPI result")
            }
        }
    }

    /// The auto-accept limit check returns an error for a package that mixes
    /// buy and sell legs.
    #[test]
    fn test_auto_execute_mixed_side_limit_is_rejected() {
        let buy_call = RfqLeg {
            instrument: "ETH-20260401-3000-C".to_string(),
            side: Side::Buy,
            size: dec!(1),
        };
        let sell_put = RfqLeg {
            instrument: "ETH-20260401-3000-P".to_string(),
            side: Side::Sell,
            size: dec!(1),
        };
        let mixed_legs = vec![buy_call, sell_put];

        let rejection =
            quote_satisfies_auto_accept_limit(&mixed_legs, dec!(1), dec!(1), dec!(0.0001))
                .expect_err("mixed-side auto-execute RFQ should be rejected");

        assert!(rejection.contains("mixed-side packages require explicit quote acceptance"));
    }

    /// Without a reference price, a buy quote qualifies at the limit price and
    /// fails one tick above it.
    #[test]
    fn test_rpi_buy_accepts_limit_without_book_reference() {
        let limit = dec!(100);
        let tick = dec!(0.0001);

        let at_limit = quote_qualifies_for_rpi(Side::Buy, limit, None, dec!(100), tick);
        assert!(at_limit);

        let above_limit = quote_qualifies_for_rpi(Side::Buy, limit, None, dec!(100.0001), tick);
        assert!(!above_limit);
    }

    /// With a reference price of 100, a buy quote one tick below it qualifies;
    /// a quote above the reference does not, even though it is inside the limit.
    #[test]
    fn test_rpi_price_improvement_buy_beats_executable_book() {
        let limit = dec!(105);
        let reference = Some(dec!(100));
        let tick = dec!(0.0001);

        let improving = quote_qualifies_for_rpi(Side::Buy, limit, reference, dec!(99.9999), tick);
        assert!(improving);

        let not_improving =
            quote_qualifies_for_rpi(Side::Buy, limit, reference, dec!(104.9999), tick);
        assert!(!not_improving);
    }

    /// With a reference price of 100, a sell quote one tick above it qualifies;
    /// a quote below the reference does not, even though it is inside the limit.
    #[test]
    fn test_rpi_price_improvement_sell_beats_executable_book() {
        let limit = dec!(95);
        let reference = Some(dec!(100));
        let tick = dec!(0.0001);

        let improving = quote_qualifies_for_rpi(Side::Sell, limit, reference, dec!(100.0001), tick);
        assert!(improving);

        let not_improving =
            quote_qualifies_for_rpi(Side::Sell, limit, reference, dec!(95.0001), tick);
        assert!(!not_improving);
    }

    /// When a quote misses both the limit and the reference, the rejection
    /// reports the limit violation.
    #[test]
    fn test_rpi_rejection_prefers_limit_reason_over_reference_reason() {
        let limit = dec!(100);
        let reference = Some(dec!(99));
        let quoted = dec!(100.0001);

        let (label, reason) = rpi_quote_rejection(Side::Buy, limit, reference, quoted, dec!(0.0001));

        assert_eq!(label, "outside_limit");
        assert!(reason.contains("does not satisfy limit"));
    }

    /// A quote inside the limit that merely matches the reference is rejected
    /// for lacking price improvement.
    #[test]
    fn test_rpi_rejection_reports_missing_price_improvement() {
        let limit = dec!(105);
        let reference = Some(dec!(100));
        let quoted = dec!(100);

        let (label, reason) = rpi_quote_rejection(Side::Buy, limit, reference, quoted, dec!(0.0001));

        assert_eq!(label, "no_price_improvement");
        assert!(reason.contains("does not improve reference"));
    }

    /// An RPI quote priced exactly at the reference is rejected, not stored,
    /// and the QP is still counted as having responded.
    #[test]
    fn test_rpi_quote_rejects_price_without_improvement() {
        let manager = TestRfqManager::new();
        let rfq_id = Uuid::now_v7();
        let taker = test_wallet(1);
        let qp = test_wallet(2);

        manager.insert_rfq(make_test_rpi_rfq(rfq_id, taker, Side::Buy));

        let at_reference = make_test_quote_with_leg(qp, dec!(-500), Side::Buy, dec!(100), dec!(5));
        let outcome = manager.handle_qp_response(rfq_id, at_reference).unwrap();

        match outcome {
            HandleQpResponseResult::RpiQuoteRejected { record, reason } => {
                assert!(reason.contains("does not improve reference"));
                assert!(record.quotes.is_empty());
                assert!(record.responded_qps.contains(&qp));
            }
            unexpected => panic!("Expected RpiQuoteRejected, got {:?}", unexpected),
        }
    }

    /// Without a reference price, an RPI quote at the limit price is stored.
    #[test]
    fn test_rpi_quote_stores_limit_price_when_no_book_reference() {
        let manager = TestRfqManager::new();
        let rfq_id = Uuid::now_v7();
        let taker = test_wallet(1);
        let qp = test_wallet(2);

        let rfq = make_test_rpi_rfq_without_book_reference(rfq_id, taker, Side::Buy);
        manager.insert_rfq(rfq);

        let at_limit = make_test_quote_with_leg(qp, dec!(-500), Side::Buy, dec!(100), dec!(5));
        let expected_quote_id = at_limit.quote_id;
        let outcome = manager.handle_qp_response(rfq_id, at_limit).unwrap();

        match outcome {
            HandleQpResponseResult::RpiQuoteStored(record) => {
                assert_eq!(record.status, RfqStatus::QuotesReceived);
                assert_eq!(record.quotes.len(), 1);
                assert_eq!(record.quotes[0].quote_id, expected_quote_id);
                assert!(record.responded_qps.contains(&qp));
            }
            unexpected => panic!("Expected RpiQuoteStored, got {:?}", unexpected),
        }
    }

    /// Without a reference price, an RPI quote one tick beyond the limit is
    /// rejected with the limit reason.
    #[test]
    fn test_rpi_quote_rejects_price_outside_limit_without_book_reference() {
        let manager = TestRfqManager::new();
        let rfq_id = Uuid::now_v7();
        let taker = test_wallet(1);
        let qp = test_wallet(2);

        let rfq = make_test_rpi_rfq_without_book_reference(rfq_id, taker, Side::Buy);
        manager.insert_rfq(rfq);

        let beyond_limit =
            make_test_quote_with_leg(qp, dec!(-500.0005), Side::Buy, dec!(100.0001), dec!(5));
        let outcome = manager.handle_qp_response(rfq_id, beyond_limit).unwrap();

        match outcome {
            HandleQpResponseResult::RpiQuoteRejected { record, reason } => {
                assert!(reason.contains("does not satisfy limit"));
                assert!(record.quotes.is_empty());
                assert!(record.responded_qps.contains(&qp));
            }
            unexpected => panic!("Expected RpiQuoteRejected, got {:?}", unexpected),
        }
    }

    /// The RPI auction stays open through the deadline millisecond itself and
    /// only closes strictly after it.
    #[test]
    fn test_rpi_auction_close_boundary_allows_deadline_millisecond() {
        let rfq_id = Uuid::now_v7();
        let taker = test_wallet(1);
        let mut rfq = make_test_rpi_rfq(rfq_id, taker, Side::Buy);
        rfq.eligible_qps = vec![test_wallet(2)];
        rfq.deadline_at = 1_000;

        let expectations = [(999, false), (1_000, false), (1_001, true)];
        for (now_ms, should_close) in expectations {
            assert_eq!(should_close_rpi_auction(&rfq, now_ms), should_close);
        }
    }

    /// On an RPI RFQ, a self-trade quote yields `RpiQuoteRejected` (not an
    /// error), is not stored, and the taker wallet is counted as a response.
    #[test]
    fn test_rpi_quote_rejects_self_trade_and_counts_response() {
        let manager = TestRfqManager::new();
        let rfq_id = Uuid::now_v7();
        let taker = test_wallet(1);

        manager.insert_rfq(make_test_rpi_rfq(rfq_id, taker, Side::Buy));

        let self_quote =
            make_test_quote_with_leg(taker, dec!(-499.9995), Side::Buy, dec!(99.9999), dec!(5));
        let outcome = manager.handle_qp_response(rfq_id, self_quote).unwrap();

        match outcome {
            HandleQpResponseResult::RpiQuoteRejected { record, reason } => {
                assert_eq!(reason, RFQ_SELF_TRADE_REJECTION_REASON);
                assert_eq!(record.status, RfqStatus::SentToQps);
                assert!(record.quotes.is_empty());
                assert!(record.responded_qps.contains(&taker));
            }
            unexpected => panic!("Expected RpiQuoteRejected, got {:?}", unexpected),
        }
    }

    /// An RPI quote for less than the requested size is rejected.
    #[test]
    fn test_rpi_quote_rejects_partial_size() {
        let manager = TestRfqManager::new();
        let rfq_id = Uuid::now_v7();
        let taker = test_wallet(1);
        let qp = test_wallet(2);

        manager.insert_rfq(make_test_rpi_rfq(rfq_id, taker, Side::Buy));

        // The RFQ asks for size 5; this quote only covers 4.
        let undersized =
            make_test_quote_with_leg(qp, dec!(-499.9995), Side::Buy, dec!(99.9999), dec!(4));
        let outcome = manager.handle_qp_response(rfq_id, undersized).unwrap();

        match outcome {
            HandleQpResponseResult::RpiQuoteRejected { record, reason } => {
                assert!(reason.contains("does not fully cover requested size"));
                assert!(record.quotes.is_empty());
                assert!(record.responded_qps.contains(&qp));
            }
            unexpected => panic!("Expected RpiQuoteRejected, got {:?}", unexpected),
        }
    }

    /// A full-size RPI quote one tick better than the reference is stored as a
    /// candidate and moves the RFQ to `QuotesReceived`.
    #[test]
    fn test_rpi_quote_stores_full_size_price_improved_candidate() {
        let manager = TestRfqManager::new();
        let rfq_id = Uuid::now_v7();
        let taker = test_wallet(1);
        let qp = test_wallet(2);

        manager.insert_rfq(make_test_rpi_rfq(rfq_id, taker, Side::Buy));

        let improving =
            make_test_quote_with_leg(qp, dec!(-499.9995), Side::Buy, dec!(99.9999), dec!(5));
        let expected_quote_id = improving.quote_id;
        let outcome = manager.handle_qp_response(rfq_id, improving).unwrap();

        match outcome {
            HandleQpResponseResult::RpiQuoteStored(record) => {
                assert_eq!(record.status, RfqStatus::QuotesReceived);
                assert_eq!(record.quotes.len(), 1);
                assert_eq!(record.quotes[0].quote_id, expected_quote_id);
                assert!(record.responded_qps.contains(&qp));
            }
            unexpected => panic!("Expected RpiQuoteStored, got {:?}", unexpected),
        }
    }

    /// Buy-side RPI candidates are ordered by lowest price first, with earlier
    /// arrival breaking ties between equal prices.
    #[test]
    fn test_rpi_candidate_sort_buy_best_price_then_arrival() {
        let rfq_id = Uuid::now_v7();
        let taker = test_wallet(1);
        let mut rfq = make_test_rpi_rfq(rfq_id, taker, Side::Buy);
        let older_best = Uuid::now_v7();
        let newer_best = Uuid::now_v7();
        let worse = Uuid::now_v7();

        // Every candidate shares the same shape; only id, wallet, price,
        // premium and arrival time vary.
        let candidate = |quote_id: Uuid,
                         wallet_id: u8,
                         price: Decimal,
                         net_premium: Decimal,
                         received_at| QpQuote {
            quote_id,
            qp_wallet: test_wallet(wallet_id),
            legs: vec![QuoteLeg {
                instrument: CALL_INSTRUMENT.to_string(),
                side: Side::Buy,
                price,
                size: dec!(5),
            }],
            net_premium,
            valid_for_ms: 25_000,
            qp_signature: "0xqp_sig".to_string(),
            qp_nonce: 1,
            received_at,
        };

        // Deliberately stored worst-first and latest-first.
        rfq.quotes = vec![
            candidate(worse, 2, dec!(99.90), dec!(-499.50), 3),
            candidate(newer_best, 3, dec!(99.80), dec!(-499), 2),
            candidate(older_best, 4, dec!(99.80), dec!(-499), 1),
        ];

        let ranked = sorted_rpi_candidate_quote_ids(&rfq).unwrap();

        assert_eq!(ranked, vec![older_best, newer_best, worse]);
    }
}
2513}