1use crate::runtime_status::{StandbyReplayProgress, StartupProgressReader};
2use crate::sonic_json::SonicJson;
3use axum::{extract::State, http::StatusCode, response::IntoResponse};
4use serde_json::json;
5
6use super::AppState;
7
8pub use hypercall_runtime_api::drain::drain_marker_path;
9
10#[utoipa::path(
12 get,
13 path = "/health",
14 responses(
15 (status = 200, description = "Health status", body = HealthResponse),
16 (status = 503, description = "Service is shutting down", body = HealthResponse)
17 ),
18 tag = "Health"
19)]
20pub async fn health(State(app_state): State<AppState>) -> impl IntoResponse {
21 let shutting_down = app_state.shutdown.is_triggered();
22 let response = crate::models::HealthResponse {
23 status: if shutting_down {
24 "shutting_down".to_string()
25 } else {
26 "ok".to_string()
27 },
28 };
29
30 if shutting_down {
31 (StatusCode::SERVICE_UNAVAILABLE, SonicJson(response))
32 } else {
33 (StatusCode::OK, SonicJson(response))
34 }
35}
36
37#[utoipa::path(
39 get,
40 path = "/version",
41 responses(
42 (status = 200, description = "Version information", body = VersionResponse)
43 ),
44 tag = "Health"
45)]
46pub async fn version(
47 State(app_state): State<AppState>,
48) -> SonicJson<crate::models::VersionResponse> {
49 let build_info = &app_state.build_info;
50 SonicJson(crate::models::VersionResponse {
51 version: build_info.version.clone(),
52 commit: build_info.commit.clone(),
53 git_ref: build_info.git_ref.clone(),
54 build_time: build_info.build_time.clone(),
55 signing_chain_id: Some(app_state.runtime_config.signing_chain_id),
56 })
57}
58
59#[utoipa::path(
64 get,
65 path = "/ready",
66 responses(
67 (status = 200, description = "Service is ready", body = ReadyResponse),
68 (status = 503, description = "Service is not ready", body = ReadyResponse)
69 ),
70 tag = "Health"
71)]
72pub async fn ready(State(app_state): State<AppState>) -> impl IntoResponse {
73 let components = app_state.readiness.reports();
74 let all_ready = app_state.readiness.all_ready();
75
76 let response = crate::models::ReadyResponse {
77 status: if all_ready {
78 "ready".to_string()
79 } else {
80 "not_ready".to_string()
81 },
82 message: if all_ready {
83 None
84 } else {
85 Some("Service is starting up. Please retry shortly.".to_string())
86 },
87 components,
88 };
89
90 if all_ready {
91 (StatusCode::OK, SonicJson(response))
92 } else {
93 (StatusCode::SERVICE_UNAVAILABLE, SonicJson(response))
94 }
95}
96
97pub async fn standby_ready(State(app_state): State<AppState>) -> axum::response::Response {
99 let already_promoted = standby_already_promoted(app_state.standby_promote.as_ref());
100 standby_ready_response_with_startup(
101 app_state.standby_progress.as_deref(),
102 already_promoted,
103 app_state.startup_progress.as_deref(),
104 )
105}
106
107pub fn standby_already_promoted(
108 standby_promote: Option<
109 &std::sync::Arc<tokio::sync::Mutex<Option<tokio::sync::oneshot::Sender<()>>>>,
110 >,
111) -> bool {
112 standby_promote.is_none_or(|p| {
113 p.try_lock().is_ok_and(|guard| guard.is_none())
115 })
116}
117
118pub fn standby_ready_response_with_startup(
119 progress: Option<&dyn StandbyReplayProgress>,
120 already_promoted: bool,
121 startup_progress: Option<&dyn StartupProgressReader>,
122) -> axum::response::Response {
123 match progress {
124 None => (
125 StatusCode::NOT_FOUND,
126 SonicJson(with_startup_progress(
127 json!({"status": "not_standby_mode"}),
128 startup_progress,
129 )),
130 )
131 .into_response(),
132 Some(progress) => {
133 let response = with_startup_progress(
134 standby_status_payload(progress, already_promoted),
135 startup_progress,
136 );
137
138 if already_promoted {
139 (StatusCode::CONFLICT, SonicJson(response)).into_response()
140 } else if progress.is_caught_up() {
141 (StatusCode::OK, SonicJson(response)).into_response()
142 } else {
143 (StatusCode::SERVICE_UNAVAILABLE, SonicJson(response)).into_response()
144 }
145 }
146 }
147}
148
149fn with_startup_progress(
150 mut payload: serde_json::Value,
151 startup_progress: Option<&dyn StartupProgressReader>,
152) -> serde_json::Value {
153 let Some(startup_progress) = startup_progress else {
154 return payload;
155 };
156
157 let snapshot = startup_progress.snapshot();
158 payload["startup_phase"] = json!(snapshot.phase);
159 payload["startup_progress_counter"] = json!(snapshot.counter);
160 payload["last_startup_progress_unix_ms"] = json!(snapshot.last_progress_unix_ms);
161 payload["last_startup_progress_age_ms"] = json!(snapshot.last_progress_age_ms);
162 payload
163}
164
165pub fn standby_status_payload(
166 progress: &dyn StandbyReplayProgress,
167 already_promoted: bool,
168) -> serde_json::Value {
169 let caught_up = progress.is_caught_up();
170 let last_replay_unix_ms = progress.last_replay_unix_ms();
171 let last_replay_age_ms = last_replay_unix_ms.map(|ts| {
172 let now_ms = std::time::SystemTime::now()
173 .duration_since(std::time::UNIX_EPOCH)
174 .expect("system clock before UNIX_EPOCH")
175 .as_millis() as u64;
176 now_ms.saturating_sub(ts)
177 });
178
179 json!({
180 "status": if already_promoted {
181 "already_promoted"
182 } else if caught_up {
183 "standby_ready"
184 } else {
185 "catching_up"
186 },
187 "commands_replayed": progress.commands_replayed(),
188 "caught_up": caught_up,
189 "promotable": caught_up && !already_promoted,
190 "replay_cursor_seq": progress.replay_cursor_seq(),
191 "latest_stream_seq": progress.latest_stream_seq(),
192 "stream_lag": progress.stream_lag(),
193 "last_replayed_seq": progress.last_replayed_seq(),
194 "last_replay_unix_ms": last_replay_unix_ms,
195 "last_replay_age_ms": last_replay_age_ms,
196 })
197}
198
#[cfg(test)]
mod tests {
    use super::standby_status_payload;
    use crate::runtime_status::StandbyReplayProgress;

    /// In-memory replay tracker for tests; each trait getter returns the
    /// field of the same name.
    #[derive(Default)]
    struct TestStandbyReplayProgress {
        commands_replayed: u64,
        caught_up: bool,
        replay_cursor_seq: Option<i64>,
        latest_stream_seq: Option<u64>,
        stream_lag: Option<u64>,
        last_replayed_seq: Option<i64>,
        last_replay_unix_ms: Option<u64>,
    }

    impl TestStandbyReplayProgress {
        /// A tracker that has replayed exactly one command at `stream_seq`
        /// just now, has no lag, and is caught up.
        fn caught_up_after_replaying(stream_seq: i64) -> Self {
            let now_unix_ms = std::time::SystemTime::now()
                .duration_since(std::time::UNIX_EPOCH)
                .expect("system clock before UNIX_EPOCH")
                .as_millis() as u64;

            Self {
                commands_replayed: 1,
                caught_up: true,
                replay_cursor_seq: Some(stream_seq),
                latest_stream_seq: Some(stream_seq as u64),
                stream_lag: Some(0),
                last_replayed_seq: Some(stream_seq),
                last_replay_unix_ms: Some(now_unix_ms),
            }
        }
    }

    impl StandbyReplayProgress for TestStandbyReplayProgress {
        fn commands_replayed(&self) -> u64 {
            self.commands_replayed
        }

        fn is_caught_up(&self) -> bool {
            self.caught_up
        }

        fn replay_cursor_seq(&self) -> Option<i64> {
            self.replay_cursor_seq
        }

        fn latest_stream_seq(&self) -> Option<u64> {
            self.latest_stream_seq
        }

        fn stream_lag(&self) -> Option<u64> {
            self.stream_lag
        }

        fn last_replayed_seq(&self) -> Option<i64> {
            self.last_replayed_seq
        }

        fn last_replay_unix_ms(&self) -> Option<u64> {
            self.last_replay_unix_ms
        }
    }

    /// A fresh tracker reports "catching_up" with every optional field null.
    #[test]
    fn standby_status_payload_reports_catching_up_progress() {
        let idle = TestStandbyReplayProgress::default();

        let payload = standby_status_payload(&idle, false);

        assert_eq!(payload["status"], "catching_up");
        assert_eq!(payload["commands_replayed"], 0);
        assert_eq!(payload["caught_up"], false);
        assert_eq!(payload["promotable"], false);

        for key in [
            "replay_cursor_seq",
            "latest_stream_seq",
            "stream_lag",
            "last_replayed_seq",
            "last_replay_unix_ms",
            "last_replay_age_ms",
        ] {
            assert!(payload[key].is_null(), "{key} should be null");
        }
    }

    /// A caught-up tracker is promotable and exposes its replay metadata.
    #[test]
    fn standby_status_payload_reports_recent_replay_metadata() {
        let tracker = TestStandbyReplayProgress::caught_up_after_replaying(42);

        let payload = standby_status_payload(&tracker, false);

        assert_eq!(payload["status"], "standby_ready");
        assert_eq!(payload["commands_replayed"], 1);
        assert_eq!(payload["caught_up"], true);
        assert_eq!(payload["promotable"], true);
        assert_eq!(payload["replay_cursor_seq"], 42);
        assert_eq!(payload["latest_stream_seq"], 42);
        assert_eq!(payload["stream_lag"], 0);
        assert_eq!(payload["last_replayed_seq"], 42);
        assert!(payload["last_replay_unix_ms"].as_u64().is_some());
        assert!(payload["last_replay_age_ms"].as_u64().is_some());
    }

    /// Promotion overrides the caught-up state and clears `promotable`.
    #[test]
    fn standby_status_payload_reports_already_promoted() {
        let tracker = TestStandbyReplayProgress::caught_up_after_replaying(99);

        let payload = standby_status_payload(&tracker, true);

        assert_eq!(payload["status"], "already_promoted");
        assert_eq!(payload["promotable"], false);
    }
}
315
316#[utoipa::path(
318 get,
319 path = "/exchange-info",
320 responses(
321 (status = 200, description = "Exchange configuration", body = ExchangeInfoResponse)
322 ),
323 tag = "Health"
324)]
325pub async fn exchange_info(
326 State(app_state): State<AppState>,
327) -> SonicJson<crate::models::ExchangeInfoResponse> {
328 SonicJson(crate::models::ExchangeInfoResponse {
329 exchange_address: app_state.runtime_config.exchange_contract_address.clone(),
330 chain_id: app_state.runtime_config.signing_chain_id,
331 signing_domain: crate::models::SigningDomainInfo {
332 name: "Hypercall".to_string(),
333 version: "1".to_string(),
334 },
335 })
336}