1use super::*;
4
5enum JournalStrategy {
7 Batched(crate::journal::JournalBatchSender),
9 Sync(crate::journal::SharedEngineJournalWriter),
11}
12
13impl JournalStrategy {
14 fn select(
17 batch_sender: &Option<crate::journal::JournalBatchSender>,
18 sync_writer: crate::journal::SharedEngineJournalWriter,
19 ) -> Self {
20 if let Some(sender) = batch_sender.clone() {
21 JournalStrategy::Batched(sender)
22 } else {
23 JournalStrategy::Sync(sync_writer)
24 }
25 }
26}
27
28impl UnifiedEngine {
    /// Processes one order request with journal-backed durability and idempotency.
    ///
    /// Steps, in order:
    /// 1. A `request_id` is required. If it is missing, test builds (`cfg!(test)`) fall back
    ///    to non-journaled `process_order`; every other build panics.
    /// 2. The `request_id` must parse as a UUID, otherwise this panics.
    /// 3. Queue-wait time is recorded and the pending-request gauge is decremented.
    /// 4. With no `journal_writer` but a `journal_batch_sender`, the order is processed in
    ///    batch-only mode (no journal idempotency lookup). With neither, this panics.
    /// 5. If the UUID is not in `known_request_ids`, the order is processed without a
    ///    journal lookup and the UUID is then recorded.
    /// 6. Otherwise the journal is queried on a blocking task: an existing record replays its
    ///    stored response, a missing record lets the order be processed, and query or
    ///    `spawn_blocking` failures panic.
    ///
    /// # Panics
    /// Whenever idempotency or durability cannot be guaranteed (see the steps above).
    pub(super) async fn process_order_journaled(&mut self, request: UnifiedEngineRequest) {
        let start = Instant::now();

        let request_id = request.message.request_id.clone();

        let Some(ref req_id) = request_id else {
            counter!("ht_engine_journal_fallback_total", "reason" => "missing_request_id")
                .increment(1);
            // Only test builds may bypass the journal; other builds refuse to process an
            // order that has no idempotency key.
            if cfg!(test) {
                warn!("Order missing request_id, falling back to non-journaled processing");
                self.process_order(request).await;
                return;
            }
            panic!("JOURNAL_FATAL: request_id missing; refusing to process without idempotency.");
        };
        // The parsed UUID is the key used for `known_request_ids` and for journal lookups.
        let req_uuid = uuid::Uuid::parse_str(req_id).unwrap_or_else(|e| {
            panic!(
                "JOURNAL_FATAL: request_id '{}' is not a valid UUID: {}. \
                 Refusing to process without a canonical idempotency key.",
                req_id, e
            )
        });

        let queue_wait = request.enqueued_at.elapsed().as_secs_f64();
        histogram!("ht_engine_queue_wait_seconds").record(queue_wait);
        decrement_pending_requests();
        gauge!("ht_engine_pending_requests").set(get_pending_requests() as f64);

        let journal_writer = match &self.journal_writer {
            Some(w) => w.clone(),
            None => {
                if self.journal_batch_sender.is_none() {
                    error!(
                        "Journal unavailable: no writer or batch_sender configured. \
                         This is a configuration error - use .with_mock_journal() for tests."
                    );
                    counter!("ht_engine_journal_error_total", "reason" => "missing_writer")
                        .increment(1);
                    panic!(
                        "JOURNAL_FATAL: no writer or batch_sender configured for request_id={}. \
                         Refusing to process without durability.",
                        req_id
                    );
                }
                // Batch-only mode: without a writer there is nothing to query, so the journal
                // idempotency lookup is skipped for this request (this warns on every call).
                warn!("No journal_writer for idempotency check, proceeding with batch-only mode");
                let sender = self
                    .journal_batch_sender
                    .as_ref()
                    .expect("batch sender missing in batch-only journal mode")
                    .clone();
                self.process_order_journaled_impl(
                    request,
                    req_id,
                    req_uuid,
                    start,
                    JournalStrategy::Batched(sender),
                )
                .await;
                self.known_request_ids.insert(req_uuid);
                return;
            }
        };

        // Cache miss: this UUID has not been seen, so the journal lookup is skipped.
        // NOTE(review): this relies on `known_request_ids` containing every request_id that
        // could already be journaled (e.g. seeded at startup) — confirm where it is populated.
        if !self.known_request_ids.contains(&req_uuid) {
            counter!("ht_engine_journal_cache_miss_total").increment(1);

            let strategy = JournalStrategy::select(&self.journal_batch_sender, journal_writer);
            self.process_order_journaled_impl(request, req_id, req_uuid, start, strategy)
                .await;

            self.known_request_ids.insert(req_uuid);
            return;
        }

        // Cache hit: the request_id was seen before, so ask the journal for a stored result.
        counter!("ht_engine_journal_cache_hit_total").increment(1);

        let idempotency_span = info_span!("journal_idempotency_check");
        let req_id_clone = req_uuid;
        let journal_clone = journal_writer.clone();
        // The journal query runs via `spawn_blocking` so it does not block the async executor.
        let idempotency_result = async {
            tokio::task::spawn_blocking(move || journal_clone.get_by_request_id(&req_id_clone))
                .await
        }
        .instrument(idempotency_span)
        .await;

        match idempotency_result {
            Ok(Ok(Some(existing))) => {
                debug!("Journal idempotency hit for request_id: {}", req_id);
                counter!("ht_engine_journal_idempotency_hit_total").increment(1);

                // Already journaled: reply with the stored response instead of re-applying.
                if let Some(cached_response) = existing.decode_response() {
                    let _ = request.response_tx.send(cached_response).await;
                    return;
                }
                panic!(
                    "JOURNAL_FATAL: no response_data in cached journal record \
                     for request_id={}",
                    req_id
                );
            }
            Ok(Ok(None)) => {
                // Known but not journaled. One way this happens: engine rejections in
                // `process_order_journaled_impl` record the UUID without writing a journal entry.
                debug!(
                    "Journal miss for request_id: {} (was in cache) - processing",
                    req_id
                );
            }
            Ok(Err(e)) => {
                error!("Journal query failed for request_id: {}: {}", req_id, e);
                counter!("ht_engine_journal_error_total", "reason" => "query_error").increment(1);
                panic!(
                    "JOURNAL_FATAL: journal query failed for request_id={}: {}. \
                     Cannot guarantee idempotency, restart required.",
                    req_id, e
                );
            }
            Err(e) => {
                error!("spawn_blocking failed for idempotency check: {}", e);
                counter!("ht_engine_journal_error_total", "reason" => "spawn_error").increment(1);
                panic!(
                    "JOURNAL_FATAL: spawn_blocking failed for idempotency check: {}. \
                     Runtime is degraded, restart required.",
                    e
                );
            }
        }

        // The UUID is already in `known_request_ids` on this path, so no insert is needed.
        let strategy = JournalStrategy::select(&self.journal_batch_sender, journal_writer);
        self.process_order_journaled_impl(request, req_id, req_uuid, start, strategy)
            .await;
    }
197
    /// Applies one order command to the engine, journals the transition, then performs the
    /// post-commit side effects and replies to the caller.
    ///
    /// Steps, in order:
    /// 1. Runs `self.apply` on an `OrderAction` envelope. `EngineError::Rejected` sends a
    ///    `Rejected` update to the caller, records the UUID in `known_request_ids` and returns
    ///    without journaling; any other error panics.
    /// 2. Builds the journal payload: the command (with the assigned `order_id` filled in for
    ///    non-replace actions), the response to store, placeholder digests, events, fill side
    ///    effects and balance updates. For create/replace the stored response is the last
    ///    captured `OrderUpdate` for the order, falling back to the direct response.
    /// 3. Takes the portfolio projection barrier (when a portfolio cache exists) and journals
    ///    via `strategy`:
    ///    - `Batched`: pushes the entry (falling back to an awaited send when the channel is
    ///      full) and waits for the commit ack.
    ///    - `Sync`: appends on a blocking task; when the insert was not new
    ///      (`was_new_insert == false`) NATS publishing and portfolio projection are skipped.
    /// 4. Publishes to NATS, applies portfolio projection, emits events, releases the barrier,
    ///    sends portfolio notifications, publishes a snapshot and replies with the engine's
    ///    direct `response` (which may differ from the journaled `journal_response`).
    ///
    /// # Panics
    /// On any journal failure after the engine state has been mutated, and on broken engine
    /// response invariants.
    async fn process_order_journaled_impl(
        &mut self,
        request: UnifiedEngineRequest,
        req_id: &str,
        req_uuid: uuid::Uuid,
        start: Instant,
        strategy: JournalStrategy,
    ) {
        let engine_span = info_span!("engine_step");
        let timestamp = request.message.timestamp;

        use crate::observability::command_trace::EngineStateDigest;
        // Digests are not computed here; default values are journaled.
        let pre_digest: EngineStateDigest = Default::default();

        let original_response_tx = request.response_tx;

        let env = crate::rsm::apply::CommandEnvelope::new(
            timestamp,
            crate::rsm::apply::EngineCommand::OrderAction(request.message.clone()),
        );
        let apply_output = match engine_span.in_scope(|| self.apply(env)) {
            Ok(output) => output,
            Err(crate::rsm::unified_engine::apply_interface::EngineError::Rejected(reason)) => {
                warn!(
                    request_id = %req_id,
                    reason = %reason,
                    "Order rejected by engine (nonce or admission)"
                );
                // NOTE(review): the metric label is always "nonce", even for admission rejects.
                counter!("ht_engine_order_rejected_total", "reason" => "nonce").increment(1);
                let rejection = OrderUpdateMessage {
                    timestamp,
                    info: request.message.info.clone(),
                    status: hypercall_types::OrderUpdateStatus::Rejected,
                    reason: Some(reason),
                    filled_size: rust_decimal::Decimal::ZERO,
                    order_id: None,
                    wallet_address: request.message.wallet,
                    mmp_triggered: false,
                    request_id: request.message.request_id.clone(),
                };
                // Rejections are answered directly and are not journaled; the UUID is still
                // recorded as known.
                let _ = original_response_tx.send(rejection).await;
                self.known_request_ids.insert(req_uuid);
                return;
            }
            Err(e) => {
                panic!(
                    "JOURNAL_FATAL: apply() failed for request_id={}: {}",
                    req_id, e
                )
            }
        };
        let engine_duration = start.elapsed();
        histogram!("ht_engine_step_seconds").record(engine_duration.as_secs_f64());

        #[cfg(feature = "rsm-state")]
        let identity_hash = apply_output.command_identity_hash;
        let captured_balance_updates = apply_output.balance_updates;
        let captured_events = apply_output.events;
        // The last order response emitted by apply() is the one returned to the caller.
        let response = apply_output
            .order_responses
            .into_iter()
            .last()
            .unwrap_or_else(|| {
                error!(
                    "No response received from apply() for request_id: {}",
                    req_id
                );
                counter!("ht_engine_journal_error_total", "reason" => "no_response").increment(1);
                panic!(
                    "JOURNAL_FATAL: apply() emitted no response for request_id={}. \
                     This violates engine response invariants.",
                    req_id
                );
            });

        // For create/replace, journal the most recent OrderUpdate event for the same order_id
        // (presumably carrying a later status than the immediate response); for other actions
        // journal the response itself.
        let journal_response = if request.message.action == OrderAction::CreateOrder
            || request.message.action == OrderAction::ReplaceOrder
        {
            captured_events
                .iter()
                .rev()
                .find_map(|e| {
                    if let EngineMessage::OrderUpdate(update) = e {
                        if update.order_id == response.order_id {
                            Some(update.clone())
                        } else {
                            None
                        }
                    } else {
                        None
                    }
                })
                .unwrap_or_else(|| {
                    // An Acked response must have a matching OrderUpdate; refuse to journal the
                    // bare ack.
                    if response.status == OrderUpdateStatus::Acked {
                        panic!(
                            "JOURNAL_FATAL: no OrderUpdate captured for order_id={:?}, \
                             request_id={}: no OrderUpdate found in {} captured events. \
                             finalize_order did not emit a terminal update; refusing to journal Acked.",
                            response.order_id,
                            req_id,
                            captured_events.len()
                        );
                    }
                    response.clone()
                })
        } else {
            response.clone()
        };

        let post_digest: EngineStateDigest = Default::default();
        #[cfg(feature = "rsm-state")]
        let post_rsm_state_digest = crate::rsm::engine_snapshot::EngineStateDigest::from_ctx(
            &self.ctx,
            self.ctx
                .l2_update_seq
                .load(std::sync::atomic::Ordering::SeqCst),
        );
        let duration_ms = start.elapsed().as_millis() as u64;

        // Record the engine-assigned order_id in the journaled command. ReplaceOrder is left
        // unchanged. NOTE(review): presumably because a replace already names its target
        // order_id — confirm.
        let mut command_for_journal = request.message.clone();
        if request.message.action != OrderAction::ReplaceOrder {
            if let Some(order_id) = journal_response.order_id {
                command_for_journal.info.order_id = Some(order_id);
            }
        }
        let command_data = hypercall_types::serialize_to_wire_bytes(&command_for_journal);

        let response_data = Some(hypercall_types::serialize_to_wire_bytes(&journal_response));
        let order_id_i64 = journal_response.order_id.map(|id| {
            i64::try_from(id).unwrap_or_else(|_| {
                panic!(
                    "JOURNAL_FATAL: order_id {} exceeds i64 range for request_id={}",
                    id, req_id
                )
            })
        });
        let command_type_enum = Some(match request.message.action {
            OrderAction::CreateOrder => hypercall_db_diesel::engine_enums::CommandType::CreateOrder,
            OrderAction::CancelOrder => hypercall_db_diesel::engine_enums::CommandType::CancelOrder,
            OrderAction::ReplaceOrder => {
                hypercall_db_diesel::engine_enums::CommandType::ReplaceOrder
            }
        });

        // Serialized payloads are only consumed by the batched path.
        let event_payloads = self.serialize_events_for_journal(&captured_events);
        // The projection barrier is held through journaling, projection and event emission,
        // and released before portfolio notifications are sent.
        let portfolio_projection_guard =
            if let Some(portfolio_cache) = &self.ctx.deps.portfolio_cache {
                Some(portfolio_cache.lock_projection_barrier().await)
            } else {
                None
            };
        let fill_side_effects = fill_side_effects_from_events(&captured_events);

        let req_uuid = hypercall_db_diesel::engine_enums::DbUuid(req_uuid);

        match strategy {
            JournalStrategy::Batched(sender) => {
                // The batcher signals the commit through this oneshot.
                let (ack_tx, ack_rx) = tokio::sync::oneshot::channel();

                let entry = crate::journal::JournalEntry {
                    received_ts_ms: timestamp,
                    command_data: command_data.clone(),
                    response_data: response_data.clone(),
                    order_id: order_id_i64,
                    pre_digest: pre_digest.clone(),
                    post_digest: post_digest.clone(),
                    duration_ms,
                    events: event_payloads,
                    outbox_appends: Vec::new(),
                    fill_side_effects: fill_side_effects.clone(),
                    cash_withdrawal_side_effect: None,
                    balance_updates: captured_balance_updates.clone(),
                    created_at: Instant::now(),
                    commit_ack: Some(ack_tx),
                    request_uuid: req_uuid,
                    command_type_enum,
                    #[cfg(feature = "rsm-state")]
                    command_identity_hash: identity_hash,
                    #[cfg(feature = "rsm-state")]
                    rsm_state_digest: Some(post_rsm_state_digest.clone()),
                };

                let push_span = info_span!("journal_batch_push");

                async {
                    let msg = crate::journal::JournalMessage::Entry(entry);
                    // Non-blocking push first; a full channel falls back to an awaited send
                    // (backpressure), a closed channel is fatal.
                    match sender.try_send(msg) {
                        Ok(()) => {
                            counter!("ht_journal_batch_pushed_total").increment(1);
                            debug!("Pushed journal entry for request_id: {}", req_id);
                        }
                        Err(mpsc::error::TrySendError::Full(msg)) => {
                            warn!("Journal batch channel full, applying backpressure");
                            counter!("ht_journal_batch_backpressure_total").increment(1);
                            gauge!("ht_journal_batch_channel_full").set(1.0);

                            if let Err(e) = sender.send(msg).await {
                                panic!(
                                    "CRITICAL_FAILURE: Failed to send journal entry for request_id {}: {}. \
                                     Journal channel closed - idempotency will be broken. Restart required.",
                                    req_id, e
                                );
                            }
                            gauge!("ht_journal_batch_channel_full").set(0.0);
                        }
                        Err(mpsc::error::TrySendError::Closed(_)) => {
                            panic!(
                                "CRITICAL_FAILURE: Journal batch channel closed for request_id {}. \
                                 Journal system is dead - idempotency will be broken. Restart required.",
                                req_id
                            );
                        }
                    }
                }
                .instrument(push_span)
                .await;

                // Everything below runs only after the batcher acknowledges the commit.
                match ack_rx.await {
                    Ok(()) => {}
                    Err(_) => {
                        panic!(
                            "CRITICAL_FAILURE: Journal commit_ack dropped for request_id {}. \
                             Durability boundary is unknown after state mutation. \
                             Engine must stop to avoid running with non-durable state.",
                            req_id
                        );
                    }
                }

                if let Some(ref publisher) = self.nats_publisher {
                    publisher
                        .publish(crate::nats::CommandType::Order, &command_data)
                        .await;
                }
                self.publish_balance_updates_to_nats(&captured_balance_updates)
                    .await;

                let pending_notifications = self
                    .apply_portfolio_projection_events_sync(&captured_events, req_id)
                    .await;

                for event in &captured_events {
                    self.ctx.deps.emit_event(event);
                }
                drop(portfolio_projection_guard);

                self.send_portfolio_notifications(pending_notifications)
                    .await;

                self.publish_snapshot();
                let _ = original_response_tx.send(response).await;
                counter!("ht_engine_journal_commits_total").increment(1);
                histogram!("ht_engine_journaled_seconds").record(start.elapsed().as_secs_f64());
            }

            JournalStrategy::Sync(journal_writer) => {
                let postgres_span = info_span!("journal_postgres_insert");
                // Owned copies are moved into the blocking task; several originals
                // (command_data, captured_events, captured_balance_updates) are used afterwards.
                let req_uuid_for_journal = req_uuid;
                let command_data_for_journal = command_data.clone();
                let response_data_for_journal = response_data.clone();
                let order_id_for_journal = order_id_i64;
                let pre_digest_for_journal = pre_digest.clone();
                let post_digest_for_journal = post_digest.clone();
                let events_for_journal = captured_events.clone();
                let fill_side_effects_for_journal = fill_side_effects.clone();
                let balance_updates_for_journal = captured_balance_updates.clone();
                let command_type_enum_for_journal = command_type_enum;
                let append_result = async {
                    let pg_start = Instant::now();
                    let result = tokio::task::spawn_blocking(move || {
                        journal_writer.append_transition_with_fill_side_effects(
                            timestamp,
                            &command_data_for_journal,
                            response_data_for_journal.as_deref(),
                            order_id_for_journal,
                            &pre_digest_for_journal,
                            &post_digest_for_journal,
                            duration_ms,
                            &events_for_journal,
                            &fill_side_effects_for_journal,
                            &balance_updates_for_journal,
                            req_uuid_for_journal,
                            command_type_enum_for_journal,
                        )
                    })
                    .await;
                    histogram!("ht_journal_postgres_insert_seconds")
                        .record(pg_start.elapsed().as_secs_f64());
                    result
                }
                .instrument(postgres_span)
                .await;

                match append_result {
                    Ok(Ok(result)) => {
                        // `was_new_insert == false` means the request_id was already committed
                        // (a race): NATS publishing and projection are skipped, but events are
                        // still emitted, the snapshot published and the caller answered.
                        let pending_notifications = if result.was_new_insert {
                            debug!(
                                "Journaled request_id: {} with command_id: {}",
                                req_id, result.command_id
                            );
                            counter!("ht_engine_journal_commits_total").increment(1);

                            if let Some(ref publisher) = self.nats_publisher {
                                publisher
                                    .publish(crate::nats::CommandType::Order, &command_data)
                                    .await;
                            }
                            self.publish_balance_updates_to_nats(&captured_balance_updates)
                                .await;

                            Some(
                                self.apply_portfolio_projection_events_sync(
                                    &captured_events,
                                    req_id,
                                )
                                .await,
                            )
                        } else {
                            debug!(
                                "Journal race: request_id: {} already committed by command_id: {}",
                                req_id, result.command_id
                            );
                            counter!("ht_engine_journal_race_total").increment(1);
                            None
                        };

                        for event in &captured_events {
                            self.ctx.deps.emit_event(event);
                        }
                        drop(portfolio_projection_guard);

                        if let Some(pending_notifications) = pending_notifications {
                            self.send_portfolio_notifications(pending_notifications)
                                .await;
                        }

                        self.publish_snapshot();
                        let _ = original_response_tx.send(response).await;
                    }
                    Ok(Err(e)) => {
                        error!(
                            "FATAL: Journal append failed for request_id: {} after state mutation: {}",
                            req_id, e
                        );
                        counter!("ht_engine_journal_error_total", "reason" => "append_failed")
                            .increment(1);
                        panic!(
                            "JOURNAL_FATAL: Journal append failed after state mutation for request_id={}: {}. \
                             Engine state is now inconsistent and unrecoverable. \
                             Restart required to restore from last consistent snapshot.",
                            req_id, e
                        );
                    }
                    Err(e) => {
                        error!(
                            "FATAL: spawn_blocking failed for journal write, request_id: {}: {}",
                            req_id, e
                        );
                        counter!("ht_engine_journal_error_total", "reason" => "spawn_error")
                            .increment(1);
                        panic!(
                            "JOURNAL_FATAL: spawn_blocking failed for journal write, request_id={}: {}. \
                             Engine state is now inconsistent and unrecoverable. \
                             Restart required to restore from last consistent snapshot.",
                            req_id, e
                        );
                    }
                }

                histogram!("ht_engine_order_processing_journaled_seconds")
                    .record(start.elapsed().as_secs_f64());
            }
        };
    }
608
609 fn serialize_events_for_journal(
613 &self,
614 events: &[EngineMessage],
615 ) -> Vec<crate::journal::engine_journal_batcher::EventPayload> {
616 use crate::journal::engine_journal_batcher::EventPayload;
617 events
618 .iter()
619 .map(|e| {
620 let event_data = e
621 .serialize_inner()
622 .expect("Failed to serialize EngineMessage - this is a bug");
623 let event_topic = e.topic().to_string();
624 let event_key = e.partition_key();
625 let l2_sequence = match e {
626 EngineMessage::L2Update(l2) => l2.sequence,
627 _ => None,
628 };
629 EventPayload {
630 event_type_enum:
631 hypercall_db_diesel::engine_enums::EventType::from_engine_message(e),
632 event_topic,
633 event_key,
634 event_data,
635 l2_sequence,
636 }
637 })
638 .collect()
639 }
640
641 pub(crate) async fn apply_portfolio_projection_events_sync(
646 &self,
647 events: &[EngineMessage],
648 req_id: &str,
649 ) -> Vec<crate::read_cache::portfolio::PendingNotifications> {
650 let Some(portfolio_cache) = &self.ctx.deps.portfolio_cache else {
651 return Vec::new();
652 };
653
654 let mut pending_notifications = Vec::new();
655
656 for event in events {
657 match event {
658 EngineMessage::OrderFilled { fill, .. } => {
659 let seq = i64::try_from(fill.trade_id).unwrap_or_else(|_| {
660 panic!(
661 "CRITICAL_FAILURE: trade_id {} does not fit into i64 for synchronous portfolio update",
662 fill.trade_id
663 )
664 });
665 portfolio_cache
666 .handle_order_filled_under_barrier(fill, seq)
667 .await;
668 pending_notifications.push(
669 crate::read_cache::portfolio::PendingNotifications::Fill {
670 fill: fill.clone(),
671 },
672 );
673 debug!(
674 "Applied synchronous portfolio fill update: request_id={}, trade_id={}, symbol={}",
675 req_id, fill.trade_id, fill.symbol
676 );
677 }
678 EngineMessage::PositionExpired(expiry_msg) => {
679 pending_notifications.push(
680 portfolio_cache
681 .handle_position_expired_under_barrier(expiry_msg, None)
682 .await,
683 );
684 debug!(
685 "Applied synchronous portfolio expiry update: request_id={}, wallet={}, symbol={}",
686 req_id, expiry_msg.wallet_address, expiry_msg.symbol
687 );
688 }
689 _ => {}
690 }
691 }
692
693 pending_notifications
694 }
695
696 async fn send_portfolio_notifications(
700 &self,
701 pending_notifications: Vec<crate::read_cache::portfolio::PendingNotifications>,
702 ) {
703 let Some(portfolio_cache) = &self.ctx.deps.portfolio_cache else {
704 return;
705 };
706
707 for pending in pending_notifications {
708 portfolio_cache.send_pending_notifications(pending).await;
709 }
710 }
711
712 pub(crate) async fn apply_replayed_events_sync(
713 &mut self,
714 events: &[EngineMessage],
715 req_id: &str,
716 ) {
717 if events.is_empty() {
718 return;
719 }
720
721 let portfolio_projection_guard =
722 if let Some(portfolio_cache) = &self.ctx.deps.portfolio_cache {
723 Some(portfolio_cache.lock_projection_barrier().await)
724 } else {
725 None
726 };
727 let pending_notifications = self
728 .apply_portfolio_projection_events_sync(events, req_id)
729 .await;
730 for event in events {
731 self.ctx.deps.emit_event(event);
732 }
733 drop(portfolio_projection_guard);
734 self.send_portfolio_notifications(pending_notifications)
735 .await;
736 }
737
738 pub(crate) async fn apply_startup_replayed_events_sync(
739 &mut self,
740 events: &[EngineMessage],
741 req_id: &str,
742 ) {
743 if events.is_empty() {
744 return;
745 }
746
747 let Some(portfolio_cache) = self.ctx.deps.portfolio_cache.clone() else {
748 for event in events {
749 self.ctx.deps.emit_event(event);
750 }
751 return;
752 };
753
754 let portfolio_projection_guard = portfolio_cache.lock_projection_barrier().await;
755 let mut pending_notifications = Vec::new();
756
757 for event in events {
758 if let EngineMessage::PositionExpired(expiry_msg) = event {
759 pending_notifications.push(
760 portfolio_cache
761 .handle_replayed_position_expired_projection_under_barrier(expiry_msg)
762 .await,
763 );
764 debug!(
765 "Applied startup replay expiry projection: request_id={}, wallet={}, symbol={}",
766 req_id, expiry_msg.wallet_address, expiry_msg.symbol
767 );
768 }
769 }
770
771 for event in events {
772 self.ctx.deps.emit_event(event);
773 }
774 drop(portfolio_projection_guard);
775 self.send_portfolio_notifications(pending_notifications)
776 .await;
777 }
778
779 pub(super) async fn process_rfq_journaled(
805 &mut self,
806 request: hypercall_runtime_api::RfqExecuteRequest,
807 fill_id: String,
808 captured_events: Vec<hypercall_types::EngineMessage>,
809 captured_balance_updates: Vec<hypercall_types::BalanceUpdate>,
810 mmp_updates: Vec<crate::rsm::apply::MmpFillUpdate>,
811 ) {
812 let start = Instant::now();
813 let cmd = &request.command;
814
815 let req_id = if cmd.request_id.is_empty() {
820 warn!(
821 "RFQ command missing request_id (rfq_id={}, quote_id={}); \
822 idempotency tracking disabled for this request",
823 cmd.rfq_id, cmd.quote_id
824 );
825 counter!("ht_engine_journal_fallback_total", "reason" => "missing_request_id")
826 .increment(1);
827 use sha3::{Digest, Sha3_256};
835 let mut hasher = Sha3_256::new();
836 hasher.update(cmd.rfq_id.as_bytes());
837 hasher.update(b":");
838 hasher.update(cmd.quote_id.as_bytes());
839 let digest = hasher.finalize();
840 let mut bytes = [0u8; 16];
841 bytes.copy_from_slice(&digest[..16]);
842 bytes[6] = (bytes[6] & 0x0F) | 0x80;
844 bytes[8] = (bytes[8] & 0x3F) | 0x80;
845 uuid::Uuid::from_bytes(bytes).to_string()
846 } else {
847 cmd.request_id.clone()
848 };
849
850 let req_uuid = uuid::Uuid::parse_str(&req_id).unwrap_or_else(|e| {
851 panic!(
852 "JOURNAL_FATAL: rfq command request_id '{}' is not a valid UUID: {}",
853 req_id, e
854 )
855 });
856
857 let command_data = {
862 let mut buf = Vec::with_capacity(256);
863 buf.push(1u8);
865 rmp_serde::encode::write_named(&mut buf, cmd).unwrap_or_else(|e| {
866 panic!(
867 "JOURNAL_FATAL: failed to serialize RfqExecuteCommand for rfq_id={}: {}",
868 cmd.rfq_id, e
869 )
870 });
871 buf
872 };
873
874 let response_data: Option<Vec<u8>> =
880 Some(super::UnifiedEngine::encode_rfq_response_data(&fill_id));
881
882 let event_payloads = self.serialize_events_for_journal(&captured_events);
883 let portfolio_projection_guard =
884 if let Some(portfolio_cache) = &self.ctx.deps.portfolio_cache {
885 Some(portfolio_cache.lock_projection_barrier().await)
886 } else {
887 None
888 };
889 let fill_side_effects = fill_side_effects_from_events(&captured_events);
890
891 let timestamp = get_timestamp_millis();
892 use crate::observability::command_trace::EngineStateDigest;
893 let pre_digest: EngineStateDigest = Default::default();
894 let post_digest: EngineStateDigest = Default::default();
895 #[cfg(feature = "rsm-state")]
896 let post_rsm_state_digest = crate::rsm::engine_snapshot::EngineStateDigest::from_ctx(
897 &self.ctx,
898 self.ctx
899 .l2_update_seq
900 .load(std::sync::atomic::Ordering::SeqCst),
901 );
902 let duration_ms = start.elapsed().as_millis() as u64;
903 let req_uuid_db = hypercall_db_diesel::engine_enums::DbUuid(req_uuid);
904 let command_type_enum = Some(hypercall_db_diesel::engine_enums::CommandType::RfqExecute);
905
906 let journal_writer = match &self.journal_writer {
907 Some(w) => w.clone(),
908 None => {
909 error!(
910 "Journal unavailable for RFQ: no writer configured. \
911 Use .with_mock_journal() for tests."
912 );
913 panic!(
914 "JOURNAL_FATAL: no journal_writer configured for RFQ rfq_id={}. \
915 Refusing to process without durability.",
916 cmd.rfq_id
917 );
918 }
919 };
920
921 let strategy = JournalStrategy::select(&self.journal_batch_sender, journal_writer);
922
923 match strategy {
924 JournalStrategy::Batched(sender) => {
925 let (ack_tx, ack_rx) = tokio::sync::oneshot::channel();
926
927 let entry = crate::journal::JournalEntry {
928 received_ts_ms: timestamp,
929 command_data: command_data.clone(),
930 response_data: response_data.clone(),
931 order_id: None,
932 pre_digest: pre_digest.clone(),
933 post_digest: post_digest.clone(),
934 duration_ms,
935 events: event_payloads,
936 outbox_appends: Vec::new(),
937 fill_side_effects: fill_side_effects.clone(),
938 cash_withdrawal_side_effect: None,
939 balance_updates: captured_balance_updates.clone(),
940 created_at: Instant::now(),
941 commit_ack: Some(ack_tx),
942 request_uuid: req_uuid_db,
943 command_type_enum,
944 #[cfg(feature = "rsm-state")]
945 command_identity_hash: crate::rsm::apply::EngineCommand::RfqExecute(
946 cmd.clone(),
947 )
948 .identity_hash(),
949 #[cfg(feature = "rsm-state")]
950 rsm_state_digest: Some(post_rsm_state_digest.clone()),
951 };
952
953 let push_span = info_span!("journal_batch_push_rfq");
954 async {
955 let msg = crate::journal::JournalMessage::Entry(entry);
956 match sender.try_send(msg) {
957 Ok(()) => {
958 counter!("ht_journal_batch_pushed_total", "command" => "RfqExecute")
959 .increment(1);
960 }
961 Err(mpsc::error::TrySendError::Full(msg)) => {
962 warn!("Journal batch channel full (RFQ), applying backpressure");
963 counter!(
964 "ht_journal_batch_backpressure_total",
965 "command" => "RfqExecute"
966 )
967 .increment(1);
968 if let Err(e) = sender.send(msg).await {
969 panic!(
970 "CRITICAL_FAILURE: Failed to send RFQ journal entry for rfq_id {}: {}. \
971 Journal channel closed - idempotency will be broken. Restart required.",
972 cmd.rfq_id, e
973 );
974 }
975 }
976 Err(mpsc::error::TrySendError::Closed(_)) => {
977 panic!(
978 "CRITICAL_FAILURE: Journal batch channel closed for RFQ rfq_id {}. \
979 Journal system is dead - idempotency will be broken. Restart required.",
980 cmd.rfq_id
981 );
982 }
983 }
984 }
985 .instrument(push_span)
986 .await;
987
988 let wal_wait_start = Instant::now();
992 match ack_rx.await {
993 Ok(()) => {}
994 Err(_) => {
995 panic!(
996 "CRITICAL_FAILURE: RFQ journal commit_ack dropped for rfq_id {}. \
997 Durability boundary unknown after planning; engine must stop.",
998 cmd.rfq_id
999 );
1000 }
1001 }
1002 histogram!("ht_rfq_wal_wait_seconds")
1003 .record(wal_wait_start.elapsed().as_secs_f64());
1004
1005 let nats_start = Instant::now();
1006 if let Some(ref publisher) = self.nats_publisher {
1007 publisher
1008 .publish(crate::nats::CommandType::RfqExecute, &command_data)
1009 .await;
1010 }
1011 self.publish_balance_updates_to_nats(&captured_balance_updates)
1012 .await;
1013 histogram!("ht_rfq_nats_publish_seconds")
1014 .record(nats_start.elapsed().as_secs_f64());
1015
1016 let projection_start = Instant::now();
1017 let pending_notifications = self
1018 .apply_portfolio_projection_events_sync(&captured_events, &req_id)
1019 .await;
1020 histogram!("ht_rfq_projection_seconds")
1021 .record(projection_start.elapsed().as_secs_f64());
1022
1023 for event in &captured_events {
1024 self.ctx.deps.emit_event(event);
1025 }
1026 drop(portfolio_projection_guard);
1027
1028 let notify_start = Instant::now();
1029 self.send_portfolio_notifications(pending_notifications)
1030 .await;
1031 histogram!("ht_rfq_notify_seconds").record(notify_start.elapsed().as_secs_f64());
1032
1033 counter!("ht_engine_journal_commits_total", "command" => "RfqExecute").increment(1);
1034 histogram!("ht_engine_journaled_seconds", "command" => "RfqExecute")
1035 .record(start.elapsed().as_secs_f64());
1036 }
1037
1038 JournalStrategy::Sync(journal_writer) => {
1039 let postgres_span = info_span!("journal_postgres_insert_rfq");
1040 let req_uuid_for_journal = req_uuid_db;
1041 let command_data_for_journal = command_data.clone();
1042 let response_data_for_journal = response_data.clone();
1043 let pre_digest_for_journal = pre_digest.clone();
1044 let post_digest_for_journal = post_digest.clone();
1045 let events_for_journal = captured_events.clone();
1046 let fill_side_effects_for_journal = fill_side_effects.clone();
1047 let balance_updates_for_journal = captured_balance_updates.clone();
1048 let command_type_enum_for_journal = command_type_enum;
1049
1050 let append_result = async {
1051 tokio::task::spawn_blocking(move || {
1052 journal_writer.append_transition_with_fill_side_effects(
1053 timestamp,
1054 &command_data_for_journal,
1055 response_data_for_journal.as_deref(),
1056 None,
1057 &pre_digest_for_journal,
1058 &post_digest_for_journal,
1059 duration_ms,
1060 &events_for_journal,
1061 &fill_side_effects_for_journal,
1062 &balance_updates_for_journal,
1063 req_uuid_for_journal,
1064 command_type_enum_for_journal,
1065 )
1066 })
1067 .await
1068 }
1069 .instrument(postgres_span)
1070 .await;
1071
1072 match append_result {
1073 Ok(Ok(result)) => {
1074 let pending_notifications = if result.was_new_insert {
1075 counter!("ht_engine_journal_commits_total", "command" => "RfqExecute")
1076 .increment(1);
1077
1078 if let Some(ref publisher) = self.nats_publisher {
1079 publisher
1080 .publish(crate::nats::CommandType::RfqExecute, &command_data)
1081 .await;
1082 }
1083 self.publish_balance_updates_to_nats(&captured_balance_updates)
1084 .await;
1085
1086 Some(
1087 self.apply_portfolio_projection_events_sync(
1088 &captured_events,
1089 &req_id,
1090 )
1091 .await,
1092 )
1093 } else {
1094 counter!("ht_engine_journal_race_total", "command" => "RfqExecute")
1095 .increment(1);
1096 None
1097 };
1098
1099 for event in &captured_events {
1100 self.ctx.deps.emit_event(event);
1101 }
1102 drop(portfolio_projection_guard);
1103
1104 if let Some(pending_notifications) = pending_notifications {
1105 self.send_portfolio_notifications(pending_notifications)
1106 .await;
1107 }
1108 }
1109 Ok(Err(e)) => {
1110 error!(
1111 "FATAL: RFQ journal append failed for rfq_id={}: {}",
1112 cmd.rfq_id, e
1113 );
1114 counter!(
1115 "ht_engine_journal_error_total",
1116 "reason" => "append_failed",
1117 "command" => "RfqExecute"
1118 )
1119 .increment(1);
1120 panic!(
1121 "JOURNAL_FATAL: RFQ journal append failed for rfq_id={}: {}",
1122 cmd.rfq_id, e
1123 );
1124 }
1125 Err(e) => {
1126 error!("FATAL: spawn_blocking failed for RFQ journal write: {}", e);
1127 panic!(
1128 "JOURNAL_FATAL: spawn_blocking failed for RFQ journal write: {}",
1129 e
1130 );
1131 }
1132 }
1133 }
1134 }
1135
1136 if !cmd.request_id.is_empty() {
1139 self.known_request_ids.insert(req_uuid);
1140 }
1141
1142 if let Some(ref mmp_cache) = self.ctx.deps.mmp_cache {
1148 for update in &mmp_updates {
1149 let _ = mmp_cache
1150 .process_fill(
1151 &update.qp_wallet,
1152 &update.underlying,
1153 &update.fill,
1154 update.timestamp_ms,
1155 )
1156 .await;
1157 }
1158 }
1159
1160 self.publish_snapshot();
1161
1162 let _ = request
1164 .response_tx
1165 .send(hypercall_runtime_api::RfqExecuteResult::Success { fill_id });
1166 }
1167}
1168
1169fn fill_side_effects_from_events(
1170 events: &[EngineMessage],
1171) -> Vec<crate::journal::JournalFillSideEffect> {
1172 events
1173 .iter()
1174 .enumerate()
1175 .filter_map(|(event_idx, event)| match event {
1176 EngineMessage::OrderFilled { fill, accounting } => {
1177 assert_eq!(
1178 accounting.trade_id, fill.trade_id,
1179 "CRITICAL: fill accounting trade_id mismatch for fill {}",
1180 fill.trade_id
1181 );
1182 let mut side_effect = crate::journal::JournalFillSideEffect::from_fill_accounting(
1183 event_idx as i32,
1184 accounting,
1185 );
1186 side_effect.underlying_notional = fill.underlying_notional;
1187 Some(side_effect)
1188 }
1189 _ => None,
1190 })
1191 .collect()
1192}