// hypercall/liquidator/observer.rs

1use super::cache::LiquidationCache;
2use super::state::{
3    has_material_projection_change, AccountLiquidationStatus, FullLiquidationMetadata,
4    LiquidatedMetadata, LiquidationState,
5};
6use crate::messaging::EventBusTrait;
7use crate::rsm::unified_engine::LiquidationBonusRequest;
8use crate::shared::order_types::get_timestamp_millis;
9use crate::shared::topics::TOPIC_TRANSACTION_UPDATES;
10use alloy::{
11    primitives::{Address, U256},
12    providers::{DynProvider, Provider, ProviderBuilder},
13    rpc::types::{BlockNumberOrTag, Filter, Log},
14    sol,
15    sol_types::SolEvent,
16};
17use anyhow::{anyhow, Context, Result};
18use hypercall_db::LiquidationWriter;
19use hypercall_types::{
20    EngineMessage, LiquidationStateMessage, LiquidationStateType, TransactionStatus,
21    TransactionUpdate,
22};
23use hypercall_types::{WalletAddress, CONTRACT_UNIT_MULTIPLIER_DECIMAL};
24use metrics::{counter, gauge};
25use rust_decimal::Decimal;
26use std::str::FromStr;
27use std::sync::Arc;
28use std::time::Duration;
29use tokio::sync::{broadcast, mpsc, RwLock};
30use tracing::{debug, error, info, warn};
31
32sol! {
33    struct AuctionParams {
34        uint256 startTime;
35        int256 equity;
36        uint256 marginNeeded;
37    }
38
39    #[sol(rpc)]
40    contract Exchange {
41        function auctioneer() external view returns (address);
42        function managers(address account) external view returns (address);
43        function getAuctionParams(address account) external view returns (AuctionParams memory);
44
45        event LiquidationStarted(address indexed account, int256 equity, uint256 marginNeeded);
46        event LiquidationResolved(address indexed account, address indexed winner, uint256 bonus);
47        event LiquidationStopped(address indexed account);
48        event AuctioneerSet(address auctioneer);
49    }
50
51    #[sol(rpc)]
52    contract StaticAuctioneer {
53        event Liquidated(address indexed account, address indexed liquidator, uint256 amount);
54    }
55}
56
/// Tunables for [`LiquidationChainObserver`].
#[derive(Debug, Clone)]
pub struct LiquidationObserverConfig {
    /// Period of the timer that drives each chain poll.
    pub poll_interval: Duration,
    /// Block lag at or above which a poll logs a warning. It is also used to
    /// bound how far back replay starts when no observed block is recorded.
    pub max_lag_blocks: u64,
}
62
63impl LiquidationObserverConfig {
64    pub fn from_runtime_config(config: &crate::backend_config::LiquidationRuntimeConfig) -> Self {
65        Self {
66            poll_interval: Duration::from_millis(config.chain_observer_poll_interval_ms),
67            max_lag_blocks: config.chain_observer_max_lag_blocks,
68        }
69    }
70}
71
72pub struct LiquidationChainObserver {
73    cache: Arc<LiquidationCache>,
74    db: Arc<dyn LiquidationWriter>,
75    bonus_sender: mpsc::Sender<LiquidationBonusRequest>,
76    event_sender: Option<mpsc::UnboundedSender<EngineMessage>>,
77    event_bus: Arc<dyn EventBusTrait>,
78    provider: DynProvider,
79    exchange_address: Address,
80    auctioneer_address: Arc<RwLock<Address>>,
81    config: LiquidationObserverConfig,
82}
83
84impl LiquidationChainObserver {
85    pub fn new(
86        cache: Arc<LiquidationCache>,
87        db: Arc<dyn LiquidationWriter>,
88        bonus_sender: mpsc::Sender<LiquidationBonusRequest>,
89        event_sender: Option<mpsc::UnboundedSender<EngineMessage>>,
90        event_bus: Arc<dyn EventBusTrait>,
91        rpc_url: &str,
92        exchange_contract_address: &str,
93        config: LiquidationObserverConfig,
94    ) -> Result<Self> {
95        let exchange_address = Address::from_str(exchange_contract_address)
96            .context("invalid transaction_submitter.exchange_contract_address")?;
97        let provider = ProviderBuilder::new()
98            .connect_http(
99                rpc_url
100                    .parse()
101                    .context("invalid transaction_submitter.rpc_url")?,
102            )
103            .erased();
104
105        Ok(Self {
106            cache,
107            db,
108            bonus_sender,
109            event_sender,
110            event_bus,
111            provider,
112            exchange_address,
113            auctioneer_address: Arc::new(RwLock::new(Address::ZERO)),
114            config,
115        })
116    }
117
118    pub async fn run_with_shutdown(&self, mut shutdown_rx: broadcast::Receiver<()>) -> Result<()> {
119        let mut tx_updates = self
120            .event_bus
121            .subscribe(vec![TOPIC_TRANSACTION_UPDATES.to_string()])
122            .await
123            .map_err(|error| anyhow!("failed to subscribe to transaction updates: {}", error))?;
124        let mut interval = tokio::time::interval(self.config.poll_interval);
125        let mut next_block = self.initial_next_block().await?;
126
127        loop {
128            tokio::select! {
129                _ = shutdown_rx.recv() => {
130                    info!("Liquidation chain observer received shutdown signal");
131                    break;
132                }
133                maybe_message = tx_updates.recv() => {
134                    match maybe_message {
135                        Some(EngineMessage::TransactionUpdate(update)) => {
136                            if let Err(error) = self.handle_transaction_update(update).await {
137                                error!("Failed to process liquidation transaction update: {}", error);
138                            }
139                        }
140                        Some(_) => {}
141                        None => {
142                            warn!("Liquidation chain observer transaction update subscription closed");
143                            break;
144                        }
145                    }
146                }
147                _ = interval.tick() => {
148                    if let Err(error) = self.poll_chain(&mut next_block).await {
149                        error!("Liquidation chain observer poll failed: {}", error);
150                    }
151                }
152            }
153        }
154
155        Ok(())
156    }
157
158    async fn initial_next_block(&self) -> Result<u64> {
159        self.refresh_auctioneer_address().await?;
160
161        let latest_block = self.provider.get_block_number().await?;
162        let next_block = match self.db.get_max_liquidation_observed_block().await? {
163            Some(block) => block
164                .checked_add(1)
165                .ok_or_else(|| anyhow!("liquidation observed block overflow"))?,
166            None => latest_block.saturating_sub(self.config.max_lag_blocks.saturating_sub(1)),
167        };
168
169        info!(
170            next_block,
171            latest_block, "Liquidation chain observer initialized"
172        );
173        Ok(next_block)
174    }
175
176    async fn refresh_auctioneer_address(&self) -> Result<()> {
177        let exchange = Exchange::new(self.exchange_address, &self.provider);
178        let auctioneer_address = exchange
179            .auctioneer()
180            .call()
181            .await
182            .context("failed to query Exchange.auctioneer()")?;
183        *self.auctioneer_address.write().await = auctioneer_address;
184        Ok(())
185    }
186
187    async fn poll_chain(&self, next_block: &mut u64) -> Result<()> {
188        self.sync_pending_chain_state().await?;
189
190        let latest_block = self.provider.get_block_number().await?;
191        if *next_block > latest_block {
192            gauge!("ht_liquidation_chain_observer_lag_blocks").set(0.0);
193            return Ok(());
194        }
195
196        let lag_blocks = latest_block - *next_block;
197        gauge!("ht_liquidation_chain_observer_lag_blocks").set(lag_blocks as f64);
198        if lag_blocks >= self.config.max_lag_blocks {
199            warn!(
200                next_block = *next_block,
201                latest_block,
202                lag_blocks,
203                max_lag_blocks = self.config.max_lag_blocks,
204                "Liquidation chain observer lag exceeded configured threshold"
205            );
206        }
207
208        self.process_logs(*next_block, latest_block).await?;
209        *next_block = latest_block
210            .checked_add(1)
211            .ok_or_else(|| anyhow!("latest block overflow"))?;
212        Ok(())
213    }
214
215    async fn process_logs(&self, from_block: u64, to_block: u64) -> Result<()> {
216        let auctioneer_address = self.auctioneer_address_at_block(from_block).await?;
217        let mut current_from = from_block;
218        let mut current_auctioneer = auctioneer_address;
219        let mut min_log_key_exclusive = None;
220
221        while current_from <= to_block {
222            let exchange_logs = self
223                .provider
224                .get_logs(
225                    &self
226                        .exchange_liquidation_filter(current_from, to_block)
227                        .address(self.exchange_address),
228                )
229                .await
230                .with_context(|| {
231                    format!(
232                        "failed to fetch exchange liquidation logs from {} to {}",
233                        current_from, to_block
234                    )
235                })?;
236
237            let auctioneer_logs = if current_auctioneer == Address::ZERO {
238                Vec::new()
239            } else {
240                self.provider
241                    .get_logs(
242                        &self
243                            .auctioneer_liquidation_filter(current_from, to_block)
244                            .address(current_auctioneer),
245                    )
246                    .await
247                    .with_context(|| {
248                        format!(
249                            "failed to fetch auctioneer liquidation logs from {} to {}",
250                            current_from, to_block
251                        )
252                    })?
253            };
254
255            let mut logs: Vec<Log> = exchange_logs
256                .into_iter()
257                .chain(auctioneer_logs)
258                .filter(|log| {
259                    min_log_key_exclusive
260                        .map(|min_key| log_sort_key(log) > min_key)
261                        .unwrap_or(true)
262                })
263                .collect();
264            logs.sort_by_key(log_sort_key);
265
266            let rotation_log_index = logs
267                .iter()
268                .position(|log| log.log_decode::<Exchange::AuctioneerSet>().is_ok());
269
270            if let Some(rotation_log_index) = rotation_log_index {
271                let rotation_key = log_sort_key(&logs[rotation_log_index]);
272                let drained: Vec<Log> = logs.drain(..=rotation_log_index).collect();
273                for log in drained {
274                    self.handle_log(log).await?;
275                }
276                current_from = rotation_key.0;
277                current_auctioneer = *self.auctioneer_address.read().await;
278                min_log_key_exclusive = Some(rotation_key);
279                continue;
280            }
281
282            for log in logs {
283                self.handle_log(log).await?;
284            }
285            break;
286        }
287
288        Ok(())
289    }
290
291    async fn auctioneer_address_at_block(&self, block_number: u64) -> Result<Address> {
292        let block_id = alloy::eips::BlockId::Number(BlockNumberOrTag::Number(block_number));
293        let exchange_code = self
294            .provider
295            .get_code_at(self.exchange_address)
296            .block_id(block_id)
297            .await
298            .with_context(|| {
299                format!(
300                    "failed to query Exchange bytecode at replay start block {}",
301                    block_number
302                )
303            })?;
304        if exchange_code.is_empty() {
305            return Ok(Address::ZERO);
306        }
307
308        let exchange = Exchange::new(self.exchange_address, &self.provider);
309        exchange
310            .auctioneer()
311            .block(block_id)
312            .call()
313            .await
314            .with_context(|| {
315                format!(
316                    "failed to query Exchange.auctioneer() at replay start block {}",
317                    block_number
318                )
319            })
320    }
321
322    async fn handle_log(&self, log: Log) -> Result<()> {
323        if let Ok(decoded) = log.log_decode::<Exchange::LiquidationStarted>() {
324            return self.handle_liquidation_started(log, decoded.data()).await;
325        }
326        if let Ok(decoded) = log.log_decode::<Exchange::LiquidationStopped>() {
327            return self.handle_liquidation_stopped(log, decoded.data()).await;
328        }
329        if let Ok(decoded) = log.log_decode::<Exchange::LiquidationResolved>() {
330            return self.handle_liquidation_resolved(log, decoded.data()).await;
331        }
332        if let Ok(decoded) = log.log_decode::<Exchange::AuctioneerSet>() {
333            let event = decoded.data();
334            *self.auctioneer_address.write().await = event.auctioneer;
335            info!(auctioneer = %event.auctioneer, "Updated liquidation auctioneer from chain event");
336            return Ok(());
337        }
338        if let Ok(decoded) = log.log_decode::<StaticAuctioneer::Liquidated>() {
339            return self.handle_liquidated(log, decoded.data()).await;
340        }
341        Ok(())
342    }
343
344    async fn sync_pending_chain_state(&self) -> Result<()> {
345        let wallets = self.cache.get_all_wallets().await;
346        if wallets.is_empty() {
347            return Ok(());
348        }
349
350        let exchange = Exchange::new(self.exchange_address, &self.provider);
351        for wallet in wallets {
352            let Some(status) = self.cache.get_status(&wallet).await else {
353                continue;
354            };
355
356            if !should_sync_pending_chain_state(&status.state) {
357                continue;
358            }
359
360            let params = match exchange.getAuctionParams(wallet.inner()).call().await {
361                Ok(params) => params,
362                Err(error) => {
363                    warn!(
364                        wallet = %wallet,
365                        state = status.state.as_str(),
366                        "Failed to query auction params while syncing pending liquidation state: {}",
367                        error
368                    );
369                    continue;
370                }
371            };
372
373            if params.startTime != U256::ZERO {
374                self.promote_started_from_chain_state(&status, &params)
375                    .await?;
376                continue;
377            }
378
379            if matches!(status.state, LiquidationState::InLiquidation(..)) {
380                self.reconcile_terminal_state_from_chain(&status).await?;
381            }
382        }
383
384        Ok(())
385    }
386
387    async fn reconcile_terminal_state_from_chain(
388        &self,
389        status: &AccountLiquidationStatus,
390    ) -> Result<()> {
391        let wallet = status.wallet;
392        let Some(auction_id) = status.state.auction_id().map(ToOwned::to_owned) else {
393            warn!(
394                wallet = %wallet,
395                state = status.state.as_str(),
396                "Skipping terminal chain-state reconciliation without tracked auction id"
397            );
398            return Ok(());
399        };
400
401        let latest_block = self.provider.get_block_number().await?;
402        let from_block = self
403            .terminal_reconciliation_from_block(&auction_id, latest_block)
404            .await?
405            .min(latest_block);
406        let mut logs = self
407            .provider
408            .get_logs(
409                &Filter::new()
410                    .address(self.exchange_address)
411                    .from_block(BlockNumberOrTag::Number(from_block))
412                    .to_block(BlockNumberOrTag::Number(latest_block))
413                    .event_signature(vec![
414                        Exchange::LiquidationStopped::SIGNATURE_HASH,
415                        Exchange::LiquidationResolved::SIGNATURE_HASH,
416                    ])
417                    .topic1(wallet.inner()),
418            )
419            .await
420            .with_context(|| {
421                format!(
422                    "failed to fetch terminal liquidation reconciliation logs for wallet {} from {} to {}",
423                    wallet, from_block, latest_block
424                )
425            })?;
426        logs.sort_by_key(log_sort_key);
427
428        for log in logs.into_iter().rev() {
429            if let Ok(decoded) = log.log_decode::<Exchange::LiquidationStopped>() {
430                if decoded.data().account == wallet.inner() {
431                    self.handle_liquidation_stopped(log, decoded.data()).await?;
432                    return Ok(());
433                }
434            }
435            if let Ok(decoded) = log.log_decode::<Exchange::LiquidationResolved>() {
436                if decoded.data().account == wallet.inner() {
437                    self.handle_liquidation_resolved(log, decoded.data())
438                        .await?;
439                    return Ok(());
440                }
441            }
442        }
443
444        debug!(
445            wallet = %wallet,
446            auction_id = %auction_id,
447            from_block,
448            latest_block,
449            "No terminal liquidation log found while reconciling inactive on-chain auction"
450        );
451        Ok(())
452    }
453
454    async fn terminal_reconciliation_from_block(
455        &self,
456        auction_id: &str,
457        latest_block: u64,
458    ) -> Result<u64> {
459        let last_observed_block = self
460            .db
461            .get_liquidation_auction(auction_id)
462            .await?
463            .and_then(|auction| auction.last_observed_block);
464        next_reconciliation_block(
465            last_observed_block,
466            latest_block,
467            self.config.max_lag_blocks,
468        )
469    }
470
471    fn exchange_liquidation_filter(&self, from_block: u64, to_block: u64) -> Filter {
472        Filter::new()
473            .from_block(BlockNumberOrTag::Number(from_block))
474            .to_block(BlockNumberOrTag::Number(to_block))
475            .event_signature(vec![
476                Exchange::LiquidationStarted::SIGNATURE_HASH,
477                Exchange::LiquidationResolved::SIGNATURE_HASH,
478                Exchange::LiquidationStopped::SIGNATURE_HASH,
479                Exchange::AuctioneerSet::SIGNATURE_HASH,
480            ])
481    }
482
483    fn auctioneer_liquidation_filter(&self, from_block: u64, to_block: u64) -> Filter {
484        Filter::new()
485            .from_block(BlockNumberOrTag::Number(from_block))
486            .to_block(BlockNumberOrTag::Number(to_block))
487            .event_signature(StaticAuctioneer::Liquidated::SIGNATURE_HASH)
488    }
489
490    async fn promote_started_from_chain_state(
491        &self,
492        status: &AccountLiquidationStatus,
493        params: &AuctionParams,
494    ) -> Result<()> {
495        let Some(chain_start_time) = u256_to_u64(params.startTime)? else {
496            return Ok(());
497        };
498        let wallet = status.wallet;
499        let previous_state = status.state.clone();
500        let now = get_timestamp_millis();
501        let mut updated_status = status.clone();
502        let margin_needed = decimal_from_u256_units(params.marginNeeded)?;
503        let (auction_id, request_id, tx_hash, started_at, stop_request_id, stop_tx_hash) =
504            match &status.state {
505                LiquidationState::PreLiquidation(metadata) => (
506                    replay_started_auction_id(
507                        wallet,
508                        metadata.pending_full_auction_id.as_deref(),
509                        Some(chain_start_time),
510                        metadata.pending_full_tx_hash.as_deref(),
511                    ),
512                    metadata.pending_full_request_id.clone(),
513                    metadata.pending_full_tx_hash.clone(),
514                    now,
515                    None,
516                    None,
517                ),
518                LiquidationState::InLiquidation(metadata) => (
519                    metadata.auction_id.clone(),
520                    metadata.request_id.clone(),
521                    metadata.tx_hash.clone(),
522                    metadata.started_at,
523                    metadata.stop_request_id.clone(),
524                    metadata.stop_tx_hash.clone(),
525                ),
526                other => {
527                    debug!(
528                        wallet = %wallet,
529                        state = other.as_str(),
530                        "Skipping chain-state promotion for wallet outside liquidation start path"
531                    );
532                    return Ok(());
533                }
534            };
535
536        updated_status.enter_liquidation(
537            FullLiquidationMetadata {
538                auction_id: auction_id.clone(),
539                request_id,
540                tx_hash,
541                started_at,
542                chain_start_time: Some(chain_start_time),
543                margin_needed,
544                stop_request_id,
545                stop_tx_hash,
546            },
547            now,
548        );
549        self.cache.set_status(updated_status.clone()).await;
550        self.emit_status_update(&previous_state, &updated_status, now);
551        self.db
552            .update_liquidation_auction(
553                &auction_id,
554                &hypercall_db::LiquidationAuctionUpdate {
555                    status: Some("active".to_string()),
556                    request_id: None,
557                    tx_hash: None,
558                    chain_start_time: Some(i64::try_from(chain_start_time)?),
559                    margin_needed: None,
560                    stop_request_id: None,
561                    stop_tx_hash: None,
562                    completed_at: None,
563                    liquidator_address: None,
564                    bonus: None,
565                    settlement_value: None,
566                    last_observed_block: None,
567                },
568            )
569            .await?;
570        Ok(())
571    }
572
573    async fn handle_liquidation_started(
574        &self,
575        log: Log,
576        event: &Exchange::LiquidationStarted,
577    ) -> Result<()> {
578        let wallet = WalletAddress::from(event.account);
579        let Some(status) = self.cache.get_status(&wallet).await else {
580            warn!(wallet = %wallet, "Ignoring LiquidationStarted for untracked wallet");
581            return Ok(());
582        };
583
584        let exchange = Exchange::new(self.exchange_address, &self.provider);
585        let chain_start_time = match exchange.getAuctionParams(wallet.inner()).call().await {
586            Ok(params) => {
587                let chain_start_time = u256_to_u64(params.startTime)?;
588                if chain_start_time.is_none() {
589                    warn!(
590                        wallet = %wallet,
591                        "LiquidationStarted replay has no live chain start time, continuing without it"
592                    );
593                }
594                chain_start_time
595            }
596            Err(error) => {
597                warn!(
598                    wallet = %wallet,
599                    "Failed to query live auction params during LiquidationStarted replay: {}",
600                    error
601                );
602                None
603            }
604        };
605        let margin_needed = decimal_from_u256_units(event.marginNeeded)?;
606        let tx_hash = log.transaction_hash.map(|hash| hash.to_string());
607        let block_number = log
608            .block_number
609            .ok_or_else(|| anyhow!("LiquidationStarted missing block number"))?;
610        let now = get_timestamp_millis();
611        let previous_state = status.state.clone();
612        let mut updated_status = status.clone();
613
614        let (auction_id, request_id, known_tx_hash, started_at, stop_request_id, stop_tx_hash) =
615            match &status.state {
616                LiquidationState::PreLiquidation(metadata) => (
617                    replay_started_auction_id(
618                        wallet,
619                        metadata.pending_full_auction_id.as_deref(),
620                        chain_start_time,
621                        tx_hash
622                            .as_deref()
623                            .or(metadata.pending_full_tx_hash.as_deref()),
624                    ),
625                    metadata.pending_full_request_id.clone(),
626                    metadata.pending_full_tx_hash.clone(),
627                    now,
628                    None,
629                    None,
630                ),
631                LiquidationState::InLiquidation(metadata) => (
632                    metadata.auction_id.clone(),
633                    metadata.request_id.clone(),
634                    metadata.tx_hash.clone(),
635                    metadata.started_at,
636                    metadata.stop_request_id.clone(),
637                    metadata.stop_tx_hash.clone(),
638                ),
639                other => {
640                    warn!(
641                        wallet = %wallet,
642                        state = other.as_str(),
643                        "Ignoring stale LiquidationStarted for wallet in unexpected state"
644                    );
645                    return Ok(());
646                }
647            };
648
649        updated_status.enter_liquidation(
650            FullLiquidationMetadata {
651                auction_id: auction_id.clone(),
652                request_id,
653                tx_hash: tx_hash.or(known_tx_hash),
654                started_at,
655                chain_start_time,
656                margin_needed,
657                stop_request_id,
658                stop_tx_hash,
659            },
660            now,
661        );
662        self.cache.set_status(updated_status.clone()).await;
663        self.emit_status_update(&previous_state, &updated_status, now);
664        self.db
665            .update_liquidation_auction(
666                &auction_id,
667                &hypercall_db::LiquidationAuctionUpdate {
668                    status: Some("active".to_string()),
669                    request_id: None,
670                    tx_hash: None,
671                    chain_start_time: chain_start_time.map(i64::try_from).transpose()?,
672                    margin_needed: Some(margin_needed),
673                    stop_request_id: None,
674                    stop_tx_hash: None,
675                    completed_at: None,
676                    liquidator_address: None,
677                    bonus: None,
678                    settlement_value: None,
679                    last_observed_block: Some(i64::try_from(block_number)?),
680                },
681            )
682            .await?;
683        counter!("ht_liquidation_full_started_total").increment(1);
684        Ok(())
685    }
686
687    async fn handle_liquidation_stopped(
688        &self,
689        log: Log,
690        event: &Exchange::LiquidationStopped,
691    ) -> Result<()> {
692        let wallet = WalletAddress::from(event.account);
693        let Some(status) = self.cache.get_status(&wallet).await else {
694            warn!(wallet = %wallet, "Ignoring LiquidationStopped for untracked wallet");
695            return Ok(());
696        };
697        let previous_state = status.state.clone();
698        let Some(auction_id) =
699            tracked_auction_id_for_replay(&previous_state, wallet, "LiquidationStopped")
700        else {
701            return Ok(());
702        };
703        let now = get_timestamp_millis();
704        let mut updated_status = status.clone();
705        updated_status.recover_to_healthy(now);
706        self.cache.set_status(updated_status.clone()).await;
707        self.emit_status_update_with_auction(
708            &previous_state,
709            &updated_status,
710            now,
711            Some(auction_id.clone()),
712        );
713        self.db
714            .update_liquidation_auction(
715                &auction_id,
716                &hypercall_db::LiquidationAuctionUpdate {
717                    status: Some("cancelled".to_string()),
718                    request_id: None,
719                    tx_hash: None,
720                    chain_start_time: None,
721                    margin_needed: None,
722                    stop_request_id: None,
723                    stop_tx_hash: log.transaction_hash.map(|hash| hash.to_string()),
724                    completed_at: Some(i64::try_from(now)?),
725                    liquidator_address: None,
726                    bonus: None,
727                    settlement_value: None,
728                    last_observed_block: log.block_number.map(i64::try_from).transpose()?,
729                },
730            )
731            .await?;
732        counter!("ht_liquidation_full_stopped_total").increment(1);
733        Ok(())
734    }
735
736    async fn handle_liquidation_resolved(
737        &self,
738        log: Log,
739        event: &Exchange::LiquidationResolved,
740    ) -> Result<()> {
741        let wallet = WalletAddress::from(event.account);
742        let Some(status) = self.cache.get_status(&wallet).await else {
743            warn!(wallet = %wallet, "Ignoring LiquidationResolved for untracked wallet");
744            return Ok(());
745        };
746        if let LiquidationState::Liquidated(metadata) = &status.state {
747            if metadata.tx_hash == log.transaction_hash.map(|hash| hash.to_string()) {
748                debug!(wallet = %wallet, "Skipping already applied LiquidationResolved event");
749                return Ok(());
750            }
751        }
752
753        let winner = WalletAddress::from(event.winner);
754        let bonus = decimal_from_u256_units(event.bonus)?;
755        let now = get_timestamp_millis();
756        let previous_state = status.state.clone();
757        let Some(auction_id) =
758            tracked_auction_id_for_replay(&previous_state, wallet, "LiquidationResolved")
759        else {
760            return Ok(());
761        };
762        if bonus > Decimal::ZERO {
763            let bonus_permit = self
764                .reserve_liquidation_bonus_engine_handoff(winner)
765                .await?;
766            let resolution_tx_hash = log.transaction_hash.unwrap_or_else(|| {
767                panic!(
768                    "STATE_CORRUPTION: LiquidationResolved missing transaction hash for wallet {}",
769                    wallet
770                )
771            });
772            let bonus_apply = self
773                .db
774                .claim_and_apply_liquidation_bonus(
775                    &winner,
776                    &wallet,
777                    &auction_id,
778                    &resolution_tx_hash.to_string(),
779                    bonus,
780                    i64::try_from(now)?,
781                )
782                .await?;
783            if let Some(ledger_event_id) = bonus_apply.ledger_event_id {
784                self.apply_liquidation_bonus_to_engine(
785                    bonus_permit,
786                    winner,
787                    bonus,
788                    ledger_event_id,
789                    now,
790                )
791                .await?;
792            }
793            if bonus_apply.newly_applied {
794                counter!("ht_liquidation_bonus_paid_total").increment(1);
795            }
796        }
797
798        let existing_auction = self.db.get_liquidation_auction(&auction_id).await?;
799        let mut updated_status = status.clone();
800        updated_status.complete_liquidation(
801            LiquidatedMetadata {
802                auction_id: auction_id.clone(),
803                completed_at: now,
804                winner: Some(winner),
805                bonus,
806                tx_hash: log.transaction_hash.map(|hash| hash.to_string()),
807            },
808            now,
809        );
810        self.cache.set_status(updated_status.clone()).await;
811        self.emit_status_update_with_auction(
812            &previous_state,
813            &updated_status,
814            now,
815            Some(auction_id.clone()),
816        );
817        self.db
818            .update_liquidation_auction(
819                &auction_id,
820                &hypercall_db::LiquidationAuctionUpdate {
821                    status: Some("completed".to_string()),
822                    request_id: None,
823                    tx_hash: None,
824                    chain_start_time: None,
825                    margin_needed: None,
826                    stop_request_id: None,
827                    stop_tx_hash: None,
828                    completed_at: Some(i64::try_from(now)?),
829                    liquidator_address: Some(winner),
830                    bonus: Some(bonus),
831                    settlement_value: existing_auction
832                        .as_ref()
833                        .and_then(|auction| auction.settlement_value),
834                    last_observed_block: log.block_number.map(i64::try_from).transpose()?,
835                },
836            )
837            .await?;
838        counter!("ht_liquidation_full_resolved_total").increment(1);
839        Ok(())
840    }
841
842    async fn reserve_liquidation_bonus_engine_handoff(
843        &self,
844        winner: WalletAddress,
845    ) -> Result<mpsc::Permit<'_, LiquidationBonusRequest>> {
846        self.bonus_sender.reserve().await.map_err(|error| {
847            anyhow!(
848                "failed to reserve liquidation bonus balance update handoff to engine for {}: {}",
849                winner,
850                error
851            )
852        })
853    }
854
855    async fn apply_liquidation_bonus_to_engine(
856        &self,
857        bonus_permit: mpsc::Permit<'_, LiquidationBonusRequest>,
858        winner: WalletAddress,
859        bonus: Decimal,
860        sequence: u64,
861        timestamp_ms: u64,
862    ) -> Result<()> {
863        let (applied_tx, applied_rx) = tokio::sync::oneshot::channel();
864        bonus_permit.send(LiquidationBonusRequest {
865            request_id: liquidation_bonus_request_id(sequence),
866            wallet: winner,
867            amount: bonus,
868            timestamp_ms,
869            sequence: Some(sequence),
870            applied_tx: Some(applied_tx),
871        });
872
873        match tokio::time::timeout(Duration::from_secs(10), applied_rx).await {
874            Ok(Ok(Ok(()))) => Ok(()),
875            Ok(Ok(Err(error))) => panic!(
876                "RUNTIME_INVARIANT: persisted liquidation bonus for {} at sequence {} but engine rejected balance_ledger update: {}",
877                winner, sequence, error
878            ),
879            Ok(Err(error)) => panic!(
880                "RUNTIME_INVARIANT: persisted liquidation bonus for {} at sequence {} but engine dropped balance_ledger update ack: {}",
881                winner, sequence, error
882            ),
883            Err(_) => panic!(
884                "RUNTIME_INVARIANT: persisted liquidation bonus for {} at sequence {} but engine ack timed out",
885                winner, sequence
886            ),
887        }
888    }
889
890    async fn handle_liquidated(
891        &self,
892        log: Log,
893        event: &StaticAuctioneer::Liquidated,
894    ) -> Result<()> {
895        let wallet = WalletAddress::from(event.account);
896        let Some(status) = self.cache.get_status(&wallet).await else {
897            warn!(wallet = %wallet, "Ignoring Liquidated for untracked wallet");
898            return Ok(());
899        };
900        let Some(auction_id) = status.state.auction_id().map(ToOwned::to_owned) else {
901            warn!(wallet = %wallet, "Ignoring Liquidated without tracked auction id");
902            return Ok(());
903        };
904        let existing_auction = self.db.get_liquidation_auction(&auction_id).await?;
905        self.db
906            .update_liquidation_auction(
907                &auction_id,
908                &hypercall_db::LiquidationAuctionUpdate {
909                    status: Some("completed".to_string()),
910                    request_id: None,
911                    tx_hash: None,
912                    chain_start_time: None,
913                    margin_needed: None,
914                    stop_request_id: None,
915                    stop_tx_hash: None,
916                    completed_at: Some(i64::try_from(get_timestamp_millis())?),
917                    liquidator_address: Some(WalletAddress::from(event.liquidator)),
918                    bonus: existing_auction.as_ref().and_then(|auction| auction.bonus),
919                    settlement_value: Some(decimal_from_u256_units(event.amount)?),
920                    last_observed_block: log.block_number.map(i64::try_from).transpose()?,
921                },
922            )
923            .await?;
924        counter!("ht_liquidation_liquidated_total").increment(1);
925        Ok(())
926    }
927
928    async fn handle_transaction_update(&self, update: TransactionUpdate) -> Result<()> {
929        let Some((_wallet, mut status)) =
930            self.find_status_by_request_id(&update.request_id).await?
931        else {
932            return Ok(());
933        };
934
935        let previous_state = status.state.clone();
936        let mut changed = false;
937        match &mut status.state {
938            LiquidationState::PreLiquidation(metadata)
939                if metadata.pending_full_request_id.as_deref()
940                    == Some(update.request_id.as_str()) =>
941            {
942                match update.status {
943                    TransactionStatus::Confirmed => {
944                        if metadata.pending_full_tx_hash != update.tx_hash {
945                            metadata.pending_full_tx_hash = update.tx_hash.clone();
946                            changed = true;
947                        }
948                    }
949                    TransactionStatus::Failed | TransactionStatus::Expired => {
950                        metadata.pending_full_auction_id = None;
951                        metadata.pending_full_request_id = None;
952                        metadata.pending_full_tx_hash = None;
953                        metadata.pending_full_margin_needed = None;
954                        changed = true;
955                    }
956                    TransactionStatus::Pending | TransactionStatus::Submitted => {}
957                }
958            }
959            LiquidationState::InLiquidation(metadata)
960                if metadata.request_id.as_deref() == Some(update.request_id.as_str()) =>
961            {
962                match update.status {
963                    TransactionStatus::Confirmed => {
964                        if metadata.tx_hash != update.tx_hash {
965                            metadata.tx_hash = update.tx_hash.clone();
966                            changed = true;
967                        }
968                    }
969                    TransactionStatus::Pending
970                    | TransactionStatus::Submitted
971                    | TransactionStatus::Failed
972                    | TransactionStatus::Expired => {}
973                }
974            }
975            LiquidationState::InLiquidation(metadata)
976                if metadata.stop_request_id.as_deref() == Some(update.request_id.as_str()) =>
977            {
978                match update.status {
979                    TransactionStatus::Confirmed => {
980                        if metadata.stop_tx_hash != update.tx_hash {
981                            metadata.stop_tx_hash = update.tx_hash.clone();
982                            changed = true;
983                        }
984                    }
985                    TransactionStatus::Failed | TransactionStatus::Expired => {
986                        metadata.stop_request_id = None;
987                        metadata.stop_tx_hash = None;
988                        changed = true;
989                    }
990                    TransactionStatus::Pending | TransactionStatus::Submitted => {}
991                }
992            }
993            _ => {}
994        }
995
996        if !changed {
997            return Ok(());
998        }
999
1000        let timestamp = update.timestamp;
1001        status.updated_at = timestamp;
1002        self.cache.set_status(status.clone()).await;
1003        self.emit_status_update(&previous_state, &status, timestamp);
1004        Ok(())
1005    }
1006
1007    async fn find_status_by_request_id(
1008        &self,
1009        request_id: &str,
1010    ) -> Result<Option<(WalletAddress, AccountLiquidationStatus)>> {
1011        for wallet in self.cache.get_all_wallets().await {
1012            let Some(status) = self.cache.get_status(&wallet).await else {
1013                continue;
1014            };
1015            match &status.state {
1016                LiquidationState::PreLiquidation(metadata)
1017                    if metadata.pending_full_request_id.as_deref() == Some(request_id) =>
1018                {
1019                    return Ok(Some((wallet, status)));
1020                }
1021                LiquidationState::InLiquidation(metadata)
1022                    if metadata.request_id.as_deref() == Some(request_id)
1023                        || metadata.stop_request_id.as_deref() == Some(request_id) =>
1024                {
1025                    return Ok(Some((wallet, status)));
1026                }
1027                _ => {}
1028            }
1029        }
1030        Ok(None)
1031    }
1032
1033    fn emit_status_update(
1034        &self,
1035        previous_state: &LiquidationState,
1036        status: &AccountLiquidationStatus,
1037        timestamp: u64,
1038    ) {
1039        self.emit_status_update_with_auction(
1040            previous_state,
1041            status,
1042            timestamp,
1043            status
1044                .state
1045                .auction_id()
1046                .map(ToOwned::to_owned)
1047                .or_else(|| previous_state.auction_id().map(ToOwned::to_owned)),
1048        );
1049    }
1050
1051    fn emit_status_update_with_auction(
1052        &self,
1053        previous_state: &LiquidationState,
1054        status: &AccountLiquidationStatus,
1055        timestamp: u64,
1056        auction_id: Option<String>,
1057    ) {
1058        let Some(sender) = &self.event_sender else {
1059            return;
1060        };
1061
1062        let msg = LiquidationStateMessage {
1063            wallet: status.wallet,
1064            previous_state: liquidation_state_to_type(previous_state),
1065            new_state: liquidation_state_to_type(&status.state),
1066            previous_liquidation_mode: previous_state
1067                .liquidation_mode()
1068                .map(|mode| mode.as_str().to_string()),
1069            liquidation_mode: status
1070                .liquidation_mode()
1071                .map(|mode| mode.as_str().to_string()),
1072            margin_mode: status.margin_mode.as_str().to_string(),
1073            equity: status.equity,
1074            mm_required: status.mm_required,
1075            maintenance_margin: status.maintenance_margin,
1076            shortfall: status.shortfall(),
1077            previous_auction_id: previous_state.auction_id().map(str::to_string),
1078            projection_changed: has_material_projection_change(previous_state, &status.state),
1079            auction_id,
1080            status: status.clone(),
1081            timestamp,
1082        };
1083
1084        if let Err(error) = sender.send(EngineMessage::LiquidationStateChange(msg)) {
1085            warn!(
1086                "Failed to emit liquidation observer status update: {}",
1087                error
1088            );
1089        }
1090    }
1091}
1092
1093fn liquidation_state_to_type(state: &LiquidationState) -> LiquidationStateType {
1094    match state {
1095        LiquidationState::Healthy => LiquidationStateType::Healthy,
1096        LiquidationState::PreLiquidation(..) => LiquidationStateType::PreLiquidation,
1097        LiquidationState::InLiquidation(..) => LiquidationStateType::InLiquidation,
1098        LiquidationState::Liquidated(..) => LiquidationStateType::Liquidated,
1099    }
1100}
1101
1102fn should_sync_pending_chain_state(state: &LiquidationState) -> bool {
1103    matches!(
1104        state,
1105        LiquidationState::PreLiquidation(..) | LiquidationState::InLiquidation(..)
1106    )
1107}
1108
1109fn next_reconciliation_block(
1110    last_observed_block: Option<i64>,
1111    latest_block: u64,
1112    max_lag_blocks: u64,
1113) -> Result<u64> {
1114    match last_observed_block {
1115        Some(block) => {
1116            let observed = u64::try_from(block).map_err(|_| {
1117                anyhow!(
1118                    "negative liquidation last_observed_block {} persisted in database",
1119                    block
1120                )
1121            })?;
1122            observed
1123                .checked_add(1)
1124                .ok_or_else(|| anyhow!("liquidation last_observed_block overflow: {}", observed))
1125        }
1126        None => Ok(latest_block.saturating_sub(max_lag_blocks.saturating_sub(1))),
1127    }
1128}
1129
1130fn log_sort_key(log: &Log) -> (u64, u64, u64) {
1131    (
1132        log.block_number.unwrap_or_default(),
1133        log.transaction_index.unwrap_or_default(),
1134        log.log_index.unwrap_or_default(),
1135    )
1136}
1137
1138fn decimal_from_u256_units(value: U256) -> Result<Decimal> {
1139    let raw = Decimal::from_str(&value.to_string())
1140        .map_err(|error| anyhow!("failed to parse U256 {} as Decimal: {}", value, error))?;
1141    Ok(raw / CONTRACT_UNIT_MULTIPLIER_DECIMAL)
1142}
1143
1144fn u256_to_u64(value: U256) -> Result<Option<u64>> {
1145    if value == U256::ZERO {
1146        return Ok(None);
1147    }
1148    let raw = value.to_string();
1149    Ok(Some(raw.parse::<u64>().map_err(|error| {
1150        anyhow!("failed to convert U256 {} to u64: {}", value, error)
1151    })?))
1152}
1153
1154fn tracked_auction_id_for_replay(
1155    state: &LiquidationState,
1156    wallet: WalletAddress,
1157    event_name: &str,
1158) -> Option<String> {
1159    match state.auction_id() {
1160        Some(auction_id) => Some(auction_id.to_owned()),
1161        None => {
1162            warn!(
1163                wallet = %wallet,
1164                state = ?state,
1165                "Ignoring stale {} event without tracked auction",
1166                event_name
1167            );
1168            None
1169        }
1170    }
1171}
1172
1173fn replay_started_auction_id(
1174    wallet: WalletAddress,
1175    persisted_auction_id: Option<&str>,
1176    chain_start_time: Option<u64>,
1177    tx_hash: Option<&str>,
1178) -> String {
1179    if let Some(auction_id) = persisted_auction_id {
1180        return auction_id.to_owned();
1181    }
1182
1183    let derived = match (chain_start_time, tx_hash) {
1184        (Some(start_time), _) => format!("chain:{}:{}", wallet, start_time),
1185        (None, Some(tx_hash)) => format!("tx:{}:{}", wallet, tx_hash),
1186        (None, None) => format!("wallet:{}", wallet),
1187    };
1188    warn!(
1189        wallet = %wallet,
1190        auction_id = %derived,
1191        "Recovering liquidation replay without persisted pending_full_auction_id"
1192    );
1193    derived
1194}
1195
1196#[async_trait::async_trait]
1197impl crate::shared::service::Service for LiquidationChainObserver {
1198    fn name(&self) -> &'static str {
1199        "LiquidationChainObserver"
1200    }
1201
1202    fn owner(&self) -> crate::shared::service::ServiceOwner {
1203        crate::shared::service::ServiceOwner::Engine
1204    }
1205
1206    async fn run(
1207        self: std::sync::Arc<Self>,
1208        shutdown: crate::shared::ShutdownRx,
1209    ) -> anyhow::Result<()> {
1210        self.run_with_shutdown(shutdown).await
1211    }
1212}
1213
1214fn liquidation_bonus_request_id(ledger_event_id: u64) -> String {
1215    const LIQUIDATION_BONUS_UUID_PREFIX: u128 = 0x4c49_5142_4f4e_5500_0000_0000_0000_0000;
1216    uuid::Uuid::from_u128(LIQUIDATION_BONUS_UUID_PREFIX | u128::from(ledger_event_id)).to_string()
1217}
1218
/// Unit tests for the pure helpers of the liquidation chain observer.
///
/// Besides the happy paths, these cover the error and fallback branches: negative or
/// missing persisted blocks, `u64` overflow when narrowing `U256`, and the derived
/// auction ids used when no persisted id is available.
#[cfg(test)]
mod tests {
    use super::{
        decimal_from_u256_units, liquidation_bonus_request_id, next_reconciliation_block,
        replay_started_auction_id, should_sync_pending_chain_state, tracked_auction_id_for_replay,
        u256_to_u64,
    };
    use crate::liquidator::state::{
        FullLiquidationMetadata, LiquidationState, PartialLiquidationMetadata,
    };
    use alloy::primitives::U256;
    use hypercall_types::wallet_address::test_wallet;
    use rust_decimal_macros::dec;

    #[test]
    fn liquidation_bonus_request_id_is_durable_sequence_stable_uuid() {
        let first = liquidation_bonus_request_id(42);
        let second = liquidation_bonus_request_id(42);
        let different = liquidation_bonus_request_id(43);

        assert_eq!(first, second);
        assert_ne!(first, different);
        uuid::Uuid::parse_str(&first).expect("liquidation bonus request id must be a UUID");
    }

    #[test]
    fn liquidation_bonus_request_id_embeds_prefix_and_sequence() {
        // High 64 bits are the fixed prefix, low 64 bits are the ledger event id (42 = 0x2a).
        assert_eq!(
            liquidation_bonus_request_id(42),
            "4c495142-4f4e-5500-0000-00000000002a"
        );
    }

    #[test]
    fn test_decimal_from_u256_units() {
        assert_eq!(
            decimal_from_u256_units(U256::from(12_345_678u64)).unwrap(),
            dec!(12.345678)
        );
    }

    #[test]
    fn test_decimal_from_u256_units_zero() {
        assert!(decimal_from_u256_units(U256::ZERO).unwrap().is_zero());
    }

    #[test]
    fn test_u256_to_u64() {
        assert_eq!(u256_to_u64(U256::ZERO).unwrap(), None);
        assert_eq!(u256_to_u64(U256::from(42u64)).unwrap(), Some(42));
    }

    #[test]
    fn test_u256_to_u64_boundaries() {
        assert_eq!(
            u256_to_u64(U256::from(u64::MAX)).unwrap(),
            Some(u64::MAX)
        );
        assert!(u256_to_u64(U256::MAX).is_err());
    }

    #[test]
    fn test_tracked_auction_id_for_replay_skips_healthy_state() {
        assert_eq!(
            tracked_auction_id_for_replay(
                &LiquidationState::Healthy,
                test_wallet(1),
                "LiquidationStopped"
            ),
            None
        );
    }

    #[test]
    fn test_replay_started_auction_id_preserves_cached_value() {
        assert_eq!(
            replay_started_auction_id(test_wallet(1), Some("auction-123"), Some(42), Some("0xabc")),
            "auction-123"
        );
    }

    #[test]
    fn test_replay_started_auction_id_derives_from_chain_start_time() {
        let wallet = test_wallet(7);
        assert_eq!(
            replay_started_auction_id(wallet, None, Some(42), Some("0xabc")),
            format!("chain:{}:42", wallet)
        );
    }

    #[test]
    fn test_replay_started_auction_id_falls_back_to_tx_hash_then_wallet() {
        let wallet = test_wallet(7);
        assert_eq!(
            replay_started_auction_id(wallet, None, None, Some("0xabc")),
            format!("tx:{}:0xabc", wallet)
        );
        assert_eq!(
            replay_started_auction_id(wallet, None, None, None),
            format!("wallet:{}", wallet)
        );
    }

    #[test]
    fn test_should_sync_pending_chain_state_for_pre_liquidation_without_pending_request() {
        assert!(should_sync_pending_chain_state(
            &LiquidationState::PreLiquidation(PartialLiquidationMetadata::default())
        ));
    }

    #[test]
    fn test_should_sync_pending_chain_state_for_in_liquidation_with_known_chain_start_time() {
        assert!(should_sync_pending_chain_state(
            &LiquidationState::InLiquidation(FullLiquidationMetadata {
                auction_id: "auction-1".to_string(),
                chain_start_time: Some(55),
                ..FullLiquidationMetadata::default()
            })
        ));
    }

    #[test]
    fn test_should_sync_pending_chain_state_skips_healthy_state() {
        assert!(!should_sync_pending_chain_state(&LiquidationState::Healthy));
    }

    #[test]
    fn test_next_reconciliation_block_advances_last_observed_block() {
        assert_eq!(next_reconciliation_block(Some(41), 99, 20).unwrap(), 42);
        assert_eq!(next_reconciliation_block(None, 99, 20).unwrap(), 80);
    }

    #[test]
    fn test_next_reconciliation_block_rejects_negative_persisted_block() {
        assert!(next_reconciliation_block(Some(-1), 99, 20).is_err());
    }

    #[test]
    fn test_next_reconciliation_block_clamps_lookback() {
        // Lookback larger than the chain height saturates at block 0.
        assert_eq!(next_reconciliation_block(None, 5, 20).unwrap(), 0);
        // A lag of 0 or 1 both mean "start at the latest block".
        assert_eq!(next_reconciliation_block(None, 99, 0).unwrap(), 99);
        assert_eq!(next_reconciliation_block(None, 99, 1).unwrap(), 99);
    }
}