Skip to main content

hypercall/
rsm_deposit_credit_observer.rs

1use std::str::FromStr;
2use std::sync::Arc;
3use std::time::Duration;
4
5use alloy::{
6    primitives::{Address, B256, U256},
7    providers::{DynProvider, Provider, ProviderBuilder},
8    rpc::types::{BlockNumberOrTag, Filter, Log},
9    sol,
10    sol_types::SolEvent,
11};
12use anyhow::{anyhow, Context, Result};
13use async_trait::async_trait;
14use hypercall_engine::command::{EngineCommand, RecordPmVaultDepositCommand};
15use hypercall_types::WalletAddress;
16use metrics::{counter, gauge};
17use rust_decimal::Decimal;
18use rust_decimal_macros::dec;
19use sha3::{Digest, Keccak256};
20use tokio::sync::{broadcast, mpsc, oneshot};
21use tracing::{info, warn};
22use uuid::Uuid;
23
24use crate::rsm::unified_engine::OptionDepositRequest;
25use crate::shared::order_types::get_timestamp_millis;
26use crate::shared::service::{Service, ServiceOwner};
27use hypercall_db::{
28    OptionInstrumentForCredit, RsmDepositCreditClaimInput, RsmDepositCreditClaimRecord,
29};
30use hypercall_db_diesel::DatabaseHandler;
31
32const CREDIT_KIND_OPTION: &str = "option";
33const CREDIT_KIND_USDC: &str = "usdc";
34const CREDIT_KIND_PM_LIQUIDITY: &str = "pm_liquidity";
35const RPC_TIMEOUT: Duration = Duration::from_secs(30);
36
37sol! {
38    #[sol(rpc)]
39    contract Exchange {
40        event Deposit(address indexed account, address indexed from, address indexed token, uint256 amount);
41        event UsdcDeposit(address indexed account, address indexed from, address indexed token, uint256 amount, uint32 dstDex);
42        event PmLiquidityDeposit(address indexed lp, address indexed from, address indexed token, string underlying, uint256 amount, uint32 dstDex);
43        function managers(address account) external view returns (address);
44    }
45}
46
47#[derive(Clone, Debug)]
48pub struct RsmDepositCreditObserverConfig {
49    pub poll_interval: Duration,
50    pub startup_lookback_blocks: u64,
51    pub max_blocks_per_poll: u64,
52    pub max_blocks_per_get_logs: u64,
53    pub confirmation_blocks: u64,
54    pub pm_liquidity_settlement_grace_ms: i64,
55}
56
57impl RsmDepositCreditObserverConfig {
58    pub fn from_runtime_config(
59        config: &crate::backend_config::OnchainDepositsRuntimeConfig,
60    ) -> Self {
61        Self {
62            poll_interval: Duration::from_millis(config.poll_interval_ms),
63            startup_lookback_blocks: config.startup_lookback_blocks,
64            max_blocks_per_poll: config.max_blocks_per_poll,
65            max_blocks_per_get_logs: config.max_blocks_per_get_logs,
66            confirmation_blocks: config.confirmation_blocks,
67            pm_liquidity_settlement_grace_ms: config.pm_liquidity_settlement_grace_ms,
68        }
69    }
70}
71
72#[async_trait]
73pub trait DepositCreditStore: Send + Sync + 'static {
74    async fn max_observed_block(&self) -> Result<Option<u64>>;
75    async fn register_observed_account(&self, account: &WalletAddress) -> Result<()>;
76    async fn claim_deposit_credit(
77        &self,
78        input: &RsmDepositCreditClaimInput,
79    ) -> Result<RsmDepositCreditClaimRecord>;
80    async fn mark_submitted(&self, request_id: &str) -> Result<()>;
81    async fn mark_failed(&self, request_id: &str, error: &str) -> Result<()>;
82    async fn option_instrument_for_credit(
83        &self,
84        token: &WalletAddress,
85    ) -> Result<Option<OptionInstrumentForCredit>>;
86    async fn max_active_option_expiry_ms(&self, underlying: &str) -> Result<Option<i64>>;
87}
88
89#[async_trait]
90impl DepositCreditStore for DatabaseHandler {
91    async fn max_observed_block(&self) -> Result<Option<u64>> {
92        self.get_max_rsm_deposit_credit_observed_block().await
93    }
94
95    async fn register_observed_account(&self, account: &WalletAddress) -> Result<()> {
96        self.ensure_observed_deposit_account(account).await
97    }
98
99    async fn claim_deposit_credit(
100        &self,
101        input: &RsmDepositCreditClaimInput,
102    ) -> Result<RsmDepositCreditClaimRecord> {
103        self.claim_rsm_deposit_credit(input).await
104    }
105
106    async fn mark_submitted(&self, request_id: &str) -> Result<()> {
107        self.mark_rsm_deposit_credit_submitted(request_id).await
108    }
109
110    async fn mark_failed(&self, request_id: &str, error: &str) -> Result<()> {
111        self.mark_rsm_deposit_credit_failed(request_id, error).await
112    }
113
114    async fn option_instrument_for_credit(
115        &self,
116        token: &WalletAddress,
117    ) -> Result<Option<OptionInstrumentForCredit>> {
118        self.get_option_instrument_for_credit(token).await
119    }
120
121    async fn max_active_option_expiry_ms(&self, underlying: &str) -> Result<Option<i64>> {
122        use hypercall_db::InstrumentReader;
123        use hypercall_types::api_models::InstrumentStatus;
124
125        let instruments =
126            self.get_instruments_by_status_sync(InstrumentStatus::Active.as_db_str())?;
127        instruments
128            .into_iter()
129            .filter(|instrument| instrument.underlying == underlying)
130            .map(|instrument| {
131                let expiry = u64::try_from(instrument.expiry).with_context(|| {
132                    format!(
133                        "Invalid negative expiry {} for active instrument {}",
134                        instrument.expiry, instrument.id
135                    )
136                })?;
137                let expiry_ts = hypercall_types::expiry_date_to_timestamp_checked(
138                    &instrument.underlying,
139                    expiry,
140                )
141                .map_err(|error| {
142                    anyhow!(
143                        "Invalid expiry {} for active instrument {}: {}",
144                        instrument.expiry,
145                        instrument.id,
146                        error
147                    )
148                })?;
149                let expiry_ms = i64::try_from(expiry_ts)
150                    .context("active option expiry timestamp exceeds i64")?
151                    .checked_mul(1_000)
152                    .ok_or_else(|| anyhow!("active option expiry timestamp ms overflow"))?;
153                Ok(expiry_ms)
154            })
155            .try_fold(None, |max: Option<i64>, expiry_ms: Result<i64>| {
156                let expiry_ms = expiry_ms?;
157                Ok(Some(
158                    max.map_or(expiry_ms, |current| current.max(expiry_ms)),
159                ))
160            })
161    }
162}
163
164#[async_trait]
165pub trait DepositManagerResolver: Send + Sync + 'static {
166    async fn manager_for_account(&self, account: WalletAddress) -> Result<WalletAddress>;
167}
168
169#[async_trait]
170pub trait DepositCreditApplier: Send + Sync + 'static {
171    async fn apply_option_deposit(&self, request: OptionDepositRequest) -> Result<()>;
172    async fn apply_pm_liquidity_deposit(&self, command: RecordPmVaultDepositCommand) -> Result<()>;
173}
174
175#[async_trait]
176impl DepositCreditApplier for mpsc::Sender<OptionDepositRequest> {
177    async fn apply_option_deposit(&self, mut request: OptionDepositRequest) -> Result<()> {
178        let (tx, rx) = oneshot::channel();
179        request.applied_tx = Some(tx);
180        self.send(request)
181            .await
182            .map_err(|_| anyhow!("option deposit receiver closed"))?;
183        rx.await
184            .map_err(|_| anyhow!("option deposit apply acknowledgement dropped"))?
185            .map_err(|err| anyhow!(err))
186    }
187
188    async fn apply_pm_liquidity_deposit(
189        &self,
190        _command: RecordPmVaultDepositCommand,
191    ) -> Result<()> {
192        anyhow::bail!("PM liquidity deposits require a PM settlement command sender")
193    }
194}
195
196fn record_deposit_observer_block_gauges(latest_block: u64, finalized_block: u64, next_block: u64) {
197    gauge!("ht_rsm_deposit_credit_observer_current_block").set(latest_block as f64);
198    gauge!("ht_rsm_deposit_credit_observer_finalized_block").set(finalized_block as f64);
199    gauge!("ht_rsm_deposit_credit_observer_next_block").set(next_block as f64);
200    gauge!("ht_rsm_deposit_credit_observer_finalized_lag_blocks")
201        .set(finalized_block.saturating_sub(next_block) as f64);
202    gauge!("ht_rsm_deposit_credit_observer_unfinalized_blocks")
203        .set(latest_block.saturating_sub(finalized_block) as f64);
204}
205
206pub struct RsmDepositCreditObserver {
207    store: Arc<dyn DepositCreditStore>,
208    applier: Arc<dyn DepositCreditApplier>,
209    provider: DynProvider,
210    exchange_address: Address,
211    usdc_address: Address,
212    config: RsmDepositCreditObserverConfig,
213}
214
215impl RsmDepositCreditObserver {
216    pub fn new(
217        db: Arc<DatabaseHandler>,
218        option_deposit_sender: mpsc::Sender<OptionDepositRequest>,
219        pm_settlement_sender: mpsc::Sender<hypercall_runtime_api::PmSettlementAdminRequest>,
220        rpc_url: &str,
221        exchange_contract_address: &str,
222        usdc_contract_address: &str,
223        config: RsmDepositCreditObserverConfig,
224    ) -> Result<Self> {
225        let exchange_address = Address::from_str(exchange_contract_address)
226            .context("invalid contracts.exchange_contract_address")?;
227        let usdc_address =
228            Address::from_str(usdc_contract_address).context("invalid contracts.usdc_address")?;
229        let provider = ProviderBuilder::new()
230            .connect_http(
231                rpc_url
232                    .parse()
233                    .context("invalid transaction_submitter.rpc_url")?,
234            )
235            .erased();
236
237        Ok(Self {
238            store: db,
239            applier: Arc::new(DepositCreditChannelApplier {
240                option_deposit_sender,
241                pm_settlement_sender,
242            }),
243            provider,
244            exchange_address,
245            usdc_address,
246            config,
247        })
248    }
249
250    pub async fn run_with_shutdown(&self, mut shutdown_rx: broadcast::Receiver<()>) -> Result<()> {
251        let mut interval = tokio::time::interval(self.config.poll_interval);
252        let mut next_block = self.initial_next_block().await?;
253
254        loop {
255            tokio::select! {
256                _ = shutdown_rx.recv() => {
257                    info!("RSM deposit credit observer received shutdown signal");
258                    break;
259                }
260                _ = interval.tick() => {
261                    if let Err(error) = self.poll_chain(&mut next_block).await {
262                        counter!("ht_rsm_deposit_credit_errors_total", "stage" => "poll")
263                            .increment(1);
264                        warn!("RSM deposit credit observer poll failed: {}", error);
265                    }
266                }
267            }
268        }
269
270        Ok(())
271    }
272
273    async fn initial_next_block(&self) -> Result<u64> {
274        let latest_block =
275            rpc_timeout("get_block_number", self.provider.get_block_number()).await?;
276        let finalized_block = latest_block.saturating_sub(self.config.confirmation_blocks);
277        let next_block = initial_next_block(
278            self.store.max_observed_block().await?,
279            finalized_block,
280            self.config.startup_lookback_blocks,
281        )?;
282
283        info!(
284            next_block,
285            latest_block,
286            finalized_block,
287            confirmation_blocks = self.config.confirmation_blocks,
288            "RSM deposit credit observer initialized"
289        );
290        record_deposit_observer_block_gauges(latest_block, finalized_block, next_block);
291        Ok(next_block)
292    }
293
294    async fn poll_chain(&self, next_block: &mut u64) -> Result<()> {
295        let latest_block =
296            rpc_timeout("get_block_number", self.provider.get_block_number()).await?;
297        let finalized_block = latest_block.saturating_sub(self.config.confirmation_blocks);
298        record_deposit_observer_block_gauges(latest_block, finalized_block, *next_block);
299        if *next_block > finalized_block {
300            gauge!("ht_rsm_deposit_credit_observer_lag_blocks").set(0.0);
301            return Ok(());
302        }
303
304        let Some((from_block, to_block)) = next_poll_range(
305            *next_block,
306            finalized_block,
307            self.config.max_blocks_per_poll,
308        ) else {
309            gauge!("ht_rsm_deposit_credit_observer_lag_blocks").set(0.0);
310            return Ok(());
311        };
312
313        let lag_blocks = latest_block - from_block;
314        gauge!("ht_rsm_deposit_credit_observer_lag_blocks").set(lag_blocks as f64);
315        if to_block < latest_block {
316            warn!(
317                next_block = from_block,
318                to_block,
319                latest_block,
320                finalized_block,
321                lag_blocks,
322                max_blocks_per_poll = self.config.max_blocks_per_poll,
323                "RSM deposit credit observer is catching up"
324            );
325        }
326
327        self.process_logs(from_block, to_block).await?;
328        gauge!("ht_rsm_deposit_credit_observer_last_success_block").set(to_block as f64);
329        *next_block = to_block
330            .checked_add(1)
331            .ok_or_else(|| anyhow!("deposit observer next block overflow"))?;
332        record_deposit_observer_block_gauges(latest_block, finalized_block, *next_block);
333        Ok(())
334    }
335
336    async fn process_logs(&self, from_block: u64, to_block: u64) -> Result<()> {
337        for (chunk_from, chunk_to) in
338            block_chunks_inclusive(from_block, to_block, self.config.max_blocks_per_get_logs)
339        {
340            let mut logs = self
341                .fetch_event_logs(chunk_from, chunk_to, Exchange::Deposit::SIGNATURE_HASH)
342                .await?;
343            logs.extend(
344                self.fetch_event_logs(chunk_from, chunk_to, Exchange::UsdcDeposit::SIGNATURE_HASH)
345                    .await?,
346            );
347            logs.extend(
348                self.fetch_event_logs(
349                    chunk_from,
350                    chunk_to,
351                    Exchange::PmLiquidityDeposit::SIGNATURE_HASH,
352                )
353                .await?,
354            );
355            counter!("ht_rsm_deposit_credit_get_logs_total").increment(1);
356            logs.sort_by_key(log_sort_key);
357
358            for log in logs {
359                self.handle_log(log).await?;
360            }
361        }
362        Ok(())
363    }
364
365    async fn fetch_event_logs(
366        &self,
367        from_block: u64,
368        to_block: u64,
369        signature: B256,
370    ) -> Result<Vec<Log>> {
371        rpc_timeout(
372            "get_logs",
373            self.provider.get_logs(
374                &Filter::new()
375                    .address(self.exchange_address)
376                    .from_block(BlockNumberOrTag::Number(from_block))
377                    .to_block(BlockNumberOrTag::Number(to_block))
378                    .event_signature(signature),
379            ),
380        )
381        .await
382        .with_context(|| {
383            format!(
384                "failed to fetch Exchange deposit logs signature={} from {} to {}",
385                signature, from_block, to_block
386            )
387        })
388    }
389
390    async fn handle_log(&self, log: Log) -> Result<()> {
391        let signature = log
392            .topic0()
393            .copied()
394            .ok_or_else(|| anyhow!("Exchange deposit log missing topic0"))?;
395        if signature == Exchange::UsdcDeposit::SIGNATURE_HASH {
396            return self.handle_usdc_deposit_log(log).await;
397        }
398        if signature == Exchange::PmLiquidityDeposit::SIGNATURE_HASH {
399            return self.handle_pm_liquidity_deposit_log(log).await;
400        }
401
402        let decoded = log
403            .log_decode::<Exchange::Deposit>()
404            .inspect_err(|_| {
405                counter!("ht_rsm_deposit_credit_errors_total", "stage" => "decode").increment(1);
406            })
407            .context("failed to decode Exchange.Deposit log")?;
408        let event = decoded.data();
409        let tx_hash = log
410            .transaction_hash
411            .ok_or_else(|| anyhow!("Exchange.Deposit missing transaction hash"))?
412            .to_string();
413        let log_index = i64::try_from(
414            log.log_index
415                .ok_or_else(|| anyhow!("Exchange.Deposit missing log index"))?,
416        )?;
417        let observed_block = i64::try_from(
418            log.block_number
419                .ok_or_else(|| anyhow!("Exchange.Deposit missing block number"))?,
420        )?;
421        let account = WalletAddress::from(event.account);
422        let token = WalletAddress::from(event.token);
423
424        handle_observed_deposit(
425            self.store.as_ref(),
426            self.applier.as_ref(),
427            self,
428            ObservedDeposit {
429                tx_hash,
430                log_index,
431                observed_block,
432                account,
433                token,
434                amount: event.amount,
435            },
436        )
437        .await
438    }
439
440    async fn handle_usdc_deposit_log(&self, log: Log) -> Result<()> {
441        let decoded = log
442            .log_decode::<Exchange::UsdcDeposit>()
443            .inspect_err(|_| {
444                counter!("ht_rsm_deposit_credit_errors_total", "stage" => "decode").increment(1);
445            })
446            .context("failed to decode Exchange.UsdcDeposit log")?;
447        let event = decoded.data();
448        if event.dstDex != 0 {
449            counter!("ht_rsm_deposit_credit_errors_total", "stage" => "unsupported_dst_dex")
450                .increment(1);
451            tracing::warn!(
452                dst_dex = event.dstDex,
453                tx_hash = ?log.transaction_hash,
454                "Exchange.UsdcDeposit has unsupported dstDex {} for tx {:?}",
455                event.dstDex,
456                log.transaction_hash
457            );
458            return Ok(());
459        }
460
461        let tx_hash = log
462            .transaction_hash
463            .ok_or_else(|| anyhow!("Exchange.UsdcDeposit missing transaction hash"))?
464            .to_string();
465        let log_index = i64::try_from(
466            log.log_index
467                .ok_or_else(|| anyhow!("Exchange.UsdcDeposit missing log index"))?,
468        )?;
469        let observed_block = i64::try_from(
470            log.block_number
471                .ok_or_else(|| anyhow!("Exchange.UsdcDeposit missing block number"))?,
472        )?;
473        let claim_input = RsmDepositCreditClaimInput {
474            tx_hash,
475            log_index,
476            observed_block,
477            account: WalletAddress::from(event.account),
478            token: WalletAddress::from(event.token),
479            amount_wei: event.amount.to_string(),
480            credit_kind: CREDIT_KIND_USDC.to_string(),
481            request_id: Uuid::now_v7().to_string(),
482        };
483        let claim = self
484            .store
485            .claim_deposit_credit(&claim_input)
486            .await
487            .inspect_err(|_| {
488                counter!("ht_rsm_deposit_credit_errors_total", "stage" => "claim").increment(1);
489            })?;
490        validate_claim(&claim_input, &claim);
491        info!(
492            account = %claim.account,
493            from = %WalletAddress::from(event.from),
494            token = %claim.token,
495            amount_wei = %claim.amount_wei,
496            request_id = %claim.request_id,
497            tx_hash = %claim.tx_hash,
498            log_index = claim.log_index,
499            "Observed Exchange.UsdcDeposit pending HyperCore correlation"
500        );
501        Ok(())
502    }
503
504    async fn handle_pm_liquidity_deposit_log(&self, log: Log) -> Result<()> {
505        let decoded = log
506            .log_decode::<Exchange::PmLiquidityDeposit>()
507            .inspect_err(|_| {
508                counter!("ht_rsm_deposit_credit_errors_total", "stage" => "decode").increment(1);
509            })
510            .context("failed to decode Exchange.PmLiquidityDeposit log")?;
511        let event = decoded.data();
512        if event.dstDex != 0 {
513            counter!("ht_rsm_deposit_credit_errors_total", "stage" => "unsupported_dst_dex")
514                .increment(1);
515            warn!(
516                dst_dex = event.dstDex,
517                tx_hash = ?log.transaction_hash,
518                "Exchange.PmLiquidityDeposit has unsupported dstDex"
519            );
520            return Ok(());
521        }
522        if event.token != self.usdc_address {
523            anyhow::bail!(
524                "Exchange.PmLiquidityDeposit token {} did not match configured USDC {}",
525                event.token,
526                self.usdc_address
527            );
528        }
529
530        let tx_hash = log
531            .transaction_hash
532            .ok_or_else(|| anyhow!("Exchange.PmLiquidityDeposit missing transaction hash"))?
533            .to_string();
534        let log_index_u64 = log
535            .log_index
536            .ok_or_else(|| anyhow!("Exchange.PmLiquidityDeposit missing log index"))?;
537        let log_index = i64::try_from(log_index_u64)?;
538        let observed_block = i64::try_from(
539            log.block_number
540                .ok_or_else(|| anyhow!("Exchange.PmLiquidityDeposit missing block number"))?,
541        )?;
542        let chain_id = rpc_timeout("get_chain_id", self.provider.get_chain_id()).await?;
543        handle_observed_pm_liquidity_deposit(
544            self.store.as_ref(),
545            self.applier.as_ref(),
546            ObservedPmLiquidityDeposit {
547                tx_hash,
548                log_index,
549                observed_block,
550                lp: WalletAddress::from(event.lp),
551                from: WalletAddress::from(event.from),
552                token: WalletAddress::from(event.token),
553                underlying: event.underlying.clone(),
554                amount: event.amount,
555                chain_id,
556                exchange_address: self.exchange_address.to_string(),
557                log_index_u64,
558                settlement_grace_ms: self.config.pm_liquidity_settlement_grace_ms,
559            },
560        )
561        .await
562    }
563}
564
565async fn handle_observed_pm_liquidity_deposit(
566    store: &dyn DepositCreditStore,
567    applier: &dyn DepositCreditApplier,
568    deposit: ObservedPmLiquidityDeposit,
569) -> Result<()> {
570    let request_id = Uuid::now_v7();
571    let claim_input = RsmDepositCreditClaimInput {
572        tx_hash: deposit.tx_hash.clone(),
573        log_index: deposit.log_index,
574        observed_block: deposit.observed_block,
575        account: deposit.lp,
576        token: deposit.token,
577        amount_wei: deposit.amount.to_string(),
578        credit_kind: CREDIT_KIND_PM_LIQUIDITY.to_string(),
579        request_id: request_id.to_string(),
580    };
581    let claim = store
582        .claim_deposit_credit(&claim_input)
583        .await
584        .inspect_err(|_| {
585            counter!("ht_rsm_deposit_credit_errors_total", "stage" => "claim").increment(1);
586        })?;
587    validate_claim(&claim_input, &claim);
588    if claim.status == "submitted" {
589        return Ok(());
590    }
591
592    let max_listed_expiry_ts_ms = match store
593        .max_active_option_expiry_ms(&deposit.underlying)
594        .await?
595    {
596        Some(expiry) => expiry,
597        None => {
598            let error = format!(
599                "PM liquidity deposit has no active listed options for underlying {}",
600                deposit.underlying
601            );
602            store.mark_failed(&claim.request_id, &error).await?;
603            warn!(
604                request_id = %claim.request_id,
605                tx_hash = %claim.tx_hash,
606                log_index = claim.log_index,
607                underlying = %deposit.underlying,
608                "Quarantined invalid PM liquidity deposit"
609            );
610            return Ok(());
611        }
612    };
613    let amount_usdc = usdc_token_amount_to_decimal(deposit.amount)?;
614    let claim_request_id = Uuid::parse_str(&claim.request_id)
615        .context("rsm_deposit_credits PM liquidity request_id must be a UUID")?;
616    let input_digest = pm_liquidity_input_digest(
617        deposit.chain_id,
618        &deposit.exchange_address,
619        &deposit.tx_hash,
620        deposit.log_index_u64,
621        &claim.account.to_string(),
622        &deposit.underlying,
623        &claim.amount_wei,
624        max_listed_expiry_ts_ms,
625        deposit.settlement_grace_ms,
626    );
627    applier
628        .apply_pm_liquidity_deposit(RecordPmVaultDepositCommand {
629            request_id: claim_request_id,
630            input_digest,
631            depositor: claim.account,
632            underlying: deposit.underlying.clone(),
633            amount_usdc,
634            chain_id: deposit.chain_id,
635            source_contract_address: WalletAddress::from_str(&deposit.exchange_address)
636                .context("PM liquidity source contract address must be a valid EVM address")?,
637            tx_hash: claim.tx_hash.clone(),
638            log_index: u32::try_from(deposit.log_index_u64)
639                .context("PM liquidity deposit log index exceeds u32")?,
640            max_listed_expiry_ts_ms,
641            settlement_grace_ms: deposit.settlement_grace_ms,
642            timestamp_ms: get_timestamp_millis(),
643        })
644        .await
645        .inspect_err(|_| {
646            counter!("ht_rsm_deposit_credit_errors_total", "stage" => "pm_engine_apply")
647                .increment(1);
648        })?;
649    store
650        .mark_submitted(&claim.request_id)
651        .await
652        .inspect_err(|_| {
653            counter!("ht_rsm_deposit_credit_errors_total", "stage" => "mark_submitted")
654                .increment(1);
655        })?;
656    counter!("ht_pm_liquidity_deposits_submitted_total").increment(1);
657    info!(
658        lp = %claim.account,
659        from = %deposit.from,
660        underlying = %deposit.underlying,
661        amount_usdc = %amount_usdc,
662        request_id = %claim.request_id,
663        tx_hash = %claim.tx_hash,
664        log_index = claim.log_index,
665        max_listed_expiry_ts_ms,
666        "Applied PM liquidity deposit for Exchange.PmLiquidityDeposit"
667    );
668    Ok(())
669}
670
671#[derive(Debug, Clone)]
672struct ObservedPmLiquidityDeposit {
673    tx_hash: String,
674    log_index: i64,
675    observed_block: i64,
676    lp: WalletAddress,
677    from: WalletAddress,
678    token: WalletAddress,
679    underlying: String,
680    amount: U256,
681    chain_id: u64,
682    exchange_address: String,
683    log_index_u64: u64,
684    settlement_grace_ms: i64,
685}
686
687struct DepositCreditChannelApplier {
688    option_deposit_sender: mpsc::Sender<OptionDepositRequest>,
689    pm_settlement_sender: mpsc::Sender<hypercall_runtime_api::PmSettlementAdminRequest>,
690}
691
692#[async_trait]
693impl DepositCreditApplier for DepositCreditChannelApplier {
694    async fn apply_option_deposit(&self, request: OptionDepositRequest) -> Result<()> {
695        self.option_deposit_sender
696            .apply_option_deposit(request)
697            .await
698    }
699
700    async fn apply_pm_liquidity_deposit(&self, command: RecordPmVaultDepositCommand) -> Result<()> {
701        let (tx, rx) = oneshot::channel();
702        self.pm_settlement_sender
703            .send(hypercall_runtime_api::PmSettlementAdminRequest {
704                command: EngineCommand::RecordPmVaultDeposit(command),
705                applied_tx: tx,
706            })
707            .await
708            .map_err(|_| anyhow!("PM settlement command receiver closed"))?;
709        rx.await
710            .map_err(|_| anyhow!("PM settlement command acknowledgement dropped"))?
711            .map_err(|err| anyhow!(err))
712    }
713}
714
715#[async_trait]
716impl DepositManagerResolver for RsmDepositCreditObserver {
717    async fn manager_for_account(&self, account: WalletAddress) -> Result<WalletAddress> {
718        let exchange = Exchange::new(self.exchange_address, &self.provider);
719        let manager = rpc_timeout("Exchange.managers", async {
720            exchange.managers(account.inner()).call().await
721        })
722        .await
723        .with_context(|| format!("failed to resolve Exchange manager for {account}"))?;
724        if manager == Address::ZERO {
725            counter!("ht_rsm_deposit_credit_errors_total", "stage" => "manager_lookup")
726                .increment(1);
727            anyhow::bail!("Exchange.managers({account}) returned zero address");
728        }
729        Ok(WalletAddress::from(manager))
730    }
731}
732
733async fn handle_observed_deposit(
734    store: &dyn DepositCreditStore,
735    applier: &dyn DepositCreditApplier,
736    manager_resolver: &dyn DepositManagerResolver,
737    deposit: ObservedDeposit,
738) -> Result<()> {
739    let manager = manager_resolver
740        .manager_for_account(deposit.account)
741        .await?;
742    store
743        .register_observed_account(&manager)
744        .await
745        .inspect_err(|_| {
746            counter!("ht_rsm_deposit_credit_errors_total", "stage" => "register_account")
747                .increment(1);
748        })?;
749
750    let amount_wei = deposit.amount.to_string();
751    let claim_input = RsmDepositCreditClaimInput {
752        tx_hash: deposit.tx_hash,
753        log_index: deposit.log_index,
754        observed_block: deposit.observed_block,
755        account: deposit.account,
756        token: deposit.token,
757        amount_wei,
758        credit_kind: CREDIT_KIND_OPTION.to_string(),
759        request_id: Uuid::now_v7().to_string(),
760    };
761    let claim = store
762        .claim_deposit_credit(&claim_input)
763        .await
764        .inspect_err(|_| {
765            counter!("ht_rsm_deposit_credit_errors_total", "stage" => "claim").increment(1);
766        })?;
767    validate_claim(&claim_input, &claim);
768    if claim.status == "submitted" {
769        return Ok(());
770    }
771
772    let instrument = match store
773        .option_instrument_for_credit(&deposit.token)
774        .await
775        .inspect_err(|_| {
776            counter!("ht_rsm_deposit_credit_errors_total", "stage" => "instrument_lookup")
777                .increment(1);
778        })? {
779        Some(instrument) => instrument,
780        None => {
781            counter!("ht_rsm_deposit_credit_errors_total", "stage" => "unsupported_token")
782                .increment(1);
783            anyhow::bail!(
784                "deposit token has no option instrument mapping: account={}, token={}, tx_hash={}, log_index={}",
785                claim_input.account,
786                claim_input.token,
787                claim_input.tx_hash,
788                claim_input.log_index
789            );
790        }
791    };
792    let quantity = option_token_amount_to_quantity(deposit.amount)?;
793    applier
794        .apply_option_deposit(OptionDepositRequest {
795            request_id: claim.request_id.clone(),
796            wallet: manager,
797            symbol: instrument.symbol.clone(),
798            quantity,
799            timestamp_ms: get_timestamp_millis(),
800            applied_tx: None,
801        })
802        .await
803        .inspect_err(|_| {
804            counter!("ht_rsm_deposit_credit_errors_total", "stage" => "engine_apply").increment(1);
805        })?;
806    store
807        .mark_submitted(&claim.request_id)
808        .await
809        .inspect_err(|_| {
810            counter!("ht_rsm_deposit_credit_errors_total", "stage" => "mark_submitted")
811                .increment(1);
812        })?;
813    counter!("ht_rsm_deposit_credit_submitted_total").increment(1);
814    info!(
815        account = %deposit.account,
816        manager = %manager,
817        token = %deposit.token,
818        symbol = %instrument.symbol,
819        quantity = %quantity,
820        request_id = %claim.request_id,
821        tx_hash = %claim.tx_hash,
822        log_index = claim.log_index,
823        "Applied option deposit credit for Exchange.Deposit"
824    );
825    Ok(())
826}
827
828#[derive(Debug, Clone)]
829struct ObservedDeposit {
830    tx_hash: String,
831    log_index: i64,
832    observed_block: i64,
833    account: WalletAddress,
834    token: WalletAddress,
835    amount: U256,
836}
837
838#[async_trait::async_trait]
839impl Service for RsmDepositCreditObserver {
840    fn name(&self) -> &'static str {
841        "RsmDepositCreditObserver"
842    }
843
844    fn owner(&self) -> ServiceOwner {
845        ServiceOwner::Shared
846    }
847
848    async fn run(self: Arc<Self>, shutdown: crate::shared::shutdown::ShutdownRx) -> Result<()> {
849        self.run_with_shutdown(shutdown).await
850    }
851}
852
853fn option_token_amount_to_quantity(amount: U256) -> Result<Decimal> {
854    let raw = Decimal::from_str(&amount.to_string())
855        .with_context(|| format!("failed to parse option token amount {}", amount))?;
856    let quantity = raw / dec!(1000000);
857    if quantity <= Decimal::ZERO {
858        anyhow::bail!(
859            "option deposit amount {} converts to non-positive quantity",
860            amount
861        );
862    }
863    Ok(quantity)
864}
865
866fn usdc_token_amount_to_decimal(amount: U256) -> Result<Decimal> {
867    let raw = Decimal::from_str(&amount.to_string())
868        .with_context(|| format!("failed to parse USDC token amount {}", amount))?;
869    let amount_usdc = raw / dec!(1000000);
870    if amount_usdc <= Decimal::ZERO {
871        anyhow::bail!(
872            "PM liquidity deposit amount {} converts to non-positive USDC",
873            amount
874        );
875    }
876    Ok(amount_usdc)
877}
878
879fn pm_liquidity_input_digest(
880    chain_id: u64,
881    exchange_address: &str,
882    tx_hash: &str,
883    log_index: u64,
884    lp: &str,
885    underlying: &str,
886    amount_wei: &str,
887    max_listed_expiry_ts_ms: i64,
888    settlement_grace_ms: i64,
889) -> String {
890    let mut hasher = Keccak256::new();
891    for part in [
892        "pm-liquidity-v1",
893        &chain_id.to_string(),
894        exchange_address,
895        tx_hash,
896        &log_index.to_string(),
897        lp,
898        underlying,
899        amount_wei,
900        &max_listed_expiry_ts_ms.to_string(),
901        &settlement_grace_ms.to_string(),
902    ] {
903        hasher.update((part.len() as u64).to_le_bytes());
904        hasher.update(part.as_bytes());
905    }
906    format!("0x{}", hex::encode(hasher.finalize()))
907}
908
909fn validate_claim(input: &RsmDepositCreditClaimInput, claim: &RsmDepositCreditClaimRecord) {
910    if input.tx_hash != claim.tx_hash
911        || input.log_index != claim.log_index
912        || input.account != claim.account
913        || input.token != claim.token
914        || input.amount_wei != claim.amount_wei
915        || input.credit_kind != claim.credit_kind
916    {
917        panic!(
918            "STATE_CORRUPTION: rsm_deposit_credits claim mismatch for tx_hash={} log_index={}",
919            input.tx_hash, input.log_index
920        );
921    }
922}
923
924fn log_sort_key(log: &Log) -> (u64, u64, u64) {
925    (
926        log.block_number.unwrap_or_default(),
927        log.transaction_index.unwrap_or_default(),
928        log.log_index.unwrap_or_default(),
929    )
930}
931
932fn initial_next_block(
933    max_observed_block: Option<u64>,
934    latest_block: u64,
935    startup_lookback_blocks: u64,
936) -> Result<u64> {
937    if startup_lookback_blocks == 0 {
938        return Err(anyhow!("startup_lookback_blocks must be greater than zero"));
939    }
940
941    if let Some(max_observed_block) = max_observed_block {
942        // Resume from the last observed block, not the next block. Blocks can
943        // contain multiple deposit logs, and the claim table's tx_hash/log_index
944        // uniqueness makes replaying the boundary block idempotent.
945        return Ok(max_observed_block);
946    }
947
948    // First boot must not scan from genesis. The bounded lookback catches recent
949    // deposits that landed before this process started without turning startup
950    // into an unbounded historical backfill.
951    Ok(latest_block.saturating_sub(startup_lookback_blocks.saturating_sub(1)))
952}
953
954async fn rpc_timeout<T>(
955    operation: &'static str,
956    future: impl std::future::Future<Output = Result<T, impl std::fmt::Display>>,
957) -> Result<T> {
958    tokio::time::timeout(RPC_TIMEOUT, future)
959        .await
960        .map_err(|_| anyhow!("{} timed out after {:?}", operation, RPC_TIMEOUT))?
961        .map_err(|error| anyhow!("{} failed: {}", operation, error))
962}
963
964fn next_poll_range(
965    next_block: u64,
966    latest_block: u64,
967    max_blocks_per_poll: u64,
968) -> Option<(u64, u64)> {
969    if next_block > latest_block {
970        return None;
971    }
972
973    let to_block = next_block
974        .saturating_add(max_blocks_per_poll.saturating_sub(1))
975        .min(latest_block);
976    Some((next_block, to_block))
977}
978
979fn block_chunks_inclusive(
980    from_block: u64,
981    to_block: u64,
982    max_blocks_per_get_logs: u64,
983) -> Vec<(u64, u64)> {
984    assert!(
985        max_blocks_per_get_logs > 0,
986        "max_blocks_per_get_logs must be greater than zero"
987    );
988    if from_block > to_block {
989        return Vec::new();
990    }
991
992    let mut chunks = Vec::new();
993    let mut chunk_from = from_block;
994    while chunk_from <= to_block {
995        // RPC providers cap eth_getLogs spans. Keep the provider request range
996        // separate from the per-poll catchup range so a tick makes bounded
997        // progress while still respecting tighter upstream limits.
998        let chunk_to = chunk_from
999            .saturating_add(max_blocks_per_get_logs.saturating_sub(1))
1000            .min(to_block);
1001        chunks.push((chunk_from, chunk_to));
1002        if chunk_to == u64::MAX {
1003            break;
1004        }
1005        chunk_from = chunk_to + 1;
1006    }
1007    chunks
1008}
1009
1010#[cfg(test)]
1011mod tests {
1012    use super::*;
1013    use std::sync::Mutex;
1014
1015    use rust_decimal_macros::dec;
1016
1017    struct MockStore {
1018        claim_status: String,
1019        instrument: Option<OptionInstrumentForCredit>,
1020        max_active_option_expiry_ms: Option<i64>,
1021        registered_accounts: Mutex<Vec<WalletAddress>>,
1022        claims: Mutex<Vec<RsmDepositCreditClaimInput>>,
1023        submitted: Mutex<Vec<String>>,
1024        failed: Mutex<Vec<(String, String)>>,
1025    }
1026
1027    impl MockStore {
1028        fn new(instrument: Option<OptionInstrumentForCredit>) -> Self {
1029            Self {
1030                claim_status: "pending".to_string(),
1031                instrument,
1032                max_active_option_expiry_ms: Some(1_800_000_000_000),
1033                registered_accounts: Mutex::new(Vec::new()),
1034                claims: Mutex::new(Vec::new()),
1035                submitted: Mutex::new(Vec::new()),
1036                failed: Mutex::new(Vec::new()),
1037            }
1038        }
1039    }
1040
1041    #[async_trait::async_trait]
1042    impl DepositCreditStore for MockStore {
1043        async fn max_observed_block(&self) -> Result<Option<u64>> {
1044            Ok(None)
1045        }
1046
1047        async fn register_observed_account(&self, account: &WalletAddress) -> Result<()> {
1048            self.registered_accounts.lock().unwrap().push(*account);
1049            Ok(())
1050        }
1051
1052        async fn claim_deposit_credit(
1053            &self,
1054            input: &RsmDepositCreditClaimInput,
1055        ) -> Result<RsmDepositCreditClaimRecord> {
1056            self.claims.lock().unwrap().push(input.clone());
1057            Ok(RsmDepositCreditClaimRecord {
1058                tx_hash: input.tx_hash.clone(),
1059                log_index: input.log_index,
1060                observed_block: input.observed_block,
1061                account: input.account,
1062                token: input.token,
1063                amount_wei: input.amount_wei.clone(),
1064                credit_kind: input.credit_kind.clone(),
1065                request_id: input.request_id.clone(),
1066                status: self.claim_status.clone(),
1067            })
1068        }
1069
1070        async fn mark_submitted(&self, request_id: &str) -> Result<()> {
1071            self.submitted.lock().unwrap().push(request_id.to_string());
1072            Ok(())
1073        }
1074
1075        async fn mark_failed(&self, request_id: &str, error: &str) -> Result<()> {
1076            self.failed
1077                .lock()
1078                .unwrap()
1079                .push((request_id.to_string(), error.to_string()));
1080            Ok(())
1081        }
1082
1083        async fn option_instrument_for_credit(
1084            &self,
1085            _token: &WalletAddress,
1086        ) -> Result<Option<OptionInstrumentForCredit>> {
1087            Ok(self.instrument.clone())
1088        }
1089
1090        async fn max_active_option_expiry_ms(&self, _underlying: &str) -> Result<Option<i64>> {
1091            Ok(self.max_active_option_expiry_ms)
1092        }
1093    }
1094
1095    #[derive(Default)]
1096    struct MockApplier {
1097        applied: Mutex<Vec<OptionDepositRequest>>,
1098        pm_applied: Mutex<Vec<RecordPmVaultDepositCommand>>,
1099    }
1100
1101    struct MockManagerResolver {
1102        manager: WalletAddress,
1103    }
1104
1105    #[async_trait::async_trait]
1106    impl DepositManagerResolver for MockManagerResolver {
1107        async fn manager_for_account(&self, _account: WalletAddress) -> Result<WalletAddress> {
1108            Ok(self.manager)
1109        }
1110    }
1111
1112    #[async_trait::async_trait]
1113    impl DepositCreditApplier for MockApplier {
1114        async fn apply_option_deposit(&self, request: OptionDepositRequest) -> Result<()> {
1115            self.applied.lock().unwrap().push(request);
1116            Ok(())
1117        }
1118
1119        async fn apply_pm_liquidity_deposit(
1120            &self,
1121            command: RecordPmVaultDepositCommand,
1122        ) -> Result<()> {
1123            self.pm_applied.lock().unwrap().push(command);
1124            Ok(())
1125        }
1126    }
1127
1128    fn btc_call_instrument() -> OptionInstrumentForCredit {
1129        OptionInstrumentForCredit {
1130            symbol: "BTC-20260130-100000-C".to_string(),
1131            underlying: "BTC".to_string(),
1132            expiry: 20260130,
1133            strike: dec!(100000),
1134            option_type: "call".to_string(),
1135        }
1136    }
1137
1138    #[test]
1139    fn option_token_amount_to_quantity_uses_token_decimals() {
1140        assert_eq!(
1141            option_token_amount_to_quantity(U256::from(1_500_000)).unwrap(),
1142            dec!(1.5)
1143        );
1144    }
1145
1146    #[test]
1147    #[should_panic(expected = "STATE_CORRUPTION")]
1148    fn validate_claim_panics_on_reused_log_with_different_payload() {
1149        let account = WalletAddress::from(Address::repeat_byte(1));
1150        let token = WalletAddress::from(Address::repeat_byte(2));
1151        let input = RsmDepositCreditClaimInput {
1152            tx_hash: "0xabc".to_string(),
1153            log_index: 1,
1154            observed_block: 10,
1155            account,
1156            token,
1157            amount_wei: "1".to_string(),
1158            credit_kind: "option".to_string(),
1159            request_id: "018f0000-0000-7000-8000-000000000001".to_string(),
1160        };
1161        let claim = RsmDepositCreditClaimRecord {
1162            tx_hash: input.tx_hash.clone(),
1163            log_index: input.log_index,
1164            observed_block: input.observed_block,
1165            account,
1166            token,
1167            amount_wei: "2".to_string(),
1168            credit_kind: input.credit_kind.clone(),
1169            request_id: input.request_id.clone(),
1170            status: "pending".to_string(),
1171        };
1172
1173        validate_claim(&input, &claim);
1174    }
1175
1176    #[test]
1177    fn initial_next_block_replays_watermark_block_after_restart() {
1178        let next = initial_next_block(Some(42), 100, 32).expect("watermark should resume");
1179
1180        assert_eq!(next, 42);
1181    }
1182
1183    #[test]
1184    fn initial_next_block_uses_bounded_startup_lookback() {
1185        let next = initial_next_block(None, 100, 32).expect("lookback should compute");
1186
1187        assert_eq!(next, 69);
1188    }
1189
1190    #[test]
1191    fn next_poll_range_caps_catchup_per_tick() {
1192        assert_eq!(next_poll_range(10, 10_000, 500), Some((10, 509)));
1193        assert_eq!(next_poll_range(10_001, 10_000, 500), None);
1194    }
1195
1196    #[test]
1197    fn block_chunks_inclusive_splits_get_logs_ranges() {
1198        let chunks = block_chunks_inclusive(10, 25, 7);
1199
1200        assert_eq!(chunks, vec![(10, 16), (17, 23), (24, 25)]);
1201    }
1202
1203    #[tokio::test]
1204    async fn handle_observed_deposit_claims_applies_and_marks_submitted() {
1205        let account = WalletAddress::from(Address::repeat_byte(1));
1206        let manager = WalletAddress::from(Address::repeat_byte(3));
1207        let token = WalletAddress::from(Address::repeat_byte(2));
1208        let store = MockStore::new(Some(btc_call_instrument()));
1209        let applier = MockApplier::default();
1210        let manager_resolver = MockManagerResolver { manager };
1211
1212        handle_observed_deposit(
1213            &store,
1214            &applier,
1215            &manager_resolver,
1216            ObservedDeposit {
1217                tx_hash: "0xabc".to_string(),
1218                log_index: 7,
1219                observed_block: 100,
1220                account,
1221                token,
1222                amount: U256::from(1_000_000),
1223            },
1224        )
1225        .await
1226        .expect("deposit should be published");
1227
1228        assert_eq!(
1229            store.registered_accounts.lock().unwrap().as_slice(),
1230            &[manager]
1231        );
1232
1233        let claims = store.claims.lock().unwrap();
1234        assert_eq!(claims.len(), 1);
1235        assert_eq!(claims[0].tx_hash, "0xabc");
1236        assert_eq!(claims[0].log_index, 7);
1237        assert_eq!(claims[0].observed_block, 100);
1238        assert_eq!(claims[0].account, account);
1239        assert_eq!(claims[0].token, token);
1240        assert_eq!(claims[0].amount_wei, "1000000");
1241        assert_eq!(claims[0].credit_kind, CREDIT_KIND_OPTION);
1242
1243        let applied = applier.applied.lock().unwrap();
1244        assert_eq!(applied.len(), 1);
1245        assert_eq!(applied[0].request_id, claims[0].request_id);
1246        assert_eq!(applied[0].wallet, manager);
1247        assert_eq!(applied[0].symbol, "BTC-20260130-100000-C");
1248        assert_eq!(applied[0].quantity, dec!(1));
1249
1250        assert_eq!(
1251            store.submitted.lock().unwrap().as_slice(),
1252            &[claims[0].request_id.clone()]
1253        );
1254    }
1255
1256    #[tokio::test]
1257    async fn handle_observed_pm_liquidity_deposit_claims_applies_and_marks_submitted() {
1258        let lp = WalletAddress::from(Address::repeat_byte(11));
1259        let payer = WalletAddress::from(Address::repeat_byte(12));
1260        let token = WalletAddress::from(Address::repeat_byte(13));
1261        let store = MockStore {
1262            claim_status: "pending".to_string(),
1263            instrument: None,
1264            max_active_option_expiry_ms: Some(1_800_000_000_000),
1265            registered_accounts: Mutex::new(Vec::new()),
1266            claims: Mutex::new(Vec::new()),
1267            submitted: Mutex::new(Vec::new()),
1268            failed: Mutex::new(Vec::new()),
1269        };
1270        let applier = MockApplier::default();
1271
1272        handle_observed_pm_liquidity_deposit(
1273            &store,
1274            &applier,
1275            ObservedPmLiquidityDeposit {
1276                tx_hash: "0xpm".to_string(),
1277                log_index: 9,
1278                observed_block: 100,
1279                lp,
1280                from: payer,
1281                token,
1282                underlying: "BTC".to_string(),
1283                amount: U256::from(50_000_000_000_u64),
1284                chain_id: 999,
1285                exchange_address: "0x1111111111111111111111111111111111111111".to_string(),
1286                log_index_u64: 9,
1287                settlement_grace_ms: 86_400_000,
1288            },
1289        )
1290        .await
1291        .expect("PM liquidity deposit should be journaled");
1292
1293        let claims = store.claims.lock().unwrap();
1294        assert_eq!(claims.len(), 1);
1295        assert_eq!(claims[0].tx_hash, "0xpm");
1296        assert_eq!(claims[0].log_index, 9);
1297        assert_eq!(claims[0].account, lp);
1298        assert_eq!(claims[0].token, token);
1299        assert_eq!(claims[0].amount_wei, "50000000000");
1300        assert_eq!(claims[0].credit_kind, CREDIT_KIND_PM_LIQUIDITY);
1301
1302        let applied = applier.pm_applied.lock().unwrap();
1303        assert_eq!(applied.len(), 1);
1304        assert_eq!(applied[0].request_id.to_string(), claims[0].request_id);
1305        assert_eq!(applied[0].depositor, lp);
1306        assert_eq!(applied[0].underlying, "BTC");
1307        assert_eq!(applied[0].amount_usdc, dec!(50000));
1308        assert_eq!(applied[0].chain_id, 999);
1309        assert_eq!(
1310            applied[0].source_contract_address,
1311            WalletAddress::from_str("0x1111111111111111111111111111111111111111").unwrap()
1312        );
1313        assert_eq!(applied[0].tx_hash, "0xpm");
1314        assert_eq!(applied[0].log_index, 9);
1315        assert_eq!(applied[0].max_listed_expiry_ts_ms, 1_800_000_000_000);
1316        assert_eq!(applied[0].settlement_grace_ms, 86_400_000);
1317        assert!(applied[0].input_digest.starts_with("0x"));
1318
1319        assert_eq!(
1320            store.submitted.lock().unwrap().as_slice(),
1321            &[claims[0].request_id.clone()]
1322        );
1323    }
1324
1325    #[tokio::test]
1326    async fn handle_observed_pm_liquidity_deposit_quarantines_unknown_underlying() {
1327        let lp = WalletAddress::from(Address::repeat_byte(11));
1328        let payer = WalletAddress::from(Address::repeat_byte(12));
1329        let token = WalletAddress::from(Address::repeat_byte(13));
1330        let store = MockStore {
1331            claim_status: "pending".to_string(),
1332            instrument: None,
1333            max_active_option_expiry_ms: None,
1334            registered_accounts: Mutex::new(Vec::new()),
1335            claims: Mutex::new(Vec::new()),
1336            submitted: Mutex::new(Vec::new()),
1337            failed: Mutex::new(Vec::new()),
1338        };
1339        let applier = MockApplier::default();
1340
1341        handle_observed_pm_liquidity_deposit(
1342            &store,
1343            &applier,
1344            ObservedPmLiquidityDeposit {
1345                tx_hash: "0xpm".to_string(),
1346                log_index: 9,
1347                observed_block: 100,
1348                lp,
1349                from: payer,
1350                token,
1351                underlying: "NOT_LISTED".to_string(),
1352                amount: U256::from(50_000_000_000_u64),
1353                chain_id: 999,
1354                exchange_address: "0x1111111111111111111111111111111111111111".to_string(),
1355                log_index_u64: 9,
1356                settlement_grace_ms: 86_400_000,
1357            },
1358        )
1359        .await
1360        .expect("invalid PM liquidity deposit should be quarantined, not retried forever");
1361
1362        let claims = store.claims.lock().unwrap();
1363        assert_eq!(claims.len(), 1);
1364        assert!(applier.pm_applied.lock().unwrap().is_empty());
1365        assert!(store.submitted.lock().unwrap().is_empty());
1366        let failed = store.failed.lock().unwrap();
1367        assert_eq!(failed.len(), 1);
1368        assert_eq!(failed[0].0, claims[0].request_id);
1369        assert!(
1370            failed[0].1.contains("no active listed options"),
1371            "unexpected failure reason: {}",
1372            failed[0].1
1373        );
1374    }
1375
1376    #[tokio::test]
1377    async fn handle_observed_deposit_claims_then_errors_on_unsupported_token() {
1378        let account = WalletAddress::from(Address::repeat_byte(1));
1379        let manager = WalletAddress::from(Address::repeat_byte(3));
1380        let store = MockStore::new(None);
1381        let applier = MockApplier::default();
1382        let manager_resolver = MockManagerResolver { manager };
1383
1384        let result = handle_observed_deposit(
1385            &store,
1386            &applier,
1387            &manager_resolver,
1388            ObservedDeposit {
1389                tx_hash: "0xabc".to_string(),
1390                log_index: 7,
1391                observed_block: 100,
1392                account,
1393                token: WalletAddress::from(Address::repeat_byte(2)),
1394                amount: U256::from(1_000_000),
1395            },
1396        )
1397        .await;
1398
1399        let error = result.expect_err("unsupported token should fail the observer path");
1400        assert!(
1401            error.to_string().contains("no option instrument mapping"),
1402            "unexpected error: {error:#}"
1403        );
1404        assert_eq!(
1405            store.registered_accounts.lock().unwrap().as_slice(),
1406            &[manager]
1407        );
1408        let claims = store.claims.lock().unwrap();
1409        assert_eq!(claims.len(), 1);
1410        assert_eq!(claims[0].tx_hash, "0xabc");
1411        assert_eq!(claims[0].log_index, 7);
1412        assert!(applier.applied.lock().unwrap().is_empty());
1413        assert!(store.submitted.lock().unwrap().is_empty());
1414    }
1415
1416    #[tokio::test]
1417    async fn handle_observed_deposit_skips_already_submitted_claim() {
1418        let store = MockStore {
1419            claim_status: "submitted".to_string(),
1420            instrument: None,
1421            max_active_option_expiry_ms: Some(1_800_000_000_000),
1422            registered_accounts: Mutex::new(Vec::new()),
1423            claims: Mutex::new(Vec::new()),
1424            submitted: Mutex::new(Vec::new()),
1425            failed: Mutex::new(Vec::new()),
1426        };
1427        let applier = MockApplier::default();
1428        let manager_resolver = MockManagerResolver {
1429            manager: WalletAddress::from(Address::repeat_byte(3)),
1430        };
1431
1432        handle_observed_deposit(
1433            &store,
1434            &applier,
1435            &manager_resolver,
1436            ObservedDeposit {
1437                tx_hash: "0xabc".to_string(),
1438                log_index: 7,
1439                observed_block: 100,
1440                account: WalletAddress::from(Address::repeat_byte(1)),
1441                token: WalletAddress::from(Address::repeat_byte(2)),
1442                amount: U256::from(1_000_000),
1443            },
1444        )
1445        .await
1446        .expect("submitted claim should be idempotent");
1447
1448        assert!(applier.applied.lock().unwrap().is_empty());
1449        assert!(store.submitted.lock().unwrap().is_empty());
1450    }
1451}