1use anyhow::{Context, Result};
7use diesel::prelude::*;
8use diesel::sql_types::{BigInt, Bytea, Int4, Text};
9use diesel::RunQueryDsl;
10use tracing::{debug, info};
11
12use crate::database_handler::DatabaseHandler;
13use hypercall_db::types::snapshots::{
14 InstrumentSnapshotEntry, InstrumentsSnapshotData, InstrumentsSnapshotInput,
15 PortfolioSnapshotAccountEntry, PortfolioSnapshotData, PortfolioSnapshotInput,
16 SnapshotOffsetEntry,
17};
18
19#[derive(QueryableByName)]
22struct SnapshotIdResult {
23 #[diesel(sql_type = BigInt)]
24 id: i64,
25}
26
27#[derive(QueryableByName)]
28struct InstrumentEntryRow {
29 #[diesel(sql_type = BigInt)]
30 #[allow(dead_code)]
31 snapshot_id: i64,
32 #[diesel(sql_type = Text)]
33 symbol: String,
34 #[diesel(sql_type = Bytea)]
35 data: Vec<u8>,
36}
37
38#[derive(QueryableByName)]
39struct PortfolioAccountRow {
40 #[diesel(sql_type = Text)]
41 wallet: String,
42 #[diesel(sql_type = Bytea)]
43 data: Vec<u8>,
44}
45
46#[derive(QueryableByName)]
47struct OffsetRow {
48 #[diesel(sql_type = Text)]
49 stream: String,
50 #[diesel(sql_type = Int4)]
51 partition: i32,
52 #[diesel(sql_type = BigInt)]
53 offset: i64,
54}
55
56impl hypercall_db::InstrumentsSnapshotWriter for DatabaseHandler {
59 fn write_instruments_snapshot_sync(&self, input: &InstrumentsSnapshotInput) -> Result<i64> {
60 let mut conn = self.pool().get().context("Failed to get DB connection")?;
61
62 conn.transaction(|conn| {
63 let snapshot_id: i64 = diesel::sql_query(
65 "INSERT INTO engine_snapshots (snapshot_type, engine_version) VALUES ($1, NULL) RETURNING id",
66 )
67 .bind::<Text, _>(&input.snapshot_type)
68 .get_result::<SnapshotIdResult>(conn)
69 .context("Failed to insert snapshot header")?
70 .id;
71
72 debug!(
73 "Created instruments snapshot header id={} entries={}",
74 snapshot_id,
75 input.entries.len()
76 );
77
78 for entry in &input.entries {
80 diesel::sql_query(
81 "INSERT INTO instruments_snapshot_entries (snapshot_id, symbol, data) VALUES ($1, $2, $3)",
82 )
83 .bind::<BigInt, _>(snapshot_id)
84 .bind::<Text, _>(&entry.symbol)
85 .bind::<Bytea, _>(&entry.data)
86 .execute(conn)
87 .with_context(|| format!("Failed to insert instrument {}", entry.symbol))?;
88 }
89
90 let mut offset_rows = 0;
92 for (stream, partitions) in &input.offsets {
93 for (partition, offset) in partitions {
94 diesel::sql_query(
95 "INSERT INTO instruments_snapshot_offsets (snapshot_id, stream, partition, \"offset\") VALUES ($1, $2, $3, $4)",
96 )
97 .bind::<BigInt, _>(snapshot_id)
98 .bind::<Text, _>(stream)
99 .bind::<Int4, _>(*partition)
100 .bind::<BigInt, _>(*offset)
101 .execute(conn)
102 .with_context(|| format!("Failed to insert offset for {}", stream))?;
103 offset_rows += 1;
104 }
105 }
106
107 let deleted = diesel::sql_query(
109 "DELETE FROM engine_snapshots WHERE snapshot_type = $1 AND id NOT IN (
110 SELECT id FROM engine_snapshots WHERE snapshot_type = $1 ORDER BY created_at DESC LIMIT $2
111 )",
112 )
113 .bind::<Text, _>(&input.snapshot_type)
114 .bind::<BigInt, _>(input.retention_count)
115 .execute(conn)
116 .context("Failed to cleanup snapshots")?;
117
118 if deleted > 0 {
119 info!("Deleted {} old instruments snapshot(s)", deleted);
120 }
121
122 info!(
123 "Stored instruments snapshot id={} offset_rows={}",
124 snapshot_id, offset_rows
125 );
126 Ok(snapshot_id)
127 })
128 }
129}
130
131impl hypercall_db::InstrumentsSnapshotReader for DatabaseHandler {
134 fn get_latest_instruments_snapshot_id_sync(&self) -> Result<Option<i64>> {
135 let mut conn = self.pool().get().context("Failed to get DB connection")?;
136
137 let result = diesel::sql_query(
138 "SELECT id FROM engine_snapshots WHERE snapshot_type = 'instruments' ORDER BY created_at DESC LIMIT 1",
139 )
140 .get_result::<SnapshotIdResult>(&mut conn)
141 .optional()
142 .context("Failed to fetch snapshot id")?;
143
144 Ok(result.map(|r| r.id))
145 }
146
147 fn load_instruments_snapshot_sync(&self, snapshot_id: i64) -> Result<InstrumentsSnapshotData> {
148 let mut conn = self.pool().get().context("Failed to get DB connection")?;
149
150 let entries = diesel::sql_query(
151 "SELECT snapshot_id, symbol, data FROM instruments_snapshot_entries WHERE snapshot_id = $1",
152 )
153 .bind::<BigInt, _>(snapshot_id)
154 .load::<InstrumentEntryRow>(&mut conn)
155 .context("Failed to load instrument rows")?;
156
157 let entry_data: Vec<InstrumentSnapshotEntry> = entries
158 .into_iter()
159 .map(|row| InstrumentSnapshotEntry {
160 symbol: row.symbol,
161 data: row.data,
162 })
163 .collect();
164
165 let offsets = diesel::sql_query(
166 "SELECT stream, partition, \"offset\" FROM instruments_snapshot_offsets WHERE snapshot_id = $1",
167 )
168 .bind::<BigInt, _>(snapshot_id)
169 .load::<OffsetRow>(&mut conn)
170 .context("Failed to load offsets")?;
171
172 let offset_data: Vec<SnapshotOffsetEntry> = offsets
173 .into_iter()
174 .map(|row| SnapshotOffsetEntry {
175 stream: row.stream,
176 partition: row.partition,
177 offset: row.offset,
178 })
179 .collect();
180
181 Ok(InstrumentsSnapshotData {
182 entries: entry_data,
183 offsets: offset_data,
184 })
185 }
186}
187
188impl hypercall_db::PortfolioSnapshotWriter for DatabaseHandler {
191 fn write_portfolio_snapshot_sync(&self, input: &PortfolioSnapshotInput) -> Result<i64> {
192 let mut conn = self.pool().get().context("Failed to get DB connection")?;
193
194 conn.transaction(|conn| {
195 let snapshot_id: i64 = diesel::sql_query(
197 "INSERT INTO engine_snapshots (snapshot_type, engine_version) VALUES ('portfolio', NULL) RETURNING id",
198 )
199 .get_result::<SnapshotIdResult>(conn)
200 .context("Failed to insert snapshot header")?
201 .id;
202
203 debug!("Created portfolio snapshot header with id={}", snapshot_id);
204
205 let mut account_count = 0;
207 for account in &input.accounts {
208 diesel::sql_query(
209 "INSERT INTO portfolio_snapshot_accounts (snapshot_id, wallet, data) VALUES ($1, $2, $3)",
210 )
211 .bind::<BigInt, _>(snapshot_id)
212 .bind::<Text, _>(&account.wallet)
213 .bind::<Bytea, _>(&account.data)
214 .execute(conn)
215 .with_context(|| format!("Failed to insert account {}", account.wallet))?;
216 account_count += 1;
217 }
218
219 debug!("Inserted {} portfolio accounts", account_count);
220
221 let mut offset_count = 0;
223 for (stream, partitions) in &input.offsets {
224 for (partition, offset) in partitions {
225 diesel::sql_query(
226 "INSERT INTO portfolio_snapshot_offsets (snapshot_id, stream, partition, \"offset\") VALUES ($1, $2, $3, $4)",
227 )
228 .bind::<BigInt, _>(snapshot_id)
229 .bind::<Text, _>(stream)
230 .bind::<Int4, _>(*partition)
231 .bind::<BigInt, _>(*offset)
232 .execute(conn)
233 .with_context(|| format!("Failed to insert offset for {}", stream))?;
234 offset_count += 1;
235 }
236 }
237
238 debug!("Inserted {} offset entries", offset_count);
239
240 let deleted = diesel::sql_query(
242 "DELETE FROM engine_snapshots WHERE snapshot_type = 'portfolio' AND id NOT IN (
243 SELECT id FROM engine_snapshots
244 WHERE snapshot_type = 'portfolio'
245 ORDER BY created_at DESC
246 LIMIT $1
247 )",
248 )
249 .bind::<BigInt, _>(input.retention_count)
250 .execute(conn)
251 .context("Failed to cleanup old snapshots")?;
252
253 if deleted > 0 {
254 info!("Cleaned up {} old portfolio snapshot(s)", deleted);
255 }
256
257 info!(
258 "Created portfolio snapshot id={} with {} accounts and {} offsets (retaining {})",
259 snapshot_id, account_count, offset_count, input.retention_count
260 );
261
262 Ok(snapshot_id)
263 })
264 }
265}
266
267impl hypercall_db::PortfolioSnapshotReader for DatabaseHandler {
270 fn get_latest_portfolio_snapshot_id_sync(&self) -> Result<Option<i64>> {
271 let mut conn = self.pool().get().context("Failed to get DB connection")?;
272
273 let result = diesel::sql_query(
274 "SELECT id FROM engine_snapshots WHERE snapshot_type = 'portfolio' ORDER BY created_at DESC LIMIT 1",
275 )
276 .get_result::<SnapshotIdResult>(&mut conn)
277 .optional()
278 .context("Failed to query latest snapshot")?;
279
280 Ok(result.map(|r| r.id))
281 }
282
283 fn portfolio_snapshot_exists_sync(&self, snapshot_id: i64) -> Result<bool> {
284 let mut conn = self.pool().get().context("Failed to get DB connection")?;
285
286 let exists: Option<SnapshotIdResult> = diesel::sql_query(
287 "SELECT id FROM engine_snapshots WHERE id = $1 AND snapshot_type = 'portfolio'",
288 )
289 .bind::<BigInt, _>(snapshot_id)
290 .get_result(&mut conn)
291 .optional()
292 .context("Failed to verify snapshot")?;
293
294 Ok(exists.is_some())
295 }
296
297 fn load_portfolio_snapshot_sync(&self, snapshot_id: i64) -> Result<PortfolioSnapshotData> {
298 let mut conn = self.pool().get().context("Failed to get DB connection")?;
299
300 let account_rows: Vec<PortfolioAccountRow> = diesel::sql_query(
302 "SELECT wallet, data FROM portfolio_snapshot_accounts WHERE snapshot_id = $1",
303 )
304 .bind::<BigInt, _>(snapshot_id)
305 .load(&mut conn)
306 .context("Failed to load accounts")?;
307
308 let accounts: Vec<PortfolioSnapshotAccountEntry> = account_rows
309 .into_iter()
310 .map(|row| PortfolioSnapshotAccountEntry {
311 wallet: row.wallet,
312 data: row.data,
313 })
314 .collect();
315
316 let offset_rows: Vec<OffsetRow> = diesel::sql_query(
318 "SELECT stream, partition, \"offset\" FROM portfolio_snapshot_offsets WHERE snapshot_id = $1",
319 )
320 .bind::<BigInt, _>(snapshot_id)
321 .load(&mut conn)
322 .context("Failed to load offsets")?;
323
324 let offsets: Vec<SnapshotOffsetEntry> = offset_rows
325 .into_iter()
326 .map(|row| SnapshotOffsetEntry {
327 stream: row.stream,
328 partition: row.partition,
329 offset: row.offset,
330 })
331 .collect();
332
333 Ok(PortfolioSnapshotData { accounts, offsets })
334 }
335}
336
337