Skip to main content

hypercall/startup/
hydromancer.rs

1//! Startup wiring for Hydromancer services.
2
3use std::collections::HashMap;
4use std::str::FromStr;
5use std::sync::Arc;
6
7use crate::db_handler::DbPool;
8use crate::price_oracle::hyperliquid_oracle::HyperliquidMarkPriceOracle;
9use hypercall_types::WalletAddress;
10use tracing::{debug, info, warn};
11
12/// Recover stuck EXPIRED_PENDING_PRICE instruments using Hydromancer fallback.
13///
14/// Called at startup to settle instruments that got stuck due to a restart
15/// during the settlement window.
16pub async fn recover_stuck_settlements(
17    pool: &Arc<DbPool>,
18    oracles: &HashMap<String, Arc<HyperliquidMarkPriceOracle>>,
19) -> anyhow::Result<usize> {
20    use crate::rsm::margin_manager::expiry_date_to_timestamp;
21    use anyhow::Context;
22    use diesel::prelude::*;
23    use hypercall_db_diesel::models::Instrument;
24    use hypercall_db_diesel::schema::instruments;
25
26    let stuck_instruments: Vec<Instrument> = {
27        let mut conn = pool
28            .get()
29            .context("Failed to get DB connection for recovery")?;
30        instruments::table
31            .filter(instruments::status.eq("EXPIRED_PENDING_PRICE"))
32            .order(instruments::expiry.asc())
33            .load::<Instrument>(&mut conn)?
34    };
35
36    if stuck_instruments.is_empty() {
37        return Ok(0);
38    }
39
40    info!(
41        "Found {} EXPIRED_PENDING_PRICE instruments for Hydromancer recovery",
42        stuck_instruments.len()
43    );
44
45    let mut settled_count = 0usize;
46
47    for instrument in &stuck_instruments {
48        let expiry_ts =
49            expiry_date_to_timestamp(&instrument.underlying, instrument.expiry as u64) as i64;
50
51        let oracle = match oracles.get(&instrument.underlying) {
52            Some(o) => o,
53            None => {
54                warn!(
55                    "No oracle for underlying {} (instrument {}), skipping recovery",
56                    instrument.underlying, instrument.id
57                );
58                continue;
59            }
60        };
61
62        if let Some(ref writer) = oracle.config().oracle_writer {
63            match writer.get_oracle_settlement_price_sync(&instrument.underlying, expiry_ts) {
64                Ok(Some(_)) => {
65                    debug!(
66                        "Settlement already exists for {} expiry={}, skipping",
67                        instrument.underlying, instrument.expiry
68                    );
69                    continue;
70                }
71                Ok(None) => {}
72                Err(e) => {
73                    warn!(
74                        "Failed to check settlement for {} expiry={}: {}",
75                        instrument.underlying, instrument.expiry, e
76                    );
77                    continue;
78                }
79            }
80        }
81
82        if let Some(_price) = oracle.attempt_hydromancer_fallback(expiry_ts).await {
83            settled_count += 1;
84            info!(
85                "Hydromancer recovery settled: instrument={}, underlying={}, expiry={}",
86                instrument.id, instrument.underlying, instrument.expiry
87            );
88        }
89    }
90
91    Ok(settled_count)
92}
93
94/// Account + manager pair from Exchange contract.
95pub struct ManagedAccountPair {
96    pub account: WalletAddress,
97    pub manager: WalletAddress,
98}
99
100/// Load all account addresses from the Exchange contract via RPC.
101/// Returns empty vec on any failure (non-fatal for feed startup).
102pub async fn load_exchange_accounts(rpc_url: &str, exchange_addr: &str) -> Vec<ManagedAccountPair> {
103    use alloy::primitives::{Address, U256};
104    use alloy::providers::{DynProvider, Provider, ProviderBuilder};
105    use alloy::sol;
106
107    sol! {
108        #[sol(rpc)]
109        contract ExchangeReader {
110            struct ApiWallet { bytes32 name; address addr; }
111            struct ManagedAccount { address account; address manager; ApiWallet[] apiWallets; }
112            function getManagedAccountsCount() external view returns (uint256);
113            function getManagedAccount(uint256 index) external view returns (ManagedAccount memory);
114        }
115    }
116
117    let Ok(url) = rpc_url.parse() else {
118        warn!("load_exchange_accounts: invalid RPC URL");
119        return vec![];
120    };
121    let Ok(addr) = Address::from_str(exchange_addr) else {
122        warn!("load_exchange_accounts: invalid Exchange address");
123        return vec![];
124    };
125
126    let provider: DynProvider = ProviderBuilder::new().connect_http(url).erased();
127    let exchange = ExchangeReader::new(addr, &provider);
128
129    let count: usize = match exchange.getManagedAccountsCount().call().await {
130        Ok(c) => c.try_into().unwrap_or(0usize),
131        Err(e) => {
132            warn!(
133                "load_exchange_accounts: getManagedAccountsCount failed: {}",
134                e
135            );
136            return vec![];
137        }
138    };
139
140    let mut accounts = Vec::with_capacity(count);
141    for i in 0..count {
142        match exchange.getManagedAccount(U256::from(i)).call().await {
143            Ok(result) => {
144                accounts.push(ManagedAccountPair {
145                    account: WalletAddress::from(result.account),
146                    manager: WalletAddress::from(result.manager),
147                });
148            }
149            Err(e) => {
150                warn!(index = i, "load_exchange_accounts: skipping account: {}", e);
151            }
152        }
153    }
154    accounts
155}