1use chrono::{DateTime, Utc};
2use metrics::gauge;
3use serde::{Deserialize, Serialize};
4use std::sync::{Arc, RwLock};
5
6#[derive(Debug, Clone, Copy, Eq, PartialEq, Serialize, Deserialize, utoipa::ToSchema)]
7#[serde(rename_all = "snake_case")]
8pub enum RecoverySafetyStatus {
9 Pass,
10 Fail,
11}
12
13#[derive(Debug, Clone, Serialize, Deserialize, utoipa::ToSchema)]
14pub struct RecoverySafetyCheck {
15 pub name: String,
16 pub status: RecoverySafetyStatus,
17 pub message: String,
18}
19
20#[derive(Debug, Clone, Serialize, Deserialize, utoipa::ToSchema)]
21pub struct RecoverySafetyCheckpoint {
22 pub last_command_id: i64,
23 pub last_l2_seq: i64,
24 pub wal_offset: u64,
25}
26
27#[derive(Debug, Clone, Serialize, Deserialize, utoipa::ToSchema)]
28pub struct RecoverySafetySnapshot {
29 pub path: String,
30 pub present: bool,
31 pub loaded: bool,
32 pub last_command_id: Option<i64>,
33 pub last_l2_seq: Option<i64>,
34 pub order_count: Option<usize>,
35}
36
37#[derive(Debug, Clone, Serialize, Deserialize, utoipa::ToSchema)]
38pub struct RecoverySafetyReplayCoverage {
39 pub checked: bool,
40 pub min_command_id: Option<i64>,
41 pub max_command_id: Option<i64>,
42}
43
44#[derive(Debug, Clone, Serialize, Deserialize, utoipa::ToSchema)]
45pub struct RecoverySafetyDrainMarker {
46 pub path: String,
47 pub present_at_startup: bool,
48}
49
50#[derive(Debug, Clone, Serialize, Deserialize, utoipa::ToSchema)]
51pub struct RecoverySafetyCash {
52 pub validation_checked: bool,
53 pub validation_passed: Option<bool>,
54 pub message: Option<String>,
55}
56
57#[derive(Debug, Clone, Serialize, Deserialize, utoipa::ToSchema)]
58pub struct RecoverySafetyReport {
59 pub status: RecoverySafetyStatus,
60 pub checked_at: DateTime<Utc>,
61 pub wal_path: String,
62 pub wal_path_configured: bool,
63 pub require_snapshot_restore: bool,
64 pub unsafe_replay_override: bool,
65 pub cash_divergence_override: bool,
66 pub checkpoint: RecoverySafetyCheckpoint,
67 pub snapshot: Option<RecoverySafetySnapshot>,
68 pub replay_coverage: RecoverySafetyReplayCoverage,
69 pub drain_marker: RecoverySafetyDrainMarker,
70 pub cash: RecoverySafetyCash,
71 pub checks: Vec<RecoverySafetyCheck>,
72}
73
74#[derive(Debug, Clone, Serialize, Deserialize, utoipa::ToSchema)]
75pub struct RecoverySafetyBuildInfo {
76 pub version: String,
77 pub commit: String,
78 pub git_ref: String,
79 pub build_time: String,
80}
81
82#[derive(Debug, Clone, Serialize, Deserialize, utoipa::ToSchema)]
83pub struct RecoverySafetyMonitoringResponse {
84 pub status: RecoverySafetyStatus,
85 pub build_info: RecoverySafetyBuildInfo,
86 pub report: RecoverySafetyReport,
87}
88
89#[derive(Debug, Clone, Serialize, Deserialize, utoipa::ToSchema)]
90pub struct RecoverySafetyAlertPoint {
91 pub status: RecoverySafetyStatus,
92 pub value: i64,
93 pub message: String,
94}
95
96pub type SharedRecoverySafetyReport = Arc<RwLock<Option<RecoverySafetyReport>>>;
97
98pub fn shared_report() -> SharedRecoverySafetyReport {
99 let report = Arc::new(RwLock::new(None));
100 gauge!("ht_recovery_safety_gate_passed", "service" => "api").set(0.0);
103 report
104}
105
106impl RecoverySafetyReport {
107 #[allow(clippy::too_many_arguments)]
108 pub fn new(
109 wal_path: String,
110 wal_path_configured: bool,
111 require_snapshot_restore: bool,
112 unsafe_replay_override: bool,
113 cash_divergence_override: bool,
114 checkpoint: RecoverySafetyCheckpoint,
115 drain_marker: RecoverySafetyDrainMarker,
116 ) -> Self {
117 Self {
118 status: RecoverySafetyStatus::Pass,
119 checked_at: Utc::now(),
120 wal_path,
121 wal_path_configured,
122 require_snapshot_restore,
123 unsafe_replay_override,
124 cash_divergence_override,
125 checkpoint,
126 snapshot: None,
127 replay_coverage: RecoverySafetyReplayCoverage {
128 checked: false,
129 min_command_id: None,
130 max_command_id: None,
131 },
132 drain_marker,
133 cash: RecoverySafetyCash {
134 validation_checked: false,
135 validation_passed: None,
136 message: None,
137 },
138 checks: Vec::new(),
139 }
140 }
141
142 pub fn pass(&mut self, name: impl Into<String>, message: impl Into<String>) {
143 self.checks.push(RecoverySafetyCheck {
144 name: name.into(),
145 status: RecoverySafetyStatus::Pass,
146 message: message.into(),
147 });
148 }
149
150 pub fn fail(&mut self, name: impl Into<String>, message: impl Into<String>) {
151 self.status = RecoverySafetyStatus::Fail;
152 self.checks.push(RecoverySafetyCheck {
153 name: name.into(),
154 status: RecoverySafetyStatus::Fail,
155 message: message.into(),
156 });
157 }
158}
159
160pub fn store_report(target: &Option<SharedRecoverySafetyReport>, report: &RecoverySafetyReport) {
161 if let Some(target) = target {
162 let mut guard = target
163 .write()
164 .expect("recovery safety report lock poisoned");
165 *guard = Some(report.clone());
166 }
167 let value = match report.status {
168 RecoverySafetyStatus::Pass => 1.0,
169 RecoverySafetyStatus::Fail => 0.0,
170 };
171 gauge!("ht_recovery_safety_gate_passed", "service" => "api").set(value);
172}
173
#[cfg(test)]
mod tests {
    use super::*;

    /// Baseline report with fixed, recognizable inputs.
    fn test_report() -> RecoverySafetyReport {
        RecoverySafetyReport::new(
            "/tmp/engine.wal".to_string(),
            true,
            false,
            false,
            false,
            RecoverySafetyCheckpoint {
                last_command_id: 42,
                last_l2_seq: 7,
                wal_offset: 128,
            },
            RecoverySafetyDrainMarker {
                path: "/tmp/engine.drain".to_string(),
                present_at_startup: false,
            },
        )
    }

    #[test]
    fn new_report_starts_with_unchecked_sections() {
        let report = test_report();
        assert_eq!(report.status, RecoverySafetyStatus::Pass);
        assert!(report.checks.is_empty());
        assert!(report.snapshot.is_none());
        assert!(!report.replay_coverage.checked);
        assert!(report.replay_coverage.min_command_id.is_none());
        assert!(report.replay_coverage.max_command_id.is_none());
        assert!(!report.cash.validation_checked);
        assert!(report.cash.validation_passed.is_none());
        assert!(report.cash.message.is_none());
    }

    #[test]
    fn recovery_safety_report_starts_passing_and_records_failures() {
        let mut report = test_report();
        report.pass("snapshot_decode", "snapshot decoded");
        assert_eq!(report.status, RecoverySafetyStatus::Pass);

        report.fail("drain_marker", "persistent drain marker present at startup");

        assert_eq!(report.status, RecoverySafetyStatus::Fail);
        assert_eq!(report.checks.len(), 2);
        assert_eq!(report.checks[1].name, "drain_marker");
    }

    #[test]
    fn pass_after_fail_keeps_report_failing() {
        let mut report = test_report();
        report.fail("drain_marker", "persistent drain marker present at startup");
        report.pass("snapshot_decode", "snapshot decoded");

        assert_eq!(report.status, RecoverySafetyStatus::Fail);
        assert_eq!(report.checks.len(), 2);
        assert_eq!(report.checks[0].status, RecoverySafetyStatus::Fail);
        assert_eq!(report.checks[1].status, RecoverySafetyStatus::Pass);
    }

    #[test]
    fn shared_report_starts_empty() {
        let shared = shared_report();
        assert!(shared.read().expect("lock").is_none());
    }

    #[test]
    fn shared_recovery_safety_report_stores_latest_report() {
        let shared = shared_report();
        let mut report = test_report();
        report.fail("standard_margin_cash", "snapshot_cash=-1 durable_cash=1");

        store_report(&Some(shared.clone()), &report);

        let stored = shared
            .read()
            .expect("lock")
            .clone()
            .expect("report should be stored");
        assert_eq!(stored.status, RecoverySafetyStatus::Fail);
        assert_eq!(stored.checks[0].name, "standard_margin_cash");
    }

    #[test]
    fn store_report_overwrites_previous_report() {
        let shared = shared_report();
        let target = Some(Arc::clone(&shared));

        let mut failing = test_report();
        failing.fail("drain_marker", "persistent drain marker present at startup");
        store_report(&target, &failing);

        let mut passing = test_report();
        passing.pass("snapshot_decode", "snapshot decoded");
        store_report(&target, &passing);

        let stored = shared
            .read()
            .expect("lock")
            .clone()
            .expect("report should be stored");
        assert_eq!(stored.status, RecoverySafetyStatus::Pass);
        assert_eq!(stored.checks.len(), 1);
        assert_eq!(stored.checks[0].name, "snapshot_decode");
    }

    #[test]
    fn store_report_without_target_only_updates_metric() {
        // With no shared slot there is nothing to store; this must not panic.
        let report = test_report();
        store_report(&None, &report);
    }
}