// hypercall/snapshot/instruments/task.rs
use metrics::{counter, gauge};
2use std::sync::Arc;
3use tokio::time::{interval, Duration};
4use tracing::{error, info};
5
6use crate::snapshot::traits::SnapshotWriter;
7use crate::snapshot::SnapshotTaskConfig;
8
9pub struct InstrumentsSnapshotTask<W>
11where
12 W: SnapshotWriter + Send + Sync + 'static,
13{
14 writer: Arc<W>,
15 interval: Duration,
16}
17
18impl<W> InstrumentsSnapshotTask<W>
19where
20 W: SnapshotWriter + Send + Sync + 'static,
21{
22 pub fn new(writer: Arc<W>, config: SnapshotTaskConfig) -> Self {
23 Self {
24 writer,
25 interval: config.interval,
26 }
27 }
28
29 pub fn start_with_shutdown(
30 self: Arc<Self>,
31 mut shutdown_rx: tokio::sync::broadcast::Receiver<()>,
32 ) -> tokio::task::JoinHandle<()> {
33 tokio::spawn(async move {
34 info!(
35 "Starting instruments snapshot task (interval {:?})",
36 self.interval
37 );
38 let mut ticker = interval(self.interval);
39 loop {
40 tokio::select! {
41 _ = shutdown_rx.recv() => {
42 info!("Instruments snapshot task received shutdown signal");
43 break;
44 }
45 _ = ticker.tick() => {
46 if let Err(e) = self.take_snapshot().await {
47 error!("Failed to take instruments snapshot: {}", e);
48 }
49 }
50 }
51 }
52 info!("Instruments snapshot task stopped");
53 })
54 }
55}
56
57#[async_trait::async_trait]
58impl<W> crate::shared::service::Service for InstrumentsSnapshotTask<W>
59where
60 W: SnapshotWriter + Send + Sync + 'static,
61{
62 fn name(&self) -> &'static str {
63 "InstrumentsSnapshotTask"
64 }
65
66 fn owner(&self) -> crate::shared::service::ServiceOwner {
67 crate::shared::service::ServiceOwner::Engine
68 }
69
70 async fn run(self: Arc<Self>, shutdown: crate::shared::ShutdownRx) -> anyhow::Result<()> {
71 self.start_with_shutdown(shutdown)
72 .await
73 .map_err(|e| anyhow::anyhow!("InstrumentsSnapshotTask task failed: {:?}", e))
74 }
75}
76
77impl<W> InstrumentsSnapshotTask<W>
78where
79 W: SnapshotWriter + Send + Sync + 'static,
80{
81 async fn take_snapshot(&self) -> anyhow::Result<()> {
82 let writer = self.writer.clone();
83 match tokio::task::spawn_blocking(move || writer.take_snapshot()).await? {
84 Ok(snapshot_id) => {
85 info!("Instruments snapshot saved with id={}", snapshot_id);
86 counter!("ht_instruments_snapshots_total", "status" => "success").increment(1);
87 gauge!("ht_instruments_snapshot_last_id").set(snapshot_id as f64);
88 gauge!("ht_instruments_snapshot_last_success_timestamp").set(
89 std::time::SystemTime::now()
90 .duration_since(std::time::UNIX_EPOCH)
91 .unwrap()
92 .as_secs() as f64,
93 );
94 Ok(())
95 }
96 Err(e) => {
97 counter!("ht_instruments_snapshots_total", "status" => "error").increment(1);
98 Err(anyhow::anyhow!("Snapshot failed: {}", e))
99 }
100 }
101 }
102}