Skip to main content

hypercall/snapshot/instruments/
task.rs

1use metrics::{counter, gauge};
2use std::sync::Arc;
3use tokio::time::{interval, Duration};
4use tracing::{error, info};
5
6use crate::snapshot::traits::SnapshotWriter;
7use crate::snapshot::SnapshotTaskConfig;
8
9/// Background task that periodically persists instruments snapshots.
10pub struct InstrumentsSnapshotTask<W>
11where
12    W: SnapshotWriter + Send + Sync + 'static,
13{
14    writer: Arc<W>,
15    interval: Duration,
16}
17
18impl<W> InstrumentsSnapshotTask<W>
19where
20    W: SnapshotWriter + Send + Sync + 'static,
21{
22    pub fn new(writer: Arc<W>, config: SnapshotTaskConfig) -> Self {
23        Self {
24            writer,
25            interval: config.interval,
26        }
27    }
28
29    pub fn start_with_shutdown(
30        self: Arc<Self>,
31        mut shutdown_rx: tokio::sync::broadcast::Receiver<()>,
32    ) -> tokio::task::JoinHandle<()> {
33        tokio::spawn(async move {
34            info!(
35                "Starting instruments snapshot task (interval {:?})",
36                self.interval
37            );
38            let mut ticker = interval(self.interval);
39            loop {
40                tokio::select! {
41                    _ = shutdown_rx.recv() => {
42                        info!("Instruments snapshot task received shutdown signal");
43                        break;
44                    }
45                    _ = ticker.tick() => {
46                        if let Err(e) = self.take_snapshot().await {
47                            error!("Failed to take instruments snapshot: {}", e);
48                        }
49                    }
50                }
51            }
52            info!("Instruments snapshot task stopped");
53        })
54    }
55}
56
57#[async_trait::async_trait]
58impl<W> crate::shared::service::Service for InstrumentsSnapshotTask<W>
59where
60    W: SnapshotWriter + Send + Sync + 'static,
61{
62    fn name(&self) -> &'static str {
63        "InstrumentsSnapshotTask"
64    }
65
66    fn owner(&self) -> crate::shared::service::ServiceOwner {
67        crate::shared::service::ServiceOwner::Engine
68    }
69
70    async fn run(self: Arc<Self>, shutdown: crate::shared::ShutdownRx) -> anyhow::Result<()> {
71        self.start_with_shutdown(shutdown)
72            .await
73            .map_err(|e| anyhow::anyhow!("InstrumentsSnapshotTask task failed: {:?}", e))
74    }
75}
76
77impl<W> InstrumentsSnapshotTask<W>
78where
79    W: SnapshotWriter + Send + Sync + 'static,
80{
81    async fn take_snapshot(&self) -> anyhow::Result<()> {
82        let writer = self.writer.clone();
83        match tokio::task::spawn_blocking(move || writer.take_snapshot()).await? {
84            Ok(snapshot_id) => {
85                info!("Instruments snapshot saved with id={}", snapshot_id);
86                counter!("ht_instruments_snapshots_total", "status" => "success").increment(1);
87                gauge!("ht_instruments_snapshot_last_id").set(snapshot_id as f64);
88                gauge!("ht_instruments_snapshot_last_success_timestamp").set(
89                    std::time::SystemTime::now()
90                        .duration_since(std::time::UNIX_EPOCH)
91                        .unwrap()
92                        .as_secs() as f64,
93                );
94                Ok(())
95            }
96            Err(e) => {
97                counter!("ht_instruments_snapshots_total", "status" => "error").increment(1);
98                Err(anyhow::anyhow!("Snapshot failed: {}", e))
99            }
100        }
101    }
102}