1use anyhow::{anyhow, Result};
9use diesel::pg::PgConnection;
10use diesel::prelude::*;
11use diesel::sql_types::{BigInt, Numeric};
12use diesel::OptionalExtension;
13use rust_decimal::Decimal;
14
15use hypercall_types::engine_messages::{
16 EngineMessage, LiquidationStateMessage, LiquidationStateType,
17};
18use hypercall_types::liquidation_state::{AccountLiquidationStatus, LiquidationState};
19use hypercall_types::{
20 utils::is_option_symbol, MarketUpdateStatus, OptionType, OrderUpdateStatus, WalletAddress,
21};
22
23use crate::database_handler::{current_option_token_deployment, DatabaseHandler};
24use crate::models::*;
25use crate::order_status_materialization::upsert_materialized_order_state;
26use crate::schema;
27
/// Plain data carrier with the order fields needed to record the cancellation
/// of one expired order.
///
/// NOTE(review): the purpose is inferred from the type name; nothing in this
/// part of the file constructs or consumes the struct — confirm against callers.
pub struct ExpiredOrderCancel {
    /// Identifier of the order being cancelled.
    pub order_id: u64,
    /// Wallet associated with the order.
    pub wallet: WalletAddress,
    /// Instrument symbol of the order.
    pub symbol: String,
    /// Order price.
    pub price: Decimal,
    /// Order size.
    pub size: Decimal,
    /// Order side.
    pub side: hypercall_types::Side,
    /// Time-in-force of the order.
    pub tif: hypercall_types::TimeInForce,
    /// Whether the order is for a perpetual instrument (per the field name).
    pub is_perp: bool,
    /// Underlying asset, when one is recorded for the order.
    pub underlying: Option<String>,
    /// Reduce-only flag, when one is recorded for the order.
    pub reduce_only: Option<bool>,
    /// Order nonce, when present.
    pub nonce: Option<u64>,
    /// Order signature, when present.
    pub signature: Option<String>,
    /// Whether MMP is enabled for the order (per the field name).
    pub mmp_enabled: bool,
    /// Presumably the quantity already filled at cancel time — TODO confirm.
    pub filled_size: Decimal,
    /// Client-supplied order identifier, when present.
    pub client_id: Option<String>,
    /// Timestamp of the cancel; unit is not visible here — TODO confirm.
    pub timestamp: u64,
}
51
52pub fn status_to_liquidation_state_record(
63 status: &AccountLiquidationStatus,
64) -> hypercall_db::LiquidationStateRecord {
65 use serde_json::Value as JsonValue;
66
67 fn json_string_values(values: &[String]) -> Vec<JsonValue> {
68 values.iter().cloned().map(JsonValue::String).collect()
69 }
70
71 let mut record = hypercall_db::LiquidationStateRecord {
72 wallet_address: status.wallet,
73 state: status.state.db_str().to_string(),
74 margin_mode: status.margin_mode.as_str().to_string(),
75 equity: status.equity,
76 mm_required: status.mm_required,
77 maintenance_margin: status.maintenance_margin,
78 liquidation_mode: status
79 .liquidation_mode()
80 .map(|mode| mode.as_str().to_string()),
81 target_equity: None,
82 entered_pre_liq_at: None,
83 mm_shortfall: None,
84 escalation_deadline: None,
85 last_reprice_at: None,
86 partial_order_request_ids: None,
87 partial_order_client_ids: None,
88 partial_bonus_bps: None,
89 auction_id: None,
90 request_id: None,
91 tx_hash: None,
92 auction_started_at: None,
93 chain_start_time: None,
94 margin_needed: None,
95 stop_request_id: None,
96 stop_tx_hash: None,
97 liquidated_at: None,
98 resolved_winner: None,
99 resolved_bonus: None,
100 resolution_tx_hash: None,
101 last_observed_block: None,
102 updated_at_ms: Some(status.updated_at as i64),
103 created_at: None,
104 updated_at: None,
105 };
106
107 match &status.state {
108 LiquidationState::Healthy => {}
109 LiquidationState::PreLiquidation(metadata) => {
110 record.target_equity = Some(metadata.target_equity);
111 record.entered_pre_liq_at = Some(metadata.entered_at as i64);
112 record.mm_shortfall = Some(metadata.mm_shortfall);
113 record.escalation_deadline = Some(metadata.escalation_deadline as i64);
114 record.last_reprice_at = metadata.last_reprice_at.map(|value| value as i64);
115 record.partial_order_request_ids = Some(JsonValue::Array(json_string_values(
116 &metadata.active_order_request_ids,
117 )));
118 record.partial_order_client_ids = Some(JsonValue::Array(json_string_values(
119 &metadata.active_order_client_ids,
120 )));
121 record.partial_bonus_bps = Some(metadata.bonus_bps as i32);
122 record.auction_id = metadata.pending_full_auction_id.clone();
123 record.request_id = metadata.pending_full_request_id.clone();
124 record.tx_hash = metadata.pending_full_tx_hash.clone();
125 record.margin_needed = metadata.pending_full_margin_needed;
126 }
127 LiquidationState::InLiquidation(metadata) => {
128 record.auction_id = Some(metadata.auction_id.clone());
129 record.request_id = metadata.request_id.clone();
130 record.tx_hash = metadata.tx_hash.clone();
131 record.auction_started_at = Some(metadata.started_at as i64);
132 record.chain_start_time = metadata.chain_start_time.map(|value| value as i64);
133 record.margin_needed = Some(metadata.margin_needed);
134 record.stop_request_id = metadata.stop_request_id.clone();
135 record.stop_tx_hash = metadata.stop_tx_hash.clone();
136 }
137 LiquidationState::Liquidated(metadata) => {
138 record.auction_id = Some(metadata.auction_id.clone());
139 record.liquidated_at = Some(metadata.completed_at as i64);
140 record.resolved_winner = metadata.winner;
141 record.resolved_bonus = Some(metadata.bonus);
142 record.resolution_tx_hash = metadata.tx_hash.clone();
143 }
144 }
145
146 record
147}
148
149fn new_liquidation_state_from_domain(
151 d: &hypercall_db::LiquidationStateRecord,
152) -> NewLiquidationState {
153 NewLiquidationState {
154 wallet_address: d.wallet_address,
155 state: d.state.clone(),
156 margin_mode: d.margin_mode.clone(),
157 equity: d.equity,
158 mm_required: d.mm_required,
159 maintenance_margin: d.maintenance_margin,
160 liquidation_mode: d.liquidation_mode.clone(),
161 target_equity: d.target_equity,
162 entered_pre_liq_at: d.entered_pre_liq_at,
163 mm_shortfall: d.mm_shortfall,
164 escalation_deadline: d.escalation_deadline,
165 last_reprice_at: d.last_reprice_at,
166 partial_order_request_ids: d.partial_order_request_ids.clone(),
167 partial_order_client_ids: d.partial_order_client_ids.clone(),
168 partial_bonus_bps: d.partial_bonus_bps,
169 auction_id: d.auction_id.clone(),
170 request_id: d.request_id.clone(),
171 tx_hash: d.tx_hash.clone(),
172 auction_started_at: d.auction_started_at,
173 chain_start_time: d.chain_start_time,
174 margin_needed: d.margin_needed,
175 stop_request_id: d.stop_request_id.clone(),
176 stop_tx_hash: d.stop_tx_hash.clone(),
177 liquidated_at: d.liquidated_at,
178 resolved_winner: d.resolved_winner,
179 resolved_bonus: d.resolved_bonus,
180 resolution_tx_hash: d.resolution_tx_hash.clone(),
181 last_observed_block: d.last_observed_block,
182 updated_at_ms: d.updated_at_ms,
183 }
184}
185
186fn liquidation_history_from_message(
187 message: &LiquidationStateMessage,
188) -> Result<NewLiquidationHistory> {
189 let (request_id, tx_hash, margin_needed, winner_address, bonus) = match &message.status.state {
190 LiquidationState::Healthy => (None, None, None, None, None),
191 LiquidationState::PreLiquidation(metadata) => (
192 metadata.pending_full_request_id.clone(),
193 metadata.pending_full_tx_hash.clone(),
194 metadata.pending_full_margin_needed,
195 None,
196 None,
197 ),
198 LiquidationState::InLiquidation(metadata) => (
199 metadata.request_id.clone(),
200 metadata.tx_hash.clone(),
201 Some(metadata.margin_needed),
202 None,
203 None,
204 ),
205 LiquidationState::Liquidated(metadata) => (
206 None,
207 metadata.tx_hash.clone(),
208 None,
209 metadata.winner,
210 Some(metadata.bonus),
211 ),
212 };
213
214 let details = serde_json::to_value(&message.status)
215 .map_err(|e| anyhow!("failed to serialize liquidation status snapshot: {}", e))?;
216
217 Ok(NewLiquidationHistory {
218 wallet_address: message.wallet,
219 previous_state: message.previous_state.to_string(),
220 new_state: message.new_state.to_string(),
221 liquidation_mode: message.liquidation_mode.clone(),
222 equity: message.equity,
223 mm_required: message.mm_required,
224 maintenance_margin: message.maintenance_margin,
225 shortfall: message.shortfall,
226 auction_id: message.auction_id.clone(),
227 request_id,
228 tx_hash,
229 margin_needed,
230 winner_address,
231 bonus,
232 details,
233 timestamp: i64::try_from(message.timestamp).map_err(|_| {
234 anyhow!(
235 "liquidation timestamp {} exceeds i64 range",
236 message.timestamp
237 )
238 })?,
239 })
240}
241
242fn liquidation_auction_from_status(
243 status: &AccountLiquidationStatus,
244) -> Result<Option<NewLiquidationAuction>> {
245 match &status.state {
246 LiquidationState::PreLiquidation(metadata) => {
247 let Some(auction_id) = metadata.pending_full_auction_id.clone() else {
248 return Ok(None);
249 };
250 Ok(Some(NewLiquidationAuction {
251 auction_id,
252 wallet_address: status.wallet,
253 status: "pending".to_string(),
254 positions: serde_json::Value::Array(Vec::new()),
255 equity_at_start: status.equity,
256 mm_shortfall_at_start: status.shortfall(),
257 target_equity: Some(metadata.target_equity),
258 request_id: metadata.pending_full_request_id.clone(),
259 tx_hash: metadata.pending_full_tx_hash.clone(),
260 started_at: i64::try_from(metadata.entered_at).map_err(|_| {
261 anyhow!(
262 "pre-liquidation entered_at {} exceeds i64 range",
263 metadata.entered_at
264 )
265 })?,
266 chain_start_time: None,
267 margin_needed: metadata.pending_full_margin_needed,
268 stop_request_id: None,
269 stop_tx_hash: None,
270 completed_at: None,
271 liquidator_address: None,
272 bonus: None,
273 settlement_value: None,
274 last_observed_block: None,
275 }))
276 }
277 LiquidationState::InLiquidation(metadata) => Ok(Some(NewLiquidationAuction {
278 auction_id: metadata.auction_id.clone(),
279 wallet_address: status.wallet,
280 status: "active".to_string(),
281 positions: serde_json::Value::Array(Vec::new()),
282 equity_at_start: status.equity,
283 mm_shortfall_at_start: status.shortfall(),
284 target_equity: None,
285 request_id: metadata.request_id.clone(),
286 tx_hash: metadata.tx_hash.clone(),
287 started_at: i64::try_from(metadata.started_at).map_err(|_| {
288 anyhow!(
289 "auction started_at {} exceeds i64 range",
290 metadata.started_at
291 )
292 })?,
293 chain_start_time: metadata
294 .chain_start_time
295 .map(i64::try_from)
296 .transpose()
297 .map_err(|_| anyhow!("chain_start_time exceeds i64 range"))?,
298 margin_needed: Some(metadata.margin_needed),
299 stop_request_id: metadata.stop_request_id.clone(),
300 stop_tx_hash: metadata.stop_tx_hash.clone(),
301 completed_at: None,
302 liquidator_address: None,
303 bonus: None,
304 settlement_value: None,
305 last_observed_block: None,
306 })),
307 LiquidationState::Liquidated(metadata) => Ok(Some(NewLiquidationAuction {
308 auction_id: metadata.auction_id.clone(),
309 wallet_address: status.wallet,
310 status: "completed".to_string(),
311 positions: serde_json::Value::Array(Vec::new()),
312 equity_at_start: status.equity,
313 mm_shortfall_at_start: status.shortfall(),
314 target_equity: None,
315 request_id: None,
316 tx_hash: None,
317 started_at: 0,
323 chain_start_time: None,
324 margin_needed: None,
325 stop_request_id: None,
326 stop_tx_hash: None,
327 completed_at: Some(i64::try_from(metadata.completed_at).map_err(|_| {
328 anyhow!(
329 "liquidated completed_at {} exceeds i64 range",
330 metadata.completed_at
331 )
332 })?),
333 liquidator_address: metadata.winner,
334 bonus: Some(metadata.bonus),
335 settlement_value: None,
336 last_observed_block: None,
337 })),
338 _ => Ok(None),
339 }
340}
341
342fn liquidation_auction_update_from_message(
343 message: &LiquidationStateMessage,
344) -> Result<Option<UpdateLiquidationAuction>> {
345 if message.auction_id.is_none() {
346 return Ok(None);
347 }
348
349 let cancelled_update = || -> Result<Option<UpdateLiquidationAuction>> {
350 Ok(Some(UpdateLiquidationAuction {
351 status: Some("cancelled".to_string()),
352 request_id: None,
353 tx_hash: None,
354 chain_start_time: None,
355 margin_needed: None,
356 stop_request_id: None,
357 stop_tx_hash: None,
358 completed_at: Some(
359 i64::try_from(message.timestamp)
360 .map_err(|_| anyhow!("liquidation timestamp exceeds i64 range"))?,
361 ),
362 liquidator_address: None,
363 bonus: None,
364 settlement_value: None,
365 last_observed_block: None,
366 }))
367 };
368
369 match (&message.previous_state, &message.new_state) {
370 (LiquidationStateType::InLiquidation, LiquidationStateType::Healthy) => cancelled_update(),
371 (LiquidationStateType::PreLiquidation, LiquidationStateType::PreLiquidation)
376 if message.previous_liquidation_mode.as_deref() == Some("full")
377 && message.liquidation_mode.as_deref() != Some("full") =>
378 {
379 cancelled_update()
380 }
381 _ => Ok(None),
382 }
383}
384
385pub fn persist_fill_with_side_effects_in_tx(
395 conn: &mut PgConnection,
396 fill: &hypercall_types::Fill,
397 side_effects: &FillSideEffect,
398) -> Result<(bool, bool)> {
399 persist_fill_with_side_effects_in_tx_with_validation(
400 conn,
401 fill,
402 side_effects,
403 FillUnderlyingNotionalValidation::Strict,
404 )
405}
406
407pub fn persist_legacy_replay_fill_with_side_effects_in_tx(
410 conn: &mut PgConnection,
411 fill: &hypercall_types::Fill,
412 side_effects: &FillSideEffect,
413) -> Result<(bool, bool)> {
414 persist_fill_with_side_effects_in_tx_with_validation(
415 conn,
416 fill,
417 side_effects,
418 FillUnderlyingNotionalValidation::LegacyReplay,
419 )
420}
421
/// Shared implementation behind the public fill-persistence entry points.
///
/// Statements are issued on `conn` in this order:
///
/// 1. one `trades` row for the trade,
/// 2. the taker's `fills` row, followed by the taker's ledger events,
/// 3. the maker's `fills` row, followed by the maker's ledger events.
///
/// Every insert uses `ON CONFLICT ... DO NOTHING`. Ledger events for a side
/// are written only when that side's `fills` row was actually inserted by this
/// call, so a repeated call for an already-persisted side does not write its
/// ledger events again. This function does not open a transaction itself.
///
/// Returns `(taker_inserted, maker_inserted)`.
///
/// # Errors
/// Returns an error if the underlying-notional validation fails or if any
/// statement fails.
///
/// # Panics
/// Panics if `side_effects.trade_id` differs from `fill.trade_id`, and (inside
/// `apply_fill_ledger_side_effects`) if an expected ledger event row is not
/// inserted.
fn persist_fill_with_side_effects_in_tx_with_validation(
    conn: &mut PgConnection,
    fill: &hypercall_types::Fill,
    side_effects: &FillSideEffect,
    validation: FillUnderlyingNotionalValidation,
) -> Result<(bool, bool)> {
    use diesel::sql_types::{BigInt, Binary, Bool, Text};
    use rust_decimal_macros::dec;

    // Side effects computed for a different trade must never be applied.
    assert_eq!(
        side_effects.trade_id, fill.trade_id,
        "CRITICAL: journal fill side effects trade_id mismatch for fill {}",
        fill.trade_id
    );
    let underlying_notional =
        validate_fill_underlying_notional(fill, side_effects.underlying_notional, validation)?;
    // When the fill carries no realized PnL, the ledger delta is stored instead.
    let taker_realized_pnl = fill
        .taker_realized_pnl
        .unwrap_or(side_effects.taker_ledger_delta);
    let maker_realized_pnl = fill
        .maker_realized_pnl
        .unwrap_or(side_effects.maker_ledger_delta);

    // Trade row: maker_fee is stored as 0, taker_fee as the fill's fee.
    diesel::sql_query(
        "INSERT INTO trades (trade_id, symbol, price, size, maker_address, taker_address, maker_fee, taker_fee, timestamp)
         VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
         ON CONFLICT (trade_id) DO NOTHING",
    )
    .bind::<BigInt, _>(fill.trade_id as i64)
    .bind::<Text, _>(&fill.symbol)
    .bind::<Numeric, _>(fill.price)
    .bind::<Numeric, _>(fill.size)
    .bind::<Binary, _>(&fill.maker_wallet_address)
    .bind::<Binary, _>(&fill.taker_wallet_address)
    .bind::<Numeric, _>(dec!(0))
    .bind::<Numeric, _>(fill.fee)
    .bind::<BigInt, _>(fill.timestamp as i64)
    .execute(conn)?;

    // Taker fill row: carries the fee and the builder-code fields of the fill.
    let taker_rows: usize = diesel::sql_query(
        "INSERT INTO fills (trade_id, wallet_address, symbol, price, size, fee, is_taker, timestamp, builder_code_address, builder_code_fee, realized_pnl, underlying_notional)
         VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12)
         ON CONFLICT (trade_id, wallet_address, is_taker) DO NOTHING",
    )
    .bind::<BigInt, _>(fill.trade_id as i64)
    .bind::<Binary, _>(&fill.taker_wallet_address)
    .bind::<Text, _>(&fill.symbol)
    .bind::<Numeric, _>(fill.price)
    .bind::<Numeric, _>(fill.size)
    .bind::<Numeric, _>(fill.fee)
    .bind::<Bool, _>(true)
    .bind::<BigInt, _>(fill.timestamp as i64)
    .bind::<diesel::sql_types::Nullable<Binary>, _>(fill.builder_code_address.as_ref())
    .bind::<diesel::sql_types::Nullable<Numeric>, _>(fill.builder_code_fee)
    .bind::<diesel::sql_types::Nullable<Numeric>, _>(Some(taker_realized_pnl))
    .bind::<diesel::sql_types::Nullable<Numeric>, _>(underlying_notional)
    .execute(conn)?;
    // Zero affected rows means the conflict clause skipped the insert.
    let taker_inserted = taker_rows > 0;

    if taker_inserted {
        apply_fill_ledger_side_effects(
            conn,
            &fill.taker_wallet_address,
            fill,
            side_effects.taker_ledger_delta,
            side_effects.taker_premium_delta,
        )?;
    }

    // Maker fill row: fee is stored as 0 and the builder-code fields as NULL.
    let maker_rows: usize = diesel::sql_query(
        "INSERT INTO fills (trade_id, wallet_address, symbol, price, size, fee, is_taker, timestamp, builder_code_address, builder_code_fee, realized_pnl, underlying_notional)
         VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12)
         ON CONFLICT (trade_id, wallet_address, is_taker) DO NOTHING",
    )
    .bind::<BigInt, _>(fill.trade_id as i64)
    .bind::<Binary, _>(&fill.maker_wallet_address)
    .bind::<Text, _>(&fill.symbol)
    .bind::<Numeric, _>(fill.price)
    .bind::<Numeric, _>(fill.size)
    .bind::<Numeric, _>(dec!(0))
    .bind::<Bool, _>(false)
    .bind::<BigInt, _>(fill.timestamp as i64)
    .bind::<diesel::sql_types::Nullable<Binary>, _>(
        Option::<hypercall_types::WalletAddress>::None,
    )
    .bind::<diesel::sql_types::Nullable<Numeric>, _>(Option::<Decimal>::None)
    .bind::<diesel::sql_types::Nullable<Numeric>, _>(Some(maker_realized_pnl))
    .bind::<diesel::sql_types::Nullable<Numeric>, _>(underlying_notional)
    .execute(conn)?;
    let maker_inserted = maker_rows > 0;

    if maker_inserted {
        apply_fill_ledger_side_effects(
            conn,
            &fill.maker_wallet_address,
            fill,
            side_effects.maker_ledger_delta,
            side_effects.maker_premium_delta,
        )?;
    }

    Ok((taker_inserted, maker_inserted))
}
525
526fn apply_fill_ledger_side_effects(
527 conn: &mut PgConnection,
528 wallet: &WalletAddress,
529 fill: &hypercall_types::Fill,
530 realized_delta: Decimal,
531 premium_delta: Decimal,
532) -> Result<()> {
533 use diesel::sql_types::{BigInt, Binary, Text};
534
535 let _total_delta = realized_delta + premium_delta;
536
537 if realized_delta != Decimal::ZERO {
538 let inserted = diesel::sql_query(
539 "INSERT INTO ledger_events
540 (wallet, event_ts_ms, delta, event_type, reference_trade_id, reference_symbol)
541 VALUES ($1, $2, $3, $4, $5, $6)
542 ON CONFLICT (wallet, reference_trade_id, event_type)
543 WHERE reference_trade_id IS NOT NULL
544 DO NOTHING",
545 )
546 .bind::<Binary, _>(wallet)
547 .bind::<BigInt, _>(fill.timestamp as i64)
548 .bind::<Numeric, _>(realized_delta)
549 .bind::<Text, _>("fill_realized_pnl")
550 .bind::<BigInt, _>(fill.trade_id as i64)
551 .bind::<diesel::sql_types::Nullable<Text>, _>(Some(fill.symbol.as_str()))
552 .execute(conn)?;
553 assert_eq!(
554 inserted, 1,
555 "CRITICAL: missing fill_realized_pnl ledger event for trade {} wallet {}",
556 fill.trade_id, wallet
557 );
558 }
559
560 if premium_delta != Decimal::ZERO {
561 let inserted = diesel::sql_query(
562 "INSERT INTO ledger_events
563 (wallet, event_ts_ms, delta, event_type, reference_trade_id, reference_symbol)
564 VALUES ($1, $2, $3, $4, $5, $6)
565 ON CONFLICT (wallet, reference_trade_id, event_type)
566 WHERE reference_trade_id IS NOT NULL
567 DO NOTHING",
568 )
569 .bind::<Binary, _>(wallet)
570 .bind::<BigInt, _>(fill.timestamp as i64)
571 .bind::<Numeric, _>(premium_delta)
572 .bind::<Text, _>("fill_premium")
573 .bind::<BigInt, _>(fill.trade_id as i64)
574 .bind::<diesel::sql_types::Nullable<Text>, _>(Some(fill.symbol.as_str()))
575 .execute(conn)?;
576 assert_eq!(
577 inserted, 1,
578 "CRITICAL: missing fill_premium ledger event for trade {} wallet {}",
579 fill.trade_id, wallet
580 );
581 }
582
583 Ok(())
584}
585
/// Ledger side effects that accompany the persistence of one fill.
///
/// Consumed by `persist_fill_with_side_effects_in_tx` and its legacy-replay
/// variant.
pub struct FillSideEffect {
    /// Trade these side effects belong to; persistence asserts that it equals
    /// the fill's `trade_id`.
    pub trade_id: u64,
    /// Delta written as the taker's `fill_realized_pnl` ledger event; also
    /// stored as the taker's realized PnL when the fill carries none.
    pub taker_ledger_delta: Decimal,
    /// Delta written as the maker's `fill_realized_pnl` ledger event; also
    /// stored as the maker's realized PnL when the fill carries none.
    pub maker_ledger_delta: Decimal,
    /// Delta written as the taker's `fill_premium` ledger event.
    pub taker_premium_delta: Decimal,
    /// Delta written as the maker's `fill_premium` ledger event.
    pub maker_premium_delta: Decimal,
    /// Underlying notional to cross-check against the fill's own value; see
    /// `validate_fill_underlying_notional`.
    pub underlying_notional: Option<Decimal>,
}
602
/// How strictly `validate_fill_underlying_notional` treats option fills.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum FillUnderlyingNotionalValidation {
    /// The fill and the side effects must both carry an underlying notional,
    /// and the two values must be equal.
    Strict,
    /// Missing values are tolerated; the two values are only compared when
    /// both are present.
    LegacyReplay,
}
608
609fn validate_fill_underlying_notional(
610 fill: &hypercall_types::Fill,
611 side_effect_underlying_notional: Option<Decimal>,
612 validation: FillUnderlyingNotionalValidation,
613) -> Result<Option<Decimal>> {
614 if !is_option_symbol(&fill.symbol) {
615 return Ok(fill.underlying_notional.or(side_effect_underlying_notional));
616 }
617
618 if validation == FillUnderlyingNotionalValidation::LegacyReplay {
619 if let (Some(fill_notional), Some(side_effect_notional)) =
620 (fill.underlying_notional, side_effect_underlying_notional)
621 {
622 anyhow::ensure!(
623 fill_notional == side_effect_notional,
624 "CRITICAL: option fill underlying_notional mismatch for trade {}: fill={} side_effect={}",
625 fill.trade_id,
626 fill_notional,
627 side_effect_notional
628 );
629 }
630 return Ok(fill.underlying_notional.or(side_effect_underlying_notional));
631 }
632
633 let fill_notional = fill.underlying_notional.ok_or_else(|| {
634 anyhow!(
635 "CRITICAL: option fill {} for trade {} is missing underlying_notional",
636 fill.symbol,
637 fill.trade_id
638 )
639 })?;
640 let side_effect_notional = side_effect_underlying_notional.ok_or_else(|| {
641 anyhow!(
642 "CRITICAL: option fill side effects for trade {} are missing underlying_notional",
643 fill.trade_id
644 )
645 })?;
646 anyhow::ensure!(
647 fill_notional == side_effect_notional,
648 "CRITICAL: option fill underlying_notional mismatch for trade {}: fill={} side_effect={}",
649 fill.trade_id,
650 fill_notional,
651 side_effect_notional
652 );
653 Ok(Some(fill_notional))
654}
655
656impl FillSideEffect {
657 pub fn from_fill_accounting(accounting: &hypercall_types::FillAccounting) -> Self {
658 accounting.assert_cash_decomposition();
659 Self {
660 trade_id: accounting.trade_id,
661 taker_ledger_delta: accounting.taker_ledger_residual_delta(),
662 maker_ledger_delta: accounting.maker_ledger_residual_delta(),
663 taker_premium_delta: accounting.taker_premium_delta(),
664 maker_premium_delta: accounting.maker_premium_delta(),
665 underlying_notional: None,
666 }
667 }
668}
669
670impl DatabaseHandler {
675 pub(crate) fn upsert_order_info(conn: &mut PgConnection, info: &NewOrderInfo) -> Result<()> {
678 use diesel::upsert::excluded;
679
680 diesel::insert_into(schema::order_infos::table)
681 .values(info)
682 .on_conflict(schema::order_infos::order_id)
683 .do_update()
684 .set((
685 schema::order_infos::wallet_address
686 .eq(excluded(schema::order_infos::wallet_address)),
687 schema::order_infos::symbol.eq(excluded(schema::order_infos::symbol)),
688 schema::order_infos::side.eq(excluded(schema::order_infos::side)),
689 schema::order_infos::price.eq(excluded(schema::order_infos::price)),
690 schema::order_infos::size.eq(excluded(schema::order_infos::size)),
691 schema::order_infos::tif.eq(excluded(schema::order_infos::tif)),
692 schema::order_infos::client_id.eq(excluded(schema::order_infos::client_id)),
693 schema::order_infos::is_perp.eq(excluded(schema::order_infos::is_perp)),
694 schema::order_infos::underlying.eq(excluded(schema::order_infos::underlying)),
695 schema::order_infos::reduce_only.eq(excluded(schema::order_infos::reduce_only)),
696 schema::order_infos::nonce.eq(excluded(schema::order_infos::nonce)),
697 schema::order_infos::signature.eq(excluded(schema::order_infos::signature)),
698 schema::order_infos::mmp_enabled.eq(excluded(schema::order_infos::mmp_enabled)),
699 schema::order_infos::timestamp.eq(excluded(schema::order_infos::timestamp)),
700 ))
701 .execute(conn)?;
702 Ok(())
703 }
704
705 pub(crate) fn insert_order_action(
708 conn: &mut PgConnection,
709 action: &NewOrderAction,
710 ) -> Result<()> {
711 diesel::insert_into(schema::order_actions::table)
712 .values(action)
713 .execute(conn)?;
714 Ok(())
715 }
716
717 pub fn handle_event_sync(&self, event: &EngineMessage) -> Result<()> {
719 let mut conn = self.pool().get()?;
720 self.handle_event_with_conn(&mut conn, event)
721 }
722
723 pub fn handle_event_batch_sync(&self, events: &[EngineMessage]) -> Result<()> {
725 let mut conn = self.pool().get()?;
726
727 (*conn).transaction::<_, anyhow::Error, _>(|conn| {
728 for event in events {
729 self.handle_event_with_conn(conn, event)?;
730 }
731 Ok(())
732 })?;
733
734 Ok(())
735 }
736
737 pub fn handle_event_with_conn(
739 &self,
740 conn: &mut PgConnection,
741 event: &EngineMessage,
742 ) -> Result<()> {
743 match event {
744 EngineMessage::OrderInfo(order_info_msg) => {
745 let new_order_info = NewOrderInfo {
746 order_id: order_info_msg.order_id as i64,
747 wallet_address: order_info_msg.wallet,
748 symbol: order_info_msg.info.symbol.clone(),
749 side: format!("{:?}", order_info_msg.info.side),
750 price: order_info_msg.info.price,
751 size: order_info_msg.info.size,
752 tif: format!("{:?}", order_info_msg.info.tif),
753 client_id: order_info_msg.info.client_id.clone(),
754 is_perp: order_info_msg.info.is_perp,
755 underlying: order_info_msg.info.underlying.clone(),
756 reduce_only: order_info_msg.info.reduce_only,
757 nonce: order_info_msg.info.nonce.map(|n| n as i64),
758 signature: order_info_msg.info.signature.clone(),
759 mmp_enabled: order_info_msg.info.mmp_enabled,
760 timestamp: order_info_msg.timestamp as i64,
761 status: "ACKED".to_string(),
762 filled_size: Decimal::ZERO,
763 last_materialized_order_update_id: 0,
764 };
765 Self::upsert_order_info(conn, &new_order_info)?;
766 }
767 EngineMessage::OrderAction(action) => {
768 let new_action = NewOrderAction {
769 timestamp: action.timestamp as i64,
770 wallet: action.wallet,
771 action: format!("{:?}", action.action),
772 symbol: action.info.symbol.clone(),
773 price: action.info.price,
774 size: action.info.size,
775 side: format!("{:?}", action.info.side),
776 tif: format!("{:?}", action.info.tif),
777 client_id: action.info.client_id.clone(),
778 };
779 Self::insert_order_action(conn, &new_action)?;
780 }
781 EngineMessage::OrderUpdate(update) => {
782 let current_status = Self::order_update_status_to_db(update.status).to_string();
783
784 let new_order_update = NewOrderUpdate {
786 timestamp: update.timestamp as i64,
787 order_id: update.order_id.map(|id| id as i64),
788 status: current_status.clone(),
789 reason: update.reason.clone(),
790 filled_size: update.filled_size,
791 symbol: update.info.symbol.clone(),
792 price: update.info.price,
793 size: update.info.size,
794 side: format!("{:?}", update.info.side),
795 tif: format!("{:?}", update.info.tif),
796 client_id: update.info.client_id.clone(),
797 };
798
799 let order_update_id = diesel::insert_into(schema::order_updates::table)
800 .values(&new_order_update)
801 .returning(schema::order_updates::id)
802 .get_result::<Option<i32>>(conn)?
803 .expect("order_updates.id should be populated after insert");
804
805 if let Some(order_id) = update.order_id {
806 let materialized_order_info = NewOrderInfo {
807 order_id: order_id as i64,
808 wallet_address: update.wallet_address,
809 symbol: update.info.symbol.clone(),
810 side: format!("{:?}", update.info.side),
811 price: update.info.price,
812 size: update.info.size,
813 tif: format!("{:?}", update.info.tif),
814 client_id: update.info.client_id.clone(),
815 is_perp: update.info.is_perp,
816 underlying: update.info.underlying.clone(),
817 reduce_only: update.info.reduce_only,
818 nonce: update.info.nonce.map(|n| n as i64),
819 signature: update.info.signature.clone(),
820 mmp_enabled: update.info.mmp_enabled,
821 timestamp: update.timestamp as i64,
822 status: current_status,
823 filled_size: update.filled_size,
824 last_materialized_order_update_id: order_update_id,
825 };
826
827 upsert_materialized_order_state(conn, &materialized_order_info)?;
828 }
829
830 if let OrderUpdateStatus::Rejected = update.status {
832 let new_rejected = NewRejectedOrder {
833 wallet_address: update.wallet_address,
834 symbol: update.info.symbol.clone(),
835 side: format!("{:?}", update.info.side),
836 price: update.info.price,
837 size: update.info.size,
838 reason: update
839 .reason
840 .clone()
841 .unwrap_or_else(|| "Unknown".to_string()),
842 timestamp: update.timestamp as i64,
843 };
844
845 diesel::insert_into(schema::rejected_orders::table)
846 .values(&new_rejected)
847 .execute(conn)?;
848 }
849 }
850 EngineMessage::OrderFilled { .. } => {
851 }
856 EngineMessage::OrderbookUpdated(_update) => {
857 }
859 EngineMessage::MarketAction(action) => {
860 let new_action = NewMarketAction {
862 timestamp: action.timestamp as i64,
863 action: format!("{:?}", action.action),
864 symbol: action.market.symbol.clone(),
865 underlying: action.market.underlying.clone(),
866 expiry: action.market.expiry as i64,
867 strike: action.market.strike,
868 option_type: match action.market.option_type {
869 OptionType::Call => "call".to_string(),
870 OptionType::Put => "put".to_string(),
871 },
872 };
873
874 diesel::insert_into(schema::market_actions::table)
875 .values(&new_action)
876 .execute(conn)?;
877 }
878 EngineMessage::MarketUpdate(update) => {
879 let new_market_update = NewMarketUpdate {
881 timestamp: update.timestamp as i64,
882 status: format!("{:?}", update.status),
883 symbol: update.market.symbol.clone(),
884 underlying: update.market.underlying.clone(),
885 expiry: update.market.expiry as i64,
886 strike: update.market.strike,
887 option_type: match update.market.option_type {
888 OptionType::Call => "call".to_string(),
889 OptionType::Put => "put".to_string(),
890 },
891 };
892
893 diesel::insert_into(schema::market_updates::table)
894 .values(&new_market_update)
895 .execute(conn)?;
896
897 match update.status {
898 MarketUpdateStatus::MarketCreated | MarketUpdateStatus::MarketAlreadyExists => {
899 let new_market = Market {
901 underlying: update.market.underlying.clone(),
902 expiry: update.market.expiry as i64,
903 };
904
905 diesel::sql_query(
907 "INSERT INTO markets (underlying, expiry) VALUES ($1, $2) ON CONFLICT (underlying, expiry) DO NOTHING"
908 )
909 .bind::<diesel::sql_types::Text, _>(&new_market.underlying)
910 .bind::<diesel::sql_types::BigInt, _>(new_market.expiry)
911 .execute(conn)?;
912
913 let option_type = match update.market.option_type {
915 OptionType::Call => "call",
916 OptionType::Put => "put",
917 };
918
919 let instrument_strike = update.market.strike;
920 let deployment = current_option_token_deployment()?;
921 let option_token_address = hypercall_types::derive_option_token_address(
922 deployment,
923 &update.market.underlying,
924 update.market.expiry,
925 instrument_strike,
926 option_type,
927 )?;
928
929 let new_instrument = Instrument {
930 instrument_numeric_id: 0,
931 id: update.market.symbol.clone(),
932 underlying: update.market.underlying.clone(),
933 strike: instrument_strike,
934 expiry: update.market.expiry as i64,
935 option_type: option_type.to_string(),
936 option_token_address: Some(option_token_address),
937 status: "ACTIVE".to_string(),
938 trading_mode: "orderbook".to_string(),
939 };
940
941 if let Err(err) = diesel::sql_query(
943 "INSERT INTO instruments (id, underlying, strike, expiry, option_type, status, option_token_address, trading_mode) VALUES ($1, $2, $3, $4, $5, $6, $7, $8) ON CONFLICT (id) DO NOTHING"
944 )
945 .bind::<diesel::sql_types::Text, _>(&new_instrument.id)
946 .bind::<diesel::sql_types::Text, _>(&new_instrument.underlying)
947 .bind::<diesel::sql_types::Numeric, _>(instrument_strike)
948 .bind::<diesel::sql_types::BigInt, _>(new_instrument.expiry)
949 .bind::<diesel::sql_types::Text, _>(&new_instrument.option_type)
950 .bind::<diesel::sql_types::Text, _>(&new_instrument.status)
951 .bind::<diesel::sql_types::Bytea, _>(option_token_address)
952 .bind::<diesel::sql_types::Text, _>(&new_instrument.trading_mode)
953 .execute(conn) {
954 Self::observe_diesel_option_token_violation(&err);
955 return Err(err.into());
956 }
957 }
958 MarketUpdateStatus::MarketDeleted | MarketUpdateStatus::MarketExpired => {
959 diesel::delete(schema::instruments::table)
960 .filter(schema::instruments::id.eq(&update.market.symbol))
961 .execute(conn)?;
962 }
963 MarketUpdateStatus::MarketPendingSettlement => {
964 }
967 MarketUpdateStatus::MarketCreationFailed
968 | MarketUpdateStatus::MarketDeletionFailed => {
969 }
972 }
973 }
974 EngineMessage::L2Update(_) => {
975 }
977 EngineMessage::Trade(_trade_msg) => {
978 }
983 EngineMessage::TransactionRequest(tx_req) => {
984 tracing::info!("Persisting transaction request: {}", tx_req.request_id);
986 }
987 EngineMessage::TransactionUpdate(tx_update) => {
988 Self::persist_transaction_update(conn, tx_update)?;
989 tracing::info!(
990 "Persisting transaction update: {} -> {:?}",
991 tx_update.request_id,
992 tx_update.status
993 );
994 }
995 EngineMessage::MmpTriggered(mmp_msg) => {
996 tracing::warn!(
997 "Persisting MMP trigger: wallet={}, currency={}, reason={}",
998 mmp_msg.wallet,
999 mmp_msg.currency,
1000 mmp_msg.reason
1001 );
1002 }
1003 EngineMessage::PositionExpired(expiry_msg) => {
1004 let intent =
1007 hypercall_settlement::SettlementPersistenceIntent::from_position_expired(
1008 expiry_msg,
1009 )
1010 .map_err(|e| anyhow!("{}", e))?;
1011 let context = format!("{}/{}", expiry_msg.wallet_address, expiry_msg.symbol);
1012 let settlement_entry_price = intent
1013 .economics
1014 .map(|economics| economics.settlement_entry_price);
1015 let cost_basis = intent.economics.map(|economics| economics.cost_basis);
1016 let net_pnl = intent.economics.map(|economics| economics.net_pnl);
1017
1018 let new_expiration = NewPositionExpiration {
1019 wallet: intent.wallet,
1020 symbol: intent.symbol.clone(),
1021 expiry_ts: intent.expiry_ts,
1022 settlement_price: intent.settlement_price,
1023 settlement_value: intent.settlement_value,
1024 };
1025
1026 let payout = NewSettlementPayout {
1027 wallet: intent.wallet,
1028 symbol: intent.symbol.clone(),
1029 expiry_ts: intent.expiry_ts,
1030 position_size: intent.position_size,
1031 settlement_price: intent.settlement_price,
1032 payout_amount: intent.settlement_value,
1033 ledger_applied: true,
1034 settlement_entry_price,
1035 cost_basis,
1036 net_pnl,
1037 };
1038
1039 let settlement_cashflow =
1040 crate::settlement_ops::settlement_ledger_delta_for_margin_mode(
1041 intent.margin_mode,
1042 intent.settlement_value,
1043 net_pnl,
1044 &context,
1045 )?;
1046
1047 let outcome = conn.transaction::<_, anyhow::Error, _>(|tx_conn| {
1048 crate::settlement_ops::claim_settlement_in_tx(
1049 tx_conn,
1050 &new_expiration,
1051 &payout,
1052 settlement_cashflow,
1053 expiry_msg.timestamp as i64,
1054 )
1055 })?;
1056
1057 if !outcome.newly_persisted {
1058 tracing::info!(
1059 "Settlement already applied for {}/{}, skipping (idempotency)",
1060 expiry_msg.wallet_address,
1061 expiry_msg.symbol
1062 );
1063 } else {
1064 tracing::info!(
1065 "Settlement persisted: wallet={}, symbol={}, size={}, payout={}",
1066 expiry_msg.wallet_address,
1067 expiry_msg.symbol,
1068 expiry_msg.position_size,
1069 expiry_msg.settlement_value
1070 );
1071 }
1072 }
1073 EngineMessage::TierUpdate(_tier_msg) => {
1074 }
1078 EngineMessage::HypercorePositionUpdate(update) => {
1079 tracing::debug!(
1081 "DieselHandler: Hypercore position update - account={}, coin={}, size={}",
1082 update.account,
1083 update.coin,
1084 update.size
1085 );
1086 }
1087 EngineMessage::LiquidationStateChange(liq_msg) => {
1088 if let Some(mut auction_row) = liquidation_auction_from_status(&liq_msg.status)? {
1089 if let Some(existing_auction) =
1090 self.get_liquidation_auction_with_conn(conn, &auction_row.auction_id)?
1091 {
1092 if auction_row.stop_request_id.is_none() {
1093 auction_row.stop_request_id = existing_auction.stop_request_id.clone();
1094 }
1095 if auction_row.stop_tx_hash.is_none() {
1096 auction_row.stop_tx_hash = existing_auction.stop_tx_hash.clone();
1097 }
1098 if auction_row.last_observed_block.is_none() {
1099 auction_row.last_observed_block = existing_auction.last_observed_block;
1100 }
1101 if auction_row.target_equity.is_none() {
1104 auction_row.target_equity = existing_auction.target_equity;
1105 }
1106
1107 if matches!(liq_msg.status.state, LiquidationState::Liquidated(..)) {
1108 if auction_row.request_id.is_none() {
1109 auction_row.request_id = existing_auction.request_id;
1110 }
1111 if auction_row.tx_hash.is_none() {
1112 auction_row.tx_hash = existing_auction.tx_hash;
1113 }
1114 auction_row.started_at = existing_auction.started_at;
1115 if auction_row.chain_start_time.is_none() {
1116 auction_row.chain_start_time = existing_auction.chain_start_time;
1117 }
1118 if auction_row.margin_needed.is_none() {
1119 auction_row.margin_needed = existing_auction.margin_needed;
1120 }
1121 if auction_row.settlement_value.is_none() {
1122 auction_row.settlement_value = existing_auction.settlement_value;
1123 }
1124 }
1125 }
1126 self.upsert_liquidation_auction_with_conn(conn, &auction_row)?;
1127 } else if let Some(update) = liquidation_auction_update_from_message(liq_msg)? {
1128 self.update_liquidation_auction_with_conn(
1129 conn,
1130 liq_msg
1131 .auction_id
1132 .as_deref()
1133 .expect("auction update requires auction_id"),
1134 &update,
1135 )?;
1136 }
1137
1138 let domain_state = status_to_liquidation_state_record(&liq_msg.status);
1139 let state_row = new_liquidation_state_from_domain(&domain_state);
1140 self.save_liquidation_state_with_conn(conn, &state_row)?;
1141
1142 let should_persist_history =
1143 liq_msg.previous_state != liq_msg.new_state || liq_msg.projection_changed;
1144 if should_persist_history {
1145 let history_row = liquidation_history_from_message(liq_msg)?;
1146 self.insert_liquidation_history_with_conn(conn, &history_row)?;
1147 }
1148 tracing::info!(
1149 "DieselHandler: persisted liquidation state change - wallet={}, {} -> {}",
1150 liq_msg.wallet,
1151 liq_msg.previous_state,
1152 liq_msg.new_state
1153 );
1154 }
1155 EngineMessage::RfqFilled(msg) => {
1156 tracing::info!(
1157 "DieselHandler: RFQ filled - rfq_id={}, taker={}, qp={}",
1158 msg.rfq_id,
1159 msg.taker_wallet,
1160 msg.qp_wallet
1161 );
1162 diesel::sql_query(
1166 "INSERT INTO rfq_fills (fill_id, rfq_id, quote_id, taker_wallet, qp_wallet, net_premium, taker_accept_signature)
1167 VALUES ($1::uuid, $2::uuid, $3::uuid, $4, $5, $6, $7)
1168 ON CONFLICT (rfq_id, quote_id) DO NOTHING",
1169 )
1170 .bind::<diesel::sql_types::Text, _>(msg.fill_id.as_str())
1171 .bind::<diesel::sql_types::Text, _>(msg.rfq_id.as_str())
1172 .bind::<diesel::sql_types::Text, _>(msg.quote_id.as_str())
1173 .bind::<diesel::sql_types::Binary, _>(msg.taker_wallet.as_bytes())
1174 .bind::<diesel::sql_types::Binary, _>(msg.qp_wallet.as_bytes())
1175 .bind::<diesel::sql_types::Numeric, _>(msg.net_premium)
1176 .bind::<diesel::sql_types::Text, _>(msg.taker_accept_signature.as_str())
1177 .execute(conn)?;
1178 }
1179 }
1180 Ok(())
1181 }
1182
1183 pub fn batch_cancel_expired_orders_sync(
1191 &self,
1192 cancels: &[ExpiredOrderCancel],
1193 reason: &str,
1194 ) -> Result<usize> {
1195 if cancels.is_empty() {
1196 return Ok(0);
1197 }
1198
1199 let mut conn = self.pool().get()?;
1200
1201 let now_ms = std::time::SystemTime::now()
1202 .duration_since(std::time::UNIX_EPOCH)
1203 .map_err(|e| {
1204 anyhow::anyhow!(
1205 "System clock before UNIX_EPOCH during startup cancel flush: {}",
1206 e
1207 )
1208 })?
1209 .as_millis() as i64;
1210
1211 const SERIALIZATION_RETRY_LIMIT: usize = 3;
1212 let candidate_ids: Vec<i64> = cancels.iter().map(|c| c.order_id as i64).collect();
1213
1214 #[derive(QueryableByName)]
1215 struct TerminalId {
1216 #[diesel(sql_type = diesel::sql_types::BigInt)]
1217 order_id: i64,
1218 }
1219
1220 #[derive(Queryable)]
1221 struct InsertedOrderUpdateId {
1222 id: Option<i32>,
1223 order_id: Option<i64>,
1224 }
1225
1226 for attempt in 0..SERIALIZATION_RETRY_LIMIT {
1227 let tx_result = conn
1228 .build_transaction()
1229 .serializable()
1230 .run::<_, diesel::result::Error, _>(|conn| {
1231 let terminal_ids: std::collections::HashSet<i64> = diesel::sql_query(
1235 "SELECT order_id FROM ( \
1236 SELECT oi.order_id \
1237 FROM order_infos oi \
1238 WHERE oi.order_id = ANY($1) \
1239 AND oi.status IN ('CANCELED', 'FILLED', 'REJECTED') \
1240 UNION \
1241 SELECT ou.order_id \
1242 FROM order_updates ou \
1243 WHERE ou.order_id = ANY($1) \
1244 AND ou.status IN ('CANCELED', 'FILLED', 'REJECTED') \
1245 ) terminal_orders",
1246 )
1247 .bind::<diesel::sql_types::Array<BigInt>, _>(&candidate_ids)
1248 .load::<TerminalId>(conn)?
1249 .into_iter()
1250 .map(|r| r.order_id)
1251 .collect();
1252
1253 let active_cancels: Vec<&ExpiredOrderCancel> = cancels
1254 .iter()
1255 .filter(|c| !terminal_ids.contains(&(c.order_id as i64)))
1256 .collect();
1257
1258 let new_updates: Vec<NewOrderUpdate> = active_cancels
1259 .iter()
1260 .map(|cancel| NewOrderUpdate {
1261 timestamp: now_ms,
1262 order_id: Some(cancel.order_id as i64),
1263 status: "CANCELED".to_string(),
1264 reason: Some(reason.to_string()),
1265 filled_size: cancel.filled_size,
1266 symbol: cancel.symbol.clone(),
1267 price: cancel.price,
1268 size: cancel.size,
1269 side: format!("{:?}", cancel.side),
1270 tif: "GTC".to_string(),
1271 client_id: cancel.client_id.clone(),
1272 })
1273 .collect();
1274
1275 if new_updates.is_empty() {
1276 return Ok(0);
1277 }
1278
1279 let inserted_updates: Vec<InsertedOrderUpdateId> =
1280 diesel::insert_into(schema::order_updates::table)
1281 .values(&new_updates)
1282 .returning((schema::order_updates::id, schema::order_updates::order_id))
1283 .get_results(conn)?;
1284
1285 let inserted = inserted_updates.len();
1286
1287 let inserted_update_ids: std::collections::HashMap<i64, i32> =
1288 inserted_updates
1289 .into_iter()
1290 .map(|row| {
1291 let order_id = row.order_id.expect(
1292 "startup expired cancel insert returned NULL order_id; persisted data invariant broken",
1293 );
1294 let update_id = row.id.expect(
1295 "startup expired cancel insert returned NULL update id; persisted data invariant broken",
1296 );
1297 (order_id, update_id)
1298 })
1299 .collect();
1300
1301 for cancel in active_cancels {
1305 let order_update_id = *inserted_update_ids
1306 .get(&(cancel.order_id as i64))
1307 .expect("missing inserted startup cancel update id for order");
1308
1309 let materialized_order_info = NewOrderInfo {
1310 order_id: cancel.order_id as i64,
1311 wallet_address: cancel.wallet,
1312 symbol: cancel.symbol.clone(),
1313 side: format!("{:?}", cancel.side),
1314 price: cancel.price,
1315 size: cancel.size,
1316 tif: format!("{:?}", cancel.tif),
1317 client_id: cancel.client_id.clone(),
1318 is_perp: cancel.is_perp,
1319 underlying: cancel.underlying.clone(),
1320 reduce_only: cancel.reduce_only,
1321 nonce: cancel.nonce.map(|nonce| nonce as i64),
1322 signature: cancel.signature.clone(),
1323 mmp_enabled: cancel.mmp_enabled,
1324 timestamp: cancel.timestamp as i64,
1325 status: "CANCELED".to_string(),
1326 filled_size: cancel.filled_size,
1327 last_materialized_order_update_id: order_update_id,
1328 };
1329
1330 upsert_materialized_order_state(conn, &materialized_order_info)?;
1331 }
1332
1333 Ok(inserted)
1334 });
1335
1336 match tx_result {
1337 Ok(inserted) => return Ok(inserted),
1338 Err(diesel::result::Error::DatabaseError(
1339 diesel::result::DatabaseErrorKind::SerializationFailure,
1340 _,
1341 )) if attempt + 1 < SERIALIZATION_RETRY_LIMIT => {
1342 tracing::warn!(
1343 attempt = attempt + 1,
1344 "Retrying startup expired cancel batch after serialization failure"
1345 );
1346 }
1347 Err(err) => return Err(err.into()),
1348 }
1349 }
1350
1351 Err(anyhow::anyhow!(
1352 "startup expired cancel batch exceeded serialization retry budget"
1353 ))
1354 }
1355
1356 fn save_liquidation_state_with_conn(
1359 &self,
1360 conn: &mut PgConnection,
1361 state: &NewLiquidationState,
1362 ) -> Result<()> {
1363 diesel::insert_into(schema::liquidation_states::table)
1364 .values(state)
1365 .on_conflict(schema::liquidation_states::wallet_address)
1366 .do_update()
1367 .set(state)
1368 .execute(conn)?;
1369 Ok(())
1370 }
1371
1372 fn insert_liquidation_history_with_conn(
1373 &self,
1374 conn: &mut PgConnection,
1375 entry: &NewLiquidationHistory,
1376 ) -> Result<i64> {
1377 use schema::liquidation_history::dsl as lh;
1378
1379 let inserted_id = diesel::insert_into(schema::liquidation_history::table)
1380 .values(entry)
1381 .on_conflict_do_nothing()
1382 .returning(lh::id)
1383 .get_result::<i64>(conn)
1384 .optional()?
1385 .unwrap_or(0);
1386
1387 Ok(inserted_id)
1388 }
1389
1390 fn upsert_liquidation_auction_with_conn(
1391 &self,
1392 conn: &mut PgConnection,
1393 auction: &NewLiquidationAuction,
1394 ) -> Result<()> {
1395 let query = diesel::insert_into(schema::liquidation_auctions::table)
1396 .values(auction)
1397 .on_conflict(schema::liquidation_auctions::auction_id)
1398 .do_update();
1399
1400 if auction.completed_at.is_some() {
1401 query
1402 .set((
1403 schema::liquidation_auctions::status.eq(&auction.status),
1404 schema::liquidation_auctions::positions.eq(&auction.positions),
1405 schema::liquidation_auctions::target_equity.eq(auction.target_equity),
1406 schema::liquidation_auctions::request_id.eq(&auction.request_id),
1407 schema::liquidation_auctions::tx_hash.eq(&auction.tx_hash),
1408 schema::liquidation_auctions::chain_start_time.eq(auction.chain_start_time),
1409 schema::liquidation_auctions::margin_needed.eq(auction.margin_needed),
1410 schema::liquidation_auctions::stop_request_id.eq(&auction.stop_request_id),
1411 schema::liquidation_auctions::stop_tx_hash.eq(&auction.stop_tx_hash),
1412 schema::liquidation_auctions::completed_at.eq(auction.completed_at),
1413 schema::liquidation_auctions::liquidator_address.eq(auction.liquidator_address),
1414 schema::liquidation_auctions::bonus.eq(auction.bonus),
1415 schema::liquidation_auctions::settlement_value.eq(auction.settlement_value),
1416 schema::liquidation_auctions::last_observed_block
1417 .eq(auction.last_observed_block),
1418 ))
1419 .execute(conn)?;
1420 } else {
1421 query
1422 .set((
1423 schema::liquidation_auctions::status.eq(&auction.status),
1424 schema::liquidation_auctions::positions.eq(&auction.positions),
1425 schema::liquidation_auctions::target_equity.eq(auction.target_equity),
1426 schema::liquidation_auctions::request_id.eq(&auction.request_id),
1427 schema::liquidation_auctions::tx_hash.eq(&auction.tx_hash),
1428 schema::liquidation_auctions::started_at.eq(auction.started_at),
1429 schema::liquidation_auctions::chain_start_time.eq(auction.chain_start_time),
1430 schema::liquidation_auctions::margin_needed.eq(auction.margin_needed),
1431 schema::liquidation_auctions::stop_request_id.eq(&auction.stop_request_id),
1432 schema::liquidation_auctions::stop_tx_hash.eq(&auction.stop_tx_hash),
1433 schema::liquidation_auctions::completed_at.eq(auction.completed_at),
1434 schema::liquidation_auctions::liquidator_address.eq(auction.liquidator_address),
1435 schema::liquidation_auctions::bonus.eq(auction.bonus),
1436 schema::liquidation_auctions::settlement_value.eq(auction.settlement_value),
1437 schema::liquidation_auctions::last_observed_block
1438 .eq(auction.last_observed_block),
1439 ))
1440 .execute(conn)?;
1441 }
1442 Ok(())
1443 }
1444
1445 fn update_liquidation_auction_with_conn(
1446 &self,
1447 conn: &mut PgConnection,
1448 auction_id: &str,
1449 update: &UpdateLiquidationAuction,
1450 ) -> Result<()> {
1451 diesel::update(
1452 schema::liquidation_auctions::table
1453 .filter(schema::liquidation_auctions::auction_id.eq(auction_id)),
1454 )
1455 .set(update)
1456 .execute(conn)?;
1457
1458 Ok(())
1459 }
1460
1461 fn get_liquidation_auction_with_conn(
1462 &self,
1463 conn: &mut PgConnection,
1464 auction_id: &str,
1465 ) -> Result<Option<LiquidationAuctionRecord>> {
1466 let result = schema::liquidation_auctions::table
1467 .filter(schema::liquidation_auctions::auction_id.eq(auction_id))
1468 .first::<LiquidationAuctionRecord>(conn)
1469 .optional()?;
1470
1471 Ok(result)
1472 }
1473
1474 fn persist_transaction_update(
1478 conn: &mut PgConnection,
1479 tx_update: &hypercall_types::engine_messages::TransactionUpdate,
1480 ) -> Result<()> {
1481 use diesel::sql_types::{Nullable, Text};
1482 use hypercall_types::engine_messages::TransactionStatus;
1483
1484 let (domain_status, delivery_status) = match tx_update.status {
1485 TransactionStatus::Submitted => (None, Some("pending")),
1486 TransactionStatus::Pending => (None, Some("pending")),
1487 TransactionStatus::Confirmed => (Some("completed"), Some("finalized")),
1488 TransactionStatus::Failed => (Some("failed"), Some("reverted")),
1489 TransactionStatus::Expired => (Some("failed"), Some("expired")),
1490 };
1491
1492 if let Some(domain_status) = domain_status {
1493 let delivery_error = tx_update.error.as_deref();
1494
1495 #[derive(diesel::QueryableByName)]
1496 struct ActionKeyRow {
1497 #[diesel(sql_type = crate::engine_enums::DirectiveActionKeySql)]
1498 action_key: crate::engine_enums::DirectiveActionKeyDb,
1499 }
1500
1501 let updated_row: Option<ActionKeyRow> = diesel::sql_query(
1502 r#"
1503 WITH updated_outbox AS (
1504 UPDATE directive_outbox
1505 SET domain_status = $2,
1506 delivery_status = $3,
1507 tx_hash = COALESCE($4, tx_hash),
1508 last_delivery_error = $5,
1509 updated_at = CURRENT_TIMESTAMP
1510 WHERE directive_id = $1
1511 AND delivery_status NOT IN ('finalized', 'reverted', 'expired', 'dead_lettered')
1512 RETURNING directive_id, action_key
1513 )
1514 SELECT action_key FROM updated_outbox
1515 "#,
1516 )
1517 .bind::<Text, _>(&tx_update.request_id)
1518 .bind::<Text, _>(domain_status)
1519 .bind::<Text, _>(delivery_status.expect("terminal update must have delivery status"))
1520 .bind::<Nullable<Text>, _>(tx_update.tx_hash.as_deref())
1521 .bind::<Nullable<Text>, _>(delivery_error)
1522 .get_result(conn)
1523 .optional()?;
1524
1525 if matches!(
1526 tx_update.status,
1527 TransactionStatus::Failed | TransactionStatus::Expired
1528 ) {
1529 if let Some(row) = updated_row {
1530 let action_key: hypercall_types::directives::ActionKey = row.action_key.into();
1531 if matches!(
1532 action_key,
1533 hypercall_types::directives::ActionKey::SystemWithdrawToken
1534 | hypercall_types::directives::ActionKey::SystemCreditOption
1535 ) {
1536 metrics::counter!(
1537 "ht_withdrawal_manual_reconciliation_required_total",
1538 "action_key" => action_key.as_str(),
1539 )
1540 .increment(1);
1541 tracing::warn!(
1542 directive_id = %tx_update.request_id,
1543 action_key = ?action_key,
1544 "Withdrawal directive reached terminal delivery status; automatic refunds are disabled and operator reconciliation is required"
1545 );
1546 }
1547 }
1548 }
1549 } else if let Some(delivery_status) = delivery_status {
1550 diesel::sql_query(
1551 r#"
1552 UPDATE directive_outbox
1553 SET delivery_status = $2,
1554 tx_hash = COALESCE($3, tx_hash),
1555 last_delivery_error = $4,
1556 updated_at = CURRENT_TIMESTAMP
1557 WHERE directive_id = $1
1558 AND delivery_status NOT IN ('finalized', 'reverted', 'expired', 'dead_lettered')
1559 "#,
1560 )
1561 .bind::<Text, _>(&tx_update.request_id)
1562 .bind::<Text, _>(delivery_status)
1563 .bind::<Nullable<Text>, _>(tx_update.tx_hash.as_deref())
1564 .bind::<Nullable<Text>, _>(tx_update.error.as_deref())
1565 .execute(conn)?;
1566 }
1567
1568 Ok(())
1569 }
1570}
1571
1572#[cfg(test)]
1573#[path = "event_handler_tests.rs"]
1574mod tests;