1use super::*;
2use crate::rsm::apply::MmpFillUpdate;
3use hypercall_runtime_api::{RfqExecuteCommand, RfqExecuteRequest, RfqExecuteResult};
4use hypercall_types::RFQ_SELF_TRADE_REJECTION_REASON;
5use hypercall_types::{to_contract_units_decimal, Fill, FillSource, Side};
6use hypercall_types::{EngineMessage, RfqFillLeg, RfqFillMessage};
7use metrics::histogram;
8use std::time::Instant;
9
10pub(crate) struct RfqExecutionPlan {
20 pub fill_id: String,
21 pub events: Vec<EngineMessage>,
22 pub mmp_updates: Vec<MmpFillUpdate>,
23}
24
25impl UnifiedEngine {
26 pub(crate) async fn handle_rfq_execute(&mut self, request: RfqExecuteRequest) {
40 let handle_start = Instant::now();
41 let cmd = &request.command;
42 let rfq_span = tracing::info_span!(
43 "engine.rfq_execute",
44 rfq_id = %cmd.rfq_id,
45 taker = %cmd.taker_wallet,
46 qp = %cmd.qp_wallet,
47 request_id = %cmd.request_id,
48 );
49 let _guard = rfq_span.enter();
50
51 if !cmd.request_id.is_empty() {
54 if let Some(cached_fill_id) = self.lookup_cached_rfq_response(&cmd.request_id).await {
55 debug!(
56 "RFQ idempotency hit for rfq_id={}, request_id={}, cached fill_id={}",
57 cmd.rfq_id, cmd.request_id, cached_fill_id
58 );
59 let _ = request.response_tx.send(RfqExecuteResult::Success {
60 fill_id: cached_fill_id,
61 });
62 return;
63 }
64 }
65
66 let timestamp = cmd.timestamp_ms;
67 if timestamp == 0 || cmd.fill_id.is_empty() {
68 let _ = request.response_tx.send(RfqExecuteResult::Failed {
69 reason: "RFQ command missing deterministic timestamp_ms or fill_id".to_string(),
70 });
71 return;
72 }
73 let env = crate::rsm::apply::CommandEnvelope::new(
74 timestamp,
75 crate::rsm::apply::EngineCommand::RfqExecute(cmd.clone()),
76 );
77 let apply_start = Instant::now();
78 let apply_output = match self.apply(env) {
79 Ok(output) => output,
80 Err(crate::rsm::unified_engine::apply_interface::EngineError::Rejected(reason)) => {
81 warn!(
82 rfq_id = %cmd.rfq_id,
83 reason = %reason,
84 "RFQ rejected by engine (nonce or admission)"
85 );
86 let _ = request
87 .response_tx
88 .send(RfqExecuteResult::Failed { reason });
89 return;
90 }
91 Err(e) => {
92 panic!(
93 "JOURNAL_FATAL: apply() failed for RFQ rfq_id={}: {}",
94 cmd.rfq_id, e
95 )
96 }
97 };
98 histogram!("ht_rfq_apply_seconds").record(apply_start.elapsed().as_secs_f64());
99
100 let rfq_plan = apply_output.rfq_plan.unwrap_or_else(|| {
102 panic!(
103 "JOURNAL_FATAL: apply() returned no rfq_plan for RFQ rfq_id={}",
104 cmd.rfq_id
105 )
106 });
107
108 match rfq_plan {
109 Err(failure) => {
110 warn!(
111 "RFQ execution rejected at planning: rfq_id={}, reason={}",
112 cmd.rfq_id,
113 match &failure {
114 RfqExecuteResult::Failed { reason } => reason.as_str(),
115 _ => "unknown",
116 }
117 );
118 let _ = request.response_tx.send(failure);
119 }
120 Ok(plan) => {
121 info!(
122 "RFQ planned: rfq_id={}, fill_id={}, legs={}, premium={}",
123 cmd.rfq_id,
124 plan.fill_id,
125 cmd.legs.len(),
126 cmd.net_premium,
127 );
128
129 self.process_rfq_journaled(
130 request,
131 plan.fill_id,
132 apply_output.events,
133 apply_output.balance_updates,
134 plan.mmp_updates,
135 )
136 .await;
137
138 histogram!("ht_rfq_handle_total_seconds")
139 .record(handle_start.elapsed().as_secs_f64());
140 }
141 }
142 }
143
144 async fn lookup_cached_rfq_response(&self, request_id: &str) -> Option<String> {
158 let req_uuid = uuid::Uuid::parse_str(request_id).ok()?;
159 if !self.known_request_ids.contains(&req_uuid) {
160 return None;
161 }
162 let writer = self.journal_writer.as_ref()?.clone();
163 let lookup_uuid = req_uuid;
164 let existing = tokio::task::spawn_blocking(move || writer.get_by_request_id(&lookup_uuid))
165 .await
166 .ok()?
167 .ok()??;
168 let bytes = existing.response_data.as_ref()?;
170 if bytes.len() < 2 {
171 return None;
172 }
173 let fill_id: String = rmp_serde::from_slice(&bytes[1..]).ok()?;
174 Some(fill_id)
175 }
176
177 pub(super) fn encode_rfq_response_data(fill_id: &str) -> Vec<u8> {
180 let mut buf = Vec::with_capacity(64);
181 buf.push(1u8);
182 rmp_serde::encode::write_named(&mut buf, &fill_id).expect("encode fill_id");
183 buf
184 }
185
186 pub(crate) fn plan_rfq_execution(
191 &mut self,
192 cmd: &RfqExecuteCommand,
193 ) -> Result<RfqExecutionPlan, RfqExecuteResult> {
194 if cmd.timestamp_ms == 0 {
195 return Err(RfqExecuteResult::Failed {
196 reason: "RFQ command missing timestamp_ms".to_string(),
197 });
198 }
199 if cmd.fill_id.is_empty() {
200 return Err(RfqExecuteResult::Failed {
201 reason: "RFQ command missing fill_id".to_string(),
202 });
203 }
204
205 if cmd.taker_wallet == cmd.qp_wallet {
206 return Err(RfqExecuteResult::Failed {
207 reason: RFQ_SELF_TRADE_REJECTION_REASON.to_string(),
208 });
209 }
210
211 for leg in &cmd.legs {
213 if self.ctx.expired_instruments.get(&leg.instrument) == Some(&true)
214 || self
215 .expiry_manager
216 .is_instrument_expired(&leg.instrument, &self.ctx.orderbooks)
217 {
218 return Err(RfqExecuteResult::Failed {
219 reason: format!("Instrument has expired: {}", leg.instrument),
220 });
221 }
222 if !self.ctx.orderbooks.contains_key(&leg.instrument) {
223 return Err(RfqExecuteResult::Failed {
224 reason: format!("Invalid symbol: {}", leg.instrument),
225 });
226 }
227 match self.ctx.instrument_trading_modes.get(&leg.instrument) {
228 None => {
229 return Err(RfqExecuteResult::Failed {
230 reason: format!("Instrument not found: {}", leg.instrument),
231 });
232 }
233 Some(mode) if !mode.allows_rfq() => {
234 return Err(RfqExecuteResult::Failed {
235 reason: format!(
236 "Instrument {} is orderbook-only and does not accept RFQ trades",
237 leg.instrument
238 ),
239 });
240 }
241 Some(_) => {}
242 }
243 }
244
245 {
251 let now = cmd.timestamp_ms;
252 let mut checked_underlyings: std::collections::HashSet<String> =
253 std::collections::HashSet::new();
254 for leg in &cmd.legs {
255 let Some(underlying) = leg.instrument.split('-').next().map(|s| s.to_string())
256 else {
257 continue;
258 };
259 if !checked_underlyings.insert(underlying.clone()) {
260 continue;
261 }
262 let mmp_key = (cmd.qp_wallet, underlying.clone());
263 if let Some(mmp_state) = self.ctx.mmp_state.get(&mmp_key) {
264 if mmp_state.is_frozen(now) {
265 return Err(RfqExecuteResult::Failed {
266 reason: format!("Quote provider MMP is frozen for {}", underlying),
267 });
268 }
269 }
270 }
271 }
272
273 {
287 use rust_decimal_macros::dec;
288
289 if cmd.net_premium < dec!(0) {
290 let taker_debit = -cmd.net_premium;
291 let taker_cash = self
292 .ctx
293 .balance_ledger
294 .get(&cmd.taker_wallet)
295 .copied()
296 .unwrap_or(dec!(0));
297 tracing::debug!(
298 request_id = %cmd.request_id,
299 payer_wallet = %cmd.taker_wallet,
300 payer_role = "taker",
301 payer_cash = %taker_cash,
302 required_debit = %taker_debit,
303 "RFQ balance_ledger premium pre-check"
304 );
305 if taker_cash < taker_debit {
306 tracing::warn!(
307 request_id = %cmd.request_id,
308 payer_wallet = %cmd.taker_wallet,
309 payer_role = "taker",
310 payer_cash = %taker_cash,
311 required_debit = %taker_debit,
312 "Rejecting RFQ: insufficient balance_ledger cash for premium debit"
313 );
314 return Err(RfqExecuteResult::Failed {
315 reason: format!(
316 "Taker cash {} insufficient to cover net premium debit {}",
317 taker_cash, taker_debit
318 ),
319 });
320 }
321 } else if cmd.net_premium > dec!(0) {
322 let qp_debit = cmd.net_premium;
323 let qp_cash = self
324 .ctx
325 .balance_ledger
326 .get(&cmd.qp_wallet)
327 .copied()
328 .unwrap_or(dec!(0));
329 tracing::debug!(
330 request_id = %cmd.request_id,
331 payer_wallet = %cmd.qp_wallet,
332 payer_role = "qp",
333 payer_cash = %qp_cash,
334 required_debit = %qp_debit,
335 "RFQ balance_ledger premium pre-check"
336 );
337 if qp_cash < qp_debit {
338 tracing::warn!(
339 request_id = %cmd.request_id,
340 payer_wallet = %cmd.qp_wallet,
341 payer_role = "qp",
342 payer_cash = %qp_cash,
343 required_debit = %qp_debit,
344 "Rejecting RFQ: insufficient balance_ledger cash for premium debit"
345 );
346 return Err(RfqExecuteResult::Failed {
347 reason: format!(
348 "QP cash {} insufficient to cover net premium debit {}",
349 qp_cash, qp_debit
350 ),
351 });
352 }
353 }
354 }
357
358 let taker_orders: Vec<OrderInfo> = cmd
366 .legs
367 .iter()
368 .map(|leg| OrderInfo {
369 symbol: leg.instrument.clone(),
370 side: leg.taker_side,
371 price: leg.price,
372 size: to_contract_units_decimal(&leg.instrument, leg.size),
373 tif: hypercall_types::TimeInForce::IOC,
374 client_id: None,
375 order_id: None,
376 is_perp: false,
377 underlying: leg.instrument.split('-').next().map(|s| s.to_string()),
378 reduce_only: Some(false),
379 nonce: None,
380 signature: None,
381 mmp_enabled: false,
382 builder_code_address: cmd.builder_code_address,
383 })
384 .collect();
385
386 for order in &taker_orders {
387 if let Err(reason) = self.check_order_limits(&cmd.taker_wallet, order) {
388 return Err(RfqExecuteResult::Failed {
389 reason: format!("Taker order limit failed: {}", reason),
390 });
391 }
392 if let Err(reason) = LiquidationManager::check_preliquidation_order_allowed(
393 &self.ctx.deps,
394 &self.ctx.engine_positions,
395 &cmd.taker_wallet,
396 order,
397 ) {
398 return Err(RfqExecuteResult::Failed {
399 reason: format!("Taker pre-liquidation check failed: {}", reason),
400 });
401 }
402 if let Err(reason) = self.margin_manager.check_tier_restrictions(
403 &self.ctx.deps,
404 &self.ctx.engine_positions,
405 &self.ctx.balance_ledger,
406 &cmd.taker_wallet,
407 order,
408 &self.ctx.order_index,
409 ) {
410 return Err(RfqExecuteResult::Failed {
411 reason: format!("Taker tier restriction failed: {}", reason),
412 });
413 }
414 }
415
416 if let Err(reason) = self.margin_manager.check_margin_for_orders(
417 &self.ctx.deps,
418 &self.ctx.engine_positions,
419 &self.ctx.balance_ledger,
420 &cmd.taker_wallet,
421 &taker_orders,
422 &self.ctx.order_index,
423 ) {
424 return Err(RfqExecuteResult::Failed {
425 reason: format!("Taker margin check failed: {}", reason),
426 });
427 }
428
429 let qp_orders: Vec<OrderInfo> = cmd
431 .legs
432 .iter()
433 .map(|leg| {
434 let qp_side = match leg.taker_side {
435 Side::Buy => Side::Sell,
436 Side::Sell => Side::Buy,
437 };
438 OrderInfo {
439 symbol: leg.instrument.clone(),
440 side: qp_side,
441 price: leg.price,
442 size: to_contract_units_decimal(&leg.instrument, leg.size),
443 tif: hypercall_types::TimeInForce::IOC,
444 client_id: None,
445 order_id: None,
446 is_perp: false,
447 underlying: leg.instrument.split('-').next().map(|s| s.to_string()),
448 reduce_only: Some(false),
449 nonce: None,
450 signature: None,
451 mmp_enabled: false,
452 builder_code_address: None,
453 }
454 })
455 .collect();
456
457 for order in &qp_orders {
458 if let Err(reason) = self.check_order_limits(&cmd.qp_wallet, order) {
459 return Err(RfqExecuteResult::Failed {
460 reason: format!("QP order limit failed: {}", reason),
461 });
462 }
463 if let Err(reason) = LiquidationManager::check_preliquidation_order_allowed(
464 &self.ctx.deps,
465 &self.ctx.engine_positions,
466 &cmd.qp_wallet,
467 order,
468 ) {
469 return Err(RfqExecuteResult::Failed {
470 reason: format!("QP pre-liquidation check failed: {}", reason),
471 });
472 }
473 if let Err(reason) = self.margin_manager.check_tier_restrictions(
474 &self.ctx.deps,
475 &self.ctx.engine_positions,
476 &self.ctx.balance_ledger,
477 &cmd.qp_wallet,
478 order,
479 &self.ctx.order_index,
480 ) {
481 return Err(RfqExecuteResult::Failed {
482 reason: format!("QP tier restriction failed: {}", reason),
483 });
484 }
485 }
486
487 if let Err(reason) = self.check_pm_settlement_pool_gate_for_order_groups(&[
488 (&cmd.taker_wallet, &taker_orders),
489 (&cmd.qp_wallet, &qp_orders),
490 ]) {
491 return Err(RfqExecuteResult::Failed {
492 reason: format!("RFQ PM settlement gate failed: {}", reason),
493 });
494 }
495
496 if let Err(reason) = self.margin_manager.check_margin_for_quote_provider_orders(
497 &self.ctx.deps,
498 &self.ctx.engine_positions,
499 &self.ctx.balance_ledger,
500 &cmd.qp_wallet,
501 &qp_orders,
502 &self.ctx.order_index,
503 ) {
504 return Err(RfqExecuteResult::Failed {
505 reason: format!("QP margin check failed: {}", reason),
506 });
507 }
508
509 let leg_implied_premium: rust_decimal::Decimal = cmd
525 .legs
526 .iter()
527 .map(|l| {
528 let magnitude = l.price * l.size;
529 match l.taker_side {
530 Side::Buy => -magnitude,
531 Side::Sell => magnitude,
532 }
533 })
534 .sum();
535 if leg_implied_premium != cmd.net_premium {
536 return Err(RfqExecuteResult::Failed {
537 reason: format!(
538 "Leg-implied premium {} does not match signed net_premium {} for rfq_id={}",
539 leg_implied_premium, cmd.net_premium, cmd.rfq_id
540 ),
541 });
542 }
543
544 let plan = self.build_rfq_execution_plan_unchecked(cmd);
545
546 Ok(plan)
547 }
548
549 pub(crate) fn build_rfq_execution_plan_unchecked(
555 &mut self,
556 cmd: &RfqExecuteCommand,
557 ) -> RfqExecutionPlan {
558 let fill_id = cmd.fill_id.clone();
559 let timestamp = cmd.timestamp_ms;
560
561 let mut events: Vec<EngineMessage> = Vec::with_capacity(cmd.legs.len() + 1);
562 let mut rfq_fill_legs: Vec<RfqFillLeg> = Vec::with_capacity(cmd.legs.len());
563 let mut mmp_updates: Vec<MmpFillUpdate> = Vec::with_capacity(cmd.legs.len());
564
565 for leg in &cmd.legs {
566 let trade_id = self.ctx.next_trade_id;
567 self.ctx.next_trade_id += 1;
568
569 let size_in_contract_units = to_contract_units_decimal(&leg.instrument, leg.size);
571 let price_f64 = leg
572 .price
573 .to_f64()
574 .expect("CRITICAL_FAILURE: RFQ leg price is not representable as f64");
575 let size_u64 = size_in_contract_units.trunc().to_u64().expect(
576 "CRITICAL_FAILURE: RFQ leg size is not representable as u64 contract units",
577 );
578 let fee_calc = self.fee_service.get_fees(
579 &cmd.qp_wallet.as_hex(),
580 &cmd.taker_wallet.as_hex(),
581 price_f64,
582 size_u64,
583 cmd.builder_code_address.as_ref(),
584 );
585 let taker_fee = rust_decimal::Decimal::from_f64_retain(fee_calc.taker_fee)
586 .expect("CRITICAL_FAILURE: RFQ taker fee is not representable as Decimal");
587
588 let mut fill = Fill {
589 trade_id,
590 taker_order_id: 0,
591 maker_order_id: 0,
592 symbol: leg.instrument.clone(),
593 price: leg.price,
594 size: size_in_contract_units,
595 taker_side: leg.taker_side,
596 taker_wallet_address: cmd.taker_wallet,
597 maker_wallet_address: cmd.qp_wallet,
598 fee: taker_fee,
599 is_taker: true,
600 timestamp,
601 builder_code_address: cmd.builder_code_address,
602 builder_code_fee: fee_calc.builder_code_fee,
603 source: FillSource::Rfq {
604 rfq_id: cmd.rfq_id.clone(),
605 quote_id: cmd.quote_id.clone(),
606 },
607 taker_realized_pnl: None,
608 maker_realized_pnl: None,
609 underlying_notional: None,
610 };
611 self.attach_fill_underlying_notional(&mut fill);
612
613 events.push(EngineMessage::OrderFilled {
614 accounting: hypercall_engine::FillAccounting::zero(fill.trade_id),
615 fill: fill.clone(),
616 });
617
618 rfq_fill_legs.push(RfqFillLeg {
619 instrument: leg.instrument.clone(),
620 taker_side: leg.taker_side,
621 price: leg.price,
622 size: leg.size,
623 });
624
625 if let Some(underlying) = leg.instrument.split('-').next() {
626 mmp_updates.push(MmpFillUpdate {
627 qp_wallet: cmd.qp_wallet,
628 underlying: underlying.to_string(),
629 fill,
630 timestamp_ms: timestamp,
631 });
632 }
633 }
634
635 events.push(EngineMessage::RfqFilled(RfqFillMessage {
638 fill_id: fill_id.clone(),
639 rfq_id: cmd.rfq_id.clone(),
640 quote_id: cmd.quote_id.clone(),
641 taker_wallet: cmd.taker_wallet,
642 qp_wallet: cmd.qp_wallet,
643 legs: rfq_fill_legs,
644 net_premium: cmd.net_premium,
645 taker_accept_signature: cmd.taker_signature.clone(),
646 timestamp,
647 }));
648
649 RfqExecutionPlan {
650 fill_id,
651 events,
652 mmp_updates,
653 }
654 }
655}