1use chrono::{DateTime, Utc};
2use metrics::gauge;
3use serde::{Deserialize, Serialize};
4use std::sync::{Arc, RwLock};
5
6#[derive(Debug, Clone, Copy, Eq, PartialEq, Serialize, Deserialize, utoipa::ToSchema)]
7#[serde(rename_all = "snake_case")]
8pub enum RecoverySafetyStatus {
9 Pass,
10 Fail,
11}
12
13#[derive(Debug, Clone, Serialize, Deserialize, utoipa::ToSchema)]
14pub struct RecoverySafetyCheck {
15 pub name: String,
16 pub status: RecoverySafetyStatus,
17 pub message: String,
18}
19
20#[derive(Debug, Clone, Serialize, Deserialize, utoipa::ToSchema)]
21pub struct RecoverySafetyCheckpoint {
22 pub last_command_id: i64,
23 pub last_l2_seq: i64,
24 pub wal_offset: u64,
25}
26
27#[derive(Debug, Clone, Serialize, Deserialize, utoipa::ToSchema)]
28pub struct RecoverySafetySnapshot {
29 pub path: String,
30 pub present: bool,
31 pub loaded: bool,
32 pub last_command_id: Option<i64>,
33 pub last_l2_seq: Option<i64>,
34 pub order_count: Option<usize>,
35}
36
37#[derive(Debug, Clone, Serialize, Deserialize, utoipa::ToSchema)]
38pub struct RecoverySafetyReplayCoverage {
39 pub checked: bool,
40 pub min_command_id: Option<i64>,
41 pub max_command_id: Option<i64>,
42}
43
44#[derive(Debug, Clone, Serialize, Deserialize, utoipa::ToSchema)]
45pub struct RecoverySafetyDrainMarker {
46 pub path: String,
47 pub present_at_startup: bool,
48}
49
50#[derive(Debug, Clone, Serialize, Deserialize, utoipa::ToSchema)]
51pub struct RecoverySafetyCash {
52 pub validation_checked: bool,
53 pub validation_passed: Option<bool>,
54 pub message: Option<String>,
55}
56
57#[derive(Debug, Clone, Serialize, Deserialize, utoipa::ToSchema)]
58pub struct RecoverySafetyReport {
59 pub status: RecoverySafetyStatus,
60 pub checked_at: DateTime<Utc>,
61 pub wal_path: String,
62 pub wal_path_configured: bool,
63 pub require_snapshot_restore: bool,
64 pub unsafe_replay_override: bool,
65 pub cash_divergence_override: bool,
66 pub checkpoint: RecoverySafetyCheckpoint,
67 pub snapshot: Option<RecoverySafetySnapshot>,
68 pub replay_coverage: RecoverySafetyReplayCoverage,
69 pub drain_marker: RecoverySafetyDrainMarker,
70 pub cash: RecoverySafetyCash,
71 pub checks: Vec<RecoverySafetyCheck>,
72}
73
74#[derive(Debug, Clone, Serialize, Deserialize, utoipa::ToSchema)]
75pub struct RecoverySafetyBuildInfo {
76 pub version: String,
77 pub commit: String,
78 pub git_ref: String,
79 pub build_time: String,
80}
81
82#[derive(Debug, Clone, Serialize, Deserialize, utoipa::ToSchema)]
83pub struct RecoverySafetyMonitoringResponse {
84 pub status: RecoverySafetyStatus,
85 pub build_info: RecoverySafetyBuildInfo,
86 pub report: RecoverySafetyReport,
87}
88
89pub type SharedRecoverySafetyReport = Arc<RwLock<Option<RecoverySafetyReport>>>;
90
91pub fn shared_report() -> SharedRecoverySafetyReport {
92 let report = Arc::new(RwLock::new(None));
93 gauge!("ht_recovery_safety_gate_passed", "service" => "api").set(0.0);
96 report
97}
98
99impl RecoverySafetyReport {
100 #[allow(clippy::too_many_arguments)]
101 pub fn new(
102 wal_path: String,
103 wal_path_configured: bool,
104 require_snapshot_restore: bool,
105 unsafe_replay_override: bool,
106 cash_divergence_override: bool,
107 checkpoint: RecoverySafetyCheckpoint,
108 drain_marker: RecoverySafetyDrainMarker,
109 ) -> Self {
110 Self {
111 status: RecoverySafetyStatus::Pass,
112 checked_at: Utc::now(),
113 wal_path,
114 wal_path_configured,
115 require_snapshot_restore,
116 unsafe_replay_override,
117 cash_divergence_override,
118 checkpoint,
119 snapshot: None,
120 replay_coverage: RecoverySafetyReplayCoverage {
121 checked: false,
122 min_command_id: None,
123 max_command_id: None,
124 },
125 drain_marker,
126 cash: RecoverySafetyCash {
127 validation_checked: false,
128 validation_passed: None,
129 message: None,
130 },
131 checks: Vec::new(),
132 }
133 }
134
135 pub fn pass(&mut self, name: impl Into<String>, message: impl Into<String>) {
136 self.checks.push(RecoverySafetyCheck {
137 name: name.into(),
138 status: RecoverySafetyStatus::Pass,
139 message: message.into(),
140 });
141 }
142
143 pub fn fail(&mut self, name: impl Into<String>, message: impl Into<String>) {
144 self.status = RecoverySafetyStatus::Fail;
145 self.checks.push(RecoverySafetyCheck {
146 name: name.into(),
147 status: RecoverySafetyStatus::Fail,
148 message: message.into(),
149 });
150 }
151}
152
153pub fn store_report(target: &Option<SharedRecoverySafetyReport>, report: &RecoverySafetyReport) {
154 if let Some(target) = target {
155 let mut guard = target
156 .write()
157 .expect("recovery safety report lock poisoned");
158 *guard = Some(report.clone());
159 }
160 let value = match report.status {
161 RecoverySafetyStatus::Pass => 1.0,
162 RecoverySafetyStatus::Fail => 0.0,
163 };
164 gauge!("ht_recovery_safety_gate_passed", "service" => "api").set(value);
165}
166
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a report with fixed checkpoint and drain-marker values; only the
    /// `wal_path_configured` flag is switched on.
    fn test_report() -> RecoverySafetyReport {
        let checkpoint = RecoverySafetyCheckpoint {
            last_command_id: 42,
            last_l2_seq: 7,
            wal_offset: 128,
        };
        let drain_marker = RecoverySafetyDrainMarker {
            path: String::from("/tmp/engine.drain"),
            present_at_startup: false,
        };

        RecoverySafetyReport::new(
            String::from("/tmp/engine.wal"),
            true,  // wal_path_configured
            false, // require_snapshot_restore
            false, // unsafe_replay_override
            false, // cash_divergence_override
            checkpoint,
            drain_marker,
        )
    }

    /// A passing check leaves the status at `Pass`; a later failure flips it
    /// and both checks are kept in recording order.
    #[test]
    fn recovery_safety_report_starts_passing_and_records_failures() {
        let mut report = test_report();

        report.pass("snapshot_decode", "snapshot decoded");
        assert_eq!(report.status, RecoverySafetyStatus::Pass);

        report.fail("drain_marker", "persistent drain marker present at startup");
        assert_eq!(report.status, RecoverySafetyStatus::Fail);
        assert_eq!(report.checks.len(), 2);
        assert_eq!(report.checks[1].name, "drain_marker");
    }

    /// `store_report` writes a copy of the report into the shared slot.
    #[test]
    fn shared_recovery_safety_report_stores_latest_report() {
        let shared = shared_report();
        let mut report = test_report();
        report.fail("standard_margin_cash", "snapshot_cash=-1 durable_cash=1");

        let target = Some(shared.clone());
        store_report(&target, &report);

        // Inspect the stored report through the read guard instead of cloning it.
        let guard = shared.read().expect("lock");
        let stored = guard.as_ref().expect("report should be stored");
        assert_eq!(stored.status, RecoverySafetyStatus::Fail);
        assert_eq!(stored.checks[0].name, "standard_margin_cash");
    }
}