// hypercall/observability/metrics_collector/mod.rs

//! Metrics Collector - Periodic collection of business and operational metrics.
//!
//! This module provides a background task that periodically computes and exports
//! metrics for trading activity, risk, market quality, and operations, plus slower
//! database-backed metrics for settlement, integrity, and the directive outbox.
5
6use crate::read_cache::greeks::GreeksCache;
7use crate::read_cache::market_stats::MarketStatsCache;
8use crate::read_cache::portfolio::PortfolioCache;
9use crate::read_cache::tier::TierCache;
10use crate::vol_oracle::VolOracle;
11use hypercall_db::InstrumentReader;
12use hypercall_db_diesel::DatabaseHandler;
13use hypercall_runtime_api::{OrderSnapshotProvider, QuoteProvider};
14use hypercall_types::WalletAddress;
15use metrics::gauge;
16use rust_decimal::prelude::ToPrimitive;
17use std::collections::{BTreeSet, HashMap};
18use std::sync::Arc;
19use std::time::Duration;
20use tokio::sync::broadcast;
21use tracing::{debug, error, info, warn};
22
23mod directive_outbox;
24mod events;
25mod integrity;
26mod market_quality;
27mod operations;
28mod risk;
29mod settlement;
30mod trading;
31
32pub use events::{record_auth_failure, record_liquidation, record_settlement, LiquidationReason};
33pub use hypercall_types::observability::AuthFailureReason;
34
35// Build-time version info (set by build.rs, used as fallbacks)
36const COMPILE_TIME_GIT_COMMIT: &str = env!("GIT_COMMIT");
37const COMPILE_TIME_GIT_REF: &str = env!("GIT_REF");
38const COMPILE_TIME_BUILD_TIME: &str = env!("BUILD_TIME");
39pub const CARGO_VERSION: &str = env!("CARGO_PKG_VERSION");
40
41// Runtime version info - prefers env vars (set in Docker) over compile-time constants
42// This allows passing git info via Docker build args without needing .git in build context
43use std::sync::LazyLock;
44
45pub static GIT_COMMIT: LazyLock<String> = LazyLock::new(|| {
46    std::env::var("GIT_COMMIT").unwrap_or_else(|_| COMPILE_TIME_GIT_COMMIT.to_string())
47});
48
49pub static GIT_REF: LazyLock<String> =
50    LazyLock::new(|| std::env::var("GIT_REF").unwrap_or_else(|_| COMPILE_TIME_GIT_REF.to_string()));
51
52pub static BUILD_TIME: LazyLock<String> = LazyLock::new(|| {
53    std::env::var("BUILD_TIME").unwrap_or_else(|_| COMPILE_TIME_BUILD_TIME.to_string())
54});
55
/// Configuration for the metrics collector.
#[derive(Clone, Debug)]
pub struct MetricsCollectorConfig {
    /// Interval for fast in-memory metrics (trading, market quality, operational).
    /// Should be non-zero; `tokio::time::interval` rejects a zero period.
    pub fast_interval: Duration,
    /// Interval for slow DB-backed metrics (settlement, integrity).
    /// Set higher to reduce DB load, especially with multiple instances.
    /// Should be non-zero; `tokio::time::interval` rejects a zero period.
    pub slow_interval: Duration,
    /// Threshold (0.0-1.0) for "near liquidation" accounts.
    /// E.g., 0.15 means accounts within 15% of liquidation threshold.
    pub near_liquidation_threshold: f64,
    /// If true, skip DB-backed metrics entirely.
    /// Use when running multiple instances and only the primary should query DB.
    /// Controlled by METRICS_SKIP_DB_QUERIES env var.
    pub skip_db_queries: bool,
}

impl Default for MetricsCollectorConfig {
    /// Defaults: fast metrics every 30s, DB-backed metrics every 5 minutes,
    /// a 0.15 near-liquidation threshold, and DB queries enabled.
    fn default() -> Self {
        Self {
            fast_interval: Duration::from_secs(30),
            slow_interval: Duration::from_secs(300), // 5 minutes for DB queries
            near_liquidation_threshold: 0.15,
            skip_db_queries: false,
        }
    }
}
83
84impl MetricsCollectorConfig {
85    pub fn from_config(config: &crate::backend_config::MetricsRuntimeConfig) -> Self {
86        Self {
87            skip_db_queries: config.skip_db_queries,
88            ..Self::default()
89        }
90    }
91}
92
93/// Metrics collector that periodically computes and exports business metrics.
94pub struct MetricsCollector {
95    config: MetricsCollectorConfig,
96    portfolio_cache: Arc<PortfolioCache>,
97    quote_provider: Arc<dyn QuoteProvider>,
98    order_snapshot: Option<Arc<dyn OrderSnapshotProvider>>,
99    tier_cache: Option<Arc<TierCache>>,
100    pm_settlement_allowlist: BTreeSet<WalletAddress>,
101    greeks_cache: Option<Arc<GreeksCache>>,
102    market_stats_cache: Option<Arc<MarketStatsCache>>,
103    db: Option<Arc<DatabaseHandler>>,
104    integrity_db: Option<Arc<dyn hypercall_db::IntegrityReader>>,
105    directive_outbox_db: Option<Arc<dyn hypercall_db::AsyncDirectiveOutboxReader>>,
106    pm_settlement_db: Option<Arc<dyn hypercall_db::PmSettlementProjectionReader>>,
107    risk_vol_oracle: Option<Arc<dyn VolOracle>>,
108}
109
110impl MetricsCollector {
111    pub fn new(
112        config: MetricsCollectorConfig,
113        portfolio_cache: Arc<PortfolioCache>,
114        quote_provider: Arc<dyn QuoteProvider>,
115        order_snapshot: Option<Arc<dyn OrderSnapshotProvider>>,
116        tier_cache: Option<Arc<TierCache>>,
117        pm_settlement_allowlist: BTreeSet<WalletAddress>,
118        greeks_cache: Option<Arc<GreeksCache>>,
119        market_stats_cache: Option<Arc<MarketStatsCache>>,
120        db: Option<Arc<DatabaseHandler>>,
121        integrity_db: Option<Arc<dyn hypercall_db::IntegrityReader>>,
122        directive_outbox_db: Option<Arc<dyn hypercall_db::AsyncDirectiveOutboxReader>>,
123        pm_settlement_db: Option<Arc<dyn hypercall_db::PmSettlementProjectionReader>>,
124        risk_vol_oracle: Option<Arc<dyn VolOracle>>,
125    ) -> Self {
126        Self {
127            config,
128            portfolio_cache,
129            quote_provider,
130            order_snapshot,
131            tier_cache,
132            pm_settlement_allowlist,
133            greeks_cache,
134            market_stats_cache,
135            db,
136            integrity_db,
137            directive_outbox_db,
138            pm_settlement_db,
139            risk_vol_oracle,
140        }
141    }
142
143    pub async fn run_loop(&self, mut shutdown_rx: broadcast::Receiver<()>) {
144        info!(
145            "Starting metrics collector (fast: {:?}, slow: {:?}, skip_db: {})",
146            self.config.fast_interval, self.config.slow_interval, self.config.skip_db_queries
147        );
148
149        gauge!(
150            "ht_build_info",
151            "commit" => GIT_COMMIT.as_str(),
152            "ref" => GIT_REF.as_str(),
153            "version" => CARGO_VERSION,
154            "build_time" => BUILD_TIME.as_str()
155        )
156        .set(1.0);
157        info!(
158            "Build info: version={}, commit={}, ref={}, built={}",
159            CARGO_VERSION, &*GIT_COMMIT, &*GIT_REF, &*BUILD_TIME
160        );
161
162        let mut fast_interval = tokio::time::interval(self.config.fast_interval);
163        let mut slow_interval = tokio::time::interval(self.config.slow_interval);
164
165        loop {
166            tokio::select! {
167                _ = fast_interval.tick() => {
168                    self.collect_fast_metrics().await;
169                }
170                _ = slow_interval.tick() => {
171                    if !self.config.skip_db_queries {
172                        self.collect_slow_metrics().await;
173                    }
174                }
175                _ = shutdown_rx.recv() => {
176                    info!("Metrics collector shutting down");
177                    break;
178                }
179            }
180        }
181    }
182
183    /// Collect fast in-memory metrics (every 30s by default).
184    pub(super) async fn collect_fast_metrics(&self) {
185        debug!("Collecting fast metrics...");
186
187        // These iterate in-memory caches - very fast
188        tokio::join!(
189            self.collect_trading_metrics(),
190            self.collect_risk_metrics(),
191            self.collect_market_quality_metrics(),
192            self.collect_operational_metrics(),
193            self.collect_invariant_metrics(),
194            self.collect_position_metrics(),
195            self.collect_vol_surface_metrics(),
196            self.collect_market_stats_metrics(),
197        );
198    }
199
200    /// Collect slow DB-backed metrics (every 5 min by default).
201    /// These hit the database and should run less frequently.
202    pub(super) async fn collect_slow_metrics(&self) {
203        debug!("Collecting slow (DB) metrics...");
204
205        // These query the database - run less frequently
206        tokio::join!(
207            self.collect_settlement_metrics(),
208            self.collect_integrity_metrics(),
209            self.collect_directive_outbox_metrics(),
210        );
211    }
212}
213
214#[async_trait::async_trait]
215impl crate::shared::service::Service for MetricsCollector {
216    fn name(&self) -> &'static str {
217        "MetricsCollector"
218    }
219
220    fn owner(&self) -> crate::shared::service::ServiceOwner {
221        crate::shared::service::ServiceOwner::Shared
222    }
223
224    async fn run(
225        self: Arc<MetricsCollector>,
226        shutdown: crate::shared::ShutdownRx,
227    ) -> anyhow::Result<()> {
228        self.run_loop(shutdown).await;
229        Ok(())
230    }
231}