Skip to main content

hypercall_runtime_api/
rpi_monitor.rs

1use std::collections::VecDeque;
2use std::sync::{LazyLock, Mutex};
3
4use hypercall_types::utils::get_timestamp_millis;
5use serde::Serialize;
6
7const MAX_EVENTS: usize = 1000;
8
9static RECENT_EVENTS: LazyLock<Mutex<VecDeque<RpiMonitorEvent>>> =
10    LazyLock::new(|| Mutex::new(VecDeque::with_capacity(MAX_EVENTS)));
11
12#[derive(Clone, Debug, Serialize, utoipa::ToSchema)]
13pub struct RpiMonitorEvent {
14    pub timestamp_ms: u64,
15    pub outcome: String,
16    pub reference_state: String,
17    pub reason: Option<String>,
18    pub wallet: String,
19    pub symbol: String,
20    pub side: String,
21    pub size: String,
22    pub limit_price: String,
23    pub reference_price: Option<String>,
24    pub fill_price: Option<String>,
25    pub rfq_id: Option<String>,
26    pub quote_id: Option<String>,
27    pub fill_id: Option<String>,
28    pub order_id: Option<u64>,
29    pub request_id: Option<String>,
30    pub l2_seq: Option<i64>,
31}
32
/// Input to `record`: everything about a routing event except its timestamp,
/// which `record` stamps itself.
///
/// `outcome` and `reference_state` are `&'static str` and are passed directly
/// as label values of the routing counter; all other fields are moved
/// unchanged into the stored `RpiMonitorEvent`.
#[derive(Debug, Clone)]
pub struct RpiMonitorRecord {
    /// Outcome label (becomes the counter's `outcome` label).
    pub outcome: &'static str,
    /// Reference-state label (becomes the counter's `reference` label).
    pub reference_state: &'static str,
    /// Optional reason to store with the event.
    pub reason: Option<String>,
    /// Wallet identifier.
    pub wallet: String,
    /// Symbol.
    pub symbol: String,
    /// Order side.
    pub side: String,
    /// Order size.
    pub size: String,
    /// Limit price.
    pub limit_price: String,
    /// Reference price, if any.
    pub reference_price: Option<String>,
    /// Fill price, if any.
    pub fill_price: Option<String>,
    /// RFQ identifier, if any.
    pub rfq_id: Option<String>,
    /// Quote identifier, if any.
    pub quote_id: Option<String>,
    /// Fill identifier, if any.
    pub fill_id: Option<String>,
    /// Order identifier, if any.
    pub order_id: Option<u64>,
    /// Request identifier, if any.
    pub request_id: Option<String>,
    /// L2 sequence number, if any.
    pub l2_seq: Option<i64>,
}
52
/// Criteria used by `recent` to select events.
///
/// A field left as `None` places no constraint on the event; every field that
/// is `Some` must match. The `Default` value therefore matches every event.
#[derive(Debug, Clone, Default)]
pub struct RpiMonitorFilter {
    /// Exact match against the event's `outcome`.
    pub outcome: Option<String>,
    /// Exact match against the event's `symbol`.
    pub symbol: Option<String>,
    /// Match against the event's `wallet`, ignoring ASCII case.
    pub wallet: Option<String>,
    /// Exact match against the event's `rfq_id`; events without one never match.
    pub rfq_id: Option<String>,
    /// Exact match against the event's `request_id`; events without one never match.
    pub request_id: Option<String>,
}
61
62pub fn record(record: RpiMonitorRecord) {
63    metrics::counter!(
64        "ht_rpi_order_routing_total",
65        "outcome" => record.outcome,
66        "reference" => record.reference_state
67    )
68    .increment(1);
69
70    let event = RpiMonitorEvent {
71        timestamp_ms: get_timestamp_millis(),
72        outcome: record.outcome.to_string(),
73        reference_state: record.reference_state.to_string(),
74        reason: record.reason,
75        wallet: record.wallet,
76        symbol: record.symbol,
77        side: record.side,
78        size: record.size,
79        limit_price: record.limit_price,
80        reference_price: record.reference_price,
81        fill_price: record.fill_price,
82        rfq_id: record.rfq_id,
83        quote_id: record.quote_id,
84        fill_id: record.fill_id,
85        order_id: record.order_id,
86        request_id: record.request_id,
87        l2_seq: record.l2_seq,
88    };
89
90    let mut events = RECENT_EVENTS.lock().expect("RPI monitor mutex poisoned");
91    if events.len() == MAX_EVENTS {
92        events.pop_front();
93    }
94    events.push_back(event);
95}
96
97pub fn recent(filter: &RpiMonitorFilter, limit: usize) -> Vec<RpiMonitorEvent> {
98    let events = RECENT_EVENTS.lock().expect("RPI monitor mutex poisoned");
99    events
100        .iter()
101        .rev()
102        .filter(|event| {
103            filter
104                .outcome
105                .as_ref()
106                .map_or(true, |outcome| event.outcome == *outcome)
107                && filter
108                    .symbol
109                    .as_ref()
110                    .map_or(true, |symbol| event.symbol == *symbol)
111                && filter
112                    .wallet
113                    .as_ref()
114                    .map_or(true, |wallet| event.wallet.eq_ignore_ascii_case(wallet))
115                && filter.rfq_id.as_ref().map_or(true, |rfq_id| {
116                    event.rfq_id.as_deref() == Some(rfq_id.as_str())
117                })
118                && filter.request_id.as_ref().map_or(true, |request_id| {
119                    event.request_id.as_deref() == Some(request_id.as_str())
120                })
121        })
122        .take(limit)
123        .cloned()
124        .collect()
125}