// hypercall/observability/command_trace.rs

//! Command tracing for debugging and observability.
//!
//! This module provides a lightweight per-command trace ring buffer that records:
//! - The command that was applied
//! - Pre- and post-apply state digests
//! - Events generated
//! - Context values used in decision-making
//!
//! This enables debugging by observing "input command -> output events" per request.
10
11pub use hypercall_types::observability::{EngineStateDigest, SymbolSummary};
12use serde::{Deserialize, Serialize};
13use std::collections::VecDeque;
14use tokio::sync::RwLock;
15
/// Number of records retained by a store created via
/// [`CommandTraceStore::with_default_capacity`].
const DEFAULT_CAPACITY: usize = 1_000;
18
19/// Store for command trace records with a ring buffer implementation.
20pub struct CommandTraceStore {
21    /// Maximum number of records to keep
22    cap: usize,
23    /// Ring buffer of trace records
24    inner: RwLock<VecDeque<CommandTraceRecord>>,
25}
26
27impl CommandTraceStore {
28    /// Create a new command trace store with the specified capacity.
29    pub fn new(capacity: usize) -> Self {
30        Self {
31            cap: capacity,
32            inner: RwLock::new(VecDeque::with_capacity(capacity)),
33        }
34    }
35
36    /// Create a new command trace store with default capacity.
37    pub fn with_default_capacity() -> Self {
38        Self::new(DEFAULT_CAPACITY)
39    }
40
41    /// Push a new trace record into the store.
42    /// If the store is at capacity, the oldest record is removed.
43    pub async fn push(&self, record: CommandTraceRecord) {
44        let mut inner = self.inner.write().await;
45        if inner.len() >= self.cap {
46            inner.pop_front();
47        }
48        inner.push_back(record);
49    }
50
51    /// Get the last N records (most recent first).
52    pub async fn get_recent(&self, limit: usize) -> Vec<CommandTraceRecord> {
53        let inner = self.inner.read().await;
54        inner.iter().rev().take(limit).cloned().collect()
55    }
56
57    /// Alias for get_recent (convenience method).
58    pub async fn list_recent(&self, limit: usize) -> Vec<CommandTraceRecord> {
59        self.get_recent(limit).await
60    }
61
62    /// Find records by request_id.
63    pub async fn find_by_request_id(&self, request_id: &str) -> Vec<CommandTraceRecord> {
64        let inner = self.inner.read().await;
65        inner
66            .iter()
67            .filter(|r| r.request_id == request_id)
68            .cloned()
69            .collect()
70    }
71
72    /// Get total number of records in the store.
73    pub async fn len(&self) -> usize {
74        let inner = self.inner.read().await;
75        inner.len()
76    }
77
78    /// Check if the store is empty.
79    pub async fn is_empty(&self) -> bool {
80        let inner = self.inner.read().await;
81        inner.is_empty()
82    }
83
84    /// Clear all records from the store.
85    pub async fn clear(&self) {
86        let mut inner = self.inner.write().await;
87        inner.clear();
88    }
89}
90
91impl Default for CommandTraceStore {
92    fn default() -> Self {
93        Self::with_default_capacity()
94    }
95}
96
97/// Journal status for a command trace record.
98#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
99#[serde(rename_all = "snake_case")]
100#[derive(Default)]
101pub enum JournalStatus {
102    /// Command was successfully journaled
103    Journaled,
104    /// Journaling is disabled
105    #[default]
106    JournalDisabled,
107    /// Journal write failed (degraded mode)
108    JournalFailed,
109    /// Internal command that doesn't require journaling (tick, expiry, MMP)
110    InternalNoJournal,
111    /// Idempotent hit - returned cached response
112    IdempotentHit,
113    /// Missing request_id, couldn't journal
114    MissingRequestId,
115}
116
117/// Timing breakdown for command processing.
118#[derive(Debug, Clone, Serialize, Deserialize, Default)]
119pub struct CommandTiming {
120    /// Time to apply the command (pure state machine)
121    pub apply_ms: u64,
122    /// Time to serialize events and response
123    pub serialize_ms: u64,
124    /// Time for journal DB transaction
125    pub journal_ms: u64,
126    /// Time to publish events to subscribers
127    pub publish_ms: u64,
128}
129
130/// A single command trace record capturing the full context of a command execution.
131#[derive(Debug, Clone, Serialize, Deserialize)]
132pub struct CommandTraceRecord {
133    /// Unique request ID for correlation
134    pub request_id: String,
135    /// Type of command (e.g., "CreateOrder", "CancelOrder", "CreateMarket")
136    pub command_type: String,
137    /// Wallet address involved in the command (if applicable)
138    pub wallet: Option<String>,
139    /// Symbol involved in the command (if applicable)
140    pub symbol: Option<String>,
141    /// Order ID involved in the command (if applicable)
142    pub order_id: Option<u64>,
143    /// Timestamp when the command was received (milliseconds since epoch)
144    pub received_ts_ms: u64,
145    /// Timestamp when apply() completed (milliseconds since epoch)
146    pub applied_ts_ms: u64,
147    /// Duration of apply() in milliseconds
148    pub duration_ms: u64,
149    /// Engine state digest before apply()
150    pub pre_state_digest: EngineStateDigest,
151    /// Engine state digest after apply()
152    pub post_state_digest: EngineStateDigest,
153    /// Context values used in decision-making (liquidation state, tier, margin mode, etc.)
154    pub context: sonic_rs::Value,
155    /// Journal status indicating how this command was processed
156    #[serde(default)]
157    pub journal_status: JournalStatus,
158    /// Detailed timing breakdown
159    #[serde(default)]
160    pub timing: CommandTiming,
161}
162
163impl CommandTraceRecord {
164    /// Create a new trace record builder
165    pub fn builder(request_id: String) -> CommandTraceRecordBuilder {
166        CommandTraceRecordBuilder::new(request_id)
167    }
168}
169
170/// Builder for creating CommandTraceRecord instances
171pub struct CommandTraceRecordBuilder {
172    request_id: String,
173    command_type: Option<String>,
174    wallet: Option<String>,
175    symbol: Option<String>,
176    order_id: Option<u64>,
177    received_ts_ms: u64,
178    applied_ts_ms: u64,
179    pre_state_digest: Option<EngineStateDigest>,
180    post_state_digest: Option<EngineStateDigest>,
181    context: sonic_rs::Value,
182    journal_status: JournalStatus,
183    timing: CommandTiming,
184}
185
186impl CommandTraceRecordBuilder {
187    pub fn new(request_id: String) -> Self {
188        Self {
189            request_id,
190            command_type: None,
191            wallet: None,
192            symbol: None,
193            order_id: None,
194            received_ts_ms: 0,
195            applied_ts_ms: 0,
196            pre_state_digest: None,
197            post_state_digest: None,
198            context: sonic_rs::Value::default(),
199            journal_status: JournalStatus::default(),
200            timing: CommandTiming::default(),
201        }
202    }
203
204    pub fn command_type(mut self, cmd_type: impl Into<String>) -> Self {
205        self.command_type = Some(cmd_type.into());
206        self
207    }
208
209    pub fn wallet(mut self, wallet: impl Into<String>) -> Self {
210        self.wallet = Some(wallet.into());
211        self
212    }
213
214    pub fn symbol(mut self, symbol: impl Into<String>) -> Self {
215        self.symbol = Some(symbol.into());
216        self
217    }
218
219    pub fn order_id(mut self, order_id: u64) -> Self {
220        self.order_id = Some(order_id);
221        self
222    }
223
224    pub fn received_ts_ms(mut self, ts: u64) -> Self {
225        self.received_ts_ms = ts;
226        self
227    }
228
229    pub fn applied_ts_ms(mut self, ts: u64) -> Self {
230        self.applied_ts_ms = ts;
231        self
232    }
233
234    pub fn pre_state_digest(mut self, digest: EngineStateDigest) -> Self {
235        self.pre_state_digest = Some(digest);
236        self
237    }
238
239    pub fn post_state_digest(mut self, digest: EngineStateDigest) -> Self {
240        self.post_state_digest = Some(digest);
241        self
242    }
243
244    pub fn context(mut self, context: sonic_rs::Value) -> Self {
245        self.context = context;
246        self
247    }
248
249    pub fn journal_status(mut self, status: JournalStatus) -> Self {
250        self.journal_status = status;
251        self
252    }
253
254    pub fn timing(mut self, timing: CommandTiming) -> Self {
255        self.timing = timing;
256        self
257    }
258
259    pub fn build(self) -> CommandTraceRecord {
260        let duration_ms = self.applied_ts_ms.saturating_sub(self.received_ts_ms);
261
262        CommandTraceRecord {
263            request_id: self.request_id,
264            command_type: self.command_type.unwrap_or_else(|| "Unknown".to_string()),
265            wallet: self.wallet,
266            symbol: self.symbol,
267            order_id: self.order_id,
268            received_ts_ms: self.received_ts_ms,
269            applied_ts_ms: self.applied_ts_ms,
270            duration_ms,
271            pre_state_digest: self.pre_state_digest.unwrap_or_default(),
272            post_state_digest: self.post_state_digest.unwrap_or_default(),
273            context: self.context,
274            journal_status: self.journal_status,
275            timing: self.timing,
276        }
277    }
278}
279
#[cfg(test)]
mod tests {
    use super::*;
    use std::collections::HashMap;

    /// Builds a record that carries only a request id and a command type.
    fn make_record(request_id: &str, command_type: &str) -> CommandTraceRecord {
        CommandTraceRecord::builder(request_id.to_string())
            .command_type(command_type)
            .build()
    }

    /// A fresh store is empty; after one push it reports and returns that record.
    #[tokio::test]
    async fn test_command_trace_store_basic() {
        let store = CommandTraceStore::new(10);
        assert!(store.is_empty().await);

        store
            .push(
                CommandTraceRecord::builder("test-1".to_string())
                    .command_type("CreateOrder")
                    .wallet("0x1234")
                    .build(),
            )
            .await;
        assert_eq!(store.len().await, 1);

        let recent = store.get_recent(10).await;
        assert_eq!(recent.len(), 1);
        assert_eq!(recent[0].request_id, "test-1");
    }

    /// Pushing past capacity evicts the oldest records first.
    #[tokio::test]
    async fn test_command_trace_store_ring_buffer() {
        let store = CommandTraceStore::new(3);

        // Five pushes into a three-slot store.
        for n in 1..=5 {
            store
                .push(make_record(&format!("test-{}", n), "CreateOrder"))
                .await;
        }

        // Only the three newest records survive.
        assert_eq!(store.len().await, 3);

        // They come back newest first.
        let recent = store.get_recent(10).await;
        let ids: Vec<&str> = recent.iter().map(|r| r.request_id.as_str()).collect();
        assert_eq!(ids, ["test-5", "test-4", "test-3"]);
    }

    /// Lookup by request id returns the matching record, or nothing.
    #[tokio::test]
    async fn test_find_by_request_id() {
        let store = CommandTraceStore::new(10);

        store.push(make_record("test-1", "CreateOrder")).await;
        store.push(make_record("test-2", "CancelOrder")).await;

        let found = store.find_by_request_id("test-1").await;
        assert_eq!(found.len(), 1);
        assert_eq!(found[0].command_type, "CreateOrder");

        let not_found = store.find_by_request_id("test-99").await;
        assert!(not_found.is_empty());
    }

    /// Digests compare equal only when their fields match.
    #[test]
    fn test_engine_state_digest_equality() {
        // Digests that differ at most in `next_order_id`.
        let digest_with = |next_order_id| EngineStateDigest {
            next_order_id,
            next_trade_id: 50,
            l2_seq: 1000,
            symbols: HashMap::new(),
        };

        let digest1 = digest_with(100);
        let digest2 = digest_with(100);
        let digest3 = digest_with(101); // Different

        assert!(digest1.is_equal_to(&digest2));
        assert!(!digest1.is_equal_to(&digest3));
    }
}
374}