1use std::sync::Arc;
2use std::time::{Duration, Instant};
3
4use alloy::primitives::FixedBytes;
5use anyhow::{anyhow, Context, Result};
6use async_trait::async_trait;
7use futures::{SinkExt, StreamExt};
8use hypercall_types::WalletAddress;
9use hypercore_rs::cash_ledger::{HyperliquidLedgerRequest, LedgerUpdate, WsLedgerEnvelope};
10use metrics::{counter, gauge};
11use rust_decimal::prelude::ToPrimitive;
12use rust_decimal::Decimal;
13use rust_decimal_macros::dec;
14use tokio::sync::{broadcast, mpsc, oneshot};
15use tokio_tungstenite::tungstenite::Message;
16use tracing::{debug, error, info, warn};
17
18use crate::read_cache::tier::TierCache;
19use crate::rsm::unified_engine::DepositRequest;
20use crate::shared::order_types::get_timestamp_millis;
21use crate::shared::service::{Service, ServiceOwner};
22use hypercall_db::HypercoreCashLedgerApply;
23use hypercall_db_diesel::DatabaseHandler;
24
25const DEFAULT_HL_INFO_URL: &str = "https://api.hyperliquid.xyz/info";
26const REQUEST_TIMEOUT: Duration = Duration::from_secs(10);
27const STARTUP_LOOKBACK_MS: i64 = 24 * 60 * 60 * 1000;
28const REPLAY_OVERLAP_MS: i64 = 5 * 60 * 1000;
29const WS_READ_TIMEOUT: Duration = Duration::from_secs(60);
30const WS_BASE_BACKOFF: Duration = Duration::from_secs(1);
31const WS_MAX_BACKOFF: Duration = Duration::from_secs(30);
32
33#[derive(Clone, Debug)]
34pub struct HypercoreCashLedgerObserverConfig {
35 pub poll_interval: Duration,
36 pub info_url: String,
37 pub exchange_address: WalletAddress,
38 pub core_deposit_wallet_address: WalletAddress,
39 pub ws_url: Option<String>,
40}
41
42impl HypercoreCashLedgerObserverConfig {
43 pub fn from_runtime_config(
44 config: &crate::backend_config::OnchainDepositsRuntimeConfig,
45 info_url: impl Into<String>,
46 exchange_address: WalletAddress,
47 core_deposit_wallet_address: WalletAddress,
48 ) -> Self {
49 Self {
50 poll_interval: Duration::from_millis(config.poll_interval_ms),
51 info_url: info_url.into(),
52 exchange_address,
53 core_deposit_wallet_address,
54 ws_url: config.ws_url.clone(),
55 }
56 }
57}
58
59impl Default for HypercoreCashLedgerObserverConfig {
60 fn default() -> Self {
61 Self {
62 poll_interval: Duration::from_millis(
63 crate::backend_config::OnchainDepositsRuntimeConfig::default().poll_interval_ms,
64 ),
65 info_url: DEFAULT_HL_INFO_URL.to_string(),
66 exchange_address: WalletAddress::default(),
67 core_deposit_wallet_address: WalletAddress::default(),
68 ws_url: None,
69 }
70 }
71}
72
73fn record_cash_ledger_watermark_gauges(
74 now_ms: i64,
75 last_event_time_ms: Option<i64>,
76 replay_start_time_ms: i64,
77) {
78 gauge!("ht_hypercore_cash_ledger_observer_replay_start_age_seconds")
79 .set(ms_age_seconds(now_ms, replay_start_time_ms));
80 match last_event_time_ms {
81 Some(last_event_time_ms) => {
82 gauge!("ht_hypercore_cash_ledger_observer_watermark_present").set(1.0);
83 gauge!("ht_hypercore_cash_ledger_observer_watermark_age_seconds")
84 .set(ms_age_seconds(now_ms, last_event_time_ms));
85 }
86 None => {
87 gauge!("ht_hypercore_cash_ledger_observer_watermark_present").set(0.0);
88 }
89 }
90}
91
92fn ms_age_seconds(now_ms: i64, observed_ms: i64) -> f64 {
93 now_ms.saturating_sub(observed_ms) as f64 / 1_000.0
94}
95
96#[async_trait]
97pub trait HypercoreCashLedgerStore: Send + Sync + 'static {
98 fn exchange_watermark(&self) -> Result<Option<i64>>;
99 async fn pending_rsm_usdc_deposit_for_amount(
100 &self,
101 amount_wei: &str,
102 ) -> Result<Option<hypercall_db::RsmUsdcDepositMatch>>;
103 async fn pending_rsm_usdc_deposit_for_evm_tx_hash(
104 &self,
105 evm_tx_hash: &str,
106 amount_wei: &str,
107 ) -> Result<Option<hypercall_db::RsmUsdcDepositMatch>>;
108 async fn pending_rsm_usdc_deposit_for_credited_hypercore_event(
109 &self,
110 event_hash: &str,
111 amount_wei: &str,
112 ) -> Result<Option<hypercall_db::RsmUsdcDepositMatch>>;
113 async fn pm_liquidity_deposit_for_evm_tx_hash(
114 &self,
115 evm_tx_hash: &str,
116 amount_wei: &str,
117 ) -> Result<Option<hypercall_db::RsmUsdcDepositMatch>>;
118 async fn pm_liquidity_deposit_for_amount(
119 &self,
120 amount_wei: &str,
121 ) -> Result<Option<hypercall_db::RsmUsdcDepositMatch>>;
122 async fn mark_pm_liquidity_deposit_hypercore_matched(
123 &self,
124 request_id: &str,
125 event_hash: &str,
126 ) -> Result<()>;
127 async fn credited_wallet_for_hypercore_cash_event(
128 &self,
129 event_hash: &str,
130 ) -> Result<Option<WalletAddress>>;
131 async fn non_crediting_hypercore_cash_event(
132 &self,
133 event_hash: &str,
134 amount_usdc: Decimal,
135 ) -> Result<bool>;
136 async fn mark_rsm_deposit_credit_submitted(&self, request_id: &str) -> Result<()>;
137 async fn record_non_crediting_deposit(&self, input: &HypercoreCashLedgerApply) -> Result<()>;
138 async fn apply_deposit(
139 &self,
140 input: &HypercoreCashLedgerApply,
141 ) -> Result<hypercall_db::HypercoreCashLedgerApplyResult>;
142}
143
144#[async_trait]
145impl HypercoreCashLedgerStore for DatabaseHandler {
146 fn exchange_watermark(&self) -> Result<Option<i64>> {
147 self.get_exchange_cash_ledger_watermark_sync()
148 }
149
150 async fn pending_rsm_usdc_deposit_for_amount(
151 &self,
152 amount_wei: &str,
153 ) -> Result<Option<hypercall_db::RsmUsdcDepositMatch>> {
154 self.pending_rsm_usdc_deposit_for_amount(amount_wei).await
155 }
156
157 async fn pending_rsm_usdc_deposit_for_evm_tx_hash(
158 &self,
159 evm_tx_hash: &str,
160 amount_wei: &str,
161 ) -> Result<Option<hypercall_db::RsmUsdcDepositMatch>> {
162 self.pending_rsm_usdc_deposit_for_evm_tx_hash(evm_tx_hash, amount_wei)
163 .await
164 }
165
166 async fn pending_rsm_usdc_deposit_for_credited_hypercore_event(
167 &self,
168 event_hash: &str,
169 amount_wei: &str,
170 ) -> Result<Option<hypercall_db::RsmUsdcDepositMatch>> {
171 self.pending_rsm_usdc_deposit_for_credited_hypercore_event(event_hash, amount_wei)
172 .await
173 }
174
175 async fn pm_liquidity_deposit_for_evm_tx_hash(
176 &self,
177 evm_tx_hash: &str,
178 amount_wei: &str,
179 ) -> Result<Option<hypercall_db::RsmUsdcDepositMatch>> {
180 self.pm_liquidity_deposit_for_evm_tx_hash(evm_tx_hash, amount_wei)
181 .await
182 }
183
184 async fn pm_liquidity_deposit_for_amount(
185 &self,
186 amount_wei: &str,
187 ) -> Result<Option<hypercall_db::RsmUsdcDepositMatch>> {
188 self.pm_liquidity_deposit_for_amount(amount_wei).await
189 }
190
191 async fn mark_pm_liquidity_deposit_hypercore_matched(
192 &self,
193 request_id: &str,
194 event_hash: &str,
195 ) -> Result<()> {
196 self.mark_pm_liquidity_deposit_hypercore_matched(request_id, event_hash)
197 .await
198 }
199
200 async fn credited_wallet_for_hypercore_cash_event(
201 &self,
202 event_hash: &str,
203 ) -> Result<Option<WalletAddress>> {
204 self.credited_wallet_for_hypercore_cash_event(event_hash)
205 .await
206 }
207
208 async fn non_crediting_hypercore_cash_event(
209 &self,
210 event_hash: &str,
211 amount_usdc: Decimal,
212 ) -> Result<bool> {
213 self.non_crediting_hypercore_cash_event(event_hash, amount_usdc)
214 .await
215 }
216
217 async fn mark_rsm_deposit_credit_submitted(&self, request_id: &str) -> Result<()> {
218 self.mark_rsm_deposit_credit_submitted(request_id).await
219 }
220
221 async fn record_non_crediting_deposit(&self, input: &HypercoreCashLedgerApply) -> Result<()> {
222 self.record_hypercore_cash_deposit_non_crediting(input)
223 .await
224 }
225
226 async fn apply_deposit(
227 &self,
228 input: &HypercoreCashLedgerApply,
229 ) -> Result<hypercall_db::HypercoreCashLedgerApplyResult> {
230 self.apply_hypercore_cash_deposit(input).await
231 }
232}
233
234#[async_trait]
235pub trait HypercoreCashDepositApplier: Send + Sync + 'static {
236 async fn apply_deposit(&self, request: DepositRequest) -> Result<()>;
237}
238
239#[async_trait]
240impl HypercoreCashDepositApplier for mpsc::Sender<DepositRequest> {
241 async fn apply_deposit(&self, mut request: DepositRequest) -> Result<()> {
242 let (tx, rx) = oneshot::channel();
243 request.applied_tx = Some(tx);
244 self.send(request)
245 .await
246 .map_err(|_| anyhow!("cash deposit receiver closed"))?;
247 rx.await
248 .map_err(|_| anyhow!("cash deposit apply acknowledgement dropped"))?
249 .map_err(|err| anyhow!(err))
250 }
251}
252
253pub struct HypercoreCashLedgerObserver {
254 store: Arc<dyn HypercoreCashLedgerStore>,
255 applier: Arc<dyn HypercoreCashDepositApplier>,
256 client: reqwest::Client,
257 config: HypercoreCashLedgerObserverConfig,
258 tier_cache: Arc<TierCache>,
259}
260
261#[derive(Debug, Clone, Copy, PartialEq, Eq)]
262enum CashLedgerCreditDecision {
263 Apply,
264 SkipPortfolioMargin,
265}
266
267impl HypercoreCashLedgerObserver {
268 pub fn new(
269 db: Arc<DatabaseHandler>,
270 deposit_sender: mpsc::Sender<DepositRequest>,
271 config: HypercoreCashLedgerObserverConfig,
272 tier_cache: Arc<TierCache>,
273 ) -> Result<Self> {
274 let client = reqwest::Client::builder()
275 .timeout(REQUEST_TIMEOUT)
276 .build()
277 .context("failed to build HyperCore cash ledger HTTP client")?;
278 Ok(Self {
279 store: db,
280 applier: Arc::new(deposit_sender),
281 client,
282 config,
283 tier_cache,
284 })
285 }
286
287 async fn cash_ledger_credit_decision(
293 &self,
294 wallet: &WalletAddress,
295 ) -> Result<CashLedgerCreditDecision> {
296 match self.tier_cache.get_margin_mode(wallet).await {
297 Ok(mode) if mode.is_portfolio() => Ok(CashLedgerCreditDecision::SkipPortfolioMargin),
298 Ok(_) => Ok(CashLedgerCreditDecision::Apply),
299 Err(error) => Err(error).with_context(|| {
300 format!(
301 "failed to determine margin mode for {}; refusing cash ledger credit",
302 wallet
303 )
304 }),
305 }
306 }
307
308 pub async fn run_with_shutdown(&self, mut shutdown_rx: broadcast::Receiver<()>) -> Result<()> {
309 match &self.config.ws_url {
310 Some(ws_url) => {
311 info!("HyperCore cash ledger observer: running startup catchup via HTTP poll");
315 let mut catchup_ok = false;
316 for attempt in 1..=3u32 {
317 let poll_result = tokio::select! {
318 result = self.poll_once() => result,
319 _ = shutdown_rx.recv() => {
320 info!("HyperCore cash ledger observer: shutdown signal received during startup catchup");
321 return Ok(());
322 }
323 };
324
325 match poll_result {
326 Ok(()) => {
327 catchup_ok = true;
328 break;
329 }
330 Err(error) => {
331 warn!(
332 attempt,
333 "HyperCore cash ledger observer: startup catchup failed: {}", error
334 );
335 if attempt < 3 {
336 tokio::select! {
337 _ = tokio::time::sleep(Duration::from_secs(u64::from(attempt) * 2)) => {}
338 _ = shutdown_rx.recv() => {
339 info!("HyperCore cash ledger observer: shutdown signal received during startup catchup backoff");
340 return Ok(());
341 }
342 }
343 }
344 }
345 }
346 }
347 if !catchup_ok {
348 return Err(anyhow!(
349 "HyperCore cash ledger observer startup catchup exhausted retries; refusing to switch to WebSocket mode with a known historical gap"
350 ));
351 }
352
353 info!(
355 "HyperCore cash ledger observer: switching to WebSocket mode at {}",
356 ws_url
357 );
358 self.run_websocket(ws_url.clone(), &mut shutdown_rx).await
359 }
360 None => {
361 info!("HyperCore cash ledger observer: running in HTTP polling mode (no ws_url configured)");
363 self.run_polling(shutdown_rx).await
364 }
365 }
366 }
367
368 async fn run_polling(&self, mut shutdown_rx: broadcast::Receiver<()>) -> Result<()> {
370 let mut interval = tokio::time::interval(self.config.poll_interval);
371
372 loop {
373 tokio::select! {
374 _ = shutdown_rx.recv() => {
375 info!("HyperCore cash ledger observer received shutdown signal");
376 break;
377 }
378 _ = interval.tick() => {
379 if let Err(error) = self.poll_once().await {
380 counter!("ht_hypercore_cash_ledger_errors_total", "stage" => "poll").increment(1);
381 warn!("HyperCore cash ledger observer poll failed: {}", error);
382 }
383 }
384 }
385 }
386
387 Ok(())
388 }
389
390 async fn run_websocket(
392 &self,
393 ws_url: String,
394 shutdown_rx: &mut broadcast::Receiver<()>,
395 ) -> Result<()> {
396 let mut consecutive_failures: u32 = 0;
397
398 loop {
399 let before = Instant::now();
400 tokio::select! {
401 _ = shutdown_rx.recv() => {
402 info!("HyperCore cash ledger observer: shutdown signal received (WS mode)");
403 return Ok(());
404 }
405 result = self.run_ws_connection(&ws_url) => {
406 if let Err(ref err) = result {
407 counter!("ht_hypercore_cash_ledger_errors_total", "stage" => "ws_connection").increment(1);
408 warn!("HyperCore cash ledger observer WS connection error: {}", err);
409 }
410
411 if before.elapsed() > Duration::from_secs(60) {
413 consecutive_failures = 0;
414 }
415 consecutive_failures += 1;
416 let backoff = std::cmp::min(
417 WS_BASE_BACKOFF * 2u32.saturating_pow(consecutive_failures - 1),
418 WS_MAX_BACKOFF,
419 );
420 warn!(
421 "HyperCore cash ledger observer: WS disconnected (failures: {}), reconnecting in {:?}",
422 consecutive_failures, backoff
423 );
424
425 tokio::select! {
426 _ = shutdown_rx.recv() => {
427 info!("HyperCore cash ledger observer: shutdown during reconnect backoff");
428 return Ok(());
429 }
430 _ = tokio::time::sleep(backoff) => {}
431 }
432
433 if let Err(error) = self.poll_once().await {
436 warn!(
437 "HyperCore cash ledger observer: reconnect catch-up poll failed: {}",
438 error
439 );
440 }
441 }
442 }
443 }
444 }
445
446 async fn run_ws_connection(&self, ws_url: &str) -> Result<()> {
448 info!(
449 "HyperCore cash ledger observer: connecting to WS at {}",
450 ws_url
451 );
452
453 let (ws_stream, _) = tokio_tungstenite::connect_async(ws_url)
454 .await
455 .context("failed to connect to Hydromancer WebSocket")?;
456
457 info!("HyperCore cash ledger observer: WS connected");
458 gauge!("ht_hypercore_cash_ledger_ws_connected").set(1.0);
459
460 let (mut write, mut read) = ws_stream.split();
461
462 let sub_msg = serde_json::json!({
464 "method": "subscribe",
465 "subscription": {
466 "type": "allUserNonFundingLedgerEvents"
467 }
468 });
469 let sub_text = serde_json::to_string(&sub_msg)
470 .context("failed to serialize WS subscription message")?;
471 write
472 .send(Message::Text(sub_text))
473 .await
474 .context("failed to send WS subscription message")?;
475 info!("HyperCore cash ledger observer: subscribed to allUserNonFundingLedgerEvents");
476
477 let mut catchup_interval = tokio::time::interval(self.config.poll_interval);
478 catchup_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
479 catchup_interval.tick().await;
480 let read_timeout = tokio::time::sleep(WS_READ_TIMEOUT);
481 tokio::pin!(read_timeout);
482
483 loop {
485 tokio::select! {
486 _ = &mut read_timeout => {
487 warn!(
488 "HyperCore cash ledger observer: WS read timeout ({}s), reconnecting",
489 WS_READ_TIMEOUT.as_secs()
490 );
491 break;
492 },
493 maybe_msg = read.next() => {
494 read_timeout.as_mut().reset(tokio::time::Instant::now() + WS_READ_TIMEOUT);
495 match maybe_msg {
496 Some(Ok(Message::Text(text))) => {
497 if let Err(error) = self.handle_ws_message(&text).await {
498 counter!("ht_hypercore_cash_ledger_errors_total", "stage" => "ws_message").increment(1);
499 warn!(
500 "HyperCore cash ledger observer: error handling WS message: {}",
501 error
502 );
503 }
504 }
505 Some(Ok(Message::Ping(data))) => {
506 if let Err(e) = write.send(Message::Pong(data)).await {
507 error!("HyperCore cash ledger observer: failed to send pong: {}", e);
508 break;
509 }
510 }
511 Some(Ok(Message::Close(_))) => {
512 info!("HyperCore cash ledger observer: server sent close frame");
513 break;
514 }
515 Some(Err(e)) => {
516 error!("HyperCore cash ledger observer: WS read error: {}", e);
517 break;
518 }
519 None => {
520 info!("HyperCore cash ledger observer: WS stream ended");
521 break;
522 }
523 _ => {}
524 }
525 },
526 _ = catchup_interval.tick() => {
527 if let Err(error) = self.poll_once().await {
528 counter!("ht_hypercore_cash_ledger_errors_total", "stage" => "ws_catchup_poll").increment(1);
529 warn!(
530 "HyperCore cash ledger observer WS catch-up poll failed: {}",
531 error
532 );
533 }
534 }
535 }
536 }
537
538 gauge!("ht_hypercore_cash_ledger_ws_connected").set(0.0);
539 Ok(())
540 }
541
542 async fn handle_ws_message(&self, text: &str) -> Result<()> {
544 let envelope: WsLedgerEnvelope = match serde_json::from_str(text) {
546 Ok(e) => e,
547 Err(_) => {
548 debug!(
550 "HyperCore cash ledger observer: non-data WS message: {}",
551 &text[..text.len().min(200)]
552 );
553 return Ok(());
554 }
555 };
556
557 if !matches!(
559 envelope.channel.as_deref(),
560 Some(c) if c.contains("NonFundingLedger")
561 ) {
562 return Ok(());
563 }
564
565 let events = match envelope.data {
566 Some(events) => events,
567 None => return Ok(()),
568 };
569
570 for event in events {
571 self.handle_ledger_update(event, "websocket").await?;
572 }
573
574 Ok(())
575 }
576
577 async fn handle_ledger_update(&self, update: LedgerUpdate, source: &'static str) -> Result<()> {
579 let Some(transfer) = update.transfer()? else {
580 return Ok(());
581 };
582 if transfer.destination != self.config.exchange_address.0 {
586 return Ok(());
587 }
588 if transfer.token != "USDC" {
589 return Ok(());
590 };
591 let sender = WalletAddress::from(transfer.sender);
592 let amount_usdc = transfer.amount;
593 if amount_usdc <= Decimal::ZERO {
594 anyhow::bail!(
595 "pool transfer {} has non-positive amount {}",
596 transfer.event_hash,
597 amount_usdc
598 );
599 }
600
601 let credit_resolution = self
602 .resolve_usdc_deposit_credit(
603 sender,
604 amount_usdc,
605 &transfer.event_hash.to_string(),
606 transfer.writer_evm_tx_hash.as_deref(),
607 )
608 .await?;
609 let (credit_wallet, request_id, non_crediting_reason) = match credit_resolution {
610 CashCreditResolution::DirectSender => (sender, None, None),
611 CashCreditResolution::CreditAccount { wallet, request_id } => {
612 (wallet, request_id, None)
613 }
614 CashCreditResolution::SkipPmLiquidity => (sender, None, Some("pm_liquidity")),
615 };
616 let event_time_ms = i64::try_from(transfer.time)
617 .with_context(|| format!("ledger event time {} exceeds i64 range", transfer.time))?;
618 let event_hash = transfer.event_hash.to_string();
619 let apply = HypercoreCashLedgerApply {
620 wallet: credit_wallet,
621 event_hash,
622 event_time_ms,
623 amount_usdc,
624 };
625 if let Some(reason) = non_crediting_reason {
626 self.store.record_non_crediting_deposit(&apply).await?;
627 counter!(
628 "ht_hypercore_cash_pool_transfers_skipped_total",
629 "reason" => reason
630 )
631 .increment(1);
632 return Ok(());
633 }
634
635 match self.cash_ledger_credit_decision(&credit_wallet).await? {
639 CashLedgerCreditDecision::Apply => {}
640 CashLedgerCreditDecision::SkipPortfolioMargin => {
641 counter!("ht_hypercore_cash_pool_transfers_skipped_total", "reason" => "portfolio_margin")
642 .increment(1);
643 warn!(
644 wallet = %credit_wallet,
645 amount_usdc = %amount_usdc,
646 event_hash = %apply.event_hash,
647 source = source,
648 "Skipping balance_ledger credit for portfolio margin wallet (equity via Hydromancer)"
649 );
650 return Ok(());
651 }
652 }
653 let result = self.store.apply_deposit(&apply).await?;
658 self.apply_engine_deposit(
659 credit_wallet,
660 amount_usdc,
661 &result,
662 transfer.time,
663 transfer.event_hash,
664 )
665 .await?;
666 if let Some(request_id) = request_id {
667 self.store
668 .mark_rsm_deposit_credit_submitted(&request_id)
669 .await?;
670 }
671 counter!("ht_hypercore_cash_pool_transfers_applied_total").increment(1);
672 info!(
673 wallet = %credit_wallet,
674 hypercore_sender = %sender,
675 amount_usdc = %amount_usdc,
676 event_hash = %apply.event_hash,
677 balance_after = ?result.balance_after,
678 exchange_address = %self.config.exchange_address,
679 source = source,
680 "Applied Account->Exchange pool transfer to engine cash ledger"
681 );
682 Ok(())
683 }
684
685 async fn poll_once(&self) -> Result<()> {
686 let now_ms =
687 i64::try_from(get_timestamp_millis()).context("current timestamp exceeds i64 range")?;
688 let last_event_time_ms = self.store.exchange_watermark()?;
689 let start_time = last_event_time_ms
690 .map(|last| last.saturating_sub(REPLAY_OVERLAP_MS))
691 .unwrap_or_else(|| now_ms.saturating_sub(STARTUP_LOOKBACK_MS));
692 record_cash_ledger_watermark_gauges(now_ms, last_event_time_ms, start_time);
693
694 let request = HyperliquidLedgerRequest::user_non_funding_ledger_updates(
695 self.config.exchange_address.to_string(),
696 start_time,
697 );
698 let response = self
699 .client
700 .post(&self.config.info_url)
701 .json(&request)
702 .send()
703 .await
704 .with_context(|| {
705 format!(
706 "failed to request ledger updates for exchange {}",
707 self.config.exchange_address
708 )
709 })?;
710 let status = response.status();
711 if !status.is_success() {
712 let body = response
713 .text()
714 .await
715 .unwrap_or_else(|_| "unknown error".to_string());
716 anyhow::bail!(
717 "ledger update request for exchange {} failed: {status}: {body}",
718 self.config.exchange_address
719 );
720 }
721
722 let updates: Vec<LedgerUpdate> = response.json().await.with_context(|| {
723 format!(
724 "failed to parse ledger updates for exchange {}",
725 self.config.exchange_address
726 )
727 })?;
728 for update in updates {
729 self.handle_ledger_update(update, "http_poll").await?;
730 }
731 Ok(())
732 }
733 async fn apply_engine_deposit(
734 &self,
735 wallet: WalletAddress,
736 amount_usdc: Decimal,
737 result: &hypercall_db::HypercoreCashLedgerApplyResult,
738 timestamp_ms: u64,
739 source_event_hash: FixedBytes<32>,
740 ) -> Result<()> {
741 let Some(sequence) = result.ledger_event_id else {
742 anyhow::bail!(
743 "cash ledger deposit for {} source_event_hash={} has no ledger_event_id; refusing engine apply",
744 wallet,
745 source_event_hash
746 );
747 };
748 self.applier
749 .apply_deposit(DepositRequest {
750 wallet,
751 amount: amount_usdc,
752 timestamp_ms,
753 sequence: Some(sequence),
754 source_event_hash,
755 journal_request_id: hypercore_cash_deposit_request_id(sequence),
756 outbox_appends: Vec::new(),
757 applied_tx: None,
758 })
759 .await
760 }
761
762 async fn resolve_usdc_deposit_credit(
763 &self,
764 hypercore_sender: WalletAddress,
765 amount_usdc: Decimal,
766 event_hash: &str,
767 writer_evm_tx_hash: Option<&str>,
768 ) -> Result<CashCreditResolution> {
769 if hypercore_sender != self.config.core_deposit_wallet_address
770 && hypercore_sender != self.config.exchange_address
771 {
772 return Ok(CashCreditResolution::DirectSender);
773 }
774
775 let amount_wei = usdc_decimal_to_wei_string(amount_usdc)?;
776 if let Some(wallet) = self
777 .store
778 .credited_wallet_for_hypercore_cash_event(event_hash)
779 .await?
780 {
781 let request_id = self
782 .store
783 .pending_rsm_usdc_deposit_for_credited_hypercore_event(event_hash, &amount_wei)
784 .await?
785 .map(|matched| matched.request_id);
786 return Ok(CashCreditResolution::CreditAccount { wallet, request_id });
787 }
788 if self
789 .store
790 .non_crediting_hypercore_cash_event(event_hash, amount_usdc)
791 .await?
792 {
793 return Ok(CashCreditResolution::SkipPmLiquidity);
794 }
795
796 let matched = if let Some(writer_evm_tx_hash) = writer_evm_tx_hash.and_then(non_empty_str) {
797 match self
798 .store
799 .pending_rsm_usdc_deposit_for_evm_tx_hash(writer_evm_tx_hash, &amount_wei)
800 .await?
801 {
802 Some(matched) => matched,
803 None => {
804 if let Some(pm_match) = self
805 .store
806 .pm_liquidity_deposit_for_evm_tx_hash(writer_evm_tx_hash, &amount_wei)
807 .await?
808 {
809 self.store
810 .mark_pm_liquidity_deposit_hypercore_matched(
811 &pm_match.request_id,
812 event_hash,
813 )
814 .await?;
815 info!(
816 lp = %pm_match.account,
817 evm_tx_hash = %pm_match.tx_hash,
818 evm_log_index = pm_match.log_index,
819 hypercore_event_hash = %event_hash,
820 amount_wei = %amount_wei,
821 "Matched Exchange.PmLiquidityDeposit to HyperCore cash ledger deposit; skipping user cash credit"
822 );
823 return Ok(CashCreditResolution::SkipPmLiquidity);
824 }
825 anyhow::bail!(
826 "CoreWriter evm_tx_hash {} for HyperCore USDC deposit {} amount={} has no pending Exchange.UsdcDeposit attribution",
827 writer_evm_tx_hash,
828 event_hash,
829 amount_usdc
830 );
831 }
832 }
833 } else {
834 match self
835 .store
836 .pending_rsm_usdc_deposit_for_amount(&amount_wei)
837 .await?
838 {
839 Some(matched) => matched,
840 None => {
841 if let Some(pm_match) = self
842 .store
843 .pm_liquidity_deposit_for_amount(&amount_wei)
844 .await?
845 {
846 self.store
847 .mark_pm_liquidity_deposit_hypercore_matched(
848 &pm_match.request_id,
849 event_hash,
850 )
851 .await?;
852 info!(
853 lp = %pm_match.account,
854 evm_tx_hash = %pm_match.tx_hash,
855 evm_log_index = pm_match.log_index,
856 hypercore_event_hash = %event_hash,
857 amount_wei = %amount_wei,
858 "Matched Exchange.PmLiquidityDeposit to HyperCore cash ledger deposit by amount; skipping user cash credit"
859 );
860 return Ok(CashCreditResolution::SkipPmLiquidity);
861 }
862 anyhow::bail!(
863 "Exchange/CoreDepositWallet-originated HyperCore USDC deposit {} amount={} has no pending Exchange.UsdcDeposit attribution",
864 event_hash,
865 amount_usdc
866 );
867 }
868 }
869 };
870
871 info!(
872 credited_account = %matched.account,
873 evm_tx_hash = %matched.tx_hash,
874 evm_log_index = matched.log_index,
875 hypercore_event_hash = %event_hash,
876 writer_evm_tx_hash = writer_evm_tx_hash.unwrap_or(""),
877 amount_wei = %amount_wei,
878 "Matched Exchange.UsdcDeposit to HyperCore cash ledger deposit"
879 );
880 Ok(CashCreditResolution::CreditAccount {
881 wallet: matched.account,
882 request_id: Some(matched.request_id),
883 })
884 }
885}
886
887#[derive(Debug, Clone, PartialEq, Eq)]
888enum CashCreditResolution {
889 DirectSender,
890 CreditAccount {
891 wallet: WalletAddress,
892 request_id: Option<String>,
893 },
894 SkipPmLiquidity,
895}
896
897fn hypercore_cash_deposit_request_id(ledger_event_id: u64) -> String {
898 const HYPERCORE_CASH_DEPOSIT_UUID_PREFIX: u128 = 0x4859_4343_4153_4800_0000_0000_0000_0000;
899 uuid::Uuid::from_u128(HYPERCORE_CASH_DEPOSIT_UUID_PREFIX | u128::from(ledger_event_id))
900 .to_string()
901}
902
903fn usdc_decimal_to_wei_string(amount_usdc: Decimal) -> Result<String> {
904 if amount_usdc <= Decimal::ZERO {
905 anyhow::bail!("USDC deposit amount must be positive, got {}", amount_usdc);
906 }
907 let scaled = amount_usdc * dec!(1000000);
908 if scaled.fract() != Decimal::ZERO {
909 anyhow::bail!(
910 "USDC deposit amount {} has more than 6 decimal places",
911 amount_usdc
912 );
913 }
914 let wei = scaled
915 .to_u128()
916 .ok_or_else(|| anyhow!("USDC deposit amount {} exceeds u128 range", amount_usdc))?;
917 Ok(wei.to_string())
918}
919
920fn non_empty_str(value: &str) -> Option<&str> {
921 let trimmed = value.trim();
922 if trimmed.is_empty() {
923 None
924 } else {
925 Some(trimmed)
926 }
927}
928
929#[async_trait]
930impl Service for HypercoreCashLedgerObserver {
931 fn name(&self) -> &'static str {
932 "HypercoreCashLedgerObserver"
933 }
934
935 fn owner(&self) -> ServiceOwner {
936 ServiceOwner::Shared
937 }
938
939 async fn run(self: Arc<Self>, shutdown: crate::shared::shutdown::ShutdownRx) -> Result<()> {
940 self.run_with_shutdown(shutdown).await
941 }
942}
943
944#[cfg(test)]
945mod tests {
946 use super::*;
947 use hypercore_rs::cash_ledger::LedgerDelta;
948 use std::collections::{HashMap, HashSet, VecDeque};
949 use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
950 use std::sync::Mutex;
951
952 fn exchange_addr() -> WalletAddress {
955 WalletAddress::from(alloy::primitives::address!(
956 "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"
957 ))
958 }
959
960 fn core_deposit_wallet_addr() -> WalletAddress {
961 WalletAddress::from(alloy::primitives::address!(
962 "6b9e773128f453f5c2c60935ee2de2cbc5390a24"
963 ))
964 }
965
966 fn sender_addr() -> WalletAddress {
967 WalletAddress::from(alloy::primitives::address!(
968 "301cd221cf81ef94cd276042aacfbcd0e3795589"
969 ))
970 }
971
972 fn test_event_hash(byte: u8) -> FixedBytes<32> {
973 FixedBytes::from([byte; 32])
974 }
975
976 fn test_event_hash_string(byte: u8) -> String {
977 test_event_hash(byte).to_string()
978 }
979
980 struct MockTierDb {
981 mode: Option<hypercall_types::MarginMode>,
982 }
983
984 impl hypercall_db::TierReader for MockTierDb {
985 fn get_margin_mode_sync(
986 &self,
987 _wallet: &WalletAddress,
988 ) -> Result<hypercall_types::MarginMode> {
989 Ok(self.mode.unwrap_or(hypercall_types::MarginMode::Standard))
990 }
991
992 fn get_existing_margin_mode_sync(
993 &self,
994 _wallet: &WalletAddress,
995 ) -> Result<Option<hypercall_types::MarginMode>> {
996 Ok(self.mode)
997 }
998
999 fn get_tier_defaults_sync(
1000 &self,
1001 _tier_name: &str,
1002 ) -> Result<Option<hypercall_db::TierDefaultsRecord>> {
1003 Ok(None)
1004 }
1005
1006 fn get_user_tier_sync(
1007 &self,
1008 _wallet: &WalletAddress,
1009 ) -> Result<Option<hypercall_db::UserTierRecord>> {
1010 Ok(None)
1011 }
1012
1013 fn get_all_user_tiers_sync(&self) -> Result<Vec<hypercall_db::UserTierRecord>> {
1014 Ok(Vec::new())
1015 }
1016 }
1017
1018 impl hypercall_db::TierWriter for MockTierDb {
1019 fn save_user_tier_sync(&self, _update: &hypercall_db::UserTierUpdate) -> Result<()> {
1020 Ok(())
1021 }
1022
1023 fn set_margin_mode_sync(
1024 &self,
1025 _wallet: &WalletAddress,
1026 _margin_mode: hypercall_types::MarginMode,
1027 ) -> Result<i64> {
1028 Ok(1)
1029 }
1030
1031 fn insert_margin_mode_if_missing_sync(
1032 &self,
1033 _wallet: &WalletAddress,
1034 _margin_mode: hypercall_types::MarginMode,
1035 ) -> Result<Option<i64>> {
1036 Ok(Some(1))
1037 }
1038
1039 fn delete_user_tier_sync(&self, _wallet: &WalletAddress) -> Result<()> {
1040 Ok(())
1041 }
1042 }
1043
1044 struct CountingStore {
1045 apply_calls: AtomicUsize,
1046 apply_inputs: Mutex<Vec<HypercoreCashLedgerApply>>,
1047 apply_results: Mutex<VecDeque<hypercall_db::HypercoreCashLedgerApplyResult>>,
1048 pending_usdc_matches: Mutex<VecDeque<hypercall_db::RsmUsdcDepositMatch>>,
1049 pm_liquidity_matches: Mutex<VecDeque<hypercall_db::RsmUsdcDepositMatch>>,
1050 matched_pm_liquidity_requests: Mutex<Vec<(String, String)>>,
1051 credited_event_wallets: Mutex<HashMap<String, WalletAddress>>,
1052 non_crediting_events: Mutex<HashSet<(String, Decimal)>>,
1053 submitted_usdc_requests: Mutex<Vec<String>>,
1054 }
1055
1056 #[async_trait::async_trait]
1057 impl HypercoreCashLedgerStore for CountingStore {
1058 fn exchange_watermark(&self) -> Result<Option<i64>> {
1059 Ok(None)
1060 }
1061
1062 async fn pending_rsm_usdc_deposit_for_amount(
1063 &self,
1064 _amount_wei: &str,
1065 ) -> Result<Option<hypercall_db::RsmUsdcDepositMatch>> {
1066 Ok(self
1067 .pending_usdc_matches
1068 .lock()
1069 .expect("pending_usdc_matches mutex poisoned")
1070 .pop_front())
1071 }
1072
1073 async fn pending_rsm_usdc_deposit_for_evm_tx_hash(
1074 &self,
1075 evm_tx_hash: &str,
1076 amount_wei: &str,
1077 ) -> Result<Option<hypercall_db::RsmUsdcDepositMatch>> {
1078 let mut matches = self
1079 .pending_usdc_matches
1080 .lock()
1081 .expect("pending_usdc_matches mutex poisoned");
1082 let Some(index) = matches
1083 .iter()
1084 .position(|row| row.tx_hash == evm_tx_hash && row.amount_wei == amount_wei)
1085 else {
1086 return Ok(None);
1087 };
1088 Ok(matches.remove(index))
1089 }
1090
1091 async fn pending_rsm_usdc_deposit_for_credited_hypercore_event(
1092 &self,
1093 event_hash: &str,
1094 amount_wei: &str,
1095 ) -> Result<Option<hypercall_db::RsmUsdcDepositMatch>> {
1096 let Some(wallet) = self
1097 .credited_event_wallets
1098 .lock()
1099 .expect("credited_event_wallets mutex poisoned")
1100 .get(event_hash)
1101 .copied()
1102 else {
1103 return Ok(None);
1104 };
1105 let mut matches = self
1106 .pending_usdc_matches
1107 .lock()
1108 .expect("pending_usdc_matches mutex poisoned");
1109 let Some(index) = matches
1110 .iter()
1111 .position(|row| row.account == wallet && row.amount_wei == amount_wei)
1112 else {
1113 return Ok(None);
1114 };
1115 Ok(matches.remove(index))
1116 }
1117
1118 async fn pm_liquidity_deposit_for_evm_tx_hash(
1119 &self,
1120 evm_tx_hash: &str,
1121 amount_wei: &str,
1122 ) -> Result<Option<hypercall_db::RsmUsdcDepositMatch>> {
1123 Ok(self
1124 .pm_liquidity_matches
1125 .lock()
1126 .expect("pm_liquidity_matches mutex poisoned")
1127 .iter()
1128 .find(|row| row.tx_hash == evm_tx_hash && row.amount_wei == amount_wei)
1129 .cloned())
1130 }
1131
1132 async fn pm_liquidity_deposit_for_amount(
1133 &self,
1134 amount_wei: &str,
1135 ) -> Result<Option<hypercall_db::RsmUsdcDepositMatch>> {
1136 Ok(self
1137 .pm_liquidity_matches
1138 .lock()
1139 .expect("pm_liquidity_matches mutex poisoned")
1140 .iter()
1141 .find(|row| row.amount_wei == amount_wei)
1142 .cloned())
1143 }
1144
1145 async fn mark_pm_liquidity_deposit_hypercore_matched(
1146 &self,
1147 request_id: &str,
1148 event_hash: &str,
1149 ) -> Result<()> {
1150 self.matched_pm_liquidity_requests
1151 .lock()
1152 .expect("matched_pm_liquidity_requests mutex poisoned")
1153 .push((request_id.to_string(), event_hash.to_string()));
1154 let mut matches = self
1155 .pm_liquidity_matches
1156 .lock()
1157 .expect("pm_liquidity_matches mutex poisoned");
1158 if let Some(index) = matches.iter().position(|row| row.request_id == request_id) {
1159 matches.remove(index);
1160 }
1161 Ok(())
1162 }
1163
1164 async fn credited_wallet_for_hypercore_cash_event(
1165 &self,
1166 event_hash: &str,
1167 ) -> Result<Option<WalletAddress>> {
1168 Ok(self
1169 .credited_event_wallets
1170 .lock()
1171 .expect("credited_event_wallets mutex poisoned")
1172 .get(event_hash)
1173 .copied())
1174 }
1175
1176 async fn non_crediting_hypercore_cash_event(
1177 &self,
1178 event_hash: &str,
1179 amount_usdc: Decimal,
1180 ) -> Result<bool> {
1181 Ok(self
1182 .non_crediting_events
1183 .lock()
1184 .expect("non_crediting_events mutex poisoned")
1185 .contains(&(event_hash.to_string(), amount_usdc)))
1186 }
1187
1188 async fn mark_rsm_deposit_credit_submitted(&self, request_id: &str) -> Result<()> {
1189 self.submitted_usdc_requests
1190 .lock()
1191 .expect("submitted_usdc_requests mutex poisoned")
1192 .push(request_id.to_string());
1193 Ok(())
1194 }
1195
1196 async fn record_non_crediting_deposit(
1197 &self,
1198 input: &HypercoreCashLedgerApply,
1199 ) -> Result<()> {
1200 self.non_crediting_events
1201 .lock()
1202 .expect("non_crediting_events mutex poisoned")
1203 .insert((input.event_hash.clone(), input.amount_usdc));
1204 self.apply_inputs
1205 .lock()
1206 .expect("apply_inputs mutex poisoned")
1207 .push(input.clone());
1208 Ok(())
1209 }
1210
1211 async fn apply_deposit(
1212 &self,
1213 input: &HypercoreCashLedgerApply,
1214 ) -> Result<hypercall_db::HypercoreCashLedgerApplyResult> {
1215 self.apply_calls.fetch_add(1, Ordering::SeqCst);
1216 self.apply_inputs
1217 .lock()
1218 .expect("apply_inputs mutex poisoned")
1219 .push(input.clone());
1220 if let Some(result) = self
1221 .apply_results
1222 .lock()
1223 .expect("apply_results mutex poisoned")
1224 .pop_front()
1225 {
1226 return Ok(result);
1227 }
1228 Ok(hypercall_db::HypercoreCashLedgerApplyResult {
1229 applied: true,
1230 balance_after: Some(Decimal::ONE),
1231 ledger_event_id: Some(1),
1232 })
1233 }
1234 }
1235
1236 struct CountingApplier {
1237 calls: AtomicUsize,
1238 fail_next: AtomicBool,
1239 journal_request_ids: Mutex<Vec<String>>,
1240 }
1241
1242 #[async_trait::async_trait]
1243 impl HypercoreCashDepositApplier for CountingApplier {
1244 async fn apply_deposit(&self, request: DepositRequest) -> Result<()> {
1245 self.calls.fetch_add(1, Ordering::SeqCst);
1246 self.journal_request_ids
1247 .lock()
1248 .expect("journal_request_ids mutex poisoned")
1249 .push(request.journal_request_id);
1250 if self.fail_next.swap(false, Ordering::SeqCst) {
1251 anyhow::bail!("injected engine apply failure");
1252 }
1253 Ok(())
1254 }
1255 }
1256
1257 fn test_observer_with_margin_mode(
1258 mode: Option<hypercall_types::MarginMode>,
1259 ) -> (
1260 HypercoreCashLedgerObserver,
1261 Arc<CountingStore>,
1262 Arc<CountingApplier>,
1263 ) {
1264 let store = Arc::new(CountingStore {
1265 apply_calls: AtomicUsize::new(0),
1266 apply_inputs: Mutex::new(Vec::new()),
1267 apply_results: Mutex::new(VecDeque::new()),
1268 pending_usdc_matches: Mutex::new(VecDeque::new()),
1269 pm_liquidity_matches: Mutex::new(VecDeque::new()),
1270 matched_pm_liquidity_requests: Mutex::new(Vec::new()),
1271 credited_event_wallets: Mutex::new(HashMap::new()),
1272 non_crediting_events: Mutex::new(HashSet::new()),
1273 submitted_usdc_requests: Mutex::new(Vec::new()),
1274 });
1275 let applier = Arc::new(CountingApplier {
1276 calls: AtomicUsize::new(0),
1277 fail_next: AtomicBool::new(false),
1278 journal_request_ids: Mutex::new(Vec::new()),
1279 });
1280 let tier_cache = Arc::new(
1281 TierCache::new(Arc::new(MockTierDb { mode })).expect("tier cache should initialize"),
1282 );
1283 let observer = HypercoreCashLedgerObserver {
1284 store: store.clone(),
1285 applier: applier.clone(),
1286 client: reqwest::Client::new(),
1287 config: HypercoreCashLedgerObserverConfig {
1288 exchange_address: exchange_addr(),
1289 core_deposit_wallet_address: core_deposit_wallet_addr(),
1290 ..Default::default()
1291 },
1292 tier_cache,
1293 };
1294 (observer, store, applier)
1295 }
1296
1297 #[tokio::test]
1298 async fn ws_missing_margin_mode_defaults_to_standard_and_credits() {
1299 let (observer, store, applier) = test_observer_with_margin_mode(None);
1300 let event = LedgerUpdate {
1301 time: 1778789705303_u64,
1302 hash: test_event_hash(1),
1303 evm_tx_hash: None,
1304 delta: LedgerDelta {
1305 delta_type: "send".to_string(),
1306 usdc: None,
1307 amount: Some("1.0".to_string()),
1308 token: Some("USDC".to_string()),
1309 user: Some(sender_addr().to_string()),
1310 destination: Some(exchange_addr().to_string()),
1311 evm_tx_hash: None,
1312 },
1313 };
1314
1315 observer
1316 .handle_ledger_update(event, "websocket")
1317 .await
1318 .expect("missing margin mode should default to Standard");
1319
1320 assert_eq!(store.apply_calls.load(Ordering::SeqCst), 1);
1321 assert_eq!(applier.calls.load(Ordering::SeqCst), 1);
1322 let apply_inputs = store
1323 .apply_inputs
1324 .lock()
1325 .expect("apply_inputs mutex poisoned");
1326 assert_eq!(apply_inputs.len(), 1);
1327 assert_eq!(
1328 apply_inputs[0].wallet,
1329 sender_addr(),
1330 "direct user HyperCore deposits must credit the sender"
1331 );
1332 assert!(
1333 applier
1334 .journal_request_ids
1335 .lock()
1336 .expect("journal_request_ids mutex poisoned")[0]
1337 .parse::<uuid::Uuid>()
1338 .ok()
1339 .is_some(),
1340 "standard-margin HyperCore cash deposits must carry a replayable engine journal request id"
1341 );
1342 }
1343
1344 #[tokio::test]
1345 async fn ws_deposit_replays_engine_apply_after_db_applied_retry() {
1346 let (observer, store, applier) =
1347 test_observer_with_margin_mode(Some(hypercall_types::MarginMode::Standard));
1348 store
1349 .apply_results
1350 .lock()
1351 .expect("apply_results mutex poisoned")
1352 .extend([
1353 hypercall_db::HypercoreCashLedgerApplyResult {
1354 applied: true,
1355 balance_after: Some(Decimal::ONE),
1356 ledger_event_id: Some(41),
1357 },
1358 hypercall_db::HypercoreCashLedgerApplyResult {
1359 applied: false,
1360 balance_after: Some(Decimal::ONE),
1361 ledger_event_id: Some(41),
1362 },
1363 ]);
1364 applier.fail_next.store(true, Ordering::SeqCst);
1365
1366 let event = LedgerUpdate {
1367 time: 1778789705303_u64,
1368 hash: test_event_hash(2),
1369 evm_tx_hash: None,
1370 delta: LedgerDelta {
1371 delta_type: "send".to_string(),
1372 usdc: None,
1373 amount: Some("1.0".to_string()),
1374 token: Some("USDC".to_string()),
1375 user: Some(sender_addr().to_string()),
1376 destination: Some(exchange_addr().to_string()),
1377 evm_tx_hash: None,
1378 },
1379 };
1380
1381 let first = observer
1382 .handle_ledger_update(event.clone(), "websocket")
1383 .await;
1384 assert!(first
1385 .expect_err("first apply should surface injected engine failure")
1386 .to_string()
1387 .contains("injected engine apply failure"));
1388
1389 observer
1390 .handle_ledger_update(event, "websocket")
1391 .await
1392 .expect("retry should replay engine apply using durable ledger_event_id");
1393
1394 assert_eq!(store.apply_calls.load(Ordering::SeqCst), 2);
1395 assert_eq!(applier.calls.load(Ordering::SeqCst), 2);
1396 let journal_request_ids = applier
1397 .journal_request_ids
1398 .lock()
1399 .expect("journal_request_ids mutex poisoned");
1400 assert_eq!(
1401 journal_request_ids.len(),
1402 2,
1403 "both first apply and retry should hand the engine a restart-source request id"
1404 );
1405 assert_eq!(
1406 journal_request_ids[0], journal_request_ids[1],
1407 "retries for the same durable ledger event must reuse the same engine journal request id"
1408 );
1409 assert!(
1410 journal_request_ids[0]
1411 .parse::<uuid::Uuid>()
1412 .ok()
1413 .is_some(),
1414 "standard-margin HyperCore cash deposits must carry a replayable engine journal request id"
1415 );
1416 }
1417
1418 #[tokio::test]
1419 async fn core_deposit_wallet_originated_deposit_credits_matched_usdc_account_not_exchange() {
1420 let (observer, store, applier) =
1421 test_observer_with_margin_mode(Some(hypercall_types::MarginMode::Standard));
1422 let credited_account = WalletAddress::from(alloy::primitives::address!(
1423 "bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb"
1424 ));
1425 store
1426 .pending_usdc_matches
1427 .lock()
1428 .expect("pending_usdc_matches mutex poisoned")
1429 .push_back(hypercall_db::RsmUsdcDepositMatch {
1430 request_id: "usdc-request-1".to_string(),
1431 account: credited_account,
1432 tx_hash: "0xevm".to_string(),
1433 log_index: 3,
1434 amount_wei: "1000000".to_string(),
1435 token: WalletAddress::from(alloy::primitives::address!(
1436 "cccccccccccccccccccccccccccccccccccccccc"
1437 )),
1438 });
1439
1440 let event = LedgerUpdate {
1441 time: 1778789705303_u64,
1442 hash: test_event_hash(3),
1443 evm_tx_hash: Some("0xevm".to_string()),
1444 delta: LedgerDelta {
1445 delta_type: "send".to_string(),
1446 usdc: None,
1447 amount: Some("1.0".to_string()),
1448 token: Some("USDC".to_string()),
1449 user: Some(core_deposit_wallet_addr().to_string()),
1450 destination: Some(exchange_addr().to_string()),
1451 evm_tx_hash: None,
1452 },
1453 };
1454
1455 observer
1456 .handle_ledger_update(event, "websocket")
1457 .await
1458 .expect("matched Exchange-originated deposit should credit attributed account");
1459
1460 let apply_inputs = store
1461 .apply_inputs
1462 .lock()
1463 .expect("apply_inputs mutex poisoned");
1464 assert_eq!(apply_inputs.len(), 1);
1465 assert_eq!(apply_inputs[0].wallet, credited_account);
1466 assert_ne!(
1467 apply_inputs[0].wallet,
1468 exchange_addr(),
1469 "Exchange-originated deposits must not credit Exchange itself"
1470 );
1471 assert_ne!(
1472 apply_inputs[0].wallet,
1473 core_deposit_wallet_addr(),
1474 "CoreDepositWallet-originated deposits must not credit CoreDepositWallet itself"
1475 );
1476 assert_eq!(apply_inputs[0].event_hash, test_event_hash_string(3));
1477 assert_eq!(applier.calls.load(Ordering::SeqCst), 1);
1478 assert_eq!(
1479 store
1480 .submitted_usdc_requests
1481 .lock()
1482 .expect("submitted_usdc_requests mutex poisoned")
1483 .as_slice(),
1484 &["usdc-request-1".to_string()]
1485 );
1486 }
1487
1488 #[tokio::test]
1489 async fn core_deposit_wallet_originated_pm_liquidity_with_evm_tx_is_consumed_and_not_credited()
1490 {
1491 let (observer, store, applier) =
1492 test_observer_with_margin_mode(Some(hypercall_types::MarginMode::Standard));
1493 store
1494 .pm_liquidity_matches
1495 .lock()
1496 .expect("pm_liquidity_matches mutex poisoned")
1497 .push_back(hypercall_db::RsmUsdcDepositMatch {
1498 request_id: "pm-liquidity-request-1".to_string(),
1499 account: sender_addr(),
1500 tx_hash: "0xpm-liquidity".to_string(),
1501 log_index: 9,
1502 amount_wei: "1000000".to_string(),
1503 token: WalletAddress::from(alloy::primitives::address!(
1504 "cccccccccccccccccccccccccccccccccccccccc"
1505 )),
1506 });
1507
1508 let event = LedgerUpdate {
1509 time: 1778789705303_u64,
1510 hash: test_event_hash(7),
1511 evm_tx_hash: Some("0xpm-liquidity".to_string()),
1512 delta: LedgerDelta {
1513 delta_type: "send".to_string(),
1514 usdc: None,
1515 amount: Some("1.0".to_string()),
1516 token: Some("USDC".to_string()),
1517 user: Some(core_deposit_wallet_addr().to_string()),
1518 destination: Some(exchange_addr().to_string()),
1519 evm_tx_hash: None,
1520 },
1521 };
1522
1523 observer
1524 .handle_ledger_update(event, "websocket")
1525 .await
1526 .expect("PM liquidity deposit should be recorded as non-crediting");
1527
1528 assert_eq!(store.apply_calls.load(Ordering::SeqCst), 0);
1529 assert_eq!(applier.calls.load(Ordering::SeqCst), 0);
1530 assert_eq!(
1531 store
1532 .matched_pm_liquidity_requests
1533 .lock()
1534 .expect("matched_pm_liquidity_requests mutex poisoned")
1535 .as_slice(),
1536 &[(
1537 "pm-liquidity-request-1".to_string(),
1538 test_event_hash_string(7)
1539 )]
1540 );
1541 assert!(store
1542 .pm_liquidity_matches
1543 .lock()
1544 .expect("pm_liquidity_matches mutex poisoned")
1545 .is_empty());
1546 let apply_inputs = store
1547 .apply_inputs
1548 .lock()
1549 .expect("apply_inputs mutex poisoned");
1550 assert_eq!(apply_inputs.len(), 1);
1551 assert_eq!(apply_inputs[0].event_hash, test_event_hash_string(7));
1552 }
1553
1554 #[tokio::test]
1555 async fn core_deposit_wallet_originated_pm_liquidity_amount_match_is_consumed_and_not_credited()
1556 {
1557 let (observer, store, applier) =
1558 test_observer_with_margin_mode(Some(hypercall_types::MarginMode::Standard));
1559 store
1560 .pm_liquidity_matches
1561 .lock()
1562 .expect("pm_liquidity_matches mutex poisoned")
1563 .push_back(hypercall_db::RsmUsdcDepositMatch {
1564 request_id: "pm-liquidity-request-2".to_string(),
1565 account: sender_addr(),
1566 tx_hash: "0xpm-liquidity-amount".to_string(),
1567 log_index: 10,
1568 amount_wei: "1000000".to_string(),
1569 token: WalletAddress::from(alloy::primitives::address!(
1570 "cccccccccccccccccccccccccccccccccccccccc"
1571 )),
1572 });
1573
1574 let event = LedgerUpdate {
1575 time: 1778789705303_u64,
1576 hash: test_event_hash(8),
1577 evm_tx_hash: None,
1578 delta: LedgerDelta {
1579 delta_type: "send".to_string(),
1580 usdc: None,
1581 amount: Some("1.0".to_string()),
1582 token: Some("USDC".to_string()),
1583 user: Some(core_deposit_wallet_addr().to_string()),
1584 destination: Some(exchange_addr().to_string()),
1585 evm_tx_hash: None,
1586 },
1587 };
1588
1589 observer
1590 .handle_ledger_update(event, "websocket")
1591 .await
1592 .expect("amount-matched PM liquidity deposit should be recorded as non-crediting");
1593
1594 assert_eq!(store.apply_calls.load(Ordering::SeqCst), 0);
1595 assert_eq!(applier.calls.load(Ordering::SeqCst), 0);
1596 assert_eq!(
1597 store
1598 .matched_pm_liquidity_requests
1599 .lock()
1600 .expect("matched_pm_liquidity_requests mutex poisoned")
1601 .as_slice(),
1602 &[(
1603 "pm-liquidity-request-2".to_string(),
1604 test_event_hash_string(8)
1605 )]
1606 );
1607 assert!(store
1608 .pm_liquidity_matches
1609 .lock()
1610 .expect("pm_liquidity_matches mutex poisoned")
1611 .is_empty());
1612 let apply_inputs = store
1613 .apply_inputs
1614 .lock()
1615 .expect("apply_inputs mutex poisoned");
1616 assert_eq!(apply_inputs.len(), 1);
1617 assert_eq!(apply_inputs[0].event_hash, test_event_hash_string(8));
1618 }
1619
1620 #[tokio::test]
1621 async fn replayed_pm_liquidity_non_crediting_event_does_not_rematch_or_credit() {
1622 let (observer, store, applier) =
1623 test_observer_with_margin_mode(Some(hypercall_types::MarginMode::Standard));
1624 store
1625 .pm_liquidity_matches
1626 .lock()
1627 .expect("pm_liquidity_matches mutex poisoned")
1628 .push_back(hypercall_db::RsmUsdcDepositMatch {
1629 request_id: "pm-liquidity-request-replay".to_string(),
1630 account: sender_addr(),
1631 tx_hash: "0xpm-liquidity-replay".to_string(),
1632 log_index: 11,
1633 amount_wei: "1000000".to_string(),
1634 token: WalletAddress::from(alloy::primitives::address!(
1635 "cccccccccccccccccccccccccccccccccccccccc"
1636 )),
1637 });
1638
1639 let event = || LedgerUpdate {
1640 time: 1778789705303_u64,
1641 hash: test_event_hash(9),
1642 evm_tx_hash: Some("0xpm-liquidity-replay".to_string()),
1643 delta: LedgerDelta {
1644 delta_type: "send".to_string(),
1645 usdc: None,
1646 amount: Some("1.0".to_string()),
1647 token: Some("USDC".to_string()),
1648 user: Some(core_deposit_wallet_addr().to_string()),
1649 destination: Some(exchange_addr().to_string()),
1650 evm_tx_hash: None,
1651 },
1652 };
1653
1654 observer
1655 .handle_ledger_update(event(), "websocket")
1656 .await
1657 .expect("first PM liquidity deposit should be recorded as non-crediting");
1658 observer
1659 .handle_ledger_update(event(), "replay")
1660 .await
1661 .expect("replayed PM liquidity deposit should stay non-crediting");
1662
1663 assert_eq!(store.apply_calls.load(Ordering::SeqCst), 0);
1664 assert_eq!(applier.calls.load(Ordering::SeqCst), 0);
1665 assert_eq!(
1666 store
1667 .matched_pm_liquidity_requests
1668 .lock()
1669 .expect("matched_pm_liquidity_requests mutex poisoned")
1670 .as_slice(),
1671 &[(
1672 "pm-liquidity-request-replay".to_string(),
1673 test_event_hash_string(9)
1674 )],
1675 "replay must not try to consume the PM liquidity request twice"
1676 );
1677 let apply_inputs = store
1678 .apply_inputs
1679 .lock()
1680 .expect("apply_inputs mutex poisoned");
1681 assert_eq!(apply_inputs.len(), 2);
1682 assert!(apply_inputs
1683 .iter()
1684 .all(|input| input.event_hash == test_event_hash_string(9)));
1685 }
1686
1687 #[tokio::test]
1688 async fn core_deposit_wallet_originated_deposit_without_evm_event_fails_before_credit() {
1689 let (observer, store, applier) =
1690 test_observer_with_margin_mode(Some(hypercall_types::MarginMode::Standard));
1691 let event = LedgerUpdate {
1692 time: 1778789705303_u64,
1693 hash: test_event_hash(4),
1694 evm_tx_hash: None,
1695 delta: LedgerDelta {
1696 delta_type: "send".to_string(),
1697 usdc: None,
1698 amount: Some("1.0".to_string()),
1699 token: Some("USDC".to_string()),
1700 user: Some(core_deposit_wallet_addr().to_string()),
1701 destination: Some(exchange_addr().to_string()),
1702 evm_tx_hash: None,
1703 },
1704 };
1705
1706 let error = observer
1707 .handle_ledger_update(event, "websocket")
1708 .await
1709 .expect_err("unmatched CoreDepositWallet-originated deposit must not be credited");
1710 assert!(
1711 error
1712 .to_string()
1713 .contains("no pending Exchange.UsdcDeposit"),
1714 "unexpected error: {error:#}"
1715 );
1716 assert_eq!(store.apply_calls.load(Ordering::SeqCst), 0);
1717 assert!(
1718 store
1719 .apply_inputs
1720 .lock()
1721 .expect("apply_inputs mutex poisoned")
1722 .is_empty(),
1723 "unattributed CoreDepositWallet-originated deposits must not write a cash ledger row"
1724 );
1725 assert_eq!(applier.calls.load(Ordering::SeqCst), 0);
1726 }
1727
1728 #[tokio::test]
1729 async fn exchange_originated_deposit_replay_uses_existing_credited_wallet() {
1730 let (observer, store, applier) =
1731 test_observer_with_margin_mode(Some(hypercall_types::MarginMode::Standard));
1732 let credited_account = WalletAddress::from(alloy::primitives::address!(
1733 "bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb"
1734 ));
1735 store
1736 .credited_event_wallets
1737 .lock()
1738 .expect("credited_event_wallets mutex poisoned")
1739 .insert(test_event_hash_string(5), credited_account);
1740
1741 let event = LedgerUpdate {
1742 time: 1778789705303_u64,
1743 hash: test_event_hash(5),
1744 evm_tx_hash: None,
1745 delta: LedgerDelta {
1746 delta_type: "send".to_string(),
1747 usdc: None,
1748 amount: Some("1.0".to_string()),
1749 token: Some("USDC".to_string()),
1750 user: Some(exchange_addr().to_string()),
1751 destination: Some(exchange_addr().to_string()),
1752 evm_tx_hash: None,
1753 },
1754 };
1755
1756 observer
1757 .handle_ledger_update(event, "websocket")
1758 .await
1759 .expect("replayed matched event should reuse credited wallet");
1760
1761 let apply_inputs = store
1762 .apply_inputs
1763 .lock()
1764 .expect("apply_inputs mutex poisoned");
1765 assert_eq!(apply_inputs.len(), 1);
1766 assert_eq!(apply_inputs[0].wallet, credited_account);
1767 assert!(
1768 store
1769 .submitted_usdc_requests
1770 .lock()
1771 .expect("submitted_usdc_requests mutex poisoned")
1772 .is_empty(),
1773 "replay path must not require a pending EVM request"
1774 );
1775 assert_eq!(applier.calls.load(Ordering::SeqCst), 1);
1776 }
1777
1778 #[tokio::test]
1779 async fn exchange_originated_deposit_replay_marks_recovered_pending_request_submitted() {
1780 let (observer, store, _applier) =
1781 test_observer_with_margin_mode(Some(hypercall_types::MarginMode::Standard));
1782 let credited_account = WalletAddress::from(alloy::primitives::address!(
1783 "bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb"
1784 ));
1785 store
1786 .credited_event_wallets
1787 .lock()
1788 .expect("credited_event_wallets mutex poisoned")
1789 .insert(test_event_hash_string(6), credited_account);
1790 store
1791 .pending_usdc_matches
1792 .lock()
1793 .expect("pending_usdc_matches mutex poisoned")
1794 .push_back(hypercall_db::RsmUsdcDepositMatch {
1795 request_id: "usdc-request-recovered".to_string(),
1796 account: credited_account,
1797 tx_hash: "0xevm".to_string(),
1798 log_index: 7,
1799 amount_wei: "1000000".to_string(),
1800 token: WalletAddress::from(alloy::primitives::address!(
1801 "cccccccccccccccccccccccccccccccccccccccc"
1802 )),
1803 });
1804
1805 let event = LedgerUpdate {
1806 time: 1778789705303_u64,
1807 hash: test_event_hash(6),
1808 evm_tx_hash: None,
1809 delta: LedgerDelta {
1810 delta_type: "send".to_string(),
1811 usdc: None,
1812 amount: Some("1.0".to_string()),
1813 token: Some("USDC".to_string()),
1814 user: Some(exchange_addr().to_string()),
1815 destination: Some(exchange_addr().to_string()),
1816 evm_tx_hash: None,
1817 },
1818 };
1819
1820 observer
1821 .handle_ledger_update(event, "websocket")
1822 .await
1823 .expect("replayed credited event should recover and submit the pending request id");
1824
1825 assert_eq!(
1826 store
1827 .submitted_usdc_requests
1828 .lock()
1829 .expect("submitted_usdc_requests mutex poisoned")
1830 .as_slice(),
1831 &["usdc-request-recovered".to_string()]
1832 );
1833 }
1834
1835 #[test]
1836 fn usdc_decimal_to_wei_string_requires_exact_six_decimals() {
1837 assert_eq!(
1838 usdc_decimal_to_wei_string(dec!(1.234567)).unwrap(),
1839 "1234567"
1840 );
1841 assert!(usdc_decimal_to_wei_string(dec!(1.0000001)).is_err());
1842 }
1843
1844 #[test]
1845 fn config_from_runtime_includes_ws_url() {
1846 let runtime = crate::backend_config::OnchainDepositsRuntimeConfig {
1847 ws_url: Some("wss://hydromancer.example.com/ws".to_string()),
1848 ..Default::default()
1849 };
1850 let config = HypercoreCashLedgerObserverConfig::from_runtime_config(
1851 &runtime,
1852 "https://api.example.com/info",
1853 exchange_addr(),
1854 core_deposit_wallet_addr(),
1855 );
1856 assert_eq!(
1857 config.ws_url.as_deref(),
1858 Some("wss://hydromancer.example.com/ws")
1859 );
1860 }
1861
1862 #[test]
1863 fn config_from_runtime_without_ws_url() {
1864 let runtime = crate::backend_config::OnchainDepositsRuntimeConfig::default();
1865 let config = HypercoreCashLedgerObserverConfig::from_runtime_config(
1866 &runtime,
1867 "https://api.example.com/info",
1868 exchange_addr(),
1869 core_deposit_wallet_addr(),
1870 );
1871 assert!(config.ws_url.is_none());
1872 }
1873}