// Source: hypercall_api/candles.rs

1use async_trait::async_trait;
2use chrono::Utc;
3use metrics::counter;
4use rust_decimal::Decimal;
5use serde::{Deserialize, Serialize};
6use std::collections::{HashMap, HashSet};
7use std::fmt;
8use std::str::FromStr;
9use std::sync::Arc;
10use std::time::Duration;
11use tokio::time::MissedTickBehavior;
12use tracing::{debug, warn};
13use utoipa::ToSchema;
14
15use super::websocket::{PubSubManager, WsCandleUpdate, WsMessage};
16use crate::models::Trade;
17use hypercall_types::to_human_readable_decimal;
18pub use hypercall_types::ws_protocol::CandleResolution;
19
/// Prefix shared by every candle websocket channel name.
pub const CANDLE_WS_CHANNEL_PREFIX: &str = "candles:";
/// Poll interval used when the configured value is zero.
const DEFAULT_CANDLE_WS_POLL_INTERVAL_MS: u64 = 2_000;
/// Hyperliquid info endpoint used when no URL is configured (mainnet).
const DEFAULT_HYPERLIQUID_INFO_URL: &str = "https://api.hyperliquid.xyz/info";
/// Hyperliquid info endpoint used when no URL is configured (testnet).
const DEFAULT_HYPERLIQUID_TESTNET_INFO_URL: &str = "https://api.hyperliquid-testnet.xyz/info";
/// Largest `[start, end)` span accepted for option candle queries: 31 days in milliseconds.
pub const MAX_OPTION_CANDLE_QUERY_SPAN_MS: i64 = 31 * 24 * 60 * 60 * 1_000;
25
26#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, ToSchema)]
27pub struct UnderlyingCandle {
28    pub start_time_ms: i64,
29    pub end_time_ms: i64,
30    pub open: f64,
31    pub high: f64,
32    pub low: f64,
33    pub close: f64,
34    pub volume: f64,
35}
36
37#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)]
38pub struct UnderlyingCandlesResponse {
39    pub underlying: String,
40    pub resolution: CandleResolution,
41    pub candles: Vec<UnderlyingCandle>,
42}
43
44#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, ToSchema)]
45pub struct OptionInstrumentCandle {
46    pub start_time_ms: i64,
47    pub end_time_ms: i64,
48    pub open: f64,
49    pub high: f64,
50    pub low: f64,
51    pub close: f64,
52    pub volume: f64,
53}
54
55#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)]
56pub struct OptionInstrumentCandlesResponse {
57    pub instrument_name: String,
58    pub resolution: CandleResolution,
59    pub candles: Vec<OptionInstrumentCandle>,
60}
61
62#[derive(Debug, Clone, PartialEq, Eq)]
63pub enum CandleValidationError {
64    InvalidStartTimeMs(i64),
65    InvalidEndTimeMs(i64),
66    EndTimeNotAfterStart {
67        start_time_ms: i64,
68        end_time_ms: i64,
69    },
70    OptionQuerySpanTooLarge {
71        start_time_ms: i64,
72        end_time_ms: i64,
73        max_span_ms: i64,
74    },
75    BucketAlignmentOverflow {
76        end_time_ms: i64,
77        resolution: CandleResolution,
78    },
79    UnknownUnderlying(String),
80}
81
82impl fmt::Display for CandleValidationError {
83    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
84        match self {
85            Self::InvalidStartTimeMs(ts) => {
86                write!(f, "start_time_ms must be non-negative, got {}", ts)
87            }
88            Self::InvalidEndTimeMs(ts) => {
89                write!(f, "end_time_ms must be non-negative, got {}", ts)
90            }
91            Self::EndTimeNotAfterStart {
92                start_time_ms,
93                end_time_ms,
94            } => write!(
95                f,
96                "end_time_ms ({}) must be greater than start_time_ms ({})",
97                end_time_ms, start_time_ms
98            ),
99            Self::OptionQuerySpanTooLarge {
100                start_time_ms,
101                end_time_ms,
102                max_span_ms,
103            } => write!(
104                f,
105                "option candle query span [{}, {}) exceeds max supported window of {} ms",
106                start_time_ms, end_time_ms, max_span_ms
107            ),
108            Self::BucketAlignmentOverflow {
109                end_time_ms,
110                resolution,
111            } => write!(
112                f,
113                "end_time_ms ({}) cannot be aligned to {} bucket boundaries without overflow",
114                end_time_ms, resolution
115            ),
116            Self::UnknownUnderlying(underlying) => {
117                write!(
118                    f,
119                    "Unsupported underlying '{}': no candle coin mapping",
120                    underlying
121                )
122            }
123        }
124    }
125}
126
127impl std::error::Error for CandleValidationError {}
128
/// Failures that occur while obtaining or decoding candle data.
#[derive(Clone, Debug)]
pub enum CandleSourceError {
    /// The upstream request failed (transport error or non-success status).
    Upstream(String),
    /// Data was received but could not be interpreted as valid candles.
    InvalidResponse(String),
}
134
135impl fmt::Display for CandleSourceError {
136    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
137        match self {
138            Self::Upstream(msg) => write!(f, "upstream candle request failed: {}", msg),
139            Self::InvalidResponse(msg) => write!(f, "invalid upstream candle payload: {}", msg),
140        }
141    }
142}
143
144impl std::error::Error for CandleSourceError {}
145
/// Returns the canonical form of an underlying name: surrounding whitespace
/// removed and ASCII letters upper-cased.
pub fn normalize_underlying(underlying: &str) -> String {
    let trimmed = underlying.trim();
    trimmed.to_ascii_uppercase()
}
149
150pub fn validate_candle_time_range(
151    start_time_ms: i64,
152    end_time_ms: i64,
153) -> Result<(), CandleValidationError> {
154    if start_time_ms < 0 {
155        return Err(CandleValidationError::InvalidStartTimeMs(start_time_ms));
156    }
157    if end_time_ms < 0 {
158        return Err(CandleValidationError::InvalidEndTimeMs(end_time_ms));
159    }
160    if end_time_ms <= start_time_ms {
161        return Err(CandleValidationError::EndTimeNotAfterStart {
162            start_time_ms,
163            end_time_ms,
164        });
165    }
166    Ok(())
167}
168
169pub fn validate_option_candle_query_time_range(
170    start_time_ms: i64,
171    end_time_ms: i64,
172) -> Result<(), CandleValidationError> {
173    validate_candle_time_range(start_time_ms, end_time_ms)?;
174
175    let span_ms = end_time_ms.checked_sub(start_time_ms).expect(
176        "validated candle time range must have a positive span that fits in i64 arithmetic",
177    );
178    if span_ms > MAX_OPTION_CANDLE_QUERY_SPAN_MS {
179        return Err(CandleValidationError::OptionQuerySpanTooLarge {
180            start_time_ms,
181            end_time_ms,
182            max_span_ms: MAX_OPTION_CANDLE_QUERY_SPAN_MS,
183        });
184    }
185
186    Ok(())
187}
188
189pub fn align_candle_time_range_to_buckets(
190    start_time_ms: i64,
191    end_time_ms: i64,
192    resolution: CandleResolution,
193) -> Result<(i64, i64), CandleValidationError> {
194    let interval_ms = resolution.interval_ms();
195    let aligned_start_time_ms = start_time_ms - (start_time_ms % interval_ms);
196    let end_remainder_ms = end_time_ms % interval_ms;
197    let aligned_end_time_ms = if end_remainder_ms == 0 {
198        end_time_ms
199    } else {
200        end_time_ms
201            .checked_add(interval_ms - end_remainder_ms)
202            .ok_or(CandleValidationError::BucketAlignmentOverflow {
203                end_time_ms,
204                resolution,
205            })?
206    };
207
208    Ok((aligned_start_time_ms, aligned_end_time_ms))
209}
210
211pub fn build_option_instrument_candles(
212    symbol: &str,
213    resolution: CandleResolution,
214    trades: &[Trade],
215) -> Result<Vec<OptionInstrumentCandle>, CandleSourceError> {
216    let interval_ms = resolution.interval_ms();
217    let mut candles = Vec::new();
218    let mut current: Option<OptionCandleAccumulator> = None;
219
220    for trade in trades {
221        if trade.symbol != symbol {
222            return Err(CandleSourceError::InvalidResponse(format!(
223                "trade symbol mismatch: expected {}, got {}",
224                symbol, trade.symbol
225            )));
226        }
227
228        let bucket_start_time_ms = trade.timestamp - (trade.timestamp % interval_ms);
229        let price = decimal_to_f64(trade.price, "price")?;
230        let volume = decimal_to_f64(to_human_readable_decimal(symbol, trade.size), "volume")?;
231
232        match current.take() {
233            Some(mut accumulator) if accumulator.start_time_ms == bucket_start_time_ms => {
234                accumulator.observe(price, volume);
235                current = Some(accumulator);
236            }
237            Some(accumulator) => {
238                candles.push(accumulator.finish(interval_ms)?);
239                current = Some(OptionCandleAccumulator::new(
240                    bucket_start_time_ms,
241                    price,
242                    volume,
243                ));
244            }
245            None => {
246                current = Some(OptionCandleAccumulator::new(
247                    bucket_start_time_ms,
248                    price,
249                    volume,
250                ));
251            }
252        }
253    }
254
255    if let Some(accumulator) = current {
256        candles.push(accumulator.finish(interval_ms)?);
257    }
258
259    Ok(candles)
260}
261
262pub fn resolve_candle_coin(
263    underlying: &str,
264    underlying_to_candle_coin: &HashMap<String, String>,
265) -> Result<(String, String), CandleValidationError> {
266    let normalized_underlying = normalize_underlying(underlying);
267    if let Some(coin) = underlying_to_candle_coin.get(&normalized_underlying) {
268        Ok((normalized_underlying, coin.clone()))
269    } else {
270        Err(CandleValidationError::UnknownUnderlying(
271            normalized_underlying,
272        ))
273    }
274}
275
276#[async_trait]
277pub trait UnderlyingCandleSource: Send + Sync {
278    async fn fetch_candles(
279        &self,
280        coin: &str,
281        resolution: CandleResolution,
282        start_time_ms: i64,
283        end_time_ms: i64,
284    ) -> Result<Vec<UnderlyingCandle>, CandleSourceError>;
285}
286
287#[derive(Clone)]
288pub struct HyperliquidCandleSource {
289    client: reqwest::Client,
290    info_url: String,
291}
292
293impl HyperliquidCandleSource {
294    pub fn new(info_url: String) -> Result<Self, String> {
295        let client = reqwest::Client::builder()
296            .timeout(Duration::from_secs(10))
297            .build()
298            .map_err(|e| format!("failed to build hyperliquid candle client: {}", e))?;
299        Ok(Self { client, info_url })
300    }
301
302    pub fn from_config(
303        pricing: &hypercall_config::PricingConfig,
304        is_testnet_mode: bool,
305    ) -> Result<Self, String> {
306        let info_url = if pricing.hyperliquid_info_url.is_empty() {
307            if is_testnet_mode {
308                DEFAULT_HYPERLIQUID_TESTNET_INFO_URL.to_string()
309            } else {
310                DEFAULT_HYPERLIQUID_INFO_URL.to_string()
311            }
312        } else {
313            pricing.hyperliquid_info_url.clone()
314        };
315        Self::new(info_url)
316    }
317}
318
319#[derive(Debug, Serialize)]
320struct HyperliquidCandleSnapshotEnvelope {
321    #[serde(rename = "type")]
322    request_type: &'static str,
323    req: HyperliquidCandleSnapshotReq,
324}
325
326#[derive(Debug, Serialize)]
327#[serde(rename_all = "camelCase")]
328struct HyperliquidCandleSnapshotReq {
329    coin: String,
330    interval: String,
331    start_time: i64,
332    end_time: i64,
333}
334
335#[derive(Debug, Deserialize)]
336struct HyperliquidCandleSnapshot {
337    #[serde(rename = "t")]
338    start_time_ms: i64,
339    #[serde(rename = "T")]
340    end_time_ms: i64,
341    #[serde(rename = "o")]
342    open: String,
343    #[serde(rename = "h")]
344    high: String,
345    #[serde(rename = "l")]
346    low: String,
347    #[serde(rename = "c")]
348    close: String,
349    #[serde(rename = "v")]
350    volume: String,
351}
352
353fn parse_f64_field(value: &str, field: &str) -> Result<f64, CandleSourceError> {
354    let parsed = value.parse::<f64>().map_err(|e| {
355        CandleSourceError::InvalidResponse(format!("failed to parse {}='{}': {}", field, value, e))
356    })?;
357    if !parsed.is_finite() {
358        return Err(CandleSourceError::InvalidResponse(format!(
359            "{} must be finite, got {}",
360            field, value
361        )));
362    }
363    Ok(parsed)
364}
365
366fn decimal_to_f64(value: Decimal, field: &str) -> Result<f64, CandleSourceError> {
367    let value_text = value.to_string();
368    parse_f64_field(&value_text, field)
369}
370
/// Running OHLCV state for the option candle currently being built.
#[derive(Clone, Debug)]
struct OptionCandleAccumulator {
    /// Start of the bucket this accumulator covers, in milliseconds.
    start_time_ms: i64,
    /// Price of the first observed trade.
    open: f64,
    /// Highest observed price so far.
    high: f64,
    /// Lowest observed price so far.
    low: f64,
    /// Price of the most recently observed trade.
    close: f64,
    /// Sum of observed trade volumes.
    volume: f64,
}
380
381impl OptionCandleAccumulator {
382    fn new(start_time_ms: i64, price: f64, volume: f64) -> Self {
383        Self {
384            start_time_ms,
385            open: price,
386            high: price,
387            low: price,
388            close: price,
389            volume,
390        }
391    }
392
393    fn observe(&mut self, price: f64, volume: f64) {
394        self.high = self.high.max(price);
395        self.low = self.low.min(price);
396        self.close = price;
397        self.volume += volume;
398    }
399
400    fn finish(self, interval_ms: i64) -> Result<OptionInstrumentCandle, CandleSourceError> {
401        let end_time_ms = self.start_time_ms + interval_ms;
402        validate_candle_time_range(self.start_time_ms, end_time_ms)
403            .map_err(|e| CandleSourceError::InvalidResponse(e.to_string()))?;
404
405        Ok(OptionInstrumentCandle {
406            start_time_ms: self.start_time_ms,
407            end_time_ms,
408            open: self.open,
409            high: self.high,
410            low: self.low,
411            close: self.close,
412            volume: self.volume,
413        })
414    }
415}
416
417fn parse_hyperliquid_snapshots(
418    snapshots: Vec<HyperliquidCandleSnapshot>,
419) -> Result<Vec<UnderlyingCandle>, CandleSourceError> {
420    let mut candles = Vec::with_capacity(snapshots.len());
421
422    for snapshot in snapshots {
423        validate_candle_time_range(snapshot.start_time_ms, snapshot.end_time_ms)
424            .map_err(|e| CandleSourceError::InvalidResponse(e.to_string()))?;
425
426        let open = parse_f64_field(&snapshot.open, "open")?;
427        let high = parse_f64_field(&snapshot.high, "high")?;
428        let low = parse_f64_field(&snapshot.low, "low")?;
429        let close = parse_f64_field(&snapshot.close, "close")?;
430        let volume = parse_f64_field(&snapshot.volume, "volume")?;
431
432        candles.push(UnderlyingCandle {
433            start_time_ms: snapshot.start_time_ms,
434            end_time_ms: snapshot.end_time_ms,
435            open,
436            high,
437            low,
438            close,
439            volume,
440        });
441    }
442
443    candles.sort_by_key(|candle| candle.start_time_ms);
444    Ok(candles)
445}
446
447#[async_trait]
448impl UnderlyingCandleSource for HyperliquidCandleSource {
449    async fn fetch_candles(
450        &self,
451        coin: &str,
452        resolution: CandleResolution,
453        start_time_ms: i64,
454        end_time_ms: i64,
455    ) -> Result<Vec<UnderlyingCandle>, CandleSourceError> {
456        validate_candle_time_range(start_time_ms, end_time_ms)
457            .map_err(|e| CandleSourceError::InvalidResponse(e.to_string()))?;
458
459        let body = HyperliquidCandleSnapshotEnvelope {
460            request_type: "candleSnapshot",
461            req: HyperliquidCandleSnapshotReq {
462                coin: coin.to_string(),
463                interval: resolution.as_str().to_string(),
464                start_time: start_time_ms,
465                end_time: end_time_ms,
466            },
467        };
468
469        let response = self
470            .client
471            .post(&self.info_url)
472            .json(&body)
473            .send()
474            .await
475            .map_err(|e| CandleSourceError::Upstream(e.to_string()))?;
476
477        if !response.status().is_success() {
478            return Err(CandleSourceError::Upstream(format!(
479                "status={} url={}",
480                response.status(),
481                self.info_url
482            )));
483        }
484
485        let snapshots = response
486            .json::<Vec<HyperliquidCandleSnapshot>>()
487            .await
488            .map_err(|e| CandleSourceError::InvalidResponse(e.to_string()))?;
489
490        parse_hyperliquid_snapshots(snapshots)
491    }
492}
493
494pub fn parse_candle_channel(channel: &str) -> Option<(String, CandleResolution)> {
495    let mut parts = channel.split(':');
496    let prefix = parts.next()?;
497    if prefix != "candles" {
498        return None;
499    }
500
501    let underlying = parts.next()?.trim();
502    if underlying.is_empty() {
503        return None;
504    }
505
506    let resolution = parts.next()?.trim();
507    if parts.next().is_some() {
508        return None;
509    }
510
511    let parsed_resolution = CandleResolution::from_str(resolution).ok()?;
512    Some((normalize_underlying(underlying), parsed_resolution))
513}
514
515pub fn candle_ws_poll_interval_ms(pricing: &hypercall_config::PricingConfig) -> u64 {
516    if pricing.candle_ws_poll_interval_ms == 0 {
517        DEFAULT_CANDLE_WS_POLL_INTERVAL_MS
518    } else {
519        pricing.candle_ws_poll_interval_ms
520    }
521}
522
/// Exact-equality snapshot of a candle update, used to suppress duplicates.
///
/// Floats are stored as raw bit patterns so the type can derive `Eq`.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
struct CandleFingerprint {
    start_time_ms: i64,
    end_time_ms: i64,
    /// `f64::to_bits` of the open price.
    open_bits: u64,
    /// `f64::to_bits` of the high price.
    high_bits: u64,
    /// `f64::to_bits` of the low price.
    low_bits: u64,
    /// `f64::to_bits` of the close price.
    close_bits: u64,
    /// `f64::to_bits` of the volume.
    volume_bits: u64,
}
533
534impl CandleFingerprint {
535    fn from_update(update: &WsCandleUpdate) -> Self {
536        Self {
537            start_time_ms: update.start_time_ms,
538            end_time_ms: update.end_time_ms,
539            open_bits: update.open.to_bits(),
540            high_bits: update.high.to_bits(),
541            low_bits: update.low.to_bits(),
542            close_bits: update.close.to_bits(),
543            volume_bits: update.volume.to_bits(),
544        }
545    }
546}
547
548pub struct CandleWsPublisher {
549    pubsub: Arc<PubSubManager>,
550    candle_source: Arc<dyn UnderlyingCandleSource>,
551    underlying_to_candle_coin: Arc<HashMap<String, String>>,
552    poll_interval: Duration,
553    last_published: tokio::sync::Mutex<HashMap<String, CandleFingerprint>>,
554}
555
556impl CandleWsPublisher {
557    pub fn new(
558        pubsub: Arc<PubSubManager>,
559        candle_source: Arc<dyn UnderlyingCandleSource>,
560        underlying_to_candle_coin: Arc<HashMap<String, String>>,
561        poll_interval: Duration,
562    ) -> Self {
563        Self {
564            pubsub,
565            candle_source,
566            underlying_to_candle_coin,
567            poll_interval,
568            last_published: tokio::sync::Mutex::new(HashMap::new()),
569        }
570    }
571
572    pub async fn run_with_shutdown(&self, mut shutdown_rx: tokio::sync::broadcast::Receiver<()>) {
573        let mut ticker = tokio::time::interval(self.poll_interval);
574        ticker.set_missed_tick_behavior(MissedTickBehavior::Skip);
575
576        loop {
577            tokio::select! {
578                _ = shutdown_rx.recv() => {
579                    debug!("CandleWsPublisher received shutdown signal");
580                    break;
581                }
582                _ = ticker.tick() => {
583                    self.publish_once().await;
584                }
585            }
586        }
587    }
588
589    async fn publish_once(&self) {
590        let channels = self
591            .pubsub
592            .active_channels_with_prefix(CANDLE_WS_CHANNEL_PREFIX)
593            .await;
594
595        let active: HashSet<String> = channels.iter().cloned().collect();
596        {
597            let mut last = self.last_published.lock().await;
598            last.retain(|channel, _| active.contains(channel));
599        }
600
601        for channel in channels {
602            let Some((underlying, resolution)) = parse_candle_channel(&channel) else {
603                counter!("ht_ws_candle_invalid_channel_total").increment(1);
604                debug!(channel, "Skipping invalid candle channel format");
605                continue;
606            };
607
608            let Some(coin) = self.underlying_to_candle_coin.get(&underlying) else {
609                counter!("ht_ws_candle_unknown_underlying_total", "underlying" => underlying.clone()).increment(1);
610                debug!(
611                    channel,
612                    underlying, "Skipping candle channel with unknown underlying"
613                );
614                continue;
615            };
616
617            let end_time_ms = Utc::now().timestamp_millis();
618            let lookback_ms = resolution.interval_ms().saturating_mul(2);
619            let start_time_ms = end_time_ms.saturating_sub(lookback_ms);
620
621            match self
622                .candle_source
623                .fetch_candles(coin, resolution, start_time_ms, end_time_ms)
624                .await
625            {
626                Ok(candles) => {
627                    let Some(latest) = candles
628                        .into_iter()
629                        .max_by_key(|candle| candle.start_time_ms)
630                    else {
631                        continue;
632                    };
633
634                    let update = WsCandleUpdate {
635                        underlying: underlying.clone(),
636                        resolution,
637                        start_time_ms: latest.start_time_ms,
638                        end_time_ms: latest.end_time_ms,
639                        open: latest.open,
640                        high: latest.high,
641                        low: latest.low,
642                        close: latest.close,
643                        volume: latest.volume,
644                    };
645
646                    let next_fingerprint = CandleFingerprint::from_update(&update);
647                    {
648                        let last = self.last_published.lock().await;
649                        if last.get(&channel) == Some(&next_fingerprint) {
650                            continue;
651                        }
652                    }
653
654                    self.pubsub
655                        .publish_to_channel(&channel, WsMessage::CandleUpdate(update));
656                    self.last_published
657                        .lock()
658                        .await
659                        .insert(channel, next_fingerprint);
660                }
661                Err(e) => {
662                    counter!(
663                        "ht_ws_candle_fetch_failures_total",
664                        "underlying" => underlying,
665                        "resolution" => resolution.as_str().to_string()
666                    )
667                    .increment(1);
668                    warn!(channel, error = %e, "Failed to fetch candles for websocket publishing");
669                }
670            }
671        }
672    }
673}
674
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use tokio::sync::mpsc;

    /// Option symbol shared by the trade-aggregation tests.
    const BTC_CALL: &str = "BTC-20260331-100000-C";

    /// Every resolution round-trips through JSON and `FromStr`; unknown
    /// resolution strings are rejected.
    #[test]
    fn test_candle_resolution_roundtrip() {
        let every_resolution = [
            CandleResolution::OneMinute,
            CandleResolution::FiveMinutes,
            CandleResolution::FifteenMinutes,
            CandleResolution::OneHour,
            CandleResolution::FourHours,
            CandleResolution::OneDay,
        ];

        for resolution in every_resolution {
            let encoded = serde_json::to_string(&resolution).expect("serialize resolution");
            let decoded: CandleResolution =
                serde_json::from_str(&encoded).expect("parse resolution");
            assert_eq!(decoded, resolution);
            assert_eq!(
                CandleResolution::from_str(resolution.as_str()).unwrap(),
                resolution
            );
        }

        assert!(serde_json::from_str::<CandleResolution>("\"10m\"").is_err());
    }

    /// Each resolution maps to its interval length in milliseconds.
    #[test]
    fn test_interval_ms_mapping() {
        let expected = [
            (CandleResolution::OneMinute, 60_000),
            (CandleResolution::FiveMinutes, 300_000),
            (CandleResolution::FifteenMinutes, 900_000),
            (CandleResolution::OneHour, 3_600_000),
            (CandleResolution::FourHours, 14_400_000),
            (CandleResolution::OneDay, 86_400_000),
        ];

        for (resolution, interval_ms) in expected {
            assert_eq!(resolution.interval_ms(), interval_ms);
        }
    }

    /// Negative bounds and empty windows are rejected with specific errors.
    #[test]
    fn test_validate_candle_time_range_rejects_invalid_values() {
        let cases = [
            ((-1, 100), CandleValidationError::InvalidStartTimeMs(-1)),
            ((0, -1), CandleValidationError::InvalidEndTimeMs(-1)),
            (
                (100, 100),
                CandleValidationError::EndTimeNotAfterStart {
                    start_time_ms: 100,
                    end_time_ms: 100,
                },
            ),
        ];

        for ((start, end), expected) in cases {
            assert_eq!(validate_candle_time_range(start, end).unwrap_err(), expected);
        }
    }

    /// A window one millisecond over the limit is rejected.
    #[test]
    fn test_validate_option_candle_query_time_range_rejects_large_spans() {
        let too_long_end = MAX_OPTION_CANDLE_QUERY_SPAN_MS + 1;

        assert_eq!(
            validate_option_candle_query_time_range(0, too_long_end).unwrap_err(),
            CandleValidationError::OptionQuerySpanTooLarge {
                start_time_ms: 0,
                end_time_ms: too_long_end,
                max_span_ms: MAX_OPTION_CANDLE_QUERY_SPAN_MS,
            }
        );
    }

    /// Unaligned edges are widened outward to bucket boundaries.
    #[test]
    fn test_align_candle_time_range_to_buckets_expands_edges() {
        let widened =
            align_candle_time_range_to_buckets(61_000, 121_000, CandleResolution::OneMinute);

        assert_eq!(widened.unwrap(), (60_000, 180_000));
    }

    /// Edges already on bucket boundaries are left untouched.
    #[test]
    fn test_align_candle_time_range_to_buckets_keeps_aligned_edges() {
        let unchanged =
            align_candle_time_range_to_buckets(60_000, 120_000, CandleResolution::OneMinute);

        assert_eq!(unchanged.unwrap(), (60_000, 120_000));
    }

    /// Rounding the end up past `i64::MAX` is reported, not wrapped.
    #[test]
    fn test_align_candle_time_range_to_buckets_rejects_overflow() {
        let near_max = i64::MAX - 1;
        let outcome =
            align_candle_time_range_to_buckets(60_000, near_max, CandleResolution::OneMinute);

        assert_eq!(
            outcome.unwrap_err(),
            CandleValidationError::BucketAlignmentOverflow {
                end_time_ms: near_max,
                resolution: CandleResolution::OneMinute,
            }
        );
    }

    /// String-encoded upstream numbers are converted to `f64`.
    #[test]
    fn test_parse_hyperliquid_snapshots_numeric_conversion() {
        let raw = HyperliquidCandleSnapshot {
            start_time_ms: 1_000,
            end_time_ms: 2_000,
            open: "100.25".to_string(),
            high: "101.50".to_string(),
            low: "99.75".to_string(),
            close: "100.75".to_string(),
            volume: "42.5".to_string(),
        };

        let candles = parse_hyperliquid_snapshots(vec![raw]).expect("parse snapshots");
        assert_eq!(candles.len(), 1);

        let candle = &candles[0];
        assert_eq!(candle.open, 100.25);
        assert_eq!(candle.high, 101.50);
        assert_eq!(candle.low, 99.75);
        assert_eq!(candle.close, 100.75);
        assert_eq!(candle.volume, 42.5);
    }

    /// Valid channels parse to (normalized underlying, resolution); others yield `None`.
    #[test]
    fn test_parse_candle_channel() {
        let (underlying, resolution) =
            parse_candle_channel("candles:btc:1m").expect("valid channel");
        assert_eq!(underlying, "BTC");
        assert_eq!(resolution, CandleResolution::OneMinute);

        for rejected in ["candles:BTC", "candles:BTC:10m", "orderbook"] {
            assert!(parse_candle_channel(rejected).is_none());
        }
    }

    /// Builds a trade fixture; the timestamp doubles as the trade id.
    fn trade(symbol: &str, price: &str, size: &str, timestamp: i64) -> Trade {
        Trade {
            trade_id: timestamp,
            symbol: symbol.to_string(),
            price: price.parse().unwrap(),
            size: size.parse().unwrap(),
            maker_address: [1_u8; 20].into(),
            taker_address: [2_u8; 20].into(),
            maker_fee: Decimal::ZERO,
            taker_fee: Decimal::ZERO,
            timestamp,
            created_at: Utc::now(),
        }
    }

    /// Trades within one minute collapse into a single candle.
    #[test]
    fn test_build_option_instrument_candles_single_bucket() {
        let trades = vec![
            trade(BTC_CALL, "100.0", "1000000", 60_001),
            trade(BTC_CALL, "110.0", "2000000", 60_100),
            trade(BTC_CALL, "90.0", "3000000", 60_200),
        ];

        let candles =
            build_option_instrument_candles(BTC_CALL, CandleResolution::OneMinute, &trades)
                .unwrap();

        let expected = OptionInstrumentCandle {
            start_time_ms: 60_000,
            end_time_ms: 120_000,
            open: 100.0,
            high: 110.0,
            low: 90.0,
            close: 90.0,
            volume: 6.0,
        };
        assert_eq!(candles.len(), 1);
        assert_eq!(candles[0], expected);
    }

    /// Trades straddling a minute boundary produce one candle per bucket.
    #[test]
    fn test_build_option_instrument_candles_multiple_buckets() {
        let trades = vec![
            trade(BTC_CALL, "100.0", "1000000", 1),
            trade(BTC_CALL, "101.0", "1000000", 59_999),
            trade(BTC_CALL, "102.0", "1000000", 60_000),
            trade(BTC_CALL, "103.0", "1000000", 119_000),
        ];

        let candles =
            build_option_instrument_candles(BTC_CALL, CandleResolution::OneMinute, &trades)
                .unwrap();

        assert_eq!(candles.len(), 2);

        let (first, second) = (&candles[0], &candles[1]);
        assert_eq!(first.start_time_ms, 0);
        assert_eq!(first.close, 101.0);
        assert_eq!(second.start_time_ms, 60_000);
        assert_eq!(second.open, 102.0);
        assert_eq!(second.close, 103.0);
    }

    /// A trade for a different symbol aborts aggregation with an error.
    #[test]
    fn test_build_option_instrument_candles_rejects_symbol_mismatch() {
        let trades = vec![trade("ETH-20260331-100000-C", "100.0", "1000000", 1)];

        let outcome =
            build_option_instrument_candles(BTC_CALL, CandleResolution::OneMinute, &trades);

        assert!(matches!(
            outcome.unwrap_err(),
            CandleSourceError::InvalidResponse(_)
        ));
    }

    /// Candle source returning a fixed candle list and counting its calls.
    #[derive(Clone)]
    struct MockCandleSource {
        calls: Arc<AtomicUsize>,
        candles: Vec<UnderlyingCandle>,
    }

    #[async_trait]
    impl UnderlyingCandleSource for MockCandleSource {
        async fn fetch_candles(
            &self,
            _coin: &str,
            _resolution: CandleResolution,
            _start_time_ms: i64,
            _end_time_ms: i64,
        ) -> Result<Vec<UnderlyingCandle>, CandleSourceError> {
            self.calls.fetch_add(1, Ordering::SeqCst);
            Ok(self.candles.clone())
        }
    }

    /// Creates a pubsub manager with one client subscribed to `channel`.
    fn pubsub_with_subscriber(
        channel: &str,
    ) -> (
        Arc<PubSubManager>,
        mpsc::UnboundedReceiver<Arc<WsMessage>>,
    ) {
        let pubsub = Arc::new(PubSubManager::new());
        let client_id = crate::websocket::uuid::Uuid::new_v4();
        let (tx, rx) = mpsc::unbounded_channel::<Arc<WsMessage>>();
        pubsub.add_client(client_id, tx, false, None);
        pubsub
            .subscribe(client_id, channel.to_string(), None, None, None)
            .expect("subscribe");
        (pubsub, rx)
    }

    /// Runs `publisher` on a spawned task for `run_time`, then shuts it down
    /// and waits for the task to finish.
    async fn run_publisher_for(publisher: CandleWsPublisher, run_time: Duration) {
        let (shutdown_tx, shutdown_rx) = tokio::sync::broadcast::channel(1);
        let handle = tokio::spawn(async move {
            publisher.run_with_shutdown(shutdown_rx).await;
        });

        tokio::time::sleep(run_time).await;
        let _ = shutdown_tx.send(());
        handle.await.expect("publisher task");
    }

    /// Polling the same candle repeatedly publishes it only once.
    #[tokio::test]
    async fn test_ws_publisher_dedupes_unchanged_candle_updates() {
        let (pubsub, mut rx) = pubsub_with_subscriber("candles:BTC:1m");

        let calls = Arc::new(AtomicUsize::new(0));
        let source = Arc::new(MockCandleSource {
            calls: calls.clone(),
            candles: vec![UnderlyingCandle {
                start_time_ms: 1_000,
                end_time_ms: 2_000,
                open: 100.0,
                high: 101.0,
                low: 99.0,
                close: 100.5,
                volume: 5.0,
            }],
        });

        let mut mapping = HashMap::new();
        mapping.insert("BTC".to_string(), "BTC".to_string());

        let publisher = CandleWsPublisher::new(
            pubsub.clone(),
            source,
            Arc::new(mapping),
            Duration::from_millis(20),
        );
        run_publisher_for(publisher, Duration::from_millis(120)).await;

        let mut candle_messages = 0;
        while let Ok(message) = rx.try_recv() {
            if matches!(message.as_ref(), WsMessage::CandleUpdate(_)) {
                candle_messages += 1;
            }
        }

        assert!(calls.load(Ordering::SeqCst) >= 1);
        assert_eq!(candle_messages, 1, "unchanged candle should publish once");
    }

    /// A malformed channel triggers neither a fetch nor a publish.
    #[tokio::test]
    async fn test_ws_publisher_ignores_invalid_channel_format() {
        let (pubsub, mut rx) = pubsub_with_subscriber("candles:BTC");

        let calls = Arc::new(AtomicUsize::new(0));
        let source = Arc::new(MockCandleSource {
            calls: calls.clone(),
            candles: vec![],
        });

        // The manager is moved into the publisher, so it is dropped with it.
        let publisher = CandleWsPublisher::new(
            pubsub,
            source,
            Arc::new(HashMap::new()),
            Duration::from_millis(20),
        );
        run_publisher_for(publisher, Duration::from_millis(80)).await;

        assert_eq!(calls.load(Ordering::SeqCst), 0);
        assert!(rx.try_recv().is_err());
    }
}