1use std::collections::HashMap;
10use std::sync::Arc;
11use std::time::{Duration, Instant};
12
13use anyhow::{Context, Result};
14use async_trait::async_trait;
15use chrono::Utc;
16use metrics::counter;
17use serde::{Deserialize, Serialize};
18use tokio::sync::RwLock;
19use tokio::task::JoinHandle;
20use tracing::{debug, error, info, warn};
21
22use super::hydromancer_client::HydromancerClient;
23use super::hyperliquid_types::{MetaAndAssetCtxsRequest, MetaAndAssetCtxsResponse};
24use super::hyperliquid_ws::HyperliquidWsFeed;
25use crate::shared::traits::{MarkPriceOracle, TxHash, H256};
26use crate::snapshot::SyncStatus;
27use hypercall_db::OracleWriter;
28
29pub const DEFAULT_RISK_FREE_RATE: f64 = 0.05;
31
32pub const DEFAULT_POLL_INTERVAL_MS: u64 = 2000;
34
35pub const DEFAULT_TWAP_WINDOW_SECONDS: u32 = 1800;
37
38pub const DEFAULT_MAX_MEMORY_SAMPLES: usize = 2000;
40
41pub const DEFAULT_MIN_SETTLEMENT_SAMPLES: usize = 500;
43
44const MIN_SETTLEMENT_SAMPLES_FLOOR: usize = 50;
46
47const FALLBACK_FAILURE_TTL: Duration = Duration::from_secs(300);
49
50#[derive(Clone)]
52pub struct HyperliquidOracleConfig {
53 pub api_url: String,
55 pub poll_interval_ms: u64,
57 pub risk_free_rate: f64,
59 pub symbol: String,
61 pub twap_window_seconds: u32,
63 pub oracle_writer: Option<Arc<dyn OracleWriter>>,
65 pub max_memory_samples: usize,
67 pub ws_feed: Option<Arc<HyperliquidWsFeed>>,
69 pub price_notify: Option<Arc<tokio::sync::Notify>>,
71 pub min_settlement_samples: usize,
73}
74
75impl std::fmt::Debug for HyperliquidOracleConfig {
76 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
77 f.debug_struct("HyperliquidOracleConfig")
78 .field("api_url", &self.api_url)
79 .field("poll_interval_ms", &self.poll_interval_ms)
80 .field("risk_free_rate", &self.risk_free_rate)
81 .field("symbol", &self.symbol)
82 .field("twap_window_seconds", &self.twap_window_seconds)
83 .field("max_memory_samples", &self.max_memory_samples)
84 .field("min_settlement_samples", &self.min_settlement_samples)
85 .field("ws_feed", &self.ws_feed.is_some())
86 .finish()
87 }
88}
89
90impl Default for HyperliquidOracleConfig {
91 fn default() -> Self {
92 Self {
93 api_url: "https://api.hyperliquid.xyz/info".to_string(),
94 poll_interval_ms: DEFAULT_POLL_INTERVAL_MS,
95 risk_free_rate: DEFAULT_RISK_FREE_RATE,
96 symbol: String::new(),
98 twap_window_seconds: DEFAULT_TWAP_WINDOW_SECONDS,
99 oracle_writer: None,
100 max_memory_samples: DEFAULT_MAX_MEMORY_SAMPLES,
101 ws_feed: None,
102 price_notify: None,
103 min_settlement_samples: DEFAULT_MIN_SETTLEMENT_SAMPLES
104 .max(MIN_SETTLEMENT_SAMPLES_FLOOR),
105 }
106 }
107}
108
109impl HyperliquidOracleConfig {
110 pub fn from_pricing_config(config: &hypercall_config::PricingConfig) -> Self {
111 Self {
112 api_url: config.hyperliquid_info_url.clone(),
113 poll_interval_ms: DEFAULT_POLL_INTERVAL_MS,
114 risk_free_rate: DEFAULT_RISK_FREE_RATE,
115 symbol: config.oracle_symbol.clone(),
116 twap_window_seconds: DEFAULT_TWAP_WINDOW_SECONDS,
117 oracle_writer: None,
118 max_memory_samples: DEFAULT_MAX_MEMORY_SAMPLES,
119 ws_feed: None,
120 price_notify: None,
121 min_settlement_samples: config
122 .min_settlement_samples
123 .max(MIN_SETTLEMENT_SAMPLES_FLOOR),
124 }
125 }
126
127 fn validate(&self) -> Result<()> {
129 if self.symbol.is_empty() {
130 anyhow::bail!(
131 "HyperliquidOracleConfig: symbol must be specified (e.g., \"BTC\", \"ETH\")"
132 );
133 }
134 Ok(())
135 }
136}
137
138#[derive(Debug, Clone, Serialize, Deserialize)]
140pub struct PriceSample {
141 pub timestamp_ms: i64,
143 pub price: f64,
145 pub source: String,
147}
148
149#[derive(Debug, Clone)]
151pub struct SettlementWindow {
152 pub expiry_timestamp: i64,
154 pub window_start: i64,
156 pub window_end: i64,
158 pub samples: Vec<PriceSample>,
160 pub settlement_price: Option<f64>,
162 pub persisted: bool,
164}
165
166pub struct OracleFetchResult {
168 pub oracle_price: f64,
169 pub prev_day_px: Option<f64>,
170}
171
172#[derive(Debug, Default)]
174struct OracleState {
175 current_spot: Option<f64>,
177 prev_day_px: Option<f64>,
179 last_fetch_timestamp: Option<i64>,
181 settlements: HashMap<i64, SettlementWindow>,
183 price_seq: u64,
185 is_healthy: bool,
187 test_price_override: bool,
189}
190
191#[derive(Debug, Clone)]
193enum CachedFallbackResult {
194 Success(f64),
196 Failure(Instant),
198}
199
200pub struct HyperliquidMarkPriceOracle {
205 config: HyperliquidOracleConfig,
206 client: reqwest::Client,
207 state: Arc<RwLock<OracleState>>,
208 sync_status: Arc<SyncStatus>,
210 hydromancer_client: Option<Arc<HydromancerClient>>,
212 fallback_cache: Arc<RwLock<HashMap<(String, i64), CachedFallbackResult>>>,
214}
215
216impl HyperliquidMarkPriceOracle {
217 pub fn new(config: HyperliquidOracleConfig) -> Result<Self> {
221 config.validate()?;
222
223 let client = reqwest::Client::builder()
224 .timeout(Duration::from_secs(10))
225 .build()
226 .expect("Failed to create HTTP client");
227
228 Ok(Self {
229 config,
230 client,
231 state: Arc::new(RwLock::new(OracleState::default())),
232 sync_status: Arc::new(SyncStatus::new()),
233 hydromancer_client: None,
234 fallback_cache: Arc::new(RwLock::new(HashMap::new())),
235 })
236 }
237
238 pub fn risk_free_rate(&self) -> f64 {
240 self.config.risk_free_rate
241 }
242
243 pub async fn new_with_init(config: HyperliquidOracleConfig) -> Result<Self> {
249 let oracle = Self::new(config)?;
250
251 if let Some(ref ws_feed) = oracle.config.ws_feed {
253 const WS_WAIT_ATTEMPTS: u32 = 15; for attempt in 0..WS_WAIT_ATTEMPTS {
255 if let Some(price) = ws_feed.get_price(&oracle.config.symbol).await {
256 let prev_day_px = ws_feed.get_prev_day_price(&oracle.config.symbol).await;
257 let mut state = oracle.state.write().await;
258 state.current_spot = Some(price);
259 state.prev_day_px = prev_day_px;
260 state.last_fetch_timestamp = Some(Utc::now().timestamp_millis());
261 state.is_healthy = true;
262 drop(state);
263
264 oracle.sync_status.set_ready();
265 info!(
266 "Oracle initialized for {} via WebSocket: initial_price={} prev_day_px={:?}",
267 oracle.config.symbol, price, prev_day_px
268 );
269 return Ok(oracle);
270 }
271 if attempt < WS_WAIT_ATTEMPTS - 1 {
272 debug!(
273 "Oracle {} waiting for WS price ({}/{})",
274 oracle.config.symbol,
275 attempt + 1,
276 WS_WAIT_ATTEMPTS
277 );
278 tokio::time::sleep(Duration::from_millis(500)).await;
279 }
280 }
281 warn!(
282 "Oracle {} WS feed didn't deliver price in time, falling back to HTTP",
283 oracle.config.symbol
284 );
285 }
286
287 const MAX_RETRIES: u32 = 5;
289 const INITIAL_DELAY_MS: u64 = 1000;
290
291 let mut last_error = None;
292 for attempt in 0..MAX_RETRIES {
293 match oracle.fetch_oracle_price().await {
294 Ok(result) => {
295 {
296 let mut state = oracle.state.write().await;
297 state.current_spot = Some(result.oracle_price);
298 state.prev_day_px = result.prev_day_px;
299 state.last_fetch_timestamp = Some(Utc::now().timestamp_millis());
300 state.is_healthy = true;
301 }
302
303 oracle.sync_status.set_ready();
304
305 info!(
306 "Oracle initialized for {} via HTTP: initial_price={}{}",
307 oracle.config.symbol,
308 result.oracle_price,
309 if attempt > 0 {
310 format!(" (after {} retries)", attempt)
311 } else {
312 String::new()
313 }
314 );
315
316 return Ok(oracle);
317 }
318 Err(e) => {
319 let err_text = e.to_string();
328 if err_text.contains("not found in Hyperliquid universe") {
329 warn!(
330 "Oracle {} symbol unknown to Hyperliquid universe, \
331 not retrying: {}",
332 oracle.config.symbol, e
333 );
334 return Err(e);
335 }
336 let delay_ms = INITIAL_DELAY_MS * 2u64.pow(attempt);
337 warn!(
338 "Oracle {} HTTP init attempt {}/{} failed: {}. Retrying in {}ms...",
339 oracle.config.symbol,
340 attempt + 1,
341 MAX_RETRIES,
342 e,
343 delay_ms
344 );
345 last_error = Some(e);
346 tokio::time::sleep(Duration::from_millis(delay_ms)).await;
347 }
348 }
349 }
350
351 Err(last_error
352 .unwrap_or_else(|| anyhow::anyhow!("Oracle init failed after {} retries", MAX_RETRIES)))
353 }
354
355 pub fn sync_status(&self) -> Arc<SyncStatus> {
357 self.sync_status.clone()
358 }
359
360 pub fn config(&self) -> &HyperliquidOracleConfig {
362 &self.config
363 }
364
365 pub async fn is_healthy(&self) -> bool {
367 let state = self.state.read().await;
368 state.is_healthy
369 }
370
371 pub async fn get_last_fetch_timestamp_ms(&self) -> Option<i64> {
374 let state = self.state.read().await;
375 state.last_fetch_timestamp
376 }
377
378 pub async fn get_staleness_seconds(&self) -> Option<f64> {
381 let state = self.state.read().await;
382 state.last_fetch_timestamp.map(|ts| {
383 let now_ms = chrono::Utc::now().timestamp_millis();
384 (now_ms - ts) as f64 / 1000.0
385 })
386 }
387
388 pub async fn fetch_oracle_price(&self) -> Result<OracleFetchResult> {
390 let request = MetaAndAssetCtxsRequest::new();
391
392 let response = self
393 .client
394 .post(&self.config.api_url)
395 .json(&request)
396 .send()
397 .await
398 .context("Failed to send request to Hyperliquid API")?;
399
400 if !response.status().is_success() {
401 let status = response.status();
402 let error_text = response.text().await.unwrap_or_default();
403 anyhow::bail!("API request failed with status {}: {}", status, error_text);
404 }
405
406 let data: MetaAndAssetCtxsResponse = response
407 .json()
408 .await
409 .context("Failed to parse metaAndAssetCtxs response")?;
410
411 let symbol_index = data
413 .meta
414 .universe
415 .iter()
416 .position(|asset| asset.name == self.config.symbol)
417 .ok_or_else(|| {
418 anyhow::anyhow!(
419 "Symbol {} not found in Hyperliquid universe",
420 self.config.symbol
421 )
422 })?;
423
424 let ctx = data.asset_ctxs.get(symbol_index).ok_or_else(|| {
426 anyhow::anyhow!("Asset context not found for symbol {}", self.config.symbol)
427 })?;
428
429 let oracle_price = ctx.oracle_price().ok_or_else(|| {
430 anyhow::anyhow!("Failed to parse oracle price for {}", self.config.symbol)
431 })?;
432
433 Ok(OracleFetchResult {
434 oracle_price,
435 prev_day_px: ctx.prev_day_price(),
436 })
437 }
438
439 pub fn start_polling(self: &Arc<Self>) -> JoinHandle<()> {
444 let (tx, rx) = tokio::sync::broadcast::channel::<()>(1);
447 std::mem::forget(tx);
448 self.start_polling_with_shutdown(rx)
449 }
450
451 pub fn start_polling_with_shutdown(
458 self: &Arc<Self>,
459 mut shutdown_rx: tokio::sync::broadcast::Receiver<()>,
460 ) -> JoinHandle<()> {
461 let oracle = Arc::clone(self);
462
463 tokio::spawn(async move {
464 let mut interval =
465 tokio::time::interval(Duration::from_millis(oracle.config.poll_interval_ms));
466
467 let ws_feed =
468 oracle.config.ws_feed.as_ref().expect(
469 "Oracle polling requires a WebSocket feed — ws_feed must be set in config",
470 );
471 info!(
472 "Oracle {} using WebSocket feed (WS-only, no HTTP fallback)",
473 oracle.config.symbol
474 );
475
476 loop {
477 tokio::select! {
478 _ = shutdown_rx.recv() => {
479 info!("Oracle {} received shutdown signal", oracle.config.symbol);
480 break;
481 }
482 _ = interval.tick() => {
483 match ws_feed.get_price(&oracle.config.symbol).await {
484 Some(price) => {
485 let now_ms = Utc::now().timestamp_millis();
486 let now_secs = now_ms / 1000;
487 let prev_day_px = ws_feed.get_prev_day_price(&oracle.config.symbol).await;
489
490 let mut state = oracle.state.write().await;
491
492 if state.test_price_override {
494 drop(state);
495 continue;
496 }
497
498 state.current_spot = Some(price);
499 if let Some(pdp) = prev_day_px {
500 state.prev_day_px = Some(pdp);
501 }
502 state.last_fetch_timestamp = Some(now_ms);
503 state.price_seq += 1;
504 state.is_healthy = true;
505
506 debug!(
507 "Oracle updated: {} = {} (seq={})",
508 oracle.config.symbol, price, state.price_seq
509 );
510
511 oracle
513 .process_settlement_samples(&mut state, price, now_ms, now_secs)
514 .await;
515
516 if let Some(ref notify) = oracle.config.price_notify {
517 notify.notify_waiters();
518 }
519 }
520 None => {
521 let mut state = oracle.state.write().await;
523 if state.test_price_override {
524 continue;
525 }
526 warn!(
527 "Oracle {}: WS price unavailable, waiting for reconnect",
528 oracle.config.symbol
529 );
530 state.is_healthy = false;
531 }
532 }
533 }
534 }
535 }
536 info!("Oracle {} polling stopped", oracle.config.symbol);
537 })
538 }
539
540 async fn process_settlement_samples(
542 &self,
543 state: &mut OracleState,
544 price: f64,
545 now_ms: i64,
546 now_secs: i64,
547 ) {
548 let sample = PriceSample {
549 timestamp_ms: now_ms,
550 price,
551 source: "hyperliquid".to_string(),
552 };
553
554 let mut expiries_to_compute: Vec<i64> = Vec::new();
556
557 for (&expiry, window) in state.settlements.iter_mut() {
558 if window.settlement_price.is_some() {
560 continue;
561 }
562
563 if now_secs >= window.window_start && now_secs <= window.window_end {
565 window.samples.push(sample.clone());
567
568 if window.samples.len() > self.config.max_memory_samples {
570 let excess = window.samples.len() - self.config.max_memory_samples;
571 window.samples.drain(0..excess);
572 }
573
574 debug!(
575 "Collected settlement sample: expiry={}, samples={}",
576 expiry,
577 window.samples.len()
578 );
579 }
580
581 if now_secs > window.window_end && window.settlement_price.is_none() {
583 expiries_to_compute.push(expiry);
584 }
585 }
586
587 let mut expiries_to_remove: Vec<i64> = Vec::new();
589
590 for expiry in expiries_to_compute {
591 if let Some(window) = state.settlements.get_mut(&expiry) {
592 if !window.samples.is_empty() {
593 let twap = Self::compute_twap_median_of_means(&window.samples, 0.05);
594 window.settlement_price = Some(twap);
595
596 info!(
597 "Settlement computed: expiry={}, twap={}, samples={}",
598 expiry,
599 twap,
600 window.samples.len()
601 );
602
603 if let Some(ref writer) = self.config.oracle_writer {
605 let input = hypercall_db::NewOracleSettlementPriceInput {
606 symbol: self.config.symbol.clone(),
607 expiry_timestamp: window.expiry_timestamp,
608 settlement_price: twap,
609 sample_count: window.samples.len() as i32,
610 window_start: window.window_start,
611 window_end: window.window_end,
612 algorithm: "median_of_means_5pct_trim".to_string(),
613 };
614 if let Err(e) = writer.save_oracle_settlement_price_sync(&input) {
615 error!("Failed to persist settlement: {}", e);
616 } else {
617 window.persisted = true;
618 expiries_to_remove.push(expiry);
620 }
621 }
622 } else {
623 warn!("Settlement window ended with no samples: expiry={}", expiry);
624 expiries_to_remove.push(expiry);
626 }
627 }
628 }
629
630 for expiry in expiries_to_remove {
633 if let Some(window) = state.settlements.remove(&expiry) {
634 debug!(
635 "Removed persisted settlement from memory: expiry={}, price={:?}",
636 expiry, window.settlement_price
637 );
638 }
639 }
640 }
641
642 pub fn compute_forward_price(
650 spot: f64,
651 expiry_timestamp: i64,
652 risk_free_rate: f64,
653 ) -> Result<f64> {
654 let now = Utc::now().timestamp();
655 let time_to_expiry_secs = expiry_timestamp - now;
656
657 if time_to_expiry_secs <= 0 {
658 anyhow::bail!("Expiry {} is in the past (now={})", expiry_timestamp, now);
659 }
660
661 let time_to_expiry_years = time_to_expiry_secs as f64 / (365.25 * 24.0 * 60.0 * 60.0);
663
664 let forward = spot * (risk_free_rate * time_to_expiry_years).exp();
666
667 Ok(forward)
668 }
669
670 pub fn compute_twap_median_of_means(samples: &[PriceSample], trim_pct: f64) -> f64 {
681 if samples.is_empty() {
682 return 0.0;
683 }
684
685 if samples.len() == 1 {
686 return samples[0].price;
687 }
688
689 let mut prices: Vec<f64> = samples.iter().map(|s| s.price).collect();
691 prices.sort_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal));
692
693 let trim_count = (prices.len() as f64 * trim_pct) as usize;
695 let trimmed = if trim_count * 2 >= prices.len() {
696 &prices[prices.len() / 2..=prices.len() / 2]
698 } else {
699 &prices[trim_count..prices.len() - trim_count]
700 };
701
702 if trimmed.is_empty() {
703 return prices[prices.len() / 2]; }
705
706 let k = (trimmed.len() as f64).sqrt().ceil() as usize;
708 let bucket_size = trimmed.len().div_ceil(k); let mut bucket_means: Vec<f64> = Vec::with_capacity(k);
712
713 for chunk in trimmed.chunks(bucket_size) {
714 let sum: f64 = chunk.iter().sum();
715 let mean = sum / chunk.len() as f64;
716 bucket_means.push(mean);
717 }
718
719 bucket_means.sort_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal));
721
722 if bucket_means.len().is_multiple_of(2) && bucket_means.len() >= 2 {
723 let mid = bucket_means.len() / 2;
725 (bucket_means[mid - 1] + bucket_means[mid]) / 2.0
726 } else {
727 bucket_means[bucket_means.len() / 2]
729 }
730 }
731
732 fn persist_samples_via_writer(
734 writer: &dyn OracleWriter,
735 symbol: &str,
736 expiry_timestamp: i64,
737 samples: &[PriceSample],
738 ) -> Result<()> {
739 if samples.is_empty() {
740 return Ok(());
741 }
742
743 let inputs: Vec<hypercall_db::NewOraclePriceSampleInput> = samples
744 .iter()
745 .map(|s| hypercall_db::NewOraclePriceSampleInput {
746 symbol: symbol.to_string(),
747 expiry_timestamp,
748 sample_timestamp_ms: s.timestamp_ms,
749 price: s.price,
750 source: s.source.clone(),
751 })
752 .collect();
753
754 writer.save_oracle_price_samples_sync(&inputs)?;
755
756 debug!(
757 "Persisted {} samples for symbol={}, expiry={}",
758 samples.len(),
759 symbol,
760 expiry_timestamp
761 );
762
763 Ok(())
764 }
765
766 pub fn set_hydromancer_client(&mut self, client: Arc<HydromancerClient>) {
768 self.hydromancer_client = Some(client);
769 }
770
771 fn persist_hydromancer_settlement_via_writer(
773 writer: &dyn OracleWriter,
774 symbol: &str,
775 expiry_timestamp: i64,
776 settlement_price: f64,
777 sample_count: usize,
778 window_start: i64,
779 window_end: i64,
780 ) -> Result<()> {
781 let input = hypercall_db::NewOracleSettlementPriceInput {
782 symbol: symbol.to_string(),
783 expiry_timestamp,
784 settlement_price,
785 sample_count: sample_count as i32,
786 window_start,
787 window_end,
788 algorithm: "hydromancer_median_of_means_5pct_trim".to_string(),
789 };
790
791 writer.save_oracle_settlement_price_sync(&input)?;
792
793 info!(
794 "Persisted Hydromancer settlement: symbol={}, expiry={}, price={}, samples={}",
795 symbol, expiry_timestamp, settlement_price, sample_count
796 );
797
798 Ok(())
799 }
800
801 pub async fn attempt_hydromancer_fallback(&self, expiry_timestamp: i64) -> Option<f64> {
806 let hydromancer = match &self.hydromancer_client {
807 Some(c) => c.clone(),
808 None => return None,
809 };
810
811 let symbol = &self.config.symbol;
812 let cache_key = (symbol.clone(), expiry_timestamp);
813
814 {
816 let cache = self.fallback_cache.read().await;
817 if let Some(cached) = cache.get(&cache_key) {
818 match cached {
819 CachedFallbackResult::Success(price) => return Some(*price),
820 CachedFallbackResult::Failure(ts) => {
821 if ts.elapsed() < FALLBACK_FAILURE_TTL {
822 return None;
823 }
824 }
826 }
827 }
828 }
829
830 warn!(
831 "Attempting Hydromancer fallback for symbol={}, expiry={}",
832 symbol, expiry_timestamp
833 );
834
835 let twap_window = self.config.twap_window_seconds as i64;
836 let window_start_ms = (expiry_timestamp - twap_window) * 1000;
837 let window_end_ms = expiry_timestamp * 1000;
838
839 let records = match hydromancer
841 .fetch_oracle_price_history(symbol, window_start_ms, window_end_ms)
842 .await
843 {
844 Ok(r) => r,
845 Err(e) => {
846 error!(
847 "Hydromancer API error for symbol={}, expiry={}: {}",
848 symbol, expiry_timestamp, e
849 );
850 counter!("ht_oracle_settlement_fallback_total", "underlying" => symbol.clone(), "result" => "api_error")
851 .increment(1);
852 let mut cache = self.fallback_cache.write().await;
853 cache.insert(cache_key, CachedFallbackResult::Failure(Instant::now()));
854 return None;
855 }
856 };
857
858 let samples =
860 HydromancerClient::records_to_price_samples(records, window_start_ms, window_end_ms);
861
862 if samples.len() < self.config.min_settlement_samples {
864 warn!(
865 "Hydromancer returned insufficient samples for symbol={}, expiry={}: {} < {}",
866 symbol,
867 expiry_timestamp,
868 samples.len(),
869 self.config.min_settlement_samples
870 );
871 counter!("ht_oracle_settlement_fallback_total", "underlying" => symbol.clone(), "result" => "insufficient_samples")
872 .increment(1);
873 let mut cache = self.fallback_cache.write().await;
874 cache.insert(cache_key, CachedFallbackResult::Failure(Instant::now()));
875 return None;
876 }
877
878 let settlement_price = Self::compute_twap_median_of_means(&samples, 0.05);
880 let sample_count = samples.len();
881
882 info!(
883 "Hydromancer fallback computed settlement: symbol={}, expiry={}, price={}, samples={}",
884 symbol, expiry_timestamp, settlement_price, sample_count
885 );
886
887 if let Some(ref writer) = self.config.oracle_writer {
889 let window_start_secs = expiry_timestamp - twap_window;
890
891 if let Err(e) = Self::persist_hydromancer_settlement_via_writer(
892 writer.as_ref(),
893 symbol,
894 expiry_timestamp,
895 settlement_price,
896 sample_count,
897 window_start_secs,
898 expiry_timestamp,
899 ) {
900 error!(
901 "Failed to persist Hydromancer settlement for symbol={}, expiry={}: {}",
902 symbol, expiry_timestamp, e
903 );
904 }
905
906 if let Err(e) = Self::persist_samples_via_writer(
907 writer.as_ref(),
908 symbol,
909 expiry_timestamp,
910 &samples,
911 ) {
912 error!(
913 "Failed to persist Hydromancer samples for symbol={}, expiry={}: {}",
914 symbol, expiry_timestamp, e
915 );
916 }
917 }
918
919 counter!("ht_oracle_settlement_fallback_total", "underlying" => symbol.clone(), "result" => "success")
920 .increment(1);
921
922 let mut cache = self.fallback_cache.write().await;
924 cache.insert(cache_key, CachedFallbackResult::Success(settlement_price));
925
926 Some(settlement_price)
927 }
928
929 #[cfg(any(test, feature = "test-utils"))]
934 pub async fn set_spot_price_for_testing(&self, price: f64) {
935 let mut state = self.state.write().await;
936 state.current_spot = Some(price);
937 state.last_fetch_timestamp = Some(chrono::Utc::now().timestamp_millis());
938 state.is_healthy = true;
939 state.test_price_override = true;
940 state.price_seq += 1;
941 drop(state);
942 self.sync_status.set_ready();
943 if let Some(ref notify) = self.config.price_notify {
944 notify.notify_waiters();
945 }
946 info!(
947 "Test mode: Set spot price for {} to ${:.2} (WS/HTTP updates frozen)",
948 self.config.symbol, price
949 );
950 }
951}
952
953#[async_trait]
954impl MarkPriceOracle for HyperliquidMarkPriceOracle {
955 async fn get_spot_price(&self) -> Option<f64> {
956 let state = self.state.read().await;
957 state.current_spot
958 }
959
960 async fn get_prev_day_price(&self) -> Option<f64> {
961 let state = self.state.read().await;
962 state.prev_day_px
963 }
964
965 async fn get_mark_price(&self, expiry_timestamp: i64) -> Result<f64> {
966 let state = self.state.read().await;
967
968 let spot = state
969 .current_spot
970 .ok_or_else(|| anyhow::anyhow!("No spot price available"))?;
971
972 Self::compute_forward_price(spot, expiry_timestamp, self.config.risk_free_rate)
973 }
974
975 async fn register_settlement(&self, expiry_timestamp: i64, twap_seconds: u32) {
976 let mut state = self.state.write().await;
977
978 if state.settlements.contains_key(&expiry_timestamp) {
980 warn!(
981 "Settlement already registered for expiry {}",
982 expiry_timestamp
983 );
984 return;
985 }
986
987 let window_start = expiry_timestamp - twap_seconds as i64;
988
989 let window = SettlementWindow {
990 expiry_timestamp,
991 window_start,
992 window_end: expiry_timestamp,
993 samples: Vec::with_capacity(twap_seconds as usize / 2), settlement_price: None,
995 persisted: false,
996 };
997
998 state.settlements.insert(expiry_timestamp, window);
999
1000 info!(
1001 "Registered settlement: expiry={}, window_start={}, window_end={}",
1002 expiry_timestamp, window_start, expiry_timestamp
1003 );
1004 }
1005
1006 async fn get_settlement_price(&self, expiry_timestamp: i64) -> Option<f64> {
1007 {
1009 let state = self.state.read().await;
1010 if let Some(price) = state
1011 .settlements
1012 .get(&expiry_timestamp)
1013 .and_then(|w| w.settlement_price)
1014 {
1015 return Some(price);
1016 }
1017 }
1018
1019 if let Some(ref writer) = self.config.oracle_writer {
1021 match writer.get_oracle_settlement_price_sync(&self.config.symbol, expiry_timestamp) {
1022 Ok(Some(price)) => return Some(price),
1023 Ok(None) => {}
1024 Err(e) => {
1025 warn!(
1026 "Failed to load settlement from DB for expiry {}: {}",
1027 expiry_timestamp, e
1028 );
1029 }
1030 }
1031 }
1032
1033 let now = Utc::now().timestamp();
1035 if now > expiry_timestamp && self.hydromancer_client.is_some() {
1036 return self.attempt_hydromancer_fallback(expiry_timestamp).await;
1037 }
1038
1039 None
1040 }
1041
1042 async fn commit_price(&self, root: H256) -> Result<TxHash> {
1043 let state = self.state.read().await;
1044 let price_seq = state.price_seq;
1045
1046 info!(
1047 "Committing price root: {} (seq={})",
1048 hex::encode(root),
1049 price_seq
1050 );
1051
1052 Ok(format!("0x{}", hex::encode(root)))
1057 }
1058
1059 async fn get_price_seq(&self) -> u64 {
1060 let state = self.state.read().await;
1061 state.price_seq
1062 }
1063}
1064
1065#[cfg(test)]
1066mod tests {
1067 use super::*;
1068
1069 #[test]
1070 fn test_forward_price_calculation() {
1071 let spot = 100.0;
1073 let now = Utc::now().timestamp();
1074 let one_year_secs = 365.25 * 24.0 * 60.0 * 60.0;
1075 let expiry = now + one_year_secs as i64;
1076 let r = 0.05;
1077
1078 let forward = HyperliquidMarkPriceOracle::compute_forward_price(spot, expiry, r).unwrap();
1079
1080 let expected = spot * (r * 1.0).exp();
1082 assert!((forward - expected).abs() < 0.01);
1083 }
1084
1085 #[test]
1086 fn test_forward_price_expired() {
1087 let spot = 100.0;
1088 let expiry = Utc::now().timestamp() - 1000; let r = 0.05;
1090
1091 let result = HyperliquidMarkPriceOracle::compute_forward_price(spot, expiry, r);
1092 assert!(result.is_err());
1093 }
1094
1095 #[test]
1096 fn test_twap_simple() {
1097 let samples = vec![
1098 PriceSample {
1099 timestamp_ms: 1000,
1100 price: 100.0,
1101 source: "test".to_string(),
1102 },
1103 PriceSample {
1104 timestamp_ms: 2000,
1105 price: 102.0,
1106 source: "test".to_string(),
1107 },
1108 PriceSample {
1109 timestamp_ms: 3000,
1110 price: 101.0,
1111 source: "test".to_string(),
1112 },
1113 ];
1114
1115 let twap = HyperliquidMarkPriceOracle::compute_twap_median_of_means(&samples, 0.0);
1116 assert!((twap - 101.0).abs() < 1.0);
1118 }
1119
1120 #[test]
1121 fn test_twap_with_outlier() {
1122 let mut samples: Vec<PriceSample> = (0..100)
1123 .map(|i| PriceSample {
1124 timestamp_ms: i * 1000,
1125 price: 100.0 + (i as f64 * 0.1), source: "test".to_string(),
1127 })
1128 .collect();
1129
1130 samples.push(PriceSample {
1132 timestamp_ms: 100000,
1133 price: 1000.0, source: "test".to_string(),
1135 });
1136 samples.push(PriceSample {
1137 timestamp_ms: 101000,
1138 price: 10.0, source: "test".to_string(),
1140 });
1141
1142 let twap_trimmed = HyperliquidMarkPriceOracle::compute_twap_median_of_means(&samples, 0.05);
1143 let twap_untrimmed =
1144 HyperliquidMarkPriceOracle::compute_twap_median_of_means(&samples, 0.0);
1145
1146 assert!(
1152 twap_trimmed > 100.0 && twap_trimmed < 115.0,
1153 "Trimmed TWAP {} should be in reasonable range",
1154 twap_trimmed
1155 );
1156 assert!(twap_untrimmed > 0.0, "Untrimmed TWAP should be positive");
1157 println!(
1158 "TWAP test: trimmed={:.2}, untrimmed={:.2}",
1159 twap_trimmed, twap_untrimmed
1160 );
1161 }
1162
1163 #[test]
1164 fn test_twap_empty() {
1165 let samples: Vec<PriceSample> = vec![];
1166 let twap = HyperliquidMarkPriceOracle::compute_twap_median_of_means(&samples, 0.05);
1167 assert_eq!(twap, 0.0);
1168 }
1169
1170 #[test]
1171 fn test_twap_single_sample() {
1172 let samples = vec![PriceSample {
1173 timestamp_ms: 1000,
1174 price: 42.5,
1175 source: "test".to_string(),
1176 }];
1177 let twap = HyperliquidMarkPriceOracle::compute_twap_median_of_means(&samples, 0.05);
1178 assert_eq!(twap, 42.5);
1179 }
1180
1181 #[tokio::test]
1182 async fn test_settlement_registration() {
1183 let config = HyperliquidOracleConfig {
1184 symbol: "BTC".to_string(),
1185 ..Default::default()
1186 };
1187 let oracle = HyperliquidMarkPriceOracle::new(config).expect("Failed to create oracle");
1188
1189 let expiry = Utc::now().timestamp() + 3600; oracle.register_settlement(expiry, 1800).await;
1191
1192 let state = oracle.state.read().await;
1194 assert!(state.settlements.contains_key(&expiry));
1195
1196 let window = state.settlements.get(&expiry).unwrap();
1197 assert_eq!(window.expiry_timestamp, expiry);
1198 assert_eq!(window.window_start, expiry - 1800);
1199 assert_eq!(window.window_end, expiry);
1200 assert!(window.settlement_price.is_none());
1201 }
1202
1203 #[test]
1204 fn test_config_requires_symbol() {
1205 let config = HyperliquidOracleConfig::default();
1207 let result = HyperliquidMarkPriceOracle::new(config);
1208 assert!(result.is_err());
1209 let err = result.err().unwrap();
1210 assert!(
1211 err.to_string().contains("symbol must be specified"),
1212 "Expected error about missing symbol, got: {}",
1213 err
1214 );
1215 }
1216
1217 #[tokio::test]
1218 async fn test_no_fallback_when_no_client() {
1219 let config = HyperliquidOracleConfig {
1220 symbol: "BTC".to_string(),
1221 ..Default::default()
1222 };
1223 let oracle = HyperliquidMarkPriceOracle::new(config).unwrap();
1224
1225 let past_expiry = Utc::now().timestamp() - 3600;
1227 let result = oracle.get_settlement_price(past_expiry).await;
1228 assert!(result.is_none());
1229 }
1230
1231 #[tokio::test]
1232 async fn test_no_fallback_when_memory_hit() {
1233 let config = HyperliquidOracleConfig {
1234 symbol: "BTC".to_string(),
1235 ..Default::default()
1236 };
1237 let mut oracle = HyperliquidMarkPriceOracle::new(config).unwrap();
1238 let mock_client = Arc::new(HydromancerClient::with_base_url(
1239 "http://should-not-be-called".to_string(),
1240 "key".to_string(),
1241 ));
1242 oracle.set_hydromancer_client(mock_client);
1243
1244 let past_expiry = Utc::now().timestamp() - 3600;
1246 {
1247 let mut state = oracle.state.write().await;
1248 state.settlements.insert(
1249 past_expiry,
1250 SettlementWindow {
1251 expiry_timestamp: past_expiry,
1252 window_start: past_expiry - 1800,
1253 window_end: past_expiry,
1254 samples: vec![],
1255 settlement_price: Some(70000.0),
1256 persisted: true,
1257 },
1258 );
1259 }
1260
1261 let result = oracle.get_settlement_price(past_expiry).await;
1262 assert_eq!(result, Some(70000.0));
1263 }
1264
1265 #[tokio::test]
1266 async fn test_no_fallback_before_window_end() {
1267 let config = HyperliquidOracleConfig {
1268 symbol: "BTC".to_string(),
1269 ..Default::default()
1270 };
1271 let mut oracle = HyperliquidMarkPriceOracle::new(config).unwrap();
1272 let mock_client = Arc::new(HydromancerClient::with_base_url(
1273 "http://should-not-be-called".to_string(),
1274 "key".to_string(),
1275 ));
1276 oracle.set_hydromancer_client(mock_client);
1277
1278 let future_expiry = Utc::now().timestamp() + 3600;
1280 let result = oracle.get_settlement_price(future_expiry).await;
1281 assert!(result.is_none());
1282 }
1283
1284 #[tokio::test]
1285 async fn test_fallback_triggers_on_miss() {
1286 use mockito::Server;
1287
1288 let mut server = Server::new_async().await;
1289
1290 let past_expiry = 100000_i64; let twap_window = 1800_i64;
1293 let window_start_ms = (past_expiry - twap_window) * 1000;
1294
1295 let samples: Vec<String> = (0..600)
1297 .map(|i| {
1298 let ts = window_start_ms + i * 3000; format!(
1300 r#"{{"time":{},"dex":"Hyperliquid","coin":"BTC","oraclePx":"70000.0","markPx":"70000.0","extPerpPx":null}}"#,
1301 ts
1302 )
1303 })
1304 .collect();
1305
1306 server
1307 .mock("POST", "/")
1308 .with_status(200)
1309 .with_body(format!("[{}]", samples.join(",")))
1310 .create_async()
1311 .await;
1312
1313 let config = HyperliquidOracleConfig {
1314 symbol: "BTC".to_string(),
1315 min_settlement_samples: 50,
1316 twap_window_seconds: twap_window as u32,
1317 ..Default::default()
1318 };
1319 let mut oracle = HyperliquidMarkPriceOracle::new(config).unwrap();
1320 let client = Arc::new(HydromancerClient::with_base_url(
1321 server.url(),
1322 "test-key".to_string(),
1323 ));
1324 oracle.set_hydromancer_client(client);
1325
1326 let result = oracle.get_settlement_price(past_expiry).await;
1327 assert!(result.is_some());
1328 assert_eq!(result.unwrap(), 70000.0);
1329 }
1330
1331 #[tokio::test]
1332 async fn test_cache_prevents_repeat_calls() {
1333 use mockito::Server;
1334
1335 let mut server = Server::new_async().await;
1336
1337 let past_expiry = 100000_i64;
1338 let twap_window = 1800_i64;
1339 let window_start_ms = (past_expiry - twap_window) * 1000;
1340
1341 let samples: Vec<String> = (0..100)
1342 .map(|i| {
1343 let ts = window_start_ms + i * 3000;
1344 format!(
1345 r#"{{"time":{},"dex":"Hyperliquid","coin":"BTC","oraclePx":"70000.0","markPx":"70000.0","extPerpPx":null}}"#,
1346 ts
1347 )
1348 })
1349 .collect();
1350
1351 let mock = server
1352 .mock("POST", "/")
1353 .with_status(200)
1354 .with_body(format!("[{}]", samples.join(",")))
1355 .expect(1) .create_async()
1357 .await;
1358
1359 let config = HyperliquidOracleConfig {
1360 symbol: "BTC".to_string(),
1361 min_settlement_samples: 50,
1362 twap_window_seconds: twap_window as u32,
1363 ..Default::default()
1364 };
1365 let mut oracle = HyperliquidMarkPriceOracle::new(config).unwrap();
1366 let client = Arc::new(HydromancerClient::with_base_url(
1367 server.url(),
1368 "key".to_string(),
1369 ));
1370 oracle.set_hydromancer_client(client);
1371
1372 let result1 = oracle.attempt_hydromancer_fallback(past_expiry).await;
1374 assert!(result1.is_some());
1375
1376 let result2 = oracle.attempt_hydromancer_fallback(past_expiry).await;
1378 assert_eq!(result1, result2);
1379
1380 mock.assert_async().await;
1381 }
1382
1383 #[tokio::test]
1384 async fn test_insufficient_samples_returns_none() {
1385 use mockito::Server;
1386
1387 let mut server = Server::new_async().await;
1388
1389 let past_expiry = 100000_i64;
1390 let twap_window = 1800_i64;
1391 let window_start_ms = (past_expiry - twap_window) * 1000;
1392
1393 let samples: Vec<String> = (0..10)
1395 .map(|i| {
1396 let ts = window_start_ms + i * 3000;
1397 format!(
1398 r#"{{"time":{},"dex":"Hyperliquid","coin":"BTC","oraclePx":"70000.0","markPx":"70000.0","extPerpPx":null}}"#,
1399 ts
1400 )
1401 })
1402 .collect();
1403
1404 server
1405 .mock("POST", "/")
1406 .with_status(200)
1407 .with_body(format!("[{}]", samples.join(",")))
1408 .create_async()
1409 .await;
1410
1411 let config = HyperliquidOracleConfig {
1412 symbol: "BTC".to_string(),
1413 min_settlement_samples: 500, twap_window_seconds: twap_window as u32,
1415 ..Default::default()
1416 };
1417 let mut oracle = HyperliquidMarkPriceOracle::new(config).unwrap();
1418 let client = Arc::new(HydromancerClient::with_base_url(
1419 server.url(),
1420 "key".to_string(),
1421 ));
1422 oracle.set_hydromancer_client(client);
1423
1424 let result = oracle.attempt_hydromancer_fallback(past_expiry).await;
1425 assert!(result.is_none());
1426 }
1427
1428 #[tokio::test]
1429 async fn test_min_settlement_samples_floor() {
1430 let config = HyperliquidOracleConfig {
1432 symbol: "BTC".to_string(),
1433 min_settlement_samples: 10, ..Default::default()
1435 };
1436 assert_eq!(config.min_settlement_samples, 10);
1441
1442 let default_config = HyperliquidOracleConfig {
1444 symbol: "BTC".to_string(),
1445 ..Default::default()
1446 };
1447 assert!(default_config.min_settlement_samples >= MIN_SETTLEMENT_SAMPLES_FLOOR);
1448 }
1449}