1use crate::rfq::indicative_quote_cache::{AggregatedIndicativeQuote, IndicativeQuoteEntry};
2use crate::rfq::qp_ws_state::{QpWsState, RfqWebsocketPublisher};
3use axum::{
4 extract::{
5 ws::{Message, WebSocket, WebSocketUpgrade},
6 State,
7 },
8 http::HeaderMap,
9 response::Response,
10};
11use futures::{SinkExt, StreamExt};
12use hypercall_auth::SignatureRecovery;
13use hypercall_types::ws_protocol::{
14 WsIndicativeMarketData, WsMessage, WsRfqQuoteEntry, WsRfqQuotes, WsRfqStatusUpdate,
15};
16use hypercall_types::WalletAddress;
17use hypercall_ws_protocol::{
18 QpClientMessage as QpInboundMessage, QpServerMessage as QpOutboundMessage,
19};
20use metrics::counter;
21use rust_decimal::Decimal;
22use std::sync::Arc;
23use tokio::sync::mpsc;
24use tracing::{info, warn};
25
/// Quote lifetime in milliseconds applied when an `RfqResponse` omits `valid_for_ms`.
const DEFAULT_QUOTE_VALID_FOR_MS: u64 = 25_000;
/// Name of the HTTP header read on the WebSocket upgrade request and compared
/// against the configured gateway resume token.
const GATEWAY_TOKEN_HEADER: &str = "x-hypercall-gateway-token";
31
32pub async fn qp_websocket_handler(
34 ws: WebSocketUpgrade,
35 headers: HeaderMap,
36 State(state): State<QpWsState>,
37) -> Response {
38 let gateway_resume_authorized = gateway_resume_authorized(&headers, &state);
39 ws.on_upgrade(move |socket| handle_qp_socket(socket, state, gateway_resume_authorized))
40}
41
42fn gateway_resume_authorized(headers: &HeaderMap, state: &QpWsState) -> bool {
43 let Some(expected) = state.gateway_resume_token.as_deref() else {
44 return false;
45 };
46 headers
47 .get(GATEWAY_TOKEN_HEADER)
48 .and_then(|value| value.to_str().ok())
49 .is_some_and(|actual| actual == expected)
50}
51
52async fn handle_qp_socket(socket: WebSocket, state: QpWsState, gateway_resume_authorized: bool) {
53 let (mut ws_sender, mut ws_receiver) = socket.split();
54
55 let auth_result =
57 match tokio::time::timeout(std::time::Duration::from_secs(10), ws_receiver.next()).await {
58 Ok(Some(Ok(Message::Text(text)))) => {
59 authenticate_qp(&text, &state, gateway_resume_authorized).await
60 }
61 Ok(Some(Ok(Message::Binary(bytes)))) => {
62 if let Ok(text) = String::from_utf8(bytes.to_vec()) {
63 authenticate_qp(&text, &state, gateway_resume_authorized).await
64 } else {
65 Err("Invalid binary frame".to_string())
66 }
67 }
68 _ => Err("Timeout or connection closed before auth".to_string()),
69 };
70
71 let qp_wallet = match auth_result {
72 Ok(wallet) => {
73 let msg = QpOutboundMessage::Authenticated {
74 wallet: wallet.as_hex(),
75 };
76 let json = serde_json::to_string(&msg).unwrap();
77 if ws_sender.send(Message::Text(json)).await.is_err() {
78 return;
79 }
80 wallet
81 }
82 Err(reason) => {
83 let msg = QpOutboundMessage::AuthFailed { reason };
84 let json = serde_json::to_string(&msg).unwrap();
85 let _ = ws_sender.send(Message::Text(json)).await;
86 return;
87 }
88 };
89
90 info!("QP authenticated: {}", qp_wallet.as_hex());
91
92 let (outbound_tx, mut outbound_rx) = mpsc::unbounded_channel::<QpOutboundMessage>();
94 let session_id = state
95 .session_manager
96 .register_session(qp_wallet, outbound_tx);
97
98 loop {
100 tokio::select! {
101 Some(msg) = outbound_rx.recv() => {
103 let json = serde_json::to_string(&msg).unwrap();
104 if ws_sender.send(Message::Text(json)).await.is_err() {
105 break;
106 }
107 }
108 Some(Ok(frame)) = ws_receiver.next() => {
110 match frame {
111 Message::Text(text) => {
112 handle_qp_message(&text, &qp_wallet, &state).await;
113 }
114 Message::Binary(bytes) => {
115 if let Ok(text) = String::from_utf8(bytes.to_vec()) {
116 handle_qp_message(&text, &qp_wallet, &state).await;
117 }
118 }
119 Message::Ping(_) => {
120 let _ = ws_sender.send(Message::Pong(vec![])).await;
121 }
122 Message::Close(_) => break,
123 _ => {}
124 }
125 }
126 else => break,
127 }
128 }
129
130 let fully_disconnected = state.session_manager.remove_session(&qp_wallet, session_id);
138 if fully_disconnected {
139 let updates = state.indicative_cache.evict_qp(&qp_wallet);
140 publish_indicative_updates(&state.pubsub, updates);
141 }
142 info!("QP disconnected: {}", qp_wallet.as_hex());
143}
144
145async fn authenticate_qp(
146 text: &str,
147 state: &QpWsState,
148 gateway_resume_authorized: bool,
149) -> Result<WalletAddress, String> {
150 let msg: QpInboundMessage =
151 serde_json::from_str(text).map_err(|e| format!("Invalid JSON: {}", e))?;
152
153 match msg {
154 QpInboundMessage::ConnectQuoteProvider {
155 wallet,
156 timestamp,
157 nonce,
158 signature,
159 } => {
160 let wallet_addr = WalletAddress::from_str(&wallet)
161 .map_err(|_| format!("Invalid wallet address: {}", wallet))?;
162
163 let ts = timestamp
164 .parse::<u64>()
165 .map_err(|_| "Invalid timestamp".to_string())?;
166
167 let recovered = SignatureRecovery::recover_connect_quote_provider_signer(
168 wallet_addr,
169 alloy::primitives::U256::from(ts),
170 nonce,
171 &signature,
172 state.signing_chain_id,
173 )
174 .map_err(|e| format!("Signature recovery failed: {}", e))?;
175
176 if recovered != wallet_addr.inner() {
177 return Err("Signature does not match wallet".to_string());
178 }
179
180 let (tx, rx) = tokio::sync::oneshot::channel();
183 let nonce_req = hypercall_runtime_api::NonceCheckRequest {
184 wallet: wallet_addr,
185 nonce,
186 applied_tx: tx,
187 };
188 state
189 .nonce_check_sender
190 .send(nonce_req)
191 .await
192 .map_err(|_| "Engine nonce channel closed".to_string())?;
193 match tokio::time::timeout(std::time::Duration::from_secs(5), rx).await {
194 Ok(Ok(Ok(()))) => {}
195 Ok(Ok(Err(e))) => return Err(format!("Handshake nonce rejected: {}", e)),
196 Ok(Err(_)) => return Err("Engine dropped nonce response channel".to_string()),
197 Err(_) => return Err("Timed out waiting for nonce check".to_string()),
198 }
199
200 if !state.qp_cache.is_active(&wallet_addr).await {
201 return Err("Wallet is not a registered active quote provider".to_string());
202 }
203
204 Ok(wallet_addr)
205 }
206 QpInboundMessage::GatewayResumeQuoteProvider {
207 wallet,
208 timestamp,
209 nonce,
210 signature,
211 } => {
212 if !gateway_resume_authorized {
213 return Err(
214 "gateway_resume_quote_provider is only accepted from the internal gateway"
215 .to_string(),
216 );
217 }
218 let wallet_addr =
219 verify_qp_connect_signature(&wallet, ×tamp, nonce, &signature, state)?;
220
221 if !state.qp_cache.is_active(&wallet_addr).await {
222 return Err("Wallet is not a registered active quote provider".to_string());
223 }
224
225 Ok(wallet_addr)
226 }
227 _ => Err(
228 "First message must be ConnectQuoteProvider or GatewayResumeQuoteProvider".to_string(),
229 ),
230 }
231}
232
233fn verify_qp_connect_signature(
234 wallet: &str,
235 timestamp: &str,
236 nonce: u64,
237 signature: &str,
238 state: &QpWsState,
239) -> Result<WalletAddress, String> {
240 let wallet_addr = WalletAddress::from_str(wallet)
241 .map_err(|_| format!("Invalid wallet address: {}", wallet))?;
242
243 let ts = timestamp
244 .parse::<u64>()
245 .map_err(|_| "Invalid timestamp".to_string())?;
246
247 let recovered = SignatureRecovery::recover_connect_quote_provider_signer(
248 wallet_addr,
249 alloy::primitives::U256::from(ts),
250 nonce,
251 signature,
252 state.signing_chain_id,
253 )
254 .map_err(|e| format!("Signature recovery failed: {}", e))?;
255
256 if recovered != wallet_addr.inner() {
257 return Err("Signature does not match wallet".to_string());
258 }
259
260 Ok(wallet_addr)
261}
262
263async fn handle_qp_message(text: &str, qp_wallet: &WalletAddress, state: &QpWsState) {
264 let msg: QpInboundMessage = match serde_json::from_str(text) {
265 Ok(m) => m,
266 Err(e) => {
267 warn!("Invalid QP message from {}: {}", qp_wallet.as_hex(), e);
268 return;
269 }
270 };
271
272 match msg {
273 QpInboundMessage::ConnectQuoteProvider { .. }
274 | QpInboundMessage::GatewayResumeQuoteProvider { .. } => {
275 warn!(
276 "Ignoring QP auth frame after authentication from {}",
277 qp_wallet.as_hex()
278 );
279 }
280 QpInboundMessage::IndicativeQuoteUpdate { quotes } => {
281 let now = std::time::SystemTime::now()
282 .duration_since(std::time::UNIX_EPOCH)
283 .unwrap()
284 .as_millis() as u64;
285 let quote_count = quotes.len();
286
287 let entries: Vec<IndicativeQuoteEntry> = quotes
288 .into_iter()
289 .filter_map(|q| {
290 if !state.instruments_cache.allows_rfq(&q.instrument) {
291 return None;
292 }
293 Some(IndicativeQuoteEntry {
294 instrument: q.instrument,
295 bid_price: q.bid_price.parse::<Decimal>().ok()?,
296 ask_price: q.ask_price.parse::<Decimal>().ok()?,
297 max_bid_size: q.max_bid_size.parse::<Decimal>().ok()?,
298 max_ask_size: q.max_ask_size.parse::<Decimal>().ok()?,
299 updated_at: now,
300 })
301 })
302 .collect();
303
304 if quote_count == 0 || !entries.is_empty() {
305 let updates = state.indicative_cache.update(*qp_wallet, entries);
306 publish_indicative_updates(&state.pubsub, updates);
307 }
308 }
309 QpInboundMessage::RfqResponse {
310 rfq_id,
311 action,
312 legs,
313 net_premium,
314 valid_for_ms,
315 signature,
316 nonce,
317 } => {
318 if action == "decline" {
319 counter!("ht_qp_quote_rejection_total", "reason" => "decline").increment(1);
320 info!("QP {} declined RFQ {}", qp_wallet.as_hex(), rfq_id);
321 if let (Some(ref rfq_manager), Ok(rfq_uuid)) =
322 (&state.rfq_manager, uuid::Uuid::parse_str(&rfq_id))
323 {
324 rfq_manager.record_qp_decline(rfq_uuid, *qp_wallet);
325 }
326 return;
327 }
328
329 if let Some(ref rfq_manager) = state.rfq_manager {
330 let rfq_uuid = match uuid::Uuid::parse_str(&rfq_id) {
331 Ok(u) => u,
332 Err(e) => {
333 warn!("Invalid rfq_id from QP {}: {}", qp_wallet.as_hex(), e);
334 return;
335 }
336 };
337
338 if !state.qp_cache.is_active(qp_wallet).await {
340 warn!(
341 "QP {} is no longer active, rejecting RFQ response for {}",
342 qp_wallet.as_hex(),
343 rfq_id
344 );
345 rfq_manager.record_qp_invalid_response(rfq_uuid, *qp_wallet);
346 return;
347 }
348
349 let mut quote_legs: Vec<crate::rfq::rfq_manager::QuoteLeg> =
359 Vec::with_capacity(legs.as_ref().map(|l| l.len()).unwrap_or(0));
360 let mut quote_legs_valid = true;
361 for l in legs.unwrap_or_default() {
362 let side = match l.side.to_lowercase().as_str() {
363 "buy" => hypercall_types::Side::Buy,
364 "sell" => hypercall_types::Side::Sell,
365 other => {
366 warn!(
367 "QP {} sent invalid leg side {:?} for RFQ {}, rejecting",
368 qp_wallet.as_hex(),
369 other,
370 rfq_id
371 );
372 quote_legs_valid = false;
373 break;
374 }
375 };
376 let price = match l.price.parse::<Decimal>() {
377 Ok(p) if p > Decimal::ZERO => p,
378 _ => {
379 warn!(
380 "QP {} sent non-positive/invalid leg price {:?} for RFQ {}, rejecting",
381 qp_wallet.as_hex(),
382 l.price,
383 rfq_id
384 );
385 quote_legs_valid = false;
386 break;
387 }
388 };
389 let size = match l.size.parse::<Decimal>() {
390 Ok(s) if s > Decimal::ZERO => s,
391 _ => {
392 warn!(
393 "QP {} sent non-positive/invalid leg size {:?} for RFQ {}, rejecting",
394 qp_wallet.as_hex(),
395 l.size,
396 rfq_id
397 );
398 quote_legs_valid = false;
399 break;
400 }
401 };
402 quote_legs.push(crate::rfq::rfq_manager::QuoteLeg {
403 instrument: l.instrument,
404 side,
405 price,
406 size,
407 });
408 }
409 if !quote_legs_valid {
410 rfq_manager.record_qp_invalid_response(rfq_uuid, *qp_wallet);
411 return;
412 }
413
414 let rfq_record = match rfq_manager.get_rfq(&rfq_uuid) {
427 Some(r) => r,
428 None => {
429 warn!(
430 "QP {} sent response for unknown RFQ {}, rejecting",
431 qp_wallet.as_hex(),
432 rfq_id
433 );
434 rfq_manager.record_qp_invalid_response(rfq_uuid, *qp_wallet);
435 return;
436 }
437 };
438 if quote_legs.len() != rfq_record.legs.len() {
439 warn!(
440 "QP {} sent {} legs for RFQ {} (expected {}), rejecting",
441 qp_wallet.as_hex(),
442 quote_legs.len(),
443 rfq_id,
444 rfq_record.legs.len()
445 );
446 rfq_manager.record_qp_invalid_response(rfq_uuid, *qp_wallet);
447 return;
448 }
449 let mut legs_match = true;
450 for (qp_leg, original) in quote_legs.iter().zip(rfq_record.legs.iter()) {
451 if qp_leg.instrument != original.instrument
452 || qp_leg.side != original.side
453 || qp_leg.size != original.size
454 {
455 warn!(
456 "QP {} sent leg mismatched with original RFQ {} (got instrument={}, side={:?}, size={}; expected instrument={}, side={:?}, size={}), rejecting",
457 qp_wallet.as_hex(),
458 rfq_id,
459 qp_leg.instrument, qp_leg.side, qp_leg.size,
460 original.instrument, original.side, original.size,
461 );
462 legs_match = false;
463 break;
464 }
465 }
466 if !legs_match {
467 rfq_manager.record_qp_invalid_response(rfq_uuid, *qp_wallet);
468 return;
469 }
470
471 let quote = crate::rfq::rfq_manager::QpQuote {
472 quote_id: uuid::Uuid::now_v7(),
473 qp_wallet: *qp_wallet,
474 legs: quote_legs,
475 net_premium: match net_premium.and_then(|s| s.parse().ok()) {
476 Some(p) => p,
477 None => {
478 warn!(
479 "QP {} sent RFQ response with missing/invalid net_premium for RFQ {}, rejecting",
480 qp_wallet.as_hex(), rfq_id
481 );
482 rfq_manager.record_qp_invalid_response(rfq_uuid, *qp_wallet);
483 return;
484 }
485 },
486 valid_for_ms: valid_for_ms.unwrap_or(DEFAULT_QUOTE_VALID_FOR_MS),
487 qp_signature: match signature {
488 Some(s) if !s.is_empty() => s,
489 _ => {
490 warn!(
491 "QP {} sent RFQ response without signature for RFQ {}, rejecting",
492 qp_wallet.as_hex(),
493 rfq_id
494 );
495 rfq_manager.record_qp_invalid_response(rfq_uuid, *qp_wallet);
496 return;
497 }
498 },
499 qp_nonce: nonce.unwrap_or(0),
500 received_at: std::time::SystemTime::now()
501 .duration_since(std::time::UNIX_EPOCH)
502 .unwrap()
503 .as_millis() as u64,
504 };
505
506 if let Some(rfq_record) = rfq_manager.get_rfq(&rfq_uuid) {
508 let mut rfq_id_bytes = [0u8; 32];
509 rfq_id_bytes[16..].copy_from_slice(rfq_uuid.as_bytes());
510
511 use rust_decimal::prelude::ToPrimitive;
517 let premium_scaled =
518 quote.net_premium * rust_decimal::Decimal::new(1_000_000, 0);
519 let premium_micro = match premium_scaled.to_i64() {
520 Some(v) => v,
521 None => {
522 warn!(
523 "QP {} net_premium {} not representable as i64 for RFQ {}, rejecting",
524 qp_wallet.as_hex(),
525 quote.net_premium,
526 rfq_id
527 );
528 rfq_manager.record_qp_invalid_response(rfq_uuid, *qp_wallet);
529 return;
530 }
531 };
532 let net_premium_i256 = match alloy::primitives::I256::try_from(premium_micro) {
533 Ok(v) => v,
534 Err(e) => {
535 warn!(
536 "QP {} net_premium {} failed I256 conversion for RFQ {}: {}, rejecting",
537 qp_wallet.as_hex(),
538 quote.net_premium,
539 rfq_id,
540 e
541 );
542 rfq_manager.record_qp_invalid_response(rfq_uuid, *qp_wallet);
543 return;
544 }
545 };
546 let valid_for_u256 = alloy::primitives::U256::from(quote.valid_for_ms);
547
548 match hypercall_auth::SignatureRecovery::recover_submit_rfq_response_signer(
549 rfq_id_bytes,
550 rfq_record.legs_hash,
551 net_premium_i256,
552 valid_for_u256,
553 *qp_wallet,
554 quote.qp_nonce,
555 "e.qp_signature,
556 state.signing_chain_id,
557 ) {
558 Ok(recovered) => {
559 if recovered != qp_wallet.inner() {
560 warn!(
561 "QP {} signature mismatch for RFQ {}, rejecting",
562 qp_wallet.as_hex(),
563 rfq_id
564 );
565 rfq_manager.record_qp_invalid_response(rfq_uuid, *qp_wallet);
566 return;
567 }
568 }
569 Err(e) => {
570 warn!(
571 "QP {} signature recovery failed for RFQ {}: {}, rejecting",
572 qp_wallet.as_hex(),
573 rfq_id,
574 e
575 );
576 rfq_manager.record_qp_invalid_response(rfq_uuid, *qp_wallet);
577 return;
578 }
579 }
580 }
581
582 match rfq_manager.handle_qp_response(rfq_uuid, quote) {
583 Ok(result) => {
584 use crate::rfq::rfq_manager::HandleQpResponseResult;
585 match result {
586 HandleQpResponseResult::QuoteStored(record) => {
587 info!("QP {} quote stored for RFQ {}", qp_wallet.as_hex(), rfq_id);
588 if let Some(ref pubsub) = state.pubsub {
589 let ws_quotes = record
590 .quotes
591 .iter()
592 .map(|q| WsRfqQuoteEntry {
593 quote_id: q.quote_id.to_string(),
594 net_premium: q.net_premium,
595 expires_at: q.received_at + q.valid_for_ms,
596 })
597 .collect();
598 pubsub.publish_rfq_quotes(WsRfqQuotes {
599 rfq_id: record.rfq_id.to_string(),
600 quotes: ws_quotes,
601 status: record.status.as_str().to_string(),
602 taker_wallet: record.taker_wallet,
603 });
604 }
605 }
606 HandleQpResponseResult::AutoExecuted {
607 record,
608 accepted_quote_id,
609 } => {
610 info!(
611 "QP {} quote auto-executing for RFQ {} (quote={})",
612 qp_wallet.as_hex(),
613 rfq_id,
614 accepted_quote_id,
615 );
616 if let Some(ref pubsub) = state.pubsub {
618 let ws_quotes = record
619 .quotes
620 .iter()
621 .map(|q| WsRfqQuoteEntry {
622 quote_id: q.quote_id.to_string(),
623 net_premium: q.net_premium,
624 expires_at: q.received_at + q.valid_for_ms,
625 })
626 .collect();
627 pubsub.publish_rfq_quotes(WsRfqQuotes {
628 rfq_id: record.rfq_id.to_string(),
629 quotes: ws_quotes,
630 status: record.status.as_str().to_string(),
631 taker_wallet: record.taker_wallet,
632 });
633 }
634
635 let rfq_manager = Arc::clone(rfq_manager);
640 let pubsub = state.pubsub.clone();
641 let taker_wallet = record.taker_wallet;
642 let taker_sig = record.taker_signature.clone();
643 let rfq_id_str = record.rfq_id.to_string();
644 tokio::spawn(async move {
645 match rfq_manager
646 .auto_accept_quote(rfq_uuid, accepted_quote_id, taker_sig)
647 .await
648 {
649 Ok(exec_result) => {
650 let (status, fill_id) = match &exec_result {
651 hypercall_runtime_api::RfqExecuteResult::Success { fill_id } => {
652 (hypercall_types::RfqStatus::Executed, Some(fill_id.clone()))
653 }
654 hypercall_runtime_api::RfqExecuteResult::Failed { .. } => {
655 (hypercall_types::RfqStatus::Failed, None)
656 }
657 };
658 if let Some(ref pubsub) = pubsub {
659 pubsub.publish_to_channel(
660 "rfq",
661 WsMessage::RfqAcceptResult {
662 rfq_id: rfq_id_str.clone(),
663 quote_id: accepted_quote_id.to_string(),
664 status: status.as_str().to_string(),
665 fill_id,
666 taker_wallet: Some(taker_wallet),
667 },
668 );
669 pubsub.publish_to_channel(
670 "rfq",
671 WsMessage::RfqStatusUpdate(WsRfqStatusUpdate {
672 rfq_id: rfq_id_str,
673 status: status.as_str().to_string(),
674 taker_wallet,
675 }),
676 );
677 }
678 info!(
679 "Auto-execute completed for RFQ: status={:?}",
680 status
681 );
682 }
683 Err(e) => {
684 warn!(
685 "Auto-execute failed for RFQ {}: {}",
686 rfq_id_str, e
687 );
688 counter!(
689 "ht_rfq_auto_execute_total",
690 "result" => "engine_error"
691 )
692 .increment(1);
693 if let Some(ref pubsub) = pubsub {
696 pubsub.publish_to_channel(
697 "rfq",
698 WsMessage::RfqStatusUpdate(WsRfqStatusUpdate {
699 rfq_id: rfq_id_str.clone(),
700 status: "failed".to_string(),
701 taker_wallet,
702 }),
703 );
704 }
705 }
706 }
707 });
708 }
709 HandleQpResponseResult::AutoExecuteSkipped {
710 record,
711 quote_premium,
712 limit,
713 } => {
714 info!(
715 "QP {} quote outside auto-execute limit for RFQ {} (premium={}, limit={})",
716 qp_wallet.as_hex(),
717 rfq_id,
718 quote_premium,
719 limit,
720 );
721 if let Some(ref pubsub) = state.pubsub {
722 let ws_quotes = record
723 .quotes
724 .iter()
725 .map(|q| WsRfqQuoteEntry {
726 quote_id: q.quote_id.to_string(),
727 net_premium: q.net_premium,
728 expires_at: q.received_at + q.valid_for_ms,
729 })
730 .collect();
731 pubsub.publish_rfq_quotes(WsRfqQuotes {
732 rfq_id: record.rfq_id.to_string(),
733 quotes: ws_quotes,
734 status: "outside_limit".to_string(),
735 taker_wallet: record.taker_wallet,
736 });
737 }
738 }
739 HandleQpResponseResult::RpiQuoteStored(record) => {
740 info!(
741 "QP {} RPI quote stored for RFQ {}",
742 qp_wallet.as_hex(),
743 rfq_id
744 );
745 if let Some(ref pubsub) = state.pubsub {
746 let ws_quotes = record
747 .quotes
748 .iter()
749 .map(|q| WsRfqQuoteEntry {
750 quote_id: q.quote_id.to_string(),
751 net_premium: q.net_premium,
752 expires_at: q.received_at + q.valid_for_ms,
753 })
754 .collect();
755 pubsub.publish_rfq_quotes(WsRfqQuotes {
756 rfq_id: record.rfq_id.to_string(),
757 quotes: ws_quotes,
758 status: record.status.as_str().to_string(),
759 taker_wallet: record.taker_wallet,
760 });
761 }
762 }
763 HandleQpResponseResult::RpiQuoteRejected { record, reason } => {
764 info!(
765 "QP {} RPI quote rejected for RFQ {}: {}",
766 qp_wallet.as_hex(),
767 rfq_id,
768 reason
769 );
770 if let Some(ref pubsub) = state.pubsub {
771 pubsub.publish_to_channel(
772 "rfq",
773 WsMessage::RfqStatusUpdate(WsRfqStatusUpdate {
774 rfq_id: record.rfq_id.to_string(),
775 status: record.status.as_str().to_string(),
776 taker_wallet: record.taker_wallet,
777 }),
778 );
779 }
780 }
781 }
782 }
783 Err(e) => {
784 if e.contains("invalid state for quotes") {
790 if let Some(record) = rfq_manager.get_rfq(&rfq_uuid) {
791 if record.status == hypercall_types::RfqStatus::Accepted
792 || record.status == hypercall_types::RfqStatus::Executed
793 {
794 let winning_quote_id = record
795 .accepted_quote_id
796 .map(|id| id.to_string())
797 .unwrap_or_default();
798 state.session_manager.send_to_qp(
799 qp_wallet,
800 QpOutboundMessage::RfqAlreadyFilled {
801 rfq_id: rfq_id.clone(),
802 filled_by_quote_id: winning_quote_id,
803 },
804 );
805 }
806 }
807 }
808 warn!(
809 "QP {} quote rejected for RFQ {}: {}",
810 qp_wallet.as_hex(),
811 rfq_id,
812 e
813 );
814 }
815 }
816 } else {
817 warn!(
818 "Received RFQ response from QP {} but RfqManager not wired",
819 qp_wallet.as_hex()
820 );
821 }
822 }
823 }
824}
825
826fn publish_indicative_updates(
827 pubsub: &Option<Arc<dyn RfqWebsocketPublisher>>,
828 updates: Vec<(String, Option<AggregatedIndicativeQuote>)>,
829) {
830 let Some(pubsub) = pubsub else {
831 return;
832 };
833
834 for (instrument, aggregate) in updates {
835 let message = match aggregate {
836 Some(aggregate) => WsIndicativeMarketData {
837 instrument: aggregate.instrument,
838 best_bid: Some(aggregate.best_bid),
839 best_ask: Some(aggregate.best_ask),
840 bid_iv: None,
841 ask_iv: None,
842 indicative_bid_size: Some(aggregate.indicative_bid_size),
843 indicative_ask_size: Some(aggregate.indicative_ask_size),
844 num_providers: aggregate.num_providers,
845 timestamp: aggregate.updated_at,
846 },
847 None => WsIndicativeMarketData {
848 instrument,
849 best_bid: None,
850 best_ask: None,
851 bid_iv: None,
852 ask_iv: None,
853 indicative_bid_size: None,
854 indicative_ask_size: None,
855 num_providers: 0,
856 timestamp: hypercall_types::utils::get_timestamp_millis(),
857 },
858 };
859
860 pubsub.publish_indicative_market_data(message);
861 }
862}
863
864use std::str::FromStr;
865
#[cfg(test)]
mod tests {
    use super::*;
    use crate::rfq::qp_sessions::QpSessionManager;
    use crate::websocket::{uuid, PubSubManager, WsMessage};
    use hypercall_ws_protocol::QpRfqLeg;
    use rust_decimal_macros::dec;
    use std::sync::Arc;

    /// Instrument name shared by the indicative-publish tests.
    const INDICATIVE_INSTRUMENT: &str = "ETH-20260131-3000-C";

    /// Builds a wallet whose 20-byte address is all zeroes except the last byte.
    fn test_wallet(id: u8) -> WalletAddress {
        let mut raw = [0u8; 20];
        raw[19] = id;
        let address = alloy::primitives::Address::from(raw);
        WalletAddress::from(address)
    }

    /// Registering and then removing the only session empties the wallet list.
    #[test]
    fn test_session_manager_register_remove() {
        let sessions = QpSessionManager::new();
        let qp = test_wallet(1);
        let (sender, _receiver) = mpsc::unbounded_channel();

        let id = sessions.register_session(qp, sender);
        assert_eq!(sessions.get_connected_wallets().len(), 1);

        sessions.remove_session(&qp, id);
        assert_eq!(sessions.get_connected_wallets().len(), 0);
    }

    /// A wallet with two sessions stays connected until the second one is removed.
    #[test]
    fn test_session_manager_multiple_sessions_per_wallet() {
        let sessions = QpSessionManager::new();
        let qp = test_wallet(1);
        let (first_tx, _first_rx) = mpsc::unbounded_channel();
        let (second_tx, _second_rx) = mpsc::unbounded_channel();

        let first = sessions.register_session(qp, first_tx);
        let second = sessions.register_session(qp, second_tx);

        assert_eq!(sessions.get_connected_wallets().len(), 1);

        let last_after_first = sessions.remove_session(&qp, first);
        assert!(
            !last_after_first,
            "remove_session must return false while other sessions remain"
        );
        assert_eq!(sessions.get_connected_wallets().len(), 1);

        let last_after_second = sessions.remove_session(&qp, second);
        assert!(
            last_after_second,
            "remove_session must return true when the last session drops"
        );
        assert_eq!(sessions.get_connected_wallets().len(), 0);
    }

    /// A message sent to a wallet reaches every session registered for it.
    #[test]
    fn test_send_to_qp_fans_out() {
        let sessions = QpSessionManager::new();
        let qp = test_wallet(1);
        let (first_tx, mut first_rx) = mpsc::unbounded_channel();
        let (second_tx, mut second_rx) = mpsc::unbounded_channel();

        sessions.register_session(qp, first_tx);
        sessions.register_session(qp, second_tx);

        let greeting = QpOutboundMessage::Authenticated {
            wallet: "0x01".to_string(),
        };
        sessions.send_to_qp(&qp, greeting);

        assert!(first_rx.try_recv().is_ok());
        assert!(second_rx.try_recv().is_ok());
    }

    /// Only the wallets listed as eligible receive the RFQ request.
    #[test]
    fn test_send_rfq_to_eligible() {
        let sessions = QpSessionManager::new();
        let alpha = test_wallet(1);
        let beta = test_wallet(2);
        let gamma = test_wallet(3);
        let (alpha_tx, mut alpha_rx) = mpsc::unbounded_channel();
        let (beta_tx, mut beta_rx) = mpsc::unbounded_channel();
        let (gamma_tx, mut gamma_rx) = mpsc::unbounded_channel();

        sessions.register_session(alpha, alpha_tx);
        sessions.register_session(beta, beta_tx);
        sessions.register_session(gamma, gamma_tx);

        let request = QpOutboundMessage::RfqRequest {
            rfq_id: "test".to_string(),
            legs: vec![],
            taker_wallet: "0x00".to_string(),
            request_timestamp: 0,
            response_deadline_ms: 5000,
            auto_accept_limit: None,
            auto_execute: false,
            taker_limit_price: None,
            reference_price: None,
            min_improvement_tick: None,
            auction_deadline_ms: None,
            requires_price_improvement: false,
        };
        sessions.send_rfq_to_eligible(&[alpha, gamma], request);

        assert!(alpha_rx.try_recv().is_ok());
        // `beta` was not in the eligible list, so nothing was queued for it.
        assert!(beta_rx.try_recv().is_err());
        assert!(gamma_rx.try_recv().is_ok());
    }

    /// The `indicative_quote_update` tag deserializes into the matching variant.
    #[test]
    fn test_qp_inbound_message_deserialize() {
        let raw = r#"{"type":"indicative_quote_update","quotes":[{"instrument":"ETH-C-3000","bid_price":"120.5","ask_price":"125.0","max_bid_size":"10","max_ask_size":"10"}]}"#;
        let parsed: QpInboundMessage = serde_json::from_str(raw).unwrap();
        let is_indicative_update =
            matches!(parsed, QpInboundMessage::IndicativeQuoteUpdate { .. });
        assert!(is_indicative_update);
    }

    /// An aggregate is forwarded field-for-field to subscribed clients.
    #[test]
    fn test_publish_indicative_updates_sends_market_data() {
        let pubsub = PubSubManager::new();
        let subscriber = uuid::Uuid::new_v4();
        let (sink, mut inbox) = mpsc::unbounded_channel::<Arc<WsMessage>>();
        pubsub.add_client(subscriber, sink, false, None);
        pubsub
            .subscribe(
                subscriber,
                "indicative_market_data".to_string(),
                None,
                None,
                None,
            )
            .unwrap();

        let aggregate = AggregatedIndicativeQuote {
            instrument: INDICATIVE_INSTRUMENT.to_string(),
            best_bid: dec!(120.5),
            best_ask: dec!(125.0),
            indicative_bid_size: dec!(10),
            indicative_ask_size: dec!(12),
            num_providers: 2,
            updated_at: 1_737_331_200_000,
        };
        publish_indicative_updates(
            &Some(Arc::new(pubsub)),
            vec![(INDICATIVE_INSTRUMENT.to_string(), Some(aggregate))],
        );

        let received = inbox.try_recv().expect("indicative update should publish");
        match received.as_ref() {
            WsMessage::IndicativeMarketData(update) => {
                assert_eq!(update.instrument, INDICATIVE_INSTRUMENT);
                assert_eq!(update.best_bid, Some(dec!(120.5)));
                assert_eq!(update.best_ask, Some(dec!(125.0)));
                assert_eq!(update.indicative_bid_size, Some(dec!(10)));
                assert_eq!(update.indicative_ask_size, Some(dec!(12)));
                assert_eq!(update.num_providers, 2);
                assert_eq!(update.timestamp, 1_737_331_200_000);
            }
            other => panic!("expected IndicativeMarketData, got {other:?}"),
        }
    }

    /// A `None` aggregate is published as an empty book with zero providers.
    #[test]
    fn test_publish_indicative_updates_sends_removal() {
        let pubsub = PubSubManager::new();
        let subscriber = uuid::Uuid::new_v4();
        let (sink, mut inbox) = mpsc::unbounded_channel::<Arc<WsMessage>>();
        pubsub.add_client(subscriber, sink, false, None);
        pubsub
            .subscribe(
                subscriber,
                "indicative_market_data".to_string(),
                None,
                None,
                None,
            )
            .unwrap();

        publish_indicative_updates(
            &Some(Arc::new(pubsub)),
            vec![(INDICATIVE_INSTRUMENT.to_string(), None)],
        );

        let received = inbox.try_recv().expect("indicative removal should publish");
        match received.as_ref() {
            WsMessage::IndicativeMarketData(update) => {
                assert_eq!(update.instrument, INDICATIVE_INSTRUMENT);
                assert_eq!(update.best_bid, None);
                assert_eq!(update.best_ask, None);
                assert_eq!(update.indicative_bid_size, None);
                assert_eq!(update.indicative_ask_size, None);
                assert_eq!(update.num_providers, 0);
                assert!(update.timestamp > 0);
            }
            other => panic!("expected IndicativeMarketData removal, got {other:?}"),
        }
    }

    /// An RFQ request serializes with its tag and legs, and the JSON does not
    /// mention `auto_accept_limit` when that field is `None`.
    #[test]
    fn test_qp_outbound_message_serialize() {
        let leg = QpRfqLeg {
            instrument: "ETH-C-3000".to_string(),
            side: "buy".to_string(),
            size: "5.0".to_string(),
        };
        let request = QpOutboundMessage::RfqRequest {
            rfq_id: "abc".to_string(),
            legs: vec![leg],
            taker_wallet: "0x123".to_string(),
            request_timestamp: 1000,
            response_deadline_ms: 5000,
            auto_accept_limit: None,
            auto_execute: false,
            taker_limit_price: None,
            reference_price: None,
            min_improvement_tick: None,
            auction_deadline_ms: None,
            requires_price_improvement: false,
        };
        let encoded = serde_json::to_string(&request).unwrap();
        assert!(encoded.contains("rfq_request"));
        assert!(encoded.contains("ETH-C-3000"));
        assert!(!encoded.contains("auto_accept_limit"));
    }

    /// Populated optional fields appear in the serialized RFQ request.
    #[test]
    fn test_qp_outbound_message_serializes_auto_accept_limit() {
        let leg = QpRfqLeg {
            instrument: "BTC-20260501-90000-C".to_string(),
            side: "buy".to_string(),
            size: "1".to_string(),
        };
        let request = QpOutboundMessage::RfqRequest {
            rfq_id: "abc".to_string(),
            legs: vec![leg],
            taker_wallet: "0x123".to_string(),
            request_timestamp: 1000,
            response_deadline_ms: 5000,
            auto_accept_limit: Some("3999".to_string()),
            auto_execute: true,
            taker_limit_price: Some("3999".to_string()),
            reference_price: Some("3999".to_string()),
            min_improvement_tick: Some("0.0001".to_string()),
            auction_deadline_ms: Some(2000),
            requires_price_improvement: true,
        };
        let encoded = serde_json::to_string(&request).unwrap();
        assert!(encoded.contains(r#""auto_accept_limit":"3999""#));
        assert!(encoded.contains(r#""auto_execute":true"#));
        assert!(encoded.contains(r#""requires_price_improvement":true"#));
    }
}