Skip to main content

hypercall_db_diesel/
snapshots.rs

//! Snapshot persistence implementations for DatabaseHandler.
//!
//! Implements InstrumentsSnapshotWriter, InstrumentsSnapshotReader,
//! PortfolioSnapshotWriter, and PortfolioSnapshotReader.
5
6use anyhow::{Context, Result};
7use diesel::prelude::*;
8use diesel::sql_types::{BigInt, Bytea, Int4, Text};
9use diesel::RunQueryDsl;
10use tracing::{debug, info};
11
12use crate::database_handler::DatabaseHandler;
13use hypercall_db::types::snapshots::{
14    InstrumentSnapshotEntry, InstrumentsSnapshotData, InstrumentsSnapshotInput,
15    PortfolioSnapshotAccountEntry, PortfolioSnapshotData, PortfolioSnapshotInput,
16    SnapshotOffsetEntry,
17};
18
19// ---- Diesel QueryableByName helper structs ----
20
21#[derive(QueryableByName)]
22struct SnapshotIdResult {
23    #[diesel(sql_type = BigInt)]
24    id: i64,
25}
26
27#[derive(QueryableByName)]
28struct InstrumentEntryRow {
29    #[diesel(sql_type = BigInt)]
30    #[allow(dead_code)]
31    snapshot_id: i64,
32    #[diesel(sql_type = Text)]
33    symbol: String,
34    #[diesel(sql_type = Bytea)]
35    data: Vec<u8>,
36}
37
38#[derive(QueryableByName)]
39struct PortfolioAccountRow {
40    #[diesel(sql_type = Text)]
41    wallet: String,
42    #[diesel(sql_type = Bytea)]
43    data: Vec<u8>,
44}
45
46#[derive(QueryableByName)]
47struct OffsetRow {
48    #[diesel(sql_type = Text)]
49    stream: String,
50    #[diesel(sql_type = Int4)]
51    partition: i32,
52    #[diesel(sql_type = BigInt)]
53    offset: i64,
54}
55
56// ---- InstrumentsSnapshotWriter ----
57
58impl hypercall_db::InstrumentsSnapshotWriter for DatabaseHandler {
59    fn write_instruments_snapshot_sync(&self, input: &InstrumentsSnapshotInput) -> Result<i64> {
60        let mut conn = self.pool().get().context("Failed to get DB connection")?;
61
62        conn.transaction(|conn| {
63            // Insert snapshot header
64            let snapshot_id: i64 = diesel::sql_query(
65                "INSERT INTO engine_snapshots (snapshot_type, engine_version) VALUES ($1, NULL) RETURNING id",
66            )
67            .bind::<Text, _>(&input.snapshot_type)
68            .get_result::<SnapshotIdResult>(conn)
69            .context("Failed to insert snapshot header")?
70            .id;
71
72            debug!(
73                "Created instruments snapshot header id={} entries={}",
74                snapshot_id,
75                input.entries.len()
76            );
77
78            // Insert entries
79            for entry in &input.entries {
80                diesel::sql_query(
81                    "INSERT INTO instruments_snapshot_entries (snapshot_id, symbol, data) VALUES ($1, $2, $3)",
82                )
83                .bind::<BigInt, _>(snapshot_id)
84                .bind::<Text, _>(&entry.symbol)
85                .bind::<Bytea, _>(&entry.data)
86                .execute(conn)
87                .with_context(|| format!("Failed to insert instrument {}", entry.symbol))?;
88            }
89
90            // Insert offsets
91            let mut offset_rows = 0;
92            for (stream, partitions) in &input.offsets {
93                for (partition, offset) in partitions {
94                    diesel::sql_query(
95                        "INSERT INTO instruments_snapshot_offsets (snapshot_id, stream, partition, \"offset\") VALUES ($1, $2, $3, $4)",
96                    )
97                    .bind::<BigInt, _>(snapshot_id)
98                    .bind::<Text, _>(stream)
99                    .bind::<Int4, _>(*partition)
100                    .bind::<BigInt, _>(*offset)
101                    .execute(conn)
102                    .with_context(|| format!("Failed to insert offset for {}", stream))?;
103                    offset_rows += 1;
104                }
105            }
106
107            // Cleanup old snapshots
108            let deleted = diesel::sql_query(
109                "DELETE FROM engine_snapshots WHERE snapshot_type = $1 AND id NOT IN (
110                    SELECT id FROM engine_snapshots WHERE snapshot_type = $1 ORDER BY created_at DESC LIMIT $2
111                )",
112            )
113            .bind::<Text, _>(&input.snapshot_type)
114            .bind::<BigInt, _>(input.retention_count)
115            .execute(conn)
116            .context("Failed to cleanup snapshots")?;
117
118            if deleted > 0 {
119                info!("Deleted {} old instruments snapshot(s)", deleted);
120            }
121
122            info!(
123                "Stored instruments snapshot id={} offset_rows={}",
124                snapshot_id, offset_rows
125            );
126            Ok(snapshot_id)
127        })
128    }
129}
130
131// ---- InstrumentsSnapshotReader ----
132
133impl hypercall_db::InstrumentsSnapshotReader for DatabaseHandler {
134    fn get_latest_instruments_snapshot_id_sync(&self) -> Result<Option<i64>> {
135        let mut conn = self.pool().get().context("Failed to get DB connection")?;
136
137        let result = diesel::sql_query(
138            "SELECT id FROM engine_snapshots WHERE snapshot_type = 'instruments' ORDER BY created_at DESC LIMIT 1",
139        )
140        .get_result::<SnapshotIdResult>(&mut conn)
141        .optional()
142        .context("Failed to fetch snapshot id")?;
143
144        Ok(result.map(|r| r.id))
145    }
146
147    fn load_instruments_snapshot_sync(&self, snapshot_id: i64) -> Result<InstrumentsSnapshotData> {
148        let mut conn = self.pool().get().context("Failed to get DB connection")?;
149
150        let entries = diesel::sql_query(
151            "SELECT snapshot_id, symbol, data FROM instruments_snapshot_entries WHERE snapshot_id = $1",
152        )
153        .bind::<BigInt, _>(snapshot_id)
154        .load::<InstrumentEntryRow>(&mut conn)
155        .context("Failed to load instrument rows")?;
156
157        let entry_data: Vec<InstrumentSnapshotEntry> = entries
158            .into_iter()
159            .map(|row| InstrumentSnapshotEntry {
160                symbol: row.symbol,
161                data: row.data,
162            })
163            .collect();
164
165        let offsets = diesel::sql_query(
166            "SELECT stream, partition, \"offset\" FROM instruments_snapshot_offsets WHERE snapshot_id = $1",
167        )
168        .bind::<BigInt, _>(snapshot_id)
169        .load::<OffsetRow>(&mut conn)
170        .context("Failed to load offsets")?;
171
172        let offset_data: Vec<SnapshotOffsetEntry> = offsets
173            .into_iter()
174            .map(|row| SnapshotOffsetEntry {
175                stream: row.stream,
176                partition: row.partition,
177                offset: row.offset,
178            })
179            .collect();
180
181        Ok(InstrumentsSnapshotData {
182            entries: entry_data,
183            offsets: offset_data,
184        })
185    }
186}
187
188// ---- PortfolioSnapshotWriter ----
189
190impl hypercall_db::PortfolioSnapshotWriter for DatabaseHandler {
191    fn write_portfolio_snapshot_sync(&self, input: &PortfolioSnapshotInput) -> Result<i64> {
192        let mut conn = self.pool().get().context("Failed to get DB connection")?;
193
194        conn.transaction(|conn| {
195            // Insert snapshot header
196            let snapshot_id: i64 = diesel::sql_query(
197                "INSERT INTO engine_snapshots (snapshot_type, engine_version) VALUES ('portfolio', NULL) RETURNING id",
198            )
199            .get_result::<SnapshotIdResult>(conn)
200            .context("Failed to insert snapshot header")?
201            .id;
202
203            debug!("Created portfolio snapshot header with id={}", snapshot_id);
204
205            // Insert accounts
206            let mut account_count = 0;
207            for account in &input.accounts {
208                diesel::sql_query(
209                    "INSERT INTO portfolio_snapshot_accounts (snapshot_id, wallet, data) VALUES ($1, $2, $3)",
210                )
211                .bind::<BigInt, _>(snapshot_id)
212                .bind::<Text, _>(&account.wallet)
213                .bind::<Bytea, _>(&account.data)
214                .execute(conn)
215                .with_context(|| format!("Failed to insert account {}", account.wallet))?;
216                account_count += 1;
217            }
218
219            debug!("Inserted {} portfolio accounts", account_count);
220
221            // Insert offsets
222            let mut offset_count = 0;
223            for (stream, partitions) in &input.offsets {
224                for (partition, offset) in partitions {
225                    diesel::sql_query(
226                        "INSERT INTO portfolio_snapshot_offsets (snapshot_id, stream, partition, \"offset\") VALUES ($1, $2, $3, $4)",
227                    )
228                    .bind::<BigInt, _>(snapshot_id)
229                    .bind::<Text, _>(stream)
230                    .bind::<Int4, _>(*partition)
231                    .bind::<BigInt, _>(*offset)
232                    .execute(conn)
233                    .with_context(|| format!("Failed to insert offset for {}", stream))?;
234                    offset_count += 1;
235                }
236            }
237
238            debug!("Inserted {} offset entries", offset_count);
239
240            // Cleanup old snapshots beyond retention limit
241            let deleted = diesel::sql_query(
242                "DELETE FROM engine_snapshots WHERE snapshot_type = 'portfolio' AND id NOT IN (
243                    SELECT id FROM engine_snapshots
244                    WHERE snapshot_type = 'portfolio'
245                    ORDER BY created_at DESC
246                    LIMIT $1
247                )",
248            )
249            .bind::<BigInt, _>(input.retention_count)
250            .execute(conn)
251            .context("Failed to cleanup old snapshots")?;
252
253            if deleted > 0 {
254                info!("Cleaned up {} old portfolio snapshot(s)", deleted);
255            }
256
257            info!(
258                "Created portfolio snapshot id={} with {} accounts and {} offsets (retaining {})",
259                snapshot_id, account_count, offset_count, input.retention_count
260            );
261
262            Ok(snapshot_id)
263        })
264    }
265}
266
267// ---- PortfolioSnapshotReader ----
268
269impl hypercall_db::PortfolioSnapshotReader for DatabaseHandler {
270    fn get_latest_portfolio_snapshot_id_sync(&self) -> Result<Option<i64>> {
271        let mut conn = self.pool().get().context("Failed to get DB connection")?;
272
273        let result = diesel::sql_query(
274            "SELECT id FROM engine_snapshots WHERE snapshot_type = 'portfolio' ORDER BY created_at DESC LIMIT 1",
275        )
276        .get_result::<SnapshotIdResult>(&mut conn)
277        .optional()
278        .context("Failed to query latest snapshot")?;
279
280        Ok(result.map(|r| r.id))
281    }
282
283    fn portfolio_snapshot_exists_sync(&self, snapshot_id: i64) -> Result<bool> {
284        let mut conn = self.pool().get().context("Failed to get DB connection")?;
285
286        let exists: Option<SnapshotIdResult> = diesel::sql_query(
287            "SELECT id FROM engine_snapshots WHERE id = $1 AND snapshot_type = 'portfolio'",
288        )
289        .bind::<BigInt, _>(snapshot_id)
290        .get_result(&mut conn)
291        .optional()
292        .context("Failed to verify snapshot")?;
293
294        Ok(exists.is_some())
295    }
296
297    fn load_portfolio_snapshot_sync(&self, snapshot_id: i64) -> Result<PortfolioSnapshotData> {
298        let mut conn = self.pool().get().context("Failed to get DB connection")?;
299
300        // Load accounts
301        let account_rows: Vec<PortfolioAccountRow> = diesel::sql_query(
302            "SELECT wallet, data FROM portfolio_snapshot_accounts WHERE snapshot_id = $1",
303        )
304        .bind::<BigInt, _>(snapshot_id)
305        .load(&mut conn)
306        .context("Failed to load accounts")?;
307
308        let accounts: Vec<PortfolioSnapshotAccountEntry> = account_rows
309            .into_iter()
310            .map(|row| PortfolioSnapshotAccountEntry {
311                wallet: row.wallet,
312                data: row.data,
313            })
314            .collect();
315
316        // Load offsets
317        let offset_rows: Vec<OffsetRow> = diesel::sql_query(
318            "SELECT stream, partition, \"offset\" FROM portfolio_snapshot_offsets WHERE snapshot_id = $1",
319        )
320        .bind::<BigInt, _>(snapshot_id)
321        .load(&mut conn)
322        .context("Failed to load offsets")?;
323
324        let offsets: Vec<SnapshotOffsetEntry> = offset_rows
325            .into_iter()
326            .map(|row| SnapshotOffsetEntry {
327                stream: row.stream,
328                partition: row.partition,
329                offset: row.offset,
330            })
331            .collect();
332
333        Ok(PortfolioSnapshotData { accounts, offsets })
334    }
335}
336
337// Note: get_next_engine_command_id_sync is implemented via JournalReplayReader
338// in hypercall-db-diesel/src/replay.rs. No separate EngineCommandOffsetReader needed.