Skip to main content

hypercall_db_diesel/
event_handler.rs

//! Engine event handler: persists `EngineMessage` variants to Postgres.
//!
//! This module contains the core event-to-SQL logic that was previously
//! embedded in the root crate's `DieselEventHandler`. It operates on a
//! bare `&mut PgConnection` so callers can wrap calls in their own
//! transactions.
7
8use anyhow::{anyhow, Result};
9use diesel::pg::PgConnection;
10use diesel::prelude::*;
11use diesel::sql_types::{BigInt, Numeric};
12use diesel::OptionalExtension;
13use rust_decimal::Decimal;
14
15use hypercall_types::engine_messages::{
16    EngineMessage, LiquidationStateMessage, LiquidationStateType,
17};
18use hypercall_types::liquidation_state::{AccountLiquidationStatus, LiquidationState};
19use hypercall_types::{
20    utils::is_option_symbol, MarketUpdateStatus, OptionType, OrderUpdateStatus, WalletAddress,
21};
22
23use crate::database_handler::{current_option_token_deployment, DatabaseHandler};
24use crate::models::*;
25use crate::order_status_materialization::upsert_materialized_order_state;
26use crate::schema;
27
28/// Portable representation of an expired order that needs a synthetic cancel.
29///
30/// This mirrors `StartupExpiredOrderCancel` from the root crate but lives in
31/// `hypercall-db-diesel` so the batch cancel logic can be called from outside
32/// the root crate.
33pub struct ExpiredOrderCancel {
34    pub order_id: u64,
35    pub wallet: WalletAddress,
36    pub symbol: String,
37    pub price: Decimal,
38    pub size: Decimal,
39    pub side: hypercall_types::Side,
40    pub tif: hypercall_types::TimeInForce,
41    pub is_perp: bool,
42    pub underlying: Option<String>,
43    pub reduce_only: Option<bool>,
44    pub nonce: Option<u64>,
45    pub signature: Option<String>,
46    pub mmp_enabled: bool,
47    pub filled_size: Decimal,
48    pub client_id: Option<String>,
49    pub timestamp: u64,
50}
51
52// ---------------------------------------------------------------------------
53// Free helpers (module-private unless noted)
54// ---------------------------------------------------------------------------
55
56/// Convert an `AccountLiquidationStatus` to a `LiquidationStateRecord` for
57/// database storage.
58///
59/// This was previously `LiquidationCache::status_to_new_state` in the root
60/// crate. All the types it touches live in `hypercall_types` / `hypercall_db`,
61/// so it belongs here.
62pub fn status_to_liquidation_state_record(
63    status: &AccountLiquidationStatus,
64) -> hypercall_db::LiquidationStateRecord {
65    use serde_json::Value as JsonValue;
66
67    fn json_string_values(values: &[String]) -> Vec<JsonValue> {
68        values.iter().cloned().map(JsonValue::String).collect()
69    }
70
71    let mut record = hypercall_db::LiquidationStateRecord {
72        wallet_address: status.wallet,
73        state: status.state.db_str().to_string(),
74        margin_mode: status.margin_mode.as_str().to_string(),
75        equity: status.equity,
76        mm_required: status.mm_required,
77        maintenance_margin: status.maintenance_margin,
78        liquidation_mode: status
79            .liquidation_mode()
80            .map(|mode| mode.as_str().to_string()),
81        target_equity: None,
82        entered_pre_liq_at: None,
83        mm_shortfall: None,
84        escalation_deadline: None,
85        last_reprice_at: None,
86        partial_order_request_ids: None,
87        partial_order_client_ids: None,
88        partial_bonus_bps: None,
89        auction_id: None,
90        request_id: None,
91        tx_hash: None,
92        auction_started_at: None,
93        chain_start_time: None,
94        margin_needed: None,
95        stop_request_id: None,
96        stop_tx_hash: None,
97        liquidated_at: None,
98        resolved_winner: None,
99        resolved_bonus: None,
100        resolution_tx_hash: None,
101        last_observed_block: None,
102        updated_at_ms: Some(status.updated_at as i64),
103        created_at: None,
104        updated_at: None,
105    };
106
107    match &status.state {
108        LiquidationState::Healthy => {}
109        LiquidationState::PreLiquidation(metadata) => {
110            record.target_equity = Some(metadata.target_equity);
111            record.entered_pre_liq_at = Some(metadata.entered_at as i64);
112            record.mm_shortfall = Some(metadata.mm_shortfall);
113            record.escalation_deadline = Some(metadata.escalation_deadline as i64);
114            record.last_reprice_at = metadata.last_reprice_at.map(|value| value as i64);
115            record.partial_order_request_ids = Some(JsonValue::Array(json_string_values(
116                &metadata.active_order_request_ids,
117            )));
118            record.partial_order_client_ids = Some(JsonValue::Array(json_string_values(
119                &metadata.active_order_client_ids,
120            )));
121            record.partial_bonus_bps = Some(metadata.bonus_bps as i32);
122            record.auction_id = metadata.pending_full_auction_id.clone();
123            record.request_id = metadata.pending_full_request_id.clone();
124            record.tx_hash = metadata.pending_full_tx_hash.clone();
125            record.margin_needed = metadata.pending_full_margin_needed;
126        }
127        LiquidationState::InLiquidation(metadata) => {
128            record.auction_id = Some(metadata.auction_id.clone());
129            record.request_id = metadata.request_id.clone();
130            record.tx_hash = metadata.tx_hash.clone();
131            record.auction_started_at = Some(metadata.started_at as i64);
132            record.chain_start_time = metadata.chain_start_time.map(|value| value as i64);
133            record.margin_needed = Some(metadata.margin_needed);
134            record.stop_request_id = metadata.stop_request_id.clone();
135            record.stop_tx_hash = metadata.stop_tx_hash.clone();
136        }
137        LiquidationState::Liquidated(metadata) => {
138            record.auction_id = Some(metadata.auction_id.clone());
139            record.liquidated_at = Some(metadata.completed_at as i64);
140            record.resolved_winner = metadata.winner;
141            record.resolved_bonus = Some(metadata.bonus);
142            record.resolution_tx_hash = metadata.tx_hash.clone();
143        }
144    }
145
146    record
147}
148
149/// Convert a domain `LiquidationStateRecord` into the Diesel `NewLiquidationState` insertable.
150fn new_liquidation_state_from_domain(
151    d: &hypercall_db::LiquidationStateRecord,
152) -> NewLiquidationState {
153    NewLiquidationState {
154        wallet_address: d.wallet_address,
155        state: d.state.clone(),
156        margin_mode: d.margin_mode.clone(),
157        equity: d.equity,
158        mm_required: d.mm_required,
159        maintenance_margin: d.maintenance_margin,
160        liquidation_mode: d.liquidation_mode.clone(),
161        target_equity: d.target_equity,
162        entered_pre_liq_at: d.entered_pre_liq_at,
163        mm_shortfall: d.mm_shortfall,
164        escalation_deadline: d.escalation_deadline,
165        last_reprice_at: d.last_reprice_at,
166        partial_order_request_ids: d.partial_order_request_ids.clone(),
167        partial_order_client_ids: d.partial_order_client_ids.clone(),
168        partial_bonus_bps: d.partial_bonus_bps,
169        auction_id: d.auction_id.clone(),
170        request_id: d.request_id.clone(),
171        tx_hash: d.tx_hash.clone(),
172        auction_started_at: d.auction_started_at,
173        chain_start_time: d.chain_start_time,
174        margin_needed: d.margin_needed,
175        stop_request_id: d.stop_request_id.clone(),
176        stop_tx_hash: d.stop_tx_hash.clone(),
177        liquidated_at: d.liquidated_at,
178        resolved_winner: d.resolved_winner,
179        resolved_bonus: d.resolved_bonus,
180        resolution_tx_hash: d.resolution_tx_hash.clone(),
181        last_observed_block: d.last_observed_block,
182        updated_at_ms: d.updated_at_ms,
183    }
184}
185
186fn liquidation_history_from_message(
187    message: &LiquidationStateMessage,
188) -> Result<NewLiquidationHistory> {
189    let (request_id, tx_hash, margin_needed, winner_address, bonus) = match &message.status.state {
190        LiquidationState::Healthy => (None, None, None, None, None),
191        LiquidationState::PreLiquidation(metadata) => (
192            metadata.pending_full_request_id.clone(),
193            metadata.pending_full_tx_hash.clone(),
194            metadata.pending_full_margin_needed,
195            None,
196            None,
197        ),
198        LiquidationState::InLiquidation(metadata) => (
199            metadata.request_id.clone(),
200            metadata.tx_hash.clone(),
201            Some(metadata.margin_needed),
202            None,
203            None,
204        ),
205        LiquidationState::Liquidated(metadata) => (
206            None,
207            metadata.tx_hash.clone(),
208            None,
209            metadata.winner,
210            Some(metadata.bonus),
211        ),
212    };
213
214    let details = serde_json::to_value(&message.status)
215        .map_err(|e| anyhow!("failed to serialize liquidation status snapshot: {}", e))?;
216
217    Ok(NewLiquidationHistory {
218        wallet_address: message.wallet,
219        previous_state: message.previous_state.to_string(),
220        new_state: message.new_state.to_string(),
221        liquidation_mode: message.liquidation_mode.clone(),
222        equity: message.equity,
223        mm_required: message.mm_required,
224        maintenance_margin: message.maintenance_margin,
225        shortfall: message.shortfall,
226        auction_id: message.auction_id.clone(),
227        request_id,
228        tx_hash,
229        margin_needed,
230        winner_address,
231        bonus,
232        details,
233        timestamp: i64::try_from(message.timestamp).map_err(|_| {
234            anyhow!(
235                "liquidation timestamp {} exceeds i64 range",
236                message.timestamp
237            )
238        })?,
239    })
240}
241
242fn liquidation_auction_from_status(
243    status: &AccountLiquidationStatus,
244) -> Result<Option<NewLiquidationAuction>> {
245    match &status.state {
246        LiquidationState::PreLiquidation(metadata) => {
247            let Some(auction_id) = metadata.pending_full_auction_id.clone() else {
248                return Ok(None);
249            };
250            Ok(Some(NewLiquidationAuction {
251                auction_id,
252                wallet_address: status.wallet,
253                status: "pending".to_string(),
254                positions: serde_json::Value::Array(Vec::new()),
255                equity_at_start: status.equity,
256                mm_shortfall_at_start: status.shortfall(),
257                target_equity: Some(metadata.target_equity),
258                request_id: metadata.pending_full_request_id.clone(),
259                tx_hash: metadata.pending_full_tx_hash.clone(),
260                started_at: i64::try_from(metadata.entered_at).map_err(|_| {
261                    anyhow!(
262                        "pre-liquidation entered_at {} exceeds i64 range",
263                        metadata.entered_at
264                    )
265                })?,
266                chain_start_time: None,
267                margin_needed: metadata.pending_full_margin_needed,
268                stop_request_id: None,
269                stop_tx_hash: None,
270                completed_at: None,
271                liquidator_address: None,
272                bonus: None,
273                settlement_value: None,
274                last_observed_block: None,
275            }))
276        }
277        LiquidationState::InLiquidation(metadata) => Ok(Some(NewLiquidationAuction {
278            auction_id: metadata.auction_id.clone(),
279            wallet_address: status.wallet,
280            status: "active".to_string(),
281            positions: serde_json::Value::Array(Vec::new()),
282            equity_at_start: status.equity,
283            mm_shortfall_at_start: status.shortfall(),
284            target_equity: None,
285            request_id: metadata.request_id.clone(),
286            tx_hash: metadata.tx_hash.clone(),
287            started_at: i64::try_from(metadata.started_at).map_err(|_| {
288                anyhow!(
289                    "auction started_at {} exceeds i64 range",
290                    metadata.started_at
291                )
292            })?,
293            chain_start_time: metadata
294                .chain_start_time
295                .map(i64::try_from)
296                .transpose()
297                .map_err(|_| anyhow!("chain_start_time exceeds i64 range"))?,
298            margin_needed: Some(metadata.margin_needed),
299            stop_request_id: metadata.stop_request_id.clone(),
300            stop_tx_hash: metadata.stop_tx_hash.clone(),
301            completed_at: None,
302            liquidator_address: None,
303            bonus: None,
304            settlement_value: None,
305            last_observed_block: None,
306        })),
307        LiquidationState::Liquidated(metadata) => Ok(Some(NewLiquidationAuction {
308            auction_id: metadata.auction_id.clone(),
309            wallet_address: status.wallet,
310            status: "completed".to_string(),
311            positions: serde_json::Value::Array(Vec::new()),
312            equity_at_start: status.equity,
313            mm_shortfall_at_start: status.shortfall(),
314            target_equity: None,
315            request_id: None,
316            tx_hash: None,
317            // A resolved-only replay can land before the corresponding
318            // start row exists. Preserve the original started_at when an
319            // existing row is present; otherwise store an explicit
320            // "unknown start" sentinel instead of fabricating a zero-
321            // duration auction from completed_at.
322            started_at: 0,
323            chain_start_time: None,
324            margin_needed: None,
325            stop_request_id: None,
326            stop_tx_hash: None,
327            completed_at: Some(i64::try_from(metadata.completed_at).map_err(|_| {
328                anyhow!(
329                    "liquidated completed_at {} exceeds i64 range",
330                    metadata.completed_at
331                )
332            })?),
333            liquidator_address: metadata.winner,
334            bonus: Some(metadata.bonus),
335            settlement_value: None,
336            last_observed_block: None,
337        })),
338        _ => Ok(None),
339    }
340}
341
342fn liquidation_auction_update_from_message(
343    message: &LiquidationStateMessage,
344) -> Result<Option<UpdateLiquidationAuction>> {
345    if message.auction_id.is_none() {
346        return Ok(None);
347    }
348
349    let cancelled_update = || -> Result<Option<UpdateLiquidationAuction>> {
350        Ok(Some(UpdateLiquidationAuction {
351            status: Some("cancelled".to_string()),
352            request_id: None,
353            tx_hash: None,
354            chain_start_time: None,
355            margin_needed: None,
356            stop_request_id: None,
357            stop_tx_hash: None,
358            completed_at: Some(
359                i64::try_from(message.timestamp)
360                    .map_err(|_| anyhow!("liquidation timestamp exceeds i64 range"))?,
361            ),
362            liquidator_address: None,
363            bonus: None,
364            settlement_value: None,
365            last_observed_block: None,
366        }))
367    };
368
369    match (&message.previous_state, &message.new_state) {
370        (LiquidationStateType::InLiquidation, LiquidationStateType::Healthy) => cancelled_update(),
371        // A pending StartLiquidation directive failed or expired while still
372        // in PreLiquidation. The previous_liquidation_mode was "full" (pending
373        // full request existed) but has reverted to partial/none. Cancel the
374        // orphaned "pending" auction row so it does not stay pending forever.
375        (LiquidationStateType::PreLiquidation, LiquidationStateType::PreLiquidation)
376            if message.previous_liquidation_mode.as_deref() == Some("full")
377                && message.liquidation_mode.as_deref() != Some("full") =>
378        {
379            cancelled_update()
380        }
381        _ => Ok(None),
382    }
383}
384
385// ---------------------------------------------------------------------------
386// Fill persistence helpers
387// ---------------------------------------------------------------------------
388
389/// Persist a fill (trade + taker/maker fill rows) and apply ledger side effects
390/// in the caller's transaction.
391///
392/// Returns `(taker_inserted, maker_inserted)` booleans -- `false` means
393/// the row already existed (idempotent ON CONFLICT DO NOTHING).
394pub fn persist_fill_with_side_effects_in_tx(
395    conn: &mut PgConnection,
396    fill: &hypercall_types::Fill,
397    side_effects: &FillSideEffect,
398) -> Result<(bool, bool)> {
399    persist_fill_with_side_effects_in_tx_with_validation(
400        conn,
401        fill,
402        side_effects,
403        FillUnderlyingNotionalValidation::Strict,
404    )
405}
406
407/// Persist a journaled fill while preserving replay compatibility for
408/// pre-cutover WAL payloads that did not carry `underlying_notional`.
409pub fn persist_legacy_replay_fill_with_side_effects_in_tx(
410    conn: &mut PgConnection,
411    fill: &hypercall_types::Fill,
412    side_effects: &FillSideEffect,
413) -> Result<(bool, bool)> {
414    persist_fill_with_side_effects_in_tx_with_validation(
415        conn,
416        fill,
417        side_effects,
418        FillUnderlyingNotionalValidation::LegacyReplay,
419    )
420}
421
422fn persist_fill_with_side_effects_in_tx_with_validation(
423    conn: &mut PgConnection,
424    fill: &hypercall_types::Fill,
425    side_effects: &FillSideEffect,
426    validation: FillUnderlyingNotionalValidation,
427) -> Result<(bool, bool)> {
428    use diesel::sql_types::{BigInt, Binary, Bool, Text};
429    use rust_decimal_macros::dec;
430
431    assert_eq!(
432        side_effects.trade_id, fill.trade_id,
433        "CRITICAL: journal fill side effects trade_id mismatch for fill {}",
434        fill.trade_id
435    );
436    let underlying_notional =
437        validate_fill_underlying_notional(fill, side_effects.underlying_notional, validation)?;
438    let taker_realized_pnl = fill
439        .taker_realized_pnl
440        .unwrap_or(side_effects.taker_ledger_delta);
441    let maker_realized_pnl = fill
442        .maker_realized_pnl
443        .unwrap_or(side_effects.maker_ledger_delta);
444
445    diesel::sql_query(
446        "INSERT INTO trades (trade_id, symbol, price, size, maker_address, taker_address, maker_fee, taker_fee, timestamp)
447         VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
448         ON CONFLICT (trade_id) DO NOTHING",
449    )
450    .bind::<BigInt, _>(fill.trade_id as i64)
451    .bind::<Text, _>(&fill.symbol)
452    .bind::<Numeric, _>(fill.price)
453    .bind::<Numeric, _>(fill.size)
454    .bind::<Binary, _>(&fill.maker_wallet_address)
455    .bind::<Binary, _>(&fill.taker_wallet_address)
456    .bind::<Numeric, _>(dec!(0))
457    .bind::<Numeric, _>(fill.fee)
458    .bind::<BigInt, _>(fill.timestamp as i64)
459    .execute(conn)?;
460
461    let taker_rows: usize = diesel::sql_query(
462        "INSERT INTO fills (trade_id, wallet_address, symbol, price, size, fee, is_taker, timestamp, builder_code_address, builder_code_fee, realized_pnl, underlying_notional)
463         VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12)
464         ON CONFLICT (trade_id, wallet_address, is_taker) DO NOTHING",
465    )
466    .bind::<BigInt, _>(fill.trade_id as i64)
467    .bind::<Binary, _>(&fill.taker_wallet_address)
468    .bind::<Text, _>(&fill.symbol)
469    .bind::<Numeric, _>(fill.price)
470    .bind::<Numeric, _>(fill.size)
471    .bind::<Numeric, _>(fill.fee)
472    .bind::<Bool, _>(true)
473    .bind::<BigInt, _>(fill.timestamp as i64)
474    .bind::<diesel::sql_types::Nullable<Binary>, _>(fill.builder_code_address.as_ref())
475    .bind::<diesel::sql_types::Nullable<Numeric>, _>(fill.builder_code_fee)
476    .bind::<diesel::sql_types::Nullable<Numeric>, _>(Some(taker_realized_pnl))
477    .bind::<diesel::sql_types::Nullable<Numeric>, _>(underlying_notional)
478    .execute(conn)?;
479    let taker_inserted = taker_rows > 0;
480
481    if taker_inserted {
482        apply_fill_ledger_side_effects(
483            conn,
484            &fill.taker_wallet_address,
485            fill,
486            side_effects.taker_ledger_delta,
487            side_effects.taker_premium_delta,
488        )?;
489    }
490
491    let maker_rows: usize = diesel::sql_query(
492        "INSERT INTO fills (trade_id, wallet_address, symbol, price, size, fee, is_taker, timestamp, builder_code_address, builder_code_fee, realized_pnl, underlying_notional)
493         VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12)
494         ON CONFLICT (trade_id, wallet_address, is_taker) DO NOTHING",
495    )
496    .bind::<BigInt, _>(fill.trade_id as i64)
497    .bind::<Binary, _>(&fill.maker_wallet_address)
498    .bind::<Text, _>(&fill.symbol)
499    .bind::<Numeric, _>(fill.price)
500    .bind::<Numeric, _>(fill.size)
501    .bind::<Numeric, _>(dec!(0))
502    .bind::<Bool, _>(false)
503    .bind::<BigInt, _>(fill.timestamp as i64)
504    .bind::<diesel::sql_types::Nullable<Binary>, _>(
505        Option::<hypercall_types::WalletAddress>::None,
506    )
507    .bind::<diesel::sql_types::Nullable<Numeric>, _>(Option::<Decimal>::None)
508    .bind::<diesel::sql_types::Nullable<Numeric>, _>(Some(maker_realized_pnl))
509    .bind::<diesel::sql_types::Nullable<Numeric>, _>(underlying_notional)
510    .execute(conn)?;
511    let maker_inserted = maker_rows > 0;
512
513    if maker_inserted {
514        apply_fill_ledger_side_effects(
515            conn,
516            &fill.maker_wallet_address,
517            fill,
518            side_effects.maker_ledger_delta,
519            side_effects.maker_premium_delta,
520        )?;
521    }
522
523    Ok((taker_inserted, maker_inserted))
524}
525
526fn apply_fill_ledger_side_effects(
527    conn: &mut PgConnection,
528    wallet: &WalletAddress,
529    fill: &hypercall_types::Fill,
530    realized_delta: Decimal,
531    premium_delta: Decimal,
532) -> Result<()> {
533    use diesel::sql_types::{BigInt, Binary, Text};
534
535    let _total_delta = realized_delta + premium_delta;
536
537    if realized_delta != Decimal::ZERO {
538        let inserted = diesel::sql_query(
539            "INSERT INTO ledger_events
540             (wallet, event_ts_ms, delta, event_type, reference_trade_id, reference_symbol)
541             VALUES ($1, $2, $3, $4, $5, $6)
542             ON CONFLICT (wallet, reference_trade_id, event_type)
543             WHERE reference_trade_id IS NOT NULL
544             DO NOTHING",
545        )
546        .bind::<Binary, _>(wallet)
547        .bind::<BigInt, _>(fill.timestamp as i64)
548        .bind::<Numeric, _>(realized_delta)
549        .bind::<Text, _>("fill_realized_pnl")
550        .bind::<BigInt, _>(fill.trade_id as i64)
551        .bind::<diesel::sql_types::Nullable<Text>, _>(Some(fill.symbol.as_str()))
552        .execute(conn)?;
553        assert_eq!(
554            inserted, 1,
555            "CRITICAL: missing fill_realized_pnl ledger event for trade {} wallet {}",
556            fill.trade_id, wallet
557        );
558    }
559
560    if premium_delta != Decimal::ZERO {
561        let inserted = diesel::sql_query(
562            "INSERT INTO ledger_events
563             (wallet, event_ts_ms, delta, event_type, reference_trade_id, reference_symbol)
564             VALUES ($1, $2, $3, $4, $5, $6)
565             ON CONFLICT (wallet, reference_trade_id, event_type)
566             WHERE reference_trade_id IS NOT NULL
567             DO NOTHING",
568        )
569        .bind::<Binary, _>(wallet)
570        .bind::<BigInt, _>(fill.timestamp as i64)
571        .bind::<Numeric, _>(premium_delta)
572        .bind::<Text, _>("fill_premium")
573        .bind::<BigInt, _>(fill.trade_id as i64)
574        .bind::<diesel::sql_types::Nullable<Text>, _>(Some(fill.symbol.as_str()))
575        .execute(conn)?;
576        assert_eq!(
577            inserted, 1,
578            "CRITICAL: missing fill_premium ledger event for trade {} wallet {}",
579            fill.trade_id, wallet
580        );
581    }
582
583    Ok(())
584}
585
586// ---------------------------------------------------------------------------
587// Portable fill side-effect struct
588// ---------------------------------------------------------------------------
589
590/// Journal-only accounting deltas for a single OrderFilled event.
591///
592/// This is the portable version of `JournalFillSideEffect` from the root
593/// crate's journal batcher.
594pub struct FillSideEffect {
595    pub trade_id: u64,
596    pub taker_ledger_delta: Decimal,
597    pub maker_ledger_delta: Decimal,
598    pub taker_premium_delta: Decimal,
599    pub maker_premium_delta: Decimal,
600    pub underlying_notional: Option<Decimal>,
601}
602
/// How strictly `underlying_notional` is checked when persisting an option
/// fill.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum FillUnderlyingNotionalValidation {
    /// Both the fill and its side effects must carry the value, and the two
    /// must agree.
    Strict,
    /// Either value may be absent; they must agree only when both are
    /// present.
    LegacyReplay,
}
608
609fn validate_fill_underlying_notional(
610    fill: &hypercall_types::Fill,
611    side_effect_underlying_notional: Option<Decimal>,
612    validation: FillUnderlyingNotionalValidation,
613) -> Result<Option<Decimal>> {
614    if !is_option_symbol(&fill.symbol) {
615        return Ok(fill.underlying_notional.or(side_effect_underlying_notional));
616    }
617
618    if validation == FillUnderlyingNotionalValidation::LegacyReplay {
619        if let (Some(fill_notional), Some(side_effect_notional)) =
620            (fill.underlying_notional, side_effect_underlying_notional)
621        {
622            anyhow::ensure!(
623                fill_notional == side_effect_notional,
624                "CRITICAL: option fill underlying_notional mismatch for trade {}: fill={} side_effect={}",
625                fill.trade_id,
626                fill_notional,
627                side_effect_notional
628            );
629        }
630        return Ok(fill.underlying_notional.or(side_effect_underlying_notional));
631    }
632
633    let fill_notional = fill.underlying_notional.ok_or_else(|| {
634        anyhow!(
635            "CRITICAL: option fill {} for trade {} is missing underlying_notional",
636            fill.symbol,
637            fill.trade_id
638        )
639    })?;
640    let side_effect_notional = side_effect_underlying_notional.ok_or_else(|| {
641        anyhow!(
642            "CRITICAL: option fill side effects for trade {} are missing underlying_notional",
643            fill.trade_id
644        )
645    })?;
646    anyhow::ensure!(
647        fill_notional == side_effect_notional,
648        "CRITICAL: option fill underlying_notional mismatch for trade {}: fill={} side_effect={}",
649        fill.trade_id,
650        fill_notional,
651        side_effect_notional
652    );
653    Ok(Some(fill_notional))
654}
655
656impl FillSideEffect {
657    pub fn from_fill_accounting(accounting: &hypercall_types::FillAccounting) -> Self {
658        accounting.assert_cash_decomposition();
659        Self {
660            trade_id: accounting.trade_id,
661            taker_ledger_delta: accounting.taker_ledger_residual_delta(),
662            maker_ledger_delta: accounting.maker_ledger_residual_delta(),
663            taker_premium_delta: accounting.taker_premium_delta(),
664            maker_premium_delta: accounting.maker_premium_delta(),
665            underlying_notional: None,
666        }
667    }
668}
669
670// ---------------------------------------------------------------------------
671// DatabaseHandler event methods
672// ---------------------------------------------------------------------------
673
674impl DatabaseHandler {
675    /// Upsert static order info into order_infos. Used by `handle_event_with_conn`
676    /// and the `OrderWriter` trait impl.
677    pub(crate) fn upsert_order_info(conn: &mut PgConnection, info: &NewOrderInfo) -> Result<()> {
678        use diesel::upsert::excluded;
679
680        diesel::insert_into(schema::order_infos::table)
681            .values(info)
682            .on_conflict(schema::order_infos::order_id)
683            .do_update()
684            .set((
685                schema::order_infos::wallet_address
686                    .eq(excluded(schema::order_infos::wallet_address)),
687                schema::order_infos::symbol.eq(excluded(schema::order_infos::symbol)),
688                schema::order_infos::side.eq(excluded(schema::order_infos::side)),
689                schema::order_infos::price.eq(excluded(schema::order_infos::price)),
690                schema::order_infos::size.eq(excluded(schema::order_infos::size)),
691                schema::order_infos::tif.eq(excluded(schema::order_infos::tif)),
692                schema::order_infos::client_id.eq(excluded(schema::order_infos::client_id)),
693                schema::order_infos::is_perp.eq(excluded(schema::order_infos::is_perp)),
694                schema::order_infos::underlying.eq(excluded(schema::order_infos::underlying)),
695                schema::order_infos::reduce_only.eq(excluded(schema::order_infos::reduce_only)),
696                schema::order_infos::nonce.eq(excluded(schema::order_infos::nonce)),
697                schema::order_infos::signature.eq(excluded(schema::order_infos::signature)),
698                schema::order_infos::mmp_enabled.eq(excluded(schema::order_infos::mmp_enabled)),
699                schema::order_infos::timestamp.eq(excluded(schema::order_infos::timestamp)),
700            ))
701            .execute(conn)?;
702        Ok(())
703    }
704
705    /// Insert an order action audit trail entry. Used by `handle_event_with_conn`
706    /// and the `OrderWriter` trait impl.
707    pub(crate) fn insert_order_action(
708        conn: &mut PgConnection,
709        action: &NewOrderAction,
710    ) -> Result<()> {
711        diesel::insert_into(schema::order_actions::table)
712            .values(action)
713            .execute(conn)?;
714        Ok(())
715    }
716
717    /// Persist a single engine event, acquiring a connection from the pool.
718    pub fn handle_event_sync(&self, event: &EngineMessage) -> Result<()> {
719        let mut conn = self.pool().get()?;
720        self.handle_event_with_conn(&mut conn, event)
721    }
722
723    /// Persist a batch of engine events in a single transaction.
724    pub fn handle_event_batch_sync(&self, events: &[EngineMessage]) -> Result<()> {
725        let mut conn = self.pool().get()?;
726
727        (*conn).transaction::<_, anyhow::Error, _>(|conn| {
728            for event in events {
729                self.handle_event_with_conn(conn, event)?;
730            }
731            Ok(())
732        })?;
733
734        Ok(())
735    }
736
737    /// Persist a single engine event using the provided connection.
738    pub fn handle_event_with_conn(
739        &self,
740        conn: &mut PgConnection,
741        event: &EngineMessage,
742    ) -> Result<()> {
743        match event {
744            EngineMessage::OrderInfo(order_info_msg) => {
745                let new_order_info = NewOrderInfo {
746                    order_id: order_info_msg.order_id as i64,
747                    wallet_address: order_info_msg.wallet,
748                    symbol: order_info_msg.info.symbol.clone(),
749                    side: format!("{:?}", order_info_msg.info.side),
750                    price: order_info_msg.info.price,
751                    size: order_info_msg.info.size,
752                    tif: format!("{:?}", order_info_msg.info.tif),
753                    client_id: order_info_msg.info.client_id.clone(),
754                    is_perp: order_info_msg.info.is_perp,
755                    underlying: order_info_msg.info.underlying.clone(),
756                    reduce_only: order_info_msg.info.reduce_only,
757                    nonce: order_info_msg.info.nonce.map(|n| n as i64),
758                    signature: order_info_msg.info.signature.clone(),
759                    mmp_enabled: order_info_msg.info.mmp_enabled,
760                    timestamp: order_info_msg.timestamp as i64,
761                    status: "ACKED".to_string(),
762                    filled_size: Decimal::ZERO,
763                    last_materialized_order_update_id: 0,
764                };
765                Self::upsert_order_info(conn, &new_order_info)?;
766            }
767            EngineMessage::OrderAction(action) => {
768                let new_action = NewOrderAction {
769                    timestamp: action.timestamp as i64,
770                    wallet: action.wallet,
771                    action: format!("{:?}", action.action),
772                    symbol: action.info.symbol.clone(),
773                    price: action.info.price,
774                    size: action.info.size,
775                    side: format!("{:?}", action.info.side),
776                    tif: format!("{:?}", action.info.tif),
777                    client_id: action.info.client_id.clone(),
778                };
779                Self::insert_order_action(conn, &new_action)?;
780            }
781            EngineMessage::OrderUpdate(update) => {
782                let current_status = Self::order_update_status_to_db(update.status).to_string();
783
784                // Store the raw order update
785                let new_order_update = NewOrderUpdate {
786                    timestamp: update.timestamp as i64,
787                    order_id: update.order_id.map(|id| id as i64),
788                    status: current_status.clone(),
789                    reason: update.reason.clone(),
790                    filled_size: update.filled_size,
791                    symbol: update.info.symbol.clone(),
792                    price: update.info.price,
793                    size: update.info.size,
794                    side: format!("{:?}", update.info.side),
795                    tif: format!("{:?}", update.info.tif),
796                    client_id: update.info.client_id.clone(),
797                };
798
799                let order_update_id = diesel::insert_into(schema::order_updates::table)
800                    .values(&new_order_update)
801                    .returning(schema::order_updates::id)
802                    .get_result::<Option<i32>>(conn)?
803                    .expect("order_updates.id should be populated after insert");
804
805                if let Some(order_id) = update.order_id {
806                    let materialized_order_info = NewOrderInfo {
807                        order_id: order_id as i64,
808                        wallet_address: update.wallet_address,
809                        symbol: update.info.symbol.clone(),
810                        side: format!("{:?}", update.info.side),
811                        price: update.info.price,
812                        size: update.info.size,
813                        tif: format!("{:?}", update.info.tif),
814                        client_id: update.info.client_id.clone(),
815                        is_perp: update.info.is_perp,
816                        underlying: update.info.underlying.clone(),
817                        reduce_only: update.info.reduce_only,
818                        nonce: update.info.nonce.map(|n| n as i64),
819                        signature: update.info.signature.clone(),
820                        mmp_enabled: update.info.mmp_enabled,
821                        timestamp: update.timestamp as i64,
822                        status: current_status,
823                        filled_size: update.filled_size,
824                        last_materialized_order_update_id: order_update_id,
825                    };
826
827                    upsert_materialized_order_state(conn, &materialized_order_info)?;
828                }
829
830                // Handle rejected orders
831                if let OrderUpdateStatus::Rejected = update.status {
832                    let new_rejected = NewRejectedOrder {
833                        wallet_address: update.wallet_address,
834                        symbol: update.info.symbol.clone(),
835                        side: format!("{:?}", update.info.side),
836                        price: update.info.price,
837                        size: update.info.size,
838                        reason: update
839                            .reason
840                            .clone()
841                            .unwrap_or_else(|| "Unknown".to_string()),
842                        timestamp: update.timestamp as i64,
843                    };
844
845                    diesel::insert_into(schema::rejected_orders::table)
846                        .values(&new_rejected)
847                        .execute(conn)?;
848                }
849            }
850            EngineMessage::OrderFilled { .. } => {
851                // No-op: fills and trades are now written inline by the journal
852                // batcher (insert_batch) in the same transaction as engine_commands
853                // and engine_events. Publishing of OrderFilled events to the event bus
854                // still happens so market_stats_cache receives real-time volume updates.
855            }
856            EngineMessage::OrderbookUpdated(_update) => {
857                // OrderbookUpdated events are no longer persisted (snapshot tables removed)
858            }
859            EngineMessage::MarketAction(action) => {
860                // Store the market action
861                let new_action = NewMarketAction {
862                    timestamp: action.timestamp as i64,
863                    action: format!("{:?}", action.action),
864                    symbol: action.market.symbol.clone(),
865                    underlying: action.market.underlying.clone(),
866                    expiry: action.market.expiry as i64,
867                    strike: action.market.strike,
868                    option_type: match action.market.option_type {
869                        OptionType::Call => "call".to_string(),
870                        OptionType::Put => "put".to_string(),
871                    },
872                };
873
874                diesel::insert_into(schema::market_actions::table)
875                    .values(&new_action)
876                    .execute(conn)?;
877            }
878            EngineMessage::MarketUpdate(update) => {
879                // Store the raw market update
880                let new_market_update = NewMarketUpdate {
881                    timestamp: update.timestamp as i64,
882                    status: format!("{:?}", update.status),
883                    symbol: update.market.symbol.clone(),
884                    underlying: update.market.underlying.clone(),
885                    expiry: update.market.expiry as i64,
886                    strike: update.market.strike,
887                    option_type: match update.market.option_type {
888                        OptionType::Call => "call".to_string(),
889                        OptionType::Put => "put".to_string(),
890                    },
891                };
892
893                diesel::insert_into(schema::market_updates::table)
894                    .values(&new_market_update)
895                    .execute(conn)?;
896
897                match update.status {
898                    MarketUpdateStatus::MarketCreated | MarketUpdateStatus::MarketAlreadyExists => {
899                        // Insert market if not exists (idempotent - ON CONFLICT DO NOTHING)
900                        let new_market = Market {
901                            underlying: update.market.underlying.clone(),
902                            expiry: update.market.expiry as i64,
903                        };
904
905                        // Use INSERT ON CONFLICT DO NOTHING for PostgreSQL
906                        diesel::sql_query(
907                            "INSERT INTO markets (underlying, expiry) VALUES ($1, $2) ON CONFLICT (underlying, expiry) DO NOTHING"
908                        )
909                        .bind::<diesel::sql_types::Text, _>(&new_market.underlying)
910                        .bind::<diesel::sql_types::BigInt, _>(new_market.expiry)
911                        .execute(conn)?;
912
913                        // Insert instrument
914                        let option_type = match update.market.option_type {
915                            OptionType::Call => "call",
916                            OptionType::Put => "put",
917                        };
918
919                        let instrument_strike = update.market.strike;
920                        let deployment = current_option_token_deployment()?;
921                        let option_token_address = hypercall_types::derive_option_token_address(
922                            deployment,
923                            &update.market.underlying,
924                            update.market.expiry,
925                            instrument_strike,
926                            option_type,
927                        )?;
928
929                        let new_instrument = Instrument {
930                            instrument_numeric_id: 0,
931                            id: update.market.symbol.clone(),
932                            underlying: update.market.underlying.clone(),
933                            strike: instrument_strike,
934                            expiry: update.market.expiry as i64,
935                            option_type: option_type.to_string(),
936                            option_token_address: Some(option_token_address),
937                            status: "ACTIVE".to_string(),
938                            trading_mode: "orderbook".to_string(),
939                        };
940
941                        // Use INSERT ON CONFLICT DO NOTHING for PostgreSQL
942                        if let Err(err) = diesel::sql_query(
943                            "INSERT INTO instruments (id, underlying, strike, expiry, option_type, status, option_token_address, trading_mode) VALUES ($1, $2, $3, $4, $5, $6, $7, $8) ON CONFLICT (id) DO NOTHING"
944                        )
945                        .bind::<diesel::sql_types::Text, _>(&new_instrument.id)
946                        .bind::<diesel::sql_types::Text, _>(&new_instrument.underlying)
947                        .bind::<diesel::sql_types::Numeric, _>(instrument_strike)
948                        .bind::<diesel::sql_types::BigInt, _>(new_instrument.expiry)
949                        .bind::<diesel::sql_types::Text, _>(&new_instrument.option_type)
950                        .bind::<diesel::sql_types::Text, _>(&new_instrument.status)
951                        .bind::<diesel::sql_types::Bytea, _>(option_token_address)
952                        .bind::<diesel::sql_types::Text, _>(&new_instrument.trading_mode)
953                        .execute(conn) {
954                            Self::observe_diesel_option_token_violation(&err);
955                            return Err(err.into());
956                        }
957                    }
958                    MarketUpdateStatus::MarketDeleted | MarketUpdateStatus::MarketExpired => {
959                        diesel::delete(schema::instruments::table)
960                            .filter(schema::instruments::id.eq(&update.market.symbol))
961                            .execute(conn)?;
962                    }
963                    MarketUpdateStatus::MarketPendingSettlement => {
964                        // Transition-to-pending updates instrument status directly via ExpiryManager.
965                        // Keep instrument row until settlement completes.
966                    }
967                    MarketUpdateStatus::MarketCreationFailed
968                    | MarketUpdateStatus::MarketDeletionFailed => {
969                        // Market lifecycle update failed - no database mutation needed here.
970                        // No database action needed
971                    }
972                }
973            }
974            EngineMessage::L2Update(_) => {
975                // L2 updates are incremental orderbook changes - not persisted here
976            }
977            EngineMessage::Trade(_trade_msg) => {
978                // Trade messages are public broadcast messages
979                // We store them but they don't have buyer/seller info
980                // The actual trade with buyer/seller is handled in OrderFilled
981                // For now, we skip this as the real trade is saved in OrderFilled
982            }
983            EngineMessage::TransactionRequest(tx_req) => {
984                // TODO: Store transaction requests in database
985                tracing::info!("Persisting transaction request: {}", tx_req.request_id);
986            }
987            EngineMessage::TransactionUpdate(tx_update) => {
988                Self::persist_transaction_update(conn, tx_update)?;
989                tracing::info!(
990                    "Persisting transaction update: {} -> {:?}",
991                    tx_update.request_id,
992                    tx_update.status
993                );
994            }
995            EngineMessage::MmpTriggered(mmp_msg) => {
996                tracing::warn!(
997                    "Persisting MMP trigger: wallet={}, currency={}, reason={}",
998                    mmp_msg.wallet,
999                    mmp_msg.currency,
1000                    mmp_msg.reason
1001                );
1002            }
1003            EngineMessage::PositionExpired(expiry_msg) => {
1004                // Idempotent settlement: try to insert, skip if already exists
1005                // This prevents double-crediting on replay/restart
1006                let intent =
1007                    hypercall_settlement::SettlementPersistenceIntent::from_position_expired(
1008                        expiry_msg,
1009                    )
1010                    .map_err(|e| anyhow!("{}", e))?;
1011                let context = format!("{}/{}", expiry_msg.wallet_address, expiry_msg.symbol);
1012                let settlement_entry_price = intent
1013                    .economics
1014                    .map(|economics| economics.settlement_entry_price);
1015                let cost_basis = intent.economics.map(|economics| economics.cost_basis);
1016                let net_pnl = intent.economics.map(|economics| economics.net_pnl);
1017
1018                let new_expiration = NewPositionExpiration {
1019                    wallet: intent.wallet,
1020                    symbol: intent.symbol.clone(),
1021                    expiry_ts: intent.expiry_ts,
1022                    settlement_price: intent.settlement_price,
1023                    settlement_value: intent.settlement_value,
1024                };
1025
1026                let payout = NewSettlementPayout {
1027                    wallet: intent.wallet,
1028                    symbol: intent.symbol.clone(),
1029                    expiry_ts: intent.expiry_ts,
1030                    position_size: intent.position_size,
1031                    settlement_price: intent.settlement_price,
1032                    payout_amount: intent.settlement_value,
1033                    ledger_applied: true,
1034                    settlement_entry_price,
1035                    cost_basis,
1036                    net_pnl,
1037                };
1038
1039                let settlement_cashflow =
1040                    crate::settlement_ops::settlement_ledger_delta_for_margin_mode(
1041                        intent.margin_mode,
1042                        intent.settlement_value,
1043                        net_pnl,
1044                        &context,
1045                    )?;
1046
1047                let outcome = conn.transaction::<_, anyhow::Error, _>(|tx_conn| {
1048                    crate::settlement_ops::claim_settlement_in_tx(
1049                        tx_conn,
1050                        &new_expiration,
1051                        &payout,
1052                        settlement_cashflow,
1053                        expiry_msg.timestamp as i64,
1054                    )
1055                })?;
1056
1057                if !outcome.newly_persisted {
1058                    tracing::info!(
1059                        "Settlement already applied for {}/{}, skipping (idempotency)",
1060                        expiry_msg.wallet_address,
1061                        expiry_msg.symbol
1062                    );
1063                } else {
1064                    tracing::info!(
1065                        "Settlement persisted: wallet={}, symbol={}, size={}, payout={}",
1066                        expiry_msg.wallet_address,
1067                        expiry_msg.symbol,
1068                        expiry_msg.position_size,
1069                        expiry_msg.settlement_value
1070                    );
1071                }
1072            }
1073            EngineMessage::TierUpdate(_tier_msg) => {
1074                // TierUpdates are for cross-process cache sync.
1075                // DB is already updated via tier_cache.set_margin_mode() before publishing.
1076                // No persistence action needed here.
1077            }
1078            EngineMessage::HypercorePositionUpdate(update) => {
1079                // HypercorePositionUpdate is handled by PortfolioService, not persisted here
1080                tracing::debug!(
1081                    "DieselHandler: Hypercore position update - account={}, coin={}, size={}",
1082                    update.account,
1083                    update.coin,
1084                    update.size
1085                );
1086            }
1087            EngineMessage::LiquidationStateChange(liq_msg) => {
1088                if let Some(mut auction_row) = liquidation_auction_from_status(&liq_msg.status)? {
1089                    if let Some(existing_auction) =
1090                        self.get_liquidation_auction_with_conn(conn, &auction_row.auction_id)?
1091                    {
1092                        if auction_row.stop_request_id.is_none() {
1093                            auction_row.stop_request_id = existing_auction.stop_request_id.clone();
1094                        }
1095                        if auction_row.stop_tx_hash.is_none() {
1096                            auction_row.stop_tx_hash = existing_auction.stop_tx_hash.clone();
1097                        }
1098                        if auction_row.last_observed_block.is_none() {
1099                            auction_row.last_observed_block = existing_auction.last_observed_block;
1100                        }
1101                        // Preserve target_equity from the existing row when the
1102                        // new state (e.g. InLiquidation) does not carry it.
1103                        if auction_row.target_equity.is_none() {
1104                            auction_row.target_equity = existing_auction.target_equity;
1105                        }
1106
1107                        if matches!(liq_msg.status.state, LiquidationState::Liquidated(..)) {
1108                            if auction_row.request_id.is_none() {
1109                                auction_row.request_id = existing_auction.request_id;
1110                            }
1111                            if auction_row.tx_hash.is_none() {
1112                                auction_row.tx_hash = existing_auction.tx_hash;
1113                            }
1114                            auction_row.started_at = existing_auction.started_at;
1115                            if auction_row.chain_start_time.is_none() {
1116                                auction_row.chain_start_time = existing_auction.chain_start_time;
1117                            }
1118                            if auction_row.margin_needed.is_none() {
1119                                auction_row.margin_needed = existing_auction.margin_needed;
1120                            }
1121                            if auction_row.settlement_value.is_none() {
1122                                auction_row.settlement_value = existing_auction.settlement_value;
1123                            }
1124                        }
1125                    }
1126                    self.upsert_liquidation_auction_with_conn(conn, &auction_row)?;
1127                } else if let Some(update) = liquidation_auction_update_from_message(liq_msg)? {
1128                    self.update_liquidation_auction_with_conn(
1129                        conn,
1130                        liq_msg
1131                            .auction_id
1132                            .as_deref()
1133                            .expect("auction update requires auction_id"),
1134                        &update,
1135                    )?;
1136                }
1137
1138                let domain_state = status_to_liquidation_state_record(&liq_msg.status);
1139                let state_row = new_liquidation_state_from_domain(&domain_state);
1140                self.save_liquidation_state_with_conn(conn, &state_row)?;
1141
1142                let should_persist_history =
1143                    liq_msg.previous_state != liq_msg.new_state || liq_msg.projection_changed;
1144                if should_persist_history {
1145                    let history_row = liquidation_history_from_message(liq_msg)?;
1146                    self.insert_liquidation_history_with_conn(conn, &history_row)?;
1147                }
1148                tracing::info!(
1149                    "DieselHandler: persisted liquidation state change - wallet={}, {} -> {}",
1150                    liq_msg.wallet,
1151                    liq_msg.previous_state,
1152                    liq_msg.new_state
1153                );
1154            }
1155            EngineMessage::RfqFilled(msg) => {
1156                tracing::info!(
1157                    "DieselHandler: RFQ filled - rfq_id={}, taker={}, qp={}",
1158                    msg.rfq_id,
1159                    msg.taker_wallet,
1160                    msg.qp_wallet
1161                );
1162                // Persist the executed RFQ fill so the durable audit trail
1163                // (rfq_id / quote_id / taker accept signature) survives
1164                // restart and can be queried from DB.
1165                diesel::sql_query(
1166                    "INSERT INTO rfq_fills (fill_id, rfq_id, quote_id, taker_wallet, qp_wallet, net_premium, taker_accept_signature)
1167                     VALUES ($1::uuid, $2::uuid, $3::uuid, $4, $5, $6, $7)
1168                     ON CONFLICT (rfq_id, quote_id) DO NOTHING",
1169                )
1170                .bind::<diesel::sql_types::Text, _>(msg.fill_id.as_str())
1171                .bind::<diesel::sql_types::Text, _>(msg.rfq_id.as_str())
1172                .bind::<diesel::sql_types::Text, _>(msg.quote_id.as_str())
1173                .bind::<diesel::sql_types::Binary, _>(msg.taker_wallet.as_bytes())
1174                .bind::<diesel::sql_types::Binary, _>(msg.qp_wallet.as_bytes())
1175                .bind::<diesel::sql_types::Numeric, _>(msg.net_premium)
1176                .bind::<diesel::sql_types::Text, _>(msg.taker_accept_signature.as_str())
1177                .execute(conn)?;
1178            }
1179        }
1180        Ok(())
1181    }
1182
1183    // ===== Batch cancel for expired orders =====
1184
1185    /// Batch-insert CANCELED rows into order_updates for orders on expired instruments.
1186    /// Skips orders that already have a terminal status (CANCELED, FILLED, REJECTED)
1187    /// in order_infos for idempotency.
1188    ///
1189    /// Runs as a single serializable transaction with retry.
1190    pub fn batch_cancel_expired_orders_sync(
1191        &self,
1192        cancels: &[ExpiredOrderCancel],
1193        reason: &str,
1194    ) -> Result<usize> {
1195        if cancels.is_empty() {
1196            return Ok(0);
1197        }
1198
1199        let mut conn = self.pool().get()?;
1200
1201        let now_ms = std::time::SystemTime::now()
1202            .duration_since(std::time::UNIX_EPOCH)
1203            .map_err(|e| {
1204                anyhow::anyhow!(
1205                    "System clock before UNIX_EPOCH during startup cancel flush: {}",
1206                    e
1207                )
1208            })?
1209            .as_millis() as i64;
1210
1211        const SERIALIZATION_RETRY_LIMIT: usize = 3;
1212        let candidate_ids: Vec<i64> = cancels.iter().map(|c| c.order_id as i64).collect();
1213
1214        #[derive(QueryableByName)]
1215        struct TerminalId {
1216            #[diesel(sql_type = diesel::sql_types::BigInt)]
1217            order_id: i64,
1218        }
1219
1220        #[derive(Queryable)]
1221        struct InsertedOrderUpdateId {
1222            id: Option<i32>,
1223            order_id: Option<i64>,
1224        }
1225
1226        for attempt in 0..SERIALIZATION_RETRY_LIMIT {
1227            let tx_result = conn
1228                .build_transaction()
1229                .serializable()
1230                .run::<_, diesel::result::Error, _>(|conn| {
1231                    // Read terminal state and synthetic-cancel writes in the same
1232                    // serializable transaction so a concurrent real terminal write
1233                    // forces a retry instead of silently appending a stale cancel.
1234                    let terminal_ids: std::collections::HashSet<i64> = diesel::sql_query(
1235                        "SELECT order_id FROM ( \
1236                             SELECT oi.order_id \
1237                             FROM order_infos oi \
1238                             WHERE oi.order_id = ANY($1) \
1239                             AND oi.status IN ('CANCELED', 'FILLED', 'REJECTED') \
1240                             UNION \
1241                             SELECT ou.order_id \
1242                             FROM order_updates ou \
1243                             WHERE ou.order_id = ANY($1) \
1244                             AND ou.status IN ('CANCELED', 'FILLED', 'REJECTED') \
1245                         ) terminal_orders",
1246                    )
1247                    .bind::<diesel::sql_types::Array<BigInt>, _>(&candidate_ids)
1248                    .load::<TerminalId>(conn)?
1249                    .into_iter()
1250                    .map(|r| r.order_id)
1251                    .collect();
1252
1253                    let active_cancels: Vec<&ExpiredOrderCancel> = cancels
1254                        .iter()
1255                        .filter(|c| !terminal_ids.contains(&(c.order_id as i64)))
1256                        .collect();
1257
1258                    let new_updates: Vec<NewOrderUpdate> = active_cancels
1259                        .iter()
1260                        .map(|cancel| NewOrderUpdate {
1261                            timestamp: now_ms,
1262                            order_id: Some(cancel.order_id as i64),
1263                            status: "CANCELED".to_string(),
1264                            reason: Some(reason.to_string()),
1265                            filled_size: cancel.filled_size,
1266                            symbol: cancel.symbol.clone(),
1267                            price: cancel.price,
1268                            size: cancel.size,
1269                            side: format!("{:?}", cancel.side),
1270                            tif: "GTC".to_string(),
1271                            client_id: cancel.client_id.clone(),
1272                        })
1273                        .collect();
1274
1275                    if new_updates.is_empty() {
1276                        return Ok(0);
1277                    }
1278
1279                    let inserted_updates: Vec<InsertedOrderUpdateId> =
1280                        diesel::insert_into(schema::order_updates::table)
1281                            .values(&new_updates)
1282                            .returning((schema::order_updates::id, schema::order_updates::order_id))
1283                            .get_results(conn)?;
1284
1285                    let inserted = inserted_updates.len();
1286
1287                    let inserted_update_ids: std::collections::HashMap<i64, i32> =
1288                        inserted_updates
1289                            .into_iter()
1290                            .map(|row| {
1291                                let order_id = row.order_id.expect(
1292                                    "startup expired cancel insert returned NULL order_id; persisted data invariant broken",
1293                                );
1294                                let update_id = row.id.expect(
1295                                    "startup expired cancel insert returned NULL update id; persisted data invariant broken",
1296                                );
1297                                (order_id, update_id)
1298                            })
1299                            .collect();
1300
1301                    // Keep the materialized hard-cutover state in sync so replayed
1302                    // startup cancels remain idempotent even before any later
1303                    // reader/backfill pass.
1304                    for cancel in active_cancels {
1305                        let order_update_id = *inserted_update_ids
1306                            .get(&(cancel.order_id as i64))
1307                            .expect("missing inserted startup cancel update id for order");
1308
1309                        let materialized_order_info = NewOrderInfo {
1310                            order_id: cancel.order_id as i64,
1311                            wallet_address: cancel.wallet,
1312                            symbol: cancel.symbol.clone(),
1313                            side: format!("{:?}", cancel.side),
1314                            price: cancel.price,
1315                            size: cancel.size,
1316                            tif: format!("{:?}", cancel.tif),
1317                            client_id: cancel.client_id.clone(),
1318                            is_perp: cancel.is_perp,
1319                            underlying: cancel.underlying.clone(),
1320                            reduce_only: cancel.reduce_only,
1321                            nonce: cancel.nonce.map(|nonce| nonce as i64),
1322                            signature: cancel.signature.clone(),
1323                            mmp_enabled: cancel.mmp_enabled,
1324                            timestamp: cancel.timestamp as i64,
1325                            status: "CANCELED".to_string(),
1326                            filled_size: cancel.filled_size,
1327                            last_materialized_order_update_id: order_update_id,
1328                        };
1329
1330                        upsert_materialized_order_state(conn, &materialized_order_info)?;
1331                    }
1332
1333                    Ok(inserted)
1334                });
1335
1336            match tx_result {
1337                Ok(inserted) => return Ok(inserted),
1338                Err(diesel::result::Error::DatabaseError(
1339                    diesel::result::DatabaseErrorKind::SerializationFailure,
1340                    _,
1341                )) if attempt + 1 < SERIALIZATION_RETRY_LIMIT => {
1342                    tracing::warn!(
1343                        attempt = attempt + 1,
1344                        "Retrying startup expired cancel batch after serialization failure"
1345                    );
1346                }
1347                Err(err) => return Err(err.into()),
1348            }
1349        }
1350
1351        Err(anyhow::anyhow!(
1352            "startup expired cancel batch exceeded serialization retry budget"
1353        ))
1354    }
1355
1356    // ===== Liquidation CRUD helpers (sync _with_conn for engine event loop) =====
1357
1358    fn save_liquidation_state_with_conn(
1359        &self,
1360        conn: &mut PgConnection,
1361        state: &NewLiquidationState,
1362    ) -> Result<()> {
1363        diesel::insert_into(schema::liquidation_states::table)
1364            .values(state)
1365            .on_conflict(schema::liquidation_states::wallet_address)
1366            .do_update()
1367            .set(state)
1368            .execute(conn)?;
1369        Ok(())
1370    }
1371
1372    fn insert_liquidation_history_with_conn(
1373        &self,
1374        conn: &mut PgConnection,
1375        entry: &NewLiquidationHistory,
1376    ) -> Result<i64> {
1377        use schema::liquidation_history::dsl as lh;
1378
1379        let inserted_id = diesel::insert_into(schema::liquidation_history::table)
1380            .values(entry)
1381            .on_conflict_do_nothing()
1382            .returning(lh::id)
1383            .get_result::<i64>(conn)
1384            .optional()?
1385            .unwrap_or(0);
1386
1387        Ok(inserted_id)
1388    }
1389
1390    fn upsert_liquidation_auction_with_conn(
1391        &self,
1392        conn: &mut PgConnection,
1393        auction: &NewLiquidationAuction,
1394    ) -> Result<()> {
1395        let query = diesel::insert_into(schema::liquidation_auctions::table)
1396            .values(auction)
1397            .on_conflict(schema::liquidation_auctions::auction_id)
1398            .do_update();
1399
1400        if auction.completed_at.is_some() {
1401            query
1402                .set((
1403                    schema::liquidation_auctions::status.eq(&auction.status),
1404                    schema::liquidation_auctions::positions.eq(&auction.positions),
1405                    schema::liquidation_auctions::target_equity.eq(auction.target_equity),
1406                    schema::liquidation_auctions::request_id.eq(&auction.request_id),
1407                    schema::liquidation_auctions::tx_hash.eq(&auction.tx_hash),
1408                    schema::liquidation_auctions::chain_start_time.eq(auction.chain_start_time),
1409                    schema::liquidation_auctions::margin_needed.eq(auction.margin_needed),
1410                    schema::liquidation_auctions::stop_request_id.eq(&auction.stop_request_id),
1411                    schema::liquidation_auctions::stop_tx_hash.eq(&auction.stop_tx_hash),
1412                    schema::liquidation_auctions::completed_at.eq(auction.completed_at),
1413                    schema::liquidation_auctions::liquidator_address.eq(auction.liquidator_address),
1414                    schema::liquidation_auctions::bonus.eq(auction.bonus),
1415                    schema::liquidation_auctions::settlement_value.eq(auction.settlement_value),
1416                    schema::liquidation_auctions::last_observed_block
1417                        .eq(auction.last_observed_block),
1418                ))
1419                .execute(conn)?;
1420        } else {
1421            query
1422                .set((
1423                    schema::liquidation_auctions::status.eq(&auction.status),
1424                    schema::liquidation_auctions::positions.eq(&auction.positions),
1425                    schema::liquidation_auctions::target_equity.eq(auction.target_equity),
1426                    schema::liquidation_auctions::request_id.eq(&auction.request_id),
1427                    schema::liquidation_auctions::tx_hash.eq(&auction.tx_hash),
1428                    schema::liquidation_auctions::started_at.eq(auction.started_at),
1429                    schema::liquidation_auctions::chain_start_time.eq(auction.chain_start_time),
1430                    schema::liquidation_auctions::margin_needed.eq(auction.margin_needed),
1431                    schema::liquidation_auctions::stop_request_id.eq(&auction.stop_request_id),
1432                    schema::liquidation_auctions::stop_tx_hash.eq(&auction.stop_tx_hash),
1433                    schema::liquidation_auctions::completed_at.eq(auction.completed_at),
1434                    schema::liquidation_auctions::liquidator_address.eq(auction.liquidator_address),
1435                    schema::liquidation_auctions::bonus.eq(auction.bonus),
1436                    schema::liquidation_auctions::settlement_value.eq(auction.settlement_value),
1437                    schema::liquidation_auctions::last_observed_block
1438                        .eq(auction.last_observed_block),
1439                ))
1440                .execute(conn)?;
1441        }
1442        Ok(())
1443    }
1444
1445    fn update_liquidation_auction_with_conn(
1446        &self,
1447        conn: &mut PgConnection,
1448        auction_id: &str,
1449        update: &UpdateLiquidationAuction,
1450    ) -> Result<()> {
1451        diesel::update(
1452            schema::liquidation_auctions::table
1453                .filter(schema::liquidation_auctions::auction_id.eq(auction_id)),
1454        )
1455        .set(update)
1456        .execute(conn)?;
1457
1458        Ok(())
1459    }
1460
1461    fn get_liquidation_auction_with_conn(
1462        &self,
1463        conn: &mut PgConnection,
1464        auction_id: &str,
1465    ) -> Result<Option<LiquidationAuctionRecord>> {
1466        let result = schema::liquidation_auctions::table
1467            .filter(schema::liquidation_auctions::auction_id.eq(auction_id))
1468            .first::<LiquidationAuctionRecord>(conn)
1469            .optional()?;
1470
1471        Ok(result)
1472    }
1473
1474    /// Update directive outbox status from a transaction lifecycle event.
1475    /// Withdrawal failures are not auto-refunded here. Until withdrawal proofs
1476    /// are user-driven, terminal withdrawal states require manual reconciliation.
1477    fn persist_transaction_update(
1478        conn: &mut PgConnection,
1479        tx_update: &hypercall_types::engine_messages::TransactionUpdate,
1480    ) -> Result<()> {
1481        use diesel::sql_types::{Nullable, Text};
1482        use hypercall_types::engine_messages::TransactionStatus;
1483
1484        let (domain_status, delivery_status) = match tx_update.status {
1485            TransactionStatus::Submitted => (None, Some("pending")),
1486            TransactionStatus::Pending => (None, Some("pending")),
1487            TransactionStatus::Confirmed => (Some("completed"), Some("finalized")),
1488            TransactionStatus::Failed => (Some("failed"), Some("reverted")),
1489            TransactionStatus::Expired => (Some("failed"), Some("expired")),
1490        };
1491
1492        if let Some(domain_status) = domain_status {
1493            let delivery_error = tx_update.error.as_deref();
1494
1495            #[derive(diesel::QueryableByName)]
1496            struct ActionKeyRow {
1497                #[diesel(sql_type = crate::engine_enums::DirectiveActionKeySql)]
1498                action_key: crate::engine_enums::DirectiveActionKeyDb,
1499            }
1500
1501            let updated_row: Option<ActionKeyRow> = diesel::sql_query(
1502                r#"
1503                WITH updated_outbox AS (
1504                    UPDATE directive_outbox
1505                    SET domain_status = $2,
1506                        delivery_status = $3,
1507                        tx_hash = COALESCE($4, tx_hash),
1508                        last_delivery_error = $5,
1509                        updated_at = CURRENT_TIMESTAMP
1510                    WHERE directive_id = $1
1511                      AND delivery_status NOT IN ('finalized', 'reverted', 'expired', 'dead_lettered')
1512                    RETURNING directive_id, action_key
1513                )
1514                SELECT action_key FROM updated_outbox
1515                "#,
1516            )
1517            .bind::<Text, _>(&tx_update.request_id)
1518            .bind::<Text, _>(domain_status)
1519            .bind::<Text, _>(delivery_status.expect("terminal update must have delivery status"))
1520            .bind::<Nullable<Text>, _>(tx_update.tx_hash.as_deref())
1521            .bind::<Nullable<Text>, _>(delivery_error)
1522            .get_result(conn)
1523            .optional()?;
1524
1525            if matches!(
1526                tx_update.status,
1527                TransactionStatus::Failed | TransactionStatus::Expired
1528            ) {
1529                if let Some(row) = updated_row {
1530                    let action_key: hypercall_types::directives::ActionKey = row.action_key.into();
1531                    if matches!(
1532                        action_key,
1533                        hypercall_types::directives::ActionKey::SystemWithdrawToken
1534                            | hypercall_types::directives::ActionKey::SystemCreditOption
1535                    ) {
1536                        metrics::counter!(
1537                            "ht_withdrawal_manual_reconciliation_required_total",
1538                            "action_key" => action_key.as_str(),
1539                        )
1540                        .increment(1);
1541                        tracing::warn!(
1542                            directive_id = %tx_update.request_id,
1543                            action_key = ?action_key,
1544                            "Withdrawal directive reached terminal delivery status; automatic refunds are disabled and operator reconciliation is required"
1545                        );
1546                    }
1547                }
1548            }
1549        } else if let Some(delivery_status) = delivery_status {
1550            diesel::sql_query(
1551                r#"
1552                UPDATE directive_outbox
1553                    SET delivery_status = $2,
1554                        tx_hash = COALESCE($3, tx_hash),
1555                    last_delivery_error = $4,
1556                    updated_at = CURRENT_TIMESTAMP
1557                WHERE directive_id = $1
1558                  AND delivery_status NOT IN ('finalized', 'reverted', 'expired', 'dead_lettered')
1559                "#,
1560            )
1561            .bind::<Text, _>(&tx_update.request_id)
1562            .bind::<Text, _>(delivery_status)
1563            .bind::<Nullable<Text>, _>(tx_update.tx_hash.as_deref())
1564            .bind::<Nullable<Text>, _>(tx_update.error.as_deref())
1565            .execute(conn)?;
1566        }
1567
1568        Ok(())
1569    }
1570}
1571
1572#[cfg(test)]
1573#[path = "event_handler_tests.rs"]
1574mod tests;