Skip to main content

hypercall_journal/
checkpoint.rs

1use serde::{Deserialize, Serialize};
2use std::fs::{create_dir_all, OpenOptions};
3use std::io::Write;
4use std::path::{Path, PathBuf};
5
6pub const DEFAULT_WAL_PATH: &str = "/var/tmp/hypertheta-engine-journal.wal";
7
8/// Recovery checkpoint metadata stored in {WAL_PATH}.checkpoint.
9#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
10#[serde(deny_unknown_fields)]
11pub struct WalCheckpointMetadata {
12    pub wal_offset: u64,
13    pub last_command_id: i64,
14    pub last_l2_seq: i64,
15}
16
17impl WalCheckpointMetadata {
18    pub const ZERO: Self = Self {
19        wal_offset: 0,
20        last_command_id: 0,
21        last_l2_seq: 0,
22    };
23}
24
25pub fn wal_path_is_explicitly_configured(configured_wal_path: Option<&PathBuf>) -> bool {
26    configured_wal_path.is_some()
27        || std::env::var("ENGINE_JOURNAL_WAL_PATH")
28            .ok()
29            .is_some_and(|value| !value.trim().is_empty())
30}
31
32pub fn wal_path_from_config(configured_wal_path: Option<&PathBuf>) -> PathBuf {
33    std::env::var("ENGINE_JOURNAL_WAL_PATH")
34        .ok()
35        .filter(|value| !value.trim().is_empty())
36        .map(PathBuf::from)
37        .or_else(|| configured_wal_path.cloned())
38        .unwrap_or_else(|| PathBuf::from(DEFAULT_WAL_PATH))
39}
40
41pub fn checkpoint_path_for(wal_path: &Path) -> PathBuf {
42    let mut os = wal_path.as_os_str().to_os_string();
43    os.push(".checkpoint");
44    PathBuf::from(os)
45}
46
47pub fn checkpoint_path_from_config(configured_wal_path: Option<&PathBuf>) -> PathBuf {
48    checkpoint_path_for(&wal_path_from_config(configured_wal_path))
49}
50
51pub fn read_checkpoint(path: &Path) -> Result<WalCheckpointMetadata, String> {
52    if !path.exists() {
53        return Ok(WalCheckpointMetadata::ZERO);
54    }
55
56    let content = std::fs::read_to_string(path)
57        .map_err(|e| format!("failed to read checkpoint {}: {}", path.display(), e))?;
58    let trimmed = content.trim();
59    if trimmed.is_empty() {
60        return Err(format!("checkpoint {} is empty", path.display()));
61    }
62
63    let parsed: WalCheckpointMetadata = sonic_rs::from_str(trimmed).map_err(|e| {
64        format!(
65            "failed to parse checkpoint {} with value {:?}: {}",
66            path.display(),
67            trimmed,
68            e
69        )
70    })?;
71
72    if parsed.last_command_id < 0 {
73        return Err(format!(
74            "checkpoint {} has negative last_command_id {}",
75            path.display(),
76            parsed.last_command_id
77        ));
78    }
79    if parsed.last_l2_seq < 0 {
80        return Err(format!(
81            "checkpoint {} has negative last_l2_seq {}",
82            path.display(),
83            parsed.last_l2_seq
84        ));
85    }
86
87    Ok(parsed)
88}
89
90pub fn write_checkpoint(path: &Path, checkpoint: WalCheckpointMetadata) -> Result<(), String> {
91    if let Some(parent) = path.parent() {
92        if !parent.as_os_str().is_empty() {
93            create_dir_all(parent).map_err(|e| {
94                format!(
95                    "failed to create checkpoint directory {}: {}",
96                    parent.display(),
97                    e
98                )
99            })?;
100        }
101    }
102
103    let mut tmp_os = path.as_os_str().to_os_string();
104    tmp_os.push(".tmp");
105    let tmp_path = PathBuf::from(tmp_os);
106
107    #[cfg(unix)]
108    let mut file = {
109        use std::os::unix::fs::OpenOptionsExt;
110        OpenOptions::new()
111            .create(true)
112            .truncate(true)
113            .write(true)
114            .mode(0o600)
115            .open(&tmp_path)
116            .map_err(|e| {
117                format!(
118                    "failed to open checkpoint temp {}: {}",
119                    tmp_path.display(),
120                    e
121                )
122            })?
123    };
124    #[cfg(not(unix))]
125    let mut file = OpenOptions::new()
126        .create(true)
127        .truncate(true)
128        .write(true)
129        .open(&tmp_path)
130        .map_err(|e| {
131            format!(
132                "failed to open checkpoint temp {}: {}",
133                tmp_path.display(),
134                e
135            )
136        })?;
137
138    let payload = sonic_rs::to_vec(&checkpoint)
139        .map_err(|e| format!("failed to serialize checkpoint metadata: {}", e))?;
140    file.write_all(&payload).map_err(|e| {
141        format!(
142            "failed to write checkpoint temp {}: {}",
143            tmp_path.display(),
144            e
145        )
146    })?;
147    file.sync_data().map_err(|e| {
148        format!(
149            "failed to sync checkpoint temp {}: {}",
150            tmp_path.display(),
151            e
152        )
153    })?;
154
155    std::fs::rename(&tmp_path, path).map_err(|e| {
156        format!(
157            "failed to atomically replace checkpoint {} with {}: {}",
158            path.display(),
159            tmp_path.display(),
160            e
161        )
162    })?;
163
164    #[cfg(unix)]
165    {
166        let parent = path.parent().ok_or_else(|| {
167            format!(
168                "checkpoint {} has no parent directory to sync",
169                path.display()
170            )
171        })?;
172        let parent = if parent.as_os_str().is_empty() {
173            Path::new(".")
174        } else {
175            parent
176        };
177
178        OpenOptions::new()
179            .read(true)
180            .open(parent)
181            .and_then(|dir| dir.sync_all())
182            .map_err(|e| {
183                format!(
184                    "failed to sync checkpoint directory {}: {}",
185                    parent.display(),
186                    e
187                )
188            })?;
189    }
190
191    Ok(())
192}
193
194#[cfg(test)]
195mod tests {
196    use super::*;
197    use serial_test::serial;
198
199    struct EnvVarGuard {
200        key: &'static str,
201        old_value: Option<String>,
202    }
203
204    impl EnvVarGuard {
205        fn set(key: &'static str, value: &str) -> Self {
206            let old_value = std::env::var(key).ok();
207            std::env::set_var(key, value);
208            Self { key, old_value }
209        }
210
211        fn unset(key: &'static str) -> Self {
212            let old_value = std::env::var(key).ok();
213            std::env::remove_var(key);
214            Self { key, old_value }
215        }
216    }
217
218    impl Drop for EnvVarGuard {
219        fn drop(&mut self) {
220            if let Some(old_value) = &self.old_value {
221                std::env::set_var(self.key, old_value);
222            } else {
223                std::env::remove_var(self.key);
224            }
225        }
226    }
227
228    #[test]
229    fn checkpoint_roundtrip_json() {
230        let dir = tempfile::tempdir().expect("create tempdir");
231        let path = dir.path().join("checkpoint");
232        let expected = WalCheckpointMetadata {
233            wal_offset: 123,
234            last_command_id: 456,
235            last_l2_seq: 789,
236        };
237
238        write_checkpoint(&path, expected).expect("write checkpoint");
239        let actual = read_checkpoint(&path).expect("read checkpoint");
240
241        assert_eq!(actual, expected);
242    }
243
244    #[test]
245    fn missing_checkpoint_is_zero() {
246        let dir = tempfile::tempdir().expect("create tempdir");
247        let path = dir.path().join("missing.checkpoint");
248
249        assert_eq!(
250            read_checkpoint(&path).expect("read checkpoint"),
251            WalCheckpointMetadata::ZERO
252        );
253    }
254
255    #[test]
256    fn legacy_numeric_checkpoint_is_rejected() {
257        let dir = tempfile::tempdir().expect("create tempdir");
258        let path = dir.path().join("checkpoint");
259        std::fs::write(&path, b"42").expect("write legacy checkpoint");
260
261        let err = read_checkpoint(&path).expect_err("legacy checkpoint must fail");
262        assert!(
263            err.contains("invalid type") || err.contains("expected"),
264            "unexpected error: {err}"
265        );
266    }
267
268    #[test]
269    fn checkpoint_path_appends_suffix_to_full_wal_path() {
270        let path = checkpoint_path_for(Path::new("/var/tmp/engine.wal"));
271        assert_eq!(path, PathBuf::from("/var/tmp/engine.wal.checkpoint"));
272    }
273
274    #[test]
275    #[serial]
276    fn wal_path_from_config_prefers_env_override() {
277        let _guard = EnvVarGuard::set("ENGINE_JOURNAL_WAL_PATH", "/tmp/from-env.wal");
278        let configured = PathBuf::from("/tmp/from-config.wal");
279
280        assert_eq!(
281            wal_path_from_config(Some(&configured)),
282            PathBuf::from("/tmp/from-env.wal")
283        );
284    }
285
286    #[test]
287    #[serial]
288    fn wal_path_from_config_ignores_blank_env_override() {
289        let _guard = EnvVarGuard::set("ENGINE_JOURNAL_WAL_PATH", "   ");
290        let configured = PathBuf::from("/tmp/from-config.wal");
291
292        assert_eq!(wal_path_from_config(Some(&configured)), configured);
293    }
294
295    #[test]
296    #[serial]
297    fn wal_path_from_config_uses_env_when_config_missing() {
298        let _guard = EnvVarGuard::set("ENGINE_JOURNAL_WAL_PATH", "/tmp/from-env.wal");
299
300        assert_eq!(
301            wal_path_from_config(None),
302            PathBuf::from("/tmp/from-env.wal")
303        );
304    }
305
306    #[test]
307    #[serial]
308    fn wal_path_from_config_uses_default_when_unset() {
309        let _guard = EnvVarGuard::unset("ENGINE_JOURNAL_WAL_PATH");
310
311        assert_eq!(wal_path_from_config(None), PathBuf::from(DEFAULT_WAL_PATH));
312    }
313
314    #[test]
315    #[serial]
316    fn explicit_config_detection_checks_config_and_env() {
317        let configured = PathBuf::from("/tmp/configured.wal");
318        {
319            let _guard = EnvVarGuard::unset("ENGINE_JOURNAL_WAL_PATH");
320            assert!(wal_path_is_explicitly_configured(Some(&configured)));
321            assert!(!wal_path_is_explicitly_configured(None));
322        }
323        {
324            let _guard = EnvVarGuard::set("ENGINE_JOURNAL_WAL_PATH", "/tmp/from-env.wal");
325            assert!(wal_path_is_explicitly_configured(None));
326        }
327    }
328}