hypercall/observability/metrics_collector/
mod.rs1use crate::read_cache::greeks::GreeksCache;
7use crate::read_cache::market_stats::MarketStatsCache;
8use crate::read_cache::portfolio::PortfolioCache;
9use crate::read_cache::tier::TierCache;
10use crate::vol_oracle::VolOracle;
11use hypercall_db::InstrumentReader;
12use hypercall_db_diesel::DatabaseHandler;
13use hypercall_runtime_api::{OrderSnapshotProvider, QuoteProvider};
14use hypercall_types::WalletAddress;
15use metrics::gauge;
16use rust_decimal::prelude::ToPrimitive;
17use std::collections::{BTreeSet, HashMap};
18use std::sync::Arc;
19use std::time::Duration;
20use tokio::sync::broadcast;
21use tracing::{debug, error, info, warn};
22
23mod directive_outbox;
24mod events;
25mod integrity;
26mod market_quality;
27mod operations;
28mod risk;
29mod settlement;
30mod trading;
31
32pub use events::{record_auth_failure, record_liquidation, record_settlement, LiquidationReason};
33pub use hypercall_types::observability::AuthFailureReason;
34
35const COMPILE_TIME_GIT_COMMIT: &str = env!("GIT_COMMIT");
37const COMPILE_TIME_GIT_REF: &str = env!("GIT_REF");
38const COMPILE_TIME_BUILD_TIME: &str = env!("BUILD_TIME");
39pub const CARGO_VERSION: &str = env!("CARGO_PKG_VERSION");
40
41use std::sync::LazyLock;
44
45pub static GIT_COMMIT: LazyLock<String> = LazyLock::new(|| {
46 std::env::var("GIT_COMMIT").unwrap_or_else(|_| COMPILE_TIME_GIT_COMMIT.to_string())
47});
48
49pub static GIT_REF: LazyLock<String> =
50 LazyLock::new(|| std::env::var("GIT_REF").unwrap_or_else(|_| COMPILE_TIME_GIT_REF.to_string()));
51
52pub static BUILD_TIME: LazyLock<String> = LazyLock::new(|| {
53 std::env::var("BUILD_TIME").unwrap_or_else(|_| COMPILE_TIME_BUILD_TIME.to_string())
54});
55
/// Tunables for [`MetricsCollector`].
///
/// Derives `Debug` in addition to `Clone` so the configuration can be logged
/// and inspected in diagnostics.
#[derive(Debug, Clone)]
pub struct MetricsCollectorConfig {
    /// Period between fast collection passes (`collect_fast_metrics`).
    pub fast_interval: Duration,
    /// Period between slow collection passes (`collect_slow_metrics`).
    pub slow_interval: Duration,
    /// Threshold used for near-liquidation reporting.
    // NOTE(review): not read in this file; presumably consumed by the risk
    // collector submodule. Units/meaning should be confirmed there.
    pub near_liquidation_threshold: f64,
    /// When `true`, the run loop does not call `collect_slow_metrics`.
    pub skip_db_queries: bool,
}
72
73impl Default for MetricsCollectorConfig {
74 fn default() -> Self {
75 Self {
76 fast_interval: Duration::from_secs(30),
77 slow_interval: Duration::from_secs(300), near_liquidation_threshold: 0.15,
79 skip_db_queries: false,
80 }
81 }
82}
83
84impl MetricsCollectorConfig {
85 pub fn from_config(config: &crate::backend_config::MetricsRuntimeConfig) -> Self {
86 Self {
87 skip_db_queries: config.skip_db_queries,
88 ..Self::default()
89 }
90 }
91}
92
93pub struct MetricsCollector {
95 config: MetricsCollectorConfig,
96 portfolio_cache: Arc<PortfolioCache>,
97 quote_provider: Arc<dyn QuoteProvider>,
98 order_snapshot: Option<Arc<dyn OrderSnapshotProvider>>,
99 tier_cache: Option<Arc<TierCache>>,
100 pm_settlement_allowlist: BTreeSet<WalletAddress>,
101 greeks_cache: Option<Arc<GreeksCache>>,
102 market_stats_cache: Option<Arc<MarketStatsCache>>,
103 db: Option<Arc<DatabaseHandler>>,
104 integrity_db: Option<Arc<dyn hypercall_db::IntegrityReader>>,
105 directive_outbox_db: Option<Arc<dyn hypercall_db::AsyncDirectiveOutboxReader>>,
106 pm_settlement_db: Option<Arc<dyn hypercall_db::PmSettlementProjectionReader>>,
107 risk_vol_oracle: Option<Arc<dyn VolOracle>>,
108}
109
110impl MetricsCollector {
111 pub fn new(
112 config: MetricsCollectorConfig,
113 portfolio_cache: Arc<PortfolioCache>,
114 quote_provider: Arc<dyn QuoteProvider>,
115 order_snapshot: Option<Arc<dyn OrderSnapshotProvider>>,
116 tier_cache: Option<Arc<TierCache>>,
117 pm_settlement_allowlist: BTreeSet<WalletAddress>,
118 greeks_cache: Option<Arc<GreeksCache>>,
119 market_stats_cache: Option<Arc<MarketStatsCache>>,
120 db: Option<Arc<DatabaseHandler>>,
121 integrity_db: Option<Arc<dyn hypercall_db::IntegrityReader>>,
122 directive_outbox_db: Option<Arc<dyn hypercall_db::AsyncDirectiveOutboxReader>>,
123 pm_settlement_db: Option<Arc<dyn hypercall_db::PmSettlementProjectionReader>>,
124 risk_vol_oracle: Option<Arc<dyn VolOracle>>,
125 ) -> Self {
126 Self {
127 config,
128 portfolio_cache,
129 quote_provider,
130 order_snapshot,
131 tier_cache,
132 pm_settlement_allowlist,
133 greeks_cache,
134 market_stats_cache,
135 db,
136 integrity_db,
137 directive_outbox_db,
138 pm_settlement_db,
139 risk_vol_oracle,
140 }
141 }
142
143 pub async fn run_loop(&self, mut shutdown_rx: broadcast::Receiver<()>) {
144 info!(
145 "Starting metrics collector (fast: {:?}, slow: {:?}, skip_db: {})",
146 self.config.fast_interval, self.config.slow_interval, self.config.skip_db_queries
147 );
148
149 gauge!(
150 "ht_build_info",
151 "commit" => GIT_COMMIT.as_str(),
152 "ref" => GIT_REF.as_str(),
153 "version" => CARGO_VERSION,
154 "build_time" => BUILD_TIME.as_str()
155 )
156 .set(1.0);
157 info!(
158 "Build info: version={}, commit={}, ref={}, built={}",
159 CARGO_VERSION, &*GIT_COMMIT, &*GIT_REF, &*BUILD_TIME
160 );
161
162 let mut fast_interval = tokio::time::interval(self.config.fast_interval);
163 let mut slow_interval = tokio::time::interval(self.config.slow_interval);
164
165 loop {
166 tokio::select! {
167 _ = fast_interval.tick() => {
168 self.collect_fast_metrics().await;
169 }
170 _ = slow_interval.tick() => {
171 if !self.config.skip_db_queries {
172 self.collect_slow_metrics().await;
173 }
174 }
175 _ = shutdown_rx.recv() => {
176 info!("Metrics collector shutting down");
177 break;
178 }
179 }
180 }
181 }
182
183 pub(super) async fn collect_fast_metrics(&self) {
185 debug!("Collecting fast metrics...");
186
187 tokio::join!(
189 self.collect_trading_metrics(),
190 self.collect_risk_metrics(),
191 self.collect_market_quality_metrics(),
192 self.collect_operational_metrics(),
193 self.collect_invariant_metrics(),
194 self.collect_position_metrics(),
195 self.collect_vol_surface_metrics(),
196 self.collect_market_stats_metrics(),
197 );
198 }
199
200 pub(super) async fn collect_slow_metrics(&self) {
203 debug!("Collecting slow (DB) metrics...");
204
205 tokio::join!(
207 self.collect_settlement_metrics(),
208 self.collect_integrity_metrics(),
209 self.collect_directive_outbox_metrics(),
210 );
211 }
212}
213
214#[async_trait::async_trait]
215impl crate::shared::service::Service for MetricsCollector {
216 fn name(&self) -> &'static str {
217 "MetricsCollector"
218 }
219
220 fn owner(&self) -> crate::shared::service::ServiceOwner {
221 crate::shared::service::ServiceOwner::Shared
222 }
223
224 async fn run(
225 self: Arc<MetricsCollector>,
226 shutdown: crate::shared::ShutdownRx,
227 ) -> anyhow::Result<()> {
228 self.run_loop(shutdown).await;
229 Ok(())
230 }
231}