1use std::collections::HashMap;
2use std::sync::Arc;
3
4use axum::{
5 extract::{FromRef, Query, State},
6 http::StatusCode,
7 Json,
8};
9use serde::{Deserialize, Serialize};
10use utoipa::{IntoParams, ToSchema};
11
12use super::AppState;
13use crate::candles::{
14 resolve_candle_coin, validate_candle_time_range, CandleResolution, UnderlyingCandleSource,
15 UnderlyingCandlesResponse,
16};
17use crate::models::ApiResponse;
18
19#[derive(Clone)]
20pub struct CandleHandlerState {
21 pub candle_source: Arc<dyn UnderlyingCandleSource>,
22 pub underlying_to_candle_coin: Arc<HashMap<String, String>>,
23}
24
25impl FromRef<AppState> for CandleHandlerState {
26 fn from_ref(input: &AppState) -> Self {
27 Self {
28 candle_source: input.candle_source.clone(),
29 underlying_to_candle_coin: input.underlying_to_candle_coin.clone(),
30 }
31 }
32}
33
34#[derive(Debug, Deserialize, IntoParams)]
38pub struct CandlesQuery {
39 #[param(example = "BTC")]
41 pub underlying: Option<String>,
42 #[param(example = "BTC-20260331-100000-C")]
44 pub instrument_name: Option<String>,
45 #[param(example = "1m")]
47 pub resolution: CandleResolution,
48 #[param(example = 1710000000000_i64)]
50 pub start_time_ms: i64,
51 #[param(example = 1710003600000_i64)]
53 pub end_time_ms: i64,
54}
55
56#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)]
57#[serde(untagged)]
58pub enum CandlesResponse {
59 Underlying(UnderlyingCandlesResponse),
60}
61
62#[derive(Debug, Serialize, ToSchema)]
63pub struct CandlesApiResponse {
64 pub success: bool,
65 pub data: Option<CandlesResponse>,
66 pub error: Option<String>,
67}
68
69fn error_response<T>(
70 status: StatusCode,
71 message: impl Into<String>,
72) -> (StatusCode, Json<ApiResponse<T>>) {
73 (status, Json(ApiResponse::error(message.into())))
74}
75
76fn parse_underlying(params: &CandlesQuery) -> std::result::Result<String, String> {
77 if params
78 .instrument_name
79 .as_ref()
80 .map(|value| !value.trim().is_empty())
81 .unwrap_or(false)
82 {
83 return Err(
84 "Option candles are no longer served from /candles; use /historical-theos".to_string(),
85 );
86 }
87
88 params
89 .underlying
90 .as_ref()
91 .map(|value| value.trim())
92 .filter(|value| !value.is_empty())
93 .map(|value| value.to_string())
94 .ok_or_else(|| "underlying is required".to_string())
95}
96
97#[utoipa::path(
99 get,
100 path = "/candles",
101 params(CandlesQuery),
102 responses(
103 (status = 200, description = "Historical underlying candles", body = CandlesApiResponse),
104 (status = 400, description = "Invalid query parameters"),
105 (status = 502, description = "Upstream candle source failure")
106 ),
107 tag = "Markets"
108)]
109pub async fn get_candles(
110 State(state): State<CandleHandlerState>,
111 Query(params): Query<CandlesQuery>,
112) -> Result<Json<ApiResponse<CandlesResponse>>, (StatusCode, Json<ApiResponse<CandlesResponse>>)> {
113 validate_candle_time_range(params.start_time_ms, params.end_time_ms)
114 .map_err(|error| error_response(StatusCode::BAD_REQUEST, error.to_string()))?;
115
116 let underlying_param = parse_underlying(¶ms)
117 .map_err(|error| error_response(StatusCode::BAD_REQUEST, error))?;
118
119 let (underlying, candle_coin) =
120 resolve_candle_coin(&underlying_param, &state.underlying_to_candle_coin)
121 .map_err(|error| error_response(StatusCode::BAD_REQUEST, error.to_string()))?;
122
123 let mut candles = state
124 .candle_source
125 .fetch_candles(
126 &candle_coin,
127 params.resolution,
128 params.start_time_ms,
129 params.end_time_ms,
130 )
131 .await
132 .map_err(|error| {
133 tracing::error!(
134 underlying,
135 coin = candle_coin,
136 resolution = %params.resolution,
137 start_time_ms = params.start_time_ms,
138 end_time_ms = params.end_time_ms,
139 error = %error,
140 "Failed to fetch underlying candles"
141 );
142 error_response(
143 StatusCode::BAD_GATEWAY,
144 format!("Failed to fetch candles: {}", error),
145 )
146 })?;
147
148 candles.sort_by_key(|candle| candle.start_time_ms);
149
150 Ok(Json(ApiResponse::success(CandlesResponse::Underlying(
151 UnderlyingCandlesResponse {
152 underlying,
153 resolution: params.resolution,
154 candles,
155 },
156 ))))
157}
158
#[cfg(test)]
mod tests {
    use super::*;
    use crate::candles::{CandleSourceError, UnderlyingCandle};
    use async_trait::async_trait;
    use axum::{body::Body, http::Request, routing::get, Router};
    use tower::ServiceExt;

    /// Candle source stub: returns a clone of `candles`, or an upstream error
    /// when `fail` is set. All request arguments are ignored.
    #[derive(Clone)]
    struct MockCandleSource {
        candles: Vec<UnderlyingCandle>,
        fail: bool,
    }

    #[async_trait]
    impl UnderlyingCandleSource for MockCandleSource {
        async fn fetch_candles(
            &self,
            _coin: &str,
            _resolution: CandleResolution,
            _start_time_ms: i64,
            _end_time_ms: i64,
        ) -> Result<Vec<UnderlyingCandle>, CandleSourceError> {
            if self.fail {
                return Err(CandleSourceError::Upstream("boom".to_string()));
            }
            Ok(self.candles.clone())
        }
    }

    /// Mounts `get_candles` on `/candles` with a mapping that only knows `BTC`.
    fn build_router(source: Arc<dyn UnderlyingCandleSource>) -> Router {
        let mut mapping = HashMap::new();
        mapping.insert("BTC".to_string(), "BTC".to_string());

        let state = CandleHandlerState {
            candle_source: source,
            underlying_to_candle_coin: Arc::new(mapping),
        };

        Router::new()
            .route("/candles", get(get_candles))
            .with_state(state)
    }

    /// Router backed by a [`MockCandleSource`] configured with the given data.
    fn router_with(candles: Vec<UnderlyingCandle>, fail: bool) -> Router {
        build_router(Arc::new(MockCandleSource { candles, fail }))
    }

    /// Sends a body-less GET for `uri` through the router and returns the response.
    async fn send_get(app: Router, uri: &str) -> axum::response::Response {
        app.oneshot(Request::builder().uri(uri).body(Body::empty()).unwrap())
            .await
            .unwrap()
    }

    /// Reads the whole response body and parses it as JSON.
    async fn read_json(response: axum::response::Response) -> serde_json::Value {
        let body = axum::body::to_bytes(response.into_body(), usize::MAX)
            .await
            .unwrap();
        serde_json::from_slice(&body).unwrap()
    }

    #[tokio::test]
    async fn test_get_underlying_candles_success_orders_results() {
        // Deliberately out of order: the handler is expected to sort them.
        let app = router_with(
            vec![
                UnderlyingCandle {
                    start_time_ms: 200,
                    end_time_ms: 300,
                    open: 2.0,
                    high: 3.0,
                    low: 1.0,
                    close: 2.5,
                    volume: 20.0,
                },
                UnderlyingCandle {
                    start_time_ms: 100,
                    end_time_ms: 200,
                    open: 1.0,
                    high: 2.0,
                    low: 0.5,
                    close: 1.5,
                    volume: 10.0,
                },
            ],
            false,
        );

        let response = send_get(
            app,
            "/candles?underlying=btc&resolution=1m&start_time_ms=1&end_time_ms=999",
        )
        .await;

        assert_eq!(response.status(), StatusCode::OK);
        let json = read_json(response).await;

        assert_eq!(json["success"], true);
        assert_eq!(json["data"]["underlying"], "BTC");
        assert_eq!(json["data"]["resolution"], "1m");

        let candles = json["data"]["candles"].as_array().unwrap();
        assert_eq!(candles.len(), 2);
        assert_eq!(candles[0]["start_time_ms"], 100);
        assert_eq!(candles[1]["start_time_ms"], 200);
    }

    #[tokio::test]
    async fn test_get_candles_option_selector_rejected() {
        let app = router_with(vec![], false);

        let response = send_get(
            app,
            "/candles?instrument_name=BTC-20260331-100000-C&resolution=1m&start_time_ms=1&end_time_ms=999",
        )
        .await;

        assert_eq!(response.status(), StatusCode::BAD_REQUEST);
        let json = read_json(response).await;
        assert_eq!(
            json["error"],
            "Option candles are no longer served from /candles; use /historical-theos"
        );
    }

    #[tokio::test]
    async fn test_get_candles_missing_underlying_returns_400() {
        let app = router_with(vec![], false);

        let response = send_get(app, "/candles?resolution=1m&start_time_ms=1&end_time_ms=2").await;

        assert_eq!(response.status(), StatusCode::BAD_REQUEST);
        // Pin the message too, so a 400 raised by some other validation step
        // cannot make this test pass by accident.
        let json = read_json(response).await;
        assert_eq!(json["error"], "underlying is required");
    }

    #[tokio::test]
    async fn test_get_candles_unknown_underlying_returns_400() {
        let app = router_with(vec![], false);

        let response = send_get(
            app,
            "/candles?underlying=SOL&resolution=1m&start_time_ms=1&end_time_ms=2",
        )
        .await;

        assert_eq!(response.status(), StatusCode::BAD_REQUEST);
    }

    #[tokio::test]
    async fn test_get_candles_upstream_failure_returns_502() {
        let app = router_with(vec![], true);

        let response = send_get(
            app,
            "/candles?underlying=BTC&resolution=1m&start_time_ms=1&end_time_ms=2",
        )
        .await;

        assert_eq!(response.status(), StatusCode::BAD_GATEWAY);
    }
}