hypercall_runtime_api/
rpi_monitor.rs1use std::collections::VecDeque;
2use std::sync::{LazyLock, Mutex};
3
4use hypercall_types::utils::get_timestamp_millis;
5use serde::Serialize;
6
7const MAX_EVENTS: usize = 1000;
8
9static RECENT_EVENTS: LazyLock<Mutex<VecDeque<RpiMonitorEvent>>> =
10 LazyLock::new(|| Mutex::new(VecDeque::with_capacity(MAX_EVENTS)));
11
12#[derive(Clone, Debug, Serialize, utoipa::ToSchema)]
13pub struct RpiMonitorEvent {
14 pub timestamp_ms: u64,
15 pub outcome: String,
16 pub reference_state: String,
17 pub reason: Option<String>,
18 pub wallet: String,
19 pub symbol: String,
20 pub side: String,
21 pub size: String,
22 pub limit_price: String,
23 pub reference_price: Option<String>,
24 pub fill_price: Option<String>,
25 pub rfq_id: Option<String>,
26 pub quote_id: Option<String>,
27 pub fill_id: Option<String>,
28 pub order_id: Option<u64>,
29 pub request_id: Option<String>,
30 pub l2_seq: Option<i64>,
31}
32
/// Input to [`record`]: everything known about one RPI routing decision except the
/// timestamp, which `record` adds itself.
///
/// `outcome` and `reference_state` are `&'static str` because they are also used as metric
/// label values; all other fields are copied into the stored event unchanged.
#[derive(Debug, Clone)]
pub struct RpiMonitorRecord {
    /// Outcome label, also used as the `outcome` label of the routing counter.
    pub outcome: &'static str,
    /// Reference-state label, also used as the `reference` label of the routing counter.
    pub reference_state: &'static str,
    /// Optional free-form reason — presumably explains the outcome; confirm with callers.
    pub reason: Option<String>,
    /// Wallet identifier.
    pub wallet: String,
    /// Instrument symbol.
    pub symbol: String,
    /// Order side, as a string.
    pub side: String,
    /// Order size, as a string.
    pub size: String,
    /// Order limit price, as a string.
    pub limit_price: String,
    /// Reference price, when available.
    pub reference_price: Option<String>,
    /// Fill price, when available.
    pub fill_price: Option<String>,
    /// RFQ identifier, when available.
    pub rfq_id: Option<String>,
    /// Quote identifier, when available.
    pub quote_id: Option<String>,
    /// Fill identifier, when available.
    pub fill_id: Option<String>,
    /// Order identifier, when available.
    pub order_id: Option<u64>,
    /// Request identifier, when available.
    pub request_id: Option<String>,
    /// Presumably the L2 book sequence number observed for this order — TODO confirm.
    pub l2_seq: Option<i64>,
}
52
/// Optional criteria for [`recent`]; a field left as `None` matches every event.
///
/// The default value has every field unset and therefore matches everything.
#[derive(Debug, Clone, Default)]
pub struct RpiMonitorFilter {
    /// Keep only events whose `outcome` equals this value exactly.
    pub outcome: Option<String>,
    /// Keep only events whose `symbol` equals this value exactly.
    pub symbol: Option<String>,
    /// Keep only events whose `wallet` equals this value, ignoring ASCII case.
    pub wallet: Option<String>,
    /// Keep only events that carry exactly this `rfq_id`.
    pub rfq_id: Option<String>,
    /// Keep only events that carry exactly this `request_id`.
    pub request_id: Option<String>,
}
61
62pub fn record(record: RpiMonitorRecord) {
63 metrics::counter!(
64 "ht_rpi_order_routing_total",
65 "outcome" => record.outcome,
66 "reference" => record.reference_state
67 )
68 .increment(1);
69
70 let event = RpiMonitorEvent {
71 timestamp_ms: get_timestamp_millis(),
72 outcome: record.outcome.to_string(),
73 reference_state: record.reference_state.to_string(),
74 reason: record.reason,
75 wallet: record.wallet,
76 symbol: record.symbol,
77 side: record.side,
78 size: record.size,
79 limit_price: record.limit_price,
80 reference_price: record.reference_price,
81 fill_price: record.fill_price,
82 rfq_id: record.rfq_id,
83 quote_id: record.quote_id,
84 fill_id: record.fill_id,
85 order_id: record.order_id,
86 request_id: record.request_id,
87 l2_seq: record.l2_seq,
88 };
89
90 let mut events = RECENT_EVENTS.lock().expect("RPI monitor mutex poisoned");
91 if events.len() == MAX_EVENTS {
92 events.pop_front();
93 }
94 events.push_back(event);
95}
96
97pub fn recent(filter: &RpiMonitorFilter, limit: usize) -> Vec<RpiMonitorEvent> {
98 let events = RECENT_EVENTS.lock().expect("RPI monitor mutex poisoned");
99 events
100 .iter()
101 .rev()
102 .filter(|event| {
103 filter
104 .outcome
105 .as_ref()
106 .map_or(true, |outcome| event.outcome == *outcome)
107 && filter
108 .symbol
109 .as_ref()
110 .map_or(true, |symbol| event.symbol == *symbol)
111 && filter
112 .wallet
113 .as_ref()
114 .map_or(true, |wallet| event.wallet.eq_ignore_ascii_case(wallet))
115 && filter.rfq_id.as_ref().map_or(true, |rfq_id| {
116 event.rfq_id.as_deref() == Some(rfq_id.as_str())
117 })
118 && filter.request_id.as_ref().map_or(true, |request_id| {
119 event.request_id.as_deref() == Some(request_id.as_str())
120 })
121 })
122 .take(limit)
123 .cloned()
124 .collect()
125}