Skip to main content

hypercall_admin/monitoring/
accounts.rs

1//! Account / position / deposit listing admin endpoints.
2
3use axum::{
4    extract::{Query, State},
5    http::StatusCode,
6    response::IntoResponse,
7};
8use rust_decimal::Decimal;
9use rust_decimal_macros::dec;
10use serde::{Deserialize, Serialize};
11use sonic_rs::json;
12use std::collections::HashMap;
13
14use crate::state::AdminState;
15use hypercall_db::{DepositMonitoringRow, RsmCreditReader};
16use hypercall_runtime_api::sonic_json::SonicJson;
17
18#[derive(Debug, Deserialize, utoipa::IntoParams, utoipa::ToSchema)]
19pub struct DepositsQuery {
20    #[serde(default = "default_deposits_limit")]
21    pub limit: usize,
22    #[serde(default)]
23    pub offset: usize,
24}
25
26fn default_deposits_limit() -> usize {
27    100
28}
29
30#[derive(Debug, Serialize, utoipa::ToSchema)]
31pub struct DepositMonitoringRowResponse {
32    pub source: String,
33    pub status: String,
34    pub correlation_status: String,
35    pub wallet: String,
36    pub amount_usdc: String,
37    pub event_hash: String,
38    pub tx_hash: Option<String>,
39    pub request_id: Option<String>,
40    pub observed_block: Option<i64>,
41    pub log_index: Option<i64>,
42    pub ledger_event_id: Option<i64>,
43    pub created_at: String,
44    pub updated_at: String,
45}
46
47#[derive(Debug, Serialize, utoipa::ToSchema)]
48pub struct DepositsResponse {
49    pub count: usize,
50    pub rows: Vec<DepositMonitoringRowResponse>,
51}
52
53fn deposits_response_from_rows(rows: Vec<DepositMonitoringRow>) -> DepositsResponse {
54    let rows: Vec<_> = rows
55        .into_iter()
56        .map(|row| DepositMonitoringRowResponse {
57            source: row.source,
58            status: row.status,
59            correlation_status: row.correlation_status,
60            wallet: row.wallet.to_string(),
61            amount_usdc: row.amount_usdc,
62            event_hash: row.event_hash,
63            tx_hash: row.tx_hash,
64            request_id: row.request_id,
65            observed_block: row.observed_block,
66            log_index: row.log_index,
67            ledger_event_id: row.ledger_event_id,
68            created_at: row.created_at,
69            updated_at: row.updated_at,
70        })
71        .collect();
72    let count = rows.len();
73    DepositsResponse { count, rows }
74}
75
76/// GET /monitoring/deposits - Recent deposit attribution and cash ledger rows.
77#[utoipa::path(
78    get,
79    path = "/monitoring/deposits",
80    params(DepositsQuery),
81    responses(
82        (status = 200, description = "Recent deposit rows", body = DepositsResponse),
83        (status = 400, description = "Invalid pagination parameters"),
84        (status = 401, description = "Invalid or missing X-Admin-Key header"),
85        (status = 500, description = "Deposit monitoring query failed"),
86        (status = 503, description = "Database handler not available")
87    ),
88    tag = "Monitoring",
89    security(("admin_key" = []))
90)]
91pub async fn list_deposits(
92    State(app_state): State<AdminState>,
93    Query(params): Query<DepositsQuery>,
94) -> impl IntoResponse {
95    let db_handler = match app_state.sync_db.as_ref() {
96        Some(handler) => handler,
97        None => {
98            return (
99                StatusCode::SERVICE_UNAVAILABLE,
100                SonicJson(json!({ "error": "database handler not available" })),
101            )
102                .into_response();
103        }
104    };
105
106    let limit = match i64::try_from(params.limit) {
107        Ok(limit) => limit,
108        Err(_) => {
109            return (
110                StatusCode::BAD_REQUEST,
111                SonicJson(json!({ "error": "limit is too large" })),
112            )
113                .into_response();
114        }
115    };
116    let offset = match i64::try_from(params.offset) {
117        Ok(offset) => offset,
118        Err(_) => {
119            return (
120                StatusCode::BAD_REQUEST,
121                SonicJson(json!({ "error": "offset is too large" })),
122            )
123                .into_response();
124        }
125    };
126
127    let rsm_credit_reader: &dyn RsmCreditReader = db_handler.as_ref();
128    match rsm_credit_reader
129        .list_recent_cash_deposit_monitoring_rows(limit, offset)
130        .await
131    {
132        Ok(rows) => (StatusCode::OK, SonicJson(deposits_response_from_rows(rows))).into_response(),
133        Err(error) => {
134            tracing::error!(%error, "deposit monitoring query failed");
135            (
136                StatusCode::INTERNAL_SERVER_ERROR,
137                SonicJson(json!({ "error": "deposit monitoring query failed" })),
138            )
139                .into_response()
140        }
141    }
142}
143
144/// GET /monitoring/accounts - List all accounts with balances
145///
146/// **Authentication**: Requires `X-Admin-Key` header (performance protection).
147#[utoipa::path(
148    get,
149    path = "/monitoring/accounts",
150    responses(
151        (status = 200, description = "Account list", body = MonitoringAccountsResponse),
152        (status = 401, description = "Invalid or missing X-Admin-Key header")
153    ),
154    tag = "Monitoring",
155    security(("admin_key" = []))
156)]
157pub async fn list_accounts(State(app_state): State<AdminState>) -> impl IntoResponse {
158    use hypercall_types::api_models::{MonitoringAccountSummary, MonitoringAccountsResponse};
159
160    // Get all portfolios from cache
161    let portfolios = app_state.portfolio_cache.get_all_portfolios().await;
162
163    let mut accounts: Vec<MonitoringAccountSummary> = Vec::new();
164
165    for (wallet, summary) in portfolios {
166        let position_count = summary.positions.len();
167        let total_notional: Decimal = summary
168            .positions
169            .values()
170            .map(|p| p.amount.abs() * p.entry_price)
171            .sum();
172
173        // Use the same margin-mode-aware calculation as GET /portfolio.
174        // STANDARD accounts must not be pushed through the PM SPAN pipeline.
175        let (margin_mode, equity, margin_used, margin_error) = match app_state
176            .portfolio_cache
177            .compute_wallet_margin_snapshot(&wallet)
178            .await
179        {
180            Ok(snapshot) => (
181                snapshot.mode.as_str().to_string(),
182                Some(snapshot.margin_summary.equity.to_string()),
183                Some(snapshot.total_margin_used.to_string()),
184                None,
185            ),
186            Err(e) => {
187                tracing::warn!(
188                    wallet = %wallet,
189                    error = %e,
190                    "Failed to compute margin for monitoring account"
191                );
192                ("unknown".to_string(), None, None, Some(e.to_string()))
193            }
194        };
195
196        accounts.push(MonitoringAccountSummary {
197            wallet: wallet.to_string(),
198            margin_mode,
199            equity,
200            margin_used,
201            position_count,
202            total_notional: total_notional.to_string(),
203            margin_error,
204        });
205    }
206
207    // Sort by equity descending. Parse to Decimal (not f64) so large equities
208    // are not subject to floating-point precision loss; wallets with no equity
209    // value sort last.
210    accounts.sort_by(|a, b| {
211        let equity = |value: &Option<String>| {
212            value
213                .as_deref()
214                .and_then(|v| v.parse::<rust_decimal::Decimal>().ok())
215        };
216        match (equity(&b.equity), equity(&a.equity)) {
217            (Some(eb), Some(ea)) => eb.cmp(&ea),
218            (Some(_), None) => std::cmp::Ordering::Less,
219            (None, Some(_)) => std::cmp::Ordering::Greater,
220            (None, None) => std::cmp::Ordering::Equal,
221        }
222    });
223
224    SonicJson(MonitoringAccountsResponse {
225        account_count: accounts.len(),
226        accounts,
227    })
228}
229
230/// GET /monitoring/positions - List all open positions grouped by symbol
231///
232/// **Authentication**: Requires `X-Admin-Key` header (performance protection).
233#[utoipa::path(
234    get,
235    path = "/monitoring/positions",
236    responses(
237        (status = 200, description = "Position summary by symbol", body = MonitoringPositionsResponse),
238        (status = 401, description = "Invalid or missing X-Admin-Key header")
239    ),
240    tag = "Monitoring",
241    security(("admin_key" = []))
242)]
243pub async fn list_positions(State(app_state): State<AdminState>) -> impl IntoResponse {
244    use hypercall_types::api_models::{
245        MonitoringPositionHolder, MonitoringPositionsResponse, MonitoringSymbolPosition,
246    };
247
248    let portfolios = app_state.portfolio_cache.get_all_portfolios().await;
249
250    // Aggregate positions by symbol
251    let mut by_symbol: HashMap<String, SymbolPositionSummaryInternal> = HashMap::new();
252
253    for (wallet, summary) in portfolios {
254        for (symbol, pos) in &summary.positions {
255            let entry =
256                by_symbol
257                    .entry(symbol.clone())
258                    .or_insert_with(|| SymbolPositionSummaryInternal {
259                        symbol: symbol.clone(),
260                        total_long: dec!(0),
261                        total_short: dec!(0),
262                        net_position: dec!(0),
263                        holder_count: 0,
264                        holders: Vec::new(),
265                    });
266
267            if pos.amount > dec!(0) {
268                entry.total_long += pos.amount;
269            } else {
270                entry.total_short += pos.amount.abs();
271            }
272            entry.net_position += pos.amount;
273            entry.holder_count += 1;
274            entry.holders.push(MonitoringPositionHolder {
275                wallet: wallet.to_string(),
276                amount: pos.amount.to_string(),
277                entry_price: pos.entry_price.to_string(),
278                unrealized_pnl: pos.unrealized_pnl.to_string(),
279            });
280        }
281    }
282
283    let mut symbols: Vec<MonitoringSymbolPosition> = by_symbol
284        .values()
285        .map(|s| MonitoringSymbolPosition {
286            symbol: s.symbol.clone(),
287            total_long: s.total_long.to_string(),
288            total_short: s.total_short.to_string(),
289            net_position: s.net_position.to_string(),
290            is_balanced: s.net_position.abs() < dec!(0.0001),
291            holder_count: s.holder_count,
292            holders: s.holders.clone(),
293        })
294        .collect();
295
296    // Sort by absolute net position descending (show imbalances first). Parse to
297    // Decimal (not f64) to avoid floating-point precision loss on large positions.
298    symbols.sort_by(|a, b| {
299        let abs_net = |value: &str| {
300            value
301                .parse::<rust_decimal::Decimal>()
302                .map(|d| d.abs())
303                .unwrap_or_default()
304        };
305        abs_net(&b.net_position).cmp(&abs_net(&a.net_position))
306    });
307
308    SonicJson(MonitoringPositionsResponse {
309        symbol_count: symbols.len(),
310        symbols,
311    })
312}
313
314#[derive(Debug)]
315struct SymbolPositionSummaryInternal {
316    symbol: String,
317    total_long: Decimal,
318    total_short: Decimal,
319    net_position: Decimal,
320    holder_count: usize,
321    holders: Vec<hypercall_types::api_models::MonitoringPositionHolder>,
322}