// hypercall_journal/checkpoint.rs
use serde::{Deserialize, Serialize};
use std::fs::{create_dir_all, OpenOptions};
use std::io::Write;
use std::path::{Path, PathBuf};
5
/// Fallback WAL location returned by `wal_path_from_config` when neither the
/// `ENGINE_JOURNAL_WAL_PATH` environment variable nor a configured path
/// supplies one.
pub const DEFAULT_WAL_PATH: &str = "/var/tmp/hypertheta-engine-journal.wal";
7
/// Checkpoint record that `write_checkpoint` serializes to disk and
/// `read_checkpoint` loads back.
///
/// Deserialization is strict: `deny_unknown_fields` makes any unexpected key
/// in the stored document a parse error.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(deny_unknown_fields)]
pub struct WalCheckpointMetadata {
    /// Position in the WAL associated with this checkpoint.
    /// NOTE(review): presumably a byte offset — confirm against the WAL writer.
    pub wal_offset: u64,
    /// Command id recorded with the checkpoint. `read_checkpoint` rejects
    /// negative values even though the type is signed.
    /// NOTE(review): presumably the last command applied — confirm with callers.
    pub last_command_id: i64,
    /// L2 sequence number recorded with the checkpoint. `read_checkpoint`
    /// rejects negative values even though the type is signed.
    /// NOTE(review): presumably the last L2 sequence processed — confirm with callers.
    pub last_l2_seq: i64,
}
16
impl WalCheckpointMetadata {
    /// All-zero checkpoint. `read_checkpoint` returns this value when the
    /// checkpoint file does not exist.
    pub const ZERO: Self = Self {
        wal_offset: 0,
        last_command_id: 0,
        last_l2_seq: 0,
    };
}
24
/// Reports whether a WAL location was supplied explicitly.
///
/// Returns `true` when `configured_wal_path` is `Some`, or when the
/// `ENGINE_JOURNAL_WAL_PATH` environment variable holds a value that is not
/// blank after trimming. The environment is only consulted when no configured
/// path was given. An unset, non-Unicode, or whitespace-only variable does not
/// count as explicit configuration.
pub fn wal_path_is_explicitly_configured(configured_wal_path: Option<&PathBuf>) -> bool {
    if configured_wal_path.is_some() {
        return true;
    }

    match std::env::var("ENGINE_JOURNAL_WAL_PATH") {
        Ok(raw) => !raw.trim().is_empty(),
        Err(_) => false,
    }
}
31
32pub fn wal_path_from_config(configured_wal_path: Option<&PathBuf>) -> PathBuf {
33 std::env::var("ENGINE_JOURNAL_WAL_PATH")
34 .ok()
35 .filter(|value| !value.trim().is_empty())
36 .map(PathBuf::from)
37 .or_else(|| configured_wal_path.cloned())
38 .unwrap_or_else(|| PathBuf::from(DEFAULT_WAL_PATH))
39}
40
/// Derives the checkpoint file path for a WAL file by appending the literal
/// suffix `.checkpoint` to the complete WAL path (any existing extension is
/// kept, not replaced).
pub fn checkpoint_path_for(wal_path: &Path) -> PathBuf {
    let mut suffixed = wal_path.to_path_buf().into_os_string();
    suffixed.push(".checkpoint");
    suffixed.into()
}
46
47pub fn checkpoint_path_from_config(configured_wal_path: Option<&PathBuf>) -> PathBuf {
48 checkpoint_path_for(&wal_path_from_config(configured_wal_path))
49}
50
51pub fn read_checkpoint(path: &Path) -> Result<WalCheckpointMetadata, String> {
52 if !path.exists() {
53 return Ok(WalCheckpointMetadata::ZERO);
54 }
55
56 let content = std::fs::read_to_string(path)
57 .map_err(|e| format!("failed to read checkpoint {}: {}", path.display(), e))?;
58 let trimmed = content.trim();
59 if trimmed.is_empty() {
60 return Err(format!("checkpoint {} is empty", path.display()));
61 }
62
63 let parsed: WalCheckpointMetadata = sonic_rs::from_str(trimmed).map_err(|e| {
64 format!(
65 "failed to parse checkpoint {} with value {:?}: {}",
66 path.display(),
67 trimmed,
68 e
69 )
70 })?;
71
72 if parsed.last_command_id < 0 {
73 return Err(format!(
74 "checkpoint {} has negative last_command_id {}",
75 path.display(),
76 parsed.last_command_id
77 ));
78 }
79 if parsed.last_l2_seq < 0 {
80 return Err(format!(
81 "checkpoint {} has negative last_l2_seq {}",
82 path.display(),
83 parsed.last_l2_seq
84 ));
85 }
86
87 Ok(parsed)
88}
89
90pub fn write_checkpoint(path: &Path, checkpoint: WalCheckpointMetadata) -> Result<(), String> {
91 if let Some(parent) = path.parent() {
92 if !parent.as_os_str().is_empty() {
93 create_dir_all(parent).map_err(|e| {
94 format!(
95 "failed to create checkpoint directory {}: {}",
96 parent.display(),
97 e
98 )
99 })?;
100 }
101 }
102
103 let mut tmp_os = path.as_os_str().to_os_string();
104 tmp_os.push(".tmp");
105 let tmp_path = PathBuf::from(tmp_os);
106
107 #[cfg(unix)]
108 let mut file = {
109 use std::os::unix::fs::OpenOptionsExt;
110 OpenOptions::new()
111 .create(true)
112 .truncate(true)
113 .write(true)
114 .mode(0o600)
115 .open(&tmp_path)
116 .map_err(|e| {
117 format!(
118 "failed to open checkpoint temp {}: {}",
119 tmp_path.display(),
120 e
121 )
122 })?
123 };
124 #[cfg(not(unix))]
125 let mut file = OpenOptions::new()
126 .create(true)
127 .truncate(true)
128 .write(true)
129 .open(&tmp_path)
130 .map_err(|e| {
131 format!(
132 "failed to open checkpoint temp {}: {}",
133 tmp_path.display(),
134 e
135 )
136 })?;
137
138 let payload = sonic_rs::to_vec(&checkpoint)
139 .map_err(|e| format!("failed to serialize checkpoint metadata: {}", e))?;
140 file.write_all(&payload).map_err(|e| {
141 format!(
142 "failed to write checkpoint temp {}: {}",
143 tmp_path.display(),
144 e
145 )
146 })?;
147 file.sync_data().map_err(|e| {
148 format!(
149 "failed to sync checkpoint temp {}: {}",
150 tmp_path.display(),
151 e
152 )
153 })?;
154
155 std::fs::rename(&tmp_path, path).map_err(|e| {
156 format!(
157 "failed to atomically replace checkpoint {} with {}: {}",
158 path.display(),
159 tmp_path.display(),
160 e
161 )
162 })?;
163
164 #[cfg(unix)]
165 {
166 let parent = path.parent().ok_or_else(|| {
167 format!(
168 "checkpoint {} has no parent directory to sync",
169 path.display()
170 )
171 })?;
172 let parent = if parent.as_os_str().is_empty() {
173 Path::new(".")
174 } else {
175 parent
176 };
177
178 OpenOptions::new()
179 .read(true)
180 .open(parent)
181 .and_then(|dir| dir.sync_all())
182 .map_err(|e| {
183 format!(
184 "failed to sync checkpoint directory {}: {}",
185 parent.display(),
186 e
187 )
188 })?;
189 }
190
191 Ok(())
192}
193
#[cfg(test)]
mod tests {
    use super::*;
    use serial_test::serial;
    use std::ffi::OsString;

    /// Environment variable consulted by the WAL path helpers.
    const WAL_PATH_ENV: &str = "ENGINE_JOURNAL_WAL_PATH";

    /// Scoped environment-variable override.
    ///
    /// Captures the variable's previous state when created and restores it on
    /// drop. The snapshot uses `var_os`, so a pre-existing value that is not
    /// valid Unicode is restored as-is instead of being lost and removed.
    struct EnvVarGuard {
        key: &'static str,
        old_value: Option<OsString>,
    }

    impl EnvVarGuard {
        /// Snapshots the current state of `key` without modifying it.
        fn capture(key: &'static str) -> Self {
            Self {
                key,
                old_value: std::env::var_os(key),
            }
        }

        /// Sets `key` to `value` until the guard is dropped.
        fn set(key: &'static str, value: &str) -> Self {
            let guard = Self::capture(key);
            std::env::set_var(key, value);
            guard
        }

        /// Removes `key` until the guard is dropped.
        fn unset(key: &'static str) -> Self {
            let guard = Self::capture(key);
            std::env::remove_var(key);
            guard
        }
    }

    impl Drop for EnvVarGuard {
        fn drop(&mut self) {
            match self.old_value.take() {
                Some(old_value) => std::env::set_var(self.key, old_value),
                None => std::env::remove_var(self.key),
            }
        }
    }

    /// Creates a temp directory holding a `checkpoint` file with `contents`.
    /// The directory handle is returned so it outlives the test body.
    fn checkpoint_file_with(contents: &[u8]) -> (tempfile::TempDir, PathBuf) {
        let dir = tempfile::tempdir().expect("create tempdir");
        let path = dir.path().join("checkpoint");
        std::fs::write(&path, contents).expect("write checkpoint fixture");
        (dir, path)
    }

    /// A written checkpoint reads back unchanged.
    #[test]
    fn checkpoint_roundtrip_json() {
        let dir = tempfile::tempdir().expect("create tempdir");
        let path = dir.path().join("checkpoint");
        let expected = WalCheckpointMetadata {
            wal_offset: 123,
            last_command_id: 456,
            last_l2_seq: 789,
        };

        write_checkpoint(&path, expected).expect("write checkpoint");
        let actual = read_checkpoint(&path).expect("read checkpoint");

        assert_eq!(actual, expected);
    }

    /// The `<path>.tmp` staging file is renamed away by a successful write.
    #[test]
    fn write_checkpoint_leaves_no_temp_file() {
        let dir = tempfile::tempdir().expect("create tempdir");
        let path = dir.path().join("checkpoint");

        write_checkpoint(&path, WalCheckpointMetadata::ZERO).expect("write checkpoint");

        assert!(path.exists());
        assert!(!dir.path().join("checkpoint.tmp").exists());
    }

    /// A checkpoint file that does not exist reads as the zero checkpoint.
    #[test]
    fn missing_checkpoint_is_zero() {
        let dir = tempfile::tempdir().expect("create tempdir");
        let path = dir.path().join("missing.checkpoint");

        assert_eq!(
            read_checkpoint(&path).expect("read checkpoint"),
            WalCheckpointMetadata::ZERO
        );
    }

    /// An empty or whitespace-only checkpoint file is an error, not a zero checkpoint.
    #[test]
    fn empty_checkpoint_is_rejected() {
        for contents in [&b""[..], &b" \n\t"[..]] {
            let (_dir, path) = checkpoint_file_with(contents);

            let err = read_checkpoint(&path).expect_err("empty checkpoint must fail");
            assert!(err.contains("is empty"), "unexpected error: {err}");
        }
    }

    /// A bare number (the legacy format) no longer parses.
    #[test]
    fn legacy_numeric_checkpoint_is_rejected() {
        let (_dir, path) = checkpoint_file_with(b"42");

        let err = read_checkpoint(&path).expect_err("legacy checkpoint must fail");
        assert!(
            err.contains("invalid type") || err.contains("expected"),
            "unexpected error: {err}"
        );
    }

    /// Negative ids parse as `i64` but are rejected by validation.
    #[test]
    fn negative_fields_are_rejected() {
        let (_dir, path) =
            checkpoint_file_with(br#"{"wal_offset":1,"last_command_id":-1,"last_l2_seq":0}"#);
        let err = read_checkpoint(&path).expect_err("negative command id must fail");
        assert!(
            err.contains("negative last_command_id"),
            "unexpected error: {err}"
        );

        let (_dir, path) =
            checkpoint_file_with(br#"{"wal_offset":1,"last_command_id":0,"last_l2_seq":-5}"#);
        let err = read_checkpoint(&path).expect_err("negative l2 seq must fail");
        assert!(
            err.contains("negative last_l2_seq"),
            "unexpected error: {err}"
        );
    }

    /// `deny_unknown_fields` turns unexpected keys into a parse failure.
    #[test]
    fn unknown_fields_are_rejected() {
        let (_dir, path) = checkpoint_file_with(
            br#"{"wal_offset":1,"last_command_id":2,"last_l2_seq":3,"extra":4}"#,
        );

        let err = read_checkpoint(&path).expect_err("unknown field must fail");
        assert!(
            err.contains("failed to parse checkpoint"),
            "unexpected error: {err}"
        );
    }

    /// The suffix is appended to the whole path, keeping the `.wal` extension.
    #[test]
    fn checkpoint_path_appends_suffix_to_full_wal_path() {
        let path = checkpoint_path_for(Path::new("/var/tmp/engine.wal"));
        assert_eq!(path, PathBuf::from("/var/tmp/engine.wal.checkpoint"));
    }

    /// The environment override beats the configured path.
    #[test]
    #[serial]
    fn wal_path_from_config_prefers_env_override() {
        let _guard = EnvVarGuard::set(WAL_PATH_ENV, "/tmp/from-env.wal");
        let configured = PathBuf::from("/tmp/from-config.wal");

        assert_eq!(
            wal_path_from_config(Some(&configured)),
            PathBuf::from("/tmp/from-env.wal")
        );
    }

    /// A whitespace-only override is treated as unset.
    #[test]
    #[serial]
    fn wal_path_from_config_ignores_blank_env_override() {
        let _guard = EnvVarGuard::set(WAL_PATH_ENV, " ");
        let configured = PathBuf::from("/tmp/from-config.wal");

        assert_eq!(wal_path_from_config(Some(&configured)), configured);
    }

    /// The environment override is used when no path is configured.
    #[test]
    #[serial]
    fn wal_path_from_config_uses_env_when_config_missing() {
        let _guard = EnvVarGuard::set(WAL_PATH_ENV, "/tmp/from-env.wal");

        assert_eq!(
            wal_path_from_config(None),
            PathBuf::from("/tmp/from-env.wal")
        );
    }

    /// With neither override nor configuration, the default path is used.
    #[test]
    #[serial]
    fn wal_path_from_config_uses_default_when_unset() {
        let _guard = EnvVarGuard::unset(WAL_PATH_ENV);

        assert_eq!(wal_path_from_config(None), PathBuf::from(DEFAULT_WAL_PATH));
    }

    /// Explicit configuration is detected from either source; a blank
    /// environment value does not count.
    #[test]
    #[serial]
    fn explicit_config_detection_checks_config_and_env() {
        let configured = PathBuf::from("/tmp/configured.wal");
        {
            let _guard = EnvVarGuard::unset(WAL_PATH_ENV);
            assert!(wal_path_is_explicitly_configured(Some(&configured)));
            assert!(!wal_path_is_explicitly_configured(None));
        }
        {
            let _guard = EnvVarGuard::set(WAL_PATH_ENV, "/tmp/from-env.wal");
            assert!(wal_path_is_explicitly_configured(None));
        }
        {
            let _guard = EnvVarGuard::set(WAL_PATH_ENV, "  ");
            assert!(!wal_path_is_explicitly_configured(None));
            assert!(wal_path_is_explicitly_configured(Some(&configured)));
        }
    }
}