Skip to main content

hypercall_runtime_api/
recovery_safety.rs

1use chrono::{DateTime, Utc};
2use metrics::gauge;
3use serde::{Deserialize, Serialize};
4use std::sync::{Arc, RwLock};
5
6#[derive(Debug, Clone, Copy, Eq, PartialEq, Serialize, Deserialize, utoipa::ToSchema)]
7#[serde(rename_all = "snake_case")]
8pub enum RecoverySafetyStatus {
9    Pass,
10    Fail,
11}
12
13#[derive(Debug, Clone, Serialize, Deserialize, utoipa::ToSchema)]
14pub struct RecoverySafetyCheck {
15    pub name: String,
16    pub status: RecoverySafetyStatus,
17    pub message: String,
18}
19
20#[derive(Debug, Clone, Serialize, Deserialize, utoipa::ToSchema)]
21pub struct RecoverySafetyCheckpoint {
22    pub last_command_id: i64,
23    pub last_l2_seq: i64,
24    pub wal_offset: u64,
25}
26
27#[derive(Debug, Clone, Serialize, Deserialize, utoipa::ToSchema)]
28pub struct RecoverySafetySnapshot {
29    pub path: String,
30    pub present: bool,
31    pub loaded: bool,
32    pub last_command_id: Option<i64>,
33    pub last_l2_seq: Option<i64>,
34    pub order_count: Option<usize>,
35}
36
37#[derive(Debug, Clone, Serialize, Deserialize, utoipa::ToSchema)]
38pub struct RecoverySafetyReplayCoverage {
39    pub checked: bool,
40    pub min_command_id: Option<i64>,
41    pub max_command_id: Option<i64>,
42}
43
44#[derive(Debug, Clone, Serialize, Deserialize, utoipa::ToSchema)]
45pub struct RecoverySafetyDrainMarker {
46    pub path: String,
47    pub present_at_startup: bool,
48}
49
50#[derive(Debug, Clone, Serialize, Deserialize, utoipa::ToSchema)]
51pub struct RecoverySafetyCash {
52    pub validation_checked: bool,
53    pub validation_passed: Option<bool>,
54    pub message: Option<String>,
55}
56
57#[derive(Debug, Clone, Serialize, Deserialize, utoipa::ToSchema)]
58pub struct RecoverySafetyReport {
59    pub status: RecoverySafetyStatus,
60    pub checked_at: DateTime<Utc>,
61    pub wal_path: String,
62    pub wal_path_configured: bool,
63    pub require_snapshot_restore: bool,
64    pub unsafe_replay_override: bool,
65    pub cash_divergence_override: bool,
66    pub checkpoint: RecoverySafetyCheckpoint,
67    pub snapshot: Option<RecoverySafetySnapshot>,
68    pub replay_coverage: RecoverySafetyReplayCoverage,
69    pub drain_marker: RecoverySafetyDrainMarker,
70    pub cash: RecoverySafetyCash,
71    pub checks: Vec<RecoverySafetyCheck>,
72}
73
74#[derive(Debug, Clone, Serialize, Deserialize, utoipa::ToSchema)]
75pub struct RecoverySafetyBuildInfo {
76    pub version: String,
77    pub commit: String,
78    pub git_ref: String,
79    pub build_time: String,
80}
81
82#[derive(Debug, Clone, Serialize, Deserialize, utoipa::ToSchema)]
83pub struct RecoverySafetyMonitoringResponse {
84    pub status: RecoverySafetyStatus,
85    pub build_info: RecoverySafetyBuildInfo,
86    pub report: RecoverySafetyReport,
87}
88
89#[derive(Debug, Clone, Serialize, Deserialize, utoipa::ToSchema)]
90pub struct RecoverySafetyAlertPoint {
91    pub status: RecoverySafetyStatus,
92    pub value: i64,
93    pub message: String,
94}
95
96pub type SharedRecoverySafetyReport = Arc<RwLock<Option<RecoverySafetyReport>>>;
97
98pub fn shared_report() -> SharedRecoverySafetyReport {
99    let report = Arc::new(RwLock::new(None));
100    // Start pessimistic until startup writes the first gate result.
101    // This makes missing or delayed reporting visible in alerts.
102    gauge!("ht_recovery_safety_gate_passed", "service" => "api").set(0.0);
103    report
104}
105
106impl RecoverySafetyReport {
107    #[allow(clippy::too_many_arguments)]
108    pub fn new(
109        wal_path: String,
110        wal_path_configured: bool,
111        require_snapshot_restore: bool,
112        unsafe_replay_override: bool,
113        cash_divergence_override: bool,
114        checkpoint: RecoverySafetyCheckpoint,
115        drain_marker: RecoverySafetyDrainMarker,
116    ) -> Self {
117        Self {
118            status: RecoverySafetyStatus::Pass,
119            checked_at: Utc::now(),
120            wal_path,
121            wal_path_configured,
122            require_snapshot_restore,
123            unsafe_replay_override,
124            cash_divergence_override,
125            checkpoint,
126            snapshot: None,
127            replay_coverage: RecoverySafetyReplayCoverage {
128                checked: false,
129                min_command_id: None,
130                max_command_id: None,
131            },
132            drain_marker,
133            cash: RecoverySafetyCash {
134                validation_checked: false,
135                validation_passed: None,
136                message: None,
137            },
138            checks: Vec::new(),
139        }
140    }
141
142    pub fn pass(&mut self, name: impl Into<String>, message: impl Into<String>) {
143        self.checks.push(RecoverySafetyCheck {
144            name: name.into(),
145            status: RecoverySafetyStatus::Pass,
146            message: message.into(),
147        });
148    }
149
150    pub fn fail(&mut self, name: impl Into<String>, message: impl Into<String>) {
151        self.status = RecoverySafetyStatus::Fail;
152        self.checks.push(RecoverySafetyCheck {
153            name: name.into(),
154            status: RecoverySafetyStatus::Fail,
155            message: message.into(),
156        });
157    }
158}
159
160pub fn store_report(target: &Option<SharedRecoverySafetyReport>, report: &RecoverySafetyReport) {
161    if let Some(target) = target {
162        let mut guard = target
163            .write()
164            .expect("recovery safety report lock poisoned");
165        *guard = Some(report.clone());
166    }
167    let value = match report.status {
168        RecoverySafetyStatus::Pass => 1.0,
169        RecoverySafetyStatus::Fail => 0.0,
170    };
171    gauge!("ht_recovery_safety_gate_passed", "service" => "api").set(value);
172}
173
#[cfg(test)]
mod tests {
    use super::*;

    /// Minimal report fixture with fixed checkpoint and drain-marker inputs.
    fn test_report() -> RecoverySafetyReport {
        RecoverySafetyReport::new(
            "/tmp/engine.wal".to_string(),
            true,
            false,
            false,
            false,
            RecoverySafetyCheckpoint {
                last_command_id: 42,
                last_l2_seq: 7,
                wal_offset: 128,
            },
            RecoverySafetyDrainMarker {
                path: "/tmp/engine.drain".to_string(),
                present_at_startup: false,
            },
        )
    }

    /// Reads the currently stored report out of a shared slot.
    fn stored(shared: &SharedRecoverySafetyReport) -> RecoverySafetyReport {
        shared
            .read()
            .expect("lock")
            .clone()
            .expect("report should be stored")
    }

    #[test]
    fn new_report_starts_passing_with_empty_sections() {
        let report = test_report();
        assert_eq!(report.status, RecoverySafetyStatus::Pass);
        assert!(report.checks.is_empty());
        assert!(report.snapshot.is_none());
        assert!(!report.replay_coverage.checked);
        assert!(report.replay_coverage.min_command_id.is_none());
        assert!(report.replay_coverage.max_command_id.is_none());
        assert!(!report.cash.validation_checked);
        assert!(report.cash.validation_passed.is_none());
        assert!(report.cash.message.is_none());
    }

    #[test]
    fn recovery_safety_report_starts_passing_and_records_failures() {
        let mut report = test_report();
        report.pass("snapshot_decode", "snapshot decoded");
        assert_eq!(report.status, RecoverySafetyStatus::Pass);

        report.fail("drain_marker", "persistent drain marker present at startup");

        assert_eq!(report.status, RecoverySafetyStatus::Fail);
        assert_eq!(report.checks.len(), 2);
        assert_eq!(report.checks[1].name, "drain_marker");
    }

    #[test]
    fn recovery_safety_report_failure_is_sticky() {
        let mut report = test_report();
        report.fail("drain_marker", "persistent drain marker present at startup");
        // A later passing check must not clear an earlier failure.
        report.pass("snapshot_decode", "snapshot decoded");

        assert_eq!(report.status, RecoverySafetyStatus::Fail);
        assert_eq!(report.checks.len(), 2);
        assert_eq!(report.checks[0].status, RecoverySafetyStatus::Fail);
        assert_eq!(report.checks[1].status, RecoverySafetyStatus::Pass);
    }

    #[test]
    fn shared_recovery_safety_report_stores_latest_report() {
        let shared = shared_report();
        let mut report = test_report();
        report.fail("standard_margin_cash", "snapshot_cash=-1 durable_cash=1");

        store_report(&Some(shared.clone()), &report);

        let stored = stored(&shared);
        assert_eq!(stored.status, RecoverySafetyStatus::Fail);
        assert_eq!(stored.checks[0].name, "standard_margin_cash");
    }

    #[test]
    fn shared_recovery_safety_report_is_overwritten_by_later_store() {
        let shared = shared_report();
        let target = Some(shared.clone());

        let mut first = test_report();
        first.fail("standard_margin_cash", "snapshot_cash=-1 durable_cash=1");
        store_report(&target, &first);

        let mut second = test_report();
        second.pass("standard_margin_cash", "cash matches");
        store_report(&target, &second);

        let stored = stored(&shared);
        assert_eq!(stored.status, RecoverySafetyStatus::Pass);
        assert_eq!(stored.checks.len(), 1);
        assert_eq!(stored.checks[0].status, RecoverySafetyStatus::Pass);
    }

    #[test]
    fn store_report_without_target_does_not_panic() {
        let report = test_report();
        store_report(&None, &report);
    }
}