Skip to main content

hypercall/rsm/unified_engine/
rfq_handler.rs

1use super::*;
2use crate::rsm::apply::MmpFillUpdate;
3use hypercall_runtime_api::{RfqExecuteCommand, RfqExecuteRequest, RfqExecuteResult};
4use hypercall_types::RFQ_SELF_TRADE_REJECTION_REASON;
5use hypercall_types::{to_contract_units_decimal, Fill, FillSource, Side};
6use hypercall_types::{EngineMessage, RfqFillLeg, RfqFillMessage};
7use metrics::histogram;
8use std::time::Instant;
9
10/// Output of `plan_rfq_execution` — the planned execution as a list of
11/// `EngineMessage`s plus a fill_id. The caller (the journal pipeline) is
12/// responsible for actually persisting and applying these events.
13///
14/// Building this is **pure**: no DB writes, no portfolio mutations, no
15/// WS emissions. Validation that can fail (instruments, MMP, margin,
16/// premium-vs-legs cross-check) happens here and returns `Failed` before
17/// any events are produced. Once the plan exists, journaling +
18/// projection are unconditional.
19pub(crate) struct RfqExecutionPlan {
20    pub fill_id: String,
21    pub events: Vec<EngineMessage>,
22    pub mmp_updates: Vec<MmpFillUpdate>,
23}
24
25impl UnifiedEngine {
26    /// Handle an RFQ execution request within the engine's single-threaded loop.
27    ///
28    /// The flow:
29    /// 1. `plan_rfq_execution` validates the command and produces a list of
30    ///    `EngineMessage` events (per-leg `OrderFilled` + summary `RfqFilled`)
31    ///    along with post-commit MMP updates. No state mutation, no DB
32    ///    writes, no WS emissions yet.
33    /// 2. `process_rfq_journaled` runs the events through the same journal
34    ///    pipeline as `process_order_journaled`: serialize → batch send →
35    ///    wait for journal commit ACK → apply portfolio projection under
36    ///    barrier → emit events to WS → ACK the caller.
37    /// 3. Post-journal MMP updates fire after the projection barrier
38    ///    releases — they're side effects that don't need durability.
39    pub(crate) async fn handle_rfq_execute(&mut self, request: RfqExecuteRequest) {
40        let handle_start = Instant::now();
41        let cmd = &request.command;
42        let rfq_span = tracing::info_span!(
43            "engine.rfq_execute",
44            rfq_id = %cmd.rfq_id,
45            taker = %cmd.taker_wallet,
46            qp = %cmd.qp_wallet,
47            request_id = %cmd.request_id,
48        );
49        let _guard = rfq_span.enter();
50
51        // Idempotency fast-path: check before apply() to avoid
52        // re-planning and double-advancing the hash chain.
53        if !cmd.request_id.is_empty() {
54            if let Some(cached_fill_id) = self.lookup_cached_rfq_response(&cmd.request_id).await {
55                debug!(
56                    "RFQ idempotency hit for rfq_id={}, request_id={}, cached fill_id={}",
57                    cmd.rfq_id, cmd.request_id, cached_fill_id
58                );
59                let _ = request.response_tx.send(RfqExecuteResult::Success {
60                    fill_id: cached_fill_id,
61                });
62                return;
63            }
64        }
65
66        let timestamp = cmd.timestamp_ms;
67        if timestamp == 0 || cmd.fill_id.is_empty() {
68            let _ = request.response_tx.send(RfqExecuteResult::Failed {
69                reason: "RFQ command missing deterministic timestamp_ms or fill_id".to_string(),
70            });
71            return;
72        }
73        let env = crate::rsm::apply::CommandEnvelope::new(
74            timestamp,
75            crate::rsm::apply::EngineCommand::RfqExecute(cmd.clone()),
76        );
77        let apply_start = Instant::now();
78        let apply_output = match self.apply(env) {
79            Ok(output) => output,
80            Err(crate::rsm::unified_engine::apply_interface::EngineError::Rejected(reason)) => {
81                warn!(
82                    rfq_id = %cmd.rfq_id,
83                    reason = %reason,
84                    "RFQ rejected by engine (nonce or admission)"
85                );
86                let _ = request
87                    .response_tx
88                    .send(RfqExecuteResult::Failed { reason });
89                return;
90            }
91            Err(e) => {
92                panic!(
93                    "JOURNAL_FATAL: apply() failed for RFQ rfq_id={}: {}",
94                    cmd.rfq_id, e
95                )
96            }
97        };
98        histogram!("ht_rfq_apply_seconds").record(apply_start.elapsed().as_secs_f64());
99
100        // Extract plan result from apply output
101        let rfq_plan = apply_output.rfq_plan.unwrap_or_else(|| {
102            panic!(
103                "JOURNAL_FATAL: apply() returned no rfq_plan for RFQ rfq_id={}",
104                cmd.rfq_id
105            )
106        });
107
108        match rfq_plan {
109            Err(failure) => {
110                warn!(
111                    "RFQ execution rejected at planning: rfq_id={}, reason={}",
112                    cmd.rfq_id,
113                    match &failure {
114                        RfqExecuteResult::Failed { reason } => reason.as_str(),
115                        _ => "unknown",
116                    }
117                );
118                let _ = request.response_tx.send(failure);
119            }
120            Ok(plan) => {
121                info!(
122                    "RFQ planned: rfq_id={}, fill_id={}, legs={}, premium={}",
123                    cmd.rfq_id,
124                    plan.fill_id,
125                    cmd.legs.len(),
126                    cmd.net_premium,
127                );
128
129                self.process_rfq_journaled(
130                    request,
131                    plan.fill_id,
132                    apply_output.events,
133                    apply_output.balance_updates,
134                    plan.mmp_updates,
135                )
136                .await;
137
138                histogram!("ht_rfq_handle_total_seconds")
139                    .record(handle_start.elapsed().as_secs_f64());
140            }
141        }
142    }
143
144    /// Look up a cached RFQ response by request_id. Returns
145    /// `Some(fill_id)` if the journal has already recorded this
146    /// request, `None` otherwise.
147    ///
148    /// Two-step check:
149    /// 1. Fast path: in-memory `known_request_ids` cache. Skips DB
150    ///    entirely if the uuid isn't in the cache.
151    /// 2. Slow path: query the journal writer for the full record and
152    ///    decode the cached fill_id from `response_data`.
153    ///
154    /// Returns `None` on any decode/lookup failure rather than
155    /// panicking — a stale cache entry should fall through to the
156    /// normal execution path, not crash the engine.
157    async fn lookup_cached_rfq_response(&self, request_id: &str) -> Option<String> {
158        let req_uuid = uuid::Uuid::parse_str(request_id).ok()?;
159        if !self.known_request_ids.contains(&req_uuid) {
160            return None;
161        }
162        let writer = self.journal_writer.as_ref()?.clone();
163        let lookup_uuid = req_uuid;
164        let existing = tokio::task::spawn_blocking(move || writer.get_by_request_id(&lookup_uuid))
165            .await
166            .ok()?
167            .ok()??;
168        // response_data layout: [version_byte=1][rmp(String fill_id)]
169        let bytes = existing.response_data.as_ref()?;
170        if bytes.len() < 2 {
171            return None;
172        }
173        let fill_id: String = rmp_serde::from_slice(&bytes[1..]).ok()?;
174        Some(fill_id)
175    }
176
177    /// Serialise an RFQ response (fill_id) for journal storage. Wire
178    /// format mirrors the orderbook command format: [version=1][rmp].
179    pub(super) fn encode_rfq_response_data(fill_id: &str) -> Vec<u8> {
180        let mut buf = Vec::with_capacity(64);
181        buf.push(1u8);
182        rmp_serde::encode::write_named(&mut buf, &fill_id).expect("encode fill_id");
183        buf
184    }
185
186    /// Validate the command and build the planned event list. Pure: no
187    /// DB writes, no portfolio mutations, no WS emissions. Used by
188    /// `handle_rfq_execute` for the live path AND by replay paths that
189    /// need to re-derive events without re-emitting.
190    pub(crate) fn plan_rfq_execution(
191        &mut self,
192        cmd: &RfqExecuteCommand,
193    ) -> Result<RfqExecutionPlan, RfqExecuteResult> {
194        if cmd.timestamp_ms == 0 {
195            return Err(RfqExecuteResult::Failed {
196                reason: "RFQ command missing timestamp_ms".to_string(),
197            });
198        }
199        if cmd.fill_id.is_empty() {
200            return Err(RfqExecuteResult::Failed {
201                reason: "RFQ command missing fill_id".to_string(),
202            });
203        }
204
205        if cmd.taker_wallet == cmd.qp_wallet {
206            return Err(RfqExecuteResult::Failed {
207                reason: RFQ_SELF_TRADE_REJECTION_REASON.to_string(),
208            });
209        }
210
211        // 1. Validate instruments exist and allow RFQ trading
212        for leg in &cmd.legs {
213            if self.ctx.expired_instruments.get(&leg.instrument) == Some(&true)
214                || self
215                    .expiry_manager
216                    .is_instrument_expired(&leg.instrument, &self.ctx.orderbooks)
217            {
218                return Err(RfqExecuteResult::Failed {
219                    reason: format!("Instrument has expired: {}", leg.instrument),
220                });
221            }
222            if !self.ctx.orderbooks.contains_key(&leg.instrument) {
223                return Err(RfqExecuteResult::Failed {
224                    reason: format!("Invalid symbol: {}", leg.instrument),
225                });
226            }
227            match self.ctx.instrument_trading_modes.get(&leg.instrument) {
228                None => {
229                    return Err(RfqExecuteResult::Failed {
230                        reason: format!("Instrument not found: {}", leg.instrument),
231                    });
232                }
233                Some(mode) if !mode.allows_rfq() => {
234                    return Err(RfqExecuteResult::Failed {
235                        reason: format!(
236                            "Instrument {} is orderbook-only and does not accept RFQ trades",
237                            leg.instrument
238                        ),
239                    });
240                }
241                Some(_) => {}
242            }
243        }
244
245        // 2. Check MMP frozen for QP across every underlying in the
246        //    command. A QP can be frozen on BTC but not ETH, so a BTC-leg
247        //    RFQ must reject while an ETH-leg RFQ passes. Walk all unique
248        //    underlyings and fail on the first frozen one.
249        //    Uses engine-internal mmp_state (synchronous, no async reads).
250        {
251            let now = cmd.timestamp_ms;
252            let mut checked_underlyings: std::collections::HashSet<String> =
253                std::collections::HashSet::new();
254            for leg in &cmd.legs {
255                let Some(underlying) = leg.instrument.split('-').next().map(|s| s.to_string())
256                else {
257                    continue;
258                };
259                if !checked_underlyings.insert(underlying.clone()) {
260                    continue;
261                }
262                let mmp_key = (cmd.qp_wallet, underlying.clone());
263                if let Some(mmp_state) = self.ctx.mmp_state.get(&mmp_key) {
264                    if mmp_state.is_frozen(now) {
265                        return Err(RfqExecuteResult::Failed {
266                            reason: format!("Quote provider MMP is frozen for {}", underlying),
267                        });
268                    }
269                }
270            }
271        }
272
273        // 3a. Cash coverage check on the signed net premium.
274        //
275        //     SPAN margin doesn't enforce cash floors: long-only option
276        //     positions have IM = 0 because the loss is capped at the
277        //     premium paid, but we never debit the premium from cash in
278        //     the hypothetical portfolio. That means a wallet with 10
279        //     USDC could pass SPAN for a 95 USDC long straddle. The
280        //     downstream fill would then fail at ledger update time,
281        //     which is the wrong place to find out.
282        //
283        //     We refuse at plan time instead: whichever wallet owes
284        //     `abs(net_premium)` must have that much liquid cash. If the
285        //     balance_ledger must be the source of truth for this pre-check.
286        {
287            use rust_decimal_macros::dec;
288
289            if cmd.net_premium < dec!(0) {
290                let taker_debit = -cmd.net_premium;
291                let taker_cash = self
292                    .ctx
293                    .balance_ledger
294                    .get(&cmd.taker_wallet)
295                    .copied()
296                    .unwrap_or(dec!(0));
297                tracing::debug!(
298                    request_id = %cmd.request_id,
299                    payer_wallet = %cmd.taker_wallet,
300                    payer_role = "taker",
301                    payer_cash = %taker_cash,
302                    required_debit = %taker_debit,
303                    "RFQ balance_ledger premium pre-check"
304                );
305                if taker_cash < taker_debit {
306                    tracing::warn!(
307                        request_id = %cmd.request_id,
308                        payer_wallet = %cmd.taker_wallet,
309                        payer_role = "taker",
310                        payer_cash = %taker_cash,
311                        required_debit = %taker_debit,
312                        "Rejecting RFQ: insufficient balance_ledger cash for premium debit"
313                    );
314                    return Err(RfqExecuteResult::Failed {
315                        reason: format!(
316                            "Taker cash {} insufficient to cover net premium debit {}",
317                            taker_cash, taker_debit
318                        ),
319                    });
320                }
321            } else if cmd.net_premium > dec!(0) {
322                let qp_debit = cmd.net_premium;
323                let qp_cash = self
324                    .ctx
325                    .balance_ledger
326                    .get(&cmd.qp_wallet)
327                    .copied()
328                    .unwrap_or(dec!(0));
329                tracing::debug!(
330                    request_id = %cmd.request_id,
331                    payer_wallet = %cmd.qp_wallet,
332                    payer_role = "qp",
333                    payer_cash = %qp_cash,
334                    required_debit = %qp_debit,
335                    "RFQ balance_ledger premium pre-check"
336                );
337                if qp_cash < qp_debit {
338                    tracing::warn!(
339                        request_id = %cmd.request_id,
340                        payer_wallet = %cmd.qp_wallet,
341                        payer_role = "qp",
342                        payer_cash = %qp_cash,
343                        required_debit = %qp_debit,
344                        "Rejecting RFQ: insufficient balance_ledger cash for premium debit"
345                    );
346                    return Err(RfqExecuteResult::Failed {
347                        reason: format!(
348                            "QP cash {} insufficient to cover net premium debit {}",
349                            qp_cash, qp_debit
350                        ),
351                    });
352                }
353            }
354            // net_premium == 0 (e.g. a perfect risk reversal) → no cash
355            // debit on either side, skip the cash check.
356        }
357
358        // 3b. Margin check — build one OrderInfo per leg for the taker
359        //     and one opposite-side OrderInfo per leg for the QP, then
360        //     score each wallet as a single hypothetical portfolio via
361        //     `check_margin_for_orders`. This is the whole reason we
362        //     can't loop per-leg: a hedged collar (short call + long put)
363        //     is healthy on the combined portfolio even though the short
364        //     leg alone would fail margin.
365        let taker_orders: Vec<OrderInfo> = cmd
366            .legs
367            .iter()
368            .map(|leg| OrderInfo {
369                symbol: leg.instrument.clone(),
370                side: leg.taker_side,
371                price: leg.price,
372                size: to_contract_units_decimal(&leg.instrument, leg.size),
373                tif: hypercall_types::TimeInForce::IOC,
374                client_id: None,
375                order_id: None,
376                is_perp: false,
377                underlying: leg.instrument.split('-').next().map(|s| s.to_string()),
378                reduce_only: Some(false),
379                nonce: None,
380                signature: None,
381                mmp_enabled: false,
382                builder_code_address: cmd.builder_code_address,
383            })
384            .collect();
385
386        for order in &taker_orders {
387            if let Err(reason) = self.check_order_limits(&cmd.taker_wallet, order) {
388                return Err(RfqExecuteResult::Failed {
389                    reason: format!("Taker order limit failed: {}", reason),
390                });
391            }
392            if let Err(reason) = LiquidationManager::check_preliquidation_order_allowed(
393                &self.ctx.deps,
394                &self.ctx.engine_positions,
395                &cmd.taker_wallet,
396                order,
397            ) {
398                return Err(RfqExecuteResult::Failed {
399                    reason: format!("Taker pre-liquidation check failed: {}", reason),
400                });
401            }
402            if let Err(reason) = self.margin_manager.check_tier_restrictions(
403                &self.ctx.deps,
404                &self.ctx.engine_positions,
405                &self.ctx.balance_ledger,
406                &cmd.taker_wallet,
407                order,
408                &self.ctx.order_index,
409            ) {
410                return Err(RfqExecuteResult::Failed {
411                    reason: format!("Taker tier restriction failed: {}", reason),
412                });
413            }
414        }
415
416        if let Err(reason) = self.margin_manager.check_margin_for_orders(
417            &self.ctx.deps,
418            &self.ctx.engine_positions,
419            &self.ctx.balance_ledger,
420            &cmd.taker_wallet,
421            &taker_orders,
422            &self.ctx.order_index,
423        ) {
424            return Err(RfqExecuteResult::Failed {
425                reason: format!("Taker margin check failed: {}", reason),
426            });
427        }
428
429        // QP takes the opposite side of every leg.
430        let qp_orders: Vec<OrderInfo> = cmd
431            .legs
432            .iter()
433            .map(|leg| {
434                let qp_side = match leg.taker_side {
435                    Side::Buy => Side::Sell,
436                    Side::Sell => Side::Buy,
437                };
438                OrderInfo {
439                    symbol: leg.instrument.clone(),
440                    side: qp_side,
441                    price: leg.price,
442                    size: to_contract_units_decimal(&leg.instrument, leg.size),
443                    tif: hypercall_types::TimeInForce::IOC,
444                    client_id: None,
445                    order_id: None,
446                    is_perp: false,
447                    underlying: leg.instrument.split('-').next().map(|s| s.to_string()),
448                    reduce_only: Some(false),
449                    nonce: None,
450                    signature: None,
451                    mmp_enabled: false,
452                    builder_code_address: None,
453                }
454            })
455            .collect();
456
457        for order in &qp_orders {
458            if let Err(reason) = self.check_order_limits(&cmd.qp_wallet, order) {
459                return Err(RfqExecuteResult::Failed {
460                    reason: format!("QP order limit failed: {}", reason),
461                });
462            }
463            if let Err(reason) = LiquidationManager::check_preliquidation_order_allowed(
464                &self.ctx.deps,
465                &self.ctx.engine_positions,
466                &cmd.qp_wallet,
467                order,
468            ) {
469                return Err(RfqExecuteResult::Failed {
470                    reason: format!("QP pre-liquidation check failed: {}", reason),
471                });
472            }
473            if let Err(reason) = self.margin_manager.check_tier_restrictions(
474                &self.ctx.deps,
475                &self.ctx.engine_positions,
476                &self.ctx.balance_ledger,
477                &cmd.qp_wallet,
478                order,
479                &self.ctx.order_index,
480            ) {
481                return Err(RfqExecuteResult::Failed {
482                    reason: format!("QP tier restriction failed: {}", reason),
483                });
484            }
485        }
486
487        if let Err(reason) = self.check_pm_settlement_pool_gate_for_order_groups(&[
488            (&cmd.taker_wallet, &taker_orders),
489            (&cmd.qp_wallet, &qp_orders),
490        ]) {
491            return Err(RfqExecuteResult::Failed {
492                reason: format!("RFQ PM settlement gate failed: {}", reason),
493            });
494        }
495
496        if let Err(reason) = self.margin_manager.check_margin_for_quote_provider_orders(
497            &self.ctx.deps,
498            &self.ctx.engine_positions,
499            &self.ctx.balance_ledger,
500            &cmd.qp_wallet,
501            &qp_orders,
502            &self.ctx.order_index,
503        ) {
504            return Err(RfqExecuteResult::Failed {
505                reason: format!("QP margin check failed: {}", reason),
506            });
507        }
508
509        // 4. Validate that the signed `net_premium` (which the taker's
510        //    EIP-712 signature committed to) matches the legs the QP sent,
511        //    INCLUDING the sign. Without this check, a malformed or
512        //    malicious quote where `net_premium` diverges from the legs
513        //    would let the taker authorize one amount while the engine
514        //    debits/credits another.
515        //
516        //    Unit + sign convention (matches `rfq_responder::price_rfq`):
517        //    - `leg.size` on `RfqExecuteLeg` is in HUMAN-READABLE contracts
518        //      (same as the frontend submits), NOT contract units. So the
519        //      leg contribution is `leg.price * leg.size` with no unit
520        //      divisor.
521        //    - Taker buys → premium contribution is negative (taker pays).
522        //      Taker sells → positive (taker receives). So the signed
523        //      `net_premium` matches `sum(leg.price * leg.size * sign)`.
524        let leg_implied_premium: rust_decimal::Decimal = cmd
525            .legs
526            .iter()
527            .map(|l| {
528                let magnitude = l.price * l.size;
529                match l.taker_side {
530                    Side::Buy => -magnitude,
531                    Side::Sell => magnitude,
532                }
533            })
534            .sum();
535        if leg_implied_premium != cmd.net_premium {
536            return Err(RfqExecuteResult::Failed {
537                reason: format!(
538                    "Leg-implied premium {} does not match signed net_premium {} for rfq_id={}",
539                    leg_implied_premium, cmd.net_premium, cmd.rfq_id
540                ),
541            });
542        }
543
544        let plan = self.build_rfq_execution_plan_unchecked(cmd);
545
546        Ok(plan)
547    }
548
549    /// Build RFQ fill events without re-running live acceptance checks.
550    ///
551    /// This is used by standby NATS replay after the primary has already
552    /// accepted and journaled the RFQ. Replay must not reject a command that
553    /// the primary accepted, and it must not block on live-only validation.
554    pub(crate) fn build_rfq_execution_plan_unchecked(
555        &mut self,
556        cmd: &RfqExecuteCommand,
557    ) -> RfqExecutionPlan {
558        let fill_id = cmd.fill_id.clone();
559        let timestamp = cmd.timestamp_ms;
560
561        let mut events: Vec<EngineMessage> = Vec::with_capacity(cmd.legs.len() + 1);
562        let mut rfq_fill_legs: Vec<RfqFillLeg> = Vec::with_capacity(cmd.legs.len());
563        let mut mmp_updates: Vec<MmpFillUpdate> = Vec::with_capacity(cmd.legs.len());
564
565        for leg in &cmd.legs {
566            let trade_id = self.ctx.next_trade_id;
567            self.ctx.next_trade_id += 1;
568
569            // Convert human-readable size to contract units (internal format)
570            let size_in_contract_units = to_contract_units_decimal(&leg.instrument, leg.size);
571            let price_f64 = leg
572                .price
573                .to_f64()
574                .expect("CRITICAL_FAILURE: RFQ leg price is not representable as f64");
575            let size_u64 = size_in_contract_units.trunc().to_u64().expect(
576                "CRITICAL_FAILURE: RFQ leg size is not representable as u64 contract units",
577            );
578            let fee_calc = self.fee_service.get_fees(
579                &cmd.qp_wallet.as_hex(),
580                &cmd.taker_wallet.as_hex(),
581                price_f64,
582                size_u64,
583                cmd.builder_code_address.as_ref(),
584            );
585            let taker_fee = rust_decimal::Decimal::from_f64_retain(fee_calc.taker_fee)
586                .expect("CRITICAL_FAILURE: RFQ taker fee is not representable as Decimal");
587
588            let mut fill = Fill {
589                trade_id,
590                taker_order_id: 0,
591                maker_order_id: 0,
592                symbol: leg.instrument.clone(),
593                price: leg.price,
594                size: size_in_contract_units,
595                taker_side: leg.taker_side,
596                taker_wallet_address: cmd.taker_wallet,
597                maker_wallet_address: cmd.qp_wallet,
598                fee: taker_fee,
599                is_taker: true,
600                timestamp,
601                builder_code_address: cmd.builder_code_address,
602                builder_code_fee: fee_calc.builder_code_fee,
603                source: FillSource::Rfq {
604                    rfq_id: cmd.rfq_id.clone(),
605                    quote_id: cmd.quote_id.clone(),
606                },
607                taker_realized_pnl: None,
608                maker_realized_pnl: None,
609                underlying_notional: None,
610            };
611            self.attach_fill_underlying_notional(&mut fill);
612
613            events.push(EngineMessage::OrderFilled {
614                accounting: hypercall_engine::FillAccounting::zero(fill.trade_id),
615                fill: fill.clone(),
616            });
617
618            rfq_fill_legs.push(RfqFillLeg {
619                instrument: leg.instrument.clone(),
620                taker_side: leg.taker_side,
621                price: leg.price,
622                size: leg.size,
623            });
624
625            if let Some(underlying) = leg.instrument.split('-').next() {
626                mmp_updates.push(MmpFillUpdate {
627                    qp_wallet: cmd.qp_wallet,
628                    underlying: underlying.to_string(),
629                    fill,
630                    timestamp_ms: timestamp,
631                });
632            }
633        }
634
635        // Append the summary RfqFilled event last so consumers see the
636        // per-leg fills before the rollup.
637        events.push(EngineMessage::RfqFilled(RfqFillMessage {
638            fill_id: fill_id.clone(),
639            rfq_id: cmd.rfq_id.clone(),
640            quote_id: cmd.quote_id.clone(),
641            taker_wallet: cmd.taker_wallet,
642            qp_wallet: cmd.qp_wallet,
643            legs: rfq_fill_legs,
644            net_premium: cmd.net_premium,
645            taker_accept_signature: cmd.taker_signature.clone(),
646            timestamp,
647        }));
648
649        RfqExecutionPlan {
650            fill_id,
651            events,
652            mmp_updates,
653        }
654    }
655}