Skip to main content

hypercall_admin/
lifecycle.rs

1//! Standby promotion + drain lifecycle admin endpoints.
2//!
3//! These route handlers were carved out of `handlers::health`. The drain
4//! marker / engine-quiesce helpers are kept local here so the admin surface
5//! depends only on stable lower crates.
6
7use axum::{extract::State, http::StatusCode, response::IntoResponse};
8use serde_json::json;
9
10use crate::state::AdminState;
11use hypercall_runtime_api::drain::{
12    drain_marker_path, keep_drain_after_accepted_quiesce_failure, persist_drain_marker,
13};
14use hypercall_runtime_api::sonic_json::SonicJson;
15use hypercall_runtime_api::StandbyPromoteOutcome;
16
17async fn send_engine_quiesce_action(
18    state: &AdminState,
19    action: hypercall_runtime_api::EngineQuiesceAction,
20) -> Result<hypercall_runtime_api::EngineQuiesceReport, &'static str> {
21    let (tx, rx) = tokio::sync::oneshot::channel();
22    state
23        .engine_quiesce_sender
24        .send(hypercall_runtime_api::EngineQuiesceRequest {
25            action,
26            response_tx: tx,
27        })
28        .await
29        .map_err(|_| "engine quiesce channel is closed")?;
30    tokio::time::timeout(std::time::Duration::from_secs(10), rx)
31        .await
32        .map_err(|_| "engine quiesce timed out")?
33        .map_err(|_| "engine quiesce response channel closed")
34}
35
36async fn resume_after_failed_drain_marker(
37    state: &AdminState,
38) -> Result<hypercall_runtime_api::EngineQuiesceReport, &'static str> {
39    let report =
40        send_engine_quiesce_action(state, hypercall_runtime_api::EngineQuiesceAction::Resume)
41            .await?;
42    state
43        .is_draining
44        .store(false, std::sync::atomic::Ordering::SeqCst);
45    Ok(report)
46}
47
48/// Promote standby to active — stops the replay loop and starts the engine.
49pub async fn admin_promote(State(app_state): State<AdminState>) -> impl IntoResponse {
50    let promote = match &app_state.standby_promote {
51        None => {
52            return (
53                StatusCode::NOT_FOUND,
54                SonicJson(json!({"status": "not_standby_mode"})),
55            )
56        }
57        Some(p) => p,
58    };
59
60    if let Some(controller) = &app_state.standby_controller {
61        match controller.promote().await {
62            StandbyPromoteOutcome::Promoted { queued_orders } => {
63                tracing::info!(queued_orders, "Standby controller promoted");
64            }
65            StandbyPromoteOutcome::AlreadyActive => {
66                tracing::info!("Standby controller was already promoted");
67            }
68        }
69    }
70
71    let mut guard = promote.lock().await;
72    match guard.take() {
73        Some(tx) => {
74            tx.send(()).ok();
75            (
76                StatusCode::OK,
77                SonicJson(json!({
78                    "status": "promoted",
79                    "message": "Engine promoted to active mode",
80                })),
81            )
82        }
83        None => (
84            StatusCode::CONFLICT,
85            SonicJson(json!({"status": "already_promoted"})),
86        ),
87    }
88}
89
90pub async fn admin_drain(State(app_state): State<AdminState>) -> impl IntoResponse {
91    app_state
92        .is_draining
93        .store(true, std::sync::atomic::Ordering::SeqCst);
94    app_state.drain_signal.notify_waiters();
95
96    let marker_path = drain_marker_path(&app_state.runtime_config.wal_path);
97
98    // Wait for in-flight orders to finish processing.
99    // The order_sender channel capacity tells us whether there are pending requests:
100    // when capacity == max_capacity, no sends are buffered and the engine has consumed
101    // everything queued before the drain flag was set.
102    // Poll up to 5 seconds (50 * 100ms) for the channel to empty.
103    let max_cap = app_state.order_sender.max_capacity();
104    for _ in 0..50 {
105        if app_state.order_sender.capacity() == max_cap {
106            break;
107        }
108        tokio::time::sleep(std::time::Duration::from_millis(100)).await;
109    }
110
111    let (tx, rx) = tokio::sync::oneshot::channel();
112    if app_state
113        .engine_quiesce_sender
114        .send(hypercall_runtime_api::EngineQuiesceRequest {
115            action: hypercall_runtime_api::EngineQuiesceAction::Quiesce,
116            response_tx: tx,
117        })
118        .await
119        .is_err()
120    {
121        app_state
122            .is_draining
123            .store(false, std::sync::atomic::Ordering::SeqCst);
124        return (
125            StatusCode::SERVICE_UNAVAILABLE,
126            SonicJson(json!({
127                "status": "error",
128                "message": "engine quiesce channel is closed",
129            })),
130        )
131            .into_response();
132    }
133
134    let report = match tokio::time::timeout(std::time::Duration::from_secs(10), rx).await {
135        Ok(Ok(report)) => report,
136        Ok(Err(_)) => {
137            return keep_drain_after_accepted_quiesce_failure(
138                &app_state.is_draining,
139                &marker_path,
140                StatusCode::SERVICE_UNAVAILABLE,
141                "engine quiesce response channel closed after request was accepted",
142            )
143            .await;
144        }
145        Err(_) => {
146            return keep_drain_after_accepted_quiesce_failure(
147                &app_state.is_draining,
148                &marker_path,
149                StatusCode::GATEWAY_TIMEOUT,
150                "engine quiesce timed out after request was accepted",
151            )
152            .await;
153        }
154    };
155
156    if let Some(parent) = marker_path.parent() {
157        if let Err(error) = tokio::fs::create_dir_all(parent).await {
158            return match resume_after_failed_drain_marker(&app_state).await {
159                Ok(resume_report) => (
160                    StatusCode::INTERNAL_SERVER_ERROR,
161                    SonicJson(json!({
162                        "status": "error",
163                        "message": format!("failed to create drain marker directory: {error}"),
164                        "rollback": "engine resumed and drain flag cleared",
165                        "engine": resume_report,
166                    })),
167                )
168                    .into_response(),
169                Err(resume_error) => (
170                    StatusCode::INTERNAL_SERVER_ERROR,
171                    SonicJson(json!({
172                        "status": "error",
173                        "message": format!("failed to create drain marker directory: {error}"),
174                        "rollback_error": resume_error,
175                        "draining": true,
176                    })),
177                )
178                    .into_response(),
179            };
180        }
181    }
182    if let Err(error) = tokio::fs::write(&marker_path, b"drained\n").await {
183        return match resume_after_failed_drain_marker(&app_state).await {
184            Ok(resume_report) => (
185                StatusCode::INTERNAL_SERVER_ERROR,
186                SonicJson(json!({
187                    "status": "error",
188                    "message": format!("failed to persist drain marker: {error}"),
189                    "rollback": "engine resumed and drain flag cleared",
190                    "engine": resume_report,
191                })),
192            )
193                .into_response(),
194            Err(resume_error) => (
195                StatusCode::INTERNAL_SERVER_ERROR,
196                SonicJson(json!({
197                    "status": "error",
198                    "message": format!("failed to persist drain marker: {error}"),
199                    "rollback_error": resume_error,
200                    "draining": true,
201                })),
202            )
203                .into_response(),
204        };
205    }
206
207    (
208        StatusCode::OK,
209        SonicJson(json!({
210            "status": "drained",
211            "message": "Server drained. No new orders will be accepted. Safe to switch traffic.",
212            "engine": report,
213            "drain_marker": marker_path,
214        })),
215    )
216        .into_response()
217}
218
219pub async fn admin_undrain(State(app_state): State<AdminState>) -> impl IntoResponse {
220    let marker_path = drain_marker_path(&app_state.runtime_config.wal_path);
221    let mut removed_marker = false;
222    if let Err(error) = tokio::fs::remove_file(&marker_path).await {
223        if error.kind() != std::io::ErrorKind::NotFound {
224            return (
225                StatusCode::INTERNAL_SERVER_ERROR,
226                SonicJson(json!({
227                    "status": "error",
228                    "message": format!("failed to remove drain marker: {error}"),
229                })),
230            )
231                .into_response();
232        }
233    } else {
234        removed_marker = true;
235    }
236
237    let (tx, rx) = tokio::sync::oneshot::channel();
238    if app_state
239        .engine_quiesce_sender
240        .send(hypercall_runtime_api::EngineQuiesceRequest {
241            action: hypercall_runtime_api::EngineQuiesceAction::Resume,
242            response_tx: tx,
243        })
244        .await
245        .is_err()
246    {
247        let marker_restore_error = if removed_marker {
248            persist_drain_marker(&marker_path).await.err()
249        } else {
250            None
251        };
252        return (
253            StatusCode::SERVICE_UNAVAILABLE,
254            SonicJson(json!({
255                "status": "error",
256                "message": "engine quiesce channel is closed",
257                "draining": true,
258                "marker_restore_error": marker_restore_error,
259            })),
260        )
261            .into_response();
262    }
263
264    let report = match tokio::time::timeout(std::time::Duration::from_secs(10), rx).await {
265        Ok(Ok(report)) => report,
266        Ok(Err(_)) => {
267            let marker_restore_error = if removed_marker {
268                persist_drain_marker(&marker_path).await.err()
269            } else {
270                None
271            };
272            return (
273                StatusCode::SERVICE_UNAVAILABLE,
274                SonicJson(json!({
275                    "status": "error",
276                    "message": "engine resume response channel closed",
277                    "draining": true,
278                    "marker_restore_error": marker_restore_error,
279                })),
280            )
281                .into_response();
282        }
283        Err(_) => {
284            let marker_restore_error = if removed_marker {
285                persist_drain_marker(&marker_path).await.err()
286            } else {
287                None
288            };
289            return (
290                StatusCode::GATEWAY_TIMEOUT,
291                SonicJson(json!({
292                    "status": "error",
293                    "message": "engine resume timed out",
294                    "draining": true,
295                    "marker_restore_error": marker_restore_error,
296                })),
297            )
298                .into_response();
299        }
300    };
301
302    app_state
303        .is_draining
304        .store(false, std::sync::atomic::Ordering::SeqCst);
305
306    (
307        StatusCode::OK,
308        SonicJson(json!({
309            "status": "undrained",
310            "message": "Server accepting orders again.",
311            "engine": report,
312        })),
313    )
314        .into_response()
315}
316
317pub async fn drain_status(State(app_state): State<AdminState>) -> impl IntoResponse {
318    let draining = app_state
319        .is_draining
320        .load(std::sync::atomic::Ordering::Relaxed);
321    (
322        StatusCode::OK,
323        SonicJson(json!({
324            "draining": draining,
325        })),
326    )
327}