1use axum::{extract::State, http::StatusCode, response::IntoResponse};
4use serde::{Deserialize, Serialize};
5use sonic_rs::json;
6use std::collections::HashMap;
7use tracing::warn;
8
9use crate::state::AdminState;
10use hypercall_runtime_api::sonic_json::SonicJson;
11
/// Status of a single vol oracle, as returned by `GET /monitoring/vol-oracles`.
///
/// The `vol_oracles` handler fills every field from the same-named field of an
/// entry returned by `risk_vol_oracle.statuses()`; only `provider` is
/// converted, to the string form given by the provider's `as_str()`.
12#[derive(Debug, Clone, Serialize, utoipa::ToSchema)]
13pub struct VolOracleStatusResponse {
14 pub underlying: String,
    /// String form of the oracle's provider (`provider.as_str()`).
15 pub provider: String,
16 pub route_facing: bool,
17 pub connected: bool,
18 pub ready: bool,
    // NOTE(review): presumably a Unix timestamp in milliseconds — confirm against the oracle crate.
19 pub last_update_ts_ms: Option<i64>,
20 pub staleness_seconds: Option<f64>,
21 pub staleness_threshold_seconds: Option<f64>,
22 pub surface_points: usize,
23 pub messages_received: u64,
24 pub last_error: Option<String>,
25}
26
/// Response body of `GET /monitoring/vol-oracles`.
27#[derive(Debug, Clone, Serialize, utoipa::ToSchema)]
28pub struct VolOracleStatusesResponse {
    /// One entry per status returned by `risk_vol_oracle.statuses()`, in the
    /// order that call yields them.
29 pub statuses: Vec<VolOracleStatusResponse>,
30}
31
/// Price-oracle status for one configured underlying, as returned by
/// `GET /monitoring/price-oracles`.
///
/// Built by `price_oracle_statuses_from_snapshots` from the greeks-cache
/// snapshots.
32#[derive(Debug, Clone, Serialize, utoipa::ToSchema)]
33pub struct PriceOracleStatusResponse {
34 pub underlying: String,
    /// `false` when the underlying is in the unhealthy-oracle list.
35 pub connected: bool,
    /// `true` only when a spot price is present and the underlying is not in
    /// the unhealthy-oracle list.
36 pub ready: bool,
    /// Spot price from the spot snapshot; `None` when the snapshot has no entry.
37 pub spot_price: Option<f64>,
    /// Previous-day price from its snapshot; `None` when the snapshot has no entry.
38 pub prev_day_price: Option<f64>,
    /// Staleness from the staleness snapshot; `None` when the entry is missing
    /// or itself `None`.
39 pub staleness_seconds: Option<f64>,
40}
41
/// Response body of `GET /monitoring/price-oracles`.
42#[derive(Debug, Clone, Serialize, utoipa::ToSchema)]
43pub struct PriceOracleStatusesResponse {
    /// One entry per configured underlying, sorted by `underlying`.
44 pub statuses: Vec<PriceOracleStatusResponse>,
45}
46
47#[utoipa::path(
49 get,
50 path = "/monitoring/vol-oracles",
51 responses(
52 (status = 200, description = "Vol oracle status", body = VolOracleStatusesResponse),
53 (status = 401, description = "Invalid or missing X-Admin-Key header")
54 ),
55 tag = "Monitoring",
56 security(("admin_key" = []))
57)]
58pub async fn vol_oracles(State(app_state): State<AdminState>) -> impl IntoResponse {
59 let statuses = app_state
60 .risk_vol_oracle
61 .statuses()
62 .into_iter()
63 .map(|status| VolOracleStatusResponse {
64 underlying: status.underlying,
65 provider: status.provider.as_str().to_string(),
66 route_facing: status.route_facing,
67 connected: status.connected,
68 ready: status.ready,
69 last_update_ts_ms: status.last_update_ts_ms,
70 staleness_seconds: status.staleness_seconds,
71 staleness_threshold_seconds: status.staleness_threshold_seconds,
72 surface_points: status.surface_points,
73 messages_received: status.messages_received,
74 last_error: status.last_error,
75 })
76 .collect();
77
78 SonicJson(VolOracleStatusesResponse { statuses })
79}
80
81#[utoipa::path(
83 get,
84 path = "/monitoring/price-oracles",
85 responses(
86 (status = 200, description = "Price oracle status", body = PriceOracleStatusesResponse),
87 (status = 401, description = "Invalid or missing X-Admin-Key header")
88 ),
89 tag = "Monitoring",
90 security(("admin_key" = []))
91)]
92pub async fn price_oracles(State(app_state): State<AdminState>) -> impl IntoResponse {
93 let spot_prices = app_state.greeks_cache.get_all_spot_prices_snapshot().await;
94 let prev_day_prices = app_state
95 .greeks_cache
96 .get_all_prev_day_prices_snapshot()
97 .await;
98 let staleness = app_state.greeks_cache.get_spot_price_staleness().await;
99 let unhealthy = app_state.greeks_cache.get_unhealthy_oracles().await;
100
101 let statuses = price_oracle_statuses_from_snapshots(
102 app_state.greeks_cache.get_configured_underlyings(),
103 spot_prices,
104 prev_day_prices,
105 staleness,
106 unhealthy,
107 );
108
109 SonicJson(PriceOracleStatusesResponse { statuses })
110}
111
112fn price_oracle_statuses_from_snapshots(
113 configured_underlyings: Vec<String>,
114 spot_prices: HashMap<String, f64>,
115 prev_day_prices: HashMap<String, f64>,
116 staleness: HashMap<String, Option<f64>>,
117 unhealthy: Vec<String>,
118) -> Vec<PriceOracleStatusResponse> {
119 let mut statuses: Vec<PriceOracleStatusResponse> = configured_underlyings
120 .into_iter()
121 .map(|underlying| {
122 let spot_price = spot_prices.get(&underlying).copied();
123 let is_unhealthy = unhealthy.contains(&underlying);
124 PriceOracleStatusResponse {
125 prev_day_price: prev_day_prices.get(&underlying).copied(),
126 staleness_seconds: staleness.get(&underlying).copied().flatten(),
127 connected: !is_unhealthy,
128 ready: spot_price.is_some() && !is_unhealthy,
129 spot_price,
130 underlying,
131 }
132 })
133 .collect();
134
135 statuses.sort_by(|a, b| a.underlying.cmp(&b.underlying));
136 statuses
137}
138
/// Query parameters of `GET /monitoring/vol-surface`.
139#[derive(Debug, Deserialize, utoipa::IntoParams)]
142pub struct VolSurfaceQuery {
    /// When set, only this underlying's surface is looked up; when omitted,
    /// every distinct underlying reported by the vol oracle statuses is used.
143 pub underlying: Option<String>,
145}
146
/// Response body of `GET /monitoring/vol-surface`.
147#[derive(Debug, Serialize, utoipa::ToSchema)]
148pub struct VolSurfaceApiResponse {
    /// Snapshots for the selected underlyings; underlyings for which
    /// `get_surface_snapshot` returns `None` are left out.
149 pub surfaces: Vec<hypercall_vol_oracle::VolSurfaceSnapshot>,
150}
151
152#[utoipa::path(
154 get,
155 path = "/monitoring/vol-surface",
156 params(VolSurfaceQuery),
157 responses(
158 (status = 200, description = "Vol surface snapshot", body = VolSurfaceApiResponse),
159 (status = 401, description = "Invalid or missing X-Admin-Key header")
160 ),
161 tag = "Monitoring",
162 security(("admin_key" = []))
163)]
164pub async fn get_vol_surface(
165 State(app_state): State<AdminState>,
166 axum::extract::Query(query): axum::extract::Query<VolSurfaceQuery>,
167) -> impl IntoResponse {
168 use std::collections::HashSet;
169
170 let statuses = app_state.risk_vol_oracle.statuses();
171 let underlyings: Vec<String> = if let Some(ref u) = query.underlying {
172 vec![u.clone()]
173 } else {
174 let mut seen = HashSet::new();
175 statuses
176 .iter()
177 .filter(|s| seen.insert(s.underlying.clone()))
178 .map(|s| s.underlying.clone())
179 .collect()
180 };
181
182 let surfaces: Vec<hypercall_vol_oracle::VolSurfaceSnapshot> = underlyings
183 .iter()
184 .filter_map(|u| app_state.risk_vol_oracle.get_surface_snapshot(u))
185 .collect();
186
187 SonicJson(VolSurfaceApiResponse { surfaces })
188}
189
/// Query parameters of `GET /monitoring/vol-surface/history`.
///
/// All three values are passed unchanged to `db.get_vol_surface_history`.
190#[derive(Debug, Deserialize, utoipa::IntoParams)]
191pub struct VolSurfaceHistoryQuery {
192 pub underlying: String,
193 pub interval_ms: i64,
    /// Defaults to `default_history_limit()` (500) when omitted.
195 #[serde(default = "default_history_limit")]
197 pub limit: usize,
198}
199
/// Serde default for the `limit` query parameter of the vol-surface history
/// and symbol endpoints.
fn default_history_limit() -> usize {
    const DEFAULT_HISTORY_LIMIT: usize = 500;
    DEFAULT_HISTORY_LIMIT
}
203
/// Response body of `GET /monitoring/vol-surface/history`.
204#[derive(Debug, Serialize, utoipa::ToSchema)]
205pub struct VolSurfaceHistoryResponse {
    /// Echo of the requested underlying.
206 pub underlying: String,
    /// Echo of the requested interval.
207 pub interval_ms: i64,
    /// One point per row returned by the history query, in query order.
208 pub points: Vec<VolSurfaceHistoryPoint>,
209}
210
/// One stored vol surface row, copied field-for-field from the history query result.
211#[derive(Debug, Serialize, utoipa::ToSchema)]
212pub struct VolSurfaceHistoryPoint {
213 pub timestamp_ms: i64,
    /// Stored surface as raw JSON; the history handler passes it through unparsed.
214 pub surface_json: serde_json::Value,
215}
216
217#[utoipa::path(
219 get,
220 path = "/monitoring/vol-surface/history",
221 params(VolSurfaceHistoryQuery),
222 responses(
223 (status = 200, description = "Vol surface history", body = VolSurfaceHistoryResponse),
224 (status = 401, description = "Invalid or missing X-Admin-Key header")
225 ),
226 tag = "Monitoring",
227 security(("admin_key" = []))
228)]
229pub async fn get_vol_surface_history(
230 State(app_state): State<AdminState>,
231 axum::extract::Query(query): axum::extract::Query<VolSurfaceHistoryQuery>,
232) -> impl IntoResponse {
233 let rows = match app_state
234 .db
235 .get_vol_surface_history(&query.underlying, query.interval_ms, query.limit)
236 .await
237 {
238 Ok(rows) => rows,
239 Err(err) => {
240 warn!("Vol surface history query failed: {err:#}");
241 return (
242 StatusCode::INTERNAL_SERVER_ERROR,
243 SonicJson(json!({ "error": err.to_string() })),
244 )
245 .into_response();
246 }
247 };
248
249 let points: Vec<VolSurfaceHistoryPoint> = rows
250 .into_iter()
251 .map(|row| VolSurfaceHistoryPoint {
252 timestamp_ms: row.timestamp_ms,
253 surface_json: row.surface_json,
254 })
255 .collect();
256
257 SonicJson(VolSurfaceHistoryResponse {
258 underlying: query.underlying,
259 interval_ms: query.interval_ms,
260 points,
261 })
262 .into_response()
263}
264
/// Query parameters of `GET /monitoring/vol-surface/symbol`.
265#[derive(Debug, Deserialize, utoipa::IntoParams)]
266pub struct VolSurfaceSymbolQuery {
    /// Option symbol; parsed with `ParsedOptionSymbol::from_symbol`.
267 pub symbol: String,
    /// Defaults to `default_symbol_interval_ms()` when omitted.
269 #[serde(default = "default_symbol_interval_ms")]
271 pub interval_ms: i64,
    /// Defaults to `default_history_limit()` (500) when omitted.
272 #[serde(default = "default_history_limit")]
274 pub limit: usize,
275}
276
/// Serde default for `VolSurfaceSymbolQuery::interval_ms`: returns
/// `hypercall_types::HISTORICAL_THEO_INTERVAL_5M_MS`.
// NOTE(review): the constant's name suggests a 5-minute interval in milliseconds — confirm in `hypercall_types`.
277fn default_symbol_interval_ms() -> i64 {
278 hypercall_types::HISTORICAL_THEO_INTERVAL_5M_MS
279}
280
/// Response body of `GET /monitoring/vol-surface/symbol`.
281#[derive(Debug, Serialize, utoipa::ToSchema)]
282pub struct VolSurfaceSymbolResponse {
    /// Echo of the requested symbol.
283 pub symbol: String,
    /// Underlying parsed from the symbol.
284 pub underlying: String,
    /// Strike parsed from the symbol, converted to `f64`.
285 pub strike: f64,
    /// Result of `hypercall_types::expiry_date_to_timestamp` for the parsed expiry.
    // NOTE(review): the handler names this value `expiry_ts_secs`, so presumably Unix seconds — confirm.
286 pub expiry: i64,
    /// Echo of the requested (or defaulted) interval.
287 pub interval_ms: i64,
    /// One point per stored history row of the underlying.
288 pub points: Vec<SymbolIvPoint>,
289}
290
/// IV of one symbol at one stored surface timestamp.
291#[derive(Debug, Serialize, utoipa::ToSchema)]
292pub struct SymbolIvPoint {
    /// Timestamp of the stored surface row.
293 pub timestamp_ms: i64,
    /// IV of the matching strike point; `None` when the stored surface could
    /// not be deserialized or contains no point for the symbol's expiry and strike.
294 pub iv: Option<f64>,
295}
296
297#[utoipa::path(
302 get,
303 path = "/monitoring/vol-surface/symbol",
304 params(VolSurfaceSymbolQuery),
305 responses(
306 (status = 200, description = "Symbol IV time series", body = VolSurfaceSymbolResponse),
307 (status = 400, description = "Invalid symbol or interval"),
308 (status = 401, description = "Invalid or missing X-Admin-Key header")
309 ),
310 tag = "Monitoring",
311 security(("admin_key" = []))
312)]
313pub async fn get_vol_surface_symbol(
314 State(app_state): State<AdminState>,
315 axum::extract::Query(query): axum::extract::Query<VolSurfaceSymbolQuery>,
316) -> impl IntoResponse {
317 use hypercall_types::ParsedOptionSymbol;
318 use rust_decimal::prelude::ToPrimitive;
319
320 let parsed = match ParsedOptionSymbol::from_symbol(&query.symbol) {
321 Ok(p) => p,
322 Err(reason) => {
323 return (
324 StatusCode::BAD_REQUEST,
325 SonicJson(
326 json!({ "error": format!("Invalid symbol {}: {}", query.symbol, reason) }),
327 ),
328 )
329 .into_response();
330 }
331 };
332
333 let expiry_ts_secs =
334 hypercall_types::expiry_date_to_timestamp(&parsed.underlying, parsed.expiry);
335 let strike = match parsed.strike.to_f64() {
338 Some(strike) => strike,
339 None => {
340 return (
341 StatusCode::BAD_REQUEST,
342 SonicJson(json!({
343 "error": format!("Strike {} cannot be represented as f64", parsed.strike)
344 })),
345 )
346 .into_response();
347 }
348 };
349
350 let rows = match app_state
351 .db
352 .get_vol_surface_history(&parsed.underlying, query.interval_ms, query.limit)
353 .await
354 {
355 Ok(rows) => rows,
356 Err(err) => {
357 warn!("Vol surface symbol query failed: {err:#}");
358 return (
359 StatusCode::INTERNAL_SERVER_ERROR,
360 SonicJson(json!({ "error": err.to_string() })),
361 )
362 .into_response();
363 }
364 };
365
366 const STRIKE_TOL: f64 = 0.5;
369
370 let points: Vec<SymbolIvPoint> = rows
371 .into_iter()
372 .map(|row| {
373 let iv = serde_json::from_value::<hypercall_vol_oracle::VolSurfaceSnapshot>(
374 row.surface_json,
375 )
376 .ok()
377 .and_then(|snap| {
378 snap.strike_points.into_iter().find_map(|p| {
379 if p.expiry == expiry_ts_secs && (p.strike - strike).abs() < STRIKE_TOL {
380 Some(p.iv)
381 } else {
382 None
383 }
384 })
385 });
386 SymbolIvPoint {
387 timestamp_ms: row.timestamp_ms,
388 iv,
389 }
390 })
391 .collect();
392
393 SonicJson(VolSurfaceSymbolResponse {
394 symbol: query.symbol,
395 underlying: parsed.underlying,
396 strike,
397 expiry: expiry_ts_secs,
398 interval_ms: query.interval_ms,
399 points,
400 })
401 .into_response()
402}
403
#[cfg(test)]
mod tests {
    use super::*;

    /// Covers the three row shapes: healthy with a spot price (BTC),
    /// unhealthy with a spot price (ETH), and healthy without one (HYPE),
    /// plus the alphabetical ordering of the output.
    #[test]
    fn price_oracle_statuses_are_sorted_and_mark_unready_or_unhealthy() {
        let configured = vec!["ETH".to_string(), "BTC".to_string(), "HYPE".to_string()];
        let spot = HashMap::from([("BTC".to_string(), 100_000.0), ("ETH".to_string(), 3_000.0)]);
        let prev_day = HashMap::from([("BTC".to_string(), 99_000.0)]);
        let staleness = HashMap::from([("BTC".to_string(), Some(12.5)), ("ETH".to_string(), None)]);
        let unhealthy = vec!["ETH".to_string()];

        let statuses =
            price_oracle_statuses_from_snapshots(configured, spot, prev_day, staleness, unhealthy);

        let order: Vec<&str> = statuses
            .iter()
            .map(|status| status.underlying.as_str())
            .collect();
        assert_eq!(order, vec!["BTC", "ETH", "HYPE"]);

        // BTC: healthy, priced, with previous-day price and staleness.
        let btc = &statuses[0];
        assert!(btc.connected);
        assert!(btc.ready);
        assert_eq!(btc.spot_price, Some(100_000.0));
        assert_eq!(btc.prev_day_price, Some(99_000.0));
        assert_eq!(btc.staleness_seconds, Some(12.5));

        // ETH: priced but flagged unhealthy, so neither connected nor ready.
        let eth = &statuses[1];
        assert!(!eth.connected);
        assert!(!eth.ready);
        assert_eq!(eth.spot_price, Some(3_000.0));
        assert_eq!(eth.staleness_seconds, None);

        // HYPE: healthy but without a spot price, so connected yet not ready.
        let hype = &statuses[2];
        assert!(hype.connected);
        assert!(!hype.ready);
        assert_eq!(hype.spot_price, None);
    }
}