1use anyhow::{anyhow, Result};
4use diesel::prelude::*;
5use diesel::sql_types::{Array, BigInt, Binary, Nullable, Text};
6use diesel::upsert::excluded;
7use std::collections::{HashMap, HashSet};
8
9use crate::database_handler::DatabaseHandler;
10use crate::engine_enums::{CommandType, DbUuid, EventType};
11use crate::order_status_materialization::upsert_materialized_order_state;
12use crate::schema::{engine_command_digests, engine_commands, engine_events};
13
/// Diesel-backed implementation of the engine journal batch writer for `DatabaseHandler`.
impl hypercall_db::EngineJournalBatchWriter for DatabaseHandler {
    /// Writes a batch of engine journal entries inside one database transaction.
    ///
    /// Steps, in order:
    /// 1. De-duplicate `entries` by `request_uuid` (first occurrence is kept).
    /// 2. Insert one `engine_commands` row per unique entry; rows whose `request_uuid`
    ///    already exists are skipped via `ON CONFLICT ... DO NOTHING`.
    /// 3. Only for commands that were newly inserted: insert their events, directive
    ///    outbox appends and journal side effects, plus digests when `persist_digests`
    ///    is `true`.
    /// 4. With the `rsm-state` feature, persist `rsm_blocks` (when supplied) in the same
    ///    transaction. This also happens when step 2 inserted no new command.
    ///
    /// Returns the number of newly inserted commands together with their
    /// `(request_uuid, command_id)` pairs.
    ///
    /// # Errors
    ///
    /// Fails when `rsm_blocks` is supplied but the `rsm-state` feature is disabled, when
    /// no pooled connection can be obtained, when an entry carries an unknown
    /// `command_type`, or when any statement run inside the transaction fails.
    ///
    /// # Panics
    ///
    /// Helpers called from here panic on broken invariants (for example an event type
    /// without a Diesel mirror, see `event_type_to_diesel`).
    fn insert_engine_journal_batch_sync(
        &self,
        entries: &[hypercall_db::EngineJournalEntryInsert],
        persist_digests: bool,
        rsm_blocks: Option<&hypercall_db::EngineJournalRsmBlockBatch>,
    ) -> Result<hypercall_db::EngineJournalBatchInsertResult> {
        // Without the feature there is no code path that could store the blocks, so
        // reject the request instead of silently dropping them.
        #[cfg(not(feature = "rsm-state"))]
        if rsm_blocks.is_some() {
            return Err(anyhow!(
                "RSM block persistence requested but hypercall-db-diesel/rsm-state is disabled"
            ));
        }

        // NOTE(review): this early return also skips `rsm_blocks` when `entries` is empty,
        // whereas the "nothing newly inserted" path below still persists them — confirm
        // that callers never pass blocks together with an empty entry list.
        if entries.is_empty() {
            return Ok(hypercall_db::EngineJournalBatchInsertResult {
                inserted_count: 0,
                inserted_commands: vec![],
            });
        }

        let mut conn = self.pool().get()?;

        conn.transaction(|conn| {
            let unique_entries = dedupe_entries_by_request_uuid(entries);
            // Build the insertable tuples up front; an unknown command type aborts the
            // whole batch before any row is written.
            let command_values: Vec<_> = unique_entries
                .iter()
                .map(|entry| {
                    let command_type_enum = entry
                        .command_type
                        .as_deref()
                        .map(parse_command_type)
                        .transpose()?;
                    Ok((
                        engine_commands::received_ts_ms.eq(entry.received_ts_ms as i64),
                        engine_commands::command_data.eq(&entry.command_data),
                        engine_commands::response_data.eq(&entry.response_data),
                        engine_commands::order_id.eq(entry.order_id),
                        engine_commands::request_uuid.eq(DbUuid(entry.request_uuid)),
                        engine_commands::command_type_enum.eq(command_type_enum),
                    ))
                })
                .collect::<Result<Vec<_>, diesel::result::Error>>()?;

            // RETURNING only yields rows that were actually inserted, so this list
            // excludes commands that already existed.
            let inserted_commands: Vec<(DbUuid, i64)> = diesel::insert_into(engine_commands::table)
                .values(&command_values)
                .on_conflict(engine_commands::request_uuid)
                .do_nothing()
                .returning((engine_commands::request_uuid, engine_commands::command_id))
                .get_results(conn)?;

            let total_inserted = inserted_commands.len();
            if total_inserted == 0 {
                // Every command already existed: nothing else to journal, but the RSM
                // blocks are still written.
                #[cfg(feature = "rsm-state")]
                if let Some(rsm_blocks) = rsm_blocks {
                    persist_rsm_blocks_in_tx(conn, rsm_blocks).map_err(diesel_unknown_error)?;
                }
                #[cfg(not(feature = "rsm-state"))]
                let _ = rsm_blocks;
                return Ok(hypercall_db::EngineJournalBatchInsertResult {
                    inserted_count: 0,
                    inserted_commands: vec![],
                });
            }

            // Lookup used below to restrict all dependent writes to new commands.
            let request_uuid_to_command_id: HashMap<uuid::Uuid, i64> = inserted_commands
                .iter()
                .map(|(request_uuid, cmd_id)| (request_uuid.0, *cmd_id))
                .collect();

            let mut event_values = Vec::new();
            for entry in &unique_entries {
                if let Some(&command_id) = request_uuid_to_command_id.get(&entry.request_uuid) {
                    // `event_idx` is the event's position within its entry.
                    for (idx, event) in entry.events.iter().enumerate() {
                        event_values.push((
                            engine_events::command_id.eq(command_id),
                            engine_events::event_idx.eq(idx as i32),
                            engine_events::event_data.eq(event.event_data.clone()),
                            engine_events::event_key.eq(event.event_key.clone()),
                            engine_events::l2_sequence.eq(event.l2_sequence),
                            engine_events::event_type_enum
                                .eq(event_type_to_diesel(event.event_type)),
                        ));
                    }
                }
            }

            if !event_values.is_empty() {
                diesel::insert_into(engine_events::table)
                    .values(&event_values)
                    .execute(conn)?;
            }

            insert_directive_outbox_appends(conn, &unique_entries, &request_uuid_to_command_id)?;
            persist_journal_side_effects(conn, &unique_entries, &request_uuid_to_command_id)?;

            if persist_digests {
                let digest_values: Vec<_> = unique_entries
                    .iter()
                    .filter_map(|entry| {
                        request_uuid_to_command_id
                            .get(&entry.request_uuid)
                            .map(|&command_id| {
                                (
                                    engine_command_digests::command_id.eq(command_id),
                                    engine_command_digests::duration_ms
                                        .eq(entry.duration_ms as i64),
                                    engine_command_digests::pre_digest_data
                                        .eq(entry.pre_digest_data.clone()),
                                    engine_command_digests::post_digest_data
                                        .eq(entry.post_digest_data.clone()),
                                )
                            })
                    })
                    .collect();

                if !digest_values.is_empty() {
                    diesel::insert_into(engine_command_digests::table)
                        .values(&digest_values)
                        .on_conflict(engine_command_digests::command_id)
                        .do_nothing()
                        .execute(conn)?;
                }
            }

            #[cfg(feature = "rsm-state")]
            if let Some(rsm_blocks) = rsm_blocks {
                persist_rsm_blocks_in_tx(conn, rsm_blocks).map_err(diesel_unknown_error)?;
            }

            Ok(hypercall_db::EngineJournalBatchInsertResult {
                inserted_count: total_inserted,
                inserted_commands: inserted_commands
                    .into_iter()
                    .map(|(request_uuid, command_id)| (request_uuid.0, command_id))
                    .collect(),
            })
        })
    }

    /// Looks up the `command_id` stored in `engine_commands` for each given request UUID.
    ///
    /// Returns one `(request_uuid, command_id)` pair per matching row; UUIDs without a
    /// row are simply absent from the result. An empty `request_ids` slice returns an
    /// empty vector without acquiring a connection.
    ///
    /// # Errors
    ///
    /// Fails when no pooled connection can be obtained or the query fails.
    fn lookup_engine_journal_command_ids_sync(
        &self,
        request_ids: &[uuid::Uuid],
    ) -> Result<Vec<(uuid::Uuid, i64)>> {
        if request_ids.is_empty() {
            return Ok(vec![]);
        }

        let mut conn = self.pool().get()?;
        // Wrap into the Diesel-side UUID newtype used by the schema column.
        let request_ids: Vec<_> = request_ids.iter().copied().map(DbUuid).collect();
        let rows: Vec<(DbUuid, i64)> = engine_commands::table
            .filter(engine_commands::request_uuid.eq_any(request_ids))
            .select((engine_commands::request_uuid, engine_commands::command_id))
            .load(&mut conn)?;
        Ok(rows
            .into_iter()
            .map(|(request_uuid, command_id)| (request_uuid.0, command_id))
            .collect())
    }
}
174
175fn dedupe_entries_by_request_uuid(
176 entries: &[hypercall_db::EngineJournalEntryInsert],
177) -> Vec<&hypercall_db::EngineJournalEntryInsert> {
178 let mut seen = HashSet::new();
179 entries
180 .iter()
181 .filter(|entry| seen.insert(entry.request_uuid))
182 .collect()
183}
184
185fn parse_command_type(value: &str) -> Result<CommandType, diesel::result::Error> {
186 CommandType::from_str(value).ok_or_else(|| {
187 diesel_unknown_error(anyhow!("unknown engine journal command type '{}'", value))
188 })
189}
190
191fn event_type_to_diesel(event_type: hypercall_db::EventType) -> EventType {
192 EventType::from_str(&event_type.to_string()).unwrap_or_else(|| {
193 panic!(
194 "RUNTIME_INVARIANT: hypercall_db EventType '{}' has no Diesel mirror",
195 event_type
196 )
197 })
198}
199
200fn diesel_unknown_error(error: anyhow::Error) -> diesel::result::Error {
201 diesel::result::Error::DatabaseError(
202 diesel::result::DatabaseErrorKind::Unknown,
203 Box::new(error.to_string()),
204 )
205}
206
207fn insert_directive_outbox_appends(
208 conn: &mut PgConnection,
209 entries: &[&hypercall_db::EngineJournalEntryInsert],
210 request_uuid_to_command_id: &HashMap<uuid::Uuid, i64>,
211) -> QueryResult<usize> {
212 let mut directive_ids = Vec::new();
213 let mut command_ids = Vec::new();
214 let mut kinds = Vec::new();
215 let mut action_keys = Vec::new();
216 let mut wallet_addresses = Vec::new();
217 let mut account_addresses = Vec::new();
218 let mut signer_addresses = Vec::new();
219 let mut directive_nonces = Vec::new();
220 let mut idempotency_keys = Vec::new();
221 let mut payload_hashes = Vec::new();
222 let mut payloads = Vec::new();
223 let mut domain_statuses = Vec::new();
224 let mut delivery_statuses = Vec::new();
225 let mut created_ts_values = Vec::new();
226 let mut expires_at_values = Vec::new();
227 let mut signer_nonce_floors = HashMap::<Vec<u8>, i64>::new();
228
229 for entry in entries {
230 let Some(&command_id) = request_uuid_to_command_id.get(&entry.request_uuid) else {
231 continue;
232 };
233 for append in &entry.outbox_appends {
234 directive_ids.push(append.directive_id.clone());
235 command_ids.push(command_id);
236 kinds.push(append.kind.as_str().to_string());
237 action_keys.push(append.action_key.as_str().to_string());
238 wallet_addresses.push(append.wallet.as_bytes().to_vec());
239 account_addresses.push(append.account.as_bytes().to_vec());
240 let signer_address = append.signer.as_bytes().to_vec();
241 signer_addresses.push(signer_address.clone());
242 let directive_nonce = i64::try_from(append.nonce).unwrap_or_else(|_| {
243 panic!(
244 "RUNTIME_INVARIANT: directive nonce {} exceeds i64 range",
245 append.nonce
246 )
247 });
248 directive_nonces.push(directive_nonce);
249 let next_nonce = directive_nonce.checked_add(1).unwrap_or_else(|| {
250 panic!(
251 "RUNTIME_INVARIANT: directive nonce {} cannot advance",
252 append.nonce
253 )
254 });
255 signer_nonce_floors
256 .entry(signer_address)
257 .and_modify(|floor| *floor = (*floor).max(next_nonce))
258 .or_insert(next_nonce);
259 idempotency_keys.push(append.idempotency_key.clone());
260 payload_hashes.push(append.payload_hash.as_slice().to_vec());
261 payloads.push(append.payload.clone());
262 domain_statuses.push(append.domain_status.as_str().to_string());
263 delivery_statuses.push(append.delivery_status.as_str().to_string());
264 created_ts_values.push(i64::try_from(append.created_ts_ms).unwrap_or_else(|_| {
265 panic!(
266 "RUNTIME_INVARIANT: directive created_ts_ms {} exceeds i64 range",
267 append.created_ts_ms
268 )
269 }));
270 expires_at_values.push(append.expires_at_ms.map(|expires| {
271 i64::try_from(expires).unwrap_or_else(|_| {
272 panic!(
273 "RUNTIME_INVARIANT: directive expires_at_ms {} exceeds i64 range",
274 expires
275 )
276 })
277 }));
278 }
279 }
280
281 if directive_ids.is_empty() {
282 return Ok(0);
283 }
284
285 let inserted = diesel::sql_query(
286 r#"
287 INSERT INTO directive_outbox (
288 directive_id, command_id, kind, action_key, wallet_address,
289 account_address, signer_address, directive_nonce, idempotency_key,
290 payload_hash, payload, domain_status, delivery_status,
291 created_ts_ms, expires_at_ms
292 )
293 SELECT *
294 FROM UNNEST(
295 $1::text[],
296 $2::bigint[],
297 $3::text[],
298 $4::public.directive_action_key[],
299 $5::bytea[],
300 $6::bytea[],
301 $7::bytea[],
302 $8::bigint[],
303 $9::text[],
304 $10::bytea[],
305 $11::bytea[],
306 $12::text[],
307 $13::text[],
308 $14::bigint[],
309 $15::bigint[]
310 )
311 ON CONFLICT (directive_id) DO NOTHING
312 "#,
313 )
314 .bind::<Array<Text>, _>(directive_ids)
315 .bind::<Array<BigInt>, _>(command_ids)
316 .bind::<Array<Text>, _>(kinds)
317 .bind::<Array<Text>, _>(action_keys)
318 .bind::<Array<Binary>, _>(wallet_addresses)
319 .bind::<Array<Binary>, _>(account_addresses)
320 .bind::<Array<Binary>, _>(signer_addresses)
321 .bind::<Array<BigInt>, _>(directive_nonces)
322 .bind::<Array<Text>, _>(idempotency_keys)
323 .bind::<Array<Binary>, _>(payload_hashes)
324 .bind::<Array<Binary>, _>(payloads)
325 .bind::<Array<Text>, _>(domain_statuses)
326 .bind::<Array<Text>, _>(delivery_statuses)
327 .bind::<Array<BigInt>, _>(created_ts_values)
328 .bind::<Array<Nullable<BigInt>>, _>(expires_at_values)
329 .execute(conn)?;
330
331 let (nonce_signers, nonce_floors): (Vec<_>, Vec<_>) = signer_nonce_floors.into_iter().unzip();
332 diesel::sql_query(
333 r#"
334 INSERT INTO rsm_signer_nonces (signer_address, next_nonce, last_synced_nonce, created_at, updated_at)
335 SELECT signer_address, next_nonce, NULL, NOW(), NOW()
336 FROM UNNEST($1::bytea[], $2::bigint[]) AS t(signer_address, next_nonce)
337 ON CONFLICT (signer_address) DO UPDATE
338 SET next_nonce = GREATEST(rsm_signer_nonces.next_nonce, EXCLUDED.next_nonce),
339 updated_at = NOW()
340 "#,
341 )
342 .bind::<Array<Binary>, _>(nonce_signers)
343 .bind::<Array<BigInt>, _>(nonce_floors)
344 .execute(conn)?;
345
346 Ok(inserted)
347}
348
349fn persist_journal_side_effects(
350 conn: &mut PgConnection,
351 entries: &[&hypercall_db::EngineJournalEntryInsert],
352 inserted_commands: &HashMap<uuid::Uuid, i64>,
353) -> QueryResult<()> {
354 for entry in entries {
355 if !inserted_commands.contains_key(&entry.request_uuid) {
356 continue;
357 }
358 let fill_side_effects_by_idx: HashMap<i32, &hypercall_db::EngineJournalFillSideEffect> =
359 entry
360 .fill_side_effects
361 .iter()
362 .map(|side_effect| (side_effect.event_idx, side_effect))
363 .collect();
364
365 for (idx, event) in entry.events.iter().enumerate() {
366 if event.event_type == hypercall_db::EventType::OrderInfo {
367 persist_order_info_event_in_tx(conn, idx, event)?;
368 }
369 if event.event_type == hypercall_db::EventType::OrderUpdate {
370 persist_order_update_event_in_tx(conn, idx, event)?;
371 }
372 if event.event_type != hypercall_db::EventType::OrderFilled {
373 continue;
374 }
375
376 let fill = match hypercall_types::EngineMessage::deserialize_from_wire(
377 &event.event_topic,
378 &event.event_data,
379 ) {
380 Ok(hypercall_types::EngineMessage::OrderFilled { fill, .. }) => fill,
381 Ok(other) => {
382 panic!(
383 "CRITICAL_FAILURE: expected OrderFilled payload at idx {}, got {}",
384 idx,
385 other.type_name()
386 )
387 }
388 Err(e) => {
389 panic!(
390 "CRITICAL_FAILURE: failed to deserialize journaled fill event at idx {}: {}",
391 idx, e
392 )
393 }
394 };
395 let side_effect = fill_side_effects_by_idx
396 .get(&(idx as i32))
397 .unwrap_or_else(|| {
398 panic!(
399 "CRITICAL_FAILURE: missing journal fill side-effect for request {:?}, event_idx={}, trade_id={}",
400 entry.request_uuid, idx, fill.trade_id
401 )
402 });
403 if side_effect.trade_id != fill.trade_id {
404 panic!(
405 "CRITICAL_FAILURE: journal fill side-effect trade_id mismatch for request {:?}, event_idx={}: side_effect={}, fill={}",
406 entry.request_uuid, idx, side_effect.trade_id, fill.trade_id
407 );
408 }
409
410 let side_effect = crate::event_handler::FillSideEffect {
411 trade_id: side_effect.trade_id,
412 taker_ledger_delta: side_effect.taker_ledger_delta,
413 maker_ledger_delta: side_effect.maker_ledger_delta,
414 taker_premium_delta: side_effect.taker_premium_delta,
415 maker_premium_delta: side_effect.maker_premium_delta,
416 underlying_notional: side_effect.underlying_notional,
417 };
418 crate::event_handler::persist_legacy_replay_fill_with_side_effects_in_tx(
419 conn,
420 &fill,
421 &side_effect,
422 )
423 .map_err(|e| diesel_unknown_error(anyhow!(e.to_string())))?;
424 }
425 }
426
427 for entry in entries {
428 if !inserted_commands.contains_key(&entry.request_uuid) {
429 continue;
430 }
431 for balance_update in &entry.balance_updates {
432 crate::ledger_ops::apply_pnl_decimal_sync(
433 conn,
434 &balance_update.update.wallet,
435 balance_update.update.delta,
436 )
437 .map_err(|e| diesel_unknown_error(anyhow!(e.to_string())))?;
438 }
439
440 if let Some(ref side_effect) = entry.cash_withdrawal_side_effect {
441 persist_cash_withdrawal_side_effect_in_tx(conn, side_effect)?;
442 }
443 }
444
445 Ok(())
446}
447
448fn persist_order_info_event_in_tx(
449 conn: &mut PgConnection,
450 idx: usize,
451 event: &hypercall_db::EngineJournalEventInsert,
452) -> QueryResult<()> {
453 let msg = rmp_serde::from_slice::<hypercall_types::OrderInfoMessage>(&event.event_data[1..])
454 .unwrap_or_else(|e| {
455 panic!(
456 "CRITICAL_FAILURE: failed to deserialize journaled OrderInfo at idx {}: {}",
457 idx, e
458 )
459 });
460 let new_order_info = crate::models::NewOrderInfo {
461 order_id: msg.order_id as i64,
462 wallet_address: msg.wallet,
463 symbol: msg.info.symbol.clone(),
464 side: format!("{:?}", msg.info.side),
465 price: msg.info.price,
466 size: msg.info.size,
467 tif: format!("{:?}", msg.info.tif),
468 client_id: msg.info.client_id.clone(),
469 is_perp: msg.info.is_perp,
470 underlying: msg.info.underlying.clone(),
471 reduce_only: msg.info.reduce_only,
472 nonce: msg.info.nonce.map(|n| n as i64),
473 signature: msg.info.signature.clone(),
474 mmp_enabled: msg.info.mmp_enabled,
475 timestamp: msg.timestamp as i64,
476 status: "ACKED".to_string(),
477 filled_size: rust_decimal::Decimal::ZERO,
478 last_materialized_order_update_id: 0,
479 };
480 diesel::insert_into(crate::schema::order_infos::table)
481 .values(&new_order_info)
482 .on_conflict(crate::schema::order_infos::order_id)
483 .do_update()
484 .set((
485 crate::schema::order_infos::wallet_address
486 .eq(excluded(crate::schema::order_infos::wallet_address)),
487 crate::schema::order_infos::symbol.eq(excluded(crate::schema::order_infos::symbol)),
488 crate::schema::order_infos::side.eq(excluded(crate::schema::order_infos::side)),
489 crate::schema::order_infos::price.eq(excluded(crate::schema::order_infos::price)),
490 crate::schema::order_infos::size.eq(excluded(crate::schema::order_infos::size)),
491 crate::schema::order_infos::tif.eq(excluded(crate::schema::order_infos::tif)),
492 crate::schema::order_infos::client_id
493 .eq(excluded(crate::schema::order_infos::client_id)),
494 crate::schema::order_infos::is_perp.eq(excluded(crate::schema::order_infos::is_perp)),
495 crate::schema::order_infos::underlying
496 .eq(excluded(crate::schema::order_infos::underlying)),
497 crate::schema::order_infos::reduce_only
498 .eq(excluded(crate::schema::order_infos::reduce_only)),
499 crate::schema::order_infos::nonce.eq(excluded(crate::schema::order_infos::nonce)),
500 crate::schema::order_infos::signature
501 .eq(excluded(crate::schema::order_infos::signature)),
502 crate::schema::order_infos::mmp_enabled
503 .eq(excluded(crate::schema::order_infos::mmp_enabled)),
504 crate::schema::order_infos::timestamp
505 .eq(excluded(crate::schema::order_infos::timestamp)),
506 ))
507 .execute(conn)?;
508 Ok(())
509}
510
511fn persist_order_update_event_in_tx(
512 conn: &mut PgConnection,
513 idx: usize,
514 event: &hypercall_db::EngineJournalEventInsert,
515) -> QueryResult<()> {
516 let msg = rmp_serde::from_slice::<hypercall_types::OrderUpdateMessage>(&event.event_data[1..])
517 .unwrap_or_else(|e| {
518 panic!(
519 "CRITICAL_FAILURE: failed to deserialize journaled OrderUpdate at idx {}: {}",
520 idx, e
521 )
522 });
523 let current_status = match msg.status {
524 hypercall_types::OrderUpdateStatus::Acked => "ACKED".to_string(),
525 hypercall_types::OrderUpdateStatus::Open => "OPEN".to_string(),
526 hypercall_types::OrderUpdateStatus::PartiallyFilled => "PARTIALLY_FILLED".to_string(),
527 hypercall_types::OrderUpdateStatus::Filled => "FILLED".to_string(),
528 hypercall_types::OrderUpdateStatus::Canceled => "CANCELED".to_string(),
529 hypercall_types::OrderUpdateStatus::Rejected => "REJECTED".to_string(),
530 };
531 let new_order_update = crate::models::NewOrderUpdate {
532 timestamp: msg.timestamp as i64,
533 order_id: msg.order_id.map(|id| id as i64),
534 status: current_status.clone(),
535 reason: msg.reason.clone(),
536 filled_size: msg.filled_size,
537 symbol: msg.info.symbol.clone(),
538 price: msg.info.price,
539 size: msg.info.size,
540 side: format!("{:?}", msg.info.side),
541 tif: format!("{:?}", msg.info.tif),
542 client_id: msg.info.client_id.clone(),
543 };
544 let order_update_id = diesel::insert_into(crate::schema::order_updates::table)
545 .values(&new_order_update)
546 .returning(crate::schema::order_updates::id)
547 .get_result::<Option<i32>>(conn)?
548 .expect("order_updates.id should be populated after insert");
549
550 if let Some(order_id) = msg.order_id {
551 let materialized_order_info = crate::models::NewOrderInfo {
552 order_id: order_id as i64,
553 wallet_address: msg.wallet_address,
554 symbol: msg.info.symbol.clone(),
555 side: format!("{:?}", msg.info.side),
556 price: msg.info.price,
557 size: msg.info.size,
558 tif: format!("{:?}", msg.info.tif),
559 client_id: msg.info.client_id.clone(),
560 is_perp: msg.info.is_perp,
561 underlying: msg.info.underlying.clone(),
562 reduce_only: msg.info.reduce_only,
563 nonce: msg.info.nonce.map(|n| n as i64),
564 signature: msg.info.signature.clone(),
565 mmp_enabled: msg.info.mmp_enabled,
566 timestamp: msg.timestamp as i64,
567 status: current_status,
568 filled_size: msg.filled_size,
569 last_materialized_order_update_id: order_update_id,
570 };
571 upsert_materialized_order_state(conn, &materialized_order_info)?;
572 }
573 Ok(())
574}
575
576fn persist_cash_withdrawal_side_effect_in_tx(
577 conn: &mut diesel::PgConnection,
578 side_effect: &hypercall_db::EngineJournalCashWithdrawalSideEffect,
579) -> QueryResult<()> {
580 use diesel::sql_types::{BigInt, Binary, Numeric, Text};
581
582 #[derive(diesel::QueryableByName)]
583 struct InsertedEventRow {
584 #[diesel(sql_type = BigInt)]
585 id: i64,
586 }
587
588 #[derive(diesel::QueryableByName)]
589 struct LedgerEventRow {
590 #[diesel(sql_type = BigInt)]
591 id: i64,
592 }
593
594 let event_time_ms = i64::try_from(side_effect.timestamp_ms).unwrap_or_else(|_| {
595 panic!(
596 "STATE_CORRUPTION: timestamp_ms {} exceeds i64 range for cash withdrawal {}",
597 side_effect.timestamp_ms, side_effect.request_id
598 )
599 });
600
601 let inserted = diesel::sql_query(
602 "INSERT INTO hypercore_cash_ledger_events
603 (wallet, event_hash, event_time_ms, delta_type, amount_usdc)
604 VALUES ($1, $2, $3, 'withdraw', $4)
605 ON CONFLICT (wallet, event_hash, delta_type) DO NOTHING
606 RETURNING id",
607 )
608 .bind::<Binary, _>(side_effect.wallet.as_bytes())
609 .bind::<Text, _>(&side_effect.request_id)
610 .bind::<BigInt, _>(event_time_ms)
611 .bind::<Numeric, _>(side_effect.amount)
612 .get_result::<InsertedEventRow>(conn)
613 .optional()?;
614
615 let Some(inserted) = inserted else {
616 return Ok(());
617 };
618
619 let ledger = diesel::sql_query(
620 "INSERT INTO ledger_events (wallet, event_ts_ms, delta, event_type, reference_symbol)
621 VALUES ($1, $2, $3, 'withdraw', $4)
622 RETURNING id",
623 )
624 .bind::<Binary, _>(side_effect.wallet.as_bytes())
625 .bind::<BigInt, _>(event_time_ms)
626 .bind::<Numeric, _>(-side_effect.amount)
627 .bind::<Text, _>(&side_effect.request_id)
628 .get_result::<LedgerEventRow>(conn)?;
629
630 diesel::sql_query(
631 "UPDATE hypercore_cash_ledger_events
632 SET ledger_event_id = $2,
633 balance_after = $3,
634 status = 'submitted',
635 error = NULL,
636 updated_at = NOW()
637 WHERE id = $1",
638 )
639 .bind::<BigInt, _>(inserted.id)
640 .bind::<BigInt, _>(ledger.id)
641 .bind::<Numeric, _>(side_effect.balance_after)
642 .execute(conn)?;
643
644 Ok(())
645}
646
647#[cfg(feature = "rsm-state")]
648mod rsm {
649 use super::*;
650 use diesel::sql_types::{Bytea, Integer, SmallInt, Uuid};
651
    /// Row read back from `validator_rsm_batch_roots` for one `(environment, version)`.
    ///
    /// Used by `save_root_summary_in_tx` to check that an already stored root summary
    /// is identical to the one being saved.
    #[derive(QueryableByName)]
    struct ExistingRootSummaryRow {
        #[diesel(sql_type = BigInt)]
        batch_seq: i64,
        #[diesel(sql_type = Bytea)]
        state_root: Vec<u8>,
        #[diesel(sql_type = Bytea)]
        risk_root: Vec<u8>,
        #[diesel(sql_type = Bytea)]
        command_mmr_root: Vec<u8>,
        #[diesel(sql_type = Bytea)]
        obligation_mmr_root: Vec<u8>,
        #[diesel(sql_type = Bytea)]
        intent_mmr_root: Vec<u8>,
        #[diesel(sql_type = Bytea)]
        batch_root: Vec<u8>,
        #[diesel(sql_type = BigInt)]
        command_range_start: i64,
        #[diesel(sql_type = BigInt)]
        command_range_end: i64,
        #[diesel(sql_type = BigInt)]
        command_count: i64,
        #[diesel(sql_type = Integer)]
        schema_version: i32,
        #[diesel(sql_type = Bytea)]
        accepted_block_hash: Vec<u8>,
    }
679
    /// Row read back from `validator_rsm_blocks` for one `(environment, height)`.
    ///
    /// Used by `save_block_header_in_tx` to check that an already stored block header
    /// is identical to the one being saved. `signer` and `signature` are nullable.
    #[derive(QueryableByName)]
    struct ExistingBlockRow {
        #[diesel(sql_type = Bytea)]
        hash: Vec<u8>,
        #[diesel(sql_type = Bytea)]
        parent_hash: Vec<u8>,
        #[diesel(sql_type = Bytea)]
        commands_hash: Vec<u8>,
        #[diesel(sql_type = Bytea)]
        batch_root: Vec<u8>,
        #[diesel(sql_type = BigInt)]
        command_count: i64,
        #[diesel(sql_type = BigInt)]
        first_command_seq: i64,
        #[diesel(sql_type = BigInt)]
        last_command_seq: i64,
        #[diesel(sql_type = Nullable<Bytea>)]
        signer: Option<Vec<u8>>,
        #[diesel(sql_type = Nullable<Bytea>)]
        signature: Option<Vec<u8>>,
    }
701
    /// Row shape for a stored `validator_rsm_block_commands` record.
    ///
    /// The fields match the column list selected in `save_block_commands_in_tx`
    /// (with `command_type` cast to text); presumably used there to compare an
    /// existing row with the command being saved — confirm at the use site.
    #[derive(QueryableByName)]
    struct ExistingBlockCommandRow {
        #[diesel(sql_type = BigInt)]
        command_index: i64,
        #[diesel(sql_type = BigInt)]
        engine_command_id: i64,
        #[diesel(sql_type = Uuid)]
        request_uuid: DbUuid,
        #[diesel(sql_type = Text)]
        command_type: String,
        #[diesel(sql_type = Bytea)]
        command_data: Vec<u8>,
        #[diesel(sql_type = Bytea)]
        command_identity_hash: Vec<u8>,
    }
717
718 pub(super) fn persist_rsm_blocks_in_tx(
719 conn: &mut PgConnection,
720 input: &hypercall_db::EngineJournalRsmBlockBatch,
721 ) -> Result<()> {
722 let mut latest_summary = None;
723
724 for block in &input.blocks {
725 let advance_current = input.mode == hypercall_db::EngineJournalRsmPersistenceMode::Live;
726 save_root_summary_in_tx(conn, &block.root_summary, advance_current)?;
727 if input.mode == hypercall_db::EngineJournalRsmPersistenceMode::Replay {
728 latest_summary = Some(block.root_summary.clone());
729 }
730 save_block_header_in_tx(conn, &block.header)?;
731
732 let request_ids: Vec<_> = block
733 .commands
734 .iter()
735 .map(|cmd| DbUuid(cmd.request_uuid))
736 .collect();
737 let command_ids: Vec<(DbUuid, i64)> = engine_commands::table
738 .filter(engine_commands::request_uuid.eq_any(&request_ids))
739 .select((engine_commands::request_uuid, engine_commands::command_id))
740 .load(conn)?;
741 let command_ids_by_request_uuid: HashMap<_, _> = command_ids
742 .into_iter()
743 .map(|(id, command_id)| (id.0, command_id))
744 .collect();
745
746 let commands = block
747 .commands
748 .iter()
749 .map(|command| {
750 let engine_command_id = *command_ids_by_request_uuid
751 .get(&command.request_uuid)
752 .ok_or_else(|| {
753 anyhow!(
754 "missing engine_command_id for RSM block {} request {}",
755 block.header.height,
756 command.request_uuid
757 )
758 })?;
759 Ok(hypercall_db::NewRsmBlockCommand {
760 environment: block.header.environment,
761 height: block.header.height,
762 rsm_command_seq: command.rsm_command_seq,
763 command_index: command.command_index,
764 engine_command_id,
765 request_uuid: command.request_uuid,
766 command_type: command.command_type.clone(),
767 command_data: command.command_data.clone(),
768 command_identity_hash: command.command_identity_hash,
769 })
770 })
771 .collect::<Result<Vec<_>>>()?;
772 save_block_commands_in_tx(conn, &commands)?;
773 }
774
775 if let Some(summary) = latest_summary {
776 let current_version = current_version_in_tx(conn, summary.environment)?;
777 if current_version
778 .map(|current| current <= summary.version)
779 .unwrap_or(true)
780 {
781 save_root_summary_in_tx(conn, &summary, true)?;
782 }
783 }
784
785 Ok(())
786 }
787
788 fn u64_to_i64(name: &str, value: u64) -> Result<i64> {
789 i64::try_from(value).map_err(|_| anyhow!("{name} overflows BIGINT: {value}"))
790 }
791
    /// Stores a validator RSM root summary and optionally advances the current version.
    ///
    /// The summary is inserted into `validator_rsm_batch_roots`. If a row for the same
    /// `(environment, version)` already exists, it must match the new summary field by
    /// field; otherwise the call fails. When `advance_current` is `true`, the
    /// `validator_rsm_current_state` row of the environment is created or moved to this
    /// version, but never to a lower one.
    ///
    /// # Errors
    ///
    /// Fails when a numeric field does not fit into `BIGINT`, when a differing summary
    /// is already stored for the version, when advancing would move the current
    /// version backward, or when a statement fails.
    fn save_root_summary_in_tx(
        conn: &mut PgConnection,
        summary: &hypercall_db::NewValidatorRsmRootSummary,
        advance_current: bool,
    ) -> Result<()> {
        let version = u64_to_i64("version", summary.version)?;
        let batch_seq = u64_to_i64("batch_seq", summary.batch_seq)?;
        let command_range_start = u64_to_i64("command_range_start", summary.command_range_start)?;
        let command_range_end = u64_to_i64("command_range_end", summary.command_range_end)?;
        let command_count = u64_to_i64("command_count", summary.command_count)?;

        // `inserted` is the affected-row count: 0 means the version already existed.
        let inserted = diesel::sql_query(
            "INSERT INTO validator_rsm_batch_roots (
                environment, version, batch_seq, state_root, risk_root, command_mmr_root,
                obligation_mmr_root, intent_mmr_root, batch_root, command_range_start,
                command_range_end, command_count, schema_version, accepted_block_hash
            ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14)
            ON CONFLICT (environment, version) DO NOTHING",
        )
        .bind::<SmallInt, _>(summary.environment.as_i16())
        .bind::<BigInt, _>(version)
        .bind::<BigInt, _>(batch_seq)
        .bind::<Bytea, _>(&summary.state_root[..])
        .bind::<Bytea, _>(&summary.risk_root[..])
        .bind::<Bytea, _>(&summary.command_mmr_root[..])
        .bind::<Bytea, _>(&summary.obligation_mmr_root[..])
        .bind::<Bytea, _>(&summary.intent_mmr_root[..])
        .bind::<Bytea, _>(&summary.batch_root[..])
        .bind::<BigInt, _>(command_range_start)
        .bind::<BigInt, _>(command_range_end)
        .bind::<BigInt, _>(command_count)
        .bind::<Integer, _>(summary.schema_version)
        .bind::<Bytea, _>(&summary.accepted_block_hash[..])
        .execute(conn)?;

        if inserted == 0 {
            // Re-saving is only acceptable when the stored summary is identical.
            let existing = diesel::sql_query(
                "SELECT batch_seq, state_root, risk_root, command_mmr_root,
                    obligation_mmr_root, intent_mmr_root, batch_root, command_range_start,
                    command_range_end, command_count, schema_version, accepted_block_hash
                FROM validator_rsm_batch_roots
                WHERE environment = $1 AND version = $2",
            )
            .bind::<SmallInt, _>(summary.environment.as_i16())
            .bind::<BigInt, _>(version)
            .get_result::<ExistingRootSummaryRow>(conn)?;

            let matches_existing = existing.batch_seq == batch_seq
                && existing.state_root == summary.state_root.as_slice()
                && existing.risk_root == summary.risk_root.as_slice()
                && existing.command_mmr_root == summary.command_mmr_root.as_slice()
                && existing.obligation_mmr_root == summary.obligation_mmr_root.as_slice()
                && existing.intent_mmr_root == summary.intent_mmr_root.as_slice()
                && existing.batch_root == summary.batch_root.as_slice()
                && existing.command_range_start == command_range_start
                && existing.command_range_end == command_range_end
                && existing.command_count == command_count
                && existing.schema_version == summary.schema_version
                && existing.accepted_block_hash == summary.accepted_block_hash.as_slice();
            if !matches_existing {
                anyhow::bail!(
                    "conflicting validator RSM root summary for {} version {}",
                    summary.environment,
                    summary.version
                );
            }
        }

        if advance_current {
            // The WHERE clause on the upsert filters out backward moves, which then
            // show up as zero affected rows.
            let affected = diesel::sql_query(
                "INSERT INTO validator_rsm_current_state (environment, current_version, updated_at)
                VALUES ($1, $2, NOW())
                ON CONFLICT (environment) DO UPDATE
                SET current_version = EXCLUDED.current_version, updated_at = NOW()
                WHERE validator_rsm_current_state.current_version <= EXCLUDED.current_version",
            )
            .bind::<SmallInt, _>(summary.environment.as_i16())
            .bind::<BigInt, _>(version)
            .execute(conn)?;
            if affected == 0 {
                anyhow::bail!(
                    "refusing to move validator RSM current version for {} backward to {}",
                    summary.environment,
                    version
                );
            }
        }

        Ok(())
    }
882
    /// Stores a validator RSM block header.
    ///
    /// The header is inserted into `validator_rsm_blocks`. If a row for the same
    /// `(environment, height)` already exists, it must match the new header field by
    /// field (including the optional signer and signature); otherwise the call fails.
    ///
    /// # Errors
    ///
    /// Fails when a numeric field does not fit into `BIGINT`, when a differing block
    /// is already stored at that height, or when a statement fails.
    fn save_block_header_in_tx(
        conn: &mut PgConnection,
        block: &hypercall_db::NewRsmBlockHeader,
    ) -> Result<()> {
        let height = u64_to_i64("height", block.height)?;
        let command_count = u64_to_i64("command_count", block.command_count)?;
        let first_command_seq = u64_to_i64("first_command_seq", block.first_command_seq)?;
        let last_command_seq = u64_to_i64("last_command_seq", block.last_command_seq)?;

        // `inserted` is the affected-row count: 0 means the height already existed.
        let inserted = diesel::sql_query(
            "INSERT INTO validator_rsm_blocks (
                environment, height, hash, parent_hash, commands_hash, batch_root,
                command_count, first_command_seq, last_command_seq, signer, signature
            ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11)
            ON CONFLICT (environment, height) DO NOTHING",
        )
        .bind::<SmallInt, _>(block.environment.as_i16())
        .bind::<BigInt, _>(height)
        .bind::<Bytea, _>(&block.hash[..])
        .bind::<Bytea, _>(&block.parent_hash[..])
        .bind::<Bytea, _>(&block.commands_hash[..])
        .bind::<Bytea, _>(&block.batch_root[..])
        .bind::<BigInt, _>(command_count)
        .bind::<BigInt, _>(first_command_seq)
        .bind::<BigInt, _>(last_command_seq)
        .bind::<Nullable<Bytea>, _>(block.signer.as_ref().map(|signer| signer.as_slice()))
        .bind::<Nullable<Bytea>, _>(block.signature.as_deref())
        .execute(conn)?;

        if inserted == 0 {
            // Re-saving is only acceptable when the stored header is identical.
            let existing = diesel::sql_query(
                "SELECT hash, parent_hash, commands_hash, batch_root, command_count,
                    first_command_seq, last_command_seq, signer, signature
                FROM validator_rsm_blocks
                WHERE environment = $1 AND height = $2",
            )
            .bind::<SmallInt, _>(block.environment.as_i16())
            .bind::<BigInt, _>(height)
            .get_result::<ExistingBlockRow>(conn)?;

            let matches_existing = existing.hash == block.hash.as_slice()
                && existing.parent_hash == block.parent_hash.as_slice()
                && existing.commands_hash == block.commands_hash.as_slice()
                && existing.batch_root == block.batch_root.as_slice()
                && existing.command_count == command_count
                && existing.first_command_seq == first_command_seq
                && existing.last_command_seq == last_command_seq
                && existing.signer.as_deref() == block.signer.as_ref().map(|s| &s[..])
                && existing.signature.as_deref() == block.signature.as_deref();
            if !matches_existing {
                anyhow::bail!(
                    "conflicting validator RSM block for {} height {}",
                    block.environment,
                    block.height
                );
            }
        }

        Ok(())
    }
943
944 fn save_block_commands_in_tx(
945 conn: &mut PgConnection,
946 commands: &[hypercall_db::NewRsmBlockCommand],
947 ) -> Result<()> {
948 for command in commands {
949 let height = u64_to_i64("height", command.height)?;
950 let rsm_command_seq = u64_to_i64("rsm_command_seq", command.rsm_command_seq)?;
951 let command_index = u64_to_i64("command_index", command.command_index)?;
952 let inserted = diesel::sql_query(
953 "INSERT INTO validator_rsm_block_commands (
954 environment, height, rsm_command_seq, command_index, engine_command_id,
955 request_uuid, command_type, command_data, command_identity_hash
956 ) VALUES ($1, $2, $3, $4, $5, $6, $7::engine_command_type, $8, $9)
957 ON CONFLICT (environment, height, rsm_command_seq) DO NOTHING",
958 )
959 .bind::<SmallInt, _>(command.environment.as_i16())
960 .bind::<BigInt, _>(height)
961 .bind::<BigInt, _>(rsm_command_seq)
962 .bind::<BigInt, _>(command_index)
963 .bind::<BigInt, _>(command.engine_command_id)
964 .bind::<Uuid, _>(DbUuid(command.request_uuid))
965 .bind::<Text, _>(&command.command_type)
966 .bind::<Bytea, _>(&command.command_data)
967 .bind::<Bytea, _>(&command.command_identity_hash[..])
968 .execute(conn)?;
969
970 if inserted == 0 {
971 let existing = diesel::sql_query(
972 "SELECT command_index, engine_command_id, request_uuid, command_type::text AS command_type,
973 command_data, command_identity_hash
974 FROM validator_rsm_block_commands
975 WHERE environment = $1 AND height = $2 AND rsm_command_seq = $3",
976 )
977 .bind::<SmallInt, _>(command.environment.as_i16())
978 .bind::<BigInt, _>(height)
979 .bind::<BigInt, _>(rsm_command_seq)
980 .get_result::<ExistingBlockCommandRow>(conn)?;
981
982 let matches_existing = existing.command_index == command_index
983 && existing.engine_command_id == command.engine_command_id
984 && existing.request_uuid.0 == command.request_uuid
985 && existing.command_type == command.command_type
986 && existing.command_data == command.command_data
987 && existing.command_identity_hash == command.command_identity_hash.as_slice();
988 if !matches_existing {
989 anyhow::bail!(
990 "conflicting validator RSM block command for {} height {} sequence {}",
991 command.environment,
992 command.height,
993 command.rsm_command_seq
994 );
995 }
996 }
997 }
998
999 Ok(())
1000 }
1001
1002 fn current_version_in_tx(
1003 conn: &mut PgConnection,
1004 environment: hypercall_db::ValidatorRsmEnvironment,
1005 ) -> Result<Option<u64>> {
1006 #[derive(QueryableByName)]
1007 struct CurrentVersionRow {
1008 #[diesel(sql_type = BigInt)]
1009 current_version: i64,
1010 }
1011
1012 let row = diesel::sql_query(
1013 "SELECT current_version FROM validator_rsm_current_state WHERE environment = $1",
1014 )
1015 .bind::<SmallInt, _>(environment.as_i16())
1016 .get_result::<CurrentVersionRow>(conn)
1017 .optional()?;
1018
1019 row.map(|row| {
1020 u64::try_from(row.current_version)
1021 .map_err(|_| anyhow!("current_version is negative: {}", row.current_version))
1022 })
1023 .transpose()
1024 }
1025}
1026
1027#[cfg(feature = "rsm-state")]
1028use rsm::persist_rsm_blocks_in_tx;
1029
1030#[cfg(test)]
1031mod tests {
1032 use super::*;
1033 use crate::test_helpers::TestDb;
1034 use hypercall_db::EngineJournalBatchWriter;
1035 use hypercall_types::wallet_address::test_wallet;
1036 use rust_decimal::Decimal;
1037 use rust_decimal_macros::dec;
1038
/// Checks that each listed writer-trait event type maps to a diesel enum
/// value whose string form equals the writer type's own `to_string()`.
#[test]
fn event_type_mapping_covers_writer_trait_variants() {
    use hypercall_db::EventType as WriterEventType;

    let writer_variants = [
        WriterEventType::OrderAction,
        WriterEventType::OrderUpdate,
        WriterEventType::OrderInfo,
        WriterEventType::MarketAction,
        WriterEventType::MarketUpdate,
        WriterEventType::OrderFilled,
        WriterEventType::OrderbookUpdated,
        WriterEventType::L2Update,
        WriterEventType::Trade,
        WriterEventType::TransactionRequest,
        WriterEventType::TransactionUpdate,
        WriterEventType::MmpTriggered,
        WriterEventType::PositionExpired,
        WriterEventType::TierUpdate,
        WriterEventType::HypercorePositionUpdate,
        WriterEventType::LiquidationStateChange,
        WriterEventType::RfqFilled,
    ];

    for variant in writer_variants {
        let mapped = event_type_to_diesel(variant);
        assert_eq!(mapped.as_str(), variant.to_string());
    }
}
1068
/// Checks that a known command type name parses to a value with the same
/// string form, and that an unrecognised name is rejected with an error.
#[test]
fn command_type_parser_fails_closed_for_unknown_values() {
    let known = parse_command_type("CreateOrder").expect("CreateOrder maps");
    assert_eq!(known.as_str(), "CreateOrder");

    let unknown = parse_command_type("InventedCommand");
    assert!(
        unknown.is_err(),
        "unknown command types must not silently map to a default"
    );
}
1082
1083 fn journal_entry_with_balance_update(
1084 request_uuid: uuid::Uuid,
1085 wallet: hypercall_types::WalletAddress,
1086 delta: Decimal,
1087 balance_after: Decimal,
1088 ) -> hypercall_db::EngineJournalEntryInsert {
1089 hypercall_db::EngineJournalEntryInsert {
1090 request_uuid,
1091 received_ts_ms: 1_700_000_000_000,
1092 command_data: b"test-command".to_vec(),
1093 response_data: None,
1094 order_id: None,
1095 command_type: None,
1096 duration_ms: 1,
1097 pre_digest_data: vec![],
1098 post_digest_data: vec![],
1099 events: vec![],
1100 outbox_appends: vec![],
1101 fill_side_effects: vec![],
1102 cash_withdrawal_side_effect: None,
1103 balance_updates: vec![hypercall_db::EngineJournalBalanceUpdate {
1104 update: hypercall_types::BalanceUpdate {
1105 balance_update_seq: 1,
1106 wallet,
1107 delta,
1108 balance_after,
1109 reason: hypercall_types::BalanceUpdateReason::Deposit,
1110 reference_id: Some("journal-projection-test".to_string()),
1111 source_command_id: None,
1112 timestamp_ms: 1_700_000_000_000,
1113 },
1114 }],
1115 }
1116 }
1117
1118 fn account_balance(
1119 conn: &mut PgConnection,
1120 wallet: hypercall_types::WalletAddress,
1121 ) -> Option<Decimal> {
1122 crate::schema::account_balances::table
1123 .filter(crate::schema::account_balances::account_address.eq(wallet))
1124 .select(crate::schema::account_balances::balance)
1125 .first::<Decimal>(conn)
1126 .optional()
1127 .unwrap()
1128 }
1129
1130 #[tokio::test]
1131 async fn journal_balance_updates_materialize_account_balances_projection() {
1132 let test_db = TestDb::new().await.unwrap();
1133 let wallet = test_wallet(31);
1134 let request_uuid = uuid::Uuid::new_v4();
1135 let entry = journal_entry_with_balance_update(request_uuid, wallet, dec!(125), dec!(125));
1136
1137 let result = test_db
1138 .handler
1139 .insert_engine_journal_batch_sync(&[entry], false, None)
1140 .unwrap();
1141 assert_eq!(result.inserted_count, 1);
1142
1143 let mut conn = test_db.handler.pool().get().unwrap();
1144 assert_eq!(account_balance(&mut conn, wallet), Some(dec!(125)));
1145 }
1146
1147 #[tokio::test]
1148 async fn duplicate_journal_command_does_not_double_apply_account_balances_projection() {
1149 let test_db = TestDb::new().await.unwrap();
1150 let wallet = test_wallet(32);
1151 let request_uuid = uuid::Uuid::new_v4();
1152 let entry = journal_entry_with_balance_update(request_uuid, wallet, dec!(75), dec!(75));
1153
1154 let first = test_db
1155 .handler
1156 .insert_engine_journal_batch_sync(std::slice::from_ref(&entry), false, None)
1157 .unwrap();
1158 assert_eq!(first.inserted_count, 1);
1159
1160 let duplicate = test_db
1161 .handler
1162 .insert_engine_journal_batch_sync(&[entry], false, None)
1163 .unwrap();
1164 assert_eq!(duplicate.inserted_count, 0);
1165
1166 let mut conn = test_db.handler.pool().get().unwrap();
1167 assert_eq!(account_balance(&mut conn, wallet), Some(dec!(75)));
1168 }
1169
1170 #[tokio::test]
1171 async fn mixed_duplicate_and_new_batch_applies_only_new_balance_projection() {
1172 let test_db = TestDb::new().await.unwrap();
1173 let wallet = test_wallet(33);
1174 let duplicate_uuid = uuid::Uuid::new_v4();
1175 let new_uuid = uuid::Uuid::new_v4();
1176 let duplicate =
1177 journal_entry_with_balance_update(duplicate_uuid, wallet, dec!(75), dec!(75));
1178 let new_entry = journal_entry_with_balance_update(new_uuid, wallet, dec!(25), dec!(100));
1179
1180 let first = test_db
1181 .handler
1182 .insert_engine_journal_batch_sync(std::slice::from_ref(&duplicate), false, None)
1183 .unwrap();
1184 assert_eq!(first.inserted_count, 1);
1185
1186 let mixed = test_db
1187 .handler
1188 .insert_engine_journal_batch_sync(&[duplicate, new_entry], false, None)
1189 .unwrap();
1190 assert_eq!(mixed.inserted_count, 1);
1191
1192 let mut conn = test_db.handler.pool().get().unwrap();
1193 assert_eq!(account_balance(&mut conn, wallet), Some(dec!(100)));
1194 }
1195}