Skip to main content

hypercall_db_diesel/
engine_journal.rs

1//! Diesel implementation for WAL batch journal materialization.
2
3use anyhow::{anyhow, Result};
4use diesel::prelude::*;
5use diesel::sql_types::{Array, BigInt, Binary, Nullable, Text};
6use diesel::upsert::excluded;
7use std::collections::{HashMap, HashSet};
8
9use crate::database_handler::DatabaseHandler;
10use crate::engine_enums::{CommandType, DbUuid, EventType};
11use crate::order_status_materialization::upsert_materialized_order_state;
12use crate::schema::{engine_command_digests, engine_commands, engine_events};
13
14impl hypercall_db::EngineJournalBatchWriter for DatabaseHandler {
15    fn insert_engine_journal_batch_sync(
16        &self,
17        entries: &[hypercall_db::EngineJournalEntryInsert],
18        persist_digests: bool,
19        rsm_blocks: Option<&hypercall_db::EngineJournalRsmBlockBatch>,
20    ) -> Result<hypercall_db::EngineJournalBatchInsertResult> {
21        #[cfg(not(feature = "rsm-state"))]
22        if rsm_blocks.is_some() {
23            return Err(anyhow!(
24                "RSM block persistence requested but hypercall-db-diesel/rsm-state is disabled"
25            ));
26        }
27
28        if entries.is_empty() {
29            return Ok(hypercall_db::EngineJournalBatchInsertResult {
30                inserted_count: 0,
31                inserted_commands: vec![],
32            });
33        }
34
35        let mut conn = self.pool().get()?;
36
37        conn.transaction(|conn| {
38            let unique_entries = dedupe_entries_by_request_uuid(entries);
39            let command_values: Vec<_> = unique_entries
40                .iter()
41                .map(|entry| {
42                    let command_type_enum = entry
43                        .command_type
44                        .as_deref()
45                        .map(parse_command_type)
46                        .transpose()?;
47                    Ok((
48                        engine_commands::received_ts_ms.eq(entry.received_ts_ms as i64),
49                        engine_commands::command_data.eq(&entry.command_data),
50                        engine_commands::response_data.eq(&entry.response_data),
51                        engine_commands::order_id.eq(entry.order_id),
52                        engine_commands::request_uuid.eq(DbUuid(entry.request_uuid)),
53                        engine_commands::command_type_enum.eq(command_type_enum),
54                    ))
55                })
56                .collect::<Result<Vec<_>, diesel::result::Error>>()?;
57
58            let inserted_commands: Vec<(DbUuid, i64)> = diesel::insert_into(engine_commands::table)
59                .values(&command_values)
60                .on_conflict(engine_commands::request_uuid)
61                .do_nothing()
62                .returning((engine_commands::request_uuid, engine_commands::command_id))
63                .get_results(conn)?;
64
65            let total_inserted = inserted_commands.len();
66            if total_inserted == 0 {
67                #[cfg(feature = "rsm-state")]
68                if let Some(rsm_blocks) = rsm_blocks {
69                    persist_rsm_blocks_in_tx(conn, rsm_blocks).map_err(diesel_unknown_error)?;
70                }
71                #[cfg(not(feature = "rsm-state"))]
72                let _ = rsm_blocks;
73                return Ok(hypercall_db::EngineJournalBatchInsertResult {
74                    inserted_count: 0,
75                    inserted_commands: vec![],
76                });
77            }
78
79            let request_uuid_to_command_id: HashMap<uuid::Uuid, i64> = inserted_commands
80                .iter()
81                .map(|(request_uuid, cmd_id)| (request_uuid.0, *cmd_id))
82                .collect();
83
84            let mut event_values = Vec::new();
85            for entry in &unique_entries {
86                if let Some(&command_id) = request_uuid_to_command_id.get(&entry.request_uuid) {
87                    for (idx, event) in entry.events.iter().enumerate() {
88                        event_values.push((
89                            engine_events::command_id.eq(command_id),
90                            engine_events::event_idx.eq(idx as i32),
91                            engine_events::event_data.eq(event.event_data.clone()),
92                            engine_events::event_key.eq(event.event_key.clone()),
93                            engine_events::l2_sequence.eq(event.l2_sequence),
94                            engine_events::event_type_enum
95                                .eq(event_type_to_diesel(event.event_type)),
96                        ));
97                    }
98                }
99            }
100
101            if !event_values.is_empty() {
102                diesel::insert_into(engine_events::table)
103                    .values(&event_values)
104                    .execute(conn)?;
105            }
106
107            insert_directive_outbox_appends(conn, &unique_entries, &request_uuid_to_command_id)?;
108            persist_journal_side_effects(conn, &unique_entries, &request_uuid_to_command_id)?;
109
110            if persist_digests {
111                let digest_values: Vec<_> = unique_entries
112                    .iter()
113                    .filter_map(|entry| {
114                        request_uuid_to_command_id
115                            .get(&entry.request_uuid)
116                            .map(|&command_id| {
117                                (
118                                    engine_command_digests::command_id.eq(command_id),
119                                    engine_command_digests::duration_ms
120                                        .eq(entry.duration_ms as i64),
121                                    engine_command_digests::pre_digest_data
122                                        .eq(entry.pre_digest_data.clone()),
123                                    engine_command_digests::post_digest_data
124                                        .eq(entry.post_digest_data.clone()),
125                                )
126                            })
127                    })
128                    .collect();
129
130                if !digest_values.is_empty() {
131                    diesel::insert_into(engine_command_digests::table)
132                        .values(&digest_values)
133                        .on_conflict(engine_command_digests::command_id)
134                        .do_nothing()
135                        .execute(conn)?;
136                }
137            }
138
139            #[cfg(feature = "rsm-state")]
140            if let Some(rsm_blocks) = rsm_blocks {
141                persist_rsm_blocks_in_tx(conn, rsm_blocks).map_err(diesel_unknown_error)?;
142            }
143
144            Ok(hypercall_db::EngineJournalBatchInsertResult {
145                inserted_count: total_inserted,
146                inserted_commands: inserted_commands
147                    .into_iter()
148                    .map(|(request_uuid, command_id)| (request_uuid.0, command_id))
149                    .collect(),
150            })
151        })
152    }
153
154    fn lookup_engine_journal_command_ids_sync(
155        &self,
156        request_ids: &[uuid::Uuid],
157    ) -> Result<Vec<(uuid::Uuid, i64)>> {
158        if request_ids.is_empty() {
159            return Ok(vec![]);
160        }
161
162        let mut conn = self.pool().get()?;
163        let request_ids: Vec<_> = request_ids.iter().copied().map(DbUuid).collect();
164        let rows: Vec<(DbUuid, i64)> = engine_commands::table
165            .filter(engine_commands::request_uuid.eq_any(request_ids))
166            .select((engine_commands::request_uuid, engine_commands::command_id))
167            .load(&mut conn)?;
168        Ok(rows
169            .into_iter()
170            .map(|(request_uuid, command_id)| (request_uuid.0, command_id))
171            .collect())
172    }
173}
174
175fn dedupe_entries_by_request_uuid(
176    entries: &[hypercall_db::EngineJournalEntryInsert],
177) -> Vec<&hypercall_db::EngineJournalEntryInsert> {
178    let mut seen = HashSet::new();
179    entries
180        .iter()
181        .filter(|entry| seen.insert(entry.request_uuid))
182        .collect()
183}
184
185fn parse_command_type(value: &str) -> Result<CommandType, diesel::result::Error> {
186    CommandType::from_str(value).ok_or_else(|| {
187        diesel_unknown_error(anyhow!("unknown engine journal command type '{}'", value))
188    })
189}
190
191fn event_type_to_diesel(event_type: hypercall_db::EventType) -> EventType {
192    EventType::from_str(&event_type.to_string()).unwrap_or_else(|| {
193        panic!(
194            "RUNTIME_INVARIANT: hypercall_db EventType '{}' has no Diesel mirror",
195            event_type
196        )
197    })
198}
199
200fn diesel_unknown_error(error: anyhow::Error) -> diesel::result::Error {
201    diesel::result::Error::DatabaseError(
202        diesel::result::DatabaseErrorKind::Unknown,
203        Box::new(error.to_string()),
204    )
205}
206
207fn insert_directive_outbox_appends(
208    conn: &mut PgConnection,
209    entries: &[&hypercall_db::EngineJournalEntryInsert],
210    request_uuid_to_command_id: &HashMap<uuid::Uuid, i64>,
211) -> QueryResult<usize> {
212    let mut directive_ids = Vec::new();
213    let mut command_ids = Vec::new();
214    let mut kinds = Vec::new();
215    let mut action_keys = Vec::new();
216    let mut wallet_addresses = Vec::new();
217    let mut account_addresses = Vec::new();
218    let mut signer_addresses = Vec::new();
219    let mut directive_nonces = Vec::new();
220    let mut idempotency_keys = Vec::new();
221    let mut payload_hashes = Vec::new();
222    let mut payloads = Vec::new();
223    let mut domain_statuses = Vec::new();
224    let mut delivery_statuses = Vec::new();
225    let mut created_ts_values = Vec::new();
226    let mut expires_at_values = Vec::new();
227    let mut signer_nonce_floors = HashMap::<Vec<u8>, i64>::new();
228
229    for entry in entries {
230        let Some(&command_id) = request_uuid_to_command_id.get(&entry.request_uuid) else {
231            continue;
232        };
233        for append in &entry.outbox_appends {
234            directive_ids.push(append.directive_id.clone());
235            command_ids.push(command_id);
236            kinds.push(append.kind.as_str().to_string());
237            action_keys.push(append.action_key.as_str().to_string());
238            wallet_addresses.push(append.wallet.as_bytes().to_vec());
239            account_addresses.push(append.account.as_bytes().to_vec());
240            let signer_address = append.signer.as_bytes().to_vec();
241            signer_addresses.push(signer_address.clone());
242            let directive_nonce = i64::try_from(append.nonce).unwrap_or_else(|_| {
243                panic!(
244                    "RUNTIME_INVARIANT: directive nonce {} exceeds i64 range",
245                    append.nonce
246                )
247            });
248            directive_nonces.push(directive_nonce);
249            let next_nonce = directive_nonce.checked_add(1).unwrap_or_else(|| {
250                panic!(
251                    "RUNTIME_INVARIANT: directive nonce {} cannot advance",
252                    append.nonce
253                )
254            });
255            signer_nonce_floors
256                .entry(signer_address)
257                .and_modify(|floor| *floor = (*floor).max(next_nonce))
258                .or_insert(next_nonce);
259            idempotency_keys.push(append.idempotency_key.clone());
260            payload_hashes.push(append.payload_hash.as_slice().to_vec());
261            payloads.push(append.payload.clone());
262            domain_statuses.push(append.domain_status.as_str().to_string());
263            delivery_statuses.push(append.delivery_status.as_str().to_string());
264            created_ts_values.push(i64::try_from(append.created_ts_ms).unwrap_or_else(|_| {
265                panic!(
266                    "RUNTIME_INVARIANT: directive created_ts_ms {} exceeds i64 range",
267                    append.created_ts_ms
268                )
269            }));
270            expires_at_values.push(append.expires_at_ms.map(|expires| {
271                i64::try_from(expires).unwrap_or_else(|_| {
272                    panic!(
273                        "RUNTIME_INVARIANT: directive expires_at_ms {} exceeds i64 range",
274                        expires
275                    )
276                })
277            }));
278        }
279    }
280
281    if directive_ids.is_empty() {
282        return Ok(0);
283    }
284
285    let inserted = diesel::sql_query(
286        r#"
287        INSERT INTO directive_outbox (
288            directive_id, command_id, kind, action_key, wallet_address,
289            account_address, signer_address, directive_nonce, idempotency_key,
290            payload_hash, payload, domain_status, delivery_status,
291            created_ts_ms, expires_at_ms
292        )
293        SELECT *
294        FROM UNNEST(
295            $1::text[],
296            $2::bigint[],
297            $3::text[],
298            $4::public.directive_action_key[],
299            $5::bytea[],
300            $6::bytea[],
301            $7::bytea[],
302            $8::bigint[],
303            $9::text[],
304            $10::bytea[],
305            $11::bytea[],
306            $12::text[],
307            $13::text[],
308            $14::bigint[],
309            $15::bigint[]
310        )
311        ON CONFLICT (directive_id) DO NOTHING
312        "#,
313    )
314    .bind::<Array<Text>, _>(directive_ids)
315    .bind::<Array<BigInt>, _>(command_ids)
316    .bind::<Array<Text>, _>(kinds)
317    .bind::<Array<Text>, _>(action_keys)
318    .bind::<Array<Binary>, _>(wallet_addresses)
319    .bind::<Array<Binary>, _>(account_addresses)
320    .bind::<Array<Binary>, _>(signer_addresses)
321    .bind::<Array<BigInt>, _>(directive_nonces)
322    .bind::<Array<Text>, _>(idempotency_keys)
323    .bind::<Array<Binary>, _>(payload_hashes)
324    .bind::<Array<Binary>, _>(payloads)
325    .bind::<Array<Text>, _>(domain_statuses)
326    .bind::<Array<Text>, _>(delivery_statuses)
327    .bind::<Array<BigInt>, _>(created_ts_values)
328    .bind::<Array<Nullable<BigInt>>, _>(expires_at_values)
329    .execute(conn)?;
330
331    let (nonce_signers, nonce_floors): (Vec<_>, Vec<_>) = signer_nonce_floors.into_iter().unzip();
332    diesel::sql_query(
333        r#"
334        INSERT INTO rsm_signer_nonces (signer_address, next_nonce, last_synced_nonce, created_at, updated_at)
335        SELECT signer_address, next_nonce, NULL, NOW(), NOW()
336        FROM UNNEST($1::bytea[], $2::bigint[]) AS t(signer_address, next_nonce)
337        ON CONFLICT (signer_address) DO UPDATE
338        SET next_nonce = GREATEST(rsm_signer_nonces.next_nonce, EXCLUDED.next_nonce),
339            updated_at = NOW()
340        "#,
341    )
342    .bind::<Array<Binary>, _>(nonce_signers)
343    .bind::<Array<BigInt>, _>(nonce_floors)
344    .execute(conn)?;
345
346    Ok(inserted)
347}
348
349fn persist_journal_side_effects(
350    conn: &mut PgConnection,
351    entries: &[&hypercall_db::EngineJournalEntryInsert],
352    inserted_commands: &HashMap<uuid::Uuid, i64>,
353) -> QueryResult<()> {
354    for entry in entries {
355        if !inserted_commands.contains_key(&entry.request_uuid) {
356            continue;
357        }
358        let fill_side_effects_by_idx: HashMap<i32, &hypercall_db::EngineJournalFillSideEffect> =
359            entry
360                .fill_side_effects
361                .iter()
362                .map(|side_effect| (side_effect.event_idx, side_effect))
363                .collect();
364
365        for (idx, event) in entry.events.iter().enumerate() {
366            if event.event_type == hypercall_db::EventType::OrderInfo {
367                persist_order_info_event_in_tx(conn, idx, event)?;
368            }
369            if event.event_type == hypercall_db::EventType::OrderUpdate {
370                persist_order_update_event_in_tx(conn, idx, event)?;
371            }
372            if event.event_type != hypercall_db::EventType::OrderFilled {
373                continue;
374            }
375
376            let fill = match hypercall_types::EngineMessage::deserialize_from_wire(
377                &event.event_topic,
378                &event.event_data,
379            ) {
380                Ok(hypercall_types::EngineMessage::OrderFilled { fill, .. }) => fill,
381                Ok(other) => {
382                    panic!(
383                        "CRITICAL_FAILURE: expected OrderFilled payload at idx {}, got {}",
384                        idx,
385                        other.type_name()
386                    )
387                }
388                Err(e) => {
389                    panic!(
390                        "CRITICAL_FAILURE: failed to deserialize journaled fill event at idx {}: {}",
391                        idx, e
392                    )
393                }
394            };
395            let side_effect = fill_side_effects_by_idx
396                .get(&(idx as i32))
397                .unwrap_or_else(|| {
398                    panic!(
399                        "CRITICAL_FAILURE: missing journal fill side-effect for request {:?}, event_idx={}, trade_id={}",
400                        entry.request_uuid, idx, fill.trade_id
401                    )
402                });
403            if side_effect.trade_id != fill.trade_id {
404                panic!(
405                    "CRITICAL_FAILURE: journal fill side-effect trade_id mismatch for request {:?}, event_idx={}: side_effect={}, fill={}",
406                    entry.request_uuid, idx, side_effect.trade_id, fill.trade_id
407                );
408            }
409
410            let side_effect = crate::event_handler::FillSideEffect {
411                trade_id: side_effect.trade_id,
412                taker_ledger_delta: side_effect.taker_ledger_delta,
413                maker_ledger_delta: side_effect.maker_ledger_delta,
414                taker_premium_delta: side_effect.taker_premium_delta,
415                maker_premium_delta: side_effect.maker_premium_delta,
416                underlying_notional: side_effect.underlying_notional,
417            };
418            crate::event_handler::persist_legacy_replay_fill_with_side_effects_in_tx(
419                conn,
420                &fill,
421                &side_effect,
422            )
423            .map_err(|e| diesel_unknown_error(anyhow!(e.to_string())))?;
424        }
425    }
426
427    for entry in entries {
428        if !inserted_commands.contains_key(&entry.request_uuid) {
429            continue;
430        }
431        for balance_update in &entry.balance_updates {
432            crate::ledger_ops::apply_pnl_decimal_sync(
433                conn,
434                &balance_update.update.wallet,
435                balance_update.update.delta,
436            )
437            .map_err(|e| diesel_unknown_error(anyhow!(e.to_string())))?;
438        }
439
440        if let Some(ref side_effect) = entry.cash_withdrawal_side_effect {
441            persist_cash_withdrawal_side_effect_in_tx(conn, side_effect)?;
442        }
443    }
444
445    Ok(())
446}
447
448fn persist_order_info_event_in_tx(
449    conn: &mut PgConnection,
450    idx: usize,
451    event: &hypercall_db::EngineJournalEventInsert,
452) -> QueryResult<()> {
453    let msg = rmp_serde::from_slice::<hypercall_types::OrderInfoMessage>(&event.event_data[1..])
454        .unwrap_or_else(|e| {
455            panic!(
456                "CRITICAL_FAILURE: failed to deserialize journaled OrderInfo at idx {}: {}",
457                idx, e
458            )
459        });
460    let new_order_info = crate::models::NewOrderInfo {
461        order_id: msg.order_id as i64,
462        wallet_address: msg.wallet,
463        symbol: msg.info.symbol.clone(),
464        side: format!("{:?}", msg.info.side),
465        price: msg.info.price,
466        size: msg.info.size,
467        tif: format!("{:?}", msg.info.tif),
468        client_id: msg.info.client_id.clone(),
469        is_perp: msg.info.is_perp,
470        underlying: msg.info.underlying.clone(),
471        reduce_only: msg.info.reduce_only,
472        nonce: msg.info.nonce.map(|n| n as i64),
473        signature: msg.info.signature.clone(),
474        mmp_enabled: msg.info.mmp_enabled,
475        timestamp: msg.timestamp as i64,
476        status: "ACKED".to_string(),
477        filled_size: rust_decimal::Decimal::ZERO,
478        last_materialized_order_update_id: 0,
479    };
480    diesel::insert_into(crate::schema::order_infos::table)
481        .values(&new_order_info)
482        .on_conflict(crate::schema::order_infos::order_id)
483        .do_update()
484        .set((
485            crate::schema::order_infos::wallet_address
486                .eq(excluded(crate::schema::order_infos::wallet_address)),
487            crate::schema::order_infos::symbol.eq(excluded(crate::schema::order_infos::symbol)),
488            crate::schema::order_infos::side.eq(excluded(crate::schema::order_infos::side)),
489            crate::schema::order_infos::price.eq(excluded(crate::schema::order_infos::price)),
490            crate::schema::order_infos::size.eq(excluded(crate::schema::order_infos::size)),
491            crate::schema::order_infos::tif.eq(excluded(crate::schema::order_infos::tif)),
492            crate::schema::order_infos::client_id
493                .eq(excluded(crate::schema::order_infos::client_id)),
494            crate::schema::order_infos::is_perp.eq(excluded(crate::schema::order_infos::is_perp)),
495            crate::schema::order_infos::underlying
496                .eq(excluded(crate::schema::order_infos::underlying)),
497            crate::schema::order_infos::reduce_only
498                .eq(excluded(crate::schema::order_infos::reduce_only)),
499            crate::schema::order_infos::nonce.eq(excluded(crate::schema::order_infos::nonce)),
500            crate::schema::order_infos::signature
501                .eq(excluded(crate::schema::order_infos::signature)),
502            crate::schema::order_infos::mmp_enabled
503                .eq(excluded(crate::schema::order_infos::mmp_enabled)),
504            crate::schema::order_infos::timestamp
505                .eq(excluded(crate::schema::order_infos::timestamp)),
506        ))
507        .execute(conn)?;
508    Ok(())
509}
510
511fn persist_order_update_event_in_tx(
512    conn: &mut PgConnection,
513    idx: usize,
514    event: &hypercall_db::EngineJournalEventInsert,
515) -> QueryResult<()> {
516    let msg = rmp_serde::from_slice::<hypercall_types::OrderUpdateMessage>(&event.event_data[1..])
517        .unwrap_or_else(|e| {
518            panic!(
519                "CRITICAL_FAILURE: failed to deserialize journaled OrderUpdate at idx {}: {}",
520                idx, e
521            )
522        });
523    let current_status = match msg.status {
524        hypercall_types::OrderUpdateStatus::Acked => "ACKED".to_string(),
525        hypercall_types::OrderUpdateStatus::Open => "OPEN".to_string(),
526        hypercall_types::OrderUpdateStatus::PartiallyFilled => "PARTIALLY_FILLED".to_string(),
527        hypercall_types::OrderUpdateStatus::Filled => "FILLED".to_string(),
528        hypercall_types::OrderUpdateStatus::Canceled => "CANCELED".to_string(),
529        hypercall_types::OrderUpdateStatus::Rejected => "REJECTED".to_string(),
530    };
531    let new_order_update = crate::models::NewOrderUpdate {
532        timestamp: msg.timestamp as i64,
533        order_id: msg.order_id.map(|id| id as i64),
534        status: current_status.clone(),
535        reason: msg.reason.clone(),
536        filled_size: msg.filled_size,
537        symbol: msg.info.symbol.clone(),
538        price: msg.info.price,
539        size: msg.info.size,
540        side: format!("{:?}", msg.info.side),
541        tif: format!("{:?}", msg.info.tif),
542        client_id: msg.info.client_id.clone(),
543    };
544    let order_update_id = diesel::insert_into(crate::schema::order_updates::table)
545        .values(&new_order_update)
546        .returning(crate::schema::order_updates::id)
547        .get_result::<Option<i32>>(conn)?
548        .expect("order_updates.id should be populated after insert");
549
550    if let Some(order_id) = msg.order_id {
551        let materialized_order_info = crate::models::NewOrderInfo {
552            order_id: order_id as i64,
553            wallet_address: msg.wallet_address,
554            symbol: msg.info.symbol.clone(),
555            side: format!("{:?}", msg.info.side),
556            price: msg.info.price,
557            size: msg.info.size,
558            tif: format!("{:?}", msg.info.tif),
559            client_id: msg.info.client_id.clone(),
560            is_perp: msg.info.is_perp,
561            underlying: msg.info.underlying.clone(),
562            reduce_only: msg.info.reduce_only,
563            nonce: msg.info.nonce.map(|n| n as i64),
564            signature: msg.info.signature.clone(),
565            mmp_enabled: msg.info.mmp_enabled,
566            timestamp: msg.timestamp as i64,
567            status: current_status,
568            filled_size: msg.filled_size,
569            last_materialized_order_update_id: order_update_id,
570        };
571        upsert_materialized_order_state(conn, &materialized_order_info)?;
572    }
573    Ok(())
574}
575
576fn persist_cash_withdrawal_side_effect_in_tx(
577    conn: &mut diesel::PgConnection,
578    side_effect: &hypercall_db::EngineJournalCashWithdrawalSideEffect,
579) -> QueryResult<()> {
580    use diesel::sql_types::{BigInt, Binary, Numeric, Text};
581
582    #[derive(diesel::QueryableByName)]
583    struct InsertedEventRow {
584        #[diesel(sql_type = BigInt)]
585        id: i64,
586    }
587
588    #[derive(diesel::QueryableByName)]
589    struct LedgerEventRow {
590        #[diesel(sql_type = BigInt)]
591        id: i64,
592    }
593
594    let event_time_ms = i64::try_from(side_effect.timestamp_ms).unwrap_or_else(|_| {
595        panic!(
596            "STATE_CORRUPTION: timestamp_ms {} exceeds i64 range for cash withdrawal {}",
597            side_effect.timestamp_ms, side_effect.request_id
598        )
599    });
600
601    let inserted = diesel::sql_query(
602        "INSERT INTO hypercore_cash_ledger_events
603            (wallet, event_hash, event_time_ms, delta_type, amount_usdc)
604         VALUES ($1, $2, $3, 'withdraw', $4)
605         ON CONFLICT (wallet, event_hash, delta_type) DO NOTHING
606         RETURNING id",
607    )
608    .bind::<Binary, _>(side_effect.wallet.as_bytes())
609    .bind::<Text, _>(&side_effect.request_id)
610    .bind::<BigInt, _>(event_time_ms)
611    .bind::<Numeric, _>(side_effect.amount)
612    .get_result::<InsertedEventRow>(conn)
613    .optional()?;
614
615    let Some(inserted) = inserted else {
616        return Ok(());
617    };
618
619    let ledger = diesel::sql_query(
620        "INSERT INTO ledger_events (wallet, event_ts_ms, delta, event_type, reference_symbol)
621         VALUES ($1, $2, $3, 'withdraw', $4)
622         RETURNING id",
623    )
624    .bind::<Binary, _>(side_effect.wallet.as_bytes())
625    .bind::<BigInt, _>(event_time_ms)
626    .bind::<Numeric, _>(-side_effect.amount)
627    .bind::<Text, _>(&side_effect.request_id)
628    .get_result::<LedgerEventRow>(conn)?;
629
630    diesel::sql_query(
631        "UPDATE hypercore_cash_ledger_events
632         SET ledger_event_id = $2,
633             balance_after = $3,
634             status = 'submitted',
635             error = NULL,
636             updated_at = NOW()
637         WHERE id = $1",
638    )
639    .bind::<BigInt, _>(inserted.id)
640    .bind::<BigInt, _>(ledger.id)
641    .bind::<Numeric, _>(side_effect.balance_after)
642    .execute(conn)?;
643
644    Ok(())
645}
646
647#[cfg(feature = "rsm-state")]
648mod rsm {
649    use super::*;
650    use diesel::sql_types::{Bytea, Integer, SmallInt, Uuid};
651
    /// Row shape for a raw SQL query returning root-summary columns; fields are matched to
    /// result columns by name through `QueryableByName`.
    ///
    /// NOTE(review): presumably used to compare an already-stored root summary with an
    /// incoming one — the query that loads it is outside this view; confirm.
    #[derive(QueryableByName)]
    struct ExistingRootSummaryRow {
        #[diesel(sql_type = BigInt)]
        batch_seq: i64,
        // The root columns below are raw byte strings (`BYTEA`).
        #[diesel(sql_type = Bytea)]
        state_root: Vec<u8>,
        #[diesel(sql_type = Bytea)]
        risk_root: Vec<u8>,
        #[diesel(sql_type = Bytea)]
        command_mmr_root: Vec<u8>,
        #[diesel(sql_type = Bytea)]
        obligation_mmr_root: Vec<u8>,
        #[diesel(sql_type = Bytea)]
        intent_mmr_root: Vec<u8>,
        #[diesel(sql_type = Bytea)]
        batch_root: Vec<u8>,
        #[diesel(sql_type = BigInt)]
        command_range_start: i64,
        #[diesel(sql_type = BigInt)]
        command_range_end: i64,
        #[diesel(sql_type = BigInt)]
        command_count: i64,
        #[diesel(sql_type = Integer)]
        schema_version: i32,
        #[diesel(sql_type = Bytea)]
        accepted_block_hash: Vec<u8>,
    }
679
    /// Row shape for a raw SQL query returning block-header columns; fields are matched to
    /// result columns by name through `QueryableByName`.
    ///
    /// NOTE(review): presumably used to compare an already-stored block header with an
    /// incoming one — the query that loads it is outside this view; confirm.
    #[derive(QueryableByName)]
    struct ExistingBlockRow {
        #[diesel(sql_type = Bytea)]
        hash: Vec<u8>,
        #[diesel(sql_type = Bytea)]
        parent_hash: Vec<u8>,
        #[diesel(sql_type = Bytea)]
        commands_hash: Vec<u8>,
        #[diesel(sql_type = Bytea)]
        batch_root: Vec<u8>,
        #[diesel(sql_type = BigInt)]
        command_count: i64,
        #[diesel(sql_type = BigInt)]
        first_command_seq: i64,
        #[diesel(sql_type = BigInt)]
        last_command_seq: i64,
        // `signer` and `signature` are the only nullable columns in this row.
        #[diesel(sql_type = Nullable<Bytea>)]
        signer: Option<Vec<u8>>,
        #[diesel(sql_type = Nullable<Bytea>)]
        signature: Option<Vec<u8>>,
    }
701
    /// Row shape for a raw SQL query returning block-command columns; fields are matched to
    /// result columns by name through `QueryableByName`.
    ///
    /// NOTE(review): presumably used to compare already-stored block commands with incoming
    /// ones — the query that loads it is outside this view; confirm.
    #[derive(QueryableByName)]
    struct ExistingBlockCommandRow {
        #[diesel(sql_type = BigInt)]
        command_index: i64,
        #[diesel(sql_type = BigInt)]
        engine_command_id: i64,
        // Loaded through the crate's `DbUuid` wrapper rather than a bare `uuid::Uuid`.
        #[diesel(sql_type = Uuid)]
        request_uuid: DbUuid,
        #[diesel(sql_type = Text)]
        command_type: String,
        #[diesel(sql_type = Bytea)]
        command_data: Vec<u8>,
        #[diesel(sql_type = Bytea)]
        command_identity_hash: Vec<u8>,
    }
717
718    pub(super) fn persist_rsm_blocks_in_tx(
719        conn: &mut PgConnection,
720        input: &hypercall_db::EngineJournalRsmBlockBatch,
721    ) -> Result<()> {
722        let mut latest_summary = None;
723
724        for block in &input.blocks {
725            let advance_current = input.mode == hypercall_db::EngineJournalRsmPersistenceMode::Live;
726            save_root_summary_in_tx(conn, &block.root_summary, advance_current)?;
727            if input.mode == hypercall_db::EngineJournalRsmPersistenceMode::Replay {
728                latest_summary = Some(block.root_summary.clone());
729            }
730            save_block_header_in_tx(conn, &block.header)?;
731
732            let request_ids: Vec<_> = block
733                .commands
734                .iter()
735                .map(|cmd| DbUuid(cmd.request_uuid))
736                .collect();
737            let command_ids: Vec<(DbUuid, i64)> = engine_commands::table
738                .filter(engine_commands::request_uuid.eq_any(&request_ids))
739                .select((engine_commands::request_uuid, engine_commands::command_id))
740                .load(conn)?;
741            let command_ids_by_request_uuid: HashMap<_, _> = command_ids
742                .into_iter()
743                .map(|(id, command_id)| (id.0, command_id))
744                .collect();
745
746            let commands = block
747                .commands
748                .iter()
749                .map(|command| {
750                    let engine_command_id = *command_ids_by_request_uuid
751                        .get(&command.request_uuid)
752                        .ok_or_else(|| {
753                            anyhow!(
754                                "missing engine_command_id for RSM block {} request {}",
755                                block.header.height,
756                                command.request_uuid
757                            )
758                        })?;
759                    Ok(hypercall_db::NewRsmBlockCommand {
760                        environment: block.header.environment,
761                        height: block.header.height,
762                        rsm_command_seq: command.rsm_command_seq,
763                        command_index: command.command_index,
764                        engine_command_id,
765                        request_uuid: command.request_uuid,
766                        command_type: command.command_type.clone(),
767                        command_data: command.command_data.clone(),
768                        command_identity_hash: command.command_identity_hash,
769                    })
770                })
771                .collect::<Result<Vec<_>>>()?;
772            save_block_commands_in_tx(conn, &commands)?;
773        }
774
775        if let Some(summary) = latest_summary {
776            let current_version = current_version_in_tx(conn, summary.environment)?;
777            if current_version
778                .map(|current| current <= summary.version)
779                .unwrap_or(true)
780            {
781                save_root_summary_in_tx(conn, &summary, true)?;
782            }
783        }
784
785        Ok(())
786    }
787
788    fn u64_to_i64(name: &str, value: u64) -> Result<i64> {
789        i64::try_from(value).map_err(|_| anyhow!("{name} overflows BIGINT: {value}"))
790    }
791
792    fn save_root_summary_in_tx(
793        conn: &mut PgConnection,
794        summary: &hypercall_db::NewValidatorRsmRootSummary,
795        advance_current: bool,
796    ) -> Result<()> {
797        let version = u64_to_i64("version", summary.version)?;
798        let batch_seq = u64_to_i64("batch_seq", summary.batch_seq)?;
799        let command_range_start = u64_to_i64("command_range_start", summary.command_range_start)?;
800        let command_range_end = u64_to_i64("command_range_end", summary.command_range_end)?;
801        let command_count = u64_to_i64("command_count", summary.command_count)?;
802
803        let inserted = diesel::sql_query(
804            "INSERT INTO validator_rsm_batch_roots (
805                environment, version, batch_seq, state_root, risk_root, command_mmr_root,
806                obligation_mmr_root, intent_mmr_root, batch_root, command_range_start,
807                command_range_end, command_count, schema_version, accepted_block_hash
808             ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14)
809             ON CONFLICT (environment, version) DO NOTHING",
810        )
811        .bind::<SmallInt, _>(summary.environment.as_i16())
812        .bind::<BigInt, _>(version)
813        .bind::<BigInt, _>(batch_seq)
814        .bind::<Bytea, _>(&summary.state_root[..])
815        .bind::<Bytea, _>(&summary.risk_root[..])
816        .bind::<Bytea, _>(&summary.command_mmr_root[..])
817        .bind::<Bytea, _>(&summary.obligation_mmr_root[..])
818        .bind::<Bytea, _>(&summary.intent_mmr_root[..])
819        .bind::<Bytea, _>(&summary.batch_root[..])
820        .bind::<BigInt, _>(command_range_start)
821        .bind::<BigInt, _>(command_range_end)
822        .bind::<BigInt, _>(command_count)
823        .bind::<Integer, _>(summary.schema_version)
824        .bind::<Bytea, _>(&summary.accepted_block_hash[..])
825        .execute(conn)?;
826
827        if inserted == 0 {
828            let existing = diesel::sql_query(
829                "SELECT batch_seq, state_root, risk_root, command_mmr_root,
830                        obligation_mmr_root, intent_mmr_root, batch_root, command_range_start,
831                        command_range_end, command_count, schema_version, accepted_block_hash
832                 FROM validator_rsm_batch_roots
833                 WHERE environment = $1 AND version = $2",
834            )
835            .bind::<SmallInt, _>(summary.environment.as_i16())
836            .bind::<BigInt, _>(version)
837            .get_result::<ExistingRootSummaryRow>(conn)?;
838
839            let matches_existing = existing.batch_seq == batch_seq
840                && existing.state_root == summary.state_root.as_slice()
841                && existing.risk_root == summary.risk_root.as_slice()
842                && existing.command_mmr_root == summary.command_mmr_root.as_slice()
843                && existing.obligation_mmr_root == summary.obligation_mmr_root.as_slice()
844                && existing.intent_mmr_root == summary.intent_mmr_root.as_slice()
845                && existing.batch_root == summary.batch_root.as_slice()
846                && existing.command_range_start == command_range_start
847                && existing.command_range_end == command_range_end
848                && existing.command_count == command_count
849                && existing.schema_version == summary.schema_version
850                && existing.accepted_block_hash == summary.accepted_block_hash.as_slice();
851            if !matches_existing {
852                anyhow::bail!(
853                    "conflicting validator RSM root summary for {} version {}",
854                    summary.environment,
855                    summary.version
856                );
857            }
858        }
859
860        if advance_current {
861            let affected = diesel::sql_query(
862                "INSERT INTO validator_rsm_current_state (environment, current_version, updated_at)
863                 VALUES ($1, $2, NOW())
864                 ON CONFLICT (environment) DO UPDATE
865                 SET current_version = EXCLUDED.current_version, updated_at = NOW()
866                 WHERE validator_rsm_current_state.current_version <= EXCLUDED.current_version",
867            )
868            .bind::<SmallInt, _>(summary.environment.as_i16())
869            .bind::<BigInt, _>(version)
870            .execute(conn)?;
871            if affected == 0 {
872                anyhow::bail!(
873                    "refusing to move validator RSM current version for {} backward to {}",
874                    summary.environment,
875                    version
876                );
877            }
878        }
879
880        Ok(())
881    }
882
883    fn save_block_header_in_tx(
884        conn: &mut PgConnection,
885        block: &hypercall_db::NewRsmBlockHeader,
886    ) -> Result<()> {
887        let height = u64_to_i64("height", block.height)?;
888        let command_count = u64_to_i64("command_count", block.command_count)?;
889        let first_command_seq = u64_to_i64("first_command_seq", block.first_command_seq)?;
890        let last_command_seq = u64_to_i64("last_command_seq", block.last_command_seq)?;
891
892        let inserted = diesel::sql_query(
893            "INSERT INTO validator_rsm_blocks (
894                environment, height, hash, parent_hash, commands_hash, batch_root,
895                command_count, first_command_seq, last_command_seq, signer, signature
896             ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11)
897             ON CONFLICT (environment, height) DO NOTHING",
898        )
899        .bind::<SmallInt, _>(block.environment.as_i16())
900        .bind::<BigInt, _>(height)
901        .bind::<Bytea, _>(&block.hash[..])
902        .bind::<Bytea, _>(&block.parent_hash[..])
903        .bind::<Bytea, _>(&block.commands_hash[..])
904        .bind::<Bytea, _>(&block.batch_root[..])
905        .bind::<BigInt, _>(command_count)
906        .bind::<BigInt, _>(first_command_seq)
907        .bind::<BigInt, _>(last_command_seq)
908        .bind::<Nullable<Bytea>, _>(block.signer.as_ref().map(|signer| signer.as_slice()))
909        .bind::<Nullable<Bytea>, _>(block.signature.as_deref())
910        .execute(conn)?;
911
912        if inserted == 0 {
913            let existing = diesel::sql_query(
914                "SELECT hash, parent_hash, commands_hash, batch_root, command_count,
915                        first_command_seq, last_command_seq, signer, signature
916                 FROM validator_rsm_blocks
917                 WHERE environment = $1 AND height = $2",
918            )
919            .bind::<SmallInt, _>(block.environment.as_i16())
920            .bind::<BigInt, _>(height)
921            .get_result::<ExistingBlockRow>(conn)?;
922
923            let matches_existing = existing.hash == block.hash.as_slice()
924                && existing.parent_hash == block.parent_hash.as_slice()
925                && existing.commands_hash == block.commands_hash.as_slice()
926                && existing.batch_root == block.batch_root.as_slice()
927                && existing.command_count == command_count
928                && existing.first_command_seq == first_command_seq
929                && existing.last_command_seq == last_command_seq
930                && existing.signer.as_deref() == block.signer.as_ref().map(|s| &s[..])
931                && existing.signature.as_deref() == block.signature.as_deref();
932            if !matches_existing {
933                anyhow::bail!(
934                    "conflicting validator RSM block for {} height {}",
935                    block.environment,
936                    block.height
937                );
938            }
939        }
940
941        Ok(())
942    }
943
944    fn save_block_commands_in_tx(
945        conn: &mut PgConnection,
946        commands: &[hypercall_db::NewRsmBlockCommand],
947    ) -> Result<()> {
948        for command in commands {
949            let height = u64_to_i64("height", command.height)?;
950            let rsm_command_seq = u64_to_i64("rsm_command_seq", command.rsm_command_seq)?;
951            let command_index = u64_to_i64("command_index", command.command_index)?;
952            let inserted = diesel::sql_query(
953                "INSERT INTO validator_rsm_block_commands (
954                    environment, height, rsm_command_seq, command_index, engine_command_id,
955                    request_uuid, command_type, command_data, command_identity_hash
956                 ) VALUES ($1, $2, $3, $4, $5, $6, $7::engine_command_type, $8, $9)
957                 ON CONFLICT (environment, height, rsm_command_seq) DO NOTHING",
958            )
959            .bind::<SmallInt, _>(command.environment.as_i16())
960            .bind::<BigInt, _>(height)
961            .bind::<BigInt, _>(rsm_command_seq)
962            .bind::<BigInt, _>(command_index)
963            .bind::<BigInt, _>(command.engine_command_id)
964            .bind::<Uuid, _>(DbUuid(command.request_uuid))
965            .bind::<Text, _>(&command.command_type)
966            .bind::<Bytea, _>(&command.command_data)
967            .bind::<Bytea, _>(&command.command_identity_hash[..])
968            .execute(conn)?;
969
970            if inserted == 0 {
971                let existing = diesel::sql_query(
972                    "SELECT command_index, engine_command_id, request_uuid, command_type::text AS command_type,
973                            command_data, command_identity_hash
974                     FROM validator_rsm_block_commands
975                     WHERE environment = $1 AND height = $2 AND rsm_command_seq = $3",
976                )
977                .bind::<SmallInt, _>(command.environment.as_i16())
978                .bind::<BigInt, _>(height)
979                .bind::<BigInt, _>(rsm_command_seq)
980                .get_result::<ExistingBlockCommandRow>(conn)?;
981
982                let matches_existing = existing.command_index == command_index
983                    && existing.engine_command_id == command.engine_command_id
984                    && existing.request_uuid.0 == command.request_uuid
985                    && existing.command_type == command.command_type
986                    && existing.command_data == command.command_data
987                    && existing.command_identity_hash == command.command_identity_hash.as_slice();
988                if !matches_existing {
989                    anyhow::bail!(
990                        "conflicting validator RSM block command for {} height {} sequence {}",
991                        command.environment,
992                        command.height,
993                        command.rsm_command_seq
994                    );
995                }
996            }
997        }
998
999        Ok(())
1000    }
1001
1002    fn current_version_in_tx(
1003        conn: &mut PgConnection,
1004        environment: hypercall_db::ValidatorRsmEnvironment,
1005    ) -> Result<Option<u64>> {
1006        #[derive(QueryableByName)]
1007        struct CurrentVersionRow {
1008            #[diesel(sql_type = BigInt)]
1009            current_version: i64,
1010        }
1011
1012        let row = diesel::sql_query(
1013            "SELECT current_version FROM validator_rsm_current_state WHERE environment = $1",
1014        )
1015        .bind::<SmallInt, _>(environment.as_i16())
1016        .get_result::<CurrentVersionRow>(conn)
1017        .optional()?;
1018
1019        row.map(|row| {
1020            u64::try_from(row.current_version)
1021                .map_err(|_| anyhow!("current_version is negative: {}", row.current_version))
1022        })
1023        .transpose()
1024    }
1025}
1026
1027#[cfg(feature = "rsm-state")]
1028use rsm::persist_rsm_blocks_in_tx;
1029
#[cfg(test)]
mod tests {
    use super::*;
    use crate::test_helpers::TestDb;
    use hypercall_db::EngineJournalBatchWriter;
    use hypercall_types::wallet_address::test_wallet;
    use rust_decimal::Decimal;
    use rust_decimal_macros::dec;

    /// Every listed writer-trait event type must map to a Diesel enum value
    /// whose string form equals the writer type's own `to_string()`.
    #[test]
    fn event_type_mapping_covers_writer_trait_variants() {
        use hypercall_db::EventType as WriterEventType;

        let variants = [
            WriterEventType::OrderAction,
            WriterEventType::OrderUpdate,
            WriterEventType::OrderInfo,
            WriterEventType::MarketAction,
            WriterEventType::MarketUpdate,
            WriterEventType::OrderFilled,
            WriterEventType::OrderbookUpdated,
            WriterEventType::L2Update,
            WriterEventType::Trade,
            WriterEventType::TransactionRequest,
            WriterEventType::TransactionUpdate,
            WriterEventType::MmpTriggered,
            WriterEventType::PositionExpired,
            WriterEventType::TierUpdate,
            WriterEventType::HypercorePositionUpdate,
            WriterEventType::LiquidationStateChange,
            WriterEventType::RfqFilled,
        ];

        for variant in variants {
            let expected_name = variant.to_string();
            assert_eq!(event_type_to_diesel(variant).as_str(), expected_name);
        }
    }

    /// A known command type parses to itself; an unknown one is rejected
    /// instead of being mapped to some default.
    #[test]
    fn command_type_parser_fails_closed_for_unknown_values() {
        let known = parse_command_type("CreateOrder").expect("CreateOrder maps");
        assert_eq!(known.as_str(), "CreateOrder");

        let unknown = parse_command_type("InventedCommand");
        assert!(
            unknown.is_err(),
            "unknown command types must not silently map to a default"
        );
    }

    /// Builds a minimal journal entry whose only payload is a single deposit
    /// balance update for `wallet` with the given `delta` and `balance_after`.
    fn journal_entry_with_balance_update(
        request_uuid: uuid::Uuid,
        wallet: hypercall_types::WalletAddress,
        delta: Decimal,
        balance_after: Decimal,
    ) -> hypercall_db::EngineJournalEntryInsert {
        let deposit = hypercall_types::BalanceUpdate {
            balance_update_seq: 1,
            wallet,
            delta,
            balance_after,
            reason: hypercall_types::BalanceUpdateReason::Deposit,
            reference_id: Some("journal-projection-test".to_string()),
            source_command_id: None,
            timestamp_ms: 1_700_000_000_000,
        };

        hypercall_db::EngineJournalEntryInsert {
            request_uuid,
            received_ts_ms: 1_700_000_000_000,
            command_data: b"test-command".to_vec(),
            response_data: None,
            order_id: None,
            command_type: None,
            duration_ms: 1,
            pre_digest_data: vec![],
            post_digest_data: vec![],
            events: vec![],
            outbox_appends: vec![],
            fill_side_effects: vec![],
            cash_withdrawal_side_effect: None,
            balance_updates: vec![hypercall_db::EngineJournalBalanceUpdate { update: deposit }],
        }
    }

    /// Returns the `account_balances.balance` stored for `wallet`, or `None`
    /// when the wallet has no row. Panics if the query itself fails.
    fn account_balance(
        conn: &mut PgConnection,
        wallet: hypercall_types::WalletAddress,
    ) -> Option<Decimal> {
        use crate::schema::account_balances;

        let lookup = account_balances::table
            .filter(account_balances::account_address.eq(wallet))
            .select(account_balances::balance)
            .first::<Decimal>(conn)
            .optional();
        lookup.unwrap()
    }

    /// A journal entry carrying a balance update must show up in the
    /// `account_balances` projection after the batch insert.
    #[tokio::test]
    async fn journal_balance_updates_materialize_account_balances_projection() {
        let test_db = TestDb::new().await.unwrap();
        let wallet = test_wallet(31);
        let entry =
            journal_entry_with_balance_update(uuid::Uuid::new_v4(), wallet, dec!(125), dec!(125));

        let outcome = test_db
            .handler
            .insert_engine_journal_batch_sync(&[entry], false, None)
            .unwrap();
        assert_eq!(outcome.inserted_count, 1);

        let mut conn = test_db.handler.pool().get().unwrap();
        let stored = account_balance(&mut conn, wallet);
        assert_eq!(stored, Some(dec!(125)));
    }

    /// Submitting the same command twice inserts it once and leaves the
    /// projected balance at the value written by the first insert.
    #[tokio::test]
    async fn duplicate_journal_command_does_not_double_apply_account_balances_projection() {
        let test_db = TestDb::new().await.unwrap();
        let wallet = test_wallet(32);
        let entry =
            journal_entry_with_balance_update(uuid::Uuid::new_v4(), wallet, dec!(75), dec!(75));

        let first_attempt = test_db
            .handler
            .insert_engine_journal_batch_sync(std::slice::from_ref(&entry), false, None)
            .unwrap();
        assert_eq!(first_attempt.inserted_count, 1);

        let second_attempt = test_db
            .handler
            .insert_engine_journal_batch_sync(&[entry], false, None)
            .unwrap();
        assert_eq!(second_attempt.inserted_count, 0);

        let mut conn = test_db.handler.pool().get().unwrap();
        let stored = account_balance(&mut conn, wallet);
        assert_eq!(stored, Some(dec!(75)));
    }

    /// In a batch mixing an already-journaled command with a new one, only the
    /// new command is inserted and the projection ends at the new command's
    /// `balance_after`.
    #[tokio::test]
    async fn mixed_duplicate_and_new_batch_applies_only_new_balance_projection() {
        let test_db = TestDb::new().await.unwrap();
        let wallet = test_wallet(33);
        let already_seen =
            journal_entry_with_balance_update(uuid::Uuid::new_v4(), wallet, dec!(75), dec!(75));
        let fresh =
            journal_entry_with_balance_update(uuid::Uuid::new_v4(), wallet, dec!(25), dec!(100));

        let seeded = test_db
            .handler
            .insert_engine_journal_batch_sync(std::slice::from_ref(&already_seen), false, None)
            .unwrap();
        assert_eq!(seeded.inserted_count, 1);

        let mixed = test_db
            .handler
            .insert_engine_journal_batch_sync(&[already_seen, fresh], false, None)
            .unwrap();
        assert_eq!(mixed.inserted_count, 1);

        let mut conn = test_db.handler.pool().get().unwrap();
        let stored = account_balance(&mut conn, wallet);
        assert_eq!(stored, Some(dec!(100)));
    }
}