Skip to main content

hypercall/snapshot/portfolio/
task.rs

1//! Portfolio snapshot background task.
2//!
3//! Runs as a separate service, started ONLY from the integrated (unified) server.
4//! This should NOT be started in standalone API or engine modes.
5
6use metrics::{counter, gauge};
7use std::collections::HashMap;
8use std::sync::Arc;
9use std::time::Duration;
10use tokio::time::interval;
11use tracing::{error, info};
12
13use crate::portfolio::{PortfolioBalance, PortfolioServiceImpl};
14use crate::read_cache::portfolio::ENGINE_COMMAND_SNAPSHOT_STREAM;
15use crate::snapshot::portfolio::db::DbPortfolioSnapshotWriter;
16use crate::snapshot::traits::SnapshotWriter;
17use hypercall_db::{JournalReplayReader, PortfolioSnapshotWriter};
18use hypercall_types::WalletAddress;
19
20/// Configuration for the snapshot task.
21pub struct SnapshotTaskConfig {
22    /// How often to take snapshots.
23    pub interval: Duration,
24}
25
26impl Default for SnapshotTaskConfig {
27    fn default() -> Self {
28        Self {
29            interval: Duration::from_secs(60),
30        }
31    }
32}
33
34/// Background task that periodically takes portfolio snapshots.
35pub struct PortfolioSnapshotTask {
36    db: Arc<dyn PortfolioSnapshotWriter>,
37    replay_reader: Arc<dyn JournalReplayReader>,
38    portfolio_service: Arc<PortfolioServiceImpl>,
39    capture_snapshot: Option<Arc<AtomicPortfolioSnapshotCapture>>,
40    config: SnapshotTaskConfig,
41}
42
43type AtomicPortfolioSnapshotCapture = dyn Fn() -> anyhow::Result<(
44        HashMap<WalletAddress, PortfolioBalance>,
45        HashMap<String, HashMap<i32, i64>>,
46    )> + Send
47    + Sync;
48
49impl PortfolioSnapshotTask {
50    /// Create a new snapshot task.
51    pub fn new(
52        db: Arc<dyn PortfolioSnapshotWriter>,
53        replay_reader: Arc<dyn JournalReplayReader>,
54        portfolio_service: Arc<PortfolioServiceImpl>,
55        config: SnapshotTaskConfig,
56    ) -> Self {
57        Self {
58            db,
59            replay_reader,
60            portfolio_service,
61            capture_snapshot: None,
62            config,
63        }
64    }
65
66    pub fn with_capture_snapshot(
67        mut self,
68        capture_snapshot: Arc<AtomicPortfolioSnapshotCapture>,
69    ) -> Self {
70        self.capture_snapshot = Some(capture_snapshot);
71        self
72    }
73
74    /// Start the background snapshot task (legacy, no shutdown support).
75    ///
76    /// This spawns a tokio task that runs until the process exits.
77    /// For graceful shutdown support, use `start_with_shutdown` instead.
78    pub fn start(self: Arc<Self>) {
79        // Create a dummy shutdown channel that never fires.
80        // IMPORTANT: We must keep the sender alive, otherwise recv() returns Closed immediately.
81        let (tx, rx) = tokio::sync::broadcast::channel::<()>(1);
82        std::mem::forget(tx);
83        self.start_with_shutdown(rx);
84    }
85
86    /// Start the background snapshot task with shutdown support.
87    ///
88    /// The task will exit gracefully when the shutdown signal is received.
89    /// Returns a JoinHandle that can be awaited or added to a TaskGroup.
90    pub fn start_with_shutdown(
91        self: Arc<Self>,
92        mut shutdown_rx: tokio::sync::broadcast::Receiver<()>,
93    ) -> tokio::task::JoinHandle<()> {
94        let task = self.clone();
95        tokio::spawn(async move {
96            info!(
97                "Starting portfolio snapshot task (interval: {:?})",
98                task.config.interval
99            );
100
101            let mut interval = interval(task.config.interval);
102
103            loop {
104                tokio::select! {
105                    _ = shutdown_rx.recv() => {
106                        info!("Portfolio snapshot task received shutdown signal");
107                        break;
108                    }
109                    _ = interval.tick() => {
110                        if let Err(e) = task.take_snapshot().await {
111                            error!("Failed to take portfolio snapshot: {}", e);
112                        }
113                    }
114                }
115            }
116
117            // Take a final snapshot on graceful shutdown to ensure state is persisted
118            info!("Taking final portfolio snapshot before shutdown...");
119            if let Err(e) = task.take_snapshot().await {
120                error!("Failed to take final portfolio snapshot on shutdown: {}", e);
121            } else {
122                info!("Final portfolio snapshot saved successfully");
123            }
124
125            info!("Portfolio snapshot task stopped");
126        })
127    }
128}
129
130#[async_trait::async_trait]
131impl crate::shared::service::Service for PortfolioSnapshotTask {
132    fn name(&self) -> &'static str {
133        "PortfolioSnapshotTask"
134    }
135
136    fn owner(&self) -> crate::shared::service::ServiceOwner {
137        crate::shared::service::ServiceOwner::Engine
138    }
139
140    async fn run(self: Arc<Self>, shutdown: crate::shared::ShutdownRx) -> anyhow::Result<()> {
141        self.start_with_shutdown(shutdown)
142            .await
143            .map_err(|e| anyhow::anyhow!("PortfolioSnapshotTask task failed: {:?}", e))
144    }
145}
146
147impl PortfolioSnapshotTask {
148    async fn take_snapshot(&self) -> anyhow::Result<()> {
149        let replay_reader = self.replay_reader.clone();
150        let get_offsets = move || {
151            let next_command_id = replay_reader.get_next_engine_command_id_sync()?;
152            let mut offsets = HashMap::new();
153            offsets.insert(
154                ENGINE_COMMAND_SNAPSHOT_STREAM.to_string(),
155                HashMap::from([(0, next_command_id)]),
156            );
157            Ok(offsets)
158        };
159
160        let mut writer = DbPortfolioSnapshotWriter::new(
161            self.db.clone(),
162            self.portfolio_service.clone(),
163            get_offsets,
164        );
165        if let Some(capture_snapshot) = &self.capture_snapshot {
166            let capture_snapshot = capture_snapshot.clone();
167            writer = writer.with_capture_snapshot(move || capture_snapshot());
168        }
169
170        match writer.take_snapshot() {
171            Ok(snapshot_id) => {
172                info!("Portfolio snapshot saved with id={}", snapshot_id);
173                counter!("ht_portfolio_snapshots_total", "status" => "success").increment(1);
174                gauge!("ht_portfolio_snapshot_last_id").set(snapshot_id as f64);
175                gauge!("ht_portfolio_snapshot_last_success_timestamp").set(
176                    std::time::SystemTime::now()
177                        .duration_since(std::time::UNIX_EPOCH)
178                        .unwrap()
179                        .as_secs() as f64,
180                );
181                Ok(())
182            }
183            Err(e) => {
184                error!("Snapshot failed: {}", e);
185                counter!("ht_portfolio_snapshots_total", "status" => "error").increment(1);
186                Err(anyhow::anyhow!("Snapshot failed: {}", e))
187            }
188        }
189    }
190}