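//! hypercall/recovery_safety.rs
//!
//! Report types for the recovery safety gate, plus the shared slot and gauge
//! used to publish the latest gate result.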

use chrono::{DateTime, Utc};
use metrics::gauge;
use serde::{Deserialize, Serialize};
use std::sync::{Arc, PoisonError, RwLock};

6#[derive(Debug, Clone, Copy, Eq, PartialEq, Serialize, Deserialize, utoipa::ToSchema)]
7#[serde(rename_all = "snake_case")]
8pub enum RecoverySafetyStatus {
9    Pass,
10    Fail,
11}
12
13#[derive(Debug, Clone, Serialize, Deserialize, utoipa::ToSchema)]
14pub struct RecoverySafetyCheck {
15    pub name: String,
16    pub status: RecoverySafetyStatus,
17    pub message: String,
18}
19
20#[derive(Debug, Clone, Serialize, Deserialize, utoipa::ToSchema)]
21pub struct RecoverySafetyCheckpoint {
22    pub last_command_id: i64,
23    pub last_l2_seq: i64,
24    pub wal_offset: u64,
25}
26
27#[derive(Debug, Clone, Serialize, Deserialize, utoipa::ToSchema)]
28pub struct RecoverySafetySnapshot {
29    pub path: String,
30    pub present: bool,
31    pub loaded: bool,
32    pub last_command_id: Option<i64>,
33    pub last_l2_seq: Option<i64>,
34    pub order_count: Option<usize>,
35}
36
37#[derive(Debug, Clone, Serialize, Deserialize, utoipa::ToSchema)]
38pub struct RecoverySafetyReplayCoverage {
39    pub checked: bool,
40    pub min_command_id: Option<i64>,
41    pub max_command_id: Option<i64>,
42}
43
44#[derive(Debug, Clone, Serialize, Deserialize, utoipa::ToSchema)]
45pub struct RecoverySafetyDrainMarker {
46    pub path: String,
47    pub present_at_startup: bool,
48}
49
50#[derive(Debug, Clone, Serialize, Deserialize, utoipa::ToSchema)]
51pub struct RecoverySafetyCash {
52    pub validation_checked: bool,
53    pub validation_passed: Option<bool>,
54    pub message: Option<String>,
55}
56
57#[derive(Debug, Clone, Serialize, Deserialize, utoipa::ToSchema)]
58pub struct RecoverySafetyReport {
59    pub status: RecoverySafetyStatus,
60    pub checked_at: DateTime<Utc>,
61    pub wal_path: String,
62    pub wal_path_configured: bool,
63    pub require_snapshot_restore: bool,
64    pub unsafe_replay_override: bool,
65    pub cash_divergence_override: bool,
66    pub checkpoint: RecoverySafetyCheckpoint,
67    pub snapshot: Option<RecoverySafetySnapshot>,
68    pub replay_coverage: RecoverySafetyReplayCoverage,
69    pub drain_marker: RecoverySafetyDrainMarker,
70    pub cash: RecoverySafetyCash,
71    pub checks: Vec<RecoverySafetyCheck>,
72}
73
74#[derive(Debug, Clone, Serialize, Deserialize, utoipa::ToSchema)]
75pub struct RecoverySafetyBuildInfo {
76    pub version: String,
77    pub commit: String,
78    pub git_ref: String,
79    pub build_time: String,
80}
81
82#[derive(Debug, Clone, Serialize, Deserialize, utoipa::ToSchema)]
83pub struct RecoverySafetyMonitoringResponse {
84    pub status: RecoverySafetyStatus,
85    pub build_info: RecoverySafetyBuildInfo,
86    pub report: RecoverySafetyReport,
87}
88
89pub type SharedRecoverySafetyReport = Arc<RwLock<Option<RecoverySafetyReport>>>;
90
91pub fn shared_report() -> SharedRecoverySafetyReport {
92    let report = Arc::new(RwLock::new(None));
93    // Start pessimistic until startup writes the first gate result.
94    // This makes missing or delayed reporting visible in alerts.
95    gauge!("ht_recovery_safety_gate_passed", "service" => "api").set(0.0);
96    report
97}
98
99impl RecoverySafetyReport {
100    #[allow(clippy::too_many_arguments)]
101    pub fn new(
102        wal_path: String,
103        wal_path_configured: bool,
104        require_snapshot_restore: bool,
105        unsafe_replay_override: bool,
106        cash_divergence_override: bool,
107        checkpoint: RecoverySafetyCheckpoint,
108        drain_marker: RecoverySafetyDrainMarker,
109    ) -> Self {
110        Self {
111            status: RecoverySafetyStatus::Pass,
112            checked_at: Utc::now(),
113            wal_path,
114            wal_path_configured,
115            require_snapshot_restore,
116            unsafe_replay_override,
117            cash_divergence_override,
118            checkpoint,
119            snapshot: None,
120            replay_coverage: RecoverySafetyReplayCoverage {
121                checked: false,
122                min_command_id: None,
123                max_command_id: None,
124            },
125            drain_marker,
126            cash: RecoverySafetyCash {
127                validation_checked: false,
128                validation_passed: None,
129                message: None,
130            },
131            checks: Vec::new(),
132        }
133    }
134
135    pub fn pass(&mut self, name: impl Into<String>, message: impl Into<String>) {
136        self.checks.push(RecoverySafetyCheck {
137            name: name.into(),
138            status: RecoverySafetyStatus::Pass,
139            message: message.into(),
140        });
141    }
142
143    pub fn fail(&mut self, name: impl Into<String>, message: impl Into<String>) {
144        self.status = RecoverySafetyStatus::Fail;
145        self.checks.push(RecoverySafetyCheck {
146            name: name.into(),
147            status: RecoverySafetyStatus::Fail,
148            message: message.into(),
149        });
150    }
151}
152
153pub fn store_report(target: &Option<SharedRecoverySafetyReport>, report: &RecoverySafetyReport) {
154    if let Some(target) = target {
155        let mut guard = target
156            .write()
157            .expect("recovery safety report lock poisoned");
158        *guard = Some(report.clone());
159    }
160    let value = match report.status {
161        RecoverySafetyStatus::Pass => 1.0,
162        RecoverySafetyStatus::Fail => 0.0,
163    };
164    gauge!("ht_recovery_safety_gate_passed", "service" => "api").set(value);
165}
166
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a fresh report from fixed fixture values; it starts out passing.
    fn sample_report() -> RecoverySafetyReport {
        let checkpoint = RecoverySafetyCheckpoint {
            last_command_id: 42,
            last_l2_seq: 7,
            wal_offset: 128,
        };
        let drain_marker = RecoverySafetyDrainMarker {
            path: "/tmp/engine.drain".to_string(),
            present_at_startup: false,
        };

        RecoverySafetyReport::new(
            "/tmp/engine.wal".to_string(),
            true,
            false,
            false,
            false,
            checkpoint,
            drain_marker,
        )
    }

    #[test]
    fn recovery_safety_report_starts_passing_and_records_failures() {
        let mut report = sample_report();

        // A passing check leaves the overall status untouched.
        report.pass("snapshot_decode", "snapshot decoded");
        assert_eq!(report.status, RecoverySafetyStatus::Pass);

        // A failing check flips the overall status and is appended after the pass.
        report.fail("drain_marker", "persistent drain marker present at startup");
        assert_eq!(report.status, RecoverySafetyStatus::Fail);
        assert_eq!(report.checks.len(), 2);
        assert_eq!(report.checks[1].name, "drain_marker");
    }

    #[test]
    fn shared_recovery_safety_report_stores_latest_report() {
        let slot = shared_report();
        let mut failing = sample_report();
        failing.fail("standard_margin_cash", "snapshot_cash=-1 durable_cash=1");

        store_report(&Some(slot.clone()), &failing);

        // Inspect the stored report through the read guard.
        let guard = slot.read().expect("lock");
        let stored = guard.as_ref().expect("report should be stored");
        assert_eq!(stored.status, RecoverySafetyStatus::Fail);
        assert_eq!(stored.checks[0].name, "standard_margin_cash");
    }
}