hypercall/snapshot/portfolio/
task.rs1use metrics::{counter, gauge};
7use std::collections::HashMap;
8use std::sync::Arc;
9use std::time::Duration;
10use tokio::time::interval;
11use tracing::{error, info};
12
13use crate::portfolio::{PortfolioBalance, PortfolioServiceImpl};
14use crate::read_cache::portfolio::ENGINE_COMMAND_SNAPSHOT_STREAM;
15use crate::snapshot::portfolio::db::DbPortfolioSnapshotWriter;
16use crate::snapshot::traits::SnapshotWriter;
17use hypercall_db::{JournalReplayReader, PortfolioSnapshotWriter};
18use hypercall_types::WalletAddress;
19
/// Tunable settings for the periodic portfolio snapshot task.
pub struct SnapshotTaskConfig {
    /// Period between two consecutive snapshot attempts.
    pub interval: Duration,
}
25
26impl Default for SnapshotTaskConfig {
27 fn default() -> Self {
28 Self {
29 interval: Duration::from_secs(60),
30 }
31 }
32}
33
34pub struct PortfolioSnapshotTask {
36 db: Arc<dyn PortfolioSnapshotWriter>,
37 replay_reader: Arc<dyn JournalReplayReader>,
38 portfolio_service: Arc<PortfolioServiceImpl>,
39 capture_snapshot: Option<Arc<AtomicPortfolioSnapshotCapture>>,
40 config: SnapshotTaskConfig,
41}
42
43type AtomicPortfolioSnapshotCapture = dyn Fn() -> anyhow::Result<(
44 HashMap<WalletAddress, PortfolioBalance>,
45 HashMap<String, HashMap<i32, i64>>,
46 )> + Send
47 + Sync;
48
49impl PortfolioSnapshotTask {
50 pub fn new(
52 db: Arc<dyn PortfolioSnapshotWriter>,
53 replay_reader: Arc<dyn JournalReplayReader>,
54 portfolio_service: Arc<PortfolioServiceImpl>,
55 config: SnapshotTaskConfig,
56 ) -> Self {
57 Self {
58 db,
59 replay_reader,
60 portfolio_service,
61 capture_snapshot: None,
62 config,
63 }
64 }
65
66 pub fn with_capture_snapshot(
67 mut self,
68 capture_snapshot: Arc<AtomicPortfolioSnapshotCapture>,
69 ) -> Self {
70 self.capture_snapshot = Some(capture_snapshot);
71 self
72 }
73
74 pub fn start(self: Arc<Self>) {
79 let (tx, rx) = tokio::sync::broadcast::channel::<()>(1);
82 std::mem::forget(tx);
83 self.start_with_shutdown(rx);
84 }
85
86 pub fn start_with_shutdown(
91 self: Arc<Self>,
92 mut shutdown_rx: tokio::sync::broadcast::Receiver<()>,
93 ) -> tokio::task::JoinHandle<()> {
94 let task = self.clone();
95 tokio::spawn(async move {
96 info!(
97 "Starting portfolio snapshot task (interval: {:?})",
98 task.config.interval
99 );
100
101 let mut interval = interval(task.config.interval);
102
103 loop {
104 tokio::select! {
105 _ = shutdown_rx.recv() => {
106 info!("Portfolio snapshot task received shutdown signal");
107 break;
108 }
109 _ = interval.tick() => {
110 if let Err(e) = task.take_snapshot().await {
111 error!("Failed to take portfolio snapshot: {}", e);
112 }
113 }
114 }
115 }
116
117 info!("Taking final portfolio snapshot before shutdown...");
119 if let Err(e) = task.take_snapshot().await {
120 error!("Failed to take final portfolio snapshot on shutdown: {}", e);
121 } else {
122 info!("Final portfolio snapshot saved successfully");
123 }
124
125 info!("Portfolio snapshot task stopped");
126 })
127 }
128}
129
130#[async_trait::async_trait]
131impl crate::shared::service::Service for PortfolioSnapshotTask {
132 fn name(&self) -> &'static str {
133 "PortfolioSnapshotTask"
134 }
135
136 fn owner(&self) -> crate::shared::service::ServiceOwner {
137 crate::shared::service::ServiceOwner::Engine
138 }
139
140 async fn run(self: Arc<Self>, shutdown: crate::shared::ShutdownRx) -> anyhow::Result<()> {
141 self.start_with_shutdown(shutdown)
142 .await
143 .map_err(|e| anyhow::anyhow!("PortfolioSnapshotTask task failed: {:?}", e))
144 }
145}
146
147impl PortfolioSnapshotTask {
148 async fn take_snapshot(&self) -> anyhow::Result<()> {
149 let replay_reader = self.replay_reader.clone();
150 let get_offsets = move || {
151 let next_command_id = replay_reader.get_next_engine_command_id_sync()?;
152 let mut offsets = HashMap::new();
153 offsets.insert(
154 ENGINE_COMMAND_SNAPSHOT_STREAM.to_string(),
155 HashMap::from([(0, next_command_id)]),
156 );
157 Ok(offsets)
158 };
159
160 let mut writer = DbPortfolioSnapshotWriter::new(
161 self.db.clone(),
162 self.portfolio_service.clone(),
163 get_offsets,
164 );
165 if let Some(capture_snapshot) = &self.capture_snapshot {
166 let capture_snapshot = capture_snapshot.clone();
167 writer = writer.with_capture_snapshot(move || capture_snapshot());
168 }
169
170 match writer.take_snapshot() {
171 Ok(snapshot_id) => {
172 info!("Portfolio snapshot saved with id={}", snapshot_id);
173 counter!("ht_portfolio_snapshots_total", "status" => "success").increment(1);
174 gauge!("ht_portfolio_snapshot_last_id").set(snapshot_id as f64);
175 gauge!("ht_portfolio_snapshot_last_success_timestamp").set(
176 std::time::SystemTime::now()
177 .duration_since(std::time::UNIX_EPOCH)
178 .unwrap()
179 .as_secs() as f64,
180 );
181 Ok(())
182 }
183 Err(e) => {
184 error!("Snapshot failed: {}", e);
185 counter!("ht_portfolio_snapshots_total", "status" => "error").increment(1);
186 Err(anyhow::anyhow!("Snapshot failed: {}", e))
187 }
188 }
189 }
190}