hypercall/
rsm_directive_publisher.rs1use std::sync::Arc;
2use std::time::Instant;
3
4use anyhow::anyhow;
5use sonic_rs::json;
6use tokio::sync::{mpsc, oneshot};
7
8use crate::journal::{EventPayload, JournalBatchSender, JournalEntry, JournalMessage};
9use crate::observability::command_trace::EngineStateDigest;
10use crate::shared::order_types::get_timestamp_millis;
11use crate::shared::topics::TOPIC_TRANSACTION_REQUESTS;
12use hypercall_db_diesel::engine_enums::{DbUuid, EventType};
13use hypercall_signer::SignedDirective;
14use hypercall_types::WalletAddress;
15use hypercall_types::{
16 serialize_to_wire_bytes, EngineMessage, SignedDirectiveTx, TransactionRequest, TransactionType,
17};
18
/// Lifetime, in milliseconds, added to a request's creation timestamp to
/// obtain its `expires_at` value (five minutes).
const DEFAULT_DIRECTIVE_TTL_MS: u64 = 5 * 60 * 1_000;
20
21#[derive(Clone)]
22pub struct RsmDirectivePublisher {
23 transaction_request_sender: mpsc::UnboundedSender<EngineMessage>,
24 journal_batch_sender: Option<JournalBatchSender>,
25}
26
27impl RsmDirectivePublisher {
28 pub fn new(
29 transaction_request_sender: mpsc::UnboundedSender<EngineMessage>,
30 journal_batch_sender: Option<JournalBatchSender>,
31 ) -> Arc<Self> {
32 Arc::new(Self {
33 transaction_request_sender,
34 journal_batch_sender,
35 })
36 }
37
38 pub async fn publish(
39 &self,
40 request_id: &str,
41 account: &WalletAddress,
42 signed: SignedDirective,
43 ) -> anyhow::Result<()> {
44 let tx_request = build_rsm_transaction_request(request_id, account, &signed);
45 let message = EngineMessage::TransactionRequest(tx_request.clone());
46
47 if let Some(sender) = self.journal_batch_sender.clone() {
48 let (ack_tx, ack_rx) = oneshot::channel();
49 let entry =
50 build_journal_entry(request_id, account, &signed, &tx_request, &message, ack_tx)?;
51 let journal_message = JournalMessage::Entry(entry);
52 match sender.try_send(journal_message) {
53 Ok(()) => {}
54 Err(tokio::sync::mpsc::error::TrySendError::Full(message)) => sender
55 .send(message)
56 .await
57 .map_err(|_| anyhow!("failed to enqueue RSM transaction request"))?,
58 Err(tokio::sync::mpsc::error::TrySendError::Closed(_)) => {
59 anyhow::bail!("RSM transaction request journal batcher channel is closed")
60 }
61 }
62
63 match ack_rx.await {
64 Ok(()) => {}
65 Err(_) => {
66 panic!(
67 "CRITICAL_FAILURE: RSM TransactionRequest journal commit_ack dropped for request_id {}. \
68 Durability boundary is unknown, stopping to avoid live publish without confirmed persistence.",
69 request_id
70 );
71 }
72 }
73 }
74
75 self.transaction_request_sender
76 .send(message)
77 .map_err(|_| anyhow!("failed to enqueue RSM transaction request"))?;
78
79 Ok(())
80 }
81}
82
83fn build_journal_entry(
84 request_id: &str,
85 account: &WalletAddress,
86 signed: &SignedDirective,
87 tx_request: &TransactionRequest,
88 message: &EngineMessage,
89 ack_tx: oneshot::Sender<()>,
90) -> anyhow::Result<JournalEntry> {
91 let request_uuid = DbUuid(tx_request.request_id.parse::<uuid::Uuid>().map_err(|err| {
92 anyhow!(
93 "invalid transaction request_id {}: {}",
94 tx_request.request_id,
95 err
96 )
97 })?);
98
99 Ok(JournalEntry {
100 received_ts_ms: tx_request.timestamp,
101 command_data: serialize_to_wire_bytes(&json!({
102 "account": account,
103 "nonce": signed.nonce,
104 "requestId": request_id,
105 })),
106 response_data: None,
107 order_id: None,
108 pre_digest: EngineStateDigest::default(),
109 post_digest: EngineStateDigest::default(),
110 duration_ms: 0,
111 events: vec![EventPayload {
112 event_topic: TOPIC_TRANSACTION_REQUESTS.to_string(),
113 event_key: message.partition_key(),
114 event_data: serialize_to_wire_bytes(tx_request),
115 l2_sequence: None,
116 event_type_enum: EventType::TransactionRequest,
117 }],
118 outbox_appends: Vec::new(),
119 fill_side_effects: vec![],
120 cash_withdrawal_side_effect: None,
121 balance_updates: Vec::new(),
122 created_at: Instant::now(),
123 commit_ack: Some(ack_tx),
124 request_uuid,
125 command_type_enum: None,
126 #[cfg(feature = "rsm-state")]
127 command_identity_hash: [0u8; 32],
128 #[cfg(feature = "rsm-state")]
129 rsm_state_digest: None,
130 })
131}
132
133fn build_rsm_transaction_request(
134 request_id: &str,
135 account: &WalletAddress,
136 signed: &SignedDirective,
137) -> TransactionRequest {
138 let timestamp = get_timestamp_millis();
139 TransactionRequest {
140 request_id: request_id.to_string(),
141 wallet_address: *account,
142 account_contract: *account,
143 transaction_type: TransactionType::RsmDirective(SignedDirectiveTx {
144 directive: signed.directive.clone(),
145 signature: signed.signature.clone(),
146 }),
147 timestamp,
148 expires_at: timestamp + DEFAULT_DIRECTIVE_TTL_MS,
149 }
150}