Skip to main content

hypercall_admin/monitoring/
oracles.rs

1//! Oracle health + vol surface admin endpoints.
2
3use axum::{extract::State, http::StatusCode, response::IntoResponse};
4use serde::{Deserialize, Serialize};
5use sonic_rs::json;
6use std::collections::HashMap;
7use tracing::warn;
8
9use crate::state::AdminState;
10use hypercall_runtime_api::sonic_json::SonicJson;
11
12#[derive(Debug, Clone, Serialize, utoipa::ToSchema)]
13pub struct VolOracleStatusResponse {
14    pub underlying: String,
15    pub provider: String,
16    pub route_facing: bool,
17    pub connected: bool,
18    pub ready: bool,
19    pub last_update_ts_ms: Option<i64>,
20    pub staleness_seconds: Option<f64>,
21    pub staleness_threshold_seconds: Option<f64>,
22    pub surface_points: usize,
23    pub messages_received: u64,
24    pub last_error: Option<String>,
25}
26
27#[derive(Debug, Clone, Serialize, utoipa::ToSchema)]
28pub struct VolOracleStatusesResponse {
29    pub statuses: Vec<VolOracleStatusResponse>,
30}
31
32#[derive(Debug, Clone, Serialize, utoipa::ToSchema)]
33pub struct PriceOracleStatusResponse {
34    pub underlying: String,
35    pub connected: bool,
36    pub ready: bool,
37    pub spot_price: Option<f64>,
38    pub prev_day_price: Option<f64>,
39    pub staleness_seconds: Option<f64>,
40}
41
42#[derive(Debug, Clone, Serialize, utoipa::ToSchema)]
43pub struct PriceOracleStatusesResponse {
44    pub statuses: Vec<PriceOracleStatusResponse>,
45}
46
47/// List vol oracle health and freshness by underlying.
48#[utoipa::path(
49    get,
50    path = "/monitoring/vol-oracles",
51    responses(
52        (status = 200, description = "Vol oracle status", body = VolOracleStatusesResponse),
53        (status = 401, description = "Invalid or missing X-Admin-Key header")
54    ),
55    tag = "Monitoring",
56    security(("admin_key" = []))
57)]
58pub async fn vol_oracles(State(app_state): State<AdminState>) -> impl IntoResponse {
59    let statuses = app_state
60        .risk_vol_oracle
61        .statuses()
62        .into_iter()
63        .map(|status| VolOracleStatusResponse {
64            underlying: status.underlying,
65            provider: status.provider.as_str().to_string(),
66            route_facing: status.route_facing,
67            connected: status.connected,
68            ready: status.ready,
69            last_update_ts_ms: status.last_update_ts_ms,
70            staleness_seconds: status.staleness_seconds,
71            staleness_threshold_seconds: status.staleness_threshold_seconds,
72            surface_points: status.surface_points,
73            messages_received: status.messages_received,
74            last_error: status.last_error,
75        })
76        .collect();
77
78    SonicJson(VolOracleStatusesResponse { statuses })
79}
80
81/// List mark/spot price oracle health and freshness by underlying.
82#[utoipa::path(
83    get,
84    path = "/monitoring/price-oracles",
85    responses(
86        (status = 200, description = "Price oracle status", body = PriceOracleStatusesResponse),
87        (status = 401, description = "Invalid or missing X-Admin-Key header")
88    ),
89    tag = "Monitoring",
90    security(("admin_key" = []))
91)]
92pub async fn price_oracles(State(app_state): State<AdminState>) -> impl IntoResponse {
93    let spot_prices = app_state.greeks_cache.get_all_spot_prices_snapshot().await;
94    let prev_day_prices = app_state
95        .greeks_cache
96        .get_all_prev_day_prices_snapshot()
97        .await;
98    let staleness = app_state.greeks_cache.get_spot_price_staleness().await;
99    let unhealthy = app_state.greeks_cache.get_unhealthy_oracles().await;
100
101    let statuses = price_oracle_statuses_from_snapshots(
102        app_state.greeks_cache.get_configured_underlyings(),
103        spot_prices,
104        prev_day_prices,
105        staleness,
106        unhealthy,
107    );
108
109    SonicJson(PriceOracleStatusesResponse { statuses })
110}
111
112fn price_oracle_statuses_from_snapshots(
113    configured_underlyings: Vec<String>,
114    spot_prices: HashMap<String, f64>,
115    prev_day_prices: HashMap<String, f64>,
116    staleness: HashMap<String, Option<f64>>,
117    unhealthy: Vec<String>,
118) -> Vec<PriceOracleStatusResponse> {
119    let mut statuses: Vec<PriceOracleStatusResponse> = configured_underlyings
120        .into_iter()
121        .map(|underlying| {
122            let spot_price = spot_prices.get(&underlying).copied();
123            let is_unhealthy = unhealthy.contains(&underlying);
124            PriceOracleStatusResponse {
125                prev_day_price: prev_day_prices.get(&underlying).copied(),
126                staleness_seconds: staleness.get(&underlying).copied().flatten(),
127                connected: !is_unhealthy,
128                ready: spot_price.is_some() && !is_unhealthy,
129                spot_price,
130                underlying,
131            }
132        })
133        .collect();
134
135    statuses.sort_by(|a, b| a.underlying.cmp(&b.underlying));
136    statuses
137}
138
139// --- Vol Surface Visualization Handlers ---
140
141#[derive(Debug, Deserialize, utoipa::IntoParams)]
142pub struct VolSurfaceQuery {
143    /// Underlying asset (e.g. "BTC", "ETH"). If omitted, returns all.
144    pub underlying: Option<String>,
145}
146
147#[derive(Debug, Serialize, utoipa::ToSchema)]
148pub struct VolSurfaceApiResponse {
149    pub surfaces: Vec<hypercall_vol_oracle::VolSurfaceSnapshot>,
150}
151
152/// GET /monitoring/vol-surface - Live vol surface snapshot(s).
153#[utoipa::path(
154    get,
155    path = "/monitoring/vol-surface",
156    params(VolSurfaceQuery),
157    responses(
158        (status = 200, description = "Vol surface snapshot", body = VolSurfaceApiResponse),
159        (status = 401, description = "Invalid or missing X-Admin-Key header")
160    ),
161    tag = "Monitoring",
162    security(("admin_key" = []))
163)]
164pub async fn get_vol_surface(
165    State(app_state): State<AdminState>,
166    axum::extract::Query(query): axum::extract::Query<VolSurfaceQuery>,
167) -> impl IntoResponse {
168    use std::collections::HashSet;
169
170    let statuses = app_state.risk_vol_oracle.statuses();
171    let underlyings: Vec<String> = if let Some(ref u) = query.underlying {
172        vec![u.clone()]
173    } else {
174        let mut seen = HashSet::new();
175        statuses
176            .iter()
177            .filter(|s| seen.insert(s.underlying.clone()))
178            .map(|s| s.underlying.clone())
179            .collect()
180    };
181
182    let surfaces: Vec<hypercall_vol_oracle::VolSurfaceSnapshot> = underlyings
183        .iter()
184        .filter_map(|u| app_state.risk_vol_oracle.get_surface_snapshot(u))
185        .collect();
186
187    SonicJson(VolSurfaceApiResponse { surfaces })
188}
189
190#[derive(Debug, Deserialize, utoipa::IntoParams)]
191pub struct VolSurfaceHistoryQuery {
192    pub underlying: String,
193    /// Interval in milliseconds (300000 = 5m, 3600000 = 1h, 86400000 = 1d).
194    pub interval_ms: i64,
195    /// Maximum number of points to return.
196    #[serde(default = "default_history_limit")]
197    pub limit: usize,
198}
199
/// Serde default for the `limit` query parameter of the history endpoints.
fn default_history_limit() -> usize {
    const DEFAULT_HISTORY_LIMIT: usize = 500;
    DEFAULT_HISTORY_LIMIT
}
203
204#[derive(Debug, Serialize, utoipa::ToSchema)]
205pub struct VolSurfaceHistoryResponse {
206    pub underlying: String,
207    pub interval_ms: i64,
208    pub points: Vec<VolSurfaceHistoryPoint>,
209}
210
211#[derive(Debug, Serialize, utoipa::ToSchema)]
212pub struct VolSurfaceHistoryPoint {
213    pub timestamp_ms: i64,
214    pub surface_json: serde_json::Value,
215}
216
217/// GET /monitoring/vol-surface/history - Historical vol surface snapshots.
218#[utoipa::path(
219    get,
220    path = "/monitoring/vol-surface/history",
221    params(VolSurfaceHistoryQuery),
222    responses(
223        (status = 200, description = "Vol surface history", body = VolSurfaceHistoryResponse),
224        (status = 401, description = "Invalid or missing X-Admin-Key header")
225    ),
226    tag = "Monitoring",
227    security(("admin_key" = []))
228)]
229pub async fn get_vol_surface_history(
230    State(app_state): State<AdminState>,
231    axum::extract::Query(query): axum::extract::Query<VolSurfaceHistoryQuery>,
232) -> impl IntoResponse {
233    let rows = match app_state
234        .db
235        .get_vol_surface_history(&query.underlying, query.interval_ms, query.limit)
236        .await
237    {
238        Ok(rows) => rows,
239        Err(err) => {
240            warn!("Vol surface history query failed: {err:#}");
241            return (
242                StatusCode::INTERNAL_SERVER_ERROR,
243                SonicJson(json!({ "error": err.to_string() })),
244            )
245                .into_response();
246        }
247    };
248
249    let points: Vec<VolSurfaceHistoryPoint> = rows
250        .into_iter()
251        .map(|row| VolSurfaceHistoryPoint {
252            timestamp_ms: row.timestamp_ms,
253            surface_json: row.surface_json,
254        })
255        .collect();
256
257    SonicJson(VolSurfaceHistoryResponse {
258        underlying: query.underlying,
259        interval_ms: query.interval_ms,
260        points,
261    })
262    .into_response()
263}
264
265#[derive(Debug, Deserialize, utoipa::IntoParams)]
266pub struct VolSurfaceSymbolQuery {
267    /// Option symbol, e.g. "BTC-20260530-100000-C".
268    pub symbol: String,
269    /// Interval in milliseconds (300000 = 5m, 3600000 = 1h, 86400000 = 1d).
270    #[serde(default = "default_symbol_interval_ms")]
271    pub interval_ms: i64,
272    /// Maximum number of points to return.
273    #[serde(default = "default_history_limit")]
274    pub limit: usize,
275}
276
277fn default_symbol_interval_ms() -> i64 {
278    hypercall_types::HISTORICAL_THEO_INTERVAL_5M_MS
279}
280
281#[derive(Debug, Serialize, utoipa::ToSchema)]
282pub struct VolSurfaceSymbolResponse {
283    pub symbol: String,
284    pub underlying: String,
285    pub strike: f64,
286    pub expiry: i64,
287    pub interval_ms: i64,
288    pub points: Vec<SymbolIvPoint>,
289}
290
291#[derive(Debug, Serialize, utoipa::ToSchema)]
292pub struct SymbolIvPoint {
293    pub timestamp_ms: i64,
294    pub iv: Option<f64>,
295}
296
297/// GET /monitoring/vol-surface/symbol - IV time series for a single option symbol.
298///
299/// Reads persisted `vol_surface_snapshots` (not the live oracle) and extracts the
300/// IV for the requested (strike, expiry) at each captured timestamp.
301#[utoipa::path(
302    get,
303    path = "/monitoring/vol-surface/symbol",
304    params(VolSurfaceSymbolQuery),
305    responses(
306        (status = 200, description = "Symbol IV time series", body = VolSurfaceSymbolResponse),
307        (status = 400, description = "Invalid symbol or interval"),
308        (status = 401, description = "Invalid or missing X-Admin-Key header")
309    ),
310    tag = "Monitoring",
311    security(("admin_key" = []))
312)]
313pub async fn get_vol_surface_symbol(
314    State(app_state): State<AdminState>,
315    axum::extract::Query(query): axum::extract::Query<VolSurfaceSymbolQuery>,
316) -> impl IntoResponse {
317    use hypercall_types::ParsedOptionSymbol;
318    use rust_decimal::prelude::ToPrimitive;
319
320    let parsed = match ParsedOptionSymbol::from_symbol(&query.symbol) {
321        Ok(p) => p,
322        Err(reason) => {
323            return (
324                StatusCode::BAD_REQUEST,
325                SonicJson(
326                    json!({ "error": format!("Invalid symbol {}: {}", query.symbol, reason) }),
327                ),
328            )
329                .into_response();
330        }
331    };
332
333    let expiry_ts_secs =
334        hypercall_types::expiry_date_to_timestamp(&parsed.underlying, parsed.expiry);
335    // Never fall back to a made-up strike: a wrong strike would silently match
336    // the wrong vol surface point. Propagate the conversion failure instead.
337    let strike = match parsed.strike.to_f64() {
338        Some(strike) => strike,
339        None => {
340            return (
341                StatusCode::BAD_REQUEST,
342                SonicJson(json!({
343                    "error": format!("Strike {} cannot be represented as f64", parsed.strike)
344                })),
345            )
346                .into_response();
347        }
348    };
349
350    let rows = match app_state
351        .db
352        .get_vol_surface_history(&parsed.underlying, query.interval_ms, query.limit)
353        .await
354    {
355        Ok(rows) => rows,
356        Err(err) => {
357            warn!("Vol surface symbol query failed: {err:#}");
358            return (
359                StatusCode::INTERNAL_SERVER_ERROR,
360                SonicJson(json!({ "error": err.to_string() })),
361            )
362                .into_response();
363        }
364    };
365
366    // Match strike to within $0.5 — vol surface stores raw f64 strikes that may
367    // differ slightly from the integer strike encoded in the symbol.
368    const STRIKE_TOL: f64 = 0.5;
369
370    let points: Vec<SymbolIvPoint> = rows
371        .into_iter()
372        .map(|row| {
373            let iv = serde_json::from_value::<hypercall_vol_oracle::VolSurfaceSnapshot>(
374                row.surface_json,
375            )
376            .ok()
377            .and_then(|snap| {
378                snap.strike_points.into_iter().find_map(|p| {
379                    if p.expiry == expiry_ts_secs && (p.strike - strike).abs() < STRIKE_TOL {
380                        Some(p.iv)
381                    } else {
382                        None
383                    }
384                })
385            });
386            SymbolIvPoint {
387                timestamp_ms: row.timestamp_ms,
388                iv,
389            }
390        })
391        .collect();
392
393    SonicJson(VolSurfaceSymbolResponse {
394        symbol: query.symbol,
395        underlying: parsed.underlying,
396        strike,
397        expiry: expiry_ts_secs,
398        interval_ms: query.interval_ms,
399        points,
400    })
401    .into_response()
402}
403
#[cfg(test)]
mod tests {
    use super::*;

    /// Turns string literals into the owned names the builder expects.
    fn owned(names: &[&str]) -> Vec<String> {
        names.iter().map(|name| name.to_string()).collect()
    }

    /// Covers the three interesting rows: healthy with data (BTC), flagged
    /// unhealthy despite having a spot price (ETH), and healthy without any
    /// data (HYPE). Also checks the output ordering.
    #[test]
    fn price_oracle_statuses_are_sorted_and_mark_unready_or_unhealthy() {
        let spot = HashMap::from([("BTC".to_string(), 100_000.0), ("ETH".to_string(), 3_000.0)]);
        let prev_day = HashMap::from([("BTC".to_string(), 99_000.0)]);
        let staleness = HashMap::from([("BTC".to_string(), Some(12.5)), ("ETH".to_string(), None)]);

        let statuses = price_oracle_statuses_from_snapshots(
            owned(&["ETH", "BTC", "HYPE"]),
            spot,
            prev_day,
            staleness,
            owned(&["ETH"]),
        );

        // Input order was ETH, BTC, HYPE; output must be alphabetical.
        let names: Vec<&str> = statuses
            .iter()
            .map(|status| status.underlying.as_str())
            .collect();
        assert_eq!(names, vec!["BTC", "ETH", "HYPE"]);

        let (btc, eth, hype) = (&statuses[0], &statuses[1], &statuses[2]);

        // BTC: healthy and fully populated.
        assert!(btc.connected);
        assert!(btc.ready);
        assert_eq!(btc.spot_price, Some(100_000.0));
        assert_eq!(btc.prev_day_price, Some(99_000.0));
        assert_eq!(btc.staleness_seconds, Some(12.5));

        // ETH: has a spot price but is flagged unhealthy, so neither connected nor ready.
        assert!(!eth.connected);
        assert!(!eth.ready);
        assert_eq!(eth.spot_price, Some(3_000.0));
        assert_eq!(eth.staleness_seconds, None);

        // HYPE: not flagged unhealthy, but without a spot price it is not ready.
        assert!(hype.connected);
        assert!(!hype.ready);
        assert_eq!(hype.spot_price, None);
    }
}