1use anyhow::{anyhow, Result};
2use arc_swap::ArcSwap;
3use axum::body::Bytes;
4use hypercall_types::{InstrumentResponse, InstrumentSpecResponse, TickSizeStep};
5use rust_decimal::prelude::ToPrimitive;
6use serde::Serialize;
7use std::collections::BTreeMap;
8use std::sync::Arc;
9use std::time::{Duration, Instant, SystemTime};
10use tracing::error;
11
12use crate::boundary::market_inputs::InstrumentsCacheReader;
13use crate::models::{Instrument, InstrumentStatus};
14
/// Period, in milliseconds, between background snapshot refreshes when the
/// caller does not override it via `InstrumentsSnapshotCache::with_refresh_interval`.
const DEFAULT_REFRESH_INTERVAL_MS: u64 = 10_000;

/// Value written to the `schema_version` field of the published snapshot envelope.
/// The cold-start payload in `InstrumentsSnapshotCache::new` hard-codes the same value.
const INSTRUMENTS_SNAPSHOT_ENVELOPE_SCHEMA_VERSION: u32 = 1;
17
18#[derive(Clone)]
19struct CachedInstrumentResponse {
20 status: InstrumentStatus,
21 response: InstrumentResponse,
22 spec: InstrumentSpecResponse,
23}
24
25struct InstrumentsSnapshotData {
26 all_by_underlying: BTreeMap<String, Vec<CachedInstrumentResponse>>,
27 published_response: Bytes,
28 built_at: SystemTime,
29}
30
31pub struct InstrumentsSnapshotCache {
32 instruments_cache: Arc<dyn InstrumentsCacheReader>,
33 snapshot: ArcSwap<InstrumentsSnapshotData>,
34 refresh_interval: Duration,
35}
36
37impl InstrumentsSnapshotCache {
38 pub fn new(instruments_cache: Arc<dyn InstrumentsCacheReader>) -> Self {
39 Self {
40 instruments_cache,
41 snapshot: ArcSwap::from_pointee(InstrumentsSnapshotData {
42 all_by_underlying: BTreeMap::new(),
43 published_response: Bytes::from_static(
44 br#"{"schema_version":1,"built_at_ms":0,"payload":{}}"#,
45 ),
46 built_at: SystemTime::UNIX_EPOCH,
47 }),
48 refresh_interval: Duration::from_millis(DEFAULT_REFRESH_INTERVAL_MS),
49 }
50 }
51
52 pub fn with_refresh_interval(mut self, refresh_interval: Duration) -> Self {
53 self.refresh_interval = refresh_interval;
54 self
55 }
56
57 pub async fn refresh_once(&self) -> Result<()> {
58 let load_start = Instant::now();
59 let mut instruments = self.instruments_cache.get_all().await;
60 instruments.sort_by(|a, b| {
61 (
62 a.underlying.as_str(),
63 a.expiry,
64 a.strike,
65 a.option_type.as_str(),
66 a.id.as_str(),
67 )
68 .cmp(&(
69 b.underlying.as_str(),
70 b.expiry,
71 b.strike,
72 b.option_type.as_str(),
73 b.id.as_str(),
74 ))
75 });
76 metrics::histogram!("ht_instruments_snapshot_load_seconds")
77 .record(load_start.elapsed().as_secs_f64());
78
79 let build_start = Instant::now();
80 let mut all_by_underlying = BTreeMap::<String, Vec<CachedInstrumentResponse>>::new();
81 for instrument in instruments {
82 all_by_underlying
83 .entry(instrument.underlying.to_uppercase())
84 .or_default()
85 .push(CachedInstrumentResponse {
86 status: instrument.status.clone(),
87 response: instrument_to_response(&instrument),
88 spec: instrument_to_spec(&instrument)?,
89 });
90 }
91 metrics::histogram!("ht_instruments_snapshot_build_seconds")
92 .record(build_start.elapsed().as_secs_f64());
93
94 let serialize_start = Instant::now();
95 let active_by_underlying: BTreeMap<String, Vec<InstrumentResponse>> = all_by_underlying
96 .iter()
97 .filter_map(|(underlying, entries)| {
98 let active_entries: Vec<InstrumentResponse> = entries
99 .iter()
100 .filter(|entry| entry.status.is_active())
101 .map(|entry| entry.response.clone())
102 .collect();
103 (!active_entries.is_empty()).then_some((underlying.clone(), active_entries))
104 })
105 .collect();
106 let built_at = SystemTime::now();
107 let published_response = Bytes::from(
108 serde_json::to_vec(&PublishedInstrumentsSnapshotRef {
109 schema_version: INSTRUMENTS_SNAPSHOT_ENVELOPE_SCHEMA_VERSION,
110 built_at_ms: system_time_to_millis(built_at)?,
111 payload: &active_by_underlying,
112 })
113 .map_err(|e| anyhow!("Failed to serialize instruments snapshot: {}", e))?,
114 );
115 metrics::histogram!("ht_instruments_snapshot_serialize_seconds")
116 .record(serialize_start.elapsed().as_secs_f64());
117
118 self.snapshot.store(Arc::new(InstrumentsSnapshotData {
119 all_by_underlying,
120 published_response,
121 built_at,
122 }));
123 metrics::counter!("ht_instruments_snapshot_refresh_total", "status" => "success")
124 .increment(1);
125 Ok(())
126 }
127
128 pub async fn initialize(&self) {
129 if let Err(error) = self.refresh_once().await {
130 metrics::counter!("ht_instruments_snapshot_refresh_total", "status" => "error")
131 .increment(1);
132 error!(
133 error = %error,
134 "Initial /instruments snapshot build failed; serving cold-start payload"
135 );
136 }
137 }
138
139 pub fn start_with_shutdown(
140 self: Arc<Self>,
141 mut shutdown_rx: tokio::sync::broadcast::Receiver<()>,
142 ) -> tokio::task::JoinHandle<()> {
143 tokio::spawn(async move {
144 let mut interval = tokio::time::interval(self.refresh_interval);
145 loop {
146 tokio::select! {
147 _ = shutdown_rx.recv() => {
148 break;
149 }
150 _ = interval.tick() => {
151 if let Err(error) = self.refresh_once().await {
152 metrics::counter!("ht_instruments_snapshot_refresh_total", "status" => "error")
153 .increment(1);
154 error!(
155 error = %error,
156 "Failed to refresh /instruments snapshot; keeping last-good"
157 );
158 }
159 }
160 }
161 }
162 })
163 }
164
165 pub fn get_filtered(
166 &self,
167 currency: &str,
168 status_filter: &Option<Vec<InstrumentStatus>>,
169 ) -> (Vec<InstrumentResponse>, SystemTime) {
170 let snapshot = self.snapshot.load();
171 let key = currency.to_uppercase();
172 let result = snapshot
173 .all_by_underlying
174 .get(&key)
175 .map(|entries| {
176 entries
177 .iter()
178 .filter(|entry| matches_status_filter(&entry.status, status_filter))
179 .map(|entry| entry.response.clone())
180 .collect()
181 })
182 .unwrap_or_default();
183 (result, snapshot.built_at)
184 }
185
186 pub fn get_specs_filtered(
187 &self,
188 currency: &str,
189 status_filter: &Option<Vec<InstrumentStatus>>,
190 ) -> (Vec<InstrumentSpecResponse>, SystemTime) {
191 let snapshot = self.snapshot.load();
192 let key = currency.to_uppercase();
193 let result = snapshot
194 .all_by_underlying
195 .get(&key)
196 .map(|entries| {
197 entries
198 .iter()
199 .filter(|entry| matches_status_filter(&entry.status, status_filter))
200 .map(|entry| entry.spec.clone())
201 .collect()
202 })
203 .unwrap_or_default();
204 (result, snapshot.built_at)
205 }
206
207 pub fn published_response(&self) -> (Bytes, SystemTime) {
208 let snapshot = self.snapshot.load();
209 (snapshot.published_response.clone(), snapshot.built_at)
210 }
211}
212
213fn instrument_to_response(instrument: &Instrument) -> InstrumentResponse {
214 let currency = instrument.underlying.to_uppercase();
215 InstrumentResponse {
216 price_index: format!("{}_usd", currency.to_lowercase()),
217 rfq: instrument.trading_mode.allows_rfq(),
218 orderbook: instrument.trading_mode.allows_orderbook(),
219 kind: "option".to_string(),
220 instrument_name: instrument.id.clone(),
221 option_token_address: instrument.option_token_address,
222 maker_commission: 0.0003,
223 taker_commission: 0.0003,
224 instrument_type: "reversed".to_string(),
225 expiration_timestamp: (instrument.expiry * 1000) as i64,
226 creation_timestamp: instrument.updated_at.timestamp_millis(),
227 is_active: instrument.status.is_active(),
228 option_type: instrument.option_type.to_lowercase(),
229 contract_size: 1.0,
230 tick_size: 0.0001,
231 strike: instrument.strike.to_f64().unwrap_or(0.0),
232 instrument_id: instrument.instrument_id,
233 settlement_period: "day".to_string(),
234 min_trade_amount: 0.1,
235 block_trade_commission: 0.0003,
236 block_trade_min_trade_amount: 25.0,
237 block_trade_tick_size: 0.0001,
238 settlement_currency: currency.clone(),
239 base_currency: currency.clone(),
240 counter_currency: "USD".to_string(),
241 quote_currency: "USD".to_string(),
242 tick_size_steps: vec![TickSizeStep {
243 tick_size: 0.0005,
244 above_price: 0.005,
245 }],
246 }
247}
248
249fn instrument_to_spec(instrument: &Instrument) -> Result<InstrumentSpecResponse> {
250 let base_asset = instrument.underlying.to_uppercase();
251 let option_kind = match instrument.option_type.to_lowercase().as_str() {
252 "call" => Some("C".to_string()),
253 "put" => Some("P".to_string()),
254 other => {
255 return Err(anyhow!(
256 "invalid option_type '{}' for instrument {}",
257 other,
258 instrument.id
259 ));
260 }
261 };
262 let expiry_ns = instrument
265 .expiry
266 .checked_mul(1_000_000_000)
267 .and_then(|value| i64::try_from(value).ok())
268 .ok_or_else(|| {
269 anyhow!(
270 "invalid expiry {} for instrument {}",
271 instrument.expiry,
272 instrument.id
273 )
274 })?;
275 let settlement_time = hypercall_types::expiry_times().for_underlying(&instrument.underlying);
276 let settlement_hour_utc = u8::try_from(settlement_time.hour).map_err(|_| {
277 anyhow!(
278 "invalid settlement hour {} for instrument {}",
279 settlement_time.hour,
280 instrument.id
281 )
282 })?;
283
284 Ok(InstrumentSpecResponse {
285 instrument_id: instrument.id.clone(),
286 instrument_numeric_id: instrument.instrument_id,
287 exchange_symbol: instrument.id.clone(),
288 sym: format!("{}-USD", base_asset),
289 exchange: "HYPERCALL".to_string(),
290 instrument_kind: "OPTION".to_string(),
291 option_kind,
292 delivery: Some("CASH".to_string()),
293 settle_asset: Some("USDC".to_string()),
294 base_asset,
295 quote_asset: "USD".to_string(),
296 strike: instrument.strike,
297 expiry_ns,
298 settlement_hour_utc: Some(settlement_hour_utc),
299 settlement_time_utc: Some(format!(
300 "{:02}:{:02}",
301 settlement_time.hour, settlement_time.minute
302 )),
303 contract_size: 1.0,
304 min_trade_size: 0.1,
305 tick_size: 0.0001,
306 price_decimals: Some(6),
307 size_decimals: Some(6),
308 min_price_increment_bands: vec![TickSizeStep {
309 tick_size: 0.0005,
310 above_price: 0.005,
311 }],
312 state: instrument_state(&instrument.status).to_string(),
313 is_tradable: instrument.status.is_active(),
314 listed_time_ns: None,
315 event_ts_ns: None,
320 maker_fee_bps: None,
321 taker_fee_bps: None,
322 initial_margin_fraction: None,
323 maintenance_margin_fraction: None,
324 position_limit: None,
325 option_token_address: instrument.option_token_address,
326 settlement_oracle: None,
327 condition_id: None,
328 underlying_resolution_source: None,
329 })
330}
331
332fn instrument_state(status: &InstrumentStatus) -> &'static str {
333 match status {
334 InstrumentStatus::Active => "OPEN",
335 InstrumentStatus::ExpiredPendingPrice => "SETTLEMENT",
336 InstrumentStatus::Settled => "DELIVERED",
337 }
338}
339
340fn matches_status_filter(
341 status: &InstrumentStatus,
342 filter: &Option<Vec<InstrumentStatus>>,
343) -> bool {
344 match filter {
345 Some(allowed) => allowed.contains(status),
346 None => true,
347 }
348}
349
350fn system_time_to_millis(value: SystemTime) -> Result<u64> {
351 Ok(value
352 .duration_since(SystemTime::UNIX_EPOCH)
353 .map_err(|e| anyhow!("Invalid snapshot build time: {}", e))?
354 .as_millis() as u64)
355}
356
357#[derive(Serialize)]
358struct PublishedInstrumentsSnapshotRef<'a> {
359 schema_version: u32,
360 built_at_ms: u64,
361 payload: &'a BTreeMap<String, Vec<InstrumentResponse>>,
362}
363
#[cfg(test)]
mod tests {
    use super::*;
    use chrono::{TimeZone, Utc};
    use rust_decimal_macros::dec;

    /// 2026-01-30 08:00:00 UTC, in seconds since the Unix epoch.
    const EXPIRY_SECS: u64 = 1_769_760_000;

    /// Active BTC call fixture; only the expiry varies between tests.
    fn test_instrument(expiry_ts_secs: u64) -> Instrument {
        Instrument {
            instrument_id: 42,
            id: "BTC-20260130-100000-C".to_string(),
            underlying: "btc".to_string(),
            strike: dec!(100000),
            expiry: expiry_ts_secs,
            option_type: "call".to_string(),
            option_token_address: None,
            mark_iv: None,
            volume_24h: dec!(0),
            open_interest: dec!(0),
            updated_at: Utc.with_ymd_and_hms(2026, 1, 1, 12, 0, 0).unwrap(),
            status: InstrumentStatus::Active,
            trading_mode: Default::default(),
        }
    }

    #[test]
    fn instrument_to_spec_maps_current_option_fields() {
        let spec = instrument_to_spec(&test_instrument(EXPIRY_SECS)).expect("spec should build");

        assert_eq!(spec.instrument_id, "BTC-20260130-100000-C");
        assert_eq!(spec.instrument_numeric_id, 42);
        assert_eq!(spec.exchange_symbol, "BTC-20260130-100000-C");
        assert_eq!(spec.sym, "BTC-USD");
        assert_eq!(spec.exchange, "HYPERCALL");
        assert_eq!(spec.instrument_kind, "OPTION");
        assert_eq!(spec.option_kind.as_deref(), Some("C"));
        assert_eq!(spec.delivery.as_deref(), Some("CASH"));
        assert_eq!(spec.settle_asset.as_deref(), Some("USDC"));
        assert_eq!(spec.base_asset, "BTC");
        assert_eq!(spec.quote_asset, "USD");
        assert_eq!(spec.expiry_ns, 1_769_760_000_000_000_000);
        assert_eq!(spec.settlement_hour_utc, Some(8));
        assert_eq!(spec.settlement_time_utc.as_deref(), Some("08:00"));
        assert_eq!(spec.state, "OPEN");
        assert!(spec.is_tradable);
        assert_eq!(spec.event_ts_ns, None);
        assert!(spec.maker_fee_bps.is_none());
        assert!(spec.settlement_oracle.is_none());
    }

    #[test]
    fn instrument_to_spec_serializes_strike_as_string() {
        let spec = instrument_to_spec(&test_instrument(EXPIRY_SECS)).expect("spec should build");
        let json = serde_json::to_value(&spec).expect("spec should serialize");
        assert_eq!(json["strike"], serde_json::json!("100000"));
        assert_eq!(
            json["expiry_ns"],
            serde_json::json!(1_769_760_000_000_000_000_i64)
        );
        assert_eq!(json["option_token_address"], serde_json::Value::Null);
    }

    #[test]
    fn instrument_to_spec_maps_put_to_p() {
        let mut instrument = test_instrument(EXPIRY_SECS);
        instrument.option_type = "put".to_string();
        let spec = instrument_to_spec(&instrument).expect("spec should build");
        assert_eq!(spec.option_kind.as_deref(), Some("P"));
    }

    #[test]
    fn instrument_to_spec_accepts_mixed_case_option_type() {
        let mut instrument = test_instrument(EXPIRY_SECS);

        instrument.option_type = "CALL".to_string();
        let spec = instrument_to_spec(&instrument).expect("spec should build");
        assert_eq!(spec.option_kind.as_deref(), Some("C"));

        instrument.option_type = "Put".to_string();
        let spec = instrument_to_spec(&instrument).expect("spec should build");
        assert_eq!(spec.option_kind.as_deref(), Some("P"));
    }

    #[test]
    fn instrument_to_spec_rejects_invalid_option_type() {
        let mut instrument = test_instrument(EXPIRY_SECS);
        instrument.option_type = "straddle".to_string();
        let err = instrument_to_spec(&instrument).unwrap_err();
        assert!(err.to_string().contains("invalid option_type"));
    }

    #[test]
    fn instrument_to_spec_rejects_overflowing_expiry() {
        let err = instrument_to_spec(&test_instrument(u64::MAX)).unwrap_err();
        assert!(err.to_string().contains("invalid expiry"));
    }

    #[test]
    fn instrument_to_spec_rejects_expiry_beyond_i64_nanoseconds() {
        // 10^10 s * 10^9 = 10^19 ns: fits in u64 but exceeds i64::MAX, so the
        // multiplication succeeds and the i64 conversion must reject it.
        let err = instrument_to_spec(&test_instrument(10_000_000_000)).unwrap_err();
        assert!(err.to_string().contains("invalid expiry"));
    }

    #[test]
    fn instrument_state_maps_every_status() {
        assert_eq!(instrument_state(&InstrumentStatus::Active), "OPEN");
        assert_eq!(
            instrument_state(&InstrumentStatus::ExpiredPendingPrice),
            "SETTLEMENT"
        );
        assert_eq!(instrument_state(&InstrumentStatus::Settled), "DELIVERED");
    }

    #[test]
    fn matches_status_filter_without_filter_accepts_everything() {
        assert!(matches_status_filter(&InstrumentStatus::Active, &None));
        assert!(matches_status_filter(&InstrumentStatus::Settled, &None));
    }

    #[test]
    fn matches_status_filter_accepts_only_listed_statuses() {
        let filter = Some(vec![InstrumentStatus::Active]);
        assert!(matches_status_filter(&InstrumentStatus::Active, &filter));
        assert!(!matches_status_filter(&InstrumentStatus::Settled, &filter));
        assert!(!matches_status_filter(
            &InstrumentStatus::Active,
            &Some(Vec::new())
        ));
    }
}