1use super::cache::LiquidationCache;
2use super::state::{
3 has_material_projection_change, AccountLiquidationStatus, FullLiquidationMetadata,
4 LiquidatedMetadata, LiquidationState,
5};
6use crate::messaging::EventBusTrait;
7use crate::rsm::unified_engine::LiquidationBonusRequest;
8use crate::shared::order_types::get_timestamp_millis;
9use crate::shared::topics::TOPIC_TRANSACTION_UPDATES;
10use alloy::{
11 primitives::{Address, U256},
12 providers::{DynProvider, Provider, ProviderBuilder},
13 rpc::types::{BlockNumberOrTag, Filter, Log},
14 sol,
15 sol_types::SolEvent,
16};
17use anyhow::{anyhow, Context, Result};
18use hypercall_db::LiquidationWriter;
19use hypercall_types::{
20 EngineMessage, LiquidationStateMessage, LiquidationStateType, TransactionStatus,
21 TransactionUpdate,
22};
23use hypercall_types::{WalletAddress, CONTRACT_UNIT_MULTIPLIER_DECIMAL};
24use metrics::{counter, gauge};
25use rust_decimal::Decimal;
26use std::str::FromStr;
27use std::sync::Arc;
28use std::time::Duration;
29use tokio::sync::{broadcast, mpsc, RwLock};
30use tracing::{debug, error, info, warn};
31
32sol! {
33 struct AuctionParams {
34 uint256 startTime;
35 int256 equity;
36 uint256 marginNeeded;
37 }
38
39 #[sol(rpc)]
40 contract Exchange {
41 function auctioneer() external view returns (address);
42 function managers(address account) external view returns (address);
43 function getAuctionParams(address account) external view returns (AuctionParams memory);
44
45 event LiquidationStarted(address indexed account, int256 equity, uint256 marginNeeded);
46 event LiquidationResolved(address indexed account, address indexed winner, uint256 bonus);
47 event LiquidationStopped(address indexed account);
48 event AuctioneerSet(address auctioneer);
49 }
50
51 #[sol(rpc)]
52 contract StaticAuctioneer {
53 event Liquidated(address indexed account, address indexed liquidator, uint256 amount);
54 }
55}
56
57#[derive(Clone, Debug)]
58pub struct LiquidationObserverConfig {
59 pub poll_interval: Duration,
60 pub max_lag_blocks: u64,
61}
62
63impl LiquidationObserverConfig {
64 pub fn from_runtime_config(config: &crate::backend_config::LiquidationRuntimeConfig) -> Self {
65 Self {
66 poll_interval: Duration::from_millis(config.chain_observer_poll_interval_ms),
67 max_lag_blocks: config.chain_observer_max_lag_blocks,
68 }
69 }
70}
71
72pub struct LiquidationChainObserver {
73 cache: Arc<LiquidationCache>,
74 db: Arc<dyn LiquidationWriter>,
75 bonus_sender: mpsc::Sender<LiquidationBonusRequest>,
76 event_sender: Option<mpsc::UnboundedSender<EngineMessage>>,
77 event_bus: Arc<dyn EventBusTrait>,
78 provider: DynProvider,
79 exchange_address: Address,
80 auctioneer_address: Arc<RwLock<Address>>,
81 config: LiquidationObserverConfig,
82}
83
84impl LiquidationChainObserver {
85 pub fn new(
86 cache: Arc<LiquidationCache>,
87 db: Arc<dyn LiquidationWriter>,
88 bonus_sender: mpsc::Sender<LiquidationBonusRequest>,
89 event_sender: Option<mpsc::UnboundedSender<EngineMessage>>,
90 event_bus: Arc<dyn EventBusTrait>,
91 rpc_url: &str,
92 exchange_contract_address: &str,
93 config: LiquidationObserverConfig,
94 ) -> Result<Self> {
95 let exchange_address = Address::from_str(exchange_contract_address)
96 .context("invalid transaction_submitter.exchange_contract_address")?;
97 let provider = ProviderBuilder::new()
98 .connect_http(
99 rpc_url
100 .parse()
101 .context("invalid transaction_submitter.rpc_url")?,
102 )
103 .erased();
104
105 Ok(Self {
106 cache,
107 db,
108 bonus_sender,
109 event_sender,
110 event_bus,
111 provider,
112 exchange_address,
113 auctioneer_address: Arc::new(RwLock::new(Address::ZERO)),
114 config,
115 })
116 }
117
118 pub async fn run_with_shutdown(&self, mut shutdown_rx: broadcast::Receiver<()>) -> Result<()> {
119 let mut tx_updates = self
120 .event_bus
121 .subscribe(vec![TOPIC_TRANSACTION_UPDATES.to_string()])
122 .await
123 .map_err(|error| anyhow!("failed to subscribe to transaction updates: {}", error))?;
124 let mut interval = tokio::time::interval(self.config.poll_interval);
125 let mut next_block = self.initial_next_block().await?;
126
127 loop {
128 tokio::select! {
129 _ = shutdown_rx.recv() => {
130 info!("Liquidation chain observer received shutdown signal");
131 break;
132 }
133 maybe_message = tx_updates.recv() => {
134 match maybe_message {
135 Some(EngineMessage::TransactionUpdate(update)) => {
136 if let Err(error) = self.handle_transaction_update(update).await {
137 error!("Failed to process liquidation transaction update: {}", error);
138 }
139 }
140 Some(_) => {}
141 None => {
142 warn!("Liquidation chain observer transaction update subscription closed");
143 break;
144 }
145 }
146 }
147 _ = interval.tick() => {
148 if let Err(error) = self.poll_chain(&mut next_block).await {
149 error!("Liquidation chain observer poll failed: {}", error);
150 }
151 }
152 }
153 }
154
155 Ok(())
156 }
157
158 async fn initial_next_block(&self) -> Result<u64> {
159 self.refresh_auctioneer_address().await?;
160
161 let latest_block = self.provider.get_block_number().await?;
162 let next_block = match self.db.get_max_liquidation_observed_block().await? {
163 Some(block) => block
164 .checked_add(1)
165 .ok_or_else(|| anyhow!("liquidation observed block overflow"))?,
166 None => latest_block.saturating_sub(self.config.max_lag_blocks.saturating_sub(1)),
167 };
168
169 info!(
170 next_block,
171 latest_block, "Liquidation chain observer initialized"
172 );
173 Ok(next_block)
174 }
175
176 async fn refresh_auctioneer_address(&self) -> Result<()> {
177 let exchange = Exchange::new(self.exchange_address, &self.provider);
178 let auctioneer_address = exchange
179 .auctioneer()
180 .call()
181 .await
182 .context("failed to query Exchange.auctioneer()")?;
183 *self.auctioneer_address.write().await = auctioneer_address;
184 Ok(())
185 }
186
187 async fn poll_chain(&self, next_block: &mut u64) -> Result<()> {
188 self.sync_pending_chain_state().await?;
189
190 let latest_block = self.provider.get_block_number().await?;
191 if *next_block > latest_block {
192 gauge!("ht_liquidation_chain_observer_lag_blocks").set(0.0);
193 return Ok(());
194 }
195
196 let lag_blocks = latest_block - *next_block;
197 gauge!("ht_liquidation_chain_observer_lag_blocks").set(lag_blocks as f64);
198 if lag_blocks >= self.config.max_lag_blocks {
199 warn!(
200 next_block = *next_block,
201 latest_block,
202 lag_blocks,
203 max_lag_blocks = self.config.max_lag_blocks,
204 "Liquidation chain observer lag exceeded configured threshold"
205 );
206 }
207
208 self.process_logs(*next_block, latest_block).await?;
209 *next_block = latest_block
210 .checked_add(1)
211 .ok_or_else(|| anyhow!("latest block overflow"))?;
212 Ok(())
213 }
214
215 async fn process_logs(&self, from_block: u64, to_block: u64) -> Result<()> {
216 let auctioneer_address = self.auctioneer_address_at_block(from_block).await?;
217 let mut current_from = from_block;
218 let mut current_auctioneer = auctioneer_address;
219 let mut min_log_key_exclusive = None;
220
221 while current_from <= to_block {
222 let exchange_logs = self
223 .provider
224 .get_logs(
225 &self
226 .exchange_liquidation_filter(current_from, to_block)
227 .address(self.exchange_address),
228 )
229 .await
230 .with_context(|| {
231 format!(
232 "failed to fetch exchange liquidation logs from {} to {}",
233 current_from, to_block
234 )
235 })?;
236
237 let auctioneer_logs = if current_auctioneer == Address::ZERO {
238 Vec::new()
239 } else {
240 self.provider
241 .get_logs(
242 &self
243 .auctioneer_liquidation_filter(current_from, to_block)
244 .address(current_auctioneer),
245 )
246 .await
247 .with_context(|| {
248 format!(
249 "failed to fetch auctioneer liquidation logs from {} to {}",
250 current_from, to_block
251 )
252 })?
253 };
254
255 let mut logs: Vec<Log> = exchange_logs
256 .into_iter()
257 .chain(auctioneer_logs)
258 .filter(|log| {
259 min_log_key_exclusive
260 .map(|min_key| log_sort_key(log) > min_key)
261 .unwrap_or(true)
262 })
263 .collect();
264 logs.sort_by_key(log_sort_key);
265
266 let rotation_log_index = logs
267 .iter()
268 .position(|log| log.log_decode::<Exchange::AuctioneerSet>().is_ok());
269
270 if let Some(rotation_log_index) = rotation_log_index {
271 let rotation_key = log_sort_key(&logs[rotation_log_index]);
272 let drained: Vec<Log> = logs.drain(..=rotation_log_index).collect();
273 for log in drained {
274 self.handle_log(log).await?;
275 }
276 current_from = rotation_key.0;
277 current_auctioneer = *self.auctioneer_address.read().await;
278 min_log_key_exclusive = Some(rotation_key);
279 continue;
280 }
281
282 for log in logs {
283 self.handle_log(log).await?;
284 }
285 break;
286 }
287
288 Ok(())
289 }
290
291 async fn auctioneer_address_at_block(&self, block_number: u64) -> Result<Address> {
292 let block_id = alloy::eips::BlockId::Number(BlockNumberOrTag::Number(block_number));
293 let exchange_code = self
294 .provider
295 .get_code_at(self.exchange_address)
296 .block_id(block_id)
297 .await
298 .with_context(|| {
299 format!(
300 "failed to query Exchange bytecode at replay start block {}",
301 block_number
302 )
303 })?;
304 if exchange_code.is_empty() {
305 return Ok(Address::ZERO);
306 }
307
308 let exchange = Exchange::new(self.exchange_address, &self.provider);
309 exchange
310 .auctioneer()
311 .block(block_id)
312 .call()
313 .await
314 .with_context(|| {
315 format!(
316 "failed to query Exchange.auctioneer() at replay start block {}",
317 block_number
318 )
319 })
320 }
321
322 async fn handle_log(&self, log: Log) -> Result<()> {
323 if let Ok(decoded) = log.log_decode::<Exchange::LiquidationStarted>() {
324 return self.handle_liquidation_started(log, decoded.data()).await;
325 }
326 if let Ok(decoded) = log.log_decode::<Exchange::LiquidationStopped>() {
327 return self.handle_liquidation_stopped(log, decoded.data()).await;
328 }
329 if let Ok(decoded) = log.log_decode::<Exchange::LiquidationResolved>() {
330 return self.handle_liquidation_resolved(log, decoded.data()).await;
331 }
332 if let Ok(decoded) = log.log_decode::<Exchange::AuctioneerSet>() {
333 let event = decoded.data();
334 *self.auctioneer_address.write().await = event.auctioneer;
335 info!(auctioneer = %event.auctioneer, "Updated liquidation auctioneer from chain event");
336 return Ok(());
337 }
338 if let Ok(decoded) = log.log_decode::<StaticAuctioneer::Liquidated>() {
339 return self.handle_liquidated(log, decoded.data()).await;
340 }
341 Ok(())
342 }
343
344 async fn sync_pending_chain_state(&self) -> Result<()> {
345 let wallets = self.cache.get_all_wallets().await;
346 if wallets.is_empty() {
347 return Ok(());
348 }
349
350 let exchange = Exchange::new(self.exchange_address, &self.provider);
351 for wallet in wallets {
352 let Some(status) = self.cache.get_status(&wallet).await else {
353 continue;
354 };
355
356 if !should_sync_pending_chain_state(&status.state) {
357 continue;
358 }
359
360 let params = match exchange.getAuctionParams(wallet.inner()).call().await {
361 Ok(params) => params,
362 Err(error) => {
363 warn!(
364 wallet = %wallet,
365 state = status.state.as_str(),
366 "Failed to query auction params while syncing pending liquidation state: {}",
367 error
368 );
369 continue;
370 }
371 };
372
373 if params.startTime != U256::ZERO {
374 self.promote_started_from_chain_state(&status, ¶ms)
375 .await?;
376 continue;
377 }
378
379 if matches!(status.state, LiquidationState::InLiquidation(..)) {
380 self.reconcile_terminal_state_from_chain(&status).await?;
381 }
382 }
383
384 Ok(())
385 }
386
387 async fn reconcile_terminal_state_from_chain(
388 &self,
389 status: &AccountLiquidationStatus,
390 ) -> Result<()> {
391 let wallet = status.wallet;
392 let Some(auction_id) = status.state.auction_id().map(ToOwned::to_owned) else {
393 warn!(
394 wallet = %wallet,
395 state = status.state.as_str(),
396 "Skipping terminal chain-state reconciliation without tracked auction id"
397 );
398 return Ok(());
399 };
400
401 let latest_block = self.provider.get_block_number().await?;
402 let from_block = self
403 .terminal_reconciliation_from_block(&auction_id, latest_block)
404 .await?
405 .min(latest_block);
406 let mut logs = self
407 .provider
408 .get_logs(
409 &Filter::new()
410 .address(self.exchange_address)
411 .from_block(BlockNumberOrTag::Number(from_block))
412 .to_block(BlockNumberOrTag::Number(latest_block))
413 .event_signature(vec![
414 Exchange::LiquidationStopped::SIGNATURE_HASH,
415 Exchange::LiquidationResolved::SIGNATURE_HASH,
416 ])
417 .topic1(wallet.inner()),
418 )
419 .await
420 .with_context(|| {
421 format!(
422 "failed to fetch terminal liquidation reconciliation logs for wallet {} from {} to {}",
423 wallet, from_block, latest_block
424 )
425 })?;
426 logs.sort_by_key(log_sort_key);
427
428 for log in logs.into_iter().rev() {
429 if let Ok(decoded) = log.log_decode::<Exchange::LiquidationStopped>() {
430 if decoded.data().account == wallet.inner() {
431 self.handle_liquidation_stopped(log, decoded.data()).await?;
432 return Ok(());
433 }
434 }
435 if let Ok(decoded) = log.log_decode::<Exchange::LiquidationResolved>() {
436 if decoded.data().account == wallet.inner() {
437 self.handle_liquidation_resolved(log, decoded.data())
438 .await?;
439 return Ok(());
440 }
441 }
442 }
443
444 debug!(
445 wallet = %wallet,
446 auction_id = %auction_id,
447 from_block,
448 latest_block,
449 "No terminal liquidation log found while reconciling inactive on-chain auction"
450 );
451 Ok(())
452 }
453
454 async fn terminal_reconciliation_from_block(
455 &self,
456 auction_id: &str,
457 latest_block: u64,
458 ) -> Result<u64> {
459 let last_observed_block = self
460 .db
461 .get_liquidation_auction(auction_id)
462 .await?
463 .and_then(|auction| auction.last_observed_block);
464 next_reconciliation_block(
465 last_observed_block,
466 latest_block,
467 self.config.max_lag_blocks,
468 )
469 }
470
471 fn exchange_liquidation_filter(&self, from_block: u64, to_block: u64) -> Filter {
472 Filter::new()
473 .from_block(BlockNumberOrTag::Number(from_block))
474 .to_block(BlockNumberOrTag::Number(to_block))
475 .event_signature(vec![
476 Exchange::LiquidationStarted::SIGNATURE_HASH,
477 Exchange::LiquidationResolved::SIGNATURE_HASH,
478 Exchange::LiquidationStopped::SIGNATURE_HASH,
479 Exchange::AuctioneerSet::SIGNATURE_HASH,
480 ])
481 }
482
483 fn auctioneer_liquidation_filter(&self, from_block: u64, to_block: u64) -> Filter {
484 Filter::new()
485 .from_block(BlockNumberOrTag::Number(from_block))
486 .to_block(BlockNumberOrTag::Number(to_block))
487 .event_signature(StaticAuctioneer::Liquidated::SIGNATURE_HASH)
488 }
489
490 async fn promote_started_from_chain_state(
491 &self,
492 status: &AccountLiquidationStatus,
493 params: &AuctionParams,
494 ) -> Result<()> {
495 let Some(chain_start_time) = u256_to_u64(params.startTime)? else {
496 return Ok(());
497 };
498 let wallet = status.wallet;
499 let previous_state = status.state.clone();
500 let now = get_timestamp_millis();
501 let mut updated_status = status.clone();
502 let margin_needed = decimal_from_u256_units(params.marginNeeded)?;
503 let (auction_id, request_id, tx_hash, started_at, stop_request_id, stop_tx_hash) =
504 match &status.state {
505 LiquidationState::PreLiquidation(metadata) => (
506 replay_started_auction_id(
507 wallet,
508 metadata.pending_full_auction_id.as_deref(),
509 Some(chain_start_time),
510 metadata.pending_full_tx_hash.as_deref(),
511 ),
512 metadata.pending_full_request_id.clone(),
513 metadata.pending_full_tx_hash.clone(),
514 now,
515 None,
516 None,
517 ),
518 LiquidationState::InLiquidation(metadata) => (
519 metadata.auction_id.clone(),
520 metadata.request_id.clone(),
521 metadata.tx_hash.clone(),
522 metadata.started_at,
523 metadata.stop_request_id.clone(),
524 metadata.stop_tx_hash.clone(),
525 ),
526 other => {
527 debug!(
528 wallet = %wallet,
529 state = other.as_str(),
530 "Skipping chain-state promotion for wallet outside liquidation start path"
531 );
532 return Ok(());
533 }
534 };
535
536 updated_status.enter_liquidation(
537 FullLiquidationMetadata {
538 auction_id: auction_id.clone(),
539 request_id,
540 tx_hash,
541 started_at,
542 chain_start_time: Some(chain_start_time),
543 margin_needed,
544 stop_request_id,
545 stop_tx_hash,
546 },
547 now,
548 );
549 self.cache.set_status(updated_status.clone()).await;
550 self.emit_status_update(&previous_state, &updated_status, now);
551 self.db
552 .update_liquidation_auction(
553 &auction_id,
554 &hypercall_db::LiquidationAuctionUpdate {
555 status: Some("active".to_string()),
556 request_id: None,
557 tx_hash: None,
558 chain_start_time: Some(i64::try_from(chain_start_time)?),
559 margin_needed: None,
560 stop_request_id: None,
561 stop_tx_hash: None,
562 completed_at: None,
563 liquidator_address: None,
564 bonus: None,
565 settlement_value: None,
566 last_observed_block: None,
567 },
568 )
569 .await?;
570 Ok(())
571 }
572
573 async fn handle_liquidation_started(
574 &self,
575 log: Log,
576 event: &Exchange::LiquidationStarted,
577 ) -> Result<()> {
578 let wallet = WalletAddress::from(event.account);
579 let Some(status) = self.cache.get_status(&wallet).await else {
580 warn!(wallet = %wallet, "Ignoring LiquidationStarted for untracked wallet");
581 return Ok(());
582 };
583
584 let exchange = Exchange::new(self.exchange_address, &self.provider);
585 let chain_start_time = match exchange.getAuctionParams(wallet.inner()).call().await {
586 Ok(params) => {
587 let chain_start_time = u256_to_u64(params.startTime)?;
588 if chain_start_time.is_none() {
589 warn!(
590 wallet = %wallet,
591 "LiquidationStarted replay has no live chain start time, continuing without it"
592 );
593 }
594 chain_start_time
595 }
596 Err(error) => {
597 warn!(
598 wallet = %wallet,
599 "Failed to query live auction params during LiquidationStarted replay: {}",
600 error
601 );
602 None
603 }
604 };
605 let margin_needed = decimal_from_u256_units(event.marginNeeded)?;
606 let tx_hash = log.transaction_hash.map(|hash| hash.to_string());
607 let block_number = log
608 .block_number
609 .ok_or_else(|| anyhow!("LiquidationStarted missing block number"))?;
610 let now = get_timestamp_millis();
611 let previous_state = status.state.clone();
612 let mut updated_status = status.clone();
613
614 let (auction_id, request_id, known_tx_hash, started_at, stop_request_id, stop_tx_hash) =
615 match &status.state {
616 LiquidationState::PreLiquidation(metadata) => (
617 replay_started_auction_id(
618 wallet,
619 metadata.pending_full_auction_id.as_deref(),
620 chain_start_time,
621 tx_hash
622 .as_deref()
623 .or(metadata.pending_full_tx_hash.as_deref()),
624 ),
625 metadata.pending_full_request_id.clone(),
626 metadata.pending_full_tx_hash.clone(),
627 now,
628 None,
629 None,
630 ),
631 LiquidationState::InLiquidation(metadata) => (
632 metadata.auction_id.clone(),
633 metadata.request_id.clone(),
634 metadata.tx_hash.clone(),
635 metadata.started_at,
636 metadata.stop_request_id.clone(),
637 metadata.stop_tx_hash.clone(),
638 ),
639 other => {
640 warn!(
641 wallet = %wallet,
642 state = other.as_str(),
643 "Ignoring stale LiquidationStarted for wallet in unexpected state"
644 );
645 return Ok(());
646 }
647 };
648
649 updated_status.enter_liquidation(
650 FullLiquidationMetadata {
651 auction_id: auction_id.clone(),
652 request_id,
653 tx_hash: tx_hash.or(known_tx_hash),
654 started_at,
655 chain_start_time,
656 margin_needed,
657 stop_request_id,
658 stop_tx_hash,
659 },
660 now,
661 );
662 self.cache.set_status(updated_status.clone()).await;
663 self.emit_status_update(&previous_state, &updated_status, now);
664 self.db
665 .update_liquidation_auction(
666 &auction_id,
667 &hypercall_db::LiquidationAuctionUpdate {
668 status: Some("active".to_string()),
669 request_id: None,
670 tx_hash: None,
671 chain_start_time: chain_start_time.map(i64::try_from).transpose()?,
672 margin_needed: Some(margin_needed),
673 stop_request_id: None,
674 stop_tx_hash: None,
675 completed_at: None,
676 liquidator_address: None,
677 bonus: None,
678 settlement_value: None,
679 last_observed_block: Some(i64::try_from(block_number)?),
680 },
681 )
682 .await?;
683 counter!("ht_liquidation_full_started_total").increment(1);
684 Ok(())
685 }
686
687 async fn handle_liquidation_stopped(
688 &self,
689 log: Log,
690 event: &Exchange::LiquidationStopped,
691 ) -> Result<()> {
692 let wallet = WalletAddress::from(event.account);
693 let Some(status) = self.cache.get_status(&wallet).await else {
694 warn!(wallet = %wallet, "Ignoring LiquidationStopped for untracked wallet");
695 return Ok(());
696 };
697 let previous_state = status.state.clone();
698 let Some(auction_id) =
699 tracked_auction_id_for_replay(&previous_state, wallet, "LiquidationStopped")
700 else {
701 return Ok(());
702 };
703 let now = get_timestamp_millis();
704 let mut updated_status = status.clone();
705 updated_status.recover_to_healthy(now);
706 self.cache.set_status(updated_status.clone()).await;
707 self.emit_status_update_with_auction(
708 &previous_state,
709 &updated_status,
710 now,
711 Some(auction_id.clone()),
712 );
713 self.db
714 .update_liquidation_auction(
715 &auction_id,
716 &hypercall_db::LiquidationAuctionUpdate {
717 status: Some("cancelled".to_string()),
718 request_id: None,
719 tx_hash: None,
720 chain_start_time: None,
721 margin_needed: None,
722 stop_request_id: None,
723 stop_tx_hash: log.transaction_hash.map(|hash| hash.to_string()),
724 completed_at: Some(i64::try_from(now)?),
725 liquidator_address: None,
726 bonus: None,
727 settlement_value: None,
728 last_observed_block: log.block_number.map(i64::try_from).transpose()?,
729 },
730 )
731 .await?;
732 counter!("ht_liquidation_full_stopped_total").increment(1);
733 Ok(())
734 }
735
736 async fn handle_liquidation_resolved(
737 &self,
738 log: Log,
739 event: &Exchange::LiquidationResolved,
740 ) -> Result<()> {
741 let wallet = WalletAddress::from(event.account);
742 let Some(status) = self.cache.get_status(&wallet).await else {
743 warn!(wallet = %wallet, "Ignoring LiquidationResolved for untracked wallet");
744 return Ok(());
745 };
746 if let LiquidationState::Liquidated(metadata) = &status.state {
747 if metadata.tx_hash == log.transaction_hash.map(|hash| hash.to_string()) {
748 debug!(wallet = %wallet, "Skipping already applied LiquidationResolved event");
749 return Ok(());
750 }
751 }
752
753 let winner = WalletAddress::from(event.winner);
754 let bonus = decimal_from_u256_units(event.bonus)?;
755 let now = get_timestamp_millis();
756 let previous_state = status.state.clone();
757 let Some(auction_id) =
758 tracked_auction_id_for_replay(&previous_state, wallet, "LiquidationResolved")
759 else {
760 return Ok(());
761 };
762 if bonus > Decimal::ZERO {
763 let bonus_permit = self
764 .reserve_liquidation_bonus_engine_handoff(winner)
765 .await?;
766 let resolution_tx_hash = log.transaction_hash.unwrap_or_else(|| {
767 panic!(
768 "STATE_CORRUPTION: LiquidationResolved missing transaction hash for wallet {}",
769 wallet
770 )
771 });
772 let bonus_apply = self
773 .db
774 .claim_and_apply_liquidation_bonus(
775 &winner,
776 &wallet,
777 &auction_id,
778 &resolution_tx_hash.to_string(),
779 bonus,
780 i64::try_from(now)?,
781 )
782 .await?;
783 if let Some(ledger_event_id) = bonus_apply.ledger_event_id {
784 self.apply_liquidation_bonus_to_engine(
785 bonus_permit,
786 winner,
787 bonus,
788 ledger_event_id,
789 now,
790 )
791 .await?;
792 }
793 if bonus_apply.newly_applied {
794 counter!("ht_liquidation_bonus_paid_total").increment(1);
795 }
796 }
797
798 let existing_auction = self.db.get_liquidation_auction(&auction_id).await?;
799 let mut updated_status = status.clone();
800 updated_status.complete_liquidation(
801 LiquidatedMetadata {
802 auction_id: auction_id.clone(),
803 completed_at: now,
804 winner: Some(winner),
805 bonus,
806 tx_hash: log.transaction_hash.map(|hash| hash.to_string()),
807 },
808 now,
809 );
810 self.cache.set_status(updated_status.clone()).await;
811 self.emit_status_update_with_auction(
812 &previous_state,
813 &updated_status,
814 now,
815 Some(auction_id.clone()),
816 );
817 self.db
818 .update_liquidation_auction(
819 &auction_id,
820 &hypercall_db::LiquidationAuctionUpdate {
821 status: Some("completed".to_string()),
822 request_id: None,
823 tx_hash: None,
824 chain_start_time: None,
825 margin_needed: None,
826 stop_request_id: None,
827 stop_tx_hash: None,
828 completed_at: Some(i64::try_from(now)?),
829 liquidator_address: Some(winner),
830 bonus: Some(bonus),
831 settlement_value: existing_auction
832 .as_ref()
833 .and_then(|auction| auction.settlement_value),
834 last_observed_block: log.block_number.map(i64::try_from).transpose()?,
835 },
836 )
837 .await?;
838 counter!("ht_liquidation_full_resolved_total").increment(1);
839 Ok(())
840 }
841
842 async fn reserve_liquidation_bonus_engine_handoff(
843 &self,
844 winner: WalletAddress,
845 ) -> Result<mpsc::Permit<'_, LiquidationBonusRequest>> {
846 self.bonus_sender.reserve().await.map_err(|error| {
847 anyhow!(
848 "failed to reserve liquidation bonus balance update handoff to engine for {}: {}",
849 winner,
850 error
851 )
852 })
853 }
854
855 async fn apply_liquidation_bonus_to_engine(
856 &self,
857 bonus_permit: mpsc::Permit<'_, LiquidationBonusRequest>,
858 winner: WalletAddress,
859 bonus: Decimal,
860 sequence: u64,
861 timestamp_ms: u64,
862 ) -> Result<()> {
863 let (applied_tx, applied_rx) = tokio::sync::oneshot::channel();
864 bonus_permit.send(LiquidationBonusRequest {
865 request_id: liquidation_bonus_request_id(sequence),
866 wallet: winner,
867 amount: bonus,
868 timestamp_ms,
869 sequence: Some(sequence),
870 applied_tx: Some(applied_tx),
871 });
872
873 match tokio::time::timeout(Duration::from_secs(10), applied_rx).await {
874 Ok(Ok(Ok(()))) => Ok(()),
875 Ok(Ok(Err(error))) => panic!(
876 "RUNTIME_INVARIANT: persisted liquidation bonus for {} at sequence {} but engine rejected balance_ledger update: {}",
877 winner, sequence, error
878 ),
879 Ok(Err(error)) => panic!(
880 "RUNTIME_INVARIANT: persisted liquidation bonus for {} at sequence {} but engine dropped balance_ledger update ack: {}",
881 winner, sequence, error
882 ),
883 Err(_) => panic!(
884 "RUNTIME_INVARIANT: persisted liquidation bonus for {} at sequence {} but engine ack timed out",
885 winner, sequence
886 ),
887 }
888 }
889
890 async fn handle_liquidated(
891 &self,
892 log: Log,
893 event: &StaticAuctioneer::Liquidated,
894 ) -> Result<()> {
895 let wallet = WalletAddress::from(event.account);
896 let Some(status) = self.cache.get_status(&wallet).await else {
897 warn!(wallet = %wallet, "Ignoring Liquidated for untracked wallet");
898 return Ok(());
899 };
900 let Some(auction_id) = status.state.auction_id().map(ToOwned::to_owned) else {
901 warn!(wallet = %wallet, "Ignoring Liquidated without tracked auction id");
902 return Ok(());
903 };
904 let existing_auction = self.db.get_liquidation_auction(&auction_id).await?;
905 self.db
906 .update_liquidation_auction(
907 &auction_id,
908 &hypercall_db::LiquidationAuctionUpdate {
909 status: Some("completed".to_string()),
910 request_id: None,
911 tx_hash: None,
912 chain_start_time: None,
913 margin_needed: None,
914 stop_request_id: None,
915 stop_tx_hash: None,
916 completed_at: Some(i64::try_from(get_timestamp_millis())?),
917 liquidator_address: Some(WalletAddress::from(event.liquidator)),
918 bonus: existing_auction.as_ref().and_then(|auction| auction.bonus),
919 settlement_value: Some(decimal_from_u256_units(event.amount)?),
920 last_observed_block: log.block_number.map(i64::try_from).transpose()?,
921 },
922 )
923 .await?;
924 counter!("ht_liquidation_liquidated_total").increment(1);
925 Ok(())
926 }
927
928 async fn handle_transaction_update(&self, update: TransactionUpdate) -> Result<()> {
929 let Some((_wallet, mut status)) =
930 self.find_status_by_request_id(&update.request_id).await?
931 else {
932 return Ok(());
933 };
934
935 let previous_state = status.state.clone();
936 let mut changed = false;
937 match &mut status.state {
938 LiquidationState::PreLiquidation(metadata)
939 if metadata.pending_full_request_id.as_deref()
940 == Some(update.request_id.as_str()) =>
941 {
942 match update.status {
943 TransactionStatus::Confirmed => {
944 if metadata.pending_full_tx_hash != update.tx_hash {
945 metadata.pending_full_tx_hash = update.tx_hash.clone();
946 changed = true;
947 }
948 }
949 TransactionStatus::Failed | TransactionStatus::Expired => {
950 metadata.pending_full_auction_id = None;
951 metadata.pending_full_request_id = None;
952 metadata.pending_full_tx_hash = None;
953 metadata.pending_full_margin_needed = None;
954 changed = true;
955 }
956 TransactionStatus::Pending | TransactionStatus::Submitted => {}
957 }
958 }
959 LiquidationState::InLiquidation(metadata)
960 if metadata.request_id.as_deref() == Some(update.request_id.as_str()) =>
961 {
962 match update.status {
963 TransactionStatus::Confirmed => {
964 if metadata.tx_hash != update.tx_hash {
965 metadata.tx_hash = update.tx_hash.clone();
966 changed = true;
967 }
968 }
969 TransactionStatus::Pending
970 | TransactionStatus::Submitted
971 | TransactionStatus::Failed
972 | TransactionStatus::Expired => {}
973 }
974 }
975 LiquidationState::InLiquidation(metadata)
976 if metadata.stop_request_id.as_deref() == Some(update.request_id.as_str()) =>
977 {
978 match update.status {
979 TransactionStatus::Confirmed => {
980 if metadata.stop_tx_hash != update.tx_hash {
981 metadata.stop_tx_hash = update.tx_hash.clone();
982 changed = true;
983 }
984 }
985 TransactionStatus::Failed | TransactionStatus::Expired => {
986 metadata.stop_request_id = None;
987 metadata.stop_tx_hash = None;
988 changed = true;
989 }
990 TransactionStatus::Pending | TransactionStatus::Submitted => {}
991 }
992 }
993 _ => {}
994 }
995
996 if !changed {
997 return Ok(());
998 }
999
1000 let timestamp = update.timestamp;
1001 status.updated_at = timestamp;
1002 self.cache.set_status(status.clone()).await;
1003 self.emit_status_update(&previous_state, &status, timestamp);
1004 Ok(())
1005 }
1006
1007 async fn find_status_by_request_id(
1008 &self,
1009 request_id: &str,
1010 ) -> Result<Option<(WalletAddress, AccountLiquidationStatus)>> {
1011 for wallet in self.cache.get_all_wallets().await {
1012 let Some(status) = self.cache.get_status(&wallet).await else {
1013 continue;
1014 };
1015 match &status.state {
1016 LiquidationState::PreLiquidation(metadata)
1017 if metadata.pending_full_request_id.as_deref() == Some(request_id) =>
1018 {
1019 return Ok(Some((wallet, status)));
1020 }
1021 LiquidationState::InLiquidation(metadata)
1022 if metadata.request_id.as_deref() == Some(request_id)
1023 || metadata.stop_request_id.as_deref() == Some(request_id) =>
1024 {
1025 return Ok(Some((wallet, status)));
1026 }
1027 _ => {}
1028 }
1029 }
1030 Ok(None)
1031 }
1032
1033 fn emit_status_update(
1034 &self,
1035 previous_state: &LiquidationState,
1036 status: &AccountLiquidationStatus,
1037 timestamp: u64,
1038 ) {
1039 self.emit_status_update_with_auction(
1040 previous_state,
1041 status,
1042 timestamp,
1043 status
1044 .state
1045 .auction_id()
1046 .map(ToOwned::to_owned)
1047 .or_else(|| previous_state.auction_id().map(ToOwned::to_owned)),
1048 );
1049 }
1050
1051 fn emit_status_update_with_auction(
1052 &self,
1053 previous_state: &LiquidationState,
1054 status: &AccountLiquidationStatus,
1055 timestamp: u64,
1056 auction_id: Option<String>,
1057 ) {
1058 let Some(sender) = &self.event_sender else {
1059 return;
1060 };
1061
1062 let msg = LiquidationStateMessage {
1063 wallet: status.wallet,
1064 previous_state: liquidation_state_to_type(previous_state),
1065 new_state: liquidation_state_to_type(&status.state),
1066 previous_liquidation_mode: previous_state
1067 .liquidation_mode()
1068 .map(|mode| mode.as_str().to_string()),
1069 liquidation_mode: status
1070 .liquidation_mode()
1071 .map(|mode| mode.as_str().to_string()),
1072 margin_mode: status.margin_mode.as_str().to_string(),
1073 equity: status.equity,
1074 mm_required: status.mm_required,
1075 maintenance_margin: status.maintenance_margin,
1076 shortfall: status.shortfall(),
1077 previous_auction_id: previous_state.auction_id().map(str::to_string),
1078 projection_changed: has_material_projection_change(previous_state, &status.state),
1079 auction_id,
1080 status: status.clone(),
1081 timestamp,
1082 };
1083
1084 if let Err(error) = sender.send(EngineMessage::LiquidationStateChange(msg)) {
1085 warn!(
1086 "Failed to emit liquidation observer status update: {}",
1087 error
1088 );
1089 }
1090 }
1091}
1092
1093fn liquidation_state_to_type(state: &LiquidationState) -> LiquidationStateType {
1094 match state {
1095 LiquidationState::Healthy => LiquidationStateType::Healthy,
1096 LiquidationState::PreLiquidation(..) => LiquidationStateType::PreLiquidation,
1097 LiquidationState::InLiquidation(..) => LiquidationStateType::InLiquidation,
1098 LiquidationState::Liquidated(..) => LiquidationStateType::Liquidated,
1099 }
1100}
1101
1102fn should_sync_pending_chain_state(state: &LiquidationState) -> bool {
1103 matches!(
1104 state,
1105 LiquidationState::PreLiquidation(..) | LiquidationState::InLiquidation(..)
1106 )
1107}
1108
1109fn next_reconciliation_block(
1110 last_observed_block: Option<i64>,
1111 latest_block: u64,
1112 max_lag_blocks: u64,
1113) -> Result<u64> {
1114 match last_observed_block {
1115 Some(block) => {
1116 let observed = u64::try_from(block).map_err(|_| {
1117 anyhow!(
1118 "negative liquidation last_observed_block {} persisted in database",
1119 block
1120 )
1121 })?;
1122 observed
1123 .checked_add(1)
1124 .ok_or_else(|| anyhow!("liquidation last_observed_block overflow: {}", observed))
1125 }
1126 None => Ok(latest_block.saturating_sub(max_lag_blocks.saturating_sub(1))),
1127 }
1128}
1129
1130fn log_sort_key(log: &Log) -> (u64, u64, u64) {
1131 (
1132 log.block_number.unwrap_or_default(),
1133 log.transaction_index.unwrap_or_default(),
1134 log.log_index.unwrap_or_default(),
1135 )
1136}
1137
1138fn decimal_from_u256_units(value: U256) -> Result<Decimal> {
1139 let raw = Decimal::from_str(&value.to_string())
1140 .map_err(|error| anyhow!("failed to parse U256 {} as Decimal: {}", value, error))?;
1141 Ok(raw / CONTRACT_UNIT_MULTIPLIER_DECIMAL)
1142}
1143
1144fn u256_to_u64(value: U256) -> Result<Option<u64>> {
1145 if value == U256::ZERO {
1146 return Ok(None);
1147 }
1148 let raw = value.to_string();
1149 Ok(Some(raw.parse::<u64>().map_err(|error| {
1150 anyhow!("failed to convert U256 {} to u64: {}", value, error)
1151 })?))
1152}
1153
1154fn tracked_auction_id_for_replay(
1155 state: &LiquidationState,
1156 wallet: WalletAddress,
1157 event_name: &str,
1158) -> Option<String> {
1159 match state.auction_id() {
1160 Some(auction_id) => Some(auction_id.to_owned()),
1161 None => {
1162 warn!(
1163 wallet = %wallet,
1164 state = ?state,
1165 "Ignoring stale {} event without tracked auction",
1166 event_name
1167 );
1168 None
1169 }
1170 }
1171}
1172
1173fn replay_started_auction_id(
1174 wallet: WalletAddress,
1175 persisted_auction_id: Option<&str>,
1176 chain_start_time: Option<u64>,
1177 tx_hash: Option<&str>,
1178) -> String {
1179 if let Some(auction_id) = persisted_auction_id {
1180 return auction_id.to_owned();
1181 }
1182
1183 let derived = match (chain_start_time, tx_hash) {
1184 (Some(start_time), _) => format!("chain:{}:{}", wallet, start_time),
1185 (None, Some(tx_hash)) => format!("tx:{}:{}", wallet, tx_hash),
1186 (None, None) => format!("wallet:{}", wallet),
1187 };
1188 warn!(
1189 wallet = %wallet,
1190 auction_id = %derived,
1191 "Recovering liquidation replay without persisted pending_full_auction_id"
1192 );
1193 derived
1194}
1195
1196#[async_trait::async_trait]
1197impl crate::shared::service::Service for LiquidationChainObserver {
1198 fn name(&self) -> &'static str {
1199 "LiquidationChainObserver"
1200 }
1201
1202 fn owner(&self) -> crate::shared::service::ServiceOwner {
1203 crate::shared::service::ServiceOwner::Engine
1204 }
1205
1206 async fn run(
1207 self: std::sync::Arc<Self>,
1208 shutdown: crate::shared::ShutdownRx,
1209 ) -> anyhow::Result<()> {
1210 self.run_with_shutdown(shutdown).await
1211 }
1212}
1213
1214fn liquidation_bonus_request_id(ledger_event_id: u64) -> String {
1215 const LIQUIDATION_BONUS_UUID_PREFIX: u128 = 0x4c49_5142_4f4e_5500_0000_0000_0000_0000;
1216 uuid::Uuid::from_u128(LIQUIDATION_BONUS_UUID_PREFIX | u128::from(ledger_event_id)).to_string()
1217}
1218
1219#[cfg(test)]
1220mod tests {
1221 use super::{
1222 decimal_from_u256_units, liquidation_bonus_request_id, next_reconciliation_block,
1223 replay_started_auction_id, should_sync_pending_chain_state, tracked_auction_id_for_replay,
1224 u256_to_u64,
1225 };
1226 use crate::liquidator::state::{
1227 FullLiquidationMetadata, LiquidationState, PartialLiquidationMetadata,
1228 };
1229 use alloy::primitives::U256;
1230 use hypercall_types::wallet_address::test_wallet;
1231 use rust_decimal_macros::dec;
1232
1233 #[test]
1234 fn liquidation_bonus_request_id_is_durable_sequence_stable_uuid() {
1235 let first = liquidation_bonus_request_id(42);
1236 let second = liquidation_bonus_request_id(42);
1237 let different = liquidation_bonus_request_id(43);
1238
1239 assert_eq!(first, second);
1240 assert_ne!(first, different);
1241 uuid::Uuid::parse_str(&first).expect("liquidation bonus request id must be a UUID");
1242 }
1243
1244 #[test]
1245 fn test_decimal_from_u256_units() {
1246 assert_eq!(
1247 decimal_from_u256_units(U256::from(12_345_678u64)).unwrap(),
1248 dec!(12.345678)
1249 );
1250 }
1251
1252 #[test]
1253 fn test_u256_to_u64() {
1254 assert_eq!(u256_to_u64(U256::ZERO).unwrap(), None);
1255 assert_eq!(u256_to_u64(U256::from(42u64)).unwrap(), Some(42));
1256 }
1257
1258 #[test]
1259 fn test_tracked_auction_id_for_replay_skips_healthy_state() {
1260 assert_eq!(
1261 tracked_auction_id_for_replay(
1262 &LiquidationState::Healthy,
1263 test_wallet(1),
1264 "LiquidationStopped"
1265 ),
1266 None
1267 );
1268 }
1269
1270 #[test]
1271 fn test_replay_started_auction_id_preserves_cached_value() {
1272 assert_eq!(
1273 replay_started_auction_id(test_wallet(1), Some("auction-123"), Some(42), Some("0xabc"),),
1274 "auction-123"
1275 );
1276 }
1277
1278 #[test]
1279 fn test_replay_started_auction_id_derives_from_chain_start_time() {
1280 let wallet = test_wallet(7);
1281 assert_eq!(
1282 replay_started_auction_id(wallet, None, Some(42), Some("0xabc")),
1283 format!("chain:{}:42", wallet)
1284 );
1285 }
1286
1287 #[test]
1288 fn test_should_sync_pending_chain_state_for_pre_liquidation_without_pending_request() {
1289 assert!(should_sync_pending_chain_state(
1290 &LiquidationState::PreLiquidation(PartialLiquidationMetadata::default())
1291 ));
1292 }
1293
1294 #[test]
1295 fn test_should_sync_pending_chain_state_for_in_liquidation_with_known_chain_start_time() {
1296 assert!(should_sync_pending_chain_state(
1297 &LiquidationState::InLiquidation(FullLiquidationMetadata {
1298 auction_id: "auction-1".to_string(),
1299 chain_start_time: Some(55),
1300 ..FullLiquidationMetadata::default()
1301 })
1302 ));
1303 }
1304
1305 #[test]
1306 fn test_next_reconciliation_block_advances_last_observed_block() {
1307 assert_eq!(next_reconciliation_block(Some(41), 99, 20).unwrap(), 42);
1308 assert_eq!(next_reconciliation_block(None, 99, 20).unwrap(), 80);
1309 }
1310}