1use anyhow::Result;
2use rust_decimal::prelude::ToPrimitive;
3use std::collections::HashMap;
4use std::sync::Arc;
5use tokio::sync::RwLock;
6use tracing::{debug, error, info, warn};
7
8use crate::messaging::EventBusTrait;
9use crate::price_oracle::hyperliquid_oracle::HyperliquidMarkPriceOracle;
10use crate::shared::black_scholes_utils::{calculate_greeks, calculate_implied_volatility};
11use crate::shared::order_types::perp_underlying;
12use crate::shared::traits::MarkPriceOracle;
13use crate::snapshot::SyncStatus;
14use crate::types::OptionType;
15use crate::valuation_utils::{intrinsic_option_price, now_unix_timestamp_secs};
16use crate::vol_oracle::risk_oracle::SharedVolOracle;
17use hypercall_runtime_api::QuoteProvider;
18use hypercall_types::{
19 expiry_date_to_timestamp, EngineMessage, Greeks, Market, MarketUpdateMessage,
20 MarketUpdateStatus, OptionType as MessageOptionType,
21};
22
23#[derive(Debug, Clone)]
24struct PricingContext {
25 underlying: String,
26 expiry_timestamp: u64,
27 strike_f64: f64,
28 option_type: OptionType,
29 spot_price: Option<f64>,
30 forward_price: f64,
31 risk_free_rate: f64,
32}
33
34#[derive(Debug, Clone, Copy)]
35struct TheoreticalIv {
36 iv: f64,
37 source: &'static str,
38}
39
40pub struct GreeksCache {
41 instruments: Arc<RwLock<HashMap<String, Market>>>,
43 oracles: HashMap<String, Arc<HyperliquidMarkPriceOracle>>,
45 risk_free_rate: f64,
47 quote_provider: Arc<dyn QuoteProvider>,
49 sync_status: Arc<SyncStatus>,
51 theoretical_iv_overrides: Arc<RwLock<HashMap<String, f64>>>,
53 vol_oracle: RwLock<Option<SharedVolOracle>>,
55}
56
57impl GreeksCache {
58 pub fn configured_underlyings(&self) -> Vec<String> {
60 self.oracles.keys().cloned().collect()
61 }
62
63 pub fn risk_free_rate(&self) -> f64 {
65 self.risk_free_rate
66 }
67
68 fn normalize_expiry_timestamp(underlying: &str, raw_expiry: i64) -> Result<u64> {
69 if (19_000_000..=30_000_000).contains(&raw_expiry) {
72 let expiry_timestamp = expiry_date_to_timestamp(underlying, raw_expiry as u64);
73 return u64::try_from(expiry_timestamp).map_err(|_| {
74 anyhow::anyhow!(
75 "Invalid converted expiry timestamp {} from code {}",
76 expiry_timestamp,
77 raw_expiry
78 )
79 });
80 }
81
82 u64::try_from(raw_expiry)
83 .map_err(|_| anyhow::anyhow!("Invalid negative expiry timestamp: {}", raw_expiry))
84 }
85
86 pub async fn new(
91 diesel_db: &dyn hypercall_db::BootstrapReader,
92 event_bus: Arc<dyn EventBusTrait>,
93 oracles: HashMap<String, Arc<HyperliquidMarkPriceOracle>>,
94 shutdown_rx: tokio::sync::broadcast::Receiver<()>,
95 quote_provider: Arc<dyn QuoteProvider>,
96 ) -> Result<Arc<Self>> {
97 info!(
98 "Creating GreeksCache with oracles for: {:?}",
99 oracles.keys().collect::<Vec<_>>()
100 );
101
102 let sync_status = Arc::new(SyncStatus::new());
103 let cache = Arc::new(Self {
104 instruments: Arc::new(RwLock::new(HashMap::new())),
105 oracles,
106 risk_free_rate: 0.04,
107 quote_provider,
108 sync_status: sync_status.clone(),
109 theoretical_iv_overrides: Arc::new(RwLock::new(HashMap::new())),
110 vol_oracle: RwLock::new(None),
111 });
112
113 cache.load_instruments_from_db(diesel_db).await?;
114
115 cache
117 .clone()
118 .start_market_updates_listener(event_bus, shutdown_rx)
119 .await?;
120
121 let mut all_ready = true;
124 for (underlying, oracle) in &cache.oracles {
125 if oracle.get_spot_price().await.is_none() {
126 warn!(
127 "GreeksCache: oracle for {} does not have a spot price yet",
128 underlying
129 );
130 all_ready = false;
131 }
132 }
133 if all_ready {
134 sync_status.set_ready();
135 info!("GreeksCache ready: all oracles have spot prices");
136 } else {
137 warn!("GreeksCache: not all oracles have spot prices at construction time");
138 }
139
140 Ok(cache)
141 }
142
143 pub async fn set_vol_oracle(&self, oracle: SharedVolOracle) {
145 *self.vol_oracle.write().await = Some(oracle);
146 }
147
148 pub fn sync_status(&self) -> Arc<SyncStatus> {
150 self.sync_status.clone()
151 }
152
153 async fn load_instruments_from_db(
154 &self,
155 diesel_db: &dyn hypercall_db::BootstrapReader,
156 ) -> Result<()> {
157 use hypercall_types::OptionType;
158
159 info!("Loading active instruments from database");
160
161 let instruments = diesel_db.get_all_active_instruments().await?;
162
163 let mut instruments_map = self.instruments.write().await;
164 for instrument in instruments {
165 let option_type = match instrument.option_type {
166 OptionType::Call => MessageOptionType::Call,
167 OptionType::Put => MessageOptionType::Put,
168 };
169
170 let expiry =
171 match Self::normalize_expiry_timestamp(&instrument.underlying, instrument.expiry) {
172 Ok(expiry) => expiry,
173 Err(e) => {
174 error!(
175 "Invalid expiry {} for instrument {}: {}",
176 instrument.expiry, instrument.id, e
177 );
178 continue;
179 }
180 };
181
182 instruments_map.insert(
183 instrument.id.clone(),
184 Market {
185 symbol: instrument.id.clone(),
186 underlying: instrument.underlying.clone(),
187 strike: instrument.strike,
188 expiry,
189 option_type,
190 },
191 );
192 }
193
194 info!("Loaded {} instruments", instruments_map.len());
195 Ok(())
196 }
197
198 async fn start_market_updates_listener(
200 self: Arc<Self>,
201 event_bus: Arc<dyn EventBusTrait>,
202 mut shutdown_rx: tokio::sync::broadcast::Receiver<()>,
203 ) -> Result<()> {
204 use crate::shared::topics::TOPIC_MARKET_UPDATES;
205
206 let topics = vec![TOPIC_MARKET_UPDATES.to_string()];
207 let mut receiver = event_bus
208 .subscribe(topics)
209 .await
210 .map_err(|e| anyhow::anyhow!("Failed to subscribe to market-updates: {}", e))?;
211
212 let cache = self.clone();
213 tokio::spawn(async move {
214 info!("GreeksCache market-updates listener started");
215 loop {
216 tokio::select! {
217 _ = shutdown_rx.recv() => {
218 info!("GreeksCache listener stopping");
219 break;
220 }
221 maybe_message = receiver.recv() => {
222 match maybe_message {
223 Some(EngineMessage::MarketUpdate(market_msg)) => {
224 cache.handle_market_update(market_msg).await;
225 }
226 Some(_) => {} None => break,
228 }
229 }
230 }
231 }
232 });
233
234 Ok(())
235 }
236
237 async fn handle_market_update(&self, market_msg: MarketUpdateMessage) {
238 debug!(
239 "Handling market update for symbol: {}",
240 market_msg.market.symbol
241 );
242
243 match market_msg.status {
244 MarketUpdateStatus::MarketCreated | MarketUpdateStatus::MarketAlreadyExists => {
245 info!("New market created: {}", market_msg.market.symbol);
246 let mut market = market_msg.market.clone();
247 let raw_expiry = market.expiry as i64;
248 let expiry = match Self::normalize_expiry_timestamp(&market.underlying, raw_expiry)
249 {
250 Ok(expiry) => expiry,
251 Err(e) => {
252 error!(
253 "Invalid expiry {} for market update {}: {}",
254 raw_expiry, market.symbol, e
255 );
256 return;
257 }
258 };
259 market.expiry = expiry;
260
261 self.instruments
262 .write()
263 .await
264 .insert(market.symbol.clone(), market);
265 }
266 MarketUpdateStatus::MarketDeleted
267 | MarketUpdateStatus::MarketExpired
268 | MarketUpdateStatus::MarketPendingSettlement => {
269 info!("Market removed: {}", market_msg.market.symbol);
270 self.instruments
271 .write()
272 .await
273 .remove(&market_msg.market.symbol);
274 }
275 MarketUpdateStatus::MarketCreationFailed | MarketUpdateStatus::MarketDeletionFailed => {
276 debug!(
278 "Market lifecycle update failed for {}: not mutating cache",
279 market_msg.market.symbol
280 );
281 }
282 }
283 }
284
285 async fn get_pricing_context(&self, symbol: &str) -> Result<PricingContext> {
288 let (underlying, expiry, strike, option_type_message) = {
289 let instruments = self.instruments.read().await;
290 let instrument = instruments
291 .get(symbol)
292 .ok_or_else(|| anyhow::anyhow!("Symbol not found: {}", symbol))?;
293 (
294 instrument.underlying.clone(),
295 instrument.expiry as i64,
296 instrument.strike,
297 instrument.option_type,
298 )
299 };
300
301 let normalized_expiry = Self::normalize_expiry_timestamp(&underlying, expiry)?;
302 let oracle = self.oracles.get(&underlying).cloned().ok_or_else(|| {
303 anyhow::anyhow!(
304 "No mark price oracle configured for underlying: {}",
305 underlying
306 )
307 })?;
308
309 let spot_price = oracle.get_spot_price().await;
310
311 let expiry_i64 = i64::try_from(normalized_expiry).map_err(|_| {
312 anyhow::anyhow!(
313 "Invalid normalized expiry timestamp for {}: {}",
314 symbol,
315 normalized_expiry
316 )
317 })?;
318 let forward_price = oracle.get_mark_price(expiry_i64).await.map_err(|e| {
319 anyhow::anyhow!(
320 "Failed to get forward price for {} at expiry {}: {}",
321 underlying,
322 normalized_expiry,
323 e
324 )
325 })?;
326
327 let strike_f64 = strike
328 .to_f64()
329 .ok_or_else(|| anyhow::anyhow!("Invalid strike for {}: {}", symbol, strike))?;
330
331 let option_type = match option_type_message {
332 MessageOptionType::Call => OptionType::Call,
333 MessageOptionType::Put => OptionType::Put,
334 };
335
336 Ok(PricingContext {
337 underlying,
338 expiry_timestamp: normalized_expiry,
339 strike_f64,
340 option_type,
341 spot_price,
342 forward_price,
343 risk_free_rate: self.risk_free_rate,
344 })
345 }
346
347 fn time_to_expiry_years(expiry_timestamp: u64) -> Result<f64> {
348 let now = std::time::SystemTime::now()
349 .duration_since(std::time::UNIX_EPOCH)
350 .unwrap()
351 .as_secs() as i64;
352 let time_to_expiry = ((expiry_timestamp as i64 - now) as f64) / (365.25 * 24.0 * 3600.0);
353
354 if time_to_expiry <= 0.0 {
355 return Err(anyhow::anyhow!("Option has expired"));
356 }
357
358 Ok(time_to_expiry)
359 }
360
361 async fn resolve_theoretical_iv(
362 &self,
363 symbol: &str,
364 context: &PricingContext,
365 ) -> Result<TheoreticalIv> {
366 if let Some(iv) = self
367 .theoretical_iv_overrides
368 .read()
369 .await
370 .get(symbol)
371 .copied()
372 {
373 return Ok(TheoreticalIv {
374 iv,
375 source: "override",
376 });
377 }
378
379 let vol_oracle_guard = self.vol_oracle.read().await;
380 let vol_oracle = vol_oracle_guard.as_ref().ok_or_else(|| {
381 anyhow::anyhow!(
382 "Vol oracle not available for theoretical pricing of {} (still initializing)",
383 symbol
384 )
385 })?;
386 let expiry_i64 = i64::try_from(context.expiry_timestamp).map_err(|_| {
387 anyhow::anyhow!("Invalid expiry timestamp: {}", context.expiry_timestamp)
388 })?;
389 let iv = vol_oracle
390 .get_iv(&context.underlying, context.strike_f64, expiry_i64)
391 .map_err(|e| anyhow::anyhow!("Vol oracle IV unavailable for {}: {}", symbol, e))?;
392 drop(vol_oracle_guard);
393
394 Ok(TheoreticalIv {
395 iv,
396 source: "vol_oracle",
397 })
398 }
399
400 fn compute_quote_iv_from_market_price(
401 context: &PricingContext,
402 time_to_expiry: f64,
403 market_price: f64,
404 initial_vol: Option<f64>,
405 ) -> Option<f64> {
406 calculate_implied_volatility(
407 &context.option_type,
408 context.forward_price,
409 context.strike_f64,
410 time_to_expiry,
411 context.risk_free_rate,
412 market_price,
413 initial_vol,
414 )
415 }
416
417 async fn compute_theoretical_greeks(&self, symbol: &str) -> Result<Greeks> {
418 let context = self.get_pricing_context(symbol).await?;
419 let time_to_expiry = Self::time_to_expiry_years(context.expiry_timestamp)?;
420 let iv = self.resolve_theoretical_iv(symbol, &context).await?;
421
422 let mut greeks = calculate_greeks(
423 &context.option_type,
424 context.forward_price,
425 context.strike_f64,
426 time_to_expiry,
427 context.risk_free_rate,
428 iv.iv,
429 );
430 greeks.market_mid_price = self.quote_provider.get_quote(symbol).and_then(|q| q.mid);
431
432 debug!(
433 symbol,
434 underlying = context.underlying,
435 ?context.option_type,
436 expiry_timestamp = context.expiry_timestamp,
437 strike = context.strike_f64,
438 spot_price = context.spot_price,
439 forward_price = context.forward_price,
440 risk_free_rate = context.risk_free_rate,
441 time_to_expiry,
442 iv = iv.iv,
443 theoretical_price = greeks.theoretical_price,
444 iv_source = iv.source,
445 "Computed theoretical greeks"
446 );
447
448 Ok(greeks)
449 }
450
451 pub async fn get_greeks(&self, symbol: &str) -> Result<Greeks> {
456 self.compute_theoretical_greeks(symbol).await
457 }
458
459 pub async fn get_theoretical_price(&self, symbol: &str) -> Result<f64> {
461 Ok(self.get_greeks(symbol).await?.theoretical_price)
462 }
463
464 pub async fn get_theoretical_mark(&self, symbol: &str) -> Result<f64> {
466 let option_market = { self.instruments.read().await.get(symbol).cloned() };
467 if let Some(option_market) = option_market {
468 let expiry_timestamp = Self::normalize_expiry_timestamp(
469 &option_market.underlying,
470 option_market.expiry as i64,
471 )?;
472 let now = now_unix_timestamp_secs()?;
473 if i64::try_from(expiry_timestamp)
474 .map(|expiry| expiry <= now)
475 .unwrap_or(false)
476 {
477 let expiry_i64 = i64::try_from(expiry_timestamp).map_err(|_| {
478 anyhow::anyhow!(
479 "Invalid normalized expiry timestamp for {}: {}",
480 symbol,
481 expiry_timestamp
482 )
483 })?;
484 let settlement_price = self
485 .get_settlement_price(&option_market.underlying, expiry_i64)
486 .await
487 .ok_or_else(|| {
488 anyhow::anyhow!(
489 "Missing settlement price for {} at expiry {}",
490 option_market.underlying,
491 expiry_i64
492 )
493 })?;
494 let strike = option_market.strike.to_f64().ok_or_else(|| {
495 anyhow::anyhow!("Invalid strike for {}: {}", symbol, option_market.strike)
496 })?;
497 let option_type = match option_market.option_type {
498 MessageOptionType::Call => OptionType::Call,
499 MessageOptionType::Put => OptionType::Put,
500 };
501 return Ok(intrinsic_option_price(
502 settlement_price,
503 strike,
504 &option_type,
505 ));
506 }
507 return self.get_theoretical_price(symbol).await;
508 }
509
510 if let Some(underlying) = perp_underlying(symbol) {
511 return self.get_spot_price(underlying).await.ok_or_else(|| {
512 anyhow::anyhow!(
513 "Missing spot price for {} while resolving theoretical mark",
514 underlying
515 )
516 });
517 }
518
519 self.get_spot_price(symbol)
520 .await
521 .ok_or_else(|| anyhow::anyhow!("Missing spot price for {}", symbol))
522 }
523
524 pub async fn get_iv(&self, symbol: &str) -> Result<f64> {
526 let context = self.get_pricing_context(symbol).await?;
527 Ok(self.resolve_theoretical_iv(symbol, &context).await?.iv)
528 }
529
530 pub async fn get_bulk_iv(&self, symbols: &[String]) -> HashMap<String, f64> {
534 let mut result = HashMap::with_capacity(symbols.len());
535 for symbol in symbols {
536 match self.get_iv(symbol).await {
537 Ok(iv) => {
538 result.insert(symbol.clone(), iv);
539 }
540 Err(e) => {
541 debug!("Failed to get IV for {}: {}", symbol, e);
542 }
543 }
544 }
545 result
546 }
547
548 pub async fn get_all_iv_snapshot(&self) -> HashMap<String, f64> {
550 let symbols = self.get_cached_symbols().await;
551 let mut result = HashMap::with_capacity(symbols.len());
552 for symbol in symbols {
553 match self.get_iv(&symbol).await {
554 Ok(iv) => {
555 result.insert(symbol, iv);
556 }
557 Err(e) => {
558 debug!("Failed to get IV for {}: {}", symbol, e);
559 }
560 }
561 }
562 result
563 }
564
565 pub async fn get_quote_side_ivs_from_prices(
567 &self,
568 symbol: &str,
569 best_bid: Option<f64>,
570 best_ask: Option<f64>,
571 ) -> Result<(Option<f64>, Option<f64>)> {
572 let context = self.get_pricing_context(symbol).await?;
573 let time_to_expiry = Self::time_to_expiry_years(context.expiry_timestamp)?;
574 let initial_iv = self
575 .resolve_theoretical_iv(symbol, &context)
576 .await
577 .ok()
578 .map(|iv| iv.iv);
579
580 let bid_iv = best_bid.filter(|price| *price > 0.0).and_then(|price| {
581 Self::compute_quote_iv_from_market_price(&context, time_to_expiry, price, initial_iv)
582 });
583 let ask_iv = best_ask.filter(|price| *price > 0.0).and_then(|price| {
584 Self::compute_quote_iv_from_market_price(&context, time_to_expiry, price, initial_iv)
585 });
586
587 Ok((bid_iv, ask_iv))
588 }
589
590 pub async fn get_quote_side_ivs(&self, symbol: &str) -> Result<(Option<f64>, Option<f64>)> {
592 let Some(quote) = self.quote_provider.get_quote(symbol) else {
593 return Ok((None, None));
594 };
595
596 self.get_quote_side_ivs_from_prices(symbol, quote.best_bid, quote.best_ask)
597 .await
598 }
599
600 pub async fn get_all_spot_prices_snapshot(&self) -> HashMap<String, f64> {
602 let mut result = HashMap::with_capacity(self.oracles.len());
603 for (underlying, oracle) in &self.oracles {
604 if let Some(price) = oracle.get_spot_price().await {
605 result.insert(underlying.clone(), price);
606 }
607 }
608 result
609 }
610
611 pub async fn get_all_prev_day_prices_snapshot(&self) -> HashMap<String, f64> {
613 let mut result = HashMap::with_capacity(self.oracles.len());
614 for (underlying, oracle) in &self.oracles {
615 if let Some(price) = oracle.get_prev_day_price().await {
616 result.insert(underlying.clone(), price);
617 }
618 }
619 result
620 }
621
622 pub async fn get_spot_price(&self, underlying: &str) -> Option<f64> {
624 if let Some(oracle) = self.oracles.get(underlying) {
625 oracle.get_spot_price().await
626 } else {
627 warn!("No oracle configured for underlying: {}", underlying);
628 None
629 }
630 }
631
632 pub async fn get_forward_price_quote(
634 &self,
635 underlying: &str,
636 expiry_timestamp: i64,
637 ) -> Option<(f64, f64)> {
638 if let Some(oracle) = self.oracles.get(underlying) {
639 let risk_free_rate = oracle.risk_free_rate();
640 match oracle.get_mark_price(expiry_timestamp).await {
641 Ok(price) => Some((price, risk_free_rate)),
642 Err(e) => {
643 warn!(
644 "Failed to get forward price for {} at expiry {}: {}",
645 underlying, expiry_timestamp, e
646 );
647 None
648 }
649 }
650 } else {
651 warn!("No oracle configured for underlying: {}", underlying);
652 None
653 }
654 }
655
656 pub async fn get_forward_price(&self, underlying: &str, expiry_timestamp: i64) -> Option<f64> {
658 self.get_forward_price_quote(underlying, expiry_timestamp)
659 .await
660 .map(|(forward_price, _)| forward_price)
661 }
662
663 pub async fn get_settlement_price(
665 &self,
666 underlying: &str,
667 expiry_timestamp: i64,
668 ) -> Option<f64> {
669 if let Some(oracle) = self.oracles.get(underlying) {
670 oracle.get_settlement_price(expiry_timestamp).await
671 } else {
672 warn!("No oracle configured for underlying: {}", underlying);
673 None
674 }
675 }
676
677 pub async fn get_cached_symbols(&self) -> Vec<String> {
679 self.instruments.read().await.keys().cloned().collect()
680 }
681
682 pub async fn has_symbol(&self, symbol: &str) -> bool {
684 self.instruments.read().await.contains_key(symbol)
685 }
686
687 pub fn get_configured_underlyings(&self) -> Vec<String> {
689 self.oracles.keys().cloned().collect()
690 }
691
692 pub async fn get_spot_price_staleness(&self) -> HashMap<String, Option<f64>> {
696 let mut result = HashMap::new();
697 for (underlying, oracle) in &self.oracles {
698 result.insert(underlying.clone(), oracle.get_staleness_seconds().await);
699 }
700 result
701 }
702
703 pub async fn get_unhealthy_oracles(&self) -> Vec<String> {
705 let mut unhealthy = Vec::new();
706 for (underlying, oracle) in &self.oracles {
707 if !oracle.is_healthy().await {
708 unhealthy.push(underlying.clone());
709 }
710 }
711 unhealthy
712 }
713
714 #[cfg(any(test, feature = "test-utils"))]
717 pub fn new_empty_for_testing(quote_provider: Arc<dyn QuoteProvider>) -> Arc<Self> {
718 Arc::new(Self {
719 instruments: Arc::new(RwLock::new(HashMap::new())),
720 oracles: HashMap::new(),
721 risk_free_rate: 0.04,
722 quote_provider,
723 sync_status: Arc::new(SyncStatus::new()),
724 theoretical_iv_overrides: Arc::new(RwLock::new(HashMap::new())),
725 vol_oracle: RwLock::new(None),
726 })
727 }
728
729 #[cfg(any(test, feature = "test-utils"))]
734 pub async fn set_spot_price_for_testing(&self, underlying: &str, price: f64) -> bool {
735 if let Some(oracle) = self.oracles.get(underlying) {
736 oracle.set_spot_price_for_testing(price).await;
737 true
738 } else {
739 warn!(
740 "Cannot set spot price for {}: no oracle configured",
741 underlying
742 );
743 false
744 }
745 }
746
747 #[cfg(any(test, feature = "test-utils"))]
751 pub async fn set_theoretical_iv_for_testing(&self, symbol: &str, iv: f64) {
752 self.theoretical_iv_overrides
753 .write()
754 .await
755 .insert(symbol.to_string(), iv);
756 }
757}
758
759use crate::rsm::portfolio_margin::risk_account_builder::SpotPriceSource;
761use async_trait::async_trait;
762
763#[async_trait]
764impl SpotPriceSource for GreeksCache {
765 async fn get_spot_price(&self, underlying: &str) -> Option<f64> {
766 GreeksCache::get_spot_price(self, underlying).await
768 }
769}