hypercall_admin/monitoring/
accounts.rs1use axum::{
4 extract::{Query, State},
5 http::StatusCode,
6 response::IntoResponse,
7};
8use rust_decimal::Decimal;
9use rust_decimal_macros::dec;
10use serde::{Deserialize, Serialize};
11use sonic_rs::json;
12use std::collections::HashMap;
13
14use crate::state::AdminState;
15use hypercall_db::{DepositMonitoringRow, RsmCreditReader};
16use hypercall_runtime_api::sonic_json::SonicJson;
17
18#[derive(Debug, Deserialize, utoipa::IntoParams, utoipa::ToSchema)]
19pub struct DepositsQuery {
20 #[serde(default = "default_deposits_limit")]
21 pub limit: usize,
22 #[serde(default)]
23 pub offset: usize,
24}
25
26fn default_deposits_limit() -> usize {
27 100
28}
29
30#[derive(Debug, Serialize, utoipa::ToSchema)]
31pub struct DepositMonitoringRowResponse {
32 pub source: String,
33 pub status: String,
34 pub correlation_status: String,
35 pub wallet: String,
36 pub amount_usdc: String,
37 pub event_hash: String,
38 pub tx_hash: Option<String>,
39 pub request_id: Option<String>,
40 pub observed_block: Option<i64>,
41 pub log_index: Option<i64>,
42 pub ledger_event_id: Option<i64>,
43 pub created_at: String,
44 pub updated_at: String,
45}
46
47#[derive(Debug, Serialize, utoipa::ToSchema)]
48pub struct DepositsResponse {
49 pub count: usize,
50 pub rows: Vec<DepositMonitoringRowResponse>,
51}
52
53fn deposits_response_from_rows(rows: Vec<DepositMonitoringRow>) -> DepositsResponse {
54 let rows: Vec<_> = rows
55 .into_iter()
56 .map(|row| DepositMonitoringRowResponse {
57 source: row.source,
58 status: row.status,
59 correlation_status: row.correlation_status,
60 wallet: row.wallet.to_string(),
61 amount_usdc: row.amount_usdc,
62 event_hash: row.event_hash,
63 tx_hash: row.tx_hash,
64 request_id: row.request_id,
65 observed_block: row.observed_block,
66 log_index: row.log_index,
67 ledger_event_id: row.ledger_event_id,
68 created_at: row.created_at,
69 updated_at: row.updated_at,
70 })
71 .collect();
72 let count = rows.len();
73 DepositsResponse { count, rows }
74}
75
76#[utoipa::path(
78 get,
79 path = "/monitoring/deposits",
80 params(DepositsQuery),
81 responses(
82 (status = 200, description = "Recent deposit rows", body = DepositsResponse),
83 (status = 400, description = "Invalid pagination parameters"),
84 (status = 401, description = "Invalid or missing X-Admin-Key header"),
85 (status = 500, description = "Deposit monitoring query failed"),
86 (status = 503, description = "Database handler not available")
87 ),
88 tag = "Monitoring",
89 security(("admin_key" = []))
90)]
91pub async fn list_deposits(
92 State(app_state): State<AdminState>,
93 Query(params): Query<DepositsQuery>,
94) -> impl IntoResponse {
95 let db_handler = match app_state.sync_db.as_ref() {
96 Some(handler) => handler,
97 None => {
98 return (
99 StatusCode::SERVICE_UNAVAILABLE,
100 SonicJson(json!({ "error": "database handler not available" })),
101 )
102 .into_response();
103 }
104 };
105
106 let limit = match i64::try_from(params.limit) {
107 Ok(limit) => limit,
108 Err(_) => {
109 return (
110 StatusCode::BAD_REQUEST,
111 SonicJson(json!({ "error": "limit is too large" })),
112 )
113 .into_response();
114 }
115 };
116 let offset = match i64::try_from(params.offset) {
117 Ok(offset) => offset,
118 Err(_) => {
119 return (
120 StatusCode::BAD_REQUEST,
121 SonicJson(json!({ "error": "offset is too large" })),
122 )
123 .into_response();
124 }
125 };
126
127 let rsm_credit_reader: &dyn RsmCreditReader = db_handler.as_ref();
128 match rsm_credit_reader
129 .list_recent_cash_deposit_monitoring_rows(limit, offset)
130 .await
131 {
132 Ok(rows) => (StatusCode::OK, SonicJson(deposits_response_from_rows(rows))).into_response(),
133 Err(error) => {
134 tracing::error!(%error, "deposit monitoring query failed");
135 (
136 StatusCode::INTERNAL_SERVER_ERROR,
137 SonicJson(json!({ "error": "deposit monitoring query failed" })),
138 )
139 .into_response()
140 }
141 }
142}
143
144#[utoipa::path(
148 get,
149 path = "/monitoring/accounts",
150 responses(
151 (status = 200, description = "Account list", body = MonitoringAccountsResponse),
152 (status = 401, description = "Invalid or missing X-Admin-Key header")
153 ),
154 tag = "Monitoring",
155 security(("admin_key" = []))
156)]
157pub async fn list_accounts(State(app_state): State<AdminState>) -> impl IntoResponse {
158 use hypercall_types::api_models::{MonitoringAccountSummary, MonitoringAccountsResponse};
159
160 let portfolios = app_state.portfolio_cache.get_all_portfolios().await;
162
163 let mut accounts: Vec<MonitoringAccountSummary> = Vec::new();
164
165 for (wallet, summary) in portfolios {
166 let position_count = summary.positions.len();
167 let total_notional: Decimal = summary
168 .positions
169 .values()
170 .map(|p| p.amount.abs() * p.entry_price)
171 .sum();
172
173 let (margin_mode, equity, margin_used, margin_error) = match app_state
176 .portfolio_cache
177 .compute_wallet_margin_snapshot(&wallet)
178 .await
179 {
180 Ok(snapshot) => (
181 snapshot.mode.as_str().to_string(),
182 Some(snapshot.margin_summary.equity.to_string()),
183 Some(snapshot.total_margin_used.to_string()),
184 None,
185 ),
186 Err(e) => {
187 tracing::warn!(
188 wallet = %wallet,
189 error = %e,
190 "Failed to compute margin for monitoring account"
191 );
192 ("unknown".to_string(), None, None, Some(e.to_string()))
193 }
194 };
195
196 accounts.push(MonitoringAccountSummary {
197 wallet: wallet.to_string(),
198 margin_mode,
199 equity,
200 margin_used,
201 position_count,
202 total_notional: total_notional.to_string(),
203 margin_error,
204 });
205 }
206
207 accounts.sort_by(|a, b| {
211 let equity = |value: &Option<String>| {
212 value
213 .as_deref()
214 .and_then(|v| v.parse::<rust_decimal::Decimal>().ok())
215 };
216 match (equity(&b.equity), equity(&a.equity)) {
217 (Some(eb), Some(ea)) => eb.cmp(&ea),
218 (Some(_), None) => std::cmp::Ordering::Less,
219 (None, Some(_)) => std::cmp::Ordering::Greater,
220 (None, None) => std::cmp::Ordering::Equal,
221 }
222 });
223
224 SonicJson(MonitoringAccountsResponse {
225 account_count: accounts.len(),
226 accounts,
227 })
228}
229
230#[utoipa::path(
234 get,
235 path = "/monitoring/positions",
236 responses(
237 (status = 200, description = "Position summary by symbol", body = MonitoringPositionsResponse),
238 (status = 401, description = "Invalid or missing X-Admin-Key header")
239 ),
240 tag = "Monitoring",
241 security(("admin_key" = []))
242)]
243pub async fn list_positions(State(app_state): State<AdminState>) -> impl IntoResponse {
244 use hypercall_types::api_models::{
245 MonitoringPositionHolder, MonitoringPositionsResponse, MonitoringSymbolPosition,
246 };
247
248 let portfolios = app_state.portfolio_cache.get_all_portfolios().await;
249
250 let mut by_symbol: HashMap<String, SymbolPositionSummaryInternal> = HashMap::new();
252
253 for (wallet, summary) in portfolios {
254 for (symbol, pos) in &summary.positions {
255 let entry =
256 by_symbol
257 .entry(symbol.clone())
258 .or_insert_with(|| SymbolPositionSummaryInternal {
259 symbol: symbol.clone(),
260 total_long: dec!(0),
261 total_short: dec!(0),
262 net_position: dec!(0),
263 holder_count: 0,
264 holders: Vec::new(),
265 });
266
267 if pos.amount > dec!(0) {
268 entry.total_long += pos.amount;
269 } else {
270 entry.total_short += pos.amount.abs();
271 }
272 entry.net_position += pos.amount;
273 entry.holder_count += 1;
274 entry.holders.push(MonitoringPositionHolder {
275 wallet: wallet.to_string(),
276 amount: pos.amount.to_string(),
277 entry_price: pos.entry_price.to_string(),
278 unrealized_pnl: pos.unrealized_pnl.to_string(),
279 });
280 }
281 }
282
283 let mut symbols: Vec<MonitoringSymbolPosition> = by_symbol
284 .values()
285 .map(|s| MonitoringSymbolPosition {
286 symbol: s.symbol.clone(),
287 total_long: s.total_long.to_string(),
288 total_short: s.total_short.to_string(),
289 net_position: s.net_position.to_string(),
290 is_balanced: s.net_position.abs() < dec!(0.0001),
291 holder_count: s.holder_count,
292 holders: s.holders.clone(),
293 })
294 .collect();
295
296 symbols.sort_by(|a, b| {
299 let abs_net = |value: &str| {
300 value
301 .parse::<rust_decimal::Decimal>()
302 .map(|d| d.abs())
303 .unwrap_or_default()
304 };
305 abs_net(&b.net_position).cmp(&abs_net(&a.net_position))
306 });
307
308 SonicJson(MonitoringPositionsResponse {
309 symbol_count: symbols.len(),
310 symbols,
311 })
312}
313
314#[derive(Debug)]
315struct SymbolPositionSummaryInternal {
316 symbol: String,
317 total_long: Decimal,
318 total_short: Decimal,
319 net_position: Decimal,
320 holder_count: usize,
321 holders: Vec<hypercall_types::api_models::MonitoringPositionHolder>,
322}