1use axum::{extract::State, http::StatusCode, response::IntoResponse};
8use serde_json::json;
9
10use crate::state::AdminState;
11use hypercall_runtime_api::drain::{
12 drain_marker_path, keep_drain_after_accepted_quiesce_failure, persist_drain_marker,
13};
14use hypercall_runtime_api::sonic_json::SonicJson;
15use hypercall_runtime_api::StandbyPromoteOutcome;
16
17async fn send_engine_quiesce_action(
18 state: &AdminState,
19 action: hypercall_runtime_api::EngineQuiesceAction,
20) -> Result<hypercall_runtime_api::EngineQuiesceReport, &'static str> {
21 let (tx, rx) = tokio::sync::oneshot::channel();
22 state
23 .engine_quiesce_sender
24 .send(hypercall_runtime_api::EngineQuiesceRequest {
25 action,
26 response_tx: tx,
27 })
28 .await
29 .map_err(|_| "engine quiesce channel is closed")?;
30 tokio::time::timeout(std::time::Duration::from_secs(10), rx)
31 .await
32 .map_err(|_| "engine quiesce timed out")?
33 .map_err(|_| "engine quiesce response channel closed")
34}
35
36async fn resume_after_failed_drain_marker(
37 state: &AdminState,
38) -> Result<hypercall_runtime_api::EngineQuiesceReport, &'static str> {
39 let report =
40 send_engine_quiesce_action(state, hypercall_runtime_api::EngineQuiesceAction::Resume)
41 .await?;
42 state
43 .is_draining
44 .store(false, std::sync::atomic::Ordering::SeqCst);
45 Ok(report)
46}
47
48pub async fn admin_promote(State(app_state): State<AdminState>) -> impl IntoResponse {
50 let promote = match &app_state.standby_promote {
51 None => {
52 return (
53 StatusCode::NOT_FOUND,
54 SonicJson(json!({"status": "not_standby_mode"})),
55 )
56 }
57 Some(p) => p,
58 };
59
60 if let Some(controller) = &app_state.standby_controller {
61 match controller.promote().await {
62 StandbyPromoteOutcome::Promoted { queued_orders } => {
63 tracing::info!(queued_orders, "Standby controller promoted");
64 }
65 StandbyPromoteOutcome::AlreadyActive => {
66 tracing::info!("Standby controller was already promoted");
67 }
68 }
69 }
70
71 let mut guard = promote.lock().await;
72 match guard.take() {
73 Some(tx) => {
74 tx.send(()).ok();
75 (
76 StatusCode::OK,
77 SonicJson(json!({
78 "status": "promoted",
79 "message": "Engine promoted to active mode",
80 })),
81 )
82 }
83 None => (
84 StatusCode::CONFLICT,
85 SonicJson(json!({"status": "already_promoted"})),
86 ),
87 }
88}
89
90pub async fn admin_drain(State(app_state): State<AdminState>) -> impl IntoResponse {
91 app_state
92 .is_draining
93 .store(true, std::sync::atomic::Ordering::SeqCst);
94 app_state.drain_signal.notify_waiters();
95
96 let marker_path = drain_marker_path(&app_state.runtime_config.wal_path);
97
98 let max_cap = app_state.order_sender.max_capacity();
104 for _ in 0..50 {
105 if app_state.order_sender.capacity() == max_cap {
106 break;
107 }
108 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
109 }
110
111 let (tx, rx) = tokio::sync::oneshot::channel();
112 if app_state
113 .engine_quiesce_sender
114 .send(hypercall_runtime_api::EngineQuiesceRequest {
115 action: hypercall_runtime_api::EngineQuiesceAction::Quiesce,
116 response_tx: tx,
117 })
118 .await
119 .is_err()
120 {
121 app_state
122 .is_draining
123 .store(false, std::sync::atomic::Ordering::SeqCst);
124 return (
125 StatusCode::SERVICE_UNAVAILABLE,
126 SonicJson(json!({
127 "status": "error",
128 "message": "engine quiesce channel is closed",
129 })),
130 )
131 .into_response();
132 }
133
134 let report = match tokio::time::timeout(std::time::Duration::from_secs(10), rx).await {
135 Ok(Ok(report)) => report,
136 Ok(Err(_)) => {
137 return keep_drain_after_accepted_quiesce_failure(
138 &app_state.is_draining,
139 &marker_path,
140 StatusCode::SERVICE_UNAVAILABLE,
141 "engine quiesce response channel closed after request was accepted",
142 )
143 .await;
144 }
145 Err(_) => {
146 return keep_drain_after_accepted_quiesce_failure(
147 &app_state.is_draining,
148 &marker_path,
149 StatusCode::GATEWAY_TIMEOUT,
150 "engine quiesce timed out after request was accepted",
151 )
152 .await;
153 }
154 };
155
156 if let Some(parent) = marker_path.parent() {
157 if let Err(error) = tokio::fs::create_dir_all(parent).await {
158 return match resume_after_failed_drain_marker(&app_state).await {
159 Ok(resume_report) => (
160 StatusCode::INTERNAL_SERVER_ERROR,
161 SonicJson(json!({
162 "status": "error",
163 "message": format!("failed to create drain marker directory: {error}"),
164 "rollback": "engine resumed and drain flag cleared",
165 "engine": resume_report,
166 })),
167 )
168 .into_response(),
169 Err(resume_error) => (
170 StatusCode::INTERNAL_SERVER_ERROR,
171 SonicJson(json!({
172 "status": "error",
173 "message": format!("failed to create drain marker directory: {error}"),
174 "rollback_error": resume_error,
175 "draining": true,
176 })),
177 )
178 .into_response(),
179 };
180 }
181 }
182 if let Err(error) = tokio::fs::write(&marker_path, b"drained\n").await {
183 return match resume_after_failed_drain_marker(&app_state).await {
184 Ok(resume_report) => (
185 StatusCode::INTERNAL_SERVER_ERROR,
186 SonicJson(json!({
187 "status": "error",
188 "message": format!("failed to persist drain marker: {error}"),
189 "rollback": "engine resumed and drain flag cleared",
190 "engine": resume_report,
191 })),
192 )
193 .into_response(),
194 Err(resume_error) => (
195 StatusCode::INTERNAL_SERVER_ERROR,
196 SonicJson(json!({
197 "status": "error",
198 "message": format!("failed to persist drain marker: {error}"),
199 "rollback_error": resume_error,
200 "draining": true,
201 })),
202 )
203 .into_response(),
204 };
205 }
206
207 (
208 StatusCode::OK,
209 SonicJson(json!({
210 "status": "drained",
211 "message": "Server drained. No new orders will be accepted. Safe to switch traffic.",
212 "engine": report,
213 "drain_marker": marker_path,
214 })),
215 )
216 .into_response()
217}
218
219pub async fn admin_undrain(State(app_state): State<AdminState>) -> impl IntoResponse {
220 let marker_path = drain_marker_path(&app_state.runtime_config.wal_path);
221 let mut removed_marker = false;
222 if let Err(error) = tokio::fs::remove_file(&marker_path).await {
223 if error.kind() != std::io::ErrorKind::NotFound {
224 return (
225 StatusCode::INTERNAL_SERVER_ERROR,
226 SonicJson(json!({
227 "status": "error",
228 "message": format!("failed to remove drain marker: {error}"),
229 })),
230 )
231 .into_response();
232 }
233 } else {
234 removed_marker = true;
235 }
236
237 let (tx, rx) = tokio::sync::oneshot::channel();
238 if app_state
239 .engine_quiesce_sender
240 .send(hypercall_runtime_api::EngineQuiesceRequest {
241 action: hypercall_runtime_api::EngineQuiesceAction::Resume,
242 response_tx: tx,
243 })
244 .await
245 .is_err()
246 {
247 let marker_restore_error = if removed_marker {
248 persist_drain_marker(&marker_path).await.err()
249 } else {
250 None
251 };
252 return (
253 StatusCode::SERVICE_UNAVAILABLE,
254 SonicJson(json!({
255 "status": "error",
256 "message": "engine quiesce channel is closed",
257 "draining": true,
258 "marker_restore_error": marker_restore_error,
259 })),
260 )
261 .into_response();
262 }
263
264 let report = match tokio::time::timeout(std::time::Duration::from_secs(10), rx).await {
265 Ok(Ok(report)) => report,
266 Ok(Err(_)) => {
267 let marker_restore_error = if removed_marker {
268 persist_drain_marker(&marker_path).await.err()
269 } else {
270 None
271 };
272 return (
273 StatusCode::SERVICE_UNAVAILABLE,
274 SonicJson(json!({
275 "status": "error",
276 "message": "engine resume response channel closed",
277 "draining": true,
278 "marker_restore_error": marker_restore_error,
279 })),
280 )
281 .into_response();
282 }
283 Err(_) => {
284 let marker_restore_error = if removed_marker {
285 persist_drain_marker(&marker_path).await.err()
286 } else {
287 None
288 };
289 return (
290 StatusCode::GATEWAY_TIMEOUT,
291 SonicJson(json!({
292 "status": "error",
293 "message": "engine resume timed out",
294 "draining": true,
295 "marker_restore_error": marker_restore_error,
296 })),
297 )
298 .into_response();
299 }
300 };
301
302 app_state
303 .is_draining
304 .store(false, std::sync::atomic::Ordering::SeqCst);
305
306 (
307 StatusCode::OK,
308 SonicJson(json!({
309 "status": "undrained",
310 "message": "Server accepting orders again.",
311 "engine": report,
312 })),
313 )
314 .into_response()
315}
316
317pub async fn drain_status(State(app_state): State<AdminState>) -> impl IntoResponse {
318 let draining = app_state
319 .is_draining
320 .load(std::sync::atomic::Ordering::Relaxed);
321 (
322 StatusCode::OK,
323 SonicJson(json!({
324 "draining": draining,
325 })),
326 )
327}