Skip to main content

hypercall/
hypercore_cash_ledger_observer.rs

1use std::sync::Arc;
2use std::time::{Duration, Instant};
3
4use alloy::primitives::FixedBytes;
5use anyhow::{anyhow, Context, Result};
6use async_trait::async_trait;
7use futures::{SinkExt, StreamExt};
8use hypercall_types::WalletAddress;
9use hypercore_rs::cash_ledger::{HyperliquidLedgerRequest, LedgerUpdate, WsLedgerEnvelope};
10use metrics::{counter, gauge};
11use rust_decimal::prelude::ToPrimitive;
12use rust_decimal::Decimal;
13use rust_decimal_macros::dec;
14use tokio::sync::{broadcast, mpsc, oneshot};
15use tokio_tungstenite::tungstenite::Message;
16use tracing::{debug, error, info, warn};
17
18use crate::read_cache::tier::TierCache;
19use crate::rsm::unified_engine::DepositRequest;
20use crate::shared::order_types::get_timestamp_millis;
21use crate::shared::service::{Service, ServiceOwner};
22use hypercall_db::HypercoreCashLedgerApply;
23use hypercall_db_diesel::DatabaseHandler;
24
25const DEFAULT_HL_INFO_URL: &str = "https://api.hyperliquid.xyz/info";
26const REQUEST_TIMEOUT: Duration = Duration::from_secs(10);
27const STARTUP_LOOKBACK_MS: i64 = 24 * 60 * 60 * 1000;
28const REPLAY_OVERLAP_MS: i64 = 5 * 60 * 1000;
29const WS_READ_TIMEOUT: Duration = Duration::from_secs(60);
30const WS_BASE_BACKOFF: Duration = Duration::from_secs(1);
31const WS_MAX_BACKOFF: Duration = Duration::from_secs(30);
32
33#[derive(Clone, Debug)]
34pub struct HypercoreCashLedgerObserverConfig {
35    pub poll_interval: Duration,
36    pub info_url: String,
37    pub exchange_address: WalletAddress,
38    pub core_deposit_wallet_address: WalletAddress,
39    pub ws_url: Option<String>,
40}
41
42impl HypercoreCashLedgerObserverConfig {
43    pub fn from_runtime_config(
44        config: &crate::backend_config::OnchainDepositsRuntimeConfig,
45        info_url: impl Into<String>,
46        exchange_address: WalletAddress,
47        core_deposit_wallet_address: WalletAddress,
48    ) -> Self {
49        Self {
50            poll_interval: Duration::from_millis(config.poll_interval_ms),
51            info_url: info_url.into(),
52            exchange_address,
53            core_deposit_wallet_address,
54            ws_url: config.ws_url.clone(),
55        }
56    }
57}
58
59impl Default for HypercoreCashLedgerObserverConfig {
60    fn default() -> Self {
61        Self {
62            poll_interval: Duration::from_millis(
63                crate::backend_config::OnchainDepositsRuntimeConfig::default().poll_interval_ms,
64            ),
65            info_url: DEFAULT_HL_INFO_URL.to_string(),
66            exchange_address: WalletAddress::default(),
67            core_deposit_wallet_address: WalletAddress::default(),
68            ws_url: None,
69        }
70    }
71}
72
73fn record_cash_ledger_watermark_gauges(
74    now_ms: i64,
75    last_event_time_ms: Option<i64>,
76    replay_start_time_ms: i64,
77) {
78    gauge!("ht_hypercore_cash_ledger_observer_replay_start_age_seconds")
79        .set(ms_age_seconds(now_ms, replay_start_time_ms));
80    match last_event_time_ms {
81        Some(last_event_time_ms) => {
82            gauge!("ht_hypercore_cash_ledger_observer_watermark_present").set(1.0);
83            gauge!("ht_hypercore_cash_ledger_observer_watermark_age_seconds")
84                .set(ms_age_seconds(now_ms, last_event_time_ms));
85        }
86        None => {
87            gauge!("ht_hypercore_cash_ledger_observer_watermark_present").set(0.0);
88        }
89    }
90}
91
92fn ms_age_seconds(now_ms: i64, observed_ms: i64) -> f64 {
93    now_ms.saturating_sub(observed_ms) as f64 / 1_000.0
94}
95
96#[async_trait]
97pub trait HypercoreCashLedgerStore: Send + Sync + 'static {
98    fn exchange_watermark(&self) -> Result<Option<i64>>;
99    async fn pending_rsm_usdc_deposit_for_amount(
100        &self,
101        amount_wei: &str,
102    ) -> Result<Option<hypercall_db::RsmUsdcDepositMatch>>;
103    async fn pending_rsm_usdc_deposit_for_evm_tx_hash(
104        &self,
105        evm_tx_hash: &str,
106        amount_wei: &str,
107    ) -> Result<Option<hypercall_db::RsmUsdcDepositMatch>>;
108    async fn pending_rsm_usdc_deposit_for_credited_hypercore_event(
109        &self,
110        event_hash: &str,
111        amount_wei: &str,
112    ) -> Result<Option<hypercall_db::RsmUsdcDepositMatch>>;
113    async fn pm_liquidity_deposit_for_evm_tx_hash(
114        &self,
115        evm_tx_hash: &str,
116        amount_wei: &str,
117    ) -> Result<Option<hypercall_db::RsmUsdcDepositMatch>>;
118    async fn pm_liquidity_deposit_for_amount(
119        &self,
120        amount_wei: &str,
121    ) -> Result<Option<hypercall_db::RsmUsdcDepositMatch>>;
122    async fn mark_pm_liquidity_deposit_hypercore_matched(
123        &self,
124        request_id: &str,
125        event_hash: &str,
126    ) -> Result<()>;
127    async fn credited_wallet_for_hypercore_cash_event(
128        &self,
129        event_hash: &str,
130    ) -> Result<Option<WalletAddress>>;
131    async fn non_crediting_hypercore_cash_event(
132        &self,
133        event_hash: &str,
134        amount_usdc: Decimal,
135    ) -> Result<bool>;
136    async fn mark_rsm_deposit_credit_submitted(&self, request_id: &str) -> Result<()>;
137    async fn record_non_crediting_deposit(&self, input: &HypercoreCashLedgerApply) -> Result<()>;
138    async fn apply_deposit(
139        &self,
140        input: &HypercoreCashLedgerApply,
141    ) -> Result<hypercall_db::HypercoreCashLedgerApplyResult>;
142}
143
144#[async_trait]
145impl HypercoreCashLedgerStore for DatabaseHandler {
146    fn exchange_watermark(&self) -> Result<Option<i64>> {
147        self.get_exchange_cash_ledger_watermark_sync()
148    }
149
150    async fn pending_rsm_usdc_deposit_for_amount(
151        &self,
152        amount_wei: &str,
153    ) -> Result<Option<hypercall_db::RsmUsdcDepositMatch>> {
154        self.pending_rsm_usdc_deposit_for_amount(amount_wei).await
155    }
156
157    async fn pending_rsm_usdc_deposit_for_evm_tx_hash(
158        &self,
159        evm_tx_hash: &str,
160        amount_wei: &str,
161    ) -> Result<Option<hypercall_db::RsmUsdcDepositMatch>> {
162        self.pending_rsm_usdc_deposit_for_evm_tx_hash(evm_tx_hash, amount_wei)
163            .await
164    }
165
166    async fn pending_rsm_usdc_deposit_for_credited_hypercore_event(
167        &self,
168        event_hash: &str,
169        amount_wei: &str,
170    ) -> Result<Option<hypercall_db::RsmUsdcDepositMatch>> {
171        self.pending_rsm_usdc_deposit_for_credited_hypercore_event(event_hash, amount_wei)
172            .await
173    }
174
175    async fn pm_liquidity_deposit_for_evm_tx_hash(
176        &self,
177        evm_tx_hash: &str,
178        amount_wei: &str,
179    ) -> Result<Option<hypercall_db::RsmUsdcDepositMatch>> {
180        self.pm_liquidity_deposit_for_evm_tx_hash(evm_tx_hash, amount_wei)
181            .await
182    }
183
184    async fn pm_liquidity_deposit_for_amount(
185        &self,
186        amount_wei: &str,
187    ) -> Result<Option<hypercall_db::RsmUsdcDepositMatch>> {
188        self.pm_liquidity_deposit_for_amount(amount_wei).await
189    }
190
191    async fn mark_pm_liquidity_deposit_hypercore_matched(
192        &self,
193        request_id: &str,
194        event_hash: &str,
195    ) -> Result<()> {
196        self.mark_pm_liquidity_deposit_hypercore_matched(request_id, event_hash)
197            .await
198    }
199
200    async fn credited_wallet_for_hypercore_cash_event(
201        &self,
202        event_hash: &str,
203    ) -> Result<Option<WalletAddress>> {
204        self.credited_wallet_for_hypercore_cash_event(event_hash)
205            .await
206    }
207
208    async fn non_crediting_hypercore_cash_event(
209        &self,
210        event_hash: &str,
211        amount_usdc: Decimal,
212    ) -> Result<bool> {
213        self.non_crediting_hypercore_cash_event(event_hash, amount_usdc)
214            .await
215    }
216
217    async fn mark_rsm_deposit_credit_submitted(&self, request_id: &str) -> Result<()> {
218        self.mark_rsm_deposit_credit_submitted(request_id).await
219    }
220
221    async fn record_non_crediting_deposit(&self, input: &HypercoreCashLedgerApply) -> Result<()> {
222        self.record_hypercore_cash_deposit_non_crediting(input)
223            .await
224    }
225
226    async fn apply_deposit(
227        &self,
228        input: &HypercoreCashLedgerApply,
229    ) -> Result<hypercall_db::HypercoreCashLedgerApplyResult> {
230        self.apply_hypercore_cash_deposit(input).await
231    }
232}
233
234#[async_trait]
235pub trait HypercoreCashDepositApplier: Send + Sync + 'static {
236    async fn apply_deposit(&self, request: DepositRequest) -> Result<()>;
237}
238
239#[async_trait]
240impl HypercoreCashDepositApplier for mpsc::Sender<DepositRequest> {
241    async fn apply_deposit(&self, mut request: DepositRequest) -> Result<()> {
242        let (tx, rx) = oneshot::channel();
243        request.applied_tx = Some(tx);
244        self.send(request)
245            .await
246            .map_err(|_| anyhow!("cash deposit receiver closed"))?;
247        rx.await
248            .map_err(|_| anyhow!("cash deposit apply acknowledgement dropped"))?
249            .map_err(|err| anyhow!(err))
250    }
251}
252
253pub struct HypercoreCashLedgerObserver {
254    store: Arc<dyn HypercoreCashLedgerStore>,
255    applier: Arc<dyn HypercoreCashDepositApplier>,
256    client: reqwest::Client,
257    config: HypercoreCashLedgerObserverConfig,
258    tier_cache: Arc<TierCache>,
259}
260
261#[derive(Debug, Clone, Copy, PartialEq, Eq)]
262enum CashLedgerCreditDecision {
263    Apply,
264    SkipPortfolioMargin,
265}
266
267impl HypercoreCashLedgerObserver {
268    pub fn new(
269        db: Arc<DatabaseHandler>,
270        deposit_sender: mpsc::Sender<DepositRequest>,
271        config: HypercoreCashLedgerObserverConfig,
272        tier_cache: Arc<TierCache>,
273    ) -> Result<Self> {
274        let client = reqwest::Client::builder()
275            .timeout(REQUEST_TIMEOUT)
276            .build()
277            .context("failed to build HyperCore cash ledger HTTP client")?;
278        Ok(Self {
279            store: db,
280            applier: Arc::new(deposit_sender),
281            client,
282            config,
283            tier_cache,
284        })
285    }
286
287    /// Decide whether the cash ledger credit can be applied.
288    ///
289    /// PM wallets get equity from hypercore_account_equity (Hydromancer), not
290    /// from the balance ledger, so deposit credits must be skipped. Wallets
291    /// without an explicit margin-mode row use Standard margin by default.
292    async fn cash_ledger_credit_decision(
293        &self,
294        wallet: &WalletAddress,
295    ) -> Result<CashLedgerCreditDecision> {
296        match self.tier_cache.get_margin_mode(wallet).await {
297            Ok(mode) if mode.is_portfolio() => Ok(CashLedgerCreditDecision::SkipPortfolioMargin),
298            Ok(_) => Ok(CashLedgerCreditDecision::Apply),
299            Err(error) => Err(error).with_context(|| {
300                format!(
301                    "failed to determine margin mode for {}; refusing cash ledger credit",
302                    wallet
303                )
304            }),
305        }
306    }
307
308    pub async fn run_with_shutdown(&self, mut shutdown_rx: broadcast::Receiver<()>) -> Result<()> {
309        match &self.config.ws_url {
310            Some(ws_url) => {
311                // Startup catchup: poll HTTP to recover any events missed while offline.
312                // Retry up to 3 times before entering WS mode so historical gaps are not
313                // silently accepted.
314                info!("HyperCore cash ledger observer: running startup catchup via HTTP poll");
315                let mut catchup_ok = false;
316                for attempt in 1..=3u32 {
317                    let poll_result = tokio::select! {
318                        result = self.poll_once() => result,
319                        _ = shutdown_rx.recv() => {
320                            info!("HyperCore cash ledger observer: shutdown signal received during startup catchup");
321                            return Ok(());
322                        }
323                    };
324
325                    match poll_result {
326                        Ok(()) => {
327                            catchup_ok = true;
328                            break;
329                        }
330                        Err(error) => {
331                            warn!(
332                                attempt,
333                                "HyperCore cash ledger observer: startup catchup failed: {}", error
334                            );
335                            if attempt < 3 {
336                                tokio::select! {
337                                    _ = tokio::time::sleep(Duration::from_secs(u64::from(attempt) * 2)) => {}
338                                    _ = shutdown_rx.recv() => {
339                                        info!("HyperCore cash ledger observer: shutdown signal received during startup catchup backoff");
340                                        return Ok(());
341                                    }
342                                }
343                            }
344                        }
345                    }
346                }
347                if !catchup_ok {
348                    return Err(anyhow!(
349                        "HyperCore cash ledger observer startup catchup exhausted retries; refusing to switch to WebSocket mode with a known historical gap"
350                    ));
351                }
352
353                // Switch to WebSocket live mode.
354                info!(
355                    "HyperCore cash ledger observer: switching to WebSocket mode at {}",
356                    ws_url
357                );
358                self.run_websocket(ws_url.clone(), &mut shutdown_rx).await
359            }
360            None => {
361                // Fallback: HTTP polling mode
362                info!("HyperCore cash ledger observer: running in HTTP polling mode (no ws_url configured)");
363                self.run_polling(shutdown_rx).await
364            }
365        }
366    }
367
368    /// HTTP polling loop (fallback when no ws_url is configured).
369    async fn run_polling(&self, mut shutdown_rx: broadcast::Receiver<()>) -> Result<()> {
370        let mut interval = tokio::time::interval(self.config.poll_interval);
371
372        loop {
373            tokio::select! {
374                _ = shutdown_rx.recv() => {
375                    info!("HyperCore cash ledger observer received shutdown signal");
376                    break;
377                }
378                _ = interval.tick() => {
379                    if let Err(error) = self.poll_once().await {
380                        counter!("ht_hypercore_cash_ledger_errors_total", "stage" => "poll").increment(1);
381                        warn!("HyperCore cash ledger observer poll failed: {}", error);
382                    }
383                }
384            }
385        }
386
387        Ok(())
388    }
389
390    /// WebSocket live mode with auto-reconnect and exponential backoff.
391    async fn run_websocket(
392        &self,
393        ws_url: String,
394        shutdown_rx: &mut broadcast::Receiver<()>,
395    ) -> Result<()> {
396        let mut consecutive_failures: u32 = 0;
397
398        loop {
399            let before = Instant::now();
400            tokio::select! {
401                _ = shutdown_rx.recv() => {
402                    info!("HyperCore cash ledger observer: shutdown signal received (WS mode)");
403                    return Ok(());
404                }
405                result = self.run_ws_connection(&ws_url) => {
406                    if let Err(ref err) = result {
407                        counter!("ht_hypercore_cash_ledger_errors_total", "stage" => "ws_connection").increment(1);
408                        warn!("HyperCore cash ledger observer WS connection error: {}", err);
409                    }
410
411                    // If the connection stayed up for >60s it was healthy; reset backoff
412                    if before.elapsed() > Duration::from_secs(60) {
413                        consecutive_failures = 0;
414                    }
415                    consecutive_failures += 1;
416                    let backoff = std::cmp::min(
417                        WS_BASE_BACKOFF * 2u32.saturating_pow(consecutive_failures - 1),
418                        WS_MAX_BACKOFF,
419                    );
420                    warn!(
421                        "HyperCore cash ledger observer: WS disconnected (failures: {}), reconnecting in {:?}",
422                        consecutive_failures, backoff
423                    );
424
425                    tokio::select! {
426                        _ = shutdown_rx.recv() => {
427                            info!("HyperCore cash ledger observer: shutdown during reconnect backoff");
428                            return Ok(());
429                        }
430                        _ = tokio::time::sleep(backoff) => {}
431                    }
432
433                    // Catch-up via HTTP poll before reconnecting WS to recover
434                    // any deposits missed during the connection gap.
435                    if let Err(error) = self.poll_once().await {
436                        warn!(
437                            "HyperCore cash ledger observer: reconnect catch-up poll failed: {}",
438                            error
439                        );
440                    }
441                }
442            }
443        }
444    }
445
446    /// Single WebSocket connection lifecycle: connect, subscribe, process events.
447    async fn run_ws_connection(&self, ws_url: &str) -> Result<()> {
448        info!(
449            "HyperCore cash ledger observer: connecting to WS at {}",
450            ws_url
451        );
452
453        let (ws_stream, _) = tokio_tungstenite::connect_async(ws_url)
454            .await
455            .context("failed to connect to Hydromancer WebSocket")?;
456
457        info!("HyperCore cash ledger observer: WS connected");
458        gauge!("ht_hypercore_cash_ledger_ws_connected").set(1.0);
459
460        let (mut write, mut read) = ws_stream.split();
461
462        // Subscribe to allUserNonFundingLedgerEvents
463        let sub_msg = serde_json::json!({
464            "method": "subscribe",
465            "subscription": {
466                "type": "allUserNonFundingLedgerEvents"
467            }
468        });
469        let sub_text = serde_json::to_string(&sub_msg)
470            .context("failed to serialize WS subscription message")?;
471        write
472            .send(Message::Text(sub_text))
473            .await
474            .context("failed to send WS subscription message")?;
475        info!("HyperCore cash ledger observer: subscribed to allUserNonFundingLedgerEvents");
476
477        let mut catchup_interval = tokio::time::interval(self.config.poll_interval);
478        catchup_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
479        catchup_interval.tick().await;
480        let read_timeout = tokio::time::sleep(WS_READ_TIMEOUT);
481        tokio::pin!(read_timeout);
482
483        // Read and process messages with timeout to detect hung connections
484        loop {
485            tokio::select! {
486                _ = &mut read_timeout => {
487                    warn!(
488                        "HyperCore cash ledger observer: WS read timeout ({}s), reconnecting",
489                        WS_READ_TIMEOUT.as_secs()
490                    );
491                    break;
492                },
493                maybe_msg = read.next() => {
494                    read_timeout.as_mut().reset(tokio::time::Instant::now() + WS_READ_TIMEOUT);
495                    match maybe_msg {
496                        Some(Ok(Message::Text(text))) => {
497                            if let Err(error) = self.handle_ws_message(&text).await {
498                                counter!("ht_hypercore_cash_ledger_errors_total", "stage" => "ws_message").increment(1);
499                                warn!(
500                                    "HyperCore cash ledger observer: error handling WS message: {}",
501                                    error
502                                );
503                            }
504                        }
505                        Some(Ok(Message::Ping(data))) => {
506                            if let Err(e) = write.send(Message::Pong(data)).await {
507                                error!("HyperCore cash ledger observer: failed to send pong: {}", e);
508                                break;
509                            }
510                        }
511                        Some(Ok(Message::Close(_))) => {
512                            info!("HyperCore cash ledger observer: server sent close frame");
513                            break;
514                        }
515                        Some(Err(e)) => {
516                            error!("HyperCore cash ledger observer: WS read error: {}", e);
517                            break;
518                        }
519                        None => {
520                            info!("HyperCore cash ledger observer: WS stream ended");
521                            break;
522                        }
523                        _ => {}
524                    }
525                },
526                _ = catchup_interval.tick() => {
527                    if let Err(error) = self.poll_once().await {
528                        counter!("ht_hypercore_cash_ledger_errors_total", "stage" => "ws_catchup_poll").increment(1);
529                        warn!(
530                            "HyperCore cash ledger observer WS catch-up poll failed: {}",
531                            error
532                        );
533                    }
534                }
535            }
536        }
537
538        gauge!("ht_hypercore_cash_ledger_ws_connected").set(0.0);
539        Ok(())
540    }
541
542    /// Process a single WebSocket message from the allUserNonFundingLedgerEvents stream.
543    async fn handle_ws_message(&self, text: &str) -> Result<()> {
544        // Try to parse as a ledger event envelope
545        let envelope: WsLedgerEnvelope = match serde_json::from_str(text) {
546            Ok(e) => e,
547            Err(_) => {
548                // Subscription confirmations and other non-data messages
549                debug!(
550                    "HyperCore cash ledger observer: non-data WS message: {}",
551                    &text[..text.len().min(200)]
552                );
553                return Ok(());
554            }
555        };
556
557        // Only process allUserNonFundingLedgerEvents channel messages
558        if !matches!(
559            envelope.channel.as_deref(),
560            Some(c) if c.contains("NonFundingLedger")
561        ) {
562            return Ok(());
563        }
564
565        let events = match envelope.data {
566            Some(events) => events,
567            None => return Ok(()),
568        };
569
570        for event in events {
571            self.handle_ledger_update(event, "websocket").await?;
572        }
573
574        Ok(())
575    }
576
577    /// Process a normalized HyperCore cash ledger update from websocket or HTTP polling.
578    async fn handle_ledger_update(&self, update: LedgerUpdate, source: &'static str) -> Result<()> {
579        let Some(transfer) = update.transfer()? else {
580            return Ok(());
581        };
582        // hypercore-rs exposes generic transfer facts. Hypercall-specific
583        // deposit semantics start here: only USDC transfers into the configured
584        // exchange account can become engine cash deposits.
585        if transfer.destination != self.config.exchange_address.0 {
586            return Ok(());
587        }
588        if transfer.token != "USDC" {
589            return Ok(());
590        };
591        let sender = WalletAddress::from(transfer.sender);
592        let amount_usdc = transfer.amount;
593        if amount_usdc <= Decimal::ZERO {
594            anyhow::bail!(
595                "pool transfer {} has non-positive amount {}",
596                transfer.event_hash,
597                amount_usdc
598            );
599        }
600
601        let credit_resolution = self
602            .resolve_usdc_deposit_credit(
603                sender,
604                amount_usdc,
605                &transfer.event_hash.to_string(),
606                transfer.writer_evm_tx_hash.as_deref(),
607            )
608            .await?;
609        let (credit_wallet, request_id, non_crediting_reason) = match credit_resolution {
610            CashCreditResolution::DirectSender => (sender, None, None),
611            CashCreditResolution::CreditAccount { wallet, request_id } => {
612                (wallet, request_id, None)
613            }
614            CashCreditResolution::SkipPmLiquidity => (sender, None, Some("pm_liquidity")),
615        };
616        let event_time_ms = i64::try_from(transfer.time)
617            .with_context(|| format!("ledger event time {} exceeds i64 range", transfer.time))?;
618        let event_hash = transfer.event_hash.to_string();
619        let apply = HypercoreCashLedgerApply {
620            wallet: credit_wallet,
621            event_hash,
622            event_time_ms,
623            amount_usdc,
624        };
625        if let Some(reason) = non_crediting_reason {
626            self.store.record_non_crediting_deposit(&apply).await?;
627            counter!(
628                "ht_hypercore_cash_pool_transfers_skipped_total",
629                "reason" => reason
630            )
631            .increment(1);
632            return Ok(());
633        }
634
635        // Portfolio-margin wallets get cash/equity from the Hydromancer equity
636        // path. Crediting this projection as cash would double-count the same
637        // HyperCore state in the engine.
638        match self.cash_ledger_credit_decision(&credit_wallet).await? {
639            CashLedgerCreditDecision::Apply => {}
640            CashLedgerCreditDecision::SkipPortfolioMargin => {
641                counter!("ht_hypercore_cash_pool_transfers_skipped_total", "reason" => "portfolio_margin")
642                    .increment(1);
643                warn!(
644                    wallet = %credit_wallet,
645                    amount_usdc = %amount_usdc,
646                    event_hash = %apply.event_hash,
647                    source = source,
648                    "Skipping balance_ledger credit for portfolio margin wallet (equity via Hydromancer)"
649                );
650                return Ok(());
651            }
652        }
653        // The DB projection write is idempotent by HyperCore event hash and
654        // returns the durable ledger_event_id used as the engine command
655        // sequence. Engine apply must use that sequence so replay and retries
656        // cannot double-apply the same transfer.
657        let result = self.store.apply_deposit(&apply).await?;
658        self.apply_engine_deposit(
659            credit_wallet,
660            amount_usdc,
661            &result,
662            transfer.time,
663            transfer.event_hash,
664        )
665        .await?;
666        if let Some(request_id) = request_id {
667            self.store
668                .mark_rsm_deposit_credit_submitted(&request_id)
669                .await?;
670        }
671        counter!("ht_hypercore_cash_pool_transfers_applied_total").increment(1);
672        info!(
673            wallet = %credit_wallet,
674            hypercore_sender = %sender,
675            amount_usdc = %amount_usdc,
676            event_hash = %apply.event_hash,
677            balance_after = ?result.balance_after,
678            exchange_address = %self.config.exchange_address,
679            source = source,
680            "Applied Account->Exchange pool transfer to engine cash ledger"
681        );
682        Ok(())
683    }
684
685    async fn poll_once(&self) -> Result<()> {
686        let now_ms =
687            i64::try_from(get_timestamp_millis()).context("current timestamp exceeds i64 range")?;
688        let last_event_time_ms = self.store.exchange_watermark()?;
689        let start_time = last_event_time_ms
690            .map(|last| last.saturating_sub(REPLAY_OVERLAP_MS))
691            .unwrap_or_else(|| now_ms.saturating_sub(STARTUP_LOOKBACK_MS));
692        record_cash_ledger_watermark_gauges(now_ms, last_event_time_ms, start_time);
693
694        let request = HyperliquidLedgerRequest::user_non_funding_ledger_updates(
695            self.config.exchange_address.to_string(),
696            start_time,
697        );
698        let response = self
699            .client
700            .post(&self.config.info_url)
701            .json(&request)
702            .send()
703            .await
704            .with_context(|| {
705                format!(
706                    "failed to request ledger updates for exchange {}",
707                    self.config.exchange_address
708                )
709            })?;
710        let status = response.status();
711        if !status.is_success() {
712            let body = response
713                .text()
714                .await
715                .unwrap_or_else(|_| "unknown error".to_string());
716            anyhow::bail!(
717                "ledger update request for exchange {} failed: {status}: {body}",
718                self.config.exchange_address
719            );
720        }
721
722        let updates: Vec<LedgerUpdate> = response.json().await.with_context(|| {
723            format!(
724                "failed to parse ledger updates for exchange {}",
725                self.config.exchange_address
726            )
727        })?;
728        for update in updates {
729            self.handle_ledger_update(update, "http_poll").await?;
730        }
731        Ok(())
732    }
733    async fn apply_engine_deposit(
734        &self,
735        wallet: WalletAddress,
736        amount_usdc: Decimal,
737        result: &hypercall_db::HypercoreCashLedgerApplyResult,
738        timestamp_ms: u64,
739        source_event_hash: FixedBytes<32>,
740    ) -> Result<()> {
741        let Some(sequence) = result.ledger_event_id else {
742            anyhow::bail!(
743                "cash ledger deposit for {} source_event_hash={} has no ledger_event_id; refusing engine apply",
744                wallet,
745                source_event_hash
746            );
747        };
748        self.applier
749            .apply_deposit(DepositRequest {
750                wallet,
751                amount: amount_usdc,
752                timestamp_ms,
753                sequence: Some(sequence),
754                source_event_hash,
755                journal_request_id: hypercore_cash_deposit_request_id(sequence),
756                outbox_appends: Vec::new(),
757                applied_tx: None,
758            })
759            .await
760    }
761
762    async fn resolve_usdc_deposit_credit(
763        &self,
764        hypercore_sender: WalletAddress,
765        amount_usdc: Decimal,
766        event_hash: &str,
767        writer_evm_tx_hash: Option<&str>,
768    ) -> Result<CashCreditResolution> {
769        if hypercore_sender != self.config.core_deposit_wallet_address
770            && hypercore_sender != self.config.exchange_address
771        {
772            return Ok(CashCreditResolution::DirectSender);
773        }
774
775        let amount_wei = usdc_decimal_to_wei_string(amount_usdc)?;
776        if let Some(wallet) = self
777            .store
778            .credited_wallet_for_hypercore_cash_event(event_hash)
779            .await?
780        {
781            let request_id = self
782                .store
783                .pending_rsm_usdc_deposit_for_credited_hypercore_event(event_hash, &amount_wei)
784                .await?
785                .map(|matched| matched.request_id);
786            return Ok(CashCreditResolution::CreditAccount { wallet, request_id });
787        }
788        if self
789            .store
790            .non_crediting_hypercore_cash_event(event_hash, amount_usdc)
791            .await?
792        {
793            return Ok(CashCreditResolution::SkipPmLiquidity);
794        }
795
796        let matched = if let Some(writer_evm_tx_hash) = writer_evm_tx_hash.and_then(non_empty_str) {
797            match self
798                .store
799                .pending_rsm_usdc_deposit_for_evm_tx_hash(writer_evm_tx_hash, &amount_wei)
800                .await?
801            {
802                Some(matched) => matched,
803                None => {
804                    if let Some(pm_match) = self
805                        .store
806                        .pm_liquidity_deposit_for_evm_tx_hash(writer_evm_tx_hash, &amount_wei)
807                        .await?
808                    {
809                        self.store
810                            .mark_pm_liquidity_deposit_hypercore_matched(
811                                &pm_match.request_id,
812                                event_hash,
813                            )
814                            .await?;
815                        info!(
816                            lp = %pm_match.account,
817                            evm_tx_hash = %pm_match.tx_hash,
818                            evm_log_index = pm_match.log_index,
819                            hypercore_event_hash = %event_hash,
820                            amount_wei = %amount_wei,
821                            "Matched Exchange.PmLiquidityDeposit to HyperCore cash ledger deposit; skipping user cash credit"
822                        );
823                        return Ok(CashCreditResolution::SkipPmLiquidity);
824                    }
825                    anyhow::bail!(
826                        "CoreWriter evm_tx_hash {} for HyperCore USDC deposit {} amount={} has no pending Exchange.UsdcDeposit attribution",
827                        writer_evm_tx_hash,
828                        event_hash,
829                        amount_usdc
830                    );
831                }
832            }
833        } else {
834            match self
835                .store
836                .pending_rsm_usdc_deposit_for_amount(&amount_wei)
837                .await?
838            {
839                Some(matched) => matched,
840                None => {
841                    if let Some(pm_match) = self
842                        .store
843                        .pm_liquidity_deposit_for_amount(&amount_wei)
844                        .await?
845                    {
846                        self.store
847                            .mark_pm_liquidity_deposit_hypercore_matched(
848                                &pm_match.request_id,
849                                event_hash,
850                            )
851                            .await?;
852                        info!(
853                            lp = %pm_match.account,
854                            evm_tx_hash = %pm_match.tx_hash,
855                            evm_log_index = pm_match.log_index,
856                            hypercore_event_hash = %event_hash,
857                            amount_wei = %amount_wei,
858                            "Matched Exchange.PmLiquidityDeposit to HyperCore cash ledger deposit by amount; skipping user cash credit"
859                        );
860                        return Ok(CashCreditResolution::SkipPmLiquidity);
861                    }
862                    anyhow::bail!(
863                        "Exchange/CoreDepositWallet-originated HyperCore USDC deposit {} amount={} has no pending Exchange.UsdcDeposit attribution",
864                        event_hash,
865                        amount_usdc
866                    );
867                }
868            }
869        };
870
871        info!(
872            credited_account = %matched.account,
873            evm_tx_hash = %matched.tx_hash,
874            evm_log_index = matched.log_index,
875            hypercore_event_hash = %event_hash,
876            writer_evm_tx_hash = writer_evm_tx_hash.unwrap_or(""),
877            amount_wei = %amount_wei,
878            "Matched Exchange.UsdcDeposit to HyperCore cash ledger deposit"
879        );
880        Ok(CashCreditResolution::CreditAccount {
881            wallet: matched.account,
882            request_id: Some(matched.request_id),
883        })
884    }
885}
886
887#[derive(Debug, Clone, PartialEq, Eq)]
888enum CashCreditResolution {
889    DirectSender,
890    CreditAccount {
891        wallet: WalletAddress,
892        request_id: Option<String>,
893    },
894    SkipPmLiquidity,
895}
896
897fn hypercore_cash_deposit_request_id(ledger_event_id: u64) -> String {
898    const HYPERCORE_CASH_DEPOSIT_UUID_PREFIX: u128 = 0x4859_4343_4153_4800_0000_0000_0000_0000;
899    uuid::Uuid::from_u128(HYPERCORE_CASH_DEPOSIT_UUID_PREFIX | u128::from(ledger_event_id))
900        .to_string()
901}
902
903fn usdc_decimal_to_wei_string(amount_usdc: Decimal) -> Result<String> {
904    if amount_usdc <= Decimal::ZERO {
905        anyhow::bail!("USDC deposit amount must be positive, got {}", amount_usdc);
906    }
907    let scaled = amount_usdc * dec!(1000000);
908    if scaled.fract() != Decimal::ZERO {
909        anyhow::bail!(
910            "USDC deposit amount {} has more than 6 decimal places",
911            amount_usdc
912        );
913    }
914    let wei = scaled
915        .to_u128()
916        .ok_or_else(|| anyhow!("USDC deposit amount {} exceeds u128 range", amount_usdc))?;
917    Ok(wei.to_string())
918}
919
920fn non_empty_str(value: &str) -> Option<&str> {
921    let trimmed = value.trim();
922    if trimmed.is_empty() {
923        None
924    } else {
925        Some(trimmed)
926    }
927}
928
929#[async_trait]
930impl Service for HypercoreCashLedgerObserver {
931    fn name(&self) -> &'static str {
932        "HypercoreCashLedgerObserver"
933    }
934
935    fn owner(&self) -> ServiceOwner {
936        ServiceOwner::Shared
937    }
938
939    async fn run(self: Arc<Self>, shutdown: crate::shared::shutdown::ShutdownRx) -> Result<()> {
940        self.run_with_shutdown(shutdown).await
941    }
942}
943
944#[cfg(test)]
945mod tests {
946    use super::*;
947    use hypercore_rs::cash_ledger::LedgerDelta;
948    use std::collections::{HashMap, HashSet, VecDeque};
949    use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
950    use std::sync::Mutex;
951
952    // ── Config ────────────────────────────────────────────────────────
953
954    fn exchange_addr() -> WalletAddress {
955        WalletAddress::from(alloy::primitives::address!(
956            "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"
957        ))
958    }
959
960    fn core_deposit_wallet_addr() -> WalletAddress {
961        WalletAddress::from(alloy::primitives::address!(
962            "6b9e773128f453f5c2c60935ee2de2cbc5390a24"
963        ))
964    }
965
966    fn sender_addr() -> WalletAddress {
967        WalletAddress::from(alloy::primitives::address!(
968            "301cd221cf81ef94cd276042aacfbcd0e3795589"
969        ))
970    }
971
972    fn test_event_hash(byte: u8) -> FixedBytes<32> {
973        FixedBytes::from([byte; 32])
974    }
975
976    fn test_event_hash_string(byte: u8) -> String {
977        test_event_hash(byte).to_string()
978    }
979
980    struct MockTierDb {
981        mode: Option<hypercall_types::MarginMode>,
982    }
983
984    impl hypercall_db::TierReader for MockTierDb {
985        fn get_margin_mode_sync(
986            &self,
987            _wallet: &WalletAddress,
988        ) -> Result<hypercall_types::MarginMode> {
989            Ok(self.mode.unwrap_or(hypercall_types::MarginMode::Standard))
990        }
991
992        fn get_existing_margin_mode_sync(
993            &self,
994            _wallet: &WalletAddress,
995        ) -> Result<Option<hypercall_types::MarginMode>> {
996            Ok(self.mode)
997        }
998
999        fn get_tier_defaults_sync(
1000            &self,
1001            _tier_name: &str,
1002        ) -> Result<Option<hypercall_db::TierDefaultsRecord>> {
1003            Ok(None)
1004        }
1005
1006        fn get_user_tier_sync(
1007            &self,
1008            _wallet: &WalletAddress,
1009        ) -> Result<Option<hypercall_db::UserTierRecord>> {
1010            Ok(None)
1011        }
1012
1013        fn get_all_user_tiers_sync(&self) -> Result<Vec<hypercall_db::UserTierRecord>> {
1014            Ok(Vec::new())
1015        }
1016    }
1017
1018    impl hypercall_db::TierWriter for MockTierDb {
1019        fn save_user_tier_sync(&self, _update: &hypercall_db::UserTierUpdate) -> Result<()> {
1020            Ok(())
1021        }
1022
1023        fn set_margin_mode_sync(
1024            &self,
1025            _wallet: &WalletAddress,
1026            _margin_mode: hypercall_types::MarginMode,
1027        ) -> Result<i64> {
1028            Ok(1)
1029        }
1030
1031        fn insert_margin_mode_if_missing_sync(
1032            &self,
1033            _wallet: &WalletAddress,
1034            _margin_mode: hypercall_types::MarginMode,
1035        ) -> Result<Option<i64>> {
1036            Ok(Some(1))
1037        }
1038
1039        fn delete_user_tier_sync(&self, _wallet: &WalletAddress) -> Result<()> {
1040            Ok(())
1041        }
1042    }
1043
1044    struct CountingStore {
1045        apply_calls: AtomicUsize,
1046        apply_inputs: Mutex<Vec<HypercoreCashLedgerApply>>,
1047        apply_results: Mutex<VecDeque<hypercall_db::HypercoreCashLedgerApplyResult>>,
1048        pending_usdc_matches: Mutex<VecDeque<hypercall_db::RsmUsdcDepositMatch>>,
1049        pm_liquidity_matches: Mutex<VecDeque<hypercall_db::RsmUsdcDepositMatch>>,
1050        matched_pm_liquidity_requests: Mutex<Vec<(String, String)>>,
1051        credited_event_wallets: Mutex<HashMap<String, WalletAddress>>,
1052        non_crediting_events: Mutex<HashSet<(String, Decimal)>>,
1053        submitted_usdc_requests: Mutex<Vec<String>>,
1054    }
1055
1056    #[async_trait::async_trait]
1057    impl HypercoreCashLedgerStore for CountingStore {
1058        fn exchange_watermark(&self) -> Result<Option<i64>> {
1059            Ok(None)
1060        }
1061
1062        async fn pending_rsm_usdc_deposit_for_amount(
1063            &self,
1064            _amount_wei: &str,
1065        ) -> Result<Option<hypercall_db::RsmUsdcDepositMatch>> {
1066            Ok(self
1067                .pending_usdc_matches
1068                .lock()
1069                .expect("pending_usdc_matches mutex poisoned")
1070                .pop_front())
1071        }
1072
1073        async fn pending_rsm_usdc_deposit_for_evm_tx_hash(
1074            &self,
1075            evm_tx_hash: &str,
1076            amount_wei: &str,
1077        ) -> Result<Option<hypercall_db::RsmUsdcDepositMatch>> {
1078            let mut matches = self
1079                .pending_usdc_matches
1080                .lock()
1081                .expect("pending_usdc_matches mutex poisoned");
1082            let Some(index) = matches
1083                .iter()
1084                .position(|row| row.tx_hash == evm_tx_hash && row.amount_wei == amount_wei)
1085            else {
1086                return Ok(None);
1087            };
1088            Ok(matches.remove(index))
1089        }
1090
1091        async fn pending_rsm_usdc_deposit_for_credited_hypercore_event(
1092            &self,
1093            event_hash: &str,
1094            amount_wei: &str,
1095        ) -> Result<Option<hypercall_db::RsmUsdcDepositMatch>> {
1096            let Some(wallet) = self
1097                .credited_event_wallets
1098                .lock()
1099                .expect("credited_event_wallets mutex poisoned")
1100                .get(event_hash)
1101                .copied()
1102            else {
1103                return Ok(None);
1104            };
1105            let mut matches = self
1106                .pending_usdc_matches
1107                .lock()
1108                .expect("pending_usdc_matches mutex poisoned");
1109            let Some(index) = matches
1110                .iter()
1111                .position(|row| row.account == wallet && row.amount_wei == amount_wei)
1112            else {
1113                return Ok(None);
1114            };
1115            Ok(matches.remove(index))
1116        }
1117
1118        async fn pm_liquidity_deposit_for_evm_tx_hash(
1119            &self,
1120            evm_tx_hash: &str,
1121            amount_wei: &str,
1122        ) -> Result<Option<hypercall_db::RsmUsdcDepositMatch>> {
1123            Ok(self
1124                .pm_liquidity_matches
1125                .lock()
1126                .expect("pm_liquidity_matches mutex poisoned")
1127                .iter()
1128                .find(|row| row.tx_hash == evm_tx_hash && row.amount_wei == amount_wei)
1129                .cloned())
1130        }
1131
1132        async fn pm_liquidity_deposit_for_amount(
1133            &self,
1134            amount_wei: &str,
1135        ) -> Result<Option<hypercall_db::RsmUsdcDepositMatch>> {
1136            Ok(self
1137                .pm_liquidity_matches
1138                .lock()
1139                .expect("pm_liquidity_matches mutex poisoned")
1140                .iter()
1141                .find(|row| row.amount_wei == amount_wei)
1142                .cloned())
1143        }
1144
1145        async fn mark_pm_liquidity_deposit_hypercore_matched(
1146            &self,
1147            request_id: &str,
1148            event_hash: &str,
1149        ) -> Result<()> {
1150            self.matched_pm_liquidity_requests
1151                .lock()
1152                .expect("matched_pm_liquidity_requests mutex poisoned")
1153                .push((request_id.to_string(), event_hash.to_string()));
1154            let mut matches = self
1155                .pm_liquidity_matches
1156                .lock()
1157                .expect("pm_liquidity_matches mutex poisoned");
1158            if let Some(index) = matches.iter().position(|row| row.request_id == request_id) {
1159                matches.remove(index);
1160            }
1161            Ok(())
1162        }
1163
1164        async fn credited_wallet_for_hypercore_cash_event(
1165            &self,
1166            event_hash: &str,
1167        ) -> Result<Option<WalletAddress>> {
1168            Ok(self
1169                .credited_event_wallets
1170                .lock()
1171                .expect("credited_event_wallets mutex poisoned")
1172                .get(event_hash)
1173                .copied())
1174        }
1175
1176        async fn non_crediting_hypercore_cash_event(
1177            &self,
1178            event_hash: &str,
1179            amount_usdc: Decimal,
1180        ) -> Result<bool> {
1181            Ok(self
1182                .non_crediting_events
1183                .lock()
1184                .expect("non_crediting_events mutex poisoned")
1185                .contains(&(event_hash.to_string(), amount_usdc)))
1186        }
1187
1188        async fn mark_rsm_deposit_credit_submitted(&self, request_id: &str) -> Result<()> {
1189            self.submitted_usdc_requests
1190                .lock()
1191                .expect("submitted_usdc_requests mutex poisoned")
1192                .push(request_id.to_string());
1193            Ok(())
1194        }
1195
1196        async fn record_non_crediting_deposit(
1197            &self,
1198            input: &HypercoreCashLedgerApply,
1199        ) -> Result<()> {
1200            self.non_crediting_events
1201                .lock()
1202                .expect("non_crediting_events mutex poisoned")
1203                .insert((input.event_hash.clone(), input.amount_usdc));
1204            self.apply_inputs
1205                .lock()
1206                .expect("apply_inputs mutex poisoned")
1207                .push(input.clone());
1208            Ok(())
1209        }
1210
1211        async fn apply_deposit(
1212            &self,
1213            input: &HypercoreCashLedgerApply,
1214        ) -> Result<hypercall_db::HypercoreCashLedgerApplyResult> {
1215            self.apply_calls.fetch_add(1, Ordering::SeqCst);
1216            self.apply_inputs
1217                .lock()
1218                .expect("apply_inputs mutex poisoned")
1219                .push(input.clone());
1220            if let Some(result) = self
1221                .apply_results
1222                .lock()
1223                .expect("apply_results mutex poisoned")
1224                .pop_front()
1225            {
1226                return Ok(result);
1227            }
1228            Ok(hypercall_db::HypercoreCashLedgerApplyResult {
1229                applied: true,
1230                balance_after: Some(Decimal::ONE),
1231                ledger_event_id: Some(1),
1232            })
1233        }
1234    }
1235
1236    struct CountingApplier {
1237        calls: AtomicUsize,
1238        fail_next: AtomicBool,
1239        journal_request_ids: Mutex<Vec<String>>,
1240    }
1241
1242    #[async_trait::async_trait]
1243    impl HypercoreCashDepositApplier for CountingApplier {
1244        async fn apply_deposit(&self, request: DepositRequest) -> Result<()> {
1245            self.calls.fetch_add(1, Ordering::SeqCst);
1246            self.journal_request_ids
1247                .lock()
1248                .expect("journal_request_ids mutex poisoned")
1249                .push(request.journal_request_id);
1250            if self.fail_next.swap(false, Ordering::SeqCst) {
1251                anyhow::bail!("injected engine apply failure");
1252            }
1253            Ok(())
1254        }
1255    }
1256
1257    fn test_observer_with_margin_mode(
1258        mode: Option<hypercall_types::MarginMode>,
1259    ) -> (
1260        HypercoreCashLedgerObserver,
1261        Arc<CountingStore>,
1262        Arc<CountingApplier>,
1263    ) {
1264        let store = Arc::new(CountingStore {
1265            apply_calls: AtomicUsize::new(0),
1266            apply_inputs: Mutex::new(Vec::new()),
1267            apply_results: Mutex::new(VecDeque::new()),
1268            pending_usdc_matches: Mutex::new(VecDeque::new()),
1269            pm_liquidity_matches: Mutex::new(VecDeque::new()),
1270            matched_pm_liquidity_requests: Mutex::new(Vec::new()),
1271            credited_event_wallets: Mutex::new(HashMap::new()),
1272            non_crediting_events: Mutex::new(HashSet::new()),
1273            submitted_usdc_requests: Mutex::new(Vec::new()),
1274        });
1275        let applier = Arc::new(CountingApplier {
1276            calls: AtomicUsize::new(0),
1277            fail_next: AtomicBool::new(false),
1278            journal_request_ids: Mutex::new(Vec::new()),
1279        });
1280        let tier_cache = Arc::new(
1281            TierCache::new(Arc::new(MockTierDb { mode })).expect("tier cache should initialize"),
1282        );
1283        let observer = HypercoreCashLedgerObserver {
1284            store: store.clone(),
1285            applier: applier.clone(),
1286            client: reqwest::Client::new(),
1287            config: HypercoreCashLedgerObserverConfig {
1288                exchange_address: exchange_addr(),
1289                core_deposit_wallet_address: core_deposit_wallet_addr(),
1290                ..Default::default()
1291            },
1292            tier_cache,
1293        };
1294        (observer, store, applier)
1295    }
1296
1297    #[tokio::test]
1298    async fn ws_missing_margin_mode_defaults_to_standard_and_credits() {
1299        let (observer, store, applier) = test_observer_with_margin_mode(None);
1300        let event = LedgerUpdate {
1301            time: 1778789705303_u64,
1302            hash: test_event_hash(1),
1303            evm_tx_hash: None,
1304            delta: LedgerDelta {
1305                delta_type: "send".to_string(),
1306                usdc: None,
1307                amount: Some("1.0".to_string()),
1308                token: Some("USDC".to_string()),
1309                user: Some(sender_addr().to_string()),
1310                destination: Some(exchange_addr().to_string()),
1311                evm_tx_hash: None,
1312            },
1313        };
1314
1315        observer
1316            .handle_ledger_update(event, "websocket")
1317            .await
1318            .expect("missing margin mode should default to Standard");
1319
1320        assert_eq!(store.apply_calls.load(Ordering::SeqCst), 1);
1321        assert_eq!(applier.calls.load(Ordering::SeqCst), 1);
1322        let apply_inputs = store
1323            .apply_inputs
1324            .lock()
1325            .expect("apply_inputs mutex poisoned");
1326        assert_eq!(apply_inputs.len(), 1);
1327        assert_eq!(
1328            apply_inputs[0].wallet,
1329            sender_addr(),
1330            "direct user HyperCore deposits must credit the sender"
1331        );
1332        assert!(
1333            applier
1334                .journal_request_ids
1335                .lock()
1336                .expect("journal_request_ids mutex poisoned")[0]
1337                .parse::<uuid::Uuid>()
1338                .ok()
1339                .is_some(),
1340            "standard-margin HyperCore cash deposits must carry a replayable engine journal request id"
1341        );
1342    }
1343
1344    #[tokio::test]
1345    async fn ws_deposit_replays_engine_apply_after_db_applied_retry() {
1346        let (observer, store, applier) =
1347            test_observer_with_margin_mode(Some(hypercall_types::MarginMode::Standard));
1348        store
1349            .apply_results
1350            .lock()
1351            .expect("apply_results mutex poisoned")
1352            .extend([
1353                hypercall_db::HypercoreCashLedgerApplyResult {
1354                    applied: true,
1355                    balance_after: Some(Decimal::ONE),
1356                    ledger_event_id: Some(41),
1357                },
1358                hypercall_db::HypercoreCashLedgerApplyResult {
1359                    applied: false,
1360                    balance_after: Some(Decimal::ONE),
1361                    ledger_event_id: Some(41),
1362                },
1363            ]);
1364        applier.fail_next.store(true, Ordering::SeqCst);
1365
1366        let event = LedgerUpdate {
1367            time: 1778789705303_u64,
1368            hash: test_event_hash(2),
1369            evm_tx_hash: None,
1370            delta: LedgerDelta {
1371                delta_type: "send".to_string(),
1372                usdc: None,
1373                amount: Some("1.0".to_string()),
1374                token: Some("USDC".to_string()),
1375                user: Some(sender_addr().to_string()),
1376                destination: Some(exchange_addr().to_string()),
1377                evm_tx_hash: None,
1378            },
1379        };
1380
1381        let first = observer
1382            .handle_ledger_update(event.clone(), "websocket")
1383            .await;
1384        assert!(first
1385            .expect_err("first apply should surface injected engine failure")
1386            .to_string()
1387            .contains("injected engine apply failure"));
1388
1389        observer
1390            .handle_ledger_update(event, "websocket")
1391            .await
1392            .expect("retry should replay engine apply using durable ledger_event_id");
1393
1394        assert_eq!(store.apply_calls.load(Ordering::SeqCst), 2);
1395        assert_eq!(applier.calls.load(Ordering::SeqCst), 2);
1396        let journal_request_ids = applier
1397            .journal_request_ids
1398            .lock()
1399            .expect("journal_request_ids mutex poisoned");
1400        assert_eq!(
1401            journal_request_ids.len(),
1402            2,
1403            "both first apply and retry should hand the engine a restart-source request id"
1404        );
1405        assert_eq!(
1406            journal_request_ids[0], journal_request_ids[1],
1407            "retries for the same durable ledger event must reuse the same engine journal request id"
1408        );
1409        assert!(
1410            journal_request_ids[0]
1411                .parse::<uuid::Uuid>()
1412                .ok()
1413                .is_some(),
1414            "standard-margin HyperCore cash deposits must carry a replayable engine journal request id"
1415        );
1416    }
1417
1418    #[tokio::test]
1419    async fn core_deposit_wallet_originated_deposit_credits_matched_usdc_account_not_exchange() {
1420        let (observer, store, applier) =
1421            test_observer_with_margin_mode(Some(hypercall_types::MarginMode::Standard));
1422        let credited_account = WalletAddress::from(alloy::primitives::address!(
1423            "bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb"
1424        ));
1425        store
1426            .pending_usdc_matches
1427            .lock()
1428            .expect("pending_usdc_matches mutex poisoned")
1429            .push_back(hypercall_db::RsmUsdcDepositMatch {
1430                request_id: "usdc-request-1".to_string(),
1431                account: credited_account,
1432                tx_hash: "0xevm".to_string(),
1433                log_index: 3,
1434                amount_wei: "1000000".to_string(),
1435                token: WalletAddress::from(alloy::primitives::address!(
1436                    "cccccccccccccccccccccccccccccccccccccccc"
1437                )),
1438            });
1439
1440        let event = LedgerUpdate {
1441            time: 1778789705303_u64,
1442            hash: test_event_hash(3),
1443            evm_tx_hash: Some("0xevm".to_string()),
1444            delta: LedgerDelta {
1445                delta_type: "send".to_string(),
1446                usdc: None,
1447                amount: Some("1.0".to_string()),
1448                token: Some("USDC".to_string()),
1449                user: Some(core_deposit_wallet_addr().to_string()),
1450                destination: Some(exchange_addr().to_string()),
1451                evm_tx_hash: None,
1452            },
1453        };
1454
1455        observer
1456            .handle_ledger_update(event, "websocket")
1457            .await
1458            .expect("matched Exchange-originated deposit should credit attributed account");
1459
1460        let apply_inputs = store
1461            .apply_inputs
1462            .lock()
1463            .expect("apply_inputs mutex poisoned");
1464        assert_eq!(apply_inputs.len(), 1);
1465        assert_eq!(apply_inputs[0].wallet, credited_account);
1466        assert_ne!(
1467            apply_inputs[0].wallet,
1468            exchange_addr(),
1469            "Exchange-originated deposits must not credit Exchange itself"
1470        );
1471        assert_ne!(
1472            apply_inputs[0].wallet,
1473            core_deposit_wallet_addr(),
1474            "CoreDepositWallet-originated deposits must not credit CoreDepositWallet itself"
1475        );
1476        assert_eq!(apply_inputs[0].event_hash, test_event_hash_string(3));
1477        assert_eq!(applier.calls.load(Ordering::SeqCst), 1);
1478        assert_eq!(
1479            store
1480                .submitted_usdc_requests
1481                .lock()
1482                .expect("submitted_usdc_requests mutex poisoned")
1483                .as_slice(),
1484            &["usdc-request-1".to_string()]
1485        );
1486    }
1487
1488    #[tokio::test]
1489    async fn core_deposit_wallet_originated_pm_liquidity_with_evm_tx_is_consumed_and_not_credited()
1490    {
1491        let (observer, store, applier) =
1492            test_observer_with_margin_mode(Some(hypercall_types::MarginMode::Standard));
1493        store
1494            .pm_liquidity_matches
1495            .lock()
1496            .expect("pm_liquidity_matches mutex poisoned")
1497            .push_back(hypercall_db::RsmUsdcDepositMatch {
1498                request_id: "pm-liquidity-request-1".to_string(),
1499                account: sender_addr(),
1500                tx_hash: "0xpm-liquidity".to_string(),
1501                log_index: 9,
1502                amount_wei: "1000000".to_string(),
1503                token: WalletAddress::from(alloy::primitives::address!(
1504                    "cccccccccccccccccccccccccccccccccccccccc"
1505                )),
1506            });
1507
1508        let event = LedgerUpdate {
1509            time: 1778789705303_u64,
1510            hash: test_event_hash(7),
1511            evm_tx_hash: Some("0xpm-liquidity".to_string()),
1512            delta: LedgerDelta {
1513                delta_type: "send".to_string(),
1514                usdc: None,
1515                amount: Some("1.0".to_string()),
1516                token: Some("USDC".to_string()),
1517                user: Some(core_deposit_wallet_addr().to_string()),
1518                destination: Some(exchange_addr().to_string()),
1519                evm_tx_hash: None,
1520            },
1521        };
1522
1523        observer
1524            .handle_ledger_update(event, "websocket")
1525            .await
1526            .expect("PM liquidity deposit should be recorded as non-crediting");
1527
1528        assert_eq!(store.apply_calls.load(Ordering::SeqCst), 0);
1529        assert_eq!(applier.calls.load(Ordering::SeqCst), 0);
1530        assert_eq!(
1531            store
1532                .matched_pm_liquidity_requests
1533                .lock()
1534                .expect("matched_pm_liquidity_requests mutex poisoned")
1535                .as_slice(),
1536            &[(
1537                "pm-liquidity-request-1".to_string(),
1538                test_event_hash_string(7)
1539            )]
1540        );
1541        assert!(store
1542            .pm_liquidity_matches
1543            .lock()
1544            .expect("pm_liquidity_matches mutex poisoned")
1545            .is_empty());
1546        let apply_inputs = store
1547            .apply_inputs
1548            .lock()
1549            .expect("apply_inputs mutex poisoned");
1550        assert_eq!(apply_inputs.len(), 1);
1551        assert_eq!(apply_inputs[0].event_hash, test_event_hash_string(7));
1552    }
1553
1554    #[tokio::test]
1555    async fn core_deposit_wallet_originated_pm_liquidity_amount_match_is_consumed_and_not_credited()
1556    {
1557        let (observer, store, applier) =
1558            test_observer_with_margin_mode(Some(hypercall_types::MarginMode::Standard));
1559        store
1560            .pm_liquidity_matches
1561            .lock()
1562            .expect("pm_liquidity_matches mutex poisoned")
1563            .push_back(hypercall_db::RsmUsdcDepositMatch {
1564                request_id: "pm-liquidity-request-2".to_string(),
1565                account: sender_addr(),
1566                tx_hash: "0xpm-liquidity-amount".to_string(),
1567                log_index: 10,
1568                amount_wei: "1000000".to_string(),
1569                token: WalletAddress::from(alloy::primitives::address!(
1570                    "cccccccccccccccccccccccccccccccccccccccc"
1571                )),
1572            });
1573
1574        let event = LedgerUpdate {
1575            time: 1778789705303_u64,
1576            hash: test_event_hash(8),
1577            evm_tx_hash: None,
1578            delta: LedgerDelta {
1579                delta_type: "send".to_string(),
1580                usdc: None,
1581                amount: Some("1.0".to_string()),
1582                token: Some("USDC".to_string()),
1583                user: Some(core_deposit_wallet_addr().to_string()),
1584                destination: Some(exchange_addr().to_string()),
1585                evm_tx_hash: None,
1586            },
1587        };
1588
1589        observer
1590            .handle_ledger_update(event, "websocket")
1591            .await
1592            .expect("amount-matched PM liquidity deposit should be recorded as non-crediting");
1593
1594        assert_eq!(store.apply_calls.load(Ordering::SeqCst), 0);
1595        assert_eq!(applier.calls.load(Ordering::SeqCst), 0);
1596        assert_eq!(
1597            store
1598                .matched_pm_liquidity_requests
1599                .lock()
1600                .expect("matched_pm_liquidity_requests mutex poisoned")
1601                .as_slice(),
1602            &[(
1603                "pm-liquidity-request-2".to_string(),
1604                test_event_hash_string(8)
1605            )]
1606        );
1607        assert!(store
1608            .pm_liquidity_matches
1609            .lock()
1610            .expect("pm_liquidity_matches mutex poisoned")
1611            .is_empty());
1612        let apply_inputs = store
1613            .apply_inputs
1614            .lock()
1615            .expect("apply_inputs mutex poisoned");
1616        assert_eq!(apply_inputs.len(), 1);
1617        assert_eq!(apply_inputs[0].event_hash, test_event_hash_string(8));
1618    }
1619
1620    #[tokio::test]
1621    async fn replayed_pm_liquidity_non_crediting_event_does_not_rematch_or_credit() {
1622        let (observer, store, applier) =
1623            test_observer_with_margin_mode(Some(hypercall_types::MarginMode::Standard));
1624        store
1625            .pm_liquidity_matches
1626            .lock()
1627            .expect("pm_liquidity_matches mutex poisoned")
1628            .push_back(hypercall_db::RsmUsdcDepositMatch {
1629                request_id: "pm-liquidity-request-replay".to_string(),
1630                account: sender_addr(),
1631                tx_hash: "0xpm-liquidity-replay".to_string(),
1632                log_index: 11,
1633                amount_wei: "1000000".to_string(),
1634                token: WalletAddress::from(alloy::primitives::address!(
1635                    "cccccccccccccccccccccccccccccccccccccccc"
1636                )),
1637            });
1638
1639        let event = || LedgerUpdate {
1640            time: 1778789705303_u64,
1641            hash: test_event_hash(9),
1642            evm_tx_hash: Some("0xpm-liquidity-replay".to_string()),
1643            delta: LedgerDelta {
1644                delta_type: "send".to_string(),
1645                usdc: None,
1646                amount: Some("1.0".to_string()),
1647                token: Some("USDC".to_string()),
1648                user: Some(core_deposit_wallet_addr().to_string()),
1649                destination: Some(exchange_addr().to_string()),
1650                evm_tx_hash: None,
1651            },
1652        };
1653
1654        observer
1655            .handle_ledger_update(event(), "websocket")
1656            .await
1657            .expect("first PM liquidity deposit should be recorded as non-crediting");
1658        observer
1659            .handle_ledger_update(event(), "replay")
1660            .await
1661            .expect("replayed PM liquidity deposit should stay non-crediting");
1662
1663        assert_eq!(store.apply_calls.load(Ordering::SeqCst), 0);
1664        assert_eq!(applier.calls.load(Ordering::SeqCst), 0);
1665        assert_eq!(
1666            store
1667                .matched_pm_liquidity_requests
1668                .lock()
1669                .expect("matched_pm_liquidity_requests mutex poisoned")
1670                .as_slice(),
1671            &[(
1672                "pm-liquidity-request-replay".to_string(),
1673                test_event_hash_string(9)
1674            )],
1675            "replay must not try to consume the PM liquidity request twice"
1676        );
1677        let apply_inputs = store
1678            .apply_inputs
1679            .lock()
1680            .expect("apply_inputs mutex poisoned");
1681        assert_eq!(apply_inputs.len(), 2);
1682        assert!(apply_inputs
1683            .iter()
1684            .all(|input| input.event_hash == test_event_hash_string(9)));
1685    }
1686
1687    #[tokio::test]
1688    async fn core_deposit_wallet_originated_deposit_without_evm_event_fails_before_credit() {
1689        let (observer, store, applier) =
1690            test_observer_with_margin_mode(Some(hypercall_types::MarginMode::Standard));
1691        let event = LedgerUpdate {
1692            time: 1778789705303_u64,
1693            hash: test_event_hash(4),
1694            evm_tx_hash: None,
1695            delta: LedgerDelta {
1696                delta_type: "send".to_string(),
1697                usdc: None,
1698                amount: Some("1.0".to_string()),
1699                token: Some("USDC".to_string()),
1700                user: Some(core_deposit_wallet_addr().to_string()),
1701                destination: Some(exchange_addr().to_string()),
1702                evm_tx_hash: None,
1703            },
1704        };
1705
1706        let error = observer
1707            .handle_ledger_update(event, "websocket")
1708            .await
1709            .expect_err("unmatched CoreDepositWallet-originated deposit must not be credited");
1710        assert!(
1711            error
1712                .to_string()
1713                .contains("no pending Exchange.UsdcDeposit"),
1714            "unexpected error: {error:#}"
1715        );
1716        assert_eq!(store.apply_calls.load(Ordering::SeqCst), 0);
1717        assert!(
1718            store
1719                .apply_inputs
1720                .lock()
1721                .expect("apply_inputs mutex poisoned")
1722                .is_empty(),
1723            "unattributed CoreDepositWallet-originated deposits must not write a cash ledger row"
1724        );
1725        assert_eq!(applier.calls.load(Ordering::SeqCst), 0);
1726    }
1727
1728    #[tokio::test]
1729    async fn exchange_originated_deposit_replay_uses_existing_credited_wallet() {
1730        let (observer, store, applier) =
1731            test_observer_with_margin_mode(Some(hypercall_types::MarginMode::Standard));
1732        let credited_account = WalletAddress::from(alloy::primitives::address!(
1733            "bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb"
1734        ));
1735        store
1736            .credited_event_wallets
1737            .lock()
1738            .expect("credited_event_wallets mutex poisoned")
1739            .insert(test_event_hash_string(5), credited_account);
1740
1741        let event = LedgerUpdate {
1742            time: 1778789705303_u64,
1743            hash: test_event_hash(5),
1744            evm_tx_hash: None,
1745            delta: LedgerDelta {
1746                delta_type: "send".to_string(),
1747                usdc: None,
1748                amount: Some("1.0".to_string()),
1749                token: Some("USDC".to_string()),
1750                user: Some(exchange_addr().to_string()),
1751                destination: Some(exchange_addr().to_string()),
1752                evm_tx_hash: None,
1753            },
1754        };
1755
1756        observer
1757            .handle_ledger_update(event, "websocket")
1758            .await
1759            .expect("replayed matched event should reuse credited wallet");
1760
1761        let apply_inputs = store
1762            .apply_inputs
1763            .lock()
1764            .expect("apply_inputs mutex poisoned");
1765        assert_eq!(apply_inputs.len(), 1);
1766        assert_eq!(apply_inputs[0].wallet, credited_account);
1767        assert!(
1768            store
1769                .submitted_usdc_requests
1770                .lock()
1771                .expect("submitted_usdc_requests mutex poisoned")
1772                .is_empty(),
1773            "replay path must not require a pending EVM request"
1774        );
1775        assert_eq!(applier.calls.load(Ordering::SeqCst), 1);
1776    }
1777
1778    #[tokio::test]
1779    async fn exchange_originated_deposit_replay_marks_recovered_pending_request_submitted() {
1780        let (observer, store, _applier) =
1781            test_observer_with_margin_mode(Some(hypercall_types::MarginMode::Standard));
1782        let credited_account = WalletAddress::from(alloy::primitives::address!(
1783            "bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb"
1784        ));
1785        store
1786            .credited_event_wallets
1787            .lock()
1788            .expect("credited_event_wallets mutex poisoned")
1789            .insert(test_event_hash_string(6), credited_account);
1790        store
1791            .pending_usdc_matches
1792            .lock()
1793            .expect("pending_usdc_matches mutex poisoned")
1794            .push_back(hypercall_db::RsmUsdcDepositMatch {
1795                request_id: "usdc-request-recovered".to_string(),
1796                account: credited_account,
1797                tx_hash: "0xevm".to_string(),
1798                log_index: 7,
1799                amount_wei: "1000000".to_string(),
1800                token: WalletAddress::from(alloy::primitives::address!(
1801                    "cccccccccccccccccccccccccccccccccccccccc"
1802                )),
1803            });
1804
1805        let event = LedgerUpdate {
1806            time: 1778789705303_u64,
1807            hash: test_event_hash(6),
1808            evm_tx_hash: None,
1809            delta: LedgerDelta {
1810                delta_type: "send".to_string(),
1811                usdc: None,
1812                amount: Some("1.0".to_string()),
1813                token: Some("USDC".to_string()),
1814                user: Some(exchange_addr().to_string()),
1815                destination: Some(exchange_addr().to_string()),
1816                evm_tx_hash: None,
1817            },
1818        };
1819
1820        observer
1821            .handle_ledger_update(event, "websocket")
1822            .await
1823            .expect("replayed credited event should recover and submit the pending request id");
1824
1825        assert_eq!(
1826            store
1827                .submitted_usdc_requests
1828                .lock()
1829                .expect("submitted_usdc_requests mutex poisoned")
1830                .as_slice(),
1831            &["usdc-request-recovered".to_string()]
1832        );
1833    }
1834
1835    #[test]
1836    fn usdc_decimal_to_wei_string_requires_exact_six_decimals() {
1837        assert_eq!(
1838            usdc_decimal_to_wei_string(dec!(1.234567)).unwrap(),
1839            "1234567"
1840        );
1841        assert!(usdc_decimal_to_wei_string(dec!(1.0000001)).is_err());
1842    }
1843
1844    #[test]
1845    fn config_from_runtime_includes_ws_url() {
1846        let runtime = crate::backend_config::OnchainDepositsRuntimeConfig {
1847            ws_url: Some("wss://hydromancer.example.com/ws".to_string()),
1848            ..Default::default()
1849        };
1850        let config = HypercoreCashLedgerObserverConfig::from_runtime_config(
1851            &runtime,
1852            "https://api.example.com/info",
1853            exchange_addr(),
1854            core_deposit_wallet_addr(),
1855        );
1856        assert_eq!(
1857            config.ws_url.as_deref(),
1858            Some("wss://hydromancer.example.com/ws")
1859        );
1860    }
1861
1862    #[test]
1863    fn config_from_runtime_without_ws_url() {
1864        let runtime = crate::backend_config::OnchainDepositsRuntimeConfig::default();
1865        let config = HypercoreCashLedgerObserverConfig::from_runtime_config(
1866            &runtime,
1867            "https://api.example.com/info",
1868            exchange_addr(),
1869            core_deposit_wallet_addr(),
1870        );
1871        assert!(config.ws_url.is_none());
1872    }
1873}