Skip to main content

hypercall_api/handlers/
health.rs

1use crate::runtime_status::{StandbyReplayProgress, StartupProgressReader};
2use crate::sonic_json::SonicJson;
3use axum::{extract::State, http::StatusCode, response::IntoResponse};
4use serde_json::json;
5
6use super::AppState;
7
8pub use hypercall_runtime_api::drain::drain_marker_path;
9
10/// Health check endpoint
11#[utoipa::path(
12    get,
13    path = "/health",
14    responses(
15        (status = 200, description = "Health status", body = HealthResponse),
16        (status = 503, description = "Service is shutting down", body = HealthResponse)
17    ),
18    tag = "Health"
19)]
20pub async fn health(State(app_state): State<AppState>) -> impl IntoResponse {
21    let shutting_down = app_state.shutdown.is_triggered();
22    let response = crate::models::HealthResponse {
23        status: if shutting_down {
24            "shutting_down".to_string()
25        } else {
26            "ok".to_string()
27        },
28    };
29
30    if shutting_down {
31        (StatusCode::SERVICE_UNAVAILABLE, SonicJson(response))
32    } else {
33        (StatusCode::OK, SonicJson(response))
34    }
35}
36
37/// Version information endpoint
38#[utoipa::path(
39    get,
40    path = "/version",
41    responses(
42        (status = 200, description = "Version information", body = VersionResponse)
43    ),
44    tag = "Health"
45)]
46pub async fn version(
47    State(app_state): State<AppState>,
48) -> SonicJson<crate::models::VersionResponse> {
49    let build_info = &app_state.build_info;
50    SonicJson(crate::models::VersionResponse {
51        version: build_info.version.clone(),
52        commit: build_info.commit.clone(),
53        git_ref: build_info.git_ref.clone(),
54        build_time: build_info.build_time.clone(),
55        signing_chain_id: Some(app_state.runtime_config.signing_chain_id),
56    })
57}
58
59/// Readiness check endpoint
60///
61/// Returns 200 OK when all components are ready to serve requests,
62/// or 503 Service Unavailable when the service is still starting up.
63#[utoipa::path(
64    get,
65    path = "/ready",
66    responses(
67        (status = 200, description = "Service is ready", body = ReadyResponse),
68        (status = 503, description = "Service is not ready", body = ReadyResponse)
69    ),
70    tag = "Health"
71)]
72pub async fn ready(State(app_state): State<AppState>) -> impl IntoResponse {
73    let components = app_state.readiness.reports();
74    let all_ready = app_state.readiness.all_ready();
75
76    let response = crate::models::ReadyResponse {
77        status: if all_ready {
78            "ready".to_string()
79        } else {
80            "not_ready".to_string()
81        },
82        message: if all_ready {
83            None
84        } else {
85            Some("Service is starting up. Please retry shortly.".to_string())
86        },
87        components,
88    };
89
90    if all_ready {
91        (StatusCode::OK, SonicJson(response))
92    } else {
93        (StatusCode::SERVICE_UNAVAILABLE, SonicJson(response))
94    }
95}
96
97/// Standby readiness — returns 200 when the replay loop has caught up with the primary.
98pub async fn standby_ready(State(app_state): State<AppState>) -> axum::response::Response {
99    let already_promoted = standby_already_promoted(app_state.standby_promote.as_ref());
100    standby_ready_response_with_startup(
101        app_state.standby_progress.as_deref(),
102        already_promoted,
103        app_state.startup_progress.as_deref(),
104    )
105}
106
107pub fn standby_already_promoted(
108    standby_promote: Option<
109        &std::sync::Arc<tokio::sync::Mutex<Option<tokio::sync::oneshot::Sender<()>>>>,
110    >,
111) -> bool {
112    standby_promote.is_none_or(|p| {
113        // If promote_tx has been taken, we're already promoted
114        p.try_lock().is_ok_and(|guard| guard.is_none())
115    })
116}
117
118pub fn standby_ready_response_with_startup(
119    progress: Option<&dyn StandbyReplayProgress>,
120    already_promoted: bool,
121    startup_progress: Option<&dyn StartupProgressReader>,
122) -> axum::response::Response {
123    match progress {
124        None => (
125            StatusCode::NOT_FOUND,
126            SonicJson(with_startup_progress(
127                json!({"status": "not_standby_mode"}),
128                startup_progress,
129            )),
130        )
131            .into_response(),
132        Some(progress) => {
133            let response = with_startup_progress(
134                standby_status_payload(progress, already_promoted),
135                startup_progress,
136            );
137
138            if already_promoted {
139                (StatusCode::CONFLICT, SonicJson(response)).into_response()
140            } else if progress.is_caught_up() {
141                (StatusCode::OK, SonicJson(response)).into_response()
142            } else {
143                (StatusCode::SERVICE_UNAVAILABLE, SonicJson(response)).into_response()
144            }
145        }
146    }
147}
148
149fn with_startup_progress(
150    mut payload: serde_json::Value,
151    startup_progress: Option<&dyn StartupProgressReader>,
152) -> serde_json::Value {
153    let Some(startup_progress) = startup_progress else {
154        return payload;
155    };
156
157    let snapshot = startup_progress.snapshot();
158    payload["startup_phase"] = json!(snapshot.phase);
159    payload["startup_progress_counter"] = json!(snapshot.counter);
160    payload["last_startup_progress_unix_ms"] = json!(snapshot.last_progress_unix_ms);
161    payload["last_startup_progress_age_ms"] = json!(snapshot.last_progress_age_ms);
162    payload
163}
164
165pub fn standby_status_payload(
166    progress: &dyn StandbyReplayProgress,
167    already_promoted: bool,
168) -> serde_json::Value {
169    let caught_up = progress.is_caught_up();
170    let last_replay_unix_ms = progress.last_replay_unix_ms();
171    let last_replay_age_ms = last_replay_unix_ms.map(|ts| {
172        let now_ms = std::time::SystemTime::now()
173            .duration_since(std::time::UNIX_EPOCH)
174            .expect("system clock before UNIX_EPOCH")
175            .as_millis() as u64;
176        now_ms.saturating_sub(ts)
177    });
178
179    json!({
180        "status": if already_promoted {
181            "already_promoted"
182        } else if caught_up {
183            "standby_ready"
184        } else {
185            "catching_up"
186        },
187        "commands_replayed": progress.commands_replayed(),
188        "caught_up": caught_up,
189        "promotable": caught_up && !already_promoted,
190        "replay_cursor_seq": progress.replay_cursor_seq(),
191        "latest_stream_seq": progress.latest_stream_seq(),
192        "stream_lag": progress.stream_lag(),
193        "last_replayed_seq": progress.last_replayed_seq(),
194        "last_replay_unix_ms": last_replay_unix_ms,
195        "last_replay_age_ms": last_replay_age_ms,
196    })
197}
198
#[cfg(test)]
mod tests {
    use super::standby_status_payload;
    use crate::runtime_status::StandbyReplayProgress;

    /// Plain-data `StandbyReplayProgress` implementation used as a test double.
    #[derive(Default)]
    struct TestStandbyReplayProgress {
        commands_replayed: u64,
        caught_up: bool,
        replay_cursor_seq: Option<i64>,
        latest_stream_seq: Option<u64>,
        stream_lag: Option<u64>,
        last_replayed_seq: Option<i64>,
        last_replay_unix_ms: Option<u64>,
    }

    /// Wall-clock time in milliseconds since the Unix epoch.
    fn unix_now_ms() -> u64 {
        std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .expect("system clock before UNIX_EPOCH")
            .as_millis() as u64
    }

    impl TestStandbyReplayProgress {
        /// Fixture: one command replayed at `stream_seq`, caught-up flag set.
        fn caught_up_at(stream_seq: i64) -> Self {
            let mut progress = Self::default();
            progress.record_replayed(stream_seq);
            progress.set_caught_up();
            progress
        }

        /// Simulates replaying a single command at `stream_seq` with zero lag.
        fn record_replayed(&mut self, stream_seq: i64) {
            let replayed_at = unix_now_ms();
            self.commands_replayed += 1;
            self.replay_cursor_seq = Some(stream_seq);
            self.latest_stream_seq = Some(stream_seq as u64);
            self.stream_lag = Some(0);
            self.last_replayed_seq = Some(stream_seq);
            self.last_replay_unix_ms = Some(replayed_at);
        }

        fn set_caught_up(&mut self) {
            self.caught_up = true;
        }
    }

    impl StandbyReplayProgress for TestStandbyReplayProgress {
        fn commands_replayed(&self) -> u64 {
            self.commands_replayed
        }

        fn is_caught_up(&self) -> bool {
            self.caught_up
        }

        fn replay_cursor_seq(&self) -> Option<i64> {
            self.replay_cursor_seq
        }

        fn latest_stream_seq(&self) -> Option<u64> {
            self.latest_stream_seq
        }

        fn stream_lag(&self) -> Option<u64> {
            self.stream_lag
        }

        fn last_replayed_seq(&self) -> Option<i64> {
            self.last_replayed_seq
        }

        fn last_replay_unix_ms(&self) -> Option<u64> {
            self.last_replay_unix_ms
        }
    }

    #[test]
    fn standby_status_payload_reports_catching_up_progress() {
        let progress = TestStandbyReplayProgress::default();

        let payload = standby_status_payload(&progress, false);

        assert_eq!(payload["status"], "catching_up");
        assert_eq!(payload["commands_replayed"], 0);
        assert_eq!(payload["caught_up"], false);
        assert_eq!(payload["promotable"], false);

        // Nothing has been replayed yet, so every optional field is null.
        for key in [
            "replay_cursor_seq",
            "latest_stream_seq",
            "stream_lag",
            "last_replayed_seq",
            "last_replay_unix_ms",
            "last_replay_age_ms",
        ] {
            assert!(payload[key].is_null(), "{key} should be null");
        }
    }

    #[test]
    fn standby_status_payload_reports_recent_replay_metadata() {
        let progress = TestStandbyReplayProgress::caught_up_at(42);

        let payload = standby_status_payload(&progress, false);

        assert_eq!(payload["status"], "standby_ready");
        assert_eq!(payload["commands_replayed"], 1);
        assert_eq!(payload["caught_up"], true);
        assert_eq!(payload["promotable"], true);
        assert_eq!(payload["replay_cursor_seq"], 42);
        assert_eq!(payload["latest_stream_seq"], 42);
        assert_eq!(payload["stream_lag"], 0);
        assert_eq!(payload["last_replayed_seq"], 42);
        assert!(payload["last_replay_unix_ms"].as_u64().is_some());
        assert!(payload["last_replay_age_ms"].as_u64().is_some());
    }

    #[test]
    fn standby_status_payload_reports_already_promoted() {
        let progress = TestStandbyReplayProgress::caught_up_at(99);

        let payload = standby_status_payload(&progress, true);

        assert_eq!(payload["status"], "already_promoted");
        assert_eq!(payload["promotable"], false);
    }
}
315
316/// Public exchange configuration for frontend integration.
317#[utoipa::path(
318    get,
319    path = "/exchange-info",
320    responses(
321        (status = 200, description = "Exchange configuration", body = ExchangeInfoResponse)
322    ),
323    tag = "Health"
324)]
325pub async fn exchange_info(
326    State(app_state): State<AppState>,
327) -> SonicJson<crate::models::ExchangeInfoResponse> {
328    SonicJson(crate::models::ExchangeInfoResponse {
329        exchange_address: app_state.runtime_config.exchange_contract_address.clone(),
330        chain_id: app_state.runtime_config.signing_chain_id,
331        signing_domain: crate::models::SigningDomainInfo {
332            name: "Hypercall".to_string(),
333            version: "1".to_string(),
334        },
335    })
336}