Skip to main content

hypercall_api/websocket/
rfq.rs

1use crate::rfq::indicative_quote_cache::{AggregatedIndicativeQuote, IndicativeQuoteEntry};
2use crate::rfq::qp_ws_state::{QpWsState, RfqWebsocketPublisher};
3use axum::{
4    extract::{
5        ws::{Message, WebSocket, WebSocketUpgrade},
6        State,
7    },
8    http::HeaderMap,
9    response::Response,
10};
11use futures::{SinkExt, StreamExt};
12use hypercall_auth::SignatureRecovery;
13use hypercall_types::ws_protocol::{
14    WsIndicativeMarketData, WsMessage, WsRfqQuoteEntry, WsRfqQuotes, WsRfqStatusUpdate,
15};
16use hypercall_types::WalletAddress;
17use hypercall_ws_protocol::{
18    QpClientMessage as QpInboundMessage, QpServerMessage as QpOutboundMessage,
19};
20use metrics::counter;
21use rust_decimal::Decimal;
22use std::sync::Arc;
23use tokio::sync::mpsc;
24use tracing::{info, warn};
25
26/// Fallback `valid_for_ms` applied when a quote provider omits the field on
27/// an inbound RFQ response. Chosen to match the rfq-mm default and stay under
28/// `RfqConfig::max_valid_for_ms` (30s).
29const DEFAULT_QUOTE_VALID_FOR_MS: u64 = 25_000;
30const GATEWAY_TOKEN_HEADER: &str = "x-hypercall-gateway-token";
31
32/// Axum handler for QP WebSocket upgrade at `/ws/quotes`.
33pub async fn qp_websocket_handler(
34    ws: WebSocketUpgrade,
35    headers: HeaderMap,
36    State(state): State<QpWsState>,
37) -> Response {
38    let gateway_resume_authorized = gateway_resume_authorized(&headers, &state);
39    ws.on_upgrade(move |socket| handle_qp_socket(socket, state, gateway_resume_authorized))
40}
41
42fn gateway_resume_authorized(headers: &HeaderMap, state: &QpWsState) -> bool {
43    let Some(expected) = state.gateway_resume_token.as_deref() else {
44        return false;
45    };
46    headers
47        .get(GATEWAY_TOKEN_HEADER)
48        .and_then(|value| value.to_str().ok())
49        .is_some_and(|actual| actual == expected)
50}
51
52async fn handle_qp_socket(socket: WebSocket, state: QpWsState, gateway_resume_authorized: bool) {
53    let (mut ws_sender, mut ws_receiver) = socket.split();
54
55    // Step 1: Wait for first frame (ConnectQuoteProvider auth)
56    let auth_result =
57        match tokio::time::timeout(std::time::Duration::from_secs(10), ws_receiver.next()).await {
58            Ok(Some(Ok(Message::Text(text)))) => {
59                authenticate_qp(&text, &state, gateway_resume_authorized).await
60            }
61            Ok(Some(Ok(Message::Binary(bytes)))) => {
62                if let Ok(text) = String::from_utf8(bytes.to_vec()) {
63                    authenticate_qp(&text, &state, gateway_resume_authorized).await
64                } else {
65                    Err("Invalid binary frame".to_string())
66                }
67            }
68            _ => Err("Timeout or connection closed before auth".to_string()),
69        };
70
71    let qp_wallet = match auth_result {
72        Ok(wallet) => {
73            let msg = QpOutboundMessage::Authenticated {
74                wallet: wallet.as_hex(),
75            };
76            let json = serde_json::to_string(&msg).unwrap();
77            if ws_sender.send(Message::Text(json)).await.is_err() {
78                return;
79            }
80            wallet
81        }
82        Err(reason) => {
83            let msg = QpOutboundMessage::AuthFailed { reason };
84            let json = serde_json::to_string(&msg).unwrap();
85            let _ = ws_sender.send(Message::Text(json)).await;
86            return;
87        }
88    };
89
90    info!("QP authenticated: {}", qp_wallet.as_hex());
91
92    // Step 2: Register session
93    let (outbound_tx, mut outbound_rx) = mpsc::unbounded_channel::<QpOutboundMessage>();
94    let session_id = state
95        .session_manager
96        .register_session(qp_wallet, outbound_tx);
97
98    // Step 3: Message loop
99    loop {
100        tokio::select! {
101            // Outbound: engine -> QP
102            Some(msg) = outbound_rx.recv() => {
103                let json = serde_json::to_string(&msg).unwrap();
104                if ws_sender.send(Message::Text(json)).await.is_err() {
105                    break;
106                }
107            }
108            // Inbound: QP -> engine
109            Some(Ok(frame)) = ws_receiver.next() => {
110                match frame {
111                    Message::Text(text) => {
112                        handle_qp_message(&text, &qp_wallet, &state).await;
113                    }
114                    Message::Binary(bytes) => {
115                        if let Ok(text) = String::from_utf8(bytes.to_vec()) {
116                            handle_qp_message(&text, &qp_wallet, &state).await;
117                        }
118                    }
119                    Message::Ping(_) => {
120                        let _ = ws_sender.send(Message::Pong(vec![])).await;
121                    }
122                    Message::Close(_) => break,
123                    _ => {}
124                }
125            }
126            else => break,
127        }
128    }
129
130    // Step 4: Cleanup on disconnect
131    // Only evict this QP's indicative quotes if the disconnecting socket
132    // was the LAST session for the wallet. `QpSessionManager` supports
133    // multiple concurrent sessions per wallet (e.g. two tabs, rolling
134    // deploy with overlapping sessions), and unconditional eviction on
135    // any single disconnect would clobber quotes that the surviving
136    // sessions are still actively updating.
137    let fully_disconnected = state.session_manager.remove_session(&qp_wallet, session_id);
138    if fully_disconnected {
139        let updates = state.indicative_cache.evict_qp(&qp_wallet);
140        publish_indicative_updates(&state.pubsub, updates);
141    }
142    info!("QP disconnected: {}", qp_wallet.as_hex());
143}
144
145async fn authenticate_qp(
146    text: &str,
147    state: &QpWsState,
148    gateway_resume_authorized: bool,
149) -> Result<WalletAddress, String> {
150    let msg: QpInboundMessage =
151        serde_json::from_str(text).map_err(|e| format!("Invalid JSON: {}", e))?;
152
153    match msg {
154        QpInboundMessage::ConnectQuoteProvider {
155            wallet,
156            timestamp,
157            nonce,
158            signature,
159        } => {
160            let wallet_addr = WalletAddress::from_str(&wallet)
161                .map_err(|_| format!("Invalid wallet address: {}", wallet))?;
162
163            let ts = timestamp
164                .parse::<u64>()
165                .map_err(|_| "Invalid timestamp".to_string())?;
166
167            let recovered = SignatureRecovery::recover_connect_quote_provider_signer(
168                wallet_addr,
169                alloy::primitives::U256::from(ts),
170                nonce,
171                &signature,
172                state.signing_chain_id,
173            )
174            .map_err(|e| format!("Signature recovery failed: {}", e))?;
175
176            if recovered != wallet_addr.inner() {
177                return Err("Signature does not match wallet".to_string());
178            }
179
180            // Replay protection: advance the engine nonce watermark. The
181            // engine rejects if nonce <= max_seen[wallet] (strict monotonic).
182            let (tx, rx) = tokio::sync::oneshot::channel();
183            let nonce_req = hypercall_runtime_api::NonceCheckRequest {
184                wallet: wallet_addr,
185                nonce,
186                applied_tx: tx,
187            };
188            state
189                .nonce_check_sender
190                .send(nonce_req)
191                .await
192                .map_err(|_| "Engine nonce channel closed".to_string())?;
193            match tokio::time::timeout(std::time::Duration::from_secs(5), rx).await {
194                Ok(Ok(Ok(()))) => {}
195                Ok(Ok(Err(e))) => return Err(format!("Handshake nonce rejected: {}", e)),
196                Ok(Err(_)) => return Err("Engine dropped nonce response channel".to_string()),
197                Err(_) => return Err("Timed out waiting for nonce check".to_string()),
198            }
199
200            if !state.qp_cache.is_active(&wallet_addr).await {
201                return Err("Wallet is not a registered active quote provider".to_string());
202            }
203
204            Ok(wallet_addr)
205        }
206        QpInboundMessage::GatewayResumeQuoteProvider {
207            wallet,
208            timestamp,
209            nonce,
210            signature,
211        } => {
212            if !gateway_resume_authorized {
213                return Err(
214                    "gateway_resume_quote_provider is only accepted from the internal gateway"
215                        .to_string(),
216                );
217            }
218            let wallet_addr =
219                verify_qp_connect_signature(&wallet, &timestamp, nonce, &signature, state)?;
220
221            if !state.qp_cache.is_active(&wallet_addr).await {
222                return Err("Wallet is not a registered active quote provider".to_string());
223            }
224
225            Ok(wallet_addr)
226        }
227        _ => Err(
228            "First message must be ConnectQuoteProvider or GatewayResumeQuoteProvider".to_string(),
229        ),
230    }
231}
232
233fn verify_qp_connect_signature(
234    wallet: &str,
235    timestamp: &str,
236    nonce: u64,
237    signature: &str,
238    state: &QpWsState,
239) -> Result<WalletAddress, String> {
240    let wallet_addr = WalletAddress::from_str(wallet)
241        .map_err(|_| format!("Invalid wallet address: {}", wallet))?;
242
243    let ts = timestamp
244        .parse::<u64>()
245        .map_err(|_| "Invalid timestamp".to_string())?;
246
247    let recovered = SignatureRecovery::recover_connect_quote_provider_signer(
248        wallet_addr,
249        alloy::primitives::U256::from(ts),
250        nonce,
251        signature,
252        state.signing_chain_id,
253    )
254    .map_err(|e| format!("Signature recovery failed: {}", e))?;
255
256    if recovered != wallet_addr.inner() {
257        return Err("Signature does not match wallet".to_string());
258    }
259
260    Ok(wallet_addr)
261}
262
263async fn handle_qp_message(text: &str, qp_wallet: &WalletAddress, state: &QpWsState) {
264    let msg: QpInboundMessage = match serde_json::from_str(text) {
265        Ok(m) => m,
266        Err(e) => {
267            warn!("Invalid QP message from {}: {}", qp_wallet.as_hex(), e);
268            return;
269        }
270    };
271
272    match msg {
273        QpInboundMessage::ConnectQuoteProvider { .. }
274        | QpInboundMessage::GatewayResumeQuoteProvider { .. } => {
275            warn!(
276                "Ignoring QP auth frame after authentication from {}",
277                qp_wallet.as_hex()
278            );
279        }
280        QpInboundMessage::IndicativeQuoteUpdate { quotes } => {
281            let now = std::time::SystemTime::now()
282                .duration_since(std::time::UNIX_EPOCH)
283                .unwrap()
284                .as_millis() as u64;
285            let quote_count = quotes.len();
286
287            let entries: Vec<IndicativeQuoteEntry> = quotes
288                .into_iter()
289                .filter_map(|q| {
290                    if !state.instruments_cache.allows_rfq(&q.instrument) {
291                        return None;
292                    }
293                    Some(IndicativeQuoteEntry {
294                        instrument: q.instrument,
295                        bid_price: q.bid_price.parse::<Decimal>().ok()?,
296                        ask_price: q.ask_price.parse::<Decimal>().ok()?,
297                        max_bid_size: q.max_bid_size.parse::<Decimal>().ok()?,
298                        max_ask_size: q.max_ask_size.parse::<Decimal>().ok()?,
299                        updated_at: now,
300                    })
301                })
302                .collect();
303
304            if quote_count == 0 || !entries.is_empty() {
305                let updates = state.indicative_cache.update(*qp_wallet, entries);
306                publish_indicative_updates(&state.pubsub, updates);
307            }
308        }
309        QpInboundMessage::RfqResponse {
310            rfq_id,
311            action,
312            legs,
313            net_premium,
314            valid_for_ms,
315            signature,
316            nonce,
317        } => {
318            if action == "decline" {
319                counter!("ht_qp_quote_rejection_total", "reason" => "decline").increment(1);
320                info!("QP {} declined RFQ {}", qp_wallet.as_hex(), rfq_id);
321                if let (Some(ref rfq_manager), Ok(rfq_uuid)) =
322                    (&state.rfq_manager, uuid::Uuid::parse_str(&rfq_id))
323                {
324                    rfq_manager.record_qp_decline(rfq_uuid, *qp_wallet);
325                }
326                return;
327            }
328
329            if let Some(ref rfq_manager) = state.rfq_manager {
330                let rfq_uuid = match uuid::Uuid::parse_str(&rfq_id) {
331                    Ok(u) => u,
332                    Err(e) => {
333                        warn!("Invalid rfq_id from QP {}: {}", qp_wallet.as_hex(), e);
334                        return;
335                    }
336                };
337
338                // Re-check QP is still active before accepting quote
339                if !state.qp_cache.is_active(qp_wallet).await {
340                    warn!(
341                        "QP {} is no longer active, rejecting RFQ response for {}",
342                        qp_wallet.as_hex(),
343                        rfq_id
344                    );
345                    rfq_manager.record_qp_invalid_response(rfq_uuid, *qp_wallet);
346                    return;
347                }
348
349                // Strictly validate every QP leg. Silently filtering out
350                // invalid legs (the previous behavior) would let a QP
351                // submit a partial or zero/negative quote that still
352                // flows into margin + settlement; a negative price in
353                // particular flips `taker_premium_delta` and produces
354                // economically invalid fills that still pass signature
355                // checks (the QP signature binds `net_premium`, not the
356                // per-leg price/size). Reject the whole quote on any
357                // parse error, unknown side, or non-positive value.
358                let mut quote_legs: Vec<crate::rfq::rfq_manager::QuoteLeg> =
359                    Vec::with_capacity(legs.as_ref().map(|l| l.len()).unwrap_or(0));
360                let mut quote_legs_valid = true;
361                for l in legs.unwrap_or_default() {
362                    let side = match l.side.to_lowercase().as_str() {
363                        "buy" => hypercall_types::Side::Buy,
364                        "sell" => hypercall_types::Side::Sell,
365                        other => {
366                            warn!(
367                                "QP {} sent invalid leg side {:?} for RFQ {}, rejecting",
368                                qp_wallet.as_hex(),
369                                other,
370                                rfq_id
371                            );
372                            quote_legs_valid = false;
373                            break;
374                        }
375                    };
376                    let price = match l.price.parse::<Decimal>() {
377                        Ok(p) if p > Decimal::ZERO => p,
378                        _ => {
379                            warn!(
380                                "QP {} sent non-positive/invalid leg price {:?} for RFQ {}, rejecting",
381                                qp_wallet.as_hex(),
382                                l.price,
383                                rfq_id
384                            );
385                            quote_legs_valid = false;
386                            break;
387                        }
388                    };
389                    let size = match l.size.parse::<Decimal>() {
390                        Ok(s) if s > Decimal::ZERO => s,
391                        _ => {
392                            warn!(
393                                "QP {} sent non-positive/invalid leg size {:?} for RFQ {}, rejecting",
394                                qp_wallet.as_hex(),
395                                l.size,
396                                rfq_id
397                            );
398                            quote_legs_valid = false;
399                            break;
400                        }
401                    };
402                    quote_legs.push(crate::rfq::rfq_manager::QuoteLeg {
403                        instrument: l.instrument,
404                        side,
405                        price,
406                        size,
407                    });
408                }
409                if !quote_legs_valid {
410                    rfq_manager.record_qp_invalid_response(rfq_uuid, *qp_wallet);
411                    return;
412                }
413
414                // Cross-check the QP's returned legs against the ORIGINAL
415                // RFQ legs that `legs_hash` was computed from. The EIP-712
416                // signature check below only binds `legs_hash + net_premium`,
417                // and `legs_hash` is over the taker's originally-requested
418                // (instrument, side, size) tuples — NOT the per-leg metadata
419                // the QP echoes back. Without this cross-check, a malicious
420                // or buggy QP can emit a validly signed response while
421                // swapping instrument / side / size in the websocket
422                // payload, and `execute_rfq_inner` will happily run the
423                // altered legs once the taker accepts the (still-valid)
424                // signature. The QP may only contribute `price`; everything
425                // else must match the original request exactly.
426                let rfq_record = match rfq_manager.get_rfq(&rfq_uuid) {
427                    Some(r) => r,
428                    None => {
429                        warn!(
430                            "QP {} sent response for unknown RFQ {}, rejecting",
431                            qp_wallet.as_hex(),
432                            rfq_id
433                        );
434                        rfq_manager.record_qp_invalid_response(rfq_uuid, *qp_wallet);
435                        return;
436                    }
437                };
438                if quote_legs.len() != rfq_record.legs.len() {
439                    warn!(
440                        "QP {} sent {} legs for RFQ {} (expected {}), rejecting",
441                        qp_wallet.as_hex(),
442                        quote_legs.len(),
443                        rfq_id,
444                        rfq_record.legs.len()
445                    );
446                    rfq_manager.record_qp_invalid_response(rfq_uuid, *qp_wallet);
447                    return;
448                }
449                let mut legs_match = true;
450                for (qp_leg, original) in quote_legs.iter().zip(rfq_record.legs.iter()) {
451                    if qp_leg.instrument != original.instrument
452                        || qp_leg.side != original.side
453                        || qp_leg.size != original.size
454                    {
455                        warn!(
456                            "QP {} sent leg mismatched with original RFQ {} (got instrument={}, side={:?}, size={}; expected instrument={}, side={:?}, size={}), rejecting",
457                            qp_wallet.as_hex(),
458                            rfq_id,
459                            qp_leg.instrument, qp_leg.side, qp_leg.size,
460                            original.instrument, original.side, original.size,
461                        );
462                        legs_match = false;
463                        break;
464                    }
465                }
466                if !legs_match {
467                    rfq_manager.record_qp_invalid_response(rfq_uuid, *qp_wallet);
468                    return;
469                }
470
471                let quote = crate::rfq::rfq_manager::QpQuote {
472                    quote_id: uuid::Uuid::now_v7(),
473                    qp_wallet: *qp_wallet,
474                    legs: quote_legs,
475                    net_premium: match net_premium.and_then(|s| s.parse().ok()) {
476                        Some(p) => p,
477                        None => {
478                            warn!(
479                                "QP {} sent RFQ response with missing/invalid net_premium for RFQ {}, rejecting",
480                                qp_wallet.as_hex(), rfq_id
481                            );
482                            rfq_manager.record_qp_invalid_response(rfq_uuid, *qp_wallet);
483                            return;
484                        }
485                    },
486                    valid_for_ms: valid_for_ms.unwrap_or(DEFAULT_QUOTE_VALID_FOR_MS),
487                    qp_signature: match signature {
488                        Some(s) if !s.is_empty() => s,
489                        _ => {
490                            warn!(
491                                "QP {} sent RFQ response without signature for RFQ {}, rejecting",
492                                qp_wallet.as_hex(),
493                                rfq_id
494                            );
495                            rfq_manager.record_qp_invalid_response(rfq_uuid, *qp_wallet);
496                            return;
497                        }
498                    },
499                    qp_nonce: nonce.unwrap_or(0),
500                    received_at: std::time::SystemTime::now()
501                        .duration_since(std::time::UNIX_EPOCH)
502                        .unwrap()
503                        .as_millis() as u64,
504                };
505
506                // Verify QP EIP-712 signature (SubmitRFQResponse)
507                if let Some(rfq_record) = rfq_manager.get_rfq(&rfq_uuid) {
508                    let mut rfq_id_bytes = [0u8; 32];
509                    rfq_id_bytes[16..].copy_from_slice(rfq_uuid.as_bytes());
510
511                    // Fail closed on premium conversion errors. Silent fallbacks
512                    // to zero would let a QP sign over a zero-premium payload
513                    // and still pass verification while transmitting an
514                    // un-representable `net_premium`, decoupling the accepted
515                    // quote from the cryptographic binding.
516                    use rust_decimal::prelude::ToPrimitive;
517                    let premium_scaled =
518                        quote.net_premium * rust_decimal::Decimal::new(1_000_000, 0);
519                    let premium_micro = match premium_scaled.to_i64() {
520                        Some(v) => v,
521                        None => {
522                            warn!(
523                                "QP {} net_premium {} not representable as i64 for RFQ {}, rejecting",
524                                qp_wallet.as_hex(),
525                                quote.net_premium,
526                                rfq_id
527                            );
528                            rfq_manager.record_qp_invalid_response(rfq_uuid, *qp_wallet);
529                            return;
530                        }
531                    };
532                    let net_premium_i256 = match alloy::primitives::I256::try_from(premium_micro) {
533                        Ok(v) => v,
534                        Err(e) => {
535                            warn!(
536                                "QP {} net_premium {} failed I256 conversion for RFQ {}: {}, rejecting",
537                                qp_wallet.as_hex(),
538                                quote.net_premium,
539                                rfq_id,
540                                e
541                            );
542                            rfq_manager.record_qp_invalid_response(rfq_uuid, *qp_wallet);
543                            return;
544                        }
545                    };
546                    let valid_for_u256 = alloy::primitives::U256::from(quote.valid_for_ms);
547
548                    match hypercall_auth::SignatureRecovery::recover_submit_rfq_response_signer(
549                        rfq_id_bytes,
550                        rfq_record.legs_hash,
551                        net_premium_i256,
552                        valid_for_u256,
553                        *qp_wallet,
554                        quote.qp_nonce,
555                        &quote.qp_signature,
556                        state.signing_chain_id,
557                    ) {
558                        Ok(recovered) => {
559                            if recovered != qp_wallet.inner() {
560                                warn!(
561                                    "QP {} signature mismatch for RFQ {}, rejecting",
562                                    qp_wallet.as_hex(),
563                                    rfq_id
564                                );
565                                rfq_manager.record_qp_invalid_response(rfq_uuid, *qp_wallet);
566                                return;
567                            }
568                        }
569                        Err(e) => {
570                            warn!(
571                                "QP {} signature recovery failed for RFQ {}: {}, rejecting",
572                                qp_wallet.as_hex(),
573                                rfq_id,
574                                e
575                            );
576                            rfq_manager.record_qp_invalid_response(rfq_uuid, *qp_wallet);
577                            return;
578                        }
579                    }
580                }
581
582                match rfq_manager.handle_qp_response(rfq_uuid, quote) {
583                    Ok(result) => {
584                        use crate::rfq::rfq_manager::HandleQpResponseResult;
585                        match result {
586                            HandleQpResponseResult::QuoteStored(record) => {
587                                info!("QP {} quote stored for RFQ {}", qp_wallet.as_hex(), rfq_id);
588                                if let Some(ref pubsub) = state.pubsub {
589                                    let ws_quotes = record
590                                        .quotes
591                                        .iter()
592                                        .map(|q| WsRfqQuoteEntry {
593                                            quote_id: q.quote_id.to_string(),
594                                            net_premium: q.net_premium,
595                                            expires_at: q.received_at + q.valid_for_ms,
596                                        })
597                                        .collect();
598                                    pubsub.publish_rfq_quotes(WsRfqQuotes {
599                                        rfq_id: record.rfq_id.to_string(),
600                                        quotes: ws_quotes,
601                                        status: record.status.as_str().to_string(),
602                                        taker_wallet: record.taker_wallet,
603                                    });
604                                }
605                            }
606                            HandleQpResponseResult::AutoExecuted {
607                                record,
608                                accepted_quote_id,
609                            } => {
610                                info!(
611                                    "QP {} quote auto-executing for RFQ {} (quote={})",
612                                    qp_wallet.as_hex(),
613                                    rfq_id,
614                                    accepted_quote_id,
615                                );
616                                // Push quotes update to taker first
617                                if let Some(ref pubsub) = state.pubsub {
618                                    let ws_quotes = record
619                                        .quotes
620                                        .iter()
621                                        .map(|q| WsRfqQuoteEntry {
622                                            quote_id: q.quote_id.to_string(),
623                                            net_premium: q.net_premium,
624                                            expires_at: q.received_at + q.valid_for_ms,
625                                        })
626                                        .collect();
627                                    pubsub.publish_rfq_quotes(WsRfqQuotes {
628                                        rfq_id: record.rfq_id.to_string(),
629                                        quotes: ws_quotes,
630                                        status: record.status.as_str().to_string(),
631                                        taker_wallet: record.taker_wallet,
632                                    });
633                                }
634
635                                // Spawn async execution task. The rfq_manager
636                                // already transitioned the status to Accepted
637                                // atomically; this task dispatches to the engine
638                                // and updates to Executed/Failed.
639                                let rfq_manager = Arc::clone(rfq_manager);
640                                let pubsub = state.pubsub.clone();
641                                let taker_wallet = record.taker_wallet;
642                                let taker_sig = record.taker_signature.clone();
643                                let rfq_id_str = record.rfq_id.to_string();
644                                tokio::spawn(async move {
645                                    match rfq_manager
646                                        .auto_accept_quote(rfq_uuid, accepted_quote_id, taker_sig)
647                                        .await
648                                    {
649                                        Ok(exec_result) => {
650                                            let (status, fill_id) = match &exec_result {
651                                                hypercall_runtime_api::RfqExecuteResult::Success { fill_id } => {
652                                                    (hypercall_types::RfqStatus::Executed, Some(fill_id.clone()))
653                                                }
654                                                hypercall_runtime_api::RfqExecuteResult::Failed { .. } => {
655                                                    (hypercall_types::RfqStatus::Failed, None)
656                                                }
657                                            };
658                                            if let Some(ref pubsub) = pubsub {
659                                                pubsub.publish_to_channel(
660                                                    "rfq",
661                                                    WsMessage::RfqAcceptResult {
662                                                        rfq_id: rfq_id_str.clone(),
663                                                        quote_id: accepted_quote_id.to_string(),
664                                                        status: status.as_str().to_string(),
665                                                        fill_id,
666                                                        taker_wallet: Some(taker_wallet),
667                                                    },
668                                                );
669                                                pubsub.publish_to_channel(
670                                                    "rfq",
671                                                    WsMessage::RfqStatusUpdate(WsRfqStatusUpdate {
672                                                        rfq_id: rfq_id_str,
673                                                        status: status.as_str().to_string(),
674                                                        taker_wallet,
675                                                    }),
676                                                );
677                                            }
678                                            info!(
679                                                "Auto-execute completed for RFQ: status={:?}",
680                                                status
681                                            );
682                                        }
683                                        Err(e) => {
684                                            warn!(
685                                                "Auto-execute failed for RFQ {}: {}",
686                                                rfq_id_str, e
687                                            );
688                                            counter!(
689                                                "ht_rfq_auto_execute_total",
690                                                "result" => "engine_error"
691                                            )
692                                            .increment(1);
693                                            // Notify the taker that auto-execution failed so
694                                            // they can retry or pick another quote.
695                                            if let Some(ref pubsub) = pubsub {
696                                                pubsub.publish_to_channel(
697                                                    "rfq",
698                                                    WsMessage::RfqStatusUpdate(WsRfqStatusUpdate {
699                                                        rfq_id: rfq_id_str.clone(),
700                                                        status: "failed".to_string(),
701                                                        taker_wallet,
702                                                    }),
703                                                );
704                                            }
705                                        }
706                                    }
707                                });
708                            }
709                            HandleQpResponseResult::AutoExecuteSkipped {
710                                record,
711                                quote_premium,
712                                limit,
713                            } => {
714                                info!(
715                                    "QP {} quote outside auto-execute limit for RFQ {} (premium={}, limit={})",
716                                    qp_wallet.as_hex(),
717                                    rfq_id,
718                                    quote_premium,
719                                    limit,
720                                );
721                                if let Some(ref pubsub) = state.pubsub {
722                                    let ws_quotes = record
723                                        .quotes
724                                        .iter()
725                                        .map(|q| WsRfqQuoteEntry {
726                                            quote_id: q.quote_id.to_string(),
727                                            net_premium: q.net_premium,
728                                            expires_at: q.received_at + q.valid_for_ms,
729                                        })
730                                        .collect();
731                                    pubsub.publish_rfq_quotes(WsRfqQuotes {
732                                        rfq_id: record.rfq_id.to_string(),
733                                        quotes: ws_quotes,
734                                        status: "outside_limit".to_string(),
735                                        taker_wallet: record.taker_wallet,
736                                    });
737                                }
738                            }
739                            HandleQpResponseResult::RpiQuoteStored(record) => {
740                                info!(
741                                    "QP {} RPI quote stored for RFQ {}",
742                                    qp_wallet.as_hex(),
743                                    rfq_id
744                                );
745                                if let Some(ref pubsub) = state.pubsub {
746                                    let ws_quotes = record
747                                        .quotes
748                                        .iter()
749                                        .map(|q| WsRfqQuoteEntry {
750                                            quote_id: q.quote_id.to_string(),
751                                            net_premium: q.net_premium,
752                                            expires_at: q.received_at + q.valid_for_ms,
753                                        })
754                                        .collect();
755                                    pubsub.publish_rfq_quotes(WsRfqQuotes {
756                                        rfq_id: record.rfq_id.to_string(),
757                                        quotes: ws_quotes,
758                                        status: record.status.as_str().to_string(),
759                                        taker_wallet: record.taker_wallet,
760                                    });
761                                }
762                            }
763                            HandleQpResponseResult::RpiQuoteRejected { record, reason } => {
764                                info!(
765                                    "QP {} RPI quote rejected for RFQ {}: {}",
766                                    qp_wallet.as_hex(),
767                                    rfq_id,
768                                    reason
769                                );
770                                if let Some(ref pubsub) = state.pubsub {
771                                    pubsub.publish_to_channel(
772                                        "rfq",
773                                        WsMessage::RfqStatusUpdate(WsRfqStatusUpdate {
774                                            rfq_id: record.rfq_id.to_string(),
775                                            status: record.status.as_str().to_string(),
776                                            taker_wallet: record.taker_wallet,
777                                        }),
778                                    );
779                                }
780                            }
781                        }
782                    }
783                    Err(e) => {
784                        // Check if the error is because the RFQ is already
785                        // accepted/executed (auto-accept race). Notify the QP
786                        // immediately so it knows it lost, even if execution
787                        // hasn't completed yet. The QP shouldn't wait for
788                        // terminal execution before learning it lost the race.
789                        if e.contains("invalid state for quotes") {
790                            if let Some(record) = rfq_manager.get_rfq(&rfq_uuid) {
791                                if record.status == hypercall_types::RfqStatus::Accepted
792                                    || record.status == hypercall_types::RfqStatus::Executed
793                                {
794                                    let winning_quote_id = record
795                                        .accepted_quote_id
796                                        .map(|id| id.to_string())
797                                        .unwrap_or_default();
798                                    state.session_manager.send_to_qp(
799                                        qp_wallet,
800                                        QpOutboundMessage::RfqAlreadyFilled {
801                                            rfq_id: rfq_id.clone(),
802                                            filled_by_quote_id: winning_quote_id,
803                                        },
804                                    );
805                                }
806                            }
807                        }
808                        warn!(
809                            "QP {} quote rejected for RFQ {}: {}",
810                            qp_wallet.as_hex(),
811                            rfq_id,
812                            e
813                        );
814                    }
815                }
816            } else {
817                warn!(
818                    "Received RFQ response from QP {} but RfqManager not wired",
819                    qp_wallet.as_hex()
820                );
821            }
822        }
823    }
824}
825
826fn publish_indicative_updates(
827    pubsub: &Option<Arc<dyn RfqWebsocketPublisher>>,
828    updates: Vec<(String, Option<AggregatedIndicativeQuote>)>,
829) {
830    let Some(pubsub) = pubsub else {
831        return;
832    };
833
834    for (instrument, aggregate) in updates {
835        let message = match aggregate {
836            Some(aggregate) => WsIndicativeMarketData {
837                instrument: aggregate.instrument,
838                best_bid: Some(aggregate.best_bid),
839                best_ask: Some(aggregate.best_ask),
840                bid_iv: None,
841                ask_iv: None,
842                indicative_bid_size: Some(aggregate.indicative_bid_size),
843                indicative_ask_size: Some(aggregate.indicative_ask_size),
844                num_providers: aggregate.num_providers,
845                timestamp: aggregate.updated_at,
846            },
847            None => WsIndicativeMarketData {
848                instrument,
849                best_bid: None,
850                best_ask: None,
851                bid_iv: None,
852                ask_iv: None,
853                indicative_bid_size: None,
854                indicative_ask_size: None,
855                num_providers: 0,
856                timestamp: hypercall_types::utils::get_timestamp_millis(),
857            },
858        };
859
860        pubsub.publish_indicative_market_data(message);
861    }
862}
863
864use std::str::FromStr;
865
866#[cfg(test)]
867mod tests {
868    use super::*;
869    use crate::rfq::qp_sessions::QpSessionManager;
870    use crate::websocket::{uuid, PubSubManager, WsMessage};
871    use hypercall_ws_protocol::QpRfqLeg;
872    use rust_decimal_macros::dec;
873    use std::sync::Arc;
874
875    fn test_wallet(id: u8) -> WalletAddress {
876        let mut bytes = [0u8; 20];
877        bytes[19] = id;
878        WalletAddress::from(alloy::primitives::Address::from(bytes))
879    }
880
881    #[test]
882    fn test_session_manager_register_remove() {
883        let mgr = QpSessionManager::new();
884        let wallet = test_wallet(1);
885        let (tx, _rx) = mpsc::unbounded_channel();
886
887        let session_id = mgr.register_session(wallet, tx);
888        assert_eq!(mgr.get_connected_wallets().len(), 1);
889
890        mgr.remove_session(&wallet, session_id);
891        assert_eq!(mgr.get_connected_wallets().len(), 0);
892    }
893
894    #[test]
895    fn test_session_manager_multiple_sessions_per_wallet() {
896        let mgr = QpSessionManager::new();
897        let wallet = test_wallet(1);
898        let (tx1, _rx1) = mpsc::unbounded_channel();
899        let (tx2, _rx2) = mpsc::unbounded_channel();
900
901        let s1 = mgr.register_session(wallet, tx1);
902        let s2 = mgr.register_session(wallet, tx2);
903
904        // One wallet, two sessions
905        assert_eq!(mgr.get_connected_wallets().len(), 1);
906
907        // Removing one session while another remains must NOT report
908        // fully-disconnected — downstream cleanup (e.g. evicting
909        // indicative quotes from the cache) would otherwise clobber
910        // state the surviving session is still actively updating.
911        assert!(
912            !mgr.remove_session(&wallet, s1),
913            "remove_session must return false while other sessions remain"
914        );
915        assert_eq!(mgr.get_connected_wallets().len(), 1);
916
917        assert!(
918            mgr.remove_session(&wallet, s2),
919            "remove_session must return true when the last session drops"
920        );
921        assert_eq!(mgr.get_connected_wallets().len(), 0);
922    }
923
924    #[test]
925    fn test_send_to_qp_fans_out() {
926        let mgr = QpSessionManager::new();
927        let wallet = test_wallet(1);
928        let (tx1, mut rx1) = mpsc::unbounded_channel();
929        let (tx2, mut rx2) = mpsc::unbounded_channel();
930
931        mgr.register_session(wallet, tx1);
932        mgr.register_session(wallet, tx2);
933
934        mgr.send_to_qp(
935            &wallet,
936            QpOutboundMessage::Authenticated {
937                wallet: "0x01".to_string(),
938            },
939        );
940
941        assert!(rx1.try_recv().is_ok());
942        assert!(rx2.try_recv().is_ok());
943    }
944
945    #[test]
946    fn test_send_rfq_to_eligible() {
947        let mgr = QpSessionManager::new();
948        let w1 = test_wallet(1);
949        let w2 = test_wallet(2);
950        let w3 = test_wallet(3);
951        let (tx1, mut rx1) = mpsc::unbounded_channel();
952        let (tx2, mut rx2) = mpsc::unbounded_channel();
953        let (tx3, mut rx3) = mpsc::unbounded_channel();
954
955        mgr.register_session(w1, tx1);
956        mgr.register_session(w2, tx2);
957        mgr.register_session(w3, tx3);
958
959        // Only send to w1 and w3
960        mgr.send_rfq_to_eligible(
961            &[w1, w3],
962            QpOutboundMessage::RfqRequest {
963                rfq_id: "test".to_string(),
964                legs: vec![],
965                taker_wallet: "0x00".to_string(),
966                request_timestamp: 0,
967                response_deadline_ms: 5000,
968                auto_accept_limit: None,
969                auto_execute: false,
970                taker_limit_price: None,
971                reference_price: None,
972                min_improvement_tick: None,
973                auction_deadline_ms: None,
974                requires_price_improvement: false,
975            },
976        );
977
978        assert!(rx1.try_recv().is_ok());
979        assert!(rx2.try_recv().is_err()); // w2 not eligible
980        assert!(rx3.try_recv().is_ok());
981    }
982
983    #[test]
984    fn test_qp_inbound_message_deserialize() {
985        let json = r#"{"type":"indicative_quote_update","quotes":[{"instrument":"ETH-C-3000","bid_price":"120.5","ask_price":"125.0","max_bid_size":"10","max_ask_size":"10"}]}"#;
986        let msg: QpInboundMessage = serde_json::from_str(json).unwrap();
987        assert!(matches!(
988            msg,
989            QpInboundMessage::IndicativeQuoteUpdate { .. }
990        ));
991    }
992
993    #[test]
994    fn test_publish_indicative_updates_sends_market_data() {
995        let pubsub = PubSubManager::new();
996        let client_id = uuid::Uuid::new_v4();
997        let (tx, mut rx) = mpsc::unbounded_channel::<Arc<WsMessage>>();
998        pubsub.add_client(client_id, tx, false, None);
999        pubsub
1000            .subscribe(
1001                client_id,
1002                "indicative_market_data".to_string(),
1003                None,
1004                None,
1005                None,
1006            )
1007            .unwrap();
1008
1009        publish_indicative_updates(
1010            &Some(Arc::new(pubsub)),
1011            vec![(
1012                "ETH-20260131-3000-C".to_string(),
1013                Some(AggregatedIndicativeQuote {
1014                    instrument: "ETH-20260131-3000-C".to_string(),
1015                    best_bid: dec!(120.5),
1016                    best_ask: dec!(125.0),
1017                    indicative_bid_size: dec!(10),
1018                    indicative_ask_size: dec!(12),
1019                    num_providers: 2,
1020                    updated_at: 1_737_331_200_000,
1021                }),
1022            )],
1023        );
1024
1025        let message = rx.try_recv().expect("indicative update should publish");
1026        match message.as_ref() {
1027            WsMessage::IndicativeMarketData(update) => {
1028                assert_eq!(update.instrument, "ETH-20260131-3000-C");
1029                assert_eq!(update.best_bid, Some(dec!(120.5)));
1030                assert_eq!(update.best_ask, Some(dec!(125.0)));
1031                assert_eq!(update.indicative_bid_size, Some(dec!(10)));
1032                assert_eq!(update.indicative_ask_size, Some(dec!(12)));
1033                assert_eq!(update.num_providers, 2);
1034                assert_eq!(update.timestamp, 1_737_331_200_000);
1035            }
1036            other => panic!("expected IndicativeMarketData, got {other:?}"),
1037        }
1038    }
1039
1040    #[test]
1041    fn test_publish_indicative_updates_sends_removal() {
1042        let pubsub = PubSubManager::new();
1043        let client_id = uuid::Uuid::new_v4();
1044        let (tx, mut rx) = mpsc::unbounded_channel::<Arc<WsMessage>>();
1045        pubsub.add_client(client_id, tx, false, None);
1046        pubsub
1047            .subscribe(
1048                client_id,
1049                "indicative_market_data".to_string(),
1050                None,
1051                None,
1052                None,
1053            )
1054            .unwrap();
1055
1056        publish_indicative_updates(
1057            &Some(Arc::new(pubsub)),
1058            vec![("ETH-20260131-3000-C".to_string(), None)],
1059        );
1060
1061        let message = rx.try_recv().expect("indicative removal should publish");
1062        match message.as_ref() {
1063            WsMessage::IndicativeMarketData(update) => {
1064                assert_eq!(update.instrument, "ETH-20260131-3000-C");
1065                assert_eq!(update.best_bid, None);
1066                assert_eq!(update.best_ask, None);
1067                assert_eq!(update.indicative_bid_size, None);
1068                assert_eq!(update.indicative_ask_size, None);
1069                assert_eq!(update.num_providers, 0);
1070                assert!(update.timestamp > 0);
1071            }
1072            other => panic!("expected IndicativeMarketData removal, got {other:?}"),
1073        }
1074    }
1075
1076    #[test]
1077    fn test_qp_outbound_message_serialize() {
1078        let msg = QpOutboundMessage::RfqRequest {
1079            rfq_id: "abc".to_string(),
1080            legs: vec![QpRfqLeg {
1081                instrument: "ETH-C-3000".to_string(),
1082                side: "buy".to_string(),
1083                size: "5.0".to_string(),
1084            }],
1085            taker_wallet: "0x123".to_string(),
1086            request_timestamp: 1000,
1087            response_deadline_ms: 5000,
1088            auto_accept_limit: None,
1089            auto_execute: false,
1090            taker_limit_price: None,
1091            reference_price: None,
1092            min_improvement_tick: None,
1093            auction_deadline_ms: None,
1094            requires_price_improvement: false,
1095        };
1096        let json = serde_json::to_string(&msg).unwrap();
1097        assert!(json.contains("rfq_request"));
1098        assert!(json.contains("ETH-C-3000"));
1099        assert!(!json.contains("auto_accept_limit"));
1100    }
1101
1102    #[test]
1103    fn test_qp_outbound_message_serializes_auto_accept_limit() {
1104        let msg = QpOutboundMessage::RfqRequest {
1105            rfq_id: "abc".to_string(),
1106            legs: vec![QpRfqLeg {
1107                instrument: "BTC-20260501-90000-C".to_string(),
1108                side: "buy".to_string(),
1109                size: "1".to_string(),
1110            }],
1111            taker_wallet: "0x123".to_string(),
1112            request_timestamp: 1000,
1113            response_deadline_ms: 5000,
1114            auto_accept_limit: Some("3999".to_string()),
1115            auto_execute: true,
1116            taker_limit_price: Some("3999".to_string()),
1117            reference_price: Some("3999".to_string()),
1118            min_improvement_tick: Some("0.0001".to_string()),
1119            auction_deadline_ms: Some(2000),
1120            requires_price_improvement: true,
1121        };
1122        let json = serde_json::to_string(&msg).unwrap();
1123        assert!(json.contains(r#""auto_accept_limit":"3999""#));
1124        assert!(json.contains(r#""auto_execute":true"#));
1125        assert!(json.contains(r#""requires_price_improvement":true"#));
1126    }
1127}