hypercall/messaging/
channel_event_bus.rs1use async_trait::async_trait;
2use std::collections::HashMap;
3use std::sync::atomic::{AtomicU64, Ordering};
4use std::sync::Arc;
5use tokio::sync::{mpsc, RwLock};
6use tracing::{debug, info};
7
8use super::event_bus_trait::{EventBusTrait, LegacyMessageEnvelope, MessageEnvelope, TopicOffsets};
9use hypercall_types::EngineMessage;
10
11pub struct ChannelEventBus {
13 sender: mpsc::UnboundedSender<EngineMessage>,
15 receiver: Arc<RwLock<Option<mpsc::UnboundedReceiver<EngineMessage>>>>,
16
17 subscribers: Arc<RwLock<HashMap<String, Vec<mpsc::UnboundedSender<EngineMessage>>>>>,
19
20 subscribers_with_offsets:
22 Arc<RwLock<HashMap<String, Vec<mpsc::UnboundedSender<LegacyMessageEnvelope>>>>>,
23
24 subscribers_with_sequence:
26 Arc<RwLock<HashMap<String, Vec<mpsc::UnboundedSender<MessageEnvelope>>>>>,
27
28 sequence: Arc<AtomicU64>,
30
31 replay_error: Arc<RwLock<Option<String>>>,
34}
35
36pub type MockEventBus = ChannelEventBus;
38
39impl ChannelEventBus {
40 pub fn new() -> Result<Self, String> {
41 let (sender, receiver) = mpsc::unbounded_channel();
42
43 Ok(Self {
44 sender,
45 receiver: Arc::new(RwLock::new(Some(receiver))),
46 subscribers: Arc::new(RwLock::new(HashMap::new())),
47 subscribers_with_offsets: Arc::new(RwLock::new(HashMap::new())),
48 subscribers_with_sequence: Arc::new(RwLock::new(HashMap::new())),
49 sequence: Arc::new(AtomicU64::new(0)),
50 replay_error: Arc::new(RwLock::new(None)),
51 })
52 }
53
54 pub async fn clear_subscribers(&self) {
58 self.subscribers.write().await.clear();
59 self.subscribers_with_offsets.write().await.clear();
60 self.subscribers_with_sequence.write().await.clear();
61 }
62
63 pub async fn set_replay_error(&self, error: Option<String>) {
65 *self.replay_error.write().await = error;
66 }
67
68 fn get_topic_for_event(event: &EngineMessage) -> &'static str {
69 use crate::shared::topics::*;
70 match event {
71 EngineMessage::OrderAction(_) => TOPIC_ORDER_ACTIONS,
72 EngineMessage::OrderUpdate(_) => TOPIC_ORDER_UPDATES,
73 EngineMessage::OrderInfo(_) => TOPIC_ORDER_INFOS,
74 EngineMessage::MarketAction(_) => TOPIC_MARKET_ACTIONS,
75 EngineMessage::MarketUpdate(_) => TOPIC_MARKET_UPDATES,
76 EngineMessage::OrderFilled { .. } => TOPIC_FILLS,
77 EngineMessage::OrderbookUpdated(_) => TOPIC_ORDERBOOK_UPDATES,
78 EngineMessage::L2Update(_) => TOPIC_L2_UPDATES,
79 EngineMessage::Trade(_) => TOPIC_TRADES,
80 EngineMessage::TransactionRequest(_) => TOPIC_TRANSACTION_REQUESTS,
81 EngineMessage::TransactionUpdate(_) => TOPIC_TRANSACTION_UPDATES,
82 EngineMessage::MmpTriggered(_) => TOPIC_MMP_TRIGGERS,
83 EngineMessage::PositionExpired(_) => TOPIC_POSITION_EXPIRED,
84 EngineMessage::TierUpdate(_) => TOPIC_TIER_UPDATES,
85 EngineMessage::HypercorePositionUpdate(_) => TOPIC_HYPERCORE_POSITION_UPDATES,
86 EngineMessage::LiquidationStateChange(_) => TOPIC_LIQUIDATION_STATE,
87 EngineMessage::RfqFilled(_) => TOPIC_RFQ_FILLS,
88 }
89 }
90}
91
92#[async_trait]
93impl EventBusTrait for ChannelEventBus {
94 fn get_sender(&self) -> mpsc::UnboundedSender<EngineMessage> {
95 self.sender.clone()
96 }
97
98 async fn subscribe(
99 &self,
100 topics: Vec<String>,
101 ) -> Result<mpsc::UnboundedReceiver<EngineMessage>, String> {
102 let (tx, rx) = mpsc::unbounded_channel();
103
104 let mut subscribers = self.subscribers.write().await;
105 for topic in &topics {
106 subscribers
107 .entry(topic.clone())
108 .or_insert_with(Vec::new)
109 .push(tx.clone());
110 }
111
112 Ok(rx)
113 }
114
115 async fn subscribe_with_offsets(
116 &self,
117 topics: Vec<String>,
118 ) -> Result<mpsc::UnboundedReceiver<LegacyMessageEnvelope>, String> {
119 let (tx, rx) = mpsc::unbounded_channel();
120
121 let mut subscribers = self.subscribers_with_offsets.write().await;
122 for topic in &topics {
123 subscribers
124 .entry(topic.clone())
125 .or_insert_with(Vec::new)
126 .push(tx.clone());
127 }
128
129 Ok(rx)
130 }
131
132 async fn subscribe_with_sequence(
133 &self,
134 topics: Vec<String>,
135 ) -> Result<mpsc::UnboundedReceiver<MessageEnvelope>, String> {
136 let (tx, rx) = mpsc::unbounded_channel();
137
138 let mut subscribers = self.subscribers_with_sequence.write().await;
139 for topic in &topics {
140 subscribers
141 .entry(topic.clone())
142 .or_insert_with(Vec::new)
143 .push(tx.clone());
144 }
145
146 Ok(rx)
147 }
148
149 async fn start_processing(self: Arc<Self>) {
150 let mut receiver = {
152 let mut lock = self.receiver.write().await;
153 lock.take().expect("start_processing called twice")
154 };
155
156 tokio::spawn(async move {
158 while let Some(event) = receiver.recv().await {
159 debug!("MockEventBus: Processing event {:?}", event);
160
161 let topic = Self::get_topic_for_event(&event);
163 let subscribers = self.subscribers.read().await;
164 if let Some(subs) = subscribers.get(topic) {
165 for sub in subs {
166 let _ = sub.send(event.clone());
167 }
168 }
169 drop(subscribers);
170
171 let seq = self.sequence.fetch_add(1, Ordering::Relaxed) + 1;
173
174 let envelope_subscribers = self.subscribers_with_offsets.read().await;
176 if let Some(subs) = envelope_subscribers.get(topic) {
177 let envelope = LegacyMessageEnvelope {
178 message: event.clone(),
179 partition: None,
180 offset: None,
181 };
182 for sub in subs {
183 let _ = sub.send(envelope.clone());
184 }
185 }
186 drop(envelope_subscribers);
187
188 let seq_subscribers = self.subscribers_with_sequence.read().await;
190 if let Some(subs) = seq_subscribers.get(topic) {
191 let envelope = MessageEnvelope {
192 message: event.clone(),
193 sequence: seq,
194 };
195 for sub in subs {
196 let _ = sub.send(envelope.clone());
197 }
198 }
199 }
200 });
201 }
202
203 async fn replay_from_offsets(
204 &self,
205 offsets: TopicOffsets,
206 _handler: super::event_bus_trait::ReplayHandler,
207 ) -> Result<TopicOffsets, String> {
208 if let Some(error) = self.replay_error.read().await.as_ref() {
210 info!(
211 "MockEventBus: replay_from_offsets returning simulated error: {}",
212 error
213 );
214 return Err(error.clone());
215 }
216
217 info!(
220 "MockEventBus: replay_from_offsets called with {} topics (no-op)",
221 offsets.len()
222 );
223 Ok(offsets)
224 }
225
226 fn get_current_offsets(&self) -> TopicOffsets {
227 HashMap::new()
229 }
230
231 fn get_current_sequence(&self) -> u64 {
232 self.sequence.load(Ordering::Relaxed)
233 }
234}