1use crate::engine_enums_ext::EventTypeExt;
11use crate::observability::command_trace::EngineStateDigest;
12use diesel::prelude::*;
13use diesel::r2d2::PoolError;
14pub use hypercall_db::{JournalCommandSummary, JournalEventRecord, JournalFullRecord};
15use hypercall_db_diesel::engine_enums::{CommandType, DbUuid, EventType};
16use hypercall_db_diesel::schema::{engine_command_digests, engine_commands, engine_events};
17use hypercall_types::EngineMessage;
18use std::collections::HashMap;
19use std::sync::Arc;
20
21pub trait JournalWriter: Send + Sync {
23 fn is_durable(&self) -> bool {
25 false
26 }
27
28 #[allow(clippy::too_many_arguments)]
31 fn append_transition(
32 &self,
33 received_ts_ms: u64,
34 command_data: &[u8],
35 response_data: Option<&[u8]>,
36 order_id: Option<i64>,
37 pre_digest: &EngineStateDigest,
38 post_digest: &EngineStateDigest,
39 duration_ms: u64,
40 events: &[EngineMessage],
41 request_uuid: DbUuid,
42 command_type_enum: Option<CommandType>,
43 ) -> Result<JournalAppendResult, EngineJournalError>;
44
45 #[allow(clippy::too_many_arguments)]
50 fn append_transition_with_fill_side_effects(
51 &self,
52 received_ts_ms: u64,
53 command_data: &[u8],
54 response_data: Option<&[u8]>,
55 order_id: Option<i64>,
56 pre_digest: &EngineStateDigest,
57 post_digest: &EngineStateDigest,
58 duration_ms: u64,
59 events: &[EngineMessage],
60 fill_side_effects: &[super::JournalFillSideEffect],
61 balance_updates: &[hypercall_types::BalanceUpdate],
62 request_uuid: DbUuid,
63 command_type_enum: Option<CommandType>,
64 ) -> Result<JournalAppendResult, EngineJournalError> {
65 let _ = fill_side_effects;
66 if !balance_updates.is_empty() {
67 return Err(EngineJournalError::InvalidUsage(
68 "append_transition_with_fill_side_effects received non-empty balance_updates but this writer does not persist them".to_string(),
69 ));
70 }
71 self.append_transition(
72 received_ts_ms,
73 command_data,
74 response_data,
75 order_id,
76 pre_digest,
77 post_digest,
78 duration_ms,
79 events,
80 request_uuid,
81 command_type_enum,
82 )
83 }
84
85 fn get_by_request_id(
87 &self,
88 request_id: &uuid::Uuid,
89 ) -> Result<Option<JournalFullRecord>, EngineJournalError>;
90
91 fn request_id_exists(&self, request_id: &uuid::Uuid) -> Result<bool, EngineJournalError>;
93
94 fn get_recent_request_ids(
96 &self,
97 since_hours: i64,
98 ) -> Result<std::collections::HashSet<uuid::Uuid>, EngineJournalError>;
99
100 fn get_recent(&self, limit: usize) -> Result<Vec<JournalCommandSummary>, EngineJournalError>;
102}
103
104#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
107pub enum EventPersistenceMode {
108 #[default]
110 Full,
111 Summary,
113 None,
115}
116
117impl EventPersistenceMode {
118 pub fn from_env_str(s: &str) -> Self {
120 match s.to_lowercase().as_str() {
121 "full" => EventPersistenceMode::Full,
122 "summary" => EventPersistenceMode::Summary,
123 "none" => EventPersistenceMode::None,
124 _ => {
125 tracing::warn!(
126 "Unknown ENGINE_JOURNAL_EVENT_PERSISTENCE value '{}', defaulting to 'full'",
127 s
128 );
129 EventPersistenceMode::Full
130 }
131 }
132 }
133
134 pub fn from_config(config: &crate::backend_config::JournalRuntimeConfig) -> Self {
135 Self::from_env_str(&config.event_persistence)
136 }
137}
138
139fn get_critical_event_types(configured_values: &[String]) -> std::collections::HashSet<String> {
144 let configured: std::collections::HashSet<String> = configured_values
145 .iter()
146 .map(|value: &String| value.trim().to_string())
147 .filter(|value: &String| !value.is_empty())
148 .collect();
149 if !configured.is_empty() {
150 return configured;
151 }
152
153 ["L2Update".to_string()].into_iter().collect()
154}
155
156fn is_critical_event_type(
158 event_type: &str,
159 critical_event_types: &std::collections::HashSet<String>,
160) -> bool {
161 critical_event_types.contains(event_type)
162}
163
164#[derive(Debug)]
166pub enum EngineJournalError {
167 Connection(String),
168 Query(String),
169 Serialization(String),
170 InvalidUsage(String),
171}
172
173impl std::fmt::Display for EngineJournalError {
174 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
175 match self {
176 EngineJournalError::Connection(s) => write!(f, "Database connection error: {}", s),
177 EngineJournalError::Query(s) => write!(f, "Query error: {}", s),
178 EngineJournalError::Serialization(s) => write!(f, "Serialization error: {}", s),
179 EngineJournalError::InvalidUsage(s) => write!(f, "Invalid journal usage: {}", s),
180 }
181 }
182}
183
184impl std::error::Error for EngineJournalError {}
185
186impl From<diesel::result::Error> for EngineJournalError {
187 fn from(err: diesel::result::Error) -> Self {
188 EngineJournalError::Query(err.to_string())
189 }
190}
191
192impl From<PoolError> for EngineJournalError {
193 fn from(err: PoolError) -> Self {
194 EngineJournalError::Connection(err.to_string())
195 }
196}
197
198impl From<serde_json::Error> for EngineJournalError {
199 fn from(err: serde_json::Error) -> Self {
200 EngineJournalError::Serialization(err.to_string())
201 }
202}
203
204fn ensure_no_fill_events(
205 events: &[EngineMessage],
206 entrypoint: &str,
207) -> Result<(), EngineJournalError> {
208 if events
209 .iter()
210 .any(|event| matches!(event, EngineMessage::OrderFilled { .. }))
211 {
212 return Err(EngineJournalError::InvalidUsage(format!(
213 "{} cannot journal OrderFilled events, use append_transition_with_fill_side_effects",
214 entrypoint
215 )));
216 }
217 Ok(())
218}
219
220#[derive(Debug, Clone)]
222pub struct JournalAppendResult {
223 pub command_id: i64,
225 pub response_data: Option<Vec<u8>>,
227 pub events: Vec<EngineMessage>,
229 pub was_new_insert: bool,
231}
232
233pub struct EngineJournalWriter {
235 pool: crate::db_handler::DbPool,
236 event_persistence_mode: EventPersistenceMode,
238 critical_event_types: std::collections::HashSet<String>,
239}
240
241impl EngineJournalWriter {
242 pub fn new(pool: crate::db_handler::DbPool) -> Self {
244 Self {
245 pool,
246 event_persistence_mode: EventPersistenceMode::default(),
247 critical_event_types: get_critical_event_types(&[]),
248 }
249 }
250
251 pub fn new_with_mode(pool: crate::db_handler::DbPool, mode: EventPersistenceMode) -> Self {
253 Self {
254 pool,
255 event_persistence_mode: mode,
256 critical_event_types: get_critical_event_types(&[]),
257 }
258 }
259
260 pub fn from_database_url_with_config(
261 database_url: &str,
262 pool_max: u32,
263 config: &crate::backend_config::JournalRuntimeConfig,
264 ) -> Result<Self, EngineJournalError> {
265 let auth = crate::db_handler::DbAuthConfig::password(database_url);
266 Self::from_auth_config(&auth, pool_max, config)
267 }
268
269 pub fn from_auth_config(
270 auth: &crate::db_handler::DbAuthConfig,
271 pool_max: u32,
272 config: &crate::backend_config::JournalRuntimeConfig,
273 ) -> Result<Self, EngineJournalError> {
274 let pool = hypercall_db_diesel::build_db_pool(auth, pool_max, 30_000, 10_000)
275 .map_err(|e| EngineJournalError::Connection(e.to_string()))?;
276 Ok(Self {
277 pool,
278 event_persistence_mode: EventPersistenceMode::from_config(config),
279 critical_event_types: get_critical_event_types(&config.critical_event_types),
280 })
281 }
282
283 pub fn event_persistence_mode(&self) -> EventPersistenceMode {
285 self.event_persistence_mode
286 }
287
288 #[allow(clippy::too_many_arguments)]
306 pub fn append_transition(
307 &self,
308 received_ts_ms: u64,
309 command_data: &[u8],
310 response_data: Option<&[u8]>,
311 order_id: Option<i64>,
312 pre_digest: &EngineStateDigest,
313 post_digest: &EngineStateDigest,
314 duration_ms: u64,
315 events: &[EngineMessage],
316 request_uuid: DbUuid,
317 command_type_enum: Option<CommandType>,
318 ) -> Result<JournalAppendResult, EngineJournalError> {
319 ensure_no_fill_events(events, "append_transition()")?;
320 self.append_transition_internal(
321 received_ts_ms,
322 command_data,
323 response_data,
324 order_id,
325 pre_digest,
326 post_digest,
327 duration_ms,
328 events,
329 &[],
330 &[],
331 request_uuid,
332 command_type_enum,
333 )
334 }
335
336 #[allow(clippy::too_many_arguments)]
337 fn append_transition_internal(
338 &self,
339 received_ts_ms: u64,
340 command_data: &[u8],
341 response_data: Option<&[u8]>,
342 order_id: Option<i64>,
343 pre_digest: &EngineStateDigest,
344 post_digest: &EngineStateDigest,
345 duration_ms: u64,
346 events: &[EngineMessage],
347 fill_side_effects: &[super::JournalFillSideEffect],
348 balance_updates: &[hypercall_types::BalanceUpdate],
349 request_uuid: DbUuid,
350 command_type_enum: Option<CommandType>,
351 ) -> Result<JournalAppendResult, EngineJournalError> {
352 let mut conn = self.pool.get()?;
353
354 if let Some(existing) = self.get_by_request_id_internal(&mut conn, &request_uuid.0)? {
356 return Ok(JournalAppendResult {
358 command_id: existing.command_id,
359 response_data: existing.response_data,
360 events: existing
361 .events
362 .into_iter()
363 .map(|e| {
364 EngineMessage::deserialize_from_wire(&e.event_topic, &e.event_data)
365 .expect("Failed to parse stored journal event payload")
366 })
367 .collect(),
368 was_new_insert: false,
369 });
370 }
371
372 use crate::journal::engine_journal_batcher::EventPayload;
377 let event_payloads: Vec<EventPayload> = match self.event_persistence_mode {
378 EventPersistenceMode::Full => events
379 .iter()
380 .map(|e| {
381 let event_type_enum = EventType::from_engine_message(e);
382 let event_data = e
383 .serialize_inner()
384 .expect("Failed to serialize EngineMessage");
385 let event_topic = e.topic().to_string();
386 let event_key = e.partition_key();
387 let l2_sequence = match e {
388 EngineMessage::L2Update(l2) => l2.sequence,
389 _ => None,
390 };
391 EventPayload {
392 event_topic,
393 event_key,
394 event_data,
395 l2_sequence,
396 event_type_enum,
397 }
398 })
399 .collect(),
400 EventPersistenceMode::Summary | EventPersistenceMode::None => {
401 events
403 .iter()
404 .filter(|e| {
405 self.event_persistence_mode == EventPersistenceMode::Summary
406 || is_critical_event_type(
407 EventType::from_engine_message(e).as_str(),
408 &self.critical_event_types,
409 )
410 })
411 .map(|e| {
412 let event_type_enum = EventType::from_engine_message(e);
413 let event_data = e
414 .serialize_inner()
415 .expect("Failed to serialize EngineMessage");
416 let event_topic = e.topic().to_string();
417 let event_key = e.partition_key();
418 let l2_sequence = match e {
419 EngineMessage::L2Update(l2) => l2.sequence,
420 _ => None,
421 };
422 EventPayload {
423 event_topic,
424 event_key,
425 event_data,
426 l2_sequence,
427 event_type_enum,
428 }
429 })
430 .collect()
431 }
432 };
433
434 conn.transaction::<_, EngineJournalError, _>(|conn| {
436 let insert_result: Option<i64> = diesel::insert_into(engine_commands::table)
439 .values((
440 engine_commands::received_ts_ms.eq(received_ts_ms as i64),
441 engine_commands::command_data.eq(command_data),
442 engine_commands::response_data.eq(response_data),
443 engine_commands::order_id.eq(order_id),
444 engine_commands::request_uuid.eq(request_uuid),
445 engine_commands::command_type_enum.eq(command_type_enum),
446 ))
447 .on_conflict(engine_commands::request_uuid)
448 .do_nothing()
449 .returning(engine_commands::command_id)
450 .get_result(conn)
451 .optional()?;
452
453 let (command_id, was_new_insert) = match insert_result {
454 Some(id) => (id, true),
455 None => {
456 let existing_id: i64 = engine_commands::table
458 .filter(engine_commands::request_uuid.eq(request_uuid))
459 .select(engine_commands::command_id)
460 .first(conn)?;
461 (existing_id, false)
462 }
463 };
464
465 if was_new_insert {
467 if !event_payloads.is_empty() {
469 let event_rows: Vec<_> = event_payloads
470 .iter()
471 .enumerate()
472 .map(|(idx, event)| {
473 (
474 engine_events::command_id.eq(command_id),
475 engine_events::event_idx.eq(idx as i32),
476 engine_events::event_data.eq(&event.event_data),
477 engine_events::event_key.eq(&event.event_key),
478 engine_events::l2_sequence.eq(event.l2_sequence),
479 engine_events::event_type_enum.eq(event.event_type_enum),
480 )
481 })
482 .collect();
483
484 diesel::insert_into(engine_events::table)
485 .values(&event_rows)
486 .execute(conn)?;
487 }
488
489 let fill_side_effects_by_idx: HashMap<i32, &super::JournalFillSideEffect> =
490 fill_side_effects
491 .iter()
492 .map(|side_effect| (side_effect.event_idx, side_effect))
493 .collect();
494 for (idx, event) in events.iter().enumerate() {
495 let EngineMessage::OrderFilled { fill, .. } = event else {
496 continue;
497 };
498
499 let side_effect = fill_side_effects_by_idx.get(&(idx as i32)).unwrap_or_else(|| {
500 panic!(
501 "CRITICAL_FAILURE: missing journal fill side-effect for sync command_id={}, event_idx={}, trade_id={}",
502 command_id, idx, fill.trade_id
503 )
504 });
505 crate::db_handler::DieselEventHandler::persist_fill_with_side_effects_in_tx(
506 conn,
507 fill,
508 side_effect,
509 )
510 .map_err(|e| EngineJournalError::Query(e.to_string()))?;
511 }
512
513 for balance_update in balance_updates {
514 hypercall_db_diesel::ledger_ops::apply_pnl_decimal_sync(
515 conn,
516 &balance_update.wallet,
517 balance_update.delta,
518 )
519 .map_err(|e| EngineJournalError::Query(e.to_string()))?;
520 }
521
522 let pre_digest_data = hypercall_types::serialize_to_wire_bytes(pre_digest);
524 let post_digest_data =
525 hypercall_types::serialize_to_wire_bytes(post_digest);
526 diesel::insert_into(engine_command_digests::table)
527 .values((
528 engine_command_digests::command_id.eq(command_id),
529 engine_command_digests::duration_ms.eq(duration_ms as i64),
530 engine_command_digests::pre_digest_data.eq(&pre_digest_data),
531 engine_command_digests::post_digest_data.eq(&post_digest_data),
532 ))
533 .on_conflict(engine_command_digests::command_id)
534 .do_nothing()
535 .execute(conn)?;
536 }
537
538 Ok(JournalAppendResult {
539 command_id,
540 response_data: response_data.map(|d| d.to_vec()),
541 events: events.to_vec(),
542 was_new_insert,
543 })
544 })
545 }
546
547 pub fn get_by_request_id(
549 &self,
550 request_id: &uuid::Uuid,
551 ) -> Result<Option<JournalFullRecord>, EngineJournalError> {
552 let mut conn = self.pool.get()?;
553 self.get_by_request_id_internal(&mut conn, request_id)
554 }
555
556 fn get_by_request_id_internal(
557 &self,
558 conn: &mut PgConnection,
559 request_id: &uuid::Uuid,
560 ) -> Result<Option<JournalFullRecord>, EngineJournalError> {
561 let command: Option<(
563 i64,
564 DbUuid,
565 i64,
566 Option<CommandType>,
567 Option<Vec<u8>>,
568 Option<Vec<u8>>,
569 chrono::DateTime<chrono::Utc>,
570 )> = engine_commands::table
571 .filter(engine_commands::request_uuid.eq(DbUuid(*request_id)))
572 .select((
573 engine_commands::command_id,
574 engine_commands::request_uuid,
575 engine_commands::received_ts_ms,
576 engine_commands::command_type_enum,
577 engine_commands::command_data,
578 engine_commands::response_data,
579 engine_commands::created_at,
580 ))
581 .first(conn)
582 .optional()?;
583
584 let Some((
585 command_id,
586 req_id,
587 received_ts_ms,
588 command_type,
589 command_data,
590 response_data,
591 created_at,
592 )) = command
593 else {
594 return Ok(None);
595 };
596
597 let events: Vec<(i64, i32, EventType, Vec<u8>, Option<String>, Option<i64>)> =
599 engine_events::table
600 .filter(engine_events::command_id.eq(command_id))
601 .order(engine_events::event_idx.asc())
602 .select((
603 engine_events::event_id,
604 engine_events::event_idx,
605 engine_events::event_type_enum,
606 engine_events::event_data,
607 engine_events::event_key,
608 engine_events::l2_sequence,
609 ))
610 .load(conn)?;
611
612 let digest: Option<(Vec<u8>, Vec<u8>, i64)> = engine_command_digests::table
614 .filter(engine_command_digests::command_id.eq(command_id))
615 .select((
616 engine_command_digests::pre_digest_data,
617 engine_command_digests::post_digest_data,
618 engine_command_digests::duration_ms,
619 ))
620 .first(conn)
621 .optional()?;
622
623 let (pre_digest, post_digest, duration_ms) = digest
624 .map(|(pre, post, dur)| {
625 let pre =
626 rmp_serde::from_slice::<serde_json::Value>(&pre[1..]).unwrap_or_else(|e| {
627 panic!(
628 "Failed to decode pre_digest_data msgpack (command_id={}): {}",
629 command_id, e
630 )
631 });
632 let post =
633 rmp_serde::from_slice::<serde_json::Value>(&post[1..]).unwrap_or_else(|e| {
634 panic!(
635 "Failed to decode post_digest_data msgpack (command_id={}): {}",
636 command_id, e
637 )
638 });
639 (Some(pre), Some(post), Some(dur))
640 })
641 .unwrap_or((None, None, None));
642
643 Ok(Some(JournalFullRecord {
644 command_id,
645 request_id: req_id.to_string(),
646 received_ts_ms,
647 command_type: command_type
648 .map(|ct| ct.as_str().to_string())
649 .unwrap_or_default(),
650 command_data: command_data.expect("command_data is NULL — run backfill first"),
651 response_data,
652 pre_digest,
653 post_digest,
654 duration_ms,
655 events: events
656 .into_iter()
657 .map(
658 |(event_id, event_idx, event_type_enum, event_data, event_key, l2_sequence)| {
659 JournalEventRecord {
660 event_id,
661 event_idx,
662 event_type: event_type_enum.as_str().to_string(),
663 event_data,
664 event_topic: event_type_enum.topic().to_string(),
665 event_key,
666 l2_sequence,
667 }
668 },
669 )
670 .collect(),
671 created_at,
672 }))
673 }
674
675 pub fn get_recent(
677 &self,
678 limit: usize,
679 ) -> Result<Vec<JournalCommandSummary>, EngineJournalError> {
680 let mut conn = self.pool.get()?;
681
682 let commands: Vec<(
684 i64,
685 DbUuid,
686 i64,
687 Option<CommandType>,
688 Option<Vec<u8>>,
689 chrono::DateTime<chrono::Utc>,
690 )> = engine_commands::table
691 .order(engine_commands::created_at.desc())
692 .limit(limit as i64)
693 .select((
694 engine_commands::command_id,
695 engine_commands::request_uuid,
696 engine_commands::received_ts_ms,
697 engine_commands::command_type_enum,
698 engine_commands::response_data,
699 engine_commands::created_at,
700 ))
701 .load(&mut conn)?;
702
703 let mut summaries = Vec::with_capacity(commands.len());
704 for (command_id, request_id, received_ts_ms, command_type, response_data, created_at) in
705 commands
706 {
707 let event_count: i64 = engine_events::table
709 .filter(engine_events::command_id.eq(command_id))
710 .count()
711 .get_result(&mut conn)?;
712
713 let event_types_sample: Vec<EventType> = engine_events::table
715 .filter(engine_events::command_id.eq(command_id))
716 .order(engine_events::event_idx.asc())
717 .limit(5)
718 .select(engine_events::event_type_enum)
719 .load(&mut conn)?;
720
721 let digest: Option<(Vec<u8>, Vec<u8>, i64)> = engine_command_digests::table
723 .filter(engine_command_digests::command_id.eq(command_id))
724 .select((
725 engine_command_digests::pre_digest_data,
726 engine_command_digests::post_digest_data,
727 engine_command_digests::duration_ms,
728 ))
729 .first(&mut conn)
730 .optional()?;
731
732 let (pre_digest, post_digest, duration_ms) = digest
733 .map(|(pre, post, dur)| {
734 let pre =
735 rmp_serde::from_slice::<serde_json::Value>(&pre[1..]).unwrap_or_else(|e| {
736 panic!(
737 "Failed to decode pre_digest_data msgpack (command_id={}): {}",
738 command_id, e
739 )
740 });
741 let post = rmp_serde::from_slice::<serde_json::Value>(&post[1..])
742 .unwrap_or_else(|e| {
743 panic!(
744 "Failed to decode post_digest_data msgpack (command_id={}): {}",
745 command_id, e
746 )
747 });
748 (Some(pre), Some(post), Some(dur))
749 })
750 .unwrap_or((None, None, None));
751
752 summaries.push(JournalCommandSummary {
753 command_id,
754 request_id: request_id.to_string(),
755 received_ts_ms,
756 command_type: command_type
757 .map(|ct| ct.as_str().to_string())
758 .unwrap_or_default(),
759 response_data,
760 event_count,
761 event_types_sample: event_types_sample
762 .into_iter()
763 .map(|e| e.as_str().to_string())
764 .collect(),
765 pre_digest,
766 post_digest,
767 duration_ms,
768 created_at,
769 });
770 }
771
772 Ok(summaries)
773 }
774
775 pub fn get_recent_request_ids(
779 &self,
780 since_hours: i64,
781 ) -> Result<std::collections::HashSet<uuid::Uuid>, EngineJournalError> {
782 let mut conn = self.pool.get()?;
783
784 let cutoff = chrono::Utc::now() - chrono::Duration::hours(since_hours);
786
787 let request_ids: Vec<DbUuid> = engine_commands::table
788 .filter(engine_commands::created_at.ge(cutoff))
789 .select(engine_commands::request_uuid)
790 .load(&mut conn)?;
791
792 let count = request_ids.len();
793 tracing::info!(
794 "Loaded {} request_ids from journal for idempotency cache (since {} hours ago)",
795 count,
796 since_hours
797 );
798
799 Ok(request_ids.into_iter().map(|id| id.0).collect())
800 }
801
802 pub fn request_id_exists(&self, request_id: &uuid::Uuid) -> Result<bool, EngineJournalError> {
805 let mut conn = self.pool.get()?;
806
807 let count: i64 = engine_commands::table
808 .filter(engine_commands::request_uuid.eq(DbUuid(*request_id)))
809 .count()
810 .get_result(&mut conn)?;
811
812 Ok(count > 0)
813 }
814}
815
816impl JournalWriter for EngineJournalWriter {
817 fn is_durable(&self) -> bool {
818 true
819 }
820
821 fn append_transition(
822 &self,
823 received_ts_ms: u64,
824 command_data: &[u8],
825 response_data: Option<&[u8]>,
826 order_id: Option<i64>,
827 pre_digest: &EngineStateDigest,
828 post_digest: &EngineStateDigest,
829 duration_ms: u64,
830 events: &[EngineMessage],
831 request_uuid: DbUuid,
832 command_type_enum: Option<CommandType>,
833 ) -> Result<JournalAppendResult, EngineJournalError> {
834 EngineJournalWriter::append_transition(
835 self,
836 received_ts_ms,
837 command_data,
838 response_data,
839 order_id,
840 pre_digest,
841 post_digest,
842 duration_ms,
843 events,
844 request_uuid,
845 command_type_enum,
846 )
847 }
848
849 fn append_transition_with_fill_side_effects(
850 &self,
851 received_ts_ms: u64,
852 command_data: &[u8],
853 response_data: Option<&[u8]>,
854 order_id: Option<i64>,
855 pre_digest: &EngineStateDigest,
856 post_digest: &EngineStateDigest,
857 duration_ms: u64,
858 events: &[EngineMessage],
859 fill_side_effects: &[super::JournalFillSideEffect],
860 balance_updates: &[hypercall_types::BalanceUpdate],
861 request_uuid: DbUuid,
862 command_type_enum: Option<CommandType>,
863 ) -> Result<JournalAppendResult, EngineJournalError> {
864 self.append_transition_internal(
865 received_ts_ms,
866 command_data,
867 response_data,
868 order_id,
869 pre_digest,
870 post_digest,
871 duration_ms,
872 events,
873 fill_side_effects,
874 balance_updates,
875 request_uuid,
876 command_type_enum,
877 )
878 }
879
880 fn get_by_request_id(
881 &self,
882 request_id: &uuid::Uuid,
883 ) -> Result<Option<JournalFullRecord>, EngineJournalError> {
884 EngineJournalWriter::get_by_request_id(self, request_id)
885 }
886
887 fn request_id_exists(&self, request_id: &uuid::Uuid) -> Result<bool, EngineJournalError> {
888 EngineJournalWriter::request_id_exists(self, request_id)
889 }
890
891 fn get_recent_request_ids(
892 &self,
893 since_hours: i64,
894 ) -> Result<std::collections::HashSet<uuid::Uuid>, EngineJournalError> {
895 EngineJournalWriter::get_recent_request_ids(self, since_hours)
896 }
897
898 fn get_recent(&self, limit: usize) -> Result<Vec<JournalCommandSummary>, EngineJournalError> {
899 EngineJournalWriter::get_recent(self, limit)
900 }
901}
902
903pub struct InMemoryJournalWriter {
906 entries: std::sync::Mutex<HashMap<uuid::Uuid, JournalFullRecord>>,
907 next_command_id: std::sync::atomic::AtomicI64,
908 durable: bool,
909}
910
911impl InMemoryJournalWriter {
912 pub fn new() -> Self {
914 Self::with_durability(false)
915 }
916
917 fn with_durability(durable: bool) -> Self {
918 Self {
919 entries: std::sync::Mutex::new(HashMap::new()),
920 next_command_id: std::sync::atomic::AtomicI64::new(1),
921 durable,
922 }
923 }
924
925 #[cfg(test)]
926 pub(crate) fn durable_for_tests() -> Self {
927 Self::with_durability(true)
928 }
929
930 pub fn get_all_entries(&self) -> Vec<JournalFullRecord> {
932 let entries = self.entries.lock().unwrap();
933 entries.values().cloned().collect()
934 }
935
936 pub fn clear(&self) {
938 let mut entries = self.entries.lock().unwrap();
939 entries.clear();
940 }
941
942 pub fn len(&self) -> usize {
944 let entries = self.entries.lock().unwrap();
945 entries.len()
946 }
947
948 pub fn is_empty(&self) -> bool {
950 self.len() == 0
951 }
952}
953
954#[cfg(test)]
955pub(crate) fn request_uuid_for_test(uuid: uuid::Uuid) -> DbUuid {
956 DbUuid(uuid)
957}
958
959impl Default for InMemoryJournalWriter {
960 fn default() -> Self {
961 Self::new()
962 }
963}
964
965impl JournalWriter for InMemoryJournalWriter {
966 fn is_durable(&self) -> bool {
967 self.durable
968 }
969
970 fn append_transition(
971 &self,
972 received_ts_ms: u64,
973 command_data: &[u8],
974 response_data: Option<&[u8]>,
975 _order_id: Option<i64>,
976 pre_digest: &EngineStateDigest,
977 post_digest: &EngineStateDigest,
978 duration_ms: u64,
979 events: &[EngineMessage],
980 request_uuid: DbUuid,
981 command_type_enum: Option<CommandType>,
982 ) -> Result<JournalAppendResult, EngineJournalError> {
983 let mut entries = self.entries.lock().unwrap();
984
985 if let Some(existing) = entries.get(&request_uuid.0) {
987 return Ok(JournalAppendResult {
988 command_id: existing.command_id,
989 response_data: existing.response_data.clone(),
990 events: existing
991 .events
992 .iter()
993 .map(|e| {
994 EngineMessage::deserialize_from_wire(&e.event_topic, &e.event_data)
995 .expect("Failed to parse in-memory journal event payload")
996 })
997 .collect(),
998 was_new_insert: false,
999 });
1000 }
1001
1002 let command_id = self
1004 .next_command_id
1005 .fetch_add(1, std::sync::atomic::Ordering::SeqCst);
1006
1007 let event_records: Vec<JournalEventRecord> = events
1008 .iter()
1009 .enumerate()
1010 .map(|(idx, e)| {
1011 let event_type = EventType::from_engine_message(e).as_str().to_string();
1012 let event_data = e
1013 .serialize_inner()
1014 .expect("Failed to serialize EngineMessage");
1015 let event_topic = e.topic().to_string();
1016 let event_key = e.partition_key();
1017 let l2_sequence = match e {
1018 EngineMessage::L2Update(l2) => l2.sequence,
1019 _ => None,
1020 };
1021 JournalEventRecord {
1022 event_id: (command_id * 1000) + idx as i64, event_idx: idx as i32,
1024 event_type,
1025 event_data,
1026 event_topic,
1027 event_key,
1028 l2_sequence,
1029 }
1030 })
1031 .collect();
1032
1033 let record = JournalFullRecord {
1034 command_id,
1035 request_id: request_uuid.0.to_string(),
1036 received_ts_ms: received_ts_ms as i64,
1037 command_type: command_type_enum
1038 .map(|ct| ct.as_str().to_string())
1039 .unwrap_or_default(),
1040 command_data: command_data.to_vec(),
1041 response_data: response_data.map(|d| d.to_vec()),
1042 pre_digest: serde_json::to_value(pre_digest).ok(),
1043 post_digest: serde_json::to_value(post_digest).ok(),
1044 duration_ms: Some(duration_ms as i64),
1045 events: event_records,
1046 created_at: chrono::Utc::now(),
1047 };
1048
1049 entries.insert(request_uuid.0, record);
1050
1051 Ok(JournalAppendResult {
1052 command_id,
1053 response_data: response_data.map(|d| d.to_vec()),
1054 events: events.to_vec(),
1055 was_new_insert: true,
1056 })
1057 }
1058
1059 #[allow(clippy::too_many_arguments)]
1060 fn append_transition_with_fill_side_effects(
1061 &self,
1062 received_ts_ms: u64,
1063 command_data: &[u8],
1064 response_data: Option<&[u8]>,
1065 order_id: Option<i64>,
1066 pre_digest: &EngineStateDigest,
1067 post_digest: &EngineStateDigest,
1068 duration_ms: u64,
1069 events: &[EngineMessage],
1070 _fill_side_effects: &[super::JournalFillSideEffect],
1071 _balance_updates: &[hypercall_types::BalanceUpdate],
1072 request_uuid: DbUuid,
1073 command_type_enum: Option<CommandType>,
1074 ) -> Result<JournalAppendResult, EngineJournalError> {
1075 self.append_transition(
1076 received_ts_ms,
1077 command_data,
1078 response_data,
1079 order_id,
1080 pre_digest,
1081 post_digest,
1082 duration_ms,
1083 events,
1084 request_uuid,
1085 command_type_enum,
1086 )
1087 }
1088
1089 fn get_by_request_id(
1090 &self,
1091 request_id: &uuid::Uuid,
1092 ) -> Result<Option<JournalFullRecord>, EngineJournalError> {
1093 let entries = self.entries.lock().unwrap();
1094 Ok(entries.get(request_id).cloned())
1095 }
1096
1097 fn request_id_exists(&self, request_id: &uuid::Uuid) -> Result<bool, EngineJournalError> {
1098 let entries = self.entries.lock().unwrap();
1099 Ok(entries.contains_key(request_id))
1100 }
1101
1102 fn get_recent_request_ids(
1103 &self,
1104 _since_hours: i64,
1105 ) -> Result<std::collections::HashSet<uuid::Uuid>, EngineJournalError> {
1106 let entries = self.entries.lock().unwrap();
1107 Ok(entries.keys().cloned().collect())
1108 }
1109
1110 fn get_recent(&self, limit: usize) -> Result<Vec<JournalCommandSummary>, EngineJournalError> {
1111 let entries = self.entries.lock().unwrap();
1112 let mut summaries: Vec<JournalCommandSummary> = entries
1113 .values()
1114 .map(|record| JournalCommandSummary {
1115 command_id: record.command_id,
1116 request_id: record.request_id.clone(),
1117 received_ts_ms: record.received_ts_ms,
1118 command_type: record.command_type.clone(),
1119 response_data: record.response_data.clone(),
1120 event_count: record.events.len() as i64,
1121 event_types_sample: record
1122 .events
1123 .iter()
1124 .take(5)
1125 .map(|e| e.event_type.clone())
1126 .collect(),
1127 pre_digest: record.pre_digest.clone(),
1128 post_digest: record.post_digest.clone(),
1129 duration_ms: record.duration_ms,
1130 created_at: record.created_at,
1131 })
1132 .collect();
1133
1134 summaries.sort_by_key(|s| std::cmp::Reverse(s.created_at));
1136 summaries.truncate(limit);
1137 Ok(summaries)
1138 }
1139}
1140
1141pub type SharedEngineJournalWriter = Arc<dyn JournalWriter>;
1143
1144#[cfg(test)]
1145mod tests {
1146 use super::*;
1147
1148 #[cfg(feature = "test-utils")]
1149 async fn setup_test_pool() -> (
1150 hypercall_db_diesel::DbPool,
1151 Option<testcontainers::ContainerAsync<testcontainers_modules::postgres::Postgres>>,
1152 crate::test_contracts::TestnetContractEnvGuard,
1153 ) {
1154 use crate::test_contracts::test_database_url_with_database;
1155 use testcontainers::runners::AsyncRunner;
1156 use testcontainers::ImageExt;
1157 use testcontainers_modules::postgres::Postgres;
1158
1159 let env_guard = crate::test_contracts::TestnetContractEnvGuard::apply();
1160
1161 let (database_url, container) = if let Ok(url) = std::env::var("TEST_DATABASE_URL") {
1162 let db_name = format!("test_sync_journal_{}", uuid::Uuid::new_v4().simple());
1163 let admin_pool = sqlx::postgres::PgPoolOptions::new()
1164 .max_connections(1)
1165 .connect(&url)
1166 .await
1167 .expect("connect to TEST_DATABASE_URL");
1168 sqlx::query(&format!(r#"CREATE DATABASE "{}""#, db_name))
1169 .execute(&admin_pool)
1170 .await
1171 .expect("create isolated test database");
1172 admin_pool.close().await;
1173 let db_url = test_database_url_with_database(&url, &db_name);
1174 (db_url, None)
1175 } else {
1176 let container = Postgres::default()
1177 .with_db_name("test_sync_journal")
1178 .with_user("test_user")
1179 .with_password("test_password")
1180 .with_tag("16-alpine")
1181 .start()
1182 .await
1183 .expect("start postgres container");
1184
1185 let port = container
1186 .get_host_port_ipv4(5432)
1187 .await
1188 .expect("get postgres port");
1189 let url = format!(
1190 "postgresql://test_user:test_password@127.0.0.1:{}/test_sync_journal",
1191 port
1192 );
1193 tokio::time::sleep(tokio::time::Duration::from_secs(2)).await;
1194
1195 (url, Some(container))
1196 };
1197
1198 let _handler = crate::db_handler::DieselEventHandler::new(&database_url)
1199 .expect("create diesel handler and run migrations");
1200 let auth = hypercall_db_diesel::DbAuthConfig::password(&database_url);
1201 let pool = hypercall_db_diesel::build_db_pool(&auth, 5, 30_000, 10_000)
1202 .expect("build diesel pool");
1203
1204 (pool, container, env_guard)
1205 }
1206
1207 #[test]
1208 fn test_event_type_topic_derive() {
1209 use hypercall_types::OrderActionMessage;
1211 use hypercall_types::{OrderInfo, Side, WalletAddress};
1212 use rust_decimal_macros::dec;
1213 use std::str::FromStr;
1214
1215 let wallet = WalletAddress::from_str("0x1234567890123456789012345678901234567890").unwrap();
1216 let msg = OrderActionMessage {
1217 timestamp: 1000,
1218 info: OrderInfo {
1219 symbol: "TEST".to_string(),
1220 price: dec!(1.0),
1221 size: dec!(1.0),
1222 side: Side::Buy,
1223 tif: hypercall_types::TimeInForce::GTC,
1224 client_id: None,
1225 order_id: None,
1226 is_perp: false,
1227 underlying: None,
1228 reduce_only: None,
1229 nonce: None,
1230 signature: None,
1231 mmp_enabled: false,
1232 builder_code_address: None,
1233 },
1234 action: hypercall_types::OrderAction::CreateOrder,
1235 wallet,
1236 api_wallet_address: None,
1237 mmp_triggered: false,
1238 request_id: None,
1239 };
1240
1241 let event = EngineMessage::OrderAction(msg);
1242 assert_eq!(
1243 EventType::from_engine_message(&event).as_str(),
1244 "OrderAction"
1245 );
1246 }
1247
1248 #[cfg(feature = "test-utils")]
1249 #[tokio::test]
1250 #[serial_test::serial(testnet_env)]
1251 async fn sync_writer_materializes_balance_updates_once() {
1252 use diesel::prelude::*;
1253 use rust_decimal_macros::dec;
1254
1255 let (pool, _container, _env_guard) = setup_test_pool().await;
1256 let writer = EngineJournalWriter::new(pool.clone());
1257 let request_uuid = DbUuid(uuid::Uuid::new_v4());
1258 let wallet = hypercall_types::wallet_address::test_wallet(11);
1259 let update = hypercall_types::BalanceUpdate {
1260 balance_update_seq: 1,
1261 wallet,
1262 delta: dec!(42),
1263 balance_after: dec!(42),
1264 reason: hypercall_types::BalanceUpdateReason::Deposit,
1265 reference_id: Some("sync-writer-test".to_string()),
1266 source_command_id: None,
1267 timestamp_ms: 1234,
1268 };
1269
1270 let first = writer
1271 .append_transition_with_fill_side_effects(
1272 1234,
1273 &[1, 2, 3],
1274 None,
1275 None,
1276 &EngineStateDigest::default(),
1277 &EngineStateDigest::default(),
1278 1,
1279 &[],
1280 &[],
1281 std::slice::from_ref(&update),
1282 request_uuid,
1283 Some(CommandType::DepositUpdate),
1284 )
1285 .expect("sync writer should persist balance updates");
1286 assert!(first.was_new_insert);
1287
1288 let duplicate = writer
1289 .append_transition_with_fill_side_effects(
1290 1234,
1291 &[1, 2, 3],
1292 None,
1293 None,
1294 &EngineStateDigest::default(),
1295 &EngineStateDigest::default(),
1296 1,
1297 &[],
1298 &[],
1299 std::slice::from_ref(&update),
1300 request_uuid,
1301 Some(CommandType::DepositUpdate),
1302 )
1303 .expect("duplicate request should be idempotent");
1304 assert!(!duplicate.was_new_insert);
1305
1306 let mut conn = pool.get().expect("db connection");
1307 let balance: rust_decimal::Decimal = hypercall_db_diesel::schema::account_balances::table
1308 .filter(hypercall_db_diesel::schema::account_balances::account_address.eq(wallet))
1309 .select(hypercall_db_diesel::schema::account_balances::balance)
1310 .first(&mut conn)
1311 .expect("projection balance");
1312 assert_eq!(balance, dec!(42));
1313 }
1314}