Skip to main content

hypercall_admin/monitoring/
outbox.rs

1//! Directive outbox + transaction submitter admin endpoints.
2
3use alloy::primitives::Address;
4use axum::{
5    extract::{Query, State},
6    http::StatusCode,
7    response::IntoResponse,
8};
9use serde::{Deserialize, Serialize};
10use sonic_rs::json;
11use std::str::FromStr;
12
13use crate::state::AdminState;
14use hypercall_db::AsyncDirectiveOutboxReader;
15use hypercall_runtime_api::sonic_json::SonicJson;
16use hypercall_transaction_submitter_db::TransactionSubmitterReader;
17
18#[derive(Debug, Deserialize, utoipa::IntoParams, utoipa::ToSchema)]
19pub struct DirectiveOutboxQuery {
20    #[serde(default = "default_directive_outbox_limit")]
21    pub limit: usize,
22    #[serde(default)]
23    pub offset: usize,
24}
25
/// Row limit applied when a directive outbox request does not specify one.
const DEFAULT_DIRECTIVE_OUTBOX_LIMIT: usize = 100;

/// Serde default provider for `DirectiveOutboxQuery::limit`.
fn default_directive_outbox_limit() -> usize {
    DEFAULT_DIRECTIVE_OUTBOX_LIMIT
}
29
30#[derive(Debug, Serialize, utoipa::ToSchema)]
31pub struct DirectiveOutboxRowResponse {
32    pub outbox_seq: i64,
33    pub directive_id: String,
34    pub kind: String,
35    pub action_key: String,
36    pub wallet_address: String,
37    pub account_address: String,
38    pub signer_address: String,
39    pub directive_nonce: i64,
40    pub domain_status: String,
41    pub delivery_status: String,
42    pub submitter_address: Option<String>,
43    pub submitter_nonce: Option<u64>,
44    pub tx_hash: Option<String>,
45    pub delivery_attempts: i64,
46    pub last_delivery_error: Option<String>,
47    pub created_at: String,
48    pub created_ts_ms: i64,
49    pub last_attempt_at: Option<String>,
50    pub last_attempt_ts_ms: Option<i64>,
51    pub expires_at: Option<String>,
52    pub expires_at_ms: Option<i64>,
53}
54
55#[derive(Debug, Serialize, utoipa::ToSchema)]
56pub struct DirectiveOutboxResponse {
57    pub count: usize,
58    pub rows: Vec<DirectiveOutboxRowResponse>,
59}
60
61#[derive(Debug, Deserialize, utoipa::IntoParams, utoipa::ToSchema)]
62pub struct TransactionSubmitterQuery {
63    pub submitter: String,
64    pub nonce: u64,
65}
66
67#[derive(Debug, Serialize, utoipa::ToSchema)]
68pub struct TransactionSubmitterAttemptResponse {
69    pub tx_hash: String,
70}
71
72#[derive(Debug, Serialize, utoipa::ToSchema)]
73pub struct TransactionSubmitterDetailResponse {
74    pub submitter: String,
75    pub nonce: u64,
76    pub status: String,
77    pub primary_tx_hash: Option<String>,
78    pub terminal_error: Option<String>,
79    pub attempts: Vec<TransactionSubmitterAttemptResponse>,
80}
81
82fn directive_outbox_response_from_rows(
83    rows: Vec<hypercall_db::DirectiveOutboxRecentRow>,
84) -> DirectiveOutboxResponse {
85    let rows: Vec<_> = rows
86        .into_iter()
87        .map(|row| DirectiveOutboxRowResponse {
88            outbox_seq: row.outbox_seq,
89            directive_id: row.directive_id,
90            kind: row.kind,
91            action_key: row.action_key,
92            wallet_address: row.wallet_address.to_string(),
93            account_address: row.account_address.to_string(),
94            signer_address: row.signer_address.to_string(),
95            directive_nonce: row.directive_nonce,
96            domain_status: row.domain_status,
97            delivery_status: row.delivery_status,
98            submitter_address: row.submitter_address.map(|address| address.to_string()),
99            submitter_nonce: row.submitter_nonce,
100            tx_hash: row.tx_hash,
101            delivery_attempts: row.delivery_attempts,
102            last_delivery_error: row.last_delivery_error,
103            created_at: timestamp_ms_to_rfc3339(row.created_ts_ms),
104            created_ts_ms: row.created_ts_ms,
105            last_attempt_at: optional_timestamp_ms_to_rfc3339(row.last_attempt_ts_ms),
106            last_attempt_ts_ms: row.last_attempt_ts_ms,
107            expires_at: optional_timestamp_ms_to_rfc3339(row.expires_at_ms),
108            expires_at_ms: row.expires_at_ms,
109        })
110        .collect();
111    let count = rows.len();
112    DirectiveOutboxResponse { count, rows }
113}
114
115fn transaction_submitter_detail_response(
116    row: hypercall_transaction_submitter_db::SubmissionDetailRow,
117) -> TransactionSubmitterDetailResponse {
118    TransactionSubmitterDetailResponse {
119        submitter: row.submitter.to_string(),
120        nonce: row.nonce,
121        status: row.status.as_str().to_string(),
122        primary_tx_hash: row.primary_tx_hash,
123        terminal_error: row.terminal_error,
124        attempts: row
125            .attempts
126            .into_iter()
127            .map(|attempt| TransactionSubmitterAttemptResponse {
128                tx_hash: attempt.tx_hash,
129            })
130            .collect(),
131    }
132}
133
134fn timestamp_ms_to_rfc3339(ms: i64) -> String {
135    chrono::DateTime::from_timestamp_millis(ms)
136        .map(|dt| dt.to_rfc3339())
137        .unwrap_or_else(|| ms.to_string())
138}
139
140fn optional_timestamp_ms_to_rfc3339(ms: Option<i64>) -> Option<String> {
141    ms.map(timestamp_ms_to_rfc3339)
142}
143
144/// GET /monitoring/directive-outbox - Recent directive delivery rows.
145#[utoipa::path(
146    get,
147    path = "/monitoring/directive-outbox",
148    params(DirectiveOutboxQuery),
149    responses(
150        (status = 200, description = "Recent directive outbox rows", body = DirectiveOutboxResponse),
151        (status = 401, description = "Invalid or missing X-Admin-Key header")
152    ),
153    tag = "Monitoring",
154    security(("admin_key" = []))
155)]
156pub async fn directive_outbox(
157    State(app_state): State<AdminState>,
158    Query(params): Query<DirectiveOutboxQuery>,
159) -> impl IntoResponse {
160    let directive_outbox: &dyn AsyncDirectiveOutboxReader = app_state.db.as_ref();
161
162    match directive_outbox
163        .list_recent_directive_outbox_rows(params.limit as i64, params.offset as i64)
164        .await
165    {
166        Ok(rows) => (
167            StatusCode::OK,
168            SonicJson(directive_outbox_response_from_rows(rows)),
169        )
170            .into_response(),
171        Err(error) => {
172            tracing::error!(%error, "directive outbox monitoring query failed");
173            (
174                StatusCode::INTERNAL_SERVER_ERROR,
175                SonicJson(json!({ "error": "directive outbox query failed" })),
176            )
177                .into_response()
178        }
179    }
180}
181
182/// GET /monitoring/transaction-submitter - Submitter-owned nonce detail.
183#[utoipa::path(
184    get,
185    path = "/monitoring/transaction-submitter",
186    params(TransactionSubmitterQuery),
187    responses(
188        (status = 200, description = "Submitter nonce detail", body = TransactionSubmitterDetailResponse),
189        (status = 400, description = "Invalid submitter query"),
190        (status = 404, description = "Submitter nonce not found"),
191        (status = 401, description = "Invalid or missing X-Admin-Key header")
192    ),
193    tag = "Monitoring",
194    security(("admin_key" = []))
195)]
196pub async fn transaction_submitter_detail(
197    State(app_state): State<AdminState>,
198    Query(params): Query<TransactionSubmitterQuery>,
199) -> impl IntoResponse {
200    let submitter = match Address::from_str(&params.submitter) {
201        Ok(submitter) => submitter,
202        Err(error) => {
203            return (
204                StatusCode::BAD_REQUEST,
205                SonicJson(json!({
206                    "error": format!("invalid submitter address: {error}")
207                })),
208            )
209                .into_response();
210        }
211    };
212    let reader: &dyn TransactionSubmitterReader = app_state.db.as_ref();
213    match reader
214        .get_submission_by_nonce(&submitter, params.nonce)
215        .await
216    {
217        Ok(Some(row)) => (
218            StatusCode::OK,
219            SonicJson(transaction_submitter_detail_response(row)),
220        )
221            .into_response(),
222        Ok(None) => (
223            StatusCode::NOT_FOUND,
224            SonicJson(json!({
225                "error": "submitter nonce not found"
226            })),
227        )
228            .into_response(),
229        Err(error) => (
230            StatusCode::INTERNAL_SERVER_ERROR,
231            SonicJson(json!({
232                "error": error.to_string()
233            })),
234        )
235            .into_response(),
236    }
237}
238
#[cfg(test)]
mod tests {
    use super::*;
    use hypercall_types::WalletAddress;

    /// Wallet address whose 20 bytes all equal `fill`.
    fn test_wallet(fill: u8) -> WalletAddress {
        WalletAddress::from([fill; 20])
    }

    /// Address whose 20 bytes all equal `fill`.
    fn test_address(fill: u8) -> Address {
        Address::from([fill; 20])
    }

    /// Fully populated outbox row used as the conversion fixture.
    fn sample_row() -> hypercall_db::DirectiveOutboxRecentRow {
        hypercall_db::DirectiveOutboxRecentRow {
            outbox_seq: 42,
            directive_id: "directive-1".to_string(),
            kind: "needs_rsm_signature".to_string(),
            action_key: "system_withdraw_token".to_string(),
            wallet_address: test_wallet(1),
            account_address: test_wallet(2),
            signer_address: test_wallet(3),
            directive_nonce: 9,
            domain_status: "pending_chain_effect".to_string(),
            delivery_status: "broadcasted".to_string(),
            submitter_address: Some(test_address(4)),
            submitter_nonce: Some(10),
            tx_hash: Some("0xabc".to_string()),
            delivery_attempts: 2,
            last_delivery_error: Some("rpc timeout".to_string()),
            created_ts_ms: 1_000,
            last_attempt_ts_ms: Some(2_000),
            expires_at_ms: Some(3_000),
        }
    }

    #[test]
    fn directive_outbox_response_formats_recent_rows() {
        let response = directive_outbox_response_from_rows(vec![sample_row()]);

        assert_eq!(response.count, 1);
        let row = &response.rows[0];

        // Identity fields are copied through.
        assert_eq!(row.outbox_seq, 42);
        assert_eq!(row.directive_id, "directive-1");

        // Addresses are rendered with their `to_string()` form.
        assert_eq!(row.wallet_address, test_wallet(1).to_string());
        assert_eq!(row.account_address, test_wallet(2).to_string());
        assert_eq!(row.signer_address, test_wallet(3).to_string());
        assert_eq!(row.submitter_address, Some(test_address(4).to_string()));
        assert_eq!(row.submitter_nonce, Some(10));

        // Millisecond timestamps are rendered as RFC 3339 strings.
        assert_eq!(row.created_at, "1970-01-01T00:00:01+00:00");
        assert_eq!(
            row.last_attempt_at.as_deref(),
            Some("1970-01-01T00:00:02+00:00")
        );
        assert_eq!(row.expires_at.as_deref(), Some("1970-01-01T00:00:03+00:00"));

        // Delivery bookkeeping is copied through.
        assert_eq!(row.delivery_attempts, 2);
        assert_eq!(row.last_delivery_error.as_deref(), Some("rpc timeout"));
    }
}