Skip to main content

hypercall_api/caches/
instruments.rs

1use anyhow::{anyhow, Result};
2use arc_swap::ArcSwap;
3use axum::body::Bytes;
4use hypercall_types::{InstrumentResponse, InstrumentSpecResponse, TickSizeStep};
5use rust_decimal::prelude::ToPrimitive;
6use serde::Serialize;
7use std::collections::BTreeMap;
8use std::sync::Arc;
9use std::time::{Duration, Instant, SystemTime};
10use tracing::error;
11
12use crate::boundary::market_inputs::InstrumentsCacheReader;
13use crate::models::{Instrument, InstrumentStatus};
14
/// Refresh period, in milliseconds, that `InstrumentsSnapshotCache::new`
/// installs when no explicit interval is supplied.
const DEFAULT_REFRESH_INTERVAL_MS: u64 = 10_000;
/// Value written to the `schema_version` field of the published snapshot
/// envelope.
const INSTRUMENTS_SNAPSHOT_ENVELOPE_SCHEMA_VERSION: u32 = 1;
17
/// One instrument as stored in a snapshot: its status plus both response
/// shapes, converted once at refresh time so reads only filter and clone.
#[derive(Clone)]
struct CachedInstrumentResponse {
    /// Status the read paths filter on.
    status: InstrumentStatus,
    /// Response shape returned by `get_filtered` and embedded in the
    /// published payload.
    response: InstrumentResponse,
    /// Spec shape returned by `get_specs_filtered`.
    spec: InstrumentSpecResponse,
}
24
/// A fully built snapshot; a new one replaces the old one wholesale on every
/// successful refresh.
struct InstrumentsSnapshotData {
    /// Every instrument (any status), grouped by upper-cased underlying.
    all_by_underlying: BTreeMap<String, Vec<CachedInstrumentResponse>>,
    /// Pre-serialized JSON envelope containing only the active instruments.
    published_response: Bytes,
    /// Wall-clock time at which this snapshot was built.
    built_at: SystemTime,
}
30
/// Periodically rebuilt, read-mostly snapshot of the instruments list.
pub struct InstrumentsSnapshotCache {
    /// Source the instruments are loaded from on each refresh.
    instruments_cache: Arc<dyn InstrumentsCacheReader>,
    /// Current snapshot; swapped in one step when a refresh succeeds.
    snapshot: ArcSwap<InstrumentsSnapshotData>,
    /// Period between refreshes performed by the background task.
    refresh_interval: Duration,
}
36
37impl InstrumentsSnapshotCache {
38    pub fn new(instruments_cache: Arc<dyn InstrumentsCacheReader>) -> Self {
39        Self {
40            instruments_cache,
41            snapshot: ArcSwap::from_pointee(InstrumentsSnapshotData {
42                all_by_underlying: BTreeMap::new(),
43                published_response: Bytes::from_static(
44                    br#"{"schema_version":1,"built_at_ms":0,"payload":{}}"#,
45                ),
46                built_at: SystemTime::UNIX_EPOCH,
47            }),
48            refresh_interval: Duration::from_millis(DEFAULT_REFRESH_INTERVAL_MS),
49        }
50    }
51
52    pub fn with_refresh_interval(mut self, refresh_interval: Duration) -> Self {
53        self.refresh_interval = refresh_interval;
54        self
55    }
56
57    pub async fn refresh_once(&self) -> Result<()> {
58        let load_start = Instant::now();
59        let mut instruments = self.instruments_cache.get_all().await;
60        instruments.sort_by(|a, b| {
61            (
62                a.underlying.as_str(),
63                a.expiry,
64                a.strike,
65                a.option_type.as_str(),
66                a.id.as_str(),
67            )
68                .cmp(&(
69                    b.underlying.as_str(),
70                    b.expiry,
71                    b.strike,
72                    b.option_type.as_str(),
73                    b.id.as_str(),
74                ))
75        });
76        metrics::histogram!("ht_instruments_snapshot_load_seconds")
77            .record(load_start.elapsed().as_secs_f64());
78
79        let build_start = Instant::now();
80        let mut all_by_underlying = BTreeMap::<String, Vec<CachedInstrumentResponse>>::new();
81        for instrument in instruments {
82            all_by_underlying
83                .entry(instrument.underlying.to_uppercase())
84                .or_default()
85                .push(CachedInstrumentResponse {
86                    status: instrument.status.clone(),
87                    response: instrument_to_response(&instrument),
88                    spec: instrument_to_spec(&instrument)?,
89                });
90        }
91        metrics::histogram!("ht_instruments_snapshot_build_seconds")
92            .record(build_start.elapsed().as_secs_f64());
93
94        let serialize_start = Instant::now();
95        let active_by_underlying: BTreeMap<String, Vec<InstrumentResponse>> = all_by_underlying
96            .iter()
97            .filter_map(|(underlying, entries)| {
98                let active_entries: Vec<InstrumentResponse> = entries
99                    .iter()
100                    .filter(|entry| entry.status.is_active())
101                    .map(|entry| entry.response.clone())
102                    .collect();
103                (!active_entries.is_empty()).then_some((underlying.clone(), active_entries))
104            })
105            .collect();
106        let built_at = SystemTime::now();
107        let published_response = Bytes::from(
108            serde_json::to_vec(&PublishedInstrumentsSnapshotRef {
109                schema_version: INSTRUMENTS_SNAPSHOT_ENVELOPE_SCHEMA_VERSION,
110                built_at_ms: system_time_to_millis(built_at)?,
111                payload: &active_by_underlying,
112            })
113            .map_err(|e| anyhow!("Failed to serialize instruments snapshot: {}", e))?,
114        );
115        metrics::histogram!("ht_instruments_snapshot_serialize_seconds")
116            .record(serialize_start.elapsed().as_secs_f64());
117
118        self.snapshot.store(Arc::new(InstrumentsSnapshotData {
119            all_by_underlying,
120            published_response,
121            built_at,
122        }));
123        metrics::counter!("ht_instruments_snapshot_refresh_total", "status" => "success")
124            .increment(1);
125        Ok(())
126    }
127
128    pub async fn initialize(&self) {
129        if let Err(error) = self.refresh_once().await {
130            metrics::counter!("ht_instruments_snapshot_refresh_total", "status" => "error")
131                .increment(1);
132            error!(
133                error = %error,
134                "Initial /instruments snapshot build failed; serving cold-start payload"
135            );
136        }
137    }
138
139    pub fn start_with_shutdown(
140        self: Arc<Self>,
141        mut shutdown_rx: tokio::sync::broadcast::Receiver<()>,
142    ) -> tokio::task::JoinHandle<()> {
143        tokio::spawn(async move {
144            let mut interval = tokio::time::interval(self.refresh_interval);
145            loop {
146                tokio::select! {
147                    _ = shutdown_rx.recv() => {
148                        break;
149                    }
150                    _ = interval.tick() => {
151                        if let Err(error) = self.refresh_once().await {
152                            metrics::counter!("ht_instruments_snapshot_refresh_total", "status" => "error")
153                                .increment(1);
154                            error!(
155                                error = %error,
156                                "Failed to refresh /instruments snapshot; keeping last-good"
157                            );
158                        }
159                    }
160                }
161            }
162        })
163    }
164
165    pub fn get_filtered(
166        &self,
167        currency: &str,
168        status_filter: &Option<Vec<InstrumentStatus>>,
169    ) -> (Vec<InstrumentResponse>, SystemTime) {
170        let snapshot = self.snapshot.load();
171        let key = currency.to_uppercase();
172        let result = snapshot
173            .all_by_underlying
174            .get(&key)
175            .map(|entries| {
176                entries
177                    .iter()
178                    .filter(|entry| matches_status_filter(&entry.status, status_filter))
179                    .map(|entry| entry.response.clone())
180                    .collect()
181            })
182            .unwrap_or_default();
183        (result, snapshot.built_at)
184    }
185
186    pub fn get_specs_filtered(
187        &self,
188        currency: &str,
189        status_filter: &Option<Vec<InstrumentStatus>>,
190    ) -> (Vec<InstrumentSpecResponse>, SystemTime) {
191        let snapshot = self.snapshot.load();
192        let key = currency.to_uppercase();
193        let result = snapshot
194            .all_by_underlying
195            .get(&key)
196            .map(|entries| {
197                entries
198                    .iter()
199                    .filter(|entry| matches_status_filter(&entry.status, status_filter))
200                    .map(|entry| entry.spec.clone())
201                    .collect()
202            })
203            .unwrap_or_default();
204        (result, snapshot.built_at)
205    }
206
207    pub fn published_response(&self) -> (Bytes, SystemTime) {
208        let snapshot = self.snapshot.load();
209        (snapshot.published_response.clone(), snapshot.built_at)
210    }
211}
212
213fn instrument_to_response(instrument: &Instrument) -> InstrumentResponse {
214    let currency = instrument.underlying.to_uppercase();
215    InstrumentResponse {
216        price_index: format!("{}_usd", currency.to_lowercase()),
217        rfq: instrument.trading_mode.allows_rfq(),
218        orderbook: instrument.trading_mode.allows_orderbook(),
219        kind: "option".to_string(),
220        instrument_name: instrument.id.clone(),
221        option_token_address: instrument.option_token_address,
222        maker_commission: 0.0003,
223        taker_commission: 0.0003,
224        instrument_type: "reversed".to_string(),
225        expiration_timestamp: (instrument.expiry * 1000) as i64,
226        creation_timestamp: instrument.updated_at.timestamp_millis(),
227        is_active: instrument.status.is_active(),
228        option_type: instrument.option_type.to_lowercase(),
229        contract_size: 1.0,
230        tick_size: 0.0001,
231        strike: instrument.strike.to_f64().unwrap_or(0.0),
232        instrument_id: instrument.instrument_id,
233        settlement_period: "day".to_string(),
234        min_trade_amount: 0.1,
235        block_trade_commission: 0.0003,
236        block_trade_min_trade_amount: 25.0,
237        block_trade_tick_size: 0.0001,
238        settlement_currency: currency.clone(),
239        base_currency: currency.clone(),
240        counter_currency: "USD".to_string(),
241        quote_currency: "USD".to_string(),
242        tick_size_steps: vec![TickSizeStep {
243            tick_size: 0.0005,
244            above_price: 0.005,
245        }],
246    }
247}
248
249fn instrument_to_spec(instrument: &Instrument) -> Result<InstrumentSpecResponse> {
250    let base_asset = instrument.underlying.to_uppercase();
251    let option_kind = match instrument.option_type.to_lowercase().as_str() {
252        "call" => Some("C".to_string()),
253        "put" => Some("P".to_string()),
254        other => {
255            return Err(anyhow!(
256                "invalid option_type '{}' for instrument {}",
257                other,
258                instrument.id
259            ));
260        }
261    };
262    // Instrument.expiry is already the per-underlying resolved Unix timestamp
263    // in seconds (converted from the YYYYMMDD code at registry load).
264    let expiry_ns = instrument
265        .expiry
266        .checked_mul(1_000_000_000)
267        .and_then(|value| i64::try_from(value).ok())
268        .ok_or_else(|| {
269            anyhow!(
270                "invalid expiry {} for instrument {}",
271                instrument.expiry,
272                instrument.id
273            )
274        })?;
275    let settlement_time = hypercall_types::expiry_times().for_underlying(&instrument.underlying);
276    let settlement_hour_utc = u8::try_from(settlement_time.hour).map_err(|_| {
277        anyhow!(
278            "invalid settlement hour {} for instrument {}",
279            settlement_time.hour,
280            instrument.id
281        )
282    })?;
283
284    Ok(InstrumentSpecResponse {
285        instrument_id: instrument.id.clone(),
286        instrument_numeric_id: instrument.instrument_id,
287        exchange_symbol: instrument.id.clone(),
288        sym: format!("{}-USD", base_asset),
289        exchange: "HYPERCALL".to_string(),
290        instrument_kind: "OPTION".to_string(),
291        option_kind,
292        delivery: Some("CASH".to_string()),
293        settle_asset: Some("USDC".to_string()),
294        base_asset,
295        quote_asset: "USD".to_string(),
296        strike: instrument.strike,
297        expiry_ns,
298        settlement_hour_utc: Some(settlement_hour_utc),
299        settlement_time_utc: Some(format!(
300            "{:02}:{:02}",
301            settlement_time.hour, settlement_time.minute
302        )),
303        contract_size: 1.0,
304        min_trade_size: 0.1,
305        tick_size: 0.0001,
306        price_decimals: Some(6),
307        size_decimals: Some(6),
308        min_price_increment_bands: vec![TickSizeStep {
309            tick_size: 0.0005,
310            above_price: 0.005,
311        }],
312        state: instrument_state(&instrument.status).to_string(),
313        is_tradable: instrument.status.is_active(),
314        listed_time_ns: None,
315        // Instrument.updated_at is reset to now() whenever the registry
316        // rebuilds from the DB, so it is not a real spec-change timestamp.
317        // Stays None until a persisted spec event timestamp exists, else
318        // consumers would see false spec changes after every restart.
319        event_ts_ns: None,
320        maker_fee_bps: None,
321        taker_fee_bps: None,
322        initial_margin_fraction: None,
323        maintenance_margin_fraction: None,
324        position_limit: None,
325        option_token_address: instrument.option_token_address,
326        settlement_oracle: None,
327        condition_id: None,
328        underlying_resolution_source: None,
329    })
330}
331
332fn instrument_state(status: &InstrumentStatus) -> &'static str {
333    match status {
334        InstrumentStatus::Active => "OPEN",
335        InstrumentStatus::ExpiredPendingPrice => "SETTLEMENT",
336        InstrumentStatus::Settled => "DELIVERED",
337    }
338}
339
340fn matches_status_filter(
341    status: &InstrumentStatus,
342    filter: &Option<Vec<InstrumentStatus>>,
343) -> bool {
344    match filter {
345        Some(allowed) => allowed.contains(status),
346        None => true,
347    }
348}
349
350fn system_time_to_millis(value: SystemTime) -> Result<u64> {
351    Ok(value
352        .duration_since(SystemTime::UNIX_EPOCH)
353        .map_err(|e| anyhow!("Invalid snapshot build time: {}", e))?
354        .as_millis() as u64)
355}
356
/// Borrowing view of the published snapshot envelope, used only to
/// serialize it without cloning the payload map.
#[derive(Serialize)]
struct PublishedInstrumentsSnapshotRef<'a> {
    /// Envelope schema version.
    schema_version: u32,
    /// Snapshot build time in milliseconds since the Unix epoch.
    built_at_ms: u64,
    /// Active instruments, grouped by upper-cased underlying.
    payload: &'a BTreeMap<String, Vec<InstrumentResponse>>,
}
363
#[cfg(test)]
mod tests {
    use super::*;
    use chrono::{TimeZone, Utc};
    use rust_decimal_macros::dec;

    /// 2026-01-30T08:00:00Z, the default expiry time of day.
    const EXPIRY_SECS: u64 = 1_769_760_000;

    /// Builds an active BTC call fixture expiring at `expiry_ts_secs`.
    fn test_instrument(expiry_ts_secs: u64) -> Instrument {
        let updated_at = Utc.with_ymd_and_hms(2026, 1, 1, 12, 0, 0).unwrap();
        Instrument {
            instrument_id: 42,
            id: "BTC-20260130-100000-C".to_string(),
            underlying: "btc".to_string(),
            strike: dec!(100000),
            expiry: expiry_ts_secs,
            option_type: "call".to_string(),
            option_token_address: None,
            mark_iv: None,
            volume_24h: dec!(0),
            open_interest: dec!(0),
            updated_at,
            status: InstrumentStatus::Active,
            trading_mode: Default::default(),
        }
    }

    #[test]
    fn instrument_to_spec_maps_current_option_fields() {
        let instrument = test_instrument(EXPIRY_SECS);
        let spec = instrument_to_spec(&instrument).expect("spec should build");

        // Identity and venue.
        assert_eq!(spec.instrument_id, "BTC-20260130-100000-C");
        assert_eq!(spec.instrument_numeric_id, 42);
        assert_eq!(spec.exchange_symbol, "BTC-20260130-100000-C");
        assert_eq!(spec.sym, "BTC-USD");
        assert_eq!(spec.exchange, "HYPERCALL");
        assert_eq!(spec.instrument_kind, "OPTION");

        // Contract terms.
        assert_eq!(spec.option_kind.as_deref(), Some("C"));
        assert_eq!(spec.delivery.as_deref(), Some("CASH"));
        assert_eq!(spec.settle_asset.as_deref(), Some("USDC"));
        assert_eq!(spec.base_asset, "BTC");
        assert_eq!(spec.quote_asset, "USD");

        // Expiry and settlement.
        assert_eq!(spec.expiry_ns, 1_769_760_000_000_000_000);
        assert_eq!(spec.settlement_hour_utc, Some(8));
        assert_eq!(spec.settlement_time_utc.as_deref(), Some("08:00"));

        // Status and fields that are deliberately left unset.
        assert_eq!(spec.state, "OPEN");
        assert!(spec.is_tradable);
        assert!(spec.event_ts_ns.is_none());
        assert!(spec.maker_fee_bps.is_none());
        assert!(spec.settlement_oracle.is_none());
    }

    #[test]
    fn instrument_to_spec_serializes_strike_as_string() {
        let instrument = test_instrument(EXPIRY_SECS);
        let spec = instrument_to_spec(&instrument).expect("spec should build");
        let json = serde_json::to_value(&spec).expect("spec should serialize");

        assert_eq!(json["strike"], serde_json::json!("100000"));
        assert_eq!(
            json["expiry_ns"],
            serde_json::json!(1_769_760_000_000_000_000_i64)
        );
        assert_eq!(json["option_token_address"], serde_json::Value::Null);
    }

    #[test]
    fn instrument_to_spec_rejects_invalid_option_type() {
        let instrument = Instrument {
            option_type: "straddle".to_string(),
            ..test_instrument(EXPIRY_SECS)
        };

        let message = instrument_to_spec(&instrument).unwrap_err().to_string();
        assert!(message.contains("invalid option_type"));
    }

    #[test]
    fn instrument_to_spec_rejects_overflowing_expiry() {
        let instrument = test_instrument(u64::MAX);

        let message = instrument_to_spec(&instrument).unwrap_err().to_string();
        assert!(message.contains("invalid expiry"));
    }
}
439}