Skip to main content

hypercall/
rsm_directive_publisher.rs

1use std::sync::Arc;
2use std::time::Instant;
3
4use anyhow::anyhow;
5use sonic_rs::json;
6use tokio::sync::{mpsc, oneshot};
7
8use crate::journal::{EventPayload, JournalBatchSender, JournalEntry, JournalMessage};
9use crate::observability::command_trace::EngineStateDigest;
10use crate::shared::order_types::get_timestamp_millis;
11use crate::shared::topics::TOPIC_TRANSACTION_REQUESTS;
12use hypercall_db_diesel::engine_enums::{DbUuid, EventType};
13use hypercall_signer::SignedDirective;
14use hypercall_types::WalletAddress;
15use hypercall_types::{
16    serialize_to_wire_bytes, EngineMessage, SignedDirectiveTx, TransactionRequest, TransactionType,
17};
18
/// Lifetime, in milliseconds, added to a request's creation timestamp to
/// obtain its `expires_at` value (5 minutes).
const DEFAULT_DIRECTIVE_TTL_MS: u64 = 5 * 60 * 1_000;
20
21#[derive(Clone)]
22pub struct RsmDirectivePublisher {
23    transaction_request_sender: mpsc::UnboundedSender<EngineMessage>,
24    journal_batch_sender: Option<JournalBatchSender>,
25}
26
27impl RsmDirectivePublisher {
28    pub fn new(
29        transaction_request_sender: mpsc::UnboundedSender<EngineMessage>,
30        journal_batch_sender: Option<JournalBatchSender>,
31    ) -> Arc<Self> {
32        Arc::new(Self {
33            transaction_request_sender,
34            journal_batch_sender,
35        })
36    }
37
38    pub async fn publish(
39        &self,
40        request_id: &str,
41        account: &WalletAddress,
42        signed: SignedDirective,
43    ) -> anyhow::Result<()> {
44        let tx_request = build_rsm_transaction_request(request_id, account, &signed);
45        let message = EngineMessage::TransactionRequest(tx_request.clone());
46
47        if let Some(sender) = self.journal_batch_sender.clone() {
48            let (ack_tx, ack_rx) = oneshot::channel();
49            let entry =
50                build_journal_entry(request_id, account, &signed, &tx_request, &message, ack_tx)?;
51            let journal_message = JournalMessage::Entry(entry);
52            match sender.try_send(journal_message) {
53                Ok(()) => {}
54                Err(tokio::sync::mpsc::error::TrySendError::Full(message)) => sender
55                    .send(message)
56                    .await
57                    .map_err(|_| anyhow!("failed to enqueue RSM transaction request"))?,
58                Err(tokio::sync::mpsc::error::TrySendError::Closed(_)) => {
59                    anyhow::bail!("RSM transaction request journal batcher channel is closed")
60                }
61            }
62
63            match ack_rx.await {
64                Ok(()) => {}
65                Err(_) => {
66                    panic!(
67                        "CRITICAL_FAILURE: RSM TransactionRequest journal commit_ack dropped for request_id {}. \
68                         Durability boundary is unknown, stopping to avoid live publish without confirmed persistence.",
69                        request_id
70                    );
71                }
72            }
73        }
74
75        self.transaction_request_sender
76            .send(message)
77            .map_err(|_| anyhow!("failed to enqueue RSM transaction request"))?;
78
79        Ok(())
80    }
81}
82
83fn build_journal_entry(
84    request_id: &str,
85    account: &WalletAddress,
86    signed: &SignedDirective,
87    tx_request: &TransactionRequest,
88    message: &EngineMessage,
89    ack_tx: oneshot::Sender<()>,
90) -> anyhow::Result<JournalEntry> {
91    let request_uuid = DbUuid(tx_request.request_id.parse::<uuid::Uuid>().map_err(|err| {
92        anyhow!(
93            "invalid transaction request_id {}: {}",
94            tx_request.request_id,
95            err
96        )
97    })?);
98
99    Ok(JournalEntry {
100        received_ts_ms: tx_request.timestamp,
101        command_data: serialize_to_wire_bytes(&json!({
102            "account": account,
103            "nonce": signed.nonce,
104            "requestId": request_id,
105        })),
106        response_data: None,
107        order_id: None,
108        pre_digest: EngineStateDigest::default(),
109        post_digest: EngineStateDigest::default(),
110        duration_ms: 0,
111        events: vec![EventPayload {
112            event_topic: TOPIC_TRANSACTION_REQUESTS.to_string(),
113            event_key: message.partition_key(),
114            event_data: serialize_to_wire_bytes(tx_request),
115            l2_sequence: None,
116            event_type_enum: EventType::TransactionRequest,
117        }],
118        outbox_appends: Vec::new(),
119        fill_side_effects: vec![],
120        cash_withdrawal_side_effect: None,
121        balance_updates: Vec::new(),
122        created_at: Instant::now(),
123        commit_ack: Some(ack_tx),
124        request_uuid,
125        command_type_enum: None,
126        #[cfg(feature = "rsm-state")]
127        command_identity_hash: [0u8; 32],
128        #[cfg(feature = "rsm-state")]
129        rsm_state_digest: None,
130    })
131}
132
133fn build_rsm_transaction_request(
134    request_id: &str,
135    account: &WalletAddress,
136    signed: &SignedDirective,
137) -> TransactionRequest {
138    let timestamp = get_timestamp_millis();
139    TransactionRequest {
140        request_id: request_id.to_string(),
141        wallet_address: *account,
142        account_contract: *account,
143        transaction_type: TransactionType::RsmDirective(SignedDirectiveTx {
144            directive: signed.directive.clone(),
145            signature: signed.signature.clone(),
146        }),
147        timestamp,
148        expires_at: timestamp + DEFAULT_DIRECTIVE_TTL_MS,
149    }
150}