1use std::str::FromStr;
2use std::sync::Arc;
3use std::time::Duration;
4
5use alloy::{
6 primitives::{Address, B256, U256},
7 providers::{DynProvider, Provider, ProviderBuilder},
8 rpc::types::{BlockNumberOrTag, Filter, Log},
9 sol,
10 sol_types::SolEvent,
11};
12use anyhow::{anyhow, Context, Result};
13use async_trait::async_trait;
14use hypercall_engine::command::{EngineCommand, RecordPmVaultDepositCommand};
15use hypercall_types::WalletAddress;
16use metrics::{counter, gauge};
17use rust_decimal::Decimal;
18use rust_decimal_macros::dec;
19use sha3::{Digest, Keccak256};
20use tokio::sync::{broadcast, mpsc, oneshot};
21use tracing::{info, warn};
22use uuid::Uuid;
23
24use crate::rsm::unified_engine::OptionDepositRequest;
25use crate::shared::order_types::get_timestamp_millis;
26use crate::shared::service::{Service, ServiceOwner};
27use hypercall_db::{
28 OptionInstrumentForCredit, RsmDepositCreditClaimInput, RsmDepositCreditClaimRecord,
29};
30use hypercall_db_diesel::DatabaseHandler;
31
32const CREDIT_KIND_OPTION: &str = "option";
33const CREDIT_KIND_USDC: &str = "usdc";
34const CREDIT_KIND_PM_LIQUIDITY: &str = "pm_liquidity";
35const RPC_TIMEOUT: Duration = Duration::from_secs(30);
36
37sol! {
38 #[sol(rpc)]
39 contract Exchange {
40 event Deposit(address indexed account, address indexed from, address indexed token, uint256 amount);
41 event UsdcDeposit(address indexed account, address indexed from, address indexed token, uint256 amount, uint32 dstDex);
42 event PmLiquidityDeposit(address indexed lp, address indexed from, address indexed token, string underlying, uint256 amount, uint32 dstDex);
43 function managers(address account) external view returns (address);
44 }
45}
46
47#[derive(Clone, Debug)]
48pub struct RsmDepositCreditObserverConfig {
49 pub poll_interval: Duration,
50 pub startup_lookback_blocks: u64,
51 pub max_blocks_per_poll: u64,
52 pub max_blocks_per_get_logs: u64,
53 pub confirmation_blocks: u64,
54 pub pm_liquidity_settlement_grace_ms: i64,
55}
56
57impl RsmDepositCreditObserverConfig {
58 pub fn from_runtime_config(
59 config: &crate::backend_config::OnchainDepositsRuntimeConfig,
60 ) -> Self {
61 Self {
62 poll_interval: Duration::from_millis(config.poll_interval_ms),
63 startup_lookback_blocks: config.startup_lookback_blocks,
64 max_blocks_per_poll: config.max_blocks_per_poll,
65 max_blocks_per_get_logs: config.max_blocks_per_get_logs,
66 confirmation_blocks: config.confirmation_blocks,
67 pm_liquidity_settlement_grace_ms: config.pm_liquidity_settlement_grace_ms,
68 }
69 }
70}
71
72#[async_trait]
73pub trait DepositCreditStore: Send + Sync + 'static {
74 async fn max_observed_block(&self) -> Result<Option<u64>>;
75 async fn register_observed_account(&self, account: &WalletAddress) -> Result<()>;
76 async fn claim_deposit_credit(
77 &self,
78 input: &RsmDepositCreditClaimInput,
79 ) -> Result<RsmDepositCreditClaimRecord>;
80 async fn mark_submitted(&self, request_id: &str) -> Result<()>;
81 async fn mark_failed(&self, request_id: &str, error: &str) -> Result<()>;
82 async fn option_instrument_for_credit(
83 &self,
84 token: &WalletAddress,
85 ) -> Result<Option<OptionInstrumentForCredit>>;
86 async fn max_active_option_expiry_ms(&self, underlying: &str) -> Result<Option<i64>>;
87}
88
89#[async_trait]
90impl DepositCreditStore for DatabaseHandler {
91 async fn max_observed_block(&self) -> Result<Option<u64>> {
92 self.get_max_rsm_deposit_credit_observed_block().await
93 }
94
95 async fn register_observed_account(&self, account: &WalletAddress) -> Result<()> {
96 self.ensure_observed_deposit_account(account).await
97 }
98
99 async fn claim_deposit_credit(
100 &self,
101 input: &RsmDepositCreditClaimInput,
102 ) -> Result<RsmDepositCreditClaimRecord> {
103 self.claim_rsm_deposit_credit(input).await
104 }
105
106 async fn mark_submitted(&self, request_id: &str) -> Result<()> {
107 self.mark_rsm_deposit_credit_submitted(request_id).await
108 }
109
110 async fn mark_failed(&self, request_id: &str, error: &str) -> Result<()> {
111 self.mark_rsm_deposit_credit_failed(request_id, error).await
112 }
113
114 async fn option_instrument_for_credit(
115 &self,
116 token: &WalletAddress,
117 ) -> Result<Option<OptionInstrumentForCredit>> {
118 self.get_option_instrument_for_credit(token).await
119 }
120
121 async fn max_active_option_expiry_ms(&self, underlying: &str) -> Result<Option<i64>> {
122 use hypercall_db::InstrumentReader;
123 use hypercall_types::api_models::InstrumentStatus;
124
125 let instruments =
126 self.get_instruments_by_status_sync(InstrumentStatus::Active.as_db_str())?;
127 instruments
128 .into_iter()
129 .filter(|instrument| instrument.underlying == underlying)
130 .map(|instrument| {
131 let expiry = u64::try_from(instrument.expiry).with_context(|| {
132 format!(
133 "Invalid negative expiry {} for active instrument {}",
134 instrument.expiry, instrument.id
135 )
136 })?;
137 let expiry_ts = hypercall_types::expiry_date_to_timestamp_checked(
138 &instrument.underlying,
139 expiry,
140 )
141 .map_err(|error| {
142 anyhow!(
143 "Invalid expiry {} for active instrument {}: {}",
144 instrument.expiry,
145 instrument.id,
146 error
147 )
148 })?;
149 let expiry_ms = i64::try_from(expiry_ts)
150 .context("active option expiry timestamp exceeds i64")?
151 .checked_mul(1_000)
152 .ok_or_else(|| anyhow!("active option expiry timestamp ms overflow"))?;
153 Ok(expiry_ms)
154 })
155 .try_fold(None, |max: Option<i64>, expiry_ms: Result<i64>| {
156 let expiry_ms = expiry_ms?;
157 Ok(Some(
158 max.map_or(expiry_ms, |current| current.max(expiry_ms)),
159 ))
160 })
161 }
162}
163
164#[async_trait]
165pub trait DepositManagerResolver: Send + Sync + 'static {
166 async fn manager_for_account(&self, account: WalletAddress) -> Result<WalletAddress>;
167}
168
169#[async_trait]
170pub trait DepositCreditApplier: Send + Sync + 'static {
171 async fn apply_option_deposit(&self, request: OptionDepositRequest) -> Result<()>;
172 async fn apply_pm_liquidity_deposit(&self, command: RecordPmVaultDepositCommand) -> Result<()>;
173}
174
175#[async_trait]
176impl DepositCreditApplier for mpsc::Sender<OptionDepositRequest> {
177 async fn apply_option_deposit(&self, mut request: OptionDepositRequest) -> Result<()> {
178 let (tx, rx) = oneshot::channel();
179 request.applied_tx = Some(tx);
180 self.send(request)
181 .await
182 .map_err(|_| anyhow!("option deposit receiver closed"))?;
183 rx.await
184 .map_err(|_| anyhow!("option deposit apply acknowledgement dropped"))?
185 .map_err(|err| anyhow!(err))
186 }
187
188 async fn apply_pm_liquidity_deposit(
189 &self,
190 _command: RecordPmVaultDepositCommand,
191 ) -> Result<()> {
192 anyhow::bail!("PM liquidity deposits require a PM settlement command sender")
193 }
194}
195
196fn record_deposit_observer_block_gauges(latest_block: u64, finalized_block: u64, next_block: u64) {
197 gauge!("ht_rsm_deposit_credit_observer_current_block").set(latest_block as f64);
198 gauge!("ht_rsm_deposit_credit_observer_finalized_block").set(finalized_block as f64);
199 gauge!("ht_rsm_deposit_credit_observer_next_block").set(next_block as f64);
200 gauge!("ht_rsm_deposit_credit_observer_finalized_lag_blocks")
201 .set(finalized_block.saturating_sub(next_block) as f64);
202 gauge!("ht_rsm_deposit_credit_observer_unfinalized_blocks")
203 .set(latest_block.saturating_sub(finalized_block) as f64);
204}
205
206pub struct RsmDepositCreditObserver {
207 store: Arc<dyn DepositCreditStore>,
208 applier: Arc<dyn DepositCreditApplier>,
209 provider: DynProvider,
210 exchange_address: Address,
211 usdc_address: Address,
212 config: RsmDepositCreditObserverConfig,
213}
214
215impl RsmDepositCreditObserver {
216 pub fn new(
217 db: Arc<DatabaseHandler>,
218 option_deposit_sender: mpsc::Sender<OptionDepositRequest>,
219 pm_settlement_sender: mpsc::Sender<hypercall_runtime_api::PmSettlementAdminRequest>,
220 rpc_url: &str,
221 exchange_contract_address: &str,
222 usdc_contract_address: &str,
223 config: RsmDepositCreditObserverConfig,
224 ) -> Result<Self> {
225 let exchange_address = Address::from_str(exchange_contract_address)
226 .context("invalid contracts.exchange_contract_address")?;
227 let usdc_address =
228 Address::from_str(usdc_contract_address).context("invalid contracts.usdc_address")?;
229 let provider = ProviderBuilder::new()
230 .connect_http(
231 rpc_url
232 .parse()
233 .context("invalid transaction_submitter.rpc_url")?,
234 )
235 .erased();
236
237 Ok(Self {
238 store: db,
239 applier: Arc::new(DepositCreditChannelApplier {
240 option_deposit_sender,
241 pm_settlement_sender,
242 }),
243 provider,
244 exchange_address,
245 usdc_address,
246 config,
247 })
248 }
249
250 pub async fn run_with_shutdown(&self, mut shutdown_rx: broadcast::Receiver<()>) -> Result<()> {
251 let mut interval = tokio::time::interval(self.config.poll_interval);
252 let mut next_block = self.initial_next_block().await?;
253
254 loop {
255 tokio::select! {
256 _ = shutdown_rx.recv() => {
257 info!("RSM deposit credit observer received shutdown signal");
258 break;
259 }
260 _ = interval.tick() => {
261 if let Err(error) = self.poll_chain(&mut next_block).await {
262 counter!("ht_rsm_deposit_credit_errors_total", "stage" => "poll")
263 .increment(1);
264 warn!("RSM deposit credit observer poll failed: {}", error);
265 }
266 }
267 }
268 }
269
270 Ok(())
271 }
272
273 async fn initial_next_block(&self) -> Result<u64> {
274 let latest_block =
275 rpc_timeout("get_block_number", self.provider.get_block_number()).await?;
276 let finalized_block = latest_block.saturating_sub(self.config.confirmation_blocks);
277 let next_block = initial_next_block(
278 self.store.max_observed_block().await?,
279 finalized_block,
280 self.config.startup_lookback_blocks,
281 )?;
282
283 info!(
284 next_block,
285 latest_block,
286 finalized_block,
287 confirmation_blocks = self.config.confirmation_blocks,
288 "RSM deposit credit observer initialized"
289 );
290 record_deposit_observer_block_gauges(latest_block, finalized_block, next_block);
291 Ok(next_block)
292 }
293
294 async fn poll_chain(&self, next_block: &mut u64) -> Result<()> {
295 let latest_block =
296 rpc_timeout("get_block_number", self.provider.get_block_number()).await?;
297 let finalized_block = latest_block.saturating_sub(self.config.confirmation_blocks);
298 record_deposit_observer_block_gauges(latest_block, finalized_block, *next_block);
299 if *next_block > finalized_block {
300 gauge!("ht_rsm_deposit_credit_observer_lag_blocks").set(0.0);
301 return Ok(());
302 }
303
304 let Some((from_block, to_block)) = next_poll_range(
305 *next_block,
306 finalized_block,
307 self.config.max_blocks_per_poll,
308 ) else {
309 gauge!("ht_rsm_deposit_credit_observer_lag_blocks").set(0.0);
310 return Ok(());
311 };
312
313 let lag_blocks = latest_block - from_block;
314 gauge!("ht_rsm_deposit_credit_observer_lag_blocks").set(lag_blocks as f64);
315 if to_block < latest_block {
316 warn!(
317 next_block = from_block,
318 to_block,
319 latest_block,
320 finalized_block,
321 lag_blocks,
322 max_blocks_per_poll = self.config.max_blocks_per_poll,
323 "RSM deposit credit observer is catching up"
324 );
325 }
326
327 self.process_logs(from_block, to_block).await?;
328 gauge!("ht_rsm_deposit_credit_observer_last_success_block").set(to_block as f64);
329 *next_block = to_block
330 .checked_add(1)
331 .ok_or_else(|| anyhow!("deposit observer next block overflow"))?;
332 record_deposit_observer_block_gauges(latest_block, finalized_block, *next_block);
333 Ok(())
334 }
335
336 async fn process_logs(&self, from_block: u64, to_block: u64) -> Result<()> {
337 for (chunk_from, chunk_to) in
338 block_chunks_inclusive(from_block, to_block, self.config.max_blocks_per_get_logs)
339 {
340 let mut logs = self
341 .fetch_event_logs(chunk_from, chunk_to, Exchange::Deposit::SIGNATURE_HASH)
342 .await?;
343 logs.extend(
344 self.fetch_event_logs(chunk_from, chunk_to, Exchange::UsdcDeposit::SIGNATURE_HASH)
345 .await?,
346 );
347 logs.extend(
348 self.fetch_event_logs(
349 chunk_from,
350 chunk_to,
351 Exchange::PmLiquidityDeposit::SIGNATURE_HASH,
352 )
353 .await?,
354 );
355 counter!("ht_rsm_deposit_credit_get_logs_total").increment(1);
356 logs.sort_by_key(log_sort_key);
357
358 for log in logs {
359 self.handle_log(log).await?;
360 }
361 }
362 Ok(())
363 }
364
365 async fn fetch_event_logs(
366 &self,
367 from_block: u64,
368 to_block: u64,
369 signature: B256,
370 ) -> Result<Vec<Log>> {
371 rpc_timeout(
372 "get_logs",
373 self.provider.get_logs(
374 &Filter::new()
375 .address(self.exchange_address)
376 .from_block(BlockNumberOrTag::Number(from_block))
377 .to_block(BlockNumberOrTag::Number(to_block))
378 .event_signature(signature),
379 ),
380 )
381 .await
382 .with_context(|| {
383 format!(
384 "failed to fetch Exchange deposit logs signature={} from {} to {}",
385 signature, from_block, to_block
386 )
387 })
388 }
389
390 async fn handle_log(&self, log: Log) -> Result<()> {
391 let signature = log
392 .topic0()
393 .copied()
394 .ok_or_else(|| anyhow!("Exchange deposit log missing topic0"))?;
395 if signature == Exchange::UsdcDeposit::SIGNATURE_HASH {
396 return self.handle_usdc_deposit_log(log).await;
397 }
398 if signature == Exchange::PmLiquidityDeposit::SIGNATURE_HASH {
399 return self.handle_pm_liquidity_deposit_log(log).await;
400 }
401
402 let decoded = log
403 .log_decode::<Exchange::Deposit>()
404 .inspect_err(|_| {
405 counter!("ht_rsm_deposit_credit_errors_total", "stage" => "decode").increment(1);
406 })
407 .context("failed to decode Exchange.Deposit log")?;
408 let event = decoded.data();
409 let tx_hash = log
410 .transaction_hash
411 .ok_or_else(|| anyhow!("Exchange.Deposit missing transaction hash"))?
412 .to_string();
413 let log_index = i64::try_from(
414 log.log_index
415 .ok_or_else(|| anyhow!("Exchange.Deposit missing log index"))?,
416 )?;
417 let observed_block = i64::try_from(
418 log.block_number
419 .ok_or_else(|| anyhow!("Exchange.Deposit missing block number"))?,
420 )?;
421 let account = WalletAddress::from(event.account);
422 let token = WalletAddress::from(event.token);
423
424 handle_observed_deposit(
425 self.store.as_ref(),
426 self.applier.as_ref(),
427 self,
428 ObservedDeposit {
429 tx_hash,
430 log_index,
431 observed_block,
432 account,
433 token,
434 amount: event.amount,
435 },
436 )
437 .await
438 }
439
440 async fn handle_usdc_deposit_log(&self, log: Log) -> Result<()> {
441 let decoded = log
442 .log_decode::<Exchange::UsdcDeposit>()
443 .inspect_err(|_| {
444 counter!("ht_rsm_deposit_credit_errors_total", "stage" => "decode").increment(1);
445 })
446 .context("failed to decode Exchange.UsdcDeposit log")?;
447 let event = decoded.data();
448 if event.dstDex != 0 {
449 counter!("ht_rsm_deposit_credit_errors_total", "stage" => "unsupported_dst_dex")
450 .increment(1);
451 tracing::warn!(
452 dst_dex = event.dstDex,
453 tx_hash = ?log.transaction_hash,
454 "Exchange.UsdcDeposit has unsupported dstDex {} for tx {:?}",
455 event.dstDex,
456 log.transaction_hash
457 );
458 return Ok(());
459 }
460
461 let tx_hash = log
462 .transaction_hash
463 .ok_or_else(|| anyhow!("Exchange.UsdcDeposit missing transaction hash"))?
464 .to_string();
465 let log_index = i64::try_from(
466 log.log_index
467 .ok_or_else(|| anyhow!("Exchange.UsdcDeposit missing log index"))?,
468 )?;
469 let observed_block = i64::try_from(
470 log.block_number
471 .ok_or_else(|| anyhow!("Exchange.UsdcDeposit missing block number"))?,
472 )?;
473 let claim_input = RsmDepositCreditClaimInput {
474 tx_hash,
475 log_index,
476 observed_block,
477 account: WalletAddress::from(event.account),
478 token: WalletAddress::from(event.token),
479 amount_wei: event.amount.to_string(),
480 credit_kind: CREDIT_KIND_USDC.to_string(),
481 request_id: Uuid::now_v7().to_string(),
482 };
483 let claim = self
484 .store
485 .claim_deposit_credit(&claim_input)
486 .await
487 .inspect_err(|_| {
488 counter!("ht_rsm_deposit_credit_errors_total", "stage" => "claim").increment(1);
489 })?;
490 validate_claim(&claim_input, &claim);
491 info!(
492 account = %claim.account,
493 from = %WalletAddress::from(event.from),
494 token = %claim.token,
495 amount_wei = %claim.amount_wei,
496 request_id = %claim.request_id,
497 tx_hash = %claim.tx_hash,
498 log_index = claim.log_index,
499 "Observed Exchange.UsdcDeposit pending HyperCore correlation"
500 );
501 Ok(())
502 }
503
504 async fn handle_pm_liquidity_deposit_log(&self, log: Log) -> Result<()> {
505 let decoded = log
506 .log_decode::<Exchange::PmLiquidityDeposit>()
507 .inspect_err(|_| {
508 counter!("ht_rsm_deposit_credit_errors_total", "stage" => "decode").increment(1);
509 })
510 .context("failed to decode Exchange.PmLiquidityDeposit log")?;
511 let event = decoded.data();
512 if event.dstDex != 0 {
513 counter!("ht_rsm_deposit_credit_errors_total", "stage" => "unsupported_dst_dex")
514 .increment(1);
515 warn!(
516 dst_dex = event.dstDex,
517 tx_hash = ?log.transaction_hash,
518 "Exchange.PmLiquidityDeposit has unsupported dstDex"
519 );
520 return Ok(());
521 }
522 if event.token != self.usdc_address {
523 anyhow::bail!(
524 "Exchange.PmLiquidityDeposit token {} did not match configured USDC {}",
525 event.token,
526 self.usdc_address
527 );
528 }
529
530 let tx_hash = log
531 .transaction_hash
532 .ok_or_else(|| anyhow!("Exchange.PmLiquidityDeposit missing transaction hash"))?
533 .to_string();
534 let log_index_u64 = log
535 .log_index
536 .ok_or_else(|| anyhow!("Exchange.PmLiquidityDeposit missing log index"))?;
537 let log_index = i64::try_from(log_index_u64)?;
538 let observed_block = i64::try_from(
539 log.block_number
540 .ok_or_else(|| anyhow!("Exchange.PmLiquidityDeposit missing block number"))?,
541 )?;
542 let chain_id = rpc_timeout("get_chain_id", self.provider.get_chain_id()).await?;
543 handle_observed_pm_liquidity_deposit(
544 self.store.as_ref(),
545 self.applier.as_ref(),
546 ObservedPmLiquidityDeposit {
547 tx_hash,
548 log_index,
549 observed_block,
550 lp: WalletAddress::from(event.lp),
551 from: WalletAddress::from(event.from),
552 token: WalletAddress::from(event.token),
553 underlying: event.underlying.clone(),
554 amount: event.amount,
555 chain_id,
556 exchange_address: self.exchange_address.to_string(),
557 log_index_u64,
558 settlement_grace_ms: self.config.pm_liquidity_settlement_grace_ms,
559 },
560 )
561 .await
562 }
563}
564
565async fn handle_observed_pm_liquidity_deposit(
566 store: &dyn DepositCreditStore,
567 applier: &dyn DepositCreditApplier,
568 deposit: ObservedPmLiquidityDeposit,
569) -> Result<()> {
570 let request_id = Uuid::now_v7();
571 let claim_input = RsmDepositCreditClaimInput {
572 tx_hash: deposit.tx_hash.clone(),
573 log_index: deposit.log_index,
574 observed_block: deposit.observed_block,
575 account: deposit.lp,
576 token: deposit.token,
577 amount_wei: deposit.amount.to_string(),
578 credit_kind: CREDIT_KIND_PM_LIQUIDITY.to_string(),
579 request_id: request_id.to_string(),
580 };
581 let claim = store
582 .claim_deposit_credit(&claim_input)
583 .await
584 .inspect_err(|_| {
585 counter!("ht_rsm_deposit_credit_errors_total", "stage" => "claim").increment(1);
586 })?;
587 validate_claim(&claim_input, &claim);
588 if claim.status == "submitted" {
589 return Ok(());
590 }
591
592 let max_listed_expiry_ts_ms = match store
593 .max_active_option_expiry_ms(&deposit.underlying)
594 .await?
595 {
596 Some(expiry) => expiry,
597 None => {
598 let error = format!(
599 "PM liquidity deposit has no active listed options for underlying {}",
600 deposit.underlying
601 );
602 store.mark_failed(&claim.request_id, &error).await?;
603 warn!(
604 request_id = %claim.request_id,
605 tx_hash = %claim.tx_hash,
606 log_index = claim.log_index,
607 underlying = %deposit.underlying,
608 "Quarantined invalid PM liquidity deposit"
609 );
610 return Ok(());
611 }
612 };
613 let amount_usdc = usdc_token_amount_to_decimal(deposit.amount)?;
614 let claim_request_id = Uuid::parse_str(&claim.request_id)
615 .context("rsm_deposit_credits PM liquidity request_id must be a UUID")?;
616 let input_digest = pm_liquidity_input_digest(
617 deposit.chain_id,
618 &deposit.exchange_address,
619 &deposit.tx_hash,
620 deposit.log_index_u64,
621 &claim.account.to_string(),
622 &deposit.underlying,
623 &claim.amount_wei,
624 max_listed_expiry_ts_ms,
625 deposit.settlement_grace_ms,
626 );
627 applier
628 .apply_pm_liquidity_deposit(RecordPmVaultDepositCommand {
629 request_id: claim_request_id,
630 input_digest,
631 depositor: claim.account,
632 underlying: deposit.underlying.clone(),
633 amount_usdc,
634 chain_id: deposit.chain_id,
635 source_contract_address: WalletAddress::from_str(&deposit.exchange_address)
636 .context("PM liquidity source contract address must be a valid EVM address")?,
637 tx_hash: claim.tx_hash.clone(),
638 log_index: u32::try_from(deposit.log_index_u64)
639 .context("PM liquidity deposit log index exceeds u32")?,
640 max_listed_expiry_ts_ms,
641 settlement_grace_ms: deposit.settlement_grace_ms,
642 timestamp_ms: get_timestamp_millis(),
643 })
644 .await
645 .inspect_err(|_| {
646 counter!("ht_rsm_deposit_credit_errors_total", "stage" => "pm_engine_apply")
647 .increment(1);
648 })?;
649 store
650 .mark_submitted(&claim.request_id)
651 .await
652 .inspect_err(|_| {
653 counter!("ht_rsm_deposit_credit_errors_total", "stage" => "mark_submitted")
654 .increment(1);
655 })?;
656 counter!("ht_pm_liquidity_deposits_submitted_total").increment(1);
657 info!(
658 lp = %claim.account,
659 from = %deposit.from,
660 underlying = %deposit.underlying,
661 amount_usdc = %amount_usdc,
662 request_id = %claim.request_id,
663 tx_hash = %claim.tx_hash,
664 log_index = claim.log_index,
665 max_listed_expiry_ts_ms,
666 "Applied PM liquidity deposit for Exchange.PmLiquidityDeposit"
667 );
668 Ok(())
669}
670
671#[derive(Debug, Clone)]
672struct ObservedPmLiquidityDeposit {
673 tx_hash: String,
674 log_index: i64,
675 observed_block: i64,
676 lp: WalletAddress,
677 from: WalletAddress,
678 token: WalletAddress,
679 underlying: String,
680 amount: U256,
681 chain_id: u64,
682 exchange_address: String,
683 log_index_u64: u64,
684 settlement_grace_ms: i64,
685}
686
687struct DepositCreditChannelApplier {
688 option_deposit_sender: mpsc::Sender<OptionDepositRequest>,
689 pm_settlement_sender: mpsc::Sender<hypercall_runtime_api::PmSettlementAdminRequest>,
690}
691
692#[async_trait]
693impl DepositCreditApplier for DepositCreditChannelApplier {
694 async fn apply_option_deposit(&self, request: OptionDepositRequest) -> Result<()> {
695 self.option_deposit_sender
696 .apply_option_deposit(request)
697 .await
698 }
699
700 async fn apply_pm_liquidity_deposit(&self, command: RecordPmVaultDepositCommand) -> Result<()> {
701 let (tx, rx) = oneshot::channel();
702 self.pm_settlement_sender
703 .send(hypercall_runtime_api::PmSettlementAdminRequest {
704 command: EngineCommand::RecordPmVaultDeposit(command),
705 applied_tx: tx,
706 })
707 .await
708 .map_err(|_| anyhow!("PM settlement command receiver closed"))?;
709 rx.await
710 .map_err(|_| anyhow!("PM settlement command acknowledgement dropped"))?
711 .map_err(|err| anyhow!(err))
712 }
713}
714
715#[async_trait]
716impl DepositManagerResolver for RsmDepositCreditObserver {
717 async fn manager_for_account(&self, account: WalletAddress) -> Result<WalletAddress> {
718 let exchange = Exchange::new(self.exchange_address, &self.provider);
719 let manager = rpc_timeout("Exchange.managers", async {
720 exchange.managers(account.inner()).call().await
721 })
722 .await
723 .with_context(|| format!("failed to resolve Exchange manager for {account}"))?;
724 if manager == Address::ZERO {
725 counter!("ht_rsm_deposit_credit_errors_total", "stage" => "manager_lookup")
726 .increment(1);
727 anyhow::bail!("Exchange.managers({account}) returned zero address");
728 }
729 Ok(WalletAddress::from(manager))
730 }
731}
732
733async fn handle_observed_deposit(
734 store: &dyn DepositCreditStore,
735 applier: &dyn DepositCreditApplier,
736 manager_resolver: &dyn DepositManagerResolver,
737 deposit: ObservedDeposit,
738) -> Result<()> {
739 let manager = manager_resolver
740 .manager_for_account(deposit.account)
741 .await?;
742 store
743 .register_observed_account(&manager)
744 .await
745 .inspect_err(|_| {
746 counter!("ht_rsm_deposit_credit_errors_total", "stage" => "register_account")
747 .increment(1);
748 })?;
749
750 let amount_wei = deposit.amount.to_string();
751 let claim_input = RsmDepositCreditClaimInput {
752 tx_hash: deposit.tx_hash,
753 log_index: deposit.log_index,
754 observed_block: deposit.observed_block,
755 account: deposit.account,
756 token: deposit.token,
757 amount_wei,
758 credit_kind: CREDIT_KIND_OPTION.to_string(),
759 request_id: Uuid::now_v7().to_string(),
760 };
761 let claim = store
762 .claim_deposit_credit(&claim_input)
763 .await
764 .inspect_err(|_| {
765 counter!("ht_rsm_deposit_credit_errors_total", "stage" => "claim").increment(1);
766 })?;
767 validate_claim(&claim_input, &claim);
768 if claim.status == "submitted" {
769 return Ok(());
770 }
771
772 let instrument = match store
773 .option_instrument_for_credit(&deposit.token)
774 .await
775 .inspect_err(|_| {
776 counter!("ht_rsm_deposit_credit_errors_total", "stage" => "instrument_lookup")
777 .increment(1);
778 })? {
779 Some(instrument) => instrument,
780 None => {
781 counter!("ht_rsm_deposit_credit_errors_total", "stage" => "unsupported_token")
782 .increment(1);
783 anyhow::bail!(
784 "deposit token has no option instrument mapping: account={}, token={}, tx_hash={}, log_index={}",
785 claim_input.account,
786 claim_input.token,
787 claim_input.tx_hash,
788 claim_input.log_index
789 );
790 }
791 };
792 let quantity = option_token_amount_to_quantity(deposit.amount)?;
793 applier
794 .apply_option_deposit(OptionDepositRequest {
795 request_id: claim.request_id.clone(),
796 wallet: manager,
797 symbol: instrument.symbol.clone(),
798 quantity,
799 timestamp_ms: get_timestamp_millis(),
800 applied_tx: None,
801 })
802 .await
803 .inspect_err(|_| {
804 counter!("ht_rsm_deposit_credit_errors_total", "stage" => "engine_apply").increment(1);
805 })?;
806 store
807 .mark_submitted(&claim.request_id)
808 .await
809 .inspect_err(|_| {
810 counter!("ht_rsm_deposit_credit_errors_total", "stage" => "mark_submitted")
811 .increment(1);
812 })?;
813 counter!("ht_rsm_deposit_credit_submitted_total").increment(1);
814 info!(
815 account = %deposit.account,
816 manager = %manager,
817 token = %deposit.token,
818 symbol = %instrument.symbol,
819 quantity = %quantity,
820 request_id = %claim.request_id,
821 tx_hash = %claim.tx_hash,
822 log_index = claim.log_index,
823 "Applied option deposit credit for Exchange.Deposit"
824 );
825 Ok(())
826}
827
828#[derive(Debug, Clone)]
829struct ObservedDeposit {
830 tx_hash: String,
831 log_index: i64,
832 observed_block: i64,
833 account: WalletAddress,
834 token: WalletAddress,
835 amount: U256,
836}
837
838#[async_trait::async_trait]
839impl Service for RsmDepositCreditObserver {
840 fn name(&self) -> &'static str {
841 "RsmDepositCreditObserver"
842 }
843
844 fn owner(&self) -> ServiceOwner {
845 ServiceOwner::Shared
846 }
847
848 async fn run(self: Arc<Self>, shutdown: crate::shared::shutdown::ShutdownRx) -> Result<()> {
849 self.run_with_shutdown(shutdown).await
850 }
851}
852
853fn option_token_amount_to_quantity(amount: U256) -> Result<Decimal> {
854 let raw = Decimal::from_str(&amount.to_string())
855 .with_context(|| format!("failed to parse option token amount {}", amount))?;
856 let quantity = raw / dec!(1000000);
857 if quantity <= Decimal::ZERO {
858 anyhow::bail!(
859 "option deposit amount {} converts to non-positive quantity",
860 amount
861 );
862 }
863 Ok(quantity)
864}
865
866fn usdc_token_amount_to_decimal(amount: U256) -> Result<Decimal> {
867 let raw = Decimal::from_str(&amount.to_string())
868 .with_context(|| format!("failed to parse USDC token amount {}", amount))?;
869 let amount_usdc = raw / dec!(1000000);
870 if amount_usdc <= Decimal::ZERO {
871 anyhow::bail!(
872 "PM liquidity deposit amount {} converts to non-positive USDC",
873 amount
874 );
875 }
876 Ok(amount_usdc)
877}
878
879fn pm_liquidity_input_digest(
880 chain_id: u64,
881 exchange_address: &str,
882 tx_hash: &str,
883 log_index: u64,
884 lp: &str,
885 underlying: &str,
886 amount_wei: &str,
887 max_listed_expiry_ts_ms: i64,
888 settlement_grace_ms: i64,
889) -> String {
890 let mut hasher = Keccak256::new();
891 for part in [
892 "pm-liquidity-v1",
893 &chain_id.to_string(),
894 exchange_address,
895 tx_hash,
896 &log_index.to_string(),
897 lp,
898 underlying,
899 amount_wei,
900 &max_listed_expiry_ts_ms.to_string(),
901 &settlement_grace_ms.to_string(),
902 ] {
903 hasher.update((part.len() as u64).to_le_bytes());
904 hasher.update(part.as_bytes());
905 }
906 format!("0x{}", hex::encode(hasher.finalize()))
907}
908
909fn validate_claim(input: &RsmDepositCreditClaimInput, claim: &RsmDepositCreditClaimRecord) {
910 if input.tx_hash != claim.tx_hash
911 || input.log_index != claim.log_index
912 || input.account != claim.account
913 || input.token != claim.token
914 || input.amount_wei != claim.amount_wei
915 || input.credit_kind != claim.credit_kind
916 {
917 panic!(
918 "STATE_CORRUPTION: rsm_deposit_credits claim mismatch for tx_hash={} log_index={}",
919 input.tx_hash, input.log_index
920 );
921 }
922}
923
924fn log_sort_key(log: &Log) -> (u64, u64, u64) {
925 (
926 log.block_number.unwrap_or_default(),
927 log.transaction_index.unwrap_or_default(),
928 log.log_index.unwrap_or_default(),
929 )
930}
931
932fn initial_next_block(
933 max_observed_block: Option<u64>,
934 latest_block: u64,
935 startup_lookback_blocks: u64,
936) -> Result<u64> {
937 if startup_lookback_blocks == 0 {
938 return Err(anyhow!("startup_lookback_blocks must be greater than zero"));
939 }
940
941 if let Some(max_observed_block) = max_observed_block {
942 return Ok(max_observed_block);
946 }
947
948 Ok(latest_block.saturating_sub(startup_lookback_blocks.saturating_sub(1)))
952}
953
954async fn rpc_timeout<T>(
955 operation: &'static str,
956 future: impl std::future::Future<Output = Result<T, impl std::fmt::Display>>,
957) -> Result<T> {
958 tokio::time::timeout(RPC_TIMEOUT, future)
959 .await
960 .map_err(|_| anyhow!("{} timed out after {:?}", operation, RPC_TIMEOUT))?
961 .map_err(|error| anyhow!("{} failed: {}", operation, error))
962}
963
964fn next_poll_range(
965 next_block: u64,
966 latest_block: u64,
967 max_blocks_per_poll: u64,
968) -> Option<(u64, u64)> {
969 if next_block > latest_block {
970 return None;
971 }
972
973 let to_block = next_block
974 .saturating_add(max_blocks_per_poll.saturating_sub(1))
975 .min(latest_block);
976 Some((next_block, to_block))
977}
978
979fn block_chunks_inclusive(
980 from_block: u64,
981 to_block: u64,
982 max_blocks_per_get_logs: u64,
983) -> Vec<(u64, u64)> {
984 assert!(
985 max_blocks_per_get_logs > 0,
986 "max_blocks_per_get_logs must be greater than zero"
987 );
988 if from_block > to_block {
989 return Vec::new();
990 }
991
992 let mut chunks = Vec::new();
993 let mut chunk_from = from_block;
994 while chunk_from <= to_block {
995 let chunk_to = chunk_from
999 .saturating_add(max_blocks_per_get_logs.saturating_sub(1))
1000 .min(to_block);
1001 chunks.push((chunk_from, chunk_to));
1002 if chunk_to == u64::MAX {
1003 break;
1004 }
1005 chunk_from = chunk_to + 1;
1006 }
1007 chunks
1008}
1009
1010#[cfg(test)]
1011mod tests {
1012 use super::*;
1013 use std::sync::Mutex;
1014
1015 use rust_decimal_macros::dec;
1016
1017 struct MockStore {
1018 claim_status: String,
1019 instrument: Option<OptionInstrumentForCredit>,
1020 max_active_option_expiry_ms: Option<i64>,
1021 registered_accounts: Mutex<Vec<WalletAddress>>,
1022 claims: Mutex<Vec<RsmDepositCreditClaimInput>>,
1023 submitted: Mutex<Vec<String>>,
1024 failed: Mutex<Vec<(String, String)>>,
1025 }
1026
1027 impl MockStore {
1028 fn new(instrument: Option<OptionInstrumentForCredit>) -> Self {
1029 Self {
1030 claim_status: "pending".to_string(),
1031 instrument,
1032 max_active_option_expiry_ms: Some(1_800_000_000_000),
1033 registered_accounts: Mutex::new(Vec::new()),
1034 claims: Mutex::new(Vec::new()),
1035 submitted: Mutex::new(Vec::new()),
1036 failed: Mutex::new(Vec::new()),
1037 }
1038 }
1039 }
1040
1041 #[async_trait::async_trait]
1042 impl DepositCreditStore for MockStore {
1043 async fn max_observed_block(&self) -> Result<Option<u64>> {
1044 Ok(None)
1045 }
1046
1047 async fn register_observed_account(&self, account: &WalletAddress) -> Result<()> {
1048 self.registered_accounts.lock().unwrap().push(*account);
1049 Ok(())
1050 }
1051
1052 async fn claim_deposit_credit(
1053 &self,
1054 input: &RsmDepositCreditClaimInput,
1055 ) -> Result<RsmDepositCreditClaimRecord> {
1056 self.claims.lock().unwrap().push(input.clone());
1057 Ok(RsmDepositCreditClaimRecord {
1058 tx_hash: input.tx_hash.clone(),
1059 log_index: input.log_index,
1060 observed_block: input.observed_block,
1061 account: input.account,
1062 token: input.token,
1063 amount_wei: input.amount_wei.clone(),
1064 credit_kind: input.credit_kind.clone(),
1065 request_id: input.request_id.clone(),
1066 status: self.claim_status.clone(),
1067 })
1068 }
1069
1070 async fn mark_submitted(&self, request_id: &str) -> Result<()> {
1071 self.submitted.lock().unwrap().push(request_id.to_string());
1072 Ok(())
1073 }
1074
1075 async fn mark_failed(&self, request_id: &str, error: &str) -> Result<()> {
1076 self.failed
1077 .lock()
1078 .unwrap()
1079 .push((request_id.to_string(), error.to_string()));
1080 Ok(())
1081 }
1082
1083 async fn option_instrument_for_credit(
1084 &self,
1085 _token: &WalletAddress,
1086 ) -> Result<Option<OptionInstrumentForCredit>> {
1087 Ok(self.instrument.clone())
1088 }
1089
1090 async fn max_active_option_expiry_ms(&self, _underlying: &str) -> Result<Option<i64>> {
1091 Ok(self.max_active_option_expiry_ms)
1092 }
1093 }
1094
1095 #[derive(Default)]
1096 struct MockApplier {
1097 applied: Mutex<Vec<OptionDepositRequest>>,
1098 pm_applied: Mutex<Vec<RecordPmVaultDepositCommand>>,
1099 }
1100
1101 struct MockManagerResolver {
1102 manager: WalletAddress,
1103 }
1104
1105 #[async_trait::async_trait]
1106 impl DepositManagerResolver for MockManagerResolver {
1107 async fn manager_for_account(&self, _account: WalletAddress) -> Result<WalletAddress> {
1108 Ok(self.manager)
1109 }
1110 }
1111
1112 #[async_trait::async_trait]
1113 impl DepositCreditApplier for MockApplier {
1114 async fn apply_option_deposit(&self, request: OptionDepositRequest) -> Result<()> {
1115 self.applied.lock().unwrap().push(request);
1116 Ok(())
1117 }
1118
1119 async fn apply_pm_liquidity_deposit(
1120 &self,
1121 command: RecordPmVaultDepositCommand,
1122 ) -> Result<()> {
1123 self.pm_applied.lock().unwrap().push(command);
1124 Ok(())
1125 }
1126 }
1127
1128 fn btc_call_instrument() -> OptionInstrumentForCredit {
1129 OptionInstrumentForCredit {
1130 symbol: "BTC-20260130-100000-C".to_string(),
1131 underlying: "BTC".to_string(),
1132 expiry: 20260130,
1133 strike: dec!(100000),
1134 option_type: "call".to_string(),
1135 }
1136 }
1137
1138 #[test]
1139 fn option_token_amount_to_quantity_uses_token_decimals() {
1140 assert_eq!(
1141 option_token_amount_to_quantity(U256::from(1_500_000)).unwrap(),
1142 dec!(1.5)
1143 );
1144 }
1145
1146 #[test]
1147 #[should_panic(expected = "STATE_CORRUPTION")]
1148 fn validate_claim_panics_on_reused_log_with_different_payload() {
1149 let account = WalletAddress::from(Address::repeat_byte(1));
1150 let token = WalletAddress::from(Address::repeat_byte(2));
1151 let input = RsmDepositCreditClaimInput {
1152 tx_hash: "0xabc".to_string(),
1153 log_index: 1,
1154 observed_block: 10,
1155 account,
1156 token,
1157 amount_wei: "1".to_string(),
1158 credit_kind: "option".to_string(),
1159 request_id: "018f0000-0000-7000-8000-000000000001".to_string(),
1160 };
1161 let claim = RsmDepositCreditClaimRecord {
1162 tx_hash: input.tx_hash.clone(),
1163 log_index: input.log_index,
1164 observed_block: input.observed_block,
1165 account,
1166 token,
1167 amount_wei: "2".to_string(),
1168 credit_kind: input.credit_kind.clone(),
1169 request_id: input.request_id.clone(),
1170 status: "pending".to_string(),
1171 };
1172
1173 validate_claim(&input, &claim);
1174 }
1175
1176 #[test]
1177 fn initial_next_block_replays_watermark_block_after_restart() {
1178 let next = initial_next_block(Some(42), 100, 32).expect("watermark should resume");
1179
1180 assert_eq!(next, 42);
1181 }
1182
1183 #[test]
1184 fn initial_next_block_uses_bounded_startup_lookback() {
1185 let next = initial_next_block(None, 100, 32).expect("lookback should compute");
1186
1187 assert_eq!(next, 69);
1188 }
1189
1190 #[test]
1191 fn next_poll_range_caps_catchup_per_tick() {
1192 assert_eq!(next_poll_range(10, 10_000, 500), Some((10, 509)));
1193 assert_eq!(next_poll_range(10_001, 10_000, 500), None);
1194 }
1195
1196 #[test]
1197 fn block_chunks_inclusive_splits_get_logs_ranges() {
1198 let chunks = block_chunks_inclusive(10, 25, 7);
1199
1200 assert_eq!(chunks, vec![(10, 16), (17, 23), (24, 25)]);
1201 }
1202
1203 #[tokio::test]
1204 async fn handle_observed_deposit_claims_applies_and_marks_submitted() {
1205 let account = WalletAddress::from(Address::repeat_byte(1));
1206 let manager = WalletAddress::from(Address::repeat_byte(3));
1207 let token = WalletAddress::from(Address::repeat_byte(2));
1208 let store = MockStore::new(Some(btc_call_instrument()));
1209 let applier = MockApplier::default();
1210 let manager_resolver = MockManagerResolver { manager };
1211
1212 handle_observed_deposit(
1213 &store,
1214 &applier,
1215 &manager_resolver,
1216 ObservedDeposit {
1217 tx_hash: "0xabc".to_string(),
1218 log_index: 7,
1219 observed_block: 100,
1220 account,
1221 token,
1222 amount: U256::from(1_000_000),
1223 },
1224 )
1225 .await
1226 .expect("deposit should be published");
1227
1228 assert_eq!(
1229 store.registered_accounts.lock().unwrap().as_slice(),
1230 &[manager]
1231 );
1232
1233 let claims = store.claims.lock().unwrap();
1234 assert_eq!(claims.len(), 1);
1235 assert_eq!(claims[0].tx_hash, "0xabc");
1236 assert_eq!(claims[0].log_index, 7);
1237 assert_eq!(claims[0].observed_block, 100);
1238 assert_eq!(claims[0].account, account);
1239 assert_eq!(claims[0].token, token);
1240 assert_eq!(claims[0].amount_wei, "1000000");
1241 assert_eq!(claims[0].credit_kind, CREDIT_KIND_OPTION);
1242
1243 let applied = applier.applied.lock().unwrap();
1244 assert_eq!(applied.len(), 1);
1245 assert_eq!(applied[0].request_id, claims[0].request_id);
1246 assert_eq!(applied[0].wallet, manager);
1247 assert_eq!(applied[0].symbol, "BTC-20260130-100000-C");
1248 assert_eq!(applied[0].quantity, dec!(1));
1249
1250 assert_eq!(
1251 store.submitted.lock().unwrap().as_slice(),
1252 &[claims[0].request_id.clone()]
1253 );
1254 }
1255
1256 #[tokio::test]
1257 async fn handle_observed_pm_liquidity_deposit_claims_applies_and_marks_submitted() {
1258 let lp = WalletAddress::from(Address::repeat_byte(11));
1259 let payer = WalletAddress::from(Address::repeat_byte(12));
1260 let token = WalletAddress::from(Address::repeat_byte(13));
1261 let store = MockStore {
1262 claim_status: "pending".to_string(),
1263 instrument: None,
1264 max_active_option_expiry_ms: Some(1_800_000_000_000),
1265 registered_accounts: Mutex::new(Vec::new()),
1266 claims: Mutex::new(Vec::new()),
1267 submitted: Mutex::new(Vec::new()),
1268 failed: Mutex::new(Vec::new()),
1269 };
1270 let applier = MockApplier::default();
1271
1272 handle_observed_pm_liquidity_deposit(
1273 &store,
1274 &applier,
1275 ObservedPmLiquidityDeposit {
1276 tx_hash: "0xpm".to_string(),
1277 log_index: 9,
1278 observed_block: 100,
1279 lp,
1280 from: payer,
1281 token,
1282 underlying: "BTC".to_string(),
1283 amount: U256::from(50_000_000_000_u64),
1284 chain_id: 999,
1285 exchange_address: "0x1111111111111111111111111111111111111111".to_string(),
1286 log_index_u64: 9,
1287 settlement_grace_ms: 86_400_000,
1288 },
1289 )
1290 .await
1291 .expect("PM liquidity deposit should be journaled");
1292
1293 let claims = store.claims.lock().unwrap();
1294 assert_eq!(claims.len(), 1);
1295 assert_eq!(claims[0].tx_hash, "0xpm");
1296 assert_eq!(claims[0].log_index, 9);
1297 assert_eq!(claims[0].account, lp);
1298 assert_eq!(claims[0].token, token);
1299 assert_eq!(claims[0].amount_wei, "50000000000");
1300 assert_eq!(claims[0].credit_kind, CREDIT_KIND_PM_LIQUIDITY);
1301
1302 let applied = applier.pm_applied.lock().unwrap();
1303 assert_eq!(applied.len(), 1);
1304 assert_eq!(applied[0].request_id.to_string(), claims[0].request_id);
1305 assert_eq!(applied[0].depositor, lp);
1306 assert_eq!(applied[0].underlying, "BTC");
1307 assert_eq!(applied[0].amount_usdc, dec!(50000));
1308 assert_eq!(applied[0].chain_id, 999);
1309 assert_eq!(
1310 applied[0].source_contract_address,
1311 WalletAddress::from_str("0x1111111111111111111111111111111111111111").unwrap()
1312 );
1313 assert_eq!(applied[0].tx_hash, "0xpm");
1314 assert_eq!(applied[0].log_index, 9);
1315 assert_eq!(applied[0].max_listed_expiry_ts_ms, 1_800_000_000_000);
1316 assert_eq!(applied[0].settlement_grace_ms, 86_400_000);
1317 assert!(applied[0].input_digest.starts_with("0x"));
1318
1319 assert_eq!(
1320 store.submitted.lock().unwrap().as_slice(),
1321 &[claims[0].request_id.clone()]
1322 );
1323 }
1324
1325 #[tokio::test]
1326 async fn handle_observed_pm_liquidity_deposit_quarantines_unknown_underlying() {
1327 let lp = WalletAddress::from(Address::repeat_byte(11));
1328 let payer = WalletAddress::from(Address::repeat_byte(12));
1329 let token = WalletAddress::from(Address::repeat_byte(13));
1330 let store = MockStore {
1331 claim_status: "pending".to_string(),
1332 instrument: None,
1333 max_active_option_expiry_ms: None,
1334 registered_accounts: Mutex::new(Vec::new()),
1335 claims: Mutex::new(Vec::new()),
1336 submitted: Mutex::new(Vec::new()),
1337 failed: Mutex::new(Vec::new()),
1338 };
1339 let applier = MockApplier::default();
1340
1341 handle_observed_pm_liquidity_deposit(
1342 &store,
1343 &applier,
1344 ObservedPmLiquidityDeposit {
1345 tx_hash: "0xpm".to_string(),
1346 log_index: 9,
1347 observed_block: 100,
1348 lp,
1349 from: payer,
1350 token,
1351 underlying: "NOT_LISTED".to_string(),
1352 amount: U256::from(50_000_000_000_u64),
1353 chain_id: 999,
1354 exchange_address: "0x1111111111111111111111111111111111111111".to_string(),
1355 log_index_u64: 9,
1356 settlement_grace_ms: 86_400_000,
1357 },
1358 )
1359 .await
1360 .expect("invalid PM liquidity deposit should be quarantined, not retried forever");
1361
1362 let claims = store.claims.lock().unwrap();
1363 assert_eq!(claims.len(), 1);
1364 assert!(applier.pm_applied.lock().unwrap().is_empty());
1365 assert!(store.submitted.lock().unwrap().is_empty());
1366 let failed = store.failed.lock().unwrap();
1367 assert_eq!(failed.len(), 1);
1368 assert_eq!(failed[0].0, claims[0].request_id);
1369 assert!(
1370 failed[0].1.contains("no active listed options"),
1371 "unexpected failure reason: {}",
1372 failed[0].1
1373 );
1374 }
1375
1376 #[tokio::test]
1377 async fn handle_observed_deposit_claims_then_errors_on_unsupported_token() {
1378 let account = WalletAddress::from(Address::repeat_byte(1));
1379 let manager = WalletAddress::from(Address::repeat_byte(3));
1380 let store = MockStore::new(None);
1381 let applier = MockApplier::default();
1382 let manager_resolver = MockManagerResolver { manager };
1383
1384 let result = handle_observed_deposit(
1385 &store,
1386 &applier,
1387 &manager_resolver,
1388 ObservedDeposit {
1389 tx_hash: "0xabc".to_string(),
1390 log_index: 7,
1391 observed_block: 100,
1392 account,
1393 token: WalletAddress::from(Address::repeat_byte(2)),
1394 amount: U256::from(1_000_000),
1395 },
1396 )
1397 .await;
1398
1399 let error = result.expect_err("unsupported token should fail the observer path");
1400 assert!(
1401 error.to_string().contains("no option instrument mapping"),
1402 "unexpected error: {error:#}"
1403 );
1404 assert_eq!(
1405 store.registered_accounts.lock().unwrap().as_slice(),
1406 &[manager]
1407 );
1408 let claims = store.claims.lock().unwrap();
1409 assert_eq!(claims.len(), 1);
1410 assert_eq!(claims[0].tx_hash, "0xabc");
1411 assert_eq!(claims[0].log_index, 7);
1412 assert!(applier.applied.lock().unwrap().is_empty());
1413 assert!(store.submitted.lock().unwrap().is_empty());
1414 }
1415
1416 #[tokio::test]
1417 async fn handle_observed_deposit_skips_already_submitted_claim() {
1418 let store = MockStore {
1419 claim_status: "submitted".to_string(),
1420 instrument: None,
1421 max_active_option_expiry_ms: Some(1_800_000_000_000),
1422 registered_accounts: Mutex::new(Vec::new()),
1423 claims: Mutex::new(Vec::new()),
1424 submitted: Mutex::new(Vec::new()),
1425 failed: Mutex::new(Vec::new()),
1426 };
1427 let applier = MockApplier::default();
1428 let manager_resolver = MockManagerResolver {
1429 manager: WalletAddress::from(Address::repeat_byte(3)),
1430 };
1431
1432 handle_observed_deposit(
1433 &store,
1434 &applier,
1435 &manager_resolver,
1436 ObservedDeposit {
1437 tx_hash: "0xabc".to_string(),
1438 log_index: 7,
1439 observed_block: 100,
1440 account: WalletAddress::from(Address::repeat_byte(1)),
1441 token: WalletAddress::from(Address::repeat_byte(2)),
1442 amount: U256::from(1_000_000),
1443 },
1444 )
1445 .await
1446 .expect("submitted claim should be idempotent");
1447
1448 assert!(applier.applied.lock().unwrap().is_empty());
1449 assert!(store.submitted.lock().unwrap().is_empty());
1450 }
1451}