1pub use hypercall_types::observability::{EngineStateDigest, SymbolSummary};
12use serde::{Deserialize, Serialize};
13use std::collections::VecDeque;
14use tokio::sync::RwLock;
15
/// Number of records a store created via `CommandTraceStore::with_default_capacity` is sized for.
const DEFAULT_CAPACITY: usize = 1_000;
18
19pub struct CommandTraceStore {
21 cap: usize,
23 inner: RwLock<VecDeque<CommandTraceRecord>>,
25}
26
27impl CommandTraceStore {
28 pub fn new(capacity: usize) -> Self {
30 Self {
31 cap: capacity,
32 inner: RwLock::new(VecDeque::with_capacity(capacity)),
33 }
34 }
35
36 pub fn with_default_capacity() -> Self {
38 Self::new(DEFAULT_CAPACITY)
39 }
40
41 pub async fn push(&self, record: CommandTraceRecord) {
44 let mut inner = self.inner.write().await;
45 if inner.len() >= self.cap {
46 inner.pop_front();
47 }
48 inner.push_back(record);
49 }
50
51 pub async fn get_recent(&self, limit: usize) -> Vec<CommandTraceRecord> {
53 let inner = self.inner.read().await;
54 inner.iter().rev().take(limit).cloned().collect()
55 }
56
57 pub async fn list_recent(&self, limit: usize) -> Vec<CommandTraceRecord> {
59 self.get_recent(limit).await
60 }
61
62 pub async fn find_by_request_id(&self, request_id: &str) -> Vec<CommandTraceRecord> {
64 let inner = self.inner.read().await;
65 inner
66 .iter()
67 .filter(|r| r.request_id == request_id)
68 .cloned()
69 .collect()
70 }
71
72 pub async fn len(&self) -> usize {
74 let inner = self.inner.read().await;
75 inner.len()
76 }
77
78 pub async fn is_empty(&self) -> bool {
80 let inner = self.inner.read().await;
81 inner.is_empty()
82 }
83
84 pub async fn clear(&self) {
86 let mut inner = self.inner.write().await;
87 inner.clear();
88 }
89}
90
91impl Default for CommandTraceStore {
92 fn default() -> Self {
93 Self::with_default_capacity()
94 }
95}
96
97#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
99#[serde(rename_all = "snake_case")]
100#[derive(Default)]
101pub enum JournalStatus {
102 Journaled,
104 #[default]
106 JournalDisabled,
107 JournalFailed,
109 InternalNoJournal,
111 IdempotentHit,
113 MissingRequestId,
115}
116
117#[derive(Debug, Clone, Serialize, Deserialize, Default)]
119pub struct CommandTiming {
120 pub apply_ms: u64,
122 pub serialize_ms: u64,
124 pub journal_ms: u64,
126 pub publish_ms: u64,
128}
129
130#[derive(Debug, Clone, Serialize, Deserialize)]
132pub struct CommandTraceRecord {
133 pub request_id: String,
135 pub command_type: String,
137 pub wallet: Option<String>,
139 pub symbol: Option<String>,
141 pub order_id: Option<u64>,
143 pub received_ts_ms: u64,
145 pub applied_ts_ms: u64,
147 pub duration_ms: u64,
149 pub pre_state_digest: EngineStateDigest,
151 pub post_state_digest: EngineStateDigest,
153 pub context: sonic_rs::Value,
155 #[serde(default)]
157 pub journal_status: JournalStatus,
158 #[serde(default)]
160 pub timing: CommandTiming,
161}
162
163impl CommandTraceRecord {
164 pub fn builder(request_id: String) -> CommandTraceRecordBuilder {
166 CommandTraceRecordBuilder::new(request_id)
167 }
168}
169
170pub struct CommandTraceRecordBuilder {
172 request_id: String,
173 command_type: Option<String>,
174 wallet: Option<String>,
175 symbol: Option<String>,
176 order_id: Option<u64>,
177 received_ts_ms: u64,
178 applied_ts_ms: u64,
179 pre_state_digest: Option<EngineStateDigest>,
180 post_state_digest: Option<EngineStateDigest>,
181 context: sonic_rs::Value,
182 journal_status: JournalStatus,
183 timing: CommandTiming,
184}
185
186impl CommandTraceRecordBuilder {
187 pub fn new(request_id: String) -> Self {
188 Self {
189 request_id,
190 command_type: None,
191 wallet: None,
192 symbol: None,
193 order_id: None,
194 received_ts_ms: 0,
195 applied_ts_ms: 0,
196 pre_state_digest: None,
197 post_state_digest: None,
198 context: sonic_rs::Value::default(),
199 journal_status: JournalStatus::default(),
200 timing: CommandTiming::default(),
201 }
202 }
203
204 pub fn command_type(mut self, cmd_type: impl Into<String>) -> Self {
205 self.command_type = Some(cmd_type.into());
206 self
207 }
208
209 pub fn wallet(mut self, wallet: impl Into<String>) -> Self {
210 self.wallet = Some(wallet.into());
211 self
212 }
213
214 pub fn symbol(mut self, symbol: impl Into<String>) -> Self {
215 self.symbol = Some(symbol.into());
216 self
217 }
218
219 pub fn order_id(mut self, order_id: u64) -> Self {
220 self.order_id = Some(order_id);
221 self
222 }
223
224 pub fn received_ts_ms(mut self, ts: u64) -> Self {
225 self.received_ts_ms = ts;
226 self
227 }
228
229 pub fn applied_ts_ms(mut self, ts: u64) -> Self {
230 self.applied_ts_ms = ts;
231 self
232 }
233
234 pub fn pre_state_digest(mut self, digest: EngineStateDigest) -> Self {
235 self.pre_state_digest = Some(digest);
236 self
237 }
238
239 pub fn post_state_digest(mut self, digest: EngineStateDigest) -> Self {
240 self.post_state_digest = Some(digest);
241 self
242 }
243
244 pub fn context(mut self, context: sonic_rs::Value) -> Self {
245 self.context = context;
246 self
247 }
248
249 pub fn journal_status(mut self, status: JournalStatus) -> Self {
250 self.journal_status = status;
251 self
252 }
253
254 pub fn timing(mut self, timing: CommandTiming) -> Self {
255 self.timing = timing;
256 self
257 }
258
259 pub fn build(self) -> CommandTraceRecord {
260 let duration_ms = self.applied_ts_ms.saturating_sub(self.received_ts_ms);
261
262 CommandTraceRecord {
263 request_id: self.request_id,
264 command_type: self.command_type.unwrap_or_else(|| "Unknown".to_string()),
265 wallet: self.wallet,
266 symbol: self.symbol,
267 order_id: self.order_id,
268 received_ts_ms: self.received_ts_ms,
269 applied_ts_ms: self.applied_ts_ms,
270 duration_ms,
271 pre_state_digest: self.pre_state_digest.unwrap_or_default(),
272 post_state_digest: self.post_state_digest.unwrap_or_default(),
273 context: self.context,
274 journal_status: self.journal_status,
275 timing: self.timing,
276 }
277 }
278}
279
#[cfg(test)]
mod tests {
    use super::*;
    use std::collections::HashMap;

    /// Builds a record that carries only a request id and a command name.
    fn record(request_id: &str, command_type: &str) -> CommandTraceRecord {
        CommandTraceRecord::builder(request_id.to_string())
            .command_type(command_type)
            .build()
    }

    /// A fresh store is empty; after one push it reports and returns that record.
    #[tokio::test]
    async fn test_command_trace_store_basic() {
        let store = CommandTraceStore::new(10);
        assert!(store.is_empty().await);

        store
            .push(
                CommandTraceRecord::builder("test-1".to_string())
                    .command_type("CreateOrder")
                    .wallet("0x1234")
                    .build(),
            )
            .await;
        assert_eq!(store.len().await, 1);

        let recent = store.get_recent(10).await;
        assert_eq!(recent.len(), 1);
        assert_eq!(recent[0].request_id, "test-1");
    }

    /// Pushing five records into a capacity-3 store keeps only the newest three,
    /// and `get_recent` lists them newest first.
    #[tokio::test]
    async fn test_command_trace_store_ring_buffer() {
        let store = CommandTraceStore::new(3);

        for i in 1..=5 {
            store
                .push(record(&format!("test-{}", i), "CreateOrder"))
                .await;
        }

        assert_eq!(store.len().await, 3);

        let recent = store.get_recent(10).await;
        assert_eq!(recent.len(), 3);
        let ids: Vec<&str> = recent.iter().map(|r| r.request_id.as_str()).collect();
        assert_eq!(ids, ["test-5", "test-4", "test-3"]);
    }

    /// Lookup by request id returns the matching record and nothing for unknown ids.
    #[tokio::test]
    async fn test_find_by_request_id() {
        let store = CommandTraceStore::new(10);

        store.push(record("test-1", "CreateOrder")).await;
        store.push(record("test-2", "CancelOrder")).await;

        let found = store.find_by_request_id("test-1").await;
        assert_eq!(found.len(), 1);
        assert_eq!(found[0].command_type, "CreateOrder");

        let not_found = store.find_by_request_id("test-99").await;
        assert!(not_found.is_empty());
    }

    /// Digests with identical fields compare equal; a different `next_order_id` does not.
    #[test]
    fn test_engine_state_digest_equality() {
        let digest_with = |next_order_id| EngineStateDigest {
            next_order_id,
            next_trade_id: 50,
            l2_seq: 1000,
            symbols: HashMap::new(),
        };

        let baseline = digest_with(100);
        let same = digest_with(100);
        let bumped = digest_with(101);

        assert!(baseline.is_equal_to(&same));
        assert!(!baseline.is_equal_to(&bumped));
    }
}