1use async_trait::async_trait;
2use chrono::Utc;
3use metrics::counter;
4use rust_decimal::Decimal;
5use serde::{Deserialize, Serialize};
6use std::collections::{HashMap, HashSet};
7use std::fmt;
8use std::str::FromStr;
9use std::sync::Arc;
10use std::time::Duration;
11use tokio::time::MissedTickBehavior;
12use tracing::{debug, warn};
13use utoipa::ToSchema;
14
15use super::websocket::{PubSubManager, WsCandleUpdate, WsMessage};
16use crate::models::Trade;
17use hypercall_types::to_human_readable_decimal;
18pub use hypercall_types::ws_protocol::CandleResolution;
19
/// Channel-name prefix handed to `PubSubManager::active_channels_with_prefix` to
/// discover the candle channels that currently have subscribers.
pub const CANDLE_WS_CHANNEL_PREFIX: &str = "candles:";
/// Poll interval (milliseconds) returned by `candle_ws_poll_interval_ms` when the
/// configured value is `0`.
const DEFAULT_CANDLE_WS_POLL_INTERVAL_MS: u64 = 2_000;
/// Info endpoint used by `HyperliquidCandleSource::from_config` when no URL is
/// configured and testnet mode is off.
const DEFAULT_HYPERLIQUID_INFO_URL: &str = "https://api.hyperliquid.xyz/info";
/// Info endpoint used by `HyperliquidCandleSource::from_config` when no URL is
/// configured and testnet mode is on.
const DEFAULT_HYPERLIQUID_TESTNET_INFO_URL: &str = "https://api.hyperliquid-testnet.xyz/info";
/// Largest `end_time_ms - start_time_ms` accepted by
/// `validate_option_candle_query_time_range`: 2_678_400_000 ms, i.e. 31 days.
pub const MAX_OPTION_CANDLE_QUERY_SPAN_MS: i64 = 2_678_400_000;
25
// One OHLCV candle for an underlying. `parse_hyperliquid_snapshots` builds these
// from upstream snapshots and `UnderlyingCandleSource::fetch_candles` returns them.
// NOTE(review): plain `//` comments are used because `ToSchema` is derived and `///`
// text would presumably surface in the generated schema — confirm before converting.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, ToSchema)]
pub struct UnderlyingCandle {
    // Start of the candle interval, in milliseconds.
    pub start_time_ms: i64,
    // End of the candle interval, in milliseconds; validated to be after the start
    // when parsed from upstream snapshots.
    pub end_time_ms: i64,
    // Opening price.
    pub open: f64,
    // Highest price.
    pub high: f64,
    // Lowest price.
    pub low: f64,
    // Closing price.
    pub close: f64,
    // Traded volume as reported by the upstream source.
    pub volume: f64,
}
36
// Serializable payload pairing a list of underlying candles with the underlying
// and resolution they belong to.
// NOTE(review): plain `//` comments are used because `ToSchema` is derived and `///`
// text would presumably surface in the generated schema — confirm before converting.
#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)]
pub struct UnderlyingCandlesResponse {
    // Underlying identifier the candles describe.
    pub underlying: String,
    // Candle bucket size.
    pub resolution: CandleResolution,
    // The candles themselves.
    pub candles: Vec<UnderlyingCandle>,
}
43
// One OHLCV candle for a single option instrument, aggregated from trades by
// `build_option_instrument_candles`.
// NOTE(review): plain `//` comments are used because `ToSchema` is derived and `///`
// text would presumably surface in the generated schema — confirm before converting.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, ToSchema)]
pub struct OptionInstrumentCandle {
    // Bucket start in milliseconds (the trade timestamp rounded down to the interval).
    pub start_time_ms: i64,
    // Bucket end in milliseconds: `start_time_ms` plus the resolution interval.
    pub end_time_ms: i64,
    // Price of the first trade observed in the bucket.
    pub open: f64,
    // Highest trade price observed in the bucket.
    pub high: f64,
    // Lowest trade price observed in the bucket.
    pub low: f64,
    // Price of the last trade observed in the bucket.
    pub close: f64,
    // Sum of trade sizes in the bucket after `to_human_readable_decimal` conversion.
    pub volume: f64,
}
54
// Serializable payload pairing a list of option candles with the instrument and
// resolution they belong to.
// NOTE(review): plain `//` comments are used because `ToSchema` is derived and `///`
// text would presumably surface in the generated schema — confirm before converting.
#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)]
pub struct OptionInstrumentCandlesResponse {
    // Instrument name the candles describe.
    pub instrument_name: String,
    // Candle bucket size.
    pub resolution: CandleResolution,
    // The candles themselves.
    pub candles: Vec<OptionInstrumentCandle>,
}
61
/// Reasons a candle request (time range, bucket alignment, or underlying) is rejected.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum CandleValidationError {
    /// `start_time_ms` was negative; carries the offending value.
    InvalidStartTimeMs(i64),
    /// `end_time_ms` was negative; carries the offending value.
    InvalidEndTimeMs(i64),
    /// `end_time_ms` was less than or equal to `start_time_ms`.
    EndTimeNotAfterStart {
        start_time_ms: i64,
        end_time_ms: i64,
    },
    /// The requested option-candle span exceeded `max_span_ms`.
    OptionQuerySpanTooLarge {
        start_time_ms: i64,
        end_time_ms: i64,
        max_span_ms: i64,
    },
    /// Rounding `end_time_ms` up to the next bucket boundary would overflow `i64`.
    BucketAlignmentOverflow {
        end_time_ms: i64,
        resolution: CandleResolution,
    },
    /// No candle coin is mapped for the (normalized) underlying.
    UnknownUnderlying(String),
}
81
82impl fmt::Display for CandleValidationError {
83 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
84 match self {
85 Self::InvalidStartTimeMs(ts) => {
86 write!(f, "start_time_ms must be non-negative, got {}", ts)
87 }
88 Self::InvalidEndTimeMs(ts) => {
89 write!(f, "end_time_ms must be non-negative, got {}", ts)
90 }
91 Self::EndTimeNotAfterStart {
92 start_time_ms,
93 end_time_ms,
94 } => write!(
95 f,
96 "end_time_ms ({}) must be greater than start_time_ms ({})",
97 end_time_ms, start_time_ms
98 ),
99 Self::OptionQuerySpanTooLarge {
100 start_time_ms,
101 end_time_ms,
102 max_span_ms,
103 } => write!(
104 f,
105 "option candle query span [{}, {}) exceeds max supported window of {} ms",
106 start_time_ms, end_time_ms, max_span_ms
107 ),
108 Self::BucketAlignmentOverflow {
109 end_time_ms,
110 resolution,
111 } => write!(
112 f,
113 "end_time_ms ({}) cannot be aligned to {} bucket boundaries without overflow",
114 end_time_ms, resolution
115 ),
116 Self::UnknownUnderlying(underlying) => {
117 write!(
118 f,
119 "Unsupported underlying '{}': no candle coin mapping",
120 underlying
121 )
122 }
123 }
124 }
125}
126
// Marker impl with no overrides; `Debug` and `Display` are provided above.
impl std::error::Error for CandleValidationError {}
128
/// Failures produced while obtaining or building candles.
#[derive(Debug, Clone)]
pub enum CandleSourceError {
    /// The upstream request failed (transport error or non-success HTTP status).
    Upstream(String),
    /// Data could not be parsed or failed validation (non-numeric or non-finite
    /// fields, invalid time ranges, mismatched trade symbols).
    InvalidResponse(String),
}
134
135impl fmt::Display for CandleSourceError {
136 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
137 match self {
138 Self::Upstream(msg) => write!(f, "upstream candle request failed: {}", msg),
139 Self::InvalidResponse(msg) => write!(f, "invalid upstream candle payload: {}", msg),
140 }
141 }
142}
143
// Marker impl with no overrides; `Debug` and `Display` are provided above.
impl std::error::Error for CandleSourceError {}
145
/// Canonicalizes an underlying identifier: surrounding whitespace is removed and
/// ASCII letters are upper-cased (non-ASCII characters are left untouched).
pub fn normalize_underlying(underlying: &str) -> String {
    underlying
        .trim()
        .chars()
        .map(|ch| ch.to_ascii_uppercase())
        .collect()
}
149
150pub fn validate_candle_time_range(
151 start_time_ms: i64,
152 end_time_ms: i64,
153) -> Result<(), CandleValidationError> {
154 if start_time_ms < 0 {
155 return Err(CandleValidationError::InvalidStartTimeMs(start_time_ms));
156 }
157 if end_time_ms < 0 {
158 return Err(CandleValidationError::InvalidEndTimeMs(end_time_ms));
159 }
160 if end_time_ms <= start_time_ms {
161 return Err(CandleValidationError::EndTimeNotAfterStart {
162 start_time_ms,
163 end_time_ms,
164 });
165 }
166 Ok(())
167}
168
169pub fn validate_option_candle_query_time_range(
170 start_time_ms: i64,
171 end_time_ms: i64,
172) -> Result<(), CandleValidationError> {
173 validate_candle_time_range(start_time_ms, end_time_ms)?;
174
175 let span_ms = end_time_ms.checked_sub(start_time_ms).expect(
176 "validated candle time range must have a positive span that fits in i64 arithmetic",
177 );
178 if span_ms > MAX_OPTION_CANDLE_QUERY_SPAN_MS {
179 return Err(CandleValidationError::OptionQuerySpanTooLarge {
180 start_time_ms,
181 end_time_ms,
182 max_span_ms: MAX_OPTION_CANDLE_QUERY_SPAN_MS,
183 });
184 }
185
186 Ok(())
187}
188
189pub fn align_candle_time_range_to_buckets(
190 start_time_ms: i64,
191 end_time_ms: i64,
192 resolution: CandleResolution,
193) -> Result<(i64, i64), CandleValidationError> {
194 let interval_ms = resolution.interval_ms();
195 let aligned_start_time_ms = start_time_ms - (start_time_ms % interval_ms);
196 let end_remainder_ms = end_time_ms % interval_ms;
197 let aligned_end_time_ms = if end_remainder_ms == 0 {
198 end_time_ms
199 } else {
200 end_time_ms
201 .checked_add(interval_ms - end_remainder_ms)
202 .ok_or(CandleValidationError::BucketAlignmentOverflow {
203 end_time_ms,
204 resolution,
205 })?
206 };
207
208 Ok((aligned_start_time_ms, aligned_end_time_ms))
209}
210
211pub fn build_option_instrument_candles(
212 symbol: &str,
213 resolution: CandleResolution,
214 trades: &[Trade],
215) -> Result<Vec<OptionInstrumentCandle>, CandleSourceError> {
216 let interval_ms = resolution.interval_ms();
217 let mut candles = Vec::new();
218 let mut current: Option<OptionCandleAccumulator> = None;
219
220 for trade in trades {
221 if trade.symbol != symbol {
222 return Err(CandleSourceError::InvalidResponse(format!(
223 "trade symbol mismatch: expected {}, got {}",
224 symbol, trade.symbol
225 )));
226 }
227
228 let bucket_start_time_ms = trade.timestamp - (trade.timestamp % interval_ms);
229 let price = decimal_to_f64(trade.price, "price")?;
230 let volume = decimal_to_f64(to_human_readable_decimal(symbol, trade.size), "volume")?;
231
232 match current.take() {
233 Some(mut accumulator) if accumulator.start_time_ms == bucket_start_time_ms => {
234 accumulator.observe(price, volume);
235 current = Some(accumulator);
236 }
237 Some(accumulator) => {
238 candles.push(accumulator.finish(interval_ms)?);
239 current = Some(OptionCandleAccumulator::new(
240 bucket_start_time_ms,
241 price,
242 volume,
243 ));
244 }
245 None => {
246 current = Some(OptionCandleAccumulator::new(
247 bucket_start_time_ms,
248 price,
249 volume,
250 ));
251 }
252 }
253 }
254
255 if let Some(accumulator) = current {
256 candles.push(accumulator.finish(interval_ms)?);
257 }
258
259 Ok(candles)
260}
261
262pub fn resolve_candle_coin(
263 underlying: &str,
264 underlying_to_candle_coin: &HashMap<String, String>,
265) -> Result<(String, String), CandleValidationError> {
266 let normalized_underlying = normalize_underlying(underlying);
267 if let Some(coin) = underlying_to_candle_coin.get(&normalized_underlying) {
268 Ok((normalized_underlying, coin.clone()))
269 } else {
270 Err(CandleValidationError::UnknownUnderlying(
271 normalized_underlying,
272 ))
273 }
274}
275
/// Asynchronous provider of underlying candles.
///
/// Implementations must be `Send + Sync`; `CandleWsPublisher` holds one behind an
/// `Arc<dyn UnderlyingCandleSource>`.
#[async_trait]
pub trait UnderlyingCandleSource: Send + Sync {
    /// Fetches candles for `coin` at `resolution` within the given time window
    /// (both bounds in milliseconds).
    ///
    /// # Errors
    /// Returns a `CandleSourceError` when the candles cannot be obtained or parsed.
    async fn fetch_candles(
        &self,
        coin: &str,
        resolution: CandleResolution,
        start_time_ms: i64,
        end_time_ms: i64,
    ) -> Result<Vec<UnderlyingCandle>, CandleSourceError>;
}
286
/// `UnderlyingCandleSource` that POSTs `candleSnapshot` requests to a Hyperliquid
/// info endpoint.
#[derive(Clone)]
pub struct HyperliquidCandleSource {
    /// HTTP client; built in `new` with a 10-second timeout.
    client: reqwest::Client,
    /// URL every request is posted to.
    info_url: String,
}
292
293impl HyperliquidCandleSource {
294 pub fn new(info_url: String) -> Result<Self, String> {
295 let client = reqwest::Client::builder()
296 .timeout(Duration::from_secs(10))
297 .build()
298 .map_err(|e| format!("failed to build hyperliquid candle client: {}", e))?;
299 Ok(Self { client, info_url })
300 }
301
302 pub fn from_config(
303 pricing: &hypercall_config::PricingConfig,
304 is_testnet_mode: bool,
305 ) -> Result<Self, String> {
306 let info_url = if pricing.hyperliquid_info_url.is_empty() {
307 if is_testnet_mode {
308 DEFAULT_HYPERLIQUID_TESTNET_INFO_URL.to_string()
309 } else {
310 DEFAULT_HYPERLIQUID_INFO_URL.to_string()
311 }
312 } else {
313 pricing.hyperliquid_info_url.clone()
314 };
315 Self::new(info_url)
316 }
317}
318
/// JSON body posted to the Hyperliquid info endpoint for a candle snapshot.
#[derive(Debug, Serialize)]
struct HyperliquidCandleSnapshotEnvelope {
    /// Serialized as `type`; `fetch_candles` sets it to `"candleSnapshot"`.
    #[serde(rename = "type")]
    request_type: &'static str,
    /// The query parameters.
    req: HyperliquidCandleSnapshotReq,
}
325
/// Query parameters of a candle snapshot request; field names are serialized in
/// camelCase (`startTime`, `endTime`).
#[derive(Debug, Serialize)]
#[serde(rename_all = "camelCase")]
struct HyperliquidCandleSnapshotReq {
    /// Coin to fetch candles for.
    coin: String,
    /// Resolution string, taken from `CandleResolution::as_str`.
    interval: String,
    /// Window start in milliseconds.
    start_time: i64,
    /// Window end in milliseconds.
    end_time: i64,
}
334
/// One candle as it appears in the upstream response. Numeric values arrive as
/// strings and are converted by `parse_hyperliquid_snapshots`.
#[derive(Debug, Deserialize)]
struct HyperliquidCandleSnapshot {
    /// Candle start in milliseconds (JSON key `t`).
    #[serde(rename = "t")]
    start_time_ms: i64,
    /// Candle end in milliseconds (JSON key `T`).
    #[serde(rename = "T")]
    end_time_ms: i64,
    /// Open price as text (JSON key `o`).
    #[serde(rename = "o")]
    open: String,
    /// High price as text (JSON key `h`).
    #[serde(rename = "h")]
    high: String,
    /// Low price as text (JSON key `l`).
    #[serde(rename = "l")]
    low: String,
    /// Close price as text (JSON key `c`).
    #[serde(rename = "c")]
    close: String,
    /// Volume as text (JSON key `v`).
    #[serde(rename = "v")]
    volume: String,
}
352
353fn parse_f64_field(value: &str, field: &str) -> Result<f64, CandleSourceError> {
354 let parsed = value.parse::<f64>().map_err(|e| {
355 CandleSourceError::InvalidResponse(format!("failed to parse {}='{}': {}", field, value, e))
356 })?;
357 if !parsed.is_finite() {
358 return Err(CandleSourceError::InvalidResponse(format!(
359 "{} must be finite, got {}",
360 field, value
361 )));
362 }
363 Ok(parsed)
364}
365
366fn decimal_to_f64(value: Decimal, field: &str) -> Result<f64, CandleSourceError> {
367 let value_text = value.to_string();
368 parse_f64_field(&value_text, field)
369}
370
/// Running OHLCV state for the bucket currently being built by
/// `build_option_instrument_candles`.
#[derive(Debug, Clone)]
struct OptionCandleAccumulator {
    /// Bucket start in milliseconds.
    start_time_ms: i64,
    /// Price of the first trade in the bucket.
    open: f64,
    /// Highest price seen so far.
    high: f64,
    /// Lowest price seen so far.
    low: f64,
    /// Price of the most recent trade.
    close: f64,
    /// Sum of volumes seen so far.
    volume: f64,
}
380
381impl OptionCandleAccumulator {
382 fn new(start_time_ms: i64, price: f64, volume: f64) -> Self {
383 Self {
384 start_time_ms,
385 open: price,
386 high: price,
387 low: price,
388 close: price,
389 volume,
390 }
391 }
392
393 fn observe(&mut self, price: f64, volume: f64) {
394 self.high = self.high.max(price);
395 self.low = self.low.min(price);
396 self.close = price;
397 self.volume += volume;
398 }
399
400 fn finish(self, interval_ms: i64) -> Result<OptionInstrumentCandle, CandleSourceError> {
401 let end_time_ms = self.start_time_ms + interval_ms;
402 validate_candle_time_range(self.start_time_ms, end_time_ms)
403 .map_err(|e| CandleSourceError::InvalidResponse(e.to_string()))?;
404
405 Ok(OptionInstrumentCandle {
406 start_time_ms: self.start_time_ms,
407 end_time_ms,
408 open: self.open,
409 high: self.high,
410 low: self.low,
411 close: self.close,
412 volume: self.volume,
413 })
414 }
415}
416
417fn parse_hyperliquid_snapshots(
418 snapshots: Vec<HyperliquidCandleSnapshot>,
419) -> Result<Vec<UnderlyingCandle>, CandleSourceError> {
420 let mut candles = Vec::with_capacity(snapshots.len());
421
422 for snapshot in snapshots {
423 validate_candle_time_range(snapshot.start_time_ms, snapshot.end_time_ms)
424 .map_err(|e| CandleSourceError::InvalidResponse(e.to_string()))?;
425
426 let open = parse_f64_field(&snapshot.open, "open")?;
427 let high = parse_f64_field(&snapshot.high, "high")?;
428 let low = parse_f64_field(&snapshot.low, "low")?;
429 let close = parse_f64_field(&snapshot.close, "close")?;
430 let volume = parse_f64_field(&snapshot.volume, "volume")?;
431
432 candles.push(UnderlyingCandle {
433 start_time_ms: snapshot.start_time_ms,
434 end_time_ms: snapshot.end_time_ms,
435 open,
436 high,
437 low,
438 close,
439 volume,
440 });
441 }
442
443 candles.sort_by_key(|candle| candle.start_time_ms);
444 Ok(candles)
445}
446
447#[async_trait]
448impl UnderlyingCandleSource for HyperliquidCandleSource {
449 async fn fetch_candles(
450 &self,
451 coin: &str,
452 resolution: CandleResolution,
453 start_time_ms: i64,
454 end_time_ms: i64,
455 ) -> Result<Vec<UnderlyingCandle>, CandleSourceError> {
456 validate_candle_time_range(start_time_ms, end_time_ms)
457 .map_err(|e| CandleSourceError::InvalidResponse(e.to_string()))?;
458
459 let body = HyperliquidCandleSnapshotEnvelope {
460 request_type: "candleSnapshot",
461 req: HyperliquidCandleSnapshotReq {
462 coin: coin.to_string(),
463 interval: resolution.as_str().to_string(),
464 start_time: start_time_ms,
465 end_time: end_time_ms,
466 },
467 };
468
469 let response = self
470 .client
471 .post(&self.info_url)
472 .json(&body)
473 .send()
474 .await
475 .map_err(|e| CandleSourceError::Upstream(e.to_string()))?;
476
477 if !response.status().is_success() {
478 return Err(CandleSourceError::Upstream(format!(
479 "status={} url={}",
480 response.status(),
481 self.info_url
482 )));
483 }
484
485 let snapshots = response
486 .json::<Vec<HyperliquidCandleSnapshot>>()
487 .await
488 .map_err(|e| CandleSourceError::InvalidResponse(e.to_string()))?;
489
490 parse_hyperliquid_snapshots(snapshots)
491 }
492}
493
494pub fn parse_candle_channel(channel: &str) -> Option<(String, CandleResolution)> {
495 let mut parts = channel.split(':');
496 let prefix = parts.next()?;
497 if prefix != "candles" {
498 return None;
499 }
500
501 let underlying = parts.next()?.trim();
502 if underlying.is_empty() {
503 return None;
504 }
505
506 let resolution = parts.next()?.trim();
507 if parts.next().is_some() {
508 return None;
509 }
510
511 let parsed_resolution = CandleResolution::from_str(resolution).ok()?;
512 Some((normalize_underlying(underlying), parsed_resolution))
513}
514
515pub fn candle_ws_poll_interval_ms(pricing: &hypercall_config::PricingConfig) -> u64 {
516 if pricing.candle_ws_poll_interval_ms == 0 {
517 DEFAULT_CANDLE_WS_POLL_INTERVAL_MS
518 } else {
519 pricing.candle_ws_poll_interval_ms
520 }
521}
522
/// Comparable snapshot of a candle update, used by `CandleWsPublisher` to skip
/// re-publishing an unchanged candle. Floats are stored as their raw bit patterns
/// so the type can derive `Eq`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct CandleFingerprint {
    /// Candle start in milliseconds.
    start_time_ms: i64,
    /// Candle end in milliseconds.
    end_time_ms: i64,
    /// `f64::to_bits` of the open price.
    open_bits: u64,
    /// `f64::to_bits` of the high price.
    high_bits: u64,
    /// `f64::to_bits` of the low price.
    low_bits: u64,
    /// `f64::to_bits` of the close price.
    close_bits: u64,
    /// `f64::to_bits` of the volume.
    volume_bits: u64,
}
533
534impl CandleFingerprint {
535 fn from_update(update: &WsCandleUpdate) -> Self {
536 Self {
537 start_time_ms: update.start_time_ms,
538 end_time_ms: update.end_time_ms,
539 open_bits: update.open.to_bits(),
540 high_bits: update.high.to_bits(),
541 low_bits: update.low.to_bits(),
542 close_bits: update.close.to_bits(),
543 volume_bits: update.volume.to_bits(),
544 }
545 }
546}
547
/// Periodically fetches the latest candle for every active candle websocket
/// channel and publishes it when it differs from the last one published.
pub struct CandleWsPublisher {
    /// Pub/sub hub used to list active channels and to publish updates.
    pubsub: Arc<PubSubManager>,
    /// Where candles are fetched from.
    candle_source: Arc<dyn UnderlyingCandleSource>,
    /// Maps a normalized underlying to the coin name passed to the candle source.
    underlying_to_candle_coin: Arc<HashMap<String, String>>,
    /// Time between polling rounds.
    poll_interval: Duration,
    /// Fingerprint of the last update published per channel name.
    last_published: tokio::sync::Mutex<HashMap<String, CandleFingerprint>>,
}
555
556impl CandleWsPublisher {
557 pub fn new(
558 pubsub: Arc<PubSubManager>,
559 candle_source: Arc<dyn UnderlyingCandleSource>,
560 underlying_to_candle_coin: Arc<HashMap<String, String>>,
561 poll_interval: Duration,
562 ) -> Self {
563 Self {
564 pubsub,
565 candle_source,
566 underlying_to_candle_coin,
567 poll_interval,
568 last_published: tokio::sync::Mutex::new(HashMap::new()),
569 }
570 }
571
572 pub async fn run_with_shutdown(&self, mut shutdown_rx: tokio::sync::broadcast::Receiver<()>) {
573 let mut ticker = tokio::time::interval(self.poll_interval);
574 ticker.set_missed_tick_behavior(MissedTickBehavior::Skip);
575
576 loop {
577 tokio::select! {
578 _ = shutdown_rx.recv() => {
579 debug!("CandleWsPublisher received shutdown signal");
580 break;
581 }
582 _ = ticker.tick() => {
583 self.publish_once().await;
584 }
585 }
586 }
587 }
588
589 async fn publish_once(&self) {
590 let channels = self
591 .pubsub
592 .active_channels_with_prefix(CANDLE_WS_CHANNEL_PREFIX)
593 .await;
594
595 let active: HashSet<String> = channels.iter().cloned().collect();
596 {
597 let mut last = self.last_published.lock().await;
598 last.retain(|channel, _| active.contains(channel));
599 }
600
601 for channel in channels {
602 let Some((underlying, resolution)) = parse_candle_channel(&channel) else {
603 counter!("ht_ws_candle_invalid_channel_total").increment(1);
604 debug!(channel, "Skipping invalid candle channel format");
605 continue;
606 };
607
608 let Some(coin) = self.underlying_to_candle_coin.get(&underlying) else {
609 counter!("ht_ws_candle_unknown_underlying_total", "underlying" => underlying.clone()).increment(1);
610 debug!(
611 channel,
612 underlying, "Skipping candle channel with unknown underlying"
613 );
614 continue;
615 };
616
617 let end_time_ms = Utc::now().timestamp_millis();
618 let lookback_ms = resolution.interval_ms().saturating_mul(2);
619 let start_time_ms = end_time_ms.saturating_sub(lookback_ms);
620
621 match self
622 .candle_source
623 .fetch_candles(coin, resolution, start_time_ms, end_time_ms)
624 .await
625 {
626 Ok(candles) => {
627 let Some(latest) = candles
628 .into_iter()
629 .max_by_key(|candle| candle.start_time_ms)
630 else {
631 continue;
632 };
633
634 let update = WsCandleUpdate {
635 underlying: underlying.clone(),
636 resolution,
637 start_time_ms: latest.start_time_ms,
638 end_time_ms: latest.end_time_ms,
639 open: latest.open,
640 high: latest.high,
641 low: latest.low,
642 close: latest.close,
643 volume: latest.volume,
644 };
645
646 let next_fingerprint = CandleFingerprint::from_update(&update);
647 {
648 let last = self.last_published.lock().await;
649 if last.get(&channel) == Some(&next_fingerprint) {
650 continue;
651 }
652 }
653
654 self.pubsub
655 .publish_to_channel(&channel, WsMessage::CandleUpdate(update));
656 self.last_published
657 .lock()
658 .await
659 .insert(channel, next_fingerprint);
660 }
661 Err(e) => {
662 counter!(
663 "ht_ws_candle_fetch_failures_total",
664 "underlying" => underlying,
665 "resolution" => resolution.as_str().to_string()
666 )
667 .increment(1);
668 warn!(channel, error = %e, "Failed to fetch candles for websocket publishing");
669 }
670 }
671 }
672 }
673}
674
// Unit tests for candle validation, bucket alignment, snapshot parsing, channel
// parsing, trade aggregation, and the websocket publisher.
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use tokio::sync::mpsc;

    // Every resolution survives a serde round trip and an `as_str`/`from_str`
    // round trip; an unsupported string ("10m") is rejected.
    #[test]
    fn test_candle_resolution_roundtrip() {
        let all = [
            CandleResolution::OneMinute,
            CandleResolution::FiveMinutes,
            CandleResolution::FifteenMinutes,
            CandleResolution::OneHour,
            CandleResolution::FourHours,
            CandleResolution::OneDay,
        ];

        for resolution in all {
            let json = serde_json::to_string(&resolution).expect("serialize resolution");
            let parsed: CandleResolution = serde_json::from_str(&json).expect("parse resolution");
            assert_eq!(parsed, resolution);
            assert_eq!(
                CandleResolution::from_str(resolution.as_str()).unwrap(),
                resolution
            );
        }

        let invalid = serde_json::from_str::<CandleResolution>("\"10m\"");
        assert!(invalid.is_err());
    }

    // Pins the millisecond length of each resolution.
    #[test]
    fn test_interval_ms_mapping() {
        assert_eq!(CandleResolution::OneMinute.interval_ms(), 60_000);
        assert_eq!(CandleResolution::FiveMinutes.interval_ms(), 300_000);
        assert_eq!(CandleResolution::FifteenMinutes.interval_ms(), 900_000);
        assert_eq!(CandleResolution::OneHour.interval_ms(), 3_600_000);
        assert_eq!(CandleResolution::FourHours.interval_ms(), 14_400_000);
        assert_eq!(CandleResolution::OneDay.interval_ms(), 86_400_000);
    }

    // Negative start, negative end, and an empty range each map to their own error.
    #[test]
    fn test_validate_candle_time_range_rejects_invalid_values() {
        assert_eq!(
            validate_candle_time_range(-1, 100).unwrap_err(),
            CandleValidationError::InvalidStartTimeMs(-1)
        );
        assert_eq!(
            validate_candle_time_range(0, -1).unwrap_err(),
            CandleValidationError::InvalidEndTimeMs(-1)
        );
        assert_eq!(
            validate_candle_time_range(100, 100).unwrap_err(),
            CandleValidationError::EndTimeNotAfterStart {
                start_time_ms: 100,
                end_time_ms: 100
            }
        );
    }

    // A span one millisecond over the maximum is rejected.
    #[test]
    fn test_validate_option_candle_query_time_range_rejects_large_spans() {
        let error = validate_option_candle_query_time_range(0, MAX_OPTION_CANDLE_QUERY_SPAN_MS + 1)
            .unwrap_err();

        assert_eq!(
            error,
            CandleValidationError::OptionQuerySpanTooLarge {
                start_time_ms: 0,
                end_time_ms: MAX_OPTION_CANDLE_QUERY_SPAN_MS + 1,
                max_span_ms: MAX_OPTION_CANDLE_QUERY_SPAN_MS,
            }
        );
    }

    // Unaligned bounds are widened: start rounds down, end rounds up.
    #[test]
    fn test_align_candle_time_range_to_buckets_expands_edges() {
        let aligned =
            align_candle_time_range_to_buckets(61_000, 121_000, CandleResolution::OneMinute)
                .unwrap();

        assert_eq!(aligned, (60_000, 180_000));
    }

    // Bounds already on bucket boundaries are returned unchanged.
    #[test]
    fn test_align_candle_time_range_to_buckets_keeps_aligned_edges() {
        let aligned =
            align_candle_time_range_to_buckets(60_000, 120_000, CandleResolution::OneMinute)
                .unwrap();

        assert_eq!(aligned, (60_000, 120_000));
    }

    // Rounding an end time near `i64::MAX` up reports an overflow error.
    #[test]
    fn test_align_candle_time_range_to_buckets_rejects_overflow() {
        let error =
            align_candle_time_range_to_buckets(60_000, i64::MAX - 1, CandleResolution::OneMinute)
                .unwrap_err();

        assert_eq!(
            error,
            CandleValidationError::BucketAlignmentOverflow {
                end_time_ms: i64::MAX - 1,
                resolution: CandleResolution::OneMinute,
            }
        );
    }

    // String-encoded snapshot fields are converted to the expected `f64` values.
    #[test]
    fn test_parse_hyperliquid_snapshots_numeric_conversion() {
        let snapshots = vec![HyperliquidCandleSnapshot {
            start_time_ms: 1_000,
            end_time_ms: 2_000,
            open: "100.25".to_string(),
            high: "101.50".to_string(),
            low: "99.75".to_string(),
            close: "100.75".to_string(),
            volume: "42.5".to_string(),
        }];

        let candles = parse_hyperliquid_snapshots(snapshots).expect("parse snapshots");
        assert_eq!(candles.len(), 1);
        assert_eq!(candles[0].open, 100.25);
        assert_eq!(candles[0].high, 101.50);
        assert_eq!(candles[0].low, 99.75);
        assert_eq!(candles[0].close, 100.75);
        assert_eq!(candles[0].volume, 42.5);
    }

    // A well-formed channel parses (with the underlying upper-cased); a missing
    // resolution, an unsupported resolution, and a foreign channel do not.
    #[test]
    fn test_parse_candle_channel() {
        let parsed = parse_candle_channel("candles:btc:1m").expect("valid channel");
        assert_eq!(parsed.0, "BTC");
        assert_eq!(parsed.1, CandleResolution::OneMinute);

        assert!(parse_candle_channel("candles:BTC").is_none());
        assert!(parse_candle_channel("candles:BTC:10m").is_none());
        assert!(parse_candle_channel("orderbook").is_none());
    }

    // Test fixture: builds a `Trade` with fixed maker/taker addresses and zero
    // fees; the timestamp doubles as the trade id.
    fn trade(symbol: &str, price: &str, size: &str, timestamp: i64) -> Trade {
        Trade {
            trade_id: timestamp,
            symbol: symbol.to_string(),
            price: price.parse().unwrap(),
            size: size.parse().unwrap(),
            maker_address: [1_u8; 20].into(),
            taker_address: [2_u8; 20].into(),
            maker_fee: Decimal::ZERO,
            taker_fee: Decimal::ZERO,
            timestamp,
            created_at: Utc::now(),
        }
    }

    // Three trades inside one minute collapse into a single candle with the
    // expected OHLC and a summed volume of 6.0.
    #[test]
    fn test_build_option_instrument_candles_single_bucket() {
        let trades = vec![
            trade("BTC-20260331-100000-C", "100.0", "1000000", 60_001),
            trade("BTC-20260331-100000-C", "110.0", "2000000", 60_100),
            trade("BTC-20260331-100000-C", "90.0", "3000000", 60_200),
        ];

        let candles = build_option_instrument_candles(
            "BTC-20260331-100000-C",
            CandleResolution::OneMinute,
            &trades,
        )
        .unwrap();

        assert_eq!(candles.len(), 1);
        assert_eq!(
            candles[0],
            OptionInstrumentCandle {
                start_time_ms: 60_000,
                end_time_ms: 120_000,
                open: 100.0,
                high: 110.0,
                low: 90.0,
                close: 90.0,
                volume: 6.0,
            }
        );
    }

    // Trades on either side of a minute boundary (59_999 vs 60_000) land in
    // separate candles.
    #[test]
    fn test_build_option_instrument_candles_multiple_buckets() {
        let trades = vec![
            trade("BTC-20260331-100000-C", "100.0", "1000000", 1),
            trade("BTC-20260331-100000-C", "101.0", "1000000", 59_999),
            trade("BTC-20260331-100000-C", "102.0", "1000000", 60_000),
            trade("BTC-20260331-100000-C", "103.0", "1000000", 119_000),
        ];

        let candles = build_option_instrument_candles(
            "BTC-20260331-100000-C",
            CandleResolution::OneMinute,
            &trades,
        )
        .unwrap();

        assert_eq!(candles.len(), 2);
        assert_eq!(candles[0].start_time_ms, 0);
        assert_eq!(candles[0].close, 101.0);
        assert_eq!(candles[1].start_time_ms, 60_000);
        assert_eq!(candles[1].open, 102.0);
        assert_eq!(candles[1].close, 103.0);
    }

    // A trade for a different symbol is reported as `InvalidResponse`.
    #[test]
    fn test_build_option_instrument_candles_rejects_symbol_mismatch() {
        let trades = vec![trade("ETH-20260331-100000-C", "100.0", "1000000", 1)];

        let error = build_option_instrument_candles(
            "BTC-20260331-100000-C",
            CandleResolution::OneMinute,
            &trades,
        )
        .unwrap_err();

        assert!(matches!(error, CandleSourceError::InvalidResponse(_)));
    }

    // Candle source stub: counts calls and always returns the same candles.
    #[derive(Clone)]
    struct MockCandleSource {
        calls: Arc<AtomicUsize>,
        candles: Vec<UnderlyingCandle>,
    }

    #[async_trait]
    impl UnderlyingCandleSource for MockCandleSource {
        async fn fetch_candles(
            &self,
            _coin: &str,
            _resolution: CandleResolution,
            _start_time_ms: i64,
            _end_time_ms: i64,
        ) -> Result<Vec<UnderlyingCandle>, CandleSourceError> {
            self.calls.fetch_add(1, Ordering::SeqCst);
            Ok(self.candles.clone())
        }
    }

    // With a source that keeps returning the same candle, several polling rounds
    // must produce exactly one `CandleUpdate` message.
    #[tokio::test]
    async fn test_ws_publisher_dedupes_unchanged_candle_updates() {
        let pubsub = Arc::new(PubSubManager::new());
        let client_id = crate::websocket::uuid::Uuid::new_v4();
        let (tx, mut rx) = mpsc::unbounded_channel::<Arc<WsMessage>>();
        pubsub.add_client(client_id, tx, false, None);
        pubsub
            .subscribe(client_id, "candles:BTC:1m".to_string(), None, None, None)
            .expect("subscribe");

        let calls = Arc::new(AtomicUsize::new(0));
        let source = Arc::new(MockCandleSource {
            calls: calls.clone(),
            candles: vec![UnderlyingCandle {
                start_time_ms: 1_000,
                end_time_ms: 2_000,
                open: 100.0,
                high: 101.0,
                low: 99.0,
                close: 100.5,
                volume: 5.0,
            }],
        });

        let mut mapping = HashMap::new();
        mapping.insert("BTC".to_string(), "BTC".to_string());

        let publisher = CandleWsPublisher::new(
            pubsub.clone(),
            source,
            Arc::new(mapping),
            Duration::from_millis(20),
        );

        let (shutdown_tx, shutdown_rx) = tokio::sync::broadcast::channel(1);
        let handle = tokio::spawn(async move {
            publisher.run_with_shutdown(shutdown_rx).await;
        });

        // Let the 20 ms ticker fire several times before shutting down.
        tokio::time::sleep(Duration::from_millis(120)).await;
        let _ = shutdown_tx.send(());
        handle.await.expect("publisher task");

        let mut candle_messages = 0;
        while let Ok(message) = rx.try_recv() {
            if matches!(message.as_ref(), WsMessage::CandleUpdate(_)) {
                candle_messages += 1;
            }
        }

        assert!(calls.load(Ordering::SeqCst) >= 1);
        assert_eq!(candle_messages, 1, "unchanged candle should publish once");
    }

    // A subscribed channel without a resolution part ("candles:BTC") must never
    // reach the candle source nor produce any message.
    #[tokio::test]
    async fn test_ws_publisher_ignores_invalid_channel_format() {
        let pubsub = Arc::new(PubSubManager::new());
        let client_id = crate::websocket::uuid::Uuid::new_v4();
        let (tx, mut rx) = mpsc::unbounded_channel::<Arc<WsMessage>>();
        pubsub.add_client(client_id, tx, false, None);
        pubsub
            .subscribe(client_id, "candles:BTC".to_string(), None, None, None)
            .expect("subscribe");

        let calls = Arc::new(AtomicUsize::new(0));
        let source = Arc::new(MockCandleSource {
            calls: calls.clone(),
            candles: vec![],
        });

        let publisher = CandleWsPublisher::new(
            pubsub,
            source,
            Arc::new(HashMap::new()),
            Duration::from_millis(20),
        );

        let (shutdown_tx, shutdown_rx) = tokio::sync::broadcast::channel(1);
        let handle = tokio::spawn(async move {
            publisher.run_with_shutdown(shutdown_rx).await;
        });

        tokio::time::sleep(Duration::from_millis(80)).await;
        let _ = shutdown_tx.send(());
        handle.await.expect("publisher task");

        assert_eq!(calls.load(Ordering::SeqCst), 0);
        assert!(rx.try_recv().is_err());
    }
}