Skip to main content

hypercall_db/
journal_records.rs

1use serde::{Deserialize, Serialize};
2
/// Version byte expected at offset 0 of every wire-framed journal payload.
const WIRE_VERSION: u8 = 0x01;

/// Asserts that `bytes` begins with a supported wire header.
///
/// A valid buffer holds the [`WIRE_VERSION`] byte followed by at least one
/// payload byte. `context` is only ever rendered into the panic message, so
/// any `Display` value is accepted: an owned `String`, a `&str`, or a lazy
/// `format_args!(..)` that avoids allocating on the success path.
///
/// # Panics
///
/// * if `bytes` has fewer than two bytes (no room for version + payload);
/// * if the first byte is not [`WIRE_VERSION`].
fn assert_wire_header(bytes: &[u8], context: impl std::fmt::Display) {
    assert!(bytes.len() > 1, "{} missing version/payload bytes", context);
    // Indexing is safe here: the length check above guarantees byte 0 exists.
    assert_eq!(
        bytes[0], WIRE_VERSION,
        "{} unsupported wire version {} (expected {})",
        context, bytes[0], WIRE_VERSION
    );
}
13
14#[derive(Debug, Clone, Deserialize)]
15pub struct JournalCommandSummary {
16    pub command_id: i64,
17    pub request_id: String,
18    pub received_ts_ms: i64,
19    pub command_type: String,
20    pub response_data: Option<Vec<u8>>,
21    pub event_count: i64,
22    pub event_types_sample: Vec<String>,
23    pub pre_digest: Option<serde_json::Value>,
24    pub post_digest: Option<serde_json::Value>,
25    pub duration_ms: Option<i64>,
26    pub created_at: chrono::DateTime<chrono::Utc>,
27}
28
29impl JournalCommandSummary {
30    pub fn response_json(&self) -> Option<serde_json::Value> {
31        self.response_data.as_ref().map(|data| {
32            assert_wire_header(
33                data,
34                format!(
35                    "JournalCommandSummary response_data invalid (command_id={})",
36                    self.command_id
37                ),
38            );
39            rmp_serde::from_slice::<serde_json::Value>(&data[1..]).unwrap_or_else(|e| {
40                panic!(
41                    "Failed to decode journal response_data msgpack (command_id={}): {}",
42                    self.command_id, e
43                )
44            })
45        })
46    }
47}
48
49impl Serialize for JournalCommandSummary {
50    fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
51        use serde::ser::SerializeStruct;
52        let mut state = serializer.serialize_struct("JournalCommandSummary", 11)?;
53        state.serialize_field("command_id", &self.command_id)?;
54        state.serialize_field("request_id", &self.request_id)?;
55        state.serialize_field("received_ts_ms", &self.received_ts_ms)?;
56        state.serialize_field("command_type", &self.command_type)?;
57        state.serialize_field("response_json", &self.response_json())?;
58        state.serialize_field("event_count", &self.event_count)?;
59        state.serialize_field("event_types_sample", &self.event_types_sample)?;
60        state.serialize_field("pre_digest", &self.pre_digest)?;
61        state.serialize_field("post_digest", &self.post_digest)?;
62        state.serialize_field("duration_ms", &self.duration_ms)?;
63        state.serialize_field("created_at", &self.created_at)?;
64        state.end()
65    }
66}
67
68#[derive(Debug, Clone, Deserialize)]
69pub struct JournalFullRecord {
70    pub command_id: i64,
71    pub request_id: String,
72    pub received_ts_ms: i64,
73    pub command_type: String,
74    pub command_data: Vec<u8>,
75    pub response_data: Option<Vec<u8>>,
76    pub pre_digest: Option<serde_json::Value>,
77    pub post_digest: Option<serde_json::Value>,
78    pub duration_ms: Option<i64>,
79    pub events: Vec<JournalEventRecord>,
80    pub created_at: chrono::DateTime<chrono::Utc>,
81}
82
83impl Serialize for JournalFullRecord {
84    fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
85        use serde::ser::SerializeStruct;
86        let mut state = serializer.serialize_struct("JournalFullRecord", 11)?;
87        state.serialize_field("command_id", &self.command_id)?;
88        state.serialize_field("request_id", &self.request_id)?;
89        state.serialize_field("received_ts_ms", &self.received_ts_ms)?;
90        state.serialize_field("command_type", &self.command_type)?;
91        state.serialize_field("command_json", &self.command_json())?;
92        state.serialize_field("response_json", &self.response_json())?;
93        state.serialize_field("pre_digest", &self.pre_digest)?;
94        state.serialize_field("post_digest", &self.post_digest)?;
95        state.serialize_field("duration_ms", &self.duration_ms)?;
96        state.serialize_field("events", &self.events)?;
97        state.serialize_field("created_at", &self.created_at)?;
98        state.end()
99    }
100}
101
102impl JournalFullRecord {
103    pub fn command_json(&self) -> serde_json::Value {
104        assert_wire_header(
105            &self.command_data,
106            format!(
107                "JournalFullRecord command_data invalid (command_id={})",
108                self.command_id
109            ),
110        );
111        rmp_serde::from_slice::<serde_json::Value>(&self.command_data[1..]).unwrap_or_else(|e| {
112            panic!(
113                "Failed to decode journal command_data msgpack (command_id={}): {}",
114                self.command_id, e
115            )
116        })
117    }
118
119    pub fn response_json(&self) -> Option<serde_json::Value> {
120        self.response_data.as_ref().map(|data| {
121            assert_wire_header(
122                data,
123                format!(
124                    "JournalFullRecord response_data invalid (command_id={})",
125                    self.command_id
126                ),
127            );
128            rmp_serde::from_slice::<serde_json::Value>(&data[1..]).unwrap_or_else(|e| {
129                panic!(
130                    "Failed to decode journal response_data msgpack (command_id={}): {}",
131                    self.command_id, e
132                )
133            })
134        })
135    }
136
137    pub fn decode_response(&self) -> Option<hypercall_types::OrderUpdateMessage> {
138        self.response_data.as_ref().map(|data| {
139            assert_wire_header(
140                data,
141                format!(
142                    "JournalFullRecord response_data invalid (command_id={})",
143                    self.command_id
144                ),
145            );
146            rmp_serde::from_slice(&data[1..]).unwrap_or_else(|e| {
147                panic!(
148                    "Failed to decode response_data msgpack for idempotency (command_id={}): {}",
149                    self.command_id, e
150                )
151            })
152        })
153    }
154}
155
156#[derive(Debug, Clone, Deserialize)]
157pub struct JournalEventRecord {
158    pub event_id: i64,
159    pub event_idx: i32,
160    pub event_type: String,
161    pub event_data: Vec<u8>,
162    pub event_topic: String,
163    pub event_key: Option<String>,
164    pub l2_sequence: Option<i64>,
165}
166
167impl JournalEventRecord {
168    pub fn event_json(&self) -> serde_json::Value {
169        assert_wire_header(
170            &self.event_data,
171            format!(
172                "JournalEventRecord event_data invalid (event_id={})",
173                self.event_id
174            ),
175        );
176        rmp_serde::from_slice::<serde_json::Value>(&self.event_data[1..]).unwrap_or_else(|e| {
177            panic!(
178                "Failed to decode journal event_data msgpack (event_id={}, type={}): {}",
179                self.event_id, self.event_type, e
180            )
181        })
182    }
183}
184
185impl Serialize for JournalEventRecord {
186    fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
187        use serde::ser::SerializeStruct;
188        let mut state = serializer.serialize_struct("JournalEventRecord", 5)?;
189        state.serialize_field("event_id", &self.event_id)?;
190        state.serialize_field("event_idx", &self.event_idx)?;
191        state.serialize_field("event_type", &self.event_type)?;
192        state.serialize_field("event_json", &self.event_json())?;
193        state.serialize_field("l2_sequence", &self.l2_sequence)?;
194        state.end()
195    }
196}