1use anyhow::{anyhow, Context, Result};
2use chrono::{DateTime, Utc};
3use diesel::prelude::*;
4use diesel::sql_types::{BigInt, Bytea, Integer, SmallInt, Text, Uuid};
5
6use crate::database_handler::DatabaseHandler;
7use crate::diesel_db::DieselDb;
8use crate::engine_enums::{CommandType, DbUuid};
9use crate::schema::{
10 validator_rsm_batch_roots, validator_rsm_block_commands, validator_rsm_blocks,
11 validator_rsm_current_state,
12};
13
14#[derive(Queryable, Selectable)]
15#[diesel(table_name = validator_rsm_batch_roots)]
16#[diesel(check_for_backend(diesel::pg::Pg))]
17struct RootSummaryRow {
18 environment: i16,
19 version: i64,
20 batch_seq: i64,
21 state_root: Vec<u8>,
22 risk_root: Vec<u8>,
23 command_mmr_root: Vec<u8>,
24 obligation_mmr_root: Vec<u8>,
25 intent_mmr_root: Vec<u8>,
26 batch_root: Vec<u8>,
27 command_range_start: i64,
28 command_range_end: i64,
29 command_count: i64,
30 schema_version: i32,
31 accepted_block_hash: Vec<u8>,
32 created_at: DateTime<Utc>,
33}
34
35#[derive(Queryable, Selectable)]
36#[diesel(table_name = validator_rsm_current_state)]
37#[diesel(check_for_backend(diesel::pg::Pg))]
38struct CurrentStateRow {
39 environment: i16,
40 current_version: i64,
41 updated_at: DateTime<Utc>,
42}
43
44#[derive(Queryable, Selectable)]
45#[diesel(table_name = validator_rsm_blocks)]
46#[diesel(check_for_backend(diesel::pg::Pg))]
47struct BlockRow {
48 environment: i16,
49 height: i64,
50 hash: Vec<u8>,
51 parent_hash: Vec<u8>,
52 commands_hash: Vec<u8>,
53 batch_root: Vec<u8>,
54 command_count: i64,
55 first_command_seq: i64,
56 last_command_seq: i64,
57 signer: Option<Vec<u8>>,
58 signature: Option<Vec<u8>>,
59 created_at: DateTime<Utc>,
60}
61
62#[derive(Queryable, Selectable)]
63#[diesel(table_name = validator_rsm_block_commands)]
64#[diesel(check_for_backend(diesel::pg::Pg))]
65struct BlockCommandRow {
66 environment: i16,
67 height: i64,
68 rsm_command_seq: i64,
69 command_index: i64,
70 engine_command_id: i64,
71 request_uuid: DbUuid,
72 command_type: CommandType,
73 command_data: Vec<u8>,
74 command_identity_hash: Vec<u8>,
75 created_at: DateTime<Utc>,
76}
77
78fn u64_to_i64(name: &str, value: u64) -> Result<i64> {
79 i64::try_from(value).map_err(|_| anyhow!("{name} overflows BIGINT: {value}"))
80}
81
82fn i64_to_u64(name: &str, value: i64) -> Result<u64> {
83 u64::try_from(value).map_err(|_| anyhow!("{name} is negative: {value}"))
84}
85
86fn root32(name: &str, bytes: Vec<u8>) -> Result<[u8; 32]> {
87 bytes
88 .try_into()
89 .map_err(|bytes: Vec<u8>| anyhow!("{name} must be 32 bytes, got {}", bytes.len()))
90}
91
92fn addr20(name: &str, bytes: Vec<u8>) -> Result<[u8; 20]> {
93 bytes
94 .try_into()
95 .map_err(|bytes: Vec<u8>| anyhow!("{name} must be 20 bytes, got {}", bytes.len()))
96}
97
98impl TryFrom<RootSummaryRow> for hypercall_db::ValidatorRsmRootSummary {
99 type Error = anyhow::Error;
100
101 fn try_from(row: RootSummaryRow) -> Result<Self> {
102 Ok(Self {
103 environment: row.environment.try_into()?,
104 version: i64_to_u64("version", row.version)?,
105 batch_seq: i64_to_u64("batch_seq", row.batch_seq)?,
106 state_root: root32("state_root", row.state_root)?,
107 risk_root: root32("risk_root", row.risk_root)?,
108 command_mmr_root: root32("command_mmr_root", row.command_mmr_root)?,
109 obligation_mmr_root: root32("obligation_mmr_root", row.obligation_mmr_root)?,
110 intent_mmr_root: root32("intent_mmr_root", row.intent_mmr_root)?,
111 batch_root: root32("batch_root", row.batch_root)?,
112 command_range_start: i64_to_u64("command_range_start", row.command_range_start)?,
113 command_range_end: i64_to_u64("command_range_end", row.command_range_end)?,
114 command_count: i64_to_u64("command_count", row.command_count)?,
115 schema_version: row.schema_version,
116 accepted_block_hash: root32("accepted_block_hash", row.accepted_block_hash)?,
117 created_at: row.created_at,
118 })
119 }
120}
121
122impl TryFrom<BlockRow> for hypercall_db::RsmBlockHeader {
123 type Error = anyhow::Error;
124
125 fn try_from(row: BlockRow) -> Result<Self> {
126 Ok(Self {
127 environment: row.environment.try_into()?,
128 height: i64_to_u64("height", row.height)?,
129 hash: root32("hash", row.hash)?,
130 parent_hash: root32("parent_hash", row.parent_hash)?,
131 commands_hash: root32("commands_hash", row.commands_hash)?,
132 batch_root: root32("batch_root", row.batch_root)?,
133 command_count: i64_to_u64("command_count", row.command_count)?,
134 first_command_seq: i64_to_u64("first_command_seq", row.first_command_seq)?,
135 last_command_seq: i64_to_u64("last_command_seq", row.last_command_seq)?,
136 signer: row
137 .signer
138 .map(|bytes| addr20("signer", bytes))
139 .transpose()?,
140 signature: row.signature,
141 created_at: row.created_at,
142 })
143 }
144}
145
146impl TryFrom<CurrentStateRow> for hypercall_db::ValidatorRsmCurrentState {
147 type Error = anyhow::Error;
148
149 fn try_from(row: CurrentStateRow) -> Result<Self> {
150 Ok(Self {
151 environment: row.environment.try_into()?,
152 current_version: i64_to_u64("current_version", row.current_version)?,
153 updated_at: row.updated_at,
154 })
155 }
156}
157
158impl TryFrom<BlockCommandRow> for hypercall_db::RsmBlockCommand {
159 type Error = anyhow::Error;
160
161 fn try_from(row: BlockCommandRow) -> Result<Self> {
162 Ok(Self {
163 environment: row.environment.try_into()?,
164 height: i64_to_u64("height", row.height)?,
165 rsm_command_seq: i64_to_u64("rsm_command_seq", row.rsm_command_seq)?,
166 command_index: i64_to_u64("command_index", row.command_index)?,
167 engine_command_id: row.engine_command_id,
168 request_uuid: row.request_uuid.0,
169 command_type: row.command_type.to_string(),
170 command_data: row.command_data,
171 command_identity_hash: root32("command_identity_hash", row.command_identity_hash)?,
172 created_at: row.created_at,
173 })
174 }
175}
176
177fn block_view_from_rows(
178 block: BlockRow,
179 root_summary: RootSummaryRow,
180) -> Result<hypercall_db::RsmBlockView> {
181 Ok(hypercall_db::RsmBlockView {
182 block: block.try_into()?,
183 root_summary: root_summary.try_into()?,
184 })
185}
186
187#[async_trait::async_trait]
188impl hypercall_db::ValidatorRsmStateAsyncReader for DieselDb {
189 async fn get_validator_rsm_root_summary(
190 &self,
191 environment: hypercall_db::ValidatorRsmEnvironment,
192 version: u64,
193 ) -> Result<Option<hypercall_db::ValidatorRsmRootSummary>> {
194 let version = u64_to_i64("version", version)?;
195 let mut conn = self.get_conn().await?;
196 let query = validator_rsm_batch_roots::table
197 .filter(validator_rsm_batch_roots::environment.eq(environment.as_i16()))
198 .filter(validator_rsm_batch_roots::version.eq(version))
199 .select(RootSummaryRow::as_select());
200 let row = diesel_async::RunQueryDsl::get_result(query, &mut conn)
201 .await
202 .optional()?;
203
204 row.map(TryInto::try_into).transpose()
205 }
206
207 async fn get_validator_rsm_current_state(
208 &self,
209 environment: hypercall_db::ValidatorRsmEnvironment,
210 ) -> Result<Option<hypercall_db::ValidatorRsmCurrentState>> {
211 let mut conn = self.get_conn().await?;
212 let query = validator_rsm_current_state::table
213 .filter(validator_rsm_current_state::environment.eq(environment.as_i16()))
214 .select(CurrentStateRow::as_select());
215 let row = diesel_async::RunQueryDsl::get_result(query, &mut conn)
216 .await
217 .optional()?;
218
219 row.map(TryInto::try_into).transpose()
220 }
221
222 async fn get_validator_rsm_current_root_summary(
223 &self,
224 environment: hypercall_db::ValidatorRsmEnvironment,
225 ) -> Result<Option<hypercall_db::ValidatorRsmRootSummary>> {
226 let mut conn = self.get_conn().await?;
227 let query = validator_rsm_current_state::table
228 .inner_join(
229 validator_rsm_batch_roots::table.on(validator_rsm_batch_roots::environment
230 .eq(validator_rsm_current_state::environment)
231 .and(
232 validator_rsm_batch_roots::version
233 .eq(validator_rsm_current_state::current_version),
234 )),
235 )
236 .filter(validator_rsm_current_state::environment.eq(environment.as_i16()))
237 .select(RootSummaryRow::as_select());
238 let row = diesel_async::RunQueryDsl::get_result(query, &mut conn)
239 .await
240 .optional()?;
241
242 row.map(TryInto::try_into).transpose()
243 }
244
245 async fn get_rsm_block_by_height(
246 &self,
247 environment: hypercall_db::ValidatorRsmEnvironment,
248 height: u64,
249 ) -> Result<Option<hypercall_db::RsmBlockView>> {
250 let height = u64_to_i64("height", height)?;
251 let mut conn = self.get_conn().await?;
252 let query = validator_rsm_blocks::table
253 .inner_join(
254 validator_rsm_batch_roots::table.on(validator_rsm_batch_roots::environment
255 .eq(validator_rsm_blocks::environment)
256 .and(validator_rsm_batch_roots::batch_seq.eq(validator_rsm_blocks::height))
257 .and(
258 validator_rsm_batch_roots::accepted_block_hash
259 .eq(validator_rsm_blocks::hash),
260 )),
261 )
262 .filter(validator_rsm_blocks::environment.eq(environment.as_i16()))
263 .filter(validator_rsm_blocks::height.eq(height))
264 .select((BlockRow::as_select(), RootSummaryRow::as_select()));
265 let row = diesel_async::RunQueryDsl::get_result(query, &mut conn)
266 .await
267 .optional()?;
268
269 row.map(|(block, root)| block_view_from_rows(block, root))
270 .transpose()
271 }
272
273 async fn get_rsm_block_by_hash(
274 &self,
275 environment: hypercall_db::ValidatorRsmEnvironment,
276 hash: [u8; 32],
277 ) -> Result<Option<hypercall_db::RsmBlockView>> {
278 let mut conn = self.get_conn().await?;
279 let query = validator_rsm_blocks::table
280 .inner_join(
281 validator_rsm_batch_roots::table.on(validator_rsm_batch_roots::environment
282 .eq(validator_rsm_blocks::environment)
283 .and(validator_rsm_batch_roots::batch_seq.eq(validator_rsm_blocks::height))
284 .and(
285 validator_rsm_batch_roots::accepted_block_hash
286 .eq(validator_rsm_blocks::hash),
287 )),
288 )
289 .filter(validator_rsm_blocks::environment.eq(environment.as_i16()))
290 .filter(validator_rsm_blocks::hash.eq(hash.to_vec()))
291 .select((BlockRow::as_select(), RootSummaryRow::as_select()));
292 let row = diesel_async::RunQueryDsl::get_result(query, &mut conn)
293 .await
294 .optional()?;
295
296 row.map(|(block, root)| block_view_from_rows(block, root))
297 .transpose()
298 }
299
300 async fn get_latest_rsm_block(
301 &self,
302 environment: hypercall_db::ValidatorRsmEnvironment,
303 ) -> Result<Option<hypercall_db::RsmBlockView>> {
304 let mut conn = self.get_conn().await?;
305 let query = validator_rsm_blocks::table
306 .inner_join(
307 validator_rsm_batch_roots::table.on(validator_rsm_batch_roots::environment
308 .eq(validator_rsm_blocks::environment)
309 .and(validator_rsm_batch_roots::batch_seq.eq(validator_rsm_blocks::height))
310 .and(
311 validator_rsm_batch_roots::accepted_block_hash
312 .eq(validator_rsm_blocks::hash),
313 )),
314 )
315 .inner_join(
316 validator_rsm_current_state::table.on(validator_rsm_current_state::environment
317 .eq(validator_rsm_blocks::environment)
318 .and(
319 validator_rsm_current_state::current_version
320 .eq(validator_rsm_batch_roots::version),
321 )),
322 )
323 .filter(validator_rsm_blocks::environment.eq(environment.as_i16()))
324 .select((BlockRow::as_select(), RootSummaryRow::as_select()));
325 let row = diesel_async::RunQueryDsl::get_result(query, &mut conn)
326 .await
327 .optional()?;
328
329 row.map(|(block, root)| block_view_from_rows(block, root))
330 .transpose()
331 }
332
333 async fn list_rsm_blocks(
334 &self,
335 environment: hypercall_db::ValidatorRsmEnvironment,
336 from_height: Option<u64>,
337 limit: u32,
338 ) -> Result<Vec<hypercall_db::RsmBlockView>> {
339 let limit = i64::from(limit.clamp(1, 100));
340 let mut conn = self.get_conn().await?;
341 let mut query = validator_rsm_blocks::table
342 .inner_join(
343 validator_rsm_batch_roots::table.on(validator_rsm_batch_roots::environment
344 .eq(validator_rsm_blocks::environment)
345 .and(validator_rsm_batch_roots::batch_seq.eq(validator_rsm_blocks::height))
346 .and(
347 validator_rsm_batch_roots::accepted_block_hash
348 .eq(validator_rsm_blocks::hash),
349 )),
350 )
351 .filter(validator_rsm_blocks::environment.eq(environment.as_i16()))
352 .into_boxed();
353
354 if let Some(from_height) = from_height {
355 query = query
356 .filter(validator_rsm_blocks::height.le(u64_to_i64("from_height", from_height)?));
357 }
358
359 let query = query
360 .select((BlockRow::as_select(), RootSummaryRow::as_select()))
361 .order(validator_rsm_blocks::height.desc())
362 .limit(limit);
363 let rows = diesel_async::RunQueryDsl::load(query, &mut conn).await?;
364
365 rows.into_iter()
366 .map(|(block, root)| block_view_from_rows(block, root))
367 .collect()
368 }
369
370 async fn get_rsm_block_commands(
371 &self,
372 environment: hypercall_db::ValidatorRsmEnvironment,
373 height: u64,
374 ) -> Result<Vec<hypercall_db::RsmBlockCommand>> {
375 let height = u64_to_i64("height", height)?;
376 let mut conn = self.get_conn().await?;
377 let query = validator_rsm_block_commands::table
378 .filter(validator_rsm_block_commands::environment.eq(environment.as_i16()))
379 .filter(validator_rsm_block_commands::height.eq(height))
380 .select(BlockCommandRow::as_select())
381 .order(validator_rsm_block_commands::command_index.asc());
382 let rows = diesel_async::RunQueryDsl::load(query, &mut conn).await?;
383
384 rows.into_iter().map(TryInto::try_into).collect()
385 }
386}
387
388impl hypercall_db::ValidatorRsmStateReader for DatabaseHandler {
389 fn get_validator_rsm_root_summary_sync(
390 &self,
391 environment: hypercall_db::ValidatorRsmEnvironment,
392 version: u64,
393 ) -> Result<Option<hypercall_db::ValidatorRsmRootSummary>> {
394 let version = u64_to_i64("version", version)?;
395 let mut conn = self.pool().get().context("Failed to get DB connection")?;
396 let row = validator_rsm_batch_roots::table
397 .filter(validator_rsm_batch_roots::environment.eq(environment.as_i16()))
398 .filter(validator_rsm_batch_roots::version.eq(version))
399 .select(RootSummaryRow::as_select())
400 .get_result::<RootSummaryRow>(&mut conn)
401 .optional()?;
402
403 row.map(TryInto::try_into).transpose()
404 }
405
406 fn get_validator_rsm_current_state_sync(
407 &self,
408 environment: hypercall_db::ValidatorRsmEnvironment,
409 ) -> Result<Option<hypercall_db::ValidatorRsmCurrentState>> {
410 let mut conn = self.pool().get().context("Failed to get DB connection")?;
411 let row = validator_rsm_current_state::table
412 .filter(validator_rsm_current_state::environment.eq(environment.as_i16()))
413 .select(CurrentStateRow::as_select())
414 .get_result::<CurrentStateRow>(&mut conn)
415 .optional()?;
416
417 row.map(TryInto::try_into).transpose()
418 }
419
420 fn get_validator_rsm_current_root_summary_sync(
421 &self,
422 environment: hypercall_db::ValidatorRsmEnvironment,
423 ) -> Result<Option<hypercall_db::ValidatorRsmRootSummary>> {
424 let mut conn = self.pool().get().context("Failed to get DB connection")?;
425 let row = validator_rsm_current_state::table
426 .inner_join(
427 validator_rsm_batch_roots::table.on(validator_rsm_batch_roots::environment
428 .eq(validator_rsm_current_state::environment)
429 .and(
430 validator_rsm_batch_roots::version
431 .eq(validator_rsm_current_state::current_version),
432 )),
433 )
434 .filter(validator_rsm_current_state::environment.eq(environment.as_i16()))
435 .select(RootSummaryRow::as_select())
436 .get_result::<RootSummaryRow>(&mut conn)
437 .optional()?;
438
439 row.map(TryInto::try_into).transpose()
440 }
441
442 fn get_rsm_block_by_height_sync(
443 &self,
444 environment: hypercall_db::ValidatorRsmEnvironment,
445 height: u64,
446 ) -> Result<Option<hypercall_db::RsmBlockView>> {
447 let height = u64_to_i64("height", height)?;
448 let mut conn = self.pool().get().context("Failed to get DB connection")?;
449 let row = validator_rsm_blocks::table
450 .inner_join(
451 validator_rsm_batch_roots::table.on(validator_rsm_batch_roots::environment
452 .eq(validator_rsm_blocks::environment)
453 .and(validator_rsm_batch_roots::batch_seq.eq(validator_rsm_blocks::height))
454 .and(
455 validator_rsm_batch_roots::accepted_block_hash
456 .eq(validator_rsm_blocks::hash),
457 )),
458 )
459 .filter(validator_rsm_blocks::environment.eq(environment.as_i16()))
460 .filter(validator_rsm_blocks::height.eq(height))
461 .select((BlockRow::as_select(), RootSummaryRow::as_select()))
462 .get_result::<(BlockRow, RootSummaryRow)>(&mut conn)
463 .optional()?;
464
465 row.map(|(block, root)| block_view_from_rows(block, root))
466 .transpose()
467 }
468
469 fn get_rsm_block_by_hash_sync(
470 &self,
471 environment: hypercall_db::ValidatorRsmEnvironment,
472 hash: [u8; 32],
473 ) -> Result<Option<hypercall_db::RsmBlockView>> {
474 let mut conn = self.pool().get().context("Failed to get DB connection")?;
475 let row = validator_rsm_blocks::table
476 .inner_join(
477 validator_rsm_batch_roots::table.on(validator_rsm_batch_roots::environment
478 .eq(validator_rsm_blocks::environment)
479 .and(validator_rsm_batch_roots::batch_seq.eq(validator_rsm_blocks::height))
480 .and(
481 validator_rsm_batch_roots::accepted_block_hash
482 .eq(validator_rsm_blocks::hash),
483 )),
484 )
485 .filter(validator_rsm_blocks::environment.eq(environment.as_i16()))
486 .filter(validator_rsm_blocks::hash.eq(hash.to_vec()))
487 .select((BlockRow::as_select(), RootSummaryRow::as_select()))
488 .get_result::<(BlockRow, RootSummaryRow)>(&mut conn)
489 .optional()?;
490
491 row.map(|(block, root)| block_view_from_rows(block, root))
492 .transpose()
493 }
494
495 fn get_latest_rsm_block_sync(
496 &self,
497 environment: hypercall_db::ValidatorRsmEnvironment,
498 ) -> Result<Option<hypercall_db::RsmBlockView>> {
499 let mut conn = self.pool().get().context("Failed to get DB connection")?;
500 let row = validator_rsm_blocks::table
501 .inner_join(
502 validator_rsm_batch_roots::table.on(validator_rsm_batch_roots::environment
503 .eq(validator_rsm_blocks::environment)
504 .and(validator_rsm_batch_roots::batch_seq.eq(validator_rsm_blocks::height))
505 .and(
506 validator_rsm_batch_roots::accepted_block_hash
507 .eq(validator_rsm_blocks::hash),
508 )),
509 )
510 .inner_join(
511 validator_rsm_current_state::table.on(validator_rsm_current_state::environment
512 .eq(validator_rsm_blocks::environment)
513 .and(
514 validator_rsm_current_state::current_version
515 .eq(validator_rsm_batch_roots::version),
516 )),
517 )
518 .filter(validator_rsm_blocks::environment.eq(environment.as_i16()))
519 .select((BlockRow::as_select(), RootSummaryRow::as_select()))
520 .get_result::<(BlockRow, RootSummaryRow)>(&mut conn)
521 .optional()?;
522
523 row.map(|(block, root)| block_view_from_rows(block, root))
524 .transpose()
525 }
526
527 fn list_rsm_blocks_sync(
528 &self,
529 environment: hypercall_db::ValidatorRsmEnvironment,
530 from_height: Option<u64>,
531 limit: u32,
532 ) -> Result<Vec<hypercall_db::RsmBlockView>> {
533 let limit = i64::from(limit.clamp(1, 100));
534 let mut conn = self.pool().get().context("Failed to get DB connection")?;
535 let mut query = validator_rsm_blocks::table
536 .inner_join(
537 validator_rsm_batch_roots::table.on(validator_rsm_batch_roots::environment
538 .eq(validator_rsm_blocks::environment)
539 .and(validator_rsm_batch_roots::batch_seq.eq(validator_rsm_blocks::height))
540 .and(
541 validator_rsm_batch_roots::accepted_block_hash
542 .eq(validator_rsm_blocks::hash),
543 )),
544 )
545 .filter(validator_rsm_blocks::environment.eq(environment.as_i16()))
546 .into_boxed();
547
548 if let Some(from_height) = from_height {
549 query = query
550 .filter(validator_rsm_blocks::height.le(u64_to_i64("from_height", from_height)?));
551 }
552
553 let rows = query
554 .select((BlockRow::as_select(), RootSummaryRow::as_select()))
555 .order(validator_rsm_blocks::height.desc())
556 .limit(limit)
557 .load::<(BlockRow, RootSummaryRow)>(&mut conn)?;
558
559 rows.into_iter()
560 .map(|(block, root)| block_view_from_rows(block, root))
561 .collect()
562 }
563
564 fn get_rsm_block_commands_sync(
565 &self,
566 environment: hypercall_db::ValidatorRsmEnvironment,
567 height: u64,
568 ) -> Result<Vec<hypercall_db::RsmBlockCommand>> {
569 let height = u64_to_i64("height", height)?;
570 let mut conn = self.pool().get().context("Failed to get DB connection")?;
571 let rows = validator_rsm_block_commands::table
572 .filter(validator_rsm_block_commands::environment.eq(environment.as_i16()))
573 .filter(validator_rsm_block_commands::height.eq(height))
574 .select(BlockCommandRow::as_select())
575 .order(validator_rsm_block_commands::command_index.asc())
576 .load::<BlockCommandRow>(&mut conn)?;
577
578 rows.into_iter().map(TryInto::try_into).collect()
579 }
580}
581
582impl hypercall_db::ValidatorRsmStateWriter for DatabaseHandler {
583 fn save_validator_rsm_root_summary_sync(
584 &self,
585 summary: &hypercall_db::NewValidatorRsmRootSummary,
586 advance_current: bool,
587 ) -> Result<()> {
588 let version = u64_to_i64("version", summary.version)?;
589 let batch_seq = u64_to_i64("batch_seq", summary.batch_seq)?;
590 let command_range_start = u64_to_i64("command_range_start", summary.command_range_start)?;
591 let command_range_end = u64_to_i64("command_range_end", summary.command_range_end)?;
592 let command_count = u64_to_i64("command_count", summary.command_count)?;
593 let mut conn = self.pool().get().context("Failed to get DB connection")?;
594
595 conn.transaction(|conn| {
596 let inserted = diesel::sql_query(
597 "INSERT INTO validator_rsm_batch_roots (
598 environment, version, batch_seq, state_root, risk_root, command_mmr_root,
599 obligation_mmr_root, intent_mmr_root, batch_root, command_range_start,
600 command_range_end, command_count, schema_version, accepted_block_hash
601 ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14)
602 ON CONFLICT (environment, version) DO NOTHING",
603 )
604 .bind::<SmallInt, _>(summary.environment.as_i16())
605 .bind::<BigInt, _>(version)
606 .bind::<BigInt, _>(batch_seq)
607 .bind::<Bytea, _>(&summary.state_root[..])
608 .bind::<Bytea, _>(&summary.risk_root[..])
609 .bind::<Bytea, _>(&summary.command_mmr_root[..])
610 .bind::<Bytea, _>(&summary.obligation_mmr_root[..])
611 .bind::<Bytea, _>(&summary.intent_mmr_root[..])
612 .bind::<Bytea, _>(&summary.batch_root[..])
613 .bind::<BigInt, _>(command_range_start)
614 .bind::<BigInt, _>(command_range_end)
615 .bind::<BigInt, _>(command_count)
616 .bind::<Integer, _>(summary.schema_version)
617 .bind::<Bytea, _>(&summary.accepted_block_hash[..])
618 .execute(conn)?;
619 if inserted == 0 {
620 let existing = validator_rsm_batch_roots::table
621 .filter(validator_rsm_batch_roots::environment.eq(summary.environment.as_i16()))
622 .filter(validator_rsm_batch_roots::version.eq(version))
623 .select(RootSummaryRow::as_select())
624 .get_result::<RootSummaryRow>(conn)?;
625
626 let matches_existing = existing.batch_seq == batch_seq
627 && existing.state_root == summary.state_root.as_slice()
628 && existing.risk_root == summary.risk_root.as_slice()
629 && existing.command_mmr_root == summary.command_mmr_root.as_slice()
630 && existing.obligation_mmr_root == summary.obligation_mmr_root.as_slice()
631 && existing.intent_mmr_root == summary.intent_mmr_root.as_slice()
632 && existing.batch_root == summary.batch_root.as_slice()
633 && existing.command_range_start == command_range_start
634 && existing.command_range_end == command_range_end
635 && existing.command_count == command_count
636 && existing.schema_version == summary.schema_version
637 && existing.accepted_block_hash == summary.accepted_block_hash.as_slice();
638 if !matches_existing {
639 return Err(anyhow!(
640 "conflicting validator RSM root summary for {} version {}",
641 summary.environment,
642 summary.version
643 ));
644 }
645 }
646
647 if advance_current {
648 let affected = diesel::sql_query(
649 "INSERT INTO validator_rsm_current_state (environment, current_version, updated_at)
650 VALUES ($1, $2, NOW())
651 ON CONFLICT (environment) DO UPDATE
652 SET current_version = EXCLUDED.current_version, updated_at = NOW()
653 WHERE validator_rsm_current_state.current_version <= EXCLUDED.current_version",
654 )
655 .bind::<SmallInt, _>(summary.environment.as_i16())
656 .bind::<BigInt, _>(version)
657 .execute(conn)?;
658 if affected == 0 {
659 return Err(anyhow!(
660 "refusing to move validator RSM current version for {} backward to {}",
661 summary.environment,
662 version
663 ));
664 }
665 }
666
667 Ok::<_, anyhow::Error>(())
668 })
669 }
670
671 fn save_rsm_block_header_sync(&self, block: &hypercall_db::NewRsmBlockHeader) -> Result<()> {
672 let height = u64_to_i64("height", block.height)?;
673 let command_count = u64_to_i64("command_count", block.command_count)?;
674 let first_command_seq = u64_to_i64("first_command_seq", block.first_command_seq)?;
675 let last_command_seq = u64_to_i64("last_command_seq", block.last_command_seq)?;
676 let mut conn = self.pool().get().context("Failed to get DB connection")?;
677
678 conn.transaction(|conn| {
679 let inserted = diesel::sql_query(
680 "INSERT INTO validator_rsm_blocks (
681 environment, height, hash, parent_hash, commands_hash, batch_root,
682 command_count, first_command_seq, last_command_seq, signer, signature
683 ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11)
684 ON CONFLICT (environment, height) DO NOTHING",
685 )
686 .bind::<SmallInt, _>(block.environment.as_i16())
687 .bind::<BigInt, _>(height)
688 .bind::<Bytea, _>(&block.hash[..])
689 .bind::<Bytea, _>(&block.parent_hash[..])
690 .bind::<Bytea, _>(&block.commands_hash[..])
691 .bind::<Bytea, _>(&block.batch_root[..])
692 .bind::<BigInt, _>(command_count)
693 .bind::<BigInt, _>(first_command_seq)
694 .bind::<BigInt, _>(last_command_seq)
695 .bind::<diesel::sql_types::Nullable<Bytea>, _>(
696 block.signer.as_ref().map(|signer| signer.as_slice()),
697 )
698 .bind::<diesel::sql_types::Nullable<Bytea>, _>(block.signature.as_deref())
699 .execute(conn)?;
700
701 if inserted == 0 {
702 let existing = validator_rsm_blocks::table
703 .filter(validator_rsm_blocks::environment.eq(block.environment.as_i16()))
704 .filter(validator_rsm_blocks::height.eq(height))
705 .select(BlockRow::as_select())
706 .get_result::<BlockRow>(conn)?;
707
708 let matches_existing = existing.hash == block.hash.as_slice()
709 && existing.parent_hash == block.parent_hash.as_slice()
710 && existing.commands_hash == block.commands_hash.as_slice()
711 && existing.batch_root == block.batch_root.as_slice()
712 && existing.command_count == command_count
713 && existing.first_command_seq == first_command_seq
714 && existing.last_command_seq == last_command_seq
715 && existing.signer.as_deref() == block.signer.as_ref().map(|s| &s[..])
716 && existing.signature.as_deref() == block.signature.as_deref();
717 if !matches_existing {
718 return Err(anyhow!(
719 "conflicting validator RSM block for {} height {}",
720 block.environment,
721 block.height
722 ));
723 }
724 }
725
726 Ok::<_, anyhow::Error>(())
727 })
728 }
729
730 fn save_rsm_block_commands_sync(
731 &self,
732 commands: &[hypercall_db::NewRsmBlockCommand],
733 ) -> Result<()> {
734 if commands.is_empty() {
735 return Ok(());
736 }
737
738 let mut conn = self.pool().get().context("Failed to get DB connection")?;
739 conn.transaction(|conn| {
740 for command in commands {
741 let height = u64_to_i64("height", command.height)?;
742 let rsm_command_seq = u64_to_i64("rsm_command_seq", command.rsm_command_seq)?;
743 let command_index = u64_to_i64("command_index", command.command_index)?;
744 let inserted = diesel::sql_query(
745 "INSERT INTO validator_rsm_block_commands (
746 environment, height, rsm_command_seq, command_index, engine_command_id,
747 request_uuid, command_type, command_data, command_identity_hash
748 ) VALUES ($1, $2, $3, $4, $5, $6, $7::engine_command_type, $8, $9)
749 ON CONFLICT (environment, height, rsm_command_seq) DO NOTHING",
750 )
751 .bind::<SmallInt, _>(command.environment.as_i16())
752 .bind::<BigInt, _>(height)
753 .bind::<BigInt, _>(rsm_command_seq)
754 .bind::<BigInt, _>(command_index)
755 .bind::<BigInt, _>(command.engine_command_id)
756 .bind::<Uuid, _>(DbUuid(command.request_uuid))
757 .bind::<Text, _>(&command.command_type)
758 .bind::<Bytea, _>(&command.command_data)
759 .bind::<Bytea, _>(&command.command_identity_hash[..])
760 .execute(conn)?;
761
762 if inserted == 0 {
763 let existing = validator_rsm_block_commands::table
764 .filter(
765 validator_rsm_block_commands::environment
766 .eq(command.environment.as_i16()),
767 )
768 .filter(validator_rsm_block_commands::height.eq(height))
769 .filter(validator_rsm_block_commands::rsm_command_seq.eq(rsm_command_seq))
770 .select(BlockCommandRow::as_select())
771 .get_result::<BlockCommandRow>(conn)?;
772
773 let matches_existing = existing.command_index == command_index
774 && existing.engine_command_id == command.engine_command_id
775 && existing.request_uuid.0 == command.request_uuid
776 && existing.command_type.to_string() == command.command_type
777 && existing.command_data == command.command_data
778 && existing.command_identity_hash
779 == command.command_identity_hash.as_slice();
780 if !matches_existing {
781 return Err(anyhow!(
782 "conflicting validator RSM block command for {} height {} sequence {}",
783 command.environment,
784 command.height,
785 command.rsm_command_seq
786 ));
787 }
788 }
789 }
790
791 Ok::<_, anyhow::Error>(())
792 })
793 }
794}
795
796#[cfg(test)]
797mod tests {
798 use super::*;
799 use crate::test_helpers::TestDb;
800 use hypercall_db::{ValidatorRsmEnvironment, ValidatorRsmStateReader, ValidatorRsmStateWriter};
801 use uuid::Uuid as RustUuid;
802
803 fn summary(
804 environment: ValidatorRsmEnvironment,
805 version: u64,
806 ) -> hypercall_db::NewValidatorRsmRootSummary {
807 hypercall_db::NewValidatorRsmRootSummary {
808 environment,
809 version,
810 batch_seq: version + 10,
811 state_root: [version as u8; 32],
812 risk_root: [2; 32],
813 command_mmr_root: [3; 32],
814 obligation_mmr_root: [4; 32],
815 intent_mmr_root: [5; 32],
816 batch_root: [6; 32],
817 command_range_start: version * 100,
818 command_range_end: version * 100 + 2,
819 command_count: 3,
820 schema_version: 1,
821 accepted_block_hash: [7; 32],
822 }
823 }
824
825 fn block_header(
826 environment: ValidatorRsmEnvironment,
827 height: u64,
828 ) -> hypercall_db::NewRsmBlockHeader {
829 hypercall_db::NewRsmBlockHeader {
830 environment,
831 height,
832 hash: [7; 32],
833 parent_hash: [8; 32],
834 commands_hash: [9; 32],
835 batch_root: [6; 32],
836 command_count: 3,
837 first_command_seq: 100,
838 last_command_seq: 102,
839 signer: Some([10; 20]),
840 signature: Some(vec![11; 65]),
841 }
842 }
843
844 fn block_command(
845 environment: ValidatorRsmEnvironment,
846 height: u64,
847 command_index: u64,
848 engine_command_id: i64,
849 request_uuid: RustUuid,
850 ) -> hypercall_db::NewRsmBlockCommand {
851 hypercall_db::NewRsmBlockCommand {
852 environment,
853 height,
854 rsm_command_seq: 100 + command_index,
855 command_index,
856 engine_command_id,
857 request_uuid,
858 command_type: "PriceUpdate".to_string(),
859 command_data: vec![1, 2, 3, command_index as u8],
860 command_identity_hash: [command_index as u8 + 12; 32],
861 }
862 }
863
864 fn insert_engine_command(
865 db: &DatabaseHandler,
866 request_uuid: RustUuid,
867 command_type: &str,
868 command_data: &[u8],
869 ) -> i64 {
870 let mut conn = db.pool().get().unwrap();
871 diesel::sql_query(
872 "INSERT INTO engine_commands (
873 received_ts_ms, request_uuid, command_type_enum, command_data
874 ) VALUES (1, $1, $2::engine_command_type, $3)
875 RETURNING command_id",
876 )
877 .bind::<Uuid, _>(DbUuid(request_uuid))
878 .bind::<Text, _>(command_type)
879 .bind::<Bytea, _>(command_data)
880 .get_result::<CommandIdRow>(&mut conn)
881 .unwrap()
882 .command_id
883 }
884
885 #[derive(QueryableByName)]
886 struct CommandIdRow {
887 #[diesel(sql_type = BigInt)]
888 command_id: i64,
889 }
890
/// `root32` must accept exactly 32 bytes and reject every other length.
///
/// Covers inputs that are empty, one byte short and one byte long, so that an
/// implementation that silently pads or truncates cannot pass.
#[test]
fn root32_rejects_wrong_length() {
    for bad_len in [0usize, 31, 33] {
        assert!(
            root32("state_root", vec![0u8; bad_len]).is_err(),
            "root32 accepted a {bad_len}-byte root"
        );
    }

    // A correctly sized input is returned unchanged as a fixed-size array.
    assert_eq!(root32("state_root", vec![1u8; 32]).unwrap(), [1u8; 32]);
}
896
/// The checked integer helpers must refuse values that do not fit the target
/// type: anything above `i64::MAX` going to `i64`, and negatives going to `u64`.
#[test]
fn integer_conversion_rejects_overflow_and_negative() {
    // Smallest u64 that cannot be represented as an i64.
    let first_unrepresentable = i64::MAX as u64 + 1;
    let too_large = u64_to_i64("version", first_unrepresentable);
    assert!(too_large.is_err());

    let negative = i64_to_u64("version", -1);
    assert!(negative.is_err());
}
902
903 #[tokio::test]
904 async fn root_summary_roundtrip_and_current_pointer() {
905 let test_db = TestDb::new().await.unwrap();
906 let db = test_db.handler.as_ref();
907 let env = ValidatorRsmEnvironment::Staging;
908 let new_summary = summary(env, 1);
909
910 db.save_validator_rsm_root_summary_sync(&new_summary, true)
911 .unwrap();
912
913 let saved = db
914 .get_validator_rsm_root_summary_sync(env, 1)
915 .unwrap()
916 .expect("saved validator RSM root summary");
917 assert_eq!(saved.environment, env);
918 assert_eq!(saved.version, new_summary.version);
919 assert_eq!(saved.batch_seq, new_summary.batch_seq);
920 assert_eq!(saved.state_root, new_summary.state_root);
921 assert_eq!(saved.command_range_start, new_summary.command_range_start);
922 assert_eq!(saved.command_range_end, new_summary.command_range_end);
923 assert_eq!(saved.command_count, new_summary.command_count);
924
925 let current = db
926 .get_validator_rsm_current_state_sync(env)
927 .unwrap()
928 .expect("current validator RSM state");
929 assert_eq!(current.environment, env);
930 assert_eq!(current.current_version, 1);
931
932 let current_summary = db
933 .get_validator_rsm_current_root_summary_sync(env)
934 .unwrap()
935 .expect("current validator RSM root summary");
936 assert_eq!(current_summary.version, 1);
937 assert_eq!(current_summary.state_root, new_summary.state_root);
938 }
939
940 #[tokio::test]
941 async fn root_summary_duplicate_is_idempotent_but_conflict_fails() {
942 let test_db = TestDb::new().await.unwrap();
943 let db = test_db.handler.as_ref();
944 let new_summary = summary(ValidatorRsmEnvironment::Staging, 1);
945
946 db.save_validator_rsm_root_summary_sync(&new_summary, false)
947 .unwrap();
948 db.save_validator_rsm_root_summary_sync(&new_summary, false)
949 .unwrap();
950
951 let mut conflicting = new_summary.clone();
952 conflicting.state_root = [99; 32];
953 let err = db
954 .save_validator_rsm_root_summary_sync(&conflicting, false)
955 .expect_err("conflicting immutable root summary must fail");
956 assert!(
957 err.to_string()
958 .contains("conflicting validator RSM root summary"),
959 "unexpected error: {err:#}"
960 );
961 }
962
963 #[tokio::test]
964 async fn current_pointer_is_monotonic() {
965 let test_db = TestDb::new().await.unwrap();
966 let db = test_db.handler.as_ref();
967 let env = ValidatorRsmEnvironment::Staging;
968
969 db.save_validator_rsm_root_summary_sync(&summary(env, 1), true)
970 .unwrap();
971 db.save_validator_rsm_root_summary_sync(&summary(env, 2), true)
972 .unwrap();
973
974 let err = db
975 .save_validator_rsm_root_summary_sync(&summary(env, 1), true)
976 .expect_err("current pointer must not move backward");
977 assert!(
978 err.to_string()
979 .contains("refusing to move validator RSM current version"),
980 "unexpected error: {err:#}"
981 );
982
983 let current = db
984 .get_validator_rsm_current_state_sync(env)
985 .unwrap()
986 .expect("current validator RSM state");
987 assert_eq!(current.current_version, 2);
988 }
989
990 #[tokio::test]
991 async fn rsm_block_header_roundtrip_and_joined_views() {
992 let test_db = TestDb::new().await.unwrap();
993 let db = test_db.handler.as_ref();
994 let env = ValidatorRsmEnvironment::Staging;
995 let root = summary(env, 1);
996 let header = block_header(env, root.batch_seq);
997
998 db.save_validator_rsm_root_summary_sync(&root, true)
999 .unwrap();
1000 db.save_rsm_block_header_sync(&header).unwrap();
1001
1002 let by_height = db
1003 .get_rsm_block_by_height_sync(env, header.height)
1004 .unwrap()
1005 .expect("rsm block by height");
1006 assert_eq!(by_height.block.height, header.height);
1007 assert_eq!(by_height.block.hash, header.hash);
1008 assert_eq!(by_height.block.parent_hash, header.parent_hash);
1009 assert_eq!(by_height.block.commands_hash, header.commands_hash);
1010 assert_eq!(by_height.block.signer, header.signer);
1011 assert_eq!(by_height.root_summary.version, root.version);
1012 assert_eq!(by_height.root_summary.state_root, root.state_root);
1013
1014 let by_hash = db
1015 .get_rsm_block_by_hash_sync(env, header.hash)
1016 .unwrap()
1017 .expect("rsm block by hash");
1018 assert_eq!(by_hash.block.height, header.height);
1019
1020 let latest = db
1021 .get_latest_rsm_block_sync(env)
1022 .unwrap()
1023 .expect("latest rsm block");
1024 assert_eq!(latest.block.hash, header.hash);
1025
1026 let listed = db.list_rsm_blocks_sync(env, None, 10).unwrap();
1027 assert_eq!(listed.len(), 1);
1028 assert_eq!(listed[0].block.height, header.height);
1029 }
1030
1031 #[tokio::test]
1032 async fn rsm_block_header_duplicate_is_idempotent_but_conflict_fails() {
1033 let test_db = TestDb::new().await.unwrap();
1034 let db = test_db.handler.as_ref();
1035 let env = ValidatorRsmEnvironment::Staging;
1036 let header = block_header(env, 11);
1037
1038 db.save_rsm_block_header_sync(&header).unwrap();
1039 db.save_rsm_block_header_sync(&header).unwrap();
1040
1041 let mut conflicting = header.clone();
1042 conflicting.parent_hash = [99; 32];
1043 let err = db
1044 .save_rsm_block_header_sync(&conflicting)
1045 .expect_err("conflicting immutable block header must fail");
1046 assert!(
1047 err.to_string().contains("conflicting validator RSM block"),
1048 "unexpected error: {err:#}"
1049 );
1050 }
1051
1052 #[tokio::test]
1053 async fn rsm_block_commands_roundtrip_in_command_order() {
1054 let test_db = TestDb::new().await.unwrap();
1055 let db = test_db.handler.as_ref();
1056 let env = ValidatorRsmEnvironment::Staging;
1057 let header = block_header(env, 11);
1058 db.save_rsm_block_header_sync(&header).unwrap();
1059
1060 let request_a = RustUuid::new_v4();
1061 let request_b = RustUuid::new_v4();
1062 let command_id_a = insert_engine_command(db, request_a, "PriceUpdate", &[1, 2, 3, 0]);
1063 let command_id_b = insert_engine_command(db, request_b, "PriceUpdate", &[1, 2, 3, 1]);
1064 let commands = vec![
1065 block_command(env, header.height, 0, command_id_a, request_a),
1066 block_command(env, header.height, 1, command_id_b, request_b),
1067 ];
1068
1069 db.save_rsm_block_commands_sync(&commands).unwrap();
1070 db.save_rsm_block_commands_sync(&commands).unwrap();
1071
1072 let saved = db.get_rsm_block_commands_sync(env, header.height).unwrap();
1073 assert_eq!(saved.len(), 2);
1074 assert_eq!(saved[0].command_index, 0);
1075 assert_eq!(saved[0].rsm_command_seq, 100);
1076 assert_eq!(saved[0].engine_command_id, command_id_a);
1077 assert_eq!(saved[0].request_uuid, request_a);
1078 assert_eq!(saved[0].command_type, "PriceUpdate");
1079 assert_eq!(saved[0].command_data, vec![1, 2, 3, 0]);
1080 assert_eq!(saved[1].command_index, 1);
1081 assert_eq!(saved[1].engine_command_id, command_id_b);
1082 }
1083
1084 #[tokio::test]
1085 async fn rsm_block_command_duplicate_conflict_fails() {
1086 let test_db = TestDb::new().await.unwrap();
1087 let db = test_db.handler.as_ref();
1088 let env = ValidatorRsmEnvironment::Staging;
1089 let header = block_header(env, 12);
1090 db.save_rsm_block_header_sync(&header).unwrap();
1091
1092 let request_uuid = RustUuid::new_v4();
1093 let command_id = insert_engine_command(db, request_uuid, "PriceUpdate", &[1, 2, 3, 0]);
1094 let command = block_command(env, header.height, 0, command_id, request_uuid);
1095 db.save_rsm_block_commands_sync(std::slice::from_ref(&command))
1096 .unwrap();
1097
1098 let mut conflicting = command;
1099 conflicting.command_data = vec![9, 9, 9];
1100 let err = db
1101 .save_rsm_block_commands_sync(&[conflicting])
1102 .expect_err("conflicting immutable block command must fail");
1103 assert!(
1104 err.to_string()
1105 .contains("conflicting validator RSM block command"),
1106 "unexpected error: {err:#}"
1107 );
1108 }
1109}