Skip to main content

hypercall_db_diesel/
validator_rsm.rs

1use anyhow::{anyhow, Context, Result};
2use chrono::{DateTime, Utc};
3use diesel::prelude::*;
4use diesel::sql_types::{BigInt, Bytea, Integer, SmallInt, Text, Uuid};
5
6use crate::database_handler::DatabaseHandler;
7use crate::diesel_db::DieselDb;
8use crate::engine_enums::{CommandType, DbUuid};
9use crate::schema::{
10    validator_rsm_batch_roots, validator_rsm_block_commands, validator_rsm_blocks,
11    validator_rsm_current_state,
12};
13
/// Row shape used when reading the `validator_rsm_batch_roots` table on the
/// Postgres backend.
///
/// Values are kept in their database representation (signed integers and
/// variable-length byte vectors). The `TryFrom<RootSummaryRow>` impl for
/// `hypercall_db::ValidatorRsmRootSummary` validates them when converting.
#[derive(Queryable, Selectable)]
#[diesel(table_name = validator_rsm_batch_roots)]
#[diesel(check_for_backend(diesel::pg::Pg))]
struct RootSummaryRow {
    // Converted with `try_into()` into the domain environment type.
    environment: i16,
    // Signed in the database; conversion rejects negative values.
    version: i64,
    batch_seq: i64,
    // Root columns: conversion requires each to be exactly 32 bytes.
    state_root: Vec<u8>,
    risk_root: Vec<u8>,
    command_mmr_root: Vec<u8>,
    obligation_mmr_root: Vec<u8>,
    intent_mmr_root: Vec<u8>,
    batch_root: Vec<u8>,
    // Signed in the database; conversion rejects negative values.
    command_range_start: i64,
    command_range_end: i64,
    command_count: i64,
    // Forwarded to the domain type unchanged.
    schema_version: i32,
    // Must be exactly 32 bytes; block lookups join on this column against
    // `validator_rsm_blocks::hash`.
    accepted_block_hash: Vec<u8>,
    created_at: DateTime<Utc>,
}
34
/// Row shape used when reading the `validator_rsm_current_state` table on the
/// Postgres backend.
///
/// Converted into `hypercall_db::ValidatorRsmCurrentState` through `TryFrom`.
#[derive(Queryable, Selectable)]
#[diesel(table_name = validator_rsm_current_state)]
#[diesel(check_for_backend(diesel::pg::Pg))]
struct CurrentStateRow {
    // Converted with `try_into()` into the domain environment type.
    environment: i16,
    // Signed in the database; conversion rejects negative values.
    current_version: i64,
    updated_at: DateTime<Utc>,
}
43
/// Row shape used when reading the `validator_rsm_blocks` table on the
/// Postgres backend.
///
/// Converted into `hypercall_db::RsmBlockHeader` through `TryFrom`, which
/// validates integer signs and byte lengths.
#[derive(Queryable, Selectable)]
#[diesel(table_name = validator_rsm_blocks)]
#[diesel(check_for_backend(diesel::pg::Pg))]
struct BlockRow {
    // Converted with `try_into()` into the domain environment type.
    environment: i16,
    // Signed in the database; conversion rejects negative values.
    height: i64,
    // Hash/root columns: conversion requires each to be exactly 32 bytes.
    hash: Vec<u8>,
    parent_hash: Vec<u8>,
    commands_hash: Vec<u8>,
    batch_root: Vec<u8>,
    // Signed in the database; conversion rejects negative values.
    command_count: i64,
    first_command_seq: i64,
    last_command_seq: i64,
    // Nullable; when present, conversion requires exactly 20 bytes.
    signer: Option<Vec<u8>>,
    // Nullable; forwarded to the domain type without length validation.
    signature: Option<Vec<u8>>,
    created_at: DateTime<Utc>,
}
61
/// Row shape used when reading the `validator_rsm_block_commands` table on the
/// Postgres backend.
///
/// Converted into `hypercall_db::RsmBlockCommand` through `TryFrom`.
#[derive(Queryable, Selectable)]
#[diesel(table_name = validator_rsm_block_commands)]
#[diesel(check_for_backend(diesel::pg::Pg))]
struct BlockCommandRow {
    // Converted with `try_into()` into the domain environment type.
    environment: i16,
    // Signed in the database; conversion rejects negative values.
    height: i64,
    rsm_command_seq: i64,
    // Also the sort key used when loading the commands of a block.
    command_index: i64,
    // Forwarded to the domain type as a signed value, unchanged.
    engine_command_id: i64,
    // Wrapper type; conversion takes its inner `.0` value.
    request_uuid: DbUuid,
    // Conversion stores the `to_string()` rendering of this enum.
    command_type: CommandType,
    command_data: Vec<u8>,
    // Conversion requires exactly 32 bytes.
    command_identity_hash: Vec<u8>,
    created_at: DateTime<Utc>,
}
77
78fn u64_to_i64(name: &str, value: u64) -> Result<i64> {
79    i64::try_from(value).map_err(|_| anyhow!("{name} overflows BIGINT: {value}"))
80}
81
82fn i64_to_u64(name: &str, value: i64) -> Result<u64> {
83    u64::try_from(value).map_err(|_| anyhow!("{name} is negative: {value}"))
84}
85
86fn root32(name: &str, bytes: Vec<u8>) -> Result<[u8; 32]> {
87    bytes
88        .try_into()
89        .map_err(|bytes: Vec<u8>| anyhow!("{name} must be 32 bytes, got {}", bytes.len()))
90}
91
92fn addr20(name: &str, bytes: Vec<u8>) -> Result<[u8; 20]> {
93    bytes
94        .try_into()
95        .map_err(|bytes: Vec<u8>| anyhow!("{name} must be 20 bytes, got {}", bytes.len()))
96}
97
98impl TryFrom<RootSummaryRow> for hypercall_db::ValidatorRsmRootSummary {
99    type Error = anyhow::Error;
100
101    fn try_from(row: RootSummaryRow) -> Result<Self> {
102        Ok(Self {
103            environment: row.environment.try_into()?,
104            version: i64_to_u64("version", row.version)?,
105            batch_seq: i64_to_u64("batch_seq", row.batch_seq)?,
106            state_root: root32("state_root", row.state_root)?,
107            risk_root: root32("risk_root", row.risk_root)?,
108            command_mmr_root: root32("command_mmr_root", row.command_mmr_root)?,
109            obligation_mmr_root: root32("obligation_mmr_root", row.obligation_mmr_root)?,
110            intent_mmr_root: root32("intent_mmr_root", row.intent_mmr_root)?,
111            batch_root: root32("batch_root", row.batch_root)?,
112            command_range_start: i64_to_u64("command_range_start", row.command_range_start)?,
113            command_range_end: i64_to_u64("command_range_end", row.command_range_end)?,
114            command_count: i64_to_u64("command_count", row.command_count)?,
115            schema_version: row.schema_version,
116            accepted_block_hash: root32("accepted_block_hash", row.accepted_block_hash)?,
117            created_at: row.created_at,
118        })
119    }
120}
121
122impl TryFrom<BlockRow> for hypercall_db::RsmBlockHeader {
123    type Error = anyhow::Error;
124
125    fn try_from(row: BlockRow) -> Result<Self> {
126        Ok(Self {
127            environment: row.environment.try_into()?,
128            height: i64_to_u64("height", row.height)?,
129            hash: root32("hash", row.hash)?,
130            parent_hash: root32("parent_hash", row.parent_hash)?,
131            commands_hash: root32("commands_hash", row.commands_hash)?,
132            batch_root: root32("batch_root", row.batch_root)?,
133            command_count: i64_to_u64("command_count", row.command_count)?,
134            first_command_seq: i64_to_u64("first_command_seq", row.first_command_seq)?,
135            last_command_seq: i64_to_u64("last_command_seq", row.last_command_seq)?,
136            signer: row
137                .signer
138                .map(|bytes| addr20("signer", bytes))
139                .transpose()?,
140            signature: row.signature,
141            created_at: row.created_at,
142        })
143    }
144}
145
146impl TryFrom<CurrentStateRow> for hypercall_db::ValidatorRsmCurrentState {
147    type Error = anyhow::Error;
148
149    fn try_from(row: CurrentStateRow) -> Result<Self> {
150        Ok(Self {
151            environment: row.environment.try_into()?,
152            current_version: i64_to_u64("current_version", row.current_version)?,
153            updated_at: row.updated_at,
154        })
155    }
156}
157
158impl TryFrom<BlockCommandRow> for hypercall_db::RsmBlockCommand {
159    type Error = anyhow::Error;
160
161    fn try_from(row: BlockCommandRow) -> Result<Self> {
162        Ok(Self {
163            environment: row.environment.try_into()?,
164            height: i64_to_u64("height", row.height)?,
165            rsm_command_seq: i64_to_u64("rsm_command_seq", row.rsm_command_seq)?,
166            command_index: i64_to_u64("command_index", row.command_index)?,
167            engine_command_id: row.engine_command_id,
168            request_uuid: row.request_uuid.0,
169            command_type: row.command_type.to_string(),
170            command_data: row.command_data,
171            command_identity_hash: root32("command_identity_hash", row.command_identity_hash)?,
172            created_at: row.created_at,
173        })
174    }
175}
176
177fn block_view_from_rows(
178    block: BlockRow,
179    root_summary: RootSummaryRow,
180) -> Result<hypercall_db::RsmBlockView> {
181    Ok(hypercall_db::RsmBlockView {
182        block: block.try_into()?,
183        root_summary: root_summary.try_into()?,
184    })
185}
186
187#[async_trait::async_trait]
188impl hypercall_db::ValidatorRsmStateAsyncReader for DieselDb {
189    async fn get_validator_rsm_root_summary(
190        &self,
191        environment: hypercall_db::ValidatorRsmEnvironment,
192        version: u64,
193    ) -> Result<Option<hypercall_db::ValidatorRsmRootSummary>> {
194        let version = u64_to_i64("version", version)?;
195        let mut conn = self.get_conn().await?;
196        let query = validator_rsm_batch_roots::table
197            .filter(validator_rsm_batch_roots::environment.eq(environment.as_i16()))
198            .filter(validator_rsm_batch_roots::version.eq(version))
199            .select(RootSummaryRow::as_select());
200        let row = diesel_async::RunQueryDsl::get_result(query, &mut conn)
201            .await
202            .optional()?;
203
204        row.map(TryInto::try_into).transpose()
205    }
206
207    async fn get_validator_rsm_current_state(
208        &self,
209        environment: hypercall_db::ValidatorRsmEnvironment,
210    ) -> Result<Option<hypercall_db::ValidatorRsmCurrentState>> {
211        let mut conn = self.get_conn().await?;
212        let query = validator_rsm_current_state::table
213            .filter(validator_rsm_current_state::environment.eq(environment.as_i16()))
214            .select(CurrentStateRow::as_select());
215        let row = diesel_async::RunQueryDsl::get_result(query, &mut conn)
216            .await
217            .optional()?;
218
219        row.map(TryInto::try_into).transpose()
220    }
221
222    async fn get_validator_rsm_current_root_summary(
223        &self,
224        environment: hypercall_db::ValidatorRsmEnvironment,
225    ) -> Result<Option<hypercall_db::ValidatorRsmRootSummary>> {
226        let mut conn = self.get_conn().await?;
227        let query = validator_rsm_current_state::table
228            .inner_join(
229                validator_rsm_batch_roots::table.on(validator_rsm_batch_roots::environment
230                    .eq(validator_rsm_current_state::environment)
231                    .and(
232                        validator_rsm_batch_roots::version
233                            .eq(validator_rsm_current_state::current_version),
234                    )),
235            )
236            .filter(validator_rsm_current_state::environment.eq(environment.as_i16()))
237            .select(RootSummaryRow::as_select());
238        let row = diesel_async::RunQueryDsl::get_result(query, &mut conn)
239            .await
240            .optional()?;
241
242        row.map(TryInto::try_into).transpose()
243    }
244
245    async fn get_rsm_block_by_height(
246        &self,
247        environment: hypercall_db::ValidatorRsmEnvironment,
248        height: u64,
249    ) -> Result<Option<hypercall_db::RsmBlockView>> {
250        let height = u64_to_i64("height", height)?;
251        let mut conn = self.get_conn().await?;
252        let query = validator_rsm_blocks::table
253            .inner_join(
254                validator_rsm_batch_roots::table.on(validator_rsm_batch_roots::environment
255                    .eq(validator_rsm_blocks::environment)
256                    .and(validator_rsm_batch_roots::batch_seq.eq(validator_rsm_blocks::height))
257                    .and(
258                        validator_rsm_batch_roots::accepted_block_hash
259                            .eq(validator_rsm_blocks::hash),
260                    )),
261            )
262            .filter(validator_rsm_blocks::environment.eq(environment.as_i16()))
263            .filter(validator_rsm_blocks::height.eq(height))
264            .select((BlockRow::as_select(), RootSummaryRow::as_select()));
265        let row = diesel_async::RunQueryDsl::get_result(query, &mut conn)
266            .await
267            .optional()?;
268
269        row.map(|(block, root)| block_view_from_rows(block, root))
270            .transpose()
271    }
272
273    async fn get_rsm_block_by_hash(
274        &self,
275        environment: hypercall_db::ValidatorRsmEnvironment,
276        hash: [u8; 32],
277    ) -> Result<Option<hypercall_db::RsmBlockView>> {
278        let mut conn = self.get_conn().await?;
279        let query = validator_rsm_blocks::table
280            .inner_join(
281                validator_rsm_batch_roots::table.on(validator_rsm_batch_roots::environment
282                    .eq(validator_rsm_blocks::environment)
283                    .and(validator_rsm_batch_roots::batch_seq.eq(validator_rsm_blocks::height))
284                    .and(
285                        validator_rsm_batch_roots::accepted_block_hash
286                            .eq(validator_rsm_blocks::hash),
287                    )),
288            )
289            .filter(validator_rsm_blocks::environment.eq(environment.as_i16()))
290            .filter(validator_rsm_blocks::hash.eq(hash.to_vec()))
291            .select((BlockRow::as_select(), RootSummaryRow::as_select()));
292        let row = diesel_async::RunQueryDsl::get_result(query, &mut conn)
293            .await
294            .optional()?;
295
296        row.map(|(block, root)| block_view_from_rows(block, root))
297            .transpose()
298    }
299
300    async fn get_latest_rsm_block(
301        &self,
302        environment: hypercall_db::ValidatorRsmEnvironment,
303    ) -> Result<Option<hypercall_db::RsmBlockView>> {
304        let mut conn = self.get_conn().await?;
305        let query = validator_rsm_blocks::table
306            .inner_join(
307                validator_rsm_batch_roots::table.on(validator_rsm_batch_roots::environment
308                    .eq(validator_rsm_blocks::environment)
309                    .and(validator_rsm_batch_roots::batch_seq.eq(validator_rsm_blocks::height))
310                    .and(
311                        validator_rsm_batch_roots::accepted_block_hash
312                            .eq(validator_rsm_blocks::hash),
313                    )),
314            )
315            .inner_join(
316                validator_rsm_current_state::table.on(validator_rsm_current_state::environment
317                    .eq(validator_rsm_blocks::environment)
318                    .and(
319                        validator_rsm_current_state::current_version
320                            .eq(validator_rsm_batch_roots::version),
321                    )),
322            )
323            .filter(validator_rsm_blocks::environment.eq(environment.as_i16()))
324            .select((BlockRow::as_select(), RootSummaryRow::as_select()));
325        let row = diesel_async::RunQueryDsl::get_result(query, &mut conn)
326            .await
327            .optional()?;
328
329        row.map(|(block, root)| block_view_from_rows(block, root))
330            .transpose()
331    }
332
333    async fn list_rsm_blocks(
334        &self,
335        environment: hypercall_db::ValidatorRsmEnvironment,
336        from_height: Option<u64>,
337        limit: u32,
338    ) -> Result<Vec<hypercall_db::RsmBlockView>> {
339        let limit = i64::from(limit.clamp(1, 100));
340        let mut conn = self.get_conn().await?;
341        let mut query = validator_rsm_blocks::table
342            .inner_join(
343                validator_rsm_batch_roots::table.on(validator_rsm_batch_roots::environment
344                    .eq(validator_rsm_blocks::environment)
345                    .and(validator_rsm_batch_roots::batch_seq.eq(validator_rsm_blocks::height))
346                    .and(
347                        validator_rsm_batch_roots::accepted_block_hash
348                            .eq(validator_rsm_blocks::hash),
349                    )),
350            )
351            .filter(validator_rsm_blocks::environment.eq(environment.as_i16()))
352            .into_boxed();
353
354        if let Some(from_height) = from_height {
355            query = query
356                .filter(validator_rsm_blocks::height.le(u64_to_i64("from_height", from_height)?));
357        }
358
359        let query = query
360            .select((BlockRow::as_select(), RootSummaryRow::as_select()))
361            .order(validator_rsm_blocks::height.desc())
362            .limit(limit);
363        let rows = diesel_async::RunQueryDsl::load(query, &mut conn).await?;
364
365        rows.into_iter()
366            .map(|(block, root)| block_view_from_rows(block, root))
367            .collect()
368    }
369
370    async fn get_rsm_block_commands(
371        &self,
372        environment: hypercall_db::ValidatorRsmEnvironment,
373        height: u64,
374    ) -> Result<Vec<hypercall_db::RsmBlockCommand>> {
375        let height = u64_to_i64("height", height)?;
376        let mut conn = self.get_conn().await?;
377        let query = validator_rsm_block_commands::table
378            .filter(validator_rsm_block_commands::environment.eq(environment.as_i16()))
379            .filter(validator_rsm_block_commands::height.eq(height))
380            .select(BlockCommandRow::as_select())
381            .order(validator_rsm_block_commands::command_index.asc());
382        let rows = diesel_async::RunQueryDsl::load(query, &mut conn).await?;
383
384        rows.into_iter().map(TryInto::try_into).collect()
385    }
386}
387
388impl hypercall_db::ValidatorRsmStateReader for DatabaseHandler {
389    fn get_validator_rsm_root_summary_sync(
390        &self,
391        environment: hypercall_db::ValidatorRsmEnvironment,
392        version: u64,
393    ) -> Result<Option<hypercall_db::ValidatorRsmRootSummary>> {
394        let version = u64_to_i64("version", version)?;
395        let mut conn = self.pool().get().context("Failed to get DB connection")?;
396        let row = validator_rsm_batch_roots::table
397            .filter(validator_rsm_batch_roots::environment.eq(environment.as_i16()))
398            .filter(validator_rsm_batch_roots::version.eq(version))
399            .select(RootSummaryRow::as_select())
400            .get_result::<RootSummaryRow>(&mut conn)
401            .optional()?;
402
403        row.map(TryInto::try_into).transpose()
404    }
405
406    fn get_validator_rsm_current_state_sync(
407        &self,
408        environment: hypercall_db::ValidatorRsmEnvironment,
409    ) -> Result<Option<hypercall_db::ValidatorRsmCurrentState>> {
410        let mut conn = self.pool().get().context("Failed to get DB connection")?;
411        let row = validator_rsm_current_state::table
412            .filter(validator_rsm_current_state::environment.eq(environment.as_i16()))
413            .select(CurrentStateRow::as_select())
414            .get_result::<CurrentStateRow>(&mut conn)
415            .optional()?;
416
417        row.map(TryInto::try_into).transpose()
418    }
419
420    fn get_validator_rsm_current_root_summary_sync(
421        &self,
422        environment: hypercall_db::ValidatorRsmEnvironment,
423    ) -> Result<Option<hypercall_db::ValidatorRsmRootSummary>> {
424        let mut conn = self.pool().get().context("Failed to get DB connection")?;
425        let row = validator_rsm_current_state::table
426            .inner_join(
427                validator_rsm_batch_roots::table.on(validator_rsm_batch_roots::environment
428                    .eq(validator_rsm_current_state::environment)
429                    .and(
430                        validator_rsm_batch_roots::version
431                            .eq(validator_rsm_current_state::current_version),
432                    )),
433            )
434            .filter(validator_rsm_current_state::environment.eq(environment.as_i16()))
435            .select(RootSummaryRow::as_select())
436            .get_result::<RootSummaryRow>(&mut conn)
437            .optional()?;
438
439        row.map(TryInto::try_into).transpose()
440    }
441
442    fn get_rsm_block_by_height_sync(
443        &self,
444        environment: hypercall_db::ValidatorRsmEnvironment,
445        height: u64,
446    ) -> Result<Option<hypercall_db::RsmBlockView>> {
447        let height = u64_to_i64("height", height)?;
448        let mut conn = self.pool().get().context("Failed to get DB connection")?;
449        let row = validator_rsm_blocks::table
450            .inner_join(
451                validator_rsm_batch_roots::table.on(validator_rsm_batch_roots::environment
452                    .eq(validator_rsm_blocks::environment)
453                    .and(validator_rsm_batch_roots::batch_seq.eq(validator_rsm_blocks::height))
454                    .and(
455                        validator_rsm_batch_roots::accepted_block_hash
456                            .eq(validator_rsm_blocks::hash),
457                    )),
458            )
459            .filter(validator_rsm_blocks::environment.eq(environment.as_i16()))
460            .filter(validator_rsm_blocks::height.eq(height))
461            .select((BlockRow::as_select(), RootSummaryRow::as_select()))
462            .get_result::<(BlockRow, RootSummaryRow)>(&mut conn)
463            .optional()?;
464
465        row.map(|(block, root)| block_view_from_rows(block, root))
466            .transpose()
467    }
468
469    fn get_rsm_block_by_hash_sync(
470        &self,
471        environment: hypercall_db::ValidatorRsmEnvironment,
472        hash: [u8; 32],
473    ) -> Result<Option<hypercall_db::RsmBlockView>> {
474        let mut conn = self.pool().get().context("Failed to get DB connection")?;
475        let row = validator_rsm_blocks::table
476            .inner_join(
477                validator_rsm_batch_roots::table.on(validator_rsm_batch_roots::environment
478                    .eq(validator_rsm_blocks::environment)
479                    .and(validator_rsm_batch_roots::batch_seq.eq(validator_rsm_blocks::height))
480                    .and(
481                        validator_rsm_batch_roots::accepted_block_hash
482                            .eq(validator_rsm_blocks::hash),
483                    )),
484            )
485            .filter(validator_rsm_blocks::environment.eq(environment.as_i16()))
486            .filter(validator_rsm_blocks::hash.eq(hash.to_vec()))
487            .select((BlockRow::as_select(), RootSummaryRow::as_select()))
488            .get_result::<(BlockRow, RootSummaryRow)>(&mut conn)
489            .optional()?;
490
491        row.map(|(block, root)| block_view_from_rows(block, root))
492            .transpose()
493    }
494
495    fn get_latest_rsm_block_sync(
496        &self,
497        environment: hypercall_db::ValidatorRsmEnvironment,
498    ) -> Result<Option<hypercall_db::RsmBlockView>> {
499        let mut conn = self.pool().get().context("Failed to get DB connection")?;
500        let row = validator_rsm_blocks::table
501            .inner_join(
502                validator_rsm_batch_roots::table.on(validator_rsm_batch_roots::environment
503                    .eq(validator_rsm_blocks::environment)
504                    .and(validator_rsm_batch_roots::batch_seq.eq(validator_rsm_blocks::height))
505                    .and(
506                        validator_rsm_batch_roots::accepted_block_hash
507                            .eq(validator_rsm_blocks::hash),
508                    )),
509            )
510            .inner_join(
511                validator_rsm_current_state::table.on(validator_rsm_current_state::environment
512                    .eq(validator_rsm_blocks::environment)
513                    .and(
514                        validator_rsm_current_state::current_version
515                            .eq(validator_rsm_batch_roots::version),
516                    )),
517            )
518            .filter(validator_rsm_blocks::environment.eq(environment.as_i16()))
519            .select((BlockRow::as_select(), RootSummaryRow::as_select()))
520            .get_result::<(BlockRow, RootSummaryRow)>(&mut conn)
521            .optional()?;
522
523        row.map(|(block, root)| block_view_from_rows(block, root))
524            .transpose()
525    }
526
527    fn list_rsm_blocks_sync(
528        &self,
529        environment: hypercall_db::ValidatorRsmEnvironment,
530        from_height: Option<u64>,
531        limit: u32,
532    ) -> Result<Vec<hypercall_db::RsmBlockView>> {
533        let limit = i64::from(limit.clamp(1, 100));
534        let mut conn = self.pool().get().context("Failed to get DB connection")?;
535        let mut query = validator_rsm_blocks::table
536            .inner_join(
537                validator_rsm_batch_roots::table.on(validator_rsm_batch_roots::environment
538                    .eq(validator_rsm_blocks::environment)
539                    .and(validator_rsm_batch_roots::batch_seq.eq(validator_rsm_blocks::height))
540                    .and(
541                        validator_rsm_batch_roots::accepted_block_hash
542                            .eq(validator_rsm_blocks::hash),
543                    )),
544            )
545            .filter(validator_rsm_blocks::environment.eq(environment.as_i16()))
546            .into_boxed();
547
548        if let Some(from_height) = from_height {
549            query = query
550                .filter(validator_rsm_blocks::height.le(u64_to_i64("from_height", from_height)?));
551        }
552
553        let rows = query
554            .select((BlockRow::as_select(), RootSummaryRow::as_select()))
555            .order(validator_rsm_blocks::height.desc())
556            .limit(limit)
557            .load::<(BlockRow, RootSummaryRow)>(&mut conn)?;
558
559        rows.into_iter()
560            .map(|(block, root)| block_view_from_rows(block, root))
561            .collect()
562    }
563
564    fn get_rsm_block_commands_sync(
565        &self,
566        environment: hypercall_db::ValidatorRsmEnvironment,
567        height: u64,
568    ) -> Result<Vec<hypercall_db::RsmBlockCommand>> {
569        let height = u64_to_i64("height", height)?;
570        let mut conn = self.pool().get().context("Failed to get DB connection")?;
571        let rows = validator_rsm_block_commands::table
572            .filter(validator_rsm_block_commands::environment.eq(environment.as_i16()))
573            .filter(validator_rsm_block_commands::height.eq(height))
574            .select(BlockCommandRow::as_select())
575            .order(validator_rsm_block_commands::command_index.asc())
576            .load::<BlockCommandRow>(&mut conn)?;
577
578        rows.into_iter().map(TryInto::try_into).collect()
579    }
580}
581
582impl hypercall_db::ValidatorRsmStateWriter for DatabaseHandler {
    /// Stores a root summary and, when `advance_current` is set, moves the
    /// environment's current-version pointer to it, all inside one transaction.
    ///
    /// Saving a summary whose `(environment, version)` already exists succeeds
    /// only when every stored column equals the supplied one; otherwise a
    /// "conflicting" error is returned. With `advance_current`, the pointer is
    /// inserted or updated unless the stored `current_version` is greater than
    /// `summary.version`, in which case a "refusing to move ... backward" error
    /// is returned.
    ///
    /// # Errors
    ///
    /// Also fails when an unsigned field does not fit in a BIGINT, when no
    /// pooled connection is available, or when a statement fails.
    fn save_validator_rsm_root_summary_sync(
        &self,
        summary: &hypercall_db::NewValidatorRsmRootSummary,
        advance_current: bool,
    ) -> Result<()> {
        // Range-check every unsigned field before taking a connection.
        let version = u64_to_i64("version", summary.version)?;
        let batch_seq = u64_to_i64("batch_seq", summary.batch_seq)?;
        let command_range_start = u64_to_i64("command_range_start", summary.command_range_start)?;
        let command_range_end = u64_to_i64("command_range_end", summary.command_range_end)?;
        let command_count = u64_to_i64("command_count", summary.command_count)?;
        let mut conn = self.pool().get().context("Failed to get DB connection")?;

        // Returning `Err` from the closure rolls the transaction back.
        conn.transaction(|conn| {
            // `inserted` is 0 when a row for (environment, version) already exists.
            let inserted = diesel::sql_query(
                "INSERT INTO validator_rsm_batch_roots (
                    environment, version, batch_seq, state_root, risk_root, command_mmr_root,
                    obligation_mmr_root, intent_mmr_root, batch_root, command_range_start,
                    command_range_end, command_count, schema_version, accepted_block_hash
                 ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14)
                 ON CONFLICT (environment, version) DO NOTHING",
            )
            .bind::<SmallInt, _>(summary.environment.as_i16())
            .bind::<BigInt, _>(version)
            .bind::<BigInt, _>(batch_seq)
            .bind::<Bytea, _>(&summary.state_root[..])
            .bind::<Bytea, _>(&summary.risk_root[..])
            .bind::<Bytea, _>(&summary.command_mmr_root[..])
            .bind::<Bytea, _>(&summary.obligation_mmr_root[..])
            .bind::<Bytea, _>(&summary.intent_mmr_root[..])
            .bind::<Bytea, _>(&summary.batch_root[..])
            .bind::<BigInt, _>(command_range_start)
            .bind::<BigInt, _>(command_range_end)
            .bind::<BigInt, _>(command_count)
            .bind::<Integer, _>(summary.schema_version)
            .bind::<Bytea, _>(&summary.accepted_block_hash[..])
            .execute(conn)?;
            if inserted == 0 {
                // Nothing was written: load the stored row and compare it
                // against the supplied summary column by column.
                let existing = validator_rsm_batch_roots::table
                    .filter(validator_rsm_batch_roots::environment.eq(summary.environment.as_i16()))
                    .filter(validator_rsm_batch_roots::version.eq(version))
                    .select(RootSummaryRow::as_select())
                    .get_result::<RootSummaryRow>(conn)?;

                let matches_existing = existing.batch_seq == batch_seq
                    && existing.state_root == summary.state_root.as_slice()
                    && existing.risk_root == summary.risk_root.as_slice()
                    && existing.command_mmr_root == summary.command_mmr_root.as_slice()
                    && existing.obligation_mmr_root == summary.obligation_mmr_root.as_slice()
                    && existing.intent_mmr_root == summary.intent_mmr_root.as_slice()
                    && existing.batch_root == summary.batch_root.as_slice()
                    && existing.command_range_start == command_range_start
                    && existing.command_range_end == command_range_end
                    && existing.command_count == command_count
                    && existing.schema_version == summary.schema_version
                    && existing.accepted_block_hash == summary.accepted_block_hash.as_slice();
                if !matches_existing {
                    return Err(anyhow!(
                        "conflicting validator RSM root summary for {} version {}",
                        summary.environment,
                        summary.version
                    ));
                }
            }

            if advance_current {
                // Upsert the pointer. The WHERE clause skips the update (0 rows
                // affected) when the stored version is greater than the new one.
                let affected = diesel::sql_query(
                    "INSERT INTO validator_rsm_current_state (environment, current_version, updated_at)
                     VALUES ($1, $2, NOW())
                     ON CONFLICT (environment) DO UPDATE
                     SET current_version = EXCLUDED.current_version, updated_at = NOW()
                     WHERE validator_rsm_current_state.current_version <= EXCLUDED.current_version",
                )
                .bind::<SmallInt, _>(summary.environment.as_i16())
                .bind::<BigInt, _>(version)
                .execute(conn)?;
                if affected == 0 {
                    return Err(anyhow!(
                        "refusing to move validator RSM current version for {} backward to {}",
                        summary.environment,
                        version
                    ));
                }
            }

            Ok::<_, anyhow::Error>(())
        })
    }
670
    /// Stores a block header inside one transaction.
    ///
    /// Saving a header whose `(environment, height)` already exists succeeds
    /// only when every stored column, including the optional signer and
    /// signature, equals the supplied one; otherwise a "conflicting" error is
    /// returned.
    ///
    /// # Errors
    ///
    /// Also fails when an unsigned field does not fit in a BIGINT, when no
    /// pooled connection is available, or when a statement fails.
    fn save_rsm_block_header_sync(&self, block: &hypercall_db::NewRsmBlockHeader) -> Result<()> {
        // Range-check every unsigned field before taking a connection.
        let height = u64_to_i64("height", block.height)?;
        let command_count = u64_to_i64("command_count", block.command_count)?;
        let first_command_seq = u64_to_i64("first_command_seq", block.first_command_seq)?;
        let last_command_seq = u64_to_i64("last_command_seq", block.last_command_seq)?;
        let mut conn = self.pool().get().context("Failed to get DB connection")?;

        // Returning `Err` from the closure rolls the transaction back.
        conn.transaction(|conn| {
            // `inserted` is 0 when a row for (environment, height) already exists.
            let inserted = diesel::sql_query(
                "INSERT INTO validator_rsm_blocks (
                    environment, height, hash, parent_hash, commands_hash, batch_root,
                    command_count, first_command_seq, last_command_seq, signer, signature
                 ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11)
                 ON CONFLICT (environment, height) DO NOTHING",
            )
            .bind::<SmallInt, _>(block.environment.as_i16())
            .bind::<BigInt, _>(height)
            .bind::<Bytea, _>(&block.hash[..])
            .bind::<Bytea, _>(&block.parent_hash[..])
            .bind::<Bytea, _>(&block.commands_hash[..])
            .bind::<Bytea, _>(&block.batch_root[..])
            .bind::<BigInt, _>(command_count)
            .bind::<BigInt, _>(first_command_seq)
            .bind::<BigInt, _>(last_command_seq)
            // Optional columns are bound as nullable byte slices.
            .bind::<diesel::sql_types::Nullable<Bytea>, _>(
                block.signer.as_ref().map(|signer| signer.as_slice()),
            )
            .bind::<diesel::sql_types::Nullable<Bytea>, _>(block.signature.as_deref())
            .execute(conn)?;

            if inserted == 0 {
                // Nothing was written: load the stored row and compare it
                // against the supplied header column by column.
                let existing = validator_rsm_blocks::table
                    .filter(validator_rsm_blocks::environment.eq(block.environment.as_i16()))
                    .filter(validator_rsm_blocks::height.eq(height))
                    .select(BlockRow::as_select())
                    .get_result::<BlockRow>(conn)?;

                let matches_existing = existing.hash == block.hash.as_slice()
                    && existing.parent_hash == block.parent_hash.as_slice()
                    && existing.commands_hash == block.commands_hash.as_slice()
                    && existing.batch_root == block.batch_root.as_slice()
                    && existing.command_count == command_count
                    && existing.first_command_seq == first_command_seq
                    && existing.last_command_seq == last_command_seq
                    && existing.signer.as_deref() == block.signer.as_ref().map(|s| &s[..])
                    && existing.signature.as_deref() == block.signature.as_deref();
                if !matches_existing {
                    return Err(anyhow!(
                        "conflicting validator RSM block for {} height {}",
                        block.environment,
                        block.height
                    ));
                }
            }

            Ok::<_, anyhow::Error>(())
        })
    }
729
730    fn save_rsm_block_commands_sync(
731        &self,
732        commands: &[hypercall_db::NewRsmBlockCommand],
733    ) -> Result<()> {
734        if commands.is_empty() {
735            return Ok(());
736        }
737
738        let mut conn = self.pool().get().context("Failed to get DB connection")?;
739        conn.transaction(|conn| {
740            for command in commands {
741                let height = u64_to_i64("height", command.height)?;
742                let rsm_command_seq = u64_to_i64("rsm_command_seq", command.rsm_command_seq)?;
743                let command_index = u64_to_i64("command_index", command.command_index)?;
744                let inserted = diesel::sql_query(
745                    "INSERT INTO validator_rsm_block_commands (
746                        environment, height, rsm_command_seq, command_index, engine_command_id,
747                        request_uuid, command_type, command_data, command_identity_hash
748                     ) VALUES ($1, $2, $3, $4, $5, $6, $7::engine_command_type, $8, $9)
749                     ON CONFLICT (environment, height, rsm_command_seq) DO NOTHING",
750                )
751                .bind::<SmallInt, _>(command.environment.as_i16())
752                .bind::<BigInt, _>(height)
753                .bind::<BigInt, _>(rsm_command_seq)
754                .bind::<BigInt, _>(command_index)
755                .bind::<BigInt, _>(command.engine_command_id)
756                .bind::<Uuid, _>(DbUuid(command.request_uuid))
757                .bind::<Text, _>(&command.command_type)
758                .bind::<Bytea, _>(&command.command_data)
759                .bind::<Bytea, _>(&command.command_identity_hash[..])
760                .execute(conn)?;
761
762                if inserted == 0 {
763                    let existing = validator_rsm_block_commands::table
764                        .filter(
765                            validator_rsm_block_commands::environment
766                                .eq(command.environment.as_i16()),
767                        )
768                        .filter(validator_rsm_block_commands::height.eq(height))
769                        .filter(validator_rsm_block_commands::rsm_command_seq.eq(rsm_command_seq))
770                        .select(BlockCommandRow::as_select())
771                        .get_result::<BlockCommandRow>(conn)?;
772
773                    let matches_existing = existing.command_index == command_index
774                        && existing.engine_command_id == command.engine_command_id
775                        && existing.request_uuid.0 == command.request_uuid
776                        && existing.command_type.to_string() == command.command_type
777                        && existing.command_data == command.command_data
778                        && existing.command_identity_hash
779                            == command.command_identity_hash.as_slice();
780                    if !matches_existing {
781                        return Err(anyhow!(
782                            "conflicting validator RSM block command for {} height {} sequence {}",
783                            command.environment,
784                            command.height,
785                            command.rsm_command_seq
786                        ));
787                    }
788                }
789            }
790
791            Ok::<_, anyhow::Error>(())
792        })
793    }
794}
795
796#[cfg(test)]
797mod tests {
798    use super::*;
799    use crate::test_helpers::TestDb;
800    use hypercall_db::{ValidatorRsmEnvironment, ValidatorRsmStateReader, ValidatorRsmStateWriter};
801    use uuid::Uuid as RustUuid;
802
803    fn summary(
804        environment: ValidatorRsmEnvironment,
805        version: u64,
806    ) -> hypercall_db::NewValidatorRsmRootSummary {
807        hypercall_db::NewValidatorRsmRootSummary {
808            environment,
809            version,
810            batch_seq: version + 10,
811            state_root: [version as u8; 32],
812            risk_root: [2; 32],
813            command_mmr_root: [3; 32],
814            obligation_mmr_root: [4; 32],
815            intent_mmr_root: [5; 32],
816            batch_root: [6; 32],
817            command_range_start: version * 100,
818            command_range_end: version * 100 + 2,
819            command_count: 3,
820            schema_version: 1,
821            accepted_block_hash: [7; 32],
822        }
823    }
824
825    fn block_header(
826        environment: ValidatorRsmEnvironment,
827        height: u64,
828    ) -> hypercall_db::NewRsmBlockHeader {
829        hypercall_db::NewRsmBlockHeader {
830            environment,
831            height,
832            hash: [7; 32],
833            parent_hash: [8; 32],
834            commands_hash: [9; 32],
835            batch_root: [6; 32],
836            command_count: 3,
837            first_command_seq: 100,
838            last_command_seq: 102,
839            signer: Some([10; 20]),
840            signature: Some(vec![11; 65]),
841        }
842    }
843
844    fn block_command(
845        environment: ValidatorRsmEnvironment,
846        height: u64,
847        command_index: u64,
848        engine_command_id: i64,
849        request_uuid: RustUuid,
850    ) -> hypercall_db::NewRsmBlockCommand {
851        hypercall_db::NewRsmBlockCommand {
852            environment,
853            height,
854            rsm_command_seq: 100 + command_index,
855            command_index,
856            engine_command_id,
857            request_uuid,
858            command_type: "PriceUpdate".to_string(),
859            command_data: vec![1, 2, 3, command_index as u8],
860            command_identity_hash: [command_index as u8 + 12; 32],
861        }
862    }
863
864    fn insert_engine_command(
865        db: &DatabaseHandler,
866        request_uuid: RustUuid,
867        command_type: &str,
868        command_data: &[u8],
869    ) -> i64 {
870        let mut conn = db.pool().get().unwrap();
871        diesel::sql_query(
872            "INSERT INTO engine_commands (
873                received_ts_ms, request_uuid, command_type_enum, command_data
874             ) VALUES (1, $1, $2::engine_command_type, $3)
875             RETURNING command_id",
876        )
877        .bind::<Uuid, _>(DbUuid(request_uuid))
878        .bind::<Text, _>(command_type)
879        .bind::<Bytea, _>(command_data)
880        .get_result::<CommandIdRow>(&mut conn)
881        .unwrap()
882        .command_id
883    }
884
885    #[derive(QueryableByName)]
886    struct CommandIdRow {
887        #[diesel(sql_type = BigInt)]
888        command_id: i64,
889    }
890
/// `root32` must reject a 31-byte input and turn a 32-byte input into the matching array.
#[test]
fn root32_rejects_wrong_length() {
    let too_short = vec![0u8; 31];
    assert!(root32("state_root", too_short).is_err());

    let exact_length = vec![1u8; 32];
    assert_eq!(root32("state_root", exact_length).unwrap(), [1u8; 32]);
}
896
/// The integer helpers must fail for a `u64` just above `i64::MAX` and for a negative `i64`.
#[test]
fn integer_conversion_rejects_overflow_and_negative() {
    let just_above_i64_max = i64::MAX as u64 + 1;
    assert!(u64_to_i64("version", just_above_i64_max).is_err());

    let negative = -1;
    assert!(i64_to_u64("version", negative).is_err());
}
902
903    #[tokio::test]
904    async fn root_summary_roundtrip_and_current_pointer() {
905        let test_db = TestDb::new().await.unwrap();
906        let db = test_db.handler.as_ref();
907        let env = ValidatorRsmEnvironment::Staging;
908        let new_summary = summary(env, 1);
909
910        db.save_validator_rsm_root_summary_sync(&new_summary, true)
911            .unwrap();
912
913        let saved = db
914            .get_validator_rsm_root_summary_sync(env, 1)
915            .unwrap()
916            .expect("saved validator RSM root summary");
917        assert_eq!(saved.environment, env);
918        assert_eq!(saved.version, new_summary.version);
919        assert_eq!(saved.batch_seq, new_summary.batch_seq);
920        assert_eq!(saved.state_root, new_summary.state_root);
921        assert_eq!(saved.command_range_start, new_summary.command_range_start);
922        assert_eq!(saved.command_range_end, new_summary.command_range_end);
923        assert_eq!(saved.command_count, new_summary.command_count);
924
925        let current = db
926            .get_validator_rsm_current_state_sync(env)
927            .unwrap()
928            .expect("current validator RSM state");
929        assert_eq!(current.environment, env);
930        assert_eq!(current.current_version, 1);
931
932        let current_summary = db
933            .get_validator_rsm_current_root_summary_sync(env)
934            .unwrap()
935            .expect("current validator RSM root summary");
936        assert_eq!(current_summary.version, 1);
937        assert_eq!(current_summary.state_root, new_summary.state_root);
938    }
939
940    #[tokio::test]
941    async fn root_summary_duplicate_is_idempotent_but_conflict_fails() {
942        let test_db = TestDb::new().await.unwrap();
943        let db = test_db.handler.as_ref();
944        let new_summary = summary(ValidatorRsmEnvironment::Staging, 1);
945
946        db.save_validator_rsm_root_summary_sync(&new_summary, false)
947            .unwrap();
948        db.save_validator_rsm_root_summary_sync(&new_summary, false)
949            .unwrap();
950
951        let mut conflicting = new_summary.clone();
952        conflicting.state_root = [99; 32];
953        let err = db
954            .save_validator_rsm_root_summary_sync(&conflicting, false)
955            .expect_err("conflicting immutable root summary must fail");
956        assert!(
957            err.to_string()
958                .contains("conflicting validator RSM root summary"),
959            "unexpected error: {err:#}"
960        );
961    }
962
963    #[tokio::test]
964    async fn current_pointer_is_monotonic() {
965        let test_db = TestDb::new().await.unwrap();
966        let db = test_db.handler.as_ref();
967        let env = ValidatorRsmEnvironment::Staging;
968
969        db.save_validator_rsm_root_summary_sync(&summary(env, 1), true)
970            .unwrap();
971        db.save_validator_rsm_root_summary_sync(&summary(env, 2), true)
972            .unwrap();
973
974        let err = db
975            .save_validator_rsm_root_summary_sync(&summary(env, 1), true)
976            .expect_err("current pointer must not move backward");
977        assert!(
978            err.to_string()
979                .contains("refusing to move validator RSM current version"),
980            "unexpected error: {err:#}"
981        );
982
983        let current = db
984            .get_validator_rsm_current_state_sync(env)
985            .unwrap()
986            .expect("current validator RSM state");
987        assert_eq!(current.current_version, 2);
988    }
989
990    #[tokio::test]
991    async fn rsm_block_header_roundtrip_and_joined_views() {
992        let test_db = TestDb::new().await.unwrap();
993        let db = test_db.handler.as_ref();
994        let env = ValidatorRsmEnvironment::Staging;
995        let root = summary(env, 1);
996        let header = block_header(env, root.batch_seq);
997
998        db.save_validator_rsm_root_summary_sync(&root, true)
999            .unwrap();
1000        db.save_rsm_block_header_sync(&header).unwrap();
1001
1002        let by_height = db
1003            .get_rsm_block_by_height_sync(env, header.height)
1004            .unwrap()
1005            .expect("rsm block by height");
1006        assert_eq!(by_height.block.height, header.height);
1007        assert_eq!(by_height.block.hash, header.hash);
1008        assert_eq!(by_height.block.parent_hash, header.parent_hash);
1009        assert_eq!(by_height.block.commands_hash, header.commands_hash);
1010        assert_eq!(by_height.block.signer, header.signer);
1011        assert_eq!(by_height.root_summary.version, root.version);
1012        assert_eq!(by_height.root_summary.state_root, root.state_root);
1013
1014        let by_hash = db
1015            .get_rsm_block_by_hash_sync(env, header.hash)
1016            .unwrap()
1017            .expect("rsm block by hash");
1018        assert_eq!(by_hash.block.height, header.height);
1019
1020        let latest = db
1021            .get_latest_rsm_block_sync(env)
1022            .unwrap()
1023            .expect("latest rsm block");
1024        assert_eq!(latest.block.hash, header.hash);
1025
1026        let listed = db.list_rsm_blocks_sync(env, None, 10).unwrap();
1027        assert_eq!(listed.len(), 1);
1028        assert_eq!(listed[0].block.height, header.height);
1029    }
1030
1031    #[tokio::test]
1032    async fn rsm_block_header_duplicate_is_idempotent_but_conflict_fails() {
1033        let test_db = TestDb::new().await.unwrap();
1034        let db = test_db.handler.as_ref();
1035        let env = ValidatorRsmEnvironment::Staging;
1036        let header = block_header(env, 11);
1037
1038        db.save_rsm_block_header_sync(&header).unwrap();
1039        db.save_rsm_block_header_sync(&header).unwrap();
1040
1041        let mut conflicting = header.clone();
1042        conflicting.parent_hash = [99; 32];
1043        let err = db
1044            .save_rsm_block_header_sync(&conflicting)
1045            .expect_err("conflicting immutable block header must fail");
1046        assert!(
1047            err.to_string().contains("conflicting validator RSM block"),
1048            "unexpected error: {err:#}"
1049        );
1050    }
1051
1052    #[tokio::test]
1053    async fn rsm_block_commands_roundtrip_in_command_order() {
1054        let test_db = TestDb::new().await.unwrap();
1055        let db = test_db.handler.as_ref();
1056        let env = ValidatorRsmEnvironment::Staging;
1057        let header = block_header(env, 11);
1058        db.save_rsm_block_header_sync(&header).unwrap();
1059
1060        let request_a = RustUuid::new_v4();
1061        let request_b = RustUuid::new_v4();
1062        let command_id_a = insert_engine_command(db, request_a, "PriceUpdate", &[1, 2, 3, 0]);
1063        let command_id_b = insert_engine_command(db, request_b, "PriceUpdate", &[1, 2, 3, 1]);
1064        let commands = vec![
1065            block_command(env, header.height, 0, command_id_a, request_a),
1066            block_command(env, header.height, 1, command_id_b, request_b),
1067        ];
1068
1069        db.save_rsm_block_commands_sync(&commands).unwrap();
1070        db.save_rsm_block_commands_sync(&commands).unwrap();
1071
1072        let saved = db.get_rsm_block_commands_sync(env, header.height).unwrap();
1073        assert_eq!(saved.len(), 2);
1074        assert_eq!(saved[0].command_index, 0);
1075        assert_eq!(saved[0].rsm_command_seq, 100);
1076        assert_eq!(saved[0].engine_command_id, command_id_a);
1077        assert_eq!(saved[0].request_uuid, request_a);
1078        assert_eq!(saved[0].command_type, "PriceUpdate");
1079        assert_eq!(saved[0].command_data, vec![1, 2, 3, 0]);
1080        assert_eq!(saved[1].command_index, 1);
1081        assert_eq!(saved[1].engine_command_id, command_id_b);
1082    }
1083
1084    #[tokio::test]
1085    async fn rsm_block_command_duplicate_conflict_fails() {
1086        let test_db = TestDb::new().await.unwrap();
1087        let db = test_db.handler.as_ref();
1088        let env = ValidatorRsmEnvironment::Staging;
1089        let header = block_header(env, 12);
1090        db.save_rsm_block_header_sync(&header).unwrap();
1091
1092        let request_uuid = RustUuid::new_v4();
1093        let command_id = insert_engine_command(db, request_uuid, "PriceUpdate", &[1, 2, 3, 0]);
1094        let command = block_command(env, header.height, 0, command_id, request_uuid);
1095        db.save_rsm_block_commands_sync(std::slice::from_ref(&command))
1096            .unwrap();
1097
1098        let mut conflicting = command;
1099        conflicting.command_data = vec![9, 9, 9];
1100        let err = db
1101            .save_rsm_block_commands_sync(&[conflicting])
1102            .expect_err("conflicting immutable block command must fail");
1103        assert!(
1104            err.to_string()
1105                .contains("conflicting validator RSM block command"),
1106            "unexpected error: {err:#}"
1107        );
1108    }
1109}