1#[cfg(feature = "test-endpoints")]
2use std::time::Instant;
3
4#[cfg(feature = "test-endpoints")]
5use rust_decimal_macros::dec;
6#[cfg(feature = "test-endpoints")]
7use utoipa::ToSchema;
8
9use crate::sonic_json::SonicJson;
10use crate::{
11 error::ApiError,
12 middleware::SignerContext,
13 models::{
14 DeleteUserTierRequest, GetUserTierQuery, SetUserTierRequest, UserTierData, UserTierResponse,
15 },
16};
17use axum::extract::{Query, State};
18use axum::http::StatusCode;
19use hypercall_db::UserTierUpdate;
20use hypercall_engine::command::{
21 AccruePmSettlementInterestCommand, JournalPmRecoveryPlanCommand,
22 MarkPmRecoveryActionSubmittedCommand, PmRecoveryActionResult, PmRecoveryExternalKind,
23 PmRecoveryPlanCommand, ResolvePmRecoveryActionCommand, SetPmSettlementPoolConfigCommand,
24};
25use hypercall_margin::portfolio::PmSettlementPoolConfig;
26#[cfg(feature = "test-endpoints")]
27use hypercall_runtime_api::increment_pending_requests;
28#[cfg(feature = "test-endpoints")]
29use hypercall_types::utils::get_timestamp_millis;
30#[cfg(feature = "test-endpoints")]
31use hypercall_types::Side;
32#[cfg(feature = "test-endpoints")]
33use hypercall_types::{OrderAction, OrderActionMessage, OrderInfo, OrderUpdateStatus, TimeInForce};
34use serde::{Deserialize, Serialize};
35
36use super::AppState;
37
/// JSON body accepted by `set_pm_settlement_pool_config`.
///
/// The handler copies every field, unchanged, into a
/// `SetPmSettlementPoolConfigCommand` and submits it to the engine; none of
/// the fields is validated in this file.
#[derive(Debug, Deserialize)]
pub struct SetPmSettlementPoolConfigRequest {
    /// Forwarded as the command's `request_id`.
    pub request_id: uuid::Uuid,
    /// Forwarded as the command's `input_digest`.
    pub input_digest: String,
    /// Forwarded as the command's `underlying`.
    pub underlying: String,
    /// Forwarded as the command's `config_version`.
    pub config_version: u32,
    /// Pool configuration payload, forwarded as-is.
    pub config: PmSettlementPoolConfig,
    /// Forwarded as the command's `timestamp_ms`.
    // NOTE(review): presumably a Unix timestamp in milliseconds, going by the name; confirm.
    pub timestamp_ms: u64,
}
47
/// JSON body accepted by `accrue_pm_settlement_interest`.
///
/// After the allowlist check on `wallet`, the handler copies every field,
/// unchanged, into an `AccruePmSettlementInterestCommand`.
#[derive(Debug, Deserialize)]
pub struct AccruePmSettlementInterestRequest {
    /// Forwarded as the command's `request_id`.
    pub request_id: uuid::Uuid,
    /// Forwarded as the command's `input_digest`.
    pub input_digest: String,
    /// Wallet the command targets; must be on the PM settlement allowlist.
    pub wallet: hypercall_types::WalletAddress,
    /// Forwarded as the command's `underlying`.
    pub underlying: String,
    /// Forwarded as the command's `to_ms`.
    // NOTE(review): presumably the end of the accrual window in milliseconds; confirm in the engine.
    pub to_ms: i64,
    /// Forwarded as the command's `timestamp_ms`.
    pub timestamp_ms: u64,
}
57
/// JSON body accepted by `journal_pm_recovery_plan`.
///
/// The handler checks `plan.wallet` against the PM settlement allowlist and
/// then moves every field into a `JournalPmRecoveryPlanCommand`.
#[derive(Debug, Deserialize)]
pub struct JournalPmRecoveryPlanRequest {
    /// Forwarded as the command's `request_id`.
    pub request_id: uuid::Uuid,
    /// Forwarded as the command's `input_digest`.
    pub input_digest: String,
    /// Recovery plan payload, forwarded as-is; its `wallet` field drives the allowlist check.
    pub plan: PmRecoveryPlanCommand,
    /// Forwarded as the command's `timestamp_ms`.
    pub timestamp_ms: u64,
}
65
/// JSON body accepted by `mark_pm_recovery_action_submitted`.
///
/// After the allowlist check on `wallet`, the handler copies every field,
/// unchanged, into a `MarkPmRecoveryActionSubmittedCommand`.
#[derive(Debug, Deserialize)]
pub struct MarkPmRecoveryActionSubmittedRequest {
    /// Forwarded as the command's `request_id`.
    pub request_id: uuid::Uuid,
    /// Forwarded as the command's `input_digest`.
    pub input_digest: String,
    /// Wallet the command targets; must be on the PM settlement allowlist.
    pub wallet: hypercall_types::WalletAddress,
    /// Forwarded as the command's `plan_id`.
    pub plan_id: String,
    /// Forwarded as the command's `action_index`.
    pub action_index: u32,
    /// Forwarded as the command's `attempt`.
    pub attempt: u32,
    /// Forwarded as the command's `external_id`.
    pub external_id: String,
    /// Forwarded as the command's `external_kind`.
    pub external_kind: PmRecoveryExternalKind,
    /// Forwarded as the command's `timestamp_ms`.
    pub timestamp_ms: u64,
}
78
/// JSON body accepted by `resolve_pm_recovery_action`.
///
/// After the allowlist check on `wallet`, the handler copies every field,
/// unchanged, into a `ResolvePmRecoveryActionCommand`.
#[derive(Debug, Deserialize)]
pub struct ResolvePmRecoveryActionRequest {
    /// Forwarded as the command's `request_id`.
    pub request_id: uuid::Uuid,
    /// Forwarded as the command's `input_digest`.
    pub input_digest: String,
    /// Wallet the command targets; must be on the PM settlement allowlist.
    pub wallet: hypercall_types::WalletAddress,
    /// Forwarded as the command's `plan_id`.
    pub plan_id: String,
    /// Forwarded as the command's `action_index`.
    pub action_index: u32,
    /// Forwarded as the command's `attempt`.
    pub attempt: u32,
    /// Forwarded as the command's `result`.
    pub result: PmRecoveryActionResult,
    /// Forwarded as the command's `recovered_usdc`; no sign or range check happens here.
    pub recovered_usdc: rust_decimal::Decimal,
    /// Forwarded as the command's `liability_reduction_usdc`; no sign or range check happens here.
    pub liability_reduction_usdc: rust_decimal::Decimal,
    /// Optional identifier forwarded as the command's `result_external_id`.
    pub result_external_id: Option<String>,
    /// Forwarded as the command's `timestamp_ms`.
    pub timestamp_ms: u64,
}
93
/// Payload returned by `get_pm_recovery_projections`.
#[derive(Debug, Serialize)]
pub struct PmRecoveryProjectionResponse {
    /// Rows returned by the database's `list_pm_recovery_plans`.
    pub plans: Vec<hypercall_db::PmRecoveryPlanProjection>,
    /// Rows returned by the database's `list_pm_recovery_actions`.
    pub actions: Vec<hypercall_db::PmRecoveryActionProjection>,
}
99
/// Payload returned by `get_pm_settlement_gate_state`.
///
/// NOTE(review): the code that fills this struct is not fully visible from
/// this part of the file; the field notes below rely on names and types only
/// and should be confirmed against the handler.
#[derive(Debug, Serialize)]
pub struct PmSettlementGateStateResponse {
    // Presumably mirrors `runtime_config.portfolio_margin_pool_enabled`; confirm.
    pub portfolio_margin_pool_enabled: bool,
    // Presumably the length of `allowlist_wallets`; confirm.
    pub allowlist_count: usize,
    pub allowlist_wallets: Vec<hypercall_types::WalletAddress>,
    /// One entry per pool row.
    pub pools: Vec<PmSettlementGatePoolState>,
    /// Aggregated account-level figures.
    pub account_blockers: PmSettlementAccountBlockers,
    /// Aggregated recovery-action counts.
    pub recovery_actions: PmSettlementRecoveryActionGateState,
    /// Aggregated projection sequence information.
    pub projection_freshness: PmSettlementProjectionFreshness,
    /// Boolean readiness checks.
    pub expansion_ready: PmSettlementExpansionReadiness,
}
111
/// Element type of `PmSettlementGateStateResponse::pools`.
///
/// NOTE(review): how each value is derived (for example when
/// `pool_utilization` or the two caps are `None`, and how the boolean flags
/// relate to the numeric fields) is decided where this struct is populated,
/// which is outside this view; confirm there before relying on it.
#[derive(Debug, Serialize)]
pub struct PmSettlementGatePoolState {
    pub underlying: String,
    // The `_usdc` suffix suggests USDC-denominated amounts; TODO confirm.
    pub pool_available_usdc: rust_decimal::Decimal,
    pub pool_target_usdc: rust_decimal::Decimal,
    pub pool_capacity_usdc: rust_decimal::Decimal,
    pub pool_utilization: Option<rust_decimal::Decimal>,
    pub active_timing_bridge_usdc: rust_decimal::Decimal,
    pub active_settlement_debt_usdc: rust_decimal::Decimal,
    pub normal_utilization_cap: Option<rust_decimal::Decimal>,
    pub crisis_utilization_cap: Option<rust_decimal::Decimal>,
    pub below_target: bool,
    pub utilization_unavailable: bool,
    pub above_crisis_cap: bool,
    pub projection_seq: i64,
    pub last_engine_command_id: i64,
}
129
/// Type of `PmSettlementGateStateResponse::account_blockers`.
///
/// Derives `Default`, so every counter and amount starts at zero.
/// NOTE(review): what qualifies an account for each counter is defined where
/// this struct is populated, outside this view; confirm there.
#[derive(Debug, Default, Serialize)]
pub struct PmSettlementAccountBlockers {
    pub total_accounts: usize,
    pub bridged_accounts: usize,
    pub debt_accounts: usize,
    pub overdue_bridge_accounts: usize,
    pub active_recovery_accounts: usize,
    // The `_usdc` suffix suggests USDC-denominated totals; TODO confirm.
    pub active_bridge_usdc: rust_decimal::Decimal,
    pub active_debt_usdc: rust_decimal::Decimal,
}
140
/// Type of `PmSettlementGateStateResponse::recovery_actions`.
///
/// Derives `Default`, so every counter starts at zero.
/// NOTE(review): the status values that map to planned / submitted / terminal
/// are decided where this struct is populated, outside this view; confirm there.
#[derive(Debug, Default, Serialize)]
pub struct PmSettlementRecoveryActionGateState {
    pub total_actions: usize,
    pub planned_actions: usize,
    pub submitted_actions: usize,
    pub terminal_actions: usize,
}
148
/// Type of `PmSettlementGateStateResponse::projection_freshness`.
///
/// Derives `Default`, so every field starts at zero.
/// NOTE(review): which projection tables feed these maxima is decided where
/// this struct is populated, outside this view; confirm there.
#[derive(Debug, Default, Serialize)]
pub struct PmSettlementProjectionFreshness {
    pub projection_rows: usize,
    pub max_projection_seq: i64,
    pub max_engine_command_id: i64,
}
155
/// Type of `PmSettlementGateStateResponse::expansion_ready`: a flat set of
/// boolean checks.
///
/// Unlike its sibling structs it does not derive `Default`, so every flag must
/// be set explicitly by whoever builds it.
/// NOTE(review): the two `ready_for_*` flags are presumably conjunctions of
/// the individual checks above them; the exact formula lives where this
/// struct is populated, outside this view; confirm there.
#[derive(Debug, Serialize)]
pub struct PmSettlementExpansionReadiness {
    pub allowlist_configured: bool,
    pub pool_facts_available: bool,
    pub projections_present: bool,
    pub no_pool_below_target: bool,
    pub no_pool_above_crisis_cap: bool,
    pub no_account_debt: bool,
    pub no_active_recovery_accounts: bool,
    pub no_overdue_bridge: bool,
    pub no_planned_recovery_actions: bool,
    pub no_submitted_recovery_actions: bool,
    pub ready_for_single_wallet_smoke: bool,
    pub ready_for_allowlist_expansion: bool,
}
171
172#[utoipa::path(
176 post,
177 path = "/mmp/config",
178 request_body = SetMmpConfigRequest,
179 responses(
180 (status = 200, description = "MMP config updated", body = MmpConfigData),
181 (status = 403, description = "Wallet mismatch"),
182 (status = 500, description = "Internal server error")
183 ),
184 security(("eip712_signature" = [])),
185 tag = "MMP"
186)]
187pub async fn set_mmp_config(
188 State(state): State<AppState>,
189 signer_ctx: SignerContext,
190 SonicJson(request): SonicJson<crate::models::SetMmpConfigRequest>,
191) -> Result<SonicJson<crate::models::ApiResponse<crate::models::MmpConfigData>>, ApiError> {
192 tracing::info!(
193 "Setting MMP config for wallet: {} (signed by: {}), currency: {}",
194 signer_ctx.wallet_address,
195 signer_ctx.signer_address,
196 request.currency
197 );
198
199 if request.wallet != signer_ctx.wallet_address {
201 tracing::warn!("Wallet mismatch in MMP config request");
202 return Err(ApiError::forbidden(
203 "Wallet mismatch: signer does not match request wallet",
204 ));
205 }
206
207 let new_config = hypercall_db::MmpConfigRecord {
209 wallet_address: request.wallet,
210 currency: request.currency.clone(),
211 interval_ms: request.interval_ms,
212 frozen_time_ms: request.frozen_time_ms,
213 qty_limit: request.qty_limit,
214 delta_limit: request.delta_limit,
215 vega_limit: request.vega_limit,
216 enabled: true,
217 created_at: None,
218 updated_at: None,
219 };
220
221 state.mmp_cache.set_config(new_config).await.map_err(|e| {
223 tracing::error!("Failed to save MMP config: {}", e);
224 ApiError::internal_error("Failed to save MMP config")
225 })?;
226
227 let config_data = crate::models::MmpConfigData {
228 wallet_address: request.wallet,
229 currency: request.currency,
230 interval_ms: request.interval_ms,
231 frozen_time_ms: request.frozen_time_ms,
232 qty_limit: request.qty_limit,
233 delta_limit: request.delta_limit,
234 vega_limit: request.vega_limit,
235 enabled: true,
236 };
237
238 Ok(SonicJson(crate::models::ApiResponse::success(config_data)))
239}
240
241#[utoipa::path(
243 get,
244 path = "/mmp/config",
245 params(
246 ("wallet" = String, Query, description = "Wallet address"),
247 ("currency" = Option<String>, Query, description = "Currency filter")
248 ),
249 responses(
250 (status = 200, description = "MMP configuration(s)", body = MmpConfigResponse),
251 (status = 500, description = "Internal server error")
252 ),
253 tag = "MMP"
254)]
255pub async fn get_mmp_config(
256 State(state): State<AppState>,
257 Query(query): Query<crate::models::GetMmpConfigQuery>,
258) -> Result<SonicJson<crate::models::MmpConfigResponse>, ApiError> {
259 tracing::info!("Getting MMP config for wallet: {}", query.wallet);
260
261 let configs = if let Some(currency) = query.currency {
262 if let Some(config) = state.mmp_cache.get_config(&query.wallet, ¤cy).await {
264 vec![config]
265 } else {
266 vec![]
267 }
268 } else {
269 state.mmp_cache.get_configs_for_wallet(&query.wallet).await
271 };
272
273 let data: Vec<crate::models::MmpConfigData> = configs
274 .into_iter()
275 .map(|c| crate::models::MmpConfigData {
276 wallet_address: c.wallet_address,
277 currency: c.currency,
278 interval_ms: c.interval_ms,
279 frozen_time_ms: c.frozen_time_ms,
280 qty_limit: c.qty_limit,
281 delta_limit: c.delta_limit,
282 vega_limit: c.vega_limit,
283 enabled: c.enabled,
284 })
285 .collect();
286
287 Ok(SonicJson(crate::models::MmpConfigResponse {
288 success: true,
289 data,
290 }))
291}
292
293#[utoipa::path(
295 delete,
296 path = "/mmp/config",
297 request_body = DeleteMmpConfigRequest,
298 responses(
299 (status = 200, description = "MMP config deleted"),
300 (status = 403, description = "Wallet mismatch"),
301 (status = 500, description = "Internal server error")
302 ),
303 security(("eip712_signature" = [])),
304 tag = "MMP"
305)]
306pub async fn delete_mmp_config(
307 State(state): State<AppState>,
308 signer_ctx: SignerContext,
309 SonicJson(request): SonicJson<crate::models::DeleteMmpConfigRequest>,
310) -> Result<SonicJson<crate::models::ApiResponse<String>>, ApiError> {
311 tracing::info!(
312 "Deleting MMP config for wallet: {} (signed by: {}), currency: {}",
313 signer_ctx.wallet_address,
314 signer_ctx.signer_address,
315 request.currency
316 );
317
318 if request.wallet != signer_ctx.wallet_address {
320 tracing::warn!("Wallet mismatch in delete MMP config request");
321 return Err(ApiError::forbidden(
322 "Wallet mismatch: signer does not match request wallet",
323 ));
324 }
325
326 state
327 .mmp_cache
328 .delete_config(&request.wallet, &request.currency)
329 .await
330 .map_err(|e| {
331 tracing::error!("Failed to delete MMP config: {}", e);
332 ApiError::internal_error("Failed to delete MMP config")
333 })?;
334
335 Ok(SonicJson(crate::models::ApiResponse::success(
336 "MMP config deleted successfully".to_string(),
337 )))
338}
339
340#[utoipa::path(
342 post,
343 path = "/mmp/reset",
344 request_body = ResetMmpRequest,
345 responses(
346 (status = 200, description = "MMP state reset"),
347 (status = 403, description = "Wallet mismatch"),
348 (status = 500, description = "Internal server error")
349 ),
350 security(("eip712_signature" = [])),
351 tag = "MMP"
352)]
353pub async fn reset_mmp(
354 State(state): State<AppState>,
355 signer_ctx: SignerContext,
356 SonicJson(request): SonicJson<crate::models::ResetMmpRequest>,
357) -> Result<SonicJson<crate::models::ApiResponse<String>>, ApiError> {
358 tracing::info!(
359 "Resetting MMP for wallet: {} (signed by: {}), currency: {}",
360 signer_ctx.wallet_address,
361 signer_ctx.signer_address,
362 request.currency
363 );
364
365 if request.wallet != signer_ctx.wallet_address {
367 tracing::warn!("Wallet mismatch in reset MMP request");
368 return Err(ApiError::forbidden(
369 "Wallet mismatch: signer does not match request wallet",
370 ));
371 }
372
373 state
374 .mmp_cache
375 .reset_mmp(&request.wallet, &request.currency)
376 .await;
377
378 Ok(SonicJson(crate::models::ApiResponse::success(
379 "MMP state reset successfully".to_string(),
380 )))
381}
382
383#[utoipa::path(
387 get,
388 path = "/user/tier",
389 params(
390 ("wallet" = String, Query, description = "Wallet address")
391 ),
392 responses(
393 (status = 200, description = "User tier data", body = UserTierResponse),
394 (status = 500, description = "Internal server error")
395 ),
396 tag = "Admin"
397)]
398pub async fn get_user_tier(
399 State(state): State<AppState>,
400 Query(query): Query<GetUserTierQuery>,
401) -> Result<SonicJson<UserTierResponse>, ApiError> {
402 tracing::info!("Getting user tier for wallet: {}", query.wallet);
403
404 let tier = state
405 .tier_cache
406 .get_tier(&query.wallet)
407 .await
408 .unwrap_or_else(|| UserTierData {
409 wallet_address: query.wallet,
410 tier: "tier2".to_string(), });
412
413 Ok(SonicJson(UserTierResponse {
414 success: true,
415 data: tier,
416 }))
417}
418
419#[utoipa::path(
421 post,
422 path = "/user/tier",
423 request_body = SetUserTierRequest,
424 responses(
425 (status = 200, description = "User tier updated", body = UserTierData),
426 (status = 400, description = "Invalid tier value"),
427 (status = 500, description = "Internal server error")
428 ),
429 security(("eip712_signature" = [])),
430 tag = "Admin"
431)]
432pub async fn set_user_tier(
433 State(state): State<AppState>,
434 signer_ctx: SignerContext,
435 SonicJson(request): SonicJson<SetUserTierRequest>,
436) -> Result<SonicJson<crate::models::ApiResponse<UserTierData>>, ApiError> {
437 tracing::info!(
441 "Setting user tier for wallet: {} (signed by: {}), tier: {}",
442 request.wallet,
443 signer_ctx.signer_address,
444 request.tier
445 );
446
447 if request.tier != "tier1" && request.tier != "tier2" {
449 tracing::error!("Invalid tier value: {}", request.tier);
450 return Err(ApiError::bad_request(format!(
451 "Invalid tier value: {}. Must be 'tier1' or 'tier2'",
452 request.tier
453 )));
454 }
455
456 let previous_tier = state
457 .tier_cache
458 .get_tier_record(&request.wallet)
459 .await
460 .map_err(|e| {
461 tracing::error!("Failed to load current user tier: {}", e);
462 ApiError::internal_error("Failed to load current user tier")
463 })?;
464
465 let new_tier = UserTierUpdate {
466 wallet_address: request.wallet,
467 tier: request.tier.clone(),
468 margin_mode: None, version: None, max_open_orders: None, max_open_positions: None, orders_per_minute: None, cancels_per_minute: None, api_requests_per_minute: None, };
476
477 state.tier_cache.set_tier(new_tier).await.map_err(|e| {
478 tracing::error!("Failed to save user tier: {}", e);
479 ApiError::internal_error("Failed to save user tier")
480 })?;
481
482 let wallet = request.wallet;
483 if let Err(e) = super::submit_tier_update_command(&state, wallet).await {
484 tracing::error!("Failed to apply user tier update in engine: {}", e);
485 if let Err(rollback_err) = state
486 .tier_cache
487 .restore_tier_record(&wallet, previous_tier.as_ref())
488 .await
489 {
490 tracing::error!(
491 wallet = %wallet,
492 error = %rollback_err,
493 "Failed to roll back tier after engine apply failure"
494 );
495 }
496 return Err(ApiError::internal_error("Failed to apply user tier update"));
497 }
498
499 let tier_data = UserTierData {
500 wallet_address: wallet,
501 tier: request.tier,
502 };
503
504 Ok(SonicJson(crate::models::ApiResponse::success(tier_data)))
505}
506
507#[utoipa::path(
509 delete,
510 path = "/user/tier",
511 request_body = DeleteUserTierRequest,
512 responses(
513 (status = 200, description = "User tier deleted"),
514 (status = 500, description = "Internal server error")
515 ),
516 security(("eip712_signature" = [])),
517 tag = "Admin"
518)]
519pub async fn delete_user_tier(
520 State(state): State<AppState>,
521 signer_ctx: SignerContext,
522 SonicJson(request): SonicJson<DeleteUserTierRequest>,
523) -> Result<SonicJson<crate::models::ApiResponse<String>>, ApiError> {
524 tracing::info!(
527 "Resetting user tier for wallet: {} (signed by: {})",
528 request.wallet,
529 signer_ctx.signer_address
530 );
531
532 let previous_tier = state
533 .tier_cache
534 .get_tier_record(&request.wallet)
535 .await
536 .map_err(|e| {
537 tracing::error!("Failed to load current user tier: {}", e);
538 ApiError::internal_error("Failed to load current user tier")
539 })?;
540
541 state
542 .tier_cache
543 .delete_tier(&request.wallet)
544 .await
545 .map_err(|e| {
546 tracing::error!("Failed to reset user tier: {}", e);
547 ApiError::internal_error("Failed to reset user tier")
548 })?;
549
550 if let Err(e) = super::submit_tier_update_command(&state, request.wallet).await {
551 tracing::error!("Failed to apply user tier deletion in engine: {}", e);
552 if let Err(rollback_err) = state
553 .tier_cache
554 .restore_tier_record(&request.wallet, previous_tier.as_ref())
555 .await
556 {
557 tracing::error!(
558 wallet = %request.wallet,
559 error = %rollback_err,
560 "Failed to roll back tier after engine apply failure"
561 );
562 }
563 return Err(ApiError::internal_error(
564 "Failed to apply user tier deletion",
565 ));
566 }
567
568 Ok(SonicJson(crate::models::ApiResponse::success(
569 "User tier reset successfully (reverted to default)".to_string(),
570 )))
571}
572
// Response payload of `cancel_all_orders`; only compiled with the
// `test-endpoints` feature.
#[cfg(feature = "test-endpoints")]
#[derive(Debug, Serialize, ToSchema)]
pub struct CancelAllOrdersResponse {
    // Number of orders selected for cancellation when the request was handled.
    // The cancellations themselves run in a background task, so this is not a
    // count of orders actually cancelled.
    pub total_mm_orders: usize,
    // Human-readable summary built by the handler.
    pub message: String,
}
586
// Test-only endpoint (compiled only with the `test-endpoints` feature) that
// cancels every order in the in-memory snapshot whose client id matches
// `cli_<digits>`.
//
// The handler selects the matching orders, spawns a detached background task
// that sends one cancel request per order to the engine (sequentially, waiting
// up to 5 seconds for each response), and returns immediately with the number
// of orders selected. The outcome of the individual cancels is only logged.
//
// NOTE(review): the OpenAPI metadata documents a 403 "Not in testnet mode"
// response, but no such check is visible in this body; presumably it is
// enforced by routing or by the feature gate. Confirm.
#[cfg(feature = "test-endpoints")]
#[utoipa::path(
    post,
    path = "/test/cancel-all-orders",
    responses(
        (status = 200, description = "MM orders cancelled", body = ApiResponse<CancelAllOrdersResponse>),
        (status = 403, description = "Not in testnet mode"),
        (status = 500, description = "Internal server error")
    ),
    tag = "testnet"
)]
pub async fn cancel_all_orders(
    State(state): State<AppState>,
) -> Result<SonicJson<crate::models::ApiResponse<CancelAllOrdersResponse>>, ApiError> {
    // Every order currently in the snapshot, paired with its wallet.
    let all_orders = state.order_snapshot.get_all_orders();

    // Diagnostics only: report symbols that have quote depth but no order in
    // the snapshot. Nothing computed in this block affects which orders are
    // cancelled.
    {
        let all_quotes = state.quote_provider.all_quotes();
        let order_symbols: std::collections::HashSet<String> =
            all_orders.iter().map(|(o, _)| o.symbol.clone()).collect();
        let mut quoted_no_orders = Vec::new();
        for (symbol, quote) in &all_quotes {
            let has_depth = !quote.bids.is_empty() || !quote.asks.is_empty();
            if has_depth && !order_symbols.contains(symbol) {
                quoted_no_orders.push(format!(
                    "{}(bids={},asks={})",
                    symbol,
                    quote.bids.len(),
                    quote.asks.len()
                ));
            }
        }
        if !quoted_no_orders.is_empty() {
            tracing::warn!(
                count = quoted_no_orders.len(),
                symbols = ?quoted_no_orders,
                "cancel_all_orders: symbols have orderbook depth but NO orders in snapshot"
            );
        }
        tracing::info!(
            total_orders = all_orders.len(),
            total_quoted_symbols = all_quotes.len(),
            "cancel_all_orders: snapshot state"
        );
    }

    // Orders with no client id in memory. Ids that do not fit in an i64 are
    // silently left out of the database lookup.
    let unknown_order_ids: Vec<i64> = all_orders
        .iter()
        .filter(|(o, _)| o.client_id.is_none())
        .filter_map(|(o, _)| i64::try_from(o.order_id).ok())
        .collect();

    // Try to recover the missing client ids from the database. A failed lookup
    // is not fatal: it yields an empty map, so those orders keep having no
    // client id and are treated as non-MM below.
    let db_client_ids: std::collections::HashMap<i64, Option<String>> = if !unknown_order_ids
        .is_empty()
    {
        {
            let analytics: &dyn hypercall_db::AnalyticsReader = state.db.as_ref();
            match analytics
                .get_client_ids_by_order_ids(&unknown_order_ids)
                .await
            {
                Ok(map) => {
                    tracing::info!(
                        unknown_count = unknown_order_ids.len(),
                        db_found = map.len(),
                        "cancel_all_orders: resolved client_ids from DB"
                    );
                    map
                }
                Err(e) => {
                    tracing::warn!(
                        error = %e,
                        "cancel_all_orders: DB lookup failed, falling back to in-memory client_id"
                    );
                    std::collections::HashMap::new()
                }
            }
        }
    } else {
        std::collections::HashMap::new()
    };

    // An order counts as an MM order when its effective client id (in-memory
    // value first, database value second) is `cli_` followed only by ASCII
    // digits. Orders with no client id from either source are NOT selected.
    //
    // NOTE(review): a bare "cli_" also passes, because `all` over an empty
    // suffix is true. Confirm that this is intended.
    let is_mm_order = |order: &hypercall_runtime_api::RuntimeOrderSummary| -> bool {
        let effective_client_id = order.client_id.clone().or_else(|| {
            i64::try_from(order.order_id)
                .ok()
                .and_then(|id| db_client_ids.get(&id))
                .cloned()
                .flatten()
        });

        match &effective_client_id {
            // Slicing at byte 4 is safe here: the prefix check guarantees the
            // first four bytes are the ASCII string "cli_".
            Some(cid) => cid.starts_with("cli_") && cid[4..].bytes().all(|b| b.is_ascii_digit()),
            None => {
                false
            }
        }
    };

    // Log every order that will be left alone.
    for (order, wallet) in &all_orders {
        if !is_mm_order(order) {
            tracing::info!(
                order_id = order.order_id,
                symbol = %order.symbol,
                client_id = ?order.client_id,
                wallet = %wallet,
                "cancel_all_orders: skipping non-MM order"
            );
        }
    }

    // Keep only the orders selected for cancellation (second classification
    // pass over the same data).
    let mm_orders: Vec<_> = all_orders
        .into_iter()
        .filter(|(order, _wallet)| is_mm_order(order))
        .collect();

    // NOTE(review): the log text says "MM/orphan", but orders without any
    // client id are excluded by `is_mm_order`; the wording may be stale.
    let total = mm_orders.len();
    tracing::info!(
        "cancel_all_orders: found {} MM/orphan orders to cancel, spawning background task",
        total
    );

    let order_sender = state.order_sender.clone();
    // The join handle is dropped, so the task is detached: the HTTP response
    // below does not wait for it.
    tokio::spawn(async move {
        let mut succeeded = 0usize;
        let mut failed = 0usize;

        for (i, (order, wallet)) in mm_orders.iter().enumerate() {
            let order_id = order.order_id;
            // Only `order_id` carries information; every other field is a
            // placeholder. NOTE(review): this presumes the engine resolves a
            // cancel purely by wallet + order id. Confirm.
            let order_info = OrderInfo {
                symbol: String::new(),
                price: dec!(0),
                size: dec!(0),
                side: Side::Buy,
                tif: TimeInForce::GTC,
                client_id: None,
                order_id: Some(order_id),
                is_perp: false,
                underlying: None,
                reduce_only: None,
                nonce: None,
                signature: None,
                mmp_enabled: false,
                builder_code_address: None,
            };

            let order_action_msg = OrderActionMessage {
                timestamp: get_timestamp_millis(),
                info: order_info,
                action: OrderAction::CancelOrder,
                wallet: *wallet,
                // The owning wallet is also used as the API wallet.
                api_wallet_address: Some(*wallet),
                mmp_triggered: false,
                request_id: Some(uuid::Uuid::now_v7().to_string()),
            };

            // Fresh single-slot channel per order for the engine's reply.
            let (response_tx, mut response_rx) = tokio::sync::mpsc::channel(1);

            let engine_request = hypercall_runtime_api::UnifiedEngineRequest {
                message: order_action_msg,
                response_tx,
                enqueued_at: Instant::now(),
                #[cfg(feature = "otel-tracing")]
                trace_context: None,
            };

            // NOTE(review): the pending-request counter is incremented before
            // the send; if the send fails no matching decrement is visible
            // here. Confirm the counter is balanced elsewhere.
            increment_pending_requests();
            if order_sender.send(engine_request).await.is_err() {
                tracing::error!(
                    "cancel_all_orders: failed to send cancel for order {}",
                    order_id
                );
                failed += 1;
                continue;
            }

            // Wait at most 5 seconds for this order's reply before moving on;
            // a timeout and a closed channel are both counted as failures.
            match tokio::time::timeout(std::time::Duration::from_secs(5), response_rx.recv()).await
            {
                Ok(Some(resp)) if resp.status == OrderUpdateStatus::Canceled => {
                    succeeded += 1;
                }
                Ok(Some(resp)) => {
                    tracing::warn!(
                        "cancel_all_orders: order {} got status {:?}",
                        order_id,
                        resp.status
                    );
                    failed += 1;
                }
                _ => {
                    tracing::warn!(
                        "cancel_all_orders: timeout/no response for order {}",
                        order_id
                    );
                    failed += 1;
                }
            }

            // Progress line every 1000 processed orders.
            if (i + 1) % 1000 == 0 {
                tracing::info!(
                    "cancel_all_orders: progress {}/{} (succeeded={}, failed={})",
                    i + 1,
                    total,
                    succeeded,
                    failed
                );
            }
        }

        // `sent` is the number of orders selected, including any whose send failed.
        tracing::info!(
            "cancel_all_orders: done. sent={}, succeeded={}, failed={}",
            total,
            succeeded,
            failed
        );
    });

    Ok(SonicJson(crate::models::ApiResponse::success(
        CancelAllOrdersResponse {
            total_mm_orders: total,
            message: format!("Spawned background task to cancel {} MM orders", total),
        },
    )))
}
829
830fn ensure_pm_settlement_pool_enabled(state: &AppState) -> Result<(), ApiError> {
831 if state.runtime_config.portfolio_margin_pool_enabled {
832 Ok(())
833 } else {
834 Err(ApiError::new(
835 StatusCode::SERVICE_UNAVAILABLE,
836 "portfolio_margin_pool_disabled",
837 "portfolio_margin_pool_enabled is false; PM settlement pool mutations are disabled",
838 ))
839 }
840}
841
842fn ensure_pm_settlement_wallet_allowed(
843 state: &AppState,
844 wallet: &hypercall_types::WalletAddress,
845) -> Result<(), ApiError> {
846 if state
847 .runtime_config
848 .portfolio_margin_settlement_allowlist
849 .contains(wallet)
850 {
851 Ok(())
852 } else {
853 Err(ApiError::new(
854 StatusCode::FORBIDDEN,
855 "portfolio_margin_wallet_not_allowlisted",
856 "wallet is not allowlisted for PM settlement pool recovery commands",
857 ))
858 }
859}
860
861async fn submit_pm_settlement_command(
862 state: &AppState,
863 command: hypercall_engine::command::EngineCommand,
864) -> Result<(), ApiError> {
865 let sender = state
866 .pm_settlement_admin_sender
867 .as_ref()
868 .ok_or_else(|| ApiError::internal_error("PM settlement admin channel is not configured"))?;
869 let (tx, rx) = tokio::sync::oneshot::channel();
870 sender
871 .send(hypercall_runtime_api::PmSettlementAdminRequest {
872 command,
873 applied_tx: tx,
874 })
875 .await
876 .map_err(|_| ApiError::internal_error("PM settlement admin channel is closed"))?;
877 tokio::time::timeout(super::ENGINE_RESPONSE_TIMEOUT, rx)
878 .await
879 .map_err(|_| ApiError::gateway_timeout("PM settlement command timed out"))?
880 .map_err(|_| ApiError::internal_error("PM settlement command acknowledgement dropped"))?
881 .map_err(ApiError::bad_request)
882}
883
884pub async fn set_pm_settlement_pool_config(
885 State(state): State<AppState>,
886 SonicJson(request): SonicJson<SetPmSettlementPoolConfigRequest>,
887) -> Result<SonicJson<crate::models::ApiResponse<String>>, ApiError> {
888 ensure_pm_settlement_pool_enabled(&state)?;
889 submit_pm_settlement_command(
890 &state,
891 hypercall_engine::command::EngineCommand::SetPmSettlementPoolConfig(
892 SetPmSettlementPoolConfigCommand {
893 request_id: request.request_id,
894 input_digest: request.input_digest,
895 underlying: request.underlying,
896 config_version: request.config_version,
897 config: request.config,
898 timestamp_ms: request.timestamp_ms,
899 },
900 ),
901 )
902 .await?;
903 Ok(SonicJson(crate::models::ApiResponse::success(
904 "PM settlement pool config command accepted".to_string(),
905 )))
906}
907
908pub async fn accrue_pm_settlement_interest(
909 State(state): State<AppState>,
910 SonicJson(request): SonicJson<AccruePmSettlementInterestRequest>,
911) -> Result<SonicJson<crate::models::ApiResponse<String>>, ApiError> {
912 ensure_pm_settlement_pool_enabled(&state)?;
913 ensure_pm_settlement_wallet_allowed(&state, &request.wallet)?;
914 submit_pm_settlement_command(
915 &state,
916 hypercall_engine::command::EngineCommand::AccruePmSettlementInterest(
917 AccruePmSettlementInterestCommand {
918 request_id: request.request_id,
919 input_digest: request.input_digest,
920 wallet: request.wallet,
921 underlying: request.underlying,
922 to_ms: request.to_ms,
923 timestamp_ms: request.timestamp_ms,
924 },
925 ),
926 )
927 .await?;
928 Ok(SonicJson(crate::models::ApiResponse::success(
929 "PM settlement interest accrual command accepted".to_string(),
930 )))
931}
932
933pub async fn journal_pm_recovery_plan(
934 State(state): State<AppState>,
935 SonicJson(request): SonicJson<JournalPmRecoveryPlanRequest>,
936) -> Result<SonicJson<crate::models::ApiResponse<String>>, ApiError> {
937 ensure_pm_settlement_pool_enabled(&state)?;
938 ensure_pm_settlement_wallet_allowed(&state, &request.plan.wallet)?;
939 submit_pm_settlement_command(
940 &state,
941 hypercall_engine::command::EngineCommand::JournalPmRecoveryPlan(
942 JournalPmRecoveryPlanCommand {
943 request_id: request.request_id,
944 input_digest: request.input_digest,
945 plan: request.plan,
946 timestamp_ms: request.timestamp_ms,
947 },
948 ),
949 )
950 .await?;
951 Ok(SonicJson(crate::models::ApiResponse::success(
952 "PM recovery plan journal command accepted".to_string(),
953 )))
954}
955
956pub async fn mark_pm_recovery_action_submitted(
957 State(state): State<AppState>,
958 SonicJson(request): SonicJson<MarkPmRecoveryActionSubmittedRequest>,
959) -> Result<SonicJson<crate::models::ApiResponse<String>>, ApiError> {
960 ensure_pm_settlement_pool_enabled(&state)?;
961 ensure_pm_settlement_wallet_allowed(&state, &request.wallet)?;
962 submit_pm_settlement_command(
963 &state,
964 hypercall_engine::command::EngineCommand::MarkPmRecoveryActionSubmitted(
965 MarkPmRecoveryActionSubmittedCommand {
966 request_id: request.request_id,
967 input_digest: request.input_digest,
968 wallet: request.wallet,
969 plan_id: request.plan_id,
970 action_index: request.action_index,
971 attempt: request.attempt,
972 external_id: request.external_id,
973 external_kind: request.external_kind,
974 timestamp_ms: request.timestamp_ms,
975 },
976 ),
977 )
978 .await?;
979 Ok(SonicJson(crate::models::ApiResponse::success(
980 "PM recovery action submission command accepted".to_string(),
981 )))
982}
983
984pub async fn resolve_pm_recovery_action(
985 State(state): State<AppState>,
986 SonicJson(request): SonicJson<ResolvePmRecoveryActionRequest>,
987) -> Result<SonicJson<crate::models::ApiResponse<String>>, ApiError> {
988 ensure_pm_settlement_pool_enabled(&state)?;
989 ensure_pm_settlement_wallet_allowed(&state, &request.wallet)?;
990 submit_pm_settlement_command(
991 &state,
992 hypercall_engine::command::EngineCommand::ResolvePmRecoveryAction(
993 ResolvePmRecoveryActionCommand {
994 request_id: request.request_id,
995 input_digest: request.input_digest,
996 wallet: request.wallet,
997 plan_id: request.plan_id,
998 action_index: request.action_index,
999 attempt: request.attempt,
1000 result: request.result,
1001 recovered_usdc: request.recovered_usdc,
1002 liability_reduction_usdc: request.liability_reduction_usdc,
1003 result_external_id: request.result_external_id,
1004 timestamp_ms: request.timestamp_ms,
1005 },
1006 ),
1007 )
1008 .await?;
1009 Ok(SonicJson(crate::models::ApiResponse::success(
1010 "PM recovery action resolution command accepted".to_string(),
1011 )))
1012}
1013
1014pub async fn get_pm_settlement_pools(
1015 State(state): State<AppState>,
1016) -> Result<
1017 SonicJson<crate::models::ApiResponse<Vec<hypercall_db::PmSettlementPoolProjection>>>,
1018 ApiError,
1019> {
1020 ensure_pm_settlement_pool_enabled(&state)?;
1021 let rows = state.db.list_pm_settlement_pools().await.map_err(|error| {
1022 tracing::error!("Failed to read PM settlement pool projections: {}", error);
1023 ApiError::internal_error("Failed to read PM settlement pool projections")
1024 })?;
1025 Ok(SonicJson(crate::models::ApiResponse::success(rows)))
1026}
1027
1028pub async fn get_pm_settlement_accounts(
1029 State(state): State<AppState>,
1030) -> Result<
1031 SonicJson<crate::models::ApiResponse<Vec<hypercall_db::PmSettlementAccountProjection>>>,
1032 ApiError,
1033> {
1034 ensure_pm_settlement_pool_enabled(&state)?;
1035 let rows = state
1036 .db
1037 .list_pm_settlement_accounts()
1038 .await
1039 .map_err(|error| {
1040 tracing::error!(
1041 "Failed to read PM settlement account projections: {}",
1042 error
1043 );
1044 ApiError::internal_error("Failed to read PM settlement account projections")
1045 })?;
1046 Ok(SonicJson(crate::models::ApiResponse::success(rows)))
1047}
1048
1049pub async fn get_pm_settlement_events(
1050 State(state): State<AppState>,
1051) -> Result<
1052 SonicJson<crate::models::ApiResponse<Vec<hypercall_db::PmSettlementEventProjection>>>,
1053 ApiError,
1054> {
1055 ensure_pm_settlement_pool_enabled(&state)?;
1056 let rows = state
1057 .db
1058 .list_pm_settlement_events()
1059 .await
1060 .map_err(|error| {
1061 tracing::error!("Failed to read PM settlement event projections: {}", error);
1062 ApiError::internal_error("Failed to read PM settlement event projections")
1063 })?;
1064 Ok(SonicJson(crate::models::ApiResponse::success(rows)))
1065}
1066
1067pub async fn get_pm_settlement_interest_events(
1068 State(state): State<AppState>,
1069) -> Result<
1070 SonicJson<crate::models::ApiResponse<Vec<hypercall_db::PmSettlementInterestEventProjection>>>,
1071 ApiError,
1072> {
1073 ensure_pm_settlement_pool_enabled(&state)?;
1074 let rows = state
1075 .db
1076 .list_pm_settlement_interest_events()
1077 .await
1078 .map_err(|error| {
1079 tracing::error!(
1080 "Failed to read PM settlement interest event projections: {}",
1081 error
1082 );
1083 ApiError::internal_error("Failed to read PM settlement interest event projections")
1084 })?;
1085 Ok(SonicJson(crate::models::ApiResponse::success(rows)))
1086}
1087
1088pub async fn get_pm_settlement_repayment_events(
1089 State(state): State<AppState>,
1090) -> Result<
1091 SonicJson<crate::models::ApiResponse<Vec<hypercall_db::PmSettlementRepaymentEventProjection>>>,
1092 ApiError,
1093> {
1094 ensure_pm_settlement_pool_enabled(&state)?;
1095 let rows = state
1096 .db
1097 .list_pm_settlement_repayment_events()
1098 .await
1099 .map_err(|error| {
1100 tracing::error!(
1101 "Failed to read PM settlement repayment event projections: {}",
1102 error
1103 );
1104 ApiError::internal_error("Failed to read PM settlement repayment event projections")
1105 })?;
1106 Ok(SonicJson(crate::models::ApiResponse::success(rows)))
1107}
1108
1109pub async fn get_pm_recovery_projections(
1110 State(state): State<AppState>,
1111) -> Result<SonicJson<crate::models::ApiResponse<PmRecoveryProjectionResponse>>, ApiError> {
1112 ensure_pm_settlement_pool_enabled(&state)?;
1113 let plans = state.db.list_pm_recovery_plans().await.map_err(|error| {
1114 tracing::error!("Failed to read PM recovery plan projections: {}", error);
1115 ApiError::internal_error("Failed to read PM recovery plan projections")
1116 })?;
1117 let actions = state.db.list_pm_recovery_actions().await.map_err(|error| {
1118 tracing::error!("Failed to read PM recovery action projections: {}", error);
1119 ApiError::internal_error("Failed to read PM recovery action projections")
1120 })?;
1121 Ok(SonicJson(crate::models::ApiResponse::success(
1122 PmRecoveryProjectionResponse { plans, actions },
1123 )))
1124}
1125
1126pub async fn get_pm_settlement_gate_state(
1127 State(state): State<AppState>,
1128) -> Result<SonicJson<crate::models::ApiResponse<PmSettlementGateStateResponse>>, ApiError> {
1129 ensure_pm_settlement_pool_enabled(&state)?;
1130
1131 let pools = state.db.list_pm_settlement_pools().await.map_err(|error| {
1132 tracing::error!(
1133 "Failed to read PM settlement pool projections for gate state: {}",
1134 error
1135 );
1136 ApiError::internal_error("Failed to read PM settlement pool projections")
1137 })?;
1138 let pool_gate_counts = state
1139 .db
1140 .pm_settlement_pool_gate_counts()
1141 .await
1142 .map_err(|error| {
1143 tracing::error!("Failed to read PM settlement pool gate counts: {}", error);
1144 ApiError::internal_error("Failed to read PM settlement pool gate counts")
1145 })?;
1146 let accounts = state
1147 .db
1148 .list_pm_settlement_accounts()
1149 .await
1150 .map_err(|error| {
1151 tracing::error!(
1152 "Failed to read PM settlement account projections for gate state: {}",
1153 error
1154 );
1155 ApiError::internal_error("Failed to read PM settlement account projections")
1156 })?;
1157 let recovery_plans = state.db.list_pm_recovery_plans().await.map_err(|error| {
1158 tracing::error!(
1159 "Failed to read PM recovery plan projections for gate state: {}",
1160 error
1161 );
1162 ApiError::internal_error("Failed to read PM recovery plan projections")
1163 })?;
1164 let recovery_action_rows = state.db.list_pm_recovery_actions().await.map_err(|error| {
1165 tracing::error!(
1166 "Failed to read PM recovery action projections for gate state: {}",
1167 error
1168 );
1169 ApiError::internal_error("Failed to read PM recovery action projections")
1170 })?;
1171 let now_ms = chrono::Utc::now().timestamp_millis();
1172 let account_gate_counts = state
1173 .db
1174 .pm_settlement_account_gate_counts(now_ms)
1175 .await
1176 .map_err(|error| {
1177 tracing::error!(
1178 "Failed to read PM settlement account gate counts: {}",
1179 error
1180 );
1181 ApiError::internal_error("Failed to read PM settlement account gate counts")
1182 })?;
1183 let recovery_action_gate_counts =
1184 state
1185 .db
1186 .pm_recovery_action_gate_counts()
1187 .await
1188 .map_err(|error| {
1189 tracing::error!("Failed to read PM recovery action gate counts: {}", error);
1190 ApiError::internal_error("Failed to read PM recovery action gate counts")
1191 })?;
1192
1193 let gate_pools = pools
1194 .iter()
1195 .map(|pool| {
1196 let below_target = pool.pool_available_usdc < pool.pool_target_usdc;
1197 let utilization_unavailable = pool.pool_utilization.is_none();
1198 let above_crisis_cap = match (pool.pool_utilization, pool.crisis_utilization_cap) {
1199 (Some(utilization), Some(cap)) => utilization > cap,
1200 _ => false,
1201 };
1202 PmSettlementGatePoolState {
1203 underlying: pool.underlying.clone(),
1204 pool_available_usdc: pool.pool_available_usdc,
1205 pool_target_usdc: pool.pool_target_usdc,
1206 pool_capacity_usdc: pool.pool_capacity_usdc,
1207 pool_utilization: pool.pool_utilization,
1208 active_timing_bridge_usdc: pool.active_timing_bridge_usdc,
1209 active_settlement_debt_usdc: pool.active_settlement_debt_usdc,
1210 normal_utilization_cap: pool.normal_utilization_cap,
1211 crisis_utilization_cap: pool.crisis_utilization_cap,
1212 below_target,
1213 utilization_unavailable,
1214 above_crisis_cap,
1215 projection_seq: pool.projection_seq,
1216 last_engine_command_id: pool.last_engine_command_id,
1217 }
1218 })
1219 .collect::<Vec<_>>();
1220
1221 let account_blockers = pm_settlement_account_blockers(account_gate_counts);
1222 let recovery_actions = pm_settlement_recovery_action_gate_state(recovery_action_gate_counts);
1223
1224 let projection_freshness = pm_settlement_projection_freshness(
1225 &pools,
1226 &accounts,
1227 &recovery_plans,
1228 &recovery_action_rows,
1229 );
1230 let pool_facts_available = pool_gate_counts.total_pools > 0
1231 && pool_gate_counts.missing_utilization_pools == 0
1232 && pool_gate_counts.missing_crisis_cap_pools == 0;
1233 let no_pool_below_target = pool_gate_counts.below_target_pools == 0;
1234 let no_pool_above_crisis_cap = pool_gate_counts.above_crisis_cap_pools == 0;
1235 let no_account_debt = account_blockers.debt_accounts == 0;
1236 let no_active_recovery_accounts = account_blockers.active_recovery_accounts == 0;
1237 let no_overdue_bridge = account_blockers.overdue_bridge_accounts == 0;
1238 let no_planned_recovery_actions = recovery_actions.planned_actions == 0;
1239 let no_submitted_recovery_actions = recovery_actions.submitted_actions == 0;
1240 let allowlist_configured = !state
1241 .runtime_config
1242 .portfolio_margin_settlement_allowlist
1243 .is_empty();
1244 let projections_present = projection_freshness.projection_rows > 0;
1245 let ready_for_single_wallet_smoke =
1246 allowlist_configured && pool_facts_available && projections_present;
1247 let ready_for_allowlist_expansion = ready_for_single_wallet_smoke
1248 && no_pool_below_target
1249 && no_pool_above_crisis_cap
1250 && no_account_debt
1251 && no_active_recovery_accounts
1252 && no_overdue_bridge
1253 && no_planned_recovery_actions
1254 && no_submitted_recovery_actions;
1255
1256 Ok(SonicJson(crate::models::ApiResponse::success(
1257 PmSettlementGateStateResponse {
1258 portfolio_margin_pool_enabled: true,
1259 allowlist_count: state
1260 .runtime_config
1261 .portfolio_margin_settlement_allowlist
1262 .len(),
1263 allowlist_wallets: state
1264 .runtime_config
1265 .portfolio_margin_settlement_allowlist
1266 .clone(),
1267 pools: gate_pools,
1268 account_blockers,
1269 recovery_actions,
1270 projection_freshness,
1271 expansion_ready: PmSettlementExpansionReadiness {
1272 allowlist_configured,
1273 pool_facts_available,
1274 projections_present,
1275 no_pool_below_target,
1276 no_pool_above_crisis_cap,
1277 no_account_debt,
1278 no_active_recovery_accounts,
1279 no_overdue_bridge,
1280 no_planned_recovery_actions,
1281 no_submitted_recovery_actions,
1282 ready_for_single_wallet_smoke,
1283 ready_for_allowlist_expansion,
1284 },
1285 },
1286 )))
1287}
1288
/// Converts a signed 64-bit row count into a `usize`.
///
/// # Panics
///
/// Panics with "Postgres COUNT returned an invalid count" when `count` cannot
/// be represented as a `usize` (for example, when it is negative).
fn count_to_usize(count: i64) -> usize {
    let converted = <usize as TryFrom<i64>>::try_from(count);
    converted.expect("Postgres COUNT returned an invalid count")
}
1292
1293fn pm_settlement_account_blockers(
1294 counts: hypercall_db::PmSettlementAccountGateCounts,
1295) -> PmSettlementAccountBlockers {
1296 PmSettlementAccountBlockers {
1297 total_accounts: count_to_usize(counts.total_accounts),
1298 bridged_accounts: count_to_usize(counts.bridged_accounts),
1299 debt_accounts: count_to_usize(counts.debt_accounts),
1300 overdue_bridge_accounts: count_to_usize(counts.overdue_bridge_accounts),
1301 active_recovery_accounts: count_to_usize(counts.active_recovery_accounts),
1302 active_bridge_usdc: counts.active_bridge_usdc,
1303 active_debt_usdc: counts.active_debt_usdc,
1304 }
1305}
1306
1307fn pm_settlement_recovery_action_gate_state(
1308 counts: hypercall_db::PmRecoveryActionGateCounts,
1309) -> PmSettlementRecoveryActionGateState {
1310 PmSettlementRecoveryActionGateState {
1311 total_actions: count_to_usize(counts.total_actions),
1312 planned_actions: count_to_usize(counts.planned_actions),
1313 submitted_actions: count_to_usize(counts.submitted_actions),
1314 terminal_actions: count_to_usize(counts.terminal_actions),
1315 }
1316}
1317
1318fn pm_settlement_projection_freshness(
1319 pools: &[hypercall_db::PmSettlementPoolProjection],
1320 accounts: &[hypercall_db::PmSettlementAccountProjection],
1321 plans: &[hypercall_db::PmRecoveryPlanProjection],
1322 actions: &[hypercall_db::PmRecoveryActionProjection],
1323) -> PmSettlementProjectionFreshness {
1324 let projection_rows = pools.len() + accounts.len() + plans.len() + actions.len();
1325 let max_projection_seq = pools
1326 .iter()
1327 .map(|row| row.projection_seq)
1328 .chain(accounts.iter().map(|row| row.projection_seq))
1329 .chain(plans.iter().map(|row| row.projection_seq))
1330 .chain(actions.iter().map(|row| row.projection_seq))
1331 .max()
1332 .unwrap_or_default();
1333 let max_engine_command_id = pools
1334 .iter()
1335 .map(|row| row.last_engine_command_id)
1336 .chain(accounts.iter().map(|row| row.last_engine_command_id))
1337 .chain(plans.iter().map(|row| row.engine_command_id))
1338 .chain(actions.iter().map(|row| row.engine_command_id))
1339 .max()
1340 .unwrap_or_default();
1341
1342 PmSettlementProjectionFreshness {
1343 projection_rows,
1344 max_projection_seq,
1345 max_engine_command_id,
1346 }
1347}