Skip to main content

hypercall/rsm/unified_engine/
journaling.rs

1//! Journal and idempotency orchestration for `UnifiedEngine`.
2
3use super::*;
4
5/// Strategy for persisting journal entries after order processing.
6enum JournalStrategy {
7    /// Push entry to batch channel for async write, ACK immediately
8    Batched(crate::journal::JournalBatchSender),
9    /// Write synchronously to journal, panic on failure
10    Sync(crate::journal::SharedEngineJournalWriter),
11}
12
13impl JournalStrategy {
14    /// Select the appropriate journal strategy based on available writers.
15    /// Prefers batched mode if a batch sender is configured.
16    fn select(
17        batch_sender: &Option<crate::journal::JournalBatchSender>,
18        sync_writer: crate::journal::SharedEngineJournalWriter,
19    ) -> Self {
20        if let Some(sender) = batch_sender.clone() {
21            JournalStrategy::Batched(sender)
22        } else {
23            JournalStrategy::Sync(sync_writer)
24        }
25    }
26}
27
28impl UnifiedEngine {
29    /// Process an order with durable journaling.
30    ///
31    /// This method provides:
32    /// - Idempotency: duplicate request_id returns cached response
33    /// - Restart-safe ACK: response only sent after DB commit
34    /// - Full event capture for replay
35    ///
36    /// The flow is:
37    /// 1. Check journal for existing request_id (idempotency)
38    /// 2. If found: return cached response
39    /// 3. If new: process order, journal transition, then ACK
40    ///
41    /// When batch_sender is set, journaling is async (push to channel, ACK immediately).
42    /// When batch_sender is not set, falls back to synchronous journaling.
43    pub(super) async fn process_order_journaled(&mut self, request: UnifiedEngineRequest) {
44        let start = Instant::now();
45
46        let request_id = request.message.request_id.clone();
47
48        // Extract request_id - non-journaled fallback is allowed only in tests.
49        let Some(ref req_id) = request_id else {
50            counter!("ht_engine_journal_fallback_total", "reason" => "missing_request_id")
51                .increment(1);
52            if cfg!(test) {
53                warn!("Order missing request_id, falling back to non-journaled processing");
54                self.process_order(request).await;
55                return;
56            }
57            panic!("JOURNAL_FATAL: request_id missing; refusing to process without idempotency.");
58        };
59        let req_uuid = uuid::Uuid::parse_str(req_id).unwrap_or_else(|e| {
60            panic!(
61                "JOURNAL_FATAL: request_id '{}' is not a valid UUID: {}. \
62                 Refusing to process without a canonical idempotency key.",
63                req_id, e
64            )
65        });
66
67        let queue_wait = request.enqueued_at.elapsed().as_secs_f64();
68        histogram!("ht_engine_queue_wait_seconds").record(queue_wait);
69        decrement_pending_requests();
70        gauge!("ht_engine_pending_requests").set(get_pending_requests() as f64);
71
72        // Get journal writer for idempotency check
73        let journal_writer = match &self.journal_writer {
74            Some(w) => w.clone(),
75            None => {
76                // No journal writer - if we have batch sender, we can still proceed
77                // (idempotency won't work but batching will)
78                if self.journal_batch_sender.is_none() {
79                    // No journal configured - this should never happen as builder validates
80                    error!(
81                        "Journal unavailable: no writer or batch_sender configured. \
82                           This is a configuration error - use .with_mock_journal() for tests."
83                    );
84                    counter!("ht_engine_journal_error_total", "reason" => "missing_writer")
85                        .increment(1);
86                    panic!(
87                        "JOURNAL_FATAL: no writer or batch_sender configured for request_id={}. \
88                         Refusing to process without durability.",
89                        req_id
90                    );
91                }
92                // Proceed without idempotency check - add to known set after processing
93                warn!("No journal_writer for idempotency check, proceeding with batch-only mode");
94                let sender = self
95                    .journal_batch_sender
96                    .as_ref()
97                    .expect("batch sender missing in batch-only journal mode")
98                    .clone();
99                self.process_order_journaled_impl(
100                    request,
101                    req_id,
102                    req_uuid,
103                    start,
104                    JournalStrategy::Batched(sender),
105                )
106                .await;
107                self.known_request_ids.insert(req_uuid);
108                return;
109            }
110        };
111
112        // Fast path: check in-memory cache first.
113        // Cache entries are loaded from a configured lookback window and the cache is bounded.
114        // A miss here means request_id is either new, or older than the cache window and
115        // assumed not retried, so we skip the DB lookup.
116        if !self.known_request_ids.contains(&req_uuid) {
117            // New request - no need to check DB, proceed directly
118            counter!("ht_engine_journal_cache_miss_total").increment(1);
119
120            // Process the order (batched or sync path)
121            let strategy = JournalStrategy::select(&self.journal_batch_sender, journal_writer);
122            self.process_order_journaled_impl(request, req_id, req_uuid, start, strategy)
123                .await;
124
125            // Add to known set so future duplicates (in this session) will be caught
126            self.known_request_ids.insert(req_uuid);
127            return;
128        }
129
130        // Slow path: request_id is in known set (either from startup load or this session).
131        // Query DB to check if it's actually a duplicate with a cached response.
132        counter!("ht_engine_journal_cache_hit_total").increment(1);
133
134        let idempotency_span = info_span!("journal_idempotency_check");
135        let req_id_clone = req_uuid;
136        let journal_clone = journal_writer.clone();
137        let idempotency_result = async {
138            tokio::task::spawn_blocking(move || journal_clone.get_by_request_id(&req_id_clone))
139                .await
140        }
141        .instrument(idempotency_span)
142        .await;
143
144        match idempotency_result {
145            Ok(Ok(Some(existing))) => {
146                // Idempotent hit - return cached response
147                debug!("Journal idempotency hit for request_id: {}", req_id);
148                counter!("ht_engine_journal_idempotency_hit_total").increment(1);
149
150                if let Some(cached_response) = existing.decode_response() {
151                    let _ = request.response_tx.send(cached_response).await;
152                    return;
153                }
154                panic!(
155                    "JOURNAL_FATAL: no response_data in cached journal record \
156                     for request_id={}",
157                    req_id
158                );
159            }
160            Ok(Ok(None)) => {
161                // request_id was in cache but not in DB - cache may have been stale
162                // or this is a re-used request_id within this session (shouldn't happen with UUIDs)
163                debug!(
164                    "Journal miss for request_id: {} (was in cache) - processing",
165                    req_id
166                );
167            }
168            Ok(Err(e)) => {
169                error!("Journal query failed for request_id: {}: {}", req_id, e);
170                counter!("ht_engine_journal_error_total", "reason" => "query_error").increment(1);
171                panic!(
172                    "JOURNAL_FATAL: journal query failed for request_id={}: {}. \
173                     Cannot guarantee idempotency, restart required.",
174                    req_id, e
175                );
176            }
177            Err(e) => {
178                error!("spawn_blocking failed for idempotency check: {}", e);
179                counter!("ht_engine_journal_error_total", "reason" => "spawn_error").increment(1);
180                panic!(
181                    "JOURNAL_FATAL: spawn_blocking failed for idempotency check: {}. \
182                     Runtime is degraded, restart required.",
183                    e
184                );
185            }
186        }
187
188        // Slow path fallthrough: request_id was in cache but not in DB.
189        // Process the order (batched or sync path).
190        let strategy = JournalStrategy::select(&self.journal_batch_sender, journal_writer);
191        self.process_order_journaled_impl(request, req_id, req_uuid, start, strategy)
192            .await;
193
194        // request_id is already in known_request_ids (that's why we hit the slow path),
195        // but we processed it so no need to insert again.
196    }
197
198    /// Process order with journaling using the specified strategy.
199    async fn process_order_journaled_impl(
200        &mut self,
201        request: UnifiedEngineRequest,
202        req_id: &str,
203        req_uuid: uuid::Uuid,
204        start: Instant,
205        strategy: JournalStrategy,
206    ) {
207        let engine_span = info_span!("engine_step");
208        let timestamp = request.message.timestamp;
209
210        use crate::observability::command_trace::EngineStateDigest;
211        let pre_digest: EngineStateDigest = Default::default();
212
213        let original_response_tx = request.response_tx;
214
215        let env = crate::rsm::apply::CommandEnvelope::new(
216            timestamp,
217            crate::rsm::apply::EngineCommand::OrderAction(request.message.clone()),
218        );
219        let apply_output = match engine_span.in_scope(|| self.apply(env)) {
220            Ok(output) => output,
221            Err(crate::rsm::unified_engine::apply_interface::EngineError::Rejected(reason)) => {
222                warn!(
223                    request_id = %req_id,
224                    reason = %reason,
225                    "Order rejected by engine (nonce or admission)"
226                );
227                counter!("ht_engine_order_rejected_total", "reason" => "nonce").increment(1);
228                let rejection = OrderUpdateMessage {
229                    timestamp,
230                    info: request.message.info.clone(),
231                    status: hypercall_types::OrderUpdateStatus::Rejected,
232                    reason: Some(reason),
233                    filled_size: rust_decimal::Decimal::ZERO,
234                    order_id: None,
235                    wallet_address: request.message.wallet,
236                    mmp_triggered: false,
237                    request_id: request.message.request_id.clone(),
238                };
239                let _ = original_response_tx.send(rejection).await;
240                self.known_request_ids.insert(req_uuid);
241                return;
242            }
243            Err(e) => {
244                panic!(
245                    "JOURNAL_FATAL: apply() failed for request_id={}: {}",
246                    req_id, e
247                )
248            }
249        };
250        let engine_duration = start.elapsed();
251        histogram!("ht_engine_step_seconds").record(engine_duration.as_secs_f64());
252
253        #[cfg(feature = "rsm-state")]
254        let identity_hash = apply_output.command_identity_hash;
255        let captured_balance_updates = apply_output.balance_updates;
256        let captured_events = apply_output.events;
257        let response = apply_output
258            .order_responses
259            .into_iter()
260            .last()
261            .unwrap_or_else(|| {
262                error!(
263                    "No response received from apply() for request_id: {}",
264                    req_id
265                );
266                counter!("ht_engine_journal_error_total", "reason" => "no_response").increment(1);
267                panic!(
268                    "JOURNAL_FATAL: apply() emitted no response for request_id={}. \
269                     This violates engine response invariants.",
270                    req_id
271                );
272            });
273
274        // For the journal, we need the FINAL order status, not just the preliminary ACK.
275        // `allocate_and_ack` sends an Acked response to response_tx, but `finalize_order`
276        // sends the final status (Filled/Open/PartiallyFilled/Canceled) only to event_sender.
277        // The proxy channel thus contains Acked, but the real final status is in the captured
278        // events. Without this fix, all create orders would be journaled with Acked status,
279        // causing replay to add them all to the book (including fully-filled takers and
280        // MMP-cancelled orders), leading to state regressions after SIGKILL.
281        let journal_response = if request.message.action == OrderAction::CreateOrder
282            || request.message.action == OrderAction::ReplaceOrder
283        {
284            // Scan captured events for the LAST OrderUpdate matching this order_id.
285            // This captures the final status from finalize_order.
286            captured_events
287                .iter()
288                .rev()
289                .find_map(|e| {
290                    if let EngineMessage::OrderUpdate(update) = e {
291                        if update.order_id == response.order_id {
292                            Some(update.clone())
293                        } else {
294                            None
295                        }
296                    } else {
297                        None
298                    }
299                })
300                .unwrap_or_else(|| {
301                    if response.status == OrderUpdateStatus::Acked {
302                        panic!(
303                            "JOURNAL_FATAL: no OrderUpdate captured for order_id={:?}, \
304                             request_id={}: no OrderUpdate found in {} captured events. \
305                             finalize_order did not emit a terminal update; refusing to journal Acked.",
306                            response.order_id,
307                            req_id,
308                            captured_events.len()
309                        );
310                    }
311                    response.clone()
312                })
313        } else {
314            // Cancel orders send the final response directly to response_tx
315            response.clone()
316        };
317
318        let post_digest: EngineStateDigest = Default::default();
319        #[cfg(feature = "rsm-state")]
320        let post_rsm_state_digest = crate::rsm::engine_snapshot::EngineStateDigest::from_ctx(
321            &self.ctx,
322            self.ctx
323                .l2_update_seq
324                .load(std::sync::atomic::Ordering::SeqCst),
325        );
326        let duration_ms = start.elapsed().as_millis() as u64;
327
328        // Serialize command and response for journal (wire format:
329        // [version byte][msgpack payload]).
330        // Patch the command with the engine-assigned order_id from the response
331        // so journal replay can reconstruct orders correctly.
332        // For ReplaceOrder, info.order_id is the cancel target - don't overwrite it.
333        // The new order's ID is available in the journal response for replay.
334        let mut command_for_journal = request.message.clone();
335        if request.message.action != OrderAction::ReplaceOrder {
336            if let Some(order_id) = journal_response.order_id {
337                command_for_journal.info.order_id = Some(order_id);
338            }
339        }
340        let command_data = hypercall_types::serialize_to_wire_bytes(&command_for_journal);
341
342        let response_data = Some(hypercall_types::serialize_to_wire_bytes(&journal_response));
343        let order_id_i64 = journal_response.order_id.map(|id| {
344            i64::try_from(id).unwrap_or_else(|_| {
345                panic!(
346                    "JOURNAL_FATAL: order_id {} exceeds i64 range for request_id={}",
347                    id, req_id
348                )
349            })
350        });
351        let command_type_enum = Some(match request.message.action {
352            OrderAction::CreateOrder => hypercall_db_diesel::engine_enums::CommandType::CreateOrder,
353            OrderAction::CancelOrder => hypercall_db_diesel::engine_enums::CommandType::CancelOrder,
354            OrderAction::ReplaceOrder => {
355                hypercall_db_diesel::engine_enums::CommandType::ReplaceOrder
356            }
357        });
358
359        // Serialize events for batched journal path.
360        let event_payloads = self.serialize_events_for_journal(&captured_events);
361        let portfolio_projection_guard =
362            if let Some(portfolio_cache) = &self.ctx.deps.portfolio_cache {
363                Some(portfolio_cache.lock_projection_barrier().await)
364            } else {
365                None
366            };
367        let fill_side_effects = fill_side_effects_from_events(&captured_events);
368
369        let req_uuid = hypercall_db_diesel::engine_enums::DbUuid(req_uuid);
370
371        // Execute strategy-specific journaling
372        match strategy {
373            JournalStrategy::Batched(sender) => {
374                // Create commit_ack channel so we wait for DB commit before emitting
375                // events or ACKing the client. This prevents SIGKILL from causing
376                // state regressions (client saw cancel, but journal didn't persist it).
377                let (ack_tx, ack_rx) = tokio::sync::oneshot::channel();
378
379                let entry = crate::journal::JournalEntry {
380                    received_ts_ms: timestamp,
381                    command_data: command_data.clone(),
382                    response_data: response_data.clone(),
383                    order_id: order_id_i64,
384                    pre_digest: pre_digest.clone(),
385                    post_digest: post_digest.clone(),
386                    duration_ms,
387                    events: event_payloads,
388                    outbox_appends: Vec::new(),
389                    fill_side_effects: fill_side_effects.clone(),
390                    cash_withdrawal_side_effect: None,
391                    balance_updates: captured_balance_updates.clone(),
392                    created_at: Instant::now(),
393                    commit_ack: Some(ack_tx),
394                    request_uuid: req_uuid,
395                    command_type_enum,
396                    #[cfg(feature = "rsm-state")]
397                    command_identity_hash: identity_hash,
398                    #[cfg(feature = "rsm-state")]
399                    rsm_state_digest: Some(post_rsm_state_digest.clone()),
400                };
401
402                // Push to batch channel (non-blocking unless channel is full)
403                let push_span = info_span!("journal_batch_push");
404
405                async {
406                    let msg = crate::journal::JournalMessage::Entry(entry);
407                    match sender.try_send(msg) {
408                        Ok(()) => {
409                            counter!("ht_journal_batch_pushed_total").increment(1);
410                            debug!("Pushed journal entry for request_id: {}", req_id);
411                        }
412                        Err(mpsc::error::TrySendError::Full(msg)) => {
413                            // Channel full - block until space available (backpressure)
414                            warn!("Journal batch channel full, applying backpressure");
415                            counter!("ht_journal_batch_backpressure_total").increment(1);
416                            gauge!("ht_journal_batch_channel_full").set(1.0);
417
418                            if let Err(e) = sender.send(msg).await {
419                                panic!(
420                                    "CRITICAL_FAILURE: Failed to send journal entry for request_id {}: {}. \
421                                     Journal channel closed - idempotency will be broken. Restart required.",
422                                    req_id, e
423                                );
424                            }
425                            gauge!("ht_journal_batch_channel_full").set(0.0);
426                        }
427                        Err(mpsc::error::TrySendError::Closed(_)) => {
428                            panic!(
429                                "CRITICAL_FAILURE: Journal batch channel closed for request_id {}. \
430                                 Journal system is dead - idempotency will be broken. Restart required.",
431                                req_id
432                            );
433                        }
434                    }
435                }
436                .instrument(push_span)
437                .await;
438
439                // Wait for the WAL-first journal batcher to durably fsync this entry.
440                // Only THEN emit events and ACK the client - this guarantees that
441                // anything the client sees is durable and will survive SIGKILL.
442                match ack_rx.await {
443                    Ok(()) => {}
444                    Err(_) => {
445                        panic!(
446                            "CRITICAL_FAILURE: Journal commit_ack dropped for request_id {}. \
447                             Durability boundary is unknown after state mutation. \
448                             Engine must stop to avoid running with non-durable state.",
449                            req_id
450                        );
451                    }
452                }
453
454                // NATS publish after WAL durability confirmed — standby only
455                // sees commands that survived fsync.
456                if let Some(ref publisher) = self.nats_publisher {
457                    publisher
458                        .publish(crate::nats::CommandType::Order, &command_data)
459                        .await;
460                }
461                self.publish_balance_updates_to_nats(&captured_balance_updates)
462                    .await;
463
464                // Keep portfolio state in sync before emitting events.
465                let pending_notifications = self
466                    .apply_portfolio_projection_events_sync(&captured_events, req_id)
467                    .await;
468
469                // NOW emit events and ACK - data is durable
470                for event in &captured_events {
471                    self.ctx.deps.emit_event(event);
472                }
473                drop(portfolio_projection_guard);
474
475                self.send_portfolio_notifications(pending_notifications)
476                    .await;
477
478                self.publish_snapshot();
479                let _ = original_response_tx.send(response).await;
480                counter!("ht_engine_journal_commits_total").increment(1);
481                histogram!("ht_engine_journaled_seconds").record(start.elapsed().as_secs_f64());
482            }
483
484            JournalStrategy::Sync(journal_writer) => {
485                // Synchronous write with spawn_blocking
486                let postgres_span = info_span!("journal_postgres_insert");
487                let req_uuid_for_journal = req_uuid;
488                let command_data_for_journal = command_data.clone();
489                let response_data_for_journal = response_data.clone();
490                let order_id_for_journal = order_id_i64;
491                let pre_digest_for_journal = pre_digest.clone();
492                let post_digest_for_journal = post_digest.clone();
493                let events_for_journal = captured_events.clone();
494                let fill_side_effects_for_journal = fill_side_effects.clone();
495                let balance_updates_for_journal = captured_balance_updates.clone();
496                let command_type_enum_for_journal = command_type_enum;
497                let append_result = async {
498                    let pg_start = Instant::now();
499                    let result = tokio::task::spawn_blocking(move || {
500                        journal_writer.append_transition_with_fill_side_effects(
501                            timestamp,
502                            &command_data_for_journal,
503                            response_data_for_journal.as_deref(),
504                            order_id_for_journal,
505                            &pre_digest_for_journal,
506                            &post_digest_for_journal,
507                            duration_ms,
508                            &events_for_journal,
509                            &fill_side_effects_for_journal,
510                            &balance_updates_for_journal,
511                            req_uuid_for_journal,
512                            command_type_enum_for_journal,
513                        )
514                    })
515                    .await;
516                    histogram!("ht_journal_postgres_insert_seconds")
517                        .record(pg_start.elapsed().as_secs_f64());
518                    result
519                }
520                .instrument(postgres_span)
521                .await;
522
523                match append_result {
524                    Ok(Ok(result)) => {
525                        let pending_notifications = if result.was_new_insert {
526                            debug!(
527                                "Journaled request_id: {} with command_id: {}",
528                                req_id, result.command_id
529                            );
530                            counter!("ht_engine_journal_commits_total").increment(1);
531
532                            if let Some(ref publisher) = self.nats_publisher {
533                                publisher
534                                    .publish(crate::nats::CommandType::Order, &command_data)
535                                    .await;
536                            }
537                            self.publish_balance_updates_to_nats(&captured_balance_updates)
538                                .await;
539
540                            Some(
541                                self.apply_portfolio_projection_events_sync(
542                                    &captured_events,
543                                    req_id,
544                                )
545                                .await,
546                            )
547                        } else {
548                            debug!(
549                                "Journal race: request_id: {} already committed by command_id: {}",
550                                req_id, result.command_id
551                            );
552                            counter!("ht_engine_journal_race_total").increment(1);
553                            None
554                        };
555
556                        // Emit events and send response - journal is durable
557                        for event in &captured_events {
558                            self.ctx.deps.emit_event(event);
559                        }
560                        drop(portfolio_projection_guard);
561
562                        if let Some(pending_notifications) = pending_notifications {
563                            self.send_portfolio_notifications(pending_notifications)
564                                .await;
565                        }
566
567                        self.publish_snapshot();
568                        let _ = original_response_tx.send(response).await;
569                    }
570                    Ok(Err(e)) => {
571                        // CRITICAL: State was mutated but journal append failed.
572                        // We MUST NOT continue running with mutated-but-unlogged state.
573                        error!(
574                            "FATAL: Journal append failed for request_id: {} after state mutation: {}",
575                            req_id, e
576                        );
577                        counter!("ht_engine_journal_error_total", "reason" => "append_failed")
578                            .increment(1);
579                        panic!(
580                            "JOURNAL_FATAL: Journal append failed after state mutation for request_id={}: {}. \
581                            Engine state is now inconsistent and unrecoverable. \
582                            Restart required to restore from last consistent snapshot.",
583                            req_id, e
584                        );
585                    }
586                    Err(e) => {
587                        // CRITICAL: spawn_blocking failed
588                        error!(
589                            "FATAL: spawn_blocking failed for journal write, request_id: {}: {}",
590                            req_id, e
591                        );
592                        counter!("ht_engine_journal_error_total", "reason" => "spawn_error")
593                            .increment(1);
594                        panic!(
595                            "JOURNAL_FATAL: spawn_blocking failed for journal write, request_id={}: {}. \
596                            Engine state is now inconsistent and unrecoverable. \
597                            Restart required to restore from last consistent snapshot.",
598                            req_id, e
599                        );
600                    }
601                }
602
603                histogram!("ht_engine_order_processing_journaled_seconds")
604                    .record(start.elapsed().as_secs_f64());
605            }
606        };
607    }
608
609    /// Serialize events for journal.
610    /// Produces wire-format bytes (version byte + msgpack) along with topic, key, and l2_sequence.
611    /// These bytes are stored directly in the DB, avoiding double deserialization.
612    fn serialize_events_for_journal(
613        &self,
614        events: &[EngineMessage],
615    ) -> Vec<crate::journal::engine_journal_batcher::EventPayload> {
616        use crate::journal::engine_journal_batcher::EventPayload;
617        events
618            .iter()
619            .map(|e| {
620                let event_data = e
621                    .serialize_inner()
622                    .expect("Failed to serialize EngineMessage - this is a bug");
623                let event_topic = e.topic().to_string();
624                let event_key = e.partition_key();
625                let l2_sequence = match e {
626                    EngineMessage::L2Update(l2) => l2.sequence,
627                    _ => None,
628                };
629                EventPayload {
630                    event_type_enum:
631                        hypercall_db_diesel::engine_enums::EventType::from_engine_message(e),
632                    event_topic,
633                    event_key,
634                    event_data,
635                    l2_sequence,
636                }
637            })
638            .collect()
639    }
640
641    /// Apply captured fill events to PortfolioCache synchronously.
642    ///
643    /// This guarantees that the next margin check sees post-fill portfolio state
644    /// without waiting for event-bus round trips.
645    pub(crate) async fn apply_portfolio_projection_events_sync(
646        &self,
647        events: &[EngineMessage],
648        req_id: &str,
649    ) -> Vec<crate::read_cache::portfolio::PendingNotifications> {
650        let Some(portfolio_cache) = &self.ctx.deps.portfolio_cache else {
651            return Vec::new();
652        };
653
654        let mut pending_notifications = Vec::new();
655
656        for event in events {
657            match event {
658                EngineMessage::OrderFilled { fill, .. } => {
659                    let seq = i64::try_from(fill.trade_id).unwrap_or_else(|_| {
660                        panic!(
661                            "CRITICAL_FAILURE: trade_id {} does not fit into i64 for synchronous portfolio update",
662                            fill.trade_id
663                        )
664                    });
665                    portfolio_cache
666                        .handle_order_filled_under_barrier(fill, seq)
667                        .await;
668                    pending_notifications.push(
669                        crate::read_cache::portfolio::PendingNotifications::Fill {
670                            fill: fill.clone(),
671                        },
672                    );
673                    debug!(
674                        "Applied synchronous portfolio fill update: request_id={}, trade_id={}, symbol={}",
675                        req_id, fill.trade_id, fill.symbol
676                    );
677                }
678                EngineMessage::PositionExpired(expiry_msg) => {
679                    pending_notifications.push(
680                        portfolio_cache
681                            .handle_position_expired_under_barrier(expiry_msg, None)
682                            .await,
683                    );
684                    debug!(
685                        "Applied synchronous portfolio expiry update: request_id={}, wallet={}, symbol={}",
686                        req_id, expiry_msg.wallet_address, expiry_msg.symbol
687                    );
688                }
689                _ => {}
690            }
691        }
692
693        pending_notifications
694    }
695
696    /// Send portfolio notifications after the projection barrier has been
697    /// released. This keeps WebSocket sends off the hot
698    /// mutation path.
699    async fn send_portfolio_notifications(
700        &self,
701        pending_notifications: Vec<crate::read_cache::portfolio::PendingNotifications>,
702    ) {
703        let Some(portfolio_cache) = &self.ctx.deps.portfolio_cache else {
704            return;
705        };
706
707        for pending in pending_notifications {
708            portfolio_cache.send_pending_notifications(pending).await;
709        }
710    }
711
712    pub(crate) async fn apply_replayed_events_sync(
713        &mut self,
714        events: &[EngineMessage],
715        req_id: &str,
716    ) {
717        if events.is_empty() {
718            return;
719        }
720
721        let portfolio_projection_guard =
722            if let Some(portfolio_cache) = &self.ctx.deps.portfolio_cache {
723                Some(portfolio_cache.lock_projection_barrier().await)
724            } else {
725                None
726            };
727        let pending_notifications = self
728            .apply_portfolio_projection_events_sync(events, req_id)
729            .await;
730        for event in events {
731            self.ctx.deps.emit_event(event);
732        }
733        drop(portfolio_projection_guard);
734        self.send_portfolio_notifications(pending_notifications)
735            .await;
736    }
737
738    pub(crate) async fn apply_startup_replayed_events_sync(
739        &mut self,
740        events: &[EngineMessage],
741        req_id: &str,
742    ) {
743        if events.is_empty() {
744            return;
745        }
746
747        let Some(portfolio_cache) = self.ctx.deps.portfolio_cache.clone() else {
748            for event in events {
749                self.ctx.deps.emit_event(event);
750            }
751            return;
752        };
753
754        let portfolio_projection_guard = portfolio_cache.lock_projection_barrier().await;
755        let mut pending_notifications = Vec::new();
756
757        for event in events {
758            if let EngineMessage::PositionExpired(expiry_msg) = event {
759                pending_notifications.push(
760                    portfolio_cache
761                        .handle_replayed_position_expired_projection_under_barrier(expiry_msg)
762                        .await,
763                );
764                debug!(
765                    "Applied startup replay expiry projection: request_id={}, wallet={}, symbol={}",
766                    req_id, expiry_msg.wallet_address, expiry_msg.symbol
767                );
768            }
769        }
770
771        for event in events {
772            self.ctx.deps.emit_event(event);
773        }
774        drop(portfolio_projection_guard);
775        self.send_portfolio_notifications(pending_notifications)
776            .await;
777    }
778
779    /// Process an RFQ execution with durable journaling.
780    ///
781    /// Mirrors `process_order_journaled` but for `RfqExecuteCommand`. The
782    /// caller (`handle_rfq_execute`) has already produced the planned
783    /// `Vec<EngineMessage>` (per-leg `OrderFilled`s + summary `RfqFilled`)
784    /// via the pure `plan_rfq_execution` function — this method is
785    /// responsible for:
786    ///
787    /// 1. Translating engine-emitted `FillAccounting` into journal fill side effects.
788    /// 2. Serializing the command + events as a journal entry tagged
789    ///    `CommandType::RfqExecute`.
790    /// 3. Pushing to the journal batcher (or sync writer) and waiting for
791    ///    the durable commit ACK.
792    /// 4. Applying the portfolio projection under barrier (uses
793    ///    `apply_portfolio_projection_events_sync`, the same function the
794    ///    orderbook path uses).
795    /// 5. Emitting the events to the WS bus AFTER the journal commit, so
796    ///    nothing visible to the client/UI happens before durability.
797    /// 6. Firing post-journal MMP updates (which are intentionally
798    ///    non-durable side effects).
799    /// 7. Sending the `RfqExecuteResult::Success { fill_id }` response.
800    ///
801    /// On any journal failure this panics rather than returning, matching
802    /// the orderbook journaling semantics — there is no safe way to
803    /// continue running with mutated-but-unlogged state.
804    pub(super) async fn process_rfq_journaled(
805        &mut self,
806        request: hypercall_runtime_api::RfqExecuteRequest,
807        fill_id: String,
808        captured_events: Vec<hypercall_types::EngineMessage>,
809        captured_balance_updates: Vec<hypercall_types::BalanceUpdate>,
810        mmp_updates: Vec<crate::rsm::apply::MmpFillUpdate>,
811    ) {
812        let start = Instant::now();
813        let cmd = &request.command;
814
815        // Idempotency cache lookup happens in `handle_rfq_execute` before
816        // we even reach planning, so this function only ever sees
817        // first-time-or-cache-stale commands. We still derive a stable
818        // request_uuid here so the journal entry is keyed correctly.
819        let req_id = if cmd.request_id.is_empty() {
820            warn!(
821                "RFQ command missing request_id (rfq_id={}, quote_id={}); \
822                 idempotency tracking disabled for this request",
823                cmd.rfq_id, cmd.quote_id
824            );
825            counter!("ht_engine_journal_fallback_total", "reason" => "missing_request_id")
826                .increment(1);
827            // Synthesize a deterministic UUID from (rfq_id, quote_id) so
828            // the journal still has *something* keyed to this command.
829            // We don't have the `uuid` crate's v5 feature enabled, so
830            // build the UUID by hashing the pair via sha3 and slicing
831            // the first 16 bytes — same effective property (deterministic
832            // for the same input, well-distributed) without pulling in
833            // another feature flag.
834            use sha3::{Digest, Sha3_256};
835            let mut hasher = Sha3_256::new();
836            hasher.update(cmd.rfq_id.as_bytes());
837            hasher.update(b":");
838            hasher.update(cmd.quote_id.as_bytes());
839            let digest = hasher.finalize();
840            let mut bytes = [0u8; 16];
841            bytes.copy_from_slice(&digest[..16]);
842            // Mark as v8 (custom) per RFC 4122 §5.8.
843            bytes[6] = (bytes[6] & 0x0F) | 0x80;
844            bytes[8] = (bytes[8] & 0x3F) | 0x80;
845            uuid::Uuid::from_bytes(bytes).to_string()
846        } else {
847            cmd.request_id.clone()
848        };
849
850        let req_uuid = uuid::Uuid::parse_str(&req_id).unwrap_or_else(|e| {
851            panic!(
852                "JOURNAL_FATAL: rfq command request_id '{}' is not a valid UUID: {}",
853                req_id, e
854            )
855        });
856
857        // Serialize command for journal. Wire format:
858        // [version_byte][msgpack payload]. We use the existing
859        // `serialize_to_wire_bytes` helper if it exists for RFQ, else
860        // build it inline.
861        let command_data = {
862            let mut buf = Vec::with_capacity(256);
863            // Version byte 1, matching the orderbook command format.
864            buf.push(1u8);
865            rmp_serde::encode::write_named(&mut buf, cmd).unwrap_or_else(|e| {
866                panic!(
867                    "JOURNAL_FATAL: failed to serialize RfqExecuteCommand for rfq_id={}: {}",
868                    cmd.rfq_id, e
869                )
870            });
871            buf
872        };
873
874        // RFQ doesn't have an OrderUpdateMessage equivalent, but we
875        // still want to cache the planned fill_id so duplicate dispatches
876        // get the SAME fill_id back instead of a fresh plan. Encode the
877        // fill_id as the response_data so `lookup_cached_rfq_response`
878        // can deserialize it.
879        let response_data: Option<Vec<u8>> =
880            Some(super::UnifiedEngine::encode_rfq_response_data(&fill_id));
881
882        let event_payloads = self.serialize_events_for_journal(&captured_events);
883        let portfolio_projection_guard =
884            if let Some(portfolio_cache) = &self.ctx.deps.portfolio_cache {
885                Some(portfolio_cache.lock_projection_barrier().await)
886            } else {
887                None
888            };
889        let fill_side_effects = fill_side_effects_from_events(&captured_events);
890
891        let timestamp = get_timestamp_millis();
892        use crate::observability::command_trace::EngineStateDigest;
893        let pre_digest: EngineStateDigest = Default::default();
894        let post_digest: EngineStateDigest = Default::default();
895        #[cfg(feature = "rsm-state")]
896        let post_rsm_state_digest = crate::rsm::engine_snapshot::EngineStateDigest::from_ctx(
897            &self.ctx,
898            self.ctx
899                .l2_update_seq
900                .load(std::sync::atomic::Ordering::SeqCst),
901        );
902        let duration_ms = start.elapsed().as_millis() as u64;
903        let req_uuid_db = hypercall_db_diesel::engine_enums::DbUuid(req_uuid);
904        let command_type_enum = Some(hypercall_db_diesel::engine_enums::CommandType::RfqExecute);
905
906        let journal_writer = match &self.journal_writer {
907            Some(w) => w.clone(),
908            None => {
909                error!(
910                    "Journal unavailable for RFQ: no writer configured. \
911                     Use .with_mock_journal() for tests."
912                );
913                panic!(
914                    "JOURNAL_FATAL: no journal_writer configured for RFQ rfq_id={}. \
915                     Refusing to process without durability.",
916                    cmd.rfq_id
917                );
918            }
919        };
920
921        let strategy = JournalStrategy::select(&self.journal_batch_sender, journal_writer);
922
923        match strategy {
924            JournalStrategy::Batched(sender) => {
925                let (ack_tx, ack_rx) = tokio::sync::oneshot::channel();
926
927                let entry = crate::journal::JournalEntry {
928                    received_ts_ms: timestamp,
929                    command_data: command_data.clone(),
930                    response_data: response_data.clone(),
931                    order_id: None,
932                    pre_digest: pre_digest.clone(),
933                    post_digest: post_digest.clone(),
934                    duration_ms,
935                    events: event_payloads,
936                    outbox_appends: Vec::new(),
937                    fill_side_effects: fill_side_effects.clone(),
938                    cash_withdrawal_side_effect: None,
939                    balance_updates: captured_balance_updates.clone(),
940                    created_at: Instant::now(),
941                    commit_ack: Some(ack_tx),
942                    request_uuid: req_uuid_db,
943                    command_type_enum,
944                    #[cfg(feature = "rsm-state")]
945                    command_identity_hash: crate::rsm::apply::EngineCommand::RfqExecute(
946                        cmd.clone(),
947                    )
948                    .identity_hash(),
949                    #[cfg(feature = "rsm-state")]
950                    rsm_state_digest: Some(post_rsm_state_digest.clone()),
951                };
952
953                let push_span = info_span!("journal_batch_push_rfq");
954                async {
955                    let msg = crate::journal::JournalMessage::Entry(entry);
956                    match sender.try_send(msg) {
957                        Ok(()) => {
958                            counter!("ht_journal_batch_pushed_total", "command" => "RfqExecute")
959                                .increment(1);
960                        }
961                        Err(mpsc::error::TrySendError::Full(msg)) => {
962                            warn!("Journal batch channel full (RFQ), applying backpressure");
963                            counter!(
964                                "ht_journal_batch_backpressure_total",
965                                "command" => "RfqExecute"
966                            )
967                            .increment(1);
968                            if let Err(e) = sender.send(msg).await {
969                                panic!(
970                                    "CRITICAL_FAILURE: Failed to send RFQ journal entry for rfq_id {}: {}. \
971                                     Journal channel closed - idempotency will be broken. Restart required.",
972                                    cmd.rfq_id, e
973                                );
974                            }
975                        }
976                        Err(mpsc::error::TrySendError::Closed(_)) => {
977                            panic!(
978                                "CRITICAL_FAILURE: Journal batch channel closed for RFQ rfq_id {}. \
979                                 Journal system is dead - idempotency will be broken. Restart required.",
980                                cmd.rfq_id
981                            );
982                        }
983                    }
984                }
985                .instrument(push_span)
986                .await;
987
988                // Wait for the WAL-first journal batcher to durably fsync
989                // this entry. ONLY THEN apply the projection / emit events
990                // / ACK — this is the durability boundary.
991                let wal_wait_start = Instant::now();
992                match ack_rx.await {
993                    Ok(()) => {}
994                    Err(_) => {
995                        panic!(
996                            "CRITICAL_FAILURE: RFQ journal commit_ack dropped for rfq_id {}. \
997                             Durability boundary unknown after planning; engine must stop.",
998                            cmd.rfq_id
999                        );
1000                    }
1001                }
1002                histogram!("ht_rfq_wal_wait_seconds")
1003                    .record(wal_wait_start.elapsed().as_secs_f64());
1004
1005                let nats_start = Instant::now();
1006                if let Some(ref publisher) = self.nats_publisher {
1007                    publisher
1008                        .publish(crate::nats::CommandType::RfqExecute, &command_data)
1009                        .await;
1010                }
1011                self.publish_balance_updates_to_nats(&captured_balance_updates)
1012                    .await;
1013                histogram!("ht_rfq_nats_publish_seconds")
1014                    .record(nats_start.elapsed().as_secs_f64());
1015
1016                let projection_start = Instant::now();
1017                let pending_notifications = self
1018                    .apply_portfolio_projection_events_sync(&captured_events, &req_id)
1019                    .await;
1020                histogram!("ht_rfq_projection_seconds")
1021                    .record(projection_start.elapsed().as_secs_f64());
1022
1023                for event in &captured_events {
1024                    self.ctx.deps.emit_event(event);
1025                }
1026                drop(portfolio_projection_guard);
1027
1028                let notify_start = Instant::now();
1029                self.send_portfolio_notifications(pending_notifications)
1030                    .await;
1031                histogram!("ht_rfq_notify_seconds").record(notify_start.elapsed().as_secs_f64());
1032
1033                counter!("ht_engine_journal_commits_total", "command" => "RfqExecute").increment(1);
1034                histogram!("ht_engine_journaled_seconds", "command" => "RfqExecute")
1035                    .record(start.elapsed().as_secs_f64());
1036            }
1037
1038            JournalStrategy::Sync(journal_writer) => {
1039                let postgres_span = info_span!("journal_postgres_insert_rfq");
1040                let req_uuid_for_journal = req_uuid_db;
1041                let command_data_for_journal = command_data.clone();
1042                let response_data_for_journal = response_data.clone();
1043                let pre_digest_for_journal = pre_digest.clone();
1044                let post_digest_for_journal = post_digest.clone();
1045                let events_for_journal = captured_events.clone();
1046                let fill_side_effects_for_journal = fill_side_effects.clone();
1047                let balance_updates_for_journal = captured_balance_updates.clone();
1048                let command_type_enum_for_journal = command_type_enum;
1049
1050                let append_result = async {
1051                    tokio::task::spawn_blocking(move || {
1052                        journal_writer.append_transition_with_fill_side_effects(
1053                            timestamp,
1054                            &command_data_for_journal,
1055                            response_data_for_journal.as_deref(),
1056                            None,
1057                            &pre_digest_for_journal,
1058                            &post_digest_for_journal,
1059                            duration_ms,
1060                            &events_for_journal,
1061                            &fill_side_effects_for_journal,
1062                            &balance_updates_for_journal,
1063                            req_uuid_for_journal,
1064                            command_type_enum_for_journal,
1065                        )
1066                    })
1067                    .await
1068                }
1069                .instrument(postgres_span)
1070                .await;
1071
1072                match append_result {
1073                    Ok(Ok(result)) => {
1074                        let pending_notifications = if result.was_new_insert {
1075                            counter!("ht_engine_journal_commits_total", "command" => "RfqExecute")
1076                                .increment(1);
1077
1078                            if let Some(ref publisher) = self.nats_publisher {
1079                                publisher
1080                                    .publish(crate::nats::CommandType::RfqExecute, &command_data)
1081                                    .await;
1082                            }
1083                            self.publish_balance_updates_to_nats(&captured_balance_updates)
1084                                .await;
1085
1086                            Some(
1087                                self.apply_portfolio_projection_events_sync(
1088                                    &captured_events,
1089                                    &req_id,
1090                                )
1091                                .await,
1092                            )
1093                        } else {
1094                            counter!("ht_engine_journal_race_total", "command" => "RfqExecute")
1095                                .increment(1);
1096                            None
1097                        };
1098
1099                        for event in &captured_events {
1100                            self.ctx.deps.emit_event(event);
1101                        }
1102                        drop(portfolio_projection_guard);
1103
1104                        if let Some(pending_notifications) = pending_notifications {
1105                            self.send_portfolio_notifications(pending_notifications)
1106                                .await;
1107                        }
1108                    }
1109                    Ok(Err(e)) => {
1110                        error!(
1111                            "FATAL: RFQ journal append failed for rfq_id={}: {}",
1112                            cmd.rfq_id, e
1113                        );
1114                        counter!(
1115                            "ht_engine_journal_error_total",
1116                            "reason" => "append_failed",
1117                            "command" => "RfqExecute"
1118                        )
1119                        .increment(1);
1120                        panic!(
1121                            "JOURNAL_FATAL: RFQ journal append failed for rfq_id={}: {}",
1122                            cmd.rfq_id, e
1123                        );
1124                    }
1125                    Err(e) => {
1126                        error!("FATAL: spawn_blocking failed for RFQ journal write: {}", e);
1127                        panic!(
1128                            "JOURNAL_FATAL: spawn_blocking failed for RFQ journal write: {}",
1129                            e
1130                        );
1131                    }
1132                }
1133            }
1134        }
1135
1136        // Add to known set so future duplicates (within this session) hit
1137        // the fast path.
1138        if !cmd.request_id.is_empty() {
1139            self.known_request_ids.insert(req_uuid);
1140        }
1141
1142        // Post-journal side effects: MMP fill updates. These are
1143        // intentionally non-durable — losing them on a crash is
1144        // acceptable, the next quote refresh from the QP will repopulate
1145        // MMP state. Running them after journal commit ensures the
1146        // visible-to-client durability boundary is unaffected.
1147        if let Some(ref mmp_cache) = self.ctx.deps.mmp_cache {
1148            for update in &mmp_updates {
1149                let _ = mmp_cache
1150                    .process_fill(
1151                        &update.qp_wallet,
1152                        &update.underlying,
1153                        &update.fill,
1154                        update.timestamp_ms,
1155                    )
1156                    .await;
1157            }
1158        }
1159
1160        self.publish_snapshot();
1161
1162        // ACK the caller with the planned fill_id.
1163        let _ = request
1164            .response_tx
1165            .send(hypercall_runtime_api::RfqExecuteResult::Success { fill_id });
1166    }
1167}
1168
1169fn fill_side_effects_from_events(
1170    events: &[EngineMessage],
1171) -> Vec<crate::journal::JournalFillSideEffect> {
1172    events
1173        .iter()
1174        .enumerate()
1175        .filter_map(|(event_idx, event)| match event {
1176            EngineMessage::OrderFilled { fill, accounting } => {
1177                assert_eq!(
1178                    accounting.trade_id, fill.trade_id,
1179                    "CRITICAL: fill accounting trade_id mismatch for fill {}",
1180                    fill.trade_id
1181                );
1182                let mut side_effect = crate::journal::JournalFillSideEffect::from_fill_accounting(
1183                    event_idx as i32,
1184                    accounting,
1185                );
1186                side_effect.underlying_notional = fill.underlying_notional;
1187                Some(side_effect)
1188            }
1189            _ => None,
1190        })
1191        .collect()
1192}