Skip to main content

hypercall/journal/
engine_journal.rs

1//! Durable engine journal for restart-safe command/event logging.
2//!
3//! This module provides persistent storage for engine commands and their resulting events,
4//! enabling:
5//! - Restart-safe ACK: API responses only after DB commit
6//! - Idempotency: duplicate request_id returns cached response
7//! - Auditability: full command + event trace queryable
8//! - Future replay: schema supports snapshot + replay recovery
9
10use crate::engine_enums_ext::EventTypeExt;
11use crate::observability::command_trace::EngineStateDigest;
12use diesel::prelude::*;
13use diesel::r2d2::PoolError;
14pub use hypercall_db::{JournalCommandSummary, JournalEventRecord, JournalFullRecord};
15use hypercall_db_diesel::engine_enums::{CommandType, DbUuid, EventType};
16use hypercall_db_diesel::schema::{engine_command_digests, engine_commands, engine_events};
17use hypercall_types::EngineMessage;
18use std::collections::HashMap;
19use std::sync::Arc;
20
21/// Trait for journal writers (enables mock implementations for testing)
22pub trait JournalWriter: Send + Sync {
23    /// Whether writes through this writer survive process restart.
24    fn is_durable(&self) -> bool {
25        false
26    }
27
28    /// Append a state transition to the journal.
29    /// Returns existing record if request_id already exists (idempotency).
30    #[allow(clippy::too_many_arguments)]
31    fn append_transition(
32        &self,
33        received_ts_ms: u64,
34        command_data: &[u8],
35        response_data: Option<&[u8]>,
36        order_id: Option<i64>,
37        pre_digest: &EngineStateDigest,
38        post_digest: &EngineStateDigest,
39        duration_ms: u64,
40        events: &[EngineMessage],
41        request_uuid: DbUuid,
42        command_type_enum: Option<CommandType>,
43    ) -> Result<JournalAppendResult, EngineJournalError>;
44
45    /// Append a transition while atomically materializing any fill side-effects.
46    ///
47    /// Writers that do not own durable fill persistence can ignore the side-effects
48    /// and fall back to `append_transition`.
49    #[allow(clippy::too_many_arguments)]
50    fn append_transition_with_fill_side_effects(
51        &self,
52        received_ts_ms: u64,
53        command_data: &[u8],
54        response_data: Option<&[u8]>,
55        order_id: Option<i64>,
56        pre_digest: &EngineStateDigest,
57        post_digest: &EngineStateDigest,
58        duration_ms: u64,
59        events: &[EngineMessage],
60        fill_side_effects: &[super::JournalFillSideEffect],
61        balance_updates: &[hypercall_types::BalanceUpdate],
62        request_uuid: DbUuid,
63        command_type_enum: Option<CommandType>,
64    ) -> Result<JournalAppendResult, EngineJournalError> {
65        let _ = fill_side_effects;
66        if !balance_updates.is_empty() {
67            return Err(EngineJournalError::InvalidUsage(
68                "append_transition_with_fill_side_effects received non-empty balance_updates but this writer does not persist them".to_string(),
69            ));
70        }
71        self.append_transition(
72            received_ts_ms,
73            command_data,
74            response_data,
75            order_id,
76            pre_digest,
77            post_digest,
78            duration_ms,
79            events,
80            request_uuid,
81            command_type_enum,
82        )
83    }
84
85    /// Get a journal record by request_id
86    fn get_by_request_id(
87        &self,
88        request_id: &uuid::Uuid,
89    ) -> Result<Option<JournalFullRecord>, EngineJournalError>;
90
91    /// Check if a request_id exists in the journal (lightweight existence check)
92    fn request_id_exists(&self, request_id: &uuid::Uuid) -> Result<bool, EngineJournalError>;
93
94    /// Get recent request_ids for idempotency cache warm-up
95    fn get_recent_request_ids(
96        &self,
97        since_hours: i64,
98    ) -> Result<std::collections::HashSet<uuid::Uuid>, EngineJournalError>;
99
100    /// Get recent journal command summaries (for monitoring)
101    fn get_recent(&self, limit: usize) -> Result<Vec<JournalCommandSummary>, EngineJournalError>;
102}
103
104/// Event persistence mode for journal writes.
105/// Controls the latency/durability tradeoff.
106#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
107pub enum EventPersistenceMode {
108    /// Persist full event JSON (highest durability, highest latency)
109    #[default]
110    Full,
111    /// Persist only event summaries (type, order_id, trade_id) - lower latency
112    Summary,
113    /// Don't persist events at all - only command + response + digests (lowest latency)
114    None,
115}
116
117impl EventPersistenceMode {
118    /// Parse from environment variable value
119    pub fn from_env_str(s: &str) -> Self {
120        match s.to_lowercase().as_str() {
121            "full" => EventPersistenceMode::Full,
122            "summary" => EventPersistenceMode::Summary,
123            "none" => EventPersistenceMode::None,
124            _ => {
125                tracing::warn!(
126                    "Unknown ENGINE_JOURNAL_EVENT_PERSISTENCE value '{}', defaulting to 'full'",
127                    s
128                );
129                EventPersistenceMode::Full
130            }
131        }
132    }
133
134    pub fn from_config(config: &crate::backend_config::JournalRuntimeConfig) -> Self {
135        Self::from_env_str(&config.event_persistence)
136    }
137}
138
139/// Get the set of critical event types that are always persisted in Full mode.
140/// These events are needed for the outbox publisher to replay state correctly.
141/// Configured via ENGINE_JOURNAL_CRITICAL_EVENT_TYPES env var.
142/// Default: "L2Update" (essential for cache state rebuild)
143fn get_critical_event_types(configured_values: &[String]) -> std::collections::HashSet<String> {
144    let configured: std::collections::HashSet<String> = configured_values
145        .iter()
146        .map(|value: &String| value.trim().to_string())
147        .filter(|value: &String| !value.is_empty())
148        .collect();
149    if !configured.is_empty() {
150        return configured;
151    }
152
153    ["L2Update".to_string()].into_iter().collect()
154}
155
156/// Check if an event type is critical and should always be persisted in Full mode
157fn is_critical_event_type(
158    event_type: &str,
159    critical_event_types: &std::collections::HashSet<String>,
160) -> bool {
161    critical_event_types.contains(event_type)
162}
163
164/// Error type for engine journal operations
165#[derive(Debug)]
166pub enum EngineJournalError {
167    Connection(String),
168    Query(String),
169    Serialization(String),
170    InvalidUsage(String),
171}
172
173impl std::fmt::Display for EngineJournalError {
174    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
175        match self {
176            EngineJournalError::Connection(s) => write!(f, "Database connection error: {}", s),
177            EngineJournalError::Query(s) => write!(f, "Query error: {}", s),
178            EngineJournalError::Serialization(s) => write!(f, "Serialization error: {}", s),
179            EngineJournalError::InvalidUsage(s) => write!(f, "Invalid journal usage: {}", s),
180        }
181    }
182}
183
184impl std::error::Error for EngineJournalError {}
185
186impl From<diesel::result::Error> for EngineJournalError {
187    fn from(err: diesel::result::Error) -> Self {
188        EngineJournalError::Query(err.to_string())
189    }
190}
191
192impl From<PoolError> for EngineJournalError {
193    fn from(err: PoolError) -> Self {
194        EngineJournalError::Connection(err.to_string())
195    }
196}
197
198impl From<serde_json::Error> for EngineJournalError {
199    fn from(err: serde_json::Error) -> Self {
200        EngineJournalError::Serialization(err.to_string())
201    }
202}
203
204fn ensure_no_fill_events(
205    events: &[EngineMessage],
206    entrypoint: &str,
207) -> Result<(), EngineJournalError> {
208    if events
209        .iter()
210        .any(|event| matches!(event, EngineMessage::OrderFilled { .. }))
211    {
212        return Err(EngineJournalError::InvalidUsage(format!(
213            "{} cannot journal OrderFilled events, use append_transition_with_fill_side_effects",
214            entrypoint
215        )));
216    }
217    Ok(())
218}
219
220/// Result of appending a transition to the journal
221#[derive(Debug, Clone)]
222pub struct JournalAppendResult {
223    /// Engine sequence number (command_id)
224    pub command_id: i64,
225    /// Response wire bytes (version byte + msgpack) for idempotent return
226    pub response_data: Option<Vec<u8>>,
227    /// Events that were persisted
228    pub events: Vec<EngineMessage>,
229    /// Whether this was a new insert (false if idempotent hit)
230    pub was_new_insert: bool,
231}
232
233/// Durable engine journal writer
234pub struct EngineJournalWriter {
235    pool: crate::db_handler::DbPool,
236    /// Event persistence mode (controls latency/durability tradeoff)
237    event_persistence_mode: EventPersistenceMode,
238    critical_event_types: std::collections::HashSet<String>,
239}
240
241impl EngineJournalWriter {
242    /// Create a new journal writer with the given database pool
243    pub fn new(pool: crate::db_handler::DbPool) -> Self {
244        Self {
245            pool,
246            event_persistence_mode: EventPersistenceMode::default(),
247            critical_event_types: get_critical_event_types(&[]),
248        }
249    }
250
251    /// Create a new journal writer with explicit event persistence mode
252    pub fn new_with_mode(pool: crate::db_handler::DbPool, mode: EventPersistenceMode) -> Self {
253        Self {
254            pool,
255            event_persistence_mode: mode,
256            critical_event_types: get_critical_event_types(&[]),
257        }
258    }
259
260    pub fn from_database_url_with_config(
261        database_url: &str,
262        pool_max: u32,
263        config: &crate::backend_config::JournalRuntimeConfig,
264    ) -> Result<Self, EngineJournalError> {
265        let auth = crate::db_handler::DbAuthConfig::password(database_url);
266        Self::from_auth_config(&auth, pool_max, config)
267    }
268
269    pub fn from_auth_config(
270        auth: &crate::db_handler::DbAuthConfig,
271        pool_max: u32,
272        config: &crate::backend_config::JournalRuntimeConfig,
273    ) -> Result<Self, EngineJournalError> {
274        let pool = hypercall_db_diesel::build_db_pool(auth, pool_max, 30_000, 10_000)
275            .map_err(|e| EngineJournalError::Connection(e.to_string()))?;
276        Ok(Self {
277            pool,
278            event_persistence_mode: EventPersistenceMode::from_config(config),
279            critical_event_types: get_critical_event_types(&config.critical_event_types),
280        })
281    }
282
283    /// Get the current event persistence mode
284    pub fn event_persistence_mode(&self) -> EventPersistenceMode {
285        self.event_persistence_mode
286    }
287
288    /// Append a state transition to the journal.
289    ///
290    /// This is transactional and provides idempotency:
291    /// - If request_id already exists, returns the existing record (no new inserts)
292    /// - If request_id is new, inserts command, events, and digests atomically
293    ///
294    /// # Arguments
295    /// * `received_ts_ms` - When the command was received
296    /// * `command_data` - Wire-format bytes (version byte + msgpack)
297    /// * `response_data` - Optional response wire bytes for idempotent returns
298    /// * `order_id` - Extracted order_id for indexed column
299    /// * `pre_digest` - State digest before applying
300    /// * `post_digest` - State digest after applying
301    /// * `duration_ms` - Processing duration
302    /// * `events` - Events generated by applying the command
303    /// * `request_uuid` - Unique request UUID for idempotency
304    /// * `command_type_enum` - Command type enum
305    #[allow(clippy::too_many_arguments)]
306    pub fn append_transition(
307        &self,
308        received_ts_ms: u64,
309        command_data: &[u8],
310        response_data: Option<&[u8]>,
311        order_id: Option<i64>,
312        pre_digest: &EngineStateDigest,
313        post_digest: &EngineStateDigest,
314        duration_ms: u64,
315        events: &[EngineMessage],
316        request_uuid: DbUuid,
317        command_type_enum: Option<CommandType>,
318    ) -> Result<JournalAppendResult, EngineJournalError> {
319        ensure_no_fill_events(events, "append_transition()")?;
320        self.append_transition_internal(
321            received_ts_ms,
322            command_data,
323            response_data,
324            order_id,
325            pre_digest,
326            post_digest,
327            duration_ms,
328            events,
329            &[],
330            &[],
331            request_uuid,
332            command_type_enum,
333        )
334    }
335
336    #[allow(clippy::too_many_arguments)]
337    fn append_transition_internal(
338        &self,
339        received_ts_ms: u64,
340        command_data: &[u8],
341        response_data: Option<&[u8]>,
342        order_id: Option<i64>,
343        pre_digest: &EngineStateDigest,
344        post_digest: &EngineStateDigest,
345        duration_ms: u64,
346        events: &[EngineMessage],
347        fill_side_effects: &[super::JournalFillSideEffect],
348        balance_updates: &[hypercall_types::BalanceUpdate],
349        request_uuid: DbUuid,
350        command_type_enum: Option<CommandType>,
351    ) -> Result<JournalAppendResult, EngineJournalError> {
352        let mut conn = self.pool.get()?;
353
354        // Check if request_uuid already exists (idempotency check)
355        if let Some(existing) = self.get_by_request_id_internal(&mut conn, &request_uuid.0)? {
356            // Deserialize stored msgpack bytes back to EngineMessage for idempotent return.
357            return Ok(JournalAppendResult {
358                command_id: existing.command_id,
359                response_data: existing.response_data,
360                events: existing
361                    .events
362                    .into_iter()
363                    .map(|e| {
364                        EngineMessage::deserialize_from_wire(&e.event_topic, &e.event_data)
365                            .expect("Failed to parse stored journal event payload")
366                    })
367                    .collect(),
368                was_new_insert: false,
369            });
370        }
371
372        // Serialize digests
373        // Serialize events to wire-format bytes (version byte + msgpack).
374        // All events are serialized in full — the Summary/None modes are deprecated
375        // since events now store pre-serialized wire-format bytes.
376        use crate::journal::engine_journal_batcher::EventPayload;
377        let event_payloads: Vec<EventPayload> = match self.event_persistence_mode {
378            EventPersistenceMode::Full => events
379                .iter()
380                .map(|e| {
381                    let event_type_enum = EventType::from_engine_message(e);
382                    let event_data = e
383                        .serialize_inner()
384                        .expect("Failed to serialize EngineMessage");
385                    let event_topic = e.topic().to_string();
386                    let event_key = e.partition_key();
387                    let l2_sequence = match e {
388                        EngineMessage::L2Update(l2) => l2.sequence,
389                        _ => None,
390                    };
391                    EventPayload {
392                        event_topic,
393                        event_key,
394                        event_data,
395                        l2_sequence,
396                        event_type_enum,
397                    }
398                })
399                .collect(),
400            EventPersistenceMode::Summary | EventPersistenceMode::None => {
401                // In Summary/None modes, only persist critical events (e.g., L2Update)
402                events
403                    .iter()
404                    .filter(|e| {
405                        self.event_persistence_mode == EventPersistenceMode::Summary
406                            || is_critical_event_type(
407                                EventType::from_engine_message(e).as_str(),
408                                &self.critical_event_types,
409                            )
410                    })
411                    .map(|e| {
412                        let event_type_enum = EventType::from_engine_message(e);
413                        let event_data = e
414                            .serialize_inner()
415                            .expect("Failed to serialize EngineMessage");
416                        let event_topic = e.topic().to_string();
417                        let event_key = e.partition_key();
418                        let l2_sequence = match e {
419                            EngineMessage::L2Update(l2) => l2.sequence,
420                            _ => None,
421                        };
422                        EventPayload {
423                            event_topic,
424                            event_key,
425                            event_data,
426                            l2_sequence,
427                            event_type_enum,
428                        }
429                    })
430                    .collect()
431            }
432        };
433
434        // Execute transaction with idempotent inserts
435        conn.transaction::<_, EngineJournalError, _>(|conn| {
436            // Insert command with ON CONFLICT DO NOTHING for idempotency
437            // This is faster than separate SELECT + INSERT
438            let insert_result: Option<i64> = diesel::insert_into(engine_commands::table)
439                .values((
440                    engine_commands::received_ts_ms.eq(received_ts_ms as i64),
441                    engine_commands::command_data.eq(command_data),
442                    engine_commands::response_data.eq(response_data),
443                    engine_commands::order_id.eq(order_id),
444                    engine_commands::request_uuid.eq(request_uuid),
445                    engine_commands::command_type_enum.eq(command_type_enum),
446                ))
447                .on_conflict(engine_commands::request_uuid)
448                .do_nothing()
449                .returning(engine_commands::command_id)
450                .get_result(conn)
451                .optional()?;
452
453            let (command_id, was_new_insert) = match insert_result {
454                Some(id) => (id, true),
455                None => {
456                    // Conflict - fetch existing command_id
457                    let existing_id: i64 = engine_commands::table
458                        .filter(engine_commands::request_uuid.eq(request_uuid))
459                        .select(engine_commands::command_id)
460                        .first(conn)?;
461                    (existing_id, false)
462                }
463            };
464
465            // Only insert events and digests if this was a new insert
466            if was_new_insert {
467                // Batch insert events (much faster than per-event loop)
468                if !event_payloads.is_empty() {
469                    let event_rows: Vec<_> = event_payloads
470                        .iter()
471                        .enumerate()
472                        .map(|(idx, event)| {
473                            (
474                                engine_events::command_id.eq(command_id),
475                                engine_events::event_idx.eq(idx as i32),
476                                engine_events::event_data.eq(&event.event_data),
477                                engine_events::event_key.eq(&event.event_key),
478                                engine_events::l2_sequence.eq(event.l2_sequence),
479                                engine_events::event_type_enum.eq(event.event_type_enum),
480                            )
481                        })
482                        .collect();
483
484                    diesel::insert_into(engine_events::table)
485                        .values(&event_rows)
486                        .execute(conn)?;
487                }
488
489                let fill_side_effects_by_idx: HashMap<i32, &super::JournalFillSideEffect> =
490                    fill_side_effects
491                        .iter()
492                        .map(|side_effect| (side_effect.event_idx, side_effect))
493                        .collect();
494                for (idx, event) in events.iter().enumerate() {
495                    let EngineMessage::OrderFilled { fill, .. } = event else {
496                        continue;
497                    };
498
499                    let side_effect = fill_side_effects_by_idx.get(&(idx as i32)).unwrap_or_else(|| {
500                        panic!(
501                            "CRITICAL_FAILURE: missing journal fill side-effect for sync command_id={}, event_idx={}, trade_id={}",
502                            command_id, idx, fill.trade_id
503                        )
504                    });
505                    crate::db_handler::DieselEventHandler::persist_fill_with_side_effects_in_tx(
506                        conn,
507                        fill,
508                        side_effect,
509                    )
510                    .map_err(|e| EngineJournalError::Query(e.to_string()))?;
511                }
512
513                for balance_update in balance_updates {
514                    hypercall_db_diesel::ledger_ops::apply_pnl_decimal_sync(
515                        conn,
516                        &balance_update.wallet,
517                        balance_update.delta,
518                    )
519                    .map_err(|e| EngineJournalError::Query(e.to_string()))?;
520                }
521
522                // Insert digests (dual-write JSONB + msgpack)
523                let pre_digest_data = hypercall_types::serialize_to_wire_bytes(pre_digest);
524                let post_digest_data =
525                    hypercall_types::serialize_to_wire_bytes(post_digest);
526                diesel::insert_into(engine_command_digests::table)
527                    .values((
528                        engine_command_digests::command_id.eq(command_id),
529                        engine_command_digests::duration_ms.eq(duration_ms as i64),
530                        engine_command_digests::pre_digest_data.eq(&pre_digest_data),
531                        engine_command_digests::post_digest_data.eq(&post_digest_data),
532                    ))
533                    .on_conflict(engine_command_digests::command_id)
534                    .do_nothing()
535                    .execute(conn)?;
536            }
537
538            Ok(JournalAppendResult {
539                command_id,
540                response_data: response_data.map(|d| d.to_vec()),
541                events: events.to_vec(),
542                was_new_insert,
543            })
544        })
545    }
546
547    /// Get a journal record by request_id
548    pub fn get_by_request_id(
549        &self,
550        request_id: &uuid::Uuid,
551    ) -> Result<Option<JournalFullRecord>, EngineJournalError> {
552        let mut conn = self.pool.get()?;
553        self.get_by_request_id_internal(&mut conn, request_id)
554    }
555
556    fn get_by_request_id_internal(
557        &self,
558        conn: &mut PgConnection,
559        request_id: &uuid::Uuid,
560    ) -> Result<Option<JournalFullRecord>, EngineJournalError> {
561        // Query command
562        let command: Option<(
563            i64,
564            DbUuid,
565            i64,
566            Option<CommandType>,
567            Option<Vec<u8>>,
568            Option<Vec<u8>>,
569            chrono::DateTime<chrono::Utc>,
570        )> = engine_commands::table
571            .filter(engine_commands::request_uuid.eq(DbUuid(*request_id)))
572            .select((
573                engine_commands::command_id,
574                engine_commands::request_uuid,
575                engine_commands::received_ts_ms,
576                engine_commands::command_type_enum,
577                engine_commands::command_data,
578                engine_commands::response_data,
579                engine_commands::created_at,
580            ))
581            .first(conn)
582            .optional()?;
583
584        let Some((
585            command_id,
586            req_id,
587            received_ts_ms,
588            command_type,
589            command_data,
590            response_data,
591            created_at,
592        )) = command
593        else {
594            return Ok(None);
595        };
596
597        // Query events
598        let events: Vec<(i64, i32, EventType, Vec<u8>, Option<String>, Option<i64>)> =
599            engine_events::table
600                .filter(engine_events::command_id.eq(command_id))
601                .order(engine_events::event_idx.asc())
602                .select((
603                    engine_events::event_id,
604                    engine_events::event_idx,
605                    engine_events::event_type_enum,
606                    engine_events::event_data,
607                    engine_events::event_key,
608                    engine_events::l2_sequence,
609                ))
610                .load(conn)?;
611
612        // Query digests
613        let digest: Option<(Vec<u8>, Vec<u8>, i64)> = engine_command_digests::table
614            .filter(engine_command_digests::command_id.eq(command_id))
615            .select((
616                engine_command_digests::pre_digest_data,
617                engine_command_digests::post_digest_data,
618                engine_command_digests::duration_ms,
619            ))
620            .first(conn)
621            .optional()?;
622
623        let (pre_digest, post_digest, duration_ms) = digest
624            .map(|(pre, post, dur)| {
625                let pre =
626                    rmp_serde::from_slice::<serde_json::Value>(&pre[1..]).unwrap_or_else(|e| {
627                        panic!(
628                            "Failed to decode pre_digest_data msgpack (command_id={}): {}",
629                            command_id, e
630                        )
631                    });
632                let post =
633                    rmp_serde::from_slice::<serde_json::Value>(&post[1..]).unwrap_or_else(|e| {
634                        panic!(
635                            "Failed to decode post_digest_data msgpack (command_id={}): {}",
636                            command_id, e
637                        )
638                    });
639                (Some(pre), Some(post), Some(dur))
640            })
641            .unwrap_or((None, None, None));
642
643        Ok(Some(JournalFullRecord {
644            command_id,
645            request_id: req_id.to_string(),
646            received_ts_ms,
647            command_type: command_type
648                .map(|ct| ct.as_str().to_string())
649                .unwrap_or_default(),
650            command_data: command_data.expect("command_data is NULL — run backfill first"),
651            response_data,
652            pre_digest,
653            post_digest,
654            duration_ms,
655            events: events
656                .into_iter()
657                .map(
658                    |(event_id, event_idx, event_type_enum, event_data, event_key, l2_sequence)| {
659                        JournalEventRecord {
660                            event_id,
661                            event_idx,
662                            event_type: event_type_enum.as_str().to_string(),
663                            event_data,
664                            event_topic: event_type_enum.topic().to_string(),
665                            event_key,
666                            l2_sequence,
667                        }
668                    },
669                )
670                .collect(),
671            created_at,
672        }))
673    }
674
675    /// Get recent journal command summaries
676    pub fn get_recent(
677        &self,
678        limit: usize,
679    ) -> Result<Vec<JournalCommandSummary>, EngineJournalError> {
680        let mut conn = self.pool.get()?;
681
682        // Query recent commands with event counts
683        let commands: Vec<(
684            i64,
685            DbUuid,
686            i64,
687            Option<CommandType>,
688            Option<Vec<u8>>,
689            chrono::DateTime<chrono::Utc>,
690        )> = engine_commands::table
691            .order(engine_commands::created_at.desc())
692            .limit(limit as i64)
693            .select((
694                engine_commands::command_id,
695                engine_commands::request_uuid,
696                engine_commands::received_ts_ms,
697                engine_commands::command_type_enum,
698                engine_commands::response_data,
699                engine_commands::created_at,
700            ))
701            .load(&mut conn)?;
702
703        let mut summaries = Vec::with_capacity(commands.len());
704        for (command_id, request_id, received_ts_ms, command_type, response_data, created_at) in
705            commands
706        {
707            // Count events
708            let event_count: i64 = engine_events::table
709                .filter(engine_events::command_id.eq(command_id))
710                .count()
711                .get_result(&mut conn)?;
712
713            // Get first 5 event types as a sample
714            let event_types_sample: Vec<EventType> = engine_events::table
715                .filter(engine_events::command_id.eq(command_id))
716                .order(engine_events::event_idx.asc())
717                .limit(5)
718                .select(engine_events::event_type_enum)
719                .load(&mut conn)?;
720
721            // Get digests
722            let digest: Option<(Vec<u8>, Vec<u8>, i64)> = engine_command_digests::table
723                .filter(engine_command_digests::command_id.eq(command_id))
724                .select((
725                    engine_command_digests::pre_digest_data,
726                    engine_command_digests::post_digest_data,
727                    engine_command_digests::duration_ms,
728                ))
729                .first(&mut conn)
730                .optional()?;
731
732            let (pre_digest, post_digest, duration_ms) = digest
733                .map(|(pre, post, dur)| {
734                    let pre =
735                        rmp_serde::from_slice::<serde_json::Value>(&pre[1..]).unwrap_or_else(|e| {
736                            panic!(
737                                "Failed to decode pre_digest_data msgpack (command_id={}): {}",
738                                command_id, e
739                            )
740                        });
741                    let post = rmp_serde::from_slice::<serde_json::Value>(&post[1..])
742                        .unwrap_or_else(|e| {
743                            panic!(
744                                "Failed to decode post_digest_data msgpack (command_id={}): {}",
745                                command_id, e
746                            )
747                        });
748                    (Some(pre), Some(post), Some(dur))
749                })
750                .unwrap_or((None, None, None));
751
752            summaries.push(JournalCommandSummary {
753                command_id,
754                request_id: request_id.to_string(),
755                received_ts_ms,
756                command_type: command_type
757                    .map(|ct| ct.as_str().to_string())
758                    .unwrap_or_default(),
759                response_data,
760                event_count,
761                event_types_sample: event_types_sample
762                    .into_iter()
763                    .map(|e| e.as_str().to_string())
764                    .collect(),
765                pre_digest,
766                post_digest,
767                duration_ms,
768                created_at,
769            });
770        }
771
772        Ok(summaries)
773    }
774
775    /// Get recent request_ids for idempotency cache warm-up.
776    /// Returns request_ids from commands received in the last `since_hours` hours.
777    /// This is used at startup to populate the in-memory HashSet for fast idempotency checks.
778    pub fn get_recent_request_ids(
779        &self,
780        since_hours: i64,
781    ) -> Result<std::collections::HashSet<uuid::Uuid>, EngineJournalError> {
782        let mut conn = self.pool.get()?;
783
784        // Calculate cutoff timestamp
785        let cutoff = chrono::Utc::now() - chrono::Duration::hours(since_hours);
786
787        let request_ids: Vec<DbUuid> = engine_commands::table
788            .filter(engine_commands::created_at.ge(cutoff))
789            .select(engine_commands::request_uuid)
790            .load(&mut conn)?;
791
792        let count = request_ids.len();
793        tracing::info!(
794            "Loaded {} request_ids from journal for idempotency cache (since {} hours ago)",
795            count,
796            since_hours
797        );
798
799        Ok(request_ids.into_iter().map(|id| id.0).collect())
800    }
801
802    /// Check if a request_id exists in the journal (lightweight existence check).
803    /// Returns true if the request_id has been processed before.
804    pub fn request_id_exists(&self, request_id: &uuid::Uuid) -> Result<bool, EngineJournalError> {
805        let mut conn = self.pool.get()?;
806
807        let count: i64 = engine_commands::table
808            .filter(engine_commands::request_uuid.eq(DbUuid(*request_id)))
809            .count()
810            .get_result(&mut conn)?;
811
812        Ok(count > 0)
813    }
814}
815
816impl JournalWriter for EngineJournalWriter {
817    fn is_durable(&self) -> bool {
818        true
819    }
820
821    fn append_transition(
822        &self,
823        received_ts_ms: u64,
824        command_data: &[u8],
825        response_data: Option<&[u8]>,
826        order_id: Option<i64>,
827        pre_digest: &EngineStateDigest,
828        post_digest: &EngineStateDigest,
829        duration_ms: u64,
830        events: &[EngineMessage],
831        request_uuid: DbUuid,
832        command_type_enum: Option<CommandType>,
833    ) -> Result<JournalAppendResult, EngineJournalError> {
834        EngineJournalWriter::append_transition(
835            self,
836            received_ts_ms,
837            command_data,
838            response_data,
839            order_id,
840            pre_digest,
841            post_digest,
842            duration_ms,
843            events,
844            request_uuid,
845            command_type_enum,
846        )
847    }
848
849    fn append_transition_with_fill_side_effects(
850        &self,
851        received_ts_ms: u64,
852        command_data: &[u8],
853        response_data: Option<&[u8]>,
854        order_id: Option<i64>,
855        pre_digest: &EngineStateDigest,
856        post_digest: &EngineStateDigest,
857        duration_ms: u64,
858        events: &[EngineMessage],
859        fill_side_effects: &[super::JournalFillSideEffect],
860        balance_updates: &[hypercall_types::BalanceUpdate],
861        request_uuid: DbUuid,
862        command_type_enum: Option<CommandType>,
863    ) -> Result<JournalAppendResult, EngineJournalError> {
864        self.append_transition_internal(
865            received_ts_ms,
866            command_data,
867            response_data,
868            order_id,
869            pre_digest,
870            post_digest,
871            duration_ms,
872            events,
873            fill_side_effects,
874            balance_updates,
875            request_uuid,
876            command_type_enum,
877        )
878    }
879
880    fn get_by_request_id(
881        &self,
882        request_id: &uuid::Uuid,
883    ) -> Result<Option<JournalFullRecord>, EngineJournalError> {
884        EngineJournalWriter::get_by_request_id(self, request_id)
885    }
886
887    fn request_id_exists(&self, request_id: &uuid::Uuid) -> Result<bool, EngineJournalError> {
888        EngineJournalWriter::request_id_exists(self, request_id)
889    }
890
891    fn get_recent_request_ids(
892        &self,
893        since_hours: i64,
894    ) -> Result<std::collections::HashSet<uuid::Uuid>, EngineJournalError> {
895        EngineJournalWriter::get_recent_request_ids(self, since_hours)
896    }
897
898    fn get_recent(&self, limit: usize) -> Result<Vec<JournalCommandSummary>, EngineJournalError> {
899        EngineJournalWriter::get_recent(self, limit)
900    }
901}
902
903/// In-memory journal writer for testing.
904/// Exercises real idempotency logic without requiring a database.
905pub struct InMemoryJournalWriter {
906    entries: std::sync::Mutex<HashMap<uuid::Uuid, JournalFullRecord>>,
907    next_command_id: std::sync::atomic::AtomicI64,
908    durable: bool,
909}
910
911impl InMemoryJournalWriter {
912    /// Create a new in-memory journal writer
913    pub fn new() -> Self {
914        Self::with_durability(false)
915    }
916
917    fn with_durability(durable: bool) -> Self {
918        Self {
919            entries: std::sync::Mutex::new(HashMap::new()),
920            next_command_id: std::sync::atomic::AtomicI64::new(1),
921            durable,
922        }
923    }
924
925    #[cfg(test)]
926    pub(crate) fn durable_for_tests() -> Self {
927        Self::with_durability(true)
928    }
929
930    /// Get all stored entries (for test verification)
931    pub fn get_all_entries(&self) -> Vec<JournalFullRecord> {
932        let entries = self.entries.lock().unwrap();
933        entries.values().cloned().collect()
934    }
935
936    /// Clear all entries (for test isolation)
937    pub fn clear(&self) {
938        let mut entries = self.entries.lock().unwrap();
939        entries.clear();
940    }
941
942    /// Get number of entries stored
943    pub fn len(&self) -> usize {
944        let entries = self.entries.lock().unwrap();
945        entries.len()
946    }
947
948    /// Check if empty
949    pub fn is_empty(&self) -> bool {
950        self.len() == 0
951    }
952}
953
954#[cfg(test)]
955pub(crate) fn request_uuid_for_test(uuid: uuid::Uuid) -> DbUuid {
956    DbUuid(uuid)
957}
958
959impl Default for InMemoryJournalWriter {
960    fn default() -> Self {
961        Self::new()
962    }
963}
964
965impl JournalWriter for InMemoryJournalWriter {
966    fn is_durable(&self) -> bool {
967        self.durable
968    }
969
970    fn append_transition(
971        &self,
972        received_ts_ms: u64,
973        command_data: &[u8],
974        response_data: Option<&[u8]>,
975        _order_id: Option<i64>,
976        pre_digest: &EngineStateDigest,
977        post_digest: &EngineStateDigest,
978        duration_ms: u64,
979        events: &[EngineMessage],
980        request_uuid: DbUuid,
981        command_type_enum: Option<CommandType>,
982    ) -> Result<JournalAppendResult, EngineJournalError> {
983        let mut entries = self.entries.lock().unwrap();
984
985        // Idempotency check - return existing record if request_uuid exists
986        if let Some(existing) = entries.get(&request_uuid.0) {
987            return Ok(JournalAppendResult {
988                command_id: existing.command_id,
989                response_data: existing.response_data.clone(),
990                events: existing
991                    .events
992                    .iter()
993                    .map(|e| {
994                        EngineMessage::deserialize_from_wire(&e.event_topic, &e.event_data)
995                            .expect("Failed to parse in-memory journal event payload")
996                    })
997                    .collect(),
998                was_new_insert: false,
999            });
1000        }
1001
1002        // Create new entry
1003        let command_id = self
1004            .next_command_id
1005            .fetch_add(1, std::sync::atomic::Ordering::SeqCst);
1006
1007        let event_records: Vec<JournalEventRecord> = events
1008            .iter()
1009            .enumerate()
1010            .map(|(idx, e)| {
1011                let event_type = EventType::from_engine_message(e).as_str().to_string();
1012                let event_data = e
1013                    .serialize_inner()
1014                    .expect("Failed to serialize EngineMessage");
1015                let event_topic = e.topic().to_string();
1016                let event_key = e.partition_key();
1017                let l2_sequence = match e {
1018                    EngineMessage::L2Update(l2) => l2.sequence,
1019                    _ => None,
1020                };
1021                JournalEventRecord {
1022                    event_id: (command_id * 1000) + idx as i64, // Synthetic event_id
1023                    event_idx: idx as i32,
1024                    event_type,
1025                    event_data,
1026                    event_topic,
1027                    event_key,
1028                    l2_sequence,
1029                }
1030            })
1031            .collect();
1032
1033        let record = JournalFullRecord {
1034            command_id,
1035            request_id: request_uuid.0.to_string(),
1036            received_ts_ms: received_ts_ms as i64,
1037            command_type: command_type_enum
1038                .map(|ct| ct.as_str().to_string())
1039                .unwrap_or_default(),
1040            command_data: command_data.to_vec(),
1041            response_data: response_data.map(|d| d.to_vec()),
1042            pre_digest: serde_json::to_value(pre_digest).ok(),
1043            post_digest: serde_json::to_value(post_digest).ok(),
1044            duration_ms: Some(duration_ms as i64),
1045            events: event_records,
1046            created_at: chrono::Utc::now(),
1047        };
1048
1049        entries.insert(request_uuid.0, record);
1050
1051        Ok(JournalAppendResult {
1052            command_id,
1053            response_data: response_data.map(|d| d.to_vec()),
1054            events: events.to_vec(),
1055            was_new_insert: true,
1056        })
1057    }
1058
1059    #[allow(clippy::too_many_arguments)]
1060    fn append_transition_with_fill_side_effects(
1061        &self,
1062        received_ts_ms: u64,
1063        command_data: &[u8],
1064        response_data: Option<&[u8]>,
1065        order_id: Option<i64>,
1066        pre_digest: &EngineStateDigest,
1067        post_digest: &EngineStateDigest,
1068        duration_ms: u64,
1069        events: &[EngineMessage],
1070        _fill_side_effects: &[super::JournalFillSideEffect],
1071        _balance_updates: &[hypercall_types::BalanceUpdate],
1072        request_uuid: DbUuid,
1073        command_type_enum: Option<CommandType>,
1074    ) -> Result<JournalAppendResult, EngineJournalError> {
1075        self.append_transition(
1076            received_ts_ms,
1077            command_data,
1078            response_data,
1079            order_id,
1080            pre_digest,
1081            post_digest,
1082            duration_ms,
1083            events,
1084            request_uuid,
1085            command_type_enum,
1086        )
1087    }
1088
1089    fn get_by_request_id(
1090        &self,
1091        request_id: &uuid::Uuid,
1092    ) -> Result<Option<JournalFullRecord>, EngineJournalError> {
1093        let entries = self.entries.lock().unwrap();
1094        Ok(entries.get(request_id).cloned())
1095    }
1096
1097    fn request_id_exists(&self, request_id: &uuid::Uuid) -> Result<bool, EngineJournalError> {
1098        let entries = self.entries.lock().unwrap();
1099        Ok(entries.contains_key(request_id))
1100    }
1101
1102    fn get_recent_request_ids(
1103        &self,
1104        _since_hours: i64,
1105    ) -> Result<std::collections::HashSet<uuid::Uuid>, EngineJournalError> {
1106        let entries = self.entries.lock().unwrap();
1107        Ok(entries.keys().cloned().collect())
1108    }
1109
1110    fn get_recent(&self, limit: usize) -> Result<Vec<JournalCommandSummary>, EngineJournalError> {
1111        let entries = self.entries.lock().unwrap();
1112        let mut summaries: Vec<JournalCommandSummary> = entries
1113            .values()
1114            .map(|record| JournalCommandSummary {
1115                command_id: record.command_id,
1116                request_id: record.request_id.clone(),
1117                received_ts_ms: record.received_ts_ms,
1118                command_type: record.command_type.clone(),
1119                response_data: record.response_data.clone(),
1120                event_count: record.events.len() as i64,
1121                event_types_sample: record
1122                    .events
1123                    .iter()
1124                    .take(5)
1125                    .map(|e| e.event_type.clone())
1126                    .collect(),
1127                pre_digest: record.pre_digest.clone(),
1128                post_digest: record.post_digest.clone(),
1129                duration_ms: record.duration_ms,
1130                created_at: record.created_at,
1131            })
1132            .collect();
1133
1134        // Sort by created_at descending and take limit
1135        summaries.sort_by_key(|s| std::cmp::Reverse(s.created_at));
1136        summaries.truncate(limit);
1137        Ok(summaries)
1138    }
1139}
1140
1141/// Thread-safe wrapper for journal writers (trait object for mock support)
1142pub type SharedEngineJournalWriter = Arc<dyn JournalWriter>;
1143
1144#[cfg(test)]
1145mod tests {
1146    use super::*;
1147
1148    #[cfg(feature = "test-utils")]
1149    async fn setup_test_pool() -> (
1150        hypercall_db_diesel::DbPool,
1151        Option<testcontainers::ContainerAsync<testcontainers_modules::postgres::Postgres>>,
1152        crate::test_contracts::TestnetContractEnvGuard,
1153    ) {
1154        use crate::test_contracts::test_database_url_with_database;
1155        use testcontainers::runners::AsyncRunner;
1156        use testcontainers::ImageExt;
1157        use testcontainers_modules::postgres::Postgres;
1158
1159        let env_guard = crate::test_contracts::TestnetContractEnvGuard::apply();
1160
1161        let (database_url, container) = if let Ok(url) = std::env::var("TEST_DATABASE_URL") {
1162            let db_name = format!("test_sync_journal_{}", uuid::Uuid::new_v4().simple());
1163            let admin_pool = sqlx::postgres::PgPoolOptions::new()
1164                .max_connections(1)
1165                .connect(&url)
1166                .await
1167                .expect("connect to TEST_DATABASE_URL");
1168            sqlx::query(&format!(r#"CREATE DATABASE "{}""#, db_name))
1169                .execute(&admin_pool)
1170                .await
1171                .expect("create isolated test database");
1172            admin_pool.close().await;
1173            let db_url = test_database_url_with_database(&url, &db_name);
1174            (db_url, None)
1175        } else {
1176            let container = Postgres::default()
1177                .with_db_name("test_sync_journal")
1178                .with_user("test_user")
1179                .with_password("test_password")
1180                .with_tag("16-alpine")
1181                .start()
1182                .await
1183                .expect("start postgres container");
1184
1185            let port = container
1186                .get_host_port_ipv4(5432)
1187                .await
1188                .expect("get postgres port");
1189            let url = format!(
1190                "postgresql://test_user:test_password@127.0.0.1:{}/test_sync_journal",
1191                port
1192            );
1193            tokio::time::sleep(tokio::time::Duration::from_secs(2)).await;
1194
1195            (url, Some(container))
1196        };
1197
1198        let _handler = crate::db_handler::DieselEventHandler::new(&database_url)
1199            .expect("create diesel handler and run migrations");
1200        let auth = hypercall_db_diesel::DbAuthConfig::password(&database_url);
1201        let pool = hypercall_db_diesel::build_db_pool(&auth, 5, 30_000, 10_000)
1202            .expect("build diesel pool");
1203
1204        (pool, container, env_guard)
1205    }
1206
1207    #[test]
1208    fn test_event_type_topic_derive() {
1209        // Just verify the function doesn't panic
1210        use hypercall_types::OrderActionMessage;
1211        use hypercall_types::{OrderInfo, Side, WalletAddress};
1212        use rust_decimal_macros::dec;
1213        use std::str::FromStr;
1214
1215        let wallet = WalletAddress::from_str("0x1234567890123456789012345678901234567890").unwrap();
1216        let msg = OrderActionMessage {
1217            timestamp: 1000,
1218            info: OrderInfo {
1219                symbol: "TEST".to_string(),
1220                price: dec!(1.0),
1221                size: dec!(1.0),
1222                side: Side::Buy,
1223                tif: hypercall_types::TimeInForce::GTC,
1224                client_id: None,
1225                order_id: None,
1226                is_perp: false,
1227                underlying: None,
1228                reduce_only: None,
1229                nonce: None,
1230                signature: None,
1231                mmp_enabled: false,
1232                builder_code_address: None,
1233            },
1234            action: hypercall_types::OrderAction::CreateOrder,
1235            wallet,
1236            api_wallet_address: None,
1237            mmp_triggered: false,
1238            request_id: None,
1239        };
1240
1241        let event = EngineMessage::OrderAction(msg);
1242        assert_eq!(
1243            EventType::from_engine_message(&event).as_str(),
1244            "OrderAction"
1245        );
1246    }
1247
1248    #[cfg(feature = "test-utils")]
1249    #[tokio::test]
1250    #[serial_test::serial(testnet_env)]
1251    async fn sync_writer_materializes_balance_updates_once() {
1252        use diesel::prelude::*;
1253        use rust_decimal_macros::dec;
1254
1255        let (pool, _container, _env_guard) = setup_test_pool().await;
1256        let writer = EngineJournalWriter::new(pool.clone());
1257        let request_uuid = DbUuid(uuid::Uuid::new_v4());
1258        let wallet = hypercall_types::wallet_address::test_wallet(11);
1259        let update = hypercall_types::BalanceUpdate {
1260            balance_update_seq: 1,
1261            wallet,
1262            delta: dec!(42),
1263            balance_after: dec!(42),
1264            reason: hypercall_types::BalanceUpdateReason::Deposit,
1265            reference_id: Some("sync-writer-test".to_string()),
1266            source_command_id: None,
1267            timestamp_ms: 1234,
1268        };
1269
1270        let first = writer
1271            .append_transition_with_fill_side_effects(
1272                1234,
1273                &[1, 2, 3],
1274                None,
1275                None,
1276                &EngineStateDigest::default(),
1277                &EngineStateDigest::default(),
1278                1,
1279                &[],
1280                &[],
1281                std::slice::from_ref(&update),
1282                request_uuid,
1283                Some(CommandType::DepositUpdate),
1284            )
1285            .expect("sync writer should persist balance updates");
1286        assert!(first.was_new_insert);
1287
1288        let duplicate = writer
1289            .append_transition_with_fill_side_effects(
1290                1234,
1291                &[1, 2, 3],
1292                None,
1293                None,
1294                &EngineStateDigest::default(),
1295                &EngineStateDigest::default(),
1296                1,
1297                &[],
1298                &[],
1299                std::slice::from_ref(&update),
1300                request_uuid,
1301                Some(CommandType::DepositUpdate),
1302            )
1303            .expect("duplicate request should be idempotent");
1304        assert!(!duplicate.was_new_insert);
1305
1306        let mut conn = pool.get().expect("db connection");
1307        let balance: rust_decimal::Decimal = hypercall_db_diesel::schema::account_balances::table
1308            .filter(hypercall_db_diesel::schema::account_balances::account_address.eq(wallet))
1309            .select(hypercall_db_diesel::schema::account_balances::balance)
1310            .first(&mut conn)
1311            .expect("projection balance");
1312        assert_eq!(balance, dec!(42));
1313    }
1314}